#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
""" Support for mapping python types to proto Schemas and back again.
Imposes a mapping between common Python types and Beam portable schemas
(https://s.apache.org/beam-schemas)::
Python Schema
np.int8 <-----> BYTE
np.int16 <-----> INT16
np.int32 <-----> INT32
np.int64 <-----> INT64
int ------> INT64
np.float32 <-----> FLOAT
np.float64 <-----> DOUBLE
float ------> DOUBLE
bool <-----> BOOLEAN
str <-----> STRING
bytes <-----> BYTES
ByteString ------> BYTES
Timestamp <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
Decimal <-----> LogicalType(urn="beam:logical_type:fixed_decimal:v1")
Mapping <-----> MapType
Sequence <-----> ArrayType
NamedTuple <-----> RowType
beam.Row ------> RowType
One direction mapping of Python types from Beam portable schemas:
bytes
<------ LogicalType(urn="beam:logical_type:fixed_bytes:v1")
<------ LogicalType(urn="beam:logical_type:var_bytes:v1")
str
<------ LogicalType(urn="beam:logical_type:fixed_char:v1")
<------ LogicalType(urn="beam:logical_type:var_char:v1")
Timestamp
<------ LogicalType(urn="beam:logical_type:millis_instant:v1")
Note that some of these mappings are provided as conveniences,
but they are lossy and will not survive a roundtrip from python to Beam schemas
and back. For example, the Python type :code:`int` will map to :code:`INT64` in
Beam schemas but converting that back to a Python type will yield
:code:`np.int64`.
:code:`nullable=True` on a Beam :code:`FieldType` is represented in Python by
wrapping the type in :code:`Optional`.
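
For example, a minimal round trip through this module (illustrative only,
using the mappings listed above)::

  from apache_beam.typehints.schemas import typing_from_runner_api
  from apache_beam.typehints.schemas import typing_to_runner_api

  field_type = typing_to_runner_api(int)  # FieldType with atomic_type=INT64
  typing_from_runner_api(field_type)      # np.int64 (the int mapping is lossy)
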
This module is intended for internal use only. Nothing defined here provides
any backwards-compatibility guarantee.
"""
# pytype: skip-file
import decimal
import logging
from typing import Any
from typing import ByteString
from typing import Dict
from typing import Generic
from typing import Iterable
from typing import List
from typing import Mapping
from typing import NamedTuple
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import TypeVar
from typing import Union
import numpy as np
from google.protobuf import text_format
from apache_beam.portability import common_urns
from apache_beam.portability.api import schema_pb2
from apache_beam.typehints import row_type
from apache_beam.typehints import typehints
from apache_beam.typehints.native_type_compatibility import _get_args
from apache_beam.typehints.native_type_compatibility import _match_is_exactly_mapping
from apache_beam.typehints.native_type_compatibility import _match_is_optional
from apache_beam.typehints.native_type_compatibility import _safe_issubclass
from apache_beam.typehints.native_type_compatibility import convert_to_typing_type
from apache_beam.typehints.native_type_compatibility import extract_optional_type
from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
from apache_beam.typehints.schema_registry import SCHEMA_REGISTRY
from apache_beam.typehints.schema_registry import SchemaTypeRegistry
from apache_beam.utils import proto_utils
from apache_beam.utils.python_callable import PythonCallableWithSource
from apache_beam.utils.timestamp import Timestamp
PYTHON_ANY_URN = "beam:logical:pythonsdk_any:v1"
# Bi-directional mappings
_PRIMITIVES = (
(np.int8, schema_pb2.BYTE),
(np.int16, schema_pb2.INT16),
(np.int32, schema_pb2.INT32),
(np.int64, schema_pb2.INT64),
(np.float32, schema_pb2.FLOAT),
(np.float64, schema_pb2.DOUBLE),
(str, schema_pb2.STRING),
(bool, schema_pb2.BOOLEAN),
(bytes, schema_pb2.BYTES),
)
PRIMITIVE_TO_ATOMIC_TYPE = dict((typ, atomic) for typ, atomic in _PRIMITIVES)
ATOMIC_TYPE_TO_PRIMITIVE = dict((atomic, typ) for typ, atomic in _PRIMITIVES)
# One-way mappings
PRIMITIVE_TO_ATOMIC_TYPE.update({
ByteString: schema_pb2.BYTES,
# Allow users to specify a native int, and use INT64 as the cross-language
# representation. Technically ints have unlimited precision, but RowCoder
# should throw an error if it sees one with a bit width > 64 when encoding.
int: schema_pb2.INT64,
float: schema_pb2.DOUBLE,
})
_LOGGER = logging.getLogger(__name__)
# Cache mapping a serialized schema_pb2.Schema (without its id) to the assigned id.
_SCHEMA_ID_CACHE = {}
def named_fields_to_schema(
names_and_types: Union[Dict[str, type], Sequence[Tuple[str, type]]],
schema_id: Optional[str] = None,
schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY,
):
schema_options = schema_options or []
field_options = field_options or {}
if isinstance(names_and_types, dict):
names_and_types = names_and_types.items()
schema = schema_pb2.Schema(
fields=[
schema_pb2.Field(
name=name,
type=typing_to_runner_api(type),
options=[
option_to_runner_api(option_tuple)
for option_tuple in field_options.get(name, [])
],
) for (name, type) in names_and_types
],
options=[
option_to_runner_api(option_tuple) for option_tuple in schema_options
])
if schema_id is None:
key = schema.SerializeToString()
if key not in _SCHEMA_ID_CACHE:
_SCHEMA_ID_CACHE[key] = schema_registry.generate_new_id()
schema_id = _SCHEMA_ID_CACHE[key]
schema.id = schema_id
return schema
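# For example (illustrative):
#   named_fields_to_schema([('name', str), ('age', int)])
# returns a schema_pb2.Schema with a STRING field "name" and an INT64 field
# "age".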
def named_fields_from_schema(
schema): # (schema_pb2.Schema) -> typing.List[typing.Tuple[str, type]]
return [(field.name, typing_from_runner_api(field.type))
for field in schema.fields]
def typing_to_runner_api(
type_: type,
schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY
) -> schema_pb2.FieldType:
return SchemaTranslation(
schema_registry=schema_registry).typing_to_runner_api(type_)
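# For example (illustrative): typing_to_runner_api(Optional[str]) yields a
# FieldType with atomic_type=STRING and nullable=True, while
# typing_to_runner_api(Sequence[np.int64]) yields an array_type whose
# element_type has atomic_type=INT64.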
def typing_from_runner_api(
fieldtype_proto: schema_pb2.FieldType,
schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY) -> type:
return SchemaTranslation(
schema_registry=schema_registry).typing_from_runner_api(fieldtype_proto)
def value_to_runner_api(
type_proto: schema_pb2.FieldType,
value,
schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY
) -> schema_pb2.FieldValue:
return SchemaTranslation(schema_registry=schema_registry).value_to_runner_api(
type_proto, value)
def value_from_runner_api(
type_proto: schema_pb2.FieldType,
value_proto: schema_pb2.FieldValue,
schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY
) -> Any:
return SchemaTranslation(
schema_registry=schema_registry).value_from_runner_api(
type_proto, value_proto)
def option_to_runner_api(
option: Tuple[str, Any],
schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY) -> schema_pb2.Option:
return SchemaTranslation(
schema_registry=schema_registry).option_to_runner_api(option)
def option_from_runner_api(
option_proto: schema_pb2.Option,
schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY) -> Tuple[str, Any]:
return SchemaTranslation(
schema_registry=schema_registry).option_from_runner_api(option_proto)
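# For example (illustrative): option_to_runner_api(('encoding', 'utf-8'))
# produces a schema_pb2.Option with a STRING type and value, while
# option_to_runner_api(('flag', None)) produces an Option with no type or
# value set (i.e. a simple flag).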
def schema_field(
name: str,
field_type: Union[schema_pb2.FieldType, type],
description: Optional[str] = None) -> schema_pb2.Field:
return schema_pb2.Field(
name=name,
type=field_type if isinstance(field_type, schema_pb2.FieldType) else
typing_to_runner_api(field_type),
description=description)
class SchemaTranslation(object):
def __init__(self, schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY):
self.schema_registry = schema_registry
def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
if isinstance(type_, schema_pb2.Schema):
return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
if hasattr(type_, '_beam_schema_proto') and type_._beam_schema_proto.obj:
return schema_pb2.FieldType(
row_type=schema_pb2.RowType(schema=type_._beam_schema_proto.obj))
if isinstance(type_, row_type.RowTypeConstraint):
if type_.schema_id is None:
schema_id = SCHEMA_REGISTRY.generate_new_id()
type_.set_schema_id(schema_id)
schema = None
else:
schema_id = type_.schema_id
schema = self.schema_registry.get_schema_by_id(schema_id)
if schema is None:
# Either user_type was not annotated with a schema id, or there was
# no schema in the registry with the id. The latter should only happen
# in tests.
# Either way, we need to generate a new schema proto.
schema = schema_pb2.Schema(
fields=[
schema_pb2.Field(
name=field_name,
type=self.typing_to_runner_api(field_type),
options=[
self.option_to_runner_api(option_tuple)
for option_tuple in type_.field_options(field_name)
],
description=type_._field_descriptions.get(field_name, None),
) for (field_name, field_type) in type_._fields
],
id=schema_id,
options=[
self.option_to_runner_api(option_tuple)
for option_tuple in type_.schema_options
],
)
self.schema_registry.add(type_.user_type, schema)
return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema))
else:
# See if this is coercible to a RowTypeConstraint (e.g. a NamedTuple or
# dataclass)
row_type_constraint = row_type.RowTypeConstraint.from_user_type(type_)
if row_type_constraint is not None:
return self.typing_to_runner_api(row_type_constraint)
if isinstance(type_, typehints.TypeConstraint):
type_ = convert_to_typing_type(type_)
# All concrete types (other than NamedTuple sub-classes) should map to
# a supported primitive type.
if type_ in PRIMITIVE_TO_ATOMIC_TYPE:
return schema_pb2.FieldType(atomic_type=PRIMITIVE_TO_ATOMIC_TYPE[type_])
elif _match_is_exactly_mapping(type_):
key_type, value_type = map(self.typing_to_runner_api, _get_args(type_))
return schema_pb2.FieldType(
map_type=schema_pb2.MapType(key_type=key_type, value_type=value_type))
elif _match_is_optional(type_):
# It's possible that a user passes us Optional[Optional[T]], but in python
# typing this is indistinguishable from Optional[T] - both resolve to
# Union[T, None] - so there's no need to check for that case here.
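# For example, Optional[Optional[int]] and Optional[int] both resolve to
# Union[int, None] at runtime, so they are handled identically here.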
result = self.typing_to_runner_api(extract_optional_type(type_))
result.nullable = True
return result
elif type_ == range:
return schema_pb2.FieldType(
array_type=schema_pb2.ArrayType(
element_type=schema_pb2.FieldType(
atomic_type=PRIMITIVE_TO_ATOMIC_TYPE[int])))
elif _safe_issubclass(type_, Sequence) and not _safe_issubclass(type_, str):
element_type = self.typing_to_runner_api(_get_args(type_)[0])
return schema_pb2.FieldType(
array_type=schema_pb2.ArrayType(element_type=element_type))
elif _safe_issubclass(type_, Mapping):
key_type, value_type = map(self.typing_to_runner_api, _get_args(type_))
return schema_pb2.FieldType(
map_type=schema_pb2.MapType(key_type=key_type, value_type=value_type))
elif _safe_issubclass(type_, Iterable) and not _safe_issubclass(type_, str):
element_type = self.typing_to_runner_api(_get_args(type_)[0])
return schema_pb2.FieldType(
array_type=schema_pb2.ArrayType(element_type=element_type))
try:
logical_type = LogicalType.from_typing(type_)
except ValueError:
# Unknown type, just treat it like Any
return schema_pb2.FieldType(
logical_type=schema_pb2.LogicalType(urn=PYTHON_ANY_URN),
nullable=True)
else:
argument_type = None
argument = None
if logical_type.argument_type() is not None:
argument_type = self.typing_to_runner_api(logical_type.argument_type())
try:
argument = self.value_to_runner_api(
argument_type, logical_type.argument())
except ValueError:
# TODO(https://github.com/apache/beam/issues/23373): Complete support
# for logical types that require arguments beyond atomic type.
# For now, skip arguments.
argument = None
return schema_pb2.FieldType(
logical_type=schema_pb2.LogicalType(
urn=logical_type.urn(),
representation=self.typing_to_runner_api(
logical_type.representation_type()),
argument_type=argument_type,
argument=argument))
def atomic_value_from_runner_api(
self,
atomic_type: schema_pb2.AtomicType,
atomic_value: schema_pb2.AtomicTypeValue):
if atomic_type == schema_pb2.BYTE:
value = np.int8(atomic_value.byte)
elif atomic_type == schema_pb2.INT16:
value = np.int16(atomic_value.int16)
elif atomic_type == schema_pb2.INT32:
value = np.int32(atomic_value.int32)
elif atomic_type == schema_pb2.INT64:
value = np.int64(atomic_value.int64)
elif atomic_type == schema_pb2.FLOAT:
value = np.float32(atomic_value.float)
elif atomic_type == schema_pb2.DOUBLE:
value = np.float64(atomic_value.double)
elif atomic_type == schema_pb2.STRING:
value = atomic_value.string
elif atomic_type == schema_pb2.BOOLEAN:
value = atomic_value.boolean
elif atomic_type == schema_pb2.BYTES:
value = atomic_value.bytes
else:
raise ValueError(
f"Unrecognized atomic_type ({atomic_type}) "
f"when decoding value {atomic_value!r}")
return value
def atomic_value_to_runner_api(
self, atomic_type: schema_pb2.AtomicType,
value) -> schema_pb2.AtomicTypeValue:
if atomic_type == schema_pb2.BYTE:
atomic_value = schema_pb2.AtomicTypeValue(byte=value)
elif atomic_type == schema_pb2.INT16:
atomic_value = schema_pb2.AtomicTypeValue(int16=value)
elif atomic_type == schema_pb2.INT32:
atomic_value = schema_pb2.AtomicTypeValue(int32=value)
elif atomic_type == schema_pb2.INT64:
atomic_value = schema_pb2.AtomicTypeValue(int64=value)
elif atomic_type == schema_pb2.FLOAT:
atomic_value = schema_pb2.AtomicTypeValue(float=value)
elif atomic_type == schema_pb2.DOUBLE:
atomic_value = schema_pb2.AtomicTypeValue(double=value)
elif atomic_type == schema_pb2.STRING:
atomic_value = schema_pb2.AtomicTypeValue(string=value)
elif atomic_type == schema_pb2.BOOLEAN:
atomic_value = schema_pb2.AtomicTypeValue(boolean=value)
elif atomic_type == schema_pb2.BYTES:
atomic_value = schema_pb2.AtomicTypeValue(bytes=value)
else:
raise ValueError(
"Unrecognized atomic_type {atomic_type} when encoding value {value}")
return atomic_value
def value_from_runner_api(
self,
type_proto: schema_pb2.FieldType,
value_proto: schema_pb2.FieldValue):
if type_proto.WhichOneof("type_info") != "atomic_type":
# TODO: Allow other value types
raise ValueError(
"Encounterd option with unsupported type. Only "
f"atomic_type options are supported: {type_proto}")
value = self.atomic_value_from_runner_api(
type_proto.atomic_type, value_proto.atomic_value)
return value
def value_to_runner_api(self, typing_proto: schema_pb2.FieldType, value):
if typing_proto.WhichOneof("type_info") != "atomic_type":
# TODO: Allow other value types
raise ValueError(
"Only atomic_type option values are currently supported in Python. "
f"Got {value!r}, which maps to fieldtype {typing_proto!r}.")
atomic_value = self.atomic_value_to_runner_api(
typing_proto.atomic_type, value)
value_proto = schema_pb2.FieldValue(atomic_value=atomic_value)
return value_proto
def option_from_runner_api(
self, option_proto: schema_pb2.Option) -> Tuple[str, Any]:
if not option_proto.HasField('type'):
return option_proto.name, None
value = self.value_from_runner_api(option_proto.type, option_proto.value)
return option_proto.name, value
def option_to_runner_api(self, option: Tuple[str, Any]) -> schema_pb2.Option:
name, value = option
if value is None:
# a value of None indicates the option is just a flag.
# Don't set type, value
return schema_pb2.Option(name=name)
type_proto = self.typing_to_runner_api(type(value))
value_proto = self.value_to_runner_api(type_proto, value)
return schema_pb2.Option(name=name, type=type_proto, value=value_proto)
def typing_from_runner_api(
self, fieldtype_proto: schema_pb2.FieldType) -> type:
if fieldtype_proto.nullable:
# In order to determine the inner type, create a copy of fieldtype_proto
# with nullable=False and pass back to typing_from_runner_api
base_type = schema_pb2.FieldType()
base_type.CopyFrom(fieldtype_proto)
base_type.nullable = False
base = self.typing_from_runner_api(base_type)
if base == Any:
return base
else:
return Optional[base]
type_info = fieldtype_proto.WhichOneof("type_info")
if type_info == "atomic_type":
try:
return ATOMIC_TYPE_TO_PRIMITIVE[fieldtype_proto.atomic_type]
except KeyError:
raise ValueError(
"Unsupported atomic type: {0}".format(fieldtype_proto.atomic_type))
elif type_info == "array_type":
return Sequence[self.typing_from_runner_api(
fieldtype_proto.array_type.element_type)]
elif type_info == "map_type":
return Mapping[
self.typing_from_runner_api(fieldtype_proto.map_type.key_type),
self.typing_from_runner_api(fieldtype_proto.map_type.value_type)]
elif type_info == "row_type":
schema = fieldtype_proto.row_type.schema
schema_options = [
self.option_from_runner_api(option_proto)
for option_proto in schema.options
]
field_options = {
field.name: [
self.option_from_runner_api(option_proto)
for option_proto in field.options
]
for field in schema.fields if field.options
}
# First look for user type in the registry
user_type = self.schema_registry.get_typing_by_id(schema.id)
if user_type is None:
# If the type is not in the registry (the schema likely came from another
# SDK), generate a NamedTuple type to use.
fields = named_fields_from_schema(schema)
descriptions = {
field.name: field.description
for field in schema.fields
}
result = row_type.RowTypeConstraint.from_fields(
fields=fields,
schema_id=schema.id,
schema_options=schema_options,
field_options=field_options,
schema_registry=self.schema_registry,
field_descriptions=descriptions or None)
return result
else:
return row_type.RowTypeConstraint.from_user_type(
user_type,
schema_options=schema_options,
field_options=field_options)
elif type_info == "logical_type":
if fieldtype_proto.logical_type.urn == PYTHON_ANY_URN:
return Any
else:
return LogicalType.from_runner_api(
fieldtype_proto.logical_type).language_type()
else:
raise ValueError(f"Unrecognized type_info: {type_info!r}")
def named_tuple_from_schema(self, schema: schema_pb2.Schema) -> type:
from apache_beam import coders
type_name = 'BeamSchema_{}'.format(schema.id.replace('-', '_'))
subfields = []
descriptions = {}
for field in schema.fields:
try:
field_py_type = self.typing_from_runner_api(field.type)
if isinstance(field_py_type, row_type.RowTypeConstraint):
field_py_type = field_py_type.user_type
except ValueError as e:
raise ValueError(
"Failed to decode schema due to an issue with Field proto:\n\n"
f"{text_format.MessageToString(field)}") from e
descriptions[field.name] = field.description
subfields.append((field.name, field_py_type))
user_type = NamedTuple(type_name, subfields)
# Define a reduce function, otherwise these types can't be pickled
# (See BEAM-9574)
setattr(
user_type,
'__reduce__',
_named_tuple_reduce_method(schema.SerializeToString()))
setattr(user_type, "_field_descriptions", descriptions)
setattr(user_type, row_type._BEAM_SCHEMA_ID, schema.id)
self.schema_registry.add(user_type, schema)
coders.registry.register_coder(user_type, coders.RowCoder)
return user_type
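# For example (illustrative): for a Schema with id "abc" containing a single
# STRING field "name", named_tuple_from_schema returns a NamedTuple subclass
# named BeamSchema_abc with one field `name: str`, registered with RowCoder.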
def _named_tuple_reduce_method(serialized_schema):
def __reduce__(self):
return _hydrate_namedtuple_instance, (serialized_schema, tuple(self))
return __reduce__
def _hydrate_namedtuple_instance(encoded_schema, values):
return named_tuple_from_schema(
proto_utils.parse_Bytes(encoded_schema, schema_pb2.Schema))(*values)
def named_tuple_from_schema(
schema, schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY) -> type:
return SchemaTranslation(
schema_registry=schema_registry).named_tuple_from_schema(schema)
def named_tuple_to_schema(
named_tuple,
schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY) -> schema_pb2.Schema:
return typing_to_runner_api(named_tuple, schema_registry).row_type.schema
def schema_from_element_type(element_type: type) -> schema_pb2.Schema:
"""Get a schema for the given PCollection element_type.
Returns schema as a list of (name, python_type) tuples"""
if isinstance(element_type, row_type.RowTypeConstraint):
return named_fields_to_schema(element_type._fields)
elif match_is_named_tuple(element_type):
return named_tuple_to_schema(element_type)
else:
raise TypeError(
f"Could not determine schema for type hint {element_type!r}. Did you "
"mean to create a schema-aware PCollection? See "
"https://s.apache.org/beam-python-schemas")
def named_fields_from_element_type(
element_type: type) -> List[Tuple[str, type]]:
return named_fields_from_schema(schema_from_element_type(element_type))
def union_schema_type(element_types):
"""Returns a schema whose fields are the union of each corresponding field.
element_types must be a set of schema-aware types whose fields have the
same naming and ordering.
"""
union_fields_and_types = []
for field in zip(*[named_fields_from_element_type(t) for t in element_types]):
names, types = zip(*field)
name_set = set(names)
if len(name_set) != 1:
raise TypeError(
f"Could not determine schema for type hints {element_types!r}: "
f"Inconsistent names: {name_set}")
union_fields_and_types.append(
(next(iter(name_set)), typehints.Union[types]))
return named_tuple_from_schema(named_fields_to_schema(union_fields_and_types))
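# For example (illustrative sketch): given element types with named fields
# [('id', int), ('note', str)] and [('id', int), ('note', Optional[str])], the
# resulting schema has an INT64 field "id" and a nullable STRING field "note"
# (the per-field typehints.Union of str and Optional[str]).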
class _Ephemeral:
"""Helper class for wrapping unpicklable objects."""
def __init__(self, obj):
self.obj = obj
def __reduce__(self):
return _Ephemeral, (None, )
# Registry of LogicalType implementations, indexed by URN, logical type, and
# language type.
class LogicalTypeRegistry(object):
def __init__(self):
self.by_urn = {}
self.by_logical_type = {}
self.by_language_type = {}
def add(self, urn, logical_type):
self.by_urn[urn] = logical_type
self.by_logical_type[logical_type] = urn
self.by_language_type[logical_type.language_type()] = logical_type
def get_logical_type_by_urn(self, urn):
return self.by_urn.get(urn, None)
def get_urn_by_logial_type(self, logical_type):
return self.by_logical_type.get(logical_type, None)
def get_logical_type_by_language_type(self, representation_type):
return self.by_language_type.get(representation_type, None)
def copy(self):
copy = LogicalTypeRegistry()
copy.by_urn.update(self.by_urn)
copy.by_logical_type.update(self.by_logical_type)
copy.by_language_type.update(self.by_language_type)
return copy
LanguageT = TypeVar('LanguageT')
RepresentationT = TypeVar('RepresentationT')
ArgT = TypeVar('ArgT')
class LogicalType(Generic[LanguageT, RepresentationT, ArgT]):
_known_logical_types = LogicalTypeRegistry()
@classmethod
def urn(cls):
# type: () -> str
"""Return the URN used to identify this logical type"""
raise NotImplementedError()
@classmethod
def language_type(cls):
# type: () -> type
"""Return the language type this LogicalType encodes.
The returned type should match LanguageT"""
raise NotImplementedError()
@classmethod
def representation_type(cls):
# type: () -> type
"""Return the type of the representation this LogicalType uses to encode the
language type.
The returned type should match RepresentationT"""
raise NotImplementedError()
@classmethod
def argument_type(cls):
# type: () -> type
"""Return the type of the argument used for variations of this LogicalType.
The returned type should match ArgT"""
raise NotImplementedError(cls)
def argument(self):
# type: () -> ArgT
"""Return the argument for this instance of the LogicalType."""
raise NotImplementedError()
def to_representation_type(self, value):
# type: (LanguageT) -> RepresentationT
"""Convert an instance of LanguageT to RepresentationT."""
raise NotImplementedError()
def to_language_type(self, value):
# type: (RepresentationT) -> LanguageT
"""Convert an instance of RepresentationT to LanguageT."""
raise NotImplementedError()
@classmethod
def register_logical_type(cls, logical_type_cls):
"""Register an implementation of LogicalType."""
cls._known_logical_types.add(logical_type_cls.urn(), logical_type_cls)
return logical_type_cls
@classmethod
def from_typing(cls, typ):
# type: (type) -> LogicalType
"""Construct an instance of a registered LogicalType implementation given a
typing.
Raises ValueError if no registered LogicalType implementation can encode the
given typing."""
logical_type = cls._known_logical_types.get_logical_type_by_language_type(
typ)
if logical_type is None:
raise ValueError("No logical type registered for typing '%s'" % typ)
return logical_type._from_typing(typ)
@classmethod
def _from_typing(cls, typ):
# type: (type) -> LogicalType
"""Construct an instance of this LogicalType implementation given a typing.
"""
raise NotImplementedError()
@classmethod
def from_runner_api(cls, logical_type_proto):
# type: (schema_pb2.LogicalType) -> LogicalType
"""Construct an instance of a registered LogicalType implementation given a
proto LogicalType.
Raises ValueError if no LogicalType registered for the given URN.
"""
logical_type = cls._known_logical_types.get_logical_type_by_urn(
logical_type_proto.urn)
if logical_type is None:
raise ValueError(
"No logical type registered for URN '%s'" % logical_type_proto.urn)
if not logical_type_proto.HasField(
"argument_type") or not logical_type_proto.HasField("argument"):
# logical type_proto without argument
return logical_type()
else:
try:
argument = value_from_runner_api(
logical_type_proto.argument_type, logical_type_proto.argument)
except ValueError:
# TODO(https://github.com/apache/beam/issues/23373): Complete support
# for logical types that require arguments beyond atomic type.
# For now, skip arguments.
_LOGGER.warning(
'Logical type %s with argument is currently unsupported. '
'Argument values are omitted',
logical_type_proto.urn)
return logical_type()
return logical_type(argument)
class NoArgumentLogicalType(LogicalType[LanguageT, RepresentationT, None]):
@classmethod
def argument_type(cls):
# type: () -> type
return None
def argument(self):
# type: () -> ArgT
return None
@classmethod
def _from_typing(cls, typ):
# type: (type) -> LogicalType
# Since there's no argument, there can be no additional information encoded
# in the typing. Just construct an instance.
return cls()
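# A user-defined logical type would typically subclass NoArgumentLogicalType
# (or LogicalType directly when an argument is needed) and register itself.
# A hypothetical sketch (the UUID type and URN below are illustrative, not
# part of this module):
#
#   import uuid
#
#   @LogicalType.register_logical_type
#   class UuidLogicalType(NoArgumentLogicalType[uuid.UUID, str]):
#     @classmethod
#     def urn(cls):
#       return "example:logical_type:uuid:v1"
#
#     @classmethod
#     def language_type(cls):
#       return uuid.UUID
#
#     @classmethod
#     def representation_type(cls):
#       return str
#
#     def to_representation_type(self, value):
#       return str(value)
#
#     def to_language_type(self, value):
#       return uuid.UUID(value)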
class PassThroughLogicalType(LogicalType[LanguageT, LanguageT, ArgT]):
"""A base class for LogicalTypes that use the same type as the underlying
representation type.
"""
def to_language_type(self, value):
return value
@classmethod
def representation_type(cls):
# type: () -> type
return cls.language_type()
def to_representation_type(self, value):
return value
@classmethod
def _from_typing(cls, typ):
# type: (type) -> LogicalType
# TODO(https://github.com/apache/beam/issues/23373): enable argument
return cls()
MicrosInstantRepresentation = NamedTuple(
'MicrosInstantRepresentation', [('seconds', np.int64),
('micros', np.int64)])
@LogicalType.register_logical_type
class MillisInstant(NoArgumentLogicalType[Timestamp, np.int64]):
"""Millisecond-precision instant logical type handles values consistent with
that encoded by ``InstantCoder`` in the Java SDK.
This class handles :class:`apache_beam.utils.timestamp.Timestamp` language
type as :class:`MicrosInstant`, but it only provides millisecond precision,
because it is aimed to handle data encoded by Java sdk's InstantCoder which
has same precision level.
Timestamp is handled by `MicrosInstant` by default. In some scenario, such as
read from cross-language transform with rows containing InstantCoder encoded
timestamps, one may need to override the mapping of Timetamp to MillisInstant.
To do this, re-register this class with
:func:`~LogicalType.register_logical_type`.
"""
@classmethod
def representation_type(cls):
# type: () -> type
return np.int64
@classmethod
def urn(cls):
return common_urns.millis_instant.urn
@classmethod
def language_type(cls):
return Timestamp
def to_language_type(self, value):
# type: (np.int64) -> Timestamp
# value shifted as in apache_beams.coders.coder_impl.TimestampCoderImpl
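# (The Java SDK's InstantCoder shifts the encoded millis by 2**63 so that the
# encoded bytes sort in timestamp order; the branches below undo that shift.)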
if value < 0:
millis = int(value) + (1 << 63)
else:
millis = int(value) - (1 << 63)
return Timestamp(micros=millis * 1000)
# Make sure MicrosInstant is registered after MillisInstant so that it
# overrides MillisInstant as the preferred logical type for the Timestamp
# language type, and microsecond precision is not lost inside the Python SDK.
@LogicalType.register_logical_type
class MicrosInstant(NoArgumentLogicalType[Timestamp,
MicrosInstantRepresentation]):
"""Microsecond-precision instant logical type that handles ``Timestamp``."""
@classmethod
def urn(cls):
return common_urns.micros_instant.urn
@classmethod
def representation_type(cls):
# type: () -> type
return MicrosInstantRepresentation
@classmethod
def language_type(cls):
return Timestamp
def to_representation_type(self, value):
# type: (Timestamp) -> MicrosInstantRepresentation
return MicrosInstantRepresentation(
value.micros // 1000000, value.micros % 1000000)
def to_language_type(self, value):
# type: (MicrosInstantRepresentation) -> Timestamp
return Timestamp(seconds=int(value.seconds), micros=int(value.micros))
@LogicalType.register_logical_type
class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]):
"""A logical type for PythonCallableSource objects."""
@classmethod
def urn(cls):
return common_urns.python_callable.urn
@classmethod
def representation_type(cls):
# type: () -> type
return str
@classmethod
def language_type(cls):
return PythonCallableWithSource
def to_representation_type(self, value):
# type: (PythonCallableWithSource) -> str
return value.get_source()
def to_language_type(self, value):
# type: (str) -> PythonCallableWithSource
return PythonCallableWithSource(value)
FixedPrecisionDecimalArgumentRepresentation = NamedTuple(
'FixedPrecisionDecimalArgumentRepresentation', [('precision', np.int32),
('scale', np.int32)])
class DecimalLogicalType(NoArgumentLogicalType[decimal.Decimal, bytes]):
"""A logical type for decimal objects handling values consistent with that
encoded by ``BigDecimalCoder`` in the Java SDK.
"""
@classmethod
def urn(cls):
return common_urns.decimal.urn
@classmethod
def representation_type(cls):
# type: () -> type
return bytes
@classmethod
def language_type(cls):
return decimal.Decimal
def to_representation_type(self, value):
# type: (decimal.Decimal) -> bytes
return str(value).encode()
def to_language_type(self, value):
# type: (bytes) -> decimal.Decimal
return decimal.Decimal(value.decode())
@LogicalType.register_logical_type
class FixedPrecisionDecimalLogicalType(
LogicalType[decimal.Decimal,
DecimalLogicalType,
FixedPrecisionDecimalArgumentRepresentation]):
"""A wrapper of DecimalLogicalType that contains the precision value.
"""
def __init__(self, precision=-1, scale=0):
self.precision = precision
self.scale = scale
@classmethod
def urn(cls):
# TODO(https://github.com/apache/beam/issues/23373) promote this URN to
# schema.proto once logical types with argument are fully supported and the
# implementation of this logical type can thus be considered standardized.
return "beam:logical_type:fixed_decimal:v1"
@classmethod
def representation_type(cls):
# type: () -> type
return DecimalLogicalType
@classmethod
def language_type(cls):
return decimal.Decimal
def to_representation_type(self, value):
# type: (decimal.Decimal) -> bytes
return DecimalLogicalType().to_representation_type(value)
def to_language_type(self, value):
# type: (bytes) -> decimal.Decimal
return DecimalLogicalType().to_language_type(value)
@classmethod
def argument_type(cls):
return FixedPrecisionDecimalArgumentRepresentation
def argument(self):
return FixedPrecisionDecimalArgumentRepresentation(
precision=self.precision, scale=self.scale)
@classmethod
def _from_typing(cls, typ):
return cls()
# TODO(yathu,BEAM-10722): Investigate and resolve conflicts in logical type
# registration when more than one logical type shares the same language type.
LogicalType.register_logical_type(DecimalLogicalType)
@LogicalType.register_logical_type
class FixedBytes(PassThroughLogicalType[bytes, np.int32]):
"""A logical type for fixed-length bytes."""
@classmethod
def urn(cls):
return common_urns.fixed_bytes.urn
def __init__(self, length: np.int32):
self.length = length
@classmethod
def language_type(cls) -> type:
return bytes
def to_language_type(self, value: bytes):
length = len(value)
if length > self.length:
raise ValueError(
"value length {} > allowed length {}".format(length, self.length))
elif length < self.length:
# padding at the end
value = value + b'\0' * (self.length - length)
return value
@classmethod
def argument_type(cls):
return np.int32
def argument(self):
return self.length
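# For example (illustrative): FixedBytes(4).to_language_type(b'ab') returns
# b'ab\x00\x00' (padded to length 4), while a 5-byte value raises ValueError.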
@LogicalType.register_logical_type
class VariableBytes(PassThroughLogicalType[bytes, np.int32]):
"""A logical type for variable-length bytes with specified maximum length."""
@classmethod
def urn(cls):
return common_urns.var_bytes.urn
def __init__(self, max_length: np.int32 = np.iinfo(np.int32).max):
self.max_length = max_length
@classmethod
def language_type(cls) -> type:
return bytes
def to_language_type(self, value: bytes):
length = len(value)
if length > self.max_length:
raise ValueError(
"value length {} > allowed length {}".format(length, self.max_length))
return value
@classmethod
def argument_type(cls):
return np.int32
def argument(self):
return self.max_length
@LogicalType.register_logical_type
class FixedString(PassThroughLogicalType[str, np.int32]):
"""A logical type for fixed-length string."""
@classmethod
def urn(cls):
return common_urns.fixed_char.urn
def __init__(self, length: np.int32):
self.length = length
@classmethod
def language_type(cls) -> type:
return str
def to_language_type(self, value: str):
length = len(value)
if length > self.length:
raise ValueError(
"value length {} > allowed length {}".format(length, self.length))
elif length < self.length:
# padding at the end
value = value + ' ' * (self.length - length)
return value
@classmethod
def argument_type(cls):
return np.int32
def argument(self):
return self.length
@LogicalType.register_logical_type
class VariableString(PassThroughLogicalType[str, np.int32]):
"""A logical type for variable-length string with specified maximum length."""
@classmethod
def urn(cls):
return common_urns.var_char.urn
def __init__(self, max_length: np.int32 = np.iinfo(np.int32).max):
self.max_length = max_length
@classmethod
def language_type(cls) -> type:
return str
def to_language_type(self, value: str):
length = len(value)
if length > self.max_length:
raise ValueError(
"value length {} > allowed length {}".format(length, self.max_length))
return value
@classmethod
def argument_type(cls):
return np.int32
def argument(self):
return self.max_length