The Beam model allows runners to execute your pipeline in different ways. You may observe various effects as a result of the runner’s choices. This page describes these effects so you can better understand how Beam pipelines execute.
The serialization and communication of elements between machines is one of the most expensive operations in a distributed execution of your pipeline. Avoiding this serialization may require re-processing elements after failures or may limit the distribution of output to other machines.
The runner might serialize elements between machines for communication purposes and for other reasons such as persistence.
A runner may decide to transfer elements between transforms in a variety of ways, such as:
Using the elements in a side input to a ParDo. This may require serializing the elements and broadcasting them to all the workers executing the ParDo.
Some situations where the runner may serialize and persist elements are:
When elements are used as part of a stateful DoFn, the runner may persist values to some state mechanism.
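To make the cost of moving elements between machines concrete, the following minimal sketch contrasts handing elements over in memory with an encode/decode round trip. It uses Python's pickle module as a stand-in for a coder, and the function names pass_in_memory and send_to_other_worker are hypothetical; it does not describe how any particular runner is implemented.

```python
import pickle

def pass_in_memory(elements):
    # Same worker: the consuming transform receives the very same objects.
    return elements

def send_to_other_worker(elements):
    # Different worker: every element is encoded to bytes and decoded again.
    # pickle stands in for a coder here (an assumption for illustration).
    encoded = [pickle.dumps(element) for element in elements]
    return [pickle.loads(data) for data in encoded]

elements = [{"user": "a", "score": 1}, {"user": "b", "score": 2}]
local = pass_in_memory(elements)
remote = send_to_other_worker(elements)

print(local[0] is elements[0])   # True: no copy was made
print(remote[0] is elements[0])  # False: this is a decoded copy
print(remote == elements)        # True: the values are unchanged
```

The round trip preserves the values but pays for encoding, transfer, and decoding on every element, which is why a runner may prefer to avoid it where it can.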
Beam pipelines often focus on “embarrassingly parallel” problems. Because of this, the APIs emphasize processing elements in parallel, which makes it difficult to express actions like “assign a sequence number to each element in a PCollection”. This is intentional, as such algorithms are much more likely to suffer from scalability problems.
Processing all elements in parallel also has some drawbacks. Specifically, it makes it impossible to batch any operations, such as writing elements to a sink or checkpointing progress during processing.
Instead of processing all elements simultaneously, the elements in a
PCollection are processed in bundles. The division of the collection into
bundles is arbitrary and selected by the runner. This allows the runner to
choose an appropriate middle-ground between persisting results after every
element, and having to retry everything if there is a failure. For example, a
streaming runner may prefer to process and commit small bundles, and a batch
runner may prefer to process larger bundles.
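The following toy sketch illustrates what "arbitrary" bundling means. The helper split_into_bundles is hypothetical and chooses random cut points; a real runner picks bundle boundaries using its own heuristics, which are not described here.

```python
import random

def split_into_bundles(elements, num_bundles, seed=None):
    """Split elements into num_bundles non-empty, contiguous bundles."""
    if not 1 <= num_bundles <= len(elements):
        raise ValueError("need between 1 and len(elements) bundles")
    rng = random.Random(seed)
    # Pick distinct cut points strictly inside the list, so no bundle is empty.
    cuts = sorted(rng.sample(range(1, len(elements)), num_bundles - 1))
    bounds = [0] + cuts + [len(elements)]
    return [elements[start:end] for start, end in zip(bounds, bounds[1:])]

elements = list(range(1, 10))  # nine elements
bundles = split_into_bundles(elements, 2, seed=0)
for bundle in bundles:
    print(bundle)
```

Whatever boundaries are chosen, every element lands in exactly one bundle. Asking for fewer, larger bundles mimics a batch-style choice, and asking for many small bundles mimics a streaming-style choice.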
In this section, we discuss how elements in the input collection are processed in parallel, and how transforms are retried when failures occur.
When executing a single
ParDo, a runner might divide an example input
collection of nine elements into two bundles as shown in figure 1.
Figure 1: A runner divides an input collection with nine elements into two bundles.
When the ParDo executes, workers may process the two bundles in parallel as
shown in figure 2.
Figure 2: Two workers process the two bundles in parallel. The elements in each bundle are processed in sequence.
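The behavior in figure 2 can be approximated with a small simulation. In this sketch, threads stand in for workers, and the names process_bundle and run_pardo are hypothetical; it is an illustration of the idea rather than a description of any runner's scheduler.

```python
from concurrent.futures import ThreadPoolExecutor

def process_bundle(bundle, fn):
    # Elements within one bundle are processed one after another.
    return [fn(element) for element in bundle]

def run_pardo(bundles, fn, num_workers=2):
    # Each bundle is handed to a worker; different bundles run concurrently.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return list(pool.map(lambda bundle: process_bundle(bundle, fn), bundles))

bundle_a = [1, 2, 3, 4, 5]
bundle_b = [6, 7, 8, 9]
outputs = run_pardo([bundle_a, bundle_b], lambda x: x * 10)
print(outputs)  # [[10, 20, 30, 40, 50], [60, 70, 80, 90]]
```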
Since elements cannot be split, the maximum parallelism for a transform depends on the number of elements in the collection. In our example, the input collection has nine elements, so the maximum parallelism is nine.
Figure 3: The maximum parallelism is nine, as there are nine elements in the input collection.
Note: Splittable ParDo allows splitting the processing of a single input across multiple bundles. This feature is a work in progress.
ParDo transforms that are in sequence may be dependently parallel if the
runner chooses to execute the consuming transform on the producing transform’s
output elements without altering the bundling. In figure 4, ParDo1 and
ParDo2 are dependently parallel if the output of
ParDo1 for a given
element must be processed on the same worker.
Figure 4: Two transforms in sequence and their corresponding input collections.
Figure 5 shows how these dependently parallel transforms might execute. The
first worker executes
ParDo1 on the elements in bundle A (which results in
bundle C), and then executes
ParDo2 on the elements in bundle C. Similarly,
the second worker executes
ParDo1 on the elements in bundle B (which results
in bundle D), and then executes
ParDo2 on the elements in bundle D.
Figure 5: Two workers execute dependently parallel ParDo transforms.
Executing transforms this way allows a runner to avoid redistributing elements between workers, which saves on communication costs. However, the maximum parallelism now depends on the maximum parallelism of the first of the dependently parallel steps.
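The execution in figure 5 can be sketched as follows. The function run_dependently_parallel is a hypothetical name, threads stand in for workers, and the two lambdas are placeholder transforms chosen only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def run_dependently_parallel(bundle, pardo1, pardo2):
    # The same worker applies both transforms, keeping the bundling intact.
    intermediate = [pardo1(element) for element in bundle]  # e.g. bundle A -> bundle C
    return [pardo2(element) for element in intermediate]    # bundle C -> final output

def pardo1(x):
    return x + 1

def pardo2(x):
    return x * 2

bundle_a = [1, 2, 3, 4, 5]
bundle_b = [6, 7, 8, 9]

# One task per input bundle: parallelism is bounded by the first step's bundling.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(
        lambda bundle: run_dependently_parallel(bundle, pardo1, pardo2),
        [bundle_a, bundle_b],
    ))

print(results)  # [[4, 6, 8, 10, 12], [14, 16, 18, 20]]
```

Because the intermediate bundles never leave the worker, no elements are redistributed, but the second transform cannot use more workers than the first one did.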
If processing of an element within a bundle fails, the entire bundle fails. The elements in the bundle must be retried (otherwise the entire pipeline fails), although they do not need to be retried with the same bundling.
For this example, we will use the
ParDo from figure 1 that has an input
collection with nine elements and is divided into two bundles.
In figure 6, the first worker successfully processes all five elements in bundle A. The second worker processes the four elements in bundle B: the first two elements were successfully processed, the third element’s processing failed, and there is one element still awaiting processing.
We see that the runner retries all elements in bundle B and the processing completes successfully the second time. Note that the retry does not necessarily happen on the same worker as the original processing attempt, as shown in the diagram.
Figure 6: The processing of an element within bundle B fails, and another worker retries the entire bundle.
Because we encountered a failure while processing an element in the input bundle, we had to reprocess all of the elements in the input bundle. This means the runner must throw away the entire output bundle since all of the results it contains will be recomputed.
Note that if the failed transform is a
ParDo, then the
DoFn instance is torn
down and abandoned.
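The retry behavior for bundle B can be mimicked with a short simulation. The retry limit max_attempts and the helper names are assumptions made for this sketch; actual retry policies differ between runners.

```python
def process_bundle(bundle, fn):
    output = []
    for element in bundle:
        output.append(fn(element))  # an exception here fails the whole bundle
    return output

def run_with_retry(bundle, fn, max_attempts=3):
    # max_attempts is a hypothetical limit chosen for this sketch.
    for attempt in range(1, max_attempts + 1):
        try:
            return process_bundle(bundle, fn), attempt
        except Exception:
            continue  # the partial output of the failed attempt is discarded
    raise RuntimeError("bundle failed after all attempts")

calls = []
failed_once = set()

def flaky_double(x):
    calls.append(x)
    if x == 8 and x not in failed_once:
        failed_once.add(x)
        raise ValueError("transient failure")
    return x * 2

bundle_b = [6, 7, 8, 9]
output, attempts = run_with_retry(bundle_b, flaky_double)
print(output)    # [12, 14, 16, 18]
print(attempts)  # 2
print(calls)     # [6, 7, 8, 6, 7, 8, 9]
```

The call log shows that the first two elements are processed twice: their results from the failed attempt are thrown away along with the rest of the output bundle.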
If a failure to process an element in ParDo2 causes
ParDo1 to re-execute,
these two steps are said to be co-failing.
For this example, we will use the two
ParDos from figure 4.
In figure 7, worker two successfully executes
ParDo1 on all elements in bundle
B. However, the worker fails to process an element in bundle D, so ParDo2
fails (shown as the red X). As a result, the runner must discard and recompute
the output of
ParDo2. Because the runner was executing ParDo1 and ParDo2
together, the output bundle from
ParDo1 must also be thrown away, and all
elements in the input bundle must be retried. These two
ParDos are co-failing.
Figure 7: Processing of an element within bundle D fails, so all elements in the input bundle are retried.
Note that the retry does not necessarily have the same processing time as the original attempt, as shown in the diagram.
DoFns that experience coupled failures are terminated and must be torn
down since they aren’t following the normal
DoFn lifecycle.
Executing transforms this way allows a runner to avoid persisting elements between transforms, saving on persistence costs.
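A coupled failure can be simulated in the same spirit. In this sketch, the intermediate bundle lives only in memory, so a failure in the second transform forces the first one to run again. The names run_coupled_with_retry and max_attempts are hypothetical, and the transforms are placeholders.

```python
def run_coupled_with_retry(bundle, pardo1, pardo2, max_attempts=3):
    # max_attempts is a hypothetical limit chosen for this sketch.
    for _ in range(max_attempts):
        try:
            # The intermediate bundle is kept in memory and never persisted.
            intermediate = [pardo1(element) for element in bundle]
            return [pardo2(element) for element in intermediate]
        except Exception:
            continue  # both the intermediate and the final output are discarded
    raise RuntimeError("bundle failed after all attempts")

pardo1_calls = []
failed_once = set()

def pardo1(x):
    pardo1_calls.append(x)
    return x + 1

def pardo2(x):
    if x == 9 and x not in failed_once:
        failed_once.add(x)
        raise ValueError("transient failure in ParDo2")
    return x * 2

bundle_b = [6, 7, 8, 9]
output = run_coupled_with_retry(bundle_b, pardo1, pardo2)
print(output)        # [14, 16, 18, 20]
print(pardo1_calls)  # [6, 7, 8, 9, 6, 7, 8, 9]
```

ParDo1 never failed, yet it ran on every input element twice. That recomputation is the price of not persisting the intermediate bundle.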