asyncutils.pools¶
This module provides various pool implementations for concurrent execution and resource management in asynchronous contexts.
Classes¶
A pool implementation used to call sync functions concurrently in an async-first interface, managing event loop and threading resource shenanigans internally. |
|
A pool managing resources in a simple and intuitive lock interface, with support for health checking, auto-recycling and dynamic rescaling. |
Module Contents¶
- class asyncutils.pools.AdvancedPool(max_workers: int = ..., min_workers: int = ..., qsize: int = ..., scaling: bool = ..., kill_at_exit: bool = ...)[source]¶
Bases:
asyncutils.mixins.LoopContextMixinA pool implementation used to call sync functions concurrently in an async-first interface, managing event loop and threading resource shenanigans internally.
Caution
Use instances of this class as async context managers only.
All arguments are optional:
max_workerscontrols the maximum number of workers (threads) that can run concurrently. Defaults tocontext.ADVANCED_POOL_DEFAULT_MAX_WORKERS.min_workersdetermines the least number of threads there will be at any instance. Defaults tocontext.ADVANCED_POOL_DEFAULT_MIN_WORKERS.qsizesets the maximum number of pending tasks that can be queued. If not passed, there is no limit.scalingenables dynamic scaling of the pool based on workload. The default isTrue.kill_at_exitdetermines whether the shut down when the context manager exits should be immediate. DefaultFalse.
- __del__() None[source]¶
Shut down the pool synchronously with a timeout of 0.2 seconds if needed. To avoid this blocking up the GC process, shut down the pool explicitly by using it as an async context manager.
- async complete[T, **P](f: collections.abc.Callable[P, T], *a: P, **k: P) T[source]¶
- async complete(f: collections.abc.Callable[Ellipsis, T], *a: object, _priority_: int, **k: object) T
Wait for a sync function to complete its execution by the pool asynchronously and get its result.
- async doublestarmap[T](
- f: collections.abc.Callable[Ellipsis, T],
- it: asyncutils._internal.types.SupportsIteration[collections.abc.Mapping[str, Any]],
- /,
- priority: int = ...,
Like
map(), but the iterable should yield dicts that are unpacked as keyword arguments to the function.
- async join() list[int | BaseException][source]¶
Return a list containing the nunmber of tasks completed by each worker in a random order or an exception if a worker thread has been terminated by an unhandled exception.
- async map[R, T](
- f: collections.abc.Callable[[R], T],
- it: asyncutils._internal.types.SupportsIteration[R],
- /,
- *,
- priority: int = ...,
- strict: bool = ...,
- async map(
- f: collections.abc.Callable[[R, V], T],
- i1: asyncutils._internal.types.SupportsIteration[R],
- i2: asyncutils._internal.types.SupportsIteration[V],
- /,
- *,
- priority: int = ...,
- strict: bool = ...,
- async map(
- f: collections.abc.Callable[[R, V, U], T],
- i1: asyncutils._internal.types.SupportsIteration[R],
- i2: asyncutils._internal.types.SupportsIteration[V],
- i3: asyncutils._internal.types.SupportsIteration[U],
- /,
- *,
- priority: int = ...,
- strict: bool = ...,
- async map(
- f: collections.abc.Callable[[R, V, U, S], T],
- i1: asyncutils._internal.types.SupportsIteration[R],
- i2: asyncutils._internal.types.SupportsIteration[V],
- i3: asyncutils._internal.types.SupportsIteration[U],
- i4: asyncutils._internal.types.SupportsIteration[S],
- /,
- *,
- priority: int = ...,
- strict: bool = ...,
- async map(
- f: collections.abc.Callable[Ellipsis, T],
- /,
- *its: asyncutils._internal.types.SupportsIteration[Any],
- priority: int = ...,
- strict: bool = ...,
Apply the function
fto the items from the iterables in a concurrent manner, returning the results in a list. IfstrictisTrue, all iterables must have the same length.
- raise_for_shutdown() None[source]¶
Raise
PoolShutDownif the pool is shutting down or has been shut down.
- async resize(min_workers: int, max_workers: int) None[source]¶
Adjust the lower and upper limits of the pool size, and destroy or spawn threads accordingly.
- async shutdown(cancel_pending: bool = ..., idle_timeout: float | None = ..., total_timeout: float | None = ...) float[source]¶
Shut down the pool, waiting for all workers to finish their current tasks and exit. If
cancel_pendingisTrue, pending tasks that have not been picked up by workers will be cancelled immediately. Ifidle_timeoutis passed, it will limit the time waiting to join the task queue.
- async starmap[T, *Ts](
- f: collections.abc.Callable[[*Ts], T],
- it: asyncutils._internal.types.SupportsIteration[tuple[*Ts]],
- /,
- priority: int = ...,
Like
map(), but the iterables should yield tuples that are unpacked as arguments to the function.
- async starmap_with_kwds[T](
- f: collections.abc.Callable[Ellipsis, T],
- it: asyncutils._internal.types.SupportsIteration[tuple[asyncutils._internal.types.SupportsIteration[Any], collections.abc.Mapping[str, Any]]],
- /,
- priority: int = ...,
Like
map(), but the iterable should yield tuples of the form(args, kwargs), whereargsis an iterable of positional arguments andkwargsis a mapping of keyword arguments.
- async submit[T, **P](f: collections.abc.Callable[P, T], *a: P, **k: P) asyncio.Future[T][source]¶
- async submit(f: collections.abc.Callable[Ellipsis, T], *a: object, _priority_: int, **k: object) asyncio.Future[T]
Schedule a sync function to be executed in a worker thread, waiting asynchronously until there is enough space in the internal queue from which workers fetch tasks if necessary, and get an async future to access its result.
- submit_nowait[T, **P](f: collections.abc.Callable[P, T], *a: P, **k: P) asyncio.Future[T][source]¶
- submit_nowait(f: collections.abc.Callable[Ellipsis, T], *a: object, _priority_: int, **k: object) asyncio.Future[T]
Schedule a sync function to be executed in a worker thread, raising
asyncio.QueueFullif there is not enough space in the internal queue from which workers fetch tasks, and get an async future to access its result.
- async wait_for_slot(timeout: float | None = ...) float[source]¶
Wait until there is a slot in the internal queue for pending tasks, and return the time spent waiting. If
timeoutis passed, it will limit the waiting time.
- property full: bool¶
Return whether the internal queue for pending tasks is full, such that
wait_for_slot()will block.
- class asyncutils.pools.ConnectionPool[T, **P](
- factory: collections.abc.Callable[P, T],
- maxsize: int = ...,
- minsize: int = ...,
- maxlife: float = ...,
- healthchecker: collections.abc.Callable[[T], bool] | None = ...,
- cleaner: collections.abc.Callable[[T], None] | None = ...,
Bases:
asyncutils._internal.helpers.LoopMixinBaseA pool managing resources in a simple and intuitive lock interface, with support for health checking, auto-recycling and dynamic rescaling.
Caution
Use instances of this class as async context managers only.
All arguments except
factory, which should be a callable returning a connection, are optional:maxsizecontrols the maximum number of connections that can be created. Defaults tocontext.CONNECTION_POOL_DEFAULT_MAX_SIZE.minsizedetermines the least number of connections that will be maintained at any instance. Defaults tocontext.CONNECTION_POOL_DEFAULT_MIN_SIZE.maxlifesets the maximum lifetime of a connection in seconds, after which it will be recycled. Defaults tocontext.CONNECTION_POOL_DEFAULT_MAX_LIFE.healthcheckeris a function that takes a connection and returns whether it is healthy. If not passed, connections are assumed to always be healthy.cleaneris a function that takes a connection and performs necessary cleanup before it is recycled. If not passed, no cleanup will be performed.
- async __aexit__(exc_typ: asyncutils._internal.types.ExcType, exc_val: BaseException, exc_tb: types.TracebackType, /) None[source]¶
- async __aexit__(exc_typ: None, exc_val: None, exc_tb: None, /) None
Stop the pool, closing all connections.
- async acquire(*a: P.args, **k: P.kwargs) T[source]¶
Acquire a connection from the pool, waiting asynchronously until one is available if necessary. The arguments will be passed to the connection factory when creating new connections if needed.
- async create_connection(*a: P.args, **k: P.kwargs) T[source]¶
- async create_connection(*a: object, _executor_: asyncutils.config.Executor | None, **k: object) T
- async release(conn: T, /, *a: P.args, **k: P.kwargs) None[source]¶
Release a connection back to the pool after use. The arguments will be passed to the connection factory when creating new connections if needed, and can be used by the cleaner for cleanup if necessary.
- async start(
- akgen: asyncutils._internal.types.SupportsIteration[tuple[collections.abc.Iterable[Any], collections.abc.Mapping[str, Any]]] | None = ...,
- executor: asyncutils.config.Executor | None = ...,
Spawn the connections using the arguments from the parameter generator. The factory is run in the executor passed to make it async.