Side input patterns

The samples on this page show you common Beam side input patterns. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection. For more information, see the programming guide section on side inputs.

Slowly updating global window side inputs

You can retrieve side inputs from global windows to use them in a pipeline job with non-global windows, like a FixedWindow.

To slowly update global window side inputs in pipelines with non-global windows:

  1. Write a DoFn that periodically pulls data from a bounded source into a global window.

    a. Use the GenerateSequence source transform to periodically emit a value.

    b. Instantiate a data-driven trigger that activates on each element and pulls data from a bounded source.

    c. Fire the trigger to pass the data into the global window.

  2. Create the side input for downstream transforms. The side input should fit into memory.

The global window side input triggers on processing time, so the main pipeline nondeterministically matches the side input to elements in event time.
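
The matching behavior described above can be illustrated with a minimal plain-Java sketch. This is not Beam code: the class `LatestSnapshot` and its methods are hypothetical names invented for this illustration. The sketch models a side input as a snapshot that is replaced on each refresh, so a lookup returns whatever snapshot is newest when the lookup runs, regardless of any timestamp attached to the element.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a slowly updating side input; not a Beam API. */
final class LatestSnapshot {
  // Holds the most recent snapshot; starts empty until the first refresh.
  private final AtomicReference<Map<String, String>> latest =
      new AtomicReference<>(Collections.<String, String>emptyMap());

  /** Replaces the whole snapshot, as a refresh tick in processing time would. */
  void refresh(Map<String, String> fresh) {
    latest.set(Collections.unmodifiableMap(new HashMap<>(fresh)));
  }

  /** Looks up a key in whichever snapshot is newest at the time of the call. */
  String lookup(String key) {
    return latest.get().get(key);
  }

  public static void main(String[] args) {
    LatestSnapshot side = new LatestSnapshot();

    side.refresh(Collections.singletonMap("Key_A", "v1"));
    // An element processed now sees the first snapshot.
    System.out.println(side.lookup("Key_A"));

    side.refresh(Collections.singletonMap("Key_A", "v2"));
    // An element processed after the refresh sees the second snapshot,
    // even if its event timestamp is older than the refresh.
    System.out.println(side.lookup("Key_A"));
  }
}
```

In this sketch, the value an element sees depends only on when the lookup runs relative to the last refresh. That is why the pairing of side input versions with main input elements is not reproducible across runs.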

For instance, the following code sample uses a DoFn to build a Map. The Map becomes a View.asSingleton side input that’s rebuilt on each counter tick. To demonstrate the workflow, the side input updates every 5 seconds. In a real-world scenario, the side input would typically update every few hours or once per day.

// Imports needed by the sample below. The SLF4J imports assume that LOG is an
// SLF4J Logger declared in the enclosing class.
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public static void sideInputPatterns() {
  // This pipeline uses View.asSingleton for a placeholder external service.
  // Run in debug mode to see the output.
  Pipeline p = Pipeline.create();

  // Create a side input that updates every 5 seconds.
  PCollectionView<Map<String, String>> map =
      p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
          .apply(
              Window.<Long>into(new GlobalWindows())
                  .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                  .discardingFiredPanes())
          .apply(
              ParDo.of(
                  new DoFn<Long, Map<String, String>>() {

                    @ProcessElement
                    public void process(
                        @Element Long input, OutputReceiver<Map<String, String>> o) {
                      // Replace map with test data from the placeholder external service.
                      // Add external reads here.
                      o.output(PlaceholderExternalService.readTestData());
                    }
                  }))
          .apply(View.asSingleton());

  // Consume side input. GenerateSequence generates test data.
  // Use a real source (like PubSubIO or KafkaIO) in production.
  p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
      .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
      .apply(Sum.longsGlobally().withoutDefaults())
      .apply(
          ParDo.of(
                  new DoFn<Long, KV<Long, Long>>() {

                    @ProcessElement
                    public void process(ProcessContext c) {
                      Map<String, String> keyMap = c.sideInput(map);
                      c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

                      LOG.debug(
                          "Value is {}, key A is {}, and key B is {}.",
                          c.element(),
                          keyMap.get("Key_A"),
                          keyMap.get("Key_B"));
                    }
                  })
              .withSideInputs(map));
}

/** Placeholder class that represents an external service generating test data. */
public static class PlaceholderExternalService {

  public static Map<String, String> readTestData() {

    Map<String, String> map = new HashMap<>();
    Instant now = Instant.now();

    // Use lowercase mm (minute) and ss (second); MM is month and SS is fraction of second.
    DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:mm:ss");

    map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
    map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());

    return map;
  }
}
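
Time pattern letters are case-sensitive, which is easy to overlook when formatting timestamps like those in the placeholder service. The following minimal sketch uses the standard `java.time` API rather than the Joda-Time formatter used in the sample; that swap is an assumption made to keep the example dependency-free, and `PatternLetters` is a hypothetical helper name. For the letters shown here, Joda-Time assigns the same meanings: `mm` is minute, `ss` is second, `MM` is month, and `SS` is fraction of second.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that shows how pattern-letter case changes the output. */
final class PatternLetters {

  /** Formats the given date-time with the given java.time pattern. */
  static String format(String pattern, LocalDateTime t) {
    return DateTimeFormatter.ofPattern(pattern).format(t);
  }

  public static void main(String[] args) {
    // 9 March 2024, 14:05:07, with zero nanoseconds.
    LocalDateTime t = LocalDateTime.of(2024, 3, 9, 14, 5, 7);

    // Hour, minute, second.
    System.out.println(format("HH:mm:ss", t)); // prints 14:05:07

    // Hour, month, fraction of second: probably not what was intended.
    System.out.println(format("HH:MM:SS", t)); // prints 14:03:00
  }
}
```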