apache_beam.io.gcp.bigquery_change_history module
Streaming source for BigQuery change history (APPENDS/CHANGES functions).
This module provides ReadBigQueryChangeHistory, a streaming PTransform
that continuously polls BigQuery APPENDS() or CHANGES() functions and emits
changed rows as an unbounded PCollection.
Status: Experimental: API may change without notice.
Usage:
import apache_beam as beam
from apache_beam.io.gcp.bigquery_change_history import ReadBigQueryChangeHistory
with beam.Pipeline(options=pipeline_options) as p:
changes = (
p
| ReadBigQueryChangeHistory(
table='my-project:my_dataset.my_table',
change_function='APPENDS',
poll_interval_sec=60))
- Architecture:
Poll: Polling SDF emits lightweight _QueryRange instructions. Query: _ExecuteQueryFn runs the BQ query, writes to a temp table. Read: SDF reads temp table via Storage Read API with dynamic splitting. Cleanup: Stateful DoFn tracks stream completion, deletes temp tables.
- class apache_beam.io.gcp.bigquery_change_history.ReadBigQueryChangeHistory(table: str, poll_interval_sec: float = 60, start_time: float | None = None, stop_time: float | None = None, change_function: str = 'APPENDS', buffer_sec: float = 10, project: str | None = None, temp_dataset: str | None = None, location: str | None = None, change_type_column: str = 'change_type', change_timestamp_column: str = 'change_timestamp', columns: List[str] | None = None, row_filter: str | None = None, batch_arrow_read: bool = True, max_split_rounds: int = 1, reshuffle_decompress: bool = True)[source]
Bases:
PTransformStreaming source for BigQuery change history.
Continuously polls BigQuery APPENDS() or CHANGES() functions and emits changed rows as an unbounded PCollection of dicts.
- Parameters:
table – BigQuery table to read changes from. Format: ‘project:dataset.table’ or ‘project.dataset.table’.
poll_interval_sec – Seconds between polls. Default 60.
start_time – Start reading from this timestamp (float, epoch seconds). Default: current time when pipeline starts.
stop_time – Stop polling at this timestamp. Default: run forever.
change_function – ‘CHANGES’ or ‘APPENDS’. Default ‘APPENDS’.
buffer_sec – Safety buffer in seconds behind now(). Default 10. BQ does not fail or wait if the query end_ts is less than BQ’s CURRENT_TIMESTAMP. This is an extra guardrail to protect against silent data.
project – GCP project ID. Default: from pipeline options.
temp_dataset – Dataset for temp tables. If None (default), a per-pipeline dataset is auto-created with a 24-hour table expiration as a safety net for orphaned tables. Set this to use an existing dataset (e.g. if your service account lacks bigquery.datasets.create permission).
location – BigQuery geographic location for query jobs and temp dataset (e.g. ‘US’, ‘us-central1’). If None (default), inferred from the source table.
change_type_column – Output column name for the _CHANGE_TYPE pseudo-column. Default ‘change_type’. Change this if your source table already has a column named ‘change_type’.
change_timestamp_column – Output column name for the _CHANGE_TIMESTAMP pseudo-column. Default ‘change_timestamp’. Change this if your source table already has a column named ‘change_timestamp’. This column is also used internally to extract event timestamps for watermark tracking.
columns – Optional list of column names to select from the source table. If None (default), all columns are selected. The pseudo-columns (change_type, change_timestamp) are always included regardless of this setting.
row_filter – Optional SQL boolean expression used as a WHERE clause on the CHANGES/APPENDS query. Do not include the WHERE keyword. Example:
'status = "active" AND region = "US"'.batch_arrow_read – If True (default), convert Arrow RecordBatches in bulk using to_pylist() instead of per-cell .as_py() calls. This is 1.5x faster for large tables at the cost of ~2x peak memory per RecordBatch. Set to False for minimal memory usage.
max_split_rounds – Maximum number of recursive SplitReadStream rounds. Each round splits every stream at fraction=0.5, potentially doubling the stream count (if BQ allows). Default 1 (one round of splitting). Set 0 to disable splitting entirely. Set higher for very large tables where more parallelism is needed.
reshuffle_decompress – If True (default), the Read SDF emits raw compressed Arrow batches instead of decoded rows. The batches are reshuffled for fan-out and then decoded in a separate DoFn. This spreads decompression and Arrow-to-Python conversion CPU across more workers. Set to False to decode rows inline within the Read SDF.
- expand(pbegin: PBegin) PCollection[source]