asyncutils.base

The most useful and fundamental patterns and helpers core to this module and are therefore required by the asyncutils.console submodule, among many others.

Attributes

dummy_task

An awaitable object that completes immediately. Also an exhausted generator.

yield_to_event_loop

An awaitable and picklable singleton that yields control to the event loop for exactly one iteration when awaited, much like asyncio.sleep(s) for non-positive s.

Classes

event_loop

A context manager controlling lifecycles of native event loops. Has specialized handling for asyncio implementation details.

Functions

adisembowel(→ collections.abc.AsyncGenerator[T])

Asynchronously disembowel an iterable from the right using its pop method and yield its items from right to left.

adisembowelleft(→ collections.abc.AsyncGenerator[T])

Asynchronously disembowel an iterable from the left using its popleft method and yield its items from left to right.

aenumerate(→ collections.abc.AsyncGenerator[tuple[int, T]])

The async version of enumerate, except it is not a class and additionally supports the step parameter.

aiter_to_gen(…)

collect(→ list[T])

collect_into(→ None)

drop(→ collections.abc.AsyncGenerator[T])

Discard n items from the (async) iterable and yield the rest. If there are not enough items and raising is True, throw ItemsExhausted.

iter_to_agen(…)

safe_cancel_batch(…)

sleep_forever(→ NoReturn)

A coroutine that only completes when an exception is thrown in. The exception is propagated.

take(…)

Yield n items from the (async) iterable. If n is None, take all items.

Module Contents

class asyncutils.base.event_loop[source]

A context manager controlling lifecycles of native event loops. Has specialized handling for asyncio implementation details.

Constructor arguments are self-explanatory. Pass as appropriate; all are applied on top of EVENT_LOOP_BASE_FLAGS.

class Flags

Bases: enum.IntFlag

An enumeration of all keyword arguments accepted by the constructor in order of the offset corresponding to the flag in the flags representation.

Initialize self. See help(type(self)) for accurate signature.

ATTEMPT_AENTER = 8192
CANCEL_ALL_TASKS = 64
CLOSE_EXISTING_ON_EXIT = 8
DONT_ALLOW_REUSE = 1024
DONT_ALWAYS_STOP_ON_EXIT = 16
DONT_ATTEMPT_ENTER = 4096
DONT_CLOSE_CREATED_ON_EXIT = 32
DONT_REUSE = 2048
DONT_TRY_CLEAR_TASKS_ON_REUSE = 4
FAIL_SILENT = 512
FLIP_RELEASE_LOOP_ON_FINALIZATION = 1
KEEP_LOOP = 128
SILENT_ON_FINALIZE = 2
SUPPRESS_INNER_AEXIT_ON_RUNTIME_ERROR = 32768
SUPPRESS_INNER_EXIT_ON_RUNTIME_ERROR = 16384
SUPPRESS_RUNTIME_ERRORS = 256
__contains__(flagname: str, /) bool[source]
__contains__(flag: int, /) bool

Return whether the manager has the flag specified by name or flag.

__del__() None[source]

Finalize the manager by calling __exit__() if necessary.

__enter__() asyncio.AbstractEventLoop[source]

Enter the context, returning the underlying asyncio event loop, which is fetched on demand.

__exit__(exc_typ: None, exc_val: None, exc_tb: None, /) Literal[False][source]
__exit__(exc_typ: asyncutils._internal.prots.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) bool

Exit the context. This stops and closes the event loop if the flags say so.

__hash__() int[source]

Return the flags of the manager as its hash, not considering its state.

__reduce__() tuple[collections.abc.Callable[[int], Self], tuple[int]][source]

Support for pickling.

_get_unclosed_loop(factory: collections.abc.Callable[[], asyncio.AbstractEventLoop] = ...) asyncio.AbstractEventLoop[source]

Return a usable asyncio event loop from the internal pool, or a new event loop if there are none.

clear_flags(mask_to_keep: int = ...) None[source]

Reset the configuration of the manager to the equivalent of passing all keyword arguments as False, except those covered by mask_to_keep.

