apache_beam.io.gcp.bigquery_file_loads module

Functionality to perform file loads into BigQuery for Batch and Streaming pipelines.

This source is able to work around BigQuery load quotas and limitations. When destinations are dynamic, or when data for a single job is too large, the data will be split into multiple jobs.

NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
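
The classes below are internal building blocks; they are normally reached through beam.io.WriteToBigQuery by selecting the FILE_LOADS write method. A minimal usage sketch of that public entry point, with placeholder project, dataset, table, and bucket names:

    import apache_beam as beam

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([{'name': 'alice', 'score': 10},
                           {'name': 'bob', 'score': 7}])
            | beam.io.WriteToBigQuery(
                'my-project:my_dataset.scores',  # placeholder table spec
                schema='name:STRING,score:INTEGER',
                method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                custom_gcs_temp_location='gs://my-temp-bucket/bq_loads',  # placeholder
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))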

apache_beam.io.gcp.bigquery_file_loads.file_prefix_generator(with_validation=True, pipeline_gcs_location=None, temp_location=None)[source]
class apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile(schema, max_files_per_bundle=20, max_file_size=4398046511104, file_format=None)[source]

Bases: apache_beam.transforms.core.DoFn

Write input records to files before triggering a load job.

This transform keeps up to max_files_per_bundle files open to write to. It receives (destination, record) tuples, and it writes the records to different files for each destination.

If there are more than max_files_per_bundle destinations that we need to write to, then those records are grouped by their destination, and later written to files by WriteGroupedRecordsToFile.

It outputs two PCollections: one with metadata for the files that were written (tagged WrittenFiles), and one with the records that were spilled because too many destinations were already open (tagged UnwrittenRecords).

Initialize a WriteRecordsToFile.

Parameters:
  • max_files_per_bundle (int) – The maximum number of files that can be kept open during execution of this step in a worker. This is to avoid overwhelming the worker memory.
  • max_file_size (int) – The maximum size in bytes for a file to be used in an export job.
UNWRITTEN_RECORD_TAG = 'UnwrittenRecords'
WRITTEN_FILE_TAG = 'WrittenFiles'
display_data()[source]
start_bundle()[source]
process(element, file_prefix, *schema_side_inputs)[source]

Take a tuple with (destination, row) and write to file or spill out.

Destination may be a TableReference or a string, and row is a Python dictionary for a row to be inserted into BigQuery.

finish_bundle()[source]
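
Below is a minimal sketch of how the two tagged outputs might be consumed, loosely following the wiring that BigQueryBatchFileLoads sets up internally; the destinations, the bucket, and the schema=None shortcut are placeholders/assumptions, not the module's exact configuration:

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery_file_loads import WriteRecordsToFile

    with beam.Pipeline() as p:
        destination_rows = p | 'Rows' >> beam.Create([
            ('my-project:my_dataset.table_a', {'name': 'alice', 'score': 10}),  # placeholders
            ('my-project:my_dataset.table_b', {'name': 'bob', 'score': 7}),
        ])
        file_prefix = p | 'Prefix' >> beam.Create(['gs://my-temp-bucket/bq_load'])  # placeholder

        outputs = destination_rows | 'WriteToFiles' >> beam.ParDo(
            WriteRecordsToFile(schema=None),  # schema=None is an assumption for JSON temp files
            file_prefix=beam.pvalue.AsSingleton(file_prefix)
        ).with_outputs(
            WriteRecordsToFile.UNWRITTEN_RECORD_TAG,
            WriteRecordsToFile.WRITTEN_FILE_TAG)

        # Files written per destination ...
        written_files = outputs[WriteRecordsToFile.WRITTEN_FILE_TAG]
        # ... and (destination, record) pairs spilled because too many destinations
        # were open; these are grouped by destination and handled by
        # WriteGroupedRecordsToFile.
        spilled_records = outputs[WriteRecordsToFile.UNWRITTEN_RECORD_TAG]
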
class apache_beam.io.gcp.bigquery_file_loads.WriteGroupedRecordsToFile(schema, max_file_size=4398046511104, file_format=None)[source]

Bases: apache_beam.transforms.core.DoFn

Receives a collection of (destination, iterable of records) pairs and writes the records to files.

This is different from WriteRecordsToFile because it receives records grouped by destination. This means that it’s not necessary to keep multiple file descriptors open, because we know for sure when records for a single destination have been written out.

Experimental; no backwards compatibility guarantees.

process(element, file_prefix, *schema_side_inputs)[source]
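
A sketch of this grouped path, assuming the spilled records have already been keyed and grouped by destination (placeholder names as before):

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery_file_loads import WriteGroupedRecordsToFile

    with beam.Pipeline() as p:
        file_prefix = p | 'Prefix' >> beam.Create(['gs://my-temp-bucket/bq_load'])  # placeholder
        grouped = (
            p
            | 'Spilled' >> beam.Create([
                ('my-project:my_dataset.table_a', {'name': 'alice', 'score': 10}),
                ('my-project:my_dataset.table_a', {'name': 'bob', 'score': 7}),
            ])
            | beam.GroupByKey())  # one (destination, iterable of records) element per destination

        written_files = grouped | beam.ParDo(
            WriteGroupedRecordsToFile(schema=None),  # schema=None is an assumption
            file_prefix=beam.pvalue.AsSingleton(file_prefix))
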
class apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs(create_disposition=None, write_disposition=None, test_client=None, step_name=None)[source]

Bases: apache_beam.transforms.core.DoFn

Launches jobs to copy from temporary tables into the main target table.

When a job needs to write to multiple destination tables, or when a single destination table needs to have multiple load jobs to write to it, files are loaded into temporary tables, and those tables are later copied to the destination tables.

This transform emits (destination, job_reference) pairs.

TODO(BEAM-7822): In the file loads method of writing to BigQuery, copying from temp_tables to destination_table is not atomic. See: https://issues.apache.org/jira/browse/BEAM-7822
display_data()[source]
start_bundle()[source]
process(element, job_name_prefix=None)[source]
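
The DoFn itself goes through Beam's internal BigQuery wrapper; purely as an illustration of what one copy step amounts to, a roughly equivalent job issued with the google-cloud-bigquery client (placeholder table names) would be:

    from google.cloud import bigquery

    client = bigquery.Client(project='my-project')  # placeholder project

    # Copy one temporary table into the final destination, appending to any
    # rows already there; the transform issues one such job per temporary table.
    copy_job = client.copy_table(
        'my-project.my_dataset.temp_table_1',  # placeholder temporary table
        'my-project.my_dataset.scores',        # placeholder destination table
        job_config=bigquery.CopyJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND))
    copy_job.result()  # block until the copy job finishes
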
class apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs(schema=None, create_disposition=None, write_disposition=None, test_client=None, temporary_tables=False, additional_bq_parameters=None, source_format=None, step_name=None)[source]

Bases: apache_beam.transforms.core.DoFn

Triggers the import jobs to BQ.

Experimental; no backwards compatibility guarantees.

TEMP_TABLES = 'TemporaryTables'
display_data()[source]
start_bundle()[source]
process(element, load_job_name_prefix, *schema_side_inputs)[source]
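
As with TriggerCopyJobs, the actual submission goes through Beam's internal wrapper; as a rough illustration of the load job triggered for one (destination, files) pair, expressed with the google-cloud-bigquery client (placeholder names, JSON temp files assumed):

    from google.cloud import bigquery

    client = bigquery.Client(project='my-project')  # placeholder project

    load_job = client.load_table_from_uri(
        ['gs://my-temp-bucket/bq_load/file-00000.json'],  # placeholder temp file
        'my-project.my_dataset.scores',                   # destination (or a temporary table)
        job_config=bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            schema=[bigquery.SchemaField('name', 'STRING'),
                    bigquery.SchemaField('score', 'INTEGER')]))
    load_job.result()  # block until the load job finishes
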
class apache_beam.io.gcp.bigquery_file_loads.PartitionFiles(max_partition_size, max_files_per_partition)[source]

Bases: apache_beam.transforms.core.DoFn

MULTIPLE_PARTITIONS_TAG = 'MULTIPLE_PARTITIONS'
SINGLE_PARTITION_TAG = 'SINGLE_PARTITION'
class Partition(max_size, max_files, files=None, size=0)[source]

Bases: object

can_accept(file_size, no_of_files=1)[source]
add(file_path, file_size)[source]
process(element)[source]
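
PartitionFiles groups each destination's files into partitions bounded by max_partition_size bytes and max_files_per_partition files; destinations whose files fit in one partition are emitted under SINGLE_PARTITION, while destinations needing several partitions are emitted under MULTIPLE_PARTITIONS (these are the ones that, per TriggerCopyJobs above, get loaded into temporary tables and then copied). An illustrative, hypothetical re-implementation of the Partition bookkeeping, not the module's actual code:

    # Hypothetical sketch of the size/count bookkeeping implied by
    # Partition(max_size, max_files, files=None, size=0) and its methods.
    class PartitionSketch(object):
        def __init__(self, max_size, max_files, files=None, size=0):
            self.max_size = max_size    # byte budget for one load job
            self.max_files = max_files  # file-count budget for one load job
            self.files = files if files is not None else []
            self.size = size

        def can_accept(self, file_size, no_of_files=1):
            # A file fits if it keeps the partition within both budgets.
            return (self.size + file_size <= self.max_size
                    and len(self.files) + no_of_files <= self.max_files)

        def add(self, file_path, file_size):
            self.files.append(file_path)
            self.size += file_size
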
class apache_beam.io.gcp.bigquery_file_loads.WaitForBQJobs(test_client=None)[source]

Bases: apache_beam.transforms.core.DoFn

Takes in a series of BQ job names as side input, and waits for all of them.

If any job fails, it will fail. If all jobs succeed, it will succeed.

Experimental; no backwards compatibility guarantees.

start_bundle()[source]
process(element, dest_ids_list)[source]
class apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn(test_client=None)[source]

Bases: apache_beam.transforms.core.DoFn

start_bundle()[source]
process(table_reference)[source]
class apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads(destination, schema=None, custom_gcs_temp_location=None, create_disposition=None, write_disposition=None, triggering_frequency=None, temp_file_format=None, max_file_size=None, max_files_per_bundle=None, max_partition_size=None, max_files_per_partition=None, additional_bq_parameters=None, table_side_inputs=None, schema_side_inputs=None, test_client=None, validate=True, is_streaming_pipeline=False)[source]

Bases: apache_beam.transforms.ptransform.PTransform

Takes in a set of elements and inserts them into BigQuery via batch load jobs.

DESTINATION_JOBID_PAIRS = 'destination_load_jobid_pairs'
DESTINATION_FILE_PAIRS = 'destination_file_pairs'
DESTINATION_COPY_JOBID_PAIRS = 'destination_copy_jobid_pairs'
COUNT = 0
verify()[source]
expand(pcoll)[source]
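
A hedged sketch of applying the transform directly (in practice it is usually reached through beam.io.WriteToBigQuery with the FILE_LOADS method); project, dataset, table, and bucket names are placeholders, and the result is assumed to be addressable with the DESTINATION_* constants above:

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery_file_loads import BigQueryBatchFileLoads

    with beam.Pipeline() as p:
        rows = p | beam.Create([{'name': 'alice', 'score': 10}])

        result = rows | BigQueryBatchFileLoads(
            destination='my-project:my_dataset.scores',               # placeholder
            schema={'fields': [
                {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
                {'name': 'score', 'type': 'INTEGER', 'mode': 'NULLABLE'}]},
            custom_gcs_temp_location='gs://my-temp-bucket/bq_loads',  # placeholder
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

        # Assumed: the expanded transform exposes (destination, load job id)
        # pairs under DESTINATION_JOBID_PAIRS.
        job_pairs = result[BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS]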