Class ReadChangeStreamPartitionAction

java.lang.Object
org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ReadChangeStreamPartitionAction

@Internal public class ReadChangeStreamPartitionAction extends Object
This class is part of the ReadChangeStreamPartitionDoFn SDF (splittable DoFn).
  • Constructor Details

  • Method Details

    • run

      public DoFn.ProcessContinuation run(PartitionRecord partitionRecord, RestrictionTracker<StreamProgress,StreamProgress> tracker, DoFn.OutputReceiver<KV<ByteString,com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord>> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator) throws IOException
      Streams changes from a specific partition. This function is responsible for managing the lifecycle of streaming the partition. It delegates to ChangeStreamAction to process each individual response from the change stream.
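The delegation described above can be sketched as follows. This is a minimal, self-contained illustration, not the connector's code: the class, the Continuation enum (a stand-in for DoFn.ProcessContinuation), and the String responses are all hypothetical, and what happens when the stream ends without a decision is an assumption of the sketch.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/**
 * Hypothetical sketch of the delegation pattern: the partition-level loop owns
 * the stream, and a per-response handler (the role ChangeStreamAction plays)
 * decides whether this invocation should end early.
 */
public class DelegationSketch {
  /** Stand-in for DoFn.ProcessContinuation.stop() / resume(). */
  enum Continuation { STOP, RESUME }

  static Continuation streamPartition(
      Iterable<String> responses, Function<String, Optional<Continuation>> handler) {
    for (String response : responses) {
      // Empty means "keep streaming"; a value ends this invocation with that continuation.
      Optional<Continuation> result = handler.apply(response);
      if (result.isPresent()) {
        return result.get();
      }
    }
    // Assumption for this sketch: if the stream ends without a decision,
    // checkpoint and resume later.
    return Continuation.RESUME;
  }

  public static void main(String[] args) {
    // Toy handler: request a checkpoint when a CloseStream arrives, so earlier
    // DataChanges are flushed before the CloseStream is processed on resume.
    Function<String, Optional<Continuation>> handler =
        r -> r.equals("CloseStream") ? Optional.of(Continuation.RESUME) : Optional.empty();
    System.out.println(
        streamPartition(List.of("DataChange", "Heartbeat", "CloseStream"), handler));
  }
}
```

Keeping the per-response logic behind a handler lets the outer loop stay focused on the stream's lifecycle, while each response type decides independently whether to continue.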

      Before sending a request to Cloud Bigtable to stream the partition, we perform the following steps:

      1. Lock the partition. Due to the design of the change streams connector, multiple DoFns may be started that try to stream the same partition, but only one DoFn should stream a given partition. We solve this by using the metadata table as a distributed lock: we attempt to lock the partition for this DoFn's UUID. If the lock succeeds, this DoFn is the only one allowed to stream the partition, and it continues. Otherwise, this DoFn terminates because another DoFn is already streaming the partition.
      2. Process CloseStream if it exists. To avoid a possible inconsistent state, we do not process a CloseStream immediately upon receiving it. Instead, we claim the CloseStream in the RestrictionTracker so that it persists across a checkpoint, and then checkpoint to flush all the DataChanges. On resume, we process the CloseStream. There are only two expected statuses for CloseStream: OK and Out of Range.
        1. OK status is returned when the predetermined endTime has been reached. In this case, we update the watermark and the metadata table. DetectNewPartitionsDoFn aggregates the watermarks from all the streams to ensure that every stream has progressed beyond endTime, so that it can also terminate and end the Beam job.
        2. Out of Range is returned when the partition has either been split into more partitions or merged into a larger partition. In this case, we write to the metadata table the new partitions' information so that DetectNewPartitionsDoFn can read and output those new partitions to be streamed. We also need to ensure we clean up this partition's metadata to release the lock.
      3. Update the metadata table with the watermark and additional debugging info.
      4. Stream the partition.
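Steps 1 and 2 above can be illustrated with a small in-memory sketch. It is hypothetical throughout: a ConcurrentHashMap stands in for the Bigtable metadata table (the real connector writes to that table, which this sketch does not model), partitions are plain strings, and treating a repeated lock attempt by the same UUID as success (for example, after resuming from a checkpoint) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical in-memory sketch of locking a partition and handling its
 * CloseStream. A ConcurrentHashMap stands in for the metadata table.
 */
public class PartitionLifecycleSketch {
  enum Status { OK, OUT_OF_RANGE }

  // partition key -> UUID of the DoFn that holds the lock
  private final Map<String, String> locks = new ConcurrentHashMap<>();
  // new partitions recorded for a downstream detector to pick up
  final List<String> newPartitions = new ArrayList<>();

  /** Step 1: succeeds only if no other DoFn holds the lock. */
  boolean lockPartition(String partition, String uuid) {
    // putIfAbsent is atomic: at most one caller installs its UUID.
    String holder = locks.putIfAbsent(partition, uuid);
    // Assumption: the same UUID re-acquiring its own lock counts as success.
    return holder == null || holder.equals(uuid);
  }

  /** Step 2: handle a CloseStream that was claimed before the last checkpoint. */
  void processCloseStream(String partition, Status status, List<String> children) {
    switch (status) {
      case OK:
        // endTime reached: the real connector updates the watermark and metadata here.
        break;
      case OUT_OF_RANGE:
        // Split or merge: record the new partitions, then release this partition's lock.
        newPartitions.addAll(children);
        locks.remove(partition);
        break;
    }
  }

  public static void main(String[] args) {
    PartitionLifecycleSketch s = new PartitionLifecycleSketch();
    System.out.println(s.lockPartition("[a,c)", "uuid-1")); // true
    System.out.println(s.lockPartition("[a,c)", "uuid-2")); // false: already locked
    s.processCloseStream("[a,c)", Status.OUT_OF_RANGE, List.of("[a,b)", "[b,c)"));
    System.out.println(s.newPartitions); // [[a,b), [b,c)]
    System.out.println(s.lockPartition("[a,c)", "uuid-2")); // true: lock was released
  }
}
```

The atomic put-if-absent mirrors the role the metadata table plays as a distributed lock: exactly one caller can install its UUID, and every other caller learns that the partition is already taken.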
      Parameters:
      partitionRecord - partition information used to identify this stream
      tracker - restriction tracker of ReadChangeStreamPartitionDoFn
      receiver - output receiver for ReadChangeStreamPartitionDoFn
      watermarkEstimator - watermark estimator of ReadChangeStreamPartitionDoFn
      Returns:
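      DoFn.ProcessContinuation.stop() if the stream has completed, the partition could not be locked, or the restriction tracker refuses a claim (for example, because the runner has already checkpointed the restriction). DoFn.ProcessContinuation.resume() if this DoFn requests a checkpoint itself, for example to flush DataChanges before processing a CloseStream.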
      Throws:
      IOException - if the stream fails.
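The difference between the two return values follows general splittable DoFn conventions, sketched below under stated assumptions. ToyTracker is a hypothetical offset-style stand-in for a RestrictionTracker whose claims start failing once the runner checkpoints it, and endPosition is a hypothetical marker for "stream completed"; neither is part of the connector.

```java
import java.util.List;

/** Hypothetical sketch of choosing stop() versus resume() in a splittable DoFn. */
public class ContinuationSketch {
  /** Stand-in for DoFn.ProcessContinuation.stop() / resume(). */
  enum Continuation { STOP, RESUME }

  /** Toy offset tracker: claims below the limit succeed. */
  static class ToyTracker {
    private long limit = Long.MAX_VALUE;

    // Simulates the runner checkpointing the restriction at a position.
    void checkpointAt(long position) { limit = position; }

    boolean tryClaim(long position) { return position < limit; }
  }

  static Continuation process(ToyTracker tracker, List<Long> positions, long endPosition) {
    for (long pos : positions) {
      if (!tracker.tryClaim(pos)) {
        // The remaining work now belongs to the residual restriction, so stop.
        return Continuation.STOP;
      }
      if (pos >= endPosition) {
        // The stream has completed; there is nothing left to resume.
        return Continuation.STOP;
      }
    }
    // No more data for now: ask the runner to checkpoint and call again later.
    return Continuation.RESUME;
  }

  public static void main(String[] args) {
    System.out.println(process(new ToyTracker(), List.of(1L, 2L, 3L), 10L));
  }
}
```

Returning stop() after a failed claim matters because the unclaimed work has already been handed to the residual restriction. Continuing to emit output at that point would duplicate records.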