CoGroupByKey
Aggregates all input elements by their key and allows downstream processing to consume all values associated with the key. While GroupByKey performs this operation over a single input collection, and thus a single type of input values, CoGroupByKey operates over multiple input collections. As a result, the output for each key is a tuple of the values associated with that key in each input collection.
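To make the shape of the output concrete, here is a minimal plain-Java model of the semantics described above. It is not Beam code and not how Beam implements the transform. The class `CoGroupModel` and method `coGroup` are hypothetical names. For simplicity, the model assumes every input has the same value type `V`. Beam, by contrast, lets each input carry its own value type and distinguishes the inputs with a `TupleTag`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java model of CoGroupByKey semantics (not Beam code). */
final class CoGroupModel {
  private CoGroupModel() {}

  /**
   * Groups N keyed inputs at once. For every key that appears in ANY input,
   * the result holds one list per input, in the same order as `inputs`.
   * An input with no values for that key contributes an empty list.
   * Simplification: all inputs share one value type V.
   */
  static <K extends Comparable<K>, V> Map<K, List<List<V>>> coGroup(
      List<List<Map.Entry<K, V>>> inputs) {
    Map<K, List<List<V>>> out = new TreeMap<>(); // sorted only for readable output
    for (int i = 0; i < inputs.size(); i++) {
      for (Map.Entry<K, V> kv : inputs.get(i)) {
        // First time we see this key: create one empty list per input.
        List<List<V>> perInput = out.computeIfAbsent(kv.getKey(), k -> {
          List<List<V>> lists = new ArrayList<>();
          for (int j = 0; j < inputs.size(); j++) {
            lists.add(new ArrayList<>());
          }
          return lists;
        });
        // Append the value to the list that belongs to input i.
        perInput.get(i).add(kv.getValue());
      }
    }
    return out;
  }
}
```

A key that appears in only one input still produces an output entry, with an empty list in the position of each input that had no values for it.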
See more information in the Beam Programming Guide.
Examples
Example 1: Say you have two different files with user data; one file has names and email addresses and the other file has names and phone numbers.
You can join those two data sets, using the name as a common key and the other data as the associated values. After the join, you have one data set that contains all of the information (email addresses and phone numbers) associated with each name.
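import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

// UID is the key type shared by both inputs.
PCollection<KV<UID, Integer>> pt1 = /* ... */;
PCollection<KV<UID, String>> pt2 = /* ... */;

// One tag per input; the tag identifies that input's values in each CoGbkResult.
final TupleTag<Integer> t1 = new TupleTag<>();
final TupleTag<String> t2 = new TupleTag<>();
PCollection<KV<UID, CoGbkResult>> result =
    KeyedPCollectionTuple.of(t1, pt1).and(t2, pt2)
        .apply(CoGroupByKey.create());

result.apply(ParDo.of(new DoFn<KV<UID, CoGbkResult>, /* some result */>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<UID, CoGbkResult> e = c.element();
    CoGbkResult joined = e.getValue();
    // Retrieve all integers associated with this key from pt1.
    Iterable<Integer> allIntegers = joined.getAll(t1);
    // Retrieve the string associated with this key from pt2.
    // Note: getOnly throws unless pt2 had exactly one value for this key.
    String string = joined.getOnly(t2);
    ...
  }
}));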
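The snippet above uses abstract types. As a rough, non-Beam sketch of the name/email/phone scenario from Example 1, the following plain Java joins two in-memory "files" of `name,value` lines on the name, keeping every name that appears in either file. `ContactJoin`, `Contact`, and the comma-separated line format are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical plain-Java illustration of Example 1 (not Beam code):
 * join two "files" of "name,value" lines on the name.
 */
final class ContactJoin {
  private ContactJoin() {}

  /** All email addresses and phone numbers found for one name. */
  static final class Contact {
    final List<String> emails = new ArrayList<>();
    final List<String> phones = new ArrayList<>();
  }

  static Map<String, Contact> join(List<String> emailLines, List<String> phoneLines) {
    Map<String, Contact> byName = new TreeMap<>(); // sorted for stable output
    for (String line : emailLines) {
      String[] parts = line.split(",", 2); // assumed format: name,email
      byName.computeIfAbsent(parts[0], n -> new Contact()).emails.add(parts[1]);
    }
    for (String line : phoneLines) {
      String[] parts = line.split(",", 2); // assumed format: name,phone
      byName.computeIfAbsent(parts[0], n -> new Contact()).phones.add(parts[1]);
    }
    return byName;
  }
}
```

As with CoGroupByKey, a name present in only one file still appears in the result, with an empty list for the missing data.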
Example 2:
Related transforms
- GroupByKey takes one input collection.
Last updated on 2025/01/28