Class FileSystemOffsetRetainer

java.lang.Object
org.apache.beam.io.debezium.FileSystemOffsetRetainer
All Implemented Interfaces:
Serializable, OffsetRetainer

public class FileSystemOffsetRetainer extends Object implements OffsetRetainer
An OffsetRetainer that persists the Debezium connector offset as a JSON file using Beam's FileSystems abstraction.

The path argument can point to any filesystem available through Beam's FileSystems, including local disk, Google Cloud Storage, Amazon S3, and others, provided the matching Beam filesystem module is on the pipeline's classpath.
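
As a rough illustration of how a path string maps to a filesystem, the hypothetical helper below extracts the URI scheme (such as gs or s3) and treats a path without one as a local file. It is a simplified sketch under that assumption, not Beam's own parsing code; Beam keeps its own registry that maps schemes to filesystem implementations.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: illustrates scheme-based filesystem selection only.
class PathScheme {
    // "<scheme>://..." names a filesystem; anything without a scheme is a local path.
    private static final Pattern SCHEME = Pattern.compile("^([a-zA-Z][-a-zA-Z0-9+.]*)://");

    static String schemeOf(String path) {
        Matcher m = SCHEME.matcher(path);
        return m.find() ? m.group(1).toLowerCase(Locale.ROOT) : "file";
    }
}
```

With this sketch, the two example paths on this page resolve to "gs" and "file" respectively.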

On every task.commit(), the latest offset is serialised to JSON and written to the given path, overwriting the previous file. On pipeline startup the file is read back and the connector resumes from the stored position. If the file does not yet exist, the connector starts from the beginning of the change stream.
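
The load-on-startup, save-on-commit contract can be sketched with a hypothetical in-memory stand-in, which can be handy in unit tests that should not touch a real filesystem. OffsetStore, InMemoryOffsetStore and describeStart are illustrative names, not part of DebeziumIO; OffsetStore merely mirrors the two OffsetRetainer methods documented on this page.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of the two OffsetRetainer methods documented on this page.
interface OffsetStore extends Serializable {
    Map<String, Object> loadOffset();            // null on first run
    void saveOffset(Map<String, Object> offset); // called on every commit
}

// Hypothetical in-memory store: keeps the last committed offset instead of writing a file.
class InMemoryOffsetStore implements OffsetStore {
    private Map<String, Object> stored; // null until the first commit

    @Override
    public Map<String, Object> loadOffset() {
        return stored == null ? null : new HashMap<>(stored);
    }

    @Override
    public void saveOffset(Map<String, Object> offset) {
        stored = new HashMap<>(offset); // replaces the previous offset, like overwriting the file
    }
}

class OffsetLifecycle {
    // Startup decision: a null offset means "start from the beginning of the change stream".
    static String describeStart(OffsetStore store) {
        Map<String, Object> offset = store.loadOffset();
        return offset == null ? "beginning" : "resume from " + offset;
    }
}
```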

Example — resume from GCS:


 DebeziumIO.read()
     .withConnectorConfiguration(config)
     .withOffsetRetainer(
         FileSystemOffsetRetainer.of("gs://my-bucket/debezium/orders-offset.json"))
     .withFormatFunction(myMapper);
 

Example — local filesystem (useful for testing):


 DebeziumIO.read()
     .withConnectorConfiguration(config)
     .withOffsetRetainer(FileSystemOffsetRetainer.of("/tmp/debezium-offset.json"))
     .withFormatFunction(myMapper);
 

Note: writes are performed atomically. The offset is first written to a .tmp sibling file and then renamed to the final path, so a mid-write crash leaves the previous offset intact.
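
For a local path, the temp-file-then-rename pattern can be sketched with java.nio alone. TempThenRenameWriter is a hypothetical helper, not the class's actual code, which goes through Beam's FileSystems instead. ATOMIC_MOVE is honoured only where the underlying filesystem supports it; object stores such as GCS and S3 generally implement rename as a copy followed by a delete, so the guarantee there depends on the Beam filesystem in use.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical local-disk sketch of "write to .tmp sibling, then rename over the target".
class TempThenRenameWriter {
    static void write(Path target, String json) {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try {
            Path parent = target.toAbsolutePath().getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            // Step 1: the previous offset file is untouched while the new content is written.
            Files.write(tmp, json.getBytes(StandardCharsets.UTF_8));
            try {
                // Step 2: replace the old file in a single step where the filesystem allows it.
                Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                // Fallback: a plain replace, which may not be atomic on this filesystem.
                Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String read(Path target) {
        try {
            return new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```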

  • Method Details

    • of

      public static FileSystemOffsetRetainer of(String path)
      Creates a new FileSystemOffsetRetainer that stores the offset at path.
    • loadOffset

      public @Nullable Map<String,Object> loadOffset()
      Reads the offset JSON file and returns its contents, or null if the file does not yet exist (first run). Throws RuntimeException if the file exists but cannot be read, to prevent silently reprocessing data from the beginning.
      Specified by:
      loadOffset in interface OffsetRetainer
    • saveOffset

      public void saveOffset(Map<String,Object> offset)
      Serialises offset to JSON and writes it atomically to the configured path.

      If the offset is identical to the last successfully written one, the write is skipped to avoid unnecessary I/O on every checkpoint.

      Otherwise, the offset is first written to a .tmp sibling file and then renamed to the final path, so a mid-write crash leaves the previous offset intact.

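      Errors are logged as warnings and swallowed so the pipeline keeps running. A failed write leaves the previously stored offset in place, so after a restart the connector resumes from that older position (or from the beginning if no offset was ever stored) and may emit some records again.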

      Specified by:
      saveOffset in interface OffsetRetainer
      Parameters:
      offset - The current connector offset, as returned by SourceRecord.sourceOffset().
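
The documented rules for loadOffset and saveOffset can be sketched against a local file as below. OffsetFileSketch, parseJson and writer are hypothetical names: the JSON parser and the temp-file-then-rename write are passed in as functions because the JSON library the real class uses is not stated on this page, and java.util.logging stands in for its logger.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Logger;

// Hypothetical sketch of the loadOffset/saveOffset rules described above.
class OffsetFileSketch {
    private static final Logger LOG = Logger.getLogger(OffsetFileSketch.class.getName());

    private final Path path;
    private final Function<String, Map<String, Object>> parseJson; // stands in for a JSON library
    private final Consumer<Map<String, Object>> writer;            // stands in for temp-file-then-rename
    private Map<String, Object> lastWritten;                       // last offset written successfully
    int writes;                                                    // real writes performed, for illustration

    OffsetFileSketch(Path path, Function<String, Map<String, Object>> parseJson,
                     Consumer<Map<String, Object>> writer) {
        this.path = path;
        this.parseJson = parseJson;
        this.writer = writer;
    }

    /** Returns null on first run; any other read failure is rethrown rather than hidden. */
    Map<String, Object> loadOffset() {
        String json;
        try {
            json = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
        } catch (NoSuchFileException e) {
            return null; // no file yet: start from the beginning of the change stream
        } catch (IOException e) {
            throw new RuntimeException("Offset file could not be read: " + path, e);
        }
        return parseJson.apply(json); // in this sketch a parse error also propagates
    }

    /** Skips unchanged offsets; logs and swallows write errors. */
    void saveOffset(Map<String, Object> offset) {
        if (offset.equals(lastWritten)) {
            return; // same offset as the last successful write: skip the I/O
        }
        try {
            writer.accept(offset);
            lastWritten = new HashMap<>(offset); // remember only offsets that reached storage
            writes++;
        } catch (RuntimeException e) {
            // The pipeline keeps running; the stored offset stays at its previous value.
            LOG.warning("Failed to save offset " + offset + ": " + e);
        }
    }
}
```

Because lastWritten changes only after a successful write, a failed write is retried on the next commit rather than being skipped as a duplicate.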