#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Query 6, 'Average Selling Price by Seller'. Select the average selling price
over the last 10 closed auctions by the same seller. In CQL syntax::
  SELECT Istream(AVG(Q.final), Q.seller)
  FROM (SELECT Rstream(MAX(B.price) AS final, A.seller)
    FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
    WHERE A.id=B.auction
      AND B.datetime < A.expires AND A.expires < CURRENT_TIME
    GROUP BY A.id, A.seller) [PARTITION BY A.seller ROWS 10] Q
  GROUP BY Q.seller;
"""
import apache_beam as beam
from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
from apache_beam.testing.benchmarks.nexmark.queries import winning_bids
from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import ResultNames
from apache_beam.transforms import trigger
from apache_beam.transforms import window
[docs]def load(events, metadata=None, pipeline_options=None):
  # find winning bids for each closed auction
  return (
      events
      # find winning bids
      | beam.Filter(nexmark_query_util.auction_or_bid)
      | winning_bids.WinningBids()
      # (auction_bids -> (aution.seller, bid)
      | beam.Map(lambda auc_bid: (auc_bid.auction.seller, auc_bid.bid))
      # calculate and output mean as data arrives
      | beam.WindowInto(
          window.GlobalWindows(),
          trigger=trigger.Repeatedly(trigger.AfterCount(1)),
          accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
          allowed_lateness=0)
      | beam.CombinePerKey(MovingMeanSellingPriceFn(10))
      | beam.Map(lambda t: {
          ResultNames.SELLER: t[0], ResultNames.PRICE: t[1]
      })) 
[docs]class MovingMeanSellingPriceFn(beam.CombineFn):
  """
  Combiner to keep track of up to max_num_bids of the most recent wining
  bids and calculate their average selling price.
  """
  def __init__(self, max_num_bids):
    self.max_num_bids = max_num_bids
[docs]  def create_accumulator(self):
    return [] 
[docs]  def merge_accumulators(self, accumulators):
    new_accu = []
    for accumulator in accumulators:
      new_accu += accumulator
    new_accu.sort(key=lambda bid: (bid.date_time, bid.price))
    return new_accu[-self.max_num_bids:]