asyncutils.processors

Processors for asynchronous tasks.

Classes

BatchProcessor

Call a processor with items batched to a certain size from different sources, with an optional time limit for batches.

BoundedBatchProcessor

Call a processor with items batched to a certain size from different sources with bounded concurrency.

Bulkhead

Module Contents

class asyncutils.processors.BatchProcessor[T](
processor: collections.abc.Callable[[list[T]], collections.abc.Awaitable[None]],
*,
maxsize: int = ...,
maxtime: float = ...,
timer: asyncutils._internal.types.Timer = ...,
)[source]

Bases: asyncutils.mixins.LoopContextMixin

Call a processor with items batched to a certain size from different sources, with an optional time limit for batches.

Caution

Use instances of this class as async context managers to ensure proper cleanup.

maxsize defaults to context.BATCH_PROCESSOR_DEFAULT_MAX_SIZE, maxtime to context.BATCH_PROCESSOR_DEFAULT_MAX_TIME, and timer time.monotonic().

async __cleanup__() None[source]
async __setup__() None[source]
async add(item: T) None[source]

Add an item to the current batch. If the batch is full, process it asynchronously before returning.

async flush() None[source]

Process the items in the buffer, even if the batch size is not reached.

property time_since_last_process: float

Return the time in seconds since the last batch was processed.

class asyncutils.processors.BoundedBatchProcessor[T, R](
processor: collections.abc.Callable[[list[T]], collections.abc.Awaitable[R]],
batch: int = ...,
max_concurrent: int = ...,
)[source]

Call a processor with items batched to a certain size from different sources with bounded concurrency.

batch defaults to context.BOUNDED_BATCH_PROCESSOR_DEFAULT_BATCH_SIZE and max_concurrent context.BOUNDED_BATCH_PROCESSOR_DEFAULT_MAX_CONCURRENT.

process(items: asyncutils._internal.types.SupportsIteration[T]) collections.abc.AsyncGenerator[R][source]

Call the processor on batches of items from the source and yield the results as they arrive.

class asyncutils.processors.Bulkhead(
max_concurrent: int,
*,
max_queue: int = ...,
max_rej: int = ...,
exc: asyncutils._internal.types.Exceptable = ...,
processor: collections.abc.Callable[[BaseException], collections.abc.Awaitable[None]] = ...,
)[source]

Bases: asyncutils.mixins.LoopContextMixin

Limit the number of concurrent executions of coroutines, with an optional queue for pending executions and an optional callback that
handles exceptions raised by the processor.

Caution

Use instances of this class as async context managers to ensure proper cleanup.

max_concurrent (required): maximum number of concurrent executions allowed
max_queue: maximum number of pending executions allowed in the queue; if the queue is full, new executions will be rejected until there is space in the queue. Non-positive value means no limit (there is currently no way to get a zero-capacity queue). Default context.BULKHEAD_DEFAULT_MAX_QUEUE.
max_rej: maximum number of rejections allowed before the bulkhead shuts down and rejects all new executions. Negative value means no limit. Default context.BULKHEAD_DEFAULT_MAX_REJ.
exc: the type of exceptions that the processor may raise and should be caught and passed to the processor callback. Default Exception.
async __cleanup__() None[source]
async execute(coro: collections.abc.Awaitable[Any]) None[source]

Queue a coroutine coro to be executed and execute a coroutine that may not be the same as coro. Bulkhead constraints are applied, and the return value is lost.

async shutdown(timeout: float | None = ...) list[collections.abc.Awaitable[Any]][source]

Shut down the bulkhead and return all incomplete tasks.

async wait_for_shutdown(timeout: float | None = ...) None[source]

Wait until the bulkhead enters the shutdown phase.

async wait_until_idle(timeout: float | None = ...) Literal[True][source]

Wait until no tasks are running.

property active_tasks: int

The number of currently running tasks.

property available_qslots: float

The number of available slots in the task queue. Usually an integer, but float('inf') if the queue is unbounded.

property available_slots: int

The number of slots available on the bulkhead.

property curr_qsize: int

The current size of the task queue.

property is_shutdown: bool

Whether the bulkhead is shutting down or has been shut down, possibly because too many executions were rejected.

property max_qsize: int

The maximum size of the task queue.

property rejected: int

The number of rejected executions so far.