copy_flags() Self[source]

Return an unentered instance with the same configuration as this that manages a different event loop.

factory_reset() None[source]

Restore the default settings from the context (i.e., set the flags to EVENT_LOOP_BASE_FLAGS).

flags_eq(other: Self, /) bool[source]
flags_eq(flags: int, /) bool

Return whether the configuration of this manager is the same as that of other, regardless of their respective states.

classmethod from_flags(flags: int, /) Self[source]

Construct an instance from flags, a bitwise or of options (default EVENT_LOOP_BASE_FLAGS).

asyncutils.base.adisembowel[T](it: asyncutils._internal.prots.SupportsPop[T], /) collections.abc.AsyncGenerator[T]

Asynchronously disembowel an iterable from the right using its pop method and yield its items from right to left.

asyncutils.base.adisembowelleft[T](it: asyncutils._internal.prots.SupportsPopLeft[T], /) collections.abc.AsyncGenerator[T]

Asynchronously disembowel an iterable from the left using its popleft method and yield its items from left to right.

asyncutils.base.aenumerate[T](
it: asyncutils._internal.prots.SupportsIteration[T],
start: int = ...,
*,
step: int = ...,
) collections.abc.AsyncGenerator[tuple[int, T]][source]

The async version of enumerate, except it is not a class and additionally supports the step parameter.

asyncutils.base.aiter_to_gen[T, R](
ait: collections.abc.AsyncGenerator[T, R],
*,
use_futures: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
strict: bool = ...,
) collections.abc.Generator[T, R][source]
asyncutils.base.aiter_to_gen(
ait: collections.abc.AsyncIterable[T],
*,
use_futures: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
strict: bool = ...,
) collections.abc.Generator[T]
asyncutils.base.aiter_to_gen(
ait: collections.abc.Iterable[T],
*,
use_futures: bool = ...,
loop: asyncio.AbstractEventLoop | None = ...,
strict: Literal[False] = ...,
) collections.abc.Generator[T]
Convert an async iterable ait to a sync generator.
If the event loop is currently running and use_futures is False (default AITER_TO_GEN_DEFAULT_ALLOW_FUTURES), raise RuntimeError to clarify that concurrent.futures.Future must be used in this case, one per item yielded, which is somewhat inefficient, but that can’t be helped.
If strict is True (default AITER_TO_GEN_DEFAULT_STRICT), only async iterables are accepted.
async asyncutils.base.collect[T](
it: asyncutils._internal.prots.SupportsIteration[T],
n: int | None = ...,
*,
default: T | asyncutils._internal.prots.RaiseType = ...,
) list[T][source]
Return a list of the first n items in the (async) iterable, consuming it up to that point exactly.
If there are less than n items to collect, throw ItemsExhausted if default is RAISE and emit a debug message through the logger before padding the behind of the list with copies of the default if passed otherwise.

See also

basic_collect()

a possibly slightly faster variant that doesn’t accept a default.

to_list()

the most barebones variant equivalent to the case when n is not passed.

async asyncutils.base.collect_into[T](
out: collections.abc.MutableSequence[T],
it: asyncutils._internal.prots.SupportsIteration[T],
n: int | None = ...,
*,
default: T | asyncutils._internal.prots.RaiseType = ...,
) None[source]
Extend a mutable sequence with the first n items in the (async) iterable, consuming it up to that point exactly.
If there are less than n items to collect, throw ItemsExhausted if default is RAISE and emit a debug message through the logger before padding the behind of the list with copies of the default if passed otherwise.
asyncutils.base.drop[T](
it: asyncutils._internal.prots.SupportsIteration[T],
n: int,
*,
raising: bool = ...,
) collections.abc.AsyncGenerator[T][source]

Discard n items from the (async) iterable and yield the rest. If there are not enough items and raising is True, throw ItemsExhausted.

