#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
A transform to find winning bids for each closed auction. In pseudo CQL syntax:
SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime)
FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
WHERE A.id = B.auction AND B.dateTime < A.expires AND A.expires < CURRENT_TIME
GROUP BY A.id
We will also check that the winning bid is above the auction reserve. Note that
we ignore the auction opening bid value since it has no impact on which bid
eventually wins, if any.
Our implementation will use a custom windowing function in order to bring bids
and auctions together without requiring global state.
"""
from __future__ import absolute_import
import apache_beam as beam
from apache_beam.coders import coder_impl
from apache_beam.coders.coders import FastCoder
from apache_beam.testing.benchmarks.nexmark.models import auction_bid
from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import WindowFn
from apache_beam.utils.timestamp import Duration
class AuctionOrBidWindow(IntervalWindow):
  """Interval window tagged with an auction id and a window kind.

  On top of the ``start``/``end`` bounds handled by ``IntervalWindow``, every
  instance records the id of the auction it relates to and whether it was
  created for the auction itself (``is_auction_window`` is True) or for a bid
  on that auction (``is_auction_window`` is False).

  Equality and hashing are defined over all four fields. The base class is
  only given ``start`` and ``end``, so without these overrides two windows
  for different auctions that happen to share the same bounds could not be
  told apart when compared or used as dictionary/set keys.
  """
  def __init__(self, start, end, auction_id, is_auction_window):
    """Creates a window.

    Args:
      start: start of the window, forwarded to ``IntervalWindow``.
      end: end of the window, forwarded to ``IntervalWindow``.
      auction_id: id of the auction this window belongs to.
      is_auction_window: True for an auction window, False for a bid window.
    """
    super(AuctionOrBidWindow, self).__init__(start, end)
    self.auction = auction_id
    self.is_auction_window = is_auction_window

  @staticmethod
  def for_auction(timestamp, auction):
    """Returns the window of an auction: from ``timestamp`` to its expiry."""
    return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True)

  @staticmethod
  def for_bid(expected_duration_micro, timestamp, bid):
    """Returns the window of a bid.

    The window starts at ``timestamp`` and lasts twice
    ``expected_duration_micro`` microseconds; it is tagged with
    ``bid.auction``.
    """
    return AuctionOrBidWindow(
        timestamp,
        timestamp + Duration(micros=expected_duration_micro * 2),
        bid.auction,
        False)

  def is_auction_window_fn(self):
    """Returns True if this window was created for an auction."""
    return self.is_auction_window

  def __eq__(self, other):
    if not isinstance(other, AuctionOrBidWindow):
      # Let Python try the reflected comparison / identity fallback.
      return NotImplemented
    return (
        self.start == other.start and self.end == other.end and
        self.auction == other.auction and
        self.is_auction_window == other.is_auction_window)

  def __ne__(self, other):
    # Explicit so that the override also takes effect under Python 2, where
    # ``__ne__`` is not derived from ``__eq__``.
    return not self == other

  def __hash__(self):
    # Must be consistent with __eq__: hash exactly the compared fields.
    return hash((self.start, self.end, self.auction, self.is_auction_window))

  def __str__(self):
    return (
        'AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' %
        (self.start, self.end, self.auction, self.is_auction_window))
class AuctionOrBidWindowCoder(FastCoder):
  """Coder whose implementation is ``AuctionOrBidWindowCoderImpl``."""
  def _create_impl(self):
    # The actual byte-level work is delegated to the impl class.
    impl = AuctionOrBidWindowCoderImpl()
    return impl

  def is_deterministic(self):
    """Always reports this coder as deterministic."""
    return True
class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl):
  """Stream coder for ``AuctionOrBidWindow``.

  A window is written as three consecutive components: the interval part
  (via the interval window coder), the auction id (var-int) and the
  auction/bid flag (boolean). Decoding reads them back in the same order.
  The ``nested`` argument of both methods is not consulted; every component
  is always coded with ``nested=True``.
  """
  # Component coders, shared by all instances.
  _super_coder_impl = coder_impl.IntervalWindowCoderImpl()
  _id_coder_impl = coder_impl.VarIntCoderImpl()
  _bool_coder_impl = coder_impl.BooleanCoderImpl()

  def encode_to_stream(self, value, stream, nested):
    """Writes ``value`` (an ``AuctionOrBidWindow``) to ``stream``."""
    # 1. interval bounds
    self._super_coder_impl.encode_to_stream(value, stream, True)
    # 2. auction id
    auction_id = value.auction
    self._id_coder_impl.encode_to_stream(auction_id, stream, True)
    # 3. auction-vs-bid flag
    flag = value.is_auction_window
    self._bool_coder_impl.encode_to_stream(flag, stream, True)

  def decode_from_stream(self, stream, nested):
    """Reads one ``AuctionOrBidWindow`` from ``stream``."""
    interval = self._super_coder_impl.decode_from_stream(stream, True)
    auction_id = self._id_coder_impl.decode_from_stream(stream, True)
    flag = self._bool_coder_impl.decode_from_stream(stream, True)
    return AuctionOrBidWindow(
        interval.start,
        interval.end,
        auction_id=auction_id,
        is_auction_window=flag)
class AuctionOrBidWindowFn(WindowFn):
  """Window function that puts auctions and bids into ``AuctionOrBidWindow``s.

  Auctions and bids are first assigned separate windows; ``merge`` then folds
  the bid windows of an auction into that auction's window.
  """
  def __init__(self, expected_duration_micro):
    # Passed on to AuctionOrBidWindow.for_bid to size bid windows.
    self.expected_duration = expected_duration_micro

  def assign(self, assign_context):
    """Returns a single-window list for the auction or bid in the context.

    Raises:
      ValueError: if the element is neither an Auction nor a Bid.
    """
    element = assign_context.element

    if isinstance(element, nexmark_model.Auction):
      window = AuctionOrBidWindow.for_auction(
          assign_context.timestamp, element)
      return [window]

    if isinstance(element, nexmark_model.Bid):
      window = AuctionOrBidWindow.for_bid(
          self.expected_duration, assign_context.timestamp, element)
      return [window]

    raise ValueError(
        '%s can only assign windows to auctions and bids, but received %s' %
        (self.__class__.__name__, element))

  def merge(self, merge_context):
    """Merges bid windows into the auction window with the same auction id.

    Only bid windows that start before the auction window ends are merged;
    the merge result is the auction window itself. Bid windows without a
    matching auction window, or starting at/after its end, are left alone.
    """
    auction_windows = {}  # auction id -> auction window (last one seen wins)
    bid_windows = {}  # auction id -> list of bid windows, in arrival order

    for candidate in merge_context.windows:
      if candidate.is_auction_window_fn():
        auction_windows[candidate.auction] = candidate
      else:
        bid_windows.setdefault(candidate.auction, []).append(candidate)

    for auction_id, auction_window in auction_windows.items():
      candidates = bid_windows.get(auction_id)
      if candidates is None:
        continue
      absorbed = [w for w in candidates if w.start < auction_window.end]
      if not absorbed:
        continue
      # The auction window goes last in the list of windows being merged.
      absorbed.append(auction_window)
      merge_context.merge(absorbed, auction_window)

  def get_window_coder(self):
    """Returns the coder used for the windows produced by this function."""
    return AuctionOrBidWindowCoder()
class JoinAuctionBidFn(beam.DoFn):
  """Picks the winning bid out of a co-grouped (auctions, bids) element."""
  @staticmethod
  def higher_bid(bid, other):
    """Returns True if ``bid`` beats ``other``.

    The higher price wins; on equal prices the bid with the smaller
    ``date_time`` wins.
    """
    if bid.price > other.price:
      return True
    if bid.price < other.price:
      return False
    # Same price: fall back to comparing the bid times.
    return bid.date_time < other.date_time

  def process(self, element):
    """Yields an ``AuctionBid`` for the auction and its best qualifying bid.

    Nothing is yielded when the group holds no auction, or when no bid
    reaches the auction's reserve price.
    """
    _key, tagged = element

    auction_list = tagged[nexmark_query_util.AUCTION_TAG]
    if not auction_list:
      return
    auction = auction_list[0]
    if auction is None:
      return

    winner = None
    for candidate in tagged[nexmark_query_util.BID_TAG]:
      meets_reserve = not candidate.price < auction.reserve
      if meets_reserve and (winner is None or
                            JoinAuctionBidFn.higher_bid(candidate, winner)):
        winner = candidate

    if winner:
      yield auction_bid.AuctionBid(auction, winner)
class WinningBids(beam.PTransform):
  """Composite transform joining auctions with their winning bids.

  The input is windowed with ``AuctionOrBidWindowFn``, split into auctions
  and bids keyed by auction id, co-grouped, and passed to
  ``JoinAuctionBidFn``, which emits the resulting ``AuctionBid`` values.
  """
  # Expected duration, in microseconds, used when the caller does not pass
  # one. It is handed to AuctionOrBidWindowFn.
  DEFAULT_EXPECTED_DURATION_MICRO = 16667000

  def __init__(self, expected_duration_micro=None):
    """Creates the transform.

    Args:
      expected_duration_micro: expected duration in microseconds, forwarded
        to ``AuctionOrBidWindowFn``. Defaults to
        ``DEFAULT_EXPECTED_DURATION_MICRO`` when omitted or None.
    """
    # Run the base-class initializer so the transform is fully set up.
    super(WinningBids, self).__init__()
    if expected_duration_micro is None:
      # TODO: change this default to be calculated by event generation
      expected_duration_micro = self.DEFAULT_EXPECTED_DURATION_MICRO
    self.auction_or_bid_windowFn = AuctionOrBidWindowFn(
        expected_duration_micro)

  def expand(self, pcoll):
    """Builds the windowing, keying, co-grouping and join steps."""
    events = pcoll | beam.WindowInto(self.auction_or_bid_windowFn)

    auction_by_id = (
        events
        | nexmark_query_util.JustAuctions()
        | 'auction_by_id' >> beam.ParDo(nexmark_query_util.AuctionByIdFn()))
    bids_by_auction_id = (
        events
        | nexmark_query_util.JustBids()
        | 'bid_by_auction' >> beam.ParDo(nexmark_query_util.BidByAuctionIdFn()))

    # Co-group both keyed collections by auction id, then pick the winner.
    return ({
        nexmark_query_util.AUCTION_TAG: auction_by_id,
        nexmark_query_util.BID_TAG: bids_by_auction_id
    }
            | beam.CoGroupByKey()
            | beam.ParDo(JoinAuctionBidFn()))