#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A collection of WatermarkEstimator implementations that SplittableDoFns
can use."""
# pytype: skip-file
from apache_beam.io.iobase import WatermarkEstimator
from apache_beam.transforms.core import WatermarkEstimatorProvider
from apache_beam.utils.timestamp import Timestamp
class MonotonicWatermarkEstimator(WatermarkEstimator):
  """A WatermarkEstimator which assumes that timestamps of all output records
  are increasing monotonically.

  The watermark is moved to the most recently observed output timestamp each
  time ``current_watermark`` is called, unless that timestamp is earlier than
  the current watermark; the watermark never moves backwards.
  """
  def __init__(self, timestamp):
    """For a new <element, restriction> pair, the initial value is None. When
    resuming processing, the initial timestamp will be the last reported
    watermark.

    Args:
      timestamp: the initial watermark (estimator state), or None when no
        watermark has been reported yet.
    """
    self._watermark = timestamp
    self._last_observed_timestamp = timestamp

  def observe_timestamp(self, timestamp):
    """Records ``timestamp`` as the most recent output timestamp.

    The watermark itself is only recomputed in ``current_watermark``.
    """
    self._last_observed_timestamp = timestamp

  def current_watermark(self):
    """Returns the watermark, first advancing it to the last observed timestamp
    when possible.

    Returns None only if no watermark was supplied and nothing was observed.
    """
    last_observed = self._last_observed_timestamp
    # A None watermark means none has been established yet (fresh pair), so
    # adopt the observed timestamp rather than comparing it against None.
    if last_observed is not None and (
        self._watermark is None or last_observed >= self._watermark):
      self._watermark = last_observed
    return self._watermark

  def get_estimator_state(self):
    """Returns the watermark as last computed by ``current_watermark``.

    Observations made since the last ``current_watermark`` call are not folded
    in here. The returned value is what a resumed estimator is constructed
    with.
    """
    return self._watermark

  @staticmethod
  def default_provider():
    """Provide a default WatermarkEstimatorProvider for
    MonotonicWatermarkEstimator.

    Every new <element, restriction> pair starts with a None estimator state.
    """
    class DefaultMonotonicWatermarkEstimator(WatermarkEstimatorProvider):
      def initial_estimator_state(self, element, restriction):
        return None

      def create_watermark_estimator(self, estimator_state):
        return MonotonicWatermarkEstimator(estimator_state)

    return DefaultMonotonicWatermarkEstimator()
 
class WalltimeWatermarkEstimator(WatermarkEstimator):
  """A WatermarkEstimator which uses processing time as the estimated watermark.

  Output timestamps are ignored; the watermark follows ``Timestamp.now()`` and
  never moves backwards.
  """
  def __init__(self, timestamp=None):
    """
    Args:
      timestamp: the watermark to resume from (estimator state). When None,
        the current processing time is used.
    """
    # Explicit None check: only a missing state should fall back to "now".
    self._timestamp = timestamp if timestamp is not None else Timestamp.now()

  def observe_timestamp(self, timestamp):
    """Ignores output timestamps; this estimator tracks processing time only."""
    pass

  def current_watermark(self):
    """Advances the watermark to the current processing time and returns it.

    ``max`` keeps the watermark from moving backwards if the stored value is
    ahead of the current clock reading.
    """
    self._timestamp = max(self._timestamp, Timestamp.now())
    return self._timestamp

  def get_estimator_state(self):
    """Returns the watermark as last computed, for use when resuming."""
    return self._timestamp

  @staticmethod
  def default_provider():
    """Provide a default WatermarkEstimatorProvider for
    WalltimeWatermarkEstimator.

    Every new <element, restriction> pair starts with a None estimator state,
    so the estimator begins at the current processing time.
    """
    class DefaultWalltimeWatermarkEstimator(WatermarkEstimatorProvider):
      def initial_estimator_state(self, element, restriction):
        return None

      def create_watermark_estimator(self, estimator_state):
        return WalltimeWatermarkEstimator(estimator_state)

    return DefaultWalltimeWatermarkEstimator()
 
class ManualWatermarkEstimator(WatermarkEstimator):
  """A WatermarkEstimator which is controlled manually from within a DoFn.
  The DoFn must invoke set_watermark to advance the watermark.
  """
  def __init__(self, watermark):
    """
    Args:
      watermark: the initial watermark (estimator state), or None when no
        watermark has been set yet.
    """
    self._watermark = watermark

  def observe_timestamp(self, timestamp):
    """Ignores output timestamps; the watermark only changes via set_watermark.
    """
    pass

  def current_watermark(self):
    """Returns the watermark most recently set (or the initial one)."""
    return self._watermark

  def get_estimator_state(self):
    """Returns the current watermark, for use when resuming."""
    return self._watermark

  def set_watermark(self, timestamp):
    # pylint: disable=line-too-long
    """Sets a timestamp before or at the timestamps of all future elements
    produced by the associated DoFn.
    This can be approximate. If records are output that violate this guarantee,
    they will be considered late, which will affect how they will be processed.
    See https://beam.apache.org/documentation/programming-guide/#watermarks-and-late-data
    for more information on late data and how to handle it.
    However, this value should be as late as possible. Downstream windows may
    not be able to close until this watermark passes their end.

    Raises:
      ValueError: if ``timestamp`` is not a Timestamp, or is earlier than the
        current watermark.
    """
    if not isinstance(timestamp, Timestamp):
      raise ValueError('set_watermark expects a Timestamp as input')
    # Compare only once a watermark exists; a None watermark means unset.
    if self._watermark is not None and self._watermark > timestamp:
      # Format the message eagerly: ValueError does not interpolate extra args.
      raise ValueError(
          'Watermark must be monotonically increasing. '
          'Provided watermark %s is less than '
          'current watermark %s' % (timestamp, self._watermark))
    self._watermark = timestamp

  @staticmethod
  def default_provider():
    """Provide a default WatermarkEstimatorProvider for
    ManualWatermarkEstimator.

    Every new <element, restriction> pair starts with a None estimator state.
    """
    class DefaultManualWatermarkEstimatorProvider(WatermarkEstimatorProvider):
      def initial_estimator_state(self, element, restriction):
        return None

      def create_watermark_estimator(self, estimator_state):
        return ManualWatermarkEstimator(estimator_state)

    return DefaultManualWatermarkEstimatorProvider()