Source code for apache_beam.tools.coders_microbenchmark

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A microbenchmark for measuring performance of coders.

This runs a sequence of encode-decode operations on random inputs
to collect performance of various coders.

To evaluate coders performance we approximate the behavior
how the coders are used in PCollections: we encode and decode
a list of elements. An element can be a string, a list of integers,
a windowed value, or any other object we want a coder to process.

Run as:
  python -m apache_beam.tools.coders_microbenchmark

"""

from __future__ import absolute_import
from __future__ import print_function

import argparse
import random
import re
import string
import sys

from past.builtins import unicode

from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
from apache_beam.coders import coders
from apache_beam.tools import utils
from apache_beam.transforms import window
from apache_beam.utils import windowed_value


[docs]def coder_benchmark_factory(coder, generate_fn): """Creates a benchmark that encodes and decodes a list of elements. Args: coder: coder to use to encode an element. generate_fn: a callable that generates an element. """ class CoderBenchmark(object): def __init__(self, num_elements_per_benchmark): self._coder = coders.IterableCoder(coder) self._list = [generate_fn() for _ in range(num_elements_per_benchmark)] def __call__(self): # Calling coder operations on a single element at a time may incur # unrelevant overhead. To compensate, we use a list elements. _ = self._coder.decode(self._coder.encode(self._list)) CoderBenchmark.__name__ = "%s, %s" % ( generate_fn.__name__, str(coder)) return CoderBenchmark
[docs]def small_int(): return random.randint(0, 127)
[docs]def large_int(): return random.randint(sys.maxsize >> 2, sys.maxsize)
[docs]def random_string(length): return unicode(''.join(random.choice( string.ascii_letters + string.digits) for _ in range(length)))
[docs]def small_string(): return random_string(4)
[docs]def large_string(): return random_string(100)
[docs]def list_int(size): return [small_int() for _ in range(size)]
[docs]def dict_int_int(size): return {i: i for i in list_int(size)}
[docs]def small_list(): return list_int(10)
[docs]def large_list(): # Bool is the last item in FastPrimitiveCoders before pickle. return [bool(k) for k in list_int(1000)]
[docs]def small_tuple(): # Benchmark a common case of 2-element tuples. return tuple(list_int(2))
[docs]def large_tuple(): return tuple(large_list())
[docs]def small_dict(): return {i: i for i in small_list()}
[docs]def large_dict(): return {i: i for i in large_list()}
[docs]def large_iterable(): yield 'a' * coders.coder_impl.SequenceCoderImpl._DEFAULT_BUFFER_SIZE for k in range(1000): yield k
[docs]def random_message_with_map(size): message = test_message.MessageWithMap() keys = list_int(size) random.shuffle(keys) for key in keys: message.field1[str(key)].field1 = small_string() return message
[docs]def small_message_with_map(): return random_message_with_map(5)
[docs]def large_message_with_map(): return random_message_with_map(20)
[docs]def globally_windowed_value(): return windowed_value.WindowedValue( value=small_int(), timestamp=12345678, windows=(window.GlobalWindow(),))
[docs]def random_windowed_value(num_windows): return windowed_value.WindowedValue( value=small_int(), timestamp=12345678, windows=tuple( window.IntervalWindow(i * 10, i * 10 + small_int()) for i in range(num_windows) ))
[docs]def wv_with_one_window(): return random_windowed_value(num_windows=1)
[docs]def wv_with_multiple_windows(): return random_windowed_value(num_windows=32)
[docs]def run_coder_benchmarks( num_runs, input_size, seed, verbose, filter_regex='.*'): random.seed(seed) # TODO(BEAM-4441): Pick coders using type hints, for example: # tuple_coder = typecoders.registry.get_coder(typing.Tuple[int, ...]) benchmarks = [ coder_benchmark_factory( coders.FastPrimitivesCoder(), small_int), coder_benchmark_factory( coders.FastPrimitivesCoder(), large_int), coder_benchmark_factory( coders.FastPrimitivesCoder(), small_string), coder_benchmark_factory( coders.FastPrimitivesCoder(), large_string), coder_benchmark_factory( coders.FastPrimitivesCoder(), small_list), coder_benchmark_factory( coders.IterableCoder(coders.FastPrimitivesCoder()), small_list), coder_benchmark_factory( coders.FastPrimitivesCoder(), large_list), coder_benchmark_factory( coders.IterableCoder(coders.FastPrimitivesCoder()), large_list), coder_benchmark_factory( coders.IterableCoder(coders.FastPrimitivesCoder()), large_iterable), coder_benchmark_factory( coders.FastPrimitivesCoder(), small_tuple), coder_benchmark_factory( coders.FastPrimitivesCoder(), large_tuple), coder_benchmark_factory( coders.FastPrimitivesCoder(), small_dict), coder_benchmark_factory( coders.FastPrimitivesCoder(), large_dict), coder_benchmark_factory( coders.ProtoCoder(test_message.MessageWithMap), small_message_with_map), coder_benchmark_factory( coders.ProtoCoder(test_message.MessageWithMap), large_message_with_map), coder_benchmark_factory( coders.DeterministicProtoCoder(test_message.MessageWithMap), small_message_with_map), coder_benchmark_factory( coders.DeterministicProtoCoder(test_message.MessageWithMap), large_message_with_map), coder_benchmark_factory( coders.WindowedValueCoder(coders.FastPrimitivesCoder()), wv_with_one_window), coder_benchmark_factory( coders.WindowedValueCoder(coders.FastPrimitivesCoder(), coders.IntervalWindowCoder()), wv_with_multiple_windows), coder_benchmark_factory( coders.WindowedValueCoder(coders.FastPrimitivesCoder(), coders.GlobalWindowCoder()), globally_windowed_value), coder_benchmark_factory( coders.LengthPrefixCoder(coders.FastPrimitivesCoder()), small_int) ] suite = [utils.BenchmarkConfig(b, input_size, num_runs) for b in benchmarks if re.search(filter_regex, b.__name__, flags=re.I)] utils.run_benchmarks(suite, verbose=verbose)
if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--filter', default='.*') parser.add_argument('--num_runs', default=20, type=int) parser.add_argument('--num_elements_per_benchmark', default=1000, type=int) parser.add_argument('--seed', default=42, type=int) options = parser.parse_args() utils.check_compiled("apache_beam.coders.coder_impl") num_runs = 20 num_elements_per_benchmark = 1000 seed = 42 # Fix the seed for better consistency run_coder_benchmarks( options.num_runs, options.num_elements_per_benchmark, options.seed, verbose=True, filter_regex=options.filter)