#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Collection of useful coders.
Only those coders listed in __all__ are part of the public API of this module.
## On usage of `pickle`, `dill` and `pickler` in Beam
In Beam, we generally use `pickle` for pipeline elements and `dill` for
more complex types, like user functions.
`pickler` is Beam's own wrapping of dill + compression + error handling.
It serves also as an API to mask the actual encoding layer (so we can
change it from `dill` if necessary).
We created `_MemoizingPickleCoder` to improve performance when serializing
complex user types for the execution of SDF. Specifically to address
BEAM-12781, where many identical `BoundedSource` instances are being
encoded.
"""
# pytype: skip-file
import base64
import pickle
from functools import lru_cache
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Type
from typing import TypeVar
from typing import overload
import google.protobuf.wrappers_pb2
import proto
from apache_beam.coders import coder_impl
from apache_beam.coders.avro_record import AvroRecord
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.typehints import typehints
from apache_beam.utils import proto_utils
if TYPE_CHECKING:
from google.protobuf import message # pylint: disable=ungrouped-imports
from apache_beam.coders.typecoders import CoderRegistry
from apache_beam.runners.pipeline_context import PipelineContext
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
from .stream import get_varint_size
except ImportError:
from .slow_stream import get_varint_size
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
# pylint: disable=wrong-import-order, wrong-import-position
# Avoid dependencies on the full SDK.
try:
# Import dill from the pickler module to make sure our monkey-patching of dill
# occurs.
from apache_beam.internal.dill_pickler import dill
except ImportError:
# We fall back to using the stock dill library in tests that don't use the
# full Python SDK.
import dill
# Public API of this module; names not listed here are internal.
__all__ = [
'Coder',
'AvroGenericCoder',
'BooleanCoder',
'BytesCoder',
'DillCoder',
'FastPrimitivesCoder',
'FloatCoder',
'IterableCoder',
'ListCoder',
'MapCoder',
'NullableCoder',
'PickleCoder',
'ProtoCoder',
'ProtoPlusCoder',
'ShardedKeyCoder',
'SingletonCoder',
'StrUtf8Coder',
'TimestampCoder',
'TupleCoder',
'TupleSequenceCoder',
'VarIntCoder',
'WindowedValueCoder',
'ParamWindowedValueCoder'
]
# Generic placeholder used in the register_urn type comments for the payload
# (parameter) type.
T = TypeVar('T')
# Any subclass of Coder; lets classmethod type comments return the concrete
# class they were invoked on.
CoderT = TypeVar('CoderT', bound='Coder')
# Any subclass of ProtoCoder.
ProtoCoderT = TypeVar('ProtoCoderT', bound='ProtoCoder')
# Shape of the constructor callables stored in Coder._known_urns:
# (parsed payload or None, component coders, pipeline context) -> result.
ConstructorFn = Callable[[Optional[Any], List['Coder'], 'PipelineContext'], Any]
def serialize_coder(coder):
  """Serializes ``coder`` as ``b'<class name>$<pickled coder>'``.

  The class-name prefix only makes the result easier to read;
  ``deserialize_coder`` drops everything up to the first ``$``.

  Args:
    coder: the coder instance to serialize.

  Returns:
    A byte string: the UTF-8 class name, a ``$`` separator, then the output
    of ``pickler.dumps(coder, use_zlib=True)``.
  """
  # Imported lazily so this module does not pull in the pickler at import time.
  from apache_beam.internal import pickler
  class_name = coder.__class__.__name__.encode('utf-8')
  pickled = pickler.dumps(coder, use_zlib=True)
  return b'%s$%s' % (class_name, pickled)
def deserialize_coder(serialized):
  """Rebuilds a coder from the byte string made by ``serialize_coder``.

  Args:
    serialized: bytes of the form ``b'<anything>$<pickled coder>'``.

  Returns:
    Whatever ``pickler.loads`` yields for the part after the first ``$``.
  """
  from apache_beam.internal import pickler
  # Split once only: the pickled payload itself may contain '$' bytes.
  pickled = serialized.split(b'$', 1)[1]
  return pickler.loads(pickled, use_zlib=True)
# pylint: enable=wrong-import-order, wrong-import-position
class Coder(object):
  """Base class for coders.

  Subclasses normally override ``encode`` and ``decode`` (or, for faster
  implementations, ``_create_impl``). The remaining methods supply defaults
  and SDK-internal plumbing.
  """
  def encode(self, value):
    # type: (Any) -> bytes

    """Encodes the given object into a byte string."""
    raise NotImplementedError('Encode not implemented: %s.' % self)

  def decode(self, encoded):
    """Decodes the given byte string into the corresponding object."""
    raise NotImplementedError('Decode not implemented: %s.' % self)

  def encode_nested(self, value):
    """Encodes ``value`` in nested format via the backing CoderImpl."""
    impl = self.get_impl()
    return impl.encode_nested(value)

  def decode_nested(self, encoded):
    """Decodes ``encoded`` from nested format via the backing CoderImpl."""
    impl = self.get_impl()
    return impl.decode_nested(encoded)

  def is_deterministic(self):
    # type: () -> bool

    """Whether this coder is guaranteed to encode values deterministically.

    Key coders in GroupByKey operations must be deterministic for the
    grouping to produce consistent results.

    The default coder, PickleCoder, is an example of a non-deterministic
    coder: the order in which map entries are pickled is not defined and may
    change between executions, so the same key could be encoded differently
    each time. Such a coder is generally unsuitable as a GroupByKey key coder.

    Returns:
      Whether coder is deterministic. The base implementation returns False.
    """
    return False

  def as_deterministic_coder(self, step_label, error_message=None):
    """Returns a deterministic version of self, if possible.

    Args:
      step_label: label of the requesting step, used in the default error.
      error_message: optional message overriding the default error text.

    Raises:
      ValueError: if this coder is not deterministic.
    """
    if not self.is_deterministic():
      raise ValueError(
          error_message or
          "%s cannot be made deterministic for '%s'." % (self, step_label))
    return self

  def estimate_size(self, value):
    """Estimates the encoded size of the given value, in bytes.

    Dataflow estimates the encoded size of a PCollection processed in a
    pipeline step from the estimated sizes of a random sample of its elements.

    This default encodes the value and measures the result. A coder that can
    estimate the size more cheaply (for instance because the encoding has a
    fixed size) can override this to improve performance.

    Arguments:
      value: the value whose encoded size is to be estimated.

    Returns:
      The estimated encoded size of the given value.
    """
    encoded = self.encode(value)
    return len(encoded)

  # ===========================================================================
  # Methods below are internal SDK details that don't need to be modified for
  # user-defined coders.
  # ===========================================================================

  def _create_impl(self):
    # type: () -> coder_impl.CoderImpl

    """Creates a CoderImpl to do the actual encoding and decoding.

    The default wraps this coder's own encode, decode and estimate_size.
    """
    return coder_impl.CallbackCoderImpl(
        self.encode, self.decode, self.estimate_size)

  def get_impl(self):
    """For internal use only; no backwards-compatibility guarantees.

    Returns the CoderImpl backing this Coder, creating and caching it on
    first use.
    """
    if hasattr(self, '_impl'):
      return self._impl
    self._impl = self._create_impl()
    assert isinstance(self._impl, coder_impl.CoderImpl)
    return self._impl

  def __getstate__(self):
    # The cached impl is left out of the pickled state.
    return self._dict_without_impl()

  def _dict_without_impl(self):
    """Returns the instance attributes minus the cached ``_impl`` entry."""
    if not hasattr(self, '_impl'):
      return self.__dict__
    state = dict(self.__dict__)
    del state['_impl']
    return state

  def to_type_hint(self):
    raise NotImplementedError('BEAM-2717: %s' % self.__class__.__name__)

  @classmethod
  def from_type_hint(cls, unused_typehint, unused_registry):
    # type: (Type[CoderT], Any, CoderRegistry) -> CoderT
    # Unless a subclass overrides this, the coder is built with no arguments.
    return cls()

  def is_kv_coder(self):
    # type: () -> bool
    return False

  def key_coder(self):
    # type: () -> Coder
    if not self.is_kv_coder():
      raise ValueError('Not a KV coder: %s.' % self)
    raise NotImplementedError('key_coder: %s' % self)

  def value_coder(self):
    # type: () -> Coder
    if not self.is_kv_coder():
      raise ValueError('Not a KV coder: %s.' % self)
    raise NotImplementedError('value_coder: %s' % self)

  def _get_component_coders(self):
    # type: () -> Sequence[Coder]

    """For internal use only; no backwards-compatibility guarantees.

    Returns the internal component coders of this coder."""
    # Internal detail of the Coder API; user-defined Coders need not refine it.
    return []

  def as_cloud_object(self, coders_context=None):
    """For internal use only; no backwards-compatibility guarantees.

    Returns Google Cloud Dataflow API description of this coder."""
    # Internal detail of the Coder API; user-defined Coders need not refine it.
    #
    # Coders are passed as "<coder_name>$<pickled_data>" to keep the job
    # description JSON readable. The worker ignores the data before the $.
    type_tag = serialize_coder(self)
    component_encodings = [
        component.as_cloud_object(coders_context)
        for component in self._get_component_coders()
    ]
    value = {
        '@type': type_tag,
        'component_encodings': component_encodings,
    }

    if coders_context:
      value['pipeline_proto_coder_id'] = coders_context.get_id(self)

    return value

  def __repr__(self):
    return self.__class__.__name__

  # pylint: disable=protected-access
  def __eq__(self, other):
    # Equal when the classes match and the attributes (minus the cached
    # impl) match.
    if self.__class__ == other.__class__:
      return self._dict_without_impl() == other._dict_without_impl()
    return False

  # pylint: enable=protected-access

  def __hash__(self):
    return hash(type(self))

  # Maps a urn to its (parameter type, constructor) pair; filled in by
  # register_urn and read by from_runner_api.
  _known_urns = {}  # type: Dict[str, Tuple[type, ConstructorFn]]

  @classmethod
  @overload
  def register_urn(
      cls,
      urn,  # type: str
      parameter_type,  # type: Optional[Type[T]]
  ):
    # type: (...) -> Callable[[Callable[[T, List[Coder], PipelineContext], Any]], Callable[[T, List[Coder], PipelineContext], Any]]
    pass

  @classmethod
  @overload
  def register_urn(
      cls,
      urn,  # type: str
      parameter_type,  # type: Optional[Type[T]]
      fn  # type: Callable[[T, List[Coder], PipelineContext], Any]
  ):
    # type: (...) -> None
    pass

  @classmethod
  def register_urn(cls, urn, parameter_type, fn=None):
    """Registers a urn with a constructor.

    For example, if 'beam:fn:foo' had parameter type FooPayload, one could
    write `Coder.register_urn('beam:fn:foo', FooPayload, foo_from_proto)`.
    The constructor is later called by ``from_runner_api`` with the parsed
    payload, the component coders and the pipeline context.

    This can also be used as a decorator: leave out ``fn`` and the returned
    callable registers, and hands back, the function it is applied to.

    A corresponding to_runner_api_parameter method would be expected whose
    result starts with ('beam:fn:foo', FooPayload).
    """
    def register(constructor):
      cls._known_urns[urn] = parameter_type, constructor
      return constructor

    if not fn:
      # Used as a decorator.
      return register
    # Used as a statement.
    register(fn)

  def to_runner_api(self, context):
    # type: (PipelineContext) -> beam_runner_api_pb2.Coder
    urn, typed_param, components = self.to_runner_api_parameter(context)
    # bytes and None payloads are passed through as-is; anything else is
    # serialized with its SerializeToString method.
    if isinstance(typed_param, (bytes, type(None))):
      payload = typed_param
    else:
      payload = typed_param.SerializeToString()
    spec = beam_runner_api_pb2.FunctionSpec(urn=urn, payload=payload)
    component_ids = [context.coders.get_id(c) for c in components]
    return beam_runner_api_pb2.Coder(
        spec=spec, component_coder_ids=component_ids)

  @classmethod
  def from_runner_api(cls, coder_proto, context):
    # type: (Type[CoderT], beam_runner_api_pb2.Coder, PipelineContext) -> CoderT

    """Builds a coder from its runner API proto.

    Looks up the constructor registered for the proto's urn and calls it with
    the parsed payload, the resolved component coders and the context.

    Prefer registering a urn with its parameter type and constructor.
    """
    parameter_type, constructor = cls._known_urns[coder_proto.spec.urn]
    payload = proto_utils.parse_Bytes(coder_proto.spec.payload, parameter_type)
    components = [
        context.coders.get_by_id(c) for c in coder_proto.component_coder_ids
    ]
    return constructor(payload, components, context)

  def to_runner_api_parameter(self, context):
    # type: (Optional[PipelineContext]) -> Tuple[str, Any, Sequence[Coder]]
    # Default: ship the whole coder pickled, with no component coders.
    pickled = google.protobuf.wrappers_pb2.BytesValue(
        value=serialize_coder(self))
    return (python_urns.PICKLED_CODER, pickled, ())

  @staticmethod
  def register_structured_urn(urn, cls):
    # type: (str, Type[Coder]) -> None

    """Register a coder that's completely defined by its urn and its
    component(s), if any, which are passed to construct the instance.
    """
    # Install a to_runner_api_parameter on cls that reports only the urn and
    # the component coders (no payload).
    cls.to_runner_api_parameter = (
        lambda self,
        unused_context: (urn, None, self._get_component_coders()))

    # pylint: disable=unused-variable
    @Coder.register_urn(urn, None)
    def from_runner_api_parameter(unused_payload, components, unused_context):
      return cls(*components) if components else cls()
@Coder.register_urn(
    python_urns.PICKLED_CODER, google.protobuf.wrappers_pb2.BytesValue)
def _pickle_from_runner_api_parameter(payload, components, context):
  """Constructor for the PICKLED_CODER urn.

  Unpickles the coder held in ``payload.value``; ``components`` and
  ``context`` are not used.
  """
  serialized = payload.value
  return deserialize_coder(serialized)
class StrUtf8Coder(Coder):
  """A coder used for reading and writing strings as UTF-8."""
  def is_deterministic(self):
    # type: () -> bool
    return True

  def to_type_hint(self):
    return str

  def encode(self, value):
    """Returns ``value`` encoded to UTF-8 bytes."""
    encoded = value.encode('utf-8')
    return encoded

  def decode(self, value):
    """Returns the text decoded from the UTF-8 bytes in ``value``."""
    decoded = value.decode('utf-8')
    return decoded


Coder.register_structured_urn(common_urns.coders.STRING_UTF8.urn, StrUtf8Coder)
class ToBytesCoder(Coder):
  """A default string coder used if no sink coder is specified."""
  def encode(self, value):
    """Returns bytes unchanged; other values become UTF-8 of ``str(value)``."""
    if isinstance(value, bytes):
      return value
    text = str(value)
    return text.encode('utf-8')

  def decode(self, _):
    # This coder is encode-only.
    raise NotImplementedError('ToBytesCoder cannot be used for decoding.')

  def is_deterministic(self):
    # type: () -> bool
    return True


# alias to the old class name for a courtesy to users who reference it
ToStringCoder = ToBytesCoder
class FastCoder(Coder):
  """Coder subclass used when a (faster) CoderImpl is supplied directly.

  Coder builds its impl out of encode() and decode(). This class goes the
  other way: subclasses provide _create_impl(), and encode(), decode() and
  estimate_size() delegate to the impl it returns.
  """
  def _create_impl(self):
    # Subclasses must supply the CoderImpl.
    raise NotImplementedError

  def encode(self, value):
    """Encodes the given object into a byte string."""
    impl = self.get_impl()
    return impl.encode(value)

  def decode(self, encoded):
    """Decodes the given byte string into the corresponding object."""
    impl = self.get_impl()
    return impl.decode(encoded)

  def estimate_size(self, value):
    """Returns the impl's size estimate for ``value``."""
    impl = self.get_impl()
    return impl.estimate_size(value)
