Apache Beam Execution Model

The Beam model allows runners to execute your pipeline in different ways. You may observe various effects as a result of the runner’s choices. This page describes these effects so you can better understand how Beam pipelines execute.

Processing of elements

The serialization and communication of elements between machines is one of the most expensive operations in a distributed execution of your pipeline. Avoiding this serialization may require re-processing elements after failures or may limit the distribution of output to other machines.

Serialization and communication

The runner might serialize elements between machines for communication purposes and for other reasons such as persistence.

A runner may decide to transfer elements between transforms in a variety of ways, such as:

  1. Routing elements to a worker for processing as part of a grouping operation. This may involve serializing elements and grouping or sorting them by their key.
  2. Redistributing elements between workers to adjust parallelism. This may involve serializing elements and communicating them to other workers.
  3. Using the elements in a side input to a ParDo. This may require serializing the elements and broadcasting them to all the workers executing the ParDo.
  4. Passing elements between transforms that are running on the same worker. This may allow the runner to avoid serializing elements; instead, the runner can just pass the elements in memory.

Some situations where the runner may serialize and persist elements are:

  1. When used as part of a stateful DoFn, the runner may persist values to some state mechanism.
  2. When committing the results of processing, the runner may persist the outputs as a checkpoint.
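
The difference between passing an element in memory and serializing it can be sketched in a few lines. This is a toy model, not Beam API: Python's standard pickle module stands in for whatever coder a runner would actually use, and the function names pass_in_memory and send_between_workers are hypothetical. The sketch assumes only that serialization means encoding an element to bytes and decoding it again on the receiving side.

```python
import pickle


def pass_in_memory(element):
    # Same worker: the runner can hand the object itself to the next transform.
    return element


def send_between_workers(element):
    # Different workers: encode to bytes, ship the bytes, then decode.
    # pickle is only a stand-in for a real coder (assumption for illustration).
    payload = pickle.dumps(element)
    return pickle.loads(payload), len(payload)


element = {"key": "user-1", "values": [1, 2, 3]}
local = pass_in_memory(element)
remote, payload_size = send_between_workers(element)

print(local is element)    # the very same object, no copy made
print(remote == element)   # equal content after the round trip
print(remote is element)   # but a separate copy
print(payload_size)        # bytes that had to be produced and transferred
```

The in-memory path does no extra work, while the serialized path pays for an encode, a transfer proportional to the payload size, and a decode.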

Bundling and persistence

Beam pipelines often focus on “embarrassingly parallel” problems. Because of this, the APIs emphasize processing elements in parallel, which makes it difficult to express actions like “assign a sequence number to each element in a PCollection”. This is intentional, as such algorithms are much more likely to suffer from scalability problems.

Processing all elements in parallel also has some drawbacks. Specifically, it makes it impossible to batch any operations, such as writing elements to a sink or checkpointing progress during processing.

Instead of processing all elements simultaneously, the elements in a PCollection are processed in bundles. The division of the collection into bundles is arbitrary and selected by the runner. This allows the runner to choose an appropriate middle ground between persisting results after every element and having to retry everything if there is a failure. For example, a streaming runner may prefer to process and commit small bundles, and a batch runner may prefer to process larger bundles.
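
The trade-off can be illustrated with a minimal sketch. It is a toy model and not how any particular runner is implemented: the helper names split_into_bundles and process_with_commits are hypothetical, and a fixed bundle_size is an assumption made for illustration (a real runner chooses the division itself and may vary it).

```python
def split_into_bundles(elements, bundle_size):
    """Divide elements into consecutive bundles of at most bundle_size."""
    if bundle_size < 1:
        raise ValueError("bundle_size must be at least 1")
    return [elements[i:i + bundle_size]
            for i in range(0, len(elements), bundle_size)]


def process_with_commits(elements, bundle_size, fn):
    """Process bundle by bundle, committing once per completed bundle."""
    committed = []
    commits = 0
    for bundle in split_into_bundles(elements, bundle_size):
        outputs = [fn(element) for element in bundle]  # process the whole bundle
        committed.extend(outputs)                      # one commit per bundle
        commits += 1
    return committed, commits


elements = list(range(9))
# Small bundles: more commits, but less work to redo after a failure.
_, small_bundle_commits = process_with_commits(elements, 2, lambda x: x * 10)
# One large bundle: a single commit, but a failure redoes everything.
_, large_bundle_commits = process_with_commits(elements, 9, lambda x: x * 10)
```

With nine elements, bundles of two lead to five commits, while a single bundle of nine leads to one.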

Failures and parallelism within and between transforms

In this section, we discuss how elements in the input collection are processed in parallel, and how transforms are retried when failures occur.

Data-parallelism within one transform

When executing a single ParDo, a runner might divide an example input collection of nine elements into two bundles as shown in figure 1.

Figure 1: A runner divides an input collection with nine elements into two bundles.

When the ParDo executes, workers may process the two bundles in parallel as shown in figure 2.

Figure 2: Two workers process the two bundles in parallel. The elements in each bundle are processed in sequence.
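
The execution in figure 2 can be approximated with a small simulation. This is only a sketch under stated assumptions: threads from Python's standard concurrent.futures module play the role of workers, and process_bundle and square are hypothetical names, not Beam API.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


def process_bundle(bundle, fn):
    # Elements within one bundle are processed one after another.
    return [fn(element) for element in bundle]


bundle_a = [1, 2, 3, 4, 5]
bundle_b = [6, 7, 8, 9]

# Two "workers" (threads here) each take one bundle and run in parallel.
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(process_bundle, bundle, square)
               for bundle in (bundle_a, bundle_b)]
    output_a, output_b = [future.result() for future in futures]
```

Parallelism exists only across bundles; inside each bundle the order of processing is sequential.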

Since elements cannot be split, the maximum parallelism for a transform depends on the number of elements in the collection. In our example, the input collection has nine elements, so the maximum parallelism is nine.

Figure 3: The maximum parallelism is nine, as there are nine elements in the input collection.
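
As a quick check of this bound, the following sketch computes how many workers can be kept busy for a given input size. The function name usable_parallelism is hypothetical, and the sketch assumes that every bundle must contain at least one whole element.

```python
def usable_parallelism(num_elements, num_workers):
    # Each bundle needs at least one whole element, so workers beyond
    # the number of elements have nothing to process.
    return min(num_elements, num_workers)


two_workers = usable_parallelism(9, 2)
nine_workers = usable_parallelism(9, 9)
twenty_workers = usable_parallelism(9, 20)
```

Adding workers beyond nine does not help in this example, because there are only nine elements to hand out.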

Note: Splittable ParDo allows splitting the processing of a single input across multiple bundles. This feature is a work in progress.

Dependent-parallelism between transforms

ParDo transforms that are in sequence may be dependently parallel if the runner chooses to execute the consuming transform on the producing transform’s output elements without altering the bundling. In figure 4, ParDo1 and ParDo2 are dependently parallel if the output of ParDo1 for a given element must be processed on the same worker.

Figure 4: Two transforms in sequence and their corresponding input collections.

Figure 5 shows how these dependently parallel transforms might execute. The first worker executes ParDo1 on the elements in bundle A (which results in bundle C), and then executes ParDo2 on the elements in bundle C. Similarly, the second worker executes ParDo1 on the elements in bundle B (which results in bundle D), and then executes ParDo2 on the elements in bundle D.

Figure 5: Two workers execute dependently parallel ParDo transforms.

Executing transforms this way allows a runner to avoid redistributing elements between workers, which saves on communication costs. However, the maximum parallelism now depends on the maximum parallelism of the first of the dependently parallel steps.
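
A minimal sketch of this execution style follows. It is a toy model, not Beam API: par_do_1, par_do_2, and run_fused are hypothetical names, and the arithmetic inside the two functions is arbitrary. The point is that the intermediate bundle stays in memory on the worker that produced it.

```python
def par_do_1(x):
    return x + 1


def par_do_2(x):
    return x * 2


def run_fused(bundle):
    # The same worker runs ParDo1, keeps the result in memory,
    # and immediately feeds it to ParDo2 without changing the bundling.
    intermediate = [par_do_1(element) for element in bundle]  # e.g. bundle A -> C
    return [par_do_2(element) for element in intermediate]    # bundle C -> output


bundle_a = [1, 2, 3, 4, 5]
bundle_b = [6, 7, 8, 9]

# One call per worker; the two calls are independent of each other.
outputs = [run_fused(bundle) for bundle in (bundle_a, bundle_b)]
```

Because the bundling is fixed by the first step, the second step can never run on more bundles than the first one had.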

Failures within one transform

If processing of an element within a bundle fails, the entire bundle fails. The elements in the bundle must be retried (otherwise the entire pipeline fails), although they do not need to be retried with the same bundling.

For this example, we will use the ParDo from figure 1 that has an input collection with nine elements and is divided into two bundles.

In figure 6, the first worker successfully processes all five elements in bundle A. The second worker processes the four elements in bundle B: the first two elements are processed successfully, processing of the third element fails, and one element is still awaiting processing.

We see that the runner retries all elements in bundle B and the processing completes successfully the second time. Note that the retry does not necessarily happen on the same worker as the original processing attempt, as shown in the diagram.

Figure 6: The processing of an element within bundle B fails, and another worker retries the entire bundle.

Because a failure occurred while processing an element in the input bundle, all of the elements in the input bundle have to be reprocessed. This means the runner must throw away the entire output bundle, since all of the results it contains will be recomputed.

Note that if the failed transform is a ParDo, then the DoFn instance is torn down and abandoned.
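
The retry behavior can be simulated as follows. This is a sketch under stated assumptions rather than a description of any runner: the names process_bundle, run_with_retries, and flaky are hypothetical, the max_attempts limit is an assumption made for illustration, and the failure is injected artificially on the third element of bundle B.

```python
def process_bundle(bundle, fn):
    outputs = []
    for element in bundle:
        outputs.append(fn(element))  # an exception here fails the whole bundle
    return outputs


def run_with_retries(bundle, fn, max_attempts=3):
    for attempt in range(1, max_attempts + 1):
        try:
            return process_bundle(bundle, fn), attempt
        except RuntimeError:
            # The partial output of the failed attempt is discarded.
            continue
    raise RuntimeError("bundle failed after %d attempts" % max_attempts)


calls = []


def flaky(element):
    calls.append(element)
    # Fail exactly once, on the first attempt at element 8.
    if element == 8 and calls.count(8) == 1:
        raise RuntimeError("transient failure")
    return element * 10


bundle_b = [6, 7, 8, 9]
outputs, attempts = run_with_retries(bundle_b, flaky)
```

The call log shows that elements 6 and 7 are processed twice even though they succeeded the first time, because the unit of retry is the bundle and not the element.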

Coupled failure: Failures between transforms

If a failure to process an element in ParDo2 causes ParDo1 to re-execute, these two steps are said to be co-failing.

For this example, we will use the two ParDos from figure 4.

In figure 7, worker two successfully executes ParDo1 on all elements in bundle B. However, the worker fails to process an element in bundle D, so ParDo2 fails (shown as the red X). As a result, the runner must discard and recompute the output of ParDo2. Because the runner was executing ParDo1 and ParDo2 together, the output bundle from ParDo1 must also be thrown away, and all elements in the input bundle must be retried. These two ParDos are co-failing.

Figure 7: Processing of an element within bundle D fails, so all elements in the input bundle are retried.

Note that the retry does not necessarily have the same processing time as the original attempt, as shown in the diagram.

All DoFns that experience coupled failures are terminated and must be torn down, since they aren’t following the normal DoFn lifecycle.

Executing transforms this way allows a runner to avoid persisting elements between transforms, saving on persistence costs.
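
The cost of that saving can be seen in a small simulation of co-failing steps. It is a toy model, not Beam API: par_do_1, par_do_2, run_fused, and run_fused_with_retry are hypothetical names, the max_attempts limit is an assumption, and the failure in ParDo2 is injected artificially.

```python
par_do_1_calls = []
par_do_2_failures = []


def par_do_1(x):
    par_do_1_calls.append(x)
    return x + 1


def par_do_2(x):
    # Fail exactly once, on the value 9 (the ParDo1 output for input 8).
    if x == 9 and not par_do_2_failures:
        par_do_2_failures.append(x)
        raise RuntimeError("transient failure in ParDo2")
    return x * 2


def run_fused(bundle):
    bundle_d = [par_do_1(element) for element in bundle]  # in memory, not persisted
    return [par_do_2(element) for element in bundle_d]


def run_fused_with_retry(bundle, max_attempts=3):
    for _ in range(max_attempts):
        try:
            return run_fused(bundle)
        except RuntimeError:
            # The intermediate bundle is lost with the failed attempt,
            # so the retry has to start again from ParDo1.
            continue
    raise RuntimeError("bundle failed after %d attempts" % max_attempts)


bundle_b = [6, 7, 8, 9]
result = run_fused_with_retry(bundle_b)
```

ParDo1 runs twice on every element of bundle B although it never failed, which is the price of not persisting its output between the two steps.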