Source code for asyncutils.compete

 1from asyncutils._internal import helpers as H, patch as P
 2from asyncutils._internal.submodules import compete_all as __all__
 3import asyncio as I, asyncutils as A
 4from asyncio.staggered import staggered_race
 5from sys import audit
[docs] 6async def first_completed(*C, ret_exc=False, timeout=None): 7 audit('asyncutils.compete.first_completed/start', L := len(C)); t = tuple(A.new_eager_tasks(*C)) 8 try: 9 async with I.timeout(timeout): 10 for F in (await I.wait(t, return_when='FIRST_COMPLETED'))[0]: return A.wrap_exc(e) if ret_exc and (e := F.exception()) else F.result() 11 finally: audit('asyncutils.compete.first_completed/end', L); await A.safe_cancel_batch(t)
[docs] 12async def race_with_callback(*C, winner=None, loser=None, timeout=None): 13 if not C: raise TypeError('asyncutils.compete.race_with_callback: pass in at least one coroutine') 14 audit('asyncutils.compete.race_with_callback/start', L := len(C)); d, p = await I.wait(A.new_eager_tasks(*C), return_when='FIRST_COMPLETED', timeout=timeout) 15 try: 16 if not d: return 17 w = d.pop().result() 18 if winner is not None and I.iscoroutine(r := winner(w)): await r 19 return w 20 finally: audit('asyncutils.compete.race_with_callback/end', L); await A.safe_cancel_batch(p, callback=loser)
[docs] 21async def multi_winner_race_with_callback(*C, timeout, winner=None, loser=None, _=__import__('operator').methodcaller('result')): # noqa: B008 22 if not C: raise TypeError('asyncutils.compete.multi_winner_race_with_callback: pass in at least one coroutine') 23 audit('asyncutils.compete.multi_winner_race_with_callback/start', L := len(C)); d, p = await I.wait(A.new_eager_tasks(*C), timeout=timeout); d = map(_, d) 24 try: 25 if winner is None: return list(d) 26 async def g(a, /, _=winner, f=(r := []).append): 27 if I.iscoroutine(x := _(a)): await x 28 f(a) 29 await I.gather(*map(g, d)); return r 30 except A.CRITICAL: raise A.Critical 31 finally: audit('asyncutils.compete.multi_winner_race_with_callback/end', L); await A.safe_cancel_batch(p, callback=loser)
[docs] 32def convert_to_coro_iter(cfs, *, loop=None, skip_invalid=None, corocheck=I.iscoroutine, futwrap=I.wrap_future, handle_aiter=None, handle_iter=None, _c=H.check_methods): 33 if handle_iter is None: from asyncutils import to_list as handle_iter 34 if handle_aiter is None: from asyncutils import to_list as handle_aiter 35 if skip_invalid is None: from asyncutils.context import CONVERT_TO_CORO_ITER_DEFAULT_SKIP_INVALID as skip_invalid # noqa: N811 36 for i in A.aiter_to_gen(cfs, loop=loop): 37 if corocheck(i): yield i; continue 38 try: i = futwrap(i, loop=loop) # noqa: PLW2901 39 except A.CRITICAL: raise A.Critical 40 except (AssertionError, TypeError): 41 if not _c(i, '__await__'): 42 if _c(i, '__aiter__'): yield handle_aiter(i) 43 elif _c(i, '__iter__'): yield handle_iter(i) 44 elif not skip_invalid: raise TypeError(f'asyncutils.compete.convert_to_coro_iter: invalid item in {cfs!r}: {i!r}') from None 45 continue 46 yield A.wrap_in_coro(i)
[docs] 47def enhanced_staggered_race(cfs, delay=None, *, loop=None): return staggered_race(map(lambda c: lambda: c, convert_to_coro_iter(cfs, loop=loop)), delay, loop=loop)
[docs] 48def enhanced_gather(it, return_exceptions=False, *, loop=None, _=I.gather): return _(*convert_to_coro_iter(it, loop=loop), return_exceptions=return_exceptions)
49P.patch_function_signatures((first_completed, '*coros, ret_exc=False, timeout=None, loop=None'), (race_with_callback, '*coros, winner=None, loser=None, timeout=None'), (multi_winner_race_with_callback, '*coros, timeout, winner=None, loser=None'), (convert_to_coro_iter, 'cfs, *, loop=None, skip_invalid=None, corocheck={0}, futwrap={0}, handle_aiter=None, handle_iter=None'), (enhanced_gather, 'it, return_exceptions=False, *, loop=None')) 50del H, P