Class MetadataTableDao
The metadata table is shared across many Beam jobs. Each Beam job identifies itself with a specific prefix, which is used as the row key prefix for all of its rows.
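To illustrate how a shared table keeps jobs apart, here is a minimal sketch that models the metadata table as a sorted in-memory map and reads one job's rows with a prefix scan. SharedMetadataTable, the jobA#/jobB# prefixes and the row names are hypothetical; a real job would scan Bigtable through BigtableDataClient rather than a TreeMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the shared metadata table. Row keys are
// kept sorted, as in Bigtable, and every row key starts with its job's prefix.
final class SharedMetadataTable {
  private final TreeMap<String, String> rows = new TreeMap<>();

  public static void main(String[] args) {
    SharedMetadataTable table = new SharedMetadataTable();
    table.put("jobA#StreamPartition#AC", "uuid=1234");
    table.put("jobB#StreamPartition#AC", "uuid=5678");
    table.put("jobA#NewPartition#AB", "token");
    // Only jobA's two rows are returned; jobB's row is never touched.
    System.out.println(table.rowKeysForJob("jobA#"));
  }

  void put(String rowKey, String value) {
    rows.put(rowKey, value);
  }

  // Prefix scan: keys sharing a prefix are contiguous in sorted order, so start
  // at the prefix itself and stop at the first key that no longer matches.
  List<String> rowKeysForJob(String jobPrefix) {
    List<String> result = new ArrayList<>();
    for (String key : rows.tailMap(jobPrefix, true).keySet()) {
      if (!key.startsWith(jobPrefix)) {
        break;
      }
      result.add(key);
    }
    return result;
  }
}
```

Because row keys that share a prefix are contiguous in sorted order, a job can read only its own rows with a single range scan instead of filtering the whole table.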
-
Constructor Summary
Constructor and Description
- MetadataTableDao(com.google.cloud.bigtable.data.v2.BigtableDataClient dataClient, String tableId, ByteString changeStreamNamePrefix)
Method Summary
Each entry lists the return type (where given), the method signature, and a description. ByteStringRange abbreviates com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange, and ChangeStreamContinuationToken abbreviates com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken.
- ByteStringRange convertNewPartitionRowKeyToPartition(ByteString rowKey)
  Convert a New Partition row key to a partition, to process metadata read from Bigtable.
- ByteString convertPartitionToNewPartitionRowKey(ByteStringRange partition)
  Convert a partition to a New Partition row key, to query for partitions that are ready to be streamed as the result of splits and merges.
- ByteString convertPartitionToStreamPartitionRowKey(ByteStringRange partition)
  Convert a partition to a Stream Partition row key, to query for metadata of partitions that are currently being streamed.
- ByteStringRange convertStreamPartitionRowKeyToPartition(ByteString rowKey)
  Convert a Stream Partition row key to a partition, to process metadata read from Bigtable.
- boolean deleteNewPartition(NewPartition newPartition)
  The 2nd step of the 2-phase delete of NewPartition.
- boolean deleteStreamPartitionRow(ByteStringRange partition)
  The 2nd step of the 2-phase delete of StreamPartition.
- boolean doHoldLock(ByteStringRange partition, String uuid)
  Return true if the uuid holds the lock of the partition.
- getChangeStreamNamePrefix()
  Return the prefix that is prepended to every row belonging to this Beam job.
- boolean lockAndRecordPartition(PartitionRecord partitionRecord)
  Lock the partition in the metadata table for the DoFn streaming it.
- void markNewPartitionForDeletion(NewPartition newPartition)
  The 1st step of the 2-phase delete of NewPartition.
- readAllStreamPartitions()
  Read all the StreamPartition rows and output a PartitionRecord for each, to stream them.
- HashMap<ByteStringRange, Instant> readDetectNewPartitionMissingPartitions()
  Read and deserialize the missing partitions, and how long they have been missing, from the metadata table.
- readDetectNewPartitionsState()
  Read the low watermark of the pipeline from the Detect New Partition row.
- readNewPartitions()
  Return the new partitions resulting from splits and merges that are waiting to be streamed.
- readNewPartitionsIncludingDeleted()
  Return all the new partitions resulting from splits and merges that are waiting to be streamed, including ones marked for deletion.
- List<StreamPartitionWithWatermark> readStreamPartitionsWithWatermark()
  Return the list of locked StreamPartition rows and their watermarks.
- boolean releaseStreamPartitionLockForDeletion(ByteStringRange partition, String uuid)
  The 1st step of the 2-phase delete of StreamPartition.
- void updateDetectNewPartitionWatermark(Instant watermark)
  Update the watermark cell for the Detect New Partition step.
- void updateWatermark(ByteStringRange partition, Instant watermark, ChangeStreamContinuationToken currentToken)
  Update the metadata for the row key represented by the partition.
- void writeDetectNewPartitionMissingPartitions(HashMap<ByteStringRange, Instant> missingPartitionDurations)
  Write the serialized missing partitions, and how long they have been missing, to the metadata table.
- void writeDetectNewPartitionVersion()
  Set the version number for DetectNewPartition.
- void writeNewPartition(NewPartition newPartition)
  After a split or merge reported by a close stream, write the new partition's information to the metadata table.
-
Constructor Details
-
MetadataTableDao
public MetadataTableDao(com.google.cloud.bigtable.data.v2.BigtableDataClient dataClient, String tableId, ByteString changeStreamNamePrefix)
-
Method Details
-
getChangeStreamNamePrefix
- Returns:
- the prefix that is prepended to every row belonging to this Beam job.
-
convertStreamPartitionRowKeyToPartition
public com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange convertStreamPartitionRowKeyToPartition(ByteString rowKey) throws InvalidProtocolBufferException
Convert a Stream Partition row key to a partition, to process metadata read from Bigtable. The rowKey must come directly from Cloud Bigtable and must not be altered in any way.
- Parameters:
rowKey
- row key from Cloud Bigtable
- Returns:
- partition extracted from rowKey
- Throws:
InvalidProtocolBufferException
- if conversion from rowKey to partition fails
-
convertPartitionToStreamPartitionRowKey
public ByteString convertPartitionToStreamPartitionRowKey(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition)
Convert a partition to a Stream Partition row key, to query for metadata of partitions that are currently being streamed.
- Parameters:
partition
- partition to convert to a row key
- Returns:
- row key to insert into Cloud Bigtable.
-
convertNewPartitionRowKeyToPartition
public com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange convertNewPartitionRowKeyToPartition(ByteString rowKey) throws InvalidProtocolBufferException
Convert a New Partition row key to a partition, to process metadata read from Bigtable. The rowKey must come directly from Cloud Bigtable and must not be altered in any way.
- Parameters:
rowKey
- row key from Cloud Bigtable
- Returns:
- partition extracted from rowKey
- Throws:
InvalidProtocolBufferException
- if conversion from rowKey to partition fails
-
convertPartitionToNewPartitionRowKey
public ByteString convertPartitionToNewPartitionRowKey(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition)
Convert a partition to a New Partition row key, to query for partitions that are ready to be streamed as the result of splits and merges.
- Parameters:
partition
- partition to convert to a row key
- Returns:
- row key to insert into Cloud Bigtable.
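The page does not describe the row key layout, only that decoding can fail with InvalidProtocolBufferException (which suggests the partition is protobuf-encoded). The sketch below instead uses a hypothetical string layout (job prefix, then a row-type tag, then the partition) purely to show the shape of the four conversions and their round trip. Partition stands in for ByteStringRange, and IllegalArgumentException stands in for InvalidProtocolBufferException.

```java
import java.util.Objects;

// Assumed layout: <job prefix><row-type tag><length of start>:<start><end>.
final class RowKeyCodec {
  static final String STREAM_PARTITION = "StreamPartition#";
  static final String NEW_PARTITION = "NewPartition#";

  private final String jobPrefix;

  RowKeyCodec(String jobPrefix) {
    this.jobPrefix = jobPrefix;
  }

  public static void main(String[] args) {
    RowKeyCodec codec = new RowKeyCodec("jobA#");
    String key = codec.toStreamPartitionRowKey(new Partition("A", "C"));
    System.out.println(key); // jobA#StreamPartition#1:AC
    System.out.println(codec.fromStreamPartitionRowKey(key).end); // C
  }

  String toStreamPartitionRowKey(Partition p) { return encode(STREAM_PARTITION, p); }
  String toNewPartitionRowKey(Partition p) { return encode(NEW_PARTITION, p); }
  Partition fromStreamPartitionRowKey(String rowKey) { return decode(STREAM_PARTITION, rowKey); }
  Partition fromNewPartitionRowKey(String rowKey) { return decode(NEW_PARTITION, rowKey); }

  // Length-prefixing start keeps the key unambiguous even if start or end contain '#' or ':'.
  private String encode(String tag, Partition p) {
    return jobPrefix + tag + p.start.length() + ":" + p.start + p.end;
  }

  // Expects the row key exactly as read from the table; another job's prefix
  // or another row type is rejected.
  private Partition decode(String tag, String rowKey) {
    String head = jobPrefix + tag;
    if (!rowKey.startsWith(head)) {
      throw new IllegalArgumentException("not a " + tag + " row of this job: " + rowKey);
    }
    String body = rowKey.substring(head.length());
    int colon = body.indexOf(':');
    if (colon < 0) {
      throw new IllegalArgumentException("malformed row key: " + rowKey);
    }
    int startLength = Integer.parseInt(body.substring(0, colon)); // NumberFormatException is an IllegalArgumentException
    String rest = body.substring(colon + 1);
    if (startLength < 0 || startLength > rest.length()) {
      throw new IllegalArgumentException("malformed row key: " + rowKey);
    }
    return new Partition(rest.substring(0, startLength), rest.substring(startLength));
  }
}

// Hypothetical stand-in for ByteStringRange: the row range [start, end).
final class Partition {
  final String start;
  final String end;

  Partition(String start, String end) {
    this.start = start;
    this.end = end;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Partition)) {
      return false;
    }
    Partition other = (Partition) o;
    return start.equals(other.start) && end.equals(other.end);
  }

  @Override
  public int hashCode() {
    return Objects.hash(start, end);
  }
}
```

Using a distinct tag per row type is what lets the same partition have both a StreamPartition row and a NewPartition row without the two colliding.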
-
readDetectNewPartitionsState
Read the low watermark of the pipeline from the Detect New Partition row.
- Returns:
- DetectNewPartitions row from the metadata table.
-
readNewPartitionsIncludingDeleted
- Returns:
- all the new partitions resulting from splits and merges that are waiting to be streamed, including ones marked for deletion.
- Throws:
InvalidProtocolBufferException
-
readNewPartitions
- Returns:
- list of the new partitions resulting from splits and merges that are waiting to be streamed.
- Throws:
InvalidProtocolBufferException
-
writeNewPartition
After a split or merge reported by a close stream, write the new partition's information to the metadata table.
- Parameters:
newPartition
- the new partition
-
markNewPartitionForDeletion
This is the 1st step of the 2-phase delete. Mark each parent partition in NewPartition for deletion. Marking prevents the NewPartition row from being read twice, while keeping it in the table for a while so that RCSP (ReadChangeStreamPartition) can clean up the partitions that led to it.
The reason for a 2-phase delete of NewPartition is to maintain the invariant that the metadata table always holds a copy of the ChangeStreamContinuationToken for every partition. The alternatives are to delete NewPartition either before or after outputting the PartitionRecord to RCSP.
The former has the problem that if the output fails after the NewPartition row has been deleted, the token is lost.
The latter has a more complex problem: a race between cleaning up NewPartition and RCSP writing StreamPartition. If the cleanup happens before StreamPartition is written, the token is missing at that moment. Under normal operation this recovers, because RCSP eventually writes StreamPartition, but we want to guarantee that there is always a copy of a token for every partition in the metadata table.
- Parameters:
newPartition
- mark for deletion.
-
deleteNewPartition
This is the 2nd step of the 2-phase delete. Delete the newPartition cells. This should take place after the tokens have been written to StreamPartition.
It is possible to request deletion of several parent partition cells when only a subset of them is marked for deletion; only the cells marked for deletion are deleted.
- Parameters:
newPartition
- row that represents the new partition.
- Returns:
- true if the entire newPartition was successfully deleted.
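The two NewPartition phases (markNewPartitionForDeletion, then deleteNewPartition) and the two read variants can be sketched with an in-memory model. This is hypothetical: NewPartitionStore keeps one cell per parent partition holding its token plus a deletion mark, partitions are plain strings, and the rule that readNewPartitions hides a row once all of its cells are marked is an assumption about how marked rows are filtered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of NewPartition rows: one cell per parent
// partition, holding that parent's continuation token plus a deletion mark.
final class NewPartitionStore {
  private static final class Cell {
    final String token;
    boolean markedForDeletion;

    Cell(String token) {
      this.token = token;
    }
  }

  // new partition -> (parent partition -> cell)
  private final Map<String, Map<String, Cell>> rows = new TreeMap<>();

  public static void main(String[] args) {
    NewPartitionStore store = new NewPartitionStore();
    // AB and BC merged into AC: each parent's RCSP recorded its token.
    store.writeNewPartition("AC", "AB", "tokenAB");
    store.writeNewPartition("AC", "BC", "tokenBC");
    store.markNewPartitionForDeletion("AC", List.of("AB", "BC")); // phase 1
    System.out.println(store.readNewPartitions()); // []
    System.out.println(store.readNewPartitionsIncludingDeleted()); // [AC]
    System.out.println(store.deleteNewPartition("AC", List.of("AB", "BC"))); // true
  }

  void writeNewPartition(String newPartition, String parent, String token) {
    rows.computeIfAbsent(newPartition, k -> new TreeMap<>()).put(parent, new Cell(token));
  }

  // Phase 1: mark the parents' cells; the tokens stay in the table.
  void markNewPartitionForDeletion(String newPartition, List<String> parents) {
    Map<String, Cell> row = rows.get(newPartition);
    if (row == null) {
      return;
    }
    for (String parent : parents) {
      Cell cell = row.get(parent);
      if (cell != null) {
        cell.markedForDeletion = true;
      }
    }
  }

  // Hides rows whose cells are all marked, so they are not streamed twice.
  List<String> readNewPartitions() {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, Map<String, Cell>> e : rows.entrySet()) {
      if (e.getValue().values().stream().anyMatch(c -> !c.markedForDeletion)) {
        result.add(e.getKey());
      }
    }
    return result;
  }

  List<String> readNewPartitionsIncludingDeleted() {
    return new ArrayList<>(rows.keySet());
  }

  // Phase 2: delete only marked cells; true if every requested cell was deleted.
  boolean deleteNewPartition(String newPartition, List<String> parents) {
    Map<String, Cell> row = rows.get(newPartition);
    if (row == null) {
      return false;
    }
    boolean allDeleted = true;
    for (String parent : parents) {
      Cell cell = row.get(parent);
      if (cell != null && cell.markedForDeletion) {
        row.remove(parent);
      } else {
        allDeleted = false;
      }
    }
    if (row.isEmpty()) {
      rows.remove(newPartition);
    }
    return allDeleted;
  }
}
```

Between the two phases the tokens remain readable through readNewPartitionsIncludingDeleted, which is the invariant the 2-phase delete exists to protect.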
-
readStreamPartitionsWithWatermark
public List<StreamPartitionWithWatermark> readStreamPartitionsWithWatermark() throws InvalidProtocolBufferException
Return the list of locked StreamPartition rows and their watermarks.
- Returns:
- list of partitions currently being streamed that have a watermark.
- Throws:
InvalidProtocolBufferException
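One way these watermarks might be combined is sketched below, assuming the pipeline's low watermark is the minimum over all partitions currently being streamed (the usual meaning of a low watermark; this page does not spell out the aggregation). PartitionWatermark is a hypothetical stand-in for StreamPartitionWithWatermark, and java.time.Instant stands in for the page's Instant.

```java
import java.time.Instant;
import java.util.List;
import java.util.Optional;

final class LowWatermark {
  // Hypothetical stand-in for StreamPartitionWithWatermark.
  static final class PartitionWatermark {
    final String partition;
    final Instant watermark;

    PartitionWatermark(String partition, Instant watermark) {
      this.partition = partition;
      this.watermark = watermark;
    }
  }

  public static void main(String[] args) {
    List<PartitionWatermark> streamed = List.of(
        new PartitionWatermark("AB", Instant.parse("2024-01-01T00:00:10Z")),
        new PartitionWatermark("BC", Instant.parse("2024-01-01T00:00:05Z")));
    System.out.println(compute(streamed).orElse(null)); // 2024-01-01T00:00:05Z
  }

  // Assumption: the low watermark is the minimum watermark over all streamed
  // partitions; empty when no partition is being streamed.
  static Optional<Instant> compute(List<PartitionWatermark> partitions) {
    return partitions.stream().map(p -> p.watermark).min(Instant::compareTo);
  }
}
```

Under this definition a single slow partition holds back the low watermark of the whole pipeline.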
-
readAllStreamPartitions
Read all the StreamPartition rows and output a PartitionRecord for each, to stream them.
- Returns:
- list of PartitionRecord of all StreamPartitions in the metadata table.
- Throws:
InvalidProtocolBufferException
-
updateDetectNewPartitionWatermark
Update the watermark cell for the Detect New Partition step.
- Parameters:
watermark
- watermark value to set for the cell
-
updateWatermark
public void updateWatermark(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition, Instant watermark, @Nullable com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken currentToken)
Update the metadata for the row key represented by the partition.
- Parameters:
partition
- forms the row key of the row to update
watermark
- watermark value to set for the cell
currentToken
- continuation token to set for the cell
-
releaseStreamPartitionLockForDeletion
public boolean releaseStreamPartitionLockForDeletion(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition, String uuid)
This is the 1st step of the 2-phase delete of StreamPartition. If the uuid matches, unset the uuid of the StreamPartition and set the deletion bit. This prepares the StreamPartition to be deleted.
The reasons for a 2-phase delete are:
- If we delete StreamPartition before we write NewPartitions, we cannot satisfy the invariant that a continuation token always exists for every row key, because there would be a gap, however brief.
- If we delete StreamPartition after we write NewPartitions, we allow a more complex and rare race. After we write the NewPartition but before we delete StreamPartition, the following series of events takes place: DNP (DetectNewPartitions) processes the NewPartitions; the NewPartitions start streaming and then immediately merge back into the same partition as the one we are about to delete; the new partition tries to lock the StreamPartition row; locking fails because the lock is still held by the RCSP that we are deleting.
For example: the RCSP with id 1234 is working on partition AC. The metadata table has the row StreamPartition#AC with uuid=1234. Partition AC splits into AB and BC. We write NewPartition#AB and NewPartition#BC, but we haven't deleted StreamPartition#AC yet. Before we clean up StreamPartition#AC, DNP processes AB and BC. AB and BC start streaming, then immediately merge back into AC. DNP processes NewPartition#AC and outputs AC. A new RCSP with id 5678 tries to lock StreamPartition#AC to start processing AC. This fails because RCSP 1234 still holds the lock, so RCSP 5678 stops. RCSP 1234 eventually cleans up StreamPartition#AC, and no RCSP is streaming AC anymore.
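The 2-phase delete avoids both problems by performing the steps in this order:
- Release the lock and mark the StreamPartition for deletion.
- Write the NewPartition rows.
- Delete the StreamPartition if and only if the deletion bit is still set.
In the example above, RCSP 5678 can then acquire the released lock, and because locking unsets the deletion bit, the final delete by RCSP 1234 leaves StreamPartition#AC in place.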
- Parameters:
partition
- release the lock for this partition
uuid
- the lock is released only if it is held by this uuid
- Returns:
- true if releasing the lock was successful.
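The ordering above can be replayed with a small in-memory model of StreamPartition rows. This is a hypothetical sketch: StreamPartitionTable is not the connector's class, and synchronized methods stand in for the single conditional row mutations one would use in Bigtable. The main method walks through the AC example and shows that, with the 2-phase ordering, RCSP 5678 acquires the lock and the final delete leaves the row in place.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of StreamPartition rows. In Bigtable each method
// would be one conditional row mutation; synchronized gives the same
// all-or-nothing behavior here.
final class StreamPartitionTable {
  private static final class Row {
    String uuid;          // lock holder, or null once the lock is released
    boolean shouldDelete; // deletion bit
  }

  private final Map<String, Row> rows = new HashMap<>();

  // Replays the AC -> AB + BC -> AC example with the 2-phase ordering.
  public static void main(String[] args) {
    StreamPartitionTable table = new StreamPartitionTable();
    table.lockAndRecordPartition("AC", "1234");
    table.releaseStreamPartitionLockForDeletion("AC", "1234"); // step 1
    // Step 2 (writing NewPartition#AB and NewPartition#BC) is not modeled.
    // AB and BC merge back into AC before step 3; RCSP 5678 now locks AC.
    System.out.println(table.lockAndRecordPartition("AC", "5678")); // true
    System.out.println(table.deleteStreamPartitionRow("AC")); // false: bit was cleared
    System.out.println(table.doHoldLock("AC", "5678")); // true
  }

  // Acquire (or keep) the lock and clear the deletion bit.
  synchronized boolean lockAndRecordPartition(String partition, String uuid) {
    Row row = rows.computeIfAbsent(partition, k -> new Row());
    if (row.uuid != null && !row.uuid.equals(uuid)) {
      return false;
    }
    row.uuid = uuid;
    row.shouldDelete = false;
    return true;
  }

  // Phase 1: only the lock holder may release; the row itself stays.
  synchronized boolean releaseStreamPartitionLockForDeletion(String partition, String uuid) {
    Row row = rows.get(partition);
    if (row == null || !uuid.equals(row.uuid)) {
      return false;
    }
    row.uuid = null;
    row.shouldDelete = true;
    return true;
  }

  // Phase 2: delete only if the deletion bit is still set.
  synchronized boolean deleteStreamPartitionRow(String partition) {
    Row row = rows.get(partition);
    if (row == null || !row.shouldDelete) {
      return false;
    }
    rows.remove(partition);
    return true;
  }

  synchronized boolean doHoldLock(String partition, String uuid) {
    Row row = rows.get(partition);
    return row != null && uuid.equals(row.uuid);
  }
}
```

Because the lock is released before the NewPartition rows exist, no later RCSP can ever find the row locked by the departing one; the deletion bit then decides whether the departing RCSP's cleanup is still valid.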
-
deleteStreamPartitionRow
public boolean deleteStreamPartitionRow(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition)
This is the 2nd step of the 2-phase delete of StreamPartition. Delete the row key represented by the partition, which indicates that the partition will no longer be streamed. The row is deleted only if the shouldDelete bit is set.
- Parameters:
partition
- forms the row key of the row to delete
-
doHoldLock
public boolean doHoldLock(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition, String uuid)
Return true if the uuid holds the lock of the partition.
- Parameters:
partition
- partition to check whether the lock is held
uuid
- uuid to check whether it holds the lock
- Returns:
- true if uuid holds the lock, otherwise false.
-
lockAndRecordPartition
Lock the partition in the metadata table for the DoFn streaming it. Only one DoFn is allowed to stream a specific partition at any time. Each DoFn has a uuid and tries to lock the partition at the very start of the stream. If another DoFn has already locked the partition (i.e. the uuid in the partition's cell belongs to that DoFn), any other DoFn trying to lock the same partition fails. This also unsets the deletion bit.
- Parameters:
partitionRecord
- partition to lock
- Returns:
- true if uuid holds or acquired the lock, otherwise false.
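The "only one DoFn per partition" rule amounts to an atomic compare-and-set on the partition's uuid cell. The sketch below models it with ConcurrentHashMap.putIfAbsent and races several threads for one partition; PartitionLocks and raceForLock are hypothetical names, and a Bigtable-backed version would need a conditional mutation rather than an in-process map.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical lock table: partition -> uuid of the DoFn holding its lock.
final class PartitionLocks {
  private final ConcurrentHashMap<String, String> owners = new ConcurrentHashMap<>();

  public static void main(String[] args) {
    System.out.println(raceForLock(8)); // 1
  }

  // Atomic compare-and-set: succeeds if the partition was unlocked (lock
  // acquired) or is already held by this uuid.
  boolean lock(String partition, String uuid) {
    String owner = owners.putIfAbsent(partition, uuid);
    return owner == null || owner.equals(uuid);
  }

  boolean doHoldLock(String partition, String uuid) {
    return uuid.equals(owners.get(partition));
  }

  // Starts `contenders` DoFns that all try to lock partition "AC" at once and
  // returns how many of them succeeded.
  static int raceForLock(int contenders) {
    PartitionLocks locks = new PartitionLocks();
    AtomicInteger winners = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < contenders; i++) {
      String uuid = "dofn-" + i;
      pool.execute(() -> {
        if (locks.lock("AC", uuid)) {
          winners.incrementAndGet();
        }
      });
    }
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return winners.get();
  }
}
```

Re-locking with the same uuid succeeds, matching the documented return value (true if the uuid holds or acquired the lock); any other uuid is refused while the lock is held.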
-
writeDetectNewPartitionVersion
public void writeDetectNewPartitionVersion()
Set the version number for DetectNewPartition. This value can be checked later to verify that the existing metadata table is compatible with the current Beam connector code.
-
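A compatibility check against the version written by writeDetectNewPartitionVersion could look like the sketch below. The 4-byte big-endian cell encoding, CURRENT_VERSION and the exact-match rule are all assumptions made for illustration; the page only says that the version can be checked later.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: the version is stored as a 4-byte big-endian int cell.
final class MetadataVersion {
  static final int CURRENT_VERSION = 1; // hypothetical value, not the connector's

  public static void main(String[] args) {
    System.out.println(isCompatible(encode(CURRENT_VERSION))); // true
    System.out.println(isCompatible(encode(CURRENT_VERSION + 1))); // false
  }

  static byte[] encode(int version) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(version).array();
  }

  // A missing or malformed cell, or a different version, counts as incompatible.
  static boolean isCompatible(byte[] storedCell) {
    if (storedCell == null || storedCell.length != Integer.BYTES) {
      return false;
    }
    return ByteBuffer.wrap(storedCell).getInt() == CURRENT_VERSION;
  }
}
```

Treating a missing cell as incompatible is the conservative choice: a table written by code that never recorded a version is not silently reused.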
readDetectNewPartitionMissingPartitions
public HashMap<com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange, Instant> readDetectNewPartitionMissingPartitions()
Read and deserialize the missing partitions, and how long they have been missing, from the metadata table.
- Returns:
- deserialized missing partitions and duration.
-
writeDetectNewPartitionMissingPartitions
public void writeDetectNewPartitionMissingPartitions(HashMap<com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange, Instant> missingPartitionDurations)
Write the serialized missing partitions, and how long they have been missing, to the metadata table.
- Parameters:
missingPartitionDurations
- missing partitions and duration.
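The round trip between writeDetectNewPartitionMissingPartitions and readDetectNewPartitionMissingPartitions can be sketched with standard Java serialization of the whole map into a single cell value. That format is an assumption (the page does not name one); String keys stand in for ByteStringRange, and missingLongerThan is a hypothetical helper that assumes each Instant records when the partition was first noticed missing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MissingPartitionsCodec {
  public static void main(String[] args) {
    Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
    HashMap<String, Instant> missing = new HashMap<>();
    missing.put("AB", t0);
    missing.put("BC", t0.plusSeconds(50));
    HashMap<String, Instant> restored = deserialize(serialize(missing));
    System.out.println(restored.equals(missing)); // true
    System.out.println(missingLongerThan(restored, t0.plusSeconds(60), Duration.ofSeconds(30))); // [AB]
  }

  // Serialize the whole map into one cell value (hypothetical format choice).
  static byte[] serialize(HashMap<String, Instant> missing) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(missing);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray(); // read after close() so all data is flushed
  }

  @SuppressWarnings("unchecked")
  static HashMap<String, Instant> deserialize(byte[] cell) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(cell))) {
      return (HashMap<String, Instant>) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  // Sorted partitions missing for longer than threshold, assuming each value is
  // the time the partition was first noticed missing.
  static List<String> missingLongerThan(Map<String, Instant> missing, Instant now, Duration threshold) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, Instant> e : missing.entrySet()) {
      if (Duration.between(e.getValue(), now).compareTo(threshold) > 0) {
        result.add(e.getKey());
      }
    }
    Collections.sort(result);
    return result;
  }
}
```

Storing the whole map in one cell keeps the write a single-row mutation, at the cost of rewriting the full map whenever one entry changes.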
-