Source code for asyncutils.rwlocks

  1# ty: ignore[unresolved-attribute]
  2__lazy_modules__ = frozenset(('heapq',))
  3from asyncutils import getcontext, ignore_valerrs
  4from asyncutils._internal.helpers import fullname
  5from asyncutils._internal.submodules import rwlocks_all as __all__
  6from _collections import defaultdict, deque
  7from asyncio import Condition, Lock, current_task
  8from contextlib import asynccontextmanager
  9from heapq import heappush, heappop
 10import abc
 11def _rwlock_sub_new(cls, /): (_ := object.__new__(cls)).setup(); return _
 12class B:
 13    __slots__ = '__wrapped__', 'reader', 'reading', 'writer', 'writing'
 14    def __init__(self, l, f, /, _=__slots__[1:]):
 15        for s in _: setattr(self, s, getattr(l, s))
 16        self.__wrapped__ = f
 17    def __init_subclass__(cls, f=__import__('operator').methodcaller, /, *, m, **_):
 18        if cls.__dict__.get('__slots__', True): raise TypeError('__slots__ must be an empty tuple')
 19        async def g(self, *a, **k):
 20            async with c(self): return await self.__wrapped__(*a, **k)
 21        cls.__call__, c = g, f(m); super().__init_subclass__(**_) # ty: ignore[invalid-assignment]
 22    def __getattr__(self, n, /): return getattr(self.__wrapped__, n)
 23t = 'Locked', (B,), {'__slots__': ()}
 24def n(c, /, prefer_writers=None): return _rwlock_sub_new(c.__subclasses__()[getcontext().RWLOCK_DEFAULT_PREFER_WRITERS if prefer_writers is None else prefer_writers])
 25def s(c, n=n, /, **_):
 26    if not isinstance(c.__dict__.get('__slots__'), tuple): raise TypeError(f'subclass of {c.__name__} must define tuple as __slots__')
 27    if c.__new__ is n: c.__new__ = _rwlock_sub_new
 28    super(c).__init_subclass__(**_)
 29def d(c, /, _=(n, classmethod(s))): c.__new__, c.__init_subclass__ = _; return c
class CoercedMethod:
    """Descriptor that calls a wrapped callable with the owning instance as first argument.

    Instance access returns a function forwarding ``(instance, *args, **kwargs)``
    to the wrapped callable; access through a class raises instead of
    returning the descriptor. The class cannot be subclassed.
    """
    __slots__ = '__f', '__n', '__o'

    def __init_subclass__(cls, /, **_):
        raise TypeError('cannot subclass asyncutils.rwlocks.CoercedMethod')

    def __init__(self, f, /):
        self.__f = f

    def __set_name__(self, /, *_):
        # Receives (owner, name) from the class-creation machinery.
        owner, name = _
        self.__o = owner
        self.__n = name

    def __getattr__(self, n, /):
        # Unknown attributes are looked up on the wrapped callable.
        return getattr(self.__f, n)

    def __get__(self, o, t=None, /):
        if o is None:
            # Class-level access: report the attribute as absent on the owner
            # itself, and as a binding error for any other class.
            if t is self.__o:
                raise AttributeError(f'class {fullname(t)} has no attribute {self.__n!r}', name=self.__n)
            raise TypeError('incorrectly bound asyncutils.rwlocks.CoercedMethod')
        if t is not None and not isinstance(o, t):
            raise TypeError('asyncutils.rwlocks.CoercedMethod: __get__ called incorrectly')
        return lambda *a, **k: self.__f(o, *a, **k)
@d
class RWLock(metaclass=abc.ABCMeta):
    """Abstract reader/writer lock.

    Subclasses provide ``setup()`` plus the ``reading()`` and ``writing()``
    context managers. ``reader`` and ``writer`` wrap a callable so that each
    call runs inside ``reading()`` or ``writing()`` respectively.
    """
    __slots__ = ('_wa',)
    reader = CoercedMethod(type(*t, m='reading'))  # ty: ignore[no-matching-overload]
    writer = CoercedMethod(type(*t, m='writing'))  # ty: ignore[no-matching-overload]

    def locked(self):
        """Return the ``_wa`` state stored by ``setup()``."""
        return self._wa

    @classmethod
    def lock(cls, f, /):
        """Create a new lock and return *f* wrapped by its ``reader``."""
        instance = cls()
        return instance.reader(f)

    @abc.abstractmethod
    def setup(self):
        """Initialise the instance state; called right after allocation."""
        raise NotImplementedError

    @abc.abstractmethod
    def reading(self):
        """Return the context manager used for read access."""
        raise NotImplementedError

    @abc.abstractmethod
    def writing(self):
        """Return the context manager used for write access."""
        raise NotImplementedError
