asyncutils.base

The most useful and fundamental patterns and helpers. They are core to this module and are therefore required by the asyncutils.console submodule, among many others.

Attributes

dummy_task

An awaitable object that completes immediately. Also an exhausted generator.

yield_to_event_loop

An awaitable and picklable singleton that yields control to the event loop for exactly one iteration when awaited, much like asyncio.sleep(s) for non-positive s.

Classes

event_loop

A context manager to manage lifecycles of native event loops. Has specialized handling for asyncio implementation details.

Functions

adisembowel(→ collections.abc.AsyncGenerator[T])

Asynchronously disembowel an iterable from the right using its pop method and yield its items from right to left.

adisembowelleft(→ collections.abc.AsyncGenerator[T])

Asynchronously disembowel an iterable from the left using its popleft method and yield its items from left to right.

aenumerate(→ collections.abc.AsyncGenerator[tuple[int, T]])

The async version of enumerate, except it is not a class and additionally supports the step parameter.

aiter_to_gen(…)

collect(→ list[T])

collect_into(→ None)

drop(→ collections.abc.AsyncGenerator[T])

Discard n items from the (async) iterable and yield the rest. If there are not enough items and raising is True, throw exceptions.ItemsExhausted.

iter_to_agen(…)

safe_cancel_batch(…)

sleep_forever(→ NoReturn)

A coroutine that only completes when an exception is thrown in. The exception is propagated.

take(…)

Yield n items from the (async) iterable.

Module Contents

class asyncutils.base.event_loop

A context manager to manage lifecycles of native event loops. Has specialized handling for asyncio implementation details.

Constructor arguments are self-explanatory. Pass as appropriate; all are applied on top of context.EVENT_LOOP_BASE_FLAGS.

__enter__() → asyncio.AbstractEventLoop

Enter the context, returning the underlying asyncio event loop, which is fetched on demand.

__exit__(t: None, v: None, b: None, /) → Literal[False]
__exit__(t: asyncutils._internal.types.ExcType, v: BaseException, b: types.TracebackType, /) → bool

Exit the context. This stops and closes the event loop if the flags say so.
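
The enter/exit behaviour described above can be approximated with the standard library alone. The following is a minimal sketch, not the implementation of event_loop: managed_loop and its close_on_exit parameter are hypothetical names, and the sketch always creates a fresh loop instead of drawing one from a pool or consulting flags.

```python
import asyncio
import contextlib


@contextlib.contextmanager
def managed_loop(close_on_exit=True):
    """Hypothetical stand-in for event_loop: hand out a fresh loop, clean up on exit."""
    loop = asyncio.new_event_loop()
    try:
        yield loop  # the value bound by "with ... as loop"
    finally:
        if close_on_exit and not loop.is_closed():
            loop.close()


with managed_loop() as loop:
    # Run a trivial awaitable to completion on the managed loop.
    answer = loop.run_until_complete(asyncio.sleep(0, result=42))
```

After the with block, the loop is closed and answer holds the value returned by the awaitable.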

__hash__() → int

Return the flags of the manager as its hash, not considering its state.

__reduce__() → tuple[collections.abc.Callable[[int], Self], tuple[int]]

Support for pickling.

_get_unclosed_loop(factory: collections.abc.Callable[[], asyncio.AbstractEventLoop] = ...) → asyncio.AbstractEventLoop

Return a usable asyncio event loop from the internal pool, or a new event loop if there are none.

clear_flags(mask_to_keep: int = ...) → None

Reset the configuration of the manager to the equivalent of passing all keyword arguments as False, except those covered by mask_to_keep.

copy_flags() → Self

Return an unentered instance with the same configuration as this one, managing a different event loop.

factory_reset() → None

Restore the default settings from the context (i.e., set the flags to context.EVENT_LOOP_BASE_FLAGS).

flags_eq(other: Self, /) → bool
flags_eq(flags: int, /) → bool

Return whether the configuration of this manager is the same as that of other, regardless of their respective states.

classmethod from_flags(flags: int, /) → Self

Construct an instance from flags, a bitwise OR of options (default context.EVENT_LOOP_BASE_FLAGS).

constructor_args: Final = ('dont_release_loop_on_finalization', 'silent_on_finalize', 'dont_try_clear_tasks_on_reuse',...

A tuple of all keyword arguments accepted by the constructor in order of the offset corresponding to the flag in the flags representation.
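
To illustrate how a tuple ordered by flag offset can map keyword arguments onto an integer, here is a minimal sketch. It uses only the three names visible in the truncated tuple above; kwargs_to_flags and flags_to_kwargs are hypothetical helpers, and the assumption that offset i corresponds to bit 1 << i is not confirmed by the documentation.

```python
# Only the three names visible in the truncated tuple are used; the rest is elided.
KNOWN_ARGS = (
    "dont_release_loop_on_finalization",
    "silent_on_finalize",
    "dont_try_clear_tasks_on_reuse",
)


def kwargs_to_flags(**kwargs):
    """Hypothetical helper: set bit i when the i-th known argument is truthy."""
    unknown = set(kwargs) - set(KNOWN_ARGS)
    if unknown:
        raise TypeError(f"unknown arguments: {sorted(unknown)}")
    return sum(1 << i for i, name in enumerate(KNOWN_ARGS) if kwargs.get(name))


def flags_to_kwargs(flags):
    """Hypothetical inverse of kwargs_to_flags for the known arguments."""
    return {name: bool((flags >> i) & 1) for i, name in enumerate(KNOWN_ARGS)}
```

Under these assumptions, an integer produced this way is the kind of value that from_flags and flags_eq would accept, and comparing two configurations reduces to comparing two integers.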

asyncutils.base.adisembowel[T](it: asyncutils._internal.types.SupportsPop[T], /) → collections.abc.AsyncGenerator[T]

Asynchronously disembowel an iterable from the right using its pop method and yield its items from right to left.

asyncutils.base.adisembowelleft[T](it: asyncutils._internal.types.SupportsPopLeft[T], /) → collections.abc.AsyncGenerator[T]

Asynchronously disembowel an iterable from the left using its popleft method and yield its items from left to right.
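
The destructive iteration described for adisembowel and adisembowelleft might look roughly like the sketch below. The function names are hypothetical, and stopping on IndexError or KeyError as well as yielding to the event loop between items are assumptions about how such helpers could behave, not documented behaviour.

```python
import asyncio
from collections import deque


async def adisembowel_sketch(it):
    """Pop from the right until the container is empty, yielding each item."""
    while True:
        try:
            item = it.pop()
        except (IndexError, KeyError):  # list and deque raise IndexError, set raises KeyError
            return
        yield item
        await asyncio.sleep(0)  # give other tasks a turn between items


async def adisembowelleft_sketch(it):
    """Pop from the left until the container is empty, yielding each item."""
    while True:
        try:
            item = it.popleft()
        except IndexError:
            return
        yield item
        await asyncio.sleep(0)


async def demo():
    stack = [1, 2, 3]
    queue = deque("abc")
    right = [x async for x in adisembowel_sketch(stack)]
    left = [x async for x in adisembowelleft_sketch(queue)]
    return right, left, stack, queue
```

Both containers are empty once the generators finish, which is the point of the "disembowel" naming.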

asyncutils.base.aenumerate[T](
it: asyncutils._internal.types.SupportsIteration[T],
start: int = ...,
*,
step: int = ...,
) → collections.abc.AsyncGenerator[tuple[int, T]]

The async version of enumerate, except it is not a class and additionally supports the step parameter.
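
A minimal sketch of enumerate with a step parameter over an async iterable follows. aenumerate_sketch is a hypothetical re-implementation for illustration; it handles async iterables only, and the defaults of 0 and 1 are assumptions, since the documented signature elides them.

```python
import asyncio


async def aenumerate_sketch(ait, start=0, *, step=1):
    """Yield (index, item) pairs, advancing the index by step each time."""
    index = start
    async for item in ait:
        yield index, item
        index += step


async def letters():
    for ch in "abc":
        yield ch


async def demo():
    return [pair async for pair in aenumerate_sketch(letters(), 10, step=5)]
```

Running demo() produces the pairs (10, 'a'), (15, 'b') and (20, 'c').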

asyncutils.base.aiter_to_gen[T, R](
ait: collections.abc.AsyncGenerator[T, R],
*,
use_futures: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
strict: bool = ...,
) → collections.abc.Generator[T, R]
asyncutils.base.aiter_to_gen(
ait: collections.abc.AsyncIterable[T],
*,
use_futures: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
strict: bool = ...,
) → collections.abc.Generator[T]
asyncutils.base.aiter_to_gen(
ait: collections.abc.Iterable[T],
*,
use_futures: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
strict: Literal[False] = ...,
) → collections.abc.Generator[T]

Convert an async iterable ait to a sync generator.
If the event loop is currently running and use_futures is False (default context.AITER_TO_GEN_DEFAULT_ALLOW_FUTURES), raise RuntimeError, because in that case a concurrent.futures.Future must be used for each item yielded, which is somewhat inefficient but unavoidable.
If strict is True (default context.AITER_TO_GEN_DEFAULT_STRICT), only async iterables are accepted.

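
The simple case, where no event loop is running, can be sketched with a private loop that is run once per item. aiter_to_gen_sketch is a hypothetical name and this is not the library's implementation: it omits use_futures, loop and strict, and it refuses to run when a loop is already active in the current thread.

```python
import asyncio

_DONE = object()  # private marker for "the async iterator is exhausted"


async def _next_or_done(iterator):
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _DONE


def aiter_to_gen_sketch(ait):
    """Drive an async iterable from synchronous code, one loop run per item."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass  # no loop is running in this thread, so blocking is safe
    else:
        raise RuntimeError("a loop is already running; this sketch cannot block it")
    loop = asyncio.new_event_loop()
    try:
        iterator = ait.__aiter__()
        while True:
            item = loop.run_until_complete(_next_or_done(iterator))
            if item is _DONE:
                return
            yield item
    finally:
        loop.close()


async def countdown(n):
    while n:
        yield n
        await asyncio.sleep(0)
        n -= 1


values = list(aiter_to_gen_sketch(countdown(3)))
```

The check for a running loop mirrors the RuntimeError case described above; supporting that case would require handing each step to the running loop from another thread, which is where the per-item futures come in.
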
async asyncutils.base.collect[T](
it: asyncutils._internal.types.SupportsIteration[T],
n: int | None = ...,
*,
default: T | asyncutils._internal.types.RaiseType = ...,
) → list[T]

Return a list of the first n items in the (async) iterable, consuming it up to that point exactly.
If there are fewer than n items to collect, throw exceptions.ItemsExhausted if default is constants.RAISE; otherwise, emit a debug message through the logger and pad the end of the list with copies of default.

See also

iters.basic_collect()

a possibly slightly faster variant that doesn’t accept a default.

iters.to_list()

the most barebones variant equivalent to the case when n is not passed.
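
The consumption and padding rules of collect can be sketched as follows. RAISE, ItemsExhaustedSketch and collect_sketch are hypothetical stand-ins for constants.RAISE, exceptions.ItemsExhausted and collect; treating RAISE as the default value of default is an assumption, and the debug logging is left out.

```python
import asyncio

RAISE = object()  # hypothetical stand-in for constants.RAISE


class ItemsExhaustedSketch(Exception):
    """Hypothetical stand-in for exceptions.ItemsExhausted."""


async def collect_sketch(ait, n=None, *, default=RAISE):
    iterator = ait.__aiter__()
    out = []
    # Pull items one at a time so that nothing beyond the n-th is consumed.
    while n is None or len(out) < n:
        try:
            out.append(await iterator.__anext__())
        except StopAsyncIteration:
            break
    if n is not None and len(out) < n:
        if default is RAISE:
            raise ItemsExhaustedSketch(f"wanted {n} items, got {len(out)}")
        out.extend([default] * (n - len(out)))  # pad the end with the default
    return out


async def numbers(k):
    for i in range(k):
        yield i


async def demo():
    source = numbers(5)
    first = await collect_sketch(source, 2)
    rest = await collect_sketch(source, 5, default=None)
    return first, rest
```

The second call starts where the first one stopped, so it sees only three real items and pads the remaining two slots.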

async asyncutils.base.collect_into[T](
out: collections.abc.MutableSequence[T],
it: asyncutils._internal.types.SupportsIteration[T],
n: int | None = ...,
*,
default: T | asyncutils._internal.types.RaiseType = ...,
) → None

Extend a mutable sequence with the first n items in the (async) iterable, consuming it up to that point exactly.
If there are fewer than n items to collect, throw exceptions.ItemsExhausted if default is constants.RAISE; otherwise, emit a debug message through the logger and pad the end of the sequence with copies of default.

asyncutils.base.drop[T](
it: asyncutils._internal.types.SupportsIteration[T],
n: int,
*,
raising: bool = ...,
) → collections.abc.AsyncGenerator[T]

Discard n items from the (async) iterable and yield the rest. If there are not enough items and raising is True, throw exceptions.ItemsExhausted.
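
A sketch of the discard-then-yield behaviour, including the raising switch, is given below. drop_sketch and ItemsExhaustedSketch are hypothetical names, only async iterables are handled, and the default of raising=False is an assumption.

```python
import asyncio


class ItemsExhaustedSketch(Exception):
    """Hypothetical stand-in for exceptions.ItemsExhausted."""


async def drop_sketch(ait, n, *, raising=False):
    iterator = ait.__aiter__()
    for _ in range(n):
        try:
            await iterator.__anext__()  # discard one item
        except StopAsyncIteration:
            if raising:
                raise ItemsExhaustedSketch(f"fewer than {n} items to drop")
            return
    async for item in iterator:
        yield item


async def numbers(k):
    for i in range(k):
        yield i


async def demo():
    kept = [x async for x in drop_sketch(numbers(5), 2)]
    short = [x async for x in drop_sketch(numbers(1), 3)]
    try:
        [x async for x in drop_sketch(numbers(1), 3, raising=True)]
    except ItemsExhaustedSketch:
        raised = True
    else:
        raised = False
    return kept, short, raised
```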

asyncutils.base.iter_to_agen[T, R](
it: collections.abc.AsyncGenerator[T, R],
sentinel: T = ...,
*,
use_existing_executor: bool = ...,
create_executor: bool = ...,
strict: Literal[False] = ...,
) → collections.abc.AsyncGenerator[T, R]
asyncutils.base.iter_to_agen(
it: collections.abc.AsyncIterable[T],
sentinel: T = ...,
*,
use_existing_executor: bool = ...,
create_executor: bool = ...,
strict: Literal[False] = ...,
) → collections.abc.AsyncGenerator[T]
asyncutils.base.iter_to_agen(
it: collections.abc.Iterable[T],
*,
use_existing_executor: bool = ...,
create_executor: bool = ...,
strict: bool = ...,
) → collections.abc.AsyncGenerator[T]
asyncutils.base.iter_to_agen(
it: collections.abc.Iterable[T],
sentinel: T,
*,
use_existing_executor: bool = ...,
create_executor: bool = ...,
strict: bool = ...,
) → collections.abc.AsyncGenerator[T]

Convert the (async) iterable it to an async generator as non-blockingly as possible.
If it is an async generator and sentinel is not passed, it is returned as is.
Values sent to the returned async generator will be passed through to the original.
The async generator will stop when it encounters an item identical to sentinel.
When use_existing_executor=True is passed (default context.ITER_TO_AGEN_DEFAULT_USE_EXISTING_EXECUTOR), the function will attempt to use an existing executor created by a previous call that specified create_executor=True (default context.ITER_TO_AGEN_DEFAULT_MAY_CREATE_EXECUTOR) to advance the iterable, and fall back to blocking the event loop on every step if no executor is available.
If strict is True (default context.ITER_TO_AGEN_DEFAULT_STRICT), only sync iterables are accepted.

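
Advancing a sync iterable in an executor, so that each step does not block the event loop, can be sketched with loop.run_in_executor. iter_to_agen_sketch and its executor parameter are hypothetical; the sketch does not model sentinel, value pass-through, executor reuse or strict.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

_DONE = object()  # private marker returned by next() when the iterator is exhausted


async def iter_to_agen_sketch(it, *, executor=None):
    loop = asyncio.get_running_loop()
    iterator = iter(it)
    while True:
        # next(iterator, _DONE) runs in a worker thread;
        # executor=None selects the loop's default executor.
        item = await loop.run_in_executor(executor, next, iterator, _DONE)
        if item is _DONE:
            return
        yield item


async def demo():
    with ThreadPoolExecutor(max_workers=1) as pool:
        return [x async for x in iter_to_agen_sketch(range(3), executor=pool)]
```

Passing next with a default avoids raising StopIteration inside the executor's future, which asyncio does not allow to be set on a future.
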
async asyncutils.base.safe_cancel_batch[T](
batch: asyncutils._internal.types.SupportsIteration[asyncio.Future[T]],
/,
*,
callback: collections.abc.Callable[[T | BaseException], object] | None = ...,
disembowel: Literal[False] = ...,
raising: bool = ...,
) → None
async asyncutils.base.safe_cancel_batch(
batch: asyncutils._internal.types.SupportsPop[asyncio.Future[T]],
/,
*,
callback: collections.abc.Callable[[T | BaseException], object] | None = ...,
disembowel: Literal[True],
raising: bool = ...,
) → None

Cancel an (async) iterable of futures, waiting for the cancellations to complete asynchronously.
The batch cancellation itself can be reliably cancelled.
Afterwards, if disembowel is True, clear the iterable using its pop() method repeatedly, falling back to clear().
The callback is called on each result or exception of the futures after CancelledError was thrown into them concurrently.
If raising is True, the exceptions thrown by calls to the callback are collected into a BaseExceptionGroup, which is then raised.

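
The core of the cancel-then-wait pattern can be sketched with asyncio.gather. cancel_batch_sketch is a hypothetical, reduced version: it takes a plain sync iterable, ignores disembowel and raising, and makes no attempt to be safely cancellable itself.

```python
import asyncio


async def cancel_batch_sketch(futures, callback=None):
    futures = list(futures)
    for fut in futures:
        fut.cancel()  # request cancellation of every future first
    # return_exceptions=True makes gather report CancelledError objects
    # as results instead of raising the first one.
    outcomes = await asyncio.gather(*futures, return_exceptions=True)
    if callback is not None:
        for outcome in outcomes:
            callback(outcome)


async def demo():
    tasks = [asyncio.create_task(asyncio.sleep(60)) for _ in range(3)]
    await asyncio.sleep(0)  # let the tasks start before cancelling them
    seen = []
    await cancel_batch_sketch(tasks, seen.append)
    return tasks, seen
```

Requesting all cancellations before awaiting any of them lets the futures unwind concurrently, which matches the description above.
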
async asyncutils.base.sleep_forever() → NoReturn

A coroutine that only completes when an exception is thrown in. The exception is propagated.
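
One way to obtain this behaviour with the standard library is to await a future that nothing ever resolves. This is a sketch under that assumption; sleep_forever_sketch is a hypothetical name and may differ from how sleep_forever is written.

```python
import asyncio


async def sleep_forever_sketch():
    # A future that nobody resolves never completes on its own.
    await asyncio.get_running_loop().create_future()


async def demo():
    task = asyncio.create_task(sleep_forever_sketch())
    await asyncio.sleep(0.01)
    still_pending = not task.done()
    task.cancel()  # throws CancelledError into the sleeping coroutine
    try:
        await task
    except asyncio.CancelledError:
        return still_pending, True  # the thrown exception propagated
    return still_pending, False
```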

asyncutils.base.take[T](
it: asyncutils._internal.types.SupportsIteration[T],
n: int,
*,
default: T | asyncutils._internal.types.RaiseType,
) → collections.abc.AsyncGenerator[T]
asyncutils.base.take(it: asyncutils._internal.types.SupportsIteration[T], n: int | None) → collections.abc.AsyncGenerator[T]

Yield n items from the (async) iterable.

Tip

To ensure there are exactly n items in the resultant async generator, pass a default value. In particular, pass constants.RAISE as default to cause exceptions.ItemsExhausted to be thrown in the case that there aren’t enough items. If n is None, take all items.
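
The form without a default can be sketched as below, where a short source simply ends early and n=None passes everything through. take_sketch is a hypothetical name, only async iterables are handled, and the default-padding variant from the tip is not modelled.

```python
import asyncio


async def take_sketch(ait, n=None):
    if n is None:
        async for item in ait:  # no limit: pass every item through
            yield item
        return
    iterator = ait.__aiter__()
    for _ in range(n):
        try:
            item = await iterator.__anext__()
        except StopAsyncIteration:
            return  # fewer than n items: stop early without padding
        yield item


async def naturals():
    n = 0
    while True:  # infinite source, so take_sketch must stop on its own
        yield n
        n += 1


async def upto(k):
    for i in range(k):
        yield i


async def demo():
    bounded = [x async for x in take_sketch(naturals(), 3)]
    short = [x async for x in take_sketch(upto(2), 5)]
    everything = [x async for x in take_sketch(upto(4), None)]
    return bounded, short, everything
```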

asyncutils.base.dummy_task: asyncutils._internal.types.GeneratorCoroutine[Never, Any, Any]

An awaitable object that completes immediately. Also an exhausted generator.

Implementation detail

This is achieved by setting the inspect.CO_ITERABLE_COROUTINE flag on the code of a generator function.

asyncutils.base.yield_to_event_loop: collections.abc.Awaitable[None]

An awaitable and picklable singleton that yields control to the event loop for exactly one iteration when awaited, much like asyncio.sleep(s) for non-positive s.
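
The effect of yielding for one loop iteration can be observed with asyncio.sleep(0), which the description above names as the comparable call. The sketch below uses only the standard library; worker and demo are hypothetical names, and yield_to_event_loop itself is not imported.

```python
import asyncio


async def worker(name, log):
    for step in range(2):
        log.append((name, step))
        await asyncio.sleep(0)  # hand control back to the loop for one iteration


async def demo():
    log = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log
```

The two workers alternate: each yield lets the other one run its next step before the first resumes.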