1# ty: ignore[unresolved-attribute]
2__lazy_modules__ = frozenset(('heapq',))
3from asyncutils import getcontext, ignore_valerrs
4from asyncutils._internal.helpers import fullname
5from asyncutils._internal.submodules import rwlocks_all as __all__
6from _collections import defaultdict, deque
7from asyncio import Condition, Lock, current_task
8from contextlib import asynccontextmanager
9from heapq import heappush, heappop
10import abc
def _rwlock_sub_new(cls, /):
    """Create an instance of *cls* and initialise it through its ``setup`` hook.

    The object is allocated with ``object.__new__`` (so no ``__init__`` arguments are
    involved), ``setup()`` is called on it, and the instance is returned.
    """
    instance = object.__new__(cls)
    instance.setup()
    return instance
class B:
    """Callable wrapper that awaits a wrapped callable inside one of a lock's contexts.

    An instance copies ``reader``, ``reading``, ``writer`` and ``writing`` from the lock it
    is built from and stores the wrapped callable in ``__wrapped__``. Concrete subclasses
    are created with an ``m`` class keyword naming the method that is called on the
    instance to obtain the async context manager entered around every call.
    """
    __slots__ = '__wrapped__', 'reader', 'reading', 'writer', 'writing'

    def __init__(self, l, f, /, _=__slots__[1:]):
        """Copy the lock-facing attributes from *l* and remember *f* as ``__wrapped__``.

        ``_`` is bound at class-definition time to every slot name except ``__wrapped__``.
        """
        for attr in _:
            value = getattr(l, attr)
            setattr(self, attr, value)
        self.__wrapped__ = f

    def __init_subclass__(cls, f=__import__('operator').methodcaller, /, *, m, **_):
        """Give the new subclass a ``__call__`` that runs under the context named by *m*.

        Raises ``TypeError`` unless the subclass itself declares an empty ``__slots__``.
        """
        own_slots = cls.__dict__.get('__slots__', True)
        if own_slots:
            raise TypeError('__slots__ must be an empty tuple')

        async def g(self, *a, **k):
            # `c` is bound just below, before `g` can ever be invoked.
            async with c(self):
                return await self.__wrapped__(*a, **k)

        c = f(m)
        cls.__call__ = g  # ty: ignore[invalid-assignment]
        super().__init_subclass__(**_)

    def __getattr__(self, n, /):
        """Fall back to the wrapped callable for attributes not found on the wrapper."""
        return getattr(self.__wrapped__, n)
# (name, bases, namespace) arguments for the three-argument form of type();
# RWLock unpacks them to build the two `Locked` wrapper classes derived from B.
t = ('Locked', (B,), {'__slots__': ()})
def n(c, /, prefer_writers=None):
    """Instantiate one of *c*'s direct subclasses, selected by *prefer_writers*.

    *prefer_writers* is used as an index into ``c.__subclasses__()``; when it is ``None``
    the current context's ``RWLOCK_DEFAULT_PREFER_WRITERS`` is used instead. The chosen
    class is instantiated through ``_rwlock_sub_new``.
    """
    # NOTE(review): presumably a bool used as a 0/1 index, so the result depends on the
    # order in which the subclasses were defined -- confirm against the context default.
    candidates = c.__subclasses__()
    if prefer_writers is None:
        choice = getcontext().RWLOCK_DEFAULT_PREFER_WRITERS
    else:
        choice = prefer_writers
    return _rwlock_sub_new(candidates[choice])
def s(c, _=n, /, **k):
    """Subclass hook: validate ``__slots__`` and give the new class a concrete ``__new__``.

    Raises ``TypeError`` when the class *c* does not define ``__slots__`` as a tuple in
    its own namespace. If *c* still resolves ``__new__`` to the dispatching constructor
    ``_`` it is replaced with ``_rwlock_sub_new`` so that *c* can be instantiated directly.
    """
    declared = c.__dict__.get('__slots__')
    if not isinstance(declared, tuple):
        raise TypeError(f'subclass of {c.__name__} must define tuple as __slots__')
    if c.__new__ is _:
        c.__new__ = _rwlock_sub_new
    # NOTE(review): one-argument super(c) is an unbound proxy, so this lookup appears to
    # resolve on the `super` type itself (i.e. object's default hook) rather than on the
    # next class in c's MRO -- confirm that skipping cooperative hooks is intended.
    super(c).__init_subclass__(**k)
