Source code for asyncutils.futures

 1__lazy_modules__ = frozenset(('_contextvars',))
 2from asyncutils._internal.helpers import copy_and_clear, fullname
 3from asyncutils._internal.submodules import futures_all as __all__
 4from _contextvars import copy_context
 5from asyncio.futures import Future, _PyFuture # ty: ignore[unresolved-import]
 6from asyncio.tasks import Task, _PyTask, eager_task_factory # ty: ignore[unresolved-import]
 7from sys import audit
 8class FB:
 9    def __init__(self): self._creation_time = self.get_loop().time() # ty: ignore[unresolved-attribute]
10    def __lt__(self, other, /): return self._creation_time < other._creation_time
class TimeAwareFuture(FB, Future):
    """``Future`` combined with the creation-time mixin.

    Instances carry a ``_creation_time`` read from their loop's clock and
    compare with ``<`` by that value.
    """
class TimeAwareTask(FB, Task):
    """``Task`` combined with the creation-time mixin.

    Instances carry a ``_creation_time`` read from their loop's clock and
    compare with ``<`` by that value.
    """
def ff(a):
    """Build a ``remove_*_callback`` method for the callback list stored in attribute *a*.

    *a* is the name of an instance attribute holding a list of
    ``(callback, context)`` pairs. The returned function is meant to be
    installed as a method on the owning class.
    """
    def remove_callback(self, fn, /):
        """Remove every entry registered for *fn* and return how many were removed."""
        callbacks = getattr(self, a)
        # Compare by equality rather than identity: bound methods are created
        # afresh on every attribute access, so ``obj.cb is obj.cb`` is False and
        # an identity test could never remove them. Equality also matches what
        # ``Future.remove_done_callback`` does.
        kept = [(f, c) for f, c in callbacks if f != fn]
        removed = len(callbacks) - len(kept)
        if removed:
            # Slice-assign so the list object stored on the instance is
            # mutated in place instead of being rebound.
            callbacks[:] = kept
        return removed
    return remove_callback
class AsyncCallbacksFuture(_PyFuture):
    """Pure-Python future with coroutine-function and zero-argument callbacks.

    Besides the regular done-callbacks, three extra lists of
    ``(fn, context)`` pairs are kept:

    * async callbacks -- ``fn(self)`` is wrapped in a task on the future's loop;
    * no-args callbacks -- ``fn()`` is scheduled with ``call_soon``;
    * no-args async callbacks -- ``fn()`` is wrapped in a task on the loop.
    """
    def __init__(self, *, loop=None):
        _PyFuture.__init__(self, loop=loop)
        self._setup()

    def _setup(self):
        """Create the (initially empty) extra callback lists."""
        self._async_callbacks, self._noargs_callbacks, self._noargs_async_callbacks = [], [], []

    def add_async_callback(self, fn, /, *, context=None):
        """Register *fn* so that ``fn(self)`` runs as a task once the future is done.

        If the future is no longer pending the task is created immediately.
        *context* is the ``contextvars.Context`` to run in; when omitted, a
        copy of the current context is used.
        """
        if self._state == 'PENDING':
            # Test ``is None`` rather than truthiness: a Context is a Mapping,
            # so a caller-supplied *empty* Context is falsy and would
            # otherwise be silently replaced.
            self._async_callbacks.append((fn, copy_context() if context is None else context))
        else:
            # Honour the requested context on the already-done path as well.
            self._loop.create_task(fn(self), context=context)

    def add_noargs_callback(self, fn, /, *, context=None):
        """Register *fn* so that ``fn()`` is scheduled with ``call_soon`` once the future is done.

        If the future is no longer pending the call is scheduled immediately.
        *context* is the ``contextvars.Context`` to run in; when omitted, a
        copy of the current context is used.
        """
        if self._state == 'PENDING':
            self._noargs_callbacks.append((fn, copy_context() if context is None else context))
        else:
            self._loop.call_soon(fn, context=context)

    def add_noargs_async_callback(self, fn, /, *, context=None):
        """Register *fn* so that ``fn()`` runs as a task once the future is done.

        If the future is no longer pending the task is created immediately.
        *context* is the ``contextvars.Context`` to run in; when omitted, a
        copy of the current context is used.
        """
        if self._state == 'PENDING':
            self._noargs_async_callbacks.append((fn, copy_context() if context is None else context))
        else:
            self._loop.create_task(fn(), context=context)

    # Each remover drops every entry for the given callback from its list and
    # returns the number of entries removed.
    remove_async_callback, remove_noargs_callback, remove_noargs_async_callback = map(ff, ('_async_callbacks', '_noargs_callbacks', '_noargs_async_callbacks'))

    def __schedule_callbacks(self):
        """Schedule every registered callback on the loop and empty all callback lists."""
        audit(f'{fullname(self)}/schedule_callbacks', id(self))
        loop = self._loop
        create_task, call_soon = loop.create_task, loop.call_soon
        async_cbs, done_cbs, noargs_async_cbs, noargs_cbs = map(copy_and_clear, (self._async_callbacks, self._callbacks, self._noargs_async_callbacks, self._noargs_callbacks))
        for fn, ctx in async_cbs:
            create_task(fn(self), context=ctx)
        for fn, ctx in done_cbs:
            call_soon(fn, self, context=ctx)
        for fn, ctx in noargs_async_cbs:
            create_task(fn(), context=ctx)
        for fn, ctx in noargs_cbs:
            call_soon(fn, context=ctx)

    # Private-name mangling uses the name of the class a method is *written*
    # in. The pure-Python base class is defined as ``Future``, so its
    # ``self.__schedule_callbacks()`` calls resolve to
    # ``_Future__schedule_callbacks``, whereas the method above is stored as
    # ``_AsyncCallbacksFuture__schedule_callbacks``. Publish it under the
    # base's mangled name too, otherwise the override is never invoked.
    _Future__schedule_callbacks = __schedule_callbacks
