Source code for apache_beam.tools.distribution_counter_microbenchmark

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A microbenchmark for measuring DistributionAccumulator performance

This runs a sequence of distribution.update for random input value to calculate
average update time per input.
A typical update operation should run into 0.6 microseconds

Run as
  python -m apache_beam.tools.distribution_counter_microbenchmark
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import random
import sys
import time
from builtins import range

from apache_beam.tools import utils


[docs]def generate_input_values(num_input, lower_bound, upper_bound): values = [] # pylint: disable=unused-variable for i in range(num_input): values.append(random.randint(lower_bound, upper_bound)) return values
[docs]def run_benchmark(num_runs=100, num_input=10000, seed=time.time()): total_time = 0 random.seed(seed) lower_bound = 0 upper_bound = sys.maxsize inputs = generate_input_values(num_input, lower_bound, upper_bound) from apache_beam.transforms import DataflowDistributionCounter print("Number of runs:", num_runs) print("Input size:", num_input) print("Input sequence from %d to %d" % (lower_bound, upper_bound)) print("Random seed:", seed) for i in range(num_runs): counter = DataflowDistributionCounter() start = time.time() counter.add_inputs_for_test(inputs) time_cost = time.time() - start print("Run %d: Total time cost %g sec" % (i+1, time_cost)) total_time += time_cost // num_input print("Per element update time cost:", total_time // num_runs)
if __name__ == '__main__': utils.check_compiled( 'apache_beam.transforms.cy_dataflow_distribution_counter') run_benchmark()