apache_beam.io.gcp.bigquery_tools module

Tools used by BigQuery sources and sinks.

Classes, constants and functions in this file are experimental and have no backwards compatibility guarantees.

These tools include wrappers and clients to interact with BigQuery APIs.

NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.

class apache_beam.io.gcp.bigquery_tools.FileFormat[source]

Bases: object

CSV = 'CSV'
JSON = 'NEWLINE_DELIMITED_JSON'
AVRO = 'AVRO'
class apache_beam.io.gcp.bigquery_tools.ExportCompression[source]

Bases: object

GZIP = 'GZIP'
DEFLATE = 'DEFLATE'
SNAPPY = 'SNAPPY'
NONE = 'NONE'
apache_beam.io.gcp.bigquery_tools.default_encoder(obj)[source]
apache_beam.io.gcp.bigquery_tools.get_hashable_destination(destination)[source]

Parses a table reference into a (project, dataset, table) tuple.

Parameters:destination – Either a TableReference object from the bigquery API. The object has the following attributes: projectId, datasetId, and tableId. Or a string representing the destination containing ‘PROJECT:DATASET.TABLE’.
Returns:A string representing the destination containing ‘PROJECT:DATASET.TABLE’.
apache_beam.io.gcp.bigquery_tools.to_hashable_table_ref(table_ref_elem_kv: Tuple[Union[str, apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableReference], V]) → Tuple[str, V][source]

Turns the key of the input tuple to its string representation. The key should be either a string or a TableReference.

Parameters:table_ref_elem_kv – A tuple of table reference and element.
Returns:A tuple of string representation of input table and input element.
apache_beam.io.gcp.bigquery_tools.parse_table_schema_from_json(schema_string)[source]

Parse the Table Schema provided as string.

Parameters:schema_string – String serialized table schema, should be a valid JSON.
Returns:A TableSchema of the BigQuery export from either the Query or the Table.
apache_beam.io.gcp.bigquery_tools.parse_table_reference(table, dataset=None, project=None)[source]

Parses a table reference into a (project, dataset, table) tuple.

Parameters:
  • table – The ID of the table. The ID must contain only letters (a-z, A-Z), numbers (0-9), connectors (-_). If dataset argument is None then the table argument must contain the entire table reference: ‘DATASET.TABLE’ or ‘PROJECT:DATASET.TABLE’. This argument can be a TableReference instance in which case dataset and project are ignored and the reference is returned as a result. Additionally, for date partitioned tables, appending ‘$YYYYmmdd’ to the table name is supported, e.g. ‘DATASET.TABLE$YYYYmmdd’.
  • dataset – The ID of the dataset containing this table or null if the table reference is specified entirely by the table argument.
  • project – The ID of the project containing this table or null if the table reference is specified entirely by the table (and possibly dataset) argument.
Returns:

A TableReference object from the bigquery API. The object has the following attributes: projectId, datasetId, and tableId. If the input is a TableReference object, a new object will be returned.

Raises:

ValueError – if the table reference as a string does not match the expected format.

class apache_beam.io.gcp.bigquery_tools.BigQueryWrapper(client=None, temp_dataset_id=None)[source]

Bases: object

BigQuery client wrapper with utilities for querying.

The wrapper is used to organize all the BigQuery integration points and offer a common place where retry logic for failures can be controlled. In addition it offers various functions used both in sources and sinks (e.g., find and create tables, query a table, etc.).

TEMP_TABLE = 'temp_table_'
TEMP_DATASET = 'temp_dataset_'
HISTOGRAM_METRIC_LOGGER = <apache_beam.internal.metrics.metric.MetricLogger object>
unique_row_id

Returns a unique row ID (str) used to avoid multiple insertions.

If the row ID is provided, BigQuery will make a best effort to not insert the same row multiple times for fail and retry scenarios in which the insert request may be issued several times. This comes into play for sinks executed in a local runner.

Returns:a unique row ID string
get_query_location(project_id, query, use_legacy_sql)[source]

Get the location of tables referenced in a query.

This method returns the location of the first available referenced table for user in the query and depends on the BigQuery service to provide error handling for queries that reference tables in multiple locations.

