#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import enum
import math
import time
import warnings
from typing import Any
from typing import Optional
from typing import Sequence
import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange
from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
from apache_beam.runners import sdf_utils
from apache_beam.transforms import core
from apache_beam.transforms import window
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils import timestamp
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.timestamp import TimestampTypes
class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
  '''Restriction provider used by ``ImpulseSeqGenDoFn``.

  For an element ``(start, stop, interval)`` the restriction is the
  ``OffsetRange`` of output indices ``[0, ceil((stop - start) / interval))``;
  index ``k`` corresponds to the output scheduled at ``start + k * interval``.
  '''
  def initial_restriction(self, element):
    '''Returns the ``OffsetRange`` covering every output of ``element``.

    :param element: ``(start, stop, interval)``. ``start`` and ``stop`` are
      ``Timestamp`` objects or values accepted by ``Timestamp.of``;
      ``interval`` is the fire interval in seconds (passed to ``Duration``).
    :raises AssertionError: if ``start > stop`` or ``interval <= 0``.
    '''
    start, end, interval = element
    if not isinstance(start, Timestamp):
      start = Timestamp.of(start)
    if not isinstance(end, Timestamp):
      end = Timestamp.of(end)
    interval_duration = Duration(interval)
    assert start <= end
    assert interval > 0
    total_duration = end - start
    # Exact integer ceiling division. Microsecond counts can exceed 2**53
    # (e.g. when ``end`` is MAX_TIMESTAMP), where a float quotient loses
    # precision and ``math.ceil`` could round to the wrong output count.
    total_outputs = -(-total_duration.micros // interval_duration.micros)
    return OffsetRange(0, total_outputs)

  def create_tracker(self, restriction):
    '''Returns an ``OffsetRestrictionTracker`` over ``restriction``.'''
    return OffsetRestrictionTracker(restriction)

  def restriction_size(self, element, restriction):
    '''Estimates the backlog of ``restriction`` at the current time.

    Delegates to ``_sequence_backlog_bytes`` with ``time.time()`` as "now".
    '''
    return _sequence_backlog_bytes(element, time.time(), restriction)

  def truncate(self, unused_element, unused_restriction):
    '''On drain, immediately stop emitting new elements.

    No truncated restriction is returned (``None``).
    '''
    return None
def _sequence_backlog_bytes(element, now, offset_range):
  '''
  Estimates the size of the output in ``offset_range`` that the sequence
  should have emitted up to ``now``, counting 8 bytes per element.

  :param element: ``(start, stop, interval)``; ``start`` is a ``Timestamp``
    or a number of seconds and ``interval`` is in seconds.
  :param now: current wall-clock time in seconds, as from ``time.time()``.
  :param offset_range: ``OffsetRange`` of output indices to size.
  :return: ``0`` when ``floor((now - start) / interval)`` is below
    ``offset_range.start``; otherwise ``8`` times the number of indices from
    ``offset_range.start`` up to (excluding) that value, capped at
    ``offset_range.stop``.
  '''
  first, _, period = element
  first = first.micros / 1000000 if isinstance(first, Timestamp) else first
  assert period > 0

  elapsed_index = math.floor((now - first) / period)
  if elapsed_index >= offset_range.start:
    # We attempt to be precise as some runners scale based upon bytes and
    # output byte throughput.
    # NOTE(review): the element at ``elapsed_index`` is already due as well,
    # so this may undercount by one; confirm before changing.
    return 8 * (min(offset_range.stop, elapsed_index) - offset_range.start)
  return 0
class ImpulseSeqGenDoFn(beam.DoFn):
  '''
  Splittable DoFn that turns ``(first_timestamp, last_timestamp,
  fire_interval)`` tuples into a sequence of timestamped outputs:

  * first_timestamp = timestamp of the first generated element (inclusive).
  * last_timestamp = end of the generation period (exclusive); nothing is
    generated at or after this time.
  * fire_interval = spacing between consecutive elements.

  For every input tuple the DoFn walks the scheduled timestamps in order:

  * a timestamp that is not later than the current wall-clock time is output
    immediately;
  * a timestamp in the future makes the DoFn defer the remaining work until
    that time.

  Outputs are never produced before their scheduled wall-clock time, but
  there is no guarantee that they are produced exactly at it.

  What is output depends on ``data`` (the mode is detected from its first
  element):

  - **None** (default): the scheduled timestamp itself is the element.
  - **Non-timestamped data** (e.g. ``[v1, v2, ...]``): values are taken from
    ``data`` in order, cycling when it runs out, and stamped with the
    scheduled timestamp.
  - **Pre-timestamped data**: a sequence of
    ``(apache_beam.utils.timestamp.Timestamp, value)`` tuples; each value is
    emitted with its own timestamp.

  See the parameter description of ``PeriodicImpulse`` for more information.
  '''
  def __init__(self, data: Optional[Sequence[Any]] = None):
    self._data = data
    assert self._data is None or len(self._data) > 0
    self._len = 0 if self._data is None else len(self._data)
    if self._data is None or self._len <= 0:
      self._is_pre_timestamped = False
    else:
      head = self._data[0]
      self._is_pre_timestamped = (
          isinstance(head, tuple) and
          isinstance(head[0], timestamp.Timestamp))

  def _get_output(self, index, current_output_timestamp):
    '''Builds the ``TimestampedValue`` for output number ``index``.'''
    if self._data is None:
      # Default mode: the scheduled time is both value and event time.
      return TimestampedValue(
          current_output_timestamp, current_output_timestamp)
    item = self._data[index % self._len]
    if not self._is_pre_timestamped:
      return TimestampedValue(item, current_output_timestamp)
    event_time, value = item
    return TimestampedValue(value, event_time)

  @beam.DoFn.unbounded_per_element()
  def process(
      self,
      element,
      restriction_tracker=beam.DoFn.RestrictionParam(
          ImpulseSeqGenRestrictionProvider()),
      watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
          ManualWatermarkEstimator.default_provider())):
    '''
    :param element: (start_timestamp, end_timestamp, interval)
    :param restriction_tracker: tracker over the ``OffsetRange`` of output
      indices produced by ``ImpulseSeqGenRestrictionProvider``.
    :param watermark_estimator: manual watermark estimator, moved forward to
      each output's timestamp and never moved backwards.
    :return: yields one ``TimestampedValue`` per claimed index whose
      scheduled time ``start + interval * index`` has been reached.
    '''
    first, _, period = element
    if isinstance(first, Timestamp):
      first = first.micros / 1000000
    assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)

    index = restriction_tracker.current_restriction().start
    while True:
      scheduled = first + period * index
      if scheduled > time.time():
        # Too early: defer the remaining restriction until the scheduled time.
        restriction_tracker.defer_remainder(timestamp.Timestamp(scheduled))
        return
      if not restriction_tracker.try_claim(index):
        # Nothing left to claim in this restriction.
        return
      output = self._get_output(index, scheduled)
      watermark = watermark_estimator.current_watermark()
      if watermark is None or output.timestamp > watermark:
        # Keep the manually-set watermark monotonic.
        watermark_estimator.set_watermark(output.timestamp)
      yield output
      index += 1
