java.lang.Object
org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ChangeStreamAction

@Internal public class ChangeStreamAction extends Object
This class is responsible for processing individual ChangeStreamRecords.
  • Constructor Details

    • ChangeStreamAction

      public ChangeStreamAction(ChangeStreamMetrics metrics)
      Constructs a ChangeStreamAction to process individual ChangeStreamRecords.
      Parameters:
      metrics - used to record Beam metrics.
  • Method Details

    • run

      public Optional<DoFn.ProcessContinuation> run(PartitionRecord partitionRecord, com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord record, RestrictionTracker<StreamProgress,StreamProgress> tracker, DoFn.OutputReceiver<KV<ByteString,com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord>> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator, BytesThroughputEstimator<KV<ByteString,com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord>> throughputEstimator)
      This method processes a ReadChangeStreamResponse from the Bigtable server. There are 3 possible response types: Heartbeat, ChangeStreamMutation, and CloseStream.
      • A Heartbeat is sent periodically, based on the configuration set at the start of the Beam pipeline. A Heartbeat can advance the watermark and includes a continuation token that provides a point to resume from after a checkpoint.
      • A ChangeStreamMutation contains the actual mutation that took place in Bigtable, along with a watermark and a continuation token. All ChangeStreamMutations are emitted to the receiver with a timestamp of 0 instead of the commit timestamp. Setting the timestamp to 0 discourages windowing on this connector: when windowing is applied, every ChangeStreamMutation will be late data. This design decision favors availability over consistency. If partitions are streamed slowly (due to outages or other unavailability), the commit timestamp, which drives the watermark, may lag behind and prevent windows from closing.
      • A CloseStream indicates that the stream has come to an end. The CloseStream is not processed here; it is stored in the RestrictionTracker to be processed later. This ensures that we successfully commit all pending ChangeStreamMutations.
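The lateness claim above can be made concrete with a small calculation. The sketch below is hypothetical: it uses java.time.Instant in place of the Joda-Time Instant that Beam uses, assumes fixed windows, and treats an element as late once the watermark has reached the end of its window, ignoring allowed lateness and triggers. Under those assumptions, a record stamped at timestamp 0 lands in a window that ended in 1970, so any realistic watermark has already passed it.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Beam: illustrates why epoch-timestamped
// records are always late under fixed windowing.
final class EpochLateness {
  private EpochLateness() {}

  // Exclusive end of the fixed window of the given size that contains the timestamp.
  static Instant fixedWindowEnd(Instant timestamp, Duration size) {
    long sizeMillis = size.toMillis();
    long ts = timestamp.toEpochMilli();
    long start = ts - Math.floorMod(ts, sizeMillis);
    return Instant.ofEpochMilli(start + sizeMillis);
  }

  // Simplified rule: an element is late once the watermark has reached the end of its window.
  static boolean isLate(Instant elementTimestamp, Instant watermark, Duration windowSize) {
    return !fixedWindowEnd(elementTimestamp, windowSize).isAfter(watermark);
  }
}
```

With a one-minute window and a watermark in 2024, isLate(Instant.EPOCH, watermark, window) is true, while an element stamped a few seconds after the watermark is still on time.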
      CloseStream is the only response type that initiates a resume. Other response types are simply processed, and the method returns empty. Returning empty signals to the caller that the response has been processed and requires no additional action.
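As a rough illustration of how the three response types map to return values, here is a minimal dispatch sketch. Every type in it (Response, Heartbeat, Mutation, CloseStream, Emitted, Continuation, ResponseDispatcher) is a hypothetical stand-in, not the Beam or Bigtable client API; the watermark is modeled as a monotonically advancing java.time.Instant and the RestrictionTracker as a single field.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for the three ReadChangeStream response types.
interface Response {}

record Heartbeat(Instant lowWatermark, String continuationToken) implements Response {}

record Mutation(String rowKey, Instant commitTimestamp, Instant lowWatermark) implements Response {}

record CloseStream(String reason) implements Response {}

// An emitted element together with the timestamp it was output with.
record Emitted(String rowKey, Instant timestamp) {}

enum Continuation { STOP, RESUME }

final class ResponseDispatcher {
  Instant watermark = Instant.EPOCH;
  // Stands in for the CloseStream stored in the RestrictionTracker.
  CloseStream pendingClose;
  final List<Emitted> output = new ArrayList<>();

  Optional<Continuation> process(Response response) {
    if (response instanceof Heartbeat hb) {
      advanceWatermark(hb.lowWatermark());
      return Optional.empty(); // processed, no further action needed
    }
    if (response instanceof Mutation m) {
      // Output at timestamp 0 (the epoch) rather than the commit timestamp.
      output.add(new Emitted(m.rowKey(), Instant.EPOCH));
      advanceWatermark(m.lowWatermark());
      return Optional.empty();
    }
    if (response instanceof CloseStream cs) {
      // Not processed here: stored for the caller, who is told to resume.
      pendingClose = cs;
      return Optional.of(Continuation.RESUME);
    }
    throw new IllegalArgumentException("Unknown response type: " + response);
  }

  // Only ever moves the watermark forward.
  private void advanceWatermark(Instant candidate) {
    if (candidate.isAfter(watermark)) {
      watermark = candidate;
    }
  }
}
```

Only the CloseStream branch returns a non-empty value; the Heartbeat and Mutation branches do their work and return Optional.empty(), matching the contract described above.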

      There are 2 cases that cause this function to return a non-empty ProcessContinuation.

      1. We fail to claim a StreamProgress. This can happen during a runner-initiated checkpoint. When the runner initiates a checkpoint, we stop, checkpoint pending ChangeStreamMutations, and resume from the previous StreamProgress.
      2. The response is a CloseStream. The RestrictionTracker claims the CloseStream, and we do no additional processing of the response. We return resume to signal to the caller to checkpoint all pending ChangeStreamMutations. We expect the caller to check whether the RestrictionTracker holds a CloseStream and, if so, process it to close the stream.
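The two non-empty cases both hinge on the claim against the RestrictionTracker. The following sketch models that step with a hypothetical SketchTracker that refuses every claim once a checkpoint has been requested, and a hypothetical Progress record standing in for StreamProgress. Mapping a failed claim to STOP and a claimed CloseStream to RESUME follows the description above; the exact continuation values used by the real implementation are an assumption here.

```java
import java.util.Optional;

enum Continuation { STOP, RESUME }

// Hypothetical stand-in for StreamProgress: a continuation token, possibly marking a CloseStream.
record Progress(String token, boolean isCloseStream) {}

// Hypothetical, simplified RestrictionTracker.
final class SketchTracker {
  private Progress current;
  private boolean checkpointRequested;

  void requestCheckpoint() {
    checkpointRequested = true;
  }

  boolean tryClaim(Progress next) {
    if (checkpointRequested) {
      return false; // runner-initiated checkpoint: refuse further claims
    }
    current = next;
    return true;
  }

  // Last successfully claimed progress; the point to resume from.
  Progress current() {
    return current;
  }
}

final class ClaimStep {
  private ClaimStep() {}

  // Returns a continuation for the two non-empty cases, otherwise empty.
  static Optional<Continuation> claim(SketchTracker tracker, Progress progress) {
    if (!tracker.tryClaim(progress)) {
      // Case 1: claim failed; stop, and later resume from the last claimed progress.
      return Optional.of(Continuation.STOP);
    }
    if (progress.isCloseStream()) {
      // Case 2: the CloseStream is now held by the tracker; resume so the caller handles it.
      return Optional.of(Continuation.RESUME);
    }
    return Optional.empty();
  }
}
```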
      Parameters:
      partitionRecord - the stream partition that generated the response
      record - the change stream record to be processed
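      tracker - the RestrictionTracker used to claim the next StreamProgress and to store the CloseStream
      receiver - used to output ChangeStreamMutations as KV<ByteString, ChangeStreamRecord> pairs
      watermarkEstimator - used to manually advance the watermark when processing responses that carry a watermark
      throughputEstimator - used to estimate the throughput, in bytes, of the output records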
      Returns:
      Optional.of(ProcessContinuation) if the run should be stopped or resumed, otherwise Optional.empty() to do nothing.
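From the caller's side, the return value decides whether to keep reading. A minimal, hypothetical driver loop is sketched below: it feeds records to an action with the same Optional-returning shape as run and hands back the first continuation it receives. CallerLoop and drive are illustrative names, and returning Optional.empty() when the records run out is an assumed policy, not something this documentation specifies.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical caller, not Beam's actual partition-reading code.
final class CallerLoop {
  private CallerLoop() {}

  // Feeds records to the action until it returns a continuation, and returns that continuation.
  // If the records run out first, returns empty (an assumed policy for this sketch).
  static <R, C> Optional<C> drive(Iterator<R> records, Function<R, Optional<C>> action) {
    while (records.hasNext()) {
      Optional<C> result = action.apply(records.next());
      if (result.isPresent()) {
        return result; // stop or resume: hand control back to the runner
      }
      // Optional.empty(): the record was handled, move on to the next one.
    }
    return Optional.empty();
  }
}
```

Records after the first non-empty result are never read, which is what lets a CloseStream or a failed claim end the current processing step.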