1# ty: ignore[unresolved-attribute]
2from asyncutils._internal import compat as Z, helpers as H, log as L, patch as P
3from asyncutils._internal.submodules import base_all as __all__
4from asyncutils.constants import _NO_DEFAULT, RAISE
5from _functools import partial
6import asyncutils as A, asyncio as I
7from itertools import batched, repeat
8from sys import audit, exc_info
# Short aliases for two internal helpers. The definitions below capture them as
# default-argument values, and both names are deleted again by the module's final `del`.
b, c = H.check_methods, H.fullname
[docs]
class event_loop:  # noqa: N801
    # Synchronous context manager around an asyncio event loop.
    #
    # Entering adopts the currently running loop, or installs one obtained from
    # _get_unclosed_loop() as the current event loop. Exiting stops / closes / recycles the
    # loop according to the bit flags stored in `_flags`.
    #
    # Every name in `constructor_args` is an optional keyword of the constructor and maps,
    # in order, to one bit of `_flags` (see __new__):
    #   0x0001 dont_release_loop_on_finalization    0x0100 suppress_runtime_errors
    #   0x0002 silent_on_finalize                   0x0200 fail_silent
    #   0x0004 dont_try_clear_tasks_on_reuse        0x0400 dont_allow_reuse
    #   0x0008 close_existing_on_exit               0x0800 dont_reuse
    #   0x0010 dont_always_stop_on_exit             0x1000 dont_attempt_enter
    #   0x0020 dont_close_created_on_exit           0x2000 attempt_aenter
    #   0x0040 cancel_all_tasks                     0x4000 suppress_inner_exit_on_runtime_error
    #   0x0080 keep_loop                            0x8000 suppress_inner_aexit_on_runtime_error
    #
    # `_state` bits while entered: 1 = entered, 2 = adopted an already running loop,
    # 4 = the loop's own __enter__ was called, 8 = the loop's own __aenter__ was called.
    # `__reusable` is a class-level pool of loops released by earlier contexts.
    __reusable, constructor_args = [], ('dont_release_loop_on_finalization', 'silent_on_finalize', 'dont_try_clear_tasks_on_reuse', 'close_existing_on_exit', 'dont_always_stop_on_exit', 'dont_close_created_on_exit', 'cancel_all_tasks', 'keep_loop', 'suppress_runtime_errors', 'fail_silent', 'dont_allow_reuse', 'dont_reuse', 'dont_attempt_enter', 'attempt_aenter', 'suppress_inner_exit_on_runtime_error', 'suppress_inner_aexit_on_runtime_error')
    __slots__ = '_flags', '_istr', '_loop', '_state', '_task'

    def _get_unclosed_loop(self, factory=I.new_event_loop, _=A.IgnoreErrors(AttributeError)):  # pragma: no cover
        # Return a usable loop: a pooled one when reuse is allowed and one is neither closed
        # nor running, otherwise a fresh loop from `factory`.
        if self._flags & 0x800:
            return factory()
        pool = self.__reusable
        while pool:
            candidate = pool.pop()
            if not (candidate.is_closed() or candidate.is_running()):
                break
        else:
            # Pool was empty, or every pooled loop was closed/running. The previous code
            # returned the last popped (unusable) loop in the latter case.
            return factory()
        if not self._flags & 4:
            # Best-effort purge of leftover callbacks; loops without these private
            # attributes are tolerated through the AttributeError-ignoring context.
            with _:
                candidate._ready.clear()
            with _:
                candidate._scheduled.clear()
        return candidate

    def factory_reset(self):
        # Restore the flag word configured in the active asyncutils context.
        self._flags = A.getcontext().EVENT_LOOP_BASE_FLAGS

    def clear_flags(self, mask_to_keep=0):
        # Clear every flag bit that is not set in `mask_to_keep`.
        self._flags &= mask_to_keep

    def copy_flags(self):
        # New, un-entered instance carrying the same flag word.
        return self.from_flags(self._flags)

    def flags_eq(self, o, /):
        # Compare the flag word with an int or with another instance's flag word.
        return self._flags == (o if isinstance(o, int) else o._flags)

    @classmethod
    def from_flags(cls, flags, /, _=c, m=0x10000):
        # Alternate constructor taking the raw flag word; raises OverflowError when
        # `flags` is outside [0, m).
        if not 0 <= flags < m:
            raise OverflowError(f'asyncutils.base.event_loop: flags value {flags:#x} has forbidden bits set')
        inst = object.__new__(cls)
        inst._flags, inst._state, inst._istr = flags, 0, f'{_(cls)} at {id(inst):#x}'
        return inst

    def __new__(cls, /, **k):
        # Start from the context's base flags; each keyword given as non-None sets
        # (truthy) or clears (falsy) its bit. Unknown keywords are reported via TypeError.
        flags, take_kw, bit = A.getcontext().EVENT_LOOP_BASE_FLAGS, k.pop, 1
        for name in cls.constructor_args:
            value = take_kw(name, None)
            if value is not None:
                if value:
                    flags |= bit
                else:
                    flags &= ~bit
            bit <<= 1
        if k:
            A.raise_exc(TypeError, 'asyncutils.base.event_loop: got unexpected keyword arguments; shown below', notes=k)  # pragma: no cover
        return cls.from_flags(flags)

    def __hash__(self):
        return self._flags

    def __enter__(self, _='asyncutils.base.event_loop: context already entered'):
        # Returns the loop in use. Re-entering raises RuntimeError unless fail_silent is
        # set, in which case the already associated loop is returned unchanged.
        f = self._flags
        if (s := self._state) & 1:  # pragma: no cover
            if f & 0x200:
                return self._loop
            raise RuntimeError(_)
        if (l := I._get_running_loop()) is None:
            I.set_event_loop(l := self._get_unclosed_loop())
        else:
            s |= 2
        if not f & 0x1000 and callable(g := getattr(l, '__enter__', None)):  # pragma: no cover
            try:
                g()
                s |= 4
            except A.CRITICAL:
                raise A.Critical
            except BaseException as e:
                if not f & 0x200:
                    raise RuntimeError(f'{self._istr}: exception occurred while calling __enter__ of associated event loop: {e}') from e
        if f & 0x2000 and callable(g := getattr(l, '__aenter__', None)):
            try:
                I.run_coroutine_threadsafe(g(), l).result()
                s |= 8
            except A.CRITICAL:
                raise A.Critical
            except BaseException as e:
                if not f & 0x200:
                    raise RuntimeError(f'{self._istr}: exception occurred while calling __aenter__ of associated event loop: {e}') from e
        self._loop, self._state = l, s + 1
        return l

    def __exit__(self, t, v, b, /, _m='%s context not entered', _n='%s context not entered with errors passed into __exit__', _i=A.IgnoreErrors(RuntimeError), _l=L):  # noqa: C901,PLR0912
        # Tear the context down. Returns a truthy value when the in-flight exception is to
        # be suppressed: either the loop's own __exit__/__aexit__ asked for it, or the
        # exception is a RuntimeError and suppress_runtime_errors is set.
        n, f = self._istr, self._flags
        if not (s := self._state) & 1:
            if f & 0x200:
                return False
            if v is None:
                raise RuntimeError(_m % n)
            raise BaseExceptionGroup(_n % n, tuple(A.unnest_reverse(v))).with_traceback(b)
        # Read the loop only after the state check: `_loop` is an unset slot on an instance
        # that was never entered, so reading it first turned the intended "not entered"
        # handling above into an AttributeError.
        l = self._loop
        if f & 0x40:
            self._task = l.create_task(safe_cancel_batch(I.all_tasks(l)))
        if not f & 0x400:
            self.__reusable.append(l)
        if not ((c := s & 2) and f & 0x10):
            with _i:
                l.stop()
        # Reset the whole state word. Keeping bits 2/4/8 from this session (the previous
        # `s - 1`) made a later enter/exit cycle act on stale information.
        q, r, self._state = t is not None and issubclass(t, RuntimeError), False, 0
        if s & 4:  # pragma: no cover
            if callable(g := getattr(l, '__exit__', None)):
                try:
                    r = g(None, None, None) if f & 0x4000 and q else g(t, v, b)
                except A.CRITICAL:
                    _l.critical('%s: critical error while calling __exit__ of associated event loop', n, exc_info=True)
                except RuntimeError:
                    if not f & 0x100:
                        _l.exception('event loop management shenanigans while exiting associated event loop')
                except BaseException:
                    if not f & 0x200:
                        _l.exception('%s: exception occurred while calling __exit__ of associated event loop', n)
            elif not f & 0x200:
                _l.error('%s: __enter__ already called but __exit__ is not present', n)
        if s & 8:  # pragma: no cover
            if callable(g := getattr(l, '__aexit__', None)):
                # __aexit__ is skipped when the loop's __exit__ already suppressed the
                # exception; that case no longer logs "__aexit__ is not present".
                if not r:
                    try:
                        # Hide the exception from __aexit__ only for RuntimeErrors, matching
                        # the flag name and the __exit__ branch above.
                        r = I.run_coroutine_threadsafe(g(None, None, None) if f & 0x8000 and q else g(t, v, b), l).result()
                    except A.CRITICAL:
                        _l.critical('%s: critical error while calling __aexit__ of associated event loop', n, exc_info=True)
                    except RuntimeError:
                        if not f & 0x100:
                            _l.exception('runtime error exiting associated event loop')
                    except BaseException:
                        if not f & 0x200:
                            _l.exception('%s: exception occurred while calling __aexit__ of associated event loop', n)
            elif not f & 0x200:
                _l.error('%s: __aenter__ already called but __aexit__ is not present', n)
        if f & 8 or not (c or f & 0x20):
            with _i:
                l.close()
            # NOTE(review): indentation was lost in the extracted source; this call is
            # assumed to belong with close() (unset the current loop only when it was
            # closed) - confirm against the upstream file.
            I.set_event_loop(None)
        if not f & 0x80:
            del self._loop
        return r or (q and (f >> 8) & 1)

    def __del__(self, _f=L.debug, _g=L.warning, _m='%s: garbage-collecting entered context; you are advised to refactor your code', _w='%s: cannot suppress exceptions from within destructor', _d='destroyed %s'):  # pragma: no cover
        # Finalizer: logs (unless silent_on_finalize) and, if the context is still entered,
        # runs __exit__ with the current exception info.
        b, n = not (f := self._flags) & 2, self._istr
        if not self._state & 1:
            if b:
                _f(_d, n)
            return
        if b:
            _g(_m, n)
        if f & 1:
            # dont_release_loop_on_finalization: force dont_allow_reuse on for this exit.
            # OR instead of XOR, so an already set bit is not switched off.
            self._flags = f | 0x400
        if self.__exit__(*exc_info()) and b:
            _g(_w, n)

    def __reduce__(self, /):
        # Pickle as "from_flags(flags)"; entered state and loop are not carried over.
        return self.from_flags, (self._flags,)

    def __repr__(self, _=c):
        return f'{_(self)}.from_flags({self._flags:#4x})'

    P.patch_method_signatures((__enter__, ''), (__exit__, P.xsig), (__del__, ''), (_get_unclosed_loop, 'factory={}')); P.patch_classmethod_signatures((from_flags, 'flags, /'), (__new__, f'*, {"={0}, ".join(constructor_args)}={{0}}'))