class BytesCoder(FastCoder):
  """Byte string coder."""
  def _create_impl(self):
    return coder_impl.BytesCoderImpl()

  def to_type_hint(self):
    return bytes

  def is_deterministic(self):
    # type: () -> bool
    return True

  def as_cloud_object(self, coders_context=None):
    """Returns the fixed ``kind:bytes`` description; the context is unused."""
    cloud_object = {'@type': 'kind:bytes'}
    return cloud_object

  # The coder has no configuration, so instances of the same type are equal.
  def __hash__(self):
    return hash(type(self))

  def __eq__(self, other):
    return type(self) == type(other)


Coder.register_structured_urn(common_urns.coders.BYTES.urn, BytesCoder)
class BooleanCoder(FastCoder):
  """Coder for ``bool`` values, backed by ``coder_impl.BooleanCoderImpl``."""
  def _create_impl(self):
    return coder_impl.BooleanCoderImpl()

  def to_type_hint(self):
    return bool

  def is_deterministic(self):
    # type: () -> bool
    return True

  # The coder has no configuration, so instances of the same type are equal.
  def __hash__(self):
    return hash(type(self))

  def __eq__(self, other):
    return type(self) == type(other)


Coder.register_structured_urn(common_urns.coders.BOOL.urn, BooleanCoder)
class MapCoder(FastCoder):
  """Coder for mappings, built from a key coder and a value coder."""
  def __init__(self, key_coder, value_coder):
    # type: (Coder, Coder) -> None
    self._key_coder = key_coder
    self._value_coder = value_coder

  def _create_impl(self):
    key_impl = self._key_coder.get_impl()
    value_impl = self._value_coder.get_impl()
    return coder_impl.MapCoderImpl(key_impl, value_impl)

  @classmethod
  def from_type_hint(cls, typehint, registry):
    # type: (typehints.DictConstraint, CoderRegistry) -> MapCoder
    # Look up coders for the hint's key and value types in the registry.
    key_coder = registry.get_coder(typehint.key_type)
    value_coder = registry.get_coder(typehint.value_type)
    return cls(key_coder, value_coder)

  def to_type_hint(self):
    key_hint = self._key_coder.to_type_hint()
    value_hint = self._value_coder.to_type_hint()
    return typehints.Dict[key_hint, value_hint]

  def is_deterministic(self):
    # type: () -> bool
    # Map ordering is non-deterministic
    return False

  def __eq__(self, other):
    if type(self) == type(other):
      return (
          self._key_coder == other._key_coder and
          self._value_coder == other._value_coder)
    return False

  def __hash__(self):
    parts = (type(self), self._key_coder, self._value_coder)
    return sum(hash(part) for part in parts)

  def __repr__(self):
    components = (self._key_coder, self._value_coder)
    return 'MapCoder[%s, %s]' % components
