public class BigQueryIO
extends java.lang.Object
PTransform
s for reading and writing
BigQuery tables.
A fully-qualified BigQuery table name consists of three components:
projectId
: the Cloud project id (defaults to
GcpOptions.getProject()
).
datasetId
: the BigQuery dataset id, unique within a project.
tableId
: a table id, unique within a dataset.
BigQuery table references are stored as a TableReference
, which comes from the BigQuery Java Client API. Tables
can be referred to as Strings, with or without the projectId
. A helper function is
provided (BigQueryHelpers.parseTableSpec(String)
) that parses the following string forms
into a TableReference
:
project_id
]:[dataset_id
].[table_id
]
dataset_id
].[table_id
]
To read from a BigQuery table, apply a BigQueryIO.Read
transformation.
This produces a PCollection
of TableRows
as output:
PCollection<TableRow> weatherData = pipeline.apply(
BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations"));
See TableRow
for more information on the TableRow
object.
Users may provide a query to read from rather than reading all of a BigQuery table. If specified, the result obtained by executing the specified query will be used as the data of the input transform.
PCollection<TableRow> meanTemperatureData = pipeline.apply(
BigQueryIO.read().fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
When creating a BigQuery input transform, users should provide either a query or a table. Pipeline construction will fail with a validation error if neither or both are specified.
To write to a BigQuery table, apply a BigQueryIO.Write
transformation.
This consumes either a PCollection
of TableRows
as input when using
writeTableRows()
or of a user-defined type when using
write()
. When using a user-defined type, a function must be provided to
turn this type into a TableRow
using
BigQueryIO.Write.withFormatFunction(SerializableFunction)
.
PCollection<TableRow> quotes = ...
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("source").setType("STRING"));
fields.add(new TableFieldSchema().setName("quote").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
quotes.apply(BigQueryIO.writeTableRows()
.to("my-project:output.output_table")
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
See BigQueryIO.Write
for details on how to specify if a write should append to an
existing table, replace the table, or verify that the table is empty. Note that the dataset being
written to must already exist. Unbounded PCollections can only be written using BigQueryIO.Write.WriteDisposition.WRITE_EMPTY
or BigQueryIO.Write.WriteDisposition.WRITE_APPEND
.
A common use case is to dynamically generate BigQuery table names based on the current window
or the current value. To support this, BigQueryIO.Write.to(SerializableFunction)
accepts
a function mapping the current element to a tablespec. For example, here's code that outputs
daily tables to BigQuery:
PCollection<TableRow> quotes = ...
quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
.apply(BigQueryIO.writeTableRows()
.withSchema(schema)
.to(new SerializableFunction<ValueInSingleWindow, String>() {
public String apply(ValueInSingleWindow value) {
// The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
String dayString = DateTimeFormat.forPattern("yyyy_MM_dd")
.withZone(DateTimeZone.UTC)
.print(((IntervalWindow) value.getWindow()).start());
return "my-project:output.output_table_" + dayString;
}
}));
Note that this also allows the table to be a function of the element as well as the current
pane, in the case of triggered windows. In this case it might be convenient to call write()
directly instead of using the writeTableRows()
helper.
This will allow the mapping function to access the element of the user-defined type. In this
case, a formatting function must be specified using BigQueryIO.Write.withFormatFunction(org.apache.beam.sdk.transforms.SerializableFunction<T, com.google.api.services.bigquery.model.TableRow>)
to convert each element into a TableRow
object.
Per-table schemas can also be provided using BigQueryIO.Write.withSchemaFromView(org.apache.beam.sdk.values.PCollectionView<java.util.Map<java.lang.String, java.lang.String>>)
. This
allows you the schemas to be calculated based on a previous pipeline stage or statically via a
Create
transform. This method expects to receive a
map-valued PCollectionView
, mapping table specifications (project:dataset.table-id), to
JSON formatted TableSchema
objects. All destination tables must be present in this map,
or the pipeline will fail to create tables. Care should be taken if the map value is based on a
triggered aggregation over and unbounded PCollection
; the side input will contain the
entire history of all table schemas ever generated, which might blow up memory usage. This method
can also be useful when writing to a single table, as it allows a previous stage to calculate the
schema (possibly based on the full collection of records being written to BigQuery).
For the most general form of dynamic table destinations and schemas, look at
BigQueryIO.Write.to(DynamicDestinations)
.
Permission requirements depend on the PipelineRunner
that is used to execute the
pipeline. Please refer to the documentation of corresponding PipelineRunner
s for more
details.
Please see BigQuery Access Control for security and permission related information specific to BigQuery.
Modifier and Type | Class and Description |
---|---|
static class |
BigQueryIO.Read
Implementation of
read() . |
static class |
BigQueryIO.Write<T>
Implementation of
write() . |
Modifier and Type | Method and Description |
---|---|
static BigQueryIO.Read |
read()
A
PTransform that reads from a BigQuery table and returns a
PCollection of TableRows containing each of the rows of the table. |
static <T> BigQueryIO.Write<T> |
write()
A
PTransform that writes a PCollection to a BigQuery table. |
static BigQueryIO.Write<TableRow> |
writeTableRows()
|
public static BigQueryIO.Read read()
PTransform
that reads from a BigQuery table and returns a
PCollection
of TableRows
containing each of the rows of the table.
Each TableRow
contains values indexed by column name. Here is a
sample processing function that processes a "line" column from rows:
static class ExtractWordsFn extends DoFn<TableRow, String> {
public void processElement(ProcessContext c) {
// Get the "line" field of the TableRow object, split it into words, and emit them.
TableRow row = c.element();
String[] words = row.get("line").toString().split("[^a-zA-Z']+");
for (String word : words) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}
public static <T> BigQueryIO.Write<T> write()
PTransform
that writes a PCollection
to a BigQuery table. A formatting
function must be provided to convert each input element into a TableRow
using
BigQueryIO.Write.withFormatFunction(SerializableFunction)
.
In BigQuery, each table has an encosing dataset. The dataset being written must already exist.
By default, tables will be created if they do not exist, which corresponds to a
BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED
disposition that matches the default of
BigQuery's Jobs API. A schema must be provided (via BigQueryIO.Write.withSchema(TableSchema)
),
or else the transform may fail at runtime with an IllegalArgumentException
.
By default, writes require an empty table, which corresponds to
a BigQueryIO.Write.WriteDisposition.WRITE_EMPTY
disposition that matches the default of
BigQuery's Jobs API.
Here is a sample transform that produces TableRow values containing "word" and "count" columns:
static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> {
public void processElement(ProcessContext c) {
TableRow row = new TableRow()
.set("word", c.element().getKey())
.set("count", c.element().getValue().intValue());
c.output(row);
}
}
public static BigQueryIO.Write<TableRow> writeTableRows()