Apache Beam Programming Guide

The Beam Programming Guide is intended for Beam users who want to use the Beam SDKs to create data processing pipelines. It provides guidance for using the Beam SDK classes to build and test your pipeline. It is not intended as an exhaustive reference, but as a language-agnostic, high-level guide to programmatically building your Beam pipeline. As the programming guide is filled out, the text will include code samples in multiple languages to help illustrate how to implement Beam concepts in your pipelines.

The Python SDK supports Python 2.7, 3.5, 3.6, and 3.7. New Python SDK releases will stop supporting Python 2.7 in 2020 (BEAM-8371). For best results, use Beam with Python 3.

1. Overview

To use Beam, you need to first create a driver program using the classes in one of the Beam SDKs. Your driver program defines your pipeline, including all of the inputs, transforms, and outputs; it also sets execution options for your pipeline (typically passed in using command-line options). These include the Pipeline Runner, which, in turn, determines what back-end your pipeline will run on.

The Beam SDKs provide a number of abstractions that simplify the mechanics of large-scale distributed data processing. The same Beam abstractions work with both batch and streaming data sources. When you create your Beam pipeline, you can think about your data processing task in terms of these abstractions. They include:

A typical Beam driver program works as follows:

When you run your Beam driver program, the Pipeline Runner that you designate constructs a workflow graph of your pipeline based on the PCollection objects you’ve created and transforms that you’ve applied. That graph is then executed using the appropriate distributed processing back-end, becoming an asynchronous “job” (or equivalent) on that back-end.

2. Creating a pipeline

The Pipeline abstraction encapsulates all the data and steps in your data processing task. Your Beam driver program typically starts by constructing a Pipeline Pipeline object, and then using that object as the basis for creating the pipeline’s data sets as PCollections and its operations as Transforms.

To use Beam, your driver program must first create an instance of the Beam SDK class Pipeline (typically in the main() function). When you create your Pipeline, you’ll also need to set some configuration options. You can set your pipeline’s configuration options programmatically, but it’s often easier to set the options ahead of time (or read them from the command line) and pass them to the Pipeline object when you create the object.

// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline p = Pipeline.create(options);
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
  pass  # build your pipeline here
// In order to start creating the pipeline for execution, a Pipeline object and a Scope object are needed.
p, s := beam.NewPipelineWithRoot()

2.1. Configuring pipeline options

Use the pipeline options to configure different aspects of your pipeline, such as the pipeline runner that will execute your pipeline and any runner-specific configuration required by the chosen runner. Your pipeline options will potentially include information such as your project ID or a location for storing files.

When you run the pipeline on a runner of your choice, a copy of the PipelineOptions will be available to your code. For example, if you add a PipelineOptions parameter to a DoFn’s @ProcessElement method, it will be populated by the system.

2.1.1. Setting PipelineOptions from command-line arguments

While you can configure your pipeline by creating a PipelineOptions object and setting the fields directly, the Beam SDKs include a command-line parser that you can use to set fields in PipelineOptions using command-line arguments.

To read options from the command-line, construct your PipelineOptions object as demonstrated in the following example code:

PipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().create();
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
  pass  # build your pipeline here
// If beamx or Go flags are used, flags must be parsed first.
flag.Parse()

This interprets command-line arguments that follow the format:

--<option>=<value>

Note: Appending the method .withValidation will check for required command-line arguments and validate argument values.

Building your PipelineOptions this way lets you specify any of the options as a command-line argument.

Note: The WordCount example pipeline demonstrates how to set pipeline options at runtime by using command-line options.

2.1.2. Creating custom options

You can add your own custom options in addition to the standard PipelineOptions. To add your own options, define an interface with getter and setter methods for each option, as in the following example for adding input and output custom options:

public interface MyOptions extends PipelineOptions {
    String getInput();
    void setInput(String input);

    String getOutput();
    void setOutput(String output);
}
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')
var (
  input = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

You can also specify a description, which appears when a user passes --help as a command-line argument, and a default value.

You set the description and default value using annotations, as follows:

public interface MyOptions extends PipelineOptions {
    @Description("Input for the pipeline")
    @Default.String("gs://my-bucket/input")
    String getInput();
    void setInput(String input);

    @Description("Output for the pipeline")
    @Default.String("gs://my-bucket/output")
    String getOutput();
    void setOutput(String output);
}
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        help='Input for the pipeline',
        default='gs://my-bucket/input')
    parser.add_argument(
        '--output',
        help='Output for the pipeline',
        default='gs://my-bucket/output')
var (
  input = flag.String("input", "gs://my-bucket/input", "Input for the pipeline")
  output = flag.String("output", "gs://my-bucket/output", "Output for the pipeline")
)

It’s recommended that you register your interface with PipelineOptionsFactory and then pass the interface when creating the PipelineOptions object. When you register your interface with PipelineOptionsFactory, the --help can find your custom options interface and add it to the output of the --help command. PipelineOptionsFactory will also validate that your custom options are compatible with all other registered options.

The following example code shows how to register your custom options interface with PipelineOptionsFactory:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                                .withValidation()
                                                .as(MyOptions.class);

Now your pipeline can accept --input=value and --output=value as command-line arguments.

3. PCollections

The PCollection PCollection abstraction represents a potentially distributed, multi-element data set. You can think of a PCollection as “pipeline” data; Beam transforms use PCollection objects as inputs and outputs. As such, if you want to work with data in your pipeline, it must be in the form of a PCollection.

After you’ve created your Pipeline, you’ll need to begin by creating at least one PCollection in some form. The PCollection you create serves as the input for the first operation in your pipeline.

3.1. Creating a PCollection

You create a PCollection by either reading data from an external source using Beam’s Source API, or you can create a PCollection of data stored in an in-memory collection class in your driver program. The former is typically how a production pipeline would ingest data; Beam’s Source APIs contain adapters to help you read from external sources like large cloud-based files, databases, or subscription services. The latter is primarily useful for testing and debugging purposes.

3.1.1. Reading from an external source

To read from an external source, you use one of the Beam-provided I/O adapters. The adapters vary in their exact usage, but all of them read from some external data source and return a PCollection whose elements represent the data records in that source.

Each data source adapter has a Read transform; to read, you must apply that transform to the Pipeline object itself. TextIO.Read io.TextFileSource, for example, reads from an external text file and returns a PCollection whose elements are of type String, each String represents one line from the text file. Here’s how you would apply TextIO.Read io.TextFileSource to your Pipeline to create a PCollection:

public static void main(String[] args) {
    // Create the pipeline.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Create the PCollection 'lines' by applying a 'Read' transform.
    PCollection<String> lines = p.apply(
      "ReadMyFile", TextIO.read().from("gs://some/inputData.txt"));
}
lines = p | 'ReadMyFile' >> beam.io.ReadFromText(
    'gs://some/inputData.txt')
lines := textio.Read(s, "gs://some/inputData.txt")

See the section on I/O to learn more about how to read from the various data sources supported by the Beam SDK.

3.1.2. Creating a PCollection from in-memory data

To create a PCollection from an in-memory Java Collection, you use the Beam-provided Create transform. Much like a data adapter’s Read, you apply Create directly to your Pipeline object itself.

As parameters, Create accepts the Java Collection and a Coder object. The Coder specifies how the elements in the Collection should be encoded.

To create a PCollection from an in-memory list, you use the Beam-provided Create transform. Apply this transform directly to your Pipeline object itself.

The following example code shows how to create a PCollection from an in-memory Listlist:

public static void main(String[] args) {
    // Create a Java Collection, in this case a List of Strings.
    final List<String> LINES = Arrays.asList(
      "To be, or not to be: that is the question: ",
      "Whether 'tis nobler in the mind to suffer ",
      "The slings and arrows of outrageous fortune, ",
      "Or to take arms against a sea of troubles, ");

    // Create the pipeline.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Apply Create, passing the list and the coder, to create the PCollection.
    p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# argv = None  # if None, uses sys.argv
pipeline_options = PipelineOptions(argv)
with beam.Pipeline(options=pipeline_options) as pipeline:
  lines = (
      pipeline
      | beam.Create([
          'To be, or not to be: that is the question: ',
          "Whether 'tis nobler in the mind to suffer ",
          'The slings and arrows of outrageous fortune, ',
          'Or to take arms against a sea of troubles, ',
      ]))

3.2. PCollection characteristics

A PCollection is owned by the specific Pipeline object for which it is created; multiple pipelines cannot share a PCollection. In some respects, a PCollection functions like a Collection class. However, a PCollection can differ in a few key ways:

3.2.1. Element type

The elements of a PCollection may be of any type, but must all be of the same type. However, to support distributed processing, Beam needs to be able to encode each individual element as a byte string (so elements can be passed around to distributed workers). The Beam SDKs provide a data encoding mechanism that includes built-in encoding for commonly-used types as well as support for specifying custom encodings as needed.

3.2.2. Element schema

In many cases, the element type in a PCollection has a structure that can introspected. Examples are JSON, Protocol Buffer, Avro, and database records. Schemas provide a way to express types as a set of named fields, allowing for more-expressive aggregations.

3.2.3. Immutability

A PCollection is immutable. Once created, you cannot add, remove, or change individual elements. A Beam Transform might process each element of a PCollection and generate new pipeline data (as a new PCollection), but it does not consume or modify the original input collection.

3.2.4. Random access

A PCollection does not support random access to individual elements. Instead, Beam Transforms consider every element in a PCollection individually.

3.2.5. Size and boundedness

A PCollection is a large, immutable “bag” of elements. There is no upper limit on how many elements a PCollection can contain; any given PCollection might fit in memory on a single machine, or it might represent a very large distributed data set backed by a persistent data store.

A PCollection can be either bounded or unbounded in size. A bounded PCollection represents a data set of a known, fixed size, while an unbounded PCollection represents a data set of unlimited size. Whether a PCollection is bounded or unbounded depends on the source of the data set that it represents. Reading from a batch data source, such as a file or a database, creates a bounded PCollection. Reading from a streaming or continuously-updating data source, such as Pub/Sub or Kafka, creates an unbounded PCollection (unless you explicitly tell it not to).

The bounded (or unbounded) nature of your PCollection affects how Beam processes your data. A bounded PCollection can be processed using a batch job, which might read the entire data set once, and perform processing in a job of finite length. An unbounded PCollection must be processed using a streaming job that runs continuously, as the entire collection can never be available for processing at any one time.

Beam uses windowing to divide a continuously updating unbounded PCollection into logical windows of finite size. These logical windows are determined by some characteristic associated with a data element, such as a timestamp. Aggregation transforms (such as GroupByKey and Combine) work on a per-window basis — as the data set is generated, they process each PCollection as a succession of these finite windows.

3.2.6. Element timestamps

Each element in a PCollection has an associated intrinsic timestamp. The timestamp for each element is initially assigned by the Source that creates the PCollection. Sources that create an unbounded PCollection often assign each new element a timestamp that corresponds to when the element was read or added.

Note: Sources that create a bounded PCollection for a fixed data set also automatically assign timestamps, but the most common behavior is to assign every element the same timestamp (Long.MIN_VALUE).

Timestamps are useful for a PCollection that contains elements with an inherent notion of time. If your pipeline is reading a stream of events, like Tweets or other social media messages, each element might use the time the event was posted as the element timestamp.

You can manually assign timestamps to the elements of a PCollection if the source doesn’t do it for you. You’ll want to do this if the elements have an inherent timestamp, but the timestamp is somewhere in the structure of the element itself (such as a “time” field in a server log entry). Beam has Transforms that take a PCollection as input and output an identical PCollection with timestamps attached; see Adding Timestamps for more information about how to do so.

4. Transforms

Transforms are the operations in your pipeline, and provide a generic processing framework. You provide processing logic in the form of a function object (colloquially referred to as “user code”), and your user code is applied to each element of an input PCollection (or more than one PCollection). Depending on the pipeline runner and back-end that you choose, many different workers across a cluster may execute instances of your user code in parallel. The user code running on each worker generates the output elements that are ultimately added to the final output PCollection that the transform produces.

The Beam SDKs contain a number of different transforms that you can apply to your pipeline’s PCollections. These include general-purpose core transforms, such as ParDo or Combine. There are also pre-written composite transforms included in the SDKs, which combine one or more of the core transforms in a useful processing pattern, such as counting or combining elements in a collection. You can also define your own more complex composite transforms to fit your pipeline’s exact use case.

4.1. Applying transforms

To invoke a transform, you must apply it to the input PCollection. Each transform in the Beam SDKs has a generic apply method (or pipe operator |). Invoking multiple Beam transforms is similar to method chaining, but with one slight difference: You apply the transform to the input PCollection, passing the transform itself as an argument, and the operation returns the output PCollection. This takes the general form:

[Output PCollection] = [Input PCollection].apply([Transform])
[Output PCollection] = [Input PCollection] | [Transform]

Because Beam uses a generic apply method for PCollection, you can both chain transforms sequentially and also apply transforms that contain other transforms nested within (called composite transforms in the Beam SDKs).

How you apply your pipeline’s transforms determines the structure of your pipeline. The best way to think of your pipeline is as a directed acyclic graph, where PTransform nodes are subroutines that accept PCollection nodes as inputs and emit PCollection nodes as outputs. For example, you can chain together transforms to create a pipeline that successively modifies input data:

[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
              | [Second Transform]
              | [Third Transform])

The graph of this pipeline looks like the following:

This linear pipeline starts with one input collection, sequentially appliesthree transforms, and ends with one output collection.

Figure 1: A linear pipeline with three sequential transforms.

However, note that a transform does not consume or otherwise alter the input collection — remember that a PCollection is immutable by definition. This means that you can apply multiple transforms to the same input PCollection to create a branching pipeline, like so:

[PCollection of database table rows] = [Database Table Reader].apply([Read Transform])
[PCollection of 'A' names] = [PCollection of database table rows].apply([Transform A])
[PCollection of 'B' names] = [PCollection of database table rows].apply([Transform B])
[PCollection of database table rows] = [Database Table Reader] | [Read Transform]
[PCollection of 'A' names] = [PCollection of database table rows] | [Transform A]
[PCollection of 'B' names] = [PCollection of database table rows] | [Transform B]

The graph of this branching pipeline looks like the following:

This pipeline applies two transforms to a single input collection. Eachtransform produces an output collection.

Figure 2: A branching pipeline. Two transforms are applied to a single PCollection of database table rows.

You can also build your own composite transforms that nest multiple transforms inside a single, larger transform. Composite transforms are particularly useful for building a reusable sequence of simple steps that get used in a lot of different places.

4.2. Core Beam transforms

Beam provides the following core transforms, each of which represents a different processing paradigm:

4.2.1. ParDo

ParDo is a Beam transform for generic parallel processing. The ParDo processing paradigm is similar to the “Map” phase of a Map/Shuffle/Reduce-style algorithm: a ParDo transform considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero, one, or multiple elements to an output PCollection.

ParDo is useful for a variety of common data processing operations, including:

In such roles, ParDo is a common intermediate step in a pipeline. You might use it to extract certain fields from a set of raw input records, or convert raw input into a different format; you might also use ParDo to convert processed data into a format suitable for output, like database table rows or printable strings.

When you apply a ParDo transform, you’ll need to provide user code in the form of a DoFn object. DoFn is a Beam SDK class that defines a distributed processing function.

When you create a subclass of DoFn, note that your subclass should adhere to the Requirements for writing user code for Beam transforms.

4.2.1.1. Applying ParDo

Like all Beam transforms, you apply ParDo by calling the apply method on the input PCollection and passing ParDo as an argument, as shown in the following example code:

// The input PCollection of Strings.
PCollection<String> words = ...;

// The DoFn to perform on each element in the input PCollection.
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

// Apply a ParDo to the PCollection "words" to compute lengths for each word.
PCollection<Integer> wordLengths = words.apply(
    ParDo
    .of(new ComputeWordLengthFn()));        // The DoFn to perform on each element, which
                                            // we define above.
# The input PCollection of Strings.
words = ...

# The DoFn to perform on each element in the input PCollection.

class ComputeWordLengthFn(beam.DoFn):
  def process(self, element):
    return [len(element)]



# Apply a ParDo to the PCollection "words" to compute lengths for each word.
word_lengths = words | beam.ParDo(ComputeWordLengthFn())
// words is the input PCollection of strings
var words beam.PCollection = ...

func computeWordLengthFn(word string) int {
      return len(word)
}

wordLengths := beam.ParDo(s, computeWordLengthFn, words)

In the example, our input PCollection contains String values. We apply a ParDo transform that specifies a function (ComputeWordLengthFn) to compute the length of each string, and outputs the result to a new PCollection of Integer values that stores the length of each word.

4.2.1.2. Creating a DoFn

The DoFn object that you pass to ParDo contains the processing logic that gets applied to the elements in the input collection. When you use Beam, often the most important pieces of code you’ll write are these DoFns - they’re what define your pipeline’s exact data processing tasks.

Note: When you create your DoFn, be mindful of the Requirements for writing user code for Beam transforms and ensure that your code follows them.

A DoFn processes one element at a time from the input PCollection. When you create a subclass of DoFn, you’ll need to provide type parameters that match the types of the input and output elements. If your DoFn processes incoming String elements and produces Integer elements for the output collection (like our previous example, ComputeWordLengthFn), your class declaration would look like this:

static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

Inside your DoFn subclass, you’ll write a method annotated with @ProcessElement where you provide the actual processing logic. You don’t need to manually extract the elements from the input collection; the Beam SDKs handle that for you. Your @ProcessElement method should accept a parameter tagged with @Element, which will be populated with the input element. In order to output elements, the method can also take a parameter of type OutputReceiver which provides a method for emitting elements. The parameter types must match the input and output types of your DoFn or the framework will raise an error. Note: @Element and OutputReceiver were introduced in Beam 2.5.0; if using an earlier release of Beam, a ProcessContext parameter should be used instead.

Inside your DoFn subclass, you’ll write a method process where you provide the actual processing logic. You don’t need to manually extract the elements from the input collection; the Beam SDKs handle that for you. Your process method should accept an argument element, which is the input element, and return an iterable with its output values. You can accomplish this by emitting individual elements with yield statements. You can also use a return statement with an iterable, like a list or a generator.

static class ComputeWordLengthFn extends DoFn<String, Integer> {
  @ProcessElement
  public void processElement(@Element String word, OutputReceiver<Integer> out) {
    // Use OutputReceiver.output to emit the output element.
    out.output(word.length());
  }
}
class ComputeWordLengthFn(beam.DoFn):
  def process(self, element):
    return [len(element)]

Note: If the elements in your input PCollection are key/value pairs, you can access the key or value by using element.getKey() or element.getValue(), respectively.

A given DoFn instance generally gets invoked one or more times to process some arbitrary bundle of elements. However, Beam doesn’t guarantee an exact number of invocations; it may be invoked multiple times on a given worker node to account for failures and retries. As such, you can cache information across multiple calls to your processing method, but if you do so, make sure the implementation does not depend on the number of invocations.

In your processing method, you’ll also need to meet some immutability requirements to ensure that Beam and the processing back-end can safely serialize and cache the values in your pipeline. Your method should meet the following requirements:

4.2.1.3. Lightweight DoFns and other abstractions

If your function is relatively straightforward, you can simplify your use of ParDo by providing a lightweight DoFn in-line, as an anonymous inner class instance a lambda function.

Here’s the previous example, ParDo with ComputeLengthWordsFn, with the DoFn specified as an anonymous inner class instance a lambda function:

// The input PCollection.
PCollection<String> words = ...;

// Apply a ParDo with an anonymous DoFn to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
  "ComputeWordLengths",                     // the transform name
  ParDo.of(new DoFn<String, Integer>() {    // a DoFn as an anonymous inner class instance
      @ProcessElement
      public void processElement(@Element String word, OutputReceiver<Integer> out) {
        out.output(word.length());
      }
    }));
# The input PCollection of strings.
words = ...

# Apply a lambda function to the PCollection words.
# Save the result as the PCollection word_lengths.

word_lengths = words | beam.FlatMap(lambda word: [len(word)])
// words is the input PCollection of strings
var words beam.PCollection = ...

lengths := beam.ParDo(s, func (word string) int {
      return len(word)
}, words)

If your ParDo performs a one-to-one mapping of input elements to output elements–that is, for each input element, it applies a function that produces exactly one output element, you can use the higher-level MapElementsMap transform. MapElements can accept an anonymous Java 8 lambda function for additional brevity.

Here’s the previous example using MapElements Map:

// The input PCollection.
PCollection<String> words = ...;

// Apply a MapElements with an anonymous lambda function to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
  MapElements.into(TypeDescriptors.integers())
             .via((String word) -> word.length()));
# The input PCollection of string.
words = ...