class NullableCoder(FastCoder):
  """Coder for optional values, wrapping the coder of the underlying value."""
  def __init__(self, value_coder):
    # type: (Coder) -> None
    self._value_coder = value_coder

  def _create_impl(self):
    wrapped_impl = self._value_coder.get_impl()
    return coder_impl.NullableCoderImpl(wrapped_impl)

  def to_type_hint(self):
    wrapped_hint = self._value_coder.to_type_hint()
    return typehints.Optional[wrapped_hint]

  def is_deterministic(self):
    # type: () -> bool
    # Deterministic exactly when the wrapped coder is.
    return self._value_coder.is_deterministic()

  def __eq__(self, other):
    if type(self) == type(other):
      return self._value_coder == other._value_coder
    return False

  def __hash__(self):
    return sum(hash(part) for part in (type(self), self._value_coder))
class VarIntCoder(FastCoder):
  """Variable-length integer coder."""
  def _create_impl(self):
    return coder_impl.VarIntCoderImpl()

  def to_type_hint(self):
    return int

  def is_deterministic(self):
    # type: () -> bool
    return True

  def as_cloud_object(self, coders_context=None):
    """Returns the fixed ``kind:varint`` description; the context is unused."""
    cloud_object = {'@type': 'kind:varint'}
    return cloud_object

  # The coder has no configuration, so instances of the same type are equal.
  def __hash__(self):
    return hash(type(self))

  def __eq__(self, other):
    return type(self) == type(other)


Coder.register_structured_urn(common_urns.coders.VARINT.urn, VarIntCoder)
class FloatCoder(FastCoder):
  """A coder used for floating-point values."""
  def _create_impl(self):
    return coder_impl.FloatCoderImpl()

  def to_type_hint(self):
    return float

  def is_deterministic(self):
    # type: () -> bool
    return True

  # The coder has no configuration, so instances of the same type are equal.
  def __hash__(self):
    return hash(type(self))

  def __eq__(self, other):
    return type(self) == type(other)


Coder.register_structured_urn(common_urns.coders.DOUBLE.urn, FloatCoder)
class TimestampCoder(FastCoder):
  """A coder used for timeutil.Timestamp values."""
  def _create_impl(self):
    return coder_impl.TimestampCoderImpl()

  def is_deterministic(self):
    # type: () -> bool
    return True

  # The coder has no configuration, so instances of the same type are equal.
  def __hash__(self):
    return hash(type(self))

  def __eq__(self, other):
    return type(self) == type(other)
class _TimerCoder(FastCoder):
  """A coder used for timer values.

  For internal use. Built from the coder of the timer's key and the coder of
  its window."""
  def __init__(self, key_coder, window_coder):
    # type: (Coder, Coder) -> None
    self._key_coder = key_coder
    self._window_coder = window_coder

  def _get_component_coders(self):
    # type: () -> List[Coder]
    return [self._key_coder, self._window_coder]

  def _create_impl(self):
    key_impl = self._key_coder.get_impl()
    window_impl = self._window_coder.get_impl()
    return coder_impl.TimerCoderImpl(key_impl, window_impl)

  def is_deterministic(self):
    # type: () -> bool
    # Deterministic only when both component coders are.
    if self._key_coder.is_deterministic():
      return self._window_coder.is_deterministic()
    return self._key_coder.is_deterministic()

  def __eq__(self, other):
    if type(self) == type(other):
      return (
          self._key_coder == other._key_coder and
          self._window_coder == other._window_coder)
    return False

  def __hash__(self):
    parts = (type(self), self._key_coder, self._window_coder)
    return sum(hash(part) for part in parts)


Coder.register_structured_urn(common_urns.coders.TIMER.urn, _TimerCoder)
class SingletonCoder(FastCoder):
  """A coder that always encodes exactly one value."""
  def __init__(self, value):
    # The single value this coder stands for; handed to SingletonCoderImpl.
    self._value = value

  def _create_impl(self):
    return coder_impl.SingletonCoderImpl(self._value)

  def is_deterministic(self):
    # type: () -> bool
    return True

  def __eq__(self, other):
    return type(self) == type(other) and self._value == other._value

  def __hash__(self):
    # Hash on the value when possible. An unhashable value (e.g. a list or
    # dict) would otherwise make the coder itself unhashable, so fall back to
    # the type hash. That stays consistent with __eq__: equal coders share a
    # type, and equal unhashable values both take this fallback.
    try:
      return hash(self._value)
    except TypeError:
      return hash(type(self))
def maybe_dill_dumps(o):
  """Pickles ``o`` with ``pickle``, falling back to dill if that fails.

  Objects of certain custom classes, for example ones containing lambdas,
  cannot be handled by the standard pickler and need dill.
  """
  try:
    pickled = pickle.dumps(o, pickle.HIGHEST_PROTOCOL)
  except Exception:  # pylint: disable=broad-except
    pickled = dill.dumps(o)
  return pickled
