asyncutils.buckets

Classes

Module Contents

class asyncutils.buckets.LeakyBucket(
capacity: float,
leak: float,
min_factor: float = ...,
max_factor: float = ...,
external_factor_settable: bool = ...,
timer: asyncutils._internal.types.Timer = ...,
)[source]

Bases: asyncutils.mixins.AsyncContextMixin[LeakyBucket], asyncutils._internal.helpers.LoopMixinBase

A leaky bucket rate limiter with adaptive flow control. Use as a context manager.
In the context, tokens leak from the bucket at a constant rate. Operations can add tokens to the bucket.
The bucket includes an adaptive factor that adjusts based on current fill level to provide smoother rate limiting under varying loads, as dictated by
context.LEAKY_BUCKET_ADJMAP, a sequence of tuples (mincap, (lbound, lfactor, ubound, ufactor)) monotonically descending in mincap.
capacity (required): The maximum number of tokens the bucket can hold
leak (required): The rate at which tokens leak from the bucket
min_factor: Minimum adaptive factor; default context.LEAKY_BUCKET_DEFAULT_MIN_FACTOR.
max_factor: Maximum adaptive factor; default context.LEAKY_BUCKET_DEFAULT_MAX_FACTOR.
external_factor_settable: Whether the factor attribute can be modified; default context.LEAKY_BUCKET_DEFAULT_EXT_CAN_SET_FACTOR.
__exit__(exc_typ: asyncutils._internal.types.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]
__exit__(exc_typ: None, exc_val: None, exc_tb: None, /) None

Stop draining the tokens in the bucket.

async acquire(amount: float = ...) bool[source]

Attempt to add amount tokens to the bucket immediately (default context.LEAKY_BUCKET_DEFAULT_ACQUIRE_TOKENS); return success.

async wait_for_tokens(amount: float = ...) float[source]
Keep asynchronously sleeping, with a maximum interval of context.LEAKY_BUCKET_WAIT_FOR_TOKENS_TICK each time, until amount
tokens can be added to the bucket at once (default context.LEAKY_BUCKET_DEFAULT_WAIT_FOR_TOKENS_TOKENS), and do so, returning
the total wait time.

Note

The sleep interval is calculated based on the amount of tokens requested, the current number of tokens, the bucket capacity, the leaking rate and the current factor.

property factor: float

The current adaptive factor.

class asyncutils.buckets.TokenBucket(rate: float, capacity: float, timer: asyncutils._internal.types.Timer = ...)[source]
A token bucket rate limiter that controls the rate of operations.
The bucket fills up with tokens at a fixed rate, with each operation consuming a certain amount of tokens.
If there are not enough tokens, the operation must wait until there are.
rate: The number of tokens the bucket gains per time interval as a float, as defined by the timer
capacity: The maximum number of tokens the bucket can hold as a float
timer (optional): A function such as time.time() that returns the current time; default time.monotonic()
async consume(tokens: float = ...) None[source]

Consume tokens from the bucket as described. The default amount to consume if tokens is not passed can be set through context.TOKEN_BUCKET_DEFAULT_CONSUME_TOKENS.

property capacity: float

The capacity of the bucket.