1import asyncutils as A
2from asyncutils.constants import _NO_DEFAULT
3from asyncutils._internal import compat as D, patch as P
4from asyncutils._internal.helpers import LoopMixinBase, check, get_loop_and_set, fullname
5from asyncutils._internal.log import info
6from asyncutils._internal.submodules import queues_all as __all__
7from _collections import deque
8from _functools import partial
9from abc import ABCMeta, abstractmethod
10from asyncio import Event, timeout as _timeout, wait_for
11from itertools import count
12from sys import _getframe, audit
# Shared error-suppressing helpers for the queue exceptions. They are all
# derived from a single IgnoreErrors instance for QueueShutDown through its
# ``combined`` method.
ignore_qshutdown = A.IgnoreErrors(D.QueueShutDown)
_ = (D.QueueEmpty, D.QueueFull)
f = ignore_qshutdown.combined
ignore_qempty, ignore_qfull = map(f, _)  # combined(QueueEmpty), combined(QueueFull)
ignore_qerrs = f(*_)  # combined(QueueEmpty, QueueFull)
# ``f`` is deliberately rebound: from here on it is the raw attribute setter
# that class Q captures as a default argument to bypass its own __setattr__.
f = object.__setattr__
def _wakeup_next(W):
    """Resolve the first still-pending waiter in *W* with ``None``.

    Waiters are removed from the left of *W*; ones that are already done are
    discarded. The function stops after waking one waiter or when *W* is empty.
    """
    take = W.popleft
    while W:
        waiter = take()
        if waiter.done():
            continue  # already resolved or cancelled: drop it and keep looking
        waiter.set_result(None)
        return
class Q:
    """Read-only facade object handed out by ``password_queue``.

    Every slot holds one value passed positionally to the constructor (the
    maximum size followed by the queue operations). After construction only
    the last slot, ``cancel_extend``, may be reassigned; subclassing and the
    ``_get``/``_put``/``_init`` hooks are forbidden.
    """
    exc = A.ForbiddenOperation
    __slots__ = 'maxsize', 'empty', 'qsize', 'full', 'get', 'get_nowait', 'put', 'put_nowait', 'change_get_password', 'change_put_password', 'task_done', 'join', 'shutdown', 'cancel_extend' # noqa: RUF023

    def __repr__(self):
        return f'<password-protected queue at {id(self):#x}>'

    def __new__(cls, /, *a, _=f.__get__, x='pwdq.'):
        """Create the facade and fill its slots, in order, from *a*."""
        inst = super().__new__(cls)
        # Bind object.__setattr__ to the new instance so the restrictive
        # __setattr__ defined below is bypassed while populating the slots.
        setter = _(inst)
        pairs = zip(cls.__slots__, a)
        # The first pair is ('maxsize', value): stored as-is, no renaming.
        setter(*next(pairs))
        for slot, member in pairs:
            member.__qualname__ = x + slot
            setter(slot, member)
        return inst

    def __setattr__(self, name, value, /, _=__slots__[-1], f=f, s=frozenset(__slots__)):
        """Allow assignment to ``cancel_extend`` only; reject everything else."""
        if name not in s:
            raise AttributeError(f'password-protected queue has no attribute {name!r}')
        if name != _:
            raise AttributeError(f'attribute/method {name!r} on password-protected queue is read-only')
        f(self, name, value)

    # The exception instances below are built once, when the class body runs,
    # and the same instance is re-raised on every forbidden call.
    def __init_subclass__(cls, /, _=exc('subclass'), **k):
        raise _

    def _get(self, _=exc('call _get() on')):
        raise _

    def _put(self, _=exc('call _put() on')):
        raise _

    def _init(self, maxsize, _=exc('call _init() on')):
        raise _

    # Hide the default-argument plumbing from the reported signatures.
    P.patch_method_signatures((__setattr__, 'name, value, /'), (_get, ''), (_put, ''), (_init, 'maxsize'))
    P.patch_classmethod_signatures((__init_subclass__, '**k'), (__new__, 'maxsize, empty, qsize, full, get, get_nowait, put, put_nowait, change_get_password, change_put_password, task_done, join, shutdown, cancel_extend, /'))
