# Names of the modules this file lists in its ``__lazy_modules__`` set.
__lazy_modules__ = frozenset({
    'asyncutils._internal.running_console',
    'functools',
})
2import asyncio as I, asyncutils as A
3from asyncutils.constants import _NO_DEFAULT
4from asyncutils._internal.helpers import check_methods, create_executor, get_loop_and_set, fullname
5from asyncutils._internal.patch import patch_function_signatures
6from asyncutils._internal.submodules import util_all as __all__
7from functools import partial, wraps
8from sys import audit, exc_info
[docs]
def avalify(v):
    """Build a coroutine function that ignores its arguments and returns ``v``.

    The returned function accepts any positional and keyword arguments.
    """
    async def g(*_args, **_kwargs):  # noqa: RUF029
        return v

    return g
# Constant coroutine functions: each ignores its arguments and returns the
# named value when awaited.
afalsify = avalify(False)
atruthify = avalify(True)
anullify = avalify(None)

# Singleton async context manager whose __aenter__/__aexit__ both return None.
# The class's own __new__ hands back the existing instance, so the instance
# itself is created through object.__new__ to bypass that override.
_anullcontext_namespace = {
    '__new__': lambda _, /: anullcontext,
    '__aenter__': anullify,
    '__aexit__': anullify,
}
anullcontext = object.__new__(type('anullcontext', (), _anullcontext_namespace))
del _anullcontext_namespace
[docs]
async def wrap_in_coro(aw, /):
    """Await ``aw`` from inside a real coroutine and return its result.

    Any exception matching ``A.CRITICAL`` raised while awaiting is replaced by
    ``A.Critical``; every other exception propagates unchanged.
    """
    try:
        outcome = await aw
    except A.CRITICAL:
        raise A.Critical
    return outcome
[docs]
def done_evt(*, evtcls=I.Event):
    """Return a new event created by ``evtcls`` that has already been set."""
    event = evtcls()
    event.set()
    return event
[docs]
def done_fut(res=None, *, futcls=I.Future):
    """Return a new future from ``futcls`` that is already completed.

    When ``A.exception_occurred(res)`` is true the future is failed with
    ``A.unwrap_exc(res)``; otherwise ``res`` becomes its result.
    """
    future = futcls()
    if A.exception_occurred(res):
        future.set_exception(A.unwrap_exc(res))
    else:
        future.set_result(res)
    return future
[docs]
async def locked_lock(*, lcls=I.Lock):
    """Create a lock with ``lcls``, acquire it, and return it still held."""
    lock = lcls()
    await lock.acquire()
    return lock
[docs]
def get_future(aw, loop=None):
    """Schedule ``aw`` as a task on ``loop`` and return the task.

    ``aw`` is routed through ``wrap_in_coro`` so any awaitable is accepted.
    When ``loop`` is None the loop comes from ``get_loop_and_set()``.
    """
    if loop is None:
        loop = get_loop_and_set()
    return loop.create_task(wrap_in_coro(aw))
[docs]
def new_eager_tasks(*aws):
    """Yield one task per awaitable after installing the eager task factory.

    This is a generator: nothing happens (neither the factory installation nor
    task creation) until it is iterated.  The factory stays installed on the
    loop returned by ``get_loop_and_set()`` afterwards.
    """
    loop = get_loop_and_set()
    loop.set_task_factory(I.eager_task_factory)
    schedule = partial(get_future, loop=loop)
    yield from map(schedule, aws)
22def _fcopy(f, /): return wraps(f)(lambda *a, **k: f(*a, **k)) # noqa: PLW0108
[docs]
def afcopy(f, /):
    """Return a copy of ``f`` whose call result is passed through ``wrap_in_coro``.

    The copy is a plain function returning a coroutine object and carries
    ``f``'s metadata via :func:`functools.wraps`.
    """
    @wraps(f)
    def forward(*a, **k):
        return wrap_in_coro(f(*a, **k))

    return forward
[docs]
def discard_retval(f, /):
    """Wrap coroutine function ``f`` so awaiting the wrapper always yields None.

    ``f`` is still called and awaited; only its return value is dropped.
    """
    @wraps(f)
    async def g(*a, **k):
        await f(*a, **k)

    return g