def f(n):
    # Factory for the two "disembowel" async generators; `n` is the name of the
    # removal method ('pop' or 'popleft') used to drain the container.
    async def adisembowel(it, /):  # pragma: no cover
        remove = getattr(it, n, None)
        if not callable(remove):
            # No such removal method: fall back to ordinary iteration.
            async for item in iter_to_agen(it):
                yield item
            return
        # Drain until the container reports itself empty.
        while it:
            yield remove()
        clear = getattr(it, 'clear', None)
        if callable(clear):
            pending = clear()
            # `clear` may be a coroutine function; await its result in that case.
            if I.iscoroutine(pending):
                await pending
    return adisembowel


adisembowel, adisembowelleft = f('pop'), f('popleft')
[docs]
async def safe_cancel_batch(t, /, *, callback=None, disembowel=False, raising=False, _=c):
    # Cancel every not-yet-done future/task produced by iterating `t`, then wait for all
    # of the cancelled ones to settle.
    #
    #   callback   -- optional callable invoked with each settled outcome (result or
    #                 exception, as returned by gather(return_exceptions=True)); a
    #                 coroutine returned by the callback is awaited.
    #   disembowel -- iterate `t` with adisembowel (draining it) rather than iter_to_agen.
    #   raising    -- when a callback is given, raise a BaseExceptionGroup built from the
    #                 exceptions the callback invocations produced.
    # Returns None. `_` is the name-formatting helper used for the audit event.
    audit('asyncutils.base.safe_cancel_batch', _(t))
    cancelled = []
    async for fut in (adisembowel if disembowel else iter_to_agen)(t):
        if not fut.done():
            fut.cancel()
            cancelled.append(fut)
    outcomes = await I.gather(*cancelled, return_exceptions=True)
    if callback is None:
        return

    async def invoke(outcome, /, _=callback):
        result = _(outcome)
        return (await result) if I.iscoroutine(result) else result

    results = await I.gather(*map(invoke, outcomes), return_exceptions=True)
    if not raising:
        return
    failures = [x for x in results if isinstance(x, BaseException)]
    if failures and (flat := tuple(A.unnest_reverse(*failures))):
        # Report the number of failed callbacks. The previous message used the count of
        # all callback results (successes included) and compared the flattened length
        # against that same number, so it mislabelled the outcome.
        # NOTE(review): assumes A.unnest_reverse flattens exception groups - confirm.
        count = len(failures)
        summary = f'flattened {count} exception (groups)' if len(flat) != count else f'collected {count} exceptions'
        raise BaseExceptionGroup(f'asyncutils.base.safe_cancel_batch: {summary} thrown by callback function {callback!r}', flat)
