asyncutils.processors
Processors for asynchronous tasks.
Classes
BatchProcessor | Call a processor with items batched to a certain size from different sources, with an optional time limit for batches.
BoundedBatchProcessor | Call a processor with items batched to a certain size from different sources with bounded concurrency.
Module Contents
- class asyncutils.processors.BatchProcessor[T](
- processor: collections.abc.Callable[[list[T]], collections.abc.Awaitable[None]],
- *,
- maxsize: int = ...,
- maxtime: float = ...,
- timer: asyncutils._internal.types.Timer = ...,
- )
Bases: asyncutils.mixins.LoopContextMixin
Call a processor with items batched to a certain size from different sources, with an optional time limit for batches.
Caution
Use instances of this class as async context managers to ensure proper cleanup.
maxsize defaults to context.BATCH_PROCESSOR_DEFAULT_MAX_SIZE, maxtime to context.BATCH_PROCESSOR_DEFAULT_MAX_TIME, and timer to time.monotonic().
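The size-or-time batching idea described above can be sketched with plain asyncio. This is a minimal illustration, not the implementation of BatchProcessor: the class MiniBatcher and its add and flush methods are hypothetical names invented for this sketch, and only the processor, maxsize, and maxtime parameters mirror the documented signature.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


class MiniBatcher:
    """Hypothetical stand-in: flush after `maxsize` items or `maxtime` seconds."""

    def __init__(self, processor: Callable[[list[Any]], Awaitable[None]],
                 *, maxsize: int = 3, maxtime: float = 0.05) -> None:
        self._processor = processor
        self._maxsize = maxsize
        self._maxtime = maxtime
        self._buffer = []
        self._timer_task = None  # pending time-limit flush, if any

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        await self.flush()  # cleanup: process whatever is still buffered

    async def add(self, item):
        self._buffer.append(item)
        if len(self._buffer) >= self._maxsize:
            await self.flush()  # size limit reached
        elif self._timer_task is None:
            # First item of a new batch starts the time limit.
            self._timer_task = asyncio.create_task(self._flush_later())

    async def _flush_later(self):
        await asyncio.sleep(self._maxtime)
        self._timer_task = None
        await self._flush_buffer()

    async def flush(self):
        if self._timer_task is not None:
            self._timer_task.cancel()
            self._timer_task = None
        await self._flush_buffer()

    async def _flush_buffer(self):
        if self._buffer:
            batch, self._buffer = self._buffer, []
            await self._processor(batch)


async def demo():
    batches = []

    async def record(batch):
        batches.append(list(batch))

    async with MiniBatcher(record, maxsize=3, maxtime=0.05) as batcher:
        for i in range(4):
            await batcher.add(i)   # items 0..2 are flushed by size
        await asyncio.sleep(0.2)   # item 3 is flushed by the time limit
        await batcher.add(4)       # item 4 is flushed on context exit
    return batches


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The async context manager in the sketch shows why the caution above matters: without the exit hook, a partially filled batch would never reach the processor.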
- class asyncutils.processors.BoundedBatchProcessor[T, R](
- processor: collections.abc.Callable[[list[T]], collections.abc.Awaitable[R]],
- batch: int = ...,
- max_concurrent: int = ...,
- )
Call a processor with items batched to a certain size from different sources with bounded concurrency.
batch defaults to context.BOUNDED_BATCH_PROCESSOR_DEFAULT_BATCH_SIZE and max_concurrent to context.BOUNDED_BATCH_PROCESSOR_DEFAULT_MAX_CONCURRENT.
- process(items: asyncutils._internal.types.SupportsIteration[T]) -> collections.abc.AsyncGenerator[R]
Call the processor on batches of items from the source and yield the results as they arrive.
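The behaviour of process, batches handled with bounded concurrency and results yielded as they arrive, can be approximated with an asyncio.Semaphore and asyncio.as_completed. The function process_in_batches below is a hypothetical helper written for illustration and is not part of asyncutils; it also assumes a synchronous iterable, whereas SupportsIteration may accept more.

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable
from itertools import islice
from typing import Any


async def process_in_batches(
    processor: Callable[[list[Any]], Awaitable[Any]],
    items: Iterable[Any],
    *,
    batch: int = 2,
    max_concurrent: int = 2,
) -> AsyncIterator[Any]:
    """Hypothetical sketch: yield one result per batch, in completion order."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(chunk):
        # At most `max_concurrent` processor calls hold the semaphore at once.
        async with semaphore:
            return await processor(chunk)

    iterator = iter(items)
    tasks = []
    while chunk := list(islice(iterator, batch)):
        tasks.append(asyncio.create_task(run_one(chunk)))
    try:
        for finished in asyncio.as_completed(tasks):
            yield await finished
    finally:
        # If the consumer stops early, do not leave batches running.
        for task in tasks:
            task.cancel()


async def demo():
    running = 0
    peak = 0

    async def total(chunk):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)  # track how many calls overlap
        await asyncio.sleep(0.01)
        running -= 1
        return sum(chunk)

    results = [
        result
        async for result in process_in_batches(
            total, range(10), batch=3, max_concurrent=2
        )
    ]
    return sorted(results), peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because results are yielded in completion order, the demo sorts them before returning; callers that need input order would have to carry an index alongside each batch.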
- class asyncutils.processors.Bulkhead(
- max_concurrent: int,
- *,
- max_queue: int = ...,
- max_rej: int = ...,
- exc: asyncutils._internal.types.Exceptable = ...,
- processor: collections.abc.Callable[[BaseException], collections.abc.Awaitable[None]] = ...,
- )
Bases: asyncutils.mixins.LoopContextMixin
Limit the number of concurrent executions of coroutines, with an optional queue for pending executions and an optional callback that handles exceptions raised by the processor.
Caution
Use instances of this class as async context managers to ensure proper cleanup.
- max_concurrent (required): maximum number of concurrent executions allowed.
- max_queue: maximum number of pending executions allowed in the queue; if the queue is full, new executions are rejected until there is space in the queue. A non-positive value means no limit (there is currently no way to get a zero-capacity queue). Default: context.BULKHEAD_DEFAULT_MAX_QUEUE.
- max_rej: maximum number of rejections allowed before the bulkhead shuts down and rejects all new executions. A negative value means no limit. Default: context.BULKHEAD_DEFAULT_MAX_REJ.
- exc: the type of exceptions that the processor may raise and that should be caught and passed to the processor callback. Default: Exception.
- async execute(coro: collections.abc.Awaitable[Any]) -> None
Queue a coroutine coro to be executed, then execute a coroutine that may not be the same as coro. Bulkhead constraints are applied, and the return value is discarded.
- async shutdown(timeout: float | None = ...) -> list[collections.abc.Awaitable[Any]]
Shut down the bulkhead and return all incomplete tasks.
- async wait_for_shutdown(timeout: float | None = ...) -> None
Wait until the bulkhead enters the shutdown phase.
- async wait_until_idle(timeout: float | None = ...) -> Literal[True]
Wait until no tasks are running.
- property available_qslots: float
The number of available slots in the task queue. Usually an integer, but float('inf') if the queue is unbounded.
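The bulkhead pattern described for this class (a concurrency limit, a bounded waiting queue, rejection when the queue is full, and an error callback) can be sketched as follows. Everything here is an assumption made for illustration: MiniBulkhead, BulkheadFull, and on_error are hypothetical names, rejection is modelled by raising an exception (the page does not say how Bulkhead signals it), and max_rej and shutdown handling are left out.

```python
import asyncio


class BulkheadFull(Exception):
    """Hypothetical rejection error used only in this sketch."""


class MiniBulkhead:
    """Hypothetical stand-in for the pattern, not the asyncutils class."""

    def __init__(self, max_concurrent, *, max_queue=0, on_error=None):
        self._slots = asyncio.Semaphore(max_concurrent)
        self._max_queue = max_queue  # non-positive means unbounded
        self._waiting = 0
        self._on_error = on_error

    @property
    def available_qslots(self):
        if self._max_queue <= 0:
            return float("inf")
        return self._max_queue - self._waiting

    async def execute(self, coro):
        # Reject only when every slot is busy and the queue has no room.
        if self._slots.locked() and self.available_qslots <= 0:
            coro.close()  # assumes a coroutine object; avoids a warning
            raise BulkheadFull("queue is full")
        self._waiting += 1
        try:
            await self._slots.acquire()
        finally:
            self._waiting -= 1
        try:
            await coro  # the return value is dropped, as documented above
        except Exception as error:
            if self._on_error is None:
                raise
            await self._on_error(error)
        finally:
            self._slots.release()


async def demo():
    errors = []

    async def on_error(error):
        errors.append(repr(error))

    async def work(delay, fail=False):
        await asyncio.sleep(delay)
        if fail:
            raise ValueError("boom")

    bulkhead = MiniBulkhead(1, max_queue=1, on_error=on_error)
    first = asyncio.create_task(bulkhead.execute(work(0.05)))
    second = asyncio.create_task(bulkhead.execute(work(0.0, fail=True)))
    await asyncio.sleep(0.01)  # first is running, second is queued
    free_slots = bulkhead.available_qslots
    rejected = False
    try:
        await bulkhead.execute(work(0.0))
    except BulkheadFull:
        rejected = True
    await asyncio.gather(first, second)
    return free_slots, rejected, errors


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Unlike the documented execute, which may run a coroutine other than the one passed in, this sketch always awaits its own argument once a slot is free.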