@Experimental(value=SOURCE_SINK) public class SpannerIO extends java.lang.Object
Transforms for reading from and writing to Google Cloud Spanner.
To read from Cloud Spanner, apply the SpannerIO.Read transformation. It returns a PCollection of
Structs, where each element represents an individual row returned from the read operation. Both
the Query and Read APIs are supported. See the Cloud Spanner documentation for more information
about reading from Cloud Spanner.
To execute a query, pass it to SpannerIO.Read.withQuery(String):
PCollection<Struct> rows = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(dbId)
        .withQuery("SELECT id, name, email FROM users"));
To use the Read API, specify a table name (SpannerIO.Read.withTable(String)) and a list of
columns (SpannerIO.Read.withColumns(String...)):
PCollection<Struct> rows = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(dbId)
        .withTable("users")
        .withColumns("id", "name", "email"));
To optimally read using an index, specify the index name using SpannerIO.Read.withIndex(String).
The transform is guaranteed to be executed on a consistent snapshot of data, utilizing the
power of read-only transactions. Staleness of data can be controlled using the
SpannerIO.Read.withTimestampBound(TimestampBound) or SpannerIO.Read.withTimestamp(Timestamp)
methods. See the Cloud Spanner documentation for more about transactions.
It is possible to read several PCollections within a single transaction. Apply the
SpannerIO.createTransaction() transform, which lazily creates a transaction. The result of this
transformation can be passed to the read operation using
SpannerIO.Read.withTransaction(PCollectionView):
SpannerConfig spannerConfig = ...
PCollectionView<Transaction> tx = p.apply(
    SpannerIO.createTransaction()
        .withSpannerConfig(spannerConfig)
        .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> users = p.apply(
    SpannerIO.read()
        .withSpannerConfig(spannerConfig)
        .withQuery("SELECT name, email FROM users")
        .withTransaction(tx));
PCollection<Struct> tweets = p.apply(
    SpannerIO.read()
        .withSpannerConfig(spannerConfig)
        .withQuery("SELECT user, tweet, date FROM tweets")
        .withTransaction(tx));
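Why sharing one transaction matters can be illustrated outside Beam with a toy multi-version store: both reads are evaluated at the same snapshot timestamp, so a row committed after that timestamp is visible to neither. This is a minimal plain-Java sketch under a simplified model of our own; the `VersionedStore` class and its `commit`/`readAt` methods are hypothetical and are not the Cloud Spanner or Beam API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy multi-version store (NOT the Spanner or Beam API): every row is
// tagged with the timestamp at which it was committed.
final class VersionedStore {
    // table name -> (commit timestamp -> rows committed at that timestamp)
    private final Map<String, TreeMap<Long, List<String>>> tables = new TreeMap<>();

    void commit(String table, long commitTimestamp, String row) {
        tables.computeIfAbsent(table, t -> new TreeMap<>())
              .computeIfAbsent(commitTimestamp, ts -> new ArrayList<>())
              .add(row);
    }

    // Snapshot read: returns only the rows committed at or before `snapshotTimestamp`.
    List<String> readAt(String table, long snapshotTimestamp) {
        List<String> result = new ArrayList<>();
        TreeMap<Long, List<String>> versions = tables.get(table);
        if (versions == null) {
            return result;
        }
        for (List<String> rows : versions.headMap(snapshotTimestamp, true).values()) {
            result.addAll(rows);
        }
        return result;
    }

    public static void main(String[] args) {
        VersionedStore store = new VersionedStore();
        store.commit("users", 1L, "alice");
        store.commit("tweets", 2L, "alice: hello");
        long snapshot = 2L;                // both reads below share this timestamp
        store.commit("users", 3L, "bob");  // committed after the snapshot
        System.out.println(store.readAt("users", snapshot));   // prints [alice]
        System.out.println(store.readAt("tweets", snapshot));  // prints [alice: hello]
    }
}
```

Because both reads use the same snapshot, "bob" is absent from the users result even though it was committed before the reads ran.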
To configure the write transform, create an instance using write() and then specify the
destination Cloud Spanner instance (SpannerIO.Write.withInstanceId(String)) and destination
database (SpannerIO.Write.withDatabaseId(String)). For example:
// Earlier in the pipeline, create a PCollection of Mutations to be written to Cloud Spanner.
PCollection<Mutation> mutations = ...;
// Write mutations.
SpannerWriteResult result = mutations.apply(
    "Write", SpannerIO.write().withInstanceId("instance").withDatabaseId("database"));
The SpannerWriteResult object contains the results of the transform, including a PCollection of
MutationGroups that failed to write, and a PCollection that can be used in batch pipelines as a
completion signal to Wait.OnSignal to indicate when all input has been written. Note that in
streaming pipelines this signal will never be triggered, as the input is unbounded and this
PCollection is using the GlobalWindow.
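To reduce the number of transactions sent to Spanner, the Mutations are grouped into batches. The
default maximum size of a batch is 1MB or 5000 mutated cells, or 500 rows (whichever is reached
first). To override this, use SpannerIO.Write.withBatchSizeBytes(long),
SpannerIO.Write.withMaxNumMutations(long) or SpannerIO.Write.withMaxNumRows(long). Setting any of
these to a small value or zero disables batching.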
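The "whichever is reached first" rule above can be sketched in plain Java: a batch is closed as soon as adding the next mutation would push its byte size, cell count or row count past the corresponding limit. The `SizedMutation` and `BatchAccumulator` classes are hypothetical, each mutation is assumed to write exactly one row, and 1MB is assumed to mean 1,048,576 bytes. This illustrates the described limits and is not Beam's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutation summary: serialized size and number of mutated cells.
// Assumption: each mutation writes exactly one row.
final class SizedMutation {
    final long bytes;
    final long cells;

    SizedMutation(long bytes, long cells) {
        this.bytes = bytes;
        this.cells = cells;
    }
}

// Sketch of batching by byte, cell and row limits ("whichever is reached first").
final class BatchAccumulator {
    private final long maxBytes;
    private final long maxCells;
    private final long maxRows;

    BatchAccumulator(long maxBytes, long maxCells, long maxRows) {
        this.maxBytes = maxBytes;
        this.maxCells = maxCells;
        this.maxRows = maxRows;
    }

    // Defaults quoted in the text, assuming 1MB = 1,048,576 bytes.
    static BatchAccumulator withDefaults() {
        return new BatchAccumulator(1024L * 1024L, 5000L, 500L);
    }

    List<List<SizedMutation>> split(List<SizedMutation> input) {
        List<List<SizedMutation>> batches = new ArrayList<>();
        List<SizedMutation> current = new ArrayList<>();
        long bytes = 0;
        long cells = 0;
        for (SizedMutation m : input) {
            boolean wouldOverflow = bytes + m.bytes > maxBytes
                    || cells + m.cells > maxCells
                    || current.size() + 1 > maxRows;
            // Close the current batch first. A batch always holds at least one
            // mutation, so limits of zero degrade to one mutation per batch.
            if (wouldOverflow && !current.isEmpty()) {
                batches.add(current);
                current = new ArrayList<>();
                bytes = 0;
                cells = 0;
            }
            current.add(m);
            bytes += m.bytes;
            cells += m.cells;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<SizedMutation> input = new ArrayList<>();
        for (int i = 0; i < 1200; i++) {
            input.add(new SizedMutation(100, 5));  // small rows: the 500-row limit binds first
        }
        for (List<SizedMutation> batch : withDefaults().split(input)) {
            System.out.println(batch.size());  // prints 500, then 500, then 200
        }
    }
}
```

With 20 cells per mutation instead of 5, the 5000-cell limit binds first and each batch holds 250 mutations.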
Note that the maximum size of a single transaction is 20,000 mutated cells, including cells in
indexes. If you have a large number of indexes and are getting exceptions with the message
"INVALID_ARGUMENT: The transaction contains too many mutations", you will need to specify a
smaller MaxNumMutations value with SpannerIO.Write.withMaxNumMutations(long).
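The reasoning behind lowering MaxNumMutations can be made concrete with a small calculation. Assuming, purely for illustration, that every mutated data cell also produces a fixed number of index cells (a simplification; real index fan-out depends on the schema), the cells counted against the 20,000 limit grow with that factor, and the largest safe batch shrinks accordingly. The `TransactionCellBudget` class is hypothetical.

```java
// Hypothetical helper (not part of SpannerIO): rough arithmetic for the
// 20,000-cell-per-transaction limit quoted in the text.
final class TransactionCellBudget {
    static final long MAX_CELLS_PER_TRANSACTION = 20_000L;

    // Assumption: every mutated data cell also writes `indexCellsPerDataCell`
    // index cells, so each one counts as (1 + indexCellsPerDataCell) cells.
    static long countedCells(long mutatedDataCells, long indexCellsPerDataCell) {
        return mutatedDataCells * (1 + indexCellsPerDataCell);
    }

    static boolean fitsInOneTransaction(long mutatedDataCells, long indexCellsPerDataCell) {
        return countedCells(mutatedDataCells, indexCellsPerDataCell) <= MAX_CELLS_PER_TRANSACTION;
    }

    // Largest batch cell count (a candidate MaxNumMutations value) within the limit.
    static long safeMaxNumMutations(long indexCellsPerDataCell) {
        return MAX_CELLS_PER_TRANSACTION / (1 + indexCellsPerDataCell);
    }

    public static void main(String[] args) {
        // Default batch of 5000 cells with 4 index cells per data cell:
        System.out.println(countedCells(5000, 4));    // prints 25000 (over the limit)
        System.out.println(safeMaxNumMutations(4));   // prints 4000
    }
}
```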
The batches written are obtained by grouping enough Mutations from the bundle provided by Beam
to form (by default) 1000 batches. This group of Mutations is then sorted by table and primary
key, and the batches are created from the sorted group. Each batch will then contain rows with
keys that are 'close' to each other, to optimise write performance. This grouping factor (number
of batches) is controlled by the SpannerIO.Write.withGroupingFactor(int) parameter.
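The sort-then-batch step described above can be sketched as follows: a buffered group of mutations is ordered by table name and then by primary key, and consecutive runs of the sorted list become batches, so each batch covers a narrow key range. To keep the sketch small, primary keys are plain strings and batches are cut by a fixed mutation count, whereas the transform itself uses the byte, cell and row limits. `KeyedMutation` and `SortedBatcher` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutation identified only by table name and a string primary key.
final class KeyedMutation {
    final String table;
    final String key;

    KeyedMutation(String table, String key) {
        this.table = table;
        this.key = key;
    }

    @Override
    public String toString() {
        return table + "/" + key;
    }
}

final class SortedBatcher {
    // Sorts a buffered group by (table, key) and cuts it into consecutive batches.
    static List<List<KeyedMutation>> sortAndBatch(List<KeyedMutation> group, int maxPerBatch) {
        if (maxPerBatch <= 0) {
            throw new IllegalArgumentException("maxPerBatch must be positive");
        }
        List<KeyedMutation> sorted = new ArrayList<>(group);
        sorted.sort((a, b) -> {
            int byTable = a.table.compareTo(b.table);
            return byTable != 0 ? byTable : a.key.compareTo(b.key);
        });
        List<List<KeyedMutation>> batches = new ArrayList<>();
        for (int start = 0; start < sorted.size(); start += maxPerBatch) {
            int end = Math.min(start + maxPerBatch, sorted.size());
            batches.add(new ArrayList<>(sorted.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<KeyedMutation> group = new ArrayList<>();
        group.add(new KeyedMutation("users", "c"));
        group.add(new KeyedMutation("tweets", "b"));
        group.add(new KeyedMutation("users", "a"));
        group.add(new KeyedMutation("tweets", "a"));
        // prints [[tweets/a, tweets/b], [users/a, users/c]]
        System.out.println(sortAndBatch(group, 2));
    }
}
```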
Note that each worker will need enough memory to hold GroupingFactor x MaxBatchSizeBytes of
Mutations, so if you have a large MaxBatchSizeBytes you may need to reduce GroupingFactor.
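As a worked example of that memory bound: with the default grouping factor of 1000 and a 1MB maximum batch size (again assuming 1MB means 1,048,576 bytes), a worker may buffer roughly 1000 MiB of mutations, and reducing the grouping factor to 100 brings that down to roughly 100 MiB. The sketch below only performs this arithmetic; `WorkerMemoryEstimate` is a hypothetical helper and ignores per-object overhead.

```java
// Hypothetical helper: upper bound on buffered mutation bytes per worker,
// GroupingFactor x MaxBatchSizeBytes (object overhead is ignored).
final class WorkerMemoryEstimate {
    static long bufferedBytes(int groupingFactor, long maxBatchSizeBytes) {
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact((long) groupingFactor, maxBatchSizeBytes);
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        System.out.println(bufferedBytes(1000, mib) / mib + " MiB");  // prints 1000 MiB
        System.out.println(bufferedBytes(100, mib) / mib + " MiB");   // prints 100 MiB
    }
}
```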
Several counters are provided for monitoring purposes:
If the FailureMode is set to REPORT_FAILURES, failed batches will be split up and the individual
mutation groups retried separately.
REPORT_FAILURES is set so that individual mutation groups are retried.
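The split-and-retry behaviour described for REPORT_FAILURES can be sketched in plain Java: write the whole batch first and, only if that fails, retry each mutation group on its own, collecting the groups that still fail instead of failing the whole write. The `SplitRetryWriter` class, its local `Mode` enum and the `writeBatch` predicate (standing in for an actual Spanner commit) are hypothetical; this illustrates the described behaviour and is not SpannerIO's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

final class SplitRetryWriter {
    // Local stand-in for a failure-handling strategy; not SpannerIO.FailureMode itself.
    enum Mode { FAIL_FAST, REPORT_FAILURES }

    // `writeBatch` stands in for a real commit and returns true on success.
    // Returns the mutation groups that could not be written.
    static <G> List<G> write(List<G> batch, Predicate<List<G>> writeBatch, Mode mode) {
        List<G> failed = new ArrayList<>();
        if (writeBatch.test(batch)) {
            return failed;  // the whole batch was committed
        }
        if (mode == Mode.FAIL_FAST) {
            throw new IllegalStateException("batch write failed");
        }
        // REPORT_FAILURES: split the batch and retry each group separately.
        for (G group : batch) {
            if (!writeBatch.test(Collections.singletonList(group))) {
                failed.add(group);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> batch = new ArrayList<>();
        batch.add("g1");
        batch.add("bad");
        batch.add("g3");
        // Hypothetical writer: any batch containing "bad" is rejected.
        Predicate<List<String>> writer = b -> !b.contains("bad");
        System.out.println(write(batch, writer, Mode.REPORT_FAILURES));  // prints [bad]
    }
}
```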
The Write transform reads the database schema on pipeline start to know which columns are used as primary keys of the tables and indexes. This is so that the transform knows how to sort the grouped Mutations by table name and primary key as described above.
If the database schema, or any additional tables or indexes, are created in the same pipeline,
there will be a race condition: the schema may be read before a table is created, in which case
its primary key will not be known. The sorting and batching will then not be optimal and
performance will be reduced (warnings will be logged for rows using unknown tables).
To prevent this race condition, use
SpannerIO.Write.withSchemaReadySignal(PCollection) to pass a
PCollection (for example the output of the transform that creates the table(s))
which will be used with
Wait.OnSignal to prevent the schema from being read until it is
ready. The Write transform will be paused until this signal
PCollection is closed.
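The effect of waiting on a signal can be illustrated with a plain-Java analogy: the schema read is chained onto a future that completes only after the table has been created, so the read cannot observe the schema too early. `SchemaGate` is hypothetical and uses `CompletableFuture` purely as an analogy for Wait.OnSignal; it does not use Beam.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a database whose schema is read by the write step.
final class SchemaGate {
    private final Set<String> tables = ConcurrentHashMap.newKeySet();

    void createTable(String name) {
        tables.add(name);
    }

    // Takes a snapshot of the known tables only after `ready` has completed.
    CompletableFuture<Set<String>> readSchemaAfter(CompletableFuture<Void> ready) {
        return ready.thenApply(ignored -> Collections.unmodifiableSet(new HashSet<>(tables)));
    }

    public static void main(String[] args) {
        SchemaGate db = new SchemaGate();
        CompletableFuture<Void> tableCreated = new CompletableFuture<>();
        CompletableFuture<Set<String>> schema = db.readSchemaAfter(tableCreated);
        System.out.println(schema.isDone());  // prints false: still waiting on the signal
        db.createTable("users");
        tableCreated.complete(null);          // the "signal" fires
        System.out.println(schema.join());    // prints [users]
    }
}
```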
The transform does not provide the same transactional guarantees as Cloud Spanner. Use
MutationGroups with the SpannerIO.WriteGrouped transform to ensure that a small set of mutations
is bundled together. It is guaranteed that mutations in a MutationGroup are submitted in the same
transaction. Note that a MutationGroup must not exceed the Spanner transaction limits. For
example:
// Earlier in the pipeline, create a PCollection of MutationGroups to be written to Cloud Spanner.
PCollection<MutationGroup> mutationGroups = ...;
// Write mutation groups.
SpannerWriteResult result = mutationGroups.apply(
    "Write",
    SpannerIO.write().withInstanceId("instance").withDatabaseId("database").grouped());
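The guarantee that a MutationGroup is never split can be sketched as a packing rule: whole groups are added to a batch until the next group would exceed the batch limit, a group larger than the batch limit is written on its own, and a group larger than the transaction limit is rejected. `GroupStub`, `GroupPacker` and the cell-based limits are hypothetical simplifications, not the WriteGrouped implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutation group summarized by a name and its total mutated cells.
final class GroupStub {
    final String name;
    final long cells;

    GroupStub(String name, long cells) {
        this.name = name;
        this.cells = cells;
    }

    @Override
    public String toString() {
        return name;
    }
}

final class GroupPacker {
    // Packs whole groups into batches; a group is never split across batches.
    static List<List<GroupStub>> pack(List<GroupStub> groups, long maxCellsPerBatch,
                                      long maxCellsPerTransaction) {
        List<List<GroupStub>> batches = new ArrayList<>();
        List<GroupStub> current = new ArrayList<>();
        long cells = 0;
        for (GroupStub g : groups) {
            if (g.cells > maxCellsPerTransaction) {
                throw new IllegalArgumentException("group " + g + " exceeds the transaction limit");
            }
            if (!current.isEmpty() && cells + g.cells > maxCellsPerBatch) {
                batches.add(current);
                current = new ArrayList<>();
                cells = 0;
            }
            current.add(g);
            cells += g.cells;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<GroupStub> groups = new ArrayList<>();
        groups.add(new GroupStub("A", 3));
        groups.add(new GroupStub("B", 3));
        groups.add(new GroupStub("C", 10));  // larger than a batch, still kept whole
        groups.add(new GroupStub("D", 1));
        System.out.println(pack(groups, 6, 20_000));  // prints [[A, B], [C], [D]]
    }
}
```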
| Modifier and Type | Class and Description |
A failure handling strategy.
| Modifier and Type | Method and Description |
Returns a transform that creates a batch transaction.
Creates an uninitialized instance of
Creates an uninitialized instance of
public static SpannerIO.Read read()
public static SpannerIO.ReadAll readAll()
@Experimental public static SpannerIO.CreateTransaction createTransaction()
By default, a TimestampBound.strong() transaction is created. To override this, use
SpannerIO.CreateTransaction.withTimestampBound(TimestampBound).