Package org.apache.beam.sdk.io.snowflake
Class SnowflakeIO.Write<T>
java.lang.Object
org.apache.beam.sdk.transforms.PTransform<PCollection<T>,PDone>
org.apache.beam.sdk.io.snowflake.SnowflakeIO.Write<T>
- All Implemented Interfaces:
Serializable, HasDisplayData
- Enclosing class:
SnowflakeIO
Implementation of SnowflakeIO.write().
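A typical batch write might be assembled as in the following sketch (table, bucket, and integration names are placeholders; dataSourceConfiguration and getCsvMapper() are hypothetical helpers like the ones shown under withDataSourceConfiguration and withUserDataMapper below):

 PCollection<KV<Integer, String>> data = ...;
 data.apply(
     SnowflakeIO.<KV<Integer, String>>write()
         .withDataSourceConfiguration(dataSourceConfiguration)
         .to("MY_TABLE")
         .withStagingBucketName("BUCKET")
         .withStorageIntegrationName("STORAGE_INTEGRATION")
         .withUserDataMapper(getCsvMapper()));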
Field Summary
Fields inherited from class org.apache.beam.sdk.transforms.PTransform
annotations, displayData, name, resourceHints
-
Constructor Summary
Constructors
Write()
Method Summary
PDone expand(PCollection<T> input)
Override this method to specify how this PTransform should be expanded on the given InputT.
protected ParDo.SingleOutput<List<String>,Void> streamToTable(SnowflakeServices snowflakeServices, ValueProvider<String> stagingBucketDir)
SnowflakeIO.Write<T> to(String table)
A table name to be written in Snowflake.
SnowflakeIO.Write<T> to(ValueProvider<String> table)
SnowflakeIO.Write<T> withCreateDisposition(CreateDisposition createDisposition)
A disposition to be used during table preparation.
SnowflakeIO.Write<T> withDataSourceConfiguration(SnowflakeIO.DataSourceConfiguration config)
Setting information about the Snowflake server.
SnowflakeIO.Write<T> withDataSourceProviderFn(SerializableFunction<Void, DataSource> dataSourceProviderFn)
Setting a function that will provide SnowflakeIO.DataSourceConfiguration at runtime.
SnowflakeIO.Write<T> withDebugMode(StreamingLogLevel debugLevel)
The option to log verbose info (or only errors) about loaded files while streaming.
SnowflakeIO.Write<T> withFileNameTemplate(String fileNameTemplate)
A template name for files saved to GCP.
SnowflakeIO.Write<T> withFlushRowLimit(Integer rowsCount)
Sets the limit on the number of rows saved to a staged file before it is loaded into Snowflake.
SnowflakeIO.Write<T> withFlushTimeLimit(Duration triggeringFrequency)
Sets how often staged files are created and then ingested by Snowflake during streaming.
SnowflakeIO.Write<T> withQueryTransformation(String query)
A query to be executed in Snowflake.
SnowflakeIO.Write<T> withQueryTransformation(ValueProvider<String> query)
SnowflakeIO.Write<T> withQuotationMark(String quotationMark)
Sets the Snowflake-specific quotation mark placed around strings.
SnowflakeIO.Write<T> withShardsNumber(Integer shardsNumber)
Number of shards that are created per window.
SnowflakeIO.Write<T> withSnowflakeServices(SnowflakeServices snowflakeServices)
A SnowflakeServices implementation to be used.
SnowflakeIO.Write<T> withSnowPipe(String snowPipe)
Sets the name of a SnowPipe, which can be created in the Snowflake dashboard or CLI.
SnowflakeIO.Write<T> withSnowPipe(ValueProvider<String> snowPipe)
Same as withSnowPipe(String), but with a ValueProvider.
SnowflakeIO.Write<T> withStagingBucketName(String stagingBucketName)
Name of the cloud bucket (GCS for now) to use as a temporary location for CSV files during the COPY statement.
SnowflakeIO.Write<T> withStagingBucketName(ValueProvider<String> stagingBucketName)
SnowflakeIO.Write<T> withStorageIntegrationName(String integrationName)
Name of the Storage Integration in Snowflake to be used.
SnowflakeIO.Write<T> withStorageIntegrationName(ValueProvider<String> integrationName)
SnowflakeIO.Write<T> withTableSchema(SnowflakeTableSchema tableSchema)
Table schema to be used when creating the table.
SnowflakeIO.Write<T> withUserDataMapper(SnowflakeIO.UserDataMapper<T> userDataMapper)
User-defined function mapping user data into CSV lines.
SnowflakeIO.Write<T> withWriteDisposition(WriteDisposition writeDisposition)
A disposition to be used during the write-to-table phase.
Methods inherited from class org.apache.beam.sdk.transforms.PTransform
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setDisplayData, setResourceHints, toString, validate, validate
-
Constructor Details
-
Write
public Write()
-
-
Method Details
-
withDataSourceConfiguration
public SnowflakeIO.Write<T> withDataSourceConfiguration(SnowflakeIO.DataSourceConfiguration config)
Setting information about the Snowflake server.
- Parameters:
config - An instance of SnowflakeIO.DataSourceConfiguration.
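For illustration, a configuration might be built as in this sketch (server name, credentials, database, and schema are placeholder values):

 SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
     SnowflakeIO.DataSourceConfiguration.create()
         .withUsernamePasswordAuth("USERNAME", "PASSWORD") // placeholder credentials
         .withServerName("account.snowflakecomputing.com") // placeholder server
         .withDatabase("DATABASE")
         .withSchema("SCHEMA");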
-
withDataSourceProviderFn
public SnowflakeIO.Write<T> withDataSourceProviderFn(SerializableFunction<Void, DataSource> dataSourceProviderFn)
Setting a function that will provide SnowflakeIO.DataSourceConfiguration at runtime.
- Parameters:
dataSourceProviderFn - a SerializableFunction.
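As a sketch, a provider function can be obtained from an existing configuration via the DataSourceProviderFromDataSourceConfiguration helper (shown here with the placeholder configuration from above):

 SerializableFunction<Void, DataSource> dataSourceProviderFn =
     SnowflakeIO.DataSourceProviderFromDataSourceConfiguration.of(dataSourceConfiguration);

 SnowflakeIO.<KV<Integer, String>>write()
     .withDataSourceProviderFn(dataSourceProviderFn);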
-
to
public SnowflakeIO.Write<T> to(String table)
A table name to be written in Snowflake.
- Parameters:
table - String with the name of the table.
-
to
public SnowflakeIO.Write<T> to(ValueProvider<String> table)
-
withStagingBucketName
public SnowflakeIO.Write<T> withStagingBucketName(String stagingBucketName)
Name of the cloud bucket (GCS for now) to use as a temporary location for CSV files during the COPY statement.
- Parameters:
stagingBucketName - String with the name of the bucket.
-
withStagingBucketName
public SnowflakeIO.Write<T> withStagingBucketName(ValueProvider<String> stagingBucketName)
-
withStorageIntegrationName
public SnowflakeIO.Write<T> withStorageIntegrationName(String integrationName)
Name of the Storage Integration in Snowflake to be used. See https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html for reference.
- Parameters:
integrationName - String with the name of the Storage Integration.
-
withStorageIntegrationName
public SnowflakeIO.Write<T> withStorageIntegrationName(ValueProvider<String> integrationName)
-
withQueryTransformation
public SnowflakeIO.Write<T> withQueryTransformation(String query)
A query to be executed in Snowflake.
- Parameters:
query - String with the query.
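For instance, a query that keeps only the first column of the staged data could be passed as in this sketch (it is assumed here that the alias t refers to the staged data during the COPY statement):

 String query = "SELECT t.$1 FROM t";
 SnowflakeIO.<KV<Integer, String>>write()
     .withQueryTransformation(query);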
-
withQueryTransformation
public SnowflakeIO.Write<T> withQueryTransformation(ValueProvider<String> query)
-
withFileNameTemplate
public SnowflakeIO.Write<T> withFileNameTemplate(String fileNameTemplate)
A template name for files saved to GCP.
- Parameters:
fileNameTemplate - String with the template name for files.
-
withUserDataMapper
public SnowflakeIO.Write<T> withUserDataMapper(SnowflakeIO.UserDataMapper<T> userDataMapper)
User-defined function mapping user data into CSV lines.
- Parameters:
userDataMapper - an instance of SnowflakeIO.UserDataMapper.
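For example, a mapper for a PCollection<KV<Integer, String>> might look like this sketch (the element type and column layout are assumptions):

 static SnowflakeIO.UserDataMapper<KV<Integer, String>> getCsvMapper() {
   return (SnowflakeIO.UserDataMapper<KV<Integer, String>>)
       element -> new Object[] {element.getKey(), element.getValue()}; // one array entry per CSV field
 }

Each entry of the returned array becomes one field of the resulting CSV line.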
-
withFlushTimeLimit
public SnowflakeIO.Write<T> withFlushTimeLimit(Duration triggeringFrequency)
Sets how often staged files are created and then how often they are ingested by Snowflake during streaming.
- Parameters:
triggeringFrequency - the triggering frequency as a Duration.
-
withSnowPipe
public SnowflakeIO.Write<T> withSnowPipe(String snowPipe)
Sets the name of a SnowPipe, which can be created in the Snowflake dashboard or CLI:
CREATE PIPE snowPipeName AS COPY INTO your_table FROM @yourstage;
The stage in the COPY statement should point to the cloud integration with a valid bucket URL, e.g. for GCS:
CREATE STAGE yourstage URL = 'gcs://yourbucket/path/' STORAGE_INTEGRATION = your_integration;
CREATE STORAGE INTEGRATION your_integration TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = GCS ENABLED = TRUE STORAGE_ALLOWED_LOCATIONS = ('gcs://yourbucket/path/');
- Parameters:
snowPipe - name of the SnowPipe created in the Snowflake dashboard.
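Putting the streaming options together, a SnowPipe-based write might be assembled as in this sketch (all names are placeholders; dataSourceConfiguration and getCsvMapper() as in the examples above; the flush and shard settings also illustrate withFlushTimeLimit, withFlushRowLimit, and withShardsNumber):

 data.apply(
     SnowflakeIO.<KV<Integer, String>>write()
         .withDataSourceConfiguration(dataSourceConfiguration)
         .withStagingBucketName("BUCKET")
         .withStorageIntegrationName("your_integration")
         .withUserDataMapper(getCsvMapper())
         .withSnowPipe("snowPipeName")
         .withFlushTimeLimit(Duration.standardSeconds(30)) // assumed frequency
         .withFlushRowLimit(10000) // default row limit
         .withShardsNumber(1));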
-
withSnowPipe
public SnowflakeIO.Write<T> withSnowPipe(ValueProvider<String> snowPipe)
Same as withSnowPipe(String), but with a ValueProvider.
- Parameters:
snowPipe - name of the SnowPipe created in the Snowflake dashboard.
-
withShardsNumber
public SnowflakeIO.Write<T> withShardsNumber(Integer shardsNumber)
Number of shards that are created per window.
- Parameters:
shardsNumber - the number of shards; 1 by default.
-
withFlushRowLimit
public SnowflakeIO.Write<T> withFlushRowLimit(Integer rowsCount)
Sets the limit on the number of rows saved to a staged file before it is loaded into Snowflake. If the number of rows stays below the limit, the rows collected so far are loaded after the time specified by withFlushTimeLimit(Duration triggeringFrequency).
- Parameters:
rowsCount - number of rows that will be in one file staged for loading. Default: 10000.
-
withWriteDisposition
public SnowflakeIO.Write<T> withWriteDisposition(WriteDisposition writeDisposition)
A disposition to be used during the write-to-table phase.
- Parameters:
writeDisposition - an instance of WriteDisposition.
-
withCreateDisposition
public SnowflakeIO.Write<T> withCreateDisposition(CreateDisposition createDisposition)
A disposition to be used during table preparation.
- Parameters:
createDisposition - an instance of CreateDisposition.
-
withTableSchema
public SnowflakeIO.Write<T> withTableSchema(SnowflakeTableSchema tableSchema)
Table schema to be used when creating the table.
- Parameters:
tableSchema - an instance of SnowflakeTableSchema.
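As a sketch, a schema plus the dispositions from the previous entries might be combined like this (column names and types are assumptions; SnowflakeColumn, SnowflakeNumber, and SnowflakeVarchar come from the org.apache.beam.sdk.io.snowflake.data packages):

 SnowflakeTableSchema tableSchema =
     new SnowflakeTableSchema(
         SnowflakeColumn.of("id", new SnowflakeNumber()),
         SnowflakeColumn.of("name", new SnowflakeVarchar()));

 SnowflakeIO.<KV<Integer, String>>write()
     .to("MY_TABLE")
     .withTableSchema(tableSchema)
     .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
     .withWriteDisposition(WriteDisposition.APPEND);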
-
withSnowflakeServices
public SnowflakeIO.Write<T> withSnowflakeServices(SnowflakeServices snowflakeServices)
A SnowflakeServices implementation to be used.
- Parameters:
snowflakeServices - an instance of SnowflakeServices.
-
withQuotationMark
public SnowflakeIO.Write<T> withQuotationMark(String quotationMark)
Sets the Snowflake-specific quotation mark placed around strings.
- Parameters:
quotationMark - a single quote ('), a double quote ("), or an empty string. Default: single quote (').
-
withDebugMode
public SnowflakeIO.Write<T> withDebugMode(StreamingLogLevel debugLevel)
The option to log verbose info (or only errors) about loaded files while streaming. It is not set by default because it may affect performance. For details, see the Snowflake insertReport REST API.
- Parameters:
debugLevel - error or info debug level from the StreamingLogLevel enum.
-
expand
public PDone expand(PCollection<T> input)
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by:
expand in class PTransform<PCollection<T>, PDone>
-
streamToTable
protected ParDo.SingleOutput<List<String>,Void> streamToTable(SnowflakeServices snowflakeServices, ValueProvider<String> stagingBucketDir)
-