class PeriodicSequence(PTransform):
  '''
  Expands a PCollection of ``(first_timestamp, last_timestamp,
  fire_interval)`` tuples into periodic timestamp elements:

  * first_timestamp = timestamp of the first element to output.
  * last_timestamp = end of the period (exclusive); no element is output at
    or after it.
  * fire_interval = how often to fire an element.

  For each input tuple, elements whose timestamp has already passed in
  wall-clock time are output right away; for a future timestamp the
  transform waits until that time is reached. Elements are never output
  before their timestamp, though not necessarily exactly at it.

  Each output element's value and event timestamp are both its scheduled
  time. The resulting PCollection is unbounded.
  '''
  def __init__(self):
    # No configuration: every parameter arrives with the input elements.
    pass

  def expand(self, pcoll):
    sequence = pcoll | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn())
    return sequence | 'MapToTimestamped' >> beam.Map(
        lambda tt: TimestampedValue(tt, tt))
class RebaseMode(enum.Enum):
  '''Controls how the start and stop timestamps are rebased to execution time.

  ``PeriodicImpulse.expand`` compares its ``rebase`` argument against these
  members. In the rebasing modes, the start timestamp is replaced by
  ``Timestamp.now()``, read when the impulse element is produced.

  Attributes:
    REBASE_NONE: Timestamps are not changed.
    REBASE_ALL: Both start and stop timestamps are rebased, preserving the
      original duration.
    REBASE_START: Only the start timestamp is rebased; the stop timestamp
      is unchanged.
  '''
  # Member values are explicit integers.
  REBASE_NONE = 0
  REBASE_ALL = 1
  REBASE_START = 2
