Class SnowflakeIO
SnowflakeIO uses the Snowflake JDBC driver under the hood, but data isn't read/written using JDBC directly. Instead, SnowflakeIO uses dedicated COPY operations to read/write data from/to a cloud bucket. Currently, only Google Cloud Storage is supported.
To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a SnowflakeIO.DataSourceConfiguration using SnowflakeIO.DataSourceConfiguration.create(). Additionally, one of SnowflakeIO.DataSourceConfiguration.withServerName(String) or SnowflakeIO.DataSourceConfiguration.withUrl(String) must be used to tell SnowflakeIO which instance to use.
There are also other options available to configure the connection to Snowflake:

- SnowflakeIO.DataSourceConfiguration.withWarehouse(String) to specify which warehouse to use
- SnowflakeIO.DataSourceConfiguration.withDatabase(String) to specify which database to connect to
- SnowflakeIO.DataSourceConfiguration.withSchema(String) to specify which schema to use
- SnowflakeIO.DataSourceConfiguration.withRole(String) to specify which role to use
- SnowflakeIO.DataSourceConfiguration.withLoginTimeout(Integer) to specify the timeout for the login
- SnowflakeIO.DataSourceConfiguration.withPortNumber(Integer) to specify a custom port of the Snowflake instance
For example:

SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
    SnowflakeIO.DataSourceConfiguration.create()
        .withUsernamePasswordAuth(username, password)
        .withServerName(options.getServerName())
        .withWarehouse(options.getWarehouse())
        .withDatabase(options.getDatabase())
        .withSchema(options.getSchema());
Reading from Snowflake
SnowflakeIO.Read returns a bounded collection of T as a PCollection<T>. T is the type returned by the provided SnowflakeIO.CsvMapper.
For example:

PCollection<GenericRecord> items = pipeline.apply(
    SnowflakeIO.<GenericRecord>read()
        .withDataSourceConfiguration(dataSourceConfiguration)
        .fromQuery(QUERY)
        .withStagingBucketName(...)
        .withStorageIntegrationName(...)
        .withCsvMapper(...)
        .withCoder(...));
Important: When reading data from Snowflake, temporary CSV files are created in the specified stagingBucketName, in a directory named `sf_copy_csv_[RANDOM CHARS]_[TIMESTAMP]`. This directory and all its files are cleaned up automatically by default, but if the pipeline fails they may remain and will have to be cleaned up manually.
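The CsvMapper receives each staged CSV row already split into columns and turns it into a T. A minimal sketch of the mapping logic such a SnowflakeIO.CsvMapper implementation would contain, using a hypothetical UserRecord POJO in place of GenericRecord so the snippet stays free of Beam and Avro dependencies:

```java
// Hypothetical POJO standing in for the element type T produced by the read.
class UserRecord {
    final long id;
    final String name;

    UserRecord(long id, String name) {
        this.id = id;
        this.name = name;
    }
}

class CsvMapperSketch {
    // The body a SnowflakeIO.CsvMapper<UserRecord> would have: parts holds one
    // staged CSV row split into columns, in the order produced by the query.
    static UserRecord mapRow(String[] parts) {
        return new UserRecord(Long.parseLong(parts[0]), parts[1]);
    }

    public static void main(String[] args) {
        UserRecord r = mapRow(new String[] {"42", "alice"});
        assert r.id == 42L;
        assert r.name.equals("alice");
    }
}
```

The column order and types are assumptions here; in a real pipeline they must match the SELECT list of the query passed to fromQuery.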
Writing to Snowflake
SnowflakeIO.Write supports writing records into a database. It writes a PCollection<T> to the database by converting each T into an Object[] via a user-provided SnowflakeIO.UserDataMapper.
For example:

items.apply(
    SnowflakeIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(dataSourceConfiguration)
        .withStagingBucketName(...)
        .withStorageIntegrationName(...)
        .withUserDataMapper(mapper)
        .to(table));
Important: When writing data to Snowflake, the data is first saved as CSV files in the specified stagingBucketName, in a directory named 'data', and only then loaded into Snowflake.
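The UserDataMapper turns each pipeline element into the array of column values written to the staged CSV files. A minimal sketch of the mapping logic such a SnowflakeIO.UserDataMapper implementation would contain, with java.util.Map.Entry standing in for Beam's KV<Integer, String> so the snippet needs no Beam dependency:

```java
import java.util.AbstractMap;
import java.util.Map;

class UserDataMapperSketch {
    // The body a SnowflakeIO.UserDataMapper<KV<Integer, String>> would have:
    // each element becomes an Object[] whose entries line up with the columns
    // of the target table.
    static Object[] mapRow(Map.Entry<Integer, String> element) {
        return new Object[] { element.getKey(), element.getValue() };
    }

    public static void main(String[] args) {
        Object[] row = mapRow(new AbstractMap.SimpleEntry<>(1, "one"));
        assert row.length == 2;
        assert row[0].equals(1);
        assert "one".equals(row[1]);
    }
}
```

The two-column layout is an assumption for illustration; the returned array must match the schema of the table passed to to(table).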
Nested Class Summary

static class SnowflakeIO.Concatenate
    Combines a list of String values to provide one String with the paths where files were staged for write.
static interface SnowflakeIO.CsvMapper<T>
    Interface for a user-defined function mapping parts of a CSV line into T.
static class SnowflakeIO.DataSourceConfiguration
    A POJO describing a DataSource, providing all properties allowing to create a DataSource.
static class SnowflakeIO.DataSourceProviderFromDataSourceConfiguration
    Wraps a SnowflakeIO.DataSourceConfiguration to provide a DataSource.
static class SnowflakeIO.Read<T>
    Implementation of read().
static interface SnowflakeIO.UserDataMapper<T>
    Interface for a user-defined function mapping T into an array of Objects.
static class SnowflakeIO.Write<T>
    Implementation of write().
Constructor Summary

SnowflakeIO()
Method Summary

static <T> SnowflakeIO.Read<T> read()
    Read data from Snowflake via COPY statement using the default SnowflakeBatchServiceImpl.
static <T> SnowflakeIO.Read<T> read(SnowflakeServices snowflakeServices)
    Read data from Snowflake via COPY statement using user-defined SnowflakeServices.
static <T> SnowflakeIO.Write<T> write()
    Write data to Snowflake via COPY statement.
Constructor Details

SnowflakeIO

public SnowflakeIO()
Method Details

read

public static <T> SnowflakeIO.Read<T> read(SnowflakeServices snowflakeServices)

Read data from Snowflake via COPY statement using user-defined SnowflakeServices.

Type Parameters:
    T - Type of the data to be read.
Parameters:
    snowflakeServices - user-defined SnowflakeServices

read

public static <T> SnowflakeIO.Read<T> read()

Read data from Snowflake via COPY statement using the default SnowflakeBatchServiceImpl.

Type Parameters:
    T - Type of the data to be read.

write

public static <T> SnowflakeIO.Write<T> write()

Write data to Snowflake via COPY statement.

Type Parameters:
    T - Type of data to be written.