CombineGlobally
Combines all elements in a collection.
See more information in the Beam Programming Guide.
Examples
In the following examples, we create a pipeline with a PCollection of produce. Then, we apply CombineGlobally in multiple ways to combine all the elements in the PCollection.
CombineGlobally accepts a function that takes an iterable of elements as an input, and combines them to return a single element.
Example 1: Combining with a function
We define a function get_common_items, which takes an iterable of sets as an input and calculates the intersection (common items) of those sets.
import apache_beam as beam

def get_common_items(sets):
  # set.intersection() takes multiple sets as separate arguments.
  # We unpack the `sets` list into multiple arguments with the * operator.
  # The combine transform might give us an empty list of `sets`,
  # so we use a list with an empty set as a default value.
  return set.intersection(*(sets or [set()]))

with beam.Pipeline() as pipeline:
  common_items = (
      pipeline
      | 'Create produce' >> beam.Create([
          {'🍓', '🥕', '🍌', '🍅', '🌶️'},
          {'🍇', '🥕', '🥝', '🍅', '🥔'},
          {'🍉', '🥕', '🍆', '🍅', '🍍'},
          {'🥑', '🥕', '🌽', '🍅', '🥥'},
      ])
      | 'Get common items' >> beam.CombineGlobally(get_common_items)
      | beam.Map(print))

Output:
{'🍅', '🥕'}
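The Beam Programming Guide asks that a combining function be commutative and associative, because a runner may call it on subsets of the elements and then again on the intermediate results. The sketch below checks, in plain Python with no Beam dependency, that get_common_items behaves this way for the produce data. The `chunks` split is an arbitrary assumption standing in for however a runner might group elements; it is not taken from Beam.

```python
def get_common_items(sets):
    # Same logic as Example 1: intersect all sets, defaulting to an empty set.
    return set.intersection(*(sets or [set()]))

produce = [
    {'🍓', '🥕', '🍌', '🍅', '🌶️'},
    {'🍇', '🥕', '🥝', '🍅', '🥔'},
    {'🍉', '🥕', '🍆', '🍅', '🍍'},
    {'🥑', '🥕', '🌽', '🍅', '🥥'},
]

# Combine everything in a single call.
all_at_once = get_common_items(produce)

# Hypothetical grouping: combine two chunks separately,
# then combine the two partial results.
chunks = [produce[:1], produce[1:]]
partials = [get_common_items(chunk) for chunk in chunks]
in_stages = get_common_items(partials)

# The order of the elements should not matter either.
reversed_order = get_common_items(list(reversed(produce)))

print(all_at_once == in_stages == reversed_order)  # True
```

Set intersection passes this check; a function such as "take the first element" would not, and would be a poor fit for CombineGlobally.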
Example 2: Combining with a lambda function
We can also use lambda functions to simplify Example 1.
import apache_beam as beam

with beam.Pipeline() as pipeline:
  common_items = (
      pipeline
      | 'Create produce' >> beam.Create([
          {'🍓', '🥕', '🍌', '🍅', '🌶️'},
          {'🍇', '🥕', '🥝', '🍅', '🥔'},
          {'🍉', '🥕', '🍆', '🍅', '🍍'},
          {'🥑', '🥕', '🌽', '🍅', '🥥'},
      ])
      | 'Get common items' >>
      beam.CombineGlobally(lambda sets: set.intersection(*(sets or [set()])))
      | beam.Map(print))

Output:
{'🍅', '🥕'}
Example 3: Combining with multiple arguments
You can pass functions with multiple arguments to CombineGlobally. They are passed as additional positional arguments or keyword arguments to the function.
In this example, the lambda function takes sets and exclude as arguments.
import apache_beam as beam

with beam.Pipeline() as pipeline:
  common_items_with_exceptions = (
      pipeline
      | 'Create produce' >> beam.Create([
          {'🍓', '🥕', '🍌', '🍅', '🌶️'},
          {'🍇', '🥕', '🥝', '🍅', '🥔'},
          {'🍉', '🥕', '🍆', '🍅', '🍍'},
          {'🥑', '🥕', '🌽', '🍅', '🥥'},
      ])
      | 'Get common items with exceptions' >> beam.CombineGlobally(
          lambda sets, exclude: \
              set.intersection(*(sets or [set()])) - exclude,
          exclude={'🥕'})
      | beam.Map(print)
  )

Output:
{'🍅'}
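One way to picture how the extra argument reaches the function: the keyword argument is supplied alongside the iterable on every call, much like binding it ahead of time with functools.partial. The sketch below is a plain-Python analogy only, not how Beam implements it; the names `common_items_except` and `combine_with_exclude` are hypothetical.

```python
from functools import partial

def common_items_except(sets, exclude):
    # Intersect all sets, then drop the excluded items.
    return set.intersection(*(sets or [set()])) - exclude

# Bind the extra argument once; the result takes only the iterable of sets.
combine_with_exclude = partial(common_items_except, exclude={'🥕'})

produce = [
    {'🍓', '🥕', '🍌', '🍅', '🌶️'},
    {'🍇', '🥕', '🥝', '🍅', '🥔'},
    {'🍉', '🥕', '🍆', '🍅', '🍍'},
    {'🥑', '🥕', '🌽', '🍅', '🥥'},
]

result = combine_with_exclude(produce)
print(result)  # {'🍅'}
```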
Example 4: Combining with side inputs as singletons
If the PCollection
has a single value, such as the average from another computation,
passing the PCollection
as a singleton accesses that value.
In this example, we pass a PCollection
the value 'π₯'
as a singleton.
We then use that value to exclude specific items.
import apache_beam as beam

with beam.Pipeline() as pipeline:
  single_exclude = pipeline | 'Create single_exclude' >> beam.Create(['🥕'])

  common_items_with_exceptions = (
      pipeline
      | 'Create produce' >> beam.Create([
          {'🍓', '🥕', '🍌', '🍅', '🌶️'},
          {'🍇', '🥕', '🥝', '🍅', '🥔'},
          {'🍉', '🥕', '🍆', '🍅', '🍍'},
          {'🥑', '🥕', '🌽', '🍅', '🥥'},
      ])
      | 'Get common items with exceptions' >> beam.CombineGlobally(
          lambda sets, single_exclude: \
              set.intersection(*(sets or [set()])) - {single_exclude},
          single_exclude=beam.pvalue.AsSingleton(single_exclude))
      | beam.Map(print)
  )

Output:
{'🍅'}
Example 5: Combining with side inputs as iterators
If the PCollection has multiple values, pass the PCollection as an iterator. This accesses elements lazily as they are needed, so it is possible to iterate over large PCollections that won’t fit into memory.
import apache_beam as beam

with beam.Pipeline() as pipeline:
  exclude = pipeline | 'Create exclude' >> beam.Create(['🥕'])

  common_items_with_exceptions = (
      pipeline
      | 'Create produce' >> beam.Create([
          {'🍓', '🥕', '🍌', '🍅', '🌶️'},
          {'🍇', '🥕', '🥝', '🍅', '🥔'},
          {'🍉', '🥕', '🍆', '🍅', '🍍'},
          {'🥑', '🥕', '🌽', '🍅', '🥥'},
      ])
      | 'Get common items with exceptions' >> beam.CombineGlobally(
          lambda sets, exclude: \
              set.intersection(*(sets or [set()])) - set(exclude),
          exclude=beam.pvalue.AsIter(exclude))
      | beam.Map(print)
  )

Output:
{'🍅'}
Note: You can pass the PCollection as a list with beam.pvalue.AsList(pcollection), but this requires that all the elements fit into memory.
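To illustrate what lazy access buys, the sketch below uses an ordinary Python generator as a stand-in for the iterable side input. That stand-in is an assumption for illustration and is not a Beam object; `exclusions` and `common_items_except` are hypothetical names. The combining function removes excluded items one at a time, so the exclusions are never collected into a list or a set.

```python
def exclusions():
    # Stand-in for a large side input: values are produced one at a time.
    yield '🥕'
    yield '🥔'

def common_items_except(sets, exclude):
    # set.intersection() returns a new set, so the inputs are not modified.
    common = set.intersection(*(sets or [set()]))
    for item in exclude:  # consumes the iterable lazily, element by element
        common.discard(item)
    return common

produce = [
    {'🍓', '🥕', '🍌', '🍅', '🌶️'},
    {'🍇', '🥕', '🥝', '🍅', '🥔'},
    {'🍉', '🥕', '🍆', '🍅', '🍍'},
    {'🥑', '🥕', '🌽', '🍅', '🥥'},
]

result = common_items_except(produce, exclusions())
print(result)  # {'🍅'}
```

By contrast, calling set(exclude) as in Example 5 builds the whole exclusion set in memory, which is fine for a handful of values but gives up the benefit of iterating lazily.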
Example 6: Combining with side inputs as dictionaries
If a PCollection
is small enough to fit into memory, then that PCollection
can be passed as a dictionary.
Each element must be a (key, value)
pair.
Note that all the elements of the PCollection
must fit into memory for this.
If the PCollection
won’t fit into memory, use beam.pvalue.AsIter(pcollection)
instead.
import apache_beam as beam

def get_custom_common_items(sets, options):
  sets = sets or [set()]
  common_items = set.intersection(*sets)
  common_items |= options['include']  # union
  common_items -= options['exclude']  # difference
  return common_items

with beam.Pipeline() as pipeline:
  options = pipeline | 'Create options' >> beam.Create([
      ('exclude', {'🥕'}),
      ('include', {'🍇', '🌽'}),
  ])

  custom_common_items = (
      pipeline
      | 'Create produce' >> beam.Create([
          {'🍓', '🥕', '🍌', '🍅', '🌶️'},
          {'🍇', '🥕', '🥝', '🍅', '🥔'},
          {'🍉', '🥕', '🍆', '🍅', '🍍'},
          {'🥑', '🥕', '🌽', '🍅', '🥥'},
      ])
      | 'Get common items' >> beam.CombineGlobally(
          get_custom_common_items, options=beam.pvalue.AsDict(options))
      | beam.Map(print))

Output:
{'🍅', '🍇', '🌽'}
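The set arithmetic can be checked in plain Python by building an ordinary dict from the same (key, value) pairs. This is an assumption for illustration: in the pipeline, Beam builds the dictionary from the side input's elements. This version removes the excluded items with set difference, which is what the output above implies.

```python
def get_custom_common_items(sets, options):
    sets = sets or [set()]
    common_items = set.intersection(*sets)
    common_items |= options['include']  # union: add the included items
    common_items -= options['exclude']  # difference: drop the excluded items
    return common_items

# Each element is a (key, value) pair, so dict() can consume the list directly.
option_pairs = [
    ('exclude', {'🥕'}),
    ('include', {'🍇', '🌽'}),
]
options = dict(option_pairs)

produce = [
    {'🍓', '🥕', '🍌', '🍅', '🌶️'},
    {'🍇', '🥕', '🥝', '🍅', '🥔'},
    {'🍉', '🥕', '🍆', '🍅', '🍍'},
    {'🥑', '🥕', '🌽', '🍅', '🥥'},
]

result = get_custom_common_items(produce, options)
print(result == {'🍅', '🍇', '🌽'})  # True
```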
Example 7: Combining with a CombineFn
The more general way to combine elements, and the most flexible, is with a class that inherits from CombineFn.
CombineFn.create_accumulator(): Creates an empty accumulator. For example, an empty accumulator for a sum would be 0, while an empty accumulator for a product (multiplication) would be 1.
CombineFn.add_input(): Called once per element. Takes an accumulator and an input element, combines them, and returns the updated accumulator.
CombineFn.merge_accumulators(): Multiple accumulators could be processed in parallel, so this function merges them into a single accumulator.
CombineFn.extract_output(): Performs any additional calculations before extracting the result.
import apache_beam as beam

class PercentagesFn(beam.CombineFn):
  def create_accumulator(self):
    return {}

  def add_input(self, accumulator, input):
    # accumulator == {}
    # input == '🥕'
    if input not in accumulator:
      accumulator[input] = 0  # {'🥕': 0}
    accumulator[input] += 1  # {'🥕': 1}
    return accumulator

  def merge_accumulators(self, accumulators):
    # accumulators == [
    #     {'🥕': 1, '🍅': 2},
    #     {'🥕': 1, '🍅': 1, '🍆': 1},
    #     {'🥕': 1, '🍅': 3},
    # ]
    merged = {}
    for accum in accumulators:
      for item, count in accum.items():
        if item not in merged:
          merged[item] = 0
        merged[item] += count
    # merged == {'🥕': 3, '🍅': 6, '🍆': 1}
    return merged

  def extract_output(self, accumulator):
    # accumulator == {'🥕': 3, '🍅': 6, '🍆': 1}
    total = sum(accumulator.values())  # 10
    percentages = {item: count / total for item, count in accumulator.items()}
    # percentages == {'🥕': 0.3, '🍅': 0.6, '🍆': 0.1}
    return percentages

with beam.Pipeline() as pipeline:
  percentages = (
      pipeline
      | 'Create produce' >> beam.Create(
          ['🥕', '🍅', '🍅', '🥕', '🍆', '🍅', '🍅', '🍅', '🥕', '🍅'])
      | 'Get percentages' >> beam.CombineGlobally(PercentagesFn())
      | beam.Map(print))

Output:
{'🥕': 0.3, '🍅': 0.6, '🍆': 0.1}
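To see how the four methods fit together, it can help to drive them by hand without a runner. The sketch below is a simplified simulation under stated assumptions, not Beam's execution model: `MeanFn` is a plain class that only mirrors the CombineFn method names, `simulate_combine` is a hypothetical driver, and the split into bundles is arbitrary.

```python
class MeanFn:
    """Plain-Python stand-in with the same four methods as a CombineFn."""

    def create_accumulator(self):
        return (0.0, 0)  # (running sum, count)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return total + value, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float('nan')


def simulate_combine(combine_fn, bundles):
    """Hypothetical driver: one accumulator per bundle, then merge and extract."""
    accumulators = []
    for bundle in bundles:
        accumulator = combine_fn.create_accumulator()
        for element in bundle:
            accumulator = combine_fn.add_input(accumulator, element)
        accumulators.append(accumulator)
    merged = combine_fn.merge_accumulators(accumulators)
    return combine_fn.extract_output(merged)


result = simulate_combine(MeanFn(), [[1, 2, 3], [4], [5, 6]])
print(result)  # 3.5
```

The accumulator carries a sum and a count instead of a running mean because per-bundle means cannot be merged correctly without knowing how many elements each one covers. That is the kind of bookkeeping extract_output exists to finish off.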
Related transforms
You can use the following combiner transforms: