Built-in I/O Transforms

Snowflake I/O

Pipeline options and general information about using and running Snowflake IO.

Authentication

All authentication methods available for the Snowflake JDBC Driver can be used with the IO transforms: username/password, key pair, and OAuth token.

Credentials are passed via Pipeline options, which are used to instantiate SnowflakeIO.DataSourceConfiguration:

SnowflakePipelineOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(SnowflakePipelineOptions.class);
SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(options);

SnowflakeIO.DataSourceConfiguration.create(credentials)
        // ... other DataSourceConfiguration options, for example .withServerName(...) or .withSchema(...)

Username and password

To use username/password authentication in SnowflakeIO, invoke your pipeline with the following Pipeline options:

--username=<USERNAME> --password=<PASSWORD>

Key pair

To use this authentication method, you must first generate a key pair and associate the public key with the Snowflake user that will connect using the IO transform. For instructions, see the Snowflake documentation.

To use key pair authentication with SnowflakeIO, invoke your pipeline with one of the following set of Pipeline options:

--username=<USERNAME> --privateKeyPath=<PATH_TO_P8_FILE> --privateKeyPassphrase=<PASSWORD_FOR_KEY>
--username=<USERNAME> --rawPrivateKey=<PRIVATE_KEY> --privateKeyPassphrase=<PASSWORD_FOR_KEY>

OAuth token

SnowflakeIO also supports OAuth token authentication.

IMPORTANT: SnowflakeIO requires a valid OAuth access token. It will neither be able to refresh the token nor obtain it using a web-based flow. For information on configuring an OAuth integration and obtaining the token, see the Snowflake documentation.

Once you have the token, invoke your pipeline with following Pipeline Options:

--oauthToken=<TOKEN>

DataSource Configuration

A DataSource configuration is required in both the read and write objects to configure the Snowflake connection properties used by the IO transforms.

General usage

Create the DataSource configuration:

 SnowflakeIO.DataSourceConfiguration
            .create()
            .withUrl(options.getUrl())
            .withServerName(options.getServerName())
            .withDatabase(options.getDatabase())
            .withWarehouse(options.getWarehouse())
            .withSchema(options.getSchema());
The with* methods correspond to the Snowflake connection properties shown above.

Note: either .withUrl(...) or .withServerName(...) is required.
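
For example, the credentials created from Pipeline options in the Authentication section can be combined with the connection properties above. This is a minimal sketch; the dc variable name is illustrative:

// Reuse the Snowflake Pipeline options parsed earlier.
SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(options);

SnowflakeIO.DataSourceConfiguration dc =
    SnowflakeIO.DataSourceConfiguration.create(credentials)
        .withServerName(options.getServerName())
        .withDatabase(options.getDatabase())
        .withWarehouse(options.getWarehouse())
        .withSchema(options.getSchema());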

Pipeline options

Use Beam’s Pipeline options to set options via the command line.

Snowflake Pipeline options

The Snowflake IO library supports the following options, which can be passed via the command line when a pipeline uses them:

--url Snowflake’s JDBC-like url including account name and region without any parameters.

--serverName Full server name with account, zone and domain.

--username Required for username/password and Private Key authentication.

--oauthToken Required for OAuth authentication only.

--password Required for username/password authentication only.

--privateKeyPath Path to Private Key file. Required for Private Key authentication only.

--rawPrivateKey Private Key. Required for Private Key authentication only.

--privateKeyPassphrase Private Key’s passphrase. Required for Private Key authentication only.

--stagingBucketName External bucket path ending with /, e.g. gs://bucket/. Sub-directories are allowed.

--storageIntegrationName Storage integration name.

--warehouse Warehouse to use. Optional.

--database Database name to connect to. Optional.

--schema Schema to use. Optional.

--table Table to use. Optional.

--query Query to use. Optional.

--role Role to use. Optional.

--authenticator Authenticator to use. Optional.

--portNumber Port number. Optional.

--loginTimeout Login timeout. Optional.

--snowPipe SnowPipe name. Optional.

Running main command with Pipeline options

To pass Pipeline options via the command line, use --args in a gradle command as follows:

./gradlew run
    --args="
        --serverName=<SNOWFLAKE SERVER NAME>
           Example: --serverName=account.region.gcp.snowflakecomputing.com
        --username=<SNOWFLAKE USERNAME>
           Example: --username=testuser
        --password=<SNOWFLAKE PASSWORD>
           Example: --password=mypassword
        --database=<SNOWFLAKE DATABASE>
           Example: --database=TEST_DATABASE
        --schema=<SNOWFLAKE SCHEMA>
           Example: --schema=public
        --table=<SNOWFLAKE TABLE IN DATABASE>
           Example: --table=TEST_TABLE
        --query=<IF NOT TABLE THEN QUERY>
           Example: --query='SELECT column FROM TABLE'
        --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>
           Example: --storageIntegrationName=my_integration
        --stagingBucketName=<GCS BUCKET NAME>
           Example: --stagingBucketName=gs://my_gcp_bucket/
        --runner=<DirectRunner/DataflowRunner>
           Example: --runner=DataflowRunner
        --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME>
           Example: --project=my_project
        --tempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING
                        WITH gs://…>
           Example: --tempLocation=gs://my_bucket/temp/
        --region=<FOR DATAFLOW RUNNER: GCP REGION>
           Example: --region=us-east1
        --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>
           Example: --appName=my_job"
Then, in the code, the parameters can be accessed through the options object, for example with options.getStagingBucketName().
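
For example, a minimal sketch of parsing the options and reading values in main() (the getter names follow the option names listed above):

public static void main(String[] args) {
    SnowflakePipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SnowflakePipelineOptions.class);

    // Values passed on the command line, e.g. --stagingBucketName=gs://my_gcp_bucket/,
    // are available through the corresponding getters.
    String stagingBucketName = options.getStagingBucketName();
    String serverName = options.getServerName();
}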

Running test command with Pipeline options

To pass Pipeline options via the command line, use -DintegrationTestPipelineOptions in a gradle command as follows:

./gradlew test --tests nameOfTest
-DintegrationTestPipelineOptions='[
  "--serverName=<SNOWFLAKE SERVER NAME>",
      Example: --serverName=account.region.gcp.snowflakecomputing.com
  "--username=<SNOWFLAKE USERNAME>",
      Example: --username=testuser
  "--password=<SNOWFLAKE PASSWORD>",
      Example: --password=mypassword
  "--schema=<SNOWFLAKE SCHEMA>",
      Example: --schema=PUBLIC
  "--table=<SNOWFLAKE TABLE IN DATABASE>",
      Example: --table=TEST_TABLE
  "--database=<SNOWFLAKE DATABASE>",
      Example: --database=TEST_DATABASE
  "--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>",
      Example: --storageIntegrationName=my_integration
  "--stagingBucketName=<GCS BUCKET NAME>",
      Example: --stagingBucketName=gs://my_gcp_bucket
  "--externalLocation=<GCS BUCKET URL STARTING WITH GS://>",
      Example: --externalLocation=gs://my_bucket/temp/
]' --no-build-cache

All parameters start with "--", are surrounded by double quotation marks, and are separated by commas.

Running pipelines on Dataflow

By default, pipelines are run on the Direct Runner on your local machine. To run a pipeline on Google Dataflow, you must provide the following Pipeline options: --runner=DataflowRunner, --project, --region, and --tempLocation (see the gradle example above).

More pipeline options for Dataflow can be found here.

Note: To properly authenticate with Google Cloud, please use gcloud or follow the Google Cloud documentation.

Important: Please acknowledge Google Dataflow pricing

Running pipeline templates on Dataflow

Google Dataflow supports template creation, which means staging pipelines on Cloud Storage and running them with the ability to pass runtime parameters that are only available during pipeline execution.

The process of creating your own Dataflow template is as follows:

  1. Create your own pipeline.
  2. Create a Dataflow template, checking which options SnowflakeIO supports at runtime.
  3. Run a Dataflow template using Cloud Console, REST API or gcloud.

Currently, SnowflakeIO supports the following options at runtime:

Note: table and query are not in the pipeline options by default; they may be added by extending PipelineOptions, as in the sketch below.
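
A minimal sketch of such an extension (the interface and property names are illustrative; @Description and ValueProvider come from org.apache.beam.sdk.options, and ValueProvider-typed getters allow values to be supplied at template execution time):

public interface CustomPipelineOptions extends PipelineOptions {

  @Description("Snowflake table to use, resolved at runtime.")
  ValueProvider<String> getTable();
  void setTable(ValueProvider<String> table);

  @Description("Query to use instead of a whole table, resolved at runtime.")
  ValueProvider<String> getQuery();
  void setQuery(ValueProvider<String> query);
}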

Currently, SnowflakeIO doesn’t support the following options at runtime:

Writing to Snowflake tables

One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user’s PCollection to your Snowflake database.

Batch write (from a bounded source)

The basic .write() operation usage is as follows:

data.apply(
   SnowflakeIO.<type>write()
       .withDataSourceConfiguration(dc)
       .toTable("MY_TABLE")
       .withStagingBucketName("BUCKET NAME")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
)
Replace type with the data type of the PCollection object to write; for example, SnowflakeIO.<String>write() for an input PCollection of Strings.

All of the parameters below are required:

  • .withDataSourceConfiguration() Accepts a DataSourceConfiguration object.

  • .toTable() Accepts the target Snowflake table name.

  • .withStagingBucketName() Accepts a cloud bucket path ending with a slash. Example: .withStagingBucketName("gs://mybucket/my/dir/")

  • .withStorageIntegrationName() Accepts a name of a Snowflake storage integration object created according to the Snowflake documentation. Example:

    CREATE OR REPLACE STORAGE INTEGRATION test_integration
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = GCS
    ENABLED = TRUE
    STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
    Then:
    .withStorageIntegrationName("test_integration")

  • .withUserDataMapper() Accepts the UserDataMapper function that will map a user’s PCollection to an array of String values (String[]).

Note: SnowflakeIO uses COPY statements behind the scenes to write (using COPY to table). StagingBucketName will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “stagingBucketName” path.

Optional for batching:

  • .withQuotationMark()
    • Default value: ' (single quotation mark).
    • Accepts a String with a single character. It will surround all text (String) fields saved to CSV. It should be one of the characters accepted by Snowflake’s FIELD_OPTIONALLY_ENCLOSED_BY parameter (double quotation mark, single quotation mark or none).
    • Example: .withQuotationMark("'")

Streaming write (from unbounded source)

It is required to create a SnowPipe in the Snowflake console. The SnowPipe should use the same integration and the same bucket as specified by the .withStagingBucketName and .withStorageIntegrationName methods. The write operation might look as follows:

data.apply(
   SnowflakeIO.<type>write()
      .withStagingBucketName("BUCKET NAME")
      .withStorageIntegrationName("STORAGE INTEGRATION NAME")
      .withDataSourceConfiguration(dc)
      .withUserDataMapper(mapper)
      .withSnowPipe("MY_SNOW_PIPE")
      .withFlushTimeLimit(Duration.millis(time))
      .withFlushRowLimit(rowsNumber)
      .withShardsNumber(shardsNumber)
)

Parameters

Required for streaming:

  • .withDataSourceConfiguration()

    • Accepts a DataSourceConfiguration object.
  • .toTable()

    • Accepts the target Snowflake table name.
    • Example: .toTable("MY_TABLE")
  • .withStagingBucketName()

    • Accepts a cloud bucket path ended with slash.
    • Example: .withStagingBucketName("gs://mybucket/my/dir/")
  • .withStorageIntegrationName()

    • Accepts a name of a Snowflake storage integration object created according to the Snowflake documentation.
    • Example:
      CREATE OR REPLACE STORAGE INTEGRATION test_integration
      TYPE = EXTERNAL_STAGE
      STORAGE_PROVIDER = GCS
      ENABLED = TRUE
      STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
      Then:
      .withStorageIntegrationName("test_integration")
  • .withSnowPipe()

    • Accepts the exact name of the target SnowPipe. Example:

      CREATE OR REPLACE PIPE test_database.public.test_gcs_pipe
      AS COPY INTO stream_table from @streamstage;

    • Then:

      .withSnowPipe("test_gcs_pipe")

Note: it is important to provide the schema and database names.

  • .withUserDataMapper()
    • Accepts the UserDataMapper function that will map a user’s PCollection to an array of String values (String[]).

Note:

SnowflakeIO uses COPY statements behind the scenes to write (using COPY to table). StagingBucketName will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “stagingBucketName” path.

Optional for streaming:

  • .withFlushTimeLimit()

    • Default value: 30 seconds
    • Accepts a Duration object; after each such interval the streaming write will be repeated
    • Example: .withFlushTimeLimit(Duration.millis(180000))
  • .withFlushRowLimit()

    • Default value: 10,000 rows
    • Limit of rows written to each staged file
    • Example: .withFlushRowLimit(500000)
  • .withShardsNumber()

    • Default value: 1 shard
    • Number of files that will be saved in every flush (for the purpose of parallel writes).
    • Example: .withShardsNumber(5)
  • .withQuotationMark()

    • Default value: ' (single quotation mark).
    • Accepts a String with a single character. It will surround all text (String) fields saved to CSV. It should be one of the characters accepted by Snowflake’s FIELD_OPTIONALLY_ENCLOSED_BY parameter (double quotation mark, single quotation mark or none). Example: .withQuotationMark("") (no quotation marks)
  • .withDebugMode()

    • Accepts:
      • SnowflakeIO.StreamingLogLevel.INFO - shows all information about loaded files
      • SnowflakeIO.StreamingLogLevel.ERROR - shows only errors.
    • Shows logs about files streamed to Snowflake, similar to insertReport. Enabling debug mode may affect performance.
    • Example: .withDebugMode(SnowflakeIO.StreamingLogLevel.INFO)

Important notice: Streaming accepts only key pair authentication.

Flush time: duration & number of rows

Duration: the streaming write will periodically write files to the stage according to the time duration specified in the flush time limit (for example, every 1 minute).

Number of rows: files staged for write will contain the number of rows specified in the flush row limit, unless the flush time limit is reached first (for example, if the limit is 1,000 rows but the buffer has collected only 99 rows when the 1-minute flush time passes, those 99 rows will be sent to SnowPipe for insertion).

The size of the staged files will depend on the row size and the compression used (GZIP).

UserDataMapper function

The UserDataMapper function is required to map data from a PCollection to an array of String values before the write() operation saves the data to temporary .csv files. For example:

public static SnowflakeIO.UserDataMapper<Long> getCsvMapper() {
    return (SnowflakeIO.UserDataMapper<Long>) recordLine -> new String[] {recordLine.toString()};
}
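
The same pattern applies to custom objects. A sketch, assuming a hypothetical User POJO with getName() and getAge() accessors:

public static SnowflakeIO.UserDataMapper<User> getUserCsvMapper() {
    // Each field is converted to its String representation for the CSV file.
    return (SnowflakeIO.UserDataMapper<User>)
            user -> new String[] {user.getName(), String.valueOf(user.getAge())};
}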

Additional write options

Transformation query

The .withQueryTransformation() option for the write() operation accepts a SQL query as a String value, which will be performed while transferring data staged in CSV files directly to the target Snowflake table. For information about the transformation SQL syntax, see the Snowflake documentation.

Usage:

String query = "SELECT t.$1 from YOUR_TABLE;";
data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .toTable("MY_TABLE")
       .withStagingBucketName("BUCKET NAME")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withQueryTransformation(query)
)

Write disposition

Define the write behaviour with respect to the target table by specifying the .withWriteDisposition(...) option for the write() operation. The following values are supported:

  • APPEND - Default behaviour. Written data is added to the existing rows in the table.

  • EMPTY - The target table must be empty; otherwise, the write operation fails.

  • TRUNCATE - The write operation deletes all rows from the target table before writing to it.

Example of usage:

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .toTable("MY_TABLE")
       .withStagingBucketName("BUCKET NAME")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withWriteDisposition(TRUNCATE)
)

Create disposition

The .withCreateDisposition() option defines the behaviour of the write operation if the target table does not exist. The following values are supported:

  • CREATE_IF_NEEDED - Default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table. Specify the schema for the target table using the .withTableSchema() option.

  • CREATE_NEVER - The write operation fails if the target table does not exist.

Usage:

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .toTable("MY_TABLE")
       .withStagingBucketName("BUCKET NAME")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withCreateDisposition(CREATE_NEVER)
)

Table schema disposition

When the .withCreateDisposition() option is set to CREATE_IF_NEEDED, the .withTableSchema() option enables specifying the schema for the created target table. A table schema is a list of SFColumn objects, each with a name and a type corresponding to a column in the table.

Usage:

SFTableSchema tableSchema =
    new SFTableSchema(
        SFColumn.of("my_date", new SFDate(), true),
        new SFColumn("id", new SFNumber()),
        SFColumn.of("name", new SFText(), true));

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .toTable("MY_TABLE")
       .withStagingBucketName("BUCKET NAME")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withTableSchema(tableSchema)
)

Reading from Snowflake

One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a PCollection of user-defined data type.

General usage

The basic .read() operation usage:

PCollection<USER_DATA_TYPE> items = pipeline.apply(
   SnowflakeIO.<USER_DATA_TYPE>read()
       .withDataSourceConfiguration(dc)
       .fromTable("MY_TABLE") // or .fromQuery("QUERY")
       .withStagingBucketName("BUCKET NAME")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withCsvMapper(mapper)
       .withCoder(coder));
Where all of the parameters below are required:

  • .withDataSourceConfiguration(...)

    • Accepts a DataSourceConfiguration object.
  • .fromTable(...) or .fromQuery(...)

    • Specifies a Snowflake table name or custom SQL query.
  • .withStagingBucketName()

    • Accepts a cloud bucket name.
  • .withStorageIntegrationName()

    • Accepts a name of a Snowflake storage integration object created according to the Snowflake documentation. Example:

    CREATE OR REPLACE STORAGE INTEGRATION test_integration
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = GCS
    ENABLED = TRUE
    STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
    Then:
    .withStorageIntegrationName("test_integration")

  • .withCsvMapper(mapper)

    • Accepts a CSVMapper instance for mapping String[] to USER_DATA_TYPE.
  • .withCoder(coder)

    • Accepts the Coder for USER_DATA_TYPE.

Note: SnowflakeIO uses COPY statements behind the scenes to read (using COPY to location) files staged in cloud storage. StagingBucketName will be used as a temporary location for storing CSV files. Those temporary directories will be named sf_copy_csv_DATE_TIME_RANDOMSUFFIX and they will be removed automatically once the Read operation finishes.

CSVMapper

SnowflakeIO uses a COPY INTO statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via FileIO and processed line by line. Each line is split into an array of Strings using the OpenCSV library.

The CSVMapper’s job is to give the user the possibility to convert the array of Strings to a user-defined type, e.g. GenericRecord for Avro or Parquet files, or a custom POJO.

Example implementation of CsvMapper for GenericRecord:

static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
   return (SnowflakeIO.CsvMapper<GenericRecord>)
           parts -> {
               return new GenericRecordBuilder(PARQUET_SCHEMA)
                       .set("ID", Long.valueOf(parts[0]))
                       .set("NAME", parts[1])
                       [...]
                       .build();
           };
}
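
Such a mapper is used together with a matching coder in the read transform. A sketch, assuming PARQUET_SCHEMA is the Avro schema referenced above and dc is an existing DataSourceConfiguration:

PCollection<GenericRecord> items =
    pipeline.apply(
        SnowflakeIO.<GenericRecord>read()
            .withDataSourceConfiguration(dc)
            .fromTable("MY_TABLE")
            .withStagingBucketName("gs://mybucket/my/dir/")
            .withStorageIntegrationName("test_integration")
            .withCsvMapper(getCsvMapper())
            // AvroCoder built from the same schema used by the mapper.
            .withCoder(AvroCoder.of(PARQUET_SCHEMA)));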

Using SnowflakeIO in Python SDK

Intro

The Snowflake cross-language implementation supports both reading and writing operations for the Python programming language, thanks to cross-language transforms, which are part of the Portability Framework Roadmap that aims to provide full interoperability across the Beam ecosystem. From a developer’s perspective, this means the ability to combine transforms written in different languages (Java/Python/Go).

For more information about cross-language transforms, please see the multi-SDK efforts and the Cross-language transforms API and expansion service articles.

Reading from Snowflake

One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a PCollection of user-defined data type.

General usage

OPTIONS = ["--runner=FlinkRunner"]

with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
   (p
       | ReadFromSnowflake(...)
       | <FURTHER TRANSFORMS>)

Required parameters

  • server_name Full Snowflake server name with an account, zone, and domain.

  • schema Name of the Snowflake schema in the database to use.

  • database Name of the Snowflake database to use.

  • staging_bucket_name Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named sf_copy_csv_DATE_TIME_RANDOMSUFFIX and they will be removed automatically once Read operation finishes.

  • storage_integration_name Is the name of a Snowflake storage integration object created according to Snowflake documentation.

  • csv_mapper Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a COPY INTO statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via FileIO and processed line by line. Each line is split into an array of Strings using the OpenCSV library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects. Example:

    def csv_mapper(strings_array):
        return User(strings_array[0], int(strings_array[1]))

  • table or query Specifies a Snowflake table name or custom SQL query.

Authentication parameters

It’s required to pass one of the following combinations of valid parameters for authentication:

  • username and password Specifies username and password for username/password authentication method.

  • private_key_path and private_key_passphrase Specifies a path to private key and passphrase for key/pair authentication method.

  • raw_private_key and private_key_passphrase Specifies a private key and passphrase for key/pair authentication method.

  • o_auth_token Specifies access token for OAuth authentication method.

Additional parameters

  • role Specifies a Snowflake role. If not specified, the user’s default will be used.

  • warehouse Specifies a Snowflake warehouse name. If not specified, the user’s default will be used.

  • expansion_service Specifies the URL of the expansion service.

Writing to Snowflake

One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user’s PCollection to your Snowflake database.

General usage

OPTIONS = ["--runner=FlinkRunner"]

with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
   (p
       | <SOURCE OF DATA>
       | WriteToSnowflake(
           server_name=<SNOWFLAKE SERVER NAME>,
           username=<SNOWFLAKE USERNAME>,
           password=<SNOWFLAKE PASSWORD>,
           o_auth_token=<OAUTH TOKEN>,
           private_key_path=<PATH TO P8 FILE>,
            raw_private_key=<PRIVATE_KEY>,
           private_key_passphrase=<PASSWORD FOR KEY>,
           schema=<SNOWFLAKE SCHEMA>,
           database=<SNOWFLAKE DATABASE>,
           staging_bucket_name=<GCS BUCKET NAME>,
           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
           create_disposition=<CREATE DISPOSITION>,
           write_disposition=<WRITE DISPOSITION>,
           table_schema=<SNOWFLAKE TABLE SCHEMA>,
           user_data_mapper=<USER DATA MAPPER FUNCTION>,
           table=<SNOWFLAKE TABLE>,
           query=<IF NOT TABLE THEN QUERY>,
           role=<SNOWFLAKE ROLE>,
           warehouse=<SNOWFLAKE WAREHOUSE>,
           expansion_service=<EXPANSION SERVICE ADDRESS>))

Required parameters

  • server_name Full Snowflake server name with account, zone and domain.

  • schema Name of the Snowflake schema in the database to use.

  • database Name of the Snowflake database to use.

  • staging_bucket_name Path to the Google Cloud Storage bucket, ending with a slash. The bucket will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “staging_bucket_name” path.

  • storage_integration_name Is the name of a Snowflake storage integration object created according to Snowflake documentation.

  • user_data_mapper Specifies a function which maps data from a PCollection to an array of String values before the write operation saves the data to temporary .csv files. Example:

    def user_data_mapper(user):
        return [user.name, str(user.age)]

  • table or query Specifies a Snowflake table name or custom SQL query.

Authentication parameters

It’s required to pass one of the following combinations of valid parameters for authentication:

  • username and password Specifies username/password authentication method.

  • private_key_path and private_key_passphrase Specifies a path to private key and passphrase for key/pair authentication method.

  • raw_private_key and private_key_passphrase Specifies a private key and passphrase for key/pair authentication method.

  • o_auth_token Specifies access token for OAuth authentication method.

Additional parameters

  • role Specifies a Snowflake role. If not specified, the user’s default will be used.

  • warehouse Specifies a Snowflake warehouse name. If not specified, the user’s default will be used.

  • create_disposition Defines the behaviour of the write operation if the target table does not exist. The following values are supported:

    • CREATE_IF_NEEDED - Default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table. Specify the schema for the target table using the table_schema parameter.
    • CREATE_NEVER - The write operation fails if the target table does not exist.
  • write_disposition Defines the write behaviour with respect to the target table. The following values are supported:

    • APPEND - Default behaviour. Written data is added to the existing rows in the table.
    • EMPTY - The target table must be empty; otherwise, the write operation fails.
    • TRUNCATE - The write operation deletes all rows from the target table before writing to it.
  • table_schema When the create_disposition parameter is set to CREATE_IF_NEEDED, the table_schema parameter enables specifying the schema for the created target table. A table schema is a JSON array with the following structure:

    {"schema": [
        {
          "dataType":{"type":"<COLUMN DATA TYPE>"},
          "name":"<COLUMN  NAME> ",
          "nullable": <NULLABLE>
        },
            ...
      ]}
    All supported data types:
    {"type":"date"},
    {"type":"datetime"},
    {"type":"time"},
    {"type":"timestamp"},
    {"type":"timestamp_ltz"},
    {"type":"timestamp_ntz"},
    {"type":"timestamp_tz"},
    {"type":"boolean"},
    {"type":"decimal","precision":38,"scale":1},
    {"type":"double"},
    {"type":"float"},
    {"type":"integer","precision":38,"scale":0},
    {"type":"number","precision":38,"scale":1},
    {"type":"numeric","precision":38,"scale":2},
    {"type":"real"},
    {"type":"array"},
    {"type":"object"},
    {"type":"variant"},
    {"type":"binary","size":null},
    {"type":"char","length":1},
    {"type":"string","length":null},
    {"type":"text","length":null},
    {"type":"varbinary","size":null},
    {"type":"varchar","length":100}]
    You can read about Snowflake data types at Snowflake data types.

  • expansion_service Specifies the URL of the expansion service.