[docs]
# Async generator that re-exposes `it`, a synchronous or asynchronous iterable, as an
# async generator.
#
# Public parameters:
#   it                    -- object to iterate.
#   sentinel              -- iteration stops once `c(item, sentinel)` is truthy (c is
#                            H.check). With the default `_NO_DEFAULT`, the async-iterable
#                            branch performs no sentinel check.
#   use_existing_executor -- None defers to the context's
#                            ITER_TO_AGEN_DEFAULT_USE_EXISTING_EXECUTOR.
#   create_executor       -- None defers to ITER_TO_AGEN_DEFAULT_MAY_CREATE_EXECUTOR.
#   strict                -- None defers to ITER_TO_AGEN_DEFAULT_STRICT; when truthy the
#                            async-iteration branch is skipped even if `it` has __aiter__.
# The remaining keywords (a, b, c, s, h, w, _) are helpers captured at definition time.
# `_` is an anonymous class wrapping the sync iterator:
#   * bool(wrapper) is b(iterator, 'send', 'throw', 'close') (b is H.check_methods,
#     presumably "has these methods" - confirm);
#   * as a context manager its __exit__ returns False when there is no exception; for a
#     RuntimeError it suppresses only the two StopIteration-related messages listed in
#     the frozenset; for any other exception it calls the iterator's close() (on
#     StopAsyncIteration) or throw(v) (otherwise) when available and then returns True.
# Raises TypeError when `it` cannot be iterated by either branch.
async def iter_to_agen(it, sentinel=_NO_DEFAULT, *, use_existing_executor=None, create_executor=None, strict=None, a=c, b=b, c=H.check, s=H.create_executor, h=H.get_loop_and_set, w=L.debug, _=type('', (), {'__slots__': ('it',), '__init__': lambda self, it: setattr(self, 'it', it), '__bool__': lambda self, _=b: _(self.it, 'send', 'throw', 'close'), '__enter__': lambda self: None, '__exit__': lambda self, t, v, b, /, _=frozenset(('StopIteration interacts badly with generators and cannot be raised into a Future', 'async generator raised StopIteration')): False if t is None else str(v) in _ if t is RuntimeError else (((True if (C := getattr(self.it, 'close', None)) is None else C()) if t is StopAsyncIteration else (True if (T := getattr(self.it, 'throw', None)) is None else T(v))) or True)})): # noqa: ARG005,C901,PLR0912
    # ruff: disable[ASYNC119]
    audit('asyncutils.base.iter_to_agen', a(it))
    # Fast path for the exact types listed in Z.s: emit items in chunks of 0x400 and
    # await A.yield_to_event_loop after each chunk. (`_` is rebound as a loop variable
    # here, which is harmless because this branch returns.)
    if type(it) in Z.s:
        for i in batched(it, 0x400):
            for _ in i: yield _
            await A.yield_to_event_loop
        return
    C = A.getcontext()
    # Async-iterable branch, taken unless strict mode is in effect.
    if b(it, '__aiter__') and not (C.ITER_TO_AGEN_DEFAULT_STRICT if strict is None else strict):
        if sentinel is _NO_DEFAULT:
            async for _ in it: yield _
        elif b(it, 'asend', 'athrow', 'aclose'):
            # Full async-generator protocol: values sent into this generator are
            # forwarded to `it` through asend().
            # NOTE(review): if `it` finishes before the sentinel is seen, asend() raises
            # StopAsyncIteration inside this async generator, which Python reports to the
            # consumer as RuntimeError - confirm this is intended.
            l = await (_ := it.asend)(None)
            while not c(l, sentinel): l = await _((yield l))
        else:
            async for l in it:
                if c(l, sentinel): break
                yield l
        return
    elif not b(it, '__iter__'): raise TypeError(f'asyncutils.base.iter_to_agen: cannot iterate over {it!r} synchronously or asynchronously')
    # Synchronous branch: `g` wraps the iterator in the helper class described above,
    # `e` is the executor to run blocking steps in (None means step inline).
    e, g = None, _(it := iter(it))
    if create_executor is None: create_executor = C.ITER_TO_AGEN_DEFAULT_MAY_CREATE_EXECUTOR
    if C.ITER_TO_AGEN_DEFAULT_USE_EXISTING_EXECUTOR if use_existing_executor is None else use_existing_executor:
        # An executor may be cached as an attribute on this function.
        if (e := getattr(iter_to_agen, 'executor', None)) is None:
            if create_executor: e = s(iter_to_agen)
            else: w('asyncutils.base.iter_to_agen: no existing executor')
    elif create_executor: e = s(iter_to_agen, False)
    with g:
        if e is None:
            # Inline stepping, awaiting A.yield_to_event_loop after every item.
            if g:
                # NOTE(review): StopIteration from send() reaches g.__exit__, which
                # forwards it via it.throw(v); verify how exhaustion is meant to surface.
                l = (_ := it.send)(None)
                while not c(l, sentinel): l = _((yield l)); await A.yield_to_event_loop
            else:
                while not c(l := next(it, sentinel), sentinel): yield l; await A.yield_to_event_loop
        else:
            # r(fn, *args) builds a callable that schedules fn(*args, ...) on the
            # executor through the loop's run_in_executor.
            def r(*a, _=h().run_in_executor, e=e): return partial(_, e, *a)
            if g:
                l = await (_ := r(it.send))(None)
                while True:
                    if c(l, sentinel): break
                    l = await _((yield l))
            else:
                _ = r(next, it)
                while True:
                    if c((l := await _()), sentinel): break
                    yield l
    # ruff: enable[ASYNC119]
