Source code for asyncutils.queues

  1import asyncutils as A
  2from asyncutils.constants import _NO_DEFAULT
  3from asyncutils._internal import compat as D, patch as P
  4from asyncutils._internal.helpers import LoopMixinBase, check, get_loop_and_set, fullname
  5from asyncutils._internal.log import info
  6from asyncutils._internal.submodules import queues_all as __all__
  7from _collections import deque
  8from _functools import partial
  9from abc import ABCMeta, abstractmethod
 10from asyncio import Event, timeout as _timeout, wait_for
 11from itertools import count
 12from sys import _getframe, audit
 13ignore_qempty, ignore_qfull = map((f := (ignore_qshutdown := A.IgnoreErrors(D.QueueShutDown)).combined), _ := (D.QueueEmpty, D.QueueFull))
 14ignore_qerrs, f = f(*_), object.__setattr__
 15def _wakeup_next(W):
 16    P = W.popleft
 17    while W:
 18        if not (w := P()).done(): w.set_result(None); break
 19class Q:
 20    exc = A.ForbiddenOperation; __slots__ = 'maxsize', 'empty', 'qsize', 'full', 'get', 'get_nowait', 'put', 'put_nowait', 'change_get_password', 'change_put_password', 'task_done', 'join', 'shutdown', 'cancel_extend' # noqa: RUF023
 21    def __repr__(self): return f'<password-protected queue at {id(self):#x}>'
 22    def __new__(cls, /, *a, _=f.__get__, x='pwdq.'):
 23        (f := _(s := super().__new__(cls)))(*next(i := zip(cls.__slots__, a)))
 24        for n, v in i: v.__qualname__ = x+n; f(n, v)
 25        return s
 26    def __setattr__(self, name, value, /, _=__slots__[-1], f=f, s=frozenset(__slots__)):
 27        if name not in s: raise AttributeError(f'password-protected queue has no attribute {name!r}')
 28        if name != _: raise AttributeError(f'attribute/method {name!r} on password-protected queue is read-only')
 29        f(self, name, value)
 30    def __init_subclass__(cls, /, _=exc('subclass'), **k): raise _
 31    def _get(self, _=exc('call _get() on')): raise _
 32    def _put(self, _=exc('call _put() on')): raise _
 33    def _init(self, maxsize, _=exc('call _init() on')): raise _
 34    P.patch_method_signatures((__setattr__, 'name, value, /'), (_get, ''), (_put, ''), (_init, 'maxsize')); P.patch_classmethod_signatures((__init_subclass__, '**k'), (__new__, 'maxsize, empty, qsize, full, get, get_nowait, put, put_nowait, change_get_password, change_put_password, task_done, join, shutdown, cancel_extend, /'))
