Source code for apache_beam.internal.metrics.metric

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Metrics API classes for internal use only.

Users should use apache_beam.metrics.metric package instead.

For internal use only. No backwards compatibility guarantees.
"""
# pytype: skip-file
# mypy: disallow-untyped-defs

from __future__ import absolute_import

import datetime
import logging
import threading
import time
from builtins import object
from typing import TYPE_CHECKING
from typing import Dict
from typing import Optional
from typing import Type
from typing import Union

from apache_beam.internal.metrics.cells import HistogramCellFactory
from apache_beam.metrics.execution import MetricUpdater
from apache_beam.metrics.metric import Metrics as UserMetrics
from apache_beam.metrics.metricbase import Histogram
from apache_beam.metrics.metricbase import MetricName

if TYPE_CHECKING:
  from apache_beam.metrics.cells import MetricCell
  from apache_beam.metrics.cells import MetricCellFactory
  from apache_beam.utils.histogram import BucketType

__all__ = ['Metrics']

_LOGGER = logging.getLogger(__name__)


class Metrics(object):
  """Entry point for creating internal-only metric objects."""
  @staticmethod
  def histogram(namespace, name, bucket_type, logger=None):
    # type: (Union[Type, str], str, BucketType, Optional[MetricLogger]) -> Metrics.DelegatingHistogram

    """Obtains or creates a Histogram metric.

    Args:
      namespace: A class or string that gives the namespace to a metric
      name: A string that gives a unique name to a metric
      bucket_type: A type of bucket used in a histogram. A subclass of
        apache_beam.utils.histogram.BucketType
      logger: MetricLogger for logging locally aggregated metric

    Returns:
      A Histogram object.
    """
    # Normalize the namespace first, then bundle it with the name.
    metric_name = MetricName(UserMetrics.get_namespace(namespace), name)
    return Metrics.DelegatingHistogram(metric_name, bucket_type, logger)

  class DelegatingHistogram(Histogram):
    """Metrics Histogram that Delegates functionality to MetricsEnvironment."""
    def __init__(self, metric_name, bucket_type, logger):
      # type: (MetricName, BucketType, Optional[MetricLogger]) -> None

      """Stores the metric identity and builds the updater callable.

      Args:
        metric_name: MetricName identifying this histogram.
        bucket_type: Bucket definition handed to HistogramCellFactory.
        logger: Optional MetricLogger that also receives every update.
      """
      super(Metrics.DelegatingHistogram, self).__init__(metric_name)
      self.metric_name = metric_name
      self.cell_type = HistogramCellFactory(bucket_type)
      self.logger = logger
      self.updater = MetricUpdater(self.cell_type, self.metric_name)

    def update(self, value):
      # type: (object) -> None

      """Records ``value`` via the updater and, if set, the local logger."""
      self.updater(value)
      if not self.logger:
        return
      self.logger.update(self.cell_type, self.metric_name, value)
class MetricLogger(object):
  """Simple object to locally aggregate and log metrics.

  Values are accumulated in one cell per metric name. ``log_metrics`` writes
  the accumulated values to the module logger, at most once per
  ``minimum_logging_frequency_msec`` milliseconds.

  This class is experimental. No backwards-compatibility guarantees.
  """
  def __init__(self):
    # type: () -> None
    self._metric = dict()  # type: Dict[MetricName, MetricCell]
    # Guards creation/lookup of cells and the whole logging pass.
    self._lock = threading.Lock()
    self._last_logging_millis = int(time.time() * 1000)
    # Minimum time between two log outputs, in milliseconds.
    self.minimum_logging_frequency_msec = 180000

  def update(self, cell_type, metric_name, value):
    # type: (Union[Type[MetricCell], MetricCellFactory], MetricName, object) -> None

    """Adds ``value`` to the cell registered under ``metric_name``.

    Args:
      cell_type: Zero-argument callable used to create the cell on first use.
      metric_name: Key identifying the metric.
      value: Value forwarded to the cell's ``update`` method.
    """
    cell = self._get_metric_cell(cell_type, metric_name)
    # The cell is updated outside of this object's lock.
    cell.update(value)

  def _get_metric_cell(self, cell_type, metric_name):
    # type: (Union[Type[MetricCell], MetricCellFactory], MetricName) -> MetricCell

    """Returns the cell for ``metric_name``, creating it when missing.

    The cell is captured in a local variable while the lock is held, so a
    concurrent ``log_metrics(reset_after_logging=True)`` that swaps in a new
    dict cannot make the return statement fail with ``KeyError``.
    """
    with self._lock:
      if metric_name not in self._metric:
        self._metric[metric_name] = cell_type()
      cell = self._metric[metric_name]
    return cell

  def log_metrics(self, reset_after_logging=False):
    # type: (bool) -> None

    """Logs all locally aggregated metrics if enough time has passed.

    The call returns without doing anything when another thread currently
    holds the lock, or when less than ``minimum_logging_frequency_msec``
    milliseconds elapsed since the previous log output.

    Args:
      reset_after_logging: If True, discard all cells after logging them.
    """
    # Non-blocking acquire: never stall the caller just to emit a log line.
    if not self._lock.acquire(False):
      return
    try:
      current_millis = int(time.time() * 1000)
      elapsed_millis = current_millis - self._last_logging_millis
      if elapsed_millis <= self.minimum_logging_frequency_msec:
        return

      logging_metric_info = [
          '[Locally aggregated metrics since %s]' %
          datetime.datetime.fromtimestamp(self._last_logging_millis / 1000.0)
      ]
      logging_metric_info.extend(
          '%s: %s' % (name, cell.get_cumulative())
          for name, cell in self._metric.items())
      _LOGGER.info('\n'.join(logging_metric_info))

      if reset_after_logging:
        self._metric = dict()
      self._last_logging_millis = current_millis
    finally:
      self._lock.release()