Source code for apache_beam.utils.profiler
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A profiler context manager based on cProfile.Profile objects.
For internal use only; no backwards-compatibility guarantees.
"""
import cProfile
import logging
import os
import pstats
import StringIO
import tempfile
import time
import warnings
from threading import Timer
[docs]class Profile(object):
"""cProfile wrapper context for saving and logging profiler results."""
SORTBY = 'cumulative'
def __init__(self, profile_id, profile_location=None, log_results=False,
file_copy_fn=None):
self.stats = None
self.profile_id = str(profile_id)
self.profile_location = profile_location
self.log_results = log_results
self.file_copy_fn = file_copy_fn
def __enter__(self):
logging.info('Start profiling: %s', self.profile_id)
self.profile = cProfile.Profile()
self.profile.enable()
return self
def __exit__(self, *args):
self.profile.disable()
logging.info('Stop profiling: %s', self.profile_id)
if self.profile_location and self.file_copy_fn:
dump_location = os.path.join(
self.profile_location, 'profile',
('%s-%s' % (time.strftime('%Y-%m-%d_%H_%M_%S'), self.profile_id)))
fd, filename = tempfile.mkstemp()
self.profile.dump_stats(filename)
logging.info('Copying profiler data to: [%s]', dump_location)
self.file_copy_fn(filename, dump_location) # pylint: disable=protected-access
os.close(fd)
os.remove(filename)
if self.log_results:
s = StringIO.StringIO()
self.stats = pstats.Stats(
self.profile, stream=s).sort_stats(Profile.SORTBY)
self.stats.print_stats()
logging.info('Profiler data: [%s]', s.getvalue())
[docs]class MemoryReporter(object):
"""A memory reporter that reports the memory usage and heap profile.
Usage:::
mr = MemoryReporter(interval_second=30.0)
mr.start()
while ...
<do something>
# this will report continuously with 30 seconds between reports.
mr.stop()
NOTE: A reporter with start() should always stop(), or the parent process can
never finish.
Or simply the following which does star() and stop():
with MemoryReporter(interval_second=100):
while ...
<do some thing>
Also it could report on demand without continuous reporting.::
mr = MemoryReporter() # default interval 60s but not started.
<do something>
mr.report_once()
"""
def __init__(self, interval_second=60.0):
# guppy might not have installed. http://pypi.python.org/pypi/guppy/0.1.10
# The reporter can be set up only when guppy is installed (and guppy cannot
# be added to the required packages in setup.py, since it's not available
# in all platforms).
try:
from guppy import hpy # pylint: disable=import-error
self._hpy = hpy
self._interval_second = interval_second
self._timer = None
except ImportError:
warnings.warn('guppy is not installed; MemoryReporter not available.')
self._hpy = None
self._enabled = False
def __enter__(self):
self.start()
return self
def __exit__(self, *args):
self.stop()
[docs] def start(self):
if self._enabled or not self._hpy:
return
self._enabled = True
def report_with_interval():
if not self._enabled:
return
self.report_once()
self._timer = Timer(self._interval_second, report_with_interval)
self._timer.start()
self._timer = Timer(self._interval_second, report_with_interval)
self._timer.start()
[docs] def stop(self):
if not self._enabled:
return
self._timer.cancel()
self._enabled = False
[docs] def report_once(self):
if not self._hpy:
return
report_start_time = time.time()
heap_profile = self._hpy().heap()
logging.info('*** MemoryReport Heap:\n %s\n MemoryReport took %.1f seconds',
heap_profile, time.time() - report_start_time)