#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Query 6, 'Average Selling Price by Seller'. Select the average selling price
over the last 10 closed auctions by the same seller. In CQL syntax::
SELECT Istream(AVG(Q.final), Q.seller)
FROM (SELECT Rstream(MAX(B.price) AS final, A.seller)
FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
WHERE A.id=B.auction
AND B.datetime < A.expires AND A.expires < CURRENT_TIME
GROUP BY A.id, A.seller) [PARTITION BY A.seller ROWS 10] Q
GROUP BY Q.seller;
"""
import apache_beam as beam
from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
from apache_beam.testing.benchmarks.nexmark.queries import winning_bids
from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import ResultNames
from apache_beam.transforms import trigger
from apache_beam.transforms import window
def load(events, metadata=None, pipeline_options=None):
    """Build Nexmark query 6: average selling price by seller.

    Keeps only the auction and bid events, resolves the winning bid of each
    closed auction, keys every winning bid by the seller of its auction and
    combines the bids per seller with ``MovingMeanSellingPriceFn(10)``.

    Args:
      events: collection of Nexmark events the query is applied to.
      metadata: not used by this query.
      pipeline_options: not used by this query.

    Returns:
      The output of the final ``beam.Map``: one dict per emitted result,
      keyed by ``ResultNames.SELLER`` and ``ResultNames.PRICE``.
    """
    return (
        events
        # only auctions and bids are needed to find the winning bids
        | beam.Filter(nexmark_query_util.auction_or_bid)
        | winning_bids.WinningBids()
        # auction_bid -> (auction.seller, bid)
        | beam.Map(lambda auc_bid: (auc_bid.auction.seller, auc_bid.bid))
        # re-window into the global window and fire after every element so
        # that the mean is calculated and output as data arrives
        | beam.WindowInto(
            window.GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterCount(1)),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
            allowed_lateness=0)
        | beam.CombinePerKey(MovingMeanSellingPriceFn(10))
        | beam.Map(
            lambda t: {
                ResultNames.SELLER: t[0], ResultNames.PRICE: t[1]
            }))
[docs]class MovingMeanSellingPriceFn(beam.CombineFn):
"""
Combiner to keep track of up to max_num_bids of the most recent winning
bids and calculate their average selling price.
"""
def __init__(self, max_num_bids):
    """Remember how many bids the moving window may hold.

    Args:
      max_num_bids: upper bound on the number of most recent winning bids
        that are kept for the average selling price.
    """
    self.max_num_bids = max_num_bids
def create_accumulator(self):
    """Return a new, empty accumulator: a list that will collect bids."""
    return []
def merge_accumulators(self, accumulators):
    """Merge several accumulators into one holding the most recent bids.

    Args:
      accumulators: iterable of accumulators, each an iterable of bids that
        expose ``date_time`` and ``price`` attributes.

    Returns:
      A new list with at most ``self.max_num_bids`` bids: the tail of all
      bids sorted ascending by ``(date_time, price)``. The list is empty
      when ``self.max_num_bids`` is not positive. The input accumulators
      are not modified.
    """
    merged = [bid for accumulator in accumulators for bid in accumulator]
    merged.sort(key=lambda bid: (bid.date_time, bid.price))
    limit = self.max_num_bids
    if limit <= 0:
        # ``merged[-0:]`` is the whole list and a negative limit would slice
        # from the front, so a non-positive limit must be handled explicitly.
        return []
    return merged[-limit:]