java.lang.Object
org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao

@Internal public class MetadataTableDao extends Object
Data access object for managing the state of the metadata Bigtable table.

The metadata table is shared across many Beam jobs. Each Beam job identifies itself with a specific prefix, which is used as the prefix of every row key belonging to that job.
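Because every row of a job starts with that job's prefix, and Bigtable stores rows sorted by key, one job's rows form a contiguous key range that can be read with a single prefix scan without touching other jobs' rows (assuming no job prefix is itself a prefix of another). A minimal sketch under stated assumptions: a `TreeMap` stands in for the sorted table (for ASCII keys its ordering matches Bigtable's byte ordering), and `SharedMetadataTable`, `PrefixDemo`, and the key layout `jobA#StreamPartition#...` are hypothetical, not the connector's actual format.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of the shared metadata table. Bigtable keeps rows sorted by key;
// a TreeMap gives the same ordering for ASCII row keys.
class SharedMetadataTable {
    private final TreeMap<String, String> rows = new TreeMap<>();

    void put(String rowKey, String value) {
        rows.put(rowKey, value);
    }

    // Exclusive end key of a prefix scan: the prefix with its last character incremented.
    // Assumes a non-empty prefix whose last character is not Character.MAX_VALUE.
    static String prefixEnd(String prefix) {
        char last = prefix.charAt(prefix.length() - 1);
        return prefix.substring(0, prefix.length() - 1) + (char) (last + 1);
    }

    // Returns only the rows whose keys start with jobPrefix, i.e. the rows of one job.
    NavigableMap<String, String> scanJob(String jobPrefix) {
        return rows.subMap(jobPrefix, true, prefixEnd(jobPrefix), false);
    }
}

class PrefixDemo {
    public static void main(String[] args) {
        SharedMetadataTable table = new SharedMetadataTable();
        // Hypothetical job prefixes "jobA#" and "jobB#" sharing one table.
        table.put("jobA#StreamPartition#a-c", "uuid=1234");
        table.put("jobA#NewPartition#c-f", "token=t1");
        table.put("jobB#StreamPartition#a-z", "uuid=9999");
        for (Map.Entry<String, String> row : table.scanJob("jobA#").entrySet()) {
            // jobB# rows are never returned by the jobA# scan.
            System.out.println(row.getKey() + " -> " + row.getValue());
        }
    }
}
```

Real Bigtable clients offer prefix reads directly; the explicit end-key computation here only makes the contiguous-range property visible.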

  • Constructor Details

    • MetadataTableDao

      public MetadataTableDao(com.google.cloud.bigtable.data.v2.BigtableDataClient dataClient, String tableId, ByteString changeStreamNamePrefix)
  • Method Details

    • getChangeStreamNamePrefix

      public ByteString getChangeStreamNamePrefix()
      Returns:
      the prefix that is prepended to every row belonging to this Beam job.
    • convertStreamPartitionRowKeyToPartition

      public com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange convertStreamPartitionRowKeyToPartition(ByteString rowKey) throws InvalidProtocolBufferException
      Convert a StreamPartition row key into the partition it represents, so that metadata read from Bigtable can be processed.

      The row key should be passed exactly as read from Cloud Bigtable, without any modification.

      Parameters:
      rowKey - row key from Cloud Bigtable
      Returns:
      partition extracted from rowKey
      Throws:
      InvalidProtocolBufferException - if conversion from rowKey to partition fails
    • convertPartitionToStreamPartitionRowKey

      public ByteString convertPartitionToStreamPartitionRowKey(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition)
      Convert a partition into a StreamPartition row key, used to query the metadata of partitions that are currently being streamed.
      Parameters:
      partition - the partition to convert to a row key
      Returns:
      row key to insert into Cloud Bigtable.
    • convertNewPartitionRowKeyToPartition

      public com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange convertNewPartitionRowKeyToPartition(ByteString rowKey) throws InvalidProtocolBufferException
      Convert a NewPartition row key into the partition it represents, so that metadata read from Bigtable can be processed.

      The row key should be passed exactly as read from Cloud Bigtable, without any modification.

      Parameters:
      rowKey - row key from Cloud Bigtable
      Returns:
      partition extracted from rowKey
      Throws:
      InvalidProtocolBufferException - if conversion from rowKey to partition fails
    • convertPartitionToNewPartitionRowKey

      public ByteString convertPartitionToNewPartitionRowKey(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition)
      Convert a partition into a NewPartition row key, used to query for partitions that are ready to be streamed as the result of splits and merges.
      Parameters:
      partition - the partition to convert to a row key
      Returns:
      row key to insert into Cloud Bigtable.
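The four conversion methods above form inverse pairs: encoding a partition and then decoding the resulting key gives back the same partition, provided the key is not altered in between. The sketch below illustrates that contract under stated assumptions. The declared InvalidProtocolBufferException suggests the DAO serializes ranges with protobuf; this sketch substitutes a simple length-prefixed byte layout, and `PartitionKeyCodec`, its `Range` record, and the key layout are hypothetical. An `IllegalArgumentException` plays the role of the parse failure.

```java
import static java.nio.charset.StandardCharsets.UTF_8;

import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical codec between partitions and metadata row keys.
// Layout (an assumption, not the connector's format): jobPrefix + tag + [4-byte start length][start][end]
final class PartitionKeyCodec {
    // Hypothetical row-type tags, one per kind of metadata row.
    static final String STREAM_PARTITION = "StreamPartition#";
    static final String NEW_PARTITION = "NewPartition#";

    // Stand-in for ByteStringRange: a [start, end) key range.
    record Range(String start, String end) {}

    static byte[] toRowKey(String jobPrefix, String tag, Range partition) {
        byte[] head = (jobPrefix + tag).getBytes(UTF_8);
        byte[] start = partition.start().getBytes(UTF_8);
        byte[] end = partition.end().getBytes(UTF_8);
        return ByteBuffer.allocate(head.length + 4 + start.length + end.length)
                .put(head)
                .putInt(start.length)
                .put(start)
                .put(end)
                .array();
    }

    // Expects the row key exactly as read from the table; a stripped or edited key is rejected.
    static Range fromRowKey(String jobPrefix, String tag, byte[] rowKey) {
        byte[] head = (jobPrefix + tag).getBytes(UTF_8);
        if (rowKey.length < head.length + 4
                || !Arrays.equals(Arrays.copyOfRange(rowKey, 0, head.length), head)) {
            throw new IllegalArgumentException("row key does not start with " + jobPrefix + tag);
        }
        ByteBuffer buf = ByteBuffer.wrap(rowKey, head.length, rowKey.length - head.length);
        int startLength = buf.getInt();
        if (startLength < 0 || startLength > buf.remaining()) {
            throw new IllegalArgumentException("malformed row key");
        }
        byte[] start = new byte[startLength];
        buf.get(start);
        byte[] end = new byte[buf.remaining()];
        buf.get(end);
        return new Range(new String(start, UTF_8), new String(end, UTF_8));
    }

    private PartitionKeyCodec() {}
}

class CodecDemo {
    public static void main(String[] args) {
        PartitionKeyCodec.Range ac = new PartitionKeyCodec.Range("a", "c");
        byte[] key = PartitionKeyCodec.toRowKey("jobA#", PartitionKeyCodec.STREAM_PARTITION, ac);
        // Round trip: prints the same range that was encoded.
        System.out.println(PartitionKeyCodec.fromRowKey("jobA#", PartitionKeyCodec.STREAM_PARTITION, key));
    }
}
```

The tag check also means a StreamPartition key cannot be misread as a NewPartition key, which is one reason the decoders need the unaltered key.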
    • readDetectNewPartitionsState

      @Nullable public DetectNewPartitionsState readDetectNewPartitionsState()
      Read the low watermark of the pipeline from the DetectNewPartitions row.
      Returns:
      DetectNewPartitions row from the metadata table.
    • readNewPartitionsIncludingDeleted

      public List<NewPartition> readNewPartitionsIncludingDeleted() throws InvalidProtocolBufferException
      Returns:
      all new partitions, resulting from splits and merges, that are waiting to be streamed, including those marked for deletion.
      Throws:
      InvalidProtocolBufferException
    • readNewPartitions

      public List<NewPartition> readNewPartitions() throws InvalidProtocolBufferException
      Returns:
      list of the new partitions, resulting from splits and merges, that are waiting to be streamed.
      Throws:
      InvalidProtocolBufferException
    • writeNewPartition

      public void writeNewPartition(NewPartition newPartition)
      After a split or merge reported by a CloseStream message, write the new partition's information to the metadata table.
      Parameters:
      newPartition - the new partition
    • markNewPartitionForDeletion

      public void markNewPartitionForDeletion(NewPartition newPartition)
      This is the first step of a two-phase delete. Mark each parent partition in the NewPartition for deletion. This prevents the NewPartition row from being read twice during the window in which RCSP (ReadChangeStreamPartition) cleans up the partitions that led to it.

      The two-phase delete of NewPartition maintains the invariant that the metadata table always holds a copy of the ChangeStreamContinuationToken for every partition. The alternatives are to delete NewPartition either before or after outputting the PartitionRecord to RCSP.

      Deleting before outputting has the problem that if the output fails after the NewPartition row has been deleted, the token is lost.

      Deleting after outputting has a subtler problem: a race between cleaning up NewPartition and RCSP writing StreamPartition. If the cleanup happens before StreamPartition is written, the token is missing at that moment. Under normal operation this recovers, because RCSP eventually writes StreamPartition, but we want the invariant that a copy of the token exists for every partition in the metadata table at all times.

      Parameters:
      newPartition - the new partition to mark for deletion.
    • deleteNewPartition

      public boolean deleteNewPartition(NewPartition newPartition)
      This is the second step of the two-phase delete. Delete the NewPartition cells. This should happen only after the tokens have been written to StreamPartition.

      A delete may target multiple parent partition cells of which only a subset are marked for deletion; in that case, only the cells marked for deletion are deleted.

      Parameters:
      newPartition - row that represents the new partition.
      Returns:
      true if the entire newPartition was successfully deleted.
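The mark-then-delete contract of the two methods above can be illustrated with a small in-memory model. It is a sketch under stated assumptions: `NewPartitionStore`, its methods, and the row/cell layout (one cell per parent partition, each holding a continuation token and a deletion mark) are hypothetical, and the real DAO performs these steps as Bigtable mutations rather than map updates. It shows that marking keeps every token in the table, and that the delete step removes only marked cells and reports whether the entire row is gone.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of NewPartition rows:
// new-partition row key -> (parent partition -> cell holding that parent's continuation token).
class NewPartitionStore {
    static final class Cell {
        final String token;
        boolean markedForDeletion;

        Cell(String token) {
            this.token = token;
        }
    }

    private final Map<String, Map<String, Cell>> rows = new HashMap<>();

    void write(String newPartition, String parent, String token) {
        rows.computeIfAbsent(newPartition, k -> new HashMap<>()).put(parent, new Cell(token));
    }

    // Phase 1: mark the given parent cells. Their tokens stay in the table.
    void markForDeletion(String newPartition, List<String> parents) {
        Map<String, Cell> row = rows.getOrDefault(newPartition, Map.of());
        for (String parent : parents) {
            Cell cell = row.get(parent);
            if (cell != null) {
                cell.markedForDeletion = true;
            }
        }
    }

    // Phase 2: delete only the cells that were marked; unmarked cells (and their tokens) survive.
    // Returns true if the whole row is gone afterwards.
    boolean delete(String newPartition, List<String> parents) {
        Map<String, Cell> row = rows.get(newPartition);
        if (row != null) {
            for (String parent : parents) {
                Cell cell = row.get(parent);
                if (cell != null && cell.markedForDeletion) {
                    row.remove(parent);
                }
            }
            if (row.isEmpty()) {
                rows.remove(newPartition);
            }
        }
        // Assumption: a row that no longer exists counts as fully deleted.
        return !rows.containsKey(newPartition);
    }

    // Returns the stored token for a parent, or null if its cell has been deleted.
    String tokenFor(String newPartition, String parent) {
        Cell cell = rows.getOrDefault(newPartition, Map.of()).get(parent);
        return cell == null ? null : cell.token;
    }
}

class NewPartitionDemo {
    public static void main(String[] args) {
        NewPartitionStore store = new NewPartitionStore();
        // AB and BC merged into AC: one cell per parent.
        store.write("AC", "AB", "tokenAB");
        store.write("AC", "BC", "tokenBC");
        store.markForDeletion("AC", List.of("AB"));               // only AB is marked
        boolean whole = store.delete("AC", List.of("AB", "BC"));  // BC is unmarked, so it survives
        System.out.println(whole + " " + store.tokenFor("AC", "BC")); // prints "false tokenBC"
    }
}
```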
    • readStreamPartitionsWithWatermark

      public List<StreamPartitionWithWatermark> readStreamPartitionsWithWatermark() throws InvalidProtocolBufferException
      Return the list of locked StreamPartitions and their watermarks.
      Returns:
      list of partitions currently being streamed that have a watermark.
      Throws:
      InvalidProtocolBufferException
    • readAllStreamPartitions

      public List<PartitionRecord> readAllStreamPartitions() throws InvalidProtocolBufferException
      Read all StreamPartition rows and output a PartitionRecord for each, so that they can be streamed.
      Returns:
      list of PartitionRecord of all StreamPartitions in the metadata table.
      Throws:
      InvalidProtocolBufferException
    • updateDetectNewPartitionWatermark

      public void updateDetectNewPartitionWatermark(Instant watermark)
      Update the watermark cell for the DetectNewPartitions step.
      Parameters:
      watermark - watermark value to set for the cell
    • updateWatermark

      public void updateWatermark(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition, Instant watermark, @Nullable com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken currentToken)
      Update the metadata for the row key represented by the partition.
      Parameters:
      partition - forms the row key of the row to update
      watermark - watermark value to set for the cell
      currentToken - continuation token to set for the cell
    • releaseStreamPartitionLockForDeletion

      public boolean releaseStreamPartitionLockForDeletion(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition, String uuid)
      This is the first step of the two-phase delete of StreamPartition. If the uuid matches, unset the uuid of the StreamPartition and set the deletion bit. This prepares the StreamPartition to be deleted.

      The reasons for a two-phase delete are:

      • If we delete StreamPartition before we write NewPartitions, then the invariant that a continuation token always exists for every row key is violated, however briefly.
      • If we delete StreamPartition after we write NewPartitions, we allow a more complex and rarer race. After we write the NewPartitions but before we delete StreamPartition, the following series of events takes place. DNP (DetectNewPartitions) processes the NewPartitions. The NewPartitions start streaming and then immediately merge back into the same partition as the one we're about to delete. The new partition tries to lock the StreamPartition row, but locking fails because the lock is still held by the RCSP whose StreamPartition row is being deleted.

        For example: the RCSP with id 1234 is working on partition AC. The metadata table has the row StreamPartition#AC with uuid=1234. Partition AC splits into AB and BC. We write NewPartition#AB and NewPartition#BC, but we haven't deleted StreamPartition#AC yet. Before we clean up StreamPartition#AC, DNP processes AB and BC, and AB and BC start streaming. AB and BC immediately merge back into AC. DNP processes NewPartition#AC and outputs AC. A new RCSP with id 5678 tries to lock StreamPartition#AC to start processing AC. This fails because RCSP 1234 still holds the lock, so RCSP 5678 stops. RCSP 1234 eventually cleans up StreamPartition#AC, and no RCSP is streaming AC anymore.

      The solution is a two-phase delete:
      1. Release the lock and mark the StreamPartition for deletion.
      2. Write the NewPartition rows.
      3. Delete StreamPartition if and only if the deletion bit is set.
      The deletion bit prevents accidentally deleting a new StreamPartition as the result of the split-and-merge race described above. If that scenario occurs, the new partition is able to lock the StreamPartition row because the lock was already released in step 1. We must not then delete the StreamPartition, so step 3 checks whether the row is still marked for deletion. Because locking the row unsets the deletion bit, a re-locked row cannot be deleted.
      Parameters:
      partition - release the lock for this partition
      uuid - the uuid that must match the current lock holder
      Returns:
      true if releasing the lock was successful.
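The three-step protocol and the race it guards against can be traced with an in-memory model. This is a sketch under stated assumptions: `StreamPartitionTable` and its methods are hypothetical stand-ins for releaseStreamPartitionLockForDeletion, deleteStreamPartitionRow, lockAndRecordPartition, and doHoldLock, and the model is single-threaded, whereas against Bigtable each check-then-write would need to be atomic. The demo replays the 1234/5678 scenario from the example above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of StreamPartition rows: the lock holder's uuid plus the shouldDelete bit.
class StreamPartitionTable {
    static final class Row {
        String lockUuid;      // null when unlocked
        boolean shouldDelete; // the deletion bit
    }

    private final Map<String, Row> rows = new HashMap<>();

    // Succeeds if the row is unlocked or already locked by uuid; locking unsets the deletion bit.
    boolean lock(String partition, String uuid) {
        Row row = rows.computeIfAbsent(partition, p -> new Row());
        if (row.lockUuid != null && !row.lockUuid.equals(uuid)) {
            return false;
        }
        row.lockUuid = uuid;
        row.shouldDelete = false;
        return true;
    }

    // Step 1: if uuid holds the lock, release it and set the deletion bit.
    boolean releaseForDeletion(String partition, String uuid) {
        Row row = rows.get(partition);
        if (row == null || !uuid.equals(row.lockUuid)) {
            return false;
        }
        row.lockUuid = null;
        row.shouldDelete = true;
        return true;
    }

    // Step 3: delete the row only if the deletion bit is still set.
    boolean deleteIfMarked(String partition) {
        Row row = rows.get(partition);
        if (row == null || !row.shouldDelete) {
            return false;
        }
        rows.remove(partition);
        return true;
    }

    boolean holdsLock(String partition, String uuid) {
        Row row = rows.get(partition);
        return row != null && uuid.equals(row.lockUuid);
    }
}

class RaceDemo {
    public static void main(String[] args) {
        StreamPartitionTable table = new StreamPartitionTable();
        table.lock("AC", "1234");               // RCSP 1234 streams AC
        table.releaseForDeletion("AC", "1234"); // AC splits: step 1
        // Step 2 happens here: NewPartition#AB and #BC are written; AB and BC merge back into AC.
        boolean relocked = table.lock("AC", "5678"); // succeeds: the lock was already released
        boolean deleted = table.deleteIfMarked("AC"); // RCSP 1234's step 3 is refused: bit was unset
        System.out.println(relocked + " " + deleted + " " + table.holdsLock("AC", "5678")); // prints "true false true"
    }
}
```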
    • deleteStreamPartitionRow

      public boolean deleteStreamPartitionRow(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition)
      This is the second step of the two-phase delete of StreamPartition. Delete the row represented by the partition, indicating that the partition will no longer be streamed. The row is deleted only if the shouldDelete bit is set.
      Parameters:
      partition - forms the row key of the row to delete
    • doHoldLock

      public boolean doHoldLock(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition, String uuid)
      Return true if the uuid holds the lock on the partition.
      Parameters:
      partition - the partition whose lock to check
      uuid - the uuid to check
      Returns:
      true if uuid holds the lock, otherwise false.
    • lockAndRecordPartition

      public boolean lockAndRecordPartition(PartitionRecord partitionRecord)
      Lock the partition in the metadata table for the DoFn streaming it. Only one DoFn is allowed to stream a specific partition at any time. Each DoFn has a uuid and tries to lock the partition at the very start of the stream. Once a DoFn has locked the partition (i.e. the uuid in the partition's cell belongs to that DoFn), any other DoFn trying to lock the same partition fails. Locking also unsets the deletion bit.
      Parameters:
      partitionRecord - partition to lock
      Returns:
      true if uuid holds or acquired the lock, otherwise false.
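Because several DoFns may try to lock the same partition at the same moment, the check ("is the lock free, or already mine?") and the write of the uuid have to happen as one atomic step; otherwise two DoFns could both see a free lock and both proceed. The sketch below shows that property in memory, under stated assumptions: `PartitionLocks` is hypothetical, and `ConcurrentHashMap.putIfAbsent` stands in for an atomic conditional write against the metadata table (in Bigtable, such check-then-write operations are expressed as conditional mutations, i.e. CheckAndMutateRow). It does not model the deletion bit.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical lock table: partition -> uuid of the DoFn that holds the lock.
class PartitionLocks {
    private final ConcurrentHashMap<String, String> holders = new ConcurrentHashMap<>();

    // Atomic check-and-set: succeeds if nobody held the lock or uuid already holds it.
    boolean tryLock(String partition, String uuid) {
        String previous = holders.putIfAbsent(partition, uuid);
        return previous == null || previous.equals(uuid);
    }

    // Starts `contenders` threads with distinct uuids at the same moment; returns how many won.
    static int raceForLock(PartitionLocks locks, String partition, int contenders) {
        ExecutorService pool = Executors.newFixedThreadPool(contenders);
        CountDownLatch startSignal = new CountDownLatch(1);
        AtomicInteger winners = new AtomicInteger();
        for (int i = 0; i < contenders; i++) {
            String uuid = "uuid-" + i;
            pool.execute(() -> {
                try {
                    startSignal.await(); // release all contenders at once
                    if (locks.tryLock(partition, uuid)) {
                        winners.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        startSignal.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("contenders did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }
}

class LockDemo {
    public static void main(String[] args) {
        PartitionLocks locks = new PartitionLocks();
        System.out.println(PartitionLocks.raceForLock(locks, "AC", 8)); // prints 1
    }
}
```

Re-locking by the current holder returns true, matching the "holds or acquired" wording of the return value.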
    • writeDetectNewPartitionVersion

      public void writeDetectNewPartitionVersion()
      Set the version number for DetectNewPartitions. This value can be checked later to verify that the existing metadata table is compatible with the current Beam connector code.
    • readDetectNewPartitionMissingPartitions

      public HashMap<com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange,Instant> readDetectNewPartitionMissingPartitions()
      Read the missing partitions, and how long they have been missing, from the metadata table and deserialize them.
      Returns:
      the deserialized missing partitions and their durations.
    • writeDetectNewPartitionMissingPartitions

      public void writeDetectNewPartitionMissingPartitions(HashMap<com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange,Instant> missingPartitionDurations)
      Serialize the missing partitions, and how long they have been missing, and write them to the metadata table.
      Parameters:
      missingPartitionDurations - the missing partitions and how long each has been missing.
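A map like this can be stored in one cell by serializing it, and read back by deserializing it. A minimal sketch, assuming Java's built-in object serialization as the format (the connector's actual encoding is not documented here) and a hypothetical `KeyRange` record in place of ByteStringRange. Since the map's values are `Instant`s rather than durations, the sketch also assumes each value is the moment a partition was first seen missing, so the missing duration is computed relative to the current time.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;

// Hypothetical serializable stand-in for ByteStringRange.
record KeyRange(String start, String end) implements Serializable {}

class MissingPartitionsCodec {
    // Serializes the map into bytes that could be stored in a single table cell.
    static byte[] serialize(HashMap<KeyRange, Instant> missing) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(missing);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static HashMap<KeyRange, Instant> deserialize(byte[] cellValue) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(cellValue))) {
            return (HashMap<KeyRange, Instant>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Assumption: each Instant records when the partition was first seen missing.
    static Duration missingFor(Instant firstSeenMissing, Instant now) {
        return Duration.between(firstSeenMissing, now);
    }
}

class MissingPartitionsDemo {
    public static void main(String[] args) {
        HashMap<KeyRange, Instant> missing = new HashMap<>();
        Instant firstSeen = Instant.parse("2024-01-01T00:00:00Z");
        missing.put(new KeyRange("a", "c"), firstSeen);
        HashMap<KeyRange, Instant> restored =
                MissingPartitionsCodec.deserialize(MissingPartitionsCodec.serialize(missing));
        Duration gap = MissingPartitionsCodec.missingFor(firstSeen, Instant.parse("2024-01-01T00:05:00Z"));
        System.out.println(restored.equals(missing) + " " + gap); // prints "true PT5M"
    }
}
```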