java.lang.Object
org.apache.beam.sdk.io.gcp.bigtable.changestreams.reconciler.PartitionReconciler

@Internal public class PartitionReconciler extends Object
There can be a race when many splits and merges happen to a single partition in quick succession, and some CloseStream merge messages may be missed. This can leave the metadata table in an inconsistent state in which some partitions are not streamed at all. This class reconciles the problem by ensuring that all partitions are streamed: if any partitions are missing for an extended period of time, it fixes them by creating a new PartitionRecord to stream each missing partition.

Example of race condition:

  1. Bigtable: decides to merge A-B and B-C to A-C
  2. Beam A-B: receives CloseStream to merge into partition A-C. Creates an entry in the metadata table and terminates the stream.
  3. Beam B-C: is not currently streaming because it just checkpointed and hasn't restarted yet.
  4. Bigtable: decides the merge wasn't good after all and splits A-C back into A-B and B-C.
  5. Beam B-C: restarts now, but it never receives the first CloseStream merge message, and it never will: CloseStream messages are not queued, and when a Change Stream is requested for B-C, Bigtable recognizes that B-C exists and happily starts the stream.
  6. Beam A-B: doesn't exist; it's in the metadata table, waiting for B-C to merge into A-C.

To reconcile this, we identify partitions that haven't been streamed for at least 5 minutes. Such a partition most likely indicates a race involving CloseStream merge messages.
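The 5-minute rule can be pictured as a timer per missing partition: record when a partition is first seen missing, forget it if it reappears, and flag it once it has been missing for the threshold. The following is a minimal, hypothetical sketch (the class `MissingPartitionTimer` and its methods are illustrative, not Beam's API); it identifies partitions by exact labels and therefore does not handle shifting boundaries.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: flag partitions that have been missing for at least a threshold. */
public class MissingPartitionTimer {
  // 5 minutes, matching the threshold described in the class documentation.
  static final Duration THRESHOLD = Duration.ofMinutes(5);

  // Partition label -> time it was first observed as missing.
  private final Map<String, Instant> firstSeenMissing = new LinkedHashMap<>();

  /** Records the partitions currently missing; forgets partitions that are streamed again. */
  public void observeMissing(List<String> missingNow, Instant now) {
    firstSeenMissing.keySet().retainAll(missingNow);
    for (String partition : missingNow) {
      firstSeenMissing.putIfAbsent(partition, now); // keep the earliest observation
    }
  }

  /** Returns partitions that have been missing for at least THRESHOLD as of {@code now}. */
  public List<String> readyToReconcile(Instant now) {
    List<String> ready = new ArrayList<>();
    for (Map.Entry<String, Instant> e : firstSeenMissing.entrySet()) {
      if (Duration.between(e.getValue(), now).compareTo(THRESHOLD) >= 0) {
        ready.add(e.getKey());
      }
    }
    return ready;
  }
}
```

For example, a partition first seen missing at 00:00 and a second one first seen at 00:03 yield only the first partition when checked at 00:05.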

  • Constructor Details

  • Method Details

    • addMissingPartitions

      public void addMissingPartitions(List<com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange> missingPartitions)
      Captures partitions that are not currently being streamed. This should be the result of observing the metadata table to identify missing StreamPartition rows. All the StreamPartition rows combined should form continuous, non-overlapping partitions covering all row keys.
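      Because the StreamPartition rows should tile the whole key space, missing partitions are the gaps left between the streamed ranges. The following minimal sketch computes those gaps; it assumes plain String keys in place of Bigtable ByteStrings and an explicit key space [spaceStart, spaceEnd), and the names `PartitionGaps`, `KeyRange`, and `missing` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch: find the key ranges that no streamed partition covers. */
public class PartitionGaps {
  /** Half-open key range [start, end); plain String keys stand in for ByteStrings. */
  record KeyRange(String start, String end) {}

  /**
   * Returns the gaps in [spaceStart, spaceEnd) that the streamed ranges leave uncovered.
   * Assumes the streamed ranges do not overlap, as the metadata invariant requires.
   */
  static List<KeyRange> missing(List<KeyRange> streamed, String spaceStart, String spaceEnd) {
    List<KeyRange> sorted = new ArrayList<>(streamed);
    sorted.sort(Comparator.comparing(KeyRange::start));
    List<KeyRange> gaps = new ArrayList<>();
    String cursor = spaceStart; // first key not yet known to be covered
    for (KeyRange r : sorted) {
      if (r.start().compareTo(cursor) > 0) {
        gaps.add(new KeyRange(cursor, r.start())); // hole before this range
      }
      if (r.end().compareTo(cursor) > 0) {
        cursor = r.end();
      }
    }
    if (cursor.compareTo(spaceEnd) < 0) {
      gaps.add(new KeyRange(cursor, spaceEnd)); // hole at the end of the key space
    }
    return gaps;
  }
}
```

      With streamed ranges [A, B) and [C, D) in the key space [A, E), the sketch reports [B, C) and [D, E) as missing.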

      Combines the existing missing partitions with the newly added missing partitions. Partitions that have been missing for longer than the allotted time will be reconciled.

      A missing partition's boundaries can change frequently, so it can take a long time to realize that a partition is truly missing. For example, if [C, D) is missing but there are many splits and merges around it, we may sometimes see [B, D) as missing and at other times [C, E), because of split and merge activity in [B, C) and [D, E), while [C, D) is the partition that is truly missing. The moving boundaries would reset the timer, slowing reconciliation of the missing partition.
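      One way to keep moving boundaries from resetting the timer is to let each newly observed missing range inherit the earliest "first seen missing" time of any previously tracked range it overlaps. This is a hypothetical sketch of that idea, not necessarily how PartitionReconciler combines partitions; `MissingRangeTracker`, `observe`, and `firstSeen` are illustrative names, and keys are plain Strings.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: a newly observed missing range inherits the earliest first-seen time
 * of any previously tracked range it overlaps, so shifting boundaries keep the oldest clock.
 */
public class MissingRangeTracker {
  /** Half-open key range [start, end) with plain String keys. */
  record KeyRange(String start, String end) {
    boolean overlaps(KeyRange o) {
      return start.compareTo(o.end) < 0 && o.start.compareTo(end) < 0;
    }
  }

  // Missing range -> earliest time it, or an overlapping predecessor, was seen missing.
  private Map<KeyRange, Instant> firstSeenMissing = new LinkedHashMap<>();

  /** Replaces the tracked set with {@code missingNow}; ranges no longer missing are dropped. */
  public void observe(List<KeyRange> missingNow, Instant now) {
    Map<KeyRange, Instant> next = new LinkedHashMap<>();
    for (KeyRange r : missingNow) {
      Instant earliest = now;
      for (Map.Entry<KeyRange, Instant> e : firstSeenMissing.entrySet()) {
        if (r.overlaps(e.getKey()) && e.getValue().isBefore(earliest)) {
          earliest = e.getValue(); // inherit the older timestamp
        }
      }
      next.put(r, earliest);
    }
    firstSeenMissing = next;
  }

  /** Returns when {@code r} was first considered missing, or null if it is not tracked. */
  public Instant firstSeen(KeyRange r) {
    return firstSeenMissing.get(r);
  }
}
```

      Observing [B, D), then [C, E), then [C, D) leaves [C, D) with the time of the first observation. The trade-off is that an unrelated range that merely overlaps an old one also inherits its age, which can make reconciliation happen sooner than strictly necessary.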

      Parameters:
      missingPartitions - partitions not being streamed.
    • addIncompleteNewPartitions

      public void addIncompleteNewPartitions(NewPartition newPartition)
      Captures a NewPartition row that cannot merge on its own. If any of these NewPartition rows overlap a partition that we notice is missing and needs to be reconciled, those NewPartition rows must be cleaned up to avoid future conflicts and inconsistencies.
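      Finding the NewPartition rows to clean up amounts to an overlap test between each incomplete new partition and the ranges being reconciled. The following minimal sketch uses hypothetical stand-in types (`KeyRange`, `PendingNewPartition`) rather than Beam's NewPartition class, with plain String keys.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: select incomplete new partitions that conflict with reconciled ranges. */
public class NewPartitionCleanup {
  /** Half-open key range [start, end) with plain String keys. */
  record KeyRange(String start, String end) {
    boolean overlaps(KeyRange o) {
      return start.compareTo(o.end) < 0 && o.start.compareTo(end) < 0;
    }
  }

  /** Hypothetical stand-in for a NewPartition row: an id and the range it wants to create. */
  record PendingNewPartition(String id, KeyRange range) {}

  /** Returns pending entries whose range overlaps any range that is about to be reconciled. */
  static List<PendingNewPartition> toCleanUp(
      List<PendingNewPartition> pending, List<KeyRange> reconciling) {
    List<PendingNewPartition> result = new ArrayList<>();
    for (PendingNewPartition p : pending) {
      for (KeyRange r : reconciling) {
        if (p.range().overlaps(r)) {
          result.add(p);
          break; // one overlap is enough to require cleanup
        }
      }
    }
    return result;
  }
}
```

      In terms of the race example above, a pending new partition [A, C) overlaps the missing partition [A, B), so it would be selected for cleanup, while an unrelated pending partition [D, E) would not.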
      Parameters:
      newPartition - new partition waiting to be created.
    • getPartitionsToReconcile

      public List<PartitionRecord> getPartitionsToReconcile(Instant lowWatermark, Instant startTime)
      For missing partitions, try to organize the mismatched parent tokens in a way to fill the missing partitions.

      addMissingPartitions(List) must be called before this method.

      If there are parent tokens that, when combined, form a missing partition, the missing partition can be output as a merge of those parent tokens.

      If there are no parent tokens for a missing partition, it must be reconciled with an adjusted low watermark. This is a catch-all solution that we don't expect to need: missing partitions should all be mismatched merges that can be reconciled by organizing their parent tokens correctly.
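      The two outcomes can be sketched for a single missing partition: if the candidate parent tokens tile the partition exactly, emit a merge of those parents; otherwise fall back to restarting the partition from the low watermark. This is a hypothetical illustration with stand-in types (`ParentToken`, `Plan`) instead of Beam's PartitionRecord and continuation tokens. Treating a partial cover the same as "no parent tokens" is an assumption of this sketch; the documentation above only describes the no-token case.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of the merge-or-fallback decision for one missing partition. */
public class ReconcileSketch {
  record KeyRange(String start, String end) {}

  /** Hypothetical stand-in for a mismatched parent continuation token: the range it covers. */
  record ParentToken(KeyRange range) {}

  /** Hypothetical stand-in for a PartitionRecord: resume from parents, or restart at a time. */
  record Plan(KeyRange partition, List<ParentToken> parents, Instant startFrom) {}

  static Plan reconcile(KeyRange missing, List<ParentToken> candidates, Instant lowWatermark) {
    // Keep only tokens lying inside the missing partition, ordered by start key.
    List<ParentToken> inside = new ArrayList<>();
    for (ParentToken t : candidates) {
      if (t.range().start().compareTo(missing.start()) >= 0
          && t.range().end().compareTo(missing.end()) <= 0) {
        inside.add(t);
      }
    }
    inside.sort(Comparator.comparing((ParentToken t) -> t.range().start()));

    // The tokens must tile the missing partition with no gaps or overlaps.
    boolean tiles = !inside.isEmpty();
    String cursor = missing.start();
    for (ParentToken t : inside) {
      if (!t.range().start().equals(cursor)) {
        tiles = false;
        break;
      }
      cursor = t.range().end();
    }
    tiles = tiles && cursor.equals(missing.end());

    if (tiles) {
      // Parents combine into the missing partition: output it as a merge of those parents.
      return new Plan(missing, inside, null);
    }
    // Catch-all: no usable parents, so restart the partition from the low watermark.
    return new Plan(missing, List.of(), lowWatermark);
  }
}
```

      With parent tokens for [A, B) and [B, C), the missing partition [A, C) becomes a merge of both tokens, while a missing [D, E) with no matching tokens restarts at the low watermark.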

      Parameters:
      lowWatermark - watermark that all reconciled partitions should have
      startTime - used to help compute the optimal reconcile point
      Returns:
      reconciled PartitionRecords.