Runner Authoring Guide
This guide walks through how to implement a new runner. It is aimed at someone who has a data processing system and wants to use it to execute a Beam pipeline. The guide starts from the basics, to help you evaluate the work ahead. Then the sections become more and more detailed, to be a resource throughout the development of your runner.
Topics covered:
- Implementing the Beam Primitives
- Working with pipelines
- Testing your runner
- Integrating your runner nicely with SDKs
- Writing an SDK-independent runner
- The Runner API protos
- The Jobs API RPCs
Implementing the Beam Primitives
Aside from encoding and persisting data - which presumably your engine already does in some way or another - most of what you need to do is implement the Beam primitives. This section provides a detailed look at each primitive, covering what you need to know that might not be obvious and what support code is provided.
The primitives are designed for the benefit of pipeline authors, not runner
authors. Each represents a different conceptual mode of operation (external IO,
element-wise, grouping, windowing, union) rather than a specific implementation
decision. The same primitive may require a very different implementation based
on how the user instantiates it. For example, a ParDo
that uses state or
timers may require key partitioning, and a GroupByKey
with speculative triggering
may require a more costly or complex implementation.
What if you haven’t implemented some of these features?
That’s OK! You don’t have to do it all at once, and there may even be features
that don’t make sense for your runner to ever support. We maintain a
capability matrix on the Beam site so you can tell
users what you support. When you receive a Pipeline
, you should traverse it
and determine whether or not you can execute each DoFn
that you find. If
you cannot execute some DoFn
in the pipeline (or if there is any other
requirement that your runner lacks) you should reject the pipeline. In your
native environment, this may look like throwing an
UnsupportedOperationException
. The Runner API RPCs will make this explicit,
for cross-language portability.
Implementing the Impulse primitive
Impulse
is a PTransform that takes no inputs and produces exactly one output
during the lifetime of the pipeline, which should be the empty bytes in the
global window with the minimum timestamp. This has the encoded value of
7f df 3b 64 5a 1c ac 09 00 00 00 01 0f 00
when encoded with the standard
windowed value coder.
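For a Java-based runner, the single impulse element can be constructed roughly as in the sketch below, using the Java SDK's WindowedValue support class; how you then feed it into your engine is up to you.
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;

class ImpulseExample {
  // The one element produced by Impulse: an empty byte array, in the global
  // window, at the minimum representable timestamp.
  static WindowedValue<byte[]> impulseElement() {
    return WindowedValue.timestampedValueInGlobalWindow(
        new byte[0], BoundedWindow.TIMESTAMP_MIN_VALUE);
  }
}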
Though Impulse
is generally not invoked by a user, it is the only root
primitive operation, and other root operations (like Read
s and Create
)
are composite operations constructed from an Impulse
followed by a series
of (possibly Splittable) ParDo
s.
Implementing the ParDo primitive
The ParDo
primitive describes element-wise transformation for a
PCollection
. ParDo
is the most complex primitive, because it is where any
per-element processing is described. In addition to very simple operations like
standard map
or flatMap
from functional programming, ParDo
also supports
multiple outputs, side inputs, initialization, flushing, teardown, and stateful
processing.
The UDF that is applied to each element is called a DoFn
. The exact APIs for
a DoFn
can vary per language/SDK but generally follow the same pattern, so we
can discuss it with pseudocode. I will also often refer to the Java support
code, since I know it and most of our current and future runners are
Java-based.
Generally, rather than applying a series of ParDo
s one at a time over the
entire input data set, it is more efficient to fuse several ParDo
s together
in a single executable stage that consists of a whole series (in general,
a DAG) of mapping operations. In addition to ParDo
s, windowing operations,
local (pre- or post-GBK) combining operations, and other mapping operations
may be fused into these stages as well.
As DoFns may execute code in a different language, or require a different environment, than the runner itself, Beam provides the ability to call these in a cross-process way. This is the crux of the Beam Fn API, for which more detail can be found below. It is, however, perfectly acceptable for a runner to invoke this user code in process (for simplicity or efficiency) when the environments are compatible.
Bundles
For correctness, a DoFn
should represent an element-wise function, but in
most SDKs this is a long-lived object that processes elements in small groups
called bundles.
Your runner decides how many elements, and which elements, to include in a bundle, and can even decide dynamically in the middle of processing that the current bundle has “ended”. How a bundle is processed ties in with the rest of a DoFn’s lifecycle.
It will generally improve throughput to make the largest bundles possible, so that initialization and finalization costs are amortized over many elements. But if your data is arriving as a stream, then you will want to terminate a bundle in order to achieve appropriate latency, so bundles may be just a few elements.
A bundle is the unit of commitment in Beam. If an error is encountered while processing a bundle, all the prior outputs of that bundle (including any modifications to state or timers) must be discarded by the runner and the entire bundle retried. Upon successful completion of a bundle, its outputs, together with any state/timer modifications and watermark updates, must be committed atomically.
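In runner-side pseudocode, the commitment contract looks roughly like the sketch below; the Bundle, Element, and StagedResults types and the helper methods are hypothetical stand-ins for whatever your engine provides, not Beam APIs.
// Hypothetical sketch: outputs, state writes, timers, and watermark holds are
// staged while the bundle is processed and only become visible if the whole
// bundle succeeds; on failure everything from the attempt is discarded and
// the entire bundle is retried.
void processBundleWithRetries(Bundle bundle) {
  while (true) {
    StagedResults staged = new StagedResults();
    try {
      for (Element element : bundle.elements()) {
        processElement(element, staged); // buffers outputs and state/timer changes
      }
      commitAtomically(staged); // outputs + state/timers + watermark updates, all or nothing
      return;
    } catch (Exception e) {
      staged.discard(); // roll back everything from this attempt
      // ... optionally back off, then retry the whole bundle ...
    }
  }
}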
The DoFn Lifecycle
DoFns
in many SDKs have several methods such as setup
, start_bundle
,
finish_bundle
, teardown
, etc. in addition to the standard,
element-wise process
calls. Generally proper invocation of
this lifecycle
should be handled for you when invoking one or more
DoFn
s from the standard bundle processors (either via the FnAPI or directly
using a BundleProcessor (Java, Python)).
SDK-independent runners should never have to worry about these details directly.
Side Inputs
Main design document: https://s.apache.org/beam-side-inputs-1-pager
A side input is a global view of a window of a PCollection
. This distinguishes
it from the main input, which is processed one element at a time. The SDK/user
prepares a PCollection
adequately, the runner materializes it, and then the
runner feeds it to the DoFn
.
Unlike main input data, which is pushed by the runner to the ParDo
(generally
via the FnAPI Data channel), side input data is pulled by the ParDo
from the runner (generally over the FnAPI State channel).
A side input is accessed via a specific access_pattern
.
There are currently two access patterns enumerated in the
StandardSideInputTypes
proto: beam:side_input:iterable:v1
which indicates
the runner must return all values in a PCollection corresponding to a specific
window and beam:side_input:multimap:v1
which indicates the runner must return
all values corresponding to a specific key and window.
Being able to serve these access patterns efficiently may influence how a
runner materializes this PCollection.
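For example, a runner's materialized side input needs to answer lookups shaped roughly like the interface below (purely illustrative; this is not a Beam API):
// Hypothetical runner-side view of a materialized side input, illustrating
// what the two standard access patterns require the runner to serve.
interface MaterializedSideInput<K, V, W> {
  // beam:side_input:iterable:v1 - all values of the PCollection in a window.
  Iterable<V> getIterable(W window);

  // beam:side_input:multimap:v1 - all values for a given key in a window.
  Iterable<V> getMultimap(K key, W window);
}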
SideInputs can be detected by looking at the side_inputs
map in the
ParDoPayload
of ParDo
transforms.
The ParDo
operation itself is responsible for invoking the
window_mapping_fn
(before invoking the runner) and view_fn
(on the
runner-returned values), so the runner need not concern itself with these
fields.
When a side input is needed but the side input has no data associated with it
for a given window, elements in that window must be deferred until the side
input has some data or the watermark has advanced sufficiently such that
we can be sure there will be no data for that window. The
PushBackSideInputDoFnRunner
is an example of implementing this.
State and Timers
Main design document: https://s.apache.org/beam-state
When a ParDo
includes state and timers, its execution on your runner is usually
very different. In particular, the state must be persisted when the bundle
completes and retrieved for future bundles. Timers that are set must also be
injected into future bundles as the watermark advances sufficiently.
State and timers are partitioned per key and window, that is, a DoFn
processing a given key must have a consistent view of the state and timers
across all elements that share this key. You may need or want to
explicitly shuffle data to support this.
Once the watermark has passed the end of the window (plus an allowance for
allowed lateness, if any), state associated with this window can be dropped.
State setting and retrieval is performed on the FnAPI State channel, whereas timer setting and firing happens on the FnAPI Data channel.
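Conceptually, every state cell and timer your runner tracks is addressed by the transform, the user-declared state or timer id, the key, and the window, much like the illustrative type below (the Fn API's StateKey proto is the cross-language representation of this idea):
// Hypothetical addressing scheme for user state, illustrating that state is
// partitioned per key and window (both held as encoded bytes by the runner).
// A real implementation used as a map key would also need byte-array-aware
// equals/hashCode.
final class StateAddress {
  final String transformId;   // which ParDo declared the state
  final String userStateId;   // which state cell within that DoFn
  final byte[] encodedKey;    // the element key, as encoded bytes
  final byte[] encodedWindow; // the window, as encoded bytes

  StateAddress(String transformId, String userStateId, byte[] encodedKey, byte[] encodedWindow) {
    this.transformId = transformId;
    this.userStateId = userStateId;
    this.encodedKey = encodedKey;
    this.encodedWindow = encodedWindow;
  }
}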
Splittable DoFn
Main design document: https://s.apache.org/splittable-do-fn
Splittable DoFn
is a generalization of ParDo
that is useful for high-fanout
mappings that can be done in parallel. The prototypical example of such an
operation is reading from a file, where a single file name (as an input element)
can be mapped to all the elements contained in that file.
The DoFn
is considered splittable in the sense that an element representing,
say, a single file can be split (e.g. into ranges of that file) to be processed
(e.g. read) by different workers.
The full power of this primitive is in the fact that these splits can happen
dynamically rather than just statically (i.e. ahead of time), avoiding the
problem of over- or undersplitting.
A full explanation of Splittable DoFn
is out of scope for this doc, but
here is a brief overview as it pertains to its execution.
A Splittable DoFn
can participate in the dynamic splitting protocol by
splitting within an element as well as between elements. Dynamic splitting
is triggered by the runner issuing ProcessBundleSplitRequest
messages on
the control channel. The SDK will commit to process just a portion of the
indicated element and return a description of the remainder (i.e. the
unprocessed portion) to the runner in the ProcessBundleSplitResponse
to be scheduled by the runner (e.g. on a different worker or as part of a
different bundle).
A Splittable DoFn
can also initiate its own splitting, indicating it has
processed an element as far as it can for the moment (e.g. when tailing a file)
but more remains. These most often occur when reading unbounded sources.
In this case a set of elements representing the deferred work are passed back
in the residual_roots
field of the ProcessBundleResponse
.
At a future time, the runner must re-invoke these same operations with
the elements given in residual_roots
.
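On the runner side, handling those residuals amounts to something like the sketch below, where scheduleForLaterProcessing stands in for your runner's own scheduling mechanism (the proto types are from the Beam Fn API's Java bindings):
import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;

class ResidualHandling {
  // After a bundle completes, any residual roots describe deferred work that
  // the runner must eventually re-submit to the same transform.
  void handleResiduals(ProcessBundleResponse response) {
    for (DelayedBundleApplication residual : response.getResidualRootsList()) {
      scheduleForLaterProcessing(residual); // hypothetical runner-specific scheduler
    }
  }

  void scheduleForLaterProcessing(DelayedBundleApplication residual) {
    // ... enqueue as part of a future bundle, possibly on a different worker ...
  }
}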
Implementing the GroupByKey (and window) primitive
The GroupByKey
operation (sometimes called GBK for short) groups a
PCollection
of key-value pairs by key and window, emitting results according
to the PCollection
’s triggering configuration.
It is quite a bit more elaborate than simply colocating elements with the same
key, and uses many fields from the PCollection
’s windowing strategy.
Group By Encoded Bytes
For both the key and window, your runner sees them as “just bytes”. So you need to group in a way that is consistent with grouping by those bytes, even if you have some special knowledge of the types involved.
The elements you are processing will be key-value pairs, and you’ll need to extract
the keys. For this reason, the format of key-value pairs is
standardized and shared
across all SDKs. See either
KvCoder
in Java or
TupleCoder
in Python for documentation on the binary format.
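Since grouping must be by the encoded bytes, a Java-based runner will typically wrap the encoded key so that its engine hashes and compares the bytes themselves, as in the illustrative wrapper below:
import java.util.Arrays;

// Illustrative wrapper: grouping on the encoded key bytes, never on a decoded
// representation, keeps grouping consistent across SDKs.
final class EncodedKey {
  private final byte[] bytes;

  EncodedKey(byte[] bytes) {
    this.bytes = bytes;
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof EncodedKey && Arrays.equals(bytes, ((EncodedKey) other).bytes);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(bytes);
  }
}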
Window Merging
As well as grouping by key, your runner must group elements by their window. A
WindowFn
has the option of declaring that it merges windows on a per-key
basis. For example, session windows for the same key will be merged if they
overlap. So your runner must invoke the merge method of the WindowFn
during
grouping.
Implementing via GroupByKeyOnly + GroupAlsoByWindow
The Java and Python codebases include support code for a particularly common way of
implementing the full GroupByKey
operation: first group the keys, and then group
by window. For merging windows, this is essentially required, since merging is
per key.
Often presenting the set of values in timestamp order can allow more efficient grouping of these values into their final windows.
Dropping late data
Main design document: https://s.apache.org/beam-lateness
A window is expired in a PCollection
if the watermark of the input PCollection
has exceeded the end of the window by at least the input PCollection
’s
allowed lateness.
Data for an expired window can be dropped at any time and should be dropped at a
GroupByKey; if you are using GroupAlsoByWindow, drop it just before executing
this transform. You may shuffle less data if you drop data prior to
GroupByKeyOnly, but this can only be done safely for non-merging windows, as a
window that appears expired may merge to become not expired.
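The expiry check itself is simple; in Java it amounts to something like this sketch (using the SDK's BoundedWindow and Joda time types):
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

class Expiry {
  // A window is expired once the input watermark has passed its maximum
  // timestamp plus the input PCollection's allowed lateness.
  static boolean isExpired(BoundedWindow window, Duration allowedLateness, Instant inputWatermark) {
    return inputWatermark.isAfter(window.maxTimestamp().plus(allowedLateness));
  }
}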
Triggering
Main design document: https://s.apache.org/beam-triggers
The input PCollection
’s trigger and accumulation mode specify when and how
outputs should be emitted from the GroupByKey
operation.
In Java, there is a lot of support code for executing triggers in the
GroupAlsoByWindow
implementations, ReduceFnRunner
(legacy name), and
TriggerStateMachine
, which is an obvious way of implementing all triggers as
an event-driven machine over elements and timers.
In Python this is supported by the
TriggerDriver classes.
TimestampCombiner
When an aggregated output is produced from multiple inputs, the GroupByKey
operation has to choose a timestamp for the combination. To do so, first the
WindowFn has a chance to shift timestamps - this is needed to ensure watermarks
do not prevent progress of windows like sliding windows (the details are beyond
this doc). Then, the shifted timestamps need to be combined - this is specified
by a TimestampCombiner
, which can either select the minimum or maximum of its
inputs, or just ignore inputs and choose the end of the window.
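Conceptually the three standard choices behave like the illustrative helper below; the Java SDK's TimestampCombiner enum provides the real implementation.
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.joda.time.Instant;

class OutputTimestamps {
  // Combine the (already shifted) input timestamps for one key and window.
  static Instant combine(TimestampCombiner combiner, List<Instant> shifted, BoundedWindow window) {
    switch (combiner) {
      case EARLIEST:
        return Collections.min(shifted);
      case LATEST:
        return Collections.max(shifted);
      case END_OF_WINDOW:
      default:
        return window.maxTimestamp();
    }
  }
}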
Implementing the Window primitive
The window primitive applies a WindowFn
UDF to place each input element into
one or more windows of its output PCollection. Note that the primitive also
generally configures other aspects of the windowing strategy for a PCollection
,
but the fully constructed graph that your runner receives will already have a
complete windowing strategy for each PCollection
.
To implement this primitive, you need to invoke the provided WindowFn on each
element, which will return some set of windows for that element to be a part of
in the output PCollection
.
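In Java, invoking the WindowFn for a single element looks roughly like the sketch below; WindowFn.AssignContext is the SDK's real API, and error handling is elided.
import java.util.Collection;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Instant;

class WindowAssignment {
  // Ask the WindowFn which windows this element belongs to in the output.
  static <T, W extends BoundedWindow> Collection<W> assignWindows(
      WindowFn<T, W> windowFn, T element, Instant timestamp, BoundedWindow currentWindow)
      throws Exception {
    return windowFn.assignWindows(
        windowFn.new AssignContext() {
          @Override
          public T element() {
            return element;
          }

          @Override
          public Instant timestamp() {
            return timestamp;
          }

          @Override
          public BoundedWindow window() {
            return currentWindow;
          }
        });
  }
}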
Most runners implement this by fusing these window-altering mappings in with
the DoFns
.
Implementation considerations
A “window” is just a second grouping key that has a “maximum timestamp”. It can
be any arbitrary user-defined type. The WindowFn
provides the coder for the
window type.
Beam’s support code provides WindowedValue
which is a compressed
representation of an element in multiple windows. You may want to use this,
or your own compressed representation. Remember that it simply represents
multiple elements at the same time; there is no such thing as an element “in
multiple windows”.
For values in the global window, you may want to use an even further compressed representation that doesn’t bother including the window at all.
We provide coders with these optimizations such as
PARAM_WINDOWED_VALUE
that can be used to reduce the size of serialized data.
In the future, this primitive may be retired as it can be implemented as a ParDo if the capabilities of ParDo are enhanced to allow output to new windows.
Implementing the Flatten primitive
This one is easy: it takes as input a finite set of PCollections
and outputs their
bag union, keeping windows intact.
For this operation to make sense, it is the SDK’s responsibility to make sure the windowing strategies are compatible.
Also note that there is no requirement that the coders for all the PCollections
be the same. If your runner wants to require that (to avoid tedious
re-encoding) you have to enforce it yourself. Or you could just implement the
fast path as an optimization.
Special mention: the Combine composite
A composite transform that is almost always treated specially by a runner is
CombinePerKey
, which applies an associative and commutative operator to
the elements of a PCollection
. This composite is not a primitive. It is
implemented in terms of ParDo
and GroupByKey
, so your runner will work
without treating it specially - but it does carry additional information that you
probably want to use for optimizations: the associative-commutative operator,
known as a CombineFn
.
Generally runners will want to implement this via what is called
combiner lifting, where a new operation is placed before the GroupByKey
that does partial (within-bundle) combining, which often requires a slight
modification of what comes after the GroupByKey
as well.
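A minimal sketch of the within-bundle (pre-GroupByKey) half is shown below, assuming direct access to the user's CombineFn; in a real runner the map would be keyed by the encoded key bytes as described earlier, and the post-GroupByKey half merges accumulators and extracts the final outputs.
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.values.KV;

class PartialCombine {
  // Partially combine the elements of one bundle per key, emitting one
  // KV<key, accumulator> per key instead of every raw input element.
  static <K, InputT, AccumT> Map<K, AccumT> combineBundle(
      CombineFn<InputT, AccumT, ?> combineFn, Iterable<KV<K, InputT>> bundle) {
    Map<K, AccumT> accumulators = new HashMap<>();
    for (KV<K, InputT> element : bundle) {
      AccumT acc =
          accumulators.computeIfAbsent(element.getKey(), k -> combineFn.createAccumulator());
      accumulators.put(element.getKey(), combineFn.addInput(acc, element.getValue()));
    }
    // After the GroupByKey, the accumulators for each key are merged and the
    // final output extracted:
    //   combineFn.extractOutput(combineFn.mergeAccumulators(accumulatorsForKey));
    return accumulators;
  }
}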
An example of this transformation can be found in the
Python
or Go
implementations of this optimization.
The resulting pre- and post-GroupByKey
operations are generally fused in with
the ParDo
s and executed as above.
Working with pipelines
When you receive a pipeline from a user, you will need to translate it. An explanation of how Beam pipelines are represented can be found here, which complements the official proto declarations.
Testing your runner
The Beam Java SDK and Python SDK have suites of runner validation tests. The
configuration may evolve faster than this document, so check the configuration
of other Beam runners. But be aware that we have tests and you can use them
very easily! To enable these tests in a Java-based runner using Gradle, you
scan the dependencies of the SDK for tests with the JUnit category
ValidatesRunner
.
task validatesRunner(type: Test) {
  group = "Verification"
  description = "Validates the runner"
  def pipelineOptions = JsonOutput.toJson(["--runner=MyRunner", ... misc test options ...])
  systemProperty "beamTestPipelineOptions", pipelineOptions
  classpath = configurations.validatesRunner
  testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
  useJUnit {
    includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
  }
}
Enabling these tests in other languages is unexplored.
Integrating your runner nicely with SDKs
Whether or not your runner is based in the same language as an SDK (such as Java), you will want to provide a shim to invoke it from another SDK if you want the users of that SDK (such as Python) to use it.
Integrating with the Java SDK
Allowing users to pass options to your runner
The mechanism for configuration is
PipelineOptions
,
an interface that works completely differently than normal Java objects. Forget
what you know, and follow the rules, and PipelineOptions
will treat you well.
You must implement a sub-interface for your runner with getters and setters with matching names, like so:
public interface MyRunnerOptions extends PipelineOptions {
  @Description("The Foo to use with MyRunner")
  @Required
  public Foo getMyRequiredFoo();
  public void setMyRequiredFoo(Foo newValue);

  @Description("Enable Baz; on by default")
  @Default.Boolean(true)
  public Boolean isBazEnabled();
  public void setBazEnabled(Boolean newValue);
}
You can set up defaults, etc. See the javadoc for details. When your runner is
instantiated with a PipelineOptions
object, you access your interface by
options.as(MyRunnerOptions.class)
.
To make these options available on the command line, you register your options
with a PipelineOptionsRegistrar
. It is easy if you use @AutoService
:
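For example (a sketch; the class name is a placeholder for your own, and it registers the MyRunnerOptions interface defined above):
import com.google.auto.service.AutoService;
import java.util.Collections;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;

@AutoService(PipelineOptionsRegistrar.class)
public class MyRunnerOptionsRegistrar implements PipelineOptionsRegistrar {
  @Override
  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
    // Make MyRunnerOptions discoverable from the command line.
    return Collections.<Class<? extends PipelineOptions>>singletonList(MyRunnerOptions.class);
  }
}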
Registering your runner with SDKs for command line use
To make your runner available on the command line, you register your runner
with a PipelineRunnerRegistrar
. It is easy if you use @AutoService
:
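For example (again a sketch with placeholder names; MyRunner is assumed to be your PipelineRunner subclass):
import com.google.auto.service.AutoService;
import java.util.Collections;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;

@AutoService(PipelineRunnerRegistrar.class)
public class MyRunnerRegistrar implements PipelineRunnerRegistrar {
  @Override
  public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
    // Make MyRunner selectable via --runner=MyRunner.
    return Collections.<Class<? extends PipelineRunner<?>>>singletonList(MyRunner.class);
  }
}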
Integrating with the Python SDK
In the Python SDK the registration of the code is not automatic, so there are a few things to keep in mind when creating a new runner.
Any dependencies on packages for the new runner should be optional, so create a
new target in extras_require in setup.py that lists what the new runner needs.
All runner code should go in its own package in the apache_beam/runners
directory.
Register the new runner in the create_runner
function of runner.py
so that the
partial name is matched with the correct class to be used.
Python Runners can also be identified (e.g. when passing the runner parameter) by their fully qualified name whether or not they live in the Beam repository.
Writing an SDK-independent runner
There are two aspects to making your runner SDK-independent, able to run pipelines written in other languages: The Fn API and the Runner API.
The Fn API
Design documents:
To run a user’s pipeline, you need to be able to invoke their UDFs. The Fn API is an RPC interface for the standard UDFs of Beam, implemented using protocol buffers over gRPC.
The Fn API includes:
- APIs for registering a subgraph of UDFs
- APIs for streaming elements of a bundle
- Shared data formats (key-value pairs, timestamps, iterables, etc)
You are fully welcome to also use the SDK for your language for utility code, or provide optimized implementations of bundle processing for same-language UDFs.
The Runner API
The Runner API is an SDK-independent schema for a pipeline along with RPC interfaces for launching a pipeline and checking the status of a job. By examining a pipeline only through Runner API interfaces, you remove your runner’s dependence on the SDK for its language for pipeline analysis and job translation.
To execute such an SDK-independent pipeline, you will need to support the Fn API. UDFs are embedded in the pipeline as a specification of the function (often just opaque serialized bytes for a particular language) plus a specification of an environment that can execute it (essentially a particular SDK). So far, this specification is expected to be a URI for a Docker container hosting the SDK’s Fn API harness.
You are fully welcome to also use the SDK for your language, which may offer useful utility code.
The language-independent definition of a pipeline is described via a protocol buffers schema, covered below for reference. But your runner need not directly manipulate protobuf messages. Instead, the Beam codebase provides utilities for working with pipelines so that you don’t need to be aware of whether or not the pipeline has ever been serialized or transmitted, or what language it may have been written in to begin with.
Java
If your runner is Java-based, the tools to interact with pipelines in an
SDK-agnostic manner are in the beam-runners-core-construction-java
artifact, in the org.apache.beam.sdk.util.construction
namespace.
The utilities are named consistently, like so:
- PTransformTranslation - registry of known transforms and standard URNs
- ParDoTranslation - utilities for working with ParDo in a language-independent manner
- WindowIntoTranslation - same for Window
- FlattenTranslation - same for Flatten
- WindowingStrategyTranslation - same for windowing strategies
- CoderTranslation - same for coders
- … etc, etc …
By inspecting transforms only through these classes, your runner will not depend on the particulars of the Java SDK.
The Runner API protos
The Runner API refers to a specific manifestation of the concepts in the Beam model, as a protocol buffers schema. Even though you should not manipulate these messages directly, it can be helpful to know the canonical data that makes up a pipeline.
Most of the API is exactly the same as the high-level description; you can get started implementing a runner without understanding all the low-level details.
The most important takeaway of the Runner API for you is that it is a language-independent definition of a Beam pipeline. You will probably always interact via a particular SDK’s support code wrapping these definitions with sensible idiomatic APIs, but always be aware that this is the specification and any other data is not necessarily inherent to the pipeline, but may be SDK-specific enrichments (or bugs!).
The UDFs in the pipeline may be written for any Beam SDK, or even multiple in the same pipeline. So this is where we will start, taking a bottom-up approach to understanding the protocol buffers definitions for UDFs before going back to the higher-level, mostly obvious, record definitions.
FunctionSpec
proto
The heart of cross-language portability is the FunctionSpec
. This is a
language-independent specification of a function, in the usual programming
sense that includes side effects, etc.
A FunctionSpec
includes a URN identifying the function as well as an arbitrary
fixed parameter. For example the (hypothetical) “max” CombineFn might have the
URN beam:combinefn:max:0.1
and a parameter that indicates by what
comparison to take the max.
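Using the Java-generated proto classes, building such a spec looks roughly like the sketch below; the URN and payload are the hypothetical ones from this example, and depending on your build the protobuf classes may be vendored under a different package.
import com.google.protobuf.ByteString;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;

class FunctionSpecExample {
  // Build the (hypothetical) "max" CombineFn spec: a URN naming the function
  // plus an opaque payload that parameterizes it.
  static FunctionSpec maxCombineFnSpec(byte[] comparatorPayload) {
    return FunctionSpec.newBuilder()
        .setUrn("beam:combinefn:max:0.1")
        .setPayload(ByteString.copyFrom(comparatorPayload))
        .build();
  }
}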
For most UDFs in a pipeline constructed using a particular language’s SDK, the
URN will indicate that the SDK must interpret it, for example
beam:dofn:javasdk:0.1
or beam:dofn:pythonsdk:0.1
. The parameter
will contain serialized code, such as a Java-serialized DoFn
or a Python
pickled DoFn
.
A FunctionSpec
is not only for UDFs. It is just a generic way to name/specify
any function. It is also used as the specification for a PTransform
. But when
used in a PTransform
it describes a function from PCollection
to PCollection
and cannot be specific to an SDK because the runner is in charge of evaluating
transforms and producing PCollections
.
It goes without saying that not every environment will be able to deserialize
every function spec. For this reason PTransform
s have an environment_id
parameter that indicates at least one environment that is capable of interpreting
the contained URNs. This is a reference to an environment in the environments
map of the Pipeline proto and is typically defined by a docker image (possibly
with some extra dependencies).
There may be other environments that are also capable of
doing so, and a runner is free to use them if it has this knowledge.
Primitive transform payload protos
The payloads for the primitive transforms are just proto serializations of their specifications. Rather than reproduce their full code here, I will just highlight the important pieces to show how they fit together.
It is worth emphasizing again that while you probably will not interact directly with these payloads, they are the only data that is inherently part of the transform.
ParDoPayload
proto
A ParDo
transform carries its DoFn
in an SdkFunctionSpec
and then
provides language-independent specifications for its other features - side
inputs, state declarations, timer declarations, etc.
CombinePayload
proto
Combine
is not a primitive. But non-primitives are perfectly able to carry
additional information for better optimization. The most important thing that a
Combine
transform carries is the CombineFn
in an SdkFunctionSpec
record.
In order to effectively carry out the optimizations desired, it is also
necessary to know the coder for intermediate accumulations, so it also carries
a reference to this coder.
PTransform
proto
A PTransform
is a function from PCollection
to PCollection
. This is
represented in the proto using a FunctionSpec.
A PTransform
may have subtransforms if it is a composite, in which case the
FunctionSpec
may be omitted since the subtransforms define its behavior.
The input and output PCollections
are unordered and referred to by a local
name. The SDK decides what this name is, since it will likely be embedded in
serialized UDFs.
A runner that understands the specification of a given PTransform
(whether
primitive or composite), as defined by its FunctionSpec
, is free to
substitute it with another PTransform
(or set thereof) that has identical
semantics.
This is typically how CombinePerKey
is handled, but many other substitutions
can be done as well.
PCollection
proto
A PCollection
just stores a coder, windowing strategy, and whether or not it
is bounded.
Coder
proto
This is a very interesting proto. A coder is a parameterized function that may
only be understood by a particular SDK, hence a FunctionSpec
, but also
may have component coders that fully define it. For example, a ListCoder
is
only a meta-format, while ListCoder(VarIntCoder)
is a fully specified format.
There are a large number of standard coders understood by most, if not all, SDKs. Using these allows for cross-language transforms.
The Jobs API RPCs
While your language's SDK will likely insulate you from touching the Runner API protos directly, you may need to implement adapters for your runner, to expose it to another language. This allows a Python SDK to invoke a Java runner or vice versa. A typical implementation of this can be found in local_job_service.py which is used directly to front several Python-implemented runners.
The RPCs themselves will necessarily follow the existing APIs of PipelineRunner and PipelineResult, but altered to be the minimal backend channel, versus a rich and convenient API.
A key piece of this is the Artifacts API, which allows a Runner to fetch and deploy binary artifacts (such as jars, pypi packages, etc.) that are listed as dependencies in the various environments, and may have various representations. This is invoked after a pipeline is submitted, but before it is executed. The SDK submitting a pipeline acts as an artifact server to the runner receiving the request, and in turn the runner then acts as an artifact server to the workers (environments) hosting the user's UDFs.