1__lazy_modules__ = frozenset(('asyncio', 'time'))
2import asyncutils as A, asyncio as I
3from asyncutils.constants import _NO_DEFAULT
4from asyncutils._internal.submodules import events_all as __all__
5from _collections import deque
6from time import monotonic
[docs]
7class SingleWaiterEventWithValue(A.EventMixin):
8 __slots__ = '_waiter',
[docs]
9 def set(self, value):
10 if (w := self._waiter) is None or w.done(): self._waiter = w = self.make_fut()
11 w.set_result(value)
[docs]
12 def is_set(self): return False if (w := self._waiter) is None else w.done()
[docs]
13 async def wait_for_next(self, timeout=None, *, strict=False):
14 if w := self._waiter:
15 if strict: raise RuntimeError('asyncutils.events.SingleWaiterEventWithValue: another waiter is waiting')
16 else: self._waiter = w = self.make_fut()
17 try: return await I.wait_for(w, timeout)
18 finally: self._waiter = None
[docs]
19 def get(self, default=_NO_DEFAULT):
20 if (w := self._waiter) is None or not w.done():
21 if default is _NO_DEFAULT: raise A.EventValueError('asyncutils.events.SingleWaiterEventWithValue: no value is set')
22 return default
23 return w.result()
[docs]
24 def clear(self): self._waiter = None
25 __init__ = clear
[docs]
26class EventWithValue(A.EventMixin):
27 __slots__ = '_hist', '_value', '_waiters'
28 def __init__(self, *, maxhist=_NO_DEFAULT): self._waiters, self._value, self._hist = set(), None, deque(maxlen=A.getcontext().EVENT_WITH_VALUE_DEFAULT_MAX_HIST if maxhist is _NO_DEFAULT else maxhist)
29 def _record_hist(self): self._hist.append((monotonic(), A.ref(self._value)))
[docs]
30 def set(self, value, *, strict=True):
31 if value != self._value: self._value = value; self._record_hist()
32 if value is None:
33 if strict: raise A.EventValueError('asyncutils.events.EventWithValue: use clear instead')
34 else:
35 f, w = (t := []).append, self._waiters
36 for _ in w: f(_) if _.done() else _.set_result(value)
37 w.difference_update(t)
[docs]
38 def remove_done_waiters(self, _=__import__('operator').methodcaller('done')): (W := self._waiters).difference_update(filter(_, W)) # noqa: B008
[docs]
39 def set_once(self, value): v = self._value; self.set(value); self.set(v)
[docs]
40 def clear(self): self.set(None, strict=False)
[docs]
41 def get(self, default=_NO_DEFAULT):
42 if (v := self._value) is None:
43 if _NO_DEFAULT.is_(default): raise A.EventValueError('asyncutils.events.EventWithValue: no value is set')
44 return default
45 return v
[docs]
46 async def wait_for_next(self, timeout=None):
47 (w := self._waiters).add(F := self.make_fut())
48 try: return await I.wait_for(F, timeout)
49 finally: w.discard(F)
[docs]
50 def is_set(self): return self._value is not None
51 @property
52 def history(self): return [(t, q) for t, s in self._hist if (q := s()) is not None]
53 @property
54 def history_asdict(self): return {t: q for t, s in self._hist if (q := s()) is not None}
[docs]
55 def recent_history(self, duration=None):
56 if duration is None: duration = A.getcontext().EVENT_WITH_VALUE_DEFAULT_RECENT
57 x, I = monotonic()-duration, iter(self._hist)
58 for t, _ in I:
59 if t >= x: yield t, _; yield from I
[docs]
60 async def wait_for_transition(self, old, new, timeout=None, *, force_transition=False, legacy=False):
61 x = new
62 try:
63 async with I.timeout(timeout):
64 while True:
65 if legacy or x is not old: await self.wait_for_value(old)
66 if new is (x := await self.wait_for_next()): return True
67 except TimeoutError:
68 if force_transition: o = self.get(None); (s := self.set)(old); s(new); s(o, strict=False)
69 return False
[docs]
70 async def wait_for_transition_unordered(self, a, b, timeout=None, *, force_transition=False, legacy=False): return await next(iter((await I.wait(map(self.loop.create_task, (self.wait_for_transition(a, b, timeout, force_transition=force_transition, legacy=legacy), self.wait_for_transition(b, a, timeout, legacy=legacy))), return_when='FIRST_COMPLETED'))[0]))