1__lazy_modules__ = frozenset(('asyncio',))
2import asyncutils as A, asyncio as I, collections as c
3from asyncutils.constants import _NO_DEFAULT
4from asyncutils._internal import helpers as H, patch as P
5from asyncutils._internal.submodules import misc_all as __all__
6from sys import audit, exc_info, intern
7from time import monotonic
[docs]
class StateMachine:
    """Asynchronous finite-state machine with guarded transitions and enter/exit hooks.

    State names must be strings: every name given to ``__init__``, ``add`` and
    ``transition`` is passed through ``sys.intern``.
    """

    __slots__ = '_entries', '_exits', '_lock', '_state', '_transitions'

    def __init__(self, state):
        """Start the machine in ``state`` with no transitions or hooks registered."""
        self._state = intern(state)
        # from_state -> to_state -> set of guards; a ``None`` member means "always allowed".
        self._transitions = c.defaultdict(lambda: c.defaultdict(set))
        self._entries = {}  # state -> hook awaited after the state is entered
        self._exits = {}  # state -> hook awaited before the state is left
        self._lock = I.Lock()  # serialises transition()

    def add(self, from_state, to_state, condition=None):
        """Allow the transition ``from_state -> to_state``.

        ``condition`` is either ``None`` (unconditional) or a callable that
        ``transition`` awaits as ``condition(current, target)``; a truthy result
        permits the move.
        """
        self._transitions[intern(from_state)][intern(to_state)].add(condition)

    def on_enter(self, state):
        """Return a decorator that stores its argument as the entry hook of ``state`` and returns it unchanged."""
        def register(h, /):
            self._entries[state] = h
            return h
        return register

    def on_exit(self, state):
        """Return a decorator that stores its argument as the exit hook of ``state`` and returns it unchanged."""
        def register(h, /):
            self._exits[state] = h
            return h
        return register

    async def transition(self, state):
        """Try to move to ``state``.

        Returns ``True`` after the exit hook of the old state and the entry hook
        of the new state have been awaited, or ``False`` when no registered guard
        for ``current -> state`` allows the move (including when the transition
        was never added).
        """
        state = intern(state)
        async with self._lock:
            # Read the current state only while holding the lock: a call that had to
            # wait for another transition must be checked against the state that
            # transition produced, not the one seen before waiting.
            current = self._state
            guards = self._transitions[current][state]
            if None not in guards:
                for guard in guards:
                    if await guard(current, state):
                        break
                else:
                    return False
            await self._helper(1)
            self._state = state
            await self._helper(0)
            return True

    async def _helper(self, i, /, _=A.IgnoreErrors(KeyError), s=__slots__):
        """Await the hook registered for the current state.

        ``s[i]`` names the slot holding the hook mapping: ``i == 0`` selects
        ``_entries`` and ``i == 1`` selects ``_exits``.
        """
        # NOTE(review): presumably IgnoreErrors(KeyError) turns "no hook registered"
        # into a no-op; it would also hide a KeyError raised by the hook itself - confirm.
        async with _:
            await getattr(self, s[i])[self._state]()

    # NOTE(review): presumably replaces the advertised signature of _helper so the
    # default-argument constants are not shown - confirm against asyncutils._internal.patch.
    P.patch_method_signatures((_helper, 'attr'))
[docs]
async def gather_with_limited_concurrency(n=None, /, *coros, ret_exc=False):
    """Await all of ``coros`` like ``asyncio.gather`` while at most ``n`` run at once.

    ``n`` is the semaphore size; when it is ``None`` the limit is read from
    ``A.getcontext().GATHER_WITH_LIMITED_CONCURRENCY_DEFAULT_MAX_CONCURRENT``.
    ``ret_exc`` is forwarded to ``asyncio.gather`` as ``return_exceptions``.
    Results are returned in the order the awaitables were passed.
    """
    limit = A.getcontext().GATHER_WITH_LIMITED_CONCURRENCY_DEFAULT_MAX_CONCURRENT if n is None else n
    gate = I.Semaphore(limit)  # one semaphore per call, shared by all wrappers below

    async def throttled(aw):
        # Hold a semaphore slot for as long as the wrapped awaitable runs.
        async with gate:
            return await aw

    return await I.gather(*(throttled(aw) for aw in coros), return_exceptions=ret_exc)
