Source code for asyncutils.queues

  1# ty: ignore[invalid-argument-type]
  2import asyncutils as A
  3from asyncutils.constants import _NO_DEFAULT
  4from asyncutils._internal import compat as D, patch as P
  5from asyncutils._internal.helpers import LoopMixinBase, check, get_loop_and_set, fullname
  6from asyncutils._internal.log import info
  7from asyncutils._internal.submodules import queues_all as __all__
  8from _collections import deque
  9from _functools import partial
 10from abc import ABCMeta, abstractmethod
 11from asyncio import Event, timeout as _timeout, wait_for
 12from itertools import count
 13from sys import _getframe, audit
 14ignore_qempty, ignore_qfull = map((f := (ignore_qshutdown := A.IgnoreErrors(D.QueueShutDown)).combined), _ := (D.QueueEmpty, D.QueueFull))
 15ignore_qerrs, f = f(*_), object.__setattr__
 16def _wakeup_next(W):
 17    P = W.popleft
 18    while W:
 19        if not (w := P()).done(): w.set_result(None); break
 20class Q:
 21    exc = A.ForbiddenOperation; __slots__ = 'maxsize', 'empty', 'qsize', 'full', 'get', 'get_nowait', 'put', 'put_nowait', 'change_get_password', 'change_put_password', 'task_done', 'join', 'shutdown', 'cancel_extend' # noqa: RUF023
 22    def __repr__(self): return f'<password-protected queue at {id(self):#x}>'
 23    def __new__(cls, /, *A, _=f):
 24        s = super().__new__(cls)
 25        for a in zip(cls.__slots__, A): _(s, *a)
 26        return s
 27    def __setattr__(self, name, value, /, _='cancel_extend', f=f, s=frozenset(__slots__)):
 28        if name not in s: raise AttributeError(f'password-protected queue has no attribute {name!r}')
 29        if name != _: raise AttributeError(f'attribute/method {name!r} on password-protected queue is read-only')
 30        f(self, name, value)
 31    def __init_subclass__(cls, /, _=exc('subclass'), **k): raise _
 32    def _get(self, _=exc('call _get() on')): raise _
 33    def _put(self, _=exc('call _put() on')): raise _
 34    def _init(self, maxsize, _=exc('call _init() on')): raise _
 35    P.patch_method_signatures((__setattr__, 'name, value, /'), (_get, ''), (_put, ''), (_init, 'maxsize')); P.patch_classmethod_signatures((__init_subclass__, '**k'), (__new__, 'maxsize, empty, qsize, full, get, get_nowait, put, put_nowait, change_get_password, change_put_password, task_done, join, shutdown, cancel_extend, /'))
