asyncutils.pools

This module provides various pool implementations for concurrent execution and resource management in asynchronous contexts.

Classes

AdvancedPool

A pool implementation used to call sync functions concurrently in an async-first interface, managing event loop and threading resource shenanigans internally.

ConnectionPool

A pool managing resources in a simple and intuitive lock interface, with support for health checking, auto-recycling and dynamic rescaling.

Module Contents

class asyncutils.pools.AdvancedPool(max_workers: int = ..., min_workers: int = ..., qsize: int = ..., scaling: bool = ..., kill_at_exit: bool = ...)[source]

Bases: asyncutils.mixins.LoopContextMixin

A pool implementation used to call sync functions concurrently in an async-first interface, managing event loop and threading resource shenanigans internally.

Caution

Use instances of this class as async context managers only.

All arguments are optional:

  • max_workers controls the maximum number of workers (threads) that can run concurrently. Defaults to context.ADVANCED_POOL_DEFAULT_MAX_WORKERS.

  • min_workers determines the least number of threads there will be at any instance. Defaults to context.ADVANCED_POOL_DEFAULT_MIN_WORKERS.

  • qsize sets the maximum number of pending tasks that can be queued. If not passed, there is no limit.

  • scaling enables dynamic scaling of the pool based on workload. The default is True.

  • kill_at_exit determines whether the shut down when the context manager exits should be immediate. Default False.

async __cleanup__() None[source]
__del__() None[source]

Shut down the pool synchronously with a timeout of 0.2 seconds if needed. To avoid this blocking up the GC process, shut down the pool explicitly by using it as an async context manager.

async complete[T, **P](f: collections.abc.Callable[P, T], *a: P, **k: P) T[source]
async complete(f: collections.abc.Callable[Ellipsis, T], *a: object, _priority_: int, **k: object) T

Wait for a sync function to complete its execution by the pool asynchronously and get its result.

async doublestarmap[T](
f: collections.abc.Callable[Ellipsis, T],
it: asyncutils._internal.types.SupportsIteration[collections.abc.Mapping[str, Any]],
/,
priority: int = ...,
) list[T][source]

Like map(), but the iterable should yield dicts that are unpacked as keyword arguments to the function.

async drain() None[source]

Wait until all pending tasks have been completed.

async join() list[int | BaseException][source]

Return a list containing the nunmber of tasks completed by each worker in a random order or an exception if a worker thread has been terminated by an unhandled exception.

async map[R, T](
f: collections.abc.Callable[[R], T],
it: asyncutils._internal.types.SupportsIteration[R],
/,
*,
priority: int = ...,
strict: bool = ...,
) list[T][source]
async map(
f: collections.abc.Callable[[R, V], T],
i1: asyncutils._internal.types.SupportsIteration[R],
i2: asyncutils._internal.types.SupportsIteration[V],
/,
*,
priority: int = ...,
strict: bool = ...,
) list[T]
async map(
f: collections.abc.Callable[[R, V, U], T],
i1: asyncutils._internal.types.SupportsIteration[R],
i2: asyncutils._internal.types.SupportsIteration[V],
i3: asyncutils._internal.types.SupportsIteration[U],
/,
*,
priority: int = ...,
strict: bool = ...,
) list[T]
async map(
f: collections.abc.Callable[[R, V, U, S], T],
i1: asyncutils._internal.types.SupportsIteration[R],
i2: asyncutils._internal.types.SupportsIteration[V],
i3: asyncutils._internal.types.SupportsIteration[U],
i4: asyncutils._internal.types.SupportsIteration[S],
/,
*,
priority: int = ...,
strict: bool = ...,
) list[T]
async map(
f: collections.abc.Callable[Ellipsis, T],
/,
*its: asyncutils._internal.types.SupportsIteration[Any],
priority: int = ...,
strict: bool = ...,
) list[T]

Apply the function f to the items from the iterables in a concurrent manner, returning the results in a list. If strict is True, all iterables must have the same length.

raise_for_shutdown() None[source]

Raise PoolShutDown if the pool is shutting down or has been shut down.

async resize(min_workers: int, max_workers: int) None[source]

Adjust the lower and upper limits of the pool size, and destroy or spawn threads accordingly.

async shutdown(cancel_pending: bool = ..., idle_timeout: float | None = ..., total_timeout: float | None = ...) float[source]

Shut down the pool, waiting for all workers to finish their current tasks and exit. If cancel_pending is True, pending tasks that have not been picked up by workers will be cancelled immediately. If idle_timeout is passed, it will limit the time waiting to join the task queue.

async starmap[T, *Ts](
f: collections.abc.Callable[[*Ts], T],
it: asyncutils._internal.types.SupportsIteration[tuple[*Ts]],
/,
priority: int = ...,
) list[T][source]

Like map(), but the iterables should yield tuples that are unpacked as arguments to the function.

