# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import typing
from apache_beam.io.gcp.pubsub import PubsubMessage
# This file contains the input data to be requested by the example tests, if
# needed.
[docs]
def text_data():
  return '\n'.join([
      "Fool\tThou shouldst not have been old till thou hadst",
      "\tbeen wise.",
      "KING LEAR\tNothing will come of nothing: speak again.",
      "\tNever, never, never, never, never!"
  ]) 
[docs]
def word_count_jinja_parameter_data():
  params = {
      "readFromTextTransform": {
          "path": "gs://dataflow-samples/shakespeare/kinglear.txt"
      },
      "mapToFieldsSplitConfig": {
          "language": "python", "fields": {
              "value": "1"
          }
      },
      "explodeTransform": {
          "fields": "word"
      },
      "combineTransform": {
          "group_by": "word", "combine": {
              "value": "sum"
          }
      },
      "mapToFieldsCountConfig": {
          "language": "python",
          "fields": {
              "output": "word + \" - \" + str(value)"
          }
      },
      "writeToTextTransform": {
          "path": "gs://apache-beam-testing-derrickaw/wordCounts/"
      }
  }
  return json.dumps(params) 
[docs]
def word_count_jinja_template_data(test_name: str) -> list[str]:
  if test_name == 'test_wordCountInclude_yaml':
    return [
        'apache_beam/yaml/examples/transforms/jinja/'
        'include/submodules/readFromTextTransform.yaml',
        'apache_beam/yaml/examples/transforms/jinja/'
        'include/submodules/mapToFieldsSplitConfig.yaml',
        'apache_beam/yaml/examples/transforms/jinja/'
        'include/submodules/explodeTransform.yaml',
        'apache_beam/yaml/examples/transforms/jinja/'
        'include/submodules/combineTransform.yaml',
        'apache_beam/yaml/examples/transforms/jinja/'
        'include/submodules/mapToFieldsCountConfig.yaml',
        'apache_beam/yaml/examples/transforms/jinja/'
        'include/submodules/writeToTextTransform.yaml'
    ]
  elif test_name == 'test_wordCountImport_yaml':
    return [
        'apache_beam/yaml/examples/transforms/jinja/'
        'import/macros/wordCountMacros.yaml'
    ]
  return [] 
[docs]
def iceberg_dynamic_destinations_users_data():
  return [{
      'id': 3, 'name': 'Smith', 'email': 'smith@example.com', 'zip': 'NY'
  },
          {
              'id': 4,
              'name': 'Beamberg',
              'email': 'beamberg@example.com',
              'zip': 'NY'
          }] 
[docs]
def products_csv():
  return '\n'.join([
      'transaction_id,product_name,category,price',
      'T0012,Headphones,Electronics,59.99',
      'T5034,Leather Jacket,Apparel,109.99',
      'T0024,Aluminum Mug,Kitchen,29.99',
      'T0104,Headphones,Electronics,59.99',
      'T0302,Monitor,Electronics,249.99'
  ]) 
[docs]
def spanner_orders_data():
  return [{
      'order_id': 1,
      'customer_id': 1001,
      'product_id': 2001,
      'order_date': '24-03-24',
      'order_amount': 150,
  },
          {
              'order_id': 2,
              'customer_id': 1002,
              'product_id': 2002,
              'order_date': '19-04-24',
              'order_amount': 90,
          },
          {
              'order_id': 3,
              'customer_id': 1003,
              'product_id': 2003,
              'order_date': '7-05-24',
              'order_amount': 110,
          }] 
[docs]
def shipments_data():
  return [{
      'shipment_id': 'S1',
      'customer_id': 'C1',
      'shipment_date': '2023-05-01',
      'shipment_cost': 150.0,
      'customer_name': 'Alice',
      'customer_email': 'alice@example.com'
  },
          {
              'shipment_id': 'S2',
              'customer_id': 'C2',
              'shipment_date': '2023-06-12',
              'shipment_cost': 300.0,
              'customer_name': 'Bob',
              'customer_email': 'bob@example.com'
          },
          {
              'shipment_id': 'S3',
              'customer_id': 'C1',
              'shipment_date': '2023-05-10',
              'shipment_cost': 20.0,
              'customer_name': 'Alice',
              'customer_email': 'alice@example.com'
          },
          {
              'shipment_id': 'S4',
              'customer_id': 'C4',
              'shipment_date': '2024-07-01',
              'shipment_cost': 150.0,
              'customer_name': 'Derek',
              'customer_email': 'derek@example.com'
          },
          {
              'shipment_id': 'S5',
              'customer_id': 'C5',
              'shipment_date': '2023-05-09',
              'shipment_cost': 300.0,
              'customer_name': 'Erin',
              'customer_email': 'erin@example.com'
          },
          {
              'shipment_id': 'S6',
              'customer_id': 'C4',
              'shipment_date': '2024-07-02',
              'shipment_cost': 150.0,
              'customer_name': 'Derek',
              'customer_email': 'derek@example.com'
          }] 
[docs]
def bigtable_data():
  return [{
      'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2'
  }, {
      'product_id': '2', 'product_name': 'pixel 6', 'product_stock': '4'
  }, {
      'product_id': '3', 'product_name': 'pixel 7', 'product_stock': '20'
  }, {
      'product_id': '4', 'product_name': 'pixel 8', 'product_stock': '10'
  }, {
      'product_id': '5', 'product_name': 'pixel 11', 'product_stock': '3'
  }, {
      'product_id': '6', 'product_name': 'pixel 12', 'product_stock': '7'
  }, {
      'product_id': '7', 'product_name': 'pixel 13', 'product_stock': '8'
  }, {
      'product_id': '8', 'product_name': 'pixel 14', 'product_stock': '3'
  }] 
