asyncutils.altlocks

Non-conventional asynchronous synchronization primitives that may not adhere to the traditional lock interface.

Exceptions

ResourceGuard

Reimplementation of anyio.ResourceGuard, as a sync- and async-compatible context manager.

UniqueResourceGuard

A subclass of ResourceGuard that only allows one guard per object. Cannot be further subclassed.

Classes

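CircuitBreaker

The circuit breaker pattern. Use on async functions that may fail often, such as requests to an unreliable server.

DynamicThrottle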

An async context manager that limits the rate at which a function is called. See also: RateLimited, AdvancedRateLimit

Releasing

An async context manager that releases the given lock on entry and re-acquires it on exit.

StatefulBarrier

An async barrier that, unlike traditional barriers, accumulates state from parties in a deque and makes it available once the barrier is tripped.

Module Contents

exception asyncutils.altlocks.ResourceGuard(action: str = ..., rname: object = ...)[source]

Bases: RuntimeError, asyncutils.mixins.AsyncContextMixin[None]

Reimplementation of anyio.ResourceGuard, as a sync- and async-compatible context manager.

action is used in error messages to describe the action being attempted on the resource, such as 'access' or 'close'.
rname is used in error messages to describe the resource by calling its __repr__(); if not passed, an index is automatically assigned to the resource.

__enter__() None[source]

Raise the resource guard instance itself, which inherits from RuntimeError, if the resource is already being guarded.
Otherwise, mark the resource as guarded, so that guarded returns True.

__exit__(exc_typ: asyncutils._internal.prots.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]
__exit__(exc_typ: None, exc_val: None, exc_tb: None, /) None

Mark the resource as no longer guarded.

classmethod guard(obj: object, /, *, action: str = ...) Self[source]

Alternate constructor which determines the name of the resource from the representation of the object.

property guarded: bool

Whether the resource is currently being guarded.
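The behaviour described above, an exception class whose instances also act as context managers, can be illustrated with a minimal, synchronous-only sketch. GuardSketch, its default arguments, and its message format are hypothetical and are not taken from asyncutils; only the shape (raise the instance itself on re-entry, clear the flag on exit) follows the description.

```python
class GuardSketch(RuntimeError):
    """Hypothetical stand-in: an exception that doubles as a context manager."""

    def __init__(self, action: str = "use", rname: object = "resource") -> None:
        # The message format is an assumption made for this sketch.
        super().__init__(f"cannot {action} {rname!r}: it is already in use")
        self._guarded = False

    @property
    def guarded(self) -> bool:
        return self._guarded

    def __enter__(self) -> None:
        if self._guarded:
            raise self  # the guard instance itself is the exception
        self._guarded = True

    def __exit__(self, exc_typ, exc_val, exc_tb) -> None:
        self._guarded = False  # returning None lets any exception propagate


guard = GuardSketch("access", "shared buffer")
with guard:
    inside = guard.guarded
    try:
        with guard:  # second entry while the guard is still held
            pass
    except RuntimeError as exc:
        caught = exc
```

Here inside is True, caught is the guard object itself, and guard.guarded is False again once the outer block exits.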

exception asyncutils.altlocks.UniqueResourceGuard(action: str = ..., rname: object = ...)[source]

Bases: ResourceGuard

A subclass of ResourceGuard that only allows one guard per object. Cannot be further subclassed.

Tip

The object has to be hashable, and a strong reference will be held to it for the lifetime of the guard.

Caution

This class does not prevent an instance of ResourceGuard (or a subclass thereof) from guarding the same object simultaneously.

Implementation detail

Instances are weakly referenceable.

action is used in error messages to describe the action being attempted on the resource, such as 'access' or 'close'.
rname is used in error messages to describe the resource by calling its __repr__(); if not passed, an index is automatically assigned to the resource.

classmethod clear_cache() None[source]

Clear the internal cache mapping guarded objects to their guards. Call only when you are sure no guards are in use.

classmethod guard(obj: object, /, *, action: str = ...) Self[source]

If the object already has a guard, return that guard, regardless of whether it is held. In that case, the action parameter is ignored.
Otherwise, create and return a new guard for the object, using the action parameter in error messages.

Note

The guard is not held upon creation.

Attention

The error is raised only when the caller actually tries to acquire a guard that is already held.
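The one-guard-per-object lookup performed by guard() can be sketched as follows. UniqueGuardSketch and its plain dict cache are assumptions made for illustration; the real cache structure is not documented here. The sketch only shows why the object must be hashable, why action is ignored on a cache hit, and why a new guard starts out not held.

```python
class UniqueGuardSketch(RuntimeError):
    """Hypothetical stand-in illustrating one cached guard per object."""

    _cache: dict = {}  # assumption: a plain dict keyed by the guarded object

    def __init__(self, action: str = "use", rname: object = "resource") -> None:
        super().__init__(f"cannot {action} {rname!r}: it is already in use")
        self.guarded = False  # a guard is not held upon creation

    @classmethod
    def guard(cls, obj: object, /, *, action: str = "use") -> "UniqueGuardSketch":
        try:
            return cls._cache[obj]  # existing guard wins; action is ignored
        except KeyError:
            new = cls._cache[obj] = cls(action, obj)  # the dict keeps obj alive
            return new

    @classmethod
    def clear_cache(cls) -> None:
        cls._cache.clear()


key = ("db", 5432)  # hashable, so it can serve as a cache key
first = UniqueGuardSketch.guard(key, action="read")
second = UniqueGuardSketch.guard(key, action="write")  # same guard, still "read"
```

In this sketch, passing an unhashable object such as a list fails with TypeError at the dict lookup, which mirrors the hashability requirement in the tip above.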

class asyncutils.altlocks.CircuitBreaker[source]

The circuit breaker pattern. Use on async functions that may fail often, such as requests to an unreliable server.

Instances can be used as decorators, unless instantiated with a function as the first parameter, in which case the decorated function is returned.

Construct a circuit breaker whose circuit is initially closed. If name is passed, use it as the name of the breaker; otherwise, return a function wrapping f, deriving the name of the circuit breaker from the function. This derivation retrieves the __func__ attribute if present, then follows exactly one level of __wrapped__-based wrapping.

Pass the exceptions that are expected to occur through the exc parameter.

When the decorated function fails more than max_fails times (default CIRCUIT_BREAKER_DEFAULT_MAX_FAILS), the breaker trips (that is, opens the circuit) and disallows further calls of the wrapped function by raising an exception. This state persists until the reset timeout expires (default CIRCUIT_BREAKER_DEFAULT_RESET), after which the breaker enters the half-open state.

If the function completes successfully within max_half_open_calls (default CIRCUIT_BREAKER_DEFAULT_MAX_HALF_OPEN_CALLS) tries while the breaker is half-open, the circuit closes automatically. Otherwise, the circuit reopens.

__call__[T, **P](
f: collections.abc.Callable[P, collections.abc.Awaitable[T]],
/,
*,
timer: asyncutils._internal.prots.Timer = ...,
default: T = ...,
) collections.abc.Callable[P, types.CoroutineType[Any, Any, T]][source]

Apply the circuit breaker to a function f returning an awaitable, and return a wrapper function with the same signature that always returns a coroutine.
timer (default time.monotonic) is called to get the current time when calculating the timeout.
If default is passed, it is returned when an expected exception is raised, and that exception is suppressed.

Caution

Care should be taken when applying the same circuit breaker to multiple functions, as the call counters will be shared.

CLOSED: ClassVar[int]

The closed state.

HALF_OPEN: ClassVar[int]

The half-open state.

OPEN: ClassVar[int]

The open state.

property fails: int

Current count of consecutive failures.

property name: str

The name of the circuit breaker, to be shown in error messages.

property state: Literal[0, 1, 2]

The state of the circuit breaker: 0 for closed, 1 for half-open, and 2 for open.
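The closed, open, and half-open transitions described for this class can be sketched independently of asyncutils. BreakerSketch below is a hypothetical, simplified model: it treats every Exception as expected, allows a single trial call while half-open instead of honouring max_half_open_calls, and raises a plain RuntimeError when the circuit is open. None of these choices are claimed to match the actual implementation.

```python
import asyncio

CLOSED, HALF_OPEN, OPEN = 0, 1, 2  # same numbering as the state property


class BreakerSketch:
    """Hypothetical, simplified breaker; not the CircuitBreaker implementation."""

    def __init__(self, max_fails: int, reset: float, timer) -> None:
        self.max_fails, self.reset, self.timer = max_fails, reset, timer
        self.state, self.fails, self.opened_at = CLOSED, 0, 0.0

    def __call__(self, f):
        async def wrapper(*args, **kwargs):
            if self.state == OPEN:
                if self.timer() - self.opened_at < self.reset:
                    raise RuntimeError("circuit is open")
                self.state = HALF_OPEN  # reset timeout expired: allow a trial call
            try:
                result = await f(*args, **kwargs)
            except Exception:
                self.fails += 1
                if self.state == HALF_OPEN or self.fails > self.max_fails:
                    self.state, self.opened_at = OPEN, self.timer()
                raise
            self.state, self.fails = CLOSED, 0  # a success closes the circuit
            return result

        return wrapper


async def demo() -> list:
    now, healthy = [0.0], [False]  # fake clock and fake server health
    breaker = BreakerSketch(max_fails=1, reset=10.0, timer=lambda: now[0])

    @breaker
    async def fetch() -> str:
        if not healthy[0]:
            raise ConnectionError("server unavailable")
        return "ok"

    seen: list = []
    for _ in range(3):
        try:
            await fetch()
        except ConnectionError:
            seen.append(breaker.state)
        except RuntimeError:
            seen.append("rejected")  # call refused without reaching fetch()
    now[0], healthy[0] = 11.0, True  # advance past the reset timeout
    seen.append(await fetch())
    seen.append(breaker.state)
    return seen


print(asyncio.run(demo()))
```

Injecting the timer, as the timer parameter of __call__ does, is what lets the demo advance time without sleeping.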

class asyncutils.altlocks.DynamicThrottle(
init_rate: float,
min_rate: float = ...,
max_rate: float = ...,
window: int | None = ...,
*,
ubound: float | None = ...,
lbound: float | None = ...,
ufactor: float | None = ...,
lfactor: float | None = ...,
jitter: float | None = ...,
timer: asyncutils._internal.prots.Timer = ...,
rand: collections.abc.Callable[[float], float] = ...,
)[source]

An async context manager that limits the rate at which a function is called. See also: RateLimited, AdvancedRateLimit

async __aenter__() None[source]

Wait for the delay computed by the throttler, with some jitter applied, so that the rate is maintained.

async __aexit__(exc_typ: asyncutils._internal.prots.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]
async __aexit__(exc_typ: None, exc_val: None, exc_tb: None, /) None

If an error caused the context manager to exit, increment fails and re-raise; otherwise, increment successes. Also adjust the rate if necessary.

reset() None[source]

Reset the counts of successes and fails.

property ctime: float

The current time as returned by timer.

property fails: int

Current number of failed calls. Reset periodically.

property jitter: float

The current jitter in calculating the wait time.

property rate: float

The current rate.

property successes: int

Current number of successful calls. Reset periodically.
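The general shape of such a throttle (wait on entry, count outcomes on exit, adjust the rate per window) can be sketched as below. ThrottleSketch is hypothetical, and its adjustment rule (halve the rate if the window saw any failure, otherwise raise it by 50%) is an assumption chosen for brevity. It does not model ubound, lbound, ufactor, lfactor, or jitter, whose exact semantics are not described on this page. The injectable sleep parameter is also an addition for the sketch, so that the demo runs on a fake clock.

```python
import asyncio


class ThrottleSketch:
    """Hypothetical adaptive throttle; the adjustment rule is an assumption."""

    def __init__(self, init_rate, min_rate, max_rate, window, *, timer, sleep=asyncio.sleep):
        self.rate, self.min_rate, self.max_rate = init_rate, min_rate, max_rate
        self.window, self.timer, self.sleep = window, timer, sleep
        self.successes = self.fails = 0
        self._next = timer()  # earliest time the next entry may proceed

    async def __aenter__(self) -> None:
        delay = self._next - self.timer()
        if delay > 0:
            await self.sleep(delay)
        # Space entries 1 / rate seconds apart.
        self._next = max(self._next, self.timer()) + 1 / self.rate

    async def __aexit__(self, exc_typ, exc_val, exc_tb) -> None:
        if exc_typ is None:
            self.successes += 1
        else:
            self.fails += 1
        if self.successes + self.fails >= self.window:
            # Assumed rule: back off after any failure, speed up otherwise.
            factor = 0.5 if self.fails else 1.5
            self.rate = min(self.max_rate, max(self.min_rate, self.rate * factor))
            self.successes = self.fails = 0
        # Returning None re-raises any exception from the block.


async def demo():
    clock = [0.0]

    async def fake_sleep(delay: float) -> None:
        clock[0] += delay  # advance the fake clock instead of really sleeping

    throttle = ThrottleSketch(10.0, 1.0, 100.0, 2, timer=lambda: clock[0], sleep=fake_sleep)
    rates = []
    for ok in (True, True, False, True):
        try:
            async with throttle:
                if not ok:
                    raise ValueError("simulated failure")
        except ValueError:
            pass
        rates.append(throttle.rate)
    return rates, clock[0]


print(asyncio.run(demo()))
```

With a window of two calls, the rate rises after the first two successes and drops after the window that contains the failure.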

class asyncutils.altlocks.Releasing(lock: asyncutils._internal.prots.AsyncLockLike[object], /)[source]

An async context manager that releases the given lock on entry and re-acquires it on exit.

async __aenter__() None[source]

Call the release method of the lock, awaiting if it returns a coroutine.

async __aexit__(exc_typ: asyncutils._internal.prots.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]
async __aexit__(exc_typ: None, exc_val: None, exc_tb: None, /) None

Re-acquire the lock, propagating errors.
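The release-on-entry, re-acquire-on-exit behaviour can be demonstrated with a small stand-in built on asyncio.Lock. ReleasingSketch is hypothetical and handles only locks with a synchronous release() and an awaitable acquire(); the documented class additionally awaits release() when it returns a coroutine.

```python
import asyncio


class ReleasingSketch:
    """Hypothetical stand-in: release a held lock on entry, re-acquire on exit."""

    def __init__(self, lock: asyncio.Lock) -> None:
        self._lock = lock

    async def __aenter__(self) -> None:
        self._lock.release()

    async def __aexit__(self, exc_typ, exc_val, exc_tb) -> None:
        await self._lock.acquire()  # runs even if the block raised


async def demo() -> list:
    lock = asyncio.Lock()
    log = []

    async def other() -> None:
        async with lock:
            log.append("other ran while the lock was released")

    async with lock:
        task = asyncio.create_task(other())
        async with ReleasingSketch(lock):
            await task  # other() can take the lock here
        log.append(f"lock held again: {lock.locked()}")
    return log


print(asyncio.run(demo()))
```

Without the inner block, awaiting task while holding the lock would deadlock, since other() needs the same lock.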

class asyncutils.altlocks.StatefulBarrier[T](parties: int, name: str = ..., *, maxstate: int | None = ...)[source]
class asyncutils.altlocks.StatefulBarrier(parties: int, *, initstate: asyncutils._internal.prots.SupportsIteration[T], maxstate: int | None = ...)
class asyncutils.altlocks.StatefulBarrier(
parties: int,
name: str,
initstate: asyncutils._internal.prots.SupportsIteration[T],
maxstate: int | None = ...,
)

Bases: asyncutils.mixins.AwaitableMixin[tuple[int, collections.deque[T]]]

An async barrier that, unlike traditional barriers, accumulates state from parties in a deque and makes it available once the barrier is tripped.

  • parties (required): number of parties required to break the barrier

  • name: name of the barrier; to appear in error messages

  • initstate: an iterable storing the initial state; will be exhausted; preferably not async

  • maxstate: maximum length of state to store; older state will be expelled

async abort() None[source]

Abort the barrier, signalling BrokenBarrierError to the parties currently waiting.

raise_for_abort() None[source]

Throw BrokenBarrierError if the barrier has been aborted.

async wait(state: T = ..., *, timeout: float | None = ...) tuple[int, collections.deque[T]][source]

Register the calling party as waiting at the barrier, optionally adding some state.
If the barrier has already been aborted or broken, raise BrokenBarrierError.
Once enough parties are waiting, all callers receive a tuple (pos, states), where states is the deque of stored state and pos is the number of parties that arrived before this one.

property broken: bool

Whether the barrier is broken.

property n_waiting: int

Number of parties currently waiting.

property parties: int

Total number of parties, arrived or not.

property remaining_parties: int

Number of parties that have yet to arrive before the barrier trips.
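The (pos, states) contract of wait() can be illustrated with a single-use sketch built on asyncio.Event and collections.deque. BarrierSketch is hypothetical: it omits abort(), timeout, name, initstate, and reuse after tripping, and is not the StatefulBarrier implementation.

```python
import asyncio
from collections import deque
from typing import Optional


class BarrierSketch:
    """Hypothetical single-use barrier that collects one state per party."""

    def __init__(self, parties: int, maxstate: Optional[int] = None) -> None:
        self.parties = parties
        self._states: deque = deque(maxlen=maxstate)  # older state is dropped
        self._arrived = 0
        self._tripped = asyncio.Event()

    async def wait(self, state):
        pos = self._arrived  # number of parties that arrived before this one
        self._arrived += 1
        self._states.append(state)
        if self._arrived == self.parties:
            self._tripped.set()  # last arrival trips the barrier
        await self._tripped.wait()
        return pos, self._states


async def demo():
    barrier = BarrierSketch(parties=3)
    results = await asyncio.gather(*(barrier.wait(f"state-{i}") for i in range(3)))
    positions = [pos for pos, _ in results]
    return positions, list(results[0][1])


print(asyncio.run(demo()))
```

Every party receives the same deque object, so each one sees the state contributed by all the others once the barrier trips.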