def maybe_dill_loads(o):
  """Unpickles ``o`` with ``pickle``, falling back to dill if that fails."""
  try:
    loaded = pickle.loads(o)
  except Exception:  # pylint: disable=broad-except
    loaded = dill.loads(o)
  return loaded
class _PickleCoderBase(FastCoder):
  """Base class for pickling coders."""
  def is_deterministic(self):
    # type: () -> bool
    # The default coder, the PickleCoder, is not deterministic (for example,
    # the order in which map entries are pickled may change between
    # executions), so it is generally unsuitable as a key coder in
    # GroupByKey operations.
    return False

  def as_cloud_object(self, coders_context=None, is_pair_like=True):
    """Returns the cloud description, optionally presented as a pair.

    With ``is_pair_like`` set, the description is marked pair-like and lists
    two non-pair descriptions of this coder as its component encodings.
    """
    value = super().as_cloud_object(coders_context)
    if not is_pair_like:
      return value
    # This coder is used where the value type's coder cannot be inferred more
    # precisely. Where the service expects a pair it checks for the
    # "is_pair_like" key, and would fail without the hack below.
    value['is_pair_like'] = True
    value['component_encodings'] = [
        self.as_cloud_object(coders_context, is_pair_like=False)
        for _ in range(2)
    ]
    return value

  # .key_coder() and .value_coder() may be called on PickleCoder because the
  # return values of lambdas in ParDo operations can't always be inferred, and
  # their results may be used in a GroupBykey.
  def is_kv_coder(self):
    # type: () -> bool
    return True

  def key_coder(self):
    return self

  def value_coder(self):
    return self

  # Equality and hashing look at the type only.
  def __hash__(self):
    return hash(type(self))

  def __eq__(self, other):
    return type(self) == type(other)
class _MemoizingPickleCoder(_PickleCoderBase):
  """Coder using Python's pickle functionality with memoization.

  Encoding goes through ``pickler.dumps`` wrapped in an LRU cache of
  ``cache_size`` entries, so repeated encodes of an equal, hashable value
  reuse the cached bytes.
  """
  def __init__(self, cache_size=16):
    super().__init__()
    # Maximum number of entries kept by the lru_cache in _create_impl.
    self.cache_size = cache_size

  def _create_impl(self):
    from apache_beam.internal import pickler
    dumps = pickler.dumps

    mdumps = lru_cache(maxsize=self.cache_size, typed=True)(dumps)

    def _nonhashable_dumps(x):
      # lru_cache raises TypeError for unhashable arguments; encode those
      # directly, without caching.
      try:
        return mdumps(x)
      except TypeError:
        return dumps(x)

    return coder_impl.CallbackCoderImpl(_nonhashable_dumps, pickler.loads)

  def as_deterministic_coder(self, step_label, error_message=None):
    """Returns a coder that enforces deterministic encoding for this step.

    FastPrimitivesCoder's constructor has no ``requires_deterministic``
    keyword, so the wrapper is DeterministicFastPrimitivesCoder, the same
    class FastPrimitivesCoder.as_deterministic_coder returns.
    """
    return DeterministicFastPrimitivesCoder(self, step_label)

  def to_type_hint(self):
    return Any
class PickleCoder(_PickleCoderBase):
  """Coder using Python's pickle functionality."""
  def _create_impl(self):
    # Bind the function and protocol to locals once, for the encode callback.
    dumps = pickle.dumps
    protocol = pickle.HIGHEST_PROTOCOL
    return coder_impl.CallbackCoderImpl(
        lambda x: dumps(x, protocol), pickle.loads)

  def as_deterministic_coder(self, step_label, error_message=None):
    """Returns a coder that enforces deterministic encoding for this step.

    FastPrimitivesCoder's constructor has no ``requires_deterministic``
    keyword, so the wrapper is DeterministicFastPrimitivesCoder, the same
    class FastPrimitivesCoder.as_deterministic_coder returns.
    """
    return DeterministicFastPrimitivesCoder(self, step_label)

  def to_type_hint(self):
    return Any
class DillCoder(_PickleCoderBase):
  """Coder using dill's pickle functionality."""
  def _create_impl(self):
    # Both callbacks try the standard pickler first and fall back to dill.
    encode_fn = maybe_dill_dumps
    decode_fn = maybe_dill_loads
    return coder_impl.CallbackCoderImpl(encode_fn, decode_fn)
class DeterministicFastPrimitivesCoder(FastCoder):
  """Throws runtime errors when encoding non-deterministic values."""
  def __init__(self, coder, step_label):
    self._underlying_coder = coder
    self._step_label = step_label

  def _create_impl(self):
    # The underlying coder's impl is handed to the primitives impl together
    # with the step label that requires determinism.
    underlying_impl = self._underlying_coder.get_impl()
    return coder_impl.FastPrimitivesCoderImpl(
        underlying_impl, requires_deterministic_step_label=self._step_label)

  def to_type_hint(self):
    return Any

  def is_deterministic(self):
    # type: () -> bool
    return True

  # Acts as its own key and value coder.
  def is_kv_coder(self):
    # type: () -> bool
    return True

  def key_coder(self):
    return self

  def value_coder(self):
    return self
class FastPrimitivesCoder(FastCoder):
  """Encodes simple primitives (e.g. str, int) efficiently.

  For unknown types, falls back to another coder (e.g. PickleCoder).
  """
  def __init__(self, fallback_coder=PickleCoder()):
    # type: (Coder) -> None
    self._fallback_coder = fallback_coder

  def _create_impl(self):
    fallback_impl = self._fallback_coder.get_impl()
    return coder_impl.FastPrimitivesCoderImpl(fallback_impl)

  def is_deterministic(self):
    # type: () -> bool
    # Only as deterministic as the fallback coder.
    return self._fallback_coder.is_deterministic()

  def as_deterministic_coder(self, step_label, error_message=None):
    """Returns self if deterministic, else a deterministic wrapper of self."""
    if not self.is_deterministic():
      return DeterministicFastPrimitivesCoder(self, step_label)
    return self

  def to_type_hint(self):
    return Any

  def as_cloud_object(self, coders_context=None, is_pair_like=True):
    """Returns the cloud description, optionally presented as a pair.

    With ``is_pair_like`` set, the description is marked pair-like and lists
    two non-pair descriptions of this coder as its component encodings.
    """
    value = super().as_cloud_object(coders_context)
    if not is_pair_like:
      return value
    # This coder is used where the value type's coder cannot be inferred more
    # precisely. Where the service expects a pair it checks for the
    # "is_pair_like" key, and would fail without the hack below.
    value['is_pair_like'] = True
    value['component_encodings'] = [
        self.as_cloud_object(coders_context, is_pair_like=False)
        for _ in range(2)
    ]
    return value

  # .key_coder() and .value_coder() may be called on FastPrimitivesCoder
  # because the return values of lambdas in ParDo operations can't always be
  # inferred, and their results may be used in a GroupBykey.
  def is_kv_coder(self):
    # type: () -> bool
    return True

  def key_coder(self):
    return self

  def value_coder(self):
    return self

  # Equality and hashing look at the type only, not the fallback coder.
  def __hash__(self):
    return hash(type(self))

  def __eq__(self, other):
    return type(self) == type(other)
class FakeDeterministicFastPrimitivesCoder(FastPrimitivesCoder):
  """A FastPrimitivesCoder that claims to be deterministic.

  Registering it as a fallback coder restores the behavior from before
  deterministic encoding was enforced (BEAM-11719).
  """
  def is_deterministic(self):
    # Always reports True, whatever the fallback coder says.
    claims_deterministic = True
    return claims_deterministic
