@Experimental(value=SCHEMAS) public class CoGroup extends java.lang.Object
A transform that performs equijoins across multiple schema PCollections.
This transform is similar to CoGroupByKey, but it works on PCollections that
have schemas. This allows users of the transform to simply specify schema fields to join on. The
output type of the transform is a KV<Row, Row> where the value contains one field for
every input PCollection and the key represents the fields that were joined on. By default the
cross product is not expanded, so all fields in the output row are array fields.
For example, the following demonstrates joining three PCollections on the "user" and "country" fields:
PCollection<KV<Row, Row>> joined =
PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3)
.apply(CoGroup.join(By.fieldNames("user", "country")));
In the above case, the key schema will contain the two string fields "user" and "country"; the schemas for input1, input2, and input3 must therefore all have fields named "user" and "country". The value schema will contain three array-of-Row fields named "input1", "input2", and "input3". For each key, the value Row contains all rows that arrived on any of the inputs for that key. Standard join types (inner join, outer join, etc.) can be accomplished by expanding the cross product of these arrays in various ways.
In other words, the key schema is convertible to the following POJO:
@DefaultSchema(JavaFieldSchema.class)
public class JoinedKey {
public String user;
public String country;
}
PCollection<JoinedKey> keys = joined
.apply(Keys.create())
.apply(Convert.to(JoinedKey.class));
The value schema is convertible to the following POJO:
@DefaultSchema(JavaFieldSchema.class)
public class JoinedValue {
// The below lists contain all values from each of the three inputs that match on the given
// key.
public List<Input1Type> input1;
public List<Input2Type> input2;
public List<Input3Type> input3;
}
PCollection<JoinedValue> values = joined
.apply(Values.create())
.apply(Convert.to(JoinedValue.class));
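The grouped output shape described above (one key, and one list of rows per input) can be pictured with a small plain-Java sketch. It does not use Beam and is not how the transform is implemented; the class name `CoGroupSketch`, the `coGroup` helper, and the convention that each row is a `String[]` whose first element is the join key are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; it does not use or reproduce Beam's implementation.
class CoGroupSketch {

  /**
   * Groups rows from several tagged inputs by a shared key.
   * Assumption of this sketch: each row is a String[] whose first element is the join key.
   * Returns key -> (tag -> rows with that key). A tag with no rows for a key maps to an
   * empty list, so every value has exactly one list per input.
   */
  static Map<String, Map<String, List<String[]>>> coGroup(Map<String, List<String[]>> inputs) {
    Map<String, Map<String, List<String[]>>> grouped = new LinkedHashMap<>();
    for (Map.Entry<String, List<String[]>> input : inputs.entrySet()) {
      for (String[] row : input.getValue()) {
        Map<String, List<String[]>> perTag = grouped.get(row[0]);
        if (perTag == null) {
          perTag = new LinkedHashMap<>();
          // Pre-create an empty list for every tag so no input is missing from the value.
          for (String tag : inputs.keySet()) {
            perTag.put(tag, new ArrayList<String[]>());
          }
          grouped.put(row[0], perTag);
        }
        perTag.get(input.getKey()).add(row);
      }
    }
    return grouped;
  }

  public static void main(String[] args) {
    Map<String, List<String[]>> inputs = new LinkedHashMap<>();
    inputs.put("input1",
        Arrays.asList(new String[] {"bob", "click"}, new String[] {"bob", "view"}));
    inputs.put("input2",
        Arrays.asList(new String[] {"bob", "US"}, new String[] {"amy", "CA"}));
    for (Map.Entry<String, Map<String, List<String[]>>> e : coGroup(inputs).entrySet()) {
      System.out.println(e.getKey()
          + ": input1 rows=" + e.getValue().get("input1").size()
          + ", input2 rows=" + e.getValue().get("input2").size());
    }
  }
}
```

Note that a key seen on only one input still appears in the result, with empty lists for the other inputs; deciding whether such keys survive is left to whatever expands the lists afterwards.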
It's also possible to join between different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection. For example:
PCollection<KV<Row, Row>> joined
= PCollectionTuple.of("input1Tag", input1, "input2Tag", input2)
.apply(CoGroup
.join("input1Tag", By.fieldNames("referringUser"))
.join("input2Tag", By.fieldNames("user")));
Traditional (SQL) joins are cross-product joins. All rows that match the join condition are
combined into individual rows and returned; in fact, any SQL inner join is a subset of the
cross-product of two tables. This transform supports the same functionality through the CoGroup.Impl.crossProductJoin() method.
For example, consider the SQL join: SELECT * FROM input1 INNER JOIN input2 ON input1.user = input2.user
You could express this with:
PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
.apply(CoGroup.join(By.fieldNames("user")).crossProductJoin());
The schema of the output PCollection contains a nested message for each of input1 and input2.
As above, you could use the Convert transform to convert it to the following POJO:
@DefaultSchema(JavaFieldSchema.class)
public class JoinedValue {
public Input1Type input1;
public Input2Type input2;
}
The Unnest transform can then be used to flatten all the subfields into one single
top-level row containing all the fields in both Input1 and Input2; this will often be combined
with a Select transform to select out the fields of interest, as the key fields will be
identical between input1 and input2.
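To make the cross-product expansion concrete, here is a minimal plain-Java sketch of what expanding the per-input lists for a single key looks like. It is an illustration under stated assumptions, not Beam's implementation: the class `CrossProductSketch` and its `crossProduct` helper are hypothetical, and rows are simplified to strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; it does not use or reproduce Beam's implementation.
class CrossProductSketch {

  /**
   * Expands the per-input row lists for a single key into every combination
   * that takes exactly one row from each input.
   */
  static List<List<String>> crossProduct(List<List<String>> perInputRows) {
    List<List<String>> result = new ArrayList<>();
    result.add(new ArrayList<String>()); // start from one empty combination
    for (List<String> rows : perInputRows) {
      List<List<String>> next = new ArrayList<>();
      for (List<String> partial : result) {
        for (String row : rows) {
          List<String> extended = new ArrayList<>(partial);
          extended.add(row);
          next.add(extended);
        }
      }
      // If this input has no rows for the key, next is empty and so is the final result.
      result = next;
    }
    return result;
  }

  public static void main(String[] args) {
    List<List<String>> forOneKey =
        Arrays.asList(Arrays.asList("in1-a", "in1-b"), Arrays.asList("in2-x", "in2-y", "in2-z"));
    for (List<String> joinedRow : crossProduct(forOneKey)) {
      System.out.println(joinedRow);
    }
  }
}
```

With two rows on one input and three on the other, the key expands to six joined rows, which is why the size of the expanded output grows multiplicatively with the number of matching rows per input.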
This transform also supports outer-join semantics. By default, all input PCollections must participate fully in the join, providing inner-join semantics. This means that the join will only produce values for "Bob" if all inputs have values for "Bob"; if even a single input does not have a value for "Bob", an inner join will produce no value. However, if you mark that input as having optional participation, then the join will contain values for "Bob" as long as at least one input has a "Bob" value; null values will be added for inputs that have no "Bob" values. To continue the SQL example:
SELECT * FROM input1 LEFT OUTER JOIN input2 ON input1.user = input2.user
Is equivalent to:
PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
.apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
.join("input2", By.fieldNames("user"))
.crossProductJoin());
SELECT * FROM input1 RIGHT OUTER JOIN input2 ON input1.user = input2.user
Is equivalent to:
PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
.apply(CoGroup.join("input1", By.fieldNames("user"))
.join("input2", By.fieldNames("user").withOptionalParticipation())
.crossProductJoin());
SELECT * FROM input1 FULL OUTER JOIN input2 ON input1.user = input2.user
Is equivalent to:
PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
.apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
.join("input2", By.fieldNames("user").withOptionalParticipation())
.crossProductJoin());
While the above examples use two inputs to mimic SQL's left and right join semantics, the
CoGroup transform supports any number of inputs, and optional participation can be
specified on any subset of them.
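The null-filling behavior described above can be sketched in plain Java as a cross-product expansion in which an input that has no rows for a key, and is marked optional, contributes a single null placeholder instead of eliminating the key. This is only one reading of the prose, written without Beam; the class `OptionalParticipationSketch`, the `expand` helper, and the per-input `optional` flags are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java illustration only; it does not use or reproduce Beam's implementation.
class OptionalParticipationSketch {

  /**
   * Expands the per-input row lists for a single key into joined rows.
   * optional.get(i) says whether input i was marked as optionally participating
   * (an assumption of this sketch about how that flag is interpreted).
   */
  static List<List<String>> expand(List<List<String>> perInputRows, List<Boolean> optional) {
    List<List<String>> result = new ArrayList<>();
    result.add(new ArrayList<String>()); // start from one empty combination
    for (int i = 0; i < perInputRows.size(); i++) {
      List<String> rows = perInputRows.get(i);
      if (rows.isEmpty() && optional.get(i)) {
        // A missing optional input contributes one null instead of dropping the key.
        rows = Collections.<String>singletonList(null);
      }
      List<List<String>> next = new ArrayList<>();
      for (List<String> partial : result) {
        for (String row : rows) {
          List<String> extended = new ArrayList<>(partial);
          extended.add(row);
          next.add(extended);
        }
      }
      result = next; // a missing non-optional input leaves this empty
    }
    return result;
  }

  public static void main(String[] args) {
    List<List<String>> forOneKey =
        Arrays.asList(Arrays.asList("in1-a", "in1-b"), Collections.<String>emptyList());
    // Neither input optional: the key produces no joined rows.
    System.out.println(expand(forOneKey, Arrays.asList(false, false)));
    // Second input optional: each first-input row is paired with null.
    System.out.println(expand(forOneKey, Arrays.asList(false, true)));
  }
}
```

Because the flag is per input, the same expansion covers any number of inputs and any subset of optional ones.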
Do note that cross-product joins, while simpler and easier to program, can cause
Modifier and Type | Class and Description |
---|---|
static class | CoGroup.By: Defines the set of fields to extract for the join key, as well as other per-input join options. |
static class | CoGroup.ExpandCrossProduct: A PTransform that calculates the cross-product join. |
static class | CoGroup.Impl: The implementing PTransform. |
Constructor and Description |
---|
CoGroup() |
Modifier and Type | Method and Description |
---|---|
static CoGroup.Impl | join(CoGroup.By clause): Join all input PCollections using the same args. |
static CoGroup.Impl | join(java.lang.String tag, CoGroup.By clause): Specify the join arguments (including fields to join by) for the specified PCollection. |
public static CoGroup.Impl join(CoGroup.By clause)
The same fields and other options are used in all input PCollections.
public static CoGroup.Impl join(java.lang.String tag, CoGroup.By clause)
Each PCollection in the input must have args specified for the join key.