asyncutils.buckets

Classes

Module Contents

class asyncutils.buckets.LeakyBucket(
capacity: float,
leak: float,
min_factor: float = ...,
max_factor: float = ...,
external_factor_settable: bool = ...,
timer: asyncutils._internal.prots.Timer = ...,
)[source]

Bases: asyncutils.mixins.AsyncContextMixin[LeakyBucket], asyncutils._internal.helpers.LoopMixinBase

A leaky bucket rate limiter with adaptive flow control. Use as a context manager.
In the context, tokens leak from the bucket at a constant rate. Operations can add tokens to the bucket.
The bucket includes an adaptive factor that adjusts based on current fill level to provide smoother rate limiting under varying loads, as dictated by LEAKY_BUCKET_ADJMAP, a sequence of tuples (mincap, (lbound, lfactor, ubound, ufactor)) monotonically descending in mincap.
__exit__(exc_typ: asyncutils._internal.prots.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]
__exit__(exc_typ: None, exc_val: None, exc_tb: None, /) None

Stop draining the tokens in the bucket.

async acquire(amount: float = ...) bool[source]

Attempt to add amount tokens to the bucket immediately (default LEAKY_BUCKET_DEFAULT_ACQUIRE_TOKENS); return success.

async wait_for_tokens(amount: float = ...) float[source]

Keep asynchronously sleeping, with a maximum interval of LEAKY_BUCKET_WAIT_FOR_TOKENS_TICK each time, until amount tokens can be added to the bucket at once (default LEAKY_BUCKET_DEFAULT_WAIT_FOR_TOKENS_TOKENS), and do so, returning the total wait time.

Note

The sleep interval is calculated based on the amount of tokens requested, the current number of tokens, the bucket capacity, the leaking rate and the current factor.

property factor: float

The current adaptive factor.

class asyncutils.buckets.TokenBucket(rate: float, capacity: float, timer: asyncutils._internal.prots.Timer = ...)[source]
A token bucket rate limiter that controls the rate of operations.
The bucket fills up with tokens at a fixed rate, with each operation consuming a certain amount of tokens.
If there are not enough tokens, the operation must wait until there are.
  • rate: The number of tokens the bucket gains per time interval as a float, as defined by the timer

  • capacity: The maximum number of tokens the bucket can hold as a float

  • timer (optional): A function such as time.time() that returns the current time; default monotonic()

async consume(tokens: float = ...) None[source]

Consume tokens from the bucket as described. The default amount to consume if tokens is not passed can be set through TOKEN_BUCKET_DEFAULT_CONSUME_TOKENS.

property capacity: float

The capacity of the bucket.