Source code for apache_beam.io.watermark_estimators

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A collection of WatermarkEstimator implementations that SplittableDoFns
can use."""

# pytype: skip-file

from apache_beam.io.iobase import WatermarkEstimator
from apache_beam.transforms.core import WatermarkEstimatorProvider
from apache_beam.utils.timestamp import Timestamp


[docs]class MonotonicWatermarkEstimator(WatermarkEstimator): """A WatermarkEstimator which assumes that timestamps of all ouput records are increasing monotonically. """ def __init__(self, timestamp): """For a new <element, restriction> pair, the initial value is None. When resuming processing, the initial timestamp will be the last reported watermark. """ self._watermark = timestamp
[docs] def observe_timestamp(self, timestamp): if self._watermark is None: self._watermark = timestamp else: # TODO(BEAM-9312): Consider making it configurable to deal with late # timestamp. if timestamp < self._watermark: raise ValueError( 'A MonotonicWatermarkEstimator expects output ' 'timestamp to be increasing monotonically.') self._watermark = timestamp
[docs] def current_watermark(self): return self._watermark
[docs] def get_estimator_state(self): return self._watermark
[docs] @staticmethod def default_provider(): """Provide a default WatermarkEstimatorProvider for MonotonicWatermarkEstimator. """ class DefaultMonotonicWatermarkEstimator(WatermarkEstimatorProvider): def initial_estimator_state(self, element, restriction): return None def create_watermark_estimator(self, estimator_state): return MonotonicWatermarkEstimator(estimator_state) return DefaultMonotonicWatermarkEstimator()
[docs]class WalltimeWatermarkEstimator(WatermarkEstimator): """A WatermarkEstimator which uses processing time as the estimated watermark. """ def __init__(self, timestamp=None): self._timestamp = timestamp or Timestamp.now()
[docs] def observe_timestamp(self, timestamp): pass
[docs] def current_watermark(self): self._timestamp = max(self._timestamp, Timestamp.now()) return self._timestamp
[docs] def get_estimator_state(self): return self._timestamp
[docs] @staticmethod def default_provider(): """Provide a default WatermarkEstimatorProvider for WalltimeWatermarkEstimator. """ class DefaultWalltimeWatermarkEstimator(WatermarkEstimatorProvider): def initial_estimator_state(self, element, restriction): return None def create_watermark_estimator(self, estimator_state): return WalltimeWatermarkEstimator(estimator_state) return DefaultWalltimeWatermarkEstimator()
[docs]class ManualWatermarkEstimator(WatermarkEstimator): """A WatermarkEstimator which is controlled manually from within a DoFn. The DoFn must invoke set_watermark to advance the watermark. """ def __init__(self, watermark): self._watermark = watermark
[docs] def observe_timestamp(self, timestamp): pass
[docs] def current_watermark(self): return self._watermark
[docs] def get_estimator_state(self): return self._watermark
[docs] def set_watermark(self, timestamp): # pylint: disable=line-too-long """Sets a timestamp before or at the timestamps of all future elements produced by the associated DoFn. This can be approximate. If records are output that violate this guarantee, they will be considered late, which will affect how they will be processed. See https://beam.apache.org/documentation/programming-guide/#watermarks-and-late-data for more information on late data and how to handle it. However, this value should be as late as possible. Downstream windows may not be able to close until this watermark passes their end. """ if not isinstance(timestamp, Timestamp): raise ValueError('set_watermark expects a Timestamp as input') if self._watermark and self._watermark > timestamp: raise ValueError( 'Watermark must be monotonically increasing.' 'Provided watermark %s is less than ' 'current watermark %s', timestamp, self._watermark) self._watermark = timestamp
[docs] @staticmethod def default_provider(): """Provide a default WatermarkEstimatorProvider for WalltimeWatermarkEstimator. """ class DefaultManualWatermarkEstimatorProvider(WatermarkEstimatorProvider): def initial_estimator_state(self, element, restriction): return None def create_watermark_estimator(self, estimator_state): return ManualWatermarkEstimator(estimator_state) return DefaultManualWatermarkEstimatorProvider()