Built-in I/O Transforms

Snowflake I/O

Pipeline options and general information about using and running Snowflake IO.

Before you start

To use SnowflakeIO, add the Maven artifact dependency to your pom.xml file.

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-snowflake</artifactId>
    <version>2.61.0</version>
</dependency>

Additional resources:

Authentication

Reading and batch writing supports the following authentication methods:

Streaming writing supports only key pair authentication. For details, see: BEAM-3304.

Passing credentials is done via Pipeline options used to instantiate SnowflakeIO.DataSourceConfiguration class. Each authentication method has different ways to configure this class.

Username and password

To use username/password authentication in SnowflakeIO, invoke your pipeline with the following Pipeline options:

--username=<USERNAME> --password=<PASSWORD>

Passing credentials is done via Pipeline options used to instantiate SnowflakeIO.DataSourceConfiguration class.

SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create()
        .withUsernamePasswordAuth(
                options.getUsername(),
                options.getPassword())
        .withServerName(options.getServerName())
        .withDatabase(options.getDatabase())
        .withRole(options.getRole())
        .withWarehouse(options.getWarehouse())
        .withSchema(options.getSchema());

Key pair

To use this authentication method, you must first generate a key pair and associate the public key with the Snowflake user that will connect using the IO transform. For instructions, see the Key Pair Authentication & Key Pair Rotation in Snowflake documentation.

To use key pair authentication with SnowflakeIO, invoke your pipeline with one of the following set of Pipeline options:

OAuth token

SnowflakeIO also supports OAuth token.

IMPORTANT: SnowflakeIO requires a valid OAuth access token. It will neither be able to refresh the token nor obtain it using a web-based flow. For information on configuring an OAuth integration and obtaining the token, see the Snowflake documentation.

Once you have the token, invoke your pipeline with following Pipeline Options:

--oauthToken=<TOKEN>
The initialization of an SnowflakeIO.DataSourceConfiguration class may be as follows:
 SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration
            .create()
            .withUrl(options.getUrl())
            .withServerName(options.getServerName())
            .withDatabase(options.getDatabase())
            .withWarehouse(options.getWarehouse())
            .withSchema(options.getSchema());

DataSource Configuration

DataSource configuration is required in both read and write object for configuring Snowflake connection properties for IO purposes.

General usage

Create the DataSource configuration:

 SnowflakeIO.DataSourceConfiguration
            .create()
            .withUrl(options.getUrl())
            .withServerName(options.getServerName())
            .withDatabase(options.getDatabase())
            .withWarehouse(options.getWarehouse())
            .withSchema(options.getSchema());
Where parameters can be:

Note - either .withUrl(...) or .withServerName(...) is required.

Pipeline options

Use Beam’s Pipeline options to set options via the command line.

Snowflake Pipeline options

Snowflake IO library supports following options that can be passed via the command line by default when a Pipeline uses them:

--url Snowflake’s JDBC-like url including account name and region without any parameters.

--serverName Full server name with account, zone and domain.

--username Required for username/password and Private Key authentication.

--oauthToken Required for OAuth authentication only.

--password Required for username/password authentication only.

--privateKeyPath Path to Private Key file. Required for Private Key authentication only.

--rawPrivateKey Private Key. Required for Private Key authentication only.

--privateKeyPassphrase Private Key’s passphrase. Required for Private Key authentication only.

--stagingBucketName External bucket path ending with /. I.e. {gs,s3}://bucket/. Sub-directories are allowed.

--storageIntegrationName Storage integration name

--warehouse Warehouse to use. Optional.

--database Database name to connect to. Optional.

--schema Schema to use. Optional.

--table Table to use. Optional.

--query Query to use. Optional.

--role Role to use. Optional.

--authenticator Authenticator to use. Optional.

--portNumber Port number. Optional.

--loginTimeout Login timeout. Optional.

--snowPipe SnowPipe name. Optional.

Running main command with Pipeline options

To pass Pipeline options via the command line, use --args in a gradle command as follows:

