asyncutils.base¶
The most useful and fundamental patterns and helpers core to this module and are therefore required by the asyncutils.console submodule, among many others.
Attributes¶
An awaitable object that completes immediately. Also an exhausted generator. |
|
An awaitable and picklable singleton that yields control to the event loop for exactly one iteration when awaited, much like |
Classes¶
A context manager to manage lifecycles of native event loops. Has specialized handling for |
Functions¶
|
Asynchronously disembowel an iterable from the right using its pop method and yield its items from right to left. |
|
Asynchronously disembowel an iterable from the left using its popleft method and yield its items from left to right. |
|
The async version of |
|
|
|
|
|
|
|
Discard |
|
|
|
A coroutine that only completes when an exception is thrown in. The exception is propagated. |
|
Yield |
Module Contents¶
- class asyncutils.base.event_loop[source]¶
A context manager to manage lifecycles of native event loops. Has specialized handling for
asyncioimplementation details.Constructor arguments are self-explanatory. Pass as appropriate; all are applied on top of
context.EVENT_LOOP_BASE_FLAGS.- __enter__() asyncio.AbstractEventLoop[source]¶
Enter the context, returning the underlying
asyncioevent loop, which is fetched on demand.
- __exit__(t: None, v: None, b: None, /) Literal[False][source]¶
- __exit__(t: asyncutils._internal.types.ExcType, v: BaseException, b: types.TracebackType, /) bool
Exit the context. This stops and closes the event loop if the flags say so.
- __reduce__() tuple[collections.abc.Callable[[int], Self], tuple[int]][source]¶
Support for pickling.
- _get_unclosed_loop(factory: collections.abc.Callable[[], asyncio.AbstractEventLoop] = ...) asyncio.AbstractEventLoop[source]¶
Return a usable
asyncioevent loop from the internal pool, or a new event loop if there are none.
- clear_flags(mask_to_keep: int = ...) None[source]¶
Reset the configuration of the manager to the equivalent of passing all keyword arguments as
False, except those covered bymask_to_keep.
- copy_flags() Self[source]¶
Return an unentered instance with the same configuration as this that manages a different event loop.
- factory_reset() None[source]¶
Restore the default settings from the context (i.e., set the flags to
context.EVENT_LOOP_BASE_FLAGS).
- flags_eq(other: Self, /) bool[source]¶
- flags_eq(flags: int, /) bool
Return whether the configuration of this manager is the same as that of
other, regardless of their respective states.
- classmethod from_flags(flags: int, /) Self[source]¶
Construct an instance from
flags, a bitwise or of options (defaultcontext.EVENT_LOOP_BASE_FLAGS).
- constructor_args: Final = ('dont_release_loop_on_finalization', 'silent_on_finalize', 'dont_try_clear_tasks_on_reuse',...¶
A tuple of all keyword arguments accepted by the constructor in order of the offset corresponding to the flag in the flags representation.
- asyncutils.base.adisembowel[T](it: asyncutils._internal.types.SupportsPop[T], /) collections.abc.AsyncGenerator[T]¶
Asynchronously disembowel an iterable from the right using its pop method and yield its items from right to left.
- asyncutils.base.adisembowelleft[T](it: asyncutils._internal.types.SupportsPopLeft[T], /) collections.abc.AsyncGenerator[T]¶
Asynchronously disembowel an iterable from the left using its popleft method and yield its items from left to right.
- asyncutils.base.aenumerate[T](
- it: asyncutils._internal.types.SupportsIteration[T],
- start: int = ...,
- *,
- step: int = ...,
The async version of
enumerate, except it is not a class and additionally supports thestepparameter.
- asyncutils.base.aiter_to_gen[T, R](
- ait: collections.abc.AsyncGenerator[T, R],
- *,
- use_futures: bool = ...,
- loop: asyncio.AbstractEventLoop | None = ...,
- strict: bool = ...,
- asyncutils.base.aiter_to_gen(
- ait: collections.abc.AsyncIterable[T],
- *,
- use_futures: bool = ...,
- loop: asyncio.AbstractEventLoop | None = ...,
- strict: bool = ...,
- asyncutils.base.aiter_to_gen(
- ait: collections.abc.Iterable[T],
- *,
- use_futures: bool = ...,
- loop: asyncio.AbstractEventLoop | None = ...,
- strict: Literal[False] = ...,
- Convert an async iterable
aitto a sync generator.If the event loop is currently running anduse_futuresisFalse(defaultcontext.AITER_TO_GEN_DEFAULT_ALLOW_FUTURES), raiseRuntimeErrorto clarify thatconcurrent.futures.Futuremust be used in this case, one per item yielded,which is somewhat inefficient, but that can’t be helped.IfstrictisTrue(defaultcontext.AITER_TO_GEN_DEFAULT_STRICT), only async iterables are accepted.
- async asyncutils.base.collect[T](
- it: asyncutils._internal.types.SupportsIteration[T],
- n: int | None = ...,
- *,
- default: T | asyncutils._internal.types.RaiseType = ...,
- Return a list of the first
nitems in the (async) iterable, consuming it up to that point exactly.If there are less thannitems to collect, throwexceptions.ItemsExhaustedif default isconstants.RAISEand emit a debugmessage through the logger before padding the behind of the list with copies of the default if passed otherwise.See also
iters.basic_collect()a possibly slightly faster variant that doesn’t accept a default.
iters.to_list()the most barebones variant equivalent to the case when
nis not passed.
- async asyncutils.base.collect_into[T](
- out: collections.abc.MutableSequence[T],
- it: asyncutils._internal.types.SupportsIteration[T],
- n: int | None = ...,
- *,
- default: T | asyncutils._internal.types.RaiseType = ...,
- Extend a mutable sequence with the first
nitems in the (async) iterable, consuming it up to that point exactly.If there are less thannitems to collect, throwexceptions.ItemsExhaustedif default isconstants.RAISEand emit a debugmessage through the logger before padding the behind of the list with copies of the default if passed otherwise.
- asyncutils.base.drop[T](
- it: asyncutils._internal.types.SupportsIteration[T],
- n: int,
- *,
- raising: bool = ...,
Discard
nitems from the (async) iterable and yield the rest. If there are not enough items and raising is True, throwexceptions.ItemsExhausted.
- asyncutils.base.iter_to_agen[T, R](
- it: collections.abc.AsyncGenerator[T, R],
- sentinel: T = ...,
- *,
- use_existing_executor: bool = ...,
- create_executor: bool = ...,
- strict: Literal[False] = ...,
- asyncutils.base.iter_to_agen(
- it: collections.abc.AsyncIterable[T],
- sentinel: T = ...,
- *,
- use_existing_executor: bool = ...,
- create_executor: bool = ...,
- strict: Literal[False] = ...,
- asyncutils.base.iter_to_agen(
- it: collections.abc.Iterable[T],
- *,
- use_existing_executor: bool = ...,
- create_executor: bool = ...,
- strict: bool = ...,
- asyncutils.base.iter_to_agen(
- it: collections.abc.Iterable[T],
- sentinel: T,
- *,
- use_existing_executor: bool = ...,
- create_executor: bool = ...,
- strict: bool = ...,
- Convert the (async) iterable
itto an async generator as non-blockingly as possible.Ifitis an async generator andsentinelis not passed, it is returned as is.Values sent to the return async generator will be passed through to the original.The async generator will stop when it encounters an item identical tosentinel.Whenuse_existing_executor=Trueis passed (defaultcontext.ITER_TO_AGEN_DEFAULT_USE_EXISTING_EXECUTOR), the function will attempt to usean existing executor as created by previous calls specifyingcreate_executor=True(defaultcontext.ITER_TO_AGEN_DEFAULT_MAY_CREATE_EXECUTOR)to advance the iterable, and fall back to blocking the event loop every step without an executor.IfstrictisTrue(defaultcontext.ITER_TO_AGEN_DEFAULT_STRICT), only sync iterables are accepted.
- async asyncutils.base.safe_cancel_batch[T](
- batch: asyncutils._internal.types.SupportsIteration[asyncio.Future[T]],
- /,
- *,
- callback: collections.abc.Callable[[T | BaseException], object] | None = ...,
- disembowel: Literal[False] = ...,
- raising: bool = ...,
- async asyncutils.base.safe_cancel_batch(
- batch: asyncutils._internal.types.SupportsPop[asyncio.Future[T]],
- /,
- *,
- callback: collections.abc.Callable[[T | BaseException], object] | None = ...,
- disembowel: Literal[True],
- raising: bool = ...,
- Cancel an (async) iterable of futures, waiting for the cancellations to complete asynchronously.The batch cancellation itself can be reliably cancelled.Afterwards, if
disembowelisTrue, clear the iterable using itspop()method repeatedly, falling back toclear().The callback is called on each result or exception of the futures afterCancelledErrorwas thrown into them concurrently.IfraisingisTrue, all calls of the callback that themselves threw exceptions are collected into aBaseExceptionGroup,which is then raised.
- async asyncutils.base.sleep_forever() NoReturn¶
A coroutine that only completes when an exception is thrown in. The exception is propagated.
- asyncutils.base.take[T](
- it: asyncutils._internal.types.SupportsIteration[T],
- n: int,
- *,
- default: T | asyncutils._internal.types.RaiseType,
- asyncutils.base.take(it: asyncutils._internal.types.SupportsIteration[T], n: int | None) collections.abc.AsyncGenerator[T]
Yield
nitems from the (async) iterable.Tip
To ensure there are exactly
nitems in the resultant async generator, pass a default value. In particular, passconstants.RAISEasdefaultto causeexceptions.ItemsExhaustedto be thrown in the case that there aren’t enough items. Ifnis None, take all items.
- asyncutils.base.dummy_task: asyncutils._internal.types.GeneratorCoroutine[Never, Any, Any]¶
An awaitable object that completes immediately. Also an exhausted generator.
Implementation detail
This is achieved by setting the
inspect.CO_ITERABLE_COROUTINEflag on the code of a generator function.
- asyncutils.base.yield_to_event_loop: collections.abc.Awaitable[None]¶
An awaitable and picklable singleton that yields control to the event loop for exactly one iteration when awaited, much like
asyncio.sleep(s)for non-positives.