1__lazy_modules__ = frozenset(('heapq', 'asyncio'))
2import asyncutils as A, asyncio as I
3from asyncutils._internal.helpers import LoopMixinBase, fullname, subscriptable
4from asyncutils._internal.submodules import locks_all as __all__
5from _collections import defaultdict, deque
6from heapq import heappop, heappush
7from time import monotonic
[docs]
class DynamicBoundedSemaphore(I.BoundedSemaphore):
    """An ``asyncio.BoundedSemaphore`` whose upper bound can be changed at runtime.

    When ``value`` is omitted, the initial value is read from
    ``DYNAMIC_BOUNDED_SEMAPHORE_DEFAULT_VALUE`` on the active asyncutils context.
    """

    def __init__(self, value=None):
        super().__init__(A.getcontext().DYNAMIC_BOUNDED_SEMAPHORE_DEFAULT_VALUE if value is None else value)
        self._waiters = deque()  # type: deque

    @property
    def bound(self):
        """The current upper bound of the semaphore."""
        return self._bound_value

    @bound.setter
    def bound(self, value, /):
        """Set a new bound; raising it wakes up to that many extra pending waiters.

        Raises:
            ValueError: if ``value`` is negative.
        """
        if value < 0:
            raise ValueError('asyncutils.locks.DynamicBoundedSemaphore: bound must be non-negative')
        extra = value - self._bound_value  # ty: ignore[unresolved-attribute]
        self._bound_value = value
        waiters = self._waiters
        # Only a *raised* bound frees capacity. The previous ``while d and W`` test
        # treated a negative delta as truthy, so lowering the bound woke every waiter.
        # NOTE(review): waiters are popped here instead of being left for
        # asyncio.Semaphore.acquire to remove itself -- confirm this matches the
        # bookkeeping of the asyncio version being targeted.
        while extra > 0 and waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                extra -= 1
def d(m, /, _=__import__('functools').wraps):
    """Turn the plain method *m* into a coroutine method that runs under ``self._lock``.

    The coroutine enters ``self._lock``, calls ``self.update_tokens_lock_held()``
    and then ``m(self, *args, **kwargs)``. Whatever *m* returns is discarded, so
    awaiting the wrapper yields ``None``. Metadata of *m* is copied onto the
    wrapper through ``_`` (``functools.wraps`` by default).
    """
    copy_metadata = _(m)

    async def locked_call(self, *args, **kwargs):
        async with self._lock:
            self.update_tokens_lock_held()
            m(self, *args, **kwargs)

    return copy_metadata(locked_call)
[docs]
class AdvancedRateLimit(LoopMixinBase, A.LockMixin):
    """Token-bucket rate limiter with a queue of waiting acquirers.

    Tokens accrue at ``rate`` per second of ``time.monotonic`` time, capped at
    ``capacity`` (``capacity`` falls back to ``rate`` when omitted or falsy).
    With ``fair=True`` new waiters join the back of the queue; otherwise they
    are pushed to the front.

    The bucket is only refilled, and queued waiters only served, when
    ``update_tokens_lock_held`` runs, i.e. from ``acquire``, ``release`` and
    ``set_rate``; no timer is scheduled by this class.
    """
    __slots__ = '_lock', '_lu', '_unfair', '_waiters', 'capacity', 'rate', 'tokens'

    def __init__(self, rate, capacity=None, fair=True):
        super().__init__()
        self.rate = rate
        self._lock = I.Lock()
        self._waiters = deque()  # (tokens requested, future) pairs, head is served first
        self._unfair = not fair
        self._lu = monotonic()  # monotonic timestamp of the last refill
        self.tokens = self.capacity = capacity or rate

    async def acquire(self, tokens=None, timeout=None):
        """Take ``tokens`` from the bucket, waiting up to ``timeout`` seconds if needed.

        ``tokens`` defaults to ``ADVANCED_RATE_LIMIT_DEFAULT_TOKENS`` from the
        active asyncutils context.

        Returns:
            ``True`` once the tokens were granted, ``False`` if the wait timed out.
        """
        # Resolve the default before any comparison; comparing ``None`` with the
        # balance used to raise TypeError for the documented default call.
        if tokens is None:
            tokens = A.getcontext().ADVANCED_RATE_LIMIT_DEFAULT_TOKENS
        async with self._lock:
            self.update_tokens_lock_held()
            if tokens <= self.tokens:
                self.tokens -= tokens
                return True
            granted = self.loop.create_future()
            queue = self._waiters
            (queue.appendleft if self._unfair else queue.append)((tokens, granted))
        try:
            await I.wait_for(granted, timeout)
        except TimeoutError:
            # wait_for cancelled the future; the stale queue entry is dropped on the next refill.
            return False
        return True

    @d
    def release(self, tokens=None):
        """Return ``tokens`` to the bucket (context default when omitted), capped at ``capacity``.

        The ``d`` wrapper refills and serves waiters *before* this body runs, so
        tokens returned here are seen by waiters at the next refill.
        """
        self.tokens = min(self.tokens+(A.getcontext().ADVANCED_RATE_LIMIT_DEFAULT_TOKENS if tokens is None else tokens), self.capacity)

    @d
    def set_rate(self, new):
        """Change the refill rate; the capacity is raised to ``new`` if it was smaller."""
        self.rate, self.capacity = new, max(self.capacity, new)

    def locked(self):
        """Return ``True`` while at least one entry is queued."""
        return bool(self._waiters)

    def update_tokens_lock_held(self):
        """Refill the bucket for the elapsed time and serve queued waiters in order.

        Must be called with ``self._lock`` held. Waiters are served from the head
        of the queue until the head needs more tokens than are available.
        """
        now = monotonic()
        # Always refill, even with an empty queue, so acquire() sees a current balance.
        available = min(self.capacity, self.tokens + (now - self._lu) * self.rate)
        self._lu = now
        queue = self._waiters
        while queue:
            needed, fut = queue[0]
            if fut.done():
                # Timed-out/cancelled waiter: drop it without spending tokens on it.
                queue.popleft()
                continue
            if needed > available:
                break
            # Peek-then-pop so the last (or only) waiter is served too.
            queue.popleft()
            available -= needed
            fut.set_result(None)
        self.tokens = available
