public class ParDo
extends java.lang.Object
ParDo is the core element-wise transform in Apache Beam, invoking a user-specified function on each of the elements of the input PCollection to produce zero or more output elements, all of which are collected into the output PCollection.
Elements are processed independently, and possibly in parallel across distributed cloud resources.
The ParDo processing style is similar to what happens inside the "Mapper" or "Reducer" class of a MapReduce-style algorithm.
DoFns
The function to use to process each element is specified by a DoFn<InputT, OutputT>, primarily via its ProcessElement method. The DoFn may also provide StartBundle and FinishBundle methods.
Conceptually, when a ParDo transform is executed, the elements of the input PCollection are first divided up into some number of "bundles". These are farmed off to distributed worker machines (or run locally, if using the DirectRunner).
For each bundle of input elements, processing proceeds as follows:
1. If required, a fresh instance of the argument DoFn is created on a worker, and the DoFn.Setup method is called on this instance. This may be through deserialization or other means. A PipelineRunner may reuse DoFn instances for multiple bundles. A DoFn that has terminated abnormally (by throwing an Exception) will never be reused.
2. The DoFn's DoFn.StartBundle method, if provided, is called to initialize it.
3. The DoFn's DoFn.ProcessElement method is called on each of the input elements in the bundle.
4. The DoFn's DoFn.FinishBundle method, if provided, is called to complete its work. After DoFn.FinishBundle is called, the framework will not again invoke DoFn.ProcessElement or DoFn.FinishBundle until a new call to DoFn.StartBundle has occurred.
5. If any of the DoFn.Setup, DoFn.StartBundle, DoFn.ProcessElement or DoFn.FinishBundle methods throws an exception, the DoFn.Teardown method, if provided, will be called on the DoFn instance.
6. If a runner will no longer use a DoFn, the DoFn.Teardown method, if provided, will be called on the discarded instance.
Note also that calls to DoFn.Teardown are best effort, and may not be called before a DoFn is discarded in the general case. As a result, use of the DoFn.Teardown method to perform side effects is not appropriate, because the elements that produced the side effect will not be replayed in case of failure, and those side effects are permanently lost.
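The call ordering described above can be modeled in plain Java. The following is only a sketch and does not use the Beam API: LifecycleFn, RecordingFn, and BundleLifecycleSketch are hypothetical names, and the harness assumes a runner that calls teardown immediately after a failure, which real runners do only on a best-effort basis.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a DoFn's lifecycle hooks; this is not the Beam API. */
interface LifecycleFn {
    void setup();
    void startBundle();
    void processElement(String element);
    void finishBundle();
    void teardown();
}

/** Records every lifecycle call so that the order can be inspected afterwards. */
final class RecordingFn implements LifecycleFn {
    final List<String> calls = new ArrayList<>();

    public void setup() { calls.add("setup"); }
    public void startBundle() { calls.add("startBundle"); }
    public void processElement(String element) {
        // An empty element simulates a user-code failure.
        if (element.isEmpty()) {
            throw new IllegalArgumentException("empty element");
        }
        calls.add("process:" + element);
    }
    public void finishBundle() { calls.add("finishBundle"); }
    public void teardown() { calls.add("teardown"); }
}

final class BundleLifecycleSketch {
    /** Runs one bundle; returns true if the instance may be reused for another bundle. */
    static boolean runBundle(LifecycleFn fn, boolean freshInstance, List<String> bundle) {
        try {
            if (freshInstance) {
                fn.setup();              // step 1: only for a newly created instance
            }
            fn.startBundle();            // step 2
            for (String element : bundle) {
                fn.processElement(element); // step 3
            }
            fn.finishBundle();           // step 4
            return true;
        } catch (RuntimeException e) {
            fn.teardown();               // step 5: the failed instance is discarded
            return false;
        }
    }
}
```

In this model a failing element skips finishBundle entirely and goes straight to teardown, which is one reason the text advises against relying on either hook for essential side effects.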
Each of the calls to any of the DoFn's processing methods can produce zero or more output elements. All of the output elements from all of the DoFn instances are included in an output PCollection.
For example:
PCollection<String> lines = ...;
PCollection<String> words =
    lines.apply(ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(@Element String line,
            OutputReceiver<String> r) {
          for (String word : line.split("[^a-zA-Z']+")) {
            r.output(word);
          }
        }}));
PCollection<Integer> wordLengths =
    words.apply(ParDo.of(new DoFn<String, Integer>() {
        @ProcessElement
        public void processElement(@Element String word,
            OutputReceiver<Integer> r) {
          Integer length = word.length();
          r.output(length);
        }}));
Each output element has the same timestamp and is in the same windows as its corresponding input element, and the output PCollection has the same WindowFn associated with it as the input.
Naming ParDo transforms
The name of a transform is used to provide a name for any node in the Pipeline graph resulting from application of the transform. It is best practice to provide a name at the time of application, via PCollection.apply(String, PTransform). Otherwise, a unique name (which may not be stable across pipeline revisions) will be generated, based on the transform name.
For example:
PCollection<String> words =
lines.apply("ExtractWords", ParDo.of(new DoFn<String, String>() { ... }));
PCollection<Integer> wordLengths =
words.apply("ComputeWordLengths", ParDo.of(new DoFn<String, Integer>() { ... }));
While a ParDo processes elements from a single "main input" PCollection, it can take additional "side input" PCollectionViews. These side input PCollectionViews express styles of accessing PCollections computed by earlier pipeline operations. They are passed in to the ParDo transform using ParDo.SingleOutput.withSideInputs(org.apache.beam.sdk.values.PCollectionView<?>...), and their contents are accessible to each of the DoFn operations via sideInput.
For example:
PCollection<String> words = ...;
PCollection<Integer> maxWordLengthCutOff = ...; // Singleton PCollection
final PCollectionView<Integer> maxWordLengthCutOffView =
    maxWordLengthCutOff.apply(View.<Integer>asSingleton());
PCollection<String> wordsBelowCutOff =
    words.apply(ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          String word = c.element();
          int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
          if (word.length() <= lengthCutOff) {
            c.output(word);
          }
        }}).withSideInputs(maxWordLengthCutOffView));
Optionally, a ParDo transform can produce multiple output PCollections: a "main output" PCollection<OutputT> plus any number of additional output PCollections, each keyed by a distinct TupleTag, and bundled in a PCollectionTuple. The TupleTags to be used for the output PCollectionTuple are specified by invoking ParDo.SingleOutput.withOutputTags(org.apache.beam.sdk.values.TupleTag<OutputT>, org.apache.beam.sdk.values.TupleTagList). Unconsumed outputs do not necessarily need to be explicitly specified, even if the DoFn generates them. Within the DoFn, an element is added to the main output PCollection as normal, using DoFn.WindowedContext.output(Object), while an element is added to any additional output PCollection using DoFn.WindowedContext.output(TupleTag, Object). For example:
PCollection<String> words = ...;
// Select words whose length is below a cut off,
// plus the lengths of words that are above the cut off.
// Also select words starting with "MARKER".
final int wordLengthCutOff = 10;
// Create tags to use for the main and additional outputs.
final TupleTag<String> wordsBelowCutOffTag =
    new TupleTag<String>(){};
final TupleTag<Integer> wordLengthsAboveCutOffTag =
    new TupleTag<Integer>(){};
final TupleTag<String> markedWordsTag =
    new TupleTag<String>(){};
PCollectionTuple results =
    words.apply(
        ParDo
        .of(new DoFn<String, String>() {
            // Create a tag for the unconsumed output.
            final TupleTag<String> specialWordsTag =
                new TupleTag<String>(){};
            @ProcessElement
            public void processElement(@Element String word, MultiOutputReceiver r) {
              if (word.length() <= wordLengthCutOff) {
                // Emit this short word to the main output.
                r.get(wordsBelowCutOffTag).output(word);
              } else {
                // Emit this long word's length to a specified output.
                r.get(wordLengthsAboveCutOffTag).output(word.length());
              }
              if (word.startsWith("MARKER")) {
                // Emit this word to a different specified output.
                r.get(markedWordsTag).output(word);
              }
              if (word.startsWith("SPECIAL")) {
                // Emit this word to the unconsumed output.
                r.get(specialWordsTag).output(word);
              }
            }})
        // Specify the main and consumed output tags of the
        // PCollectionTuple result:
        .withOutputTags(wordsBelowCutOffTag,
            TupleTagList.of(wordLengthsAboveCutOffTag)
                .and(markedWordsTag)));
// Extract the PCollection results, by tag.
PCollection<String> wordsBelowCutOff =
    results.get(wordsBelowCutOffTag);
PCollection<Integer> wordLengthsAboveCutOff =
    results.get(wordLengthsAboveCutOffTag);
PCollection<String> markedWords =
    results.get(markedWordsTag);
By default, the Coder<OutputT> for the elements of the main output PCollection<OutputT> is inferred from the concrete type of the DoFn<InputT, OutputT>.
By default, the Coder<AdditionalOutputT> for the elements of an output PCollection<AdditionalOutputT> is inferred from the concrete type of the corresponding TupleTag<AdditionalOutputT>. To be successful, the TupleTag should be created as an instance of a trivial anonymous subclass, with {} suffixed to the constructor call. Such uses block Java's generic type parameter inference, so the <X> argument must be provided explicitly.
For example:
// A TupleTag to use for a side input can be written concisely:
final TupleTag<Integer> sideInputTag = new TupleTag<>();
// A TupleTag to use for an output should be written with "{}",
// and explicit generic parameter type:
final TupleTag<String> additionalOutputTag = new TupleTag<String>(){};
This style of TupleTag instantiation is used in the example of ParDos that produce multiple outputs, above.
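Why the trailing {} matters can be illustrated with plain Java reflection. The sketch below uses a hypothetical TypedTag class, not Beam's TupleTag, and it is an assumption that Beam recovers the element type in a comparable way. It shows only the general language behavior: a plain generic instance loses its type argument to erasure, while an anonymous subclass records it in its class metadata.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical tag class used only for this illustration; it is not Beam's TupleTag. */
class TypedTag<T> {
    /** Returns the type argument recorded for this instance's class, or null if none exists. */
    Type capturedType() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            // An anonymous subclass extends TypedTag<String>, so the argument is recorded.
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        // A plain TypedTag instance extends Object directly; T was erased.
        return null;
    }
}

final class TypedTagDemo {
    /** Created without "{}": no type information survives at run time. */
    static Type plain() {
        return new TypedTag<String>().capturedType();
    }

    /** Created with "{}": the anonymous subclass remembers that T is String. */
    static Type anonymousSubclass() {
        return new TypedTag<String>() {}.capturedType();
    }
}
```

Here TypedTagDemo.plain() yields null, while TypedTagDemo.anonymousSubclass() yields String.class, which is the kind of information coder inference needs.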
Serializability of DoFns
A DoFn passed to a ParDo transform must be Serializable. This allows the DoFn instance created in this "main program" to be sent (in serialized form) to remote worker machines and reconstituted for bundles of elements of the input PCollection being processed. A DoFn can have instance variable state, and non-transient instance variable state will be serialized in the main program and then deserialized on remote worker machines for some number of bundles of elements to process.
DoFns expressed as anonymous inner classes can be convenient, but due to a quirk in Java's rules for serializability, non-static inner or nested classes (including anonymous inner classes) automatically capture their enclosing class's instance in their serialized state. This can lead to including much more than intended in the serialized state of a DoFn, or even things that aren't Serializable.
There are two ways to avoid unintended serialized state in a DoFn:
1. Define the DoFn as a named, static class.
2. Define the DoFn as an anonymous inner class inside of a static method.
Both of these approaches ensure that there is no implicit enclosing instance serialized along with the DoFn instance.
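The capture of the enclosing instance can be reproduced with standard Java serialization alone. In this sketch, SerializableFn and PipelineBuilder are hypothetical names standing in for a DoFn and for the class that builds a pipeline; no Beam classes are involved. The anonymous class created in an instance method reads a field of its enclosing object, so serializing it also tries to serialize that object.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical serializable function type standing in for a DoFn; not the Beam API. */
interface SerializableFn extends Serializable {
    String apply(String input);
}

/** An enclosing class that is deliberately NOT Serializable. */
final class PipelineBuilder {
    private final String prefix = "word:";

    /** Anonymous class in an instance method: it holds a reference to this PipelineBuilder. */
    SerializableFn fromInstanceMethod() {
        return new SerializableFn() {
            @Override
            public String apply(String input) {
                return prefix + input; // reads a field of the enclosing instance
            }
        };
    }

    /** Anonymous class in a static method: only the captured local variable is stored. */
    static SerializableFn fromStaticMethod(final String prefix) {
        return new SerializableFn() {
            @Override
            public String apply(String input) {
                return prefix + input; // reads the captured parameter
            }
        };
    }

    /** Returns true if Java serialization of the given object succeeds. */
    static boolean canSerialize(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            return false; // includes NotSerializableException
        }
    }
}
```

Serializing the result of fromInstanceMethod() fails because PipelineBuilder is not Serializable, whereas the result of fromStaticMethod("word:") serializes without dragging any enclosing object along.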
Prior to Java 8, any local variables of the enclosing method referenced from within an anonymous inner class needed to be marked as final. If defining the DoFn as a named static class, such variables would be passed as explicit constructor arguments and stored in explicit instance variables.
There are three main ways to initialize the state of a DoFn instance processing a bundle:
1. Define instance variable state, initialized by the DoFn's constructor (which is implicit for an anonymous inner class). This state will be automatically serialized and then deserialized in the DoFn instances created for bundles. This method is good for state known when the original DoFn is created in the main program, if it's not overly large. This is not suitable for any state which must only be used for a single bundle, as DoFns may be used to process multiple bundles.
2. Compute the state as a PCollection and pass it in as a side input to the DoFn. This is good if the state needs to be computed by the pipeline, or if the state is very large and so is best read from file(s) rather than sent as part of the DoFn's serialized state.
3. Initialize the state in each DoFn instance, in a DoFn.StartBundle method. This is good if the initialization doesn't depend on any information known only by the main program or computed by earlier pipeline operations, but is the same for all instances of this DoFn for all program executions, say setting up empty caches or initializing constant data.
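The difference between constructor-initialized state and per-bundle state can be sketched with plain Java serialization. LengthFn below is a hypothetical class, not a Beam DoFn; its startBundle method merely mirrors the name used in the text. The constructor argument survives the round trip to a "worker", while the transient cache does not and has to be rebuilt there.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical function object standing in for a DoFn; not the Beam API. */
final class LengthFn implements Serializable {
    private static final long serialVersionUID = 1L;

    // Set in the constructor; travels with the serialized instance.
    private final int cutOff;
    // Never serialized; must be rebuilt on the worker before elements are processed.
    private transient Map<String, Integer> cache;

    LengthFn(int cutOff) {
        this.cutOff = cutOff;
    }

    /** Per-bundle initialization that needs nothing from the main program. */
    void startBundle() {
        cache = new HashMap<>();
    }

    boolean isBelowCutOff(String word) {
        return cache.computeIfAbsent(word, String::length) <= cutOff;
    }

    boolean cacheIsInitialized() {
        return cache != null;
    }

    /** Simulates shipping the instance to a worker by serializing and deserializing it. */
    static LengthFn roundTrip(LengthFn fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LengthFn) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After roundTrip, the copy still knows its cutOff, but cacheIsInitialized() is false until startBundle() runs again.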
ParDo operations are intended to be able to run in parallel across multiple worker machines. This precludes easy sharing and updating of mutable state across those machines. There is no support in the Beam model for communicating and synchronizing updates to shared state across worker machines, so programs should not access any mutable static variable state in their DoFn without understanding that the Java processes for the main program and workers will each have their own independent copy of such state, and there won't be any automatic copying of that state across Java processes. All information should be communicated to DoFn instances via main and side inputs and serialized state, and all output should be communicated from a DoFn instance via output PCollections, in the absence of external communication mechanisms written by user code.
In a distributed system, things can fail: machines can crash, machines can be unable to communicate across the network, etc. While individual failures are rare, the larger the job, the greater the chance that something, somewhere, will fail. Beam runners may strive to mask such failures by retrying failed DoFn bundles. This means that a DoFn instance might process a bundle partially, then crash for some reason, then be rerun (often in a new JVM) on that same bundle and on the same elements as before. Sometimes two or more DoFn instances will be running on the same bundle simultaneously, with the system taking the results of the first instance to complete successfully. Consequently, the code in a DoFn needs to be written such that these duplicate (sequential or concurrent) executions do not cause problems. If the outputs of a DoFn are a pure function of its inputs, then this requirement is satisfied. However, if a DoFn's execution has external side effects, such as performing updates to external HTTP services, then the DoFn's code needs to take care to ensure that those updates are idempotent and that concurrent updates are acceptable. This property can be difficult to achieve, so it is advisable to strive to keep DoFns as pure functions as much as possible.
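The effect of a retried bundle on side effects can be shown with a small simulation. Everything here is hypothetical: ExternalStore is an in-memory stand-in for an external service, and RetrySketch models a runner that processes part of a bundle, fails, and then reruns the whole bundle once. It is not Beam code and says nothing about how a particular runner schedules retries.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for an external service; not part of Beam. */
final class ExternalStore {
    // Non-idempotent update: every call changes the result.
    int appendCount = 0;
    // Idempotent update: writing the same key and value twice changes nothing.
    final Map<String, Integer> byKey = new HashMap<>();

    void append(String word) {
        appendCount++;
    }

    void put(String word) {
        byKey.put(word, word.length());
    }
}

final class RetrySketch {
    /**
     * First attempt handles only the first failAfter elements and then "crashes";
     * the second attempt reprocesses the entire bundle from the start.
     */
    static ExternalStore runWithOneRetry(List<String> bundle, int failAfter) {
        ExternalStore store = new ExternalStore();
        for (int attempt = 0; attempt < 2; attempt++) {
            int limit = (attempt == 0) ? failAfter : bundle.size();
            for (int i = 0; i < limit; i++) {
                store.append(bundle.get(i)); // duplicated on retry
                store.put(bundle.get(i));    // safe on retry
            }
        }
        return store;
    }
}
```

For a three-element bundle that fails after two elements, the append counter ends at five rather than three, while the keyed map still holds exactly three entries.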
Beam runners may choose to apply optimizations to a pipeline before it is executed. A key optimization, fusion, relates to ParDo operations. If one ParDo operation produces a PCollection that is then consumed as the main input of another ParDo operation, the two ParDo operations will be fused together into a single ParDo operation and run in a single pass; this is "producer-consumer fusion". Similarly, if two or more ParDo operations have the same PCollection main input, they will be fused into a single ParDo that makes just one pass over the input PCollection; this is "sibling fusion".
If after fusion there are no more unfused references to a PCollection (e.g., one between a producer ParDo and a consumer ParDo), the PCollection itself is "fused away" and won't ever be written to disk, saving all the I/O and space expense of constructing it.
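Producer-consumer fusion can be pictured as function composition. The sketch below is a plain-Java model with hypothetical names (FusionSketch, SPLIT, LENGTH); it does not show how any Beam runner implements fusion. The unfused version materializes the intermediate list of words, while the fused version passes each word straight to the second step.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

final class FusionSketch {
    // Two element-wise steps, each mapping one input to zero or more outputs.
    static final Function<String, List<String>> SPLIT = line -> {
        List<String> words = new ArrayList<>();
        for (String word : line.split("[^a-zA-Z']+")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    };
    static final Function<String, List<Integer>> LENGTH =
        word -> Collections.singletonList(word.length());

    /** Unfused: the intermediate collection of words is built in full. */
    static List<Integer> unfused(List<String> lines) {
        List<String> words = new ArrayList<>();
        for (String line : lines) {
            words.addAll(SPLIT.apply(line));
        }
        List<Integer> lengths = new ArrayList<>();
        for (String word : words) {
            lengths.addAll(LENGTH.apply(word));
        }
        return lengths;
    }

    /** Fused: a single pass; no intermediate collection is ever stored. */
    static List<Integer> fused(List<String> lines) {
        List<Integer> lengths = new ArrayList<>();
        for (String line : lines) {
            for (String word : SPLIT.apply(line)) {
                lengths.addAll(LENGTH.apply(word));
            }
        }
        return lengths;
    }
}
```

Both methods return the same result for the same input; only the amount of intermediate storage differs.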
When Beam runners apply fusion optimization, it is essentially "free" to write ParDo operations in a very modular, composable style, with each ParDo operation doing one clear task, and to string together sequences of ParDo operations to get the desired overall effect. Such programs can be easier to understand, easier to unit-test, easier to extend and evolve, and easier to reuse in new programs. The predefined library of PTransforms that comes with Beam makes heavy use of this modular, composable style, trusting the runner to "flatten out" all the compositions into highly optimized stages.
Modifier and Type | Class and Description |
---|---|
static class | ParDo.MultiOutput<InputT,OutputT>: A PTransform that, when applied to a PCollection<InputT>, invokes a user-specified DoFn<InputT, OutputT> on all its elements, which can emit elements to any of the PTransform's output PCollections, which are bundled into a result PCollectionTuple. |
static class | ParDo.SingleOutput<InputT,OutputT>: A PTransform that, when applied to a PCollection<InputT>, invokes a user-specified DoFn<InputT, OutputT> on all its elements, with all its outputs collected into an output PCollection<OutputT>. |
Constructor and Description |
---|
ParDo() |
Modifier and Type | Method and Description |
---|---|
static <InputT,OutputT> ParDo.SingleOutput<InputT,OutputT> | of(DoFn<InputT,OutputT> fn) |
public static <InputT,OutputT> ParDo.SingleOutput<InputT,OutputT> of(DoFn<InputT,OutputT> fn)
Creates a ParDo PTransform that will invoke the given DoFn function.
The resulting PTransform is ready to be applied, or further properties can be set on it first.