[docs]
def password_queue(password_put=_NO_DEFAULT, password_get=_NO_DEFAULT, maxsize=0, *, protect_get=False, protect_put=True, can_change_get=False, can_change_put=False, priority=False, lifo=False, init_items=(), strict=True, get_from=None, put_from=None, gettyp=object, puttyp=object, _=Q): # noqa: C901,PLR0913,PLR0915
    """Build a queue whose get and/or put operations require a password.

    All state lives in closures; the returned object is an instance of the
    facade class ``_`` whose slots are the closures defined here.

    Parameters:
        password_put, password_get: the passwords. When one is omitted and the
            matching ``protect_*`` flag is set, it is looked up by name in the
            caller's locals, then the caller's globals.
        maxsize: the queue reports full when ``0 < maxsize <= len(items)``.
        protect_get, protect_put: whether get/put calls must supply a password.
        can_change_get, can_change_put: whether the matching
            ``change_*_password`` operation may succeed.
        priority, lifo: select the storage: heap functions when ``priority``
            (taken from the compat module when ``lifo`` is also set, otherwise
            from ``heapq``), a list popped from the end when only ``lifo``,
            else a deque popped from the left.
        init_items: when truthy, a task is created that puts every item; its
            ``cancel`` method is stored as ``cancel_extend`` on the result.
        strict: when true only the identical password object is accepted;
            otherwise ``check(candidate, password)`` is consulted as well.
        get_from, put_from: variable names for the caller-frame lookup; the
            context defaults are used when ``None``. Passing one together
            with the corresponding explicit password is an error.
        gettyp, puttyp: types the passwords must be instances of.

    Raises:
        ValueError: both a password and its ``*_from`` name were given.
        A.GetPasswordRetrievalError, A.PutPasswordRetrievalError: the password
            could not be found in the caller's frame.
        A.WrongPasswordType: a password is not an instance of its type.
    """
    audit('asyncutils.queues.password_queue', get_from if protect_get else None, put_from if protect_put else None)
    ctx = A.getcontext()
    try:
        caller = _getframe(1)
    except ValueError:
        caller = None

    if protect_get:
        if password_get is _NO_DEFAULT:
            if caller is None:
                raise A.GetPasswordRetrievalError(get_from) # ty: ignore[invalid-argument-type]
            get_from = (ctx.PASSWORD_QUEUE_DEFAULT_GET_FROM if get_from is None else get_from).strip()
            # Locals take precedence; globals are consulted only on a miss.
            password_get = caller.f_locals.get(get_from)
            if password_get is None:
                password_get = caller.f_globals.get(get_from)
                if password_get is None:
                    raise A.GetPasswordRetrievalError(get_from) # ty: ignore[invalid-argument-type]
        elif get_from is not None:
            raise ValueError('asyncutils.queues.password_queue: only pass one of get_from or password_get')
        if not isinstance(password_get, gettyp):
            raise A.WrongPasswordType(None, password_get, type(password_get), gettyp)

    if protect_put:
        if password_put is _NO_DEFAULT:
            if caller is None:
                raise A.PutPasswordRetrievalError(put_from) # ty: ignore[invalid-argument-type]
            put_from = (ctx.PASSWORD_QUEUE_DEFAULT_PUT_FROM if put_from is None else put_from).strip()
            password_put = caller.f_locals.get(put_from)
            if password_put is None:
                password_put = caller.f_globals.get(put_from)
                if password_put is None:
                    raise A.PutPasswordRetrievalError(put_from) # ty: ignore[invalid-argument-type]
        elif put_from is not None:
            raise ValueError('asyncutils.queues.password_queue: only pass one of put_from or password_put')
        if not isinstance(password_put, puttyp):
            raise A.WrongPasswordType(None, password_put, type(password_put), puttyp)

    def verify_get(p):
        """Raise unless *p* is acceptable as the get password."""
        if not isinstance(p, gettyp):
            raise A.WrongPasswordType(q, p, type(p), gettyp) # ty: ignore[invalid-argument-type]
        if p is not password_get and (strict or not check(p, password_get)):
            raise A.WrongPassword(q, p) # ty: ignore[invalid-argument-type]

    def verify_put(p):
        """Raise unless *p* is acceptable as the put password."""
        if not isinstance(p, puttyp):
            raise A.WrongPasswordType(q, p, type(p), puttyp) # ty: ignore[invalid-argument-type]
        if p is not password_put and (strict or not check(p, password_put)):
            raise A.WrongPassword(q, p) # ty: ignore[invalid-argument-type]

    def require_get(p):
        """Validate the positional password tuple of a get call."""
        if not protect_get:
            return
        if not p:
            raise A.GetPasswordMissing
        p, = p
        verify_get(p)

    def require_put(p):
        """Validate the positional password tuple of a put call."""
        if not protect_put:
            return
        if not p:
            raise A.PutPasswordMissing
        p, = p
        verify_put(p)

    finished = A.done_evt()  # cleared on every put, set when unfinished work hits zero
    getters = deque()        # futures of coroutines blocked in get()
    putters = deque()        # futures of coroutines blocked in put()
    unfinished = 0           # items put but not yet marked with task_done()
    is_shut = False
    loop = get_loop_and_set()
    new_future = loop.create_future
    bypass = object()        # private token: internal *_nowait calls skip the password check

    if priority:
        heap_mod = D if lifo else __import__('heapq')
        items = []
        pop_item, push_item = partial(heap_mod.heappop, items), partial(heap_mod.heappush, items)
    elif lifo:
        items = []
        pop_item, push_item = items.pop, items.append
    else:
        items = deque()
        pop_item, push_item = items.popleft, items.append

    async def get(*p):
        """Check the password, wait until an item is available and return it."""
        require_get(p)
        enqueue = getters.append
        while not items:
            if is_shut:
                raise D.QueueShutDown
            F = new_future()
            try:
                enqueue(F)
                await F
            except:
                F.cancel()
                with A.ignore_valerrs:
                    getters.remove(F)
                # We were woken but are not going to consume: pass the wake-up on.
                if items and not F.cancelled():
                    _wakeup_next(getters)
                raise
        return get_nowait(_=bypass)

    def get_nowait(*p, _=None):
        """Return an item immediately or raise QueueEmpty / QueueShutDown."""
        if not items:
            raise D.QueueShutDown if is_shut else D.QueueEmpty
        if _ is not bypass:
            require_get(p)
        i = pop_item()
        _wakeup_next(putters)
        return i

    async def put(i, /, *p):
        """Check the password, wait for free space and store *i*."""
        require_put(p)
        enqueue = putters.append
        while full():
            if is_shut:
                raise D.QueueShutDown
            F = new_future()
            enqueue(F)
            try:
                await F
            except:
                F.cancel()
                with A.ignore_valerrs:
                    putters.remove(F)
                if not (full() or F.cancelled()):
                    _wakeup_next(putters)
                raise
        return put_nowait(i, _=bypass)

    def put_nowait(i, /, *P, _=None):
        """Store *i* immediately or raise QueueShutDown / QueueFull."""
        nonlocal unfinished
        if is_shut:
            raise D.QueueShutDown
        if full():
            raise D.QueueFull
        if _ is not bypass:
            require_put(P)
        push_item(i)
        unfinished += 1
        finished.clear()
        _wakeup_next(getters)

    def change_get_password(opw, npw):
        """Replace the get password; return whether the change happened."""
        nonlocal password_get
        if (is_shut and not items) or not can_change_get:
            return False
        if not isinstance(npw, gettyp):
            return False
        try:
            verify_get(opw)
        except A.CRITICAL:
            raise A.Critical
        except:
            return False
        password_get = npw
        return True

    def change_put_password(opw, npw):
        """Replace the put password; return whether the change happened."""
        nonlocal password_put
        if is_shut or not can_change_put:
            return False
        if not isinstance(npw, puttyp):
            return False
        try:
            verify_put(opw)
        except A.CRITICAL:
            raise A.Critical
        except:
            return False
        password_put = npw
        return True

    def task_done():
        """Mark one previously put item as processed."""
        nonlocal unfinished
        if unfinished == 0:
            raise ValueError('task_done() called too many times')
        unfinished -= 1
        if unfinished == 0:
            finished.set()

    def shutdown(immediate=False):
        """Shut the queue down; with *immediate*, also discard stored items."""
        nonlocal is_shut, unfinished
        is_shut = True
        if immediate:
            unfinished -= len(items)
            if unfinished <= 0:
                unfinished = 0
                finished.set()
            items.clear()
        # Wake every blocked getter and putter so they can observe the shutdown.
        for waiters in (getters, putters):
            take = waiters.popleft
            while waiters:
                F = take()
                if not F.done():
                    F.set_result(None)

    full = lambda: 0 < maxsize <= len(items) # noqa: E731
    q = _(maxsize, lambda: not items, lambda: len(items), full, get, get_nowait, put, put_nowait, change_get_password, change_put_password, task_done, A.discard_retval(finished.wait), shutdown, lambda msg=None: False) # noqa: ARG005
    if init_items:
        async def extend(f=D.partial(put, D.Placeholder, password_put)):
            async for i in A.iter_to_agen(init_items):
                await f(i)
        q.cancel_extend = loop.create_task(extend()).cancel
    return q
