apache_beam.io.gcp.bigquery_change_history module

Streaming source for BigQuery change history (APPENDS/CHANGES functions).

This module provides ReadBigQueryChangeHistory, a streaming PTransform that continuously polls BigQuery APPENDS() or CHANGES() functions and emits changed rows as an unbounded PCollection.

Status: Experimental. The API may change without notice.

Usage:

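import apache_beam as beam
from apache_beam.io.gcp.bigquery_change_history import ReadBigQueryChangeHistory
from apache_beam.options.pipeline_options import PipelineOptions

# Assumption: the source is unbounded, so the pipeline runs in streaming mode.
pipeline_options = PipelineOptions(streaming=True)

with beam.Pipeline(options=pipeline_options) as p:
    changes = (
        p
        | ReadBigQueryChangeHistory(
            table='my-project:my_dataset.my_table',
            change_function='APPENDS',
            poll_interval_sec=60))

Architecture: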

  • Poll: Polling SDF emits lightweight _QueryRange instructions.

  • Query: _ExecuteQueryFn runs the BQ query, writes to a temp table.

  • Read: SDF reads temp table via Storage Read API with dynamic splitting.

  • Cleanup: Stateful DoFn tracks stream completion, deletes temp tables.
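As a rough illustration of the Poll stage, the sketch below shows how a poller could turn wall-clock poll times into query ranges that trail now() by buffer_sec. It is not the module's implementation: QueryRange, poll_ranges and poll_times are hypothetical names, and the assumption that each range starts where the previous one ended is ours, not something the documentation states.

```python
from dataclasses import dataclass
from typing import Iterator, Optional, Sequence


@dataclass(frozen=True)
class QueryRange:
    """Hypothetical stand-in for the module's internal _QueryRange."""
    start_ts: float  # range start, epoch seconds
    end_ts: float    # range end, epoch seconds


def poll_ranges(start_time: float,
                poll_times: Sequence[float],
                buffer_sec: float = 10,
                stop_time: Optional[float] = None) -> Iterator[QueryRange]:
    """Yield one range per poll, ending buffer_sec behind the poll time.

    Assumption: ranges are contiguous, so each one starts where the
    previous one ended.
    """
    cursor = start_time
    for now in poll_times:
        end = now - buffer_sec
        if stop_time is not None:
            end = min(end, stop_time)
        # Skip polls that would produce an empty or negative range.
        if end > cursor:
            yield QueryRange(cursor, end)
            cursor = end
        # Stop polling once the configured stop_time has been reached.
        if stop_time is not None and cursor >= stop_time:
            return


# Three polls, 60 seconds apart, with the default 10-second buffer.
ranges = list(poll_ranges(start_time=0.0, poll_times=[60.0, 120.0, 180.0]))
for r in ranges:
    print(r)
```

Each emitted range is small and cheap to serialize, which is the point of keeping the polling step separate from the query and read steps.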

class apache_beam.io.gcp.bigquery_change_history.ReadBigQueryChangeHistory(table: str, poll_interval_sec: float = 60, start_time: float | None = None, stop_time: float | None = None, change_function: str = 'APPENDS', buffer_sec: float = 10, project: str | None = None, temp_dataset: str | None = None, location: str | None = None, change_type_column: str = 'change_type', change_timestamp_column: str = 'change_timestamp', columns: List[str] | None = None, row_filter: str | None = None, batch_arrow_read: bool = True, max_split_rounds: int = 1, reshuffle_decompress: bool = True)

Bases: PTransform

Streaming source for BigQuery change history.

Continuously polls BigQuery APPENDS() or CHANGES() functions and emits changed rows as an unbounded PCollection of dicts.

Parameters:
  • table – BigQuery table to read changes from. Format: ‘project:dataset.table’ or ‘project.dataset.table’.

  • poll_interval_sec – Seconds between polls. Default 60.

  • start_time – Start reading from this timestamp (float, epoch seconds). Default: current time when pipeline starts.

  • stop_time – Stop polling at this timestamp. Default: run forever.

  • change_function – ‘CHANGES’ or ‘APPENDS’. Default ‘APPENDS’.

  • buffer_sec – Safety buffer in seconds behind now(). Default 10. BQ does not fail or wait if the query end_ts is less than BQ’s CURRENT_TIMESTAMP, so this buffer is an extra guardrail against silent data loss.

  • project – GCP project ID. Default: from pipeline options.

  • temp_dataset – Dataset for temp tables. If None (default), a per-pipeline dataset is auto-created with a 24-hour table expiration as a safety net for orphaned tables. Set this to use an existing dataset (e.g. if your service account lacks bigquery.datasets.create permission).

  • location – BigQuery geographic location for query jobs and temp dataset (e.g. ‘US’, ‘us-central1’). If None (default), inferred from the source table.

  • change_type_column – Output column name for the _CHANGE_TYPE pseudo-column. Default ‘change_type’. Change this if your source table already has a column named ‘change_type’.

  • change_timestamp_column – Output column name for the _CHANGE_TIMESTAMP pseudo-column. Default ‘change_timestamp’. Change this if your source table already has a column named ‘change_timestamp’. This column is also used internally to extract event timestamps for watermark tracking.

  • columns – Optional list of column names to select from the source table. If None (default), all columns are selected. The pseudo-columns (change_type, change_timestamp) are always included regardless of this setting.

  • row_filter – Optional SQL boolean expression used as a WHERE clause on the CHANGES/APPENDS query. Do not include the WHERE keyword. Example: 'status = "active" AND region = "US"'.

  • batch_arrow_read – If True (default), convert Arrow RecordBatches in bulk using to_pylist() instead of per-cell .as_py() calls. This is 1.5x faster for large tables at the cost of ~2x peak memory per RecordBatch. Set to False for minimal memory usage.

  • max_split_rounds – Maximum number of recursive SplitReadStream rounds. Each round splits every stream at fraction=0.5, potentially doubling the stream count (if BQ allows). Default 1 (one round of splitting). Set 0 to disable splitting entirely. Set higher for very large tables where more parallelism is needed.

  • reshuffle_decompress – If True (default), the Read SDF emits raw compressed Arrow batches instead of decoded rows. The batches are reshuffled for fan-out and then decoded in a separate DoFn. This spreads decompression and Arrow-to-Python conversion CPU across more workers. Set to False to decode rows inline within the Read SDF.
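To make the interaction between table, change_function, columns, row_filter and the two output column names more concrete, here is a minimal sketch of how a change-history query could be assembled for one time range. The helper build_change_query is hypothetical and the exact SQL the transform generates may differ. The sketch relies only on documented BigQuery syntax: APPENDS/CHANGES taking a TABLE argument plus start and end timestamps, the _CHANGE_TYPE and _CHANGE_TIMESTAMP pseudo-columns, and TIMESTAMP_MICROS.

```python
from typing import List, Optional


def build_change_query(table: str,
                       start_ts: float,
                       end_ts: float,
                       change_function: str = 'APPENDS',
                       columns: Optional[List[str]] = None,
                       row_filter: Optional[str] = None,
                       change_type_column: str = 'change_type',
                       change_timestamp_column: str = 'change_timestamp') -> str:
    """Hypothetical helper: compose a change-history query for one range."""
    if change_function not in ('APPENDS', 'CHANGES'):
        raise ValueError(f'Unsupported change_function: {change_function!r}')

    # Accept both 'project:dataset.table' and 'project.dataset.table'.
    table_ref = table.replace(':', '.', 1)

    # Pseudo-columns are always selected, under the configured output names.
    selected = ', '.join(columns) if columns else '*'
    select_list = (
        f'{selected}, '
        f'_CHANGE_TYPE AS {change_type_column}, '
        f'_CHANGE_TIMESTAMP AS {change_timestamp_column}')

    # Epoch seconds are converted to integer microseconds.
    start_us = int(round(start_ts * 1_000_000))
    end_us = int(round(end_ts * 1_000_000))

    query = (
        f'SELECT {select_list} '
        f'FROM {change_function}(TABLE `{table_ref}`, '
        f'TIMESTAMP_MICROS({start_us}), TIMESTAMP_MICROS({end_us}))')

    # row_filter is passed without the WHERE keyword, as documented above.
    if row_filter:
        query += f' WHERE ({row_filter})'
    return query


query = build_change_query(
    'my-project:my_dataset.my_table',
    start_ts=0.0,
    end_ts=50.0,
    columns=['id', 'status'],
    row_filter='status = "active"')
print(query)
```

Renaming change_type_column or change_timestamp_column only changes the aliases in the SELECT list, which is why those parameters are enough to avoid a clash with a source column of the same name.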

expand(pbegin: PBegin) → PCollection
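As a quick worked example for the max_split_rounds parameter described above: if every round splits every stream at fraction=0.5, the stream count can at most double per round. The helper below is a hypothetical back-of-the-envelope calculation, not part of the module, and it gives an upper bound only, since BigQuery may decline some splits.

```python
def max_stream_count(initial_streams: int, max_split_rounds: int = 1) -> int:
    """Upper bound on read streams after the given number of split rounds."""
    if initial_streams < 0 or max_split_rounds < 0:
        raise ValueError('initial_streams and max_split_rounds must be >= 0')
    # Each round can at most double the number of streams.
    return initial_streams * 2 ** max_split_rounds


# With 4 initial streams: no splitting, the default, and three rounds.
for rounds in (0, 1, 3):
    print(rounds, max_stream_count(4, rounds))
```

The bound grows exponentially with the number of rounds, so small values of max_split_rounds are usually enough to add meaningful parallelism.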