asyncutils.func¶
Higher-order functions with asynchronous APIs, containing utilities to retry, time, throttle, run functions periodically and more.
Classes¶
The rate limiter pattern as a decorator (factory). See |
Functions¶
|
Compose multiple functions. If the result of a function is a coroutine, await it before passing it to the next. Begin from the rightmost function, which can take variadic parameters, and then pipe its return value through as a single argument. |
|
Async version of |
|
|
|
A decorator factory that debounces the wrapped function, such that it is only called after it has not been called for |
|
|
|
The method version of |
|
A decorator factory that applies the decorated function |
|
A simple version of |
|
Only return the time and discard the return value of the function. |
|
|
|
Convert a function taking variadic parameters and returning an awaitable into a coroutine function taking two arguments: an iterable of positional arguments and a mapping of keyword arguments. |
|
A decorator factory that throttles the wrapped function, such that it can only be called once every |
|
|
|
The inverse of |
Module Contents¶
- class asyncutils.func.RateLimited[T, **P][source]¶
The rate limiter pattern as a decorator (factory). See
AdvancedRateLimitfor the async context manager version.Limit the rate of calls of a function
f, such that onlycallscalls withinperiodseconds, as determined bytimer, are allowed.
- asyncutils.func.acompose(*funcs: collections.abc.Callable[Ellipsis, Any], wrap_last: bool = ...) collections.abc.Callable[Ellipsis, Any][source]¶
Compose multiple functions. If the result of a function is a coroutine, await it before passing it to the next. Begin from the rightmost function, which can take variadic parameters, and then pipe its return value through as a single argument.
- async asyncutils.func.areduce[T, R](
- f: collections.abc.Callable[[T, R], collections.abc.Awaitable[T]],
- it: asyncutils._internal.types.SupportsIteration[R],
- initial: T = ...,
- *,
- await_: Literal[True] = ...,
- async asyncutils.func.areduce(
- f: collections.abc.Callable[[T, R], T],
- it: asyncutils._internal.types.SupportsIteration[R],
- initial: T = ...,
- *,
- await_: Literal[False],
Async version of
functools.reduce()that takes an async function and possibly async iterable. If the function is sync or the return value is not meant to be awaited, specifyawait_=False.
- async asyncutils.func.benchmark(
- f: collections.abc.Callable[[], collections.abc.Awaitable[Any]],
- /,
- times: int = ...,
- warmup: int = ...,
- *,
- sequential: bool = ...,
fis the function to benchmark, which should take no arguments and return an awaitable.timesis how many times the function should be run, which defaults tocontext.BENCHMARK_DEFAULT_TIMES.warmupis the number of warmup rounds to call the function for; not included in the benchmark results; defaultcontext.BENCHMARK_DEFAULT_WARMUP.sequential(defaultcontext.BENCHMARK_DEFAULT_SEQUENTIAL) determines whether the function calls are made sequentially or gathered at once. Forexample, for functions requiring a mutex to be acquired or with other rate-limiting policies in place, you should explicitly passsequential=True.This function should only be used in very simple cases, because we recognize the inherent difficulty and abundancy of noise in testing IO-bound code, andthis library alone is by no means designed to solve that issue.
- asyncutils.func.debounce(wait: float) asyncutils._internal.types.DecoratorFactoryRV[source]¶
A decorator factory that debounces the wrapped function, such that it is only called after it has not been called for
waitseconds.
- asyncutils.func.every(
- intvl: float,
- /,
- *,
- count_f: bool = ...,
- verbose: bool = ...,
- stop_on_exc: bool = ...,
- loop: asyncio.AbstractEventLoop | None = ...,
- wait_first: bool = ...,
- max_iterations: int | None = ...,
- timer: asyncutils._internal.types.Timer = ...,
- supplied_args: collections.abc.Iterable[Any] = ...,
- supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
- asyncutils.func.every(
- intvl: float,
- /,
- *,
- stop_when: asyncio.Future[T],
- count_f: bool = ...,
- verbose: bool = ...,
- stop_on_exc: bool = ...,
- loop: asyncio.AbstractEventLoop | None = ...,
- wait_first: bool = ...,
- max_iterations: int | None = ...,
- timer: asyncutils._internal.types.Timer = ...,
- supplied_args: collections.abc.Iterable[Any] = ...,
- supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
- asyncutils.func.every(
- intvl: float,
- /,
- *,
- count_f: bool = ...,
- verbose: bool = ...,
- stop_on_exc: bool = ...,
- loop: asyncio.AbstractEventLoop | None = ...,
- wait_first: bool = ...,
- max_iterations: int | None = ...,
- timer: asyncutils._internal.types.Timer = ...,
- supplied_args: collections.abc.Iterable[Any] = ...,
- supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
- default: T,
- asyncutils.func.every(
- intvl: float,
- /,
- *,
- stop_when: asyncio.Future[T],
- count_f: bool = ...,
- verbose: bool = ...,
- loop: asyncio.AbstractEventLoop | None = ...,
- stop_on_exc: bool = ...,
- wait_first: bool = ...,
- max_iterations: int | None = ...,
- timer: asyncutils._internal.types.Timer = ...,
- supplied_args: collections.abc.Iterable[Any] = ...,
- supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
- default: T,
- A decorator factory that repeats a function regularly, useful for periodic monitoring tasks.The resultant function will run every
intvlseconds, as determined bytimer, at mostmax_iterationstimes. Ifcount_fis True, this time includes the execution time of the function.Ifwait_firstisTrue, sleep forintvlseconds before the first execution.Ifstop_on_excisTrue, the function returns once the decorated function throws any exception orstop_whenis cancelled.verbosemakes the function treat exceptions more severely output-wise.Once the result ofstop_whenis set, the function returns that result, which should be None or the same type as the return type of the decorated function after awaiting.However, the task is guaranteed to be run at least once.When using thesupplied_argsandsupplied_kwargsparameters, maintain a reference to them so that you can edit the parameters fed to the function on the fly.Finally, the function returnsdefaultor None if it was not passed, unlessstop_on_excis True ordefaultisRAISE, in which caseMaxIterationsErroris thrown.
- asyncutils.func.everymethod(
- intvl: float,
- /,
- *,
- count_f: bool = ...,
- verbose: bool = ...,
- stop_on_exc: bool = ...,
- loop: asyncio.AbstractEventLoop | None = ...,
- wait_first: bool = ...,
- max_iterations: int | None = ...,
- timer: asyncutils._internal.types.Timer = ...,
- supplied_args: collections.abc.Iterable[Any] = ...,
- supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
- asyncutils.func.everymethod(
- intvl: float,
- /,
- *,
- stop_when_getter: collections.abc.Callable[[T], asyncio.Future[R]],
- count_f: bool = ...,
- verbose: bool = ...,
- stop_on_exc: bool = ...,
- loop: asyncio.AbstractEventLoop | None = ...,
- wait_first: bool = ...,
- max_iterations: int | None = ...,
- timer: asyncutils._internal.types.Timer = ...,
- supplied_args: collections.abc.Iterable[Any] = ...,
- supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
- asyncutils.func.everymethod(
- intvl: float,
- /,
- *,
- count_f: bool = ...,
- verbose: bool = ...,
- stop_on_exc: bool = ...,
- loop: asyncio.AbstractEventLoop | None = ...,
- wait_first: bool = ...,
- max_iterations: int | None = ...,
- timer: asyncutils._internal.types.Timer = ...,
- supplied_args: collections.abc.Iterable[Any] = ...,
- supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
- default: T,
- asyncutils.func.everymethod(
- intvl: float,
- /,
- *,
- stop_when_getter: collections.abc.Callable[[T], asyncio.Future[R]],
- count_f: bool = ...,
- verbose: bool = ...,
- stop_on_exc: bool = ...,
- loop: asyncio.AbstractEventLoop | None = ...,
- wait_first: bool = ...,
- max_iterations: int | None = ...,
- timer: asyncutils._internal.types.Timer = ...,
- supplied_args: collections.abc.Iterable[Any] = ...,
- supplied_kwargs: collections.abc.Mapping[str, Any] = ...,
- default: R,
The method version of
every().stop_when_getter, if passed, should takeselfand returns a suitable futurestop_when. Other parameters are as inevery().
- asyncutils.func.iterf[
- T,
- n: int,
- /,
A decorator factory that applies the decorated function
ntimes to its argument.
- async asyncutils.func.measure[T](
- f: collections.abc.Callable[[], collections.abc.Awaitable[T]],
- /,
- *,
- timer: asyncutils._internal.types.Timer = ...,
A simple version of
timer()for functions taking no arguments and returning awaitables.
- async asyncutils.func.measure2[T](
- f: collections.abc.Callable[[], collections.abc.Awaitable[T]],
- /,
- *,
- timer: asyncutils._internal.types.Timer = ...,
Only return the time and discard the return value of the function.
- asyncutils.func.retry(
- tries: int = ...,
- delay: float = ...,
- *,
- max_delay: float = ...,
- backoff: float = ...,
- jitter: float = ...,
- exc: asyncutils._internal.types.Exceptable = ...,
- on_retry: collections.abc.Callable[[int, BaseException], Any] = ...,
- on_success: collections.abc.Callable[[int, float], Any] = ...,
- random: collections.abc.Callable[[], float] = ...,
- A decorator factory that retries the wrapped function with exponential backoff, returning once the function succeeds.If the function does not succeed within
triesattempts (defaultcontext.RETRY_DEFAULT_TRIES), the last exception is propagated.backoff(defaultcontext.RETRY_DEFAULT_BACKOFF) is the multiplier applied to the delay (initiallydelaywhich defaults tocontext.RETRY_DEFAULT_DELAY) after each failed attempt, which can never exceedmax_delay(defaultcontext.RETRY_DEFAULT_MAX_DELAY).jitter(defaultcontext.RETRY_DEFAULT_JITTER) is the maximum random jitter added to the delay.excspecifies which exceptions to catch and retry on; if an exception not inexcis raised, it is propagated immediately.on_retryandon_successare callbacks called before each retry and after a successful call, with the attempt number (zero-based)as the first argument and the exception caught or the time taken for the successful call respectively as the second. Thus,on_successisonly called once.
- asyncutils.func.star[
- T,
- f: collections.abc.Callable[Ellipsis, collections.abc.Awaitable[T]],
- /,
Convert a function taking variadic parameters and returning an awaitable into a coroutine function taking two arguments: an iterable of positional arguments and a mapping of keyword arguments.
- asyncutils.func.throttle(lim: float, timer: asyncutils._internal.types.Timer = ...) asyncutils._internal.types.DecoratorFactoryRV[source]¶
A decorator factory that throttles the wrapped function, such that it can only be called once every
limseconds, as determined bytimer.
- asyncutils.func.timer[
- T,
- **P,
- f: collections.abc.Callable[P, collections.abc.Awaitable[T]],
- /,
- *,
- precision: int = ...,
- expected: asyncutils._internal.types.Exceptable = ...,
- should_log: bool = ...,
- timer: asyncutils._internal.types.Timer = ...,
- ns: bool = ...,
- Convert the function that returns an awaitable object into an async function that returns a tuple
(res_or_exc, elapsed).timer(defaulttime.perf_counter()) is used to countelapsed, the time required to execute the function.res_or_excis the awaited result of the wrapped function, or the exception thrown as wrapped bywrap_exc().precision(defaultcontext.TIMER_DEFAULT_PRECISION) is the number of digits after the decimal point to keep in the time in logging, andnswhether the return value of the timer indicates time in nanoseconds.Any exception the wrapped function emits whose type is not inexpectedis propagated directly.
- asyncutils.func.unstar[T](
- f: collections.abc.Callable[[collections.abc.Iterable[Any], collections.abc.Mapping[str, Any]], collections.abc.Awaitable[T]],
- /,
The inverse of
star().