Class FillGaps<ValueT>
All Implemented Interfaces: Serializable, HasDisplayData
This transform views the original PCollection as a collection of timeseries, each with a different key. The key to be used and the timeseries bucket size are both specified in the of(org.joda.time.Duration, java.lang.String...) creation method. Multiple fields can be specified for the key; the key extracted will be a composite of all of them. Any elements in the original PCollection will appear unchanged in the output PCollection, with timestamp and window unchanged.
Any gaps in timeseries (i.e. buckets with no elements) will be filled in the output PCollection with a single element
(by default the latest element seen or propagated into the previous bucket). The timestamp of the filled element is
the end of the bucket, and the original PCollection's window function is used to assign it to a window.
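The gap-filling rule described above can be illustrated without Beam. The following is a minimal plain-Java sketch for a single key's timeseries, not the transform's actual implementation. The class and method names are hypothetical, timestamps are plain milliseconds, "end of the bucket" is assumed to mean the last millisecond inside the bucket, and only gaps between two populated buckets are considered.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of bucketed gap filling for one key; not Beam code. */
final class GapFillSketch {
  /**
   * Takes a series (timestamp in millis -> value) and returns a copy in which every
   * empty bucket between two populated buckets holds one filled value, limited to
   * maxGapFillBuckets buckets per gap.
   */
  static TreeMap<Long, String> fillGaps(
      TreeMap<Long, String> series, long bucketMillis, long maxGapFillBuckets) {
    TreeMap<Long, String> output = new TreeMap<>(series); // originals are kept unchanged
    Long previousBucket = null;
    String carried = null;
    for (Map.Entry<Long, String> e : series.entrySet()) {
      long bucket = Math.floorDiv(e.getKey(), bucketMillis);
      if (previousBucket != null && bucket > previousBucket + 1) {
        // Fill the missing buckets, but never more than the configured maximum.
        long lastFilled = Math.min(bucket - 1, previousBucket + maxGapFillBuckets);
        for (long missing = previousBucket + 1; missing <= lastFilled; missing++) {
          // Assumption: the filled element is stamped with the last millisecond of the bucket.
          long bucketEnd = (missing + 1) * bucketMillis - 1;
          output.put(bucketEnd, carried);
        }
      }
      previousBucket = bucket;
      carried = e.getValue(); // ascending iteration, so this is the latest value seen so far
    }
    return output;
  }
}
```

With one-second buckets and elements at 0 ms, 500 ms and 3200 ms, buckets 1 and 2 are empty, so the sketch adds two copies of the 500 ms value, one per missing bucket.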
Example usage: the following code views each user, country pair in the input PCollection as a timeseries with bucket size one second. If any of these timeseries has a bucket with no elements, then the latest element from the previous bucket (i.e. the one with the largest timestamp) will be propagated into the missing bucket. If there are multiple missing buckets, then they will all be filled, up to 1 hour (3600 one-second buckets), the maximum gap size specified in withMaxGapFillBuckets(java.lang.Long).
PCollection<MyType> input = readInput();
PCollection<MyType> gapFilled =
    input.apply("fillGaps",
        FillGaps.of(Duration.standardSeconds(1), "userId", "country")
            .withMaxGapFillBuckets(3600L));
gapFilled.apply(MySink.create());
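The value passed to withMaxGapFillBuckets is a bucket count, not a duration, so it has to be derived from the bucket size. A small sketch of that arithmetic follows. The helper name is hypothetical, and it uses java.time.Duration to stay dependency-free, whereas the transform itself takes an org.joda.time.Duration.

```java
import java.time.Duration;

/** Hypothetical helper: converts a maximum gap duration into a bucket count. */
final class GapBudget {
  /** Returns the number of whole buckets that fit into maxGap. */
  static long maxGapFillBuckets(Duration maxGap, Duration bucketSize) {
    if (bucketSize.isZero() || bucketSize.isNegative()) {
      throw new IllegalArgumentException("bucketSize must be positive");
    }
    return maxGap.toMillis() / bucketSize.toMillis();
  }
}
```

For a one-hour maximum gap and one-second buckets this yields the 3600 used in the example above.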
By default, the latest element from the previous bucket is propagated into missing buckets. The user can override this using the withMergeFunction(SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>) method. Two built-in merge functions are provided: keepLatest() (the default) and keepEarliest().
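To make the role of a merge function concrete, here is a plain-Java sketch of the two selection rules. It is an illustration only, not the library's code: a Map.Entry of timestamp millis and payload stands in for TimestampedValue, the class name is hypothetical, and the tie-breaking behavior for equal timestamps is an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Plain-Java stand-ins for "keep latest" and "keep earliest" merge rules. */
final class MergeSketch {
  /** Picks the entry with the larger timestamp (the later argument wins ties). */
  static <V> BinaryOperator<Map.Entry<Long, V>> keepLatest() {
    return (a, b) -> b.getKey() >= a.getKey() ? b : a;
  }

  /** Picks the entry with the smaller timestamp (the earlier argument wins ties). */
  static <V> BinaryOperator<Map.Entry<Long, V>> keepEarliest() {
    return (a, b) -> b.getKey() < a.getKey() ? b : a;
  }

  /** Reduces all elements of one non-empty bucket to the single element to propagate. */
  static <V> Map.Entry<Long, V> reduce(
      List<Map.Entry<Long, V>> bucket, BinaryOperator<Map.Entry<Long, V>> merge) {
    return bucket.stream().reduce(merge).orElseThrow();
  }
}
```

A custom rule, for example one that prefers the element with the largest payload rather than the largest timestamp, would have the same two-in, one-out shape.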
Sometimes elements need to be modified before being propagated into a missing bucket. For example, consider the following element type containing a timestamp:
@DefaultSchema(JavaFieldSchema.class)
class MyType {
  MyData data;
  Instant timestamp;

  @SchemaCreate
  MyType(MyData data, Instant timestamp) {
    this.data = data;
    this.timestamp = timestamp;
  }
}
The element's timestamp should always be contained in its current timeseries bucket, so the element needs to be modified when it is propagated to a new bucket. This can be done using the withInterpolateFunction method, as follows:
PCollection<MyType> input = readInput();
PCollection<MyType> gapFilled =
    input.apply("fillGaps",
        FillGaps.of(Duration.standardSeconds(1), "userId", "country")
            .withInterpolateFunction(p -> new MyType(p.getValue().getValue().data, p.getNextWindow().maxTimestamp()))
            .withMaxGapFillBuckets(360L));
gapFilled.apply(MySink.create());
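The idea behind an interpolate function, copying the payload while replacing the contained timestamp, can also be shown in isolation. The sketch below is plain Java with hypothetical types (Reading, FillContext) that merely stand in for the element type and for the argument an interpolate function receives; it does not use or reproduce the Beam classes.

```java
import java.util.function.Function;

/** Plain-Java illustration of re-stamping an element for a new bucket. */
final class InterpolateSketch {
  /** Hypothetical element type that carries its own timestamp. */
  static final class Reading {
    final String data;
    final long timestampMillis;

    Reading(String data, long timestampMillis) {
      this.data = data;
      this.timestampMillis = timestampMillis;
    }
  }

  /** Hypothetical stand-in for the argument handed to an interpolate function. */
  static final class FillContext {
    final Reading previous;
    final long targetBucketMaxTimestampMillis;

    FillContext(Reading previous, long targetBucketMaxTimestampMillis) {
      this.previous = previous;
      this.targetBucketMaxTimestampMillis = targetBucketMaxTimestampMillis;
    }
  }

  /** Copies the payload and replaces the timestamp with one inside the target bucket. */
  static final Function<FillContext, Reading> RESTAMP =
      ctx -> new Reading(ctx.previous.data, ctx.targetBucketMaxTimestampMillis);
}
```

The function returns a new object rather than mutating the propagated element, so the element already emitted for the earlier bucket keeps its original timestamp.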
Nested Class Summary
Nested Classes
Modifier and Type, Class, Description
static class
static class FillGaps.InterpolateData<ValueT>
    Argument to withInterpolateFunction function.
Field Summary
Fields inherited from class org.apache.beam.sdk.transforms.PTransform
annotations, displayData, name, resourceHints
Constructor Summary
Constructors
FillGaps()
Method Summary
Modifier and Type, Method, Description
PCollection<ValueT> expand(PCollection<ValueT> input)
    Override this method to specify how this PTransform should be expanded on the given InputT.
static <ValueT> SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>> keepEarliest()
static <ValueT> SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>> keepLatest()
static <ValueT> FillGaps<ValueT> of(Duration, String...)
    Construct the transform for the given duration and key fields.
static <ValueT> FillGaps<ValueT> of(Duration windowDuration, FieldAccessDescriptor keyDescriptor)
    Construct the transform for the given duration and key fields.
FillGaps<ValueT> withInterpolateFunction(SerializableFunction<FillGaps.InterpolateData<ValueT>, ValueT> interpolateFunction)
    This function can be used to modify elements before propagating to the next bucket.
FillGaps<ValueT> withMaxGapFillBuckets(Long value)
FillGaps<ValueT> withMergeFunction(SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>> mergeFunction)
    If there are multiple values in a single timeseries bucket, this function is used to specify what to propagate to the next bucket.
FillGaps<ValueT> withStopTime(Instant stopTime)
Methods inherited from class org.apache.beam.sdk.transforms.PTransform
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setDisplayData, setResourceHints, toString, validate, validate
Constructor Details
FillGaps
public FillGaps()
Method Details
keepLatest
public static <ValueT> SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>> keepLatest()
Argument to withMergeFunction(SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>). Always propagates the element with the latest timestamp.
keepEarliest
public static <ValueT> SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>> keepEarliest()
Argument to withMergeFunction(SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>). Always propagates the element with the earliest timestamp.
of
Construct the transform for the given duration and key fields.
of
public static <ValueT> FillGaps<ValueT> of(Duration windowDuration, FieldAccessDescriptor keyDescriptor)
Construct the transform for the given duration and key fields.
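withMaxGapFillBuckets
public FillGaps<ValueT> withMaxGapFillBuckets(Long value)
withStopTime
public FillGaps<ValueT> withStopTime(Instant stopTime)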
withMergeFunction
public FillGaps<ValueT> withMergeFunction(SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>> mergeFunction)
If there are multiple values in a single timeseries bucket, this function is used to specify what to propagate to the next bucket. If not specified, then the value with the latest timestamp will be propagated.
withInterpolateFunction
public FillGaps<ValueT> withInterpolateFunction(SerializableFunction<FillGaps.InterpolateData<ValueT>, ValueT> interpolateFunction)
This function can be used to modify elements before propagating to the next bucket. A common use case is to modify a contained timestamp to match that of the new bucket.
expand
public PCollection<ValueT> expand(PCollection<ValueT> input)
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
Specified by: expand in class PTransform<PCollection<ValueT>, PCollection<ValueT>>