apache_beam.io.gcp.bigquery module

BigQuery sources and sinks.

This module implements reading from and writing to BigQuery tables. It relies on several classes exposed by the BigQuery API: TableSchema, TableFieldSchema, TableRow, and TableCell. The default mode is to return table rows read from a BigQuery source as dictionaries. Similarly a Write transform to a BigQuerySink accepts PCollections of dictionaries. This is done for more convenient programming. If desired, the native TableRow objects can be used throughout to represent rows (use an instance of TableRowJsonCoder as a coder argument when creating the sources or sinks respectively).

Also, for programming convenience, instances of TableReference and TableSchema have a string representation that can be used for the corresponding arguments:

  • TableReference can be a PROJECT:DATASET.TABLE or DATASET.TABLE string.

  • TableSchema can be a NAME:TYPE{,NAME:TYPE}* string (e.g. ‘month:STRING,event_count:INTEGER’).

The syntax supported is described here: https://cloud.google.com/bigquery/bq-command-line-tool-quickstart
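As an illustration of the table reference string forms listed above, the following is a minimal plain-Python sketch that splits a PROJECT:DATASET.TABLE or DATASET.TABLE string into its parts. The names TableRef and parse_table_spec are hypothetical and are not part of Beam; Beam's own parsing may accept more forms than this sketch does.

```python
import re
from typing import NamedTuple, Optional


class TableRef(NamedTuple):
    """Plain stand-in for a table reference (hypothetical, not a Beam class)."""
    project: Optional[str]
    dataset: str
    table: str


# Optional "PROJECT:" prefix followed by "DATASET.TABLE".
_TABLE_SPEC = re.compile(
    r'^(?:(?P<project>[^:]+):)?(?P<dataset>[^.:]+)\.(?P<table>[^.:]+)$')


def parse_table_spec(spec: str) -> TableRef:
    """Split a table string into project (may be None), dataset and table."""
    match = _TABLE_SPEC.match(spec)
    if match is None:
        raise ValueError(
            f'Expected PROJECT:DATASET.TABLE or DATASET.TABLE, got {spec!r}')
    return TableRef(match['project'], match['dataset'], match['table'])


full = parse_table_spec('my_project:dataset1.error_table_for_today')
short = parse_table_spec('samples.weather_stations')
```

When the project part is omitted, the sketch leaves it as None so that a caller can fill in a default project.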

BigQuery sources can be used as main inputs or side inputs. A main input (common case) is expected to be massive and will be split into manageable chunks and processed in parallel. Side inputs are expected to be small and will be read completely every time a ParDo DoFn gets executed. In the example below the lambda function implementing the DoFn for the Map transform will get on each call one row of the main table and all rows of the side table. The runner may use some caching techniques to share the side inputs between calls in order to avoid excessive reading:

main_table = pipeline | 'VeryBig' >> beam.io.ReadFromBigQuery(...)
side_table = pipeline | 'NotBig' >> beam.io.ReadFromBigQuery(...)
results = (
    main_table
    | 'ProcessData' >> beam.Map(
        lambda element, side_input: ..., beam.pvalue.AsList(side_table)))

There is no difference in how main and side inputs are read. What makes the side_table a ‘side input’ is the AsList wrapper used when passing the table as a parameter to the Map transform. AsList signals to the execution framework that its input should be made available whole.
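The calling pattern described above can be pictured with a plain-Python analogy. This is only a sketch of the semantics (one main row plus the whole side collection per call), not of how a runner executes a pipeline; the function names and sample rows are made up for illustration.

```python
def map_with_side_input(main_rows, side_rows, fn):
    """Call fn once per main row, passing the complete side collection."""
    # The side input is materialized once and made available whole.
    side_list = list(side_rows)
    return [fn(row, side_list) for row in main_rows]


def attach_station_name(row, stations):
    """Look up the station name for one main row in the side rows."""
    names = {s['station']: s['name'] for s in stations}
    return {**row, 'name': names[row['station']]}


main_rows = [{'station': 'a', 'temp': 10}, {'station': 'b', 'temp': 20}]
side_rows = [{'station': 'a', 'name': 'Alpha'},
             {'station': 'b', 'name': 'Beta'}]

joined = map_with_side_input(main_rows, side_rows, attach_station_name)
```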

Although both are read with the same transform, main and side inputs are implemented differently. Reading a BigQuery table as main input entails exporting the table to a set of GCS files (in AVRO or in JSON format) and then processing those files.

Users may provide a query to read from rather than reading all of a BigQuery table. If specified, the result obtained by executing the query will be used as the data of the input transform:

query_results = pipeline | beam.io.gcp.bigquery.ReadFromBigQuery(
    query='SELECT year, mean_temp FROM samples.weather_stations')

When creating a BigQuery input transform, users should provide either a query or a table. Pipeline construction will fail with a validation error if neither or both are specified.
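The "exactly one of query or table" rule can be sketched as a small check. This is a hypothetical helper written for illustration; it is not the validation code Beam runs, and the error type and message are assumptions.

```python
def check_read_source(table=None, query=None):
    """Return which source was given, or raise if neither or both were."""
    if (table is None) == (query is None):
        raise ValueError('Specify exactly one of table or query.')
    return 'table' if table is not None else 'query'


source_kind = check_read_source(
    query='SELECT year, mean_temp FROM samples.weather_stations')
```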

When reading via ReadFromBigQuery using EXPORT, bytes are returned decoded as bytes. This is due to the fact that ReadFromBigQuery uses Avro exports by default. When reading from BigQuery using apache_beam.io.BigQuerySource, bytes are returned as base64-encoded bytes. To get base64-encoded bytes using ReadFromBigQuery, you can use the flag use_json_exports to export data as JSON, and receive base64-encoded bytes.

ReadAllFromBigQuery

Beam 2.27.0 introduces a new transform called ReadAllFromBigQuery, which allows you to define table and query reads from BigQuery at pipeline runtime:

read_requests = p | beam.Create([
    ReadFromBigQueryRequest(query='SELECT * FROM mydataset.mytable'),
    ReadFromBigQueryRequest(table='myproject.mydataset.mytable')])
results = read_requests | ReadAllFromBigQuery()

A good application for this transform is in streaming pipelines to refresh a side input coming from BigQuery. This would work like so:

side_input = (
    p
    | 'PeriodicImpulse' >> PeriodicImpulse(
        first_timestamp, last_timestamp, interval, True)
    | 'MapToReadRequest' >> beam.Map(
        lambda x: ReadFromBigQueryRequest(table='dataset.table'))
    | beam.io.ReadAllFromBigQuery())
main_input = (
    p
    | 'MpImpulse' >> beam.Create(sample_main_input_elements)
    | 'MapMpToTimestamped' >> beam.Map(
        lambda src: TimestampedValue(src, src))
    | 'WindowMpInto' >> beam.WindowInto(
        window.FixedWindows(main_input_windowing_interval)))
result = (
    main_input
    | 'ApplyCrossJoin' >> beam.FlatMap(
        cross_join, rights=beam.pvalue.AsIter(side_input)))

Note: This transform is supported on Portable and Dataflow v2 runners.

Note: This transform does not currently clean up temporary datasets created for its execution. (BEAM-11359)

Writing Data to BigQuery

The WriteToBigQuery transform is the recommended way of writing data to BigQuery. It supports a large set of parameters to customize how you’d like to write to BigQuery.

Table References

This transform allows you to provide static project, dataset and table parameters which point to a specific BigQuery table to be created. The table parameter can also be a dynamic parameter (i.e. a callable), which receives an element to be written to BigQuery, and returns the table that that element should be sent to.

You may also provide a tuple of PCollectionView elements to be passed as side inputs to your callable. For example, suppose that one wishes to send events of different types to different tables, and the table names are computed at pipeline runtime. One may then do something like the following:

with beam.Pipeline() as p:
  elements = (p | 'Create elements' >> beam.Create([
    {'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
    {'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
  ]))

  table_names = (p | 'Create table_names' >> beam.Create([
    ('error', 'my_project:dataset1.error_table_for_today'),
    ('user_log', 'my_project:dataset1.query_table_for_today'),
  ]))

  table_names_dict = beam.pvalue.AsDict(table_names)

  elements | beam.io.gcp.bigquery.WriteToBigQuery(
    table=lambda row, table_dict: table_dict[row['type']],
    table_side_inputs=(table_names_dict,))

In the example above, the table_dict argument passed to the callable given as table is the side input coming from table_names_dict, which is passed as part of the table_side_inputs argument.
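To see what the table callable computes for each element, here is a plain-Python sketch that groups rows by the destination the callable returns. The route_rows helper is hypothetical and only mimics the per-element call; the actual write is performed by the transform.

```python
from collections import defaultdict


def route_rows(rows, table_fn, *side_inputs):
    """Group rows by the destination returned by table_fn(row, *side_inputs)."""
    by_table = defaultdict(list)
    for row in rows:
        by_table[table_fn(row, *side_inputs)].append(row)
    return dict(by_table)


elements = [
    {'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
    {'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
]
# Plays the role of the AsDict side input from the example above.
table_names = {
    'error': 'my_project:dataset1.error_table_for_today',
    'user_log': 'my_project:dataset1.query_table_for_today',
}

routed = route_rows(
    elements, lambda row, table_dict: table_dict[row['type']], table_names)
```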

Schemas

This transform also allows you to provide a static or dynamic schema parameter (i.e. a callable).

If providing a callable, it should take in a table reference (as returned by the table parameter) and return the corresponding schema for that table. This allows you to provide different schemas for different tables:

def compute_table_name(row):
  ...

errors_schema = {'fields': [
  {'name': 'type', 'type': 'STRING', 'mode': 'NULLABLE'},
  {'name': 'message', 'type': 'STRING', 'mode': 'NULLABLE'}]}
queries_schema = {'fields': [
  {'name': 'type', 'type': 'STRING', 'mode': 'NULLABLE'},
  {'name': 'query', 'type': 'STRING', 'mode': 'NULLABLE'}]}

with beam.Pipeline() as p:
  elements = (p | beam.Create([
    {'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
    {'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
  ]))

  elements | beam.io.gcp.bigquery.WriteToBigQuery(
    table=compute_table_name,
    schema=lambda table: (errors_schema
                          if 'errors' in table
                          else queries_schema))

It may be the case that schemas are computed at pipeline runtime. In cases like these, one can also provide a schema_side_inputs parameter, which is a tuple of PCollectionViews to be passed to the schema callable (much like the table_side_inputs parameter).

Additional Parameters for BigQuery Tables

This sink is able to create tables in BigQuery if they don’t already exist. It also relies on creating temporary tables when performing file loads.

The WriteToBigQuery transform creates tables using the BigQuery API by inserting a load job (see the API reference [1]), or by inserting a new table (see the API reference for that [2][3]).

When creating a new BigQuery table, there are a number of extra parameters that one may need to specify, such as clustering, partitioning, and data encoding. It is possible to provide these additional parameters by passing a Python dictionary as additional_bq_parameters to the transform. As an example, to create a table that has specific partitioning and clustering properties, one would do the following:

additional_bq_parameters = {
  'timePartitioning': {'type': 'DAY'},
  'clustering': {'fields': ['country']}}
with beam.Pipeline() as p:
  elements = (p | beam.Create([
    {'country': 'mexico', 'timestamp': '12:34:56', 'query': 'acapulco'},
    {'country': 'canada', 'timestamp': '12:34:59', 'query': 'influenza'},
  ]))

  elements | beam.io.gcp.bigquery.WriteToBigQuery(
    table='project_name1:dataset_2.query_events_table',
    additional_bq_parameters=additional_bq_parameters)

Much like the schema case, the parameter with additional_bq_parameters can also take a callable that receives a table reference.
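A callable for additional_bq_parameters might look like the sketch below. It assumes the destination arrives as a string such as 'project_name1:dataset_2.query_events_table'; the function name and the rule for choosing clustering are made up for illustration.

```python
def bq_parameters_for(destination):
    """Return extra table parameters for one destination (illustrative rule)."""
    params = {'timePartitioning': {'type': 'DAY'}}
    # Assumption for this sketch: only query event tables are clustered.
    if 'query_events' in destination:
        params['clustering'] = {'fields': ['country']}
    return params


events_params = bq_parameters_for('project_name1:dataset_2.query_events_table')
other_params = bq_parameters_for('project_name1:dataset_2.error_table')
```

Such a function would be passed as additional_bq_parameters=bq_parameters_for in place of the static dictionary.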

[1] https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload

[2] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert

[3] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource

Chaining of operations after WriteToBigQuery

WriteToBigQuery returns an object with several PCollections that consist of metadata about the write operations. These are useful for inspecting the write operation and acting on its results:

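import json

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

# my_table and error_log_table are assumed to be defined elsewhere.
schema = {'fields': [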
    {'name': 'column', 'type': 'STRING', 'mode': 'NULLABLE'}]}

error_schema = {'fields': [
    {'name': 'destination', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'row', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'error_message', 'type': 'STRING', 'mode': 'NULLABLE'}]}

with beam.Pipeline() as p:
  result = (p
    | 'Create Columns' >> beam.Create([
            {'column': 'value'},
            {'bad_column': 'bad_value'}
          ])
    | 'Write Data' >> WriteToBigQuery(
            method=WriteToBigQuery.Method.STREAMING_INSERTS,
            table=my_table,
            schema=schema,
            insert_retry_strategy=RetryStrategy.RETRY_NEVER
          ))

  _ = (result.failed_rows_with_errors
    | 'Get Errors' >> beam.Map(lambda e: {
            "destination": e[0],
            "row": json.dumps(e[1]),
            "error_message": e[2][0]['message']
          })
    | 'Write Errors' >> WriteToBigQuery(
            method=WriteToBigQuery.Method.STREAMING_INSERTS,
            table=error_log_table,
            schema=error_schema,
          ))

Often, the simplest use case is to chain an operation after writing data to BigQuery. To do this, chain the operation after one of the output PCollections. A generic version of this pattern, independent of the write method, could look like:

def chain_after(result):
  try:
    # This works for FILE_LOADS, where we run load and possibly copy jobs.
    return (result.destination_load_jobid_pairs,
        result.destination_copy_jobid_pairs) | beam.Flatten()
  except AttributeError:
    # Works for STREAMING_INSERTS, where we return the rows BigQuery rejected
    return result.failed_rows

result = (pcoll | WriteToBigQuery(...))

_ = (chain_after(result)
     | beam.Reshuffle()  # Force a 'commit' of the intermediate data
     | MyOperationAfterWriteToBQ())

Attributes can be accessed using dot notation or bracket notation:

result.failed_rows <-> result['FailedRows']
result.failed_rows_with_errors <-> result['FailedRowsWithErrors']
result.destination_load_jobid_pairs <-> result['destination_load_jobid_pairs']
result.destination_file_pairs <-> result['destination_file_pairs']
result.destination_copy_jobid_pairs <-> result['destination_copy_jobid_pairs']
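The try/except in chain_after relies on attributes raising AttributeError when they do not belong to the write method in use. The following toy class mimics that behavior with plain lists in place of PCollections. SketchWriteResult is a hypothetical stand-in, not Beam's WriteResult, and it covers only three of the attributes.

```python
class SketchWriteResult:
    """Toy stand-in for a write result (not Beam's WriteResult)."""

    # Which write methods each attribute is available for.
    _ALLOWED = {
        'destination_load_jobid_pairs': ('FILE_LOADS',),
        'destination_copy_jobid_pairs': ('FILE_LOADS',),
        'failed_rows': ('STREAMING_INSERTS', 'STORAGE_WRITE_API'),
    }

    def __init__(self, method, **outputs):
        self._method = method
        self._outputs = outputs

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails.
        allowed = self._ALLOWED.get(name)
        if allowed is None or self._method not in allowed:
            raise AttributeError(
                f'{name!r} is not available for method {self._method!r}')
        return self._outputs.get(name, [])


def chain_after(result):
    """Pick an output to chain on, whatever the write method was."""
    try:
        # Lists are concatenated here; a pipeline would flatten PCollections.
        return (result.destination_load_jobid_pairs
                + result.destination_copy_jobid_pairs)
    except AttributeError:
        return result.failed_rows


file_result = SketchWriteResult(
    'FILE_LOADS',
    destination_load_jobid_pairs=[('t1', 'load_job')],
    destination_copy_jobid_pairs=[('t1', 'copy_job')])
streaming_result = SketchWriteResult(
    'STREAMING_INSERTS', failed_rows=[('t1', {'bad_column': 'bad_value'})])
```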

Writing with Storage Write API using Cross Language

This sink is able to write with BigQuery’s Storage Write API. To do so, specify the method WriteToBigQuery.Method.STORAGE_WRITE_API. This will use the StorageWriteToBigQuery() transform to discover and use the Java implementation. Using this transform directly will require the use of beam.Row() elements.

Similar to streaming inserts, it returns two dead-letter queue PCollections: one containing just the failed rows and the other containing failed rows and errors. They can be accessed with failed_rows and failed_rows_with_errors, respectively. See the examples above for how to do this.
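Elements of failed_rows_with_errors are (destination, row, errors) tuples, as the error-handling example earlier in this page indexes them. The sketch below turns one such element into an error-log record in plain Python; the sample element and the to_error_record name are assumptions made for illustration.

```python
import json


def to_error_record(element):
    """Flatten one (destination, row, errors) tuple into a loggable dict."""
    destination, row, errors = element
    return {
        'destination': destination,
        'row': json.dumps(row),
        # Keep only the first error message, as in the example above.
        'error_message': errors[0]['message'],
    }


# Hypothetical failed element; real error payloads carry more detail.
sample = (
    'my_project:dataset1.my_table',
    {'bad_column': 'bad_value'},
    [{'reason': 'invalid', 'message': 'no such field: bad_column'}],
)
record = to_error_record(sample)
```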

Short introduction to BigQuery concepts

Tables have rows (TableRow) and each row has cells (TableCell). A table has a schema (TableSchema), which in turn describes the schema of each cell (TableFieldSchema). The terms field and cell are used interchangeably.

TableSchema: Describes the schema (types and order) for values in each row.

Has one attribute, ‘fields’, which is a list of TableFieldSchema objects.

TableFieldSchema: Describes the schema (type, name) for one field.

Has several attributes, including ‘name’ and ‘type’. Common values for the type attribute are: ‘STRING’, ‘INTEGER’, ‘FLOAT’, ‘BOOLEAN’, ‘NUMERIC’, ‘GEOGRAPHY’. All possible values are described at: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types

TableRow: Holds all values in a table row. Has one attribute, ‘f’, which is a list of TableCell instances.

TableCell: Holds the value for one cell (or field). Has one attribute, ‘v’, which is a JsonValue instance. This class is defined in the apitools.base.py.extra_types module.
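The row and cell layout described above can be mirrored with two small dataclasses. Cell and Row here are plain stand-ins for TableCell and TableRow, written only to show how an ordered list of field names ties a dictionary to a list of cells; they are not the BigQuery API classes, and values are kept as ordinary Python objects rather than JsonValue instances.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Cell:
    """Stand-in for TableCell: one value under the attribute 'v'."""
    v: Any


@dataclass
class Row:
    """Stand-in for TableRow: a list of cells under the attribute 'f'."""
    f: List[Cell]


def dict_to_row(record: Dict[str, Any], field_names: List[str]) -> Row:
    """Order the values by the schema's field names; missing fields are None."""
    return Row(f=[Cell(v=record.get(name)) for name in field_names])


def row_to_dict(row: Row, field_names: List[str]) -> Dict[str, Any]:
    """Pair each cell with its field name, in schema order."""
    return {name: cell.v for name, cell in zip(field_names, row.f)}


field_names = ['month', 'event_count']
row = dict_to_row({'event_count': 3, 'month': 'May'}, field_names)
```

Cells carry no names of their own, which is why converting a dictionary into a row needs the ordered field names from the schema.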

As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with Well-Known Text (See https://en.wikipedia.org/wiki/Well-known_text) format for reading and writing to BigQuery. BigQuery IO requires values of BYTES datatype to be encoded using base64 encoding when writing to BigQuery.
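As a sketch of the base64 requirement for BYTES values, the helper below encodes selected fields of a row dictionary before it is handed to a write. The function name is hypothetical, and emitting the encoded value as an ASCII str (rather than bytes) is an assumption of this sketch.

```python
import base64


def encode_bytes_fields(row, bytes_fields):
    """Return a copy of row with the given BYTES fields base64-encoded."""
    encoded = dict(row)
    for name in bytes_fields:
        if encoded.get(name) is not None:
            encoded[name] = base64.b64encode(encoded[name]).decode('ascii')
    return encoded


prepared = encode_bytes_fields({'id': 1, 'payload': b'hi'}, ['payload'])
# Decoding recovers the original raw bytes.
restored = base64.b64decode(prepared['payload'])
```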

Updates to the I/O connector code

For any significant updates to this I/O connector, please consider involving corresponding code reviewers mentioned in https://github.com/apache/beam/blob/master/sdks/python/OWNERS

class apache_beam.io.gcp.bigquery.TableRowJsonCoder(table_schema=None)[source]

Bases: Coder

A coder for a TableRow instance to/from a JSON string.

Note that the encoding operation (used when writing to sinks) requires the table schema in order to obtain the ordered list of field names. Reading from sources on the other hand does not need the table schema.

encode(table_row)[source]
decode(encoded_table_row)[source]
class apache_beam.io.gcp.bigquery.BigQueryDisposition[source]

Bases: object

Class holding standard strings used for create and write dispositions.

CREATE_NEVER = 'CREATE_NEVER'
CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
WRITE_TRUNCATE = 'WRITE_TRUNCATE'
WRITE_APPEND = 'WRITE_APPEND'
WRITE_EMPTY = 'WRITE_EMPTY'
static validate_create(disposition)[source]
static validate_write(disposition)[source]
apache_beam.io.gcp.bigquery.BigQuerySource(table=None, dataset=None, project=None, query=None, validate=False, coder=None, use_standard_sql=False, flatten_results=True, kms_key=None, use_dataflow_native_source=False)[source]
apache_beam.io.gcp.bigquery.BigQuerySink(*args, validate=False, **kwargs)[source]

A deprecated alias for WriteToBigQuery.

class apache_beam.io.gcp.bigquery.BigQueryQueryPriority[source]

Bases: object

Class holding standard strings used for query priority.

INTERACTIVE = 'INTERACTIVE'
BATCH = 'BATCH'
class apache_beam.io.gcp.bigquery.WriteToBigQuery(table, dataset=None, project=None, schema=None, create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_APPEND', kms_key=None, batch_size=None, max_file_size=None, max_partition_size=None, max_files_per_bundle=None, test_client=None, custom_gcs_temp_location=None, method=None, insert_retry_strategy=None, additional_bq_parameters=None, table_side_inputs=None, schema_side_inputs=None, triggering_frequency=None, use_at_least_once=False, validate=True, temp_file_format=None, ignore_insert_ids=False, with_auto_sharding=False, num_storage_api_streams=0, ignore_unknown_columns=False, load_job_project_id=None, max_insert_payload_size=9437184, num_streaming_keys=500, use_cdc_writes: bool = False, primary_key: List[str] | None = None, expansion_service=None)[source]

Bases: PTransform

Write data to BigQuery.

This transform receives a PCollection of elements to be inserted into BigQuery tables. The elements would come in as Python dictionaries, or as TableRow instances.

Initialize a WriteToBigQuery transform.

Parameters:
  • table (str, callable, ValueProvider) – The ID of the table, or a callable that returns it. The ID must contain only letters a-z, A-Z, numbers 0-9, or connectors -_. If dataset argument is None then the table argument must contain the entire table reference specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. If it’s a callable, it must receive one argument representing an element to be written to BigQuery, and return a TableReference, or a string table name as specified above.

  • dataset (str) – The ID of the dataset containing this table or None if the table reference is specified entirely by the table argument.

  • project (str) – The ID of the project containing this table or None if the table reference is specified entirely by the table argument.

  • schema (str, dict, ValueProvider, callable) – The schema to be used if the BigQuery table to write has to be created. This can be specified as a TableSchema object, as a Python dictionary or JSON string (either directly or wrapped in a ValueProvider), or as a single string of the form 'field1:type1,field2:type2,field3:type3' that defines a comma-separated list of fields. Here 'type' should specify the BigQuery type of the field. Single string based schemas do not support nested fields, repeated fields, or specifying a BigQuery mode for fields (mode will always be set to 'NULLABLE'). If a callable, it should receive a destination (in the form of a str) and return a str, dict or TableSchema. One may also pass SCHEMA_AUTODETECT here when using JSON-based file loads, and BigQuery will try to infer the schema for the files that are being loaded.

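  • create_disposition (BigQueryDisposition) –

    A string describing what happens if the table does not exist. Possible values are:

    • BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.

    • BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.

  • write_disposition (BigQueryDisposition) –

    A string describing what happens if the table already has some data. Possible values are:

    • BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.

    • BigQueryDisposition.WRITE_APPEND: add to existing rows.

    • BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.

    For streaming pipelines, WRITE_TRUNCATE cannot be used.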

  • kms_key (str) – Optional Cloud KMS key name for use when creating new tables.

  • batch_size (int) – Number of rows to be written to BQ per streaming API insert. The default is 500.

  • test_client – Override the default bigquery client used for testing.

  • max_file_size (int) – The maximum size for a file to be written and then loaded into BigQuery. The default value is 4TB, which is 80% of the limit of 5TB for BigQuery to load any file.

  • max_partition_size (int) – Maximum byte size for each load job to BigQuery. Defaults to 15TB. Applicable to FILE_LOADS only.

  • max_files_per_bundle (int) – The maximum number of files to be concurrently written by a worker. The default here is 20. Larger values will allow writing to multiple destinations without having to reshard - but they increase the memory burden on the workers.

  • custom_gcs_temp_location (str) – A GCS location to store files to be used for file loads into BigQuery. By default, this will use the pipeline’s temp_location, but for pipelines whose temp_location is not appropriate for BQ File Loads, users should pass a specific one.

  • method – The method to use to write to BigQuery. It may be STREAMING_INSERTS, FILE_LOADS, STORAGE_WRITE_API or DEFAULT. An introduction on loading data to BigQuery: https://cloud.google.com/bigquery/docs/loading-data. DEFAULT will use STREAMING_INSERTS on Streaming pipelines and FILE_LOADS on Batch pipelines. Note: FILE_LOADS currently does not support BigQuery’s JSON data type: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type

  • insert_retry_strategy

    The strategy to use when retrying streaming inserts into BigQuery. Options are shown in bigquery_tools.RetryStrategy attrs. Default is to retry always. This means that whenever there are rows that fail to be inserted to BigQuery, they will be retried indefinitely. Other retry strategy settings will produce a deadletter PCollection as output. Appropriate values are:

    • RetryStrategy.RETRY_ALWAYS: retry all rows if there are any kind of errors. Note that this will hold your pipeline back if there are errors until you cancel or update it.

    • RetryStrategy.RETRY_NEVER: rows with errors will not be retried. Instead they will be output to a dead letter queue under the ‘FailedRows’ tag.

    • RetryStrategy.RETRY_ON_TRANSIENT_ERROR: retry rows with transient errors (e.g. timeouts). Rows with permanent errors will be output to dead letter queue under ‘FailedRows’ tag.

  • additional_bq_parameters (dict, callable) – Additional parameters to pass to BQ when creating / loading data into a table. If a callable, it should be a function that receives a table reference indicating the destination and returns a dictionary. These can be ‘timePartitioning’, ‘clustering’, etc. They are passed directly to the job load configuration. See https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload

  • table_side_inputs (tuple) – A tuple with AsSideInput PCollections to be passed to the table callable (if one is provided).

  • schema_side_inputs – A tuple with AsSideInput PCollections to be passed to the schema callable (if one is provided).

  • triggering_frequency (float) –

    When method is FILE_LOADS: Value will be converted to int. Every triggering_frequency seconds, a BigQuery load job will be triggered for all the data written since the last load job. BigQuery has limits on how many load jobs can be triggered per day, so be careful not to set this duration too low, or you may exceed daily quota. Often this is set to 5 or 10 minutes to ensure that the project stays well under the BigQuery quota. See https://cloud.google.com/bigquery/quota-policy for more information about BigQuery quotas.

    When method is STREAMING_INSERTS and with_auto_sharding=True: A streaming inserts batch will be submitted at least every triggering_frequency seconds when data is waiting. The batch can be sent earlier if it reaches the maximum batch size set by batch_size. Default value is 0.2 seconds.

    When method is STORAGE_WRITE_API: A stream of rows will be committed every triggering_frequency seconds. By default, this will be 5 seconds to ensure exactly-once semantics.

  • use_at_least_once – Intended only for STORAGE_WRITE_API. When True, will use at-least-once semantics. This is cheaper and provides lower latency, but will potentially duplicate records.

  • validate – Indicates whether to perform validation checks on inputs. This parameter is primarily used for testing.

  • temp_file_format – The format to use for file loads into BigQuery. The options are NEWLINE_DELIMITED_JSON or AVRO, with NEWLINE_DELIMITED_JSON being used by default. For advantages and limitations of the two formats, see https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro and https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json.

  • ignore_insert_ids – When using the STREAMING_INSERTS method to write data to BigQuery, insert_ids are a feature of BigQuery that support deduplication of events. If your use case is not sensitive to duplication of data inserted to BigQuery, set ignore_insert_ids to True to increase the throughput for BQ writing. See: https://cloud.google.com/bigquery/streaming-data-into-bigquery#disabling_best_effort_de-duplication

  • with_auto_sharding – Experimental. If true, enables using a dynamically determined number of shards to write to BigQuery. This can be used for all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only applicable to unbounded input.

  • num_storage_api_streams – Specifies the number of write streams that the Storage API sink will use. This parameter is only applicable when writing unbounded data.

  • ignore_unknown_columns – Accept rows that contain values that do not match the schema. The unknown values are ignored. Default is False, which treats unknown values as errors. This option is only valid for method=STREAMING_INSERTS. See reference: https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll

  • load_job_project_id – Specifies an alternate GCP project id to use for billing Batch File Loads. By default, the project id of the table is used.

  • num_streaming_keys – The number of shards per destination when writing via streaming inserts.

  • expansion_service – The address (host:port) of the expansion service. If no expansion service is provided, will attempt to run the default GCP expansion service. Used for STORAGE_WRITE_API method.

  • max_insert_payload_size – The maximum byte size for a BigQuery legacy streaming insert payload.

  • use_cdc_writes – Enables CDC writes on BigQuery. When True, the Beam Rows are sent as they are to the BigQuery sink, which expects each row to have ‘record’ and ‘row_mutation_info’ properties. Used with the STORAGE_WRITE_API method in ‘at least once’ mode.

  • primary_key – A list of column names to be configured as the primary key. Required when using CDC writes on BigQuery with the CREATE_IF_NEEDED create disposition for the underlying tables. Used with the STORAGE_WRITE_API method in ‘at least once’ mode.
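When choosing triggering_frequency for FILE_LOADS, it can help to estimate how many times per day a load would be triggered. The sketch below does that arithmetic only. It assumes data arrives continuously and says nothing about the actual BigQuery quota values, which should be checked in the quota documentation; the function name is made up for illustration.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def load_triggers_per_day(triggering_frequency):
    """Estimate how many times per day a load is triggered for one destination."""
    # The value is converted to int, as described for FILE_LOADS above.
    interval = int(triggering_frequency)
    if interval <= 0:
        raise ValueError('triggering_frequency must be at least 1 second')
    return SECONDS_PER_DAY // interval


every_five_minutes = load_triggers_per_day(5 * 60)
every_ten_minutes = load_triggers_per_day(10 * 60)
```

With these inputs, a 5-minute frequency gives 288 triggers per day and a 10-minute frequency gives 144.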

class Method[source]

Bases: object

DEFAULT = 'DEFAULT'
STREAMING_INSERTS = 'STREAMING_INSERTS'
FILE_LOADS = 'FILE_LOADS'
STORAGE_WRITE_API = 'STORAGE_WRITE_API'
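The rule that DEFAULT means STREAMING_INSERTS for streaming pipelines and FILE_LOADS for batch pipelines can be written as a small function. This is a sketch of the documented rule, not Beam's code; treating method=None the same as DEFAULT is an assumption based on the constructor's default value.

```python
def resolve_write_method(method, is_streaming):
    """Map DEFAULT (or None, by assumption) to a concrete write method."""
    if method in (None, 'DEFAULT'):
        return 'STREAMING_INSERTS' if is_streaming else 'FILE_LOADS'
    return method


batch_method = resolve_write_method('DEFAULT', is_streaming=False)
streaming_method = resolve_write_method(None, is_streaming=True)
```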
static get_table_schema_from_string(schema)

Transform the string table schema into a TableSchema instance.

Parameters:

schema (str) – The string schema to be used if the BigQuery table to write has to be created.

Returns:

The schema to be used if the BigQuery table to write has to be created but in the TableSchema format.

Return type:

TableSchema
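For intuition about the string schema form, here is a minimal plain-Python sketch that turns a 'NAME:TYPE,NAME:TYPE' string into the dictionary schema layout used in the examples on this page. It is not the implementation of get_table_schema_from_string, which returns a TableSchema; the function name and the error handling are assumptions.

```python
def schema_string_to_dict(schema):
    """Parse 'name:TYPE,name:TYPE' into {'fields': [...]} with NULLABLE mode."""
    fields = []
    for part in schema.split(','):
        name, sep, field_type = part.strip().partition(':')
        if not sep or not name or not field_type:
            raise ValueError(f'Expected NAME:TYPE, got {part!r}')
        # String schemas cannot express a mode, so every field is NULLABLE.
        fields.append({'name': name, 'type': field_type, 'mode': 'NULLABLE'})
    return {'fields': fields}


parsed = schema_string_to_dict('month:STRING,event_count:INTEGER')
```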

static table_schema_to_dict(table_schema)

Create a dictionary representation of table schema for serialization

static get_dict_table_schema(schema)

Transform the table schema into a dictionary instance.

Parameters:

schema (str, dict, TableSchema) – The schema to be used if the BigQuery table to write has to be created. This can either be a dict or string or in the TableSchema format.

Returns:

The schema to be used if the BigQuery table to write has to be created but in the dictionary format.

Return type:

Dict[str, Any]

expand(pcoll)[source]
display_data()[source]
to_runner_api_parameter(context)[source]
from_runner_api(payload, context)[source]
class apache_beam.io.gcp.bigquery.WriteResult(method: str | None = None, destination_load_jobid_pairs: PCollection[Tuple[str, JobReference]] | None = None, destination_file_pairs: PCollection[Tuple[str, Tuple[str, int]]] | None = None, destination_copy_jobid_pairs: PCollection[Tuple[str, JobReference]] | None = None, failed_rows: PCollection[Tuple[str, dict]] | None = None, failed_rows_with_errors: PCollection[Tuple[str, dict, list]] | None = None)[source]

Bases: object

The result of a WriteToBigQuery transform.

validate(valid_methods, attribute)[source]
property destination_load_jobid_pairs: PCollection[Tuple[str, JobReference]]

A FILE_LOADS method attribute

Returns: A PCollection of the table destinations that were successfully loaded to using the batch load API, along with the load job IDs.

Raises: AttributeError: if accessed with a write method besides FILE_LOADS.

property destination_file_pairs: PCollection[Tuple[str, Tuple[str, int]]]

A FILE_LOADS method attribute

Returns: A PCollection of the table destinations along with the temp files used as sources to load from.

Raises: AttributeError: if accessed with a write method besides FILE_LOADS.

property destination_copy_jobid_pairs: PCollection[Tuple[str, JobReference]]

A FILE_LOADS method attribute

Returns: A PCollection of the table destinations that were successfully copied to, along with the copy job ID.

Raises: AttributeError: if accessed with a write method besides FILE_LOADS.

property failed_rows: PCollection[Tuple[str, dict]]

A [STREAMING_INSERTS, STORAGE_WRITE_API] method attribute

Returns: A PCollection of rows that failed when inserting to BigQuery.

Raises: AttributeError: if accessed with a write method besides [STREAMING_INSERTS, STORAGE_WRITE_API].

property failed_rows_with_errors: PCollection[Tuple[str, dict, list]]

A [STREAMING_INSERTS, STORAGE_WRITE_API] method attribute

Returns: A PCollection of rows that failed when inserting to BigQuery, along with their errors.

Raises: AttributeError: if accessed with a write method besides [STREAMING_INSERTS, STORAGE_WRITE_API].

class apache_beam.io.gcp.bigquery.ReadFromBigQuery(gcs_location=None, method=None, use_native_datetime=False, output_type=None, *args, **kwargs)[source]

Bases: PTransform

Read data from BigQuery.

This PTransform uses a BigQuery export job to take a snapshot of the table on GCS, and then reads from each produced file. File format is Avro by default.

Parameters:
  • method – The method to use to read from BigQuery. It may be EXPORT or DIRECT_READ. EXPORT invokes a BigQuery export request (https://cloud.google.com/bigquery/docs/exporting-data). DIRECT_READ reads directly from BigQuery storage using the BigQuery Read API (https://cloud.google.com/bigquery/docs/reference/storage). If unspecified, the default is currently EXPORT.

  • use_native_datetime (bool) – By default this transform exports BigQuery DATETIME fields as formatted strings (for example: 2021-01-01T12:59:59). If True, BigQuery DATETIME fields will be returned as native Python datetime objects. This can only be used when ‘method’ is ‘DIRECT_READ’.

  • table (str, callable, ValueProvider) – The ID of the table, or a callable that returns it. If dataset argument is None then the table argument must contain the entire table reference specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. If it’s a callable, it must receive one argument representing an element to be written to BigQuery, and return a TableReference, or a string table name as specified above.

  • dataset (str) – The ID of the dataset containing this table or None if the table reference is specified entirely by the table argument.

  • project (str) – The ID of the project containing this table.

  • query (str, ValueProvider) – A query to be used instead of arguments table, dataset, and project.

  • validate (bool) – If True, various checks will be done when source gets initialized (e.g., is table present?). This should be True for most scenarios in order to catch errors as early as possible (pipeline construction instead of pipeline execution). It should be False if the table is created during pipeline execution by a previous step.

  • coder (Coder) – The coder for the table rows. If None, then the default coder is _JsonToDictCoder, which will interpret every row as a JSON serialized dictionary.

  • use_standard_sql (bool) – Specifies whether to use BigQuery’s standard SQL dialect for this query. The default value is False. If set to True, the query will use BigQuery’s updated SQL dialect with improved standards compliance. This parameter is ignored for table inputs.

  • flatten_results (bool) – Flattens all nested and repeated fields in the query results. The default value is True.

  • kms_key (str) – Optional Cloud KMS key name for use when creating new temporary tables.

  • gcs_location (str, ValueProvider) – The name of the Google Cloud Storage bucket where the extracted table should be written as a string or a ValueProvider. If None, then the temp_location parameter is used.

  • bigquery_job_labels (dict) – A dictionary with string labels to be passed to BigQuery export and query jobs created by this transform. See: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration

  • use_json_exports (bool) – By default, this transform works by exporting BigQuery data into Avro files, and reading those files. With this parameter, the transform will instead export to JSON files. JSON files are slower to read due to their larger size. When using JSON exports, the BigQuery types for DATE, DATETIME, TIME, and TIMESTAMP will be exported as strings. This behavior is consistent with BigQuerySource. When using Avro exports, these fields will be exported as native Python types (datetime.date, datetime.datetime, datetime.datetime, and datetime.datetime respectively). Avro exports are recommended. To learn more about BigQuery types, and Time-related type representations, see: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types To learn more about type conversions between BigQuery and Avro, see: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#avro_conversions

  • temp_dataset (apache_beam.io.gcp.internal.clients.bigquery.DatasetReference) – Temporary dataset reference to use when reading from BigQuery using a query. When reading using a query, the BigQuery source creates a temporary dataset and a temporary table to store the query results. With this option, you can instead name an existing dataset; the BigQuery source creates the temporary table in that dataset and removes it once it is no longer needed. The job needs permission to create and delete tables within the given dataset. The dataset name should not start with the reserved prefix beam_temp_dataset_.

  • query_priority (BigQueryQueryPriority) – By default, this transform runs queries with BATCH priority. Use BigQueryQueryPriority.INTERACTIVE to run queries with INTERACTIVE priority. This option is ignored when reading from a table rather than a query. To learn more about query priority, see: https://cloud.google.com/bigquery/docs/running-queries

  • output_type (str) – By default, this source yields Python dictionaries (PYTHON_DICT). There is experimental support for producing a PCollection with a schema and yielding Beam Rows via the option BEAM_ROW. For more information on schemas, see https://beam.apache.org/documentation/programming-guide/#what-is-a-schema
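
The parameters above can be combined as in the following minimal sketch of a query-based read. The helper names build_query_read_kwargs and read_query, the table name, the bucket gs://example-bucket/tmp, and the label values are hypothetical; only ReadFromBigQuery and its documented parameters come from this module. Apache Beam is imported lazily so that the argument-building helper can be exercised on its own.

```python
def build_query_read_kwargs(query, gcs_location=None, job_labels=None):
    """Collect keyword arguments for a query-based read (hypothetical helper)."""
    kwargs = {
        "query": query,
        # The documented default is False; opt in to standard SQL explicitly.
        "use_standard_sql": True,
        # The documented default is True; keep nested and repeated fields intact.
        "flatten_results": False,
    }
    if gcs_location is not None:
        # When omitted, the temp_location parameter is used instead.
        kwargs["gcs_location"] = gcs_location
    if job_labels:
        kwargs["bigquery_job_labels"] = dict(job_labels)
    return kwargs


def read_query(pipeline, **kwargs):
    """Apply ReadFromBigQuery to a pipeline (needs apache_beam with GCP extras)."""
    import apache_beam as beam  # lazy import: only needed when a pipeline is built

    return pipeline | "ReadQuery" >> beam.io.ReadFromBigQuery(**kwargs)


kwargs = build_query_read_kwargs(
    "SELECT month, event_count FROM `my-project.my_dataset.my_table`",  # hypothetical
    gcs_location="gs://example-bucket/tmp",  # hypothetical bucket
    job_labels={"team": "analytics"},  # hypothetical label
)
```

Calling read_query(pipeline, **kwargs) inside a pipeline context would then yield one dictionary per result row, assuming the default output_type.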

class Method[source]

Bases: object

EXPORT = 'EXPORT'
DIRECT_READ = 'DIRECT_READ'
COUNTER = 0
expand(pcoll)[source]
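
The two Method values interact with use_native_datetime, which is documented as usable only with DIRECT_READ. The following sketch illustrates that constraint with a hypothetical helper, check_read_method; it is not the check Beam itself performs, and treating EXPORT as the default method is an assumption based on the export behaviour described for use_json_exports.

```python
# Mirrors the documented Method constants.
EXPORT = "EXPORT"
DIRECT_READ = "DIRECT_READ"


def check_read_method(method=EXPORT, use_native_datetime=False):
    """Validate a method / use_native_datetime pair (hypothetical helper)."""
    if method not in (EXPORT, DIRECT_READ):
        raise ValueError(f"Unknown read method: {method!r}")
    if use_native_datetime and method != DIRECT_READ:
        # Native datetime objects are documented only for DIRECT_READ.
        raise ValueError("use_native_datetime=True requires method='DIRECT_READ'")
    return {"method": method, "use_native_datetime": use_native_datetime}


options = check_read_method(DIRECT_READ, use_native_datetime=True)
```

The returned dictionary could be passed as keyword arguments to ReadFromBigQuery alongside a table or query.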
class apache_beam.io.gcp.bigquery.ReadFromBigQueryRequest(query: str | None = None, use_standard_sql: bool = True, table: str | TableReference | None = None, flatten_results: bool = False)[source]

Bases: object

Class that defines data to read from BQ.

Only one of query or table should be specified.

Parameters:
  • query – SQL query to fetch data.

  • use_standard_sql – Specifies whether to use BigQuery’s standard SQL dialect for this query. The default value is True. If set to False, the query will use BigQuery’s legacy SQL dialect. This parameter is ignored for table inputs.

  • table – The ID of the table to read. The table reference should include the project and dataset (for example, 'PROJECT:DATASET.TABLE').

  • flatten_results – Flattens all nested and repeated fields in the query results. The default value is False.

validate()[source]
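
The rules stated above (only one of query or table, and a table that names its project and dataset) can be illustrated with a small stand-in class. ReadRequestSketch is hypothetical and is not the Beam implementation; in particular, rejecting a request that has neither a query nor a table is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ReadRequestSketch:
    """Illustrative stand-in for ReadFromBigQueryRequest; not the Beam class."""

    query: Optional[str] = None
    use_standard_sql: bool = True
    table: Optional[str] = None
    flatten_results: bool = False

    def validate(self):
        # Exactly one of query or table must be set (assumption: neither is an error).
        if (self.query is None) == (self.table is None):
            raise ValueError("Specify exactly one of query or table.")
        if self.table is not None:
            # Expect the full 'PROJECT:DATASET.TABLE' form.
            project, colon, rest = self.table.partition(":")
            dataset, dot, table_id = rest.partition(".")
            if not (project and colon and dataset and dot and table_id):
                raise ValueError(
                    "Expected 'PROJECT:DATASET.TABLE', got %r" % self.table)
        return self


request = ReadRequestSketch(table="my-project:my_dataset.my_table").validate()
```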
class apache_beam.io.gcp.bigquery.ReadAllFromBigQuery(gcs_location: str | ValueProvider | None = None, validate: bool = False, kms_key: str | None = None, temp_dataset: str | DatasetReference | None = None, bigquery_job_labels: Dict[str, str] | None = None, query_priority: str = 'BATCH')[source]

Bases: PTransform

Read data from BigQuery.

PTransform: ReadFromBigQueryRequest -> Rows

This PTransform uses a BigQuery export job to take a snapshot of the table on GCS, and then reads from each produced file. Data is exported into a new subdirectory for each export using UUIDs generated in ReadFromBigQueryRequest objects.

It is recommended not to use this PTransform for streaming jobs on GlobalWindow, since it will not be able to clean up snapshots.

Parameters:
  • gcs_location (str) – The name of the Google Cloud Storage bucket, given as a string, where the extracted table should be written. If None, then the temp_location parameter is used.

  • validate (bool) – If True, various checks will be done when source gets initialized (e.g., is table present?).

  • kms_key (str) – Experimental. Optional Cloud KMS key name for use when creating new temporary tables.

COUNTER = 0
expand(pcoll)[source]
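
A minimal sketch of how ReadAllFromBigQuery might be applied to a PCollection of ReadFromBigQueryRequest objects follows. The helper names example_requests and read_all and the table names are hypothetical. Apache Beam is imported lazily, so the plain-dict request descriptions can be inspected without Beam installed.

```python
def example_requests():
    """Plain-dict request descriptions; each sets exactly one of query or table."""
    return [
        {"query": "SELECT * FROM `my-project.my_dataset.events`"},  # hypothetical
        {"table": "my-project:my_dataset.users"},  # hypothetical
    ]


def read_all(pipeline, requests, gcs_location=None):
    """Turn the dicts into requests and read them all (needs apache_beam[gcp])."""
    import apache_beam as beam  # lazy import: only needed when a pipeline is built
    from apache_beam.io.gcp.bigquery import (
        ReadAllFromBigQuery,
        ReadFromBigQueryRequest,
    )

    return (
        pipeline
        | "CreateRequests" >> beam.Create(
            [ReadFromBigQueryRequest(**spec) for spec in requests])
        # With gcs_location=None, the temp_location parameter is used.
        | "ReadAll" >> ReadAllFromBigQuery(gcs_location=gcs_location)
    )


requests = example_requests()
```

Because each request triggers its own export, this pattern is a better fit for batch pipelines than for streaming jobs on GlobalWindow, as noted above.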