Beam YAML API
Beam YAML is a declarative syntax for describing Apache Beam pipelines by using YAML files. You can use Beam YAML to author and run a Beam pipeline without writing any code.
Overview
Beam provides a powerful model for creating sophisticated data processing pipelines. However, getting started with Beam programming can be challenging because it requires writing code in one of the supported Beam SDK languages. You need to understand the APIs, set up a project, manage dependencies, and perform other programming tasks.
Beam YAML makes it easier to get started with creating Beam pipelines. Instead of writing code, you create a YAML file using any text editor. Then you submit the YAML file to be executed by a runner.
The Beam YAML syntax is designed to be human-readable but also suitable as an intermediate representation for tools. For example, a pipeline authoring GUI could output YAML, or a lineage analysis tool could consume the YAML pipeline specifications.
Beam YAML is still under development, but any features already included are considered stable. Feedback is welcome at dev@apache.beam.org.
Prerequisites
The Beam YAML parser is currently included as part of the Apache Beam Python SDK. You don’t need to write Python code to use Beam YAML, but you need the SDK to run pipelines locally.
We recommend creating a virtual environment so that all packages are installed in an isolated and self-contained environment. After you set up your Python environment, install the SDK as follows:
pip install apache_beam[yaml,gcp]
In addition, several of the provided transforms, such as the SQL transform, are implemented in Java and require a working Java interpeter. When you a run a pipeline with these transforms, the required artifacts are automatically downloaded from the Apache Maven repositories.
Getting started
Use a text editor to create a file named pipeline.yaml
. Paste the following
text into the file and save:
pipeline:
transforms:
- type: Create
config:
elements: [1, 2, 3]
- type: LogForTesting
input: Create
This file defines a simple pipeline with two transforms:
- The
Create
transform creates a collection. The value ofconfig
is a dictionary of configuration settings. In this case,elements
specifies the members of the collection. Other transform types have other configuration settings. - The
LogForTesting
transform logs each input element. This transform doesn’t require aconfig
setting. Theinput
key specifies thatLogForTesting
receives input from theCreate
transform.
Run the pipeline
To execute the pipeline, run the following Python command:
python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml
The output should contain log statements similar to the following:
INFO:root:{"element": 1}
INFO:root:{"element": 2}
INFO:root:{"element": 3}
Run the pipeline in Dataflow
You can submit a YAML pipeline to Dataflow by using the
gcloud CLI. To create a Dataflow job
from the YAML file, use the
gcloud dataflow yaml run
command:
gcloud dataflow yaml run $JOB_NAME \
--yaml-pipeline-file=pipeline.yaml \
--region=$REGION
When you use the gcloud
CLI, you don’t need to install the Beam SDKs locally.
Visualize the pipeline
You can use the
apache_beam.runners.render
module to render the pipeline execution graph as a PNG file, as follows:
Install Graphviz.
Run the following command:
python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml \ --runner=apache_beam.runners.render.RenderRunner \ --render_output=out.png
Example: Reading CSV data
The following pipeline reads data from a set of CSV files and writes the data in JSON format. This pipeline assumes the CSV files have a header row. The column names become JSON field names.
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: WriteToJson
config:
path: /path/to/output.json
input: ReadFromCsv
Add a filter
The Filter
transform filters records. It keeps input
records that satisfy a Boolean predicate and discards records that don’t
satisify the predicate. The following example keeps records where the value of
col3
is greater than 100:
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: Filter
config:
language: python
keep: "col3 > 100"
input: ReadFromCsv
- type: WriteToJson
config:
path: /path/to/output.json
input: Filter
Add a mapping function
Beam YAML supports various mapping functions.
The following example uses the Sql
transform to group by col1
and output the
counts for each key.
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: Filter
config:
language: python
keep: "col3 > 100"
input: ReadFromCsv
- type: Sql
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
input: Filter
- type: WriteToJson
config:
path: /path/to/output.json
input: Sql
Patterns
This section describes some common patterns in Beam YAML.
Named transforms
You can name the transforms in your pipeline to help with monitoring and debugging. Names are also used to disambiguate transforms if the pipeline contains more than one transform of the same type.
pipeline:
transforms:
- type: ReadFromCsv
name: ReadMyData
config:
path: /path/to/input*.csv
- type: Filter
name: KeepBigRecords
config:
language: python
keep: "col3 > 100"
input: ReadMyData
- type: Sql
name: MySqlTransform
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
input: KeepBigRecords
- type: WriteToJson
name: WriteTheOutput
config:
path: /path/to/output.json
input: MySqlTransform
Chaining transforms
If a pipeline is linear (no branching or merging), you can designate the
pipeline as a chain
type. In a chain
-type pipeline, you don’t need to
specify the inputs. The inputs are implicit from the order they appear in the
YAML file:
pipeline:
type: chain
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: Filter
config:
language: python
keep: "col3 > 100"
- type: Sql
name: MySqlTransform
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
- type: WriteToJson
config:
path: /path/to/output.json
Source and sink transforms
As syntactic sugar, you can name the first and last transforms in your pipeline
as source
and sink
. This convention does not change the resulting pipeline,
but it signals the intent of the source and sink transforms.
pipeline:
type: chain
source:
type: ReadFromCsv
config:
path: /path/to/input*.csv
transforms:
- type: Filter
config:
language: python
keep: "col3 > 100"
- type: Sql
name: MySqlTransform
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
sink:
type: WriteToJson
config:
path: /path/to/output.json
Non-linear pipelines
Beam YAML supports arbitrary non-linear pipelines. The following pipeline reads two sources, joins them, and writes two outputs:
pipeline:
transforms:
- type: ReadFromCsv
name: ReadLeft
config:
path: /path/to/left*.csv
- type: ReadFromCsv
name: ReadRight
config:
path: /path/to/right*.csv
- type: Sql
config:
query: select A.col1, B.col2 from A join B using (col3)
input:
A: ReadLeft
B: ReadRight
- type: WriteToJson
name: WriteAll
input: Sql
config:
path: /path/to/all.json
- type: Filter
name: FilterToBig
input: Sql
config:
language: python
keep: "col2 > 100"
- type: WriteToCsv
name: WriteBig
input: FilterToBig
config:
path: /path/to/big.csv
Because the pipeline is not linear, you must explicitly declare the inputs for
each transform. However, you can nest a chain
within a non-linear pipeline.
The chain is a linear sub-path within the pipeline.
The following example creates a chain named ExtraProcessingForBigRows
. The
chain takes input from the Sql
transform and applies several additional
filters plus a sink. Notice that within the chain, the inputs don’t need to be
specified.
pipeline:
transforms:
- type: ReadFromCsv
name: ReadLeft
config:
path: /path/to/left*.csv
- type: ReadFromCsv
name: ReadRight
config:
path: /path/to/right*.csv
- type: Sql
config:
query: select A.col1, B.col2 from A join B using (col3)
input:
A: ReadLeft
B: ReadRight
- type: WriteToJson
name: WriteAll
input: Sql
config:
path: /path/to/all.json
- type: chain
name: ExtraProcessingForBigRows
input: Sql
transforms:
- type: Filter
config:
language: python
keep: "col2 > 100"
- type: Filter
config:
language: python
keep: "len(col1) > 10"
- type: Filter
config:
language: python
keep: "col1 > 'z'"
sink:
type: WriteToCsv
config:
path: /path/to/big.csv
Windowing
This API can be used to define both streaming and batch pipelines.
In order to meaningfully aggregate elements in a streaming pipeline,
some kind of windowing is typically required. Beam’s
windowing
and triggering
can be declared using the same WindowInto
transform available in all other
Beam SDKs.
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: JSON
schema:
type: object
properties:
col1: {type: string}
col2: {type: integer}
col3: {type: number}
- type: WindowInto
windowing:
type: fixed
size: 60s
- type: SomeGroupingTransform
config:
arg: ...
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
Rather than using an explicit WindowInto
operation, you can tag a transform
with a specified windowing, which causes its inputs (and hence the transform
itself) to be applied with that windowing.
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: SomeGroupingTransform
config:
arg: ...
windowing:
type: sliding
size: 60s
period: 10s
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
Note that the Sql
operation itself is often a from of aggregation, and
applying a windowing (or consuming an already windowed input) causes all
grouping to be done per window.
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
windowing:
type: sessions
gap: 60s
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
The specified windowing is applied to all inputs, in this case resulting in a join per window.
pipeline:
transforms:
- type: ReadFromPubSub
name: ReadLeft
config:
topic: leftTopic
format: ...
schema: ...
- type: ReadFromPubSub
name: ReadRight
config:
topic: rightTopic
format: ...
schema: ...
- type: Sql
config:
query: select A.col1, B.col2 from A join B using (col3)
input:
A: ReadLeft
B: ReadRight
windowing:
type: fixed
size: 60s
options:
streaming: true
For a transform with no inputs, the specified windowing is instead applied to its output(s). As per the Beam model, the windowing is then inherited by all consuming operations. This is especially useful for root operations like Read.
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 60s
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
One can also specify windowing at the top level of a pipeline (or composite), which is a shorthand for applying this same windowing to all root operations that don’t otherwise specify their own windowing. This approach is effective way to apply a window everywhere in the pipeline.
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
windowing:
type: fixed
size: 60
options:
streaming: true
Note that all these windowing specifications are compatible with the source
and sink
syntax as well:
pipeline:
type: chain
source:
type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 10s
transforms:
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
sink:
type: WriteToCsv
config:
path: /path/to/output.json
windowing:
type: fixed
size: 5m
options:
streaming: true
Providers
Though we aim to offer a large suite of built-in transforms, it is inevitable that people will want to author their own. This is made possible through the notion of Providers which leverage expansion services and schema transforms.
For example, you could build a jar that vends a cross language transform or schema transform and then use it in a transform as follows
pipeline:
type: chain
source:
type: ReadFromCsv
config:
path: /path/to/input*.csv
transforms:
- type: MyCustomTransform
config:
arg: whatever
sink:
type: WriteToJson
config:
path: /path/to/output.json
providers:
- type: javaJar
config:
jar: /path/or/url/to/myExpansionService.jar
transforms:
MyCustomTransform: "urn:registered:in:expansion:service"
A full example of how to build a java provider can be found here.
Arbitrary Python transforms can be provided as well, using the syntax
providers:
- type: pythonPackage
config:
packages:
- my_pypi_package>=version
- /path/to/local/package.zip
transforms:
MyCustomTransform: "pkg.subpkg.PTransformClassOrCallable"
Pipeline options
Pipeline options are used to configure different aspects of your pipeline, such as the pipeline runner that will execute your pipeline and any runner-specific configuration required by the chosen runner. To set pipeline options, append an options block at the end of your yaml file. For example:
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 60s
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
Jinja Templatization
It is a common to want to run a single Beam pipeline in different contexts
and/or with different configurations.
When running a YAML pipeline using apache_beam.yaml.main
or via gcloud,
the yaml file can be parameterized with externally provided variables using
the jinja variable syntax.
The values are then passed via a --jinja_variables
command line flag.
For example, one could start a pipeline with
pipeline:
transforms:
- type: ReadFromCsv
config:
path: {{input_pattern}}
and then run it with
python -m apache_beam.yaml.main \
--yaml_pipeline_file=pipeline.yaml \
--jinja_variables='{"input_pattern": "gs://path/to/this/runs/files*.csv"}'
Arbitrary jinja control structures, such as looping and conditionals, can be used as well if desired as long as the output results in a valid Beam YAML pipeline.
We also expose the datetime
module as a variable by default, which can be particularly useful in reading
or writing dated sources and sinks, e.g.
- type: WriteToJson
config:
path: "gs://path/to/{{ datetime.datetime.now().strftime('%Y/%m/%d') }}/dated-output.json"
would write to files like gs://path/to/2016/08/04/dated-output*.json
.