@Experimental(value=SOURCE_SINK) public class SpannerIO extends java.lang.Object
Transforms for reading from and writing to Google Cloud Spanner.
To read from Cloud Spanner, apply the SpannerIO.Read transformation. It returns a PCollection of Structs, where each element represents an individual row returned from the read operation. Both the Query and Read APIs are supported. See more information about reading from Cloud Spanner.
To execute a query, specify SpannerIO.Read.withQuery(Statement) or SpannerIO.Read.withQuery(String) during the construction of the transform.
PCollection<Struct> rows = p.apply(
SpannerIO.read()
.withInstanceId(instanceId)
.withDatabaseId(dbId)
.withQuery("SELECT id, name, email FROM users"));
To use the Read API, specify a table name and a list of columns.
PCollection<Struct> rows = p.apply(
SpannerIO.read()
.withInstanceId(instanceId)
.withDatabaseId(dbId)
.withTable("users")
.withColumns("id", "name", "email"));
To read efficiently using an index, specify the index name using SpannerIO.Read.withIndex(java.lang.String).
The transform is guaranteed to be executed on a consistent snapshot of data, using read-only transactions. Staleness of data can be controlled using the SpannerIO.Read.withTimestampBound(com.google.cloud.spanner.TimestampBound) or SpannerIO.Read.withTimestamp(Timestamp) methods. Read more about transactions in Cloud Spanner.
It is possible to read several PCollections within a single transaction. Apply the createTransaction() transform, which lazily creates a transaction. The result of this transformation can be passed to a read operation using SpannerIO.Read.withTransaction(PCollectionView).
SpannerConfig spannerConfig = ...
PCollectionView<Transaction> tx =
p.apply(
SpannerIO.createTransaction()
.withSpannerConfig(spannerConfig)
.withTimestampBound(TimestampBound.strong()));
PCollection<Struct> users = p.apply(
SpannerIO.read()
.withSpannerConfig(spannerConfig)
.withQuery("SELECT name, email FROM users")
.withTransaction(tx));
PCollection<Struct> tweets = p.apply(
SpannerIO.read()
.withSpannerConfig(spannerConfig)
.withQuery("SELECT user, tweet, date FROM tweets")
.withTransaction(tx));
The Cloud Spanner SpannerIO.Write transform writes to Cloud Spanner by executing a collection of input row Mutations. The mutations are grouped into batches for efficiency.
To configure the write transform, create an instance using write() and then specify the destination Cloud Spanner instance (SpannerIO.Write.withInstanceId(String)) and destination database (SpannerIO.Write.withDatabaseId(String)). For example:
// Earlier in the pipeline, create a PCollection of Mutations to be written to Cloud Spanner.
PCollection<Mutation> mutations = ...;
// Write mutations.
SpannerWriteResult result = mutations.apply(
"Write", SpannerIO.write().withInstanceId("instance").withDatabaseId("database"));
The SpannerWriteResult object contains the results of the transform, including a PCollection of MutationGroups that failed to write, and a PCollection that can be used as a completion signal.
To reduce the number of transactions sent to Spanner, the Mutations are grouped into batches. The default maximum size of a batch is 1MB or 5000 mutated cells. To override this, use withBatchSizeBytes() and withMaxNumMutations(). Setting either to a small value or zero disables batching.
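The two limits described above can be pictured with a small, library-free sketch. It is not Beam's implementation: the `BatchLimitSketch` class, its `Item` type (a stand-in for a mutation with a known byte size and cell count), and the `batch` method are all hypothetical names introduced for illustration. It assumes a batch is closed as soon as adding the next item would exceed either limit.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; names and logic are assumptions, not Beam's code. */
final class BatchLimitSketch {

  /** Hypothetical stand-in for a mutation: serialized size and number of mutated cells. */
  static final class Item {
    final long sizeBytes;
    final long cells;

    Item(long sizeBytes, long cells) {
      this.sizeBytes = sizeBytes;
      this.cells = cells;
    }
  }

  /** Splits items, in input order, into batches that respect both limits where possible. */
  static List<List<Item>> batch(List<Item> items, long maxBytes, long maxCells) {
    List<List<Item>> batches = new ArrayList<>();
    List<Item> current = new ArrayList<>();
    long bytes = 0;
    long cells = 0;
    for (Item item : items) {
      boolean overflow = bytes + item.sizeBytes > maxBytes || cells + item.cells > maxCells;
      // Close the current batch before it would exceed a limit. An item that is
      // larger than a limit on its own still gets a batch to itself.
      if (overflow && !current.isEmpty()) {
        batches.add(current);
        current = new ArrayList<>();
        bytes = 0;
        cells = 0;
      }
      current.add(item);
      bytes += item.sizeBytes;
      cells += item.cells;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

In this sketch, a limit of zero puts every item in its own batch, which mirrors the statement that a small or zero value disables batching.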
Note that the maximum size of a single transaction is 20,000 mutated cells, including cells in indexes. If you have a large number of indexes and are getting exceptions with the message "INVALID_ARGUMENT: The transaction contains too many mutations", you will need to specify a smaller value for MaxNumMutations.
The batches written are obtained by grouping enough Mutations from the Bundle provided by Beam to form (by default) 1000 batches. This group of Mutations is then sorted by Key, and the batches are created from the sorted group. This is so that each batch will have keys that are 'close' to each other, to optimise write performance. This grouping factor (number of batches) is controlled by the parameter withGroupingFactor().
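The sort-then-batch idea can be sketched without any Beam types. This is a simplified illustration, not the transform's actual code: `SortedBatchSketch` and `sortedBatches` are hypothetical names, keys are plain strings, and a fixed number of keys per batch stands in for the byte and cell limits.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative sketch only; names and the fixed batch size are assumptions. */
final class SortedBatchSketch {

  /** Sorts a buffered group of keys, then cuts consecutive runs into batches. */
  static List<List<String>> sortedBatches(List<String> keys, int keysPerBatch) {
    if (keysPerBatch <= 0) {
      throw new IllegalArgumentException("keysPerBatch must be positive");
    }
    // Sort a copy so the caller's list is left untouched.
    List<String> sorted = new ArrayList<>(keys);
    Collections.sort(sorted);

    List<List<String>> batches = new ArrayList<>();
    for (int start = 0; start < sorted.size(); start += keysPerBatch) {
      int end = Math.min(start + keysPerBatch, sorted.size());
      // Neighbouring keys in sort order end up in the same batch.
      batches.add(new ArrayList<>(sorted.subList(start, end)));
    }
    return batches;
  }
}
```

Because the whole group is sorted before it is cut, a larger group gives each batch a narrower key range, at the cost of buffering more data before anything is written.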
Note that each worker will need enough memory to hold GroupingFactor x MaxBatchSizeBytes Mutations, so if you have a large MaxBatchSize you may need to reduce GroupingFactor.
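As a rough worked example of this memory bound, the helper below multiplies the two settings. `WriteMemoryEstimate` and `bufferedBytes` are hypothetical names, and the example assumes that "1MB" means 1024 x 1024 bytes; it estimates only the buffered Mutations, not total worker memory.

```java
/** Illustrative arithmetic only; the class and method names are assumptions. */
final class WriteMemoryEstimate {

  /** Upper bound on buffered mutation bytes: GroupingFactor x MaxBatchSizeBytes. */
  static long bufferedBytes(long groupingFactor, long maxBatchSizeBytes) {
    // multiplyExact throws ArithmeticException instead of silently overflowing.
    return Math.multiplyExact(groupingFactor, maxBatchSizeBytes);
  }
}
```

With the defaults described above (a grouping factor of 1000 and a 1MB batch size), `bufferedBytes(1000, 1024L * 1024L)` gives 1,048,576,000 bytes, roughly 1 GB per worker. Halving the grouping factor to 500 halves the bound to 524,288,000 bytes.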
The Write transform reads the database schema on pipeline start. If the schema is created as part of the same pipeline, this transform needs to wait until this has happened. Use SpannerIO.Write.withSchemaReadySignal(PCollection) to pass a signal PCollection which will be used with Wait.OnSignal to prevent the schema from being read until it is ready. The Write transform will be paused until the signal PCollection is closed.
The transform does not provide the same transactional guarantees as Cloud Spanner. Use MutationGroups with the SpannerIO.WriteGrouped transform to ensure that a small set of mutations is bundled together. It is guaranteed that mutations in a MutationGroup are submitted in the same transaction. Note that a MutationGroup must not exceed the Spanner transaction limits.
// Earlier in the pipeline, create a PCollection of MutationGroups to be written to Cloud Spanner.
PCollection<MutationGroup> mutationGroups = ...;
// Write mutation groups.
SpannerWriteResult result = mutationGroups.apply(
"Write",
SpannerIO.write().withInstanceId("instance").withDatabaseId("database").grouped());
SpannerIO.Write can be used as a streaming sink; however, as with batch mode, note that the write order of individual Mutation/MutationGroup objects is not guaranteed.
Modifier and Type | Class and Description |
---|---|
static class | SpannerIO.CreateTransaction: A PTransform that creates a transaction. |
static class | SpannerIO.FailureMode: A failure handling strategy. |
static class | SpannerIO.Read: Implementation of read(). |
static class | SpannerIO.ReadAll: Implementation of readAll(). |
static class | SpannerIO.Write: A PTransform that writes Mutation objects to Google Cloud Spanner. |
static class | SpannerIO.WriteGrouped: Same as SpannerIO.Write but supports grouped mutations. |
Modifier and Type | Method and Description |
---|---|
static SpannerIO.CreateTransaction | createTransaction(): Returns a transform that creates a batch transaction. |
static SpannerIO.Read | read(): Creates an uninitialized instance of SpannerIO.Read. |
static SpannerIO.ReadAll | readAll() |
static SpannerIO.Write | write(): Creates an uninitialized instance of SpannerIO.Write. |
@Experimental(value=SOURCE_SINK) public static SpannerIO.Read read()
Creates an uninitialized instance of SpannerIO.Read. Before use, the SpannerIO.Read must be configured with a SpannerIO.Read.withInstanceId(java.lang.String) and SpannerIO.Read.withDatabaseId(java.lang.String) that identify the Cloud Spanner database.
@Experimental(value=SOURCE_SINK) public static SpannerIO.ReadAll readAll()
@Experimental public static SpannerIO.CreateTransaction createTransaction()
Returns a transform that creates a batch transaction. By default, a TimestampBound.strong() transaction is created; to override this, use SpannerIO.CreateTransaction.withTimestampBound(TimestampBound).
@Experimental public static SpannerIO.Write write()
Creates an uninitialized instance of SpannerIO.Write. Before use, the SpannerIO.Write must be configured with a SpannerIO.Write.withInstanceId(java.lang.String) and SpannerIO.Write.withDatabaseId(java.lang.String) that identify the Cloud Spanner database being written.