# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Query 4, 'Average Price for a Category'. Select the average of the winning bid
prices for all closed auctions in each category. In CQL syntax::
  SELECT Istream(AVG(Q.final))
  FROM Category C, (SELECT Rstream(MAX(B.price) AS final, A.category)
                    FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
                    WHERE A.id = B.auction
                      AND B.datetime < A.expires AND A.expires < CURRENT_TIME
                    GROUP BY A.id, A.category) Q
  WHERE Q.category = C.id
  GROUP BY C.id;
For extra spiciness our implementation differs slightly from the above:
* We select both the average winning price and the category.
* We don't bother joining with a static category table, since its
  contents are never used.
* We only consider bids which are above the auction's reserve price.
* We accept the highest-price, earliest valid bid as the winner.
* We calculate the averages over a sliding window of size
  window_size_sec and period window_period_sec.
import apache_beam as beam
from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
from apache_beam.testing.benchmarks.nexmark.queries import winning_bids
from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import ResultNames
from apache_beam.transforms import window
[docs]def load(events, metadata=None, pipeline_options=None):
# find winning bids for each closed auction
all_winning_bids = (
# key winning bids by auction category
| beam.Map(lambda auc_bid: (auc_bid.auction.category, auc_bid.bid.price))
# re-window for sliding average
# average for each category
# TODO(leiyiz): fanout with sliding window produces duplicated results,
# uncomment after it is fixed
# produce output
[docs] def process(self, element, pane_info=beam.DoFn.PaneInfoParam):