[docs]
class PrioritySemaphore(LoopMixinBase, A.LockMixin):
    """Semaphore whose blocked acquirers are released in priority order.

    Waiters are kept in a ``heapq`` min-heap of ``(priority, tiebreak, future)``,
    so the smallest ``priority`` is woken first and equal priorities are woken
    in arrival order. ``_value`` is decremented as soon as ``acquire`` is
    called, so a negative ``_value`` counts the acquirers currently queued.
    """
    __slots__ = '_bound', '_tiebreak', '_value', '_waiters'

    def __init__(self, value=None):
        # Sibling classes with the same bases initialise them; do the same here.
        super().__init__()
        if value is None:
            value = A.getcontext().PRIORITY_SEMAPHORE_DEFAULT_VALUE
        # _bound remembers the initial value so over-release can be detected.
        self._value = self._bound = value
        self._tiebreak = 0
        self._waiters = []

    async def acquire(self, priority=0):
        """Acquire one slot, waiting behind higher-priority (smaller value) acquirers.

        Returns:
            ``True`` once the slot is held.
        """
        self._value -= 1
        self._tiebreak += 1
        if self._value >= 0:
            return True
        granted = self.make_fut()
        heappush(self._waiters, (priority, self._tiebreak, granted))
        try:
            # release() hands its slot directly to the waiter it wakes, so being
            # woken means the slot is ours. Re-checking ``_value`` here (as the
            # old ``while`` loop did) re-queued the woken waiter whenever other
            # waiters were still pending, which could deadlock.
            await granted
        except BaseException:
            if granted.cancelled():
                # Gave up while still queued: undo the eager decrement. The stale
                # heap entry is skipped by release().
                self._value += 1
            else:
                # The slot had already been handed over: pass it on.
                self.release(False)
            raise
        return True

    def release(self, strict=True):
        """Release one slot, waking the best-priority pending waiter if there is one.

        Raises:
            RuntimeError: if ``strict`` is true, nobody was woken, and the value
                is already at the initial bound.
        """
        waiters = self._waiters
        while waiters:
            fut = heappop(waiters)[-1]
            if not fut.done():  # skip waiters that were cancelled while queued
                fut.set_result(None)
                break
        else:
            # An uncontended release is legitimate; only complain when there is
            # nothing left to release.
            if strict and self._value >= self._bound:
                raise RuntimeError('asyncutils.locks.PrioritySemaphore: release called too many times')
        self._value += 1

    def locked(self):
        """Return ``True`` while ``_value`` is negative (acquirers are queued)."""
        return self._value < 0

    def reset(self):
        """Wake every pending waiter and reset the counters.

        NOTE(review): the value is reset to 1 rather than to the initial value --
        confirm this is intended.
        """
        for *_, fut in self._waiters:
            if not fut.done():
                fut.set_result(None)
        self._value, self._tiebreak = 1, 0
        self._waiters.clear()
