Class TimerUtils

java.lang.Object
org.apache.beam.runners.spark.util.TimerUtils

public class TimerUtils extends Object
Utility class for handling timers in the Spark runner. It manages timer operations for stateful processing.
  • Field Details

    • EMPTY_BYTE_ARRAY

      public static final byte[] EMPTY_BYTE_ARRAY
    • TIMER_MARKER

      public static final TimerUtils.TimerMarker TIMER_MARKER
      Constant marker used to identify timer values in transformations.
  • Constructor Details

    • TimerUtils

      public TimerUtils()
  • Method Details

    • triggerExpiredTimers

      public static <W extends BoundedWindow> void triggerExpiredTimers(SparkTimerInternals sparkTimerInternals, WindowingStrategy<?,W> windowingStrategy, AbstractInOutIterator<?,?,?> abstractInOutIterator)
      Fires all expired timers using the provided iterator.

      Retrieves the expired timers from SparkTimerInternals and fires each one. The AbstractInOutIterator.fireTimer(org.apache.beam.runners.core.TimerInternals.TimerData) method deletes each timer after processing it, so no separate cleanup step is needed.

      Type Parameters:
      W - Window type
      Parameters:
      sparkTimerInternals - Source of timer data
      windowingStrategy - Used to determine which timers are expired
      abstractInOutIterator - Iterator that processes and then removes the timers
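      The fire-then-delete loop described above can be modeled in plain Java. The sketch below is hypothetical: TimerData, SimpleTimerInternals, and TimerFiringIterator are simplified stand-ins, not the Beam or Spark runner classes. It also assumes a timer is expired once the watermark has reached its timestamp, whereas the real method consults the WindowingStrategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-in for a timer: an id plus an event-time timestamp in millis.
final class TimerData {
    final String timerId;
    final long timestamp;

    TimerData(String timerId, long timestamp) {
        this.timerId = timerId;
        this.timestamp = timestamp;
    }
}

// Hypothetical stand-in for SparkTimerInternals: holds timers and a watermark.
final class SimpleTimerInternals {
    private final TreeMap<String, TimerData> timers = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    void setTimer(TimerData t) { timers.put(t.timerId, t); }
    void deleteTimer(TimerData t) { timers.remove(t.timerId); }
    void advanceWatermark(long newWatermark) { watermark = newWatermark; }
    int size() { return timers.size(); }

    // Assumption: a timer is expired once the watermark has reached its timestamp.
    List<TimerData> expiredTimers() {
        List<TimerData> expired = new ArrayList<>();
        for (TimerData t : timers.values()) {
            if (t.timestamp <= watermark) {
                expired.add(t);
            }
        }
        return expired;
    }
}

// Hypothetical stand-in for AbstractInOutIterator: fireTimer processes, then deletes.
final class TimerFiringIterator {
    final SimpleTimerInternals internals;
    final List<String> fired = new ArrayList<>();

    TimerFiringIterator(SimpleTimerInternals internals) { this.internals = internals; }

    void fireTimer(TimerData t) {
        fired.add(t.timerId);      // the timer callback would run here
        internals.deleteTimer(t);  // the timer is removed after processing
    }
}

final class TriggerExpiredTimersSketch {
    static void triggerExpiredTimers(SimpleTimerInternals internals, TimerFiringIterator it) {
        // expiredTimers() returns a fresh list, so deleting inside fireTimer is safe.
        for (TimerData t : internals.expiredTimers()) {
            it.fireTimer(t);
        }
    }
}
```

      Collecting the expired timers into a separate list before firing them keeps the deletions inside fireTimer from interfering with the iteration.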
    • dropExpiredTimers

      public static <W extends BoundedWindow> void dropExpiredTimers(SparkTimerInternals sparkTimerInternals, WindowingStrategy<?,W> windowingStrategy)
    • toPeriodicDStream

      public static <KeyT, ValueT> org.apache.spark.streaming.api.java.JavaDStream<WindowedValue<KV<KeyT,ValueT>>> toPeriodicDStream(org.apache.spark.streaming.api.java.JavaDStream<WindowedValue<KV<KeyT,ValueT>>> originalLRDD)
      Converts a standard DStream into a periodic DStream that ensures all keys are processed in every micro-batch, even if they don't receive new data.

      This method addresses a core problem in stateful processing with Spark Streaming: timer events must be processed for every key in every batch, whether or not that key received new data. Without this mechanism, a key would be processed only in batches that contain data for it, so timers for inactive keys would not fire at the appropriate times.

      Implementation details:

      1. First, it extracts all unique keys from the input DStream and marks them with a special timer key marker.
      2. Then, it uses Spark's mapWithState operation to maintain a persistent set of all keys that have ever been seen.
      3. For each batch, it takes a snapshot of this state (all known keys).
      4. It then transforms the original DStream by combining it with synthetic events for all known keys.
      5. These synthetic events use the special TimerUtils.WindowedValueForTimerMarker class with the TIMER_MARKER to allow downstream operations to distinguish them from regular data events.
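      The five steps above can be modeled without Spark by treating each micro-batch as a list of key-value pairs. In this hypothetical sketch, a LinkedHashSet plays the role of the mapWithState state, and a locally defined sentinel named TIMER_MARKER (mirroring the constant above, but not the real one) tags the synthetic events. The windowing metadata carried by WindowedValue is omitted.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class PeriodicBatchSketch {
    // Hypothetical sentinel standing in for TimerUtils.TIMER_MARKER.
    static final Object TIMER_MARKER = new Object();

    // Stand-in for the mapWithState state: every key ever seen, in first-seen order.
    private final Set<Object> knownKeys = new LinkedHashSet<>();

    // Processes one micro-batch: original events plus one marker event per known key.
    List<Map.Entry<Object, Object>> nextBatch(List<Map.Entry<Object, Object>> batch) {
        // Steps 1-2: extract this batch's keys and add them to the persistent key set.
        for (Map.Entry<Object, Object> e : batch) {
            knownKeys.add(e.getKey());
        }
        // Step 3: snapshot all known keys for this batch.
        List<Object> snapshot = new ArrayList<>(knownKeys);
        // Steps 4-5: combine the original data with marker events for every known key.
        List<Map.Entry<Object, Object>> out = new ArrayList<>(batch);
        for (Object key : snapshot) {
            out.add(new SimpleEntry<>(key, TIMER_MARKER));
        }
        return out;
    }
}
```

      With this model, a batch that carries no data for a previously seen key still emits a marker event for that key, which is exactly what lets its timers be evaluated.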

      When processed downstream, the synthetic key-value pairs trigger evaluation of pending timers for every known key, including keys that received no new data in the current batch. As a result, timers fire on schedule for all keys, not only for keys that happen to receive data.
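      Downstream, a stateful step must separate synthetic events from real data. The following is a minimal sketch that assumes the marker is a singleton compared by identity; the runner itself may instead check for the TimerUtils.WindowedValueForTimerMarker type. Regular values are passed on as element input, while a marker only triggers a timer check for its key. DownstreamStep and onTimerCheck are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DownstreamStep {
    // Hypothetical sentinel standing in for TimerUtils.TIMER_MARKER.
    static final Object TIMER_MARKER = new Object();

    final Map<String, List<Object>> elementsByKey = new LinkedHashMap<>();
    final List<String> timerChecks = new ArrayList<>();

    void process(String key, Object value) {
        if (value == TIMER_MARKER) {
            // Synthetic event: carries no user data, but lets this key's timers be evaluated.
            onTimerCheck(key);
        } else {
            // Regular data event: hand the value to the per-key stateful logic.
            elementsByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
    }

    private void onTimerCheck(String key) {
        // A real runner would fire the expired timers for this key here.
        timerChecks.add(key);
    }
}
```

      Comparing by identity guarantees that no user value can be mistaken for the marker, because only the sentinel instance itself matches.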

      This is required for windowing, sessions, and other time-based operations that depend on timers firing at the correct moment, even when a key receives no new data.

      Type Parameters:
      KeyT - The type of keys
      ValueT - The type of values
      Parameters:
      originalLRDD - The original DStream of windowed key-value pairs to be enhanced with periodic timer events
      Returns:
      A new DStream that includes periodic events for all known keys, ensuring timer processing for every key in each batch