#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Utility functions & classes that are _not_ specific to the datastore client.
#
# For internal use only; no backwards-compatibility guarantees.
# pytype: skip-file
import random
from apache_beam.io.components import util
class AdaptiveThrottler(object):
  """Implements adaptive throttling.

  See
  https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg
  for a full discussion of the use case and algorithm applied.
  """

  # The target minimum number of requests per samplePeriodMs, even if no
  # requests succeed. Must be greater than 0, else we could throttle to zero.
  # Because every decision is probabilistic, there is no guarantee that the
  # request rate in any given interval will not be zero. (This is the +1 from
  # the formula in
  # https://landing.google.com/sre/book/chapters/handling-overload.html )
  MIN_REQUESTS = 1

  def __init__(self, window_ms, bucket_ms, overload_ratio):
    """Initializes AdaptiveThrottler.

    Args:
      window_ms: int, length of history to consider, in ms, to set
        throttling.
      bucket_ms: int, granularity of time buckets that we store data in, in
        ms.
      overload_ratio: float, the target ratio between requests sent and
        successful requests. This is "K" in the formula in
        https://landing.google.com/sre/book/chapters/handling-overload.html.
    """
    # Two independent moving sums sharing the same window/bucket settings:
    # one counts every attempt, the other only the successful ones.
    self._all_requests = util.MovingSum(window_ms, bucket_ms)
    self._successful_requests = util.MovingSum(window_ms, bucket_ms)
    # Coerced to float so the probability below is computed in floating
    # point even when the caller passes an int.
    self._overload_ratio = float(overload_ratio)
    self._random = random.Random()

  def _throttling_probability(self, now):
    """Computes the probability with which a request should be throttled.

    Args:
      now: int, time in ms since the epoch

    Returns:
      0 if no request data is available at ``now``; otherwise
      max(0, (requests - K * successes) / (requests + MIN_REQUESTS)), where
      K is the overload ratio given to the constructor.
    """
    if not self._all_requests.has_data(now):
      return 0
    all_requests = self._all_requests.sum(now)
    successful_requests = self._successful_requests.sum(now)
    # Clamp at zero: the numerator goes negative whenever successes (scaled
    # by K) outnumber attempts, which means no throttling is needed.
    return max(
        0,
        (all_requests - self._overload_ratio * successful_requests) /
        (all_requests + AdaptiveThrottler.MIN_REQUESTS))

  def throttle_request(self, now):
    """Determines whether one RPC attempt should be throttled.

    This should be called once each time the caller intends to send an RPC; if
    it returns true, drop or delay that request (calling this function again
    after the delay).

    Args:
      now: int, time in ms since the epoch

    Returns:
      bool, True if the caller should throttle or delay the request.
    """
    # The probability is evaluated before this attempt is recorded, so the
    # current call does not influence its own decision.
    throttling_probability = self._throttling_probability(now)
    self._all_requests.add(now, 1)
    return self._random.uniform(0, 1) < throttling_probability

  def successful_request(self, now):
    """Notifies the throttler of a successful request.

    Must be called once for each request (for which throttle_request was
    previously called) that succeeded.

    Args:
      now: int, time in ms since the epoch
    """
    self._successful_requests.add(now, 1)