[docs]
def to_sync(f, /, loop=None, *, timeout=None):
    """Return a synchronous wrapper around the async callable ``f``.

    If ``f`` already carries a ``__sync__`` twin, that twin is returned as-is
    (``loop`` and ``timeout`` are then not applied).  Otherwise a new wrapper
    is built that calls ``f`` and hands the result to ``sync_await`` with the
    given ``loop`` and ``timeout``.  The wrapper exposes an async copy of ``f``
    as ``__async__``, and that copy points back through ``__sync__``.

    Emits the ``asyncutils.util.to_sync`` audit event.
    """
    audit('asyncutils.util.to_sync', fullname(f))
    # Look the twin up under a separate name: `(f := getattr(...)) is not f`
    # rebinds `f` before the right-hand operand is read, so it is always False.
    twin = getattr(f, '__sync__', f)
    if twin is not f:
        return twin

    @wraps(f)
    def r(*a, **k):
        return sync_await(f(*a, **k), timeout=timeout, loop=loop)

    g = afcopy(f)
    g.__sync__ = r  # ty: ignore[unresolved-attribute]
    r.__async__ = g  # ty: ignore[unresolved-attribute]
    return r
[docs]
def to_sync_from_loop(loop):
    """Return ``to_sync`` with its ``loop`` keyword pre-bound to ``loop``."""
    bound = partial(to_sync, loop=loop)
    return bound
32def _set_call(F, f, /): F.set_result(f())
[docs]
def transient_block(l, f, /, *a, _threadsafe_=False, **k):
    """Schedule ``f(*a, **k)`` to run on loop ``l`` and return a future for it.

    The call is queued with ``l.call_soon`` (or ``l.call_soon_threadsafe`` when
    ``_threadsafe_`` is true); ``_set_call`` stores the outcome on the
    returned future, which is created by ``l.create_future()``.
    """
    schedule = l.call_soon_threadsafe if _threadsafe_ else l.call_soon
    F = l.create_future()
    schedule(_set_call, F, partial(f, *a, **k))
    return F
[docs]
def transient_block_from_loop(loop, *, threadsafe=False):
    """Return ``transient_block`` pre-bound to ``loop``.

    ``threadsafe`` is forwarded as the ``_threadsafe_`` keyword.
    """
    bound = partial(transient_block, loop, _threadsafe_=threadsafe)
    return bound
[docs]
def sync_await(aw, loop=None, *, never_block=True, timeout=None):
    """Wait synchronously for awaitable ``aw`` and return its result.

    ``loop`` defaults to ``get_loop_and_set()``.  Emits the
    ``asyncutils.util.sync_await`` audit event.

    * If ``never_block`` is true or ``loop`` is running: when ``loop`` is the
      loop running in the current thread, ``A.Deadlock`` is raised through
      ``A.raise_exc``; otherwise ``aw`` is submitted with
      ``run_coroutine_threadsafe`` and the calling thread waits up to
      ``timeout`` for the result.
    * Otherwise ``loop.run_until_complete`` drives ``aw`` under
      ``asyncio.wait_for`` with ``timeout``.
    """
    audit('asyncutils.util.sync_await', fullname(aw))
    if loop is None:
        loop = get_loop_and_set()
    if never_block or loop.is_running():
        if loop is I._get_running_loop():
            return A.raise_exc(A.Deadlock, 'asyncutils.util.sync_await: cannot await on the current loop without blocking')
        pending = I.run_coroutine_threadsafe(wrap_in_coro(aw), loop)
        return pending.result(timeout)
    bounded = I.wait_for(I.ensure_future(aw, loop=loop), timeout)
    return loop.run_until_complete(bounded)
[docs]
def semaphore(bounded=False, workers=None):
    """Create an asyncio synchronisation primitive admitting ``workers`` holders.

    ``workers`` defaults to ``A.getcontext().SEMAPHORE_DEFAULT_VALUE``.
    Unbounded requests get a ``Semaphore``.  Bounded requests get a
    ``BoundedSemaphore``, except that a single worker gets a plain ``Lock``.
    """
    if workers is None:
        workers = A.getcontext().SEMAPHORE_DEFAULT_VALUE
    if not bounded:
        return I.Semaphore(workers)
    if workers == 1:
        return I.Lock()
    return I.BoundedSemaphore(workers)
[docs]
def lockf(f, /, lf=I.Lock, _lc=__import__('weakref').WeakKeyDictionary()):  # noqa: B008
    """Wrap coroutine function ``f`` so its calls run one at a time under a lock.

    The lock is created with ``lf`` the first time ``f`` is seen and cached in
    ``_lc`` (a weak-key mapping shared across calls), so wrapping the same
    ``f`` again reuses the same lock.
    """
    l = _lc.get(f)
    if l is None:
        l = lf()
        _lc[f] = l

    @wraps(f)
    async def r(*a, **k):
        async with l:
            return await f(*a, **k)

    return r
