1# ty: ignore[invalid-argument-type]
2import asyncutils as A
3from asyncutils.constants import _NO_DEFAULT
4from asyncutils._internal import compat as D, patch as P
5from asyncutils._internal.helpers import LoopMixinBase, check, get_loop_and_set, fullname
6from asyncutils._internal.log import info
7from asyncutils._internal.submodules import queues_all as __all__
8from _collections import deque
9from _functools import partial
10from abc import ABCMeta, abstractmethod
11from asyncio import Event, timeout as _timeout, wait_for
12from itertools import count
13from sys import _getframe, audit
# Shared error-suppressing context managers for queue exceptions. All of them are derived from a
# single IgnoreErrors(QueueShutDown) instance through its `combined` method (presumably each result
# also suppresses the extra exception types passed in - confirm against IgnoreErrors).
# While this statement runs, `f` names that bound `combined` method and `_` the
# (QueueEmpty, QueueFull) pair; map() calls `combined` once per exception type.
ignore_qempty, ignore_qfull = map((f := (ignore_qshutdown := A.IgnoreErrors(D.QueueShutDown)).combined), _ := (D.QueueEmpty, D.QueueFull))
# The right-hand side is evaluated before any name is rebound, so f(*_) still calls `combined`
# (with both exception types at once); only afterwards does `f` become object.__setattr__, which
# later definitions in this module capture as a default argument.
ignore_qerrs, f = f(*_), object.__setattr__
def _wakeup_next(W):
    """Wake the first waiter in *W* that is still pending.

    Waiters are removed from the left of the deque one at a time. Those that are
    already done are simply discarded; the first one that is not done receives a
    ``None`` result and processing stops, leaving any later waiters queued.
    """
    take_first = W.popleft
    while W:
        waiter = take_first()
        if waiter.done():
            # Already resolved or cancelled elsewhere: drop it and keep looking.
            continue
        waiter.set_result(None)
        return
class Q:
    # Read-only facade object handed out by password_queue(). Each slot stores one of the values
    # passed positionally to __new__ (in __slots__ order); after construction only `cancel_extend`
    # may be rebound. Subclassing and the _get/_put/_init hook names raise ForbiddenOperation.
    # NOTE(review): the ForbiddenOperation instances used below are created once, as default
    # arguments, and the same object is re-raised on every call - confirm that sharing one
    # exception instance is intended.
    exc = A.ForbiddenOperation; __slots__ = 'maxsize', 'empty', 'qsize', 'full', 'get', 'get_nowait', 'put', 'put_nowait', 'change_get_password', 'change_put_password', 'task_done', 'join', 'shutdown', 'cancel_extend' # noqa: RUF023
    def __repr__(self): return f'<password-protected queue at {id(self):#x}>'
    # `*A` shadows the module alias `A` inside this method only. `_` defaults to the module-level
    # `f`, which is object.__setattr__ when this class body runs; it is used to fill the slots
    # while bypassing the restrictive __setattr__ defined below.
    def __new__(cls, /, *A, _=f):
        s = super().__new__(cls)
        # zip() stops at the shorter input, so slots without a matching argument stay unset.
        for a in zip(cls.__slots__, A): _(s, *a)
        return s
    # Captured defaults: `_` is the single writable attribute name, `f` is object.__setattr__ and
    # `s` the frozen set of slot names (the class-body `__slots__` is still in scope here).
    def __setattr__(self, name, value, /, _='cancel_extend', f=f, s=frozenset(__slots__)):
        # Unknown names and known-but-protected names get distinct error messages.
        if name not in s: raise AttributeError(f'password-protected queue has no attribute {name!r}')
        if name != _: raise AttributeError(f'attribute/method {name!r} on password-protected queue is read-only')
        f(self, name, value)
    # The four definitions below always raise the ForbiddenOperation built in their default argument.
    def __init_subclass__(cls, /, _=exc('subclass'), **k): raise _
    def _get(self, _=exc('call _get() on')): raise _
    def _put(self, _=exc('call _put() on')): raise _
    def _init(self, maxsize, _=exc('call _init() on')): raise _
    # Overrides the signatures reported for the definitions above (presumably to hide the private
    # default-argument parameters from introspection - confirm against the patch helpers).
    P.patch_method_signatures((__setattr__, 'name, value, /'), (_get, ''), (_put, ''), (_init, 'maxsize')); P.patch_classmethod_signatures((__init_subclass__, '**k'), (__new__, 'maxsize, empty, qsize, full, get, get_nowait, put, put_nowait, change_get_password, change_put_password, task_done, join, shutdown, cancel_extend, /'))