wait_for_bq_job(job_reference, sleep_duration_sec=5, max_retries=0)[source]

Poll job until it is DONE.

Parameters:
  • job_reference – bigquery.JobReference instance.
  • sleep_duration_sec – Specifies the delay in seconds between retries.
  • max_retries – The total number of times to retry. If equals to 0, the function waits forever.
Raises:

RuntimeError – If the job is FAILED or the number of retries has been reached.

get_table(project_id, dataset_id, table_id)[source]

Lookup a table’s metadata object.

Parameters:
  • client – bigquery.BigqueryV2 instance
  • project_id – table lookup parameter
  • dataset_id – table lookup parameter
  • table_id – table lookup parameter
Returns:

bigquery.Table instance

Raises:

HttpError – if lookup failed.

get_or_create_dataset(project_id, dataset_id, location=None)[source]
get_table_location(project_id, dataset_id, table_id)[source]
create_temporary_dataset(project_id, location)[source]
clean_up_temporary_dataset(project_id)[source]
get_job(project, job_id, location=None)[source]
perform_load_job(destination, job_id, source_uris=None, source_stream=None, schema=None, write_disposition=None, create_disposition=None, additional_load_parameters=None, source_format=None, job_labels=None)[source]

Starts a job to load data into BigQuery.

Returns:bigquery.JobReference with the information about the job that was started.
perform_extract_job(destination, job_id, table_reference, destination_format, project=None, include_header=True, compression='NONE', use_avro_logical_types=False, job_labels=None)[source]

Starts a job to export data from BigQuery.

Returns:bigquery.JobReference with the information about the job that was started.
get_or_create_table(project_id, dataset_id, table_id, schema, create_disposition, write_disposition, additional_create_parameters=None)[source]

Gets or creates a table based on create and write dispositions.

The function mimics the behavior of BigQuery import jobs when using the same create and write dispositions.

Parameters:
  • project_id – The project id owning the table.
  • dataset_id – The dataset id owning the table.
  • table_id – The table id.
  • schema – A bigquery.TableSchema instance or None.
  • create_disposition – CREATE_NEVER or CREATE_IF_NEEDED.
  • write_disposition – WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE.
Returns:

A bigquery.Table instance if table was found or created.

Raises:

RuntimeError – For various mismatches between the state of the table and the create/write dispositions passed in. For example if the table is not empty and WRITE_EMPTY was specified then an error will be raised since the table was expected to be empty.

run_query(project_id, query, use_legacy_sql, flatten_results, dry_run=False, job_labels=None)[source]
insert_rows(project_id, dataset_id, table_id, rows, insert_ids=None, skip_invalid_rows=False)[source]

Inserts rows into the specified table.

Parameters:
  • project_id – The project id owning the table.
  • dataset_id – The dataset id owning the table.
  • table_id – The table id.
  • rows – A list of plain Python dictionaries. Each dictionary is a row and each key in it is the name of a field.
  • skip_invalid_rows – If there are rows with insertion errors, whether they should be skipped, and all others should be inserted successfully.
Returns:

A tuple (bool, errors). If first element is False then the second element will be a bigquery.InserttErrorsValueListEntry instance containing specific errors.

convert_row_to_dict(row, schema)[source]

Converts a TableRow instance using the schema to a Python dict.

class apache_beam.io.gcp.bigquery_tools.BigQueryReader(source, test_bigquery_client=None, use_legacy_sql=True, flatten_results=True, kms_key=None)[source]

Bases: apache_beam.runners.dataflow.native_io.iobase.NativeSourceReader

A reader for a BigQuery source.

class apache_beam.io.gcp.bigquery_tools.BigQueryWriter(sink, test_bigquery_client=None, buffer_size=None)[source]

Bases: apache_beam.runners.dataflow.native_io.iobase.NativeSinkWriter

The sink writer for a BigQuerySink.

Write(row)[source]
class apache_beam.io.gcp.bigquery_tools.RowAsDictJsonCoder[source]

Bases: apache_beam.coders.coders.Coder

A coder for a table row (represented as a dict) to/from a JSON string.

This is the default coder for sources and sinks if the coder argument is not specified.

