#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import functools
import inspect
from typing import NamedTuple
import apache_beam as beam
from apache_beam.typehints.row_type import RowTypeConstraint
class ErrorHandlingConfig(NamedTuple):
  """This option specifies whether and where to output error rows.

  Args:
    output (str): Name to use for the output error collection.
  """
  output: str
  # TODO: Other parameters are valid here too, but not common to Java.
def exception_handling_args(error_handling_spec):
  """Translate an error-handling spec into exception-handling keyword args.

  The ``'output'`` key of the spec is renamed to ``'dead_letter_tag'``; every
  other key/value pair is passed through unchanged.

  Args:
    error_handling_spec: A mapping (anything exposing ``.items()``) of
      error-handling options, or a falsy value such as ``None`` or ``{}``.

  Returns:
    A new dict with ``'output'`` renamed to ``'dead_letter_tag'``, or ``None``
    when ``error_handling_spec`` is falsy.
  """
  # An absent or empty spec means no error handling was requested.
  if not error_handling_spec:
    return None
  return {
      'dead_letter_tag' if key == 'output' else key: value
      for key, value in error_handling_spec.items()
  }
def maybe_with_exception_handling(inner_expand):
  """Decorate an ``expand(self, pcoll)`` method with error-output plumbing.

  The returned ``expand`` wraps the input ``pcoll`` in
  ``beam.core._MaybePValueWithErrors`` using ``self._exception_handling_args``,
  calls ``inner_expand`` with the wrapped value, and then calls
  ``.as_result(...)`` on what ``inner_expand`` returned, passing
  ``map_errors_to_standard_format(pcoll.element_type)``.

  Args:
    inner_expand: A callable taking ``(self, pcoll)``. Its return value must
      provide an ``as_result`` method.

  Returns:
    A function ``expand(self, pcoll)`` suitable for use in place of
    ``inner_expand``.
  """
  def expand(self, pcoll):
    # NOTE(review): presumably self._exception_handling_args is the value
    # produced by exception_handling_args() (possibly None) -- confirm
    # against the classes using this decorator.
    wrapped_pcoll = beam.core._MaybePValueWithErrors(
        pcoll, self._exception_handling_args)
    # The error mapping is built from the element type of the *original*
    # input, not of the wrapped value.
    return inner_expand(self, wrapped_pcoll).as_result(
        map_errors_to_standard_format(pcoll.element_type))

  return expand