# Source code for apache_beam.internal.metrics.cells

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This file contains internal metric cell classes. A metric cell is used to
accumulate in-memory changes to a metric. It represents a specific metric
in a single context.

For internal use only. No backwards compatibility guarantees.
"""

# pytype: skip-file

from __future__ import absolute_import
from __future__ import division

from builtins import object
from typing import TYPE_CHECKING
from typing import Optional

from apache_beam.metrics.cells import MetricAggregator
from apache_beam.metrics.cells import MetricCell
from apache_beam.metrics.cells import MetricCellFactory
from apache_beam.utils.histogram import Histogram

if TYPE_CHECKING:
  from apache_beam.utils.histogram import BucketType


class HistogramCell(MetricCell):
  """For internal use only; no backwards-compatibility guarantees.

  Accumulates the current value and delta of a single histogram metric.

  A cell holds the state of one metric for one context within one bundle, so
  the same metric is backed by a distinct cell in every bundle and the cells
  are aggregated afterwards.

  Thread safety of this class relies on the underlying histogram object,
  which is thread safe.
  """
  def __init__(self, bucket_type):
    # The bucket type is remembered so that fresh, compatible histograms can
    # be built again in reset() and combine().
    self._bucket_type = bucket_type
    aggregator = HistogramAggregator(bucket_type)
    self.data = aggregator.identity_element()

  def reset(self):
    """Replaces the accumulated data with a fresh identity element."""
    aggregator = HistogramAggregator(self._bucket_type)
    self.data = aggregator.identity_element()

  def combine(self, other):
    # type: (HistogramCell) -> HistogramCell

    """Returns a new cell holding this cell's data combined with ``other``'s.

    The new cell is created with this cell's bucket type.
    """
    merged_cell = HistogramCell(self._bucket_type)
    merged_cell.data = self.data.combine(other.data)
    return merged_cell

  def update(self, value):
    """Records ``value`` into the histogram held by this cell."""
    histogram = self.data.histogram
    histogram.record(value)

  def get_cumulative(self):
    # type: () -> HistogramData

    """Returns the cumulative view of the data held by this cell."""
    current = self.data
    return current.get_cumulative()

  def to_runner_api_monitoring_info(self, name, transform_id):
    """Always returns None.

    Histogram metric is currently worker-local and internal use only, so
    nothing is reported. This method should be implemented when runners
    support Histogram metric reporting.
    """
    return None
class HistogramCellFactory(MetricCellFactory):
  """Callable factory that creates ``HistogramCell`` objects.

  Every cell produced by one factory is built with the bucket type given at
  construction. Factories compare equal, and hash alike, when their bucket
  types do.
  """
  def __init__(self, bucket_type):
    self._bucket_type = bucket_type

  def __call__(self):
    """Creates a new ``HistogramCell`` using this factory's bucket type."""
    return HistogramCell(self._bucket_type)

  def __eq__(self, other):
    # Anything that is not a HistogramCellFactory is never equal; otherwise
    # equality is decided by the bucket types alone.
    return (
        isinstance(other, HistogramCellFactory) and
        self._bucket_type == other._bucket_type)

  def __hash__(self):
    # Hash on the same attribute that __eq__ compares.
    bucket_type = self._bucket_type
    return hash(bucket_type)
class HistogramResult(object):
  """Result wrapper around the data of a histogram metric.

  Exposes the p90, p95 and p99 values of the wrapped data's histogram as
  read-only properties. Equality and hashing are delegated to the wrapped
  data object.
  """
  def __init__(self, data):
    # type: (HistogramData) -> None
    self.data = data

  def __eq__(self, other):
    # Guard clause: only another HistogramResult can be equal.
    if not isinstance(other, HistogramResult):
      return False
    return self.data == other.data

  def __hash__(self):
    # Hash on the same attribute that __eq__ compares.
    wrapped = self.data
    return hash(wrapped)

  def __repr__(self):
    percentile_info = self.data.histogram.get_percentile_info()
    return '<HistogramResult({})>'.format(percentile_info)

  @property
  def p99(self):
    """Value returned by the underlying histogram's ``p99()``."""
    histogram = self.data.histogram
    return histogram.p99()

  @property
  def p95(self):
    """Value returned by the underlying histogram's ``p95()``."""
    histogram = self.data.histogram
    return histogram.p95()

  @property
  def p90(self):
    """Value returned by the underlying histogram's ``p90()``."""
    histogram = self.data.histogram
    return histogram.p90()
class HistogramData(object):
  """For internal use only; no backwards-compatibility guarantees.

  The data structure that holds data about a histogram metric.

  This object is not thread safe, so it's not supposed to be modified outside
  the HistogramCell.
  """
  def __init__(self, histogram):
    self.histogram = histogram

  def __eq__(self, other):
    # Comparing against None or any other non-HistogramData value must give
    # False rather than raise AttributeError on the missing ``histogram``
    # attribute. This matches the guards in HistogramResult.__eq__ and
    # HistogramCellFactory.__eq__.
    if not isinstance(other, HistogramData):
      return False
    return self.histogram == other.histogram

  def __hash__(self):
    # Hash on the same attribute that __eq__ compares.
    return hash(self.histogram)

  def __repr__(self):
    return 'HistogramData({})'.format(self.histogram.get_percentile_info())

  def get_cumulative(self):
    # type: () -> HistogramData

    """Returns a new ``HistogramData`` wrapping the same histogram object.

    The histogram itself is not copied; the returned object shares it with
    this one.
    """
    return HistogramData(self.histogram)

  def combine(self, other):
    # type: (Optional[HistogramData]) -> HistogramData

    """Combines this data with ``other``.

    Args:
      other: the data to combine with, or None.

    Returns:
      This same object when ``other`` is None; otherwise a new
      ``HistogramData`` wrapping the result of combining the two histograms.
    """
    if other is None:
      return self
    return HistogramData(self.histogram.combine(other.histogram))
class HistogramAggregator(MetricAggregator):
  """For internal use only; no backwards-compatibility guarantees.

  Aggregator for Histogram metric data during pipeline execution.

  The values being aggregated should be ``HistogramData`` objects.
  """
  def __init__(self, bucket_type):
    # type: (BucketType) -> None
    self._bucket_type = bucket_type

  def identity_element(self):
    # type: () -> HistogramData

    """Returns ``HistogramData`` wrapping a newly constructed ``Histogram``.

    The histogram is built with this aggregator's bucket type.
    """
    fresh_histogram = Histogram(self._bucket_type)
    return HistogramData(fresh_histogram)

  def combine(self, x, y):
    # type: (HistogramData, HistogramData) -> HistogramData

    """Returns the result of combining ``y`` into ``x`` via ``x.combine``."""
    combined = x.combine(y)
    return combined

  def result(self, x):
    # type: (HistogramData) -> HistogramResult

    """Wraps the cumulative view of ``x`` in a ``HistogramResult``."""
    cumulative = x.get_cumulative()
    return HistogramResult(cumulative)