asyncutils.misc

Utilities that cannot be easily classified into any submodule.

Classes

CacheWithBackgroundRefresh

CallbackAccumulator

A utility class to store synchronous callbacks and call them sequentially in an executor when the context manager exits.

StateMachine

A simple asynchronous state machine accepting string states.

Functions

Module Contents

class asyncutils.misc.CacheWithBackgroundRefresh[T, R](
ttl: float | None = ...,
refresh: float | None = ...,
*,
default_loader: collections.abc.Callable[[T], R],
processor: collections.abc.Callable[[BaseException, bool], object] = ...,
timer: asyncutils._internal.types.Timer = ...,
)[source]
class asyncutils.misc.CacheWithBackgroundRefresh(
ttl: float | None = ...,
refresh: float | None = ...,
*,
processor: collections.abc.Callable[[BaseException, bool], object] = ...,
timer: asyncutils._internal.types.Timer = ...,
)

Bases: asyncutils.mixins.LoopContextMixin

A cache that automatically refreshes entries in the background before expiry. Use as an async context manager only.
Maintains entries with TTL values and proactively reloads their values from registered loaders in the background when they approach expiration.
This ensures availability of fresh data without blocking get operations.

All arguments are optional:

  • ttl: Time-to-live in seconds; default context.BACKGROUND_REFRESH_CACHE_DEFAULT_TTL.

  • refresh: Time before TTL expires to begin the refresh; default context.BACKGROUND_REFRESH_CACHE_DEFAULT_REFRESH.

  • processor: Error handler that takes two arguments (exc, was_batched), where exc is the exception that occurred and was_batched indicates whether the exception was raised during a batch refresh rather than a single-item refresh.

  • default_loader: The loader to load values from keys for which specific loaders have not been registered.

async __cleanup__() None[source]
__contains__(key: T) bool[source]

Check if a key exists in the cache.

async __setup__() None[source]
async clear() None[source]

Remove all entries from the cache asynchronously.

configure(ttl: float, refresh: float, processor: collections.abc.Callable[[BaseException, bool], object] = ...) None[source]

(Re-)configure the cache with the given ttl, refresh and processor.

expired(key: T) bool[source]

Whether the key has overstayed its TTL.

async get(key: T, loader: collections.abc.Callable[[T], R] | None = ...) R[source]
Get the value for the key from the cache.
If the key has expired, its value is reloaded immediately; if it is within the refresh window, the current value is returned and a background refresh is triggered.
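
The three cases that get() distinguishes can be sketched with a small standalone helper. This is only an illustration of the ttl/refresh arithmetic described above, not the library's code: classify_entry is a hypothetical name, and the exact boundary handling (>= versus >) is an assumption.

```python
def classify_entry(age: float, ttl: float, refresh: float) -> str:
    """Classify a cache entry by its age (all values in seconds).

    Hypothetical helper for illustration; not part of asyncutils.
    """
    if age >= ttl:
        return "expired"   # the value must be loaded before it can be returned
    if age >= ttl - refresh:
        return "refresh"   # serve the current value, refresh in the background
    return "fresh"         # serve the current value as-is


# With ttl=60 and refresh=10, the refresh window covers ages from 50 up to 60.
states = [classify_entry(age, ttl=60.0, refresh=10.0) for age in (5.0, 55.0, 61.0)]
print(states)
```

Under these assumptions, a larger refresh value widens the window in which background refreshes start, at the cost of more frequent loader calls.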
get_loader(key: T) collections.abc.Callable[[T], R][source]

Get the loader registered for the key, raising LookupError if there is none.

async invalidate(key: T) R | None[source]

Remove a key from the cache, returning the corresponding value if it was in the cache.

async load_item(key: T) None[source]

Load the entry for a key and store it in the cache.

async refresh_item(key: T) None[source]

Refresh an entry in the background.

async refresh_loop() NoReturn[source]

Runs continuously in the background, checking for entries that require a refresh and spawning tasks to refresh them.

register_loader(key: T, loader: collections.abc.Callable[[T], R]) None[source]

Register a specific loader for the key, which takes precedence over the default loader (if any).

should_refresh(key: T) bool[source]

Whether the key should be refreshed at this instant.

time_past(key: T) float[source]

Time elapsed, in seconds, since the key was last reloaded.

class asyncutils.misc.CallbackAccumulator[T, **P](
name: str,
it: asyncutils._internal.types.SupportsIteration[collections.abc.Callable[P, T]],
maxlen: int | None = ...,
default: object = ...,
call_once: bool = ...,
default_getter: collections.abc.Callable[[], tuple[collections.abc.Iterable[object], collections.abc.Mapping[str, object]]] = ...,
)[source]
class asyncutils.misc.CallbackAccumulator(
name: str,
*,
maxlen: int | None = ...,
default: object = ...,
call_once: bool = ...,
default_getter: collections.abc.Callable[[], tuple[collections.abc.Iterable[object], collections.abc.Mapping[str, object]]] = ...,
)

Bases: collections.deque[collections.abc.Callable[P, T]], asyncutils.mixins.ExecutorRequiredAsyncContextMixin[CallbackAccumulator[T, P]]

A utility class to store synchronous callbacks and call them sequentially in an executor when the context manager exits.

Tip

To safely iterate over the callbacks currently stored, use the callbacks attribute.

Note

This class is no longer used by the pools after a massive rewrite, and only remains here for backwards compatibility.

Implementation detail

The fact that this class currently subclasses deque is subject to change.

Initialize the accumulator.

  • name is the name of the attribute looked up on the argument passed to add().

  • maxlen is the maximum number of callbacks that can be stored.

  • default is the default return value of the context manager if no callbacks are added or call_once is False.

  • If call_once is True, the callbacks will be called only once when the context manager exits, and then cleared. If False, they will be called every time the context manager exits until they are manually cleared.

  • default_getter is a function that returns the default arguments to call the callbacks with when the context manager exits. By default, it returns the exception info if name is '__exit__' and empty arguments otherwise.
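
The interplay of name, add() and call_once can be illustrated with a rough standalone sketch. SketchAccumulator and Resource are hypothetical names, not asyncutils classes; the sketch ignores maxlen, default and default_getter, calls the callbacks without arguments, and uses a single-worker ThreadPoolExecutor as an assumed stand-in for the executor.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor


class SketchAccumulator:
    """Hypothetical stand-in; not the asyncutils implementation."""

    def __init__(self, name: str, call_once: bool = True) -> None:
        self.name = name
        self.call_once = call_once
        self._callbacks = deque()

    def add(self, o: object) -> None:
        # Look up the bound method called `name` on the object and queue it.
        self._callbacks.append(getattr(o, self.name))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        callbacks = list(self._callbacks)
        if self.call_once:
            self._callbacks.clear()  # queued callbacks run a single time only

        def run_all() -> None:
            for callback in callbacks:  # insertion order, one after another
                callback()

        with ThreadPoolExecutor(max_workers=1) as pool:
            pool.submit(run_all).result()


class Resource:
    """Toy object whose close() records that it was called."""

    def __init__(self, label: str, log: list) -> None:
        self.label = label
        self.log = log

    def close(self) -> None:
        self.log.append(self.label)


log = []
acc = SketchAccumulator("close")
with acc:
    acc.add(Resource("a", log))
    acc.add(Resource("b", log))
with acc:
    pass  # nothing left to call, because call_once cleared the queue
print(log)
```

With call_once set to False in this sketch, the second with block would call both close() methods again.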

__call__(*a: P.args, **k: P.kwargs) None[source]
__enter__() Self[source]

Enter the context manager.

__exit__(exc_typ: asyncutils._internal.types.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]
__exit__(exc_typ: None, exc_val: None, exc_tb: None, /) None

Call the callbacks.

__iter__() collections.abc.Generator[collections.abc.Callable[P, T]][source]

Iterate through the callbacks.

add(o: object, /) None[source]

Get the method on the object with the name specified and queue it to be called.

offer_last(o: object, /) bool[source]

Add a callback from object only if there is space in the accumulator, and return whether it was added.

property callbacks: Self

Return a view of the callbacks currently stored in the accumulator.

class asyncutils.misc.StateMachine(state: str)[source]

A simple asynchronous state machine accepting string states.

Initialize the state machine with the given initial state.

add(
from_state: str,
to_state: str,
condition: collections.abc.Callable[[str, str], collections.abc.Awaitable[Any]] | None = ...,
) None[source]
Add a condition to the transition from from_state to to_state.
The transition is allowed if any of its conditions is None or returns a truthy value when called with the current and new states as positional arguments.
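
The "any condition passes" rule can be sketched with a minimal standalone class. SketchStateMachine is a hypothetical stand-in rather than the library's implementation; it omits the on_enter and on_exit handlers, and it assumes that a transition with no registered conditions is rejected and that conditions are awaited.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any, Optional

Condition = Optional[Callable[[str, str], Awaitable[Any]]]


class SketchStateMachine:
    """Hypothetical stand-in; not the asyncutils implementation."""

    def __init__(self, state: str) -> None:
        self.state = state
        self._conditions: dict[tuple[str, str], list[Condition]] = {}

    def add(self, from_state: str, to_state: str, condition: Condition = None) -> None:
        self._conditions.setdefault((from_state, to_state), []).append(condition)

    async def transition(self, state: str) -> bool:
        # Assumption: a transition that was never added is rejected.
        for condition in self._conditions.get((self.state, state), []):
            if condition is None or await condition(self.state, state):
                self.state = state
                return True
        return False


async def demo() -> list[bool]:
    flags = {"ready": False}

    async def is_ready(current: str, new: str) -> bool:
        return flags["ready"]

    sm = SketchStateMachine("idle")
    sm.add("idle", "running", is_ready)  # conditional transition
    sm.add("running", "idle")            # unconditional transition

    outcomes = [await sm.transition("running")]      # condition is falsy
    flags["ready"] = True
    outcomes.append(await sm.transition("running"))  # condition is truthy
    outcomes.append(await sm.transition("idle"))     # condition is None
    outcomes.append(await sm.transition("stopped"))  # never registered
    return outcomes


outcomes = asyncio.run(demo())
print(outcomes)
```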
on_enter[F: collections.abc.Callable[[], collections.abc.Awaitable[Any]]](state: str) collections.abc.Callable[[F], F]

Register an asynchronous handler to be called when state is entered.

on_exit[F: collections.abc.Callable[[], collections.abc.Awaitable[Any]]](state: str) collections.abc.Callable[[F], F]

Register an asynchronous handler to be called when state is exited.

async transition(state: str) bool[source]

Transition from the current state to the new state.

async asyncutils.misc.gather_with_limited_concurrency[T](
n: int = ...,
/,
*coros: collections.abc.Awaitable[T],
ret_exc: Literal[False] = ...,
) list[T][source]
async asyncutils.misc.gather_with_limited_concurrency(
n: int = ...,
/,
*coros: collections.abc.Awaitable[T],
ret_exc: Literal[True],
) list[T | BaseException]
n, which defaults to context.GATHER_WITH_LIMITED_CONCURRENCY_DEFAULT_MAX_CONCURRENT, limits the number of awaitables running concurrently.
ret_exc is passed to asyncio.gather() as the return_exceptions argument.
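
A common way to get this behaviour is to guard each awaitable with an asyncio.Semaphore before handing it to asyncio.gather(). The sketch below shows that general pattern only; gather_limited is a hypothetical name, and whether asyncutils uses a semaphore internally is not stated here.

```python
import asyncio
from collections.abc import Awaitable
from typing import TypeVar

T = TypeVar("T")


async def gather_limited(n: int, *aws: Awaitable[T], ret_exc: bool = False) -> list:
    """Await all of aws with at most n of them in progress at once."""
    sem = asyncio.Semaphore(n)

    async def guarded(aw: Awaitable[T]) -> T:
        async with sem:  # wait here until one of the n slots is free
            return await aw

    return await asyncio.gather(*(guarded(aw) for aw in aws), return_exceptions=ret_exc)


async def demo() -> tuple[list[int], int]:
    running = 0
    peak = 0

    async def job(i: int) -> int:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)  # record the highest observed concurrency
        await asyncio.sleep(0.01)
        running -= 1
        return i * i

    results = await gather_limited(2, *(job(i) for i in range(5)))
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)
```

As with asyncio.gather(), results come back in the order the awaitables were passed in, regardless of the order in which they finish.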