Combine


A user-defined CombineFn may be applied to combine all elements in a PCollection (global combine) or to combine all elements associated with each key.

While the result is similar to applying a GroupByKey and then aggregating the values in each Iterable, the choice affects both the code you must write and the performance of the pipeline. Writing a ParDo that counts the number of elements in each value would be very straightforward. However, as described in the execution model, it would also require all values associated with each key to be processed by a single worker, which introduces a lot of communication overhead. Using a CombineFn requires the code to be structured as an associative and commutative operation, but in return it allows partial results, such as partial sums, to be precomputed.
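
To make the point about partial sums concrete, here is a minimal plain-Java sketch. It does not use any Beam classes, and the class name, method names, and the split of values into three bundles are all hypothetical. It only illustrates why an associative and commutative operation can be pre-combined per bundle and then merged in any order.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration (no Beam classes): an associative and commutative
// operation lets each worker pre-combine its own share of the values.
final class PartialSumSketch {

  // Combines one bundle of elements into a single partial sum.
  static int partialSum(List<Integer> bundle) {
    int sum = 0;
    for (int value : bundle) {
      sum += value;
    }
    return sum;
  }

  // Merges partial sums; only one number per bundle has to be passed here.
  static int mergePartials(List<Integer> partials) {
    return partialSum(partials);
  }

  public static void main(String[] args) {
    // Hypothetical split of the values for one key across three workers.
    List<Integer> bundleA = Arrays.asList(1, 2, 3);
    List<Integer> bundleB = Arrays.asList(4, 5);
    List<Integer> bundleC = Arrays.asList(6);

    int merged = mergePartials(Arrays.asList(
        partialSum(bundleA), partialSum(bundleB), partialSum(bundleC)));
    int direct = partialSum(Arrays.asList(1, 2, 3, 4, 5, 6));

    System.out.println(merged + " " + direct); // prints "21 21"
  }
}
```

In this sketch only three numbers reach the merge step instead of six, and the order in which the partial sums arrive does not change the result.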

See more information in the Beam Programming Guide.

Examples

Example 1: Global combine

Use the global combine to combine all of the elements in a given PCollection into a single value, represented in your pipeline as a new PCollection containing one element. The following example code shows how to apply the Beam-provided sum combine function to produce a single sum value for a PCollection of integers.

// Sum.ofIntegers() combines the elements in the input PCollection. The resulting PCollection, called sum,
// contains one value: the sum of all the elements in the input PCollection.
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
    Combine.globally(Sum.ofIntegers()));

Example 2: Keyed combine

Use a keyed combine to combine all of the values associated with each key into a single output value for each key. As with the global combine, the function passed to a keyed combine must be associative and commutative.

// The PCollection is grouped by key, and the Double values associated with each key are combined into a Double.
PCollection<KV<String, Double>> salesRecords = ...;
PCollection<KV<String, Double>> totalSalesPerPerson =
  salesRecords.apply(Combine.<String, Double, Double>perKey(
    Sum.ofDoubles()));

// The combined value is of a different type than the original collection of values per key. The input
// PCollection has keys of type String and values of type Integer, and the combined value is a Double.
// MeanInts is assumed to be a user-defined CombineFn.
PCollection<KV<String, Integer>> playerAccuracy = ...;
PCollection<KV<String, Double>> avgAccuracyPerPlayer =
  playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
    new MeanInts()));
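
MeanInts is not defined in this section. As a rough illustration of what a mean-style combine function has to do, the following plain-Java sketch keeps a running sum and count in an intermediate accumulator. The method names mirror those of Beam's CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput), but the class deliberately does not extend it so that it runs without the Beam SDK. The class name MeanIntsSketch and the sample values are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Beam dependency) of the four steps a mean-style
// combine function has to provide. All names here are hypothetical.
final class MeanIntsSketch {

  // Intermediate state: differs from both the Integer input and the Double output.
  static final class Accum {
    long sum = 0;
    long count = 0;
  }

  // Starts an empty accumulator.
  static Accum createAccumulator() {
    return new Accum();
  }

  // Folds one input value into an accumulator.
  static Accum addInput(Accum accum, int input) {
    accum.sum += input;
    accum.count++;
    return accum;
  }

  // Merges several partial accumulators into one.
  static Accum mergeAccumulators(List<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum a : accums) {
      merged.sum += a.sum;
      merged.count += a.count;
    }
    return merged;
  }

  // Turns the final accumulator into the output value.
  static double extractOutput(Accum accum) {
    return accum.count == 0 ? Double.NaN : (double) accum.sum / accum.count;
  }

  public static void main(String[] args) {
    Accum first = createAccumulator();
    for (int v : new int[] {80, 90}) {
      addInput(first, v);
    }
    Accum second = addInput(createAccumulator(), 100);

    double mean = extractOutput(mergeAccumulators(Arrays.asList(first, second)));
    System.out.println(mean); // prints "90.0"
  }
}
```

The accumulator is what keeps the operation associative and commutative: averaging the two partial means directly (85.0 and 100.0) would give 92.5, whereas merging sums and counts gives the correct 90.0.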