class ReadPreferredRWLock(RWLock):
    """Reader/writer lock in which the first reader takes the write lock and the last one frees it."""
    __slots__ = '_cm', '_nr'

    def setup(self):
        """Start with no readers, a counter mutex and an unlocked write lock."""
        self._nr = 0
        self._cm = Lock()
        self._wa = Lock()

    @asynccontextmanager
    async def reading(self):
        """Hold read access for the duration of the ``async with`` body."""
        async with self._cm:
            count = self._nr + 1
            if count == 1:
                # First reader in: block writers for the whole reader group.
                await self._wa.acquire()
            self._nr = count
        try:
            yield
        finally:
            async with self._cm:
                remaining = self._nr - 1
                if remaining == 0:
                    # Last reader out: let writers proceed again.
                    self._wa.release()
                self._nr = remaining

    @asynccontextmanager
    async def writing(self):
        """Hold exclusive write access for the duration of the ``async with`` body."""
        async with self._wa:
            yield

    def locked(self):
        """Return whether the write lock is currently held."""
        return self._wa.locked()
class WritePreferredRWLock(RWLock):
    """Reader/writer lock in which new readers wait while any writer is active or waiting.

    State: ``_wa`` (a writer holds the lock), ``_nr`` (active readers) and
    ``_nw`` (writers currently waiting), all guarded by ``_cond``.
    """
    __slots__ = '_cond', '_nr', '_nw'

    def setup(self):
        """Start unlocked with no active readers and no waiting writers."""
        self._cond, self._wa = Condition(), False
        self._nr = self._nw = 0

    @asynccontextmanager
    async def reading(self):
        """Hold read access for the duration of the ``async with`` body."""
        async with (C := self._cond):
            while self._wa or self._nw > 0:
                await C.wait()
            self._nr += 1
        try:
            yield
        finally:
            async with C:
                self._nr -= 1
                if self._nr == 0:
                    C.notify_all()

    @asynccontextmanager
    async def writing(self):
        """Hold exclusive write access for the duration of the ``async with`` body."""
        async with (C := self._cond):
            self._nw += 1
            try:
                while self._wa or self._nr > 0:
                    await C.wait()
            except BaseException:
                # Cancelled (or otherwise failed) while waiting: withdraw from
                # the waiting-writer count, otherwise readers would stay blocked
                # on ``_nw > 0`` forever. ``Condition.wait`` re-acquires the
                # lock before raising, so notifying here is valid.
                self._nw -= 1
                C.notify_all()
                raise
            self._nw -= 1
            self._wa = True
        try:
            yield
        finally:
            async with C:
                self._wa = False
                C.notify_all()
class FairRWLock(RWLock):
    """Reader/writer lock that admits requests in arrival order.

    Every request appends an entry to the ``_qd`` deque and proceeds only when
    its entry is at the head; readers additionally need no active writer, and
    writers need no active writer and no active readers.
    """
    __slots__ = '_cond', '_nr', '_qd'

    def setup(self):
        """Start unlocked with no active readers and an empty queue."""
        self._cond, self._wa, self._nr, self._qd = Condition(), False, 0, deque()

    @asynccontextmanager
    async def reading(self):
        """Hold read access for the duration of the ``async with`` body."""
        async with (C := self._cond):
            # The fresh future makes the entry unique for ``is`` / ``remove``.
            (Q := self._qd).append(E := (False, C._get_loop().create_future()))
            try:
                while Q[0] is not E or self._wa:
                    await C.wait()
            except BaseException:
                with ignore_valerrs: Q.remove(E)
                # The queue head may have changed; wake the others so that a
                # newly eligible waiter does not sleep until an unrelated release.
                C.notify_all()
                raise
            self._nr += 1
            Q.popleft()
            E[-1].set_result(True)
        try:
            yield
        finally:
            async with C:
                self._nr -= 1
                if self._nr == 0:
                    C.notify_all()

    @asynccontextmanager
    async def writing(self):
        """Hold exclusive write access for the duration of the ``async with`` body."""
        async with (C := self._cond):
            (Q := self._qd).append(E := (True, C._get_loop().create_future()))
            try:
                while Q[0] is not E or self._wa or self._nr > 0:
                    await C.wait()
            except BaseException:
                with ignore_valerrs: Q.remove(E)
                # Same reasoning as in ``reading``: re-evaluate the remaining waiters.
                C.notify_all()
                raise
            self._wa = True
            Q.popleft()
            E[-1].set_result(True)
        try:
            yield
        finally:
            async with C:
                self._wa = False
                C.notify_all()
