Class ReadChangeStreamPartitionAction
java.lang.Object
org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ReadChangeStreamPartitionAction
This class is part of the ReadChangeStreamPartitionDoFn SDF.
Constructor Summary
Constructors
ReadChangeStreamPartitionAction(MetadataTableDao metadataTableDao, ChangeStreamDao changeStreamDao, ChangeStreamMetrics metrics, ChangeStreamAction changeStreamAction, Duration heartbeatDuration, SizeEstimator<KV<ByteString, com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord>> sizeEstimator)
Method Summary
Modifier and Type: DoFn.ProcessContinuation
Method: run(PartitionRecord partitionRecord, RestrictionTracker<StreamProgress, StreamProgress> tracker, DoFn.OutputReceiver<KV<ByteString, com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord>> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator)
Description: Streams changes from a specific partition.
Constructor Details
ReadChangeStreamPartitionAction
public ReadChangeStreamPartitionAction(MetadataTableDao metadataTableDao, ChangeStreamDao changeStreamDao, ChangeStreamMetrics metrics, ChangeStreamAction changeStreamAction, Duration heartbeatDuration, SizeEstimator<KV<ByteString, com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord>> sizeEstimator)
Method Details
run
public DoFn.ProcessContinuation run(PartitionRecord partitionRecord, RestrictionTracker<StreamProgress, StreamProgress> tracker, DoFn.OutputReceiver<KV<ByteString, com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord>> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator) throws IOException
Streams changes from a specific partition. This function is responsible for maintaining the lifecycle of streaming the partition. We delegate to ChangeStreamAction to process each individual response from the change stream.
Before we send a request to Cloud Bigtable to stream the partition, we need to perform the following steps:
- Lock the partition. Due to the design of the change streams connector, multiple DoFns may be started that try to stream the same partition. However, only one DoFn should stream a given partition. We solve this by using the metadata table as a distributed lock: we attempt to lock the partition for this DoFn's UUID. If the attempt succeeds, this DoFn is the only one allowed to stream the partition, and it continues. Otherwise, this DoFn terminates, because another DoFn is already streaming the partition.
- Process CloseStream if it exists. To avoid a possible inconsistent state, we do not
process CloseStream immediately after receiving it. Instead, we claim the CloseStream in
the RestrictionTracker so that it persists across a checkpoint, and we checkpoint to flush
all the DataChanges. Then, on resume, we process the CloseStream. There are only two
expected statuses for CloseStream: OK and Out of Range.
  - OK status is returned when the predetermined endTime has been reached. In this
    case, we update the watermark and update the metadata table. DetectNewPartitionsDoFn
    aggregates the watermarks from all the streams to ensure that every stream has reached
    beyond endTime, so that it can also terminate and end the Beam job.
  - Out of Range is returned when the partition has either been split into more
    partitions or merged into a larger partition. In this case, we write the new
    partitions' information to the metadata table so that DetectNewPartitionsDoFn can
    read and output those new partitions to be streamed. We also need to clean up this
    partition's metadata to release the lock.
- Update the metadata table with the watermark and additional debugging info.
- Stream the partition.
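The lock and CloseStream steps above can be sketched in plain Java. This is a minimal in-memory illustration under stated assumptions, not the connector's implementation: `MetadataTable`, `tryLock`, `releaseLock`, `CloseStatus`, `handleCloseStream`, and `allStreamsReached` are hypothetical names; `ConcurrentHashMap.putIfAbsent` stands in for a conditional write to the Bigtable metadata table; letting the owning UUID lock again (for example, after resuming from a checkpoint) is assumed; and "reached beyond endTime" is read as "at or past endTime".

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// HYPOTHETICAL in-memory sketch of the partition lifecycle; not the Beam connector's code.
final class PartitionLifecycleSketch {

  // Stand-in for the metadata table. putIfAbsent plays the role of a conditional
  // (check-and-set) write, so at most one DoFn UUID can own a partition at a time.
  static final class MetadataTable {
    final Map<String, String> owners = new ConcurrentHashMap<>();
    final Map<String, Instant> watermarks = new ConcurrentHashMap<>();
    final Map<String, List<String>> newPartitions = new ConcurrentHashMap<>();

    // True if uuid owns the partition afterwards. Assumption: the current owner may lock again.
    boolean tryLock(String partition, String uuid) {
      String owner = owners.putIfAbsent(partition, uuid);
      return owner == null || owner.equals(uuid);
    }

    // Removes the lock entry only if uuid is still the owner.
    void releaseLock(String partition, String uuid) {
      owners.remove(partition, uuid);
    }
  }

  // The two CloseStream statuses the documentation describes as expected.
  enum CloseStatus { OK, OUT_OF_RANGE }

  // Handles a CloseStream that was claimed earlier and is processed on resume.
  static void handleCloseStream(
      MetadataTable table, String partition, String uuid,
      CloseStatus status, Instant endTime, List<String> children) {
    switch (status) {
      case OK:
        // endTime reached: advance this partition's watermark to endTime.
        table.watermarks.put(partition, endTime);
        break;
      case OUT_OF_RANGE:
        // Split or merge: publish the new partitions for the detector, then clean up
        // this partition's metadata, which also releases the lock.
        table.newPartitions.put(partition, List.copyOf(children));
        table.watermarks.remove(partition);
        table.releaseLock(partition, uuid);
        break;
      default:
        throw new IllegalStateException("Unexpected CloseStream status: " + status);
    }
  }

  // Detector-side check (assumed semantics): every stream's watermark is at or past endTime.
  static boolean allStreamsReached(Map<String, Instant> watermarks, Instant endTime) {
    return !watermarks.isEmpty()
        && watermarks.values().stream().allMatch(w -> !w.isBefore(endTime));
  }
}
```

In this sketch, a second DoFn calling `tryLock` on a partition that is already owned gets `false` and would terminate, which mirrors the design choice in the text: the metadata table, not the runner, decides which DoFn owns a partition.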
- Parameters:
partitionRecord - partition information used to identify this stream
tracker - restriction tracker of ReadChangeStreamPartitionDoFn
receiver - output receiver for ReadChangeStreamPartitionDoFn
watermarkEstimator - watermark estimator of ReadChangeStreamPartitionDoFn
- Returns:
DoFn.ProcessContinuation.stop() if a checkpoint is required or the stream has completed, or DoFn.ProcessContinuation.resume() if a checkpoint is required.
- Throws:
IOException - when the stream fails.
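The Returns entry and the CloseStream step can be illustrated together with a small simulation. It rests on assumptions that do not come from the connector: `Continuation` only mirrors the two outcomes of Beam's `DoFn.ProcessContinuation.stop()` and `resume()`, `Progress` is a hypothetical stand-in for the `StreamProgress` restriction, and the `drive` loop models a runner that commits buffered output each time an invocation returns. Under that reading, returning resume() is how the DoFn checkpoints itself after claiming a CloseStream, and stop() ends the restriction once the CloseStream has been processed.

```java
import java.util.ArrayList;
import java.util.List;

// HYPOTHETICAL simulation of "claim CloseStream, checkpoint, then process"; not the connector's code.
final class CloseStreamCheckpointSketch {

  // Mirrors the two outcomes of Beam's DoFn.ProcessContinuation: stop() and resume().
  enum Continuation { STOP, RESUME }

  // Stand-in for StreamProgress: state that survives a checkpoint.
  static final class Progress {
    int position;               // index of the next record to read
    boolean closeStreamClaimed; // CloseStream claimed but not yet processed
  }

  final List<String> buffered = new ArrayList<>();  // output not yet committed
  final List<String> committed = new ArrayList<>(); // output committed at a checkpoint

  // One invocation of the simulated run(). Records are "data:<x>" or "close".
  Continuation run(Progress progress, List<String> stream) {
    if (progress.closeStreamClaimed) {
      // Resumed after the checkpoint: all earlier DataChanges are committed,
      // so acting on the CloseStream now cannot leave the state inconsistent.
      buffered.add("closed");
      return Continuation.STOP;
    }
    while (progress.position < stream.size()) {
      String record = stream.get(progress.position++);
      if (record.equals("close")) {
        // Claim the CloseStream so it persists, then request a checkpoint to flush DataChanges.
        progress.closeStreamClaimed = true;
        return Continuation.RESUME;
      }
      buffered.add(record);
    }
    // No more records in this simulated stream: treat it as completed.
    return Continuation.STOP;
  }

  // Drives invocations like a runner: each return commits the buffered output.
  List<Continuation> drive(List<String> stream) {
    Progress progress = new Progress();
    List<Continuation> returned = new ArrayList<>();
    Continuation c;
    do {
      c = run(progress, stream);
      returned.add(c);
      committed.addAll(buffered);
      buffered.clear();
    } while (c == Continuation.RESUME);
    return returned;
  }
}
```

For example, `new CloseStreamCheckpointSketch().drive(List.of("data:a", "data:b", "close"))` returns `[RESUME, STOP]`, and `committed` ends up as `[data:a, data:b, closed]`: the CloseStream is acted on only after the DataChanges that preceded it have been committed.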