#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A set of utilities to write pipelines for performance tests.
This module offers a way to create pipelines using synthetic sources and steps.
Exact shape of the pipeline and the behaviour of sources and steps can be
controlled through arguments. Please see function 'parse_args()' for more
details about the arguments.
Shape of the pipeline is primariy controlled through two arguments. Argument
'steps' can be used to define a list of steps as a JSON string. Argument
'barrier' describes how these steps are separated from each other. Argument
'barrier' can be use to build a pipeline as a a series of steps or a tree of
steps with a fanin or a fanout of size 2.
Other arguments describe what gets generated by synthetic sources that produce
data for the pipeline.
"""
from __future__ import absolute_import
from __future__ import division
import argparse
import json
import logging
import math
import time
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.io import iobase
from apache_beam.io import range_trackers
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.testing.test_pipeline import TestPipeline
try:
  import numpy as np
except ImportError:
  np = None


def parse_byte_size(s):
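  """Parses a human-readable byte size such as '2K' or '10M' into an int.

  For example, parse_byte_size('2K') == 2048 and parse_byte_size('1M') ==
  1048576; a plain number passes through: parse_byte_size('100') == 100.
  """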
  suffixes = 'BKMGTP'
  if s[-1] in suffixes:
    return int(float(s[:-1]) * 1024**suffixes.index(s[-1]))
  return int(s) 


def div_round_up(a, b):
  """Return ceil(a/b)."""
  return int(math.ceil(float(a) / b)) 


def rotate_key(element):
  """Returns a new key-value pair of the same size but with a different key."""
  (key, value) = element
  # Slicing (rather than indexing) keeps this correct for bytes keys on
  # Python 3, where key[-1] would be an int.
  return key[-1:] + key[:-1], value


class SyntheticStep(beam.DoFn):
  """A DoFn whose behavior can be controlled through prespecified parameters."""

  def __init__(self, per_element_delay_sec=0, per_bundle_delay_sec=0,
               output_records_per_input_record=1, output_filter_ratio=0):
    if per_element_delay_sec and per_element_delay_sec < 1e-3:
      raise ValueError('Per element sleep time must be at least 1e-3 seconds. '
                       'Received: %r' % per_element_delay_sec)
    self._per_element_delay_sec = per_element_delay_sec
    self._per_bundle_delay_sec = per_bundle_delay_sec
    self._output_records_per_input_record = output_records_per_input_record
    self._output_filter_ratio = output_filter_ratio

  def start_bundle(self):
    self._start_time = time.time() 

  def finish_bundle(self):
    # The target is for the enclosing stage to take as close as possible to
    # the given number of seconds, so we only sleep enough to make up for
    # overheads not incurred elsewhere.
    to_sleep = self._per_bundle_delay_sec - (time.time() - self._start_time)
    # Ignoring sub-millisecond sleep times.
    if to_sleep >= 1e-3:
      time.sleep(to_sleep) 

  def process(self, element):
    if self._per_element_delay_sec >= 1e-3:
      time.sleep(self._per_element_delay_sec)
    filter_element = False
    if self._output_filter_ratio > 0:
      if np.random.random() < self._output_filter_ratio:
        filter_element = True
    if not filter_element:
      for _ in range(self._output_records_per_input_record):
        yield element  
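
# For example, a step constructed as SyntheticStep(per_bundle_delay_sec=2,
# output_records_per_input_record=2) emits each input element twice and, in
# finish_bundle(), sleeps for whatever remains of the 2-second bundle target.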


class SyntheticSource(iobase.BoundedSource):
  """A custom source of a specified size."""

  def __init__(self, input_spec):
    """Initiates a synthetic source.
    Args:
      input_spec: Input specification of the source. See corresponding option in
                  function 'parse_args()' below for more details.
    Raises:
      ValueError: if input parameters are invalid.
    """
    def maybe_parse_byte_size(s):
      return parse_byte_size(s) if isinstance(s, str) else int(s)
    self._num_records = input_spec['numRecords']
    self._key_size = maybe_parse_byte_size(input_spec.get('keySizeBytes', 1))
    self._value_size = maybe_parse_byte_size(
        input_spec.get('valueSizeBytes', 1))
    self._total_size = self.element_size * self._num_records
    self._initial_splitting = (
        input_spec['bundleSizeDistribution']['type']
        if 'bundleSizeDistribution' in input_spec else 'const')
    if self._initial_splitting != 'const' and self._initial_splitting != 'zipf':
      raise ValueError(
          'Only const and zipf distributions are supported for determining '
          'sizes of bundles produced by initial splitting. Received: %s' %
          self._initial_splitting)
    self._initial_splitting_num_bundles = (
        input_spec['forceNumInitialBundles']
        if 'forceNumInitialBundles' in input_spec else 0)
    if self._initial_splitting == 'zipf':
      self._initial_splitting_distribution_parameter = (
          input_spec['bundleSizeDistribution']['param'])
      if self._initial_splitting_distribution_parameter < 1:
        raise ValueError(
            'Parameter for a Zipf distribution must be larger than 1. '
            'Received %r.' % self._initial_splitting_distribution_parameter)
    else:
      self._initial_splitting_distribution_parameter = 0
    self._dynamic_splitting = (
        'none' if (
            'splitPointFrequencyRecords' in input_spec
            and input_spec['splitPointFrequencyRecords'] == 0)
        else 'perfect')
    if 'delayDistribution' in input_spec:
      if input_spec['delayDistribution']['type'] != 'const':
        raise ValueError('SyntheticSource currently only supports delay '
                         'distributions of type \'const\'. Received %s.' %
                         input_spec['delayDistribution']['type'])
      self._sleep_per_input_record_sec = (
          float(input_spec['delayDistribution']['const']) / 1000)
      if (self._sleep_per_input_record_sec and
          self._sleep_per_input_record_sec < 1e-3):
        raise ValueError('Sleep time per input record must be at least 1e-3 '
                         'seconds. Received: %r' %
                         self._sleep_per_input_record_sec)
    else:
      self._sleep_per_input_record_sec = 0

  @property
  def element_size(self):
    return self._key_size + self._value_size

  def estimate_size(self):
    return self._total_size 

  def split(self, desired_bundle_size, start_position=0, stop_position=None):
    # Performs initial splitting of SyntheticSource.
    #
    # Exact sizes and distribution of initial splits generated here depends on
    # the input specification of the SyntheticSource.
    if stop_position is None:
      stop_position = self._num_records
    if self._initial_splitting == 'zipf':
      desired_num_bundles = self._initial_splitting_num_bundles or math.ceil(
          float(self.estimate_size()) / desired_bundle_size)
      samples = np.random.zipf(self._initial_splitting_distribution_parameter,
                               desired_num_bundles)
      total = sum(samples)
      relative_bundle_sizes = [(float(sample) / total) for sample in samples]
      bundle_ranges = []
      start = start_position
      index = 0
      while start < stop_position:
        if index == desired_num_bundles - 1:
          bundle_ranges.append((start, stop_position))
          break
        stop = start + int(self._num_records * relative_bundle_sizes[index])
        bundle_ranges.append((start, stop))
        start = stop
        index += 1
    else:
      if self._initial_splitting_num_bundles:
        # Integer division so the result can be used as a range() step below.
        bundle_size_in_elements = max(
            1, self._num_records // self._initial_splitting_num_bundles)
      else:
        bundle_size_in_elements = max(
            div_round_up(desired_bundle_size, self.element_size),
            int(math.floor(math.sqrt(self._num_records))))
      bundle_ranges = []
      for start in range(start_position, stop_position,
                         bundle_size_in_elements):
        stop = min(start + bundle_size_in_elements, stop_position)
        bundle_ranges.append((start, stop))
    for start, stop in bundle_ranges:
      yield iobase.SourceBundle(stop - start, self, start, stop) 

  def get_range_tracker(self, start_position, stop_position):
    if start_position is None:
      start_position = 0
    if stop_position is None:
      stop_position = self._num_records
    tracker = range_trackers.OffsetRangeTracker(start_position, stop_position)
    if self._dynamic_splitting == 'none':
      tracker = range_trackers.UnsplittableRangeTracker(tracker)
    return tracker 

  def read(self, range_tracker):
    index = range_tracker.start_position()
    while range_tracker.try_claim(index):
      r = np.random.RandomState(index)
      time.sleep(self._sleep_per_input_record_sec)
      yield r.bytes(self._key_size), r.bytes(self._value_size)
      index += 1 

  def default_output_coder(self):
    return beam.coders.TupleCoder(
        [beam.coders.BytesCoder(), beam.coders.BytesCoder()])  


class ShuffleBarrier(beam.PTransform):
  """A PTransform that separates steps using a GroupByKey (shuffle)."""

  def expand(self, pc):
    return (pc
            | beam.Map(rotate_key)
            | beam.GroupByKey()
            | 'Ungroup' >> beam.FlatMap(
                lambda elm: [(elm[0], v) for v in elm[1]]))  
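

# 'SideInputBarrier' is referenced by run() below but is missing from this
# listing. The following is a minimal sketch, not necessarily the original
# implementation: it rotates keys and routes the collection through an
# (empty) side input, which separates the surrounding steps.
class SideInputBarrier(beam.PTransform):

  def expand(self, pc):
    return (pc
            | beam.Map(rotate_key)
            | beam.Map(lambda elem, ignored: elem,
                       beam.pvalue.AsIter(
                           pc | beam.FlatMap(lambda elem: ()))))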


def merge_using_gbk(name, pc1, pc2):
  """Merges two given PCollections using a CoGroupByKey."""
  pc1_with_key = pc1 | (name + 'AttachKey1') >> beam.Map(lambda x: (x, x))
  pc2_with_key = pc2 | (name + 'AttachKey2') >> beam.Map(lambda x: (x, x))
  grouped = (
      {'pc1': pc1_with_key, 'pc2': pc2_with_key} |
      (name + 'Group') >> beam.CoGroupByKey())
  return (grouped |
          (name + 'DeDup') >> beam.Map(lambda elm: elm[0]))  # Ignoring values 
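

# 'merge_using_side_input' is referenced by run() below but is missing from
# this listing. A minimal sketch, not necessarily the original implementation:
# pc2 is materialized as a side input that is read but otherwise ignored, so
# the merge acts purely as a barrier.
def merge_using_side_input(name, pc1, pc2):
  """Merges two given PCollections using a side input."""

  def join_fn(value, _):
    # The side input must be materialized, but its contents are ignored.
    return value

  return pc1 | name >> beam.Map(join_fn, beam.pvalue.AsIter(pc2))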


def expand_using_gbk(name, pc):
  """Expands a given PCollection into two copies using GroupByKey."""
  ret = []
  ret.append((pc | ('%s.a' % name) >> ShuffleBarrier()))
  ret.append((pc | ('%s.b' % name) >> ShuffleBarrier()))
  return ret 


def expand_using_second_output(name, pc):
  """Expands a given PCollection into two copies using side outputs."""
  class ExpandFn(beam.DoFn):
    def process(self, element):
      yield beam.pvalue.TaggedOutput('second_out', element)
      yield element
  pc1, pc2 = (pc | name >> beam.ParDo(
      ExpandFn()).with_outputs('second_out', main='main_out'))
  return [pc1, pc2] 


def _parse_steps(json_str):
  """Converts the JSON step description into Python objects.
  See property 'steps' for more details about the JSON step description.
  Args:
    json_str: a JSON string that describes the steps.
  Returns:
    Information about steps as a list of dictionaries. Each dictionary may have
    following properties.
    (1) per_element_delay - amount of delay for each element in seconds.
    (2) per_bundle_delay - minimum amount of delay for a given step in seconds.
    (3) output_records_per_input_record - number of output elements generated
        for each input element to a step.
    (4) output_filter_ratio - the probability at which a step may filter out a
        given element by not producing any output for that element.
  """
  all_steps = []
  json_data = json.loads(json_str)
  for val in json_data:
    steps = {}
    steps['per_element_delay'] = (
        (float(val['per_element_delay_msec']) / 1000)
        if 'per_element_delay_msec' in val else 0)
    steps['per_bundle_delay'] = (
        float(val['per_bundle_delay_sec'])
        if 'per_bundle_delay_sec' in val else 0)
    steps['output_records_per_input_record'] = (
        int(val['output_records_per_input_record'])
        if 'output_records_per_input_record' in val else 1)
    steps['output_filter_ratio'] = (
        float(val['output_filter_ratio'])
        if 'output_filter_ratio' in val else 0)
    all_steps.append(steps)
  return all_steps


def parse_args(args):
  """Parses a given set of arguments.

  Args:
    args: set of arguments to be passed.

  Returns:
    A tuple whose first item is the set of arguments defined and parsed within
    this method and whose second item is the list of unknown arguments.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--steps',
      dest='steps',
      type=_parse_steps,
      help='A JSON string that gives a list where each entry of the list is '
           'configuration information for a step. Configuration for each step '
           'consists of '
           '(1) A float "per_bundle_delay_sec" (in seconds). Defaults to 0.'
           '(2) A float "per_element_delay_msec" (in milli seconds). '
           '    Defaults to 0.'
           '(3) An integer "output_records_per_input_record". Defaults to 1.'
           '(4) A float "output_filter_ratio" in the range [0, 1] . '
           '    Defaults to 0.')
  parser.add_argument(
      '--input',
      dest='input',
      type=json.loads,
      help='A JSON string that describes the properties of the '
           'SyntheticSource used by the pipeline. Configuration is similar to '
           'Java SyntheticBoundedInput. '
           'Currently supports the following properties: '
           '(1) An integer "numRecords". '
           '(2) An integer or byte-size string "keySizeBytes". '
           '(3) An integer or byte-size string "valueSizeBytes". '
           '(4) A dictionary "bundleSizeDistribution" with the following '
           '    values: a string "type" (allowed values are "const" and '
           '    "zipf") and a float "param" (only used if "type"=="zipf"; '
           '    must be larger than 1). '
           '(5) An integer "forceNumInitialBundles". '
           '(6) An integer "splitPointFrequencyRecords". '
           '(7) A dictionary "delayDistribution" with the following values: '
           '    a string "type" (the only allowed value is "const") and an '
           '    integer "const" (the delay in milliseconds).')
  parser.add_argument('--barrier',
                      dest='barrier',
                      default='shuffle',
                      choices=['shuffle', 'side-input', 'expand-gbk',
                               'expand-second-output', 'merge-gbk',
                               'merge-side-input'],
                      help='The barrier used to separate steps: a shuffle '
                           '(GroupByKey), a side input, or one of the '
                           'expanding/merging variants that fan out or fan '
                           'in by a factor of 2.')
  parser.add_argument('--output',
                      dest='output',
                      default='',
                      help='Destination to write output.')
  return parser.parse_known_args(args) 


def run(argv=None):
  """Runs the workflow."""
  known_args, pipeline_args = parse_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  input_info = known_args.input
  with TestPipeline(options=pipeline_options) as p:
    source = SyntheticSource(input_info)
    # pylint: disable=expression-not-assigned
    barrier = known_args.barrier
    pc_list = []
    num_roots = 2 ** (len(known_args.steps) - 1) if (
        barrier == 'merge-gbk' or barrier == 'merge-side-input') else 1
    for read_no in range(num_roots):
      pc_list.append((p | ('Read %d' % read_no) >> beam.io.Read(source)))
    for step_no, step in enumerate(known_args.steps):
      if step_no != 0:
        new_pc_list = []
        for pc_no, pc in enumerate(pc_list):
          if barrier == 'shuffle':
            new_pc_list.append(
                (pc |
                 ('shuffle %d.%d' % (step_no, pc_no)) >> ShuffleBarrier()))
          elif barrier == 'side-input':
            new_pc_list.append(
                (pc |
                 ('side-input %d.%d' % (step_no, pc_no)) >> SideInputBarrier()))
          elif barrier == 'expand-gbk':
            new_pc_list.extend(
                expand_using_gbk(('expand-gbk %d.%d' % (step_no, pc_no)), pc))
          elif barrier == 'expand-second-output':
            new_pc_list.extend(
                expand_using_second_output(
                    ('expand-second-output %d.%d' % (step_no, pc_no)), pc))
          elif barrier == 'merge-gbk':
            if pc_no % 2 == 0:
              new_pc_list.append(
                  merge_using_gbk(('merge-gbk %d.%d' % (step_no, pc_no)),
                                  pc, pc_list[pc_no + 1]))
            else:
              continue
          elif barrier == 'merge-side-input':
            if pc_no % 2 == 0:
              new_pc_list.append(
                  merge_using_side_input(
                      ('merge-side-input %d.%d' % (step_no, pc_no)),
                      pc, pc_list[pc_no + 1]))
            else:
              continue
        pc_list = new_pc_list
      new_pc_list = []
      for pc_no, pc in enumerate(pc_list):
        new_pc = pc | 'SyntheticStep %d.%d' % (step_no, pc_no) >> beam.ParDo(
            SyntheticStep(
                per_element_delay_sec=step['per_element_delay'],
                per_bundle_delay_sec=step['per_bundle_delay'],
                output_records_per_input_record=(
                    step['output_records_per_input_record']),
                output_filter_ratio=step['output_filter_ratio']))
        new_pc_list.append(new_pc)
      pc_list = new_pc_list
    if known_args.output:
      # If an output location is provided we format and write output.
      if len(pc_list) == 1:
        (pc_list[0] |
         'FormatOutput' >> beam.Map(lambda elm: (elm[0] + elm[1])) |
         'WriteOutput' >> WriteToText(known_args.output))
  logging.info('Pipeline run completed.') 


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()