Source code for apache_beam.utils.retry

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Retry decorators for calls raising exceptions.

For internal use only; no backwards-compatibility guarantees.

This module is used mostly to decorate all integration points where the code
makes calls to remote services. Searching through the code base for @retry
should find all such places. For this reason even places where retry is not
needed right now use a @retry.no_retries decorator.
"""

# pytype: skip-file

from __future__ import absolute_import

import functools
import logging
import random
import sys
import time
import traceback
from builtins import next
from builtins import object
from builtins import range

from future.utils import raise_with_traceback

from apache_beam.io.filesystem import BeamIOError

# Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
# TODO(sourabhbajaj): Remove the GCP specific error code to a submodule
try:
  from apitools.base.py.exceptions import HttpError
except ImportError as e:
  HttpError = None

# Protect against environments where aws tools are not available.
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
  from apache_beam.io.aws.clients.s3 import messages as _s3messages
except ImportError:
  S3ClientError = None
else:
  S3ClientError = _s3messages.S3ClientError
# pylint: enable=wrong-import-order, wrong-import-position

_LOGGER = logging.getLogger(__name__)


[docs]class PermanentException(Exception): """Base class for exceptions that should not be retried.""" pass
[docs]class FuzzedExponentialIntervals(object): """Iterable for intervals that are exponentially spaced, with fuzzing. On iteration, yields retry interval lengths, in seconds. Every iteration over this iterable will yield differently fuzzed interval lengths, as long as fuzz is nonzero. Args: initial_delay_secs: The delay before the first retry, in seconds. num_retries: The total number of times to retry. factor: The exponential factor to use on subsequent retries. Default is 2 (doubling). fuzz: A value between 0 and 1, indicating the fraction of fuzz. For a given delay d, the fuzzed delay is randomly chosen between [(1 - fuzz) * d, d]. max_delay_secs: Maximum delay (in seconds). After this limit is reached, further tries use max_delay_sec instead of exponentially increasing the time. Defaults to 1 hour. """ def __init__( self, initial_delay_secs, num_retries, factor=2, fuzz=0.5, max_delay_secs=60 * 60 * 1): self._initial_delay_secs = initial_delay_secs if num_retries > 10000: raise ValueError('num_retries parameter cannot exceed 10000.') self._num_retries = num_retries self._factor = factor if not 0 <= fuzz <= 1: raise ValueError('fuzz parameter expected to be in [0, 1] range.') self._fuzz = fuzz self._max_delay_secs = max_delay_secs def __iter__(self): current_delay_secs = min(self._max_delay_secs, self._initial_delay_secs) for _ in range(self._num_retries): fuzz_multiplier = 1 - self._fuzz + random.random() * self._fuzz yield current_delay_secs * fuzz_multiplier current_delay_secs = min( self._max_delay_secs, current_delay_secs * self._factor)
[docs]def retry_on_server_errors_filter(exception): """Filter allowing retries on server errors and non-HttpErrors.""" if (HttpError is not None) and isinstance(exception, HttpError): return exception.status_code >= 500 if (S3ClientError is not None) and isinstance(exception, S3ClientError): return exception.code >= 500 return not isinstance(exception, PermanentException)
# TODO(BEAM-6202): Dataflow returns 404 for job ids that actually exist. # Retry on those errors.
[docs]def retry_on_server_errors_and_notfound_filter(exception): if HttpError is not None and isinstance(exception, HttpError): if exception.status_code == 404: # 404 Not Found return True return retry_on_server_errors_filter(exception)
[docs]def retry_on_server_errors_and_timeout_filter(exception): if HttpError is not None and isinstance(exception, HttpError): if exception.status_code == 408: # 408 Request Timeout return True if S3ClientError is not None and isinstance(exception, S3ClientError): if exception.code == 408: # 408 Request Timeout return True return retry_on_server_errors_filter(exception)
[docs]def retry_on_server_errors_timeout_or_quota_issues_filter(exception): """Retry on server, timeout and 403 errors. 403 errors can be accessDenied, billingNotEnabled, and also quotaExceeded, rateLimitExceeded.""" if HttpError is not None and isinstance(exception, HttpError): if exception.status_code == 403: return True if S3ClientError is not None and isinstance(exception, S3ClientError): if exception.code == 403: return True return retry_on_server_errors_and_timeout_filter(exception)
[docs]def retry_on_beam_io_error_filter(exception): """Filter allowing retries on Beam IO errors.""" return isinstance(exception, BeamIOError)
SERVER_ERROR_OR_TIMEOUT_CODES = [408, 500, 502, 503, 504, 598, 599]
[docs]class Clock(object): """A simple clock implementing sleep()."""
[docs] def sleep(self, value): time.sleep(value)
[docs]def no_retries(fun): """A retry decorator for places where we do not want retries.""" return with_exponential_backoff(retry_filter=lambda _: False, clock=None)(fun)
[docs]def with_exponential_backoff( num_retries=7, initial_delay_secs=5.0, logger=_LOGGER.warning, retry_filter=retry_on_server_errors_filter, clock=Clock(), fuzz=True, factor=2, max_delay_secs=60 * 60): """Decorator with arguments that control the retry logic. Args: num_retries: The total number of times to retry. initial_delay_secs: The delay before the first retry, in seconds. logger: A callable used to report an exception. Must have the same signature as functions in the standard logging module. The default is _LOGGER.warning. retry_filter: A callable getting the exception raised and returning True if the retry should happen. For instance we do not want to retry on 404 Http errors most of the time. The default value will return true for server errors (HTTP status code >= 500) and non Http errors. clock: A clock object implementing a sleep method. The default clock will use time.sleep(). fuzz: True if the delay should be fuzzed (default). During testing False can be used so that the delays are not randomized. factor: The exponential factor to use on subsequent retries. Default is 2 (doubling). max_delay_secs: Maximum delay (in seconds). After this limit is reached, further tries use max_delay_sec instead of exponentially increasing the time. Defaults to 1 hour. Returns: As per Python decorators with arguments pattern returns a decorator for the function which in turn will return the wrapped (decorated) function. The decorator is intended to be used on callables that make HTTP or RPC requests that can temporarily timeout or have transient errors. For instance the make_http_request() call below will be retried 16 times with exponential backoff and fuzzing of the delay interval (default settings). from apache_beam.utils import retry # ... @retry.with_exponential_backoff() make_http_request(args) """ def real_decorator(fun): """The real decorator whose purpose is to return the wrapped function.""" @functools.wraps(fun) def wrapper(*args, **kwargs): retry_intervals = iter( FuzzedExponentialIntervals( initial_delay_secs, num_retries, factor, fuzz=0.5 if fuzz else 0, max_delay_secs=max_delay_secs)) while True: try: return fun(*args, **kwargs) except Exception as exn: # pylint: disable=broad-except if not retry_filter(exn): raise # Get the traceback object for the current exception. The # sys.exc_info() function returns a tuple with three elements: # exception type, exception value, and exception traceback. exn_traceback = sys.exc_info()[2] try: try: sleep_interval = next(retry_intervals) except StopIteration: # Re-raise the original exception since we finished the retries. raise_with_traceback(exn, exn_traceback) logger( 'Retry with exponential backoff: waiting for %s seconds before ' 'retrying %s because we caught exception: %s ' 'Traceback for above exception (most recent call last):\n%s', sleep_interval, getattr(fun, '__name__', str(fun)), ''.join(traceback.format_exception_only(exn.__class__, exn)), ''.join(traceback.format_tb(exn_traceback))) clock.sleep(sleep_interval) finally: # Traceback objects in locals can cause reference cycles that will # prevent garbage collection. Clear it now since we do not need # it anymore. exn_traceback = None return wrapper return real_decorator