#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Utilities for the Nexmark suite.
The Nexmark suite is a series of queries (streaming pipelines) performed
on a simulation of auction events. This util includes:
- A Command class used to terminate the streaming jobs
launched in nexmark_launcher.py by the DirectRunner.
- A ParseEventFn DoFn to parse events received from PubSub.
Usage:
To run a process for a certain duration, define in the code:
command = Command(process_to_terminate, args)
command.run(timeout=duration)
"""
# pytype: skip-file
from __future__ import absolute_import
from __future__ import print_function
import json
import logging
import threading
import apache_beam as beam
from apache_beam.metrics import MetricsFilter
from apache_beam.runners.runner import PipelineResult # pylint: disable=unused-import
from apache_beam.testing.benchmarks.nexmark.models import auction_bid
from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Timestamp
_LOGGER = logging.getLogger(__name__)
[docs]class Command(object):
def __init__(self, cmd, args):
self.cmd = cmd
self.args = args
[docs] def run(self, timeout):
def thread_target():
logging.debug(
'Starting thread for %d seconds: %s', timeout, self.cmd.__name__)
self.cmd(*self.args)
_LOGGER.info(
'%d seconds elapsed. Thread (%s) finished.',
timeout,
self.cmd.__name__)
thread = threading.Thread(target=thread_target, name='Thread-timeout')
thread.daemon = True
thread.start()
thread.join(timeout)
[docs]def setup_coder():
beam.coders.registry.register_coder(
nexmark_model.Auction, nexmark_model.AuctionCoder)
beam.coders.registry.register_coder(
nexmark_model.Person, nexmark_model.PersonCoder)
beam.coders.registry.register_coder(nexmark_model.Bid, nexmark_model.BidCoder)
beam.coders.registry.register_coder(
auction_bid.AuctionBid, auction_bid.AuctionBidCoder)
[docs]class ParseEventFn(beam.DoFn):
"""
Original parser for parsing raw events info into a Python objects.
Each event line has the following format:
person: <id starting with 'p'>,name,email,credit_card,city, \
state,timestamp,extra
auction: <id starting with 'a'>,item_name, description,initial_bid, \
reserve_price,timestamp,expires,seller,category,extra
bid: <auction starting with 'b'>,bidder,price,timestamp,extra
For example:
'p12345,maria,maria@maria.com,1234-5678-9012-3456, \
sunnyvale,CA,1528098831536'
'a12345,car67,2012 hyundai elantra,15000,20000, \
1528098831536,20180630,maria,vehicle'
'b12345,maria,20000,1528098831536'
"""
[docs] def process(self, elem):
model_dict = {
'p': nexmark_model.Person,
'a': nexmark_model.Auction,
'b': nexmark_model.Bid,
}
row = elem.split(',')
model = model_dict.get(elem[0])
if not model:
raise ValueError('Invalid event: %s.' % row)
event = model(*row)
logging.debug('Parsed event: %s', event)
yield event
[docs]class ParseJsonEventFn(beam.DoFn):
"""Parses the raw event info into a Python objects.
Each event line has the following format:
person: {id,name,email,credit_card,city, \
state,timestamp,extra}
auction: {id,item_name, description,initial_bid, \
reserve_price,timestamp,expires,seller,category,extra}
bid: {auction,bidder,price,timestamp,extra}
For example:
{"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",\
"creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\
"dateTime":1528098831026,\"extra":"WN_HS_bnpVQ\\[["}
{"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\
"initialBid":28873,"reserve":29448,"dateTime":1528098831036,\
"expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"}
{"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\
"extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"}
"""
[docs] def process(self, elem):
json_dict = json.loads(elem)
if type(json_dict[FieldNames.DATE_TIME]) is dict:
json_dict[FieldNames.DATE_TIME] = json_dict[
FieldNames.DATE_TIME]['millis']
if FieldNames.NAME in json_dict:
yield nexmark_model.Person(
json_dict[FieldNames.ID],
json_dict[FieldNames.NAME],
json_dict[FieldNames.EMAIL_ADDRESS],
json_dict[FieldNames.CREDIT_CARD],
json_dict[FieldNames.CITY],
json_dict[FieldNames.STATE],
millis_to_timestamp(json_dict[FieldNames.DATE_TIME]),
json_dict[FieldNames.EXTRA])
elif FieldNames.ITEM_NAME in json_dict:
if type(json_dict[FieldNames.EXPIRES]) is dict:
json_dict[FieldNames.EXPIRES] = json_dict[FieldNames.EXPIRES]['millis']
yield nexmark_model.Auction(
json_dict[FieldNames.ID],
json_dict[FieldNames.ITEM_NAME],
json_dict[FieldNames.DESCRIPTION],
json_dict[FieldNames.INITIAL_BID],
json_dict[FieldNames.RESERVE],
millis_to_timestamp(json_dict[FieldNames.DATE_TIME]),
millis_to_timestamp(json_dict[FieldNames.EXPIRES]),
json_dict[FieldNames.SELLER],
json_dict[FieldNames.CATEGORY],
json_dict[FieldNames.EXTRA])
elif FieldNames.AUCTION in json_dict:
yield nexmark_model.Bid(
json_dict[FieldNames.AUCTION],
json_dict[FieldNames.BIDDER],
json_dict[FieldNames.PRICE],
millis_to_timestamp(json_dict[FieldNames.DATE_TIME]),
json_dict[FieldNames.EXTRA])
else:
raise ValueError('Invalid event: %s.' % str(json_dict))
[docs]class CountAndLog(beam.PTransform):
[docs] def expand(self, pcoll):
return (
pcoll
| 'window' >> beam.WindowInto(window.GlobalWindows())
| "Count" >> beam.combiners.Count.Globally()
| "Log" >> beam.Map(log_count_info))
[docs]def log_count_info(count):
logging.info('Query resulted in %d results', count)
return count
[docs]def display(elm):
logging.debug(elm)
return elm
[docs]def model_to_json(model):
return json.dumps(construct_json_dict(model), separators=(',', ':'))
[docs]def construct_json_dict(model):
return {k: unnest_to_json(v) for k, v in model.__dict__.items()}
[docs]def unnest_to_json(cand):
if isinstance(cand, Timestamp):
return cand.micros // 1000
elif isinstance(
cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)):
return construct_json_dict(cand)
else:
return cand
[docs]def millis_to_timestamp(millis):
# type: (int) -> Timestamp
micro_second = millis * 1000
return Timestamp(micros=micro_second)
[docs]def get_counter_metric(result, namespace, name):
# type: (PipelineResult, str, str) -> int
"""
get specific counter metric from pipeline result
Args:
result: the PipelineResult which metrics are read from
namespace: a string representing the namespace of wanted metric
name: a string representing the name of the wanted metric
Returns:
the result of the wanted metric if it exist, else -1
"""
metrics = result.metrics().query(
MetricsFilter().with_namespace(namespace).with_name(name))
counters = metrics['counters']
if len(counters) > 1:
raise RuntimeError(
'%d instead of one metric result matches name: %s in namespace %s' %
(len(counters), name, namespace))
return counters[0].result if len(counters) > 0 else -1
[docs]def get_start_time_metric(result, namespace, name):
# type: (PipelineResult, str, str) -> int
"""
get the start time out of all times recorded by the specified distribution
metric
Args:
result: the PipelineResult which metrics are read from
namespace: a string representing the namespace of wanted metric
name: a string representing the name of the wanted metric
Returns:
the smallest time in the metric or -1 if it doesn't exist
"""
distributions = result.metrics().query(
MetricsFilter().with_namespace(namespace).with_name(
name))['distributions']
min_list = list(map(lambda m: m.result.min, distributions))
return min(min_list) if len(min_list) > 0 else -1
[docs]def get_end_time_metric(result, namespace, name):
# type: (PipelineResult, str, str) -> int
"""
get the end time out of all times recorded by the specified distribution
metric
Args:
result: the PipelineResult which metrics are read from
namespace: a string representing the namespace of wanted metric
name: a string representing the name of the wanted metric
Returns:
the largest time in the metric or -1 if it doesn't exist
"""
distributions = result.metrics().query(
MetricsFilter().with_namespace(namespace).with_name(
name))['distributions']
max_list = list(map(lambda m: m.result.max, distributions))
return max(max_list) if len(max_list) > 0 else -1