Class SnowflakeIO

java.lang.Object
org.apache.beam.sdk.io.snowflake.SnowflakeIO

public class SnowflakeIO extends Object
IO to read and write data on Snowflake.

SnowflakeIO uses the Snowflake JDBC driver under the hood, but data isn't read or written through JDBC directly. Instead, SnowflakeIO uses dedicated COPY operations to read data from, and write data to, a cloud bucket. Currently, only Google Cloud Storage is supported.
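To make the COPY-based approach more concrete, here is a minimal sketch of the two kinds of Snowflake statements involved: unloading a query result to a Google Cloud Storage location (read side) and loading staged files into a table (write side). It only builds SQL strings. The bucket, directory, table and integration names are hypothetical, and the exact statements SnowflakeIO issues (file format options, compression, paths) may differ.

```java
/**
 * Hypothetical illustration of the COPY statements behind a bucket-staged
 * read or write. This is not the exact SQL that SnowflakeIO generates.
 */
final class CopyStatementSketch {
  private CopyStatementSketch() {}

  // Snowflake addresses Google Cloud Storage locations with the gcs:// scheme.
  private static String gcsLocation(String bucket, String directory) {
    return "gcs://" + bucket + "/" + directory + "/";
  }

  /** Read side: unload the result of a query as CSV files into the bucket. */
  static String unloadQuery(String query, String bucket, String directory, String integration) {
    return "COPY INTO '" + gcsLocation(bucket, directory) + "'"
        + " FROM (" + query + ")"
        + " STORAGE_INTEGRATION = " + integration
        + " FILE_FORMAT = (TYPE = CSV)";
  }

  /** Write side: load CSV files that were staged in the bucket into a table. */
  static String loadTable(String table, String bucket, String directory, String integration) {
    return "COPY INTO " + table
        + " FROM '" + gcsLocation(bucket, directory) + "'"
        + " STORAGE_INTEGRATION = " + integration
        + " FILE_FORMAT = (TYPE = CSV)";
  }
}
```

The storage integration is what lets Snowflake access the bucket without credentials embedded in the statement, which is why the read and write snippets on this page both call withStorageIntegrationName(...).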

To configure SnowflakeIO to read from or write to your Snowflake instance, you have to provide a SnowflakeIO.DataSourceConfiguration using SnowflakeIO.DataSourceConfiguration.create(). Additionally, one of SnowflakeIO.DataSourceConfiguration.withServerName(String) or SnowflakeIO.DataSourceConfiguration.withUrl(String) must be used to tell SnowflakeIO which instance to use.
There are also other options available to configure the connection to Snowflake, such as the warehouse, database and schema set in the example below.

For example:


 SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
     SnowflakeIO.DataSourceConfiguration.create()
         .withUsernamePasswordAuth(username, password)
         .withServerName(options.getServerName())
         .withWarehouse(options.getWarehouse())
         .withDatabase(options.getDatabase())
         .withSchema(options.getSchema());
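The rule that either a server name or a URL must be given can be illustrated with a small, hypothetical helper that is not part of SnowflakeIO. It assumes the server name has the form `<account>.snowflakecomputing.com` and that a URL uses the Snowflake JDBC scheme `jdbc:snowflake://`. Preferring an explicit URL over a server name is an arbitrary choice made for this sketch.

```java
/**
 * Hypothetical helper (not part of SnowflakeIO) showing that a connection
 * needs either a server name or a full JDBC URL.
 */
final class SnowflakeEndpoint {
  private static final String JDBC_PREFIX = "jdbc:snowflake://";

  private SnowflakeEndpoint() {}

  /**
   * Returns a JDBC URL. An explicit URL wins (an assumption of this sketch);
   * otherwise one is derived from a server name such as
   * "myaccount.snowflakecomputing.com". Either argument may be null.
   */
  static String resolveUrl(String url, String serverName) {
    if (url != null) {
      if (!url.startsWith(JDBC_PREFIX)) {
        throw new IllegalArgumentException("URL must start with " + JDBC_PREFIX);
      }
      return url;
    }
    if (serverName != null) {
      return JDBC_PREFIX + serverName;
    }
    throw new IllegalArgumentException("Either a server name or a URL must be provided");
  }
}
```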
 

Reading from Snowflake

SnowflakeIO.Read returns a bounded collection of T as a PCollection<T>. T is the type returned by the provided SnowflakeIO.CsvMapper.

For example:


 PCollection<GenericRecord> items = pipeline.apply(
  SnowflakeIO.<GenericRecord>read()
    .withDataSourceConfiguration(dataSourceConfiguration)
    .fromQuery(QUERY)
    .withStagingBucketName(...)
    .withStorageIntegrationName(...)
    .withCsvMapper(...)
    .withCoder(...));
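A CSV mapper only has to turn one parsed CSV row (a String[] of column values) into a T. The sketch below uses a local stand-in interface, `CsvRowMapper`, assumed to have the same shape as SnowflakeIO.CsvMapper (`T mapRow(String[] parts) throws Exception`), plus a hypothetical `Item` type whose first two columns are an id and a name, so that it compiles without Beam on the classpath.

```java
import java.io.Serializable;

/** Local stand-in assumed to match the shape of SnowflakeIO.CsvMapper<T>. */
@FunctionalInterface
interface CsvRowMapper<T> extends Serializable {
  T mapRow(String[] parts) throws Exception;
}

/** Hypothetical output type: one row with an id column and a name column. */
final class Item implements Serializable {
  final long id;
  final String name;

  Item(long id, String name) {
    this.id = id;
    this.name = name;
  }
}

final class ItemMappers {
  private ItemMappers() {}

  // parts[0] is parsed as the id and parts[1] is taken as the name;
  // a malformed id makes mapRow throw NumberFormatException.
  static final CsvRowMapper<Item> ITEM_MAPPER =
      parts -> new Item(Long.parseLong(parts[0]), parts[1]);
}
```

In a pipeline, an equivalent lambda would be passed to withCsvMapper(...) and a coder for the output type to withCoder(...); for a Serializable class like `Item`, Beam's `SerializableCoder.of(Item.class)` is one possible choice.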
 

Important: When reading data from Snowflake, temporary CSV files are created in the specified stagingBucketName, in a directory named `sf_copy_csv_[RANDOM CHARS]_[TIMESTAMP]`. This directory and all its files are cleaned up automatically by default, but if the pipeline fails they may remain and will have to be cleaned up manually.
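For manual cleanup after a failed run, a hypothetical helper can pick leftover read directories out of a listing of top-level names in the staging bucket. How that listing is obtained is left out, and the character sets of the random and timestamp parts are not specified above, so the pattern is deliberately loose.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Hypothetical cleanup helper: selects names that look like leftover
 * SnowflakeIO read directories from a staging-bucket listing.
 */
final class LeftoverStagingDirs {
  // "sf_copy_csv_<random chars>_<timestamp>" with an optional trailing slash.
  // The exact formats of both parts are assumed, hence the loose [^/]+ groups.
  private static final Pattern READ_DIR = Pattern.compile("^sf_copy_csv_[^/]+_[^/]+/?$");

  private LeftoverStagingDirs() {}

  static List<String> find(List<String> topLevelNames) {
    return topLevelNames.stream()
        .filter(name -> READ_DIR.matcher(name).matches())
        .collect(Collectors.toList());
  }
}
```

Matching on the full name rather than a bare prefix keeps unrelated directories such as `data/` out of the result.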

Writing to Snowflake

SnowflakeIO.Write supports writing records into a database. It writes a PCollection<T> to the database by converting each T into an Object[] via a user-provided SnowflakeIO.UserDataMapper.

For example:


 items.apply(
     SnowflakeIO.<KV<Integer, String>>write()
         .withDataSourceConfiguration(dataSourceConfiguration)
         .withStagingBucketName(...)
         .withStorageIntegrationName(...)
         .withUserDataMapper(mapper)
         .to(table));
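A user data mapper turns each element into one row of column values. The following sketch uses a local stand-in interface, `RowMapper`, assumed to mirror SnowflakeIO.UserDataMapper (`Object[] mapRow(T element)`), and `Map.Entry` in place of Beam's `KV`, so it compiles without Beam. The column order (key first, then value) is an assumption about the target table.

```java
import java.io.Serializable;
import java.util.Map;

/** Local stand-in assumed to match the shape of SnowflakeIO.UserDataMapper<T>. */
@FunctionalInterface
interface RowMapper<T> extends Serializable {
  Object[] mapRow(T element);
}

final class KvRowMappers {
  private KvRowMappers() {}

  // Maps an (id, name) pair to a two-column row, key first, then value.
  // Map.Entry stands in for Beam's KV<Integer, String> in this sketch.
  static final RowMapper<Map.Entry<Integer, String>> ID_NAME =
      entry -> new Object[] {entry.getKey(), entry.getValue()};
}
```

Assuming UserDataMapper is a functional interface with that signature, a mapper for the write example above could be written along the lines of `kv -> new Object[] {kv.getKey(), kv.getValue()}`.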
 

Important: When writing data to Snowflake, the data is first saved as CSV files in a directory named 'data' in the specified stagingBucketName, and then loaded into Snowflake.
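To illustrate the staging step, here is a hypothetical sketch of how one mapped Object[] row could be rendered as a CSV line before it is written to the bucket. The quoting rules (every value enclosed in double quotes, embedded quotes doubled, null written as an empty field) are assumptions for this sketch; SnowflakeIO's own CSV format may differ.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch of rendering one mapped row as a CSV line before it
 * is staged. SnowflakeIO's own quoting and escaping rules may differ.
 */
final class CsvLineSketch {
  private CsvLineSketch() {}

  static String toCsvLine(Object[] row) {
    return Arrays.stream(row)
        .map(CsvLineSketch::field)
        .collect(Collectors.joining(","));
  }

  // Nulls become empty fields; other values are enclosed in double quotes,
  // with any embedded double quote doubled.
  private static String field(Object value) {
    if (value == null) {
      return "";
    }
    return "\"" + value.toString().replace("\"", "\"\"") + "\"";
  }
}
```

Quoting every non-null value keeps commas and quotes inside values from being misread as field separators when the staged files are loaded.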

  • Constructor Details

    • SnowflakeIO

      public SnowflakeIO()
  • Method Details