apache_beam.testing.benchmarks.sort_and_batch_benchmark module

Benchmark: BatchElements vs SortAndBatchElements on real Beam pipelines.

Compares two batching strategies for variable-length inference workloads by running the actual Beam transforms under DirectRunner:

  • Baseline (BatchElements): fixed-count batching by setting min_batch_size == max_batch_size.

  • Stateless (SortAndBatchElements): sorts elements by size within each runner bundle, then splits batches using max_batch_weight.

The benchmark materializes per-batch summaries through a temporary Beam sink and analyzes them after the pipeline completes. This keeps the benchmark on the normal Beam execution path rather than relying on InteractiveRunner-specific result materialization or local side effects.

Bundle boundaries are runner-defined. As a result, these measurements are meant to compare the actual DirectRunner behavior of the two transforms rather than a synthetic, user-configurable bundle model.
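The difference between the two strategies can be illustrated in plain Python on a single bundle. This is only a sketch, not the Beam implementation: the helper names are hypothetical, and it assumes that a batch's weight is the sum of its element lengths and that a new batch starts whenever adding an element would exceed max_batch_size or max_batch_weight.

```python
def fixed_count_batches(lengths, batch_size):
    """Baseline sketch: cut the bundle into consecutive fixed-count batches."""
    return [lengths[i:i + batch_size] for i in range(0, len(lengths), batch_size)]


def sort_and_batch(lengths, max_batch_size, max_batch_weight):
    """Stateless sketch: sort one bundle by length, then split on count or weight."""
    batches, current, weight = [], [], 0
    for n in sorted(lengths):
        # Assumption: weight is the running sum of element lengths in the batch.
        if current and (len(current) >= max_batch_size
                        or weight + n > max_batch_weight):
            batches.append(current)
            current, weight = [], 0
        current.append(n)
        weight += n
    if current:
        batches.append(current)
    return batches


# Element lengths in one hypothetical bundle, in arrival order.
bundle = [3, 120, 5, 110, 4, 130, 6, 125]
baseline = fixed_count_batches(bundle, batch_size=4)
stateless = sort_and_batch(bundle, max_batch_size=4, max_batch_weight=2000)
print(baseline)
print(stateless)
```

In the baseline, each batch mixes short and long elements, so every short element is padded up to the longest one. After sorting, similar lengths end up in the same batch.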

Padding ratio:

padding_ratio = sum(max_len_in_batch * batch_size) / sum(actual_lengths)

Lower is better; 1.0 means no padding waste.
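As a worked example, the formula can be evaluated directly on lists of element lengths. The function below is a minimal sketch that takes batches as lists of lengths; the module itself works on materialized batch summaries, whose fields are not documented here.

```python
def padding_ratio(batches):
    """Padded token count divided by actual token count across all batches."""
    padded = sum(max(batch) * len(batch) for batch in batches)
    actual = sum(sum(batch) for batch in batches)
    return padded / actual


# Hypothetical batches: the same eight lengths, grouped two different ways.
mixed_batches = [[3, 120, 5, 110], [4, 130, 6, 125]]
sorted_batches = [[3, 4, 5, 6], [110, 120, 125, 130]]

print(round(padding_ratio(mixed_batches), 3))
print(round(padding_ratio(sorted_batches), 3))
```

Both groupings contain 503 actual tokens. The mixed grouping pads to 1000 tokens, while the sorted grouping pads to 544.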

Methodology:

  • N=20 independent trials per condition (3 warmup trials excluded).

  • Same input corpus (seed=42) for A/B comparison.

  • DirectRunner with in-memory execution and one worker for reproducibility.

  • Percentile method: linear interpolation between adjacent ranks (equivalent to numpy.percentile with method='linear'). For N=20 trials: P50 interpolates ranks 10-11 (0-indexed 9-10), P95 interpolates ranks 19-20 (0-indexed 18-19), and P99 also falls between ranks 19 and 20, at fractional 0-indexed position 18.81.

  • Reports median [IQR] and P95 for each metric.

  • Inference model: latency = batch_size * (max_seq_len / 50)^1.5 ms (simulates downstream transformer-like scaling).
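The reporting step in this methodology (drop warmups, then report median [IQR] and P95) might look like the sketch below. It assumes the warmup samples come first in the list, and it uses the standard library's statistics.quantiles with method="inclusive", which interpolates linearly between adjacent ranks. The function name and the shape of the returned dictionary are hypothetical.

```python
import statistics


def summarize_trials(samples, warmup_trials=3):
    """Drop leading warmup samples, then report median, IQR bounds, and P95."""
    measured = list(samples)[warmup_trials:]
    q1, median, q3 = statistics.quantiles(measured, n=4, method="inclusive")
    # With n=100 the cut points are P1..P99, so index 94 is P95.
    p95 = statistics.quantiles(measured, n=100, method="inclusive")[94]
    return {"n": len(measured), "median": median, "iqr": (q1, q3), "p95": p95}


# Three slow warmup samples followed by 20 measured trials (made-up values).
samples = [99.0, 98.0, 97.0] + [float(i) for i in range(1, 21)]
summary = summarize_trials(samples)
print(summary)
```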

Run:

python3 -m apache_beam.testing.benchmarks.sort_and_batch_benchmark

apache_beam.testing.benchmarks.sort_and_batch_benchmark.generate_highly_skewed_data(num_elements: int, min_length: int = 1, max_length: int = 500, seed: int = 42) → list[str]

Pareto(alpha=1.2) – most short, few very long.
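To give a feel for this distribution, the sketch below draws Pareto(alpha=1.2) lengths with the standard library and clips them to [min_length, max_length]. It is an assumption-laden illustration: the function name is hypothetical, and how the module scales the draws or builds the returned strings is not documented here.

```python
import random


def sketch_pareto_lengths(num_elements, min_length=1, max_length=500,
                          alpha=1.2, seed=42):
    """Draw heavy-tailed lengths; a seeded RNG keeps the corpus reproducible."""
    rng = random.Random(seed)
    lengths = []
    for _ in range(num_elements):
        # paretovariate returns values >= 1; scaling by min_length is an assumption.
        raw = min_length * rng.paretovariate(alpha)
        lengths.append(max(min_length, min(max_length, int(raw))))
    return lengths


lengths = sketch_pareto_lengths(1000)
# Placeholder strings of the drawn lengths (the real element content is unknown).
texts = ["x" * n for n in lengths]
print(sorted(lengths)[len(lengths) // 2], max(lengths))
```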

apache_beam.testing.benchmarks.sort_and_batch_benchmark.generate_lognormal_data(num_elements: int, mean_length: int = 50, std_factor: float = 0.8, min_length: int = 1, max_length: int = 500, seed: int = 42) → list[str]

Log-normal – moderate skew, typical NLP.

apache_beam.testing.benchmarks.sort_and_batch_benchmark.generate_bimodal_data(num_elements: int, mode1_mean: int = 20, mode2_mean: int = 200, mode1_ratio: float = 0.7, min_length: int = 1, max_length: int = 500, seed: int = 42) → list[str]

Bimodal – two distinct length groups.

apache_beam.testing.benchmarks.sort_and_batch_benchmark.generate_low_variance_data(num_elements: int, mean_length: int = 100, cv: float = 0.1, min_length: int = 1, max_length: int = 500, seed: int = 42) → list[str]

Low-variance control (CV=10%).

apache_beam.testing.benchmarks.sort_and_batch_benchmark.simulate_inference_latency(batch_size: int, max_len: int, base_latency_ms: float = 1.0) → float

Simulate downstream inference: O(batch_size * seq_len^1.5).
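The latency model stated in the module overview, latency = batch_size * (max_seq_len / 50)^1.5 ms, can be written out as below. This is a sketch under one assumption: that base_latency_ms is a plain multiplicative factor. The function name is hypothetical, chosen to avoid implying it is the module's own code.

```python
def sketch_inference_latency(batch_size, max_len, base_latency_ms=1.0):
    """Latency grows linearly with batch size and as max_len ** 1.5."""
    # Assumption: base_latency_ms scales the whole expression.
    return base_latency_ms * batch_size * (max_len / 50) ** 1.5


# A batch padded to length 50 versus one padded to length 200.
short_batch_ms = sketch_inference_latency(batch_size=32, max_len=50)
long_batch_ms = sketch_inference_latency(batch_size=32, max_len=200)
print(short_batch_ms, long_batch_ms)
```

Because the cost depends on the longest element in the batch, one long outlier in an otherwise short batch raises the simulated latency of the whole batch. Quadrupling max_len multiplies it by 4^1.5 = 8.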

apache_beam.testing.benchmarks.sort_and_batch_benchmark.percentile(data: Sequence[float], p: float) → float

Percentile via linear interpolation between adjacent ranks.

Equivalent to numpy.percentile(data, p, method='linear'). For N=20: P50 interpolates ranks 10-11, P95 ranks 19-20, and P99 also falls between ranks 19 and 20 (fractional 0-indexed position 18.81).
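The interpolation rule can be sketched without NumPy as follows. The function name is hypothetical and the code illustrates the stated method rather than reproducing the module's source: the fractional 0-indexed position is (N - 1) * p / 100, and the result is interpolated between the two neighbouring sorted values.

```python
import math


def linear_percentile(data, p):
    """Percentile by linear interpolation between adjacent sorted values."""
    if not data:
        raise ValueError("data must not be empty")
    ordered = sorted(data)
    position = (len(ordered) - 1) * p / 100.0  # fractional 0-indexed position
    lower = math.floor(position)
    upper = min(lower + 1, len(ordered) - 1)
    fraction = position - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * fraction


# 20 made-up trial values: 1.0, 2.0, ..., 20.0.
trials = [float(i) for i in range(1, 21)]
p50 = linear_percentile(trials, 50)  # position 9.5
p95 = linear_percentile(trials, 95)  # position 18.05
p99 = linear_percentile(trials, 99)  # position 18.81
print(p50, p95, p99)
```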

apache_beam.testing.benchmarks.sort_and_batch_benchmark.compute_padding_stats(batch_summaries: list[dict[str, int]]) → dict[str, Any]

Padding-efficiency statistics for materialized batch summaries.

apache_beam.testing.benchmarks.sort_and_batch_benchmark.validate_invariants(data: list[str], baseline_summaries: list[dict[str, int]], stateless_summaries: list[dict[str, int]]) → dict[str, Any]

Validate element/token counts and batch-size equality.

apache_beam.testing.benchmarks.sort_and_batch_benchmark.run_performance_benchmark(data: list[str], max_batch_size: int, max_batch_weight: int, num_trials: int = 20, warmup_trials: int = 3) → tuple[dict[str, Any], dict[str, Any], list[dict[str, int]], list[dict[str, int]]]

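Run num_trials measured trials (default N=20) for baseline and stateless, after warmup_trials warmup trials (default 3) that are excluded from the results.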

apache_beam.testing.benchmarks.sort_and_batch_benchmark.run_benchmark(num_elements: int = 10000, min_length: int = 1, max_length: int = 500, max_batch_size: int = 32, max_batch_weight: int = 2000, distribution: str = 'pareto', seed: int = 42) → dict[str, Any]

Run baseline vs stateless comparison.

apache_beam.testing.benchmarks.sort_and_batch_benchmark.print_results(results: dict[str, Any]) → None

apache_beam.testing.benchmarks.sort_and_batch_benchmark.main()