Source code for apache_beam.transforms.trigger

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Support for Dataflow triggers.

Triggers control when in processing time windows get emitted.
"""

# pytype: skip-file

from __future__ import absolute_import

import collections
import copy
import logging
import numbers
from abc import ABCMeta
from abc import abstractmethod
from builtins import object

from future.moves.itertools import zip_longest
from future.utils import iteritems
from future.utils import with_metaclass

from apache_beam.coders import coder_impl
from apache_beam.coders import observable
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.transforms import combiners
from apache_beam.transforms import core
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.window import WindowedValue
from apache_beam.transforms.window import WindowFn
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.timestamp import TIME_GRANULARITY

# AfterCount is experimental. No backwards compatibility guarantees.

__all__ = [
    'AccumulationMode',
    'TriggerFn',
    'DefaultTrigger',
    'AfterWatermark',
    'AfterProcessingTime',
    'AfterCount',
    'Repeatedly',
    'AfterAny',
    'AfterAll',
    'AfterEach',
    'OrFinally',
]

_LOGGER = logging.getLogger(__name__)


[docs]class AccumulationMode(object): """Controls what to do with data when a trigger fires multiple times.""" DISCARDING = beam_runner_api_pb2.AccumulationMode.DISCARDING ACCUMULATING = beam_runner_api_pb2.AccumulationMode.ACCUMULATING
# TODO(robertwb): Provide retractions of previous outputs. # RETRACTING = 3 class _StateTag(with_metaclass(ABCMeta, object)): # type: ignore[misc] """An identifier used to store and retrieve typed, combinable state. The given tag must be unique for this step.""" def __init__(self, tag): self.tag = tag class _ValueStateTag(_StateTag): """StateTag pointing to an element.""" def __repr__(self): return 'ValueStateTag(%s)' % (self.tag) def with_prefix(self, prefix): return _ValueStateTag(prefix + self.tag) class _SetStateTag(_StateTag): """StateTag pointing to an element.""" def __repr__(self): return 'SetStateTag({tag})'.format(tag=self.tag) def with_prefix(self, prefix): return _SetStateTag(prefix + self.tag) class _CombiningValueStateTag(_StateTag): """StateTag pointing to an element, accumulated with a combiner. The given tag must be unique for this step. The given CombineFn will be applied (possibly incrementally and eagerly) when adding elements.""" # TODO(robertwb): Also store the coder (perhaps extracted from the combine_fn) def __init__(self, tag, combine_fn): super(_CombiningValueStateTag, self).__init__(tag) if not combine_fn: raise ValueError('combine_fn must be specified.') if not isinstance(combine_fn, core.CombineFn): combine_fn = core.CombineFn.from_callable(combine_fn) self.combine_fn = combine_fn def __repr__(self): return 'CombiningValueStateTag(%s, %s)' % (self.tag, self.combine_fn) def with_prefix(self, prefix): return _CombiningValueStateTag(prefix + self.tag, self.combine_fn) def without_extraction(self): class NoExtractionCombineFn(core.CombineFn): create_accumulator = self.combine_fn.create_accumulator add_input = self.combine_fn.add_input merge_accumulators = self.combine_fn.merge_accumulators compact = self.combine_fn.compact extract_output = staticmethod(lambda x: x) return _CombiningValueStateTag(self.tag, NoExtractionCombineFn()) class _ListStateTag(_StateTag): """StateTag pointing to a list of elements.""" def __repr__(self): return 'ListStateTag(%s)' % self.tag def with_prefix(self, prefix): return _ListStateTag(prefix + self.tag) class _WatermarkHoldStateTag(_StateTag): def __init__(self, tag, timestamp_combiner_impl): super(_WatermarkHoldStateTag, self).__init__(tag) self.timestamp_combiner_impl = timestamp_combiner_impl def __repr__(self): return 'WatermarkHoldStateTag(%s, %s)' % ( self.tag, self.timestamp_combiner_impl) def with_prefix(self, prefix): return _WatermarkHoldStateTag( prefix + self.tag, self.timestamp_combiner_impl) # pylint: disable=unused-argument # TODO(robertwb): Provisional API, Java likely to change as well.
[docs]class TriggerFn(with_metaclass(ABCMeta, object)): # type: ignore[misc] """A TriggerFn determines when window (panes) are emitted. See https://beam.apache.org/documentation/programming-guide/#triggers """
[docs] @abstractmethod def on_element(self, element, window, context): """Called when a new element arrives in a window. Args: element: the element being added window: the window to which the element is being added context: a context (e.g. a TriggerContext instance) for managing state and setting timers """ pass
[docs] @abstractmethod def on_merge(self, to_be_merged, merge_result, context): """Called when multiple windows are merged. Args: to_be_merged: the set of windows to be merged merge_result: the window into which the windows are being merged context: a context (e.g. a TriggerContext instance) for managing state and setting timers """ pass
[docs] @abstractmethod def should_fire(self, time_domain, timestamp, window, context): """Whether this trigger should cause the window to fire. Args: time_domain: WATERMARK for event-time timers and REAL_TIME for processing-time timers. timestamp: for time_domain WATERMARK, it represents the watermark: (a lower bound on) the watermark of the system and for time_domain REAL_TIME, it represents the trigger: timestamp of the processing-time timer. window: the window whose trigger is being considered context: a context (e.g. a TriggerContext instance) for managing state and setting timers Returns: whether this trigger should cause a firing """ pass
[docs] @abstractmethod def has_ontime_pane(self): """Whether this trigger creates an empty pane even if there are no elements. Returns: True if this trigger guarantees that there will always be an ON_TIME pane even if there are no elements in that pane. """ pass
[docs] @abstractmethod def on_fire(self, watermark, window, context): """Called when a trigger actually fires. Args: watermark: (a lower bound on) the watermark of the system window: the window whose trigger is being fired context: a context (e.g. a TriggerContext instance) for managing state and setting timers Returns: whether this trigger is finished """ pass
[docs] @abstractmethod def reset(self, window, context): """Clear any state and timers used by this TriggerFn.""" pass
# pylint: enable=unused-argument
[docs] @staticmethod def from_runner_api(proto, context): return { 'after_all': AfterAll, 'after_any': AfterAny, 'after_each': AfterEach, 'after_end_of_window': AfterWatermark, 'after_processing_time': AfterProcessingTime, # after_processing_time, after_synchronized_processing_time 'always': Always, 'default': DefaultTrigger, 'element_count': AfterCount, # never 'or_finally': OrFinally, 'repeat': Repeatedly, }[proto.WhichOneof('trigger')].from_runner_api(proto, context)
[docs] @abstractmethod def to_runner_api(self, unused_context): pass
def __ne__(self, other): # TODO(BEAM-5949): Needed for Python 2 compatibility. return not self == other
[docs]class DefaultTrigger(TriggerFn): """Semantically Repeatedly(AfterWatermark()), but more optimized.""" def __init__(self): pass def __repr__(self): return 'DefaultTrigger()'
[docs] def on_element(self, element, window, context): context.set_timer('', TimeDomain.WATERMARK, window.end)
[docs] def on_merge(self, to_be_merged, merge_result, context): # Note: Timer clearing solely an optimization. for window in to_be_merged: if window.end != merge_result.end: context.clear_timer('', TimeDomain.WATERMARK)
[docs] def should_fire(self, time_domain, watermark, window, context): if watermark >= window.end: # Explicitly clear the timer so that late elements are not emitted again # when the timer is fired. context.clear_timer('', TimeDomain.WATERMARK) return watermark >= window.end
[docs] def on_fire(self, watermark, window, context): return False
[docs] def reset(self, window, context): context.clear_timer('', TimeDomain.WATERMARK)
def __eq__(self, other): return type(self) == type(other) def __hash__(self): return hash(type(self))
[docs] @staticmethod def from_runner_api(proto, context): return DefaultTrigger()
[docs] def to_runner_api(self, unused_context): return beam_runner_api_pb2.Trigger( default=beam_runner_api_pb2.Trigger.Default())
[docs] def has_ontime_pane(self): return True
[docs]class AfterProcessingTime(TriggerFn): """Fire exactly once after a specified delay from processing time. AfterProcessingTime is experimental. No backwards compatibility guarantees. """ def __init__(self, delay=0): """Initialize a processing time trigger with a delay in seconds.""" self.delay = delay def __repr__(self): return 'AfterProcessingTime(delay=%d)' % self.delay
[docs] def on_element(self, element, window, context): context.set_timer( '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
[docs] def on_merge(self, to_be_merged, merge_result, context): # timers will be kept through merging pass
[docs] def should_fire(self, time_domain, timestamp, window, context): if time_domain == TimeDomain.REAL_TIME: return True
[docs] def on_fire(self, timestamp, window, context): return True
[docs] def reset(self, window, context): pass
[docs] @staticmethod def from_runner_api(proto, context): return AfterProcessingTime( delay=( proto.after_processing_time.timestamp_transforms[0].delay. delay_millis) // 1000)
[docs] def to_runner_api(self, context): delay_proto = beam_runner_api_pb2.TimestampTransform( delay=beam_runner_api_pb2.TimestampTransform.Delay( delay_millis=self.delay * 1000)) return beam_runner_api_pb2.Trigger( after_processing_time=beam_runner_api_pb2.Trigger.AfterProcessingTime( timestamp_transforms=[delay_proto]))
[docs] def has_ontime_pane(self): return False
class Always(TriggerFn): """Repeatedly invoke the given trigger, never finishing.""" def __init__(self): pass def __repr__(self): return 'Always' def __eq__(self, other): return type(self) == type(other) def __hash__(self): return 1 def on_element(self, element, window, context): pass def on_merge(self, to_be_merged, merge_result, context): pass def has_ontime_pane(self): False def reset(self, window, context): pass def should_fire(self, time_domain, watermark, window, context): return True def on_fire(self, watermark, window, context): return False @staticmethod def from_runner_api(proto, context): return Always() def to_runner_api(self, context): return beam_runner_api_pb2.Trigger( always=beam_runner_api_pb2.Trigger.Always())
[docs]class AfterWatermark(TriggerFn): """Fire exactly once when the watermark passes the end of the window. Args: early: if not None, a speculative trigger to repeatedly evaluate before the watermark passes the end of the window late: if not None, a speculative trigger to repeatedly evaluate after the watermark passes the end of the window """ LATE_TAG = _CombiningValueStateTag('is_late', any) def __init__(self, early=None, late=None): self.early = Repeatedly(early) if early else None self.late = Repeatedly(late) if late else None def __repr__(self): qualifiers = [] if self.early: qualifiers.append('early=%s' % self.early.underlying) if self.late: qualifiers.append('late=%s' % self.late.underlying) return 'AfterWatermark(%s)' % ', '.join(qualifiers)
[docs] def is_late(self, context): return self.late and context.get_state(self.LATE_TAG)
[docs] def on_element(self, element, window, context): if self.is_late(context): self.late.on_element(element, window, NestedContext(context, 'late')) else: context.set_timer('', TimeDomain.WATERMARK, window.end) if self.early: self.early.on_element(element, window, NestedContext(context, 'early'))
[docs] def on_merge(self, to_be_merged, merge_result, context): # TODO(robertwb): Figure out whether the 'rewind' semantics could be used # here. if self.is_late(context): self.late.on_merge( to_be_merged, merge_result, NestedContext(context, 'late')) else: # Note: Timer clearing solely an optimization. for window in to_be_merged: if window.end != merge_result.end: context.clear_timer('', TimeDomain.WATERMARK) if self.early: self.early.on_merge( to_be_merged, merge_result, NestedContext(context, 'early'))
[docs] def should_fire(self, time_domain, watermark, window, context): if self.is_late(context): return self.late.should_fire( time_domain, watermark, window, NestedContext(context, 'late')) elif watermark >= window.end: # Explicitly clear the timer so that late elements are not emitted again # when the timer is fired. context.clear_timer('', TimeDomain.WATERMARK) return True elif self.early: return self.early.should_fire( time_domain, watermark, window, NestedContext(context, 'early')) return False
[docs] def on_fire(self, watermark, window, context): if self.is_late(context): return self.late.on_fire( watermark, window, NestedContext(context, 'late')) elif watermark >= window.end: context.add_state(self.LATE_TAG, True) return not self.late elif self.early: self.early.on_fire(watermark, window, NestedContext(context, 'early')) return False
[docs] def reset(self, window, context): if self.late: context.clear_state(self.LATE_TAG) if self.early: self.early.reset(window, NestedContext(context, 'early')) if self.late: self.late.reset(window, NestedContext(context, 'late'))
def __eq__(self, other): return ( type(self) == type(other) and self.early == other.early and self.late == other.late) def __hash__(self): return hash((type(self), self.early, self.late))
[docs] @staticmethod def from_runner_api(proto, context): return AfterWatermark( early=TriggerFn.from_runner_api( proto.after_end_of_window.early_firings, context) if proto.after_end_of_window.HasField('early_firings') else None, late=TriggerFn.from_runner_api( proto.after_end_of_window.late_firings, context) if proto.after_end_of_window.HasField('late_firings') else None)
[docs] def to_runner_api(self, context): early_proto = self.early.underlying.to_runner_api( context) if self.early else None late_proto = self.late.underlying.to_runner_api( context) if self.late else None return beam_runner_api_pb2.Trigger( after_end_of_window=beam_runner_api_pb2.Trigger.AfterEndOfWindow( early_firings=early_proto, late_firings=late_proto))
[docs] def has_ontime_pane(self): return True
[docs]class AfterCount(TriggerFn): """Fire when there are at least count elements in this window pane. AfterCount is experimental. No backwards compatibility guarantees. """ COUNT_TAG = _CombiningValueStateTag('count', combiners.CountCombineFn()) def __init__(self, count): if not isinstance(count, numbers.Integral) or count < 1: raise ValueError("count (%d) must be a positive integer." % count) self.count = count def __repr__(self): return 'AfterCount(%s)' % self.count def __eq__(self, other): return type(self) == type(other) and self.count == other.count def __hash__(self): return hash(self.count)
[docs] def on_element(self, element, window, context): context.add_state(self.COUNT_TAG, 1)
[docs] def on_merge(self, to_be_merged, merge_result, context): # states automatically merged pass
[docs] def should_fire(self, time_domain, watermark, window, context): return context.get_state(self.COUNT_TAG) >= self.count
[docs] def on_fire(self, watermark, window, context): return True
[docs] def reset(self, window, context): context.clear_state(self.COUNT_TAG)
[docs] @staticmethod def from_runner_api(proto, unused_context): return AfterCount(proto.element_count.element_count)
[docs] def to_runner_api(self, unused_context): return beam_runner_api_pb2.Trigger( element_count=beam_runner_api_pb2.Trigger.ElementCount( element_count=self.count))
[docs] def has_ontime_pane(self): return False
[docs]class Repeatedly(TriggerFn): """Repeatedly invoke the given trigger, never finishing.""" def __init__(self, underlying): self.underlying = underlying def __repr__(self): return 'Repeatedly(%s)' % self.underlying def __eq__(self, other): return type(self) == type(other) and self.underlying == other.underlying def __hash__(self): return hash(self.underlying)
[docs] def on_element(self, element, window, context): self.underlying.on_element(element, window, context)
[docs] def on_merge(self, to_be_merged, merge_result, context): self.underlying.on_merge(to_be_merged, merge_result, context)
[docs] def should_fire(self, time_domain, watermark, window, context): return self.underlying.should_fire(time_domain, watermark, window, context)
[docs] def on_fire(self, watermark, window, context): if self.underlying.on_fire(watermark, window, context): self.underlying.reset(window, context) return False
[docs] def reset(self, window, context): self.underlying.reset(window, context)
[docs] @staticmethod def from_runner_api(proto, context): return Repeatedly( TriggerFn.from_runner_api(proto.repeat.subtrigger, context))
[docs] def to_runner_api(self, context): return beam_runner_api_pb2.Trigger( repeat=beam_runner_api_pb2.Trigger.Repeat( subtrigger=self.underlying.to_runner_api(context)))
[docs] def has_ontime_pane(self): return self.underlying.has_ontime_pane()
class _ParallelTriggerFn(with_metaclass(ABCMeta, TriggerFn)): # type: ignore[misc] def __init__(self, *triggers): self.triggers = triggers def __repr__(self): return '%s(%s)' % ( self.__class__.__name__, ', '.join(str(t) for t in self.triggers)) def __eq__(self, other): return type(self) == type(other) and self.triggers == other.triggers def __hash__(self): return hash(self.triggers) @abstractmethod def combine_op(self, trigger_results): pass def on_element(self, element, window, context): for ix, trigger in enumerate(self.triggers): trigger.on_element(element, window, self._sub_context(context, ix)) def on_merge(self, to_be_merged, merge_result, context): for ix, trigger in enumerate(self.triggers): trigger.on_merge( to_be_merged, merge_result, self._sub_context(context, ix)) def should_fire(self, time_domain, watermark, window, context): self._time_domain = time_domain return self.combine_op( trigger.should_fire( time_domain, watermark, window, self._sub_context(context, ix)) for ix, trigger in enumerate(self.triggers)) def on_fire(self, watermark, window, context): finished = [] for ix, trigger in enumerate(self.triggers): nested_context = self._sub_context(context, ix) if trigger.should_fire(TimeDomain.WATERMARK, watermark, window, nested_context): finished.append(trigger.on_fire(watermark, window, nested_context)) return self.combine_op(finished) def reset(self, window, context): for ix, trigger in enumerate(self.triggers): trigger.reset(window, self._sub_context(context, ix)) @staticmethod def _sub_context(context, index): return NestedContext(context, '%d/' % index) @staticmethod def from_runner_api(proto, context): subtriggers = [ TriggerFn.from_runner_api(subtrigger, context) for subtrigger in proto.after_all.subtriggers or proto.after_any.subtriggers ] if proto.after_all.subtriggers: return AfterAll(*subtriggers) else: return AfterAny(*subtriggers) def to_runner_api(self, context): subtriggers = [ subtrigger.to_runner_api(context) for subtrigger in self.triggers ] if self.combine_op == all: return beam_runner_api_pb2.Trigger( after_all=beam_runner_api_pb2.Trigger.AfterAll( subtriggers=subtriggers)) elif self.combine_op == any: return beam_runner_api_pb2.Trigger( after_any=beam_runner_api_pb2.Trigger.AfterAny( subtriggers=subtriggers)) else: raise NotImplementedError(self) def has_ontime_pane(self): return any(t.has_ontime_pane() for t in self.triggers)
[docs]class AfterAny(_ParallelTriggerFn): """Fires when any subtrigger fires. Also finishes when any subtrigger finishes. """ combine_op = any
[docs]class AfterAll(_ParallelTriggerFn): """Fires when all subtriggers have fired. Also finishes when all subtriggers have finished. """ combine_op = all
[docs]class AfterEach(TriggerFn): INDEX_TAG = _CombiningValueStateTag( 'index', (lambda indices: 0 if not indices else max(indices))) def __init__(self, *triggers): self.triggers = triggers def __repr__(self): return '%s(%s)' % ( self.__class__.__name__, ', '.join(str(t) for t in self.triggers)) def __eq__(self, other): return type(self) == type(other) and self.triggers == other.triggers def __hash__(self): return hash(self.triggers)
[docs] def on_element(self, element, window, context): ix = context.get_state(self.INDEX_TAG) if ix < len(self.triggers): self.triggers[ix].on_element( element, window, self._sub_context(context, ix))
[docs] def on_merge(self, to_be_merged, merge_result, context): # This takes the furthest window on merging. # TODO(robertwb): Revisit this when merging windows logic is settled for # all possible merging situations. ix = context.get_state(self.INDEX_TAG) if ix < len(self.triggers): self.triggers[ix].on_merge( to_be_merged, merge_result, self._sub_context(context, ix))
[docs] def should_fire(self, time_domain, watermark, window, context): ix = context.get_state(self.INDEX_TAG) if ix < len(self.triggers): return self.triggers[ix].should_fire( time_domain, watermark, window, self._sub_context(context, ix))
[docs] def on_fire(self, watermark, window, context): ix = context.get_state(self.INDEX_TAG) if ix < len(self.triggers): if self.triggers[ix].on_fire(watermark, window, self._sub_context(context, ix)): ix += 1 context.add_state(self.INDEX_TAG, ix) return ix == len(self.triggers)
[docs] def reset(self, window, context): context.clear_state(self.INDEX_TAG) for ix, trigger in enumerate(self.triggers): trigger.reset(window, self._sub_context(context, ix))
@staticmethod def _sub_context(context, index): return NestedContext(context, '%d/' % index)
[docs] @staticmethod def from_runner_api(proto, context): return AfterEach( *[ TriggerFn.from_runner_api(subtrigger, context) for subtrigger in proto.after_each.subtriggers ])
[docs] def to_runner_api(self, context): return beam_runner_api_pb2.Trigger( after_each=beam_runner_api_pb2.Trigger.AfterEach( subtriggers=[ subtrigger.to_runner_api(context) for subtrigger in self.triggers ]))
[docs] def has_ontime_pane(self): return any(t.has_ontime_pane() for t in self.triggers)
[docs]class OrFinally(AfterAny):
[docs] @staticmethod def from_runner_api(proto, context): return OrFinally( TriggerFn.from_runner_api(proto.or_finally.main, context), # getattr is used as finally is a keyword in Python TriggerFn.from_runner_api( getattr(proto.or_finally, 'finally'), context))
[docs] def to_runner_api(self, context): return beam_runner_api_pb2.Trigger( or_finally=beam_runner_api_pb2.Trigger.OrFinally( main=self.triggers[0].to_runner_api(context), # dict keyword argument is used as finally is a keyword in Python **{'finally': self.triggers[1].to_runner_api(context)}))
class TriggerContext(object): def __init__(self, outer, window, clock): self._outer = outer self._window = window self._clock = clock def get_current_time(self): return self._clock.time() def set_timer(self, name, time_domain, timestamp): self._outer.set_timer(self._window, name, time_domain, timestamp) def clear_timer(self, name, time_domain): self._outer.clear_timer(self._window, name, time_domain) def add_state(self, tag, value): self._outer.add_state(self._window, tag, value) def get_state(self, tag): return self._outer.get_state(self._window, tag) def clear_state(self, tag): return self._outer.clear_state(self._window, tag) class NestedContext(object): """Namespaced context useful for defining composite triggers.""" def __init__(self, outer, prefix): self._outer = outer self._prefix = prefix def get_current_time(self): return self._outer.get_current_time() def set_timer(self, name, time_domain, timestamp): self._outer.set_timer(self._prefix + name, time_domain, timestamp) def clear_timer(self, name, time_domain): self._outer.clear_timer(self._prefix + name, time_domain) def add_state(self, tag, value): self._outer.add_state(tag.with_prefix(self._prefix), value) def get_state(self, tag): return self._outer.get_state(tag.with_prefix(self._prefix)) def clear_state(self, tag): self._outer.clear_state(tag.with_prefix(self._prefix)) # pylint: disable=unused-argument class SimpleState(with_metaclass(ABCMeta, object)): # type: ignore[misc] """Basic state storage interface used for triggering. Only timers must hold the watermark (by their timestamp). """ @abstractmethod def set_timer(self, window, name, time_domain, timestamp): pass @abstractmethod def get_window(self, window_id): pass @abstractmethod def clear_timer(self, window, name, time_domain): pass @abstractmethod def add_state(self, window, tag, value): pass @abstractmethod def get_state(self, window, tag): pass @abstractmethod def clear_state(self, window, tag): pass def at(self, window, clock): return NestedContext(TriggerContext(self, window, clock), 'trigger') class UnmergedState(SimpleState): """State suitable for use in TriggerDriver. This class must be implemented by each backend. """ @abstractmethod def set_global_state(self, tag, value): pass @abstractmethod def get_global_state(self, tag, default=None): pass # pylint: enable=unused-argument class MergeableStateAdapter(SimpleState): """Wraps an UnmergedState, tracking merged windows.""" # TODO(robertwb): A similar indirection could be used for sliding windows # or other window_fns when a single element typically belongs to many windows. WINDOW_IDS = _ValueStateTag('window_ids') def __init__(self, raw_state): self.raw_state = raw_state self.window_ids = self.raw_state.get_global_state(self.WINDOW_IDS, {}) self.counter = None def set_timer(self, window, name, time_domain, timestamp): self.raw_state.set_timer(self._get_id(window), name, time_domain, timestamp) def clear_timer(self, window, name, time_domain): for window_id in self._get_ids(window): self.raw_state.clear_timer(window_id, name, time_domain) def add_state(self, window, tag, value): if isinstance(tag, _ValueStateTag): raise ValueError( 'Merging requested for non-mergeable state tag: %r.' % tag) elif isinstance(tag, _CombiningValueStateTag): tag = tag.without_extraction() self.raw_state.add_state(self._get_id(window), tag, value) def get_state(self, window, tag): if isinstance(tag, _CombiningValueStateTag): original_tag, tag = tag, tag.without_extraction() values = [ self.raw_state.get_state(window_id, tag) for window_id in self._get_ids(window) ] if isinstance(tag, _ValueStateTag): raise ValueError( 'Merging requested for non-mergeable state tag: %r.' % tag) elif isinstance(tag, _CombiningValueStateTag): return original_tag.combine_fn.extract_output( original_tag.combine_fn.merge_accumulators(values)) elif isinstance(tag, _ListStateTag): return [v for vs in values for v in vs] elif isinstance(tag, _SetStateTag): return {v for vs in values for v in vs} elif isinstance(tag, _WatermarkHoldStateTag): return tag.timestamp_combiner_impl.combine_all(values) else: raise ValueError('Invalid tag.', tag) def clear_state(self, window, tag): for window_id in self._get_ids(window): self.raw_state.clear_state(window_id, tag) if tag is None: del self.window_ids[window] self._persist_window_ids() def merge(self, to_be_merged, merge_result): for window in to_be_merged: if window != merge_result: if window in self.window_ids: if merge_result in self.window_ids: merge_window_ids = self.window_ids[merge_result] else: merge_window_ids = self.window_ids[merge_result] = [] merge_window_ids.extend(self.window_ids.pop(window)) self._persist_window_ids() def known_windows(self): return list(self.window_ids) def get_window(self, window_id): for window, ids in self.window_ids.items(): if window_id in ids: return window raise ValueError('No window for %s' % window_id) def _get_id(self, window): if window in self.window_ids: return self.window_ids[window][0] window_id = self._get_next_counter() self.window_ids[window] = [window_id] self._persist_window_ids() return window_id def _get_ids(self, window): return self.window_ids.get(window, []) def _get_next_counter(self): if not self.window_ids: self.counter = 0 elif self.counter is None: self.counter = max(k for ids in self.window_ids.values() for k in ids) self.counter += 1 return self.counter def _persist_window_ids(self): self.raw_state.set_global_state(self.WINDOW_IDS, self.window_ids) def __repr__(self): return '\n\t'.join([repr(self.window_ids)] + repr(self.raw_state).split('\n')) def create_trigger_driver( windowing, is_batch=False, phased_combine_fn=None, clock=None): """Create the TriggerDriver for the given windowing and options.""" # TODO(robertwb): We can do more if we know elements are in timestamp # sorted order. if windowing.is_default() and is_batch: driver = BatchGlobalTriggerDriver() elif (windowing.windowfn == GlobalWindows() and (windowing.triggerfn in [AfterCount(1), Always()]) and is_batch): # Here we also just pass through all the values exactly once. driver = BatchGlobalTriggerDriver() else: driver = GeneralTriggerDriver(windowing, clock) if phased_combine_fn: # TODO(ccy): Refactor GeneralTriggerDriver to combine values eagerly using # the known phased_combine_fn here. driver = CombiningTriggerDriver(phased_combine_fn, driver) return driver class TriggerDriver(with_metaclass(ABCMeta, object)): # type: ignore[misc] """Breaks a series of bundle and timer firings into window (pane)s.""" @abstractmethod def process_elements( self, state, windowed_values, output_watermark, input_watermark=MIN_TIMESTAMP): pass @abstractmethod def process_timer( self, window_id, name, time_domain, timestamp, state, input_watermark=None): pass def process_entire_key( self, key, windowed_values, unused_output_watermark=None, unused_input_watermark=None): state = InMemoryUnmergedState() for wvalue in self.process_elements(state, windowed_values, MIN_TIMESTAMP, MIN_TIMESTAMP): yield wvalue.with_value((key, wvalue.value)) while state.timers: fired = state.get_and_clear_timers() for timer_window, (name, time_domain, fire_time) in fired: for wvalue in self.process_timer(timer_window, name, time_domain, fire_time, state): yield wvalue.with_value((key, wvalue.value)) class _UnwindowedValues(observable.ObservableMixin): """Exposes iterable of windowed values as iterable of unwindowed values.""" def __init__(self, windowed_values): super(_UnwindowedValues, self).__init__() self._windowed_values = windowed_values def __iter__(self): for wv in self._windowed_values: unwindowed_value = wv.value self.notify_observers(unwindowed_value) yield unwindowed_value def __repr__(self): return '<_UnwindowedValues of %s>' % self._windowed_values def __reduce__(self): return list, (list(self), ) def __eq__(self, other): if isinstance(other, collections.Iterable): return all( a == b for a, b in zip_longest(self, other, fillvalue=object())) else: return NotImplemented def __hash__(self): return hash(tuple(self)) def __ne__(self, other): # TODO(BEAM-5949): Needed for Python 2 compatibility. return not self == other coder_impl.FastPrimitivesCoderImpl.register_iterable_like_type( _UnwindowedValues) class BatchGlobalTriggerDriver(TriggerDriver): """Groups all received values together. """ GLOBAL_WINDOW_TUPLE = (GlobalWindow(), ) ONLY_FIRING = windowed_value.PaneInfo( is_first=True, is_last=True, timing=windowed_value.PaneInfoTiming.ON_TIME, index=0, nonspeculative_index=0) def process_elements( self, state, windowed_values, unused_output_watermark, unused_input_watermark=MIN_TIMESTAMP): yield WindowedValue( _UnwindowedValues(windowed_values), MIN_TIMESTAMP, self.GLOBAL_WINDOW_TUPLE, self.ONLY_FIRING) def process_timer( self, window_id, name, time_domain, timestamp, state, input_watermark=None): raise TypeError('Triggers never set or called for batch default windowing.') class CombiningTriggerDriver(TriggerDriver): """Uses a phased_combine_fn to process output of wrapped TriggerDriver.""" def __init__(self, phased_combine_fn, underlying): self.phased_combine_fn = phased_combine_fn self.underlying = underlying def process_elements( self, state, windowed_values, output_watermark, input_watermark=MIN_TIMESTAMP): uncombined = self.underlying.process_elements( state, windowed_values, output_watermark, input_watermark) for output in uncombined: yield output.with_value(self.phased_combine_fn.apply(output.value)) def process_timer( self, window_id, name, time_domain, timestamp, state, input_watermark=None): uncombined = self.underlying.process_timer( window_id, name, time_domain, timestamp, state, input_watermark) for output in uncombined: yield output.with_value(self.phased_combine_fn.apply(output.value)) class GeneralTriggerDriver(TriggerDriver): """Breaks a series of bundle and timer firings into window (pane)s. Suitable for all variants of Windowing. """ ELEMENTS = _ListStateTag('elements') TOMBSTONE = _CombiningValueStateTag('tombstone', combiners.CountCombineFn()) INDEX = _CombiningValueStateTag('index', combiners.CountCombineFn()) NONSPECULATIVE_INDEX = _CombiningValueStateTag( 'nonspeculative_index', combiners.CountCombineFn()) def __init__(self, windowing, clock): self.clock = clock self.allowed_lateness = windowing.allowed_lateness self.window_fn = windowing.windowfn self.timestamp_combiner_impl = TimestampCombiner.get_impl( windowing.timestamp_combiner, self.window_fn) # pylint: disable=invalid-name self.WATERMARK_HOLD = _WatermarkHoldStateTag( 'watermark', self.timestamp_combiner_impl) # pylint: enable=invalid-name self.trigger_fn = windowing.triggerfn self.accumulation_mode = windowing.accumulation_mode self.is_merging = True def process_elements( self, state, windowed_values, output_watermark, input_watermark=MIN_TIMESTAMP): if self.is_merging: state = MergeableStateAdapter(state) windows_to_elements = collections.defaultdict(list) for wv in windowed_values: for window in wv.windows: # ignore expired windows if input_watermark > window.end + self.allowed_lateness: continue windows_to_elements[window].append((wv.value, wv.timestamp)) # First handle merging. if self.is_merging: old_windows = set(state.known_windows()) all_windows = old_windows.union(list(windows_to_elements)) if all_windows != old_windows: merged_away = {} class TriggerMergeContext(WindowFn.MergeContext): def merge(_, to_be_merged, merge_result): # pylint: disable=no-self-argument for window in to_be_merged: if window != merge_result: merged_away[window] = merge_result # Clear state associated with PaneInfo since it is # not preserved across merges. state.clear_state(window, self.INDEX) state.clear_state(window, self.NONSPECULATIVE_INDEX) state.merge(to_be_merged, merge_result) # using the outer self argument. self.trigger_fn.on_merge( to_be_merged, merge_result, state.at(merge_result, self.clock)) self.window_fn.merge(TriggerMergeContext(all_windows)) merged_windows_to_elements = collections.defaultdict(list) for window, values in windows_to_elements.items(): while window in merged_away: window = merged_away[window] merged_windows_to_elements[window].extend(values) windows_to_elements = merged_windows_to_elements for window in merged_away: state.clear_state(window, self.WATERMARK_HOLD) # Next handle element adding. for window, elements in windows_to_elements.items(): if state.get_state(window, self.TOMBSTONE): continue # Add watermark hold. # TODO(ccy): Add late data and garbage-collection hold support. output_time = self.timestamp_combiner_impl.merge( window, ( element_output_time for element_output_time in ( self.timestamp_combiner_impl.assign_output_time( window, timestamp) for unused_value, timestamp in elements) if element_output_time >= output_watermark)) if output_time is not None: state.add_state(window, self.WATERMARK_HOLD, output_time) context = state.at(window, self.clock) for value, unused_timestamp in elements: state.add_state(window, self.ELEMENTS, value) self.trigger_fn.on_element(value, window, context) # Maybe fire this window. if self.trigger_fn.should_fire(TimeDomain.WATERMARK, input_watermark, window, context): finished = self.trigger_fn.on_fire(input_watermark, window, context) yield self._output( window, finished, state, input_watermark, output_watermark, False) def process_timer( self, window_id, unused_name, time_domain, timestamp, state, input_watermark=None): if input_watermark is None: input_watermark = timestamp if self.is_merging: state = MergeableStateAdapter(state) window = state.get_window(window_id) if state.get_state(window, self.TOMBSTONE): return if time_domain in (TimeDomain.WATERMARK, TimeDomain.REAL_TIME): if not self.is_merging or window in state.known_windows(): context = state.at(window, self.clock) if self.trigger_fn.should_fire(time_domain, timestamp, window, context): finished = self.trigger_fn.on_fire(timestamp, window, context) yield self._output( window, finished, state, input_watermark, timestamp, time_domain == TimeDomain.WATERMARK) else: raise Exception('Unexpected time domain: %s' % time_domain) def _output( self, window, finished, state, input_watermark, output_watermark, maybe_ontime): """Output window and clean up if appropriate.""" index = state.get_state(window, self.INDEX) state.add_state(window, self.INDEX, 1) if output_watermark <= window.max_timestamp(): nonspeculative_index = -1 timing = windowed_value.PaneInfoTiming.EARLY if state.get_state(window, self.NONSPECULATIVE_INDEX): nonspeculative_index = state.get_state( window, self.NONSPECULATIVE_INDEX) state.add_state(window, self.NONSPECULATIVE_INDEX, 1) _LOGGER.warning( 'Watermark moved backwards in time ' 'or late data moved window end forward.') else: nonspeculative_index = state.get_state(window, self.NONSPECULATIVE_INDEX) state.add_state(window, self.NONSPECULATIVE_INDEX, 1) timing = ( windowed_value.PaneInfoTiming.ON_TIME if maybe_ontime and nonspeculative_index == 0 else windowed_value.PaneInfoTiming.LATE) pane_info = windowed_value.PaneInfo( index == 0, finished, timing, index, nonspeculative_index) values = state.get_state(window, self.ELEMENTS) if finished: # TODO(robertwb): allowed lateness state.clear_state(window, self.ELEMENTS) state.add_state(window, self.TOMBSTONE, 1) elif self.accumulation_mode == AccumulationMode.DISCARDING: state.clear_state(window, self.ELEMENTS) timestamp = state.get_state(window, self.WATERMARK_HOLD) if timestamp is None: # If no watermark hold was set, output at end of window. timestamp = window.max_timestamp() elif input_watermark < window.end and self.trigger_fn.has_ontime_pane(): # Hold the watermark in case there is an empty pane that needs to be fired # at the end of the window. pass else: state.clear_state(window, self.WATERMARK_HOLD) return WindowedValue(values, timestamp, (window, ), pane_info) class InMemoryUnmergedState(UnmergedState): """In-memory implementation of UnmergedState. Used for batch and testing. """ def __init__(self, defensive_copy=True): # TODO(robertwb): Skip defensive_copy in production if it's too expensive. self.timers = collections.defaultdict(dict) self.state = collections.defaultdict(lambda: collections.defaultdict(list)) self.global_state = {} self.defensive_copy = defensive_copy def copy(self): cloned_object = InMemoryUnmergedState(defensive_copy=self.defensive_copy) cloned_object.timers = copy.deepcopy(self.timers) cloned_object.global_state = copy.deepcopy(self.global_state) for window in self.state: for tag in self.state[window]: cloned_object.state[window][tag] = copy.copy(self.state[window][tag]) return cloned_object def set_global_state(self, tag, value): assert isinstance(tag, _ValueStateTag) if self.defensive_copy: value = copy.deepcopy(value) self.global_state[tag.tag] = value def get_global_state(self, tag, default=None): return self.global_state.get(tag.tag, default) def set_timer(self, window, name, time_domain, timestamp): self.timers[window][(name, time_domain)] = timestamp def clear_timer(self, window, name, time_domain): self.timers[window].pop((name, time_domain), None) if not self.timers[window]: del self.timers[window] def get_window(self, window_id): return window_id def add_state(self, window, tag, value): if self.defensive_copy: value = copy.deepcopy(value) if isinstance(tag, _ValueStateTag): self.state[window][tag.tag] = value elif isinstance(tag, _CombiningValueStateTag): # TODO(robertwb): Store merged accumulators. self.state[window][tag.tag].append(value) elif isinstance(tag, _ListStateTag): self.state[window][tag.tag].append(value) elif isinstance(tag, _SetStateTag): self.state[window][tag.tag].append(value) elif isinstance(tag, _WatermarkHoldStateTag): self.state[window][tag.tag].append(value) else: raise ValueError('Invalid tag.', tag) def get_state(self, window, tag): values = self.state[window][tag.tag] if isinstance(tag, _ValueStateTag): return values elif isinstance(tag, _CombiningValueStateTag): return tag.combine_fn.apply(values) elif isinstance(tag, _ListStateTag): return values elif isinstance(tag, _SetStateTag): return values elif isinstance(tag, _WatermarkHoldStateTag): return tag.timestamp_combiner_impl.combine_all(values) else: raise ValueError('Invalid tag.', tag) def clear_state(self, window, tag): self.state[window].pop(tag.tag, None) if not self.state[window]: self.state.pop(window, None) def get_timers( self, clear=False, watermark=MAX_TIMESTAMP, processing_time=None): """Gets expired timers and reports if there are any realtime timers set per state. Expiration is measured against the watermark for event-time timers, and against a wall clock for processing-time timers. """ expired = [] has_realtime_timer = False for window, timers in list(self.timers.items()): for (name, time_domain), timestamp in list(timers.items()): if time_domain == TimeDomain.REAL_TIME: time_marker = processing_time has_realtime_timer = True elif time_domain == TimeDomain.WATERMARK: time_marker = watermark else: _LOGGER.error( 'TimeDomain error: No timers defined for time domain %s.', time_domain) if timestamp <= time_marker: expired.append((window, (name, time_domain, timestamp))) if clear: del timers[(name, time_domain)] if not timers and clear: del self.timers[window] return expired, has_realtime_timer def get_and_clear_timers(self, watermark=MAX_TIMESTAMP): return self.get_timers(clear=True, watermark=watermark)[0] def get_earliest_hold(self): earliest_hold = MAX_TIMESTAMP for unused_window, tagged_states in iteritems(self.state): # TODO(BEAM-2519): currently, this assumes that the watermark hold tag is # named "watermark". This is currently only true because the only place # watermark holds are set is in the GeneralTriggerDriver, where we use # this name. We should fix this by allowing enumeration of the tag types # used in adding state. if 'watermark' in tagged_states and tagged_states['watermark']: hold = min(tagged_states['watermark']) - TIME_GRANULARITY earliest_hold = min(earliest_hold, hold) return earliest_hold def __repr__(self): state_str = '\n'.join( '%s: %s' % (key, dict(state)) for key, state in self.state.items()) return 'timers: %s\nstate: %s' % (dict(self.timers), state_str)