Source code for asyncutils.locks

  1__lazy_modules__ = frozenset(('heapq', 'asyncio'))
  2import asyncutils as A, asyncio as I
  3from asyncutils._internal.helpers import LoopMixinBase, fullname, subscriptable
  4from asyncutils._internal.submodules import locks_all as __all__
  5from _collections import defaultdict, deque
  6from heapq import heappop, heappush
  7from time import monotonic
class DynamicBoundedSemaphore(I.BoundedSemaphore):
    """A bounded semaphore whose upper bound can be changed after creation.

    The bound is exposed through the ``bound`` property. Raising it wakes as
    many queued waiters as the bound grew by; lowering it wakes nobody.
    """

    def __init__(self, value=None):
        """Create the semaphore.

        Args:
            value: Initial value (and bound). When ``None``, the value of
                ``DYNAMIC_BOUNDED_SEMAPHORE_DEFAULT_VALUE`` from the current
                asyncutils context is used.
        """
        super().__init__(A.getcontext().DYNAMIC_BOUNDED_SEMAPHORE_DEFAULT_VALUE if value is None else value)
        # Always keep a real deque so the bound setter can inspect the queue.
        self._waiters = deque()  # type: deque

    @property
    def bound(self):
        """The current upper bound of the semaphore."""
        return self._bound_value

    @bound.setter
    def bound(self, value, /):
        """Change the upper bound.

        Raises:
            ValueError: If *value* is negative.
        """
        if value < 0:
            raise ValueError('asyncutils.locks.DynamicBoundedSemaphore: bound must be non-negative')
        extra = value - self._bound_value  # ty: ignore[unresolved-attribute]
        self._bound_value = value
        waiters = self._waiters
        # Only a *raised* bound frees capacity. The previous truthiness test
        # (`while d and W`) also fired for a negative difference and therefore
        # woke every queued waiter whenever the bound was lowered.
        while extra > 0 and waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                extra -= 1
def d(m, /, _=__import__('functools').wraps):
    """Turn the synchronous method *m* into a coroutine method run under ``self._lock``.

    The returned coroutine function acquires ``self._lock``, calls
    ``self.update_tokens_lock_held()`` and then calls ``m(self, *args, **kwargs)``.
    The return value of *m* is discarded, so awaiting the wrapper yields ``None``.

    Args:
        m: The plain (non-async) method to wrap.
        _: Implementation detail that binds ``functools.wraps`` at definition
            time; callers are not expected to pass it.
    """
    @_(m)
    async def locked_call(self, *args, **kwargs):
        async with self._lock:
            self.update_tokens_lock_held()
            m(self, *args, **kwargs)

    return locked_call