./gradle run
    --args="
        --serverName=<SNOWFLAKE SERVER NAME>
           Example: --serverName=account.region.gcp.snowflakecomputing.com
        --username=<SNOWFLAKE USERNAME>
           Example: --username=testuser
        --password=<SNOWFLAKE PASSWORD>
           Example: --password=mypassword
        --database=<SNOWFLAKE DATABASE>
           Example: --database=TEST_DATABASE
        --schema=<SNOWFLAKE SCHEMA>
           Example: --schema=public
        --table=<SNOWFLAKE TABLE IN DATABASE>
           Example: --table=TEST_TABLE
        --query=<IF NOT TABLE THEN QUERY>
           Example: --query=‘SELECT column FROM TABLE’
        --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>
           Example: --storageIntegrationName=my_integration
        --stagingBucketName=<GCS OR S3 BUCKET>
           Example: --stagingBucketName={gs,s3}://bucket/
        --runner=<DirectRunner/DataflowRunner>
           Example: --runner=DataflowRunner
        --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME>
           Example: --project=my_project
        --tempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING
                        WITH gs://…>
           Example: --tempLocation=gs://bucket/temp/
        --region=<FOR DATAFLOW RUNNER: GCP REGION>
           Example: --region=us-east-1
        --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>
           Example: --appName=my_job"
Then in the code it is possible to access the parameters with arguments using the options.getStagingBucketName() command.

Running test command with Pipeline options

To pass Pipeline options via the command line, use -DintegrationTestPipelineOptions in a gradle command as follows:

./gradlew test --tests nameOfTest
-DintegrationTestPipelineOptions='[
  "--serverName=<SNOWFLAKE SERVER NAME>",
      Example: --serverName=account.region.gcp.snowflakecomputing.com
  "--username=<SNOWFLAKE USERNAME>",
      Example: --username=testuser
  "--password=<SNOWFLAKE PASSWORD>",
      Example: --password=mypassword
  "--schema=<SNOWFLAKE SCHEMA>",
      Example: --schema=PUBLIC
  "--table=<SNOWFLAKE TABLE IN DATABASE>",
      Example: --table=TEST_TABLE
  "--database=<SNOWFLAKE DATABASE>",
      Example: --database=TEST_DATABASE
  "--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>",
      Example: --storageIntegrationName=my_integration
  "--stagingBucketName=<GCS OR S3 BUCKET>",
      Example: --stagingBucketName={gs,s3}://bucket
  "--externalLocation=<GCS BUCKET URL STARTING WITH GS://>",
      Example: --tempLocation=gs://bucket/temp/
]' --no-build-cache

Where all parameters are starting with “–”, they are surrounded with double quotation and separated with comma:

Running pipelines on Dataflow

By default, pipelines are run on Direct Runner on your local machine. To run a pipeline on Google Dataflow, you must provide the following Pipeline options:

More pipeline options for Dataflow can be found here.

Note: To properly authenticate with Google Cloud, please use gcloud or follow the Google Cloud documentation.

Important: Please acknowledge Google Dataflow pricing

Running pipeline templates on Dataflow

Google Dataflow is supporting template creation which means staging pipelines on Cloud Storage and running them with ability to pass runtime parameters that are only available during pipeline execution.

The process of creating own Dataflow template is following

  1. Create your own pipeline.
  2. Create Dataflow template with checking which options SnowflakeIO is supporting at runtime.
  3. Run a Dataflow template using Cloud Console, REST API or gcloud.

Currently, SnowflakeIO supports following options at runtime:

Currently, SnowflakeIO doesn’t support following options at runtime:

Writing to Snowflake tables

One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user’s PCollection to your Snowflake database.

Batch write (from a bounded source)

The basic .write() operation usage is as follows:

data.apply(
   SnowflakeIO.<type>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
)
Replace type with the data type of the PCollection object to write; for example, SnowflakeIO.<String> for an input PCollection of Strings.

All the below parameters are required:

Note: SnowflakeIO uses COPY statements behind the scenes to write (using COPY to table). StagingBucketName will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “stagingBucketName” path.

Optional for batching:

Streaming write (from unbounded source)

