@Internal public class MetadataTableDao extends java.lang.Object
The metadata table is shared across many Beam jobs. Each Beam job uses a specific prefix to identify itself, which is used as the row prefix.
Constructor and Description |
---|
MetadataTableDao(com.google.cloud.bigtable.data.v2.BigtableDataClient dataClient, java.lang.String tableId, com.google.protobuf.ByteString changeStreamNamePrefix) |
Modifier and Type | Method and Description |
---|---|
com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange | convertNewPartitionRowKeyToPartition(com.google.protobuf.ByteString rowKey) Convert a new partition row key to a partition, to process metadata read from Bigtable. |
com.google.protobuf.ByteString | convertPartitionToNewPartitionRowKey(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition) Convert a partition to a NewPartition row key, to query for partitions ready to be streamed as the result of splits and merges. |
com.google.protobuf.ByteString | convertPartitionToStreamPartitionRowKey(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition) Convert a partition to a StreamPartition row key, to query for metadata of partitions that are currently being streamed. |
com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange | convertStreamPartitionRowKeyToPartition(com.google.protobuf.ByteString rowKey) Convert a stream partition row key to a partition, to process metadata read from Bigtable. |
boolean | deleteNewPartition(NewPartition newPartition) This is the 2nd step of the 2-phase delete. |
boolean | deleteStreamPartitionRow(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition) This is the 2nd step of the 2-phase delete of StreamPartition. |
boolean | doHoldLock(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition, java.lang.String uuid) Return true if the uuid holds the lock of the partition. |
com.google.protobuf.ByteString | getChangeStreamNamePrefix() |
boolean | lockAndRecordPartition(PartitionRecord partitionRecord) Lock the partition in the metadata table for the DoFn streaming it. |
void | markNewPartitionForDeletion(NewPartition newPartition) This is the 1st step of the 2-phase delete. |
java.util.List<PartitionRecord> | readAllStreamPartitions() Read all the StreamPartition rows and output PartitionRecords to stream them. |
java.util.HashMap<com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange,Instant> | readDetectNewPartitionMissingPartitions() Read and deserialize missing partitions, and how long they have been missing, from the metadata table. |
DetectNewPartitionsState | readDetectNewPartitionsState() Read the low watermark of the pipeline from the Detect New Partition row. |
java.util.List<NewPartition> | readNewPartitions() |
java.util.List<NewPartition> | readNewPartitionsIncludingDeleted() |
java.util.List<StreamPartitionWithWatermark> | readStreamPartitionsWithWatermark() Return the list of locked StreamPartition rows and their watermarks. |
boolean | releaseStreamPartitionLockForDeletion(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition, java.lang.String uuid) This is the 1st step of the 2-phase delete of StreamPartition. |
void | updateDetectNewPartitionWatermark(Instant watermark) Update the watermark cell for the Detect New Partition step. |
void | updateWatermark(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition, Instant watermark, com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken currentToken) Update the metadata for the row key represented by the partition. |
void | writeDetectNewPartitionMissingPartitions(java.util.HashMap<com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange,Instant> missingPartitionDurations) Write the serialized missing partitions, and how long they have been missing, to the metadata table. |
void | writeDetectNewPartitionVersion() Set the version number for DetectNewPartition. |
void | writeNewPartition(NewPartition newPartition) After a split or merge from a close stream, write the new partition's information to the metadata table. |
public MetadataTableDao(com.google.cloud.bigtable.data.v2.BigtableDataClient dataClient, java.lang.String tableId, com.google.protobuf.ByteString changeStreamNamePrefix)
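A minimal construction sketch, assuming the google-cloud-bigtable client library is on the classpath. The project, instance, table, and prefix names are placeholders; in a running pipeline the connector constructs this DAO internally (the class is @Internal).

```java
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.protobuf.ByteString;
import java.io.IOException;

public class MetadataTableDaoConstructionSketch {
  public static void main(String[] args) throws IOException {
    try (BigtableDataClient dataClient =
        BigtableDataClient.create("my-project", "my-instance")) {
      MetadataTableDao dao =
          new MetadataTableDao(
              dataClient,
              "my-metadata-table",
              // The prefix identifies this job's rows in the shared metadata table.
              ByteString.copyFromUtf8("my-change-stream-job#"));
      System.out.println(dao.getChangeStreamNamePrefix().toStringUtf8());
    }
  }
}
```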
public com.google.protobuf.ByteString getChangeStreamNamePrefix()
public com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange convertStreamPartitionRowKeyToPartition(com.google.protobuf.ByteString rowKey) throws com.google.protobuf.InvalidProtocolBufferException

Convert a stream partition row key to a partition, to process metadata read from Bigtable. The rowKey should come directly from Cloud Bigtable and must not be altered in any way.

Parameters:
rowKey - row key from Cloud Bigtable
Throws:
com.google.protobuf.InvalidProtocolBufferException - if the conversion from rowKey to partition fails

public com.google.protobuf.ByteString convertPartitionToStreamPartitionRowKey(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition)

Convert a partition to a StreamPartition row key, to query for metadata of partitions that are currently being streamed.
Parameters:
partition - the partition to convert to a row key

public com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange convertNewPartitionRowKeyToPartition(com.google.protobuf.ByteString rowKey) throws com.google.protobuf.InvalidProtocolBufferException

Convert a new partition row key to a partition, to process metadata read from Bigtable. The rowKey should come directly from Cloud Bigtable and must not be altered in any way.

Parameters:
rowKey - row key from Cloud Bigtable
Throws:
com.google.protobuf.InvalidProtocolBufferException - if the conversion from rowKey to partition fails

public com.google.protobuf.ByteString convertPartitionToNewPartitionRowKey(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition)

Convert a partition to a NewPartition row key, to query for partitions ready to be streamed as the result of splits and merges.

Parameters:
partition - the partition to convert to a row key
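A sketch of the row-key round trips, assuming an already-constructed `dao`. `ByteStringRange.create` is the factory from the Bigtable client's Range class; the partition bounds are placeholders.

```java
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

public class RowKeyConversionSketch {
  static void roundTrip(MetadataTableDao dao) throws InvalidProtocolBufferException {
    // A partition covering the row-key range ["a", "b").
    ByteStringRange partition = ByteStringRange.create("a", "b");

    // Partition -> StreamPartition row key (partitions currently being streamed).
    ByteString streamRowKey = dao.convertPartitionToStreamPartitionRowKey(partition);
    // Row key, unaltered as read back from Bigtable -> partition.
    ByteStringRange fromStream = dao.convertStreamPartitionRowKeyToPartition(streamRowKey);

    // Partition -> NewPartition row key (partitions pending after splits/merges).
    ByteString newRowKey = dao.convertPartitionToNewPartitionRowKey(partition);
    ByteStringRange fromNew = dao.convertNewPartitionRowKeyToPartition(newRowKey);
  }
}
```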
@Nullable public DetectNewPartitionsState readDetectNewPartitionsState()

Read the low watermark of the pipeline from the Detect New Partition row.

public java.util.List<NewPartition> readNewPartitionsIncludingDeleted() throws com.google.protobuf.InvalidProtocolBufferException

Throws:
com.google.protobuf.InvalidProtocolBufferException

public java.util.List<NewPartition> readNewPartitions() throws com.google.protobuf.InvalidProtocolBufferException

Throws:
com.google.protobuf.InvalidProtocolBufferException
public void writeNewPartition(NewPartition newPartition)

After a split or merge from a close stream, write the new partition's information to the metadata table.

Parameters:
newPartition - the new partition

public void markNewPartitionForDeletion(NewPartition newPartition)

This is the 1st step of the 2-phase delete.
The reason behind the 2-phase delete of NewPartition is to provide the invariant that the metadata table always holds a copy of the ChangeStreamContinuationToken for every partition. The alternatives would be to delete the NewPartition row either before or after outputting the PartitionRecord to RCSP.

The former has the problem that if the output fails after the NewPartition row has been deleted, the token is lost.

The latter has a more complex problem: a race between cleaning up NewPartition and RCSP writing StreamPartition. If the cleanup happens before StreamPartition is written, the token is missing at that moment. Under normal operation this recovers, because RCSP eventually writes StreamPartition, but we want to guarantee that there is always a copy of the token for every partition in the metadata table. (See the sketch after the deleteNewPartition entry below.)

Parameters:
newPartition - the NewPartition to mark for deletion

public boolean deleteNewPartition(NewPartition newPartition)

This is the 2nd step of the 2-phase delete. It's possible to try to delete multiple parent partition cells while only a subset are marked for deletion; only the cells marked for deletion are deleted.

Parameters:
newPartition - the row that represents the new partition
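A sketch of the two phases around the RCSP output, assuming `dao` and `newPartition` are given; reading the boolean return as a success indicator is an assumption.

```java
public class NewPartitionTwoPhaseDeleteSketch {
  static void process(MetadataTableDao dao, NewPartition newPartition) {
    // Phase 1: mark the NewPartition cells for deletion. The continuation
    // tokens remain in the table, preserving the invariant described above.
    dao.markNewPartitionForDeletion(newPartition);

    // ... output the PartitionRecord to RCSP; RCSP writes StreamPartition ...

    // Phase 2: delete only the cells that were marked for deletion.
    boolean deleted = dao.deleteNewPartition(newPartition);
    if (!deleted) {
      // Assumption: a false return means not all marked cells could be
      // deleted; the remaining cells still carry their tokens.
      System.err.println("NewPartition row not fully deleted; tokens retained");
    }
  }
}
```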
public java.util.List<StreamPartitionWithWatermark> readStreamPartitionsWithWatermark() throws com.google.protobuf.InvalidProtocolBufferException

Return the list of locked StreamPartition rows and their watermarks.

Throws:
com.google.protobuf.InvalidProtocolBufferException

public java.util.List<PartitionRecord> readAllStreamPartitions() throws com.google.protobuf.InvalidProtocolBufferException

Read all the StreamPartition rows and output PartitionRecords to stream them.

Throws:
com.google.protobuf.InvalidProtocolBufferException
public void updateDetectNewPartitionWatermark(Instant watermark)

Update the watermark cell for the Detect New Partition step.

Parameters:
watermark - watermark value to set for the cell

public void updateWatermark(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition, Instant watermark, @Nullable com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken currentToken)

Update the metadata for the row key represented by the partition.

Parameters:
partition - forms the row key of the row to update
watermark - watermark value to set for the cell
currentToken - continuation token to set for the cell
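A sketch of a watermark update for a streamed partition. The Instant type is unqualified in the signatures above; org.joda.time.Instant (which Beam uses elsewhere) is assumed here, as is the token coming from the change stream.

```java
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import org.joda.time.Instant;

public class WatermarkUpdateSketch {
  static void advance(
      MetadataTableDao dao,
      ByteStringRange partition,
      ChangeStreamContinuationToken token) {
    // Record how far this partition has been streamed. The token parameter
    // is @Nullable, so null is allowed when there is nothing to record.
    dao.updateWatermark(partition, Instant.now(), token);

    // The pipeline-level watermark lives in the Detect New Partition row.
    dao.updateDetectNewPartitionWatermark(Instant.now());
  }
}
```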
public boolean releaseStreamPartitionLockForDeletion(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition, java.lang.String uuid)

This is the 1st step of the 2-phase delete of StreamPartition. The reason to have a 2-phase delete is to avoid the following scenario. The RCSP with id 1234 is working on partition AC, so the metadata table has the row StreamPartition#AC with uuid=1234. Partition AC splits into AB and BC: we write NewPartition#AB and NewPartition#BC, but we haven't deleted StreamPartition#AC yet. Before we clean up StreamPartition#AC, DNP processes AB and BC, and they start streaming. AB and BC then immediately merge back into AC, so DNP processes NewPartition#AC and outputs AC. A new RCSP with id 5678 tries to lock StreamPartition#AC to start processing AC. This fails because RCSP 1234 still holds the lock, so RCSP 5678 stops. RCSP 1234 eventually cleans up StreamPartition#AC, and no RCSP is streaming AC anymore. (See the sketch below.)

Parameters:
partition - release the lock for this partition
uuid - match the uuid
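A sketch of the 2-phase StreamPartition delete, assuming `dao`, the partition, and the owning RCSP's uuid are given; gating the row delete on the lock release succeeding is an assumption about the boolean returns.

```java
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;

public class StreamPartitionTwoPhaseDeleteSketch {
  static void cleanUp(MetadataTableDao dao, ByteStringRange partition, String uuid) {
    // Phase 1: release the lock, but only if this uuid still holds it.
    if (dao.releaseStreamPartitionLockForDeletion(partition, uuid)) {
      // Phase 2: delete the StreamPartition row itself.
      dao.deleteStreamPartitionRow(partition);
    }
  }
}
```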
public boolean deleteStreamPartitionRow(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition)

This is the 2nd step of the 2-phase delete of StreamPartition.

Parameters:
partition - forms the row key of the row to delete
public boolean doHoldLock(com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange partition, java.lang.String uuid)

Return true if the uuid holds the lock of the partition.

Parameters:
partition - partition to check if the lock is held
uuid - the uuid to check for lock ownership
public boolean lockAndRecordPartition(PartitionRecord partitionRecord)

Lock the partition in the metadata table for the DoFn streaming it.

Parameters:
partitionRecord - partition to lock
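A sketch of acquiring and later verifying the lock, assuming `dao`, a populated PartitionRecord, and the partition/uuid it carries (PartitionRecord's accessors are not part of this page, so they are passed separately here); reading a false return as "lock held by another RCSP" is an assumption.

```java
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;

public class PartitionLockSketch {
  static boolean acquireAndVerify(
      MetadataTableDao dao,
      PartitionRecord record,
      ByteStringRange partition,
      String uuid) {
    // Attempt to lock the partition for the DoFn that will stream it.
    if (!dao.lockAndRecordPartition(record)) {
      return false; // assumed: another RCSP already holds the lock
    }
    // Later, before acting on the partition, verify this uuid still holds it.
    return dao.doHoldLock(partition, uuid);
  }
}
```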
public void writeDetectNewPartitionVersion()

Set the version number for DetectNewPartition.

public java.util.HashMap<com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange,Instant> readDetectNewPartitionMissingPartitions()

Read and deserialize missing partitions, and how long they have been missing, from the metadata table.
public void writeDetectNewPartitionMissingPartitions(java.util.HashMap<com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange,Instant> missingPartitionDurations)

Write the serialized missing partitions, and how long they have been missing, to the metadata table.

Parameters:
missingPartitionDurations - missing partitions and how long each has been missing
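A sketch of a read-modify-write of the missing-partition bookkeeping, again assuming org.joda.time.Instant and reading the map value as the time the partition was first seen missing.

```java
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import java.util.HashMap;
import org.joda.time.Instant;

public class MissingPartitionsSketch {
  static void recordMissing(MetadataTableDao dao, ByteStringRange gap) {
    HashMap<ByteStringRange, Instant> missing =
        dao.readDetectNewPartitionMissingPartitions();
    // Keep the earliest observation so the "missing for how long" duration
    // is measured from when the gap first appeared.
    missing.putIfAbsent(gap, Instant.now());
    dao.writeDetectNewPartitionMissingPartitions(missing);
  }
}
```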