asyncutils.util

Utility functions one tier below the base submodule: not worth preloading, but still quite useful.

Attributes

anullcontext

An instance of an async context manager that does nothing.

ignore_cancellation

Context manager to ignore CancelledError.

Functions

aawcmf2dcmf(→ collections.abc.Callable[P, ...)

Equivalent to aawcmf2dcmff()(f).

aawcmf2dcmff(...)

A decorator factory that converts a function returning an awaitable, which resolves to an (async) context manager, into a function returning a non-reusable dual context manager, using dualcontextmanager().

afalsify(→ Literal[False])

An async function that does nothing and returns False.

afcopy(→ collections.abc.Callable[P, ...)

Return a copy of the async function f with the same signature and attributes.

aiter_from_f(→ collections.abc.AsyncGenerator[T])

Emulate, for async code, the second (callable and sentinel) form of the builtin iter() function, which the builtin aiter() function lacks.

anullify(→ None)

An async function that does nothing and returns None.

atruthify(→ Literal[True])

An async function that does nothing and returns True.

avalify(→ collections.abc.Callable[Ellipsis, ...)

Return an async function that takes any arguments, always returning the value v.

dcm(→ collections.abc.Callable[P, ...)

Equivalent to dualcontextmanager() with the default arguments at the time of definition, rather than when the function is decorated.

done_evt(…)

Return a new async event that is already set, with type evtcls if passed and asyncio.Event by default.

done_fut(…)

Return a future that is already done. It holds the result res or, if the argument is an exception wrapper exc returned by exceptions.wrap_exc(), the exception it wraps. The future has type futcls if passed, and asyncio.Future by default.

dualcontextmanager(…)

Convert a callable that returns an (async) iterable, usually an (async) generator function, over exactly one item, into a function returning a non-reusable sync- and async-compatible context manager. Essentially combines contextlib.contextmanager() and contextlib.asynccontextmanager() into one decorator.

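get_future(→ asyncio.Future[T])

Wrap an arbitrary awaitable aw in a task under loop, creating a loop and setting it as current if required, and begin waiting on it.

locked_lock(…)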

Return an already acquired lock of type lcls if passed and asyncio.Lock by default.

lockf(→ collections.abc.Callable[P, ...)

Apply a lock that implements the async lock interface, as constructed and returned by lf, to a function f that returns an awaitable, also converting it to an async function.

new_eager_tasks(...)

Yield, in order, eagerly started tasks wrapping the coroutines under the running loop (or under a new loop, set as the current one, if required).

safe_cancel(→ None)

Cancel a single future and wait for the cancellation to complete asynchronously.

semaphore(…)

Simple helper function returning a (bounded) semaphore of value workers, defaulting to context.SEMAPHORE_DEFAULT_VALUE.

sync_await(→ T)

Synchronously await the awaitable object aw under the given event loop loop with timeout timeout. If never_block=False is passed and the loop is not running, its run_until_complete() method may be called; otherwise, a pair of futures is created to coordinate the execution of the awaitable. Prefer asyncio.run() to synchronously run a single top-level async function that awaits the necessary awaitables. Calling this function while the event loop is running in the current thread raises RuntimeError.

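to_async(→ collections.abc.Callable[P, ...)

Return the async version of the original function with all the attributes from its instance dictionary, which runs in an executor lazily initialized and shared by all to_async()-transformed callables.

to_sync(→ collections.abc.Callable[P, T])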

Convert a function that returns an awaitable to a sync function with the same signature, using the event loop loop when required or creating one when necessary.

to_sync_from_loop(...)

A version of to_sync() that is a decorator factory, returning its partial under loop=loop.

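transient_block(…)

Run a sync function f, with the provided parameters passed straight through, in the event loop loop, and return an async future resolving to its result or exception.

transient_block_from_loop(...)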

Return the partial of transient_block() under the specified loop.

wrap_in_coro(→ T)

Return a coroutine resolving to the result of the awaitable aw, such that it can be passed to asyncio.create_task().

Module Contents

asyncutils.util.aawcmf2dcmf[T, **P](
f: collections.abc.Callable[P, collections.abc.Awaitable[contextlib.AbstractContextManager[T] | contextlib.AbstractAsyncContextManager[T]]],
/,
) collections.abc.Callable[P, asyncutils._internal.types.DualContextManager[T]]

Equivalent to aawcmf2dcmff()(f).

asyncutils.util.aawcmf2dcmff[
T,
**P,
](
*,
use_existing_executor: bool = ...,
create_executor: bool = ...,
strict: bool = ...,
) collections.abc.Callable[[collections.abc.Callable[P, collections.abc.Awaitable[contextlib.AbstractContextManager[T] | contextlib.AbstractAsyncContextManager[T]]]], collections.abc.Callable[P, asyncutils._internal.types.DualContextManager[T]]]

A decorator factory that converts a function returning an awaitable, which resolves to an (async) context manager, into a function returning a non-reusable dual context manager, using dualcontextmanager().
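
As a rough illustration of the async half of this conversion, the sketch below flattens a function whose awaited result is an async context manager, using only contextlib.asynccontextmanager(). The names flatten_acm, _resource and open_resource are hypothetical and not part of asyncutils. The real decorator also produces a manager usable with a plain with statement, which this sketch does not attempt.

```python
import asyncio
import contextlib


def flatten_acm(f):
    """Hypothetical helper: make `await f(...)` -> async CM usable as one async CM."""
    @contextlib.asynccontextmanager
    async def wrapper(*args, **kwargs):
        cm = await f(*args, **kwargs)   # resolve the awaitable first
        async with cm as value:         # then enter the manager it produced
            yield value
    return wrapper


@contextlib.asynccontextmanager
async def _resource(name, log):
    log.append(f"open {name}")
    try:
        yield name.upper()
    finally:
        log.append(f"close {name}")


async def open_resource(name, log):
    # An async function whose result is itself an async context manager.
    await asyncio.sleep(0)
    return _resource(name, log)


async def demo():
    log = []
    async with flatten_acm(open_resource)("db", log) as value:
        log.append(f"use {value}")
    return log
```

Without the wrapper, the caller would have to write async with await open_resource(...), which is the two-step pattern the decorator is meant to hide.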

async asyncutils.util.afalsify(*a: Any, **k: Any) Literal[False]

An async function that does nothing and returns False.

asyncutils.util.afcopy[T, **P](
f: collections.abc.Callable[P, collections.abc.Awaitable[T]],
/,
) collections.abc.Callable[P, types.CoroutineType[Any, Any, T]]

Return a copy of the async function f with the same signature and attributes.

asyncutils.util.aiter_from_f[T](
f: collections.abc.Callable[[], collections.abc.Awaitable[T]],
sentinel: T = ...,
/,
) collections.abc.AsyncGenerator[T]

Emulate, for async code, the second (callable and sentinel) form of the builtin iter() function, which the builtin aiter() function lacks.
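
The idea can be sketched with a plain async generator. This is a minimal stand-in, assuming the sentinel is always passed and compared with ==, as the builtin iter(callable, sentinel) does; aiter_from_f_sketch is a hypothetical name, not the library function.

```python
import asyncio


async def aiter_from_f_sketch(f, sentinel):
    """Yield `await f()` repeatedly until the result equals `sentinel`."""
    while True:
        item = await f()
        if item == sentinel:   # same equality test as iter(callable, sentinel)
            return
        yield item


async def demo():
    queue = asyncio.Queue()
    for item in ("a", "b", None, "never reached"):
        queue.put_nowait(item)
    # queue.get is a zero-argument callable returning an awaitable.
    return [item async for item in aiter_from_f_sketch(queue.get, None)]
```

Draining a queue until a None marker arrives is a typical use: the loop stops at the sentinel and leaves later items untouched.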

async asyncutils.util.anullify(*a: Any, **k: Any) None

An async function that does nothing and returns None.

async asyncutils.util.atruthify(*a: Any, **k: Any) Literal[True]

An async function that does nothing and returns True.

asyncutils.util.avalify[T](v: T, /) collections.abc.Callable[Ellipsis, types.CoroutineType[Any, Any, T]]

Return an async function that takes any arguments, always returning the value v.
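
A constant async function of this kind is essentially a closure over v. The sketch below shows the shape under that assumption; avalify_sketch is a hypothetical name used only for illustration.

```python
import asyncio


def avalify_sketch(v):
    async def constant(*args, **kwargs):
        # All arguments are accepted and ignored.
        return v
    return constant


async def demo():
    fallback = avalify_sketch(42)
    return await fallback(), await fallback("ignored", key="ignored")
```

Such a function is handy as a default callback where an async callable is required but a fixed answer is enough.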

asyncutils.util.dcm[T, **P](
f: collections.abc.Callable[P, asyncutils._internal.types.SupportsIteration[T]],
/,
) collections.abc.Callable[P, asyncutils._internal.types.DualContextManager[T]]

Equivalent to dualcontextmanager() with the default arguments at the time of definition, rather than when the function is decorated.

asyncutils.util.done_evt[E: asyncutils._internal.types.EventProt](*, evtcls: type[E]) E
asyncutils.util.done_evt() asyncio.Event

Return a new async event that is already set, with type evtcls if passed and asyncio.Event by default.

asyncutils.util.done_fut(
exc: asyncutils._internal.types.ExceptionWrapper,
/,
*,
futcls: type[asyncutils._internal.types.FutProt[Any]],
) asyncutils._internal.types.IncompleteFut[Never]
asyncutils.util.done_fut(
res: None = ...,
*,
futcls: type[asyncutils._internal.types.FutProt[Any]],
) asyncutils._internal.types.IncompleteFut[None]
asyncutils.util.done_fut(res: T, *, futcls: type[asyncutils._internal.types.FutProt[Any]]) asyncutils._internal.types.IncompleteFut[T]
asyncutils.util.done_fut(exc: asyncutils._internal.types.ExceptionWrapper, /) asyncio.Future[Never]
asyncutils.util.done_fut(res: None = ...) asyncio.Future[None]
asyncutils.util.done_fut(res: T) asyncio.Future[T]

Return a future that is already done. It holds the result res or, if the argument is an exception wrapper exc returned by exceptions.wrap_exc(), the exception it wraps. The future has type futcls if passed, and asyncio.Future by default.
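
For comparison, the same two states can be produced by hand with plain asyncio futures. The sketch below uses only the standard library and does not model exceptions.wrap_exc() or the futcls parameter; it only shows what "already done" means for a caller.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()

    ok = loop.create_future()
    ok.set_result("cached")                    # done immediately, holds a result

    failed = loop.create_future()
    failed.set_exception(ValueError("boom"))   # done immediately, holds an exception

    value = await ok                           # a done future is not waited on
    try:
        await failed
    except ValueError as exc:
        error = str(exc)
    return ok.done(), value, failed.done(), error
```

Pre-completed futures are useful for cache hits and test doubles, where an API expects a future but the answer is already known.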

asyncutils.util.dualcontextmanager[
T,
**P,
](
*,
use_existing_executor: bool,
strict: Literal[True],
) collections.abc.Callable[[collections.abc.Callable[P, collections.abc.Iterable[T]]], collections.abc.Callable[P, contextlib.AbstractContextManager[T, bool]]]
asyncutils.util.dualcontextmanager(
*,
create_executor: bool,
strict: Literal[True],
) collections.abc.Callable[[collections.abc.Callable[P, collections.abc.Iterable[T]]], collections.abc.Callable[P, contextlib.AbstractContextManager[T, bool]]]
asyncutils.util.dualcontextmanager(
*,
use_existing_executor: bool,
create_executor: bool,
strict: Literal[True],
) collections.abc.Callable[[collections.abc.Callable[P, collections.abc.Iterable[T]]], collections.abc.Callable[P, contextlib.AbstractContextManager[T, bool]]]
asyncutils.util.dualcontextmanager(
*,
use_existing_executor: bool,
strict: Literal[False],
) collections.abc.Callable[[collections.abc.Callable[P, collections.abc.Iterable[T]]], collections.abc.Callable[P, asyncutils._internal.types.DualContextManager[T]]]
asyncutils.util.dualcontextmanager(
*,
create_executor: bool,
strict: Literal[False],
) collections.abc.Callable[[collections.abc.Callable[P, collections.abc.Iterable[T]]], collections.abc.Callable[P, asyncutils._internal.types.DualContextManager[T]]]
asyncutils.util.dualcontextmanager(
*,
use_existing_executor: bool,
create_executor: bool,
strict: Literal[False],
) collections.abc.Callable[[collections.abc.Callable[P, collections.abc.Iterable[T]]], collections.abc.Callable[P, asyncutils._internal.types.DualContextManager[T]]]
asyncutils.util.dualcontextmanager(*, strict: Literal[True]) asyncutils._internal.types.DCRV
asyncutils.util.dualcontextmanager(
*,
strict: Literal[False],
) collections.abc.Callable[[collections.abc.Callable[P, asyncutils._internal.types.SupportsIteration[T]]], collections.abc.Callable[P, asyncutils._internal.types.DualContextManager[T]]]
asyncutils.util.dualcontextmanager(
*,
strict: bool = ...,
) collections.abc.Callable[[collections.abc.Callable[P, asyncutils._internal.types.SupportsIteration[T]]], collections.abc.Callable[P, asyncutils._internal.types.DualContextManager[T]]]
asyncutils.util.dualcontextmanager(
gfunc: collections.abc.Callable[P, collections.abc.Iterable[T]],
/,
*,
use_existing_executor: bool = ...,
create_executor: bool = ...,
strict: Literal[True],
) collections.abc.Callable[P, contextlib.AbstractContextManager[T, bool]]
asyncutils.util.dualcontextmanager(
gfunc: collections.abc.Callable[P, collections.abc.Iterable[T]],
/,
*,
use_existing_executor: bool = ...,
create_executor: bool = ...,
strict: Literal[False],
) collections.abc.Callable[P, asyncutils._internal.types.DualContextManager[T]]
asyncutils.util.dualcontextmanager(
gfunc: collections.abc.Callable[P, collections.abc.Iterable[T]],
/,
*,
use_existing_executor: bool = ...,
create_executor: bool = ...,
strict: bool = ...,
) collections.abc.Callable[P, asyncutils._internal.types.DualContextManager[T]]
asyncutils.util.dualcontextmanager(
agfunc: collections.abc.Callable[P, collections.abc.AsyncIterable[T]],
/,
*,
strict: Literal[True],
) collections.abc.Callable[P, contextlib.AbstractAsyncContextManager[T, bool]]
asyncutils.util.dualcontextmanager(
agfunc: collections.abc.Callable[P, collections.abc.AsyncIterable[T]],
/,
*,
strict: Literal[False],
) collections.abc.Callable[P, asyncutils._internal.types.DualContextManager[T]]
asyncutils.util.dualcontextmanager(
agfunc: collections.abc.Callable[P, collections.abc.AsyncIterable[T]],
/,
*,
strict: bool = ...,
) collections.abc.Callable[P, asyncutils._internal.types.DualContextManager[T]]

Convert a callable that returns an (async) iterable, usually an (async) generator function, over exactly one item, into a function returning a non-reusable sync- and async-compatible context manager. Essentially combines contextlib.contextmanager() and contextlib.asynccontextmanager() into one decorator.
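
To make the "dual" idea concrete, here is a minimal sketch for the sync-generator case only. It assumes the generator body is cheap enough to run inline on the event loop, so it ignores the executor-related options; DualCM, dual_sketch and tagged are hypothetical names, not the library's implementation.

```python
import asyncio
import contextlib
import functools


class DualCM:
    """Hypothetical wrapper exposing one sync context manager via both protocols."""

    def __init__(self, cm):
        self._cm = cm

    def __enter__(self):
        return self._cm.__enter__()

    def __exit__(self, *exc_info):
        return self._cm.__exit__(*exc_info)

    async def __aenter__(self):
        return self.__enter__()          # runs inline; no executor in this sketch

    async def __aexit__(self, *exc_info):
        return self.__exit__(*exc_info)


def dual_sketch(gfunc):
    make_cm = contextlib.contextmanager(gfunc)

    @functools.wraps(gfunc)
    def factory(*args, **kwargs):
        # Each call builds a fresh, single-use manager.
        return DualCM(make_cm(*args, **kwargs))
    return factory


@dual_sketch
def tagged(log, tag):
    log.append(f"enter {tag}")
    try:
        yield tag
    finally:
        log.append(f"exit {tag}")


def demo_sync():
    log = []
    with tagged(log, "sync") as tag:
        log.append(tag)
    return log


async def demo_async():
    log = []
    async with tagged(log, "async") as tag:
        log.append(tag)
    return log
```

The same decorated function then works under both with and async with, which is the property the description refers to as sync- and async-compatible.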

asyncutils.util.get_future[T](aw: collections.abc.Awaitable[T], loop: asyncio.AbstractEventLoop | None = ...) asyncio.Future[T]

Wrap an arbitrary awaitable aw in a task under loop, creating a loop and setting it as current if required, and begin waiting on it.
Critical exceptions are wrapped in Critical.
Unlike create_task(), which only accepts coroutines, this function accepts any awaitable.

asyncutils.util.locked_lock[L: asyncutils._internal.types.AsyncLockLike[Any]](*, lcls: type[L]) L
asyncutils.util.locked_lock() asyncio.Lock

Return an already acquired lock of type lcls if passed and asyncio.Lock by default.

asyncutils.util.lockf[T, **P](
f: collections.abc.Callable[P, collections.abc.Awaitable[T]],
/,
lf: type[asyncutils._internal.types.AsyncLockLike[Any]] = ...,
) collections.abc.Callable[P, types.CoroutineType[Any, Any, T]]

Apply a lock that implements the async lock interface, as constructed and returned by lf, to a function f that returns an awaitable, also converting it to an async function.
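
The behavior described here amounts to serializing calls through one lock. The sketch below assumes a single lock is built once per decorated function by calling lf with no arguments; lockf_sketch is a hypothetical name and the real function may differ in detail.

```python
import asyncio
import functools


def lockf_sketch(f, lf=asyncio.Lock):
    lock = lf()   # one lock per decorated function (an assumption of this sketch)

    @functools.wraps(f)
    async def wrapper(*args, **kwargs):
        async with lock:                  # only one call runs the body at a time
            return await f(*args, **kwargs)
    return wrapper


async def demo():
    active = 0
    peak = 0

    async def critical(i):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)          # records the highest concurrency seen
        await asyncio.sleep(0.001)
        active -= 1
        return i

    guarded = lockf_sketch(critical)
    results = await asyncio.gather(*(guarded(i) for i in range(5)))
    return results, peak
```

Even though five calls are started concurrently, the recorded peak concurrency stays at one because each call holds the lock for its whole duration.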

asyncutils.util.new_eager_tasks[T](*aws: collections.abc.Awaitable[T]) collections.abc.Generator[asyncio.Task[T]]

Yield, in order, eagerly started tasks wrapping the coroutines under the running loop (or under a new loop, set as the current one, if required).

async asyncutils.util.safe_cancel(fut: asyncio.Future[Any], /) None

Cancel a single future and wait for the cancellation to complete asynchronously.
It is described as safe because the cancellation itself can be reliably cancelled.

See also

safe_cancel_batch()

a much more efficient way to cancel multiple futures at once without compromising cancellability.
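
A cancel-then-wait pattern for a single future can be sketched with asyncio.wait(), which neither re-raises the future's exception nor cancels the future when the waiter itself is cancelled. This is only an approximation under those assumptions; safe_cancel_sketch is a hypothetical name.

```python
import asyncio


async def safe_cancel_sketch(fut):
    fut.cancel()
    # Wait until the future is really done. Cancelling this waiter leaves
    # `fut` untouched, and its CancelledError is not propagated here.
    await asyncio.wait([fut])


async def demo():
    events = []

    async def worker():
        try:
            await asyncio.sleep(3600)
        finally:
            await asyncio.sleep(0.001)   # simulated async cleanup
            events.append("cleaned up")

    task = asyncio.create_task(worker())
    await asyncio.sleep(0)               # let the worker start
    await safe_cancel_sketch(task)
    return task.cancelled(), events
```

The point of waiting is visible in the result: by the time the call returns, the worker's cleanup has already finished.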

asyncutils.util.semaphore(bounded: Literal[False] = ..., workers: int = ...) asyncio.Semaphore
asyncutils.util.semaphore(bounded: Literal[True], workers: Literal[1]) asyncio.Lock
asyncutils.util.semaphore(bounded: Literal[True], workers: int = ...) asyncio.BoundedSemaphore

Simple helper function returning a (bounded) semaphore of value workers, defaulting to context.SEMAPHORE_DEFAULT_VALUE.
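
The three overloads suggest a simple selection rule, sketched below with standard asyncio primitives. DEFAULT_WORKERS is a hypothetical stand-in for context.SEMAPHORE_DEFAULT_VALUE, whose actual value is not given here, and semaphore_sketch is not the library function.

```python
import asyncio

DEFAULT_WORKERS = 4   # hypothetical stand-in for context.SEMAPHORE_DEFAULT_VALUE


def semaphore_sketch(bounded=False, workers=DEFAULT_WORKERS):
    if not bounded:
        return asyncio.Semaphore(workers)
    if workers == 1:
        return asyncio.Lock()            # a bounded semaphore of 1 acts as a lock
    return asyncio.BoundedSemaphore(workers)


async def demo():
    return (
        type(semaphore_sketch()).__name__,
        type(semaphore_sketch(True, 1)).__name__,
        type(semaphore_sketch(True, 3)).__name__,
    )
```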

asyncutils.util.sync_await[T](
aw: collections.abc.Awaitable[T],
loop: asyncio.AbstractEventLoop | None = ...,
*,
never_block: bool = ...,
timeout: float | None = ...,
) T

Synchronously await the awaitable object aw under the given event loop loop with timeout timeout. If never_block=False is passed and the loop is not running, its run_until_complete() method may be called; otherwise, a pair of futures is created to coordinate the execution of the awaitable. Prefer asyncio.run() to synchronously run a single top-level async function that awaits the necessary awaitables. Calling this function while the event loop is running in the current thread raises RuntimeError.
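
The run_until_complete() path and the preferred asyncio.run() alternative can be contrasted in a short sketch. It covers only the case where the loop is not running and does not model never_block or the pair-of-futures path; sync_await_sketch, compute and main are hypothetical names.

```python
import asyncio


def sync_await_sketch(aw, loop=None, timeout=None):
    owns_loop = loop is None
    if owns_loop:
        loop = asyncio.new_event_loop()
    try:
        # run_until_complete() raises RuntimeError if `loop` is already running.
        return loop.run_until_complete(asyncio.wait_for(aw, timeout))
    finally:
        if owns_loop:
            loop.close()


async def compute(x):
    await asyncio.sleep(0)
    return x * 2


async def main():
    # Preferred style: one top-level coroutine awaits everything it needs.
    return [await compute(1), await compute(2)]
```

Calling sync_await_sketch(compute(21)) blocks until the result is available, while asyncio.run(main()) achieves the same for a whole program with a single entry point.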

asyncutils.util.to_async[T, **P](f: collections.abc.Callable[P, T], /) collections.abc.Callable[P, types.CoroutineType[Any, Any, T]]

Return the async version of the original function with all the attributes from its instance dictionary, which runs in an executor lazily initialized and shared by all to_async()-transformed callables.
If the argument was returned by to_sync(), a copy of the original async function is returned.

Warning

This function may create reference cycles. If memory is a concern, call gc.collect() regularly.

See also

pools.AdvancedPool

an async-first thread pool executor-like class.
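
The lazily initialized, shared executor described for to_async() can be approximated with the standard library as follows. This sketch assumes a thread pool and makes no attempt at thread-safe initialization or at the to_sync() round trip; every name in it is hypothetical.

```python
import asyncio
import functools
import threading
from concurrent.futures import ThreadPoolExecutor

_shared_executor = None


def _get_executor():
    global _shared_executor
    if _shared_executor is None:        # created on first use only
        _shared_executor = ThreadPoolExecutor(max_workers=4)
    return _shared_executor


def to_async_sketch(f):
    @functools.wraps(f)                 # also copies f.__dict__ onto the wrapper
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(f, *args, **kwargs)
        return await loop.run_in_executor(_get_executor(), call)
    return wrapper


def blocking_add(a, b):
    # Also report whether we are on the main thread.
    return a + b, threading.current_thread() is threading.main_thread()


blocking_add.tag = "cpu"


async def demo():
    async_add = to_async_sketch(blocking_add)
    total, ran_in_main_thread = await async_add(2, b=3)
    return total, ran_in_main_thread, async_add.tag
```

The wrapped call runs on a worker thread, and the custom tag attribute survives the conversion because functools.wraps() updates the wrapper's instance dictionary.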

asyncutils.util.to_sync[T, **P](
f: collections.abc.Callable[P, collections.abc.Awaitable[T]],
/,
loop: asyncio.AbstractEventLoop | None = ...,
*,
timeout: float | None = ...,
) collections.abc.Callable[P, T]

Convert a function that returns an awaitable to a sync function with the same signature, using the event loop loop when required or creating one when necessary.

asyncutils.util.to_sync_from_loop(loop: asyncio.AbstractEventLoop) asyncutils._internal.types.ToSyncFromLoopRV

A version of to_sync() that is a decorator factory, returning its partial under loop=loop.

asyncutils.util.transient_block[T, **P](loop: asyncio.AbstractEventLoop, f: collections.abc.Callable[P, T], /, *a: P.args, **k: P.kwargs) asyncio.Future[T]
asyncutils.util.transient_block(
loop: asyncio.AbstractEventLoop,
f: collections.abc.Callable[Ellipsis, T],
/,
*a: object,
_threadsafe_: Literal[True],
**k: object,
) asyncio.Future[T]

Run a sync function f, with the provided parameters passed straight through, in the event loop loop, and return an async future resolving to its result or exception.
This function avoids the overhead of calling run_in_executor() by instead scheduling the function to run at the next iteration of the loop. Because f then runs in the loop itself, it should return quickly.
If _threadsafe_ is True, the function is scheduled in a thread-safe way, so that this can be called from threads not owning the loop.

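
Scheduling a short sync call on the next loop iteration can be sketched with call_soon() and a future. This is an assumption about the general technique, not the library's code: transient_block_sketch is a hypothetical name, it uses a plain threadsafe keyword instead of _threadsafe_, and it only approximates the thread-safe path.

```python
import asyncio


def transient_block_sketch(loop, f, *args, threadsafe=False, **kwargs):
    fut = loop.create_future()

    def run():
        if fut.cancelled():              # nothing to do if the caller gave up
            return
        try:
            fut.set_result(f(*args, **kwargs))
        except Exception as exc:
            fut.set_exception(exc)

    schedule = loop.call_soon_threadsafe if threadsafe else loop.call_soon
    schedule(run)                        # runs on the loop's next iteration
    return fut


async def demo():
    loop = asyncio.get_running_loop()
    fut = transient_block_sketch(loop, divmod, 17, 5)
    pending = not fut.done()             # f has not run yet at this point
    return pending, await fut
```

Because the callback runs on the loop thread, a slow f would stall every other task, which is why only fast functions suit this approach.
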
asyncutils.util.transient_block_from_loop(
loop: asyncio.AbstractEventLoop,
*,
threadsafe: bool = ...,
) asyncutils._internal.types.TransientBlockFromLoopRV

Return the partial of transient_block() under the specified loop.

async asyncutils.util.wrap_in_coro[T](aw: collections.abc.Awaitable[T], /) T

Return a coroutine resolving to the result of the awaitable aw, such that it can be passed to asyncio.create_task().
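
The need for such a wrapper shows up with awaitables that are not coroutines, which asyncio.create_task() rejects with TypeError. The sketch below uses a hypothetical Delayed class and wrap_in_coro_sketch function to illustrate the difference.

```python
import asyncio


async def wrap_in_coro_sketch(aw):
    return await aw


class Delayed:
    """Hypothetical awaitable that is not a coroutine."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield from asyncio.sleep(0).__await__()
        return self.value


async def demo():
    try:
        asyncio.create_task(Delayed(1))      # rejected: not a coroutine
        rejected = False
    except TypeError:
        rejected = True
    task = asyncio.create_task(wrap_in_coro_sketch(Delayed(2)))
    return rejected, await task
```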

asyncutils.util.anullcontext: asyncutils._internal.types.ANCT

An instance of an async context manager that does nothing.
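
A do-nothing async context manager is typically used as a placeholder where a real one is optional. The sketch below defines a hypothetical equivalent, ANullContext, rather than using the library object, and assumes entering it yields None.

```python
import asyncio


class ANullContext:
    async def __aenter__(self):
        return None

    async def __aexit__(self, exc_type, exc, tb):
        return False   # never swallow exceptions


anullcontext_sketch = ANullContext()   # stateless, so one shared instance suffices


async def fetch(lock=None):
    # Use the real lock when given, otherwise the do-nothing placeholder.
    async with (lock if lock is not None else anullcontext_sketch):
        return "done"


async def demo():
    return await fetch(), await fetch(asyncio.Lock())
```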

asyncutils.util.ignore_cancellation: asyncutils.exceptions.IgnoreErrors

Context manager to ignore CancelledError.
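
The effect of ignoring CancelledError can be reproduced with contextlib.suppress() from the standard library. This sketch is an analogy under that assumption and says nothing about how IgnoreErrors is implemented.

```python
import asyncio
import contextlib


async def demo():
    async def worker():
        with contextlib.suppress(asyncio.CancelledError):
            await asyncio.sleep(3600)    # the cancellation lands here
        return "finished despite cancel"

    task = asyncio.create_task(worker())
    await asyncio.sleep(0)               # let the worker reach its sleep
    task.cancel()
    return await task
```

Since the worker swallows the cancellation and returns normally, awaiting the task yields its return value instead of raising CancelledError. Suppressing cancellation this way should be reserved for code that must finish, because it defeats the caller's request to stop.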