encode(table_row)[source]
decode(encoded_table_row)[source]
to_type_hint()[source]
class apache_beam.io.gcp.bigquery_tools.JsonRowWriter(file_handle)[source]

Bases: io.IOBase

A writer which provides an IOBase-like interface for writing table rows (represented as dicts) as newline-delimited JSON strings.

Initialize an JsonRowWriter.

Parameters:file_handle (io.IOBase) – Output stream to write to.
close()[source]
closed
flush()[source]
read(size=-1)[source]
tell()[source]
writable()[source]
write(row)[source]
class apache_beam.io.gcp.bigquery_tools.AvroRowWriter(file_handle, schema)[source]

Bases: io.IOBase

A writer which provides an IOBase-like interface for writing table rows (represented as dicts) as Avro records.

Initialize an AvroRowWriter.

Parameters:
  • file_handle (io.IOBase) – Output stream to write Avro records to.
  • schema (Dict[Text, Any]) – BigQuery table schema.
close()[source]
closed
flush()[source]
read(size=-1)[source]
tell()[source]
writable()[source]
write(row)[source]
class apache_beam.io.gcp.bigquery_tools.RetryStrategy[source]

Bases: object

RETRY_ALWAYS = 'RETRY_ALWAYS'
RETRY_NEVER = 'RETRY_NEVER'
RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'
static should_retry(strategy, error_message)[source]
class apache_beam.io.gcp.bigquery_tools.AppendDestinationsFn(destination)[source]

Bases: apache_beam.transforms.core.DoFn

Adds the destination to an element, making it a KV pair.

Outputs a PCollection of KV-pairs where the key is a TableReference for the destination, and the value is the record itself.

Experimental; no backwards compatibility guarantees.

display_data()[source]
process(element, *side_inputs)[source]
apache_beam.io.gcp.bigquery_tools.get_table_schema_from_string(schema)[source]

Transform the string table schema into a TableSchema instance.

Parameters:schema (str) – The sting schema to be used if the BigQuery table to write has to be created.
Returns:The schema to be used if the BigQuery table to write has to be created but in the TableSchema format.
Return type:TableSchema
apache_beam.io.gcp.bigquery_tools.table_schema_to_dict(table_schema)[source]

Create a dictionary representation of table schema for serialization

apache_beam.io.gcp.bigquery_tools.get_dict_table_schema(schema)[source]

Transform the table schema into a dictionary instance.

Parameters:schema (str, dict, TableSchema) – The schema to be used if the BigQuery table to write has to be created. This can either be a dict or string or in the TableSchema format.
Returns:The schema to be used if the BigQuery table to write has to be created but in the dictionary format.
Return type:Dict[str, Any]
apache_beam.io.gcp.bigquery_tools.get_avro_schema_from_table_schema(schema)[source]

Transform the table schema into an Avro schema.

Parameters:schema (str, dict, TableSchema) – The TableSchema to convert to Avro schema. This can either be a dict or string or in the TableSchema format.
Returns:An Avro schema, which can be used by fastavro.
Return type:Dict[str, Any]
class apache_beam.io.gcp.bigquery_tools.BigQueryJobTypes[source]

Bases: object

EXPORT = 'EXPORT'
COPY = 'COPY'
LOAD = 'LOAD'
QUERY = 'QUERY'
apache_beam.io.gcp.bigquery_tools.generate_bq_job_name(job_name, step_id, job_type, random=None)[source]
apache_beam.io.gcp.bigquery_tools.check_schema_equal(left, right, *, ignore_descriptions=False, ignore_field_order=False)[source]

Check whether schemas are equivalent.

This comparison function differs from using == to compare TableSchema because it ignores categories, policy tags, descriptions (optionally), and field ordering (optionally).

Parameters:
  • left (TableSchema, TableFieldSchema) – One schema to compare.
  • right (TableSchema, TableFieldSchema) – The other schema to compare.
  • ignore_descriptions (bool) – (optional) Whether or not to ignore field descriptions when comparing. Defaults to False.
  • ignore_field_order (bool) – (optional) Whether or not to ignore struct field order when comparing. Defaults to False.
Returns:

True if the schemas are equivalent, False otherwise.

Return type:

bool