apache_beam.transforms.ptransform module

PTransform and descendants.

A PTransform is an object describing (not executing) a computation. The actual execution semantics for a transform are captured by a runner object. A transform object always belongs to a pipeline object.

A PTransform derived class needs to define the expand() method that describes how one or more PValues are created by the transform.

The module defines a few standard transforms: FlatMap (parallel do), GroupByKey (group by key), etc. Note that the expand() methods for these classes contain code that will add nodes to the processing graph associated with a pipeline.

As support for the FlatMap transform, the module also defines a DoFn class and a wrapper class that allows lambda functions to be used as FlatMap processing functions.
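
For instance, a plain lambda can be passed directly to FlatMap. The sketch below is illustrative only; the sample data and pipeline setup are assumptions, not part of this module:

import apache_beam as beam

with beam.Pipeline() as p:
  # FlatMap accepts a lambda; each input element maps to zero or more outputs.
  words = (
      p
      | beam.Create(['to be', 'or not'])
      | beam.FlatMap(lambda line: line.split()))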

class apache_beam.transforms.ptransform.PTransform(label=None)[source]

Bases: apache_beam.typehints.decorators.WithTypeHints, apache_beam.transforms.display.HasDisplayData

A transform object used to modify one or more PCollections.

Subclasses must define an expand() method that will be used when the transform is applied to some arguments. The typical usage pattern is:

input | CustomTransform(…)

The expand() method of the CustomTransform instance will then be called with input as its argument.
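
For example, a minimal subclass might look like the following sketch (FilterEvens and the sample data are hypothetical, shown only to illustrate the pattern):

import apache_beam as beam

class FilterEvens(beam.PTransform):
  """Illustrative transform that keeps only even integers."""

  def expand(self, pcoll):
    # expand() receives the input PValue(s) and returns the resulting PValue.
    return pcoll | beam.Filter(lambda x: x % 2 == 0)

with beam.Pipeline() as p:
  evens = (
      p
      | beam.Create([1, 2, 3, 4])
      | 'KeepEvens' >> FilterEvens())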

side_inputs = ()
pipeline = None
label
default_label()[source]
default_type_hints()[source]
with_input_types(input_type_hint)[source]

Annotates the input type of a PTransform with a type-hint.

Parameters: input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.
Raises: TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.
Returns: A reference to this PTransform instance, which allows type-hinting methods to be chained.
Return type: PTransform
with_output_types(type_hint)[source]

Annotates the output type of a PTransform with a type-hint.

Parameters: type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint.
Raises: TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.
Returns: A reference to this PTransform instance, which allows type-hinting methods to be chained.
Return type: PTransform
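
Because both type-hint methods return the transform itself, they can be chained on a transform instance. A minimal sketch; the Map transform and the sample values are assumptions chosen for illustration:

import apache_beam as beam

# Annotate a transform instance with input and output type-hints; each call
# returns the same instance, so the calls can be chained.
stringify = (
    beam.Map(str)
    .with_input_types(int)
    .with_output_types(str))

with beam.Pipeline() as p:
  _ = p | beam.Create([1, 2, 3]) | 'Stringify' >> stringify
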
type_check_inputs(pvalueish)[source]
infer_output_type(unused_input_type)[source]
type_check_outputs(pvalueish)[source]
type_check_inputs_or_outputs(pvalueish, input_or_output)[source]
expand(input_or_inputs)[source]
get_windowing(inputs)[source]

Returns the windowing function to be associated with the transform's output.

By default, most transforms simply return the windowing function associated with the input PCollection (or with the first input, if there are several).
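
A subclass can override get_windowing() to declare a different windowing for its output. A minimal sketch, assuming the output is re-windowed into the global window; IntoGlobalWindow is a hypothetical name:

import apache_beam as beam
from apache_beam.transforms.core import Windowing
from apache_beam.transforms.window import GlobalWindows

class IntoGlobalWindow(beam.PTransform):
  """Hypothetical transform whose output is globally windowed."""

  def expand(self, pcoll):
    # Re-window the elements into the global window.
    return pcoll | beam.WindowInto(GlobalWindows())

  def get_windowing(self, inputs):
    # Declare the output windowing; the default implementation would
    # return the windowing of the first input instead.
    return Windowing(GlobalWindows())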

classmethod register_urn(urn, parameter_type, constructor=None)[source]
to_runner_api(context, has_parts=False, **extra_kwargs)[source]
classmethod from_runner_api(proto, context)[source]
to_runner_api_parameter(unused_context)[source]
to_runner_api_pickled(unused_context)[source]
runner_api_requires_keyed_input()[source]
apache_beam.transforms.ptransform.ptransform_fn(fn)[source]

A decorator for a function-based PTransform.

Parameters: fn – A function implementing a custom PTransform.
Returns: A CallablePTransform instance wrapping the function-based PTransform.

This wrapper provides an alternative, simpler way to define a PTransform. The standard method is to subclass from PTransform and override the expand() method. An equivalent effect can be obtained by defining a function that accepts an input PCollection and additional optional arguments and returns a resulting PCollection. For example:

@ptransform_fn
@beam.typehints.with_input_types(..)
@beam.typehints.with_output_types(..)
def CustomMapper(pcoll, mapfn):
  return pcoll | ParDo(mapfn)

The equivalent approach using PTransform subclassing:

@beam.typehints.with_input_types(..)
@beam.typehints.with_output_types(..)
class CustomMapper(PTransform):

  def __init__(self, mapfn):
    super(CustomMapper, self).__init__()
    self.mapfn = mapfn

  def expand(self, pcoll):
    return pcoll | ParDo(self.mapfn)

With either method the custom PTransform can be used in pipelines as if it were one of the “native” PTransforms:

result_pcoll = input_pcoll | 'Label' >> CustomMapper(somefn)

Note that for both solutions the underlying implementation of the pipe operator (i.e., |) will inject the pcoll argument in its proper place (first argument if no label was specified and second argument otherwise).
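
For instance, continuing the hypothetical CustomMapper example above, both of the following applications work; the difference is only whether an explicit label is attached:

# pcoll is injected as the first argument of the decorated function; the
# optional label is consumed by the >> operator.
unlabeled = input_pcoll | CustomMapper(somefn)
labeled = input_pcoll | 'MapWithLabel' >> CustomMapper(somefn)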

In Beam 2.x, type hint support for ptransform_fn needs to be enabled via the --type_check_additional=ptransform_fn flag. If CustomMapper is a Cython function, input and output types can still be specified, provided the type-hint decorators are applied before @ptransform_fn, as in the example above.

apache_beam.transforms.ptransform.label_from_callable(fn)[source]