Source code for asyncutils.rwlocks

  1# ty: ignore[unresolved-attribute]
  2__lazy_modules__ = frozenset(('heapq',))
  3from asyncutils import getcontext, ignore_valerrs
  4from asyncutils._internal.helpers import fullname
  5from asyncutils._internal.submodules import rwlocks_all as __all__
  6from _collections import defaultdict, deque
  7from asyncio import Condition, Lock, current_task
  8from contextlib import asynccontextmanager
  9from heapq import heappush, heappop
 10import abc
 11def _rwlock_sub_new(cls, /): (_ := object.__new__(cls)).setup(); return _
 12class B:
 13    __slots__ = '__wrapped__', 'reader', 'reading', 'writer', 'writing'
 14    def __init__(self, l, f, /, _=__slots__[1:]):
 15        for s in _: setattr(self, s, getattr(l, s))
 16        self.__wrapped__ = f
 17    def __init_subclass__(cls, f=__import__('operator').methodcaller, /, *, m, **_):
 18        if cls.__dict__.get('__slots__', True): raise TypeError('__slots__ must be an empty tuple')
 19        async def g(self, *a, **k):
 20            async with c(self): return await self.__wrapped__(*a, **k)
 21        cls.__call__, c = g, f(m); super().__init_subclass__(**_) # ty: ignore[invalid-assignment]
 22    def __getattr__(self, n, /): return getattr(self.__wrapped__, n)
 23t = 'Locked', (B,), {'__slots__': ()}
 24def n(c, /, prefer_writers=None): return _rwlock_sub_new(c.__subclasses__()[getcontext().RWLOCK_DEFAULT_PREFER_WRITERS if prefer_writers is None else prefer_writers])
 25def s(c, _=n, /, **k):
 26    if not isinstance(c.__dict__.get('__slots__'), tuple): raise TypeError(f'subclass of {c.__name__} must define tuple as __slots__')
 27    if c.__new__ is _: c.__new__ = _rwlock_sub_new
 28    super(c).__init_subclass__(**k)
 29def d(c, /, _=(n, classmethod(s))): c.__new__, c.__init_subclass__ = _; return c
