CombineGlobally
Combines all elements in a collection.
See more information in the Beam Programming Guide.
Examples
In the following examples, we create a pipeline with a PCollection of produce. Then, we apply CombineGlobally in multiple ways to combine all the elements in the PCollection.
CombineGlobally accepts a function that takes an iterable of elements as an input and combines them to return a single element.
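A runner may apply the combining function to subsets of the elements and then apply it again to the partial results. For that reason, the function should be commutative and associative, and it must accept its own output as input.

The plain-Python sketch below mimics that two-stage application and checks that it agrees with combining everything at once. It does not import Beam. The helper `combine_in_chunks` and the chunk size are hypothetical, chosen only for illustration.

```python
def get_common_items(sets):
  # Same function as in Example 1: intersection of all sets,
  # falling back to an empty set when there is no input.
  return set.intersection(*(sets or [set()]))


def combine_in_chunks(fn, elements, chunk_size):
  """Hypothetical helper: apply `fn` to fixed-size chunks, then apply it
  again to the partial results, loosely mimicking a staged combine."""
  partials = [
      fn(elements[i:i + chunk_size])
      for i in range(0, len(elements), chunk_size)
  ]
  return fn(partials)


produce = [
    {'🍓', '🥕', '🍌', '🍅', '🌶️'},
    {'🍇', '🥕', '🥝', '🍅', '🥔'},
    {'🍉', '🥕', '🍆', '🍅', '🍍'},
    {'🥑', '🥕', '🌽', '🍅', '🥥'},
]

all_at_once = get_common_items(produce)
in_chunks = combine_in_chunks(get_common_items, produce, chunk_size=3)
print(all_at_once == in_chunks)  # True
print(sorted(in_chunks))  # ['🍅', '🥕']
```

A function such as `lambda xs: sum(xs) / len(xs)` would fail this kind of check. An average of partial averages generally differs from the overall average.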
Example 1: Combining with a function
We define a function get_common_items, which takes an iterable of sets as an input and calculates the intersection (common items) of those sets.
```python
import apache_beam as beam

def get_common_items(sets):
  # set.intersection() takes multiple sets as separate arguments.
  # We unpack the `sets` list into multiple arguments with the * operator.
  # The combine transform might give us an empty list of `sets`,
  # so we use a list with an empty set as a default value.
  return set.intersection(*(sets or [set()]))

with beam.Pipeline() as pipeline:
  common_items = (
      pipeline
      | 'Create produce' >> beam.Create([
          {'🍓', '🥕', '🍌', '🍅', '🌶️'},
          {'🍇', '🥕', '🥝', '🍅', '🥔'},
          {'🍉', '🥕', '🍆', '🍅', '🍍'},
          {'🥑', '🥕', '🌽', '🍅', '🥥'},
      ])
      | 'Get common items' >> beam.CombineGlobally(get_common_items)
      | beam.Map(print))
```
Output:
{'🍅', '🥕'}
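The comment in get_common_items about empty input can be checked in plain Python, without a pipeline. Calling set.intersection with no arguments raises a TypeError, so the function falls back to `[set()]`. The variant `naive_common_items` is hypothetical and exists only for comparison.

```python
def get_common_items(sets):
  # Falls back to [set()] so that empty input yields an empty set.
  return set.intersection(*(sets or [set()]))


def naive_common_items(sets):
  # Hypothetical variant without the empty-input default.
  return set.intersection(*sets)


print(get_common_items([]))  # set()
print(get_common_items([{'a', 'b'}, {'b', 'c'}]))  # {'b'}

try:
  naive_common_items([])
  raised_type_error = False
except TypeError as error:
  # set.intersection() needs at least one set to operate on.
  raised_type_error = True
  print('TypeError:', error)
```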
Example 2: Combining with a lambda function
We can also use lambda functions to simplify Example 1.
```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
  common_items = (
      pipeline
      | 'Create produce' >> beam.Create([
          {'🍓', '🥕', '🍌', '🍅', '🌶️'},
          {'🍇', '🥕', '🥝', '🍅', '🥔'},
          {'🍉', '🥕', '🍆', '🍅', '🍍'},
          {'🥑', '🥕', '🌽', '🍅', '🥥'},
      ])
      | 'Get common items' >>
      beam.CombineGlobally(lambda sets: set.intersection(*(sets or [set()])))
      | beam.Map(print))
```
Output:
{'🍅', '🥕'}
Example 3: Combining with multiple arguments
You can pass functions with multiple arguments to CombineGlobally. Pass the extra arguments to CombineGlobally as additional positional or keyword arguments, and they are forwarded to the function. In this example, the lambda function takes sets and exclude as arguments.
```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
  common_items_with_exceptions = (
      pipeline
      | 'Create produce' >> beam.Create([
          {'🍓', '🥕', '🍌', '🍅', '🌶️'},
          {'🍇', '🥕', '🥝', '🍅', '🥔'},
          {'🍉', '🥕', '🍆', '🍅', '🍍'},
          {'🥑', '🥕', '🌽', '🍅', '🥥'},
      ])
      | 'Get common items with exceptions' >> beam.CombineGlobally(
          lambda sets, exclude:
          set.intersection(*(sets or [set()])) - exclude,
          exclude={'🥕'})
      | beam.Map(print)
  )
```
Output:
{'🍅'}
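A runner may apply the function to subsets of the elements and then to the partial results. The extra arguments accompany every one of those calls.

The plain-Python sketch below imitates this with functools.partial. This is an illustration, not how Beam implements it internally. It checks that subtracting `exclude` at every stage gives the same result as subtracting it once. The name `common_items_except` is hypothetical.

```python
from functools import partial


def common_items_except(sets, exclude):
  # Same logic as the lambda in Example 3.
  return set.intersection(*(sets or [set()])) - exclude


produce = [
    {'🍓', '🥕', '🍌', '🍅', '🌶️'},
    {'🍇', '🥕', '🥝', '🍅', '🥔'},
    {'🍉', '🥕', '🍆', '🍅', '🍍'},
    {'🥑', '🥕', '🌽', '🍅', '🥥'},
]

# Bind the keyword argument once so it travels with every call.
fn = partial(common_items_except, exclude={'🥕'})

one_pass = fn(produce)
# Two stages: combine each half, then combine the partial results.
two_stage = fn([fn(produce[:2]), fn(produce[2:])])

print(one_pass)  # {'🍅'}
print(one_pass == two_stage)  # True
```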
Example 4: Combining with a CombineFn
The most general and flexible way to combine elements is with a class that inherits from CombineFn.
- CombineFn.create_accumulator(): Creates an empty accumulator. For example, an empty accumulator for a sum would be 0, while an empty accumulator for a product (multiplication) would be 1.
- CombineFn.add_input(): Called once per element. Takes an accumulator and an input element, combines them, and returns the updated accumulator.
- CombineFn.merge_accumulators(): Multiple accumulators could be processed in parallel, so this function merges them into a single accumulator.
- CombineFn.extract_output(): Performs any final calculations on the accumulator and returns the result.
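To make the four methods concrete, here is a plain-Python sketch of a product combiner whose empty accumulator is 1, as described above. It does not import Beam. In a pipeline the class would subclass beam.CombineFn, and the runner would decide how elements are split into partial accumulators. The `run_combine` driver is hypothetical and stands in for the runner.

```python
class ProductFn:
  """Multiplies all inputs. In a pipeline this would subclass beam.CombineFn."""

  def create_accumulator(self):
    # 1 is the identity for multiplication, so it is the empty accumulator.
    return 1

  def add_input(self, accumulator, element):
    return accumulator * element

  def merge_accumulators(self, accumulators):
    merged = self.create_accumulator()
    for accumulator in accumulators:
      merged *= accumulator
    return merged

  def extract_output(self, accumulator):
    return accumulator


def run_combine(combine_fn, partitions):
  """Hypothetical driver: one accumulator per partition, then a single merge."""
  accumulators = []
  for partition in partitions:
    accumulator = combine_fn.create_accumulator()
    for element in partition:
      accumulator = combine_fn.add_input(accumulator, element)
    accumulators.append(accumulator)
  merged = combine_fn.merge_accumulators(accumulators)
  return combine_fn.extract_output(merged)


print(run_combine(ProductFn(), [[1, 2, 3], [4, 5]]))  # 120
print(run_combine(ProductFn(), [[], [2, 3], []]))  # 6
```

Starting from 0 instead of 1 would make every product 0. The empty accumulator must be the identity of the operation so that empty partitions do not change the result.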
```python
import apache_beam as beam

class PercentagesFn(beam.CombineFn):
  def create_accumulator(self):
    return {}

  def add_input(self, accumulator, input):
    # accumulator == {}
    # input == '🥕'
    if input not in accumulator:
      accumulator[input] = 0  # {'🥕': 0}
    accumulator[input] += 1  # {'🥕': 1}
    return accumulator

  def merge_accumulators(self, accumulators):
    # accumulators == [
    #     {'🥕': 1, '🍅': 2},
    #     {'🥕': 1, '🍅': 1, '🍆': 1},
    #     {'🥕': 1, '🍅': 3},
    # ]
    merged = {}
    for accum in accumulators:
      for item, count in accum.items():
        if item not in merged:
          merged[item] = 0
        merged[item] += count
    # merged == {'🥕': 3, '🍅': 6, '🍆': 1}
    return merged

  def extract_output(self, accumulator):
    # accumulator == {'🥕': 3, '🍅': 6, '🍆': 1}
    total = sum(accumulator.values())  # 10
    percentages = {item: count / total for item, count in accumulator.items()}
    # percentages == {'🥕': 0.3, '🍅': 0.6, '🍆': 0.1}
    return percentages

with beam.Pipeline() as pipeline:
  percentages = (
      pipeline
      | 'Create produce' >> beam.Create(
          ['🥕', '🍅', '🍅', '🥕', '🍆', '🍅', '🍅', '🍅', '🥕', '🍅'])
      | 'Get percentages' >> beam.CombineGlobally(PercentagesFn())
      | beam.Map(print))
```
Output:
{'🥕': 0.3, '🍅': 0.6, '🍆': 0.1}
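The accumulator in PercentagesFn is a dictionary of counts, which is what collections.Counter already models. The sketch below is an alternative, not the original example. It rewrites the same logic with Counter and, without Beam, drives it over three hypothetical partitions whose counts match the merge_accumulators comments above.

It also adds a guard for an empty accumulator, a design choice not present in PercentagesFn, which would raise ZeroDivisionError in that case. In a pipeline, the class would subclass beam.CombineFn. The name `CounterPercentagesFn` is hypothetical.

```python
from collections import Counter


class CounterPercentagesFn:
  """Same result as PercentagesFn, using Counter as the accumulator."""

  def create_accumulator(self):
    return Counter()

  def add_input(self, accumulator, element):
    accumulator[element] += 1  # Missing keys start at 0 in a Counter.
    return accumulator

  def merge_accumulators(self, accumulators):
    merged = Counter()
    for accumulator in accumulators:
      merged.update(accumulator)  # Adds counts rather than replacing them.
    return merged

  def extract_output(self, accumulator):
    total = sum(accumulator.values())
    if not total:
      return {}  # Guard: no elements means no percentages.
    return {item: count / total for item, count in accumulator.items()}


combine_fn = CounterPercentagesFn()
partitions = [
    ['🥕', '🍅', '🍅'],
    ['🥕', '🍅', '🍆'],
    ['🥕', '🍅', '🍅', '🍅'],
]
accumulators = []
for partition in partitions:
  accumulator = combine_fn.create_accumulator()
  for element in partition:
    accumulator = combine_fn.add_input(accumulator, element)
  accumulators.append(accumulator)

percentages = combine_fn.extract_output(
    combine_fn.merge_accumulators(accumulators))
print(percentages)  # {'🥕': 0.3, '🍅': 0.6, '🍆': 0.1}
```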
Related transforms
You can use the following combiner transforms:
Last updated on 2023/06/02