CoGroupByKey

Aggregates all input elements by their key and allows downstream processing to consume all values associated with the key. GroupByKey performs this operation over a single input collection, and therefore over a single type of input value. CoGroupByKey performs it over multiple input collections. As a result, each output element pairs a key with the values associated with that key in every input collection.
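To make the difference concrete, the sketch below models both operations in plain Python on in-memory lists, without Beam. It is not Beam's implementation. `group_by_key` and `co_group_by_key` are hypothetical helpers, and the sketch assumes the dict-of-named-inputs form that the example below uses. Unlike Beam, it runs on a single machine and preserves insertion order, which Beam does not guarantee.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Hypothetical model of GroupByKey: one input, one list of values per key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def co_group_by_key(named_inputs):
    """Hypothetical model of CoGroupByKey over a dict of named (key, value) inputs.

    Every key found in any input appears once in the result. Every input name
    appears for every key, with an empty list where that input had no values.
    """
    result = {}
    for name, pairs in named_inputs.items():
        for key, value in pairs:
            if key not in result:
                # First time this key appears: give every input its own empty list.
                result[key] = {n: [] for n in named_inputs}
            result[key][name].append(value)
    return result


# Same sample data as the pipeline example.
icons = [('Apple', '🍎'), ('Apple', '🍏'), ('Eggplant', '🍆'), ('Tomato', '🍅')]
durations = [('Apple', 'perennial'), ('Carrot', 'biennial'),
             ('Tomato', 'perennial'), ('Tomato', 'annual')]

plants = co_group_by_key({'icons': icons, 'durations': durations})
```

The `plants` dict holds the same key-to-values groupings as the pipeline output. 'Carrot' and 'Eggplant' each appear with an empty list for the input that lacks them.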

See more information in the Beam Programming Guide.

Examples

In the following example, we create a pipeline with two PCollections of produce, one with icons and one with durations, both with a common key of the produce name. Then, we apply CoGroupByKey to join both PCollections using their keys.

CoGroupByKey expects a dictionary of named keyed PCollections, and produces elements joined by their keys. The values of each output element are dictionaries where the names correspond to the input dictionary, with lists of all the values found for that key.

import apache_beam as beam

with beam.Pipeline() as pipeline:
  icon_pairs = pipeline | 'Create icons' >> beam.Create([
      ('Apple', '๐ŸŽ'),
      ('Apple', '๐Ÿ'),
      ('Eggplant', '๐Ÿ†'),
      ('Tomato', '๐Ÿ…'),
  ])

  duration_pairs = pipeline | 'Create durations' >> beam.Create([
      ('Apple', 'perennial'),
      ('Carrot', 'biennial'),
      ('Tomato', 'perennial'),
      ('Tomato', 'annual'),
  ])

  plants = (({
      'icons': icon_pairs, 'durations': duration_pairs
  })
            | 'Merge' >> beam.CoGroupByKey()
            | beam.Map(print))

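Output (the order of elements, and of values within each list, is not guaranteed and may vary between runs):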

('Apple', {'icons': ['๐ŸŽ', '๐Ÿ'], 'durations': ['perennial']})
('Carrot', {'icons': [], 'durations': ['biennial']})
('Tomato', {'icons': ['๐Ÿ…'], 'durations': ['perennial', 'annual']})
('Eggplant', {'icons': ['๐Ÿ†'], 'durations': []})