class AdvancedRateLimit(LoopMixinBase, A.LockMixin):
    """Token-bucket rate limiter with a waiter queue.

    ``rate`` tokens are added per second (measured with ``time.monotonic``) up
    to ``capacity``. Requests that cannot be served immediately are queued and
    granted, in queue order, the next time the bucket is refreshed.

    NOTE(review): the bucket is only refreshed from ``acquire``, ``release``
    and ``set_rate``; there is no timer in this class, so a queued waiter is
    not woken until one of those is called again (or its timeout expires).
    """

    __slots__ = '_lock', '_lu', '_unfair', '_waiters', 'capacity', 'rate', 'tokens'

    def __init__(self, rate, capacity=None, fair=True):
        """Create the limiter.

        Args:
            rate: Tokens added per second.
            capacity: Maximum number of stored tokens; a falsy value
                (``None`` or ``0``) means "same as *rate*".
            fair: When true, new waiters join the back of the queue; when
                false they are placed at the front.
        """
        super().__init__()
        self.rate = rate
        self._lock = I.Lock()
        self._waiters = deque()  # entries are (tokens_needed, future)
        self._unfair = not fair
        self._lu = monotonic()  # time of the last refill
        self.tokens = self.capacity = capacity or rate

    async def acquire(self, tokens=None, timeout=None):
        """Take *tokens* tokens, waiting up to *timeout* seconds if necessary.

        Args:
            tokens: Number of tokens; ``None`` means
                ``ADVANCED_RATE_LIMIT_DEFAULT_TOKENS`` from the current context.
            timeout: Maximum time to wait, or ``None`` to wait indefinitely.

        Returns:
            ``True`` if the tokens were obtained, ``False`` on timeout.
        """
        # Resolve the default *before* comparing: the comparison used to run
        # against ``None`` and raised TypeError for the default argument.
        if tokens is None:
            tokens = A.getcontext().ADVANCED_RATE_LIMIT_DEFAULT_TOKENS
        async with self._lock:
            self.update_tokens_lock_held()
            if tokens <= self.tokens:
                self.tokens -= tokens
                return True
            fut = self.loop.create_future()
            queue = self._waiters
            (queue.appendleft if self._unfair else queue.append)((tokens, fut))
        try:
            await I.wait_for(fut, timeout)
        except TimeoutError:
            # If the grant raced with the timeout the tokens were already
            # deducted, so report success rather than losing them.
            return fut.done() and not fut.cancelled()
        return True

    @d
    def release(self, tokens=None):
        """Return *tokens* tokens to the bucket (capped at ``capacity``).

        ``None`` means ``ADVANCED_RATE_LIMIT_DEFAULT_TOKENS`` from the current context.
        """
        self.tokens = min(self.tokens+(A.getcontext().ADVANCED_RATE_LIMIT_DEFAULT_TOKENS if tokens is None else tokens), self.capacity)

    @d
    def set_rate(self, new):
        """Set a new refill rate; the capacity is raised to *new* if it was lower."""
        self.rate, self.capacity = new, max(self.capacity, new)

    def locked(self):
        """Return ``True`` while at least one entry is in the waiter queue."""
        return bool(self._waiters)

    def update_tokens_lock_held(self):
        """Refill the bucket and serve queued waiters; ``self._lock`` must be held.

        Fixes over the previous implementation:
        * the refill now happens even when nobody is queued, so an idle
          limiter regains tokens;
        * the last queued waiter can be served (the old loop condition
          required the queue to stay non-empty after popping);
        * waiters whose future is already done (timed out / cancelled) are
          dropped without consuming tokens.
        """
        now = monotonic()
        elapsed, self._lu = now - self._lu, now
        available = min(self.capacity, self.tokens + elapsed*self.rate)
        queue = self._waiters
        while queue:
            needed, fut = queue[0]
            if fut.done():
                queue.popleft()
                continue
            if needed > available:
                # Head-of-line blocking is intentional: later waiters must not overtake.
                break
            queue.popleft()
            available -= needed
            fut.set_result(None)
        self.tokens = available