# Apply a Map with a lambda function to the PCollection words.
# Save the result as the PCollection word_lengths.

word_lengths = words | beam.Map(len)

Note: You can use Java 8 lambda functions with several other Beam transforms, including Filter, FlatMapElements, and Partition.

4.2.1.4. DoFn lifecycle

Here is a sequence diagram that shows the lifecycle of the DoFn during the execution of the ParDo transform. The comments give useful information to pipeline developers such as the constraints that apply to the objects or particular cases such as failover or instance reuse. They also give instantiation use cases.

This is a sequence diagram that shows the lifecycle of the DoFn

4.2.2. GroupByKey

GroupByKey is a Beam transform for processing collections of key/value pairs. It’s a parallel reduction operation, analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. The input to GroupByKey is a collection of key/value pairs that represents a multimap, where the collection contains multiple pairs that have the same key, but different values. Given such a collection, you use GroupByKey to collect all of the values associated with each unique key.

GroupByKey is a good way to aggregate data that has something in common. For example, if you have a collection that stores records of customer orders, you might want to group together all the orders from the same postal code (wherein the “key” of the key/value pair is the postal code field, and the “value” is the remainder of the record).

Let’s examine the mechanics of GroupByKey with a simple example case, where our data set consists of words from a text file and the line number on which they appear. We want to group together all the line numbers (values) that share the same word (key), letting us see all the places in the text where a particular word appears.

Our input is a PCollection of key/value pairs where each word is a key, and the value is a line number in the file where the word appears. Here’s a list of the key/value pairs in the input collection:

cat, 1
dog, 5
and, 1
jump, 3
tree, 2
cat, 5
dog, 2
and, 2
cat, 9
and, 6
...

GroupByKey gathers up all the values with the same key and outputs a new pair consisting of the unique key and a collection of all of the values that were associated with that key in the input collection. If we apply GroupByKey to our input collection above, the output collection would look like this:

cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...

Thus, GroupByKey represents a transform from a multimap (multiple keys to individual values) to a uni-map (unique keys to collections of values).

4.2.2.1 GroupByKey and unbounded PCollections

If you are using unbounded PCollections, you must use either non-global windowing or an aggregation trigger in order to perform a GroupByKey or CoGroupByKey. This is because a bounded GroupByKey or CoGroupByKey must wait for all the data with a certain key to be collected, but with unbounded collections, the data is unlimited. Windowing and/or triggers allow grouping to operate on logical, finite bundles of data within the unbounded data streams.

If you do apply GroupByKey or CoGroupByKey to a group of unbounded PCollections without setting either a non-global windowing strategy, a trigger strategy, or both for each collection, Beam generates an IllegalStateException error at pipeline construction time.

When using GroupByKey or CoGroupByKey to group PCollections that have a windowing strategy applied, all of the PCollections you want to group must use the same windowing strategy and window sizing. For example, all of the collections you are merging must use (hypothetically) identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds.

If your pipeline attempts to use GroupByKey or CoGroupByKey to merge PCollections with incompatible windows, Beam generates an IllegalStateException error at pipeline construction time.

4.2.3. CoGroupByKey

CoGroupByKey performs a relational join of two or more key/value PCollections that have the same key type. Design Your Pipeline shows an example pipeline that uses a join.

Consider using CoGroupByKey if you have multiple data sets that provide information about related things. For example, let’s say you have two different files with user data: one file has names and email addresses; the other file has names and phone numbers. You can join those two data sets, using the user name as a common key and the other data as the associated values. After the join, you have one data set that contains all of the information (email addresses and phone numbers) associated with each name.

If you are using unbounded PCollections, you must use either non-global windowing or an aggregation trigger in order to perform a CoGroupByKey. See GroupByKey and unbounded PCollections for more details.

In the Beam SDK for Java, `CoGroupByKey` accepts a tuple of keyed `PCollection`s (`PCollection>`) as input. For type safety, the SDK requires you to pass each `PCollection` as part of a `KeyedPCollectionTuple`. You must declare a `TupleTag` for each input `PCollection` in the `KeyedPCollectionTuple` that you want to pass to `CoGroupByKey`. As output, `CoGroupByKey` returns a `PCollection>`, which groups values from all the input `PCollection`s by their common keys. Each key (all of type `K`) will have a different `CoGbkResult`, which is a map from `TupleTag` to `Iterable`. You can access a specific collection in an `CoGbkResult` object by using the `TupleTag` that you supplied with the initial collection. In the Beam SDK for Python, `CoGroupByKey` accepts a dictionary of keyed `PCollection`s as input. As output, `CoGroupByKey` creates a single output `PCollection` that contains one key/value tuple for each key in the input `PCollection`s. Each key's value is a dictionary that maps each tag to an iterable of the values under they key in the corresponding `PCollection`.

The following conceptual examples use two input collections to show the mechanics of CoGroupByKey.

The first set of data has a `TupleTag` called `emailsTag` and contains names and email addresses. The second set of data has a `TupleTag` called `phonesTag` and contains names and phone numbers. The first set of data contains names and email addresses. The second set of data contains names and phone numbers.
final List<KV<String, String>> emailsList =
    Arrays.asList(
        KV.of("amy", "amy@example.com"),
        KV.of("carl", "carl@example.com"),
        KV.of("julia", "julia@example.com"),
        KV.of("carl", "carl@email.com"));

final List<KV<String, String>> phonesList =
    Arrays.asList(
        KV.of("amy", "111-222-3333"),
        KV.of("james", "222-333-4444"),
        KV.of("amy", "333-444-5555"),
        KV.of("carl", "444-555-6666"));

PCollection<KV<String, String>> emails = p.apply("CreateEmails", Create.of(emailsList));
PCollection<KV<String, String>> phones = p.apply("CreatePhones", Create.of(phonesList));
emails_list = [
    ('amy', 'amy@example.com'),
    ('carl', 'carl@example.com'),
    ('julia', 'julia@example.com'),
    ('carl', 'carl@email.com'),
]
phones_list = [
    ('amy', '111-222-3333'),
    ('james', '222-333-4444'),
    ('amy', '333-444-5555'),
    ('carl', '444-555-6666'),
]

emails = p | 'CreateEmails' >> beam.Create(emails_list)
phones = p | 'CreatePhones' >> beam.Create(phones_list)

After CoGroupByKey, the resulting data contains all data associated with each unique key from any of the input collections.

final TupleTag<String> emailsTag = new TupleTag<>();
final TupleTag<String> phonesTag = new TupleTag<>();

final List<KV<String, CoGbkResult>> expectedResults =
    Arrays.asList(
        KV.of(
            "amy",
            CoGbkResult.of(emailsTag, Arrays.asList("amy@example.com"))
                .and(phonesTag, Arrays.asList("111-222-3333", "333-444-5555"))),
        KV.of(
            "carl",
            CoGbkResult.of(emailsTag, Arrays.asList("carl@email.com", "carl@example.com"))
                .and(phonesTag, Arrays.asList("444-555-6666"))),
        KV.of(
            "james",
            CoGbkResult.of(emailsTag, Arrays.asList())
                .and(phonesTag, Arrays.asList("222-333-4444"))),
        KV.of(
            "julia",
            CoGbkResult.of(emailsTag, Arrays.asList("julia@example.com"))
                .and(phonesTag, Arrays.asList())));
results = [
    (
        'amy',
        {
            'emails': ['amy@example.com'],
            'phones': ['111-222-3333', '333-444-5555']
        }),
    (
        'carl',
        {
            'emails': ['carl@email.com', 'carl@example.com'],
            'phones': ['444-555-6666']
        }),
    ('james', {
        'emails': [], 'phones': ['222-333-4444']
    }),
    ('julia', {
        'emails': ['julia@example.com'], 'phones': []
    }),
]

The following code example joins the two PCollections with CoGroupByKey, followed by a ParDo to consume the result. Then, the code uses tags to look up and format data from each collection.

PCollection<KV<String, CoGbkResult>> results =
    KeyedPCollectionTuple.of(emailsTag, emails)
        .and(phonesTag, phones)
        .apply(CoGroupByKey.create());

PCollection<String> contactLines =
    results.apply(
        ParDo.of(
            new DoFn<KV<String, CoGbkResult>, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                KV<String, CoGbkResult> e = c.element();
                String name = e.getKey();
                Iterable<String> emailsIter = e.getValue().getAll(emailsTag);
                Iterable<String> phonesIter = e.getValue().getAll(phonesTag);
                String formattedResult =
                    Snippets.formatCoGbkResults(name, emailsIter, phonesIter);
                c.output(formattedResult);
              }
            }));
# The result PCollection contains one key-value element for each key in the
# input PCollections. The key of the pair will be the key from the input and
# the value will be a dictionary with two entries: 'emails' - an iterable of
# all values for the current key in the emails PCollection and 'phones': an
# iterable of all values for the current key in the phones PCollection.
results = ({'emails': emails, 'phones': phones} | beam.CoGroupByKey())

def join_info(name_info):
  (name, info) = name_info
  return '%s; %s; %s' %\
      (name, sorted(info['emails']), sorted(info['phones']))

contact_lines = results | beam.Map(join_info)

The formatted data looks like this:

final List<String> formattedResults =
    Arrays.asList(
        "amy; ['amy@example.com']; ['111-222-3333', '333-444-5555']",
        "carl; ['carl@email.com', 'carl@example.com']; ['444-555-6666']",
        "james; []; ['222-333-4444']",
        "julia; ['julia@example.com']; []");
formatted_results = [
    "amy; ['amy@example.com']; ['111-222-3333', '333-444-5555']",
    "carl; ['carl@email.com', 'carl@example.com']; ['444-555-6666']",
    "james; []; ['222-333-4444']",
    "julia; ['julia@example.com']; []",
]

4.2.4. Combine

Combine Combine is a Beam transform for combining collections of elements or values in your data. Combine has variants that work on entire PCollections, and some that combine the values for each key in PCollections of key/value pairs.

When you apply a Combine transform, you must provide the function that contains the logic for combining the elements or values. The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key. Because the input data (including the value collection) may be distributed across multiple workers, the combining function might be called multiple times to perform partial combining on subsets of the value collection. The Beam SDK also provides some pre-built combine functions for common numeric combination operations such as sum, min, and max.

Simple combine operations, such as sums, can usually be implemented as a simple function. More complex combination operations might require you to create a subclass of CombineFn that has an accumulation type distinct from the input/output type.

4.2.4.1. Simple combinations using simple functions

The following example code shows a simple combine function.

// Sum a collection of Integer values. The function SumInts implements the interface SerializableFunction.
public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
  @Override
  public Integer apply(Iterable<Integer> input) {
    int sum = 0;
    for (int item : input) {
      sum += item;
    }
    return sum;
  }
}
pc = [1, 10, 100, 1000]

def bounded_sum(values, bound=500):
  return min(sum(values), bound)

small_sum = pc | beam.CombineGlobally(bounded_sum)  # [500]
large_sum = pc | beam.CombineGlobally(bounded_sum, bound=5000)  # [1111]
4.2.4.2. Advanced combinations using CombineFn

For more complex combine functions, you can define a subclass of CombineFn. You should use CombineFn if the combine function requires a more sophisticated accumulator, must perform additional pre- or post-processing, might change the output type, or takes the key into account.

A general combining operation consists of four operations. When you create a subclass of CombineFn, you must provide four operations by overriding the corresponding methods:

  1. Create Accumulator creates a new “local” accumulator. In the example case, taking a mean average, a local accumulator tracks the running sum of values (the numerator value for our final average division) and the number of values summed so far (the denominator value). It may be called any number of times in a distributed fashion.

  2. Add Input adds an input element to an accumulator, returning the accumulator value. In our example, it would update the sum and increment the count. It may also be invoked in parallel.

  3. Merge Accumulators merges several accumulators into a single accumulator; this is how data in multiple accumulators is combined before the final calculation. In the case of the mean average computation, the accumulators representing each portion of the division are merged together. It may be called again on its outputs any number of times.

  4. Extract Output performs the final computation. In the case of computing a mean average, this means dividing the combined sum of all the values by the number of values summed. It is called once on the final, merged accumulator.

The following example code shows how to define a CombineFn that computes a mean average:

public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
  public static class Accum {
    int sum = 0;
    int count = 0;
  }

  @Override
  public Accum createAccumulator() { return new Accum(); }

  @Override
  public Accum addInput(Accum accum, Integer input) {
      accum.sum += input;
      accum.count++;
      return accum;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.sum += accum.sum;
      merged.count += accum.count;
    }
    return merged;
  }

  @Override
  public Double extractOutput(Accum accum) {
    return ((double) accum.sum) / accum.count;
  }
}
pc = ...

class AverageFn(beam.CombineFn):
  def create_accumulator(self):
    return (0.0, 0)

  def add_input(self, sum_count, input):
    (sum, count) = sum_count
    return sum + input, count + 1

  def merge_accumulators(self, accumulators):
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, sum_count):
    (sum, count) = sum_count
    return sum / count if count else float('NaN')
4.2.4.3. Combining a PCollection into a single value

Use the global combine to transform all of the elements in a given PCollection into a single value, represented in your pipeline as a new PCollection containing one element. The following example code shows how to apply the Beam provided sum combine function to produce a single sum value for a PCollection of integers.

// Sum.SumIntegerFn() combines the elements in the input PCollection. The resulting PCollection, called sum,
// contains one value: the sum of all the elements in the input PCollection.
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
   Combine.globally(new Sum.SumIntegerFn()));
# sum combines the elements in the input PCollection.
# The resulting PCollection, called result, contains one value: the sum of all
# the elements in the input PCollection.
pc = ...

average = pc | beam.CombineGlobally(AverageFn())
4.2.4.4. Combine and global windowing

If your input PCollection uses the default global windowing, the default behavior is to return a PCollection containing one item. That item’s value comes from the accumulator in the combine function that you specified when applying Combine. For example, the Beam provided sum combine function returns a zero value (the sum of an empty input), while the min combine function returns a maximal or infinite value.

To have Combine instead return an empty PCollection if the input is empty, specify .withoutDefaults when you apply your Combine transform, as in the following code example:

PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
  Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());
pc = ...
sum = pc | beam.CombineGlobally(sum).without_defaults()
4.2.4.5. Combine and non-global windowing

If your PCollection uses any non-global windowing function, Beam does not provide the default behavior. You must specify one of the following options when applying Combine:

4.2.4.6. Combining values in a keyed PCollection

After creating a keyed PCollection (for example, by using a GroupByKey transform), a common pattern is to combine the collection of values associated with each key into a single, merged value. Drawing on the previous example from GroupByKey, a key-grouped PCollection called groupedWords looks like this:

  cat, [1,5,9]
  dog, [5,2]
  and, [1,2,6]
  jump, [3]
  tree, [2]
  ...

In the above PCollection, each element has a string key (for example, “cat”) and an iterable of integers for its value (in the first element, containing [1, 5, 9]). If our pipeline’s next processing step combines the values (rather than considering them individually), you can combine the iterable of integers to create a single, merged value to be paired with each key. This pattern of a GroupByKey followed by merging the collection of values is equivalent to Beam’s Combine PerKey transform. The combine function you supply to Combine PerKey must be an associative reduction function or a subclass of CombineFn.

// PCollection is grouped by key and the Double values associated with each key are combined into a Double.
PCollection<KV<String, Double>> salesRecords = ...;
PCollection<KV<String, Double>> totalSalesPerPerson =
  salesRecords.apply(Combine.<String, Double, Double>perKey(
    new Sum.SumDoubleFn()));

// The combined value is of a different type than the original collection of values per key. PCollection has
// keys of type String and values of type Integer, and the combined value is a Double.
PCollection<KV<String, Integer>> playerAccuracy = ...;
PCollection<KV<String, Double>> avgAccuracyPerPlayer =
  playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
    new MeanInts())));
# PCollection is grouped by key and the numeric values associated with each key
# are averaged into a float.
player_accuracies = ...

avg_accuracy_per_player = (
    player_accuracies
    | beam.CombinePerKey(beam.combiners.MeanCombineFn()))

4.2.5. Flatten

Flatten Flatten is a Beam transform for PCollection objects that store the same data type. Flatten merges multiple PCollection objects into a single logical PCollection.

The following example shows how to apply a Flatten transform to merge multiple PCollection objects.

// Flatten takes a PCollectionList of PCollection objects of a given type.
// Returns a single PCollection that contains all of the elements in the PCollection objects in that list.
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);

PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
# Flatten takes a tuple of PCollection objects.
# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.

merged = (
    (pcoll1, pcoll2, pcoll3)
    # A list of tuples can be "piped" directly into a Flatten transform.
    | beam.Flatten())
4.2.5.1. Data encoding in merged collections

By default, the coder for the output PCollection is the same as the coder for the first PCollection in the input PCollectionList. However, the input PCollection objects can each use different coders, as long as they all contain the same data type in your chosen language.

4.2.5.2. Merging windowed collections

When using Flatten to merge PCollection objects that have a windowing strategy applied, all of the PCollection objects you want to merge must use a compatible windowing strategy and window sizing. For example, all the collections you’re merging must all use (hypothetically) identical 5-minute fixed windows or 4-minute sliding windows starting every 30 seconds.

If your pipeline attempts to use Flatten to merge PCollection objects with incompatible windows, Beam generates an IllegalStateException error when your pipeline is constructed.

4.2.6. Partition

Partition Partition is a Beam transform for PCollection objects that store the same data type. Partition splits a single PCollection into a fixed number of smaller collections.

Partition divides the elements of a PCollection according to a partitioning function that you provide. The partitioning function contains the logic that determines how to split up the elements of the input PCollection into each resulting partition PCollection. The number of partitions must be determined at graph construction time. You can, for example, pass the number of partitions as a command-line option at runtime (which will then be used to build your pipeline graph), but you cannot determine the number of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).

The following example divides a PCollection into percentile groups.

// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the
// partitioning function. In this example, we define the PartitionFn in-line. Returns a PCollectionList
// containing each of the resulting partitions as individual PCollection objects.
PCollection<Student> students = ...;
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
    students.apply(Partition.of(10, new PartitionFn<Student>() {
        public int partitionFor(Student student, int numPartitions) {
            return student.getPercentile()  // 0..99
                 * numPartitions / 100;
        }}));

// You can extract each partition from the PCollectionList using the get method, as follows:
PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
# Provide an int value with the desired number of result partitions, and a partitioning function (partition_fn in this example).
# Returns a tuple of PCollection objects containing each of the resulting partitions as individual PCollection objects.
students = ...

def partition_fn(student, num_partitions):
  return int(get_percentile(student) * num_partitions / 100)

by_decile = students | beam.Partition(partition_fn, 10)


# You can extract each partition from the tuple of PCollection objects as follows:

fortieth_percentile = by_decile[4]

4.3. Requirements for writing user code for Beam transforms

When you build user code for a Beam transform, you should keep in mind the distributed nature of execution. For example, there might be many copies of your function running on a lot of different machines in parallel, and those copies function independently, without communicating or sharing state with any of the other copies. Depending on the Pipeline Runner and processing back-end you choose for your pipeline, each copy of your user code function may be retried or run multiple times. As such, you should be cautious about including things like state dependency in your user code.

In general, your user code must fulfill at least these requirements:

In addition, it’s recommended that you make your function object idempotent. Non-idempotent functions are supported by Beam, but require additional thought to ensure correctness when there are external side effects.

Note: These requirements apply to subclasses of DoFn (a function object used with the ParDo transform), CombineFn (a function object used with the Combine transform), and WindowFn (a function object used with the Window transform).

4.3.1. Serializability

Any function object you provide to a transform must be fully serializable. This is because a copy of the function needs to be serialized and transmitted to a remote worker in your processing cluster. The base classes for user code, such as DoFn, CombineFn, and WindowFn, already implement Serializable; however, your subclass must not add any non-serializable members.

Some other serializability factors you should keep in mind are:

4.3.2. Thread-compatibility

Your function object should be thread-compatible. Each instance of your function object is accessed by a single thread at a time on a worker instance, unless you explicitly create your own threads. Note, however, that the Beam SDKs are not thread-safe. If you create your own threads in your user code, you must provide your own synchronization. Note that static members in your function object are not passed to worker instances and that multiple instances of your function may be accessed from different threads.

4.3.3. Idempotence

It’s recommended that you make your function object idempotent–that is, that it can be repeated or retried as often as necessary without causing unintended side effects. Non-idempotent functions are supported, however the Beam model provides no guarantees as to the number of times your user code might be invoked or retried; as such, keeping your function object idempotent keeps your pipeline’s output deterministic, and your transforms’ behavior more predictable and easier to debug.

4.4. Side inputs

