Execution model

The Beam model allows runners to execute your pipeline in different ways. You may observe various effects as a result of the runner’s choices. This page describes these effects so you can better understand how Beam pipelines execute.

Processing of elements

The serialization and communication of elements between machines is one of the most expensive operations in a distributed execution of your pipeline. Avoiding this serialization may require re-processing elements after failures or may limit the distribution of output to other machines.

Serialization and communication

The runner might serialize elements between machines for communication purposes and for other reasons such as persistence.

A runner may decide to transfer elements between transforms in a variety of ways, such as:

Some situations where the runner may serialize and persist elements are:

  1. When used as part of a stateful DoFn, the runner may persist values to some state mechanism.
  2. When committing the results of processing, the runner may persist the outputs as a checkpoint.

Bundling and persistence

Beam pipelines often focus on “embarrassingly parallel” problems. Because of this, the APIs emphasize processing elements in parallel, which makes it difficult to express actions like “assign a sequence number to each element in a PCollection”. This is intentional, as such algorithms are much more likely to suffer from scalability problems.

Processing all elements in parallel also has some drawbacks. Specifically, it makes it impossible to batch any operations, such as writing elements to a sink or checkpointing progress during processing.

Instead of processing all elements simultaneously, the elements in a PCollection are processed in bundles. The division of the collection into bundles is arbitrary and selected by the runner. This allows the runner to choose an appropriate middle-ground between persisting results after every element, and having to retry everything if there is a failure. For example, a streaming runner may prefer to process and commit small bundles, and a batch runner may prefer to process larger bundles.
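The trade-off between bundle size and retry cost can be illustrated with a small toy model. This is a sketch in plain Python, not Beam SDK code: `split_into_bundles` and `commits_and_worst_case_retry` are hypothetical helper names, and the fixed bundle size is an assumption made for simplicity, since a real runner picks bundle boundaries itself.

```python
from typing import Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


def split_into_bundles(elements: Iterable[T], bundle_size: int) -> Iterator[List[T]]:
    """Yield consecutive bundles holding at most bundle_size elements.

    Assumption: a fixed size. A real runner chooses bundle boundaries itself.
    """
    if bundle_size < 1:
        raise ValueError("bundle_size must be at least 1")
    bundle: List[T] = []
    for element in elements:
        bundle.append(element)
        if len(bundle) == bundle_size:
            yield bundle
            bundle = []
    if bundle:
        yield bundle  # final, possibly smaller, bundle


def commits_and_worst_case_retry(num_elements: int, bundle_size: int) -> Tuple[int, int]:
    """Return (number of commits, most elements redone after a single failure)."""
    bundles = list(split_into_bundles(range(num_elements), bundle_size))
    largest = max((len(b) for b in bundles), default=0)
    return len(bundles), largest


if __name__ == "__main__":
    for size in (1, 3, 9):
        print(size, commits_and_worst_case_retry(9, size))
```

With nine elements, a bundle size of 1 gives nine commits and at most one element to redo after a failure, while a bundle size of 9 gives a single commit and up to nine elements to redo.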

Data partitioning and inter-stage execution

Partitioning and parallelization of element processing within a Beam pipeline depend on two things: how the data source splits its work, and how key-based operations redistribute data between stages.

Beam pipelines read data from a source (e.g. KafkaIO, BigQueryIO, JdbcIO, or your own source implementation). To implement a Source in Beam one must implement it as a Splittable DoFn. A Splittable DoFn provides the runner with interfaces to facilitate the splitting of work.

When running key-based operations in Beam (e.g. GroupByKey, Combine, Reshuffle.perKey, and stateful DoFns), Beam runners perform serialization and transfer of data known as shuffle [1]. Shuffle allows data elements of the same key to be processed together.

The way in which runners shuffle data may be slightly different for Batch and Streaming execution modes.
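As a rough illustration of why a shuffle brings elements of the same key together, the following plain-Python sketch routes key-value pairs to workers by hashing the key. It does not use the Beam SDK, and it is not how any particular runner implements shuffle: `partition_for` and `shuffle_by_key` are hypothetical names, and the CRC32-modulo routing is an assumption chosen only because it is deterministic.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def partition_for(key: str, num_workers: int) -> int:
    """Map a key to a worker index (hypothetical routing scheme)."""
    return zlib.crc32(key.encode("utf-8")) % num_workers


def shuffle_by_key(
    pairs: Iterable[Tuple[str, int]], num_workers: int
) -> List[Dict[str, List[int]]]:
    """Send every (key, value) pair to one worker and group values per key there."""
    workers: List[Dict[str, List[int]]] = [defaultdict(list) for _ in range(num_workers)]
    for key, value in pairs:
        # All pairs with the same key hash to the same worker index.
        workers[partition_for(key, num_workers)][key].append(value)
    return [dict(worker) for worker in workers]


if __name__ == "__main__":
    pairs = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
    for index, grouped in enumerate(shuffle_by_key(pairs, num_workers=3)):
        print(index, grouped)
```

Because the routing depends only on the key, each key ends up on exactly one worker, which is what lets a downstream grouping or stateful step see all values for that key.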

[1] Not to be confused with the shuffle operation in some runners.

Data ordering in a pipeline execution

The Beam model does not define strict guidelines regarding the order in which runners process elements or transport them across PTransforms. Runners are free to implement data transfer semantics in different forms.

Some use cases exist where user pipelines may need to rely on specific ordering semantics in pipeline execution. The capability matrix documents runner behavior for key-ordered delivery.

Consider a single Beam worker processing a series of bundles from the same Beam transform, and consider a PTransform that outputs data from this stage into a downstream PCollection. Finally, consider two events with the same key emitted in a certain order by this worker (within the same bundle or as part of different bundles).

We say that the Beam runner supports key-ordered delivery if it guarantees that an immediately downstream PTransform observes these two events in the same order, regardless of the data transmission method.

This characteristic will hold true in runners and operations that have key-limited parallelism.
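The link between key-limited parallelism and key-ordered delivery can be sketched with a toy transport model in plain Python (no Beam SDK involved). The names `deliver` and `is_key_ordered` are hypothetical, and draining the channels from last to first is an assumption that stands in for an arbitrary arrival order downstream.

```python
import zlib
from collections import deque
from typing import Deque, Dict, List, Sequence, Tuple

Event = Tuple[str, int]  # (key, sequence number in emission order)


def deliver(events: Sequence[Event], num_channels: int, route_by_key: bool) -> List[Event]:
    """Send events over FIFO channels and return them in downstream arrival order."""
    channels: List[Deque[Event]] = [deque() for _ in range(num_channels)]
    for position, event in enumerate(events):
        if route_by_key:
            # Key-limited parallelism: one key always uses one channel.
            index = zlib.crc32(event[0].encode("utf-8")) % num_channels
        else:
            # Round-robin: events of one key are spread over all channels.
            index = position % num_channels
        channels[index].append(event)
    delivered: List[Event] = []
    for channel in reversed(channels):  # assumed arrival order across channels
        while channel:
            delivered.append(channel.popleft())
    return delivered


def is_key_ordered(delivered: Sequence[Event]) -> bool:
    """Check that, per key, sequence numbers never go backwards."""
    last_seen: Dict[str, int] = {}
    for key, sequence in delivered:
        if key in last_seen and sequence < last_seen[key]:
            return False
        last_seen[key] = sequence
    return True


if __name__ == "__main__":
    events = [("a", 0), ("b", 0), ("a", 1), ("b", 1), ("a", 2), ("a", 3)]
    print(is_key_ordered(deliver(events, 2, route_by_key=True)))
    print(is_key_ordered(deliver(events, 2, route_by_key=False)))
```

In this model, routing by key preserves per-key order no matter how the channels interleave, because each key travels through a single FIFO channel. Round-robin routing gives no such guarantee.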

Failures and parallelism within and between transforms

In this section, we discuss how elements in the input collection are processed in parallel, and how transforms are retried when failures occur.

Data-parallelism within one transform

When executing a single ParDo, a runner might divide an example input collection of nine elements into two bundles as shown in figure 1.

Bundle A contains five elements. Bundle B contains four elements.

Figure 1: A runner divides an input collection into two bundles.

When the ParDo executes, workers may process the two bundles in parallel as shown in figure 2.

Two workers process the two bundles in parallel. Worker one processes bundle A. Worker two processes bundle B.

Figure 2: Two workers process the two bundles in parallel.

Since elements cannot be split, the maximum parallelism for a transform depends on the number of elements in the collection. In figure 3, the input collection has nine elements, so the maximum parallelism is nine.

Figure 3: Nine workers process a nine element input collection in parallel.

Note: Splittable ParDo allows splitting the processing of a single input across multiple bundles. This feature is a work in progress.
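The bundle-level parallelism shown in figures 1 to 3 can be mimicked with a thread pool. This is a minimal sketch in plain Python rather than Beam SDK code: `process_bundle`, `run_par_do`, and `max_parallelism` are hypothetical names, threads stand in for workers, and elements are assumed to be indivisible.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def process_bundle(bundle: Sequence[int], fn: Callable[[int], int]) -> List[int]:
    """Apply fn to every element of one bundle, in order."""
    return [fn(element) for element in bundle]


def run_par_do(
    bundles: Sequence[Sequence[int]], fn: Callable[[int], int], num_workers: int
) -> List[List[int]]:
    """Process bundles concurrently; each thread stands in for one worker."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # pool.map returns results in the order of the input bundles.
        return list(pool.map(lambda bundle: process_bundle(bundle, fn), bundles))


def max_parallelism(num_elements: int, available_workers: int) -> int:
    """With indivisible elements, extra workers beyond the element count sit idle."""
    return min(num_elements, available_workers)


if __name__ == "__main__":
    bundle_a = [1, 2, 3, 4, 5]
    bundle_b = [6, 7, 8, 9]
    print(run_par_do([bundle_a, bundle_b], lambda x: x * 10, num_workers=2))
    print(max_parallelism(num_elements=9, available_workers=20))
```

In this model, twenty available workers still yield a parallelism of nine for a nine-element collection, matching figure 3.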

Dependent-parallelism between transforms

ParDo transforms that are in sequence may be dependently parallel if the runner chooses to execute the consuming transform on the producing transform’s output elements without altering the bundling. In figure 4, ParDo1 and ParDo2 are dependently parallel if the output of ParDo1 for a given element must be processed on the same worker.

ParDo1 processes an input collection that contains bundles A and B. ParDo2 then processes the output collection from ParDo1, which contains bundles C and D.

Figure 4: Two transforms in sequence and their corresponding input collections.

Figure 5 shows how these dependently parallel transforms might execute. The first worker executes ParDo1 on the elements in bundle A (which results in bundle C), and then executes ParDo2 on the elements in bundle C. Similarly, the second worker executes ParDo1 on the elements in bundle B (which results in bundle D), and then executes ParDo2 on the elements in bundle D.

Worker one executes ParDo1 on bundle A and ParDo2 on bundle C. Worker two executes ParDo1 on bundle B and ParDo2 on bundle D.

Figure 5: Two workers execute dependently parallel ParDo transforms.

Executing transforms this way allows a runner to avoid redistributing elements between workers, which saves on communication costs. However, the maximum parallelism now depends on the maximum parallelism of the first of the dependently parallel steps.
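The effect on parallelism can be seen in a toy model of two dependently parallel steps. The sketch below is plain Python, not Beam SDK code: `run_fused_stage` is a hypothetical name, each loop iteration stands in for one worker, and both steps are modeled as functions that return a list of outputs per input element.

```python
from typing import Callable, List, Sequence


def run_fused_stage(
    input_bundles: Sequence[Sequence[int]],
    par_do_1: Callable[[int], List[int]],
    par_do_2: Callable[[int], List[int]],
) -> List[List[int]]:
    """Run both steps back to back on each input bundle, without redistribution."""
    output_bundles: List[List[int]] = []
    for bundle in input_bundles:  # one iteration stands in for one worker
        # Output of the first step stays on the same worker (bundle C or D).
        intermediate = [out for element in bundle for out in par_do_1(element)]
        output_bundles.append([out for element in intermediate for out in par_do_2(element)])
    return output_bundles


if __name__ == "__main__":
    # The first step fans each element out into three copies.
    results = run_fused_stage(
        input_bundles=[[1], [2]],
        par_do_1=lambda x: [x] * 3,
        par_do_2=lambda x: [x * 10],
    )
    print(len(results), results)
```

Even though the first step produces six intermediate elements here, the second step still runs in only two parallel units, because the bundling of the first step is kept.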

Failures within one transform

If processing of an element within a bundle fails, the entire bundle fails. The elements in the bundle must be retried (otherwise the entire pipeline fails), although they do not need to be retried with the same bundling.

For this example, we will use the ParDo from figure 1 that has an input collection with nine elements and is divided into two bundles.

In figure 6, the first worker successfully processes all five elements in bundle A. The second worker processes the four elements in bundle B: the first two elements were successfully processed, the third element’s processing failed, and there is one element still awaiting processing.

We see that the runner retries all elements in bundle B and the processing completes successfully the second time. Note that the retry does not necessarily happen on the same worker as the original processing attempt, as shown in the figure.

Worker two fails to process an element in bundle B. Worker one finishes processing bundle A and then successfully retries bundle B.

Figure 6: The processing of an element within bundle B fails, and another worker retries the entire bundle.

Because we encountered a failure while processing an element in the input bundle, we had to reprocess all of the elements in the input bundle. This means the runner must throw away the entire output of the bundle (including any state mutations and set timers) since all of the results it contains will be recomputed.

Note that if the failed transform is a ParDo, then the DoFn instance is torn down and abandoned.
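The retry behavior described in this section can be sketched as a small loop in plain Python. It is a toy model under stated assumptions, not Beam SDK code: `process_bundle_with_retry` and `make_do_fn` are hypothetical names, the limit of three attempts is arbitrary, and the retry reuses the same bundling even though a runner is free to rebundle.

```python
from typing import Callable, List, Sequence, Tuple


def process_bundle_with_retry(
    bundle: Sequence[int],
    make_do_fn: Callable[[], Callable[[int], int]],
    max_attempts: int = 3,  # arbitrary limit chosen for this sketch
) -> Tuple[List[int], int]:
    """Return (committed output, attempts used), retrying the whole bundle on failure."""
    for attempt in range(1, max_attempts + 1):
        do_fn = make_do_fn()  # fresh instance; a failed one is abandoned
        uncommitted: List[int] = []
        try:
            for element in bundle:
                uncommitted.append(do_fn(element))
        except Exception:
            # Partial output is dropped and every element is retried.
            continue
        return uncommitted, attempt  # commit only after the whole bundle succeeds
    raise RuntimeError(f"bundle failed after {max_attempts} attempts")


if __name__ == "__main__":
    state = {"failed_once": False}

    def make_do_fn() -> Callable[[int], int]:
        def do_fn(element: int) -> int:
            if element == 8 and not state["failed_once"]:
                state["failed_once"] = True
                raise ValueError("transient failure")
            return element * 2

        return do_fn

    print(process_bundle_with_retry([6, 7, 8, 9], make_do_fn))
```

In the usage example, the third element fails once, so the first two elements are processed twice and the bundle commits on the second attempt.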

Coupled failure: Failures between transforms

If a failure to process an element in ParDo2 causes ParDo1 to re-execute, these two steps are said to be co-failing.

For this example, we will use the two ParDos from figure 4.

In figure 7, worker two successfully executes ParDo1 on all elements in bundle B. However, the worker fails to process an element in bundle D, so ParDo2 fails (shown as the red X). As a result, the runner must discard and recompute the output of ParDo2. Because the runner was executing ParDo1 and ParDo2 together, the output bundle from ParDo1 must also be thrown away, and all elements in the input bundle must be retried. These two ParDos are co-failing.

Worker two fails to process an element in bundle D, so all elements in both bundle B and bundle D must be retried.

Figure 7: Processing of an element within bundle D fails, so all elements in the input bundle are retried.

Note that the retry does not necessarily have the same processing time as the original attempt, as shown in the diagram.

All DoFns that experience coupled failures are terminated and must be torn down since they aren’t following the normal DoFn lifecycle.

Executing transforms this way allows a runner to avoid persisting elements between transforms, saving on persistence costs.
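The cost trade-off behind co-failing steps can be made concrete with a toy model in plain Python (no Beam SDK involved). The name `run_stage` and its `persist_between` flag are hypothetical, the retry limit is arbitrary, and holding the intermediate bundle in a local variable stands in for a real checkpoint, which would involve serialization and storage.

```python
from typing import Callable, List, Optional, Sequence


def run_stage(
    input_bundle: Sequence[int],
    par_do_1: Callable[[int], int],
    par_do_2: Callable[[int], int],
    persist_between: bool,
    max_attempts: int = 3,  # arbitrary limit chosen for this sketch
) -> List[int]:
    """Run two steps on one bundle, retrying after a failure in either step."""
    persisted: Optional[List[int]] = None
    for _ in range(max_attempts):
        try:
            if persist_between and persisted is not None:
                intermediate = persisted  # reuse the checkpointed output of step 1
            else:
                intermediate = [par_do_1(element) for element in input_bundle]
                if persist_between:
                    persisted = intermediate  # stands in for a real checkpoint
            return [par_do_2(element) for element in intermediate]
        except Exception:
            # Without a checkpoint, the next attempt re-executes step 1 as well.
            continue
    raise RuntimeError(f"stage failed after {max_attempts} attempts")


if __name__ == "__main__":
    calls = {"par_do_1": 0, "failed_once": False}

    def par_do_1(element: int) -> int:
        calls["par_do_1"] += 1
        return element + 1

    def par_do_2(element: int) -> int:
        if element == 3 and not calls["failed_once"]:
            calls["failed_once"] = True
            raise ValueError("transient failure")
        return element * 10

    print(run_stage([1, 2, 3, 4], par_do_1, par_do_2, persist_between=False))
    print(calls["par_do_1"])
```

With `persist_between=False`, a single failure in the second step makes the first step run twice over the four-element bundle, for eight calls in total. With `persist_between=True`, the first step runs only once, at the price of persisting its output.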