#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Utilities for the Nexmark suite.
The Nexmark suite is a series of queries (streaming pipelines) performed
on a simulation of auction events. This util includes:
  - A Command class used to terminate the streaming jobs
    launched in nexmark_launcher.py by the DirectRunner.
  - A ParseEventFn DoFn to parse events received from PubSub.
Usage:
To run a process for a certain duration, define in the code:
  command = Command(process_to_terminate, args)
  command.run(timeout=duration)
"""
# pytype: skip-file
import json
import logging
import threading
import apache_beam as beam
from apache_beam.metrics import MetricsFilter
from apache_beam.runners.runner import PipelineResult  # pylint: disable=unused-import
from apache_beam.testing.benchmarks.nexmark.models import auction_bid
from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Timestamp
_LOGGER = logging.getLogger(__name__)
[docs]class Command(object):
  def __init__(self, cmd, args):
    self.cmd = cmd
    self.args = args
[docs]  def run(self, timeout):
    def thread_target():
      logging.debug(
          'Starting thread for %d seconds: %s', timeout, self.cmd.__name__)
      self.cmd(*self.args)
      _LOGGER.info(
          '%d seconds elapsed. Thread (%s) finished.',
          timeout,
          self.cmd.__name__)
    thread = threading.Thread(target=thread_target, name='Thread-timeout')
    thread.daemon = True
    thread.start()
    thread.join(timeout)  
[docs]def setup_coder():
  beam.coders.registry.register_coder(
      nexmark_model.Auction, nexmark_model.AuctionCoder)
  beam.coders.registry.register_coder(
      nexmark_model.Person, nexmark_model.PersonCoder)
  beam.coders.registry.register_coder(nexmark_model.Bid, nexmark_model.BidCoder)
  beam.coders.registry.register_coder(
      auction_bid.AuctionBid, auction_bid.AuctionBidCoder) 
[docs]class ParseEventFn(beam.DoFn):
  """
  Original parser for parsing raw events info into a Python objects.
  Each event line has the following format:
    person: <id starting with 'p'>,name,email,credit_card,city, \
            state,timestamp,extra
    auction: <id starting with 'a'>,item_name, description,initial_bid, \
             reserve_price,timestamp,expires,seller,category,extra
    bid: <auction starting with 'b'>,bidder,price,timestamp,extra
  For example:
    'p12345,maria,maria@maria.com,1234-5678-9012-3456, \
     sunnyvale,CA,1528098831536'
    'a12345,car67,2012 hyundai elantra,15000,20000, \
     1528098831536,20180630,maria,vehicle'
    'b12345,maria,20000,1528098831536'
  """
[docs]  def process(self, elem):
    model_dict = {
        'p': nexmark_model.Person,
        'a': nexmark_model.Auction,
        'b': nexmark_model.Bid,
    }
    row = elem.split(',')
    model = model_dict.get(elem[0])
    if not model:
      raise ValueError('Invalid event: %s.' % row)
    event = model(*row)
    logging.debug('Parsed event: %s', event)
    yield event  
[docs]class ParseJsonEventFn(beam.DoFn):
  """Parses the raw event info into a Python objects.
  Each event line has the following format:
    person:  {id,name,email,credit_card,city, \
              state,timestamp,extra}
    auction: {id,item_name, description,initial_bid, \
              reserve_price,timestamp,expires,seller,category,extra}
    bid:     {auction,bidder,price,timestamp,extra}
  For example:
    {"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",\
     "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\
     "dateTime":1528098831026,\"extra":"WN_HS_bnpVQ\\[["}
    {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\
     "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\
     "expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"}
    {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\
     "extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"}
  """
[docs]  def process(self, elem):
    json_dict = json.loads(elem)
    if type(json_dict[FieldNames.DATE_TIME]) is dict:
      json_dict[FieldNames.DATE_TIME] = json_dict[
          FieldNames.DATE_TIME]['millis']
    if FieldNames.NAME in json_dict:
      yield nexmark_model.Person(
          json_dict[FieldNames.ID],
          json_dict[FieldNames.NAME],
          json_dict[FieldNames.EMAIL_ADDRESS],
          json_dict[FieldNames.CREDIT_CARD],
          json_dict[FieldNames.CITY],
          json_dict[FieldNames.STATE],
          millis_to_timestamp(json_dict[FieldNames.DATE_TIME]),
          json_dict[FieldNames.EXTRA])
    elif FieldNames.ITEM_NAME in json_dict:
      if type(json_dict[FieldNames.EXPIRES]) is dict:
        json_dict[FieldNames.EXPIRES] = json_dict[FieldNames.EXPIRES]['millis']
      yield nexmark_model.Auction(
          json_dict[FieldNames.ID],
          json_dict[FieldNames.ITEM_NAME],
          json_dict[FieldNames.DESCRIPTION],
          json_dict[FieldNames.INITIAL_BID],
          json_dict[FieldNames.RESERVE],
          millis_to_timestamp(json_dict[FieldNames.DATE_TIME]),
          millis_to_timestamp(json_dict[FieldNames.EXPIRES]),
          json_dict[FieldNames.SELLER],
          json_dict[FieldNames.CATEGORY],
          json_dict[FieldNames.EXTRA])
    elif FieldNames.AUCTION in json_dict:
      yield nexmark_model.Bid(
          json_dict[FieldNames.AUCTION],
          json_dict[FieldNames.BIDDER],
          json_dict[FieldNames.PRICE],
          millis_to_timestamp(json_dict[FieldNames.DATE_TIME]),
          json_dict[FieldNames.EXTRA])
    else:
      raise ValueError('Invalid event: %s.' % str(json_dict))  
[docs]class CountAndLog(beam.PTransform):
[docs]  def expand(self, pcoll):
    return (
        pcoll
        | 'window' >> beam.WindowInto(window.GlobalWindows())
        | "Count" >> beam.combiners.Count.Globally()
        | "Log" >> beam.Map(log_count_info))  
[docs]def log_count_info(count):
  logging.info('Query resulted in %d results', count)
  return count 
[docs]def display(elm):
  logging.debug(elm)
  return elm 
[docs]def model_to_json(model):
  return json.dumps(construct_json_dict(model), separators=(',', ':')) 
[docs]def construct_json_dict(model):
  return {k: unnest_to_json(v) for k, v in model.__dict__.items()} 
[docs]def unnest_to_json(cand):
  if isinstance(cand, Timestamp):
    return cand.micros // 1000
  elif isinstance(
      cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)):
    return construct_json_dict(cand)
  else:
    return cand 
[docs]def millis_to_timestamp(millis):
  # type: (int) -> Timestamp
  micro_second = millis * 1000
  return Timestamp(micros=micro_second) 
[docs]def get_counter_metric(result, namespace, name):
  # type: (PipelineResult, str, str) -> int
  """
  get specific counter metric from pipeline result
  Args:
    result: the PipelineResult which metrics are read from
    namespace: a string representing the namespace of wanted metric
    name: a string representing the  name of the wanted metric
  Returns:
    the result of the wanted metric if it exist, else -1
  """
  metrics = result.metrics().query(
      MetricsFilter().with_namespace(namespace).with_name(name))
  counters = metrics['counters']
  if len(counters) > 1:
    raise RuntimeError(
        '%d instead of one metric result matches name: %s in namespace %s' %
        (len(counters), name, namespace))
  return counters[0].result if len(counters) > 0 else -1 
[docs]def get_start_time_metric(result, namespace, name):
  # type: (PipelineResult, str, str) -> int
  """
  get the start time out of all times recorded by the specified distribution
  metric
  Args:
    result: the PipelineResult which metrics are read from
    namespace: a string representing the namespace of wanted metric
    name: a string representing the  name of the wanted metric
  Returns:
    the smallest time in the metric or -1 if it doesn't exist
  """
  distributions = result.metrics().query(
      MetricsFilter().with_namespace(namespace).with_name(
          name))['distributions']
  min_list = list(map(lambda m: m.result.min, distributions))
  return min(min_list) if len(min_list) > 0 else -1 
[docs]def get_end_time_metric(result, namespace, name):
  # type: (PipelineResult, str, str) -> int
  """
  get the end time out of all times recorded by the specified distribution
  metric
  Args:
    result: the PipelineResult which metrics are read from
    namespace: a string representing the namespace of wanted metric
    name: a string representing the  name of the wanted metric
  Returns:
    the largest time in the metric or -1 if it doesn't exist
  """
  distributions = result.metrics().query(
      MetricsFilter().with_namespace(namespace).with_name(
          name))['distributions']
  max_list = list(map(lambda m: m.result.max, distributions))
  return max(max_list) if len(max_list) > 0 else -1