asyncutils.channels¶
Bridges between asynchronous consumers/subscribers and producers/publishers.
Classes¶
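EventBus | A class abstracting the communication between notable events and asynchronous callbacks (an async auditing system), that can optionally be hooked up to sys.audit.
Observable | A class representing an observable stream of data, that observers can subscribe to and receive notifications from. Observers must be hashable!
Rendezvous | A rendezvous object, emulating Golang's unbuffered channels.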
Module Contents¶
- class asyncutils.channels.EventBus(
- name: str = ...,
- *,
- handler: collections.abc.Callable[[BaseException], None] = ...,
- max_concurrent: int = ...,
- tracking_stats: bool = ...,
Bases:
asyncutils.mixins.LoopContextMixin
A class abstracting the communication between notable events and asynchronous callbacks (an async auditing system), that can optionally be hooked up to sys.audit.
Has extensive telemetry and middleware support, allowing data to be processed in a pipeline and eventually passed to subscribers. Subscribers must be hashable!
A subscriber is a function that will be called every time data is published, with the corresponding data passed in. Publishing is thus the action of triggering these subscribers.
Wildcard subscribers should take the event type as the first argument, and the event data as the next; while specific subscribers should take the event data as the only argument.
Caution
Use instances as context managers only for proper setup and shutdown.
Changed in version 0.9.5: Support for repeated, unhashable middlewares and removal using an opaque cookie was added.
All arguments are optional:
name: The name of this event bus, which will appear in error messages.
handler: A function that takes an exception raised in a subscriber and handles it.
max_concurrent: The maximum number of concurrent callbacks; defaults to context.EVENT_BUS_DEFAULT_MAX_CONCURRENT.
tracking_stats: Whether to track the number of publications to subscribers of each event type.
- add_middleware(middleware: asyncutils._internal.types.Middleware) int[source]¶
- Append a middleware to the back of the pipe of middlewares, and return a permanent cookie that can be passed to remove_middleware() to invalidate it.
The middleware must take the event type as the first argument and the associated data as the second. If the middleware does not recognize the event type, it should simply return the data immediately.
There is no protection in place against malicious middlewares besides the user’s abstraction.
It is preferred that the middleware be a coroutine function. Each middleware should be extremely optimized, such as through C extensions, to avoid hindering publishing.
When publishing occurs, the first middleware takes the initial data, does some processing asynchronously, and passes the modified data to the second middleware, and so forth. Insertion order is maintained. This may be different from the typical meaning of a ‘middleware’.
The output of the final middleware is broadcast to each subscriber concurrently. Subscribers cannot see the initial data.
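The pipeline behaviour described above can be sketched with plain asyncio. This is a minimal illustration, not the EventBus implementation: `run_pipeline`, `strip_text`, `shout` and the `Middleware` alias are hypothetical names introduced only for this example.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

# Hypothetical alias for this sketch; not the asyncutils Middleware type.
Middleware = Callable[[str, Any], Awaitable[Any]]


async def run_pipeline(middlewares: list[Middleware], event_type: str, data: Any) -> Any:
    """Feed the data through each middleware in insertion order."""
    for middleware in middlewares:
        data = await middleware(event_type, data)
    return data


async def strip_text(event_type: str, data: Any) -> Any:
    # Unrecognized event types are returned unchanged, as advised above.
    if event_type != "message":
        return data
    return data.strip()


async def shout(event_type: str, data: Any) -> Any:
    if event_type != "message":
        return data
    return data.upper()


async def main() -> tuple[Any, Any, list[Any]]:
    pipeline = [strip_text, shout]
    received: list[Any] = []

    async def subscriber(data: Any) -> None:
        received.append(data)

    processed = await run_pipeline(pipeline, "message", "  hello ")
    untouched = await run_pipeline(pipeline, "other", "  hello ")
    # Subscribers only ever see the output of the final middleware.
    await asyncio.gather(subscriber(processed), subscriber(processed))
    return processed, untouched, received


processed, untouched, received = asyncio.run(main())
```

Because each middleware is awaited in turn, a slow middleware delays every publication, which is presumably why the documentation asks for them to be heavily optimized.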
- add_temp_middleware(middleware: asyncutils._internal.types.Middleware, until: asyncio.Future[Any]) None[source]¶
Add a middleware that should take effect until the future until is done, after which the result of the future will be treated as the result of the middleware on removal. No cookie is returned in this case.
- audit_context() asyncutils._internal.types.DualContextManager[None][source]¶
Start receiving publications from and sending publications to sys.audit() upon entry and stop on exit. Use as a context manager.
- auditor(event: str, args: tuple[object, Ellipsis], /) None¶
- The auditor of the event bus. You probably don’t want to call this directly. Not an instance method at runtime, just a function stored as an attribute of the instance.
- clear(event_type: str) _weakrefset.WeakSet[asyncutils._internal.types.SpecificSubscriber] | None[source]¶
- clear(
- event_type: asyncutils._internal.types.WildcardType,
- clear() None
Remove all subscribers for the event type and return them. If not passed, clear all subscribers but persist statistics unlike
clear_all().
- clear_wildcards() _weakrefset.WeakSet[asyncutils._internal.types.WildcardSubscriber] | None[source]¶
Equivalent to
bus.clear(EventBus.WILDCARD).
- event_stream(
- event_type: str,
- *,
- timeout: float | None = ...,
- item_timeout: float | None = ...,
- bufsize: int = ...,
- event_stream( ) collections.abc.AsyncGenerator[tuple[str, Any]]
- Open an event stream for the specified event type, that is, an async generator from which consumers can receive events and the corresponding data as they occur. If event_type is not passed, the stream will include the event type in the output.
timeout, item_timeout and bufsize default to context.EVENT_BUS_STREAM_DEFAULT_TIMEOUT, context.EVENT_BUS_STREAM_DEFAULT_ITEM_TIMEOUT and context.EVENT_BUS_STREAM_DEFAULT_BUFFER_SIZE respectively.
- async feed_event(data: object, /, *, timeout: float | None = ...) None[source]¶
- async feed_event(event_type: str, data: object, /, *, timeout: float | None = ...) None
Feed the data for an event into the event stream. The queue for the stream is created if necessary, so the event stream need not be active.
- get_event_stats() collections.defaultdict[str, int][source]¶
Return a copy of the stats, mapping event type to number of published events.
- async handle_exception(e: BaseException) None[source]¶
Asynchronously handle an exception according to the handler initialization parameter, which can be a sync or async function.
- has_subscribers(event_type: str | asyncutils._internal.types.WildcardType) bool[source]¶
Whether the event type has any subscribers.
- is_auditing() bool[source]¶
Whether the event bus is connected to
sys.audit().
- is_subscribed(subscriber: asyncutils._internal.types.SpecificSubscriber, event_type: str = ...) bool[source]¶
- is_subscribed(
- subscriber: asyncutils._internal.types.WildcardSubscriber,
- event_type: asyncutils._internal.types.WildcardType = ...,
Whether the callback is subscribed for the event type, or subscribed for any event type if event_type is not passed.
- static is_valid_event_type(event_type: object) TypeGuard[str | asyncutils._internal.types.WildcardType][source]¶
Whether the object is a valid event type (i.e. a string or wildcard).
- async publish(
- event_type: str,
- data: object = ...,
- *,
- wait: bool = ...,
- safe: bool = ...,
- timeout: float | None = ...,
- chaperone: collections.abc.Callable[[ExceptionGroup | Exception], object] | None = ...,
- Publish an event, that is, some data attached to an event type, to the subscribers involved, with timeout timeout.
Each subscriber for that event type, as well as each wildcard subscriber, will be triggered by the publication, receiving the data after processing by the middlewares in order.
If wait is False (default True), don’t wait for the publication to complete.
If safe is False (default True), don’t wrap callbacks with proper error handling.
chaperone, if passed, should be a function processing non-severe exceptions (instances of Exception and ExceptionGroup) in the callbacks. Otherwise, these exceptions and exception groups are flattened and collected into an ExceptionGroup and propagated; the caller should be prepared to handle that case.
- remove_middleware(cookie: int, *, result: object = ..., strict: bool = ...) Any[source]¶
- Remove a middleware previously added via add_middleware() or add_middleware_once(), and return its result. Runs in O(1) time.
If the middleware has an associated future (add_middleware_once()) and it is done, return its result. If an exception was set, reraise it.
Otherwise, set its result to result and return it.
- async shutdown(immediate: bool = ..., *, timeout: float | None = ..., preserve_stats: bool = ...) None[source]¶
- Gracefully shut down the event bus.
After the shutdown, publications fail fast and middlewares are cleared.
This waits for as many subscriber callbacks to complete as possible, within timeout seconds if specified.
If immediate is True, getters for the queue for the event stream will error immediately.
If preserve_stats is True, the event publication statistics will be saved and accessible with get_event_stats().
- start_audit() None[source]¶
Connect the audit hook of the bus to sys.audit(), creating it if necessary. Incurs overhead. Use with caution.
- start_tracking() None[source]¶
Start tracking event publication statistics (number of publications under each event type).
- stop_audit() None[source]¶
Disconnect the audit hook of the bus. Note that it is currently impossible to actually remove an audit hook, so this function just deactivates it.
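Since CPython offers `sys.addaudithook()` but no way to remove a hook, deactivation is usually done with a flag checked inside the hook. The sketch below shows that general pattern only; `SwitchableAuditHook` and the `demo.` event prefix are hypothetical and not part of asyncutils.

```python
import sys


class SwitchableAuditHook:
    """An audit hook that is installed once and toggled with a flag."""

    def __init__(self, prefix):
        self.prefix = prefix
        self.active = False
        self.installed = False
        self.seen = []

    def _hook(self, event, args):
        # The hook stays registered forever; the flag decides whether it acts.
        if self.active and event.startswith(self.prefix):
            self.seen.append((event, args))

    def start(self):
        if not self.installed:
            sys.addaudithook(self._hook)
            self.installed = True
        self.active = True

    def stop(self):
        # Deactivate only: an installed audit hook cannot be removed.
        self.active = False


hook = SwitchableAuditHook("demo.")
hook.start()
sys.audit("demo.first", 1)
hook.stop()
sys.audit("demo.second", 2)  # ignored while the hook is inactive
hook.start()
sys.audit("demo.third", 3)
hook.stop()
```

Even when inactive, the hook is still called for every audit event in the process, so the check at the top of the hook should stay as cheap as possible.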
- stop_tracking(ret_stats: Literal[False] = ...) None[source]¶
- stop_tracking(ret_stats: Literal[True]) collections.defaultdict[str, int]
Stop tracking event publication statistics. If ret_stats is True, return a dictionary of the stats up to that point, with keys corresponding to event types and values the number of publications.
- subscribe[C: asyncutils._internal.types.SpecificSubscriber](subscriber: C, /, event_type: str) C[source]¶
- subscribe(subscriber: C, /, event_type: asyncutils._internal.types.WildcardType = ...) C
Add a subscriber to the event bus under the specified event type (if unspecified, add as wildcard). Return the subscriber to allow usage as a decorator.
- subscribe_to[C: asyncutils._internal.types.SpecificSubscriber](event_type: str) collections.abc.Callable[[C], C][source]¶
- subscribe_to(event_type: asyncutils._internal.types.WildcardType) collections.abc.Callable[[C], C]
A decorator factory for functions to subscribe to this event bus under the specified event type.
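Returning the subscriber from subscribe() is what makes decorator usage possible. A minimal registry sketch follows; `TinyRegistry` is a hypothetical stand-in that assumes weak-reference storage (suggested by the WeakSet return types in this module), not the actual EventBus code.

```python
import weakref
from collections import defaultdict


class TinyRegistry:
    """Hypothetical registry keeping subscribers in weak sets per event type."""

    def __init__(self):
        self._subscribers = defaultdict(weakref.WeakSet)

    def subscribe(self, subscriber, event_type):
        self._subscribers[event_type].add(subscriber)
        # Returning the subscriber keeps the decorated name bound to it.
        return subscriber

    def subscribe_to(self, event_type):
        def decorator(subscriber):
            return self.subscribe(subscriber, event_type)
        return decorator

    def subscriber_count(self, event_type):
        return len(self._subscribers[event_type])


registry = TinyRegistry()


@registry.subscribe_to("user.created")
def greet(data):
    return f"welcome {data}"


def audit(data):
    return f"logged {data}"


same = registry.subscribe(audit, "user.created")
```

With weak storage, a subscriber that is not referenced anywhere else (for example a lambda passed inline) may disappear from the set once it is garbage collected, so subscribers should be kept alive by the caller.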
- async subscribe_until[T](
- fut: asyncio.Future[T],
- subscriber: asyncutils._internal.types.SpecificSubscriber,
- event_type: str,
- *,
- till_permanent: float | None = ...,
- async subscribe_until(
- fut: asyncio.Future[T],
- subscriber: asyncutils._internal.types.WildcardSubscriber,
- event_type: asyncutils._internal.types.WildcardType = ...,
- *,
- till_permanent: float | None = ...,
- Add the subscriber under the event type (as a wildcard if event_type is WILDCARD or not passed) and return a task.
The subscriber is removed once fut completes, and its result is returned through the returned task.
After till_permanent seconds elapse (if passed), the task errors and the subscriber is left under that event type.
- subscriber_count(event_type: str | asyncutils._internal.types.WildcardType) int[source]¶
The number of subscribers for that event type.
- subscribers_for(event_type: str) _weakrefset.WeakSet[asyncutils._internal.types.SpecificSubscriber][source]¶
- subscribers_for(
- event_type: asyncutils._internal.types.WildcardType,
A weakref.WeakSet of subscribers for the event type.
- sync_start_publish(
- event_type: str,
- data: object = ...,
- *,
- safe: bool = ...,
- timeout: float | None = ...,
- chaperone: collections.abc.Callable[[ExceptionGroup | Exception], object] | None = ...,
Begin a publication synchronously. Parameters are as in publish(), above.
- tracking_context(
- stats_receiver: asyncio.Future[collections.abc.Mapping[str, int]] | None = ...,
Context manager, under which stats are tracked and finally sent to the stats_receiver future.
- unsubscribe(subscriber: asyncutils._internal.types.SpecificSubscriber, /, event_type: str) bool[source]¶
- unsubscribe(
- subscriber: asyncutils._internal.types.WildcardSubscriber,
- /,
- event_type: asyncutils._internal.types.WildcardType = ...,
Remove a subscriber from the event bus under the event type (if unspecified, remove from wildcards) and return whether the removal occurred (i.e. the subscriber was initially present).
- async wait_for_event(
- event_type: str,
- *,
- timeout: bool | None = ...,
- condition: collections.abc.Callable[[Any], object] = ...,
Wait for an event of the specified event type that satisfies the condition to occur.
Note
The function completes once the subscription has registered and returns a task, which will be cancelled on timeout.
- WILDCARD: asyncutils._internal.types.WildcardType¶
Sentinel representing the event type of subscribers that accept any event name.
- property auditing: bool¶
Get-set property for
is_auditing(). When changed, connect or disconnect the underlying audit hook accordingly.
- property stream_queue: asyncio.Queue[tuple[str, Any]] | asyncio.Queue[Any]¶
- The asynchronous queue to which events are output by event_stream().
The items in the queue are tuples (event_type, data) if the event type was not specified in the creation of the event stream, otherwise just the data attached to each event of that type.
- property wildcards: _weakrefset.WeakSet[asyncutils._internal.types.WildcardSubscriber]¶
All the wildcard subscribers for this event bus.
- class asyncutils.channels.Observable[**P](init_observers: collections.abc.Iterable[asyncutils._internal.types.Observer[P]], maxsize: int | None = ...)[source]¶
- class asyncutils.channels.Observable(*, maxsize: int | None = ...)
Bases:
asyncutils.mixins.LoopContextMixin
A class representing an observable stream of data, that observers can subscribe to and receive notifications from. Observers must be hashable!
Caution
Use instances of this class as context managers only.
Instantiate the observable with the initial observers taken from the iterable init_observers. If maxsize is None, accumulation of notifications is disabled; otherwise, it is the maximum size of the queue of notifications (default is no maximum).
- __aiter__() collections.abc.AsyncGenerator[asyncutils._internal.types.Observer[P]][source]¶
Return an async generator over the observers.
- __iter__() collections.abc.Iterator[asyncutils._internal.types.Observer[P]][source]¶
Iterate over the current observers. When this iterator is active, no subscriptions or unsubscriptions can be done.
- at_change(key: collections.abc.Callable[P, object] = ..., ret_exc: bool = ...) Self[source]¶
Return a new observable that will only emit notifications when the value returned by key changes.
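The idea behind at_change() (emit only when the key of a notification differs from the previous one) can be sketched as a wrapper around a single observer. `only_on_change` is a hypothetical helper for illustration; the real method returns a new Observable rather than a wrapped callback.

```python
import asyncio

_UNSET = object()  # marks "no notification seen yet"


def only_on_change(observer, key=lambda *a, **k: (a, k)):
    """Wrap an async observer so it fires only when key(...) changes."""
    last = _UNSET

    async def wrapper(*args, **kwargs):
        nonlocal last
        current = key(*args, **kwargs)
        if last is not _UNSET and current == last:
            return  # same key as the previous notification: suppress it
        last = current
        await observer(*args, **kwargs)

    return wrapper


async def main():
    seen = []

    async def observer(value):
        seen.append(value)

    # Key on the tens digit, so 1 and 5 count as "unchanged".
    notify = only_on_change(observer, key=lambda value: value // 10)
    for value in [1, 5, 12, 18, 3]:
        await notify(value)
    return seen


seen = asyncio.run(main())
```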
- buffer(count: int, ret_exc: bool = ...) Self[source]¶
Return a new observable that will buffer notifications and emit them concurrently in batches of size
count.
- debounce(delay: float, ret_exc: bool = ...) Self[source]¶
Return a new observable that will only emit the latest notification after delay seconds have passed since the last notification.
- filter(pred: collections.abc.Callable[P, bool], ret_exc: bool = ...) Self[source]¶
Return a new observable emitting the notifications of this observable to its observers only when the parameters, starred and passed to
pred, evaluate to a true value.
- fork(ret_exc: bool = ...) Self[source]¶
Return a new observable that will emit all notifications to its observers.
- async handle_notifications() None[source]¶
Execute the queued notifications one by one and wait for each to complete.
- async handle_unsubscriptions() None[source]¶
Perform the unsubscriptions as requested by
unsubscribe_eventually().
- map(
- transform: collections.abc.Callable[P, tuple[collections.abc.Iterable[object], collections.abc.Mapping[str, object]]],
- ret_exc: bool = ...,
Return a new observable transforming the parameters of notifications from this observable by
transform.
- merge(ret_exc: bool = ...) Self[source]¶
Return a new observable that will emit notifications from all the observables in
obs.
- async notify(*a: P.args, **k: P.kwargs) None[source]¶
- async notify(*a: object, _ret_exc_: bool = ..., **k: object) None
Notify the observers with the parameters passed in. If another notification is in progress, this one is queued to be completed by it. If _ret_exc_ is True (default False), exceptions occurring in any observer are not propagated.
- notify_sequential(*a: P.args, **k: P.kwargs) collections.abc.AsyncGenerator[Any][source]¶
- notify_sequential( ) collections.abc.AsyncGenerator[Any]
Version of
notify()that doesn’t attempt to trigger the observers in parallel.
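The difference between the two notification styles can be illustrated with plain asyncio. `notify_concurrently` and `notify_sequentially` are hypothetical helpers that take an explicit list of observers; they are a sketch of the concept, not the Observable methods.

```python
import asyncio


async def notify_concurrently(observers, *args):
    """Start every observer at once and wait for all of them."""
    return await asyncio.gather(*(observer(*args) for observer in observers))


async def notify_sequentially(observers, *args):
    """Yield each observer's result, running one observer at a time."""
    for observer in observers:
        yield await observer(*args)


async def main():
    finished = []

    async def slow(value):
        await asyncio.sleep(0.05)
        finished.append("slow")
        return value * 2

    async def fast(value):
        finished.append("fast")
        return value + 1

    concurrent_results = await notify_concurrently([slow, fast], 10)
    concurrent_order = list(finished)
    finished.clear()
    sequential_results = [r async for r in notify_sequentially([slow, fast], 10)]
    sequential_order = list(finished)
    return concurrent_results, concurrent_order, sequential_results, sequential_order


concurrent_results, concurrent_order, sequential_results, sequential_order = asyncio.run(main())
```

The results come back in observer order either way; only the completion order differs, since the sequential version does not start `fast` until `slow` has finished.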
- ntimes(observer: asyncutils._internal.types.Observer[P], n: int = ...) asyncutils._internal.types.SubscriptionRV[source]¶
Add an observer immediately and automatically have it removed after n notifications. n defaults to context.OBSERVABLE_DEFAULT_NTIMES_N.
- async restart_accumulation(flush: bool = ...) None[source]¶
Complete all notifications if flush is True, then restart notification accumulation.
- start_accumulation() bool[source]¶
Begin accumulation of notifications and return True, or return False if accumulation is already occurring.
- async subscribe(observer: asyncutils._internal.types.Observer[P]) asyncutils._internal.types.SubscriptionRV[source]¶
Call wait_until_idle(), then add an observer and return a subscription object that can be used to remove it.
- subscribe_nowait(observer: asyncutils._internal.types.Observer[P]) asyncutils._internal.types.SubscriptionRV[source]¶
Add an observer without waiting for the observable to be idle.
- subscribe_syncf(observer: asyncutils._internal.types.Observer[P]) asyncutils._internal.types.SubscriptionRV[source]¶
Subscribe a synchronous observer by converting it to async in an executor.
- throttle(interval: float, ret_exc: bool = ...) Self[source]¶
Return a new observable that will emit notifications at most once every interval seconds.
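A rate limit of this kind can be sketched by remembering when the last notification was let through. In the sketch below, notifications arriving too early are dropped; whether throttle() drops or delays them is not stated here, so that choice is an assumption. `throttled` and its injectable `clock` parameter are hypothetical.

```python
import asyncio
import time


def throttled(observer, interval, clock=time.monotonic):
    """Wrap an async observer so it fires at most once per interval seconds."""
    last_emit = None

    async def wrapper(*args, **kwargs):
        nonlocal last_emit
        now = clock()
        if last_emit is not None and now - last_emit < interval:
            return False  # too soon after the previous emission: drop it
        last_emit = now
        await observer(*args, **kwargs)
        return True

    return wrapper


async def main():
    seen = []

    async def observer(value):
        seen.append(value)

    # A fake clock makes the example deterministic.
    fake_times = iter([0.0, 0.4, 1.0, 1.2, 2.5])
    notify = throttled(observer, 1.0, clock=lambda: next(fake_times))
    emitted = [await notify(value) for value in "abcde"]
    return seen, emitted


seen_throttled, emitted = asyncio.run(main())
```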
- async unsubscribe(observer: asyncutils._internal.types.Observer[P], strict: bool = ...) None[source]¶
Call wait_until_idle(), then remove the observer. If strict is True, assert that the observer was indeed subscribed.
- unsubscribe_eventually(observer: asyncutils._internal.types.Observer[P], asap: bool = ...) None[source]¶
Note that the observer is to be removed at some point in the future. If asap is True and there is no notification running, the observer is removed immediately.
- unsubscribe_nowait(observer: asyncutils._internal.types.Observer[P], strict: bool = ...) None[source]¶
Remove the observer immediately even if a notification is occurring. If strict is True, assert that the observer was indeed subscribed.
- async wait_for_next(timeout: float | None = ..., strict: bool = ...) tuple[tuple[Any, Ellipsis], dict[str, Any]][source]¶
Wait for the next notification to occur by adding a temporary subscriber and return its parameters as a tuple (args, kwargs). If strict is True, assert that another operation did not remove the subscriber prematurely.
- class asyncutils.channels.Rendezvous[T](*, loop: asyncio.AbstractEventLoop = ..., lock: asyncio.Lock = ...)[source]¶
A rendezvous object, emulating Golang’s unbuffered channels.
Instantiate a rendezvous object, which will be maintained by a background task cleaning up its done getters and putters periodically, according to context.RENDEZVOUS_MAINTENANCE_INTERVAL. If loop is not passed, the running event loop is used. If there is no running event loop, one is created and set.
- __length_hint__() int[source]¶
Approximate number of operations pending. Implemented for
operator.length_hint().
- async exchange(put_val: T, /, *, timeout: float | None = ..., asap: bool = ...) T[source]¶
- Put a value into the rendezvous, then get and return a different value from it. If asap is True, return once a value is available, without necessarily having completed the put.
- async get(default: T | None = ..., *, timeout: float | None = ...) T[source]¶
- Get a value from the rendezvous, blocking until available unless default is passed and timeout is not, in which case the default is returned if a value is not immediately available. If default is not passed and timeout is reached, the TimeoutError is propagated. In any case, the get is cancelled at timeout.
- async put(value: T, /, *, timeout: float | None = ...) bool[source]¶
Like
raising_put(), but returns a boolean representing if the put succeeded. The recommended interface.
- async raising_put(value: T, /, *, timeout: float) None[source]¶
- Put value into the rendezvous, blocking until it is gotten or timeout is reached, at which point TimeoutError is raised and the put cancelled. Also be prepared to intercept or reraise CancelledError resulting from reset.
- async reset() None[source]¶
- Hard reset the rendezvous. Call from a monitoring task when a deadlock appears to have occurred. This cancels all pending gets, puts and exchanges; their callers will see CancelledError.
- state_snapshot() asyncutils._internal.types.StateSnapshot[source]¶
Trigger a cleanup and return a snapshot of the current state of the object.
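The rendezvous semantics described for put() and get() (a put does not complete until a getter takes the value, as with an unbuffered Go channel) can be sketched with a queue of offers, each paired with a future. `TinyRendezvous` is a hypothetical illustration without timeouts, exchange, reset or the maintenance task, and is not how the Rendezvous class is necessarily implemented.

```python
import asyncio


class TinyRendezvous:
    """Hypothetical unbuffered channel: put() returns only once get() took the value."""

    def __init__(self):
        self._offers = asyncio.Queue()

    async def put(self, value):
        taken = asyncio.get_running_loop().create_future()
        await self._offers.put((value, taken))
        await taken  # blocks until a getter accepts this offer

    async def get(self):
        while True:
            value, taken = await self._offers.get()
            if not taken.done():  # skip offers whose putter was cancelled
                taken.set_result(None)
                return value


async def main():
    rendezvous = TinyRendezvous()
    log = []

    async def producer():
        log.append("put started")
        await rendezvous.put("ping")
        log.append("put finished")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0)  # let the producer run until it blocks in put()
    log.append("getter arrives")
    value = await rendezvous.get()
    await task
    return value, log


value, log = asyncio.run(main())
```

The log shows the hand-off: the producer cannot finish its put until the getter has arrived and taken the value.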