Built-in I/O Transforms

Google BigQuery I/O connector

The Beam SDKs include built-in transforms that can read data from and write data to Google BigQuery tables.

Before you start

To use BigQueryIO, add the Maven artifact dependency to your pom.xml file.

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>2.16.0</version>
</dependency>


To use BigQueryIO, you must install the Google Cloud Platform dependencies by running pip install apache-beam[gcp].


BigQuery basics

Table names

To read or write from a BigQuery table, you must provide a fully-qualified BigQuery table name (for example, bigquery-public-data:github_repos.sample_contents). A fully-qualified BigQuery table name consists of three parts: a project ID, a dataset ID, and a table ID.

A table name can also include a table decorator if you are using time-partitioned tables.
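
For example, with daily partitioning you can address a single partition by appending a $YYYYMMDD partition decorator to the table name, as in the hypothetical table spec below (the project, dataset, and table names are placeholders for illustration only):

// Hypothetical example: address the 2019-05-01 partition of a daily-partitioned table.
String decoratedTableSpec = "my-project:my_dataset.my_table$20190501";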

To specify a BigQuery table, you can use either the table’s fully-qualified name as a string, or use a TableReference object.

Using a string

To specify a table with a string, use the format [project_id]:[dataset_id].[table_id] to specify the fully-qualified BigQuery table name.

String tableSpec = "clouddataflow-readonly:samples.weather_stations";
# project-id:dataset_id.table_id
table_spec = 'clouddataflow-readonly:samples.weather_stations'

You can also omit project_id and use the [dataset_id].[table_id] format. If you omit the project ID, Beam uses the default project ID from your pipeline options.

String tableSpec = "samples.weather_stations";
# dataset_id.table_id
table_spec = 'samples.weather_stations'

Using a TableReference

To specify a table with a TableReference, create a new TableReference using the three parts of the BigQuery table name.

TableReference tableSpec =
    new TableReference()
        .setProjectId("clouddataflow-readonly")
        .setDatasetId("samples")
        .setTableId("weather_stations");
from apache_beam.io.gcp.internal.clients import bigquery

table_spec = bigquery.TableReference(
    projectId='clouddataflow-readonly',
    datasetId='samples',
    tableId='weather_stations')

The Beam SDK for Java also provides the parseTableSpec helper method, which constructs a TableReference object from a String that contains the fully-qualified BigQuery table name. However, the static factory methods for BigQueryIO transforms accept the table name as a String and construct a TableReference object for you.
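
For illustration, a minimal sketch of parsing a table spec, assuming the helper is exposed as BigQueryHelpers.parseTableSpec in the org.apache.beam.sdk.io.gcp.bigquery package:

// A sketch: split a fully-qualified table name into its three parts.
TableReference ref =
    BigQueryHelpers.parseTableSpec("clouddataflow-readonly:samples.weather_stations");
// ref.getProjectId() -> "clouddataflow-readonly"
// ref.getDatasetId() -> "samples"
// ref.getTableId()   -> "weather_stations"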

Table rows

In the Beam SDK for Java, BigQueryIO read and write transforms produce and consume data as a PCollection of BigQuery TableRow objects; in the Beam SDK for Python, they produce and consume data as a PCollection of dictionaries. In both cases, each element in the PCollection represents a single row in the table.

Schemas

When writing to BigQuery, you must supply a table schema for the destination table that you want to write to, unless you specify a create disposition of CREATE_NEVER. Creating a table schema covers schemas in more detail.

Data types

BigQuery supports the following data types: STRING, BYTES, INTEGER, FLOAT, NUMERIC, BOOLEAN, TIMESTAMP, DATE, TIME, DATETIME and GEOGRAPHY. All possible values are described at https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types. BigQueryIO allows you to use all of these data types. The following example shows the correct format for data types used when reading from and writing to BigQuery:

TableRow row = new TableRow();
row.set("string", "abc");
byte[] rawbytes = {(byte) 0xab, (byte) 0xac};
row.set("bytes", Base64.getEncoder().encodeToString(rawbytes));
row.set("integer", 5);
row.set("float", 0.5);
row.set("numeric", 5);
row.set("boolean", true);
row.set("timestamp", "2018-12-31 12:44:31.744957 UTC");
row.set("date", "2018-12-31");
row.set("time", "12:44:31");
row.set("datetime", "2019-06-11T14:44:31");
row.set("geography", "POINT(30 10)");
bigquery_data = [{
    'string': 'abc',
    'bytes': base64.b64encode(b'\xab\xac'),
    'integer': 5,
    'float': 0.5,
    'numeric': Decimal('5'),
    'boolean': True,
    'timestamp': '2018-12-31 12:44:31.744957 UTC',
    'date': '2018-12-31',
    'time': '12:44:31',
    'datetime': '2018-12-31T12:44:31',
    'geography': 'POINT(30 10)'
}]

As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with Well-Known Text (see https://en.wikipedia.org/wiki/Well-known_text) format for reading and writing to BigQuery. BigQueryIO requires values of the BYTES data type to be encoded using base64 encoding when writing to BigQuery. When bytes are read from BigQuery, they are returned as base64-encoded strings in the Beam SDK for Java and as base64-encoded bytes in the Beam SDK for Python.

Reading from BigQuery

BigQueryIO allows you to read from a BigQuery table, or read the results of an arbitrary SQL query string. By default, Beam invokes a BigQuery export request when you apply a BigQueryIO read transform. However, the Beam SDK for Java (version 2.11.0 and later) adds support for the beta release of the BigQuery Storage API as an experimental feature. See Using the BigQuery Storage API for more information and a list of limitations.

Beam’s use of BigQuery APIs is subject to BigQuery’s Quota and Pricing policies.

The Beam SDK for Java has two BigQueryIO read methods. Both of these methods allow you to read from a table, or read fields using a query string.

  1. read(SerializableFunction) reads Avro-formatted records and uses a specified parsing function to parse them into a PCollection of custom typed objects. Each element in the PCollection represents a single row in the table. The example code for reading with a query string shows how to use read(SerializableFunction).

  2. readTableRows returns a PCollection of BigQuery TableRow objects. Each element in the PCollection represents a single row in the table. Integer values in the TableRow objects are encoded as strings to match BigQuery’s exported JSON format. This method is convenient, but can be 2-3 times slower than read(SerializableFunction). The example code for reading from a table shows how to use readTableRows.

Note: BigQueryIO.read() is deprecated as of Beam SDK 2.2.0. Instead, use read(SerializableFunction<SchemaAndRecord, T>) to parse BigQuery rows from Avro GenericRecord into your custom type, or use readTableRows() to parse them into JSON TableRow objects.

To read from a BigQuery table using the Beam SDK for Python, apply a Read transform on a BigQuerySource. Read returns a PCollection of dictionaries, where each element in the PCollection represents a single row in the table. Integer values in these rows are encoded as strings to match BigQuery’s exported JSON format.

Reading from a table

To read an entire BigQuery table, use the from method with a BigQuery table name. This example uses readTableRows.

To read an entire BigQuery table, use the table parameter with the BigQuery table name.

The following code reads an entire table that contains weather station data and then extracts the max_temperature column.

PCollection<Double> maxTemperatures =
    p.apply(BigQueryIO.readTableRows().from(tableSpec))
        // Each row is of type TableRow
        .apply(
            MapElements.into(TypeDescriptors.doubles())
                .via((TableRow row) -> (Double) row.get("max_temperature")));
max_temperatures = (
    p
    | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(table_spec))
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

Reading with a query string

If you don’t want to read an entire table, you can supply a query string with the fromQuery method. This example uses read(SerializableFunction).

If you don’t want to read an entire table, you can supply a query string to BigQuerySource by specifying the query parameter.

The following code uses a SQL query to only read the max_temperature column.

PCollection<Double> maxTemperatures =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
            .fromQuery(
                "SELECT max_temperature FROM [clouddataflow-readonly:samples.weather_stations]")
            .withCoder(DoubleCoder.of()));
max_temperatures = (
    p
    | 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(
        query='SELECT max_temperature FROM '\
              '[clouddataflow-readonly:samples.weather_stations]'))
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

You can also use BigQuery’s standard SQL dialect with a query string, as shown in the following example:

PCollection<Double> maxTemperatures =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
            .fromQuery(
                "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
            .usingStandardSql()
            .withCoder(DoubleCoder.of()));
max_temperatures = (
    p
    | 'QueryTableStdSQL' >> beam.io.Read(beam.io.BigQuerySource(
        query='SELECT max_temperature FROM '\
              '`clouddataflow-readonly.samples.weather_stations`',
        use_standard_sql=True))
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

Using the BigQuery Storage API

The BigQuery Storage API allows you to directly access tables in BigQuery storage. As a result, your pipeline can read from BigQuery storage faster than previously possible.

The Beam SDK for Java (version 2.11.0 and later) adds support for the beta release of the BigQuery Storage API as an experimental feature. Beam’s support for the BigQuery Storage API has the following limitations:

  1. The Beam SDK for Python does not support the BigQuery Storage API.

  2. Reading the results of a query string is not yet supported (BEAM-7034); only table reads are available.

Because this is currently a Beam experimental feature, export-based reads are recommended for production jobs.

Enabling the API

The BigQuery Storage API is distinct from the existing BigQuery API. You must enable the BigQuery Storage API for your Google Cloud Platform project.

Updating your code

Use the following methods when you read from a table:

  1. Use withMethod(Method.DIRECT_READ) to use the BigQuery Storage API for the read operation.

  2. Use withReadOptions to pass a TableReadOptions object that configures the read, for example to select a subset of columns (column projection).

The following code snippet reads from a table. This example is from the BigQueryTornadoes example. When the example’s read method option is set to DIRECT_READ, the pipeline uses the BigQuery Storage API and column projection to read public samples of weather data from a BigQuery table. You can view the full source code on GitHub.

TableReadOptions tableReadOptions =
    TableReadOptions.newBuilder()
        .addAllSelectedFields(Lists.newArrayList("month", "tornado"))
        .build();

rowsFromBigQuery =
    p.apply(
        BigQueryIO.readTableRows()
            .from(options.getInput())
            .withMethod(Method.DIRECT_READ)
            .withReadOptions(tableReadOptions));
# The SDK for Python does not support the BigQuery Storage API.

The following code snippet reads with a query string.

// Snippet not yet available (BEAM-7034).
# The SDK for Python does not support the BigQuery Storage API.

Writing to BigQuery

BigQueryIO allows you to write to BigQuery tables. If you are using the Beam SDK for Java, you can also write different rows to different tables.

BigQueryIO write transforms use APIs that are subject to BigQuery’s Quota and Pricing policies.

When you apply a write transform, you must provide the following information for the destination table(s):

  1. The table’s name.

  2. The table’s create disposition.

  3. The table’s write disposition.

In addition, if your write operation creates a new BigQuery table, you must also supply a table schema for the destination table.

Create disposition

The create disposition controls whether or not your BigQuery write operation should create a table if the destination table does not exist.

Use .withCreateDisposition (Java) or the create_disposition parameter (Python) to specify the create disposition. Valid enum values are:

  1. CREATE_IF_NEEDED: Specifies that the write operation should create a new table if one does not exist. If you use this value, you must provide a table schema. CREATE_IF_NEEDED is the default behavior.

  2. CREATE_NEVER: Specifies that a table should never be created. If the destination table does not exist, the write operation fails.

If you specify CREATE_IF_NEEDED as the create disposition and you don’t supply a table schema, the transform might fail at runtime if the destination table does not exist.

Write disposition

The write disposition controls how your BigQuery write operation applies to an existing table.

Use .withWriteDisposition (Java) or the write_disposition parameter (Python) to specify the write disposition. Valid enum values are:

  1. WRITE_EMPTY: Specifies that the write operation should fail at runtime if the destination table is not empty. WRITE_EMPTY is the default behavior.

  2. WRITE_TRUNCATE: Specifies that the write operation replaces an existing table. Any existing rows in the destination table are removed, and the new rows are added to the table.

  3. WRITE_APPEND: Specifies that the write operation should append the rows to the end of the existing table.

When you use WRITE_EMPTY, the check for whether or not the destination table is empty can occur before the actual write operation. This check doesn’t guarantee that your pipeline will have exclusive access to the table. Two concurrent pipelines that write to the same output table with a write disposition of WRITE_EMPTY might start successfully, but both pipelines can fail later when the write attempts happen.

Creating a table schema

If your BigQuery write operation creates a new table, you must provide schema information. The schema contains information about each field in the table.

To create a table schema in Java, you can either use a TableSchema object, or use a string that contains a JSON-serialized TableSchema object.

To create a table schema in Python, you can either use a TableSchema object, or use a string that defines a list of fields. Single string based schemas do not support nested fields, repeated fields, or specifying a BigQuery mode for fields (the mode will always be set to NULLABLE).

Using a TableSchema

To create and use a table schema as a TableSchema object, follow these steps.

  1. Create a list of TableFieldSchema objects. Each TableFieldSchema object represents a field in the table.

  2. Create a TableSchema object and use the setFields method to specify your list of fields.

  3. Use the withSchema method to provide your table schema when you apply a write transform.

  1. Create a TableSchema object.

  2. Create and append a TableFieldSchema object for each field in your table.

  3. Next, use the schema parameter to provide your table schema when you apply a write transform. Set the parameter’s value to the TableSchema object.

The following example code shows how to create a TableSchema for a table with two fields (source and quote) of type string.

TableSchema tableSchema =
    new TableSchema()
        .setFields(
            ImmutableList.of(
                new TableFieldSchema()
                    .setName("source")
                    .setType("STRING")
                    .setMode("NULLABLE"),
                new TableFieldSchema()
                    .setName("quote")
                    .setType("STRING")
                    .setMode("REQUIRED")));
table_schema = {'fields': [
    {'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'}
]}

Using a string

To create and use a table schema as a string that contains a JSON-serialized TableSchema object, follow these steps.

  1. Create a string that contains a JSON-serialized TableSchema object.

  2. Use the withJsonSchema method to provide your table schema when you apply a write transform.

To create and use a table schema as a string, follow these steps.

  1. Create a single comma separated string of the form “field1:type1,field2:type2,field3:type3” that defines a list of fields. The type should specify the field’s BigQuery type.

  2. Use the schema parameter to provide your table schema when you apply a write transform. Set the parameter’s value to the string.

The following example shows how to use a string to specify the same table schema as the previous example.

String tableSchemaJson =
    ""
        + "{"
        + "  \"fields\": ["
        + "    {"
        + "      \"name\": \"source\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"NULLABLE\""
        + "    },"
        + "    {"
        + "      \"name\": \"quote\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"REQUIRED\""
        + "    }"
        + "  ]"
        + "}";
# column_name:BIGQUERY_TYPE, ...
table_schema = 'source:STRING, quote:STRING'

Setting the insertion method

The Beam SDK for Python does not currently support specifying the insertion method.

BigQueryIO supports two methods of inserting data into BigQuery: load jobs and streaming inserts. Each insertion method provides different tradeoffs of cost, quota, and data consistency. See the BigQuery documentation for load jobs and streaming inserts for more information about these tradeoffs.

BigQueryIO chooses a default insertion method based on the input PCollection.

BigQueryIO uses load jobs when you apply a BigQueryIO write transform to a bounded PCollection.

BigQueryIO uses load jobs in the following situations:

  1. When you apply a BigQueryIO write transform to a bounded PCollection.

  2. When you specify load jobs as the insertion method using BigQueryIO.write().withMethod(FILE_LOADS).

BigQueryIO uses streaming inserts when you apply a BigQueryIO write transform to an unbounded PCollection.

BigQueryIO uses streaming inserts in the following situations:

  1. When you apply a BigQueryIO write transform to an unbounded PCollection.

  2. When you specify streaming inserts as the insertion method using BigQueryIO.write().withMethod(STREAMING_INSERTS).

You can use withMethod to specify the desired insertion method. See Write.Method for the list of the available methods and their restrictions.

Note: If you use batch loads in a streaming pipeline, you must use withTriggeringFrequency to specify a triggering frequency and withNumFileShards to specify the number of file shards written.
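
For illustration, here is a minimal sketch of forcing batch loads on an unbounded PCollection. It assumes the Quote class, quotes PCollection, tableSpec, and tableSchema defined in the Writing to a table section below; the triggering frequency and shard count are arbitrary placeholder values.

quotes.apply(
    BigQueryIO.<Quote>write()
        .to(tableSpec)
        .withSchema(tableSchema)
        .withFormatFunction(
            (Quote elem) ->
                new TableRow().set("source", elem.source).set("quote", elem.quote))
        // Use load jobs even though the input PCollection is unbounded.
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        // Both settings are required when using FILE_LOADS with an unbounded input.
        .withTriggeringFrequency(Duration.standardMinutes(5)) // org.joda.time.Duration
        .withNumFileShards(100)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));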

Writing to a table

To write to a BigQuery table, apply either a writeTableRows or write transform.

To write to a BigQuery table, apply the WriteToBigQuery transform. WriteToBigQuery supports both batch mode and streaming mode. You must apply the transform to a PCollection of dictionaries. In general, you’ll need to use another transform, such as ParDo, to format your output data into a collection.

The following examples use this PCollection that contains quotes.

/*
@DefaultCoder(AvroCoder.class)
static class Quote {
  final String source;
  final String quote;

  public Quote() {
    this.source = "";
    this.quote = "";
  }
  public Quote(String source, String quote) {
    this.source = source;
    this.quote = quote;
  }
}
*/

PCollection<Quote> quotes =
    p.apply(
        Create.of(
            new Quote("Mahatma Gandhi", "My life is my message."),
            new Quote("Yoda", "Do, or do not. There is no 'try'.")));
quotes = p | beam.Create([
    {'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
    {'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
])

The writeTableRows method writes a PCollection of BigQuery TableRow objects to a BigQuery table. Each element in the PCollection represents a single row in the table. This example converts the quotes into a PCollection<TableRow> and uses writeTableRows to write them to BigQuery. The write operation creates a table if needed; if the table already exists, it will be replaced.

The following example code shows how to apply a WriteToBigQuery transform to write a PCollection of dictionaries to a BigQuery table. The write operation creates a table if needed; if the table already exists, it will be replaced.

quotes
    .apply(
        MapElements.into(TypeDescriptor.of(TableRow.class))
            .via(
                (Quote elem) ->
                    new TableRow().set("source", elem.source).set("quote", elem.quote)))
    .apply(
        BigQueryIO.writeTableRows()
            .to(tableSpec)
            .withSchema(tableSchema)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
quotes | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

The write transform writes a PCollection of custom typed objects to a BigQuery table. Use .withFormatFunction(SerializableFunction) to provide a formatting function that converts each input element in the PCollection into a TableRow. This example uses write to write a PCollection<Quote>. The write operation creates a table if needed; if the table already exists, it will be replaced.

quotes.apply(
    BigQueryIO.<Quote>write()
        .to(tableSpec)
        .withSchema(tableSchema)
        .withFormatFunction(
            (Quote elem) ->
                new TableRow().set("source", elem.source).set("quote", elem.quote))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

When you use streaming inserts, you can decide what to do with failed records. You can either keep retrying, or return the failed records in a separate PCollection using the WriteResult.getFailedInserts() method.
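
For illustration, a minimal sketch of capturing failed rows, reusing the quotes, tableSpec, and tableSchema from the examples above; the retry policy shown is just one of the available options.

WriteResult result =
    quotes.apply(
        BigQueryIO.<Quote>write()
            .to(tableSpec)
            .withSchema(tableSchema)
            .withFormatFunction(
                (Quote elem) ->
                    new TableRow().set("source", elem.source).set("quote", elem.quote))
            // Use streaming inserts and retry only transient errors instead of retrying forever.
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));

// Rows that could not be inserted are returned as TableRow objects.
PCollection<TableRow> failedInserts = result.getFailedInserts();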

Using dynamic destinations

The Beam SDK for Python does not currently support dynamic destinations.

You can use the dynamic destinations feature to write elements in a PCollection to different BigQuery tables, possibly with different schemas.

The dynamic destinations feature groups your user type by a user-defined destination key, uses the key to compute a destination table and/or schema, and writes each group’s elements to the computed destination.

In addition, you can also write your own types that have a mapping function to TableRow, and you can use side inputs in all DynamicDestinations methods.

To use dynamic destinations, you must create a DynamicDestinations object and implement the following methods:

  1. getDestination: Returns an object that getTable and getSchema can use as the destination key to compute the destination table and/or schema.

  2. getTable: Returns the table (as a TableDestination object) for the destination key. This method must return a unique table for each unique destination.

  3. getSchema: Returns the table schema (as a TableSchema object) for the destination table.

Then, use write().to with your DynamicDestinations object. This example uses a PCollection that contains weather data and writes the data into a different table for each year.

/*
@DefaultCoder(AvroCoder.class)
static class WeatherData {
  final long year;
  final long month;
  final long day;
  final double maxTemp;

  public WeatherData() {
    this.year = 0;
    this.month = 0;
    this.day = 0;
    this.maxTemp = 0.0f;
  }
  public WeatherData(long year, long month, long day, double maxTemp) {
    this.year = year;
    this.month = month;
    this.day = day;
    this.maxTemp = maxTemp;
  }
}
*/

PCollection<WeatherData> weatherData =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> {
                  GenericRecord record = elem.getRecord();
                  return new WeatherData(
                      (Long) record.get("year"),
                      (Long) record.get("month"),
                      (Long) record.get("day"),
                      (Double) record.get("max_temperature"));
                })
            .fromQuery(
                "SELECT year, month, day, max_temperature "
                    + "FROM [clouddataflow-readonly:samples.weather_stations] "
                    + "WHERE year BETWEEN 2007 AND 2009")
            .withCoder(AvroCoder.of(WeatherData.class)));

// We will send the weather data into different tables for every year.
weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(
            new DynamicDestinations<WeatherData, Long>() {
              @Override
              public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
                return elem.getValue().year;
              }

              @Override
              public TableDestination getTable(Long destination) {
                return new TableDestination(
                    new TableReference()
                        .setProjectId(writeProject)
                        .setDatasetId(writeDataset)
                        .setTableId(writeTable + "_" + destination),
                    "Table for year " + destination);
              }

              @Override
              public TableSchema getSchema(Long destination) {
                return new TableSchema()
                    .setFields(
                        ImmutableList.of(
                            new TableFieldSchema()
                                .setName("year")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("month")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("day")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("maxTemp")
                                .setType("FLOAT")
                                .setMode("NULLABLE")));
              }
            })
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
# The Beam SDK for Python does not currently support dynamic destinations.

Using time partitioning

The Beam SDK for Python does not currently support time partitioning.

BigQuery time partitioning divides your table into smaller partitions; a table divided this way is called a partitioned table. Partitioned tables make it easier for you to manage and query your data.

To use BigQuery time partitioning, use one of these two methods:

  1. withTimePartitioning: Takes a TimePartitioning object and is only usable when writing to a single table.

  2. withJsonTimePartitioning: Takes the same partitioning information as a JSON-serialized String.

This example generates one partition per day.

weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(tableSpec + "_partitioning")
        .withSchema(tableSchema)
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        // NOTE: an existing table without time partitioning set up will not work
        .withTimePartitioning(new TimePartitioning().setType("DAY"))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
# The Beam SDK for Python does not currently support time partitioning.

Limitations

BigQueryIO currently has the following limitations.

  1. You can’t sequence the completion of a BigQuery write with other steps of your pipeline.

  2. If you are using the Beam SDK for Python, you might have import size quota issues if you write a very large dataset. As a workaround, you can partition the dataset (for example, using Beam’s Partition transform) and write to multiple BigQuery tables. The Beam SDK for Java does not have this limitation as it partitions your dataset for you.

Additional examples

You can find additional examples that use BigQuery in Beam’s examples directories.

Java cookbook examples

These examples are from the Java cookbook examples directory.

Java complete examples

These examples are from the Java complete examples directory.

Python cookbook examples

These examples are from the Python cookbook examples directory.