def d(c, /, _=(n, classmethod(s))):
    """Class decorator installing the dispatching ``__new__`` and the subclass hook on *c*.

    ``_`` holds the pair ``(n, classmethod(s))`` built once at definition time; the first
    item becomes ``c.__new__`` and the second ``c.__init_subclass__``. Returns *c*.
    """
    constructor, subclass_hook = _
    c.__new__ = constructor
    c.__init_subclass__ = subclass_hook
    return c
[docs]
class CoercedMethod:
    """Descriptor exposing a callable as an instance-only method.

    Accessed through an instance it returns a function that calls the stored callable with
    the instance prepended to the arguments. Accessed through a class it raises instead of
    returning anything. Unknown attributes are forwarded to the stored callable.
    """
    __slots__ = '__f', '__n', '__o'

    def __init_subclass__(cls, /, **_):
        """Reject every attempt to subclass this descriptor."""
        raise TypeError('cannot subclass asyncutils.rwlocks.CoercedMethod')

    def __init__(self, f, /):
        """Store the callable *f* that instance access will invoke."""
        self.__f = f

    def __set_name__(self, /, *_):
        """Record the owning class and the attribute name the descriptor is bound to."""
        owner, name = _
        self.__o = owner
        self.__n = name

    def __getattr__(self, n, /):
        """Delegate attribute lookups that fail on the descriptor to the stored callable."""
        return getattr(self.__f, n)

    def __get__(self, o, t=None, /):
        """Return a function calling the stored callable with *o* as first argument.

        Raises ``AttributeError`` for class-level access through the owning class,
        ``TypeError`` for class-level access through any other class, and ``TypeError``
        when *t* is given but *o* is not an instance of it.
        """
        if o is None:
            if t is self.__o:
                raise AttributeError(f'class {fullname(t)} has no attribute {self.__n!r}', name=self.__n)
            raise TypeError('incorrectly bound asyncutils.rwlocks.CoercedMethod')
        if t is not None and not isinstance(o, t):
            raise TypeError('asyncutils.rwlocks.CoercedMethod: __get__ called incorrectly')
        return lambda *a, **k: self.__f(o, *a, **k)
[docs]
@d
class RWLock(metaclass=abc.ABCMeta):
    """Abstract base class of the reader-writer locks defined in this module.

    Subclasses provide ``setup`` (state initialisation) and the ``reading`` / ``writing``
    context factories. ``reader`` and ``writer`` wrap a callable so that each call to it
    runs inside ``reading()`` or ``writing()`` respectively.
    """
    __slots__ = '_nr', '_wa'
    reader = CoercedMethod(type(*t, m='reading'))  # ty: ignore[no-matching-overload]
    writer = CoercedMethod(type(*t, m='writing'))  # ty: ignore[no-matching-overload]

    def is_reading(self):
        """Return whether the reader count ``_nr`` is positive."""
        return self._nr > 0

    def is_writing(self):
        """Return the writer-active state stored in ``_wa``."""
        return self._wa

    def locked(self):
        """Return a truthy value when the lock is held for writing or for reading."""
        return self.is_writing() or self.is_reading()

    @classmethod
    def lock(cls, f, /):
        """Create a fresh lock via ``cls()`` and return *f* wrapped by its ``reader``."""
        fresh = cls()
        return fresh.reader(f)

    @abc.abstractmethod
    def setup(self):
        """Initialise the lock's state; must be overridden."""
        raise NotImplementedError

    @abc.abstractmethod
    def reading(self):
        """Return the context used for shared (read) access; must be overridden."""
        raise NotImplementedError

    @abc.abstractmethod
    def writing(self):
        """Return the context used for exclusive (write) access; must be overridden."""
        raise NotImplementedError
