apache_beam.testing.load_tests.load_test_metrics_utils module

Utility functions for integrating the Metrics API into load test pipelines.

Metrics are sent to BigQuery in the following format: test_id | submit_timestamp | metric_type | value

The ‘test_id’ is common to all metrics of a single run. Currently the following metric types are available: runtime and total_bytes_count.
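For illustration, one row under this schema might look like the sketch below. The field names and values are assumptions derived from the format above, not the module's exact BigQuery schema:

```python
import time
import uuid

# Illustrative metric row following the format above:
# test_id | submit_timestamp | metric_type | value
row = {
    'test_id': str(uuid.uuid4()),     # shared by all metrics of one test run
    'submit_timestamp': time.time(),  # when the metric was saved
    'metric_type': 'runtime',         # e.g. 'runtime' or 'total_bytes_count'
    'value': 12.5,
}
```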

apache_beam.testing.load_tests.load_test_metrics_utils.parse_step(step_name)[source]

Replaces whitespace and removes the ‘Step:’ label.

Parameters:

step_name (str) – step name passed in the metric ParDo

Returns:

lowercase step name without the namespace and step label
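The described behavior can be approximated by the following sketch. This is an illustrative re-implementation, not the module's actual code; the exact normalization (e.g. how embedded whitespace and the namespace are handled) may differ:

```python
def parse_step_sketch(step_name: str) -> str:
    # Remove the 'Step:' label, drop whitespace, and lowercase
    # the remaining step name.
    return step_name.replace('Step:', '').replace(' ', '').lower()
```

For example, `parse_step_sketch('Step: Read Input')` yields `'readinput'`.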

apache_beam.testing.load_tests.load_test_metrics_utils.split_metrics_by_namespace_and_name(metrics, namespace, name)[source]

Splits a list of metrics by namespace and name.

Parameters:
  • metrics – list of metrics from pipeline result

  • namespace (str) – filter metrics by namespace

  • name (str) – filter metrics by name

Returns:

two lists: one with metrics matching the filters and one with metrics that do not match
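The split amounts to a simple partition, sketched below using plain dicts in place of MetricResult objects (an assumption made for illustration):

```python
def split_metrics_sketch(metrics, namespace, name):
    # Partition metrics into those whose key matches the given
    # namespace and name, and those that do not.
    matching, not_matching = [], []
    for metric in metrics:
        if metric['namespace'] == namespace and metric['name'] == name:
            matching.append(metric)
        else:
            not_matching.append(metric)
    return matching, not_matching
```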

apache_beam.testing.load_tests.load_test_metrics_utils.get_generic_distributions(generic_dists, metric_id)[source]

Creates a flattened list of distributions, one entry per value type. A generic distribution is one which is not processed but saved in its most raw form.

Parameters:
  • generic_dists – list of distributions to be saved

  • metric_id (uuid) – id of the current test run

Returns:

list of dictionaries made from DistributionMetric

apache_beam.testing.load_tests.load_test_metrics_utils.get_all_distributions_by_type(dist, metric_id)[source]

Creates a new list of objects, one for each value type of the distribution metric.

Parameters:
  • dist (object) – DistributionMetric object to be parsed

  • metric_id (uuid) – id of the current test run

Returns:

list of DistributionMetric objects

apache_beam.testing.load_tests.load_test_metrics_utils.get_distribution_dict(metric_type, submit_timestamp, dist, metric_id)[source]

Creates a DistributionMetric dictionary.

Parameters:
  • metric_type (str) – type of value from the distribution metric to be saved (e.g. max, min, mean, sum)

  • submit_timestamp – timestamp when metric is saved

  • dist (object)

  • metric_id (uuid) – id of the current test run

Returns:

dictionary prepared for saving according to the schema
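A sketch of the kind of dictionary this produces. The field names and the ‘<name>_<metric_type>’ label format are illustrative assumptions, not the module's exact schema, and a plain dict stands in for the distribution object:

```python
def distribution_dict_sketch(metric_type, submit_timestamp, dist, metric_id):
    # Build one row for a single statistic (e.g. 'max', 'min',
    # 'mean', 'sum') of a distribution, keyed by the shared run id.
    return {
        'test_id': metric_id,
        'submit_timestamp': submit_timestamp,
        'metric': '%s_%s' % (dist['name'], metric_type),
        'value': dist[metric_type],
    }
```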

class apache_beam.testing.load_tests.load_test_metrics_utils.MetricsReader(project_name=None, bq_table=None, bq_dataset=None, publish_to_bq=False, influxdb_options: InfluxDBMetricsPublisherOptions | None = None, namespace=None, filters=None)[source]

Bases: object

A MetricsReader retrieves metrics from a pipeline result, prepares them for the publishers, and sets up those publishers.

Initializes MetricsReader.

Parameters:
  • project_name (str) – project with BigQuery where metrics will be saved

  • bq_table (str) – BigQuery table where metrics will be saved

  • bq_dataset (str) – BigQuery dataset where metrics will be saved

  • namespace (str) – Namespace of the metrics

  • filters – MetricsFilter to query only filtered metrics

get_counter_metric(result: PipelineResult, name: str) int[source]

Return the current value for a long counter, or -1 if it can’t be retrieved. Note that this uses only attempted metrics, because some runners don’t support committed metrics.
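The lookup-with-fallback logic can be sketched as follows, using plain dicts in place of MetricResult objects (an illustrative assumption; the real method queries the PipelineResult with a MetricsFilter):

```python
def counter_value_sketch(counters, name):
    # Return the attempted value of the counter with the given
    # name, or -1 when it is absent or ambiguous.
    matches = [c for c in counters if c['name'] == name]
    if len(matches) != 1:
        return -1
    return matches[0]['attempted']
```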

publish_metrics(result: PipelineResult, extra_metrics: dict | None = None)[source]

Publish metrics from pipeline result to registered publishers.

publish_values(labeled_values)[source]

The method to publish simple labeled values.

Parameters:

labeled_values (List[Tuple[str, int]]) – list of (label, value) pairs
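A sketch of how such labeled values could be turned into rows of the schema described at the top of this module; the field names are illustrative assumptions:

```python
def rows_from_labeled_values_sketch(labeled_values, metric_id, timestamp):
    # One row per (label, value) pair, sharing the run id
    # and submit timestamp across all rows.
    return [
        {
            'test_id': metric_id,
            'submit_timestamp': timestamp,
            'metric': label,
            'value': value,
        }
        for label, value in labeled_values
    ]
```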

class apache_beam.testing.load_tests.load_test_metrics_utils.Metric(submit_timestamp, metric_id, value, metric=None, label=None)[source]

Bases: object

Metric base class in ready-to-save format.

Initializes Metric.

Parameters:
  • metric (object) – object of metric result

  • submit_timestamp (float) – date-time of saving metric to database

  • metric_id (uuid) – unique id to identify test run

  • value – value of metric

  • label – custom metric name to be saved in database

as_dict()[source]
class apache_beam.testing.load_tests.load_test_metrics_utils.CounterMetric(counter_metric, submit_timestamp, metric_id)[source]

Bases: Metric

The Counter Metric in ready-to-publish format.

Parameters:
  • counter_metric (object) – counter metric object from MetricResult

  • submit_timestamp (float) – date-time of saving metric to database

  • metric_id (uuid) – unique id to identify test run

class apache_beam.testing.load_tests.load_test_metrics_utils.DistributionMetric(dist_metric, submit_timestamp, metric_id, metric_type)[source]

Bases: Metric

The Distribution Metric in ready-to-publish format.

Parameters:
  • dist_metric (object) – distribution metric object from MetricResult

  • submit_timestamp (float) – date-time of saving metric to database

  • metric_id (uuid) – unique id to identify test run

class apache_beam.testing.load_tests.load_test_metrics_utils.RuntimeMetric(runtime_list, metric_id)[source]

Bases: Metric

The Runtime Metric in ready-to-publish format.

Parameters:
  • runtime_list – list of distributions metrics from MetricResult with runtime name

  • metric_id (uuid) – unique id to identify test run

class apache_beam.testing.load_tests.load_test_metrics_utils.MetricsPublisher[source]

Bases: object

Base class for metrics publishers.

publish(results)[source]
class apache_beam.testing.load_tests.load_test_metrics_utils.ConsoleMetricsPublisher[source]

Bases: MetricsPublisher

A ConsoleMetricsPublisher publishes collected metrics to console output.

publish(results)[source]
class apache_beam.testing.load_tests.load_test_metrics_utils.BigQueryMetricsPublisher(project_name, table, dataset, bq_schema=None)[source]

Bases: MetricsPublisher

A BigQueryMetricsPublisher publishes collected metrics to BigQuery output.

publish(results)[source]
class apache_beam.testing.load_tests.load_test_metrics_utils.BigQueryClient(project_name, table, dataset, bq_schema=None)[source]

Bases: object

A BigQueryClient publishes collected metrics to BigQuery output.

save(results)[source]
class apache_beam.testing.load_tests.load_test_metrics_utils.InfluxDBMetricsPublisherOptions(measurement: str, db_name: str, hostname: str, user: str | None = None, password: str | None = None)[source]

Bases: object

validate() bool[source]
http_auth_enabled() bool[source]
class apache_beam.testing.load_tests.load_test_metrics_utils.InfluxDBMetricsPublisher(options: InfluxDBMetricsPublisherOptions)[source]

Bases: MetricsPublisher

Publishes collected metrics to InfluxDB database.

publish(results: List[Mapping[str, float | str | int]]) None[source]
class apache_beam.testing.load_tests.load_test_metrics_utils.MeasureTime(namespace)[source]

Bases: DoFn

A distribution metric prepared to be added to a pipeline as a ParDo to measure runtime.

Initializes MeasureTime.

namespace (str): namespace of the metric

start_bundle()[source]
finish_bundle()[source]
process(element)[source]
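The pattern MeasureTime follows can be sketched without Beam: record the time in start_bundle, pass elements through unchanged, and report the elapsed time in finish_bundle. The class below is an illustrative stand-in; the real DoFn reports into a Beam distribution metric rather than a list:

```python
import time

class BundleTimerSketch:
    # Records wall-clock time around a bundle of elements and
    # collects each bundle's runtime into a list of samples
    # (standing in for a Beam distribution metric).
    def __init__(self):
        self.samples = []
        self._start = None

    def start_bundle(self):
        self._start = time.time()

    def process(self, element):
        yield element  # elements pass through unchanged

    def finish_bundle(self):
        self.samples.append(time.time() - self._start)
```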
class apache_beam.testing.load_tests.load_test_metrics_utils.MeasureBytes(namespace, extractor=None)[source]

Bases: DoFn

Metric to measure how many bytes were observed in the pipeline.

Initializes MeasureBytes.

Parameters:
  • namespace (str) – metric namespace

  • extractor – function to extract the elements to be counted

LABEL = 'total_bytes'
process(element, *args)[source]
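The byte counting can be sketched without Beam as follows. ByteCounterSketch and its default extractor are illustrative stand-ins for the real DoFn, which reports into a Beam counter metric:

```python
class ByteCounterSketch:
    # Counts the byte size of each element passing through,
    # accumulating a total (standing in for a counter metric).
    def __init__(self, extractor=None):
        # The extractor turns an element into an iterable of
        # byte strings; by default the element itself is treated
        # as one such string.
        self.extractor = extractor or (lambda element: [element])
        self.total_bytes = 0

    def process(self, element):
        for part in self.extractor(element):
            self.total_bytes += len(part)
        yield element
```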
class apache_beam.testing.load_tests.load_test_metrics_utils.CountMessages(namespace)[source]

Bases: DoFn

LABEL = 'total_messages'
process(element)[source]
class apache_beam.testing.load_tests.load_test_metrics_utils.MeasureLatency(namespace)[source]

Bases: DoFn

A distribution metric which captures the latency based on the timestamps of the processed elements.

Initializes MeasureLatency.

namespace (str): namespace of the metric

LABEL = 'latency'
process(element, timestamp=TimestampParam)[source]
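The latency measurement can be sketched without Beam: each element's event timestamp is compared against the current time. LatencySketch below is an illustrative stand-in; the real DoFn reads the timestamp via Beam's TimestampParam and reports into a distribution metric:

```python
import time

class LatencySketch:
    # Records, for each element, the gap between the current time
    # and the element's event timestamp (standing in for a
    # distribution metric).
    def __init__(self):
        self.latencies = []

    def process(self, element, timestamp):
        self.latencies.append(time.time() - timestamp)
        yield element
```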
class apache_beam.testing.load_tests.load_test_metrics_utils.AssignTimestamps[source]

Bases: DoFn

DoFn that assigns timestamps to elements.

process(element)[source]