Combines all elements for each key in a collection.
See more information in the Beam Programming Guide.
In the following examples, we create a pipeline with a
PCollection of produce.
Then, we apply
CombinePerKey in multiple ways to combine all the elements in the
CombinePerKey accepts a function that takes a list of values as an input, and combines them for each key.
Example 1: Combining with a predefined function
We use the function
which takes an
iterable of numbers and adds them together.
Example 2: Combining with a function
We define a function
saturated_sum which takes an
iterable of numbers and adds them together, up to a predefined maximum number.
import apache_beam as beam def saturated_sum(values): max_value = 8 return min(sum(values), max_value) with beam.Pipeline() as pipeline: saturated_total = ( pipeline | 'Create plant counts' >> beam.Create([ ('🥕', 3), ('🥕', 2), ('🍆', 1), ('🍅', 4), ('🍅', 5), ('🍅', 3), ]) | 'Saturated sum' >> beam.CombinePerKey(saturated_sum) | beam.Map(print))
Example 3: Combining with a lambda function
We can also use lambda functions to simplify Example 2.
Example 4: Combining with multiple arguments
You can pass functions with multiple arguments to
They are passed as additional positional arguments or keyword arguments to the function.
In this example, the lambda function takes
max_value as arguments.
import apache_beam as beam with beam.Pipeline() as pipeline: saturated_total = ( pipeline | 'Create plant counts' >> beam.Create([ ('🥕', 3), ('🥕', 2), ('🍆', 1), ('🍅', 4), ('🍅', 5), ('🍅', 3), ]) | 'Saturated sum' >> beam.CombinePerKey( lambda values, max_value: min(sum(values), max_value), max_value=8) | beam.Map(print))
Example 5: Combining with a
The more general way to combine elements, and the most flexible, is with a class that inherits from
CombineFn.create_accumulator(): This creates an empty accumulator. For example, an empty accumulator for a sum would be
0, while an empty accumulator for a product (multiplication) would be
CombineFn.add_input(): Called once per element. Takes an accumulator and an input element, combines them and returns the updated accumulator.
CombineFn.merge_accumulators(): Multiple accumulators could be processed in parallel, so this function helps merging them into a single accumulator.
CombineFn.extract_output(): It allows to do additional calculations before extracting a result.
import apache_beam as beam class AverageFn(beam.CombineFn): def create_accumulator(self): sum = 0.0 count = 0 accumulator = sum, count return accumulator def add_input(self, accumulator, input): sum, count = accumulator return sum + input, count + 1 def merge_accumulators(self, accumulators): # accumulators = [(sum1, count1), (sum2, count2), (sum3, count3), ...] sums, counts = zip(*accumulators) # sums = [sum1, sum2, sum3, ...] # counts = [count1, count2, count3, ...] return sum(sums), sum(counts) def extract_output(self, accumulator): sum, count = accumulator if count == 0: return float('NaN') return sum / count with beam.Pipeline() as pipeline: average = ( pipeline | 'Create plant counts' >> beam.Create([ ('🥕', 3), ('🥕', 2), ('🍆', 1), ('🍅', 4), ('🍅', 5), ('🍅', 3), ]) | 'Average' >> beam.CombinePerKey(AverageFn()) | beam.Map(print))
You can use the following combiner transforms:
See also GroupBy which allows you to combine more than one field at once.
Last updated on 2023/06/05
Have you found everything you were looking for?
Was it all useful and clear? Is there anything that you would like to change? Let us know!