In addition to the main input PCollection, you can provide additional inputs to a ParDo transform in the form of side inputs. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection. When you specify a side input, you create a view of some other data that can be read from within the ParDo transform’s DoFn while processing each element.

Side inputs are useful if your ParDo needs to inject additional data when processing each element in the input PCollection, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline.

4.4.1. Passing side inputs to ParDo

  // Pass side inputs to your ParDo transform by invoking .withSideInputs.
  // Inside your DoFn, access the side input by using the method DoFn.ProcessContext.sideInput.

  // The input PCollection to ParDo.
  PCollection<String> words = ...;

  // A PCollection of word lengths that we'll combine into a single value.
  PCollection<Integer> wordLengths = ...; // Singleton PCollection

  // Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
  final PCollectionView<Integer> maxWordLengthCutOffView =
     wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());


  // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
  PCollection<String> wordsBelowCutOff =
  words.apply(ParDo
      .of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(@Element String word, OutputReceiver<String> out, ProcessContext c) {
            // In our DoFn, access the side input.
            int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
            if (word.length() <= lengthCutOff) {
              out.output(word);
            }
          }
      }).withSideInputs(maxWordLengthCutOffView)
  );
# Side inputs are available as extra arguments in the DoFn's process method or Map / FlatMap's callable.
# Optional, positional, and keyword arguments are all supported. Deferred arguments are unwrapped into their
# actual values. For example, using pvalue.AsIteor(pcoll) at pipeline construction time results in an iterable
# of the actual elements of pcoll being passed into each process invocation. In this example, side inputs are
# passed to a FlatMap transform as extra arguments and consumed by filter_using_length.
words = ...

# Callable takes additional arguments.
def filter_using_length(word, lower_bound, upper_bound=float('inf')):
  if lower_bound <= len(word) <= upper_bound:
    yield word

# Construct a deferred side input.
avg_word_len = (
    words
    | beam.Map(len)
    | beam.CombineGlobally(beam.combiners.MeanCombineFn()))

# Call with explicit side inputs.
small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)

# A single deferred side input.
larger_than_average = (
    words | 'large' >> beam.FlatMap(
        filter_using_length, lower_bound=pvalue.AsSingleton(avg_word_len))
)

# Mix and match.
small_but_nontrivial = words | beam.FlatMap(
    filter_using_length,
    lower_bound=2,
    upper_bound=pvalue.AsSingleton(avg_word_len))


# We can also pass side inputs to a ParDo transform, which will get passed to its process method.
# The first two arguments for the process method would be self and element.


class FilterUsingLength(beam.DoFn):
  def process(self, element, lower_bound, upper_bound=float('inf')):
    if lower_bound <= len(element) <= upper_bound:
      yield element

small_words = words | beam.ParDo(FilterUsingLength(), 0, 3)

...

4.4.2. Side inputs and windowing

A windowed PCollection may be infinite and thus cannot be compressed into a single value (or single collection class). When you create a PCollectionView of a windowed PCollection, the PCollectionView represents a single entity per window (one singleton per window, one list per window, etc.).

Beam uses the window(s) for the main input element to look up the appropriate window for the side input element. Beam projects the main input element’s window into the side input’s window set, and then uses the side input from the resulting window. If the main input and side inputs have identical windows, the projection provides the exact corresponding window. However, if the inputs have different windows, Beam uses the projection to choose the most appropriate side input window.

For example, if the main input is windowed using fixed-time windows of one minute, and the side input is windowed using fixed-time windows of one hour, Beam projects the main input window against the side input window set and selects the side input value from the appropriate hour-long side input window.

If the main input element exists in more than one window, then processElement gets called multiple times, once for each window. Each call to processElement projects the “current” window for the main input element, and thus might provide a different view of the side input each time.

If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.

4.5. Additional outputs

While ParDo always produces a main output PCollection (as the return value from apply), you can also have your ParDo produce any number of additional output PCollections. If you choose to have multiple outputs, your ParDo returns all of the output PCollections (including the main output) bundled together.

4.5.1. Tags for multiple outputs

// To emit elements to multiple output PCollections, create a TupleTag object to identify each collection
// that your ParDo produces. For example, if your ParDo produces three output PCollections (the main output
// and two additional outputs), you must create three TupleTags. The following example code shows how to
// create TupleTags for a ParDo with three output PCollections.

  // Input PCollection to our ParDo.
  PCollection<String> words = ...;

  // The ParDo will filter words whose length is below a cutoff and add them to
  // the main output PCollection<String>.
  // If a word is above the cutoff, the ParDo will add the word length to an
  // output PCollection<Integer>.
  // If a word starts with the string "MARKER", the ParDo will add that word to an
  // output PCollection<String>.
  final int wordLengthCutOff = 10;

  // Create three TupleTags, one for each output PCollection.
  // Output that contains words below the length cutoff.
  final TupleTag<String> wordsBelowCutOffTag =
      new TupleTag<String>(){};
  // Output that contains word lengths.
  final TupleTag<Integer> wordLengthsAboveCutOffTag =
      new TupleTag<Integer>(){};
  // Output that contains "MARKER" words.
  final TupleTag<String> markedWordsTag =
      new TupleTag<String>(){};

// Passing Output Tags to ParDo:
// After you specify the TupleTags for each of your ParDo outputs, pass the tags to your ParDo by invoking
// .withOutputTags. You pass the tag for the main output first, and then the tags for any additional outputs
// in a TupleTagList. Building on our previous example, we pass the three TupleTags for our three output
// PCollections to our ParDo. Note that all of the outputs (including the main output PCollection) are
// bundled into the returned PCollectionTuple.

  PCollectionTuple results =
      words.apply(ParDo
          .of(new DoFn<String, String>() {
            // DoFn continues here.
            ...
          })
          // Specify the tag for the main output.
          .withOutputTags(wordsBelowCutOffTag,
          // Specify the tags for the two additional outputs as a TupleTagList.
                          TupleTagList.of(wordLengthsAboveCutOffTag)
                                      .and(markedWordsTag)));
# To emit elements to multiple output PCollections, invoke with_outputs() on the ParDo, and specify the
# expected tags for the outputs. with_outputs() returns a DoOutputsTuple object. Tags specified in
# with_outputs are attributes on the returned DoOutputsTuple object. The tags give access to the
# corresponding output PCollections.


results = (
    words
    | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x').with_outputs(
        'above_cutoff_lengths',
        'marked strings',
        main='below_cutoff_strings'))
below = results.below_cutoff_strings
above = results.above_cutoff_lengths
marked = results['marked strings']  # indexing works as well


# The result is also iterable, ordered in the same order that the tags were passed to with_outputs(),
# the main tag (if specified) first.


below, above, marked = (words
                        | beam.ParDo(
                            ProcessWords(), cutoff_length=2, marker='x')
                        .with_outputs('above_cutoff_lengths',
                                      'marked strings',
                                      main='below_cutoff_strings'))

4.5.2. Emitting to multiple outputs in your DoFn

// Inside your ParDo's DoFn, you can emit an element to a specific output PCollection by providing a
// MultiOutputReceiver to your process method, and passing in the appropriate TupleTag to obtain an OutputReceiver.
// After your ParDo, extract the resulting output PCollections from the returned PCollectionTuple.
// Based on the previous example, this shows the DoFn emitting to the main output and two additional outputs.

  .of(new DoFn<String, String>() {
     public void processElement(@Element String word, MultiOutputReceiver out) {
       if (word.length() <= wordLengthCutOff) {
         // Emit short word to the main output.
         // In this example, it is the output with tag wordsBelowCutOffTag.
         out.get(wordsBelowCutOffTag).output(word);
       } else {
         // Emit long word length to the output with tag wordLengthsAboveCutOffTag.
         out.get(wordLengthsAboveCutOffTag).output(word.length());
       }
       if (word.startsWith("MARKER")) {
         // Emit word to the output with tag markedWordsTag.
         out.get(markedWordsTag).output(word);
       }
     }}));
# Inside your ParDo's DoFn, you can emit an element to a specific output by wrapping the value and the output tag (str).
# using the pvalue.OutputValue wrapper class.
# Based on the previous example, this shows the DoFn emitting to the main output and two additional outputs.


class ProcessWords(beam.DoFn):
  def process(self, element, cutoff_length, marker):
    if len(element) <= cutoff_length:
      # Emit this short word to the main output.
      yield element
    else:
      # Emit this word's long length to the 'above_cutoff_lengths' output.
      yield pvalue.TaggedOutput('above_cutoff_lengths', len(element))
    if element.startswith(marker):
      # Emit this word to a different output with the 'marked strings' tag.
      yield pvalue.TaggedOutput('marked strings', element)



# Producing multiple outputs is also available in Map and FlatMap.
# Here is an example that uses FlatMap and shows that the tags do not need to be specified ahead of time.


def even_odd(x):
  yield pvalue.TaggedOutput('odd' if x % 2 else 'even', x)
  if x % 10 == 0:
    yield x

results = numbers | beam.FlatMap(even_odd).with_outputs()

evens = results.even
odds = results.odd
tens = results[None]  # the undeclared main output

4.5.3. Accessing additional parameters in your DoFn

In addition to the element and the OutputReceiver, Beam will populate other parameters to your DoFn’s @ProcessElement method. Any combination of these parameters can be added to your process method in any order.

In addition to the element, Beam will populate other parameters to your DoFn’s process method. Any combination of these parameters can be added to your process method in any order.

Timestamp: To access the timestamp of an input element, add a parameter annotated with @Timestamp of type Instant. For example:

Timestamp: To access the timestamp of an input element, add a keyword parameter default to DoFn.TimestampParam. For example:

.of(new DoFn<String, String>() {
     public void processElement(@Element String word, @Timestamp Instant timestamp) {
  }})
import apache_beam as beam

class ProcessRecord(beam.DoFn):

  def process(self, element, timestamp=beam.DoFn.TimestampParam):
     # access timestamp of element.
     pass

Window: To access the window an input element falls into, add a parameter of the type of the window used for the input PCollection. If the parameter is a window type (a subclass of BoundedWindow) that does not match the input PCollection, then an error will be raised. If an element falls in multiple windows (for example, this will happen when using SlidingWindows), then the @ProcessElement method will be invoked multiple time for the element, once for each window. For example, when fixed windows are being used, the window is of type IntervalWindow.

Window: To access the window an input element falls into, add a keyword parameter default to DoFn.WindowParam. If an element falls in multiple windows (for example, this will happen when using SlidingWindows), then the process method will be invoked multiple time for the element, once for each window.

.of(new DoFn<String, String>() {
     public void processElement(@Element String word, IntervalWindow window) {
  }})
import apache_beam as beam

class ProcessRecord(beam.DoFn):

  def process(self, element, window=beam.DoFn.WindowParam):
     # access window e.g. window.end.micros
     pass

PaneInfo: When triggers are used, Beam provides a PaneInfo object that contains information about the current firing. Using PaneInfo you can determine whether this is an early or a late firing, and how many times this window has already fired for this key.

PaneInfo: When triggers are used, Beam provides a DoFn.PaneInfoParam object that contains information about the current firing. Using DoFn.PaneInfoParam you can determine whether this is an early or a late firing, and how many times this window has already fired for this key. This feature implementation in Python SDK is not fully completed; see more at BEAM-3759.

.of(new DoFn<String, String>() {
     public void processElement(@Element String word, PaneInfo paneInfo) {
  }})
import apache_beam as beam

class ProcessRecord(beam.DoFn):

  def process(self, element, pane_info=beam.DoFn.PaneInfoParam):
     # access pane info, e.g. pane_info.is_first, pane_info.is_last, pane_info.timing
     pass

PipelineOptions: The PipelineOptions for the current pipeline can always be accessed in a process method by adding it as a parameter:

.of(new DoFn<String, String>() {
     public void processElement(@Element String word, PipelineOptions options) {
  }})

@OnTimer methods can also access many of these parameters. Timestamp, Window, key, PipelineOptions, OutputReceiver, and MultiOutputReceiver parameters can all be accessed in an @OnTimer method. In addition, an @OnTimer method can take a parameter of type TimeDomain which tells whether the timer is based on event time or processing time. Timers are explained in more detail in the Timely (and Stateful) Processing with Apache Beam blog post.

Timer and State: In addition to aforementioned parameters, user defined Timer and State parameters can be used in a stateful DoFn. Timers and States are explained in more detail in the Timely (and Stateful) Processing with Apache Beam blog post.

class StatefulDoFn(beam.DoFn):
  """An example stateful DoFn with state and timer"""

  BUFFER_STATE_1 = BagStateSpec('buffer1', beam.BytesCoder())
  BUFFER_STATE_2 = BagStateSpec('buffer2', beam.VarIntCoder())
  WATERMARK_TIMER = TimerSpec('watermark_timer', TimeDomain.WATERMARK)

  def process(self,
              element,
              timestamp=beam.DoFn.TimestampParam,
              window=beam.DoFn.WindowParam,
              buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
              buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2),
              watermark_timer=beam.DoFn.TimerParam(WATERMARK_TIMER)):

    # Do your processing here
    key, value = element
    # Read all the data from buffer1
    all_values_in_buffer_1 = [x for x in buffer_1.read()]

    if StatefulDoFn._is_clear_buffer_1_required(all_values_in_buffer_1):
        # clear the buffer data if required conditions are met.
        buffer_1.clear()

    # add the value to buffer 2
    buffer_2.add(value)

    if StatefulDoFn._all_condition_met():
      # Clear the timer if certain condition met and you don't want to trigger
      # the callback method.
      watermark_timer.clear()

    yield element

  @on_timer(WATERMARK_TIMER)
  def on_expiry_1(self,
                  timestamp=beam.DoFn.TimestampParam,
                  window=beam.DoFn.WindowParam,
                  key=beam.DoFn.KeyParam,
                  buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
                  buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2)):
    # Window and key parameters are really useful especially for debugging issues.
    yield 'expired1'

  @staticmethod
  def _all_condition_met():
      # some logic
      return True

  @staticmethod
  def _is_clear_buffer_1_required(buffer_1_data):
      # Some business logic
      return True

4.6. Composite transforms

Transforms can have a nested structure, where a complex transform performs multiple simpler transforms (such as more than one ParDo, Combine, GroupByKey, or even other composite transforms). These transforms are called composite transforms. Nesting multiple transforms inside a single composite transform can make your code more modular and easier to understand.

The Beam SDK comes packed with many useful composite transforms. See the API reference pages for a list of transforms:

4.6.1. An example composite transform

The CountWords transform in the WordCount example program is an example of a composite transform. CountWords is a PTransform subclass that consists of multiple nested transforms.

In its expand method, the CountWords transform applies the following transform operations:

  1. It applies a ParDo on the input PCollection of text lines, producing an output PCollection of individual words.
  2. It applies the Beam SDK library transform Count on the PCollection of words, producing a PCollection of key/value pairs. Each key represents a word in the text, and each value represents the number of times that word appeared in the original data.
  public static class CountWords extends PTransform<PCollection<String>,
      PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
          ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
          words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }
