Class GroupIntoBatches<K,InputT>
- All Implemented Interfaces: Serializable, HasDisplayData
A PTransform that batches inputs to a desired batch size. Batches will contain only elements of a single key.
Elements are buffered until there are enough elements for a batch, at which point they are emitted to the output PCollection. A maxBufferingDuration can be set to emit output early and avoid waiting for a full batch forever.
Batches can be triggered either based on element count or byte size. ofSize(long) is used to specify a maximum element count while ofByteSize(long) is used to specify a maximum byte size. The single-argument ofByteSize(long) uses the input coder to determine the encoded byte size of each element. However, this may not always be what is desired. A user may want to control batching based on a different byte size (e.g. the memory usage of the decoded Java object) or the input coder may not be able to efficiently determine the elements' byte size. For these cases, we also provide the two-argument ofByteSize(long, SerializableFunction), which allows the user to pass in a function to be used to determine the byte size of an element.
Windows are preserved (batches contain elements from the same window). Batches may contain elements from more than one bundle.
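The buffering behavior described above can be pictured with a small plain-Java model. This is only a sketch and not Beam's implementation: the class CountBatchingModel and its methods are hypothetical names, windows are ignored, and the explicit flush call stands in for an expired maxBufferingDuration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of count-based batching per key (illustrative only). */
final class CountBatchingModel {
  private final int batchSize;
  // One buffer per key, so a batch never mixes elements of different keys.
  private final Map<String, List<String>> buffers = new LinkedHashMap<>();
  // Emitted batches, each recorded as "key=[v1, v2, ...]".
  final List<String> emitted = new ArrayList<>();

  CountBatchingModel(int batchSize) {
    this.batchSize = batchSize;
  }

  /** Buffers one element and emits a batch once the key's buffer is full. */
  void add(String key, String value) {
    List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
    buffer.add(value);
    if (buffer.size() >= batchSize) {
      flush(key);
    }
  }

  /** Emits whatever is buffered for the key, even if the batch is incomplete. */
  void flush(String key) {
    List<String> buffer = buffers.remove(key);
    if (buffer != null && !buffer.isEmpty()) {
      emitted.add(key + "=" + buffer);
    }
  }

  public static void main(String[] args) {
    CountBatchingModel model = new CountBatchingModel(2);
    model.add("a", "1");
    model.add("b", "2");
    model.add("a", "3"); // key "a" now holds 2 elements and is emitted
    model.flush("b");    // early emit of the incomplete batch for key "b"
    System.out.println(model.emitted); // prints [a=[1, 3], b=[2]]
  }
}
```

In this model each key fills its own buffer independently, which mirrors the statement that batches contain only elements of a single key.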
Example 1 (batch call a webservice and get return codes):
PCollection<KV<String, String>> input = ...;
long batchSize = 100L;
PCollection<KV<String, Iterable<String>>> batched = input
    .apply(GroupIntoBatches.<String, String>ofSize(batchSize))
    .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())));
// Call the web service once per batch and emit its return code under the batch's key.
PCollection<KV<String, String>> returnCodes = batched
    .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String, String>>() {
      @ProcessElement
      public void processElement(
          @Element KV<String, Iterable<String>> element,
          OutputReceiver<KV<String, String>> r) {
        r.output(KV.of(element.getKey(), callWebService(element.getValue())));
      }
    }));
Example 2 (batch unbounded input in a global window):
PCollection<KV<String, String>> unboundedInput = ...;
long batchSize = 100L;
Duration maxBufferingDuration = Duration.standardSeconds(10);
PCollection<KV<String, Iterable<String>>> batched = unboundedInput
    .apply(Window.<KV<String, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .discardingFiredPanes())
    .apply(GroupIntoBatches.<String, String>ofSize(batchSize)
        .withMaxBufferingDuration(maxBufferingDuration));
Nested Class Summary
Nested Classes
Modifier and Type, Class, Description
static class: Wrapper class for batching parameters supplied by users.
class
Field Summary
Fields inherited from class org.apache.beam.sdk.transforms.PTransform
annotations, displayData, name, resourceHints
Method Summary
Modifier and Type, Method, Description
PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K, InputT>> input): Override this method to specify how this PTransform should be expanded on the given InputT.
getBatchingParams(): Returns user supplied parameters for batching.
static <K, InputT> GroupIntoBatches<K, InputT> ofByteSize(long batchSizeBytes): Aim to create batches each with the specified byte size.
static <K, InputT> GroupIntoBatches<K, InputT> ofByteSize(long batchSizeBytes, SerializableFunction<InputT, Long> getElementByteSize): Aim to create batches each with the specified byte size.
static <K, InputT> GroupIntoBatches<K, InputT> ofSize(long batchSize): Aim to create batches each with the specified element count.
toString()
withByteSize(long batchSizeBytes)
withByteSize(long batchSizeBytes, SerializableFunction<InputT, Long> getElementByteSize)
withMaxBufferingDuration(Duration duration): Sets a time limit (in processing time) on how long an incomplete batch of elements is allowed to be buffered.
withShardedKey(): Outputs batched elements associated with sharded input keys.
withSize(long batchSize)
Methods inherited from class org.apache.beam.sdk.transforms.PTransform
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setDisplayData, setResourceHints, validate, validate
Method Details
ofSize
Aim to create batches each with the specified element count.
ofByteSize
Aim to create batches each with the specified byte size.
This option uses the PCollection's coder to determine the byte size of each element. This may not always be what is desired (e.g. the encoded size is not the same as the memory usage of the Java object). This is also only recommended if the coder returns true for isRegisterByteSizeObserverCheap, otherwise the transform will perform a possibly-expensive encoding of each element in order to measure its byte size. An alternate approach is to use ofByteSize(long, SerializableFunction) to specify code to calculate the byte size.
ofByteSize
public static <K,InputT> GroupIntoBatches<K,InputT> ofByteSize(long batchSizeBytes, SerializableFunction<InputT, Long> getElementByteSize)
Aim to create batches each with the specified byte size. The provided function is used to determine the byte size of each element.
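To illustrate how a user-supplied size function can drive batching, here is a minimal plain-Java sketch. It is not Beam's implementation: ByteSizeBatchingModel and batchBySize are hypothetical names, keys and windows are left out, and the exact rule for closing a batch (close the open batch first if the next element would push it over the limit, and close it once the limit is reached) is an assumption made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

/** Plain-Java model of batching by a caller-defined element size (illustrative only). */
final class ByteSizeBatchingModel {

  /** Splits elements into batches whose summed size aims at maxBytes. */
  static <T> List<List<T>> batchBySize(
      List<T> elements, long maxBytes, ToLongFunction<T> sizeOf) {
    List<List<T>> batches = new ArrayList<>();
    List<T> current = new ArrayList<>();
    long currentBytes = 0;
    for (T element : elements) {
      long size = sizeOf.applyAsLong(element);
      // Assumed policy: close the open batch if this element would exceed the limit.
      if (!current.isEmpty() && currentBytes + size > maxBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(element);
      currentBytes += size;
      // Close the batch as soon as the target size is reached.
      if (currentBytes >= maxBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
    }
    if (!current.isEmpty()) {
      batches.add(current); // leftover, incomplete batch
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> values = Arrays.asList("aa", "bbb", "c", "dddddd");
    // The size function here measures UTF-8 bytes; any other measure could be supplied.
    List<List<String>> batches =
        batchBySize(values, 5, (String s) -> s.getBytes(StandardCharsets.UTF_8).length);
    System.out.println(batches); // prints [[aa, bbb], [c], [dddddd]]
  }
}
```

Note that under this assumed policy an element larger than the limit ends up in a batch of its own, which is one reason the documentation says the transform aims for the byte size rather than guaranteeing it.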
getBatchingParams
Returns user supplied parameters for batching.
toString
- Overrides: toString in class PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>
withSize
withByteSize
withByteSize
public GroupIntoBatches<K,InputT> withByteSize(long batchSizeBytes, SerializableFunction<InputT, Long> getElementByteSize)
withMaxBufferingDuration
Sets a time limit (in processing time) on how long an incomplete batch of elements is allowed to be buffered. Once a batch is flushed to output, the timer is reset. The provided limit must be a positive duration or zero; a zero buffering duration effectively means no limit.
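The interaction between the batch size and the buffering time limit can be sketched with a simulated processing-time clock. This is a simplified model under stated assumptions, not Beam's implementation: BufferingTimeoutModel, add, and advanceTo are hypothetical names, there is a single key, and the timer is assumed to start when the first element enters an empty buffer.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a size limit combined with a buffering time limit (illustrative only). */
final class BufferingTimeoutModel {
  private final int batchSize;
  private final long maxBufferingMillis; // zero is treated as "no limit"
  private final List<String> buffer = new ArrayList<>();
  private long deadlineMillis = Long.MAX_VALUE;
  final List<List<String>> emitted = new ArrayList<>();

  BufferingTimeoutModel(int batchSize, long maxBufferingMillis) {
    this.batchSize = batchSize;
    this.maxBufferingMillis = maxBufferingMillis;
  }

  /** Buffers an element that arrives at the given simulated processing time. */
  void add(String value, long nowMillis) {
    advanceTo(nowMillis);
    if (buffer.isEmpty() && maxBufferingMillis > 0) {
      // Assumption: the timer starts with the first element of a new batch.
      deadlineMillis = nowMillis + maxBufferingMillis;
    }
    buffer.add(value);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Moves the simulated clock forward and emits an incomplete batch if its deadline passed. */
  void advanceTo(long nowMillis) {
    if (!buffer.isEmpty() && nowMillis >= deadlineMillis) {
      flush();
    }
  }

  private void flush() {
    emitted.add(new ArrayList<>(buffer));
    buffer.clear();
    deadlineMillis = Long.MAX_VALUE; // the timer is reset after every flush
  }

  public static void main(String[] args) {
    BufferingTimeoutModel model = new BufferingTimeoutModel(3, 10_000L);
    model.add("x", 0L);
    model.add("y", 4_000L);
    model.advanceTo(10_000L); // deadline reached: incomplete batch [x, y] is emitted
    model.add("z", 12_000L);
    model.add("w", 13_000L);
    model.add("v", 14_000L);  // batch is full and emitted before its deadline
    System.out.println(model.emitted); // prints [[x, y], [z, w, v]]
  }
}
```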
withShardedKey
Outputs batched elements associated with sharded input keys. By default, keys are sharded such that the input elements with the same key are spread across all available threads executing the transform. Runners may override the default sharding to achieve better load balancing at execution time.
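The effect of sharding a key can be pictured with the following plain-Java sketch. Everything in it is an assumption for illustration: ShardedKeyModel is a hypothetical name, the string key#shard stands in for a sharded key, and round-robin assignment stands in for whichever thread happens to process an element. The actual sharding is decided by Beam or the runner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of spreading one key's elements over several shards (illustrative only). */
final class ShardedKeyModel {

  /** Hypothetical stand-in for a sharded key: the original key plus a shard index. */
  static String shardedKey(String key, int shard) {
    return key + "#" + shard;
  }

  /** Groups the values of a single key by sharded key. */
  static Map<String, List<String>> groupByShardedKey(
      String key, List<String> values, int numShards) {
    Map<String, List<String>> groups = new TreeMap<>();
    for (int i = 0; i < values.size(); i++) {
      // Assumption: round-robin replaces "the thread that processed the element".
      int shard = i % numShards;
      groups.computeIfAbsent(shardedKey(key, shard), k -> new ArrayList<>())
          .add(values.get(i));
    }
    return groups;
  }

  public static void main(String[] args) {
    Map<String, List<String>> groups =
        groupByShardedKey("user", Arrays.asList("a", "b", "c", "d", "e"), 2);
    System.out.println(groups); // prints {user#0=[a, c, e], user#1=[b, d]}
  }
}
```

Because batching then happens per sharded key, a single hot key no longer forces all of its elements through one buffer, at the cost of producing several smaller batches for that key.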
expand
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by: expand in class PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>