1# ty: ignore[unresolved-attribute]
2__lazy_modules__ = frozenset(('asyncutils._internal.running_console', 'functools'))
3import asyncio as I, asyncutils as A
4from asyncutils.constants import _NO_DEFAULT
5from asyncutils._internal.helpers import check_methods, create_executor, get_loop_and_set, fullname
6from asyncutils._internal.patch import patch_function_signatures
7from asyncutils._internal.submodules import util_all as __all__
8from functools import partial, wraps
9from sys import audit, exc_info
[docs]
def avalify(v):
    """Build a coroutine function that always returns ``v``.

    The returned coroutine function accepts any positional and keyword
    arguments and ignores all of them.
    """
    async def g(*_args, **_kwargs):  # noqa: RUF029
        return v

    return g
# Argument-agnostic coroutine functions that return False, True and None respectively.
afalsify = avalify(False)
atruthify = avalify(True)
anullify = avalify(None)
# Singleton do-nothing async context manager: the instance is created with
# object.__new__, the type's own __new__ always hands back that same instance,
# and both __aenter__ and __aexit__ are the coroutine function returning None.
anullcontext = object.__new__(type('anullcontext', (), dict(
    __new__=lambda _, /: anullcontext,
    __aenter__=anullify,
    __aexit__=anullify,
)))
[docs]
async def wrap_in_coro(aw, /):
    """Await ``aw`` from inside a real coroutine and return its result.

    If awaiting raises an exception matching ``A.CRITICAL``, ``A.Critical``
    is raised in its place (the original stays attached as implicit context).
    """
    try:
        outcome = await aw
    except A.CRITICAL:
        raise A.Critical
    return outcome
[docs]
def done_evt(*, evtcls=I.Event):
    """Return a new ``evtcls()`` instance whose ``set()`` has already been called."""
    event = evtcls()
    event.set()
    return event
[docs]
def done_fut(res=None, *, futcls=I.Future):
    """Return a new ``futcls()`` future that is already completed from ``res``.

    When ``A.exception_occurred(res)`` is true the future is failed with
    ``A.unwrap_exc(res)``; otherwise ``res`` becomes its result.
    """
    future = futcls()
    if A.exception_occurred(res):
        future.set_exception(A.unwrap_exc(res))
    else:
        future.set_result(res)
    return future
[docs]
async def locked_lock(*, lcls=I.Lock):
    """Create an ``lcls()`` lock, await its ``acquire()``, and return it."""
    lock = lcls()
    await lock.acquire()
    return lock
[docs]
def get_future(aw, loop=None):
    """Schedule ``aw`` as a task and return that task.

    ``aw`` is wrapped with ``wrap_in_coro`` and handed to ``create_task`` on
    ``loop``; when ``loop`` is None the loop comes from ``get_loop_and_set()``.
    """
    if loop is None:
        loop = get_loop_and_set()
    return loop.create_task(wrap_in_coro(aw))
[docs]
def new_eager_tasks(*aws):
    """Yield one task per awaitable, created on a loop using the eager task factory.

    This is a generator function, so nothing happens until it is first
    iterated: at that point the loop from ``get_loop_and_set()`` has its task
    factory set to ``asyncio.eager_task_factory`` (it is not restored here),
    and each awaitable is turned into a task via ``get_future`` as it is
    consumed.
    """
    loop = get_loop_and_set()
    loop.set_task_factory(I.eager_task_factory)
    spawn = partial(get_future, loop=loop)
    yield from map(spawn, aws)
def _fcopy(f, /):
    """Return a distinct function object that forwards every call to ``f``.

    The copy carries ``f``'s metadata (via ``functools.wraps``) but has its
    own attribute namespace, so attributes can be set on it without touching
    ``f``.
    """
    @wraps(f)
    def forward(*a, **k):
        return f(*a, **k)

    return forward
[docs]
def afcopy(f, /):
    """Return a copy of ``f`` whose call result is passed through ``wrap_in_coro``.

    The copy is a plain function carrying ``f``'s metadata; calling it calls
    ``f`` with the same arguments and returns the coroutine object produced
    by ``wrap_in_coro`` around that result.
    """
    @wraps(f)
    def forward(*a, **k):
        return wrap_in_coro(f(*a, **k))

    return forward