[docs]
@subscriptable
class KeyedCondition(LoopMixinBase, A.LockMixin):
    """Condition variable whose waiters are grouped by key.

    Waiters register a future under a key; ``notify``/``notify_all`` complete
    the futures of a given key (or of every key). As with
    ``asyncio.Condition``, the underlying lock must be held to wait or notify,
    and it is released while a waiter is blocked and re-acquired before the
    wait returns.
    """
    __slots__ = '__lock', '_specific_waiters'

    def __init__(self, lock=None):
        super().__init__()
        self.__lock = lock or I.Lock()
        self._specific_waiters = defaultdict(set)  # key -> set of pending futures

    async def acquire(self):
        """Acquire the underlying lock; return ``False`` only if it reported failure."""
        with A.ignore_noncritical:
            if await self.__lock.acquire() != False:  # noqa: E712
                return True
        return False

    async def release(self):
        """Release the underlying lock, awaiting the result if it is a coroutine."""
        result = self.__lock.release()
        if I.iscoroutine(result):
            await result

    def locked(self):
        """Return whether the underlying lock is currently held."""
        return self.__lock.locked()

    async def _wait_unlocked(self, awaitable, timeout):
        """Release the lock, await *awaitable* for up to *timeout*, then re-acquire the lock."""
        await self.release()
        try:
            return await I.wait_for(awaitable, timeout)
        finally:
            # The lock must be held again when control returns to the caller,
            # even if the wait timed out or was cancelled.
            cancelled = None
            while True:
                try:
                    await self.acquire()
                    break
                except I.CancelledError as exc:
                    cancelled = exc
            if cancelled is not None:
                try:
                    raise cancelled
                finally:
                    cancelled = None  # break the reference cycle

    async def wait(self, key, timeout=None):
        """Block until ``key`` is notified or ``timeout`` seconds pass.

        The lock is released while blocked so that notifiers can acquire it.

        Raises:
            RuntimeError: if the lock is not held.
            TimeoutError: if ``timeout`` expires first.
        """
        self.assert_locked()
        fut = self.make_fut()
        bucket = self._specific_waiters[key]
        bucket.add(fut)  # register before releasing the lock so no notification is missed
        try:
            await self._wait_unlocked(fut, timeout)
        finally:
            bucket.discard(fut)

    async def wait_for(self, key, pred, per_wait_timeout=None):
        """Wait on ``key`` repeatedly until ``pred()`` is true.

        Each individual wait is limited to ``per_wait_timeout`` seconds.
        """
        self.assert_locked()
        # Delegating to wait() looks the waiter set up afresh each round; notify()
        # detaches the set it drained, so reusing the old set lost later wake-ups.
        while not pred():
            await self.wait(key, per_wait_timeout)

    async def wait_all(self, timeout=None):
        """Wait until every future currently registered by a waiter has completed.

        Raises:
            RuntimeError: if the lock is not held.
            TimeoutError: if ``timeout`` expires first.
        """
        self.assert_locked()
        pending = frozenset().union(*self._specific_waiters.values())
        # shield() keeps a timeout or cancellation here from cancelling the
        # futures that other tasks are waiting on.
        await self._wait_unlocked(I.shield(I.gather(*pending, return_exceptions=True)), timeout)

    def assert_locked(self):
        """Raise ``RuntimeError`` unless the underlying lock is held."""
        if not self.locked():
            raise RuntimeError('asyncutils.locks.KeyedCondition: must acquire condition to notify')

    def notify(self, key, n=1, strict=False):
        """Wake up to ``n`` waiters registered under ``key``.

        Raises:
            RuntimeError: if the lock is not held.
            ValueError: with ``strict``, if ``n`` is not positive, nobody is
                registered for ``key``, or fewer than ``n`` waiters were woken.
        """
        if n <= 0:
            if strict:
                raise ValueError(f'{fullname(self)}: n must be positive')
            return
        self.assert_locked()
        registry = self._specific_waiters
        bucket = registry.pop(key, None)
        if bucket is None:
            if strict:
                raise ValueError(f'{fullname(self)}: no parties waiting for key {key!r}')
            return
        while bucket and n > 0:
            fut = bucket.pop()
            if not fut.done():
                fut.set_result(None)
                n -= 1
        if bucket:
            registry[key] = bucket  # keep the waiters that were not woken
        if strict and n > 0:
            raise ValueError(f'{fullname(self)}: not enough parties to notify')

    def notify_all(self, key=None):
        """Wake every waiter of ``key``, or of every key when ``key`` is ``None``.

        Returns:
            The number of waiters that were woken.

        Raises:
            RuntimeError: if the lock is not held.
        """
        self.assert_locked()
        registry = self._specific_waiters
        # Snapshot the keys: entries are popped while looping, which raised
        # "dictionary changed size during iteration" when iterating the dict itself.
        keys = tuple(registry) if key is None else (key,)
        woken = 0
        for k in keys:
            bucket = registry.pop(k, None)
            if bucket is None:
                continue
            while bucket:
                fut = bucket.pop()
                if not fut.done():
                    fut.set_result(None)
                    woken += 1  # count while draining; the set is empty afterwards
        return woken
