Source code for apache_beam.runners.dataflow.dataflow_metrics

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
DataflowRunner implementation of MetricResults. It is in charge of
responding to queries of current metrics by going to the dataflow
service.
"""

# pytype: skip-file

import argparse
import logging
import numbers
import sys
from collections import defaultdict

from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.cells import DistributionResult
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricResult
from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metricbase import MetricName
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions

_LOGGER = logging.getLogger(__name__)

def _get_match(proto, filter_fn):
  """Finds and returns the first element that matches a query.

  If no element matches the query, it throws ValueError.
  If more than one element matches the query, it returns only the first.
  query = [elm for elm in proto if filter_fn(elm)]
  if len(query) == 0:
    raise ValueError('Could not find element')
  elif len(query) > 1:
    raise ValueError('Too many matches')

  return query[0]

# Key of the MetricStructuredName context entry that carries the step name.
STEP_LABEL = 'step'
# V1b3 MetricStructuredName keys to accept and copy to the MetricKey labels.
STRUCTURED_NAME_LABELS = {
    'execution_step', 'original_name', 'output_user_name'
}

class DataflowMetrics(MetricResults):
  """Implementation of MetricResults class for the Dataflow runner."""
  def __init__(self, dataflow_client=None, job_result=None, job_graph=None):
    """Initialize the Dataflow metrics object.

    Args:
      dataflow_client: apiclient.DataflowApplicationClient to interact with the
        dataflow service.
      job_result: DataflowPipelineResult with the state and id information of
        the job.
      job_graph: apiclient.Job instance to be able to translate between internal
        step names (e.g. "s2"), and user step names (e.g. "split").
    """
    super(DataflowMetrics, self).__init__()
    self._dataflow_client = dataflow_client
    self.job_result = job_result
    self._queried_after_termination = False
    self._cached_metrics = None
    self._job_graph = job_graph

  @staticmethod
  def _is_counter(metric_result):
    """Return True if the attempted value of the result is a plain number."""
    return isinstance(metric_result.attempted, numbers.Number)

  @staticmethod
  def _is_distribution(metric_result):
    """Return True if the attempted value is a DistributionResult."""
    return isinstance(metric_result.attempted, DistributionResult)

  def _translate_step_name(self, internal_name):
    """Translate between internal step names (e.g. "s1") and user step names.

    Raises:
      ValueError: If no job graph is available, or the internal name cannot
        be mapped to a user step name.
    """
    if not self._job_graph:
      raise ValueError(
          'Could not translate the internal step name %r since job graph is '
          'not available.' % internal_name)
    user_step_name = None
    transforms = self._job_graph.proto_pipeline.components.transforms
    if internal_name in transforms:
      # Dataflow Runner v2 with portable job submission uses proto transform map
      # IDs for step names. Also PTransform.unique_name maps to user step names.
      # Hence we lookup user step names based on the proto.
      user_step_name = transforms[internal_name].unique_name
    else:
      try:
        step = _get_match(
            self._job_graph.proto.steps, lambda x: x.name == internal_name)
        user_step_name = _get_match(
            step.properties.additionalProperties,
            lambda x: x.key == 'user_name').value.string_value
      except ValueError:
        pass  # Exception is handled below.
    if not user_step_name:
      raise ValueError(
          'Could not translate the internal step name %r.' % internal_name)
    return user_step_name

  def _get_metric_key(self, metric):
    """Populate the MetricKey object for a queried metric result."""
    step = ""
    name = metric.name.name  # Always extract a name
    labels = dict()
    context = metric.name.context.additionalProperties
    try:
      # Try to extract the user step name.
      # If ValueError is thrown within this try-block, it is because of
      # one of the following:
      # 1. Unable to translate the step name. Only happening with improperly
      #   formatted job graph (unlikely), or step name not being the internal
      #   step name (only happens for unstructured-named metrics).
      # 2. Unable to unpack [step] or [namespace]; which should only happen
      #   for unstructured names.
      step = _get_match(context, lambda x: x.key == STEP_LABEL).value
      step = self._translate_step_name(step)
    except ValueError:
      pass

    namespace = "dataflow/v1b3"  # Try to extract namespace or add a default.
    try:
      namespace = _get_match(context, lambda x: x.key == 'namespace').value
    except ValueError:
      pass

    for kv in context:
      if kv.key in STRUCTURED_NAME_LABELS:
        labels[kv.key] = kv.value
    # Package everything besides namespace and name the labels as well,
    # including unmodified step names to assist in integration the exact
    # unmodified values which come from dataflow.
    return MetricKey(step, MetricName(namespace, name), labels=labels)

  def _populate_metrics(self, response, result, user_metrics=False):
    """Move metrics from response to results as MetricResults.

    Args:
      response: Job metrics response whose ``metrics`` are inspected.
      result: List that the created MetricResult objects are appended to.
      user_metrics: If True keep metrics whose origin is 'user', otherwise
        keep metrics whose origin is 'dataflow/v1b3'.
    """
    origin = 'user' if user_metrics else 'dataflow/v1b3'
    metrics = [
        metric for metric in response.metrics if metric.name.origin == origin
    ]

    # Get the tentative/committed versions of every metric together.
    metrics_by_name = defaultdict(dict)
    for metric in metrics:
      if metric.name.name.endswith(('_MIN', '_MAX', '_MEAN', '_COUNT')):
        # The Dataflow Service presents distribution metrics in two ways:
        # One way is as a single distribution object with all its fields, and
        # another way is as four different scalar metrics labeled as _MIN,
        # _MAX, _COUNT_, _MEAN.
        # TODO(pabloem) remove these when distributions are not being broken up
        #  in the service.
        # The second way is only useful for the UI, and should be ignored.
        continue
      is_tentative = [
          prop for prop in metric.name.context.additionalProperties
          if prop.key == 'tentative' and prop.value == 'true'
      ]
      tentative_or_committed = 'tentative' if is_tentative else 'committed'

      metric_key = self._get_metric_key(metric)
      if metric_key is None:
        continue
      metrics_by_name[metric_key][tentative_or_committed] = metric

    # Now we create the MetricResult elements.
    for metric_key, metric in metrics_by_name.items():
      # Either version may be absent; _get_metric_value maps None to None, so
      # use .get() rather than failing with KeyError on the missing one.
      attempted = self._get_metric_value(metric.get('tentative'))
      committed = self._get_metric_value(metric.get('committed'))
      result.append(
          MetricResult(metric_key, attempted=attempted, committed=committed))

  def _get_metric_value(self, metric):
    """Get a metric result object from a MetricUpdate from Dataflow API.

    Returns:
      The integer value for scalar metrics, a DistributionResult for
      distribution metrics, or None if ``metric`` is None or carries neither.
    """
    if metric is None:
      return None

    if metric.scalar is not None:
      return metric.scalar.integer_value
    elif metric.distribution is not None:
      properties = metric.distribution.object_value.properties
      dist_count = _get_match(
          properties, lambda x: x.key == 'count').value.integer_value
      dist_min = _get_match(
          properties, lambda x: x.key == 'min').value.integer_value
      dist_max = _get_match(
          properties, lambda x: x.key == 'max').value.integer_value
      dist_sum = _get_match(
          properties, lambda x: x.key == 'sum').value.integer_value
      # Test for None rather than falsiness: a sum of 0 is a valid integer
      # value and must not be mistaken for an overflow.
      if dist_sum is None:
        # distribution metric is not meant to use on large values, but in case
        # it is, the value can overflow and become double_value, the correctness
        # of the value may not be guaranteed.
        _LOGGER.info(
            "Distribution metric sum value seems to have "
            "overflowed integer_value range, the correctness of sum or mean "
            "value may not be guaranteed: %s",
            metric.distribution)
        dist_sum = int(
            _get_match(properties, lambda x: x.key == 'sum').value.double_value)
      return DistributionResult(
          DistributionData(dist_sum, dist_count, dist_min, dist_max))
    else:
      return None

  def _get_metrics_from_dataflow(self, job_id=None):
    """Return cached metrics or query the dataflow service.

    Raises:
      ValueError: If no job id was given and none can be read from job_result.
    """
    if not job_id:
      try:
        job_id = self.job_result.job_id()
      except AttributeError:
        job_id = None
    if not job_id:
      raise ValueError('Can not query metrics. Job id is unknown.')

    if self._cached_metrics:
      return self._cached_metrics

    job_metrics = self._dataflow_client.get_job_metrics(job_id)
    # If we can determine that the job has terminated,
    # then metrics will not change and we can cache them.
    if self.job_result and self.job_result.is_in_terminal_state():
      self._cached_metrics = job_metrics
    return job_metrics

  def all_metrics(self, job_id=None):
    """Return all user and system metrics from the dataflow service."""
    metric_results = []
    response = self._get_metrics_from_dataflow(job_id=job_id)
    self._populate_metrics(response, metric_results, user_metrics=True)
    self._populate_metrics(response, metric_results, user_metrics=False)
    return metric_results

  def query(self, filter=None):
    """Return the user metrics matching ``filter``, grouped by metric type.

    Returns:
      A dict keyed by self.COUNTERS, self.DISTRIBUTIONS and self.GAUGES.
      Gauges are always an empty list.
    """
    metric_results = []
    response = self._get_metrics_from_dataflow()
    self._populate_metrics(response, metric_results, user_metrics=True)
    return {
        self.COUNTERS: [
            elm for elm in metric_results if self.matches(filter, elm.key) and
            DataflowMetrics._is_counter(elm)
        ],
        self.DISTRIBUTIONS: [
            elm for elm in metric_results if self.matches(filter, elm.key) and
            DataflowMetrics._is_distribution(elm)
        ],
        self.GAUGES: []
    }  # TODO(pabloem): Add Gauge support for dataflow.