# The CountWords Composite Transform inside the WordCount pipeline.
class CountWords(beam.PTransform):
  def expand(self, pcoll):
    return (
        pcoll
        # Convert lines of text into individual words.
        | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
        # Count the number of times each word occurs.
        | beam.combiners.Count.PerElement()
        # Format each word and count into a printable string.
        | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Note: Because Count is itself a composite transform, CountWords is also a nested composite transform.

4.6.2. Creating a composite transform

To create your own composite transform, create a subclass of the PTransform class and override the expand method to specify the actual processing logic. You can then use this transform just as you would a built-in transform from the Beam SDK.

For the PTransform class type parameters, you pass the PCollection types that your transform takes as input, and produces as output. To take multiple PCollections as input, or produce multiple PCollections as output, use one of the multi-collection types for the relevant type parameter.

The following code sample shows how to declare a PTransform that accepts a PCollection of Strings for input, and outputs a PCollection of Integers:

  static class ComputeWordLengths
    extends PTransform<PCollection<String>, PCollection<Integer>> {
    ...
  }
class ComputeWordLengths(beam.PTransform):
  def expand(self, pcoll):
    # Transform logic goes here.
    return pcoll | beam.Map(lambda x: len(x))

Within your PTransform subclass, you’ll need to override the expand method. The expand method is where you add the processing logic for the PTransform. Your override of expand must accept the appropriate type of input PCollection as a parameter, and specify the output PCollection as the return value.

The following code sample shows how to override expand for the ComputeWordLengths class declared in the previous example:

  static class ComputeWordLengths
      extends PTransform<PCollection<String>, PCollection<Integer>> {
    @Override
    public PCollection<Integer> expand(PCollection<String>) {
      ...
      // transform logic goes here
      ...
    }
class ComputeWordLengths(beam.PTransform):
  def expand(self, pcoll):
    # Transform logic goes here.
    return pcoll | beam.Map(lambda x: len(x))

As long as you override the expand method in your PTransform subclass to accept the appropriate input PCollection(s) and return the corresponding output PCollection(s), you can include as many transforms as you want. These transforms can include core transforms, composite transforms, or the transforms included in the Beam SDK libraries.

Your composite transform’s parameters and return value must match the initial input type and final return type for the entire transform, even if the transform’s intermediate data changes type multiple times.

Note: The expand method of a PTransform is not meant to be invoked directly by the user of a transform. Instead, you should call the apply method on the PCollection itself, with the transform as an argument. This allows transforms to be nested within the structure of your pipeline.

4.6.3. PTransform Style Guide

The PTransform Style Guide contains additional information not included here, such as style guidelines, logging and testing guidance, and language-specific considerations. The guide is a useful starting point when you want to write new composite PTransforms.

5. Pipeline I/O

When you create a pipeline, you often need to read data from some external source, such as a file or a database. Likewise, you may want your pipeline to output its result data to an external storage system. Beam provides read and write transforms for a number of common data storage types. If you want your pipeline to read from or write to a data storage format that isn’t supported by the built-in transforms, you can implement your own read and write transforms.

5.1. Reading input data

Read transforms read data from an external source and return a PCollection representation of the data for use by your pipeline. You can use a read transform at any point while constructing your pipeline to create a new PCollection, though it will be most common at the start of your pipeline.

PCollection<String> lines = p.apply(TextIO.read().from("gs://some/inputData.txt"));
lines = pipeline | beam.io.ReadFromText('gs://some/inputData.txt')

5.2. Writing output data

Write transforms write the data in a PCollection to an external data source. You will most often use write transforms at the end of your pipeline to output your pipeline’s final results. However, you can use a write transform to output a PCollection's data at any point in your pipeline.

output.apply(TextIO.write().to("gs://some/outputData"));
output | beam.io.WriteToText('gs://some/outputData')

5.3. File-based input and output data

5.3.1. Reading from multiple locations

Many read transforms support reading from multiple input files matching a glob operator you provide. Note that glob operators are filesystem-specific and obey filesystem-specific consistency models. The following TextIO example uses a glob operator (*) to read all matching input files that have prefix “input-” and the suffix “.csv” in the given location:

p.apply("ReadFromText",
    TextIO.read().from("protocol://my_bucket/path/to/input-*.csv"));
lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')

To read data from disparate sources into a single PCollection, read each one independently and then use the Flatten transform to create a single PCollection.

5.3.2. Writing to multiple output files

For file-based output data, write transforms write to multiple output files by default. When you pass an output file name to a write transform, the file name is used as the prefix for all output files that the write transform produces. You can append a suffix to each output file by specifying a suffix.

The following write transform example writes multiple output files to a location. Each file has the prefix “numbers”, a numeric tag, and the suffix “.csv”.

records.apply("WriteToText",
    TextIO.write().to("protocol://my_bucket/path/to/numbers")
                .withSuffix(".csv"));
filtered_words | 'WriteToText' >> beam.io.WriteToText(
    '/path/to/numbers', file_name_suffix='.csv')

5.4. Beam-provided I/O transforms

See the Beam-provided I/O Transforms page for a list of the currently available I/O transforms.

6. Schemas

Often, the types of the records being processed have an obvious structure. Common Beam sources produce JSON, Avro, Protocol Buffer, or database row objects; all of these types have well defined structures, structures that can often be determined by examining the type. Even within a SDK pipeline, Simple Java POJOs (or equivalent structures in other languages) are often used as intermediate types, and these also have a clear structure that can be inferred by inspecting the class. By understanding the structure of a pipeline’s records, we can provide much more concise APIs for data processing.

6.1. What is a schema?

Most structured records share some common characteristics:

Often records have a nested structure. A nested structure occurs when a field itself has subfields so the type of the field itself has a schema. Fields that are array or map types is also a common feature of these structured records.

For example, consider the following schema, representing actions in a fictitious e-commerce company:

Purchase

Field NameField Type
userIdSTRING
itemIdINT64
shippingAddressROW(ShippingAddress)
costINT64
transactionsARRAY[ROW(Transaction)]

ShippingAddress

Field NameField Type
streetAddressSTRING
citySTRING
statenullable STRING
countrySTRING
postCodeSTRING

Transaction

Field NameField Type
bankSTRING
purchaseAmountDOUBLE

Purchase event records are represented by the above purchase schema. Each purchase event contains a shipping address, which is a nested row containing its own schema. Each purchase also contains an array of credit-card transactions (a list, because a purchase might be split across multiple credit cards); each item in the transaction list is a row with its own schema.

This provides an abstract description of the types involved, one that is abstracted away from any specific programming language.

Schemas provide us a type-system for Beam records that is independent of any specific programming-language type. There might be multiple Java classes that all have the same schema (for example a Protocol-Buffer class or a POJO class), and Beam will allow us to seamlessly convert between these types. Schemas also provide a simple way to reason about types across different programming-language APIs.

A PCollection with a schema does not need to have a Coder specified, as Beam knows how to encode and decode Schema rows; Beam uses a special coder to encode schema types.

6.2. Schemas for programming language types

While schemas themselves are language independent, they are designed to embed naturally into the programming languages of the Beam SDK being used. This allows Beam users to continue using native types while reaping the advantage of having Beam understand their element schemas.

In Java you could use the following set of classes to represent the purchase schema. Beam will automatically infer the correct schema based on the members of the class.

In Python you can use the following set of classes to represent the purchase schema. Beam will automatically infer the correct schema based on the members of the class.

@DefaultSchema(JavaBeanSchema.class)
public class Purchase {
  public String getUserId();  // Returns the id of the user who made the purchase.
  public long getItemId();  // Returns the identifier of the item that was purchased.
  public ShippingAddress getShippingAddress();  // Returns the shipping address, a nested type.
  public long getCostCents();  // Returns the cost of the item.
  public List<Transaction> getTransactions();  // Returns the transactions that paid for this purchase (returns a list, since the purchase might be spread out over multiple credit cards).

  @SchemaCreate
  public Purchase(String userId, long itemId, ShippingAddress shippingAddress, long costCents,
                  List<Transaction> transactions) {
      ...
  }
}

@DefaultSchema(JavaBeanSchema.class)
public class ShippingAddress {
  public String getStreetAddress();
  public String getCity();
  @Nullable public String getState();
  public String getCountry();
  public String getPostCode();

  @SchemaCreate
  public ShippingAddress(String streetAddress, String city, @Nullable String state, String country,
                         String postCode) {
     ...
  }
}

@DefaultSchema(JavaBeanSchema.class)
public class Transaction {
  public String getBank();
  public double getPurchaseAmount();

  @SchemaCreate
  public Transaction(String bank, double purchaseAmount) {
     ...
  }
}
import typing

class Purchase(typing.NamedTuple):
  user_id: str  # The id of the user who made the purchase.
  item_id: int  # The identifier of the item that was purchased.
  shipping_address: ShippingAddress  # The shipping address, a nested type.
  cost_cents: int  # The cost of the item
  transactions: typing.Sequence[Transaction]  # The transactions that paid for this purchase (a list, since the purchase might be spread out over multiple credit cards).

class ShippingAddress(typing.NamedTuple):
  street_address: str
  city: str
  state: typing.Optional[str]
  country: str
  postal_code: str

class Transaction(typing.NamedTuple):
  bank: str
  purchase_amount: float

Using JavaBean classes as above is one way to map a schema to Java classes. However multiple Java classes might have the same schema, in which case the different Java types can often be used interchangeably. Beam will add implicit conversions between types that have matching schemas. For example, the above Transaction class has the same schema as the following class:

@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojo {
  public String bank;
  public double purchaseAmount;
}

So if we had two PCollections as follows

PCollection<Transaction> transactionBeans = readTransactionsAsJavaBean();
PCollection<TransactionPojos> transactionPojos = readTransactionsAsPojo();

Then these two PCollections would have the same schema, even though their Java types would be different. This means for example the following two code snippets are valid:

transactionBeans.apply(ParDo.of(new DoFn<...>() {
   @ProcessElement public void process(@Element TransactionPojo pojo) {
      ...
   }
}));

and

transactionPojos.apply(ParDo.of(new DoFn<...>() {
   @ProcessElement public void process(@Element Transaction row) {
    }
}));

Even though the in both cases the @Element parameter differs from the the PCollection's Java type, since the schemas are the same Beam will automatically make the conversion. The built-in Convert transform can also be used to translate between Java types of equivalent schemas, as detailed below.

6.3. Schema definition

The schema for a PCollection defines elements of that PCollection as an ordered list of named fields. Each field has a name, a type, and possibly a set of user options. The type of a field can be primitive or composite. The following are the primitive types currently supported by Beam:

TypeDescription
BYTEAn 8-bit signed value
INT16A 16-bit signed value
INT32A 32-bit signed value
INT64A 64-bit signed value
DECIMALAn arbitrary-precision decimal type
FLOATA 32-bit IEEE 754 floating point number
DOUBLEA 64-bit IEEE 754 floating point number
STRINGA string
DATETIMEA timestamp represented as milliseconds since the epoch
BOOLEANA boolean value
BYTESA raw byte array

A field can also reference a nested schema. In this case, the field will have type ROW, and the nested schema will be an attribute of this field type.

Three collection types are supported as field types: ARRAY, ITERABLE and MAP:

6.4. Logical types

Users can extend the schema type system to add custom logical types that can be used as a field. A logical type is identified by a unique identifier and an argument. A logical type also specifies an underlying schema type to be used for storage, along with conversions to and from that type. As an example, a logical union can always be represented as a row with nullable fields, where the user ensures that only one of those fields is ever set at a time. However this can be tedious and complex to manage. The OneOf logical type provides a value class that makes it easier to manage the type as a union, while still using a row with nullable fields as its underlying storage. Each logical type also has a unique identifier, so they can be interpreted by other languages as well. More examples of logical types are listed below.

6.4.1. Defining a logical type

To define a logical type you must specify a Schema type to be used to represent the underlying type as well as a unique identifier for that type. A logical type imposes additional semantics on top a schema type. For example, a logical type to represent nanosecond timestamps is represented as a schema containing an INT64 and an INT32 field. This schema alone does not say anything about how to interpret this type, however the logical type tells you that this represents a nanosecond timestamp, with the INT64 field representing seconds and the INT32 field representing nanoseconds.

Logical types are also specified by an argument, which allows creating a class of related types. For example, a limited-precision decimal type would have an integer argument indicating how many digits of precision are represented. The argument is represented by a schema type, so can itself be a complex type.

In Java, a logical type is specified as a subclass of the LogicalType class. A custom Java class can be specified to represent the logical type and conversion functions must be supplied to convert back and forth between this Java class and the underlying Schema type representation. For example, the logical type representing nanosecond timestamp might be implemented as follows

// A Logical type using java.time.Instant to represent the logical type.
public class TimestampNanos implements LogicalType<Instant, Row> {
  // The underlying schema used to represent rows.
  private final Schema SCHEMA = Schema.builder().addInt64Field("seconds").addInt32Field("nanos").build();
  @Override public String getIdentifier() { return "timestampNanos"; }
  @Override public FieldType getBaseType() { return schema; }

  // Convert the representation type to the underlying Row type. Called by Beam when necessary.
  @Override public Row toBaseType(Instant instant) {
    return Row.withSchema(schema).addValues(instant.getEpochSecond(), instant.getNano()).build();
  }

  // Convert the underlying Row type to an Instant. Called by Beam when necessary.
  @Override public Instant toInputType(Row base) {
    return Instant.of(row.getInt64("seconds"), row.getInt32("nanos"));
  }

     ...
}

6.4.2. Useful logical types

EnumerationType

This logical type allows creating an enumeration type consisting of a set of named constants.

Schema schema = Schema.builder()
               
     .addLogicalTypeField("color", EnumerationType.create("RED", "GREEN", "BLUE"))
     .build();

The value of this field is stored in the row as an INT32 type, however the logical type defines a value type that lets you access the enumeration either as a string or a value. For example:

EnumerationType.Value enumValue = enumType.valueOf("RED");
enumValue.getValue();  // Returns 0, the integer value of the constant.
enumValue.toString();  // Returns "RED", the string value of the constant

Given a row object with an enumeration field, you can also extract the field as the enumeration value.

EnumerationType.Value enumValue = row.getLogicalTypeValue("color", EnumerationType.Value.class);

Automatic schema inference from Java POJOs and JavaBeans automatically converts Java enums to EnumerationType logical types.

OneOfType

OneOfType allows creating a disjoint union type over a set of schema fields. For example:

Schema schema = Schema.builder()
               
     .addLogicalTypeField("oneOfField",
        OneOfType.create(Field.of("intField", FieldType.INT32),
                         Field.of("stringField", FieldType.STRING),
                         Field.of("bytesField", FieldType.BYTES)))
      .build();

The value of this field is stored in the row as another Row type, where all the fields are marked as nullable. The logical type however defines a Value object that contains an enumeration value indicating which field was set and allows getting just that field:

// Returns an enumeration indicating all possible case values for the enum.
// For the above example, this will be
// EnumerationType.create("intField", "stringField", "bytesField");
EnumerationType oneOfEnum = onOfType.getCaseEnumType();

// Creates an instance of the union with the string field set.
OneOfType.Value oneOfValue = oneOfType.createValue("stringField", "foobar");

// Handle the oneof
switch (oneOfValue.getCaseEnumType().toString()) {
  case "intField":
    return processInt(oneOfValue.getValue(Integer.class));
  case "stringField":
    return processString(oneOfValue.getValue(String.class));
  case "bytesField":
    return processBytes(oneOfValue.getValue(bytes[].class));
}

In the above example we used the field names in the switch statement for clarity, however the enum integer values could also be used.

6.5. Creating Schemas

In order to take advantage of schemas, your PCollections must have a schema attached to it. Often, the source itself will attach a schema to the PCollection. For example, when using AvroIO to read Avro files, the source can automatically infer a Beam schema from the Avro schema and attach that to the Beam PCollection. However not all sources produce schemas. In addition, often Beam pipelines have intermediate stages and types, and those also can benefit from the expressiveness of schemas.

6.5.1. Inferring schemas

Beam is able to infer schemas from a variety of common Java types. The @DefaultSchema annotation can be used to tell Beam to infer schemas from a specific type. The annotation takes a SchemaProvider as an argument, and SchemaProvider classes are already built in for common Java types. The SchemaRegistry can also be invoked programmatically for cases where it is not practical to annotate the Java type itself.

Java POJOs

A POJO (Plain Old Java Object) is a Java object that is not bound by any restriction other than the Java Language Specification. A POJO can contain member variables that are primitives, that are other POJOs, or are collections maps or arrays thereof. POJOs do not have to extend prespecified classes or extend any specific interfaces.

If a POJO class is annotated with @DefaultSchema(JavaFieldSchema.class), Beam will automatically infer a schema for this class. Nested classes are supported as are classes with List, array, and Map fields.

For example, annotating the following class tells Beam to infer a schema from this POJO class and apply it to any PCollection<TransactionPojo>.

@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojo {
  public final String bank;
  public final double purchaseAmount;
  @SchemaCreate
  public TransactionPojo(String bank, double purchaseAmount) {
    this.bank = bank.
    this.purchaseAmount = purchaseAmount;
  }
}
// Beam will automatically infer the correct schema for this PCollection. No coder is needed as a result.
PCollection<TransactionPojo> pojos = readPojos();

The @SchemaCreate annotation tells Beam that this constructor can be used to create instances of TransactionPojo, assuming that constructor parameters have the same names as the field names. @SchemaCreate can also be used to annotate static factory methods on the class, allowing the constructor to remain private. If there is no @SchemaCreate annotation then all the fields must be non-final and the class must have a zero-argument constructor.

There are a couple of other useful annotations that affect how Beam infers schemas. By default the schema field names inferred will match that of the class field names. However @SchemaFieldName can be used to specify a different name to be used for the schema field. @SchemaIgnore can be used to mark specific class fields as excluded from the inferred schema. For example, it’s common to have ephemeral fields in a class that should not be included in a schema (e.g. caching the hash value to prevent expensive recomputation of the hash), and @SchemaIgnore can be used to exclude these fields. Note that ignored fields will not be included in the encoding of these records.

In some cases it is not convenient to annotate the POJO class, for example if the POJO is in a different package that is not owned by the Beam pipeline author. In these cases the schema inference can be triggered programmatically in pipeline’s main function as follows:

 pipeline.getSchemaRegistry().registerPOJO(TransactionPOJO.class);

Java Beans

Java Beans are a de-facto standard for creating reusable property classes in Java. While the full standard has many characteristics, the key ones are that all properties are accessed via getter and setter classes, and the name format for these getters and setters is standardized. A Java Bean class can be annotated with @DefaultSchema(JavaBeanSchema.class) and Beam will automatically infer a schema for this class. For example:

@DefaultSchema(JavaBeanSchema.class)
public class TransactionBean {
  public TransactionBean() {  }
  public String getBank() {  }
  public void setBank(String bank) {  }
  public double getPurchaseAmount() {  }
  public void setPurchaseAmount(double purchaseAmount) {  }
}
// Beam will automatically infer the correct schema for this PCollection. No coder is needed as a result.
PCollection<TransactionBean> beans = readBeans();

The @SchemaCreate annotation can be used to specify a constructor or a static factory method, in which case the setters and zero-argument constructor can be omitted.

@DefaultSchema(JavaBeanSchema.class)
public class TransactionBean {
  @SchemaCreate
  Public TransactionBean(String bank, double purchaseAmount) {  }
  public String getBank() {  }
  public double getPurchaseAmount() {  }
}

@SchemaFieldName and @SchemaIgnore can be used to alter the schema inferred, just like with POJO classes.

AutoValue

Java value classes are notoriously difficult to generate correctly. There is a lot of boilerplate you must create in order to properly implement a value class. AutoValue is a popular library for easily generating such classes by implementing a simple abstract base class.

Beam can infer a schema from an AutoValue class. For example:

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class TransactionValue {
  public abstract String getBank();
  public abstract double getPurchaseAmount();
}

This is all that’s needed to generate a simple AutoValue class, and the above @DefaultSchema annotation tells Beam to infer a schema from it. This also allows AutoValue elements to be used inside of PCollections.

@SchemaFieldName and @SchemaIgnore can be used to alter the schema inferred.

Beam has a few different mechanisms for inferring schemas from Python code.

NamedTuple classes

A NamedTuple class is a Python class that wraps a tuple, assigning a name to each element and restricting it to a particular type. Beam will automatically infer the schema for PCollections with NamedTuple output types. For example:

class Transaction(typing.NamedTuple):
  bank: str
  purchase_amount: float

pc = input | beam.Map(lambda ...).with_output_types(Transaction)

beam.Row

It’s also possible to create ad-hoc schema declarations with a simple lambda that returns instances of beam.Row:

input_pc = ... # {"bank": ..., "purchase_amount": ...}
output_pc = input_pc | beam.Map(lambda item: beam.Row(bank=item["bank"],
                                                      purchase_amount=item["purchase_amount"])

Note that this declaration doesn’t include any specific information about the types of the bank and purchase_amount fields. Beam will attempt to infer type information, if it’s unable to it will fall back to the generic type Any. Sometimes this is not ideal, you can use casts to make sure Beam correctly infers types with beam.Row:

input_pc = ... # {"bank": ..., "purchase_amount": ...}
output_pc = input_pc | beam.Map(lambda item: beam.Row(bank=str(item["bank"]),
                                                      purchase_amount=float(item["purchase_amount"]))

6.6. Using Schema Transforms

A schema on a PCollection enables a rich variety of relational transforms. The fact that each record is composed of named fields allows for simple and readable aggregations that reference fields by name, similar to the aggregations in a SQL expression.

6.6.1. Field selection syntax

The advantage of schemas is that they allow referencing of element fields by name. Beam provides a selection syntax for referencing fields, including nested and repeated fields. This syntax is used by all of the schema transforms when referencing the fields they operate on. The syntax can also be used inside of a DoFn to specify which schema fields to process.

Addressing fields by name still retains type safety as Beam will check that schemas match at the time the pipeline graph is constructed. If a field is specified that does not exist in the schema, the pipeline will fail to launch. In addition, if a field is specified with a type that does not match the type of that field in the schema, the pipeline will fail to launch.

The following characters are not allowed in field names: . * [ ] { }

Top-level fields

In order to select a field at the top level of a schema, the name of the field is specified. For example, to select just the user ids from a PCollection of purchases one would write (using the Select transform)

purchases.apply(Select.fieldNames("userId"));
Nested fields

Individual nested fields can be specified using the dot operator. For example, to select just the postal code from the shipping address one would write

purchases.apply(Select.fieldNames("shippingAddress.postCode"));
Wildcards

The * operator can be specified at any nesting level to represent all fields at that level. For example, to select all shipping-address fields one would write

purchases.apply(Select.fieldNames("shippingAddress.*"));
Arrays

An array field, where the array element type is a row, can also have subfields of the element type addressed. When selected, the result is an array of the selected subfield type. For example

purchases.apply(Select.fieldNames("transactions[].bank"));

Will result in a row containing an array field with element-type string, containing the list of banks for each transaction.

While the use of [] brackets in the selector is recommended, to make it clear that array elements are being selected, they can be omitted for brevity. In the future, array slicing will be supported, allowing selection of portions of the array.

Maps

A map field, where the value type is a row, can also have subfields of the value type addressed. When selected, the result is a map where the keys are the same as in the original map but the value is the specified type. Similar to arrays, the use of {} curly brackets in the selector is recommended, to make it clear that map value elements are being selected, they can be omitted for brevity. In the future, map key selectors will be supported, allowing selection of specific keys from the map. For example, given the following schema:

PurchasesByType

Field NameField Type
purchasesMAP{STRING, ROW{PURCHASE}

The following

purchasesByType.apply(Select.fieldNames("purchases{}.userId"));

Will result in a row containing a map field with key-type string and value-type string. The selected map will contain all of the keys from the original map, and the values will be the userId contained in the purchase record.

While the use of {} brackets in the selector is recommended, to make it clear that map value elements are being selected, they can be omitted for brevity. In the future, map slicing will be supported, allowing selection of specific keys from the map.

6.6.2. Schema transforms

Beam provides a collection of transforms that operate natively on schemas. These transforms are very expressive, allowing selections and aggregations in terms of named schema fields. Following are some examples of useful schema transforms.

Selecting input

Often a computation is only interested in a subset of the fields in an input PCollection. The Select transform allows one to easily project out only the fields of interest. The resulting PCollection has a schema containing each selected field as a top-level field. Both top-level and nested fields can be selected. For example, in the Purchase schema, one could select only the userId and streetAddress fields as follows

purchases.apply(Select.fieldNames("userId", "shippingAddress.streetAddress"));

The resulting PCollection will have the following schema

Field NameField Type
userIdSTRING
streetAddressSTRING

The same is true for wildcard selections. The following

purchases.apply(Select.fieldNames("userId", "shippingAddress.*"));

Will result in the following schema

Field NameField Type
userIdSTRING
streetAddressSTRING
citySTRING
statenullable STRING
countrySTRING
postCodeSTRING

When selecting fields nested inside of an array, the same rule applies that each selected field appears separately as a top-level field in the resulting row. This means that if multiple fields are selected from the same nested row, each selected field will appear as its own array field. For example

purchases.apply(Select.fieldNames( "transactions.bank", "transactions.purchaseAmount"));

Will result in the following schema

Field NameField Type
bankARRAY[STRING]
purchaseAmountARRAY[DOUBLE]

Wildcard selections are equivalent to separately selecting each field.

Selecting fields nested inside of maps have the same semantics as arrays. If you select multiple fields from a map , then each selected field will be expanded to its own map at the top level. This means that the set of map keys will be copied, once for each selected field.

Sometimes different nested rows will have fields with the same name. Selecting multiple of these fields would result in a name conflict, as all selected fields are put in the same row schema. When this situation arises, the Select.withFieldNameAs builder method can be used to provide an alternate name for the selected field.

Another use of the Select transform is to flatten a nested schema into a single flat schema. For example

purchases.apply(Select.flattenedSchema());

Will result in the following schema

Field NameField Type
userIdSTRING
itemIdSTRING
shippingAddress_streetAddressSTRING
shippingAddress_citynullable STRING
shippingAddress_stateSTRING
shippingAddress_countrySTRING
shippingAddress_postCodeSTRING
costCentsINT64
transactions_bankARRAY[STRING]
transactions_purchaseAmountARRAY[DOUBLE]

Grouping aggregations

The Group transform allows simply grouping data by any number of fields in the input schema, applying aggregations to those groupings, and storing the result of those aggregations in a new schema field. The output of the Group transform has a schema with one field corresponding to each aggregation performed.

The simplest usage of Group specifies no aggregations, in which case all inputs matching the provided set of fields are grouped together into an ITERABLE field. For example

purchases.apply(Group.byFieldNames("userId", "shippingAddress.streetAddress"));

The output schema of this is:

Field NameField Type
keyROW{userId:STRING, streetAddress:STRING}
valuesITERABLE[ROW[Purchase]]

The key field contains the grouping key and the values field contains a list of all the values that matched that key.

The names of the key and values fields in the output schema can be controlled using this withKeyField and withValueField builders, as follows:

purchases.apply(Group.byFieldNames("userId", "shippingAddress.streetAddress")
    .withKeyField("userAndStreet")
    .withValueField("matchingPurchases"));

It is quite common to apply one or more aggregations to the grouped result. Each aggregation can specify one or more fields to aggregate, an aggregation function, and the name of the resulting field in the output schema. For example, the following application computes three aggregations grouped by userId, with all aggregations represented in a single output schema:

purchases.apply(Group.byFieldNames("userId")
    .aggregateField("itemId", Count.combineFn(), "numPurchases")
    .aggregateField("costCents", Sum.ofLongs(), "totalSpendCents")
    .aggregateField("costCents", Top.<Long>largestLongsFn(10), "topPurchases"));

The result of this aggregation will have the following schema:

Field NameField Type
keyROW{userId:STRING}
valueROW{numPurchases: INT64, totalSpendCents: INT64, topPurchases: ARRAY[INT64]}

Often Selected.flattenedSchema will be use to flatten the result into a non-nested, flat schema.

Joins

Beam supports equijoins on schema PCollections - namely joins where the join condition depends on the equality of a subset of fields. For example, the following examples uses the Purchases schema to join transactions with the reviews that are likely associated with that transaction (both the user and product match that in the transaction). This is a “natural join” - one in which the same field names are used on both the left-hand and right-hand sides of the join - and is specified with the using keyword:

PCollection<Transaction> transactions = readTransactions();
PCollection<Review> reviews = readReviews();
PCollection<Row> joined = transactions.apply(
    Join.innerJoin(reviews).using("userId", "productId"));

The resulting schema is the following:

Field NameField Type
lhsROW{Transaction}
rhsROW{Review}

Each resulting row contains one Review and one Review that matched the join condition.

If the fields to match in the two schemas have different names, then the on function can be used. For example, if the Review schema named those fields differently than the Transaction schema, then we could write the following:

PCollection<Row> joined = transactions.apply(
    Join.innerJoin(reviews).on(
      FieldsEqual
         .left("userId", "productId")
         .right("reviewUserId", "reviewProductId")));

In addition to inner joins, the Join transform supports full outer joins, left outer joins, and right outer joins.

Complex joins

While most joins tend to be binary joins - joining two inputs together - sometimes you have more than two input streams that all need to be joined on a common key. The CoGroup transform allows joining multiple PCollections together based on equality of schema fields. Each PCollection can be marked as required or optional in the final join record, providing a generalization of outer joins to joins with greater than two input PCollections. The output can optionally be expanded - providing individual joined records, as in the Join transform. The output can also be processed in unexpanded format - providing the join key along with Iterables of all records from each input that matched that key.

Filtering events

The Filter transform can be configured with a set of predicates, each one based one specified fields. Only records for which all predicates return true will pass the filter. For example the following

purchases.apply(Filter
    .whereFieldName("costCents", c -> c > 100 * 20)
    .whereFieldName("shippingAddress.country", c -> c.equals("de"));

Will produce all purchases made from Germany with a purchase price of greater than twenty cents.

Adding fields to a schema

The AddFields transform can be used to extend a schema with new fields. Input rows will be extended to the new schema by inserting null values for the new fields, though alternate default values can be specified; if the default null value is used then the new field type will be marked as nullable. Nested subfields can be added using the field selection syntax, including nested fields inside arrays or map values.

For example, the following application

purchases.apply(AddFields.<PurchasePojo>create()
    .field("timeOfDaySeconds", FieldType.INT32)
    .field("shippingAddress.deliveryNotes", FieldType.STRING)
    .field("transactions.isFlagged", FieldType.BOOLEAN, false));

Results in a PCollection with an expanded schema. All of the rows and fields of the input, but also with the specified fields added to the schema. All resulting rows will have null values filled in for the timeOfDaySeconds and the shippingAddress.deliveryNotes fields, and a false value filled in for the transactions.isFlagged field.

Removing fields from a schema

DropFields allows specific fields to be dropped from a schema. Input rows will have their schemas truncated, and any values for dropped fields will be removed from the output. Nested fields can also be dropped using the field selection syntax.

For example, the following snippet

purchases.apply(DropFields.fields("userId", "shippingAddress.streetAddress"));

Results in a copy of the input with those two fields and their corresponding values removed.

Renaming schema fields

RenameFields allows specific fields in a schema to be renamed. The field values in input rows are left unchanged, only the schema is modified. This transform is often used to prepare records for output to a schema-aware sink, such as an RDBMS, to make sure that the PCollection schema field names match that of the output. It can also be used to rename fields generated by other transforms to make them more usable (similar to SELECT AS in SQL). Nested fields can also be renamed using the field-selection syntax.

For example, the following snippet

purchases.apply(RenameFields.<PurchasePojo>create()
  .rename("userId", "userIdentifier")
  .rename("shippingAddress.streetAddress", "shippingAddress.street"));

Results in the same set of unmodified input elements, however the schema on the PCollection has been changed to rename userId to userIdentifier and shippingAddress.streetAddress to shippingAddress.street.

Converting between types

As mentioned, Beam can automatically convert between different Java types, as long as those types have equivalent schemas. One way to do this is by using the Convert transform, as follows.

PCollection<PurchaseBean> purchaseBeans = readPurchasesAsBeans();
PCollection<PurchasePojo> pojoPurchases =
    purchaseBeans.apply(Convert.to(PurchasePojo.class));

Beam will validate that the inferred schema for PurchasePojo matches that of the input PCollection, and will then cast to a PCollection<PurchasePojo>.

Since the Row class can support any schema, any PCollection with schema can be cast to a PCollection of rows, as follows.

PCollection<Row> purchaseRows = purchaseBeans.apply(Convert.toRows());

If the source type is a single-field schema, Convert will also convert to the type of the field if asked, effectively unboxing the row. For example, give a schema with a single INT64 field, the following will convert it to a PCollection<Long>

PCollection<Long> longs = rows.apply(Convert.to(TypeDescriptors.longs()));

In all cases, type checking is done at pipeline graph construction, and if the types do not match the schema then the pipeline will fail to launch.

6.6.3. Schemas in ParDo

A PCollection with a schema can apply a ParDo, just like any other PCollection. However the Beam runner is aware of schemas when applying a ParDo, which enables additional functionality.

Input conversion

Since Beam knows the schema of the source PCollection, it can automatically convert the elements to any Java type for which a matching schema is known. For example, using the above-mentioned Transaction schema, say we have the following PCollection:

PCollection<PurchasePojo> purchases = readPurchases();

If there were no schema, then the applied DoFn would have to accept an element of type TransactionPojo. However since there is a schema, you could apply the following DoFn:

purchases.appy(ParDo.of(new DoFn<PurchasePojo, PurchasePojo>() {
  @ProcessElement public void process(@Element PurchaseBean purchase) {
      ...
  }
}));

Even though the @Element parameter does not match the Java type of the PCollection, since it has a matching schema Beam will automatically convert elements. If the schema does not match, Beam will detect this at graph-construction time and will fail the job with a type error.

Since every schema can be represented by a Row type, Row can also be used here:

purchases.appy(ParDo.of(new DoFn<PurchasePojo, PurchasePojo>() {
  @ProcessElement public void process(@Element Row purchase) {
      ...
  }
}));
Input selection

Since the input has a schema, you can also automatically select specific fields to process in the DoFn.

Given the above purchases PCollection, say you want to process just the userId and the itemId fields. You can do these using the above-described selection expressions, as follows:

purchases.appy(ParDo.of(new DoFn<PurchasePojo, PurchasePojo>() {
  @ProcessElement public void process(
     @FieldAccess("userId") String userId, @FieldAccess("itemId") long itemId) {
      ...
  }
}));

You can also select nested fields, as follows.

purchases.appy(ParDo.of(new DoFn<PurchasePojo, PurchasePojo>() {
  @ProcessElement public void process(
    @FieldAccess("shippingAddress.street") String street) {
      ...
  }
}));

For more information, see the section on field-selection expressions. When selecting subschemas, Beam will automatically convert to any matching schema type, just like when reading the entire row.

7. Data encoding and type safety

When Beam runners execute your pipeline, they often need to materialize the intermediate data in your PCollections, which requires converting elements to and from byte strings. The Beam SDKs use objects called Coders to describe how the elements of a given PCollection may be encoded and decoded.

Note that coders are unrelated to parsing or formatting data when interacting with external data sources or sinks. Such parsing or formatting should typically be done explicitly, using transforms such as ParDo or MapElements.

In the Beam SDK for Java, the type Coder provides the methods required for encoding and decoding data. The SDK for Java provides a number of Coder subclasses that work with a variety of standard Java types, such as Integer, Long, Double, StringUtf8 and more. You can find all of the available Coder subclasses in the Coder package.

In the Beam SDK for Python, the type Coder provides the methods required for encoding and decoding data. The SDK for Python provides a number of Coder subclasses that work with a variety of standard Python types, such as primitive types, Tuple, Iterable, StringUtf8 and more. You can find all of the available Coder subclasses in the apache_beam.coders package.

Note that coders do not necessarily have a 1:1 relationship with types. For example, the Integer type can have multiple valid coders, and input and output data can use different Integer coders. A transform might have Integer-typed input data that uses BigEndianIntegerCoder, and Integer-typed output data that uses VarIntCoder.

7.1. Specifying coders

The Beam SDKs require a coder for every PCollection in your pipeline. In most cases, the Beam SDK is able to automatically infer a Coder for a PCollection based on its element type or the transform that produces it, however, in some cases the pipeline author will need to specify a Coder explicitly, or develop a Coder for their custom type.

You can explicitly set the coder for an existing PCollection by using the method PCollection.setCoder. Note that you cannot call setCoder on a PCollection that has been finalized (e.g. by calling .apply on it).

You can get the coder for an existing PCollection by using the method getCoder. This method will fail with an IllegalStateException if a coder has not been set and cannot be inferred for the given PCollection.

Beam SDKs use a variety of mechanisms when attempting to automatically infer the Coder for a PCollection.

Each pipeline object has a CoderRegistry. The CoderRegistry represents a mapping of Java types to the default coders that the pipeline should use for PCollections of each type.

The Beam SDK for Python has a CoderRegistry that represents a mapping of Python types to the default coder that should be used for PCollections of each type.

By default, the Beam SDK for Java automatically infers the Coder for the elements of a PCollection produced by a PTransform using the type parameter from the transform’s function object, such as DoFn. In the case of ParDo, for example, a DoFn<Integer, String> function object accepts an input element of type Integer and produces an output element of type String. In such a case, the SDK for Java will automatically infer the default Coder for the output PCollection<String> (in the default pipeline CoderRegistry, this is StringUtf8Coder).

By default, the Beam SDK for Python automatically infers the Coder for the elements of an output PCollection using the typehints from the transform’s function object, such as DoFn. In the case of ParDo, for example a DoFn with the typehints @beam.typehints.with_input_types(int) and @beam.typehints.with_output_types(str) accepts an input element of type int and produces an output element of type str. In such a case, the Beam SDK for Python will automatically infer the default Coder for the output PCollection (in the default pipeline CoderRegistry, this is BytesCoder).

NOTE: If you create your PCollection from in-memory data by using the Create transform, you cannot rely on coder inference and default coders. Create does not have access to any typing information for its arguments, and may not be able to infer a coder if the argument list contains a value whose exact run-time class doesn’t have a default coder registered.

When using Create, the simplest way to ensure that you have the correct coder is by invoking withCoder when you apply the Create transform.

7.2. Default coders and the CoderRegistry

Each Pipeline object has a CoderRegistry object, which maps language types to the default coder the pipeline should use for those types. You can use the CoderRegistry yourself to look up the default coder for a given type, or to register a new default coder for a given type.

CoderRegistry contains a default mapping of coders to standard JavaPython types for any pipeline you create using the Beam SDK for JavaPython. The following table shows the standard mapping:

Java TypeDefault Coder
DoubleDoubleCoder
InstantInstantCoder
IntegerVarIntCoder
IterableIterableCoder
KVKvCoder
ListListCoder
MapMapCoder
LongVarLongCoder
StringStringUtf8Coder
TableRowTableRowJsonCoder
VoidVoidCoder
byte[ ]ByteArrayCoder
TimestampedValueTimestampedValueCoder

Python TypeDefault Coder
intVarIntCoder
floatFloatCoder
strBytesCoder
bytesStrUtf8Coder
TupleTupleCoder

7.2.1. Looking up a default coder

You can use the method CoderRegistry.getCoder to determine the default Coder for a Java type. You can access the CoderRegistry for a given pipeline by using the method Pipeline.getCoderRegistry. This allows you to determine (or set) the default Coder for a Java type on a per-pipeline basis: i.e. “for this pipeline, verify that Integer values are encoded using BigEndianIntegerCoder.”

You can use the method CoderRegistry.get_coder to determine the default Coder for a Python type. You can use coders.registry to access the CoderRegistry. This allows you to determine (or set) the default Coder for a Python type.

7.2.2. Setting the default coder for a type

To set the default Coder for a JavaPython type for a particular pipeline, you obtain and modify the pipeline’s CoderRegistry. You use the method Pipeline.getCoderRegistry coders.registry to get the CoderRegistry object, and then use the method CoderRegistry.registerCoder CoderRegistry.register_coder to register a new Coder for the target type.

The following example code demonstrates how to set a default Coder, in this case BigEndianIntegerCoder, for Integerint values for a pipeline.

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

CoderRegistry cr = p.getCoderRegistry();
cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);
apache_beam.coders.registry.register_coder(int, BigEndianIntegerCoder)

7.2.3. Annotating a custom data type with a default coder

If your pipeline program defines a custom data type, you can use the @DefaultCoder annotation to specify the coder to use with that type. For example, let’s say you have a custom data type for which you want to use SerializableCoder. You can use the @DefaultCoder annotation as follows:

@DefaultCoder(AvroCoder.class)
public class MyCustomDataType {
  ...
}

If you’ve created a custom coder to match your data type, and you want to use the @DefaultCoder annotation, your coder class must implement a static Coder.of(Class<T>) factory method.

public class MyCustomCoder implements Coder {
  public static Coder<T> of(Class<T> clazz) {...}
  ...
}

@DefaultCoder(MyCustomCoder.class)
public class MyCustomDataType {
  ...
}

The Beam SDK for Python does not support annotating data types with a default coder. If you would like to set a default coder, use the method described in the previous section, Setting the default coder for a type.

8. Windowing

Windowing subdivides a PCollection according to the timestamps of its individual elements. Transforms that aggregate multiple elements, such as GroupByKey and Combine, work implicitly on a per-window basis — they process each PCollection as a succession of multiple, finite windows, though the entire collection itself may be of unbounded size.

A related concept, called triggers, determines when to emit the results of aggregation as unbounded data arrives. You can use triggers to refine the windowing strategy for your PCollection. Triggers allow you to deal with late-arriving data or to provide early results. See the triggers section for more information.

8.1. Windowing basics

Some Beam transforms, such as GroupByKey and Combine, group multiple elements by a common key. Ordinarily, that grouping operation groups all of the elements that have the same key within the entire data set. With an unbounded data set, it is impossible to collect all of the elements, since new elements are constantly being added and may be infinitely many (e.g. streaming data). If you are working with unbounded PCollections, windowing is especially useful.

In the Beam model, any PCollection (including unbounded PCollections) can be subdivided into logical windows. Each element in a PCollection is assigned to one or more windows according to the PCollection's windowing function, and each individual window contains a finite number of elements. Grouping transforms then consider each PCollection's elements on a per-window basis. GroupByKey, for example, implicitly groups the elements of a PCollection by key and window.

Caution: Beam’s default windowing behavior is to assign all elements of a PCollection to a single, global window and discard late data, even for unbounded PCollections. Before you use a grouping transform such as GroupByKey on an unbounded PCollection, you must do at least one of the following:

If you don’t set a non-global windowing function or a non-default trigger for your unbounded PCollection and subsequently use a grouping transform such as GroupByKey or Combine, your pipeline will generate an error upon construction and your job will fail.

8.1.1. Windowing constraints

After you set the windowing function for a PCollection, the elements’ windows are used the next time you apply a grouping transform to that PCollection. Window grouping occurs on an as-needed basis. If you set a windowing function using the Window transform, each element is assigned to a window, but the windows are not considered until GroupByKey or Combine aggregates across a window and key. This can have different effects on your pipeline. Consider the example pipeline in the figure below:

Diagram of pipeline applying windowing

Figure 3: Pipeline applying windowing

In the above pipeline, we create an unbounded PCollection by reading a set of key/value pairs using KafkaIO, and then apply a windowing function to that collection using the Window transform. We then apply a ParDo to the the collection, and then later group the result of that ParDo using GroupByKey. The windowing function has no effect on the ParDo transform, because the windows are not actually used until they’re needed for the GroupByKey. Subsequent transforms, however, are applied to the result of the GroupByKey – data is grouped by both key and window.

8.1.2. Windowing with bounded PCollections

You can use windowing with fixed-size data sets in bounded PCollections. However, note that windowing considers only the implicit timestamps attached to each element of a PCollection, and data sources that create fixed data sets (such as TextIO) assign the same timestamp to every element. This means that all the elements are by default part of a single, global window.

To use windowing with fixed data sets, you can assign your own timestamps to each element. To assign timestamps to elements, use a ParDo transform with a DoFn that outputs each element with a new timestamp (for example, the WithTimestamps transform in the Beam SDK for Java).

To illustrate how windowing with a bounded PCollection can affect how your pipeline processes data, consider the following pipeline:

Diagram of GroupByKey and ParDo without windowing, on a bounded collection

Figure 4: GroupByKey and ParDo without windowing, on a bounded collection.

In the above pipeline, we create a bounded PCollection by reading lines from a file using TextIO. We then group the collection using GroupByKey, and apply a ParDo transform to the grouped PCollection. In this example, the GroupByKey creates a collection of unique keys, and then ParDo gets applied exactly once per key.

Note that even if you don’t set a windowing function, there is still a window – all elements in your PCollection are assigned to a single global window.

Now, consider the same pipeline, but using a windowing function:

Diagram of GroupByKey and ParDo with windowing, on a bounded collection

Figure 5: GroupByKey and ParDo with windowing, on a bounded collection.

As before, the pipeline creates a bounded PCollection by reading lines from a file. We then set a windowing function for that PCollection. The GroupByKey transform groups the elements of the PCollection by both key and window, based on the windowing function. The subsequent ParDo transform gets applied multiple times per key, once for each window.

8.2. Provided windowing functions

You can define different kinds of windows to divide the elements of your PCollection. Beam provides several windowing functions, including:

You can also define your own WindowFn if you have a more complex need.

Note that each element can logically belong to more than one window, depending on the windowing function you use. Sliding time windowing, for example, creates overlapping windows wherein a single element can be assigned to multiple windows.

8.2.1. Fixed time windows

The simplest form of windowing is using fixed time windows: given a timestamped PCollection which might be continuously updating, each window might capture (for example) all elements with timestamps that fall into a 30 second interval.

A fixed time window represents a consistent duration, non overlapping time interval in the data stream. Consider windows with a 30 second duration: all of the elements in your unbounded PCollection with timestamp values from 0:00:00 up to (but not including) 0:00:30 belong to the first window, elements with timestamp values from 0:00:30 up to (but not including) 0:01:00 belong to the second window, and so on.

Diagram of fixed time windows, 30s in duration

Figure 6: Fixed time windows, 30s in duration.

8.2.2. Sliding time windows

A sliding time window also represents time intervals in the data stream; however, sliding time windows can overlap. For example, each window might capture 60 seconds worth of data, but a new window starts every 30 seconds. The frequency with which sliding windows begin is called the period. Therefore, our example would have a window duration of 60 seconds and a period of 30 seconds.

Because multiple windows overlap, most elements in a data set will belong to more than one window. This kind of windowing is useful for taking running averages of data; using sliding time windows, you can compute a running average of the past 60 seconds’ worth of data, updated every 30 seconds, in our example.

Diagram of sliding time windows, with 1 minute window duration and 30s window period

Figure 7: Sliding time windows, with 1 minute window duration and 30s window period.

8.2.3. Session windows

A session window function defines windows that contain elements that are within a certain gap duration of another element. Session windowing applies on a per-key basis and is useful for data that is irregularly distributed with respect to time. For example, a data stream representing user mouse activity may have long periods of idle time interspersed with high concentrations of clicks. If data arrives after the minimum specified gap duration time, this initiates the start of a new window.

Diagram of session windows with a minimum gap duration

Figure 8: Session windows, with a minimum gap duration. Note how each data key has different windows, according to its data distribution.

8.2.4. The single global window

By default, all data in a PCollection is assigned to the single global window, and late data is discarded. If your data set is of a fixed size, you can use the global window default for your PCollection.

You can use the single global window if you are working with an unbounded data set (e.g. from a streaming data source) but use caution when applying aggregating transforms such as GroupByKey and Combine. The single global window with a default trigger generally requires the entire data set to be available before processing, which is not possible with continuously updating data. To perform aggregations on an unbounded PCollection that uses global windowing, you should specify a non-default trigger for that PCollection.

8.3. Setting your PCollection’s windowing function

You can set the windowing function for a PCollection by applying the Window transform. When you apply the Window transform, you must provide a WindowFn. The WindowFn determines the windowing function your PCollection will use for subsequent grouping transforms, such as a fixed or sliding time window.

When you set a windowing function, you may also want to set a trigger for your PCollection. The trigger determines when each individual window is aggregated and emitted, and helps refine how the windowing function performs with respect to late data and computing early results. See the triggers section for more information.

8.3.1. Fixed-time windows

The following example code shows how to apply Window to divide a PCollection into fixed windows, each 60 seconds in length:

    PCollection<String> items = ...;
    PCollection<String> fixedWindowedItems = items.apply(
        Window.<String>into(FixedWindows.of(Duration.standardSeconds(60))));
from apache_beam import window
fixed_windowed_items = (
    items | 'window' >> beam.WindowInto(window.FixedWindows(60)))

8.3.2. Sliding time windows

The following example code shows how to apply Window to divide a PCollection into sliding time windows. Each window is 30 seconds in length, and a new window begins every five seconds:

    PCollection<String> items = ...;
    PCollection<String> slidingWindowedItems = items.apply(
        Window.<String>into(SlidingWindows.of(Duration.standardSeconds(30)).every(Duration.standardSeconds(5))));
from apache_beam import window
sliding_windowed_items = (
    items | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5)))

8.3.3. Session windows

The following example code shows how to apply Window to divide a PCollection into session windows, where each session must be separated by a time gap of at least 10 minutes (600 seconds):

    PCollection<String> items = ...;
    PCollection<String> sessionWindowedItems = items.apply(
        Window.<String>into(Sessions.withGapDuration(Duration.standardSeconds(600))));
from apache_beam import window
session_windowed_items = (
    items | 'window' >> beam.WindowInto(window.Sessions(10 * 60)))

Note that the sessions are per-key — each key in the collection will have its own session groupings depending on the data distribution.

8.3.4. Single global window

If your PCollection is bounded (the size is fixed), you can assign all the elements to a single global window. The following example code shows how to set a single global window for a PCollection:

    PCollection<String> items = ...;
    PCollection<String> batchItems = items.apply(
        Window.<String>into(new GlobalWindows()));
from apache_beam import window
session_windowed_items = (
    items | 'window' >> beam.WindowInto(window.GlobalWindows()))

8.4. Watermarks and late data

In any data processing system, there is a certain amount of lag between the time a data event occurs (the “event time”, determined by the timestamp on the data element itself) and the time the actual data element gets processed at any stage in your pipeline (the “processing time”, determined by the clock on the system processing the element). In addition, there are no guarantees that data events will appear in your pipeline in the same order that they were generated.

For example, let’s say we have a PCollection that’s using fixed-time windowing, with windows that are five minutes long. For each window, Beam must collect all the data with an event time timestamp in the given window range (between 0:00 and 4:59 in the first window, for instance). Data with timestamps outside that range (data from 5:00 or later) belong to a different window.

However, data isn’t always guaranteed to arrive in a pipeline in time order, or to always arrive at predictable intervals. Beam tracks a watermark, which is the system’s notion of when all data in a certain window can be expected to have arrived in the pipeline. Once the watermark progresses past the end of a window, any further element that arrives with a timestamp in that window is considered late data.

From our example, suppose we have a simple watermark that assumes approximately 30s of lag time between the data timestamps (the event time) and the time the data appears in the pipeline (the processing time), then Beam would close the first window at 5:30. If a data record arrives at 5:34, but with a timestamp that would put it in the 0:00-4:59 window (say, 3:38), then that record is late data.

Note: For simplicity, we’ve assumed that we’re using a very straightforward watermark that estimates the lag time. In practice, your PCollection's data source determines the watermark, and watermarks can be more precise or complex.

Beam’s default windowing configuration tries to determine when all data has arrived (based on the type of data source) and then advances the watermark past the end of the window. This default configuration does not allow late data. Triggers allow you to modify and refine the windowing strategy for a PCollection. You can use triggers to decide when each individual window aggregates and reports its results, including how the window emits late elements.

8.4.1. Managing late data

You can allow late data by invoking the .withAllowedLateness operation when you set your PCollection's windowing strategy. The following code example demonstrates a windowing strategy that will allow late data up to two days after the end of a window.

    PCollection<String> items = ...;
    PCollection<String> fixedWindowedItems = items.apply(
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
              .withAllowedLateness(Duration.standardDays(2)));
   pc = [Initial PCollection]
   pc | beam.WindowInto(
              FixedWindows(60),
              trigger=trigger_fn,
              accumulation_mode=accumulation_mode,
              timestamp_combiner=timestamp_combiner,
              allowed_lateness=Duration(seconds=2*24*60*60)) # 2 days

When you set .withAllowedLateness on a PCollection, that allowed lateness propagates forward to any subsequent PCollection derived from the first PCollection you applied allowed lateness to. If you want to change the allowed lateness later in your pipeline, you must do so explicitly by applying Window.configure().withAllowedLateness().

8.5. Adding timestamps to a PCollection’s elements

An unbounded source provides a timestamp for each element. Depending on your unbounded source, you may need to configure how the timestamp is extracted from the raw data stream.

However, bounded sources (such as a file from TextIO) do not provide timestamps. If you need timestamps, you must add them to your PCollection’s elements.

You can assign new timestamps to the elements of a PCollection by applying a ParDo transform that outputs new elements with timestamps that you set.

An example might be if your pipeline reads log records from an input file, and each log record includes a timestamp field; since your pipeline reads the records in from a file, the file source doesn’t assign timestamps automatically. You can parse the timestamp field from each record and use a ParDo transform with a DoFn to attach the timestamps to each element in your PCollection.

      PCollection<LogEntry> unstampedLogs = ...;
      PCollection<LogEntry> stampedLogs =
          unstampedLogs.apply(ParDo.of(new DoFn<LogEntry, LogEntry>() {
            public void processElement(@Element LogEntry element, OutputReceiver<LogEntry> out) {
              // Extract the timestamp from log entry we're currently processing.
              Instant logTimeStamp = extractTimeStampFromLogEntry(element);
              // Use OutputReceiver.outputWithTimestamp (rather than
              // OutputReceiver.output) to emit the entry with timestamp attached.
              out.outputWithTimestamp(element, logTimeStamp);
            }
          }));
class AddTimestampDoFn(beam.DoFn):
  def process(self, element):
    # Extract the numeric Unix seconds-since-epoch timestamp to be
    # associated with the current log entry.
    unix_timestamp = extract_timestamp_from_log_entry(element)
    # Wrap and emit the current entry and new timestamp in a
    # TimestampedValue.
    yield beam.window.TimestampedValue(element, unix_timestamp)

timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())

9. Triggers

When collecting and grouping data into windows, Beam uses triggers to determine when to emit the aggregated results of each window (referred to as a pane). If you use Beam’s default windowing configuration and default trigger, Beam outputs the aggregated result when it estimates all data has arrived, and discards all subsequent data for that window.

You can set triggers for your PCollections to change this default behavior. Beam provides a number of pre-built triggers that you can set:

At a high level, triggers provide two additional capabilities compared to simply outputting at the end of a window:

These capabilities allow you to control the flow of your data and balance between different factors depending on your use case:

For example, a system that requires time-sensitive updates might use a strict time-based trigger that emits a window every N seconds, valuing promptness over data completeness. A system that values data completeness more than the exact timing of results might choose to use Beam’s default trigger, which fires at the end of the window.

You can also set a trigger for an unbounded PCollection that uses a single global window for its windowing function. This can be useful when you want your pipeline to provide periodic updates on an unbounded data set — for example, a running average of all data provided to the present time, updated every N seconds or every N elements.

9.1. Event time triggers

The AfterWatermark trigger operates on event time. The AfterWatermark trigger emits the contents of a window after the watermark passes the end of the window, based on the timestamps attached to the data elements. The watermark is a global progress metric, and is Beam’s notion of input completeness within your pipeline at any given point. AfterWatermark.pastEndOfWindow() AfterWatermark only fires when the watermark passes the end of the window.

In addition, you can configure triggers that fire if your pipeline receives data before or after the end of the window.

The following example shows a billing scenario, and uses both early and late firings:

  // Create a bill at the end of the month.
  AfterWatermark.pastEndOfWindow()
      // During the month, get near real-time estimates.
      .withEarlyFirings(
          AfterProcessingTime
              .pastFirstElementInPane()
              .plusDuration(Duration.standardMinutes(1))
      // Fire on any late data so the bill can be corrected.
      .withLateFirings(AfterPane.elementCountAtLeast(1))
AfterWatermark(
    early=AfterProcessingTime(delay=1 * 60), late=AfterCount(1))

9.1.1. Default trigger

The default trigger for a PCollection is based on event time, and emits the results of the window when the Beam’s watermark passes the end of the window, and then fires each time late data arrives.

However, if you are using both the default windowing configuration and the default trigger, the default trigger emits exactly once, and late data is discarded. This is because the default windowing configuration has an allowed lateness value of 0. See the Handling Late Data section for information about modifying this behavior.

9.2. Processing time triggers

The AfterProcessingTime trigger operates on processing time. For example, the AfterProcessingTime.pastFirstElementInPane() AfterProcessingTime trigger emits a window after a certain amount of processing time has passed since data was received. The processing time is determined by the system clock, rather than the data element’s timestamp.

The AfterProcessingTime trigger is useful for triggering early results from a window, particularly a window with a large time frame such as a single global window.

9.3. Data-driven triggers

Beam provides one data-driven trigger, AfterPane.elementCountAtLeast() AfterCount. This trigger works on an element count; it fires after the current pane has collected at least N elements. This allows a window to emit early results (before all the data has accumulated), which can be particularly useful if you are using a single global window.

It is important to note that if, for example, you specify .elementCountAtLeast(50) AfterCount(50) and only 32 elements arrive, those 32 elements sit around forever. If the 32 elements are important to you, consider using composite triggers to combine multiple conditions. This allows you to specify multiple firing conditions such as “fire either when I receive 50 elements, or every 1 second”.

9.4. Setting a trigger

When you set a windowing function for a PCollection by using the WindowWindowInto transform, you can also specify a trigger.

You set the trigger(s) for a PCollection by invoking the method .triggering() on the result of your Window.into() transform. This code sample sets a time-based trigger for a PCollection, which emits results one minute after the first element in that window has been processed. The last line in the code sample, .discardingFiredPanes(), sets the window’s accumulation mode.

You set the trigger(s) for a PCollection by setting the trigger parameter when you use the WindowInto transform. This code sample sets a time-based trigger for a PCollection, which emits results one minute after the first element in that window has been processed. The accumulation_mode parameter sets the window’s accumulation mode.

  PCollection<String> pc = ...;
  pc.apply(Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES))
                               .triggering(AfterProcessingTime.pastFirstElementInPane()
                                                              .plusDelayOf(Duration.standardMinutes(1)))
                               .discardingFiredPanes());
  pcollection | WindowInto(
    FixedWindows(1 * 60),
    trigger=AfterProcessingTime(1 * 60),
    accumulation_mode=AccumulationMode.DISCARDING)

9.4.1. Window accumulation modes

When you specify a trigger, you must also set the the window’s accumulation mode. When a trigger fires, it emits the current contents of the window as a pane. Since a trigger can fire multiple times, the accumulation mode determines whether the system accumulates the window panes as the trigger fires, or discards them.

To set a window to accumulate the panes that are produced when the trigger fires, invoke.accumulatingFiredPanes() when you set the trigger. To set a window to discard fired panes, invoke .discardingFiredPanes().

To set a window to accumulate the panes that are produced when the trigger fires, set the accumulation_mode parameter to ACCUMULATING when you set the trigger. To set a window to discard fired panes, set accumulation_mode to DISCARDING.

Let’s look an example that uses a PCollection with fixed-time windowing and a data-based trigger. This is something you might do if, for example, each window represented a ten-minute running average, but you wanted to display the current value of the average in a UI more frequently than every ten minutes. We’ll assume the following conditions:

The following diagram shows data events for key X as they arrive in the PCollection and are assigned to windows. To keep the diagram a bit simpler, we’ll assume that the events all arrive in the pipeline in order.

Diagram of data events for acculumating mode example

9.4.1.1. Accumulating mode

If our trigger is set to accumulating mode, the trigger emits the following values each time it fires. Keep in mind that the trigger fires every time three elements arrive:

  First trigger firing:  [5, 8, 3]
  Second trigger firing: [5, 8, 3, 15, 19, 23]
  Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
9.4.1.2. Discarding mode

If our trigger is set to discarding mode, the trigger emits the following values on each firing:

  First trigger firing:  [5, 8, 3]
  Second trigger firing:           [15, 19, 23]
  Third trigger firing:                         [9, 13, 10]

9.4.2. Handling late data

If you want your pipeline to process data that arrives after the watermark passes the end of the window, you can apply an allowed lateness when you set your windowing configuration. This gives your trigger the opportunity to react to the late data. If allowed lateness is set, the default trigger will emit new results immediately whenever late data arrives.

You set the allowed lateness by using .withAllowedLateness() when you set your windowing function:

  PCollection<String> pc = ...;
  pc.apply(Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES))
                              .triggering(AfterProcessingTime.pastFirstElementInPane()
                                                             .plusDelayOf(Duration.standardMinutes(1)))
                              .withAllowedLateness(Duration.standardMinutes(30));
  pc = [Initial PCollection]
  pc | beam.WindowInto(
            FixedWindows(60),
            trigger=AfterProcessingTime(60),
            allowed_lateness=1800) # 30 minutes
     | ...

