Class BigQueryIO

java.lang.Object
org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO

public class BigQueryIO extends Object
PTransforms for reading and writing BigQuery tables.

Table References

A fully-qualified BigQuery table name consists of three components:

  • projectId: the Cloud project id (defaults to GcpOptions.getProject()).
  • datasetId: the BigQuery dataset id, unique within a project.
  • tableId: a table id, unique within a dataset.

BigQuery table references are stored as a TableReference, which comes from the BigQuery Java Client API. Tables can be referred to as Strings, with or without the projectId. A helper function is provided (BigQueryHelpers.parseTableSpec(String)) that parses the following string forms into a TableReference:

  • [project_id]:[dataset_id].[table_id]
  • [dataset_id].[table_id]
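
For example, a minimal sketch of parsing such a table spec string (the table name here is illustrative):

 TableReference ref =
     BigQueryHelpers.parseTableSpec("my-project:my_dataset.my_table");
 String project = ref.getProjectId();  // "my-project"
 String dataset = ref.getDatasetId();  // "my_dataset"
 String table = ref.getTableId();      // "my_table"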

BigQuery Concepts

Tables have rows (TableRow) and each row has cells (TableCell). A table has a schema (TableSchema), which in turn describes the schema of each cell (TableFieldSchema). The terms field and cell are used interchangeably.

TableSchema: describes the schema (types and order) for values in each row. It has one attribute, 'fields', which is a list of TableFieldSchema objects.

TableFieldSchema: describes the schema (type, name) for one field. It has several attributes, including 'name' and 'type'. Common values for the type attribute are: 'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'NUMERIC', 'GEOGRAPHY'. All possible values are described at: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types

TableRow: Holds all values in a table row. Has one attribute, 'f', which is a list of TableCell instances.

TableCell: Holds the value for one cell (or field). Has one attribute, 'v', which is the value of the table cell.

As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with Well-Known Text (See https://en.wikipedia.org/wiki/Well-known_text) format for reading and writing to BigQuery. BigQuery IO requires values of BYTES datatype to be encoded using base64 encoding when writing to BigQuery. When bytes are read from BigQuery they are returned as base64-encoded strings.
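
For illustration, a minimal sketch that builds a TableSchema and a matching TableRow using these classes (the field names are illustrative; note the base64 encoding of the BYTES value):

 TableSchema schema = new TableSchema().setFields(ImmutableList.of(
     new TableFieldSchema().setName("station_id").setType("STRING"),
     new TableFieldSchema().setName("mean_temp").setType("FLOAT"),
     new TableFieldSchema().setName("raw_reading").setType("BYTES")));

 byte[] rawBytes = new byte[] {0x01, 0x02, 0x03};
 TableRow row = new TableRow()
     .set("station_id", "KSEA")
     .set("mean_temp", 11.3)
     // BYTES values must be base64-encoded when written to BigQuery.
     .set("raw_reading", java.util.Base64.getEncoder().encodeToString(rawBytes));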

Reading

Reading from BigQuery is supported by read(SerializableFunction), which parses records in AVRO format into a custom type (see the table below for type conversion) using a specified parse function, and by readTableRows() which parses them into TableRow, which may be more convenient but has lower performance.

Both functions support reading either from a table or from the result of a query, via BigQueryIO.TypedRead.from(String) and BigQueryIO.TypedRead.fromQuery(java.lang.String) respectively. Exactly one of these must be specified.

If you are reading from an authorized view with BigQueryIO.TypedRead.fromQuery(java.lang.String), you need to use BigQueryIO.TypedRead.withQueryLocation(String) to set the location of the BigQuery job. Otherwise, Beam will try to determine that location by reading the metadata of the dataset that contains the underlying tables. With authorized views, that will result in a 403 error and the query will not be resolved.

Type Conversion Table

BigQuery standard SQL type   Avro type   Java type
BOOLEAN                      boolean     Boolean
INT64                        long        Long
FLOAT64                      double      Double
BYTES                        bytes       java.nio.ByteBuffer
STRING                       string      CharSequence
DATE                         int         Integer
DATETIME                     string      CharSequence
TIMESTAMP                    long        Long
TIME                         long        Long
NUMERIC                      bytes       java.nio.ByteBuffer
GEOGRAPHY                    string      CharSequence
ARRAY                        array       java.util.Collection
STRUCT                       record      org.apache.avro.generic.GenericRecord

Example: Reading rows of a table as TableRow.


 PCollection<TableRow> weatherData = pipeline.apply(
     BigQueryIO.readTableRows().from("apache-beam-testing.samples.weather_stations"));
 
Example: Reading rows of a table and parsing them into a custom type.

 PCollection<WeatherRecord> weatherData = pipeline.apply(
    BigQueryIO
      .read(new SerializableFunction<SchemaAndRecord, WeatherRecord>() {
        public WeatherRecord apply(SchemaAndRecord schemaAndRecord) {
          return new WeatherRecord(...);
        }
      })
      .from("apache-beam-testing.samples.weather_stations"))
      .withCoder(SerializableCoder.of(WeatherRecord.class));
 

Note: When using read(SerializableFunction), you may sometimes need to use BigQueryIO.TypedRead.withCoder(Coder) to specify a Coder for the result type, if Beam fails to infer it automatically.

Example: Reading results of a query as TableRow.


 PCollection<TableRow> meanTemperatureData = pipeline.apply(BigQueryIO.readTableRows()
     .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
 

Users can optionally specify a query priority using BigQueryIO.TypedRead.withQueryPriority(TypedRead.QueryPriority) and a geographic location where the query will be executed using BigQueryIO.TypedRead.withQueryLocation(String). Query location must be specified for jobs that are not executed in US or EU, or if you are reading from an authorized view. See BigQuery Jobs: query.
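
For example, a minimal sketch of a batch-priority query pinned to a specific location (the query and region shown here are illustrative):

 PCollection<TableRow> meanTemperatureData = pipeline.apply(BigQueryIO.readTableRows()
     .fromQuery("SELECT year, mean_temp FROM `my_dataset.weather_stations`")
     .usingStandardSql()
     .withQueryPriority(BigQueryIO.TypedRead.QueryPriority.BATCH)
     .withQueryLocation("asia-northeast1"));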

Writing

To write to a BigQuery table, apply a BigQueryIO.Write transformation. This consumes a PCollection of a user-defined type when using write() (recommended), or a PCollection of TableRows as input when using writeTableRows() (not recommended). When using a user-defined type, one of the following must be provided:

  • BigQueryIO.Write.withFormatFunction(SerializableFunction): a function that converts each element into a TableRow.
  • BigQueryIO.Write.withAvroFormatFunction(SerializableFunction) or BigQueryIO.Write.withAvroWriter(SerializableFunction): a function that converts each element into an Avro record.
  • BigQueryIO.Write.useBeamSchema(): derive the conversion automatically from the Beam schema of the input PCollection.

If BigQueryIO.Write.withAvroFormatFunction(SerializableFunction) or BigQueryIO.Write.withAvroWriter(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.Schema, org.apache.avro.io.DatumWriter<T>>) is used, the table schema MUST be specified using one of the BigQueryIO.Write.withJsonSchema(String), BigQueryIO.Write.withJsonSchema(ValueProvider), or BigQueryIO.Write.withSchemaFromView(PCollectionView) methods, or BigQueryIO.Write.to(DynamicDestinations). For example, writing with an explicit TableSchema and a TableRow format function:

 class Quote {
   final Instant timestamp;
   final String exchange;
   final String symbol;
   final double price;

   Quote(Instant timestamp, String exchange, String symbol, double price) {
     // initialize all member variables.
   }
 }

 PCollection<Quote> quotes = ...

 quotes.apply(BigQueryIO
     .<Quote>write()
     .to("my-project:my_dataset.my_table")
     .withSchema(new TableSchema().setFields(
         ImmutableList.of(
           new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
           new TableFieldSchema().setName("exchange").setType("STRING"),
           new TableFieldSchema().setName("symbol").setType("STRING"),
           new TableFieldSchema().setName("price").setType("FLOAT"))))
     .withFormatFunction(quote -> new TableRow().set(..set the columns..))
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
 
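
If the Avro path is used instead, a hedged sketch reusing the Quote class above might look like the following. It assumes a JSON-formatted TableSchema string named quotesJsonSchema describing symbol (STRING) and price (FLOAT) columns, and uses the AvroWriteRequest element/schema accessors to build a GenericRecord:

 quotes.apply(BigQueryIO
     .<Quote>write()
     .to("my-project:my_dataset.my_table")
     .withJsonSchema(quotesJsonSchema)  // a schema is required with the Avro format function
     .withAvroFormatFunction(request -> {
       Quote quote = request.getElement();
       // Build an Avro record that matches the Avro schema derived from the table schema.
       GenericRecord record = new GenericData.Record(request.getSchema());
       record.put("symbol", quote.symbol);
       record.put("price", quote.price);
       return record;
     })
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));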

See BigQueryIO.Write for details on how to specify if a write should append to an existing table, replace the table, or verify that the table is empty. Note that the dataset being written to must already exist. Unbounded PCollections can only be written using BigQueryIO.Write.WriteDisposition.WRITE_EMPTY or BigQueryIO.Write.WriteDisposition.WRITE_APPEND.
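
For example, a minimal sketch of a write suitable for an unbounded quotes PCollection, creating the table if needed and appending to it (schema here stands for the TableSchema shown in the first write example; the column names are illustrative):

 quotes.apply(BigQueryIO
     .<Quote>write()
     .to("my-project:my_dataset.my_table")
     .withSchema(schema)
     .withFormatFunction(quote -> new TableRow()
         .set("exchange", quote.exchange)
         .set("symbol", quote.symbol)
         .set("price", quote.price))
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));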

BigQueryIO supports automatically inferring the BigQuery table schema from the Beam schema on the input PCollection. In this case, Beam can also automatically format the input into a TableRow if no format function is provided. In the above example, the quotes PCollection has a schema that Beam infers from the Quote POJO, so the write could be done more simply as follows:


 @DefaultSchema(JavaFieldSchema.class)
 class Quote {
   final Instant timestamp;
   final String exchange;
   final String symbol;
   final double price;

   @SchemaCreate
   Quote(Instant timestamp, String exchange, String symbol, double price) {
     // initialize all member variables.
   }
 }

 PCollection<Quote> quotes = ...

 quotes.apply(BigQueryIO
     .<Quote>write()
     .to("my-project:my_dataset.my_table")
     .useBeamSchema()
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
 

Loading historical data into time-partitioned BigQuery tables

To load historical data into a time-partitioned BigQuery table, specify BigQueryIO.Write.withTimePartitioning(com.google.api.services.bigquery.model.TimePartitioning) with a field used for column-based partitioning. For example:


 PCollection<Quote> quotes = ...;

 quotes.apply(BigQueryIO.<Quote>write()
         .withSchema(schema)
         .withFormatFunction(quote -> new TableRow()
            .set("timestamp", quote.getTimestamp())
            .set(..other columns..))
         .to("my-project:my_dataset.my_table")
         .withTimePartitioning(new TimePartitioning().setField("timestamp")));
 

Writing different values to different tables

A common use case is to dynamically generate BigQuery table names based on the current value. To support this, BigQueryIO.Write.to(SerializableFunction) accepts a function mapping the current element to a tablespec. For example, here's code that outputs quotes of different stocks to different tables:


 PCollection<Quote> quotes = ...;

 quotes.apply(BigQueryIO.<Quote>write()
         .withSchema(schema)
         .withFormatFunction(quote -> new TableRow()...)
         .to((ValueInSingleWindow<Quote> quote) -> {
             String symbol = quote.getValue().getSymbol();
             return new TableDestination(
                 "my-project:my_dataset.quotes_" + symbol, // Table spec
                 "Quotes of stock " + symbol // Table description
               );
           }));
 

Per-table schemas can also be provided using BigQueryIO.Write.withSchemaFromView(org.apache.beam.sdk.values.PCollectionView<java.util.Map<java.lang.String, java.lang.String>>). This allows the schemas to be calculated in a previous pipeline stage or supplied statically via a Create transform. The method expects a map-valued PCollectionView that maps table specifications (project:dataset.table-id) to JSON-formatted TableSchema objects. All destination tables must be present in this map, or the pipeline will fail to create them. Take care if the map value is based on a triggered aggregation over an unbounded PCollection: the side input will contain the entire history of all table schemas ever generated, which can blow up memory usage. This method can also be useful when writing to a single table, as it allows a previous stage to calculate the schema (possibly based on the full collection of records being written to BigQuery).
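
A sketch of the shape of that side input follows (the table specs, the quotesJsonSchema string, and the columns are illustrative):

 PCollectionView<Map<String, String>> schemaView = pipeline
     .apply("TableSchemas", Create.of(
         KV.of("my-project:my_dataset.quotes_NASDAQ", quotesJsonSchema),
         KV.of("my-project:my_dataset.quotes_NYSE", quotesJsonSchema)))
     .apply(View.asMap());

 quotes.apply(BigQueryIO
     .<Quote>write()
     .withFormatFunction(quote -> new TableRow()
         .set("symbol", quote.symbol)
         .set("price", quote.price))
     .to((ValueInSingleWindow<Quote> quote) ->
         new TableDestination(
             "my-project:my_dataset.quotes_" + quote.getValue().exchange, // Table spec
             null /* no table description */))
     .withSchemaFromView(schemaView));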

For the most general form of dynamic table destinations and schemas, look at BigQueryIO.Write.to(DynamicDestinations).
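
As an illustrative sketch, a DynamicDestinations implementation that routes quotes by stock symbol and supplies a per-table schema might look like this (table names and columns are illustrative):

 quotes.apply(BigQueryIO
     .<Quote>write()
     .withFormatFunction(quote -> new TableRow()
         .set("symbol", quote.symbol)
         .set("price", quote.price))
     .to(new DynamicDestinations<Quote, String>() {
       @Override
       public String getDestination(ValueInSingleWindow<Quote> element) {
         // Use the stock symbol as the logical destination key.
         return element.getValue().symbol;
       }

       @Override
       public TableDestination getTable(String symbol) {
         return new TableDestination(
             "my-project:my_dataset.quotes_" + symbol, "Quotes of stock " + symbol);
       }

       @Override
       public TableSchema getSchema(String symbol) {
         return new TableSchema().setFields(ImmutableList.of(
             new TableFieldSchema().setName("symbol").setType("STRING"),
             new TableFieldSchema().setName("price").setType("FLOAT")));
       }
     }));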

Insertion Method

BigQueryIO.Write supports two methods of inserting data into BigQuery specified using BigQueryIO.Write.withMethod(org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method). If no method is supplied, then a default method will be chosen based on the input PCollection. See BigQueryIO.Write.Method for more information about the methods. The different insertion methods provide different tradeoffs of cost, quota, and data consistency; please see BigQuery documentation for more information about these tradeoffs.
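
For example, a minimal sketch that selects the Storage Write API explicitly rather than relying on the default method (reusing the Beam-schema Quote class above):

 quotes.apply(BigQueryIO
     .<Quote>write()
     .to("my-project:my_dataset.my_table")
     .useBeamSchema()
     .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));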

Usage with templates

When using read() or readTableRows() in a template, it's required to specify BigQueryIO.Read.withTemplateCompatibility(). Specifying this in a non-template pipeline is not recommended because it has somewhat lower performance.
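
For example, a minimal sketch of a template-compatible read; the MyTemplateOptions interface and its getInputTable() value provider are hypothetical, and args stands for the command-line arguments:

 public interface MyTemplateOptions extends PipelineOptions {
   ValueProvider<String> getInputTable();
   void setInputTable(ValueProvider<String> value);
 }

 MyTemplateOptions options =
     PipelineOptionsFactory.fromArgs(args).withValidation().as(MyTemplateOptions.class);
 Pipeline pipeline = Pipeline.create(options);

 PCollection<TableRow> rows = pipeline.apply(BigQueryIO.readTableRows()
     .withTemplateCompatibility()
     .from(options.getInputTable()));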

When using write() or writeTableRows() with batch loads in a template, it is recommended to specify BigQueryIO.Write.withCustomGcsTempLocation(org.apache.beam.sdk.options.ValueProvider<java.lang.String>). Writing to BigQuery via batch loads involves writing temporary files to this location, so the location must be accessible at pipeline execution time. By default, this location is captured at pipeline construction time and may be inaccessible if the template is reused from a different project or at a time when the original location no longer exists. BigQueryIO.Write.withCustomGcsTempLocation(ValueProvider) allows specifying the location as an argument to the template invocation.
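
Similarly, a sketch of passing the temporary GCS location as a template parameter; getBigQueryTempLocation() is a hypothetical ValueProvider<String> option defined on a template options interface like the one sketched above:

 quotes.apply(BigQueryIO
     .<Quote>write()
     .to("my-project:my_dataset.my_table")
     .useBeamSchema()
     .withCustomGcsTempLocation(options.getBigQueryTempLocation())
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));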

Permissions

Permission requirements depend on the PipelineRunner that is used to execute the pipeline. Please refer to the documentation of corresponding PipelineRunners for more details.

Please see BigQuery Access Control for security and permission related information specific to BigQuery.

Updates to the I/O connector code

For any significant updates to this I/O connector, please consider involving corresponding code reviewers mentioned here.

Upserts and deletes

The connector also supports streaming row updates to BigQuery, with the following qualifications:

- Only the STORAGE_API_AT_LEAST_ONCE method is supported.

- If the table is not previously created and CREATE_IF_NEEDED is used, a primary key must be specified using BigQueryIO.Write.withPrimaryKey(java.util.List<java.lang.String>).

Two types of updates are supported. UPSERT replaces the row with the matching primary key, or inserts the row if none exists. DELETE removes the row with the matching primary key. Row inserts are still allowed as normal using a separate instance of the sink; however, care must be taken not to violate primary-key uniqueness constraints, as those constraints are not enforced by BigQuery. If a table contains multiple rows with the same primary key, row updates may not work as expected.

Since PCollections are unordered, each update must carry a sequence number so that the updates can be applied in the correct order. BigQuery uses this sequence number to ensure that updates are applied correctly even if they arrive out of order.

The simplest way to apply row updates when working with TableRow objects is to use the applyRowMutations() method. Each RowMutation element contains a TableRow, an update type (UPSERT or DELETE), and a sequence number used to order the updates.


 PCollection<TableRow> rows = ...;
 rows.apply(MapElements
       .into(new TypeDescriptor<RowMutation>(){})
       .via(tableRow -> RowMutation.of(tableRow, getUpdateType(tableRow), getSequenceNumber(tableRow))))
    .apply(BigQueryIO.applyRowMutations()
           .to("my-project:my_dataset.my_table")
           .withSchema(schema)
           .withPrimaryKey(ImmutableList.of("field1", "field2"))
           .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED));
 

If writing a type other than TableRow (e.g. using writeGenericRecords() or writing a custom user type), then the BigQueryIO.Write.withRowMutationInformationFn(org.apache.beam.sdk.transforms.SerializableFunction<T, org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation>) method can be used to set an update type and sequence number for each record. For example:


 PCollection<CdcEvent> cdcEvent = ...;

 cdcEvent.apply(BigQueryIO.<CdcEvent>write()
          .to("my-project:my_dataset.my_table")
          .withSchema(schema)
          .withPrimaryKey(ImmutableList.of("field1", "field2"))
          .withFormatFunction(CdcEvent::getTableRow)
          .withRowMutationInformationFn(cdc -> RowMutationInformation.of(cdc.getChangeType(),
                                                                         cdc.getSequenceNumber()))
          .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
          .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED));
 

Note that in order to use upserts or deletes, the table must be set up with a primary key. If the table is not previously created and CREATE_IF_NEEDED is used, a primary key must be specified using BigQueryIO.Write.withPrimaryKey(java.util.List<java.lang.String>).