#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This file contains internal metric cell classes. A metric cell is used to
accumulate in-memory changes to a metric. It represents a specific metric
in a single context.
For internal use only. No backwards compatibility guarantees.
"""
# pytype: skip-file
from __future__ import absolute_import
from __future__ import division
from builtins import object
from typing import TYPE_CHECKING
from typing import Optional
from apache_beam.metrics.cells import MetricAggregator
from apache_beam.metrics.cells import MetricCell
from apache_beam.metrics.cells import MetricCellFactory
from apache_beam.utils.histogram import Histogram
if TYPE_CHECKING:
from apache_beam.utils.histogram import BucketType
[docs]class HistogramCell(MetricCell):
"""For internal use only; no backwards-compatibility guarantees.
Tracks the current value and delta for a histogram metric.
Each cell tracks the state of a metric independently per context per bundle.
Therefore, each metric has a different cell in each bundle, that is later
aggregated.
This class is thread safe since underlying histogram object is thread safe.
"""
def __init__(self, bucket_type):
self._bucket_type = bucket_type
self.data = HistogramAggregator(bucket_type).identity_element()
[docs] def reset(self):
self.data = HistogramAggregator(self._bucket_type).identity_element()
[docs] def combine(self, other):
# type: (HistogramCell) -> HistogramCell
result = HistogramCell(self._bucket_type)
result.data = self.data.combine(other.data)
return result
[docs] def update(self, value):
self.data.histogram.record(value)
[docs] def get_cumulative(self):
# type: () -> HistogramData
return self.data.get_cumulative()
[docs] def to_runner_api_monitoring_info(self, name, transform_id):
# Histogram metric is currently worker-local and internal
# use only. This method should be implemented when runners
# support Histogram metric reporting.
return None
[docs]class HistogramCellFactory(MetricCellFactory):
def __init__(self, bucket_type):
self._bucket_type = bucket_type
def __call__(self):
return HistogramCell(self._bucket_type)
def __eq__(self, other):
if not isinstance(other, HistogramCellFactory):
return False
return self._bucket_type == other._bucket_type
def __hash__(self):
return hash(self._bucket_type)
[docs]class HistogramResult(object):
def __init__(self, data):
# type: (HistogramData) -> None
self.data = data
def __eq__(self, other):
if isinstance(other, HistogramResult):
return self.data == other.data
else:
return False
def __hash__(self):
return hash(self.data)
def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other
def __repr__(self):
return '<HistogramResult({})>'.format(
self.data.histogram.get_percentile_info())
@property
def p99(self):
return self.data.histogram.p99()
@property
def p95(self):
return self.data.histogram.p95()
@property
def p90(self):
return self.data.histogram.p90()
[docs]class HistogramData(object):
"""For internal use only; no backwards-compatibility guarantees.
The data structure that holds data about a histogram metric.
This object is not thread safe, so it's not supposed to be modified
outside the HistogramCell.
"""
def __init__(self, histogram):
self.histogram = histogram
def __eq__(self, other):
return self.histogram == other.histogram
def __hash__(self):
return hash(self.histogram)
def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other
def __repr__(self):
return 'HistogramData({})'.format(self.histogram.get_percentile_info())
[docs] def get_cumulative(self):
# type: () -> HistogramData
return HistogramData(self.histogram)
[docs] def combine(self, other):
# type: (Optional[HistogramData]) -> HistogramData
if other is None:
return self
return HistogramData(self.histogram.combine(other.histogram))
[docs]class HistogramAggregator(MetricAggregator):
"""For internal use only; no backwards-compatibility guarantees.
Aggregator for Histogram metric data during pipeline execution.
Values aggregated should be ``HistogramData`` objects.
"""
def __init__(self, bucket_type):
# type: (BucketType) -> None
self._bucket_type = bucket_type
[docs] def identity_element(self):
# type: () -> HistogramData
return HistogramData(Histogram(self._bucket_type))
[docs] def combine(self, x, y):
# type: (HistogramData, HistogramData) -> HistogramData
return x.combine(y)
[docs] def result(self, x):
# type: (HistogramData) -> HistogramResult
return HistogramResult(x.get_cumulative())