asyncutils.altlocks

Non-conventional asynchronous synchronization primitives that may not adhere to the traditional lock interface.

Exceptions

ResourceGuard

Reimplementation of anyio.ResourceGuard, as a sync- and async-compatible context manager.

UniqueResourceGuard

A subclass of ResourceGuard that only allows one guard per object. Cannot be further subclassed.

Classes

CircuitBreaker

DynamicThrottle

An async context manager used to limit the rate at which a function is called. See also: func.RateLimited, locks.AdvancedRateLimit

Releasing

An async context manager that releases the given lock on entry and re-acquires it on exit.

StatefulBarrier

An async barrier that, unlike traditional barriers, accumulates state from parties in a deque and makes it available once the barrier is tripped.

Module Contents

exception asyncutils.altlocks.ResourceGuard(action: str = ..., rname: object = ...)[source]

Bases: RuntimeError, asyncutils.mixins.AsyncContextMixin[None]

Reimplementation of anyio.ResourceGuard, as a sync- and async-compatible context manager.

action is used in error messages to describe the action being attempted on the resource, such as 'access' or 'close'.
rname is used in error messages to describe the resource by calling its __repr__(); if not passed, an index is
automatically assigned to the resource.

__enter__() None[source]

Raise the resource guard instance itself, which inherits from RuntimeError, if the resource is already being guarded.
Otherwise, mark the resource as guarded, so that guarded returns True.

__exit__(exc_typ: asyncutils._internal.types.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]
__exit__(exc_typ: None, exc_val: None, exc_tb: None, /) None

Mark the resource as no longer guarded.

classmethod guard(obj: object, /, *, action: str = ...) Self[source]

Alternate constructor which determines the name of the resource from the representation of the object.

property guarded: bool

Whether the resource is currently being guarded.
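The behaviour described above (a guard that raises itself when entered twice) can be illustrated with a minimal stand-in. This is only a sketch: SketchGuard, try_reenter, the default arguments and the error message wording are hypothetical and are not taken from asyncutils, and the async half of the interface is omitted.

```python
class SketchGuard(RuntimeError):
    """Hypothetical stand-in: a guard that is itself the exception it raises."""

    def __init__(self, action: str = "use", rname: object = "resource") -> None:
        # The message format is an assumption made for this sketch.
        super().__init__(f"{rname!r} is already in {action}")
        self._guarded = False

    @property
    def guarded(self) -> bool:
        return self._guarded

    def __enter__(self) -> None:
        if self._guarded:
            raise self  # the guard doubles as the error
        self._guarded = True

    def __exit__(self, exc_typ, exc_val, exc_tb) -> None:
        self._guarded = False


def try_reenter(guard: SketchGuard) -> bool:
    """Return True if entering an already-held guard was rejected."""
    with guard:
        try:
            with guard:
                return False
        except SketchGuard:
            return True
```

Because the guard is the exception, callers can catch it by class and still inspect the same object that protects the resource.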

exception asyncutils.altlocks.UniqueResourceGuard(action: str = ..., rname: object = ...)[source]

Bases: ResourceGuard

A subclass of ResourceGuard that only allows one guard per object. Cannot be further subclassed.

Tip

The object has to be hashable, and a strong reference will be held to it for the lifetime of the guard.

Caution

This class does not prevent an instance of ResourceGuard (or a subclass thereof) from guarding the same object simultaneously.

Implementation detail

Instances are weakly referenceable.

action is used in error messages to describe the action being attempted on the resource, such as 'access' or 'close'.
rname is used in error messages to describe the resource by calling its __repr__(); if not passed, an index is
automatically assigned to the resource.

classmethod clear_cache() None[source]

Clear the internal cache mapping guarded objects to their guards. Call only when you are sure no guards are in use.

classmethod guard(obj: object, /, *, action: str = ...) Self[source]

If the object already has a guard, return that guard, regardless of whether it is held. In that case, the action parameter is ignored.
Otherwise, create and return a new guard for the object, using the action parameter in error messages.

Note

The guard is not held upon creation.

Attention

If the guard is already held, the error is raised only when the user actually tries to acquire it, not when guard() is called.
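The one-guard-per-object rule can be sketched with a class-level cache keyed by the guarded object. The class below is a hypothetical stand-in written for illustration (its name, the dict-based cache and the message text are assumptions), not the asyncutils implementation.

```python
class SketchUniqueGuard(RuntimeError):
    """Hypothetical stand-in: at most one guard per hashable object."""

    _cache: dict = {}  # assumed layout: guarded object -> its guard

    def __init__(self, action: str = "use", rname: object = "resource") -> None:
        super().__init__(f"{rname!r} is already in {action}")
        self._guarded = False

    @classmethod
    def guard(cls, obj, /, *, action: str = "use"):
        try:
            return cls._cache[obj]  # existing guard: action is ignored
        except KeyError:
            new = cls._cache[obj] = cls(action, obj)
            return new  # created, but not held yet

    @classmethod
    def clear_cache(cls) -> None:
        cls._cache.clear()

    @property
    def guarded(self) -> bool:
        return self._guarded

    def __enter__(self) -> None:
        if self._guarded:
            raise self
        self._guarded = True

    def __exit__(self, exc_typ, exc_val, exc_tb) -> None:
        self._guarded = False
```

Note that the dict holds a strong reference to each guarded object, which matches the Tip above about hashability and reference lifetime.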

class asyncutils.altlocks.CircuitBreaker[source]

The circuit breaker pattern. Use on async functions that may fail often, such as requests to an unreliable server.
Instances can be used as decorators, unless instantiated with a function as the first parameter, in which case the decorated function is returned.

Construct a circuit breaker, whose circuit is initially closed.
If name is passed, use it as the name of the circuit breaker; otherwise, return a function wrapping f and derive the name from that function.
This derivation follows exactly one level of __wrapped__-based wrapping after retrieving the __func__ attribute if present.
Pass exceptions that are expected to happen through the exc parameter.

When the decorated function fails more than max_fails times (default context.CIRCUIT_BREAKER_DEFAULT_MAX_FAILS), the breaker
triggers (opens the circuit, so to speak) and disallows further calls of the wrapped functions by throwing an exception.
This state persists until the reset timeout expires (default context.CIRCUIT_BREAKER_DEFAULT_RESET). Then, the breaker enters the
half-open state.
If the function completes successfully within max_half_open_calls (default context.CIRCUIT_BREAKER_DEFAULT_MAX_HALF_OPEN_CALLS)
tries while the breaker is half-open, the circuit closes automatically. Otherwise, the circuit reopens.

__call__[T, **P](
f: collections.abc.Callable[P, collections.abc.Awaitable[T]],
/,
*,
timer: asyncutils._internal.types.Timer = ...,
default: T = ...,
) collections.abc.Callable[P, types.CoroutineType[Any, Any, T]][source]

Apply the circuit breaker to a function f returning an awaitable, and return a wrapper function with the same signature that
always returns coroutines.
timer (default time.monotonic) is called to get the current time when calculating the timeout.
If passed, default is returned when an expected exception is raised, and that exception is suppressed.

Caution

Care should be taken when applying the same circuit breaker to multiple functions, as the call counters will be shared.

CLOSED: ClassVar[int]

The closed state.

HALF_OPEN: ClassVar[int]

The half-open state.

OPEN: ClassVar[int]

The open state.

property fails: int

Current count of consecutive failures.

property name: str

The name of the circuit breaker, to be shown in error messages.

property state: Literal[0, 1, 2]

The state of the circuit breaker: 0 for closed, 1 for half-open, and 2 for open.
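The closed, open and half-open transitions described for this class can be sketched as a small state machine. Everything below is a hypothetical stand-in: SketchBreaker, CircuitOpenError, the constructor defaults and the exact comparison rules are assumptions made for illustration, and the name and default features are left out.

```python
import asyncio
import time

CLOSED, HALF_OPEN, OPEN = 0, 1, 2  # same numbering as the state property


class CircuitOpenError(RuntimeError):
    """Hypothetical error raised while the circuit is open."""


class SketchBreaker:
    def __init__(self, max_fails=2, reset=30.0, max_half_open_calls=1,
                 exc=Exception, timer=time.monotonic):
        self._max_fails, self._reset = max_fails, reset
        self._max_half_open_calls = max_half_open_calls
        self._exc, self._timer = exc, timer
        self.state, self.fails = CLOSED, 0
        self._half_open_calls, self._opened_at = 0, 0.0

    def _trip(self):
        self.state, self._opened_at = OPEN, self._timer()

    def __call__(self, f):
        async def wrapper(*args, **kwargs):
            if self.state == OPEN:
                if self._timer() - self._opened_at < self._reset:
                    raise CircuitOpenError("circuit is open")
                # Reset timeout expired: allow trial calls.
                self.state, self._half_open_calls = HALF_OPEN, 0
            try:
                result = await f(*args, **kwargs)
            except self._exc:
                if self.state == HALF_OPEN:
                    self._half_open_calls += 1
                    if self._half_open_calls >= self._max_half_open_calls:
                        self._trip()  # trial budget used up: reopen
                else:
                    self.fails += 1
                    if self.fails > self._max_fails:
                        self._trip()
                raise
            self.state, self.fails = CLOSED, 0  # success closes the circuit
            return result
        return wrapper


async def demo() -> list:
    clock = [0.0]  # fake timer so the sketch runs instantly
    breaker = SketchBreaker(max_fails=1, reset=10.0, timer=lambda: clock[0])

    @breaker
    async def flaky(ok: bool) -> str:
        if not ok:
            raise ValueError("boom")
        return "fine"

    states = []
    for _ in range(2):  # two failures exceed max_fails=1
        try:
            await flaky(False)
        except ValueError:
            pass
    states.append(breaker.state)
    try:
        await flaky(True)  # rejected: reset timeout has not elapsed
    except CircuitOpenError:
        states.append(breaker.state)
    clock[0] = 11.0  # past the reset timeout
    await flaky(True)  # half-open trial succeeds
    states.append(breaker.state)
    return states
```

Running asyncio.run(demo()) walks the breaker through open, still open, and closed. Injecting the timer, as the documented timer parameter allows, is what makes the timeout testable without sleeping.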

class asyncutils.altlocks.DynamicThrottle(
init_rate: float,
min_rate: float = ...,
max_rate: float = ...,
window: int | None = ...,
*,
ubound: float | None = ...,
lbound: float | None = ...,
ufactor: float | None = ...,
lfactor: float | None = ...,
jitter: float | None = ...,
timer: asyncutils._internal.types.Timer = ...,
rand: collections.abc.Callable[[float], float] = ...,
)[source]

An async context manager used to limit the rate at which a function is called. See also: func.RateLimited, locks.AdvancedRateLimit

init_rate (required): The initial rate in calls per second.
min_rate: The minimum rate; default context.DYNAMIC_THROTTLE_DEFAULT_MIN_RATE.
max_rate: The maximum rate; default context.DYNAMIC_THROTTLE_DEFAULT_MAX_RATE.
window: Number of calls, successful or unsuccessful, after which the rate is automatically adjusted; default context.DYNAMIC_THROTTLE_DEFAULT_WINDOW.
ubound: Lower bound of the ratio of successes to total calls for which the rate is multiplied by ufactor (default context.DYNAMIC_THROTTLE_DEFAULT_UFACTOR) and clamped to the range from min_rate to max_rate; default context.DYNAMIC_THROTTLE_DEFAULT_UBOUND.
lbound: Upper bound of the same ratio for which the rate is multiplied by lfactor (default context.DYNAMIC_THROTTLE_DEFAULT_LFACTOR) and clamped similarly; default context.DYNAMIC_THROTTLE_DEFAULT_LBOUND.
jitter: The jitter applied when calculating the wait time before the context can be entered; default context.DYNAMIC_THROTTLE_DEFAULT_JITTER.
timer: Function to return current time as a float.
rand: Function that takes a float (the jitter) and returns a random number in the interval from -jitter to jitter.

async __aenter__() None[source]

Wait until the delay computed by the throttler, with some jitter applied, has elapsed, so that the rate is maintained.
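One plausible way to compute such a delay is sketched below. The formula (spacing calls 1/rate seconds apart, then adding a random offset) and the names wait_time, symmetric_jitter and last_call are assumptions made for illustration; the documentation does not state how the throttler derives its wait time.

```python
import random
from collections.abc import Callable


def symmetric_jitter(jitter: float) -> float:
    """Return a random offset in the closed interval [-jitter, jitter]."""
    return random.uniform(-jitter, jitter)


def wait_time(rate: float, last_call: float, now: float, jitter: float = 0.0,
              rand: Callable[[float], float] = symmetric_jitter) -> float:
    """Seconds to sleep so calls are spaced about 1/rate apart (assumed formula)."""
    interval = 1.0 / rate  # rate is in calls per second
    delay = last_call + interval - now + rand(jitter)
    return max(delay, 0.0)  # never sleep for a negative duration
```

Passing a deterministic rand, for example one that always returns 0.0, removes the randomness for testing.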

async __aexit__(exc_typ: asyncutils._internal.types.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]
async __aexit__(exc_typ: None, exc_val: None, exc_tb: None, /) None

If an error caused the context manager to exit, increment fails and re-raise; otherwise, increment successes. Also adjust the rate if necessary.
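The rate adjustment described by the ubound, lbound, ufactor and lfactor parameters can be sketched as a pure function. This is an illustration under assumptions: adjust_rate is a hypothetical helper, and whether the real comparisons are inclusive is not stated in the documentation.

```python
def adjust_rate(rate: float, successes: int, fails: int, *,
                min_rate: float, max_rate: float,
                ubound: float, lbound: float,
                ufactor: float, lfactor: float) -> float:
    """Return the new rate after one window of calls (hypothetical helper)."""
    total = successes + fails
    if total == 0:
        return rate  # nothing observed yet
    ratio = successes / total
    if ratio >= ubound:  # mostly succeeding: speed up (inclusive by assumption)
        rate *= ufactor
    elif ratio <= lbound:  # mostly failing: slow down (inclusive by assumption)
        rate *= lfactor
    return min(max(rate, min_rate), max_rate)  # clamp to [min_rate, max_rate]
```

For instance, with ubound=0.8 and ufactor=1.5, a window of nine successes and one failure raises a rate of 10 to 15 before clamping, so max_rate=12 yields 12.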

reset() None[source]

Reset the counts of successes and fails.

property ctime: float

The current time as returned by timer.

property fails: int

Current number of failed calls. Reset periodically.

property jitter: float

The current jitter in calculating the wait time.

property rate: float

The current rate.

property successes: int

Current number of succeeded calls; reset periodically.

class asyncutils.altlocks.Releasing(lock: asyncutils._internal.types.AsyncLockLike[object], /)[source]

An async context manager that releases the given lock on entry and re-acquires it on exit.

async __aenter__() None[source]

Call the release method of the lock, awaiting if it returns a coroutine.

async __aexit__(exc_typ: asyncutils._internal.types.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]
async __aexit__(exc_typ: None, exc_val: None, exc_tb: None, /) None

Re-acquire the lock, propagating any errors raised.
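The release-on-entry, re-acquire-on-exit behaviour can be sketched with asyncio.Lock. SketchReleasing is a hypothetical stand-in written for this example, not the asyncutils class; it assumes the lock exposes release() and an awaitable acquire().

```python
import asyncio
import inspect


class SketchReleasing:
    """Hypothetical stand-in: temporarily give up a lock that is already held."""

    def __init__(self, lock) -> None:
        self._lock = lock

    async def __aenter__(self) -> None:
        result = self._lock.release()
        if inspect.isawaitable(result):  # some lock types release asynchronously
            await result

    async def __aexit__(self, exc_typ, exc_val, exc_tb) -> None:
        await self._lock.acquire()  # errors from acquire() propagate


async def demo() -> list:
    lock = asyncio.Lock()
    seen = []
    async with lock:
        seen.append(lock.locked())  # held by us
        async with SketchReleasing(lock):
            seen.append(lock.locked())  # released inside the block
        seen.append(lock.locked())  # held again
    seen.append(lock.locked())  # released by the outer block
    return seen
```

This is useful when a task holding a lock needs to await something slow and wants other tasks to make progress in the meantime.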

class asyncutils.altlocks.StatefulBarrier[T](parties: int, name: str = ..., *, maxstate: int | None = ...)[source]
class asyncutils.altlocks.StatefulBarrier(parties: int, *, initstate: asyncutils._internal.types.SupportsIteration[T], maxstate: int | None = ...)
class asyncutils.altlocks.StatefulBarrier(
parties: int,
name: str,
initstate: asyncutils._internal.types.SupportsIteration[T],
maxstate: int | None = ...,
)

Bases: asyncutils.mixins.AwaitableMixin[tuple[int, collections.deque[T]]]

An async barrier that, unlike traditional barriers, accumulates state from parties in a deque and makes it available once the barrier is tripped.

parties (required): number of parties required to break the barrier
name: name of the barrier; to appear in error messages
initstate: an iterable storing the initial state; will be exhausted; preferably not async
maxstate: maximum length of state to store; older state is discarded once this length is exceeded

async abort() None[source]

Abort the barrier, signalling BrokenBarrierError to the parties currently waiting.

raise_for_abort() None[source]

Throw BrokenBarrierError if the barrier has been aborted.

async wait(state: T = ..., *, timeout: float | None = ...) tuple[int, collections.deque[T]][source]

Register the calling party as waiting for the barrier, optionally adding some state.
If the barrier has already been aborted or broken, raise BrokenBarrierError.
Once enough parties are waiting, all callers receive a tuple (pos, states), where states is the deque of stored state and pos
is the number of parties that arrived before this one.

property broken: bool

Whether the barrier is broken.

property n_waiting: int

Number of parties currently waiting.

property parties: int

Total number of parties, arrived or not.

property remaining_parties: int

Number of parties that have yet to arrive before the barrier trips.
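The idea of a barrier that collects state from each arriving party can be sketched with asyncio.Event and a deque. SketchStatefulBarrier is a hypothetical, single-use stand-in: it omits abort handling, timeouts, naming and initial state, and it is not the asyncutils implementation.

```python
import asyncio
from collections import deque


class SketchStatefulBarrier:
    """Hypothetical single-use barrier that accumulates per-party state."""

    def __init__(self, parties: int, *, maxstate=None) -> None:
        self._parties = parties
        self._states = deque(maxlen=maxstate)  # oldest entries drop when full
        self._count = 0
        self._tripped = asyncio.Event()

    async def wait(self, state):
        pos = self._count  # parties that arrived before this one
        self._count += 1
        self._states.append(state)
        if self._count == self._parties:
            self._tripped.set()  # last arrival trips the barrier
        await self._tripped.wait()
        return pos, self._states


async def demo() -> list:
    barrier = SketchStatefulBarrier(3)
    results = await asyncio.gather(*(barrier.wait(f"p{i}") for i in range(3)))
    # Copy each deque into a list for easy comparison.
    return [(pos, list(states)) for pos, states in results]
```

Every party receives the same deque, so each one sees the state contributed by all the others once the barrier trips.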