[docs] 30class CoercedMethod: 31 __slots__ = '__f', '__n', '__o' 32 def __init_subclass__(cls, /, **_): raise TypeError('cannot subclass asyncutils.rwlocks.CoercedMethod') 33 def __init__(self, f, /): self.__f = f
[docs] 34 def __set_name__(self, /, *_): self.__o, self.__n = _
[docs] 35 def __getattr__(self, n, /): return getattr(self.__f, n)
[docs] 36 def __get__(self, o, t=None, /): 37 if o is None: raise AttributeError(f'class {fullname(t)} has no attribute {self.__n!r}', name=self.__n) if t is self.__o else TypeError('incorrectly bound asyncutils.rwlocks.CoercedMethod') 38 if not (t is None or isinstance(o, t)): raise TypeError('asyncutils.rwlocks.CoercedMethod: __get__ called incorrectly') 39 return lambda *a, **k: self.__f(o, *a, **k)
[docs] 40@d 41class RWLock(metaclass=abc.ABCMeta): 42 reader, writer = (CoercedMethod(type(*t, m=m)) for m in ('reading', 'writing')); __slots__ = '_nr', '_wa' # ty: ignore[no-matching-overload]
[docs] 43 def is_reading(self): return self._nr > 0
[docs] 44 def is_writing(self): return self._wa
[docs] 45 def locked(self): return self.is_writing() or self.is_reading()
[docs] 46 @classmethod 47 def lock(cls, f, /): return cls().reader(f)
[docs] 48 @abc.abstractmethod 49 def setup(self): raise NotImplementedError
[docs] 50 @abc.abstractmethod 51 def reading(self): raise NotImplementedError
[docs] 52 @abc.abstractmethod 53 def writing(self): raise NotImplementedError
[docs] 54class ReadPreferredRWLock(RWLock): 55 __slots__ = '_cm',
[docs] 56 def setup(self): self._nr, self._cm, self._wa = 0, Lock(), Lock()
[docs] 57 @asynccontextmanager 58 async def reading(self): 59 w = self._wa 60 async with self._cm: 61 if (r := self._nr+1) == 1: await w.acquire() 62 self._nr = r 63 try: yield 64 finally: 65 async with self._cm: 66 if (r := self._nr-1) == 0: w.release() 67 self._nr = r
[docs] 68 @asynccontextmanager 69 async def writing(self): 70 async with self._wa: yield
[docs] 71 def is_reading(self): return self._nr > 0
[docs] 72 def is_writing(self): return not self.is_reading() and self.locked()
[docs] 73 def locked(self): return self._wa.locked()
[docs] 74class WritePreferredRWLock(RWLock): 75 __slots__ = '_cd', '_nw'
[docs] 76 def setup(self): self._cd, self._wa = Condition(), False; self._nr = self._nw = 0
[docs] 77 @asynccontextmanager 78 async def reading(self): 79 async with (C := self._cd): 80 w = C.wait 81 while self._wa or self._nw > 0: await w() 82 self._nr += 1 83 try: yield 84 finally: 85 async with C: 86 if (r := self._nr-1) == 0: C.notify_all() 87 self._nr = r
[docs] 88 @asynccontextmanager 89 async def writing(self): 90 async with (C := self._cd): 91 w = C.wait; self._nw += 1 92 while self._wa or self._nr > 0: await w() 93 self._nw -= 1; self._wa = True 94 try: yield 95 finally: 96 async with C: self._wa = False; C.notify_all()
[docs] 97class FairRWLock(RWLock): 98 __slots__ = '_cd', '_qd'
[docs] 99 def setup(self): self._cd, self._wa, self._nr, self._qd = Condition(), False, 0, deque()
[docs] 100 @asynccontextmanager 101 async def reading(self): 102 async with (C := self._cd): 103 (Q := self._qd).append(E := (False, C._get_loop().create_future())) 104 w = C.wait 105 try: 106 while True: 107 if Q[0] is not E or self._wa: await w() 108 else: self._nr += 1; Q.popleft(); E[-1].set_result(True); break 109 except: 110 with ignore_valerrs: Q.remove(E) 111 raise 112 try: yield 113 finally: 114 async with C: 115 if (r := self._nr-1) == 0: C.notify_all() 116 self._nr = r
[docs] 117 @asynccontextmanager 118 async def writing(self): 119 async with (C := self._cd): 120 w = C.wait 121 (Q := self._qd).append(E := (True, C._get_loop().create_future())) 122 try: 123 while True: 124 if Q[0] is not E or self._wa or self._nr > 0: await w() 125 else: self._wa = True; Q.popleft(); E[-1].set_result(True); break 126 except: 127 with ignore_valerrs: Q.remove(E) 128 raise 129 try: yield 130 finally: 131 async with C: self._wa = False; C.notify_all()
[docs] 132@d 133class PriorityRWLock(RWLock): 134 __slots__ = '_cd', '_ct', '_il', '_qd'
[docs] 135 def setup(self): self._cd, self._ct, self._il, self._wa, self._nr, self._qd = Condition(), 0, Lock(), False, 0, []
136 async def _push_item(self, priority, is_writer): 137 async with self._il: self._ct = (c := self._ct)+1 138 heappush(self._qd, E := (priority, *((is_writer, c) if isinstance(self, WritePreferredPriorityRWLock) else (c, is_writer)), self._cd._get_loop().create_future())); return E
[docs] 139 @asynccontextmanager 140 async def reading(self, priority=0): 141 async with (C := self._cd): 142 E, Q, w = await self._push_item(priority, False), self._qd, C.wait 143 try: 144 while True: 145 if Q[0] is not E or self._wa: await w() 146 else: self._nr += 1; heappop(Q); E[-1].set_result(True); break 147 except: 148 with ignore_valerrs: Q.remove(E) 149 raise 150 try: yield 151 finally: 152 async with C: 153 if (r := self._nr-1) == 0: C.notify_all() 154 self._nr = r
[docs] 155 @asynccontextmanager 156 async def writing(self, priority=0): 157 async with (C := self._cd): 158 E, Q, w = await self._push_item(priority, True), self._qd, C.wait 159 try: 160 while True: 161 if Q[0] is not E or self._wa or self._nr > 0: await w() 162 else: self._wa = True; heappop(Q); E[-1].set_result(True); break 163 except: 164 with ignore_valerrs: Q.remove(E) 165 raise 166 try: yield 167 finally: 168 async with C: self._wa = False; C.notify_all()
[docs] 169class FairPriorityRWLock(PriorityRWLock): __slots__ = ()
[docs] 170class WritePreferredPriorityRWLock(PriorityRWLock): __slots__ = ()
[docs] 171class AgingRWLock(PriorityRWLock): 172 __slots__ = '_rf', '_rt', '_wf', '_wt' 173 def __new__(cls, /, rf=None, wf=None): C, _ = getcontext(), _rwlock_sub_new(cls); _._rf, _._wf = C.AGING_RWLOCK_DEFAULT_READ_PRIORITY_FACTOR if rf is None else rf, C.AGING_RWLOCK_DEFAULT_WRITE_PRIORITY_FACTOR if wf is None else wf; return _
[docs] 174 def setup(self): super().setup(); self._rt, self._wt = defaultdict(int), defaultdict(int)
[docs] 175 @asynccontextmanager 176 async def reading(self, priority=None): 177 d[i] = p = (d := self._rt)[i := id(current_task())]+1 178 if priority is None: priority = self._rf*p 179 async with super().reading(priority): d.pop(i, None); yield
[docs] 180 @asynccontextmanager 181 async def writing(self, priority=None): 182 d[i] = p = (d := self._wt)[i := id(current_task())]+1 183 if priority is None: priority = self._wf*p 184 async with super().writing(priority): d.pop(i, None); yield
185 @property 186 def cur_unsuccessful_reads(self): return sum(self._rt.values()) 187 @property 188 def cur_unsuccessful_writes(self): return sum(self._wt.values())
189del B, t, d, n, s