public class BigQueryIO
extends java.lang.Object
PTransforms for reading and writing BigQuery tables.
A fully-qualified BigQuery table name consists of three components:

- projectId: the Cloud project id (defaults to GcpOptions.getProject()).
- datasetId: the BigQuery dataset id, unique within a project.
- tableId: a table id, unique within a dataset.
BigQuery table references are stored as a TableReference, which comes from the BigQuery Java Client API. Tables can be referred to as Strings, with or without the projectId. A helper function is provided (BigQueryHelpers.parseTableSpec(String)) that parses the following string forms into a TableReference:

- [project_id]:[dataset_id].[table_id]
- [dataset_id].[table_id]
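For instance, a minimal sketch of parsing these forms with BigQueryHelpers.parseTableSpec (the table names here are placeholders):

// Fully-qualified form: [project_id]:[dataset_id].[table_id]
TableReference ref = BigQueryHelpers.parseTableSpec("my-project:my_dataset.my_table");

// Short form: [dataset_id].[table_id]; the projectId is left unset and resolved later
// (for example from GcpOptions.getProject()).
TableReference shortRef = BigQueryHelpers.parseTableSpec("my_dataset.my_table");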
Tables have rows (TableRow) and each row has cells (TableCell). A table has a schema (TableSchema), which in turn describes the schema of each cell (TableFieldSchema). The terms field and cell are used interchangeably.

- TableSchema: describes the schema (types and order) for values in each row. It has one attribute, 'fields', which is a list of TableFieldSchema objects.
- TableFieldSchema: describes the schema (type, name) for one field. It has several attributes, including 'name' and 'type'. Common values for the type attribute are: 'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'NUMERIC', 'GEOGRAPHY'. All possible values are described at: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
- TableRow: holds all values in a table row. Has one attribute, 'f', which is a list of TableCell instances.
- TableCell: holds the value for one cell (or field). Has one attribute, 'v', which is the value of the table cell.
As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with Well-Known Text (See https://en.wikipedia.org/wiki/Well-known_text) format for reading and writing to BigQuery. BigQuery IO requires values of BYTES datatype to be encoded using base64 encoding when writing to BigQuery. When bytes are read from BigQuery they are returned as base64-encoded strings.
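For example, a brief sketch of declaring a schema with a BYTES column and base64-encoding the value before it is placed in a TableRow (the field names below are placeholders):

TableSchema schema = new TableSchema().setFields(
    ImmutableList.of(
        new TableFieldSchema().setName("name").setType("STRING"),
        new TableFieldSchema().setName("payload").setType("BYTES")));

byte[] rawBytes = ...;
TableRow row = new TableRow()
    .set("name", "example")
    // BYTES values must be base64-encoded when writing to BigQuery.
    .set("payload", java.util.Base64.getEncoder().encodeToString(rawBytes));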
Reading from BigQuery is supported by read(SerializableFunction), which parses records in AVRO format into a custom type (see the table below for type conversion) using a specified parse function, and by readTableRows(), which parses them into TableRow, which may be more convenient but has lower performance.
Both functions support reading either from a table or from the result of a query, via BigQueryIO.TypedRead.from(String) and BigQueryIO.TypedRead.fromQuery(java.lang.String) respectively. Exactly one of these must be specified.

If you are reading from an authorized view with BigQueryIO.TypedRead.fromQuery(java.lang.String), you need to use BigQueryIO.TypedRead.withQueryLocation(String) to set the location of the BigQuery job. Otherwise, Beam will try to determine that location by reading the metadata of the dataset that contains the underlying tables. With authorized views, that will result in a 403 error and the query will not be resolved.
Type Conversion Table
BigQuery standard SQL type | Avro type | Java type |
BOOLEAN | boolean | Boolean |
INT64 | long | Long |
FLOAT64 | double | Double |
BYTES | bytes | java.nio.ByteBuffer |
STRING | string | CharSequence |
DATE | int | Integer |
DATETIME | string | CharSequence |
TIMESTAMP | long | Long |
TIME | long | Long |
NUMERIC | bytes | java.nio.ByteBuffer |
GEOGRAPHY | string | CharSequence |
ARRAY | array | java.util.Collection |
STRUCT | record | org.apache.avro.generic.GenericRecord |
Example: Reading rows of a table as TableRow.
PCollection<TableRow> weatherData = pipeline.apply(
BigQueryIO.readTableRows().from("apache-beam-testing.samples.weather_stations"));
Example: Reading rows of a table and parsing them into a custom type.
PCollection<WeatherRecord> weatherData = pipeline.apply(
    BigQueryIO
        .read(new SerializableFunction<SchemaAndRecord, WeatherRecord>() {
          public WeatherRecord apply(SchemaAndRecord schemaAndRecord) {
            return new WeatherRecord(...);
          }
        })
        .from("apache-beam-testing.samples.weather_stations")
        .withCoder(SerializableCoder.of(WeatherRecord.class)));
Note: When using read(SerializableFunction), you may sometimes need to use BigQueryIO.TypedRead.withCoder(Coder) to specify a Coder for the result type, if Beam fails to infer it automatically.
Example: Reading results of a query as TableRow.
PCollection<TableRow> meanTemperatureData = pipeline.apply(BigQueryIO.readTableRows()
.fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
Users can optionally specify a query priority using TypedRead.withQueryPriority(TypedRead.QueryPriority) and a geographic location where the query will be executed using BigQueryIO.TypedRead.withQueryLocation(String). Query location must be specified for jobs that are not executed in US or EU, or if you are reading from an authorized view. See BigQuery Jobs: query.
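For example, a sketch of a batch-priority query pinned to a specific location (the query, project, and location are placeholders):

PCollection<TableRow> meanTemperatureData = pipeline.apply(
    BigQueryIO.readTableRows()
        .fromQuery("SELECT year, mean_temp FROM `my-project.samples.weather_stations`")
        .usingStandardSql()
        .withQueryPriority(BigQueryIO.TypedRead.QueryPriority.BATCH)
        .withQueryLocation("EU"));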
To write to a BigQuery table, apply a BigQueryIO.Write transformation. This consumes a PCollection of a user-defined type when using write() (recommended), or a PCollection of TableRows as input when using writeTableRows() (not recommended). When using a user-defined type, one of the following must be provided.

- BigQueryIO.Write.withAvroFormatFunction(SerializableFunction) (recommended) to write data using avro records.
- BigQueryIO.Write.withAvroWriter(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.Schema, org.apache.avro.io.DatumWriter<T>>) to write avro data using a user-specified DatumWriter (and format function).
- BigQueryIO.Write.withFormatFunction(SerializableFunction) to write data as json encoded TableRows.
If BigQueryIO.Write.withAvroFormatFunction(SerializableFunction) or BigQueryIO.Write.withAvroWriter(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.Schema, org.apache.avro.io.DatumWriter<T>>) is used, the table schema MUST be specified using one of the BigQueryIO.Write.withJsonSchema(String), BigQueryIO.Write.withJsonSchema(ValueProvider), BigQueryIO.Write.withSchemaFromView(PCollectionView) methods, or BigQueryIO.Write.to(DynamicDestinations). (An Avro-based sketch is shown after the TableRow example below.)
class Quote {
  final Instant timestamp;
  final String exchange;
  final String symbol;
  final double price;

  Quote(Instant timestamp, String exchange, String symbol, double price) {
    // initialize all member variables.
  }
}

PCollection<Quote> quotes = ...

quotes.apply(BigQueryIO
    .<Quote>write()
    .to("my-project:my_dataset.my_table")
    .withSchema(new TableSchema().setFields(
        ImmutableList.of(
            new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
            new TableFieldSchema().setName("exchange").setType("STRING"),
            new TableFieldSchema().setName("symbol").setType("STRING"),
            new TableFieldSchema().setName("price").setType("FLOAT"))))
    .withFormatFunction(quote -> new TableRow().set(..set the columns..))
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
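For comparison, here is a rough Avro-based sketch of a similar write. It assumes a placeholder JSON schema string, quoteJsonSchema, declaring just the "symbol" (STRING) and "price" (FLOAT) columns; the exact Avro field types expected by request.getSchema() depend on Beam's schema conversion:

quotes.apply(BigQueryIO
    .<Quote>write()
    .to("my-project:my_dataset.my_table")
    // With Avro-based writes, the table schema MUST be specified explicitly.
    .withJsonSchema(quoteJsonSchema)
    .withAvroFormatFunction(request -> {
      Quote quote = request.getElement();
      // Build a GenericRecord against the Avro schema Beam derives from the table schema.
      return new GenericRecordBuilder(request.getSchema())
          .set("symbol", quote.symbol)
          .set("price", quote.price)
          .build();
    })
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));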
See BigQueryIO.Write for details on how to specify if a write should append to an existing table, replace the table, or verify that the table is empty. Note that the dataset being written to must already exist. Unbounded PCollections can only be written using BigQueryIO.Write.WriteDisposition.WRITE_EMPTY or BigQueryIO.Write.WriteDisposition.WRITE_APPEND.
BigQueryIO supports automatically inferring the BigQuery table schema from the Beam schema on the input PCollection. Beam can also automatically format the input into a TableRow in this case, if no format function is provided. In the above example, the quotes PCollection has a schema that Beam infers from the Quote POJO. So the write could be done more simply as follows:
@DefaultSchema(JavaFieldSchema.class)
class Quote {
  final Instant timestamp;
  final String exchange;
  final String symbol;
  final double price;

  @SchemaCreate
  Quote(Instant timestamp, String exchange, String symbol, double price) {
    // initialize all member variables.
  }
}

PCollection<Quote> quotes = ...

quotes.apply(BigQueryIO
    .<Quote>write()
    .to("my-project:my_dataset.my_table")
    .useBeamSchema()
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
To load historical data into a time-partitioned BigQuery table, specify BigQueryIO.Write.withTimePartitioning(com.google.api.services.bigquery.model.TimePartitioning) with a field used for column-based partitioning. For example:
PCollection<Quote> quotes = ...;
quotes.apply(BigQueryIO.write()
.withSchema(schema)
.withFormatFunction(quote -> new TableRow()
.set("timestamp", quote.getTimestamp())
.set(..other columns..))
.to("my-project:my_dataset.my_table")
.withTimePartitioning(new TimePartitioning().setField("time")));
A common use case is to dynamically generate BigQuery table names based on the current value. To support this, BigQueryIO.Write.to(SerializableFunction) accepts a function mapping the current element to a tablespec. For example, here's code that outputs quotes of different stocks to different tables:
PCollection<Quote> quotes = ...;
quotes.apply(BigQueryIO.write()
    .withSchema(schema)
    .withFormatFunction(quote -> new TableRow()...)
    .to((ValueInSingleWindow<Quote> quote) -> {
      String symbol = quote.getValue().getSymbol();
      return new TableDestination(
          "my-project:my_dataset.quotes_" + symbol, // Table spec
          "Quotes of stock " + symbol // Table description
      );
    }));
Per-table schemas can also be provided using BigQueryIO.Write.withSchemaFromView(org.apache.beam.sdk.values.PCollectionView<java.util.Map<java.lang.String, java.lang.String>>). This allows the schemas to be calculated based on a previous pipeline stage or statically via a Create transform. This method expects to receive a map-valued PCollectionView, mapping table specifications (project:dataset.table-id) to JSON formatted TableSchema objects. All destination tables must be present in this map, or the pipeline will fail to create tables. Care should be taken if the map value is based on a triggered aggregation over an unbounded PCollection; the side input will contain the entire history of all table schemas ever generated, which might blow up memory usage. This method can also be useful when writing to a single table, as it allows a previous stage to calculate the schema (possibly based on the full collection of records being written to BigQuery).
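A minimal sketch, assuming the schema map is computed statically with Create and that quotesJsonSchema is a placeholder JSON-formatted TableSchema string:

PCollectionView<Map<String, String>> schemaView = pipeline
    .apply("SchemaPerTable", Create.of(
        KV.of("my-project:my_dataset.quotes", quotesJsonSchema)))
    .apply(View.asMap());

quotes.apply(BigQueryIO.<Quote>write()
    .to("my-project:my_dataset.quotes")
    .withFormatFunction(quote -> new TableRow().set(..set the columns..))
    .withSchemaFromView(schemaView));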
For the most general form of dynamic table destinations and schemas, look at BigQueryIO.Write.to(DynamicDestinations).
BigQueryIO.Write supports two methods of inserting data into BigQuery, specified using BigQueryIO.Write.withMethod(org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method). If no method is supplied, then a default method will be chosen based on the input PCollection. See BigQueryIO.Write.Method for more information about the methods. The different insertion methods provide different tradeoffs of cost, quota, and data consistency; please see BigQuery documentation for more information about these tradeoffs.
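For instance, a sketch of explicitly selecting an insertion method rather than letting Beam pick a default (FILE_LOADS is just one of the available values; see BigQueryIO.Write.Method):

quotes.apply(BigQueryIO.<Quote>write()
    .to("my-project:my_dataset.my_table")
    .withSchema(schema)
    .withFormatFunction(quote -> new TableRow().set(..set the columns..))
    // Explicitly request batch file loads.
    .withMethod(BigQueryIO.Write.Method.FILE_LOADS));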
When using read() or readTableRows() in a template, it's required to specify BigQueryIO.Read.withTemplateCompatibility(). Specifying this in a non-template pipeline is not recommended because it has somewhat lower performance.
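A brief sketch of a template-compatible read (the table name is a placeholder):

PCollection<TableRow> rows = pipeline.apply(
    BigQueryIO.readTableRows()
        .withTemplateCompatibility()
        .from("my-project:my_dataset.my_table"));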
When using write() or writeTableRows() with batch loads in a template, it is recommended to specify BigQueryIO.Write.withCustomGcsTempLocation(org.apache.beam.sdk.options.ValueProvider<java.lang.String>). Writing to BigQuery via batch loads involves writing temporary files to this location, so the location must be accessible at pipeline execution time. By default, this location is captured at pipeline construction time and may be inaccessible if the template is reused from a different project or at a moment when the original location no longer exists. BigQueryIO.Write.withCustomGcsTempLocation(ValueProvider) allows specifying the location as an argument to the template invocation.
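As a sketch, assuming a hypothetical pipeline-options getter getBigQueryTempLocation() that returns a ValueProvider<String> supplied at template invocation time:

quotes.apply(BigQueryIO.<Quote>write()
    .to("my-project:my_dataset.my_table")
    .withSchema(schema)
    .withFormatFunction(quote -> new TableRow().set(..set the columns..))
    // Resolve the temporary GCS location at execution time, not at template construction time.
    .withCustomGcsTempLocation(options.getBigQueryTempLocation()));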
Permission requirements depend on the PipelineRunner that is used to execute the pipeline. Please refer to the documentation of corresponding PipelineRunners for more details.
Please see BigQuery Access Control for security and permission related information specific to BigQuery.
Row updates (UPSERT and DELETE) are subject to the following constraints:

- Only the STORAGE_API_AT_LEAST_ONCE method is supported.
- If the table is not previously created and CREATE_IF_NEEDED is used, a primary key must be specified using BigQueryIO.Write.withPrimaryKey(java.util.List<java.lang.String>).
Two types of updates are supported. UPSERT replaces the row with the matching primary key, or inserts the row if none exists. DELETE removes the row with the matching primary key. Row inserts are still allowed as normal using a separate instance of the sink; however, care must be taken not to violate primary key uniqueness constraints, as those constraints are not enforced by BigQuery. If a table contains multiple rows with the same primary key, then row updates may not work as expected.
Since PCollections are unordered, in order to properly sequence updates a sequence number must be set on each update. BigQuery uses this sequence number to ensure that updates are correctly applied to the table even if they arrive out of order.
The simplest way to apply row updates when writing TableRow objects is to use the applyRowMutations() method. Each RowMutation element contains a TableRow, an update type (UPSERT or DELETE), and a sequence number to order the updates.
PCollection<TableRow> rows = ...;
rows.apply(MapElements
        .into(new TypeDescriptor<RowMutation>(){})
        .via(tableRow -> RowMutation.of(tableRow, getUpdateType(tableRow), getSequenceNumber(tableRow))))
    .apply(BigQueryIO.applyRowMutations()
        .to("my_project:my_dataset.my_table")
        .withSchema(schema)
        .withPrimaryKey(ImmutableList.of("field1", "field2"))
        .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED));
If writing a type other than TableRow (e.g. using writeGenericRecords() or writing a custom user type), then the BigQueryIO.Write.withRowMutationInformationFn(org.apache.beam.sdk.transforms.SerializableFunction<T, org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation>) method can be used to set an update type and sequence number for each record. For example:
PCollection<CdcEvent> cdcEvent = ...;
cdcEvent.apply(BigQueryIO.write()
.to("my-project:my_dataset.my_table")
.withSchema(schema)
.withPrimaryKey(ImmutableList.of("field1", "field2"))
.withFormatFunction(CdcEvent::getTableRow)
.withRowMutationInformationFn(cdc -> RowMutationInformation.of(cdc.getChangeType(),
cdc.getSequenceNumber()))
.withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
.withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED));
Note that in order to use inserts or deletes, the table must be set up with a primary key. If the table is not previously created and CREATE_IF_NEEDED is used, a primary key must be specified using BigQueryIO.Write.withPrimaryKey(java.util.List<java.lang.String>).
Modifier and Type | Class and Description |
---|---|
static class | BigQueryIO.Read: Implementation of read(). |
static class | BigQueryIO.TypedRead<T>: Implementation of read(SerializableFunction). |
static class | BigQueryIO.Write<T>: Implementation of write(). |
Modifier and Type | Field and Description |
---|---|
static java.lang.String | BIGQUERY_JOB_TEMPLATE: Template for BigQuery jobs created by BigQueryIO. |
Modifier and Type | Method and Description |
---|---|
static BigQueryIO.Write<RowMutation> | applyRowMutations(): Write RowMutation messages to BigQuery. |
static BigQueryIO.Read | read(): Deprecated. Use read(SerializableFunction) or readTableRows() instead. readTableRows() does exactly the same as read(), however read(SerializableFunction) performs better. |
static <T> BigQueryIO.TypedRead<T> | read(SerializableFunction<SchemaAndRecord,T> parseFn): Reads from a BigQuery table or query and returns a PCollection with one element per row of the table or query result, parsed from the BigQuery AVRO format using the specified function. |
static BigQueryIO.TypedRead<TableRow> | readTableRows(): Like read(SerializableFunction) but represents each row as a TableRow. |
static BigQueryIO.TypedRead<TableRow> | readTableRowsWithSchema(): Like readTableRows() but with Schema support. |
static <T> BigQueryIO.TypedRead<T> | readWithDatumReader(AvroSource.DatumReaderFactory<T> readerFactory): Reads from a BigQuery table or query and returns a PCollection with one element per row of the table or query result. |
static <T> BigQueryIO.Write<T> | write(): A PTransform that writes a PCollection to a BigQuery table. |
static BigQueryIO.Write<GenericRecord> | writeGenericRecords() |
static <T extends com.google.protobuf.Message> BigQueryIO.Write<T> | writeProtos(java.lang.Class<T> protoMessageClass): A PTransform that writes a PCollection containing protocol buffer objects to a BigQuery table. |
static BigQueryIO.Write<TableRow> | writeTableRows() |
public static final java.lang.String BIGQUERY_JOB_TEMPLATE

Template for BigQuery jobs created by BigQueryIO. This template is "beam_bq_job_{TYPE}_{JOB_ID}_{STEP}_{RANDOM}", where:

- TYPE represents the BigQuery job type (e.g. extract / copy / load / query)
- JOB_ID is the Beam job name.
- STEP is a UUID representing the Dataflow step that created the BQ job.
- RANDOM is a random string.

NOTE: This job name template does not have backwards compatibility guarantees.
@Deprecated
public static BigQueryIO.Read read()

Deprecated. Use read(SerializableFunction) or readTableRows() instead. readTableRows() does exactly the same as read(), however read(SerializableFunction) performs better.

public static BigQueryIO.TypedRead<TableRow> readTableRows()

Like read(SerializableFunction) but represents each row as a TableRow.

This method is more convenient to use in some cases, but usually has significantly lower performance than using read(SerializableFunction) directly to parse data into a domain-specific type, due to the overhead of converting the rows to TableRow.
public static BigQueryIO.TypedRead<TableRow> readTableRowsWithSchema()

Like readTableRows() but with Schema support.

public static <T> BigQueryIO.TypedRead<T> read(SerializableFunction<SchemaAndRecord,T> parseFn)

Reads from a BigQuery table or query and returns a PCollection with one element per row of the table or query result, parsed from the BigQuery AVRO format using the specified function.

Each SchemaAndRecord contains a BigQuery TableSchema and a GenericRecord representing the row, indexed by column name. Here is a sample parse function that parses click events from a table.

class ClickEvent { long userId; String url; ... }

p.apply(BigQueryIO.read(new SerializableFunction<SchemaAndRecord, ClickEvent>() {
  public ClickEvent apply(SchemaAndRecord record) {
    GenericRecord r = record.getRecord();
    return new ClickEvent((Long) r.get("userId"), (String) r.get("url"));
  }
}).from("..."));
public static <T> BigQueryIO.TypedRead<T> readWithDatumReader(AvroSource.DatumReaderFactory<T> readerFactory)

Reads from a BigQuery table or query and returns a PCollection with one element per row of the table or query result. This API directly deserializes BigQuery AVRO data to the input class, based on the appropriate DatumReader.

class ClickEvent { long userId; String url; ... }

p.apply(BigQueryIO.readWithDatumReader(
        (AvroSource.DatumReaderFactory<ClickEvent>)
            (writer, reader) -> new ReflectDatumReader<>(ReflectData.get().getSchema(ClickEvent.class)))
    .from("..."));
public static <T> BigQueryIO.Write<T> write()

A PTransform that writes a PCollection to a BigQuery table. A formatting function must be provided to convert each input element into a TableRow using BigQueryIO.Write.withFormatFunction(SerializableFunction).
In BigQuery, each table has an enclosing dataset. The dataset being written must already exist. By default, tables will be created if they do not exist, which corresponds to a BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED disposition that matches the default of BigQuery's Jobs API. A schema must be provided (via BigQueryIO.Write.withSchema(TableSchema)), or else the transform may fail at runtime with an IllegalArgumentException. When updating a pipeline with a new schema, the existing schema fields must stay in the same order, or the pipeline will break.
By default, writes require an empty table, which corresponds to a BigQueryIO.Write.WriteDisposition.WRITE_EMPTY disposition that matches the default of BigQuery's Jobs API.
Here is a sample transform that produces TableRow values containing "word" and "count" columns:
static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow row = new TableRow()
        .set("word", c.element().getKey())
        .set("count", c.element().getValue().intValue());
    c.output(row);
  }
}
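For context, a rough sketch of wiring this DoFn into a write (the table name and countsSchema are placeholders):

PCollection<KV<String, Long>> wordCounts = ...;
wordCounts
    .apply(ParDo.of(new FormatCountsFn()))
    .apply(BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.word_counts")
        .withSchema(countsSchema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));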
public static BigQueryIO.Write<TableRow> writeTableRows()

A PTransform that writes a PCollection containing TableRows to a BigQuery table.

It is recommended to instead use write() with BigQueryIO.Write.withFormatFunction(SerializableFunction).
public static BigQueryIO.Write<RowMutation> applyRowMutations()

Write RowMutation messages to BigQuery. Each update contains a TableRow along with information on how to apply the update. This is a convenience method: BigQueryIO.Write.withRowMutationInformationFn(org.apache.beam.sdk.transforms.SerializableFunction<T, org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation>) can be called directly instead to tell the sink how to apply row updates; directly calling BigQueryIO.Write.withRowMutationInformationFn(org.apache.beam.sdk.transforms.SerializableFunction<T, org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation>) is preferred when writing non-TableRow types (e.g. writeGenericRecords() or a custom user type).

This is supported when using the BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE insert method, and with either BigQueryIO.Write.CreateDisposition.CREATE_NEVER or BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED. For CREATE_IF_NEEDED, a primary key must be specified using BigQueryIO.Write.withPrimaryKey(java.util.List<java.lang.String>).
public static BigQueryIO.Write<GenericRecord> writeGenericRecords()
public static <T extends com.google.protobuf.Message> BigQueryIO.Write<T> writeProtos(java.lang.Class<T> protoMessageClass)

A PTransform that writes a PCollection containing protocol buffer objects to a BigQuery table. If using one of the storage-api write methods, these protocol buffers must match the schema of the table.
If a Schema is provided using BigQueryIO.Write.withSchema(com.google.api.services.bigquery.model.TableSchema)
, that schema will be used for
creating the table if necessary. If no schema is provided, one will be inferred from the
protocol buffer's descriptor. Note that inferring a schema from the protocol buffer may not
always provide the intended schema as multiple BigQuery types can map to the same protocol
buffer type. For example, a protocol buffer field of type INT64 may be an INT64 BigQuery type,
but it might also represent a TIME, DATETIME, or a TIMESTAMP type.
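As an illustrative sketch, assuming a hypothetical generated protocol buffer class MyEvent whose fields match the destination table:

PCollection<MyEvent> events = ...;
events.apply(
    BigQueryIO.writeProtos(MyEvent.class)
        .to("my-project:my_dataset.events")
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));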