Efficient Streaming Data Processing with Beam YAML and Protobuf

Efficient Streaming Data Processing with Beam YAML and Protobuf

As streaming data processing grows, so do its maintenance, complexity, and costs. This post explains how to efficiently scale pipelines by using Protobuf, which ensures that pipelines are reusable and quick to deploy. The goal is to keep this process simple for engineers to implement using Beam YAML.

Simplify pipelines with Beam YAML

Creating a pipeline in Beam can be somewhat difficult, especially for new Apache Beam users. Setting up the project, managing dependencies, and so on can be challenging. Beam YAML eliminates most of the boilerplate code, which allows you to focus on the most important part of the work: data transformation.

Some of the key benefits of Beam YAML include:

  • Readability: By using a declarative language (YAML), the pipeline configuration is more human readable.
  • Reusability: Reusing the same components across different pipelines is simplified.
  • Maintainability: Pipeline maintenance and updates are easier.

The following template shows an example of reading events from a Kafka topic and writing them into BigQuery.

pipeline:
  transforms:
    - type: ReadFromKafka
      name: ReadProtoMovieEvents
      config:
        topic: 'TOPIC_NAME'
        format: RAW/AVRO/JSON/PROTO
        bootstrap_servers: 'BOOTSTRAP_SERVERS'
        schema: 'SCHEMA'
    - type: WriteToBigQuery
      name: WriteMovieEvents
      input: ReadProtoMovieEvents
      config:
        table: 'PROJECT_ID.DATASET.MOVIE_EVENTS_TABLE'
        useAtLeastOnceSemantics: true

options:
  streaming: true
  dataflow_service_options: [streaming_mode_at_least_once]

The complete workflow

This section demonstrates the complete workflow for this pipeline.

Create a simple proto event

The following code creates a simple movie event.

// events/v1/movie_event.proto

syntax = "proto3";

package event.v1;

import "bq_field.proto";
import "bq_table.proto";
import "buf/validate/validate.proto";
import "google/protobuf/wrappers.proto";

message MovieEvent {
  option (gen_bq_schema.bigquery_opts).table_name = "movie_table";
  google.protobuf.StringValue event_id = 1 [(gen_bq_schema.bigquery).description = "Unique Event ID"];
  google.protobuf.StringValue user_id = 2 [(gen_bq_schema.bigquery).description = "Unique User ID"];
  google.protobuf.StringValue movie_id = 3 [(gen_bq_schema.bigquery).description = "Unique Movie ID"];
  google.protobuf.Int32Value rating = 4 [(buf.validate.field).int32 = {
    // validates the average rating is at least 0
    gte: 0,
    // validates the average rating is at most 100
    lte: 100
  }, (gen_bq_schema.bigquery).description = "Movie rating"];
  string event_dt = 5 [
    (gen_bq_schema.bigquery).type_override = "DATETIME",
    (gen_bq_schema.bigquery).description = "UTC Datetime representing when we received this event. Format: YYYY-MM-DDTHH:MM:SS",
    (buf.validate.field) = {
      string: {
        pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}$"
      },
      ignore_empty: false,
    }
  ];
}

Because these events are written to BigQuery, the bq_field proto and the bq_table proto are imported. These proto files help generate the BigQuery JSON schema. This example also demonstrates a shift-left approach, which moves testing, quality, and performance as early as possible in the development process. For example, to ensure that only valid events are generated from the source, the buf.validate elements are included.

After you create the movie_event.proto proto in the events/v1 folder, you can generate the necessary file descriptor. A file descriptor is a compiled representation of the schema that allows various tools and systems to understand and work with protobuf data dynamically. To simplify the process, this example uses Buf, which requires the following configuration files.

Buf configuration:

# buf.yaml

version: v2
deps:
  - buf.build/googlecloudplatform/bq-schema-api
  - buf.build/bufbuild/protovalidate
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT
# buf.gen.yaml

version: v2
managed:
  enabled: true