def main(argv):
  """Print the metric results for the dataflow --job_id and --project.

  Instead of running an entire pipeline which takes several minutes, use this
  main method to display MetricResults for a specific --job_id and --project
  which takes only a few seconds.

  Args:
    argv: Command line arguments; a leading script path equal to __file__ is
      dropped before parsing.
  """
  # TODO(BEAM-6833): The MetricResults do not show translated step names as the
  # job_graph is not provided to DataflowMetrics.

  # Import here to avoid adding the dependency for local running scenarios.
  try:
    # pylint: disable=wrong-import-order, wrong-import-position
    from apache_beam.runners.dataflow.internal import apiclient
  except ImportError:
    raise ImportError(
        'Google Cloud Dataflow runner not available, '
        'please install apache_beam[gcp]')
  # Guard against an empty argv before inspecting its first element.
  if argv and argv[0] == __file__:
    argv = argv[1:]
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '-j', '--job_id', type=str, help='The job id to query metrics for.')
  parser.add_argument(
      '-p',
      '--project',
      type=str,
      help='The project name to query metrics for.')
  flags = parser.parse_args(argv)

  # Get a Dataflow API client and set its project and job_id in the options.
  options = PipelineOptions()
  gcloud_options = options.view_as(GoogleCloudOptions)
  gcloud_options.project = flags.project
  dataflow_client = apiclient.DataflowApplicationClient(options)
  df_metrics = DataflowMetrics(dataflow_client)
  all_metrics = df_metrics.all_metrics(job_id=flags.job_id)
  _LOGGER.info(
      'Printing all MetricResults for %s in %s', flags.job_id, flags.project)
  for metric_result in all_metrics:
    _LOGGER.info(metric_result)
# Script entry point: log at INFO so the metric results are shown.
if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main(sys.argv)