It is required to create a SnowPipe in the Snowflake console. SnowPipe should use the same integration and the same bucket as specified by .withStagingBucketName and .withStorageIntegrationName methods. The write operation might look as follows:

data.apply(
   SnowflakeIO.<type>write()
      .withStagingBucketName("BUCKET")
      .withStorageIntegrationName("STORAGE INTEGRATION NAME")
      .withDataSourceConfiguration(dc)
      .withUserDataMapper(mapper)
      .withSnowPipe("MY_SNOW_PIPE")
      .withFlushTimeLimit(Duration.millis(time))
      .withFlushRowLimit(rowsNumber)
      .withShardsNumber(shardsNumber)
)

Parameters

Required for streaming:

Note: this is important to provide schema and database names.

Note:

As mentioned before SnowflakeIO uses SnowPipe REST calls behind the scenes for writing from unbounded sources. StagingBucketName will be used to save CSV files which will end up in Snowflake. SnowflakeIO is not going to delete created CSV files from path under the “stagingBucketName” either during or after finishing streaming.

Optional for streaming:

Important notice:

  1. Streaming accepts only key pair authentication. For details, see: Issue 21287.
  2. The role parameter configured in SnowflakeIO.DataSourceConfiguration object is ignored for streaming writing. For details, see: Issue 21365

Flush time: duration & number of rows

Duration: streaming write will write periodically files on stage according to time duration specified in flush time limit (for example. every 1 minute).

Number of rows: files staged for write will have number of rows specified in flush row limit unless the flush time limit will be reached (for example if the limit is 1000 rows and buffer collected 99 rows and the 1-minute flush time passes, the rows will be sent to SnowPipe for insertion).

Size of staged files will depend on the rows size and used compression (GZIP).

UserDataMapper function

The UserDataMapper function is required to map data from a PCollection to an array of String values before the write() operation saves the data to temporary .csv files. For example:

public static SnowflakeIO.UserDataMapper<Long> getCsvMapper() {
    return (SnowflakeIO.UserDataMapper<Long>) recordLine -> new String[] {recordLine.toString()};
}

Additional write options

Transformation query

The .withQueryTransformation() option for the write() operation accepts a SQL query as a String value, which will be performed while transfering data staged in CSV files directly to the target Snowflake table. For information about the transformation SQL syntax, see the Snowflake Documentation.

Usage:

String query = "SELECT t.$1 from YOUR_TABLE;";
data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withQueryTransformation(query)
)

Write disposition

Define the write behaviour based on the table where data will be written to by specifying the .withWriteDisposition(...) option for the write() operation. The following values are supported:

Example of usage:

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withWriteDisposition(TRUNCATE)
)

Create disposition

The .withCreateDisposition() option defines the behavior of the write operation if the target table does not exist . The following values are supported:

Usage:

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withCreateDisposition(CREATE_NEVER)
)

Table schema disposition

When the .withCreateDisposition() option is set to CREATE_IF_NEEDED, the .withTableSchema() option enables specifying the schema for the created target table. A table schema is a list of SnowflakeColumn objects with name and type corresponding to column type for each column in the table.

Usage:

SnowflakeTableSchema tableSchema =
    new SnowflakeTableSchema(
        SnowflakeColumn.of("my_date", new SnowflakeDate(), true),
        new SnowflakeColumn("id", new SnowflakeNumber()),
        SnowflakeColumn.of("name", new SnowflakeText(), true));

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withTableSchema(tableSchema)
)

Reading from Snowflake

One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a PCollection of user-defined data type.

General usage

The basic .read() operation usage:

PCollection<USER_DATA_TYPE> items = pipeline.apply(
   SnowflakeIO.<USER_DATA_TYPE>read()
       .withDataSourceConfiguration(dc)
       .fromTable("MY_TABLE") // or .fromQuery("QUERY")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withCsvMapper(mapper)
       .withCoder(coder));
)
Where all below parameters are required:

Note: SnowflakeIO uses COPY statements behind the scenes to read (using COPY to location) files staged in cloud storage.StagingBucketName will be used as a temporary location for storing CSV files. Those temporary directories will be named sf_copy_csv_DATE_TIME_RANDOMSUFFIX and they will be removed automatically once Read operation finishes.

