Class BatchElements<T>

java.lang.Object
  org.apache.beam.sdk.transforms.PTransform<PCollection<T>,PCollection<List<T>>>
    org.apache.beam.sdk.transforms.BatchElements<T>
Type Parameters:
T - the type of input elements
All Implemented Interfaces:
Serializable, HasDisplayData

public class BatchElements<T> extends PTransform<PCollection<T>,PCollection<List<T>>>
A PTransform that batches elements for amortized processing.

This transform is designed to precede operations whose processing cost is of the form:

   time = fixed_cost + num_elements * per_element_cost
 

When the per-element cost is significantly smaller than the fixed cost, batching multiple elements together can amortize that fixed cost and improve overall throughput.
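To make the cost model concrete, the following plain-Java sketch (it does not use Beam) evaluates time / num_elements for a few batch sizes. The fixed cost of 10 ms and the per-element cost of 0.1 ms are made-up values chosen only for illustration.

```java
/** Illustrates the cost model time = fixed_cost + num_elements * per_element_cost. */
public class AmortizedCost {

  /** Total time for one call that processes a batch of n elements. */
  static double batchTime(double fixedCost, double perElementCost, int n) {
    return fixedCost + n * perElementCost;
  }

  /** Effective time per element when n elements share a single fixed cost. */
  static double perElementTime(double fixedCost, double perElementCost, int n) {
    return batchTime(fixedCost, perElementCost, n) / n;
  }

  public static void main(String[] args) {
    // Hypothetical numbers: 10 ms fixed cost per call, 0.1 ms per element.
    double fixed = 10.0;
    double perElement = 0.1;
    for (int n : new int[] {1, 10, 100, 1000}) {
      System.out.printf("batch size %4d -> %.3f ms per element%n",
          n, perElementTime(fixed, perElement, n));
    }
  }
}
```

With these hypothetical numbers, the effective cost per element falls from 10.1 ms at a batch size of 1 to 0.11 ms at a batch size of 1000, approaching the per-element cost as the fixed cost is spread over more elements.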

The transform consumes a PCollection<T> and produces a PCollection<List<T>>, where each output element is a batch of input elements.
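The change in element type from T to List<T> can be pictured with an in-memory analogue. The sketch below is plain Java, not Beam: the helper name batch and the fixed batch size are assumptions for illustration, whereas BatchElements chooses batch sizes adaptively and operates on a distributed PCollection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** In-memory analogue of turning a collection of T into a collection of List<T>. */
public class FixedBatches {

  /** Splits input into consecutive lists of batchSize elements (hypothetical helper). */
  static <T> List<List<T>> batch(Iterable<T> input, int batchSize) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be >= 1");
    }
    List<List<T>> batches = new ArrayList<>();
    List<T> current = new ArrayList<>(batchSize);
    for (T element : input) {
      current.add(element);
      if (current.size() == batchSize) {
        batches.add(current); // batch is full: emit it and start a new one
        current = new ArrayList<>(batchSize);
      }
    }
    if (!current.isEmpty()) {
      batches.add(current); // emit the final, partially filled batch
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Integer> input = IntStream.range(0, 200).boxed().collect(Collectors.toList());
    List<List<Integer>> batches = batch(input, 15);
    System.out.println(batches.size() + " batches; last batch has "
        + batches.get(batches.size() - 1).size() + " elements");
  }
}
```

For 200 inputs and a size of 15, this produces 13 full batches plus a final batch of 5 elements.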

This transform dynamically determines an optimal batch size between the configured minimum and maximum values by profiling the execution time of downstream (fused) operations. To enforce a fixed batch size, set minBatchSize == maxBatchSize.
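This page does not describe how the batch size is chosen beyond profiling downstream timing. One plausible approach, built on the linear cost model above, is to fit time = a + b * n to measured (batch size, duration) pairs and derive the next size from that fit. The class below, its parameters (targetVariableSecs, targetOverhead), and its doubling probe are all hypothetical; this is not Beam's estimator, and its parameters need not correspond to the fields of BatchElements.BatchConfig.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical adaptive batch-size chooser (not Beam's implementation).
 * Fits seconds = a + b * batchSize to observed samples by ordinary least squares.
 */
public class BatchSizeEstimatorSketch {
  private final int minBatchSize;
  private final int maxBatchSize;
  private final double targetVariableSecs; // hypothetical: upper bound on b * n
  private final double targetOverhead;     // hypothetical: desired a / (a + b * n), in (0, 1)
  private final List<double[]> samples = new ArrayList<>(); // each entry: {batchSize, seconds}

  public BatchSizeEstimatorSketch(
      int minBatchSize, int maxBatchSize, double targetVariableSecs, double targetOverhead) {
    if (minBatchSize < 1 || maxBatchSize < minBatchSize) {
      throw new IllegalArgumentException("need 1 <= minBatchSize <= maxBatchSize");
    }
    if (targetVariableSecs <= 0 || targetOverhead <= 0 || targetOverhead >= 1) {
      throw new IllegalArgumentException("targets out of range");
    }
    this.minBatchSize = minBatchSize;
    this.maxBatchSize = maxBatchSize;
    this.targetVariableSecs = targetVariableSecs;
    this.targetOverhead = targetOverhead;
  }

  /** Records how long the downstream work took for one batch. */
  public void record(int batchSize, double seconds) {
    samples.add(new double[] {batchSize, seconds});
  }

  /** Suggests a size for the next batch, always within [minBatchSize, maxBatchSize]. */
  public int nextBatchSize() {
    if (minBatchSize == maxBatchSize || samples.isEmpty()) {
      return minBatchSize; // fixed size requested, or nothing measured yet
    }
    double k = samples.size();
    double sumN = 0, sumT = 0, sumNN = 0, sumNT = 0;
    for (double[] s : samples) {
      sumN += s[0];
      sumT += s[1];
      sumNN += s[0] * s[0];
      sumNT += s[0] * s[1];
    }
    double denom = k * sumNN - sumN * sumN;
    if (denom == 0) {
      // All samples share one batch size, so no line can be fit: probe a larger size.
      // Doubling is an arbitrary choice for this sketch.
      long lastSize = (long) samples.get(samples.size() - 1)[0];
      return clamp(lastSize * 2);
    }
    double rawB = (k * sumNT - sumN * sumT) / denom;
    double rawA = (sumT - rawB * sumN) / k;
    // Guard against nonsensical fits such as negative costs.
    double b = Math.max(rawB, 1e-12); // estimated per-element cost (seconds)
    double a = Math.max(rawA, 1e-12); // estimated fixed cost (seconds)

    double target = maxBatchSize;
    // Keep the per-element part of the batch time, b * n, within targetVariableSecs.
    target = Math.min(target, targetVariableSecs / b);
    // Solve a / (a + b * n) = targetOverhead for n: the smallest batch meeting that goal.
    target = Math.min(target, (a / b) * (1 / targetOverhead - 1));
    return clamp((long) Math.floor(target));
  }

  private int clamp(long n) {
    return (int) Math.max(minBatchSize, Math.min(maxBatchSize, n));
  }

  public static void main(String[] args) {
    // Hypothetical measurements consistent with a ~0.5 s fixed cost and ~1 ms per element.
    BatchSizeEstimatorSketch est = new BatchSizeEstimatorSketch(1, 5000, 10.0, 0.3);
    est.record(10, 0.51);
    est.record(100, 0.6);
    est.record(200, 0.7);
    System.out.println("next batch size: " + est.nextBatchSize());
  }
}
```

Taking the minimum with the overhead solution stops growth once the fixed-cost share of each call reaches the target, which keeps batches (and latency) no larger than needed. With the sample measurements in main, the fit gives a of about 0.5 s and b of about 0.001 s per element, so the overhead target of 0.3 caps the next batch at 1166 elements. When minBatchSize equals maxBatchSize, the sketch skips profiling entirely, mirroring the fixed-size behavior described above.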

Elements are batched per window. Each emitted batch belongs to the same window as its elements and is assigned a timestamp at the end of that window.
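The per-window behavior can be approximated in plain Java, assuming fixed (tumbling) windows of windowMillis milliseconds. The Timestamped and Batch classes are hypothetical stand-ins, not Beam types; a fixed maximum batch size replaces the adaptive sizing; and reading "end of that window" as the last millisecond inside the window is also an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of batching per fixed window (not Beam code). */
public class PerWindowBatching {

  /** An element paired with its event timestamp in milliseconds (hypothetical). */
  static final class Timestamped<T> {
    final T value;
    final long timestampMillis;

    Timestamped(T value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  /** A batch of values from one window, stamped with that window's end (hypothetical). */
  static final class Batch<T> {
    final long windowStart;
    final long timestamp;
    final List<T> values;

    Batch(long windowStart, long timestamp, List<T> values) {
      this.windowStart = windowStart;
      this.timestamp = timestamp;
      this.values = values;
    }
  }

  static <T> List<Batch<T>> batchPerWindow(
      List<Timestamped<T>> input, long windowMillis, int maxBatchSize) {
    if (windowMillis < 1 || maxBatchSize < 1) {
      throw new IllegalArgumentException("windowMillis and maxBatchSize must be >= 1");
    }
    // Group values by the start of the fixed window containing their timestamp.
    Map<Long, List<T>> byWindow = new TreeMap<>();
    for (Timestamped<T> e : input) {
      long start = Math.floorDiv(e.timestampMillis, windowMillis) * windowMillis;
      byWindow.computeIfAbsent(start, s -> new ArrayList<>()).add(e.value);
    }
    List<Batch<T>> out = new ArrayList<>();
    for (Map.Entry<Long, List<T>> w : byWindow.entrySet()) {
      long start = w.getKey();
      long end = start + windowMillis - 1; // assumption: last millisecond of the window
      List<T> values = w.getValue();
      // Split each window's values into batches; a batch never spans two windows.
      for (int i = 0; i < values.size(); i += maxBatchSize) {
        List<T> chunk =
            new ArrayList<>(values.subList(i, Math.min(i + maxBatchSize, values.size())));
        out.add(new Batch<>(start, end, chunk));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Timestamped<String>> input = new ArrayList<>();
    input.add(new Timestamped<>("a", 1_000));
    input.add(new Timestamped<>("b", 59_000));
    input.add(new Timestamped<>("c", 61_000));
    input.add(new Timestamped<>("d", 5_000));
    for (Batch<String> b : batchPerWindow(input, 60_000, 2)) {
      System.out.println("window@" + b.windowStart + " ts=" + b.timestamp + " " + b.values);
    }
  }
}
```

Even though "d" arrives after "c", it is batched with the other elements of the first 60-second window, and every batch carries its own window's end timestamp rather than the timestamps of its elements.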

Example


 // With default configuration
 pipeline
     .apply("Create", Create.of(IntStream.range(0, 200).boxed().collect(Collectors.toList())))
     .apply("Batch", BatchElements.withDefaults())
     .apply(...);

 // With custom configuration
 BatchElements.BatchConfig config =
     BatchElements.BatchConfig.builder()
         .withMinBatchSize(1)
         .withMaxBatchSize(15)
         .withTargetBatchDurationSecs(10.0)
         .withTargetBatchOverhead(0.05)
         .withVariance(0.0)
         .build();

 pipeline
     .apply("Create", Create.of(IntStream.range(0, 200).boxed().collect(Collectors.toList())))
     .apply("Batch", BatchElements.withConfig(config))
     .apply(
         "Sizes",
         MapElements.via(
             new SimpleFunction<List<Integer>, Integer>() {
               @Override
               public Integer apply(List<Integer> input) {
                 return input.size();
               }
             }));
 
Method Details

    • withDefaults

      public static <T> BatchElements<T> withDefaults()
      Returns a BatchElements transform that uses the default configuration.
    • withConfig

      public static <T> BatchElements<T> withConfig(BatchElements.BatchConfig config)
      Returns a BatchElements transform that uses the given BatchElements.BatchConfig.
    • expand

      public PCollection<List<T>> expand(PCollection<T> input)
      Description copied from class: PTransform
      Override this method to specify how this PTransform should be expanded on the given InputT.

      NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.

      Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).

      Specified by:
      expand in class PTransform<PCollection<T>,PCollection<List<T>>>