Source code for

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import logging
import time
from typing import TypeVar

from apache_beam import typehints
from import util
from apache_beam.metrics.metric import Metrics
from apache_beam.transforms import DoFn
from apache_beam.utils.retry import FuzzedExponentialIntervals

T = TypeVar('T')

_LOG = logging.getLogger(__name__)

@typehints.with_input_types(T)
@typehints.with_output_types(T)
class RampupThrottlingFn(DoFn):
    """A ``DoFn`` that throttles ramp-up following an exponential function.

    An implementation of a client-side throttler that enforces a gradual
    ramp-up, broadly in line with Datastore best practices. See also
    https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic.

    Elements are passed through unchanged; the only effect is that
    ``process`` blocks (sleeps) while the per-second budget is exhausted.
    """

    # Per-second operation budget shared by all workers at start-up.
    _BASE_BUDGET = 500
    # The budget stays flat for one interval, then grows by a factor of 1.5
    # for every further interval that elapses.
    _RAMP_UP_INTERVAL = datetime.timedelta(minutes=5)

    def __init__(self, num_workers, *unused_args, **unused_kwargs):
        """Initializes a ramp-up throttler transform.

        Args:
          num_workers: A hint for the expected number of workers, used to
            derive the local rate limit.
          *unused_args: Forwarded to ``DoFn.__init__``.
          **unused_kwargs: Forwarded to ``DoFn.__init__``.
        """
        super(RampupThrottlingFn, self).__init__(
            *unused_args, **unused_kwargs)
        self._num_workers = num_workers
        # One-second window: the sum is "operations let through in the last
        # second", which is compared against the per-second budget.
        self._successful_ops = util.MovingSum(window_ms=1000, bucket_ms=1000)
        # Reference point from which the ramp-up schedule is measured.
        self._first_instant = datetime.datetime.now()
        self._throttled_secs = Metrics.counter(
            RampupThrottlingFn, "cumulativeThrottlingSeconds")

    def _calc_max_ops_budget(
            self,
            first_instant: datetime.datetime,
            current_instant: datetime.datetime):
        """Returns the per-second budget according to best practices.

        The exact function is `500 / num_workers * 1.5^max(0, (x-5)/5)`,
        where x is the number of minutes since start time.

        Args:
          first_instant: The instant the ramp-up started.
          current_instant: The instant for which the budget is wanted.

        Returns:
          The budget as an ``int`` that is at least 1, or ``float('inf')``
          once the exponential term no longer fits in a float.
        """
        elapsed = current_instant - first_instant
        # timedelta / timedelta yields a float number of ramp-up intervals;
        # clamp at zero so the budget never drops below the base value.
        growth = max(
            0.0,
            (elapsed - self._RAMP_UP_INTERVAL) / self._RAMP_UP_INTERVAL)
        try:
            max_ops_budget = int(
                self._BASE_BUDGET / self._num_workers * (1.5 ** growth))
        except OverflowError:
            # After enough intervals 1.5**growth exceeds the float range.
            # By then the ramp-up is long over, so treat the budget as
            # unlimited instead of failing the bundle.
            max_ops_budget = float('inf')
        return max(1, max_ops_budget)

    def process(self, element, **kwargs):
        """Yields ``element`` once the current ramp-up budget allows it.

        While the budget for the current one-second window is used up, the
        call sleeps with fuzzed exponential backoff and then re-checks.

        Args:
          element: The element to pass through unchanged.
          **kwargs: Accepted for interface compatibility; not used.

        Yields:
          ``element``, exactly once.
        """
        backoff = iter(
            FuzzedExponentialIntervals(
                initial_delay_secs=1, num_retries=10000))

        while True:
            # Sample the clock once per attempt so the budget and the
            # moving-sum lookup refer to the same instant.
            instant = datetime.datetime.now()
            instant_ms = instant.timestamp() * 1000
            max_ops_budget = self._calc_max_ops_budget(
                self._first_instant, instant)
            current_op_count = self._successful_ops.sum(instant_ms)
            available_ops = max_ops_budget - current_op_count

            if available_ops > 0:
                self._successful_ops.add(instant_ms, 1)
                yield element
                break

            backoff_secs = next(backoff)
            _LOG.info(
                'Delaying by %sms to conform to gradual ramp-up.',
                int(1000 * backoff_secs))
            time.sleep(backoff_secs)
            # Record the time spent throttled in the metric created in
            # __init__, which was otherwise never updated.
            self._throttled_secs.inc(int(backoff_secs))