Source code for apache_beam.runners.dataflow.dataflow_exercise_metrics_pipeline

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A word-counting workflow."""

from __future__ import absolute_import

import time

from hamcrest.library.number.ordering_comparison import greater_than

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.testing.metric_result_matchers import DistributionMatcher
from apache_beam.testing.metric_result_matchers import MetricResultMatcher

SLEEP_TIME_SECS = 1
INPUT = [0, 0, 0, 100]
METRIC_NAMESPACE = ('apache_beam.runners.dataflow.'
                    'dataflow_exercise_metrics_pipeline.UserMetricsDoFn')


[docs]def common_metric_matchers(): """MetricResult matchers common to all tests.""" # TODO(ajamato): Matcher for the 'metrics' step's ElementCount. # TODO(ajamato): Matcher for the 'metrics' step's MeanByteCount. # TODO(ajamato): Matcher for the start and finish exec times. # TODO(ajamato): Matcher for a gauge metric once implemented in dataflow. matchers = [ # User Counter Metrics. MetricResultMatcher( name='total_values', namespace=METRIC_NAMESPACE, step='metrics', attempted=sum(INPUT), committed=sum(INPUT) ), MetricResultMatcher( name='ExecutionTime_StartBundle', step='metrics', attempted=greater_than(0), committed=greater_than(0) ), MetricResultMatcher( name='ExecutionTime_ProcessElement', step='metrics', attempted=greater_than(0), committed=greater_than(0) ), MetricResultMatcher( name='ExecutionTime_FinishBundle', step='metrics', attempted=greater_than(0), committed=greater_than(0) ) ] pcoll_names = [ 'GroupByKey/Reify-out0', 'GroupByKey/Read-out0', 'map_to_common_key-out0', 'GroupByKey/GroupByWindow-out0', 'GroupByKey/Read-out0', 'GroupByKey/Reify-out0' ] for name in pcoll_names: matchers.extend([ MetricResultMatcher( name='ElementCount', labels={ 'output_user_name': name, 'original_name': '%s-ElementCount' % name }, attempted=greater_than(0), committed=greater_than(0) ), MetricResultMatcher( name='MeanByteCount', labels={ 'output_user_name': name, 'original_name': '%s-MeanByteCount' % name }, attempted=greater_than(0), committed=greater_than(0) ), ]) return matchers
[docs]def fn_api_metric_matchers(): """MetricResult matchers with adjusted step names for the FN API DF test.""" matchers = common_metric_matchers() return matchers
[docs]def legacy_metric_matchers(): """MetricResult matchers with adjusted step names for the legacy DF test.""" # TODO(ajamato): Move these to the common_metric_matchers once implemented # in the FN API. matchers = common_metric_matchers() matchers.extend([ # User distribution metric, legacy DF only. MetricResultMatcher( name='distribution_values', namespace=METRIC_NAMESPACE, step='metrics', attempted=DistributionMatcher( sum_value=sum(INPUT), count_value=len(INPUT), min_value=min(INPUT), max_value=max(INPUT) ), committed=DistributionMatcher( sum_value=sum(INPUT), count_value=len(INPUT), min_value=min(INPUT), max_value=max(INPUT) ), ), # Element count and MeanByteCount for a User ParDo. MetricResultMatcher( name='ElementCount', labels={ 'output_user_name': 'metrics-out0', 'original_name': 'metrics-out0-ElementCount' }, attempted=greater_than(0), committed=greater_than(0) ), MetricResultMatcher( name='MeanByteCount', labels={ 'output_user_name': 'metrics-out0', 'original_name': 'metrics-out0-MeanByteCount' }, attempted=greater_than(0), committed=greater_than(0) ), ]) return matchers
[docs]class UserMetricsDoFn(beam.DoFn): """Parse each line of input text into words.""" def __init__(self): self.total_metric = Metrics.counter(self.__class__, 'total_values') self.dist_metric = Metrics.distribution( self.__class__, 'distribution_values') # TODO(ajamato): Add a verifier for gauge once it is supported by the SDKs # and runners. self.latest_metric = Metrics.gauge(self.__class__, 'latest_value')
[docs] def start_bundle(self): time.sleep(SLEEP_TIME_SECS)
[docs] def process(self, element): """Returns the processed element and increments the metrics.""" elem_int = int(element) self.total_metric.inc(elem_int) self.dist_metric.update(elem_int) self.latest_metric.set(elem_int) time.sleep(SLEEP_TIME_SECS) return [elem_int]
[docs] def finish_bundle(self): time.sleep(SLEEP_TIME_SECS)
[docs]def apply_and_run(pipeline): """Given an initialized Pipeline applies transforms and runs it.""" _ = (pipeline | beam.Create(INPUT) | 'metrics' >> (beam.ParDo(UserMetricsDoFn())) | 'map_to_common_key' >> beam.Map(lambda x: ('key', x)) | beam.GroupByKey() | 'm_out' >> beam.FlatMap(lambda x: [ 1, 2, 3, 4, 5, beam.pvalue.TaggedOutput('once', x), beam.pvalue.TaggedOutput('twice', x), beam.pvalue.TaggedOutput('twice', x)]) ) result = pipeline.run() result.wait_until_finish() return result