Ensuring Python Type Safety

Python is a dynamically typed language with no built-in static type checking. Because type errors surface only when the offending code actually runs, and because runners defer execution, developer productivity can easily become bottlenecked by time spent investigating type-related errors.

The Apache Beam SDK for Python uses type hints during pipeline construction and runtime to try to emulate the correctness guarantees achieved by true static typing. Additionally, using type hints lays some groundwork that allows the backend service to perform efficient type deduction and registration of Coder objects.

Python 3.5 introduced a module called typing to provide hints for type validators in the language. The Beam SDK for Python implements a subset of PEP 484 and aims to follow it as closely as possible in its own typehints module.

These flags control Beam type safety:

Benefits of Type Hints

When you use type hints, Beam raises exceptions at pipeline construction time rather than at runtime. For example, Beam raises an exception if it detects that your pipeline applies mismatched PTransforms (where the expected outputs of one transform do not match the expected inputs of the following transform). These checks run regardless of where your pipeline will execute. Introducing type hints for the PTransforms you define allows you to catch potential bugs up front in the local runner, rather than minutes into the execution of a deep, complex pipeline.

Consider the following example, in which numbers is a PCollection of str values:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline

p = TestPipeline()

numbers = p | beam.Create(['1', '2', '3'])

The code then applies a Filter transform to the numbers collection with a callable that retrieves the even numbers.

evens = numbers | beam.Filter(lambda x: x % 2 == 0)

When you call p.run(), this code generates an error when trying to execute this transform because the callable passed to Filter expects integers, but the PCollection contains strings. With type hints, this error could have been caught at pipeline construction time, before the pipeline even started running.
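
The failure itself is an ordinary Python error, which is why it only appears once elements are processed. The following minimal sketch reproduces it in plain Python, outside Beam; is_even is a hypothetical name given to the lambda from the example above.

```python
# The same predicate that the Filter transform applies to each element.
# The name is_even is hypothetical; the example above uses a lambda.
def is_even(x):
    return x % 2 == 0


# With an int, % is the modulo operator and the predicate works.
print(is_even(2))  # True

# With a str, % is the string-formatting operator, so the call fails.
try:
    is_even('1')
    error = None
except TypeError as exc:
    error = exc

print(type(error).__name__)  # TypeError
```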

The Beam SDK for Python includes some automatic type hinting: for example, some PTransforms, such as Create and simple ParDo transforms, attempt to deduce their output type given their input. However, Beam cannot deduce types in all cases. We therefore recommend that you declare type hints yourself, so that these checks do not depend on what Beam can infer.

Declaring Type Hints

You can declare type hints on callables, DoFns, or entire PTransforms. There are three ways to declare type hints: inline during pipeline construction, as properties of the DoFn or PTransform using decorators, or as Python 3 type annotations on certain functions.

You can always declare type hints inline, but if you need them for code that is going to be reused, declare them as annotations or decorators. For example, if your DoFn requires an int input, it makes more sense to declare the type hint for the input as an annotation of the arguments to process (or a property of the DoFn) rather than inline.

Using annotations has the added benefit of allowing you to use a static type checker (such as mypy) to additionally type check your code. If you already use a type checker, using annotations instead of decorators reduces code duplication. However, annotations do not cover all the use cases that decorators and inline declarations do. For instance, they do not work for lambda functions.

Declaring Type Hints Using Type Annotations

New in version 2.21.0.

To specify type hints as annotations on certain functions, use them as usual and omit any decorator hints or inline hints.

Annotations are currently supported on:

The following code declares an int input and a str output type hint on the to_id transform, using annotations on my_fn.

def my_fn(element: int) -> str:
  return 'id_' + str(element)

ids = numbers | 'to_id' >> beam.Map(my_fn)

You can also use annotations on PTransform subclasses. A valid annotation is a PCollection that wraps an internal (nested) type, PBegin, PDone, or None. The following code uses annotations to declare type hints on a custom PTransform that takes a PCollection[int] input and outputs a PCollection[str].

from apache_beam.pvalue import PCollection

class IntToStr(beam.PTransform):
  def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
    return pcoll | beam.Map(lambda elem: str(elem))

ids = numbers | 'convert to str' >> IntToStr()

The following code declares int input and output type hints on filter_evens, using annotations on FilterEvensDoFn.process. Since process returns a generator, the output type for a DoFn producing a PCollection[int] is annotated as Iterable[int] (Generator[int, None, None] would also work here). Beam removes the outer iterable of the return type on the DoFn.process method and on functions passed to FlatMap to deduce the element type of the resulting PCollection. It is an error to have a non-iterable return type annotation for these functions. Other supported iterable types include Iterator, Generator, Tuple, and List.

from typing import Iterable

class FilterEvensDoFn(beam.DoFn):
  def process(self, element: int) -> Iterable[int]:
    if element % 2 == 0:
      yield element

evens = numbers | 'filter_evens' >> beam.ParDo(FilterEvensDoFn())

The following code declares int input and output type hints on double_evens, using annotations on FilterEvensDoubleDoFn.process. Since process returns a list or None, the output type is annotated as Optional[List[int]]. Beam will also remove the outer Optional and (as above) the outer iterable of the return type, only on the DoFn.process method and functions passed to FlatMap.

from typing import List, Optional

class FilterEvensDoubleDoFn(beam.DoFn):
  def process(self, element: int) -> Optional[List[int]]:
    if element % 2 == 0:
      return [element, element]
    return None

evens = numbers | 'double_evens' >> beam.ParDo(FilterEvensDoubleDoFn())
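
To make the unwrapping rule concrete, the following is a minimal sketch in plain Python of how an element type could be derived from a return annotation. It is not Beam's implementation: element_type and the three annotated functions are hypothetical names, and the sketch relies only on the standard typing module (Python 3.8 or later).

```python
import collections.abc
from typing import (Iterable, List, Optional, Union, get_args, get_origin,
                    get_type_hints)


def element_type(fn):
    """Hypothetical helper: the element type implied by fn's return hint."""
    hint = get_type_hints(fn)['return']
    # Remove an outer Optional[...], which typing represents as Union[X, None].
    if get_origin(hint) is Union:
        remaining = [arg for arg in get_args(hint) if arg is not type(None)]
        if len(remaining) == 1:
            hint = remaining[0]
    # What is left must be an iterable hint such as Iterable[int] or List[int].
    origin = get_origin(hint)
    if not (isinstance(origin, type)
            and issubclass(origin, collections.abc.Iterable)):
        raise TypeError('return annotation is not iterable: %r' % (hint,))
    # The first type argument is the element type.
    return get_args(hint)[0]


def filter_evens(element: int) -> Iterable[int]:
    if element % 2 == 0:
        yield element


def double_evens(element: int) -> Optional[List[int]]:
    return [element, element] if element % 2 == 0 else None


def not_iterable(element: int) -> int:
    return element


print(element_type(filter_evens))  # <class 'int'>
print(element_type(double_evens))  # <class 'int'>
```

Calling element_type(not_iterable) raises a TypeError in this sketch, mirroring the rule that a non-iterable return annotation is an error for these functions.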

Declaring Type Hints Inline

To specify type hints inline, use the methods with_input_types and with_output_types. The following example code declares an input type hint inline:

evens = numbers | beam.Filter(lambda x: x % 2 == 0).with_input_types(int)

When you apply the Filter transform to the numbers collection in the example above, you’ll be able to catch the error during pipeline construction.
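
The idea behind a construction-time check can be sketched in plain Python: compare the declared types while the steps are being wired together, before any element is processed. The Step class and connect function below are hypothetical illustrations and do not reflect how Beam implements its checks.

```python
class Step:
    """Hypothetical stand-in for a transform with declared element types."""

    def __init__(self, fn, input_type, output_type):
        self.fn = fn
        self.input_type = input_type
        self.output_type = output_type


def connect(upstream, downstream):
    """Compares declared types while wiring the steps, before any data flows."""
    if not issubclass(upstream.output_type, downstream.input_type):
        raise TypeError('expected %s input, got %s' % (
            downstream.input_type.__name__, upstream.output_type.__name__))
    return lambda element: downstream.fn(upstream.fn(element))


make_strings = Step(lambda x: x, str, str)
is_even = Step(lambda x: x % 2 == 0, int, bool)

# The mismatch is reported here, although is_even.fn has never been called.
try:
    connect(make_strings, is_even)
    message = None
except TypeError as exc:
    message = str(exc)

print(message)  # expected int input, got str
```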

Declaring Type Hints Using Decorators

To specify type hints as properties of a DoFn or PTransform, use the decorators @with_input_types() and @with_output_types().

The following code declares an int type hint on FilterEvensDoFn, using the decorator @with_input_types().

@beam.typehints.with_input_types(int)
class FilterEvensDoFn(beam.DoFn):
  def process(self, element):
    if element % 2 == 0:
      yield element

evens = numbers | beam.ParDo(FilterEvensDoFn())

Decorators receive an arbitrary number of positional and/or keyword arguments, typically interpreted in the context of the function they’re wrapping. Generally the first argument is a type hint for the main input, and additional arguments are type hints for side inputs.
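
As a rough illustration of that convention, the sketch below defines a hypothetical decorator, record_input_types, that simply stores the hints it receives on the wrapped function. It is not Beam's with_input_types; it only shows how positional and keyword hints line up with a function's arguments.

```python
def record_input_types(*positional_hints, **keyword_hints):
    """Hypothetical decorator that stores hints on the function it wraps."""
    def decorator(fn):
        fn.input_hints = (positional_hints, keyword_hints)
        return fn
    return decorator


# The first positional hint describes the main input (element). The keyword
# hint describes the extra argument, which plays the role of a side input.
@record_input_types(int, threshold=int)
def above_threshold(element, threshold):
    return element > threshold


positional, keyword = above_threshold.input_hints
print(positional[0].__name__)          # int
print(keyword['threshold'].__name__)   # int
```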

Disabling Annotations Use

Annotation-based type hints are enabled by default. You can disable them in any of the following ways:

  1. Using the @beam.typehints.no_annotations decorator on the specific function you want Beam to ignore annotations for.
  2. Declaring type hints using the decorator or inline methods above. These will take precedence over annotations.
  3. Calling beam.typehints.disable_type_annotations() before pipeline creation. This will prevent Beam from looking at annotations on all functions.

Defining Generic Types

You can use type hint annotations to define generic types. The following code specifies an input type hint that asserts the generic type T, and an output type hint that asserts the type Tuple[int, T]. If the input to MyTransform is of type str, Beam will infer the output type to be Tuple[int, str].

from typing import Tuple, TypeVar

T = TypeVar('T')

@beam.typehints.with_input_types(T)
@beam.typehints.with_output_types(Tuple[int, T])
class MyTransform(beam.PTransform):
  def expand(self, pcoll):
    return pcoll | beam.Map(lambda x: (len(x), x))

words_with_lens = words | MyTransform()
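
The substitution of T can be reproduced with the standard typing module alone. The following sketch only shows what binding T to str means for the declared output hint; it does not show how Beam performs the inference, and output_hint and bound_output are hypothetical names.

```python
from typing import Tuple, TypeVar

T = TypeVar('T')

# The declared output hint, written in terms of the type variable T.
output_hint = Tuple[int, T]

# Binding T to str, as happens when the input elements are strings.
bound_output = output_hint[str]

print(bound_output == Tuple[int, str])  # True
```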

Kinds of Type Hints

You can use type hints with any class, including Python primitive types, container classes, and user-defined classes. All classes, such as int, float, and user-defined classes, can be used to define type hints, called simple type hints. Container types such as lists, tuples, and iterables, can also be used to define type hints and are called parameterized type hints. Finally, there are some special types that don’t correspond to any concrete Python classes, such as Any, Optional, and Union, that are also permitted as type hints.

Beam defines its own internal type hint types, which are still available for use for backward compatibility. It also supports Python’s typing module types, which are internally converted to Beam internal types.

For new code, it is recommended to use typing module types.

Simple Type Hints

Type hints can be of any class, from int and str, to user-defined classes. If you have a class as a type hint, you may want to define a coder for it.

Parameterized Type Hints

Parameterized type hints are useful for hinting the types of container-like Python objects, such as list. These type hints further refine the elements in those container objects.

The parameters for parameterized type hints can be simple types, parameterized types, or type variables. Element types that are type variables, such as T, impose relationships between the inputs and outputs of an operation (for example, List[T] -> T). Type hints can be nested, allowing you to define type hints for complex types. For example, List[Tuple[int, int, str]].

To avoid conflicting with the namespace of the built-in container types, the first letter of each parameterized type hint is capitalized (for example, List rather than list).

The following parameterized type hints are permitted:

Note: The Tuple[T, U] type hint is a tuple with a fixed number of heterogeneously typed elements, while the Tuple[T, ...] type hint is a tuple with a variable number of homogeneously typed elements.
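
The difference between the two tuple forms is easiest to see by checking values against them. The checker below is a hypothetical sketch built on the standard typing module; it handles only Tuple hints whose parameters are plain classes and is not part of Beam.

```python
from typing import Tuple, get_args


def matches_tuple_hint(value, hint):
    """Hypothetical checker for Tuple hints whose parameters are classes."""
    if not isinstance(value, tuple):
        return False
    args = get_args(hint)
    # Tuple[T, ...]: any length, every element must be a T.
    if len(args) == 2 and args[1] is Ellipsis:
        return all(isinstance(item, args[0]) for item in value)
    # Tuple[T, U]: fixed length, each position has its own type.
    return len(value) == len(args) and all(
        isinstance(item, arg) for item, arg in zip(value, args))


print(matches_tuple_hint((1, 'a'), Tuple[int, str]))     # True
print(matches_tuple_hint((1, 'a', 2), Tuple[int, str]))  # False
print(matches_tuple_hint((1, 2, 3), Tuple[int, ...]))    # True
print(matches_tuple_hint((1, 'a'), Tuple[int, ...]))     # False
```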

Special Type Hints

The following are special type hints that don’t correspond to a class, but rather to special types introduced in PEP 484.

Runtime Type Checking

In addition to using type hints for type checking at pipeline construction, you can enable runtime type checking to check that actual elements satisfy the declared type constraints during pipeline execution.

For example, the following pipeline emits elements of the wrong type. Depending on the runner implementation, its execution may or may not fail at runtime.

p = TestPipeline()
p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)

However, if you enable runtime type checking, the code is guaranteed to fail at runtime. To enable runtime type checking, set the pipeline option runtime_type_check to True.

from apache_beam.options.pipeline_options import PipelineOptions

p = TestPipeline(options=PipelineOptions(runtime_type_check=True))
p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
p.run()

Because runtime type checks are performed on every PCollection element, enabling this feature may incur a significant performance penalty. We therefore recommend disabling runtime type checks for production pipelines. See the following section for a faster, production-friendly alternative.

Faster Runtime Type Checking

You can enable faster, sampling-based runtime type checking by setting the pipeline option performance_runtime_type_check to True.

This is a Python 3 only feature that works by runtime type checking a small subset of values, called a sample, using optimized Cython code.

Currently, this feature does not support runtime type checking for side inputs or combining operations. These are planned to be supported in a future release of Beam.
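
The trade-off between checking every element and checking a sample can be sketched in plain Python. The checked wrapper below is hypothetical, and its rule of checking every Nth call is an assumption chosen for illustration; the text above does not specify how Beam selects its sample.

```python
def checked(fn, expected_type, sample_every=1):
    """Hypothetical wrapper: type checks the output of every Nth call to fn."""
    stats = {'calls': 0, 'checks': 0}

    def wrapper(element):
        result = fn(element)
        if stats['calls'] % sample_every == 0:
            stats['checks'] += 1
            if not isinstance(result, expected_type):
                raise TypeError('expected %s, got %s' % (
                    expected_type.__name__, type(result).__name__))
        stats['calls'] += 1
        return result

    wrapper.stats = stats
    return wrapper


full = checked(str, str)                       # checks every output
sampled = checked(str, str, sample_every=100)  # checks 1 output in 100

for n in range(1000):
    full(n)
    sampled(n)

print(full.stats['checks'], sampled.stats['checks'])  # 1000 10
```

In this sketch the sampled wrapper does one hundredth of the checking work, at the cost that a wrongly typed output is only detected if it falls on a sampled call.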

Use of Type Hints in Coders

When your pipeline reads, writes, or otherwise materializes its data, the elements in your PCollection need to be encoded and decoded to and from byte strings. Byte strings are used for intermediate storage, for comparing keys in GroupByKey operations, and for reading from sources and writing to sinks.

The Beam SDK for Python uses Python’s native support for serializing objects of unknown type, a process called pickling. However, using the PickleCoder comes with several drawbacks: it is less efficient in time and space, and the encoding used is not deterministic, which hinders distributed partitioning, grouping, and state lookup.
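
One way such non-determinism can arise is easy to reproduce with the standard pickle module: two values that compare equal can still serialize to different bytes. This sketch uses plain dictionaries and the standard library only; it illustrates the general problem and is not a statement about which cases affect Beam's PickleCoder.

```python
import pickle

# Two dictionaries with the same contents, built in a different order.
first = {'team': 'red', 'name': 'ann'}
second = {'name': 'ann', 'team': 'red'}

print(first == second)                              # True
print(pickle.dumps(first) == pickle.dumps(second))  # False
```

If bytes like these were used to compare keys, the two equal values would be treated as different keys.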

To avoid these drawbacks, you can define Coder classes for encoding and decoding types in a more efficient way. You can specify a Coder to describe how the elements of a given PCollection should be encoded and decoded.

To be correct and efficient, a Coder needs type information, and each PCollection needs to be associated with a specific type. Type hints are what make this type information available. The Beam SDK for Python provides built-in coders for the standard Python types such as int, float, str, bytes, and unicode.

Deterministic Coders

If you don’t define a Coder, the default is a coder that falls back to pickling for unknown types. In some cases, you must specify a deterministic Coder or else you will get a runtime error.

For example, suppose you have a PCollection of key-value pairs whose keys are Player objects. If you apply a GroupByKey transform to such a collection, its key objects might be serialized differently on different machines when a nondeterministic coder, such as the default pickle coder, is used. Since GroupByKey uses this serialized representation to compare keys, this may result in incorrect behavior. To ensure that the elements are always encoded and decoded in the same way, you need to define a deterministic Coder for the Player class.

The following code shows the example Player class and how to define a Coder for it. When you use type hints, Beam infers which Coders to use, using beam.coders.registry. The following code registers PlayerCoder as a coder for the Player class. In the example, the input type declared for CombinePerKey is Tuple[Player, int]. In this case, Beam infers that the Coder objects to use are TupleCoder, PlayerCoder, and IntCoder.

from typing import Tuple

class Player(object):
  def __init__(self, team, name):
    self.team = team
    self.name = name

class PlayerCoder(beam.coders.Coder):
  def encode(self, player):
    return ('%s:%s' % (player.team, player.name)).encode('utf-8')

  def decode(self, s):
    return Player(*s.decode('utf-8').split(':'))

  def is_deterministic(self):
    return True

beam.coders.registry.register_coder(Player, PlayerCoder)

def parse_player_and_score(csv):
  name, team, score = csv.split(',')
  return Player(team, name), int(score)

totals = (
    lines
    | beam.Map(parse_player_and_score)
    | beam.CombinePerKey(sum).with_input_types(Tuple[Player, int]))
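
You can exercise the encoding logic without running a pipeline. The following sketch copies the encode and decode bodies from PlayerCoder into plain functions so that it runs without Beam; encode_player and decode_player are hypothetical names introduced only for this illustration.

```python
class Player(object):
    def __init__(self, team, name):
        self.team = team
        self.name = name


def encode_player(player):
    # Same logic as PlayerCoder.encode above.
    return ('%s:%s' % (player.team, player.name)).encode('utf-8')


def decode_player(data):
    # Same logic as PlayerCoder.decode above.
    return Player(*data.decode('utf-8').split(':'))


encoded = encode_player(Player('red', 'ann'))
print(encoded)  # b'red:ann'

# Two separately constructed but equal players produce identical bytes.
print(encoded == encode_player(Player('red', 'ann')))  # True

restored = decode_player(encoded)
print(restored.team, restored.name)  # red ann

# Limitation of this simple format: a ':' inside a field yields three parts
# on decode, so constructing the Player fails.
try:
    decode_player(encode_player(Player('red', 'a:b')))
    decode_error = None
except TypeError as exc:
    decode_error = exc
```

The last case suggests that a coder based on a separator character should escape or reject fields that contain the separator.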