Beam YAML Transform Index


Asserts that the input contains exactly the elements provided.

This is primarily used for testing; it will cause the entire pipeline to fail if the input to this transform is not exactly the set of elements given in the config parameter.

As with Create, YAML/JSON-style mappings are interpreted as Beam rows, e.g.

type: AssertEqual
input: SomeTransform
config:
  elements:
    - {a: 0, b: "foo"}
    - {a: 1, b: "bar"}

would ensure that SomeTransform produced exactly two elements with values (a=0, b="foo") and (a=1, b="bar") respectively.


  • elements Array[?] : The set of elements that should belong to the PCollection. YAML/JSON-style mappings will be interpreted as Beam rows.


type: AssertEqual
input: ...
config:
  elements:
  - element
  - element
  - ...


Assigns a new timestamp to each element of its input.

This can be useful when reading records that have the timestamp embedded in them, for example with various file types or other sources that by default set all timestamps to the infinite past.

Note that the timestamp should only be set forward, as setting it backwards may not cause it to hold back an already advanced watermark and the data could become droppably late.

Supported languages: generic, javascript, python.


  • timestamp ? (Optional) : A field, callable, or expression giving the new timestamp.

  • language string (Optional) : The language of the timestamp expression.

  • error_handling Row (Optional) : Whether and how to handle errors during timestamp evaluation.

    Row fields:

    • output string : Name to use for the output error collection


type: AssignTimestamps
input: ...
config:
  timestamp: timestamp
  language: "language"
  error_handling:
    output: "output"


Groups and combines records sharing common fields.

Built-in combine functions are sum, max, min, all, any, mean, count, group, and concat, but custom aggregation functions can be used as well.

See also the documentation on YAML Aggregation.

Supported languages: calcite, generic, javascript, python, sql.


  • group_by Array[string] : The field(s) to aggregate on.

  • combine Map[string, Map[string, ?]] : The aggregation function to use.

  • language string (Optional) : The language used to define (and execute) the custom callables in combine. Defaults to generic.


type: Combine
input: ...
config:
  group_by:
  - "group_by"
  - "group_by"
  - ...
  combine:
    a:
      a: combine_value_a_value_a
      b: combine_value_a_value_b
      c: ...
    b:
      a: combine_value_b_value_a
      b: combine_value_b_value_b
      c: ...
    c: ...
  language: "language"
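
To make the group-and-combine behavior concrete, here is a minimal plain-Python sketch of the semantics. It is not Beam's implementation. The combine_rows helper, the BUILTINS table, the sample rows, and the value/fn keys inside each combine entry are assumptions made for illustration, and only a few of the built-in functions are modeled.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical stand-ins for a few of the built-in combine functions.
BUILTINS = {"sum": sum, "max": max, "min": min, "mean": mean, "count": len}


def combine_rows(rows, group_by, combine):
    """Group dict rows by the group_by fields, then aggregate each group.

    combine maps an output field name to {"value": input_field, "fn": name}
    (an assumed shape for the inner mapping).
    """
    groups = defaultdict(list)
    for row in rows:
        key = tuple(row[field] for field in group_by)
        groups[key].append(row)

    results = []
    for key, members in groups.items():
        out = dict(zip(group_by, key))
        for out_field, spec in combine.items():
            values = [member[spec["value"]] for member in members]
            out[out_field] = BUILTINS[spec["fn"]](values)
        results.append(out)
    return results


rows = [
    {"store": "a", "amount": 3},
    {"store": "b", "amount": 5},
    {"store": "a", "amount": 4},
]
totals = combine_rows(
    rows,
    group_by=["store"],
    combine={
        "total": {"value": "amount", "fn": "sum"},
        "largest": {"value": "amount", "fn": "max"},
    },
)
print(totals)
```

Each distinct value of the group_by fields yields one output row, carrying the grouping fields plus one field per combine entry.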


Creates a collection containing a specified set of elements.

This transform always produces schema'd data. For example

type: Create
config:
  elements: [1, 2, 3]

will result in an output with three elements with a schema of Row(element=int) whereas YAML/JSON-style mappings will be interpreted directly as Beam rows, e.g.

type: Create
config:
  elements:
    - {first: 0, second: {str: "foo", values: [1, 2, 3]}}
    - {first: 1, second: {str: "bar", values: [4, 5, 6]}}

will result in a schema of the form (int, Row(string, list[int])).

This can also be expressed as YAML

type: Create
config:
  elements:
    - first: 0
      second:
        str: "foo"
        values: [1, 2, 3]
    - first: 1
      second:
        str: "bar"
        values: [4, 5, 6]


  • elements Array[?] : The set of elements that should belong to the PCollection. YAML/JSON-style mappings will be interpreted as Beam rows. Primitives will be mapped to rows with a single "element" field.

  • reshuffle boolean (Optional) : Whether to introduce a reshuffle (to possibly redistribute the work) if there is more than one element in the collection. Defaults to True.


type: Create
config:
  elements:
  - element
  - element
  - ...
  reshuffle: true|false


The Enrichment transform allows one to dynamically enhance elements in a pipeline by performing key-value lookups against external services like APIs or databases.

Example using BigTable:

- type: Enrichment
  config:
    enrichment_handler: 'BigTable'
    handler_config:
      project_id: 'apache-beam-testing'
      instance_id: 'beam-test'
      table_id: 'bigtable-enrichment-test'
      row_key: 'product_id'
    timeout: 30

For more information on Enrichment, see the Beam docs.


  • enrichment_handler string : Specifies the source from where data needs to be extracted into the pipeline for enriching data. One of "BigQuery", "BigTable", "FeastFeatureStore" or "VertexAIFeatureStore".

  • handler_config Map[string, ?] : Specifies the parameters for the respective enrichment_handler in a YAML/JSON format. To see the full set of handler_config parameters, see the documentation page for the chosen handler.

  • timeout double (Optional) : Timeout for source requests in seconds. Defaults to 30 seconds.


type: Enrichment
input: ...
config:
  enrichment_handler: "enrichment_handler"
  handler_config:
    a: handler_config_value_a
    b: handler_config_value_b
    c: ...
  timeout: timeout


Explodes (aka unnest/flatten) one or more fields producing multiple rows.

Given one or more fields of iterable type, produces multiple rows, one for each value of that field. For example, a row of the form ('a', [1, 2, 3]) would expand to ('a', 1), ('a', 2), and ('a', 3) when exploded on the second field.

This is akin to a FlatMap when paired with the MapToFields transform.

See more complete documentation on YAML Mapping Functions.


  • fields ? (Optional) : The list of fields to expand.

  • cross_product boolean (Optional) : If multiple fields are specified, indicates whether the full cross-product of combinations should be produced, or if the first element of the first field corresponds to the first element of the second field, etc. For example, the row (['a', 'b'], [1, 2]) would expand to the four rows ('a', 1), ('a', 2), ('b', 1), and ('b', 2) when cross_product is set to true but only the two rows ('a', 1) and ('b', 2) when it is set to false. Only meaningful (and required) if multiple fields are specified.

  • error_handling Row (Optional) : Whether and how to handle errors during iteration.

    Row fields:

    • output string : Name to use for the output error collection


type: Explode
input: ...
config:
  fields: fields
  cross_product: true|false
  error_handling:
    a: error_handling_value_a
    b: error_handling_value_b
    c: ...
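
The difference between the two cross_product settings can be sketched in plain Python. This illustrates the semantics described above and is not Beam's implementation. The explode helper and the sample row are hypothetical, and the sketch assumes all exploded fields have the same length.

```python
from itertools import product


def explode(row, fields, cross_product):
    """Expand the iterable-valued fields of a dict row into multiple rows."""
    columns = [row[field] for field in fields]
    # cross_product=True pairs every value with every other value;
    # cross_product=False pairs values positionally, like zip().
    combos = product(*columns) if cross_product else zip(*columns)
    return [{**row, **dict(zip(fields, combo))} for combo in combos]


row = {"letters": ["a", "b"], "numbers": [1, 2]}
crossed = explode(row, ["letters", "numbers"], cross_product=True)
zipped = explode(row, ["letters", "numbers"], cross_product=False)
print(crossed)
print(zipped)
```

With a single field the two settings coincide, which is why cross_product only matters when several fields are exploded together.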


Keeps only records that satisfy the given criteria.

See more complete documentation on YAML Filtering.

Supported languages: calcite, generic, java, javascript, python, sql.


  • language string (Optional)

  • keep Row

    Row fields:

    • callable string (Optional)

    • expression string (Optional)

    • name string (Optional)

    • path string (Optional)

  • error_handling Row (Optional) : This option specifies whether and where to output error rows.

    Row fields:

    • output string : Name to use for the output error collection


type: Filter
input: ...
config:
  language: "language"
  keep:
    callable: "callable"
    expression: "expression"
    name: "name"
    path: "path"
  error_handling:
    output: "output"


Flattens multiple PCollections into a single PCollection.

The elements of the resulting PCollection will be the (disjoint) union of all the elements of all the inputs.

Note that in YAML, transforms can always take a list of inputs, which will be implicitly flattened.


No configuration parameters.


type: Flatten
input: ...


Joins two or more inputs using a specified condition.

For example

type: Join
input:
  input1: SomeTransform
  input2: AnotherTransform
  input3: YetAnotherTransform
config:
  type: inner
  equalities:
    - input1: colA
      input2: colB
    - input2: colX
      input3: colY
  fields:
    input1: [colA, colB, colC]
    input2: {new_name: colB}

would perform an inner join on the three inputs satisfying the constraints that input1.colA = input2.colB and input2.colX = input3.colY, emitting rows with colA, colB, and colC from input1, the values of input2.colB as a field called new_name, and all the fields from input3.


  • equalities ? (Optional) : The condition to join on. A list of sets of columns that should be equal to fulfill the join condition. For the simple scenario of joining on the same column across all inputs where the column name is the same, one can specify the column name as the equality rather than having to list it for every input.

  • type ? (Optional) : The type of join. Could be a string value in ["inner", "left", "right", "outer"] that specifies the type of join to be performed. For scenarios with multiple inputs to join where different join types are desired, specify the inputs to be outer joined. For example, {outer: [input1, input2]} means that input1 and input2 will be outer joined using the conditions specified, while other inputs will be inner joined.

  • fields Map[string, ?] (Optional) : The fields to output. A mapping with the input alias as the key and the fields of that input to output as the value. The value can be a dictionary with the new field name as the key and the original field name as the value (e.g. new_field_name: field_name), a list of fields to output under their original names (e.g. [col1, col2, col3]), or '*' to output all fields of the input. If not specified, all fields from all inputs are output.


type: Join
input: ...
config:
  equalities: equalities
  type: type
  fields:
    a: fields_value_a
    b: fields_value_b
    c: ...


Logs each element of its input PCollection.

The output of this transform is a copy of its input for ease of use in chain-style pipelines.


  • level string (Optional) : one of ERROR, INFO, or DEBUG, mapped to a corresponding language-specific logging level

  • prefix string (Optional) : an optional identifier that will get prepended to the element being logged

  • error_handling Row (Optional) : This option specifies whether and where to output error rows.

    Row fields:

    • output string : Name to use for the output error collection


type: LogForTesting
input: ...
config:
  level: "level"
  prefix: "prefix"
  error_handling:
    output: "output"



  • write_artifact_location string (Optional)

  • read_artifact_location string (Optional)

  • transforms Array[?] (Optional)


type: MLTransform
input: ...
config:
  write_artifact_location: "write_artifact_location"
  read_artifact_location: "read_artifact_location"
  transforms:
  - transforms
  - transforms
  - ...


Creates records with new fields defined in terms of the input fields.

See more complete documentation on YAML Mapping Functions.

Supported languages: calcite, generic, java, javascript, python, sql.


  • fields Map[string, ?] : The output fields to compute, each mapping to the expression or callable that creates them.

  • append boolean (Optional) : Whether to append the created fields to the set of fields already present, outputting a union of both the new fields and the original fields for each record. Defaults to False.

  • drop Array[string] (Optional) : If append is true, enumerates a subset of fields from the original record that should not be kept

  • language string (Optional) : The language used to define (and execute) the expressions and/or callables in fields. Defaults to generic.

  • error_handling Row (Optional) : Whether and where to output records that throw errors when the above expressions are evaluated.

    Row fields:

    • output string : Name to use for the output error collection


type: MapToFields
input: ...
config:
  language: "language"
  append: true|false
  drop:
  - "drop"
  - "drop"
  - ...
  fields:
    a:
      name: "name"
      path: "path"
      expression: "expression"
      callable: "callable"
    b:
      name: "name"
      path: "path"
      expression: "expression"
      callable: "callable"
    c: ...
  error_handling:
    output: "output"
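
The interaction of fields, append, and drop can be sketched in plain Python. This is an illustration of the described behavior under simplifying assumptions, not Beam's implementation. The map_to_fields helper is hypothetical, each field is given as a Python callable, and how Beam treats a computed field whose name collides with an existing one is not modeled.

```python
def map_to_fields(row, fields, append=False, drop=()):
    """Compute new fields from a dict row.

    fields maps each output name to a callable taking the input row.
    """
    computed = {name: fn(row) for name, fn in fields.items()}
    if not append:
        # Without append, only the newly computed fields are output.
        return computed
    # With append, keep the original fields except those listed in drop,
    # then add the computed ones.
    kept = {name: value for name, value in row.items() if name not in drop}
    return {**kept, **computed}


row = {"a": 1, "b": 2}
only_new = map_to_fields(row, {"c": lambda r: r["a"] + r["b"]})
appended = map_to_fields(
    row, {"c": lambda r: r["a"] + r["b"]}, append=True, drop=["b"])
print(only_new)
print(appended)
```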


Splits an input into several distinct outputs.

Each input element will go to a distinct output based on the field or function given in the by configuration parameter.

Supported languages: generic, javascript, python.


  • by ? (Optional) : A field, callable, or expression giving the destination output for this element. Should return a string that is a member of the outputs parameter. If unknown_output is also set, other return values are accepted as well; otherwise an error will be raised.

  • outputs Array[string] : The set of outputs into which this input is being partitioned.

  • unknown_output string (Optional) : If set, indicates a destination output for any elements that are not assigned an output listed in the outputs parameter.

  • error_handling Row (Optional) : Whether and how to handle errors during partitioning.

    Row fields:

    • output string : Name to use for the output error collection

  • language string (Optional) : The language of the by expression.


type: Partition
input: ...
config:
  by: by
  outputs:
  - "outputs"
  - "outputs"
  - ...
  unknown_output: "unknown_output"
  error_handling:
    a: error_handling_value_a
    b: error_handling_value_b
    c: ...
  language: "language"
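
The routing rule for by, outputs, and unknown_output can be sketched in plain Python. This is a simplified illustration and not Beam's implementation. The partition helper is hypothetical, by is modeled as a Python callable, and error_handling is not modeled (an unroutable element simply raises).

```python
from collections import defaultdict


def partition(rows, by, outputs, unknown_output=None):
    """Route each row to the output named by by(row)."""
    routed = defaultdict(list)
    for row in rows:
        destination = by(row)
        if destination not in outputs:
            if unknown_output is None:
                raise ValueError(f"Unknown output {destination!r} for {row!r}")
            # Anything not listed in outputs goes to the catch-all output.
            destination = unknown_output
        routed[destination].append(row)
    return dict(routed)


rows = [{"n": 1}, {"n": 2}, {"n": 3}]
by_parity = partition(
    rows, lambda r: "even" if r["n"] % 2 == 0 else "odd", ["even", "odd"])
print(by_parity)
```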


A Python PTransform identified by fully qualified name.

This allows one to import, construct, and apply any Beam Python transform. This can be useful for using transforms that have not yet been exposed via a YAML interface. Note, however, that conversion may be required if this transform does not accept or produce Beam Rows.

For example

type: PyTransform
config:
   constructor: apache_beam.pkg.mod.SomeClass
   args: [1, 'foo']
   kwargs:
     baz: 3

can be used to access the transform apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3).

See also the documentation on Inlining Python.


  • constructor string : Fully qualified name of a callable used to construct the transform. Often this is a class such as apache_beam.pkg.mod.SomeClass but it can also be a function or any other callable that returns a PTransform.

  • args Array[?] (Optional) : A list of parameters to pass to the callable as positional arguments.

  • kwargs Map[string, ?] (Optional) : A mapping of parameters to pass to the callable as keyword arguments.


type: PyTransform
input: ...
config:
  constructor: "constructor"
  args:
  - arg
  - arg
  - ...
  kwargs:
    a: kwargs_value_a
    b: kwargs_value_b
    c: ...
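
The way constructor, args, and kwargs combine into a single call can be sketched with the standard library. This is only an illustration of the idea, not how Beam resolves names internally. The construct helper is hypothetical and handles only names of the form module.attribute, and datetime.timedelta stands in for a transform class so the example runs without Beam installed.

```python
import importlib


def construct(constructor, args=(), kwargs=None):
    """Resolve a fully qualified name and call it with the given arguments."""
    module_name, _, attr_name = constructor.rpartition(".")
    module = importlib.import_module(module_name)
    factory = getattr(module, attr_name)
    return factory(*args, **(kwargs or {}))


# Equivalent to calling datetime.timedelta(1, hours=2) directly.
delta = construct("datetime.timedelta", args=[1], kwargs={"hours": 2})
print(delta)
```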


A transform that takes the input rows, containing examples (or features), for use on an ML model. The transform then appends the inferences (or predictions) for those examples to the input row.

A ModelHandler must be passed to the model_handler parameter. The ModelHandler is responsible for configuring how the ML model will be loaded and how input data will be passed to it. Every ModelHandler has a config tag, similar to how a transform is defined, where the parameters are defined.

For example:

- type: RunInference
  config:
    model_handler:
      type: ModelHandler
      config:
        param_1: arg1
        param_2: arg2

By default, the RunInference transform will return the input row with a single field appended named by the inference_tag parameter ("inference" by default) that contains the inference directly returned by the underlying ModelHandler, after any optional postprocessing.

For example, if the input had the following:

Row(question="What is a car?")

The output row would look like:

Row(question="What is a car?", inference=...)

where the inference tag can be overridden with the inference_tag parameter.

However, if one specified the following transform config:

- type: RunInference
  config:
    inference_tag: my_inference
    model_handler: ...

The output row would look like:

Row(question="What is a car?", my_inference=...)

See more complete documentation on the underlying RunInference transform.

Preprocessing input data

In most cases, the model will be expecting data in a particular data format, whether it be a Python Dict, PyTorch tensor, etc. However, the outputs of all built-in Beam YAML transforms are Beam Rows. To allow for transforming the Beam Row into a data format the model recognizes, each ModelHandler is equipped with a preprocessing parameter for performing necessary data preprocessing. It is possible for a ModelHandler to define a default preprocessing function, but in most cases, one will need to be specified by the caller.

For example, using callable:

pipeline:
  type: chain

  transforms:
    - type: Create
      config:
        elements:
          - question: "What is a car?"
          - question: "Where is the Eiffel Tower located?"

    - type: RunInference
      config:
        model_handler:
          type: ModelHandler
          config:
            param_1: arg1
            param_2: arg2
            preprocess:
              callable: 'lambda row: {"prompt": row.question}'

In the above example, the Create transform generates a collection of two Beam Row elements, each with a single field, "question". The model, however, expects a Python Dict with a single key, "prompt". In this case, a simple lambda function (or, alternatively, a full function definition) can map the data.

Postprocessing predictions

It is also possible to define a postprocessing function to postprocess the data output by the ModelHandler. See the documentation for the ModelHandler you intend to use (list defined below under model_handler parameter doc).

In many cases, before postprocessing, the object will be a PredictionResult. This type behaves very similarly to a Beam Row, and fields can be accessed using dot notation. However, make sure to check the docs for your ModelHandler to see which fields its PredictionResult contains or if it returns a different object altogether.

For example:

- type: RunInference
  config:
    model_handler:
      type: ModelHandler
      config:
        param_1: arg1
        param_2: arg2
        postprocess:
          callable: |
            def fn(x: PredictionResult):
              return beam.Row(example=x.example, inference=x.inference, model_id=x.model_id)

The above example converts the original output data type (in this case PredictionResult) to a Beam Row, which allows for easier mapping in a later transform.

File-based pre/postprocessing functions

For both preprocessing and postprocessing, it is also possible to specify a Python UDF (User-defined function) file that contains the function. This is possible by specifying the path to the file (local file or GCS path) and the name of the function in the file.

For example:

- type: RunInference
  config:
    model_handler:
      type: ModelHandler
      config:
        param_1: arg1
        param_2: arg2
        preprocess:
          path: gs://my-bucket/path/to/
          name: my_preprocess_fn
        postprocess:
          path: gs://my-bucket/path/to/
          name: my_postprocess_fn


  • model_handler Map[string, ?] : Specifies the type and parameters of the ModelHandler in a YAML/JSON format. To see the full set of parameters, see the documentation page for the chosen ModelHandler.

  • inference_tag string (Optional) : The tag to use for the returned inference. Default is 'inference'.

  • inference_args Map[string, ?] (Optional) : Extra arguments for models whose inference call requires extra parameters. Make sure to check the underlying ModelHandler docs to see which args are allowed.


type: RunInference
input: ...
config:
  model_handler:
    a: model_handler_value_a
    b: model_handler_value_b
    c: ...
  inference_tag: "inference_tag"
  inference_args:
    a: inference_args_value_a
    b: inference_args_value_b
    c: ...
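
The flow of a row through preprocessing, the model, postprocessing, and the inference tag can be sketched in plain Python. This is a toy illustration of the data flow described above, not the RunInference implementation. The run_inference helper and toy_model are hypothetical, and rows are modeled as dicts rather than Beam Rows.

```python
def run_inference(row, model, preprocess=None, postprocess=None,
                  inference_tag="inference"):
    """Append a model's prediction to a dict row under inference_tag."""
    example = preprocess(row) if preprocess else row
    prediction = model(example)
    if postprocess:
        prediction = postprocess(prediction)
    return {**row, inference_tag: prediction}


def toy_model(example):
    # Hypothetical model: expects {"prompt": ...} and returns its length.
    return len(example["prompt"])


row = {"question": "What is a car?"}
result = run_inference(
    row,
    toy_model,
    preprocess=lambda r: {"prompt": r["question"]},
    postprocess=lambda n: {"length": n},
    inference_tag="my_inference",
)
print(result)
```

The input fields pass through unchanged. Only the field named by inference_tag is added.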




type: Sql
input: ...
config: ...


Strips error metadata from outputs returned via error handling.

Generally the error outputs for transformations return information about the error encountered (e.g. error messages and tracebacks) in addition to the failing element itself. This transformation attempts to remove that metadata and returns the bad element alone which can be useful for re-processing.

For example, in the following pipeline snippet

- name: MyMappingTransform
  type: MapToFields
  input: SomeInput
  config:
    language: python
    error_handling:
      output: errors

- name: RecoverOriginalElements
  type: StripErrorMetadata
  input: MyMappingTransform.errors

the output of RecoverOriginalElements will contain exactly those elements from SomeInput that failed to process (whereas MyMappingTransform.errors would contain those elements paired with error information).

Note that this relies on the preceding transform actually returning the failing input in a schema'd way. Most built-in transformations follow the correct conventions.


No configuration parameters.


type: StripErrorMetadata
input: ...


Validates each element of a PCollection against a JSON schema.


  • schema Map[string, ?] : A JSON schema against which to validate each element.

  • error_handling Row (Optional) : Whether and how to handle errors during iteration. If this is not set, invalid elements will fail the pipeline, otherwise invalid elements will be passed to the specified error output along with information about how the schema was invalidated.

    Row fields:

    • output string : Name to use for the output error collection


type: ValidateWithSchema
input: ...
config:
  schema:
    a: schema_value_a
    b: schema_value_b
    c: ...
  error_handling:
    a: error_handling_value_a
    b: error_handling_value_b
    c: ...


A window transform assigning windows to each element of a PCollection.

The assigned windows will affect all downstream aggregating operations, which will aggregate by window as well as by key.

See the Beam documentation on windowing for more details.

Sizes, offsets, periods and gaps (where applicable) must be defined using a time unit suffix 'ms', 's', 'm', 'h' or 'd' for milliseconds, seconds, minutes, hours or days, respectively. If a time unit is not specified, it will default to 's'.

For example

windowing:
   type: fixed
   size: 30s

Note that any YAML transform can have a windowing parameter, which is applied to its inputs (if any) or outputs (if there are no inputs), which means that explicit WindowInto operations are not typically needed.


  • windowing ? (Optional) : the type and parameters of the windowing to perform


type: WindowInto
input: ...
config:
  windowing: windowing
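
The time unit rule for sizes, offsets, periods, and gaps can be sketched as a small parser. This is an illustrative reading of the rule stated above, not Beam's own parser. The parse_duration helper and its regular expression are hypothetical.

```python
import re

# Seconds per unit for the suffixes named in the text.
UNIT_SECONDS = {"ms": 0.001, "s": 1, "m": 60, "h": 3600, "d": 86400}
DURATION = re.compile(r"^\s*(\d+(?:\.\d+)?)\s*(ms|s|m|h|d)?\s*$")


def parse_duration(value):
    """Convert a duration such as '30s' or 5 into seconds."""
    match = DURATION.match(str(value))
    if not match:
        raise ValueError(f"Invalid duration: {value!r}")
    number, unit = match.groups()
    # A missing unit defaults to seconds.
    return float(number) * UNIT_SECONDS[unit or "s"]


for text in ["30s", "5m", "2h", 10]:
    print(text, "->", parse_duration(text), "seconds")
```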


A PTransform for reading records from Avro files.

Each record of the resulting PCollection will contain a single record read from a source. Records that are of simple types will be mapped to Beam Rows with a single record field containing the record's value. Records that are of Avro type RECORD will be mapped to Beam rows that comply with the schema contained in the Avro file that contains those records.


  • path ? (Optional)


type: ReadFromAvro
config:
  path: path


A PTransform for writing Avro files.

If the input has a schema, a corresponding Avro schema will be automatically generated and used to write the output records.


  • path ? (Optional)


type: WriteToAvro
input: ...
config:
  path: path


Reads data from BigQuery.

Exactly one of table or query must be set. If query is set, neither row_restriction nor fields should be set.


  • query string (Optional) : The SQL query to be executed to read from the BigQuery table.

  • table string (Optional) : The fully-qualified name of the BigQuery table to read from. Format: [${PROJECT}:]${DATASET}.${TABLE}

  • fields Array[string] (Optional) : Read only the specified fields (columns) from a BigQuery table. Fields may not be returned in the order specified. If no value is specified, then all fields are returned. Example: "col1, col2, col3"

  • row_restriction string (Optional) : Read only rows that match this filter, which must be compatible with Google standard SQL. This is not supported when reading via query.


type: ReadFromBigQuery
config:
  query: "query"
  table: "table"
  fields:
  - "field"
  - "field"
  - ...
  row_restriction: "row_restriction"
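
The constraints on query, table, fields, and row_restriction can be written down as a small check. This sketch only encodes the rules stated above. The validate_read_config helper is hypothetical and is not part of Beam.

```python
def validate_read_config(query=None, table=None, fields=None,
                         row_restriction=None):
    """Check the constraints stated for the read parameters."""
    # Exactly one of table or query must be set.
    if (query is None) == (table is None):
        raise ValueError("Exactly one of table or query must be set.")
    # fields and row_restriction only apply when reading from a table.
    if query is not None and (fields or row_restriction):
        raise ValueError(
            "fields and row_restriction cannot be combined with query.")
    return True


print(validate_read_config(table="dataset.table", fields=["col1", "col2"]))
```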


Writes data to BigQuery using the Storage Write API.

This expects a single PCollection of Beam Rows and outputs two dead-letter queues (DLQ) that contain failed rows. The first DLQ has tag [FailedRows] and contains the failed rows. The second DLQ has tag [FailedRowsWithErrors] and contains the failed rows along with their respective errors.


  • table string : The bigquery table to write to. Format: [${PROJECT}:]${DATASET}.${TABLE}

  • create_disposition string (Optional) : Optional field that specifies whether the job is allowed to create new tables. The following values are supported: CREATE_IF_NEEDED (the job may create the table), CREATE_NEVER (the job must fail if the table does not exist already).

  • write_disposition string (Optional) : Specifies the action that occurs if the destination table already exists. The following values are supported: WRITE_TRUNCATE (overwrites the table data), WRITE_APPEND (append the data to the table), WRITE_EMPTY (job must fail if the table is not empty).

  • error_handling Row (Optional) : This option specifies whether and where to output unwritable rows.

    Row fields:

    • output string : Name to use for the output error collection

  • num_streams int32 (Optional) : Specifies the number of write streams that the Storage API sink will use. This parameter is only applicable when writing unbounded data.


type: WriteToBigQuery
input: ...
config:
  table: "table"
  create_disposition: "create_disposition"
  write_disposition: "write_disposition"
  error_handling:
    output: "output"
  num_streams: num_streams


A PTransform for reading comma-separated values (csv) files into a PCollection.


  • path string : The file path to read from. The path can contain glob characters such as * and ?.

  • delimiter ? (Optional)

  • comment ? (Optional)


type: ReadFromCsv
config:
  path: "path"
  delimiter: delimiter
  comment: comment


A PTransform for writing a schema'd PCollection as a (set of) comma-separated values (csv) files.


  • path string : The file path to write to. The files written will begin with this prefix, followed by a shard identifier (see num_shards) according to the file_naming parameter.

  • delimiter ? (Optional)


type: WriteToCsv
input: ...
config:
  delimiter: "delimiter"
  path: "path"


Reads an Apache Iceberg table.

See also the Apache Iceberg Beam documentation.


  • table string : The identifier of the Apache Iceberg table. Example: "db.table1".

  • catalog_name string (Optional) : The name of the catalog. Example: "local".

  • catalog_properties Map[string, string] (Optional) : A map of configuration properties for the Apache Iceberg catalog. The required properties depend on the catalog. For more information, see CatalogUtil in the Apache Iceberg documentation.

  • config_properties Map[string, string] (Optional) : An optional set of Hadoop configuration properties. For more information, see CatalogUtil in the Apache Iceberg documentation.


type: ReadFromIceberg
config:
  table: "table"
  catalog_name: "catalog_name"
  catalog_properties:
    a: "catalog_properties_value_a"
    b: "catalog_properties_value_b"
    c: ...
  config_properties:
    a: "config_properties_value_a"
    b: "config_properties_value_b"
    c: ...


Writes to an Apache Iceberg table.

See also the Apache Iceberg Beam documentation including the dynamic destinations section for use of the keep, drop, and only parameters.


  • table string : The identifier of the Apache Iceberg table. Example: "db.table1".

  • catalog_name string (Optional) : The name of the catalog. Example: "local".

  • catalog_properties Map[string, string] (Optional) : A map of configuration properties for the Apache Iceberg catalog. The required properties depend on the catalog. For more information, see CatalogUtil in the Apache Iceberg documentation.

  • config_properties Map[string, string] (Optional) : An optional set of Hadoop configuration properties. For more information, see CatalogUtil in the Apache Iceberg documentation.

  • triggering_frequency_seconds int64 (Optional) : For streaming write pipelines, the frequency at which the sink attempts to produce snapshots, in seconds.

  • keep Array[string] (Optional) : An optional list of field names to keep when writing to the destination. Other fields are dropped. Mutually exclusive with drop and only.

  • drop Array[string] (Optional) : An optional list of field names to drop before writing to the destination. Mutually exclusive with keep and only.

  • only string (Optional) : The name of exactly one field to keep as the top level record when writing to the destination. All other fields are dropped. This field must be of row type. Mutually exclusive with drop and keep.


type: WriteToIceberg
input: ...
config:
  table: "table"
  catalog_name: "catalog_name"
  catalog_properties:
    a: "catalog_properties_value_a"
    b: "catalog_properties_value_b"
    c: ...
  config_properties:
    a: "config_properties_value_a"
    b: "config_properties_value_b"
    c: ...
  triggering_frequency_seconds: triggering_frequency_seconds
  keep:
  - "keep"
  - "keep"
  - ...
  drop:
  - "drop"
  - "drop"
  - ...
  only: "only"
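
The effect of the mutually exclusive keep, drop, and only parameters on a single record can be sketched in plain Python. This illustrates the parameter descriptions above and is not the sink's implementation. The select_fields helper and the sample record are hypothetical, and rows are modeled as dicts.

```python
def select_fields(row, keep=None, drop=None, only=None):
    """Apply at most one of keep, drop, or only to a dict row."""
    if sum(option is not None for option in (keep, drop, only)) > 1:
        raise ValueError("keep, drop, and only are mutually exclusive.")
    if keep is not None:
        return {name: row[name] for name in keep}
    if drop is not None:
        return {name: value for name, value in row.items()
                if name not in drop}
    if only is not None:
        # The chosen field must itself be a row; it becomes the record.
        return dict(row[only])
    return dict(row)


record = {"id": 1, "dest": "t1", "payload": {"x": 1, "y": 2}}
print(select_fields(record, keep=["id"]))
print(select_fields(record, drop=["dest"]))
print(select_fields(record, only="payload"))
```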


Read from a JDBC source using a SQL query or by directly accessing a single table.

This transform can be used to read from a JDBC source using either a given JDBC driver jar and class name, or by using one of the default packaged drivers given a jdbc_type.

Using a default driver

This transform comes packaged with drivers for several popular JDBC distributions. The following distributions can be declared as the jdbc_type: mysql, oracle, postgres, mssql.

For example, reading a MySQL source using a SQL query:

- type: ReadFromJdbc
  config:
    jdbc_type: mysql
    url: "jdbc:mysql://my-host:3306/database"
    query: "SELECT * FROM table"

Note: See the following transforms which are built on top of this transform and simplify this logic for several popular JDBC distributions:

Declaring custom JDBC drivers

If reading from a JDBC source not listed above, or if it is necessary to use a custom driver not packaged with Beam, one must define a JDBC driver and class name.

For example, reading a MySQL source table:

- type: ReadFromJdbc
  config:
    driver_jars: "path/to/some/jdbc.jar"
    driver_class_name: "com.mysql.jdbc.Driver"
    url: "jdbc:mysql://my-host:3306/database"
    table: "my-table"

Connection Properties

Connection properties are properties sent to the Driver used to connect to the JDBC source. For example, to set the character encoding to UTF-8, one could write:

- type: ReadFromJdbc
  config:
    connection_properties: "characterEncoding=UTF-8;"

All properties should be semicolon-delimited (e.g. "key1=value1;key2=value2;").
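
When the properties live in a mapping, the delimited string can be built programmatically. The helper below is a hypothetical convenience, not part of Beam, and it assumes that keys and values contain no ';' or '=' characters.

```python
def format_connection_properties(properties):
    """Render a dict as a 'key1=value1;key2=value2;' string."""
    return "".join(f"{key}={value};" for key, value in properties.items())


props = format_connection_properties(
    {"characterEncoding": "UTF-8", "useSSL": "false"})
print(props)
```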


  • url string : Connection URL for the JDBC source.

  • connection_init_sql Array[string] (Optional) : Sets the connection init sql statements used by the Driver. Only MySQL and MariaDB support this.

  • connection_properties string (Optional) : Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be "key1=value1;key2=value2;".

  • disable_auto_commit boolean (Optional) : Whether to disable auto commit on read. Defaults to true if not provided. The need for this config varies depending on the database platform. Informix requires this to be set to false while Postgres requires this to be set to true.

  • driver_class_name string (Optional) : Name of a Java Driver class to use to connect to the JDBC source. For example, "com.mysql.jdbc.Driver".

  • driver_jars string (Optional) : Comma separated path(s) for the JDBC driver jar(s). This can be a local path or GCS (gs://) path.

  • fetch_size int32 (Optional) : Overrides the size of the data that is fetched and loaded into memory per database call. It should ONLY be used if the default value throws memory errors.

  • output_parallelization boolean (Optional) : Whether to reshuffle the resulting PCollection so results are distributed to all workers.

  • password string (Optional) : Password for the JDBC source.

  • query string (Optional) : SQL query used to query the JDBC source.

  • table string (Optional) : Name of the table to read from.

  • type string (Optional) : Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql.

  • username string (Optional) : Username for the JDBC source.


type: ReadFromJdbc
config:
  url: "url"
  connection_init_sql:
  - "connection_init_sql"
  - "connection_init_sql"
  - ...
  connection_properties: "connection_properties"
  disable_auto_commit: true|false
  driver_class_name: "driver_class_name"
  driver_jars: "driver_jars"
  fetch_size: fetch_size
  output_parallelization: true|false
  password: "password"
  query: "query"
  table: "table"
  type: "type"
  username: "username"


Write to a JDBC sink using a SQL query or by directly accessing a single table.

This transform can be used to write to a JDBC sink using either a given JDBC driver jar and class name, or by using one of the default packaged drivers given a jdbc_type.

Using a default driver

This transform comes packaged with drivers for several popular JDBC distributions. The following distributions can be declared as the jdbc_type: mysql, oracle, postgres, mssql.

For example, writing to a MySQL sink using a SQL query:

- type: WriteToJdbc
  config:
    jdbc_type: mysql
    url: "jdbc:mysql://my-host:3306/database"
    query: "INSERT INTO table VALUES(?, ?)"

Note: See the following transforms, which are built on top of this transform and simplify this logic for several popular JDBC distributions: WriteToMySql, WriteToOracle, WriteToPostgres, and WriteToSqlServer.

Declaring custom JDBC drivers

If writing to a JDBC sink not listed above, or if it is necessary to use a custom driver not packaged with Beam, one must define a JDBC driver and class name.

For example, writing to a MySQL table:

- type: WriteToJdbc
  config:
    driver_jars: "path/to/some/jdbc.jar"
    driver_class_name: "com.mysql.jdbc.Driver"
    url: "jdbc:mysql://my-host:3306/database"
    table: "my-table"

Connection Properties

Connection properties are properties sent to the Driver used to connect to the JDBC source. For example, to set the character encoding to UTF-8, one could write:

- type: WriteToJdbc
  config:
    connection_properties: "characterEncoding=UTF-8;"

All properties should be semicolon-delimited (e.g. "key1=value1;key2=value2;").
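
A slightly fuller sketch of the same idea, assuming a MySQL sink: two driver properties are passed in one semicolon-delimited string, while the username and password use their own standalone parameters. The property names characterEncoding and connectTimeout are MySQL Connector/J settings chosen for illustration, and all other values are placeholders.

```yaml
# Sketch only: placeholder host, credentials, and table.
- type: WriteToJdbc
  config:
    url: "jdbc:mysql://my-host:3306/database"
    driver_jars: "path/to/some/jdbc.jar"
    driver_class_name: "com.mysql.jdbc.Driver"
    table: "my-table"
    # Credentials have standalone parameters, so they stay out of
    # connection_properties.
    username: "my-user"
    password: "my-password"
    # Each key=value pair ends with a semicolon, including the last one.
    connection_properties: "characterEncoding=UTF-8;connectTimeout=5000;"
```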


  • url string : Connection URL for the JDBC sink.

  • auto_sharding boolean (Optional) : If true, enables using a dynamically determined number of shards to write.

  • connection_init_sql Array[string] (Optional) : Sets the connection init sql statements used by the Driver. Only MySQL and MariaDB support this.

  • connection_properties string (Optional) : Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be "key1=value1;key2=value2;".

  • driver_class_name string (Optional) : Name of a Java Driver class to use to connect to the JDBC source. For example, "com.mysql.jdbc.Driver".

  • driver_jars string (Optional) : Comma separated path(s) for the JDBC driver jar(s). This can be a local path or GCS (gs://) path.

  • password string (Optional) : Password for the JDBC source.

  • table string (Optional) : Name of the table to write to.

  • batch_size int64 (Optional)

  • type string (Optional) : Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql.

  • username string (Optional) : Username for the JDBC source.

  • query string (Optional) : SQL query used to insert records into the JDBC sink.


type: WriteToJdbc
input: ...
config:
  url: "url"
  auto_sharding: true|false
  connection_init_sql:
  - "connection_init_sql"
  - "connection_init_sql"
  - ...
  connection_properties: "connection_properties"
  driver_class_name: "driver_class_name"
  driver_jars: "driver_jars"
  password: "password"
  table: "table"
  batch_size: batch_size
  type: "type"
  username: "username"
  query: "query"


A PTransform for reading JSON values from files into a PCollection.


  • path string : The file path to read from. The path can contain glob characters such as * and ?.


type: ReadFromJson
config:
  path: "path"


A PTransform for writing a PCollection as JSON values to files.


  • path string : The file path to write to. The files written will begin with this prefix, followed by a shard identifier (see num_shards) according to the file_naming parameter.


type: WriteToJson
input: ...
config:
  path: "path"
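
To show the two JSON transforms working together, here is a minimal sketch of a complete pipeline that reads every file matching a glob and writes the records back out under a new prefix. The paths are placeholders, and the chain pipeline type is used so that each transform implicitly takes the previous one as its input.

```yaml
# Sketch only: placeholder paths.
pipeline:
  type: chain
  transforms:
    - type: ReadFromJson
      config:
        # The glob matches input-0.json, input-1.json, and so on.
        path: "/path/to/input-*.json"
    - type: WriteToJson
      config:
        # Output files begin with this prefix, followed by a shard identifier.
        path: "/path/to/output"
```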



  • schema string (Optional) : The schema in which the data is encoded in the Kafka topic. For AVRO data, this is a schema defined with AVRO schema syntax. For JSON data, this is a schema defined with JSON-schema syntax. If a URL to Confluent Schema Registry is provided, then this field is ignored, and the schema is fetched from Confluent Schema Registry.

  • consumer_config Map[string, string] (Optional) : A list of key-value pairs that act as configuration parameters for Kafka consumers. Most of these configurations will not be needed, but if you need to customize your Kafka consumer, you may use this. See the Kafka consumer configuration documentation for a detailed list.

  • format string (Optional) : The encoding format for the data stored in Kafka. Valid options are: RAW, AVRO, JSON, PROTO.

  • topic string

  • bootstrap_servers string : A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,...

  • confluent_schema_registry_url string (Optional)

  • confluent_schema_registry_subject string (Optional)

  • auto_offset_reset_config string (Optional) : What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server. (1) earliest: automatically reset the offset to the earliest offset. (2) latest: automatically reset the offset to the latest offset. (3) none: throw an exception to the consumer if no previous offset is found for the consumer's group.

  • error_handling Row (Optional) : This option specifies whether and where to output unwritable rows.

    Row fields:

    • output string : Name to use for the output error collection
  • file_descriptor_path string (Optional) : The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization.

  • message_name string (Optional) : The name of the Protocol Buffer message to be used for schema extraction and data conversion.


type: ReadFromKafka
config:
  schema: "schema"
  consumer_config:
    a: "consumer_config_value_a"
    b: "consumer_config_value_b"
    c: ...
  format: "format"
  topic: "topic"
  bootstrap_servers: "bootstrap_servers"
  confluent_schema_registry_url: "confluent_schema_registry_url"
  confluent_schema_registry_subject: "confluent_schema_registry_subject"
  auto_offset_reset_config: "auto_offset_reset_config"
  error_handling:
    output: "output"
  file_descriptor_path: "file_descriptor_path"
  message_name: "message_name"
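
For a more concrete picture, the sketch below reads JSON records from a Kafka topic. The topic, broker addresses, field names, and consumer group are all hypothetical; the schema is a JSON-schema document passed as a string, and group.id is a standard Kafka consumer setting used here only as an example of consumer_config.

```yaml
# Sketch only: placeholder topic, brokers, schema fields, and group id.
- type: ReadFromKafka
  name: ReadEvents
  config:
    topic: "events"
    bootstrap_servers: "host1:9092,host2:9092"
    format: "JSON"
    # JSON-schema describing the expected payload, given as a string.
    schema: |
      {
        "type": "object",
        "properties": {
          "user": {"type": "string"},
          "count": {"type": "integer"}
        }
      }
    # Start from the earliest offset when no committed offset exists.
    auto_offset_reset_config: "earliest"
    consumer_config:
      group.id: "my-consumer-group"
    # Rows that fail are emitted to a separate output named bad_records.
    error_handling:
      output: bad_records
```

With this configuration, a downstream transform could consume the error collection by setting its input to ReadEvents.bad_records.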



  • format string : The encoding format for the data stored in Kafka. Valid options are: RAW, JSON, AVRO, PROTO.

  • topic string

  • bootstrap_servers string : A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,...

  • producer_config_updates Map[string, string] (Optional) : A list of key-value pairs that act as configuration parameters for Kafka producers. Most of these configurations will not be needed, but if you need to customize your Kafka producer, you may use this. See the Kafka producer configuration documentation for a detailed list.

  • error_handling Row (Optional) : This option specifies whether and where to output unwritable rows.

    Row fields:

    • output string : Name to use for the output error collection
  • file_descriptor_path string (Optional) : The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization.

  • message_name string (Optional) : The name of the Protocol Buffer message to be used for schema extraction and data conversion.

  • schema string (Optional)


type: WriteToKafka
input: ...
config:
  format: "format"
  topic: "topic"
  bootstrap_servers: "bootstrap_servers"
  producer_config_updates:
    a: "producer_config_updates_value_a"
    b: "producer_config_updates_value_b"
    c: ...
  error_handling:
    output: "output"
  file_descriptor_path: "file_descriptor_path"
  message_name: "message_name"
  schema: "schema"
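
As a hedged example of customizing the producer, the sketch below writes JSON records to a topic and overrides two standard Kafka producer settings, acks and compression.type. The upstream transform name, topic, and broker addresses are placeholders.

```yaml
# Sketch only: placeholder input, topic, and brokers.
- type: WriteToKafka
  input: SomeTransform   # hypothetical upstream transform
  config:
    format: "JSON"
    topic: "my-topic"
    bootstrap_servers: "host1:9092,host2:9092"
    producer_config_updates:
      acks: "all"
      compression.type: "gzip"
    # Rows that cannot be written go to a separate output named unwritable.
    error_handling:
      output: unwritable
```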


Read from a MySQL source using a SQL query or by directly accessing a single table.

This is a special case of ReadFromJdbc that includes the necessary MySQL Driver and classes.

An example of using ReadFromMySql with SQL query:

- type: ReadFromMySql
  config:
    url: "jdbc:mysql://my-host:3306/database"
    query: "SELECT * FROM table"

It is also possible to read a table by specifying a table name. For example, the following configuration will perform a read on an entire table:

- type: ReadFromMySql
  config:
    url: "jdbc:mysql://my-host:3306/database"
    table: "my-table"

Advanced Usage

It might be necessary to use a custom JDBC driver that is not packaged with this transform. If that is the case, see ReadFromJdbc which allows for more custom configuration.


  • url string : Connection URL for the JDBC source.

  • connection_init_sql Array[string] (Optional) : Sets the connection init sql statements used by the Driver. Only MySQL and MariaDB support this.

  • connection_properties string (Optional) : Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be "key1=value1;key2=value2;".

  • disable_auto_commit boolean (Optional) : Whether to disable auto commit on read. Defaults to true if not provided. The need for this config varies depending on the database platform. Informix requires this to be set to false while Postgres requires this to be set to true.

  • fetch_size int32 (Optional) : Overrides the size of the data fetched and loaded into memory per database call. It should ONLY be used if the default value throws memory errors.

  • output_parallelization boolean (Optional) : Whether to reshuffle the resulting PCollection so results are distributed to all workers.

  • password string (Optional) : Password for the JDBC source.

  • query string (Optional) : SQL query used to query the JDBC source.

  • table string (Optional) : Name of the table to read from.

  • username string (Optional) : Username for the JDBC source.


type: ReadFromMySql
config:
  url: "url"
  connection_init_sql:
  - "connection_init_sql"
  - "connection_init_sql"
  - ...
  connection_properties: "connection_properties"
  disable_auto_commit: true|false
  fetch_size: fetch_size
  output_parallelization: true|false
  password: "password"
  query: "query"
  table: "table"
  username: "username"
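
Because connection_init_sql is supported only by MySQL and MariaDB, ReadFromMySql is a natural place to illustrate it. The sketch below runs two session-level statements on each new connection before reading a table; the statements, host, and credentials are examples rather than recommendations.

```yaml
# Sketch only: placeholder host, credentials, and table.
- type: ReadFromMySql
  config:
    url: "jdbc:mysql://my-host:3306/database"
    username: "my-user"
    password: "my-password"
    table: "my-table"
    # Statements executed by the driver when a connection is initialized.
    connection_init_sql:
      - "SET time_zone = '+00:00'"
      - "SET NAMES utf8mb4"
```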


Write to a MySQL sink using a SQL query or by directly accessing a single table.

This is a special case of WriteToJdbc that includes the necessary MySQL Driver and classes.

An example of using WriteToMySql with SQL query:

- type: WriteToMySql
  config:
    url: "jdbc:mysql://my-host:3306/database"
    query: "INSERT INTO table VALUES(?, ?)"

It is also possible to write to a table by specifying a table name. For example, the following configuration will write to the given table:

- type: WriteToMySql
  config:
    url: "jdbc:mysql://my-host:3306/database"
    table: "my-table"

Advanced Usage

It might be necessary to use a custom JDBC driver that is not packaged with this transform. If that is the case, see WriteToJdbc which allows for more custom configuration.


  • url string : Connection URL for the JDBC sink.

  • auto_sharding boolean (Optional) : If true, enables using a dynamically determined number of shards to write.

  • connection_init_sql Array[string] (Optional) : Sets the connection init sql statements used by the Driver. Only MySQL and MariaDB support this.

  • connection_properties string (Optional) : Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be "key1=value1;key2=value2;".

  • password string (Optional) : Password for the JDBC source.

  • table string (Optional) : Name of the table to write to.

  • batch_size int64 (Optional)

  • username string (Optional) : Username for the JDBC source.

  • query string (Optional) : SQL query used to insert records into the JDBC sink.


type: WriteToMySql
input: ...
config:
  url: "url"
  auto_sharding: true|false
  connection_init_sql:
  - "connection_init_sql"
  - "connection_init_sql"
  - ...
  connection_properties: "connection_properties"
  password: "password"
  table: "table"
  batch_size: batch_size
  username: "username"
  query: "query"


Read from an Oracle source using a SQL query or by directly accessing a single table.

This is a special case of ReadFromJdbc that includes the necessary Oracle Driver and classes.

An example of using ReadFromOracle with SQL query:

- type: ReadFromOracle
  config:
    url: "jdbc:oracle://my-host:1521/database"
    query: "SELECT * FROM table"

It is also possible to read a table by specifying a table name. For example, the following configuration will perform a read on an entire table:

- type: ReadFromOracle
  config:
    url: "jdbc:oracle://my-host:1521/database"
    table: "my-table"

Advanced Usage

It might be necessary to use a custom JDBC driver that is not packaged with this transform. If that is the case, see ReadFromJdbc which allows for more custom configuration.


  • url string : Connection URL for the JDBC source.

  • connection_properties string (Optional) : Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be "key1=value1;key2=value2;".

  • disable_auto_commit boolean (Optional) : Whether to disable auto commit on read. Defaults to true if not provided. The need for this config varies depending on the database platform. Informix requires this to be set to false while Postgres requires this to be set to true.

  • fetch_size int32 (Optional) : Overrides the size of the data fetched and loaded into memory per database call. It should ONLY be used if the default value throws memory errors.

  • output_parallelization boolean (Optional) : Whether to reshuffle the resulting PCollection so results are distributed to all workers.

  • password string (Optional) : Password for the JDBC source.

  • query string (Optional) : SQL query used to query the JDBC source.

  • table string (Optional) : Name of the table to read from.

  • username string (Optional) : Username for the JDBC source.


type: ReadFromOracle
config:
  url: "url"
  connection_properties: "connection_properties"
  disable_auto_commit: true|false
  fetch_size: fetch_size
  output_parallelization: true|false
  password: "password"
  query: "query"
  table: "table"
  username: "username"


Write to an Oracle sink using a SQL query or by directly accessing a single table.

This is a special case of WriteToJdbc that includes the necessary Oracle Driver and classes.

An example of using WriteToOracle with SQL query:

- type: WriteToOracle
  config:
    url: "jdbc:oracle://my-host:1521/database"
    query: "INSERT INTO table VALUES(?, ?)"

It is also possible to write to a table by specifying a table name. For example, the following configuration will write to the given table:

- type: WriteToOracle
  config:
    url: "jdbc:oracle://my-host:1521/database"
    table: "my-table"

Advanced Usage

It might be necessary to use a custom JDBC driver that is not packaged with this transform. If that is the case, see WriteToJdbc which allows for more custom configuration.


  • url string : Connection URL for the JDBC sink.

  • auto_sharding boolean (Optional) : If true, enables using a dynamically determined number of shards to write.

  • connection_properties string (Optional) : Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be "key1=value1;key2=value2;".

  • password string (Optional) : Password for the JDBC source.

  • table string (Optional) : Name of the table to write to.

  • batch_size int64 (Optional)

  • username string (Optional) : Username for the JDBC source.

  • query string (Optional) : SQL query used to insert records into the JDBC sink.


type: WriteToOracle
input: ...
config:
  url: "url"
  auto_sharding: true|false
  connection_properties: "connection_properties"
  password: "password"
  table: "table"
  batch_size: batch_size
  username: "username"
  query: "query"


A PTransform for reading Parquet files.


  • path ? (Optional)


type: ReadFromParquet
config:
  path: path


A PTransform for writing Parquet files.


  • path ? (Optional)


type: WriteToParquet
input: ...
config:
  path: path


Read from a Postgres source using a SQL query or by directly accessing a single table.

This is a special case of ReadFromJdbc that includes the necessary Postgres Driver and classes.

An example of using ReadFromPostgres with SQL query:

- type: ReadFromPostgres
  config:
    url: "jdbc:postgresql://my-host:5432/database"
    query: "SELECT * FROM table"

It is also possible to read a table by specifying a table name. For example, the following configuration will perform a read on an entire table:

- type: ReadFromPostgres
  config:
    url: "jdbc:postgresql://my-host:5432/database"
    table: "my-table"

Advanced Usage

It might be necessary to use a custom JDBC driver that is not packaged with this transform. If that is the case, see ReadFromJdbc which allows for more custom configuration.


  • url string : Connection URL for the JDBC source.

  • connection_properties string (Optional) : Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be "key1=value1;key2=value2;".

  • disable_auto_commit boolean (Optional) : Whether to disable auto commit on read. Defaults to true if not provided. The need for this config varies depending on the database platform. Informix requires this to be set to false while Postgres requires this to be set to true.

  • fetch_size int32 (Optional) : Overrides the size of the data fetched and loaded into memory per database call. It should ONLY be used if the default value throws memory errors.

  • output_parallelization boolean (Optional) : Whether to reshuffle the resulting PCollection so results are distributed to all workers.

  • password string (Optional) : Password for the JDBC source.

  • query string (Optional) : SQL query used to query the JDBC source.

  • table string (Optional) : Name of the table to read from.

  • username string (Optional) : Username for the JDBC source.


type: ReadFromPostgres
config:
  url: "url"
  connection_properties: "connection_properties"
  disable_auto_commit: true|false
  fetch_size: fetch_size
  output_parallelization: true|false
  password: "password"
  query: "query"
  table: "table"
  username: "username"
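
The following sketch shows one way the Postgres reader might be combined with a file sink: it runs a query and writes the result as Parquet files. The host, credentials, query, and output path are placeholders. disable_auto_commit is set explicitly only to make the Postgres requirement described above visible; it already defaults to true.

```yaml
# Sketch only: placeholder host, credentials, query, and path.
pipeline:
  type: chain
  transforms:
    - type: ReadFromPostgres
      config:
        url: "jdbc:postgresql://my-host:5432/database"
        username: "my-user"
        password: "my-password"
        query: "SELECT id, name FROM my_table"
        # Postgres requires auto commit to be disabled on read.
        disable_auto_commit: true
        # Redistribute the rows across workers before writing.
        output_parallelization: true
    - type: WriteToParquet
      config:
        path: "/path/to/output"
```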


Write to a Postgres sink using a SQL query or by directly accessing a single table.

This is a special case of WriteToJdbc that includes the necessary Postgres Driver and classes.

An example of using WriteToPostgres with SQL query:

- type: WriteToPostgres
  config:
    url: "jdbc:postgresql://my-host:5432/database"
    query: "INSERT INTO table VALUES(?, ?)"

It is also possible to write to a table by specifying a table name. For example, the following configuration will write to the given table:

- type: WriteToPostgres
  config:
    url: "jdbc:postgresql://my-host:5432/database"
    table: "my-table"

Advanced Usage

It might be necessary to use a custom JDBC driver that is not packaged with this transform. If that is the case, see WriteToJdbc which allows for more custom configuration.


  • url string : Connection URL for the JDBC sink.

  • auto_sharding boolean (Optional) : If true, enables using a dynamically determined number of shards to write.

  • connection_properties string (Optional) : Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be "key1=value1;key2=value2;".

  • password string (Optional) : Password for the JDBC source.

  • table string (Optional) : Name of the table to write to.

  • batch_size int64 (Optional)

  • username string (Optional) : Username for the JDBC source.

  • query string (Optional) : SQL query used to insert records into the JDBC sink.


type: WriteToPostgres
input: ...
config:
  url: "url"
  auto_sharding: true|false
  connection_properties: "connection_properties"
  password: "password"
  table: "table"
  batch_size: batch_size
  username: "username"
  query: "query"


Reads messages from Cloud Pub/Sub.


  • topic string (Optional) : Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>". If provided, subscription must be None.

  • subscription string (Optional) : Existing Cloud Pub/Sub subscription to use in the form "projects/<project>/subscriptions/<subscription>". If not specified, a temporary subscription will be created from the specified topic. If provided, topic must be None.

  • format string : The expected format of the message payload. Currently supported formats are:

    • RAW: Produces records with a single payload field whose contents are the raw bytes of the pubsub message.
    • AVRO: Parses records with a given Avro schema.
    • JSON: Parses records with a given JSON schema.
  • schema ? (Optional) : Schema specification for the given format.

  • attributes Array[string] (Optional) : List of attribute keys whose values will be flattened into the output message as additional fields. For example, if the format is raw and attributes is ["a", "b"] then this read will produce elements of the form Row(payload=..., a=..., b=...).

  • attributes_map string (Optional)

  • id_attribute string (Optional) : The attribute on incoming Pub/Sub messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for deduplication of messages. If not provided, we cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be strictly best effort.

  • timestamp_attribute string (Optional) : Message value to use as element timestamp. If None, uses message publishing time as the timestamp.

    Timestamp values should be in one of two formats:

    • A numerical value representing the number of milliseconds since the Unix epoch.
    • A string in RFC 3339 format, UTC timezone. Example: 2015-10-29T23:41:41.123Z. The sub-second component of the timestamp is optional, and digits beyond the first three (i.e., time units smaller than milliseconds) may be ignored.
  • error_handling Row (Optional) : This option specifies whether and where to output error rows.

    Row fields:

    • output string : Name to use for the output error collection


type: ReadFromPubSub
config:
  topic: "topic"
  subscription: "subscription"
  format: "format"
  schema: schema
  attributes:
  - "attribute"
  - "attribute"
  - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"
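
As a sketch of a typical configuration, the example below reads JSON messages from an existing subscription, so topic is left unset. The project, subscription, field names, and attribute names are all hypothetical, and the schema is written as a JSON schema expressed in YAML.

```yaml
# Sketch only: placeholder project, subscription, fields, and attributes.
- type: ReadFromPubSub
  name: ReadMessages
  config:
    subscription: "projects/my-project/subscriptions/my-subscription"
    format: "JSON"
    schema:
      type: object
      properties:
        user: {type: string}
        count: {type: integer}
    # The value of the "region" attribute becomes an extra output field.
    attributes: ["region"]
    # Attribute used to deduplicate messages.
    id_attribute: "message_id"
    # Attribute holding the event time (epoch milliseconds or RFC 3339).
    timestamp_attribute: "event_time"
    # Messages that fail to parse go to a separate output.
    error_handling:
      output: parse_errors
```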


Writes messages to Cloud Pub/Sub.


  • topic string : Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>".

  • format string : How to format the message payload. Currently supported formats are:

    • RAW: Expects a message with a single field (excluding attribute-related fields) whose contents are used as the raw bytes of the pubsub message.
    • AVRO: Encodes records with a given Avro schema, which may be inferred from the input PCollection schema.
    • JSON: Formats records with a given JSON schema, which may be inferred from the input PCollection schema.
  • schema ? (Optional) : Schema specification for the given format.

  • attributes Array[string] (Optional) : List of attribute keys whose values will be pulled out as PubSub message attributes. For example, if the format is raw and attributes is ["a", "b"] then elements of the form Row(any_field=..., a=..., b=...) will result in PubSub messages whose payload has the contents of any_field and whose attribute will be populated with the values of a and b.

  • attributes_map string (Optional)

  • id_attribute string (Optional) : If set, will set an attribute for each Cloud Pub/Sub message with the given name and a unique value. This attribute can then be used in a ReadFromPubSub PTransform to deduplicate messages.

  • timestamp_attribute string (Optional) : If set, will set an attribute for each Cloud Pub/Sub message with the given name and the message's publish time as the value.

  • error_handling Row (Optional) : This option specifies whether and where to output error rows.

    Row fields:

    • output string : Name to use for the output error collection


type: WriteToPubSub
input: ...
config:
  topic: "topic"
  format: "format"
  schema: schema
  attributes:
  - "attribute"
  - "attribute"
  - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"
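
The write side can be sketched in the same way. In the example below, which assumes input rows that carry a region field alongside their payload fields, region is pulled out as a message attribute and the remaining fields are encoded as JSON. The upstream transform name, project, topic, and attribute names are placeholders.

```yaml
# Sketch only: placeholder input, project, topic, and attribute names.
- type: WriteToPubSub
  input: SomeTransform   # hypothetical upstream transform
  config:
    topic: "projects/my-project/topics/my-topic"
    format: "JSON"
    # The "region" field is sent as a Pub/Sub attribute, not in the payload.
    attributes: ["region"]
    # Adds an attribute with a unique value that a reader can use to deduplicate.
    id_attribute: "message_id"
```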


Performs a read from Google Pub/Sub Lite.

Note: This provider is deprecated. See Pub/Sub Lite documentation for more information.


  • project string (Optional) : The GCP project where the Pubsub Lite reservation resides. This can be a project number or a project ID.

  • schema string (Optional) : The schema in which the data is encoded in the Pubsub Lite topic. For AVRO data, this is a schema defined with AVRO schema syntax. For JSON data, this is a schema defined with JSON-schema syntax.

  • format string : The encoding format for the data stored in Pubsub Lite. Valid options are: RAW, AVRO, JSON, PROTO.

  • subscription_name string : The name of the subscription to consume data. This will be concatenated with the project and location parameters to build a full subscription path.

  • location string : The region or zone where the Pubsub Lite reservation resides.

  • attributes Array[string] (Optional) : List of attribute keys whose values will be flattened into the output message as additional fields. For example, if the format is RAW and attributes is ["a", "b"] then this read will produce elements of the form Row(payload=..., a=..., b=...).

  • attribute_map string (Optional) : Name of a field in which to store the full set of attributes associated with this message. For example, if the format is RAW and attribute_map is set to "attrs" then this read will produce elements of the form Row(payload=..., attrs=...) where attrs is a Map type of string to string. If both attributes and attribute_map are set, the overlapping attribute values will be present in both the flattened structure and the attribute map.

  • attribute_id string (Optional) : The attribute on incoming Pubsub Lite messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for deduplication of messages. If not provided, we cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be strictly best effort.

  • error_handling Row (Optional) : This option specifies whether and where to output unwritable rows.

    Row fields:

    • output string : Name to use for the output error collection
  • file_descriptor_path string (Optional) : The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization.

  • message_name string (Optional) : The name of the Protocol Buffer message to be used for schema extraction and data conversion.


type: ReadFromPubSubLite
config:
  project: "project"
  schema: "schema"
  format: "format"
  subscription_name: "subscription_name"
  location: "location"
  attributes:
  - "attribute"
  - "attribute"
  - ...
  attribute_map: "attribute_map"
  attribute_id: "attribute_id"
  error_handling:
    output: "output"
  file_descriptor_path: "file_descriptor_path"
  message_name: "message_name"


Performs a write to Google Pub/Sub Lite.

Note: This provider is deprecated. See Pub/Sub Lite documentation for more information.


  • project string : The GCP project where the Pubsub Lite reservation resides. This can be a project number or a project ID.

  • format string : The encoding format for the data stored in Pubsub Lite. Valid options are: RAW, JSON, AVRO, PROTO.

  • topic_name string : The name of the topic to publish data into. This will be concatenated with the project and location parameters to build a full topic path.

  • location string : The region or zone where the Pubsub Lite reservation resides.

  • attributes Array[string] (Optional) : List of attribute keys whose values will be pulled out as Pubsub Lite message attributes. For example, if the format is JSON and attributes is ["a", "b"] then elements of the form Row(any_field=..., a=..., b=...) will result in Pubsub Lite messages whose payload has the contents of any_field and whose attribute will be populated with the values of a and b.

  • attribute_id string (Optional) : If set, will set an attribute for each Pubsub Lite message with the given name and a unique value. This attribute can then be used in a ReadFromPubSubLite PTransform to deduplicate messages.

  • error_handling Row (Optional) : This option specifies whether and where to output unwritable rows.

    Row fields:

    • output string : Name to use for the output error collection
  • file_descriptor_path string (Optional) : The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization.

  • message_name string (Optional) : The name of the Protocol Buffer message to be used for schema extraction and data conversion.

  • schema string (Optional)


type: WriteToPubSubLite
input: ...
config:
  project: "project"
  format: "format"
  topic_name: "topic_name"
  location: "location"
  attributes:
  - "attribute"
  - "attribute"
  - ...
  attribute_id: "attribute_id"
  error_handling:
    output: "output"
  file_descriptor_path: "file_descriptor_path"
  message_name: "message_name"
  schema: "schema"


Performs a bulk read from Google Cloud Spanner using a specified SQL query or by directly accessing a single table and its columns.

Both Query and Read APIs are supported. See more information about reading from Cloud Spanner.

Example configuration for performing a read using a SQL query:

- type: ReadFromSpanner
  config:
    instance: 'my-instance-id'
    database: 'my-database'
    query: 'SELECT * FROM table'

It is also possible to read a table by specifying a table name and a list of columns. For example, the following configuration will read the listed columns of a table:

- type: ReadFromSpanner
  config:
    instance: 'my-instance-id'
    database: 'my-database'
    table: 'my-table'
    columns: ['col1', 'col2']

Additionally, to read using a secondary index, specify the index name:

- type: ReadFromSpanner
  config:
    instance: 'my-instance-id'
    database: 'my-database'
    table: 'my-table'
    index: 'my-index'
    columns: ['col1', 'col2']

Advanced Usage

Reads by default use the PartitionQuery API which enforces some limitations on the type of queries that can be used so that the data can be read in parallel. If the query is not supported by the PartitionQuery API, then you can specify a non-partitioned read by setting batching to false.

For example:

- type: ReadFromSpanner
  config:
    batching: false
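
A fuller sketch of a non-partitioned read follows. It uses the parameter names from this transform's parameter list (project, instance, database), and the project, instance, database, and query are placeholders. The query is only assumed to be one that the PartitionQuery API would reject; whether a given query can be partitioned depends on Cloud Spanner's rules.

```yaml
# Sketch only: placeholder project, instance, database, and query.
- type: ReadFromSpanner
  config:
    project: "my-project"
    instance: "my-instance-id"
    database: "my-database"
    # Assumed here to be incompatible with the PartitionQuery API.
    query: "SELECT * FROM my_table ORDER BY created_at LIMIT 100"
    # Fall back to a single, non-partitioned read.
    batching: false
```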

Note: See SpannerIO for more advanced information.


  • project string (Optional) : Specifies the GCP project ID.

  • instance string : Specifies the Cloud Spanner instance.

  • database string : Specifies the Cloud Spanner database.

  • table string (Optional) : Specifies the Cloud Spanner table.

  • query string (Optional) : Specifies the SQL query to execute.

  • columns Array[string] (Optional) : Specifies the columns to read from the table. This parameter is required when table is specified.

  • index string (Optional) : Specifies the Index to read from. This parameter can only be specified when using table.

  • batching boolean (Optional) : Set to false to disable batching. Useful when using a query that is not compatible with the PartitionQuery API. Defaults to true.


type: ReadFromSpanner
config:
  project: "project"
  instance: "instance"
  database: "database"
  table: "table"
  query: "query"
  columns:
  - "columns"
  - "columns"
  - ...
  index: "index"
  batching: true|false
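
As a rough illustration of the non-partitioned read described under Advanced Usage, the sketch below places ReadFromSpanner in a two-step chain pipeline. The project, instance, database, table, and query are hypothetical placeholders, the aggregate query is only assumed to be one that the PartitionQuery API would reject, and LogForTesting is assumed to be available as a simple sink for inspecting the output.

```yaml
pipeline:
  type: chain
  transforms:
    - type: ReadFromSpanner
      config:
        project: 'my-project-id'      # hypothetical project
        instance: 'my-instance-id'    # hypothetical instance
        database: 'my-database'       # hypothetical database
        # Hypothetical aggregate query, assumed not to be partitionable.
        query: 'SELECT col1, COUNT(*) AS n FROM my_table GROUP BY col1'
        # Fall back to a single, non-partitioned read.
        batching: false
    # Assumed sink: logs each row so the result can be inspected.
    - type: LogForTesting
```

Because a non-partitioned read is not split across workers, it is likely best reserved for queries that genuinely cannot be partitioned.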


Performs a bulk write to a Google Cloud Spanner table.

Example configuration for performing a write to a single table:

- type: WriteToSpanner
  config:
    project: 'my-project-id'
    instance: 'my-instance-id'
    database: 'my-database'
    table: 'my-table'

Note: See SpannerIO for more advanced information.


  • project string (Optional) : Specifies the GCP project.

  • instance string : Specifies the Cloud Spanner instance.

  • database string : Specifies the Cloud Spanner database.

  • table string : Specifies the Cloud Spanner table.

  • error_handling Row (Optional) : Whether and how to handle write errors.

    Row fields:

    • output string : Name to use for the output error collection


type: WriteToSpanner
input: ...
config:
  project: "project"
  instance: "instance"
  database: "database"
  table: "table"
  error_handling:
    output: "output"
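
The error_handling parameter of WriteToSpanner can be sketched as follows. This assumes the usual Beam YAML convention that a named error output is consumed downstream as TransformName.output_name. The sample rows, the output name failed_writes, the WriteToJson sink, and all paths and identifiers are hypothetical.

```yaml
pipeline:
  transforms:
    - type: Create
      name: SampleRows
      config:
        elements:
          # Hypothetical rows; fields must match the target table's columns.
          - {id: 1, name: "alice"}
          - {id: 2, name: "bob"}
    - type: WriteToSpanner
      input: SampleRows
      config:
        project: 'my-project-id'
        instance: 'my-instance-id'
        database: 'my-database'
        table: 'my-table'
        error_handling:
          # Hypothetical name for the collection of failed writes.
          output: failed_writes
    # Assumed sink for the failed rows; any other writer could be used.
    - type: WriteToJson
      input: WriteToSpanner.failed_writes
      config:
        path: "/path/to/spanner_write_errors"
```

Routing failures to a separate output in this way lets the rest of the pipeline continue instead of failing on the first bad record.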


Read from a SQL Server source using a SQL query or by directly accessing a single table.

This is a special case of ReadFromJdbc that includes the necessary SQL Server Driver and classes.

An example of using ReadFromSqlServer with SQL query:

- type: ReadFromSqlServer
  config:
    url: "jdbc:sqlserver://my-host:1433/database"
    query: "SELECT * FROM table"

It is also possible to read a table by specifying a table name. For example, the following configuration will perform a read on an entire table:

- type: ReadFromSqlServer
  config:
    url: "jdbc:sqlserver://my-host:1433/database"
    table: "my-table"

Advanced Usage

It might be necessary to use a custom JDBC driver that is not packaged with this transform. If that is the case, see ReadFromJdbc which allows for more custom configuration.


  • url string : Connection URL for the JDBC source.

  • connection_properties string (Optional) : Connection properties to pass to the JDBC driver that are not already exposed as standalone parameters (username and password, for example, have their own parameters). The string must have the format "key1=value1;key2=value2;".

  • disable_auto_commit boolean (Optional) : Whether to disable auto commit on read. Defaults to true if not provided. The need for this config varies depending on the database platform. Informix requires this to be set to false while Postgres requires this to be set to true.

  • fetch_size int32 (Optional) : Overrides the size of the data that is fetched and loaded into memory on each database call. It should ONLY be used if the default value causes memory errors.

  • output_parallelization boolean (Optional) : Whether to reshuffle the resulting PCollection so results are distributed to all workers.

  • password string (Optional) : Password for the JDBC source.

  • query string (Optional) : SQL query used to query the JDBC source.

  • table string (Optional) : Name of the table to read from.

  • username string (Optional) : Username for the JDBC source.


type: ReadFromSqlServer
config:
  url: "url"
  connection_properties: "connection_properties"
  disable_auto_commit: true|false
  fetch_size: fetch_size
  output_parallelization: true|false
  password: "password"
  query: "query"
  table: "table"
  username: "username"
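
A fuller ReadFromSqlServer configuration might look like the sketch below, which combines several of the optional parameters listed above. The host, credentials, query, and fetch size are hypothetical values chosen only for illustration, and the connection property is an assumed example of a driver setting.

```yaml
- type: ReadFromSqlServer
  config:
    url: "jdbc:sqlserver://my-host:1433/database"
    # Hypothetical credentials; in practice, avoid committing secrets.
    username: "my-user"
    password: "my-password"
    # Hypothetical query selecting only the columns that are needed.
    query: "SELECT id, name FROM my_table"
    # Only set this if the default fetch size causes memory errors.
    fetch_size: 1000
    # Reshuffle so the results are distributed across all workers.
    output_parallelization: true
    # Assumed driver property, in "key1=value1;key2=value2;" form.
    connection_properties: "encrypt=true;"
```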


Write to a SQL Server sink using a SQL query or by directly accessing a single table.

This is a special case of WriteToJdbc that includes the necessary SQL Server Driver and classes.

An example of using WriteToSqlServer with SQL query:

- type: WriteToSqlServer
  config:
    url: "jdbc:sqlserver://my-host:1433/database"
    query: "INSERT INTO table VALUES(?, ?)"

It is also possible to write to a table by specifying a table name. For example, the following configuration will write the input records to a single table:

- type: WriteToSqlServer
  config:
    url: "jdbc:sqlserver://my-host:1433/database"
    table: "my-table"

Advanced Usage

It might be necessary to use a custom JDBC driver that is not packaged with this transform. If that is the case, see WriteToJdbc which allows for more custom configuration.


  • url string : Connection URL for the JDBC sink.

  • auto_sharding boolean (Optional) : If true, enables using a dynamically determined number of shards to write.

  • connection_properties string (Optional) : Connection properties to pass to the JDBC driver that are not already exposed as standalone parameters (username and password, for example, have their own parameters). The string must have the format "key1=value1;key2=value2;".

  • password string (Optional) : Password for the JDBC sink.

  • table string (Optional) : Name of the table to write to.

  • batch_size int64 (Optional)

  • username string (Optional) : Username for the JDBC sink.

  • query string (Optional) : SQL query used to insert records into the JDBC sink.


type: WriteToSqlServer
input: ...
config:
  url: "url"
  auto_sharding: true|false
  connection_properties: "connection_properties"
  password: "password"
  table: "table"
  batch_size: batch_size
  username: "username"
  query: "query"
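
To show how a parameterized insert query for WriteToSqlServer might line up with the input rows, the sketch below feeds two hypothetical rows from Create into the sink. It assumes that the ? placeholders are bound to the row's fields in schema order, which should be verified against WriteToJdbc. The table name, credentials, and sample data are placeholders.

```yaml
pipeline:
  type: chain
  transforms:
    - type: Create
      config:
        elements:
          # Hypothetical rows with two fields, one per ? placeholder.
          - {id: 1, name: "alice"}
          - {id: 2, name: "bob"}
    - type: WriteToSqlServer
      config:
        url: "jdbc:sqlserver://my-host:1433/database"
        username: "my-user"        # hypothetical credentials
        password: "my-password"
        # Assumption: placeholders bind to (id, name) in schema order.
        query: "INSERT INTO my_table VALUES(?, ?)"
```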


Reads lines from text files.

The resulting PCollection consists of rows with a single string field named "line".


  • path string : The file path to read from. The path can contain glob characters such as * and ?.


type: ReadFromText
config:
  path: "path"


Writes a PCollection to a (set of) text file(s).

The input must be a PCollection whose schema has exactly one field.


  • path string : The file path to write to. The files written will begin with this prefix, followed by a shard identifier.


type: WriteToText
input: ...
config:
  path: "path"
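
Because ReadFromText emits rows with a single "line" field and WriteToText accepts a PCollection whose schema has exactly one field, the two transforms can be chained directly. The following is a minimal sketch with hypothetical input and output paths:

```yaml
pipeline:
  type: chain
  transforms:
    - type: ReadFromText
      config:
        # Hypothetical input location; the glob matches every .txt file.
        path: "/path/to/input/*.txt"
    - type: WriteToText
      config:
        # Hypothetical output prefix; a shard identifier is appended.
        path: "/path/to/output/lines"
```

If an intermediate transform adds fields, the rows would need to be reduced back to a single field before WriteToText.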