Class FillGaps<ValueT>

java.lang.Object
org.apache.beam.sdk.transforms.PTransform<PCollection<ValueT>,PCollection<ValueT>>
org.apache.beam.sdk.extensions.timeseries.FillGaps<ValueT>
All Implemented Interfaces:
Serializable, HasDisplayData

public abstract class FillGaps<ValueT> extends PTransform<PCollection<ValueT>,PCollection<ValueT>>
Fill gaps in timeseries. Values are expected to have Beam schemas registered.

This transform views the original PCollection as a collection of timeseries, each with a different key. The key to be used and the timeseries bucket size are both specified in the of(org.joda.time.Duration, java.lang.String...) creation method. Multiple fields can be specified for the key, in which case the extracted key is a composite of all of them. All elements in the original PCollection appear unchanged in the output PCollection, with their timestamps and windows preserved. Any gaps in a timeseries (i.e. buckets with no elements) will be filled in the output PCollection with a single element (by default the latest element seen in, or propagated into, the previous bucket). The timestamp of the filled element is the end of the bucket, and the original PCollection's window function is used to assign it to a window.

Example usage: the following code views each (userId, country) pair in the input PCollection as a timeseries with a bucket size of one second. If any of these timeseries has a bucket with no elements, then the latest element from the previous bucket (i.e. the one with the largest timestamp) will be propagated into the missing bucket. If there are multiple missing buckets, then they will all be filled, up to a maximum gap of 1 hour (3600 one-second buckets), the maximum gap size specified in withMaxGapFillBuckets(java.lang.Long).

PCollection<MyType> input = readInput();
PCollection<MyType> gapFilled =
    input.apply("fillGaps",
        FillGaps.of(Duration.standardSeconds(1), "userId", "country")
            .withMaxGapFillBuckets(3600L));
gapFilled.apply(MySink.create());

By default, the latest element from the previous bucket is propagated into missing buckets. The user can override this using the withMergeFunction(org.apache.beam.sdk.transforms.SerializableBiFunction<org.apache.beam.sdk.values.TimestampedValue<ValueT>, org.apache.beam.sdk.values.TimestampedValue<ValueT>, org.apache.beam.sdk.values.TimestampedValue<ValueT>>) method. Several built-in merge functions are provided - keepLatest() (the default), keepEarliest(), and keepNull().
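For instance, a minimal sketch (reusing the input pipeline from the example above, and assuming the built-in merge functions such as keepEarliest() are static factory methods on FillGaps) that propagates the earliest element of the previous bucket instead of the latest:

PCollection<MyType> input = readInput();
PCollection<MyType> gapFilled =
    input.apply("fillGaps",
        FillGaps.of(Duration.standardSeconds(1), "userId", "country")
            // Override the default keepLatest() behavior with keepEarliest().
            .withMergeFunction(FillGaps.keepEarliest())
            .withMaxGapFillBuckets(3600L));
gapFilled.apply(MySink.create());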

Sometimes elements need to be modified before being propagated into a missing bucket. For example, consider the following element type containing a timestamp:

@DefaultSchema(JavaFieldSchema.class)
class MyType {
  MyData data;
  Instant timestamp;

  @SchemaCreate
  MyType(MyData data, Instant timestamp) {
    this.data = data;
    this.timestamp = timestamp;
  }
}

Each element's timestamp should always be contained in its current timeseries bucket, so the element needs to be modified when propagated to a new bucket. This can be done using the withInterpolateFunction method, as follows:

 
PCollection<MyType> input = readInput();
PCollection<MyType> gapFilled =
    input.apply("fillGaps",
        FillGaps.of(Duration.standardSeconds(1), "userId", "country")
            .withInterpolateFunction(p -> new MyType(p.getValue().getValue().data, p.getNextWindow().maxTimestamp()))
            .withMaxGapFillBuckets(360L));
gapFilled.apply(MySink.create());