asyncutils.compete

Functions

convert_to_coro_iter(...)

enhanced_gather(→ list[Any])

Version of asyncio.gather() that takes a larger variety of objects as the first argument, using convert_to_coro_iter() under the hood.

enhanced_staggered_race(→ tuple[Any, int | None, ...)

Like asyncio.staggered.staggered_race(), but accepts a larger variety of objects as the first argument by way of convert_to_coro_iter(); see above.

first_completed(…)

multi_winner_race_with_callback(→ list[T])

Return a list of the results of all the coroutines that completed within timeout, and cancel the rest, triggering callbacks similarly to race_with_callback().

race_with_callback(→ T | None)

Module Contents

asyncutils.compete.convert_to_coro_iter(
cfs: asyncutils._internal.types.SupportsIteration[Any],
*,
skip_invalid: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
corocheck: collections.abc.Callable[[Any], TypeGuard[collections.abc.Coroutine[Any, Any, Any]]] = ...,
futwrap: asyncutils._internal.types.FutWrapType = ...,
handle_aiter: collections.abc.Callable[[collections.abc.AsyncIterable[Any]], collections.abc.Coroutine[Any, Any, Any]] = ...,
handle_iter: collections.abc.Callable[[collections.abc.Iterable[Any]], collections.abc.Coroutine[Any, Any, Any]] = ...,
) → collections.abc.Generator[collections.abc.Coroutine[Any, Any, Any], Any]
A helper function that converts cfs, a possibly async iterable of futures, coroutines and even (async) iterables, into a plain generator of
coroutines, so that it can be unpacked with * and passed to the functions in this module.
Originally designed to complement staggered_race().
Because cfs may be async while this function is designed to run in a sync context, it is somewhat inefficient.
skip_invalid determines whether unconvertible items raise TypeError or are simply skipped; it defaults to
context.CONVERT_TO_CORO_ITER_DEFAULT_SKIP_INVALID.
handle_aiter and handle_iter should be callables that take an async iterable and a sync iterable respectively and return a coroutine.
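
To make the conversion idea concrete, here is a minimal stdlib-only sketch. It is not the library's implementation: to_coros and _await are hypothetical names, only sync iterables are handled, and the mapping of skip_invalid=True to "skip" is an assumption based on the parameter name.

```python
import asyncio
import inspect
from collections.abc import Coroutine, Generator, Iterable
from typing import Any


async def _await(aw: Any) -> Any:
    # Wrap any awaitable (for example a Future) in a real coroutine.
    return await aw


def to_coros(
    items: Iterable[Any], *, skip_invalid: bool = False
) -> Generator[Coroutine[Any, Any, Any], None, None]:
    """Hypothetical, simplified stand-in for the conversion (sync input only)."""
    for item in items:
        if inspect.iscoroutine(item):
            yield item
        elif inspect.isawaitable(item):
            yield _await(item)
        elif not skip_invalid:
            raise TypeError(f"cannot convert {item!r} to a coroutine")
        # With skip_invalid=True, unconvertible items are silently dropped.


async def double(x: int) -> int:
    await asyncio.sleep(0)
    return 2 * x


async def main() -> list[int]:
    fut = asyncio.get_running_loop().create_future()
    fut.set_result(10)
    mixed = [double(1), fut, "not awaitable"]
    # The generator is unpacked with * into a function that expects awaitables.
    return await asyncio.gather(*to_coros(mixed, skip_invalid=True))


results = asyncio.run(main())
print(results)
```

The real function additionally accepts async iterables and nested iterables through handle_aiter and handle_iter, which this sketch leaves out.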
async asyncutils.compete.enhanced_gather(
it: asyncutils._internal.types.SupportsIteration[Any],
return_exceptions: bool = False,
*,
loop: asyncio.AbstractEventLoop | None = ...,
) → list[Any]

Version of asyncio.gather() that takes a larger variety of objects as the first argument, using convert_to_coro_iter() under the hood.

See also

iters.agather()

Use this if you just want to pass in an async iterable. It materializes a list of all the items but avoids creating the intermediate sync futures, which is often the better strategy.

async asyncutils.compete.enhanced_staggered_race(
cfs: asyncutils._internal.types.SupportsIteration[Any],
delay: float | None = ...,
*,
loop: asyncio.AbstractEventLoop | None = ...,
) → tuple[Any, int | None, list[Exception | None]]

Like asyncio.staggered.staggered_race(), but accepts a larger variety of objects as the first argument by way of convert_to_coro_iter(); see above.
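
For reference, the shape of the returned tuple can be seen with the standard-library function alone. This sketch calls asyncio.staggered.staggered_race() directly, which takes zero-argument callables returning coroutines; fails and succeeds are hypothetical names, and the sketch makes no claim about how the enhanced version passes its arguments on.

```python
import asyncio
import asyncio.staggered


async def fails() -> str:
    raise ValueError("first attempt failed")


async def succeeds() -> str:
    await asyncio.sleep(0.01)
    return "ok"


async def main():
    # delay=None: the next attempt starts only after the previous one fails.
    return await asyncio.staggered.staggered_race([fails, succeeds], delay=None)


# The tuple is (winner's result, winner's index, per-attempt exceptions).
winner_result, winner_index, exceptions = asyncio.run(main())
print(winner_result, winner_index, exceptions)
```

The exceptions list has one entry per attempt that was started, with None in the winner's slot.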

async asyncutils.compete.first_completed[T](
*C: collections.abc.Awaitable[T],
ret_exc: Literal[True],
timeout: float | None = ...,
) → asyncutils._internal.types.ExceptionWrapper | T | None
async asyncutils.compete.first_completed(*C: collections.abc.Awaitable[T], ret_exc: Literal[False] = ..., timeout: float | None = ...) → T | None
Return the result of the first coroutine to complete among those passed in.
If ret_exc is True, that coroutine may have failed, in which case the exception it raised is returned in wrapped form;
check for this with exceptions.exception_occurred() and unpack it with exceptions.unwrap_exc().
In either case, the losing coroutines are cancelled together, and the function returns once the cancellations have finished.
async asyncutils.compete.multi_winner_race_with_callback[T](
*C: collections.abc.Awaitable[T],
timeout: float,
winner: collections.abc.Callable[[T], object] = ...,
loser: collections.abc.Callable[[Any | BaseException], object] = ...,
) → list[T]

Return a list of the results of all the coroutines that completed within timeout, and cancel the rest, triggering callbacks similarly to race_with_callback().
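
A rough stdlib-only sketch of the "everything that finished in time" idea follows. finished_within and sleeper are hypothetical names, the winner and loser callbacks are left out, and dropping failed coroutines from the result is an assumption of this sketch.

```python
import asyncio
from collections.abc import Awaitable
from typing import Any


async def finished_within(*aws: Awaitable[Any], timeout: float) -> list[Any]:
    """Hypothetical sketch: results of everything that finished before timeout."""
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    # Default return_when is ALL_COMPLETED, so this waits for the full timeout
    # unless every task finishes earlier.
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)
    results = []
    for task in tasks:  # keep argument order
        if task in done and not task.cancelled() and task.exception() is None:
            results.append(task.result())
    return results


async def sleeper(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


async def main() -> list[Any]:
    return await finished_within(
        sleeper("a", 0.01), sleeper("b", 0.5), sleeper("c", 0.02), timeout=0.2
    )


in_time = asyncio.run(main())
print(in_time)
```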

async asyncutils.compete.race_with_callback[T](
*C: collections.abc.Awaitable[T],
winner: collections.abc.Callable[[T], object] = ...,
loser: collections.abc.Callable[[Any | BaseException], object] = ...,
timeout: float | None = ...,
) → T | None
Return the result of the first coroutine to complete, after calling winner on that result.
If no coroutine completes within timeout, None is returned.
The loser callback is called on whatever each losing coroutine returns or raises after it receives CancelledError.