Class CreateStream<T>

java.lang.Object
org.apache.beam.sdk.transforms.PTransform<PBegin,PCollection<T>>
org.apache.beam.runners.spark.io.CreateStream<T>
Type Parameters:
T - The type of the element in this stream.
All Implemented Interfaces:
Serializable, HasDisplayData

public final class CreateStream<T> extends PTransform<PBegin,PCollection<T>>
Creates an input stream from a Queue. For SparkRunner tests only.

To properly compose a stream of micro-batches with their Watermarks, keep in mind that there are two queues here: one for batches and another for Watermarks.

Both queues advance according to Spark's batch interval, but data and Watermarks enter the stream slightly differently. Watermarks advance in the onBatchCompleted hook, so if you want to set the watermark advance for a specific batch, it should be called before that batch. Also, because the Watermark queue is polled once per batch interval, holding the same Watermark without advancing it must be stated explicitly. Otherwise the Watermark advances as soon as it can (in the next batch-completed hook).
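The following is a simplified model of the two queues, written only to illustrate the behaviour described above; it is not Beam's implementation. Watermarks are plain long minute offsets, Long.MIN_VALUE stands in for BoundedWindow.TIMESTAMP_MIN_VALUE, and the class TwoQueueModel and its replay method are hypothetical. Each batch observes the current watermark, and only the batch-completed step polls the watermark queue, so repeating a value is how a "hold" is expressed in this model.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical, simplified model of the batch queue and the watermark queue.
 * NOT Beam's implementation: watermarks are minute offsets (longs), and
 * Long.MIN_VALUE stands in for the start-of-time watermark.
 */
class TwoQueueModel {
  static final long START_OF_TIME = Long.MIN_VALUE;

  /** Replays both queues and returns the watermark each batch observes. */
  public static List<Long> replay(Deque<List<Integer>> batches, Deque<Long> watermarks) {
    List<Long> observed = new ArrayList<>();
    long currentWatermark = START_OF_TIME;
    while (!batches.isEmpty()) {
      batches.poll(); // one micro-batch is emitted per batch interval
      observed.add(currentWatermark); // the batch sees the watermark as it stands now
      // onBatchCompleted: advance to the next queued watermark, if there is one
      Long next = watermarks.poll();
      if (next != null) {
        currentWatermark = next;
      }
    }
    return observed;
  }

  public static void main(String[] args) {
    Deque<List<Integer>> batches =
        new ArrayDeque<>(Arrays.asList(List.of(1), List.of(2), List.of(3), List.of(4)));
    // Repeating +20 "holds" the watermark for one extra batch before moving to +30.
    Deque<Long> watermarks = new ArrayDeque<>(Arrays.asList(20L, 20L, 30L));
    List<Long> seen = replay(batches, watermarks);
    for (int i = 0; i < seen.size(); i++) {
      long wm = seen.get(i);
      String label = wm == START_OF_TIME ? "start-of-time" : "+" + wm + " min";
      System.out.println("batch " + (i + 1) + " sees " + label);
    }
  }
}
```

With the hold in place, batches 2 and 3 both see +20 min and batch 4 sees +30 min. Dropping the repeated 20L makes batch 3 see +30 min instead, which is the "advance as soon as it can" behaviour.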

Example 1:


 CreateStream.of(StringUtf8Coder.of(), batchDuration)
   .nextBatch(
     TimestampedValue.of("foo", endOfGlobalWindow),
     TimestampedValue.of("bar", endOfGlobalWindow))
   .advanceNextBatchWatermarkToInfinity();
 
The first batch will see the default start-of-time WM of BoundedWindow.TIMESTAMP_MIN_VALUE and any following batch will see the end-of-time WM BoundedWindow.TIMESTAMP_MAX_VALUE.

Example 2:


 CreateStream.of(VarIntCoder.of(), batchDuration)
     .nextBatch(
         TimestampedValue.of(1, instant))
     .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
     .nextBatch(
         TimestampedValue.of(2, instant))
     .nextBatch(
         TimestampedValue.of(3, instant))
     .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(30)));
 

The first batch will see the start-of-time WM and the second will see the advanced (+20 min.) WM. The third batch will see the WM advanced to +30 min, because this is the next advancement of the WM, regardless of where it was called in the construction of CreateStream.
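To make the last point concrete, here is a hypothetical toy builder (ToyCreateStream, not part of Beam) that mirrors the call shape of Example 2, using integer minutes instead of Instant values. Because batches and watermark advances go into separate FIFO queues, moving the advanceWatermarkForNextBatch calls around in the chain does not change which watermark each batch observes. This is a sketch under those assumptions, not how CreateStream is implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical toy builder mirroring the call shape of CreateStream; not part of Beam.
 * Watermarks are minutes after "instant"; Long.MIN_VALUE stands for start-of-time.
 */
class ToyCreateStream {
  private final Deque<List<Integer>> batches = new ArrayDeque<>();
  private final Deque<Long> watermarks = new ArrayDeque<>();

  public ToyCreateStream nextBatch(Integer... elements) {
    batches.offer(Arrays.asList(elements)); // FIFO queue of micro-batches
    return this;
  }

  public ToyCreateStream advanceWatermarkForNextBatch(long minutes) {
    watermarks.offer(minutes); // separate FIFO queue of watermark advances
    return this;
  }

  /** Watermark observed by each batch, in order. */
  public List<Long> watermarkPerBatch() {
    Deque<Long> pending = new ArrayDeque<>(watermarks);
    List<Long> observed = new ArrayList<>();
    long current = Long.MIN_VALUE;
    for (int i = 0; i < batches.size(); i++) {
      observed.add(current);
      Long next = pending.poll(); // models the onBatchCompleted hook
      if (next != null) {
        current = next;
      }
    }
    return observed;
  }

  public static void main(String[] args) {
    // Call order from Example 2.
    List<Long> example2 = new ToyCreateStream()
        .nextBatch(1)
        .advanceWatermarkForNextBatch(20)
        .nextBatch(2)
        .nextBatch(3)
        .advanceWatermarkForNextBatch(30)
        .watermarkPerBatch();
    // Same calls, with every watermark advance declared up front.
    List<Long> reordered = new ToyCreateStream()
        .advanceWatermarkForNextBatch(20)
        .advanceWatermarkForNextBatch(30)
        .nextBatch(1)
        .nextBatch(2)
        .nextBatch(3)
        .watermarkPerBatch();
    System.out.println(example2.equals(reordered)); // prints "true"
  }
}
```

Both chains yield start-of-time, +20 and +30 for the three batches, matching the description of Example 2.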

  • Method Details

    • of

      public static <T> CreateStream<T> of(Coder<T> coder, Duration batchDuration, boolean forceWatermarkSync)
      Creates a new Spark-based stream intended for test purposes.
      Parameters:
      coder - the coder to be used for this stream.
      batchDuration - the batch duration (interval) to be used for creating this stream.
      forceWatermarkSync - whether this stream should be synced with the advancement of the watermark maintained by the GlobalWatermarkHolder.
    • of

      public static <T> CreateStream<T> of(Coder<T> coder, Duration batchDuration)
      Creates a new Spark-based stream without forced watermark sync, intended for test purposes. See also of(Coder, Duration, boolean).
    • nextBatch

      @SafeVarargs public final CreateStream<T> nextBatch(TimestampedValue<T>... batchElements)
      Enqueues the elements of the next micro-batch. This is backed by a Queue, so the stream emits batches in the order they were enqueued (FIFO).
    • nextBatch

      @SafeVarargs public final CreateStream<T> nextBatch(T... batchElements)
      Enqueues the elements of the next micro-batch, for elements without explicit timestamps.
    • emptyBatch

      public CreateStream<T> emptyBatch()
      Adds an empty batch.
    • initialSystemTimeAt

      public CreateStream<T> initialSystemTimeAt(Instant initialSystemTime)
      Set the initial synchronized processing time.
    • advanceWatermarkForNextBatch

      public CreateStream<T> advanceWatermarkForNextBatch(Instant newWatermark)
      Advances the watermark in the next batch.
    • advanceNextBatchWatermarkToInfinity

      public CreateStream<T> advanceNextBatchWatermarkToInfinity()
      Advances the watermark in the next batch to the end-of-time.
    • getBatchDuration

      public long getBatchDuration()
    • getBatches

      public Queue<Iterable<TimestampedValue<T>>> getBatches()
      Get the underlying queue representing the mock stream of micro-batches.
    • getTimes

      Get times so they can be pushed into the GlobalWatermarkHolder.
    • isForceWatermarkSync

      public boolean isForceWatermarkSync()
    • expand

      public PCollection<T> expand(PBegin input)
      Description copied from class: PTransform
      Override this method to specify how this PTransform should be expanded on the given InputT.

      NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.

      Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).

      Specified by:
      expand in class PTransform<PBegin,PCollection<T>>