Source code for asyncutils.misc

 1__lazy_modules__ = frozenset(('asyncio',))
 2import asyncutils as A, asyncio as I, collections as c
 3from asyncutils.constants import _NO_DEFAULT
 4from asyncutils._internal import helpers as H, patch as P
 5from asyncutils._internal.submodules import misc_all as __all__
 6from sys import audit, exc_info, intern
 7from time import monotonic
class StateMachine:
    """Asynchronous state machine with guarded transitions and enter/exit hooks.

    States are strings; they are passed through ``sys.intern`` when stored.
    Transitions are registered with :meth:`add`, hooks with the
    :meth:`on_enter` / :meth:`on_exit` decorators, and the machine is driven
    with :meth:`transition`, which serialises concurrent callers on an
    ``asyncio.Lock``.
    """

    __slots__ = '_entries', '_exits', '_lock', '_state', '_transitions'

    def __init__(self, state):
        """Create a machine whose current state is ``state``."""
        self._state = intern(state)
        # from_state -> to_state -> set of guard conditions (None = unconditional).
        self._transitions = c.defaultdict(lambda: c.defaultdict(set))
        self._entries = {}  # state -> hook awaited after entering it
        self._exits = {}  # state -> hook awaited before leaving it
        self._lock = I.Lock()

    def add(self, from_state, to_state, condition=None):
        """Allow the transition ``from_state`` -> ``to_state``.

        ``condition`` is either ``None`` (the transition is always allowed) or
        a callable invoked as ``condition(from_state, to_state)`` whose awaited
        result decides whether the transition may happen.
        """
        self._transitions[intern(from_state)][intern(to_state)].add(condition)

    def on_enter(self, state):
        """Return a decorator that registers its argument as the enter hook of ``state``."""
        def register(h, /):
            self._entries[state] = h
            return h
        return register

    def on_exit(self, state):
        """Return a decorator that registers its argument as the exit hook of ``state``."""
        def register(h, /):
            self._exits[state] = h
            return h
        return register

    async def transition(self, state):
        """Try to move to ``state``; return ``True`` on success, ``False`` if not allowed.

        The transition is allowed when it was registered unconditionally or
        when at least one registered condition evaluates truthy. On success the
        exit hook of the old state runs, the state is updated, and the enter
        hook of the new state runs.
        """
        state = intern(state)
        async with self._lock:
            # Read the current state only while holding the lock: a caller that
            # had to wait for another transition must be validated against the
            # state that transition left behind, not the one seen before waiting.
            current = self._state
            # Plain lookups so that probing an unregistered transition does not
            # insert empty entries into the nested defaultdicts.
            guards = self._transitions.get(current, {}).get(state, ())
            if None not in guards:
                # Iterate over a snapshot: ``add`` may run while a guard is awaited.
                for guard in tuple(guards):
                    if await guard(current, state):
                        break
                else:
                    return False
            await self._helper(1)  # exit hook of the state being left
            self._state = state
            await self._helper(0)  # enter hook of the state just entered
            return True

    async def _helper(self, i, /, _=A.IgnoreErrors(KeyError), s=__slots__):
        # Await the hook stored for the current state in the slot named s[i]
        # (0 -> '_entries', 1 -> '_exits'). The lookup runs inside
        # A.IgnoreErrors(KeyError), presumably so that a state without a hook is
        # skipped silently.
        # NOTE(review): a KeyError raised by the hook itself would presumably be
        # suppressed as well - confirm against IgnoreErrors.
        async with _:
            await getattr(self, s[i])[self._state]()

    P.patch_method_signatures((_helper, 'attr'))
