Source code for apache_beam.io.gcp.spanner

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""PTransforms for supporting Spanner in Python pipelines.

  These transforms are currently supported by Beam portable
  Flink and Spark runners.

  **Setup**

  Transforms provided in this module are cross-language transforms
  implemented in the Beam Java SDK. During the pipeline construction, Python SDK
  will connect to a Java expansion service to expand these transforms.
  To facilitate this, a small amount of setup is needed before using these
  transforms in a Beam Python pipeline.

  There are several ways to setup cross-language Spanner transforms.

  * Option 1: use the default expansion service
  * Option 2: specify a custom expansion service

  See below for details regarding each of these options.

  *Option 1: Use the default expansion service*

  This is the recommended and easiest setup option for using Python Spanner
  transforms. This option is only available for Beam 2.26.0 and later.

  This option requires following pre-requisites before running the Beam
  pipeline.

  * Install Java runtime in the computer from where the pipeline is constructed
    and make sure that 'java' command is available.

  In this option, Python SDK will either download (for released Beam version) or
  build (when running from a Beam Git clone) a expansion service jar and use
  that to expand transforms. Currently Spanner transforms use the
  'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this
  purpose.

  *Option 2: specify a custom expansion service*

  In this option, you startup your own expansion service and provide that as
  a parameter when using the transforms provided in this module.

  This option requires following pre-requisites before running the Beam
  pipeline.

  * Startup your own expansion service.
  * Update your pipeline to provide the expansion service address when
    initiating Spanner transforms provided in this module.

  Flink Users can use the built-in Expansion Service of the Flink Runner's
  Job Server. If you start Flink's Job Server, the expansion service will be
  started on port 8097. For a different address, please set the
  expansion_service parameter.

  **More information**

  For more information regarding cross-language transforms see:
  - https://beam.apache.org/roadmap/portability/

  For more information specific to Flink runner see:
  - https://beam.apache.org/documentation/runners/flink/