[docs]
def password_queue(password_put=_NO_DEFAULT, password_get=_NO_DEFAULT, maxsize=0, *, protect_get=False, protect_put=True, can_change_get=False, can_change_put=False, priority=False, lifo=False, init_items=(), strict=True, get_from=None, put_from=None, gettyp=object, puttyp=object, _=Q): # noqa: C901,PLR0913,PLR0915
    """Build a closure-backed queue whose put and/or get operations require a password.

    All queue state lives in local variables of this call; the returned object is an
    instance of the facade class bound to ``_`` whose slots hold the closures defined below.

    Args:
        password_put, password_get: Passwords demanded by the put/get operations. When one is
            left unset while the matching ``protect_*`` flag is true, it is looked up by
            variable name in the caller's frame (locals first, then globals).
        maxsize: Capacity; the queue is only ever "full" when ``maxsize`` is positive.
        protect_get, protect_put: Whether get/put operations check a password at all.
        can_change_get, can_change_put: Whether the matching ``change_*_password`` may succeed.
        priority: Keep items in a heap instead of plain FIFO/LIFO order.
        lifo: Pop the newest item first. Combined with ``priority``, the heap functions are
            taken from the compat module instead of ``heapq`` (presumably a max-heap - confirm).
        init_items: Items fed through ``put`` by a background task; that task's ``cancel``
            method is stored as ``cancel_extend`` on the returned object.
        strict: When true a supplied password must be the very same object as the stored one;
            otherwise non-identical objects are additionally compared with ``check``.
        get_from, put_from: Variable names used for the frame lookup; falling back to the
            context's ``PASSWORD_QUEUE_DEFAULT_GET_FROM`` / ``PASSWORD_QUEUE_DEFAULT_PUT_FROM``.
            Passing one together with the matching explicit password raises ``ValueError``.
        gettyp, puttyp: Types the get/put passwords must be instances of.

    Raises:
        GetPasswordRetrievalError, PutPasswordRetrievalError: The frame lookup found nothing.
        ValueError: Both an explicit password and its ``*_from`` name were given.
        WrongPasswordType: An initial password is not an instance of its required type.
    """
    audit('asyncutils.queues.password_queue', get_from if protect_get else None, put_from if protect_put else None); C = A.getcontext()
    # Caller's frame, used to resolve passwords by variable name; None if there is no such frame.
    try: F = _getframe(1)
    except ValueError: F = None
    if protect_get:
        if password_get is _NO_DEFAULT:
            # Chained comparison `x is None is y`: the globals lookup only runs (and only then
            # overwrites password_get) when the locals lookup returned None; fail if both are None.
            if F is None or (password_get := F.f_locals.get(get_from := (C.PASSWORD_QUEUE_DEFAULT_GET_FROM if get_from is None else get_from).strip())) is None is (password_get := F.f_globals.get(get_from)): raise A.GetPasswordRetrievalError(get_from)
        elif get_from is not None: raise ValueError('asyncutils.queues.password_queue: only pass one of get_from or password_get')
        if not isinstance(password_get, gettyp): raise A.WrongPasswordType(None, password_get, type(password_get), gettyp)
    if protect_put:
        if password_put is _NO_DEFAULT:
            # Same locals-then-globals lookup as for the get password above.
            if F is None or (password_put := F.f_locals.get(put_from := (C.PASSWORD_QUEUE_DEFAULT_PUT_FROM if put_from is None else put_from).strip())) is None is (password_put := F.f_globals.get(put_from)): raise A.PutPasswordRetrievalError(put_from)
        elif put_from is not None: raise ValueError('asyncutils.queues.password_queue: only pass one of put_from or password_put')
        if not isinstance(password_put, puttyp): raise A.WrongPasswordType(None, password_put, type(password_put), puttyp)
    # s/t: validate a candidate get/put password (type first, then identity or, when not strict,
    # check()). They refer to `q`, which is only assigned further down, before any call can happen.
    def s(p):
        if not isinstance(p, gettyp): raise A.WrongPasswordType(q, p, type(p), gettyp)
        if p is not password_get and (strict or not check(p, password_get)): raise A.WrongPassword(q, p)
    def t(p):
        if not isinstance(p, puttyp): raise A.WrongPasswordType(q, p, type(p), puttyp)
        if p is not password_put and (strict or not check(p, password_put)): raise A.WrongPassword(q, p)
    # u/v: take the tuple of extra positional arguments given to a get/put call; no-ops when the
    # operation is unprotected, otherwise exactly one password is unpacked and validated.
    def u(p):
        if not protect_get: return
        if not p: raise A.GetPasswordMissing
        p, = p; s(p)
    def v(p):
        if not protect_put: return
        if not p: raise A.PutPasswordMissing
        p, = p; t(p)
    # E: event awaited by join() (from done_evt(); presumably created already set - confirm),
    # G / P: futures of waiting getters / putters, U: unfinished-task counter, S: shutdown flag,
    # m: future factory of the loop L, b: private sentinel marking internal *_nowait calls whose
    # password was already checked by the async wrapper.
    E, G, P, U, S, m, b = A.done_evt(), deque(), deque(), 0, False, (L := get_loop_and_set()).create_future, object()
    # l is the item storage; g pops the next item and p (rebound here from its earlier meaning as
    # a parameter name of the helpers above) pushes one.
    if priority: g, p = partial((M := D if lifo else __import__('heapq')).heappop, l := []), partial(M.heappush, l)
    else: g, p = (l := []).pop if lifo else (l := deque()).popleft, l.append
    async def get(*p):
        # `p` here is the tuple of supplied passwords and `_` a local alias for G.append.
        u(p); _ = G.append
        while not l:
            if S: raise D.QueueShutDown
            F = m()
            try: _(F); await F
            except:
                # Interrupted while waiting: withdraw our future and, if an item arrived that we
                # will not consume, pass the wake-up on to the next getter.
                F.cancel()
                with A.ignore_valerrs: G.remove(F)
                if l and not F.cancelled(): _wakeup_next(G)
                raise
        return get_nowait(_=b)
    def get_nowait(*p, _=None):
        if not l: raise D.QueueShutDown if S else D.QueueEmpty
        # Skip the password check only for the internal call made by get() above.
        if _ is not b: u(p)
        i = g(); _wakeup_next(P); return i
    async def put(item, *p):
        # `p` is the tuple of supplied passwords and `_` a local alias for P.append.
        v(p); _ = P.append
        while full():
            if S: raise D.QueueShutDown
            _(F := m())
            try: await F
            except:
                # Mirror of the getter clean-up: hand a freed slot on to the next putter.
                F.cancel()
                with A.ignore_valerrs: P.remove(F)
                if not (full() or F.cancelled()): _wakeup_next(P)
                raise
        return put_nowait(item, _=b)
    # The password tuple is named `P` here so that `p` keeps referring to the push function of the
    # enclosing scope; the enclosing deque `P` is not used inside this function.
    def put_nowait(item, *P, _=None):
        if S: raise D.QueueShutDown
        if full(): raise D.QueueFull
        if _ is not b: v(P)
        p(item); nonlocal U; U += 1; E.clear(); _wakeup_next(G)
    # Both change_*_password functions report failure by returning False rather than raising,
    # except that exceptions matching A.CRITICAL are converted into A.Critical.
    def change_get_password(old_pwd, new_pwd):
        # Refused once the queue is shut down and drained, or when changing is disabled.
        if (S and not l) or not can_change_get: return False
        if not isinstance(new_pwd, gettyp): return False
        try: s(old_pwd)
        except A.CRITICAL: raise A.Critical
        except: return False
        nonlocal password_get; password_get = new_pwd; return True
    def change_put_password(old_pwd, new_pwd):
        # Refused as soon as the queue is shut down, or when changing is disabled.
        if S or not can_change_put: return False
        if not isinstance(new_pwd, puttyp): return False
        try: t(old_pwd)
        except A.CRITICAL: raise A.Critical
        except: return False
        nonlocal password_put; password_put = new_pwd; return True
    def task_done():
        nonlocal U
        if U == 0: raise ValueError('task_done() called too many times')
        U -= 1
        # Release join() waiters once every put item has been accounted for.
        if U == 0: E.set()
    def shutdown(immediate=False):
        nonlocal S, U; S = True
        if immediate:
            # Discard the stored items and stop counting them as unfinished.
            U -= len(l)
            if U <= 0: U = 0; E.set()
            l.clear()
        # Wake every waiting getter and putter so they can observe the shutdown flag.
        for d in (G, P):
            f = d.popleft
            while d:
                if not (F := f()).done(): F.set_result(None)
    # Positional order must match the facade's __slots__: maxsize, empty, qsize, full, get,
    # get_nowait, put, put_nowait, change_get_password, change_put_password, task_done, join,
    # shutdown, cancel_extend (a do-nothing default returning False). `full` is bound here via the
    # walrus and used by put/put_nowait above, which can only run after this line.
    q = _(maxsize, lambda: not l, lambda: len(l), full := lambda: 0 < maxsize <= len(l), get, get_nowait, put, put_nowait, change_get_password, change_put_password, task_done, E.wait, shutdown, lambda msg=None: False) # noqa: ARG005
    if init_items:
        # Feed the initial items through put() in a background task, supplying the put password;
        # D.Placeholder leaves the item position open in the partial.
        async def extend(f=D.partial(put, D.Placeholder, password_put)):
            async for i in A.iter_to_agen(init_items): await f(i)
        q.cancel_extend = L.create_task(extend()).cancel
    return q