[docs]
class PotentQueueBase(D.Queue, LoopMixinBase, metaclass=ABCMeta):
    """Abstract queue base adding convenience operations on top of ``D.Queue``.

    Concrete subclasses supply the storage through ``_init``/``_get``/``_put``
    plus ``peek_all`` and ``qsize``. An extra ``Event`` records whether
    ``shutdown`` has been called.
    """

    @abstractmethod
    def _init(self, maxsize):
        """Create the underlying storage."""
        raise NotImplementedError

    @abstractmethod
    def _get(self):
        """Remove and return the next item from the storage."""
        raise NotImplementedError

    @abstractmethod
    def _put(self, item):
        """Add *item* to the storage."""
        raise NotImplementedError

    @abstractmethod
    def peek_all(self):
        """Return a snapshot of the stored items without removing them."""
        raise NotImplementedError

    @abstractmethod
    def qsize(self):
        """Return the number of stored items."""
        raise NotImplementedError

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._event = Event()  # set once shutdown() has been called

    def reset(self):
        """Re-run the base queue initialiser and clear the shutdown flag."""
        super().__init__(self.maxsize)
        self._event.clear()

    async def smart_put(self, item, *, timeout=None, raising=True):
        """Put *item*, without waiting when possible.

        Returns ``True`` if the item was stored immediately, ``False`` if the
        call had to wait, and ``None`` if the wait timed out while *raising*
        is false. With *raising* true the ``TimeoutError`` propagates.
        """
        try:
            self.put_nowait(item)
            return True
        except D.QueueFull:
            ...
        try:
            await wait_for(self.put(item), timeout)
            return False
        except TimeoutError:
            if raising:
                raise

    async def smart_get(self, *, timeout=None, default=_NO_DEFAULT):
        """Get an item, without waiting when possible.

        When *default* is supplied it is returned instead of raising on
        shutdown or on timeout.
        """
        no_default = default is _NO_DEFAULT
        try:
            return self.get_nowait()
        except D.QueueShutDown:
            if no_default:
                raise
            return default
        except D.QueueEmpty:
            try:
                return await wait_for(self.get(), timeout)
            except TimeoutError as e:
                if no_default:
                    raise e from None
                return default

    async def extend(self, it, timeout=None):
        """Put every item of *it* via ``smart_put`` under one overall timeout."""
        info(f'extending {fullname(self)} with iterable {it!r}')
        put = self.smart_put
        async with _timeout(timeout):
            async for i in A.iter_to_agen(it):
                await put(i)

    def push(self, item):
        """Put *item* immediately, evicting one item first if the queue is full.

        The evicted item is reported through an audit event. Returns ``True``
        on success and ``False`` if the queue has been shut down.
        """
        try:
            if self.full():
                audit(f'{fullname(self)}.push', id(self), item, self.get_nowait())
            self.put_nowait(item)
            return True
        except D.QueueShutDown:
            return False

    async def drain_persistent(self, max_items=None, timeout=None, _=ignore_qshutdown.combined(TimeoutError)):
        """Asynchronously yield up to *max_items* items, waiting for each.

        ``task_done()`` is called for an item when the generator is resumed
        after yielding it. Iteration ends quietly on the errors suppressed by
        ``_`` (presumably QueueShutDown and TimeoutError - confirm against
        IgnoreErrors.combined).
        """
        limit, seen = abs(max_items or float('inf')), 0
        info(f'persistent draining of {fullname(self)} started')
        with _:
            while seen < limit:
                yield await wait_for(self.get(), timeout) # noqa: ASYNC119
                self.task_done()
                seen += 1

    def drain_until_empty(self, max_items=None):
        """Yield up to *max_items* items obtained with ``get_nowait``.

        Iteration ends quietly on the errors suppressed by ``ignore_qempty``.
        """
        max_items, seen = abs(max_items or float('inf')), 0
        info(f'draining of {fullname(self)} started')
        with ignore_qempty:
            while seen < max_items:
                yield self.get_nowait()
                seen += 1

    def drain_retlist(self, max_items=None):
        """Return the items produced by ``drain_until_empty`` as a list."""
        return list(self.drain_until_empty(max_items))

    # FIX: these two were swapped. __iter__ must return a synchronous iterator
    # (the plain generator) and __aiter__ an asynchronous one (the async
    # generator); the previous binding made both ``for`` and ``async for``
    # fail with TypeError.
    __iter__, __aiter__ = drain_until_empty, drain_persistent

    def shutdown(self, immediate=False):
        """Record the shutdown in the event, then shut the base queue down."""
        self._event.set()
        super().shutdown(immediate)

    def __repr__(self):
        return f'{fullname(self)}({self.maxsize})'

    @property
    def is_shutdown(self):
        """Whether ``shutdown`` has been called since the last (re)initialisation."""
        return self._event.is_set()

    @is_shutdown.setter
    def is_shutdown(self, val, /):
        # Truthy: shut down. Falsy: re-run __init__ with the current maxsize.
        if val:
            self.shutdown()
        else:
            self.__init__(self.maxsize)

    @property
    def can_put_now(self):
        return not (self.is_shutdown or self.full())

    @property
    def can_get_now(self):
        return not (self.is_shutdown or self.empty())

    @property
    def fully_functional(self):
        return not (self.is_shutdown or self.full() or self.empty())

    @property
    def capacity(self):
        """``maxsize`` when positive, otherwise infinity."""
        return m if (m := self.maxsize) > 0 else float('inf')

    @property
    def remaining_capacity(self):
        return self.capacity-self.qsize()

    @property
    def utilization_rate(self):
        """Fraction of the capacity in use (``0.0`` for an unbounded queue)."""
        # FIX: dividing by ``maxsize`` raised ZeroDivisionError for the default
        # unbounded queue (maxsize == 0); ``capacity`` is infinite in that case.
        return self.qsize()/self.capacity

    def pushpop_nowait(self, item, raising=True):
        """Put *item* and get an item, both without waiting.

        On an empty queue this raises QueueEmpty, or with *raising* false just
        puts the item. On a full queue this raises QueueFull, or with
        *raising* false gets first and then puts.
        """
        if self.is_shutdown:
            raise D.QueueShutDown
        if self.empty():
            if raising:
                raise D.QueueEmpty(f'{fullname(self)}.pushpop_nowait on {item!r} expected non-empty queue with raising=True')
            return self.put_nowait(item)
        if self.full():
            if raising:
                raise D.QueueFull(f'{fullname(self)}.pushpop_nowait on {item!r} expected non-full queue with raising=True')
            r = self.get_nowait()
            self.put_nowait(item)
            return r
        self.put_nowait(item)
        return self.get_nowait()

    def poppush_nowait(self, item, raising=True):
        """Get an item, then put *item*, both without waiting.

        On an empty queue this raises QueueEmpty, or with *raising* false just
        puts the item.
        """
        if self.is_shutdown:
            raise D.QueueShutDown
        if self.empty():
            if raising:
                # FIX: the message used to name pushpop_nowait (copy/paste slip).
                raise D.QueueEmpty(f'{fullname(self)}.poppush_nowait on {item!r} expected non-empty queue with raising=True')
            return self.put_nowait(item)
        r = self.get_nowait()
        self.put_nowait(item)
        return r

    async def pushpop(self, item):
        """Await ``put(item)``, then await and return ``get()``."""
        await self.put(item)
        return await self.get()

    async def poppush(self, item):
        """Await ``get()``, then await ``put(item)``; return the item got."""
        r = await self.get()
        await self.put(item)
        return r

    def clear(self):
        """Discard items with ``get_nowait`` until ``ignore_qempty`` stops the loop."""
        with ignore_qempty:
            while True:
                self.get_nowait()

    @A.dualcontextmanager
    def transaction(self, _=A.IgnoreErrors(TimeoutError)):
        """Context manager restoring the queue contents if the body raises.

        A snapshot is taken with ``peek_all`` on entry. On any exception the
        queue is cleared, refilled from the snapshot and the exception is
        re-raised. Audit events mark the start and the end.
        """
        event = f'{fullname(self)}.transaction/%s'
        ident = id(self)
        audit(event%'start', ident)
        snapshot = self.peek_all()
        try:
            yield self
        except:
            self.clear()
            put = self.put_nowait
            for _ in snapshot:
                put(_)
            raise
        finally:
            audit(event%'end', ident)

    def empty(self):
        return self.qsize() == 0

    def __bool__(self):
        # FIX: ``>= 0`` was always true. A queue is now truthy only when it
        # holds items, matching the __bool__ of the concrete subclasses.
        return self.qsize() > 0

    def map(self, f, stop_when=None, *, lifo=False):
        """Return a new queue fed with ``await f(item)`` for items of this queue.

        Without *stop_when* the items currently obtainable through
        ``drain_until_empty`` are processed; with it, items are awaited with
        ``get`` and the feeding task is tied to that future.
        """
        audit(f'{fullname(self)}.map', id(self), fullname(f))
        if stop_when is None:
            stop_when, E = A.AsyncCallbacksFuture(), (D.QueueShutDown, D.QueueEmpty)
            async def get(g=self.drain_until_empty, /): # noqa: RUF029
                try:
                    for i in g():
                        yield i
                finally:
                    stop_when.set_result(None)
        else:
            E = (D.QueueShutDown,)
            async def get(g=self.get, /):
                with ignore_qshutdown:
                    while True:
                        yield await g() # noqa: ASYNC119
        async def feed(q, s, g, /, f=f):
            try:
                # FIX: the former ``await anext(g)`` loop let StopAsyncIteration
                # escape as an unhandled task exception once the source was
                # exhausted; ``async for`` ends cleanly, as starmap does.
                async for i in g:
                    await q.put(await f(i))
            except E:
                await A.safe_cancel(s)
        s = A.AsyncCallbacksFuture(loop=self.loop)
        q = (SmartLifoQueue if lifo else SmartQueue)(self.maxsize)
        s.add_noargs_async_callback(partial(A.safe_cancel, self.make(feed(q, s, get()))))
        if stop_when:
            stop_when.add_done_callback(s.cancel)
        return q

    def starmap(self, f, stop_when=None, *, lifo=False):
        """Like ``map`` but each item is unpacked: ``await f(*item)``."""
        audit(f'{fullname(self)}.starmap', id(self), fullname(f))
        if stop_when is None:
            stop_when, E = A.AsyncCallbacksFuture(), (D.QueueShutDown, D.QueueEmpty)
            async def get(g=self.drain_until_empty, /): # noqa: RUF029
                try:
                    for i in g():
                        yield i
                finally:
                    stop_when.set_result(None)
        else:
            E = D.QueueShutDown,
            async def get(g=self.get, /):
                while True:
                    yield await g()
        async def feed(q, s, g, /, f=f):
            try:
                async for _ in g:
                    await q.put(await f(*_))
            except E:
                await A.safe_cancel(s)
        s = A.AsyncCallbacksFuture(loop=self.loop)
        q = (SmartLifoQueue if lifo else SmartQueue)(self.maxsize)
        s.add_noargs_async_callback(partial(A.safe_cancel, self.make(feed(q, s, get()))))
        if stop_when:
            stop_when.add_done_callback(s.cancel)
        return q

    def filter(self, pred=bool, *, lifo=False):
        """Start a task routing items by *pred* and return the new queue.

        Items for which *pred* is true are put back into this queue; the
        others are put into the returned queue.
        """
        audit(f'{fullname(self)}.filter', id(self), fullname(pred))
        q = (SmartLifoQueue if lifo else SmartQueue)(self.maxsize)
        async def feed(f=self.smart_put, g=q.smart_put, h=self.get, _=pred):
            with ignore_qshutdown:
                while True:
                    await (f if _(i := await h()) else g)(i)
        self.make(feed())
        return q

    def enumerate(self, *, lifo=False):
        """Start a task feeding ``(index, item)`` pairs into a new queue."""
        audit(f'{fullname(self)}.enumerate', id(self))
        q = (SmartLifoQueue if lifo else SmartQueue)(self.maxsize)
        async def feed():
            i = 0
            with ignore_qempty:
                while True:
                    await q.smart_put((i, await self.get()))
                    i += 1
        self.make(feed())
        return q

    def filter_nowait(self, pred=bool, /):
        """Drain the queue, re-queue items satisfying *pred*, return the rest.

        Returns ``(rejected, n)`` where ``n`` is the number of items rejected
        by *pred*; items that could not be re-queued because of QueueFull are
        appended to ``rejected`` after ``n`` was taken.
        """
        keep, reject, take = (kept := []).append, (rejected := []).append, self.get_nowait
        with ignore_qempty:
            while True:
                (keep if pred(i := take()) else reject)(i)
        put, n_rejected = self.put_nowait, len(rejected)
        for i in kept:
            try:
                put(i)
            except D.QueueFull:
                reject(i)
        return rejected, n_rejected

    def enumerate_nowait(self, start=0, *, step=1):
        """Yield ``(counter, item)`` pairs using ``get_nowait`` until it stops."""
        with ignore_qempty:
            while True:
                yield start, self.get_nowait()
                start += step

    # Hide the default-argument plumbing from the reported signatures.
    P.patch_method_signatures((filter_nowait, 'pred=bool'), (transaction, ''), (drain_persistent, 'max_items=None, timeout=None'))
