# CombineValues

Combines an iterable of values in a keyed collection of elements.

## Examples

In the following examples, we create a pipeline with a `PCollection` of produce. Then, we apply `CombineValues` in multiple ways to combine the keyed values in the `PCollection`.

`CombineValues` accepts a function that takes an `iterable` of elements as input and combines them into a single element. It expects a keyed `PCollection` of elements, where each value is an iterable of elements to be combined.
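To make the expected input shape concrete, here is a minimal pure-Python sketch of what `CombineValues` computes for each element. The helper `combine_values_locally` is hypothetical and not part of Beam. It ignores distribution entirely and simply applies the combining function to each key's iterable of values.

```python
def combine_values_locally(fn, keyed):
    """Hypothetical local model of CombineValues: (key, values) -> (key, fn(values))."""
    # Each element is already grouped: a key paired with an iterable of values.
    # The key passes through unchanged; only the values are combined.
    return [(key, fn(values)) for key, values in keyed]


produce_counts = [
    ('🥕', [3, 2]),
    ('🍆', [1]),
    ('🍅', [4, 5, 3]),
]

print(combine_values_locally(sum, produce_counts))
# [('🥕', 5), ('🍆', 1), ('🍅', 12)]
```

In a real pipeline, this `(key, iterable)` shape is what a grouping step such as `GroupByKey` produces.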

### Example 1: Combining with a predefined function

We use the built-in function `sum`, which takes an `iterable` of numbers and adds them together.

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    total = (
        pipeline
        | 'Create produce counts' >> beam.Create([
            ('🥕', [3, 2]),
            ('🍆', [1]),
            ('🍅', [4, 5, 3]),
        ])
        | 'Sum' >> beam.CombineValues(sum)
        | beam.Map(print))
```

Output:

```
('🥕', 5)
('🍆', 1)
('🍅', 12)
```

### Example 2: Combining with a function

We want the sum to be capped at a maximum value, so we use saturated arithmetic.

We define a function `saturated_sum` that takes an `iterable` of numbers, adds them together, and caps the result at a predefined maximum.

```python
import apache_beam as beam

def saturated_sum(values):
    max_value = 8
    return min(sum(values), max_value)

with beam.Pipeline() as pipeline:
    saturated_total = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            ('🥕', [3, 2]),
            ('🍆', [1]),
            ('🍅', [4, 5, 3]),
        ])
        | 'Saturated sum' >> beam.CombineValues(saturated_sum)
        | beam.Map(print))
```

Output:

```
('🥕', 5)
('🍆', 1)
('🍅', 8)
```
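The Beam programming guide notes that a combining function should be commutative and associative, because it is not necessarily invoked exactly once on all the values for a key. The plain-Python check below assumes non-negative counts and verifies that `saturated_sum` gives the same answer whether it sees all values at once or first combines batches and then combines the partial results. The helper `combine_in_batches` is hypothetical, and its fixed-size batching is only a simplified stand-in for whatever splitting a runner actually does.

```python
from itertools import permutations


def saturated_sum(values):
    max_value = 8
    return min(sum(values), max_value)


def combine_in_batches(fn, values, batch_size):
    """Apply fn to fixed-size batches, then apply fn again to the partial results."""
    partials = [fn(values[i:i + batch_size]) for i in range(0, len(values), batch_size)]
    return fn(partials)


tomatoes = [4, 5, 3]
all_at_once = saturated_sum(tomatoes)  # min(12, 8) == 8

# Every ordering and batch size yields the same capped total.
for ordering in permutations(tomatoes):
    for batch_size in (1, 2, 3):
        assert combine_in_batches(saturated_sum, list(ordering), batch_size) == all_at_once


def naive_mean(values):
    values = list(values)
    return sum(values) / len(values)


# A plain mean is not safe to re-combine: the mean of partial means can differ.
assert naive_mean([1, 2, 3]) == 2.0
assert combine_in_batches(naive_mean, [1, 2, 3], 2) == 2.25
```

This is one reason a computation like an average is better expressed as a `CombineFn` that carries counts in its accumulator, as in Example 8.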

### Example 3: Combining with a lambda function

We can also use lambda functions to simplify Example 2.

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    saturated_total = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            ('🥕', [3, 2]),
            ('🍆', [1]),
            ('🍅', [4, 5, 3]),
        ])
        | 'Saturated sum' >>
        beam.CombineValues(lambda values: min(sum(values), 8))
        | beam.Map(print))
```

Output:

```
('🥕', 5)
('🍆', 1)
('🍅', 8)
```

### Example 4: Combining with multiple arguments

You can pass functions with multiple arguments to `CombineValues`. Pass the extra arguments to `CombineValues` as additional positional or keyword arguments, and they are forwarded to the function after the iterable of values.

In this example, the lambda function takes `values` and `max_value` as arguments.

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    saturated_total = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            ('🥕', [3, 2]),
            ('🍆', [1]),
            ('🍅', [4, 5, 3]),
        ])
        | 'Saturated sum' >> beam.CombineValues(
            lambda values, max_value: min(sum(values), max_value), max_value=8)
        | beam.Map(print))
```

Output:

```
('🥕', 5)
('🍆', 1)
('🍅', 8)
```

### Example 5: Combining with side inputs as singletons

If a `PCollection` has a single value, such as the average from another computation, you can pass it as a singleton with `beam.pvalue.AsSingleton` to access that value directly.

In this example, we pass a `PCollection` containing the value `8` as a singleton. We then use that value as the `max_value` for our saturated sum.

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    max_value = pipeline | 'Create max_value' >> beam.Create([8])

    saturated_total = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            ('🥕', [3, 2]),
            ('🍆', [1]),
            ('🍅', [4, 5, 3]),
        ])
        | 'Saturated sum' >> beam.CombineValues(
            lambda values, max_value: min(sum(values), max_value),
            max_value=beam.pvalue.AsSingleton(max_value))
        | beam.Map(print))
```

Output:

```
('🥕', 5)
('🍆', 1)
('🍅', 8)
```

### Example 6: Combining with side inputs as iterators

If the `PCollection` has multiple values, pass it as an iterable with `beam.pvalue.AsIter`. Elements are then accessed lazily as they are needed, so you can iterate over a `PCollection` that is too large to fit into memory.

```python
import apache_beam as beam

def bounded_sum(values, data_range):
    min_value = min(data_range)
    result = sum(values)
    if result < min_value:
        return min_value
    max_value = max(data_range)
    if result > max_value:
        return max_value
    return result

with beam.Pipeline() as pipeline:
    data_range = pipeline | 'Create data_range' >> beam.Create([2, 4, 8])

    bounded_total = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            ('🥕', [3, 2]),
            ('🍆', [1]),
            ('🍅', [4, 5, 3]),
        ])
        | 'Bounded sum' >> beam.CombineValues(
            bounded_sum, data_range=beam.pvalue.AsIter(data_range))
        | beam.Map(print))
```

Output:

```
('🥕', 5)
('🍆', 2)
('🍅', 8)
```

Note: You can pass the `PCollection` as a list with `beam.pvalue.AsList(pcollection)`, but this requires that all the elements fit into memory.
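`bounded_sum` in Example 6 scans `data_range` twice, once for `min` and once for `max`. When the side input is large, a single pass that tracks both bounds at once avoids the second scan. This is a plain-Python sketch: `bounded_sum_single_pass` is a hypothetical name, it assumes `data_range` is non-empty, and the generator `lazy_range` merely stands in for a lazily read, consume-once sequence.

```python
def bounded_sum_single_pass(values, data_range):
    """Clamp sum(values) to [min(data_range), max(data_range)] in one scan."""
    iterator = iter(data_range)
    try:
        min_value = max_value = next(iterator)
    except StopIteration:
        raise ValueError('data_range must not be empty') from None
    # One loop updates both bounds, so each element of data_range is read once.
    for bound in iterator:
        if bound < min_value:
            min_value = bound
        elif bound > max_value:
            max_value = bound
    return max(min_value, min(sum(values), max_value))


def lazy_range():
    # A generator can only be consumed once, like a one-shot stream.
    yield from [2, 4, 8]


print(bounded_sum_single_pass([3, 2], lazy_range()))     # 5
print(bounded_sum_single_pass([1], lazy_range()))        # 2
print(bounded_sum_single_pass([4, 5, 3], lazy_range()))  # 8
```

With a consume-once generator, a two-scan version would find the sequence already exhausted when it calls `max`, while the single-pass version still works.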

### Example 7: Combining with side inputs as dictionaries

If a `PCollection` is small enough to fit into memory, you can pass it as a dictionary with `beam.pvalue.AsDict`. Each element must be a `(key, value)` pair. If the `PCollection` won't fit into memory, use `beam.pvalue.AsIter(pcollection)` instead.

```python
import apache_beam as beam

def bounded_sum(values, data_range):
    min_value = data_range['min']
    result = sum(values)
    if result < min_value:
        return min_value
    max_value = data_range['max']
    if result > max_value:
        return max_value
    return result

with beam.Pipeline() as pipeline:
    data_range = pipeline | 'Create data_range' >> beam.Create([
        ('min', 2),
        ('max', 8),
    ])

    bounded_total = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            ('🥕', [3, 2]),
            ('🍆', [1]),
            ('🍅', [4, 5, 3]),
        ])
        | 'Bounded sum' >> beam.CombineValues(
            bounded_sum, data_range=beam.pvalue.AsDict(data_range))
        | beam.Map(print))
```

Output:

```
('🥕', 5)
('🍆', 2)
('🍅', 8)
```

### Example 8: Combining with a `CombineFn`

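The most general and flexible way to combine elements is with a class that inherits from `CombineFn`. A `CombineFn` defines four methods: `create_accumulator`, `add_input`, `merge_accumulators`, and `extract_output`. In this example, `AverageFn` counts how often each item appears in each season and returns each item's share of the total.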

```python
import apache_beam as beam

class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        return {}

    def add_input(self, accumulator, input):
        # accumulator == {}
        # input == '🥕'
        if input not in accumulator:
            accumulator[input] = 0  # {'🥕': 0}
        accumulator[input] += 1  # {'🥕': 1}
        return accumulator

    def merge_accumulators(self, accumulators):
        # accumulators == [
        #     {'🥕': 1, '🍅': 1},
        #     {'🥕': 1, '🍅': 1, '🍆': 1},
        # ]
        merged = {}
        for accum in accumulators:
            for item, count in accum.items():
                if item not in merged:
                    merged[item] = 0
                merged[item] += count
        # merged == {'🥕': 2, '🍅': 2, '🍆': 1}
        return merged

    def extract_output(self, accumulator):
        # accumulator == {'🥕': 2, '🍅': 2, '🍆': 1}
        total = sum(accumulator.values())  # 5
        percentages = {item: count / total for item, count in accumulator.items()}
        # percentages == {'🥕': 0.4, '🍅': 0.4, '🍆': 0.2}
        return percentages

with beam.Pipeline() as pipeline:
    percentages_per_season = (
        pipeline
        | 'Create produce' >> beam.Create([
            ('spring', ['🥕', '🍅', '🥕', '🍅', '🍆']),
            ('summer', ['🥕', '🍅', '🌽', '🍅', '🍅']),
            ('fall', ['🥕', '🥕', '🍅', '🍅']),
            ('winter', ['🍆', '🍆']),
        ])
        | 'Average' >> beam.CombineValues(AverageFn())
        | beam.Map(print))
```

Output:

```
('spring', {'🥕': 0.4, '🍅': 0.4, '🍆': 0.2})
('summer', {'🥕': 0.2, '🍅': 0.6, '🌽': 0.2})
('fall', {'🥕': 0.5, '🍅': 0.5})
('winter', {'🍆': 1.0})
```
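To see how the four methods cooperate, the sketch below drives a plain-Python copy of Example 8's `AverageFn` by hand, with no Beam import. It splits one season's produce into two partial accumulators, merges them, and extracts the output, mirroring the values shown in the comments of Example 8. The helper `run_in_two_parts` and its split point are hypothetical choices for illustration; a runner decides for itself how, and whether, to split the input.

```python
class AverageFn:
    """Plain-Python copy of the AverageFn methods, without beam.CombineFn."""

    def create_accumulator(self):
        return {}

    def add_input(self, accumulator, input):
        accumulator[input] = accumulator.get(input, 0) + 1
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = {}
        for accum in accumulators:
            for item, count in accum.items():
                merged[item] = merged.get(item, 0) + count
        return merged

    def extract_output(self, accumulator):
        total = sum(accumulator.values())
        return {item: count / total for item, count in accumulator.items()}


def run_in_two_parts(combine_fn, values, split_at):
    """Hypothetical driver: accumulate two slices separately, then merge them."""
    partial_accumulators = []
    for part in (values[:split_at], values[split_at:]):
        accumulator = combine_fn.create_accumulator()
        for value in part:
            accumulator = combine_fn.add_input(accumulator, value)
        partial_accumulators.append(accumulator)
    merged = combine_fn.merge_accumulators(partial_accumulators)
    return combine_fn.extract_output(merged)


spring = ['🥕', '🍅', '🥕', '🍅', '🍆']
print(run_in_two_parts(AverageFn(), spring, split_at=2))
# {'🥕': 0.4, '🍅': 0.4, '🍆': 0.2}
```

Because the final shares are computed from merged counts rather than by averaging partial shares, the result does not depend on where the input is split.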

You can use the following combiner transforms: