asyncutils.misc¶
Utilities that cannot be easily classified into any submodule.
Classes¶
A utility class to store synchronous callbacks and call them sequentially in an executor when the context manager exits. |
|
A simple asynchronous state machine accepting string states. |
Functions¶
Module Contents¶
- class asyncutils.misc.CacheWithBackgroundRefresh[T, R](
- ttl: float | None = ...,
- refresh: float | None = ...,
- *,
- default_loader: collections.abc.Callable[[T], R],
- processor: collections.abc.Callable[[BaseException, bool], object] = ...,
- timer: asyncutils._internal.types.Timer = ...,
- class asyncutils.misc.CacheWithBackgroundRefresh(
- ttl: float | None = ...,
- refresh: float | None = ...,
- *,
- processor: collections.abc.Callable[[BaseException, bool], object] = ...,
- timer: asyncutils._internal.types.Timer = ...,
Bases:
asyncutils.mixins.LoopContextMixinA cache that automatically refreshes entries in the background before expiry. Use as an async context manager only.Maintains entries with TTL values and proactively reloads their values from registered loaders in the background when they approach expiration.This ensures availability of fresh data without blocking get operations.All arguments are optioanl:
ttl: Time-to-live in seconds; defaultcontext.BACKGROUND_REFRESH_CACHE_DEFAULT_TTL.refresh: Time before TTL expires to begin the refresh; defaultcontext.BACKGROUND_REFRESH_CACHE_DEFAULT_REFRESH.processor: Error handler that takes two arguments(exc, was_batched), whereexcis the exception occurred andwas_batchedwhether the exception was thrown during a batch refresh, in contrast to a single-item refresh.default_loader: The loader to load values from keys for which specific loaders have not been registered.
- configure(ttl: float, refresh: float, processor: collections.abc.Callable[[BaseException, bool], object] = ...) None[source]¶
(Re-)configure the cache with the given
ttl,refreshandprocessor.
- async get(key: T, loader: collections.abc.Callable[[T], R] | None = ...) R[source]¶
- Get the value for the key from the cache.If the key is expired, it is immediately loaded; if it is within the refresh window, return the current value and trigger background refresh.
- get_loader(key: T) collections.abc.Callable[[T], R][source]¶
Get the loader registered for the key, raising
LookupErrorif there is none.
- async invalidate(key: T) R | None[source]¶
Remove a key from the cache, returning the corresponding value if it was in the cache.
- async refresh_loop() NoReturn[source]¶
This task runs continuously in the background, checking for entries requiring refresh and spawning tasks to do so.
- register_loader(key: T, loader: collections.abc.Callable[[T], R]) None[source]¶
Register a specific loader for the key, that will take precedence over the default (if any).
- class asyncutils.misc.CallbackAccumulator[T, **P](
- name: str,
- it: asyncutils._internal.types.SupportsIteration[collections.abc.Callable[P, T]],
- maxlen: int | None = ...,
- default: object = ...,
- call_once: bool = ...,
- default_getter: collections.abc.Callable[[], tuple[collections.abc.Iterable[object], collections.abc.Mapping[str, object]]] = ...,
- class asyncutils.misc.CallbackAccumulator(
- name: str,
- *,
- maxlen: int | None = ...,
- default: object = ...,
- call_once: bool = ...,
- default_getter: collections.abc.Callable[[], tuple[collections.abc.Iterable[object], collections.abc.Mapping[str, object]]] = ...,
Bases:
collections.deque[collections.abc.Callable[P,T]],asyncutils.mixins.ExecutorRequiredAsyncContextMixin[CallbackAccumulator[T,P]]A utility class to store synchronous callbacks and call them sequentially in an executor when the context manager exits.
Tip
To iterate through the callbacks at this moment safely, use the
callbacksattribute.Note
This class is no longer used by the pools after a massive rewrite, and only remains here for backwards compatibility.
Implementation detail
The fact that this class currently subclasses
dequeis subject to change.Initialize the accumulator.
nameis the name of attribute gotten on the argument toadd().maxlenis the maximum number of callbacks that can be stored.defaultis the default return value of the context manager if no callbacks are added orcall_onceisFalse.If
call_onceisTrue, the callbacks will be called only once when the context manager exits, and then cleared. IfFalse, they will be called every time the context manager exits until they are manually cleared.default_getteris a function that returns the default arguments to call the callbacks with when the context manager exits. By default, it returns the exception info ifnameis'__exit__'and empty arguments otherwise.
- __exit__(exc_typ: asyncutils._internal.types.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]¶
- __exit__(exc_typ: None, exc_val: None, exc_tb: None, /) None
Call the callbacks.
- __iter__() collections.abc.Generator[collections.abc.Callable[P, T]][source]¶
Iterate through the callbacks.
- add(o: object, /) None[source]¶
Get the method on the object with the name specified and queue it to be called.
- offer_last(o: object, /) bool[source]¶
Add a callback from object only if there is space in the accumulator, and return whether it was added.
- property callbacks: Self¶
Return a view of the callbacks currently stored in the accumulator.
- class asyncutils.misc.StateMachine(state: str)[source]¶
A simple asynchronous state machine accepting string states.
Initialize the state machine with the given initial state.
- add(
- from_state: str,
- to_state: str,
- condition: collections.abc.Callable[[str, str], collections.abc.Awaitable[Any]] | None = ...,
- Add a condition to the transition from
from_statetoto_state.If any condition isNoneor returns a truthy value taking the current and new states as positional arguments, the transition is allowed.
- on_enter[F: collections.abc.Callable[[], collections.abc.Awaitable[Any]]](state: str) collections.abc.Callable[[F], F]¶
Register an asynchronous handler to be called when
stateis entered.
- on_exit[F: collections.abc.Callable[[], collections.abc.Awaitable[Any]]](state: str) collections.abc.Callable[[F], F]¶
Register an asynchronous handler to be called when
stateis exited.
- async asyncutils.misc.gather_with_limited_concurrency[T](
- n: int = ...,
- /,
- *coros: collections.abc.Awaitable[T],
- ret_exc: Literal[False] = ...,
- async asyncutils.misc.gather_with_limited_concurrency(
- n: int = ...,
- /,
- *coros: collections.abc.Awaitable[T],
- ret_exc: Literal[True],
n, which defaults tocontext.GATHER_WITH_LIMITED_CONCURRENCY_DEFAULT_MAX_CONCURRENT, is used to restrict the number ofconcurrently running awaitables.