Schema Patterns
The samples on this page describe common patterns that use Schemas. Schemas give Beam records a type system that is independent of any specific programming-language type. Several Java classes might share the same schema (for example, a Protocol Buffer class and a POJO class), and Beam lets us convert seamlessly between these types. Schemas also provide a simple way to reason about types across different programming-language APIs. For more information, see the programming guide section on Schemas.
Using Joins
Beam supports equijoins on schema PCollections, where the join condition depends on the equality of a subset of fields.
Consider using Join if you have multiple collections that provide information about related things, and their structure is known.
For example, suppose we have two collections of user data: one contains names and email addresses; the other contains names and phone numbers. We can join the two collections using the name as a common key and the other data as the associated values. After the join, we have one collection that contains all the information (email address and phone number) associated with each name.
The following conceptual example uses two input collections to show the mechanism of Join.
First, we define the schemas and the user data.
// Define Schemas
Schema emailSchema =
    Schema.of(
        Schema.Field.of("name", Schema.FieldType.STRING),
        Schema.Field.of("email", Schema.FieldType.STRING));
Schema phoneSchema =
    Schema.of(
        Schema.Field.of("name", Schema.FieldType.STRING),
        Schema.Field.of("phone", Schema.FieldType.STRING));
// Create User Data Collections
final List<Row> emailUsers =
    Arrays.asList(
        Row.withSchema(emailSchema).addValue("person1").addValue("person1@example.com").build(),
        Row.withSchema(emailSchema).addValue("person2").addValue("person2@example.com").build(),
        Row.withSchema(emailSchema).addValue("person3").addValue("person3@example.com").build(),
        Row.withSchema(emailSchema).addValue("person4").addValue("person4@example.com").build(),
        Row.withSchema(emailSchema)
            .addValue("person6")
            .addValue("person6@example.com")
            .build());
final List<Row> phoneUsers =
    Arrays.asList(
        Row.withSchema(phoneSchema).addValue("person1").addValue("111-222-3333").build(),
        Row.withSchema(phoneSchema).addValue("person2").addValue("222-333-4444").build(),
        Row.withSchema(phoneSchema).addValue("person3").addValue("444-333-4444").build(),
        Row.withSchema(phoneSchema).addValue("person4").addValue("555-333-4444").build(),
        Row.withSchema(phoneSchema).addValue("person5").addValue("777-333-4444").build());
Then we create the PCollections for the user data and join the two PCollections using a Join.
// Create/Read Schema PCollections
PCollection<Row> emailList =
    p.apply("CreateEmails", Create.of(emailUsers).withRowSchema(emailSchema));
PCollection<Row> phoneList =
    p.apply("CreatePhones", Create.of(phoneUsers).withRowSchema(phoneSchema));
// Perform Join (Join here is org.apache.beam.sdk.schemas.transforms.Join)
PCollection<Row> resultRow =
    emailList.apply("Apply Join", Join.<Row, Row>innerJoin(phoneList).using("name"));
// Preview Result: print each joined Row; the returned string is unused
resultRow.apply(
    "Preview Result",
    MapElements.into(TypeDescriptors.strings())
        .via(
            x -> {
              System.out.println(x);
              return "";
            }));
/* Sample Output From the pipeline:
Row:[Row:[person1, person1@example.com], Row:[person1, 111-222-3333]]
Row:[Row:[person2, person2@example.com], Row:[person2, 222-333-4444]]
Row:[Row:[person4, person4@example.com], Row:[person4, 555-333-4444]]
Row:[Row:[person3, person3@example.com], Row:[person3, 444-333-4444]]
*/
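The sample output has only four rows: person5 (phone only) and person6 (email only) are dropped, because an inner join keeps only names that appear in both collections. The following is a minimal plain-Java sketch of that equijoin semantics using an in-memory hash index. It is only an illustration, not how Beam executes the join; the class name `InnerJoinSketch`, its method names, and the `{name, value}` array representation are hypothetical and are not part of the Beam API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Apache Beam.
public class InnerJoinSketch {

  // Each record is a {name, email} pair mirroring the sample data above.
  public static List<String[]> sampleEmails() {
    return Arrays.asList(
        new String[] {"person1", "person1@example.com"},
        new String[] {"person2", "person2@example.com"},
        new String[] {"person3", "person3@example.com"},
        new String[] {"person4", "person4@example.com"},
        new String[] {"person6", "person6@example.com"});
  }

  // Each record is a {name, phone} pair mirroring the sample data above.
  public static List<String[]> samplePhones() {
    return Arrays.asList(
        new String[] {"person1", "111-222-3333"},
        new String[] {"person2", "222-333-4444"},
        new String[] {"person3", "444-333-4444"},
        new String[] {"person4", "555-333-4444"},
        new String[] {"person5", "777-333-4444"});
  }

  // Inner equijoin on the name field (index 0 of each record).
  public static List<String> innerJoinOnName(List<String[]> emails, List<String[]> phones) {
    // Build phase: index the phone records by name.
    Map<String, List<String[]>> phonesByName = new HashMap<>();
    for (String[] phone : phones) {
      phonesByName.computeIfAbsent(phone[0], k -> new ArrayList<>()).add(phone);
    }
    // Probe phase: emit one result per (email, phone) pair with an equal name.
    List<String> joined = new ArrayList<>();
    for (String[] email : emails) {
      List<String[]> matches =
          phonesByName.getOrDefault(email[0], Collections.<String[]>emptyList());
      for (String[] phone : matches) {
        joined.add("Name: " + email[0] + " Email: " + email[1] + " Phone: " + phone[1]);
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    for (String line : innerJoinOnName(sampleEmails(), samplePhones())) {
      System.out.println(line);
    }
  }
}
```

Running `main` prints one line each for person1 through person4, in the order of the email list. Unlike this sketch, a Beam pipeline does not guarantee any particular output order, which is why the sample output above lists person4 before person3.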
The result Row is of the type Row: [Row(emailSchema), Row(phoneSchema)], and it can be converted to the desired format as shown in the code snippet below.
PCollection<String> result =
    resultRow.apply(
        "Format Output",
        MapElements.into(TypeDescriptors.strings())
            .via(
                x -> {
                  String userInfo =
                      "Name: "
                          + x.getRow(0).getValue("name")
                          + " Email: "
                          + x.getRow(0).getValue("email")
                          + " Phone: "
                          + x.getRow(1).getValue("phone");
                  System.out.println(userInfo);
                  return userInfo;
                }));
/* Sample output From the pipeline
Name: person4 Email: person4@example.com Phone: 555-333-4444
Name: person2 Email: person2@example.com Phone: 222-333-4444
Name: person3 Email: person3@example.com Phone: 444-333-4444
Name: person1 Email: person1@example.com Phone: 111-222-3333
*/
Last updated on 2025/01/19