[docs]
def to_sync(f, /, loop=None, *, timeout=None):
    """Return a synchronous wrapper around the awaitable-returning callable ``f``.

    Raises the audit event ``asyncutils.util.to_sync``. If ``f`` already has a
    ``__sync__`` attribute, that object is returned unchanged. Otherwise a
    wrapper is built that calls ``f`` and passes the result to ``sync_await``
    with the given ``loop`` and ``timeout``. The wrapper and an ``afcopy`` of
    ``f`` are cross-linked: ``wrapper.__async__`` is the copy and
    ``copy.__sync__`` is the wrapper.
    """
    audit('asyncutils.util.to_sync', fullname(f))
    # The lookup result needs its own name: rebinding ``f`` inside the
    # comparison made both operands the same object, so the shortcut could
    # never fire and ``f`` was silently replaced by its ``__sync__`` value.
    if (cached := getattr(f, '__sync__', f)) is not f:
        return cached

    @wraps(f)
    def sync_call(*a, **k):
        return sync_await(f(*a, **k), timeout=timeout, loop=loop)

    async_copy = afcopy(f)
    async_copy.__sync__ = sync_call
    sync_call.__async__ = async_copy
    return sync_call
[docs]
def to_sync_from_loop(loop):
    """Return ``to_sync`` pre-bound to ``loop`` as a ``functools.partial``."""
    bound_to_sync = partial(to_sync, loop=loop)
    return bound_to_sync
def _set_call(F, f, /):
    """Call ``f()`` and complete the future ``F`` with the outcome.

    A normal return value becomes the future's result. An ``Exception``
    raised by ``f`` is stored on the future instead of escaping, so that a
    waiter is woken with the error rather than left pending forever.
    Non-``Exception`` ``BaseException``s (e.g. ``KeyboardInterrupt``) still
    propagate to the caller.
    """
    try:
        value = f()
    except Exception as exc:
        F.set_exception(exc)
    else:
        F.set_result(value)
[docs]
def transient_block(l, f, /, *a, _threadsafe_=False, **k):
    """Schedule ``f(*a, **k)`` as a callback on loop ``l`` and return a future for it.

    The callback is registered with ``l.call_soon_threadsafe`` when
    ``_threadsafe_`` is true and with ``l.call_soon`` otherwise. The returned
    future comes from ``l.create_future()`` and is handed, together with the
    bound call, to ``_set_call``.
    """
    schedule = l.call_soon_threadsafe if _threadsafe_ else l.call_soon
    future = l.create_future()
    schedule(_set_call, future, partial(f, *a, **k))
    return future
[docs]
def transient_block_from_loop(loop, *, threadsafe=False):
    """Return ``transient_block`` pre-bound to ``loop`` and the ``threadsafe`` choice."""
    bound = partial(transient_block, loop, _threadsafe_=threadsafe)
    return bound
[docs]
def sync_await(aw, loop=None, *, never_block=True, timeout=None):
    """Obtain the result of awaitable ``aw`` from synchronous code.

    Raises the audit event ``asyncutils.util.sync_await``. ``loop`` defaults
    to ``get_loop_and_set()``.

    * If ``never_block`` is false and ``loop`` is not running, ``aw`` is
      driven with ``loop.run_until_complete`` under ``asyncio.wait_for`` with
      ``timeout``.
    * Otherwise, if ``loop`` is the loop returned by
      ``A._get_running_loop()``, ``A.raise_exc`` is called with
      ``RuntimeError`` and an explanatory message.
    * Otherwise ``aw`` (wrapped by ``wrap_in_coro``) is submitted with
      ``asyncio.run_coroutine_threadsafe`` and the calling thread waits on
      the resulting future for up to ``timeout``.
    """
    audit('asyncutils.util.sync_await', fullname(aw))
    if loop is None:
        loop = get_loop_and_set()
    if not (never_block or loop.is_running()):
        pending = I.ensure_future(aw, loop=loop)
        return loop.run_until_complete(I.wait_for(pending, timeout))
    if loop is A._get_running_loop():
        return A.raise_exc(RuntimeError, 'asyncutils.util.sync_await: cannot await on the current loop without blocking')
    return I.run_coroutine_threadsafe(wrap_in_coro(aw), loop).result(timeout)
