#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Nexmark model.
The nexmark suite is a series of queries (streaming pipelines) performed
on a simulation of auction events. The model includes the three roles that
generate events:
  - The person who starts and auction or makes a bid (Person).
  - The auction item (Auction).
  - The bid on an item for auction (Bid).
"""
from apache_beam.coders import coder_impl
from apache_beam.coders.coders import FastCoder
from apache_beam.coders.coders import StrUtf8Coder
from apache_beam.testing.benchmarks.nexmark import nexmark_util
[docs]class PersonCoder(FastCoder):
[docs]  def to_type_hint(self):
    return Person 
  def _create_impl(self):
    return PersonCoderImpl()
[docs]  def is_deterministic(self):
    return True  
[docs]class Person(object):
  "Author of an auction or a bid."
  CODER = PersonCoder()
  def __init__(
      self, id, name, email, credit_card, city, state, date_time, extra=None):
    self.id = id
    self.name = name
    self.email_address = email  # key
    self.credit_card = credit_card
    self.city = city
    self.state = state
    self.date_time = date_time
    self.extra = extra
  def __repr__(self):
    return nexmark_util.model_to_json(self) 
[docs]class AuctionCoder(FastCoder):
[docs]  def to_type_hint(self):
    return Auction 
  def _create_impl(self):
    return AuctionCoderImpl()
[docs]  def is_deterministic(self):
    return True  
[docs]class Auction(object):
  "Item for auction."
  CODER = AuctionCoder()
  def __init__(
      self,
      id,
      item_name,
      description,
      initial_bid,
      reserve_price,
      date_time,
      expires,
      seller,
      category,
      extra=None):
    self.id = id
    self.item_name = item_name  # key
    self.description = description
    self.initial_bid = initial_bid
    self.reserve = reserve_price
    self.date_time = date_time
    self.expires = expires
    self.seller = seller
    self.category = category
    self.extra = extra
  def __repr__(self):
    return nexmark_util.model_to_json(self) 
[docs]class BidCoder(FastCoder):
[docs]  def to_type_hint(self):
    return Bid 
  def _create_impl(self):
    return BidCoderImpl()
[docs]  def is_deterministic(self):
    return True  
[docs]class Bid(object):
  "A bid for an item for auction."
  CODER = BidCoder()
  def __init__(self, auction, bidder, price, date_time, extra=None):
    self.auction = auction  # key
    self.bidder = bidder
    self.price = price
    self.date_time = date_time
    self.extra = extra
  def __repr__(self):
    return nexmark_util.model_to_json(self) 
[docs]class AuctionCoderImpl(coder_impl.StreamCoderImpl):
  _int_coder_impl = coder_impl.VarIntCoderImpl()
  _str_coder_impl = StrUtf8Coder().get_impl()
  _time_coder_impl = coder_impl.TimestampCoderImpl()
[docs]  def encode_to_stream(self, value, stream, nested):
    self._int_coder_impl.encode_to_stream(value.id, stream, True)
    self._str_coder_impl.encode_to_stream(value.item_name, stream, True)
    self._str_coder_impl.encode_to_stream(value.description, stream, True)
    self._int_coder_impl.encode_to_stream(value.initial_bid, stream, True)
    self._int_coder_impl.encode_to_stream(value.reserve, stream, True)
    self._time_coder_impl.encode_to_stream(value.date_time, stream, True)
    self._time_coder_impl.encode_to_stream(value.expires, stream, True)
    self._int_coder_impl.encode_to_stream(value.seller, stream, True)
    self._int_coder_impl.encode_to_stream(value.category, stream, True)
    self._str_coder_impl.encode_to_stream(value.extra, stream, True) 
[docs]  def decode_from_stream(self, stream, nested):
    id = self._int_coder_impl.decode_from_stream(stream, True)
    item_name = self._str_coder_impl.decode_from_stream(stream, True)
    description = self._str_coder_impl.decode_from_stream(stream, True)
    initial_bid = self._int_coder_impl.decode_from_stream(stream, True)
    reserve = self._int_coder_impl.decode_from_stream(stream, True)
    date_time = self._time_coder_impl.decode_from_stream(stream, True)
    expires = self._time_coder_impl.decode_from_stream(stream, True)
    seller = self._int_coder_impl.decode_from_stream(stream, True)
    category = self._int_coder_impl.decode_from_stream(stream, True)
    extra = self._str_coder_impl.decode_from_stream(stream, True)
    return Auction(
        id,
        item_name,
        description,
        initial_bid,
        reserve,
        date_time,
        expires,
        seller,
        category,
        extra)  
[docs]class BidCoderImpl(coder_impl.StreamCoderImpl):
  _int_coder_impl = coder_impl.VarIntCoderImpl()
  _str_coder_impl = StrUtf8Coder().get_impl()
  _time_coder_impl = coder_impl.TimestampCoderImpl()
[docs]  def encode_to_stream(self, value, stream, nested):
    self._int_coder_impl.encode_to_stream(value.auction, stream, True)
    self._int_coder_impl.encode_to_stream(value.bidder, stream, True)
    self._int_coder_impl.encode_to_stream(value.price, stream, True)
    self._time_coder_impl.encode_to_stream(value.date_time, stream, True)
    self._str_coder_impl.encode_to_stream(value.extra, stream, True) 
[docs]  def decode_from_stream(self, stream, nested):
    auction = self._int_coder_impl.decode_from_stream(stream, True)
    bidder = self._int_coder_impl.decode_from_stream(stream, True)
    price = self._int_coder_impl.decode_from_stream(stream, True)
    date_time = self._time_coder_impl.decode_from_stream(stream, True)
    extra = self._str_coder_impl.decode_from_stream(stream, True)
    return Bid(auction, bidder, price, date_time, extra)  
[docs]class PersonCoderImpl(coder_impl.StreamCoderImpl):
  _int_coder_impl = coder_impl.VarIntCoderImpl()
  _str_coder_impl = StrUtf8Coder().get_impl()
  _time_coder_impl = coder_impl.TimestampCoderImpl()
[docs]  def encode_to_stream(self, value, stream, nested):
    self._int_coder_impl.encode_to_stream(value.id, stream, True)
    self._str_coder_impl.encode_to_stream(value.name, stream, True)
    self._str_coder_impl.encode_to_stream(value.email_address, stream, True)
    self._str_coder_impl.encode_to_stream(value.credit_card, stream, True)
    self._str_coder_impl.encode_to_stream(value.city, stream, True)
    self._str_coder_impl.encode_to_stream(value.state, stream, True)
    self._time_coder_impl.encode_to_stream(value.date_time, stream, True)
    self._str_coder_impl.encode_to_stream(value.extra, stream, True) 
[docs]  def decode_from_stream(self, stream, nested):
    id = self._int_coder_impl.decode_from_stream(stream, True)
    name = self._str_coder_impl.decode_from_stream(stream, True)
    email = self._str_coder_impl.decode_from_stream(stream, True)
    credit_card = self._str_coder_impl.decode_from_stream(stream, True)
    city = self._str_coder_impl.decode_from_stream(stream, True)
    state = self._str_coder_impl.decode_from_stream(stream, True)
    date_time = self._time_coder_impl.decode_from_stream(stream, True)
    extra = self._str_coder_impl.decode_from_stream(stream, True)
    return Person(id, name, email, credit_card, city, state, date_time, extra)