[docs]
class SmartQueue(PotentQueueBase):
    """First-in, first-out queue stored in a ``deque``."""

    def _init(self, maxsize):
        # A non-positive maxsize means no length limit on the deque.
        bound = maxsize if maxsize > 0 else None
        self.__queue = deque(maxlen=bound)

    def _get(self):
        return self.__queue.popleft()

    def _put(self, item):
        self.__queue.append(item)

    def peek(self):
        """Return the item at the front without removing it.

        Raises QueueEmpty when there is none.
        """
        stored = self.__queue
        if not stored:
            raise D.QueueEmpty
        return stored[0]

    def peek_all(self):
        """Return a list copy of the stored items."""
        return list(self.__queue)

    def qsize(self):
        return len(self.__queue)

    def rotate(self, n=1, /):
        """Rotate the underlying deque by *n* steps."""
        self.__queue.rotate(n)

    def __bool__(self):
        return bool(self.__queue)

    def empty(self):
        return not self
[docs]
class SmartLifoQueue(PotentQueueBase):
    """Last-in, first-out queue stored in a list.

    ``pushpop`` and ``pushpop_nowait`` are disabled and raise
    ``NotImplementedError``.
    """

    def _init(self, maxsize):
        self.__queue = []

    def _get(self):
        return self.__queue.pop()

    def _put(self, item):
        self.__queue.append(item)

    def peek(self, i=-1, /):
        """Return the item at list index *i* without removing it.

        The default ``-1`` is the item ``_get`` would return next. Negative
        indices count from the end. Raises ``IndexError`` when *i* is out of
        range.
        """
        size = self.qsize()
        pos = i + size if i < 0 else i
        if 0 <= pos < size:
            return self.__queue[pos]
        # FIX: report the index the caller passed, not the value left over
        # after negative-index normalisation.
        raise IndexError(f'asyncutils.queues.SmartLifoQueue: failed to peek item at index {i}')

    def peek_all(self):
        """Return a shallow copy of the stored items."""
        return self.__queue.copy()

    def qsize(self):
        return len(self.__queue)

    def __bool__(self):
        return bool(self.__queue)

    def empty(self):
        return not self

    def pushpop(self, item):
        raise NotImplementedError

    def pushpop_nowait(self, item, raising=True):
        raise NotImplementedError