asyncutils.base.iter_to_agen[T, R](
it: collections.abc.AsyncGenerator[T, R],
sentinel: T = ...,
*,
use_existing_executor: bool = ...,
create_executor: bool = ...,
strict: Literal[False] = ...,
) collections.abc.AsyncGenerator[T, R][source]
asyncutils.base.iter_to_agen(
it: collections.abc.AsyncIterable[T],
sentinel: T = ...,
*,
use_existing_executor: bool = ...,
create_executor: bool = ...,
strict: Literal[False] = ...,
) collections.abc.AsyncGenerator[T]
asyncutils.base.iter_to_agen(
it: collections.abc.Iterable[T],
*,
use_existing_executor: bool = ...,
create_executor: bool = ...,
strict: bool = ...,
) collections.abc.AsyncGenerator[T]
asyncutils.base.iter_to_agen(
it: collections.abc.Iterable[T],
sentinel: T,
*,
use_existing_executor: bool = ...,
create_executor: bool = ...,
strict: bool = ...,
) collections.abc.AsyncGenerator[T]
Convert the (async) iterable it to an async generator as non-blockingly as possible.
If it is an async generator and sentinel is not passed, it is returned as is.
Values sent to the return async generator will be passed through to the original.
The async generator will stop when it encounters an item identical to sentinel.
When use_existing_executor=True is passed (default ITER_TO_AGEN_DEFAULT_USE_EXISTING_EXECUTOR), the function will attempt to use an existing executor as created by previous calls specifying create_executor=True (default ITER_TO_AGEN_DEFAULT_MAY_CREATE_EXECUTOR) to advance the iterable, and fall back to blocking the event loop every step without an executor.
If strict is True (default ITER_TO_AGEN_DEFAULT_STRICT), only sync iterables are accepted.
async asyncutils.base.safe_cancel_batch[T](
batch: asyncutils._internal.prots.SupportsIteration[asyncio.Future[T]],
/,
*,
callback: collections.abc.Callable[[T | BaseException], object] | None = ...,
disembowel: Literal[False] = ...,
raising: bool = ...,
) None[source]
async asyncutils.base.safe_cancel_batch(
batch: asyncutils._internal.prots.SupportsPop[asyncio.Future[T]],
/,
*,
callback: collections.abc.Callable[[T | BaseException], object] | None = ...,
disembowel: Literal[True],
raising: bool = ...,
) None
Cancel an (async) iterable of futures, waiting for the cancellations to complete asynchronously.
The batch cancellation itself can be cancelled, but less reliably and granularly than safe_cancel().
Afterwards, if disembowel is True, clear the iterable using its pop() method repeatedly, falling back to clear().
The callback is called on each result or exception of the futures after CancelledError was thrown into them concurrently.
If raising is True, all calls of the callback that themselves threw exceptions are collected into a BaseExceptionGroup, which is then raised.
async asyncutils.base.sleep_forever() NoReturn

A coroutine that only completes when an exception is thrown in. The exception is propagated.

asyncutils.base.take[T](
it: asyncutils._internal.prots.SupportsIteration[T],
n: int,
*,
default: T | asyncutils._internal.prots.RaiseType,
) collections.abc.AsyncGenerator[T][source]
asyncutils.base.take(it: asyncutils._internal.prots.SupportsIteration[T], n: int | None) collections.abc.AsyncGenerator[T]

Yield n items from the (async) iterable. If n is None, take all items.

Tip

To ensure there are exactly n items in the resultant async generator, pass a default value. In particular, pass RAISE as default to cause ItemsExhausted to be thrown in the case that there aren’t enough items.

asyncutils.base.dummy_task: asyncutils._internal.prots.GeneratorCoroutine[Never, Any, Any]

An awaitable object that completes immediately. Also an exhausted generator.

Implementation detail

This is achieved by setting the inspect.CO_ITERABLE_COROUTINE flag on the code of a generator function.

asyncutils.base.yield_to_event_loop: collections.abc.Awaitable[None]

An awaitable and picklable singleton that yields control to the event loop for exactly one iteration when awaited, much like asyncio.sleep(s) for non-positive s.