Schema Patterns

The samples on this page describe common patterns using Schemas. Schemas provide a type system for Beam records that is independent of any specific programming-language type. Multiple Java classes can share the same schema (for example, a Protocol Buffer class and a POJO class), and Beam allows us to convert seamlessly between these types. Schemas also provide a simple way to reason about types across different programming-language APIs. For more information, see the programming guide section on Schemas.

Using Joins

Beam supports equijoins on schema PCollections, where the join condition depends on the equality of a subset of fields.

Consider using Join if you have multiple collections that provide information about related things, and their structure is known.

For example, let’s say we have two different collections with user data: one collection contains names and email addresses; the other collection contains names and phone numbers. We can join the two collections using the name as a common key and the other data as the associated values. After the join, we have one collection that contains all the information (email addresses and phone numbers) associated with each name.
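
To make the join condition concrete before turning to the Beam API, here is a minimal plain-Java sketch of an inner equijoin on the name. It is not the Beam Join transform: the class `NameJoinSketch`, the method `innerJoinByName`, and the sample names are hypothetical, and the sketch assumes each name appears at most once per collection.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; this is not the Beam Join API.
final class NameJoinSketch {

  // Inner equijoin on name: a name is kept only if it appears in both inputs.
  // Assumes one entry per name in each input.
  static List<String> innerJoinByName(Map<String, String> emails, Map<String, String> phones) {
    List<String> joined = new ArrayList<>();
    for (Map.Entry<String, String> entry : emails.entrySet()) {
      String phone = phones.get(entry.getKey());
      if (phone != null) {
        joined.add(entry.getKey() + ", " + entry.getValue() + ", " + phone);
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    // Hypothetical sample data: name -> email and name -> phone.
    Map<String, String> emails = new LinkedHashMap<>();
    emails.put("alice", "alice@example.com");
    emails.put("bob", "bob@example.com");

    Map<String, String> phones = new LinkedHashMap<>();
    phones.put("alice", "111-222-3333");
    phones.put("carol", "222-333-4444");

    // Only "alice" has both an email and a phone, so only that name is printed.
    innerJoinByName(emails, phones).forEach(System.out::println);
  }
}
```

Names that appear in only one input ("bob" and "carol" here) are dropped, which is the behavior to expect from an inner join.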

The following conceptual example uses two input collections to show the mechanism of Join.

First, we define the schemas and the user data.

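// Imports used by this snippet
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

// Define Schemas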
Schema emailSchema =
    Schema.of(
        Schema.Field.of("name", Schema.FieldType.STRING),
        Schema.Field.of("email", Schema.FieldType.STRING));

Schema phoneSchema =
    Schema.of(
        Schema.Field.of("name", Schema.FieldType.STRING),
        Schema.Field.of("phone", Schema.FieldType.STRING));

// Create User Data Collections
final List<Row> emailUsers =
    Arrays.asList(
        Row.withSchema(emailSchema).addValue("person1").addValue("person1@example.com").build(),
        Row.withSchema(emailSchema).addValue("person2").addValue("person2@example.com").build(),
        Row.withSchema(emailSchema).addValue("person3").addValue("person3@example.com").build(),
        Row.withSchema(emailSchema).addValue("person4").addValue("person4@example.com").build(),
        Row.withSchema(emailSchema)
            .addValue("person6")
            .addValue("person6@example.com")
            .build());

final List<Row> phoneUsers =
    Arrays.asList(
        Row.withSchema(phoneSchema).addValue("person1").addValue("111-222-3333").build(),
        Row.withSchema(phoneSchema).addValue("person2").addValue("222-333-4444").build(),
        Row.withSchema(phoneSchema).addValue("person3").addValue("444-333-4444").build(),
        Row.withSchema(phoneSchema).addValue("person4").addValue("555-333-4444").build(),
        Row.withSchema(phoneSchema).addValue("person5").addValue("777-333-4444").build());

Then we create the PCollections for the user data and join them using the Join transform.

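// Imports used by this snippet
import org.apache.beam.sdk.schemas.transforms.Join;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Create/Read Schema PCollections (p is an existing Pipeline instance)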
PCollection<Row> emailList =
    p.apply("CreateEmails", Create.of(emailUsers).withRowSchema(emailSchema));

PCollection<Row> phoneList =
    p.apply("CreatePhones", Create.of(phoneUsers).withRowSchema(phoneSchema));

// Perform Join
PCollection<Row> resultRow =
    emailList.apply("Apply Join", Join.<Row, Row>innerJoin(phoneList).using("name"));

// Preview Result
resultRow.apply(
    "Preview Result",
    MapElements.into(TypeDescriptors.strings())
        .via(
            x -> {
              System.out.println(x);
              return "";
            }));

/* Sample output from the pipeline (row order may vary between runs):
 Row:[Row:[person1, person1@example.com], Row:[person1, 111-222-3333]]
 Row:[Row:[person2, person2@example.com], Row:[person2, 222-333-4444]]
 Row:[Row:[person4, person4@example.com], Row:[person4, 555-333-4444]]
 Row:[Row:[person3, person3@example.com], Row:[person3, 444-333-4444]]
*/

Each result Row has the type Row: [Row(emailSchema), Row(phoneSchema)] and can be converted to the desired format, as shown in the code snippet below.

PCollection<String> result =
    resultRow.apply(
        "Format Output",
        MapElements.into(TypeDescriptors.strings())
            .via(
                x -> {
                  String userInfo =
                      "Name: "
                          + x.getRow(0).getValue("name")
                          + " Email: "
                          + x.getRow(0).getValue("email")
                          + " Phone: "
                          + x.getRow(1).getValue("phone");
                  System.out.println(userInfo);
                  return userInfo;
                }));

/* Sample output from the pipeline (row order may vary between runs):
Name: person4 Email: person4@example.com Phone: 555-333-4444
Name: person2 Email: person2@example.com Phone: 222-333-4444
Name: person3 Email: person3@example.com Phone: 444-333-4444
Name: person1 Email: person1@example.com Phone: 111-222-3333
 */
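
The two sample outputs list the same four users in different orders. A PCollection is unordered, so the order in which elements are printed should not be relied on. Any check of the joined result therefore needs to ignore order. The following plain-Java sketch shows one way to do that; the class `UnorderedCompareSketch` and the method `sameElementsAnyOrder` are hypothetical names used for illustration and are not part of Beam.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java illustration only; this is not a Beam API.
final class UnorderedCompareSketch {

  // Returns true if both lists hold the same elements with the same
  // multiplicities, regardless of order. The inputs are not modified.
  static boolean sameElementsAnyOrder(List<String> a, List<String> b) {
    List<String> sortedA = new ArrayList<>(a);
    List<String> sortedB = new ArrayList<>(b);
    Collections.sort(sortedA);
    Collections.sort(sortedB);
    return sortedA.equals(sortedB);
  }

  public static void main(String[] args) {
    // Name order as it appears in the two sample outputs on this page.
    List<String> previewOrder = Arrays.asList("person1", "person2", "person4", "person3");
    List<String> formattedOrder = Arrays.asList("person4", "person2", "person3", "person1");

    System.out.println(sameElementsAnyOrder(previewOrder, formattedOrder)); // prints "true"
  }
}
```

In a Beam pipeline test, the same idea is usually expressed with `PAssert.that(...).containsInAnyOrder(...)` rather than a hand-written comparison.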