#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Coder implementations.
The actual encode/decode implementations are split off from coders to
allow conditional (compiled/pure) implementations, which can be used to
encode many elements with minimal overhead.
This module may be optionally compiled with Cython, using the corresponding
coder_impl.pxd file for type hints.
For internal use only; no backwards-compatibility guarantees.
"""
from types import NoneType
from apache_beam.coders import observable
from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils import windowed_value
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
from stream import InputStream as create_InputStream
from stream import OutputStream as create_OutputStream
from stream import ByteCountingOutputStream
from stream import get_varint_size
globals()['create_InputStream'] = create_InputStream
globals()['create_OutputStream'] = create_OutputStream
globals()['ByteCountingOutputStream'] = ByteCountingOutputStream
except ImportError:
from slow_stream import InputStream as create_InputStream
from slow_stream import OutputStream as create_OutputStream
from slow_stream import ByteCountingOutputStream
from slow_stream import get_varint_size
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
class CoderImpl(object):
    """For internal use only; no backwards-compatibility guarantees.

    Base interface for coder implementations. Subclasses provide either the
    stream methods or the encode/decode methods; every method that is not
    overridden raises NotImplementedError.
    """

    def encode_to_stream(self, value, stream, nested):
        """Writes object to potentially-nested encoding in stream."""
        raise NotImplementedError

    def decode_from_stream(self, stream, nested):
        """Reads object from potentially-nested encoding in stream."""
        raise NotImplementedError

    def encode(self, value):
        """Encodes an object to an unnested string."""
        raise NotImplementedError

    def decode(self, encoded):
        """Decodes an object from an unnested string."""
        raise NotImplementedError

    def estimate_size(self, value, nested=False):
        """Estimates the encoded size of the given value, in bytes.

        The default implementation fully encodes the value and measures the
        length of the result.
        """
        return self._get_nested_size(len(self.encode(value)), nested)

    def _get_nested_size(self, inner_size, nested):
        """Returns inner_size, plus the size of its varint prefix if nested."""
        if not nested:
            return inner_size
        varint_size = get_varint_size(inner_size)
        return varint_size + inner_size

    def get_estimated_size_and_observables(self, value, nested=False):
        """Returns estimated size of value along with any nested observables.

        The list of nested observables is returned as a list of 2-tuples of
        (obj, coder_impl), where obj is an instance of
        observable.ObservableMixin, and coder_impl is the CoderImpl that can
        be used to encode elements sent by obj to its observers.

        Arguments:
          value: the value whose encoded size is to be estimated.
          nested: whether the value is nested.

        Returns:
          The estimated encoded size of the given value and a list of
          observables whose elements are 2-tuples of (obj, coder_impl) as
          described above.
        """
        # The base implementation never reports observables.
        return self.estimate_size(value, nested), []
class SimpleCoderImpl(CoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    Subclass of CoderImpl implementing stream methods using encode/decode.
    """

    def encode_to_stream(self, value, stream, nested):
        """Encodes value and writes the resulting bytes to stream."""
        encoded = self.encode(value)
        stream.write(encoded, nested)

    def decode_from_stream(self, stream, nested):
        """Reads this value's bytes from stream and decodes them."""
        encoded = stream.read_all(nested)
        return self.decode(encoded)
