Source code for asyncutils.events

 1__lazy_modules__ = frozenset(('asyncio', 'time'))
 2import asyncutils as A, asyncio as I
 3from asyncutils.constants import _NO_DEFAULT
 4from asyncutils._internal.submodules import events_all as __all__
 5from _collections import deque
 6from time import monotonic
class SingleWaiterEventWithValue(A.EventMixin):
    """Value-carrying event backed by a single future slot (``_waiter``).

    The slot is ``None`` when idle, a pending future while somebody is waiting,
    or a completed future when a value was set while nobody was waiting.
    """
    __slots__ = '_waiter',

    def set(self, value):
        """Deliver *value* to the pending waiter, or store it if none is pending."""
        fut = self._waiter
        if fut is None or fut.done():
            # Nothing is pending: park the value in a fresh future so get()
            # and a later wait_for_next() can pick it up.
            # NOTE(review): make_fut is not defined here; presumably it is
            # supplied by A.EventMixin - confirm.
            self._waiter = fut = self.make_fut()
        fut.set_result(value)

    def is_set(self):
        """Return True when the slot holds a completed future."""
        fut = self._waiter
        return fut is not None and fut.done()

    async def wait_for_next(self, timeout=None, *, strict=False):
        """Wait for a value and return it; the slot is cleared afterwards.

        timeout: forwarded to ``asyncio.wait_for``.
        strict: when true, raise RuntimeError if another waiter is already
            pending instead of sharing its future.
        """
        fut = self._waiter
        if fut is None:
            self._waiter = fut = self.make_fut()
        elif strict and not fut.done():
            # Only a *pending* future means somebody else is waiting. A
            # completed one is just a value stored by set(), which must not be
            # reported as a competing waiter.
            raise RuntimeError('asyncutils.events.SingleWaiterEventWithValue: another waiter is waiting')
        try:
            return await I.wait_for(fut, timeout)
        finally:
            # Runs on success, timeout and cancellation alike.
            self._waiter = None

    def get(self, default=_NO_DEFAULT):
        """Return the stored value without waiting.

        Returns *default* when no value is available; raises
        ``A.EventValueError`` if no default was supplied either.
        """
        fut = self._waiter
        if fut is not None and fut.done():
            return fut.result()
        if default is _NO_DEFAULT:
            raise A.EventValueError('asyncutils.events.SingleWaiterEventWithValue: no value is set')
        return default

    def clear(self):
        """Drop the current future, pending or completed."""
        self._waiter = None

    # A new instance starts out in exactly the cleared state.
    __init__ = clear