[docs]
class PotentQueueBase(D.Queue, LoopMixinBase, metaclass=ABCMeta):
    """Abstract queue base layering convenience operations over ``D.Queue``.

    Concrete subclasses provide the storage hooks (``_init``, ``_get``, ``_put``) together
    with ``peek_all`` and ``qsize``. Everything else is built on those hooks and on the
    inherited ``put``/``get`` family. A private ``Event`` mirrors the shutdown state.

    Iterating synchronously drains the queue without waiting (``drain_until_empty``);
    iterating with ``async for`` drains it while waiting for new items (``drain_persistent``).
    """

    @abstractmethod
    def _init(self, maxsize):
        """Create the underlying storage for a queue with the given *maxsize*."""
        raise NotImplementedError

    @abstractmethod
    def _get(self):
        """Remove and return the next item from the underlying storage."""
        raise NotImplementedError

    @abstractmethod
    def _put(self, item):
        """Store *item* in the underlying storage."""
        raise NotImplementedError

    @abstractmethod
    def peek_all(self):
        """Return a snapshot of all stored items without removing them."""
        raise NotImplementedError

    @abstractmethod
    def qsize(self):
        """Return the number of items currently stored."""
        raise NotImplementedError

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._event = Event()

    def reset(self):
        """Re-run the base queue initialiser with the current ``maxsize`` and clear the shutdown event."""
        super().__init__(self.maxsize)
        self._event.clear()

    async def smart_put(self, item, *, timeout=None, raising=True):
        """Put *item*, without waiting when there is room.

        Returns ``True`` if the item was stored immediately and ``False`` if it was stored
        after waiting. If the wait exceeds *timeout*, ``TimeoutError`` is re-raised when
        *raising* is true; otherwise ``None`` is returned.
        """
        try:
            self.put_nowait(item)
            return True
        except D.QueueFull:
            pass
        try:
            await wait_for(self.put(item), timeout)
            return False
        except TimeoutError:
            if raising:
                raise
        return None

    async def smart_get(self, *, timeout=None, default=_NO_DEFAULT):
        """Return an item, waiting up to *timeout* only when the queue is empty.

        When *default* is supplied it is returned instead of raising if the immediate
        attempt hits ``QueueShutDown`` or if the wait times out.
        """
        f = default is _NO_DEFAULT
        try:
            return self.get_nowait()
        except D.QueueShutDown:
            if f:
                raise
            return default
        except D.QueueEmpty:
            try:
                return await wait_for(self.get(), timeout)
            except TimeoutError as e:
                if f:
                    # Drop the QueueEmpty context so callers see a plain timeout.
                    raise e from None
                return default

    async def extend(self, it, timeout=None):
        """Put every item of *it* (sync or async iterable) via ``smart_put`` within an overall *timeout*."""
        info(f'extending {fullname(self)} with iterable {it!r}')
        f = self.smart_put
        async with _timeout(timeout):
            async for i in A.iter_to_agen(it):
                await f(i)

    def push(self, item):
        """Put *item* without waiting, first removing one item if the queue is full.

        The removed item is reported through an audit event. Returns ``False`` if the
        queue has been shut down, ``True`` otherwise.
        """
        try:
            if self.full():
                audit(f'{fullname(self)}.push', id(self), item, self.get_nowait())
            self.put_nowait(item)
            return True
        except D.QueueShutDown:
            return False

    async def drain_persistent(self, max_items=None, timeout=None, _=ignore_qshutdown.combined(TimeoutError)):
        """Asynchronously yield items, waiting up to *timeout* for each one.

        At most ``abs(max_items)`` items are produced (unlimited when *max_items* is
        ``None`` or 0). ``task_done()`` is called after each yielded item. The private
        ``_`` context manager ends the iteration when the wait fails with one of the
        exceptions it suppresses.
        """
        m, c = abs(max_items or float('inf')), 0
        info(f'persistent draining of {fullname(self)} started')
        with _:
            while c < m:
                yield await wait_for(self.get(), timeout)  # noqa: ASYNC119
                self.task_done()
                c += 1

    def drain_until_empty(self, max_items=None):
        """Yield items without waiting until the queue is empty or ``abs(max_items)`` were produced."""
        max_items, c = abs(max_items or float('inf')), 0
        info(f'draining of {fullname(self)} started')
        with ignore_qempty:
            while c < max_items:
                yield self.get_nowait()
                c += 1

    def drain_retlist(self, max_items=None):
        """Return the items produced by ``drain_until_empty(max_items)`` as a list."""
        return list(self.drain_until_empty(max_items))

    # The synchronous protocol must return a plain generator and the asynchronous protocol an
    # async generator; the reverse pairing makes both iter(queue) and `async for` raise TypeError.
    __iter__, __aiter__ = drain_until_empty, drain_persistent

    def shutdown(self, immediate=False):
        """Record the shutdown in the private event, then shut the base queue down."""
        self._event.set()
        super().shutdown(immediate)

    def __repr__(self):
        return f'{fullname(self)}({self.maxsize})'

    @property
    def is_shutdown(self):
        """Whether ``shutdown()`` has been called (as recorded by the private event)."""
        return self._event.is_set()

    @is_shutdown.setter
    def is_shutdown(self, val, /):
        # A true value shuts the queue down; a false value re-initialises it with the same maxsize.
        self.shutdown() if val else self.__init__(self.maxsize)

    @property
    def can_put_now(self):
        """True when the queue is neither shut down nor full."""
        return not (self.is_shutdown or self.full())

    @property
    def can_get_now(self):
        """True when the queue is neither shut down nor empty."""
        return not (self.is_shutdown or self.empty())

    @property
    def fully_functional(self):
        """True when the queue is not shut down and is neither full nor empty."""
        return not (self.is_shutdown or self.full() or self.empty())

    @property
    def capacity(self):
        """``maxsize`` when positive, otherwise ``float('inf')``."""
        return m if (m := self.maxsize) > 0 else float('inf')

    @property
    def remaining_capacity(self):
        """Number of further items that fit (infinite for an unbounded queue)."""
        return self.capacity-self.qsize()

    @property
    def utilization_rate(self):
        """Fraction of the capacity in use; 0.0 for an unbounded queue.

        Dividing by ``capacity`` instead of ``maxsize`` avoids ``ZeroDivisionError`` for
        the default ``maxsize=0`` while giving the same result for bounded queues.
        """
        return self.qsize()/self.capacity

    def pushpop_nowait(self, item, raising=True):
        """Put *item* and take one item out, without waiting.

        With *raising* true, a shut-down, empty or full queue raises the matching queue
        exception. With *raising* false: a shut-down queue returns ``None``; an empty
        queue just stores *item*; a full queue removes an item first, then stores *item*
        and returns the removed one.
        """
        if self.is_shutdown:
            if raising:
                raise D.QueueShutDown
            return
        if self.empty():
            if raising:
                raise D.QueueEmpty(f'{fullname(self)}.pushpop_nowait on {item!r} expected non-empty queue with raising=True')
            return self.put_nowait(item)
        if self.full():
            if raising:
                raise D.QueueFull(f'{fullname(self)}.pushpop_nowait on {item!r} expected non-full queue with raising=True')
            r = self.get_nowait()
            self.put_nowait(item)
            return r
        self.put_nowait(item)
        return self.get_nowait()

    def poppush_nowait(self, item, raising=True):
        """Take one item out, then put *item*, without waiting; return the removed item.

        With *raising* true, a shut-down or empty queue raises the matching queue
        exception. With *raising* false, a shut-down queue returns ``None`` and an empty
        queue just stores *item*.
        """
        if self.is_shutdown:
            if raising:
                raise D.QueueShutDown
            return
        if self.empty():
            if raising:
                # The message names this method (it previously said pushpop_nowait).
                raise D.QueueEmpty(f'{fullname(self)}.poppush_nowait on {item!r} expected non-empty queue with raising=True')
            return self.put_nowait(item)
        r = self.get_nowait()
        self.put_nowait(item)
        return r

    async def pushpop(self, item):
        """Await ``put(item)`` and then return the result of awaiting ``get()``."""
        await self.put(item)
        return await self.get()

    async def poppush(self, item):
        """Await ``get()``, then await ``put(item)``; return the item that was taken out."""
        r = await self.get()
        await self.put(item)
        return r

    def clear(self):
        """Remove items with ``get_nowait()`` until an exception suppressed by ``ignore_qempty`` ends the loop."""
        with ignore_qempty:
            while True:
                self.get_nowait()

    @A.dualcontextmanager
    def transaction(self, _=A.IgnoreErrors(TimeoutError)):
        """Context manager that restores the queue contents if the body raises.

        A snapshot from ``peek_all()`` is taken on entry. On any exception the queue is
        cleared, the snapshot is put back item by item, and the exception is re-raised.
        Audit events mark the start and the end of the transaction.
        """
        s = f'{fullname(self)}.transaction/%s'
        i = id(self)
        audit(s % 'start', i)
        q = self.peek_all()
        try:
            yield self
        except BaseException:
            self.clear()
            f = self.put_nowait
            for _ in q:
                f(_)
            raise
        finally:
            audit(s % 'end', i)

    def empty(self):
        """Return True when no items are stored."""
        return self.qsize() == 0

    def __bool__(self):
        """Return True when at least one item is stored (the complement of ``empty()``).

        The comparison is strict: ``qsize() >= 0`` would be true for every queue.
        """
        return self.qsize() > 0

    def map(self, f, stop_when=None, *, lifo=False):
        """Return a new queue fed by a background task with ``await f(item)`` for items of this queue.

        Without *stop_when*, the current contents are drained without waiting; otherwise
        items are awaited with ``get()``. The result is a ``SmartLifoQueue`` when *lifo*
        is true, else a ``SmartQueue``, created with this queue's ``maxsize``.
        """
        audit(f'{fullname(self)}.map', id(self), fullname(f))
        if stop_when is None:
            stop_when, E = A.AsyncCallbacksFuture(), (D.QueueShutDown, D.QueueEmpty)
            async def get(g=self.drain_until_empty, /):  # noqa: RUF029
                try:
                    for i in g():
                        yield i
                finally:
                    stop_when.set_result(None)
        else:
            E = (D.QueueShutDown,)
            async def get(g=self.get, /):
                with ignore_qshutdown:
                    while True:
                        yield await g()  # noqa: ASYNC119
        async def feed(q, s, g, /, f=f):
            # NOTE(review): once the source generator finishes, anext() raises StopAsyncIteration,
            # which is not part of E; starmap() iterates with `async for` instead - confirm which
            # termination behaviour is intended.
            try:
                while True:
                    await q.put(await f(await anext(g)))
            except E:
                await A.safe_cancel(s)
        s = A.AsyncCallbacksFuture(loop=self.loop)
        q = (SmartLifoQueue if lifo else SmartQueue)(self.maxsize)
        s.add_noargs_async_callback(partial(A.safe_cancel, self.make(feed(q, s, get()))))
        if stop_when:
            stop_when.add_done_callback(s.cancel)
        return q

    def starmap(self, f, stop_when=None, *, lifo=False):
        """Like ``map`` but each item is unpacked into positional arguments: ``await f(*item)``."""
        audit(f'{fullname(self)}.starmap', id(self), fullname(f))
        if stop_when is None:
            stop_when, E = A.AsyncCallbacksFuture(), (D.QueueShutDown, D.QueueEmpty)
            async def get(g=self.drain_until_empty, /):  # noqa: RUF029
                try:
                    for i in g():
                        yield i
                finally:
                    stop_when.set_result(None)
        else:
            E = D.QueueShutDown,
            async def get(g=self.get, /):
                while True:
                    yield await g()
        async def feed(q, s, g, /, f=f):
            try:
                async for _ in g:
                    await q.put(await f(*_))
            except E:
                await A.safe_cancel(s)
        s = A.AsyncCallbacksFuture(loop=self.loop)
        q = (SmartLifoQueue if lifo else SmartQueue)(self.maxsize)
        s.add_noargs_async_callback(partial(A.safe_cancel, self.make(feed(q, s, get()))))
        if stop_when:
            stop_when.add_done_callback(s.cancel)
        return q

    def filter(self, pred=bool, *, lifo=False):
        """Start a background task that splits items by *pred* and return the new queue it feeds.

        Items for which *pred* is true are put back into this queue; all others go to the
        returned queue (the same split as ``filter_nowait``).
        NOTE(review): kept items are read again by the same loop - confirm this is intended.
        """
        audit(f'{fullname(self)}.filter', id(self), fullname(pred))
        q = (SmartLifoQueue if lifo else SmartQueue)(self.maxsize)
        async def feed(f=self.smart_put, g=q.smart_put, h=self.get, _=pred):
            with ignore_qshutdown:
                while True:
                    await (f if _(i := await h()) else g)(i)
        self.make(feed())
        return q

    def enumerate(self, *, lifo=False):
        """Return a new queue fed by a background task with ``(index, item)`` pairs, counting from 0."""
        audit(f'{fullname(self)}.enumerate', id(self))
        q = (SmartLifoQueue if lifo else SmartQueue)(self.maxsize)
        async def feed():
            i = 0
            with ignore_qempty:
                while True:
                    await q.smart_put((i, await self.get()))
                    i += 1
        self.make(feed())
        return q

    def filter_nowait(self, pred=bool, /):
        """Keep only items satisfying *pred*, without waiting.

        Returns ``(removed, count)``: the list of items taken out of the queue and the
        number of them that failed *pred*. Kept items that no longer fit when being put
        back (``QueueFull``) are appended to ``removed`` after ``count`` was computed.
        """
        f, g, a = (k := []).append, (r := []).append, self.get_nowait
        with ignore_qempty:
            while True:
                (f if pred(i := a()) else g)(i)
        h, j = self.put_nowait, len(r)
        for i in k:
            try:
                h(i)
            except D.QueueFull:
                g(i)
        return r, j

    def enumerate_nowait(self, start=0, *, step=1):
        """Yield ``(counter, item)`` pairs while draining without waiting; the counter starts at *start* and grows by *step*."""
        with ignore_qempty:
            while True:
                yield start, self.get_nowait()
                start += step

    P.patch_method_signatures((filter_nowait, 'pred=bool'), (transaction, ''), (drain_persistent, 'max_items=None, timeout=None'))