[docs]
class ReadPreferredRWLock(RWLock):
    """Reader-writer lock built from two ``asyncio.Lock`` objects.

    ``_wa`` is the lock writers hold; the first reader in acquires it on behalf of all
    readers and the last reader out releases it. ``_cm`` serialises updates of the reader
    count ``_nr``.
    """
    __slots__ = '_cm',

    def setup(self):
        """Start with no readers and two fresh, unlocked locks."""
        self._nr = 0
        self._cm = Lock()
        self._wa = Lock()

    @asynccontextmanager
    async def reading(self):
        """Async context manager for shared access."""
        write_gate = self._wa
        async with self._cm:
            count = self._nr + 1
            if count == 1:
                # First reader in: take the writers' lock for the whole group of readers.
                await write_gate.acquire()
            self._nr = count
        try:
            yield
        finally:
            async with self._cm:
                count = self._nr - 1
                if count == 0:
                    # Last reader out: let writers in again.
                    write_gate.release()
                self._nr = count

    @asynccontextmanager
    async def writing(self):
        """Async context manager for exclusive access; simply holds ``_wa``."""
        async with self._wa:
            yield

    def is_reading(self):
        """Return whether at least one reader currently holds the lock."""
        return self._nr > 0

    def is_writing(self):
        """Return whether ``_wa`` is locked while no reader is counted."""
        return not self.is_reading() and self.locked()

    def locked(self):
        """Return whether ``_wa`` is currently locked (by a writer or by the readers)."""
        return self._wa.locked()
[docs]
class WritePreferredRWLock(RWLock):
    """Reader-writer lock in which new readers yield to waiting writers.

    State is guarded by the condition ``_cd``: ``_nr`` counts active readers, ``_nw``
    counts writers waiting to get in, and ``_wa`` is true while a writer holds the lock.
    """
    __slots__ = '_cd', '_nw'

    def setup(self):
        """Start with a fresh condition, no active writer, no readers, no waiting writers."""
        self._cd, self._wa = Condition(), False
        self._nr = self._nw = 0

    @asynccontextmanager
    async def reading(self):
        """Async context manager for shared access.

        Waits while a writer is active or any writer is waiting.
        """
        async with (C := self._cd):
            w = C.wait
            while self._wa or self._nw > 0:
                await w()
            self._nr += 1
        try:
            yield
        finally:
            async with C:
                if (r := self._nr - 1) == 0:
                    # Last reader out: waiting writers may now proceed.
                    C.notify_all()
                self._nr = r

    @asynccontextmanager
    async def writing(self):
        """Async context manager for exclusive access.

        Registers as a waiting writer, then waits until no writer is active and no reader
        is inside. If the wait is interrupted (e.g. the task is cancelled) the
        registration is withdrawn and waiters are woken before the exception propagates.
        """
        async with (C := self._cd):
            w = C.wait
            self._nw += 1
            try:
                while self._wa or self._nr > 0:
                    await w()
            except BaseException:
                # Condition.wait() re-acquires the lock before raising, so the state may
                # be touched here. Without this, `_nw` would stay positive forever and
                # every later reader would block on it.
                self._nw -= 1
                C.notify_all()
                raise
            self._nw -= 1
            self._wa = True
        try:
            yield
        finally:
            async with C:
                self._wa = False
                C.notify_all()
