Source code for apache_beam.metrics.cells

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This file contains metric cell classes. A metric cell is used to accumulate
in-memory changes to a metric. It represents a specific metric in a single
context.

Cells depend on a 'dirty-bit' in the CellCommitState class that tracks whether
a cell's updates have been committed.
"""

from __future__ import absolute_import
from __future__ import division

import threading
import time
from builtins import object

from google.protobuf import timestamp_pb2

from apache_beam.metrics.metricbase import Counter
from apache_beam.metrics.metricbase import Distribution
from apache_beam.metrics.metricbase import Gauge
from apache_beam.portability.api import beam_fn_api_pb2

__all__ = ['DistributionResult', 'GaugeResult']


class CellCommitState(object):
  """For internal use only; no backwards-compatibility guarantees.

  Atomically tracks a cell's dirty/clean commit status.

  Reporting a metric update works in a two-step process: First, updates to the
  metric are received, and the metric is marked as 'dirty'. Later, updates are
  committed, and then the cell may be marked as 'clean'.

  The tracking of a cell's state is done conservatively: A metric may be
  reported DIRTY even if updates have not occurred.

  This class is thread-safe.
  """

  # Indicates that there have been changes to the cell since the last commit.
  DIRTY = 0
  # Indicates that there have NOT been changes to the cell since last commit.
  CLEAN = 1
  # Indicates that a commit of the current value is in progress.
  COMMITTING = 2

  def __init__(self):
    """Initializes ``CellCommitState``.

    A cell is initialized as dirty.
    """
    self._lock = threading.Lock()
    self._state = CellCommitState.DIRTY

  @property
  def state(self):
    with self._lock:
      return self._state

  def after_modification(self):
    """Indicate that changes have been made to the metric being tracked.

    Should be called after modification of the metric value.
    """
    with self._lock:
      self._state = CellCommitState.DIRTY

  def after_commit(self):
    """Mark changes made up to the last call to ``before_commit`` as committed.

    The next call to ``before_commit`` will return ``False`` unless there have
    been changes made.
    """
    with self._lock:
      if self._state == CellCommitState.COMMITTING:
        self._state = CellCommitState.CLEAN

  def before_commit(self):
    """Check the dirty state, and mark the metric as committing.

    After this call, the state is either CLEAN, or COMMITTING. If the state
    was already CLEAN, then we simply return. If it was either DIRTY or
    COMMITTING, then we set the cell as COMMITTING (e.g. in the middle of
    a commit).

    After a commit is successful, ``after_commit`` should be called.

    Returns:
      A boolean, which is false if the cell is CLEAN, and true otherwise.
    """
    with self._lock:
      if self._state == CellCommitState.CLEAN:
        return False
      self._state = CellCommitState.COMMITTING
      return True


class MetricCell(object):
  """For internal use only; no backwards-compatibility guarantees.

  Accumulates in-memory changes to a metric.

  A MetricCell represents a specific metric in a single context and bundle.
  All subclasses must be thread safe, as these are used in the pipeline runners,
  and may be subject to parallel/concurrent updates. Cells should only be used
  directly within a runner.
  """
  def __init__(self):
    self.commit = CellCommitState()
    self._lock = threading.Lock()

  def get_cumulative(self):
    raise NotImplementedError


class CounterCell(Counter, MetricCell):
  """For internal use only; no backwards-compatibility guarantees.

  Tracks the current value and delta of a counter metric.

  Each cell tracks the state of a metric independently per context per bundle.
  Therefore, each metric has a different cell in each bundle, cells are
  aggregated by the runner.

  This class is thread safe.
  """
  def __init__(self, *args):
    super(CounterCell, self).__init__(*args)
    self.value = CounterAggregator.identity_element()

  def reset(self):
    self.commit = CellCommitState()
    self.value = CounterAggregator.identity_element()

  def combine(self, other):
    result = CounterCell()
    result.inc(self.value + other.value)
    return result

  def inc(self, n=1):
    with self._lock:
      self.value += n
      self.commit.after_modification()

  def get_cumulative(self):
    with self._lock:
      return self.value

  def to_runner_api_monitoring_info(self):
    """Returns a Metric with this counter value for use in a MonitoringInfo."""
    # TODO(ajamato): Update this code to be consistent with Gauges
    # and Distributions. Since there is no CounterData class this method
    # was added to CounterCell. Consider adding a CounterData class or
    # removing the GaugeData and DistributionData classes.
    return beam_fn_api_pb2.Metric(
        counter_data=beam_fn_api_pb2.CounterData(
            int64_value=self.get_cumulative()
        )
    )


class DistributionCell(Distribution, MetricCell):
  """For internal use only; no backwards-compatibility guarantees.

  Tracks the current value and delta for a distribution metric.

  Each cell tracks the state of a metric independently per context per bundle.
  Therefore, each metric has a different cell in each bundle, that is later
  aggregated.

  This class is thread safe.
  """
  def __init__(self, *args):
    super(DistributionCell, self).__init__(*args)
    self.data = DistributionAggregator.identity_element()

  def reset(self):
    self.commit = CellCommitState()
    self.data = DistributionAggregator.identity_element()

  def combine(self, other):
    result = DistributionCell()
    result.data = self.data.combine(other.data)
    return result

  def update(self, value):
    with self._lock:
      self.commit.after_modification()
      self._update(value)

  def _update(self, value):
    value = int(value)
    self.data.count += 1
    self.data.sum += value
    self.data.min = (value
                     if self.data.min is None or self.data.min > value
                     else self.data.min)
    self.data.max = (value
                     if self.data.max is None or self.data.max < value
                     else self.data.max)

  def get_cumulative(self):
    with self._lock:
      return self.data.get_cumulative()


class GaugeCell(Gauge, MetricCell):
  """For internal use only; no backwards-compatibility guarantees.

  Tracks the current value and delta for a gauge metric.

  Each cell tracks the state of a metric independently per context per bundle.
  Therefore, each metric has a different cell in each bundle, that is later
  aggregated.

  This class is thread safe.
  """
  def __init__(self, *args):
    super(GaugeCell, self).__init__(*args)
    self.data = GaugeAggregator.identity_element()

  def reset(self):
    self.commit = CellCommitState()
    self.data = GaugeAggregator.identity_element()

  def combine(self, other):
    result = GaugeCell()
    result.data = self.data.combine(other.data)
    return result

  def set(self, value):
    value = int(value)
    with self._lock:
      self.commit.after_modification()
      # Set the value directly without checking timestamp, because
      # this value is naturally the latest value.
      self.data.value = value
      self.data.timestamp = time.time()

  def get_cumulative(self):
    with self._lock:
      return self.data.get_cumulative()


[docs]class DistributionResult(object): """The result of a Distribution metric.""" def __init__(self, data): self.data = data def __eq__(self, other): if isinstance(other, DistributionResult): return self.data == other.data else: return False def __hash__(self): return hash(self.data) def __ne__(self, other): # TODO(BEAM-5949): Needed for Python 2 compatibility. return not self == other def __repr__(self): return '<DistributionResult(sum={}, count={}, min={}, max={})>'.format( self.sum, self.count, self.min, self.max) @property def max(self): return self.data.max @property def min(self): return self.data.min @property def count(self): return self.data.count @property def sum(self): return self.data.sum @property def mean(self): """Returns the float mean of the distribution. If the distribution contains no elements, it returns None. """ if self.data.count == 0: return None return self.data.sum / self.data.count
[docs]class GaugeResult(object): def __init__(self, data): self.data = data def __eq__(self, other): if isinstance(other, GaugeResult): return self.data == other.data else: return False def __hash__(self): return hash(self.data) def __ne__(self, other): # TODO(BEAM-5949): Needed for Python 2 compatibility. return not self == other def __repr__(self): return '<GaugeResult(value={}, timestamp={})>'.format( self.value, self.timestamp) @property def value(self): return self.data.value @property def timestamp(self): return self.data.timestamp
class GaugeData(object): """For internal use only; no backwards-compatibility guarantees. The data structure that holds data about a gauge metric. Gauge metrics are restricted to integers only. This object is not thread safe, so it's not supposed to be modified by other than the GaugeCell that contains it. """ def __init__(self, value, timestamp=None): self.value = value self.timestamp = timestamp if timestamp is not None else 0 def __eq__(self, other): return self.value == other.value and self.timestamp == other.timestamp def __hash__(self): return hash((self.value, self.timestamp)) def __ne__(self, other): # TODO(BEAM-5949): Needed for Python 2 compatibility. return not self == other def __repr__(self): return '<GaugeData(value={}, timestamp={})>'.format( self.value, self.timestamp) def get_cumulative(self): return GaugeData(self.value, timestamp=self.timestamp) def combine(self, other): if other is None: return self if other.timestamp > self.timestamp: return other else: return self @staticmethod def singleton(value, timestamp=None): return GaugeData(value, timestamp=timestamp) def to_runner_api(self): seconds = int(self.timestamp) nanos = int((self.timestamp - seconds) * 10**9) gauge_timestamp = timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos) return beam_fn_api_pb2.Metrics.User.GaugeData( value=self.value, timestamp=gauge_timestamp) @staticmethod def from_runner_api(proto): gauge_timestamp = (proto.timestamp.seconds + float(proto.timestamp.nanos) / 10**9) return GaugeData(proto.value, timestamp=gauge_timestamp) def to_runner_api_monitoring_info(self): """Returns a Metric with this value for use in a MonitoringInfo.""" return beam_fn_api_pb2.Metric( counter_data=beam_fn_api_pb2.CounterData( int64_value=self.value ) ) class DistributionData(object): """For internal use only; no backwards-compatibility guarantees. The data structure that holds data about a distribution metric. Distribution metrics are restricted to distributions of integers only. This object is not thread safe, so it's not supposed to be modified by other than the DistributionCell that contains it. """ def __init__(self, sum, count, min, max): self.sum = sum self.count = count self.min = min self.max = max def __eq__(self, other): return (self.sum == other.sum and self.count == other.count and self.min == other.min and self.max == other.max) def __hash__(self): return hash((self.sum, self.count, self.min, self.max)) def __ne__(self, other): # TODO(BEAM-5949): Needed for Python 2 compatibility. return not self == other def __repr__(self): return '<DistributionData(sum={}, count={}, min={}, max={})>'.format( self.sum, self.count, self.min, self.max) def get_cumulative(self): return DistributionData(self.sum, self.count, self.min, self.max) def combine(self, other): if other is None: return self new_min = (None if self.min is None and other.min is None else min(x for x in (self.min, other.min) if x is not None)) new_max = (None if self.max is None and other.max is None else max(x for x in (self.max, other.max) if x is not None)) return DistributionData( self.sum + other.sum, self.count + other.count, new_min, new_max) @staticmethod def singleton(value): return DistributionData(value, 1, value, value) def to_runner_api(self): return beam_fn_api_pb2.Metrics.User.DistributionData( count=self.count, sum=self.sum, min=self.min, max=self.max) @staticmethod def from_runner_api(proto): return DistributionData(proto.sum, proto.count, proto.min, proto.max) def to_runner_api_monitoring_info(self): """Returns a Metric with this value for use in a MonitoringInfo.""" return beam_fn_api_pb2.Metric( distribution_data=beam_fn_api_pb2.DistributionData( int_distribution_data=beam_fn_api_pb2.IntDistributionData( count=self.count, sum=self.sum, min=self.min, max=self.max))) class MetricAggregator(object): """For internal use only; no backwards-compatibility guarantees. Base interface for aggregating metric data during pipeline execution.""" def identity_element(self): """Returns the identical element of an Aggregation. For the identity element, it must hold that Aggregator.combine(any_element, identity_element) == any_element. """ raise NotImplementedError def combine(self, updates): raise NotImplementedError def result(self, x): raise NotImplementedError class CounterAggregator(MetricAggregator): """For internal use only; no backwards-compatibility guarantees. Aggregator for Counter metric data during pipeline execution. Values aggregated should be ``int`` objects. """ @staticmethod def identity_element(): return 0 def combine(self, x, y): return int(x) + int(y) def result(self, x): return int(x) class DistributionAggregator(MetricAggregator): """For internal use only; no backwards-compatibility guarantees. Aggregator for Distribution metric data during pipeline execution. Values aggregated should be ``DistributionData`` objects. """ @staticmethod def identity_element(): return DistributionData(0, 0, None, None) def combine(self, x, y): return x.combine(y) def result(self, x): return DistributionResult(x.get_cumulative()) class GaugeAggregator(MetricAggregator): """For internal use only; no backwards-compatibility guarantees. Aggregator for Gauge metric data during pipeline execution. Values aggregated should be ``GaugeData`` objects. """ @staticmethod def identity_element(): return GaugeData(None, timestamp=0) def combine(self, x, y): result = x.combine(y) return result def result(self, x): return GaugeResult(x.get_cumulative())