[docs]
class SmartQueue(PotentQueueBase):
    """First-in, first-out queue whose items live in a ``deque``."""

    def _init(self, maxsize):
        # A non-positive maxsize means no length limit on the deque.
        bound = maxsize if maxsize > 0 else None
        self.__queue = deque(maxlen=bound)

    def _get(self):
        return self.__queue.popleft()

    def _put(self, item):
        self.__queue.append(item)

    def peek(self):
        """Return the oldest item without removing it; raise ``QueueEmpty`` if there is none."""
        items = self.__queue
        if not items:
            raise D.QueueEmpty
        return items[0]

    def peek_all(self):
        """Return a list copy of the stored items, oldest first."""
        return [*self.__queue]

    def qsize(self):
        return len(self.__queue)

    def rotate(self, n=1, /):
        """Rotate the stored items by *n* positions (see ``deque.rotate``)."""
        self.__queue.rotate(n)

    def __bool__(self):
        return len(self.__queue) > 0

    def empty(self):
        return not self
[docs]
class SmartLifoQueue(PotentQueueBase):
    """Last-in, first-out queue whose items live in a ``list``."""

    def _init(self, maxsize):
        # maxsize is not needed by the storage itself.
        self.__queue = []

    def _get(self):
        return self.__queue.pop()

    def _put(self, item):
        self.__queue.append(item)

    def peek(self, i=-1, /):
        """Return the item at index *i* (default: the newest) without removing it.

        Negative indices count from the end. An index outside the stored range raises
        ``IndexError``; the message shows the index after negative-index adjustment.
        """
        size = self.qsize()
        if i < 0:
            i += size
        if not 0 <= i < size:
            raise IndexError(f'asyncutils.queues.SmartLifoQueue: failed to peek item at index {i}')
        return self.__queue[i]

    def peek_all(self):
        """Return a list copy of the stored items, oldest first."""
        return self.__queue[:]

    def qsize(self):
        return len(self.__queue)

    def __bool__(self):
        return len(self.__queue) > 0

    def empty(self):
        return not self

    # The two push-then-pop helpers are explicitly unsupported on this queue type.
    def pushpop(self, item):
        raise NotImplementedError

    def pushpop_nowait(self, item, raising=True):
        raise NotImplementedError
