1# ty: ignore[unresolved-attribute]
2__lazy_modules__ = frozenset(('heapq',))
3from asyncutils import getcontext, ignore_valerrs
4from asyncutils._internal.helpers import fullname
5from asyncutils._internal.submodules import rwlocks_all as __all__
6from _collections import defaultdict, deque
7from asyncio import Condition, Lock, current_task
8from contextlib import asynccontextmanager
9from heapq import heappush, heappop
10import abc
11def _rwlock_sub_new(cls, /): (_ := object.__new__(cls)).setup(); return _
12class B:
13 __slots__ = '__wrapped__', 'reader', 'reading', 'writer', 'writing'
14 def __init__(self, l, f, /, _=__slots__[1:]):
15 for s in _: setattr(self, s, getattr(l, s))
16 self.__wrapped__ = f
17 def __init_subclass__(cls, f=__import__('operator').methodcaller, /, *, m, **_):
18 if cls.__dict__.get('__slots__', True): raise TypeError('__slots__ must be an empty tuple')
19 async def g(self, *a, **k):
20 async with c(self): return await self.__wrapped__(*a, **k)
21 cls.__call__, c = g, f(m); super().__init_subclass__(**_) # ty: ignore[invalid-assignment]
22 def __getattr__(self, n, /): return getattr(self.__wrapped__, n)
23t = 'Locked', (B,), {'__slots__': ()}
24def n(c, /, prefer_writers=None): return _rwlock_sub_new(c.__subclasses__()[getcontext().RWLOCK_DEFAULT_PREFER_WRITERS if prefer_writers is None else prefer_writers])
25def s(c, n=n, /, **_):
26 if not isinstance(c.__dict__.get('__slots__'), tuple): raise TypeError(f'subclass of {c.__name__} must define tuple as __slots__')
27 if c.__new__ is n: c.__new__ = _rwlock_sub_new
28 super(c).__init_subclass__(**_)
29def d(c, /, _=(n, classmethod(s))): c.__new__, c.__init_subclass__ = _; return c
[docs]
30class CoercedMethod:
31 __slots__ = '__f', '__n', '__o'
32 def __init_subclass__(cls, /, **_): raise TypeError('cannot subclass asyncutils.rwlocks.CoercedMethod')
33 def __init__(self, f, /): self.__f = f
[docs]
34 def __set_name__(self, /, *_): self.__o, self.__n = _
[docs]
35 def __getattr__(self, n, /): return getattr(self.__f, n)
[docs]
36 def __get__(self, o, t=None, /):
37 if o is None: raise AttributeError(f'class {fullname(t)} has no attribute {self.__n!r}', name=self.__n) if t is self.__o else TypeError('incorrectly bound asyncutils.rwlocks.CoercedMethod')
38 if not (t is None or isinstance(o, t)): raise TypeError('asyncutils.rwlocks.CoercedMethod: __get__ called incorrectly')
39 return lambda *a, **k: self.__f(o, *a, **k)
[docs]
40@d
41class RWLock(metaclass=abc.ABCMeta):
42 reader, writer = (CoercedMethod(type(*t, m=m)) for m in ('reading', 'writing')); __slots__ = '_wa', # ty: ignore[no-matching-overload]
[docs]
43 def locked(self): return self._wa
[docs]
44 @classmethod
45 def lock(cls, f, /): return cls().reader(f)
[docs]
46 @abc.abstractmethod
47 def setup(self): raise NotImplementedError
[docs]
48 @abc.abstractmethod
49 def reading(self): raise NotImplementedError
[docs]
50 @abc.abstractmethod
51 def writing(self): raise NotImplementedError
[docs]
52class ReadPreferredRWLock(RWLock):
53 __slots__ = '_cm', '_nr'
[docs]
54 def setup(self): self._nr, self._cm, self._wa = 0, Lock(), Lock()
[docs]
55 @asynccontextmanager
56 async def reading(self):
57 async with self._cm:
58 if (r := self._nr+1) == 1: await self._wa.acquire()
59 self._nr = r
60 try: yield
61 finally:
62 async with self._cm:
63 if (r := self._nr-1) == 0: self._wa.release()
64 self._nr = r
[docs]
65 @asynccontextmanager
66 async def writing(self):
67 async with self._wa: yield
[docs]
68 def locked(self): return self._wa.locked()
[docs]
69class WritePreferredRWLock(RWLock):
70 __slots__ = '_cond', '_nr', '_nw'
[docs]
71 def setup(self): self._cond, self._wa = Condition(), False; self._nr = self._nw = 0
[docs]
72 @asynccontextmanager
73 async def reading(self):
74 async with (C := self._cond):
75 w = C.wait
76 while self._wa or self._nw > 0: await w()
77 self._nr += 1
78 try: yield
79 finally:
80 async with C:
81 if (r := self._nr-1) == 0: C.notify_all()
82 self._nr = r
[docs]
83 @asynccontextmanager
84 async def writing(self):
85 async with (C := self._cond):
86 w = C.wait; self._nw += 1
87 while self._wa or self._nr > 0: await w()
88 self._nw -= 1; self._wa = True
89 try: yield
90 finally:
91 async with C: self._wa = False; C.notify_all()
[docs]
92class FairRWLock(RWLock):
93 __slots__ = '_cond', '_nr', '_qd'
[docs]
94 def setup(self): self._cond, self._wa, self._nr, self._qd = Condition(), False, 0, deque()
[docs]
95 @asynccontextmanager
96 async def reading(self):
97 async with (C := self._cond):
98 (Q := self._qd).append(E := (False, C._get_loop().create_future()))
99 w = C.wait
100 try:
101 while True:
102 if Q[0] is not E or self._wa: await w()
103 else: self._nr += 1; Q.popleft(); E[-1].set_result(True); break
104 except:
105 with ignore_valerrs: Q.remove(E)
106 raise
107 try: yield
108 finally:
109 async with C:
110 if (r := self._nr-1) == 0: C.notify_all()
111 self._nr = r
[docs]
112 @asynccontextmanager
113 async def writing(self):
114 async with (C := self._cond):
115 w = C.wait
116 (Q := self._qd).append(E := (True, C._get_loop().create_future()))
117 try:
118 while True:
119 if Q[0] is not E or self._wa or self._nr > 0: await w()
120 else: self._wa = True; Q.popleft(); E[-1].set_result(True); break
121 except:
122 with ignore_valerrs: Q.remove(E)
123 raise
124 try: yield
125 finally:
126 async with C: self._wa = False; C.notify_all()
[docs]
127@d
128class PriorityRWLock(RWLock):
129 __slots__ = '_cnt', '_cond', '_il', '_nr', '_qd'
[docs]
130 def setup(self): self._cond, self._cnt, self._il, self._wa, self._nr, self._qd = Condition(), 0, Lock(), False, 0, []
131 async def _push_item(self, priority, is_writer):
132 async with self._il: self._cnt = (c := self._cnt)+1
133 heappush(self._qd, E := (priority, *((is_writer, c) if isinstance(self, WritePreferredPriorityRWLock) else (c, is_writer)), self._cond._get_loop().create_future())); return E
[docs]
134 @asynccontextmanager
135 async def reading(self, priority=0):
136 async with (C := self._cond):
137 E, Q, w = await self._push_item(priority, False), self._qd, C.wait
138 try:
139 while True:
140 if Q[0] is not E or self._wa: await w()
141 else: self._nr += 1; heappop(Q); E[-1].set_result(True); break
142 except:
143 with ignore_valerrs: Q.remove(E)
144 raise
145 try: yield
146 finally:
147 async with C:
148 if (r := self._nr-1) == 0: C.notify_all()
149 self._nr = r
[docs]
150 @asynccontextmanager
151 async def writing(self, priority=0):
152 async with (C := self._cond):
153 E, Q, w = await self._push_item(priority, True), self._qd, C.wait
154 try:
155 while True:
156 if Q[0] is not E or self._wa or self._nr > 0: await w()
157 else: self._wa = True; heappop(Q); E[-1].set_result(True); break
158 except:
159 with ignore_valerrs: Q.remove(E)
160 raise
161 try: yield
162 finally:
163 async with C: self._wa = False; C.notify_all()
[docs]
164class FairPriorityRWLock(PriorityRWLock): __slots__ = ()
[docs]
165class WritePreferredPriorityRWLock(PriorityRWLock): __slots__ = ()
[docs]
166class AgingRWLock(PriorityRWLock):
167 __slots__ = '_rf', '_rt', '_wf', '_wt'
168 def __new__(cls, /, rf=None, wf=None): C, _ = getcontext(), _rwlock_sub_new(cls); _._rf, _._wf = C.AGING_RWLOCK_DEFAULT_READ_PRIORITY_FACTOR if rf is None else rf, C.AGING_RWLOCK_DEFAULT_WRITE_PRIORITY_FACTOR if wf is None else wf; return _
[docs]
169 def setup(self): super().setup(); self._rt, self._wt = defaultdict(int), defaultdict(int)
[docs]
170 @asynccontextmanager
171 async def reading(self, priority=None):
172 d[i] = p = (d := self._rt)[i := id(current_task())]+1
173 if priority is None: priority = self._rf*p
174 async with super().reading(priority): d.pop(i, None); yield
[docs]
175 @asynccontextmanager
176 async def writing(self, priority=None):
177 d[i] = p = (d := self._wt)[i := id(current_task())]+1
178 if priority is None: priority = self._wf*p
179 async with super().writing(priority): d.pop(i, None); yield
180 @property
181 def cur_unsuccessful_reads(self): return sum(self._rt.values())
182 @property
183 def cur_unsuccessful_writes(self): return sum(self._wt.values())
184del B, t, d, n, s