@Experimental public class SnowflakeIO extends java.lang.Object
SnowflakeIO uses the Snowflake JDBC driver under the hood, but data isn't read or written using JDBC directly. Instead, SnowflakeIO uses dedicated COPY operations to read/write data from/to a cloud bucket. Currently, only Google Cloud Storage is supported.
To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a SnowflakeIO.DataSourceConfiguration using SnowflakeIO.DataSourceConfiguration.create(). Additionally, one of SnowflakeIO.DataSourceConfiguration.withServerName(String) or SnowflakeIO.DataSourceConfiguration.withUrl(String) must be used to tell SnowflakeIO which instance to use.
There are also other options available to configure the connection to Snowflake:
- SnowflakeIO.DataSourceConfiguration.withWarehouse(String) to specify which warehouse to use
- SnowflakeIO.DataSourceConfiguration.withDatabase(String) to specify which database to connect to
- SnowflakeIO.DataSourceConfiguration.withSchema(String) to specify which schema to use
- SnowflakeIO.DataSourceConfiguration.withRole(String) to specify which role to use
- SnowflakeIO.DataSourceConfiguration.withLoginTimeout(Integer) to specify the timeout for the login
- SnowflakeIO.DataSourceConfiguration.withPortNumber(Integer) to specify a custom port of the Snowflake instance
For example:
SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
SnowflakeIO.DataSourceConfiguration.create()
.withUsernamePasswordAuth(username, password)
.withServerName(options.getServerName())
.withWarehouse(options.getWarehouse())
.withDatabase(options.getDatabase())
.withSchema(options.getSchema());
SnowflakeIO.Read returns a bounded collection of T as a PCollection<T>. T is the type returned by the provided SnowflakeIO.CsvMapper.
For example:
PCollection<GenericRecord> items = pipeline.apply(
SnowflakeIO.<GenericRecord>read()
.withDataSourceConfiguration(dataSourceConfiguration)
.fromQuery(QUERY)
.withStagingBucketName(...)
.withStorageIntegrationName(...)
.withCsvMapper(...)
.withCoder(...));
Important: When reading data from Snowflake, temporary CSV files are created in the specified stagingBucketName, in a directory named `sf_copy_csv_[RANDOM CHARS]_[TIMESTAMP]`. This directory and all its files are cleaned up automatically by default, but if the pipeline fails they may remain and will have to be cleaned up manually.
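The read example above leaves the argument of withCsvMapper(...) elided. As a rough illustration of what such a mapper might do, the sketch below turns the parts of one CSV line into a typed object. It is not taken from the Beam sources: the CsvMapper interface here is a local stand-in declared so the snippet compiles without Beam, its mapRow(String[]) signature is an assumption, and Item, toItem and the two-column layout are hypothetical.

```java
import java.util.Arrays;

// Sketch only: nothing here depends on Beam. CsvMapper is a local stand-in
// for SnowflakeIO.CsvMapper<T>; its method signature is an assumption.
class CsvMapperSketch {

  /** Hypothetical stand-in: maps the parts of one CSV line into T. */
  interface CsvMapper<T> {
    T mapRow(String[] parts) throws Exception;
  }

  /** Hypothetical record type used only for illustration. */
  static final class Item {
    final int id;
    final String name;

    Item(int id, String name) {
      this.id = id;
      this.name = name;
    }

    @Override
    public String toString() {
      return "Item{id=" + id + ", name=" + name + "}";
    }
  }

  /** Converts a two-column CSV line (integer id, name) into an Item. */
  static Item toItem(String[] parts) {
    if (parts.length != 2) {
      throw new IllegalArgumentException(
          "Expected 2 columns, got " + Arrays.toString(parts));
    }
    return new Item(Integer.parseInt(parts[0].trim()), parts[1]);
  }

  public static void main(String[] args) throws Exception {
    // A method reference satisfies the single-method mapper interface.
    CsvMapper<Item> mapper = CsvMapperSketch::toItem;
    System.out.println(mapper.mapRow(new String[] {"42", "widget"}));
  }
}
```

Failing fast on an unexpected column count is a design choice of this sketch; a real mapper might instead tolerate extra columns or map missing values to null.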
SnowflakeIO.Write supports writing records into a database. It writes a PCollection<T> to the database by converting each T into an Object[] via a user-provided SnowflakeIO.UserDataMapper.
For example:
items.apply(
SnowflakeIO.<KV<Integer, String>>write()
.withDataSourceConfiguration(dataSourceConfiguration)
.withStagingBucketName(...)
.withStorageIntegrationName(...)
.withUserDataMapper(mapper)
.to(table));
Important: When writing data to Snowflake, the data is first saved as CSV files in the specified stagingBucketName, in a directory named 'data', and then loaded into Snowflake.
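The mapper passed to withUserDataMapper in the write example is not shown. A minimal sketch of the idea, converting one key/value element into an Object[] row, could look like the following. The UserDataMapper interface is a local stand-in so the snippet compiles without Beam, and its mapRow signature is an assumption; Map.Entry<Integer, String> stands in for Beam's KV<Integer, String>, and toRow is a hypothetical helper. In a real pipeline the mapper would presumably also need to be serializable.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Map;

// Sketch only: nothing here depends on Beam. UserDataMapper is a local
// stand-in for SnowflakeIO.UserDataMapper<T>; its signature is an assumption.
class UserDataMapperSketch {

  /** Hypothetical stand-in: maps T into an array of Objects. */
  interface UserDataMapper<T> {
    Object[] mapRow(T element);
  }

  /** Maps a key/value pair to a two-column row: key first, value second. */
  static Object[] toRow(Map.Entry<Integer, String> element) {
    return new Object[] {element.getKey(), element.getValue()};
  }

  public static void main(String[] args) {
    // Map.Entry is used here in place of Beam's KV<Integer, String>.
    UserDataMapper<Map.Entry<Integer, String>> mapper = UserDataMapperSketch::toRow;
    Object[] row = mapper.mapRow(new SimpleEntry<>(1, "first"));
    System.out.println(Arrays.toString(row));
  }
}
```

The order of the array elements is assumed to correspond to the column order of the target table.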
Modifier and Type | Class and Description |
---|---|
static class | SnowflakeIO.Concatenate: Combines a list of String to provide one String with the paths where files were staged for write. |
static interface | SnowflakeIO.CsvMapper<T>: Interface for user-defined function mapping parts of CSV line into T. |
static class | SnowflakeIO.DataSourceConfiguration: A POJO describing a DataSource, providing all properties needed to create a DataSource. |
static class | SnowflakeIO.DataSourceProviderFromDataSourceConfiguration: Wraps SnowflakeIO.DataSourceConfiguration to provide DataSource. |
static class | SnowflakeIO.Read<T>: Implementation of read(). |
static interface | SnowflakeIO.UserDataMapper<T>: Interface for user-defined function mapping T into array of Objects. |
static class | SnowflakeIO.Write<T>: Implementation of write(). |
Constructor and Description |
---|
SnowflakeIO() |
Modifier and Type | Method and Description |
---|---|
static <T> SnowflakeIO.Read<T> | read(): Read data from Snowflake via COPY statement using default SnowflakeBatchServiceImpl. |
static <T> SnowflakeIO.Read<T> | read(SnowflakeService snowflakeService): Read data from Snowflake via COPY statement using user-defined SnowflakeService. |
static <T> SnowflakeIO.Write<T> | write(): Write data to Snowflake via COPY statement. |
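public static <T> SnowflakeIO.Read<T> read(SnowflakeService snowflakeService)
Read data from Snowflake via COPY statement using user-defined SnowflakeService.
Type Parameters:
T - Type of the data to be read.
Parameters:
snowflakeService - user-defined SnowflakeService

public static <T> SnowflakeIO.Read<T> read()
Read data from Snowflake via COPY statement using default SnowflakeBatchServiceImpl.
Type Parameters:
T - Type of the data to be read.

public static <T> SnowflakeIO.Write<T> write()
Write data to Snowflake via COPY statement.
Type Parameters:
T - Type of data to be written.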