Source code for apache_beam.typehints.arrow_batching_microbenchmark

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A microbenchmark for pyarrow batch creation.

This microbenchmark exercises the PyarrowBatchConverter.produce_batch method
for different batch sizes.
"""

import argparse
import logging

import pyarrow as pa

from apache_beam.portability.api import schema_pb2
from apache_beam.tools import utils
from apache_beam.typehints.arrow_type_compatibility import PyarrowBatchConverter
from apache_beam.typehints.arrow_type_compatibility import beam_schema_from_arrow_schema
from apache_beam.typehints.schemas import typing_from_runner_api


def benchmark_produce_batch(size):
  """Returns a callable that benchmarks rebuilding a `size`-row pa.Table."""
  # Build a reference Arrow table with one int64, one float64 and one string
  # column, then derive the corresponding Beam row type and batch converter.
  batch = pa.Table.from_pydict({
      'foo': pa.array(range(size), type=pa.int64()),
      'bar': pa.array([i / size for i in range(size)], type=pa.float64()),
      'baz': pa.array([str(i) for i in range(size)], type=pa.string()),
  })
  beam_schema = beam_schema_from_arrow_schema(batch.schema)
  element_type = typing_from_runner_api(
      schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
  batch_converter = PyarrowBatchConverter.from_typehints(element_type, pa.Table)
  # Explode the table into individual Beam rows up front so that only
  # produce_batch is measured inside the benchmark closure.
  elements = list(batch_converter.explode_batch(batch))

  def _do_benchmark():
    _ = batch_converter.produce_batch(elements)

  return _do_benchmark


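# A minimal usage sketch (illustrative only, not executed by the benchmark):
# the returned closure rebuilds the pa.Table from the exploded Beam rows,
# which is exactly the produce_batch path being measured.
#
#   fn = benchmark_produce_batch(1000)
#   fn()

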
def run_benchmark(
    starting_point=1, num_runs=10, num_elements_step=300, verbose=True):
  """Runs benchmark_produce_batch over linearly increasing batch sizes."""
  suite = [
      utils.LinearRegressionBenchmarkConfig(
          benchmark_produce_batch, starting_point, num_elements_step, num_runs)
  ]
  return utils.run_benchmarks(suite, verbose=verbose)


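# Programmatic invocation sketch (the return value is whatever
# utils.run_benchmarks produces for the configured suite):
#
#   run_benchmark(starting_point=50, num_runs=5, num_elements_step=1000)

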
if __name__ == '__main__':
  logging.basicConfig()
  parser = argparse.ArgumentParser()
  parser.add_argument('--num_runs', default=10, type=int)
  parser.add_argument('--starting_point', default=50, type=int)
  parser.add_argument('--increment', default=1000, type=int)
  parser.add_argument('--verbose', default=True, type=bool)
  options = parser.parse_args()

  run_benchmark(
      options.starting_point,
      options.num_runs,
      options.increment,
      options.verbose)