class Base64PickleCoder(Coder):
  """Coder of objects by Python pickle, then base64 encoding."""

  # TODO(robertwb): Do base64 encoding where it's needed (e.g. in json) rather
  # than via a special Coder.

  def encode(self, value):
    """Pickles ``value`` and returns the base64 encoding of the pickle."""
    pickled = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
    return base64.b64encode(pickled)

  def decode(self, encoded):
    """Base64-decodes ``encoded`` and unpickles the result."""
    pickled = base64.b64decode(encoded)
    return pickle.loads(pickled)

  def is_deterministic(self):
    # type: () -> bool
    # Not deterministic, for the same reasons that apply to PickleCoder.
    return False

  # .key_coder() and .value_coder() may be called on Base64PickleCoder because
  # the return values of lambdas in ParDo operations can't always be inferred,
  # and their results may be used in a GroupBykey.
  #
  # TODO(ccy): this is currently only used for KV values from Create transforms.
  # Investigate a way to unify this with PickleCoder.
  def is_kv_coder(self):
    # type: () -> bool
    return True

  def key_coder(self):
    return self

  def value_coder(self):
    return self
class ProtoCoder(FastCoder):
  """A Coder for Google Protocol Buffers.

  It supports both Protocol Buffers syntax versions 2 and 3. However,
  the runtime version of the python protobuf library must exactly match the
  version of the protoc compiler that was used to generate the protobuf
  messages.

  ProtoCoder is registered in the global CoderRegistry as the default coder for
  any protobuf Message object.
  """
  def __init__(self, proto_message_type):
    # type: (Type[google.protobuf.message.Message]) -> None
    self.proto_message_type = proto_message_type

  def _create_impl(self):
    message_type = self.proto_message_type
    return coder_impl.ProtoCoderImpl(message_type)

  def is_deterministic(self):
    # type: () -> bool
    # TODO(vikasrk): A proto message can be deterministic if it does not contain
    # a Map.
    return False

  def as_deterministic_coder(self, step_label, error_message=None):
    """Returns a DeterministicProtoCoder for the same message type."""
    message_type = self.proto_message_type
    return DeterministicProtoCoder(message_type)

  def __eq__(self, other):
    if type(self) == type(other):
      return self.proto_message_type == other.proto_message_type
    return False

  def __hash__(self):
    return hash(self.proto_message_type)

  @classmethod
  def from_type_hint(cls, typehint, unused_registry):
    """Builds a coder for ``typehint``, which must be a protobuf message type.

    Raises:
      ValueError: if ``typehint`` is not a subclass of one of
        ``proto_utils.message_types``.
    """
    if not issubclass(typehint, proto_utils.message_types):
      raise ValueError((
          'Expected a subclass of google.protobuf.message.Message'
          ', but got a %s' % typehint))
    return cls(typehint)

  def to_type_hint(self):
    return self.proto_message_type
class DeterministicProtoCoder(ProtoCoder):
  """A deterministic Coder for Google Protocol Buffers.

  It supports both Protocol Buffers syntax versions 2 and 3. However,
  the runtime version of the python protobuf library must exactly match the
  version of the protoc compiler that was used to generate the protobuf
  messages.
  """
  def _create_impl(self):
    message_type = self.proto_message_type
    return coder_impl.DeterministicProtoCoderImpl(message_type)

  def as_deterministic_coder(self, step_label, error_message=None):
    # Already deterministic, so there is nothing to convert.
    return self

  def is_deterministic(self):
    # type: () -> bool
    return True
class ProtoPlusCoder(FastCoder):
  """A Coder for Google Protocol Buffers wrapped using the proto-plus library.

  ProtoPlusCoder is registered in the global CoderRegistry as the default coder
  for any proto.Message object.
  """
  def __init__(self, proto_plus_message_type):
    # type: (Type[proto.Message]) -> None
    self.proto_plus_message_type = proto_plus_message_type

  def _create_impl(self):
    message_type = self.proto_plus_message_type
    return coder_impl.ProtoPlusCoderImpl(message_type)

  def is_deterministic(self):
    return True

  def __eq__(self, other):
    if type(self) == type(other):
      return self.proto_plus_message_type == other.proto_plus_message_type
    return False

  def __hash__(self):
    return hash(self.proto_plus_message_type)

  @classmethod
  def from_type_hint(cls, typehint, unused_registry):
    """Builds a coder for ``typehint``, which must subclass proto.Message.

    Raises:
      ValueError: if ``typehint`` is not a subclass of ``proto.Message``.
    """
    if not issubclass(typehint, proto.Message):
      raise ValueError(
          'Expected a subclass of proto.Message, but got a %s' % typehint)
    return cls(typehint)

  def to_type_hint(self):
    return self.proto_plus_message_type
# URN under which AvroGenericCoder is registered with the runner API.
AVRO_GENERIC_CODER_URN = "beam:coder:avro:generic:v1"


class AvroGenericCoder(FastCoder):
  """A coder used for AvroRecord values."""
  def __init__(self, schema):
    # The schema is kept as text; it is UTF-8 encoded for the runner API
    # payload and handed to AvroCoderImpl as-is.
    self.schema = schema

  def _create_impl(self):
    schema = self.schema
    return coder_impl.AvroCoderImpl(schema)

  def is_deterministic(self):
    # TODO(BEAM-7903): need to confirm if it's deterministic
    return False

  def __eq__(self, other):
    if type(self) == type(other):
      return self.schema == other.schema
    return False

  def __hash__(self):
    return hash(self.schema)

  def to_type_hint(self):
    return AvroRecord

  def to_runner_api_parameter(self, context):
    """Returns (urn, UTF-8 encoded schema, no component coders)."""
    payload = self.schema.encode('utf-8')
    return AVRO_GENERIC_CODER_URN, payload, ()

  @staticmethod
  @Coder.register_urn(AVRO_GENERIC_CODER_URN, bytes)
  def from_runner_api_parameter(payload, unused_components, unused_context):
    """Rebuilds the coder from the UTF-8 encoded schema in ``payload``."""
    schema = payload.decode('utf-8')
    return AvroGenericCoder(schema)
class TupleCoder(FastCoder):
  """Coder of tuple objects."""
  def __init__(self, components):
    # type: (Iterable[Coder]) -> None
    # Stored as a tuple, one coder per tuple position.
    self._coders = tuple(components)

  def _create_impl(self):
    component_impls = [c.get_impl() for c in self._coders]
    return coder_impl.TupleCoderImpl(component_impls)

  def is_deterministic(self):
    # type: () -> bool
    # Deterministic only when every component coder is.
    return all(c.is_deterministic() for c in self._coders)

  def as_deterministic_coder(self, step_label, error_message=None):
    """Returns self if deterministic, else a TupleCoder whose components
    have each been converted with their own as_deterministic_coder."""
    if self.is_deterministic():
      return self
    deterministic_components = [
        c.as_deterministic_coder(step_label, error_message)
        for c in self._coders
    ]
    return TupleCoder(deterministic_components)

  def to_type_hint(self):
    component_hints = tuple(c.to_type_hint() for c in self._coders)
    return typehints.Tuple[component_hints]

  @classmethod
  def from_type_hint(cls, typehint, registry):
    # type: (typehints.TupleConstraint, CoderRegistry) -> TupleCoder
    component_coders = [registry.get_coder(t) for t in typehint.tuple_types]
    return cls(component_coders)

  def as_cloud_object(self, coders_context=None):
    """Returns a ``kind:pair`` description for two-component coders, and
    the generic Coder description otherwise."""
    if not self.is_kv_coder():
      return super().as_cloud_object(coders_context)
    component_encodings = [
        component.as_cloud_object(coders_context)
        for component in self._get_component_coders()
    ]
    return {
        '@type': 'kind:pair',
        'is_pair_like': True,
        'component_encodings': component_encodings,
    }

  def _get_component_coders(self):
    # type: () -> Tuple[Coder, ...]
    return self.coders()

  def coders(self):
    # type: () -> Tuple[Coder, ...]
    return self._coders

  def is_kv_coder(self):
    # type: () -> bool
    # A tuple coder counts as a KV coder when it has exactly two components.
    return len(self._coders) == 2

  def key_coder(self):
    # type: () -> Coder
    if len(self._coders) != 2:
      raise ValueError('TupleCoder does not have exactly 2 components.')
    key, _ = self._coders
    return key

  def value_coder(self):
    # type: () -> Coder
    if len(self._coders) != 2:
      raise ValueError('TupleCoder does not have exactly 2 components.')
    _, value = self._coders
    return value

  def __repr__(self):
    rendered = ', '.join(map(str, self._coders))
    return 'TupleCoder[%s]' % rendered

  def __eq__(self, other):
    if type(self) == type(other):
      return self._coders == other.coders()
    return False

  def __hash__(self):
    return hash(self._coders)

  def to_runner_api_parameter(self, context):
    """Returns the KV urn for two-component coders and the Python tuple
    urn otherwise, with no payload and the component coders."""
    if self.is_kv_coder():
      urn = common_urns.coders.KV.urn
    else:
      urn = python_urns.TUPLE_CODER
    return urn, None, self.coders()

  @staticmethod
  @Coder.register_urn(common_urns.coders.KV.urn, None)
  @Coder.register_urn(python_urns.TUPLE_CODER, None)
  def from_runner_api_parameter(unused_payload, components, unused_context):
    """Constructor registered for both the KV and the tuple urn."""
    return TupleCoder(components)
