apache_beam.testing.benchmarks.sort_and_batch_benchmark module
Benchmark: BatchElements vs SortAndBatchElements on real Beam pipelines.
Compares two batching strategies for variable-length inference workloads by running the actual Beam transforms under DirectRunner:
Baseline (BatchElements): fixed-count batching by setting min_batch_size == max_batch_size.
Stateless (SortAndBatchElements): sorts elements by size within each runner bundle, then splits batches using max_batch_weight.
The benchmark materializes per-batch summaries through a temporary Beam sink and analyzes them after the pipeline completes. This keeps the benchmark on the normal Beam execution path rather than relying on InteractiveRunner-specific result materialization or local side effects.
Bundle boundaries are runner-defined. As a result, these measurements are meant to compare the actual DirectRunner behavior of the two transforms rather than a synthetic, user-configurable bundle model.
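The two strategies can be modelled in plain Python to see why sorting helps. This is a minimal sketch, not the Beam implementation: the helper names are hypothetical, the whole input list is treated as a single bundle (real bundle boundaries are runner-defined), and batch weight is assumed to be the sum of element lengths.

```python
from typing import List


def fixed_count_batches(elements: List[str], batch_size: int) -> List[List[str]]:
    """Fixed-count batching: every batch holds batch_size elements (last may be short)."""
    return [elements[i:i + batch_size] for i in range(0, len(elements), batch_size)]


def sort_and_batch(elements: List[str], max_batch_size: int,
                   max_batch_weight: int) -> List[List[str]]:
    """Sort one bundle by length, then close a batch when a limit would be exceeded.

    Assumption: weight is the sum of element lengths in the batch.
    """
    batches: List[List[str]] = []
    current: List[str] = []
    weight = 0
    for elem in sorted(elements, key=len):
        too_many = len(current) >= max_batch_size
        too_heavy = bool(current) and weight + len(elem) > max_batch_weight
        if too_many or too_heavy:
            batches.append(current)
            current, weight = [], 0
        current.append(elem)
        weight += len(elem)
    if current:
        batches.append(current)
    return batches


# One hypothetical bundle with interleaved short and long elements.
bundle = ["a" * n for n in (3, 400, 5, 390, 4, 410)]
print([[len(e) for e in b] for b in fixed_count_batches(bundle, 3)])
# [[3, 400, 5], [390, 4, 410]]
print([[len(e) for e in b] for b in sort_and_batch(bundle, 3, 1000)])
# [[3, 4, 5], [390, 400], [410]]
```

In the fixed-count output every batch mixes short and long elements, so the short ones would be padded to roughly 400. After sorting, the short elements share a batch and the weight limit splits the long ones.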
Padding ratio:
padding_ratio = sum(max_len_in_batch * batch_size) / sum(actual_lengths)
Lower is better. 1.0 = no padding waste.
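As a worked illustration of the formula, the sketch below computes the ratio from lists of element lengths. The helper name and the list-of-lengths input are assumptions for illustration; the module's compute_padding_stats works on materialized batch summaries instead.

```python
from typing import Sequence


def padding_ratio(batches: Sequence[Sequence[int]]) -> float:
    """sum(max_len_in_batch * batch_size) / sum(actual_lengths)."""
    # Each non-empty batch is padded to the length of its longest element.
    padded = sum(max(b) * len(b) for b in batches if b)
    actual = sum(sum(b) for b in batches)
    if actual == 0:
        raise ValueError("padding ratio is undefined for empty input")
    return padded / actual


mixed = [[3, 400, 5], [390, 4, 410]]      # short and long elements interleaved
grouped = [[3, 4, 5], [390, 400, 410]]    # similar lengths batched together
print(round(padding_ratio(mixed), 3))     # 2.005
print(round(padding_ratio(grouped), 3))   # 1.027
print(padding_ratio([[5, 5, 5]]))         # 1.0
```

The same six elements cost about twice their token count when mixed, and about 3% extra when grouped by length.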
Methodology:
N=20 independent trials per condition (3 warmup trials excluded).
Same input corpus (seed=42) for A/B comparison.
DirectRunner with in-memory execution and one worker for reproducibility.
Percentile method: linear interpolation between adjacent ranks (equivalent to numpy.percentile with method='linear'). For N=20 trials: P50 interpolates ranks 10-11 (0-indexed 9-10), P95 interpolates ranks 19-20 (0-indexed 18-19), and P99 also falls between ranks 19 and 20, at fractional 0-indexed position 18.81.
Reports median [IQR] and P95 for each metric.
Inference model: latency = batch_size * (max_seq_len / 50)^1.5 ms (simulates downstream transformer-like scaling).
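The latency model can be written out directly. This is a sketch under one assumption: base_latency_ms is taken to be a plain multiplicative factor, which the formula above does not state explicitly. The function name is hypothetical.

```python
def sketch_latency_ms(batch_size: int, max_len: int,
                      base_latency_ms: float = 1.0) -> float:
    """latency = batch_size * (max_seq_len / 50)^1.5, in milliseconds."""
    # Assumption: base_latency_ms scales the whole expression.
    return base_latency_ms * batch_size * (max_len / 50) ** 1.5


# A batch padded to length 50 costs 1 ms per element under this model.
print(sketch_latency_ms(32, 50))    # 32.0
# Padding the same batch to length 200 multiplies the cost by 4^1.5 = 8.
print(sketch_latency_ms(32, 200))   # 256.0
```

Because the exponent is above 1, a single long element in a batch raises the simulated cost for every element in that batch, which is what makes length-aware batching visible in the latency metric.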
Run:
python3 -m apache_beam.testing.benchmarks.sort_and_batch_benchmark
- apache_beam.testing.benchmarks.sort_and_batch_benchmark.generate_highly_skewed_data(num_elements: int, min_length: int = 1, max_length: int = 500, seed: int = 42) -> list[str]
Pareto-distributed lengths (alpha=1.2): most elements are short, a few are very long.
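To give a feel for this kind of corpus, here is a minimal sketch of a seeded Pareto length generator. Everything beyond the signature is an assumption: the function name, the scaling by min_length, the clamping to max_length, and the filler character are illustrative and may differ from what the module does.

```python
import random
import statistics
from typing import List


def sketch_pareto_strings(num_elements: int, min_length: int = 1,
                          max_length: int = 500, seed: int = 42,
                          alpha: float = 1.2) -> List[str]:
    """Hypothetical generator: strings whose lengths follow a clamped Pareto law."""
    rng = random.Random(seed)  # local RNG so the corpus is reproducible
    corpus = []
    for _ in range(num_elements):
        # paretovariate returns values >= 1, so lengths start at min_length.
        length = int(min_length * rng.paretovariate(alpha))
        length = max(min_length, min(length, max_length))
        corpus.append("x" * length)
    return corpus


corpus = sketch_pareto_strings(1000)
lengths = [len(s) for s in corpus]
# A heavy right tail pulls the mean above the median.
print(statistics.median(lengths), statistics.mean(lengths), max(lengths))
```

Using a fixed seed means both batching strategies see the identical corpus, which is the point of the A/B setup described in the methodology.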
- apache_beam.testing.benchmarks.sort_and_batch_benchmark.generate_lognormal_data(num_elements: int, mean_length: int = 50, std_factor: float = 0.8, min_length: int = 1, max_length: int = 500, seed: int = 42) -> list[str]
Log-normal lengths: moderate skew, typical of NLP workloads.
- apache_beam.testing.benchmarks.sort_and_batch_benchmark.generate_bimodal_data(num_elements: int, mode1_mean: int = 20, mode2_mean: int = 200, mode1_ratio: float = 0.7, min_length: int = 1, max_length: int = 500, seed: int = 42) -> list[str]
Bimodal – two distinct length groups.
- apache_beam.testing.benchmarks.sort_and_batch_benchmark.generate_low_variance_data(num_elements: int, mean_length: int = 100, cv: float = 0.1, min_length: int = 1, max_length: int = 500, seed: int = 42) -> list[str]
Low-variance control (CV=10%).
- apache_beam.testing.benchmarks.sort_and_batch_benchmark.simulate_inference_latency(batch_size: int, max_len: int, base_latency_ms: float = 1.0) -> float
Simulate downstream inference: O(batch_size * seq_len^1.5).
- apache_beam.testing.benchmarks.sort_and_batch_benchmark.percentile(data: Sequence[float], p: float) -> float
Percentile via linear interpolation between adjacent ranks.
Equivalent to numpy.percentile(data, p, method='linear'). For N=20: P50 interpolates ranks 10-11, P95 ranks 19-20, and P99 also falls between ranks 19 and 20 (fractional 0-indexed position 18.81).
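The interpolation rule can be sketched in a few lines of standard-library Python. The function name below is hypothetical and the code is an illustration of the described method, not the module's source.

```python
import math
from typing import Sequence


def linear_percentile(data: Sequence[float], p: float) -> float:
    """Percentile by linear interpolation between the two nearest ranks."""
    if not data:
        raise ValueError("data must be non-empty")
    if not 0 <= p <= 100:
        raise ValueError("p must be in [0, 100]")
    ordered = sorted(data)
    idx = (p / 100) * (len(ordered) - 1)   # fractional 0-indexed position
    lo = math.floor(idx)
    hi = min(lo + 1, len(ordered) - 1)
    frac = idx - lo
    return ordered[lo] + (ordered[hi] - ordered[lo]) * frac


trials = [float(v) for v in range(1, 21)]   # N=20 stand-in measurements: 1..20
for p in (50, 95, 99):
    # Fractional positions are 9.5, 18.05 and 18.81 respectively.
    print(p, round(linear_percentile(trials, p), 2))
```

With the values 1 through 20, the results are approximately 10.5, 19.05, and 19.81, matching the fractional positions quoted above.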
- apache_beam.testing.benchmarks.sort_and_batch_benchmark.compute_padding_stats(batch_summaries: list[dict[str, int]]) -> dict[str, Any]
Padding-efficiency statistics for materialized batch summaries.
- apache_beam.testing.benchmarks.sort_and_batch_benchmark.validate_invariants(data: list[str], baseline_summaries: list[dict[str, int]], stateless_summaries: list[dict[str, int]]) -> dict[str, Any]
Validate element/token counts and batch-size equality.
- apache_beam.testing.benchmarks.sort_and_batch_benchmark.run_performance_benchmark(data: list[str], max_batch_size: int, max_batch_weight: int, num_trials: int = 20, warmup_trials: int = 3) -> tuple[dict[str, Any], dict[str, Any], list[dict[str, int]], list[dict[str, int]]]
Run num_trials trials (default 20) for the baseline and stateless transforms, with warmup_trials warmup trials (default 3) excluded from the reported statistics.
- apache_beam.testing.benchmarks.sort_and_batch_benchmark.run_benchmark(num_elements: int = 10000, min_length: int = 1, max_length: int = 500, max_batch_size: int = 32, max_batch_weight: int = 2000, distribution: str = 'pareto', seed: int = 42) -> dict[str, Any]
Run baseline vs stateless comparison.