[docs]
class FairRWLock(RWLock):
    """Reader-writer lock that admits requests in arrival order.

    Every request appends an entry ``(is_writer, future)`` to the deque ``_qd`` and may
    only proceed while its entry is at the front. ``_cd`` guards the state, ``_nr``
    counts active readers and ``_wa`` is true while a writer holds the lock.
    """
    __slots__ = '_cd', '_qd'

    def setup(self):
        """Start with a fresh condition, no active writer, no readers and an empty queue."""
        self._cd, self._wa, self._nr, self._qd = Condition(), False, 0, deque()

    @asynccontextmanager
    async def reading(self):
        """Async context manager for shared access.

        Proceeds once this request is at the front of the queue and no writer is active.
        """
        async with (C := self._cd):
            (Q := self._qd).append(E := (False, C._get_loop().create_future()))
            w = C.wait
            try:
                while Q[0] is not E or self._wa:
                    await w()
                self._nr += 1
                Q.popleft()
                E[-1].set_result(True)
            except BaseException:
                # Give up the queue slot. The lock is held here (Condition.wait()
                # re-acquires it before raising), so wake the others: whoever is now at
                # the front may be able to proceed without waiting for a release.
                with ignore_valerrs:
                    Q.remove(E)
                C.notify_all()
                raise
        try:
            yield
        finally:
            async with C:
                if (r := self._nr - 1) == 0:
                    C.notify_all()
                self._nr = r

    @asynccontextmanager
    async def writing(self):
        """Async context manager for exclusive access.

        Proceeds once this request is at the front of the queue, no writer is active and
        no reader is inside.
        """
        async with (C := self._cd):
            w = C.wait
            (Q := self._qd).append(E := (True, C._get_loop().create_future()))
            try:
                while Q[0] is not E or self._wa or self._nr > 0:
                    await w()
                self._wa = True
                Q.popleft()
                E[-1].set_result(True)
            except BaseException:
                # An abandoned writer at the front would otherwise keep readers queued
                # behind it asleep until some unrelated release notified them.
                with ignore_valerrs:
                    Q.remove(E)
                C.notify_all()
                raise
        try:
            yield
        finally:
            async with C:
                self._wa = False
                C.notify_all()
[docs]
@d
class PriorityRWLock(RWLock):
    """Reader-writer lock whose waiting requests are ordered by a min-heap.

    Each request pushes an entry onto the heap ``_qd`` and may only proceed while its
    entry is the heap's first element, so smaller ``priority`` values are served first.
    ``_ct`` is a running counter (guarded by ``_il``) that makes entries unique, ``_cd``
    guards the state, ``_nr`` counts active readers and ``_wa`` is true while a writer
    holds the lock.
    """
    __slots__ = '_cd', '_ct', '_il', '_qd'

    def setup(self):
        """Start with a fresh condition and lock, zeroed counters and an empty heap."""
        self._cd, self._ct, self._il, self._wa, self._nr, self._qd = Condition(), 0, Lock(), False, 0, []

    async def _push_item(self, priority, is_writer):
        """Push a new request onto the heap and return its entry.

        Entries compare by ``priority`` first. For ``WritePreferredPriorityRWLock`` the
        next key puts writers ahead of readers and then falls back to arrival order; for
        every other subclass arrival order alone breaks ties.
        """
        async with self._il:
            self._ct = (c := self._ct) + 1
        if isinstance(self, WritePreferredPriorityRWLock):
            # The heap serves the smallest entry first and False < True, so the key must
            # be "is a reader" for writers to win a tie (the raw `is_writer` flag would
            # put readers first).
            tie_break = (not is_writer, c)
        else:
            tie_break = (c, is_writer)
        heappush(self._qd, E := (priority, *tie_break, self._cd._get_loop().create_future()))
        return E

    def _abandon(self, E):
        """Drop the entry *E* of a request that gave up; call with ``_cd`` held."""
        from heapq import heapify
        Q = self._qd
        with ignore_valerrs:
            Q.remove(E)
            # list.remove() leaves the list in arbitrary heap order; restore the invariant
            # so that Q[0] is the smallest remaining entry again.
            heapify(Q)
        # Wake-up order follows arrival order, not heap order, so the request that is now
        # first may already have gone back to sleep; wake everyone to re-check.
        self._cd.notify_all()

    @asynccontextmanager
    async def reading(self, priority=0):
        """Async context manager for shared access with the given *priority*.

        Proceeds once this request is first in the heap and no writer is active.
        """
        async with (C := self._cd):
            E, Q, w = await self._push_item(priority, False), self._qd, C.wait
            try:
                while Q[0] is not E or self._wa:
                    await w()
                self._nr += 1
                heappop(Q)
                E[-1].set_result(True)
                # The new first entry may be another reader that can share the lock now.
                C.notify_all()
            except BaseException:
                self._abandon(E)
                raise
        try:
            yield
        finally:
            async with C:
                if (r := self._nr - 1) == 0:
                    C.notify_all()
                self._nr = r

    @asynccontextmanager
    async def writing(self, priority=0):
        """Async context manager for exclusive access with the given *priority*.

        Proceeds once this request is first in the heap, no writer is active and no
        reader is inside.
        """
        async with (C := self._cd):
            E, Q, w = await self._push_item(priority, True), self._qd, C.wait
            try:
                while Q[0] is not E or self._wa or self._nr > 0:
                    await w()
                self._wa = True
                heappop(Q)
                E[-1].set_result(True)
            except BaseException:
                self._abandon(E)
                raise
        try:
            yield
        finally:
            async with C:
                self._wa = False
                C.notify_all()
