Python is a dynamically typed language with no static type checking. Because of the way Python’s type checking works, and because runners execute pipelines in a deferred fashion, developer productivity can easily become bottlenecked by time spent investigating type-related errors.
The Apache Beam SDK for Python uses type hints during pipeline construction and at runtime to try to emulate the correctness guarantees achieved by true static typing. Additionally, using type hints lays some groundwork that allows the backend service to perform efficient type deduction and registration of Coder objects.
Python version 3.5 introduced a module called typing to provide hints for type validators in the language. The Beam SDK for Python, based on Python version 2.7, implements a subset of PEP 484 and aims to follow it as closely as possible in its own typehints module.
The Beam SDK for Python includes some automatic type checking: for example, some PTransforms, such as Create and simple ParDo transforms, attempt to deduce their output type given their input. However, Beam cannot infer types in all cases. Therefore, the recommendation is that you declare type hints to aid you in performing your own type checks if necessary.
When you use type hints, the runner raises exceptions at pipeline construction time rather than at runtime. For example, the runner generates an exception if it detects that your pipeline applies mismatched PTransforms (where the expected outputs of one transform do not match the expected inputs of the following transform). These exceptions are raised at pipeline construction time, regardless of where your pipeline will execute. Introducing type hints for the PTransforms you define allows you to catch potential bugs up front in the local runner, rather than after minutes of execution deep inside a complex pipeline.
Consider the following example, in which numbers is a PCollection of str values:

    p = TestPipeline(options=PipelineOptions(pipeline_type_check=True))
    numbers = p | beam.Create(['1', '2', '3'])
The code then applies a Filter transform to the numbers collection with a callable that retrieves the even numbers.

    evens = numbers | beam.Filter(lambda x: x % 2 == 0)
When you call p.run(), this code generates an error because Filter expects a PCollection of integers, but is given a PCollection of strings instead.
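The failure itself comes from plain Python semantics rather than from Beam. The following minimal sketch uses only standard Python, with no Beam dependency, to show what the same predicate does to string elements. The helper name try_filter is hypothetical and exists only for this illustration.

```python
def is_even(x):
    # Same predicate as the lambda passed to Filter above.
    return x % 2 == 0


def try_filter(values):
    """Apply is_even to each value; return (result, error) instead of raising."""
    try:
        return [v for v in values if is_even(v)], None
    except TypeError as exc:
        return None, exc


# With strings, '%' means string formatting, so the predicate raises TypeError.
strings_result, strings_error = try_filter(['1', '2', '3'])

# With integers, the predicate behaves as intended.
ints_result, ints_error = try_filter([1, 2, 3, 4])
```

Without type hints, an error like this would surface only when an element is actually processed. Declaring the expected input type lets the mismatch be reported before any element is processed.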
You can declare type hints on callables, DoFns, or entire PTransforms. There are two ways to declare type hints: inline during pipeline construction, or as properties of the DoFn or PTransform, using decorators.
You can always declare type hints inline, but if you need them for code that is going to be reused, declare them as decorators. For example, if your DoFn requires an int input, it makes more sense to declare the type hint for the input as a property of the DoFn rather than inline.
To specify type hints inline, use the methods with_input_types and with_output_types. The following example code declares an input type hint inline:

    evens = numbers | beam.Filter(lambda x: x % 2 == 0).with_input_types(int)
When you apply the Filter transform to the numbers collection in the example above, you’ll be able to catch the error during pipeline construction.
To specify type hints as properties of a DoFn or PTransform, use the decorators @with_input_types() and @with_output_types(). The following code declares an int type hint on FilterEvensDoFn, using the decorator @with_input_types().

    @beam.typehints.with_input_types(int)
    class FilterEvensDoFn(beam.DoFn):
        def process(self, element):
            # Emit only the even elements.
            if element % 2 == 0:
                yield element

    evens = numbers | beam.ParDo(FilterEvensDoFn())
Decorators receive an arbitrary number of positional and/or keyword arguments, typically interpreted in the context of the function they’re wrapping. Generally the first argument is a type hint for the main input, and additional arguments are type hints for side inputs.
You can use type hint annotations to define generic types. The following code specifies an input type hint that asserts the generic type T, and an output type hint that asserts the type Tuple[int, T].

    T = beam.typehints.TypeVariable('T')

    @beam.typehints.with_input_types(T)
    @beam.typehints.with_output_types(beam.typehints.Tuple[int, T])
    class MyTransform(beam.PTransform):
        def expand(self, pcoll):
            # Pair each element with its length.
            return pcoll | beam.Map(lambda x: (len(x), x))

    words_with_lens = words | MyTransform()
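The relationship that a type variable expresses can also be seen with Python’s standard typing module, on which Beam’s typehints module is modeled. The following is a sketch in plain Python rather than Beam code. The function name with_length is an assumption made for illustration.

```python
from typing import Tuple, TypeVar, get_type_hints

T = TypeVar('T')


def with_length(x: T) -> Tuple[int, T]:
    # Whatever type comes in is also the type of the second tuple member.
    return (len(x), x)


# The annotations record that the input and output share the same T.
hints = get_type_hints(with_length)
pair = with_length('beam')
```

Because the same T appears in both the input and the output hint, a checker can conclude that a collection of str elements yields tuples whose second member is also a str.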
You can use type hints with any class, including Python primitive types, container classes, and user-defined classes. All classes, such as int, float, and user-defined classes, can be used to define type hints, called simple type hints. Container types such as lists, tuples, and iterables can also be used to define type hints, and these are called parameterized type hints. Finally, some special types that don’t correspond to any concrete Python class, such as Union, are also permitted as type hints.
Type hints can be of any class, from str to user-defined classes. If you use a class as a type hint, you may want to define a coder for it.
Parameterized type hints are useful for hinting the types of container-like Python objects, such as list. These type hints further refine the elements in those container objects.
The parameters for parameterized type hints can be simple types, parameterized types, or type variables. Element types that are type variables, such as T, impose relationships between the inputs and outputs of an operation (for example, an operation that takes List[T] and returns T). Type hints can be nested, allowing you to define type hints for complex types, for example List[Tuple[int, int, str]].
To avoid conflicting with the namespace of the built-in container types, the first letter of each parameterized type hint is capitalized (for example, List rather than list).
The permitted parameterized type hints include Tuple[T, U], Tuple[T, ...], List[T], Dict[T, U], Set[T], and Iterable[T]. The Tuple[T, U] type hint is a tuple with a fixed number of heterogeneously typed elements, while the Tuple[T, ...] type hint is a tuple with a variable number of homogeneously typed elements.
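To make the nesting concrete, the following sketch takes apart a nested hint using Python’s standard typing module. It assumes Python 3.8 or later, where get_origin and get_args are available, and it inspects the standard typing hints rather than Beam’s own typehints classes.

```python
from typing import List, Tuple, get_args, get_origin

nested = List[Tuple[int, int, str]]

# The outer hint is a list; its single parameter is the inner tuple hint.
outer_origin = get_origin(nested)
(inner,) = get_args(nested)

# The inner hint is a fixed-length tuple with three element types.
inner_args = get_args(inner)

# Tuple[int, ...] is the variable-length form: one element type plus Ellipsis.
variable_args = get_args(Tuple[int, ...])
```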
The following are special type hints that don’t correspond to a class, but rather to special types introduced in PEP 484.
Union[T, U, V]
In addition to using type hints for type checking at pipeline construction, you can enable runtime type checking to check that actual elements satisfy the declared type constraints during pipeline execution.
For example, when runtime type checking is disabled, the following code would pass at both pipeline construction and runtime.

    p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)

However, if you enable runtime type checking, the code passes at pipeline construction and fails at runtime. To enable runtime type checking, set the pipeline option runtime_type_check to True.

    p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
    p.run()
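The idea behind a runtime check can be sketched in plain Python. This is not how Beam implements it; the decorator check_output_type and the helper raises_type_error are hypothetical names used only to illustrate checking each produced element against a declared type.

```python
import functools


def check_output_type(expected_type):
    """Hypothetical decorator: verify each return value with isinstance."""
    def decorate(fn):
        @functools.wraps(fn)
        def wrapper(element):
            result = fn(element)
            if not isinstance(result, expected_type):
                raise TypeError(
                    'expected %s, got %s'
                    % (expected_type.__name__, type(result).__name__))
            return result
        return wrapper
    return decorate


@check_output_type(str)
def always_three(element):
    # Declared to produce str, but actually produces int.
    return 3


def raises_type_error(fn, element):
    """Return True if calling fn(element) raises TypeError."""
    try:
        fn(element)
    except TypeError:
        return True
    return False


mismatch_detected = raises_type_error(always_three, 'a')
```

The declared hint alone cannot reveal the problem, because the declaration and the actual return value disagree. Only a check on the actual element catches it, which is why this kind of error appears at runtime rather than at construction.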
When your pipeline reads, writes, or otherwise materializes its data, the elements in your PCollection need to be encoded and decoded to and from byte strings. Byte strings are used for intermediate storage, for comparing keys in GroupByKey operations, and for reading from sources and writing to sinks.
The Beam SDK for Python uses Python’s native support for serializing objects, a process called pickling, to serialize user functions. However, using the PickleCoder comes with several drawbacks: it is less efficient in time and space, and the encoding used is not deterministic, which hinders distributed partitioning, grouping, and state lookup.
To avoid these drawbacks, you can define Coder classes for encoding and decoding types in a more efficient way. You can specify a Coder to describe how the elements of a given PCollection should be encoded and decoded.
To be correct and efficient, a Coder needs type information, and each PCollection needs to be associated with a specific type. Type hints are what make this type information available. The Beam SDK for Python provides built-in coders for the standard Python types, such as int, float, and str.
If you don’t define a Coder, the default is PickleCoder, which is nondeterministic. In some cases, you must specify a deterministic Coder or else you will get a runtime error.
For example, suppose you have a PCollection of key-value pairs whose keys are Player objects. If you apply a GroupByKey transform to such a collection, its key objects might be serialized differently on different machines when a nondeterministic coder, such as the default pickle coder, is used. Since GroupByKey uses this serialized representation to compare keys, this may result in incorrect behavior. To ensure that the elements are always encoded and decoded in the same way, you need to define a deterministic Coder for the Player class.
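The risk of comparing keys by their pickled bytes can be reproduced in plain Python, without Beam. The sketch below is only an illustration under the assumption that keys are dictionaries; the function encode_deterministically is a hypothetical stand-in for a deterministic coder, not part of any Beam API.

```python
import json
import pickle

# Two dictionaries that compare equal but were built in a different order.
key_a = {'team': 'red', 'name': 'ana'}
key_b = {'name': 'ana', 'team': 'red'}

# pickle writes dict items in insertion order, so equal keys can differ in bytes.
pickled_a = pickle.dumps(key_a)
pickled_b = pickle.dumps(key_b)


def encode_deterministically(key):
    # Sorting the fields makes the byte string depend only on the content.
    return json.dumps(key, sort_keys=True).encode('utf-8')


stable_a = encode_deterministically(key_a)
stable_b = encode_deterministically(key_b)
```

Here key_a and key_b are equal as Python objects, yet their pickled forms differ, so a grouping step that compares encoded bytes would place them in separate groups. The sorted encoding yields identical bytes for both.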
The following code shows the example Player class and how to define a Coder for it. When you use type hints, Beam infers which Coders to use, using beam.coders.registry. The following code registers PlayerCoder as a coder for the Player class. In the example, the input type declared for CombinePerKey is Tuple[Player, int]. In this case, Beam infers that the Coder objects to use are TupleCoder, PlayerCoder, and VarIntCoder.

    class Player(object):
        def __init__(self, team, name):
            self.team = team
            self.name = name

    class PlayerCoder(beam.coders.Coder):
        def encode(self, player):
            # Coders must return bytes, so encode the text form explicitly.
            return ('%s:%s' % (player.team, player.name)).encode('utf-8')

        def decode(self, s):
            # Reverse of encode: bytes to text, then to Player(team, name).
            return Player(*s.decode('utf-8').split(':'))

        def is_deterministic(self):
            return True

    # Register PlayerCoder as the coder for the Player class.
    beam.coders.registry.register_coder(Player, PlayerCoder)

    def parse_player_and_score(csv):
        name, team, score = csv.split(',')
        return Player(team, name), int(score)

    totals = (
        lines
        | beam.Map(parse_player_and_score)
        | beam.CombinePerKey(sum).with_input_types(
            beam.typehints.Tuple[Player, int]))