#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Query 8, 'Monitor New Users'. Select people who have entered the system and
created auctions in the last 12 hours, updated every 12 hours. In CQL syntax::

  SELECT Rstream(P.id, P.name, A.reserve)
  FROM Person [RANGE 12 HOUR] P, Auction [RANGE 12 HOUR] A
  WHERE P.id = A.seller;

To make things a bit more dynamic and easier to test we'll use a much
shorter window.
"""
import apache_beam as beam
from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import ResultNames
from apache_beam.transforms import window
def load(events, metadata=None, pipeline_options=None):
  """Build the Query 8 pipeline: join windowed persons with their auctions.

  Persons and auctions are each placed into fixed windows of the same size,
  keyed (persons by id, auctions by seller), co-grouped on that key, and
  handed to ``JoinPersonAuctionFn`` to produce the result rows.

  Args:
    events: the input collection of Nexmark events.
    metadata: mapping-like object whose ``get('window_size_sec')`` supplies
      the fixed window size. Although it defaults to ``None``, it is
      dereferenced unconditionally, so omitting it raises ``AttributeError``.
    pipeline_options: accepted but not used by this query.

  Returns:
    The collection emitted by the ``'query8_join'`` step.
  """
  # Person side: window the person events, then key them by person id.
  person_events = events | nexmark_query_util.JustPerson()
  windowed_persons = person_events | 'query8_window_person' >> beam.WindowInto(
      window.FixedWindows(metadata.get('window_size_sec')))
  keyed_persons = windowed_persons | 'query8_person_by_id' >> beam.ParDo(
      nexmark_query_util.PersonByIdFn())

  # Auction side: same windowing, then key the auctions by their seller.
  auction_events = events | nexmark_query_util.JustAuctions()
  windowed_auctions = (
      auction_events | 'query8_window_auction' >> beam.WindowInto(
          window.FixedWindows(metadata.get('window_size_sec'))))
  keyed_auctions = windowed_auctions | 'query8_auction_by_seller' >> beam.ParDo(
      nexmark_query_util.AuctionBySellerFn())

  # Co-group both sides on the shared key and emit the joined rows.
  tagged_inputs = {
      nexmark_query_util.PERSON_TAG: keyed_persons,
      nexmark_query_util.AUCTION_TAG: keyed_auctions,
  }
  grouped = tagged_inputs | beam.CoGroupByKey()
  return grouped | 'query8_join' >> beam.ParDo(JoinPersonAuctionFn())
class JoinPersonAuctionFn(beam.DoFn):
  """Turn one co-grouped (key, {tag: values}) element into result rows.

  For a key that has at least one person in the group, one row is produced
  per auction in the same group, combining the first person's id and name
  with the auction's reserve.
  """
  def process(self, element):
    """Yield one result dict per auction for the group's first person.

    Args:
      element: a ``(key, group)`` pair where ``group`` is indexable by
        ``nexmark_query_util.PERSON_TAG`` and ``nexmark_query_util.AUCTION_TAG``.

    Yields:
      Dicts keyed by ``ResultNames.ID``, ``ResultNames.NAME`` and
      ``ResultNames.RESERVE``.
    """
    _key, tagged_values = element
    new_people = tagged_values[nexmark_query_util.PERSON_TAG]
    # Nothing to emit when this seller id is not a new person in this window.
    if not new_people or new_people[0] is None:
      return
    seller = new_people[0]
    yield from ({
        ResultNames.ID: seller.id,
        ResultNames.NAME: seller.name,
        ResultNames.RESERVE: listing.reserve
    } for listing in tagged_values[nexmark_query_util.AUCTION_TAG])