ParDo
A transform for generic parallel processing.
A ParDo transform considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero or more elements to an output PCollection.
See more information in the Beam Programming Guide.
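To make the "zero or more outputs per element" contract concrete, here is a minimal plain-Python model of what ParDo does conceptually. It is only a sketch: `par_do` and `evens_twice` are hypothetical helpers, not part of the Beam API, and the model ignores everything a real runner handles (parallelism, bundles, windows, timestamps).

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def par_do(elements: Iterable[T], fn: Callable[[T], Iterable[U]]) -> Iterator[U]:
    """Hypothetical helper: apply `fn` to each element and flatten what it yields.

    Mirrors the ParDo contract (one call per element, zero or more outputs
    per call) but runs sequentially in a single process.
    """
    for element in elements:
        # Each call may yield nothing, one value, or many values.
        yield from fn(element)


def evens_twice(n: int) -> Iterator[int]:
    # Odd numbers produce zero outputs; even numbers produce two outputs.
    if n % 2 == 0:
        yield n
        yield n


print(list(par_do([1, 2, 3, 4], evens_twice)))  # [2, 2, 4, 4]
```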
Examples
In the following examples, we explore how to create custom DoFns and access the timestamp and windowing information.
Example 1: ParDo with a simple DoFn
The following example defines a simple DoFn class called SplitWords, which stores the delimiter as an object field. The process method is called once per element, and it can yield zero or more output elements.
```python
import apache_beam as beam

class SplitWords(beam.DoFn):
  def __init__(self, delimiter=','):
    # Store the delimiter on the DoFn instance.
    self.delimiter = delimiter

  def process(self, text):
    # Called once per input element; yields one output per word.
    for word in text.split(self.delimiter):
      yield word

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '🍓Strawberry,🥕Carrot,🍆Eggplant',
          '🍅Tomato,🥔Potato',
      ])
      | 'Split words' >> beam.ParDo(SplitWords(','))
      | beam.Map(print))
```
Output:

```
🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato
```
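Because process is an ordinary generator method, its logic can be checked without running a pipeline by calling it directly and collecting what it yields. The sketch below copies the body of SplitWords into a plain class (`SplitWordsLogic`, a hypothetical name) so it runs without Apache Beam installed; with Beam available, the same direct calls should work on SplitWords itself. The hypothetical `SplitNonEmptyWords` variant shows the "zero outputs" case by skipping empty fields.

```python
class SplitWordsLogic:
    """Plain-Python copy of the SplitWords DoFn body (no beam.DoFn base class)."""

    def __init__(self, delimiter=','):
        self.delimiter = delimiter

    def process(self, text):
        for word in text.split(self.delimiter):
            yield word


class SplitNonEmptyWords(SplitWordsLogic):
    """Hypothetical variant: skips empty fields, so some inputs yield nothing."""

    def process(self, text):
        for word in super().process(text):
            if word:
                yield word


# process() returns a generator; list() collects every output for one input.
print(list(SplitWordsLogic(',').process('🍅Tomato,🥔Potato')))
# ['🍅Tomato', '🥔Potato']
print(list(SplitWordsLogic(',').process('a,,b')))
# ['a', '', 'b']
print(list(SplitNonEmptyWords(',').process(',,')))
# []
```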
Example 2: ParDo with timestamp and window information
In this example, we add new parameters to the process method to bind parameter values at runtime.

- beam.DoFn.TimestampParam binds the timestamp information as an apache_beam.utils.timestamp.Timestamp object.
- beam.DoFn.WindowParam binds the window information as the appropriate apache_beam.transforms.window.*Window object.
```python
import apache_beam as beam

class AnalyzeElement(beam.DoFn):
  def process(
      self,
      elem,
      timestamp=beam.DoFn.TimestampParam,
      window=beam.DoFn.WindowParam):
    # The runner fills in timestamp and window for each element.
    yield '\n'.join([
        '# timestamp',
        'type(timestamp) -> ' + repr(type(timestamp)),
        'timestamp.micros -> ' + repr(timestamp.micros),
        'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
        'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
        '',
        '# window',
        'type(window) -> ' + repr(type(window)),
        'window.start -> {} ({})'.format(
            window.start, window.start.to_utc_datetime()),
        'window.end -> {} ({})'.format(
            window.end, window.end.to_utc_datetime()),
        'window.max_timestamp() -> {} ({})'.format(
            window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
    ])

with beam.Pipeline() as pipeline:
  dofn_params = (
      pipeline
      | 'Create a single test element' >> beam.Create([':)'])
      | 'Add timestamp (Spring equinox 2020)' >>
      beam.Map(lambda elem: beam.window.TimestampedValue(elem, 1584675660))
      | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
      | 'Analyze element' >> beam.ParDo(AnalyzeElement())
      | beam.Map(print))
```
Output:

```
# timestamp
type(timestamp) -> <class 'apache_beam.utils.timestamp.Timestamp'>
timestamp.micros -> 1584675660000000
timestamp.to_rfc3339() -> '2020-03-20T03:41:00Z'
timestamp.to_utc_datetime() -> datetime.datetime(2020, 3, 20, 3, 41)

# window
type(window) -> <class 'apache_beam.transforms.window.IntervalWindow'>
window.start -> Timestamp(1584675660) (2020-03-20 03:41:00)
window.end -> Timestamp(1584675690) (2020-03-20 03:41:30)
window.max_timestamp() -> Timestamp(1584675689.999999) (2020-03-20 03:41:29.999999)
```
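The window values in this output follow from simple arithmetic. Assuming FixedWindows(size) with its default zero offset, an element with timestamp t falls into the window that starts at t - (t mod size) and ends at start + size. The end is exclusive, so max_timestamp() is one microsecond before end, matching Beam's microsecond precision. The sketch below reproduces the numbers above with only the standard library; `fixed_window_bounds` and `to_utc` are hypothetical helpers, not Beam functions.

```python
from datetime import datetime, timezone


def fixed_window_bounds(ts_seconds: int, size_seconds: int):
    """Hypothetical helper: bounds of the fixed window containing ts.

    Assumes offset 0. All values are integer microseconds since the Unix epoch.
    """
    ts_micros = ts_seconds * 1_000_000
    size_micros = size_seconds * 1_000_000
    start = ts_micros - ts_micros % size_micros
    end = start + size_micros
    max_timestamp = end - 1  # end is exclusive: last valid instant is 1 microsecond earlier
    return ts_micros, start, end, max_timestamp


def to_utc(micros: int) -> datetime:
    # Split into whole seconds and microseconds to avoid float rounding.
    seconds, remainder = divmod(micros, 1_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=remainder)


ts, start, end, max_ts = fixed_window_bounds(1584675660, 30)
print(ts)                      # 1584675660000000
print(to_utc(ts).isoformat())  # 2020-03-20T03:41:00+00:00
print(to_utc(start))           # 2020-03-20 03:41:00+00:00
print(to_utc(end))             # 2020-03-20 03:41:30+00:00
print(to_utc(max_ts))          # 2020-03-20 03:41:29.999999+00:00
```

Because 1584675660 is an exact multiple of 30, the element sits at the very start of its window; a timestamp such as 1584675675 would land in the same window.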
Example 3: ParDo with DoFn methods
A DoFn can be customized with a number of methods that can help create more complex behaviors. You can customize what a worker does when it starts and shuts down with setup and teardown. You can also customize what to do when a bundle of elements starts and finishes with start_bundle and finish_bundle.
- DoFn.setup(): Called whenever the DoFn instance is deserialized on the worker. This means it can be called more than once per worker, because multiple instances of a given DoFn subclass may be created (e.g., due to parallelization, or due to garbage collection after a period of disuse). This is a good place to connect to database instances, open network connections, or acquire other resources.
- DoFn.start_bundle(): Called once per bundle of elements, before calling process on the first element of the bundle. This is a good place to start keeping track of the bundle elements.
- DoFn.process(element, *args, **kwargs): Called once per element; it can yield zero or more elements. Additional *args or **kwargs can be passed through beam.ParDo(). [required]
- DoFn.finish_bundle(): Called once per bundle of elements, after calling process on the last element of the bundle; it can yield zero or more elements. This is a good place to do batch calls on a bundle of elements, such as running a database query. For example, you can initialize a batch in start_bundle, add elements to the batch in process instead of yielding them, then run a batch query on those elements in finish_bundle and yield all the results.

  Note that elements yielded from finish_bundle must be of the type apache_beam.utils.windowed_value.WindowedValue. You need to provide a timestamp as a Unix timestamp, which you can get from the last processed element. You also need to provide a window, which you can get from the last processed element, as in the example below.
- DoFn.teardown(): Called once (as a best effort) per DoFn instance when the DoFn instance is shutting down. This is a good place to close database instances, close network connections, or release other resources.

  Note that teardown is called as a best effort and is not guaranteed. For example, if the worker crashes, teardown might not be called.
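The batching pattern described for finish_bundle can be sketched without a runner. Below, the hypothetical `BatchingLogic` class resets a buffer in start_bundle, buffers elements in process instead of yielding them, and in finish_bundle performs one batched lookup (`fake_batch_lookup`, a hypothetical stand-in for a real database query) before yielding the results. `run_bundles` is a hypothetical driver that calls the methods in the order listed above. A real Beam runner chooses bundle boundaries itself, and in Beam the finish_bundle outputs would also need to be wrapped in WindowedValue, as noted above.

```python
from typing import Dict, Iterable, Iterator, List, Tuple


def fake_batch_lookup(keys: List[str]) -> Dict[str, int]:
    """Hypothetical stand-in for one batched database query."""
    return {key: len(key) for key in keys}


class BatchingLogic:
    """Plain-Python sketch of the batching pattern (no beam.DoFn base class)."""

    def __init__(self):
        self.calls: List[str] = []  # lifecycle calls, in order, for inspection
        self.lookups = 0  # number of batched queries issued
        self.batch: List[str] = []

    def setup(self):
        self.calls.append('setup')  # e.g. open a database connection here

    def start_bundle(self):
        self.calls.append('start_bundle')
        self.batch = []  # fresh buffer for every bundle

    def process(self, element: str) -> Iterator[Tuple[str, int]]:
        self.calls.append('process')
        self.batch.append(element)  # buffer the element instead of yielding it
        yield from ()  # generator that emits nothing per element

    def finish_bundle(self) -> Iterator[Tuple[str, int]]:
        self.calls.append('finish_bundle')
        self.lookups += 1
        results = fake_batch_lookup(self.batch)  # one query for the whole bundle
        for key in self.batch:
            yield key, results[key]

    def teardown(self):
        self.calls.append('teardown')  # e.g. close the connection here


def run_bundles(dofn: BatchingLogic, bundles: Iterable[List[str]]) -> List[Tuple[str, int]]:
    """Hypothetical driver: one instance, several bundles, methods in lifecycle order."""
    outputs: List[Tuple[str, int]] = []
    dofn.setup()
    for bundle in bundles:
        dofn.start_bundle()
        for element in bundle:
            outputs.extend(dofn.process(element))
        outputs.extend(dofn.finish_bundle())
    dofn.teardown()  # always called here; only best effort on a real runner
    return outputs


fn = BatchingLogic()
print(run_bundles(fn, [['apple', 'kiwi'], ['fig']]))
# [('apple', 5), ('kiwi', 4), ('fig', 3)]
print(fn.lookups)  # 2: one batched query per bundle
```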
```python
import apache_beam as beam

class DoFnMethods(beam.DoFn):
  def __init__(self):
    print('__init__')
    # Default window until process() sees a real element.
    self.window = beam.transforms.window.GlobalWindow()

  def setup(self):
    print('setup')

  def start_bundle(self):
    print('start_bundle')

  def process(self, element, window=beam.DoFn.WindowParam):
    # Remember the last window so finish_bundle can reuse it.
    self.window = window
    yield '* process: ' + element

  def finish_bundle(self):
    # Outputs from finish_bundle must be WindowedValue instances.
    yield beam.utils.windowed_value.WindowedValue(
        value='* finish_bundle: 🌱🌳🌍',
        timestamp=0,
        windows=[self.window],
    )

  def teardown(self):
    print('teardown')

with beam.Pipeline() as pipeline:
  results = (
      pipeline
      | 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔'])
      | 'DoFn methods' >> beam.ParDo(DoFnMethods())
      | beam.Map(print))
```
Output:
Known issues:

- [Issue 19394] DoFn.teardown() metrics are lost.
Related transforms
- Map behaves the same, but produces exactly one output for each input.
- FlatMap behaves the same as Map, but for each input it may produce zero or more outputs.
- Filter is useful if the function is just deciding whether to output an element or not.
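One way to see how these transforms relate to ParDo is to express each as a per-element function that returns zero or more outputs. The plain-Python sketch below does that with a hypothetical `par_do` helper and hypothetical `as_map`, `as_flat_map`, and `as_filter` adapters; it models only the element-wise semantics, not how Beam actually implements Map, FlatMap, or Filter.

```python
from typing import Callable, Iterable, List


def par_do(elements: Iterable, fn: Callable[[object], Iterable]) -> List:
    """Hypothetical helper: call fn once per element and flatten its outputs."""
    return [out for element in elements for out in fn(element)]


def as_map(f):
    # Map: wrap the single result, so every input gives exactly one output.
    return lambda x: [f(x)]


def as_flat_map(f):
    # FlatMap: f already returns an iterable of zero or more outputs.
    return f


def as_filter(predicate):
    # Filter: emit the input unchanged, or emit nothing.
    return lambda x: [x] if predicate(x) else []


lines = ['strawberry,carrot', '', 'tomato']
print(par_do(lines, as_map(len)))
# [17, 0, 6]
print(par_do(lines, as_flat_map(lambda s: s.split(',') if s else [])))
# ['strawberry', 'carrot', 'tomato']
print(par_do(lines, as_filter(bool)))
# ['strawberry,carrot', 'tomato']
```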
Last updated on 2023/05/31