Source code for apache_beam.utils.profiler

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A profiler context manager based on cProfile.Profile and guppy.hpy objects.

For internal use only; no backwards-compatibility guarantees.
"""

# pytype: skip-file
# mypy: check-untyped-defs

import cProfile
import io
import logging
import os
import pstats
import random
import tempfile
import time
from typing import Callable
from typing import Optional

from apache_beam.io import filesystems

_LOGGER = logging.getLogger(__name__)


class Profile(object):
    """cProfile and Heapy wrapper context for saving and logging profiler
    results.

    Used as a context manager: entering starts the enabled profilers and
    exiting stops them, then optionally uploads the collected data to
    ``profile_location`` and/or writes it to the module logger.
    """

    # Sort key handed to ``pstats.Stats.sort_stats`` when logging CPU results.
    SORTBY = 'cumulative'

    # Location of the uploaded CPU profile; set by ``__exit__`` after upload.
    profile_output = None  # type: str
    # Sorted CPU statistics; set by ``__exit__`` when ``log_results`` is true.
    stats = None  # type: pstats.Stats

    def __init__(
        self,
        profile_id,  # type: str
        profile_location=None,  # type: Optional[str]
        log_results=False,  # type: bool
        file_copy_fn=None,  # type: Optional[Callable[[str, str], None]]
        time_prefix='%Y-%m-%d_%H_%M_%S-',  # type: str
        enable_cpu_profiling=False,  # type: bool
        enable_memory_profiling=False,  # type: bool
    ):
        """Creates a Profile object.

        Args:
          profile_id: Unique id of the profiling session.
          profile_location: The file location where the profiling results
            will be stored.
          log_results: Log the result to console if true.
          file_copy_fn: Lambda function for copying files. Defaults to
            ``default_file_copy_fn``.
          time_prefix: Format of the timestamp prefix in profiling result
            files.
          enable_cpu_profiling: CPU profiler will be enabled during the
            profiling session.
          enable_memory_profiling: Memory profiler will be enabled during the
            profiling session, the profiler only records the newly allocated
            objects in this session.
        """
        self.profile_id = str(profile_id)
        self.profile_location = profile_location
        self.log_results = log_results
        self.file_copy_fn = file_copy_fn or self.default_file_copy_fn
        self.time_prefix = time_prefix
        self.enable_cpu_profiling = enable_cpu_profiling
        self.enable_memory_profiling = enable_memory_profiling
        # Populated by __enter__; declared here so that the object always has
        # a consistent set of attributes, even before the session starts.
        self.profile = None
        self.hpy = None

    def __enter__(self):
        """Starts the enabled profilers and returns this object."""
        _LOGGER.info('Start profiling: %s', self.profile_id)
        if self.enable_cpu_profiling:
            self.profile = cProfile.Profile()
            self.profile.enable()
        if self.enable_memory_profiling:
            try:
                from guppy import hpy
                self.hpy = hpy()
                # Record only objects allocated from this point on.
                self.hpy.setrelheap()
            except ImportError:
                # Memory profiling is best effort: guppy is optional.
                _LOGGER.info("Unable to import guppy for memory profiling")
                self.hpy = None
        return self

    def __exit__(self, *args):
        """Stops the profilers, then uploads and/or logs the results.

        Returns None, so an exception raised inside the ``with`` body is not
        suppressed.
        """
        _LOGGER.info('Stop profiling: %s', self.profile_id)

        cpu_active = self.enable_cpu_profiling and self.profile is not None
        if cpu_active:
            # Always stop the profiler, even when the results are neither
            # uploaded nor logged and even if the upload below fails.
            # Otherwise it would stay installed after the session ends.
            self.profile.disable()

        # Take a single heap snapshot and reuse it for both upload and
        # logging, and only pay for it when somebody consumes it.
        heap = None
        if (self.enable_memory_profiling and self.hpy and
                (self.profile_location or self.log_results)):
            heap = self.hpy.heap()

        if self.profile_location:
            if cpu_active:
                self.profile.create_stats()
                self.profile_output = self._upload_profile_data(
                    # typing: seems stats attr is missing from typeshed
                    self.profile_location,
                    'cpu_profile',
                    self.profile.stats)  # type: ignore[attr-defined]
            if heap is not None:
                heap_dump_data = '%s\n%s' % (heap, heap.more)
                self._upload_profile_data(
                    self.profile_location,
                    'memory_profile',
                    heap_dump_data,
                    write_binary=False)

        if self.log_results:
            if cpu_active:
                s = io.StringIO()
                self.stats = pstats.Stats(
                    self.profile, stream=s).sort_stats(Profile.SORTBY)
                self.stats.print_stats()
                _LOGGER.info('Cpu profiler data: [%s]', s.getvalue())
            if heap is not None:
                _LOGGER.info('Memory profiler data: \n%s', heap)

    @staticmethod
    def default_file_copy_fn(src, dest):
        """Copies local file ``src`` to ``dest`` through ``FileSystems``.

        The data is first written to ``dest + '.tmp'`` and then renamed to
        ``dest``. The rename only happens when the write succeeded.
        """
        dest_handle = filesystems.FileSystems.create(dest + '.tmp')
        try:
            with open(src, 'rb') as src_handle:
                dest_handle.write(src_handle.read())
        finally:
            dest_handle.close()
        filesystems.FileSystems.rename([dest + '.tmp'], [dest])

    @staticmethod
    def factory_from_options(options):
        # type: (...) -> Optional[Callable[..., Profile]]
        """Builds a sampling ``Profile`` factory from pipeline options.

        Args:
          options: Object exposing ``profile_cpu``, ``profile_memory``,
            ``profile_sample_rate`` and ``profile_location``.

        Returns:
          None when neither CPU nor memory profiling is requested. Otherwise
          a callable ``create_profiler(profile_id, **kwargs)`` which returns a
          ``Profile`` when ``random.random() < options.profile_sample_rate``
          and None for sessions that are sampled out.
        """
        if not (options.profile_cpu or options.profile_memory):
            return None

        def create_profiler(profile_id, **kwargs):
            if random.random() < options.profile_sample_rate:
                return Profile(
                    profile_id,
                    options.profile_location,
                    enable_cpu_profiling=options.profile_cpu,
                    enable_memory_profiling=options.profile_memory,
                    **kwargs)
            return None

        return create_profiler

    def _dump_location(self, profile_location, dir):
        # type: (str, str) -> str
        """Returns the destination path for one profile dump."""
        # Only the prefix is a strftime format. The profile id is appended
        # verbatim so that a '%' inside the id is never interpreted as a
        # format directive.
        timestamp = time.strftime(self.time_prefix)
        return os.path.join(
            profile_location, dir, timestamp + self.profile_id)

    def _upload_profile_data(
            self, profile_location, dir, data, write_binary=True):
        # type: (...) -> str
        """Writes ``data`` to a temp file and copies it to its destination.

        Args:
          profile_location: Base location for profiling results.
          dir: Sub-directory name under ``profile_location``.
          data: Object to marshal (binary mode) or string to write (text
            mode).
          write_binary: When true, ``data`` is serialized with ``marshal``;
            otherwise it is written as text.

        Returns:
          The location the data was copied to.
        """
        dump_location = self._dump_location(profile_location, dir)
        fd, filename = tempfile.mkstemp()
        try:
            # mkstemp returns an open descriptor; close it and reopen the
            # file by name in the mode that is actually needed.
            os.close(fd)
            if write_binary:
                with open(filename, 'wb') as fb:
                    import marshal
                    marshal.dump(data, fb)
            else:
                with open(filename, 'w') as f:
                    f.write(data)
            _LOGGER.info('Copying profiler data to: [%s]', dump_location)
            self.file_copy_fn(filename, dump_location)
        finally:
            # The temp file is removed whether or not the copy succeeded.
            os.remove(filename)
        return dump_location