apache_beam.io.gcp.spanner module

PTransforms for supporting Spanner in Python pipelines.

These transforms are currently supported by Beam portable Flink and Spark runners.

Setup

Transforms provided in this module are cross-language transforms implemented in the Beam Java SDK. During pipeline construction, the Python SDK connects to a Java expansion service to expand these transforms, so a small amount of setup is needed before using them in a Beam Python pipeline.

There are two ways to set up cross-language Spanner transforms.

  • Option 1: use the default expansion service
  • Option 2: specify a custom expansion service

See below for details regarding each of these options.

Option 1: Use the default expansion service

This is the recommended and easiest setup option for using Python Spanner transforms. This option is only available for Beam 2.26.0 and later.

This option requires the following prerequisite before running the Beam pipeline.

  • Install a Java runtime on the machine where the pipeline is constructed and make sure that the ‘java’ command is available.

In this option, the Python SDK will either download (for a released Beam version) or build (when running from a Beam Git clone) an expansion service jar and use that to expand transforms. Currently Spanner transforms use the ‘beam-sdks-java-io-google-cloud-platform-expansion-service’ jar for this purpose.
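The Java prerequisite above can be checked from Python before the pipeline is constructed. This is a minimal sketch using only the standard library; the helper name java_available is hypothetical and not part of Beam.

```python
import shutil


def java_available(command="java"):
    """Return True if `command` resolves to an executable on the PATH."""
    return shutil.which(command) is not None


if __name__ == "__main__":
    if java_available():
        print("java found; the default expansion service can be launched")
    else:
        print("java not found; install a Java runtime or use Option 2")
```

The check only confirms that a ‘java’ executable is on the PATH. It does not verify the Java version.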

Option 2: Specify a custom expansion service

In this option, you start up your own expansion service and provide its address as a parameter when using the transforms provided in this module.

This option requires the following prerequisites before running the Beam pipeline.

  • Start up your own expansion service.
  • Update your pipeline to provide the expansion service address when instantiating the Spanner transforms provided in this module.

Flink users can use the built-in expansion service of the Flink Runner’s Job Server. If you start Flink’s Job Server, the expansion service is started on port 8097. For a different address, set the expansion_service parameter.
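With a custom expansion service, it can be useful to confirm that something is listening on the chosen address before constructing the pipeline. This is a minimal sketch using only the standard library; the helper name expansion_service_reachable is hypothetical, and the default address assumes the Flink Job Server port mentioned above.

```python
import socket


def expansion_service_reachable(address="localhost:8097", timeout=2.0):
    """Return True if a TCP connection to `host:port` succeeds."""
    # Split on the last colon so that the port is always the final component.
    host, _, port = address.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except (OSError, ValueError):
        # OSError covers refused connections, timeouts and DNS failures;
        # ValueError covers a missing or non-numeric port.
        return False


if __name__ == "__main__":
    print(expansion_service_reachable("localhost:8097"))
```

A successful connection shows only that a process accepts TCP connections at that address, not that it is an expansion service. The same host:port string is what the expansion_service parameter of the transforms expects.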

More information

For more information regarding cross-language transforms see: https://beam.apache.org/roadmap/portability/

For more information specific to the Flink runner see: https://beam.apache.org/documentation/runners/flink/

class apache_beam.io.gcp.spanner.TimeUnit

Bases: enum.Enum

An enumeration.

NANOSECONDS = 1
MICROSECONDS = 2
MILLISECONDS = 3
SECONDS = 4
HOURS = 5
DAYS = 6
class apache_beam.io.gcp.spanner.TimestampBoundMode

Bases: enum.Enum

An enumeration.

MAX_STALENESS = 1
EXACT_STALENESS = 2
READ_TIMESTAMP = 3
MIN_READ_TIMESTAMP = 4
STRONG = 5
class apache_beam.io.gcp.spanner.ReadFromSpanner(project_id, instance_id, database_id, row_type=None, sql=None, table=None, host=None, emulator_host=None, batching=None, timestamp_bound_mode=None, read_timestamp=None, staleness=None, time_unit=None, expansion_service=None)

Bases: apache_beam.transforms.external.ExternalTransform

A PTransform which reads from the specified Spanner instance’s database.

This transform requires the type of the rows it returns in order to provide the schema. Example:

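from typing import NamedTuple

from apache_beam import Pipeline
from apache_beam import coders
from apache_beam.io.gcp.spanner import ReadFromSpanner
from apache_beam.io.gcp.spanner import TimeUnit
from apache_beam.io.gcp.spanner import TimestampBoundMode

# Row type describing the columns returned by the query.
class ExampleRow(NamedTuple):
  id: int
  name: str

# RowCoder lets the row schema be shared with the Java transform.
coders.registry.register_coder(ExampleRow, coders.RowCoder)

with Pipeline() as p:
  result = (
      p
      | ReadFromSpanner(
          instance_id='your_instance_id',
          database_id='your_database_id',
          project_id='your_project_id',
          row_type=ExampleRow,
          sql='SELECT * FROM some_table',
          timestamp_bound_mode=TimestampBoundMode.MAX_STALENESS,
          staleness=3,
          time_unit=TimeUnit.HOURS,
      ).with_output_types(ExampleRow))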

Experimental; no backwards compatibility guarantees.

Initializes a read operation from Spanner.

Parameters:
  • project_id – Specifies the Cloud Spanner project.
  • instance_id – Specifies the Cloud Spanner instance.
  • database_id – Specifies the Cloud Spanner database.
  • row_type – Row type that fits the given query or table. Passed as NamedTuple, e.g. NamedTuple('name', [('row_name', str)])
  • sql – A SQL query to execute. Its results must fit the provided row_type. Don’t use when table is set.
  • table – A Spanner table. When provided, all columns from row_type are selected in the query. Don’t use when sql is set.
  • batching – By default the Batch API is used to read data from Cloud Spanner. It is useful to disable batching when the underlying query is not root-partitionable.
  • host – Specifies the Cloud Spanner host.
  • emulator_host – Specifies Spanner emulator host.
  • timestamp_bound_mode – Defines how Cloud Spanner will choose a timestamp for a read-only transaction or a single read/query. Passed as TimestampBoundMode enum. Possible values:
      • STRONG: performs reads and queries at a timestamp where all previously committed transactions are visible.
      • READ_TIMESTAMP: performs reads and queries at the given timestamp.
      • MIN_READ_TIMESTAMP: performs reads and queries at a timestamp chosen to be at least the given timestamp value.
      • EXACT_STALENESS: performs reads and queries at an exact staleness. The timestamp is chosen soon after the read is started.
      • MAX_STALENESS: performs reads and queries at a timestamp chosen to be at most staleness (expressed in time_unit) stale.
  • read_timestamp – Timestamp as a string. Use only when timestamp_bound_mode is set to READ_TIMESTAMP or MIN_READ_TIMESTAMP.
  • staleness – Staleness value as int. Use only when timestamp_bound_mode is set to EXACT_STALENESS or MAX_STALENESS. time_unit has to be set along with this parameter.
  • time_unit – Time unit for staleness, passed as TimeUnit enum. Possible values: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, HOURS, DAYS.
  • expansion_service – The address (host:port) of the ExpansionService.
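The constraints among sql, table, timestamp_bound_mode, read_timestamp, staleness and time_unit described in the parameter list can be checked before building the pipeline. The following is a minimal sketch, not Beam's own validation: check_read_args is a hypothetical helper, and the enum is a local stand-in that mirrors TimestampBoundMode so that the snippet runs without Beam installed.

```python
from enum import Enum


class TimestampBoundMode(Enum):
    # Local stand-in mirroring apache_beam.io.gcp.spanner.TimestampBoundMode.
    MAX_STALENESS = 1
    EXACT_STALENESS = 2
    READ_TIMESTAMP = 3
    MIN_READ_TIMESTAMP = 4
    STRONG = 5


TIMESTAMP_MODES = (
    TimestampBoundMode.READ_TIMESTAMP,
    TimestampBoundMode.MIN_READ_TIMESTAMP,
)
STALENESS_MODES = (
    TimestampBoundMode.EXACT_STALENESS,
    TimestampBoundMode.MAX_STALENESS,
)


def check_read_args(sql=None, table=None, timestamp_bound_mode=None,
                    read_timestamp=None, staleness=None, time_unit=None):
    """Return a list of problems with the argument combination."""
    problems = []
    if sql is not None and table is not None:
        problems.append("sql and table must not both be set")
    if timestamp_bound_mode in TIMESTAMP_MODES and read_timestamp is None:
        problems.append("read_timestamp is needed for this mode")
    if read_timestamp is not None and timestamp_bound_mode not in TIMESTAMP_MODES:
        problems.append("read_timestamp is only used with READ_TIMESTAMP "
                        "or MIN_READ_TIMESTAMP")
    if timestamp_bound_mode in STALENESS_MODES and (
            staleness is None or time_unit is None):
        problems.append("staleness and time_unit must both be set")
    if staleness is not None and timestamp_bound_mode not in STALENESS_MODES:
        problems.append("staleness is only used with EXACT_STALENESS "
                        "or MAX_STALENESS")
    return problems


if __name__ == "__main__":
    print(check_read_args(sql="SELECT * FROM some_table", table="some_table"))
```

An empty list means the combination is consistent with the rules stated above. The helper does not check the values themselves, such as the timestamp format.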
URN = 'beam:external:java:spanner:read:v1'
class apache_beam.io.gcp.spanner.SpannerDelete(project_id, instance_id, database_id, table, max_batch_size_bytes=None, max_number_mutations=None, max_number_rows=None, grouping_factor=None, host=None, emulator_host=None, commit_deadline=None, max_cumulative_backoff=None, expansion_service=None)

Bases: apache_beam.transforms.external.ExternalTransform

A PTransform which writes delete mutations to the specified Spanner table.

This transform receives rows defined as NamedTuple. Example:

from typing import NamedTuple

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import SpannerDelete

# Row type describing the key columns of the rows to delete.
class ExampleKey(NamedTuple):
  id: int
  name: str

coders.registry.register_coder(ExampleKey, coders.RowCoder)

num_rows = 10  # number of example keys to generate

with beam.Pipeline() as p:
  _ = (
      p
      | 'Impulse' >> beam.Impulse()
      | 'Generate' >> beam.FlatMap(lambda x: range(num_rows))
      | 'To row' >> beam.Map(lambda n: ExampleKey(n, str(n)))
          .with_output_types(ExampleKey)
      | 'Write to Spanner' >> SpannerDelete(
          instance_id='your_instance',
          database_id='existing_database',
          project_id='your_project_id',
          table='your_table'))

Experimental; no backwards compatibility guarantees.

Initializes a delete operation to a Spanner table.

Parameters:
  • project_id – Specifies the Cloud Spanner project.
  • instance_id – Specifies the Cloud Spanner instance.
  • database_id – Specifies the Cloud Spanner database.
  • table – Specifies the Cloud Spanner table.
  • max_batch_size_bytes – Specifies the batch size limit (max number of bytes mutated per batch). Default value is 1048576 bytes = 1MB.
  • max_number_mutations – Specifies the cell mutation limit (maximum number of mutated cells per batch). Default value is 5000.
  • max_number_rows – Specifies the row mutation limit (maximum number of mutated rows per batch). Default value is 500.
  • grouping_factor – Specifies the multiple of max mutation (in terms of both bytes per batch and cells per batch) that is used to select a set of mutations to sort by key for batching. This sort uses local memory on the workers, so using large values can cause out of memory errors. Default value is 1000.
  • host – Specifies the Cloud Spanner host.
  • emulator_host – Specifies Spanner emulator host.
  • commit_deadline – Specifies the deadline for the Commit API call. Default is 15 seconds. DEADLINE_EXCEEDED errors will prompt a backoff/retry until the value of commit_deadline is reached. DEADLINE_EXCEEDED errors are reported with logging and counters. Pass seconds as value.
  • max_cumulative_backoff – Specifies the maximum cumulative backoff time when retrying after DEADLINE_EXCEEDED errors. Default is 900s (15min). If the mutations still have not been written after this time, they are treated as a failure, and handled according to the setting of failure_mode. Pass seconds as value.
  • expansion_service – The address (host:port) of the ExpansionService.
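The three batch limits above (bytes, cells and rows) apply together: a batch is closed as soon as adding another mutation would exceed any one of them. The following is a simplified sketch of that interaction, not the batching algorithm of the Java SpannerIO implementation; it ignores key sorting and grouping_factor, and both batch_mutations and the (bytes, cells) model of a mutation are assumptions made for illustration.

```python
def batch_mutations(mutations,
                    max_batch_size_bytes=1048576,
                    max_number_mutations=5000,
                    max_number_rows=500):
    """Greedily group (size_in_bytes, cells) pairs into batches.

    Each pair is treated as one row. A batch is emitted when adding the
    next mutation would exceed the byte, cell or row limit.
    """
    batch, size, cells = [], 0, 0
    for mutation in mutations:
        m_bytes, m_cells = mutation
        would_overflow = (
            size + m_bytes > max_batch_size_bytes
            or cells + m_cells > max_number_mutations
            or len(batch) + 1 > max_number_rows)
        if batch and would_overflow:
            yield batch
            batch, size, cells = [], 0, 0
        # A single oversized mutation still forms a batch of its own.
        batch.append(mutation)
        size += m_bytes
        cells += m_cells
    if batch:
        yield batch


if __name__ == "__main__":
    # 1200 small mutations of 100 bytes and 2 cells each.
    sizes = [len(b) for b in batch_mutations([(100, 2)] * 1200)]
    print(sizes)
```

With the default limits and these small mutations, the row limit of 500 is reached first. With 20 cells per mutation, the cell limit of 5000 would instead close each batch at 250 rows.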
URN = 'beam:external:java:spanner:delete:v1'
class apache_beam.io.gcp.spanner.SpannerInsert(project_id, instance_id, database_id, table, max_batch_size_bytes=None, max_number_mutations=None, max_number_rows=None, grouping_factor=None, host=None, emulator_host=None, commit_deadline=None, max_cumulative_backoff=None, expansion_service=None)

Bases: apache_beam.transforms.external.ExternalTransform

A PTransform which writes insert mutations to the specified Spanner table.

This transform receives rows defined as NamedTuple. Example:

from typing import NamedTuple

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import SpannerInsert

# Row type describing the columns of the rows to insert.
class ExampleRow(NamedTuple):
  id: int
  name: str

coders.registry.register_coder(ExampleRow, coders.RowCoder)

num_rows = 10  # number of example rows to generate

with beam.Pipeline() as p:
  _ = (
      p
      | 'Impulse' >> beam.Impulse()
      | 'Generate' >> beam.FlatMap(lambda x: range(num_rows))
      | 'To row' >> beam.Map(lambda n: ExampleRow(n, str(n)))
          .with_output_types(ExampleRow)
      | 'Write to Spanner' >> SpannerInsert(
          instance_id='your_instance',
          database_id='existing_database',
          project_id='your_project_id',
          table='your_table'))

Experimental; no backwards compatibility guarantees.

Initializes an insert operation to a Spanner table.

Parameters:
  • project_id – Specifies the Cloud Spanner project.
  • instance_id – Specifies the Cloud Spanner instance.
  • database_id – Specifies the Cloud Spanner database.
  • table – Specifies the Cloud Spanner table.
  • max_batch_size_bytes – Specifies the batch size limit (max number of bytes mutated per batch). Default value is 1048576 bytes = 1MB.
  • max_number_mutations – Specifies the cell mutation limit (maximum number of mutated cells per batch). Default value is 5000.
  • max_number_rows – Specifies the row mutation limit (maximum number of mutated rows per batch). Default value is 500.
  • grouping_factor – Specifies the multiple of max mutation (in terms of both bytes per batch and cells per batch) that is used to select a set of mutations to sort by key for batching. This sort uses local memory on the workers, so using large values can cause out of memory errors. Default value is 1000.
  • host – Specifies the Cloud Spanner host.
  • emulator_host – Specifies Spanner emulator host.
  • commit_deadline – Specifies the deadline for the Commit API call. Default is 15 seconds. DEADLINE_EXCEEDED errors will prompt a backoff/retry until the value of commit_deadline is reached. DEADLINE_EXCEEDED errors are reported with logging and counters. Pass seconds as value.
  • max_cumulative_backoff – Specifies the maximum cumulative backoff time when retrying after DEADLINE_EXCEEDED errors. Default is 900s (15min). If the mutations still have not been written after this time, they are treated as a failure, and handled according to the setting of failure_mode. Pass seconds as value.
  • expansion_service – The address (host:port) of the ExpansionService.
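The retry behaviour described for max_cumulative_backoff can be pictured as a loop that keeps retrying a commit until the total time spent backing off would exceed the budget. The following is a rough sketch under stated assumptions, not the Java implementation: DeadlineExceeded and commit_with_backoff are hypothetical names, and the one-second initial delay and doubling schedule are illustrative choices that the text above does not specify.

```python
import time


class DeadlineExceeded(Exception):
    """Hypothetical stand-in for a DEADLINE_EXCEEDED error."""


def commit_with_backoff(commit, max_cumulative_backoff=900.0,
                        initial_backoff=1.0, sleep=time.sleep):
    """Call `commit()` until it succeeds or the backoff budget runs out.

    Returns True on success and False once the next wait would push the
    total backoff time past max_cumulative_backoff (in seconds).
    """
    waited = 0.0
    backoff = initial_backoff
    while True:
        try:
            commit()
            return True
        except DeadlineExceeded:
            if waited + backoff > max_cumulative_backoff:
                return False  # treated as a failed write
            sleep(backoff)
            waited += backoff
            backoff *= 2  # assumed exponential schedule


if __name__ == "__main__":
    def always_times_out():
        raise DeadlineExceeded()

    # Record the waits instead of sleeping so the demo finishes instantly.
    waits = []
    ok = commit_with_backoff(always_times_out, max_cumulative_backoff=10.0,
                             sleep=waits.append)
    print(ok, waits)
```

The sleep parameter is injectable only so that the behaviour can be observed without waiting.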
URN = 'beam:external:java:spanner:insert:v1'
class apache_beam.io.gcp.spanner.SpannerReplace(project_id, instance_id, database_id, table, max_batch_size_bytes=None, max_number_mutations=None, max_number_rows=None, grouping_factor=None, host=None, emulator_host=None, commit_deadline=None, max_cumulative_backoff=None, expansion_service=None)

Bases: apache_beam.transforms.external.ExternalTransform

A PTransform which writes replace mutations to the specified Spanner table.

This transform receives rows defined as NamedTuple. Example:

from typing import NamedTuple

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import SpannerReplace

# Row type describing the columns of the rows to replace.
class ExampleRow(NamedTuple):
  id: int
  name: str

coders.registry.register_coder(ExampleRow, coders.RowCoder)

num_rows = 10  # number of example rows to generate

with beam.Pipeline() as p:
  _ = (
      p
      | 'Impulse' >> beam.Impulse()
      | 'Generate' >> beam.FlatMap(lambda x: range(num_rows))
      | 'To row' >> beam.Map(lambda n: ExampleRow(n, str(n)))
          .with_output_types(ExampleRow)
      | 'Write to Spanner' >> SpannerReplace(
          instance_id='your_instance',
          database_id='existing_database',
          project_id='your_project_id',
          table='your_table'))

Experimental; no backwards compatibility guarantees.

Initializes a replace operation to a Spanner table.

Parameters:
  • project_id – Specifies the Cloud Spanner project.
  • instance_id – Specifies the Cloud Spanner instance.
  • database_id – Specifies the Cloud Spanner database.
  • table – Specifies the Cloud Spanner table.
  • max_batch_size_bytes – Specifies the batch size limit (max number of bytes mutated per batch). Default value is 1048576 bytes = 1MB.
  • max_number_mutations – Specifies the cell mutation limit (maximum number of mutated cells per batch). Default value is 5000.
  • max_number_rows – Specifies the row mutation limit (maximum number of mutated rows per batch). Default value is 500.
  • grouping_factor – Specifies the multiple of max mutation (in terms of both bytes per batch and cells per batch) that is used to select a set of mutations to sort by key for batching. This sort uses local memory on the workers, so using large values can cause out of memory errors. Default value is 1000.
  • host – Specifies the Cloud Spanner host.
  • emulator_host – Specifies Spanner emulator host.
  • commit_deadline – Specifies the deadline for the Commit API call. Default is 15 seconds. DEADLINE_EXCEEDED errors will prompt a backoff/retry until the value of commit_deadline is reached. DEADLINE_EXCEEDED errors are reported with logging and counters. Pass seconds as value.
  • max_cumulative_backoff – Specifies the maximum cumulative backoff time when retrying after DEADLINE_EXCEEDED errors. Default is 900s (15min). If the mutations still have not been written after this time, they are treated as a failure, and handled according to the setting of failure_mode. Pass seconds as value.
  • expansion_service – The address (host:port) of the ExpansionService.
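Because grouping_factor is a multiple of the per-batch limits, the amount of data gathered for sorting grows quickly with it. The following worked arithmetic uses the default values listed above; sort_window is a hypothetical helper name used only for this illustration.

```python
def sort_window(max_batch_size_bytes=1048576,
                max_number_mutations=5000,
                grouping_factor=1000):
    """Return the byte and cell thresholds implied by grouping_factor."""
    return {
        "bytes": max_batch_size_bytes * grouping_factor,
        "cells": max_number_mutations * grouping_factor,
    }


if __name__ == "__main__":
    window = sort_window()
    # 1048576 * 1000 bytes and 5000 * 1000 cells with the defaults.
    print(window["bytes"], window["cells"])
```

With the defaults this comes to 1,048,576,000 bytes and 5,000,000 cells, which is why large values can exhaust worker memory.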
URN = 'beam:external:java:spanner:replace:v1'
class apache_beam.io.gcp.spanner.SpannerInsertOrUpdate(project_id, instance_id, database_id, table, max_batch_size_bytes=None, max_number_mutations=None, max_number_rows=None, grouping_factor=None, host=None, emulator_host=None, commit_deadline=None, max_cumulative_backoff=None, expansion_service=None)

Bases: apache_beam.transforms.external.ExternalTransform

A PTransform which writes insert-or-update mutations to the specified Spanner table.

This transform receives rows defined as NamedTuple. Example:

from typing import NamedTuple

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import SpannerInsertOrUpdate

# Row type describing the columns of the rows to insert or update.
class ExampleRow(NamedTuple):
  id: int
  name: str

coders.registry.register_coder(ExampleRow, coders.RowCoder)

num_rows = 10  # number of example rows to generate

with beam.Pipeline() as p:
  _ = (
      p
      | 'Impulse' >> beam.Impulse()
      | 'Generate' >> beam.FlatMap(lambda x: range(num_rows))
      | 'To row' >> beam.Map(lambda n: ExampleRow(n, str(n)))
          .with_output_types(ExampleRow)
      | 'Write to Spanner' >> SpannerInsertOrUpdate(
          instance_id='your_instance',
          database_id='existing_database',
          project_id='your_project_id',
          table='your_table'))

Experimental; no backwards compatibility guarantees.

Initializes an insert-or-update operation to a Spanner table.

Parameters:
  • project_id – Specifies the Cloud Spanner project.
  • instance_id – Specifies the Cloud Spanner instance.
  • database_id – Specifies the Cloud Spanner database.
  • table – Specifies the Cloud Spanner table.
  • max_batch_size_bytes – Specifies the batch size limit (max number of bytes mutated per batch). Default value is 1048576 bytes = 1MB.
  • max_number_mutations – Specifies the cell mutation limit (maximum number of mutated cells per batch). Default value is 5000.
  • max_number_rows – Specifies the row mutation limit (maximum number of mutated rows per batch). Default value is 500.
  • grouping_factor – Specifies the multiple of max mutation (in terms of both bytes per batch and cells per batch) that is used to select a set of mutations to sort by key for batching. This sort uses local memory on the workers, so using large values can cause out of memory errors. Default value is 1000.
  • host – Specifies the Cloud Spanner host.
  • emulator_host – Specifies Spanner emulator host.
  • commit_deadline – Specifies the deadline for the Commit API call. Default is 15 seconds. DEADLINE_EXCEEDED errors will prompt a backoff/retry until the value of commit_deadline is reached. DEADLINE_EXCEEDED errors are reported with logging and counters. Pass seconds as value.
  • max_cumulative_backoff – Specifies the maximum cumulative backoff time when retrying after DEADLINE_EXCEEDED errors. Default is 900s (15min). If the mutations still have not been written after this time, they are treated as a failure, and handled according to the setting of failure_mode. Pass seconds as value.
  • expansion_service – The address (host:port) of the ExpansionService.
URN = 'beam:external:java:spanner:insert_or_update:v1'
class apache_beam.io.gcp.spanner.SpannerUpdate(project_id, instance_id, database_id, table, max_batch_size_bytes=None, max_number_mutations=None, max_number_rows=None, grouping_factor=None, host=None, emulator_host=None, commit_deadline=None, max_cumulative_backoff=None, expansion_service=None)

Bases: apache_beam.transforms.external.ExternalTransform

A PTransform which writes update mutations to the specified Spanner table.

This transform receives rows defined as NamedTuple. Example:

from typing import NamedTuple

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import SpannerUpdate

# Row type describing the columns of the rows to update.
class ExampleRow(NamedTuple):
  id: int
  name: str

coders.registry.register_coder(ExampleRow, coders.RowCoder)

num_rows = 10  # number of example rows to generate

with beam.Pipeline() as p:
  _ = (
      p
      | 'Impulse' >> beam.Impulse()
      | 'Generate' >> beam.FlatMap(lambda x: range(num_rows))
      | 'To row' >> beam.Map(lambda n: ExampleRow(n, str(n)))
          .with_output_types(ExampleRow)
      | 'Write to Spanner' >> SpannerUpdate(
          instance_id='your_instance',
          database_id='existing_database',
          project_id='your_project_id',
          table='your_table'))

Experimental; no backwards compatibility guarantees.

Initializes an update operation to a Spanner table.

Parameters:
  • project_id – Specifies the Cloud Spanner project.
  • instance_id – Specifies the Cloud Spanner instance.
  • database_id – Specifies the Cloud Spanner database.
  • table – Specifies the Cloud Spanner table.
  • max_batch_size_bytes – Specifies the batch size limit (max number of bytes mutated per batch). Default value is 1048576 bytes = 1MB.
  • max_number_mutations – Specifies the cell mutation limit (maximum number of mutated cells per batch). Default value is 5000.
  • max_number_rows – Specifies the row mutation limit (maximum number of mutated rows per batch). Default value is 500.
  • grouping_factor – Specifies the multiple of max mutation (in terms of both bytes per batch and cells per batch) that is used to select a set of mutations to sort by key for batching. This sort uses local memory on the workers, so using large values can cause out of memory errors. Default value is 1000.
  • host – Specifies the Cloud Spanner host.
  • emulator_host – Specifies Spanner emulator host.
  • commit_deadline – Specifies the deadline for the Commit API call. Default is 15 seconds. DEADLINE_EXCEEDED errors will prompt a backoff/retry until the value of commit_deadline is reached. DEADLINE_EXCEEDED errors are reported with logging and counters. Pass seconds as value.
  • max_cumulative_backoff – Specifies the maximum cumulative backoff time when retrying after DEADLINE_EXCEEDED errors. Default is 900s (15min). If the mutations still have not been written after this time, they are treated as a failure, and handled according to the setting of failure_mode. Pass seconds as value.
  • expansion_service – The address (host:port) of the ExpansionService.
URN = 'beam:external:java:spanner:update:v1'