Source code for asyncutils.util

  1__lazy_modules__ = frozenset(('asyncutils._internal.running_console', 'functools'))
  2import asyncio as I, asyncutils as A
  3from asyncutils.constants import _NO_DEFAULT
  4from asyncutils._internal.helpers import check_methods, create_executor, get_loop_and_set, fullname
  5from asyncutils._internal.patch import patch_function_signatures
  6from asyncutils._internal.submodules import util_all as __all__
  7from functools import partial, wraps
  8from sys import audit, exc_info
[docs] 9def avalify(v): 10 async def g(*_a, **_): return v # noqa: RUF029 11 return g
12afalsify, atruthify, anullify = map(avalify, (False, True, None)) 13anullcontext = object.__new__(type('anullcontext', (), {'__new__': lambda _, /: anullcontext, '__aenter__': anullify, '__aexit__': anullify}))
[docs] 14async def wrap_in_coro(aw, /): 15 try: return await aw 16 except A.CRITICAL: raise A.Critical
[docs] 17def done_evt(*, evtcls=I.Event): (E := evtcls()).set(); return E
[docs] 18def done_fut(res=None, *, futcls=I.Future): F = futcls(); F.set_exception(A.unwrap_exc(res)) if A.exception_occurred(res) else F.set_result(res); return F
[docs] 19async def locked_lock(*, lcls=I.Lock): await (l := lcls()).acquire(); return l
[docs] 20def get_future(aw, loop=None): return (get_loop_and_set() if loop is None else loop).create_task(wrap_in_coro(aw))
[docs] 21def new_eager_tasks(*aws): (l := get_loop_and_set()).set_task_factory(I.eager_task_factory); yield from map(partial(get_future, loop=l), aws)
22def _fcopy(f, /): return wraps(f)(lambda *a, **k: f(*a, **k)) # noqa: PLW0108
[docs] 23def afcopy(f, /): return wraps(f)(lambda *a, **k: wrap_in_coro(f(*a, **k)))
[docs] 24def discard_retval(f, /): 25 async def g(*a, **k): await f(*a, **k) 26 return wraps(f)(g)
[docs] 27def to_sync(f, /, loop=None, *, timeout=None): 28 audit('asyncutils.util.to_sync', fullname(f)) 29 if (f := getattr(f, '__sync__', f)) is not f: return f 30 (g := afcopy(f)).__sync__ = r = wraps(f)(lambda *a, **k: sync_await(f(*a, **k), timeout=timeout, loop=loop)); r.__async__ = g; return r # ty: ignore[unresolved-attribute]
[docs] 31def to_sync_from_loop(loop): return partial(to_sync, loop=loop)
32def _set_call(F, f, /): F.set_result(f())
[docs] 33def transient_block(l, f, /, *a, _threadsafe_=False, **k): (l.call_soon_threadsafe if _threadsafe_ else l.call_soon)(_set_call, F := l.create_future(), partial(f, *a, **k)); return F
[docs] 34def transient_block_from_loop(loop, *, threadsafe=False): return partial(transient_block, loop, _threadsafe_=threadsafe)
[docs] 35def sync_await(aw, loop=None, *, never_block=True, timeout=None): 36 audit('asyncutils.util.sync_await', fullname(aw)) 37 if loop is None: loop = get_loop_and_set() 38 return (A.raise_exc(A.Deadlock, 'asyncutils.util.sync_await: cannot await on the current loop without blocking') if loop is I._get_running_loop() else I.run_coroutine_threadsafe(wrap_in_coro(aw), loop).result(timeout)) if never_block or loop.is_running() else loop.run_until_complete(I.wait_for(I.ensure_future(aw, loop=loop), timeout))
[docs] 39def semaphore(bounded=False, workers=None): 40 if workers is None: workers = A.getcontext().SEMAPHORE_DEFAULT_VALUE 41 return (I.Lock() if workers == 1 else I.BoundedSemaphore(workers)) if bounded else I.Semaphore(workers)
[docs] 42def lockf(f, /, lf=I.Lock, _lc=__import__('weakref').WeakKeyDictionary()): # noqa: B008 43 if (l := _lc.get(f)) is None: _lc[f] = l = lf() 44 async def r(*a, **k): 45 async with l: return await f(*a, **k) 46 return wraps(f)(r)
[docs] 47def to_async(f, /): 48 audit('asyncutils.util.to_async', fullname(f)) 49 if (f := getattr(f, '__async__', f)) is not f: return f 50 if (e := getattr(to_async, 'executor', None)) is None: e = create_executor(to_async) 51 r = partial(get_loop_and_set().run_in_executor, e) 52 async def h(*a, **k): return await r(partial(f, *a, **k)) 53 g.__async__, h.__sync__ = wraps(f)(h), (g := _fcopy(f)); return h # ty: ignore[unresolved-attribute]
[docs] 54async def aiter_from_f(f, s=_NO_DEFAULT, /): 55 while True: 56 if (r := await f()) is s or r == s: break 57 yield r
[docs] 58async def safe_cancel(t, /): 59 F = t.get_loop().create_future() 60 def f(_): 61 if not F.done(): F.set_result(None) 62 t.add_done_callback(f) 63 if not t.done(): t.cancel() 64 try: await F 65 finally: t.remove_done_callback(f)
66class DualContextManager: 67 __slots__ = '_aentered', '_ce', '_entered', '_gen', '_st', '_ue' 68 def __init__(self, /, *_): self._gen, self._ce, self._ue, self._st = _; self._entered = self._aentered = False 69 def __enter__(self): 70 if self._aentered: raise RuntimeError('asyncutils.util.dualcontextmanager: context manager already entered asynchronously') 71 if self._entered: raise RuntimeError('asyncutils.util.dualcontextmanager: context manager already entered') 72 try: self._gen = g = A.aiter_to_gen(self._gen, strict=self._st, use_futures=True); self._entered = True; return next(g) 73 except StopIteration: raise RuntimeError("asyncutils.util.dualcontextmanager: generator didn't yield") from None 74 def __exit__(self, t, v, b, /): 75 if self._aentered: raise RuntimeError('asyncutils.util.dualcontextmanager: cannot exit async context manager synchronously') 76 if not self._entered: raise RuntimeError('asyncutils.util.dualcontextmanager: context manager was never entered') 77 g = self._gen 78 if t is None: 79 try: next(g) 80 except StopIteration: return False 81 try: raise RuntimeError("asyncutils.util.dualcontextmanager: generator didn't stop") 82 finally: g.close() 83 if v is None: v = t() 84 try: g.throw(v) 85 except BaseException as e: 86 f = e is v 87 if isinstance(e, StopIteration): return not f 88 if f or (isinstance(e, RuntimeError) and isinstance(v, StopIteration) and e.__cause__ is (e := v)): e.__traceback__ = b; return False 89 raise 90 try: raise RuntimeError("asyncutils.util.dualcontextmanager: generator didn't stop after throw") 91 finally: g.close() 92 def __aenter__(self): 93 if self._aentered: raise RuntimeError('asyncutils.util.dualcontextmanager: async context manager already entered') 94 if self._entered: raise RuntimeError('asyncutils.util.dualcontextmanager: async context manager already entered synchronously') 95 try: self._gen = g = A.iter_to_agen(self._gen, strict=self._st, use_existing_executor=self._ue, create_executor=self._ce); self._aentered = True; return anext(g) 96 except StopAsyncIteration: raise RuntimeError("asyncutils.util.dualcontextmanager: async generator didn't yield") from None 97 async def __aexit__(self, t, v, b, /): 98 if self._entered: raise RuntimeError('asyncutils.util.dualcontextmanager: cannot exit sync context manager asynchronously') 99 if not self._aentered: raise RuntimeError('asyncutils.util.dualcontextmanager: async context manager was never entered') 100 g = self._gen 101 if t is None: 102 try: await anext(g) 103 except StopAsyncIteration: return False 104 try: raise RuntimeError("asyncutils.util.dualcontextmanager: async generator didn't stop") 105 finally: await g.aclose() 106 if v is None: v = t() 107 try: await g.athrow(v) 108 except BaseException as e: 109 f = e is v 110 if isinstance(e, StopAsyncIteration): return not f 111 if f or (isinstance(e, RuntimeError) and isinstance(v, StopAsyncIteration) and e.__cause__ is (e := v)): e.__traceback__ = b; return False 112 raise 113 try: raise RuntimeError("asyncutils.util.dualcontextmanager: async generator didn't stop after athrow") 114 finally: await g.aclose()
[docs] 115def dualcontextmanager(f=None, /, _=DualContextManager, *, use_existing_executor=None, create_executor=None, strict=None): 116 if f is None: return lambda f, /: dualcontextmanager(f, use_existing_executor=use_existing_executor, create_executor=create_executor, strict=strict) 117 return wraps(f)(lambda *a, **k: (c := A.getcontext()) and _(f(*a, **k), c.DUAL_CONTEXT_MANAGER_DEFAULT_USE_EXISTING_EXECUTOR if use_existing_executor is None else use_existing_executor, c.DUAL_CONTEXT_MANAGER_DEFAULT_MAY_CREATE_EXECUTOR if create_executor is None else create_executor, c.DUAL_CONTEXT_MANAGER_DEFAULT_STRICT if strict is None else strict))
[docs] 118def aawcmf2dcmff(**d): 119 def f(f, /, _=dualcontextmanager(**d)): # noqa: B008 120 async def g(*a, **k): 121 c = f(*a, **k) 122 with A.ignore_typeerrs: c = await c 123 if check_methods(c, '__aenter__', '__aexit__'): 124 async with c as r: yield r; return # noqa: ASYNC119 125 if (e := getattr(aawcmf2dcmff, 'executor', None)) is None: e = create_executor(aawcmf2dcmff) 126 r = await (h := partial(get_loop_and_set().run_in_executor, e))(c.__enter__()) 127 try: yield r 128 finally: await h(c.__exit__, *exc_info()) 129 return _(g) 130 f.__text_signature__ = '(f, /)'; return f # ty: ignore[unresolved-attribute]
131dcm, ignore_cancellation = (aawcmf2dcmf := aawcmf2dcmff()).__defaults__[0], A.IgnoreErrors(I.CancelledError) 132patch_function_signatures((lockf, 'f, /, lf={}'), (dualcontextmanager, 'f=None, /, *, use_existing_executor=None, create_executor=None, strict=None')) 133del DualContextManager