class TupleSequenceCoder(FastCoder):
  """Coder of homogeneous tuple objects."""
  def __init__(self, elem_coder):
    # type: (Coder) -> None
    # One coder shared by every element of the tuple.
    self._elem_coder = elem_coder

  def value_coder(self):
    return self._elem_coder

  def _create_impl(self):
    elem_impl = self._elem_coder.get_impl()
    return coder_impl.TupleSequenceCoderImpl(elem_impl)

  def is_deterministic(self):
    # type: () -> bool
    # Deterministic exactly when the element coder is.
    return self._elem_coder.is_deterministic()

  def as_deterministic_coder(self, step_label, error_message=None):
    """Returns self if deterministic, else a TupleSequenceCoder built on
    the deterministic version of the element coder."""
    if self.is_deterministic():
      return self
    deterministic_elem = self._elem_coder.as_deterministic_coder(
        step_label, error_message)
    return TupleSequenceCoder(deterministic_elem)

  @classmethod
  def from_type_hint(cls, typehint, registry):
    # type: (Any, CoderRegistry) -> TupleSequenceCoder
    elem_coder = registry.get_coder(typehint.inner_type)
    return cls(elem_coder)

  def _get_component_coders(self):
    # type: () -> Tuple[Coder, ...]
    return (self._elem_coder, )

  def __repr__(self):
    return 'TupleSequenceCoder[%r]' % self._elem_coder

  def __eq__(self, other):
    if type(self) == type(other):
      return self._elem_coder == other.value_coder()
    return False

  def __hash__(self):
    identity = (type(self), self._elem_coder)
    return hash(identity)
class ListLikeCoder(FastCoder):
  """Base coder for iterables whose elements share one element coder."""
  def __init__(self, elem_coder):
    # type: (Coder) -> None
    """Remember ``elem_coder`` as the coder for every element."""
    self._elem_coder = elem_coder

  def _create_impl(self):
    """Wrap the element coder's impl in an ``IterableCoderImpl``."""
    element_impl = self._elem_coder.get_impl()
    return coder_impl.IterableCoderImpl(element_impl)

  def is_deterministic(self):
    # type: () -> bool
    """Deterministic exactly when the element coder says it is."""
    return self._elem_coder.is_deterministic()

  def as_deterministic_coder(self, step_label, error_message=None):
    """Return ``self`` if deterministic, else a same-typed deterministic copy.

    The copy is built with ``type(self)`` so subclasses keep their own type.
    """
    if not self.is_deterministic():
      deterministic_elem = self._elem_coder.as_deterministic_coder(
          step_label, error_message)
      return type(self)(deterministic_elem)
    return self

  def as_cloud_object(self, coders_context=None):
    """Describe this coder as a ``kind:stream`` cloud object."""
    element_encoding = self._elem_coder.as_cloud_object(coders_context)
    return {
        '@type': 'kind:stream',
        'is_stream_like': True,
        'component_encodings': [element_encoding],
    }

  def value_coder(self):
    """Return the element coder."""
    return self._elem_coder

  @classmethod
  def from_type_hint(cls, typehint, registry):
    # type: (Any, CoderRegistry) -> ListLikeCoder
    """Create an instance whose element coder comes from ``inner_type``."""
    inner_coder = registry.get_coder(typehint.inner_type)
    return cls(inner_coder)

  def _get_component_coders(self):
    # type: () -> Tuple[Coder, ...]
    """Return the element coder as a single-item tuple."""
    return (self._elem_coder, )

  def __repr__(self):
    class_name = self.__class__.__name__
    return '%s[%r]' % (class_name, self._elem_coder)

  def __eq__(self, other):
    # Only instances of the exact same class can be equal.
    if type(self) != type(other):
      return False
    return self._elem_coder == other.value_coder()

  def __hash__(self):
    identity = (type(self), self._elem_coder)
    return hash(identity)
class IterableCoder(ListLikeCoder):
  """Coder of iterables of homogeneous objects."""
  def to_type_hint(self):
    """Return ``typehints.Iterable`` parameterized by the element's hint."""
    return typehints.Iterable[self._elem_coder.to_type_hint()]
# Pass IterableCoder to Coder.register_structured_urn under the ITERABLE URN.
# NOTE(review): presumably this wires up runner-API translation for the
# class -- confirm against Coder.register_structured_urn.
Coder.register_structured_urn(common_urns.coders.ITERABLE.urn, IterableCoder)
class ListCoder(ListLikeCoder):
  """Coder of Python lists."""
  def to_type_hint(self):
    """Return ``typehints.List`` parameterized by the element's hint."""
    return typehints.List[self._elem_coder.to_type_hint()]

  def _create_impl(self):
    """Build a ``ListCoderImpl`` around the element coder's impl."""
    return coder_impl.ListCoderImpl(self._elem_coder.get_impl())
class GlobalWindowCoder(SingletonCoder):
  """Singleton coder whose value is a ``window.GlobalWindow`` instance."""
  def __init__(self):
    """Initialize the base class with a freshly created ``GlobalWindow``."""
    # Imported locally, as in the original code.
    from apache_beam.transforms import window
    global_window = window.GlobalWindow()
    super().__init__(global_window)

  def as_cloud_object(self, coders_context=None):
    """Describe this coder as ``kind:global_window``; the context is unused."""
    return {'@type': 'kind:global_window'}
# Pass GlobalWindowCoder to Coder.register_structured_urn under the
# GLOBAL_WINDOW URN.
# NOTE(review): presumably this wires up runner-API translation for the
# class -- confirm against Coder.register_structured_urn.
Coder.register_structured_urn(
common_urns.coders.GLOBAL_WINDOW.urn, GlobalWindowCoder)
class IntervalWindowCoder(FastCoder):
  """Coder for a window defined by a start timestamp and a duration."""
  def _create_impl(self):
    """Return a new ``IntervalWindowCoderImpl``."""
    impl = coder_impl.IntervalWindowCoderImpl()
    return impl

  def is_deterministic(self):
    # type: () -> bool
    """This coder always reports itself as deterministic."""
    return True

  def as_cloud_object(self, coders_context=None):
    """Describe this coder as ``kind:interval_window``; context is unused."""
    return {'@type': 'kind:interval_window'}

  def __eq__(self, other):
    # The coder carries no state, so exact type identity decides equality.
    return type(other) == type(self)

  def __hash__(self):
    own_type = type(self)
    return hash(own_type)
