asyncutils.altlocks¶
Non-conventional asynchronous synchronization primitives that may not adhere to the traditional lock interface.
Exceptions¶
Reimplementation of |
|
A subclass of |
Classes¶
An async context manager used to limit the rate of a function being called. See also: |
|
An async context manager that releases the given lock on entry and re-acquires it on exit. |
|
An async barrier, that unlike traditional barriers, accumulates state from parties in a deque and makes it available once the barrier is tripped. |
Module Contents¶
- exception asyncutils.altlocks.ResourceGuard(action: str = ..., rname: object = ...)[source]¶
Bases:
RuntimeError,asyncutils.mixins.AsyncContextMixin[None]Reimplementation of
anyio.ResourceGuard, as a sync- and async-compatible context manager.actionis used in error messages to describe the action being attempted on the resource, such as'access'or'close'.rnameis used in error messages to describe the resource by calling its__repr__(); if not passed, an index isautomatically assigned to the resource.- __enter__() None[source]¶
- Throw the resource guard instance, which inherits from
RuntimeErroritself as an exception, if the resource is already being guarded.Otherwise, mark the resource as guarded, such thatguardedgivesTrue.
- __exit__(exc_typ: asyncutils._internal.types.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]¶
- __exit__(exc_typ: None, exc_val: None, exc_tb: None, /) None
Mark the resource as no longer guarded.
- exception asyncutils.altlocks.UniqueResourceGuard(action: str = ..., rname: object = ...)[source]¶
Bases:
ResourceGuardA subclass of
ResourceGuardthat only allows one guard per object. Cannot be further subclassed.Tip
The object has to be hashable, and a strong reference will be held to it for the lifetime of the guard.
Caution
This class does not stop the object from having an instance of
ResourceGuard(or subclass thereof) from guarding it simultaneously.Implementation detail
Instances are weakly referencable.
actionis used in error messages to describe the action being attempted on the resource, such as'access'or'close'.rnameis used in error messages to describe the resource by calling its__repr__(); if not passed, an index isautomatically assigned to the resource.- classmethod clear_cache() None[source]¶
Clear the internal cache mapping guarded objects to their guards. Call only when you are sure no guards are in use.
- classmethod guard(obj: object, /, *, action: str = ...) Self[source]¶
- If the object already has a guard, return that guard, regardless of whether it is held. In that case, the
actionparameter is ignored.Otherwise, create and return a new guard for the object, using theactionparameter in error messages.Note
The guard is not held upon creation.
Attention
The error will be seen by the user only when they actually try to acquire the guard if it is already held.
- class asyncutils.altlocks.CircuitBreaker[source]¶
- The circuit breaker pattern. Use on async functions that may fail often, such as requests to an unreliable server.Instances can be used as decorators, unless instantiated with a function as the first parameter, in which case the decorated function is returned.Construct a circuit breaker, whose circuit is initially closed.If
nameis passed, use it as its name; return a function wrappingfotherwise, deriving the name of the circuit breaker from the function.This derivation follows exactly one level of__wrapped__-based wrapping after retrieving the__func__attribute if present.Pass exceptions that are expected to happen through theexcparameter.When the decorated function fails more thanmax_failstimes (defaultcontext.CIRCUIT_BREAKER_DEFAULT_MAX_FAILS), the breakertriggers (opens the circuit, so to say) and disallows further calls of the wrapped functions by throwing an exception.This state persists until theresettimeout expires (defaultcontext.CIRCUIT_BREAKER_DEFAULT_RESET). Then, the breaker enters thehalf-open state.If the function completes successfully when the breaker is half-open undermax_half_open_calls(defaultcontext.CIRCUIT_BREAKER_DEFAULT_MAX_HALF_OPEN_CALLS) tries, the circuit closes automatically. Otherwise, the circuit reopens.- __call__[T, **P](
- f: collections.abc.Callable[P, collections.abc.Awaitable[T]],
- /,
- *,
- timer: asyncutils._internal.types.Timer = ...,
- default: T = ...,
- Apply the circuit breaker to a function
freturning an awaitable, and return a wrapper function with the same signature thatstrictly returns coroutines.timer(defaulttime.monotonic()) is used to get the current time to calculate the timeout.If passed,defaultis returned if an expected exception is raised, also suppressing that exception.Caution
Care should be taken when applying the same circuit breaker to multiple functions, as the calls counters will be shared.
- property state: Literal[0, 1, 2]¶
The state of the circuit breaker: 0 for closed, 1 for half-open, and 2 for open.
- class asyncutils.altlocks.DynamicThrottle(
- init_rate: float,
- min_rate: float = ...,
- max_rate: float = ...,
- window: int | None = ...,
- *,
- ubound: float | None = ...,
- lbound: float | None = ...,
- ufactor: float | None = ...,
- lfactor: float | None = ...,
- jitter: float | None = ...,
- timer: asyncutils._internal.types.Timer = ...,
- rand: collections.abc.Callable[[float], float] = ...,
An async context manager used to limit the rate of a function being called. See also:
func.RateLimited,locks.AdvancedRateLimitinit_rate(required): The initial rate in calls per second.min_rate: The minimum rate; defaultcontext.DYNAMIC_THROTTLE_DEFAULT_MIN_RATE.max_rate: The maximum rate; defaultcontext.DYNAMIC_THROTTLE_DEFAULT_MAX_RATE.window: Number of calls, successful or unsuccessful, after which the rate is automatically adjusted; defaultcontext.DYNAMIC_THROTTLE_DEFAULT_WINDOW.ubound: Lower bound of the ratio successes: total calls such that the rate is multiplied byufactor(defaultcontext.DYNAMIC_THROTTLE_DEFAULT_UFACTOR) and clamped tomin_rateandmax_rate; defaultcontext.DYNAMIC_THROTTLE_DEFAULT_UBOUND.lbound: Upper bound of the above ratio such that the rate is multiplied bylfactor(defaultcontext.DYNAMIC_THROTTLE_DEFAULT_LFACTOR) and clamped similarly; defaultcontext.DYNAMIC_THROTTLE_DEFAULT_LBOUND.jitter: The jitter in calculation of the wait time before the context can enter; defaultcontext.DYNAMIC_THROTTLE_DEFAULT_JITTER.timer: Function to return current time as a float.rand: Function that takes a float (the jitter) and returns a random number within the intervaljitterand-jitter.- async __aenter__() None[source]¶
Wait for the time as computed by the throttler, with some jitter applied, to pass, such that the rate is maintained.
- async __aexit__(exc_typ: asyncutils._internal.types.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]¶
- async __aexit__(exc_typ: None, exc_val: None, exc_tb: None, /) None
If an error caused the context manager, increment
failsand reraise; otherwise, incrementsuccesses. Also adjust the rate if necessary.
- class asyncutils.altlocks.Releasing(lock: asyncutils._internal.types.AsyncLockLike[object], /)[source]¶
An async context manager that releases the given lock on entry and re-acquires it on exit.
- async __aenter__() None[source]¶
Call the release method of the lock, awaiting if it returns a coroutine.
- async __aexit__(exc_typ: asyncutils._internal.types.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]¶
- async __aexit__(exc_typ: None, exc_val: None, exc_tb: None, /) None
Re-enter the lock, propagating errors.
- class asyncutils.altlocks.StatefulBarrier[T](parties: int, name: str = ..., *, maxstate: int | None = ...)[source]¶
- class asyncutils.altlocks.StatefulBarrier(parties: int, *, initstate: asyncutils._internal.types.SupportsIteration[T], maxstate: int | None = ...)
- class asyncutils.altlocks.StatefulBarrier(
- parties: int,
- name: str,
- initstate: asyncutils._internal.types.SupportsIteration[T],
- maxstate: int | None = ...,
Bases:
asyncutils.mixins.AwaitableMixin[tuple[int,collections.deque[T]]]An async barrier, that unlike traditional barriers, accumulates state from parties in a deque and makes it available once the barrier is tripped.
parties(required): number of parties required to break the barriername: name of the barrier; to appear in error messagesinitstate: an iterable storing the initial state; will be exhausted; preferrably not asyncmaxstate: maximum length of state to store; older state will be expelled- async abort() None[source]¶
Abort the barrier, signalling
BrokenBarrierErrorto present waiting parties.
- raise_for_abort() None[source]¶
Throw
BrokenBarrierErrorif the barrier has been aborted.
- async wait(state: T = ..., *, timeout: float | None = ...) tuple[int, collections.deque[T]][source]¶
- Note that the calling party is waiting for the barrier, optionally adding some state.If the barrier has already been aborted or broken, raise
BrokenBarrierError.Once enough parties are waiting, all callers receive a tuple(pos, states), wherestatesis the deque of stored state andposthe number of parties having arrived before this one.