Testing I/O Transforms in Apache Beam

Examples and design patterns for testing Apache Beam I/O transforms

Note: This guide is still in progress. There is an open issue to finish the guide: BEAM-1025.


This document explains the set of tests that the Beam community recommends based on our past experience writing I/O transforms. If you wish to contribute your I/O transform to the Beam community, we’ll ask you to implement these tests.

While it is standard to write unit tests and integration tests, there are many possible definitions. Our definitions are:

A note on performance benchmarking

We do not advocate writing a separate test specifically for performance benchmarking. Instead, we recommend setting up integration tests that can accept the necessary parameters to cover many different testing scenarios.

For example, if integration tests are written according to the guidelines below, they can be run on different runners (either local or in a cluster configuration) and against either a small data store instance with a small data set or a large production-ready cluster with a larger data set. This provides coverage for a variety of scenarios, one of which is performance benchmarking.

Test Balance - Unit vs Integration

It’s easy to cover a large amount of code with an integration test, but it is then hard to find the cause of a test failure, and the test is flakier.

However, tests that exercise multiple workers reading from or writing to data store instances with multiple nodes (e.g., read replicas) find a valuable set of bugs. Those scenarios are hard to reproduce with unit tests, and we find they commonly cause bugs in I/O transforms.

Our test strategy is a balance of those two competing needs. We recommend doing as much testing as possible in unit tests, and writing a single, small integration test that can be run in various configurations.




Unit Tests



Implementing unit tests

A general guide to writing Unit Tests for all transforms can be found in the PTransform Style Guide. We have expanded on a few important points below.

If you are using the Source API, make sure to exhaustively unit-test your code. A minor implementation error can lead to data corruption or data loss (such as skipping or duplicating records) that can be hard for your users to detect. Also look into using SourceTestUtils (Java) or source_test_utils (Python); it is a key piece of testing Source implementations.
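
The idea behind exhaustive split testing can be sketched without Beam. The example below is only an illustration under our own assumptions: read_records is a hypothetical reader, not part of the Source API or of the Beam test utilities. It checks that splitting a byte range at every possible offset never skips or duplicates a record.

```python
def read_records(data, start, stop):
    """Return the newline-terminated records that begin in [start, stop).

    Hypothetical reader: a record belongs to the range that contains its
    first byte, which is a common convention for offset-based sources.
    """
    records = []
    offset = 0
    for line in data.splitlines(keepends=True):
        if start <= offset < stop:
            records.append(line.rstrip(b"\n"))
        offset += len(line)
    return records


data = b"alpha\nbeta\ngamma\ndelta\n"
expected = read_records(data, 0, len(data))

# Try every split point and record the ones where the two halves,
# read independently, do not add up to the unsplit result.
mismatches = []
for split_at in range(len(data) + 1):
    primary = read_records(data, 0, split_at)
    residual = read_records(data, split_at, len(data))
    if primary + residual != expected:
        mismatches.append(split_at)
```

An empty mismatches list means no split point lost or duplicated a record for this input.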

If you are not using the Source API, you can use TestPipeline with PAssert (Java) or assert_that (Python) to help with your testing.

If you are implementing write, you can use TestPipeline to write test data and then read and verify it using a non-Beam client.

Use fakes

Instead of using mocks in your unit tests (pre-programming exact responses to each call for each test), use fakes. The preferred way to use fakes for I/O transform testing is to use a pre-existing in-memory/embeddable version of the service you’re testing, but if one does not exist consider implementing your own. Fakes have proven to be the right mix of “you can get the conditions for testing you need” and “you don’t have to write a million exacting mock function calls”.
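
As a minimal sketch of what a hand-written fake might look like, consider the example below. FakeKeyValueStore and read_range are hypothetical names invented for illustration; a real fake would mirror the client interface of the service you are testing against.

```python
class FakeKeyValueStore:
    """Hypothetical in-memory stand-in for a key-value data store service."""

    def __init__(self):
        self._rows = {}
        self.closed = False

    def put(self, key, value):
        if self.closed:
            raise RuntimeError("store is closed")
        self._rows[key] = value

    def scan(self, start_key=None, end_key=None):
        """Yield (key, value) pairs in key order within [start_key, end_key)."""
        for key in sorted(self._rows):
            if start_key is not None and key < start_key:
                continue
            if end_key is not None and key >= end_key:
                break
            yield key, self._rows[key]

    def close(self):
        self.closed = True


def read_range(store, start_key, end_key):
    """Hypothetical read logic under test: returns all values in a key range."""
    return [value for _, value in store.scan(start_key, end_key)]


# The test sets up exactly the state it needs, with no per-call scripting.
store = FakeKeyValueStore()
for i in range(5):
    store.put("key%02d" % i, i)

first_half = read_range(store, "key00", "key02")
second_half = read_range(store, "key02", None)
```

Because the fake implements real behavior (ordering, range bounds, closed state), the same instance can serve many tests, whereas a mock would need its responses rewritten for each one.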

Network failure

To help with testing and separation of concerns, code that interacts across a network should be handled in a separate class from your I/O transform. The suggested design pattern is that your I/O transform throws exceptions once it determines that a read or write is no longer possible.

This allows the I/O transform’s unit tests to act as if they have a perfect network connection, and they do not need to retry/otherwise handle network connection problems.
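
One possible shape for this separation is sketched below. All names (RetryingClient, TransientNetworkError, write_all) are hypothetical and the retry policy is deliberately simplistic; the point is only that retry handling lives in the client class, while the write logic lets a final failure propagate as an exception.

```python
class TransientNetworkError(Exception):
    """Hypothetical error type raised by the underlying connection."""


class RetryingClient:
    """Hypothetical wrapper that owns all network concerns (retries here)."""

    def __init__(self, send, max_attempts=3):
        self._send = send  # callable that performs the real network call
        self._max_attempts = max_attempts

    def write(self, record):
        last_error = None
        for _ in range(self._max_attempts):
            try:
                return self._send(record)
            except TransientNetworkError as error:
                last_error = error
        # Retries are exhausted: surface the failure to the caller.
        raise IOError(
            "write failed after %d attempts" % self._max_attempts
        ) from last_error


def write_all(client, records):
    """Stand-in for the transform's write logic: no retry handling at all."""
    for record in records:
        client.write(record)


# In the transform's unit test, the client is backed by a "perfect network".
received = []
write_all(RetryingClient(received.append), ["a", "b"])

# The retry behavior is tested separately, against a flaky send function.
attempts = []


def flaky_send(record):
    attempts.append(record)
    if len(attempts) < 3:
        raise TransientNetworkError("simulated drop")
    return "ok"


flaky_result = RetryingClient(flaky_send).write("c")
```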


Batching

If your I/O transform allows batching of reads/writes, you must force the batching to occur in your test. Having configurable batch size options on your I/O transform makes that easy. These options must be marked as test-only.
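
A minimal sketch of a test-only batch size option follows. BatchingWriter and the batch_size_for_testing parameter are hypothetical; the naming is just one way to signal that the option is not meant for production use.

```python
class BatchingWriter:
    """Hypothetical writer that buffers records and flushes them in batches."""

    DEFAULT_BATCH_SIZE = 1000

    def __init__(self, flush_fn, batch_size_for_testing=None):
        self._flush_fn = flush_fn
        # The override is named so that it is clearly intended for tests only.
        self._batch_size = batch_size_for_testing or self.DEFAULT_BATCH_SIZE
        self._buffer = []

    def write(self, record):
        self._buffer.append(record)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self):
        if self._buffer:
            self._flush_fn(list(self._buffer))
            self._buffer.clear()


# With a batch size of 2, five records force two full batches and one
# partial batch, so the batching code path is exercised by a tiny test.
batches = []
writer = BatchingWriter(batches.append, batch_size_for_testing=2)
for record in range(5):
    writer.write(record)
writer.flush()  # Flush the final partial batch, as a finish step would.
```

Without the override, a test writing five records would never reach the default batch size, and the batch boundary logic would go untested.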

I/O Transform Integration Tests

We do not currently have examples of Python I/O integration tests or integration tests for unbounded or eventually consistent data stores. We would welcome contributions in these areas - please contact the Beam dev@ mailing list for more information.


Integration tests, data stores, and Kubernetes

In order to test I/O transforms in real world conditions, you must connect to a data store instance.

The Beam community hosts the data stores used for integration tests in Kubernetes. In order for an integration test to be run in Beam’s continuous integration environment, it must have Kubernetes scripts that set up an instance of the data store.

However, when working locally, there is no requirement to use Kubernetes. All of the test infrastructure allows you to pass in connection info, so developers can use their preferred hosting infrastructure for local development.

Running integration tests on your machine

You can always run the IO integration tests on your own machine. The high level steps for running an integration test are:

  1. Set up the data store corresponding to the test being run.
  2. Run the test, passing it connection info from the just created data store.
  3. Clean up the data store.

Data store setup/cleanup

If you’re using Kubernetes scripts to host data stores, make sure you can connect to your cluster locally using kubectl. If you already have your own data stores set up, you only need to execute step 3 from the list below.

  1. Set up the data store corresponding to the test you wish to run. You can find Kubernetes scripts for all currently supported data stores in .test-infra/kubernetes.
    1. In some cases, there is a dedicated setup script (*.sh). In other cases, you can just run kubectl create -f [scriptname] to create the data store. You can also let the kubernetes.sh script perform some standard steps for you.
    2. Convention dictates there will be:
      1. A yml script for the data store itself, plus a NodePort service. The NodePort service opens a port to the data store for anyone who connects to the Kubernetes cluster’s machines from within the same subnetwork. Such scripts are typically useful when running the scripts on Minikube Kubernetes Engine.
      2. A separate script with a LoadBalancer service. This service exposes an external IP address for the data store. Such scripts are needed when external access is required (e.g., on Jenkins).
    3. Examples:
      1. For JDBC, you can set up Postgres: kubectl create -f .test-infra/kubernetes/postgres/postgres.yml
      2. For Elasticsearch, you can run the setup script: bash .test-infra/kubernetes/elasticsearch/setup.sh
  2. Determine the IP address of the service:
    1. NodePort service: kubectl get pods -l 'component=elasticsearch' -o jsonpath='{.items[0].status.podIP}'
    2. LoadBalancer service: kubectl get svc elasticsearch-external -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
  3. Run the test using the integrationTest Gradle task and the instructions in the test class (e.g., see the instructions in JdbcIOIT.java).
  4. Tell Kubernetes to delete the resources specified in the Kubernetes scripts:
    1. JDBC: kubectl delete -f .test-infra/kubernetes/postgres/postgres.yml
    2. Elasticsearch: bash .test-infra/kubernetes/elasticsearch/teardown.sh
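
The steps above can be scripted so that cleanup always happens, even when the test fails. The sketch below is an assumption-laden illustration: run_with_data_store is a hypothetical helper, the test command omits the pipeline options it would need in practice, and the example only records the commands instead of executing them.

```python
import subprocess


def run_with_data_store(setup_cmd, test_cmd, teardown_cmd,
                        run=subprocess.check_call):
    """Run setup, then the test, and always attempt cleanup afterwards."""
    run(setup_cmd)
    try:
        run(test_cmd)
    finally:
        # Cleanup runs even when the test command fails.
        run(teardown_cmd)


POSTGRES_YML = ".test-infra/kubernetes/postgres/postgres.yml"
SETUP = ["kubectl", "create", "-f", POSTGRES_YML]
TEARDOWN = ["kubectl", "delete", "-f", POSTGRES_YML]
# Incomplete on purpose: -DintegrationTestPipelineOptions is omitted here.
TEST = [
    "./gradlew", "integrationTest",
    "-p", "sdks/java/io/hadoop-format",
    "--tests=org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOIT",
]

# Dry run: collect the commands in order rather than executing them.
executed = []
run_with_data_store(SETUP, TEST, TEARDOWN, run=executed.append)
```

Passing the default run (subprocess.check_call) would execute the commands for real, so that variant needs a reachable cluster and a complete test command.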

Running a particular test

integrationTest is a dedicated gradle task for running IO integration tests.

Example usage on Cloud Dataflow runner:

./gradlew integrationTest -p sdks/java/io/hadoop-format -DintegrationTestPipelineOptions='["--project=GOOGLE_CLOUD_PROJECT", "--tempRoot=GOOGLE_STORAGE_BUCKET", "--numberOfRecords=1000", "--postgresPort=5432", "--postgresServerName=SERVER_NAME", "--postgresUsername=postgres", "--postgresPassword=PASSWORD", "--postgresDatabaseName=postgres", "--postgresSsl=false", "--runner=TestDataflowRunner"]' -DintegrationTestRunner=dataflow --tests=org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOIT

Example usage on HDFS filesystem and Direct runner:

NOTE: The setup below only works when the /etc/hosts file contains entries with the external IPs of the Hadoop namenode and datanodes. Please see the explanation in: Small Cluster config file and Large Cluster config file.

export HADOOP_USER_NAME=root

./gradlew integrationTest -p sdks/java/io/file-based-io-tests -DintegrationTestPipelineOptions='["--numberOfRecords=1000", "--filenamePrefix=hdfs://HDFS_NAMENODE:9000/XMLIOIT", "--hdfsConfiguration=[{\"fs.defaultFS\":\"hdfs://HDFS_NAMENODE:9000\",\"dfs.replication\":1,\"dfs.client.use.datanode.hostname\":\"true\" }]" ]' -DintegrationTestRunner=direct -Dfilesystem=hdfs --tests org.apache.beam.sdk.io.xml.XmlIOIT

Parameter descriptions:

-p sdks/java/io/file-based-io-tests/  Specifies the project submodule of the I/O to test.
-DintegrationTestPipelineOptions      Passes pipeline options directly to the test being run.
-DintegrationTestRunner               Runner to be used for running the test. Currently possible options are: direct, dataflow.
-Dfilesystem                          (optional, where applicable) Filesystem to be used to run the test. Currently possible options are: gcs, hdfs, s3. If not provided, local filesystem will be used.
--tests                               Specifies the test to be run (fully qualified reference to class/test method).
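
The value of -DintegrationTestPipelineOptions is a JSON array of strings, which is easy to misquote by hand. The sketch below shows one way to assemble the command programmatically. integration_test_command is a hypothetical helper, and the /tmp/XMLIOIT prefix is an illustrative value rather than a recommended one.

```python
import json
import shlex


def integration_test_command(project, test_class, runner, pipeline_options,
                             filesystem=None):
    """Build the gradlew argument list for the integrationTest task."""
    options_json = json.dumps(
        ["--%s=%s" % (name, value) for name, value in pipeline_options.items()]
    )
    command = [
        "./gradlew", "integrationTest",
        "-p", project,
        "-DintegrationTestPipelineOptions=" + options_json,
        "-DintegrationTestRunner=" + runner,
    ]
    if filesystem is not None:
        # Optional: when omitted, the local filesystem is used.
        command.append("-Dfilesystem=" + filesystem)
    command += ["--tests", test_class]
    return command


command = integration_test_command(
    project="sdks/java/io/file-based-io-tests",
    test_class="org.apache.beam.sdk.io.xml.XmlIOIT",
    runner="direct",
    pipeline_options={"numberOfRecords": 1000, "filenamePrefix": "/tmp/XMLIOIT"},
)

# shlex.quote makes the line safe to paste into a POSIX shell.
shell_line = " ".join(shlex.quote(part) for part in command)
```

The list form can be passed to subprocess directly, while shell_line is the quoted equivalent for copying into a terminal.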

Running Integration Tests on Pull Requests

Most of the IO integration tests have dedicated Jenkins jobs that run periodically to collect metrics and catch regressions. Thanks to the ghprb plugin, it is also possible to trigger these jobs on demand by typing a specific phrase in a comment on a GitHub pull request. This way you can check whether your contribution to a certain IO is an improvement or makes things worse (hopefully not!).

To run IO Integration Tests type the following comments in your Pull Request:

Test                                  Phrase
JdbcIOIT                              Run Java JdbcIO Performance Test
MongoDBIOIT                           Run Java MongoDBIO Performance Test
HadoopFormatIOIT                      Run Java HadoopFormatIO Performance Test
TextIO - local filesystem             Run Java TextIO Performance Test
TextIO - HDFS                         Run Java TextIO Performance Test HDFS
Compressed TextIO - local filesystem  Run Java CompressedTextIO Performance Test
Compressed TextIO - HDFS              Run Java CompressedTextIO Performance Test HDFS
AvroIO - local filesystem             Run Java AvroIO Performance Test
AvroIO - HDFS                         Run Java AvroIO Performance Test HDFS
TFRecordIO - local filesystem         Run Java TFRecordIO Performance Test
ParquetIO - local filesystem          Run Java ParquetIO Performance Test
XmlIO - local filesystem              Run Java XmlIO Performance Test
XmlIO - HDFS                          Run Java XmlIO Performance Test on HDFS

Every job definition can be found in .test-infra/jenkins. If you modified/added new Jenkins job definitions in your Pull Request, run the seed job before running the integration test (comment: “Run seed job”).

Performance testing dashboard

As mentioned above, we measure the performance of IOITs by gathering test execution times from Jenkins jobs that run periodically. The results are stored in a database (BigQuery), so we can display them as plots.

The dashboard gathering all the results is available here: Performance Testing Dashboard

Implementing Integration Tests

There are three components necessary to implement an integration test: test code, Kubernetes scripts, and Jenkins jobs.

These three pieces are discussed in detail below.

Test Code

These are the conventions used by integration testing code:

An end to end example of these principles can be found in JdbcIOIT.

Kubernetes scripts

As discussed in Integration tests, data stores, and Kubernetes, to have your tests run on Beam’s continuous integration server, you’ll need to implement a Kubernetes script that creates an instance of your data store.

If you would like help with this or have other questions, contact the Beam dev@ mailing list and the community may be able to assist you.

Guidelines for creating a Beam data store Kubernetes script:

  1. You should define two Kubernetes scripts.
    • This is the best known way to implement item #1.
    • The first script will contain the main datastore instance script (StatefulSet) plus a NodePort service exposing the data store. This will be the script run by the Beam Jenkins continuous integration server.
    • The second script will define an additional LoadBalancer service, used to expose an external IP address to the data store if the Kubernetes cluster is on another network. This file’s name is usually suffixed with ‘-for-local-dev’.
  2. You must ensure that pods are recreated after crashes.
    • If you use a pod directly, it will not be recreated if the pod crashes or something causes the cluster to move the container for your pod.
    • In most cases, you’ll want to use StatefulSet, as it supports persistent disks that last between restarts and provides a stable network identifier associated with the pod using a particular persistent disk. Deployment and ReplicaSet are also possibly useful, but likely in fewer scenarios since they do not have those features.
  3. You should create separate scripts for small and large instances of your data store.
  4. You must use a Docker image from a trusted source and pin the version of the Docker image.
    • You should prefer images in this order:
      1. An image provided by the creator of the data source/sink (if they officially maintain it). For Apache projects, this would be the official Apache repository.
      2. Official Docker images, because they have security fixes and guaranteed maintenance.
      3. Non-official Docker images, or images from other providers that have good maintainers (e.g. quay.io).

Jenkins jobs

You can find examples of existing IOIT Jenkins job definitions in the .test-infra/jenkins directory. Look for files called job_PerformanceTest_*.groovy. The most prominent examples are:

Note that there is a utility class that helps create the jobs without forgetting important steps or repeating code. See Kubernetes.groovy for more details.

Small Scale and Large Scale Integration Tests

Apache Beam expects that it can run integration tests in multiple configurations:

You can do this by:

  1. Creating two Kubernetes scripts: one for a small instance of the data store, and one for a large instance.
  2. Having your test take a pipeline option that decides whether to generate a small or large amount of test data (where small and large are sizes appropriate to your data store).

An example of this is HadoopFormatIO’s tests.
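
The second point can be sketched in a language-neutral way as follows. The example reuses the --numberOfRecords option name from the commands earlier in this guide; parse_test_options, generate_test_rows, and the row layout are hypothetical, and a real Beam test would declare the option through its pipeline options mechanism rather than argparse.

```python
import argparse


def parse_test_options(argv):
    """Parse the data-size option and ignore options meant for other parts."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--numberOfRecords", type=int, default=1000)
    options, _unknown = parser.parse_known_args(argv)
    return options


def generate_test_rows(number_of_records):
    """Deterministic rows, so a reader can later verify what was written."""
    for i in range(number_of_records):
        yield {"id": i, "name": "Testval%d" % i}


# The same test code serves both scales; only the option value changes.
small = parse_test_options(["--numberOfRecords=10", "--runner=direct"])
small_rows = list(generate_test_rows(small.numberOfRecords))
```

Generating rows deterministically from the record count means the read side can recompute the expected data instead of storing it.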