CombineValues

Combines an iterable of values in a keyed collection of elements.

For more information, see the Beam Programming Guide.

Examples

In the following examples, we create a pipeline with a PCollection of produce. Then, we apply CombineValues in multiple ways to combine the keyed values in the PCollection.

CombineValues accepts a function that takes an iterable of elements as an input, and combines them to return a single element. CombineValues expects a keyed PCollection of elements, where the value is an iterable of elements to be combined.
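As a rough mental model of the input and output shapes described above, the following plain-Python sketch applies a combining function to each key's iterable. It does not use Beam: `combine_values` is a hypothetical helper written only for illustration, and plain words stand in for the produce emoji.

```python
def combine_values(keyed_iterables, fn):
    """Hypothetical stand-in, not a Beam API: apply fn to each key's values."""
    return [(key, fn(values)) for key, values in keyed_iterables]


# Each element is a (key, iterable of values) pair, the shape CombineValues expects.
produce = [
    ('carrot', [3, 2]),
    ('eggplant', [1]),
    ('tomato', [4, 5, 3]),
]

# Each iterable collapses to a single value; the key is kept unchanged.
totals = combine_values(produce, sum)
print(totals)
```

The result has one `(key, combined_value)` pair per input element, which matches the shape of the outputs shown in the examples below.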

Example 1: Combining with a predefined function

We use the built-in function sum, which takes an iterable of numbers and adds them together.

import apache_beam as beam

with beam.Pipeline() as pipeline:
  total = (
      pipeline
      | 'Create produce counts' >> beam.Create([
          ('🥕', [3, 2]),
          ('🍆', [1]),
          ('🍅', [4, 5, 3]),
      ])
      | 'Sum' >> beam.CombineValues(sum)
      | beam.Map(print))

Output:

('🥕', 5)
('🍆', 1)
('🍅', 12)

Example 2: Combining with a function

We want the sum to be capped at a maximum value, so we use saturation arithmetic.

We define a function saturated_sum which takes an iterable of numbers and adds them together, up to a predefined maximum number.

import apache_beam as beam

def saturated_sum(values):
  max_value = 8
  return min(sum(values), max_value)

with beam.Pipeline() as pipeline:
  saturated_total = (
      pipeline
      | 'Create plant counts' >> beam.Create([
          ('🥕', [3, 2]),
          ('🍆', [1]),
          ('🍅', [4, 5, 3]),
      ])
      | 'Saturated sum' >> beam.CombineValues(saturated_sum)
      | beam.Map(print))

Output:

('🥕', 5)
('🍆', 1)
('🍅', 8)

Example 3: Combining with a lambda function

We can also use lambda functions to simplify Example 2.

import apache_beam as beam

with beam.Pipeline() as pipeline:
  saturated_total = (
      pipeline
      | 'Create plant counts' >> beam.Create([
          ('🥕', [3, 2]),
          ('🍆', [1]),
          ('🍅', [4, 5, 3]),
      ])
      | 'Saturated sum' >>
      beam.CombineValues(lambda values: min(sum(values), 8))
      | beam.Map(print))

Output:

('🥕', 5)
('🍆', 1)
('🍅', 8)

Example 4: Combining with multiple arguments

You can pass functions with multiple arguments to CombineValues. Supply the extra arguments to CombineValues as additional positional or keyword arguments, and they are passed on to the function after the iterable of values.

In this example, the lambda function takes values and max_value as arguments.

import apache_beam as beam

with beam.Pipeline() as pipeline:
  saturated_total = (
      pipeline
      | 'Create plant counts' >> beam.Create([
          ('🥕', [3, 2]),
          ('🍆', [1]),
          ('🍅', [4, 5, 3]),
      ])
      | 'Saturated sum' >> beam.CombineValues(
          lambda values, max_value: min(sum(values), max_value), max_value=8)
      | beam.Map(print))

Output:

('🥕', 5)
('🍆', 1)
('🍅', 8)
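Example 4 supplies max_value as a keyword argument, while the text also mentions positional arguments. The plain-Python sketch below, which does not use Beam, shows the two call shapes that the combining function would receive for one key. The named function `saturated_sum` with a `max_value` parameter is an illustrative variant of the lambda above, not code from the original example.

```python
def saturated_sum(values, max_value):
    """Sum the values, capping the result at max_value."""
    return min(sum(values), max_value)


tomato_counts = [4, 5, 3]

# Keyword form, matching max_value=8 in Example 4.
by_keyword = saturated_sum(tomato_counts, max_value=8)

# Positional form: the extra argument follows the iterable of values.
by_position = saturated_sum(tomato_counts, 8)

print(by_keyword, by_position)
```

Both forms give the same result, so the choice is mostly about readability. A keyword argument makes the meaning of the extra value explicit at the call site.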

Example 5: Combining with a CombineFn

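The most general and flexible way to combine elements is with a class that inherits from CombineFn. The class defines how to create an empty accumulator, add an input to it, merge several accumulators, and extract the final output.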

import apache_beam as beam

class AverageFn(beam.CombineFn):
  def create_accumulator(self):
    return {}

  def add_input(self, accumulator, input):
    # accumulator == {}
    # input == '🥕'
    if input not in accumulator:
      accumulator[input] = 0  # {'🥕': 0}
    accumulator[input] += 1  # {'🥕': 1}
    return accumulator

  def merge_accumulators(self, accumulators):
    # accumulators == [
    #     {'🥕': 1, '🍅': 1},
    #     {'🥕': 1, '🍅': 1, '🍆': 1},
    # ]
    merged = {}
    for accum in accumulators:
      for item, count in accum.items():
        if item not in merged:
          merged[item] = 0
        merged[item] += count
    # merged == {'🥕': 2, '🍅': 2, '🍆': 1}
    return merged

  def extract_output(self, accumulator):
    # accumulator == {'🥕': 2, '🍅': 2, '🍆': 1}
    total = sum(accumulator.values())  # 5
    percentages = {item: count / total for item, count in accumulator.items()}
    # percentages == {'🥕': 0.4, '🍅': 0.4, '🍆': 0.2}
    return percentages

with beam.Pipeline() as pipeline:
  percentages_per_season = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('spring', ['🥕', '🍅', '🥕', '🍅', '🍆']),
          ('summer', ['🥕', '🍅', '🌽', '🍅', '🍅']),
          ('fall', ['🥕', '🥕', '🍅', '🍅']),
          ('winter', ['🍆', '🍆']),
      ])
      | 'Average' >> beam.CombineValues(AverageFn())
      | beam.Map(print))

Output:

('spring', {'🥕': 0.4, '🍅': 0.4, '🍆': 0.2})
('summer', {'🥕': 0.2, '🍅': 0.6, '🌽': 0.2})
('fall', {'🥕': 0.5, '🍅': 0.5})
('winter', {'🍆': 1.0})
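To see how the four CombineFn methods fit together, the sketch below drives them by hand in plain Python, without Beam. `PercentagesFn` and `run_by_hand` are hypothetical names used only here, and plain words stand in for the emoji. Splitting the values into two chunks is an assumption chosen to mirror the comments in merge_accumulators; a real runner decides whether and how values are split.

```python
class PercentagesFn:
    """Plain-Python stand-in with the same four methods as AverageFn."""

    def create_accumulator(self):
        return {}

    def add_input(self, accumulator, item):
        accumulator[item] = accumulator.get(item, 0) + 1
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = {}
        for accum in accumulators:
            for item, count in accum.items():
                merged[item] = merged.get(item, 0) + count
        return merged

    def extract_output(self, accumulator):
        total = sum(accumulator.values())
        return {item: count / total for item, count in accumulator.items()}


def run_by_hand(fn, values, split_at):
    """Hypothetical driver: build two partial accumulators, merge, extract."""
    partials = []
    for chunk in (values[:split_at], values[split_at:]):
        accumulator = fn.create_accumulator()
        for value in chunk:
            accumulator = fn.add_input(accumulator, value)
        partials.append(accumulator)
    return fn.extract_output(fn.merge_accumulators(partials))


spring = ['carrot', 'tomato', 'carrot', 'tomato', 'eggplant']
result = run_by_hand(PercentagesFn(), spring, split_at=2)
print(result)
```

The shares come out the same wherever the split falls, because merging adds the partial counts before any division happens. That property is what lets a combiner work on partial results.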

You can use the following combiner transforms: