#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A profiler context manager based on cProfile.Profile objects.
For internal use only; no backwards-compatibility guarantees.
"""
from __future__ import absolute_import
import cProfile # pylint: disable=bad-python3-import
import io
import logging
import os
import pstats
import random
import tempfile
import time
import warnings
from builtins import object
from threading import Timer
from apache_beam.io import filesystems
class Profile(object):
  """cProfile wrapper context for saving and logging profiler results.

  On entry a fresh ``cProfile.Profile`` is created and enabled. On exit it is
  disabled and, depending on configuration, its stats are copied to
  ``profile_location`` and/or written to the log.

  Args:
    profile_id: Identifier of this profiling run. It is converted with
      ``str()``, used in log messages and appended verbatim to the dump file
      name.
    profile_location: Base path that is joined with the dump file name. When
      falsy, no dump is written.
    log_results: When true, the stats sorted by ``SORTBY`` are logged on exit
      and kept in ``self.stats``.
    file_copy_fn: Callable ``(src, dest)`` that copies the temporary local
      dump to its destination. Defaults to ``default_file_copy_fn``.
    time_prefix: ``time.strftime`` format that is rendered and placed in front
      of ``profile_id`` to build the dump file name.
  """

  # Sort key handed to pstats when the results are logged.
  SORTBY = 'cumulative'

  def __init__(self, profile_id, profile_location=None, log_results=False,
               file_copy_fn=None, time_prefix='%Y-%m-%d_%H_%M_%S-'):
    self.stats = None
    self.profile_id = str(profile_id)
    self.profile_location = profile_location
    self.log_results = log_results
    self.file_copy_fn = file_copy_fn or self.default_file_copy_fn
    self.time_prefix = time_prefix
    # Destination of the last dump; stays None until a dump has been copied.
    self.profile_output = None

  def __enter__(self):
    """Starts a new cProfile session and returns this wrapper."""
    logging.info('Start profiling: %s', self.profile_id)
    self.profile = cProfile.Profile()
    self.profile.enable()
    return self

  def __exit__(self, *args):
    """Stops profiling, then dumps and/or logs the stats as configured.

    Returns None, so an exception raised inside the ``with`` body is not
    suppressed.
    """
    self.profile.disable()
    logging.info('Stop profiling: %s', self.profile_id)

    if self.profile_location:
      # Only the prefix is a strftime format. The id is appended afterwards so
      # that a '%' inside it is never interpreted as a format directive.
      dump_name = time.strftime(self.time_prefix) + self.profile_id
      dump_location = os.path.join(self.profile_location, dump_name)
      fd, filename = tempfile.mkstemp()
      try:
        # Only the path is needed; dump_stats() opens the file by name.
        os.close(fd)
        self.profile.dump_stats(filename)
        logging.info('Copying profiler data to: [%s]', dump_location)
        self.file_copy_fn(filename, dump_location)
      finally:
        # The local temporary file is removed whether or not the copy worked.
        os.remove(filename)
      self.profile_output = dump_location

    if self.log_results:
      s = io.StringIO()
      self.stats = pstats.Stats(
          self.profile, stream=s).sort_stats(Profile.SORTBY)
      self.stats.print_stats()
      logging.info('Profiler data: [%s]', s.getvalue())

  @staticmethod
  def default_file_copy_fn(src, dest):
    """Copies the local file ``src`` to ``dest`` through ``FileSystems``.

    The bytes are first written to ``dest + '.tmp'`` and that file is then
    renamed to ``dest``.

    Args:
      src: Path of a local file, opened with the builtin ``open``.
      dest: Destination path passed to ``filesystems.FileSystems``.
    """
    dest_handle = filesystems.FileSystems.create(dest + '.tmp')
    try:
      with open(src, 'rb') as src_handle:
        dest_handle.write(src_handle.read())
    finally:
      dest_handle.close()
    filesystems.FileSystems.rename([dest + '.tmp'], [dest])

  @staticmethod
  def factory_from_options(options):
    """Builds a sampling ``Profile`` factory from ``options``.

    Args:
      options: Object exposing ``profile_cpu``, ``profile_sample_rate`` and
        ``profile_location`` attributes.

    Returns:
      None when ``options.profile_cpu`` is falsy. Otherwise a callable
      ``create_profiler(profile_id, **kwargs)`` that returns a ``Profile``
      when a ``random.random()`` draw is below
      ``options.profile_sample_rate``, and None when it is not.
    """
    if not options.profile_cpu:
      return None

    def create_profiler(profile_id, **kwargs):
      if random.random() < options.profile_sample_rate:
        return Profile(profile_id, options.profile_location, **kwargs)
      return None

    return create_profiler
class MemoryReporter(object):
  """A memory reporter that reports the memory usage and heap profile.

  Usage::

    mr = MemoryReporter(interval_second=30.0)
    mr.start()
    while ...
      <do something>
      # this will report continuously with 30 seconds between reports.
    mr.stop()

  NOTE: A reporter with start() should always stop(), or the parent process can
  never finish.

  Or simply the following, which does start() and stop()::

    with MemoryReporter(interval_second=100):
      while ...
        <do something>

  It can also report on demand without continuous reporting::

    mr = MemoryReporter()  # default interval 60s but not started.
    <do something>
    mr.report_once()

  Args:
    interval_second: Delay in seconds passed to each ``threading.Timer`` that
      schedules the next report.
  """

  def __init__(self, interval_second=60.0):
    # Set unconditionally so every instance has the same attributes whether or
    # not guppy can be imported.
    self._interval_second = interval_second
    self._timer = None
    self._enabled = False
    # guppy might not be installed. http://pypi.python.org/pypi/guppy/0.1.10
    # The reporter can be set up only when guppy is installed (and guppy cannot
    # be added to the required packages in setup.py, since it's not available
    # in all platforms).
    try:
      from guppy import hpy  # pylint: disable=import-error
      self._hpy = hpy
    except ImportError:
      warnings.warn('guppy is not installed; MemoryReporter not available.')
      self._hpy = None

  def __enter__(self):
    """Starts periodic reporting and returns this reporter."""
    self.start()
    return self

  def __exit__(self, *args):
    """Stops periodic reporting; exceptions are not suppressed."""
    self.stop()

  def start(self):
    """Schedules periodic heap reports.

    Does nothing when reporting is already enabled or guppy is unavailable.
    The first report is made one interval after this call.
    """
    if self._enabled or not self._hpy:
      return
    self._enabled = True

    def report_with_interval():
      if not self._enabled:
        return
      self.report_once()
      # Taking the heap snapshot can be slow. If stop() was called in the
      # meantime, do not arm another timer that would keep the process alive.
      if not self._enabled:
        return
      self._timer = Timer(self._interval_second, report_with_interval)
      self._timer.start()

    self._timer = Timer(self._interval_second, report_with_interval)
    self._timer.start()

  def stop(self):
    """Stops periodic reporting; a no-op when reporting is not enabled."""
    if not self._enabled:
      return
    # Clear the flag before cancelling so that a callback which is already
    # running sees it and does not reschedule itself.
    self._enabled = False
    self._timer.cancel()

  def report_once(self):
    """Logs one heap profile and how long it took; a no-op without guppy."""
    if not self._hpy:
      return
    report_start_time = time.time()
    heap_profile = self._hpy().heap()
    logging.info('*** MemoryReport Heap:\n %s\n MemoryReport took %.1f seconds',
                 heap_profile, time.time() - report_start_time)