[docs]
class SmartPriorityQueue(PotentQueueBase):
    """Priority queue backed by a ``heapq`` heap that is populated by a background task."""

    def __init__(self, maxsize=0, *, init_items=()):
        super().__init__(maxsize)
        # The heap and its push/pop callables only exist once start() has run.
        self.make(self.start(maxsize, init_items))

    async def start(self, maxsize, init_items):
        """Collect the initial items into a heap, install the heap accessors, then add the remaining items.

        ``A.collect`` is given the item stream and *maxsize*; whatever it leaves in the
        stream is afterwards passed to ``extend``.
        """
        I = A.iter_to_agen(init_items)
        q = await A.collect(I, maxsize)
        import heapq as H
        H.heapify(q)
        self.__get = partial(H.heappop, q)
        self.__put = partial(H.heappush, q)
        # The collected items count as unfinished tasks.
        self._unfinished_tasks = len(q)
        self.__queue = q
        self._finished.clear()  # ty: ignore[unresolved-attribute]
        await self.extend(I)

    def _init(self, maxsize):
        # Intentionally empty: the storage is created by start().
        ...

    def _get(self):
        return self.__get()

    def _put(self, item):
        self.__put(item)

    def peek(self):
        """Return the smallest item without removing it."""
        return self.__queue[0]

    def peek_all(self):
        """Return a list copy of the heap in its internal order."""
        return self.__queue[:]

    def qsize(self):
        return len(self.__queue)

    def __bool__(self):
        return len(self.__queue) > 0

    def empty(self):
        return not self
