Asserts that the input contains exactly the elements provided.
This is primarily used for testing; it will cause the entire pipeline to
fail if the input to this transform is not exactly the set of elements
given in the config parameter.
As with Create, YAML/JSON-style mappings are interpreted as Beam rows, e.g.
type: AssertEqual
input: SomeTransform
config:
elements:
- {a: 0, b: "foo"}
- {a: 1, b: "bar"}
would ensure that SomeTransform produced exactly two elements with values (a=0, b="foo") and (a=1, b="bar") respectively.
elements Array[?]
: The set of elements that should belong to the PCollection.
YAML/JSON-style mappings will be interpreted as Beam rows.
type: AssertEqual
input: ...
config:
elements:
- element
- element
- ...
Assigns a new timestamp to each element of its input.
This can be useful when reading records that have the timestamp embedded in them, for example with various file types or other sources that by default set all timestamps to the infinite past.
Note that the timestamp should only be set forward; setting it backwards will not hold back an already advanced watermark, and the data could become droppably late.
Supported languages: generic, javascript, python.
timestamp ?
(Optional) : A field, callable, or expression giving the new timestamp.
language string
(Optional) : The language of the timestamp expression.
error_handling Row
(Optional) : Whether and how to handle errors during timestamp
evaluation.
Row fields:
string
: Name to use for the output error collection
type: AssignTimestamps
input: ...
config:
timestamp: timestamp
language: "language"
error_handling:
output: "output"
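For instance, assuming the input rows already carry the desired timestamp in an event_time field (a hypothetical field name, as is the upstream transform), a minimal sketch might be:
type: AssignTimestamps
input: ReadEvents            # hypothetical upstream transform
config:
  # event_time is an assumed field holding the per-record timestamp
  timestamp: event_time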
Groups and combines records sharing common fields.
Built-in combine functions are sum, max, min, all, any, mean, count, group, and concat, but custom aggregation functions can be used as well.
See also the documentation on YAML Aggregation.
Supported languages: calcite, generic, javascript, python, sql.
group_by Array[string]
: The field(s) to aggregate on.
combine Map[string, Map[string, ?]]
: The aggregation function to use.
language string
(Optional) : The language used to define (and execute) the
custom callables in combine. Defaults to generic.
type: Combine
input: ...
config:
group_by:
- "group_by"
- "group_by"
- ...
combine:
a:
a: combine_value_a_value_a
b: combine_value_a_value_b
c: ...
b:
a: combine_value_b_value_a
b: combine_value_b_value_b
c: ...
c: ...
language: "language"
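For instance, assuming input rows with category and price fields (hypothetical names), the following sketch sums prices and counts records per category:
type: Combine
input: Purchases               # hypothetical upstream transform
config:
  group_by: [category]         # assumed grouping field
  combine:
    total_price:
      sum: price               # assumed numeric field to aggregate
    num_purchases:
      count: price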
Creates a collection containing a specified set of elements.
This transform always produces schema'd data. For example
type: Create
config:
elements: [1, 2, 3]
will result in an output with three elements with a schema of Row(element=int) whereas YAML/JSON-style mappings will be interpreted directly as Beam rows, e.g.
type: Create
config:
elements:
- {first: 0, second: {str: "foo", values: [1, 2, 3]}}
- {first: 1, second: {str: "bar", values: [4, 5, 6]}}
will result in a schema of the form (int, Row(string, List[int])).
This can also be expressed as YAML
type: Create
config:
elements:
- first: 0
second:
str: "foo"
values: [1, 2, 3]
- first: 1
second:
str: "bar"
values: [4, 5, 6]
elements Array[?]
: The set of elements that should belong to the PCollection.
YAML/JSON-style mappings will be interpreted as Beam rows.
Primitives will be mapped to rows with a single "element" field.
reshuffle boolean
(Optional) : Whether to introduce a reshuffle (to possibly redistribute
the work) if there is more than one element in the collection.
Defaults to True.
type: Create
config:
elements:
- element
- element
- ...
reshuffle: true|false
The Enrichment transform allows one to dynamically enhance elements in a pipeline by performing key-value lookups against external services like APIs or databases.
Example using BigTable:
- type: Enrichment
config:
enrichment_handler: 'BigTable'
handler_config:
project_id: 'apache-beam-testing'
instance_id: 'beam-test'
table_id: 'bigtable-enrichment-test'
row_key: 'product_id'
timeout: 30
For more information on Enrichment, see the Beam docs.
enrichment_handler string
: Specifies the source from where data needs
to be extracted into the pipeline for enriching data. One of
"BigQuery", "BigTable", "FeastFeatureStore" or "VertexAIFeatureStore".
handler_config Map[string, ?]
: Specifies the parameters for the respective
enrichment_handler in a YAML/JSON format. To see the full set of
handler_config parameters, see the corresponding handler documentation.
timeout double
(Optional) : Timeout for source requests in seconds. Defaults to 30
seconds.
type: Enrichment
input: ...
config:
enrichment_handler: "enrichment_handler"
handler_config:
a: handler_config_value_a
b: handler_config_value_b
c: ...
timeout: timeout
Explodes (aka unnest/flatten) one or more fields producing multiple rows.
Given one or more fields of iterable type, produces multiple rows, one for
each value of that field. For example, a row of the form ('a', [1, 2, 3])
would expand to ('a', 1), ('a', 2), and ('a', 3) when exploded on the second field.
This is akin to a FlatMap when paired with the MapToFields transform.
See more complete documentation on YAML Mapping Functions.
fields ?
(Optional) : The list of fields to expand.
cross_product boolean
(Optional) : If multiple fields are specified, indicates whether the
full cross-product of combinations should be produced, or if the
first element of the first field corresponds to the first element
of the second field, etc. For example, the row (['a', 'b'], [1, 2])
would expand to the four rows ('a', 1), ('a', 2), ('b', 1), and ('b', 2)
when cross_product is set to true, but only the two rows ('a', 1) and
('b', 2) when it is set to false.
Only meaningful (and required) if multiple fields are specified.
error_handling Row
(Optional) : Whether and how to handle errors during iteration.
Row fields:
string
: Name to use for the output error collection
type: Explode
input: ...
config:
fields: fields
cross_product: true|false
error_handling:
a: error_handling_value_a
b: error_handling_value_b
c: ...
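For instance, assuming rows with an id field and a list-valued tags field (hypothetical names), each tag could be exploded into its own row:
type: Explode
input: SomeTransform           # hypothetical upstream transform
config:
  fields: [tags]               # assumed iterable field to unnest
  error_handling:
    output: bad_rows           # rows that fail during iteration go here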
Keeps only records that satisfy the given criteria.
See more complete documentation on YAML Filtering.
Supported languages: calcite, generic, java, javascript, python, sql.
keep ?
(Optional) : An expression evaluating to true for those records that should be kept.
language string
(Optional) : The language of the above expression.
Defaults to generic.
error_handling Row
(Optional) : Whether and where to output records that throw errors when
the above expressions are evaluated.
Row fields:
string
: Name to use for the output error collection
type: Filter
input: ...
config:
keep: keep
language: "language"
error_handling:
output: "output"
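For instance, assuming rows with a numeric amount field (a hypothetical name), a Python-language filter might look like:
type: Filter
input: Orders                  # hypothetical upstream transform
config:
  language: python
  keep: "amount > 100"         # assumed field; expression must evaluate to a boolean
  error_handling:
    output: filter_errors      # rows that raise during evaluation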
Flattens multiple PCollections into a single PCollection.
The elements of the resulting PCollection will be the (disjoint) union of all the elements of all the inputs.
Note that in YAML, transforms can always take a list of inputs, which will be implicitly flattened.
No configuration parameters.
type: Flatten
input: ...
config: ...
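For instance, two hypothetical upstream transforms could be merged into one PCollection like this:
type: Flatten
input: [BatchEvents, StreamingEvents]   # hypothetical upstream transforms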
Joins two or more inputs using a specified condition.
For example
type: Join
input:
input1: SomeTransform
input2: AnotherTransform
input3: YetAnotherTransform
config:
type: inner
equalities:
- input1: colA
input2: colB
- input2: colX
input3: colY
fields:
input1: [colA, colB, colC]
input2: {new_name: colB}
would perform an inner join on the three inputs satisfying the constraints
that input1.colA = input2.colB and input2.colX = input3.colY, emitting rows
with colA, colB, and colC from input1, the values of input2.colB as a field
called new_name, and all the fields from input3.
equalities ?
(Optional) : The condition to join on. A list of sets of columns that should
be equal to fulfill the join condition. For the simple scenario of
joining on the same column across all inputs where the column name is
the same, one can specify the column name as the equality rather than
having to list it for every input.
type ?
(Optional) : The type of join. Could be a string value in
["inner", "left", "right", "outer"] that specifies the type of join to
be performed. For scenarios with multiple inputs to join where different
join types are desired, specify the inputs to be outer joined. For
example, {outer: [input1, input2]} means that input1 and input2
will be outer joined using the conditions specified, while other inputs
will be inner joined.
fields Map[string, ?]
(Optional) : The fields to be outputted. A mapping with the input alias as the
key and the list of fields in the input to be outputted.
The value in the map can either be a dictionary with the new field name as
the key and the original field name as the value (e.g. new_field_name: field_name),
a list of the fields to be outputted with their original names
(e.g. [col1, col2, col3]), or an '*' indicating all fields in the
input will be outputted. If not specified, all fields from all inputs
will be outputted.
type: Join
input: ...
config:
equalities: equalities
type: type
fields:
a: fields_value_a
b: fields_value_b
c: ...
Logs each element of its input PCollection.
The output of this transform is a copy of its input for ease of use in chain-style pipelines.
level string
(Optional) : One of ERROR, INFO, or DEBUG, mapped to a corresponding
language-specific logging level.
prefix string
(Optional) : An optional identifier that will get prepended to the element
being logged.
error_handling Row
(Optional) : This option specifies whether and where to output error rows.
Row fields:
string
: Name to use for the output error collection
type: LogForTesting
input: ...
config:
level: "level"
prefix: "prefix"
error_handling:
output: "output"
write_artifact_location string
(Optional)
read_artifact_location string
(Optional)
transforms Array[?]
(Optional)
type: MLTransform
input: ...
config:
write_artifact_location: "write_artifact_location"
read_artifact_location: "read_artifact_location"
transforms:
- transforms
- transforms
- ...
Creates records with new fields defined in terms of the input fields.
See more complete documentation on YAML Mapping Functions.
Supported languages: calcite, generic, java, javascript, python, sql.
fields Map[string, ?]
: The output fields to compute, each mapping to the expression or
callable that creates them.
append boolean
(Optional) : Whether to append the created fields to the set of
fields already present, outputting a union of both the new fields and
the original fields for each record. Defaults to False.
drop Array[string]
(Optional) : If append is true, enumerates a subset of fields from the
original record that should not be kept.
language string
(Optional) : The language used to define (and execute) the
expressions and/or callables in fields. Defaults to generic.
error_handling Row
(Optional) : Whether and where to output records that throw errors when
the above expressions are evaluated.
Row fields:
string
: Name to use for the output error collection
type: MapToFields
input: ...
config:
fields:
a: fields_value_a
b: fields_value_b
c: ...
append: true|false
drop:
- "drop"
- "drop"
- ...
language: "language"
error_handling:
output: "output"
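For instance, assuming input rows with first_name and last_name fields (hypothetical names), a Python-language mapping might compute a derived field while keeping the originals:
type: MapToFields
input: People                  # hypothetical upstream transform
config:
  language: python
  append: true                 # keep the original fields as well
  fields:
    full_name: "first_name + ' ' + last_name"   # assumed string fields
  error_handling:
    output: map_errors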
Splits an input into several distinct outputs.
Each input element will go to a distinct output based on the field or
function given in the by
configuration parameter.
Supported languages: generic, javascript, python.
by ?
(Optional) : A field, callable, or expression giving the destination output for
this element. Should return a string that is a member of the outputs
parameter. If unknown_output is also set, other return values are
accepted as well; otherwise an error will be raised.
outputs Array[string]
: The set of outputs into which this input is being partitioned.
unknown_output string
(Optional) : If set, indicates a destination output for any
elements that are not assigned an output listed in the outputs
parameter.
error_handling Row
(Optional) : Whether and how to handle errors during
partitioning.
Row fields:
string
: Name to use for the output error collection
language string
(Optional) : The language of the by expression.
type: Partition
input: ...
config:
by: by
outputs:
- "outputs"
- "outputs"
- ...
unknown_output: "unknown_output"
error_handling:
a: error_handling_value_a
b: error_handling_value_b
c: ...
language: "language"
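For instance, assuming rows with a numeric amount field (a hypothetical name), elements could be routed to one of two named outputs, with anything unexpected going to a catch-all:
type: Partition
input: Orders                  # hypothetical upstream transform
config:
  language: python
  by: "'large' if amount >= 100 else 'small'"   # must return one of the outputs below
  outputs: [large, small]
  unknown_output: other        # destination for any value not listed in outputs
Downstream transforms can then consume an individual output by referencing it by name, for example input: Partition.large if this transform were named Partition.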
A Python PTransform identified by fully qualified name.
This allows one to import, construct, and apply any Beam Python transform. This can be useful for using transforms that have not yet been exposed via a YAML interface. Note, however, that conversion may be required if this transform does not accept or produce Beam Rows.
For example
type: PyTransform
config:
constructor: apache_beam.pkg.mod.SomeClass
args: [1, 'foo']
kwargs:
baz: 3
can be used to access the transform apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3).
See also the documentation on Inlining Python.
constructor string
: Fully qualified name of a callable used to construct the
transform. Often this is a class such as
apache_beam.pkg.mod.SomeClass
but it can also be a function or
any other callable that returns a PTransform.
args Array[?]
(Optional) : A list of parameters to pass to the callable as positional
arguments.
kwargs Map[string, ?]
(Optional) : A mapping of parameters to pass to the callable as keyword
arguments.
type: PyTransform
input: ...
config:
constructor: "constructor"
args:
- arg
- arg
- ...
kwargs:
a: kwargs_value_a
b: kwargs_value_b
c: ...
type: Sql
input: ...
config: ...
Strips error metadata from outputs returned via error handling.
Generally the error outputs for transformations return information about the error encountered (e.g. error messages and tracebacks) in addition to the failing element itself. This transformation attempts to remove that metadata and returns the bad element alone which can be useful for re-processing.
For example, in the following pipeline snippet
- name: MyMappingTransform
type: MapToFields
input: SomeInput
config:
language: python
fields:
...
error_handling:
output: errors
- name: RecoverOriginalElements
type: StripErrorMetadata
input: MyMappingTransform.errors
the output of RecoverOriginalElements will contain exactly those elements
from SomeInput that failed to process (whereas MyMappingTransform.errors
would contain those elements paired with error information).
Note that this relies on the preceding transform actually returning the failing input in a schema'd way. Most built-in transformations follow the correct conventions.
No configuration parameters.
type: StripErrorMetadata
input: ...
Validates each element of a PCollection against a json schema.
schema Map[string, ?]
: A json schema against which to validate each element.
error_handling Row
(Optional) : Whether and how to handle errors during iteration.
If this is not set, invalid elements will fail the pipeline, otherwise
invalid elements will be passed to the specified error output along
with information about how the schema was invalidated.
Row fields:
string
: Name to use for the output error collection
type: ValidateWithSchema
input: ...
config:
schema:
a: schema_value_a
b: schema_value_b
c: ...
error_handling:
a: error_handling_value_a
b: error_handling_value_b
c: ...
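For instance, assuming rows that should contain an integer id and a string name (hypothetical fields), a JSON-schema style check might look like the sketch below; the exact set of supported schema keywords is an assumption here:
type: ValidateWithSchema
input: Records                 # hypothetical upstream transform
config:
  schema:
    type: object
    properties:
      id: {type: integer}      # assumed fields and types
      name: {type: string}
    required: [id]
  error_handling:
    output: invalid            # invalid elements are routed here instead of failing the pipeline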
A window transform assigning windows to each element of a PCollection.
The assigned windows will affect all downstream aggregating operations, which will aggregate by window as well as by key.
See the Beam documentation on windowing for more details.
Sizes, offsets, periods and gaps (where applicable) must be defined using a time unit suffix 'ms', 's', 'm', 'h' or 'd' for milliseconds, seconds, minutes, hours or days, respectively. If a time unit is not specified, it will default to 's'.
For example
windowing:
type: fixed
size: 30s
Note that any YAML transform can have a windowing parameter, which is applied to its inputs (if any) or outputs (if there are no inputs), which means that explicit WindowInto operations are not typically needed.
windowing ?
(Optional) : The type and parameters of the windowing to perform.
type: WindowInto
input: ...
config:
windowing: windowing
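For instance, a fixed windowing of one minute, following the skeleton above with a hypothetical upstream transform, might be written as:
type: WindowInto
input: Events                  # hypothetical upstream transform
config:
  windowing:
    type: fixed
    size: 60s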
A PTransform for reading records from avro files.
Each record of the resulting PCollection will contain a single record read
from a source. Records that are of simple types will be mapped to Beam rows
with a single record field containing the record's value. Records that are of
Avro type RECORD will be mapped to Beam rows that comply with the schema
contained in the Avro file that contains those records.
path ?
(Optional)
type: ReadFromAvro
config:
path: path
A PTransform for writing avro files.
If the input has a schema, a corresponding avro schema will be automatically generated and used to write the output records.
path ?
(Optional)
type: WriteToAvro
input: ...
config:
path: path
Reads data from BigQuery.
Exactly one of table or query must be set. If query is set, neither row_restriction nor fields should be set.
table string
(Optional) : The table to read from, specified as DATASET.TABLE
or PROJECT:DATASET.TABLE.
query string
(Optional) : A query to be used instead of the table argument.
row_restriction string
(Optional) : Optional SQL text filtering statement, similar to a
WHERE clause in a query. Aggregates are not supported. Restricted to a
maximum length of 1 MB.
fields Array[string]
(Optional)
type: ReadFromBigQuery
config:
table: "table"
query: "query"
row_restriction: "row_restriction"
fields:
- "field"
- "field"
- ...
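For instance, reading a subset of columns from a table with a row filter (all project, dataset, table, and field names here are hypothetical) might look like:
type: ReadFromBigQuery
config:
  table: "my_project:my_dataset.my_table"       # use either table or query, not both
  row_restriction: "event_date >= '2024-01-01'" # assumed date column
  fields: [user_id, event_date, amount]         # assumed columns to read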
type: WriteToBigQuery
input: ...
config: ...
A PTransform for reading comma-separated values (csv) files into a PCollection.
path string
: The file path to read from. The path can contain glob
characters such as * and ?.
delimiter ?
(Optional)
comment ?
(Optional)
type: ReadFromCsv
config:
path: "path"
delimiter: delimiter
comment: comment
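For instance, reading all CSV files under a hypothetical bucket prefix might look like:
type: ReadFromCsv
config:
  path: "gs://my-bucket/input/*.csv"   # hypothetical location; glob characters are allowed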
A PTransform for writing a schema'd PCollection as a (set of) comma-separated values (csv) files.
path string
: The file path to write to. The files written will
begin with this prefix, followed by a shard identifier (see
num_shards) according to the file_naming parameter.
delimiter ?
(Optional)
type: WriteToCsv
input: ...
config:
path: "path"
delimiter: delimiter
type: ReadFromJdbc
config: ...
type: WriteToJdbc
input: ...
config: ...
A PTransform for reading json values from files into a PCollection.
path string
: The file path to read from. The path can contain glob
characters such as * and ?.
type: ReadFromJson
config:
path: "path"
A PTransform for writing a PCollection as json values to files.
path string
: The file path to write to. The files written will
begin with this prefix, followed by a shard identifier (see
num_shards) according to the file_naming parameter.
type: WriteToJson
input: ...
config:
path: "path"
type: ReadFromKafka
config: ...
type: WriteToKafka
input: ...
config: ...
type: ReadFromMySql
config: ...
type: WriteToMySql
input: ...
config: ...
type: ReadFromOracle
config: ...
type: WriteToOracle
input: ...
config: ...
A PTransform for reading Parquet files.
path ?
(Optional)
type: ReadFromParquet
config:
path: path
A PTransform for writing parquet files.
path ?
(Optional)
type: WriteToParquet
input: ...
config:
path: path
type: ReadFromPostgres
config: ...
type: WriteToPostgres
input: ...
config: ...
Reads messages from Cloud Pub/Sub.
topic string
(Optional) : Cloud Pub/Sub topic in the form
"projects/<project>/topics/<topic>".
subscription string
(Optional) : Existing Cloud Pub/Sub subscription to use in the
form "projects/<project>/subscriptions/<subscription>".
format string
: The expected format of the message payload. Currently supported
formats include raw, which produces records with a single payload
field whose contents are the raw bytes of the pubsub message.
schema ?
(Optional) : Schema specification for the given format.
attributes Array[string]
(Optional) : List of attribute keys whose values will be flattened into the
output message as additional fields. For example, if the format is raw
and attributes is ["a", "b"] then this read will produce elements of
the form Row(payload=..., a=..., b=...).
attributes_map string
(Optional)
id_attribute string
(Optional) : The attribute on incoming Pub/Sub messages to use as a unique
record identifier. When specified, the value of this attribute (which
can be any string that uniquely identifies the record) will be used for
deduplication of messages. If not provided, we cannot guarantee
that no duplicate data will be delivered on the Pub/Sub stream. In this
case, deduplication of the stream will be strictly best effort.
timestamp_attribute string
(Optional) : Message value to use as element timestamp. If None,
uses message publishing time as the timestamp.
Timestamp values should be in one of two formats: a numerical value
representing the number of milliseconds since the Unix epoch, or a string in
RFC 3339 format, UTC timezone (e.g. 2015-10-29T23:41:41.123Z). The sub-second
component of the timestamp is optional, and digits beyond the first three
(i.e., time units smaller than milliseconds) may be ignored.
error_handling Row
(Optional) : This option specifies whether and where to output error rows.
Row fields:
string
: Name to use for the output error collection
type: ReadFromPubSub
config:
topic: "topic"
subscription: "subscription"
format: "format"
schema: schema
attributes:
- "attribute"
- "attribute"
- ...
attributes_map: "attributes_map"
id_attribute: "id_attribute"
timestamp_attribute: "timestamp_attribute"
error_handling:
output: "output"
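For instance, a raw read from a hypothetical topic that also surfaces one message attribute as a field might look like:
type: ReadFromPubSub
config:
  topic: "projects/my-project/topics/my-topic"   # hypothetical topic
  format: raw                                    # each element gets a single payload field
  attributes: [event_type]                       # assumed attribute key, flattened into the row
  timestamp_attribute: ts                        # assumed attribute carrying the event time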
Writes messages to Cloud Pub/Sub.
topic string
: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
format string
: How to format the message payload. Currently supported
formats include raw.
schema ?
(Optional) : Schema specification for the given format.
attributes Array[string]
(Optional) : List of attribute keys whose values will be pulled out as
PubSub message attributes. For example, if the format is raw
and attributes is ["a", "b"] then elements of the form
Row(any_field=..., a=..., b=...) will result in PubSub messages whose
payload has the contents of any_field and whose attributes will be
populated with the values of a and b.
attributes_map string
(Optional)
id_attribute string
(Optional) : If set, will set an attribute for each Cloud Pub/Sub message
with the given name and a unique value. This attribute can then be used
in a ReadFromPubSub PTransform to deduplicate messages.
timestamp_attribute string
(Optional) : If set, will set an attribute for each Cloud Pub/Sub
message with the given name and the message's publish time as the value.
error_handling Row
(Optional) : This option specifies whether and where to output error rows.
Row fields:
string
: Name to use for the output error collection
type: WriteToPubSub
input: ...
config:
topic: "topic"
format: "format"
schema: schema
attributes:
- "attribute"
- "attribute"
- ...
attributes_map: "attributes_map"
id_attribute: "id_attribute"
timestamp_attribute: "timestamp_attribute"
error_handling:
output: "output"
type: ReadFromPubSubLite
config: ...
type: WriteToPubSubLite
input: ...
config: ...
type: ReadFromSpanner
config: ...
type: WriteToSpanner
input: ...
config: ...
type: ReadFromSqlServer
config: ...
type: WriteToSqlServer
input: ...
config: ...
Reads lines from text files.
The resulting PCollection consists of rows with a single string field named "line."
path string
: The file path to read from. The path can contain glob
characters such as * and ?.
type: ReadFromText
config:
path: "path"
Writes a PCollection to a (set of) text file(s).
The input must be a PCollection whose schema has exactly one field.
path string
: The file path to write to. The files written will
begin with this prefix, followed by a shard identifier.
type: WriteToText
input: ...
config:
path: "path"
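For instance, assuming an upstream transform that produces rows with a single field (as required here), the output files might be written under a hypothetical prefix:
type: WriteToText
input: FormatLines                       # hypothetical upstream transform producing one-field rows
config:
  path: "gs://my-bucket/output/results"  # files are named with this prefix plus a shard identifier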