apache_beam.io.components.adaptive_throttler module
- class apache_beam.io.components.adaptive_throttler.ThrottlingSignaler(namespace: str = '')[source]
Bases:
object
A class that handles signaling throttling of remote requests to the SDK harness.
- class apache_beam.io.components.adaptive_throttler.AdaptiveThrottler(window_ms, bucket_ms, overload_ratio)[source]
Bases:
object
Implements adaptive throttling.
See https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg for a full discussion of the use case and algorithm applied.
Initializes AdaptiveThrottler.
- Parameters:
window_ms – int, length of history to consider, in ms, to set throttling.
bucket_ms – int, granularity of time buckets that we store data in, in ms.
overload_ratio – float, the target ratio between requests sent and successful requests. This is “K” in the formula in https://landing.google.com/sre/book/chapters/handling-overload.html.
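The role of overload_ratio is easiest to see in the client-side throttling formula from the SRE book chapter linked above, where a request is rejected locally with probability max(0, (requests - K * accepts) / (requests + 1)). The following is a minimal sketch of that formula only. The names rejection_probability and should_throttle are hypothetical, and this is not presented as the module's implementation.

```python
import random


def rejection_probability(requests, accepts, overload_ratio):
    """Client-side rejection probability from the SRE book formula.

    requests: attempts seen in the history window.
    accepts: attempts in the same window that succeeded.
    overload_ratio: the "K" multiplier from the formula.
    """
    return max(0.0, (requests - overload_ratio * accepts) / (requests + 1))


def should_throttle(requests, accepts, overload_ratio, rng=random.random):
    """Randomly decides to throttle with the probability computed above.

    rng is injectable (hypothetical parameter) so the decision can be
    made deterministic in tests.
    """
    return rng() < rejection_probability(requests, accepts, overload_ratio)
```

With K = 2, a client whose requests all succeed is never throttled, while one whose requests all fail is throttled almost every time. Larger values of K tolerate more failures before throttling begins.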
- MIN_REQUESTS = 1
- throttle_request(now)[source]
Determines whether one RPC attempt should be throttled.
This should be called once each time the caller intends to send an RPC. If it returns True, drop the request or delay it, calling this function again after the delay.
- Parameters:
now – int, time in ms since the epoch
- Returns:
bool, True if the caller should throttle or delay the request.
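As an illustration of the calling convention described above, here is a small sketch of the "drop" path: ask once per intended RPC, passing the current time in ms since the epoch, and skip the send when told to throttle. ThrottleFirstN is a hypothetical stand-in that only mimics the documented throttle_request(now) signature, and send_or_drop and now_ms are likewise illustrative names.

```python
import time


class ThrottleFirstN:
    """Hypothetical stand-in exposing throttle_request(now) for illustration."""

    def __init__(self, n):
        self._remaining = n

    def throttle_request(self, now):
        # Throttle the first n calls, then allow everything.
        if self._remaining > 0:
            self._remaining -= 1
            return True
        return False


def now_ms():
    """Current time as an int number of milliseconds since the epoch."""
    return int(time.time() * 1000)


def send_or_drop(throttler, send):
    """Calls send() unless the throttler asks to throttle this attempt.

    Returns (True, result) when the request was sent and (False, None)
    when it was dropped.
    """
    if throttler.throttle_request(now_ms()):
        return False, None
    return True, send()
```

A caller that prefers to delay rather than drop would sleep and then call throttle_request again before sending.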
- class apache_beam.io.components.adaptive_throttler.ReactiveThrottler(window_ms: int, bucket_ms: int, overload_ratio: float, namespace: str = '', throttle_delay_secs: int = 5)[source]
Bases:
AdaptiveThrottler
A wrapper around the AdaptiveThrottler that also handles logging and signaling throttling to the SDK harness using the provided namespace.
For usage, instantiate one ReactiveThrottler per PTransform. Before each remote call to a service, call the throttle() method, which may preemptively throttle the request. Throttling of future calls is based on the failure rate of preceding calls, with higher failure rates leading to longer periods of throttling to allow system recovery. After throttle() returns, capture the timestamp of the attempted request, then execute the request code. On a success, call successful_request(timestamp) to report the success to the throttler. This flow looks like the following:
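    import time

    def remote_call():
        throttler.throttle()
        try:
            # throttle_request documents its time argument as ms since the
            # epoch, so the timestamp is converted from seconds to ms here.
            timestamp = int(time.time() * 1000)
            result = make_request()
            throttler.successful_request(timestamp)
            return result
        except Exception:
            # do any error handling you want to do
            raise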
Initializes the ReactiveThrottler.
- Parameters:
window_ms – int, length of history to consider, in ms, to set throttling.
bucket_ms – int, granularity of time buckets that we store data in, in ms.
overload_ratio – float, the target ratio between requests sent and successful requests. This is “K” in the formula in https://landing.google.com/sre/book/chapters/handling-overload.html.
namespace – str, the namespace to use for logging and for signaling that throttling is occurring.
throttle_delay_secs – int, the time in seconds to wait after a request is preemptively throttled.
- throttle()[source]
Blocks the request code from advancing while the underlying AdaptiveThrottler signals that the request should be preemptively throttled. Automatically logs the throttling and signals to the SDK harness that the request is being throttled. Call this immediately before every call to a remote service.
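The blocking behavior described for throttle() can be pictured with a rough sketch, under the assumption that it rechecks the throttling decision and waits throttle_delay_secs between checks. This is not the library's code: wait_until_allowed, on_throttled, clock, and sleep are hypothetical names, injected here so the loop can be exercised without real delays.

```python
import time


def wait_until_allowed(throttle_request, delay_secs=5, clock=time.time,
                       sleep=time.sleep, on_throttled=None):
    """Blocks until throttle_request(now_ms) returns False.

    throttle_request: callable taking the time in ms since the epoch.
    on_throttled: optional callback (hypothetical) invoked with the delay
        each time a wait happens, standing in for logging and signaling.
    Returns the number of waits performed.
    """
    waits = 0
    while throttle_request(int(clock() * 1000)):
        if on_throttled is not None:
            on_throttled(delay_secs)
        sleep(delay_secs)
        waits += 1
    return waits
```

Because the wait happens inside the call, the request code that follows it runs only once the throttler stops asking for a delay.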