T
- The type of the element in this stream.public final class CreateStream<T> extends PTransform<PBegin,PCollection<T>>
To properly compose a stream of micro-batches with their Watermarks, please keep in mind that eventually there a two queues here - one for batches and another for Watermarks.
While both queues advance according to Spark's batch-interval, there is a slight difference in how data is pushed into the stream compared to the advancement of Watermarks since Watermarks advance onBatchCompleted hook call so if you'd want to set the watermark advance for a specific batch it should be called before that batch. Also keep in mind that being a queue that is polled per batch interval, if there is a need to "hold" the same Watermark without advancing it, it should be stated explicitly or the Watermark will advance as soon as it can (in the next batch completed hook).
Example 1:
CreateStream.of(StringUtf8Coder.of(), batchDuration)
.nextBatch(
TimestampedValue.of("foo", endOfGlobalWindow),
TimestampedValue.of("bar", endOfGlobalWindow))
.advanceNextBatchWatermarkToInfinity();
The first batch will see the default start-of-time WM of BoundedWindow.TIMESTAMP_MIN_VALUE
and any following batch will see the end-of-time WM BoundedWindow.TIMESTAMP_MAX_VALUE
.
Example 2:
CreateStream.of(VarIntCoder.of(), batchDuration)
.nextBatch(
TimestampedValue.of(1, instant))
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
.nextBatch(
TimestampedValue.of(2, instant))
.nextBatch(
TimestampedValue.of(3, instant))
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(30)))
The first batch will see the start-of-time WM and the second will see the advanced (+20 min.) WM. The third WM will see the WM advanced to +30 min, because this is the next advancement of the WM regardless of where it ws called in the construction of CreateStream.
Modifier and Type | Field and Description |
---|---|
static java.lang.String |
TRANSFORM_URN |
name, resourceHints
Modifier and Type | Method and Description |
---|---|
CreateStream<T> |
advanceNextBatchWatermarkToInfinity()
Advances the watermark in the next batch to the end-of-time.
|
CreateStream<T> |
advanceWatermarkForNextBatch(Instant newWatermark)
Advances the watermark in the next batch.
|
CreateStream<T> |
emptyBatch()
Adds an empty batch.
|
PCollection<T> |
expand(PBegin input)
Override this method to specify how this
PTransform should be expanded on the given
InputT . |
long |
getBatchDuration() |
java.util.Queue<java.lang.Iterable<TimestampedValue<T>>> |
getBatches()
Get the underlying queue representing the mock stream of micro-batches.
|
java.util.Queue<GlobalWatermarkHolder.SparkWatermarks> |
getTimes()
Get times so they can be pushed into the
GlobalWatermarkHolder . |
CreateStream<T> |
initialSystemTimeAt(Instant initialSystemTime)
Set the initial synchronized processing time.
|
boolean |
isForceWatermarkSync() |
CreateStream<T> |
nextBatch(T... batchElements)
For non-timestamped elements.
|
CreateStream<T> |
nextBatch(TimestampedValue<T>... batchElements)
Enqueue next micro-batch elements.
|
static <T> CreateStream<T> |
of(Coder<T> coder,
Duration batchDuration)
Creates a new Spark based stream without forced watermark sync, intended for test purposes.
|
static <T> CreateStream<T> |
of(Coder<T> coder,
Duration batchDuration,
boolean forceWatermarkSync)
Creates a new Spark based stream intended for test purposes.
|
compose, compose, getAdditionalInputs, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setResourceHints, toString, validate, validate
public static final java.lang.String TRANSFORM_URN
public static <T> CreateStream<T> of(Coder<T> coder, Duration batchDuration, boolean forceWatermarkSync)
batchDuration
- the batch duration (interval) to be used for creating this stream.coder
- the coder to be used for this stream.forceWatermarkSync
- whether this stream should be synced with the advancement of the
watermark maintained by the GlobalWatermarkHolder
.public static <T> CreateStream<T> of(Coder<T> coder, Duration batchDuration)
of(Coder, Duration, boolean)
.@SafeVarargs public final CreateStream<T> nextBatch(TimestampedValue<T>... batchElements)
Queue
so stream input order
would keep the population order (FIFO).@SafeVarargs public final CreateStream<T> nextBatch(T... batchElements)
public CreateStream<T> emptyBatch()
public CreateStream<T> initialSystemTimeAt(Instant initialSystemTime)
public CreateStream<T> advanceWatermarkForNextBatch(Instant newWatermark)
public CreateStream<T> advanceNextBatchWatermarkToInfinity()
public long getBatchDuration()
public java.util.Queue<java.lang.Iterable<TimestampedValue<T>>> getBatches()
public java.util.Queue<GlobalWatermarkHolder.SparkWatermarks> getTimes()
GlobalWatermarkHolder
.public boolean isForceWatermarkSync()
public PCollection<T> expand(PBegin input)
PTransform
PTransform
should be expanded on the given
InputT
.
NOTE: This method should not be called directly. Instead apply the PTransform
should
be applied to the InputT
using the apply
method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
expand
in class PTransform<PBegin,PCollection<T>>