Source code for apache_beam.yaml.yaml_errors

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import functools
import inspect
from typing import NamedTuple

import apache_beam as beam
from apache_beam.typehints.row_type import RowTypeConstraint


[docs] class ErrorHandlingConfig(NamedTuple): """This option specifies whether and where to output error rows. Args: output (str): Name to use for the output error collection """ output: str
# TODO: Other parameters are valid here too, but not common to Java.
[docs] def exception_handling_args(error_handling_spec): if error_handling_spec: return { 'dead_letter_tag' if k == 'output' else k: v for (k, v) in error_handling_spec.items() } else: return None
[docs] def map_errors_to_standard_format(input_type): # TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple. return beam.Map( lambda x: beam.Row( element=x[0], msg=str(x[1][1]), stack=''.join(x[1][2])) ).with_output_types( RowTypeConstraint.from_fields([("element", input_type), ("msg", str), ("stack", str)]))
[docs] def maybe_with_exception_handling(inner_expand): def expand(self, pcoll): wrapped_pcoll = beam.core._MaybePValueWithErrors( pcoll, self._exception_handling_args) return inner_expand(self, wrapped_pcoll).as_result( map_errors_to_standard_format(pcoll.element_type)) return expand
[docs] def maybe_with_exception_handling_transform_fn(transform_fn): @functools.wraps(transform_fn) def expand(pcoll, error_handling=None, **kwargs): wrapped_pcoll = beam.core._MaybePValueWithErrors( pcoll, exception_handling_args(error_handling)) return transform_fn(wrapped_pcoll, **kwargs).as_result( map_errors_to_standard_format(pcoll.element_type)) original_signature = inspect.signature(transform_fn) new_parameters = list(original_signature.parameters.values()) error_handling_param = inspect.Parameter( 'error_handling', inspect.Parameter.KEYWORD_ONLY, default=None, annotation=ErrorHandlingConfig) if new_parameters[-1].kind == inspect.Parameter.VAR_KEYWORD: new_parameters.insert(-1, error_handling_param) else: new_parameters.append(error_handling_param) expand.__signature__ = original_signature.replace(parameters=new_parameters) return expand