"""

# pytype: skip-file

from __future__ import absolute_import

from enum import Enum
from enum import auto
from typing import NamedTuple
from typing import Optional

from past.builtins import unicode

from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external import ExternalTransform
from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
from apache_beam.typehints.schemas import named_tuple_to_schema

__all__ = [
    'ReadFromSpanner',
    'SpannerDelete',
    'SpannerInsert',
    'SpannerInsertOrUpdate',
    'SpannerReplace',
    'SpannerUpdate',
    'TimestampBoundMode',
    'TimeUnit',
]


def default_io_expansion_service():
  return BeamJarExpansionService(
      'sdks:java:io:google-cloud-platform:expansion-service:shadowJar')


[docs]class TimeUnit(Enum): NANOSECONDS = auto() MICROSECONDS = auto() MILLISECONDS = auto() SECONDS = auto() HOURS = auto() DAYS = auto()
[docs]class TimestampBoundMode(Enum): MAX_STALENESS = auto() EXACT_STALENESS = auto() READ_TIMESTAMP = auto() MIN_READ_TIMESTAMP = auto() STRONG = auto()
class ReadFromSpannerSchema(NamedTuple): instance_id: unicode database_id: unicode schema: bytes sql: Optional[unicode] table: Optional[unicode] project_id: Optional[unicode] host: Optional[unicode] emulator_host: Optional[unicode] batching: Optional[bool] timestamp_bound_mode: Optional[unicode] read_timestamp: Optional[unicode] staleness: Optional[int] time_unit: Optional[unicode]
[docs]class ReadFromSpanner(ExternalTransform): """ A PTransform which reads from the specified Spanner instance's database. This transform required type of the row it has to return to provide the schema. Example:: from typing import NamedTuple from apache_beam import coders class ExampleRow(NamedTuple): id: int name: unicode coders.registry.register_coder(ExampleRow, coders.RowCoder) with Pipeline() as p: result = ( p | ReadFromSpanner( instance_id='your_instance_id', database_id='your_database_id', project_id='your_project_id', row_type=ExampleRow, sql='SELECT * FROM some_table', timestamp_bound_mode=TimestampBoundMode.MAX_STALENESS, staleness=3, time_unit=TimeUnit.HOURS, ).with_output_types(ExampleRow)) Experimental; no backwards compatibility guarantees. """ URN = 'beam:external:java:spanner:read:v1' def __init__( self, project_id, instance_id, database_id, row_type=None, sql=None, table=None, host=None, emulator_host=None, batching=None, timestamp_bound_mode=None, read_timestamp=None, staleness=None, time_unit=None, expansion_service=None, ): """ Initializes a read operation from Spanner. :param project_id: Specifies the Cloud Spanner project. :param instance_id: Specifies the Cloud Spanner instance. :param database_id: Specifies the Cloud Spanner database. :param row_type: Row type that fits the given query or table. Passed as NamedTuple, e.g. NamedTuple('name', [('row_name', unicode)]) :param sql: An sql query to execute. It's results must fit the provided row_type. Don't use when table is set. :param table: A spanner table. When provided all columns from row_type will be selected to query. Don't use when query is set. :param batching: By default Batch API is used to read data from Cloud Spanner. It is useful to disable batching when the underlying query is not root-partitionable. :param host: Specifies the Cloud Spanner host. :param emulator_host: Specifies Spanner emulator host. :param timestamp_bound_mode: Defines how Cloud Spanner will choose a timestamp for a read-only transaction or a single read/query. Passed as TimestampBoundMode enum. Possible values: STRONG: A timestamp bound that will perform reads and queries at a timestamp where all previously committed transactions are visible. READ_TIMESTAMP: Returns a timestamp bound that will perform reads and queries at the given timestamp. MIN_READ_TIMESTAMP: Returns a timestamp bound that will perform reads and queries at a timestamp chosen to be at least given timestamp value. EXACT_STALENESS: Returns a timestamp bound that will perform reads and queries at an exact staleness. The timestamp is chosen soon after the read is started. MAX_STALENESS: Returns a timestamp bound that will perform reads and queries at a timestamp chosen to be at most time_unit stale. :param read_timestamp: Timestamp in string. Use only when timestamp_bound_mode is set to READ_TIMESTAMP or MIN_READ_TIMESTAMP. :param staleness: Staleness value as int. Use only when timestamp_bound_mode is set to EXACT_STALENESS or MAX_STALENESS. time_unit has to be set along with this param. :param time_unit: Time unit for staleness_value passed as TimeUnit enum. Possible values: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, HOURS, DAYS. :param expansion_service: The address (host:port) of the ExpansionService. """ assert row_type assert sql or table and not (sql and table) staleness_value = int(staleness) if staleness else None if staleness_value or time_unit: assert staleness_value and time_unit and \ timestamp_bound_mode is TimestampBoundMode.MAX_STALENESS or \ timestamp_bound_mode is TimestampBoundMode.EXACT_STALENESS if read_timestamp: assert timestamp_bound_mode is TimestampBoundMode.MIN_READ_TIMESTAMP\ or timestamp_bound_mode is TimestampBoundMode.READ_TIMESTAMP super(ReadFromSpanner, self).__init__( self.URN, NamedTupleBasedPayloadBuilder( ReadFromSpannerSchema( instance_id=instance_id, database_id=database_id, sql=sql, table=table, schema=named_tuple_to_schema(row_type).SerializeToString(), project_id=project_id, host=host, emulator_host=emulator_host, batching=batching, timestamp_bound_mode=_get_enum_name(timestamp_bound_mode), read_timestamp=read_timestamp, staleness=staleness, time_unit=_get_enum_name(time_unit), ), ), expansion_service or default_io_expansion_service(), )
class WriteToSpannerSchema(NamedTuple): project_id: unicode instance_id: unicode database_id: unicode table: unicode max_batch_size_bytes: Optional[int] max_number_mutations: Optional[int] max_number_rows: Optional[int] grouping_factor: Optional[int] host: Optional[unicode] emulator_host: Optional[unicode] commit_deadline: Optional[int] max_cumulative_backoff: Optional[int] _CLASS_DOC = \ """ A PTransform which writes {operation} mutations to the specified Spanner table. This transform receives rows defined as NamedTuple. Example:: from typing import NamedTuple from apache_beam import coders class {row_type}(NamedTuple): id: int name: unicode coders.registry.register_coder({row_type}, coders.RowCoder) with Pipeline() as p: _ = ( p | 'Impulse' >> beam.Impulse() | 'Generate' >> beam.FlatMap(lambda x: range(num_rows)) | 'To row' >> beam.Map(lambda n: {row_type}(n, str(n)) .with_output_types({row_type}) | 'Write to Spanner' >> Spanner{operation_suffix}( instance_id='your_instance', database_id='existing_database', project_id='your_project_id', table='your_table')) Experimental; no backwards compatibility guarantees. """ _INIT_DOC = \ """ Initializes {operation} operation to a Spanner table. :param project_id: Specifies the Cloud Spanner project. :param instance_id: Specifies the Cloud Spanner instance. :param database_id: Specifies the Cloud Spanner database. :param table: Specifies the Cloud Spanner table. :param max_batch_size_bytes: Specifies the batch size limit (max number of bytes mutated per batch). Default value is 1048576 bytes = 1MB. :param max_number_mutations: Specifies the cell mutation limit (maximum number of mutated cells per batch). Default value is 5000. :param max_number_rows: Specifies the row mutation limit (maximum number of mutated rows per batch). Default value is 500. :param grouping_factor: Specifies the multiple of max mutation (in terms of both bytes per batch and cells per batch) that is used to select a set of mutations to sort by key for batching. This sort uses local memory on the workers, so using large values can cause out of memory errors. Default value is 1000. :param host: Specifies the Cloud Spanner host. :param emulator_host: Specifies Spanner emulator host. :param commit_deadline: Specifies the deadline for the Commit API call. Default is 15 secs. DEADLINE_EXCEEDED errors will prompt a backoff/retry until the value of commit_deadline is reached. DEADLINE_EXCEEDED errors are ar reported with logging and counters. Pass seconds as value. :param max_cumulative_backoff: Specifies the maximum cumulative backoff time when retrying after DEADLINE_EXCEEDED errors. Default is 900s (15min). If the mutations still have not been written after this time, they are treated as a failure, and handled according to the setting of failure_mode. Pass seconds as value. :param expansion_service: The address (host:port) of the ExpansionService. """ def _add_doc( value, operation=None, row_type=None, operation_suffix=None, ): def _doc(obj): obj.__doc__ = value.format( operation=operation, row_type=row_type, operation_suffix=operation_suffix, ) return obj return _doc
[docs]@_add_doc( _CLASS_DOC, operation='delete', row_type='ExampleKey', operation_suffix='Delete', ) class SpannerDelete(ExternalTransform): URN = 'beam:external:java:spanner:delete:v1' @_add_doc(_INIT_DOC, operation='a delete') def __init__( self, project_id, instance_id, database_id, table, max_batch_size_bytes=None, max_number_mutations=None, max_number_rows=None, grouping_factor=None, host=None, emulator_host=None, commit_deadline=None, max_cumulative_backoff=None, expansion_service=None, ): max_cumulative_backoff = int( max_cumulative_backoff) if max_cumulative_backoff else None commit_deadline = int(commit_deadline) if commit_deadline else None super().__init__( self.URN, NamedTupleBasedPayloadBuilder( WriteToSpannerSchema( project_id=project_id, instance_id=instance_id, database_id=database_id, table=table, max_batch_size_bytes=max_batch_size_bytes, max_number_mutations=max_number_mutations, max_number_rows=max_number_rows, grouping_factor=grouping_factor, host=host, emulator_host=emulator_host, commit_deadline=commit_deadline, max_cumulative_backoff=max_cumulative_backoff, ), ), expansion_service=expansion_service or default_io_expansion_service(), )
[docs]@_add_doc( _CLASS_DOC, operation='insert', row_type='ExampleRow', operation_suffix='Insert', ) class SpannerInsert(ExternalTransform): URN = 'beam:external:java:spanner:insert:v1' @_add_doc(_INIT_DOC, operation='an insert') def __init__( self, project_id, instance_id, database_id, table, max_batch_size_bytes=None, max_number_mutations=None, max_number_rows=None, grouping_factor=None, host=None, emulator_host=None, commit_deadline=None, max_cumulative_backoff=None, expansion_service=None, ): max_cumulative_backoff = int( max_cumulative_backoff) if max_cumulative_backoff else None commit_deadline = int(commit_deadline) if commit_deadline else None super().__init__( self.URN, NamedTupleBasedPayloadBuilder( WriteToSpannerSchema( project_id=project_id, instance_id=instance_id, database_id=database_id, table=table, max_batch_size_bytes=max_batch_size_bytes, max_number_mutations=max_number_mutations, max_number_rows=max_number_rows, grouping_factor=grouping_factor, host=host, emulator_host=emulator_host, commit_deadline=commit_deadline, max_cumulative_backoff=max_cumulative_backoff, ), ), expansion_service=expansion_service or default_io_expansion_service(), )
[docs]@_add_doc( _CLASS_DOC, operation='replace', row_type='ExampleRow', operation_suffix='Replace', ) class SpannerReplace(ExternalTransform): URN = 'beam:external:java:spanner:replace:v1' @_add_doc(_INIT_DOC, operation='a replace') def __init__( self, project_id, instance_id, database_id, table, max_batch_size_bytes=None, max_number_mutations=None, max_number_rows=None, grouping_factor=None, host=None, emulator_host=None, commit_deadline=None, max_cumulative_backoff=None, expansion_service=None, ): max_cumulative_backoff = int( max_cumulative_backoff) if max_cumulative_backoff else None commit_deadline = int(commit_deadline) if commit_deadline else None super().__init__( self.URN, NamedTupleBasedPayloadBuilder( WriteToSpannerSchema( project_id=project_id, instance_id=instance_id, database_id=database_id, table=table, max_batch_size_bytes=max_batch_size_bytes, max_number_mutations=max_number_mutations, max_number_rows=max_number_rows, grouping_factor=grouping_factor, host=host, emulator_host=emulator_host, commit_deadline=commit_deadline, max_cumulative_backoff=max_cumulative_backoff, ), ), expansion_service=expansion_service or default_io_expansion_service(), )
[docs]@_add_doc( _CLASS_DOC, operation='insert-or-update', row_type='ExampleRow', operation_suffix='InsertOrUpdate', ) class SpannerInsertOrUpdate(ExternalTransform): URN = 'beam:external:java:spanner:insert_or_update:v1' @_add_doc(_INIT_DOC, operation='an insert-or-update') def __init__( self, project_id, instance_id, database_id, table, max_batch_size_bytes=None, max_number_mutations=None, max_number_rows=None, grouping_factor=None, host=None, emulator_host=None, commit_deadline=None, max_cumulative_backoff=None, expansion_service=None, ): max_cumulative_backoff = int( max_cumulative_backoff) if max_cumulative_backoff else None commit_deadline = int(commit_deadline) if commit_deadline else None super().__init__( self.URN, NamedTupleBasedPayloadBuilder( WriteToSpannerSchema( project_id=project_id, instance_id=instance_id, database_id=database_id, table=table, max_batch_size_bytes=max_batch_size_bytes, max_number_mutations=max_number_mutations, max_number_rows=max_number_rows, grouping_factor=grouping_factor, host=host, emulator_host=emulator_host, commit_deadline=commit_deadline, max_cumulative_backoff=max_cumulative_backoff, ), ), expansion_service=expansion_service or default_io_expansion_service(), )
[docs]@_add_doc( _CLASS_DOC, operation='update', row_type='ExampleRow', operation_suffix='Update', ) class SpannerUpdate(ExternalTransform): URN = 'beam:external:java:spanner:update:v1' @_add_doc(_INIT_DOC, operation='an update') def __init__( self, project_id, instance_id, database_id, table, max_batch_size_bytes=None, max_number_mutations=None, max_number_rows=None, grouping_factor=None, host=None, emulator_host=None, commit_deadline=None, max_cumulative_backoff=None, expansion_service=None, ): max_cumulative_backoff = int( max_cumulative_backoff) if max_cumulative_backoff else None commit_deadline = int(commit_deadline) if commit_deadline else None super().__init__( self.URN, NamedTupleBasedPayloadBuilder( WriteToSpannerSchema( project_id=project_id, instance_id=instance_id, database_id=database_id, table=table, max_batch_size_bytes=max_batch_size_bytes, max_number_mutations=max_number_mutations, max_number_rows=max_number_rows, grouping_factor=grouping_factor, host=host, emulator_host=emulator_host, commit_deadline=commit_deadline, max_cumulative_backoff=max_cumulative_backoff, ), ), expansion_service=expansion_service or default_io_expansion_service(), )
def _get_enum_name(enum): return None if enum is None else enum.name