This allowed lateness propagates to all PCollections derived as a result of applying transforms to the original PCollection. If you want to change the allowed lateness later in your pipeline, you can apply Window.configure().withAllowedLateness() again, explicitly.

9.5. Composite triggers

You can combine multiple triggers to form composite triggers, and can specify a trigger to emit results repeatedly, at most once, or under other custom conditions.

9.5.1. Composite trigger types

Beam includes the following composite triggers:

9.5.2. Composition with AfterWatermark

Some of the most useful composite triggers fire a single time when Beam estimates that all the data has arrived (i.e. when the watermark passes the end of the window) combined with either, or both, of the following:

You can express this pattern using AfterWatermark. For example, the following example trigger code fires on the following conditions:

  .apply(Window
      .configure()
      .triggering(AfterWatermark
           .pastEndOfWindow()
           .withLateFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10))))
      .withAllowedLateness(Duration.standardDays(2)));
pcollection | WindowInto(
    FixedWindows(1 * 60),
    trigger=AfterWatermark(late=AfterProcessingTime(10 * 60)),
    allowed_lateness=10,
    accumulation_mode=AccumulationMode.DISCARDING)

9.5.3. Other composite triggers

You can also build other sorts of composite triggers. The following example code shows a simple composite trigger that fires whenever the pane has at least 100 elements, or after a minute.

  Repeatedly.forever(AfterFirst.of(
      AfterPane.elementCountAtLeast(100),
      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
pcollection | WindowInto(
    FixedWindows(1 * 60),
    trigger=Repeatedly(
        AfterAny(AfterCount(100), AfterProcessingTime(1 * 60))),
    accumulation_mode=AccumulationMode.DISCARDING)

10. Metrics

In the Beam model, metrics provide some insight into the current state of a user pipeline, potentially while the pipeline is running. There could be different reasons for that, for instance:

10.1. The main concepts of Beam metrics

Reported metrics are implicitly scoped to the transform within the pipeline that reported them. This allows reporting the same metric name in multiple places and identifying the value each transform reported, as well as aggregating the metric across the entire pipeline.

Note: It is runner-dependent whether metrics are accessible during pipeline execution or only after jobs have completed.

10.2. Types of metrics

There are three types of metrics that are supported for the moment: Counter, Distribution and Gauge.

Counter: A metric that reports a single long value and can be incremented or decremented.

Counter counter = Metrics.counter( "namespace", "counter1");

@ProcessElement
public void processElement(ProcessContext context) {
  // count the elements
  counter.inc();
  ...
}

Distribution: A metric that reports information about the distribution of reported values.

Distribution distribution = Metrics.distribution( "namespace", "distribution1");

@ProcessElement
public void processElement(ProcessContext context) {
  Integer element = context.element();
    // create a distribution (histogram) of the values
    distribution.update(element);
    ...
}

Gauge: A metric that reports the latest value out of reported values. Since metrics are collected from many workers the value may not be the absolute last, but one of the latest values.

Gauge gauge = Metrics.gauge( "namespace", "gauge1");

@ProcessElement
public void processElement(ProcessContext context) {
  Integer element = context.element();
  // create a gauge (latest value received) of the values
  gauge.set(element);
  ...
}

10.3. Querying metrics

PipelineResult has a method metrics() which returns a MetricResults object that allows accessing metrics. The main method available in MetricResults allows querying for all metrics matching a given filter.

public interface PipelineResult {
  MetricResults metrics();
}

public abstract class MetricResults {
  public abstract MetricQueryResults queryMetrics(@Nullable MetricsFilter filter);
}

public interface MetricQueryResults {
  Iterable<MetricResult<Long>> getCounters();
  Iterable<MetricResult<DistributionResult>> getDistributions();
  Iterable<MetricResult<GaugeResult>> getGauges();
}

public interface MetricResult<T> {
  MetricName getName();
  String getStep();
  T getCommitted();
  T getAttempted();
}

10.4. Using metrics in pipeline

Below, there is a simple example of how to use a Counter metric in a user pipeline.

// creating a pipeline with custom metrics DoFn
pipeline
    .apply(...)
    .apply(ParDo.of(new MyMetricsDoFn()));

pipelineResult = pipeline.run().waitUntilFinish(...);

// request the metric called "counter1" in namespace called "namespace"
MetricQueryResults metrics =
    pipelineResult
        .metrics()
        .queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named("namespace", "counter1"))
                .build());

