public class GroupIntoBatches<K,InputT> extends PTransform<PCollection<KV<K,InputT>>,PCollection<KV<K,java.lang.Iterable<InputT>>>>
PTransform
that batches inputs to a desired batch size. Batches will contain only
elements of a single key.
Elements are buffered until there are enough elements for a batch, at which point they are
emitted to the output PCollection
. A maxBufferingDuration
can be set to emit
output early and avoid waiting for a full batch forever.
Batches can be triggered either based on element count or byte size. ofSize(long)
is used
to specify a maximum element count while ofByteSize(long)
is used to specify a maximum byte
size. The single-argument ofByteSize(long)
uses the input coder to determine the encoded byte
size of each element. However, this may not always be what is desired. A user may want to control
batching based on a different byte size (e.g. the memory usage of the decoded Java object) or the
input coder may not be able to efficiently determine the elements' byte size. For these cases, we
also provide the two-argument ofByteSize(long)
allowing the user to pass in a function to be
used to determine the byte size of an element.
Windows are preserved (batches contain elements from the same window). Batches may contain elements from more than one bundle.
Example 1 (batch call a webservice and get return codes):
PCollection<KV<String, String>> input = ...; long batchSize = 100L; PCollection<KV<String, Iterable<String>>> batched = input .apply(GroupIntoBatches.<String, String>ofSize(batchSize)) .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of()))) .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String, String>>()
{@ProcessElement public void processElement(@Element KV<String, Iterable<String>> element, OutputReceiver<KV<String, String>> r) { r.output(KV.of(element.getKey(), callWebService(element.getValue()))); }
}));
Example 2 (batch unbounded input in a global window):
PCollection<KV<String, String>> unboundedInput = ...;
long batchSize = 100L;
Duration maxBufferingDuration = Duration.standardSeconds(10);
PCollection<KV<String, Iterable<String>>> batched = unboundedInput
.apply(Window.<KV<String, String>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes())
.apply(GroupIntoBatches.<String, String>ofSize(batchSize)
.withMaxBufferingDuration(maxBufferingDuration));
Modifier and Type | Class and Description |
---|---|
static class |
GroupIntoBatches.BatchingParams<InputT>
Wrapper class for batching parameters supplied by users.
|
class |
GroupIntoBatches.WithShardedKey |
name, resourceHints
Modifier and Type | Method and Description |
---|---|
PCollection<KV<K,java.lang.Iterable<InputT>>> |
expand(PCollection<KV<K,InputT>> input)
Override this method to specify how this
PTransform should be expanded on the given
InputT . |
GroupIntoBatches.BatchingParams<InputT> |
getBatchingParams()
Returns user supplied parameters for batching.
|
static <K,InputT> GroupIntoBatches<K,InputT> |
ofByteSize(long batchSizeBytes)
Aim to create batches each with the specified byte size.
|
static <K,InputT> GroupIntoBatches<K,InputT> |
ofByteSize(long batchSizeBytes,
SerializableFunction<InputT,java.lang.Long> getElementByteSize)
Aim to create batches each with the specified byte size.
|
static <K,InputT> GroupIntoBatches<K,InputT> |
ofSize(long batchSize)
Aim to create batches each with the specified element count.
|
GroupIntoBatches<K,InputT> |
withMaxBufferingDuration(Duration duration)
Sets a time limit (in processing time) on how long an incomplete batch of elements is allowed
to be buffered.
|
GroupIntoBatches.WithShardedKey |
withShardedKey()
Outputs batched elements associated with sharded input keys.
|
compose, compose, getAdditionalInputs, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setResourceHints, toString, validate, validate
public static <K,InputT> GroupIntoBatches<K,InputT> ofSize(long batchSize)
public static <K,InputT> GroupIntoBatches<K,InputT> ofByteSize(long batchSizeBytes)
This option uses the PCollection's coder to determine the byte size of each element. This
may not always be what is desired (e.g. the encoded size is not the same as the memory usage of
the Java object). This is also only recommended if the coder returns true for
isRegisterByteSizeObserverCheap, otherwise the transform will perform a possibly-expensive
encoding of each element in order to measure its byte size. An alternate approach is to use
ofByteSize(long, SerializableFunction)
to specify code to calculate the byte size.
public static <K,InputT> GroupIntoBatches<K,InputT> ofByteSize(long batchSizeBytes, SerializableFunction<InputT,java.lang.Long> getElementByteSize)
public GroupIntoBatches.BatchingParams<InputT> getBatchingParams()
public GroupIntoBatches<K,InputT> withMaxBufferingDuration(Duration duration)
@Experimental public GroupIntoBatches.WithShardedKey withShardedKey()
public PCollection<KV<K,java.lang.Iterable<InputT>>> expand(PCollection<KV<K,InputT>> input)
PTransform
PTransform
should be expanded on the given
InputT
.
NOTE: This method should not be called directly. Instead apply the PTransform
should
be applied to the InputT
using the apply
method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
expand
in class PTransform<PCollection<KV<K,InputT>>,PCollection<KV<K,java.lang.Iterable<InputT>>>>