Beam YAML Examples

Example pipelines using the Beam YAML API. These examples can also be found on GitHub.

Note: The examples below are automatically tested for correctness and may be used as a starting point for your own pipelines.


Wordcount Minimal

This example reads from a public file stored on Google Cloud. This requires authenticating with Google Cloud, or changing the path in ReadFromText to point to a local file.

To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc.
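
If you would rather not authenticate with Google Cloud, the read step can instead point at a local copy of the file, for example (a sketch; the local path is hypothetical):

    - type: ReadFromText
      config:
        path: /path/to/kinglear.txt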

This pipeline reads in a text file, counts distinct words found in the text, then logs a row containing each word and its count.

pipeline:
  type: chain
  transforms:

    # Read text file into a collection of rows, each with one field, "line"
    - type: ReadFromText
      config:
        path: gs://dataflow-samples/shakespeare/kinglear.txt

    # Split line field in each row into list of words
    - type: MapToFields
      config:
        language: python
        fields:
          words:
            callable: |
              import re
              def my_mapping(row):
                return re.findall(r"[A-Za-z\']+", row.line.lower())

    # Explode each list of words into separate rows
    - type: Explode
      config:
        fields: words

    # Since each word is now in its own row, rename the field to "word"
    - type: MapToFields
      config:
        fields:
          word: words

    # Group by distinct words in the collection and add a field "count" that
    # contains the number of instances of each word in the collection.
    - type: Combine
      config:
        language: python
        group_by: word
        combine:
          count:
            value: word
            fn: count

    # Log out results
    - type: LogForTesting

# Expected:
#  Row(word='king', count=311)
#  Row(word='lear', count=253)
#  Row(word='dramatis', count=1)
#  Row(word='personae', count=1)
#  Row(word='of', count=483)
#  Row(word='britain', count=2)
#  Row(word='france', count=32)
#  Row(word='duke', count=26)
#  Row(word='burgundy', count=20)
#  Row(word='cornwall', count=75)

Aggregation


Combine Count Minimal

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Create produce names
      config:
        elements:
          - season: 'spring'
            produce: '๐Ÿ“'
          - season: 'spring'
            produce: '๐Ÿฅ•'
          - season: 'summer'
            produce: '๐Ÿฅ•'
          - season: 'fall'
            produce: '๐Ÿฅ•'
          - season: 'spring'
            produce: '๐Ÿ†'
          - season: 'winter'
            produce: '๐Ÿ†'
          - season: 'spring'
            produce: '๐Ÿ…'
          - season: 'summer'
            produce: '๐Ÿ…'
          - season: 'fall'
            produce: '๐Ÿ…'
          - season: 'summer'
            produce: '๐ŸŒฝ'
    - type: Combine
      name: Count elements per key
      config:
        language: python
        group_by: season
        combine:
          produce: count
    - type: LogForTesting

# Expected:
#  Row(season='spring', produce=4)
#  Row(season='summer', produce=3)
#  Row(season='fall', produce=2)
#  Row(season='winter', produce=1)
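
Here `produce: count` is the short form of a combine spec: the output field produce counts the values of the input field of the same name. It corresponds roughly to the longer form used in the Wordcount example above:

        combine:
          produce:
            value: produce
            fn: count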

Combine Max Minimal

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Create produce
      config:
        elements:
          - produce: '๐Ÿฅ•'
            amount: 3
          - produce: '๐Ÿฅ•'
            amount: 2
          - produce: '๐Ÿ†'
            amount: 1
          - produce: '๐Ÿ…'
            amount: 4
          - produce: '๐Ÿ…'
            amount: 5
          - produce: '๐Ÿ…'
            amount: 3
    - type: Combine
      name: Get max value per key
      config:
        language: python
        group_by: produce
        combine:
          amount: max
    - type: LogForTesting

# Expected:
#  Row(produce='๐Ÿฅ•', amount=3)
#  Row(produce='๐Ÿ†', amount=1)
#  Row(produce='๐Ÿ…', amount=5)

Combine Mean Minimal

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Create produce
      config:
        elements:
          - produce: '๐Ÿฅ•'
            amount: 3
          - produce: '๐Ÿฅ•'
            amount: 2
          - produce: '๐Ÿ†'
            amount: 1
          - produce: '๐Ÿ…'
            amount: 4
          - produce: '๐Ÿ…'
            amount: 5
          - produce: '๐Ÿ…'
            amount: 3
    - type: Combine
      name: Get mean value per key
      config:
        language: python
        group_by: produce
        combine:
          amount: mean
    - type: LogForTesting

# Expected:
#  Row(produce='๐Ÿฅ•', amount=2.5)
#  Row(produce='๐Ÿ†', amount=1.0)
#  Row(produce='๐Ÿ…', amount=4.0)

Combine Min Minimal

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Create produce
      config:
        elements:
          - produce: '๐Ÿฅ•'
            amount: 3
          - produce: '๐Ÿฅ•'
            amount: 2
          - produce: '๐Ÿ†'
            amount: 1
          - produce: '๐Ÿ…'
            amount: 4
          - produce: '๐Ÿ…'
            amount: 5
          - produce: '๐Ÿ…'
            amount: 3
    - type: Combine
      name: Get min value per key
      config:
        language: python
        group_by: produce
        combine:
          amount: min
    - type: LogForTesting

# Expected:
#  Row(produce='๐Ÿฅ•', amount=2)
#  Row(produce='๐Ÿ†', amount=1)
#  Row(produce='๐Ÿ…', amount=3)

Combine Multiple Aggregations

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Create produce
      config:
        elements:
          - recipe: 'pie'
            fruit: 'raspberry'
            quantity: 1
            unit_price: 3.50
          - recipe: 'pie'
            fruit: 'blackberry'
            quantity: 1
            unit_price: 4.00
          - recipe: 'pie'
            fruit: 'blueberry'
            quantity: 1
            unit_price: 2.00
          - recipe: 'muffin'
            fruit: 'blueberry'
            quantity: 2
            unit_price: 2.00
          - recipe: 'muffin'
            fruit: 'banana'
            quantity: 3
            unit_price: 1.00
    # Simulates a global GroupBy by creating a constant global key
    - type: MapToFields
      name: Add global key
      config:
        append: true
        language: python
        fields:
          global_key: '0'
    - type: Combine
      name: Get multiple aggregations per key
      config:
        language: python
        group_by: global_key
        combine:
          min_price: 
            value: unit_price
            fn: min
          mean_price: 
            value: unit_price
            fn: mean
          max_price: 
            value: unit_price
            fn: max
    # Removes the unwanted global key
    - type: MapToFields
      name: Remove global key
      config:
        fields:
          min_price: min_price
          mean_price: mean_price
          max_price: max_price
    - type: LogForTesting

# Expected:
#  Row(min_price=1.0, mean_price=2.5, max_price=4.0)
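
For reference, the unit prices across all five elements are 3.50, 4.00, 2.00, 2.00 and 1.00, so min_price = 1.0, mean_price = (3.50 + 4.00 + 2.00 + 2.00 + 1.00) / 5 = 2.5, and max_price = 4.0.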

Combine Sum

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Create produce
      config:
        elements:
          - recipe: 'pie'
            fruit: 'raspberry'
            quantity: 1
            unit_price: 3.50
          - recipe: 'pie'
            fruit: 'blackberry'
            quantity: 1
            unit_price: 4.00
          - recipe: 'pie'
            fruit: 'blueberry'
            quantity: 1
            unit_price: 2.00
          - recipe: 'muffin'
            fruit: 'blueberry'
            quantity: 2
            unit_price: 3.00
          - recipe: 'muffin'
            fruit: 'banana'
            quantity: 3
            unit_price: 1.00
    - type: Combine
      name: Sum values per key
      config:
        language: python
        group_by: fruit
        combine:
          total_quantity: 
            value: quantity
            fn: sum
          mean_price:
            value: unit_price
            fn: mean
    - type: LogForTesting

# Expected:
#  Row(fruit='raspberry', total_quantity=1, mean_price=3.5)
#  Row(fruit='blackberry', total_quantity=1, mean_price=4.0)
#  Row(fruit='blueberry', total_quantity=3, mean_price=2.5)
#  Row(fruit='banana', total_quantity=3, mean_price=1.0)

Combine Sum Minimal

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Create produce
      config:
        elements:
          - produce: '๐Ÿฅ•'
            amount: 3
          - produce: '๐Ÿฅ•'
            amount: 2
          - produce: '๐Ÿ†'
            amount: 1
          - produce: '๐Ÿ…'
            amount: 4
          - produce: '๐Ÿ…'
            amount: 5
          - produce: '๐Ÿ…'
            amount: 3
    - type: Combine
      name: Sum values per key
      config:
        language: python
        group_by: produce
        combine:
          amount: sum
    - type: LogForTesting

# Expected:
#  Row(produce='๐Ÿฅ•', amount=5)
#  Row(produce='๐Ÿ†', amount=1)
#  Row(produce='๐Ÿ…', amount=12)

Combine Sum Windowed

This is an example that illustrates how to use session windows and then extract windowing information for further processing.

pipeline:
  type: chain
  transforms:
    # Create some fake data.
    - type: Create
      name: CreateVisits
      config:
        elements:
          - user: alice
            timestamp: 1
          - user: alice
            timestamp: 3
          - user: bob
            timestamp: 7
          - user: bob
            timestamp: 12
          - user: bob
            timestamp: 20
          - user: alice
            timestamp: 101
          - user: alice
            timestamp: 109
          - user: alice
            timestamp: 115

    # Use the timestamp field as the element timestamp.
    # (Typically this would be assigned by the source.)
    - type: AssignTimestamps
      config:
        timestamp: timestamp

    # Group the data by user and, for each session window, count the number
    # of events in that session.
    # See https://beam.apache.org/documentation/programming-guide/#session-windows
    - type: Combine
      name: SumVisitsPerUser
      config:
        language: python
        group_by: user
        combine:
          visits:
            value: user
            fn: count
      windowing:
        type: sessions
        gap: 10s

    # Extract the implicit Beam windowing data (including what the final
    # merged session values were) into explicit fields of our rows.
    - type: ExtractWindowingInfo
      config:
        fields: [window_start, window_end, window_string]

    # Drop "short" sessions (in this case, Alice's first two visits).
    - type: Filter
      config:
        language: python
        keep: window_end - window_start > 15

    # Only keep a couple of fields.
    - type: MapToFields
      config:
        fields:
          user: user
          window_string: window_string

    - type: LogForTesting

# Expected:
#  Row(user='bob', window_string='[7.0, 30.0)')
#  Row(user='alice', window_string='[101.0, 125.0)')
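
The sessions windowing block above can be swapped for other window types using the same syntax. For example, a fixed-window variant of the aggregation (a sketch; the 60s window size is arbitrary) would look like:

    - type: Combine
      name: SumVisitsPerUser
      config:
        language: python
        group_by: user
        combine:
          visits:
            value: user
            fn: count
      windowing:
        type: fixed
        size: 60s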

Group Into Batches

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Create produce
      config:
        elements:
          - season: 'spring'
            produce: '๐Ÿ“'
          - season: 'spring'
            produce: '๐Ÿฅ•'
          - season: 'spring'
            produce: '๐Ÿ†'
          - season: 'spring'
            produce: '๐Ÿ…'
          - season: 'summer'
            produce: '๐Ÿฅ•'
          - season: 'summer'
            produce: '๐Ÿ…'
          - season: 'summer'
            produce: '๐ŸŒฝ'
          - season: 'fall'
            produce: '๐Ÿฅ•'
          - season: 'fall'
            produce: '๐Ÿ…'
          - season: 'winter'
            produce: '๐Ÿ†'
    - type: Combine
      name: Group into batches
      config:
        language: python
        group_by: season
        combine:
          produce:
            value: produce
            fn:
              type: 'apache_beam.transforms.combiners.TopCombineFn'
              config:
                n: 3
    - type: LogForTesting

# Expected:
#  Row(season='spring', produce=['๐Ÿฅ•', '๐Ÿ“', '๐Ÿ†'])
#  Row(season='summer', produce=['๐Ÿฅ•', '๐Ÿ…', '๐ŸŒฝ'])
#  Row(season='fall', produce=['๐Ÿฅ•', '๐Ÿ…'])
#  Row(season='winter', produce=['๐Ÿ†'])
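
The example above uses TopCombineFn, so each output row holds at most the top 3 produce values per season. To collect every value per key instead, one option (a sketch, assuming a CombineFn referenced by its fully qualified name can be constructed without a config block) is a list-building combiner:

    - type: Combine
      name: Group into batches
      config:
        language: python
        group_by: season
        combine:
          produce:
            value: produce
            fn:
              type: 'apache_beam.transforms.combiners.ToListCombineFn'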

Simple Filter And Combine

This example reads from a public file stored on Google Cloud. This requires authenticating with Google Cloud, or changing the path in ReadFromCsv to point to a local file.

To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc.

The following example reads mock transaction data from resources/products.csv, performs a simple filter for "Electronics", then calculates the revenue and number of products sold for each product type.

pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadInputFile
      config:
        path: gs://apache-beam-samples/beam-yaml-blog/products.csv
    - type: Filter
      name: FilterWithCategory
      input: ReadInputFile
      config:
        language: python
        keep: category == "Electronics"
    - type: Combine
      name: CountNumberSold
      input: FilterWithCategory
      config:
        group_by: product_name
        combine:
          num_sold:
            value: product_name
            fn: count
          total_revenue:
            value: price
            fn: sum
    - type: WriteToCsv
      name: WriteOutputFile
      input: CountNumberSold
      config:
        path: output

# Expected:
#  Row(product_name='Headphones', num_sold=2, total_revenue=119.98)
#  Row(product_name='Monitor', num_sold=1, total_revenue=249.99)
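
Both Headphones transactions in the source data are priced at 59.99 (see the Simple Filter example below), so num_sold = 2 and total_revenue = 2 * 59.99 = 119.98 for that product.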

Top Largest Per Key

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Create produce
      config:
        elements:
          - produce: '๐Ÿฅ•'
            amount: 3
          - produce: '๐Ÿฅ•'
            amount: 2
          - produce: '๐Ÿ†'
            amount: 1
          - produce: '๐Ÿ…'
            amount: 4
          - produce: '๐Ÿ…'
            amount: 5
          - produce: '๐Ÿ…'
            amount: 3
    - type: Combine
      config:
        language: python
        group_by: produce
        combine:
          biggest:
            value: amount
            fn:
              type: 'apache_beam.transforms.combiners.TopCombineFn'
              config:
                n: 2
    - type: LogForTesting

# Expected:
#  Row(produce='๐Ÿฅ•', biggest=[3, 2])
#  Row(produce='๐Ÿ†', biggest=[1])
#  Row(produce='๐Ÿ…', biggest=[5, 4])

Top Smallest Per Key

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Create produce
      config:
        elements:
          - produce: '๐Ÿฅ•'
            amount: 3
          - produce: '๐Ÿฅ•'
            amount: 2
          - produce: '๐Ÿ†'
            amount: 1
          - produce: '๐Ÿ…'
            amount: 4
          - produce: '๐Ÿ…'
            amount: 5
          - produce: '๐Ÿ…'
            amount: 3
    - type: Combine
      name: Smallest N values per key
      config:
        language: python
        group_by: produce
        combine:
          smallest:
            value: amount
            fn:
              type: 'apache_beam.transforms.combiners.TopCombineFn'
              config:
                n: 2
                reverse: true
    - type: LogForTesting

# Expected:
#  Row(produce='๐Ÿฅ•', smallest=[2, 3])
#  Row(produce='๐Ÿ†', smallest=[1])
#  Row(produce='๐Ÿ…', smallest=[3, 4])

Blueprint


Gcs Text To Bigquery

This is an example of a Beam YAML pipeline that reads text from a file on Google Cloud Storage and writes the lines to a BigQuery table.

pipeline:
  type: chain
  transforms:
    # Step 1: Reading data from GCS
    - type: ReadFromText
      name: ReadFromGCS
      config:
        path: gs://dataflow-samples/shakespeare/kinglear.txt
    # Step 2: Write records out to BigQuery
    - type: WriteToBigQuery
      name: WriteWords
      config:
        table: "apache-beam-testing.yaml_test.words"
        create_disposition: "CREATE_NEVER"
        write_disposition: "WRITE_APPEND"
        num_streams: 1


# Expected:
#  Row(line='Fool\tThou shouldst not have been old till thou hadst')
#  Row(line='\tbeen wise.')
#  Row(line='KING LEAR\tNothing will come of nothing: speak again.')
#  Row(line='\tNever, never, never, never, never!')

Jdbc To Bigquery

This is an example of a Beam YAML pipeline that reads records from a JDBC database and writes them to BigQuery. This matches the Dataflow Template located here - https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-bigquery

pipeline:
  type: composite
  transforms:
    # Step 1: Reading shipment data from jdbc DB
    - type: ReadFromJdbc
      name: ReadShipments
      config:
        url: "jdbc:mysql://my-host:3306/shipment"
        driver_class_name: "com.mysql.cj.jdbc.Driver"
        query: "SELECT * FROM shipments"
    # Step 2: Write successful records out to BigQuery
    - type: WriteToBigQuery
      name: WriteShipments
      input: ReadShipments
      config:
        table: "apache-beam-testing.yaml_test.shipments"
        create_disposition: "CREATE_NEVER"
        write_disposition: "WRITE_APPEND"
        error_handling:
          output: "deadLetterQueue"
        num_streams: 1
    # Step 3: Write the failed messages to BQ to a dead letter queue JSON file
    - type: WriteToJson
      input: WriteShipments.deadLetterQueue
      config:
        path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"



# Expected:
#  Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
#  Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
#  Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
#  Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')

Kafka To Iceberg

A pipeline that reads records from a Kafka topic and writes them to an Iceberg table.

pipeline:
  type: chain
  transforms:
    # Step 1: Reading data from Kafka
    - type: ReadFromKafka
      name: ReadFromMyTopic
      config:
        format: "RAW"
        topic: "{{ TOPIC }}"
        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
        auto_offset_reset_config: earliest
        consumer_config:
          sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
            username={{ USERNAME }} \
            password={{ PASSWORD }};"
          security.protocol: "SASL_PLAINTEXT"
          sasl.mechanism: "PLAIN"
    # Step 2: Convert Kafka records
    - type: MapToFields
      name: ParseKafkaRecords
      config:
        language: python
        fields:
          text:
            callable: |
              def func(row):
                # Kafka RAW format reads messages as bytes 
                # in the 'payload' field of a Row
                return row.payload.decode('utf-8')
    # Step 3: Write records out to Iceberg
    - type: WriteToIceberg
      name: WriteToAnIcebergTable
      config:
        # Dynamic destinations
        table: "db.users.{zip}"
        catalog_name: "hadoop_catalog"
        catalog_properties:
          type: "hadoop"
          warehouse: "gs://MY-WAREHOUSE"
        # Hadoop catalog config required to run pipeline locally
        # Omit if running on Dataflow
        config_properties:
          "fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
          "fs.gs.auth.service.account.json.keyfile": "/path/to/service/account/key.json"

options:
  streaming: true

# Expected:
#  Row(text='Fool\tThou shouldst not have been old till thou hadst')
#  Row(text='\tbeen wise.')
#  Row(text='KING LEAR\tNothing will come of nothing: speak again.')
#  Row(text='\tNever, never, never, never, never!')

Mysql To Bigquery

This is an example of a Beam YAML pipeline that reads from a MySQL database and writes to BigQuery.

pipeline:
  type: composite
  transforms:
    # Step 1: Reading data from MySql
    - type: ReadFromMySql
      name: ReadFromMySql
      config:
        url: "jdbc:mysql://localhost:12345/shipment?user=orange&password=orange123"
        query: "SELECT * FROM shipments"
        driver_class_name: "com.mysql.cj.jdbc.Driver"
    # Step 2: Write records out to BigQuery
    - type: WriteToBigQuery
      name: WriteShipments
      input: ReadFromMySql
      config:
        table: "apache-beam-testing.yaml_test.shipments"
        create_disposition: "CREATE_NEVER"
        write_disposition: "WRITE_APPEND"
        error_handling:
          output: "deadLetterQueue"
        num_streams: 1
    # Step 3: Write the failed messages to BQ to a dead letter queue JSON file
    - type: WriteToJson
      input: WriteShipments.deadLetterQueue
      config:
        path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"

# Expected:
#  Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
#  Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
#  Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
#  Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')

Oracle To Bigquery

This is an example of a Beam YAML pipeline that reads from an Oracle database and writes to BigQuery.

pipeline:
  type: composite
  transforms:
    # Step 1: Reading data from Oracle
    - type: ReadFromOracle
      name: ReadFromOracle
      config:
        url: "jdbc:oracle:thin:system/oracle@localhost:12345/shipment"
        query: "SELECT * FROM shipments"
        driver_class_name: "oracle.jdbc.OracleDriver"
    # Step 2: Write records out to BigQuery
    - type: WriteToBigQuery
      name: WriteShipments
      input: ReadFromOracle
      config:
        table: "apache-beam-testing.yaml_test.shipments"
        create_disposition: "CREATE_NEVER"
        write_disposition: "WRITE_APPEND"
        error_handling:
          output: "deadLetterQueue"
        num_streams: 1
    # Step 3: Write the failed messages to BQ to a dead letter queue JSON file
    - type: WriteToJson
      input: WriteShipments.deadLetterQueue
      config:
        path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"

# Expected:
#  Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
#  Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
#  Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
#  Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')

Postgres To Bigquery

This is an example of a Beam YAML pipeline that reads from a PostgreSQL database and writes to BigQuery.

pipeline:
  type: composite
  transforms:
    # Step 1: Reading data from Postgres
    - type: ReadFromPostgres
      name: ReadFromPostgres
      config:
        url: "jdbc:postgresql://localhost:12345/shipment?user=user&password=postgres123"
        query: "SELECT * FROM shipments"
        driver_class_name: "org.postgresql.Driver"
    # Step 2: Write records out to BigQuery
    - type: WriteToBigQuery
      name: WriteShipments
      input: ReadFromPostgres
      config:
        table: "apache-beam-testing.yaml_test.shipments"
        create_disposition: "CREATE_NEVER"
        write_disposition: "WRITE_APPEND"
        error_handling:
          output: "deadLetterQueue"
        num_streams: 1
    # Step 3: Write the failed messages to BQ to a dead letter queue JSON file
    - type: WriteToJson
      input: WriteShipments.deadLetterQueue
      config:
        path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"

# Expected:
#  Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
#  Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
#  Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
#  Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')

Pubsub Subscription To Bigquery

This is an example of a Beam YAML pipeline that reads messages from Pub/Sub and writes them to BigQuery. This matches the Dataflow Template located here - https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-subscription-to-bigquery

pipeline:
  type: composite
  transforms:
    # Step 1: Reading messages from Pub/Sub subscription
    - type: ReadFromPubSub
      name: ReadMessages
      config:
        subscription: "projects/apache-beam-testing/subscriptions/my-subscription"
        format: JSON
        schema: 
          type: object
          properties:
            data: {type: BYTES}
            attributes: {type: object}
    # Step 2: Write successful records out to BigQuery
    - type: WriteToBigQuery
      name: WriteMessages
      input: ReadMessages
      config:
        table: "apache-beam-testing.yaml_test.order_data"
        create_disposition: "CREATE_NEVER"
        write_disposition: "WRITE_APPEND"
        error_handling:
          output: "deadLetterQueue"
        num_streams: 1
    # Step 3: Write the failed messages to BQ to a JSON file
    - type: WriteToJson
      input: WriteMessages.deadLetterQueue
      config:
        path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"

options:
  streaming: true


# Expected:
#  Row(label='37a', rank=1)
#  Row(label='37b', rank=4)
#  Row(label='37c', rank=3)
#  Row(label='37d', rank=2)

Pubsub To Iceberg

A pipeline that reads messages from a Pub/Sub topic and writes them to an Iceberg table.

pipeline:
  type: chain
  transforms:
    # Step 1: Reading messages from Pub/Sub topic
    - type: ReadFromPubSub
      name: ReadMessages
      config:
        topic: "projects/apache-beam-testing/topics/my-topic"
        format: JSON
        schema: 
          type: object
          properties:
            data: {type: BYTES}
            attributes: {type: object}
    # Step 2: Write records out to Iceberg
    - type: WriteToIceberg
      name: WriteToAnIcebergTable
      config:
        # Dynamic destinations
        table: "db.users.{zip}"
        catalog_name: "hadoop_catalog"
        catalog_properties:
          type: "hadoop"
          warehouse: "gs://MY-WAREHOUSE"
        # Hadoop catalog config required to run pipeline locally
        # Omit if running on Dataflow
        config_properties:
          "fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
          "fs.gs.auth.service.account.json.keyfile": "/path/to/service/account/key.json"

options:
  streaming: true

# Expected:
#  Row(label='37a', rank=1)
#  Row(label='37b', rank=4)
#  Row(label='37c', rank=3)
#  Row(label='37d', rank=2)

Pubsub Topic To Bigquery

This is an example of a Beam YAML pipeline that reads messages from Pub/Sub and writes them to BigQuery. This matches the Dataflow Template located here - https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery

pipeline:
  type: composite
  transforms:
    # Step 1: Reading messages from Pub/Sub topic
    - type: ReadFromPubSub
      name: ReadMessages
      config:
        topic: "projects/apache-beam-testing/topics/my-topic"
        format: JSON
        schema: 
          type: object
          properties:
            data: {type: BYTES}
            attributes: {type: object}
    # Step 2: Write successful records out to BigQuery
    - type: WriteToBigQuery
      name: WriteMessages
      input: ReadMessages
      config:
        table: "apache-beam-testing.yaml_test.order_data"
        create_disposition: "CREATE_NEVER"
        write_disposition: "WRITE_APPEND"
        error_handling:
          output: "deadLetterQueue"
        num_streams: 1
    # Step 3: Write the failed messages to BQ to a JSON file
    - type: WriteToJson
      input: WriteMessages.deadLetterQueue
      config:
        path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"

options:
  streaming: true


# Expected:
#  Row(label='37a', rank=1)
#  Row(label='37b', rank=4)
#  Row(label='37c', rank=3)
#  Row(label='37d', rank=2)

Spanner To Avro

This is an example of a Beam YAML pipeline that reads from a Spanner database and writes to Avro files on GCS. This matches the Dataflow Template located here - https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-to-avro

pipeline:
  type: chain
  transforms:
    # Step 1: Reading shipment data from Spanner DB
    - type: ReadFromSpanner
      name: ReadShipments
      config:
        project_id: 'apache-beam-testing'
        instance_id: 'shipment-test'
        database_id: 'shipment'
        query: 'SELECT * FROM shipments'
    # Step 2: Write successful records out to GCS avro files
    - type: WriteToAvro
      config:
        path: "gs://my-bucket/yaml-123/out.avro"


# Expected:
#  Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
#  Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
#  Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
#  Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')

Spanner To Bigquery

This is an example of a Beam YAML pipeline that reads from a Spanner database and writes to BigQuery.

pipeline:
  type: chain
  transforms:
    # Step 1: Reading data from Spanner
    - type: ReadFromSpanner
      name: ReadShipments
      config:
        project_id: 'apache-beam-testing'
        instance_id: 'shipment-test'
        database_id: 'shipment'
        query: 'SELECT * FROM shipments'
    # Step 2: Write records out to BigQuery
    - type: WriteToBigQuery
      name: WriteShipments
      config:
        table: "apache-beam-testing.yaml_test.shipments"
        create_disposition: "CREATE_NEVER"
        write_disposition: "WRITE_APPEND"
        num_streams: 1

# Expected:
#  Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
#  Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
#  Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
#  Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')

Sqlserver To Bigquery

This is an example of a Beam YAML pipeline that reads from a SQL Server database and writes to BigQuery.

pipeline:
  type: composite
  transforms:
    # Step 1: Reading data from SqlServer
    - type: ReadFromSqlServer
      name: ReadFromSqlServer
      config:
        url: "jdbc:sqlserver://localhost:12345;databaseName=shipment;user=apple;password=apple123;encrypt=false;trustServerCertificate=true"
        query: "SELECT * FROM shipments"
        driver_class_name: "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    # Step 2: Write records out to BigQuery
    - type: WriteToBigQuery
      name: WriteShipments
      input: ReadFromSqlServer
      config:
        table: "apache-beam-testing.yaml_test.shipments"
        create_disposition: "CREATE_NEVER"
        write_disposition: "WRITE_APPEND"
        error_handling:
          output: "deadLetterQueue"
        num_streams: 1
    # Step 3: Write the failed messages to BQ to a dead letter queue JSON file
    - type: WriteToJson
      input: WriteShipments.deadLetterQueue
      config:
        path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"

# Expected:
#  Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
#  Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
#  Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
#  Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
#  Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')

Elementwise


Explode

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Gardening plants
      config:
        elements:
          - produce: ['๐Ÿ“Strawberry', '๐Ÿฅ”Potato']
            season: 'spring'
          - produce: ['๐Ÿฅ•Carrot', '๐Ÿ†Eggplant', '๐Ÿ…Tomato']
            season: 'summer'
    - type: Explode
      name: Flatten lists
      config:
        fields: [produce]
    - type: LogForTesting

# Expected:
#  Row(produce='๐Ÿ“Strawberry', season='spring')
#  Row(produce='๐Ÿฅ•Carrot', season='summer')
#  Row(produce='๐Ÿ†Eggplant', season='summer')
#  Row(produce='๐Ÿ…Tomato', season='summer')
#  Row(produce='๐Ÿฅ”Potato', season='spring')

Filter Callable

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Gardening plants
      config:
        elements:
          - 'icon': '๐Ÿ“'
            'name': 'Strawberry'
            'duration': 'perennial'
          - 'icon': '๐Ÿฅ•'
            'name': 'Carrot'
            'duration': 'biennial'
          - 'icon': '๐Ÿ†'
            'name': 'Eggplant'
            'duration': 'perennial'
          - 'icon': '๐Ÿ…'
            'name': 'Tomato'
            'duration': 'annual'
          - 'icon': '๐Ÿฅ”'
            'name': 'Potato'
            'duration': 'perennial'
    - type: Filter
      name: Filter perennials
      config:
        language: python
        keep:
          callable: "lambda plant: plant.duration == 'perennial'"
    - type: LogForTesting

# Expected:
#  Row(icon='๐Ÿ“', name='Strawberry', duration='perennial')
#  Row(icon='๐Ÿ†', name='Eggplant', duration='perennial')
#  Row(icon='๐Ÿฅ”', name='Potato', duration='perennial')

Map To Fields Callable

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Gardening plants
      config:
        elements:
          - '# ๐Ÿ“Strawberry'
          - '# ๐Ÿฅ•Carrot'
          - '# ๐Ÿ†Eggplant'
          - '# ๐Ÿ…Tomato'
          - '# ๐Ÿฅ”Potato'
    - type: MapToFields
      name: Strip header
      config:
        language: python
        fields:
          element:
            callable: "lambda row: row.element.strip('# \\n')"
    - type: LogForTesting

# Expected:
#  Row(element='๐Ÿ“Strawberry')
#  Row(element='๐Ÿฅ•Carrot')
#  Row(element='๐Ÿ†Eggplant')
#  Row(element='๐Ÿ…Tomato')
#  Row(element='๐Ÿฅ”Potato')

Map To Fields With Deps

pipeline:
  type: chain
  transforms:
    - type: Create
      config:
        elements:
          - {sdk: MapReduce, year: 2004}
          - {sdk: MillWheel, year: 2008}
          - {sdk: Flume, year: 2010}
          - {sdk: Dataflow, year: 2014}
          - {sdk: Apache Beam, year: 2016}
    - type: MapToFields
      name: ToRoman
      config:
        language: python
        fields:
          tool_name: sdk
          year:
            callable: |
              import roman

              def convert(row):
                return roman.toRoman(row.year)
        dependencies:
          - 'roman>=4.2'
    - type: LogForTesting

# Expected:
#  Row(tool_name='MapReduce', year='MMIV')
#  Row(tool_name='MillWheel', year='MMVIII')
#  Row(tool_name='Flume', year='MMX')
#  Row(tool_name='Dataflow', year='MMXIV')
#  Row(tool_name='Apache Beam', year='MMXVI')

Map To Fields With Java Deps

pipeline:
  type: chain
  transforms:
    - type: Create
      config:
        elements:
          - {sdk: MapReduce, year: 2004}
          - {sdk: MillWheel, year: 2008}
          - {sdk: Flume, year: 2010}
          - {sdk: Dataflow, year: 2014}
          - {sdk: Apache Beam, year: 2016}
    - type: MapToFields
      name: ToRoman
      config:
        language: java
        fields:
          tool_name: sdk
          year:
            callable: |
              import org.apache.beam.sdk.values.Row;
              import java.util.function.Function;
              import com.github.chaosfirebolt.converter.RomanInteger;

              public class MyFunction implements Function<Row, String> {
                public String apply(Row row) {
                  return RomanInteger.parse(
                      String.valueOf(row.getInt64("year"))).toString();
                }
              }
        dependencies:
          - 'com.github.chaosfirebolt.converter:roman-numeral-converter:2.1.0'
    - type: LogForTesting

# Expected:
#  Row(tool_name='MapReduce', year='MMIV')
#  Row(tool_name='MillWheel', year='MMVIII')
#  Row(tool_name='Flume', year='MMX')
#  Row(tool_name='Dataflow', year='MMXIV')
#  Row(tool_name='Apache Beam', year='MMXVI')

Map To Fields With Resource Hints

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Gardening plants
      config:
        elements:
          - '# ๐Ÿ“Strawberry'
          - '# ๐Ÿฅ•Carrot'
          - '# ๐Ÿ†Eggplant'
          - '# ๐Ÿ…Tomato'
          - '# ๐Ÿฅ”Potato'
    - type: MapToFields
      name: Strip header
      config:
        language: python
        fields:
          element: "element.strip('# \\n')"
      # Transform-specific hint overrides top-level hint.
      resource_hints:
        min_ram: 3GB
    - type: LogForTesting
  # Top-level resource hint.
  resource_hints:
    min_ram: 1GB

# Expected:
#  Row(element='๐Ÿ“Strawberry')
#  Row(element='๐Ÿฅ•Carrot')
#  Row(element='๐Ÿ†Eggplant')
#  Row(element='๐Ÿ…Tomato')
#  Row(element='๐Ÿฅ”Potato')

Regex Matches

This pipeline creates a series of {plant: description} pairs, matches each element against a regex, filters out the non-matching entries, then logs the output.

pipeline:
  type: chain
  transforms:
    - type: Create
      name: Garden plants
      config:
        elements:
          - plant: '๐Ÿ“, Strawberry, perennial'
          - plant:  '๐Ÿฅ•, Carrot, biennial ignoring trailing words'
          - plant:  '๐Ÿ†, Eggplant, perennial'
          - plant:  '๐Ÿ…, Tomato, annual'
          - plant:  '๐Ÿฅ”, Potato, perennial'
          - plant:  '# ๐ŸŒ, invalid, format'
          - plant:  'invalid, ๐Ÿ‰, format'
    - type: MapToFields
      name: Parse plants
      config:
        language: python
        fields:
          plant:
            callable: |
              import re
              def regex_filter(row):
                match = re.match(r"(?P<icon>[^\s,]+), *(\w+), *(\w+)", row.plant)
                return match.group(0) if match else match

    # Filter out the None values produced by entries that don't match the regex
    - type: Filter
      config:
        language: python
        keep: plant
    - type: LogForTesting

# Expected:
#  Row(plant='๐Ÿ“, Strawberry, perennial')
#  Row(plant='๐Ÿฅ•, Carrot, biennial')
#  Row(plant='๐Ÿ†, Eggplant, perennial')
#  Row(plant='๐Ÿ…, Tomato, annual')
#  Row(plant='๐Ÿฅ”, Potato, perennial')

Simple Filter

This example reads from a public file stored on Google Cloud. This requires authenticating with Google Cloud, or changing the path in ReadFromCsv to point to a local file.

To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc.

The following example reads mock transaction data from resources/products.csv then performs a simple filter for "Electronics".

pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadInputFile
      config:
        path: gs://apache-beam-samples/beam-yaml-blog/products.csv
    - type: Filter
      name: FilterWithCategory
      input: ReadInputFile
      config:
        language: python
        keep: category == "Electronics"
    - type: WriteToCsv
      name: WriteOutputFile
      input: FilterWithCategory
      config:
        path: output

# Expected:
#  Row(transaction_id='T0012', product_name='Headphones', category='Electronics', price=59.99)
#  Row(transaction_id='T0104', product_name='Headphones', category='Electronics', price=59.99)
#  Row(transaction_id='T0302', product_name='Monitor', category='Electronics', price=249.99)

IO


Iceberg Read

The pipeline reads from the Iceberg table 'db.users.NY' on GCS with a Hadoop catalog configured. If the table does not already exist, it can be created and populated using the iceberg_write.yaml pipeline.

Replace 'gs://MY-WAREHOUSE' with the correct GCS bucket name. If this example is run locally, replace '/path/to/service/account/key.json' with the correct path to your service account key .json file on your machine. If the Dataflow runner is used, omit the 'config_properties' field.

pipeline:
  type: chain
  transforms:
    - type: ReadFromIceberg
      name: ReadFromAnIcebergTable
      config:
        table: "db.users.NY"
        catalog_name: "hadoop_catalog"
        catalog_properties:
          type: "hadoop"
          warehouse: "gs://MY-WAREHOUSE"
        # Hadoop catalog config required to run pipeline locally
        # Omit if running on Dataflow
        config_properties:
          "fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
          "fs.gs.auth.service.account.json.keyfile": "/path/to/service/account/key.json"

    - type: LogForTesting

    - type: WriteToCsv
      name: OutputToCSVFile
      config:
        path: "gs://MY-WAREHOUSE/my-csv.csv"

# Expected:
#  Row(id=3, name='Smith', email='smith@example.com', zip='NY')
#  Row(id=4, name='Beamberg', email='beamberg@example.com', zip='NY')

Iceberg Write

The pipeline uses Dynamic destinations (see https://cloud.google.com/dataflow/docs/guides/managed-io#dynamic-destinations) to dynamically create and select a table destination based on field values in the incoming records.

Replace 'gs://MY-WAREHOUSE' with the correct GCS bucket name. If this example is run locally, replace '/path/to/service/account/key.json' with the correct path to your service account key .json file on your machine. If the Dataflow runner is used, omit the 'config_properties' field.

pipeline:
  type: chain
  transforms:
    - type: Create
      name: CreateSampleData
      config:
        elements:
          - { id: 1, name: "John", email: "john@example.com", zip: "WA" }
          - { id: 2, name: "Jane", email: "jane@example.com", zip: "CA" }
          - { id: 3, name: "Smith", email: "smith@example.com",zip: "NY"}
          - { id: 4, name: "Beamberg", email: "beamberg@example.com", zip: "NY" }

    - type: LogForTesting

    - type: WriteToIceberg
      name: WriteToAnIcebergTable
      config:
        # Dynamic destinations
        table: "db.users.{zip}"
        catalog_name: "hadoop_catalog"
        catalog_properties:
          type: "hadoop"
          warehouse: "gs://MY-WAREHOUSE"
        # Hadoop catalog config required to run pipeline locally
        # Omit if running on Dataflow
        config_properties:
          "fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
          "fs.gs.auth.service.account.json.keyfile": "/path/to/service/account/key.json"

# Expected:
#  Row(id=1, name='John', email='john@example.com', zip='WA')
#  Row(id=2, name='Jane', email='jane@example.com', zip='CA')
#  Row(id=3, name='Smith', email='smith@example.com', zip='NY')
#  Row(id=4, name='Beamberg', email='beamberg@example.com', zip='NY')
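
With these sample rows, the dynamic destination "db.users.{zip}" resolves to db.users.WA for John, db.users.CA for Jane, and db.users.NY for both Smith and Beamberg, so records are written to three separate tables.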

Kafka

A pipeline that both writes to and reads from the same Kafka topic.

pipeline:
  transforms:
    - type: ReadFromText
      name: ReadFromGCS
      config:
        path: gs://dataflow-samples/shakespeare/kinglear.txt

    - type: MapToFields
      name: BuildKafkaRecords
      input: ReadFromGCS
      config:
        language: python
        fields:
          value:
            callable: |
              def func(row):
                return row.line.encode('utf-8')
            output_type: bytes

    - type: WriteToKafka
      name: SendRecordsToKafka
      input: BuildKafkaRecords
      config:
        format: "RAW"
        topic: "{{ TOPIC }}"
        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
        producer_config_updates:
          sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
            username={{ USERNAME }} \
            password={{ PASSWORD }};"
          security.protocol: "SASL_PLAINTEXT"
          sasl.mechanism: "PLAIN"

    - type: ReadFromKafka
      name: ReadFromMyTopic
      config:
        format: "RAW"
        topic: "{{ TOPIC }}"
        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
        auto_offset_reset_config: earliest
        consumer_config:
          sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
            username={{ USERNAME }} \
            password={{ PASSWORD }};"
          security.protocol: "SASL_PLAINTEXT"
          sasl.mechanism: "PLAIN"

    - type: MapToFields
      name: ParseKafkaRecords
      input: ReadFromMyTopic
      config:
        language: python
        fields:
          text:
            callable: |
              def func(row):
                # Kafka RAW format reads messages as bytes 
                # in the 'payload' field of a Row
                return row.payload.decode('utf-8')

    - type: LogForTesting
      input: ParseKafkaRecords

# Since the pipeline both writes to and reads from a Kafka topic, we expect
#   the first pipeline component to write the rows containing the `value`
#   field as bytes to Kafka, and the second pipeline component to read the byte
#   messages from Kafka before parsing them as string in the new `text` field.
# Expected:
#  Row(value=b'Fool\tThou shouldst not have been old till thou hadst')
#  Row(value=b'\tbeen wise.')
#  Row(value=b'KING LEAR\tNothing will come of nothing: speak again.')
#  Row(value=b'\tNever, never, never, never, never!')
#  Row(text='Fool\tThou shouldst not have been old till thou hadst')
#  Row(text='\tbeen wise.')
#  Row(text='KING LEAR\tNothing will come of nothing: speak again.')
#  Row(text='\tNever, never, never, never, never!')

Spanner Read

pipeline:
  transforms:

    # Read data from a Spanner database. The table used here has the following columns:
    # shipment_id (String), customer_id (String), shipment_date (String), shipment_cost (Float64), customer_name (String), customer_email (String)
    # The ReadFromSpanner transform is configured with a project_id, instance_id, database_id and a query.
    # A table with a list of columns can also be specified instead of a query (see the sketch after this example).
    - type: ReadFromSpanner
      name: ReadShipments
      config:
        project_id: 'apache-beam-testing'
        instance_id: 'shipment-test'
        database_id: 'shipment'
        query: 'SELECT * FROM shipments'

    # Filtering the data based on a specific condition
    # Here, the condition is used to keep only the rows where the customer_id is 'C1'
    - type: Filter
      name: FilterShipments
      input: ReadShipments
      config:
        language: python
        keep: "customer_id == 'C1'"

    # Mapping the data fields and applying transformations
    # A new field 'shipment_cost_category' is added with a custom transformation
    # A callable is defined to categorize shipment cost
    - type: MapToFields
      name: MapFieldsForSpanner
      input: FilterShipments
      config:
        language: python
        fields:
          shipment_id: shipment_id
          customer_id: customer_id
          shipment_date: shipment_date
          shipment_cost: shipment_cost
          customer_name: customer_name
          customer_email: customer_email
          shipment_cost_category:
            callable: |
              def categorize_cost(row):
                  cost = float(row[3])
                  if cost < 50:
                      return 'Low Cost'
                  elif cost < 200:
                      return 'Medium Cost'
                  else:
                      return 'High Cost'

    # Writing the transformed data to a CSV file
    - type: WriteToCsv
      name: WriteBig
      input: MapFieldsForSpanner
      config:
        path: shipments.csv


# On executing the above pipeline, a new CSV file is created with the following records
# Expected:
#  Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com', shipment_cost_category='Medium Cost')
#  Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com', shipment_cost_category='Low Cost')
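
As noted in the comments above, ReadFromSpanner can also be pointed at a table with a list of columns instead of a query. A sketch of that configuration (the exact field names, here assumed to be table_id and columns, should be checked against the ReadFromSpanner provider documentation):

    - type: ReadFromSpanner
      name: ReadShipments
      config:
        project_id: 'apache-beam-testing'
        instance_id: 'shipment-test'
        database_id: 'shipment'
        table_id: 'shipments'  # assumed field name
        columns: ['shipment_id', 'customer_id', 'shipment_cost']  # assumed field name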

Spanner Write

pipeline:
  transforms:

    # Step 1: Creating rows to be written to Spanner
    # The element names correspond to the column names in the Spanner table
    - type: Create
      name: CreateRows
      config:
        elements:
          - shipment_id: "S5"
            customer_id: "C5"
            shipment_date: "2023-05-09"
            shipment_cost: 300.0
            customer_name: "Erin"
            customer_email: "erin@example.com"

    # Step 2: Writing the created rows to a Spanner database
    # We require the project ID, instance ID, database ID and table ID to connect to Spanner
    # Error handling can be specified optionally to ensure any failed operations aren't lost
    # The failed data is passed on in the pipeline and can be handled
    - type: WriteToSpanner
      name: WriteSpanner
      input: CreateRows
      config:
        project_id: 'apache-beam-testing'
        instance_id: 'shipment-test'
        database_id: 'shipment'
        table_id: 'shipments'
        error_handling:
          output: my_error_output

    # Step 3: Writing the failed records to a JSON file
    - type: WriteToJson
      input: WriteSpanner.my_error_output
      config:
        path: errors.json

# Expected:
#  Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')

ML


Bigtable Enrichment

pipeline:
  type: chain
  transforms:

  # Step 1: Creating a collection of elements that needs
  # to be enriched. Here we are simulating sales data
    - type: Create
      config:
        elements:
        - sale_id: 1
          customer_id: 1
          product_id: 1
          quantity: 1

  # Step 2: Enriching the data with Bigtable
  # This specific Bigtable table stores product data in the following format:
  # product:product_id, product:product_name, product:product_stock
    - type: Enrichment
      config:
        enrichment_handler: 'BigTable'
        handler_config:
          project_id: 'apache-beam-testing'
          instance_id: 'beam-test'
          table_id: 'bigtable-enrichment-test'
          row_key: 'product_id'
        timeout: 30

  # Step 3: Logging for testing
  # This is a simple way to view the enriched data
  # We can also store it somewhere like a json file
    - type: LogForTesting

options:
  yaml_experimental_features: Enrichment

# Expected:
#  Row(sale_id=1, customer_id=1, product_id=1, quantity=1, product={'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2'})
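
If downstream steps need the enriched values as top-level fields, a follow-up MapToFields can pull them out of the product field. A sketch, assuming the product field behaves like the dict shown in the expected output above:

    - type: MapToFields
      name: FlattenProduct
      config:
        language: python
        fields:
          sale_id: sale_id
          quantity: quantity
          product_name:
            callable: 'lambda row: row.product["product_name"]'
          product_stock:
            callable: 'lambda row: row.product["product_stock"]'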

Enrich Spanner With Bigquery

pipeline:
  transforms:
    # Step 1: Read orders details from Spanner
    - type: ReadFromSpanner
      name: ReadOrders
      config:
        project_id: 'apache-beam-testing'
        instance_id: 'orders-test'
        database_id: 'order-database'
        query: 'SELECT customer_id, product_id, order_date, order_amount FROM orders'

    # Step 2: Enrich order details with customers details from BigQuery
    - type: Enrichment
      name: Enriched
      input: ReadOrders
      config:
        enrichment_handler: 'BigQuery'
        handler_config:
          project: "apache-beam-testing"
          table_name: "apache-beam-testing.ALL_TEST.customers"
          row_restriction_template: "customer_id = 1001 or customer_id = 1003"
          fields: ["customer_id"]

    # Step 3: Map enriched values to Beam schema
    # TODO: This should be removed when schema'd enrichment is available
    - type: MapToFields
      name: MapEnrichedValues
      input: Enriched
      config:
        language: python
        fields:
          customer_id:
            callable: 'lambda x: x.customer_id'
            output_type: integer
          customer_name:
            callable: 'lambda x: x.customer_name'
            output_type: string
          customer_email:
            callable: 'lambda x: x.customer_email'
            output_type: string 
          product_id:
            callable: 'lambda x: x.product_id'
            output_type: integer
          order_date:
            callable: 'lambda x: x.order_date'
            output_type: string
          order_amount:
            callable: 'lambda x: x.order_amount'
            output_type: integer

    # Step 4: Filter orders with amount greater than 110
    - type: Filter
      name: FilterHighValueOrders
      input: MapEnrichedValues
      config:
        keep: "order_amount > 110"
        language: "python"


    # Step 5: Write processed orders to another Spanner table
    #   Note: Make sure to replace $VARS with your values.
    - type: WriteToSpanner
      name: WriteProcessedOrders
      input: FilterHighValueOrders
      config:
        project_id: '$PROJECT'
        instance_id: '$INSTANCE'
        database_id: '$DATABASE'
        table_id: '$TABLE'
        error_handling:
          output: my_error_output

    # Step 6: Handle write errors by writing them to a JSON file
    - type: WriteToJson
      name: WriteErrorsToJson
      input: WriteProcessedOrders.my_error_output
      config:
        path: 'errors.json'

options:
  yaml_experimental_features: Enrichment

# Expected:
#  Row(customer_id=1001, customer_name='Alice', customer_email='alice@gmail.com', product_id=2001, order_date='24-03-24', order_amount=150)
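
The same lookup can be sketched with the Python SDK's BigQuery enrichment handler. As above, this is a hedged illustration, not part of the YAML example, assuming the handler's keyword arguments mirror the handler_config keys:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler

# Minimal sketch (assumption): joins order rows with the customers
# table on customer_id, mirroring the Enrichment step above.
bigquery_handler = BigQueryEnrichmentHandler(
    project='apache-beam-testing',
    table_name='apache-beam-testing.ALL_TEST.customers',
    row_restriction_template='customer_id = 1001 or customer_id = 1003',
    fields=['customer_id'])

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([beam.Row(
          customer_id=1001,
          product_id=2001,
          order_date='24-03-24',
          order_amount=150)])
      # Fetches the matching customer row and merges its columns
      # (e.g. customer_name, customer_email) into each order.
      | Enrichment(bigquery_handler)
      | beam.Map(print))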

Sentiment Analysis


Streaming Sentiment Analysis

The pipeline first reads the YouTube comments .csv dataset from a GCS bucket and performs the necessary clean-up before writing it to a Kafka topic. It then reads from that Kafka topic and applies various transformations before the RunInference transform performs remote inference with the Vertex AI model handler. The inference results are then written to a BigQuery table.
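
Much of that clean-up lives in small Python callables inside the pipeline below: stripping non-printable characters, dropping empty comments, truncating to the 250-token budget, and lowercasing. As a hedged aside, that chain can also be exercised outside Beam with a standalone helper; the function name is illustrative, and the tokenizer mirrors the one used in the Truncating step:

import re
from transformers import AutoTokenizer

# Standalone sketch (assumption) of the preprocessing performed by the
# RemoveWeirdCharacters, FilterForProperComments, Truncating and
# LowerCase transforms in the pipeline below.
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased', use_fast=True)

def preprocess_comment(comment_text, max_tokens=250):
  # Keep only printable ASCII (codes \x20 - \x7E).
  cleaned = re.sub(r'[^\x20-\x7E]', '', comment_text).strip()
  if not cleaned:
    return None  # Empty comments are filtered out.
  # Truncate to the model's token budget.
  tokens = tokenizer.tokenize(cleaned)
  if len(tokens) >= max_tokens:
    cleaned = tokenizer.convert_tokens_to_string(tokens[:max_tokens])
  # Lowercase to match the uncased model.
  return cleaned.lower()

print(preprocess_comment('I AM HAPPY'))  # -> 'i am happy'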

pipeline:
  transforms:
    # The YouTube comments dataset contains rows with an
    # unexpected schema (e.g. rows with extra fields, or
    # fields that contain strings instead of integers).
    # PyTransform helps construct the logic to properly read
    # the CSV dataset into a schema'd PCollection.
    - type: PyTransform
      name: ReadFromGCS
      input: {}
      config:
        constructor: __callable__
        kwargs:
          source: |
            def ReadYoutubeCommentsCsv(pcoll, file_pattern):
              def _to_int(x):
                try:
                  return int(x)
                except ValueError:
                  return None

              return (
                  pcoll 
                  | beam.io.ReadFromCsv(
                        file_pattern,
                        names=['video_id', 'comment_text', 'likes', 'replies'],
                        on_bad_lines='skip',
                        converters={'likes': _to_int, 'replies': _to_int})
                  | beam.Filter(lambda row: 
                        None not in list(row._asdict().values()))
                  | beam.Map(lambda row: beam.Row(
                        video_id=row.video_id,
                        comment_text=row.comment_text,
                        likes=int(row.likes),
                        replies=int(row.replies)))
              )
          file_pattern: "{{ GCS_PATH }}"

    # Send the rows as Kafka records to an existing
    # Kafka topic.
    - type: WriteToKafka
      name: SendRecordsToKafka
      input: ReadFromGCS
      config:
        format: "JSON"
        topic: "{{ TOPIC }}"
        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
        producer_config_updates:
          sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
            username={{ USERNAME }} \
            password={{ PASSWORD }};"
          security.protocol: "SASL_PLAINTEXT"
          sasl.mechanism: "PLAIN"

    # Read Kafka records from an existing Kafka topic.
    - type: ReadFromKafka
      name: ReadFromMyTopic
      config:
        format: "JSON"
        schema: |
          {
            "type": "object",
            "properties": {
              "video_id": { "type": "string" },
              "comment_text": { "type": "string" },
              "likes": { "type": "integer" },
              "replies": { "type": "integer" }
            }
          }
        topic: "{{ TOPIC }}"
        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
        auto_offset_reset_config: earliest
        consumer_config:
          sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
            username={{ USERNAME }} \
            password={{ PASSWORD }};"
          security.protocol: "SASL_PLAINTEXT"
          sasl.mechanism: "PLAIN"

    # Remove unexpected characters from the YouTube
    # comment string, e.g. emojis and other characters
    # outside the printable ASCII range.
    - type: MapToFields
      name: RemoveWeirdCharacters
      input: ReadFromMyTopic
      config:
        language: python
        fields:
          video_id: video_id
          comment_text:
            callable: |
              import re
              def filter(row):
                # Regex match: keep letters, digits, whitespace and common
                # punctuation, i.e. remove non-printable ASCII characters
                # (character codes outside the range 32 - 126, or \x20 - \x7E).
                return re.sub(r'[^\x20-\x7E]', '', row.comment_text).strip()
          likes: likes
          replies: replies

    # Remove rows that have empty comment text
    # after previously removing unexpected characters.
    - type: Filter
      name: FilterForProperComments
      input: RemoveWeirdCharacters
      config:
        language: python
        keep:
          callable: |
            def filter(row):
              return len(row.comment_text) > 0

    # HuggingFace's distilbert-base-uncased is used for inference,
    # and it accepts strings of at most 250 tokens.
    # Some of the comment strings are long and end up well over
    # this limit after tokenization.
    # This transform truncates the comment string to ensure
    # every comment satisfies the maximum token limit.
    - type: MapToFields
      name: Truncating
      input: FilterForProperComments
      config:
        language: python
        dependencies:
          - 'transformers>=4.48.0,<4.49.0'
        fields:
          video_id: video_id
          comment_text:
            callable: |
              from transformers import AutoTokenizer

              tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True)

              def truncate_sentence(row):
                tokens = tokenizer.tokenize(row.comment_text)
                if len(tokens) >= 250:
                  tokens = tokens[:250]
                  truncated_sentence = tokenizer.convert_tokens_to_string(tokens)
                else:
                  truncated_sentence = row.comment_text

                return truncated_sentence
          likes: likes
          replies: replies

    # HuggingFace's distilbert-base-uncased is uncased, i.e. it does
    # not distinguish between upper- and lowercase tokens.
    # This transform matches that behavior by converting all
    # comment text to lowercase.
    - type: MapToFields
      name: LowerCase
      input: Truncating
      config:
        language: python
        fields:
          video_id: video_id
          comment_text: "comment_text.lower()"
          likes: likes
          replies: replies

    # With the VertexAIModelHandlerJSON model handler,
    # the RunInference transform performs remote inference by
    # sending POST requests to the Vertex AI endpoint where
    # our distilbert-base-uncased model is deployed.
    - type: RunInference
      name: DistilBERTRemoteInference
      input: LowerCase
      config:
        inference_tag: "inference"
        model_handler:
          type: "VertexAIModelHandlerJSON"
          config:
            endpoint_id: "{{ ENDPOINT }}"
            project: "{{ PROJECT }}"
            location: "{{ LOCATION }}"
            preprocess:
              callable: 'lambda x: x.comment_text'

    # Parse inference results output
    - type: MapToFields
      name: FormatInferenceOutput
      input: DistilBERTRemoteInference
      config:
        language: python
        fields:
          video_id:
            expression: video_id
            output_type: string
          comment_text:
            callable: "lambda x: x.comment_text"
            output_type: string
          label:
            callable: "lambda x: x.inference.inference[0]['label']"
            output_type: string
          score:
            callable: "lambda x: x.inference.inference[0]['score']"
            output_type: number
          likes:
            expression: likes
            output_type: integer
          replies:
            expression: replies
            output_type: integer

    # Assign windows to each element of the unbounded PCollection.
    - type: WindowInto
      name: Windowing
      input: FormatInferenceOutput
      config:
        windowing:
          type: fixed
          size: 30s

    # Write all inference results to a BigQuery table.
    - type: WriteToBigQuery
      name: WriteInferenceResultsToBQ
      input: Windowing
      config:
        table: "{{ PROJECT }}.{{ DATASET }}.{{ TABLE }}"
        create_disposition: CREATE_IF_NEEDED
        write_disposition: WRITE_APPEND

options:
  yaml_experimental_features: ML

# Expected:
#  Row(video_id='XpVt6Z1Gjjo', comment_text='I AM HAPPY', likes=1, replies=1)
#  Row(video_id='XpVt6Z1Gjjo', comment_text='I AM SAD', likes=1, replies=1)
#  Row(video_id='XpVt6Z1Gjjo', comment_text='ยงรฤ', likes=1, replies=1)
#  Row(video_id='XpVt6Z1Gjjo', comment_text='i am happy', label='POSITIVE', score=0.95, likes=1, replies=1)
#  Row(video_id='XpVt6Z1Gjjo', comment_text='i am sad', label='NEGATIVE', score=0.95, likes=1, replies=1)