Class CreateStream<T>
- Type Parameters:
T - The type of the element in this stream.
- All Implemented Interfaces:
Serializable, HasDisplayData
To properly compose a stream of micro-batches with their Watermarks, keep in mind that there are ultimately two queues here: one for batches and another for Watermarks.
While both queues advance according to Spark's batch interval, data and Watermarks are pushed into the stream slightly differently. Watermarks advance in the onBatchCompleted hook call, so to set the Watermark for a specific batch, the advance must be called before that batch. Also keep in mind that the Watermark queue is polled once per batch interval: to "hold" the same Watermark without advancing it, the hold must be stated explicitly, or the Watermark will advance as soon as it can (in the next onBatchCompleted hook call).
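The two-queue behavior described above can be sketched in plain Java. This is a simplified, hypothetical model and not the Beam implementation: the class `TwoQueueModel`, the method `observedWatermarks`, the use of `long` values for Watermarks, and the choice to express a "hold" by repeating the previous value are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Simplified, hypothetical model of the two queues; not the Beam implementation. */
class TwoQueueModel {
  /** Stand-in for the start-of-time watermark (an assumption of this sketch). */
  static final long START_OF_TIME = Long.MIN_VALUE;

  /**
   * Runs every batch and returns the watermark each batch observed.
   * The watermark queue is polled only after a batch completes.
   */
  static List<Long> observedWatermarks(List<String> batches, List<Long> watermarkAdvances) {
    Queue<String> batchQueue = new ArrayDeque<>(batches);
    Queue<Long> watermarkQueue = new ArrayDeque<>(watermarkAdvances);
    List<Long> observed = new ArrayList<>();
    long currentWatermark = START_OF_TIME;

    while (!batchQueue.isEmpty()) {
      batchQueue.poll();               // one micro-batch per batch interval
      observed.add(currentWatermark);  // the batch sees the watermark set so far

      // Stand-in for the onBatchCompleted hook: take the next advance, if any.
      Long next = watermarkQueue.poll();
      if (next != null) {
        currentWatermark = next;
      }
    }
    return observed;
  }

  public static void main(String[] args) {
    List<String> batches = List.of("b1", "b2", "b3", "b4");

    // No explicit hold: the watermark reaches 30 as early as the third batch.
    System.out.println(observedWatermarks(batches, List.of(10L, 30L)));

    // Explicit hold: repeating 10 keeps the third batch at watermark 10.
    System.out.println(observedWatermarks(batches, List.of(10L, 10L, 30L)));
  }
}
```

In this model the first batch always observes the start-of-time value, because the first advance is only picked up once that batch has completed.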
Example 1:
CreateStream.of(StringUtf8Coder.of(), batchDuration)
    .nextBatch(
        TimestampedValue.of("foo", endOfGlobalWindow),
        TimestampedValue.of("bar", endOfGlobalWindow))
    .advanceNextBatchWatermarkToInfinity();
The first batch will see the default start-of-time WM of BoundedWindow.TIMESTAMP_MIN_VALUE, and any following batch will see the end-of-time WM BoundedWindow.TIMESTAMP_MAX_VALUE.
Example 2:
CreateStream.of(VarIntCoder.of(), batchDuration)
    .nextBatch(
        TimestampedValue.of(1, instant))
    .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
    .nextBatch(
        TimestampedValue.of(2, instant))
    .nextBatch(
        TimestampedValue.of(3, instant))
    .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(30)));
The first batch will see the start-of-time WM and the second will see the advanced (+20 min.) WM. The third batch will see the WM advanced to +30 min., because this is the next advancement of the WM regardless of where it was called in the construction of CreateStream.
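The point that the position of an advance call relative to the nextBatch calls does not matter can be checked with a small plain-Java stand-in. This is a sketch under stated assumptions: `MockStream` is a hypothetical class that only borrows the method names from this page, Watermarks are represented as string labels, and polling the Watermark queue after each batch models the onBatchCompleted hook described earlier. It is not the CreateStream implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for CreateStream; it only models the two queues. */
class MockStream {
  /** Label for the start-of-time watermark (a placeholder chosen for this sketch). */
  static final String START = "start-of-time";

  private final Queue<List<Integer>> batches = new ArrayDeque<>();
  private final Queue<String> watermarks = new ArrayDeque<>();

  /** Enqueues one micro-batch; batches are consumed in FIFO order. */
  MockStream nextBatch(Integer... elements) {
    batches.add(Arrays.asList(elements));
    return this;
  }

  /** Enqueues one watermark advance, independently of the batch queue. */
  MockStream advanceWatermarkForNextBatch(String newWatermark) {
    watermarks.add(newWatermark);
    return this;
  }

  /** Drains the batches and returns the watermark label each batch observed. */
  List<String> watermarksSeenPerBatch() {
    List<String> seen = new ArrayList<>();
    String current = START;
    while (!batches.isEmpty()) {
      batches.poll();
      seen.add(current);
      String next = watermarks.poll(); // modeled "batch completed" step
      if (next != null) {
        current = next;
      }
    }
    return seen;
  }

  public static void main(String[] args) {
    // Same call order as Example 2.
    List<String> asWritten = new MockStream()
        .nextBatch(1)
        .advanceWatermarkForNextBatch("+20 min")
        .nextBatch(2)
        .nextBatch(3)
        .advanceWatermarkForNextBatch("+30 min")
        .watermarksSeenPerBatch();

    // All batches first, then both advances.
    List<String> reordered = new MockStream()
        .nextBatch(1)
        .nextBatch(2)
        .nextBatch(3)
        .advanceWatermarkForNextBatch("+20 min")
        .advanceWatermarkForNextBatch("+30 min")
        .watermarksSeenPerBatch();

    System.out.println(asWritten);
    System.out.println(asWritten.equals(reordered));
  }
}
```

In this model only the order of the advances among themselves and the order of the batches among themselves affect the result, which is why both constructions give the third batch the +30 min. label.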
-
Field Summary
Fields
Fields inherited from class org.apache.beam.sdk.transforms.PTransform
annotations, displayData, name, resourceHints
-
Method Summary
Modifier and Type, Method, Description
advanceNextBatchWatermarkToInfinity - Advances the watermark in the next batch to the end-of-time.
advanceWatermarkForNextBatch(Instant newWatermark) - Advances the watermark in the next batch.
emptyBatch - Adds an empty batch.
expand - Override this method to specify how this PTransform should be expanded on the given InputT.
long getBatchDuration()
getBatches - Get the underlying queue representing the mock stream of micro-batches.
getTimes() - Get times so they can be pushed into the GlobalWatermarkHolder.
initialSystemTimeAt(Instant initialSystemTime) - Set the initial synchronized processing time.
boolean isForceWatermarkSync()
final CreateStream<T> nextBatch(TimestampedValue<T>... batchElements) - Enqueue next micro-batch elements.
final CreateStream<T> nextBatch - For non-timestamped elements.
static <T> CreateStream<T> of - Creates a new Spark based stream without forced watermark sync, intended for test purposes.
static <T> CreateStream<T> of(Coder<T> coder, Duration batchDuration, boolean forceWatermarkSync) - Creates a new Spark based stream intended for test purposes.
Methods inherited from class org.apache.beam.sdk.transforms.PTransform
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setDisplayData, setResourceHints, toString, validate, validate
-
Field Details
-
TRANSFORM_URN
-
Method Details
-
of
public static <T> CreateStream<T> of(Coder<T> coder, Duration batchDuration, boolean forceWatermarkSync)
Creates a new Spark based stream intended for test purposes.
- Parameters:
coder - the coder to be used for this stream.
batchDuration - the batch duration (interval) to be used for creating this stream.
forceWatermarkSync - whether this stream should be synced with the advancement of the watermark maintained by the GlobalWatermarkHolder.
-
of
Creates a new Spark based stream without forced watermark sync, intended for test purposes. See also of(Coder, Duration, boolean).
-
nextBatch
Enqueue next micro-batch elements. This is backed by a Queue, so the stream input order keeps the population order (FIFO).
-
nextBatch
For non-timestamped elements.
-
emptyBatch
Adds an empty batch.
-
initialSystemTimeAt
Set the initial synchronized processing time.
-
advanceWatermarkForNextBatch
Advances the watermark in the next batch.
-
advanceNextBatchWatermarkToInfinity
Advances the watermark in the next batch to the end-of-time.
-
getBatchDuration
public long getBatchDuration()
-
getBatches
Get the underlying queue representing the mock stream of micro-batches.
-
getTimes
Get times so they can be pushed into the GlobalWatermarkHolder.
-
isForceWatermarkSync
public boolean isForceWatermarkSync()
-
expand
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by:
expand in class PTransform<PBegin, PCollection<T>>
-