// print the metric value - there should be only one line because there is only one metric
// called "counter1" in the namespace called "namespace"
for (MetricResult<Long> counter: metrics.getCounters()) {
  System.out.println(counter.getName() + ":" + counter.getAttempted());
}

public class MyMetricsDoFn extends DoFn<Integer, Integer> {
  private final Counter counter = Metrics.counter( "namespace", "counter1");

  @ProcessElement
  public void processElement(ProcessContext context) {
    // count the elements
    counter.inc();
    context.output(context.element());
  }
}

10.5. Export metrics

Beam metrics can be exported to external sinks. If a metrics sink is set up in the configuration, the runner will push metrics to it at a default 5s period. The configuration is held in the MetricsOptions class. It contains push period configuration and also sink specific options such as type and URL. As for now only the REST HTTP and the Graphite sinks are supported and only Flink and Spark runners support metrics export.

Also Beam metrics are exported to inner Spark and Flink dashboards to be consulted in their respective UI.

11. State and Timers

Beam’s windowing and triggering facilities provide a powerful abstraction for grouping and aggregating unbounded input data based on timestamps. However there are aggregation use cases for which developers may require a higher degree of control than provided by windows and triggers. Beam provides an API for manually managing per-key state, allowing for fine-grained control over aggregations.

Beam’s state API models state per key. To use the state API, you start out with a keyed PCollection, which in Java is modeled as a PCollection<KV<K, V>>. A ParDo processing this PCollection can now declare state variables. Inside the ParDo these state variables can be used to write or update state for the current key or to read previous state written for that key. State is always fully scoped only to the current processing key.

Windowing can still be used together with stateful processing. All state for a key is scoped to the current window. This means that the first time a key is seen for a given window any state reads will return empty, and that a runner can garbage collect state when a window is completed. It’s also often useful to use Beam’s windowed aggregations prior to the stateful operator. For example, using a combiner to preaggregate data, and then storing aggregated data inside of state. Merging windows are not currently supported when using state and timers.