[docs]
class SmartPriorityQueue(PotentQueueBase):
    """Priority queue stored in a ``heapq`` heap.

    The heap is created by the ``start`` coroutine, which ``__init__``
    schedules through ``self.make``; ``_init`` itself does nothing.
    """

    def __init__(self, maxsize=0, *, init_items=()):
        super().__init__(maxsize)
        self.make(self.start(maxsize, init_items))

    async def start(self, maxsize, init_items):
        """Build the heap from *init_items* and install the heap accessors."""
        source = A.iter_to_agen(init_items)
        heap = await A.collect(source, maxsize)
        import heapq as H
        H.heapify(heap)
        self.__get = partial(H.heappop, heap)
        self.__put = partial(H.heappush, heap)
        self._unfinished_tasks = len(heap)
        self.__queue = heap
        self._finished.clear() # ty: ignore[unresolved-attribute]
        # Whatever ``collect`` left in the source is added through extend().
        await self.extend(source)

    def _init(self, maxsize):
        ...

    def _get(self):
        return self.__get()

    def _put(self, item):
        self.__put(item)

    def peek(self):
        """Return the heap root without removing it."""
        return self.__queue[0]

    def peek_all(self):
        """Return a shallow copy of the heap list."""
        return self.__queue.copy()

    def qsize(self):
        return len(self.__queue)

    def __bool__(self):
        return bool(self.__queue)

    def empty(self):
        return not self
