Beam YAML Tests

A robust testing story is an important part of any production setup. Though the various built-in (and externally provided) transform in a Beam YAML pipeline can be expected to be well tested, it can be important to have tests that ensure the pipeline as a whole behaves as expected. This is particularly true for transforms that contain non-trivial UDF logic.

Whole pipeline tests

For example, consider the example word count pipeline.

pipeline:
  transforms:
  - type: ReadFromText
    name: Read from GCS
    config:
      path: gs://dataflow-samples/shakespeare/kinglear.txt
  - type: MapToFields
    name: Split words
    config:
      language: python
      fields:
        word:
          callable: |
            import re
            def all_words(row):
              return re.findall(r'[a-z]+', row.line.lower())
        value: 1
    input: Read from GCS
  - type: Explode
    name: Explode word arrays
    config:
      fields: [word]
    input: Split words
  - type: Combine
    name: Count words
    config:
      group_by: [word]
      combine:
        value: sum
    input: Explode word arrays
  - type: MapToFields
    name: Format output
    config:
      language: python
      fields:
        output: "word + ': ' + str(value)"
    input: Count words
  - type: WriteToText
    name: Write to GCS
    config:
      path: gs://bucket/counts.txt
    input: Format output

tests: []

To write tests for this pipeline, one creates a tests section that enumerates a number of tests, each of which provide example input and assert the expected output is produced. An example test might be as follows

tests:
- name: MyRegressionTest
  mock_outputs:
    - name: Read from GCS
      elements:
        - line: "Nothing can come of nothing"
  expected_inputs:
    - name: Write to GCS
      elements:
        - output: 'nothing: 2'
        - output: 'can: 1'
        - output: 'come: 1'
        - output: 'of: 1'

The mock_outputs section designates that the transform named Read from GCS should produce the single row {line: "Nothing can come of nothing"} for the purposes of this test, and the expected_inputs section indicates that the transform Write to GCS should expect to receive exactly the given elements. Neither the actual Read transform nor Write transform from the original pipelines are executed when running the test, but all intermediate transforms are.

This test can then be executed by running

python -m apache_beam.yaml.main \
    --yaml_pipeline_file=wordcount.yaml \
    --tests

Alternatively, the a tests: block may be placed in a separate file and be validated by running

python -m apache_beam.yaml.main \
    --yaml_pipeline_file=wordcount.yaml \
    --tests \
    --test_suite=test_file.yaml

Neither the actual Read transform nor Write transform from the original pipelines are executed when running the test, but all intermediate transforms are. For hermeticity, we require that all inputs (with the exception of Create that are needed to compute the expected outputs are explicitly mocked; to explicitly allow a sources to be executed as part of a test their names or types can be enumerated in an allowed_sources attribute of the test specification.

Pipeline fragment tests

One can also tests a portion of a pipeline using the mock_inputs and expected_outputs section of a test, for example

tests:
- name: TestSplittingWithPunctuation
  mock_inputs:
    - name: Split words
      elements:
        - line: "lots-of-words"
        - line: "...and more"
  expected_outputs:
    - name: Explode
      elements:
        - word: lots
          value: 1
        - word: of
          value: 1
        - word: words
          value: 1
        - word: and
          value: 1
        - word: more
          value: 1

- name: TestCombineAndFormat
  mock_inputs:
    - name: Count words
      elements:
        - word: more
          value: 1
        - word: and
          value: 1
        - word: more
          value: 1
  expected_outputs:
    - name: Format output
      elements:
        - output: "more: 2"
        - output: "and: 1"

As before, each test only executes the portion of the pipeline between the mock inputs and expected outputs. Note that the named transform in a mock_inputs specification is executed, while the named transform of a mock_oupputs specification is not. Similarly, the named transform of a expected_inputs specification is not executed, while the named transform of an expected_outputs necessarily is.

Automatically generating tests.

In an effort to make tests as easy to write and maintain as possible, Beam YAML provides utilities to compute the expected outputs for your tests.

Running

python -m apache_beam.yaml.main \
    --yaml_pipeline_file=wordcount.yaml \
    --tests \
    [--test_suite=...] \
    --create_test

will create an entirely new test by sampling all the sources and constructing a test accordingly.

One can also keep tests up to date by running

python -m apache_beam.yaml.main \
    --yaml_pipeline_file=wordcount.yaml \
    --tests \
    [--test_suite=...] \
    --fix_tests

which will update any existing expected_input and expected_output blocks of your pipeline to contain the actual values computed during the test. This can be useful in authoring tests as well–one can simply specify a nonsensical or empty elements block in the expectation and the --fix_tests flag will populate it for you. (Of course, it is on any user of these flags to verify that the produced values are meaningful and as expected.)

Branching pipelines

For complex, branching pipelines, any number of mock_inputs and mock_outupts may be enumerated to provide the input data, and any number of expected_inputs and expected_outputs validations may be specified as well. In both the mock_outupts and expected_outputs block, multiple outputs can be disambiguated with the TransformName.output_name notation just as when authoring a yaml pipeline.

pipeline:
  transforms:
    - type: Create
      name: Abc
      config:
        elements: [a, b, ccc]
    - type: Create
      name: Xyz
      config:
        elements: [x, y, zzz]
    - type: MapToFields
      name: Upper
      input: [Abc, Xyz]
      config:
        language: python
        fields:
          element: element.upper()
    - type: Partition
      input: Upper
      config:
        language: python
        by: '"big" if len(element) > 1 else "small"'
        outputs: ["big", "small"]
    - type: MapToFields
      name: MaybeHasErrors
      input: Abc
      config:
        language: python
        fields:
          inverse_size: 1 / len(element)
        error_handling:
          output: errors
    - type: StripErrorMetadata
      input: MaybeHasErrors.errors

tests:
  - name: MockMultipleInputs
    mock_outputs:
      - name: Abc
        elements: [element: a]
      - name: Xyz
        elements: [element: z]
    expected_outputs:
      - name: Upper
        elements: [element: A, element: Z]

  - name: TestMultipelOuptuts
    mock_inputs:
      - name: Upper
        elements: [element: m, element: nnn]
    expected_outputs:
      - name: Partition.big
        elements: [element: NNN]
      - name: Partition.small
        elements: [element: M]

  - name: TestErrorHandling
    mock_outputs:
      - name: Abc
        elements: [element: 'Aaaa', element: '']
    expected_outputs:
      - name: MaybeHasErrors
        elements: [inverse_size: 0.25]
      - name: StripErrorMetadata
        elements: [element: '']