[docs]
class UserPriorityQueue(SmartPriorityQueue):
    """Priority queue where the caller passes the priority separately from the item.

    Each stored heap entry is ``(priority, tiebreak, item)``; the tiebreak is a running
    counter so that entries with equal priority keep their insertion order.

    Every entry is an instance of the private ``_Entry`` tuple subclass. This lets
    ``put_nowait`` recognise an entry that is already wrapped and store it unchanged,
    which matters because the base ``put`` hands its argument on to ``self.put_nowait``
    (as ``asyncio.Queue`` does; ``D.Queue`` is assumed to behave the same way). Without
    the marker an awaited ``put(item, priority)`` was wrapped a second time with
    priority 0, so the requested priority had no effect, and ``get`` stripped two layers.
    """

    class _Entry(tuple):
        """Marker type for an already wrapped ``(priority, tiebreak, item)`` heap entry."""
        __slots__ = ()

    @classmethod
    def from_iter_of_tuples(cls, items, maxsize=0, _=SmartPriorityQueue):
        """Create a queue from ready-made entry tuples (the item is the last element of each).

        ``__init__`` of this class is bypassed so the tuples are stored as given rather
        than being wrapped again; they are only converted to the marker type.
        """
        entry = cls._Entry
        Q = object.__new__(cls)
        _.__init__(Q, maxsize, init_items=(entry(t) async for t in A.iter_to_agen(items)))
        Q.__tiebreak = count()
        return Q

    def __init__(self, maxsize=0, *, init_items=(), init_priority=0):
        self.__tiebreak = count()
        entry = self._Entry
        # Wrapped lazily: the tiebreak for each initial item is drawn when the item is consumed.
        super().__init__(maxsize, init_items=(entry((init_priority, self._tiebreak, j)) async for j in A.iter_to_agen(init_items)))

    @property
    def _tiebreak(self):
        # Each access consumes the next counter value.
        return next(self.__tiebreak)

    def _wrap(self, item, priority):
        """Return *item* as a heap entry, leaving an existing entry untouched."""
        if isinstance(item, self._Entry):
            return item
        return self._Entry((priority, self._tiebreak, item))

    def put_nowait(self, item, priority=0):
        """Store *item* with the given *priority* without waiting."""
        super().put_nowait(self._wrap(item, priority))

    def put(self, item, priority=0):
        """Return an awaitable that stores *item* with the given *priority*, waiting for room if needed."""
        return super().put(self._wrap(item, priority))

    def get_nowait(self):
        """Remove the smallest entry without waiting and return its item."""
        return super().get_nowait()[-1]

    async def get(self):
        """Wait for an entry and return its item.

        The base ``get`` normally finishes by calling ``self.get_nowait()``, which has
        already unwrapped the entry; a second unwrap is applied only if a raw entry
        comes back.
        """
        result = await super().get()
        return result[-1] if isinstance(result, self._Entry) else result
# Drop the module-level temporaries: `f` (object.__setattr__), `_` (the exception-type pair) and
# the facade class `Q`. The definitions above that need them captured them as default arguments.
del f, _, Q