asyncutils.func

Higher-order functions with asynchronous APIs, containing utilities to retry, time, throttle, run functions periodically and more.

Classes

RateLimited

The rate limiter pattern as a decorator (factory). See AdvancedRateLimit for the async context manager version.

Functions

acompose(→ collections.abc.Callable[Ellipsis, Any])

Compose multiple functions. If the result of a function is a coroutine, await it before passing it to the next. Begin from the rightmost function, which can take variadic parameters, and then pipe its return value through as a single argument.

areduce(…)

Async version of functools.reduce() that takes an async function and possibly async iterable. If the function is sync or the return value is not meant to be awaited, specify await_=False.

benchmark(→ asyncutils._internal.types.BenchmarkResult)

debounce(→ asyncutils._internal.types.DecoratorFactoryRV)

A decorator factory that debounces the wrapped function, such that it is only called after it has not been called for wait seconds.

every(…)

everymethod(…)

The method version of every(). stop_when_getter, if passed, should take self and returns a suitable future stop_when. Other parameters are as in every().

iterf(...)

A decorator factory that applies the decorated function n times to its argument.

measure(→ tuple[T, float])

A simple version of timer() for functions taking no arguments and returning awaitables.

measure2(→ float)

Only return the time and discard the return value of the function.

retry(→ asyncutils._internal.types.DecoratorFactoryRV)

star(...)

Convert a function taking variadic parameters and returning an awaitable into a coroutine function taking two arguments: an iterable of positional arguments and a mapping of keyword arguments.

throttle(→ asyncutils._internal.types.DecoratorFactoryRV)

A decorator factory that throttles the wrapped function, such that it can only be called once every lim seconds, as determined by timer.

