apache_beam.transforms.trigger module

Support for Apache Beam triggers.

Triggers control when, in processing time, windows get emitted.

class apache_beam.transforms.trigger.AccumulationMode[source]

Bases: object

Controls what to do with data when a trigger fires multiple times.

DISCARDING = 1
ACCUMULATING = 2
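The difference between the two modes is easiest to see when a trigger fires several times on one window. The following is a minimal sketch in plain Python, not Beam code: `simulate_panes` is a hypothetical helper, and each inner list in `firings` stands for the elements that arrived since the previous firing. It assumes the usual Beam semantics, where DISCARDING emits only the new elements and ACCUMULATING re-emits everything seen so far.

```python
def simulate_panes(firings, accumulating):
    """Return the pane contents emitted at each firing of one window.

    Toy model only: `firings` is a list of lists, one per trigger firing,
    holding the elements that arrived since the previous firing.
    """
    panes = []
    buffer = []
    for new_elements in firings:
        buffer.extend(new_elements)
        panes.append(list(buffer))
        if not accumulating:
            # DISCARDING: drop what was just emitted.
            buffer.clear()
    return panes


firings = [[1, 2], [3], [4, 5]]
discarding = simulate_panes(firings, accumulating=False)
accumulating = simulate_panes(firings, accumulating=True)
print(discarding)    # [[1, 2], [3], [4, 5]]
print(accumulating)  # [[1, 2], [1, 2, 3], [1, 2, 3, 4, 5]]
```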
class apache_beam.transforms.trigger.TriggerFn[source]

Bases: object

A TriggerFn determines when the panes of a window are emitted.

See https://beam.apache.org/documentation/programming-guide/#triggers

on_element(element, window, context)[source]

Called when a new element arrives in a window.

Parameters:
  • element – the element being added
  • window – the window to which the element is being added
  • context – a context (e.g. a TriggerContext instance) for managing state and setting timers
on_merge(to_be_merged, merge_result, context)[source]

Called when multiple windows are merged.

Parameters:
  • to_be_merged – the set of windows to be merged
  • merge_result – the window into which the windows are being merged
  • context – a context (e.g. a TriggerContext instance) for managing state and setting timers
should_fire(time_domain, timestamp, window, context)[source]

Whether this trigger should cause the window to fire.

Parameters:
  • time_domain – WATERMARK for event-time timers and REAL_TIME for processing-time timers.
  • timestamp – for time_domain WATERMARK, (a lower bound on) the watermark of the system; for time_domain REAL_TIME, the timestamp of the processing-time timer.
  • window – the window whose trigger is being considered
  • context – a context (e.g. a TriggerContext instance) for managing state and setting timers
Returns:

whether this trigger should cause a firing

has_ontime_pane()[source]

Whether this trigger creates an empty pane even if there are no elements.

Returns: True if this trigger guarantees that there will always be an ON_TIME pane even if there are no elements in that pane.
on_fire(watermark, window, context)[source]

Called when a trigger actually fires.

Parameters:
  • watermark – (a lower bound on) the watermark of the system
  • window – the window whose trigger is being fired
  • context – a context (e.g. a TriggerContext instance) for managing state and setting timers
Returns:

whether this trigger is finished

reset(window, context)[source]

Clear any state and timers used by this TriggerFn.

may_lose_data(unused_windowing)[source]

Returns whether or not this trigger could cause data loss.

A trigger can cause data loss in the following scenarios:

  • The trigger has a chance to finish. For instance, AfterWatermark() without a late trigger would cause all late data to be lost. This scenario is only accounted for if the windowing strategy allows late data. Otherwise, the trigger is not responsible for the data loss.
  • The trigger condition may not be met. For instance, Repeatedly(AfterCount(N)) may not fire due to N not being met. This is only accounted for if the condition itself led to data loss. Repeatedly(AfterCount(1)) is safe, since it would only not fire if there is no data to lose, but Repeatedly(AfterCount(2)) can cause data loss if there is only one record.

Note that this only returns the potential for loss. It does not mean that there will be data loss. It also only accounts for loss related to the trigger, not other potential causes.

Parameters: windowing – The Windowing that this trigger belongs to. It does not need to be the top-level trigger.
Returns:

The DataLossReason. If there is no potential loss, DataLossReason.NO_POTENTIAL_LOSS is returned. Otherwise, all the potential reasons are returned as a single value. For instance, if data loss can result from finishing or not having the condition met, the result will be DataLossReason.MAY_FINISH|CONDITION_NOT_GUARANTEED.
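Returning several reasons "as a single value" is the behavior of a flag enumeration. The sketch below uses the standard library's `enum.Flag` to show how such values combine and how a caller might test them. `LossReason` is a hypothetical stand-in written for this example, not Beam's DataLossReason; only the three member names are taken from the text above.

```python
from enum import Flag, auto


class LossReason(Flag):
    """Hypothetical stand-in for illustration; not Beam's DataLossReason."""
    NO_POTENTIAL_LOSS = 0
    MAY_FINISH = auto()
    CONDITION_NOT_GUARANTEED = auto()


# Two reasons combined into one value with the bitwise OR operator.
combined = LossReason.MAY_FINISH | LossReason.CONDITION_NOT_GUARANTEED

# Membership tests pick out individual reasons from the combined value.
may_finish = LossReason.MAY_FINISH in combined

# A trigger is safe only when no flag at all is set.
is_safe = combined == LossReason.NO_POTENTIAL_LOSS
```

Because NO_POTENTIAL_LOSS is the zero value, OR-ing it with any other reason leaves that reason unchanged.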
static from_runner_api(proto, context)[source]
to_runner_api(unused_context)[source]
class apache_beam.transforms.trigger.DefaultTrigger[source]

Bases: apache_beam.transforms.trigger.TriggerFn

Semantically Repeatedly(AfterWatermark()), but more optimized.

on_element(element, window, context)[source]
on_merge(to_be_merged, merge_result, context)[source]
should_fire(time_domain, watermark, window, context)[source]
on_fire(watermark, window, context)[source]
reset(window, context)[source]
may_lose_data(unused_windowing)[source]
static from_runner_api(proto, context)[source]
to_runner_api(unused_context)[source]
has_ontime_pane()[source]
class apache_beam.transforms.trigger.AfterProcessingTime(delay=0)[source]

Bases: apache_beam.transforms.trigger.TriggerFn

Fire exactly once after a specified delay from processing time.

AfterProcessingTime is experimental. No backwards compatibility guarantees.

Initialize a processing time trigger with a delay in seconds.

on_element(element, window, context)[source]
on_merge(to_be_merged, merge_result, context)[source]
should_fire(time_domain, timestamp, window, context)[source]
on_fire(timestamp, window, context)[source]
reset(window, context)[source]
may_lose_data(unused_windowing)[source]
static from_runner_api(proto, context)[source]
to_runner_api(context)[source]
has_ontime_pane()[source]
class apache_beam.transforms.trigger.AfterWatermark(early=None, late=None)[source]

Bases: apache_beam.transforms.trigger.TriggerFn

Fire exactly once when the watermark passes the end of the window.

Parameters:
  • early – if not None, a speculative trigger to repeatedly evaluate before the watermark passes the end of the window
  • late – if not None, a speculative trigger to repeatedly evaluate after the watermark passes the end of the window
LATE_TAG = CombiningValueStateTag(is_late, CallableWrapperCombineFn(<built-in function any>))
is_late(context)[source]
on_element(element, window, context)[source]
on_merge(to_be_merged, merge_result, context)[source]
should_fire(time_domain, watermark, window, context)[source]
on_fire(watermark, window, context)[source]
reset(window, context)[source]
may_lose_data(windowing)[source]

May cause data loss if the windowing allows lateness and either:

  • The late trigger is not set
  • The late trigger may cause data loss.

The second case is equivalent to Repeatedly(late).may_lose_data(windowing)
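The rule above reduces to a small decision function. This is a sketch of the stated logic only, not Beam's implementation: `after_watermark_may_lose_data` and its parameters are hypothetical names, allowed lateness is taken as a number of seconds, and the late trigger's own verdict is passed in as a boolean (or None when no late trigger is set).

```python
def after_watermark_may_lose_data(allowed_lateness, late_may_lose_data=None):
    """Toy version of the rule described above.

    allowed_lateness: seconds of lateness the windowing permits.
    late_may_lose_data: None if no late trigger is set, otherwise whether
        the (repeated) late trigger itself may lose data.
    """
    if allowed_lateness <= 0:
        # No late data is admitted, so the trigger cannot be blamed for loss.
        return False
    if late_may_lose_data is None:
        # Late data is admitted but nothing will ever fire for it.
        return True
    return late_may_lose_data
```

For example, `after_watermark_may_lose_data(60)` is True, while `after_watermark_may_lose_data(60, late_may_lose_data=False)` is False.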

static from_runner_api(proto, context)[source]
to_runner_api(context)[source]
has_ontime_pane()[source]
class apache_beam.transforms.trigger.AfterCount(count)[source]

Bases: apache_beam.transforms.trigger.TriggerFn

Fire when there are at least count elements in this window pane.

AfterCount is experimental. No backwards compatibility guarantees.
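To make the TriggerFn callbacks concrete, here is a toy count trigger in plain Python. It is a sketch under simplifying assumptions, not Beam's AfterCount: `ToyAfterCount` and `first_pane` are hypothetical names, state is kept on the object rather than in a trigger context, and the window, watermark, and time-domain arguments are omitted. It also assumes the trigger reports itself as finished after its first firing.

```python
class ToyAfterCount:
    """Toy count trigger; state lives on the object, not in a Beam context."""

    def __init__(self, count):
        if count <= 0:
            raise ValueError("count must be positive")
        self.count = count
        self.seen = 0

    def on_element(self, element):
        # Called once per arriving element.
        self.seen += 1

    def should_fire(self):
        return self.seen >= self.count

    def on_fire(self):
        # Returns whether the trigger is finished; assumed True here.
        return True

    def reset(self):
        self.seen = 0


def first_pane(elements, count):
    """Feed elements to the toy trigger and return (pane, finished)."""
    trigger = ToyAfterCount(count)
    pane = []
    for element in elements:
        pane.append(element)
        trigger.on_element(element)
        if trigger.should_fire():
            finished = trigger.on_fire()
            trigger.reset()
            return pane, finished
    # The count was never reached, so nothing was emitted.
    return None, False


print(first_pane("abcde", 3))  # (['a', 'b', 'c'], True)
print(first_pane("ab", 3))     # (None, False)
```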

COUNT_TAG = CombiningValueStateTag(count, <apache_beam.transforms.combiners.CountCombineFn object>)
on_element(element, window, context)[source]
on_merge(to_be_merged, merge_result, context)[source]
should_fire(time_domain, watermark, window, context)[source]
on_fire(watermark, window, context)[source]
reset(window, context)[source]
may_lose_data(unused_windowing)[source]
static from_runner_api(proto, unused_context)[source]
to_runner_api(unused_context)[source]
has_ontime_pane()[source]
class apache_beam.transforms.trigger.Repeatedly(underlying)[source]

Bases: apache_beam.transforms.trigger.TriggerFn

Repeatedly invoke the given trigger, never finishing.
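A rough way to picture Repeatedly wrapping a count-based trigger is a loop that emits a pane each time the count is reached and then starts counting again. The function below is a plain-Python sketch, not Beam code; `repeatedly_after_count` is a hypothetical name, and it models only element counts, ignoring windows and time.

```python
def repeatedly_after_count(elements, n):
    """Toy model of Repeatedly(AfterCount(n)) over a single window.

    Returns (panes, leftover): the panes emitted by count-based firings
    and the elements still buffered when the input ends.
    """
    panes = []
    pending = []
    for element in elements:
        pending.append(element)
        if len(pending) >= n:
            # The underlying trigger fires; start over for the next pane.
            panes.append(pending)
            pending = []
    return panes, pending


panes, leftover = repeatedly_after_count([1, 2, 3, 4, 5], 2)
print(panes)     # [[1, 2], [3, 4]]
print(leftover)  # [5]
```

The leftover element never satisfies the count condition on its own. What a runner does with it when the window expires is outside the scope of this sketch.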

on_element(element, window, context)[source]
on_merge(to_be_merged, merge_result, context)[source]
should_fire(time_domain, watermark, window, context)[source]
on_fire(watermark, window, context)[source]
reset(window, context)[source]
may_lose_data(windowing)[source]

Repeatedly may only lose data if the underlying trigger may not have its condition met.

For underlying triggers that may finish, Repeatedly overrides that behavior.

static from_runner_api(proto, context)[source]
to_runner_api(context)[source]
has_ontime_pane()[source]
class apache_beam.transforms.trigger.AfterAny(*triggers)[source]

Bases: apache_beam.transforms.trigger._ParallelTriggerFn

Fires when any subtrigger fires.

Also finishes when any subtrigger finishes.

combine_op()

Return True if bool(x) is True for any x in the iterable.

If the iterable is empty, return False.

may_lose_data(windowing)[source]
class apache_beam.transforms.trigger.AfterAll(*triggers)[source]

Bases: apache_beam.transforms.trigger._ParallelTriggerFn

Fires when all subtriggers have fired.

Also finishes when all subtriggers have finished.

combine_op()

Return True if bool(x) is True for all values x in the iterable.

If the iterable is empty, return True.
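The combine_op descriptions for AfterAny and AfterAll match Python's built-in `any` and `all`. The sketch below contrasts the two, assuming that combine_op is applied to the per-subtrigger firing decisions; the function names are hypothetical and the inputs are plain booleans rather than real triggers.

```python
def after_any_should_fire(subtrigger_results):
    """AfterAny-style combination: one True result is enough."""
    return any(subtrigger_results)


def after_all_should_fire(subtrigger_results):
    """AfterAll-style combination: every result must be True."""
    return all(subtrigger_results)


results = [False, True]
print(after_any_should_fire(results))  # True
print(after_all_should_fire(results))  # False

# Edge case noted in the docstrings: an empty iterable.
print(after_any_should_fire([]))  # False
print(after_all_should_fire([]))  # True
```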

may_lose_data(windowing)[source]
class apache_beam.transforms.trigger.AfterEach(*triggers)[source]

Bases: apache_beam.transforms.trigger.TriggerFn

INDEX_TAG = CombiningValueStateTag(index, CallableWrapperCombineFn(<function AfterEach.<lambda>>))
on_element(element, window, context)[source]
on_merge(to_be_merged, merge_result, context)[source]
should_fire(time_domain, watermark, window, context)[source]
on_fire(watermark, window, context)[source]
reset(window, context)[source]
may_lose_data(windowing)[source]
static from_runner_api(proto, context)[source]
to_runner_api(context)[source]
has_ontime_pane()[source]
class apache_beam.transforms.trigger.OrFinally(*triggers)[source]

Bases: apache_beam.transforms.trigger.AfterAny

static from_runner_api(proto, context)[source]
to_runner_api(context)[source]