# Source code for apache_beam.utils.windowed_value

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Core windowing data structures.

This module is experimental. No backwards-compatibility guarantees.
"""

# This module is carefully crafted to have optimal performance when
# compiled while still being valid Python.  Care needs to be taken when
# editing this file as WindowedValues are created for every element for
# every step in a Beam pipeline.

#cython: profile=True

from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.timestamp import Timestamp


class WindowedValue(object):
  """A windowed value having a value, a timestamp and set of windows.

  Attributes:
    value: The underlying value of a windowed value.
    timestamp: Timestamp associated with the value as seconds since Unix epoch.
    windows: A set (iterable) of window objects for the value. The window
      objects are descendants of the BoundedWindow class.
  """

  def __init__(self, value, timestamp, windows):
    """Initializes the windowed value.

    Args:
      value: The underlying value.
      timestamp: An int (interpreted as whole seconds and scaled to
        microseconds), a Timestamp instance, or any other object, which is
        converted with Timestamp.of().
      windows: The windows associated with the value.
    """
    # For performance reasons, only timestamp_micros is stored by default
    # (as a C int). The Timestamp object is created on demand below.
    self.value = value
    if isinstance(timestamp, int):
      self.timestamp_micros = timestamp * 1000000
    else:
      self.timestamp_object = (timestamp if isinstance(timestamp, Timestamp)
                               else Timestamp.of(timestamp))
      self.timestamp_micros = self.timestamp_object.micros
    self.windows = windows

  @property
  def timestamp(self):
    """The Timestamp for this value, built lazily from timestamp_micros."""
    if self.timestamp_object is None:
      # Cache the object so later accesses do not rebuild it.
      self.timestamp_object = Timestamp(0, self.timestamp_micros)
    return self.timestamp_object

  def __repr__(self):
    return '(%s, %s, %s)' % (
        repr(self.value),
        'MIN_TIMESTAMP' if self.timestamp == MIN_TIMESTAMP else
        'MAX_TIMESTAMP' if self.timestamp == MAX_TIMESTAMP else
        float(self.timestamp),
        self.windows)

  def __hash__(self):
    # Combines the same three fields that equality compares, so equal
    # instances always hash alike. Requires value and windows to be hashable.
    return (hash(self.value)
            + 3 * self.timestamp_micros
            + 7 * hash(self.windows))

  # Python 3 ignores __cmp__, so without __eq__/__ne__ two WindowedValues
  # holding equal data would only compare equal when they are the same
  # object. Both methods delegate to _typed_eq so every comparison entry
  # point agrees.
  # NOTE(review): presumably requires a Cython release that accepts __eq__ on
  # cdef classes (0.27+, from memory) - confirm against the build setup.
  def __eq__(self, other):
    """Returns True iff other has the same type and equal fields."""
    return (type(self) is type(other)
            and WindowedValue._typed_eq(self, other))

  def __ne__(self, other):
    """Returns the negation of __eq__."""
    # Python 2 does not derive != from ==, so it is spelled out.
    return not self == other

  # __cmp__ is kept for Python 2 callers; it is not consulted by Python 3.
  def __cmp__(left, right):  # pylint: disable=no-self-argument
    """Compares left and right for equality.

    For performance reasons, doesn't actually impose an ordering
    on unequal values (always returning 1).
    """
    if type(left) is not type(right):
      # cmp() only exists on Python 2, the only place this method is used.
      return cmp(type(left), type(right))

    # TODO(robertwb): Avoid the type checks?
    # Returns False (0) if equal, and True (1) if not.
    return not WindowedValue._typed_eq(left, right)

  @staticmethod
  def _typed_eq(left, right):
    """Field-wise equality for two objects already known to share a type."""
    # The cheap integer comparison comes first so mismatches short-circuit.
    return (left.timestamp_micros == right.timestamp_micros
            and left.value == right.value
            and left.windows == right.windows)

  def with_value(self, new_value):
    """Creates a new WindowedValue with the same timestamps and windows as this.

    This is the fastest way to create a new WindowedValue.
    """
    return create(new_value, self.timestamp_micros, self.windows)

  def __reduce__(self):
    # Pickle through the constructor, passing the Timestamp object rather
    # than the raw microsecond count.
    return WindowedValue, (self.value, self.timestamp, self.windows)
# TODO(robertwb): Move this to a static method.
def create(value, timestamp_micros, windows):
  """Builds a WindowedValue directly from a timestamp in microseconds.

  The instance is allocated with ``__new__`` and its three fields are
  assigned one by one, so ``WindowedValue.__init__`` is not run and no
  timestamp conversion takes place.

  Args:
    value: The underlying value to wrap.
    timestamp_micros: The timestamp, stored as given.
    windows: The windows to associate with the value.

  Returns:
    The newly populated WindowedValue.
  """
  windowed = WindowedValue.__new__(WindowedValue)
  windowed.value = value
  windowed.timestamp_micros = timestamp_micros
  windowed.windows = windows
  return windowed
# Give the pure-Python class a default of None for timestamp_object, so the
# attribute can be read on instances that never assigned it.
try:
  WindowedValue.timestamp_object = None
except TypeError:
  # The compiled cdef class does not allow attributes to be added
  # dynamically. Nothing is lost in that case, because the attribute is
  # already present on every instance.
  pass