apache_beam.testing.benchmarks.nexmark.queries.query3 module

Query 3, ‘Local Item Suggestion’. Who is selling in OR, ID or CA in category 10, and for what auction ids? In CQL syntax:

SELECT Istream(P.name, P.city, P.state, A.id)
FROM Auction A [ROWS UNBOUNDED], Person P [ROWS UNBOUNDED]
WHERE A.seller = P.id
  AND (P.state = 'OR' OR P.state = 'ID' OR P.state = 'CA')
  AND A.category = 10;
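
As a rough illustration of what the query selects, the following is a minimal batch sketch in plain Python, not the Beam implementation. The record layout (dicts with `id`, `name`, `city`, `state`, `seller`, `category`) and the sample data are assumptions made for the example; only the filter and join conditions come from the query above.

```python
# Hypothetical in-memory records; field names mirror the CQL query.
persons = [
    {"id": 1, "name": "Ann", "city": "Boise", "state": "ID"},
    {"id": 2, "name": "Bob", "city": "Austin", "state": "TX"},
]
auctions = [
    {"id": 100, "seller": 1, "category": 10},
    {"id": 101, "seller": 1, "category": 7},
    {"id": 102, "seller": 2, "category": 10},
]

STATES = {"OR", "ID", "CA"}


def local_item_suggestion(auctions, persons):
    """Filter both sides, then join auctions to persons on the seller id."""
    # Keep only sellers located in one of the three states.
    sellers = {p["id"]: p for p in persons if p["state"] in STATES}
    rows = []
    for a in auctions:
        # Keep only category-10 auctions whose seller passed the state filter.
        if a["category"] == 10 and a["seller"] in sellers:
            p = sellers[a["seller"]]
            rows.append((p["name"], p["city"], p["state"], a["id"]))
    return rows


results = local_item_suggestion(auctions, persons)
```

With this sample data only auction 100 qualifies: auction 101 is in the wrong category and auction 102 belongs to a seller outside the three states.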

We’ll implement this query to allow ‘new auction’ events to come before the ‘new person’ events for the auction seller. Those auctions will be stored until the matching person is seen. Then all subsequent auctions for a person will use the stored person record.
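
The buffering strategy described above can be sketched without Beam as a simple loop over an interleaved event stream. This is an illustrative simulation under stated assumptions: the event tuples `("person", id, name)` and `("auction", auction_id, seller_id)` are hypothetical, and plain dictionaries stand in for persistent per-key state.

```python
from collections import defaultdict


def join_stream(events):
    """Yield (auction_id, person_name) pairs from an interleaved stream."""
    pending = defaultdict(list)  # seller id -> auction ids awaiting a person
    seen = {}                    # person id -> stored person name

    for event in events:
        if event[0] == "person":
            _, person_id, name = event
            seen[person_id] = name
            # Flush every auction that arrived before this person.
            for auction_id in pending.pop(person_id, []):
                yield auction_id, name
        else:
            _, auction_id, seller_id = event
            if seller_id in seen:
                # Person already known: match against the stored record.
                yield auction_id, seen[seller_id]
            else:
                # Person not seen yet: hold the auction back.
                pending[seller_id].append(auction_id)


events = [
    ("auction", 100, 1),   # arrives before its seller
    ("auction", 101, 1),
    ("person", 1, "Ann"),  # releases the two buffered auctions
    ("auction", 102, 1),   # matched directly against the stored person
]
pairs = list(join_stream(events))
```

Auctions 100 and 101 are emitted only once the person event arrives, while auction 102 is emitted as soon as it is read.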

apache_beam.testing.benchmarks.nexmark.queries.query3.load(events, metadata=None, pipeline_options=None)

class apache_beam.testing.benchmarks.nexmark.queries.query3.JoinFn(max_auction_wait_time)

Bases: apache_beam.transforms.core.DoFn

Join auctions and persons by person id and emit their product one pair at a time.

We know a person may submit any number of auctions. Thus each new person event must have its person record stored in persistent state in order to match future auctions by that person.

However, we know that each auction is associated with at most one person, so we only need to store auction records in persistent state until we have seen the corresponding person record. Of course, we may have already seen that record, in which case the auction can be matched against it right away.

AUCTIONS = 'auctions_state'
PERSON = 'person_state'
PERSON_EXPIRING = 'person_state_expiring'
auction_spec = BagStateSpec(auctions_state)
person_spec = ReadModifyWriteStateSpec(person_state)
person_timer_spec = TimerSpec(ts-person_state_expiring)
process(element: Tuple[str, Dict[str, Union[List[apache_beam.testing.benchmarks.nexmark.models.nexmark_model.Auction], List[apache_beam.testing.benchmarks.nexmark.models.nexmark_model.Person]]]], auction_state=StateParam(auctions_state), person_state=StateParam(person_state), person_timer=TimerParam(ts-person_state_expiring))
expiry(person_state=StateParam(person_state))
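
To make the interplay of the three state cells easier to follow, here is a pure-Python stand-in for the state kept for a single seller id. It is a sketch, not the JoinFn source: the class name `KeyState`, the group keys `"auction"` and `"person"`, the explicit `now` argument, and the choice to start the expiry countdown at processing time are all assumptions. It also assumes that `expiry` clears the stored person, which the signature above suggests but does not state.

```python
class KeyState:
    """Hypothetical model of the per-key state for one seller id."""

    def __init__(self, max_auction_wait_time):
        self.max_auction_wait_time = max_auction_wait_time
        self.auctions = []      # stands in for the 'auctions_state' bag
        self.person = None      # stands in for the 'person_state' value
        self.expires_at = None  # stands in for the expiry timer

    def process(self, group, now):
        """Handle one grouped element and return (auction, person) pairs."""
        out = []
        if self.person is None and group["person"]:
            # First person record for this key: store it, arm the timer,
            # and flush any auctions that were waiting for it.
            self.person = group["person"][0]
            self.expires_at = now + self.max_auction_wait_time
            out.extend((a, self.person) for a in self.auctions)
            self.auctions = []
        if self.person is not None:
            out.extend((a, self.person) for a in group["auction"])
        else:
            # No person yet: buffer the auctions.
            self.auctions.extend(group["auction"])
        return out

    def expiry(self, now):
        """Drop the stored person once the wait time has elapsed."""
        if self.expires_at is not None and now >= self.expires_at:
            self.person = None
            self.expires_at = None


state = KeyState(max_auction_wait_time=10)
first = state.process({"auction": ["a1"], "person": []}, now=0)
second = state.process({"auction": ["a2"], "person": ["ann"]}, now=1)
third = state.process({"auction": ["a3"], "person": []}, now=5)
state.expiry(now=11)
fourth = state.process({"auction": ["a4"], "person": []}, now=12)
```

In this model an auction that arrives after the person record has expired is buffered again rather than matched, which is the trade-off a bounded wait time implies: state stays small, at the cost of missing very late auctions.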