CSVMapper

SnowflakeIO uses a COPY INTO statement to move data from a Snowflake table to GCS/S3 as CSV files. These files are then downloaded via FileIO and processed line by line. Each line is split into an array of Strings using the OpenCSV library.

The CSVMapper’s job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom POJO.

Example implementation of CsvMapper for GenericRecord:

static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
   return (SnowflakeIO.CsvMapper<GenericRecord>)
           parts -> {
               return new GenericRecordBuilder(PARQUET_SCHEMA)
                       .set("ID", Long.valueOf(parts[0]))
                       .set("NAME", parts[1])
                       [...]
                       .build();
           };
}

Using SnowflakeIO with AWS S3

To be able to use AWS S3 bucket as stagingBucketName is required to:

  1. Create PipelineOptions interface which is extending SnowflakePipelineOptions and S3Options with AwsAccessKey and AwsSecretKey options. Example:

public interface AwsPipelineOptions extends SnowflakePipelineOptions, S3Options {

    @Description("AWS Access Key")
    @Default.String("access_key")
    String getAwsAccessKey();

    void setAwsAccessKey(String awsAccessKey);

    @Description("AWS secret key")
    @Default.String("secret_key")
    String getAwsSecretKey();

    void setAwsSecretKey(String awsSecretKey);
}
2. Set AwsCredentialsProvider option by using AwsAccessKey and AwsSecretKey options.

options.setAwsCredentialsProvider(
    new AWSStaticCredentialsProvider(
        new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey())
    )
);
3. Create pipeline

Pipeline p = Pipeline.create(options);

Note: Remember to set awsRegion from S3Options.

Using SnowflakeIO in Python SDK

Intro

Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to cross-language which is part of Portability Framework Roadmap which aims to provide full interoperability across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).

For more information about cross-language please see multi sdk efforts and Cross-language transforms API and expansion service articles.

Additional resources:

Reading from Snowflake

One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a PCollection of user-defined data type.

General usage

OPTIONS = ["--runner=FlinkRunner"]

with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
   (p
       | ReadFromSnowflake(...)
       | <FURTHER TRANSFORMS>)

Required parameters

Authentication parameters

It’s required to pass one of the following combinations of valid parameters for authentication:

Additional parameters

Writing to Snowflake

One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user’s PCollection to your Snowflake database.

General usage

OPTIONS = ["--runner=FlinkRunner"]

with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
   (p
       | <SOURCE OF DATA>
       | WriteToSnowflake(
           server_name=<SNOWFLAKE SERVER NAME>,
           username=<SNOWFLAKE USERNAME>,
           password=<SNOWFLAKE PASSWORD>,
           o_auth_token=<OAUTH TOKEN>,
           private_key_path=<PATH TO P8 FILE>,
           raw_private_key=<PRIVATE_KEY>
           private_key_passphrase=<PASSWORD FOR KEY>,
           schema=<SNOWFLAKE SCHEMA>,
           database=<SNOWFLAKE DATABASE>,
           staging_bucket_name=<GCS OR S3 BUCKET>,
           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
           create_disposition=<CREATE DISPOSITION>,
           write_disposition=<WRITE DISPOSITION>,
           table_schema=<SNOWFLAKE TABLE SCHEMA>,
           user_data_mapper=<USER DATA MAPPER FUNCTION>,
           table=<SNOWFLAKE TABLE>,
           query=<IF NOT TABLE THEN QUERY>,
           role=<SNOWFLAKE ROLE>,
           warehouse=<SNOWFLAKE WAREHOUSE>,
           expansion_service=<EXPANSION SERVICE ADDRESS>))

Required parameters

Authentication parameters

It’s required to pass one of the following combination of valid parameters for authentication:

Additional parameters

Limitations

SnowflakeIO currently has the following limitations.

  1. Streaming writing supports only pair key authentication. For details, see: Issue 21287.

  2. The role parameter configured in SnowflakeIO.DataSourceConfiguration object is ignored for streaming writing. For details, see: Issue 21365