1from asyncutils._internal import helpers as H, patch as P
2from asyncutils._internal.submodules import compete_all as __all__
3import asyncio as I, asyncutils as A
4from asyncio.staggered import staggered_race
5from sys import audit
[docs]
6async def first_completed(*C, ret_exc=False, timeout=None):
7 audit('asyncutils.compete.first_completed/start', L := len(C)); t = tuple(A.new_eager_tasks(*C))
8 try:
9 async with I.timeout(timeout):
10 for F in (await I.wait(t, return_when='FIRST_COMPLETED'))[0]: return A.wrap_exc(e) if ret_exc and (e := F.exception()) else F.result()
11 finally: audit('asyncutils.compete.first_completed/end', L); await A.safe_cancel_batch(t)
[docs]
12async def race_with_callback(*C, winner=None, loser=None, timeout=None):
13 if not C: raise TypeError('asyncutils.compete.race_with_callback: pass in at least one coroutine')
14 audit('asyncutils.compete.race_with_callback/start', L := len(C)); d, p = await I.wait(A.new_eager_tasks(*C), return_when='FIRST_COMPLETED', timeout=timeout)
15 try:
16 if not d: return
17 w = d.pop().result()
18 if winner is not None and I.iscoroutine(r := winner(w)): await r
19 return w
20 finally: audit('asyncutils.compete.race_with_callback/end', L); await A.safe_cancel_batch(p, callback=loser)
[docs]
21async def multi_winner_race_with_callback(*C, timeout, winner=None, loser=None, _=__import__('operator').methodcaller('result')): # noqa: B008
22 if not C: raise TypeError('asyncutils.compete.multi_winner_race_with_callback: pass in at least one coroutine')
23 audit('asyncutils.compete.multi_winner_race_with_callback/start', L := len(C)); d, p = await I.wait(A.new_eager_tasks(*C), timeout=timeout); d = map(_, d)
24 try:
25 if winner is None: return list(d)
26 async def g(a, /, _=winner, f=(r := []).append):
27 if I.iscoroutine(x := _(a)): await x
28 f(a)
29 await I.gather(*map(g, d)); return r
30 except A.CRITICAL: raise A.Critical
31 finally: audit('asyncutils.compete.multi_winner_race_with_callback/end', L); await A.safe_cancel_batch(p, callback=loser)
[docs]
32def convert_to_coro_iter(cfs, *, loop=None, skip_invalid=None, corocheck=I.iscoroutine, futwrap=I.wrap_future, handle_aiter=None, handle_iter=None, _c=H.check_methods):
33 if handle_iter is None: from asyncutils import to_list as handle_iter
34 if handle_aiter is None: from asyncutils import to_list as handle_aiter
35 if skip_invalid is None: from asyncutils.context import CONVERT_TO_CORO_ITER_DEFAULT_SKIP_INVALID as skip_invalid # noqa: N811
36 for i in A.aiter_to_gen(cfs, loop=loop):
37 if corocheck(i): yield i; continue
38 try: i = futwrap(i, loop=loop) # noqa: PLW2901
39 except A.CRITICAL: raise A.Critical
40 except (AssertionError, TypeError):
41 if not _c(i, '__await__'):
42 if _c(i, '__aiter__'): yield handle_aiter(i)
43 elif _c(i, '__iter__'): yield handle_iter(i)
44 elif not skip_invalid: raise TypeError(f'asyncutils.compete.convert_to_coro_iter: invalid item in {cfs!r}: {i!r}') from None
45 continue
46 yield A.wrap_in_coro(i)
[docs]
47def enhanced_staggered_race(cfs, delay=None, *, loop=None): return staggered_race(map(lambda c: lambda: c, convert_to_coro_iter(cfs, loop=loop)), delay, loop=loop)
[docs]
48def enhanced_gather(it, return_exceptions=False, *, loop=None, _=I.gather): return _(*convert_to_coro_iter(it, loop=loop), return_exceptions=return_exceptions)
49P.patch_function_signatures((first_completed, '*coros, ret_exc=False, timeout=None, loop=None'), (race_with_callback, '*coros, winner=None, loser=None, timeout=None'), (multi_winner_race_with_callback, '*coros, timeout, winner=None, loser=None'), (convert_to_coro_iter, 'cfs, *, loop=None, skip_invalid=None, corocheck={0}, futwrap={0}, handle_aiter=None, handle_iter=None'), (enhanced_gather, 'it, return_exceptions=False, *, loop=None'))
50del H, P