Source code for asyncutils.altlocks

  1# ty: ignore[unresolved-attribute]
  2__lazy_modules__ = frozenset(('functools',))
  3from asyncutils.config import _randinst
  4from asyncutils.constants import _NO_DEFAULT
  5from asyncutils._internal import patch as P
  6from asyncutils._internal.helpers import fullname
  7from asyncutils._internal.submodules import altlocks_all as __all__
  8from _collections import deque
  9import asyncio as I, asyncutils as A
 10from functools import wraps
 11from itertools import count
 12from sys import audit
 13from time import monotonic
[docs] 14class Releasing: 15 __slots__ = '_lock', 16 def __init__(self, l, /): self._lock = l
[docs] 17 async def __aenter__(self): 18 if not (l := self._lock).locked(): raise RuntimeError('asyncutils.altlocks.Releasing: lock is not acquired') 19 if I.iscoroutine(r := l.release()): await r
[docs] 20 async def __aexit__(self, *_): await self._lock.acquire()
[docs] 21class ResourceGuard(RuntimeError, A.AsyncContextMixin): 22 _inc_cnt = staticmethod(count(1).__next__); __slots__ = 'guarded', 23 def __init__(self, action='using', rname=None): super().__init__(f'another task is already {action} resource{f" #{self._inc_cnt()}" if rname is None else f": {rname!r}"}'); self.guarded = False
[docs] 24 def __enter__(self): 25 if self.guarded: raise self 26 self.guarded = True
[docs] 27 def __exit__(self, /, *_, e=RuntimeError('asyncutils.altlocks.ResourceGuard: __aexit__ called without prior __aenter__ call')): # noqa: B008 28 if not self.guarded: raise e 29 self.guarded = False
[docs] 30 @classmethod 31 def guard(cls, obj, /, *, action='using'): return cls(action, obj)
32 P.patch_method_signatures((__exit__, P.xsig))
[docs] 33class UniqueResourceGuard(ResourceGuard): 34 _cache = __import__('weakref').WeakValueDictionary(); __slots__ = '__weakref__', 35 def __init_subclass__(cls, /, **_): raise TypeError('cannot subclass asyncutils.altlocks.UniqueResourceGuard')
[docs] 36 @classmethod 37 def guard(cls, obj, /, **_): 38 if (r := (c := cls._cache).get(obj)) is None: c[obj] = r = super().guard(obj, **_) 39 audit('asyncutils.altlocks.UniqueResourceGuard', fullname(obj)); return r
[docs] 40 @classmethod 41 def clear_cache(cls): audit('asyncutils.altlocks.UniqueResourceGuard.clear_cache'); cls._cache.clear() # pragma: no cover
42 P.patch_method_signatures((guard, "obj, /, *, action='using'"))
[docs] 43class CircuitBreaker: 44 CLOSED, HALF_OPEN, OPEN = range(3) 45 __slots__ = '_exc', '_half_open_calls', '_lock', '_max_fails', '_max_half_open_calls', '_opened', '_reset', '_unlock', 'fails', 'name', 'state'; _inc_cnt = staticmethod(count(1).__next__) 46 def __new__(cls, n, /, max_fails=None, reset=None, *, exc=Exception, max_half_open_calls=None, _='#%d'): 47 f = None 48 if callable(n) and (n := getattr(f := getattr(getattr(n, '__func__', n), '__wrapped__', n), '__qualname__', None)) is None is (n := getattr(f, '__name__', None)): n = _%cls._inc_cnt() 49 audit('asyncutils.altlocks.CircuitBreaker', n, max_fails); s, C = super().__new__(cls), A.getcontext(); s.name, s._max_fails, s._reset, s._exc, s._opened, s._max_half_open_calls, s._unlock, s._lock, s.state = n, C.CIRCUIT_BREAKER_DEFAULT_MAX_FAILS if max_fails is None else max_fails, C.CIRCUIT_BREAKER_DEFAULT_RESET if reset is None else reset, exc, float('-inf'), C.CIRCUIT_BREAKER_DEFAULT_MAX_HALF_OPEN_CALLS if max_half_open_calls is None else max_half_open_calls, Releasing(l := I.Lock()), l, s.CLOSED; s.fails = s._half_open_calls = 0; return s if f is None else s(f)
[docs] 50 def __call__(self, f, /, *, timer=monotonic, default=_NO_DEFAULT): 51 audit('asyncutils.altlocks.CircuitBreaker.__call__', self.name, fullname(f)) 52 async def g(*a, **k): 53 async with self._lock: 54 if (s := self.state) == self.OPEN: 55 if timer()-self._opened > self._reset: self.state, self._half_open_calls = self.HALF_OPEN, 0 56 else: raise A.CircuitOpen(f'asyncutils.altlocks.CircuitBreaker: circuit {self.name} is open') 57 elif s == self.HALF_OPEN: 58 if (c := self._half_open_calls) == (m := self._max_half_open_calls): raise A.CircuitHalfOpen(f'asyncutils.altlocks.CircuitBreaker: breaker {self.name} exceeded the maximum of {m} calls in the half-open state') 59 self._half_open_calls = c+1 60 try: 61 async with self._unlock: r = await f(*a, **k) 62 if s == self.HALF_OPEN: self._half_open_calls = self.fails = 0; self.state = self.CLOSED 63 return r 64 except self._exc: 65 if (x := self.fails+1) < self._max_fails: self.fails = x 66 else: self._opened, self.state, self.fails = timer(), self.OPEN, 0 67 if default is _NO_DEFAULT: raise 68 return default 69 except A.CRITICAL: raise A.Critical 70 except BaseException as e: raise A.CircuitBreakerError(f'asyncutils.altlocks.CircuitBreaker: unexpected {fullname(e)} in {fullname(f)} under breaker {self.name!r}') from e 71 return wraps(f)(g)
72 P.patch_classmethod_signatures((__new__, 'name, /, max_fails=None, reset=None, *, exc={}, max_half_open_calls=None'))
[docs] 73class StatefulBarrier(A.AwaitableMixin): 74 __slots__ = '_broken', '_cond', '_count', '_exc', '_initstate', '_parties', '_state' 75 def __init__(self, parties, name='\b', initstate=(), maxstate=None): self._parties, self._exc, self._count, self._state, self._cond, self._initstate, self._broken = parties, I.BrokenBarrierError(f'{fullname(self)} {name} is broken'), 0, deque(maxlen=maxstate), I.Condition(), initstate, False 76 async def _wait(self, state=None): 77 async with (C := self._cond): 78 self.raise_for_abort(); f = (S := self._state).append 79 if (s := self._initstate) is not None: 80 async for _ in A.iter_to_agen(s): f(_) 81 self._initstate = None 82 self._count = (c := self._count)+1 83 if state is not None: f(state) 84 if c == self._parties-1: self._count = 0; C.notify_all(); self._broken = True 85 else: 86 w = C.wait 87 while not self._broken: await w() 88 return c, S.copy()
[docs] 89 async def wait(self, state=None, timeout=None): 90 try: return await I.wait_for(self._wait(state), timeout) 91 except TimeoutError: await self.abort(); raise
[docs] 92 async def abort(self): 93 async with (C := self._cond): 94 if not self._broken: self._broken = True; C.notify_all()
[docs] 95 def raise_for_abort(self): 96 if self._broken: raise self._exc
97 @property 98 def broken(self): return self._broken 99 @property 100 def remaining_parties(self): return self._parties-self._count 101 @property 102 def parties(self): return self._parties 103 @property 104 def n_waiting(self): return self._count
[docs] 105class DynamicThrottle: 106 __slots__ = '_fails', '_jitter', '_lb', '_lc', '_lf', '_lock', '_max', '_min', '_randf', '_rate', '_successes', '_timer', '_ub', '_uf', '_window' 107 def __init__(self, init_rate, min_rate=None, max_rate=None, window=None, *, ubound=None, lbound=None, ufactor=None, lfactor=None, jitter=None, timer=monotonic, rand=lambda j, u=_randinst.uniform: u(-j, j)): 108 C = A.getcontext() 109 if min_rate is None: min_rate = C.DYNAMIC_THROTTLE_DEFAULT_MIN_RATE 110 if max_rate is None: max_rate = C.DYNAMIC_THROTTLE_DEFAULT_MAX_RATE 111 if not 0 < min_rate <= init_rate <= max_rate: raise ValueError('asyncutils.altlocks.DynamicThrottle: inconsistent rates after applying bounds') 112 self._min, self._max, self._window, self._lock, self._timer, self._ub, self._lb, self._uf, self._lf, self.jitter, self._randf, self._rate, self._lc = min_rate, max_rate, C.DYNAMIC_THROTTLE_DEFAULT_WINDOW if window is None else window, I.Lock(), timer, C.DYNAMIC_THROTTLE_DEFAULT_UBOUND if ubound is None else ubound, C.DYNAMIC_THROTTLE_DEFAULT_LBOUND if lbound is None else lbound, C.DYNAMIC_THROTTLE_DEFAULT_UFACTOR if ufactor is None else ufactor, C.DYNAMIC_THROTTLE_DEFAULT_LFACTOR if lfactor is None else lfactor, C.DYNAMIC_THROTTLE_DEFAULT_JITTER if jitter is None else jitter, rand, init_rate, timer()-1.0/init_rate; self.reset() 113 @property 114 def rate(self): return self._rate 115 @rate.setter 116 def rate(self, rate, /, _=0.02): 117 if abs(1-self._rate/(rate := max(self._min, min(self._max, rate)))) > _: self._rate = rate 118 @property 119 def jitter(self): return self._jitter 120 @jitter.setter 121 def jitter(self, jitter, /): self._jitter = max(0.0, float(jitter)) 122 @property 123 def ctime(self): return self._timer() 124 @property 125 def successes(self): return self._successes 126 @property 127 def fails(self): return self._fails
[docs] 128 async def __aenter__(self): await I.sleep((1.0/self._rate-self.ctime+self._lc)*(1.0+self._randf(self._jitter))); self._lc = self.ctime
[docs] 129 async def __aexit__(self, e, /, *_): 130 async with self._lock: 131 if (t := (s := self._successes)+self._fails) >= self._window: self.rate *= self._uf if (r := s/t) > self._ub else self._lf if r < self._lb else 1.0; self.reset() 132 if e is None: self._successes += 1 133 else: self._fails += 1
[docs] 134 def reset(self): self._successes = self._fails = 0
135 P.patch_method_signatures((__aexit__, P.xsig))
136del _randinst, count, P