class EventWithValue(A.EventMixin):
    """Event holding a current value, a bounded change history and a set of waiters.

    ``None`` is reserved to mean "no value is set".
    """
    __slots__ = '_hist', '_value', '_waiters'

    def __init__(self, *, maxhist=_NO_DEFAULT):
        """Create an unset event.

        maxhist: maximum number of history entries kept; when omitted, the
            context's ``EVENT_WITH_VALUE_DEFAULT_MAX_HIST`` is used.
        """
        if maxhist is _NO_DEFAULT:
            maxhist = A.getcontext().EVENT_WITH_VALUE_DEFAULT_MAX_HIST
        self._waiters, self._value, self._hist = set(), None, deque(maxlen=maxhist)

    def _record_hist(self):
        """Append ``(monotonic timestamp, A.ref(current value))`` to the history."""
        self._hist.append((monotonic(), A.ref(self._value)))

    def set(self, value, *, strict=True):
        """Store *value* and hand it to every pending waiter.

        A history entry is recorded only when the value actually changes.
        Passing ``None`` clears the event; with ``strict`` true this raises
        ``A.EventValueError`` (callers should use ``clear``), and nothing is
        modified.
        """
        if value is None and strict:
            # Validate before touching any state so a rejected call cannot
            # clear the value or add a history entry as a side effect.
            raise A.EventValueError('asyncutils.events.EventWithValue: use clear instead')
        if value != self._value:
            self._value = value
            self._record_hist()
        if value is None:
            # Clearing never wakes waiters.
            return
        waiters = self._waiters
        finished = []
        for fut in waiters:
            if fut.done():
                finished.append(fut)
            else:
                fut.set_result(value)
        # Only futures that were already done are pruned here; the ones just
        # resolved are discarded by their own wait_for_next().
        waiters.difference_update(finished)

    def remove_done_waiters(self, _=__import__('operator').methodcaller('done')):  # noqa: B008
        """Drop every waiter future that is already done."""
        waiters = self._waiters
        waiters.difference_update(filter(_, waiters))

    def set_once(self, value):
        """Set *value* (waking waiters), then restore the previous value."""
        previous = self._value
        self.set(value)
        # The previous value may be None (event was unset); restoring it must
        # not trip the strict None check.
        self.set(previous, strict=False)

    def clear(self):
        """Reset the event to "no value set"."""
        self.set(None, strict=False)

    def get(self, default=_NO_DEFAULT):
        """Return the current value.

        Returns *default* when no value is set; raises ``A.EventValueError``
        if no default was supplied either.
        """
        current = self._value
        if current is not None:
            return current
        if _NO_DEFAULT.is_(default):
            raise A.EventValueError('asyncutils.events.EventWithValue: no value is set')
        return default

    async def wait_for_next(self, timeout=None):
        """Wait for the next non-None ``set`` call and return its value.

        timeout: forwarded to ``asyncio.wait_for``.
        """
        waiters = self._waiters
        # NOTE(review): make_fut is not defined here; presumably it is
        # supplied by A.EventMixin - confirm.
        fut = self.make_fut()
        waiters.add(fut)
        try:
            return await I.wait_for(fut, timeout)
        finally:
            waiters.discard(fut)

    def is_set(self):
        """Return True when the current value is not None."""
        return self._value is not None

    @property
    def history(self):
        """List of ``(timestamp, value)`` for entries whose reference still resolves."""
        return [(stamp, obj) for stamp, ref in self._hist if (obj := ref()) is not None]

    @property
    def history_asdict(self):
        """Same entries as ``history``, as a ``{timestamp: value}`` dict."""
        return {stamp: obj for stamp, ref in self._hist if (obj := ref()) is not None}

    def recent_history(self, duration=None):
        """Yield history entries recorded within the last *duration* seconds.

        duration: look-back window; when None, the context's
            ``EVENT_WITH_VALUE_DEFAULT_RECENT`` is used.

        NOTE(review): entries are yielded exactly as stored, i.e.
        ``(timestamp, reference)``; unlike ``history`` the reference is not
        dereferenced. Confirm whether that is intended.
        """
        if duration is None:
            duration = A.getcontext().EVENT_WITH_VALUE_DEFAULT_RECENT
        cutoff = monotonic() - duration
        entries = iter(self._hist)
        for stamp, ref in entries:
            if stamp >= cutoff:
                # Everything after the first entry inside the window is
                # yielded without a further timestamp check.
                yield stamp, ref
                yield from entries

    async def wait_for_transition(self, old, new, timeout=None, *, force_transition=False, legacy=False):
        """Wait until the value goes from *old* directly to *new* (identity comparison).

        Returns True once the transition is observed and False on timeout.
        With ``force_transition``, a timeout sets *old*, then *new*, then
        restores the value that was current before forcing.
        """
        last = new
        try:
            async with I.timeout(timeout):
                while True:
                    # Unless legacy, skip waiting for `old` again when the
                    # value just observed already is `old`.
                    # NOTE(review): wait_for_value is not defined in this
                    # class; presumably it comes from A.EventMixin - confirm.
                    if legacy or last is not old:
                        await self.wait_for_value(old)
                    last = await self.wait_for_next()
                    if last is new:
                        return True
        except TimeoutError:
            if force_transition:
                previous = self.get(None)
                self.set(old)
                self.set(new)
                self.set(previous, strict=False)
            return False

    async def wait_for_transition_unordered(self, a, b, timeout=None, *, force_transition=False, legacy=False):
        """Wait for a transition between *a* and *b* in either direction.

        Returns True if either direction was observed, False otherwise.
        ``force_transition`` is passed to the a -> b waiter only.
        """
        spawn = self.loop.create_task
        tasks = (
            spawn(self.wait_for_transition(a, b, timeout, force_transition=force_transition, legacy=legacy)),
            spawn(self.wait_for_transition(b, a, timeout, legacy=legacy)),
        )
        try:
            done, _ = await I.wait(tasks, return_when=I.FIRST_COMPLETED)
        finally:
            # asyncio.wait never cancels what it was given: stop the losing
            # direction (and both, if this coroutine itself is cancelled) so
            # no task keeps waiting in the background.
            for task in tasks:
                if not task.done():
                    task.cancel()
        # Both tasks may have finished in the same loop iteration; report
        # success if any of them saw its transition.
        return any([task.result() for task in done])