Source code for apache_beam.testing.load_tests.load_test_metrics_utils

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Utility functions used for integrating the Metrics API into load test
pipelines.

Metrics are sent to BigQuery in the following format:
test_id | submit_timestamp | metric_type | value

The 'test_id' is common to all metrics of a single run.
Currently the following metric types are possible:
* runtime
* total_bytes_count
"""

from __future__ import absolute_import

import logging
import time
import uuid

import apache_beam as beam
from apache_beam.metrics import Metrics

try:
  from google.cloud import bigquery
  from google.cloud.bigquery.schema import SchemaField
  from google.cloud.exceptions import NotFound
except ImportError:
  bigquery = None
  SchemaField = None
  NotFound = None

RUNTIME_METRIC = 'runtime'
COUNTER_LABEL = 'total_bytes_count'

ID_LABEL = 'test_id'
SUBMIT_TIMESTAMP_LABEL = 'timestamp'
METRICS_TYPE_LABEL = 'metric'
VALUE_LABEL = 'value'

SCHEMA = [
    {'name': ID_LABEL,
     'field_type': 'STRING',
     'mode': 'REQUIRED'
    },
    {'name': SUBMIT_TIMESTAMP_LABEL,
     'field_type': 'TIMESTAMP',
     'mode': 'REQUIRED'
    },
    {'name': METRICS_TYPE_LABEL,
     'field_type': 'STRING',
     'mode': 'REQUIRED'
    },
    {'name': VALUE_LABEL,
     'field_type': 'FLOAT',
     'mode': 'REQUIRED'
    }
]
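
# Each published row is a dict keyed by the labels above. An illustrative
# (not real) example of a single row:
#   {'test_id': 'a1b2c3d4e5f6...', 'timestamp': 1571745649.0,
#    'metric': 'python_load_test_runtime', 'value': 12.5}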

_LOGGER = logging.getLogger()


def parse_step(step_name):
  """Replaces white spaces and removes the 'Step:' label.

  Args:
    step_name(str): step name passed in metric ParDo

  Returns:
    lower case step name without namespace and step label
  """
  return step_name.lower().replace(' ', '_').strip('step:_')

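# For example (a hypothetical step name, shown only to illustrate the
# transformation): parse_step('Step: Count total') returns 'count_total'.

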
def split_metrics_by_namespace_and_name(metrics, namespace, name):
  """Splits a list of metrics by namespace and name.

  Args:
    metrics: list of metrics from the pipeline result
    namespace(str): filter metrics by namespace
    name(str): filter metrics by name

  Returns:
    two lists - one of metrics which match the filters and a second of
    those which do not
  """
  matching_metrics = []
  not_matching_metrics = []
  for dist in metrics:
    if dist.key.metric.namespace == namespace\
        and dist.key.metric.name == name:
      matching_metrics.append(dist)
    else:
      not_matching_metrics.append(dist)
  return matching_metrics, not_matching_metrics


def get_generic_distributions(generic_dists, metric_id):
  """Creates a flattened list of distributions, one entry per value type.

  A generic distribution is one which is not processed but saved in its
  most raw form.

  Args:
    generic_dists: list of distributions to be saved
    metric_id(uuid): id of the current test run

  Returns:
    list of dictionaries made from :class:`DistributionMetric`
  """
  return sum((
      get_all_distributions_by_type(dist, metric_id)
      for dist in generic_dists), [])


def get_all_distributions_by_type(dist, metric_id):
  """Creates a new list of objects, one per type of distribution value.

  Args:
    dist(object): DistributionMetric object to be parsed
    metric_id(uuid): id of the current test run

  Returns:
    list of dictionaries created from :class:`DistributionMetric` objects
  """
  submit_timestamp = time.time()
  dist_types = ['mean', 'max', 'min', 'sum']
  return [
      get_distribution_dict(dist_type, submit_timestamp, dist, metric_id)
      for dist_type in dist_types
  ]


def get_distribution_dict(metric_type, submit_timestamp, dist, metric_id):
  """Creates a :class:`DistributionMetric` and returns it as a dictionary.

  Args:
    metric_type(str): type of value from the distribution metric which will
      be saved (ex. max, min, mean, sum)
    submit_timestamp: timestamp when metric is saved
    dist(object): distribution object from the pipeline result
    metric_id(uuid): id of the current test run

  Returns:
    dictionary prepared for saving according to schema
  """
  return DistributionMetric(dist, submit_timestamp, metric_id,
                            metric_type).as_dict()


class MetricsReader(object):
  """
  A :class:`MetricsReader` retrieves metrics from a pipeline result,
  prepares them for publishing and sets up publishers.
  """
  publishers = []

  def __init__(self, project_name=None, bq_table=None, bq_dataset=None,
               publish_to_bq=False, filters=None):
    """Initializes :class:`MetricsReader` .

    Args:
      project_name (str): project with BigQuery where metrics will be saved
      bq_table (str): BigQuery table where metrics will be saved
      bq_dataset (str): BigQuery dataset where metrics will be saved
      publish_to_bq (bool): whether the metrics should be published to
        BigQuery
      filters: MetricFilter to query only filtered metrics
    """
    self._namespace = bq_table
    self.publishers.append(ConsoleMetricsPublisher())

    check = project_name and bq_table and bq_dataset and publish_to_bq
    if check:
      bq_publisher = BigQueryMetricsPublisher(
          project_name, bq_table, bq_dataset)
      self.publishers.append(bq_publisher)

    self.filters = filters

  def publish_metrics(self, result):
    metrics = result.metrics().query(self.filters)

    # Metrics from the pipeline result are stored in a map with the keys
    # 'gauges', 'distributions' and 'counters'. Under each key there is a
    # list of objects of that metric type. They need to be prepared for
    # publishing: the expected output is a list of dictionaries matching
    # the schema.
    insert_dicts = self._prepare_all_metrics(metrics)
    if len(insert_dicts) > 0:
      for publisher in self.publishers:
        publisher.publish(insert_dicts)

  def _prepare_all_metrics(self, metrics):
    metric_id = uuid.uuid4().hex

    insert_rows = self._get_counters(metrics['counters'], metric_id)
    insert_rows += self._get_distributions(metrics['distributions'],
                                           metric_id)
    return insert_rows

  def _get_counters(self, counters, metric_id):
    submit_timestamp = time.time()
    return [
        CounterMetric(counter, submit_timestamp, metric_id).as_dict()
        for counter in counters
    ]

  def _get_distributions(self, distributions, metric_id):
    rows = []
    matching_namespace, not_matching_namespace = \
        split_metrics_by_namespace_and_name(distributions, self._namespace,
                                            RUNTIME_METRIC)
    if len(matching_namespace) > 0:
      runtime_metric = RuntimeMetric(matching_namespace, metric_id)
      rows.append(runtime_metric.as_dict())

    if len(not_matching_namespace) > 0:
      rows += get_generic_distributions(not_matching_namespace, metric_id)
    return rows

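# A minimal usage sketch (hypothetical, not part of the original module):
# a load test would typically run its pipeline, wait for it to finish and
# then hand the PipelineResult to a MetricsReader. The project, dataset and
# table names below are placeholders.
#
#   reader = MetricsReader(
#       project_name='my-project',
#       bq_table='my_metrics_table',
#       bq_dataset='my_dataset',
#       publish_to_bq=True)
#   reader.publish_metrics(pipeline_result)

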
class Metric(object):
  """Metric base class in ready-to-save format."""

  def __init__(self, submit_timestamp, metric_id, value, metric=None,
               label=None):
    """Initializes :class:`Metric`

    Args:
      metric (object): object of metric result
      submit_timestamp (float): date-time of saving metric to database
      metric_id (uuid): unique id to identify test run
      value: value of metric
      label: custom metric name to be saved in database
    """
    self.submit_timestamp = submit_timestamp
    self.metric_id = metric_id
    self.label = label or metric.key.metric.namespace + \
                 '_' + parse_step(metric.key.step) + \
                 '_' + metric.key.metric.name
    self.value = value

  def as_dict(self):
    return {SUBMIT_TIMESTAMP_LABEL: self.submit_timestamp,
            ID_LABEL: self.metric_id,
            VALUE_LABEL: self.value,
            METRICS_TYPE_LABEL: self.label
           }


class CounterMetric(Metric):
  """The Counter Metric in ready-to-publish format.

  Args:
    counter_metric (object): counter metric object from MetricResult
    submit_timestamp (float): date-time of saving metric to database
    metric_id (uuid): unique id to identify test run
  """
  def __init__(self, counter_metric, submit_timestamp, metric_id):
    value = counter_metric.committed
    super(CounterMetric, self).__init__(submit_timestamp, metric_id, value,
                                        counter_metric)


class DistributionMetric(Metric):
  """The Distribution Metric in ready-to-publish format.

  Args:
    dist_metric (object): distribution metric object from MetricResult
    submit_timestamp (float): date-time of saving metric to database
    metric_id (uuid): unique id to identify test run
  """
  def __init__(self, dist_metric, submit_timestamp, metric_id, metric_type):
    custom_label = dist_metric.key.metric.namespace + \
                   '_' + parse_step(dist_metric.key.step) + \
                   '_' + metric_type + \
                   '_' + dist_metric.key.metric.name
    value = getattr(dist_metric.committed, metric_type)
    super(DistributionMetric, self) \
        .__init__(submit_timestamp, metric_id, value, dist_metric,
                  custom_label)


class RuntimeMetric(Metric):
  """The Runtime Metric in ready-to-publish format.

  Args:
    runtime_list: list of distribution metrics from MetricResult
      with the runtime name
    metric_id(uuid): unique id to identify test run
  """
  def __init__(self, runtime_list, metric_id):
    value = self._prepare_runtime_metrics(runtime_list)
    submit_timestamp = time.time()
    # The label does not include a step name, because the value is
    # calculated out of many steps.
    label = runtime_list[0].key.metric.namespace + \
            '_' + RUNTIME_METRIC
    super(RuntimeMetric, self).__init__(submit_timestamp, metric_id, value,
                                        None, label)

  def _prepare_runtime_metrics(self, distributions):
    min_values = []
    max_values = []
    for dist in distributions:
      min_values.append(dist.result.min)
      max_values.append(dist.result.max)
    # finding real start
    min_value = min(min_values)
    # finding real end
    max_value = max(max_values)

    runtime_in_s = float(max_value - min_value)
    return runtime_in_s

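# Worked example (illustrative numbers only): if the runtime distributions
# from three steps report min values [10.0, 12.0, 13.0] and max values
# [15.0, 18.0, 17.5], the real start is min(min_values) = 10.0, the real
# end is max(max_values) = 18.0, and the published runtime is
# 18.0 - 10.0 = 8.0 seconds.

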
class ConsoleMetricsPublisher(object):
  """A :class:`ConsoleMetricsPublisher` publishes collected metrics
  to console output."""

  def publish(self, results):
    if len(results) > 0:
      log = "Load test results for test: %s and timestamp: %s:" \
            % (results[0][ID_LABEL], results[0][SUBMIT_TIMESTAMP_LABEL])
      _LOGGER.info(log)
      for result in results:
        log = "Metric: %s Value: %d" \
              % (result[METRICS_TYPE_LABEL], result[VALUE_LABEL])
        _LOGGER.info(log)
    else:
      _LOGGER.info("No test results were collected.")


class BigQueryMetricsPublisher(object):
  """A :class:`BigQueryMetricsPublisher` publishes collected metrics
  to BigQuery output."""
  def __init__(self, project_name, table, dataset):
    self.bq = BigQueryClient(project_name, table, dataset)

  def publish(self, results):
    outputs = self.bq.save(results)
    if len(outputs) > 0:
      for output in outputs:
        errors = output['errors']
        for err in errors:
          _LOGGER.error(err['message'])
          raise ValueError(
              'Unable to save rows in BigQuery: {}'.format(err['message']))


class BigQueryClient(object):
  """A :class:`BigQueryClient` publishes collected metrics to
  BigQuery output."""
  def __init__(self, project_name, table, dataset):
    self._namespace = table
    self._client = bigquery.Client(project=project_name)
    self._schema_names = self._get_schema_names()
    schema = self._prepare_schema()
    self._get_or_create_table(schema, dataset)

  def _get_schema_names(self):
    return [schema['name'] for schema in SCHEMA]

  def _prepare_schema(self):
    return [SchemaField(**row) for row in SCHEMA]

  def _get_or_create_table(self, bq_schemas, dataset):
    if self._namespace == '':
      raise ValueError('Namespace cannot be empty.')

    dataset = self._get_dataset(dataset)
    table_ref = dataset.table(self._namespace)

    try:
      self._bq_table = self._client.get_table(table_ref)
    except NotFound:
      table = bigquery.Table(table_ref, schema=bq_schemas)
      self._bq_table = self._client.create_table(table)

  def _get_dataset(self, dataset_name):
    bq_dataset_ref = self._client.dataset(dataset_name)
    try:
      bq_dataset = self._client.get_dataset(bq_dataset_ref)
    except NotFound:
      raise ValueError(
          'Dataset {} does not exist in your project. '
          'You have to create the dataset first.'
          .format(dataset_name))
    return bq_dataset

  def save(self, results):
    return self._client.insert_rows(self._bq_table, results)


class MeasureTime(beam.DoFn):
  """A distribution metric prepared to be added to a pipeline as a ParDo
  to measure runtime."""
  def __init__(self, namespace):
    """Initializes :class:`MeasureTime`.

    Args:
      namespace(str): namespace of metric
    """
    self.namespace = namespace
    self.runtime = Metrics.distribution(self.namespace, RUNTIME_METRIC)

  def start_bundle(self):
    self.runtime.update(time.time())

  def finish_bundle(self):
    self.runtime.update(time.time())

  def process(self, element):
    yield element


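# A minimal sketch of wiring MeasureTime into a load test pipeline (the
# namespace and step label are placeholders). Because the distribution is
# updated in start_bundle and finish_bundle, its reported min and max
# approximate the start and end of processing in that step:
#
#   pcoll | 'Measure time' >> beam.ParDo(MeasureTime('my_namespace'))

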
class MeasureBytes(beam.DoFn):
  """Metric to measure how many bytes were observed in the pipeline."""
  LABEL = 'total_bytes'

  def __init__(self, namespace, extractor=None):
    """Initializes :class:`MeasureBytes`.

    Args:
      namespace(str): metric namespace
      extractor: function to extract elements to be counted
    """
    self.namespace = namespace
    self.counter = Metrics.counter(self.namespace, self.LABEL)
    self.extractor = extractor if extractor else lambda x: (yield x)

  def process(self, element, *args):
    for value in self.extractor(element, *args):
      self.counter.inc(len(value))
    yield element


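# A hedged usage sketch (the namespace and the extractor are hypothetical):
# the extractor should be a generator yielding the values whose byte
# lengths are added to the counter.
#
#   def extract_payloads(element):
#     yield element  # assuming elements are bytes-like
#
#   pcoll | 'Count bytes' >> beam.ParDo(
#       MeasureBytes('my_namespace', extract_payloads))

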
class CountMessages(beam.DoFn):
  LABEL = 'total_messages'

  def __init__(self, namespace):
    self.namespace = namespace
    self.counter = Metrics.counter(self.namespace, self.LABEL)

  def process(self, element):
    self.counter.inc(1)
    yield element