class PrioritySemaphore(LoopMixinBase, A.LockMixin):
    """Semaphore that hands freed permits to the waiter with the lowest priority value.

    ``_value`` is "bound minus holders minus queued waiters": every ``acquire``
    decrements it immediately and it becomes negative while tasks are queued.
    Waiters are kept in a heap of ``(priority, tiebreak, future)``; the
    monotonically increasing tiebreak keeps equal priorities in arrival order
    and guarantees that the futures themselves are never compared.
    """

    __slots__ = '_bound', '_tiebreak', '_value', '_waiters'

    def __init__(self, value=None):
        """Create the semaphore.

        Args:
            value: Number of permits; ``None`` means
                ``PRIORITY_SEMAPHORE_DEFAULT_VALUE`` from the current context.
        """
        # Sibling classes with the same bases initialise them; do the same here.
        super().__init__()
        if value is None:
            value = A.getcontext().PRIORITY_SEMAPHORE_DEFAULT_VALUE
        # The bound is remembered so that over-release can actually be detected.
        self._value = self._bound = value
        self._tiebreak, self._waiters = 0, []

    async def acquire(self, priority=0):
        """Acquire one permit, queueing by *priority* (smaller is served first).

        Returns:
            Always ``True``.
        """
        self._value -= 1
        self._tiebreak += 1
        if self._value >= 0:
            return True
        fut = self.make_fut()
        heappush(self._waiters, (priority, self._tiebreak, fut))
        try:
            # Being woken *is* the hand-over of a permit. The old code
            # re-checked ``_value < 0`` here and re-queued itself whenever
            # other tasks were still waiting, so nobody ever got the permit.
            await fut
        except I.CancelledError:
            if fut.done() and not fut.cancelled():
                # The permit was handed over just before the cancellation
                # arrived: pass it on instead of leaking it.
                self.release(strict=False)
            else:
                # Give back the reservation; the stale heap entry is skipped later.
                self._value += 1
            raise
        return True

    def release(self, strict=True):
        """Release one permit and wake the best queued waiter, if any.

        Args:
            strict: When true, raise instead of releasing more permits than
                the semaphore was created with.

        Raises:
            RuntimeError: If *strict* and no permit is currently held.
        """
        # Previously this raised whenever nobody was queued, i.e. after every
        # uncontended acquire.
        if strict and self._value >= self._bound:
            raise RuntimeError('asyncutils.locks.PrioritySemaphore: release called too many times')
        self._value += 1
        heap = self._waiters
        while heap:
            fut = heappop(heap)[-1]
            if not fut.done():  # skip waiters that were cancelled while queued
                fut.set_result(None)
                return

    def locked(self):
        """Return ``True`` while ``_value`` is negative, i.e. while tasks are queued."""
        return self._value < 0

    def reset(self):
        """Wake every queued waiter and reset the counters.

        NOTE(review): as before, the value is reset to ``1`` rather than to the
        initial value; the bound follows it so both stay consistent. Confirm
        whether restoring the constructor value was intended.
        """
        for *_, fut in self._waiters:
            if not fut.done():
                fut.set_result(None)
        self._waiters.clear()
        self._value = self._bound = 1
        self._tiebreak = 0