[docs]
@subscriptable
class MultiCountDownLatch:
    """A set of independent count-down latches addressed by key.

    ``counts`` maps each key to its initial count; entries whose count is not
    positive are dropped at construction. A key is removed once its count is
    exhausted, at which point the waiters of that key are notified.
    """
    __slots__ = '_cond', '_counts'

    def __init__(self, counts):
        self._cond = KeyedCondition()
        self._counts = {k: v for k, v in counts.items() if v > 0}

    def _count_down_lock_held(self, key, strict):
        """Decrement the count for ``key``; the condition's lock must be held.

        Raises:
            KeyError: if ``strict`` is true and ``key`` has no remaining count.
        """
        counts = self._counts
        current = counts.get(key)
        if current is None:
            if strict:
                raise KeyError(f'{fullname(self)}: cannot count down key {key!r} further')
            return
        if current > 1:
            counts[key] = current - 1
        else:
            # Notify whenever the key is retired, not only when the count was exactly 1.
            del counts[key]
            self._cond.notify_all(key)

    async def count_down(self, key, strict=False):
        """Decrement the count for ``key`` under the condition's lock."""
        async with self._cond:
            self._count_down_lock_held(key, strict)

    async def count_down_all(self):
        """Decrement every remaining count by one under the condition's lock."""
        async with self._cond:
            # Iterate over a snapshot: exhausted keys are deleted during the loop,
            # which raised RuntimeError when iterating the live dict.
            for key in tuple(self._counts):
                self._count_down_lock_held(key, True)

    async def wait(self, key, strict=False):
        """Wait until the count for ``key`` is exhausted.

        Returns immediately when ``key`` has no remaining count.

        Raises:
            KeyError: if ``strict`` is true and ``key`` has no remaining count
                at the time of the call.
        """
        if key not in self._counts:
            if strict:
                raise KeyError(f'{fullname(self)}: no count for key {key!r}')
            return
        cond = self._cond
        async with cond:
            # Re-check under the lock: the count may have been exhausted while this
            # task was waiting for the lock, in which case no notification will follow.
            if key in self._counts:
                await cond.wait(key)

    async def wait_all(self, timeout=None):
        """Delegate to the condition's ``wait_all`` while holding its lock."""
        cond = self._cond
        async with cond:
            await cond.wait_all(timeout)

    @property
    def broken(self):
        """``True`` once no key has a remaining count."""
        return not self._counts
[docs]
class RLock(A.LockWithOwnerMixin):
    """Reentrant lock owned by an asyncio task.

    The owning task may acquire the lock repeatedly; ``_count`` tracks the
    nesting depth and the lock becomes free when it drops back to zero. The
    inner ``lock`` (an ``asyncio.Lock`` by default) only guards the short
    sections that inspect or change the ownership state.
    """
    __slots__ = '__lock', '_count', '_owner'

    def __init__(self, lock=None):
        self._count = 0
        self._owner = None
        self.__lock = lock or I.Lock()

    async def acquire(self):
        """Acquire the lock, re-entering if the current task already owns it.

        Returns:
            ``True`` once the lock is held.
        """
        async with self.__lock:
            if self.is_owner:
                self._count += 1
                return True
        while True:
            async with self.__lock:
                if self._owner is None:
                    self._owner, self._count = I.current_task(), 1
                    return True
            # Entering a free asyncio.Lock does not suspend, so without an explicit
            # yield this loop would spin forever and the owning task could never
            # run to release the lock.
            await I.sleep(0)

    def _release(self):
        """Undo one level of acquisition; clear the owner when the last level is released.

        Raises:
            RuntimeError: if the lock is not currently acquired.
        """
        depth = self._count
        if depth <= 0:
            raise RuntimeError(f'{fullname(self)}: release called too many times')
        if depth == 1:
            self._owner = None
        self._count = depth - 1

    def locked(self):
        """Return ``True`` while some task owns the lock."""
        return self._owner is not None

    @property
    def is_owner(self):
        """``True`` if the current task is the owner."""
        return self._owner is I.current_task()
