#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Nexmark launcher.

The Nexmark suite is a series of queries (streaming pipelines) performed
on a simulation of auction events. The launcher orchestrates the generation
and parsing of streaming events and the running of queries.

Model
  - Person: Author of an auction or a bid.
  - Auction: Item under auction.
  - Bid: A bid for an item under auction.

Events
  - Create Person
  - Create Auction
  - Create Bid

Queries
  - Query0: Pass through (send and receive auction events).
  - Query1 to Query12: see the corresponding modules in
    apache_beam.testing.benchmarks.nexmark.queries.

Usage
  - DirectRunner
      python nexmark_launcher.py \
          --query/-q <query number> \
          --project <project id> \
          --subscription_name <Pub/Sub subscription> \
          --loglevel=DEBUG (optional) \
          --wait_until_finish_duration <time_in_ms> \
          --streaming

  - DataflowRunner
      python nexmark_launcher.py \
          --query/-q <query number> \
          --project <project id> \
          --region <GCE region> \
          --subscription_name <Pub/Sub subscription> \
          --loglevel=DEBUG (optional) \
          --wait_until_finish_duration <time_in_ms> \
          --streaming \
          --sdk_location <apache_beam tar.gz> \
          --staging_location=gs://... \
          --temp_location=gs://...

  In batch mode (without --streaming), pass --input <events file> instead
  of --subscription_name.
