Execution model
The Beam model allows runners to execute your pipeline in different ways. You may observe various effects as a result of the runner’s choices. This page describes these effects so you can better understand how Beam pipelines execute.
Processing of elements
The serialization and communication of elements between machines is one of the most expensive operations in a distributed execution of your pipeline. Avoiding this serialization may require re-processing elements after failures or may limit the distribution of output to other machines.
Serialization and communication
The runner might serialize elements between machines for communication purposes and for other reasons such as persistence.
A runner may decide to transfer elements between transforms in a variety of ways, such as:
- Routing elements to a worker for processing as part of a grouping operation. This may involve serializing elements and grouping or sorting them by their key.
- Redistributing elements between workers to adjust parallelism. This may involve serializing elements and communicating them to other workers.
- Using the elements in a side input to a
ParDo
. This may require serializing the elements and broadcasting them to all the workers executing theParDo
. - Passing elements between transforms that are running on the same worker. This may allow the runner to avoid serializing elements; instead, the runner can just pass the elements in memory. This is done as part of an optimization that is known as fusion.
Some situations where the runner may serialize and persist elements are:
- When used as part of a stateful
DoFn
, the runner may persist values to some state mechanism. - When committing the results of processing, the runner may persist the outputs as a checkpoint.
Bundling and persistence
Beam pipelines often focus on “embarassingly parallel” problems. Because of this, the APIs emphasize processing elements in parallel, which makes it difficult to express actions like “assign a sequence number to each element in a PCollection”. This is intentional as such algorithms are much more likely to suffer from scalability problems.
Processing all elements in parallel also has some drawbacks. Specifically, it makes it impossible to batch any operations, such as writing elements to a sink or checkpointing progress during processing.
Instead of processing all elements simultaneously, the elements in a
PCollection
are processed in bundles. The division of the collection into
bundles is arbitrary and selected by the runner. This allows the runner to
choose an appropriate middle-ground between persisting results after every
element, and having to retry everything if there is a failure. For example, a
streaming runner may prefer to process and commit small bundles, and a batch
runner may prefer to process larger bundles.
Data partitioning and inter-stage execution
Partitioning and parallelization of element processing within a Beam pipeline is dependent on two things:
- Data source implementation
- Inter-stage key parallelism
Beam pipelines read data from a source (e.g. KafkaIO
, BigQueryIO
, JdbcIO
,
or your own source implementation). To implement a Source in Beam one must
implement it as a Splittable DoFn
. A Splittable DoFn
provides the runner
with interfaces to facilitate the splitting of work.
When running key-based operations in Beam (e.g. GroupByKey
, Combine
,
Reshuffle.perKey
, and stateful DoFn
s), Beam runners perform serialization
and transfer of data known as shuffle1. Shuffle allows data
elements of the same key to be processed together.
The way in which runners shuffle data may be slightly different for Batch and Streaming execution modes.
1Not to be confused with the shuffle
operation in some runners.
Data ordering in a pipeline execution
The Beam model does not define strict guidelines regarding the order in which
runners process elements or transport them across PTransforms
. Runners are
free to implement data transfer semantics in different forms.
Some use cases exist where user pipelines may need to rely on specific ordering semantics in pipeline execution. The capability matrix documents runner behavior for key-ordered delivery.
Consider a single Beam worker processing a series of bundles from the same Beam
transform, and consider a PTransform
that outputs data from this Stage into a
downstream PCollection
. Finally, consider two events with the same key
emitted in a certain order by this worker (within the same bundle or as part of
different bundles).
We say that the Beam runner supports key-ordered delivery if it guarantees that these two events will be observed in the same order by a PTransform that is immediately downstream independently of the kind of data transmission method.
This characteristic will hold true in runners and operations that have key-limited parallelism.
Failures and parallelism within and between transforms
In this section, we discuss how elements in the input collection are processed in parallel, and how transforms are retried when failures occur.
Data-parallelism within one transform
When executing a single ParDo
, a runner might divide an example input
collection of nine elements into two bundles as shown in figure 1.
Figure 1: A runner divides an input collection into two bundles.
When the ParDo
executes, workers may process the two bundles in parallel as
shown in figure 2.
Figure 2: Two workers process the two bundles in parallel.
Since elements cannot be split, the maximum parallelism for a transform depends on the number of elements in the collection. In figure 3, the input collection has nine elements, so the maximum parallelism is nine.
Figure 3: Nine workers process a nine element input collection in parallel.
Note: Splittable ParDo allows splitting the processing of a single input across multiple bundles. This feature is a work in progress.
Dependent-parallelism between transforms
ParDo
transforms that are in sequence may be dependently parallel if the
runner chooses to execute the consuming transform on the producing transform’s
output elements without altering the bundling. In figure 4, ParDo1
and
ParDo2
are dependently parallel if the output of ParDo1
for a given
element must be processed on the same worker.
Figure 4: Two transforms in sequence and their corresponding input collections.
Figure 5 shows how these dependently parallel transforms might execute. The
first worker executes ParDo1
on the elements in bundle A (which results in
bundle C), and then executes ParDo2
on the elements in bundle C. Similarly,
the second worker executes ParDo1
on the elements in bundle B (which results
in bundle D), and then executes ParDo2
on the elements in bundle D.
Figure 5: Two workers execute dependently parallel ParDo transforms.
Executing transforms this way allows a runner to avoid redistributing elements between workers, which saves on communication costs. However, the maximum parallelism now depends on the maximum parallelism of the first of the dependently parallel steps.
Failures within one transform
If processing of an element within a bundle fails, the entire bundle fails. The elements in the bundle must be retried (otherwise the entire pipeline fails), although they do not need to be retried with the same bundling.
For this example, we will use the ParDo
from figure 1 that has an input
collection with nine elements and is divided into two bundles.
In figure 6, the first worker successfully processes all five elements in bundle A. The second worker processes the four elements in bundle B: the first two elements were successfully processed, the third element’s processing failed, and there is one element still awaiting processing.
We see that the runner retries all elements in bundle B and the processing completes successfully the second time. Note that the retry does not necessarily happen on the same worker as the original processing attempt, as shown in the figure.
Figure 6: The processing of an element within bundle B fails, and another worker retries the entire bundle.
Because we encountered a failure while processing an element in the input bundle, we had to reprocess all of the elements in the input bundle. This means the runner must throw away the entire output of the bundle (including any state mutations and set timers) since all of the results it contains will be recomputed.
Note that if the failed transform is a ParDo
, then the DoFn
instance is torn
down and abandoned.
Coupled failure: Failures between transforms
If a failure to process an element in ParDo2
causes ParDo1
to re-execute,
these two steps are said to be co-failing.
For this example, we will use the two ParDo
s from figure 4.
In figure 7, worker two successfully executes ParDo1
on all elements in bundle
B. However, the worker fails to process an element in bundle D, so ParDo2
fails (shown as the red X). As a result, the runner must discard and recompute
the output of ParDo2
. Because the runner was executing ParDo1
and ParDo2
together, the output bundle from ParDo1
must also be thrown away, and all
elements in the input bundle must be retried. These two ParDo
s are co-failing.
Figure 7: Processing of an element within bundle D fails, so all elements in the input bundle are retried.
Note that the retry does not necessarily have the same processing time as the original attempt, as shown in the diagram.
All DoFns
that experience coupled failures are terminated and must be torn
down since they aren’t following the normal DoFn
lifecycle .
Executing transforms this way allows a runner to avoid persisting elements between transforms, saving on persistence costs.
Last updated on 2024/09/20
Have you found everything you were looking for?
Was it all useful and clear? Is there anything that you would like to change? Let us know!