Source code for asyncutils.base

  1# ty: ignore[unresolved-attribute]
  2from asyncutils._internal import compat as Z, helpers as H, log as L, patch as P
  3from asyncutils._internal.submodules import base_all as __all__
  4from asyncutils.constants import _NO_DEFAULT, RAISE
  5from _functools import partial
  6import asyncutils as A, asyncio as I
  7from itertools import batched, repeat
  8from sys import audit, exc_info
  9b, c = H.check_methods, H.fullname
[docs] 10class event_loop: # noqa: N801 11 __reusable, constructor_args = [], ('dont_release_loop_on_finalization', 'silent_on_finalize', 'dont_try_clear_tasks_on_reuse', 'close_existing_on_exit', 'dont_always_stop_on_exit', 'dont_close_created_on_exit', 'cancel_all_tasks', 'keep_loop', 'suppress_runtime_errors', 'fail_silent', 'dont_allow_reuse', 'dont_reuse', 'dont_attempt_enter', 'attempt_aenter', 'suppress_inner_exit_on_runtime_error', 'suppress_inner_aexit_on_runtime_error'); __slots__ = '_flags', '_istr', '_loop', '_state', '_task'
[docs] 12 def _get_unclosed_loop(self, factory=I.new_event_loop, _=A.IgnoreErrors(AttributeError)): # pragma: no cover 13 if self._flags&0x800: return factory() 14 p, L = (pool := self.__reusable).pop, None 15 while pool and ((L := p()).is_closed() or L.is_running()): ... 16 if L is None: return factory() 17 if not self._flags&4: 18 with _: L._ready.clear() 19 with _: L._scheduled.clear() 20 return L
[docs] 21 def factory_reset(self): self._flags = A.getcontext().EVENT_LOOP_BASE_FLAGS
[docs] 22 def clear_flags(self, mask_to_keep=0): self._flags &= mask_to_keep
[docs] 23 def copy_flags(self): return self.from_flags(self._flags)
[docs] 24 def flags_eq(self, o, /): return self._flags == (o if isinstance(o, int) else o._flags)
[docs] 25 @classmethod 26 def from_flags(cls, flags, /, _=c, m=0x10000): 27 if not 0 <= flags < m: raise OverflowError(f'asyncutils.base.event_loop: flags value {flags:#x} has forbidden bits set') 28 r._flags, r._state, r._istr = flags, 0, f'{_(cls)} at {id(r := object.__new__(cls)):#x}'; return r
29 def __new__(cls, /, **k): 30 F, p, s = A.getcontext().EVENT_LOOP_BASE_FLAGS, k.pop, 1 31 for f in cls.constructor_args: 32 if (x := p(f, None)) is None: ... 33 elif x: F |= s 34 else: F &= ~s 35 s <<= 1 36 if k: A.raise_exc(TypeError, 'asyncutils.base.event_loop: got unexpected keyword arguments; shown below', notes=k) # pragma: no cover 37 return cls.from_flags(F)
[docs] 38 def __hash__(self): return self._flags
[docs] 39 def __enter__(self, _='asyncutils.base.event_loop: context already entered'): 40 f = self._flags 41 if (s := self._state)&1: # pragma: no cover 42 if f&0x200: return self._loop 43 raise RuntimeError(_) 44 if (l := I._get_running_loop()) is None: I.set_event_loop(l := self._get_unclosed_loop()) 45 else: s |= 2 46 if not f&0x1000 and callable(g := getattr(l, '__enter__', None)): # pragma: no cover 47 try: g(); s |= 4 48 except A.CRITICAL: raise A.Critical 49 except BaseException as e: 50 if not f&0x200: raise RuntimeError(f'{self._istr}: exception occurred while calling __enter__ of associated event loop: {e}') from e 51 if f&0x2000 and callable(g := getattr(l, '__aenter__', None)): 52 try: I.run_coroutine_threadsafe(g(), l).result(); s |= 8 53 except A.CRITICAL: raise A.Critical 54 except BaseException as e: 55 if not f&0x200: raise RuntimeError(f'{self._istr}: exception occurred while calling __aenter__ of associated event loop: {e}') from e 56 self._loop, self._state = l, s+1; return l
[docs] 57 def __exit__(self, t, v, b, /, _m='%s context not entered', _n='%s context not entered with errors passed into __exit__', _i=A.IgnoreErrors(RuntimeError), _l=L): # noqa: C901,PLR0912 58 n, f, l = self._istr, self._flags, self._loop 59 if not (s := self._state)&1: 60 if f&0x200: return False 61 raise RuntimeError(_m%n) if v is None else BaseExceptionGroup(_n%n, tuple(A.unnest_reverse(v))).with_traceback(b) 62 if f&0x40: self._task = l.create_task(safe_cancel_batch(I.all_tasks(l))) 63 if not f&0x400: self.__reusable.append(l) 64 if not ((c := s&2) and f&0x10): 65 with _i: l.stop() 66 q, r, self._state = t is not None and issubclass(t, RuntimeError), False, s-1 67 if s&4: # pragma: no cover 68 if callable(g := getattr(l, '__exit__', None)): 69 try: r = g(None, None, None) if f&0x4000 and q else g(t, v, b) 70 except A.CRITICAL: _l.critical('%s: critical error while calling __exit__ of associated event loop', n, exc_info=True) 71 except RuntimeError: 72 if not f&0x100: _l.exception('event loop management shenanigans while exiting associated event loop') 73 except: 74 if not f&0x200: _l.exception('%s: exception occurred while calling __exit__ of associated event loop', n) 75 elif not f&0x200: _l.error('%s: __enter__ already called but __exit__ is not present', n) 76 if s&8: # pragma: no cover 77 if callable(g := getattr(l, '__aexit__', None)) and not r: 78 try: r = I.run_coroutine_threadsafe(g(None, None, None) if f&0x8000 else g(t, v, b), l).result() 79 except A.CRITICAL: _l.critical('%s: critical error while calling __aexit__ of associated event loop', n, exc_info=True) 80 except RuntimeError: 81 if not f&0x100: _l.exception('runtime error exiting associated event loop') 82 except: 83 if not f&0x200: _l.exception('%s: exception occurred while calling __aexit__ of associated event loop', n) 84 elif not f&0x200: _l.error('%s: __aenter__ already called but __aexit__ is not present', n) 85 if f&8 or not (c or f&0x20): 86 with _i: l.close() 87 I.set_event_loop(None) 88 if not f&0x80: del self._loop 89 return r or (q and (f>>8)&1)
90 def __del__(self, _f=L.debug, _g=L.warning, _m='%s: garbage-collecting entered context; you are advised to refactor your code', _w='%s: cannot suppress exceptions from within destructor', _d='destroyed %s'): # pragma: no cover 91 b, n = not (f := self._flags)&2, self._istr 92 if not self._state&1: 93 if b: _f(_d, n) 94 return 95 if b: _g(_m, n) 96 if f&1: self._flags = f^0x400 97 if self.__exit__(*exc_info()) and b: _g(_w, n)
[docs] 98 def __reduce__(self, /): return self.from_flags, (self._flags,)
99 def __repr__(self, _=c): return f'{_(self)}.from_flags({self._flags:#4x})' 100 P.patch_method_signatures((__enter__, ''), (__exit__, P.xsig), (__del__, ''), (_get_unclosed_loop, 'factory={}')); P.patch_classmethod_signatures((from_flags, 'flags, /'), (__new__, f'*, {"={0}, ".join(constructor_args)}={{0}}'))
101def f(n): 102 async def adisembowel(it, /): # pragma: no cover 103 if callable(p := getattr(it, n, None)): 104 while it: yield p() 105 if callable(p := getattr(it, 'clear', None)) and I.iscoroutine(p := p()): await p 106 else: 107 async for i in iter_to_agen(it): yield i 108 return adisembowel 109adisembowel, adisembowelleft = map(f, ('pop', 'popleft'))
[docs] 110async def safe_cancel_batch(t, /, *, callback=None, disembowel=False, raising=False, _=c): 111 audit('asyncutils.base.safe_cancel_batch', _(t)); a = (l := []).append 112 async for F in (adisembowel if disembowel else iter_to_agen)(t): 113 if not F.done(): F.cancel(); a(F) 114 r = await I.gather(*l, return_exceptions=True) 115 if callback is not None: 116 async def f(a, /, _=callback): return (await r) if I.iscoroutine(r := _(a)) else r 117 L = len(r := await I.gather(*map(f, r), return_exceptions=True)) 118 if raising and (E := tuple(A.unnest_reverse(*filter(BaseException.__instancecheck__, r)))): raise BaseExceptionGroup(f'asyncutils.base.safe_cancel_batch: {f"flattened {L} exception (groups)" if len(E) < L else f"collected {L} exceptions"} thrown by callback function {callback!r}', E)
[docs] 119async def iter_to_agen(it, sentinel=_NO_DEFAULT, *, use_existing_executor=None, create_executor=None, strict=None, a=c, b=b, c=H.check, s=H.create_executor, h=H.get_loop_and_set, w=L.debug, _=type('', (), {'__slots__': ('it',), '__init__': lambda self, it: setattr(self, 'it', it), '__bool__': lambda self, _=b: _(self.it, 'send', 'throw', 'close'), '__enter__': lambda self: None, '__exit__': lambda self, t, v, b, /, _=frozenset(('StopIteration interacts badly with generators and cannot be raised into a Future', 'async generator raised StopIteration')): False if t is None else str(v) in _ if t is RuntimeError else (((True if (C := getattr(self.it, 'close', None)) is None else C()) if t is StopAsyncIteration else (True if (T := getattr(self.it, 'throw', None)) is None else T(v))) or True)})): # noqa: ARG005,C901,PLR0912 120 # ruff: disable[ASYNC119] 121 audit('asyncutils.base.iter_to_agen', a(it)) 122 if type(it) in Z.s: 123 for i in batched(it, 0x400): 124 for _ in i: yield _ 125 await A.yield_to_event_loop 126 return 127 C = A.getcontext() 128 if b(it, '__aiter__') and not (C.ITER_TO_AGEN_DEFAULT_STRICT if strict is None else strict): 129 if sentinel is _NO_DEFAULT: 130 async for _ in it: yield _ 131 elif b(it, 'asend', 'athrow', 'aclose'): 132 l = await (_ := it.asend)(None) 133 while not c(l, sentinel): l = await _((yield l)) 134 else: 135 async for l in it: 136 if c(l, sentinel): break 137 yield l 138 return 139 elif not b(it, '__iter__'): raise TypeError(f'asyncutils.base.iter_to_agen: cannot iterate over {it!r} synchronously or asynchronously') 140 e, g = None, _(it := iter(it)) 141 if create_executor is None: create_executor = C.ITER_TO_AGEN_DEFAULT_MAY_CREATE_EXECUTOR 142 if C.ITER_TO_AGEN_DEFAULT_USE_EXISTING_EXECUTOR if use_existing_executor is None else use_existing_executor: 143 if (e := getattr(iter_to_agen, 'executor', None)) is None: 144 if create_executor: e = s(iter_to_agen) 145 else: w('asyncutils.base.iter_to_agen: no existing executor') 146 elif create_executor: e = s(iter_to_agen, False) 147 with g: 148 if e is None: 149 if g: 150 l = (_ := it.send)(None) 151 while not c(l, sentinel): l = _((yield l)); await A.yield_to_event_loop 152 else: 153 while not c(l := next(it, sentinel), sentinel): yield l; await A.yield_to_event_loop 154 else: 155 def r(*a, _=h().run_in_executor, e=e): return partial(_, e, *a) 156 if g: 157 l = await (_ := r(it.send))(None) 158 while True: 159 if c(l, sentinel): break 160 l = await _((yield l)) 161 else: 162 _ = r(next, it) 163 while True: 164 if c((l := await _()), sentinel): break 165 yield l
166 # ruff: enable[ASYNC119]
[docs] 167def aiter_to_gen(ait, *, use_futures=None, loop=None, strict=None, a=c, b=b, g=H.get_loop_and_set): 168 audit('asyncutils.base.aiter_to_gen', a(ait)); C, e = A.getcontext(), I.futures._chain_future 169 if b(ait, '__iter__') and not (C.AITER_TO_GEN_DEFAULT_STRICT if strict is None else strict): yield from ait; return 170 if not b(ait, '__aiter__'): raise TypeError(f'asyncutils.base.aiter_to_gen: cannot iterate over {ait!r} synchronously or asynchronously') 171 d = b(ait := aiter(ait), 'asend', 'athrow', 'aclose') 172 with A.ignore_stopaiteration: 173 if loop is None: loop = g() 174 if loop.is_running(): 175 if not (C.AITER_TO_GEN_DEFAULT_ALLOW_FUTURES if use_futures is None else use_futures): raise RuntimeError(f'asyncutils.base.aiter_to_gen: cannot convert async iterator {ait!r} to sync in running event loop without using futures') 176 def f(*a, f, c=loop.create_task, g=e, t=I.Future): return g(c(f(*a)), F := t()) or F.result() 177 if d: 178 p, x = partial(f, f=ait.asend), None 179 while True: x = yield p(x) 180 else: 181 p = partial(f, f=ait.__anext__) 182 while True: yield p() 183 else: 184 a = loop.run_until_complete 185 if d: 186 p, x = ait.asend, None 187 while True: x = yield a(p(x)) 188 else: 189 p = ait.__anext__ 190 while True: yield a(p())
[docs] 191async def take(it, n=None, *, default=_NO_DEFAULT, _=L.debug, m='asyncutils.base.take: ran out of items'): 192 if n is None: 193 async for i in iter_to_agen(it): yield i 194 return 195 if n == 0: return 196 async for n, i in aenumerate(it, n-1, step=-1): # noqa: B020,PLR1704 197 yield i 198 if n == 0: return 199 if default is RAISE: raise A.ItemsExhausted(m) 200 if default is _NO_DEFAULT: _(m) 201 else: 202 for _ in repeat(default, n): yield _
[docs] 203async def collect(it, n=None, *, default=_NO_DEFAULT, _='asyncutils.base.collect: ran out of items'): return [i async for i in take(it, n, default=default, m=_)]
[docs] 204async def collect_into(out, it, n=None, *, default=_NO_DEFAULT, _='asyncutils.base.collect_into: ran out of items'): 205 a = out.append 206 async for i in take(it, n, default=default, m=_): a(i)
[docs] 207async def drop(it, n, *, raising=False, _=L.debug, m='asyncutils.base.drop: ran out of items'): 208 it = iter_to_agen(it) 209 if n == 0: 210 async for i in it: yield i 211 return 212 async for i, j in aenumerate(it): 213 if i == n: yield j; break 214 else: 215 if raising: raise A.ItemsExhausted(m) 216 _(m); return 217 async for j in it: yield j
[docs] 218async def aenumerate(it, start=0, *, step=1): 219 async for _ in iter_to_agen(it): yield start, _; start += step
220P.patch_function_signatures((safe_cancel_batch, 'batch, /, *, callback=None, disembowel=False, raising=False'), (iter_to_agen, 'it, sentinel={}, *, use_existing_executor=None, create_executor=None, strict=None'), (aiter_to_gen, 'ait, *, use_futures=None, loop=None, strict=None'), (collect, 'it, n=None, *, default={}'), (take, 'it, n, *, default={}'), (drop, 'it, n, *, raising=False')) 221yield_to_event_loop, sleep_forever = object.__new__(type('', (), {'__new__': lambda _: yield_to_event_loop, '__await__': (_ := lambda _: (yield)), **dict.fromkeys(('__repr__', '__str__', '__reduce__'), lambda _, r='asyncutils.base.yield_to_event_loop': r)})), I.sleep.__get__(float('inf')) 222(dummy_task := type(_)(_.__code__.replace(co_flags=0x161, co_name=(_ := 'dummy_task'), co_qualname=_), globals())(None)).close() 223del f, _, P, L, b, c, H