class AsyncCallbacksTask(_PyTask, AsyncCallbacksFuture): # ty: ignore[inconsistent-mro]
    """Pure-Python task that also carries the extra callback lists of ``AsyncCallbacksFuture``."""
    def __init__(self, coro, **k):
        # Explicit (non-super) call: the task initialiser receives the
        # coroutine and every keyword option unchanged.
        _PyTask.__init__(self, coro, **k)
        # NOTE(review): if _PyTask.__init__ cooperatively reaches
        # AsyncCallbacksFuture.__init__, _setup() has already run once by
        # now -- confirm whether this second call is still required.
        self._setup()
class EagerAsyncCallbacksFuture(AsyncCallbacksFuture):
    """``AsyncCallbacksFuture`` variant that installs ``eager_task_factory`` on its loop."""
    def _setup(self):
        # Side effect on the loop itself: the task factory is replaced for
        # the whole loop, not only for tasks created by this future.
        loop = self._loop
        loop.set_task_factory(eager_task_factory)
        # Then create the callback lists as the parent does.
        super()._setup()
class EagerAsyncCallbacksTask(AsyncCallbacksTask, EagerAsyncCallbacksFuture):
    """``AsyncCallbacksTask`` that also inherits the eager ``_setup`` of ``EagerAsyncCallbacksFuture``."""
class TimeAwareAsyncCallbacksFuture(FB, AsyncCallbacksFuture):
    """``AsyncCallbacksFuture`` combined with the creation-time mixin (``_creation_time`` and ``<`` ordering)."""
class TimeAwareAsyncCallbacksTask(FB, AsyncCallbacksTask):
    """``AsyncCallbacksTask`` combined with the creation-time mixin (``_creation_time`` and ``<`` ordering)."""
class EagerTimeAwareAsyncCallbacksFuture(TimeAwareAsyncCallbacksFuture, EagerAsyncCallbacksFuture):
    """Combination of ``TimeAwareAsyncCallbacksFuture`` and ``EagerAsyncCallbacksFuture``."""
class EagerTimeAwareAsyncCallbacksTask(TimeAwareAsyncCallbacksTask, EagerAsyncCallbacksTask):
    """Combination of ``TimeAwareAsyncCallbacksTask`` and ``EagerAsyncCallbacksTask``."""
# Remove the mixin base and the method factory from the module namespace;
# the classes defined above keep their own references to them.
del FB, ff