@subscriptable
class KeyedCondition(LoopMixinBase, A.LockMixin):
    """Condition variable whose waiters are grouped by an arbitrary hashable key.

    Waiters for a key are stored as futures in a ``set``, so the order in which
    ``notify`` wakes them is unspecified.
    """

    __slots__ = '__lock', '_specific_waiters'

    def __init__(self, lock=None):
        """Create the condition around *lock* (a new ``asyncio.Lock`` when falsy)."""
        super().__init__()
        self.__lock, self._specific_waiters = lock or I.Lock(), defaultdict(set)

    async def acquire(self):
        """Acquire the underlying lock.

        Returns:
            ``True`` unless the lock's ``acquire`` returned ``False`` or the
            ``A.ignore_noncritical`` context suppressed an error (presumably
            non-critical exceptions only - confirm).
        """
        with A.ignore_noncritical:
            if await self.__lock.acquire() != False: return True  # noqa: E712
        return False

    async def release(self):
        """Release the underlying lock, awaiting the result if it is a coroutine."""
        if I.iscoroutine(r := self.__lock.release()): await r

    def locked(self):
        """Return whether the underlying lock is currently held."""
        return self.__lock.locked()

    async def _await_unlocked(self, awaitable, timeout):
        """Await *awaitable* with the lock released, re-acquiring it before returning.

        The lock is re-acquired even when the wait times out or is cancelled,
        so callers always leave with the lock held (mirrors ``asyncio.Condition``).
        """
        await self.release()
        try:
            return await I.wait_for(awaitable, timeout)
        finally:
            cancelled = None
            while True:
                try:
                    await self.acquire()
                    break
                except I.CancelledError as exc:
                    cancelled = exc
            if cancelled is not None:
                try:
                    raise cancelled
                finally:
                    cancelled = None  # break the reference cycle

    async def wait(self, key, timeout=None):
        """Wait until *key* is notified.

        The lock must be held. It is released while waiting (previously it
        stayed held, so no other task could ever acquire it to notify) and is
        held again when this method returns or raises.

        Raises:
            RuntimeError: If the lock is not held.
            TimeoutError: If *timeout* seconds pass without a notification.
        """
        self.assert_locked()
        waiters = self._specific_waiters[key]
        fut = self.make_fut()
        waiters.add(fut)
        try:
            await self._await_unlocked(fut, timeout)
        finally:
            waiters.discard(fut)

    async def wait_for(self, key, pred, per_wait_timeout=None):
        """Wait on *key* repeatedly until ``pred()`` is true.

        Each individual wait is limited to *per_wait_timeout* seconds; a
        timeout propagates as ``TimeoutError``.
        """
        self.assert_locked()
        while not pred():
            await self.wait(key, per_wait_timeout)

    async def wait_all(self, timeout=None):
        """Wait until every future that is currently queued (for any key) is done.

        The lock is released while waiting. The gathered futures are shielded
        so that a timeout or cancellation of this call does not cancel other
        tasks' waits.
        """
        self.assert_locked()
        pending = frozenset().union(*self._specific_waiters.values())
        await self._await_unlocked(I.shield(I.gather(*pending, return_exceptions=True)), timeout)

    def assert_locked(self):
        """Raise ``RuntimeError`` unless the underlying lock is held."""
        if not self.locked(): raise RuntimeError('asyncutils.locks.KeyedCondition: must acquire condition to notify')

    def notify(self, key, n=1, strict=False):
        """Wake up to *n* tasks waiting on *key*.

        Args:
            key: The key whose waiters should be woken.
            n: Maximum number of waiters to wake.
            strict: When true, raise instead of silently doing less than asked.

        Raises:
            ValueError: With *strict*, if *n* is not positive, nobody waits on
                *key*, or fewer than *n* waiters could be woken.
            RuntimeError: If the lock is not held.
        """
        if n <= 0:
            if strict: raise ValueError(f'{fullname(self)}: n must be positive')
            return
        self.assert_locked()
        registry = self._specific_waiters
        waiters = registry.pop(key, None)
        if waiters is None:
            if strict: raise ValueError(f'{fullname(self)}: no parties waiting for key {key!r}')
            return
        while waiters and n > 0:
            fut = waiters.pop()
            if not fut.done():  # already timed-out/cancelled waiters do not count
                fut.set_result(None)
                n -= 1
        if waiters:
            registry[key] = waiters  # keep those that were not woken
        if strict and n > 0: raise ValueError(f'{fullname(self)}: not enough parties to notify')

    def notify_all(self, key=None):
        """Wake every task waiting on *key*, or on all keys when *key* is ``None``.

        Returns:
            The number of waiters that were woken. (The old code measured the
            set after draining it and therefore always returned 0.)
        """
        self.assert_locked()
        registry = self._specific_waiters
        woken = 0
        # Snapshot the keys: entries are popped while we iterate, which used to
        # raise "dictionary changed size during iteration".
        for k in tuple(registry) if key is None else (key,):
            for fut in registry.pop(k, ()):
                if not fut.done():
                    fut.set_result(None)
                    woken += 1
        return woken
