#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Nexmark launcher.
The Nexmark suite is a series of queries (streaming pipelines) performed
on a simulation of auction events. The launcher orchestrates the generation
and parsing of streaming events and the running of queries.
Model
  - Person: Author of an auction or a bid.
  - Auction: Item under auction.
  - Bid: A bid for an item under auction.
Events
 - Create Person
 - Create Auction
 - Create Bid
Queries
  - Query0: Pass through (send and receive auction events).
Usage
  - DirectRunner
      python nexmark_launcher.py \
          --query/q <query number> \
          --project <project id> \
          --loglevel=DEBUG (optional) \
          --wait_until_finish_duration <time_in_ms> \
          --streaming
  - DataflowRunner
      python nexmark_launcher.py \
          --query/q <query number> \
          --project <project id> \
          --region <GCE region> \
          --loglevel=DEBUG (optional) \
          --wait_until_finish_duration <time_in_ms> \
          --streaming \
          --sdk_location <apache_beam tar.gz> \
          --staging_location=gs://... \
          --temp_location=gs://
"""

# pytype: skip-file

import argparse
import logging
import time
import uuid

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.runners import PipelineState
from apache_beam.testing.benchmarks.nexmark import nexmark_util
from apache_beam.testing.benchmarks.nexmark.monitor import Monitor
from apache_beam.testing.benchmarks.nexmark.monitor import MonitorSuffix
from apache_beam.testing.benchmarks.nexmark.nexmark_perf import NexmarkPerf
from apache_beam.testing.benchmarks.nexmark.queries import query0
from apache_beam.testing.benchmarks.nexmark.queries import query1
from apache_beam.testing.benchmarks.nexmark.queries import query2
from apache_beam.testing.benchmarks.nexmark.queries import query3
from apache_beam.testing.benchmarks.nexmark.queries import query4
from apache_beam.testing.benchmarks.nexmark.queries import query5
from apache_beam.testing.benchmarks.nexmark.queries import query6
from apache_beam.testing.benchmarks.nexmark.queries import query7
from apache_beam.testing.benchmarks.nexmark.queries import query8
from apache_beam.testing.benchmarks.nexmark.queries import query9
from apache_beam.testing.benchmarks.nexmark.queries import query10
from apache_beam.testing.benchmarks.nexmark.queries import query11
from apache_beam.testing.benchmarks.nexmark.queries import query12
from apache_beam.transforms import window


class NexmarkLauncher(object):
  # Seconds to wait, after results are seen and no new activity is observed,
  # before cancelling the job.
  DONE_DELAY = 5 * 60
  # Seconds between performance samples.
  PERF_DELAY = 20
  # Seconds of inactivity before cancelling a job that appears to be stuck.
  TERMINATE_DELAY = 1 * 60 * 60
  # Seconds of inactivity before warning that the pipeline appears stuck.
  WARNING_DELAY = 10 * 60

  def __init__(self):
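    """Sets up Pub/Sub resource names and, with --manage_resources, creates
    the topic and subscription used by the benchmark."""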
    self.parse_args()
    self.manage_resources = self.args.manage_resources
    self.uuid = str(uuid.uuid4()) if self.manage_resources else ''
    self.topic_name = (
        self.args.topic_name + self.uuid if self.args.topic_name else None)
    self.subscription_name = (
        self.args.subscription_name + self.uuid
        if self.args.subscription_name else None)
    self.pubsub_mode = self.args.pubsub_mode
    if self.manage_resources:
      from google.cloud import pubsub
      self.cleanup()
      publish_client = pubsub.Client(project=self.project)
      topic = publish_client.topic(self.topic_name)
      logging.info('creating topic %s', self.topic_name)
      topic.create()
      sub = topic.subscription(self.subscription_name)
      logging.info('creating sub %s', self.subscription_name)
      sub.create()

  def parse_args(self):
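    """Parses command-line and pipeline arguments and configures logging."""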
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--query',
        '-q',
        type=int,
        action='append',
        required=True,
        choices=list(range(13)),
        help='Query to run; may be repeated to run multiple queries.')
    parser.add_argument(
        '--subscription_name',
        type=str,
        help='Pub/Sub subscription to read from')
    parser.add_argument(
        '--topic_name',
        type=str,
        help='Pub/Sub topic to publish to and/or read from')
    parser.add_argument(
        '--loglevel',
        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
        default='INFO',
        help='Set the logging level')
    parser.add_argument(
        '--input',
        type=str,
        help='Path to the data file containing nexmark events.')
    parser.add_argument(
        '--num_events',
        type=int,
        default=100000,
        help='Number of events expected to be processed')
    parser.add_argument(
        '--manage_resources',
        default=False,
        action='store_true',
        help='If set, manage the creation and cleanup of topics and '
        'subscriptions.')
    parser.add_argument(
        '--pubsub_mode',
        type=str,
        default='SUBSCRIBE_ONLY',
        choices=['PUBLISH_ONLY', 'SUBSCRIBE_ONLY', 'COMBINED'],
        help='Pubsub mode used in the pipeline.')
    self.args, self.pipeline_args = parser.parse_known_args()
    logging.basicConfig(
        level=getattr(logging, self.args.loglevel, None),
        format='(%(threadName)-10s) %(message)s')
    self.pipeline_options = PipelineOptions(self.pipeline_args)
    logging.debug('args, pipeline_args: %s, %s', self.args, self.pipeline_args)
    # Usage with Dataflow requires a project to be supplied.
    self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
    self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
    self.pipeline_options.view_as(TypeOptions).allow_unsafe_triggers = True
    if self.streaming:
      if self.args.subscription_name is None or self.project is None:
        raise ValueError(
            'arguments --subscription_name and --project '
            'are required when running in streaming mode')
    else:
      if self.args.input is None:
        raise ValueError(
            'argument --input is required when running in batch mode')
    # We use the save_main_session option because one or more DoFns in this
    # workflow rely on global context (e.g., a module imported at module
    # level).
    self.pipeline_options.view_as(SetupOptions).save_main_session = True

  def generate_events(self):
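    """Publishes events from the --input file to the Pub/Sub topic."""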
    from google.cloud import pubsub
    publish_client = pubsub.Client(project=self.project)
    topic = publish_client.topic(self.topic_name)
    logging.info('Generating auction events to topic %s', topic.name)
    if self.args.input.startswith('gs://'):
      from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
      fs = GCSFileSystem(self.pipeline_options)
      with fs.open(self.args.input) as infile:
        for line in infile:
          topic.publish(line)
    else:
      with open(self.args.input) as infile:
        for line in infile:
          topic.publish(line)
    logging.info('Finished event generation.') 

  def read_from_file(self):
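    """Reads, parses and timestamps events from the --input file."""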
    return (
        self.pipeline
        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
        | 'timestamping' >>
        beam.Map(lambda e: window.TimestampedValue(e, e.date_time))) 

  def read_from_pubsub(self):
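    """Reads and parses events from the Pub/Sub subscription or topic."""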
    # Read from PubSub into a PCollection.
    if self.subscription_name:
      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
          subscription=self.subscription_name,
          with_attributes=True,
          timestamp_attribute='timestamp')
    else:
      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
          topic=self.topic_name,
          with_attributes=True,
          timestamp_attribute='timestamp')
    events = (
        raw_events
        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
    return events 

  def run_query(self, query, query_args, pipeline_options, query_errors):
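    """Builds, runs and monitors the pipeline for a single query."""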
    try:
      self.pipeline = beam.Pipeline(options=self.pipeline_options)
      nexmark_util.setup_coder()
      event_monitor = Monitor('.events', 'event')
      result_monitor = Monitor('.results', 'result')
      if self.streaming:
        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
          self.generate_events()
        if self.pubsub_mode == 'PUBLISH_ONLY':
          return
        events = self.read_from_pubsub()
      else:
        events = self.read_from_file()
      events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
      output = query.load(events, query_args, pipeline_options)
      output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
      result = self.pipeline.run()
      if not self.streaming:
        result.wait_until_finish()
      perf = self.monitor(result, event_monitor, result_monitor)
      self.log_performance(perf)
    except Exception as exc:
      query_errors.append(str(exc))
      raise 
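
  # A minimal reconstruction sketch: run_query() and monitor() call
  # log_performance() and get_performance(), but their definitions are
  # missing from this listing. The versions below assume the
  # nexmark_util.get_counter_metric helper and the namespace/name_prefix
  # attributes on Monitor, and restore only the event_count/result_count
  # fields that monitor() actually reads.
  @staticmethod
  def get_performance(result, event_monitor, result_monitor):
    """Builds a NexmarkPerf snapshot from the monitors' counter metrics."""
    perf = NexmarkPerf()
    # Number of events seen by the event monitor so far.
    perf.event_count = nexmark_util.get_counter_metric(
        result,
        event_monitor.namespace,
        event_monitor.name_prefix + MonitorSuffix.ELEMENT_COUNTER)
    # Number of results seen by the result monitor so far.
    perf.result_count = nexmark_util.get_counter_metric(
        result,
        result_monitor.namespace,
        result_monitor.name_prefix + MonitorSuffix.ELEMENT_COUNTER)
    return perf

  @staticmethod
  def log_performance(perf):
    """Logs the input/output counts gathered while monitoring the job."""
    logging.info(
        'input event count: %d, output result count: %d',
        perf.event_count,
        perf.result_count)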

  def monitor(self, job, event_monitor, result_monitor):
    """Monitors the performance and progress of the running job, cancelling
    it if it appears stuck or seems to have finished running.

    Returns:
      The final performance snapshot, if one was measured.
    """
    logging.info('starting to monitor the job')
    last_active_ms = -1
    perf = None
    cancel_job = False
    waiting_for_shutdown = False
    while True:
      now = int(time.time() * 1000)  # current time in ms
      logging.debug('now is %d', now)
      curr_perf = NexmarkLauncher.get_performance(
          job, event_monitor, result_monitor)
      if perf is None or curr_perf.has_progress(perf):
        last_active_ms = now
      # Only consider cancelling the job if it is a streaming job and has not
      # already been signalled to shut down.
      if self.streaming and not waiting_for_shutdown:
        quiet_duration = (now - last_active_ms) // 1000
        if (curr_perf.event_count >= self.args.num_events and
            curr_perf.result_count >= 0 and quiet_duration > self.DONE_DELAY):
          # The job is considered finished once the expected number of input
          # events has been seen and no new results have appeared for a while.
          logging.info('streaming query appears to have finished executing')
          waiting_for_shutdown = True
          cancel_job = True
        elif quiet_duration > self.TERMINATE_DELAY:
          logging.error(
              'streaming query has been stuck for %d seconds', quiet_duration)
          logging.error('canceling streaming job')
          waiting_for_shutdown = True
          cancel_job = True
        elif quiet_duration > self.WARNING_DELAY:
          logging.warning(
              'streaming query has been stuck for %d seconds', quiet_duration)
        if cancel_job:
          job.cancel()
      perf = curr_perf
      stopped = PipelineState.is_terminal(job.state)
      if stopped:
        break
      if not waiting_for_shutdown:
        if last_active_ms == now:
          logging.info('activity seen, new performance data extracted')
        else:
          logging.info('no activity seen')
      else:
        logging.info('waiting for shutdown')
      time.sleep(self.PERF_DELAY)
    return perf 

  def cleanup(self):
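    """Deletes the Pub/Sub topic and subscription created for this run."""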
    if self.manage_resources:
      from google.cloud import pubsub
      publish_client = pubsub.Client(project=self.project)
      topic = publish_client.topic(self.topic_name)
      if topic.exists():
        logging.info('deleting topic %s', self.topic_name)
        topic.delete()
      sub = topic.subscription(self.subscription_name)
      if sub.exists():
        logging.info('deleting sub %s', self.subscription_name)
        sub.delete() 

  def run(self):
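    """Runs each requested query and reports any errors."""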
    queries = {
        0: query0,
        1: query1,
        2: query2,
        3: query3,
        4: query4,
        5: query5,
        6: query6,
        7: query7,
        8: query8,
        9: query9,
        10: query10,
        11: query11,
        12: query12
    }
    # TODO(mariagh): Move to a config file.
    query_args = {
        'auction_skip': 123,
        'window_size_sec': 10,
        'window_period_sec': 5,
        'fanout': 5,
        'num_max_workers': 5,
        'max_log_events': 100000,
        'occasional_delay_sec': 3,
        'max_auction_waiting_time': 600
    }
    query_errors = []
    for i in self.args.query:
      logging.info('Running query %d', i)
      self.run_query(
          queries[i],
          query_args,
          self.pipeline_options,
          query_errors=query_errors)
    if query_errors:
      logging.error('Query failed with %s', ', '.join(query_errors))
    else:
      logging.info('Queries run: %s', self.args.query)


if __name__ == '__main__':
  launcher = NexmarkLauncher()
  launcher.run()
  launcher.cleanup()