@d
class PriorityRWLock(RWLock):
    """Reader/writer lock that admits the queued request with the smallest priority key first.

    Requests are kept in the ``_qd`` min-heap as
    ``(priority, *tie_break, future)`` tuples; a request proceeds only when
    its entry is at the heap root. Readers additionally need no active
    writer; writers need no active writer and no active readers.
    """
    __slots__ = '_cnt', '_cond', '_il', '_nr', '_qd'

    def setup(self):
        """Start unlocked with an empty heap and the arrival counter at zero."""
        self._cond, self._cnt, self._il, self._wa, self._nr, self._qd = Condition(), 0, Lock(), False, 0, []

    async def _push_item(self, priority, is_writer):
        """Queue a request and return its heap entry; the entry's last item is a fresh future."""
        async with self._il:
            self._cnt = (c := self._cnt) + 1
        if isinstance(self, WritePreferredPriorityRWLock):
            # Min-heap: ``False`` sorts first, so writers must contribute
            # ``False`` to be served before readers of equal priority.
            tie_break = (not is_writer, c)
        else:
            # Plain arrival order among requests of equal priority.
            tie_break = (c, is_writer)
        heappush(self._qd, E := (priority, *tie_break, self._cond._get_loop().create_future()))
        return E

    def _drop_item(self, E):
        """Withdraw an abandoned request; the caller must hold ``_cond``."""
        from heapq import heapify
        Q = self._qd
        with ignore_valerrs: Q.remove(E)
        # ``list.remove`` shifts elements and can break the heap invariant.
        heapify(Q)
        # The root may have changed, so let the remaining waiters re-check.
        self._cond.notify_all()

    @asynccontextmanager
    async def reading(self, priority=0):
        """Hold read access for the duration of the ``async with`` body.

        Smaller ``priority`` values are admitted first.
        """
        async with (C := self._cond):
            E, Q = await self._push_item(priority, False), self._qd
            try:
                while Q[0] is not E or self._wa:
                    await C.wait()
            except BaseException:
                self._drop_item(E)
                raise
            self._nr += 1
            heappop(Q)
            E[-1].set_result(True)
            if Q:
                # Heap order differs from wake-up order: the new root may be a
                # reader that already went back to sleep, so wake waiters again.
                C.notify_all()
        try:
            yield
        finally:
            async with C:
                self._nr -= 1
                if self._nr == 0:
                    C.notify_all()

    @asynccontextmanager
    async def writing(self, priority=0):
        """Hold exclusive write access for the duration of the ``async with`` body.

        Smaller ``priority`` values are admitted first.
        """
        async with (C := self._cond):
            E, Q = await self._push_item(priority, True), self._qd
            try:
                while Q[0] is not E or self._wa or self._nr > 0:
                    await C.wait()
            except BaseException:
                self._drop_item(E)
                raise
            self._wa = True
            heappop(Q)
            E[-1].set_result(True)
        try:
            yield
        finally:
            async with C:
                self._wa = False
                C.notify_all()
class FairPriorityRWLock(PriorityRWLock):
    """Concrete ``PriorityRWLock`` that adds no state or behaviour of its own."""
    __slots__ = ()
class WritePreferredPriorityRWLock(PriorityRWLock):
    """Concrete ``PriorityRWLock`` recognised by ``PriorityRWLock._push_item``.

    For instances of this class the heap key orders equal-priority requests
    by the reader/writer flag before the arrival counter.
    """
    __slots__ = ()
class AgingRWLock(PriorityRWLock):
    """``PriorityRWLock`` whose default priority is scaled by a per-task attempt counter.

    ``_rt`` and ``_wt`` map ``id(current_task())`` to the number of read and
    write attempts that have not been admitted yet; ``_rf`` and ``_wf`` are the
    factors these counters are multiplied by.
    """
    __slots__ = '_rf', '_rt', '_wf', '_wt'

    def __new__(cls, /, rf=None, wf=None):
        ctx = getcontext()
        instance = _rwlock_sub_new(cls)
        # Missing factors fall back to the context defaults.
        read_factor = ctx.AGING_RWLOCK_DEFAULT_READ_PRIORITY_FACTOR if rf is None else rf
        write_factor = ctx.AGING_RWLOCK_DEFAULT_WRITE_PRIORITY_FACTOR if wf is None else wf
        instance._rf = read_factor
        instance._wf = write_factor
        return instance

    def setup(self):
        """Initialise the base lock state and the two attempt counters."""
        super().setup()
        self._rt = defaultdict(int)
        self._wt = defaultdict(int)

    @asynccontextmanager
    async def reading(self, priority=None):
        """Hold read access; ``priority`` defaults to ``_rf`` times this task's attempt count."""
        attempts = self._rt
        key = id(current_task())
        count = attempts[key] + 1
        attempts[key] = count
        if priority is None:
            priority = self._rf * count
        async with super().reading(priority):
            # Admitted: forget this task's pending attempts.
            attempts.pop(key, None)
            yield

    @asynccontextmanager
    async def writing(self, priority=None):
        """Hold write access; ``priority`` defaults to ``_wf`` times this task's attempt count."""
        attempts = self._wt
        key = id(current_task())
        count = attempts[key] + 1
        attempts[key] = count
        if priority is None:
            priority = self._wf * count
        async with super().writing(priority):
            # Admitted: forget this task's pending attempts.
            attempts.pop(key, None)
            yield

    @property
    def cur_unsuccessful_reads(self):
        """Sum of the read-attempt counters currently recorded."""
        return sum(self._rt.values())

    @property
    def cur_unsuccessful_writes(self):
        """Sum of the write-attempt counters currently recorded."""
        return sum(self._wt.values())
# Remove the module-private construction helpers from the module namespace.
del B, t
del d, n, s