ParDo

A transform for generic parallel processing. A ParDo transform considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero or more elements to an output PCollection.
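The element-wise contract described above can be modeled in a few lines of plain Python. This is a toy, single-process sketch that does not use Beam. The names `par_do_model` and `explode_or_drop` are hypothetical. The model calls a function once per element and flattens whatever that function yields, so one input element can produce zero, one, or many outputs. A real runner processes elements in parallel across workers, so a PCollection has no guaranteed order. The model preserves order only because it runs sequentially.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def par_do_model(elements: Iterable[T], fn: Callable[[T], Iterable[U]]) -> List[U]:
    """Toy, single-process model of ParDo: call fn once per input element
    and flatten everything it yields into a single output list."""
    output: List[U] = []
    for element in elements:
        output.extend(fn(element))  # fn may yield zero, one, or many values
    return output


def explode_or_drop(line: str) -> Iterator[str]:
    """Yields one output per whitespace-separated word; a blank line yields nothing."""
    for word in line.split():
        yield word


print(par_do_model(["a b", "", "c"], explode_or_drop))
# ['a', 'b', 'c']
```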

For more information, see the Beam Programming Guide.

Examples

In the following examples, we show how to create custom DoFns and how to access an element's timestamp and window information.

Example 1: ParDo with a simple DoFn

The following example defines a simple DoFn class called SplitWords, which stores the delimiter as an instance attribute. Beam calls the process method once per input element, and process can yield zero or more output elements.

import apache_beam as beam

class SplitWords(beam.DoFn):
  def __init__(self, delimiter=','):
    self.delimiter = delimiter

  def process(self, text):
    for word in text.split(self.delimiter):
      yield word

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '🍓Strawberry,🥕Carrot,🍆Eggplant',
          '🍅Tomato,🥔Potato',
      ])
      | 'Split words' >> beam.ParDo(SplitWords(','))
      | beam.Map(print)
  )

Output PCollection after ParDo:

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato
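Because process is an ordinary generator method, its logic can be checked without building a pipeline. The sketch below copies the process body into a plain class, `SplitWordsSketch`, so it runs without Beam installed. The `skip_empty` option is a hypothetical addition and is not part of the example above. The sketch also shows an edge case worth knowing about. `str.split` keeps empty strings between adjacent delimiters, so an input such as `'🍅Tomato,,🥔Potato'` would otherwise emit an empty element.

```python
class SplitWordsSketch:
    """Stand-in for the SplitWords DoFn, without the beam.DoFn base class,
    so the process logic can be exercised with plain Python.
    skip_empty is a hypothetical option, not part of the original example."""

    def __init__(self, delimiter=',', skip_empty=False):
        self.delimiter = delimiter
        self.skip_empty = skip_empty

    def process(self, text):
        for word in text.split(self.delimiter):
            # str.split keeps '' between adjacent delimiters; optionally drop it.
            if self.skip_empty and not word:
                continue
            yield word


print(list(SplitWordsSketch(',').process('🍅Tomato,,🥔Potato')))
# ['🍅Tomato', '', '🥔Potato']
print(list(SplitWordsSketch(',', skip_empty=True).process('🍅Tomato,,🥔Potato')))
# ['🍅Tomato', '🥔Potato']
```

Calling `list(...)` on the result of process drains the generator, which is also a convenient pattern for unit tests of DoFn logic.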

Example 2: ParDo with timestamp and window information

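In this example, the process method declares two extra parameters whose default values are beam.DoFn.TimestampParam and beam.DoFn.WindowParam. At runtime, Beam binds them to the timestamp and window of the element being processed.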
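Beam decides what to pass for these parameters by looking at their default values (the DoFn.TimestampParam and DoFn.WindowParam markers), not at the parameter names. The toy model below imitates that idea with `inspect.signature`. The sentinel class `_Param`, the markers `TIMESTAMP_PARAM` and `WINDOW_PARAM`, and the helper `invoke_process` are all hypothetical and are not Beam's actual implementation.

```python
import inspect


class _Param:
    """Hypothetical sentinel marker, a toy stand-in for beam.DoFn.TimestampParam etc."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"_Param({self.name!r})"


TIMESTAMP_PARAM = _Param("timestamp")  # hypothetical markers for this sketch
WINDOW_PARAM = _Param("window")


def invoke_process(process, element, context):
    """Call process(element, ...), filling every parameter whose default is a
    sentinel with the matching runtime value taken from `context`."""
    kwargs = {}
    for name, param in inspect.signature(process).parameters.items():
        if isinstance(param.default, _Param):
            # Binding is driven by the default value, so the parameter name is free.
            kwargs[name] = context[param.default.name]
    return list(process(element, **kwargs))


def process(elem, timestamp=TIMESTAMP_PARAM, window=WINDOW_PARAM):
    yield (elem, timestamp, window)


print(invoke_process(process, ":)", {"timestamp": 1584675660, "window": (1584675660, 1584675690)}))
# [(':)', 1584675660, (1584675660, 1584675690))]
```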

import apache_beam as beam

class AnalyzeElement(beam.DoFn):
  def process(self, elem, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
    yield '\n'.join([
        '# timestamp',
        'type(timestamp) -> ' + repr(type(timestamp)),
        'timestamp.micros -> ' + repr(timestamp.micros),
        'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
        'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
        '',
        '# window',
        'type(window) -> ' + repr(type(window)),
        'window.start -> {} ({})'.format(window.start, window.start.to_utc_datetime()),
        'window.end -> {} ({})'.format(window.end, window.end.to_utc_datetime()),
        'window.max_timestamp() -> {} ({})'.format(window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
    ])

with beam.Pipeline() as pipeline:
  dofn_params = (
      pipeline
      | 'Create a single test element' >> beam.Create([':)'])
      | 'Add timestamp (Spring equinox 2020)' >> beam.Map(
          lambda elem: beam.window.TimestampedValue(elem, 1584675660))
      | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
      | 'Analyze element' >> beam.ParDo(AnalyzeElement())
      | beam.Map(print)
  )

stdout output:

# timestamp
type(timestamp) -> <class 'apache_beam.utils.timestamp.Timestamp'>
timestamp.micros -> 1584675660000000
timestamp.to_rfc3339() -> '2020-03-20T03:41:00Z'
timestamp.to_utc_datetime() -> datetime.datetime(2020, 3, 20, 3, 41)

# window
type(window) -> <class 'apache_beam.transforms.window.IntervalWindow'>
window.start -> Timestamp(1584675660) (2020-03-20 03:41:00)
window.end -> Timestamp(1584675690) (2020-03-20 03:41:30)
window.max_timestamp() -> Timestamp(1584675689.999999) (2020-03-20 03:41:29.999999)
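The window bounds in this output follow from simple arithmetic. The sketch below assumes that FixedWindows aligns windows to multiples of the window size, optionally shifted by an offset (zero here). It also assumes time is measured in microseconds, as Beam's Timestamp is (see `timestamp.micros` above). `fixed_window_bounds_micros` is a hypothetical helper, not a Beam API.

```python
from datetime import datetime, timezone

MICROS_PER_SECOND = 1_000_000


def fixed_window_bounds_micros(ts_micros, size_s, offset_s=0):
    """Return (start, end, max_timestamp) in microseconds for the fixed window
    containing ts_micros. Windows are half-open: [start, end)."""
    size = size_s * MICROS_PER_SECOND
    offset = offset_s * MICROS_PER_SECOND
    # Align the timestamp down to the nearest window boundary.
    start = ts_micros - (ts_micros - offset) % size
    end = start + size
    # The last instant still inside the window is one microsecond before end.
    return start, end, end - 1


start, end, max_ts = fixed_window_bounds_micros(1584675660 * MICROS_PER_SECOND, 30)
print(start, end, max_ts)
# 1584675660000000 1584675690000000 1584675689999999
print(datetime.fromtimestamp(start / MICROS_PER_SECOND, tz=timezone.utc))
# 2020-03-20 03:41:00+00:00
```

With a 30-second window size, a timestamp of 03:41:29.999999 still maps to the window that starts at 03:41:00, while 03:41:30 starts the next window.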

Example 3: ParDo with DoFn methods

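A DoFn can override several lifecycle methods to implement more complex behavior. setup and teardown run once per DoFn instance. setup runs before the instance processes any elements, which makes it a good place to open connections or load resources. teardown runs when the instance is shut down, on a best-effort basis. start_bundle and finish_bundle run at the beginning and end of each bundle of elements. Because finish_bundle is not tied to a particular input element, any output it yields must be a WindowedValue that specifies its own timestamp and windows.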

import apache_beam as beam

class DoFnMethods(beam.DoFn):
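  def __init__(self):
    print('__init__')
    # Fallback window, used if finish_bundle runs before any element is processed.
    self.window = beam.window.GlobalWindow()

  def setup(self):
    print('setup')

  def start_bundle(self):
    print('start_bundle')

  def process(self, element, window=beam.DoFn.WindowParam):
    # Remember the element's window so finish_bundle can reuse it.
    self.window = window
    yield '* process: ' + element

  def finish_bundle(self):
    # finish_bundle has no current element, so its output must be a
    # WindowedValue with an explicit timestamp and window.
    yield beam.utils.windowed_value.WindowedValue(
        value='* finish_bundle: 🌱🌳🌍',
        timestamp=0,
        windows=[self.window],
    )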

  def teardown(self):
    print('teardown')

with beam.Pipeline() as pipeline:
  results = (
      pipeline
      | 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔'])
      | 'DoFn methods' >> beam.ParDo(DoFnMethods())
      | beam.Map(print)
  )

stdout output:

__init__
setup
start_bundle
* process: 🍓
* process: 🥕
* process: 🍆
* process: 🍅
* process: 🥔
* finish_bundle: 🌱🌳🌍
teardown
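In the output above, start_bundle and finish_bundle each appear once, so all five elements were processed in a single bundle. The runner chooses bundle boundaries, not the pipeline author, so the same DoFn may see many bundles. The plain-Python sketch below is not Beam code, and `LifecycleModel` and `run_bundles` are hypothetical names. It records the call order for one DoFn instance that processes two bundles sequentially. Real runners may create several DoFn instances, each with its own setup call, and call teardown only on a best-effort basis.

```python
class LifecycleModel:
    """Toy DoFn-like class that records the order of lifecycle calls."""

    def __init__(self):
        self.calls = []

    def setup(self):
        self.calls.append("setup")

    def start_bundle(self):
        self.calls.append("start_bundle")

    def process(self, element):
        self.calls.append(f"process {element}")
        yield element

    def finish_bundle(self):
        self.calls.append("finish_bundle")

    def teardown(self):
        self.calls.append("teardown")


def run_bundles(dofn, bundles):
    """Hypothetical driver: setup once, then start/process/finish per bundle,
    then teardown once. Returns everything process yielded, in order."""
    outputs = []
    dofn.setup()
    for bundle in bundles:
        dofn.start_bundle()
        for element in bundle:
            outputs.extend(dofn.process(element))  # drains the generator
        dofn.finish_bundle()
    dofn.teardown()
    return outputs


fn = LifecycleModel()
run_bundles(fn, [["🍓", "🥕"], ["🍆"]])
print(fn.calls)
# ['setup', 'start_bundle', 'process 🍓', 'process 🥕', 'finish_bundle',
#  'start_bundle', 'process 🍆', 'finish_bundle', 'teardown']
```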

Known issues:

  • [BEAM-7885] DoFn.setup() doesn’t run for streaming jobs running in the DirectRunner.
  • [BEAM-7340] DoFn.teardown() metrics are lost.