[docs]
class UserPriorityQueue(SmartPriorityQueue):
    """Priority queue whose callers pass the priority alongside each item.

    Entries are stored as ``(priority, tiebreak, item)`` tuples, where the
    tiebreak comes from a per-queue counter; the get methods return only the
    item.
    """

    @classmethod
    def from_iter_of_tuples(cls, items, maxsize=0, _=SmartPriorityQueue):
        """Create a queue whose initial entries are *items*, passed on unchanged.

        The parent initialiser ``_`` is called directly, so the wrapping done
        by this class's ``__init__`` is skipped.
        """
        Q = object.__new__(cls)
        _.__init__(Q, maxsize, init_items=items)
        Q.__tiebreak = count()
        return Q

    def __init__(self, maxsize=0, *, init_items=(), init_priority=0):
        self.__tiebreak = count()
        # Lazily wrap every initial item with the shared initial priority.
        entries = ((init_priority, self._tiebreak, j) async for j in A.iter_to_agen(init_items))
        super().__init__(maxsize, init_items=entries)

    @property
    def _tiebreak(self):
        """Next value of the tiebreak counter; every access advances it."""
        return next(self.__tiebreak)

    def put_nowait(self, item, priority=0):
        entry = (priority, self._tiebreak, item)
        super().put_nowait(entry)

    def put(self, item, priority=0):
        entry = (priority, self._tiebreak, item)
        return super().put(entry)

    def get_nowait(self):
        entry = super().get_nowait()
        return entry[-1]

    async def get(self):
        entry = await super().get()
        return entry[-1]
# Remove the module-level scratch names. By this point they have been captured
# where they are needed as default arguments (``f`` and ``_`` in class Q's
# methods, ``Q`` as the ``_`` default of password_queue).
del f, _, Q