[docs] 36def password_queue(password_put=_NO_DEFAULT, password_get=_NO_DEFAULT, maxsize=0, *, protect_get=False, protect_put=True, can_change_get=False, can_change_put=False, priority=False, lifo=False, init_items=(), strict=True, get_from=None, put_from=None, gettyp=object, puttyp=object, _=Q): # noqa: C901,PLR0913,PLR0915 37 audit('asyncutils.queues.password_queue', get_from if protect_get else None, put_from if protect_put else None); C = A.getcontext() 38 try: F = _getframe(1) 39 except ValueError: F = None 40 if protect_get: 41 if password_get is _NO_DEFAULT: 42 if F is None or (password_get := F.f_locals.get(get_from := (C.PASSWORD_QUEUE_DEFAULT_GET_FROM if get_from is None else get_from).strip())) is None is (password_get := F.f_globals.get(get_from)): raise A.GetPasswordRetrievalError(get_from) 43 elif get_from is not None: raise ValueError('asyncutils.queues.password_queue: only pass one of get_from or password_get') 44 if not isinstance(password_get, gettyp): raise A.WrongPasswordType(None, password_get, type(password_get), gettyp) 45 if protect_put: 46 if password_put is _NO_DEFAULT: 47 if F is None or (password_put := F.f_locals.get(put_from := (C.PASSWORD_QUEUE_DEFAULT_PUT_FROM if put_from is None else put_from).strip())) is None is (password_put := F.f_globals.get(put_from)): raise A.PutPasswordRetrievalError(put_from) 48 elif put_from is not None: raise ValueError('asyncutils.queues.password_queue: only pass one of put_from or password_put') 49 if not isinstance(password_put, puttyp): raise A.WrongPasswordType(None, password_put, type(password_put), puttyp) 50 def s(p): 51 if not isinstance(p, gettyp): raise A.WrongPasswordType(q, p, type(p), gettyp) 52 if p is not password_get and (strict or not check(p, password_get)): raise A.WrongPassword(q, p) 53 def t(p): 54 if not isinstance(p, puttyp): raise A.WrongPasswordType(q, p, type(p), puttyp) 55 if p is not password_put and (strict or not check(p, password_put)): raise A.WrongPassword(q, p) 56 def u(p): 57 if not protect_get: return 58 if not p: raise A.GetPasswordMissing 59 p, = p; s(p) 60 def v(p): 61 if not protect_put: return 62 if not p: raise A.PutPasswordMissing 63 p, = p; t(p) 64 E, G, P, U, S, m, b = A.done_evt(), deque(), deque(), 0, False, (L := get_loop_and_set()).create_future, object() 65 if priority: g, p = partial((M := D if lifo else __import__('heapq')).heappop, l := []), partial(M.heappush, l) 66 else: g, p = (l := []).pop if lifo else (l := deque()).popleft, l.append 67 async def get(*p): 68 u(p); _ = G.append 69 while not l: 70 if S: raise D.QueueShutDown 71 F = m() 72 try: _(F); await F 73 except: 74 F.cancel() 75 with A.ignore_valerrs: G.remove(F) 76 if l and not F.cancelled(): _wakeup_next(G) 77 raise 78 return get_nowait(_=b) 79 def get_nowait(*p, _=None): 80 if not l: raise D.QueueShutDown if S else D.QueueEmpty 81 if _ is not b: u(p) 82 i = g(); _wakeup_next(P); return i 83 async def put(item, *p): 84 v(p); _ = P.append 85 while full(): 86 if S: raise D.QueueShutDown 87 _(F := m()) 88 try: await F 89 except: 90 F.cancel() 91 with A.ignore_valerrs: P.remove(F) 92 if not (full() or F.cancelled()): _wakeup_next(P) 93 raise 94 return put_nowait(item, _=b) 95 def put_nowait(item, *P, _=None): 96 if S: raise D.QueueShutDown 97 if full(): raise D.QueueFull 98 if _ is not b: v(P) 99 p(item); nonlocal U; U += 1; E.clear(); _wakeup_next(G) 100 def change_get_password(old_pwd, new_pwd): 101 if (S and not l) or not can_change_get: return False 102 if not isinstance(new_pwd, gettyp): return False 103 try: s(old_pwd) 104 except A.CRITICAL: raise A.Critical 105 except: return False 106 nonlocal password_get; password_get = new_pwd; return True 107 def change_put_password(old_pwd, new_pwd): 108 if S or not can_change_put: return False 109 if not isinstance(new_pwd, puttyp): return False 110 try: t(old_pwd) 111 except A.CRITICAL: raise A.Critical 112 except: return False 113 nonlocal password_put; password_put = new_pwd; return True 114 def task_done(): 115 nonlocal U 116 if U == 0: raise ValueError('task_done() called too many times') 117 U -= 1 118 if U == 0: E.set() 119 def shutdown(immediate=False): 120 nonlocal S, U; S = True 121 if immediate: 122 U -= len(l) 123 if U <= 0: U = 0; E.set() 124 l.clear() 125 for d in (G, P): 126 f = d.popleft 127 while d: 128 if not (F := f()).done(): F.set_result(None) 129 q = _(maxsize, lambda: not l, lambda: len(l), full := lambda: 0 < maxsize <= len(l), get, get_nowait, put, put_nowait, change_get_password, change_put_password, task_done, E.wait, shutdown, lambda msg=None: False) # noqa: ARG005 130 if init_items: 131 async def extend(f=D.partial(put, D.Placeholder, password_put)): 132 async for i in A.iter_to_agen(init_items): await f(i) 133 q.cancel_extend = L.create_task(extend()).cancel 134 return q
[docs] 135class PotentQueueBase(D.Queue, LoopMixinBase, metaclass=ABCMeta):
[docs] 136 @abstractmethod 137 def _init(self, maxsize): raise NotImplementedError
[docs] 138 @abstractmethod 139 def _get(self): raise NotImplementedError
[docs] 140 @abstractmethod 141 def _put(self, item): raise NotImplementedError
[docs] 142 @abstractmethod 143 def peek_all(self): raise NotImplementedError
[docs] 144 @abstractmethod 145 def qsize(self): raise NotImplementedError
146 def __init__(self, maxsize=0): super().__init__(maxsize); self._event = Event()
[docs] 147 def reset(self): super().__init__(self.maxsize); self._event.clear()
[docs] 148 async def smart_put(self, item, *, timeout=None, raising=True): 149 try: self.put_nowait(item); return True 150 except D.QueueFull: ... 151 try: await wait_for(self.put(item), timeout); return False 152 except TimeoutError: 153 if raising: raise
[docs] 154 async def smart_get(self, *, timeout=None, default=_NO_DEFAULT): 155 f = default is _NO_DEFAULT 156 try: return self.get_nowait() 157 except D.QueueShutDown: 158 if f: raise 159 return default 160 except D.QueueEmpty: 161 try: return await wait_for(self.get(), timeout) 162 except TimeoutError as e: 163 if f: raise e from None 164 return default
[docs] 165 async def extend(self, it, timeout=None): 166 info(f'extending {fullname(self)} with iterable {it!r}'); f = self.smart_put 167 async with _timeout(timeout): 168 async for i in A.iter_to_agen(it): await f(i)
[docs] 169 def push(self, item): 170 try: 171 if self.full(): audit(f'{fullname(self)}.push', id(self), item, self.get_nowait()) 172 self.put_nowait(item); return True 173 except D.QueueShutDown: return False
[docs] 174 async def drain_persistent(self, max_items=None, timeout=None, _=ignore_qshutdown.combined(TimeoutError)): 175 m, c = abs(max_items or float('inf')), 0; info(f'persistent draining of {fullname(self)} started') 176 with _: 177 while c < m: yield await wait_for(self.get(), timeout); self.task_done(); c += 1 # noqa: ASYNC119
[docs] 178 def drain_until_empty(self, max_items=None): 179 max_items, c = abs(max_items or float('inf')), 0; info(f'draining of {fullname(self)} started') 180 with ignore_qempty: 181 while c < max_items: yield self.get_nowait(); c += 1
[docs] 182 def drain_retlist(self, max_items=None): return list(self.drain_until_empty(max_items))
183 __iter__, __aiter__ = drain_persistent, drain_until_empty
[docs] 184 def shutdown(self, immediate=False): self._event.set(); super().shutdown(immediate)
185 def __repr__(self): return f'{fullname(self)}({self.maxsize})' 186 @property 187 def is_shutdown(self): return self._event.is_set() 188 @is_shutdown.setter 189 def is_shutdown(self, val, /): self.shutdown() if val else self.__init__(self.maxsize) 190 @property 191 def can_put_now(self): return not (self.is_shutdown or self.full()) 192 @property 193 def can_get_now(self): return not (self.is_shutdown or self.empty()) 194 @property 195 def fully_functional(self): return not (self.is_shutdown or self.full() or self.empty()) 196 @property 197 def capacity(self): return m if (m := self.maxsize) > 0 else float('inf') 198 @property 199 def remaining_capacity(self): return self.capacity-self.qsize() 200 @property 201 def utilization_rate(self): return self.qsize()/self.maxsize
[docs] 202 def pushpop_nowait(self, item, raising=True): 203 if self.is_shutdown: 204 if raising: raise D.QueueShutDown 205 return 206 if self.empty(): 207 if raising: raise D.QueueEmpty(f'{fullname(self)}.pushpop_nowait on {item!r} expected non-empty queue with raising=True') 208 return self.put_nowait(item) 209 if self.full(): 210 if raising: raise D.QueueFull(f'{fullname(self)}.pushpop_nowait on {item!r} expected non-full queue with raising=True') 211 r = self.get_nowait(); self.put_nowait(item); return r 212 self.put_nowait(item); return self.get_nowait()
[docs] 213 def poppush_nowait(self, item, raising=True): 214 if self.is_shutdown: 215 if raising: raise D.QueueShutDown 216 return 217 if self.empty(): 218 if raising: raise D.QueueEmpty(f'{fullname(self)}.pushpop_nowait on {item!r} expected non-empty queue with raising=True') 219 return self.put_nowait(item) 220 r = self.get_nowait(); self.put_nowait(item); return r
[docs] 221 async def pushpop(self, item): await self.put(item); return await self.get()
[docs] 222 async def poppush(self, item): r = await self.get(); await self.put(item); return r
[docs] 223 def clear(self): 224 with ignore_qempty: 225 while True: self.get_nowait()
[docs] 226 @A.dualcontextmanager 227 def transaction(self, _=A.IgnoreErrors(TimeoutError)): 228 audit((s := f'{fullname(self)}.transaction/%s')%'start', i := id(self)); q = self.peek_all() 229 try: yield self 230 except: 231 self.clear(); f = self.put_nowait 232 for _ in q: f(_) 233 raise 234 finally: audit(s%'end', i)
[docs] 235 def empty(self): return self.qsize() == 0
[docs] 236 def __bool__(self): return self.qsize() >= 0
[docs] 237 def map(self, f, stop_when=None, *, lifo=False): 238 audit(f'{fullname(self)}.map', id(self), fullname(f)) 239 if stop_when is None: 240 stop_when, E = A.AsyncCallbacksFuture(), (D.QueueShutDown, D.QueueEmpty) 241 async def get(g=self.drain_until_empty, /): # noqa: RUF029 242 try: 243 for i in g(): yield i 244 finally: stop_when.set_result(None) 245 else: 246 E = (D.QueueShutDown,) 247 async def get(g=self.get, /): 248 with ignore_qshutdown: 249 while True: yield await g() # noqa: ASYNC119 250 async def feed(q, s, g, /, f=f): 251 try: 252 while True: await q.put(await f(await anext(g))) 253 except E: await A.safe_cancel(s) 254 (s := A.AsyncCallbacksFuture(loop=self.loop)).add_noargs_async_callback(partial(A.safe_cancel, self.make(feed(q := (SmartLifoQueue if lifo else SmartQueue)(self.maxsize), s, get())))) 255 if stop_when: stop_when.add_done_callback(s.cancel) 256 return q
[docs] 257 def starmap(self, f, stop_when=None, *, lifo=False): 258 audit(f'{fullname(self)}.starmap', id(self), fullname(f)) 259 if stop_when is None: 260 stop_when, E = A.AsyncCallbacksFuture(), (D.QueueShutDown, D.QueueEmpty) 261 async def get(g=self.drain_until_empty, /): # noqa: RUF029 262 try: 263 for i in g(): yield i 264 finally: stop_when.set_result(None) 265 else: 266 E = D.QueueShutDown, 267 async def get(g=self.get, /): 268 while True: yield await g() 269 async def feed(q, s, g, /, f=f): 270 try: 271 async for _ in g: await q.put(await f(*_)) 272 except E: await A.safe_cancel(s) 273 (s := A.AsyncCallbacksFuture(loop=self.loop)).add_noargs_async_callback(partial(A.safe_cancel, self.make(feed(q := (SmartLifoQueue if lifo else SmartQueue)(self.maxsize), s, get())))) 274 if stop_when: stop_when.add_done_callback(s.cancel) 275 return q
[docs] 276 def filter(self, pred=bool, *, lifo=False): 277 audit(f'{fullname(self)}.filter', id(self), fullname(pred)) 278 q = (SmartLifoQueue if lifo else SmartQueue)(self.maxsize) 279 async def feed(f=self.smart_put, g=q.smart_put, h=self.get, _=pred): 280 with ignore_qshutdown: 281 while True: await (f if _(i := await h()) else g)(i) 282 self.make(feed()); return q
[docs] 283 def enumerate(self, *, lifo=False): 284 audit(f'{fullname(self)}.enumerate', id(self)) 285 q = (SmartLifoQueue if lifo else SmartQueue)(self.maxsize) 286 async def feed(): 287 i = 0 288 with ignore_qempty: 289 while True: await q.smart_put((i, await self.get())); i += 1 290 self.make(feed()); return q
[docs] 291 def filter_nowait(self, pred=bool, /): 292 f, g, a = (k := []).append, (r := []).append, self.get_nowait 293 with ignore_qempty: 294 while True: (f if pred(i := a()) else g)(i) 295 h, j = self.put_nowait, len(r) 296 for i in k: 297 try: h(i) 298 except D.QueueFull: g(i) 299 return r, j
[docs] 300 def enumerate_nowait(self, start=0, *, step=1): 301 with ignore_qempty: 302 while True: yield start, self.get_nowait(); start += step
303 P.patch_method_signatures((filter_nowait, 'pred=bool'), (transaction, ''), (drain_persistent, 'max_items=None, timeout=None'))
[docs] 304class SmartQueue(PotentQueueBase):
[docs] 305 def _init(self, maxsize): self.__queue = deque(maxlen=maxsize if maxsize > 0 else None)
[docs] 306 def _get(self): return self.__queue.popleft()
[docs] 307 def _put(self, item): self.__queue.append(item)
[docs] 308 def peek(self): 309 if q := self.__queue: return q[0] 310 raise D.QueueEmpty
[docs] 311 def peek_all(self): return list(self.__queue)
[docs] 312 def qsize(self): return len(self.__queue)
[docs] 313 def rotate(self, n=1, /): self.__queue.rotate(n)
314 def __bool__(self): return bool(self.__queue) 315 def empty(self): return not self
[docs] 316class SmartLifoQueue(PotentQueueBase):
[docs] 317 def _init(self, maxsize): self.__queue = []
[docs] 318 def _get(self): return self.__queue.pop()
[docs] 319 def _put(self, item): self.__queue.append(item)
[docs] 320 def peek(self, i=-1, /): 321 s = self.qsize() 322 if i < 0: i += s 323 if 0 <= i < s: return self.__queue[i] 324 raise IndexError(f'asyncutils.queues.SmartLifoQueue: failed to peek item at index {i}')
[docs] 325 def peek_all(self): return self.__queue.copy()
[docs] 326 def qsize(self): return len(self.__queue)
327 def __bool__(self): return bool(self.__queue) 328 def empty(self): return not self 329 def pushpop(self, item): raise NotImplementedError 330 def pushpop_nowait(self, item, raising=True): raise NotImplementedError
[docs] 331class SmartPriorityQueue(PotentQueueBase): 332 def __init__(self, maxsize=0, *, init_items=()): super().__init__(maxsize); self.make(self.start(maxsize, init_items))
[docs] 333 async def start(self, maxsize, init_items): q = await A.collect(I := A.iter_to_agen(init_items), maxsize); import heapq as H; H.heapify(q); self.__get, self.__put, self._unfinished_tasks, self.__queue = partial(H.heappop, q), partial(H.heappush, q), len(q), q; self._finished.clear(); await self.extend(I) # ty: ignore[unresolved-attribute]
[docs] 334 def _init(self, maxsize): ...
[docs] 335 def _get(self): return self.__get()
[docs] 336 def _put(self, item): self.__put(item)
[docs] 337 def peek(self): return self.__queue[0]
[docs] 338 def peek_all(self): return self.__queue.copy()
[docs] 339 def qsize(self): return len(self.__queue)
340 def __bool__(self): return bool(self.__queue) 341 def empty(self): return not self
[docs] 342class UserPriorityQueue(SmartPriorityQueue):
[docs] 343 @classmethod 344 def from_iter_of_tuples(cls, items, maxsize=0, _=SmartPriorityQueue): _.__init__(Q := object.__new__(cls), maxsize, init_items=items); Q.__tiebreak = count(); return Q
345 def __init__(self, maxsize=0, *, init_items=(), init_priority=0): self.__tiebreak = count(); super().__init__(maxsize, init_items=((init_priority, self._tiebreak, j) async for j in A.iter_to_agen(init_items))) 346 @property 347 def _tiebreak(self): return next(self.__tiebreak)
[docs] 348 def put_nowait(self, item, priority=0): super().put_nowait((priority, self._tiebreak, item))
[docs] 349 def put(self, item, priority=0): return super().put((priority, self._tiebreak, item))
[docs] 350 def get_nowait(self): return super().get_nowait()[-1]
[docs] 351 async def get(self): return (await super().get())[-1]
352del f, _, Q