Custom window patterns

The samples on this page demonstrate common custom window patterns. You can create custom windows with WindowFn functions. For more information, see the programming guide section on windowing.

Note: Custom merging windows aren’t supported in Python (with fnapi).

Using data to dynamically set session window gaps

You can modify the assignWindows function to use data-driven gaps, then window incoming data into sessions.

Inside the assignWindows function, read the incoming element through WindowFn.AssignContext.element(). The original, fixed-duration assignWindows function is:


public Collection<IntervalWindow> assignWindows(WindowFn.AssignContext c) {

  // Assign each element into a window from its timestamp until gapDuration in the
  // future.  Overlapping windows (representing elements within gapDuration of
  // each other) will be merged.
  return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
}
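The merge step mentioned in the comments above can be illustrated without Beam. The following is a minimal plain-Java sketch, not Beam's implementation: the class name SessionMergeSketch and the method sessions are hypothetical, timestamps are plain millisecond offsets, and two windows are assumed to merge only when they strictly overlap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical plain-Java model of session merging; not Beam's implementation. */
final class SessionMergeSketch {
  private SessionMergeSketch() {}

  /**
   * Gives each timestamp a window [t, t + gapMillis) and merges windows that
   * overlap. Returns the merged windows as {start, end} pairs.
   */
  static List<long[]> sessions(long[] timestampsMillis, long gapMillis) {
    long[] sorted = timestampsMillis.clone();
    Arrays.sort(sorted);
    List<long[]> merged = new ArrayList<>();
    for (long t : sorted) {
      long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
      if (last != null && t < last[1]) {
        // The new window starts before the current session ends: extend it.
        last[1] = Math.max(last[1], t + gapMillis);
      } else {
        // No overlap: this timestamp opens a new session.
        merged.add(new long[] {t, t + gapMillis});
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    for (long[] w : sessions(new long[] {0, 2_000, 9_000, 12_000}, 10_000)) {
      System.out.println("[" + w[0] + ".." + w[1] + ")"); // prints [0..22000)
    }
  }
}
```

With a 10-second gap, four events at 0, 2, 9, and 12 seconds collapse into a single session, while two events 12 seconds apart stay in separate sessions.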

Creating data-driven gaps

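To create data-driven gaps, change the assignWindows function so that it reads the gap from each element and falls back to a default value when the element carries no gap. For example, the following function assigns each element to a window that starts at the element's timestamp and lasts for the element's own gap, or for gapDuration if the gap is missing or can't be parsed: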

@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
  // Assign each element into a window from its timestamp until its gap in the
  // future. The gap (in seconds) is read from the element's "gap" field and
  // falls back to gapDuration. Overlapping windows will be merged.
  Duration dataDrivenGap;
  TableRow message = c.element();

  try {
    dataDrivenGap = Duration.standardSeconds(Long.parseLong(message.get("gap").toString()));
  } catch (Exception e) {
    dataDrivenGap = gapDuration;
  }
  return Arrays.asList(new IntervalWindow(c.timestamp(), dataDrivenGap));
}
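The fallback logic can be tried in isolation. The sketch below is a plain-Java approximation under stated assumptions: GapResolver and resolveGap are hypothetical names, a java.util.Map stands in for the TableRow, and java.time.Duration replaces the Joda-Time Duration used in the pipeline. Unlike the snippet above, it checks for a missing field explicitly and catches only NumberFormatException.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that resolves a session gap from a row, outside of Beam. */
final class GapResolver {
  private GapResolver() {}

  /**
   * Returns the gap stored under "gap" (in seconds), or defaultGap when the
   * field is missing or is not a valid long.
   */
  static Duration resolveGap(Map<String, ?> row, Duration defaultGap) {
    Object raw = row.get("gap");
    if (raw == null) {
      return defaultGap; // No gap attribute on this element.
    }
    try {
      return Duration.ofSeconds(Long.parseLong(raw.toString()));
    } catch (NumberFormatException e) {
      return defaultGap; // Malformed gap value.
    }
  }

  public static void main(String[] args) {
    Duration defaultGap = Duration.ofSeconds(10);

    Map<String, Object> withGap = new HashMap<>();
    withGap.put("user", "user-1");
    withGap.put("gap", "5");

    Map<String, Object> withoutGap = new HashMap<>();
    withoutGap.put("user", "user-2");

    System.out.println(resolveGap(withGap, defaultGap)); // prints PT5S
    System.out.println(resolveGap(withoutGap, defaultGap)); // prints PT10S
  }
}
```

Catching only the parse failure keeps unrelated errors visible, whereas the broader catch in the pipeline snippet treats any failure as "use the default".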

Then, store the default gapDuration as a field of the custom windowing function:

public static class DynamicSessions extends WindowFn<TableRow, IntervalWindow> {
  /** Duration of the gaps between sessions. */
  private final Duration gapDuration;

  /** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */
  private DynamicSessions(Duration gapDuration) {
    this.gapDuration = gapDuration;
  }

  // The assignWindows override and the remaining WindowFn methods are omitted here.
}

Windowing messages into sessions

After creating data-driven gaps, you can window incoming data into the new, custom sessions.

First, add a factory method that creates the windowing function with a default gap duration:

/** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */
public static DynamicSessions withDefaultGapDuration(Duration gapDuration) {
  return new DynamicSessions(gapDuration);
}


Lastly, window data into sessions in your pipeline:

p.apply(
    "Window into sessions",
    Window.<TableRow>into(
        DynamicSessions.withDefaultGapDuration(Duration.standardSeconds(10))));

Example data and windows

The following test data tallies two users’ scores with and without the gap attribute:

.apply("Create data", Create.timestamped(
            TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"12\",\"gap\":\"5\"}", new Instant()),
            TimestampedValue.of("{\"user\":\"user-2\",\"score\":\"4\"}", new Instant()),
            TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"-3\",\"gap\":\"5\"}", new Instant().plus(2000)),
            TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"2\",\"gap\":\"5\"}", new Instant().plus(9000)),
            TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"7\",\"gap\":\"5\"}", new Instant().plus(12000)),
            TimestampedValue.of("{\"user\":\"user-2\",\"score\":\"10\"}", new Instant().plus(12000)))
        .withCoder(StringUtf8Coder.of()))

Figure: The two sets of test data, with the standard and dynamic sessions into which the data is windowed.

Standard sessions

Standard sessions use the following windows and scores:

user=user-2, score=4, window=[2019-05-26T13:28:49.122Z..2019-05-26T13:28:59.122Z)
user=user-1, score=18, window=[2019-05-26T13:28:48.582Z..2019-05-26T13:29:12.774Z)
user=user-2, score=10, window=[2019-05-26T13:29:03.367Z..2019-05-26T13:29:13.367Z)

User #2 sees two events separated by 12 seconds. With standard sessions, the gap defaults to 10 seconds, so the two scores fall into different sessions and aren’t added together.

User #1 sees four events, separated by two, seven, and three seconds, respectively. Because none of the gaps exceeds the default, the four events fall into the same standard session and their scores are added together (18 points).

Dynamic sessions

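The dynamic sessions use a five-second gap for User #1, whose events carry the gap attribute, and the 10-second default for User #2, whose events don't. They produce the following windows and scores: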

user=user-2, score=4, window=[2019-05-26T14:30:22.969Z..2019-05-26T14:30:32.969Z)
user=user-1, score=9, window=[2019-05-26T14:30:22.429Z..2019-05-26T14:30:30.553Z)
user=user-1, score=9, window=[2019-05-26T14:30:33.276Z..2019-05-26T14:30:41.849Z)
user=user-2, score=10, window=[2019-05-26T14:30:37.357Z..2019-05-26T14:30:47.357Z)

With dynamic sessions, User #1 gets different scores. The third message arrives seven seconds after the second message, so it’s grouped into a different session. The large, 18-point session is split into two 9-point sessions.
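The arithmetic behind these totals can be replayed in plain Java. The sketch below is an illustration rather than the pipeline itself: DynamicSessionScores, Event, sessionScores, and user1 are hypothetical names, and the events are assumed to sit at exactly 0, 2, 9, and 12 seconds, ignoring the few milliseconds of drift introduced by the repeated new Instant() calls in the test data.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical plain-Java replay of the user-1 test data; it does not use Beam. */
final class DynamicSessionScores {
  private DynamicSessionScores() {}

  /** One scored event: offset from the first event, score, and session gap. */
  static final class Event {
    final long offsetMillis;
    final int score;
    final long gapMillis;

    Event(long offsetMillis, int score, long gapMillis) {
      this.offsetMillis = offsetMillis;
      this.score = score;
      this.gapMillis = gapMillis;
    }
  }

  /** Sums scores per session, where each event opens [offset, offset + gap). */
  static List<Integer> sessionScores(List<Event> events) {
    List<Event> sorted = new ArrayList<>(events);
    sorted.sort(Comparator.comparingLong((Event e) -> e.offsetMillis));
    List<Integer> totals = new ArrayList<>();
    long sessionEnd = Long.MIN_VALUE;
    for (Event e : sorted) {
      if (!totals.isEmpty() && e.offsetMillis < sessionEnd) {
        // Overlaps the open session: add the score and extend the session.
        int last = totals.size() - 1;
        totals.set(last, totals.get(last) + e.score);
        sessionEnd = Math.max(sessionEnd, e.offsetMillis + e.gapMillis);
      } else {
        // Starts at or after the session end: open a new session.
        totals.add(e.score);
        sessionEnd = e.offsetMillis + e.gapMillis;
      }
    }
    return totals;
  }

  /** Builds the four user-1 events from the test data with the given gap. */
  static List<Event> user1(long gapMillis) {
    List<Event> events = new ArrayList<>();
    events.add(new Event(0, 12, gapMillis));
    events.add(new Event(2_000, -3, gapMillis));
    events.add(new Event(9_000, 2, gapMillis));
    events.add(new Event(12_000, 7, gapMillis));
    return events;
  }

  public static void main(String[] args) {
    System.out.println(sessionScores(user1(10_000))); // prints [18]
    System.out.println(sessionScores(user1(5_000))); // prints [9, 9]
  }
}
```

With the 10-second default, every user-1 event lands before the running session end, giving one 18-point session. With the 5-second gap, the first session ends at 7 seconds, so the event at 9 seconds opens a second session and the totals become 12 - 3 = 9 and 2 + 7 = 9.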