Side input patterns
The samples on this page show you common Beam side input patterns. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection. For more information, see the programming guide section on side inputs.
If you are trying to enrich your data by doing a key-value lookup to a remote service, you may first want to consider the Enrichment transform, which can abstract away some of the details of side inputs and provide additional benefits like client-side throttling.
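As a rough illustration of the side input idea, the following plain Python sketch (no Beam dependency) models a per-element function that receives the same extra lookup table for every element, which is the role a side input plays for a DoFn. The names `convert` and `rates`, and the values used, are hypothetical and not part of the Beam API.

```python
# Plain-Python model of a side input; nothing here is Beam API.
def convert(element, rates):
    """Process one main-input element using the extra `rates` input."""
    currency, amount = element
    return currency, amount * rates[currency]


main_input = [("EUR", 10.0), ("GBP", 2.0)]
rates = {"EUR": 2.0, "GBP": 3.0}  # made-up values standing in for a side input

# Every main-input element is processed with the same side input value.
results = [convert(element, rates) for element in main_input]
```

In a real pipeline the runner, not a list comprehension, supplies the side input value to each call.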
Slowly updating global window side inputs
You can retrieve side inputs from global windows to use them in a pipeline job with non-global windows, like a FixedWindow.
To slowly update global window side inputs in pipelines with non-global windows:
1. Write a DoFn that periodically pulls data from a bounded source into a global window.
   a. Use the GenerateSequence source transform to periodically emit a value.
   b. Instantiate a data-driven trigger that activates on each element and pulls data from a bounded source.
   c. Fire the trigger to pass the data into the global window.
2. Create the side input for downstream transforms. The side input should fit into memory.
The global window side input triggers on processing time, so the main pipeline non-deterministically matches the side input to elements in event time.
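The steps and the processing-time behavior described above can be modeled in plain Python, without Beam. This is only a sketch under stated assumptions: `read_test_data` is a hypothetical stand-in for the external read, time is counted in whole seconds, and `side_input_at` imitates "use the latest fired pane" with a simple lookup.

```python
import bisect

REFRESH_SECONDS = 5  # mirrors the 5-second demo interval used on this page


def read_test_data(tick_time):
    """Hypothetical stand-in for a read from an external service."""
    return {"Key_A": f"version@{tick_time}s"}


# Each tick rebuilds the whole map; older maps are superseded by newer ones.
tick_times = list(range(0, 20, REFRESH_SECONDS))  # ticks at 0, 5, 10, 15
snapshots = [read_test_data(t) for t in tick_times]


def side_input_at(processing_time):
    """Return the newest snapshot fired at or before `processing_time`."""
    index = bisect.bisect_right(tick_times, processing_time) - 1
    return snapshots[index]


# The same element (say, event time 3 s) sees different data depending on
# when it happens to be processed.
on_time = side_input_at(processing_time=4)
delayed = side_input_at(processing_time=12)
```

Because the lookup depends on when the element is processed rather than on its event timestamp, replaying the same input can pair elements with different side input versions.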
For instance, the following code sample uses a DoFn to create a Map. The Map becomes a View.asIterable side input that’s rebuilt on each counter tick. The side input updates every 5 seconds in order to demonstrate the workflow. In a real-world scenario, the side input would typically update every few hours or once per day.
public static void sideInputPatterns() {
  // This pipeline builds its side input from a placeholder external service.
  // Run in debug mode to see the output.
  Pipeline p = Pipeline.create();

  // Create a side input that updates every 5 seconds.
  // View as an iterable, not singleton, so that if we happen to trigger more
  // than once before Latest.globally is computed we can handle both elements.
  PCollectionView<Iterable<Map<String, String>>> mapIterable =
      p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
          .apply(
              ParDo.of(
                  new DoFn<Long, Map<String, String>>() {

                    @ProcessElement
                    public void process(
                        @Element Long input,
                        @Timestamp Instant timestamp,
                        OutputReceiver<Map<String, String>> o) {
                      // Replace map with test data from the placeholder external service.
                      // Add external reads here.
                      o.output(PlaceholderExternalService.readTestData(timestamp));
                    }
                  }))
          .apply(
              Window.<Map<String, String>>into(new GlobalWindows())
                  .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                  .discardingFiredPanes())
          .apply(Latest.globally())
          .apply(View.asIterable());

  // Consume side input. GenerateSequence generates test data.
  // Use a real source (like PubSubIO or KafkaIO) in production.
  p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
      .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
      .apply(Sum.longsGlobally().withoutDefaults())
      .apply(
          ParDo.of(
                  new DoFn<Long, KV<Long, Long>>() {

                    @ProcessElement
                    public void process(ProcessContext c, @Timestamp Instant timestamp) {
                      Iterable<Map<String, String>> si = c.sideInput(mapIterable);
                      // Take an element from the side input iterable (likely length 1)
                      Map<String, String> keyMap = si.iterator().next();
                      c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

                      LOG.info(
                          "Value is {} with timestamp {}, using key A from side input with time {}.",
                          c.element(),
                          timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")),
                          keyMap.get("Key_A"));
                    }
                  })
              .withSideInputs(mapIterable));

  p.run();
}

/** Placeholder class that represents an external service generating test data. */
public static class PlaceholderExternalService {

  public static Map<String, String> readTestData(Instant timestamp) {
    Map<String, String> map = new HashMap<>();
    map.put("Key_A", timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")));
    return map;
  }
}
Slowly updating side input using windowing
You can read side input data periodically into distinct PCollection windows. When you apply the side input to your main input, each main input window is automatically matched to a single side input window. This guarantees consistency for the duration of a single window: each window on the main input is matched to exactly one version of the side input data.
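The window matching can be pictured with a small plain Python model (no Beam dependency). It rests on a simplifying assumption that should be checked against the programming guide: the side input window is taken to be the fixed window that contains the last instant of the main input window, with time modeled as integer seconds. The helper names are hypothetical.

```python
def fixed_window(timestamp, size):
    """Return the [start, end) fixed window of length `size` containing `timestamp`."""
    start = timestamp - timestamp % size
    return (start, start + size)


def side_window_for(main_window, side_size):
    """Map a main-input window to exactly one side-input window.

    Assumption: the side window is the one containing the last instant of
    the main window (modeled here as end - 1 with integer seconds).
    """
    _, end = main_window
    return fixed_window(end - 1, side_size)


# Side input refreshed in 10 s windows, main input windowed every 5 s.
side_data = {(0, 10): "v1", (10, 20): "v2"}
main_windows = [fixed_window(t, 5) for t in (0, 5, 10, 15)]

# Every main window resolves to a single side input version.
matched = {w: side_data[side_window_for(w, 10)] for w in main_windows}
```

All elements that fall in the same main input window therefore observe the same version, which is the consistency property described above.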
To read side input data periodically into distinct PCollection windows:
- Use the PeriodicImpulse or PeriodicSequence PTransform to:
  - Generate an infinite sequence of elements at the required processing time intervals.
  - Assign them to separate windows.
- Fetch data using an SDF Read or ReadAll PTransform triggered by the arrival of a PCollection element.
- Apply the side input.
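Before the SDK samples, the three steps above can be sketched in plain Python with no Beam dependency. Everything here is an illustrative assumption: `periodic_impulse` only loosely mimics the transform of a similar name (it is bounded and treats `stop` as exclusive), `fetch_lines` is a hypothetical stand-in for the read step, and time is counted in integer seconds.

```python
def periodic_impulse(start, stop, interval):
    """Yield timestamps from `start` up to (not including) `stop`."""
    t = start
    while t < stop:
        yield t
        t += interval


def fetch_lines(timestamp):
    """Hypothetical fetch; a real pipeline would read a file or call a service."""
    return [f"line-{i}@{timestamp}" for i in range(2)]


interval = 10
side_input_by_window = {}
for ts in periodic_impulse(start=0, stop=30, interval=interval):
    window = (ts, ts + interval)                    # step 1: one window per impulse
    side_input_by_window[window] = fetch_lines(ts)  # step 2: fetch when the impulse arrives


def process(element, timestamp):
    """Step 3: apply the side input that belongs to the element's window."""
    start = timestamp - timestamp % interval
    lines = side_input_by_window[(start, start + interval)]
    return element, len(lines)


result = process("x", timestamp=13)
```

In the actual SDKs the runner performs the window lookup, so the DoFn only asks for the side input and receives the version for its current window.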
PCollectionView<List<Long>> sideInput =
    p.apply(
            "SIImpulse",
            PeriodicImpulse.create()
                .startAt(startAt)
                .stopAt(stopAt)
                .withInterval(interval1)
                .applyWindowing())
        .apply(
            "FileToRead",
            ParDo.of(
                new DoFn<Instant, String>() {
                  @DoFn.ProcessElement
                  public void process(@Element Instant notUsed, OutputReceiver<String> o) {
                    o.output(fileToRead);
                  }
                }))
        .apply(FileIO.matchAll())
        .apply(FileIO.readMatches())
        .apply(TextIO.readFiles())
        .apply(
            ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void process(@Element String src, OutputReceiver<String> o) {
                    o.output(src);
                  }
                }))
        .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults())
        .apply(View.asList());

PCollection<Instant> mainInput =
    p.apply(
        "MIImpulse",
        PeriodicImpulse.create()
            .startAt(startAt.minus(Duration.standardSeconds(1)))
            .stopAt(stopAt.minus(Duration.standardSeconds(1)))
            .withInterval(interval2)
            .applyWindowing());

// Consume side input. PeriodicImpulse generates the test data for the main input.
// Use a real source (like PubSubIO or KafkaIO) in production.
PCollection<Long> result =
    mainInput.apply(
        "generateOutput",
        ParDo.of(
                new DoFn<Instant, Long>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    c.output((long) c.sideInput(sideInput).size());
                  }
                })
            .withSideInputs(sideInput));
import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms import window

# first_timestamp, last_timestamp, interval, src_file_pattern,
# sample_main_input_elements and main_input_windowing_interval are
# assumed to be defined elsewhere.
# from apache_beam.utils.timestamp import MAX_TIMESTAMP
# last_timestamp = MAX_TIMESTAMP to go on indefinitely


# Any user-defined function.
# cross join is used as an example.
def cross_join(left, rights):
  for x in rights:
    yield (left, x)


# Create pipeline.
pipeline = beam.Pipeline()
side_input = (
    pipeline
    | 'PeriodicImpulse' >> PeriodicImpulse(
        first_timestamp, last_timestamp, interval, True)
    | 'MapToFileName' >> beam.Map(lambda x: src_file_pattern + str(x))
    | 'ReadFromFile' >> beam.io.ReadAllFromText())

main_input = (
    pipeline
    | 'MpImpulse' >> beam.Create(sample_main_input_elements)
    | 'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
    | 'WindowMpInto' >> beam.WindowInto(
        window.FixedWindows(main_input_windowing_interval)))

result = (
    main_input
    | 'ApplyCrossJoin' >> beam.FlatMap(
        cross_join, rights=beam.pvalue.AsIter(side_input)))
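To see what the `cross_join` helper contributes on its own, it can be called directly on plain Python values. The values below are made up: `main_element` stands in for one main input element and `side_lines` for the side input lines matched to its window.

```python
def cross_join(left, rights):
    # Same helper as in the Python sample above.
    for x in rights:
        yield (left, x)


# Stand-ins for one main-input element and the side-input lines of its window.
main_element = 1
side_lines = ["a", "b"]

pairs = list(cross_join(main_element, side_lines))
print(pairs)  # [(1, 'a'), (1, 'b')]
```

Because the pipeline applies the helper with `beam.FlatMap`, each yielded tuple becomes a separate element of the output PCollection.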
Last updated on 2025/01/19