# Pass IntervalWindowCoder to Coder.register_structured_urn under the
# INTERVAL_WINDOW URN.
# NOTE(review): presumably this wires up runner-API translation for the
# class -- confirm against Coder.register_structured_urn.
Coder.register_structured_urn(
common_urns.coders.INTERVAL_WINDOW.urn, IntervalWindowCoder)
class WindowedValueCoder(FastCoder):
  """Coder for windowed values."""
  def __init__(self, wrapped_value_coder, window_coder=None):
    # type: (Coder, Optional[Coder]) -> None
    """Set up the value, timestamp and window coders.

    Args:
      wrapped_value_coder: coder for the wrapped value.
      window_coder: coder for the windows; when omitted (or otherwise
        falsy) a ``PickleCoder`` is used.
    """
    if not window_coder:
      window_coder = PickleCoder()
    self.wrapped_value_coder = wrapped_value_coder
    self.timestamp_coder = TimestampCoder()
    self.window_coder = window_coder

  def _create_impl(self):
    """Build a ``WindowedValueCoderImpl`` from the three coders' impls."""
    return coder_impl.WindowedValueCoderImpl(
        self.wrapped_value_coder.get_impl(),
        self.timestamp_coder.get_impl(),
        self.window_coder.get_impl())

  def is_deterministic(self):
    # type: () -> bool
    """True only if the value, timestamp and window coders all are."""
    return all(
        c.is_deterministic() for c in
        [self.wrapped_value_coder, self.timestamp_coder, self.window_coder])

  def as_cloud_object(self, coders_context=None):
    """Describe this coder as a ``kind:windowed_value`` wrapper.

    The component encodings are the cloud objects of the coders returned
    by ``_get_component_coders()`` (value coder, then window coder).
    """
    return {
        '@type': 'kind:windowed_value',
        'is_wrapper': True,
        'component_encodings': [
            component.as_cloud_object(coders_context)
            for component in self._get_component_coders()
        ],
    }

  def _get_component_coders(self):
    # type: () -> List[Coder]
    """Return ``[wrapped_value_coder, window_coder]``.

    The timestamp coder is not listed as a component.
    """
    return [self.wrapped_value_coder, self.window_coder]

  def is_kv_coder(self):
    # type: () -> bool
    """Delegate to the wrapped value coder."""
    return self.wrapped_value_coder.is_kv_coder()

  def key_coder(self):
    # type: () -> Coder
    """Return the wrapped value coder's key coder."""
    return self.wrapped_value_coder.key_coder()

  def value_coder(self):
    # type: () -> Coder
    """Return the wrapped value coder's value coder."""
    return self.wrapped_value_coder.value_coder()

  def __repr__(self):
    return 'WindowedValueCoder[%s]' % self.wrapped_value_coder

  def __eq__(self, other):
    # Exact type match plus pairwise equality of all three coders.
    return (
        type(self) == type(other) and
        self.wrapped_value_coder == other.wrapped_value_coder and
        self.timestamp_coder == other.timestamp_coder and
        self.window_coder == other.window_coder)

  def __hash__(self):
    return hash(
        (self.wrapped_value_coder, self.timestamp_coder, self.window_coder))
# Pass WindowedValueCoder to Coder.register_structured_urn under the
# WINDOWED_VALUE URN.
# NOTE(review): presumably this wires up runner-API translation for the
# class -- confirm against Coder.register_structured_urn.
Coder.register_structured_urn(
common_urns.coders.WINDOWED_VALUE.urn, WindowedValueCoder)
class ParamWindowedValueCoder(WindowedValueCoder):
  """A coder used for parameterized windowed values."""
  def __init__(self, payload, components):
    """Initialize from a payload and a pair of component coders.

    Args:
      payload: stored as ``self.payload`` and handed to the coder impl.
      components: sequence whose item 0 is the value coder and item 1 the
        window coder.
    """
    super().__init__(components[0], components[1])
    self.payload = payload

  def _create_impl(self):
    """Build a ``ParamWindowedValueCoderImpl`` from the impls and payload."""
    return coder_impl.ParamWindowedValueCoderImpl(
        self.wrapped_value_coder.get_impl(),
        self.window_coder.get_impl(),
        self.payload)

  def is_deterministic(self):
    # type: () -> bool
    """Depends on the wrapped value coder only."""
    return self.wrapped_value_coder.is_deterministic()

  def as_cloud_object(self, coders_context=None):
    """Not supported for this coder.

    Raises:
      NotImplementedError: always.
    """
    raise NotImplementedError(
        "as_cloud_object not supported for ParamWindowedValueCoder")

  def __repr__(self):
    return 'ParamWindowedValueCoder[%s]' % self.wrapped_value_coder

  def __eq__(self, other):
    # Exact type match plus equal value coder, window coder and payload.
    return (
        type(self) == type(other) and
        self.wrapped_value_coder == other.wrapped_value_coder and
        self.window_coder == other.window_coder and
        self.payload == other.payload)

  def __hash__(self):
    return hash((self.wrapped_value_coder, self.window_coder, self.payload))

  @staticmethod
  @Coder.register_urn(common_urns.coders.PARAM_WINDOWED_VALUE.urn, bytes)
  def from_runner_api_parameter(payload, components, unused_context):
    """Construct the coder from a payload and component coders."""
    return ParamWindowedValueCoder(payload, components)

  def to_runner_api_parameter(self, context):
    """Return ``(urn, payload, (value_coder, window_coder))``.

    ``context`` is not used here.
    """
    return (
        common_urns.coders.PARAM_WINDOWED_VALUE.urn,
        self.payload, (self.wrapped_value_coder, self.window_coder))
class LengthPrefixCoder(FastCoder):
  """For internal use only; no backwards-compatibility guarantees.

  Coder that writes the length of the encoded object ahead of it in the
  stream.
  """
  def __init__(self, value_coder):
    # type: (Coder) -> None
    """Keep ``value_coder`` as the coder for the prefixed value."""
    self._value_coder = value_coder

  def _create_impl(self):
    """Wrap the value coder's impl in a ``LengthPrefixCoderImpl``."""
    inner_impl = self._value_coder.get_impl()
    return coder_impl.LengthPrefixCoderImpl(inner_impl)

  def is_deterministic(self):
    # type: () -> bool
    """Deterministic exactly when the value coder is."""
    return self._value_coder.is_deterministic()

  def estimate_size(self, value):
    """Estimate the encoded size: varint length prefix plus the value."""
    payload_size = self._value_coder.estimate_size(value)
    prefix_size = get_varint_size(payload_size)
    return prefix_size + payload_size

  def value_coder(self):
    """Return the wrapped value coder."""
    return self._value_coder

  def as_cloud_object(self, coders_context=None):
    """Describe this coder as a ``kind:length_prefix`` cloud object."""
    inner_encoding = self._value_coder.as_cloud_object(coders_context)
    return {
        '@type': 'kind:length_prefix',
        'component_encodings': [inner_encoding],
    }

  def _get_component_coders(self):
    # type: () -> Tuple[Coder, ...]
    """Return the value coder as a single-item tuple."""
    return (self._value_coder, )

  def __repr__(self):
    return 'LengthPrefixCoder[%r]' % self._value_coder

  def __eq__(self, other):
    # Same exact class and equal wrapped coders.
    if type(self) != type(other):
      return False
    return self._value_coder == other._value_coder

  def __hash__(self):
    identity = (type(self), self._value_coder)
    return hash(identity)
