Source code for apache_beam.testing.benchmarks.nexmark.nexmark_launcher

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Nexmark launcher.

The Nexmark suite is a series of queries (streaming pipelines) performed
on a simulation of auction events. The launcher orchestrates the generation
and parsing of streaming events and the running of queries.

Model
  - Person: Author of an auction or a bid.
  - Auction: Item under auction.
  - Bid: A bid for an item under auction.

Events
 - Create Person
 - Create Auction
 - Create Bid

Queries
  - Query0: Pass through (send and receive auction events).
  - Query1: Convert each bid price from dollars to euros.
  - Query2: Filter bids for a given auction id.

Usage
  - DirectRunner
      python nexmark_launcher.py \
          --query/q <query number> \
          --project <project id> \
          --input <path to the event data file> \
          --topic_name <Pub/Sub topic> \
          --subscription_name <Pub/Sub subscription> \
          --loglevel=DEBUG (optional) \
          --wait_until_finish_duration <time_in_ms> \
          --streaming

  - DataflowRunner
      python nexmark_launcher.py \
          --query/q <query number> \
          --project <project id> \
          --region <GCE region> \
          --input <path to the event data file> \
          --topic_name <Pub/Sub topic> \
          --subscription_name <Pub/Sub subscription> \
          --loglevel=DEBUG (optional) \
          --wait_until_finish_duration <time_in_ms> \
          --streaming \
          --sdk_location <apache_beam tar.gz> \
          --staging_location=gs://... \
          --temp_location=gs://...

"""

# pytype: skip-file

from __future__ import absolute_import
from __future__ import print_function

import argparse
import logging
import sys
import uuid

from google.cloud import pubsub

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TestOptions
from apache_beam.testing.benchmarks.nexmark.nexmark_util import Command
from apache_beam.testing.benchmarks.nexmark.queries import query0
from apache_beam.testing.benchmarks.nexmark.queries import query1
from apache_beam.testing.benchmarks.nexmark.queries import query2


class NexmarkLauncher(object):
  def __init__(self):
    self.parse_args()
    # Suffix the Pub/Sub resource names with a per-run uuid so that
    # concurrent benchmark runs do not collide.
    self.uuid = str(uuid.uuid4())
    self.topic_name = self.args.topic_name + self.uuid
    self.subscription_name = self.args.subscription_name + self.uuid
    publish_client = pubsub.Client(project=self.project)
    topic = publish_client.topic(self.topic_name)
    if topic.exists():
      logging.info('deleting topic %s', self.topic_name)
      topic.delete()
    logging.info('creating topic %s', self.topic_name)
    topic.create()
    sub = topic.subscription(self.subscription_name)
    if sub.exists():
      logging.info('deleting sub %s', self.subscription_name)
      sub.delete()
    logging.info('creating sub %s', self.subscription_name)
    sub.create()

  def parse_args(self):
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--query',
        '-q',
        type=int,
        action='append',
        required=True,
        choices=[0, 1, 2],
        help='Query to run')

    parser.add_argument(
        '--subscription_name',
        type=str,
        help='Pub/Sub subscription to read from')

    parser.add_argument(
        '--topic_name', type=str, help='Pub/Sub topic to read from')

    parser.add_argument(
        '--loglevel',
        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
        default='INFO',
        help='Set the logging level')

    parser.add_argument(
        '--input',
        type=str,
        required=True,
        help='Path to the data file containing nexmark events.')

    self.args, self.pipeline_args = parser.parse_known_args()
    logging.basicConfig(
        level=getattr(logging, self.args.loglevel, None),
        format='(%(threadName)-10s) %(message)s')

    self.pipeline_options = PipelineOptions(self.pipeline_args)
    logging.debug('args, pipeline_args: %s, %s', self.args, self.pipeline_args)

    # Usage with Dataflow requires a project to be supplied.
    self.project = self.pipeline_options.view_as(GoogleCloudOptions).project
    if self.project is None:
      parser.print_usage()
      print(sys.argv[0] + ': error: argument --project is required')
      sys.exit(1)

    # Pub/Sub is currently available for use only in streaming pipelines.
    self.streaming = self.pipeline_options.view_as(StandardOptions).streaming
    if self.streaming is None:
      parser.print_usage()
      print(sys.argv[0] + ': error: argument --streaming is required')
      sys.exit(1)

    # wait_until_finish ensures that the streaming job is canceled.
    self.wait_until_finish_duration = (
        self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
    if self.wait_until_finish_duration is None:
      parser.print_usage()
      print(sys.argv[0] + ': error: argument --wait_until_finish_duration is required')  # pylint: disable=line-too-long
      sys.exit(1)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module
    # level).
    self.pipeline_options.view_as(SetupOptions).save_main_session = True

  def generate_events(self):
    publish_client = pubsub.Client(project=self.project)
    topic = publish_client.topic(self.topic_name)
    sub = topic.subscription(self.subscription_name)

    logging.info('Generating auction events to topic %s', topic.name)

    if self.args.input.startswith('gs://'):
      from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
      fs = GCSFileSystem(self.pipeline_options)
      with fs.open(self.args.input) as infile:
        for line in infile:
          topic.publish(line)
    else:
      with open(self.args.input) as infile:
        for line in infile:
          topic.publish(line)

    logging.info('Finished event generation.')

    # Read from PubSub into a PCollection.
    if self.args.subscription_name:
      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
          subscription=sub.full_name)
    else:
      raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub(
          topic=topic.full_name)

    return raw_events

  def run_query(self, query, query_args, query_errors):
    try:
      self.parse_args()
      self.pipeline = beam.Pipeline(options=self.pipeline_options)
      raw_events = self.generate_events()
      query.load(raw_events, query_args)
      result = self.pipeline.run()
      job_duration = (
          self.pipeline_options.view_as(TestOptions).wait_until_finish_duration)
      if self.pipeline_options.view_as(StandardOptions).runner == 'DataflowRunner':  # pylint: disable=line-too-long
        result.wait_until_finish(duration=job_duration)
        result.cancel()
      else:
        result.wait_until_finish()
    except Exception as exc:
      if query_errors is not None:
        query_errors.append(str(exc))
      raise

  def cleanup(self):
    publish_client = pubsub.Client(project=self.project)
    topic = publish_client.topic(self.topic_name)
    if topic.exists():
      topic.delete()
    sub = topic.subscription(self.subscription_name)
    if sub.exists():
      sub.delete()

  def run(self):
    queries = {
        0: query0,
        1: query1,
        2: query2,
        # TODO(mariagh): Add more queries.
    }

    # TODO(mariagh): Move to a config file.
    query_args = {2: {'auction_id': 'a1003'}}

    query_errors = []
    for i in self.args.query:
      self.parse_args()
      logging.info('Running query %d', i)

      # The DirectRunner is the default runner, and it needs
      # special handling to cancel streaming jobs.
      launch_from_direct_runner = self.pipeline_options.view_as(
          StandardOptions).runner in [None, 'DirectRunner']

      query_duration = self.pipeline_options.view_as(TestOptions).wait_until_finish_duration  # pylint: disable=line-too-long
      if launch_from_direct_runner:
        command = Command(
            self.run_query, args=[queries[i], query_args.get(i), query_errors])
        command.run(timeout=query_duration // 1000)
      else:
        try:
          self.run_query(queries[i], query_args.get(i), query_errors=None)
        except Exception as exc:
          query_errors.append(str(exc))

    if query_errors:
      logging.error('Query failed with %s', ', '.join(query_errors))
    else:
      logging.info('Queries run: %s', self.args.query)

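# For reference: run() bounds a streaming DirectRunner pipeline by handing
# run_query to nexmark_util.Command and calling command.run(timeout=...),
# so the launcher stops waiting once --wait_until_finish_duration elapses.
# The helper below is a minimal sketch of that run-a-callable-with-a-timeout
# pattern; its name and thread-based approach are illustrative assumptions,
# not part of the launcher's API (see nexmark_util.Command for the real code).
def _run_with_timeout_sketch(fn, args, timeout_secs):
  import threading  # local import to keep the sketch self-contained
  thread = threading.Thread(target=fn, args=args)
  thread.daemon = True  # do not keep the process alive for the pipeline
  thread.start()
  thread.join(timeout_secs)  # returns when fn finishes or the timeout expires
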
if __name__ == '__main__':
  launcher = NexmarkLauncher()
  launcher.run()
  launcher.cleanup()