# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""DataflowRunner implementation of MetricResults.

It is in charge of responding to queries of current metrics by going to the
Dataflow service.
"""
import numbers
from collections import defaultdict
from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.cells import DistributionResult
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricResult
from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metricbase import MetricName
def _get_match(proto, filter_fn):
"""Finds and returns the first element that matches a query.
If no element matches the query, it throws ValueError.
If more than one element matches the query, it returns only the first.
query = [elm for elm in proto if filter_fn(elm)]
if len(query) == 0:
raise ValueError('Could not find element')
elif len(query) > 1:
raise ValueError('Too many matches')
return query[0]
class DataflowMetrics(MetricResults):
    """Implementation of MetricResults class for the Dataflow runner."""

    def __init__(self, dataflow_client=None, job_result=None, job_graph=None):
        """Initialize the Dataflow metrics object.

        Args:
          dataflow_client: apiclient.DataflowApplicationClient to interact
            with the dataflow service.
          job_result: DataflowPipelineResult with the state and id
            information of the job.
          job_graph: apiclient.Job instance to be able to translate between
            internal step names (e.g. "s2"), and user step names
            (e.g. "split").
        """
        super(DataflowMetrics, self).__init__()
        self._dataflow_client = dataflow_client
        self.job_result = job_result
        self._queried_after_termination = False
        # Filled in by _get_metrics_from_dataflow once the job is terminal.
        self._cached_metrics = None
        self._job_graph = job_graph

    @staticmethod
    def _is_counter(metric_result):
        """Return True if the result's attempted value is a plain number."""
        return isinstance(metric_result.attempted, numbers.Number)

    @staticmethod
    def _is_distribution(metric_result):
        """Return True if the result's attempted value is a distribution."""
        return isinstance(metric_result.attempted, DistributionResult)

    def _translate_step_name(self, internal_name):
        """Translate an internal step name (e.g. "s1") to its user step name.

        Args:
          internal_name: the step name as it appears in the job graph proto.

        Returns:
          The string value of the matching step's 'user_name' property.

        Raises:
          ValueError: if there is no job graph, or if the step or its
            'user_name' property cannot be uniquely found.
        """
        if not self._job_graph:
            raise ValueError('Could not translate the internal step name.')

        try:
            step = _get_match(self._job_graph.proto.steps,
                              lambda x: x.name == internal_name)
            # NOTE(review): the property container is assumed to be
            # step.properties.additionalProperties (key/value entries), as in
            # the generated Dataflow API messages -- confirm against the
            # client library in use.
            user_step_name = _get_match(
                step.properties.additionalProperties,
                lambda x: x.key == 'user_name').value.string_value
        except ValueError:
            # Collapse lookup failures into one uniform error message.
            raise ValueError('Could not translate the internal step name.')
        return user_step_name

    def _get_metric_key(self, metric):
        """Populate the MetricKey object for a queried metric result.

        Args:
          metric: a metric entry from the service response; its
            name.context.additionalProperties must carry 'step' and
            'namespace' entries for a key to be built.

        Returns:
          A MetricKey, or None if the step/namespace could not be resolved.
        """
        try:
            # If ValueError is thrown within this try-block, it is because of
            # one of the following:
            # 1. Unable to translate the step name. Only happening with
            #    improperly formatted job graph (unlikely), or step name not
            #    being the internal step name (only happens for
            #    unstructured-named metrics).
            # 2. Unable to unpack [step] or [namespace]; which should only
            #    happen for unstructured names.
            step = _get_match(metric.name.context.additionalProperties,
                              lambda x: x.key == 'step').value
            step = self._translate_step_name(step)
            namespace = _get_match(metric.name.context.additionalProperties,
                                   lambda x: x.key == 'namespace').value
            name = metric.name.name
        except ValueError:
            return None

        return MetricKey(step, MetricName(namespace, name))

    def _populate_metric_results(self, response):
        """Take a list of metrics, and convert it to a list of MetricResult.

        Args:
          response: service response whose ``metrics`` attribute is iterable.

        Returns:
          A list with one MetricResult per metric key for which both a
          tentative and a committed value could be extracted.
        """
        user_metrics = [metric
                        for metric in response.metrics
                        if metric.name.origin == 'user']

        # The Dataflow Service presents distribution metrics in two ways:
        # One way is as a single distribution object with all its fields, and
        # another way is as four different scalar metrics labeled as [MIN],
        # [MAX], [COUNT], [MEAN].
        # The second way is only useful for the UI, and should be ignored.
        # TODO(pabloem) remove these when distributions are not being broken
        # up in the service.
        ui_only_suffixes = ('[MIN]', '[MAX]', '[MEAN]', '[COUNT]')

        # Get the tentative/committed versions of every metric together.
        metrics_by_name = defaultdict(dict)
        for metric in user_metrics:
            if metric.name.name.endswith(ui_only_suffixes):
                continue
            is_tentative = any(
                prop.key == 'tentative' and prop.value == 'true'
                for prop in metric.name.context.additionalProperties)
            tentative_or_committed = (
                'tentative' if is_tentative else 'committed')

            metric_key = self._get_metric_key(metric)
            if metric_key is None:
                # Unstructured metric names cannot be keyed; skip them.
                continue
            metrics_by_name[metric_key][tentative_or_committed] = metric

        # Now we create the MetricResult elements.
        result = []
        # items() rather than iteritems(): works on both Python 2 and 3.
        for metric_key, metric in metrics_by_name.items():
            # .get(): a metric reported in only one version yields None here
            # and is skipped below instead of raising KeyError.
            attempted = self._get_metric_value(metric.get('tentative'))
            committed = self._get_metric_value(metric.get('committed'))
            if attempted is None or committed is None:
                continue
            result.append(MetricResult(metric_key,
                                       attempted=attempted,
                                       committed=committed))

        return result

    def _get_metric_value(self, metric):
        """Get a metric result object from a MetricUpdate from Dataflow API.

        Args:
          metric: a metric entry from the service response, or None.

        Returns:
          The scalar's integer_value, a DistributionResult built from the
          distribution's count/min/max/mean properties, or None when
          ``metric`` is None or carries neither a scalar nor a distribution.

        Raises:
          ValueError: if a distribution lacks (or repeats) one of the
            'count', 'min', 'max' or 'mean' properties.
        """
        if metric is None:
            return None

        if metric.scalar is not None:
            return metric.scalar.integer_value
        elif metric.distribution is not None:
            properties = metric.distribution.object_value.properties

            def int_property(key):
                # Exactly one property with this key is expected.
                return _get_match(
                    properties, lambda x: x.key == key).value.integer_value

            dist_count = int_property('count')
            dist_min = int_property('min')
            dist_max = int_property('max')
            dist_mean = int_property('mean')
            # Calculating dist_sum with a hack, as distribution sum is not yet
            # available in the Dataflow API.
            # TODO(pabloem) Switch to "sum" field once it's available in the
            # API
            dist_sum = dist_count * dist_mean
            return DistributionResult(
                DistributionData(
                    dist_sum, dist_count, dist_min, dist_max))
        else:
            return None

    def _get_metrics_from_dataflow(self):
        """Return cached metrics or query the dataflow service.

        Returns:
          The cached response if one is stored, otherwise the result of
          ``get_job_metrics`` for this job.

        Raises:
          ValueError: if the job id is not available.
        """
        try:
            job_id = self.job_result.job_id()
        except AttributeError:
            # job_result is None or does not expose job_id().
            job_id = None
        if not job_id:
            raise ValueError('Can not query metrics. Job id is unknown.')

        if self._cached_metrics:
            return self._cached_metrics

        job_metrics = self._dataflow_client.get_job_metrics(job_id)
        # If the job has terminated, metrics will not change and we can cache
        # them.
        if self.job_result._is_in_terminal_state():
            self._cached_metrics = job_metrics
        return job_metrics

    def query(self, filter=None):
        """Query the job's user metrics, split by metric kind.

        Args:
          filter: passed to ``self.matches`` together with each result's key
            (``matches`` is not defined in this class; presumably inherited
            from MetricResults).

        Returns:
          A dict with keys 'counters', 'distributions' and 'gauges'; the
          first two hold the matching MetricResult objects of that kind and
          'gauges' is always an empty list.
        """
        response = self._get_metrics_from_dataflow()
        metric_results = self._populate_metric_results(response)
        matching = [elm for elm in metric_results
                    if self.matches(filter, elm.key)]
        return {'counters': [elm for elm in matching
                             if DataflowMetrics._is_counter(elm)],
                'distributions': [elm for elm in matching
                                  if DataflowMetrics._is_distribution(elm)],
                'gauges': []}  # Gauges are not currently supported by dataflow