Beam YAML Transform Index

AssignTimestamps

Assigns a new timestamp to each element of its input.

This can be useful when reading records that have the timestamp embedded in them, for example with various file types or other sources that by default set all timestamps to the infinite past.

Note that the timestamp should only be set forward, as setting it backwards may not cause it to hold back an already advanced watermark and the data could become droppably late.

Supported languages: generic, javascript, python

Configuration

Usage

type: AssignTimestamps
input: ...
config:
  timestamp: timestamp
  language: "language"
  error_handling:
    output: "output"

Combine

Groups and combines records sharing common fields.

Built-in combine functions are sum, max, min, all, any, mean, count, group, concat but custom aggregation functions can be used as well.

See also the documentation on YAML Aggregation.

Supported languages: calcite, generic, javascript, python, sql

Configuration

Usage

type: Combine
input: ...
config:
  group_by:
  - "group_by"
  - ...
  combine:
    a:
      a: combine_value_a_value_a
      b: combine_value_a_value_b
      c: ...
    b:
      a: combine_value_b_value_a
      b: combine_value_b_value_b
      c: ...
    c: ...
  language: "language"

Create

Creates a collection containing a specified set of elements.

This transform always produces schema'd data. For example

type: Create
config:
  elements: [1, 2, 3]

will result in an output with three elements with a schema of Row(element=int), whereas YAML/JSON-style mappings will be interpreted directly as Beam rows, e.g.

type: Create
config:
  elements:
     - {first: 0, second: {str: "foo", values: [1, 2, 3]}}
     - {first: 1, second: {str: "bar", values: [4, 5, 6]}}

will result in a schema of the form (int, Row(string, List[int])).

This can also be expressed as YAML

type: Create
config:
  elements:
    - first: 0
      second:
        str: "foo"
        values: [1, 2, 3]
    - first: 1
      second:
        str: "bar"
        values: [4, 5, 6]

Configuration

Usage

type: Create
config:
  elements:
  - elements
  - ...
  reshuffle: true|false

Explode

Explodes (aka unnest/flatten) one or more fields, producing multiple rows.

Given one or more fields of iterable type, produces multiple rows, one for each value of that field. For example, a row of the form ('a', [1, 2, 3]) would expand to ('a', 1), ('a', 2), and ('a', 3) when exploded on the second field.

This is akin to a FlatMap when paired with the MapToFields transform.

See more complete documentation on YAML Mapping Functions.

Configuration

Usage

type: Explode
input: ...
config:
  fields: fields
  cross_product: true|false
  error_handling:
    a: error_handling_value_a
    b: error_handling_value_b
    c: ...
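
For example, assuming each input row has a list-valued field named values (the field name is illustrative), a minimal sketch exploding it into one row per list element might look like:

type: Explode
input: ...
config:
  fields: [values]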

Filter

Keeps only records that satisfy the given criteria.

See more complete documentation on YAML Filtering.

Supported languages: calcite, java, javascript, python, sql

Configuration

Usage

type: Filter
input: ...
config:
  language: "language"
  keep:
    callable: "callable"
    expression: "expression"
    name: "name"
    path: "path"
  error_handling:
    output: "output"

Flatten

Flattens multiple PCollections into a single PCollection.

The elements of the resulting PCollection will be the (disjoint) union of all the elements of all the inputs.

Note that in YAML, transforms can always take a list of inputs, which will be implicitly flattened.

Configuration

No configuration parameters.

Usage

type: Flatten
input: ...
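
For example, a sketch merging the outputs of two earlier transforms (the names CreateA and CreateB are illustrative placeholders for transforms defined elsewhere in the pipeline):

type: Flatten
input: [CreateA, CreateB]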

LogForTesting

Logs each element of its input PCollection.

The output of this transform is a copy of its input for ease of use in chain-style pipelines.

Configuration

Usage

type: LogForTesting
input: ...
config:
  level: "level"
  prefix: "prefix"

MapToFields

Creates records with new fields defined in terms of the input fields.

See more complete documentation on YAML Mapping Functions.

Supported languages: calcite, generic, java, javascript, python, sql

Configuration

Usage

type: MapToFields
input: ...
config:
  language: "language"
  append: true|false
  drop:
  - "drop"
  - ...
  fields:
    a:
      callable: "callable"
      expression: "expression"
      name: "name"
      path: "path"
    b:
      callable: "callable"
      expression: "expression"
      name: "name"
      path: "path"
    c: ...
  error_handling:
    output: "output"

PyTransform

A Python PTransform identified by fully qualified name.

This allows one to import, construct, and apply any Beam Python transform. This can be useful for using transforms that have not yet been exposed via a YAML interface. Note, however, that conversion may be required if this transform does not accept or produce Beam Rows.

For example

type: PyTransform
config:
   constructor: apache_beam.pkg.mod.SomeClass
   args: [1, 'foo']
   kwargs:
     baz: 3

can be used to access the transform apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3).

See also the documentation on Inlining Python.

Configuration

Usage

type: PyTransform
input: ...
config:
  constructor: "constructor"
  args:
  - args
  - ...
  kwargs:
    a: kwargs_value_a
    b: kwargs_value_b
    c: ...

Sql

Configuration

Usage

type: Sql
input: ...
config: ...
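
As a sketch, assuming the config accepts a query parameter and that a single unnamed input can be referenced as PCOLLECTION (both assumptions, not confirmed above):

type: Sql
input: ...
config:
  query: "SELECT col1, COUNT(*) AS cnt FROM PCOLLECTION GROUP BY col1"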

WindowInto

A window transform assigning windows to each element of a PCollection.

The assigned windows will affect all downstream aggregating operations, which will aggregate by window as well as by key.

See the Beam documentation on windowing for more details.

Sizes, offsets, periods and gaps (where applicable) must be defined using a time unit suffix 'ms', 's', 'm', 'h' or 'd' for milliseconds, seconds, minutes, hours or days, respectively. If a time unit is not specified, it will default to 's'.

For example

windowing:
   type: fixed
   size: 30s

Note that any YAML transform can have a windowing parameter, which is applied to its inputs (if any) or its outputs (if there are no inputs), which means that explicit WindowInto operations are not typically needed.

Configuration

Usage

type: WindowInto
input: ...
config:
  windowing: windowing
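
For example, a sketch assigning 60-second sliding windows starting every 10 seconds, assuming sliding is an accepted windowing type with size and period parameters:

type: WindowInto
input: ...
config:
  windowing:
    type: sliding
    size: 60s
    period: 10s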

ReadFromAvro

A PTransform for reading records from avro files.

Each record of the resulting PCollection will contain a single record read from a source. Records that are of simple types will be mapped to Beam rows with a single record field containing the record's value. Records that are of Avro type RECORD will be mapped to Beam rows that comply with the schema contained in the Avro file that contains those records.

Configuration

Usage

type: ReadFromAvro
config:
  path: path

WriteToAvro

A PTransform for writing avro files.

If the input has a schema, a corresponding avro schema will be automatically generated and used to write the output records.

Configuration

Usage

type: WriteToAvro
input: ...
config:
  path: path

ReadFromBigQuery

Reads data from BigQuery.

Exactly one of table or query must be set. If query is set, neither row_restriction nor fields should be set.

Configuration

Usage

type: ReadFromBigQuery
config:
  query: "query"
  table: "table"
  fields:
  - "fields"
  - ...
  row_restriction: "row_restriction"

WriteToBigQuery

Writes data to BigQuery using the Storage Write API (https://cloud.google.com/bigquery/docs/write-api).

This expects a single PCollection of Beam Rows and outputs two dead-letter queues (DLQs) that contain failed rows. The first DLQ has tag [FailedRows] and contains the failed rows. The second DLQ has tag [FailedRowsWithErrors] and contains the failed rows along with their respective errors.

Configuration

Usage

type: WriteToBigQuery
input: ...
config:
  table: "table"
  create_disposition: "create_disposition"
  write_disposition: "write_disposition"
  error_handling:
    output: "output"
  num_streams: num_streams
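
For example, a sketch appending rows to a table (the table name is illustrative; CREATE_IF_NEEDED and WRITE_APPEND are standard BigQuery disposition values and are assumed to be accepted here):

type: WriteToBigQuery
input: ...
config:
  table: my_project.my_dataset.events
  create_disposition: CREATE_IF_NEEDED
  write_disposition: WRITE_APPEND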

ReadFromCsv

A PTransform for reading comma-separated values (csv) files into a PCollection.

Configuration

Usage

type: ReadFromCsv
config:
  path: "path"
  delimiter: delimiter
  comment: comment
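
For example, a minimal sketch reading all CSV files matching a glob (the path is illustrative, and it is assumed the default delimiter suffices):

type: ReadFromCsv
config:
  path: "gs://my-bucket/input/*.csv"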

WriteToCsv

A PTransform for writing a schema'd PCollection as a (set of) comma-separated values (csv) files.

Configuration

Usage

type: WriteToCsv
input: ...
config:
  delimiter: "delimiter"
  path: "path"

ReadFromJdbc

Configuration

Usage

type: ReadFromJdbc
config:
  driver_class_name: "driver_class_name"
  type: "type"
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  query: "query"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
  - "connection_init_sql"
  - ...

WriteToJdbc

Configuration

Usage

type: WriteToJdbc
input: ...
config:
  driver_class_name: "driver_class_name"
  type: "type"
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
  - "connection_init_sql"
  - ...

ReadFromJson

A PTransform for reading json values from files into a PCollection.

Configuration

Usage

type: ReadFromJson
config:
  path: "path"

WriteToJson

A PTransform for writing a PCollection as json values to files.

Configuration

Usage

type: WriteToJson
input: ...
config:
  path: "path"

ReadFromKafka

Configuration

Usage

type: ReadFromKafka
config:
  schema: "schema"
  consumer_config:
    a: "consumer_config_value_a"
    b: "consumer_config_value_b"
    c: ...
  format: "format"
  topic: "topic"
  bootstrap_servers: "bootstrap_servers"
  confluent_schema_registry_url: "confluent_schema_registry_url"
  confluent_schema_registry_subject: "confluent_schema_registry_subject"
  auto_offset_reset_config: "auto_offset_reset_config"
  error_handling:
    output: "output"
  file_descriptor_path: "file_descriptor_path"
  message_name: "message_name"

WriteToKafka

Configuration

Usage

type: WriteToKafka
input: ...
config:
  format: "format"
  topic: "topic"
  bootstrap_servers: "bootstrap_servers"
  producer_config_updates:
    a: "producer_config_updates_value_a"
    b: "producer_config_updates_value_b"
    c: ...
  error_handling:
    output: "output"
  file_descriptor_path: "file_descriptor_path"
  message_name: "message_name"
  schema: "schema"

ReadFromMySql

Configuration

Usage

type: ReadFromMySql
config:
  driver_class_name: "driver_class_name"
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  query: "query"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
  - "connection_init_sql"
  - ...

WriteToMySql

Configuration

Usage

type: WriteToMySql
input: ...
config:
  driver_class_name: "driver_class_name"
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
  - "connection_init_sql"
  - ...

ReadFromOracle

Configuration

Usage

type: ReadFromOracle
config:
  driver_class_name: "driver_class_name"
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  query: "query"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
  - "connection_init_sql"
  - ...

WriteToOracle

Configuration

Usage

type: WriteToOracle
input: ...
config:
  driver_class_name: "driver_class_name"
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
  - "connection_init_sql"
  - ...

ReadFromParquet

A PTransform for reading Parquet files.

Configuration

Usage

type: ReadFromParquet
config:
  path: path

WriteToParquet

A PTransform for writing parquet files.

Configuration

Usage

type: WriteToParquet
input: ...
config:
  path: path

ReadFromPostgres

Configuration

Usage

type: ReadFromPostgres
config:
  driver_class_name: "driver_class_name"
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  query: "query"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
  - "connection_init_sql"
  - ...
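
For example, a sketch reading a table from a PostgreSQL database (the URL, credentials, and table name are illustrative, and it is assumed the driver-related parameters can be left at their defaults):

type: ReadFromPostgres
config:
  url: "jdbc:postgresql://localhost:5432/mydb"
  username: "postgres"
  password: "secret"
  table: "users"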

WriteToPostgres

Configuration

Usage

type: WriteToPostgres
input: ...
config:
  driver_class_name: "driver_class_name"
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
  - "connection_init_sql"
  - ...

ReadFromPubSub

Reads messages from Cloud Pub/Sub.

Configuration

Usage

type: ReadFromPubSub
config:
  topic: "topic"
  subscription: "subscription"
  format: "format"
  schema: schema
  attributes:
  - "attributes"
  - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"

WriteToPubSub

Writes messages to Cloud Pub/Sub.

Configuration

Usage

type: WriteToPubSub
input: ...
config:
  topic: "topic"
  format: "format"
  schema: schema
  attributes:
  - "attributes"
  - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"

ReadFromPubSubLite

Configuration

Usage

type: ReadFromPubSubLite
config:
  project: "project"
  schema: "schema"
  format: "format"
  subscription_name: "subscription_name"
  location: "location"
  attributes:
  - "attributes"
  - ...
  attribute_map: "attribute_map"
  attribute_id: "attribute_id"
  error_handling:
    output: "output"
  file_descriptor_path: "file_descriptor_path"
  message_name: "message_name"

WriteToPubSubLite

Configuration

Usage

type: WriteToPubSubLite
input: ...
config:
  project: "project"
  format: "format"
  topic_name: "topic_name"
  location: "location"
  attributes:
  - "attributes"
  - ...
  attribute_id: "attribute_id"
  error_handling:
    output: "output"
  file_descriptor_path: "file_descriptor_path"
  message_name: "message_name"
  schema: "schema"

ReadFromSqlServer

Configuration

Usage

type: ReadFromSqlServer
config:
  driver_class_name: "driver_class_name"
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  query: "query"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
  - "connection_init_sql"
  - ...

WriteToSqlServer

Configuration

Usage

type: WriteToSqlServer
input: ...
config:
  driver_class_name: "driver_class_name"
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
  - "connection_init_sql"
  - ...

ReadFromText

Reads lines from text files.

The resulting PCollection consists of rows with a single string field named "line".

Configuration

Usage

type: ReadFromText
config:
  path: "path"

WriteToText

Writes a PCollection to a (set of) text file(s).

The input must be a PCollection whose schema has exactly one field.

Configuration

Usage

type: WriteToText
input: ...
config:
  path: "path"