@Experimental(value=SCHEMAS) public class CoGroup extends java.lang.Object
PCollection
s.
This transform has similarities to CoGroupByKey
, however works on PCollections that
have schemas. This allows users of the transform to simply specify schema fields to join on. The
output type of the transform is Row
that contains one row field for the key and an ITERABLE
field for each input containing the rows that joined on that key; by default the cross product is
not expanded, but the cross product can be optionally expanded. By default the key field is named
"key" (the name can be overridden using withKeyField) and has index 0. The tags in the
PCollectionTuple control the names of the value fields in the Row.
For example, the following demonstrates joining three PCollections on the "user" and "country" fields:
PCollection<Row> joined =
PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3)
.apply(CoGroup.join(By.fieldNames("user", "country")));
In the above case, the key schema will contain the two string fields "user" and "country"; in this case, the schemas for Input1, Input2, Input3 must all have fields named "user" and "country". The remainder of the Row will contain three iterable of Row fields named "input1" "input2" and "input3". This contains all inputs that came in on any of the inputs for that key. Standard join types (inner join, outer join, etc.) can be accomplished by expanding the cross product of these iterables in various ways.
To put it in other words, the key schema is convertible to the following POJO:
@DefaultSchema(JavaFieldSchema.class) public class JoinedKey { public String user; public String country; }
The value schema is convertible to the following POJO:
{@code @DefaultSchema(JavaFieldSchema.class) public class JoinedValue { public JoinedKey key; // The below lists contain all values from each of the three inputs that match on the given // key. public Iterableinput1; public Iterable input2; public Iterable input3; } PCollection values = joined.apply(Convert.to(JoinedValue.class)); PCollection keys = values .apply(Select.fieldNames("key")) .apply(Convert.to(JoinedKey.class)); }
It's also possible to join between different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection. For example:
{@code PCollectionjoined = PCollectionTuple.of("input1Tag", input1, "input2Tag", input2) .apply(CoGroup .join("input1Tag", By.fieldNames("referringUser"))) .join("input2Tag", By.fieldNames("user"))); }
Traditional (SQL) joins are cross-product joins. All rows that match the join condition are combined into individual rows and returned; in fact any SQL inner joins is a subset of the cross-product of two tables. This transform also supports the same functionality using the {@link Impl#crossProductJoin()} method.
For example, consider the SQL join: SELECT * FROM input1 INNER JOIN input2 ON input1.user = input2.user
You could express this with:
{@code PCollectionjoined = PCollectionTuple.of("input1", input1, "input2", input2) .apply(CoGroup.join(By.fieldNames("user")).crossProductJoin(); }
The schema of the output PCollection contains a nested message for each of input1 and input2. Like above, you could use the {@link Convert} transform to convert it to the following POJO:
{@code {@literal @}DefaultSchema(JavaFieldSchema.class) public class JoinedValue { public Input1Type input1; public Input2Type input2; } }
The {@link Unnest} transform can then be used to flatten all the subfields into one single top-level row containing all the fields in both Input1 and Input2; this will often be combined with a {@link Select} transform to select out the fields of interest, as the key fields will be identical between input1 and input2.
This transform also supports outer-join semantics. By default, all input PCollections must participate fully in the join, providing inner-join semantics. This means that the join will only produce values for "Bob" if all inputs have values for "Bob;" if even a single input does not have a value for "Bob," an inner-join will produce no value. However, if you mark that input as having optional participation then the join will contain values for "Bob," as long as at least one input has a "Bob" value; null values will be added for inputs that have no "Bob" values. To continue the SQL example:
SELECT * FROM input1 LEFT OUTER JOIN input2 ON input1.user = input2.user
Is equivalent to:
{@code PCollectionjoined = PCollectionTuple.of("input1", input1, "input2", input2) .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation()) .join("input2", By.fieldNames("user")) .crossProductJoin(); }
SELECT * FROM input1 RIGHT OUTER JOIN input2 ON input1.user = input2.user
Is equivalent to:
{@code PCollectionjoined = PCollectionTuple.of("input1", input1, "input2", input2) .apply(CoGroup.join("input1", By.fieldNames("user")) .join("input2", By.fieldNames("user").withOptionalParticipation()) .crossProductJoin(); }
and SELECT * FROM input1 FULL OUTER JOIN input2 ON input1.user = input2.user
Is equivalent to:
{@code PCollectionjoined = PCollectionTuple.of("input1", input1, "input2", input2) .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation()) .join("input2", By.fieldNames("user").withOptionalParticipation()) .crossProductJoin(); }
While the above examples use two inputs to mimic SQL's left and right join semantics, the {@link CoGroup} transform supports any number of inputs, and optional participation can be specified on any subset of them.
Do note that cross-product joins while simpler and easier to program, can cause performance problems.
Modifier and Type | Class and Description |
---|---|
static class |
CoGroup.By
Defines the set of fields to extract for the join key, as well as other per-input join options.
|
static class |
CoGroup.ExpandCrossProduct
A
PTransform that calculates the cross-product join. |
static class |
CoGroup.Impl
The implementing PTransform.
|
Constructor and Description |
---|
CoGroup() |
Modifier and Type | Method and Description |
---|---|
static CoGroup.Impl |
join(CoGroup.By clause)
Join all input PCollections using the same args.
|
static CoGroup.Impl |
join(java.lang.String tag,
CoGroup.By clause)
Specify the following join arguments (including fields to join by_ for the specified
PCollection.
|
public static CoGroup.Impl join(CoGroup.By clause)
The same fields and other options are used in all input PCollections.
public static CoGroup.Impl join(java.lang.String tag, CoGroup.By clause)
Each PCollection in the input must have args specified for the join key.