Source code for apache_beam.testing.benchmarks.nexmark.queries.query8

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Query 8, 'Monitor New Users'. Select people who have entered the system and
created auctions in the last 12 hours, updated every 12 hours. In CQL syntax::

  SELECT Rstream(P.id, P.name, A.reserve)
  FROM Person [RANGE 12 HOUR] P, Auction [RANGE 12 HOUR] A
  WHERE P.id = A.seller;

To make things a bit more dynamic and easier to test we'll use a much
shorter window.
"""

import apache_beam as beam
from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import ResultNames
from apache_beam.transforms import window

def load(events, metadata=None, pipeline_options=None):
  """Build the Query 8 pipeline fragment on top of ``events``.

  Person events and auction events are each placed into fixed windows of the
  same size, keyed (persons by id, auctions by seller), co-grouped on that
  key, and joined by ``JoinPersonAuctionFn``.

  Args:
    events: PCollection of Nexmark events to read persons and auctions from.
    metadata: mapping that provides the ``'window_size_sec'`` entry used as
      the fixed-window size. Although the parameter defaults to ``None``,
      ``metadata.get`` is called unconditionally, so omitting it raises
      ``AttributeError``.
    pipeline_options: accepted for interface compatibility; not used here.

  Returns:
    The PCollection produced by applying ``JoinPersonAuctionFn`` to the
    co-grouped persons and auctions.
  """
  # Both sides of the join must share the same windowing, so look the size
  # up once and reuse it.
  window_size_sec = metadata.get('window_size_sec')

  # window person and key by persons' id
  persons_by_id = (
      events
      | nexmark_query_util.JustPerson()
      | 'query8_window_person' >> beam.WindowInto(
          window.FixedWindows(window_size_sec))
      | 'query8_person_by_id' >> beam.ParDo(
          nexmark_query_util.PersonByIdFn()))

  # window auction and key by auctions' seller
  auctions_by_seller = (
      events
      | nexmark_query_util.JustAuctions()
      | 'query8_window_auction' >> beam.WindowInto(
          window.FixedWindows(window_size_sec))
      | 'query8_auction_by_seller' >> beam.ParDo(
          nexmark_query_util.AuctionBySellerFn()))

  # Co-group the two keyed collections under their tags, then join per key.
  return ({
      nexmark_query_util.PERSON_TAG: persons_by_id,
      nexmark_query_util.AUCTION_TAG: auctions_by_seller,
  }
          | beam.CoGroupByKey()
          | 'query8_join' >> beam.ParDo(JoinPersonAuctionFn()))
class JoinPersonAuctionFn(beam.DoFn):
  """Join co-grouped persons and auctions that share the same key.

  Consumes the ``(key, group)`` pairs produced by ``beam.CoGroupByKey`` in
  ``load``, where ``group`` is indexed by ``nexmark_query_util.PERSON_TAG``
  and ``nexmark_query_util.AUCTION_TAG``.
  """
  def process(self, element):
    """Yield one result dict per auction whose seller is a new person.

    Args:
      element: a ``(key, group)`` pair; the key itself is not used.

    Yields:
      A dict keyed by ``ResultNames.ID``, ``ResultNames.NAME`` and
      ``ResultNames.RESERVE`` holding the person's id, the person's name
      and the auction's reserve.
    """
    _, group = element
    persons = group[nexmark_query_util.PERSON_TAG]
    # Only the first person under this key is used for the join.
    person = persons[0] if persons else None
    if person is None:
      # do nothing if this seller id is not a new person in this window
      return
    for auction in group[nexmark_query_util.AUCTION_TAG]:
      yield {
          ResultNames.ID: person.id,
          ResultNames.NAME: person.name,
          ResultNames.RESERVE: auction.reserve,
      }