Source code for apache_beam.testing.benchmarks.nexmark.nexmark_util

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Utilities for the Nexmark suite.

The Nexmark suite is a series of queries (streaming pipelines) performed
on a simulation of auction events. This util includes:

  - A Command class used to terminate the streaming jobs
    launched in nexmark_launcher.py by the DirectRunner.
  - A ParseEventFn DoFn to parse events received from PubSub.

Usage:

To run a process for a certain duration, define in the code:
  command = Command(process_to_terminate, args)
  command.run(timeout=duration)

"""

# pytype: skip-file

import json
import logging
import threading

import apache_beam as beam
from apache_beam.metrics import MetricsFilter
from apache_beam.runners.runner import PipelineResult  # pylint: disable=unused-import
from apache_beam.testing.benchmarks.nexmark.models import auction_bid
from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Timestamp

_LOGGER = logging.getLogger(__name__)


[docs]class Command(object): def __init__(self, cmd, args): self.cmd = cmd self.args = args
[docs] def run(self, timeout): def thread_target(): logging.debug( 'Starting thread for %d seconds: %s', timeout, self.cmd.__name__) self.cmd(*self.args) _LOGGER.info( '%d seconds elapsed. Thread (%s) finished.', timeout, self.cmd.__name__) thread = threading.Thread(target=thread_target, name='Thread-timeout') thread.daemon = True thread.start() thread.join(timeout)
[docs]def setup_coder(): beam.coders.registry.register_coder( nexmark_model.Auction, nexmark_model.AuctionCoder) beam.coders.registry.register_coder( nexmark_model.Person, nexmark_model.PersonCoder) beam.coders.registry.register_coder(nexmark_model.Bid, nexmark_model.BidCoder) beam.coders.registry.register_coder( auction_bid.AuctionBid, auction_bid.AuctionBidCoder)
[docs]class ParseEventFn(beam.DoFn): """ Original parser for parsing raw events info into a Python objects. Each event line has the following format: person: <id starting with 'p'>,name,email,credit_card,city, \ state,timestamp,extra auction: <id starting with 'a'>,item_name, description,initial_bid, \ reserve_price,timestamp,expires,seller,category,extra bid: <auction starting with 'b'>,bidder,price,timestamp,extra For example: 'p12345,maria,maria@maria.com,1234-5678-9012-3456, \ sunnyvale,CA,1528098831536' 'a12345,car67,2012 hyundai elantra,15000,20000, \ 1528098831536,20180630,maria,vehicle' 'b12345,maria,20000,1528098831536' """
[docs] def process(self, elem): model_dict = { 'p': nexmark_model.Person, 'a': nexmark_model.Auction, 'b': nexmark_model.Bid, } row = elem.split(',') model = model_dict.get(elem[0]) if not model: raise ValueError('Invalid event: %s.' % row) event = model(*row) logging.debug('Parsed event: %s', event) yield event
[docs]class ParseJsonEventFn(beam.DoFn): """Parses the raw event info into a Python objects. Each event line has the following format: person: {id,name,email,credit_card,city, \ state,timestamp,extra} auction: {id,item_name, description,initial_bid, \ reserve_price,timestamp,expires,seller,category,extra} bid: {auction,bidder,price,timestamp,extra} For example: {"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",\ "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\ "dateTime":1528098831026,\"extra":"WN_HS_bnpVQ\\[["} {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\ "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\ "expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"} {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\ "extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"} """
[docs] def process(self, elem): json_dict = json.loads(elem) if type(json_dict[FieldNames.DATE_TIME]) is dict: json_dict[FieldNames.DATE_TIME] = json_dict[ FieldNames.DATE_TIME]['millis'] if FieldNames.NAME in json_dict: yield nexmark_model.Person( json_dict[FieldNames.ID], json_dict[FieldNames.NAME], json_dict[FieldNames.EMAIL_ADDRESS], json_dict[FieldNames.CREDIT_CARD], json_dict[FieldNames.CITY], json_dict[FieldNames.STATE], millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), json_dict[FieldNames.EXTRA]) elif FieldNames.ITEM_NAME in json_dict: if type(json_dict[FieldNames.EXPIRES]) is dict: json_dict[FieldNames.EXPIRES] = json_dict[FieldNames.EXPIRES]['millis'] yield nexmark_model.Auction( json_dict[FieldNames.ID], json_dict[FieldNames.ITEM_NAME], json_dict[FieldNames.DESCRIPTION], json_dict[FieldNames.INITIAL_BID], json_dict[FieldNames.RESERVE], millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), millis_to_timestamp(json_dict[FieldNames.EXPIRES]), json_dict[FieldNames.SELLER], json_dict[FieldNames.CATEGORY], json_dict[FieldNames.EXTRA]) elif FieldNames.AUCTION in json_dict: yield nexmark_model.Bid( json_dict[FieldNames.AUCTION], json_dict[FieldNames.BIDDER], json_dict[FieldNames.PRICE], millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), json_dict[FieldNames.EXTRA]) else: raise ValueError('Invalid event: %s.' % str(json_dict))
[docs]class CountAndLog(beam.PTransform):
[docs] def expand(self, pcoll): return ( pcoll | 'window' >> beam.WindowInto(window.GlobalWindows()) | "Count" >> beam.combiners.Count.Globally() | "Log" >> beam.Map(log_count_info))
[docs]def log_count_info(count): logging.info('Query resulted in %d results', count) return count
[docs]def display(elm): logging.debug(elm) return elm
[docs]def model_to_json(model): return json.dumps(construct_json_dict(model), separators=(',', ':'))
[docs]def construct_json_dict(model): return {k: unnest_to_json(v) for k, v in model.__dict__.items()}
[docs]def unnest_to_json(cand): if isinstance(cand, Timestamp): return cand.micros // 1000 elif isinstance( cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)): return construct_json_dict(cand) else: return cand
[docs]def millis_to_timestamp(millis): # type: (int) -> Timestamp micro_second = millis * 1000 return Timestamp(micros=micro_second)
[docs]def get_counter_metric(result, namespace, name): # type: (PipelineResult, str, str) -> int """ get specific counter metric from pipeline result Args: result: the PipelineResult which metrics are read from namespace: a string representing the namespace of wanted metric name: a string representing the name of the wanted metric Returns: the result of the wanted metric if it exist, else -1 """ metrics = result.metrics().query( MetricsFilter().with_namespace(namespace).with_name(name)) counters = metrics['counters'] if len(counters) > 1: raise RuntimeError( '%d instead of one metric result matches name: %s in namespace %s' % (len(counters), name, namespace)) return counters[0].result if len(counters) > 0 else -1
[docs]def get_start_time_metric(result, namespace, name): # type: (PipelineResult, str, str) -> int """ get the start time out of all times recorded by the specified distribution metric Args: result: the PipelineResult which metrics are read from namespace: a string representing the namespace of wanted metric name: a string representing the name of the wanted metric Returns: the smallest time in the metric or -1 if it doesn't exist """ distributions = result.metrics().query( MetricsFilter().with_namespace(namespace).with_name( name))['distributions'] min_list = list(map(lambda m: m.result.min, distributions)) return min(min_list) if len(min_list) > 0 else -1
[docs]def get_end_time_metric(result, namespace, name): # type: (PipelineResult, str, str) -> int """ get the end time out of all times recorded by the specified distribution metric Args: result: the PipelineResult which metrics are read from namespace: a string representing the namespace of wanted metric name: a string representing the name of the wanted metric Returns: the largest time in the metric or -1 if it doesn't exist """ distributions = result.metrics().query( MetricsFilter().with_namespace(namespace).with_name( name))['distributions'] max_list = list(map(lambda m: m.result.max, distributions)) return max(max_list) if len(max_list) > 0 else -1