# Beam YAML Tests
A robust testing story is an important part of any production setup. Though the various built-in (and externally provided) transforms in a Beam YAML pipeline can be expected to be well tested, it is important to have tests that ensure the pipeline as a whole behaves as expected. This is particularly true for transforms that contain non-trivial UDF logic.
## Whole pipeline tests
For example, consider the following word count pipeline:
```yaml
pipeline:
  transforms:
    - type: ReadFromText
      name: Read from GCS
      config:
        path: gs://dataflow-samples/shakespeare/kinglear.txt
    - type: MapToFields
      name: Split words
      config:
        language: python
        fields:
          word:
            callable: |
              import re
              def all_words(row):
                return re.findall(r'[a-z]+', row.line.lower())
          value: 1
      input: Read from GCS
    - type: Explode
      name: Explode word arrays
      config:
        fields: [word]
      input: Split words
    - type: Combine
      name: Count words
      config:
        group_by: [word]
        combine:
          value: sum
      input: Explode word arrays
    - type: MapToFields
      name: Format output
      config:
        language: python
        fields:
          output: "word + ': ' + str(value)"
      input: Count words
    - type: WriteToText
      name: Write to GCS
      config:
        path: gs://bucket/counts.txt
      input: Format output

tests: []
```
To write tests for this pipeline, one creates a `tests` section that enumerates a number of tests, each of which provides example input and asserts that the expected output is produced. An example test might be as follows:
```yaml
tests:
  - name: MyRegressionTest
    mock_outputs:
      - name: Read from GCS
        elements:
          - line: "Nothing can come of nothing"
    expected_inputs:
      - name: Write to GCS
        elements:
          - output: 'nothing: 2'
          - output: 'can: 1'
          - output: 'come: 1'
          - output: 'of: 1'
```
The `mock_outputs` section designates that the transform named `Read from GCS` should produce the single row `{line: "Nothing can come of nothing"}` for the purposes of this test, and the `expected_inputs` section indicates that the transform `Write to GCS` should expect to receive exactly the given elements. Neither the actual Read transform nor the Write transform from the original pipeline is executed when running the test, but all intermediate transforms are.
This test can then be executed by running:

```sh
python -m apache_beam.yaml.main \
    --yaml_pipeline_file=wordcount.yaml \
    --tests
```
Alternatively, the `tests:` block may be placed in a separate file and validated by running:

```sh
python -m apache_beam.yaml.main \
    --yaml_pipeline_file=wordcount.yaml \
    --tests \
    --test_suite=test_file.yaml
```
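Such a separate file contains just the top-level `tests:` block. For example, a minimal `test_file.yaml` reusing the regression test from above might look like:

```yaml
# test_file.yaml: a test suite kept separate from wordcount.yaml
tests:
  - name: MyRegressionTest
    mock_outputs:
      - name: Read from GCS
        elements:
          - line: "Nothing can come of nothing"
    expected_inputs:
      - name: Write to GCS
        elements:
          - output: 'nothing: 2'
          - output: 'can: 1'
          - output: 'come: 1'
          - output: 'of: 1'
```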
For hermeticity, we require that all inputs (with the exception of `Create`) that are needed to compute the expected outputs be explicitly mocked; to explicitly allow a source to be executed as part of a test, its name or type can be enumerated in an `allowed_sources` attribute of the test specification.
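For instance, a non-hermetic test that lets the real read happen could be sketched as follows. The exact shape of the `allowed_sources` value, given here as a list of transform names, is our reading of the description above, and the expected elements are illustrative:

```yaml
tests:
  - name: NonHermeticTest
    # Allow the real ReadFromText source to run instead of mocking it.
    allowed_sources: [Read from GCS]
    expected_inputs:
      - name: Write to GCS
        elements:
          # Illustrative values only; with a real source these must match
          # what the pipeline actually computes from the data it reads.
          - output: 'nothing: 2'
```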
## Pipeline fragment tests
One can also test a portion of a pipeline using the `mock_inputs` and `expected_outputs` sections of a test, for example:
```yaml
tests:
  - name: TestSplittingWithPunctuation
    mock_inputs:
      - name: Split words
        elements:
          - line: "lots-of-words"
          - line: "...and more"
    expected_outputs:
      - name: Explode word arrays
        elements:
          - word: lots
            value: 1
          - word: of
            value: 1
          - word: words
            value: 1
          - word: and
            value: 1
          - word: more
            value: 1
  - name: TestCombineAndFormat
    mock_inputs:
      - name: Count words
        elements:
          - word: more
            value: 1
          - word: and
            value: 1
          - word: more
            value: 1
    expected_outputs:
      - name: Format output
        elements:
          - output: "more: 2"
          - output: "and: 1"
```
As before, each test only executes the portion of the pipeline between the mock inputs and expected outputs. Note that the named transform in a `mock_inputs` specification is executed, while the named transform of a `mock_outputs` specification is not. Similarly, the named transform of an `expected_inputs` specification is not executed, while the named transform of an `expected_outputs` specification necessarily is.
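These semantics can be summarized as follows:

| Specification | Named transform executed? | Role of `elements` |
| --- | --- | --- |
| `mock_inputs` | yes | fed to the named transform as its input |
| `mock_outputs` | no | substituted for the named transform's output |
| `expected_inputs` | no | asserted to match the named transform's input |
| `expected_outputs` | yes | asserted to match the named transform's output |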
## Automatically generating tests
In an effort to make tests as easy to write and maintain as possible, Beam YAML provides utilities to compute the expected outputs for your tests.
Running:

```sh
python -m apache_beam.yaml.main \
    --yaml_pipeline_file=wordcount.yaml \
    --tests \
    [--test_suite=...] \
    --create_test
```
will create an entirely new test by sampling all the sources and constructing a test accordingly.
One can also keep tests up to date by running:

```sh
python -m apache_beam.yaml.main \
    --yaml_pipeline_file=wordcount.yaml \
    --tests \
    [--test_suite=...] \
    --fix_tests
```
which will update any existing `expected_inputs` and `expected_outputs` blocks of your pipeline to contain the actual values computed during the test. This can be useful when authoring tests as well: one can simply specify a nonsensical or empty `elements` block in the expectation, and the `--fix_tests` flag will populate it for you, as in the sketch below.
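For example, one might author a new test with a deliberately empty expectation and let the tool fill it in (the test name and sample line here are our own):

```yaml
tests:
  - name: NewlyAuthoredTest
    mock_outputs:
      - name: Read from GCS
        elements:
          - line: "To be or not to be"
    expected_inputs:
      - name: Write to GCS
        # Left empty on purpose; running with --fix_tests replaces this
        # with the values the pipeline actually computes.
        elements: []
```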
(Of course, it is up to the user of these flags to verify that the produced values are meaningful and as expected.)
## Branching pipelines
For complex, branching pipelines, any number of `mock_inputs` and `mock_outputs` may be enumerated to provide the input data, and any number of `expected_inputs` and `expected_outputs` validations may be specified as well. In both the `mock_outputs` and `expected_outputs` blocks, multiple outputs can be disambiguated with the `TransformName.output_name` notation, just as when authoring a YAML pipeline.
```yaml
pipeline:
  transforms:
    - type: Create
      name: Abc
      config:
        elements: [a, b, ccc]
    - type: Create
      name: Xyz
      config:
        elements: [x, y, zzz]
    - type: MapToFields
      name: Upper
      input: [Abc, Xyz]
      config:
        language: python
        fields:
          element: element.upper()
    - type: Partition
      input: Upper
      config:
        language: python
        by: '"big" if len(element) > 1 else "small"'
        outputs: ["big", "small"]
    - type: MapToFields
      name: MaybeHasErrors
      input: Abc
      config:
        language: python
        fields:
          inverse_size: 1 / len(element)
        error_handling:
          output: errors
    - type: StripErrorMetadata
      input: MaybeHasErrors.errors

tests:
  - name: MockMultipleInputs
    mock_outputs:
      - name: Abc
        elements: [element: a]
      - name: Xyz
        elements: [element: z]
    expected_outputs:
      - name: Upper
        elements: [element: A, element: Z]

  - name: TestMultipleOutputs
    mock_inputs:
      - name: Upper
        elements: [element: m, element: nnn]
    expected_outputs:
      - name: Partition.big
        elements: [element: NNN]
      - name: Partition.small
        elements: [element: M]

  - name: TestErrorHandling
    mock_outputs:
      - name: Abc
        elements: [element: 'Aaaa', element: '']
    expected_outputs:
      - name: MaybeHasErrors
        elements: [inverse_size: 0.25]
      - name: StripErrorMetadata
        elements: [element: '']
```