Class PartitionReconciler
Example of race condition:
- Bigtable: decides to merge A-B and B-C into A-C.
- Beam A-B: receives a CloseStream to merge into partition A-C. It creates an entry in the metadata table and terminates the stream.
- Beam B-C: is not currently streaming because it just checkpointed and hasn't restarted yet.
- Bigtable: decides the merge wasn't a good idea after all and splits A-C back into A-B and B-C.
- Beam B-C: restarts now, but it never receives the first CloseStream merge message, and it never will. CloseStream messages are not queued, and when Beam requests the change stream for B-C, Bigtable recognizes that B-C exists, so it is happy to start the stream.
- Beam A-B: is no longer streaming. It sits in the metadata table waiting for B-C to merge into A-C.
To reconcile this, we identify partitions that haven't been streamed for at least 5 minutes. A gap that long most likely indicates a race between CloseStream merge messages.
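As a rough illustration of the 5-minute rule, the sketch below flags partitions whose last streaming activity is at least 5 minutes old. It is not the Beam implementation: the class name `StalePartitionFinder`, the method `findStale`, the use of string labels such as "A-B" for partitions, and the use of `java.time` types are all assumptions made for this example.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of PartitionReconciler.
final class StalePartitionFinder {
  // Threshold from the description above: not streamed for at least 5 minutes.
  static final Duration THRESHOLD = Duration.ofMinutes(5);

  // lastStreamed maps a partition label (e.g. "A-B") to the last time it was seen streaming.
  // Returns the labels, in sorted order, that have been idle for at least THRESHOLD.
  static List<String> findStale(Map<String, Instant> lastStreamed, Instant now) {
    List<String> stale = new ArrayList<>();
    for (Map.Entry<String, Instant> entry : new TreeMap<>(lastStreamed).entrySet()) {
      Duration idle = Duration.between(entry.getValue(), now);
      if (idle.compareTo(THRESHOLD) >= 0) {
        stale.add(entry.getKey());
      }
    }
    return stale;
  }
}
```

In the race described above, A-B would eventually show up in the result because it stopped streaming when it received the CloseStream, while B-C keeps streaming and never appears.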
Constructor Summary
Constructors
- PartitionReconciler(MetadataTableDao metadataTableDao, ChangeStreamMetrics metrics)
Method Summary
Modifier and Type, Method, Description
- void addIncompleteNewPartitions(NewPartition newPartition): Capture NewPartition row that cannot merge on its own.
- void addMissingPartitions(List<com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange> missingPartitions): Capture partitions that are not currently being streamed.
- getPartitionsToReconcile(Instant lowWatermark, Instant startTime): For missing partitions, try to organize the mismatched parent tokens in a way to fill the missing partitions.
Constructor Details
PartitionReconciler
Method Details
addMissingPartitions
public void addMissingPartitions(List<com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange> missingPartitions)
Capture partitions that are not currently being streamed. This should be the result of observing the metadata table to identify missing StreamPartition rows. All the StreamPartition rows combined should form continuous, non-overlapping partitions covering all row keys.
Combine existing missing partitions and current (newly added) missing partitions. If a partition has been missing for more than the allotted time, it will be reconciled.
It is possible that a missing partition's boundary changes frequently, such that it can take a long time to realize a partition is truly missing. For example, if [C, D) is missing but there are a lot of splits and merges around [C, D), we may sometimes see that [B, D) is missing and at other times that [C, E) is missing, due to split and merge activity of [B, C) and [D, E), while [C, D) is truly missing. The moving boundaries reset the timer, leading to slower reconciliation of the missing partition.
Parameters:
missingPartitions - partitions not being streamed.
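The timer-reset behavior described for addMissingPartitions can be sketched as follows. This is a minimal illustration, assuming the timer is keyed by the exact range boundaries; the class `MissingPartitionTracker`, its methods, the string range labels standing in for ByteStringRange, and the `java.time` types are hypothetical and do not reflect the actual implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical tracker for illustration only; not the Beam implementation.
final class MissingPartitionTracker {
  // Range label (e.g. "[C, D)") -> time that exact range was first observed missing.
  private final Map<String, Instant> firstSeenMissing = new HashMap<>();

  // Combine previously recorded missing ranges with the currently observed ones.
  // A range keeps its original timestamp only if its boundaries are unchanged;
  // a range with new boundaries starts a fresh timer.
  void addMissing(List<String> currentlyMissing, Instant now) {
    Map<String, Instant> next = new HashMap<>();
    for (String range : currentlyMissing) {
      Instant previous = firstSeenMissing.get(range);
      next.put(range, previous != null ? previous : now);
    }
    firstSeenMissing.clear();
    firstSeenMissing.putAll(next);
  }

  // Ranges that have been missing for more than the allotted time, in sorted order.
  List<String> missingLongerThan(Duration allotted, Instant now) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, Instant> entry : firstSeenMissing.entrySet()) {
      if (Duration.between(entry.getValue(), now).compareTo(allotted) > 0) {
        result.add(entry.getKey());
      }
    }
    Collections.sort(result);
    return result;
  }
}
```

With this keying, observing [C, D), then [B, D), then [C, D) again restarts the clock for [C, D) at the third observation, which is the slow-reconciliation effect the text warns about.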
addIncompleteNewPartitions
Capture NewPartition row that cannot merge on its own. If any of these NewPartition rows overlaps a partition that we notice is missing and needs to be reconciled, we will need to clean up these NewPartition rows to avoid future conflicts and inconsistencies.
Parameters:
newPartition - new partition waiting to be created.
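The overlap test mentioned above could look like the sketch below for half-open ranges. It is an assumption-laden illustration: the class `RangeOverlap` and method `overlaps` are hypothetical, keys are modeled as strings rather than byte strings, and an empty end key is treated as unbounded.

```java
// Hypothetical helper for illustration only; not part of PartitionReconciler.
final class RangeOverlap {
  // Returns true when half-open ranges [start1, end1) and [start2, end2) share any key.
  // Assumption: an empty end key means the range is unbounded on the right.
  // An empty start key needs no special case because "" sorts before every other key.
  static boolean overlaps(String start1, String end1, String start2, String end2) {
    boolean firstStartsBeforeSecondEnds = end2.isEmpty() || start1.compareTo(end2) < 0;
    boolean secondStartsBeforeFirstEnds = end1.isEmpty() || start2.compareTo(end1) < 0;
    return firstStartsBeforeSecondEnds && secondStartsBeforeFirstEnds;
  }
}
```

Adjacent ranges such as [A, B) and [B, C) do not overlap under this definition, so a NewPartition that merely borders a missing partition would not be flagged for cleanup.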
getPartitionsToReconcile
For missing partitions, try to organize the mismatched parent tokens in a way to fill the missing partitions.
Must call addMissingPartitions(List) before this.
If there are parent tokens that, when combined, form a missing partition, it can be output as a merge of the missing partition.
If there are no parent tokens for a missing partition, it will need to be reconciled with an adjusted low watermark. This is a catch-all solution. We don't expect to ever get into this situation. Missing partitions should all be mismatched merges that can be reconciled by organizing them correctly.
Parameters:
lowWatermark - watermark that all reconciled partitions should have
startTime - to help compute optimal reconcile point
Returns:
reconciled PartitionRecord.
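One way to picture "combining parent tokens to form a missing partition" is to chain contiguous parent ranges from the start of the missing range to its end. The sketch below does that under simplifying assumptions: parents are modeled as two-element string arrays {start, end} rather than tokens, parents do not overlap, and the missing range is bounded and non-empty. The class `ParentChain` and method `fill` are hypothetical names, and this is not the algorithm used by getPartitionsToReconcile.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not the Beam implementation.
final class ParentChain {
  // Returns the parents that chain contiguously to cover [missingStart, missingEnd) exactly,
  // in key order, or an empty list when no such chain exists.
  static List<String[]> fill(String missingStart, String missingEnd, List<String[]> parents) {
    // Index parents by start key (assumes non-overlapping parents with unique starts).
    Map<String, String[]> byStart = new HashMap<>();
    for (String[] parent : parents) {
      byStart.put(parent[0], parent);
    }
    List<String[]> chain = new ArrayList<>();
    String cursor = missingStart;
    while (!cursor.equals(missingEnd)) {
      // Each parent is consumed at most once, so the loop always terminates.
      String[] next = byStart.remove(cursor);
      if (next == null) {
        return new ArrayList<>(); // gap or overshoot: the parents cannot fill the range
      }
      chain.add(next);
      cursor = next[1];
    }
    return chain;
  }
}
```

An empty result corresponds to the catch-all case in the text, where no parent tokens fit and the missing partition would instead be reconciled with an adjusted low watermark.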