Source code for apache_beam.testing.load_tests.load_test_metrics_utils

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Utility functions used for integrating the Metrics API into load test pipelines.
"""

from __future__ import absolute_import

import logging
import time

import apache_beam as beam
from apache_beam.metrics import Metrics

try:
  from google.cloud import bigquery
  from google.cloud.bigquery.schema import SchemaField
  from google.cloud.exceptions import NotFound
except ImportError:
  bigquery = None
  SchemaField = None
  NotFound = None

RUNTIME_LABEL = 'runtime'
SUBMIT_TIMESTAMP_LABEL = 'submit_timestamp'


def _get_schema_field(schema_field):
  return SchemaField(
      name=schema_field['name'],
      field_type=schema_field['type'],
      mode=schema_field['mode'])
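
# Example (hypothetical field values): entries in the schema_map passed to
# BigQueryClient are plain dicts that _get_schema_field converts into
# google.cloud.bigquery SchemaField objects, e.g.:
#
#   _get_schema_field({'name': 'runtime', 'type': 'FLOAT', 'mode': 'REQUIRED'})
#   # -> SchemaField(name='runtime', field_type='FLOAT', mode='REQUIRED')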


class BigQueryClient(object):
  def __init__(self, project_name, table, dataset, schema_map):
    self._namespace = table

    self._bq_client = bigquery.Client(project=project_name)

    schema = self._parse_schema(schema_map)
    self._schema_names = self._get_schema_names(schema)
    schema = self._prepare_schema(schema)

    self._get_or_create_table(schema, dataset)

  def match_and_save(self, result_list):
    rows_tuple = tuple(self._match_inserts_by_schema(result_list))
    self._insert_data(rows_tuple)
  def _match_inserts_by_schema(self, insert_list):
    for name in self._schema_names:
      yield self._get_element_by_schema(name, insert_list)

  def _get_element_by_schema(self, schema_name, insert_list):
    for metric in insert_list:
      if metric['label'] == schema_name:
        return metric['value']
    return None

  def _insert_data(self, rows_tuple):
    errors = self._bq_client.insert_rows(self._bq_table, rows=[rows_tuple])
    if len(errors) > 0:
      for err in errors:
        logging.error(err['message'])
      raise ValueError('Unable to save rows in BigQuery.')

  def _get_dataset(self, dataset_name):
    bq_dataset_ref = self._bq_client.dataset(dataset_name)
    try:
      bq_dataset = self._bq_client.get_dataset(bq_dataset_ref)
    except NotFound:
      raise ValueError(
          'Dataset {} does not exist in your project. '
          'You have to create the dataset first.'.format(dataset_name))
    return bq_dataset

  def _get_or_create_table(self, bq_schemas, dataset):
    if self._namespace == '':
      raise ValueError('Namespace cannot be empty.')

    dataset = self._get_dataset(dataset)
    table_ref = dataset.table(self._namespace)

    try:
      self._bq_table = self._bq_client.get_table(table_ref)
    except NotFound:
      table = bigquery.Table(table_ref, schema=bq_schemas)
      self._bq_table = self._bq_client.create_table(table)

  def _parse_schema(self, schema_map):
    return [{'name': SUBMIT_TIMESTAMP_LABEL,
             'type': 'TIMESTAMP',
             'mode': 'REQUIRED'}] + schema_map

  def _prepare_schema(self, schemas):
    return [_get_schema_field(schema) for schema in schemas]

  def _get_schema_names(self, schemas):
    return [schema['name'] for schema in schemas]
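
# Example usage (a sketch with hypothetical project, dataset, and table names;
# it assumes google-cloud-bigquery credentials and an already-created dataset):
#
#   schema_map = [{'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'}]
#   client = BigQueryClient('my-project', 'my_table', 'my_dataset', schema_map)
#   client.match_and_save([
#       {'label': SUBMIT_TIMESTAMP_LABEL, 'value': time.time()},
#       {'label': RUNTIME_LABEL, 'value': 42.0},
#   ])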

class MetricsMonitor(object):
  def __init__(self, project_name, table, dataset, schema_map):
    if project_name is not None:
      self.bq = BigQueryClient(project_name, table, dataset, schema_map)

  def send_metrics(self, result):
    metrics = result.metrics().query()

    counters = metrics['counters']
    counters_list = []
    if len(counters) > 0:
      counters_list = self._prepare_counter_metrics(counters)

    distributions = metrics['distributions']
    dist_list = []
    if len(distributions) > 0:
      dist_list = self._prepare_runtime_metrics(distributions)

    timestamp = {'label': SUBMIT_TIMESTAMP_LABEL, 'value': time.time()}

    insert_list = [timestamp] + dist_list + counters_list

    self.bq.match_and_save(insert_list)
  def _prepare_counter_metrics(self, counters):
    for counter in counters:
      logging.info("Counter: %s", counter)

    counters_list = []
    for counter in counters:
      counter_committed = counter.committed
      counter_label = str(counter.key.metric.name)
      counters_list.append(
          {'label': counter_label, 'value': counter_committed})

    return counters_list

  def _prepare_runtime_metrics(self, distributions):
    min_values = []
    max_values = []
    for dist in distributions:
      logging.info("Distribution: %s", dist)
      min_values.append(dist.committed.min)
      max_values.append(dist.committed.max)
    # finding the real start
    min_value = min(min_values)
    # finding the real end
    max_value = max(max_values)

    runtime_in_s = max_value - min_value
    logging.info("Runtime: %s", runtime_in_s)
    runtime_in_s = float(runtime_in_s)
    return [{'label': RUNTIME_LABEL, 'value': runtime_in_s}]
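
# Example usage (a sketch with hypothetical names): after a load test pipeline
# finishes, pass its PipelineResult to send_metrics so the committed counters
# and the runtime derived from the RUNTIME_LABEL distribution are written to
# BigQuery:
#
#   monitor = MetricsMonitor('my-project', 'my_table', 'my_dataset', schema_map)
#   result = pipeline.run()
#   result.wait_until_finish()
#   monitor.send_metrics(result)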

class MeasureTime(beam.DoFn):
  def __init__(self, namespace):
    self.namespace = namespace
    self.runtime = Metrics.distribution(self.namespace, RUNTIME_LABEL)

  def start_bundle(self):
    self.runtime.update(time.time())

  def finish_bundle(self):
    self.runtime.update(time.time())

  def process(self, element):
    yield element
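
# Example usage (a sketch): MeasureTime records a timestamp at the start and
# end of every bundle in a single distribution, so the distribution's committed
# min/max approximate the pipeline's overall start and end times (consumed by
# _prepare_runtime_metrics above). A hypothetical pipeline step:
#
#   pcoll | beam.ParDo(MeasureTime('my_namespace'))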

def count_bytes(counter_name):
  def layer(f):
    def repl(*args):
      namespace = args[2]
      counter = Metrics.counter(namespace, counter_name)
      element = args[1]
      _, value = element
      # Increment the counter by the size of the element's payload.
      counter.inc(len(value))
      return f(*args)
    return repl
  return layer
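
# Example usage (a sketch with a hypothetical DoFn): count_bytes wraps a DoFn's
# process method that receives (key, value) elements. Note that the decorated
# method must accept the metrics namespace as its second argument, since the
# wrapper reads it from args[2]:
#
#   class ByteCountingDoFn(beam.DoFn):
#     @count_bytes('total_bytes_count')
#     def process(self, element, namespace):
#       yield element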