#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Query 10, 'Log to sharded files' (not in the original suite).

Every window_size_sec seconds, save all events from the last window into
num_log_shards (NUM_SHARD_PER_WORKER * max_num_workers) log files.
"""
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.transforms import trigger
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Duration
# Number of log-file shards to create per worker.
NUM_SHARD_PER_WORKER = 5
# Delay (in seconds) used by AfterProcessingTime below to batch late events.
LATE_BATCHING_PERIOD = 10
output_path = None
max_num_workers = 5
num_log_shards = NUM_SHARD_PER_WORKER * max_num_workers
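# With the defaults above, events fan out into 5 * 5 = 25 shards. output_path
# is a module-level placeholder: while it stays None, output_file_for and
# index_path_for produce no filenames and the write DoFns skip file output.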
class OutputFile(object):
  """Value object describing one sharded log file for a window pane."""
  def __init__(self, max_timestamp, shard, index, timing, filename):
    self.max_timestamp = max_timestamp
    self.shard = shard
    self.index = index
    self.timing = timing
    self.filename = filename
def open_writable_gcs_file(options, filename):
  # TODO: [https://github.com/apache/beam/issues/20670] the Beam team has not
  #   yet specified this method's behavior, so it is intentionally left as a
  #   no-op.
  pass
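# A minimal sketch of what a writer-opening helper could look like, assuming
# generic Beam FileSystems I/O is acceptable (an illustration only, not the
# undecided implementation tracked in the issue above; the name
# _open_writable_file_sketch is hypothetical):
def _open_writable_file_sketch(filename):
  from apache_beam.io.filesystems import FileSystems
  # FileSystems.create returns a writable binary channel for gs://, local and
  # other registered filesystem paths.
  return FileSystems.create(filename)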
def output_file_for(window, shard, pane):
  """
  Returns:
    an OutputFile describing the log file for the given window, shard and
      pane (filename is None when output_path is not set).
  """
  filename = '%s/LOG-%s-%s-%03d-%s' % (
      output_path, window.max_timestamp(), shard, pane.index,
      pane.timing) if output_path else None
  return OutputFile(
      window.max_timestamp(), shard, pane.index, pane.timing, filename)
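# For illustration: with output_path 'gs://bucket/nexmark' (an assumed value),
# shard 'shard-00003-of-00025' and pane index 1, the filename follows the
# pattern
# 'gs://bucket/nexmark/LOG-<window max timestamp>-shard-00003-of-00025-001-<pane timing>'.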
def index_path_for(window):
  """
  Returns:
    path to the index file listing all shard names, or None if output_path
      is not set.
  """
  if output_path:
    return '%s/INDEX-%s' % (output_path, window.max_timestamp())
  else:
    return None
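# For illustration, with the same assumed output_path: the index path follows
# the pattern 'gs://bucket/nexmark/INDEX-<window max timestamp>'.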
def load(events, metadata=None, pipeline_options=None):
  """Runs query 10: shards events, windows them, and writes log and index
  files."""
  return (
      events
      | 'query10_shard_events' >> beam.ParDo(ShardEventsDoFn())
      # The composite trigger fires as each sub-trigger (executed in order)
      # fires:
      # 1. repeatedly, after at least max_log_events elements in the pane,
      #    until the watermark passes the end of the window;
      # 2. then repeatedly, after at least max_log_events elements in the
      #    pane, or after processing time passes the first element in the
      #    pane plus LATE_BATCHING_PERIOD.
      | 'query10_fix_window' >> beam.WindowInto(
          window.FixedWindows(metadata.get('window_size_sec')),
          trigger=trigger.AfterEach(
              trigger.OrFinally(
                  trigger.Repeatedly(
                      trigger.AfterCount(metadata.get('max_log_events'))),
                  trigger.AfterWatermark()),
              trigger.Repeatedly(
                  trigger.AfterAny(
                      trigger.AfterCount(metadata.get('max_log_events')),
                      trigger.AfterProcessingTime(LATE_BATCHING_PERIOD)))),
          accumulation_mode=trigger.AccumulationMode.DISCARDING,
          # Use a 1 day allowed lateness so that any forgotten hold will stall
          # the pipeline for that period and be very noticeable.
          allowed_lateness=Duration.of(1 * 24 * 60 * 60))
      | 'query10_gbk' >> beam.GroupByKey()
      | 'query10_write_event' >> beam.ParDo(WriteEventDoFn(), pipeline_options)
      | 'query10_window_log_files' >> beam.WindowInto(
          window.FixedWindows(metadata.get('window_size_sec')),
          accumulation_mode=trigger.AccumulationMode.DISCARDING,
          allowed_lateness=Duration.of(1 * 24 * 60 * 60))
      | 'query10_gbk_2' >> beam.GroupByKey()
      | 'query10_write_index' >> beam.ParDo(WriteIndexDoFn(), pipeline_options)) 
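# A minimal usage sketch (the metadata values are assumed; in the Nexmark
# suite the launcher supplies the events collection and metadata):
#
#   import apache_beam as beam
#   from apache_beam.options.pipeline_options import PipelineOptions
#
#   options = PipelineOptions()
#   with beam.Pipeline(options=options) as p:
#     events = p | 'read_events' >> beam.Create([...])  # Nexmark events
#     load(
#         events,
#         metadata={'window_size_sec': 10, 'max_log_events': 100000},
#         pipeline_options=options)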
class ShardEventsDoFn(beam.DoFn):
  """Assigns each event to one of num_log_shards named shards."""
  def process(self, element):
    shard_number = abs(hash(element) % num_log_shards)
    shard = 'shard-%05d-of-%05d' % (shard_number, num_log_shards)
    yield shard, element
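# For illustration: shard_number 3 with the default num_log_shards of 25
# yields the key 'shard-00003-of-00025'.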
class WriteEventDoFn(beam.DoFn):
  """Writes each shard's events to its log file and emits the OutputFile."""
  def process(
      self,
      element,
      pipeline_options,
      window=beam.DoFn.WindowParam,
      pane_info=beam.DoFn.PaneInfoParam):
    shard = element[0]
    options = pipeline_options.view_as(GoogleCloudOptions)
    output_file = output_file_for(window, shard, pane_info)
    if output_file.filename:
      # currently a no-op, because open_writable_gcs_file is unimplemented
      open_writable_gcs_file(options, output_file.filename)
      for event in element[1]:  # pylint: disable=unused-variable
        # write each event to the file once the writer is implemented
        pass
    yield None, output_file
class WriteIndexDoFn(beam.DoFn):
  """Writes the per-window index file listing all output files."""
  def process(self, element, pipeline_options, window=beam.DoFn.WindowParam):
    options = pipeline_options.view_as(GoogleCloudOptions)
    filename = index_path_for(window)
    if filename:
      # currently a no-op, because open_writable_gcs_file is unimplemented
      open_writable_gcs_file(options, filename)
      for output_file in element[1]:  # pylint: disable=unused-variable
        # write each output file's name to the index once the writer is
        # implemented
        pass