"""
# pytype: skip-file
import argparse
import logging
import time
import uuid
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.runners import PipelineState
from apache_beam.testing.benchmarks.nexmark import nexmark_util
from apache_beam.testing.benchmarks.nexmark.monitor import Monitor
from apache_beam.testing.benchmarks.nexmark.monitor import MonitorSuffix
from apache_beam.testing.benchmarks.nexmark.nexmark_perf import NexmarkPerf
from apache_beam.testing.benchmarks.nexmark.queries import query0
from apache_beam.testing.benchmarks.nexmark.queries import query1
from apache_beam.testing.benchmarks.nexmark.queries import query2
from apache_beam.testing.benchmarks.nexmark.queries import query3
from apache_beam.testing.benchmarks.nexmark.queries import query4
from apache_beam.testing.benchmarks.nexmark.queries import query5
from apache_beam.testing.benchmarks.nexmark.queries import query6
from apache_beam.testing.benchmarks.nexmark.queries import query7
from apache_beam.testing.benchmarks.nexmark.queries import query8
from apache_beam.testing.benchmarks.nexmark.queries import query9
from apache_beam.testing.benchmarks.nexmark.queries import query10
from apache_beam.testing.benchmarks.nexmark.queries import query11
from apache_beam.testing.benchmarks.nexmark.queries import query12
from apache_beam.transforms import window
class NexmarkLauncher(object):
  """Generates/reads Nexmark events and runs the selected queries on them.

  NOTE(review): ``run_query`` calls ``self.log_performance`` and ``monitor``
  calls ``NexmarkLauncher.get_performance``; neither is defined in this class
  as shown here. Confirm they exist (presumably as static methods) before
  relying on the monitoring path.
  """

  # Seconds without progress, after the expected number of events has been
  # seen, before a streaming job is considered finished and cancelled.
  DONE_DELAY = 5 * 60
  # Delay in seconds between performance samples.
  PERF_DELAY = 20
  # Seconds without progress before cancelling a job that appears stuck.
  TERMINATE_DELAY = 1 * 60 * 60
  # Seconds without progress before warning that the job appears stuck.
  WARNING_DELAY = 10 * 60

  def __init__(self):
    """Parse arguments and, if requested, (re)create the Pub/Sub resources.

    With ``--manage_resources`` a random UUID suffix is appended to the topic
    and subscription names. Any existing resources with those names are
    deleted, and fresh ones are then created.
    """
    self.parse_args()
    self.manage_resources = self.args.manage_resources
    self.uuid = str(uuid.uuid4()) if self.manage_resources else ''
    self.topic_name = (
        self.args.topic_name + self.uuid if self.args.topic_name else None)
    self.subscription_name = (
        self.args.subscription_name + self.uuid
        if self.args.subscription_name else None)
    self.pubsub_mode = self.args.pubsub_mode
    if self.manage_resources:
      from google.cloud import pubsub
      # Delete any resources already using these names before creating them.
      self.cleanup()
      publish_client = pubsub.Client(project=self.project)
      topic = publish_client.topic(self.topic_name)
      logging.info('creating topic %s', self.topic_name)
      topic.create()
      sub = topic.subscription(self.subscription_name)
      logging.info('creating sub %s', self.subscription_name)
      sub.create()

  def parse_args(self):
    """Parse launcher flags; unrecognized flags become pipeline options.

    Sets ``self.args``, ``self.pipeline_args``, ``self.pipeline_options``,
    ``self.project`` and ``self.streaming``, and configures logging.

    Raises:
      ValueError: if a flag required by the selected mode is missing.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--query',
        '-q',
        type=int,
        action='append',
        required=True,
        choices=list(range(13)),
        help='Query to run')
    parser.add_argument(
        '--subscription_name',
        type=str,
        help='Pub/Sub subscription to read from')
    parser.add_argument(
        '--topic_name', type=str, help='Pub/Sub topic to read from')
    parser.add_argument(
        '--loglevel',
        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
        default='INFO',
        help='Set logging level to debug')
    parser.add_argument(
        '--input',
        type=str,
        help='Path to the data file containing nexmark events.')
    parser.add_argument(
        '--num_events',
        type=int,
        default=100000,
        help='number of events expected to process')
    parser.add_argument(
        '--manage_resources',
        default=False,
        action='store_true',
        help='If true, manage the creation and cleanup of topics and '
        'subscriptions.')
    parser.add_argument(
        '--pubsub_mode',
        type=str,
        default='SUBSCRIBE_ONLY',
        choices=['PUBLISH_ONLY', 'SUBSCRIBE_ONLY', 'COMBINED'],
        help='Pubsub mode used in the pipeline.')
    self.args, self.pipeline_args = parser.parse_known_args()
    logging.basicConfig(
        level=getattr(logging, self.args.loglevel, None),
        format='(%(threadName)-10s) %(message)s')
    self.pipeline_options = PipelineOptions(self.pipeline_args)
    logging.debug('args, pipeline_args: %s, %s', self.args, self.pipeline_args)
    # Usage with Dataflow requires a project to be supplied.
    self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
    self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
    self.pipeline_options.view_as(TypeOptions).allow_unsafe_triggers = True
    if self.streaming:
      if self.args.subscription_name is None or self.project is None:
        raise ValueError(
            'argument --subscription_name and --project ' +
            'are required when running in streaming mode')
      # Publishing modes read the events to publish from --input
      # (see generate_events).
      if self.args.pubsub_mode != 'SUBSCRIBE_ONLY' and self.args.input is None:
        raise ValueError(
            'argument --input is required when --pubsub_mode publishes events')
    else:
      if self.args.input is None:
        raise ValueError(
            'argument --input is required when running in batch mode')
    # Managed resources are created and deleted by name, so both names are
    # needed; without them the Pub/Sub calls would receive None.
    if self.args.manage_resources and (self.args.topic_name is None or
                                       self.args.subscription_name is None):
      raise ValueError(
          'arguments --topic_name and --subscription_name are required '
          'with --manage_resources')
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    self.pipeline_options.view_as(SetupOptions).save_main_session = True

  def _open_input(self):
    """Open the ``--input`` file for reading as a binary stream.

    ``gs://`` paths are opened through ``GCSFileSystem`` (whose ``open``
    returns a binary stream). Any other path is opened as a local file in
    ``'rb'`` mode, so both branches yield ``bytes`` lines.
    """
    if self.args.input.startswith('gs://'):
      from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
      return GCSFileSystem(self.pipeline_options).open(self.args.input)
    return open(self.args.input, 'rb')

  def generate_events(self):
    """Publish every line of ``--input`` as a message to the Pub/Sub topic."""
    from google.cloud import pubsub
    publish_client = pubsub.Client(project=self.project)
    topic = publish_client.topic(self.topic_name)
    logging.info('Generating auction events to topic %s', topic.name)
    with self._open_input() as infile:
      for line in infile:
        topic.publish(line)
    logging.info('Finished event generation.')

  def read_from_file(self):
    """Read events from ``--input`` and timestamp them by ``date_time``.

    Each text line is decoded with ``nexmark_util.ParseJsonEventFn``.
    """
    return (
        self.pipeline
        | 'reading_from_file' >> beam.io.ReadFromText(self.args.input)
        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn())
        | 'timestamping' >>
        beam.Map(lambda e: window.TimestampedValue(e, e.date_time)))

  def read_from_pubsub(self):
    """Read events from Pub/Sub into a PCollection.

    Reads from the subscription when one is configured, otherwise from the
    topic. Element timestamps come from the ``timestamp`` message attribute.
    """
    if self.subscription_name:
      raw_events = self.pipeline | 'ReadPubSub_sub' >> beam.io.ReadFromPubSub(
          subscription=self.subscription_name,
          with_attributes=True,
          timestamp_attribute='timestamp')
    else:
      raw_events = self.pipeline | 'ReadPubSub_topic' >> beam.io.ReadFromPubSub(
          topic=self.topic_name,
          with_attributes=True,
          timestamp_attribute='timestamp')
    events = (
        raw_events
        | 'pubsub_unwrap' >> beam.Map(lambda m: m.data)
        | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
    return events

  def run_query(self, query, query_args, pipeline_options, query_errors):
    """Build, run and monitor the pipeline for a single query.

    Args:
      query: query module whose ``load(events, query_args, pipeline_options)``
        applies the query to the event PCollection.
      query_args: dict of query parameters forwarded to ``query.load``.
      pipeline_options: options forwarded to ``query.load``. The pipeline
        itself is built from ``self.pipeline_options``.
      query_errors: list to which the message of any raised exception is
        appended before the exception is re-raised.
    """
    try:
      self.pipeline = beam.Pipeline(options=self.pipeline_options)
      nexmark_util.setup_coder()
      event_monitor = Monitor('.events', 'event')
      result_monitor = Monitor('.results', 'result')
      if self.streaming:
        if self.pubsub_mode != 'SUBSCRIBE_ONLY':
          self.generate_events()
        if self.pubsub_mode == 'PUBLISH_ONLY':
          # Only publishing was requested; no pipeline is run.
          return
        events = self.read_from_pubsub()
      else:
        events = self.read_from_file()
      events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
      output = query.load(events, query_args, pipeline_options)
      output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
      result = self.pipeline.run()
      if not self.streaming:
        # Batch jobs run to completion before their metrics are collected.
        result.wait_until_finish()
      perf = self.monitor(result, event_monitor, result_monitor)
      self.log_performance(perf)
    except Exception as exc:
      query_errors.append(str(exc))
      raise

  def monitor(self, job, event_monitor, result_monitor):
    """Poll the job's performance until the job reaches a terminal state.

    For streaming jobs, cancellation is requested once:
      - when ``--num_events`` events have been seen and no progress has been
        observed for ``DONE_DELAY`` seconds, or
      - when no progress has been observed for ``TERMINATE_DELAY`` seconds.
    A warning is logged after ``WARNING_DELAY`` seconds without progress.

    Returns:
      the last performance sample taken, or None if none was taken.
    """
    logging.info('starting to monitor the job')
    last_active_ms = -1
    perf = None
    waiting_for_shutdown = False
    while True:
      now = int(time.time() * 1000)  # current time in ms
      logging.debug('now is %d', now)
      curr_perf = NexmarkLauncher.get_performance(
          job, event_monitor, result_monitor)
      if perf is None or curr_perf.has_progress(perf):
        last_active_ms = now
      # Only decide on cancellation for streaming jobs that have not already
      # been asked to shut down.
      if self.streaming and not waiting_for_shutdown:
        quiet_duration = (now - last_active_ms) // 1000
        if (curr_perf.event_count >= self.args.num_events and
            curr_perf.result_count >= 0 and quiet_duration > self.DONE_DELAY):
          # We think the job is finished: the expected input count has been
          # seen and no new results have been produced for a while.
          logging.info('streaming query appears to have finished executing')
          waiting_for_shutdown = True
        elif quiet_duration > self.TERMINATE_DELAY:
          logging.error(
              'streaming query have been stuck for %d seconds', quiet_duration)
          logging.error('canceling streaming job')
          waiting_for_shutdown = True
        elif quiet_duration > self.WARNING_DELAY:
          logging.warning(
              'streaming query have been stuck for %d seconds', quiet_duration)
        # This branch is entered only while not yet shutting down, so the
        # cancellation request is issued exactly once instead of on every
        # subsequent poll.
        if waiting_for_shutdown:
          job.cancel()
      perf = curr_perf
      if PipelineState.is_terminal(job.state):
        break
      if waiting_for_shutdown:
        logging.info('waiting for shutdown')
      elif last_active_ms == now:
        logging.info('activity seen, new performance data extracted')
      else:
        logging.info('no activity seen')
      time.sleep(self.PERF_DELAY)
    return perf

  def cleanup(self):
    """Delete the managed Pub/Sub subscription and topic, if they exist.

    Does nothing unless ``--manage_resources`` was given. The subscription is
    deleted before the topic it is attached to.
    """
    if not self.manage_resources:
      return
    from google.cloud import pubsub
    publish_client = pubsub.Client(project=self.project)
    topic = publish_client.topic(self.topic_name)
    sub = topic.subscription(self.subscription_name)
    if sub.exists():
      logging.info('deleting sub %s', self.subscription_name)
      sub.delete()
    if topic.exists():
      logging.info('deleting topic %s', self.topic_name)
      topic.delete()

  def run(self):
    """Run every query selected with ``--query``, in the order given."""
    queries = {
        0: query0,
        1: query1,
        2: query2,
        3: query3,
        4: query4,
        5: query5,
        6: query6,
        7: query7,
        8: query8,
        9: query9,
        10: query10,
        11: query11,
        12: query12
    }
    # TODO(mariagh): Move to a config file.
    query_args = {
        'auction_skip': 123,
        'window_size_sec': 10,
        'window_period_sec': 5,
        'fanout': 5,
        'num_max_workers': 5,
        'max_log_events': 100000,
        'occasional_delay_sec': 3,
        'max_auction_waiting_time': 600
    }
    query_errors = []
    for i in self.args.query:
      logging.info('Running query %d', i)
      self.run_query(
          queries[i],
          query_args,
          self.pipeline_options,
          query_errors=query_errors)
    # NOTE(review): run_query re-raises after recording an error, so this
    # point is only reached with an empty query_errors list.
    if query_errors:
      logging.error('Query failed with %s', ', '.join(query_errors))
    else:
      logging.info('Queries run: %s', self.args.query)
if __name__ == '__main__':
  launcher = NexmarkLauncher()
  try:
    launcher.run()
  finally:
    # run() propagates query failures, so clean up in `finally` to avoid
    # leaking the Pub/Sub topic/subscription created by --manage_resources.
    launcher.cleanup()