Source code for apache_beam.tools.utils

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Utility functions for all microbenchmarks."""

# pytype: skip-file

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import gc
import importlib
import os
import time

import numpy


[docs]def check_compiled(module): """Check whether given module has been compiled. Args: module: string, module name """ check_module = importlib.import_module(module) ext = os.path.splitext(check_module.__file__)[-1] if ext in ('.py', '.pyc'): raise RuntimeError( "Profiling uncompiled code.\n" "To compile beam, run " "'pip install Cython; python setup.py build_ext --inplace'")
[docs]class BenchmarkConfig(collections.namedtuple("BenchmarkConfig", ["benchmark", "size", "num_runs"]) ): """ Attributes: benchmark: a callable that takes an int argument - benchmark size, and returns a callable. A returned callable must run the code being benchmarked on an input of specified size. For example, one can implement a benchmark as: class MyBenchmark(object): def __init__(self, size): [do necessary initialization] def __call__(self): [run the code in question] size: int, a size of the input. Aggregated per-element metrics are counted based on the size of the input. num_runs: int, number of times to run each benchmark. """ def __str__(self): return "%s, %s element(s)" % ( getattr(self.benchmark, '__name__', str(self.benchmark)), str(self.size))
[docs]class LinearRegressionBenchmarkConfig(collections.namedtuple( "LinearRegressionBenchmarkConfig", ["benchmark", "starting_point", "increment", "num_runs"])): """ Attributes: benchmark: a callable that takes an int argument - benchmark size, and returns a callable. A returned callable must run the code being benchmarked on an input of specified size. For example, one can implement a benchmark as: class MyBenchmark(object): def __init__(self, size): [do necessary initialization] def __call__(self): [run the code in question] starting_point: int, an initial size of the input. Regression results are calculated based on the input. increment: int, the rate of growth of the input for each run of the benchmark. num_runs: int, number of times to run each benchmark. """ def __str__(self): return "%s, %s element(s) at start, %s growth per run" % ( getattr(self.benchmark, '__name__', str(self.benchmark)), str(self.starting_point), str(self.increment))
[docs]def run_benchmarks(benchmark_suite, verbose=True): """Runs benchmarks, and collects execution times. A simple instrumentation to run a callable several times, collect and print its execution times. Args: benchmark_suite: A list of BenchmarkConfig. verbose: bool, whether to print benchmark results to stdout. Returns: A dictionary of the form string -> list of floats. Keys of the dictionary are benchmark names, values are execution times in seconds for each run. """ def run(benchmark_fn, size): # Contain each run of a benchmark inside a function so that any temporary # objects can be garbage-collected after the run. benchmark_instance_callable = benchmark_fn(size) start = time.time() _ = benchmark_instance_callable() return time.time() - start cost_series = collections.defaultdict(list) size_series = collections.defaultdict(list) for benchmark_config in benchmark_suite: name = str(benchmark_config) num_runs = benchmark_config.num_runs if isinstance(benchmark_config, LinearRegressionBenchmarkConfig): size = benchmark_config.starting_point step = benchmark_config.increment else: assert isinstance(benchmark_config, BenchmarkConfig) size = benchmark_config.size step = 0 for run_id in range(num_runs): # Do a proactive GC before each run to minimize side-effects of different # runs. gc.collect() time_cost = run(benchmark_config.benchmark, size) # Appending size and time cost to perform linear regression cost_series[name].append(time_cost) size_series[name].append(size) if verbose: per_element_cost = time_cost / size print( "%s: run %d of %d, per element time cost: %g sec" % (name, run_id + 1, num_runs, per_element_cost)) # Incrementing the size of the benchmark run by the step size size += step if verbose: print("") if verbose: pad_length = max([len(str(bc)) for bc in benchmark_suite]) for benchmark_config in benchmark_suite: name = str(benchmark_config) if isinstance(benchmark_config, LinearRegressionBenchmarkConfig): from scipy import stats print() # pylint: disable=unused-variable gradient, intercept, r_value, p_value, std_err = stats.linregress( size_series[name], cost_series[name]) print("Fixed cost ", intercept) print("Per-element ", gradient) print("R^2 ", r_value**2) else: assert isinstance(benchmark_config, BenchmarkConfig) per_element_median_cost = ( numpy.median(cost_series[name]) / benchmark_config.size) std = numpy.std(cost_series[name]) / benchmark_config.size print( "%s: p. element median time cost: %g sec, relative std: %.2f%%" % ( name.ljust(pad_length, " "), per_element_median_cost, std * 100 / per_element_median_cost)) return size_series, cost_series