#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Utility functions & classes that are _not_ specific to the datastore client.
#
# For internal use only; no backwards-compatibility guarantees.
# pytype: skip-file
import logging
import random
import time
from apache_beam.io.components import util
from apache_beam.metrics.metric import Metrics
_SECONDS_TO_MILLISECONDS = 1_000
class ThrottlingSignaler(object):
    """Reports throttling of remote requests to the SDK harness.

    Throttled time is accumulated in a counter metric named
    ``cumulativeThrottlingSeconds`` that lives in the given namespace.
    """
    def __init__(self, namespace: str = ""):
        # The counter is created once and reused for every signal.
        counter_name = "cumulativeThrottlingSeconds"
        self.throttling_metric = Metrics.counter(namespace, counter_name)

    def signal_throttled(self, seconds: int):
        """Tells the runner that requests were throttled for a while.

        Args:
          seconds: int, how long the requests were throttled, in seconds.
        """
        self.throttling_metric.inc(seconds)
class AdaptiveThrottler(object):
    """Client-side adaptive throttling of outgoing requests.

    The use case and the algorithm are discussed in full at
    https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg
    """

    # Floor on the number of requests targeted per sample period, applied
    # even when no request succeeds. It has to stay above 0, otherwise the
    # throttler could throttle down to zero. Decisions are probabilistic, so
    # an interval with zero requests can still happen. (This is the +1 in the
    # formula from
    # https://landing.google.com/sre/book/chapters/handling-overload.html )
    MIN_REQUESTS = 1

    def __init__(self, window_ms, bucket_ms, overload_ratio):
        """Sets up the request history used for throttling decisions.

        Args:
          window_ms: int, how much history (in ms) is taken into account
            when deciding whether to throttle.
          bucket_ms: int, size (in ms) of the time buckets the history is
            stored in.
          overload_ratio: float, target ratio of requests sent to requests
            that succeeded. This is "K" in the formula in
            https://landing.google.com/sre/book/chapters/handling-overload.html.
        """
        # Two moving sums over the same window: every attempt, and the
        # attempts that were reported as successful.
        self._all_requests = util.MovingSum(window_ms, bucket_ms)
        self._successful_requests = util.MovingSum(window_ms, bucket_ms)
        self._overload_ratio = float(overload_ratio)
        self._random = random.Random()

    def _throttling_probability(self, now):
        """Returns the probability of throttling a request made at `now`."""
        # Without any recorded attempts there is nothing to base throttling
        # on, so never throttle.
        if not self._all_requests.has_data(now):
            return 0

        attempted = self._all_requests.sum(now)
        succeeded = self._successful_requests.sum(now)
        excess = attempted - self._overload_ratio * succeeded
        floor_adjusted_total = attempted + AdaptiveThrottler.MIN_REQUESTS
        # Clamp at zero: a healthy backend yields a negative excess.
        return max(0, excess / floor_adjusted_total)

    def throttle_request(self, now):
        """Decides whether a single RPC attempt should be throttled.

        Call this once every time an RPC is about to be sent. When the
        result is true, drop or delay the request, and call this function
        again once the delay has elapsed.

        Args:
          now: int, time in ms since the epoch

        Returns:
          bool, True if the caller should throttle or delay the request.
        """
        # The probability is evaluated before this attempt is recorded.
        probability = self._throttling_probability(now)
        self._all_requests.add(now, 1)
        draw = self._random.uniform(0, 1)
        return draw < probability

    def successful_request(self, now):
        """Records that a request completed successfully.

        Has to be called exactly once per succeeded request for which
        throttle_request was called beforehand.

        Args:
          now: int, time in ms since the epoch
        """
        self._successful_requests.add(now, 1)
class ReactiveThrottler(AdaptiveThrottler):
    """An AdaptiveThrottler that also logs and signals throttling.

    Wraps the AdaptiveThrottler decision logic with logging and with
    signaling of throttled time to the SDK harness, both keyed by the
    provided namespace.

    For usage, instantiate one instance of a ReactiveThrottler class for a
    PTransform. When making remote calls to a service, preface that call
    with the throttle() method to potentially pre-emptively throttle the
    request. This will throttle future calls based on the failure rate of
    preceding calls, with higher failure rates leading to longer periods of
    throttling to allow system recovery.

    After throttle() returns, capture the timestamp of the attempted
    request in milliseconds since the epoch (the unit successful_request()
    expects), then execute the request code. On a success, call
    successful_request(timestamp_ms) to report the success to the
    throttler. This flow looks like the following::

        def remote_call():
            throttler.throttle()

            try:
                # successful_request() takes ms, time.time() returns seconds.
                timestamp_ms = time.time() * 1000
                result = make_request()
                throttler.successful_request(timestamp_ms)
                return result
            except Exception as e:
                # do any error handling you want to do
                raise
    """
    def __init__(
        self,
        window_ms: int,
        bucket_ms: int,
        overload_ratio: float,
        namespace: str = '',
        throttle_delay_secs: int = 5):
        """Initializes the ReactiveThrottler.

        Args:
          window_ms: int, length of history to consider, in ms, to set
            throttling.
          bucket_ms: int, granularity of time buckets that we store data in,
            in ms.
          overload_ratio: float, the target ratio between requests sent and
            successful requests. This is "K" in the formula in
            https://landing.google.com/sre/book/chapters/handling-overload.html.
          namespace: str, the namespace to use for logging and for signaling
            that throttling is occurring. It is used both as the metric
            namespace and as the logger name, so the default empty string
            selects the root logger.
          throttle_delay_secs: int, the amount of time in seconds to wait
            after each preemptively throttled request.
        """
        self.throttling_signaler = ThrottlingSignaler(namespace=namespace)
        self.logger = logging.getLogger(namespace)
        self.throttle_delay_secs = throttle_delay_secs
        super().__init__(
            window_ms=window_ms,
            bucket_ms=bucket_ms,
            overload_ratio=overload_ratio)

    def throttle(self):
        """Blocks while the underlying throttler asks to throttle.

        Stops request code from advancing while the underlying
        AdaptiveThrottler is signaling to preemptively throttle the request.
        Automatically handles logging the throttling and signaling to the
        SDK harness that the request is being throttled. Call this
        immediately before performing any call to a remote service.

        Each loop iteration counts as a new attempt in the request history,
        sleeps for throttle_delay_secs and then reports that delay as
        throttled time.
        """
        # throttle_request() expects milliseconds since the epoch, while
        # time.time() returns seconds, hence the conversion.
        while self.throttle_request(time.time() * _SECONDS_TO_MILLISECONDS):
            self.logger.info(
                "Delaying request for %d seconds due to previous failures",
                self.throttle_delay_secs)
            time.sleep(self.throttle_delay_secs)
            # Signal only after the delay has actually elapsed.
            self.throttling_signaler.signal_throttled(self.throttle_delay_secs)