async starmap_with_kwds[T](
f: collections.abc.Callable[Ellipsis, T],
it: asyncutils._internal.types.SupportsIteration[tuple[asyncutils._internal.types.SupportsIteration[Any], collections.abc.Mapping[str, Any]]],
/,
priority: int = ...,
) list[T][source]

Like map(), but the iterable should yield tuples of the form (args, kwargs), where args is an iterable of positional arguments and kwargs is a mapping of keyword arguments.

async submit[T, **P](f: collections.abc.Callable[P, T], *a: P, **k: P) asyncio.Future[T][source]
async submit(f: collections.abc.Callable[Ellipsis, T], *a: object, _priority_: int, **k: object) asyncio.Future[T]

Schedule a sync function to be executed in a worker thread, waiting asynchronously until there is enough space in the internal queue from which workers fetch tasks if necessary, and get an async future to access its result.

submit_nowait[T, **P](f: collections.abc.Callable[P, T], *a: P, **k: P) asyncio.Future[T][source]
submit_nowait(f: collections.abc.Callable[Ellipsis, T], *a: object, _priority_: int, **k: object) asyncio.Future[T]

Schedule a sync function to be executed in a worker thread, raising asyncio.QueueFull if there is not enough space in the internal queue from which workers fetch tasks, and get an async future to access its result.

async wait_for_slot(timeout: float | None = ...) float[source]

Wait until there is a slot in the internal queue for pending tasks, and return the time spent waiting. If timeout is passed, it will limit the waiting time.

property completed: int

Return the total number of tasks completed by the pool.

property empty: bool

Return whether the internal queue for pending tasks is empty.

property full: bool

Return whether the internal queue for pending tasks is full, such that wait_for_slot() will block.

property idle: bool

Return whether all workers are idle, i.e. not executing any tasks currently. This also implies empty is True.

property qsize: int

Return the current number of pending tasks in the internal queue.

property uptime: float

Return the time in seconds since the pool started.

class asyncutils.pools.ConnectionPool[T, **P](
factory: collections.abc.Callable[P, T],
maxsize: int = ...,
minsize: int = ...,
maxlife: float = ...,
healthchecker: collections.abc.Callable[[T], bool] | None = ...,
cleaner: collections.abc.Callable[[T], None] | None = ...,
)[source]

Bases: asyncutils._internal.helpers.LoopMixinBase

A pool managing resources in a simple and intuitive lock interface, with support for health checking, auto-recycling and dynamic rescaling.

Caution

Use instances of this class as async context managers only.

All arguments except factory, which should be a callable returning a connection, are optional:

  • maxsize controls the maximum number of connections that can be created. Defaults to context.CONNECTION_POOL_DEFAULT_MAX_SIZE.

  • minsize determines the least number of connections that will be maintained at any instance. Defaults to context.CONNECTION_POOL_DEFAULT_MIN_SIZE.

  • maxlife sets the maximum lifetime of a connection in seconds, after which it will be recycled. Defaults to context.CONNECTION_POOL_DEFAULT_MAX_LIFE.

  • healthchecker is a function that takes a connection and returns whether it is healthy. If not passed, connections are assumed to always be healthy.

  • cleaner is a function that takes a connection and performs necessary cleanup before it is recycled. If not passed, no cleanup will be performed.

async __aenter__() Self[source]

Start and return the pool.

async __aexit__(exc_typ: asyncutils._internal.types.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]
async __aexit__(exc_typ: None, exc_val: None, exc_tb: None, /) None

Stop the pool, closing all connections.

_is_healthy(conn: T, /) bool[source]
async _maintain() None[source]
async acquire(*a: P.args, **k: P.kwargs) T[source]

Acquire a connection from the pool, waiting asynchronously until one is available if necessary. The arguments will be passed to the connection factory when creating new connections if needed.

async create_connection(*a: P.args, **k: P.kwargs) T[source]
async create_connection(*a: object, _executor_: asyncutils.config.Executor | None, **k: object) T
async release(conn: T, /, *a: P.args, **k: P.kwargs) None[source]

Release a connection back to the pool after use. The arguments will be passed to the connection factory when creating new connections if needed, and can be used by the cleaner for cleanup if necessary.

async start(
akgen: asyncutils._internal.types.SupportsIteration[tuple[collections.abc.Iterable[Any], collections.abc.Mapping[str, Any]]] | None = ...,
executor: asyncutils.config.Executor | None = ...,
) None[source]

Spawn the connections using the arguments from the parameter generator. The factory is run in the executor passed to make it async.

async stop() None[source]

Close all connections and stop the pool.

property available: int

The number of connections currently available for acquisition.

property currsize: int

The current number of connections managed by the pool, including those in use and those available.

property in_use: int

The number of connections currently in use.

property maxlife: float

The maximum lifetime of a connection in seconds.

property maxsize: int

The maximum number of connections that can be created by the pool.

property minsize: int

The minimum number of connections that will be maintained by the pool.