plugins:
  # Python Plugins
  - remote: buf.build/protocolbuffers/python
    out: gen/python
  - remote: buf.build/grpc/python
    out: gen/python

  # Java Plugins
  - remote: buf.build/protocolbuffers/java:v25.2
    out: gen/maven/src/main/java
  - remote: buf.build/grpc/java
    out: gen/maven/src/main/java

  # BQ Schemas
  - remote: buf.build/googlecloudplatform/bq-schema:v1.1.0
    out: protoc-gen/bq_schema

Run the following two commands to generate the necessary Java, Python, BigQuery schema, and Descriptor file:

// Generate the buf.lock file
buf deps update

// It generates the descriptor in descriptor.binp.
buf build . -o descriptor.binp --exclude-imports

// It generates the Java, Python and BigQuery schema as described in buf.gen.yaml
buf generate --include-imports

Make the Beam YAML read proto

Make the following modifications to the to the YAML file:

# movie_events_pipeline.yml

pipeline:
  transforms:
    - type: ReadFromKafka
      name: ReadProtoMovieEvents
      config:
        topic: 'movie_proto'
        format: PROTO
        bootstrap_servers: '<BOOTSTRAP_SERVERS>'
        file_descriptor_path: 'gs://my_proto_bucket/movie/v1.0.0/descriptor.binp'
        message_name: 'event.v1.MovieEvent'
    - type: WriteToBigQuery
      name: WriteMovieEvents
      input: ReadProtoMovieEvents
      config:
        table: '<PROJECT_ID>.raw.movie_table'
        useAtLeastOnceSemantics: true
options:
  streaming: true
  dataflow_service_options: [streaming_mode_at_least_once]

This step changes the format to PROTO and adds the file_descriptor_path and the message_name.

Deploy the pipeline with Terraform

You can use Terraform to deploy the Beam YAML pipeline with Dataflow as the runner. The following Terraform code example demonstrates how to achieve this:

// Enable Dataflow API.
resource "google_project_service" "enable_dataflow_api" {
  project = var.gcp_project_id
  service = "dataflow.googleapis.com"
}

// DF Beam YAML
resource "google_dataflow_flex_template_job" "data_movie_job" {
  provider                     = google-beta
  project                      = var.gcp_project_id
  name                         = "movie-proto-events"
  container_spec_gcs_path      = "gs://dataflow-templates-${var.gcp_region}/latest/flex/Yaml_Template"
  region                       = var.gcp_region
  on_delete                    = "drain"
  machine_type                 = "n2d-standard-4"
  enable_streaming_engine      = true
  subnetwork                   = var.subnetwork
  skip_wait_on_job_termination = true
  parameters = {
    yaml_pipeline_file = "gs://${var.bucket_name}/yamls/${var.package_version}/movie_events_pipeline.yml"
    max_num_workers    = 40
    worker_zone        = var.gcp_zone
  }
  depends_on = [google_project_service.enable_dataflow_api]
}

Assuming the BigQuery table exists, which you can do by using Terraform and Proto, this code creates a Dataflow job by using the Beam YAML code that reads Proto events from Kafka and writes them into BigQuery.

Improvements and conclusions

The following community contributions could improve the Beam YAML code in this example:

  • Support schema registries: Integrate with schema registries such as Buf Registry or Apicurio for better schema management. The current workflow generates the descriptors by using Buf and store them in Google Cloud Storage. The descriptors could be stored in a schema registry instead.

  • Enhanced Monitoring: Implement advanced monitoring and alerting mechanisms to quickly identify and address issues in the data pipeline.

Leveraging Beam YAML and Protobuf lets us streamline the creation and maintenance of data processing pipelines, significantly reducing complexity. This approach ensures that engineers can more efficiently implement and scale robust, reusable pipelines without needs to manually write Beam code.

Contribute

Developers who want to help build out and add functionalities are welcome to start contributing to the effort in the Beam YAML module.

There is also a list of open bugs found on the GitHub repo - now marked with the yaml tag.

Although Beam YAML is marked stable as of Beam 2.52, it is still under heavy development, with new features being added with each release. Those who want to be part of the design decisions and give insights to how the framework is being used are highly encouraged to join the dev mailing list, where those discussions are occurring.