# apache_beam.io.gcp.experimental.spannerio module¶

Google Cloud Spanner IO

Experimental; no backwards-compatibility guarantees.

This is an experimental module for reading and writing data from Google Cloud Spanner. Visit: https://cloud.google.com/spanner for more details.

Reading Data from Cloud Spanner.

To read from Cloud Spanner apply ReadFromSpanner transformation. It will return a PCollection, where each element represents an individual row returned from the read operation. Both Query and Read APIs are supported.

ReadFromSpanner relies on the ReadOperation objects which is exposed by the SpannerIO API. ReadOperation holds the immutable data which is responsible to execute batch and naive reads on Cloud Spanner. This is done for more convenient programming.

ReadFromSpanner reads from Cloud Spanner by providing either an ‘sql’ param in the constructor or ‘table’ name with ‘columns’ as list. For example::

records = (pipeline
| ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,
sql='Select * from users'))

records = (pipeline
| ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,
table='users', columns=['id', 'name', 'email']))


You can also perform multiple reads by providing a list of ReadOperations to the ReadFromSpanner transform constructor. ReadOperation exposes two static methods. Use ‘query’ to perform sql based reads, ‘table’ to perform read from table name. For example::

read_operations = [
'email']),
'email']),
]
all_users = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,

...OR...

ReadOperation.query(sql='Select name, email from
customers'),
sql='Select * from users where id <= @user_id',
params={'user_id': 100},
params_type={'user_id': param_types.INT64}
),
]
# params_types are instance of google.cloud.spanner.param_types
all_users = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,


User can also able to provide the ReadOperation in form of PCollection via pipeline. For example::

users = (pipeline
| ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME))


User may also create cloud spanner transaction from the transform called create_transaction which is available in the SpannerIO API.

The transform is guaranteed to be executed on a consistent snapshot of data, utilizing the power of read only transactions. Staleness of data can be controlled by providing the read_timestamp or exact_staleness param values in the constructor.

This transform requires root of the pipeline (PBegin) and returns PTransform which is passed later to the ReadFromSpanner constructor. ReadFromSpanner pass this transaction PTransform as a singleton side input to the _NaiveSpannerReadDoFn containing ‘session_id’ and ‘transaction_id’. For example::

transaction = (pipeline | create_transaction(TEST_PROJECT_ID,
TEST_INSTANCE_ID,
DB_NAME))

users = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,
sql='Select * from users', transaction=transaction)

tweets = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,
sql='Select * from tweets', transaction=transaction)


For further details of this transform, please review the docs on the create_transaction() method available in the SpannerIO API.

ReadFromSpanner takes this transform in the constructor and pass this to the read pipeline as the singleton side input.

Writing Data to Cloud Spanner.

The WriteToSpanner transform writes to Cloud Spanner by executing a collection a input rows (WriteMutation). The mutations are grouped into batches for efficiency.

WriteToSpanner transform relies on the WriteMutation objects which is exposed by the SpannerIO API. WriteMutation have five static methods (insert, update, insert_or_update, replace, delete). These methods returns the instance of the _Mutator object which contains the mutation type and the Spanner Mutation object. For more details, review the docs of the class SpannerIO.WriteMutation. For example::

mutations = [
WriteMutation.insert(table='user', columns=('name', 'email'),
values=[('sara', 'sara@dev.com')])
]
_ = (p
| beam.Create(mutations)
| WriteToSpanner(
project_id=SPANNER_PROJECT_ID,
instance_id=SPANNER_INSTANCE_ID,
database_id=SPANNER_DATABASE_NAME)
)


You can also create WriteMutation via calling its constructor. For example::

