Example pipelines using the Beam YAML API. These examples can also be found on GitHub.
Note: The examples below are automatically tested for correctness and may be used as a starting point for your own pipelines.
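Each example below is a standalone YAML pipeline. As a rough sketch (assuming Apache Beam is installed with the YAML and GCP extras, for example via pip install 'apache_beam[yaml,gcp]'), a pipeline saved as pipeline.yaml can typically be run locally with:
python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml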
This example reads from a public file stored on Google Cloud. This
requires authenticating with Google Cloud, or changing the path in
ReadFromText
to a local file.
To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc.
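To run against a local copy instead, point ReadFromText at a local path; a minimal sketch (the local path here is illustrative):
- type: ReadFromText
  config:
    path: /path/to/kinglear.txt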
This pipeline reads in a text file, counts distinct words found in the text, then logs a row containing each word and its count.
pipeline:
type: chain
transforms:
# Read text file into a collection of rows, each with one field, "line"
- type: ReadFromText
config:
path: gs://dataflow-samples/shakespeare/kinglear.txt
# Split line field in each row into list of words
- type: MapToFields
config:
language: python
fields:
words:
callable: |
import re
def my_mapping(row):
return re.findall(r"[A-Za-z\']+", row.line.lower())
# Explode each list of words into separate rows
- type: Explode
config:
fields: words
# Since each word is now its own row, rename the field to "word"
- type: MapToFields
config:
fields:
word: words
# Group by distinct words in the collection and add a "count" field that
# contains the number of occurrences of each word in the collection.
- type: Combine
config:
language: python
group_by: word
combine:
count:
value: word
fn: count
# Log out results
- type: LogForTesting
# Expected:
# Row(word='king', count=311)
# Row(word='lear', count=253)
# Row(word='dramatis', count=1)
# Row(word='personae', count=1)
# Row(word='of', count=483)
# Row(word='britain', count=2)
# Row(word='france', count=32)
# Row(word='duke', count=26)
# Row(word='burgundy', count=20)
# Row(word='cornwall', count=75)
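Instead of (or in addition to) LogForTesting, the word counts could be written out to files. A minimal sketch using the WriteToCsv transform that appears in later examples (the output path is illustrative):
- type: WriteToCsv
  config:
    path: word_counts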
pipeline:
type: chain
transforms:
- type: Create
name: Create produce names
config:
elements:
- season: 'spring'
  produce: '🍓'
- season: 'spring'
  produce: '🥕'
- season: 'summer'
  produce: '🥕'
- season: 'fall'
  produce: '🥕'
- season: 'spring'
  produce: '🍆'
- season: 'winter'
  produce: '🍆'
- season: 'spring'
  produce: '🍅'
- season: 'summer'
  produce: '🍅'
- season: 'fall'
  produce: '🍅'
- season: 'summer'
  produce: '🌽'
- type: Combine
name: Count elements per key
config:
language: python
group_by: season
combine:
produce: count
- type: LogForTesting
# Expected:
# Row(season='spring', produce=4)
# Row(season='summer', produce=3)
# Row(season='fall', produce=2)
# Row(season='winter', produce=1)
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
- produce: '🥕'
  amount: 3
- produce: '🥕'
  amount: 2
- produce: '🍆'
  amount: 1
- produce: '🍅'
  amount: 4
- produce: '🍅'
  amount: 5
- produce: '🍅'
  amount: 3
- type: Combine
name: Get max value per key
config:
language: python
group_by: produce
combine:
amount: max
- type: LogForTesting
# Expected:
# Row(produce='🥕', amount=3)
# Row(produce='🍆', amount=1)
# Row(produce='🍅', amount=5)
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
- produce: '🥕'
  amount: 3
- produce: '🥕'
  amount: 2
- produce: '🍆'
  amount: 1
- produce: '🍅'
  amount: 4
- produce: '🍅'
  amount: 5
- produce: '🍅'
  amount: 3
- type: Combine
name: Get mean value per key
config:
language: python
group_by: produce
combine:
amount: mean
- type: LogForTesting
# Expected:
# Row(produce='🥕', amount=2.5)
# Row(produce='🍆', amount=1.0)
# Row(produce='🍅', amount=4.0)
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
- produce: '🥕'
  amount: 3
- produce: '🥕'
  amount: 2
- produce: '🍆'
  amount: 1
- produce: '🍅'
  amount: 4
- produce: '🍅'
  amount: 5
- produce: '🍅'
  amount: 3
- type: Combine
name: Get min value per key
config:
language: python
group_by: produce
combine:
amount: min
- type: LogForTesting
# Expected:
# Row(produce='🥕', amount=2)
# Row(produce='🍆', amount=1)
# Row(produce='🍅', amount=3)
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
- recipe: 'pie'
fruit: 'raspberry'
quantity: 1
unit_price: 3.50
- recipe: 'pie'
fruit: 'blackberry'
quantity: 1
unit_price: 4.00
- recipe: 'pie'
fruit: 'blueberry'
quantity: 1
unit_price: 2.00
- recipe: 'muffin'
fruit: 'blueberry'
quantity: 2
unit_price: 2.00
- recipe: 'muffin'
fruit: 'banana'
quantity: 3
unit_price: 1.00
# Simulates a global GroupBy by creating a single global key
- type: MapToFields
name: Add global key
config:
append: true
language: python
fields:
global_key: '0'
- type: Combine
name: Get multiple aggregations per key
config:
language: python
group_by: global_key
combine:
min_price:
value: unit_price
fn: min
mean_price:
value: unit_price
fn: mean
max_price:
value: unit_price
fn: max
# Removes the unwanted global key
- type: MapToFields
name: Remove global key
config:
fields:
min_price: min_price
mean_price: mean_price
max_price: max_price
- type: LogForTesting
# Expected:
# Row(min_price=1.0, mean_price=2.5, max_price=4.0)
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
- recipe: 'pie'
fruit: 'raspberry'
quantity: 1
unit_price: 3.50
- recipe: 'pie'
fruit: 'blackberry'
quantity: 1
unit_price: 4.00
- recipe: 'pie'
fruit: 'blueberry'
quantity: 1
unit_price: 2.00
- recipe: 'muffin'
fruit: 'blueberry'
quantity: 2
unit_price: 3.00
- recipe: 'muffin'
fruit: 'banana'
quantity: 3
unit_price: 1.00
- type: Combine
name: Sum values per key
config:
language: python
group_by: fruit
combine:
total_quantity:
value: quantity
fn: sum
mean_price:
value: unit_price
fn: mean
- type: LogForTesting
# Expected:
# Row(fruit='raspberry', total_quantity=1, mean_price=3.5)
# Row(fruit='blackberry', total_quantity=1, mean_price=4.0)
# Row(fruit='blueberry', total_quantity=3, mean_price=2.5)
# Row(fruit='banana', total_quantity=3, mean_price=1.0)
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
- produce: '🥕'
  amount: 3
- produce: '🥕'
  amount: 2
- produce: '🍆'
  amount: 1
- produce: '🍅'
  amount: 4
- produce: '🍅'
  amount: 5
- produce: '🍅'
  amount: 3
- type: Combine
name: Sum values per key
config:
language: python
group_by: produce
combine:
amount: sum
- type: LogForTesting
# Expected:
# Row(produce='🥕', amount=5)
# Row(produce='🍆', amount=1)
# Row(produce='🍅', amount=12)
This is an example that illustrates how to use session windows and then extract windowing information for further processing.
pipeline:
type: chain
transforms:
# Create some fake data.
- type: Create
name: CreateVisits
config:
elements:
- user: alice
timestamp: 1
- user: alice
timestamp: 3
- user: bob
timestamp: 7
- user: bob
timestamp: 12
- user: bob
timestamp: 20
- user: alice
timestamp: 101
- user: alice
timestamp: 109
- user: alice
timestamp: 115
# Use the timestamp field as the element timestamp.
# (Typically this would be assigned by the source.)
- type: AssignTimestamps
config:
timestamp: timestamp
# Group the data by user and, for each session window, count the number of
# events in that session.
# See https://beam.apache.org/documentation/programming-guide/#session-windows
- type: Combine
name: SumVisitsPerUser
config:
language: python
group_by: user
combine:
visits:
value: user
fn: count
windowing:
type: sessions
gap: 10s
# Extract the implicit Beam windowing data (including what the final
# merged session values were) into explicit fields of our rows.
- type: ExtractWindowingInfo
config:
fields: [window_start, window_end, window_string]
# Drop "short" sessions (in this case, Alice's first two visits.)
- type: Filter
config:
language: python
keep: window_end - window_start > 15
# Only keep a couple of fields.
- type: MapToFields
config:
fields:
user: user
window_string: window_string
- type: LogForTesting
# Expected:
# Row(user='bob', window_string='[7.0, 30.0)')
# Row(user='alice', window_string='[101.0, 125.0)')
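For comparison, a different windowing strategy could be used in the Combine step above; a minimal sketch of a fixed-window configuration, mirroring the fixed windowing used in a later example (the window size here is illustrative):
windowing:
  type: fixed
  size: 60s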
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
- season: 'spring'
  produce: '🍓'
- season: 'spring'
  produce: '🥕'
- season: 'spring'
  produce: '🍆'
- season: 'spring'
  produce: '🍅'
- season: 'summer'
  produce: '🥕'
- season: 'summer'
  produce: '🍅'
- season: 'summer'
  produce: '🌽'
- season: 'fall'
  produce: '🥕'
- season: 'fall'
  produce: '🍅'
- season: 'winter'
  produce: '🍆'
- type: Combine
name: Largest N values per key
config:
language: python
group_by: season
combine:
produce:
value: produce
fn:
type: 'apache_beam.transforms.combiners.TopCombineFn'
config:
n: 3
- type: LogForTesting
# Expected:
# Row(season='spring', produce=['🥕', '🍓', '🍆'])
# Row(season='summer', produce=['🥕', '🍅', '🌽'])
# Row(season='fall', produce=['🥕', '🍅'])
# Row(season='winter', produce=['🍆'])
This example reads from a public file stored on Google Cloud. This
requires authenticating with Google Cloud, or changing the path in
ReadFromText
to a local file.
To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc.
The following example reads mock transaction data from products.csv, performs a simple filter for "Electronics", then calculates the revenue and number of products sold for each product type.
pipeline:
transforms:
- type: ReadFromCsv
name: ReadInputFile
config:
path: gs://apache-beam-samples/beam-yaml-blog/products.csv
- type: Filter
name: FilterWithCategory
input: ReadInputFile
config:
language: python
keep: category == "Electronics"
- type: Combine
name: CountNumberSold
input: FilterWithCategory
config:
group_by: product_name
combine:
num_sold:
value: product_name
fn: count
total_revenue:
value: price
fn: sum
- type: WriteToCsv
name: WriteOutputFile
input: CountNumberSold
config:
path: output
# Expected:
# Row(product_name='Headphones', num_sold=2, total_revenue=119.98)
# Row(product_name='Monitor', num_sold=1, total_revenue=249.99)
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
- produce: '🥕'
  amount: 3
- produce: '🥕'
  amount: 2
- produce: '🍆'
  amount: 1
- produce: '🍅'
  amount: 4
- produce: '🍅'
  amount: 5
- produce: '🍅'
  amount: 3
- type: Combine
config:
language: python
group_by: produce
combine:
biggest:
value: amount
fn:
type: 'apache_beam.transforms.combiners.TopCombineFn'
config:
n: 2
- type: LogForTesting
# Expected:
# Row(produce='🥕', biggest=[3, 2])
# Row(produce='🍆', biggest=[1])
# Row(produce='🍅', biggest=[5, 4])
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
- produce: '🥕'
  amount: 3
- produce: '🥕'
  amount: 2
- produce: '🍆'
  amount: 1
- produce: '🍅'
  amount: 4
- produce: '🍅'
  amount: 5
- produce: '🍅'
  amount: 3
- type: Combine
name: Smallest N values per key
config:
language: python
group_by: produce
combine:
smallest:
value: amount
fn:
type: 'apache_beam.transforms.combiners.TopCombineFn'
config:
n: 2
reverse: true
- type: LogForTesting
# Expected:
# Row(produce='🥕', smallest=[2, 3])
# Row(produce='🍆', smallest=[1])
# Row(produce='🍅', smallest=[3, 4])
This is an example of a Beam YAML pipeline that reads a text file from Google Cloud Storage and writes each line to a BigQuery table.
pipeline:
type: chain
transforms:
# Step 1: Reading data from GCS
- type: ReadFromText
name: ReadFromGCS
config:
path: gs://dataflow-samples/shakespeare/kinglear.txt
# Step 2: Write records out to BigQuery
- type: WriteToBigQuery
name: WriteWords
config:
table: "apache-beam-testing.yaml_test.words"
create_disposition: "CREATE_NEVER"
write_disposition: "WRITE_APPEND"
num_streams: 1
# Expected:
# Row(line='Fool\tThou shouldst not have been old till thou hadst')
# Row(line='\tbeen wise.')
# Row(line='KING LEAR\tNothing will come of nothing: speak again.')
# Row(line='\tNever, never, never, never, never!')
This is an example of a Beam YAML pipeline that reads from a JDBC database and writes the records to BigQuery. This matches the Dataflow Template located here - https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-bigquery
pipeline:
type: composite
transforms:
# Step 1: Reading shipment data from jdbc DB
- type: ReadFromJdbc
name: ReadShipments
config:
url: "jdbc:mysql://my-host:3306/shipment"
driver_class_name: "org.sqlite.JDBC"
query: "SELECT * FROM shipments"
# Step 2: Write successful records out to BigQuery
- type: WriteToBigQuery
name: WriteShipments
input: ReadShipments
config:
table: "apache-beam-testing.yaml_test.shipments"
create_disposition: "CREATE_NEVER"
write_disposition: "WRITE_APPEND"
error_handling:
output: "deadLetterQueue"
num_streams: 1
# Step 3: Write the records that failed to write to BigQuery to a dead-letter JSON file
- type: WriteToJson
input: WriteShipments.deadLetterQueue
config:
path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"
# Expected:
# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
# Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
# Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
A pipeline that reads messages from a Kafka topic and writes them to an Iceberg table.
pipeline:
type: chain
transforms:
# Step 1: Reading data from Kafka
- type: ReadFromKafka
name: ReadFromMyTopic
config:
format: "RAW"
topic: "{{ TOPIC }}"
bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
auto_offset_reset_config: earliest
consumer_config:
sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
username={{ USERNAME }} \
password={{ PASSWORD }};"
security.protocol: "SASL_PLAINTEXT"
sasl.mechanism: "PLAIN"
# Step 2: Convert Kafka records
- type: MapToFields
name: ParseKafkaRecords
config:
language: python
fields:
text:
callable: |
def func(row):
# Kafka RAW format reads messages as bytes
# in the 'payload' field of a Row
return row.payload.decode('utf-8')
# Step 3: Write records out to Iceberg
- type: WriteToIceberg
name: WriteToAnIcebergTable
config:
# Dynamic destinations
table: "db.users.{zip}"
catalog_name: "hadoop_catalog"
catalog_properties:
type: "hadoop"
warehouse: "gs://MY-WAREHOUSE"
# Hadoop catalog config required to run pipeline locally
# Omit if running on Dataflow
config_properties:
"fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
"fs.gs.auth.service.account.json.keyfile": "/path/to/service/account/key.json"
options:
streaming: true
# Expected:
# Row(text='Fool\tThou shouldst not have been old till thou hadst')
# Row(text='\tbeen wise.')
# Row(text='KING LEAR\tNothing will come of nothing: speak again.')
# Row(text='\tNever, never, never, never, never!')
This is an example of a Beam YAML pipeline that reads from a MySQL database and writes the records to BigQuery.
pipeline:
type: composite
transforms:
# Step 1: Reading data from MySql
- type: ReadFromMySql
name: ReadFromMySql
config:
url: "jdbc:mysql://localhost:12345/shipment?user=orange&password=orange123"
query: "SELECT * FROM shipments"
driver_class_name: "com.mysql.cj.jdbc.Driver"
# Step 2: Write records out to BigQuery
- type: WriteToBigQuery
name: WriteShipments
input: ReadFromMySql
config:
table: "apache-beam-testing.yaml_test.shipments"
create_disposition: "CREATE_NEVER"
write_disposition: "WRITE_APPEND"
error_handling:
output: "deadLetterQueue"
num_streams: 1
# Step 3: Write the records that failed to write to BigQuery to a dead-letter JSON file
- type: WriteToJson
input: WriteShipments.deadLetterQueue
config:
path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"
# Expected:
# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
# Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
# Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
This is an example of a Beam YAML pipeline that reads from an Oracle database and writes the records to BigQuery.
pipeline:
type: composite
transforms:
# Step 1: Reading data from Oracle
- type: ReadFromOracle
name: ReadFromOracle
config:
url: "jdbc:oracle:thin:system/oracle@localhost:12345/shipment"
query: "SELECT * FROM shipments"
driver_class_name: "oracle.jdbc.OracleDriver"
# Step 2: Write records out to BigQuery
- type: WriteToBigQuery
name: WriteShipments
input: ReadFromOracle
config:
table: "apache-beam-testing.yaml_test.shipments"
create_disposition: "CREATE_NEVER"
write_disposition: "WRITE_APPEND"
error_handling:
output: "deadLetterQueue"
num_streams: 1
# Step 3: Write the records that failed to write to BigQuery to a dead-letter JSON file
- type: WriteToJson
input: WriteShipments.deadLetterQueue
config:
path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"
# Expected:
# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
# Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
# Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
This is an example of a Beam YAML pipeline that reads from a PostgreSQL database and writes the records to BigQuery.
pipeline:
type: composite
transforms:
# Step 1: Reading data from Postgres
- type: ReadFromPostgres
name: ReadFromPostgres
config:
url: "jdbc:postgresql://localhost:12345/shipment?user=user&password=postgres123"
query: "SELECT * FROM shipments"
driver_class_name: "org.postgresql.Driver"
# Step 2: Write records out to BigQuery
- type: WriteToBigQuery
name: WriteShipments
input: ReadFromPostgres
config:
table: "apache-beam-testing.yaml_test.shipments"
create_disposition: "CREATE_NEVER"
write_disposition: "WRITE_APPEND"
error_handling:
output: "deadLetterQueue"
num_streams: 1
# Step 3: Write the records that failed to write to BigQuery to a dead-letter JSON file
- type: WriteToJson
input: WriteShipments.deadLetterQueue
config:
path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"
# Expected:
# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
# Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
# Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
This is an example of a Beam YAML pipeline that reads messages from a Pub/Sub subscription and writes them to BigQuery. This matches the Dataflow Template located here - https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-subscription-to-bigquery
pipeline:
type: composite
transforms:
# Step 1: Reading messages from Pub/Sub subscription
- type: ReadFromPubSub
name: ReadMessages
config:
subscription: "projects/apache-beam-testing/subscription/my-subscription"
format: JSON
schema:
type: object
properties:
data: {type: BYTES}
attributes: {type: object}
# Step 2: Write successful records out to BigQuery
- type: WriteToBigQuery
name: WriteMessages
input: ReadMessages
config:
table: "apache-beam-testing.yaml_test.order_data"
create_disposition: "CREATE_NEVER"
write_disposition: "WRITE_APPEND"
error_handling:
output: "deadLetterQueue"
num_streams: 1
# Step 3: Write the messages that failed to write to BigQuery to a JSON file
- type: WriteToJson
input: WriteMessages.deadLetterQueue
config:
path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"
options:
streaming: true
# Expected:
# Row(label='37a', rank=1)
# Row(label='37b', rank=4)
# Row(label='37c', rank=3)
# Row(label='37d', rank=2)
A pipeline that reads messages from a Pub/Sub topic and writes them to an Iceberg table.
pipeline:
type: chain
transforms:
# Step 1: Reading messages from Pub/Sub topic
- type: ReadFromPubSub
name: ReadMessages
config:
topic: "projects/apache-beam-testing/topics/my-topic"
format: JSON
schema:
type: object
properties:
data: {type: BYTES}
attributes: {type: object}
# Step 2: Write records out to Iceberg
- type: WriteToIceberg
name: WriteToAnIcebergTable
config:
# Dynamic destinations
table: "db.users.{zip}"
catalog_name: "hadoop_catalog"
catalog_properties:
type: "hadoop"
warehouse: "gs://MY-WAREHOUSE"
# Hadoop catalog config required to run pipeline locally
# Omit if running on Dataflow
config_properties:
"fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
"fs.gs.auth.service.account.json.keyfile": "/path/to/service/account/key.json"
options:
streaming: true
# Expected:
# Row(label='37a', rank=1)
# Row(label='37b', rank=4)
# Row(label='37c', rank=3)
# Row(label='37d', rank=2)
This is an example of a Beam YAML pipeline that reads messages from a Pub/Sub topic and writes them to BigQuery. This matches the Dataflow Template located here - https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery
pipeline:
type: composite
transforms:
# Step 1: Reading messages from Pub/Sub topic
- type: ReadFromPubSub
name: ReadMessages
config:
topic: "projects/apache-beam-testing/topics/my-topic"
format: JSON
schema:
type: object
properties:
data: {type: BYTES}
attributes: {type: object}
# Step 2: Write successful records out to BigQuery
- type: WriteToBigQuery
name: WriteMessages
input: ReadMessages
config:
table: "apache-beam-testing.yaml_test.order_data"
create_disposition: "CREATE_NEVER"
write_disposition: "WRITE_APPEND"
error_handling:
output: "deadLetterQueue"
num_streams: 1
# Step 3: Write the messages that failed to write to BigQuery to a JSON file
- type: WriteToJson
input: WriteMessages.deadLetterQueue
config:
path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"
options:
streaming: true
# Expected:
# Row(label='37a', rank=1)
# Row(label='37b', rank=4)
# Row(label='37c', rank=3)
# Row(label='37d', rank=2)
This is an example of a Beam YAML pipeline that reads from a Spanner database and writes to Avro files on GCS. This matches the Dataflow Template located here - https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-to-avro
pipeline:
type: chain
transforms:
# Step 1: Reading shipment data from Spanner DB
- type: ReadFromSpanner
name: ReadShipments
config:
project_id: 'apache-beam-testing'
instance_id: 'shipment-test'
database_id: 'shipment'
query: 'SELECT * FROM shipments'
# Step 2: Write successful records out to GCS avro files
- type: WriteToAvro
config:
path: "gs://my-bucket/yaml-123/out.avro"
# Expected:
# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
# Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
# Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
This is an example of a Beam YAML pipeline that reads from a Spanner database and writes the records to BigQuery.
pipeline:
type: chain
transforms:
# Step 1: Reading data from Spanner
- type: ReadFromSpanner
name: ReadShipments
config:
project_id: 'apache-beam-testing'
instance_id: 'shipment-test'
database_id: 'shipment'
query: 'SELECT * FROM shipments'
# Step 2: Write records out to BigQuery
- type: WriteToBigQuery
name: WriteShipments
config:
table: "apache-beam-testing.yaml_test.shipments"
create_disposition: "CREATE_NEVER"
write_disposition: "WRITE_APPEND"
num_streams: 1
# Expected:
# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
# Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
# Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
This is an example of a Beam YAML pipeline that reads from a SQL Server database and writes the records to BigQuery.
pipeline:
type: composite
transforms:
# Step 1: Reading data from SqlServer
- type: ReadFromSqlServer
name: ReadFromSqlServer
config:
url: "jdbc:sqlserver://localhost:12345;databaseName=shipment;user=apple;password=apple123;encrypt=false;trustServerCertificate=true"
query: "SELECT * FROM shipments"
driver_class_name: "com.microsoft.sqlserver.jdbc.SQLServerDriver"
# Step 2: Write records out to BigQuery
- type: WriteToBigQuery
name: WriteShipments
input: ReadFromSqlServer
config:
table: "apache-beam-testing.yaml_test.shipments"
create_disposition: "CREATE_NEVER"
write_disposition: "WRITE_APPEND"
error_handling:
output: "deadLetterQueue"
num_streams: 1
# Step 3: Write the records that failed to write to BigQuery to a dead-letter JSON file
- type: WriteToJson
input: WriteShipments.deadLetterQueue
config:
path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"
# Expected:
# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='bob@example.com')
# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com')
# Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
# Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
# Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='derek@example.com')
pipeline:
type: chain
transforms:
- type: Create
name: Gardening plants
config:
elements:
- produce: ['🍓Strawberry', '🥔Potato']
  season: 'spring'
- produce: ['🥕Carrot', '🍆Eggplant', '🍅Tomato']
  season: 'summer'
- type: Explode
name: Flatten lists
config:
fields: [produce]
- type: LogForTesting
# Expected:
# Row(produce='🍓Strawberry', season='spring')
# Row(produce='🥕Carrot', season='summer')
# Row(produce='🍆Eggplant', season='summer')
# Row(produce='🍅Tomato', season='summer')
# Row(produce='🥔Potato', season='spring')
pipeline:
type: chain
transforms:
- type: Create
name: Gardening plants
config:
elements:
- 'icon': '🍓'
  'name': 'Strawberry'
  'duration': 'perennial'
- 'icon': '🥕'
  'name': 'Carrot'
  'duration': 'biennial'
- 'icon': '🍆'
  'name': 'Eggplant'
  'duration': 'perennial'
- 'icon': '🍅'
  'name': 'Tomato'
  'duration': 'annual'
- 'icon': '🥔'
  'name': 'Potato'
  'duration': 'perennial'
- type: Filter
name: Filter perennials
config:
language: python
keep:
callable: "lambda plant: plant.duration == 'perennial'"
- type: LogForTesting
# Expected:
# Row(icon='🍓', name='Strawberry', duration='perennial')
# Row(icon='🍆', name='Eggplant', duration='perennial')
# Row(icon='🥔', name='Potato', duration='perennial')
pipeline:
type: chain
transforms:
- type: Create
name: Gardening plants
config:
elements:
- '# 🍓Strawberry'
- '# 🥕Carrot'
- '# 🍆Eggplant'
- '# 🍅Tomato'
- '# 🥔Potato'
- type: MapToFields
name: Strip header
config:
language: python
fields:
element:
callable: "lambda row: row.element.strip('# \\n')"
- type: LogForTesting
# Expected:
# Row(element='🍓Strawberry')
# Row(element='🥕Carrot')
# Row(element='🍆Eggplant')
# Row(element='🍅Tomato')
# Row(element='🥔Potato')
pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {sdk: MapReduce, year: 2004}
- {sdk: MillWheel, year: 2008}
- {sdk: Flume, year: 2010}
- {sdk: Dataflow, year: 2014}
- {sdk: Apache Beam, year: 2016}
- type: MapToFields
name: ToRoman
config:
language: python
fields:
tool_name: sdk
year:
callable: |
import roman
def convert(row):
return roman.toRoman(row.year)
dependencies:
- 'roman>=4.2'
- type: LogForTesting
# Expected:
# Row(tool_name='MapReduce', year='MMIV')
# Row(tool_name='MillWheel', year='MMVIII')
# Row(tool_name='Flume', year='MMX')
# Row(tool_name='Dataflow', year='MMXIV')
# Row(tool_name='Apache Beam', year='MMXVI')
pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {sdk: MapReduce, year: 2004}
- {sdk: MillWheel, year: 2008}
- {sdk: Flume, year: 2010}
- {sdk: Dataflow, year: 2014}
- {sdk: Apache Beam, year: 2016}
- type: MapToFields
name: ToRoman
config:
language: java
fields:
tool_name: sdk
year:
callable: |
import org.apache.beam.sdk.values.Row;
import java.util.function.Function;
import com.github.chaosfirebolt.converter.RomanInteger;
public class MyFunction implements Function<Row, String> {
public String apply(Row row) {
return RomanInteger.parse(
String.valueOf(row.getInt64("year"))).toString();
}
}
dependencies:
- 'com.github.chaosfirebolt.converter:roman-numeral-converter:2.1.0'
- type: LogForTesting
# Expected:
# Row(tool_name='MapReduce', year='MMIV')
# Row(tool_name='MillWheel', year='MMVIII')
# Row(tool_name='Flume', year='MMX')
# Row(tool_name='Dataflow', year='MMXIV')
# Row(tool_name='Apache Beam', year='MMXVI')
pipeline:
type: chain
transforms:
- type: Create
name: Gardening plants
config:
elements:
- '# 🍓Strawberry'
- '# 🥕Carrot'
- '# 🍆Eggplant'
- '# 🍅Tomato'
- '# 🥔Potato'
- type: MapToFields
name: Strip header
config:
language: python
fields:
element: "element.strip('# \\n')"
# Transform-specific hint overrides top-level hint.
resource_hints:
min_ram: 3GB
- type: LogForTesting
# Top-level resource hint.
resource_hints:
min_ram: 1GB
# Expected:
# Row(element='🍓Strawberry')
# Row(element='🥕Carrot')
# Row(element='🍆Eggplant')
# Row(element='🍅Tomato')
# Row(element='🥔Potato')
This pipeline creates a series of {plant: description} key pairs, matches each element against a regex, filters out non-matching entries, then logs the output.
pipeline:
type: chain
transforms:
- type: Create
name: Garden plants
config:
elements:
- plant: '🍓, Strawberry, perennial'
- plant: '🥕, Carrot, biennial ignoring trailing words'
- plant: '🍆, Eggplant, perennial'
- plant: '🍅, Tomato, annual'
- plant: '🥔, Potato, perennial'
- plant: '# 🍓, invalid, format'
- plant: 'invalid, 🍓, format'
- type: MapToFields
name: Parse plants
config:
language: python
fields:
plant:
callable: |
import re
def regex_filter(row):
match = re.match(r"(?P<icon>[^\s,]+), *(\w+), *(\w+)", row.plant)
return match.group(0) if match else match
# Filters out the None values produced by elements that don't match the regex
- type: Filter
config:
language: python
keep: plant
- type: LogForTesting
# Expected:
# Row(plant='🍓, Strawberry, perennial')
# Row(plant='🥕, Carrot, biennial')
# Row(plant='🍆, Eggplant, perennial')
# Row(plant='🍅, Tomato, annual')
# Row(plant='🥔, Potato, perennial')
This example reads from a public file stored on Google Cloud. This
requires authenticating with Google Cloud, or changing the path in
ReadFromText
to a local file.
To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc.
The following example reads mock transaction data from products.csv, then performs a simple filter for "Electronics".
pipeline:
transforms:
- type: ReadFromCsv
name: ReadInputFile
config:
path: gs://apache-beam-samples/beam-yaml-blog/products.csv
- type: Filter
name: FilterWithCategory
input: ReadInputFile
config:
language: python
keep: category == "Electronics"
- type: WriteToCsv
name: WriteOutputFile
input: FilterWithCategory
config:
path: output
# Expected:
# Row(transaction_id='T0012', product_name='Headphones', category='Electronics', price=59.99)
# Row(transaction_id='T0104', product_name='Headphones', category='Electronics', price=59.99)
# Row(transaction_id='T0302', product_name='Monitor', category='Electronics', price=249.99)
The pipeline reads from the Iceberg table 'db.users.NY' on GCS with a Hadoop catalog configured. The table, if it does not already exist, can be created and populated using the iceberg_write.yaml pipeline.
Replace 'gs://MY-WAREHOUSE' with the correct GCS bucket name. If this example is run locally, replace '/path/to/service/account/key.json' with the correct path to your service account key .json file on your machine. If the Dataflow runner is used, omit the 'config_properties' field.
pipeline:
type: chain
transforms:
- type: ReadFromIceberg
name: ReadFromAnIcebergTable
config:
table: "db.users.NY"
catalog_name: "hadoop_catalog"
catalog_properties:
type: "hadoop"
warehouse: "gs://MY-WAREHOUSE"
# Hadoop catalog config required to run pipeline locally
# Omit if running on Dataflow
config_properties:
"fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
"fs.gs.auth.service.account.json.keyfile": "/path/to/service/account/key.json"
- type: LogForTesting
- type: WriteToCsv
name: OutputToCSVFile
config:
path: "gs://MY-WAREHOUSE/my-csv.csv"
# Expected:
# Row(id=3, name='Smith', email='smith@example.com', zip='NY')
# Row(id=4, name='Beamberg', email='beamberg@example.com', zip='NY')
The pipeline uses Dynamic destinations (see https://cloud.google.com/dataflow/docs/guides/managed-io#dynamic-destinations) to dynamically create and select a table destination based on field values in the incoming records.
Replace 'gs://MY-WAREHOUSE' with the correct GCS bucket name. If this example is run locally, replace '/path/to/service/account/key.json' with the correct path to your service account key .json file on your machine. If the Dataflow runner is used, omit the 'config_properties' field.
pipeline:
type: chain
transforms:
- type: Create
name: CreateSampleData
config:
elements:
- { id: 1, name: "John", email: "john@example.com", zip: "WA" }
- { id: 2, name: "Jane", email: "jane@example.com", zip: "CA" }
- { id: 3, name: "Smith", email: "smith@example.com", zip: "NY" }
- { id: 4, name: "Beamberg", email: "beamberg@example.com", zip: "NY" }
- type: LogForTesting
- type: WriteToIceberg
name: WriteToAnIcebergTable
config:
# Dynamic destinations
table: "db.users.{zip}"
catalog_name: "hadoop_catalog"
catalog_properties:
type: "hadoop"
warehouse: "gs://MY-WAREHOUSE"
# Hadoop catalog config required to run pipeline locally
# Omit if running on Dataflow
config_properties:
"fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
"fs.gs.auth.service.account.json.keyfile": "/path/to/service/account/key.json"
# Expected:
# Row(id=1, name='John', email='john@example.com', zip='WA')
# Row(id=2, name='Jane', email='jane@example.com', zip='CA')
# Row(id=3, name='Smith', email='smith@example.com', zip='NY')
# Row(id=4, name='Beamberg', email='beamberg@example.com', zip='NY')
A pipeline that both writes to and reads from the same Kafka topic.
pipeline:
transforms:
- type: ReadFromText
name: ReadFromGCS
config:
path: gs://dataflow-samples/shakespeare/kinglear.txt
- type: MapToFields
name: BuildKafkaRecords
input: ReadFromGCS
config:
language: python
fields:
value:
callable: |
def func(row):
return row.line.encode('utf-8')
output_type: bytes
- type: WriteToKafka
name: SendRecordsToKafka
input: BuildKafkaRecords
config:
format: "RAW"
topic: "{{ TOPIC }}"
bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
producer_config_updates:
sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
username={{ USERNAME }} \
password={{ PASSWORD }};"
security.protocol: "SASL_PLAINTEXT"
sasl.mechanism: "PLAIN"
- type: ReadFromKafka
name: ReadFromMyTopic
config:
format: "RAW"
topic: "{{ TOPIC }}"
bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
auto_offset_reset_config: earliest
consumer_config:
sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
username={{ USERNAME }} \
password={{ PASSWORD }};"
security.protocol: "SASL_PLAINTEXT"
sasl.mechanism: "PLAIN"
- type: MapToFields
name: ParseKafkaRecords
input: ReadFromMyTopic
config:
language: python
fields:
text:
callable: |
def func(row):
# Kafka RAW format reads messages as bytes
# in the 'payload' field of a Row
return row.payload.decode('utf-8')
- type: LogForTesting
input: ParseKafkaRecords
# Since the pipeline both writes to and reads from a Kafka topic, we expect
# the first pipeline component to write the rows containing the `value`
# field as bytes to Kafka, and the second pipeline component to read the byte
# messages from Kafka before parsing them as strings in the new `text` field.
# Expected:
# Row(value=b'Fool\tThou shouldst not have been old till thou hadst')
# Row(value=b'\tbeen wise.')
# Row(value=b'KING LEAR\tNothing will come of nothing: speak again.')
# Row(value=b'\tNever, never, never, never, never!')
# Row(text='Fool\tThou shouldst not have been old till thou hadst')
# Row(text='\tbeen wise.')
# Row(text='KING LEAR\tNothing will come of nothing: speak again.')
# Row(text='\tNever, never, never, never, never!')
pipeline:
transforms:
# Reading data from a Spanner database. The table used here has the following columns:
# shipment_id (String), customer_id (String), shipment_date (String), shipment_cost (Float64), customer_name (String), customer_email (String)
# ReadFromSpanner transform is called using project_id, instance_id, database_id and a query
# A table with a list of columns can also be specified instead of a query
- type: ReadFromSpanner
name: ReadShipments
config:
project_id: 'apache-beam-testing'
instance_id: 'shipment-test'
database_id: 'shipment'
query: 'SELECT * FROM shipments'
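# As noted above, a table with a list of columns can also be specified instead
# of a query. A minimal sketch of that form (the parameter names and column
# list here are illustrative assumptions, not taken from this pipeline):
#   - type: ReadFromSpanner
#     config:
#       project_id: 'apache-beam-testing'
#       instance_id: 'shipment-test'
#       database_id: 'shipment'
#       table: 'shipments'
#       columns: ['shipment_id', 'customer_id', 'shipment_cost']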
# Filtering the data based on a specific condition
# Here, the condition is used to keep only the rows where the customer_id is 'C1'
- type: Filter
name: FilterShipments
input: ReadShipments
config:
language: python
keep: "customer_id == 'C1'"
# Mapping the data fields and applying transformations
# A new field 'shipment_cost_category' is added with a custom transformation
# A callable is defined to categorize shipment cost
- type: MapToFields
name: MapFieldsForSpanner
input: FilterShipments
config:
language: python
fields:
shipment_id: shipment_id
customer_id: customer_id
shipment_date: shipment_date
shipment_cost: shipment_cost
customer_name: customer_name
customer_email: customer_email
shipment_cost_category:
callable: |
def categorize_cost(row):
cost = float(row.shipment_cost)
if cost < 50:
return 'Low Cost'
elif cost < 200:
return 'Medium Cost'
else:
return 'High Cost'
# Writing the transformed data to a CSV file
- type: WriteToCsv
name: WriteBig
input: MapFieldsForSpanner
config:
path: shipments.csv
# On executing the above pipeline, a new CSV file is created with the following records
# Expected:
# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com', shipment_cost_category='Medium Cost')
# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com', shipment_cost_category='Low Cost')
pipeline:
transforms:
# Step 1: Creating rows to be written to Spanner
# The element names correspond to the column names in the Spanner table
- type: Create
name: CreateRows
config:
elements:
- shipment_id: "S5"
customer_id: "C5"
shipment_date: "2023-05-09"
shipment_cost: 300.0
customer_name: "Erin"
customer_email: "erin@example.com"
# Step 2: Writing the created rows to a Spanner database
# We require the project ID, instance ID, database ID and table ID to connect to Spanner
# Error handling can optionally be specified to ensure any failed operations aren't lost
# The failed data is passed on in the pipeline and can be handled downstream
- type: WriteToSpanner
name: WriteSpanner
input: CreateRows
config:
project_id: 'apache-beam-testing'
instance_id: 'shipment-test'
database_id: 'shipment'
table_id: 'shipments'
error_handling:
output: my_error_output
# Step 3: Writing the failed records to a JSON file
- type: WriteToJson
input: WriteSpanner.my_error_output
config:
path: errors.json
# Expected:
# Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='erin@example.com')
pipeline:
type: chain
transforms:
# Step 1: Creating a collection of elements that need
# to be enriched. Here we are simulating sales data
- type: Create
config:
elements:
- sale_id: 1
customer_id: 1
product_id: 1
quantity: 1
# Step 2: Enriching the data with Bigtable
# This specific Bigtable table stores product data in the format
# product:product_id, product:product_name, product:product_stock
- type: Enrichment
config:
enrichment_handler: 'BigTable'
handler_config:
project_id: 'apache-beam-testing'
instance_id: 'beam-test'
table_id: 'bigtable-enrichment-test'
row_key: 'product_id'
timeout: 30
# Step 3: Logging for testing
# This is a simple way to view the enriched data
# We could also write it out somewhere, such as to a JSON file (see the sketch below)
- type: LogForTesting
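# As mentioned above, the enriched data could instead be written out, for
# example to a JSON file. A minimal sketch (the output path is illustrative):
#   - type: WriteToJson
#     config:
#       path: enriched_sales.json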
options:
yaml_experimental_features: Enrichment
# Expected:
# Row(sale_id=1, customer_id=1, product_id=1, quantity=1, product={'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2'})
pipeline:
transforms:
# Step 1: Read orders details from Spanner
- type: ReadFromSpanner
name: ReadOrders
config:
project_id: 'apache-beam-testing'
instance_id: 'orders-test'
database_id: 'order-database'
query: 'SELECT customer_id, product_id, order_date, order_amount FROM orders'
# Step 2: Enrich order details with customers details from BigQuery
- type: Enrichment
name: Enriched
input: ReadOrders
config:
enrichment_handler: 'BigQuery'
handler_config:
project: "apache-beam-testing"
table_name: "apache-beam-testing.ALL_TEST.customers"
row_restriction_template: "customer_id = 1001 or customer_id = 1003"
fields: ["customer_id"]
# Step 3: Map enriched values to Beam schema
# TODO: This should be removed when schema'd enrichment is available
- type: MapToFields
name: MapEnrichedValues
input: Enriched
config:
language: python
fields:
customer_id:
callable: 'lambda x: x.customer_id'
output_type: integer
customer_name:
callable: 'lambda x: x.customer_name'
output_type: string
customer_email:
callable: 'lambda x: x.customer_email'
output_type: string
product_id:
callable: 'lambda x: x.product_id'
output_type: integer
order_date:
callable: 'lambda x: x.order_date'
output_type: string
order_amount:
callable: 'lambda x: x.order_amount'
output_type: integer
# Step 4: Filter orders with amount greater than 110
- type: Filter
name: FilterHighValueOrders
input: MapEnrichedValues
config:
keep: "order_amount > 110"
language: "python"
# Step 5: Write processed orders to another Spanner table
# Note: Make sure to replace $VARS with your values.
- type: WriteToSpanner
name: WriteProcessedOrders
input: FilterHighValueOrders
config:
project_id: '$PROJECT'
instance_id: '$INSTANCE'
database_id: '$DATABASE'
table_id: '$TABLE'
error_handling:
output: my_error_output
# Step 6: Handle write errors by writing to JSON
- type: WriteToJson
name: WriteErrorsToJson
input: WriteProcessedOrders.my_error_output
config:
path: 'errors.json'
options:
yaml_experimental_features: Enrichment
# Expected:
# Row(customer_id=1001, customer_name='Alice', customer_email='alice@gmail.com', product_id=2001, order_date='24-03-24', order_amount=150)
The pipeline first reads the YouTube comments .csv dataset from a GCS bucket and performs the necessary clean-up before writing it to a Kafka topic. It then reads from that Kafka topic and applies various transformations before the RunInference transform performs remote inference with the Vertex AI model handler. The inference results are then written to a BigQuery table.
pipeline:
transforms:
# The YouTube comments dataset contains rows with
# an unexpected schema (e.g. rows with extra fields,
# rows with fields that contain strings instead of
# integers, etc.). PyTransform helps construct
# the logic to properly read in the CSV dataset as
# a schema'd PCollection.
- type: PyTransform
name: ReadFromGCS
input: {}
config:
constructor: __callable__
kwargs:
source: |
def ReadYoutubeCommentsCsv(pcoll, file_pattern):
def _to_int(x):
try:
return int(x)
except (ValueError):
return None
return (
pcoll
| beam.io.ReadFromCsv(
file_pattern,
names=['video_id', 'comment_text', 'likes', 'replies'],
on_bad_lines='skip',
converters={'likes': _to_int, 'replies': _to_int})
| beam.Filter(lambda row:
None not in list(row._asdict().values()))
| beam.Map(lambda row: beam.Row(
video_id=row.video_id,
comment_text=row.comment_text,
likes=int(row.likes),
replies=int(row.replies)))
)
file_pattern: "{{ GCS_PATH }}"
# Send the rows as Kafka records to an existing
# Kafka topic.
- type: WriteToKafka
name: SendRecordsToKafka
input: ReadFromGCS
config:
format: "JSON"
topic: "{{ TOPIC }}"
bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
producer_config_updates:
sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
username={{ USERNAME }} \
password={{ PASSWORD }};"
security.protocol: "SASL_PLAINTEXT"
sasl.mechanism: "PLAIN"
# Read Kafka records from an existing Kafka topic.
- type: ReadFromKafka
name: ReadFromMyTopic
config:
format: "JSON"
schema: |
{
"type": "object",
"properties": {
"video_id": { "type": "string" },
"comment_text": { "type": "string" },
"likes": { "type": "integer" },
"replies": { "type": "integer" }
}
}
topic: "{{ TOPIC }}"
bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
auto_offset_reset_config: earliest
consumer_config:
sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
username={{ USERNAME }} \
password={{ PASSWORD }};"
security.protocol: "SASL_PLAINTEXT"
sasl.mechanism: "PLAIN"
# Remove unexpected characters from the YouTube
# comment string, e.g. emojis and characters
# outside the printable ASCII range.
- type: MapToFields
name: RemoveWeirdCharacters
input: ReadFromMyTopic
config:
language: python
fields:
video_id: video_id
comment_text:
callable: |
import re
def filter(row):
# regex match and keep letters, digits, whitespace and common punctuations,
# i.e. remove non printable ASCII characters (character codes not in
# the range 32 - 126, or \x20 - \x7E).
return re.sub(r'[^\x20-\x7E]', '', row.comment_text).strip()
likes: likes
replies: replies
# Remove rows that have empty comment text
# after previously removing unexpected characters.
- type: Filter
name: FilterForProperComments
input: RemoveWeirdCharacters
config:
language: python
keep:
callable: |
def filter(row):
return len(row.comment_text) > 0
# HuggingFace's distilbert-base-uncased is used for inference,
# which accepts strings up to a maximum of 250 tokens.
# Some of the comment strings are large and end up well over
# this limit after tokenization.
# This transform truncates the comment string to ensure
# every comment satisfies the maximum token limit.
- type: MapToFields
name: Truncating
input: FilterForProperComments
config:
language: python
dependencies:
- 'transformers>=4.48.0,<4.49.0'
fields:
video_id: video_id
comment_text:
callable: |
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True)
def truncate_sentence(row):
tokens = tokenizer.tokenize(row.comment_text)
if len(tokens) >= 250:
tokens = tokens[:250]
truncated_sentence = tokenizer.convert_tokens_to_string(tokens)
else:
truncated_sentence = row.comment_text
return truncated_sentence
likes: likes
replies: replies
# HuggingFace's distilbert-base-uncased does not distinguish
# between upper- and lower-case tokens, so this pipeline
# converts all comment text to lowercase.
- type: MapToFields
name: LowerCase
input: Truncating
config:
language: python
fields:
video_id: video_id
comment_text: "comment_text.lower()"
likes: likes
replies: replies
# With the VertexAIModelHandlerJSON model handler, the
# RunInference transform performs remote inference by
# sending POST requests to the Vertex AI endpoint where
# our distilbert-base-uncased model is deployed.
- type: RunInference
name: DistilBERTRemoteInference
input: LowerCase
config:
inference_tag: "inference"
model_handler:
type: "VertexAIModelHandlerJSON"
config:
endpoint_id: "{{ ENDPOINT }}"
project: "{{ PROJECT }}"
location: "{{ LOCATION }}"
preprocess:
callable: 'lambda x: x.comment_text'
# Parse inference results output
- type: MapToFields
name: FormatInferenceOutput
input: DistilBERTRemoteInference
config:
language: python
fields:
video_id:
expression: video_id
output_type: string
comment_text:
callable: "lambda x: x.comment_text"
output_type: string
label:
callable: "lambda x: x.inference.inference[0]['label']"
output_type: string
score:
callable: "lambda x: x.inference.inference[0]['score']"
output_type: number
likes:
expression: likes
output_type: integer
replies:
expression: replies
output_type: integer
# Assign windows to each element of the unbounded PCollection.
- type: WindowInto
name: Windowing
input: FormatInferenceOutput
config:
windowing:
type: fixed
size: 30s
# Write all inference results to a BigQuery table.
- type: WriteToBigQuery
name: WriteInferenceResultsToBQ
input: Windowing
config:
table: "{{ PROJECT }}.{{ DATASET }}.{{ TABLE }}"
create_disposition: CREATE_IF_NEEDED
write_disposition: WRITE_APPEND
options:
yaml_experimental_features: ML
# Expected:
# Row(video_id='XpVt6Z1Gjjo', comment_text='I AM HAPPY', likes=1, replies=1)
# Row(video_id='XpVt6Z1Gjjo', comment_text='I AM SAD', likes=1, replies=1)
# Row(video_id='XpVt6Z1Gjjo', comment_text='ยงรฤ', likes=1, replies=1)
# Row(video_id='XpVt6Z1Gjjo', comment_text='i am happy', label='POSITIVE', score=0.95, likes=1, replies=1)
# Row(video_id='XpVt6Z1Gjjo', comment_text='i am sad', label='NEGATIVE', score=0.95, likes=1, replies=1)