@subscriptable
class MultiCountDownLatch:
    """A set of independent count-down latches addressed by key.

    Each key starts with a positive count. Counting a key down to zero removes
    it and notifies the tasks waiting on that key.
    """

    __slots__ = '_cond', '_counts'

    def __init__(self, counts):
        """Create the latch from a mapping ``key -> initial count``.

        Entries whose count is not positive are dropped.
        """
        self._cond, self._counts = KeyedCondition(), {k: v for k, v in counts.items() if v > 0}

    def _count_down_lock_held(self, key, strict):
        """Decrement *key*; the condition's lock must be held by the caller.

        Raises:
            KeyError: With *strict*, if *key* has no remaining count.
        """
        counts = self._counts
        remaining = counts.get(key)
        if remaining is None:
            if strict: raise KeyError(f'{fullname(self)}: cannot count down key {key!r} further')
            return
        if remaining > 1:
            counts[key] = remaining-1
            return
        # The key is exhausted: always notify when it is removed (a fractional
        # count below 1 used to be deleted without waking anybody).
        del counts[key]
        self._cond.notify_all(key)

    async def count_down(self, key, strict=False):
        """Decrement the count of *key* by one; see ``_count_down_lock_held`` for *strict*."""
        async with self._cond: self._count_down_lock_held(key, strict)

    async def count_down_all(self):
        """Decrement every remaining key by one."""
        step = self._count_down_lock_held
        async with self._cond:
            # Iterate over a snapshot: exhausted keys are deleted during the loop,
            # which raised RuntimeError when iterating the dict directly.
            for key in tuple(self._counts): step(key, True)

    async def wait(self, key, strict=False):
        """Wait until the count of *key* reaches zero.

        Returns immediately if *key* has no remaining count.

        Raises:
            KeyError: With *strict*, if *key* has no remaining count.
        """
        async with (cond := self._cond):
            # Check under the lock so a count-down that completes while we are
            # still acquiring cannot be missed.
            if key in self._counts: await cond.wait(key)
            elif strict: raise KeyError(f'{fullname(self)}: no count for key {key!r}')

    async def wait_all(self, timeout=None):
        """Delegate to ``KeyedCondition.wait_all`` with the condition acquired."""
        async with (cond := self._cond): await cond.wait_all(timeout)

    @property
    def broken(self):
        """``True`` once no key has a remaining count."""
        return not self._counts
class RLock(A.LockWithOwnerMixin):
    """Re-entrant lock owned by an asyncio task.

    The wrapped lock only guards updates of the owner/count state; it is never
    held across a suspension point by this class.
    """

    __slots__ = '__lock', '_count', '_owner'

    def __init__(self, lock=None):
        """Create the lock; *lock* guards internal state (a new ``asyncio.Lock`` when falsy)."""
        self._count, self._owner, self.__lock = 0, None, lock or I.Lock()

    async def acquire(self):
        """Acquire the lock, re-entrantly if the current task already owns it.

        Returns:
            Always ``True``.
        """
        async with self.__lock:
            if self.is_owner:
                self._count += 1
                return True
        while True:
            async with self.__lock:
                if self._owner is None:
                    self._owner, self._count = I.current_task(), 1
                    return True
            # Yield to the event loop between attempts. Entering an uncontended
            # asyncio.Lock does not suspend, so without this the loop spun
            # forever and the owning task never got a chance to release.
            await I.sleep(0)

    def _release(self):
        """Undo one ``acquire``; the lock becomes free when the count reaches zero.

        Raises:
            RuntimeError: If the lock is not currently acquired.
        """
        count = self._count
        if count <= 0: raise RuntimeError(f'{fullname(self)}: release called too many times')
        if count == 1:
            self._owner = None
        self._count = count-1

    def locked(self):
        """Return ``True`` while some task owns the lock."""
        return self._owner is not None

    @property
    def is_owner(self):
        """``True`` if the current task owns the lock."""
        return self._owner is I.current_task()