mutations = [
WriteMutation(insert='users', columns=('name', 'email'),
values=[('sara", 'sara@example.com')])
]


For more information, review the docs available on WriteMutation class.

WriteToSpanner transform also takes three batching parameters (max_number_rows, max_number_cells and max_batch_size_bytes). By default, max_number_rows is set to 50 rows, max_number_cells is set to 500 cells and max_batch_size_bytes is set to 1MB (1048576 bytes). These parameter used to reduce the number of transactions sent to spanner by grouping the mutation into batches. Setting these param values either to smaller value or zero to disable batching. Unlike the Java connector, this connector does not create batches of transactions sorted by table and primary key.

WriteToSpanner transforms starts with the grouping into batches. The first step in this process is to make the mutation groups of the WriteMutation objects and then filtering them into batchable and unbatchable mutation groups. There are three batching parameters (max_number_cells, max_number_rows & max_batch_size_bytes). We calculated th mutation byte size from the method available in the google.cloud.spanner_v1.proto.mutation_pb2.Mutation.ByteSize. if the mutation rows, cells or byte size are larger than value of the any batching parameters param, it will be tagged as “unbatchable” mutation. After this all the batchable mutation are merged into a single mutation group whos size is not larger than the “max_batch_size_bytes”, after this process, all the mutation groups together to process. If the Mutation references a table or column does not exits, it will cause a exception and fails the entire pipeline.

class apache_beam.io.gcp.experimental.spannerio.ReadOperation[source]

Encapsulates a spanner read operation.

Create new instance of ReadOperation(is_sql, is_table, read_operation, kwargs)

classmethod query(sql, params=None, param_types=None)[source]

A convenient method to construct ReadOperation from sql query.

Parameters: sql – SQL query statement params – (optional) values for parameter replacement. Keys must match the names used in sql param_types – (optional) maps explicit types for one or more param values; required if parameters are passed.
classmethod table(table, columns, index='', keyset=None)[source]

A convenient method to construct ReadOperation from table.

Parameters: table – name of the table from which to fetch data. columns – names of columns to be retrieved. index – (optional) name of index to use, rather than the table’s primary key. keyset – (optional) KeySet keys / ranges identifying rows to be retrieved.
apache_beam.io.gcp.experimental.spannerio.create_transaction(pbegin, project_id, instance_id, database_id, credentials=None, pool=None, read_timestamp=None, exact_staleness=None)[source]

A PTransform method to create a batch transaction.

Parameters: pbegin – Root of the pipeline project_id – Cloud spanner project id. Be sure to use the Project ID, not the Project Number. instance_id – Cloud spanner instance id. database_id – Cloud spanner database id. credentials – (optional) The authorization credentials to attach to requests. These credentials identify this application to the service. If none are specified, the client will attempt to ascertain the credentials from the environment. pool – (optional) session pool to be used by database. If not passed, Spanner Cloud SDK uses the BurstyPool by default. google.cloud.spanner.BurstyPool. Ref: https://googleapis.dev/python/spanner/latest/database-api.html?#google. cloud.spanner_v1.database.Database read_timestamp – (optional) An instance of the datetime.datetime object to execute all reads at the given timestamp. exact_staleness – (optional) An instance of the datetime.timedelta object. These timestamp bounds execute reads at a user-specified timestamp.
class apache_beam.io.gcp.experimental.spannerio.ReadFromSpanner(project_id, instance_id, database_id, pool=None, read_timestamp=None, exact_staleness=None, credentials=None, sql=None, params=None, param_types=None, table=None, query_name=None, columns=None, index='', keyset=None, read_operations=None, transaction=None)[source]

A PTransform to perform reads from cloud spanner. ReadFromSpanner uses BatchAPI to perform all read operations.

A PTransform that uses Spanner Batch API to perform reads.

Parameters: project_id – Cloud spanner project id. Be sure to use the Project ID, not the Project Number. instance_id – Cloud spanner instance id. database_id – Cloud spanner database id. pool – (optional) session pool to be used by database. If not passed, Spanner Cloud SDK uses the BurstyPool by default. google.cloud.spanner.BurstyPool. Ref: https://googleapis.dev/python/spanner/latest/database-api.html?#google. cloud.spanner_v1.database.Database read_timestamp – (optional) An instance of the datetime.datetime object to execute all reads at the given timestamp. By default, set to None. exact_staleness – (optional) An instance of the datetime.timedelta object. These timestamp bounds execute reads at a user-specified timestamp. By default, set to None. credentials – (optional) The authorization credentials to attach to requests. These credentials identify this application to the service. If none are specified, the client will attempt to ascertain the credentials from the environment. By default, set to None. sql – (optional) SQL query statement. params – (optional) Values for parameter replacement. Keys must match the names used in sql. By default, set to None. param_types – (optional) maps explicit types for one or more param values; required if params are passed. By default, set to None. table – (optional) Name of the table from which to fetch data. By default, set to None. columns – (optional) List of names of columns to be retrieved; required if the table is passed. By default, set to None. index – (optional) name of index to use, rather than the table’s primary key. By default, set to None. keyset – (optional) keys / ranges identifying rows to be retrieved. By default, set to None. read_operations – (optional) List of the objects of ReadOperation to perform read all. By default, set to None. transaction – (optional) PTransform of the create_transaction() to perform naive read on cloud spanner. By default, set to None.
expand(pbegin)[source]
display_data()[source]
class apache_beam.io.gcp.experimental.spannerio.WriteToSpanner(project_id, instance_id, database_id, pool=None, credentials=None, max_batch_size_bytes=1048576, max_number_rows=50, max_number_cells=500)[source]

A PTransform to write onto Google Cloud Spanner.

Parameters: project_id – Cloud spanner project id. Be sure to use the Project ID, not the Project Number. instance_id – Cloud spanner instance id. database_id – Cloud spanner database id. max_batch_size_bytes – (optional) Split the mutations into batches to reduce the number of transaction sent to Spanner. By default it is set to 1 MB (1048576 Bytes). max_number_rows – (optional) Split the mutations into batches to reduce the number of transaction sent to Spanner. By default it is set to 50 rows per batch. max_number_cells – (optional) Split the mutations into batches to reduce the number of transaction sent to Spanner. By default it is set to 500 cells per batch.
display_data()[source]
expand(pcoll)[source]
class apache_beam.io.gcp.experimental.spannerio.MutationGroup[source]

A Bundle of Spanner Mutations (_Mutator).

info
primary()[source]
class apache_beam.io.gcp.experimental.spannerio.WriteMutation(insert=None, update=None, insert_or_update=None, replace=None, delete=None, columns=None, values=None, keyset=None)[source]

Bases: object

A convenient class to create Spanner Mutations for Write. User can provide the operation via constructor or via static methods.

Note: If a user passing the operation via construction, make sure that it will only accept one operation at a time. For example, if a user passing a table name in the insert parameter, and he also passes the update parameter value, this will cause an error.

Parameters: insert – (Optional) Name of the table in which rows will be inserted. update – (Optional) Name of the table in which existing rows will be updated. insert_or_update – (Optional) Table name in which rows will be written. Like insert, except that if the row already exists, then its column values are overwritten with the ones provided. Any column values not explicitly written are preserved. replace – (Optional) Table name in which rows will be replaced. Like insert, except that if the row already exists, it is deleted, and the column values provided are inserted instead. Unlike insert_or_update, this means any values not explicitly written become NULL. delete – (Optional) Table name from which rows will be deleted. Succeeds whether or not the named rows were present. columns – The names of the columns in table to be written. The list of columns must contain enough columns to allow Cloud Spanner to derive values for all primary key columns in the row(s) to be modified. values – The values to be written. values can contain more than one list of values. If it does, then multiple rows are written, one for each entry in values. Each list in values must have exactly as many entries as there are entries in columns above. Sending multiple lists is equivalent to sending multiple Mutations, each containing one values entry and repeating table and columns. keyset – (Optional) The primary keys of the rows within table to delete. Delete is idempotent. The transaction will succeed even if some or all rows do not exist.
static insert(table, columns, values)[source]

Insert one or more new table rows.

Parameters: table – Name of the table to be modified. columns – Name of the table columns to be modified. values – Values to be modified.
static update(table, columns, values)[source]

Update one or more existing table rows.

Parameters: table – Name of the table to be modified. columns – Name of the table columns to be modified. values – Values to be modified.
static insert_or_update(table, columns, values)[source]

Insert/update one or more table rows. :param table: Name of the table to be modified. :param columns: Name of the table columns to be modified. :param values: Values to be modified.

static replace(table, columns, values)[source]

Replace one or more table rows.

Parameters: table – Name of the table to be modified. columns – Name of the table columns to be modified. values – Values to be modified.
static delete(table, keyset)[source]

Delete one or more table rows.

Parameters: table – Name of the table to be modified. keyset – Keys/ranges identifying rows to delete.