[docs]
def semaphore(bounded=False, workers=None):
    """Return an asyncio synchronisation primitive admitting ``workers`` holders.

    ``workers`` defaults to ``A.getcontext().SEMAPHORE_DEFAULT_VALUE``.
    Unbounded requests always get an ``asyncio.Semaphore``. Bounded requests
    get an ``asyncio.Lock`` when ``workers == 1`` and an
    ``asyncio.BoundedSemaphore`` otherwise.
    """
    if workers is None:
        workers = A.getcontext().SEMAPHORE_DEFAULT_VALUE
    if not bounded:
        return I.Semaphore(workers)
    if workers == 1:
        return I.Lock()
    return I.BoundedSemaphore(workers)
[docs]
def lockf(f, /, lf=I.Lock, _lc=__import__('weakref').WeakKeyDictionary()):  # noqa: B008
    """Wrap coroutine function ``f`` so every call runs while holding a lock.

    The lock is looked up in ``_lc``, a weak-keyed cache shared across calls
    and keyed by ``f`` alone; ``lf()`` is only called to create a lock when
    ``f`` has no entry yet. The returned coroutine function carries ``f``'s
    metadata.
    """
    lock = _lc.get(f)
    if lock is None:
        lock = lf()
        _lc[f] = lock

    @wraps(f)
    async def guarded(*a, **k):
        async with lock:
            return await f(*a, **k)

    return guarded
[docs]
def to_async(f, /):
    """Return a coroutine function that runs the blocking callable ``f`` in an executor.

    Raises the audit event ``asyncutils.util.to_async``. If ``f`` already has
    an ``__async__`` attribute, that object is returned unchanged. Otherwise
    the executor is taken from ``to_async.executor`` or, when that is missing
    or None, from ``create_executor(to_async)``. The coroutine function and
    an ``_fcopy`` of ``f`` are cross-linked: ``coro_fn.__sync__`` is the copy
    and ``copy.__async__`` is the coroutine function.
    """
    audit('asyncutils.util.to_async', fullname(f))
    # The lookup result needs its own name: rebinding ``f`` inside the
    # comparison made both operands the same object, so the shortcut could
    # never fire and ``f`` was silently replaced by its ``__async__`` value.
    if (cached := getattr(f, '__async__', f)) is not f:
        return cached
    executor = getattr(to_async, 'executor', None)
    if executor is None:
        executor = create_executor(to_async)
    # The loop is resolved once, here, not on each call of the wrapper.
    run = partial(get_loop_and_set().run_in_executor, executor)

    @wraps(f)
    async def h(*a, **k):
        return await run(partial(f, *a, **k))

    sync_copy = _fcopy(f)
    sync_copy.__async__ = h
    h.__sync__ = sync_copy
    return h
[docs]
async def aiter_from_f(f, s=_NO_DEFAULT, /):
    """Async generator yielding successive results of ``await f()``.

    Iteration ends, without yielding, at the first result that is identical
    to or compares equal to the sentinel ``s``.
    """
    while True:
        item = await f()
        if item is s or item == s:
            return
        yield item
[docs]
async def safe_cancel(t, /):
    """Cancel task ``t`` if it is not done and wait until it has finished.

    Completion is observed through a done-callback that resolves a private
    future on the task's own loop; the callback is removed again whether the
    wait ends normally or not. ``t``'s own outcome is not retrieved here.
    """
    waiter = t.get_loop().create_future()

    def release(_):
        if waiter.done():
            return
        waiter.set_result(None)

    t.add_done_callback(release)
    if not t.done():
        t.cancel()
    try:
        await waiter
    finally:
        t.remove_done_callback(release)
