"""
DirectRunner implementation of MetricResults. It is in charge not only of
responding to queries of current metrics, but also of keeping the common
state consistent.
"""
import threading
from collections import defaultdict

from apache_beam.metrics.cells import CounterAggregator
from apache_beam.metrics.cells import DistributionAggregator
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricResult
from apache_beam.metrics.metric import MetricResults

class DirectMetrics(MetricResults):
    """Registry of per-key metric state for counters and distributions.

    Every metric key maps to a ``DirectMetric`` that is created on first
    access: counter keys get one wrapping a ``CounterAggregator`` and
    distribution keys get one wrapping a ``DistributionAggregator``.
    """

    def __init__(self):
        def new_counter_cell():
            return DirectMetric(CounterAggregator())

        def new_distribution_cell():
            return DirectMetric(DistributionAggregator())

        self._counters = defaultdict(new_counter_cell)
        self._distributions = defaultdict(new_distribution_cell)

    def _apply_operation(self, bundle, updates, op):
        """Call ``op(cell, bundle, value)`` for every update entry.

        Args:
          bundle: passed through unchanged to ``op``.
          updates: object exposing ``counters`` and ``distributions``
            mappings of metric key -> update value.
          op: callable taking ``(DirectMetric, bundle, value)``.
        """
        # Counters are processed first, then distributions.
        for key, value in updates.counters.items():
            op(self._counters[key], bundle, value)
        for key, value in updates.distributions.items():
            op(self._distributions[key], bundle, value)

    def commit_logical(self, bundle, updates):
        """Forward every update to ``DirectMetric.commit_logical``."""
        def forward(cell, bundle, update):
            return cell.commit_logical(bundle, update)

        self._apply_operation(bundle, updates, forward)

    def commit_physical(self, bundle, updates):
        """Forward every update to ``DirectMetric.commit_physical``."""
        def forward(cell, bundle, update):
            return cell.commit_physical(bundle, update)

        self._apply_operation(bundle, updates, forward)

    def update_physical(self, bundle, updates):
        """Forward every update to ``DirectMetric.update_physical``."""
        def forward(cell, bundle, update):
            return cell.update_physical(bundle, update)

        self._apply_operation(bundle, updates, forward)

    def query(self, filter=None):
        """Return the committed and attempted values of matching metrics.

        Args:
          filter: handed as-is to ``self.matches`` together with each key.
            ``matches`` is not defined in this class; presumably it is
            inherited from ``MetricResults`` -- TODO confirm.

        Returns:
          A dict with the keys ``'counters'`` and ``'distributions'``, each
          holding a list of ``MetricResult`` objects.
        """
        def collect(cells):
            results = []
            for key, cell in cells.items():
                if not self.matches(filter, key):
                    continue
                results.append(
                    MetricResult(
                        MetricKey(key.step, key.metric),
                        cell.extract_committed(),
                        cell.extract_latest_attempted(),
                    )
                )
            return results

        counters = collect(self._counters)
        distributions = collect(self._distributions)
        return {'counters': counters, 'distributions': distributions}
class DirectMetric(object):
    """Keeps a consistent state for a single metric.

    Two views of the metric are tracked:

    * attempted (physical): ``finished_attempted`` holds the combined value
      of bundles committed physically, and ``inflight_attempted`` maps a
      bundle to its latest not-yet-committed update.
    * committed (logical): ``finished_committed`` holds the combined value
      of bundles committed logically.

    Mutations of each view are serialized by that view's lock.
    """

    def __init__(self, aggregator):
        """Initialize both views to the aggregator's starting value.

        Args:
          aggregator: object providing ``combine(a, b)`` and ``result(x)``,
            plus a method producing the initial accumulator.
        """
        self.aggregator = aggregator
        self._attempted_lock = threading.Lock()
        # NOTE(review): the initial accumulator expression was missing from
        # this block and has been reconstructed as ``aggregator.zero()``.
        # Confirm the method name against the aggregator classes in
        # apache_beam.metrics.cells (some releases call it
        # ``identity_element()``).
        self.finished_attempted = aggregator.zero()
        # Kept as a separate dict: it must not alias the running total.
        self.inflight_attempted = {}
        self._committed_lock = threading.Lock()
        self.finished_committed = aggregator.zero()

    def commit_logical(self, bundle, update):
        """Fold ``update`` into the committed value.

        ``bundle`` is accepted for signature parity with the other
        operations and is not used here.
        """
        with self._committed_lock:
            self.finished_committed = self.aggregator.combine(
                update, self.finished_committed)

    def commit_physical(self, bundle, update):
        """Fold ``update`` into the attempted value and retire ``bundle``."""
        with self._attempted_lock:
            self.finished_attempted = self.aggregator.combine(
                update, self.finished_attempted)
            # The bundle is finished: drop any in-flight value recorded for
            # it so extract_latest_attempted() does not count it twice.
            self.inflight_attempted.pop(bundle, None)

    def update_physical(self, bundle, update):
        """Record ``update`` as the latest in-flight value for ``bundle``."""
        # Same lock as commit_physical, which mutates the same dict.
        with self._attempted_lock:
            self.inflight_attempted[bundle] = update

    def extract_committed(self):
        """Return the aggregator's result for the committed value."""
        return self.aggregator.result(self.finished_committed)

    def extract_latest_attempted(self):
        """Return the result of the attempted value plus in-flight updates."""
        # Snapshot under the lock so a concurrent update/commit cannot
        # resize the dict while it is being iterated.
        with self._attempted_lock:
            total = self.finished_attempted
            pending = list(self.inflight_attempted.values())
        for value in pending:
            total = self.aggregator.combine(total, value)
        return self.aggregator.result(total)