async def gather_with_limited_concurrency(n=None, /, *coros, ret_exc=False):
    """Await all ``coros`` like ``asyncio.gather``, at most ``n`` at a time.

    Args:
        n: Maximum number of awaitables allowed to run concurrently. ``None``
            uses ``GATHER_WITH_LIMITED_CONCURRENCY_DEFAULT_MAX_CONCURRENT``
            from the current ``asyncutils`` context.
        *coros: Awaitables to run.
        ret_exc: Forwarded to ``asyncio.gather`` as ``return_exceptions``.

    Returns:
        The list of results, in the order of ``coros``.

    Raises:
        ValueError: If the limit is negative, or if it is zero while there is
            at least one awaitable to run (nothing could ever start).
    """
    limit = A.getcontext().GATHER_WITH_LIMITED_CONCURRENCY_DEFAULT_MAX_CONCURRENT if n is None else n
    gate = I.Semaphore(limit)  # already rejects negative limits with ValueError
    if coros and limit < 1:
        # A semaphore with no permits would block every awaitable forever.
        raise ValueError(f'asyncutils.misc.gather_with_limited_concurrency: concurrency limit must be at least 1, got {limit!r}')

    async def wrapped(aw):
        # Each awaitable only starts once it holds one of the permits.
        async with gate:
            return await aw

    return await I.gather(*map(wrapped, coros), return_exceptions=ret_exc)
class CallbackAccumulator(c.deque, A.ExecutorRequiredAsyncContextMixin):
    """Deque of callbacks that can be invoked together.

    Callbacks are collected from objects by attribute name (see :meth:`add`).
    Calling the accumulator calls every stored callback with the given
    arguments. Used as a ``with`` block, it invokes the callbacks on exit with
    the arguments produced by ``default_getter``.

    With ``call_once`` true (the default) iterating the accumulator, and
    therefore calling it, consumes the callbacks; otherwise they are kept.
    """

    __slots__ = 'call_once', 'default_getter', 't'

    def __init__(self, name, it=(), maxlen=None, default=_NO_DEFAULT, call_once=True, default_getter=None):
        """Initialise the accumulator.

        Args:
            name: Attribute name looked up on objects passed to :meth:`add`.
            it: Initial callbacks; passed through ``A.aiter_to_gen`` before
                being stored, and stored as they are (no attribute lookup).
            maxlen: Maximum length, as for ``collections.deque``.
            default: Fallback for the attribute lookup in :meth:`add`; when left
                at ``_NO_DEFAULT`` a missing attribute raises ``AttributeError``.
            call_once: Whether iteration removes the callbacks it yields.
            default_getter: Zero-argument callable returning ``(args, kwargs)``
                for the call made by :meth:`__exit__`. Defaults to
                ``(sys.exc_info(), {})`` when ``name`` is ``'__exit__'`` and to
                ``((), {})`` otherwise.
        """
        super().__init__(A.aiter_to_gen(it, use_futures=True), maxlen)
        # Arguments for getattr(o, *self.t): the name, plus the default if given.
        self.t = tuple(H.filter_out(name, default, s=_NO_DEFAULT))
        self.call_once = call_once
        if default_getter is None:
            def default_getter():
                return (exc_info(), {}) if name == '__exit__' else ((), {})
        self.default_getter = default_getter

    def __call__(self, *a, **k):
        """Call every callback yielded by iteration with ``*a, **k``."""
        for f in self:
            f(*a, **k)

    def __enter__(self):
        return self

    def __exit__(self, /, *_):
        """Invoke the callbacks with the arguments supplied by ``default_getter``."""
        a, k = self.default_getter()
        self(*a, **k)

    def add(self, o, /):
        """Append the attribute of ``o`` named at construction time."""
        self.append(getattr(o, *self.t))

    def offer_last(self, o, /):
        """Add ``o``'s callback only if there is room; return whether it was added."""
        if (x := self.maxlen) is None or x > len(self):
            self.add(o)
            return True
        return False

    @property
    def callbacks(self):
        """A snapshot of the stored callbacks as a plain ``collections.deque``.

        ``deque.copy()`` cannot be used here: for a subclass it calls
        ``type(self)(self[, maxlen])``, which would bind the deque to ``name``
        (and ``maxlen`` to ``it``) instead of copying the callbacks. The
        base-class iterator is used so that taking the snapshot never consumes
        the callbacks, whatever ``call_once`` is.
        """
        return c.deque(super().__iter__(), self.maxlen)

    def __iter__(self):
        """Yield the callbacks, removing each one first when ``call_once`` is set."""
        if self.call_once:
            pop = self.popleft
            while self:
                yield pop()
        else:
            # Iterate a snapshot so callbacks may modify the accumulator safely.
            yield from self.callbacks