class DualContextManager:
    """Single-use context manager usable with either ``with`` or ``async with``.

    It wraps one generator-like object. On synchronous entry the object is
    passed through ``A.aiter_to_gen``; on asynchronous entry through
    ``A.iter_to_agen``. The first yielded value becomes the ``as`` target and
    exit handling mirrors the protocol of the standard generator-based
    context managers. Once entered one way, it cannot be entered again or
    exited the other way.
    """
    __slots__ = '_aentered', '_ce', '_entered', '_gen', '_st', '_ue'

    def __init__(self, /, *_):
        # Positional payload: (generator, use_existing_executor, create_executor, strict).
        # The two executor flags are unpacked in the order the factory passes
        # them, so ``_ue`` / ``_ce`` carry the values their names and the
        # keyword arguments in ``__aenter__`` say they do.
        self._gen, self._ue, self._ce, self._st = _
        self._entered = self._aentered = False

    def __enter__(self):
        """Enter synchronously and return the generator's first yielded value."""
        if self._aentered:
            raise RuntimeError('asyncutils.util.dualcontextmanager: context manager already entered asynchronously')
        if self._entered:
            raise RuntimeError('asyncutils.util.dualcontextmanager: context manager already entered')
        try:
            self._gen = g = A.aiter_to_gen(self._gen, strict=self._st, use_futures=True)
            self._entered = True
            return next(g)
        except StopIteration:
            raise RuntimeError("asyncutils.util.dualcontextmanager: generator didn't yield") from None

    def __exit__(self, t, v, b, /):
        """Resume (or throw into) the generator; return True to suppress the exception."""
        if self._aentered:
            raise RuntimeError('asyncutils.util.dualcontextmanager: cannot exit async context manager synchronously')
        if not self._entered:
            raise RuntimeError('asyncutils.util.dualcontextmanager: context manager was never entered')
        g = self._gen
        if t is None:
            try:
                next(g)
            except StopIteration:
                return False
            try:
                raise RuntimeError("asyncutils.util.dualcontextmanager: generator didn't stop")
            finally:
                g.close()
        if v is None:
            v = t()
        try:
            g.throw(v)
        except StopIteration as e:
            # The generator finished: suppress unless it is the very
            # StopIteration that was thrown in.
            return e is not v
        except BaseException as e:
            # Either the thrown exception came straight back out, or a
            # thrown StopIteration was converted to RuntimeError by the
            # generator machinery; in both cases let the original propagate.
            if e is v or (isinstance(e, RuntimeError) and isinstance(v, StopIteration) and e.__cause__ is v):
                v.__traceback__ = b
                return False
            raise
        try:
            raise RuntimeError("asyncutils.util.dualcontextmanager: generator didn't stop after throw")
        finally:
            g.close()

    async def __aenter__(self):
        """Enter asynchronously and return the generator's first yielded value."""
        if self._aentered:
            raise RuntimeError('asyncutils.util.dualcontextmanager: async context manager already entered')
        if self._entered:
            raise RuntimeError('asyncutils.util.dualcontextmanager: async context manager already entered synchronously')
        try:
            self._gen = g = A.iter_to_agen(self._gen, strict=self._st, use_existing_executor=self._ue, create_executor=self._ce)
            self._aentered = True
            # Await here so that StopAsyncIteration is raised inside this
            # ``try`` and can be translated into the RuntimeError below.
            return await anext(g)
        except StopAsyncIteration:
            raise RuntimeError("asyncutils.util.dualcontextmanager: async generator didn't yield") from None

    async def __aexit__(self, t, v, b, /):
        """Async counterpart of ``__exit__``."""
        if self._entered:
            raise RuntimeError('asyncutils.util.dualcontextmanager: cannot exit sync context manager asynchronously')
        if not self._aentered:
            raise RuntimeError('asyncutils.util.dualcontextmanager: async context manager was never entered')
        g = self._gen
        if t is None:
            try:
                await anext(g)
            except StopAsyncIteration:
                return False
            try:
                raise RuntimeError("asyncutils.util.dualcontextmanager: async generator didn't stop")
            finally:
                await g.aclose()
        if v is None:
            v = t()
        try:
            await g.athrow(v)
        except StopAsyncIteration as e:
            return e is not v
        except BaseException as e:
            if e is v or (isinstance(e, RuntimeError) and isinstance(v, StopAsyncIteration) and e.__cause__ is v):
                v.__traceback__ = b
                return False
            raise
        try:
            raise RuntimeError("asyncutils.util.dualcontextmanager: async generator didn't stop after athrow")
        finally:
            await g.aclose()
