# Source code for apache_beam.utils.windowed_value
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Core windowing data structures.
This module is experimental. No backwards-compatibility guarantees.
"""
# This module is carefully crafted to have optimal performance when
# compiled while still being valid Python.  Care needs to be taken when
# editing this file as WindowedValues are created for every element for
# every step in a Beam pipeline.
#cython: profile=True
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.timestamp import Timestamp
class WindowedValue(object):
  """A windowed value having a value, a timestamp and set of windows.

  Attributes:
    value: The underlying value of a windowed value.
    timestamp: Timestamp associated with the value as seconds since Unix epoch.
    windows: A set (iterable) of window objects for the value. The window
      objects are descendants of the BoundedWindow class.
  """

  def __init__(self, value, timestamp, windows):
    """Initializes a WindowedValue.

    Args:
      value: The underlying value.
      timestamp: An int number of seconds, a Timestamp, or any other value
        accepted by Timestamp.of.
      windows: The windows the value belongs to.
    """
    # For performance reasons, only timestamp_micros is stored by default
    # (as a C int). The Timestamp object is created on demand below.
    self.value = value
    if isinstance(timestamp, int):
      self.timestamp_micros = timestamp * 1000000
    else:
      self.timestamp_object = (timestamp if isinstance(timestamp, Timestamp)
                               else Timestamp.of(timestamp))
      self.timestamp_micros = self.timestamp_object.micros
    self.windows = windows

  @property
  def timestamp(self):
    """The timestamp as a Timestamp, built from timestamp_micros on first use.

    The constructed object is cached in timestamp_object.
    """
    if self.timestamp_object is None:
      self.timestamp_object = Timestamp(0, self.timestamp_micros)
    return self.timestamp_object

  def __repr__(self):
    return '(%s, %s, %s)' % (
        repr(self.value),
        'MIN_TIMESTAMP' if self.timestamp == MIN_TIMESTAMP else
        'MAX_TIMESTAMP' if self.timestamp == MAX_TIMESTAMP else
        float(self.timestamp),
        self.windows)

  def __hash__(self):
    return hash(self.value) + 3 * self.timestamp_micros + 7 * hash(self.windows)

  # Python 3 ignores __cmp__, so without explicit rich comparisons `==` would
  # silently fall back to identity and disagree with the value-based __hash__.
  def __eq__(self, other):
    """Returns True iff other has the same type, timestamp, value and windows."""
    return (type(self) is type(other)
            and WindowedValue._typed_eq(self, other))

  def __ne__(self, other):
    """Returns the negation of __eq__ (Python 2 does not derive it)."""
    return not self == other

  # __cmp__ is kept for Python 2 and for existing callers; it is not consulted
  # by Python 3, where __eq__ / __ne__ above are used instead.
  def __cmp__(left, right):  # pylint: disable=no-self-argument
    """Compares left and right for equality.

    For performance reasons, doesn't actually impose an ordering
    on unequal values (always returning 1).
    """
    if type(left) is not type(right):
      return cmp(type(left), type(right))

    # TODO(robertwb): Avoid the type checks?
    # Returns False (0) if equal, and True (1) if not.
    return not WindowedValue._typed_eq(left, right)

  @staticmethod
  def _typed_eq(left, right):
    """Field-wise equality of two WindowedValues; performs no type check."""
    return (left.timestamp_micros == right.timestamp_micros
            and left.value == right.value
            and left.windows == right.windows)

  def with_value(self, new_value):
    """Creates a new WindowedValue with the same timestamps and windows as this.

    This is the fastest way to create a new WindowedValue.
    """
    return create(new_value, self.timestamp_micros, self.windows)

  def __reduce__(self):
    """Supports pickling by re-invoking the constructor with a Timestamp."""
    return WindowedValue, (self.value, self.timestamp, self.windows)
# TODO(robertwb): Move this to a static method.
def create(value, timestamp_micros, windows):
  """Creates a WindowedValue directly from a timestamp in microseconds.

  Bypasses WindowedValue.__init__ and assigns the fields directly, so no
  timestamp conversion is performed.

  Args:
    value: The underlying value.
    timestamp_micros: The timestamp, stored as-is in timestamp_micros.
    windows: The windows the value belongs to.

  Returns:
    A new WindowedValue holding the given fields.
  """
  wv = WindowedValue.__new__(WindowedValue)
  wv.value = value
  wv.timestamp_micros = timestamp_micros
  wv.windows = windows
  return wv
# Install a class-level default of None for the lazily created Timestamp that
# the WindowedValue.timestamp property checks before building it, so instances
# that never assigned timestamp_object still have the attribute.
try:
  WindowedValue.timestamp_object = None
except TypeError:
  # When we're compiled, we can't dynamically add attributes to
  # the cdef class, but in this case it's OK as it's already present
  # on each instance.
  pass