[docs]
def aiter_to_gen(ait, *, use_futures=None, loop=None, strict=None, a=c, b=b, g=H.get_loop_and_set):
    # Generator that re-exposes `ait` (async or sync iterable) as a synchronous generator.
    #
    #   use_futures -- None defers to the context's AITER_TO_GEN_DEFAULT_ALLOW_FUTURES;
    #                  needed when `loop` is already running.
    #   loop        -- event loop to drive; None obtains one from `g()`.
    #   strict      -- None defers to AITER_TO_GEN_DEFAULT_STRICT; when falsy an object
    #                  with __iter__ is simply delegated to.
    # Raises TypeError when `ait` has neither protocol, RuntimeError when the loop is
    # running and futures are not allowed. a, b and g are helpers bound at definition.
    audit('asyncutils.base.aiter_to_gen', a(ait))
    ctx, chain = A.getcontext(), I.futures._chain_future
    if b(ait, '__iter__'):
        enforce = ctx.AITER_TO_GEN_DEFAULT_STRICT if strict is None else strict
        if not enforce:
            yield from ait
            return
    if not b(ait, '__aiter__'):
        raise TypeError(f'asyncutils.base.aiter_to_gen: cannot iterate over {ait!r} synchronously or asynchronously')
    ait = aiter(ait)
    two_way = b(ait, 'asend', 'athrow', 'aclose')
    # The loops below only end through an exception; the surrounding context manager
    # is what handles the end of the async iterator.
    with A.ignore_stopaiteration:
        if loop is None:
            loop = g()
        if not loop.is_running():
            run = loop.run_until_complete
            if two_way:
                step, sent = ait.asend, None
                while True:
                    sent = yield run(step(sent))
            else:
                step = ait.__anext__
                while True:
                    yield run(step())
        else:
            allowed = ctx.AITER_TO_GEN_DEFAULT_ALLOW_FUTURES if use_futures is None else use_futures
            if not allowed:
                raise RuntimeError(f'asyncutils.base.aiter_to_gen: cannot convert async iterator {ait!r} to sync in running event loop without using futures')

            def call(*args, f, c=loop.create_task, g=chain, t=I.Future):
                # Schedule f(*args) as a task, chain it to a fresh future and read that
                # future's result.
                task = c(f(*args))
                fut = t()
                return g(task, fut) or fut.result()

            if two_way:
                step, sent = partial(call, f=ait.asend), None
                while True:
                    sent = yield step(sent)
            else:
                step = partial(call, f=ait.__anext__)
                while True:
                    yield step()
