public class SpannerIO
extends java.lang.Object
To perform a single read from Cloud Spanner, construct a SpannerIO.Read transform using SpannerIO.read(). It will return a PCollection of Structs, where each element represents an individual row returned from the read operation. Both Query and Read APIs are supported. See more information about reading from Cloud Spanner.
To execute a Query, specify a SpannerIO.Read.withQuery(Statement) or SpannerIO.Read.withQuery(String) during the construction of the transform.
PCollection<Struct> rows = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(dbId)
        .withQuery("SELECT id, name, email FROM users"));
Reads by default use the PartitionQuery API, which enforces some limitations on the type of queries that can be used so that the data can be read in parallel. If the query is not supported by the PartitionQuery API, then you can specify a non-partitioned read by setting withBatching(false). If the amount of data being read by a non-partitioned read is very large, it may be useful to add a Reshuffle.viaRandomKey() transform on the output so that the downstream transforms can run in parallel.
To read an entire Table, use SpannerIO.Read.withTable(String) and optionally specify a list of columns.
PCollection<Struct> rows = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(dbId)
        .withTable("users")
        .withColumns("id", "name", "email"));
To read using an Index, specify the index name using SpannerIO.Read.withIndex(String).
PCollection<Struct> rows = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(dbId)
        .withTable("users")
        .withIndex("users_by_name")
        .withColumns("id", "name", "email"));
The transform is guaranteed to be executed on a consistent snapshot of data, utilizing the power of read only transactions. Staleness of data can be controlled using the SpannerIO.Read.withTimestampBound(com.google.cloud.spanner.TimestampBound) or SpannerIO.Read.withTimestamp(Timestamp) methods. Read more about transactions in Cloud Spanner.
It is possible to read several PCollections within a single transaction. Apply the createTransaction() transform, which lazily creates a transaction. The result of this transformation can be passed to a read operation using SpannerIO.Read.withTransaction(PCollectionView).
SpannerConfig spannerConfig = ...
PCollectionView<Transaction> tx = p.apply(
    SpannerIO.createTransaction()
        .withSpannerConfig(spannerConfig)
        .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> users = p.apply(
    SpannerIO.read()
        .withSpannerConfig(spannerConfig)
        .withQuery("SELECT name, email FROM users")
        .withTransaction(tx));
PCollection<Struct> tweets = p.apply(
    SpannerIO.read()
        .withSpannerConfig(spannerConfig)
        .withQuery("SELECT user, tweet, date FROM tweets")
        .withTransaction(tx));
To perform multiple reads, construct a SpannerIO.ReadAll transform using SpannerIO.readAll(). This transform takes a PCollection of ReadOperation elements and performs the partitioned read on each of them using the same Read Only Transaction for consistent results.
Note that this transform should not be used in Streaming pipelines. This is because the same Read Only Transaction, which is created once when the pipeline is first executed, will be used for all reads. The data being read will therefore become stale, and if no reads are made for more than 1 hour, the transaction will automatically time out and be closed by the Spanner server, meaning that any subsequent reads will fail.
// Build a collection of ReadOperations.
PCollection<ReadOperation> reads = ...
PCollection<Struct> rows = reads.apply(
    SpannerIO.readAll()
        .withInstanceId(instanceId)
        .withDatabaseId(dbId));
The Cloud Spanner SpannerIO.Write transform writes to Cloud Spanner by executing a collection of input row Mutations. The mutations are grouped into batches for efficiency.
To configure the write transform, create an instance using write() and then specify the destination Cloud Spanner instance (SpannerIO.Write.withInstanceId(String)) and destination database (SpannerIO.Write.withDatabaseId(String)). For example:
// Earlier in the pipeline, create a PCollection of Mutations to be written to Cloud Spanner.
PCollection<Mutation> mutations = ...;
// Write mutations.
SpannerWriteResult result = mutations.apply(
    "Write", SpannerIO.write().withInstanceId("instance").withDatabaseId("database"));
The SpannerWriteResult object contains the results of the transform, including a PCollection of MutationGroups that failed to write, and a PCollection that can be used in batch pipelines as a completion signal to Wait.OnSignal to indicate when all input has been written. Note that in streaming pipelines, this signal will never be triggered as the input is unbounded and this PCollection is using the GlobalWindow.
To reduce the number of transactions sent to Spanner, the Mutations are grouped into batches. The default maximum size of the batch is set to 1MB or 5000 mutated cells, or 500 rows (whichever is reached first). To override this use withBatchSizeBytes(), withMaxNumMutations() or withMaxNumRows(). Setting any of these to a small value or zero disables batching.
Note that the maximum size of a single transaction is 20,000 mutated cells - including cells in indexes. If you have a large number of indexes and are getting exceptions with the message "INVALID_ARGUMENT: The transaction contains too many mutations", you will need to specify a smaller number of MaxNumMutations.
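The "whichever is reached first" rule can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions, not SpannerIO's implementation: the class `BatchLimitSketch`, the stand-in type `Mut`, and the exact cut-off behaviour (a batch is closed just before a mutation that would push it over any limit) are hypothetical and chosen for illustration. The default limits are the ones quoted above.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the batch limits; not SpannerIO's implementation. */
final class BatchLimitSketch {

  /** Illustrative stand-in for a Mutation: its size in bytes, mutated cells and rows. */
  static final class Mut {
    final long bytes;
    final long cells;
    final long rows;

    Mut(long bytes, long cells, long rows) {
      this.bytes = bytes;
      this.cells = cells;
      this.rows = rows;
    }
  }

  // Default limits quoted in the text above.
  static final long DEFAULT_MAX_BYTES = 1024L * 1024L;
  static final long DEFAULT_MAX_CELLS = 5000L;
  static final long DEFAULT_MAX_ROWS = 500L;

  /** Closes the current batch as soon as adding the next mutation would exceed any limit. */
  static List<List<Mut>> batch(List<Mut> input, long maxBytes, long maxCells, long maxRows) {
    List<List<Mut>> batches = new ArrayList<>();
    List<Mut> current = new ArrayList<>();
    long bytes = 0;
    long cells = 0;
    long rows = 0;
    for (Mut m : input) {
      boolean wouldExceed =
          bytes + m.bytes > maxBytes || cells + m.cells > maxCells || rows + m.rows > maxRows;
      if (wouldExceed && !current.isEmpty()) {
        batches.add(current);
        current = new ArrayList<>();
        bytes = 0;
        cells = 0;
        rows = 0;
      }
      // A mutation that is too large on its own still gets a batch to itself.
      current.add(m);
      bytes += m.bytes;
      cells += m.cells;
      rows += m.rows;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Mut> input = new ArrayList<>();
    for (int i = 0; i < 1200; i++) {
      input.add(new Mut(100, 10, 1)); // 100 bytes, 10 cells, 1 row each
    }
    // With the defaults, the cell and row limits are reached long before the byte limit.
    System.out.println(
        batch(input, DEFAULT_MAX_BYTES, DEFAULT_MAX_CELLS, DEFAULT_MAX_ROWS).size());
    // A byte limit of zero leaves one mutation per batch, which amounts to no batching.
    System.out.println(batch(input, 0, DEFAULT_MAX_CELLS, DEFAULT_MAX_ROWS).size());
  }
}
```

In this model a limit of zero never lets a second mutation join a batch, which mirrors the statement that a zero or very small limit disables batching.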
The batches written are obtained by grouping enough Mutations from the Bundle provided by Beam to form several batches. This group of Mutations is then sorted by table and primary key, and the batches are created from the sorted group. Each batch will then have rows for the same table, with keys that are 'close' to each other, thus optimising write efficiency because each batch affects as few table splits as possible. This grouping factor (number of batches) is controlled by the parameter withGroupingFactor().
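The sort-then-batch idea described above can be sketched in plain Java. This is a simplified model and not SpannerIO's actual code: `GroupingSketch`, the stand-in type `Row` with a single numeric key, and the fixed `rowsPerBatch` cut-off are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Simplified, hypothetical model of grouping and sorting; not SpannerIO's implementation. */
final class GroupingSketch {

  /** Illustrative stand-in for a row mutation, identified by table name and a numeric key. */
  static final class Row {
    final String table;
    final long key;

    Row(String table, long key) {
      this.table = table;
      this.key = key;
    }
  }

  /** Sorts a group of rows by table and key, then cuts it into batches of one table each. */
  static List<List<Row>> sortedBatches(List<Row> group, int rowsPerBatch) {
    if (rowsPerBatch < 1) {
      throw new IllegalArgumentException("rowsPerBatch must be at least 1");
    }
    List<Row> sorted = new ArrayList<>(group);
    sorted.sort(Comparator.comparing((Row r) -> r.table).thenComparingLong(r -> r.key));

    List<List<Row>> batches = new ArrayList<>();
    List<Row> current = new ArrayList<>();
    for (Row row : sorted) {
      boolean tableChanged = !current.isEmpty() && !current.get(0).table.equals(row.table);
      if (tableChanged || current.size() == rowsPerBatch) {
        batches.add(current);
        current = new ArrayList<>();
      }
      current.add(row);
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Row> group = Arrays.asList(
        new Row("users", 5), new Row("tweets", 2), new Row("users", 1),
        new Row("tweets", 9), new Row("users", 3));
    // Print each batch on its own line as table:key pairs.
    for (List<Row> batch : sortedBatches(group, 2)) {
      StringBuilder line = new StringBuilder();
      for (Row r : batch) {
        line.append(r.table).append(':').append(r.key).append(' ');
      }
      System.out.println(line.toString().trim());
    }
  }
}
```

Because the group is sorted before it is cut, each batch in this model covers one table and a contiguous key range, which is the property the text relies on to touch as few table splits as possible.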
Note that each worker will need enough memory to hold GroupingFactor x MaxBatchSizeBytes Mutations, so if you have a large MaxBatchSize you may need to reduce GroupingFactor.
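The memory estimate is a simple product, and it can be inverted to pick a grouping factor for a given budget. The helper below is a hypothetical sketch (`GroupingMemorySketch` and its method names are not part of SpannerIO), and the values used in `main` are assumed purely for illustration.

```java
/** Hypothetical helper for the GroupingFactor x MaxBatchSizeBytes estimate; not part of SpannerIO. */
final class GroupingMemorySketch {

  /** Bytes of Mutations a worker may need to hold: groupingFactor x maxBatchSizeBytes. */
  static long requiredBytes(long groupingFactor, long maxBatchSizeBytes) {
    return Math.multiplyExact(groupingFactor, maxBatchSizeBytes);
  }

  /** Largest grouping factor (at least 1) whose grouped batches fit in the given budget. */
  static long maxGroupingFactor(long memoryBudgetBytes, long maxBatchSizeBytes) {
    if (maxBatchSizeBytes <= 0) {
      throw new IllegalArgumentException("maxBatchSizeBytes must be positive");
    }
    return Math.max(1L, memoryBudgetBytes / maxBatchSizeBytes);
  }

  public static void main(String[] args) {
    long oneMb = 1024L * 1024L;
    // Assumed values for illustration: a grouping factor of 1000 with 1MB batches.
    System.out.println(requiredBytes(1000, oneMb)); // 1048576000 bytes, roughly 1GB
    // Assumed values for illustration: 10MB batches and a 2GB budget for grouped Mutations.
    System.out.println(maxGroupingFactor(2048L * oneMb, 10L * oneMb)); // 204
  }
}
```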
While Grouping and Batching increases write efficiency, it dramatically increases the latency between when a Mutation is received by the transform, and when it is actually written to the database. This is because enough Mutations need to be received to fill the grouped batches. In Batch pipelines (bounded sources), this is not normally an issue, but in Streaming (unbounded) pipelines, this latency is often seen as unacceptable.
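There are therefore 3 different ways that this transform can be configured:
With Grouping and Batching: sorted batches of Mutations are created and written, as described above. This is the default for Batch pipelines. It is the most efficient way to write large amounts of data, but has the highest latency before Mutations are written.
With Batching but no Grouping: if .withGroupingFactor(1) is set, grouping is disabled. This is the default for Streaming pipelines. Unsorted batches are created and written as soon as enough mutations to fill a batch are received. This reflects a compromise where a small amount of additional latency enables more efficient writes.
Without any Batching: if .withBatchSizeBytes(0) is set, no batching is performed and the Mutations are written to the database as soon as they are received, ensuring the lowest latency before Mutations are written.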
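The trade-off between grouping, batching and latency can be summarised as a decision on two settings. The sketch below is a hypothetical plain-Java summary, not SpannerIO code: `WriteModeSketch`, its `Mode` enum and `modeFor` exist only for illustration, and the two parameters stand for the values passed to withGroupingFactor() and withBatchSizeBytes().

```java
/** Hypothetical summary of the write configurations described above; not part of SpannerIO. */
final class WriteModeSketch {

  enum Mode {
    GROUPING_AND_BATCHING,
    BATCHING_ONLY,
    NO_BATCHING
  }

  /** Maps the two settings named in the text to the resulting write mode. */
  static Mode modeFor(int groupingFactor, long batchSizeBytes) {
    if (batchSizeBytes == 0) {
      return Mode.NO_BATCHING; // Mutations are written as soon as they are received
    }
    if (groupingFactor == 1) {
      return Mode.BATCHING_ONLY; // unsorted batches, written as soon as a batch is full
    }
    return Mode.GROUPING_AND_BATCHING; // sorted batches, most efficient, highest latency
  }

  public static void main(String[] args) {
    long oneMb = 1024L * 1024L;
    System.out.println(modeFor(1000, oneMb)); // grouping factor of 1000 is an assumed value
    System.out.println(modeFor(1, oneMb));
    System.out.println(modeFor(1, 0));
  }
}
```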
Several counters are provided for monitoring purposes. If the failure mode is set to REPORT_FAILURES, then failed batches will be split up and the individual mutation groups retried separately.
The Write transform reads the database schema on pipeline start to know which columns are used as primary keys of the tables and indexes. This is so that the transform knows how to sort the grouped Mutations by table name and primary key as described above.
If the database schema, or any additional tables or indexes, are created in the same pipeline, then there will be a race condition: if the schema is read before a table is created, its primary key will not be known. The sorting/batching will then not be optimal and performance will be reduced (warnings will be logged for rows using unknown tables).
To prevent this race condition, use SpannerIO.Write.withSchemaReadySignal(PCollection) to pass a signal PCollection (for example the output of the transform that creates the table(s)) which will be used with Wait.OnSignal to prevent the schema from being read until it is ready. The Write transform will be paused until this signal PCollection is closed.
The transform does not provide the same transactional guarantees as Cloud Spanner. In particular, the Mutations in the input are not all submitted in the same transaction.
Use MutationGroups with the SpannerIO.WriteGrouped transform to ensure that a small set of mutations is bundled together. It is guaranteed that mutations in a MutationGroup are submitted in the same transaction. Note that a MutationGroup must not exceed the Spanner transaction limits.
// Earlier in the pipeline, create a PCollection of MutationGroups to be written to Cloud Spanner.
PCollection<MutationGroup> mutationGroups = ...;
// Write mutation groups.
SpannerWriteResult result = mutationGroups.apply(
    "Write",
    SpannerIO.write().withInstanceId("instance").withDatabaseId("database").grouped());
SpannerIO.Write can be used as a streaming sink; however, as with batch mode, note that the write order of individual Mutation/MutationGroup objects is not guaranteed.
SpannerIO.Read and SpannerIO.ReadAll can be used in Streaming pipelines to read a set of Facts on pipeline startup.
SpannerIO.ReadAll should not be used on an unbounded PCollection<ReadOperation>, for the reasons stated above.
Modifier and Type | Class and Description |
---|---|
static class | SpannerIO.CreateTransaction - A PTransform that creates a transaction. |
static class | SpannerIO.FailureMode - A failure handling strategy. |
static class | SpannerIO.Read - Implementation of read(). |
static class | SpannerIO.ReadAll - Implementation of readAll(). |
static class | SpannerIO.ReadChangeStream |
static interface | SpannerIO.SpannerChangeStreamOptions - Interface to display the name of the metadata table on Dataflow UI. |
static class | SpannerIO.Write - A PTransform that writes Mutation objects to Google Cloud Spanner. |
static class | SpannerIO.WriteGrouped - Same as SpannerIO.Write but supports grouped mutations. |
Modifier and Type | Method and Description |
---|---|
static SpannerIO.CreateTransaction | createTransaction() - Returns a transform that creates a batch transaction. |
static SpannerIO.Read | read() - Creates an uninitialized instance of SpannerIO.Read. |
static SpannerIO.ReadAll | readAll() |
static SpannerIO.ReadChangeStream | readChangeStream() - Creates an uninitialized instance of SpannerIO.ReadChangeStream. |
static SpannerIO.Write | write() - Creates an uninitialized instance of SpannerIO.Write. |
public static SpannerIO.Read read()
Creates an uninitialized instance of SpannerIO.Read. Before use, the SpannerIO.Read must be configured with a SpannerIO.Read.withInstanceId(java.lang.String) and SpannerIO.Read.withDatabaseId(java.lang.String) that identify the Cloud Spanner database.
public static SpannerIO.ReadAll readAll()
public static SpannerIO.CreateTransaction createTransaction()
Returns a transform that creates a batch transaction. By default, a TimestampBound.strong() transaction is created; to override this, use SpannerIO.CreateTransaction.withTimestampBound(TimestampBound).
public static SpannerIO.Write write()
Creates an uninitialized instance of SpannerIO.Write. Before use, the SpannerIO.Write must be configured with a SpannerIO.Write.withInstanceId(java.lang.String) and SpannerIO.Write.withDatabaseId(java.lang.String) that identify the Cloud Spanner database being written.
public static SpannerIO.ReadChangeStream readChangeStream()
Creates an uninitialized instance of SpannerIO.ReadChangeStream. Before use, the SpannerIO.ReadChangeStream must be configured with a SpannerIO.ReadChangeStream.withProjectId(java.lang.String), SpannerIO.ReadChangeStream.withInstanceId(java.lang.String), and SpannerIO.ReadChangeStream.withDatabaseId(java.lang.String) that identify the Cloud Spanner database being read. It must also be configured with the start time and the change stream name.