Class ChangeStreamAction
java.lang.Object
org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ChangeStreamAction
This class is responsible for processing individual ChangeStreamRecord.
Constructor Summary
Constructor | Description
ChangeStreamAction(ChangeStreamMetrics metrics) | Constructs ChangeStreamAction to process individual ChangeStreamRecord.
Method Summary
Modifier and Type | Method | Description
Optional<DoFn.ProcessContinuation> | run(PartitionRecord partitionRecord, com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord record, RestrictionTracker<StreamProgress, StreamProgress> tracker, DoFn.OutputReceiver<KV<ByteString, com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord>> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator, BytesThroughputEstimator<KV<ByteString, com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord>> throughputEstimator) | This method processes a ReadChangeStreamResponse from the Bigtable server.
Constructor Details
ChangeStreamAction
Constructs ChangeStreamAction to process individual ChangeStreamRecord.
Parameters:
metrics - record beam metrics.
Method Details
run
public Optional<DoFn.ProcessContinuation> run(PartitionRecord partitionRecord, com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord record, RestrictionTracker<StreamProgress, StreamProgress> tracker, DoFn.OutputReceiver<KV<ByteString, com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord>> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator, BytesThroughputEstimator<KV<ByteString, com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord>> throughputEstimator)
This method processes a ReadChangeStreamResponse from the Bigtable server. There are 3 possible response types: Heartbeat, ChangeStreamMutation, and CloseStream.
- Heartbeat happens periodically based on the initial configuration set at the start of the Beam pipeline. A Heartbeat can advance the watermark forward and includes a continuation token that provides a point to resume from after a checkpoint.
- ChangeStreamMutation includes the actual mutation that took place in Bigtable. It also includes a watermark and a continuation token. Every ChangeStreamMutation is emitted to the outputReceiver with a timestamp of 0 (instead of the commit timestamp). Setting the timestamp to 0 discourages the use of windowing on this connector: all ChangeStreamMutations will be late data when windowing. This design decision prefers availability over consistency. If partitions are streamed slowly (due to an outage or other unavailability), the commit timestamp, which drives the watermark, may lag behind and prevent windows from closing.
- CloseStream indicates that the stream has come to an end. The CloseStream is not processed but stored in the RestrictionTracker to be processed later. This ensures that we successfully commit all pending ChangeStreamMutations.
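The handling of the three response types described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the Beam implementation: the nested types `StreamRecord`, `Heartbeat`, `Mutation`, `CloseStream`, and `State` are hypothetical stand-ins for the real Bigtable and Beam classes, timestamps are plain `long` values, and keeping the watermark monotonic with `Math.max` is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

class RecordDispatchSketch {
  // Hypothetical stand-ins for the record types in
  // com.google.cloud.bigtable.data.v2.models.
  interface StreamRecord {}

  static final class Heartbeat implements StreamRecord {
    final long watermarkMillis;
    final String token;
    Heartbeat(long watermarkMillis, String token) {
      this.watermarkMillis = watermarkMillis;
      this.token = token;
    }
  }

  static final class Mutation implements StreamRecord {
    final String rowKey;
    final long watermarkMillis;
    final String token;
    Mutation(String rowKey, long watermarkMillis, String token) {
      this.rowKey = rowKey;
      this.watermarkMillis = watermarkMillis;
      this.token = token;
    }
  }

  static final class CloseStream implements StreamRecord {}

  // Hypothetical container for what the tracker, watermark estimator,
  // and output receiver would hold in a real pipeline.
  static final class State {
    long watermarkMillis = Long.MIN_VALUE;
    String resumeToken = null;
    CloseStream pendingClose = null;
    final List<String> emitted = new ArrayList<>(); // entries look like "rowKey@timestamp"
  }

  /** Returns true when the caller should stop reading and checkpoint. */
  static boolean handle(StreamRecord record, State state) {
    if (record instanceof Heartbeat) {
      Heartbeat hb = (Heartbeat) record;
      // A heartbeat only moves the watermark and the resume point.
      state.watermarkMillis = Math.max(state.watermarkMillis, hb.watermarkMillis);
      state.resumeToken = hb.token;
      return false;
    }
    if (record instanceof Mutation) {
      Mutation m = (Mutation) record;
      // Emit with timestamp 0 rather than the commit timestamp.
      state.emitted.add(m.rowKey + "@0");
      state.watermarkMillis = Math.max(state.watermarkMillis, m.watermarkMillis);
      state.resumeToken = m.token;
      return false;
    }
    if (record instanceof CloseStream) {
      // Store the CloseStream for later; do not process it here.
      state.pendingClose = (CloseStream) record;
      return true;
    }
    throw new IllegalArgumentException("Unknown record type: " + record);
  }
}
```

Only the mutation branch produces output, and the CloseStream branch does nothing beyond recording the record, which mirrors the deferral described above.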
There are 2 cases that cause this function to return a non-empty ProcessContinuation.
- We fail to claim a StreamProgress. This can happen for a runner-initiated checkpoint. When the runner initiates a checkpoint, we stop, checkpoint pending ChangeStreamMutations, and resume from the previous StreamProgress.
- The response is a CloseStream. The RestrictionTracker claims the CloseStream. We don't do any additional processing of the response. We return resume to signal to the caller to checkpoint all pending ChangeStreamMutations. We expect the caller to check that the RestrictionTracker includes a CloseStream and to process it to close the stream.
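The two cases above can be illustrated with a small self-contained sketch. The `Tracker` class and the `Continuation` enum are hypothetical stand-ins for Beam's `RestrictionTracker` and `DoFn.ProcessContinuation`. Returning `STOP` on a failed claim is an assumption of this sketch; the text above only states that the result is non-empty in that case.

```java
import java.util.Optional;

class ContinuationSketch {
  // Hypothetical stand-in for DoFn.ProcessContinuation.
  enum Continuation { STOP, RESUME }

  // Hypothetical tracker: claims succeed until the runner checkpoints.
  static final class Tracker {
    boolean checkpointed = false;
    boolean closeStreamStored = false;

    boolean tryClaim(boolean isCloseStream) {
      if (checkpointed) {
        return false; // the runner took a checkpoint; nothing more may be claimed
      }
      if (isCloseStream) {
        closeStreamStored = true; // keep the CloseStream for the caller
      }
      return true;
    }
  }

  static Optional<Continuation> run(boolean isCloseStream, Tracker tracker) {
    // Case 1: the claim fails, for example after a runner-initiated checkpoint.
    if (!tracker.tryClaim(isCloseStream)) {
      return Optional.of(Continuation.STOP); // assumption: stop on a failed claim
    }
    // Case 2: a CloseStream was claimed; ask the caller to checkpoint and resume.
    if (isCloseStream) {
      return Optional.of(Continuation.RESUME);
    }
    // Otherwise there is nothing to signal and processing continues.
    return Optional.empty();
  }
}
```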
Parameters:
partitionRecord - the stream partition that generated the response
record - the change stream record to be processed
tracker - restrictionTracker that we use to claim the next block and also to store the CloseStream
receiver - to output DataChange
watermarkEstimator - manually progresses the watermark when processing responses with a watermark
Returns:
Optional.of(ProcessContinuation) if the run should be stopped or resumed, otherwise Optional.empty() to do nothing.
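A caller might consume the Optional return value along these lines. This is a hedged sketch of the calling pattern only: `drain` and the `Continuation` enum are hypothetical names, the action is passed in as a plain `Function`, and what a real caller does once the records run out is not covered by this page.

```java
import java.util.Optional;
import java.util.function.Function;

class CallerLoopSketch {
  // Hypothetical stand-in for DoFn.ProcessContinuation.
  enum Continuation { STOP, RESUME }

  /**
   * Feeds records to the action one at a time and returns the first
   * non-empty result. An empty result means "do nothing, keep going".
   */
  static <R> Optional<Continuation> drain(
      Iterable<R> records, Function<R, Optional<Continuation>> action) {
    for (R record : records) {
      Optional<Continuation> result = action.apply(record);
      if (result.isPresent()) {
        return result; // stop or resume was requested; later records are not read
      }
    }
    return Optional.empty();
  }
}
```

Records that follow the first non-empty result are never passed to the action, so a stop or resume request takes effect immediately.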