[docs]
def bigquery_data():
  return [{
      'customer_id': 1001,
      'customer_name': 'Alice',
      'customer_email': 'alice@gmail.com'
  },
          {
              'customer_id': 1002,
              'customer_name': 'Bob',
              'customer_email': 'bob@gmail.com'
          },
          {
              'customer_id': 1003,
              'customer_name': 'Claire',
              'customer_email': 'claire@gmail.com'
          }] 
[docs]
def pubsub_messages_data():
  """
  Provides a list of PubsubMessage objects for testing.
  """
  return [
      PubsubMessage(data=b"{\"label\": \"37a\", \"rank\": 1}", attributes={}),
      PubsubMessage(data=b"{\"label\": \"37b\", \"rank\": 4}", attributes={}),
      PubsubMessage(data=b"{\"label\": \"37c\", \"rank\": 3}", attributes={}),
      PubsubMessage(data=b"{\"label\": \"37d\", \"rank\": 2}", attributes={}),
  ] 
[docs]
def pubsub_taxi_ride_events_data():
  """
  Provides a list of PubsubMessage objects for testing taxi ride events.
  """
  return [
      PubsubMessage(
          data=b"{\"ride_id\": \"1\", \"longitude\": 11.0, \"latitude\": -11.0,"
          b"\"passenger_count\": 1, \"meter_reading\": 100.0, \"timestamp\": "
          b"\"2025-01-01T00:29:00.00000-04:00\", \"ride_status\": \"pickup\"}",
          attributes={}),
      PubsubMessage(
          data=b"{\"ride_id\": \"2\", \"longitude\": 22.0, \"latitude\": -22.0,"
          b"\"passenger_count\": 2, \"meter_reading\": 100.0, \"timestamp\": "
          b"\"2025-01-01T00:30:00.00000-04:00\", \"ride_status\": \"pickup\"}",
          attributes={}),
      PubsubMessage(
          data=b"{\"ride_id\": \"1\", \"longitude\": 13.0, \"latitude\": -13.0,"
          b"\"passenger_count\": 1, \"meter_reading\": 100.0, \"timestamp\": "
          b"\"2025-01-01T00:31:00.00000-04:00\", \"ride_status\": \"enroute\"}",
          attributes={}),
      PubsubMessage(
          data=b"{\"ride_id\": \"2\", \"longitude\": 24.0, \"latitude\": -24.0,"
          b"\"passenger_count\": 2, \"meter_reading\": 100.0, \"timestamp\": "
          b"\"2025-01-01T00:32:00.00000-04:00\", \"ride_status\": \"enroute\"}",
          attributes={}),
      PubsubMessage(
          data=b"{\"ride_id\": \"3\", \"longitude\": 33.0, \"latitude\": -33.0,"
          b"\"passenger_count\": 3, \"meter_reading\": 100.0, \"timestamp\": "
          b"\"2025-01-01T00:35:00.00000-04:00\", \"ride_status\": \"enroute\"}",
          attributes={}),
      PubsubMessage(
          data=b"{\"ride_id\": \"4\", \"longitude\": 44.0, \"latitude\": -44.0,"
          b"\"passenger_count\": 4, \"meter_reading\": 100.0, \"timestamp\": "
          b"\"2025-01-01T00:35:00.00000-04:00\", \"ride_status\": \"dropoff\"}",
          attributes={}),
      PubsubMessage(
          data=b"{\"ride_id\": \"1\", \"longitude\": 15.0, \"latitude\": -15.0,"
          b"\"passenger_count\": 1, \"meter_reading\": 100.0, \"timestamp\": "
          b"\"2025-01-01T00:33:00.00000-04:00\", \"ride_status\": \"dropoff\"}",
          attributes={}),
      PubsubMessage(
          data=b"{\"ride_id\": \"2\", \"longitude\": 26.0, \"latitude\": -26.0,"
          b"\"passenger_count\": 2, \"meter_reading\": 100.0, \"timestamp\": "
          b"\"2025-01-01T00:34:00.00000-04:00\", \"ride_status\": \"dropoff\"}",
          attributes={}),
  ] 
[docs]
def kafka_messages_data():
  """
  Provides a list of Kafka messages for testing.
  """
  return [data.encode('utf-8') for data in text_data().split('\n')] 
[docs]
class TaxiRideEventSchema(typing.NamedTuple):
  ride_id: str
  longitude: float
  latitude: float
  passenger_count: int
  meter_reading: float
  timestamp: str
  ride_status: str 
[docs]
def system_logs_csv():
  return '\n'.join([
      'LineId,Date,Time,Level,Process,Component,Content',
      '1,2024-10-01,12:00:00,INFO,Main,ComponentA,System started successfully',
      '2,2024-10-01,12:00:05,WARN,Main,ComponentA,Memory usage is high',
      '3,2024-10-01,12:00:10,ERROR,Main,ComponentA,Task failed due to timeout',
  ]) 
[docs]
def system_logs_data():
  csv_data = system_logs_csv()
  lines = csv_data.strip().split('\n')
  headers = lines[0].split(',')
  logs = []
  for row in lines[1:]:
    values = row.split(',')
    log = dict(zip(headers, values))
    log['LineId'] = int(log['LineId'])
    logs.append(log)
  return logs 
[docs]
def embedding_data():
  return [0.1, 0.2, 0.3, 0.4, 0.5] 
[docs]
def system_logs_embedding_data():
  csv_data = system_logs_csv()
  lines = csv_data.strip().split('\n')
  headers = lines[0].split(',')
  headers.append('embedding')
  logs = []
  for row in lines[1:]:
    values = row.split(',')
    values.append(embedding_data())
    log = dict(zip(headers, values))
    log['LineId'] = int(log['LineId'])
    logs.append(log)
  return logs