asyncutils.locks

Locking primitives that extend or supplement those in asyncio.
Every class except MultiCountDownLatch follows the asynchronous lock interface defined by asyncio.Lock and made explicit in the AsyncLockLike protocol.
MultiCountDownLatch is kept here because it uses KeyedCondition internally, and asyncutils.altlocks should not have to import this submodule.

Classes

AdvancedRateLimit

A rate limiter that supports a mode in which waiters can cut the queue.

DynamicBoundedSemaphore

A subclass of asyncio.BoundedSemaphore whose bound can be set by the user via the bound property.

KeyedCondition

A condition variable that allows waiting on and notifying individual keys, or all keys at once.

MultiCountDownLatch

A collection of count-down latches, each identified by a key, supporting waiting on individual latches or on all of them at once.

PriorityLock

A lock allowing waiters with a lower priority value to enter first.

PriorityRLock

A reentrant lock supporting priority.

PrioritySemaphore

A semaphore that allows waiters with a lower priority value to enter first.

RLock

An async reentrant lock, which asyncio itself does not provide.

Module Contents

class asyncutils.locks.AdvancedRateLimit(rate: float, capacity: float = ..., fair: bool = ...)

Bases: asyncutils._internal.helpers.LoopMixinBase, asyncutils.mixins.LockMixin[None]

A rate limiter that supports a mode in which waiters can cut the queue.

rate (required): The initial rate at which tokens refill.
capacity: The maximum rate, defaulting to the current rate.
fair: Whether to maintain FIFO (first in, first out) order for waiters; default True.

async acquire(tokens: float = ..., timeout: float | None = ...) -> Literal[True]

Acquire the specified number of tokens from the rate limiter (default context.ADVANCED_RATE_LIMIT_DEFAULT_TOKENS), waiting until they are available and raising TimeoutError if timeout expires first.

locked() -> bool

Return True if the limiter is currently locked, meaning that acquire() would have to wait for tokens.

async release(tokens: float = ...) -> None

Release the specified number of tokens back to the rate limiter (default context.ADVANCED_RATE_LIMIT_DEFAULT_TOKENS).

async set_rate(new: float) -> None

Set the rate of the limiter to new.

update_tokens_lock_held() -> None

Perform necessary processing before further operations on the token count. The rate limiter is guaranteed to call this only while holding the internal lock.

property capacity: float

Return the capacity of the limiter.

property rate: float

Return the current rate of the limiter.

property tokens: float

Return the current number of tokens available.
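The rate, capacity, and tokens properties above suggest a token-bucket design. The following is a minimal sketch of that idea using only asyncio, not the implementation of AdvancedRateLimit; the names TokenBucketSketch and demo_token_bucket are hypothetical, and treating capacity as a cap on stored tokens is an assumption. It covers only the fair (FIFO) case and omits timeouts.

```python
import asyncio
import time


class TokenBucketSketch:
    """Hypothetical stand-in; NOT asyncutils.locks.AdvancedRateLimit."""

    def __init__(self, rate, capacity):
        self.rate = rate              # tokens credited per second
        self.capacity = capacity      # assumed: upper bound on stored tokens
        self.tokens = capacity        # start with a full bucket
        self._last = time.monotonic()
        self._lock = asyncio.Lock()   # guards the token count

    def _refill(self):
        # Credit tokens for the time elapsed since the last refill.
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self._last) * self.rate)
        self._last = now

    async def acquire(self, tokens=1.0):
        # asyncio.Lock wakes waiters in arrival order, so sleeping while
        # holding it keeps acquisitions FIFO.
        async with self._lock:
            self._refill()
            while self.tokens < tokens:
                await asyncio.sleep((tokens - self.tokens) / self.rate)
                self._refill()
            self.tokens -= tokens
            return True


async def demo_token_bucket():
    bucket = TokenBucketSketch(rate=100.0, capacity=1.0)
    start = time.monotonic()
    for _ in range(3):
        await bucket.acquire()
    # Seconds spent waiting for the bucket to refill.
    return time.monotonic() - start
```

With a full bucket of one token and a rate of 100 tokens per second, the first acquisition in the demo is immediate and each later one waits roughly 10 ms for the refill.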

class asyncutils.locks.DynamicBoundedSemaphore(value: int = ...)

Bases: asyncio.BoundedSemaphore

A subclass of asyncio.BoundedSemaphore whose bound can be set by the user via the bound property.

value, the initial value of the semaphore, defaults to context.DYNAMIC_BOUNDED_SEMAPHORE_DEFAULT_VALUE.

property bound: int

Return the bound of the semaphore.

class asyncutils.locks.KeyedCondition[T](lock: asyncio.Lock | asyncutils.mixins.LockMixin[Any] | None = ...)

Bases: asyncutils.mixins.LockMixin[KeyedCondition[T]], asyncutils.mixins.LoopContextMixin

A condition variable that allows waiting on and notifying individual keys, or all keys at once.

Initialize the condition variable with the given lock, or create a new one if not passed.

async acquire() -> bool

Wrap the acquire method of the underlying lock, re-raising only critical errors and returning whether the acquisition succeeded.

assert_locked() -> None

Assert that the underlying lock is currently locked, and raise a RuntimeError if not.

locked() -> bool

Return whether the underlying lock is currently locked.

notify(key: T, n: int = ..., strict: bool = ...) -> None

Notify n waiters (default 1) waiting on the given key. If strict is True and the key doesn’t exist, KeyError is raised.

notify_all(key: T | None = ...) -> int

Notify all waiters waiting on the given key, or all waiters on every key if key is None. Return the number of waiters that were notified.

async release() -> None

Release the underlying lock, awaiting its release method if that method is a coroutine.

async wait(key: T, timeout: float | None = ...) -> None

Wait for the given key to be notified within timeout.

async wait_all(timeout: float | None = ...) -> None

Wait for all the current waiters to be notified within timeout.

async wait_for(key: T, pred: collections.abc.Callable[[], bool], per_wait_timeout: float | None = ...) -> None

Wait repeatedly for the given key to be notified, each wait lasting at most per_wait_timeout seconds, until the predicate pred returns True.
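The per-key wait and notify behaviour described above can be sketched with one queue of futures per key. This is an illustration built on asyncio alone, not the KeyedCondition implementation; KeyedConditionSketch and demo_keyed_condition are hypothetical names, and timeouts, strict mode, and careful cancellation handling are left out.

```python
import asyncio
from collections import defaultdict, deque


class KeyedConditionSketch:
    """Hypothetical stand-in; NOT asyncutils.locks.KeyedCondition."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._waiters = defaultdict(deque)  # key -> futures of parked waiters

    async def __aenter__(self):
        await self._lock.acquire()
        return self

    async def __aexit__(self, *exc):
        self._lock.release()

    async def wait(self, key):
        # The caller must hold the lock; it is released while parked
        # and re-acquired before returning.
        fut = asyncio.get_running_loop().create_future()
        self._waiters[key].append(fut)
        self._lock.release()
        try:
            await fut
        finally:
            await self._lock.acquire()

    def notify(self, key, n=1):
        # Wake up to n waiters parked on this key; return how many were woken.
        woken = 0
        queue = self._waiters.get(key, ())
        while queue and woken < n:
            fut = queue.popleft()
            if not fut.done():  # skip waiters that were cancelled
                fut.set_result(None)
                woken += 1
        return woken


async def demo_keyed_condition():
    cond = KeyedConditionSketch()
    log = []

    async def waiter(key):
        async with cond:
            await cond.wait(key)
            log.append(key)

    tasks = [asyncio.create_task(waiter(k)) for k in ("a", "b")]
    await asyncio.sleep(0.01)            # let both waiters park
    async with cond:
        cond.notify("b")                 # only the waiter on "b" wakes
    await asyncio.sleep(0.01)
    a_still_waiting = not tasks[0].done()
    async with cond:
        cond.notify("a")
    await asyncio.gather(*tasks)
    return log, a_still_waiting
```

In the demo, notifying "b" leaves the waiter on "a" parked until its own key is notified, so the log is filled in notification order rather than arrival order.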

class asyncutils.locks.MultiCountDownLatch[H: collections.abc.Hashable](counts: collections.abc.Mapping[H, int])

A collection of count-down latches, each identified by a key, supporting waiting on individual latches or on all of them at once.

Initialize the latch with the given mapping of keys to counts. No more keys can be added after this stage.

async count_down(key: H, strict: bool = ...) -> None

Decrement the count of the latch with the given key by one. If it reaches zero, wake up all waiters. If strict is True and the key doesn’t exist, KeyError is raised.

async count_down_all() -> None

Decrement the count of each latch by one.

async wait(key: H, strict: bool = ...) -> None

Wait for the latch with the given key to reach zero. If strict is True and the key doesn’t exist, KeyError is raised.

async wait_all(timeout: float | None = ...) -> None

Wait for the count of all latches to reach zero.

property broken: bool

True if wait_all() would return immediately.
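As a rough illustration of the latch semantics described above, the sketch below keeps one counter and one asyncio.Event per key. It is not how MultiCountDownLatch is implemented (which, per the module notes, uses KeyedCondition internally); MultiLatchSketch and demo_multi_latch are hypothetical names, count_down is synchronous here for brevity, and strict mode and timeouts are omitted.

```python
import asyncio


class MultiLatchSketch:
    """Hypothetical stand-in; NOT asyncutils.locks.MultiCountDownLatch."""

    def __init__(self, counts):
        self._counts = dict(counts)  # key -> remaining count; fixed key set
        self._events = {key: asyncio.Event() for key in self._counts}
        for key, count in self._counts.items():
            if count <= 0:           # a latch that starts at zero is already open
                self._events[key].set()

    def count_down(self, key):
        # Decrement one latch; open it when the count reaches zero.
        if self._counts[key] > 0:
            self._counts[key] -= 1
            if self._counts[key] == 0:
                self._events[key].set()

    async def wait(self, key):
        await self._events[key].wait()

    async def wait_all(self):
        await asyncio.gather(*(event.wait() for event in self._events.values()))


async def demo_multi_latch():
    latch = MultiLatchSketch({"db": 2, "cache": 1})
    order = []

    async def watch(key):
        await latch.wait(key)
        order.append(key)

    async def watch_all():
        await latch.wait_all()
        order.append("all")

    tasks = [
        asyncio.create_task(watch("db")),
        asyncio.create_task(watch("cache")),
        asyncio.create_task(watch_all()),
    ]
    await asyncio.sleep(0.01)
    latch.count_down("cache")   # opens the "cache" latch
    await asyncio.sleep(0.01)
    latch.count_down("db")      # "db" still has one count left
    await asyncio.sleep(0.01)
    latch.count_down("db")      # opens "db", and therefore everything
    await asyncio.gather(*tasks)
    return order
```

The watcher on "cache" finishes after a single count-down, while the wait_all watcher finishes only once every latch has reached zero.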

class asyncutils.locks.PriorityLock

Bases: asyncutils._internal.helpers.LoopMixinBase, asyncutils.mixins.LockWithOwnerMixin[None]

A lock allowing waiters with a lower priority value to enter first.

_release(raise_: bool = ...) -> None

async acquire(priority: int = ..., timeout: float | None = ...) -> bool

locked() -> bool

property is_owner: bool
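To make "a lower priority value enters first" concrete, here is a minimal sketch that parks waiters in a heap ordered by priority, with an arrival counter as a tie-breaker. It illustrates the general technique only and is not the PriorityLock implementation; PriorityLockSketch and demo_priority_lock are hypothetical names, and timeouts, owner tracking, and some cancellation edge cases are ignored.

```python
import asyncio
import heapq
import itertools


class PriorityLockSketch:
    """Hypothetical stand-in; NOT asyncutils.locks.PriorityLock."""

    def __init__(self):
        self._locked = False
        self._heap = []                    # entries: (priority, arrival, future)
        self._arrival = itertools.count()  # tie-breaker: equal priorities stay FIFO

    async def acquire(self, priority=0):
        if not self._locked and not self._heap:
            self._locked = True            # uncontended fast path
            return True
        fut = asyncio.get_running_loop().create_future()
        heapq.heappush(self._heap, (priority, next(self._arrival), fut))
        await fut                          # release() hands the lock over
        return True

    def release(self):
        if not self._locked:
            raise RuntimeError("lock is not acquired")
        while self._heap:
            _, _, fut = heapq.heappop(self._heap)  # lowest priority value first
            if not fut.done():             # skip waiters that were cancelled
                fut.set_result(None)       # ownership passes directly; stays locked
                return
        self._locked = False


async def demo_priority_lock():
    lock = PriorityLockSketch()
    order = []

    async def worker(name, priority):
        await lock.acquire(priority)
        order.append(name)
        lock.release()

    await lock.acquire()                   # hold the lock so the workers queue up
    jobs = (("low", 5), ("high", 1), ("mid", 3))
    tasks = [asyncio.create_task(worker(n, p)) for n, p in jobs]
    await asyncio.sleep(0.01)
    lock.release()
    await asyncio.gather(*tasks)
    return order                           # ["high", "mid", "low"]
```

Handing ownership directly to the popped waiter, instead of unlocking and letting tasks race, is what keeps a newly arriving task from overtaking a queued waiter.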
class asyncutils.locks.PriorityRLock

Bases: RLock

A reentrant lock supporting priority.

async acquire(priority: int = ..., timeout: float | None = ...) -> bool

property owner: asyncio.Task[Any] | None

class asyncutils.locks.PrioritySemaphore(value: int = ...)

Bases: asyncutils._internal.helpers.LoopMixinBase, asyncutils.mixins.LockMixin[None]

A semaphore that allows waiters with a lower priority value to enter first.

value, the initial value as an integer, defaults to context.PRIORITY_SEMAPHORE_DEFAULT_VALUE.

async acquire(priority: int = ...) -> Literal[True]

Acquire the semaphore with the specified priority (default context.PRIORITY_SEMAPHORE_DEFAULT_PRIORITY).

locked() -> bool

Return True if the semaphore is currently locked.

release(strict: bool = ...) -> None

Release the semaphore. If strict is True (the default) and the semaphore has been released more times than it was acquired, RuntimeError is raised.

reset() -> None

Reset the semaphore to its initial state.

class asyncutils.locks.RLock(lock: asyncutils._internal.types.AsyncContextManager[Any] | None = ...)

Bases: asyncutils.mixins.LockWithOwnerMixin[None]

An async reentrant lock, which asyncio itself does not provide.

_release() -> None

async acquire() -> Literal[True]

locked() -> bool

property is_owner: bool
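Reentrancy for an async lock is usually tracked per task. The sketch below shows that idea by wrapping asyncio.Lock with an owner and a depth counter; it is an assumption about the general approach, not the RLock implementation, and RLockSketch and demo_rlock are hypothetical names.

```python
import asyncio


class RLockSketch:
    """Hypothetical stand-in; NOT asyncutils.locks.RLock."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._owner = None  # task currently holding the lock
        self._depth = 0     # how many times the owner has acquired it

    async def acquire(self):
        me = asyncio.current_task()
        if self._owner is me:
            self._depth += 1          # re-entry by the owner never blocks
            return True
        await self._lock.acquire()
        self._owner, self._depth = me, 1
        return True

    def release(self):
        if self._owner is not asyncio.current_task():
            raise RuntimeError("cannot release a lock owned by another task")
        self._depth -= 1
        if self._depth == 0:          # the outermost release frees the lock
            self._owner = None
            self._lock.release()

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *exc):
        self.release()


async def demo_rlock():
    lock = RLockSketch()
    async with lock:
        async with lock:              # a plain asyncio.Lock would deadlock here
            depth = lock._depth
    return depth, lock._lock.locked()  # (2, False)
```

The nested acquisition in the demo succeeds because the same task already owns the lock, and the underlying lock is freed only after the outermost exit.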