[docs]
async def take(it, n=None, *, default=_NO_DEFAULT, _=L.debug, m='asyncutils.base.take: ran out of items'):
    # Async generator yielding at most `n` items of `it` (all of them when n is None).
    # If `it` runs out early: raise A.ItemsExhausted(m) when default is RAISE, log `m`
    # through `_` when default is _NO_DEFAULT, otherwise pad with `default` up to `n`.
    if n is None:
        async for item in iter_to_agen(it):
            yield item
        return
    if n == 0:
        return
    # `remaining` counts down from n-1; it is how many items are still owed after the
    # one just yielded (and stays at n if the source is empty).
    remaining = n
    async for remaining, item in aenumerate(it, n - 1, step=-1):  # noqa: B020,PLR1704
        yield item
        if remaining == 0:
            return
    if default is RAISE:
        raise A.ItemsExhausted(m)
    if default is _NO_DEFAULT:
        _(m)
        return
    for filler in repeat(default, remaining):
        yield filler
[docs]
async def collect(it, n=None, *, default=_NO_DEFAULT, _='asyncutils.base.collect: ran out of items'):
    # Gather what take(it, n, default=default) yields into a new list; `_` is the
    # message take() uses when the source runs out.
    gathered = []
    async for item in take(it, n, default=default, m=_):
        gathered.append(item)
    return gathered