[docs]
class PriorityLock(LoopMixinBase, A.LockWithOwnerMixin):
    """Mutual-exclusion lock that is handed to waiters in priority order.

    Waiters sit in a ``heapq`` min-heap of ``(priority, tiebreak, future)``:
    the smallest ``priority`` is served first and equal priorities are served
    in arrival order.
    """
    __slots__ = '_owner', '_tiebreak', '_waiters'

    def __init__(self):
        super().__init__()
        self._waiters, self._tiebreak, self._owner = [], 0, None

    async def acquire(self, priority=0, timeout=None):
        """Acquire the lock, waiting up to ``timeout`` seconds behind better priorities.

        Returns:
            ``True`` once the lock is held, ``False`` if the wait timed out.
        """
        if self._owner is None and not self._waiters:
            # Free and uncontended: take it without queueing.
            self._owner = I.current_task()
            return True
        granted = self.make_fut()
        heappush(self._waiters, (priority, self._tiebreak, granted))
        self._tiebreak += 1
        try:
            await I.wait_for(granted, timeout)
        except TimeoutError:
            if not (granted.done() and not granted.cancelled()):
                granted.cancel()  # no-op if wait_for already cancelled it
                return False
            # The lock was handed over just as the timeout fired: keep it rather
            # than leaving it held by nobody.
        except BaseException:
            if granted.done() and not granted.cancelled():
                # Cancelled after the hand-over: pass the lock to the next waiter.
                self._release(False)
            else:
                granted.cancel()
            raise
        self._owner = I.current_task()
        return True

    def _release(self, raise_=True):
        """Release the lock and hand it to the best-priority pending waiter, if any.

        Raises:
            RuntimeError: if ``raise_`` is true and the lock is not held.
        """
        if self._owner is None and raise_:
            # Only an unheld lock is an over-release; releasing with nobody queued
            # is the normal uncontended case and must not raise.
            raise RuntimeError(f'{fullname(self)}: release called too many times')
        self._owner = None
        waiters = self._waiters
        while waiters:
            fut = heappop(waiters)[-1]
            if not fut.done():  # skip waiters that timed out or were cancelled
                # Keep the lock marked as held until the woken task records itself
                # as owner, so a newcomer cannot take the lock in between.
                self._owner = fut
                fut.set_result(True)
                return

    def locked(self):
        """Return ``True`` while the lock is held or being handed over."""
        return self._owner is not None

    @property
    def is_owner(self):
        """``True`` if the current task is the owner."""
        return self._owner is I.current_task()
[docs]
class PriorityRLock(RLock):
    """Reentrant lock whose contended acquisitions are ordered by priority.

    The inner lock inherited from ``RLock`` is a ``PriorityLock``. It is held
    for as long as this lock is owned and released when the nesting depth
    returns to zero.
    """
    __slots__ = ()

    # The inner lock is stored by RLock under its name-mangled attribute
    # ``_RLock__lock``. Writing ``self.__lock`` in this class would mangle to
    # ``_PriorityRLock__lock``, which is never assigned.

    def __init__(self):
        super().__init__(PriorityLock())

    @property
    def owner(self):
        """The owning task; reading it also copies the value onto the inner lock."""
        o = self._RLock__lock._owner = self._owner  # ty: ignore[invalid-assignment]
        return o

    @owner.setter
    def owner(self, val, /):
        self._owner = self._RLock__lock._owner = val  # ty: ignore[invalid-assignment]

    async def acquire(self, priority=0, timeout=None):
        """Acquire the lock, re-entering if the current task already owns it.

        Returns:
            ``True`` once the lock is held, ``False`` if the inner lock's
            acquire reported failure (it returns ``False`` on timeout).
        """
        if self.is_owner:
            self._count += 1
            return True
        if await self._RLock__lock.acquire(priority, timeout):  # ty: ignore[too-many-positional-arguments]
            # Record the owner as RLock.acquire does; otherwise is_owner never
            # becomes true and re-entrant acquires queue behind themselves.
            self._owner, self._count = I.current_task(), 1
            return True
        return False

    def _release(self):
        """Undo one level of acquisition; free the inner lock when the last level is released.

        Raises:
            RuntimeError: if the lock is not currently acquired.
        """
        super()._release()
        if self._count == 0:
            # Without this the inner PriorityLock stays held and its waiters are never woken.
            self._RLock__lock._release(False)
177del d