class PeriodicImpulse(PTransform):
  '''
  PeriodicImpulse transform generates a sequence of elements, firing once
  every ``fire_interval`` seconds from ``start_timestamp`` (inclusive) to
  ``stop_timestamp`` (exclusive). Elements are not emitted before their
  scheduled wall-clock time.

  PeriodicImpulse behaves the same as the :class:`PeriodicSequence`
  transform, but needs no input, so it can be used as the first transform in
  a pipeline.

  The PCollection generated by PeriodicImpulse is unbounded.
  '''
  def _validate_and_adjust_duration(self):
    '''Checks that ``self.data`` is long enough for the impulse duration.

    When ``stop_ts`` is ``MAX_TIMESTAMP`` it is first replaced (in
    ``self.stop_ts``) by ``start + (len(data) - 1) * interval`` plus one extra
    interval, so that every data element is emitted once.

    :raises ValueError: if pre-timestamped data cannot span
      ``[start_ts, stop_ts)``. For non-timestamped data a warning is issued
      instead, and the data is repeated.
    '''
    assert self.data
    interval = Duration(self.interval)
    # The total time we need to impulse all the data.
    data_duration = (len(self.data) - 1) * interval
    is_pre_timestamped = isinstance(self.data[0], tuple) and \
        isinstance(self.data[0][0], timestamp.Timestamp)

    start_ts = Timestamp.of(self.start_ts)
    stop_ts = Timestamp.of(self.stop_ts)

    if stop_ts == MAX_TIMESTAMP:
      # When the stop timestamp is unbounded (MAX_TIMESTAMP), set it to the
      # data's actual end time plus an extra fire interval, because the
      # impulse duration's upper bound is exclusive.
      self.stop_ts = start_ts + data_duration + interval
      stop_ts = self.stop_ts

    # The total time for the impulse signal which occurs in [start, end).
    impulse_duration = stop_ts - start_ts

    if data_duration + interval < impulse_duration:
      # We don't have enough data for the impulse.
      # If we can fit at least one more data point in the impulse duration,
      # then we will be in the repeat mode.
      # Only call the data "pre-timestamped" when it actually is.
      data_kind = 'pre-timestamped ' if is_pre_timestamped else ''
      message = (
          f'The number of elements in the provided {data_kind}data sequence '
          'is not enough to span the full impulse duration. '
          f'Expected duration: {impulse_duration}, '
          f'actual data duration: {data_duration}.')
      if is_pre_timestamped:
        raise ValueError(
            f'{message} Please either provide more data or decrease '
            '`stop_timestamp`.')
      warnings.warn(
          f'{message} As a result, the data sequence will be repeated to '
          'generate elements for the entire duration.')

  def __init__(
      self,
      start_timestamp: Optional[TimestampTypes] = None,
      stop_timestamp: TimestampTypes = MAX_TIMESTAMP,
      fire_interval: float = 360.0,
      apply_windowing: bool = False,
      data: Optional[Sequence[Any]] = None,
      rebase: RebaseMode = RebaseMode.REBASE_NONE):
    '''
    :param start_timestamp: Timestamp for first element. Defaults to the
      current time, read when this transform is constructed.
    :param stop_timestamp: Timestamp at or after which no elements will be
      output. If ``data`` is given and this is ``MAX_TIMESTAMP``, it is
      replaced by the end of the data so that each element is emitted once.
    :param fire_interval: Interval in seconds at which to output elements.
    :param apply_windowing: Whether each element should be assigned to
      individual window (``FixedWindows(fire_interval)``). If false, all
      elements will reside in global window.
    :param data: A sequence of elements to emit. The behavior depends on the
      content:

      - **None (default):** The transform emits the event timestamps as
        the element values, starting from start_timestamp and incrementing by
        `fire_interval` up to the `stop_timestamp` (exclusive).
      - **Sequence of raw values (e.g., `['a', 'b']`)**: The transform emits
        each value in the sequence, assigning it an event timestamp that is
        calculated in the same manner as the default scenario. The sequence
        is repeated if the impulse duration requires more elements than
        are in the sequence (a warning will be given in this case).
      - **Sequence of pre-timestamped tuples (e.g.,
        `[(t1, v1), (t2, v2)]`)**: The transform emits each value with its
        explicitly provided event time. The format must be
        `(apache_beam.utils.timestamp.Timestamp, value)`. The provided
        timestamps are used directly, overriding the calculated ones.
        Note that the elements in the sequence are NOT required to be ordered
        by event time; an element with a timestamp earlier than a preceding
        one will be treated as a potential late event.
        **Important**: In this mode, the number of elements in `data` must be
        sufficient to cover the duration defined by `start_timestamp`,
        `stop_timestamp`, and `fire_interval`; otherwise, a `ValueError` is
        raised.
    :param rebase: Controls how the start and stop timestamps are rebased to
      execution time. See `RebaseMode` for more details. Defaults to
      `REBASE_NONE`.
    '''
    if start_timestamp is None:
      # Resolve "now" per instance. A ``Timestamp.now()`` default argument is
      # evaluated once at import time and shared by every instance.
      start_timestamp = Timestamp.now()
    self.start_ts = start_timestamp
    self.stop_ts = stop_timestamp
    self.interval = fire_interval
    self.apply_windowing = apply_windowing
    self.data = data
    self.rebase = rebase

    if self.data:
      self._validate_and_adjust_duration()

  def expand(self, pbegin):
    interval = self.interval
    if self.rebase == RebaseMode.REBASE_ALL:
      duration = Timestamp.of(self.stop_ts) - Timestamp.of(self.start_ts)

      def _rebase_all(unused_impulse):
        # Read the clock once. Separate ``Timestamp.now()`` calls for start
        # and stop would stretch the range by the gap between the calls,
        # which can add one extra element.
        now = Timestamp.now()
        return [now, now + duration, interval]

      impulse_element = pbegin | beam.Impulse() | beam.Map(_rebase_all)
    elif self.rebase == RebaseMode.REBASE_START:
      stop_ts = self.stop_ts
      impulse_element = pbegin | beam.Impulse() | beam.Map(
          lambda _: [Timestamp.now(), stop_ts, interval])
    else:
      impulse_element = pbegin | 'ImpulseElement' >> beam.Create(
          [(self.start_ts, self.stop_ts, interval)])

    result = (
        impulse_element
        | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn(self.data)))

    if not self.data:
      # This step is actually an identity transform, because the Timestamped
      # values have already been generated in `ImpulseSeqGenDoFn`.
      # We keep this step here to prevent the current PeriodicImpulse from
      # breaking the compatibility.
      result = (result | 'MapToTimestamped' >> beam.Map(lambda tt: tt))

    if self.apply_windowing:
      result = result | 'ApplyWindowing' >> beam.WindowInto(
          window.FixedWindows(self.interval))
    return result