public abstract class Window<T> extends PTransform<PCollection<T>,PCollection<T>>
Window logically divides up or groups the elements of a PCollection into finite windows according to a WindowFn.
The output of Window contains the same elements as input, but they have been logically assigned to windows. Subsequent GroupByKeys, including any within composite transforms, will group by the combination of keys and windows.
See GroupByKey for more information about how grouping with windows works.
Windowing a PCollection divides the elements into windows based on the associated event time for each element. This is especially useful for PCollections with unbounded size, since it allows operating on a sub-group of the elements placed into a related window. For PCollections with a bounded size (i.e., conventional batch mode), all data is by default implicitly in a single window unless Window is applied.
For example, a simple form of windowing divides up the data into fixed-width time intervals, using FixedWindows.
The following example demonstrates how to use Window in a pipeline that counts the number of occurrences of strings each minute:
PCollection<String> items = ...;
PCollection<String> windowed_items = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
PCollection<KV<String, Long>> windowed_counts = windowed_items.apply(
    Count.<String>perElement());
Let (data, timestamp) denote a data element along with its timestamp. Then, if the input to this pipeline consists of {("foo", 15s), ("bar", 30s), ("foo", 45s), ("foo", 1m30s)}, the output will be {(KV("foo", 2), 1m), (KV("bar", 1), 1m), (KV("foo", 1), 2m)}.
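The window assignment in this example can be reproduced with plain arithmetic. The following is a minimal sketch in plain Java with no Beam dependency; the class FixedWindowSketch and its methods are hypothetical names introduced for illustration, and the code models only the bucketing idea, not Beam's actual FixedWindows implementation.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of fixed-window assignment; hypothetical, not Beam's implementation. */
class FixedWindowSketch {

    /** Start (in ms) of the fixed window of width sizeMillis that contains timestampMillis. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the bucketing correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Counts occurrences per "windowStart:value" key, mimicking a per-element count after windowing. */
    static Map<String, Long> countPerWindow(List<Map.Entry<String, Long>> elements, long sizeMillis) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Map.Entry<String, Long> element : elements) {
            String key = windowStart(element.getValue(), sizeMillis) + ":" + element.getKey();
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long oneMinute = Duration.ofMinutes(1).toMillis();
        // (data, timestamp in ms) pairs matching the example input.
        List<Map.Entry<String, Long>> input = List.of(
            Map.entry("foo", 15_000L), Map.entry("bar", 30_000L),
            Map.entry("foo", 45_000L), Map.entry("foo", 90_000L));
        System.out.println(countPerWindow(input, oneMinute));
    }
}
```

For the example input, the first-minute window (start 0) holds foo twice and bar once, and the second-minute window (start 60000 ms) holds foo once, which matches the counts listed above.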
Several predefined WindowFns are provided:
FixedWindows partitions the timestamps into fixed-width intervals.
SlidingWindows places data into overlapping fixed-width intervals.
Sessions groups data into sessions where each item in a window is separated from the next by no more than a specified gap.
Additionally, custom WindowFns can be created by creating new subclasses of WindowFn.
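To make the difference between overlapping windows and gap-based sessions concrete, here is a small plain-Java sketch. WindowFnSketch and its methods are hypothetical names; the code follows the wording of the descriptions above and is not Beam's SlidingWindows or Sessions implementation (in particular, Beam's handling of an element that sits exactly one gap away may differ).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified models of sliding-window assignment and session grouping; hypothetical. */
class WindowFnSketch {

    /**
     * Starts of all sliding windows [start, start + size) that contain ts,
     * assuming window starts are multiples of period.
     */
    static List<Long> slidingWindowStarts(long ts, long size, long period) {
        List<Long> starts = new ArrayList<>();
        long latestStart = ts - Math.floorMod(ts, period);
        // A window contains ts exactly when start <= ts < start + size.
        for (long start = latestStart; start > ts - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }

    /** Groups timestamps into sessions: neighbours at most gap apart share a session. */
    static List<List<Long>> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        for (long ts : sorted) {
            // Start a new session when the distance to the previous element exceeds the gap.
            if (current == null || ts - current.get(current.size() - 1) > gap) {
                current = new ArrayList<>();
                result.add(current);
            }
            current.add(ts);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(slidingWindowStarts(25, 30, 10));
        System.out.println(sessions(new long[] {0, 5, 30, 12}, 10));
    }
}
```

With a size of 30 and a period of 10, the timestamp 25 belongs to three windows (starting at 20, 10 and 0), whereas a fixed window would assign it to exactly one. With a gap of 10, the timestamps 0, 5 and 12 form one session and 30 forms another.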
triggering(Trigger) allows specifying a trigger to control when (in processing time) results for the given window can be produced. If unspecified, the default behavior is to trigger first when the watermark passes the end of the window, and then trigger again every time there is late arriving data.
Elements are added to the current window pane as they arrive. When the root trigger fires, output is produced based on the elements in the current pane.
Depending on the trigger, this can be used both to output partial results early during the processing of the whole window, and to deal with late-arriving data in batches.
Continuing the earlier example, suppose we want to emit the values that were available when the watermark passed the end of the window, and then output any late-arriving elements once per (actual) hour until we have finished processing the next 24 hours of data. (Using watermark time to stop processing tends to be more robust if, for example, the data source is slow for a few days.) This could be written as follows:
PCollection<String> items = ...;
PCollection<String> windowed_items = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterProcessingTime
                    .pastFirstElementInPane().plusDelayOf(Duration.standardHours(1))))
        .withAllowedLateness(Duration.standardDays(1))
        // An accumulation mode must accompany a trigger; accumulatingFiredPanes() is the alternative.
        .discardingFiredPanes());
PCollection<KV<String, Long>> windowed_counts = windowed_items.apply(
    Count.<String>perElement());
On the other hand, if we wanted to get early results every minute of processing time (for which there were new elements in the given window), we could do the following:
PCollection<String> windowed_items = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime
                    .pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
        .withAllowedLateness(Duration.ZERO)
        // An accumulation mode must accompany a trigger; accumulatingFiredPanes() is the alternative.
        .discardingFiredPanes());
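The phrase "every minute of processing time (for which there were new elements)" can be illustrated by working out when a repeated "first element in pane plus delay" trigger would fire. The sketch below is plain Java with hypothetical names (TriggerTimingSketch, firingTimes). It is a simplified model, not Beam's trigger machinery: it ignores the watermark passing the end of the window, and it assumes an element arriving exactly at a firing time starts the next pane.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of repeated processing-time firings; hypothetical, not Beam's implementation. */
class TriggerTimingSketch {

    /**
     * Given the processing-time arrivals (ms) of elements in one window, returns the
     * times at which a repeated "first element in pane + delay" trigger would fire.
     */
    static List<Long> firingTimes(long[] arrivalsMillis, long delayMillis) {
        long[] sorted = arrivalsMillis.clone();
        Arrays.sort(sorted);
        List<Long> firings = new ArrayList<>();
        Long paneDeadline = null; // null until the first element opens a pane
        for (long arrival : sorted) {
            // An element arriving at or after the last firing opens a new pane
            // and schedules one firing, delayMillis after that element.
            if (paneDeadline == null || arrival >= paneDeadline) {
                paneDeadline = arrival + delayMillis;
                firings.add(paneDeadline);
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // Elements arrive at 0s, 20s, 70s and 200s; the delay is one minute.
        System.out.println(firingTimes(new long[] {0, 20_000, 70_000, 200_000}, 60_000));
    }
}
```

In this model no firing is scheduled during stretches in which no element arrives, which is the sense in which early results appear only for minutes that saw new elements.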
After a GroupByKey, the trigger is set to a trigger that will preserve the intent of the upstream trigger. See Trigger.getContinuationTrigger() for more information.
See Trigger for details on the available triggers.
Modifier and Type | Class and Description |
---|---|
static class | Window.Assign<T>: A primitive PTransform that assigns windows to elements based on a WindowFn. |
static class | Window.ClosingBehavior: Specifies the conditions under which a final pane will be created when a window is permanently closed. |
name
Constructor and Description |
---|
Window() |
Modifier and Type | Method and Description |
---|---|
Window<T> | accumulatingFiredPanes(): Returns a new Window PTransform that uses the registered WindowFn and Triggering behavior, and that accumulates elements in a pane after they are triggered. |
static <T> Window<T> | configure(): Returns a new builder for a Window transform for setting windowing parameters other than the windowing function. |
Window<T> | discardingFiredPanes(): Returns a new Window PTransform that uses the registered WindowFn and Triggering behavior, and that discards elements in a pane after they are triggered. |
PCollection<T> | expand(PCollection<T> input): Applies this PTransform on the given InputT, and returns its Output. |
protected Coder<?> | getDefaultOutputCoder(PCollection<T> input): Returns the default Coder to use for the output of this single-output PTransform when applied to the given input. |
protected java.lang.String | getKindString(): Returns the name to use by default for this PTransform (not including the names of any enclosing PTransforms). |
WindowingStrategy<?,?> | getOutputStrategyInternal(WindowingStrategy<?,?> inputStrategy): Get the output strategy of this Window PTransform. |
abstract WindowFn<? super T,?> | getWindowFn() |
static <T> Window<T> | into(WindowFn<? super T,?> fn): Creates a Window PTransform that uses the given WindowFn to window the data. |
void | populateDisplayData(DisplayData.Builder builder): Register display data for the given transform or component. |
static <T> org.apache.beam.sdk.transforms.windowing.Window.Remerge<T> | remerge(): Creates a Window PTransform that does not change assigned windows, but will cause windows to be merged again as part of the next GroupByKey. |
Window<T> | triggering(Trigger trigger): Sets a non-default trigger for this Window PTransform. |
Window<T> | withAllowedLateness(Duration allowedLateness): Override the amount of lateness allowed for data elements in the output PCollection and downstream PCollections until explicitly set again. |
Window<T> | withAllowedLateness(Duration allowedLateness, Window.ClosingBehavior behavior): Override the amount of lateness allowed for data elements in the pipeline. |
Window<T> | withTimestampCombiner(TimestampCombiner timestampCombiner): (Experimental) Override the default TimestampCombiner, to control the output timestamp of values output from a GroupByKey operation. |
getAdditionalInputs, getDefaultOutputCoder, getDefaultOutputCoder, getName, toString, validate
public static <T> Window<T> into(WindowFn<? super T,?> fn)
Creates a Window PTransform that uses the given WindowFn to window the data.
The resulting PTransform's types have been bound, with both the input and output being a PCollection<T>, inferred from the types of the argument WindowFn. It is ready to be applied, or further properties can be set on it first.
public static <T> Window<T> configure()
Returns a new builder for a Window transform for setting windowing parameters other than the windowing function.
@Experimental(value=TRIGGER) public Window<T> triggering(Trigger trigger)
Sets a non-default trigger for this Window PTransform. Elements that are assigned to a specific window will be output when the trigger fires.
Trigger has more details on the available triggers.
Must also specify allowed lateness using withAllowedLateness(org.joda.time.Duration) and accumulation mode using either discardingFiredPanes() or accumulatingFiredPanes().
@Experimental(value=TRIGGER) public Window<T> discardingFiredPanes()
Returns a new Window PTransform that uses the registered WindowFn and Triggering behavior, and that discards elements in a pane after they are triggered.
Does not modify this transform. The resulting PTransform is sufficiently specified to be applied, but more properties can still be specified.
@Experimental(value=TRIGGER) public Window<T> accumulatingFiredPanes()
Returns a new Window PTransform that uses the registered WindowFn and Triggering behavior, and that accumulates elements in a pane after they are triggered.
Does not modify this transform. The resulting PTransform is sufficiently specified to be applied, but more properties can still be specified.
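The practical difference between the two accumulation modes shows up when a window fires more than once. The following plain-Java sketch uses hypothetical names (PaneModeSketch, paneCounts) and models only the bookkeeping described above for a counting aggregation; it is not Beam's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of discarding vs. accumulating fired panes; hypothetical. */
class PaneModeSketch {

    /**
     * Each inner list holds the elements that arrive between two trigger firings.
     * Returns the element count emitted at each firing.
     */
    static List<Integer> paneCounts(List<List<String>> batches, boolean accumulating) {
        List<Integer> emitted = new ArrayList<>();
        int buffered = 0;
        for (List<String> batch : batches) {
            buffered += batch.size();
            emitted.add(buffered);
            if (!accumulating) {
                // Discarding mode: elements are dropped from the pane once it has fired.
                buffered = 0;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<List<String>> batches = List.of(
            List.of("a", "b"), List.of("c"), List.of("d", "e", "f"));
        System.out.println("discarding:   " + paneCounts(batches, false));
        System.out.println("accumulating: " + paneCounts(batches, true));
    }
}
```

For batches of 2, 1 and 3 elements, the discarding model emits 2, 1, 3 (each pane contains only new elements), while the accumulating model emits 2, 3, 6 (each pane is a running total). A downstream consumer that sums panes would want the former; one that overwrites a previous result would want the latter.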
@Experimental(value=TRIGGER) public Window<T> withAllowedLateness(Duration allowedLateness)
Override the amount of lateness allowed for data elements in the output PCollection and downstream PCollections until explicitly set again. Like the other properties on this Window operation, this will be applied at the next GroupByKey. Any elements that are later than this as decided by the system-maintained watermark will be dropped.
This value also determines how long state will be kept around for old windows. Once no elements will be added to a window (because this duration has passed) any state associated with the window will be cleaned up.
Depending on the trigger this may not produce a pane with PaneInfo.isLast. See Window.ClosingBehavior.FIRE_IF_NON_EMPTY for more details.
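As a rough illustration of how allowed lateness decides an element's fate, the sketch below classifies an element by comparing the watermark at its arrival with the end of its window. LatenessSketch and its members are hypothetical names, and the exact boundary handling in Beam may differ; this is a simplified model of the rule stated above, not the library's code.

```java
import java.time.Duration;

/** Simplified model of allowed lateness; hypothetical, not Beam's implementation. */
class LatenessSketch {

    enum Status { ON_TIME, LATE, DROPPED }

    /**
     * Classifies an element of a window ending at windowEndMillis, given the
     * watermark observed when the element arrives. All values are in milliseconds.
     */
    static Status classify(long windowEndMillis, long allowedLatenessMillis, long watermarkMillis) {
        if (watermarkMillis < windowEndMillis) {
            return Status.ON_TIME; // the watermark has not yet passed the end of the window
        }
        if (watermarkMillis < windowEndMillis + allowedLatenessMillis) {
            return Status.LATE; // late, but still within the allowed lateness
        }
        return Status.DROPPED; // later than the allowed lateness; window state may be gone
    }

    public static void main(String[] args) {
        long windowEnd = Duration.ofMinutes(1).toMillis();
        long oneDay = Duration.ofDays(1).toMillis();
        System.out.println(classify(windowEnd, oneDay, Duration.ofSeconds(30).toMillis()));
        System.out.println(classify(windowEnd, oneDay, Duration.ofMinutes(2).toMillis()));
        System.out.println(classify(windowEnd, oneDay, windowEnd + oneDay));
        System.out.println(classify(windowEnd, 0L, windowEnd));
    }
}
```

Under this model, an allowed lateness of zero means any element arriving after the watermark has passed the end of its window is dropped, while an allowed lateness of one day keeps the window's state available for that additional day.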
@Experimental(value=OUTPUT_TIME) public Window<T> withTimestampCombiner(TimestampCombiner timestampCombiner)
(Experimental) Override the default TimestampCombiner, to control the output timestamp of values output from a GroupByKey operation.
@Experimental(value=TRIGGER) public Window<T> withAllowedLateness(Duration allowedLateness, Window.ClosingBehavior behavior)
Override the amount of lateness allowed for data elements in the pipeline. Like the other properties on this Window operation, this will be applied at the next GroupByKey. Any elements that are later than this as decided by the system-maintained watermark will be dropped.
This value also determines how long state will be kept around for old windows. Once no elements will be added to a window (because this duration has passed) any state associated with the window will be cleaned up.
public WindowingStrategy<?,?> getOutputStrategyInternal(WindowingStrategy<?,?> inputStrategy)
Get the output strategy of this Window PTransform. For internal use only.
public PCollection<T> expand(PCollection<T> input)
Description copied from class: PTransform
Applies this PTransform on the given InputT, and returns its Output.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
Specified by: expand in class PTransform<PCollection<T>,PCollection<T>>
public void populateDisplayData(DisplayData.Builder builder)
Description copied from class: PTransform
Register display data for the given transform or component.
populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent.
By default, does not register any display data. Implementors may override this method to provide their own display data.
Specified by: populateDisplayData in interface HasDisplayData
Overrides: populateDisplayData in class PTransform<PCollection<T>,PCollection<T>>
Parameters: builder - The builder to populate with display data.
See Also: HasDisplayData
protected Coder<?> getDefaultOutputCoder(PCollection<T> input)
Description copied from class: PTransform
Returns the default Coder to use for the output of this single-output PTransform when applied to the given input.
By default, always throws.
Overrides: getDefaultOutputCoder in class PTransform<PCollection<T>,PCollection<T>>
protected java.lang.String getKindString()
Description copied from class: PTransform
Returns the name to use by default for this PTransform (not including the names of any enclosing PTransforms).
By default, returns the base name of this PTransform's class.
The caller is responsible for ensuring that names of applied PTransforms are unique, e.g., by adding a uniquifying suffix when needed.
Overrides: getKindString in class PTransform<PCollection<T>,PCollection<T>>
public static <T> org.apache.beam.sdk.transforms.windowing.Window.Remerge<T> remerge()
Creates a Window PTransform that does not change assigned windows, but will cause windows to be merged again as part of the next GroupByKey.