[docs]
def to_async(f, /):
    """Return an async wrapper that runs the sync callable ``f`` in an executor.

    If ``f`` already carries an ``__async__`` twin, that twin is returned
    as-is.  Otherwise the wrapper submits ``f`` to ``to_async.executor``
    (created through ``create_executor(to_async)`` when that attribute is
    missing or None).  The wrapper exposes a sync copy of ``f`` as
    ``__sync__``, and that copy points back through ``__async__``.

    Emits the ``asyncutils.util.to_async`` audit event.
    """
    audit('asyncutils.util.to_async', fullname(f))
    # Look the twin up under a separate name: `(f := getattr(...)) is not f`
    # rebinds `f` before the right-hand operand is read, so it is always False.
    twin = getattr(f, '__async__', f)
    if twin is not f:
        return twin
    e = getattr(to_async, 'executor', None)
    if e is None:
        e = create_executor(to_async)
    # NOTE(review): the loop is resolved once, at wrap time; confirm that the
    # wrapper is only ever awaited on this same loop.
    r = partial(get_loop_and_set().run_in_executor, e)

    @wraps(f)
    async def h(*a, **k):
        return await r(partial(f, *a, **k))

    g = _fcopy(f)
    g.__async__ = h  # ty: ignore[unresolved-attribute]
    h.__sync__ = g  # ty: ignore[unresolved-attribute]
    return h
[docs]
async def aiter_from_f(f, s=_NO_DEFAULT, /):
    """Asynchronously yield the results of repeatedly awaiting ``f()``.

    Iteration stops (without yielding it) at the first result that is ``s`` or
    compares equal to ``s``.
    """
    while True:
        r = await f()
        if r is s or r == s:
            return
        yield r
[docs]
async def safe_cancel(t, /):
    """Cancel ``t`` if it is still pending and wait until it has finished.

    Completion is observed through a done-callback that resolves a private
    future on ``t``'s loop, so the outcome of ``t`` itself (including
    cancellation) is not raised here.  The callback is always removed again.
    """
    finished = t.get_loop().create_future()

    def notify(_):
        if finished.done():
            return
        finished.set_result(None)

    t.add_done_callback(notify)
    if not t.done():
        t.cancel()
    try:
        await finished
    finally:
        t.remove_done_callback(notify)
66class DualContextManager:
67 __slots__ = '_aentered', '_ce', '_entered', '_gen', '_st', '_ue'
68 def __init__(self, /, *_): self._gen, self._ce, self._ue, self._st = _; self._entered = self._aentered = False
69 def __enter__(self):
70 if self._aentered: raise RuntimeError('asyncutils.util.dualcontextmanager: context manager already entered asynchronously')
71 if self._entered: raise RuntimeError('asyncutils.util.dualcontextmanager: context manager already entered')
72 try: self._gen = g = A.aiter_to_gen(self._gen, strict=self._st, use_futures=True); self._entered = True; return next(g)
73 except StopIteration: raise RuntimeError("asyncutils.util.dualcontextmanager: generator didn't yield") from None
74 def __exit__(self, t, v, b, /):
75 if self._aentered: raise RuntimeError('asyncutils.util.dualcontextmanager: cannot exit async context manager synchronously')
76 if not self._entered: raise RuntimeError('asyncutils.util.dualcontextmanager: context manager was never entered')
77 g = self._gen
78 if t is None:
79 try: next(g)
80 except StopIteration: return False
81 try: raise RuntimeError("asyncutils.util.dualcontextmanager: generator didn't stop")
82 finally: g.close()
83 if v is None: v = t()
84 try: g.throw(v)
85 except BaseException as e:
86 f = e is v
87 if isinstance(e, StopIteration): return not f
88 if f or (isinstance(e, RuntimeError) and isinstance(v, StopIteration) and e.__cause__ is (e := v)): e.__traceback__ = b; return False
89 raise
90 try: raise RuntimeError("asyncutils.util.dualcontextmanager: generator didn't stop after throw")
91 finally: g.close()
92 def __aenter__(self):
93 if self._aentered: raise RuntimeError('asyncutils.util.dualcontextmanager: async context manager already entered')
94 if self._entered: raise RuntimeError('asyncutils.util.dualcontextmanager: async context manager already entered synchronously')
95 try: self._gen = g = A.iter_to_agen(self._gen, strict=self._st, use_existing_executor=self._ue, create_executor=self._ce); self._aentered = True; return anext(g)
96 except StopAsyncIteration: raise RuntimeError("asyncutils.util.dualcontextmanager: async generator didn't yield") from None
97 async def __aexit__(self, t, v, b, /):
98 if self._entered: raise RuntimeError('asyncutils.util.dualcontextmanager: cannot exit sync context manager asynchronously')
99 if not self._aentered: raise RuntimeError('asyncutils.util.dualcontextmanager: async context manager was never entered')
100 g = self._gen
101 if t is None:
102 try: await anext(g)
103 except StopAsyncIteration: return False
104 try: raise RuntimeError("asyncutils.util.dualcontextmanager: async generator didn't stop")
105 finally: await g.aclose()
106 if v is None: v = t()
107 try: await g.athrow(v)
108 except BaseException as e:
109 f = e is v
110 if isinstance(e, StopAsyncIteration): return not f
111 if f or (isinstance(e, RuntimeError) and isinstance(v, StopAsyncIteration) and e.__cause__ is (e := v)): e.__traceback__ = b; return False
112 raise
113 try: raise RuntimeError("asyncutils.util.dualcontextmanager: async generator didn't stop after athrow")
114 finally: await g.aclose()
[docs]
def dualcontextmanager(f=None, /, _=DualContextManager, *, use_existing_executor=None, create_executor=None, strict=None):
    """Decorate generator function ``f`` so each call returns a dual context manager.

    Usable bare (``@dualcontextmanager``) or with keyword options
    (``@dualcontextmanager(strict=True)``).  Any option left as None is read,
    at call time, from the matching ``DUAL_CONTEXT_MANAGER_DEFAULT_*``
    attribute of ``A.getcontext()``.  ``_`` is the manager class and is not
    part of the public signature.
    """
    if f is None:
        return lambda f, /: dualcontextmanager(f, use_existing_executor=use_existing_executor, create_executor=create_executor, strict=strict)

    @wraps(f)
    def make(*a, **k):
        c = A.getcontext()
        gen = f(*a, **k)
        ue = c.DUAL_CONTEXT_MANAGER_DEFAULT_USE_EXISTING_EXECUTOR if use_existing_executor is None else use_existing_executor
        ce = c.DUAL_CONTEXT_MANAGER_DEFAULT_MAY_CREATE_EXECUTOR if create_executor is None else create_executor
        st = c.DUAL_CONTEXT_MANAGER_DEFAULT_STRICT if strict is None else strict
        # The manager unpacks its positionals as (gen, create-executor,
        # use-existing-executor, strict), so `ce` must come before `ue`.
        return _(gen, ce, ue, st)

    return make
