Class FileSystemOffsetRetainer
- All Implemented Interfaces:
Serializable, OffsetRetainer
OffsetRetainer that persists the Debezium connector offset as a JSON file using Beam's
FileSystems abstraction.
The path argument can point to any filesystem supported by the active Beam runner,
including local disk, Google Cloud Storage, Amazon S3, and others.
On every task.commit(), the latest offset is serialised to JSON and written to the
given path (overwriting the previous file). On pipeline startup the file is read back and the
connector resumes from the stored position. If the file does not yet exist the connector starts
from the beginning of the change stream.
Example — resume from GCS:
DebeziumIO.read()
    .withConnectorConfiguration(config)
    .withOffsetRetainer(
        FileSystemOffsetRetainer.of("gs://my-bucket/debezium/orders-offset.json"))
    .withFormatFunction(myMapper);
Example — local filesystem (useful for testing):
DebeziumIO.read()
    .withConnectorConfiguration(config)
    .withOffsetRetainer(FileSystemOffsetRetainer.of("/tmp/debezium-offset.json"))
    .withFormatFunction(myMapper);
Note: writes are performed atomically. The offset is first written to a .tmp
sibling file and then renamed to the final path, so a mid-write crash leaves the previous offset
intact.
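The write-then-rename pattern described in the note can be sketched with plain java.nio on a local filesystem. This is only an illustration of the technique, not the class's actual implementation (which goes through Beam's FileSystems abstraction); the class and method names below are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper for illustration; not part of Beam or Debezium. */
final class AtomicOffsetWriteSketch {

  /** Writes json to a ".tmp" sibling of target, then renames it over target. */
  static void writeAtomically(Path target, String json) throws IOException {
    Path tmp = target.resolveSibling(target.getFileName() + ".tmp");

    // Step 1: the full payload goes to the temporary sibling.
    // A crash during this step leaves the existing target file untouched.
    Files.write(tmp, json.getBytes(StandardCharsets.UTF_8));

    // Step 2: swap the temporary file into place with a single rename.
    try {
      Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    } catch (AtomicMoveNotSupportedException e) {
      // Fallback for filesystems without atomic rename (weaker guarantee).
      Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
    }
  }
}
```

Whether the rename is truly atomic depends on the underlying filesystem; object stores in particular may implement a rename as copy plus delete.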
-
Method Summary
Modifier and Type | Method | Description
 | loadOffset() | Reads the offset JSON file and returns its contents, or null if the file does not yet exist (first run).
static FileSystemOffsetRetainer | of(path) | Creates a new FileSystemOffsetRetainer that stores the offset at path.
void | saveOffset(Map<String, Object> offset) | Serialises offset to JSON and writes it atomically to the configured path.
-
Method Details
-
of
Creates a new FileSystemOffsetRetainer that stores the offset at path.
-
loadOffset
Reads the offset JSON file and returns its contents, or null if the file does not yet exist (first run).
Throws RuntimeException if the file exists but cannot be read, to prevent silently reprocessing data from the beginning.
- Specified by:
loadOffset in interface OffsetRetainer
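The distinction between "no file yet" and "file present but unreadable" can be sketched as follows. This is a minimal illustration using java.nio on a local path; it returns the raw JSON text rather than a parsed map, and the class and method names are hypothetical rather than taken from the library.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper for illustration; not part of Beam or Debezium. */
final class LoadOffsetSketch {

  /** Returns the raw JSON text, or null when no offset file exists yet. */
  static String loadRawOffset(Path path) {
    if (!Files.exists(path)) {
      // First run: nothing stored, so the caller starts from the beginning.
      return null;
    }
    try {
      return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
    } catch (IOException e) {
      // Fail loudly: returning null here would look like a first run
      // and silently restart from the beginning of the change stream.
      throw new RuntimeException("Offset file exists but could not be read: " + path, e);
    }
  }
}
```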
-
saveOffset
Serialises offset to JSON and writes it atomically to the configured path.
If the offset is identical to the last successfully written one, the write is skipped to avoid unnecessary I/O on every checkpoint.
Otherwise the data is first written to a .tmp sibling file and then renamed to the final path, so a mid-write crash leaves the previous offset intact.
Errors are logged as warnings and swallowed so the pipeline continues.
- Specified by:
saveOffset in interface OffsetRetainer
- Parameters:
offset - The current connector offset, as returned by SourceRecord.sourceOffset().
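The skip-if-unchanged and swallow-errors behaviour described for saveOffset can be sketched as below. This is an assumption-laden illustration, not the library's code: OffsetSink is a hypothetical stand-in for the actual file write, and the warning goes to System.err instead of a logger.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical wrapper for illustration; not part of Beam or Debezium. */
final class DedupingSaverSketch {

  /** Hypothetical stand-in for the atomic file write. */
  interface OffsetSink {
    void write(Map<String, Object> offset) throws Exception;
  }

  private final OffsetSink sink;
  private Map<String, Object> lastWritten; // null until the first successful write

  DedupingSaverSketch(OffsetSink sink) {
    this.sink = sink;
  }

  /** Returns true only if a write was attempted and succeeded. */
  boolean save(Map<String, Object> offset) {
    if (offset.equals(lastWritten)) {
      return false; // identical to the last successful write: skip the I/O
    }
    try {
      sink.write(offset);
      // Record a defensive copy only after success, so a failed write is retried.
      lastWritten = new HashMap<>(offset);
      return true;
    } catch (Exception e) {
      // Log and continue; the caller is not interrupted by a failed checkpoint.
      System.err.println("WARN: failed to save offset: " + e);
      return false;
    }
  }
}
```

Because the last-written offset is updated only after a successful write, a failed save is attempted again on the next call even when the offset has not changed.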
-