# Pass LengthPrefixCoder to Coder.register_structured_urn under the
# LENGTH_PREFIX URN.
# NOTE(review): presumably this wires up runner-API translation for the
# class -- confirm against Coder.register_structured_urn.
Coder.register_structured_urn(
common_urns.coders.LENGTH_PREFIX.urn, LengthPrefixCoder)
class StateBackedIterableCoder(FastCoder):
  """Iterable coder whose impl also receives state read/write hooks.

  The impl is an ``IterableCoderImpl`` constructed with the element coder's
  impl, the read and write state objects, and a write threshold.
  """
  DEFAULT_WRITE_THRESHOLD = 1

  def __init__(
      self,
      element_coder,  # type: Coder
      read_state=None,  # type: Optional[coder_impl.IterableStateReader]
      write_state=None,  # type: Optional[coder_impl.IterableStateWriter]
      write_state_threshold=DEFAULT_WRITE_THRESHOLD):
    """Store the element coder, the state hooks and the write threshold."""
    self._write_state_threshold = write_state_threshold
    self._write_state = write_state
    self._read_state = read_state
    self._element_coder = element_coder

  def _create_impl(self):
    """Build the ``IterableCoderImpl`` with the state hooks and threshold."""
    element_impl = self._element_coder.get_impl()
    return coder_impl.IterableCoderImpl(
        element_impl,
        self._read_state,
        self._write_state,
        self._write_state_threshold)

  def is_deterministic(self):
    # type: () -> bool
    """This coder always reports itself as non-deterministic."""
    return False

  def _get_component_coders(self):
    # type: () -> Tuple[Coder, ...]
    """Return the element coder as a single-item tuple."""
    return (self._element_coder, )

  def __repr__(self):
    return 'StateBackedIterableCoder[%r]' % self._element_coder

  def __eq__(self, other):
    # The state hooks are deliberately not compared -- only the exact type,
    # the element coder and the threshold.
    if type(self) != type(other):
      return False
    if not self._element_coder == other._element_coder:
      return False
    return self._write_state_threshold == other._write_state_threshold

  def __hash__(self):
    identity = (
        type(self), self._element_coder, self._write_state_threshold)
    return hash(identity)

  def to_runner_api_parameter(self, context):
    # type: (Optional[PipelineContext]) -> Tuple[str, Any, Sequence[Coder]]
    """Return ``(urn, payload, components)``.

    The payload is the write threshold rendered as ASCII decimal bytes.
    """
    threshold_payload = str(self._write_state_threshold).encode('ascii')
    return (
        common_urns.coders.STATE_BACKED_ITERABLE.urn,
        threshold_payload,
        self._get_component_coders())

  @staticmethod
  @Coder.register_urn(common_urns.coders.STATE_BACKED_ITERABLE.urn, bytes)
  def from_runner_api_parameter(payload, components, context):
    """Rebuild the coder from its payload, components and context.

    The state hooks come from ``context.iterable_state_read`` and
    ``context.iterable_state_write``; an empty payload selects
    ``DEFAULT_WRITE_THRESHOLD``.
    """
    element_coder = components[0]
    reader = context.iterable_state_read
    writer = context.iterable_state_write
    if payload:
      threshold = int(payload)
    else:
      threshold = StateBackedIterableCoder.DEFAULT_WRITE_THRESHOLD
    return StateBackedIterableCoder(
        element_coder,
        read_state=reader,
        write_state=writer,
        write_state_threshold=threshold)
class ShardedKeyCoder(FastCoder):
  """A coder for sharded key."""
  def __init__(self, key_coder):
    # type: (Coder) -> None
    """Store the coder used for the key.

    Args:
      key_coder: coder for the key part of the sharded key.
    """
    self._key_coder = key_coder

  def _get_component_coders(self):
    # type: () -> List[Coder]
    """Return a one-element list holding the key coder."""
    return [self._key_coder]

  def _create_impl(self):
    """Build a ``ShardedKeyCoderImpl`` around the key coder's impl."""
    return coder_impl.ShardedKeyCoderImpl(self._key_coder.get_impl())

  def is_deterministic(self):
    # type: () -> bool
    """Report whether the key coder is deterministic."""
    return self._key_coder.is_deterministic()

  def as_cloud_object(self, coders_context=None):
    """Describe this coder as a ``kind:sharded_key`` cloud object."""
    return {
        '@type': 'kind:sharded_key',
        'component_encodings': [
            self._key_coder.as_cloud_object(coders_context)
        ],
    }

  def to_type_hint(self):
    """Return a ``ShardedKeyTypeConstraint`` over the key coder's hint."""
    from apache_beam.typehints import sharded_key_type
    return sharded_key_type.ShardedKeyTypeConstraint(
        self._key_coder.to_type_hint())

  @classmethod
  def from_type_hint(cls, typehint, registry):
    """Build a coder from a ``ShardedKeyTypeConstraint``.

    Args:
      typehint: the constraint; its ``key_type`` selects the key coder.
      registry: object whose ``get_coder`` resolves the key coder.

    Raises:
      ValueError: if ``typehint`` is not a ``ShardedKeyTypeConstraint``.
    """
    from apache_beam.typehints import sharded_key_type
    if isinstance(typehint, sharded_key_type.ShardedKeyTypeConstraint):
      return cls(registry.get_coder(typehint.key_type))
    # Wrap the argument in a 1-tuple so that a tuple-valued typehint is
    # formatted as a single value instead of breaking the % operator.
    raise ValueError(
        'Expected an instance of ShardedKeyTypeConstraint'
        ', but got a %s' % (typehint, ))

  def __eq__(self, other):
    # Exact type match is required; subclasses never compare equal.
    return type(self) == type(other) and self._key_coder == other._key_coder

  def __hash__(self):
    # Tuple hashing, matching the other single-component coders in this
    # file, instead of adding the two hashes together.
    return hash((type(self), self._key_coder))

  def __repr__(self):
    return 'ShardedKeyCoder[%s]' % self._key_coder
# Pass ShardedKeyCoder to Coder.register_structured_urn under the
# SHARDED_KEY URN.
# NOTE(review): presumably this wires up runner-API translation for the
# class -- confirm against Coder.register_structured_urn.
Coder.register_structured_urn(
common_urns.coders.SHARDED_KEY.urn, ShardedKeyCoder)
class TimestampPrefixingWindowCoder(FastCoder):
  """For internal use only; no backwards-compatibility guarantees.

  Coder that puts the max timestamp of an arbitrary window in front of the
  window's encoded form.
  """
  def __init__(self, window_coder: Coder) -> None:
    """Keep ``window_coder`` as the coder for the window itself."""
    self._window_coder = window_coder

  def _create_impl(self):
    """Wrap the window coder's impl in the timestamp-prefixing impl."""
    window_impl = self._window_coder.get_impl()
    return coder_impl.TimestampPrefixingWindowCoderImpl(window_impl)

  def to_type_hint(self):
    """Return the wrapped window coder's type hint unchanged."""
    return self._window_coder.to_type_hint()

  def _get_component_coders(self) -> List[Coder]:
    """Return a one-element list holding the window coder."""
    return [self._window_coder]

  def is_deterministic(self) -> bool:
    """Deterministic exactly when the window coder is."""
    return self._window_coder.is_deterministic()

  def as_cloud_object(self, coders_context=None):
    """Describe this coder as a ``kind:custom_window`` cloud object."""
    window_encoding = self._window_coder.as_cloud_object(coders_context)
    return {
        '@type': 'kind:custom_window',
        'component_encodings': [window_encoding],
    }

  def __repr__(self):
    return 'TimestampPrefixingWindowCoder[%r]' % self._window_coder

  def __eq__(self, other):
    # Same exact class and equal wrapped window coders.
    if type(self) != type(other):
      return False
    return self._window_coder == other._window_coder

  def __hash__(self):
    identity = (type(self), self._window_coder)
    return hash(identity)
# Pass TimestampPrefixingWindowCoder to Coder.register_structured_urn under
# the CUSTOM_WINDOW URN.
# NOTE(review): presumably this wires up runner-API translation for the
# class -- confirm against Coder.register_structured_urn.
Coder.register_structured_urn(
common_urns.coders.CUSTOM_WINDOW.urn, TimestampPrefixingWindowCoder)