[docs] 35def password_queue(password_put=_NO_DEFAULT, password_get=_NO_DEFAULT, maxsize=0, *, protect_get=False, protect_put=True, can_change_get=False, can_change_put=False, priority=False, lifo=False, init_items=(), strict=True, get_from=None, put_from=None, gettyp=object, puttyp=object, _=Q): # noqa: C901,PLR0913,PLR0915 36 audit('asyncutils.queues.password_queue', get_from if protect_get else None, put_from if protect_put else None); C = A.getcontext() 37 try: F = _getframe(1) 38 except ValueError: F = None 39 if protect_get: 40 if password_get is _NO_DEFAULT: 41 if F is None or (password_get := F.f_locals.get(get_from := (C.PASSWORD_QUEUE_DEFAULT_GET_FROM if get_from is None else get_from).strip())) is None is (password_get := F.f_globals.get(get_from)): raise A.GetPasswordRetrievalError(get_from) # ty: ignore[invalid-argument-type] 42 elif get_from is not None: raise ValueError('asyncutils.queues.password_queue: only pass one of get_from or password_get') 43 if not isinstance(password_get, gettyp): raise A.WrongPasswordType(None, password_get, type(password_get), gettyp) 44 if protect_put: 45 if password_put is _NO_DEFAULT: 46 if F is None or (password_put := F.f_locals.get(put_from := (C.PASSWORD_QUEUE_DEFAULT_PUT_FROM if put_from is None else put_from).strip())) is None is (password_put := F.f_globals.get(put_from)): raise A.PutPasswordRetrievalError(put_from) # ty: ignore[invalid-argument-type] 47 elif put_from is not None: raise ValueError('asyncutils.queues.password_queue: only pass one of put_from or password_put') 48 if not isinstance(password_put, puttyp): raise A.WrongPasswordType(None, password_put, type(password_put), puttyp) 49 def s(p): 50 if not isinstance(p, gettyp): raise A.WrongPasswordType(q, p, type(p), gettyp) # ty: ignore[invalid-argument-type] 51 if p is not password_get and (strict or not check(p, password_get)): raise A.WrongPassword(q, p) # ty: ignore[invalid-argument-type] 52 def t(p): 53 if not isinstance(p, puttyp): raise A.WrongPasswordType(q, p, type(p), puttyp) # ty: ignore[invalid-argument-type] 54 if p is not password_put and (strict or not check(p, password_put)): raise A.WrongPassword(q, p) # ty: ignore[invalid-argument-type] 55 def u(p): 56 if not protect_get: return 57 if not p: raise A.GetPasswordMissing 58 p, = p; s(p) 59 def v(p): 60 if not protect_put: return 61 if not p: raise A.PutPasswordMissing 62 p, = p; t(p) 63 E, G, P, U, S, m, b = A.done_evt(), deque(), deque(), 0, False, (L := get_loop_and_set()).create_future, object() 64 if priority: g, p = partial((M := D if lifo else __import__('heapq')).heappop, l := []), partial(M.heappush, l) 65 else: g, p = (l := []).pop if lifo else (l := deque()).popleft, l.append 66 async def get(*p): 67 u(p); _ = G.append 68 while not l: 69 if S: raise D.QueueShutDown 70 F = m() 71 try: _(F); await F 72 except: 73 F.cancel() 74 with A.ignore_valerrs: G.remove(F) 75 if l and not F.cancelled(): _wakeup_next(G) 76 raise 77 return get_nowait(_=b) 78 def get_nowait(*p, _=None): 79 if not l: raise D.QueueShutDown if S else D.QueueEmpty 80 if _ is not b: u(p) 81 i = g(); _wakeup_next(P); return i 82 async def put(i, /, *p): 83 v(p); _ = P.append 84 while full(): 85 if S: raise D.QueueShutDown 86 _(F := m()) 87 try: await F 88 except: 89 F.cancel() 90 with A.ignore_valerrs: P.remove(F) 91 if not (full() or F.cancelled()): _wakeup_next(P) 92 raise 93 return put_nowait(i, _=b) 94 def put_nowait(i, /, *P, _=None): 95 if S: raise D.QueueShutDown 96 if full(): raise D.QueueFull 97 if _ is not b: v(P) 98 p(i); nonlocal U; U += 1; E.clear(); _wakeup_next(G) 99 def change_get_password(opw, npw): 100 if (S and not l) or not can_change_get: return False 101 if not isinstance(npw, gettyp): return False 102 try: s(opw) 103 except A.CRITICAL: raise A.Critical 104 except: return False 105 nonlocal password_get; password_get = npw; return True 106 def change_put_password(opw, npw): 107 if S or not can_change_put: return False 108 if not isinstance(npw, puttyp): return False 109 try: t(opw) 110 except A.CRITICAL: raise A.Critical 111 except: return False 112 nonlocal password_put; password_put = npw; return True 113 def task_done(): 114 nonlocal U 115 if U == 0: raise ValueError('task_done() called too many times') 116 U -= 1 117 if U == 0: E.set() 118 def shutdown(immediate=False): 119 nonlocal S, U; S = True 120 if immediate: 121 U -= len(l) 122 if U <= 0: U = 0; E.set() 123 l.clear() 124 for d in (G, P): 125 f = d.popleft 126 while d: 127 if not (F := f()).done(): F.set_result(None) 128 q = _(maxsize, lambda: not l, lambda: len(l), full := lambda: 0 < maxsize <= len(l), get, get_nowait, put, put_nowait, change_get_password, change_put_password, task_done, A.discard_retval(E.wait), shutdown, lambda msg=None: False) # noqa: ARG005 129 if init_items: 130 async def extend(f=D.partial(put, D.Placeholder, password_put)): 131 async for i in A.iter_to_agen(init_items): await f(i) 132 q.cancel_extend = L.create_task(extend()).cancel 133 return q
[docs] 134class PotentQueueBase(D.Queue, LoopMixinBase, metaclass=ABCMeta):
[docs] 135 @abstractmethod 136 def _init(self, maxsize): raise NotImplementedError
[docs] 137 @abstractmethod 138 def _get(self): raise NotImplementedError
[docs] 139 @abstractmethod 140 def _put(self, item): raise NotImplementedError
[docs] 141 @abstractmethod 142 def peek_all(self): raise NotImplementedError
[docs] 143 @abstractmethod 144 def qsize(self): raise NotImplementedError
145 def __init__(self, maxsize=0): super().__init__(maxsize); self._event = Event()
[docs] 146 def reset(self): super().__init__(self.maxsize); self._event.clear()
[docs] 147 async def smart_put(self, item, *, timeout=None, raising=True): 148 try: self.put_nowait(item); return True 149 except D.QueueFull: ... 150 try: await wait_for(self.put(item), timeout); return False 151 except TimeoutError: 152 if raising: raise
[docs] 153 async def smart_get(self, *, timeout=None, default=_NO_DEFAULT): 154 f = default is _NO_DEFAULT 155 try: return self.get_nowait() 156 except D.QueueShutDown: 157 if f: raise 158 return default 159 except D.QueueEmpty: 160 try: return await wait_for(self.get(), timeout) 161 except TimeoutError as e: 162 if f: raise e from None 163 return default
[docs] 164 async def extend(self, it, timeout=None): 165 info(f'extending {fullname(self)} with iterable {it!r}'); f = self.smart_put 166 async with _timeout(timeout): 167 async for i in A.iter_to_agen(it): await f(i)
[docs] 168 def push(self, item): 169 try: 170 if self.full(): audit(f'{fullname(self)}.push', id(self), item, self.get_nowait()) 171 self.put_nowait(item); return True 172 except D.QueueShutDown: return False
[docs] 173 async def drain_persistent(self, max_items=None, timeout=None, _=ignore_qshutdown.combined(TimeoutError)): 174 m, c = abs(max_items or float('inf')), 0; info(f'persistent draining of {fullname(self)} started') 175 with _: 176 while c < m: yield await wait_for(self.get(), timeout); self.task_done(); c += 1 # noqa: ASYNC119
[docs] 177 def drain_until_empty(self, max_items=None): 178 max_items, c = abs(max_items or float('inf')), 0; info(f'draining of {fullname(self)} started') 179 with ignore_qempty: 180 while c < max_items: yield self.get_nowait(); c += 1
[docs] 181 def drain_retlist(self, max_items=None): return list(self.drain_until_empty(max_items))
182 __iter__, __aiter__ = drain_persistent, drain_until_empty
[docs] 183 def shutdown(self, immediate=False): self._event.set(); super().shutdown(immediate)
184 def __repr__(self): return f'{fullname(self)}({self.maxsize})' 185 @property 186 def is_shutdown(self): return self._event.is_set() 187 @is_shutdown.setter 188 def is_shutdown(self, val, /): self.shutdown() if val else self.__init__(self.maxsize) 189 @property 190 def can_put_now(self): return not (self.is_shutdown or self.full()) 191 @property 192 def can_get_now(self): return not (self.is_shutdown or self.empty()) 193 @property 194 def fully_functional(self): return not (self.is_shutdown or self.full() or self.empty()) 195 @property 196 def capacity(self): return m if (m := self.maxsize) > 0 else float('inf') 197 @property 198 def remaining_capacity(self): return self.capacity-self.qsize() 199 @property 200 def utilization_rate(self): return self.qsize()/self.maxsize
[docs] 201 def pushpop_nowait(self, item, raising=True): 202 if self.is_shutdown: raise D.QueueShutDown 203 if self.empty(): 204 if raising: raise D.QueueEmpty(f'{fullname(self)}.pushpop_nowait on {item!r} expected non-empty queue with raising=True') 205 return self.put_nowait(item) 206 if self.full(): 207 if raising: raise D.QueueFull(f'{fullname(self)}.pushpop_nowait on {item!r} expected non-full queue with raising=True') 208 r = self.get_nowait(); self.put_nowait(item); return r 209 self.put_nowait(item); return self.get_nowait()
[docs] 210 def poppush_nowait(self, item, raising=True): 211 if self.is_shutdown: raise D.QueueShutDown 212 if self.empty(): 213 if raising: raise D.QueueEmpty(f'{fullname(self)}.pushpop_nowait on {item!r} expected non-empty queue with raising=True') 214 return self.put_nowait(item) 215 r = self.get_nowait(); self.put_nowait(item); return r
[docs] 216 async def pushpop(self, item): await self.put(item); return await self.get()
[docs] 217 async def poppush(self, item): r = await self.get(); await self.put(item); return r
[docs] 218 def clear(self): 219 with ignore_qempty: 220 while True: self.get_nowait()
[docs] 221 @A.dualcontextmanager 222 def transaction(self, _=A.IgnoreErrors(TimeoutError)): 223 audit((s := f'{fullname(self)}.transaction/%s')%'start', i := id(self)); q = self.peek_all() 224 try: yield self 225 except: 226 self.clear(); f = self.put_nowait 227 for _ in q: f(_) 228 raise 229 finally: audit(s%'end', i)
[docs] 230 def empty(self): return self.qsize() == 0
[docs] 231 def __bool__(self): return self.qsize() >= 0
[docs] 232 def map(self, f, stop_when=None, *, lifo=False): 233 audit(f'{fullname(self)}.map', id(self), fullname(f)) 234 if stop_when is None: 235 stop_when, E = A.AsyncCallbacksFuture(), (D.QueueShutDown, D.QueueEmpty) 236 async def get(g=self.drain_until_empty, /): # noqa: RUF029 237 try: 238 for i in g(): yield i 239 finally: stop_when.set_result(None) 240 else: 241 E = (D.QueueShutDown,) 242 async def get(g=self.get, /): 243 with ignore_qshutdown: 244 while True: yield await g() # noqa: ASYNC119 245 async def feed(q, s, g, /, f=f): 246 try: 247 while True: await q.put(await f(await anext(g))) 248 except E: await A.safe_cancel(s) 249 (s := A.AsyncCallbacksFuture(loop=self.loop)).add_noargs_async_callback(partial(A.safe_cancel, self.make(feed(q := (SmartLifoQueue if lifo else SmartQueue)(self.maxsize), s, get())))) 250 if stop_when: stop_when.add_done_callback(s.cancel) 251 return q
[docs] 252 def starmap(self, f, stop_when=None, *, lifo=False): 253 audit(f'{fullname(self)}.starmap', id(self), fullname(f)) 254 if stop_when is None: 255 stop_when, E = A.AsyncCallbacksFuture(), (D.QueueShutDown, D.QueueEmpty) 256 async def get(g=self.drain_until_empty, /): # noqa: RUF029 257 try: 258 for i in g(): yield i 259 finally: stop_when.set_result(None) 260 else: 261 E = D.QueueShutDown, 262 async def get(g=self.get, /): 263 while True: yield await g() 264 async def feed(q, s, g, /, f=f): 265 try: 266 async for _ in g: await q.put(await f(*_)) 267 except E: await A.safe_cancel(s) 268 (s := A.AsyncCallbacksFuture(loop=self.loop)).add_noargs_async_callback(partial(A.safe_cancel, self.make(feed(q := (SmartLifoQueue if lifo else SmartQueue)(self.maxsize), s, get())))) 269 if stop_when: stop_when.add_done_callback(s.cancel) 270 return q
[docs] 271 def filter(self, pred=bool, *, lifo=False): 272 audit(f'{fullname(self)}.filter', id(self), fullname(pred)) 273 q = (SmartLifoQueue if lifo else SmartQueue)(self.maxsize) 274 async def feed(f=self.smart_put, g=q.smart_put, h=self.get, _=pred): 275 with ignore_qshutdown: 276 while True: await (f if _(i := await h()) else g)(i) 277 self.make(feed()); return q
[docs] 278 def enumerate(self, *, lifo=False): 279 audit(f'{fullname(self)}.enumerate', id(self)) 280 q = (SmartLifoQueue if lifo else SmartQueue)(self.maxsize) 281 async def feed(): 282 i = 0 283 with ignore_qempty: 284 while True: await q.smart_put((i, await self.get())); i += 1 285 self.make(feed()); return q
[docs] 286 def filter_nowait(self, pred=bool, /): 287 f, g, a = (k := []).append, (r := []).append, self.get_nowait 288 with ignore_qempty: 289 while True: (f if pred(i := a()) else g)(i) 290 h, j = self.put_nowait, len(r) 291 for i in k: 292 try: h(i) 293 except D.QueueFull: g(i) 294 return r, j
[docs] 295 def enumerate_nowait(self, start=0, *, step=1): 296 with ignore_qempty: 297 while True: yield start, self.get_nowait(); start += step
298 P.patch_method_signatures((filter_nowait, 'pred=bool'), (transaction, ''), (drain_persistent, 'max_items=None, timeout=None'))
[docs] 299class SmartQueue(PotentQueueBase):
[docs] 300 def _init(self, maxsize): self.__queue = deque(maxlen=maxsize if maxsize > 0 else None)
[docs] 301 def _get(self): return self.__queue.popleft()
[docs] 302 def _put(self, item): self.__queue.append(item)
[docs] 303 def peek(self): 304 if q := self.__queue: return q[0] 305 raise D.QueueEmpty
[docs] 306 def peek_all(self): return list(self.__queue)
[docs] 307 def qsize(self): return len(self.__queue)
[docs] 308 def rotate(self, n=1, /): self.__queue.rotate(n)
309 def __bool__(self): return bool(self.__queue) 310 def empty(self): return not self
[docs] 311class SmartLifoQueue(PotentQueueBase):
[docs] 312 def _init(self, maxsize): self.__queue = []
[docs] 313 def _get(self): return self.__queue.pop()
[docs] 314 def _put(self, item): self.__queue.append(item)
[docs] 315 def peek(self, i=-1, /): 316 s = self.qsize() 317 if i < 0: i += s 318 if 0 <= i < s: return self.__queue[i] 319 raise IndexError(f'asyncutils.queues.SmartLifoQueue: failed to peek item at index {i}')
[docs] 320 def peek_all(self): return self.__queue.copy()
[docs] 321 def qsize(self): return len(self.__queue)
322 def __bool__(self): return bool(self.__queue) 323 def empty(self): return not self 324 def pushpop(self, item): raise NotImplementedError 325 def pushpop_nowait(self, item, raising=True): raise NotImplementedError
[docs] 326class SmartPriorityQueue(PotentQueueBase): 327 def __init__(self, maxsize=0, *, init_items=()): super().__init__(maxsize); self.make(self.start(maxsize, init_items))
[docs] 328 async def start(self, maxsize, init_items): q = await A.collect(I := A.iter_to_agen(init_items), maxsize); import heapq as H; H.heapify(q); self.__get, self.__put, self._unfinished_tasks, self.__queue = partial(H.heappop, q), partial(H.heappush, q), len(q), q; self._finished.clear(); await self.extend(I) # ty: ignore[unresolved-attribute]
[docs] 329 def _init(self, maxsize): ...
[docs] 330 def _get(self): return self.__get()
[docs] 331 def _put(self, item): self.__put(item)
[docs] 332 def peek(self): return self.__queue[0]
[docs] 333 def peek_all(self): return self.__queue.copy()
[docs] 334 def qsize(self): return len(self.__queue)
335 def __bool__(self): return bool(self.__queue) 336 def empty(self): return not self
[docs] 337class UserPriorityQueue(SmartPriorityQueue):
[docs] 338 @classmethod 339 def from_iter_of_tuples(cls, items, maxsize=0, _=SmartPriorityQueue): _.__init__(Q := object.__new__(cls), maxsize, init_items=items); Q.__tiebreak = count(); return Q
340 def __init__(self, maxsize=0, *, init_items=(), init_priority=0): self.__tiebreak = count(); super().__init__(maxsize, init_items=((init_priority, self._tiebreak, j) async for j in A.iter_to_agen(init_items))) 341 @property 342 def _tiebreak(self): return next(self.__tiebreak)
[docs] 343 def put_nowait(self, item, priority=0): super().put_nowait((priority, self._tiebreak, item))
[docs] 344 def put(self, item, priority=0): return super().put((priority, self._tiebreak, item))
[docs] 345 def get_nowait(self): return super().get_nowait()[-1]
[docs] 346 async def get(self): return (await super().get())[-1]
347del f, _, Q