[docs]
class CallbackAccumulator(c.deque, A.ExecutorRequiredAsyncContextMixin):
    """Deque of callbacks, each fetched by attribute name from an added object, that can be fired together.

    Calling the accumulator invokes every stored callback with the given
    arguments. With ``call_once`` true, callbacks are removed as they are
    handed out; otherwise they stay stored and a snapshot is iterated.
    """

    __slots__ = 'call_once', 'default_getter', 't'

    def __init__(self, name, it=(), maxlen=None, default=_NO_DEFAULT, call_once=True, default_getter=None):
        """Initialise the accumulator.

        ``name`` is the attribute that ``add`` reads from each object and
        ``default`` the optional ``getattr`` fallback. ``it`` supplies the initial
        items (passed through ``A.aiter_to_gen``) and ``maxlen`` bounds the deque.
        ``default_getter`` returns the ``(args, kwargs)`` used by ``__exit__``;
        when omitted, the positional arguments are ``sys.exc_info()`` if ``name``
        is ``'__exit__'`` and empty otherwise.
        """
        super().__init__(A.aiter_to_gen(it, use_futures=True), maxlen)
        # NOTE(review): presumably filter_out drops the sentinel, leaving (name,) or
        # (name, default) to be splatted into getattr - confirm against the helper.
        self.t = tuple(H.filter_out(name, default, s=_NO_DEFAULT))
        self.call_once = call_once
        if default_getter is None:
            def exit_arguments():
                return (exc_info(), {}) if name == '__exit__' else ((), {})
            self.default_getter = exit_arguments
        else:
            self.default_getter = default_getter

    def __call__(self, *a, **k):
        """Invoke every callback yielded by iterating this accumulator with ``*a, **k``."""
        for f in self:
            f(*a, **k)

    def __enter__(self):
        """Return the accumulator itself."""
        return self

    def __exit__(self, /, *_):
        """Fire the callbacks with the arguments produced by ``default_getter``."""
        a, k = self.default_getter()
        self(*a, **k)

    def add(self, o, /):
        """Append the attribute of ``o`` selected by ``name`` (and ``default``, if one was given)."""
        self.append(getattr(o, *self.t))

    def offer_last(self, o, /):
        """Add ``o`` only if the deque is unbounded or not yet full; return whether it was added."""
        if (x := self.maxlen) is None or x > len(self):
            self.add(o)
            return True
        return False

    @property
    def callbacks(self):
        """Snapshot of the stored callbacks as a plain ``collections.deque``.

        ``deque.copy()`` is not used: on a subclass it calls
        ``type(self)(self[, maxlen])``, which here would bind the deque to ``name``
        and produce an empty (or failing) copy. ``deque.__iter__`` is called
        directly because this class overrides ``__iter__`` with a version that
        removes items when ``call_once`` is set.
        """
        return c.deque(c.deque.__iter__(self), self.maxlen)

    def __iter__(self):
        """Yield callbacks, removing them from the left when ``call_once`` is set, else from a snapshot."""
        if self.call_once:
            p = self.popleft
            while self:
                yield p()
        else:
            yield from self.callbacks
[docs]
46class CacheWithBackgroundRefresh(A.LoopContextMixin):
47 _executor = None; __slots__ = '_cache', '_event', '_loaders', '_lock', '_processor', '_refresh', '_task', '_timer', '_ttl'
48 def __init__(self, ttl=None, refresh=None, *, processor=None, default_loader=None, timer=monotonic):
49 C = A.getcontext()
50 if ttl is None: ttl = C.BACKGROUND_REFRESH_CACHE_DEFAULT_TTL
51 if refresh is None: refresh = C.BACKGROUND_REFRESH_CACHE_DEFAULT_REFRESH
52 audit(H.fullname(self), ttl, refresh); super().__init__(); self._cache, self._lock, self._loaders, self._task, self._event, self._timer = {}, I.Lock(), c.defaultdict(lambda: default_loader), None, I.Event(), timer; self.configure(ttl, refresh, processor)
[docs]
53 def __contains__(self, key): return key in self._cache
[docs]
54 def register_loader(self, key, loader): self._loaders[key] = loader
[docs]
55 def expired(self, key): return self.time_past(key) > self._ttl
[docs]
56 def should_refresh(self, key): return self.time_past(key) > self._ttl-self._refresh if key in self else False
[docs]
57 def time_past(self, key): return self._timer()-self._cache[key].timestamp
[docs]
59 def get_loader(self, key):
60 if (k := self._loaders[key]) is None: raise LookupError(f'asyncutils.misc.CacheWithBackgroundRefresh: no loader registered for key {key!r}')
61 return k
62 async def _process_error(self, e, b, /):
63 if (x := (c := type(self))._executor) is None: x = H.create_executor(c)
64 if I.iscoroutine(r := await self.loop.run_in_executor(x, self._processor, e, b)): await r
[docs]
65 async def get(self, key, loader=None):
66 async with self._lock:
67 if loader is not None: self.register_loader(key, loader)
68 if self.should_refresh(key): await self.refresh_item(key)
69 if key not in self._cache or self.expired(key): await self.load_item(key)
70 return self._cache[key].value
[docs]
71 async def __setup__(self):
72 if self._task is None: self._event.clear(); self._task = self.make(self.refresh_loop())
[docs]
73 async def __cleanup__(self):
74 if self._task: self._event.set(); await A.safe_cancel(self._task); self._task = None
[docs]
75 async def load_item(self, key, _=c.namedtuple('CacheEntry', 'value timestamp loading', module='asyncutils.misc')):
76 _ = _(await self.get_loader(key)(key), self._timer(), False)
77 async with self._lock: self._cache[key] = _
[docs]
78 async def refresh_item(self, key):
79 async with self._lock:
80 if key not in self._loaders or (t := self._cache.get(key)) is None or t.loading: return
81 t.loading = True
82 try: await self.load_item(key)
83 except A.CRITICAL: raise A.Critical
84 except BaseException as e: t.loading = False; await self._process_error(e, False) # noqa: BLE001
[docs]
85 async def refresh_loop(self):
86 r, t = self._refresh, []
87 while True:
88 await I.sleep(r)
89 try:
90 async with self._lock: d, f = self._timer()-self._ttl+self._refresh, self.refresh_item; t.extend(self.make_multiple(f(k) for k, v in self._cache.items() if not v.loading and d > v.timestamp))
91 if t: await I.gather(*t); t.clear()
92 except A.CRITICAL: raise A.Critical
93 except I.CancelledError: raise
94 except BaseException as e: await self._process_error(e, True) # noqa: BLE001
[docs]
95 async def invalidate(self, key):
96 async with self._lock: return self._cache.pop(key, None)
[docs]
97 async def clear(self):
98 async with self._lock: self._cache.clear()
99 P.patch_method_signatures((__init__, 'ttl=None, refresh=None, *, processor=None, default_loader=None, timer={}'), (configure, 'ttl, refresh, processor=None'), (load_item, 'key'))