Sometimes stateful processing is used to implement state-machine style processing inside a DoFn. When doing this, care must be taken to remember that the elements in input PCollection have no guaranteed order and to ensure that the program logic is resilient to this. Unit tests written using the DirectRunner will shuffle the order of element processing, and are recommended to test for correctness.

In Java DoFn declares states to be accessed by creating final StateSpec member variables representing each state. Each state must be named using the StateId annotation; this name is unique to a ParDo in the graph and has no relation to other nodes in the graph. A DoFn can declare multiple state variables.

In Python DoFn declares states to be accessed by creating StateSpec class member variables representing each state. Each StateSpec is initialized with a name, this name is unique to a ParDo in the graph and has no relation to other nodes in the graph. A DoFn can declare multiple state variables.

11.1. Types of state

Beam provides several types of state:

ValueState

A ValueState is a scalar state value. For each key in the input, a ValueState will store a typed value that can be read and modified inside the DoFn’s @ProcessElement or @OnTimer methods. If the type of the ValueState has a coder registered, then Beam will automatically infer the coder for the state value. Otherwise, a coder can be explicitly specified when creating the ValueState. For example, the following ParDo creates a single state variable that accumulates the number of elements seen.

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state") private final StateSpec<ValueState<Integer>> numElements = StateSpecs.value();

  @ProcessElement public void process(@StateId("state") ValueState<Integer> state) {
    // Read the number element seen so far for this user key.
    // state.read() returns null if it was never set. The below code allows us to have a default value of 0.
    int currentValue = MoreObjects.firstNonNull(state.read(), 0);
    // Update the state.
    state.write(currentValue + 1);
  }
}));

Beam also allows explicitly specifying a coder for ValueState values. For example:

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state") private final StateSpec<ValueState<MyType>> numElements = StateSpecs.value(new MyTypeCoder());
                 ...
}));

CombiningState

CombiningState allows you to create a state object that is updated using a Beam combiner. For example, the previous ValueState example could be rewritten to use CombiningState

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state") private final StateSpec<CombiningState<Integer, int[], Integer>> numElements =
      StateSpecs.combining(Sum.ofIntegers());

  @ProcessElement public void process(@StateId("state") ValueState<Integer> state) {
    state.add(1);
  }
}));
class CombiningStateDoFn(DoFn):
  SUM_TOTAL = CombiningValueStateSpec('total', sum)

  def process(self, element, state=SoFn.StateParam(SUM_TOTAL)):
    state.add(1)

_ = (p | 'Read per user' >> ReadPerUser()
       | 'Combine state pardo' >> beam.ParDo(CombiningStateDofn()))

BagState

A common use case for state is to accumulate multiple elements. BagState allows for accumulating an unordered set of elements. This allows for addition of elements to the collection without requiring the reading of the entire collection first, which is an efficiency gain. In addition, runners that support paged reads can allow individual bags larger than available memory.

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state") private final StateSpec<BagState<ValueT>> numElements = StateSpecs.bag();

  @ProcessElement public void process(
    @Element KV<String, ValueT> element,
    @StateId("state") BagState<ValueT> state) {
    // Add the current element to the bag for this key.
    state.add(element.getValue());
    if (shouldFetch()) {
      // Occasionally we fetch and process the values.
      Iterable<ValueT> values = state.read();
      processValues(values);
      state.clear();  // Clear the state for this key.
    }
  }
}));
class BagStateDoFn(DoFn):
  ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder())

  def process(self, element_pair, state=DoFn.StateParam(ALL_ELEMENTS)):
    state.add(element_pair[1])
    if should_fetch():
      all_elements = list(state.read())
      process_values(all_elements)
      state.clear()

_ = (p | 'Read per user' >> ReadPerUser()
       | 'Bag state pardo' >> beam.ParDo(BagStateDoFn()))

11.2. Deferred state reads

When a DoFn contains multiple state specifications, reading each one in order can be slow. Calling the read() function on a state can cause the runner to perform a blocking read. Performing multiple blocking reads in sequence adds latency to element processing. If you know that a state will always be read, you can annotate it as @AlwaysFetched, and then the runner can prefetch all of the states necessary. For example:

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   @StateId("state1") private final StateSpec<ValueState<Integer>> state1 = StateSpecs.value();
   @StateId("state2") private final StateSpec<ValueState<String>> state2 = StateSpecs.value();
   @StateId("state3") private final StateSpec<BagState<ValueT>> state3 = StateSpecs.bag();

  @ProcessElement public void process(
    @AlwaysFetched @StateId("state1") ValueState<Integer> state1,
    @AlwaysFetched @StateId("state2") ValueState<String> state2,
    @AlwaysFetched @StateId("state3") BagState<ValueT> state3) {
    state1.read();
    state2.read();
    state3.read();
  }
}));

If however there are code paths in which the states are not fetched, then annotating with @AlwaysFetched will add unnecessary fetching for those paths. In this case, the readLater method allows the runner to know that the state will be read in the future, allowing multiple state reads to be batched together.

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state1") private final StateSpec<ValueState<Integer>> state1 = StateSpecs.value();
  @StateId("state2") private final StateSpec<ValueState<String>> state2 = StateSpecs.value();
  @StateId("state3") private final StateSpec<BagState<ValueT>> state3 = StateSpecs.bag();

  @ProcessElement public void process(
    @StateId("state1") ValueState<Integer> state1,
    @StateId("state2") ValueState<String> state2,
    @StateId("state3") BagState<ValueT> state3) {
    if (/* should read state */) {
      state1.readLater();
      state2.readLater();
      state3.readLater();
    }

    // The runner can now batch all three states into a single read, reducing latency.
    processState1(state1.read());
    processState2(state2.read());
    processState3(state3.read());
  }
}));

11.3. Timers

Beam provides a per-key timer callback API. This allows for delayed processing of data stored using the state API. Timers can be set to callback at either an event-time or a processing-time timestamp. Every timer is identified with a TimerId. A given timer for a key can only be set for a single timestamp. Calling set on a timer overwrites the previous firing time for that key’s timer.

11.3.1. Event-time timers

Event-time timers fire when the input watermark for the DoFn passes the time at which the timer is set, meaning that the runner believes that there are no more elements to be processed with timestamps before the timer timestamp. This allows for event-time aggregations.

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state") private final StateSpec<ValueState<Integer>> state = StateSpecs.value();
  @TimerId("timer") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement public void process(
      @Element KV<String, ValueT> element,
      @Timestamp Instant elementTs,
      @StateId("state") ValueState<Integer> state,
      @TimerId("timer") Timer timer) {
     ...
     // Set an event-time timer to the element timestamp.
     timer.set(elementTs);
  }

   @OnTimer("timer") public void onTimer() {
      //Process timer.
   }
}));
class EventTimerDoFn(DoFn):
  ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder())
  TIMER = TimerSpec('timer', TimeDomain.WATERMARK)

  def process(self,
              element_pair,
              t = DoFn.TimestampParam,
              buffer = DoFn.StateParam(ALL_ELEMENTS),
              timer = DoFn.TimerParam(TIMER)):
    buffer.add(element_pair[1])
    # Set an event-time timer to the element timestamp.
    timer.set(t)

  @on_timer(TIMER)
  def expiry_callback(self, buffer = DoFn.StateParam(ALL_ELEMENTS)):
    state.clear()

_ = (p | 'Read per user' >> ReadPerUser()
       | 'EventTime timer pardo' >> beam.ParDo(EventTimerDoFn()))

11.3.2. Processing-time timers

Processing-time timers fire when the real wall-clock time passes. This is often used to create larger batches of data before processing. It can also be used to schedule events that should occur at a specific time. Just like with event-time timers, processing-time timers are per key - each key has a separate copy of the timer.

While processing-time timers can be set to an absolute timestamp, it is very common to set them to an offset relative to the current time. In Java, the Timer.offset and Timer.setRelative methods can be used to accomplish this.

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @TimerId("timer") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement public void process(@TimerId("timer") Timer timer) {
     ...
     // Set a timer to go off 30 seconds in the future.
     timer.offset(Duration.standardSeconds(30)).setRelative();
  }

   @OnTimer("timer") public void onTimer() {
      //Process timer.
   }
}));
class ProcessingTimerDoFn(DoFn):
  ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder())
  TIMER = TimerSpec('timer', TimeDomain.REAL_TIME)

  def process(self,
              element_pair,
              buffer = DoFn.StateParam(ALL_ELEMENTS),
              timer = DoFn.TimerParam(TIMER)):
    buffer.add(element_pair[1])
    # Set a timer to go off 30 seconds in the future.
    timer.set(Timestamp.now() + Duration(seconds=30))

  @on_timer(TIMER)
  def expiry_callback(self, buffer = DoFn.StateParam(ALL_ELEMENTS)):
    # Process timer.
    state.clear()

_ = (p | 'Read per user' >> ReadPerUser()
       | 'ProcessingTime timer pardo' >> beam.ParDo(ProcessingTimerDoFn()))

11.3.3. Dynamic timer tags

Beam also supports dynamically setting a timer tag using TimerMap. This allows for setting multiple different timers in a DoFn and allowing for the timer tags to be dynamically chosen - e.g. based on data in the input elements. A timer with a specific tag can only be set to a single timestamp, so setting the timer again has the effect of overwriting the previous expiration time for the timer with that tag. Each TimerMap is identified with a timer family id, and timers in different timer families are independent.

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @TimerFamily("actionTimers") private final TimerSpec timer =
    TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

  @ProcessElement public void process(
      @Element KV<String, ValueT> element,
      @Timestamp Instant elementTs,
      @TimerFamily("actionTimers") TimerMap timers) {
     timers.set(element.getValue().getActionType(), elementTs);
  }

   @OnTimerFamily("actionTimers") public void onTimer(@TimerId String timerId) {
     LOG.info("Timer fired with id " + timerId);
   }
}));
To be supported, See BEAM-9602

11.3.4. Timer output timestamps

By default, event-time timers will hold the output watermark of the ParDo to the timestamp of the timer. This means that if a timer is set to 12pm, any windowed aggregations or event-time timers later in the pipeline graph that finish after 12pm will not expire. The timestamp of the timer is also the default output timestamp for the timer callback. This means that any elements output from the onTimer method will have a timestamp equal to the timestamp of the timer firing. For processing-time timers, the default output timestamp and watermark hold is the value of the input watermark at the time the timer was set.

In some cases, a DoFn needs to output timestamps earlier than the timer expiration time, and therefore also needs to hold its output watermark to those timestamps. For example, consider the following pipeline that temporarily batches records into state, and sets a timer to drain the state. This code may appear correct, but will not work properly.

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("elementBag") private final StateSpec<BagState<ValueT>> elementBag = StateSpecs.bag();
  @StateId("timerSet") private final StateSpec<ValueState<Boolean>> timerSet = StateSpecs.value();
  @TimerId("outputState") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement public void process(
      @Element KV<String, ValueT> element,
      @StateId("elementBag") BagState<ValueT> elementBag,
      @StateId("timerSet") ValueState<Boolean> timerSet,
      @TimerId("outputState") Timer timer) {
    // Add the current element to the bag for this key.
    elementBag.add(element.getValue());
    if (!MoreObjects.firstNonNull(timerSet.read(), false)) {
      // If the timer is not current set, then set it to go off in a minute.
      timer.offset(Duration.standardMinutes(1)).setRelative();
      timerSet.write(true);
    }
  }

  @OnTimer("outputState") public void onTimer(
      @StateId("elementBag") BagState<ValueT> elementBag,
      @StateId("timerSet") ValueState<Boolean> timerSet,
      OutputReceiver<ValueT> output) {
    for (ValueT bufferedElement : elementBag.read()) {
      // Output each element.
      output.outputWithTimestamp(bufferedElement, bufferedElement.timestamp());
    }
    elementBag.clear();
    // Note that the timer has now fired.
    timerSet.clear();
  }
}));

The problem with this code is that the ParDo is buffering elements, however nothing is preventing the watermark from advancing past the timestamp of those elements, so all those elements might be dropped as late data. In order to prevent this from happening, an output timestamp needs to be set on the timer to prevent the watermark from advancing past the timestamp of the minimum element. The following code demonstrates this.

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  // The bag of elements accumulated.
  @StateId("elementBag") private final StateSpec<BagState<ValueT>> elementBag = StateSpecs.bag();
  // The timestamp of the timer set.
  @StateId("timerTimestamp") private final StateSpec<ValueState<Long>> timerTimestamp = StateSpecs.value();
  // The minimum timestamp stored in the bag.
  @StateId("minTimestampInBag") private final StateSpec<CombiningState<Long, long[], Long>>
     minTimestampInBag = StateSpecs.combining(Min.ofLongs());

  @TimerId("outputState") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement public void process(
      @Element KV<String, ValueT> element,
      @StateId("elementBag") BagState<ValueT> elementBag,
      @AlwaysFetched @StateId("timerTimestamp") ValueState<Long> timerTimestamp,
      @AlwaysFetched @StateId("minTimestampInBag") CombiningState<Long, long[], Long> minTimestamp,
      @TimerId("outputState") Timer timer) {
    // Add the current element to the bag for this key.
    elementBag.add(element.getValue());
    // Keep track of the minimum element timestamp currently stored in the bag.
    minTimestamp.add(element.getValue().timestamp());

    // If the timer is already set, then reset it at the same time but with an updated output timestamp (otherwise
    // we would keep resetting the timer to the future). If there is no timer set, then set one to expire in a minute.
    Long timerTimestampMs = timerTimestamp.read();
    Instant timerToSet = (timerTimestamp.isEmpty().read())
        ? Instant.now().plus(Duration.standardMinutes(1)) : new Instant(timerTimestampMs);
    // Setting the outputTimestamp to the minimum timestamp in the bag holds the watermark to that timestamp until the
    // timer fires. This allows outputting all the elements with their timestamp.
    timer.withOutputTimestamp(minTimestamp.read()).set(timerToSet).
    timerTimestamp.write(timerToSet.getMillis());
  }

  @OnTimer("outputState") public void onTimer(
      @StateId("elementBag") BagState<ValueT> elementBag,
      @StateId("timerTimestamp") ValueState<Long> timerTimestamp,
      OutputReceiver<ValueT> output) {
    for (ValueT bufferedElement : elementBag.read()) {
      // Output each element.
      output.outputWithTimestamp(bufferedElement, bufferedElement.timestamp());
    }
    // Note that the timer has now fired.
    timerTimestamp.clear();
  }
}));

11.4. Garbage collecting state

Per-key state needs to be garbage collected, or eventually the increasing size of state may negatively impact performance. There are two common strategies for garbage collecting state.

11.4.1. Using windows for garbage collection

All state and timers for a key is scoped to the window it is in. This means that depending on the timestamp of the input element the ParDo will see different values for the state depending on the window that element falls into. In addition, once the input watermark passes the end of the window, the runner should garbage collect all state for that window. (note: if allowed lateness is set to a positive value for the window, the runner must wait for the watermark to pass the end of the window plus the allowed lateness before garbage collecting state). This can be used as a garbage-collection strategy.

For example, given the following:

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(Window.into(CalendarWindows.days(1)
   .withTimeZone(DateTimeZone.forID("America/Los_Angeles"))));
       .apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
           @StateId("state") private final StateSpec<ValueState<Integer>> state = StateSpecs.value();
                              ...
           @ProcessElement public void process(@Timestamp Instant ts, @StateId("state") ValueState<Integer> state) {
              // The state is scoped to a calendar day window. That means that if the input timestamp ts is after
              // midnight PST, then a new copy of the state will be seen for the next day.
           }
         }));
class StateDoFn(DoFn):
  ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder())

  def process(self,
              element_pair,
              buffer = DoFn.StateParam(ALL_ELEMENTS)):
    ...

_ = (p | 'Read per user' >> ReadPerUser()
       | 'Windowing' >> beam.WindowInto(FixedWindows(60 * 60 * 24))
       | 'DoFn' >> beam.ParDo(StateDoFn()))

This ParDo stores state per day. Once the pipeline is done processing data for a given day, all the state for that day is garbage collected.

11.4.1. Using timers For garbage collection

In some cases, it is difficult to find a windowing strategy that models the desired garbage-collection strategy. For example, a common desire is to garbage collect state for a key once no activity has been seen on the key for some time. This can be done by updating a timer that garbage collects state. For example

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  // The state for the key.
  @StateId("state") private final StateSpec<ValueState<ValueT>> state = StateSpecs.value();

  // The maximum element timestamp seen so far.
  @StateId("maxTimestampSeen") private final StateSpec<CombiningState<Long, long[], Long>>
     maxTimestamp = StateSpecs.combining(Max.ofLongs());

  @TimerId("gcTimer") private final TimerSpec gcTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement public void process(
      @Element KV<String, ValueT> element,
      @Timestamp Instant ts,
      @StateId("state") ValueState<ValueT> state,
      @StateId("maxTimestampSeen") CombiningState<Long, long[], Long> maxTimestamp,
      @TimerId("gcTimer") gcTimer) {
    updateState(state, element);
    maxTimestamp.add(ts.getMillis());

    // Set the timer to be one hour after the maximum timestamp seen. This will keep overwriting the same timer, so
    // as long as there is activity on this key the state will stay active. Once the key goes inactive for one hour's
    // worth of event time (as measured by the watermark), then the gc timer will fire.
    Instant expirationTime = new Instant(maxTimestamp.read()).plus(Duration.standardHours(1));
    timer.set(expirationTime);
  }

  @OnTimer("gcTimer") public void onTimer(
      @StateId("state") ValueState<ValueT> state,
      @StateId("maxTimestampSeen") CombiningState<Long, long[], Long> maxTimestamp) {
       // Clear all state for the key.
       state.clear();
       maxTimestamp.clear();
    }
 }
class UserDoFn(DoFn):
  ALL_ELEMENTS = BagStateSpec('state', coders.VarIntCoder())
  MAX_TIMESTAMP = CombiningValueStateSpec('max_timestamp_seen', max)
  TIMER = TimerSpec('gc-timer', TimeDomain.WATERMARK)

  def process(self,
              element,
              t = DoFn.TimestampParam,
              state = DoFn.StateParam(ALL_ELEMENTS),
              max_timestamp = DoFn.StateParam(MAX_TIMESTAMP),
              timer = DoFn.TimerParam(TIMER)):
    update_state(state, element)
    max_timestamp.add(t.micros)

    # Set the timer to be one hour after the maximum timestamp seen. This will keep overwriting the same timer, so
    # as long as there is activity on this key the state will stay active. Once the key goes inactive for one hour's
    # worth of event time (as measured by the watermark), then the gc timer will fire.
    expiration_time = Timestamp(micros=max_timestamp.read()) + Duration(seconds=60*60)
    timer.set(expiration_time)

  @on_timer(TIMER)
  def expiry_callback(self,
                      state = DoFn.StateParam(ALL_ELEMENTS),
                      max_timestamp = DoFn.StateParam(MAX_TIMESTAMP)):
    state.clear()
    max_timestamp.clear()


_ = (p | 'Read per user' >> ReadPerUser()
       | 'User DoFn' >> beam.ParDo(UserDoFn()))

11.5. State and timers examples

Following are some example uses of state and timers

11.5.1. Joining clicks and views

In this example, the pipeline is processing data from an e-commerce site’s home page. There are two input streams: a stream of views, representing suggested product links displayed to the user on the home page, and a stream of clicks, representing actual user clicks on these links. The goal of the pipeline is to join click events with view events, outputting a new joined event that contains information from both events. Each link has a unique identifier that is present in both the view event and the join event.

Many view events will never be followed up with clicks. This pipeline will wait one hour for a click, after which it will give up on this join. While every click event should have a view event, some small number of view events may be lost and never make it to the Beam pipeline; the pipeline will similarly wait one hour after seeing a click event, and give up if the view event does not arrive in that time. Input events are not ordered - it is possible to see the click event before the view event. The one hour join timeout should be based on event time, not on processing time.

// Read the event stream and key it by the link id.
PCollection<KV<String, Event>> eventsPerLinkId =
    readEvents()
    .apply(WithKeys.of(Event::getLinkId).withKeyType(TypeDescriptors.strings()));

