Beam YAML Error Handling

The larger a pipeline gets, the more common it is to encounter “exceptional” data that is malformed, fails to satisfy expected preconditions, or otherwise breaks during processing. Generally any such record will cause the pipeline to fail permanently, but it is often desirable to allow the pipeline to continue, redirecting bad records to another path for special handling or simply recording them for later offline analysis. This is often called the “dead letter queue” pattern.

Beam YAML has special support for this pattern when a transform accepts an error_handling config parameter with an output field. The value of output is a name that must be referenced as an input to another transform that will process the errors (e.g. by writing them out). For example, the following pipeline writes all “good” processed records to one file and any “bad” records, along with metadata about the error that was encountered, to a separate file.

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      input: ReadFromCsv
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: my_error_output

    - type: WriteToJson
      input: MapToFields
      config:
        path: /path/to/output.json

    - type: WriteToJson
      name: WriteErrorsToJson
      input: MapToFields.my_error_output
      config:
        path: /path/to/errors.json

Note that once error_handling is declared, MapToFields.my_error_output must be consumed; it is an error to ignore it. Any use suffices, e.g. simply logging the bad records to stdout, though that is not recommended for a robust pipeline.
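For instance, replacing the WriteErrorsToJson step in the example above with a logging consumer such as LogForTesting (shown here purely as a sketch) would still satisfy this requirement:

    - type: LogForTesting
      name: LogErrors
      # Any consumer of the error output satisfies the requirement; here the
      # failed records are simply logged rather than written to a file.
      input: MapToFields.my_error_output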

Note also that the exact format of the error outputs is still being finalized. They can be safely printed and written to outputs, but their precise schema may change in a future version of Beam and should not yet be depended on. An error record generally contains the failed record itself as well as information about the error that was encountered (e.g. error messages and tracebacks). To recover the bad record alone, one can process the error output with the StripErrorMetadata transform, as shown in a later example.

Some transforms accept extra arguments in their error_handling config, e.g. for Python functions one can give a threshold that limits the fraction of records that may fail before the entire pipeline is considered a failure.

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      input: ReadFromCsv
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: my_error_output
          # If more than 10% of records throw an error, stop the pipeline.
          threshold: 0.1

    - type: WriteToJson
      input: MapToFields
      config:
        path: /path/to/output.json

    - type: WriteToJson
      name: WriteErrorsToJson
      input: MapToFields.my_error_output
      config:
        path: /path/to/errors.json

One can do arbitrary further processing on these failed records if desired, for example:

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      name: ComputeRatio
      input: ReadFromCsv
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: my_error_output

    - type: StripErrorMetadata
      name: FailedRecordsWithoutMetadata
      # Takes the error information from ComputeRatio and returns just the
      # failing records themselves for another attempt with a different
      # transform.
      input: ComputeRatio.my_error_output

    - type: MapToFields
      name: ComputeRatioForBadRecords
      input: FailedRecordsWithoutMetadata
      config:
        language: python
        fields:
          col1: col1
          ratio: col2 / (col3 + 1)
        error_handling:
          output: still_bad

    - type: WriteToJson
      # Takes as input everything from the "success" path of both transforms.
      input: [ComputeRatio, ComputeRatioForBadRecords]
      config:
        path: /path/to/output.json

    - type: WriteToJson
      name: WriteErrorsToJson
      # These failed the first and the second transform.
      input: ComputeRatioForBadRecords.still_bad
      config:
        path: /path/to/errors.json

When using the chain syntax, the required error consumption can happen in an extra_transforms block.

pipeline:
  type: chain
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      name: SomeStep
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: errors

    - type: MapToFields
      name: AnotherStep
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          inverse_ratio: 1 / ratio
        error_handling:
          output: errors

    - type: WriteToJson
      config:
        path: /path/to/output.json

  extra_transforms:
    - type: WriteToJson
      name: WriteErrors
      input: [SomeStep.errors, AnotherStep.errors]
      config:
        path: /path/to/errors.json

Error Handling with Custom Providers

Custom transforms, such as those defined in separate YAML files via a YamlProvider, can also expose error outputs from their underlying transforms.

Consider a file my_transforms.yaml that defines a RaiseElementToPower transform:

# my_transforms.yaml
- type: yaml
  transforms:
    RaiseElementToPower:
      config_schema:
        properties:
          n: {type: integer}
      body:
        type: MapToFields
        config:
          language: python
          append: true
          fields:
            power: "element ** {{n}}"
          # This transform internally defines and exposes an error output.
          error_handling:
            output: my_error

This transform takes a numeric element and raises it to the power of n. If the element is not a number, it will produce an error. The error output of the internal MapToFields is named my_error, and it is automatically exposed by the RaiseElementToPower transform.

When using this transform in a pipeline, you can access this error output and handle it. The main output of the transform will contain only the successfully processed elements.

pipeline:
  transforms:
    - type: Create
      config:
        elements: [2, 'bad', 3]
    - type: RaiseElementToPower
      input: Create
      config:
        n: 2
    - type: WriteToJson
      name: WriteGood
      # The main output contains successfully processed elements.
      input: RaiseElementToPower
      config:
        path: /path/to/good
    - type: WriteToJson
      name: WriteBad
      # The error output is accessed by its name.
      input: RaiseElementToPower.my_error
      config:
        path: /path/to/bad

  providers:
    - include: my_transforms.yaml

In this example, the pipeline separates the good and bad records coming from the custom RaiseElementToPower transform. The good records are written to one location, and the error records are written to another.

A pipeline will fail at construction time if an error output is declared (either in a built-in transform or a custom one) but not consumed. This helps ensure that all error paths are considered.
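For example, a pipeline like the following sketch declares error_handling on MapToFields but never references MapToFields.my_error_output as an input anywhere, so it will be rejected at construction time, before any data is processed:

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      input: ReadFromCsv
      config:
        language: python
        fields:
          ratio: col2 / col3
        error_handling:
          # This error output is declared but never consumed below,
          # so pipeline construction fails.
          output: my_error_output

    - type: WriteToJson
      input: MapToFields
      config:
        path: /path/to/output.json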

See YAML schema info for another use of error_handling in a schema context.