timer(→ collections.abc.Callable[P, ...)

unstar(→ collections.abc.Callable[Ellipsis, ...)

The inverse of star().

Module Contents

class asyncutils.func.RateLimited[T, **P][source]

The rate limiter pattern as a decorator (factory). See AdvancedRateLimit for the async context manager version.

Limit the rate of calls of a function f, such that only calls calls within period seconds, as determined by timer, are allowed.

async __call__(*a: P.args, **k: P.kwargs) T[source]
asyncutils.func.acompose(*funcs: collections.abc.Callable[Ellipsis, Any], wrap_last: bool = ...) collections.abc.Callable[Ellipsis, Any][source]

Compose multiple functions. If the result of a function is a coroutine, await it before passing it to the next. Begin from the rightmost function, which can take variadic parameters, and then pipe its return value through as a single argument.

async asyncutils.func.areduce[T, R](
f: collections.abc.Callable[[T, R], collections.abc.Awaitable[T]],
it: asyncutils._internal.types.SupportsIteration[R],
initial: T = ...,
*,
await_: Literal[True] = ...,
) T[source]
async asyncutils.func.areduce(
f: collections.abc.Callable[[T, R], T],
it: asyncutils._internal.types.SupportsIteration[R],
initial: T = ...,
*,
await_: Literal[False],
) T

Async version of functools.reduce() that takes an async function and possibly async iterable. If the function is sync or the return value is not meant to be awaited, specify await_=False.

async asyncutils.func.benchmark(
f: collections.abc.Callable[[], collections.abc.Awaitable[Any]],
/,
times: int = ...,
warmup: int = ...,
*,
sequential: bool = ...,
) asyncutils._internal.types.BenchmarkResult[source]
f is the function to benchmark, which should take no arguments and return an awaitable.
times is how many times the function should be run, which defaults to context.BENCHMARK_DEFAULT_TIMES.
warmup is the number of warmup rounds to call the function for; not included in the benchmark results; default context.BENCHMARK_DEFAULT_WARMUP.
sequential (default context.BENCHMARK_DEFAULT_SEQUENTIAL) determines whether the function calls are made sequentially or gathered at once. For
example, for functions requiring a mutex to be acquired or with other rate-limiting policies in place, you should explicitly pass sequential=True.
This function should only be used in very simple cases, because we recognize the inherent difficulty and abundancy of noise in testing IO-bound code, and
this library alone is by no means designed to solve that issue.
asyncutils.func.debounce(wait: float) asyncutils._internal.types.DecoratorFactoryRV[source]

A decorator factory that debounces the wrapped function, such that it is only called after it has not been called for wait seconds.

asyncutils.func.every(
intvl: float,
/,
*,
count_f: bool = ...,
verbose: bool = ...,
stop_on_exc: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
wait_first: bool = ...,
max_iterations: int | None = ...,
timer: asyncutils._internal.types.Timer = ...,
supplied_args: collections.abc.Iterable[Any] = ...,
supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
) asyncutils._internal.types.EveryRV[Any][source]
asyncutils.func.every(
intvl: float,
/,
*,
stop_when: asyncio.Future[T],
count_f: bool = ...,
verbose: bool = ...,
stop_on_exc: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
wait_first: bool = ...,
max_iterations: int | None = ...,
timer: asyncutils._internal.types.Timer = ...,
supplied_args: collections.abc.Iterable[Any] = ...,
supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
) asyncutils._internal.types.EveryRV[T]
asyncutils.func.every(
intvl: float,
/,
*,
count_f: bool = ...,
verbose: bool = ...,
stop_on_exc: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
wait_first: bool = ...,
max_iterations: int | None = ...,
timer: asyncutils._internal.types.Timer = ...,
supplied_args: collections.abc.Iterable[Any] = ...,
supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
default: T,
) asyncutils._internal.types.EveryRV[T]
asyncutils.func.every(
intvl: float,
/,
*,
stop_when: asyncio.Future[T],
count_f: bool = ...,
verbose: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
stop_on_exc: bool = ...,
wait_first: bool = ...,
max_iterations: int | None = ...,
timer: asyncutils._internal.types.Timer = ...,
supplied_args: collections.abc.Iterable[Any] = ...,
supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
default: T,
) asyncutils._internal.types.EveryRV[T]
A decorator factory that repeats a function regularly, useful for periodic monitoring tasks.
The resultant function will run every intvl seconds, as determined by timer, at most max_iterations times. If count_f is True, this time includes the execution time of the function.
If wait_first is True, sleep for intvl seconds before the first execution.
If stop_on_exc is True, the function returns once the decorated function throws any exception or stop_when is cancelled.
verbose makes the function treat exceptions more severely output-wise.
Once the result of stop_when is set, the function returns that result, which should be None or the same type as the return type of the decorated function after awaiting.
However, the task is guaranteed to be run at least once.
When using the supplied_args and supplied_kwargs parameters, maintain a reference to them so that you can edit the parameters fed to the function on the fly.
Finally, the function returns default or None if it was not passed, unless stop_on_exc is True or default is RAISE, in which case MaxIterationsError is thrown.
asyncutils.func.everymethod(
intvl: float,
/,
*,
count_f: bool = ...,
verbose: bool = ...,
stop_on_exc: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
wait_first: bool = ...,
max_iterations: int | None = ...,
timer: asyncutils._internal.types.Timer = ...,
supplied_args: collections.abc.Iterable[Any] = ...,
supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
) asyncutils._internal.types.EveryMethodRV[Any, Any][source]
asyncutils.func.everymethod(
intvl: float,
/,
*,
stop_when_getter: collections.abc.Callable[[T], asyncio.Future[R]],
count_f: bool = ...,
verbose: bool = ...,
stop_on_exc: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
wait_first: bool = ...,
max_iterations: int | None = ...,
timer: asyncutils._internal.types.Timer = ...,
supplied_args: collections.abc.Iterable[Any] = ...,
supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
) asyncutils._internal.types.EveryMethodRV[R, T]
asyncutils.func.everymethod(
intvl: float,
/,
*,
count_f: bool = ...,
verbose: bool = ...,
stop_on_exc: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
wait_first: bool = ...,
max_iterations: int | None = ...,
timer: asyncutils._internal.types.Timer = ...,
supplied_args: collections.abc.Iterable[Any] = ...,
supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
default: T,
) asyncutils._internal.types.EveryMethodRV[T, Any]
asyncutils.func.everymethod(
intvl: float,
/,
*,
stop_when_getter: collections.abc.Callable[[T], asyncio.Future[R]],
count_f: bool = ...,
verbose: bool = ...,
stop_on_exc: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
wait_first: bool = ...,
max_iterations: int | None = ...,
timer: asyncutils._internal.types.Timer = ...,
supplied_args: collections.abc.Iterable[Any] = ...,
supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
default: R,
) asyncutils._internal.types.EveryMethodRV[R, T]

The method version of every(). stop_when_getter, if passed, should take self and returns a suitable future stop_when. Other parameters are as in every().

asyncutils.func.iterf[
T,
](
n: int,
/,
) collections.abc.Callable[[collections.abc.Callable[[T], collections.abc.Awaitable[T]]], collections.abc.Callable[[T], types.CoroutineType[Any, Any, T]]][source]

A decorator factory that applies the decorated function n times to its argument.

async asyncutils.func.measure[T](
f: collections.abc.Callable[[], collections.abc.Awaitable[T]],
/,
*,
timer: asyncutils._internal.types.Timer = ...,
) tuple[T, float][source]

A simple version of timer() for functions taking no arguments and returning awaitables.

async asyncutils.func.measure2[T](
f: collections.abc.Callable[[], collections.abc.Awaitable[T]],
/,
*,
timer: asyncutils._internal.types.Timer = ...,
) float[source]

Only return the time and discard the return value of the function.

asyncutils.func.retry(
tries: int = ...,
delay: float = ...,
*,
max_delay: float = ...,
backoff: float = ...,
jitter: float = ...,
exc: asyncutils._internal.types.Exceptable = ...,
on_retry: collections.abc.Callable[[int, BaseException], Any] = ...,
on_success: collections.abc.Callable[[int, float], Any] = ...,
random: collections.abc.Callable[[], float] = ...,
) asyncutils._internal.types.DecoratorFactoryRV[source]
A decorator factory that retries the wrapped function with exponential backoff, returning once the function succeeds.
If the function does not succeed within tries attempts (default context.RETRY_DEFAULT_TRIES), the last exception is propagated.
backoff (default context.RETRY_DEFAULT_BACKOFF) is the multiplier applied to the delay (initially delay which defaults to
context.RETRY_DEFAULT_DELAY) after each failed attempt, which can never exceed max_delay (default
context.RETRY_DEFAULT_MAX_DELAY).
jitter (default context.RETRY_DEFAULT_JITTER) is the maximum random jitter added to the delay.
exc specifies which exceptions to catch and retry on; if an exception not in exc is raised, it is propagated immediately.
on_retry and on_success are callbacks called before each retry and after a successful call, with the attempt number (zero-based)
as the first argument and the exception caught or the time taken for the successful call respectively as the second. Thus, on_success is
only called once.
asyncutils.func.star[
T,
](
f: collections.abc.Callable[Ellipsis, collections.abc.Awaitable[T]],
/,
) collections.abc.Callable[[collections.abc.Iterable[Any], collections.abc.Mapping[str, Any]], types.CoroutineType[Any, Any, T]][source]

Convert a function taking variadic parameters and returning an awaitable into a coroutine function taking two arguments: an iterable of positional arguments and a mapping of keyword arguments.

asyncutils.func.throttle(lim: float, timer: asyncutils._internal.types.Timer = ...) asyncutils._internal.types.DecoratorFactoryRV[source]

A decorator factory that throttles the wrapped function, such that it can only be called once every lim seconds, as determined by timer.

asyncutils.func.timer[
T,
**P,
](
f: collections.abc.Callable[P, collections.abc.Awaitable[T]],
/,
*,
precision: int = ...,
expected: asyncutils._internal.types.Exceptable = ...,
should_log: bool = ...,
timer: asyncutils._internal.types.Timer = ...,
ns: bool = ...,
) collections.abc.Callable[P, types.CoroutineType[Any, Any, tuple[T | asyncutils._internal.types.ExceptionWrapper, float]]][source]
Convert the function that returns an awaitable object into an async function that returns a tuple (res_or_exc, elapsed).
timer (default time.perf_counter()) is used to count elapsed, the time required to execute the function.
res_or_exc is the awaited result of the wrapped function, or the exception thrown as wrapped by wrap_exc().
precision (default context.TIMER_DEFAULT_PRECISION) is the number of digits after the decimal point to keep in the time in logging, and ns whether the return value of the timer indicates time in nanoseconds.
Any exception the wrapped function emits whose type is not in expected is propagated directly.
asyncutils.func.unstar[T](
f: collections.abc.Callable[[collections.abc.Iterable[Any], collections.abc.Mapping[str, Any]], collections.abc.Awaitable[T]],
/,
) collections.abc.Callable[Ellipsis, types.CoroutineType[Any, Any, T]][source]

The inverse of star().