[docs]
def dualcontextmanager(f=None, /, _=DualContextManager, *, use_existing_executor=None, create_executor=None, strict=None):
    """Decorator turning a generator function into a dual context-manager factory.

    Usable bare (``@dualcontextmanager``) or with keyword options
    (``@dualcontextmanager(strict=...)``); in the latter case a decorator
    remembering the options is returned. Calling the decorated function
    builds the manager (``_``) from the generator ``f(*a, **k)`` followed by
    the three options, each of which falls back to the matching
    ``DUAL_CONTEXT_MANAGER_DEFAULT_*`` attribute of ``A.getcontext()`` when
    left as None.
    """
    if f is None:
        return lambda f, /: dualcontextmanager(f, use_existing_executor=use_existing_executor, create_executor=create_executor, strict=strict)

    @wraps(f)
    def factory(*a, **k):
        c = A.getcontext()
        if not c:
            # Same outcome as ``c and ...``: a falsy context is returned as-is.
            return c
        return _(
            f(*a, **k),
            c.DUAL_CONTEXT_MANAGER_DEFAULT_USE_EXISTING_EXECUTOR if use_existing_executor is None else use_existing_executor,
            c.DUAL_CONTEXT_MANAGER_DEFAULT_MAY_CREATE_EXECUTOR if create_executor is None else create_executor,
            c.DUAL_CONTEXT_MANAGER_DEFAULT_STRICT if strict is None else strict,
        )

    return factory
[docs]
def aawcmf2dcmff(**d):
    """Build a decorator converting a context-manager factory into a dual context-manager factory.

    ``d`` is forwarded to ``dualcontextmanager``; the resulting decorator is
    stored as the default of the returned function's ``_`` parameter. The
    returned function takes a callable ``f`` whose result is a sync or async
    context manager (awaited first where that succeeds) and returns the
    dual-context-manager factory built from it.
    """
    def f(f, /, _=dualcontextmanager(**d)):  # noqa: B008
        async def g(*a, **k):
            c = f(*a, **k)
            # presumably suppresses the TypeError raised when ``c`` is not
            # awaitable - confirm against A.ignore_typeerrs
            with A.ignore_typeerrs:
                c = await c
            if check_methods(c, '__aenter__', '__aexit__'):
                async with c as r:
                    yield r
                # Return after the ``async with`` so that an exception
                # suppressed by ``__aexit__`` ends the generator here rather
                # than falling through into the synchronous path below.
                return  # noqa: ASYNC119
            e = getattr(aawcmf2dcmff, 'executor', None)
            if e is None:
                e = create_executor(aawcmf2dcmff)
            h = partial(get_loop_and_set().run_in_executor, e)
            # Pass the bound method itself: the executor must make the
            # (potentially blocking) ``__enter__`` call, exactly as it does
            # for ``__exit__`` below.
            r = await h(c.__enter__)
            try:
                yield r
            finally:
                # NOTE(review): the value returned by ``__exit__`` is
                # discarded, so a sync manager cannot suppress an exception
                # through this path.
                await h(c.__exit__, *exc_info())
        return _(g)
    f.__text_signature__ = '(f, /)'
    return f
# Default converter, built with no options.
aawcmf2dcmf = aawcmf2dcmff()
# The decorator stored as the first default of the converter (its ``_`` parameter).
dcm = aawcmf2dcmf.__defaults__[0]
ignore_cancellation = A.IgnoreErrors(I.CancelledError)
# Replace the advertised signatures so the private trailing defaults are not shown.
patch_function_signatures(
    (lockf, 'f, /, lf={}'),
    (dualcontextmanager, 'f=None, /, *, use_existing_executor=None, create_executor=None, strict=None'),
)
# Drop the class name from the module namespace; ``dualcontextmanager`` still
# holds the class through its default argument.
del DualContextManager