perUser.apply(ParDo.of(new DoFn<KV<String, Event>, JoinedEvent>() {
  // Store the view event.
  @StateId("view") private final StateSpec<ValueState<Event>> viewState = StateSpecs.value();
  // Store the click event.
  @StateId("click") private final StateSpec<ValueState<Event>> clickState = StateSpecs.value();

  // The maximum element timestamp seen so far.
  @StateId("maxTimestampSeen") private final StateSpec<CombiningState<Long, long[], Long>>
     maxTimestamp = StateSpecs.combining(Max.ofLongs());

  // Timer that fires when an hour goes by with an incomplete join.
  @TimerId("gcTimer") private final TimerSpec gcTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement public void process(
      @Element KV<String, Event> element,
      @Timestamp Instant ts,
      @AlwaysFetched @StateId("view") ValueState<Event> viewState,
      @AlwaysFetched @StateId("click") ValueState<Event> clickState,
      @AlwaysFetched @StateId("maxTimestampSeen") CombiningState<Long, long[], Long> maxTimestampState,
      @TimerId("gcTimer") gcTimer,
      OutputReceiver<JoinedEvent> output) {
    // Store the event into the correct state variable.
    Event event = element.getValue();
    ValueState<Event> valueState = event.getType().equals(VIEW) ? viewState : clickState;
    valueState.write(event);

    Event view = viewState.read();
    Event click = clickState.read();
    (if view != null && click != null) {
      // We've seen both a view and a click. Output a joined event and clear state.
      output.output(JoinedEvent.of(view, click));
      clearState(viewState, clickState, maxTimestampState);
    } else {
       // We've only seen on half of the join.
       // Set the timer to be one hour after the maximum timestamp seen. This will keep overwriting the same timer, so
       // as long as there is activity on this key the state will stay active. Once the key goes inactive for one hour's
       // worth of event time (as measured by the watermark), then the gc timer will fire.
        maxTimestampState.add(ts.getMillis());
       Instant expirationTime = new Instant(maxTimestampState.read()).plus(Duration.standardHours(1));
       gcTimer.set(expirationTime);
    }
  }

  @OnTimer("gcTimer") public void onTimer(
      @StateId("view") ValueState<Event> viewState,
      @StateId("click") ValueState<Event> clickState,
      @StateId("maxTimestampSeen") CombiningState<Long, long[], Long> maxTimestampState) {
       // An hour has gone by with an incomplete join. Give up and clear the state.
       clearState(viewState, clickState, maxTimestampState);
    }

    private void clearState(
      @StateId("view") ValueState<Event> viewState,
      @StateId("click") ValueState<Event> clickState,
      @StateId("maxTimestampSeen") CombiningState<Long, long[], Long> maxTimestampState) {
      viewState.clear();
      clickState.clear();
      maxTimestampState.clear();
    }
 }));

11.5.2. Batching RPCs

In this example, input elements are being forwarded to an external RPC service. The RPC accepts batch requests - multiple events for the same user can be batched in a single RPC call. Since this RPC service also imposes rate limits, we want to batch ten seconds worth of events together in order to reduce the number of calls.

PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  // Store the elements buffered so far.
  @StateId("state") private final StateSpec<BagState<ValueT>> elements = StateSpecs.bag();
  // Keep track of whether a timer is currently set or not.
  @StateId("isTimerSet") private final StateSpec<ValueState<Boolean>> isTimerSet = StateSpecs.value();
  // The processing-time timer user to publish the RPC.
  @TimerId("outputState") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement public void process(
    @Element KV<String, ValueT> element,
    @StateId("state") BagState<ValueT> elementsState,
    @StateId("isTimerSet") ValueState<Boolean> isTimerSetState,
    @TimerId("outputState") Timer timer) {
    // Add the current element to the bag for this key.
    state.add(element.getValue());
    if (!MoreObjects.firstNonNull(isTimerSetState.read(), false)) {
      // If there is no timer currently set, then set one to go off in 10 seconds.
      timer.offset(Duration.standardSeconds(10)).setRelative();
      isTimerSetState.write(true);
   }
  }

  @OnTimer("outputState") public void onTimer(
    @StateId("state") BagState<ValueT> elementsState,
    @StateId("isTimerSet") ValueState<Boolean> isTimerSetState) {
    // Send an RPC containing the batched elements and clear state.
    sendRPC(elementsState.read());
    elementsState.clear();
    isTimerSetState.clear();
  }
}));

12. Splittable DoFns

A Splittable DoFn (SDF) enables users to create modular components containing I/Os (and some advanced non I/O use cases). Having modular I/O components that can be connected to each other simplify typical patterns that users want. For example, a popular use case is to read filenames from a message queue followed by parsing those files. Traditionally, users were required to either write a single I/O connector that contained the logic for the message queue and the file reader (increased complexity) or choose to reuse a message queue I/O followed by a regular DoFn that read the file (decreased performance). With SDF, we bring the richness of Apache Beam’s I/O APIs to a DoFn enabling modularity while maintaining the performance of traditional I/O connectors.

12.1. SDF basics

At a high level, an SDF is responsible for processing element and restriction pairs. A restriction represents a subset of work that would have been necessary to have been done when processing the element.

Executing an SDF follows the following steps:

  1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
  2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
  3. The runner redistributes the element and restriction pairs to several workers.
  4. Element and restriction pairs are processed in parallel (e.g. the file is read). Within this last step, the element and restriction pair can pause its own processing and/or be split into further element and restriction pairs.

Diagram of steps that an SDF is composed of

12.1.1. A basic SDF

A basic SDF is composed of three parts: a restriction, a restriction provider, and a restriction tracker. The restriction is used to represent a subset of work for a given element. The restriction provider lets SDF authors override default implementations for splitting, sizing, watermark estimation, and so forth. In Java and Go, this is the DoFn. Python has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking what subset of the restriction has been completed during processing.

To define an SDF, you must choose whether the SDF is bounded (default) or unbounded and define a way to initialize an initial restriction for an element.

@BoundedPerElement
private static class FileToWordsFn extends DoFn<String, Integer> {
  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String fileName) throws IOException {
    return new OffsetRange(0, new File(fileName).length());
  }

  @ProcessElement
  public void processElement(
      @Element String fileName,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<Integer> outputReceiver)
      throws IOException {
    RandomAccessFile file = new RandomAccessFile(fileName, "r");
    seekToNextRecordBoundaryInFile(file, tracker.currentRestriction().getFrom());
    while (tracker.tryClaim(file.getFilePointer())) {
      outputReceiver.output(readNextRecord(file));
    }
  }

  // Providing the coder is only necessary if it can not be inferred at runtime.
  @GetRestrictionCoder
  public Coder<OffsetRange> getRestrictionCoder() {
    return OffsetRange.Coder.of();
  }
}
class FileToWordsRestrictionProvider(beam.io.RestrictionProvider):
  def initial_restriction(self, file_name):
    return OffsetRange(0, os.stat(file_name).st_size)

  def create_tracker(self, restriction):
    return beam.io.restriction_trackers.OffsetRestrictionTracker()

class FileToWordsFn(beam.DoFn):
  def process(
      self,
      file_name,
      tracker=beam.DoFn.RestrictionParam(FileToWordsRestrictionProvider())):
    with open(file_name) as file_handle:
      file_handle.seek(tracker.current_restriction.start())
      while tracker.try_claim(file_handle.tell()):
        yield read_next_record(file_handle)

  # Providing the coder is only necessary if it can not be inferred at
  # runtime.
  def restriction_coder(self):
    return ...
func (fn *splittableDoFn) CreateInitialRestriction(filename string) offsetrange.Restriction {
	return offsetrange.Restriction{
		Start: 0,
		End:   getFileLength(filename),
	}
}

func (fn *splittableDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
}

func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
            file, err := os.Open(filename)
	if err != nil {
		return err
	}
	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)

	if err != nil {
		return err
	}
	for rt.TryClaim(offset) {
		record, newOffset := readNextRecord(file)
		emit(record)
		offset = newOffset
	}
	return nil
}

At this point, we have an SDF that supports runner-initiated splits enabling dynamic work rebalancing. To increase the rate at which initial parallelization of work occurs or for those runners that do not support runner-initiated splitting, we recommend providing a set of initial splits:

void splitRestriction(
    @Restriction OffsetRange restriction, OutputReceiver<OffsetRange> splitReceiver) {
  long splitSize = 64 * (1 << 20);
  long i = restriction.getFrom();
  while (i < restriction.getTo() - splitSize) {
    // Compute and output 64 MiB size ranges to process in parallel
    long end = i + splitSize;
    splitReceiver.output(new OffsetRange(i, end));
    i = end;
  }
  // Output the last range
  splitReceiver.output(new OffsetRange(i, restriction.getTo()));
}
class FileToWordsRestrictionProvider(beam.io.RestrictionProvider):
  def split(self, file_name, restriction):
    # Compute and output 64 MiB size ranges to process in parallel
    split_size = 64 * (1 << 20)
    i = restriction.start
    while i < restriction.end - split_size:
      yield OffsetRange(i, i + split_size)
      i += split_size
    yield OffsetRange(i, restriction.end)
func (fn *splittableDoFn) SplitRestriction(filename string, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
	size := 64 * (1 << 20)
	i := rest.Start
	for i < rest.End - size {
		// Compute and output 64 MiB size ranges to process in parallel
		end := i + size
     		splits = append(splits, offsetrange.Restriction{i, end})
		i = end
	}
	// Output the last range
	splits = append(splits, offsetrange.Restriction{i, rest.End})
	return splits
}

12.2. Sizing and progress

Sizing and progress are used during execution of an SDF to inform runners so that they may perform intelligent decisions about which restrictions to split and how to parallelize work.

Before processing an element and restriction, an initial size may be used by a runner to choose how and who processes the restrictions attempting to improve initial balancing and parallelization of work. During the processing of an element and restriction, sizing and progress are used to choose which restrictions to split and who should process them.

By default, we use the restriction tracker’s estimate for work remaining falling back to assuming that all restrictions have an equal cost. To override the default, SDF authors can provide the appropriate method within the restriction provider.

@GetSize
double getSize(@Element String fileName, @Restriction OffsetRange restriction) {
  return (fileName.contains("expensiveRecords") ? 2 : 1) * restriction.getTo()
      - restriction.getFrom();
}
# The RestrictionProvider is responsible for calculating the size of given
# restriction.
class MyRestrictionProvider(beam.transforms.core.RestrictionProvider):
  def restriction_size(self, file_name, restriction):
    weight = 2 if "expensiveRecords" in file_name else 1
    return restriction.size() * weight
func (fn *splittableDoFn) RestrictionSize(filename string, rest offsetrange.Restriction) float64 {
	weight := float64(1)
	if strings.Contains(filename, expensiveRecords) {
		weight = 2
	}
	return weight * (rest.End - rest.Start)
}

12.3. User-initiated checkpoint

Some I/Os cannot produce all of the data necessary to complete a restriction within the lifetime of a single bundle. This typically happens with unbounded restrictions, but can also happen with bounded restrictions. For example, there could be more data that needs to be ingested but is not available yet. Another cause of this scenario is the source system throttling your data.

Your SDF can signal to you that you are not done processing the current restriction. This signal can suggest a time to resume at. While the runner tries to honor the resume time, this is not guaranteed. This allows execution to continue on a restriction that has available work improving resource utilization.

@ProcessElement
public ProcessContinuation processElement(
    RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Record> outputReceiver) {
  long currentPosition = tracker.currentRestriction().getFrom();
  Service service = initializeService();
  try {
    while (true) {
      List<Record> records = service.readNextRecords(currentPosition);
      if (records.isEmpty()) {
        // Return a short delay if there is no data to process at the moment.
        return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(10));
      }
      for (Record record : records) {
        if (!tracker.tryClaim(record.getPosition())) {
          return ProcessContinuation.stop();
        }
        currentPosition = record.getPosition() + 1;

        outputReceiver.output(record);
      }
    }
  } catch (ThrottlingException exception) {
    // Return a longer delay in case we are being throttled.
    return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(60));
  }
}
class MySplittableDoFn(beam.DoFn):
  def process(
      self,
      element,
      restriction_tracker=beam.DoFn.RestrictionParam(
          MyRestrictionProvider())):
    current_position = restriction_tracker.current_restriction.start()
    while True:
      # Pull records from an external service.
      try:
        records = external_service.fetch(current_position)
        if records.empty():
          # Set a shorter delay in case we are being throttled.
          restriction_tracker.defer_remainder(timestamp.Duration(second=10))
          return
        for record in records:
          if restriction_tracker.try_claim(record.position):
            current_position = record.position
            yield record
          else:
            return
      except TimeoutError:
        # Set a longer delay in case we are being throttled.
        restriction_tracker.defer_remainder(timestamp.Duration(seconds=60))
        return

12.4. Runner-initiated split

A runner at any time may attempt to split a restriction while it is being processed. This allows the runner to either pause processing of the restriction so that other work may be done (common for unbounded restrictions to limit the amount of output and/or improve latency) or split the restriction into two pieces, increasing the available parallelism within the system. It is important to author a SDF with this in mind since the end of the restriction may change. Thus when writing the processing loop, it is important to use the result from trying to claim a piece of the restriction instead of assuming one can process till the end.

@ProcessElement
public void badTryClaimLoop(
    @Element String fileName,
    RestrictionTracker<OffsetRange, Long> tracker,
    OutputReceiver<Integer> outputReceiver)
    throws IOException {
  RandomAccessFile file = new RandomAccessFile(fileName, "r");
  seekToNextRecordBoundaryInFile(file, tracker.currentRestriction().getFrom());
  // The restriction tracker can be modified by another thread in parallel
  // so storing state locally is ill advised.
  long end = tracker.currentRestriction().getTo();
  while (file.getFilePointer() < end) {
    // Only after successfully claiming should we produce any output and/or
    // perform side effects.
    tracker.tryClaim(file.getFilePointer());
    outputReceiver.output(readNextRecord(file));
  }
}
class BadTryClaimLoop(beam.DoFn):
  def process(
      self,
      file_name,
      tracker=beam.DoFn.RestrictionParam(FileToWordsRestrictionProvider())):
    with open(file_name) as file_handle:
      file_handle.seek(tracker.current_restriction.start())
      # The restriction tracker can be modified by another thread in parallel
      # so storing state locally is ill advised.
      end = tracker.current_restriction.end()
      while file_handle.tell() < end:
        # Only after successfully claiming should we produce any output and/or
        # perform side effects.
        tracker.try_claim(file_handle.tell())
        yield read_next_record(file_handle)
func (fn *badTryClaimLoop) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
            file, err := os.Open(filename)
	if err != nil {
		return err
	}
	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)

	if err != nil {
		return err
	}

	// The restriction tracker can be modified by another thread in parallel
	// so storing state locally is ill advised.
	end = rt.GetRestriction().(offsetrange.Restriction).End
	for offset < end {
		// Only after successfully claiming should we produce any output and/or
		// perform side effects.
    	rt.TryClaim(offset)
		record, newOffset := readNextRecord(file)
		emit(record)
		offset = newOffset
	}
	return nil
}

12.5. Watermark estimation

The default watermark estimator does not produce a watermark estimate. Therefore, the output watermark is solely computed by the minimum of upstream watermarks.

An SDF can advance the output watermark by specifying a lower bound for all future output that this element and restriction pair will produce. The runner computes the minimum output watermark by taking the minimum over all upstream watermarks and the minimum reported by each element and restriction pair. The reported watermark must monotonically increase for each element and restriction pair across bundle boundaries. When an element and restriction pair stops processing its watermark, it is no longer considered part of the above calculation.

Tips:

12.5.1. Controlling the watermark

There are two general types of watermark estimators: timestamp observing and external clock observing. Timestamp observing watermark estimators use the output timestamp of each record to compute the watermark estimate while external clock observing watermark estimators control the watermark by using a clock that is not associated to any individual output, such as the local clock of the machine or a clock exposed through an external service.

The restriction provider lets you override the default watermark estimation logic and use an existing watermark estimator implementation. You can also provide your own watermark estimator implementation.

      // (Optional) Define a custom watermark state type to save information between bundle
      // processing rounds.
      public static class MyCustomWatermarkState {
        public MyCustomWatermarkState(String element, OffsetRange restriction) {
          // Store data necessary for future watermark computations
        }
      }

      // (Optional) Choose which coder to use to encode the watermark estimator state.
      @GetWatermarkEstimatorStateCoder
      public Coder<MyCustomWatermarkState> getWatermarkEstimatorStateCoder() {
        return AvroCoder.of(MyCustomWatermarkState.class);
      }

      // Define a WatermarkEstimator
      public static class MyCustomWatermarkEstimator
          implements TimestampObservingWatermarkEstimator<MyCustomWatermarkState> {

        public MyCustomWatermarkEstimator(MyCustomWatermarkState type) {
          // Initialize watermark estimator state
        }

        @Override
        public void observeTimestamp(Instant timestamp) {
          // Will be invoked on each output from the SDF
        }

        @Override
        public Instant currentWatermark() {
          // Return a monotonically increasing value
          return currentWatermark;
        }

        @Override
        public MyCustomWatermarkState getState() {
          // Return state to resume future watermark estimation after a checkpoint/split
          return null;
        }
      }

      // Then, update the DoFn to generate the initial watermark estimator state for all new element
      // and restriction pairs and to create a new instance given watermark estimator state.

      @GetInitialWatermarkEstimatorState
      public MyCustomWatermarkState getInitialWatermarkEstimatorState(
          @Element String element, @Restriction OffsetRange restriction) {
        // Compute and return the initial watermark estimator state for each element and
        // restriction. All subsequent processing of an element and restriction will be restored
        // from the existing state.
        return new MyCustomWatermarkState(element, restriction);
      }

      @NewWatermarkEstimator
      public WatermarkEstimator<MyCustomWatermarkState> newWatermarkEstimator(
          @WatermarkEstimatorState MyCustomWatermarkState oldState) {
        return new MyCustomWatermarkEstimator(oldState);
      }
    }
# (Optional) Define a custom watermark state type to save information between
# bundle processing rounds.
class MyCustomerWatermarkEstimatorState(object):
  def __init__(self, element, restriction):
    # Store data necessary for future watermark computations
    pass

# Define a WatermarkEstimator
class MyCustomWatermarkEstimator(WatermarkEstimator):
  def __init__(self, estimator_state):
    self.state = estimator_state

  def observe_timestamp(self, timestamp):
    # Will be invoked on each output from the SDF
    pass

  def current_watermark(self):
    # Return a monotonically increasing value
    return current_watermark

  def get_estimator_state(self):
    # Return state to resume future watermark estimation after a
    # checkpoint/split
    return self.state

# Then, a WatermarkEstimatorProvider needs to be created for this
# WatermarkEstimator
class MyWatermarkEstimatorProvider(WatermarkEstimatorProvider):
  def initial_estimator_state(self, element, restriction):
    return MyCustomerWatermarkEstimatorState(element, restriction)

  def create_watermark_estimator(self, estimator_state):
    return MyCustomWatermarkEstimator(estimator_state)

# Finally, define the SDF using your estimator.
class MySplittableDoFn(beam.DoFn):
  def process(
      self,
      element,
      restriction_tracker=beam.DoFn.RestrictionParam(MyRestrictionProvider()),
      watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
          MyWatermarkEstimatorProvider())):
    # The current watermark can be inspected.
    watermark_estimator.current_watermark()

12.6. Truncating during drain

Runners which support draining pipelines need the ability to drain SDFs; otherwise, the pipeline may never stop. By default, bounded restrictions process the remainder of the restriction while unbounded restrictions finish processing at the next SDF-initiated checkpoint or runner-initiated split. You are able to override this default behavior by defining the appropriate method on the restriction provider.

@TruncateRestriction
@Nullable
TruncateResult<OffsetRange> truncateRestriction(
    @Element String fileName, @Restriction OffsetRange restriction) {
  if (fileName.contains("optional")) {
    // Skip optional files
    return null;
  }
  return TruncateResult.of(restriction);
}
class MyRestrictionProvider(beam.transforms.core.RestrictionProvider):
  def truncate(self, file_name, restriction):
    if "optional" in file_name:
      # Skip optional files
      return None
    return restriction

12.7. Bundle finalization

Bundle finalization enables a DoFn to perform side effects by registering a callback. The callback is invoked once the runner has acknowledged that it has durably persisted the output. For example, a message queue might need to acknowledge messages that it has ingested into the pipeline. Bundle finalization is not limited to SDFs but is called out here since this is the primary use case.

@ProcessElement
public void processElement(ProcessContext c, BundleFinalizer bundleFinalizer) {
  // ... produce output ...

  bundleFinalizer.afterBundleCommit(
      Instant.now().plus(Duration.standardMinutes(5)),
      () -> {
        // ... perform a side effect ...
      });
}
class MySplittableDoFn(beam.DoFn):
  def process(self, element, bundle_finalizer=beam.DoFn.BundleFinalizerParam):
    # ... produce output ...

    # Register callback function for this bundle that performs the side
    # effect.
    bundle_finalizer.register(my_callback_func)