asyncutils.queues¶
Non-inheriting extensions of asyncio.Queue with more methods and password protection, and a PotentQueueBase ABC.
Attributes¶
Instance of |
|
Instance of |
|
Instance of |
|
Instance of |
Classes¶
A base class for queues with much more methods, async- and sync-compatible. |
|
A base class for queues with much more methods, async- and sync-compatible. |
|
A priority queue, where the priority of each item is determined by comparing it to other items, meaning each item should at least implement |
|
A base class for queues with much more methods, async- and sync-compatible. |
|
Functions¶
Module Contents¶
- class asyncutils.queues.PotentQueueBase[T][source]¶
Bases:
asyncio.Queue[T],asyncutils._internal.helpers.LoopMixinBase,abc.ABCA base class for queues with much more methods, async- and sync-compatible.
- __aiter__() collections.abc.AsyncGenerator[T]¶
Equivalent to
drain_persistent().
- __iter__() collections.abc.Generator[T]¶
Equivalent to
drain_until_empty().
- abstractmethod _get() T[source]¶
Get an item from the queue if not empty; called in
get()andget_nowait().
- abstractmethod _init(maxsize: int) None[source]¶
Initialize the queue given
maxsize; called in__init__().
- abstractmethod _put(item: T) None[source]¶
Put an item into the queue if not empty; called in
put()andput_nowait().
- drain_persistent(max_items: int | None = ..., timeout: float | None = ...) collections.abc.AsyncGenerator[T][source]¶
An async generator that gets items from the queue once available and yields them.
- drain_retlist(max_items: int | None = ...) list[T][source]¶
Empty the queue and return a list of the items within.
- drain_until_empty(max_items: int | None = ...) collections.abc.Generator[T][source]¶
A synchronous generator that gets items from the queue until it is emptied and returns.
- enumerate(*, lifo: Literal[False] = ...) SmartQueue[tuple[int, T]][source]¶
- enumerate(*, lifo: Literal[True]) SmartLifoQueue[tuple[int, T]]
Return a queue containing the items from enumerate applied on this queue and empty it in the process.
- enumerate_nowait(start: int = ..., *, step: int = ...) collections.abc.Generator[tuple[int, T]][source]¶
Do the equivalent of zipping
itertools.count(start, step)with the items of the queue. When the returned generator is advanced and the queue is empty at that moment, the generator stops entirely.
- async extend(it: asyncutils._internal.types.SupportsIteration[T], timeout: float | None = ...) None[source]¶
Add the items from
itinto the queue withintimeout.
- filter(pred: collections.abc.Callable[[T], bool] = ..., *, lifo: Literal[False] = ...) SmartQueue[T][source]¶
- filter(pred: collections.abc.Callable[[T], bool] = ..., *, lifo: Literal[True]) SmartLifoQueue[T]
Return a new queue from which getters can get the items in this queue that satisfy the predicate; items remaining in the original queue did not satisfy the predicate.
- filter_nowait(pred: collections.abc.Callable[[T], bool] = ..., /) tuple[list[T], int][source]¶
Filter items in the queue by a predicate and return a list of removed items and an integer; the items in the returned list after the index corresponding to that integer were items rejected from the queue due to the queue being full.
- map[R](
- f: collections.abc.Callable[[T], collections.abc.Awaitable[R]],
- stop_when: asyncio.Future[None] | None = ...,
- *,
- lifo: Literal[False] = ...,
- map(
- f: collections.abc.Callable[[T], collections.abc.Awaitable[R]],
- stop_when: asyncio.Future[None] | None = ...,
- *,
- lifo: Literal[True],
Return a queue that contains items from this queue with the function applied on each of them, emptying this queue in the process (transformation analogous to
map).
- poppush_nowait(item: T, raising: bool = ...) T[source]¶
Pop an item from the queue and push into the other end immediately.
- push(item: T) bool[source]¶
Put an item into the queue immediately, popping if necessary; returns success.
- pushpop_nowait(item: T, raising: bool = ...) T[source]¶
Push an item into the queue and pop from the other end immediately.
- shutdown(immediate: bool = ...) None[source]¶
Shut down the queue. If
immediateisTrue, pending gets raise immediately even if the queue is not empty.
- async smart_get(*, timeout: float | None = ..., default: T = ...) T[source]¶
Call
get_nowait()if an item is immediately available, waiting for a item withtimeoutotherwise; if the timeout expires anddefaultis provided, return it.
- async smart_put(item: T, *, timeout: float | None = ..., raising: bool = ...) bool | None[source]¶
Call
put_nowait()if a slot is immediately available, waiting for a slot withtimeoutotherwise; if the timeout expires andraisingis True, throwTimeoutError.
- starmap[R, *Ts](
- f: collections.abc.Callable[[*Ts], collections.abc.Awaitable[R]],
- stop_when: asyncio.Future[None] | None = ...,
- *,
- lifo: Literal[False] = ...,
- starmap(
- f: collections.abc.Callable[[*Ts], collections.abc.Awaitable[R]],
- stop_when: asyncio.Future[None] | None = ...,
- *,
- lifo: Literal[True],
Return a queue that contains items from this queue with the function applied on each of them starred, emptying this queue in the process (transformation analogous to
itertools.starmap).
- transaction() contextlib.AbstractContextManager[Self, None][source]¶
- Return an async context manager which begins a transaction on entry.If an error occurs within the context, the original items in the queue are restored and the error reraised, unless the error is criticaland deemed to require immediate exit. Otherwise, the transaction completes successfully and changes are committed on exit.
- property can_get_now: bool¶
Whether items can be get from the queue without blocking at this instant.
- property can_put_now: bool¶
Whether items can be put into the queue without blocking at this instant.
- class asyncutils.queues.SmartLifoQueue[T][source]¶
Bases:
PotentQueueBase[T]A base class for queues with much more methods, async- and sync-compatible.
- _put(item: T) None[source]¶
Put an item into the queue if not empty; called in
put()andput_nowait().
- class asyncutils.queues.SmartPriorityQueue[T](maxsize: int = ..., *, init_items: asyncutils._internal.types.SupportsIteration[T])[source]¶
- class asyncutils.queues.SmartPriorityQueue(maxsize: int = ...)
Bases:
PotentQueueBase[T]A priority queue, where the priority of each item is determined by comparing it to other items, meaning each item should at least implement
__lt__().- _put(item: T) None[source]¶
Put an item into the queue if not empty; called in
put()andput_nowait().
- peek() T[source]¶
Look at the item that would be returned by
get()orget_nowait()without actually removing it from the queue.
- async start(maxsize: int, init_items: asyncutils._internal.types.SupportsIteration[T]) None[source]¶
- class asyncutils.queues.SmartQueue[T][source]¶
Bases:
PotentQueueBase[T]A base class for queues with much more methods, async- and sync-compatible.
- _put(item: T) None[source]¶
Put an item into the queue if not empty; called in
put()andput_nowait().
- class asyncutils.queues.UserPriorityQueue[T](
- maxsize: int = ...,
- *,
- init_priority: int = ...,
- init_items: asyncutils._internal.types.SupportsIteration[T],
- class asyncutils.queues.UserPriorityQueue(maxsize: int = ..., *, init_priority: int = ...)
Bases:
SmartPriorityQueue[tuple[int,int,T]]A priority queue, where you put in items with an integer priority and the items are retrieved in ascending order of priority, with earlieritems taking precedence in case of ties.Theput()andput_nowait()methods of this class take an additionalpriorityparameter, representing the priority of the item.
- asyncutils.queues.password_queue[T, R](
- password_put: R,
- *,
- maxsize: int = ...,
- protect_get: Literal[False] = ...,
- protect_put: Literal[True] = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- puttyp: type[R] = ...,
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[False] = ...,
- protect_put: Literal[True] = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- put_from: str = ...,
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[False] = ...,
- protect_put: Literal[True] = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- put_from: str = ...,
- puttyp: type[R],
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- password_get: R,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[False],
- can_change_get: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- gettyp: type[R] = ...,
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[False],
- can_change_get: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- gettyp: type[R],
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[False],
- can_change_get: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- password_put: V,
- password_get: R,
- maxsize: int = ...,
- *,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- gettyp: type[R] = ...,
- puttyp: type[V] = ...,
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- password_put: V,
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- gettyp: type[R],
- puttyp: type[V] = ...,
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- password_put: V,
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- puttyp: type[V] = ...,
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- password_get: R,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- put_from: str = ...,
- gettyp: type[R] = ...,
- puttyp: type[V],
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- password_get: R,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- put_from: str = ...,
- gettyp: type[R] = ...,
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- put_from: str = ...,
- gettyp: type[R],
- puttyp: type[V],
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- put_from: str = ...,
- gettyp: type[R],
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- put_from: str = ...,
- puttyp: type[V],
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- put_from: str = ...,
- init_items: asyncutils._internal.types.SupportsIteration[T],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- password_put: R,
- *,
- maxsize: int = ...,
- protect_get: Literal[False] = ...,
- protect_put: Literal[True] = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- puttyp: type[R] = ...,
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[False] = ...,
- protect_put: Literal[True] = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- put_from: str = ...,
- puttyp: type[R],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[False] = ...,
- protect_put: Literal[True] = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- put_from: str = ...,
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- password_get: R,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[False],
- can_change_get: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- gettyp: type[R] = ...,
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[False],
- can_change_get: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- gettyp: type[R],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[False],
- can_change_get: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- strict: bool = ...,
- asyncutils.queues.password_queue(
- password_put: V,
- password_get: R,
- maxsize: int = ...,
- *,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- gettyp: type[R] = ...,
- puttyp: type[V] = ...,
- strict: bool = ...,
- asyncutils.queues.password_queue(
- password_put: V,
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- gettyp: type[R],
- puttyp: type[V] = ...,
- strict: bool = ...,
- asyncutils.queues.password_queue(
- password_put: V,
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- puttyp: type[V] = ...,
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- password_get: R,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- put_from: str = ...,
- gettyp: type[R] = ...,
- puttyp: type[V],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- password_get: R,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- put_from: str = ...,
- gettyp: type[R] = ...,
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- put_from: str = ...,
- gettyp: type[R],
- puttyp: type[V],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- put_from: str = ...,
- gettyp: type[R],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- put_from: str = ...,
- puttyp: type[V],
- strict: bool = ...,
- asyncutils.queues.password_queue(
- *,
- maxsize: int = ...,
- protect_get: Literal[True],
- protect_put: Literal[True] = ...,
- can_change_get: bool = ...,
- can_change_put: bool = ...,
- priority: bool = ...,
- lifo: bool = ...,
- get_from: str = ...,
- put_from: str = ...,
- strict: bool = ...,
- Return a password-protected queue, the type of which does not inherit from
asyncio.Queuebut has the same interface, withmaximum sizemaxsize.priorityandlifoparameters determine if the queue is a priority queue and last-in-first-out.Ifprotect_getisTrue, get and get_nowait will require a password, specified bypassword_getor retrieved from a variable in thecaller’s scope with nameget_from(default :const`context.PASSWORD_QUEUE_DEFAULT_GET_FROM`).Ifprotect_putisTrue, put and put_nowait will require a password, specified bypassword_putor retrieved from a variable in thecaller’s scope with nameput_from(default :const`context.PASSWORD_QUEUE_DEFAULT_PUT_FROM`).Ifinit_itemsis specified, the items in that (async) iterable will be arranged to enter the queue eventually.Danger
This function is meant not for cryptographic purposes, because no hashing of the password is performed! Attackers may obtain sensitive information, namely the passwords used by the queue, from the memory address of the returned object alone, or worse still, access and modify the internal stack/queue storing the items directly.
Note
The excessive amount of overloads here cannot be helped due to accurate typing needs. When we drop support for Python 3.12, we will use default values in the type parameters here to cut this number in half.
Note
The overloads do not cover the technically valid but useless case with both
protect_getandprotect_putbeingFalse.Note
Little type validation for the argument combinations is done at runtime; it is hoped that type checkers will catch most misuses.
- asyncutils.queues.ignore_qempty: Final[asyncutils.exceptions.IgnoreErrors]¶
Instance of
IgnoreErrorsthat suppressesQueueShutDownandQueueEmpty.
- asyncutils.queues.ignore_qerrs: Final[asyncutils.exceptions.IgnoreErrors]¶
Instance of
IgnoreErrorsthat suppresses all asyncio queue-related errors.
- asyncutils.queues.ignore_qfull: Final[asyncutils.exceptions.IgnoreErrors]¶
Instance of
IgnoreErrorsthat suppressesQueueShutDownandQueueFull.
- asyncutils.queues.ignore_qshutdown: Final[asyncutils.exceptions.IgnoreErrors]¶
Instance of
IgnoreErrorsthat suppressesQueueShutDown.