# Source code for apache_beam.testing.benchmarks.nexmark.queries.query8

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Query 8, 'Monitor New Users'. Select people who have entered the system and
created auctions in the last 12 hours, updated every 12 hours. In CQL syntax::

  SELECT Rstream(P.id, P.name, A.reserve)
  FROM Person [RANGE 12 HOUR] P, Auction [RANGE 12 HOUR] A
  WHERE P.id = A.seller;

To make things more dynamic and easier to test, this implementation uses a
much shorter window whose length, in seconds, comes from the
``window_size_sec`` entry of the query metadata.
"""

import apache_beam as beam
from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import ResultNames
from apache_beam.transforms import window


def load(events, metadata=None, pipeline_options=None):
  """Build the Query 8 ('Monitor New Users') pipeline.

  People and auctions are each assigned to fixed windows of
  ``metadata['window_size_sec']`` seconds. People are keyed with
  ``PersonByIdFn`` and auctions with ``AuctionBySellerFn``, and the two
  collections are joined with ``CoGroupByKey``. For each key that has a person
  in a window, one result is emitted per auction under the same key in that
  window (see ``JoinPersonAuctionFn``).

  Args:
    events: input PCollection; persons are selected from it with
      ``nexmark_query_util.JustPerson()`` and auctions with
      ``nexmark_query_util.JustAuctions()``.
    metadata: mapping of query parameters; must provide a non-None
      ``'window_size_sec'``.
    pipeline_options: not used by this query.

  Returns:
    PCollection of dicts with keys ``ResultNames.ID``, ``ResultNames.NAME``
    and ``ResultNames.RESERVE``.

  Raises:
    ValueError: if ``metadata`` is None or has no ``'window_size_sec'``.
  """
  # Validate up front: the default metadata=None would otherwise fail later
  # with an unhelpful AttributeError / TypeError while building the pipeline.
  window_size_sec = (metadata or {}).get('window_size_sec')
  if window_size_sec is None:
    raise ValueError(
        "Query 8 requires metadata['window_size_sec'] to set the window size")

  # Window people and key them by person id.
  persons_by_id = (
      events
      | nexmark_query_util.JustPerson()
      | 'query8_window_person' >> beam.WindowInto(
          window.FixedWindows(window_size_sec))
      | 'query8_person_by_id' >> beam.ParDo(nexmark_query_util.PersonByIdFn()))

  # Window auctions with the same window size and key them by seller, so
  # that CoGroupByKey pairs people with auctions from the same window.
  auctions_by_seller = (
      events
      | nexmark_query_util.JustAuctions()
      | 'query8_window_auction' >> beam.WindowInto(
          window.FixedWindows(window_size_sec))
      | 'query8_auction_by_seller' >> beam.ParDo(
          nexmark_query_util.AuctionBySellerFn()))

  grouped = {
      nexmark_query_util.PERSON_TAG: persons_by_id,
      nexmark_query_util.AUCTION_TAG: auctions_by_seller,
  } | beam.CoGroupByKey()
  return grouped | 'query8_join' >> beam.ParDo(JoinPersonAuctionFn())
class JoinPersonAuctionFn(beam.DoFn):
  """Join people with the auctions grouped under the same key.

  Consumes the ``(key, group)`` elements that ``load`` produces with
  ``CoGroupByKey``. In each element, ``group[nexmark_query_util.PERSON_TAG]``
  holds the people for the key and ``group[nexmark_query_util.AUCTION_TAG]``
  holds the auctions for the key. The key itself is not used; presumably it
  is the person id / seller id from ``PersonByIdFn`` / ``AuctionBySellerFn``.
  """

  def process(self, element):
    """Yield one result dict per auction when the group contains a person.

    Args:
      element: ``(key, group)`` pair from ``CoGroupByKey``.

    Yields:
      ``{ResultNames.ID: person.id, ResultNames.NAME: person.name,
      ResultNames.RESERVE: auction.reserve}`` for each auction in the group.
      Only the first person in the group is used. Nothing is yielded when
      the group has no person.
    """
    _, group = element
    # next(iter(...), None) accepts any iterable of grouped values, not only
    # indexable sequences, and returns None when there is no person.
    person = next(iter(group[nexmark_query_util.PERSON_TAG]), None)
    if person is None:
      # Do nothing if this seller id is not a new person in this window.
      return
    for auction in group[nexmark_query_util.AUCTION_TAG]:
      yield {
          ResultNames.ID: person.id,
          ResultNames.NAME: person.name,
          ResultNames.RESERVE: auction.reserve,
      }