T
- stream type.public final class CreateStream<T> extends PTransform<PBegin,PCollection<T>>
To properly compose a stream of micro-batches with their Watermarks, please keep in mind that eventually there a two queues here - one for batches and another for Watermarks.
While both queues advance according to Spark's batch-interval, there is a slight difference in how data is pushed into the stream compared to the advancement of Watermarks since Watermarks advance onBatchCompleted hook call so if you'd want to set the watermark advance for a specific batch it should be called before that batch. Also keep in mind that being a queue that is polled per batch interval, if there is a need to "hold" the same Watermark without advancing it it should be stated explicitly or the Watermark will advance as soon as it can (in the next batch completed hook).
Example 1:
CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
.nextBatch(
TimestampedValue.of("foo", endOfGlobalWindow),
TimestampedValue.of("bar", endOfGlobalWindow))
.advanceNextBatchWatermarkToInfinity();
The first batch will see the default start-of-time WM of
BoundedWindow.TIMESTAMP_MIN_VALUE
and any following batch will see
the end-of-time WM BoundedWindow.TIMESTAMP_MAX_VALUE
.
Example 2:
CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
.nextBatch(
TimestampedValue.of(1, instant))
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
.nextBatch(
TimestampedValue.of(2, instant))
.nextBatch(
TimestampedValue.of(3, instant))
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(30)))
The first batch will see the start-of-time WM and the second will see the advanced (+20 min.) WM.
The third WM will see the WM advanced to +30 min, because this is the next advancement of the WM
regardless of where it ws called in the construction of CreateStream.
//TODO: write a proper Builder enforcing all those rules mentioned.
name
Modifier and Type | Method and Description |
---|---|
CreateStream<T> |
advanceNextBatchWatermarkToInfinity()
Advances the watermark in the next batch to the end-of-time.
|
CreateStream<T> |
advanceWatermarkForNextBatch(Instant newWatermark)
Advances the watermark in the next batch.
|
CreateStream<T> |
emptyBatch()
Adds an empty batch.
|
PCollection<T> |
expand(PBegin input)
Applies this
PTransform on the given InputT , and returns its
Output . |
java.util.Queue<java.lang.Iterable<TimestampedValue<T>>> |
getBatches()
Get the underlying queue representing the mock stream of micro-batches.
|
protected Coder<T> |
getDefaultOutputCoder()
Returns the default
Coder to use for the output of this
single-output PTransform . |
java.util.Queue<GlobalWatermarkHolder.SparkWatermarks> |
getTimes()
Get times so they can be pushed into the
GlobalWatermarkHolder . |
CreateStream<T> |
initialSystemTimeAt(Instant initialSystemTime)
Set the initial synchronized processing time.
|
CreateStream<T> |
nextBatch(T... batchElements)
For non-timestamped elements.
|
CreateStream<T> |
nextBatch(TimestampedValue<T>... batchElements)
Enqueue next micro-batch elements.
|
static <T> CreateStream<T> |
of(Coder<T> coder,
Duration batchInterval)
Set the batch interval for the stream.
|
getAdditionalInputs, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, populateDisplayData, toString, validate
public static <T> CreateStream<T> of(Coder<T> coder, Duration batchInterval)
@SafeVarargs public final CreateStream<T> nextBatch(TimestampedValue<T>... batchElements)
Queue
so stream input order would keep the population order (FIFO).@SafeVarargs public final CreateStream<T> nextBatch(T... batchElements)
public CreateStream<T> emptyBatch()
public CreateStream<T> initialSystemTimeAt(Instant initialSystemTime)
public CreateStream<T> advanceWatermarkForNextBatch(Instant newWatermark)
public CreateStream<T> advanceNextBatchWatermarkToInfinity()
public java.util.Queue<java.lang.Iterable<TimestampedValue<T>>> getBatches()
public java.util.Queue<GlobalWatermarkHolder.SparkWatermarks> getTimes()
GlobalWatermarkHolder
.public PCollection<T> expand(PBegin input)
PTransform
PTransform
on the given InputT
, and returns its
Output
.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
expand
in class PTransform<PBegin,PCollection<T>>
protected Coder<T> getDefaultOutputCoder() throws CannotProvideCoderException
PTransform
Coder
to use for the output of this
single-output PTransform
.
By default, always throws
getDefaultOutputCoder
in class PTransform<PBegin,PCollection<T>>
CannotProvideCoderException
- if no coder can be inferred