[docs]
class FairPriorityRWLock(PriorityRWLock):
    """Priority lock adding no state or behaviour of its own to ``PriorityRWLock``."""
    __slots__ = ()
[docs]
class WritePreferredPriorityRWLock(PriorityRWLock):
    """Priority lock whose type ``PriorityRWLock._push_item`` checks to pick the entry layout.

    The class itself adds no state or methods.
    """
    __slots__ = ()
[docs]
class AgingRWLock(PriorityRWLock):
    """Priority lock that derives a default priority from a task's pending attempts.

    ``_rt`` and ``_wt`` map ``id(current_task())`` to the number of read / write attempts
    by that task that have been started but have not acquired the lock; the entry is
    removed once the lock is acquired. When no explicit priority is given, the attempt
    count multiplied by ``_rf`` (reads) or ``_wf`` (writes) is used.
    """
    __slots__ = '_rf', '_rt', '_wf', '_wt'

    def __new__(cls, /, rf=None, wf=None):
        """Build the lock; *rf* / *wf* default to the current context's aging factors."""
        ctx = getcontext()
        lock = _rwlock_sub_new(cls)
        read_factor = ctx.AGING_RWLOCK_DEFAULT_READ_PRIORITY_FACTOR if rf is None else rf
        write_factor = ctx.AGING_RWLOCK_DEFAULT_WRITE_PRIORITY_FACTOR if wf is None else wf
        lock._rf, lock._wf = read_factor, write_factor
        return lock

    def setup(self):
        """Initialise the base state and the two per-task attempt counters."""
        super().setup()
        self._rt = defaultdict(int)
        self._wt = defaultdict(int)

    @asynccontextmanager
    async def reading(self, priority=None):
        """Async context manager for shared access; *priority* defaults to ``_rf * attempts``."""
        attempts = self._rt
        key = id(current_task())
        count = attempts[key] + 1
        attempts[key] = count
        if priority is None:
            priority = self._rf * count
        async with super().reading(priority):
            # Acquired: this task no longer has a pending read attempt.
            attempts.pop(key, None)
            yield

    @asynccontextmanager
    async def writing(self, priority=None):
        """Async context manager for exclusive access; *priority* defaults to ``_wf * attempts``."""
        attempts = self._wt
        key = id(current_task())
        count = attempts[key] + 1
        attempts[key] = count
        if priority is None:
            priority = self._wf * count
        async with super().writing(priority):
            # Acquired: this task no longer has a pending write attempt.
            attempts.pop(key, None)
            yield

    @property
    def cur_unsuccessful_reads(self):
        """Total of all per-task read attempt counts currently recorded."""
        return sum(self._rt.values())

    @property
    def cur_unsuccessful_writes(self):
        """Total of all per-task write attempt counts currently recorded."""
        return sum(self._wt.values())
# Remove the single-letter construction helpers from the module namespace; the classes
# above already hold references to whatever they still need.
del B, t, d, n, s