# CombinePerKey

Combines all elements for each key in a collection.

See more information in the Beam Programming Guide.

## Examples

In the following examples, we create a pipeline with a `PCollection` of produce. Then, we apply `CombinePerKey` in multiple ways to combine all the elements in the `PCollection`.

`CombinePerKey` accepts a function that takes an iterable of values as input and combines them into a single value for each key.
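To make the semantics concrete, here is a minimal plain-Python sketch of what a per-key combine computes. It is only a conceptual model, not how Beam executes the transform: the helper name `combine_per_key` is hypothetical, the keys are placeholder strings, and a real runner distributes the work and does not guarantee any output order.

```python
from collections import defaultdict


def combine_per_key(pairs, combine_fn):
    """Conceptual model only: group values by key, then combine each group."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    # Apply the combining function to the values collected for each key.
    return {key: combine_fn(values) for key, values in grouped.items()}


pairs = [('a', 3), ('a', 2), ('b', 1), ('c', 4), ('c', 5), ('c', 3)]
totals = combine_per_key(pairs, sum)
print(totals)
```

Each key ends up with exactly one combined value, which is the shape of result the examples below produce.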

### Example 1: Combining with a predefined function

We use the built-in function `sum`, which takes an `iterable` of numbers and adds them together.

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    total = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            ('🥕', 3),
            ('🥕', 2),
            ('🍆', 1),
            ('🍅', 4),
            ('🍅', 5),
            ('🍅', 3),
        ])
        | 'Sum' >> beam.CombinePerKey(sum)
        | beam.Map(print))
```

Output:

```
('🥕', 5)
('🍆', 1)
('🍅', 12)
```

### Example 2: Combining with a function

We define a function `saturated_sum`, which takes an `iterable` of numbers and adds them together, up to a predefined maximum number.

```python
import apache_beam as beam

def saturated_sum(values):
    max_value = 8
    return min(sum(values), max_value)

with beam.Pipeline() as pipeline:
    saturated_total = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            ('🥕', 3),
            ('🥕', 2),
            ('🍆', 1),
            ('🍅', 4),
            ('🍅', 5),
            ('🍅', 3),
        ])
        | 'Saturated sum' >> beam.CombinePerKey(saturated_sum)
        | beam.Map(print))
```

Output:

```
('🥕', 5)
('🍆', 1)
('🍅', 8)
```

### Example 3: Combining with a lambda function

We can also use lambda functions to simplify Example 2.

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    saturated_total = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            ('🥕', 3),
            ('🥕', 2),
            ('🍆', 1),
            ('🍅', 4),
            ('🍅', 5),
            ('🍅', 3),
        ])
        | 'Saturated sum' >>
        beam.CombinePerKey(lambda values: min(sum(values), 8))
        | beam.Map(print))
```

Output:

```
('🥕', 5)
('🍆', 1)
('🍅', 8)
```

### Example 4: Combining with multiple arguments

You can pass functions with multiple arguments to `CombinePerKey`. Supply the extra arguments to `CombinePerKey` as additional positional or keyword arguments, and they are forwarded to the function after the iterable of values.

In this example, the lambda function takes `values` and `max_value` as arguments.

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    saturated_total = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            ('🥕', 3),
            ('🥕', 2),
            ('🍆', 1),
            ('🍅', 4),
            ('🍅', 5),
            ('🍅', 3),
        ])
        | 'Saturated sum' >> beam.CombinePerKey(
            lambda values, max_value: min(sum(values), max_value), max_value=8)
        | beam.Map(print))
```

Output:

```
('🥕', 5)
('🍆', 1)
('🍅', 8)
```

### Example 5: Combining with a `CombineFn`

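The most general and flexible way to combine elements is with a class that inherits from `CombineFn`. The class defines how to create an empty accumulator, add an input to an accumulator, merge several accumulators into one, and extract the final output.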

```python
import apache_beam as beam

class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        sum = 0.0
        count = 0
        accumulator = sum, count
        return accumulator

    def add_input(self, accumulator, input):
        sum, count = accumulator
        return sum + input, count + 1

    def merge_accumulators(self, accumulators):
        # accumulators = [(sum1, count1), (sum2, count2), (sum3, count3), ...]
        sums, counts = zip(*accumulators)
        # sums = [sum1, sum2, sum3, ...]
        # counts = [count1, count2, count3, ...]
        return sum(sums), sum(counts)

    def extract_output(self, accumulator):
        sum, count = accumulator
        if count == 0:
            return float('NaN')
        return sum / count

with beam.Pipeline() as pipeline:
    average = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            ('🥕', 3),
            ('🥕', 2),
            ('🍆', 1),
            ('🍅', 4),
            ('🍅', 5),
            ('🍅', 3),
        ])
        | 'Average' >> beam.CombinePerKey(AverageFn())
        | beam.Map(print))
```

Output:

```
('🥕', 2.5)
('🍆', 1.0)
('🍅', 4.0)
```

You can use the following combiner transforms:

See also `GroupBy`, which allows you to combine more than one field at once.

