asyncutils.channels

Bridges between asynchronous consumers/subscribers and producers/publishers.

Classes

EventBus

Observable

A class representing an observable stream of data, that observers can subscribe to and receive notifications from. Observers must be hashable!

Rendezvous

A rendezvous object, emulating Golang's unbuffered channels.

Module Contents

class asyncutils.channels.EventBus(
name: str = ...,
*,
handler: collections.abc.Callable[[BaseException], None] = ...,
max_concurrent: int = ...,
tracking_stats: bool = ...,
)[source]

Bases: asyncutils.mixins.LoopContextMixin

A class abstracting the communication between notable events and asynchronous callbacks (an async auditing system), that can optionally be hooked up to sys.audit.
Has extensive telemetry and middleware support, allowing data to be processed in a pipeline and eventually passed to subscribers. Subscribers must be hashable!
A subscriber is a function that will be called every time data is published, with the corresponding data passed in. Publishing is thus the action of triggering these subscribers.
Wildcard subscribers should take the event type as the first argument, and the event data as the next; while specific subscribers should take the event data as the only argument.
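
The two calling conventions above can be sketched with plain asyncio. This is not the EventBus implementation: the `dispatch` helper, the `WILDCARD` stand-in and the subscriber names are hypothetical and exist only to show which arguments each kind of subscriber receives.

```python
import asyncio

WILDCARD = object()  # hypothetical stand-in for EventBus.WILDCARD


async def dispatch(subscribers, event_type, data):
    """Call specific subscribers with data, wildcard ones with (event_type, data)."""
    calls = [sub(data) for sub in subscribers.get(event_type, ())]
    calls += [sub(event_type, data) for sub in subscribers.get(WILDCARD, ())]
    await asyncio.gather(*calls)


async def demo():
    seen = []

    async def on_login(data):  # specific subscriber: data only
        seen.append(("login", data))

    async def on_anything(event_type, data):  # wildcard: event type, then data
        seen.append((event_type, data))

    subscribers = {"login": [on_login], WILDCARD: [on_anything]}
    await dispatch(subscribers, "login", {"user": "ada"})
    await dispatch(subscribers, "logout", {"user": "ada"})
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The "login" publication reaches both subscribers, while "logout" reaches only the wildcard subscriber.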

Caution

Use instances only as context managers, to ensure proper setup and shutdown.

Changed in version 0.9.5: Support for repeated, unhashable middlewares and removal using an opaque cookie was added.

All arguments are optional:

  • name: The name of this event bus, which will appear in error messages.

  • handler: A function that takes an exception raised in a subscriber and handles it.

  • max_concurrent: The maximum number of concurrent callbacks; default context.EVENT_BUS_DEFAULT_MAX_CONCURRENT.

  • tracking_stats: Whether to track the number of publications made under each event type.

async __cleanup__() None[source]

async __setup__() None[source]

add_middleware(middleware: asyncutils._internal.types.Middleware) int[source]

Append a middleware to the back of the pipe of middlewares, and return a permanent cookie that can be passed to remove_middleware() to invalidate it.
The middleware must take the event type as the first argument and the associated data as the second.
If the middleware does not recognize the event type, it should simply return the data immediately.
There is no protection in place against malicious middlewares besides the user’s abstraction.
It is preferred that the middleware be a coroutine function. Each middleware should be extremely optimized, such as through C extensions, to avoid slowing down publishing.
When publishing occurs, the first middleware takes the initial data, does some processing asynchronously, and passes the modified data to the second middleware, and so forth. Insertion order is maintained. This may be different from the typical meaning of a ‘middleware’.
The output of the final middleware is broadcast to each subscriber concurrently. Subscribers cannot see the initial data.
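
The chaining described above can be sketched with plain asyncio. The `run_pipeline` helper and the two middlewares are hypothetical illustrations rather than the library's code; they only show data being threaded through the middlewares in insertion order before the broadcast.

```python
import asyncio


async def run_pipeline(middlewares, event_type, data):
    """Thread data through each middleware in insertion order."""
    for middleware in middlewares:
        data = await middleware(event_type, data)
    return data


async def strip_text(event_type, data):
    # Unrecognized event types are returned untouched, as recommended above.
    if event_type != "message":
        return data
    return data.strip()


async def upper_text(event_type, data):
    if event_type != "message":
        return data
    return data.upper()


async def demo():
    received = []

    async def subscriber(data):
        received.append(data)

    pipeline = [strip_text, upper_text]
    final = await run_pipeline(pipeline, "message", "  hello ")
    # Subscribers only ever see the output of the last middleware.
    await asyncio.gather(subscriber(final), subscriber(final))
    untouched = await run_pipeline(pipeline, "other", "  hello ")
    return received, untouched


if __name__ == "__main__":
    print(asyncio.run(demo()))
```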
add_temp_middleware(middleware: asyncutils._internal.types.Middleware, until: asyncio.Future[Any]) None[source]

Add a middleware that should take effect until the future until is done, after which the result of the future will be treated as the result of the middleware on removal. No cookie is returned in this case.

audit_context() asyncutils._internal.types.DualContextManager[None][source]

Start receiving publications from and sending publications to sys.audit() upon entry and stop on exit. Use as a context manager.

auditor(event: str, args: tuple[object, Ellipsis], /) None
The auditor of the event bus. You probably don’t want to call this directly.
Not an instance method at runtime, just a function as an attribute of the instance.
clear(event_type: str) _weakrefset.WeakSet[asyncutils._internal.types.SpecificSubscriber] | None[source]
clear(
event_type: asyncutils._internal.types.WildcardType,
) _weakrefset.WeakSet[asyncutils._internal.types.WildcardSubscriber] | None
clear() None

Remove all subscribers for the event type and return them. If event_type is not passed, remove all subscribers but, unlike clear_all(), keep the statistics.

clear_all() None[source]

Remove all subscribers and clear statistics.

clear_stats() None[source]

Clear the event publication statistics.

clear_wildcards() _weakrefset.WeakSet[asyncutils._internal.types.WildcardSubscriber] | None[source]

Equivalent to bus.clear(EventBus.WILDCARD).

event_names() set[str][source]

A set of the current event types.

event_stream(
event_type: str,
*,
timeout: float | None = ...,
item_timeout: float | None = ...,
bufsize: int = ...,
) collections.abc.AsyncGenerator[Any][source]
event_stream(
*,
timeout: float | None = ...,
item_timeout: float | None = ...,
bufsize: int = ...,
) collections.abc.AsyncGenerator[tuple[str, Any]]
Open an event stream for the specified event type, that is, an async generator from which consumers can receive events and the corresponding data as they occur.
If event_type is not passed, the stream will include the event type in the output.
timeout, item_timeout and bufsize default to context.EVENT_BUS_STREAM_DEFAULT_TIMEOUT, context.EVENT_BUS_STREAM_DEFAULT_ITEM_TIMEOUT and context.EVENT_BUS_STREAM_DEFAULT_BUFFER_SIZE respectively.
async feed_event(data: object, /, *, timeout: float | None = ...) None[source]
async feed_event(event_type: str, data: object, /, *, timeout: float | None = ...) None

Feed the data for an event into the event stream. The queue for the stream is created if necessary, so the event stream need not be active.

get_event_stats() collections.defaultdict[str, int][source]

Return a copy of the stats, mapping event type to number of published events.

async handle_exception(e: BaseException) None[source]

Asynchronously handle an exception according to the handler initialization parameter, which can be a sync or async function.

has_subscribers(event_type: str | asyncutils._internal.types.WildcardType) bool[source]

Whether the event type has any subscribers.

is_auditing() bool[source]

Whether the event bus is connected to sys.audit().

is_subscribed(subscriber: asyncutils._internal.types.SpecificSubscriber, event_type: str = ...) bool[source]
is_subscribed(
subscriber: asyncutils._internal.types.WildcardSubscriber,
event_type: asyncutils._internal.types.WildcardType = ...,
) bool

Whether the callback is subscribed for the event type, or subscribed for any event type if event_type is not passed.

static is_valid_event_type(event_type: object) TypeGuard[str | asyncutils._internal.types.WildcardType][source]

Whether the object is a valid event type (i.e. a string or wildcard).

async publish(
event_type: str,
data: object = ...,
*,
wait: bool = ...,
safe: bool = ...,
timeout: float | None = ...,
chaperone: collections.abc.Callable[[ExceptionGroup | Exception], object] | None = ...,
) None[source]
Publish an event, that is, some data attached to an event type, to the subscribers involved, with timeout timeout.
Each subscriber for that event type and wildcard subscribers will be triggered by the publication, receiving the data after processing by the middlewares in order.
If wait is False (default True), don’t wait for the publication to complete.
If safe is False (default True), don’t wrap callbacks with proper error handling.
chaperone, if passed, should be a function processing non-severe exceptions (instances of Exception and ExceptionGroup) in the callbacks. Otherwise, these exceptions and exception groups are flattened and collected into an ExceptionGroup and propagated; the caller should be prepared to handle that case.
raise_for_shutdown() None[source]

Throw an exception if the event bus is shutting down.

remove_middleware(cookie: int, *, result: object = ..., strict: bool = ...) Any[source]
Remove a middleware previously added via add_middleware() or add_temp_middleware(), and return its result. Runs in O(1) time.
If strict is True and the middleware was never added, throw ValueError.
If the middleware has an associated future (see add_temp_middleware()) and it is done, return its result. If an exception was set, reraise it.
Otherwise, set its result to result and return it.
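
One way to get constant-time removal by cookie while still allowing repeated or unhashable middlewares is an insertion-ordered dict keyed by a counter. The `MiddlewareRegistry` class below is a hypothetical sketch of that idea, not the structure EventBus actually uses.

```python
import itertools


class MiddlewareRegistry:
    """Insertion-ordered registry keyed by opaque integer cookies."""

    def __init__(self):
        self._next_cookie = itertools.count()
        self._entries = {}  # dicts preserve insertion order

    def add(self, middleware):
        # The middleware is a value, never a key, so it need not be hashable
        # and the same object can be registered more than once.
        cookie = next(self._next_cookie)
        self._entries[cookie] = middleware
        return cookie

    def remove(self, cookie, *, strict=False):
        try:
            return self._entries.pop(cookie)  # average O(1)
        except KeyError:
            if strict:
                raise ValueError(f"unknown middleware cookie: {cookie!r}") from None
            return None

    def pipeline(self):
        return list(self._entries.values())


class Unhashable:
    """A callable middleware that cannot be used as a dict key or set member."""

    __hash__ = None

    def __call__(self, event_type, data):
        return data
```

Removing one cookie leaves any other registration of the same middleware in place, and the remaining entries keep their relative order.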
async shutdown(immediate: bool = ..., *, timeout: float | None = ..., preserve_stats: bool = ...) None[source]
Gracefully shut down the event bus.
After the shutdown, publications fail fast and middlewares are cleared.
This waits for as many subscriber callbacks to complete as possible, within timeout seconds if specified.
If immediate is True, getters for the queue for the event stream will error immediately.
If preserve_stats is True, the event publication statistics will be saved and accessible with get_event_stats().
start_audit() None[source]

Connect the audit hook of the bus to sys.audit(), creating if necessary. Incurs overhead. Use with caution.

start_tracking() None[source]

Start tracking event publication statistics (number of publications under each event type).

stop_audit() None[source]

Disconnect the audit hook of the bus. Note that it is currently impossible to actually remove an audit hook, so this function just deactivates it.
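
Because sys.addaudithook() offers no way to remove a hook, deactivation has to be done with a flag checked inside the hook. The `SwitchableAuditHook` class below is a hypothetical sketch of that pattern; filtering on a "demo." prefix is an assumption made here to keep the example quiet and cheap.

```python
import sys


class SwitchableAuditHook:
    """Audit hook that is installed once and can be switched on and off."""

    def __init__(self):
        self.active = False
        self.events = []
        self._installed = False

    def _hook(self, event, args):
        # Runs for every audit event in the process, so keep it cheap.
        if self.active and event.startswith("demo."):
            self.events.append((event, args))

    def start(self):
        if not self._installed:
            sys.addaudithook(self._hook)  # permanent for this interpreter
            self._installed = True
        self.active = True

    def stop(self):
        # The hook stays registered; it simply ignores events from now on.
        self.active = False


def demo():
    hook = SwitchableAuditHook()
    hook.start()
    sys.audit("demo.ping", 1)
    hook.stop()
    sys.audit("demo.ping", 2)  # ignored while inactive
    hook.start()               # reuses the already installed hook
    sys.audit("demo.ping", 3)
    return hook.events
```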

stop_tracking(ret_stats: Literal[False] = ...) None[source]
stop_tracking(ret_stats: Literal[True]) collections.defaultdict[str, int]

Stop tracking event publication statistics. If ret_stats is True, return a dictionary of the stats up to that point, with keys corresponding to event types and values the number of publications.

subscribe[C: asyncutils._internal.types.SpecificSubscriber](subscriber: C, /, event_type: str) C[source]
subscribe(subscriber: C, /, event_type: asyncutils._internal.types.WildcardType = ...) C

Add a subscriber to the event bus under the specified event type (if unspecified, add as wildcard). Return the subscriber to allow usage as a decorator.

subscribe_to[C: asyncutils._internal.types.SpecificSubscriber](event_type: str) collections.abc.Callable[[C], C][source]
subscribe_to(event_type: asyncutils._internal.types.WildcardType) collections.abc.Callable[[C], C]

A decorator factory for functions to subscribe to this event bus under the specified event type.

async subscribe_until[T](
fut: asyncio.Future[T],
subscriber: asyncutils._internal.types.SpecificSubscriber,
event_type: str,
*,
till_permanent: float | None = ...,
) asyncio.Task[T][source]
async subscribe_until(
fut: asyncio.Future[T],
subscriber: asyncutils._internal.types.WildcardSubscriber,
event_type: asyncutils._internal.types.WildcardType = ...,
*,
till_permanent: float | None = ...,
) asyncio.Task[T]
Add the subscriber under the event type (as a wildcard if event_type is WILDCARD or not passed) and return a task.
The subscriber is removed once fut completes, and its result returned through the returned task.
After till_permanent seconds elapse (if passed), the task errors and the subscriber is left under that event type.
subscriber_count(event_type: str | asyncutils._internal.types.WildcardType) int[source]

The number of subscribers for that event type.

subscribers_for(event_type: str) _weakrefset.WeakSet[asyncutils._internal.types.SpecificSubscriber][source]
subscribers_for(
event_type: asyncutils._internal.types.WildcardType,
) _weakrefset.WeakSet[asyncutils._internal.types.WildcardSubscriber]

A weakref.WeakSet of subscribers for the event type.
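
A weakref.WeakSet does not keep its members alive. If the bus holds subscribers only weakly, as the return type suggests, a subscriber with no other strong reference may silently disappear. The sketch below shows the standard-library behaviour only and does not touch EventBus; the function names are hypothetical.

```python
import gc
import weakref


def demo():
    subscribers = weakref.WeakSet()

    def kept(data):
        return data

    def dropped(data):
        return data

    subscribers.add(kept)
    subscribers.add(dropped)

    del dropped   # drop the last strong reference to this subscriber
    gc.collect()  # make collection explicit on non-refcounting interpreters
    return len(subscribers), kept in subscribers
```

Keeping subscribers as module-level functions or as attributes of a long-lived object avoids this.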

sync_start_publish(
event_type: str,
data: object = ...,
*,
safe: bool = ...,
timeout: float | None = ...,
chaperone: collections.abc.Callable[[ExceptionGroup | Exception], object] | None = ...,
) None[source]

Begin a publication synchronously. Parameters are as in publish().

tracking_context(
stats_receiver: asyncio.Future[collections.abc.Mapping[str, int]] | None = ...,
) asyncutils._internal.types.DualContextManager[None][source]

Context manager, under which stats are tracked and finally sent to the stats_receiver future.

unsubscribe(subscriber: asyncutils._internal.types.SpecificSubscriber, /, event_type: str) bool[source]
unsubscribe(
subscriber: asyncutils._internal.types.WildcardSubscriber,
/,
event_type: asyncutils._internal.types.WildcardType = ...,
) bool

Remove a subscriber from the event bus under the event type (if unspecified, remove from wildcards) and return whether the removal occurred (i.e. the subscriber was initially present).

async wait_for_event(
event_type: str,
*,
timeout: bool | None = ...,
condition: collections.abc.Callable[[Any], object] = ...,
) asyncio.Task[Any][source]

Wait for an event of the specified event type that satisfies the condition to occur.

Note

This coroutine returns as soon as the subscription has been registered. Its result is a task, which will be cancelled on timeout.

WILDCARD: asyncutils._internal.types.WildcardType

Sentinel representing the event type of subscribers that accept any event name.

property active_tasks: int

The number of callbacks occurring at this moment.

property auditing: bool

Get-set property for is_auditing(). When changed, connect or disconnect the underlying audit hook accordingly.

property name: str

The name of the event bus.

property stream_queue: asyncio.Queue[tuple[str, Any]] | asyncio.Queue[Any]
The asynchronous queue to which events are output by event_stream().
The items in the queue are tuples (event_type, data) if the event type was not specified in the creation of the event stream,
otherwise just the data attached to each event of that type.
property total_subscribers: int

The total number of subscribers for any event type.

property wildcard_count: int

The number of wildcard subscribers under this event bus.

property wildcards: _weakrefset.WeakSet[asyncutils._internal.types.WildcardSubscriber]

All the wildcard subscribers for this event bus.

class asyncutils.channels.Observable[**P](init_observers: collections.abc.Iterable[asyncutils._internal.types.Observer[P]], maxsize: int | None = ...)[source]
class asyncutils.channels.Observable(*, maxsize: int | None = ...)

Bases: asyncutils.mixins.LoopContextMixin

A class representing an observable stream of data, that observers can subscribe to and receive notifications from. Observers must be hashable!

Caution

Use instances of this class as context managers only.

Instantiate the observable with the initial observers taken from the iterable init_observers. If maxsize is None, accumulation of notifications is disabled; otherwise, it is the maximum size of the queue of notifications (default is no maximum).

__aiter__() collections.abc.AsyncGenerator[asyncutils._internal.types.Observer[P]][source]

Return an async generator over the observers.

async __cleanup__() None[source]
__iter__() collections.abc.Iterator[asyncutils._internal.types.Observer[P]][source]

Iterate over the current observers. When this iterator is active, no subscriptions or unsubscriptions can be done.

async __setup__() None[source]
at_change(key: collections.abc.Callable[P, object] = ..., ret_exc: bool = ...) Self[source]

Return a new observable that will only emit notifications when the value returned by key changes.
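
The change-detection idea can be sketched over a plain iterable. The `changes_only` generator is hypothetical and synchronous. It assumes the very first value always counts as a change, which the description above does not state.

```python
_UNSET = object()  # marker meaning "no previous key yet"


def changes_only(values, key=lambda value: value):
    """Yield a value only when key(value) differs from the previous key."""
    last = _UNSET
    for value in values:
        current = key(value)
        if last is _UNSET or current != last:
            yield value
        last = current


if __name__ == "__main__":
    print(list(changes_only([1, 1, 2, 2, 1])))
    print(list(changes_only(["a", "b", "cc", "d"], key=len)))
```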

buffer(count: int, ret_exc: bool = ...) Self[source]

Return a new observable that will buffer notifications and emit them concurrently in batches of size count.

debounce(delay: float, ret_exc: bool = ...) Self[source]

Return a new observable that will only emit the latest notification after delay seconds have passed since the last notification.
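
Debouncing can be sketched with a cancellable asyncio task. The `Debouncer` class is a hypothetical illustration of the technique and is not how Observable.debounce() is implemented. Each new value cancels the pending emission and restarts the quiet period.

```python
import asyncio


class Debouncer:
    """Forward only the latest value once `delay` seconds pass without a new one."""

    def __init__(self, delay, emit):
        self._delay = delay
        self._emit = emit
        self._pending = None  # task waiting out the quiet period

    def notify(self, value):
        if self._pending is not None:
            self._pending.cancel()  # a newer value supersedes the older one
        self._pending = asyncio.ensure_future(self._fire(value))

    async def _fire(self, value):
        await asyncio.sleep(self._delay)
        self._emit(value)


async def demo():
    emitted = []
    debouncer = Debouncer(0.02, emitted.append)
    for value in (1, 2, 3):  # a burst: only the last value should get through
        debouncer.notify(value)
    await asyncio.sleep(0.2)
    debouncer.notify(4)      # arrives after a quiet period, so it is emitted too
    await asyncio.sleep(0.2)
    return emitted
```

Throttling differs in that it lets the first notification of each interval through instead of waiting for a quiet period.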

filter(pred: collections.abc.Callable[P, bool], ret_exc: bool = ...) Self[source]

Return a new observable emitting the notifications of this observable to its observers only when the parameters, starred and passed to pred, evaluate to a true value.

fork(ret_exc: bool = ...) Self[source]

Return a new observable that will emit all notifications to its observers.

async handle_notifications() None[source]

Execute the queued notifications one by one and wait for each to complete.

async handle_unsubscriptions() None[source]

Perform the unsubscriptions as requested by unsubscribe_eventually().

map(
transform: collections.abc.Callable[P, tuple[collections.abc.Iterable[object], collections.abc.Mapping[str, object]]],
ret_exc: bool = ...,
) Observable[Ellipsis][source]

Return a new observable transforming the parameters of notifications from this observable by transform.

merge(ret_exc: bool = ...) Self[source]

Return a new observable that will emit notifications from all the observables in obs.

async notify(*a: P.args, **k: P.kwargs) None[source]
async notify(*a: object, _ret_exc_: bool = ..., **k: object) None

Notify the observers with the parameters passed in. If another notification is in progress, this one is queued and completed by that notification. If _ret_exc_ is True (default False), exceptions occurring in any observer are not propagated.

notify_sequential(*a: P.args, **k: P.kwargs) collections.abc.AsyncGenerator[Any][source]
notify_sequential(
*a: object,
_silent_: bool = ...,
_persistent_: bool = ...,
**k: object,
) collections.abc.AsyncGenerator[Any]

Version of notify() that doesn’t attempt to trigger the observers in parallel.

ntimes(observer: asyncutils._internal.types.Observer[P], n: int = ...) asyncutils._internal.types.SubscriptionRV[source]

Add an observer immediately and automatically have it removed after n notifications. n defaults to context.OBSERVABLE_DEFAULT_NTIMES_N.

async restart_accumulation(flush: bool = ...) None[source]

Complete all notifications if flush is True, then restart notification accumulation.

start_accumulation() bool[source]

Begin accumulation of notifications and return True, or return False if accumulation is already occurring.

async subscribe(observer: asyncutils._internal.types.Observer[P]) asyncutils._internal.types.SubscriptionRV[source]

Call wait_until_idle(), then add an observer and return a subscription object that can be used to remove it.

subscribe_nowait(observer: asyncutils._internal.types.Observer[P]) asyncutils._internal.types.SubscriptionRV[source]

Add an observer without waiting for the observable to be idle.

subscribe_syncf(observer: asyncutils._internal.types.Observer[P]) asyncutils._internal.types.SubscriptionRV[source]

Subscribe a synchronous observer by converting it to async in an executor.

throttle(interval: float, ret_exc: bool = ...) Self[source]

Return a new observable that will emit notifications at most once every interval seconds.

async unsubscribe(observer: asyncutils._internal.types.Observer[P], strict: bool = ...) None[source]

Call wait_until_idle(), then remove the observer. If strict is True, assert that the observer was indeed subscribed.

unsubscribe_eventually(observer: asyncutils._internal.types.Observer[P], asap: bool = ...) None[source]

Mark the observer for removal at some point in the future. If asap is True and there is no notification running, the observer is removed immediately.

unsubscribe_nowait(observer: asyncutils._internal.types.Observer[P], strict: bool = ...) None[source]

Remove the observer immediately even if a notification is occurring. If strict is True, assert that the observer was indeed subscribed.

async wait_for_next(timeout: float | None = ..., strict: bool = ...) tuple[tuple[Any, Ellipsis], dict[str, Any]][source]

Wait for the next notification to occur by adding a temporary subscriber and return its parameters as a tuple (args, kwargs). If strict is True, assert that another operation did not remove the subscriber prematurely.

async wait_until_idle(timeout: float | None = ...) None[source]

Wait until the observable is idle, that is, until all notifications have completed.

property idle: bool

Whether the observable is idle, that is, not currently notifying observers.

property notifying: bool

The opposite of idle.

class asyncutils.channels.Rendezvous[T](*, loop: asyncio.AbstractEventLoop = ..., lock: asyncio.Lock = ...)[source]

A rendezvous object, emulating Golang’s unbuffered channels.

Instantiate a rendezvous object, which will be maintained by a background task cleaning up its done getters and putters periodically,
according to context.RENDEZVOUS_MAINTENANCE_INTERVAL. If loop is not passed, the running event loop is used. If there is no
running event loop, one is created and set.
__length_hint__() int[source]

Approximate number of operations pending. Implemented for operator.length_hint().

cleanup() None[source]

Clean up the internal getter and putter stacks.

async exchange(put_val: T, /, *, timeout: float | None = ..., asap: bool = ...) T[source]
Put a value into the rendezvous, then get a different value from it and return that value.
If asap is True, return once a value is available, without necessarily having completed the put.
async get(default: T | None = ..., *, timeout: float | None = ...) T[source]
Get a value from the rendezvous, blocking until available unless default is passed and timeout is not, in which case the default is returned if a value is not immediately available.
If default is not passed and timeout is reached, the TimeoutError is propagated. In any case, the get is cancelled at timeout.
async put(value: T, /, *, timeout: float | None = ...) bool[source]

Like raising_put(), but returns a boolean representing if the put succeeded. The recommended interface.

async raising_put(value: T, /, *, timeout: float) None[source]
Put in value to the rendezvous, blocking until it is gotten or timeout is reached, at which point TimeoutError is raised and the put cancelled.
Also be prepared to intercept or reraise CancelledError resulting from reset().
async reset() None[source]
Hard reset the rendezvous. Call from a monitoring task when a deadlock appears to have occurred.
This cancels all pending gets, puts and exchanges; their callers will see CancelledError.
state_snapshot() asyncutils._internal.types.StateSnapshot[source]

Trigger a cleanup and return a snapshot of the current state of the object.
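
The unbuffered-channel behaviour that Rendezvous emulates can be sketched with two queues of futures. The `TinyRendezvous` class is a hypothetical, much reduced illustration: it has no timeouts, no exchange, no reset and no background maintenance task, and it is not the library's implementation.

```python
import asyncio
from collections import deque


class TinyRendezvous:
    """A put completes only when a get takes the value, and vice versa."""

    def __init__(self):
        self._getters = deque()  # futures of waiting getters
        self._putters = deque()  # (value, future) pairs of waiting putters

    async def put(self, value):
        while self._getters:
            getter = self._getters.popleft()
            if not getter.done():  # skip getters that were cancelled
                getter.set_result(value)
                return
        done = asyncio.get_running_loop().create_future()
        self._putters.append((value, done))
        await done  # block until a getter picks the value up

    async def get(self):
        while self._putters:
            value, done = self._putters.popleft()
            if not done.done():  # skip putters that were cancelled
                done.set_result(None)
                return value
        waiter = asyncio.get_running_loop().create_future()
        self._getters.append(waiter)
        return await waiter


async def demo():
    chan = TinyRendezvous()
    log = []

    async def producer():
        for n in range(3):
            await chan.put(n)
            log.append(f"put {n}")

    async def consumer():
        for _ in range(3):
            log.append(f"got {await chan.get()}")

    await asyncio.gather(producer(), consumer())
    return log


async def lonely_put_times_out():
    chan = TinyRendezvous()
    try:
        await asyncio.wait_for(chan.put("x"), 0.01)  # nobody is getting
    except asyncio.TimeoutError:
        return True
    return False
```

A put with no matching get never completes on its own, which is why the documented put() and raising_put() accept a timeout.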