ParDo
A transform for generic parallel processing. A ParDo
transform considers each
element in the input PCollection
, performs some processing function
(your user code) on that element, and emits zero or more elements to
an output PCollection.
See more information in the Beam Programming Guide.
Examples
Example 1: Passing side inputs
// Pass side inputs to your ParDo transform by invoking .withSideInputs.
// Inside your DoFn, access the side input by using the method DoFn.ProcessContext.sideInput.
// The input PCollection to ParDo.
PCollection<String> words = ...;
// A PCollection of word lengths that we'll combine into a single value.
PCollection<Integer> wordLengths = ...; // Singleton PCollection
// Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
final PCollectionView<Integer> maxWordLengthCutOffView =
wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());
// Apply a ParDo that takes maxWordLengthCutOffView as a side input.
PCollection<String> wordsBelowCutOff =
words.apply(ParDo
.of(new DoFn<String, String>() {
public void processElement(ProcessContext c) {
String word = c.element();
// In our DoFn, access the side input.
int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
if (word.length() <= lengthCutOff) {
c.output(word);
}
}
}).withSideInputs(maxWordLengthCutOffView)
);
Example 2: Emitting to multiple outputs in your DoFn
// To emit elements to multiple output PCollections, create a TupleTag object to identify each collection
// that your ParDo produces. For example, if your ParDo produces three output PCollections (the main output
// and two additional outputs), you must create three TupleTags. The following example code shows how to
// create TupleTags for a ParDo with three output PCollections.
// Input PCollection to our ParDo.
PCollection<String> words = ...;
// The ParDo will filter words whose length is below a cutoff and add them to
// the main output PCollection<String>.
// If a word is above the cutoff, the ParDo will add the word length to an
// output PCollection<Integer>.
// If a word starts with the string "MARKER", the ParDo will add that word to an
// output PCollection<String>.
final int wordLengthCutOff = 10;
// Create three TupleTags, one for each output PCollection.
// Output that contains words below the length cutoff.
final TupleTag<String> wordsBelowCutOffTag =
new TupleTag<String>(){};
// Output that contains word lengths.
final TupleTag<Integer> wordLengthsAboveCutOffTag =
new TupleTag<Integer>(){};
// Output that contains "MARKER" words.
final TupleTag<String> markedWordsTag =
new TupleTag<String>(){};
// Passing Output Tags to ParDo:
// After you specify the TupleTags for each of your ParDo outputs, pass the tags to your ParDo by invoking
// .withOutputTags. You pass the tag for the main output first, and then the tags for any additional outputs
// in a TupleTagList. Building on our previous example, we pass the three TupleTags for our three output
// PCollections to our ParDo. Note that all of the outputs (including the main output PCollection) are
// bundled into the returned PCollectionTuple.
PCollectionTuple results =
words.apply(ParDo
.of(new DoFn<String, String>() {
// DoFn continues here.
...
})
// Specify the tag for the main output.
.withOutputTags(wordsBelowCutOffTag,
// Specify the tags for the two additional outputs as a TupleTagList.
TupleTagList.of(wordLengthsAboveCutOffTag)
.and(markedWordsTag)));
Example 3: Tags for multiple outputs
// Inside your ParDo's DoFn, you can emit an element to a specific output PCollection by passing in the
// appropriate TupleTag when you call ProcessContext.output.
// After your ParDo, extract the resulting output PCollections from the returned PCollectionTuple.
// Based on the previous example, this shows the DoFn emitting to the main output and two additional outputs.
.of(new DoFn<String, String>() {
public void processElement(ProcessContext c) {
String word = c.element();
if (word.length() <= wordLengthCutOff) {
// Emit short word to the main output.
// In this example, it is the output with tag wordsBelowCutOffTag.
c.output(word);
} else {
// Emit long word length to the output with tag wordLengthsAboveCutOffTag.
c.output(wordLengthsAboveCutOffTag, word.length());
}
if (word.startsWith("MARKER")) {
// Emit word to the output with tag markedWordsTag.
c.output(markedWordsTag, word);
}
}}));
Example 4: Apply a simple custom function
Related transforms
- MapElements applies a simple 1-to-1 mapping function over each element in the collection.
- Filter is useful if the function is just deciding whether to output an element or not.
Last updated on 2025/01/19
Have you found everything you were looking for?
Was it all useful and clear? Is there anything that you would like to change? Let us know!