[docs]
def aawcmf2dcmff(**d):
    """Build a decorator that turns a context-manager factory into a dual one.

    ``d`` is forwarded to ``dualcontextmanager``.  The returned decorator takes
    a callable ``f`` whose result is a context manager or an awaitable
    producing one, and returns ``f`` wrapped so that each call produces a dual
    context manager:

    * an object with ``__aenter__``/``__aexit__`` (per ``check_methods``) is
      driven with ``async with``;
    * anything else is treated as a sync context manager whose ``__enter__``
      and ``__exit__`` are run in ``aawcmf2dcmff.executor`` (created through
      ``create_executor(aawcmf2dcmff)`` when missing or None).
    """
    def f(f, /, _=dualcontextmanager(**d)):  # noqa: B008
        async def g(*a, **k):
            c = f(*a, **k)
            # Await the result when it is awaitable; presumably
            # A.ignore_typeerrs swallows the TypeError raised otherwise.
            with A.ignore_typeerrs: c = await c
            if check_methods(c, '__aenter__', '__aexit__'):
                async with c as r: yield r; return  # noqa: ASYNC119
            if (e := getattr(aawcmf2dcmff, 'executor', None)) is None: e = create_executor(aawcmf2dcmff)
            h = partial(get_loop_and_set().run_in_executor, e)
            # Submit the bound method itself; calling it here would run
            # __enter__ on the loop thread and submit its return value instead.
            r = await h(c.__enter__)
            try:
                yield r
            except BaseException as exc:
                # Honour a truthy __exit__ result, as `async with` does above.
                if not await h(c.__exit__, type(exc), exc, exc.__traceback__):
                    raise
            else:
                await h(c.__exit__, None, None, None)
        return _(g)
    f.__text_signature__ = '(f, /)'  # ty: ignore[unresolved-attribute]
    return f
# `aawcmf2dcmf` is the decorator built with no option overrides; `dcm` is the
# `dualcontextmanager` decorator stored as that function's first default.
aawcmf2dcmf = aawcmf2dcmff()
dcm = aawcmf2dcmf.__defaults__[0]
ignore_cancellation = A.IgnoreErrors(I.CancelledError)

# Advertise signatures that leave out the private cache / class parameters.
patch_function_signatures(
    (lockf, 'f, /, lf={}'),
    (dualcontextmanager, 'f=None, /, *, use_existing_executor=None, create_executor=None, strict=None'),
)

# The class stays reachable only through `dualcontextmanager`'s default argument.
del DualContextManager