CombineFn may be applied to combine all elements in a
PCollection (global combine) or to combine all elements associated
with each key.
While the result is similar to applying a GroupByKey followed by
aggregating the values in each Iterable, the two approaches differ in
both the code you must write and the performance of the pipeline.
Writing a ParDo that counts the number of elements in each value
would be very straightforward. However, as described in the execution
model, it would also require all values associated with each key to be
processed by a single worker. This introduces a lot of communication
overhead, because every value must reach that worker before any
aggregation takes place.
Using a CombineFn requires the code to be structured as an associative and
commutative operation. In exchange, it allows partial sums to be precomputed.
See more information in the Beam Programming Guide.
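The partial-sum idea can be illustrated without any Beam code. The following is a minimal plain-Java sketch, not Beam's implementation: the class `PartialSumSketch`, its methods, and the three-way split of the input are all hypothetical and chosen only for illustration. Each bundle is summed locally, and the partial results are then merged in two different orders to show that the order does not affect the total.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of partial combining. This is an illustration only, not Beam code.
final class PartialSumSketch {

    // Sums one bundle of elements, as a single worker might do locally.
    static int sumBundle(List<Integer> bundle) {
        int partial = 0;
        for (int value : bundle) {
            partial += value;
        }
        return partial;
    }

    // Merges the per-bundle partial sums into the final result.
    static int mergePartials(List<Integer> partials) {
        int total = 0;
        for (int partial : partials) {
            total += partial;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical split of one input across three workers.
        int a = sumBundle(Arrays.asList(1, 2, 3));
        int b = sumBundle(Arrays.asList(4, 5));
        int c = sumBundle(Arrays.asList(6));

        // Addition is associative and commutative, so the merge order does not matter.
        int forward = mergePartials(Arrays.asList(a, b, c));
        int shuffled = mergePartials(Arrays.asList(c, a, b));

        System.out.println(forward + " " + shuffled); // prints "21 21"
    }
}
```

Only the three partial sums need to be exchanged between workers in this model, rather than all six input elements, which is the source of the reduced communication overhead.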
Example 1: Global combine
Use the global combine to combine all of the elements in a given
PCollection into a single value, represented in your pipeline as a new
PCollection containing one element. The following example code shows how
to apply the Beam-provided sum combine function to produce a single sum
value for a PCollection of integers.
// Sum.SumIntegerFn() combines the elements in the input PCollection. The resulting PCollection, called sum,
// contains one value: the sum of all the elements in the input PCollection.
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()));
Example 2: Keyed combine
Use a keyed combine to combine all of the values associated with each key
into a single output value for each key. As with the global combine, the
function passed to a keyed combine must be associative and commutative.
// PCollection is grouped by key and the Double values associated with each key are combined into a Double.
PCollection<KV<String, Double>> salesRecords = ...;
PCollection<KV<String, Double>> totalSalesPerPerson =
    salesRecords.apply(Combine.<String, Double, Double>perKey(
        new Sum.SumDoubleFn()));

// The combined value is of a different type than the original collection of values per key. PCollection has
// keys of type String and values of type Integer, and the combined value is a Double.
PCollection<KV<String, Integer>> playerAccuracy = ...;
PCollection<KV<String, Double>> avgAccuracyPerPlayer =
    playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
        new MeanInts()));
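The example above uses `MeanInts` without showing its definition. As a rough illustration of how a mean can be computed with Integer inputs and a Double output, here is a plain-Java sketch. It is an assumption about what such a function might look like, not the actual `MeanInts` class, and it does not extend Beam's `CombineFn`; it only borrows the names of the four `CombineFn` methods (`createAccumulator`, `addInput`, `mergeAccumulators`, `extractOutput`). The class name `MeanIntsSketch`, the `Accum` type, and the NaN result for empty input are hypothetical choices.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a mean-combining function. Hypothetical; not Beam's MeanInts.
final class MeanIntsSketch {

    // Accumulator: a running sum and count, which can be merged in any order.
    static final class Accum {
        long sum = 0;
        long count = 0;
    }

    static Accum createAccumulator() {
        return new Accum();
    }

    // Folds one input value into an accumulator.
    static Accum addInput(Accum accum, int input) {
        accum.sum += input;
        accum.count++;
        return accum;
    }

    // Merges partial accumulators, for example ones built on different workers.
    static Accum mergeAccumulators(List<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            merged.sum += accum.sum;
            merged.count += accum.count;
        }
        return merged;
    }

    // Converts the accumulator into the final Double-valued output.
    static double extractOutput(Accum accum) {
        // Returning NaN for an empty accumulator is a choice made for this sketch.
        return accum.count == 0 ? Double.NaN : (double) accum.sum / accum.count;
    }

    public static void main(String[] args) {
        // Two partial accumulators for one hypothetical player.
        Accum first = addInput(addInput(createAccumulator(), 80), 90);
        Accum second = addInput(createAccumulator(), 100);

        double mean = extractOutput(mergeAccumulators(Arrays.asList(first, second)));
        System.out.println(mean); // prints "90.0"
    }
}
```

The accumulator carries a sum and a count rather than a running mean because averaging partial means is not associative: the partial means here are 85 and 100, and averaging those would give 92.5 instead of 90.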