public abstract class FillGaps<ValueT> extends PTransform<PCollection<ValueT>,PCollection<ValueT>>
This transform views the original PCollection as a collection of timeseries, each with a different key. The
key to be used and the timeseries bucket size are both specified in the of(org.joda.time.Duration, java.lang.String...)
creation method. Multiple fields can be specified for the key; the key extracted will be a composite of all of them.
Any elements in the original PCollection will appear unchanged in the output PCollection, with timestamp and
window unchanged.
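The keyed, bucketed view described above can be pictured with a small plain-Java sketch. It does not use Beam and is not how the transform is implemented; the `Event` type, its `userId` and `country` fields, and all method names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch (no Beam dependency) of viewing elements as keyed, bucketed timeseries. */
final class TimeseriesKeySketch {

  /** Hypothetical element type with two fields that together form the key. */
  static final class Event {
    final String userId;
    final String country;
    final long timestampMillis;

    Event(String userId, String country, long timestampMillis) {
      this.userId = userId;
      this.country = country;
      this.timestampMillis = timestampMillis;
    }
  }

  /** Composite key: the values of all key fields, in order. */
  static List<String> compositeKey(Event e) {
    return Arrays.asList(e.userId, e.country);
  }

  /** Index of the fixed-size bucket that contains the timestamp. */
  static long bucketIndex(long timestampMillis, long bucketSizeMillis) {
    return Math.floorDiv(timestampMillis, bucketSizeMillis);
  }

  /** Groups events first by composite key, then by bucket index. */
  static Map<List<String>, TreeMap<Long, List<Event>>> group(
      List<Event> events, long bucketSizeMillis) {
    Map<List<String>, TreeMap<Long, List<Event>>> series = new HashMap<>();
    for (Event e : events) {
      series
          .computeIfAbsent(compositeKey(e), k -> new TreeMap<>())
          .computeIfAbsent(bucketIndex(e.timestampMillis, bucketSizeMillis), b -> new ArrayList<>())
          .add(e);
    }
    return series;
  }
}
```

Each distinct combination of key field values yields its own timeseries, and a bucket index that is absent from a series' map corresponds to a gap.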
Any gaps in timeseries (i.e. buckets with no elements) will be filled in the output PCollection with a single element
(by default the latest element seen or propagated into the previous bucket). The timestamp of the filled element is
the end of the bucket, and the original PCollection's window function is used to assign it to a window.
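As a rough illustration of the default fill behavior, the following plain-Java sketch carries the latest value forward into empty buckets of a single timeseries. It is a simplification under stated assumptions, not the transform's implementation: it only fills gaps between observed buckets, it ignores windowing, and it takes "end of the bucket" to mean the last millisecond inside the bucket. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of carrying the latest value forward into empty buckets. */
final class GapFillSketch {

  /** A synthesized element: the carried value plus the timestamp it is emitted with. */
  static final class Filled {
    final String value;
    final long timestampMillis;

    Filled(String value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  /**
   * Returns the elements synthesized for the empty buckets of one timeseries.
   *
   * @param latestPerBucket bucket index mapped to the latest value seen in that bucket
   * @param bucketSizeMillis bucket size in milliseconds
   */
  static List<Filled> fillGaps(TreeMap<Long, String> latestPerBucket, long bucketSizeMillis) {
    List<Filled> filled = new ArrayList<>();
    Long previousBucket = null;
    String carried = null;
    for (Map.Entry<Long, String> entry : latestPerBucket.entrySet()) {
      if (previousBucket != null) {
        // Every bucket strictly between two observed buckets is a gap.
        for (long b = previousBucket + 1; b < entry.getKey(); b++) {
          // Assumption: the end of a bucket is the last millisecond inside it.
          long endOfBucket = (b + 1) * bucketSizeMillis - 1;
          filled.add(new Filled(carried, endOfBucket));
        }
      }
      previousBucket = entry.getKey();
      carried = entry.getValue();
    }
    return filled;
  }
}
```

With one-second buckets and values observed only in buckets 0 and 3, the sketch emits two copies of the bucket-0 value, stamped at the ends of buckets 1 and 2.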
Example usage: the following code views each user,country pair in the input PCollection as a timeseries
with bucket size one second. If any of these timeseries has a bucket with no elements, then the latest element from
the previous bucket (i.e. the one with the largest timestamp) will be propagated into the missing bucket. If there
are multiple missing buckets, then they all will be filled up to 1 hour, the maximum gap size specified in
withMaxGapFillBuckets(java.lang.Long).
PCollection<MyType> input = readInput();
PCollection<MyType> gapFilled =
    input.apply("fillGaps",
        FillGaps.of(Duration.standardSeconds(1), "userId", "country")
            .withMaxGapFillBuckets(3600L));
gapFilled.apply(MySink.create());
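The relationship between the bucket size, the bucket limit, and the one-hour figure in the example is simple arithmetic, sketched below. The sketch uses java.time.Duration rather than the Joda-Time Duration that the transform accepts, purely to stay dependency-free, and the helper names are hypothetical.

```java
import java.time.Duration;

/** Arithmetic behind a bucket-count limit on gap filling. */
final class MaxGapSketch {

  /** Longest gap that is filled completely: bucket size times the bucket limit. */
  static Duration maxGapDuration(Duration bucketSize, long maxGapFillBuckets) {
    return bucketSize.multipliedBy(maxGapFillBuckets);
  }

  /** Number of consecutive empty buckets that receive a propagated element. */
  static long bucketsToFill(long missingBuckets, long maxGapFillBuckets) {
    return Math.min(missingBuckets, maxGapFillBuckets);
  }
}
```

With one-second buckets and a limit of 3600 buckets, a gap of 5000 empty buckets would have only its first 3600 buckets (one hour) filled, assuming filling simply stops once the limit is reached.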
By default, the latest element from the previous bucket is propagated into missing buckets. The user can override
this using the withMergeFunction(org.apache.beam.sdk.transforms.SerializableBiFunction<org.apache.beam.sdk.values.TimestampedValue<ValueT>, org.apache.beam.sdk.values.TimestampedValue<ValueT>, org.apache.beam.sdk.values.TimestampedValue<ValueT>>)
method. Several built-in merge functions are provided for this: keepLatest() (the default), keepEarliest(), and keepNull().
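To make the idea of a merge function concrete, here is a plain-Java analogue that reduces the elements of one bucket to the single element that would be propagated. It uses java.util.function.BinaryOperator and a hypothetical `Stamped` type instead of Beam's SerializableBiFunction and TimestampedValue, and its handling of equal timestamps (the first element wins) is an assumption rather than documented behavior.

```java
import java.util.List;
import java.util.function.BinaryOperator;

/** Plain-Java analogue of merge functions that pick one element per bucket. */
final class MergeSketch {

  /** Hypothetical stand-in for a timestamped value. */
  static final class Stamped {
    final String value;
    final long timestampMillis;

    Stamped(String value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  /** Keeps whichever element has the larger timestamp (first one wins on ties). */
  static BinaryOperator<Stamped> keepLatest() {
    return (a, b) -> b.timestampMillis > a.timestampMillis ? b : a;
  }

  /** Keeps whichever element has the smaller timestamp (first one wins on ties). */
  static BinaryOperator<Stamped> keepEarliest() {
    return (a, b) -> b.timestampMillis < a.timestampMillis ? b : a;
  }

  /** Reduces all elements of a bucket to the one that would be propagated. */
  static Stamped mergeBucket(List<Stamped> bucket, BinaryOperator<Stamped> merge) {
    return bucket.stream()
        .reduce(merge)
        .orElseThrow(() -> new IllegalArgumentException("empty bucket"));
  }
}
```

A custom merge function has the same shape: it receives two timestamped candidates and returns the one to keep, or a combination of both.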
Sometimes elements need to be modified before being propagated into a missing bucket. For example, consider the following element type containing a timestamp:
@DefaultSchema(JavaFieldSchema.class)
class MyType {
  MyData data;
  Instant timestamp;
}
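One plausible modification for such a type is to move the embedded timestamp into the bucket the copy is propagated to. The sketch below shows that adjustment in plain Java. It is only an illustration: the real callback passed to withInterpolateFunction receives a FillGaps.InterpolateData<ValueT> rather than a bucket index, the `MyType` here uses a String and java.time.Instant as stand-ins for the original field types, and treating the last millisecond of the bucket as the new timestamp is an assumption.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java sketch of re-stamping an element before it is copied into a later bucket. */
final class RestampSketch {

  /** Simplified stand-in for the element type: a payload plus an embedded timestamp. */
  static final class MyType {
    final String data;
    final Instant timestamp;

    MyType(String data, Instant timestamp) {
      this.data = data;
      this.timestamp = timestamp;
    }
  }

  /**
   * Returns a copy of the element whose embedded timestamp lies inside the target bucket.
   * Assumption: the copy is stamped with the last millisecond of that bucket.
   */
  static MyType restamp(MyType previous, long targetBucket, Duration bucketSize) {
    Instant end = Instant.ofEpochMilli((targetBucket + 1) * bucketSize.toMillis() - 1);
    return new MyType(previous.data, end);
  }
}
```

The payload is carried over unchanged; only the embedded timestamp differs between the original element and the propagated copy.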
Modifier and Type | Class and Description |
---|---|
static class | FillGaps.FillGapsDoFn<ValueT> |
static class | FillGaps.InterpolateData<ValueT> Argument to withInterpolateFunction function. |
Fields inherited from class PTransform: name, resourceHints
Constructor and Description |
---|
FillGaps() |
Modifier and Type | Method and Description |
---|---|
PCollection<ValueT> | expand(PCollection<ValueT> input) Override this method to specify how this PTransform should be expanded on the given InputT. |
static <ValueT> SerializableBiFunction<TimestampedValue<ValueT>,TimestampedValue<ValueT>,TimestampedValue<ValueT>> | keepEarliest() |
static <ValueT> SerializableBiFunction<TimestampedValue<ValueT>,TimestampedValue<ValueT>,TimestampedValue<ValueT>> | keepLatest() |
static <ValueT> FillGaps<ValueT> | of(Duration windowDuration, FieldAccessDescriptor keyDescriptor) Construct the transform for the given duration and key fields. |
static <ValueT> FillGaps<ValueT> | of(Duration windowDuration, java.lang.String... keys) Construct the transform for the given duration and key fields. |
FillGaps<ValueT> | withInterpolateFunction(SerializableFunction<FillGaps.InterpolateData<ValueT>,ValueT> interpolateFunction) This function can be used to modify elements before propagating to the next bucket. |
FillGaps<ValueT> | withMaxGapFillBuckets(java.lang.Long value) |
FillGaps<ValueT> | withMergeFunction(SerializableBiFunction<TimestampedValue<ValueT>,TimestampedValue<ValueT>,TimestampedValue<ValueT>> mergeFunction) If there are multiple values in a single timeseries bucket, this function is used to specify what to propagate to the next bucket. |
FillGaps<ValueT> | withStopTime(Instant stopTime) |
Methods inherited from class PTransform: compose, compose, getAdditionalInputs, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setResourceHints, toString, validate, validate
public static <ValueT> SerializableBiFunction<TimestampedValue<ValueT>,TimestampedValue<ValueT>,TimestampedValue<ValueT>> keepLatest()
Argument to withMergeFunction(org.apache.beam.sdk.transforms.SerializableBiFunction<org.apache.beam.sdk.values.TimestampedValue<ValueT>, org.apache.beam.sdk.values.TimestampedValue<ValueT>, org.apache.beam.sdk.values.TimestampedValue<ValueT>>). Always propagates the element with the latest timestamp.
public static <ValueT> SerializableBiFunction<TimestampedValue<ValueT>,TimestampedValue<ValueT>,TimestampedValue<ValueT>> keepEarliest()
Argument to withMergeFunction(org.apache.beam.sdk.transforms.SerializableBiFunction<org.apache.beam.sdk.values.TimestampedValue<ValueT>, org.apache.beam.sdk.values.TimestampedValue<ValueT>, org.apache.beam.sdk.values.TimestampedValue<ValueT>>). Always propagates the element with the earliest timestamp.
public static <ValueT> FillGaps<ValueT> of(Duration windowDuration, java.lang.String... keys)
public static <ValueT> FillGaps<ValueT> of(Duration windowDuration, FieldAccessDescriptor keyDescriptor)
public FillGaps<ValueT> withMergeFunction(SerializableBiFunction<TimestampedValue<ValueT>,TimestampedValue<ValueT>,TimestampedValue<ValueT>> mergeFunction)
public FillGaps<ValueT> withInterpolateFunction(SerializableFunction<FillGaps.InterpolateData<ValueT>,ValueT> interpolateFunction)
public PCollection<ValueT> expand(PCollection<ValueT> input)
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
Specified by: expand in class PTransform<PCollection<ValueT>,PCollection<ValueT>>