[docs]
async def collect_into(out, it, n=None, *, default=_NO_DEFAULT, _='asyncutils.base.collect_into: ran out of items'):
    # Append what take(it, n, default=default) yields to `out` through its append();
    # `_` is the message take() uses when the source runs out. Returns None.
    push = out.append  # resolved once, before iteration starts
    async for item in take(it, n, default=default, m=_):
        push(item)
[docs]
async def drop(it, n, *, raising=False, _=L.debug, m='asyncutils.base.drop: ran out of items'):
    # Async generator that skips the first `n` items of `it` and yields the rest.
    # If no item with index `n` exists: raise A.ItemsExhausted(m) when `raising`,
    # otherwise log `m` through `_` and finish.
    source = iter_to_agen(it)
    if n != 0:
        reached = False
        async for index, item in aenumerate(source):
            if index == n:
                yield item
                reached = True
                break
        if not reached:
            if raising:
                raise A.ItemsExhausted(m)
            _(m)
            return
    # Everything after the skipped prefix comes straight from the shared source.
    async for item in source:
        yield item
[docs]
async def aenumerate(it, start=0, *, step=1):
    # Async counterpart of enumerate(): yields (counter, item) pairs, the counter
    # beginning at `start` and advancing by `step` after each item.
    counter = start
    async for item in iter_to_agen(it):
        yield counter, item
        counter += step
# Attach the public signature text for each function, leaving out the helper keywords
# that are captured as defaults. `take` is advertised with `n=None`, matching its
# definition (the entry previously listed `n` as required).
P.patch_function_signatures((safe_cancel_batch, 'batch, /, *, callback=None, disembowel=False, raising=False'), (iter_to_agen, 'it, sentinel={}, *, use_existing_executor=None, create_executor=None, strict=None'), (aiter_to_gen, 'ait, *, use_futures=None, loop=None, strict=None'), (collect, 'it, n=None, *, default={}'), (take, 'it, n=None, *, default={}'), (drop, 'it, n, *, raising=False'))
# yield_to_event_loop: the single instance of an anonymous class. Calling the class
# again returns this same object, awaiting it runs a generator that performs one bare
# `yield`, and repr/str/__reduce__ all return 'asyncutils.base.yield_to_event_loop'.
# The walrus leaves the `__await__` lambda bound to the module-level name `_`.
# sleep_forever: asyncio.sleep with float('inf') bound as its first argument.
yield_to_event_loop, sleep_forever = object.__new__(type('', (), {'__new__': lambda _: yield_to_event_loop, '__await__': (_ := lambda _: (yield)), **dict.fromkeys(('__repr__', '__str__', '__reduce__'), lambda _, r='asyncutils.base.yield_to_event_loop': r)})), I.sleep.__get__(float('inf'))
# dummy_task: build a function from that lambda's code object, renamed 'dummy_task' and
# with co_flags replaced by 0x161, call it with None and close the result immediately;
# the closed object stays bound as `dummy_task`.
# NOTE(review): 0x161 presumably selects the generator / iterable-coroutine code flags -
# confirm against the inspect.CO_* constants.
(dummy_task := type(_)(_.__code__.replace(co_flags=0x161, co_name=(_ := 'dummy_task'), co_qualname=_), globals())(None)).close()
# Remove the short helper aliases from the module namespace; the definitions above
# already captured what they need as default-argument values.
del f, _, P, L, b, c, H