1# ty: ignore[unresolved-attribute]
2__lazy_modules__ = frozenset(('functools',))
3from asyncutils.config import _randinst
4from asyncutils.constants import _NO_DEFAULT
5from asyncutils._internal import patch as P
6from asyncutils._internal.helpers import fullname
7from asyncutils._internal.submodules import altlocks_all as __all__
8from _collections import deque
9import asyncio as I, asyncutils as A
10from functools import wraps
11from itertools import count
12from sys import audit
13from time import monotonic
[docs]
14class Releasing:
15 __slots__ = '_lock',
16 def __init__(self, l, /): self._lock = l
[docs]
17 async def __aenter__(self):
18 if not (l := self._lock).locked(): raise RuntimeError('asyncutils.altlocks.Releasing: lock is not acquired')
19 if I.iscoroutine(r := l.release()): await r
[docs]
20 async def __aexit__(self, *_): await self._lock.acquire()
[docs]
21class ResourceGuard(RuntimeError, A.AsyncContextMixin):
22 _inc_cnt = staticmethod(count(1).__next__); __slots__ = 'guarded',
23 def __init__(self, action='using', rname=None): super().__init__(f'another task is already {action} resource{f" #{self._inc_cnt()}" if rname is None else f": {rname!r}"}'); self.guarded = False
[docs]
24 def __enter__(self):
25 if self.guarded: raise self
26 self.guarded = True
[docs]
27 def __exit__(self, /, *_, e=RuntimeError('asyncutils.altlocks.ResourceGuard: __aexit__ called without prior __aenter__ call')): # noqa: B008
28 if not self.guarded: raise e
29 self.guarded = False
[docs]
30 @classmethod
31 def guard(cls, obj, /, *, action='using'): return cls(action, obj)
32 P.patch_method_signatures((__exit__, P.xsig))
[docs]
33class UniqueResourceGuard(ResourceGuard):
34 _cache = __import__('weakref').WeakValueDictionary(); __slots__ = '__weakref__',
35 def __init_subclass__(cls, /, **_): raise TypeError('cannot subclass asyncutils.altlocks.UniqueResourceGuard')
[docs]
36 @classmethod
37 def guard(cls, obj, /, **_):
38 if (r := (c := cls._cache).get(obj)) is None: c[obj] = r = super().guard(obj, **_)
39 audit('asyncutils.altlocks.UniqueResourceGuard', fullname(obj)); return r
[docs]
40 @classmethod
41 def clear_cache(cls): audit('asyncutils.altlocks.UniqueResourceGuard.clear_cache'); cls._cache.clear() # pragma: no cover
42 P.patch_method_signatures((guard, "obj, /, *, action='using'"))
[docs]
43class CircuitBreaker:
44 CLOSED, HALF_OPEN, OPEN = range(3)
45 __slots__ = '_exc', '_half_open_calls', '_lock', '_max_fails', '_max_half_open_calls', '_opened', '_reset', '_unlock', 'fails', 'name', 'state'; _inc_cnt = staticmethod(count(1).__next__)
46 def __new__(cls, n, /, max_fails=None, reset=None, *, exc=Exception, max_half_open_calls=None, _='#%d'):
47 f = None
48 if callable(n) and (n := getattr(f := getattr(getattr(n, '__func__', n), '__wrapped__', n), '__qualname__', None)) is None is (n := getattr(f, '__name__', None)): n = _%cls._inc_cnt()
49 audit('asyncutils.altlocks.CircuitBreaker', n, max_fails); s, C = super().__new__(cls), A.getcontext(); s.name, s._max_fails, s._reset, s._exc, s._opened, s._max_half_open_calls, s._unlock, s._lock, s.state = n, C.CIRCUIT_BREAKER_DEFAULT_MAX_FAILS if max_fails is None else max_fails, C.CIRCUIT_BREAKER_DEFAULT_RESET if reset is None else reset, exc, float('-inf'), C.CIRCUIT_BREAKER_DEFAULT_MAX_HALF_OPEN_CALLS if max_half_open_calls is None else max_half_open_calls, Releasing(l := I.Lock()), l, s.CLOSED; s.fails = s._half_open_calls = 0; return s if f is None else s(f)
[docs]
50 def __call__(self, f, /, *, timer=monotonic, default=_NO_DEFAULT):
51 audit('asyncutils.altlocks.CircuitBreaker.__call__', self.name, fullname(f))
52 async def g(*a, **k):
53 async with self._lock:
54 if (s := self.state) == self.OPEN:
55 if timer()-self._opened > self._reset: self.state, self._half_open_calls = self.HALF_OPEN, 0
56 else: raise A.CircuitOpen(f'asyncutils.altlocks.CircuitBreaker: circuit {self.name} is open')
57 elif s == self.HALF_OPEN:
58 if (c := self._half_open_calls) == (m := self._max_half_open_calls): raise A.CircuitHalfOpen(f'asyncutils.altlocks.CircuitBreaker: breaker {self.name} exceeded the maximum of {m} calls in the half-open state')
59 self._half_open_calls = c+1
60 try:
61 async with self._unlock: r = await f(*a, **k)
62 if s == self.HALF_OPEN: self._half_open_calls = self.fails = 0; self.state = self.CLOSED
63 return r
64 except self._exc:
65 if (x := self.fails+1) < self._max_fails: self.fails = x
66 else: self._opened, self.state, self.fails = timer(), self.OPEN, 0
67 if default is _NO_DEFAULT: raise
68 return default
69 except A.CRITICAL: raise A.Critical
70 except BaseException as e: raise A.CircuitBreakerError(f'asyncutils.altlocks.CircuitBreaker: unexpected {fullname(e)} in {fullname(f)} under breaker {self.name!r}') from e
71 return wraps(f)(g)
72 P.patch_classmethod_signatures((__new__, 'name, /, max_fails=None, reset=None, *, exc={}, max_half_open_calls=None'))
[docs]
73class StatefulBarrier(A.AwaitableMixin):
74 __slots__ = '_broken', '_cond', '_count', '_exc', '_initstate', '_parties', '_state'
75 def __init__(self, parties, name='\b', initstate=(), maxstate=None): self._parties, self._exc, self._count, self._state, self._cond, self._initstate, self._broken = parties, I.BrokenBarrierError(f'{fullname(self)} {name} is broken'), 0, deque(maxlen=maxstate), I.Condition(), initstate, False
76 async def _wait(self, state=None):
77 async with (C := self._cond):
78 self.raise_for_abort(); f = (S := self._state).append
79 if (s := self._initstate) is not None:
80 async for _ in A.iter_to_agen(s): f(_)
81 self._initstate = None
82 self._count = (c := self._count)+1
83 if state is not None: f(state)
84 if c == self._parties-1: self._count = 0; C.notify_all(); self._broken = True
85 else:
86 w = C.wait
87 while not self._broken: await w()
88 return c, S.copy()
[docs]
89 async def wait(self, state=None, timeout=None):
90 try: return await I.wait_for(self._wait(state), timeout)
91 except TimeoutError: await self.abort(); raise
[docs]
92 async def abort(self):
93 async with (C := self._cond):
94 if not self._broken: self._broken = True; C.notify_all()
[docs]
95 def raise_for_abort(self):
96 if self._broken: raise self._exc
97 @property
98 def broken(self): return self._broken
99 @property
100 def remaining_parties(self): return self._parties-self._count
101 @property
102 def parties(self): return self._parties
103 @property
104 def n_waiting(self): return self._count
[docs]
105class DynamicThrottle:
106 __slots__ = '_fails', '_jitter', '_lb', '_lc', '_lf', '_lock', '_max', '_min', '_randf', '_rate', '_successes', '_timer', '_ub', '_uf', '_window'
107 def __init__(self, init_rate, min_rate=None, max_rate=None, window=None, *, ubound=None, lbound=None, ufactor=None, lfactor=None, jitter=None, timer=monotonic, rand=lambda j, u=_randinst.uniform: u(-j, j)):
108 C = A.getcontext()
109 if min_rate is None: min_rate = C.DYNAMIC_THROTTLE_DEFAULT_MIN_RATE
110 if max_rate is None: max_rate = C.DYNAMIC_THROTTLE_DEFAULT_MAX_RATE
111 if not 0 < min_rate <= init_rate <= max_rate: raise ValueError('asyncutils.altlocks.DynamicThrottle: inconsistent rates after applying bounds')
112 self._min, self._max, self._window, self._lock, self._timer, self._ub, self._lb, self._uf, self._lf, self.jitter, self._randf, self._rate, self._lc = min_rate, max_rate, C.DYNAMIC_THROTTLE_DEFAULT_WINDOW if window is None else window, I.Lock(), timer, C.DYNAMIC_THROTTLE_DEFAULT_UBOUND if ubound is None else ubound, C.DYNAMIC_THROTTLE_DEFAULT_LBOUND if lbound is None else lbound, C.DYNAMIC_THROTTLE_DEFAULT_UFACTOR if ufactor is None else ufactor, C.DYNAMIC_THROTTLE_DEFAULT_LFACTOR if lfactor is None else lfactor, C.DYNAMIC_THROTTLE_DEFAULT_JITTER if jitter is None else jitter, rand, init_rate, timer()-1.0/init_rate; self.reset()
113 @property
114 def rate(self): return self._rate
115 @rate.setter
116 def rate(self, rate, /, _=0.02):
117 if abs(1-self._rate/(rate := max(self._min, min(self._max, rate)))) > _: self._rate = rate
118 @property
119 def jitter(self): return self._jitter
120 @jitter.setter
121 def jitter(self, jitter, /): self._jitter = max(0.0, float(jitter))
122 @property
123 def ctime(self): return self._timer()
124 @property
125 def successes(self): return self._successes
126 @property
127 def fails(self): return self._fails
[docs]
128 async def __aenter__(self): await I.sleep((1.0/self._rate-self.ctime+self._lc)*(1.0+self._randf(self._jitter))); self._lc = self.ctime
[docs]
129 async def __aexit__(self, e, /, *_):
130 async with self._lock:
131 if (t := (s := self._successes)+self._fails) >= self._window: self.rate *= self._uf if (r := s/t) > self._ub else self._lf if r < self._lb else 1.0; self.reset()
132 if e is None: self._successes += 1
133 else: self._fails += 1
[docs]
134 def reset(self): self._successes = self._fails = 0
135 P.patch_method_signatures((__aexit__, P.xsig))
136del _randinst, count, P