Beam YAML Transform Index
AssignTimestamps
Assigns a new timestamp to each element of its input.
This can be useful when reading records that have the timestamp embedded in them, for example with various file types or other sources that by default set all timestamps to the infinite past.
Note that the timestamp should only be set forward, as setting it backwards may not cause it to hold back an already advanced watermark and the data could become droppably late.
Supported languages: generic, javascript, python
Configuration
- timestamp ? (Optional): A field, callable, or expression giving the new timestamp.
- language string (Optional): The language of the timestamp expression.
- error_handling Row: Whether and how to handle errors during iteration. Row fields:
  - output string
Usage
type: AssignTimestamps
input: ...
config:
timestamp: timestamp
language: "language"
error_handling:
output: "output"
Combine
Groups and combines records sharing common fields.
Built-in combine functions are sum, max, min, all, any, mean, count, group, and concat, but custom aggregation functions can be used as well.
See also the documentation on YAML Aggregation.
Supported languages: calcite, generic, javascript, python, sql
Configuration
- group_by Array[string]
- combine Map[string, Map[string, ?]]
- language string (Optional)
Usage
type: Combine
input: ...
config:
group_by:
- "group_by"
- ...
combine:
a:
a: combine_value_a_value_a
b: combine_value_a_value_b
c: ...
b:
a: combine_value_b_value_a
b: combine_value_b_value_b
c: ...
c: ...
language: "language"
Create
Creates a collection containing a specified set of elements.
This transform always produces schema'd data. For example
type: Create
config:
elements: [1, 2, 3]
will result in an output with three elements with a schema of Row(element=int), whereas YAML/JSON-style mappings will be interpreted directly as Beam rows, e.g.
type: Create
config:
elements:
- {first: 0, second: {str: "foo", values: [1, 2, 3]}}
- {first: 1, second: {str: "bar", values: [4, 5, 6]}}
will result in a schema of the form (int, Row(string, List[int])).
This can also be expressed as YAML
type: Create
config:
elements:
- first: 0
second:
str: "foo"
values: [1, 2, 3]
- first: 1
second:
str: "bar"
values: [4, 5, 6]
Configuration
- elements Array[?]: The set of elements that should belong to the PCollection. YAML/JSON-style mappings will be interpreted as Beam rows. Primitives will be mapped to rows with a single "element" field.
- reshuffle boolean (Optional): Whether to introduce a reshuffle (to possibly redistribute the work) if there is more than one element in the collection. Defaults to True.
Usage
type: Create
config:
elements:
- elements
- ...
reshuffle: true|false
Explode
Explodes (aka unnest/flatten) one or more fields, producing multiple rows.
Given one or more fields of iterable type, produces multiple rows, one for each value of that field. For example, a row of the form ('a', [1, 2, 3]) would expand to ('a', 1), ('a', 2), and ('a', 3) when exploded on the second field.
This is akin to a FlatMap when paired with the MapToFields transform.
See more complete documentation on YAML Mapping Functions.
Configuration
- fields ? (Optional): The list of fields to expand.
- cross_product boolean (Optional): If multiple fields are specified, indicates whether the full cross-product of combinations should be produced, or if the first element of the first field corresponds to the first element of the second field, etc. For example, the row (['a', 'b'], [1, 2]) would expand to the four rows ('a', 1), ('a', 2), ('b', 1), and ('b', 2) when cross_product is set to true, but only the two rows ('a', 1) and ('b', 2) when it is set to false. Only meaningful (and required) if multiple fields are specified.
- error_handling Map[string, ?] (Optional): Whether and how to handle errors during iteration.
Usage
type: Explode
input: ...
config:
fields: fields
cross_product: true|false
error_handling:
a: error_handling_value_a
b: error_handling_value_b
c: ...
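For example, assuming each input row carries an iterable tags field (a hypothetical name), a minimal sketch producing one output row per tag:
type: Explode
input: ...
config:
  fields: [tags]  # hypothetical iterable field to unnest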
Filter
Keeps only records that satisfy the given criteria.
See more complete documentation on YAML Filtering.
Supported languages: calcite, java, javascript, python, sql
Configuration
- language string (Optional)
- keep Row. Row fields:
  - callable string (Optional)
  - expression string (Optional)
  - name string (Optional)
  - path string (Optional)
- error_handling Row (Optional). Row fields:
  - output string
Usage
type: Filter
input: ...
config:
language: "language"
keep:
callable: "callable"
expression: "expression"
name: "name"
path: "path"
error_handling:
output: "output"
Flatten
Flattens multiple PCollections into a single PCollection.
The elements of the resulting PCollection will be the (disjoint) union of all the elements of all the inputs.
Note that in YAML, transforms can always take a list of inputs, which will be implicitly flattened.
Configuration
No configuration parameters.
Usage
type: Flatten
input: ...
LogForTesting
Logs each element of its input PCollection.
The output of this transform is a copy of its input for ease of use in chain-style pipelines.
Configuration
- level string (Optional): one of ERROR, INFO, or DEBUG, mapped to a corresponding language-specific logging level
- prefix string (Optional): an optional identifier that will get prepended to the element being logged
Usage
type: LogForTesting
input: ...
config:
level: "level"
prefix: "prefix"
MapToFields
Creates records with new fields defined in terms of the input fields.
See more complete documentation on YAML Mapping Functions.
Supported languages: calcite, generic, java, javascript, python, sql
Configuration
- language string (Optional)
- append boolean (Optional)
- drop Array[string] (Optional)
- fields Map[string, Row]
- error_handling Row (Optional). Row fields:
  - output string
Usage
type: MapToFields
input: ...
config:
language: "language"
append: true|false
drop:
- "drop"
- ...
fields:
a:
callable: "callable"
expression: "expression"
name: "name"
path: "path"
b:
callable: "callable"
expression: "expression"
name: "name"
path: "path"
c: ...
error_handling:
output: "output"
PyTransform
A Python PTransform identified by fully qualified name.
This allows one to import, construct, and apply any Beam Python transform. This can be useful for using transforms that have not yet been exposed via a YAML interface. Note, however, that conversion may be required if this transform does not accept or produce Beam Rows.
For example
type: PyTransform
config:
constructor: apache_beam.pkg.mod.SomeClass
args: [1, 'foo']
kwargs:
baz: 3
can be used to access the transform apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3).
See also the documentation on Inlining Python.
Configuration
- constructor string: Fully qualified name of a callable used to construct the transform. Often this is a class such as apache_beam.pkg.mod.SomeClass, but it can also be a function or any other callable that returns a PTransform.
- args Array[?] (Optional): A list of parameters to pass to the callable as positional arguments.
- kwargs Map[string, ?] (Optional): A map of parameters to pass to the callable as keyword arguments.
Usage
type: PyTransform
input: ...
config:
constructor: "constructor"
args:
- args
- ...
kwargs:
a: kwargs_value_a
b: kwargs_value_b
c: ...
Sql
Configuration
Usage
type: Sql
input: ...
config: ...
WindowInto
A window transform assigning windows to each element of a PCollection.
The assigned windows will affect all downstream aggregating operations, which will aggregate by window as well as by key.
See the Beam documentation on windowing for more details.
Sizes, offsets, periods and gaps (where applicable) must be defined using a time unit suffix 'ms', 's', 'm', 'h' or 'd' for milliseconds, seconds, minutes, hours or days, respectively. If a time unit is not specified, it will default to 's'.
For example
windowing:
type: fixed
size: 30s
Note that any YAML transform can have a windowing parameter, which is applied to its inputs (if any) or outputs (if there are no inputs), which means that explicit WindowInto operations are not typically needed.
Configuration
- windowing ? (Optional): the type and parameters of the windowing to perform
Usage
type: WindowInto
input: ...
config:
windowing: windowing
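For example, a minimal sketch assigning each element to a 60-second fixed window, using the size syntax described above:
type: WindowInto
input: ...
config:
  windowing:
    type: fixed
    size: 60s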
ReadFromAvro
A PTransform for reading records from avro files.
Each record of the resulting PCollection will contain a single record read from a source. Records that are of simple types will be mapped to Beam rows with a single record field containing the record's value. Records that are of Avro type RECORD will be mapped to Beam rows that comply with the schema contained in the Avro file that contains those records.
Configuration
- path ? (Optional)
Usage
type: ReadFromAvro
config:
path: path
WriteToAvro
A PTransform for writing avro files.
If the input has a schema, a corresponding avro schema will be automatically generated and used to write the output records.
Configuration
- path ? (Optional)
Usage
type: WriteToAvro
input: ...
config:
path: path
ReadFromBigQuery
Reads data from BigQuery.
Exactly one of table or query must be set. If query is set, neither row_restriction nor fields should be set.
Configuration
- query string (Optional)
- table string (Optional)
- fields Array[string] (Optional)
- row_restriction string (Optional)
Usage
type: ReadFromBigQuery
config:
query: "query"
table: "table"
fields:
- "fields"
- ...
row_restriction: "row_restriction"
WriteToBigQuery
Writes data to BigQuery using the Storage Write API (https://cloud.google.com/bigquery/docs/write-api).
This expects a single PCollection of Beam Rows and outputs two dead-letter queues (DLQ) that contain failed rows. The first DLQ has tag [FailedRows] and contains the failed rows. The second DLQ has tag [FailedRowsWithErrors] and contains the failed rows along with their respective errors.
Configuration
- table string
- create_disposition string (Optional)
- write_disposition string (Optional)
- error_handling Row (Optional). Row fields:
  - output string
- num_streams int32 (Optional)
Usage
type: WriteToBigQuery
input: ...
config:
table: "table"
create_disposition: "create_disposition"
write_disposition: "write_disposition"
error_handling:
output: "output"
num_streams: num_streams
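For example, a minimal sketch writing to an assumed table and routing failed rows to a named error output (the table and output names are illustrative):
type: WriteToBigQuery
input: ...
config:
  table: my_project.my_dataset.my_table  # illustrative table reference
  error_handling:
    output: failed_rows  # illustrative name for the DLQ output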
ReadFromCsv
A PTransform for reading comma-separated values (csv) files into a PCollection.
Configuration
- path string: The file path to read from. The path can contain glob characters such as * and ?.
- delimiter ? (Optional)
- comment ? (Optional)
Usage
type: ReadFromCsv
config:
path: "path"
delimiter: delimiter
comment: comment
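For example, a minimal sketch reading every CSV shard under an assumed bucket path, using a glob:
type: ReadFromCsv
config:
  path: gs://my-bucket/data/*.csv  # illustrative path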
WriteToCsv
A PTransform for writing a schema'd PCollection as a (set of) comma-separated values (csv) files.
Configuration
- path string: The file path to write to. The files written will begin with this prefix, followed by a shard identifier (see num_shards) according to the file_naming parameter.
- delimiter ? (Optional)
Usage
type: WriteToCsv
input: ...
config:
delimiter: "delimiter"
path: "path"
ReadFromJdbc
Configuration
- driver_class_name string (Optional)
- type string (Optional)
- url string
- username string (Optional)
- password string (Optional)
- table string (Optional)
- query string (Optional)
- driver_jars string (Optional)
- connection_properties string (Optional)
- connection_init_sql Array[string] (Optional)
Usage
type: ReadFromJdbc
config:
driver_class_name: "driver_class_name"
type: "type"
url: "url"
username: "username"
password: "password"
table: "table"
query: "query"
driver_jars: "driver_jars"
connection_properties: "connection_properties"
connection_init_sql:
- "connection_init_sql"
- ...
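For example, a minimal sketch reading a hypothetical orders table from an assumed PostgreSQL instance (all connection details are illustrative):
type: ReadFromJdbc
config:
  type: postgres
  url: "jdbc:postgresql://localhost:5432/mydb"  # illustrative JDBC URL
  username: "my_user"
  password: "my_password"
  table: "orders"  # hypothetical table name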
WriteToJdbc
Configuration
- driver_class_name string (Optional)
- type string (Optional)
- url string
- username string (Optional)
- password string (Optional)
- table string (Optional)
- driver_jars string (Optional)
- connection_properties string (Optional)
- connection_init_sql Array[string] (Optional)
Usage
type: WriteToJdbc
input: ...
config:
driver_class_name: "driver_class_name"
type: "type"
url: "url"
username: "username"
password: "password"
table: "table"
driver_jars: "driver_jars"
connection_properties: "connection_properties"
connection_init_sql:
- "connection_init_sql"
- ...
ReadFromJson
A PTransform for reading json values from files into a PCollection.
Configuration
- path string: The file path to read from. The path can contain glob characters such as * and ?.
Usage
type: ReadFromJson
config:
path: "path"
WriteToJson
A PTransform for writing a PCollection as json values to files.
Configuration
- path string: The file path to write to. The files written will begin with this prefix, followed by a shard identifier (see num_shards) according to the file_naming parameter.
Usage
type: WriteToJson
input: ...
config:
path: "path"
ReadFromKafka
Configuration
- schema string (Optional)
- consumer_config Map[string, string] (Optional)
- format string (Optional)
- topic string
- bootstrap_servers string
- confluent_schema_registry_url string (Optional)
- confluent_schema_registry_subject string (Optional)
- auto_offset_reset_config string (Optional)
- error_handling Row (Optional). Row fields:
  - output string
- file_descriptor_path string (Optional)
- message_name string (Optional)
Usage
type: ReadFromKafka
config:
schema: "schema"
consumer_config:
a: "consumer_config_value_a"
b: "consumer_config_value_b"
c: ...
format: "format"
topic: "topic"
bootstrap_servers: "bootstrap_servers"
confluent_schema_registry_url: "confluent_schema_registry_url"
confluent_schema_registry_subject: "confluent_schema_registry_subject"
auto_offset_reset_config: "auto_offset_reset_config"
error_handling:
output: "output"
file_descriptor_path: "file_descriptor_path"
message_name: "message_name"
WriteToKafka
Configuration
- format string
- topic string
- bootstrap_servers string
- producer_config_updates Map[string, string] (Optional)
- error_handling Row (Optional). Row fields:
  - output string
- file_descriptor_path string (Optional)
- message_name string (Optional)
- schema string (Optional)
Usage
type: WriteToKafka
input: ...
config:
format: "format"
topic: "topic"
bootstrap_servers: "bootstrap_servers"
producer_config_updates:
a: "producer_config_updates_value_a"
b: "producer_config_updates_value_b"
c: ...
error_handling:
output: "output"
file_descriptor_path: "file_descriptor_path"
message_name: "message_name"
schema: "schema"
ReadFromMySql
Configuration
- driver_class_name string (Optional)
- url string
- username string (Optional)
- password string (Optional)
- table string (Optional)
- query string (Optional)
- driver_jars string (Optional)
- connection_properties string (Optional)
- connection_init_sql Array[string] (Optional)
Usage
type: ReadFromMySql
config:
driver_class_name: "driver_class_name"
url: "url"
username: "username"
password: "password"
table: "table"
query: "query"
driver_jars: "driver_jars"
connection_properties: "connection_properties"
connection_init_sql:
- "connection_init_sql"
- ...
WriteToMySql
Configuration
- driver_class_name string (Optional)
- url string
- username string (Optional)
- password string (Optional)
- table string (Optional)
- driver_jars string (Optional)
- connection_properties string (Optional)
- connection_init_sql Array[string] (Optional)
Usage
type: WriteToMySql
input: ...
config:
driver_class_name: "driver_class_name"
url: "url"
username: "username"
password: "password"
table: "table"
driver_jars: "driver_jars"
connection_properties: "connection_properties"
connection_init_sql:
- "connection_init_sql"
- ...
ReadFromOracle
Configuration
- driver_class_name string (Optional)
- url string
- username string (Optional)
- password string (Optional)
- table string (Optional)
- query string (Optional)
- driver_jars string (Optional)
- connection_properties string (Optional)
- connection_init_sql Array[string] (Optional)
Usage
type: ReadFromOracle
config:
driver_class_name: "driver_class_name"
url: "url"
username: "username"
password: "password"
table: "table"
query: "query"
driver_jars: "driver_jars"
connection_properties: "connection_properties"
connection_init_sql:
- "connection_init_sql"
- ...
WriteToOracle
Configuration
- driver_class_name string (Optional)
- url string
- username string (Optional)
- password string (Optional)
- table string (Optional)
- driver_jars string (Optional)
- connection_properties string (Optional)
- connection_init_sql Array[string] (Optional)
Usage
type: WriteToOracle
input: ...
config:
driver_class_name: "driver_class_name"
url: "url"
username: "username"
password: "password"
table: "table"
driver_jars: "driver_jars"
connection_properties: "connection_properties"
connection_init_sql:
- "connection_init_sql"
- ...
ReadFromParquet
A PTransform for reading Parquet files.
Configuration
- path ? (Optional)
Usage
type: ReadFromParquet
config:
path: path
WriteToParquet
A PTransform for writing parquet files.
Configuration
- path ? (Optional)
Usage
type: WriteToParquet
input: ...
config:
path: path
ReadFromPostgres
Configuration
- driver_class_name string (Optional)
- url string
- username string (Optional)
- password string (Optional)
- table string (Optional)
- query string (Optional)
- driver_jars string (Optional)
- connection_properties string (Optional)
- connection_init_sql Array[string] (Optional)
Usage
type: ReadFromPostgres
config:
driver_class_name: "driver_class_name"
url: "url"
username: "username"
password: "password"
table: "table"
query: "query"
driver_jars: "driver_jars"
connection_properties: "connection_properties"
connection_init_sql:
- "connection_init_sql"
- ...
WriteToPostgres
Configuration
- driver_class_name string (Optional)
- url string
- username string (Optional)
- password string (Optional)
- table string (Optional)
- driver_jars string (Optional)
- connection_properties string (Optional)
- connection_init_sql Array[string] (Optional)
Usage
type: WriteToPostgres
input: ...
config:
driver_class_name: "driver_class_name"
url: "url"
username: "username"
password: "password"
table: "table"
driver_jars: "driver_jars"
connection_properties: "connection_properties"
connection_init_sql:
- "connection_init_sql"
- ...
ReadFromPubSub
Reads messages from Cloud Pub/Sub.
Configuration
- topic string (Optional): Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>". If provided, subscription must be None.
- subscription string (Optional): Existing Cloud Pub/Sub subscription to use in the form "projects/<project>/subscriptions/<subscription>". If not specified, a temporary subscription will be created from the specified topic. If provided, topic must be None.
- format string: The expected format of the message payload. Currently supported formats are
  - raw: Produces records with a single payload field whose contents are the raw bytes of the pubsub message.
  - avro: Parses records with a given avro schema.
  - json: Parses records with a given json schema.
- schema ? (Optional): Schema specification for the given format.
- attributes Array[string] (Optional): List of attribute keys whose values will be flattened into the output message as additional fields. For example, if the format is raw and attributes is ["a", "b"], then this read will produce elements of the form Row(payload=..., a=..., b=...).
- attributes_map string (Optional)
- id_attribute string (Optional): The attribute on incoming Pub/Sub messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for deduplication of messages. If not provided, we cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be strictly best effort.
- timestamp_attribute string (Optional): Message value to use as element timestamp. If None, uses message publishing time as the timestamp. Timestamp values should be in one of two formats:
  - A numerical value representing the number of milliseconds since the Unix epoch.
  - A string in RFC 3339 format, UTC timezone. Example: 2015-10-29T23:41:41.123Z. The sub-second component of the timestamp is optional, and digits beyond the first three (i.e., time units smaller than milliseconds) may be ignored.
- error_handling Row. Row fields:
  - output string
Usage
type: ReadFromPubSub
config:
topic: "topic"
subscription: "subscription"
format: "format"
schema: schema
attributes:
- "attributes"
- ...
attributes_map: "attributes_map"
id_attribute: "id_attribute"
timestamp_attribute: "timestamp_attribute"
error_handling:
output: "output"
WriteToPubSub
Writes messages to Cloud Pub/Sub.
Configuration
- topic string: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
- format string: How to format the message payload. Currently supported formats are
  - raw: Expects a message with a single field (excluding attribute-related fields) whose contents are used as the raw bytes of the pubsub message.
  - avro: Encodes records with a given avro schema, which may be inferred from the input PCollection schema.
  - json: Formats records with a given json schema, which may be inferred from the input PCollection schema.
- schema ? (Optional): Schema specification for the given format.
- attributes Array[string] (Optional): List of attribute keys whose values will be pulled out as PubSub message attributes. For example, if the format is raw and attributes is ["a", "b"], then elements of the form Row(any_field=..., a=..., b=...) will result in PubSub messages whose payload has the contents of any_field and whose attributes will be populated with the values of a and b.
- attributes_map string (Optional)
- id_attribute string (Optional): If set, will set an attribute for each Cloud Pub/Sub message with the given name and a unique value. This attribute can then be used in a ReadFromPubSub PTransform to deduplicate messages.
- timestamp_attribute string (Optional): If set, will set an attribute for each Cloud Pub/Sub message with the given name and the message's publish time as the value.
- error_handling Row. Row fields:
  - output string
Usage
type: WriteToPubSub
input: ...
config:
topic: "topic"
format: "format"
schema: schema
attributes:
- "attributes"
- ...
attributes_map: "attributes_map"
id_attribute: "id_attribute"
timestamp_attribute: "timestamp_attribute"
error_handling:
output: "output"
ReadFromPubSubLite
Configuration
- project string (Optional)
- schema string (Optional)
- format string
- subscription_name string
- location string
- attributes Array[string] (Optional)
- attribute_map string (Optional)
- attribute_id string (Optional)
- error_handling Row (Optional). Row fields:
  - output string
- file_descriptor_path string (Optional)
- message_name string (Optional)
Usage
type: ReadFromPubSubLite
config:
project: "project"
schema: "schema"
format: "format"
subscription_name: "subscription_name"
location: "location"
attributes:
- "attributes"
- ...
attribute_map: "attribute_map"
attribute_id: "attribute_id"
error_handling:
output: "output"
file_descriptor_path: "file_descriptor_path"
message_name: "message_name"
WriteToPubSubLite
Configuration
- project string
- format string
- topic_name string
- location string
- attributes Array[string] (Optional)
- attribute_id string (Optional)
- error_handling Row (Optional). Row fields:
  - output string
- file_descriptor_path string (Optional)
- message_name string (Optional)
- schema string (Optional)
Usage
type: WriteToPubSubLite
input: ...
config:
project: "project"
format: "format"
topic_name: "topic_name"
location: "location"
attributes:
- "attributes"
- ...
attribute_id: "attribute_id"
error_handling:
output: "output"
file_descriptor_path: "file_descriptor_path"
message_name: "message_name"
schema: "schema"
ReadFromSqlServer
Configuration
- driver_class_name string (Optional)
- url string
- username string (Optional)
- password string (Optional)
- table string (Optional)
- query string (Optional)
- driver_jars string (Optional)
- connection_properties string (Optional)
- connection_init_sql Array[string] (Optional)
Usage
type: ReadFromSqlServer
config:
driver_class_name: "driver_class_name"
url: "url"
username: "username"
password: "password"
table: "table"
query: "query"
driver_jars: "driver_jars"
connection_properties: "connection_properties"
connection_init_sql:
- "connection_init_sql"
- ...
WriteToSqlServer
Configuration
- driver_class_name string (Optional)
- url string
- username string (Optional)
- password string (Optional)
- table string (Optional)
- driver_jars string (Optional)
- connection_properties string (Optional)
- connection_init_sql Array[string] (Optional)
Usage
type: WriteToSqlServer
input: ...
config:
driver_class_name: "driver_class_name"
url: "url"
username: "username"
password: "password"
table: "table"
driver_jars: "driver_jars"
connection_properties: "connection_properties"
connection_init_sql:
- "connection_init_sql"
- ...
ReadFromText
Reads lines from text files.
The resulting PCollection consists of rows with a single string field named "line".
Configuration
- path string: The file path to read from. The path can contain glob characters such as * and ?.
Usage
type: ReadFromText
config:
path: "path"
WriteToText
Writes a PCollection to a (set of) text file(s).
The input must be a PCollection whose schema has exactly one field.
Configuration
- path string: The file path to write to. The files written will begin with this prefix, followed by a shard identifier.
Usage
type: WriteToText
input: ...
config:
path: "path"