apache_beam.io.gcp.bigquery module
BigQuery sources and sinks.
This module implements reading from and writing to BigQuery tables. It relies on several classes exposed by the BigQuery API: TableSchema, TableFieldSchema, TableRow, and TableCell. The default mode is to return table rows read from a BigQuery source as dictionaries. Similarly a Write transform to a BigQuerySink accepts PCollections of dictionaries. This is done for more convenient programming. If desired, the native TableRow objects can be used throughout to represent rows (use an instance of TableRowJsonCoder as a coder argument when creating the sources or sinks respectively).
Also, for programming convenience, instances of TableReference and TableSchema have a string representation that can be used for the corresponding arguments:
- TableReference can be a PROJECT:DATASET.TABLE or DATASET.TABLE string.
- TableSchema can be a NAME:TYPE{,NAME:TYPE}* string (e.g. ‘month:STRING,event_count:INTEGER’).
The syntax supported is described here: https://cloud.google.com/bigquery/bq-command-line-tool-quickstart
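To make the two accepted table reference forms concrete, here is a minimal plain-Python sketch that splits such a string into its parts. The names `TableRef` and `parse_table_reference`, and the regular expression itself, are assumptions made for illustration; this is not the parser the module uses.

```python
import re
from typing import NamedTuple, Optional


class TableRef(NamedTuple):
    """Hypothetical container for the parts of a table reference."""
    project: Optional[str]
    dataset: str
    table: str


# Optional "PROJECT:" prefix, followed by "DATASET.TABLE".
# The pattern is an illustrative assumption, not the module's own.
_TABLE_REF = re.compile(
    r'^(?:(?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>[\w$]+)$')


def parse_table_reference(text: str) -> TableRef:
    """Split 'PROJECT:DATASET.TABLE' or 'DATASET.TABLE' into its parts."""
    match = _TABLE_REF.match(text)
    if match is None:
        raise ValueError(
            'Expected PROJECT:DATASET.TABLE or DATASET.TABLE, got %r' % text)
    return TableRef(
        match.group('project'), match.group('dataset'), match.group('table'))


full = parse_table_reference('my-project:my_dataset.my_table')
short = parse_table_reference('my_dataset.my_table')
```

In the short form the project part is `None`, which mirrors how the `project` argument may be left unset when the reference does not name a project.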
BigQuery sources can be used as main inputs or side inputs. A main input (the common case) is expected to be massive, so it is split into manageable chunks that are processed in parallel. Side inputs are expected to be small and are read completely every time a ParDo DoFn gets executed. In the example below, the lambda function implementing the DoFn for the Map transform receives, on each call, one row of the main table and all rows of the side table. The runner may use caching techniques to share the side inputs between calls in order to avoid excessive reading:
    import apache_beam as beam
    from apache_beam.pvalue import AsList

    # Each BigQuerySource(...) needs a table or a query; the arguments are elided here.
    main_table = pipeline | 'VeryBig' >> beam.io.Read(beam.io.BigQuerySource(...))
    side_table = pipeline | 'NotBig' >> beam.io.Read(beam.io.BigQuerySource(...))
    results = (
        main_table
        | 'ProcessData' >> beam.Map(
            lambda element, side_input: ..., AsList(side_table)))
In pipeline code, main and side inputs are declared the same way: both are created with a Read transform over a BigQuerySource. What makes the side_table a ‘side input’ is the AsList wrapper used when passing the table as a parameter to the Map transform. AsList signals to the execution framework that its input should be made available whole.
Internally, however, main and side inputs are implemented differently. Reading a BigQuery table as a main input entails exporting the table to a set of GCS files (currently in JSON format) and then processing those files. Reading the same table as a side input entails querying the table for all its rows. The coder argument on BigQuerySource controls how the lines in the export files are read (i.e., how a JSON object is transformed into a PCollection element). The coder is not involved when the same table is read as a side input, since there is no intermediate format: the table rows are obtained directly from the BigQuery service with a query.
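The side-input semantics described above can be modelled in plain Python, without Beam, as a rough mental picture: the function is called once per main row and receives the complete side collection each time. The helper `map_with_side_input` and the sample rows are hypothetical; a real runner distributes the main input and may cache the side input rather than holding plain lists.

```python
def map_with_side_input(main_rows, side_rows, fn):
    """Call fn(row, side_list) once per main row.

    side_rows is materialised a single time and passed whole to every
    call, which is the behaviour the AsList wrapper asks for.
    """
    side_list = list(side_rows)
    return [fn(row, side_list) for row in main_rows]


# Hypothetical sample data standing in for two BigQuery tables.
main_rows = [
    {'station': 'a', 'mean_temp': 10},
    {'station': 'b', 'mean_temp': 20},
]
side_rows = [
    {'station': 'a', 'name': 'Alpha'},
    {'station': 'b', 'name': 'Beta'},
]


def add_station_name(row, stations):
    """Enrich one main row using the full side table."""
    names = {s['station']: s['name'] for s in stations}
    return dict(row, name=names[row['station']])


results = map_with_side_input(main_rows, side_rows, add_station_name)
```

Because the whole side table is handed to every call, this pattern only makes sense when the side table is small, as the text notes.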
Users may provide a query to read from rather than reading all of a BigQuery table. If a query is specified, the result of executing it is used as the data of the input transform:
    query_results = pipeline | beam.io.Read(beam.io.BigQuerySource(
        query='SELECT year, mean_temp FROM samples.weather_stations'))
When creating a BigQuery input transform, users should provide either a query or a table. Pipeline construction will fail with a validation error if neither or both are specified.
Time partitioned tables
The BigQuery sink does not currently fully support writing to BigQuery time partitioned tables. Writing to a single partition may still work if it does not involve creating a new table (for example, when writing to an existing table with create_disposition=CREATE_NEVER and write_disposition=WRITE_APPEND). The BigQuery source supports reading from a single time partition when the partition decorator is specified as part of the table identifier.
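As an illustration of a partition decorator, the sketch below builds a table identifier that addresses one daily partition. It assumes a day-partitioned table and BigQuery's `$YYYYMMDD` decorator suffix; the helper name `partition_table_id` and the table name are hypothetical.

```python
import datetime


def partition_table_id(table_id: str, day: datetime.date) -> str:
    """Append a daily partition decorator ($YYYYMMDD) to a table ID."""
    return '%s$%s' % (table_id, day.strftime('%Y%m%d'))


# 'my_dataset.events' is a placeholder table reference.
single_partition = partition_table_id(
    'my_dataset.events', datetime.date(2018, 9, 1))
```

The resulting string (`my_dataset.events$20180901`) is the kind of value that would be passed as the table identifier when reading a single partition.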
Short introduction to BigQuery concepts
Tables have rows (TableRow) and each row has cells (TableCell). A table has a schema (TableSchema), which in turn describes the schema of each cell (TableFieldSchema). The terms field and cell are used interchangeably.
- TableSchema: Describes the schema (types and order) for values in each row. Has one attribute, ‘fields’, which is a list of TableFieldSchema objects.
- TableFieldSchema: Describes the schema (type, name) for one field. Has several attributes, including ‘name’ and ‘type’. Common values for the type attribute are: ‘STRING’, ‘INTEGER’, ‘FLOAT’, ‘BOOLEAN’, ‘NUMERIC’. All possible values are described at: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
- TableRow: Holds all values in a table row. Has one attribute, ‘f’, which is a list of TableCell instances.
- TableCell: Holds the value for one cell (or field). Has one attribute, ‘v’, which is a JsonValue instance. JsonValue is defined in the apitools.base.py.extra_types module.
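The relationship between these four concepts can be sketched with plain dictionaries standing in for the API classes. The keys `fields`, `f`, and `v` follow the attribute names of the BigQuery API objects; the helper `row_to_dict` and the sample values are hypothetical, and the real classes are apitools message objects rather than dicts.

```python
# Stand-in for a TableSchema: an ordered list of field descriptions.
schema = {
    'fields': [
        {'name': 'month', 'type': 'STRING'},
        {'name': 'event_count', 'type': 'INTEGER'},
    ]
}

# Stand-in for a TableRow: cells carry values only, not field names.
row = {'f': [{'v': 'january'}, {'v': 42}]}


def row_to_dict(row, schema):
    """Pair each cell value with the field name at the same position."""
    names = [field['name'] for field in schema['fields']]
    cells = row['f']
    if len(cells) != len(names):
        raise ValueError(
            'Row has %d cells but schema has %d fields'
            % (len(cells), len(names)))
    return {name: cell['v'] for name, cell in zip(names, cells)}


as_dict = row_to_dict(row, schema)
```

The dictionary form is what the default mode of this module hands to user code, which is why the schema's field order matters whenever native rows are involved.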
As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits).
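To show what precision 38 and scale 9 mean in practice, the following sketch uses Python's `decimal` module to check whether a value can be represented exactly within those limits. The function `fits_numeric` is a hypothetical helper for illustration; it says nothing about how BigQuery or Beam handle values that do not fit.

```python
from decimal import Decimal, localcontext

NUMERIC_PRECISION = 38  # total number of decimal digits
NUMERIC_SCALE = 9       # digits after the decimal point


def fits_numeric(value: Decimal) -> bool:
    """Return True if value is exactly representable with the limits above."""
    if not value.is_finite():
        return False
    with localcontext() as ctx:
        # Work with more precision than NUMERIC so the checks are exact.
        ctx.prec = 2 * NUMERIC_PRECISION
        # At most 38 - 9 = 29 digits before the decimal point.
        limit = Decimal(10) ** (NUMERIC_PRECISION - NUMERIC_SCALE)
        if abs(value) >= limit:
            return False
        # Rounding to 9 decimal places must not change the value.
        return value == value.quantize(Decimal(1).scaleb(-NUMERIC_SCALE))


ok = fits_numeric(Decimal('123.456'))
too_fine = fits_numeric(Decimal('0.1234567891'))  # 10 decimal places
too_large = fits_numeric(Decimal('1e29'))         # 30 integer digits
```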
class apache_beam.io.gcp.bigquery.TableRowJsonCoder(table_schema=None)
Bases: apache_beam.coders.coders.Coder
A coder for a TableRow instance to/from a JSON string.
Note that the encoding operation (used when writing to sinks) requires the table schema in order to obtain the ordered list of field names. Reading from sources on the other hand does not need the table schema.
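The asymmetry noted above can be sketched in plain Python: a row's cells hold values without names, so producing a JSON object needs the ordered field names from the schema, while reading a JSON object back yields the values in order without any schema. The functions `encode_row` and `decode_row` are hypothetical stand-ins that work on plain lists, not on TableRow objects, and are not the coder's actual code.

```python
import json


def encode_row(cell_values, field_names):
    """Serialise cell values to a JSON object; field names are required."""
    if len(cell_values) != len(field_names):
        raise ValueError('Number of cells does not match number of fields')
    return json.dumps(dict(zip(field_names, cell_values)))


def decode_row(line):
    """Recover the ordered cell values from a JSON line; no schema needed."""
    return list(json.loads(line).values())


line = encode_row(['january', 42], ['month', 'event_count'])
values = decode_row(line)
```

Decoding relies on JSON objects being loaded in document order, which Python dictionaries preserve.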
class apache_beam.io.gcp.bigquery.BigQueryDisposition
Bases: future.types.newobject.newobject
Class holding standard strings used for create and write dispositions.
- CREATE_NEVER = 'CREATE_NEVER'
- CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
- WRITE_TRUNCATE = 'WRITE_TRUNCATE'
- WRITE_APPEND = 'WRITE_APPEND'
- WRITE_EMPTY = 'WRITE_EMPTY'
class apache_beam.io.gcp.bigquery.BigQuerySource(table=None, dataset=None, project=None, query=None, validate=False, coder=None, use_standard_sql=False, flatten_results=True)
Bases: apache_beam.runners.dataflow.native_io.iobase.NativeSource
A source based on a BigQuery table.
Initialize a BigQuerySource.
Parameters:
- table (str) – The ID of a BigQuery table. If specified, all data of the table will be used as input of the current source. The ID must contain only letters a-z, A-Z, numbers 0-9, or underscores _. If the dataset and query arguments are None, then the table argument must contain the entire table reference specified as 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
- dataset (str) – The ID of the dataset containing this table, or None if the table reference is specified entirely by the table argument or a query is specified.
- project (str) – The ID of the project containing this table, or None if the table reference is specified entirely by the table argument or a query is specified.
- query (str) – A query to be used instead of the arguments table, dataset, and project.
- validate (bool) – If True, various checks will be done when the source gets initialized (e.g., is the table present?). This should be True for most scenarios in order to catch errors as early as possible (at pipeline construction instead of pipeline execution). It should be False if the table is created during pipeline execution by a previous step.
- coder (Coder) – The coder for the table rows if serialized to disk. If None, the default coder is RowAsDictJsonCoder, which will interpret every line in a file as a JSON serialized dictionary. This argument needs a value only in special cases when returning table rows as dictionaries is not desirable.
- use_standard_sql (bool) – Specifies whether to use BigQuery’s standard SQL dialect for this query. The default value is False. If set to True, the query will use BigQuery’s updated SQL dialect with improved standards compliance. This parameter is ignored for table inputs.
- flatten_results (bool) – Flattens all nested and repeated fields in the query results. The default value is True.
Raises: ValueError – if any of the following is true:
- the table reference as a string does not match the expected format
- neither a table nor a query is specified
- both a table and a query are specified.
format
Source format name required for remote execution.
class apache_beam.io.gcp.bigquery.BigQuerySink(table, dataset=None, project=None, schema=None, create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_EMPTY', validate=False, coder=None)
Bases: apache_beam.runners.dataflow.native_io.iobase.NativeSink
A sink based on a BigQuery table.
Initialize a BigQuerySink.
Parameters:
- table (str) – The ID of the table. The ID must contain only letters a-z, A-Z, numbers 0-9, or underscores _. If the dataset argument is None, then the table argument must contain the entire table reference specified as 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
- dataset (str) – The ID of the dataset containing this table, or None if the table reference is specified entirely by the table argument.
- project (str) – The ID of the project containing this table, or None if the table reference is specified entirely by the table argument.
- schema (str) – The schema to be used if the BigQuery table to write has to be created. This can be specified either as a TableSchema object or as a single string of the form 'field1:type1,field2:type2,field3:type3' that defines a comma separated list of fields. Here 'type' should specify the BigQuery type of the field. Single string based schemas do not support nested fields, repeated fields, or specifying a BigQuery mode for fields (mode will always be set to 'NULLABLE').
- create_disposition (BigQueryDisposition) – A string describing what happens if the table does not exist. Possible values are:
  - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.
  - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.
- write_disposition (BigQueryDisposition) – A string describing what happens if the table has already some data. Possible values are:
  - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
  - BigQueryDisposition.WRITE_APPEND: add to existing rows.
  - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
- validate (bool) – If True, various checks will be done when the sink gets initialized (e.g., is the table present given the disposition arguments?). This should be True for most scenarios in order to catch errors as early as possible (at pipeline construction instead of pipeline execution). It should be False if the table is created during pipeline execution by a previous step.
- coder (Coder) – The coder for the table rows if serialized to disk. If None, the default coder is RowAsDictJsonCoder, which will interpret every element written to the sink as a dictionary that will be JSON serialized as a line in a file. This argument needs a value only in special cases when writing table rows as dictionaries is not desirable.
Raises:
- TypeError – if the schema argument is not a str or a TableSchema object.
- ValueError – if the table reference as a string does not match the expected format.
format
Sink format name required for remote execution.
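The way the create and write dispositions combine can be summarised as a small decision function. This is a sketch of the rules as stated in the parameter descriptions above, not the sink's implementation; `plan_write`, its arguments, and the step names it returns are hypothetical.

```python
CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
CREATE_NEVER = 'CREATE_NEVER'
WRITE_TRUNCATE = 'WRITE_TRUNCATE'
WRITE_APPEND = 'WRITE_APPEND'
WRITE_EMPTY = 'WRITE_EMPTY'


def plan_write(table_exists, table_is_empty,
               create_disposition, write_disposition):
    """Return the (hypothetical) steps a write would take, or raise."""
    if create_disposition not in (CREATE_IF_NEEDED, CREATE_NEVER):
        raise ValueError('Unknown create disposition: %r' % create_disposition)
    if write_disposition not in (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY):
        raise ValueError('Unknown write disposition: %r' % write_disposition)

    if not table_exists:
        if create_disposition == CREATE_NEVER:
            raise RuntimeError('Table does not exist and CREATE_NEVER is set')
        # A freshly created table is empty, so any write disposition is fine.
        return ['create_table', 'insert_rows']

    if write_disposition == WRITE_EMPTY and not table_is_empty:
        raise RuntimeError('Table is not empty and WRITE_EMPTY is set')
    if write_disposition == WRITE_TRUNCATE:
        return ['delete_existing_rows', 'insert_rows']
    # WRITE_APPEND, or WRITE_EMPTY on an empty table.
    return ['insert_rows']


steps = plan_write(True, False, CREATE_NEVER, WRITE_TRUNCATE)
```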
class apache_beam.io.gcp.bigquery.WriteToBigQuery(table, dataset=None, project=None, schema=None, create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_APPEND', batch_size=None, test_client=None)
Bases: apache_beam.transforms.ptransform.PTransform
Initialize a WriteToBigQuery transform.
Parameters:
- table (str) – The ID of the table. The ID must contain only letters a-z, A-Z, numbers 0-9, or underscores _. If the dataset argument is None, then the table argument must contain the entire table reference specified as 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
- dataset (str) – The ID of the dataset containing this table, or None if the table reference is specified entirely by the table argument.
- project (str) – The ID of the project containing this table, or None if the table reference is specified entirely by the table argument.
- schema (str) – The schema to be used if the BigQuery table to write has to be created. This can be specified either as a TableSchema object or as a single string of the form 'field1:type1,field2:type2,field3:type3' that defines a comma separated list of fields. Here 'type' should specify the BigQuery type of the field. Single string based schemas do not support nested fields, repeated fields, or specifying a BigQuery mode for fields (mode will always be set to 'NULLABLE').
- create_disposition (BigQueryDisposition) – A string describing what happens if the table does not exist. Possible values are:
  - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.
  - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.
- write_disposition (BigQueryDisposition) – A string describing what happens if the table has already some data. Possible values are:
  - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
  - BigQueryDisposition.WRITE_APPEND: add to existing rows.
  - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
  For streaming pipelines, WRITE_TRUNCATE cannot be used.
- batch_size (int) – Number of rows to be written to BQ per streaming API insert.
- test_client – Override the default bigquery client used for testing.
static get_table_schema_from_string(schema)
Transform the string table schema into a TableSchema instance.
Parameters: schema (str) – The string schema to be used if the BigQuery table to write has to be created.
Returns: The schema to be used if the BigQuery table to write has to be created, but in the TableSchema format.
Return type: TableSchema
static table_schema_to_dict(table_schema)
Create a dictionary representation of the table schema for serialization.
static get_dict_table_schema(schema)
Transform the table schema into a dictionary instance.
Parameters: schema (TableSchema) – The schema to be used if the BigQuery table to write has to be created. This can be a dict, a string, or a TableSchema object.
Returns: The schema to be used if the BigQuery table to write has to be created, but in the dictionary format.
Return type: Dict[str, Any]
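As a rough picture of what converting a single-string schema involves, the sketch below turns 'field1:type1,field2:type2' into a dictionary with one entry per field and the mode fixed to 'NULLABLE', as the schema parameter description states. The function `schema_string_to_dict` and the exact dictionary layout are assumptions for illustration; they are not the output of the static methods documented above.

```python
def schema_string_to_dict(schema):
    """Convert 'name:TYPE,name:TYPE' into a dict with a 'fields' list."""
    fields = []
    for entry in schema.split(','):
        name, separator, field_type = entry.strip().partition(':')
        if not separator or not name or not field_type:
            raise ValueError('Expected NAME:TYPE, got %r' % entry)
        fields.append({
            'name': name.strip(),
            'type': field_type.strip(),
            # String schemas cannot express a mode, so it is always NULLABLE.
            'mode': 'NULLABLE',
        })
    return {'fields': fields}


schema_dict = schema_string_to_dict('month:STRING,event_count:INTEGER')
```

Nested or repeated fields cannot be written in this string form, which is why a TableSchema object is needed for them.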