class PriorityLock(LoopMixinBase, A.LockWithOwnerMixin):
    """Mutual-exclusion lock that is handed to the waiter with the lowest priority value.

    Waiters are kept in a heap of ``(priority, tiebreak, future, task)``; the
    tiebreak is unique, so futures and tasks are never compared. Invariant:
    ``_owner`` is ``None`` only while no live waiter is queued, because
    ``_release`` transfers ownership directly to the next waiter.
    """

    __slots__ = '_owner', '_tiebreak', '_waiters'

    def __init__(self):
        """Create an unlocked lock."""
        super().__init__()
        self._waiters, self._tiebreak, self._owner = [], 0, None

    async def acquire(self, priority=0, timeout=None):
        """Acquire the lock.

        Args:
            priority: Queue priority; smaller values are served first.
            timeout: Maximum number of seconds to wait, or ``None``.

        Returns:
            ``True`` when the lock was acquired, ``False`` on timeout.
        """
        me = I.current_task()
        if self._owner is None:
            self._owner = me
            return True
        fut = self.make_fut()
        heappush(self._waiters, (priority, self._tiebreak, fut, me))
        self._tiebreak += 1
        try:
            await I.wait_for(fut, timeout)
        except (TimeoutError, I.CancelledError) as exc:
            if fut.done() and not fut.cancelled():
                # Ownership was transferred just as we gave up: pass it on so
                # the lock is not leaked.
                self._release(False)
            else:
                fut.cancel()  # the stale heap entry is skipped by _release
            if isinstance(exc, TimeoutError):
                return False
            raise
        return True

    def _release(self, raise_=True):
        """Release the lock and hand it to the best live waiter, if any.

        Args:
            raise_: When true, raise if the lock is not currently owned.

        Raises:
            RuntimeError: If *raise_* and the lock is not owned. (Previously
                this was raised whenever nobody was queued, i.e. on every
                uncontended release.)
        """
        if self._owner is None:
            if raise_: raise RuntimeError(f'{fullname(self)}: release called too many times')
            return
        self._owner = None
        heap = self._waiters
        while heap:
            *_, fut, task = heappop(heap)
            if not fut.done():
                # Transfer ownership now, before the waiter resumes, so that a
                # task calling acquire() in between cannot take the fast path
                # and end up owning the lock as well.
                self._owner = task
                fut.set_result(True)
                return

    def locked(self):
        """Return ``True`` while some task owns the lock."""
        return self._owner is not None

    @property
    def is_owner(self):
        """``True`` if the current task owns the lock."""
        return self._owner is I.current_task()
class PriorityRLock(RLock):
    """Re-entrant lock whose contended acquisitions are ordered by priority.

    The wrapped ``PriorityLock`` is held for as long as this lock is owned and
    is released again when the re-entrancy count drops to zero.

    The wrapped lock lives in ``RLock``'s private slot. Inside this class
    ``self.__lock`` would be mangled to ``_PriorityRLock__lock``, which does
    not exist, so the slot is addressed by its real name ``_RLock__lock``.
    """

    __slots__ = ()

    def __init__(self):
        """Create an unlocked lock backed by a fresh ``PriorityLock``."""
        super().__init__(PriorityLock())

    @property
    def owner(self):
        """The task that currently owns the lock, or ``None``."""
        return self._owner

    @owner.setter
    def owner(self, val, /):
        # Keep the wrapped lock's notion of the owner in sync.
        self._owner = self._RLock__lock._owner = val  # ty: ignore[invalid-assignment]

    async def acquire(self, priority=0, timeout=None):
        """Acquire the lock, re-entrantly if the current task already owns it.

        Args:
            priority: Queue priority passed to the wrapped ``PriorityLock``.
            timeout: Maximum number of seconds to wait, or ``None``.

        Returns:
            ``True`` when acquired, ``False`` on timeout.
        """
        if self.is_owner:
            self._count += 1
            return True
        if await self._RLock__lock.acquire(priority, timeout):  # ty: ignore[too-many-positional-arguments]
            # Record the owner as well; without it is_owner/locked() never
            # became true and re-entrant acquisition could not work.
            self._owner, self._count = I.current_task(), 1
            return True
        return False

    def _release(self):
        """Undo one ``acquire``; on the last one also release the wrapped lock.

        Raises:
            RuntimeError: If the lock is not currently acquired.
        """
        super()._release()
        if self._count == 0:
            # Wake the next queued task; the wrapped lock was never released before.
            self._RLock__lock._release(False)
# The decorator helper `d` is only needed while the class bodies above are
# executed; delete it so it is not left behind as a module attribute.
del d