class CacheWithBackgroundRefresh(A.LoopContextMixin):
    """Async cache with per-key loaders, a time-to-live and background refresh.

    Each entry is a ``CacheEntry(value, timestamp, loading)`` named tuple. An
    entry older than ``ttl`` is expired and reloaded on access; an entry older
    than ``ttl - refresh`` is refreshed ahead of expiry, either on access or by
    the background task running :meth:`refresh_loop`. Errors raised while
    refreshing are handed to ``processor(exc, from_loop)`` instead of being
    propagated.

    ``self.loop``, ``self.make`` and ``self.make_multiple`` are not defined
    here; presumably they come from ``A.LoopContextMixin`` - confirm.
    """

    _executor = None
    __slots__ = '_cache', '_event', '_loaders', '_lock', '_processor', '_refresh', '_task', '_timer', '_ttl'

    def __init__(self, ttl=None, refresh=None, *, processor=None, default_loader=None, timer=monotonic):
        """Initialise the cache.

        Args:
            ttl: Entry lifetime; ``None`` uses the context default.
            refresh: Refresh lead time and background polling interval;
                ``None`` uses the context default.
            processor: Callable receiving ``(exception, from_loop)`` for errors
                raised while refreshing.
            default_loader: Loader used for keys without a registered loader.
            timer: Zero-argument callable returning the current time.
        """
        C = A.getcontext()
        if ttl is None:
            ttl = C.BACKGROUND_REFRESH_CACHE_DEFAULT_TTL
        if refresh is None:
            refresh = C.BACKGROUND_REFRESH_CACHE_DEFAULT_REFRESH
        audit(H.fullname(self), ttl, refresh)
        super().__init__()
        self._cache = {}
        self._lock = I.Lock()
        # Unregistered keys fall back to default_loader (which may be None).
        self._loaders = c.defaultdict(lambda: default_loader)
        self._task = None
        self._event = I.Event()
        self._timer = timer
        self.configure(ttl, refresh, processor)

    def __contains__(self, key):
        """Return whether ``key`` currently has a cache entry (expired or not)."""
        return key in self._cache

    def register_loader(self, key, loader):
        """Use ``loader`` to load ``key``; it is called as ``await loader(key)``."""
        self._loaders[key] = loader

    def expired(self, key):
        """Return whether the entry for ``key`` is older than the TTL."""
        return self.time_past(key) > self._ttl

    def should_refresh(self, key):
        """Return whether ``key`` is cached and inside the refresh window."""
        return key in self and self.time_past(key) > self._ttl - self._refresh

    def time_past(self, key):
        """Return the age of the entry for ``key``; raises ``KeyError`` if absent."""
        return self._timer() - self._cache[key].timestamp

    def configure(self, ttl, refresh, processor=None, _=lambda *_: None):
        """Set TTL, refresh lead time and, if given, the error processor.

        The larger of ``ttl`` and ``refresh`` becomes the TTL and the smaller
        the refresh lead time. A falsy ``processor`` keeps the current one, or
        installs a no-op if none has been set yet.
        """
        self._ttl, self._refresh = max(ttl, refresh), min(ttl, refresh)
        self._processor = processor or getattr(self, '_processor', _)

    def get_loader(self, key):
        """Return the loader for ``key``; raise ``LookupError`` if there is none."""
        if (k := self._loaders[key]) is None:
            raise LookupError(f'asyncutils.misc.CacheWithBackgroundRefresh: no loader registered for key {key!r}')
        return k

    async def _process_error(self, e, b, /):
        # Run the processor in an executor; if it returns a coroutine, await it.
        cls = type(self)
        executor = cls._executor
        if executor is None:
            executor = H.create_executor(cls)
        result = await self.loop.run_in_executor(executor, self._processor, e, b)
        if I.iscoroutine(result):
            await result

    def _restore_entry(self, key, marked, entry):
        # Undo the "loading" mark after a failed refresh, unless the entry was
        # replaced or removed in the meantime.
        if self._cache.get(key) is marked:
            self._cache[key] = entry

    async def get(self, key, loader=None):
        """Return the value for ``key``, loading or refreshing it as needed.

        If ``loader`` is given it is registered for ``key`` first. A failed
        refresh is reported to the processor and the cached value is returned
        if it has not expired; a failed load of a missing or expired entry
        propagates to the caller.
        """
        if loader is not None:
            async with self._lock:
                self.register_loader(key, loader)
        # refresh_item() and load_item() acquire the lock themselves, and
        # asyncio.Lock is not reentrant, so they must be awaited without
        # holding it - otherwise this coroutine would wait on itself forever.
        if self.should_refresh(key):
            await self.refresh_item(key)
        if key not in self._cache or self.expired(key):
            await self.load_item(key)
        return self._cache[key].value

    async def __setup__(self):
        # Start the background refresh task if it is not already running.
        # NOTE(review): presumably called by A.LoopContextMixin on entry - confirm.
        if self._task is None:
            self._event.clear()
            self._task = self.make(self.refresh_loop())

    async def __cleanup__(self):
        # Stop the background refresh task.
        # NOTE(review): presumably called by A.LoopContextMixin on exit - confirm.
        if self._task:
            self._event.set()
            await A.safe_cancel(self._task)
            self._task = None

    async def load_item(self, key, _=c.namedtuple('CacheEntry', 'value timestamp loading', module='asyncutils.misc')):
        """Load ``key`` with its loader and store a fresh entry.

        The loader is awaited before the lock is taken; the lock only guards
        the cache update. The second parameter is an implementation detail
        holding the entry type.
        """
        entry = _(await self.get_loader(key)(key), self._timer(), False)
        async with self._lock:
            self._cache[key] = entry

    async def refresh_item(self, key):
        """Reload an existing entry, reporting failures to the processor.

        Does nothing if ``key`` has no loader entry, is not cached, or is
        already being refreshed. Cancellation is propagated.
        """
        async with self._lock:
            entry = self._cache.get(key)
            if key not in self._loaders or entry is None or entry.loading:
                return
            # Entries are immutable named tuples: mark the refresh as in
            # progress by storing a modified copy rather than assigning to
            # the field.
            marked = entry._replace(loading=True)
            self._cache[key] = marked
        try:
            await self.load_item(key)
        except A.CRITICAL:
            raise A.Critical
        except I.CancelledError:
            # Same policy as refresh_loop(): never swallow cancellation.
            self._restore_entry(key, marked, entry)
            raise
        except BaseException as e:  # noqa: BLE001
            self._restore_entry(key, marked, entry)
            await self._process_error(e, False)

    async def refresh_loop(self):
        """Periodically refresh every entry that is inside the refresh window.

        Runs until cancelled. Errors other than cancellation and the critical
        ones are reported to the processor and the loop keeps going.
        """
        # NOTE(review): the interval is read once, so a later configure() does
        # not change the polling period of a running loop.
        interval = self._refresh
        while True:
            await I.sleep(interval)
            try:
                async with self._lock:
                    threshold = self._timer() - self._ttl + self._refresh
                    # A fresh list per round: finished (possibly failed) tasks
                    # of an earlier round must never be awaited again.
                    tasks = list(self.make_multiple(
                        self.refresh_item(k)
                        for k, v in self._cache.items()
                        if not v.loading and threshold > v.timestamp
                    ))
                # Await outside the lock; refresh_item() needs it.
                if tasks:
                    await I.gather(*tasks)
            except A.CRITICAL:
                raise A.Critical
            except I.CancelledError:
                raise
            except BaseException as e:  # noqa: BLE001
                await self._process_error(e, True)

    async def invalidate(self, key):
        """Remove ``key`` and return its entry, or ``None`` if it was not cached."""
        async with self._lock:
            return self._cache.pop(key, None)

    async def clear(self):
        """Remove every cache entry."""
        async with self._lock:
            self._cache.clear()

    P.patch_method_signatures((__init__, 'ttl=None, refresh=None, *, processor=None, default_loader=None, timer={}'), (configure, 'ttl, refresh, processor=None'), (load_item, 'key'))