class StreamCoderImpl(CoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    Subclass of CoderImpl implementing encode/decode using stream methods.
    """

    def encode(self, value):
        """Encodes value, unnested, into a fresh output stream's contents."""
        out = create_OutputStream()
        self.encode_to_stream(value, out, False)
        encoded = out.get()
        return encoded

    def decode(self, encoded):
        """Decodes an unnested value by wrapping encoded in an input stream."""
        in_stream = create_InputStream(encoded)
        return self.decode_from_stream(in_stream, False)

    def estimate_size(self, value, nested=False):
        """Estimates the encoded size of the given value, in bytes."""
        # Encode into a byte-counting stream and report its count.
        out = ByteCountingOutputStream()
        self.encode_to_stream(value, out, nested)
        byte_count = out.get_count()
        return byte_count
class CallbackCoderImpl(CoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    A CoderImpl that calls back to the _impl methods on the Coder itself.

    This is the default implementation used if Coder._get_impl()
    is not overwritten.
    """

    def __init__(self, encoder, decoder, size_estimator=None):
        """Stores the callbacks; a falsy size_estimator selects the default."""
        if not size_estimator:
            size_estimator = self._default_size_estimator
        self._encoder = encoder
        self._decoder = decoder
        self._size_estimator = size_estimator

    def _default_size_estimator(self, value):
        """Measures the size of value by encoding it."""
        encoded = self.encode(value)
        return len(encoded)

    def encode_to_stream(self, value, stream, nested):
        """Writes the encoder callback's output for value to stream."""
        encoded = self._encoder(value)
        return stream.write(encoded, nested)

    def decode_from_stream(self, stream, nested):
        """Reads bytes from stream and passes them to the decoder callback."""
        encoded = stream.read_all(nested)
        return self._decoder(encoded)

    def encode(self, value):
        """Returns the encoder callback's result for value."""
        return self._encoder(value)

    def decode(self, encoded):
        """Returns the decoder callback's result for encoded."""
        return self._decoder(encoded)

    def estimate_size(self, value, nested=False):
        """Returns the size estimator's result, plus a prefix if nested."""
        inner_size = self._size_estimator(value)
        return self._get_nested_size(inner_size, nested)

    def get_estimated_size_and_observables(self, value, nested=False):
        """Returns the estimated size of value and any observables."""
        # TODO(robertwb): Remove this once all coders are correct.
        if not isinstance(value, observable.ObservableMixin):
            return self.estimate_size(value, nested), []
        # CallbackCoderImpl can presumably encode the elements too.
        return 1, [(value, self)]
class DeterministicFastPrimitivesCoderImpl(CoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    Wraps another coder impl, rejecting values outside a fixed set of types
    before encoding; every other operation is forwarded unchanged.
    """

    def __init__(self, coder, step_label):
        self._underlying_coder = coder
        self._step_label = step_label

    def _check_safe(self, value):
        """Raises TypeError unless value is of an accepted type.

        Accepted are str, unicode, long, int, float and None, plus tuples and
        lists whose elements are (recursively) accepted.
        """
        if isinstance(value, (str, unicode, long, int, float)) or value is None:
            return
        if isinstance(value, (tuple, list)):
            for x in value:
                self._check_safe(x)
            return
        raise TypeError(
            "Unable to deterministically code '%s' of type '%s', "
            "please provide a type hint for the input of '%s'" % (
                value, type(value), self._step_label))

    def encode_to_stream(self, value, stream, nested):
        """Validates value, then encodes it with the underlying coder."""
        self._check_safe(value)
        underlying = self._underlying_coder
        return underlying.encode_to_stream(value, stream, nested)

    def decode_from_stream(self, stream, nested):
        """Decodes with the underlying coder; no validation is done."""
        underlying = self._underlying_coder
        return underlying.decode_from_stream(stream, nested)

    def encode(self, value):
        """Validates value, then encodes it with the underlying coder."""
        self._check_safe(value)
        underlying = self._underlying_coder
        return underlying.encode(value)

    def decode(self, encoded):
        """Decodes with the underlying coder; no validation is done."""
        underlying = self._underlying_coder
        return underlying.decode(encoded)

    def estimate_size(self, value, nested=False):
        """Returns the underlying coder's size estimate."""
        underlying = self._underlying_coder
        return underlying.estimate_size(value, nested)

    def get_estimated_size_and_observables(self, value, nested=False):
        """Returns the underlying coder's size estimate and observables."""
        underlying = self._underlying_coder
        return underlying.get_estimated_size_and_observables(value, nested)
class ProtoCoderImpl(SimpleCoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    Encodes values via their SerializeToString method and decodes by parsing
    into a new instance of the configured message type.
    """

    def __init__(self, proto_message_type):
        self.proto_message_type = proto_message_type

    def encode(self, value):
        """Returns value.SerializeToString()."""
        serialized = value.SerializeToString()
        return serialized

    def decode(self, encoded):
        """Builds a new message and fills it from encoded."""
        message = self.proto_message_type()
        message.ParseFromString(encoded)
        return message
# One-byte type tags written by FastPrimitivesCoderImpl ahead of each value.
# Scalars.
NONE_TYPE = 0
INT_TYPE = 1
FLOAT_TYPE = 2
STR_TYPE = 3
UNICODE_TYPE = 4
# Containers.
LIST_TYPE = 5
TUPLE_TYPE = 6
DICT_TYPE = 7
SET_TYPE = 8
# Booleans.
BOOL_TYPE = 9
# Anything else; the payload is produced by the fallback coder.
UNKNOWN_TYPE = 0xFF
class FastPrimitivesCoderImpl(StreamCoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    Encodes a value as a one-byte type tag followed by a type-specific
    payload. Values whose exact type is not handled here are tagged
    UNKNOWN_TYPE and delegated to the fallback coder.
    """

    def __init__(self, fallback_coder_impl):
        self.fallback_coder_impl = fallback_coder_impl

    def get_estimated_size_and_observables(self, value, nested=False):
        """Returns the estimated size of value and any observables."""
        if isinstance(value, observable.ObservableMixin):
            # FastPrimitivesCoderImpl can presumably encode the elements too.
            return 1, [(value, self)]
        out = ByteCountingOutputStream()
        self.encode_to_stream(value, out, nested)
        return out.get_count(), []

    def encode_to_stream(self, value, stream, nested):
        """Writes the type tag for value followed by its payload.

        Dispatch is on the exact type of value (identity comparison), so
        instances of subclasses of the handled types go to the fallback
        coder.
        """
        t = type(value)
        if t is NoneType:
            stream.write_byte(NONE_TYPE)
        elif t is int:
            stream.write_byte(INT_TYPE)
            stream.write_var_int64(value)
        elif t is float:
            stream.write_byte(FLOAT_TYPE)
            stream.write_bigendian_double(value)
        elif t is str:
            stream.write_byte(STR_TYPE)
            stream.write(value, nested)
        elif t is unicode:
            unicode_value = value  # for typing
            stream.write_byte(UNICODE_TYPE)
            stream.write(unicode_value.encode('utf-8'), nested)
        elif t is list or t is tuple or t is set:
            stream.write_byte(
                LIST_TYPE if t is list
                else TUPLE_TYPE if t is tuple
                else SET_TYPE)
            stream.write_var_int64(len(value))
            # Elements are always nested: more data follows each of them.
            for e in value:
                self.encode_to_stream(e, stream, True)
        elif t is dict:
            dict_value = value  # for typing
            stream.write_byte(DICT_TYPE)
            stream.write_var_int64(len(dict_value))
            for k, v in dict_value.iteritems():
                self.encode_to_stream(k, stream, True)
                self.encode_to_stream(v, stream, True)
        elif t is bool:
            stream.write_byte(BOOL_TYPE)
            stream.write_byte(value)
        else:
            stream.write_byte(UNKNOWN_TYPE)
            self.fallback_coder_impl.encode_to_stream(value, stream, nested)

    def decode_from_stream(self, stream, nested):
        """Reads a type tag from stream and decodes the payload after it.

        Raises:
          ValueError: if the tag is not one written by encode_to_stream.
        """
        t = stream.read_byte()
        if t == NONE_TYPE:
            return None
        elif t == INT_TYPE:
            return stream.read_var_int64()
        elif t == FLOAT_TYPE:
            return stream.read_bigendian_double()
        elif t == STR_TYPE:
            return stream.read_all(nested)
        elif t == UNICODE_TYPE:
            return stream.read_all(nested).decode('utf-8')
        elif t == LIST_TYPE or t == TUPLE_TYPE or t == SET_TYPE:
            vlen = stream.read_var_int64()
            vlist = [self.decode_from_stream(stream, True)
                     for _ in range(vlen)]
            if t == LIST_TYPE:
                return vlist
            elif t == TUPLE_TYPE:
                return tuple(vlist)
            return set(vlist)
        elif t == DICT_TYPE:
            vlen = stream.read_var_int64()
            v = {}
            for _ in range(vlen):
                k = self.decode_from_stream(stream, True)
                v[k] = self.decode_from_stream(stream, True)
            return v
        elif t == BOOL_TYPE:
            return not not stream.read_byte()
        elif t == UNKNOWN_TYPE:
            return self.fallback_coder_impl.decode_from_stream(stream, nested)
        # Any other tag was never produced by encode_to_stream; handing the
        # remaining bytes to the fallback coder would only hide corruption.
        raise ValueError('Unknown type tag %x' % t)
class BytesCoderImpl(CoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    A coder for bytes/str objects.
    """

    def encode_to_stream(self, value, out, nested):
        """Writes value to out as-is."""
        out.write(value, nested)

    def decode_from_stream(self, in_stream, nested):
        """Returns the bytes read from in_stream."""
        data = in_stream.read_all(nested)
        return data

    def encode(self, value):
        """Returns value unchanged after asserting that it is bytes."""
        assert isinstance(value, bytes), (value, type(value))
        return value

    def decode(self, encoded):
        """Returns encoded unchanged."""
        return encoded
class FloatCoderImpl(StreamCoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    Encodes a value as a big-endian double.
    """

    def encode_to_stream(self, value, out, nested):
        """Writes value to out as a big-endian double."""
        out.write_bigendian_double(value)

    def decode_from_stream(self, in_stream, nested):
        """Reads a big-endian double from in_stream."""
        decoded = in_stream.read_bigendian_double()
        return decoded

    def estimate_size(self, unused_value, nested=False):
        """Returns the fixed encoded size."""
        # A double is encoded as 8 bytes, regardless of nesting.
        double_size = 8
        return double_size
class IntervalWindowCoderImpl(StreamCoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    Encodes a window as its end time in milliseconds (shifted into unsigned
    range, big-endian uint64) followed by its span in milliseconds (varint).
    """

    # TODO: Fn Harness only supports millis. Is this important enough to fix?
    def _to_normal_time(self, value):
        """Convert "lexicographically ordered unsigned" to signed."""
        return value - (1 << 63)

    def _from_normal_time(self, value):
        """Convert signed to "lexicographically ordered unsigned"."""
        return value + (1 << 63)

    def encode_to_stream(self, value, out, nested):
        """Writes the end (millis) and span (millis) of value to out."""
        span_micros = value.end.micros - value.start.micros
        # Floor division keeps these integers even under true division
        # (Python 3 or `from __future__ import division`).
        out.write_bigendian_uint64(
            self._from_normal_time(value.end.micros // 1000))
        out.write_var_int64(span_micros // 1000)

    def decode_from_stream(self, in_, nested):
        """Reads an end time and span from in_ and returns an IntervalWindow."""
        end_millis = self._to_normal_time(in_.read_bigendian_uint64())
        start_millis = end_millis - in_.read_var_int64()
        # Imported here rather than at module level.
        from apache_beam.transforms.window import IntervalWindow
        ret = IntervalWindow(start=Timestamp(micros=start_millis * 1000),
                             end=Timestamp(micros=end_millis * 1000))
        return ret

    def estimate_size(self, value, nested=False):
        """Returns 8 plus the varint size of the span in milliseconds."""
        # An IntervalWindow is context-insensitive, with a timestamp (8 bytes)
        # and a varint timespan.
        span = value.end.micros - value.start.micros
        return 8 + get_varint_size(span // 1000)
class TimestampCoderImpl(StreamCoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    Encodes a timestamp as its micros value in a big-endian int64.
    """

    def encode_to_stream(self, value, out, nested):
        """Writes value.micros to out as a big-endian int64."""
        micros = value.micros
        out.write_bigendian_int64(micros)

    def decode_from_stream(self, in_stream, nested):
        """Reads a big-endian int64 and wraps it in a Timestamp."""
        micros = in_stream.read_bigendian_int64()
        return Timestamp(micros=micros)

    def estimate_size(self, unused_value, nested=False):
        """Returns the fixed encoded size."""
        # A Timestamp is encoded as a 64-bit integer in 8 bytes, regardless
        # of nesting.
        timestamp_size = 8
        return timestamp_size
# Precomputed one-character encodings, indexed by integer value 0..127.
small_ints = list(map(chr, range(128)))
class VarIntCoderImpl(StreamCoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    A coder for long/int objects.
    """

    def encode_to_stream(self, value, out, nested):
        """Writes value to out as a varint."""
        out.write_var_int64(value)

    def decode_from_stream(self, in_stream, nested):
        """Reads a varint from in_stream."""
        return in_stream.read_var_int64()

    def encode(self, value):
        """Encodes value, using the small_ints table when it is in range."""
        ivalue = value  # type cast
        if not (0 <= ivalue < len(small_ints)):
            return StreamCoderImpl.encode(self, value)
        return small_ints[ivalue]

    def decode(self, encoded):
        """Decodes encoded, short-circuiting single bytes below 128."""
        if len(encoded) != 1:
            return StreamCoderImpl.decode(self, encoded)
        i = ord(encoded)
        if not (0 <= i < 128):
            return StreamCoderImpl.decode(self, encoded)
        return i

    def estimate_size(self, value, nested=False):
        """Returns the varint size of value."""
        # Note that VarInts are encoded the same way regardless of nesting.
        return get_varint_size(value)
class SingletonCoderImpl(CoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    A coder that always encodes exactly one value.
    """

    def __init__(self, value):
        self._value = value

    def encode_to_stream(self, value, stream, nested):
        """Writes nothing."""
        return None

    def decode_from_stream(self, stream, nested):
        """Returns the configured value without reading from stream."""
        return self._value

    def encode(self, value):
        """Returns an empty string, whatever value is."""
        b = ''  # avoid byte vs str vs unicode error
        return b

    def decode(self, encoded):
        """Returns the configured value, ignoring encoded."""
        return self._value

    def estimate_size(self, value, nested=False):
        """Returns 0."""
        return 0
class AbstractComponentCoderImpl(StreamCoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    CoderImpl for coders that are comprised of several component coders.

    Subclasses define how a value is split into components and rebuilt from
    them; components are encoded in order, each with its own coder.
    """

    def __init__(self, coder_impls):
        # Materialize first so that a one-shot iterable (e.g. a generator)
        # is not exhausted by the validation loop below.
        self._coder_impls = tuple(coder_impls)
        for c in self._coder_impls:
            assert isinstance(c, CoderImpl), c

    def _extract_components(self, value):
        """Returns the components of value, one per component coder."""
        raise NotImplementedError

    def _construct_from_components(self, components):
        """Builds a value from its list of decoded components."""
        raise NotImplementedError

    def encode_to_stream(self, value, out, nested):
        """Encodes each component of value to out with its coder.

        Raises:
          ValueError: if the number of components differs from the number of
            component coders.
        """
        values = self._extract_components(value)
        if len(self._coder_impls) != len(values):
            raise ValueError(
                'Number of components does not match number of coders.')
        last = len(self._coder_impls) - 1
        for i, c in enumerate(self._coder_impls):
            # Every component but the last is followed by more data and so
            # must be nested; the last one inherits the caller's context.
            c.encode_to_stream(values[i], out, nested or i < last)

    def decode_from_stream(self, in_stream, nested):
        """Decodes one component per coder and rebuilds the value."""
        last = len(self._coder_impls) - 1
        return self._construct_from_components(
            [c.decode_from_stream(in_stream, nested or i < last)
             for i, c in enumerate(self._coder_impls)])

    def estimate_size(self, value, nested=False):
        """Estimates the encoded size of the given value, in bytes."""
        # TODO(ccy): This ignores sizes of observable components.
        estimated_size, _ = (
            self.get_estimated_size_and_observables(value, nested=nested))
        return estimated_size

    def get_estimated_size_and_observables(self, value, nested=False):
        """Returns estimated size of value along with any nested observables."""
        values = self._extract_components(value)
        estimated_size = 0
        observables = []
        last = len(self._coder_impls) - 1
        for i, c in enumerate(self._coder_impls):
            child_size, child_observables = (
                c.get_estimated_size_and_observables(
                    values[i], nested=nested or i < last))
            estimated_size += child_size
            observables += child_observables
        return estimated_size, observables
class TupleCoderImpl(AbstractComponentCoderImpl):
    """A coder for tuple objects."""

    def _extract_components(self, value):
        """Returns value itself; it already is the sequence of components."""
        components = value
        return components

    def _construct_from_components(self, components):
        """Returns the decoded components as a tuple."""
        rebuilt = tuple(components)
        return rebuilt
class SequenceCoderImpl(StreamCoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    A coder for sequences.

    If the length of the sequence is known we encode the length as a 32 bit
    ``int`` followed by the encoded bytes.

    If the length of the sequence is unknown, we encode the length as ``-1``
    followed by the encoding of elements buffered up to 64K bytes before
    prefixing the count of number of elements. A ``0`` is encoded at the end
    to indicate the end of stream.

    The resulting encoding would look like this::

      -1
      countA element(0) element(1) ... element(countA - 1)
      countB element(0) element(1) ... element(countB - 1)
      ...
      countX element(0) element(1) ... element(countX - 1)
      0
    """

    # Default buffer size of 64kB of handling iterables of unknown length.
    _DEFAULT_BUFFER_SIZE = 64 * 1024

    def __init__(self, elem_coder):
        self._elem_coder = elem_coder

    def _construct_from_sequence(self, values):
        """Builds the decoded value from the list of decoded elements."""
        raise NotImplementedError

    def encode_to_stream(self, value, out, nested):
        """Writes value to out in the format described on the class."""
        # Compatible with Java's IterableLikeCoder.
        if hasattr(value, '__len__'):
            out.write_bigendian_int32(len(value))
            for elem in value:
                self._elem_coder.encode_to_stream(elem, out, True)
        else:
            # We don't know the size without traversing it so use a fixed
            # size buffer and encode as many elements as possible into it
            # before outputting the size followed by the elements.
            # -1 to indicate that the length is not known.
            out.write_bigendian_int32(-1)
            buffer = create_OutputStream()
            prev_index = index = -1
            for index, elem in enumerate(value):
                self._elem_coder.encode_to_stream(elem, buffer, True)
                # The threshold applies to the pending chunk, not to the
                # output stream, whose size includes everything written
                # before this sequence.
                if buffer.size() > self._DEFAULT_BUFFER_SIZE:
                    out.write_var_int64(index - prev_index)
                    out.write(buffer.get())
                    prev_index = index
                    buffer = create_OutputStream()
            # Flush whatever is left in a final, partially filled chunk.
            if index > prev_index:
                out.write_var_int64(index - prev_index)
                out.write(buffer.get())
            # End-of-stream marker.
            out.write_var_int64(0)

    def decode_from_stream(self, in_stream, nested):
        """Reads a sequence in either format and builds the result from it."""
        size = in_stream.read_bigendian_int32()
        if size >= 0:
            elements = [self._elem_coder.decode_from_stream(in_stream, True)
                        for _ in range(size)]
        else:
            # Unknown-length format: counted chunks until a zero count.
            elements = []
            count = in_stream.read_var_int64()
            while count > 0:
                for _ in range(count):
                    elements.append(
                        self._elem_coder.decode_from_stream(in_stream, True))
                count = in_stream.read_var_int64()
        return self._construct_from_sequence(elements)

    def estimate_size(self, value, nested=False):
        """Estimates the encoded size of the given value, in bytes."""
        # TODO(ccy): This ignores element sizes.
        estimated_size, _ = (
            self.get_estimated_size_and_observables(value, nested=nested))
        return estimated_size

    def get_estimated_size_and_observables(self, value, nested=False):
        """Returns estimated size of value along with any nested observables."""
        estimated_size = 0
        # Size of 32-bit integer storing number of elements.
        estimated_size += 4
        if isinstance(value, observable.ObservableMixin):
            return estimated_size, [(value, self._elem_coder)]
        observables = []
        for elem in value:
            child_size, child_observables = (
                self._elem_coder.get_estimated_size_and_observables(
                    elem, nested=True))
            estimated_size += child_size
            observables += child_observables
        # TODO: (BEAM-1537) Update to use an accurate count depending on size
        # and count, currently we are underestimating the size by up to 10
        # bytes per block of data since we are not including the count prefix
        # which occurs at most once per 64k of data and is upto 10 bytes long.
        # The upper bound of the underestimate is 10 / 65536 ~= 0.0153% of the
        # actual size.
        return estimated_size, observables
class TupleSequenceCoderImpl(SequenceCoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    A coder for homogeneous tuple objects.
    """

    def _construct_from_sequence(self, components):
        """Returns the decoded elements as a tuple."""
        as_tuple = tuple(components)
        return as_tuple
class IterableCoderImpl(SequenceCoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    A coder for homogeneous iterable objects.
    """

    def _construct_from_sequence(self, components):
        """Returns the decoded elements unchanged."""
        decoded = components
        return decoded
class WindowedValueCoderImpl(StreamCoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    A coder for windowed values.

    The encoding is the timestamp in milliseconds (shifted into unsigned
    range, big-endian uint64), the windows, one pane-info byte, and finally
    the value.
    """

    # Ensure that lexicographic ordering of the bytes corresponds to
    # chronological order of timestamps.
    # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
    # byte representation of timestamps.
    def _to_normal_time(self, value):
        """Convert "lexicographically ordered unsigned" to signed."""
        return value - (1 << 63)

    def _from_normal_time(self, value):
        """Convert signed to "lexicographically ordered unsigned"."""
        return value + (1 << 63)

    def __init__(self, value_coder, timestamp_coder, window_coder):
        # TODO(lcwik): Remove the timestamp coder field
        self._value_coder = value_coder
        self._timestamp_coder = timestamp_coder
        self._windows_coder = TupleSequenceCoderImpl(window_coder)

    def encode_to_stream(self, value, out, nested):
        """Writes timestamp, windows, pane-info byte and value to out."""
        wv = value  # type cast
        # Avoid creation of Timestamp object.
        restore_sign = -1 if wv.timestamp_micros < 0 else 1
        out.write_bigendian_uint64(
            # Convert to positive number and divide, since python rounds off
            # to the lower negative number. For ex: -3 / 2 = -2, but we
            # expect it to be -1, to be consistent across SDKs. Floor
            # division keeps the result an integer under true division too.
            # TODO(BEAM-1524): Clean this up once we have a BEAM wide
            # consensus on precision of timestamps.
            self._from_normal_time(
                restore_sign * (abs(wv.timestamp_micros) // 1000)))
        self._windows_coder.encode_to_stream(wv.windows, out, True)
        # Default PaneInfo encoded byte representing NO_FIRING.
        # TODO(BEAM-1522): Remove the hard coding here once PaneInfo is
        # supported.
        out.write_byte(0xF)
        self._value_coder.encode_to_stream(wv.value, out, nested)

    def decode_from_stream(self, in_stream, nested):
        """Reads a windowed value written by encode_to_stream."""
        timestamp = self._to_normal_time(in_stream.read_bigendian_uint64())
        # Restore MIN/MAX timestamps to their actual values as encoding incurs
        # loss of precision while converting to millis.
        # Note: This is only a best effort here as there is no way to know if
        # these were indeed MIN/MAX timestamps.
        # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
        # precision of timestamps.
        if timestamp == -(abs(MIN_TIMESTAMP.micros) // 1000):
            timestamp = MIN_TIMESTAMP.micros
        elif timestamp == (MAX_TIMESTAMP.micros // 1000):
            timestamp = MAX_TIMESTAMP.micros
        else:
            timestamp *= 1000
        windows = self._windows_coder.decode_from_stream(in_stream, True)
        # Read PaneInfo encoded byte.
        # TODO(BEAM-1522): Ignored for now but should be converted to pane
        # info once it is supported.
        in_stream.read_byte()
        value = self._value_coder.decode_from_stream(in_stream, nested)
        return windowed_value.create(
            value,
            # Avoid creation of Timestamp object.
            timestamp,
            windows)

    def get_estimated_size_and_observables(self, value, nested=False):
        """Returns estimated size of value along with any nested observables."""
        if isinstance(value, observable.ObservableMixin):
            # Should never be here.
            # TODO(robertwb): Remove when coders are set correctly.
            return 0, [(value, self._value_coder)]
        estimated_size = 0
        observables = []
        value_estimated_size, value_observables = (
            self._value_coder.get_estimated_size_and_observables(
                value.value, nested=nested))
        estimated_size += value_estimated_size
        observables += value_observables
        estimated_size += (
            self._timestamp_coder.estimate_size(value.timestamp, nested=True))
        estimated_size += (
            self._windows_coder.estimate_size(value.windows, nested=True))
        # for pane info
        estimated_size += 1
        return estimated_size, observables
class LengthPrefixCoderImpl(StreamCoderImpl):
    """For internal use only; no backwards-compatibility guarantees.

    Coder which prefixes the length of the encoded object in the stream.
    """

    def __init__(self, value_coder):
        self._value_coder = value_coder

    def encode_to_stream(self, value, out, nested):
        """Writes the varint length of the encoded value, then the value."""
        encoded_value = self._value_coder.encode(value)
        value_length = len(encoded_value)
        out.write_var_int64(value_length)
        out.write(encoded_value)

    def decode_from_stream(self, in_stream, nested):
        """Reads a varint length, then decodes that many bytes."""
        value_length = in_stream.read_var_int64()
        encoded_value = in_stream.read(value_length)
        return self._value_coder.decode(encoded_value)

    def estimate_size(self, value, nested=False):
        """Returns the value coder's estimate plus its varint prefix size."""
        value_size = self._value_coder.estimate_size(value)
        prefix_size = get_varint_size(value_size)
        return prefix_size + value_size