Source code for apache_beam.testing.benchmarks.nexmark.queries.winning_bids

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

A transform to find winning bids for each closed auction. In pseudo CQL syntax:

SELECT Rstream(A.*,, B.bidder, MAX(B.price), B.dateTime)
WHERE = AND B.datetime < A.expires AND A.expires < CURRENT_TIME

We will also check that the winning bid is above the auction reserve. Note that
we ignore the auction opening bid value since it has no impact on which bid
eventually wins, if any.

Our implementation will use a custom windowing function in order to bring bids
and auctions together without requiring global state.

import apache_beam as beam
from apache_beam.coders import coder_impl
from apache_beam.coders.coders import FastCoder
from apache_beam.testing.benchmarks.nexmark.models import auction_bid
from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import WindowFn
from apache_beam.utils.timestamp import Duration

[docs]class AuctionOrBidWindow(IntervalWindow): """Windows for open auctions and bids.""" def __init__(self, start, end, auction_id, is_auction_window): super().__init__(start, end) = auction_id self.is_auction_window = is_auction_window
[docs] @staticmethod def for_auction(timestamp, auction): return AuctionOrBidWindow(timestamp, auction.expires,, True)
[docs] @staticmethod def for_bid(expected_duration_micro, timestamp, bid): return AuctionOrBidWindow( timestamp, timestamp + Duration(micros=expected_duration_micro * 2),, False)
[docs] def is_auction_window_fn(self): return self.is_auction_window
def __str__(self): return ( 'AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' % (self.start, self.end,, self.is_auction_window))
[docs]class AuctionOrBidWindowCoder(FastCoder): def _create_impl(self): return AuctionOrBidWindowCoderImpl()
[docs] def is_deterministic(self): return True
[docs]class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl): _super_coder_impl = coder_impl.IntervalWindowCoderImpl() _id_coder_impl = coder_impl.VarIntCoderImpl() _bool_coder_impl = coder_impl.BooleanCoderImpl()
[docs] def encode_to_stream(self, value, stream, nested): self._super_coder_impl.encode_to_stream(value, stream, True) self._id_coder_impl.encode_to_stream(, stream, True) self._bool_coder_impl.encode_to_stream( value.is_auction_window, stream, True)
[docs] def decode_from_stream(self, stream, nested): super_window = self._super_coder_impl.decode_from_stream(stream, True) auction = self._id_coder_impl.decode_from_stream(stream, True) is_auction = self._bool_coder_impl.decode_from_stream(stream, True) return AuctionOrBidWindow( super_window.start, super_window.end, auction, is_auction)
[docs]class AuctionOrBidWindowFn(WindowFn): def __init__(self, expected_duration_micro): self.expected_duration = expected_duration_micro
[docs] def assign(self, assign_context): event = assign_context.element if isinstance(event, nexmark_model.Auction): return [AuctionOrBidWindow.for_auction(assign_context.timestamp, event)] elif isinstance(event, nexmark_model.Bid): return [ AuctionOrBidWindow.for_bid( self.expected_duration, assign_context.timestamp, event) ] else: raise ValueError( '%s can only assign windows to auctions and bids, but received %s' % (self.__class__.__name__, event))
[docs] def merge(self, merge_context): auction_id_to_auction_window = {} auction_id_to_bid_window = {} for window in if window.is_auction_window_fn(): auction_id_to_auction_window[] = window else: if not in auction_id_to_bid_window: auction_id_to_bid_window[] = [] auction_id_to_bid_window[].append(window) for auction, auction_window in auction_id_to_auction_window.items(): bid_window_list = auction_id_to_bid_window.get(auction) if bid_window_list is not None: to_merge = [] for bid_window in bid_window_list: if bid_window.start < auction_window.end: to_merge.append(bid_window) if len(to_merge) > 0: to_merge.append(auction_window) merge_context.merge(to_merge, auction_window)
[docs] def get_window_coder(self): return AuctionOrBidWindowCoder()
[docs] def get_transformed_output_time(self, window, input_timestamp): return window.max_timestamp()
[docs]class JoinAuctionBidFn(beam.DoFn):
[docs] @staticmethod def higher_bid(bid, other): if bid.price > other.price: return True elif bid.price < other.price: return False else: return bid.date_time < other.date_time
[docs] def process(self, element): _, group = element auctions = group[nexmark_query_util.AUCTION_TAG] auction = auctions[0] if auctions else None if auction is None: return best_bid = None for bid in group[nexmark_query_util.BID_TAG]: if bid.price < auction.reserve: continue if best_bid is None or JoinAuctionBidFn.higher_bid(bid, best_bid): best_bid = bid if best_bid: yield auction_bid.AuctionBid(auction, best_bid)
[docs]class WinningBids(beam.PTransform): def __init__(self): #TODO: change this to be calculated by event generation expected_duration = 16667000 self.auction_or_bid_windowFn = AuctionOrBidWindowFn(expected_duration)
[docs] def expand(self, pcoll): events = pcoll | beam.WindowInto(self.auction_or_bid_windowFn) auction_by_id = ( events | nexmark_query_util.JustAuctions() | 'auction_by_id' >> beam.ParDo(nexmark_query_util.AuctionByIdFn())) bids_by_auction_id = ( events | nexmark_query_util.JustBids() | 'bid_by_auction' >> beam.ParDo(nexmark_query_util.BidByAuctionIdFn())) return ({ nexmark_query_util.AUCTION_TAG: auction_by_id, nexmark_query_util.BID_TAG: bids_by_auction_id } | beam.CoGroupByKey() | beam.ParDo(JoinAuctionBidFn()))