# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A microbenchmark for measuring performance of coders.
This runs a sequence of encode-decode operations on random inputs
to collect performance of various coders.
To evaluate coders performance we approximate the behavior
how the coders are used in PCollections: we encode and decode
a list of elements. An element can be a string, a list of integers,
a windowed value, or any other object we want a coder to process.
Run as:
  python -m apache_beam.tools.coders_microbenchmark
"""
# pytype: skip-file
from __future__ import absolute_import
from __future__ import print_function
import argparse
import random
import re
import string
import sys
from past.builtins import unicode
from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
from apache_beam.coders import coders
from apache_beam.tools import utils
from apache_beam.transforms import window
from apache_beam.utils import windowed_value
[docs]def coder_benchmark_factory(coder, generate_fn):
  """Creates a benchmark that encodes and decodes a list of elements.
  Args:
    coder: coder to use to encode an element.
    generate_fn: a callable that generates an element.
  """
  class CoderBenchmark(object):
    def __init__(self, num_elements_per_benchmark):
      self._coder = coders.IterableCoder(coder)
      self._list = [generate_fn()
                    for _ in range(num_elements_per_benchmark)]
    def __call__(self):
      # Calling coder operations on a single element at a time may incur
      # unrelevant overhead. To compensate, we use a list elements.
      _ = self._coder.decode(self._coder.encode(self._list))
  CoderBenchmark.__name__ = "%s, %s" % (
      generate_fn.__name__, str(coder))
  return CoderBenchmark 
[docs]def small_int():
  return random.randint(0, 127) 
[docs]def large_int():
  return random.randint(sys.maxsize >> 2, sys.maxsize) 
[docs]def random_string(length):
  return unicode(''.join(random.choice(
      string.ascii_letters + string.digits) for _ in range(length))) 
[docs]def small_string():
  return random_string(4) 
[docs]def large_string():
  return random_string(100) 
[docs]def list_int(size):
  return [small_int() for _ in range(size)] 
[docs]def dict_int_int(size):
  return {i: i for i in list_int(size)} 
[docs]def small_list():
  return list_int(10) 
[docs]def large_list():
  # Bool is the last item in FastPrimitiveCoders before pickle.
  return [bool(k) for k in list_int(1000)] 
[docs]def small_tuple():
  # Benchmark a common case of 2-element tuples.
  return tuple(list_int(2)) 
[docs]def large_tuple():
  return tuple(large_list()) 
[docs]def small_dict():
  return {i: i for i in small_list()} 
[docs]def large_dict():
  return {i: i for i in large_list()} 
[docs]def large_iterable():
  yield 'a' * coders.coder_impl.SequenceCoderImpl._DEFAULT_BUFFER_SIZE
  for k in range(1000):
    yield k 
[docs]def random_message_with_map(size):
  message = test_message.MessageWithMap()
  keys = list_int(size)
  random.shuffle(keys)
  for key in keys:
    message.field1[str(key)].field1 = small_string()
  return message 
[docs]def small_message_with_map():
  return random_message_with_map(5) 
[docs]def large_message_with_map():
  return random_message_with_map(20) 
[docs]def globally_windowed_value():
  return windowed_value.WindowedValue(
      value=small_int(),
      timestamp=12345678,
      windows=(window.GlobalWindow(),)) 
[docs]def random_windowed_value(num_windows):
  return windowed_value.WindowedValue(
      value=small_int(),
      timestamp=12345678,
      windows=tuple(
          window.IntervalWindow(i * 10, i * 10 + small_int())
          for i in range(num_windows)
      )) 
[docs]def wv_with_one_window():
  return random_windowed_value(num_windows=1) 
[docs]def wv_with_multiple_windows():
  return random_windowed_value(num_windows=32) 
[docs]def run_coder_benchmarks(
    num_runs, input_size, seed, verbose, filter_regex='.*'):
  random.seed(seed)
  # TODO(BEAM-4441): Pick coders using type hints, for example:
  # tuple_coder = typecoders.registry.get_coder(typing.Tuple[int, ...])
  benchmarks = [
      coder_benchmark_factory(
          coders.FastPrimitivesCoder(), small_int),
      coder_benchmark_factory(
          coders.FastPrimitivesCoder(), large_int),
      coder_benchmark_factory(
          coders.FastPrimitivesCoder(), small_string),
      coder_benchmark_factory(
          coders.FastPrimitivesCoder(), large_string),
      coder_benchmark_factory(
          coders.FastPrimitivesCoder(),
          small_list),
      coder_benchmark_factory(
          coders.IterableCoder(coders.FastPrimitivesCoder()),
          small_list),
      coder_benchmark_factory(
          coders.FastPrimitivesCoder(),
          large_list),
      coder_benchmark_factory(
          coders.IterableCoder(coders.FastPrimitivesCoder()),
          large_list),
      coder_benchmark_factory(
          coders.IterableCoder(coders.FastPrimitivesCoder()),
          large_iterable),
      coder_benchmark_factory(
          coders.FastPrimitivesCoder(),
          small_tuple),
      coder_benchmark_factory(
          coders.FastPrimitivesCoder(),
          large_tuple),
      coder_benchmark_factory(
          coders.FastPrimitivesCoder(),
          small_dict),
      coder_benchmark_factory(
          coders.FastPrimitivesCoder(),
          large_dict),
      coder_benchmark_factory(
          coders.ProtoCoder(test_message.MessageWithMap),
          small_message_with_map),
      coder_benchmark_factory(
          coders.ProtoCoder(test_message.MessageWithMap),
          large_message_with_map),
      coder_benchmark_factory(
          coders.DeterministicProtoCoder(test_message.MessageWithMap),
          small_message_with_map),
      coder_benchmark_factory(
          coders.DeterministicProtoCoder(test_message.MessageWithMap),
          large_message_with_map),
      coder_benchmark_factory(
          coders.WindowedValueCoder(coders.FastPrimitivesCoder()),
          wv_with_one_window),
      coder_benchmark_factory(
          coders.WindowedValueCoder(coders.FastPrimitivesCoder(),
                                    coders.IntervalWindowCoder()),
          wv_with_multiple_windows),
      coder_benchmark_factory(
          coders.WindowedValueCoder(coders.FastPrimitivesCoder(),
                                    coders.GlobalWindowCoder()),
          globally_windowed_value),
      coder_benchmark_factory(
          coders.LengthPrefixCoder(coders.FastPrimitivesCoder()),
          small_int)
  ]
  suite = [utils.BenchmarkConfig(b, input_size, num_runs) for b in benchmarks
           if re.search(filter_regex, b.__name__, flags=re.I)]
  utils.run_benchmarks(suite, verbose=verbose) 
if __name__ == "__main__":
  parser = argparse.ArgumentParser()
  parser.add_argument('--filter', default='.*')
  parser.add_argument('--num_runs', default=20, type=int)
  parser.add_argument('--num_elements_per_benchmark', default=1000, type=int)
  parser.add_argument('--seed', default=42, type=int)
  options = parser.parse_args()
  utils.check_compiled("apache_beam.coders.coder_impl")
  num_runs = 20
  num_elements_per_benchmark = 1000
  seed = 42  # Fix the seed for better consistency
  run_coder_benchmarks(
      options.num_runs, options.num_elements_per_benchmark, options.seed,
      verbose=True, filter_regex=options.filter)