Source code for asyncutils.iters

  1from asyncutils import aenumerate, getcontext, iter_to_agen
  2from asyncutils.config import _randinst
  3from asyncutils.constants import _NO_DEFAULT
  4from asyncutils._internal import compat as Z, helpers as H, patch as P
  5from asyncutils._internal.submodules import iters_all as __all__
  6import asyncutils as A, asyncio as B, operator as O, math as M
  7from collections import Counter, defaultdict, deque
  8from functools import partial, lru_cache, wraps
  9from itertools import repeat
 10from sys import audit
 11from time import monotonic
 12_rand, _randrange, _sample, _smallprimes, _perfect_test, _identity = _randinst.random, _randinst.randrange, _randinst.sample, frozenset(_littleprimes := (2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199)), ((0x7ff, (2,)), (0x8a8d7f, (31, 73)), (0x11baa74c5, (2, 7, 61)), (0x1053cb094c1, (2, 13, 23, 0x195f53)), (0x1f51f3fee3b, _littleprimes[:5]), (0x32907381cdf, _littleprimes[:6]), (1<<64, (2, 0x145, 0x249f, 0x6e12, 0x6e0d7, 0x953d18, 0x6b0191fe)), (0x2be6951adc5b22410a5fd, _littleprimes[:13]), (0x4c16c7697197146a6b8eb49518c5, _littleprimes[:18])), lambda _, /: _
[docs] 13def fmap(fs, /, *a, **k): return agather(f(*a, **k) async for f in iter_to_agen(fs))
[docs] 14async def fmap_sequential(fs, /, *a, **k): 15 async for f in iter_to_agen(fs): yield await f(*a, **k)
[docs] 16async def fmap_parallel(fs, /, *a, **k): 17 t = H.get_loop_and_set().create_task 18 for r in await to_list(t(f(*a, **k)) async for f in iter_to_agen(fs)): yield await r
[docs] 19async def map_on_map(outer, inner, it, *, inner_await=False, outer_await=False): 20 async for _ in amap(inner, it, await_=inner_await): yield await to_tuple(amap(outer, _, await_=outer_await))
[docs] 21def aevery(it, n, *, skip_first=False): return aislice(it, skip_first, None, n)
[docs] 22def aeveryother(it, *, skip_first=False): return aevery(it, 2, skip_first=skip_first)
[docs] 23async def agather(it_of_its, return_exceptions=False): return await B.gather(*await to_list(it_of_its), return_exceptions=return_exceptions)
[docs] 24def aawgenf2agenf(f, /): 25 async def g(*a, **k): 26 async for _ in await f(*a, **k): yield _ 27 return wraps(f)(g)
[docs] 28def tee(it, n=2, *, maxqsize=None, put_exc=None, loop=None): 29 if n <= 0: raise ValueError('asyncutils.iters.tee: n must be positive') 30 if n == 1: return iter_to_agen(it), 31 if loop is None: loop = H.get_loop_and_set() 32 C = getcontext() 33 if put_exc is None: put_exc = C.TEE_DEFAULT_PUT_EXC 34 if maxqsize is None: maxqsize = C.TEE_DEFAULT_MAX_QSIZE 35 Q = tuple(Z.Queue(maxqsize) for _ in repeat(None, n)) 36 async def iterator(q): 37 nonlocal n 38 while True: 39 try: 40 if A.exception_occurred(i := await q.get()) and put_exc: raise A.unwrap_exc(i) 41 yield i 42 except Z.QueueShutDown: 43 n -= 1 44 if n == 0: await A.safe_cancel(t) 45 break 46 async def feed(): 47 async def helper(i): await agather((q.put(i) for q in Q), True) 48 try: await agather(amap(helper, it)) 49 except A.CRITICAL: raise A.Critical 50 except BaseException as e: 51 if put_exc: await helper(A.wrap_exc(e)) 52 else: 53 for q in Q: q.shutdown(True) 54 raise 55 finally: 56 for q in Q: q.shutdown() 57 t = loop.create_task(feed()); return tuple(map(iterator, Q))
[docs] 58async def adoublestarmap(f, it, /, await_=False): 59 it = iter_to_agen(it) 60 if await_: 61 async for _ in it: yield await f(**_) 62 else: 63 async for _ in it: yield f(**_)
[docs] 64async def astarmap_with_kwds(f, it, /, await_=False): 65 it = iter_to_agen(it) 66 if await_: 67 async for a, k in it: yield await f(*a, **k) 68 else: 69 async for a, k in it: yield f(*a, **k)
[docs] 70async def aloops(n): 71 m, n = divmod(n, 1024) 72 for _ in repeat(None, m): 73 for _ in repeat(None, 1024): yield 74 await A.yield_to_event_loop 75 for _ in repeat(None, n): yield
76async def _aunzip_put(*_): 77 for q, i in zip(*_): 78 with A.ignore_qshutdown: await q.put(i)
[docs] 79async def aunzip(ait, *, fillvalue=_NO_DEFAULT, put_batch=None, maxqsize=None, _a=_aunzip_put, _b=_identity): 80 audit('asyncutils.iters.aunzip', H.fullname(ait)); l = len(t := await anext(ait := iter_to_agen(ait), ())); C = getcontext() 81 if maxqsize is None: maxqsize = C.AUNZIP_DEFAULT_MAX_QSIZE 82 if put_batch is None: put_batch = C.AUNZIP_DEFAULT_PUT_BATCH 83 if maxqsize < put_batch: raise ValueError('asyncutils.iters.aunzip: maxqsize cannot be less than put_batch') 84 f = partial(Z.Queue, maxqsize) 85 class aunzip_consumer: 86 __slots__ = '__q', 87 def __init__(self): self.__q = f() 88 async def __anext__(self, l=B.Lock(), f=partial(A.take, ait, put_batch, default=A.RAISE)): # noqa: B008 89 if self.__q.empty(): 90 async with l: 91 try: 92 async for _ in f(): await _a(Q, _) 93 except A.ItemsExhausted: 94 for q in Q: q.close() 95 try: r = await self.__q.get() 96 except Z.QueueShutDown: raise StopAsyncIteration from None 97 if r is fillvalue: raise StopAsyncIteration 98 return r 99 def close(self): self.__q.shutdown() 100 __aiter__, __anext__.__text_signature__ = _b, '($self)' # ty: ignore[unresolved-attribute] 101 await _a(Q := await to_tuple(aunzip_consumer() async for _ in aloops(l)), t); return Q
[docs] 102async def merge(*I, reverse=False, maxqsize=None, _=lambda p: lambda i: aconsume(amap(p, i, await_=True))): 103 audit('asyncutils.iters.merge', I); p, g, l, a = (q := (Z.LifoQueue if reverse else Z.Queue)(getcontext().MERGE_DEFAULT_MAX_QSIZE if maxqsize is None else maxqsize)).put, q.get, H.get_loop_and_set(), object() 104 async def close(): 105 await B.gather(*map(_(p), I)) 106 if not reverse: await p(a) 107 if reverse: q.put_nowait(a) 108 t = l.create_task(close()) 109 if reverse: await t 110 while True: 111 if (c := await g()) is a: break 112 yield c
[docs] 113def aflatten(it, _=A.achain.from_iterable): return _(it).__aiter__()
[docs] 114def acountdown(n, step=1, *, include_zero=False): return arange(n, -include_zero, -step)
115async def _atraverse(s, n, q, f, i, /): 116 a, c, g = (v := {s}).add, v.__contains__, q.append 117 if i: yield s 118 while q: 119 async for _ in afilterfalse(c, n(f())): a(_); g(_); yield _
[docs] 120def abfs(start, neighbours, *, _=_atraverse, include_start=True): return _(start, lambda x, /, _=neighbours: areversed(_(x)), q := deque((start,)), q.popleft, include_start)
[docs] 121def adfs(start, neighbours, *, _=_atraverse, include_start=True): return _(start, neighbours, q := [start], q.pop, include_start)
[docs] 122async def asattolo(it, /, _=_randrange): 123 i = len(a := await to_list(it)) 124 while i > 1: 125 i -= 1 126 a[j], a[i] = a[i], a[j := _(i)] 127 return a
[docs] 128async def abrent(f, start, /): 129 p = l = 1; t, h, m = start, await f(start), 0 130 while t is not h: 131 if p == l: t, l, p = h, 0, p<<1 132 h, l = await f(h), l+1 133 a = start, await A.iterf(l)(f)(start) 134 while a[0] is not a[1]: a, m = await B.gather(*map(f, a)), m+1 135 return a[0], l, m
[docs] 136async def asamplel(it, k, *, rrange=_randrange, rand=_rand): 137 if k < 0: raise ValueError('asyncutils.iters.asamplel: expected non-negative sample size') 138 if k == 0: return [] 139 R, W, i = await A.collect(it := iter_to_agen(it), k, default=A.RAISE), 1.0, k 140 while True: 141 W *= rand()**(1.0/k); i += (s := M.floor(M.log(rand(), 1-W))+1) 142 try: R[rrange(k)] = await anth(it, s) 143 except A.ItemsExhausted: return R
[docs] 144async def asample_weighted(it, k, *, rrange=_randrange, rand=_rand): 145 if k < 0: raise ValueError('asyncutils.iters.asample_weighted: expected non-negative sample size') 146 if k == 0: return [] 147 W, u, p = 0.0, rand(), 1.0 148 async def agen(it): 149 nonlocal W 150 async for i, w in iter_to_agen(it): 151 if w < 0: raise ValueError(f'asyncutils.iters.asample_weighted: weight {w} for item {i!r} is negative') 152 W += w; yield i, w 153 r = await A.collect(it := agen(it), k, default=A.RAISE) 154 async for i, w in it: 155 w /= W; u -= w*p; p *= 1-w # noqa: PLW2901 156 if u <= 0: r[rrange(k)], u, p = i, rand(), 1.0 157 return r
[docs] 158async def astarfilter(pred, it): 159 if pred is None: pred = bool 160 async for t in iter_to_agen(it): 161 if (await r) if B.iscoroutine(r := pred(*await to_list(t))) else r: yield t
[docs] 162async def astarfilterfalse(pred, it): 163 if pred is None: pred = bool 164 async for t in iter_to_agen(it): 165 if B.iscoroutine(r := pred(*await to_list(t))): r = await r 166 if not r: yield t
[docs] 167def amultistarfilter(pred, /, *its, strict=False): return astarfilter(pred, azip(*its, strict=strict))
[docs] 168async def amultistarfilterfalse(pred, /, *its, strict=False): return astarfilterfalse(pred, azip(*its, strict=strict))
[docs] 169def amultifilter(pred, /, *its, strict=False): return afilter(pred, azip(*its, strict=strict))
[docs] 170def amultifilterfalse(pred, /, *its, strict=False): return afilterfalse(pred, azip(*its, strict=strict))
[docs] 171def ahammingdist(i1, i2, /, cmpeq=H.check): return ailen(amultistarfilterfalse(cmpeq, i1, i2))
[docs] 172async def amergesortedby(its, *, key=_identity, await_=False, reverse=False, _=A.ignore_stopaiteration): 173 f = (h := []).append 174 for i, it in enumerate(its := await to_tuple(amap(iter_to_agen, its))): 175 with _: 176 k = key(v := await anext(it)) 177 f(((await k) if await_ else k, i, v)) 178 if reverse: from asyncutils._internal.compat import heapify as a, heappop as b, heappush as c 179 else: from heapq import heapify as a, heappop as b, heappush as c 180 a(h) 181 while h: 182 k, i, v = b(h); yield v 183 with _: 184 k = key(v := await anext(its[i])) 185 c(h, ((await k) if await_ else k, i, v))
[docs] 186async def batch(it, n, *, item_timeout=None, strict=False): 187 f, g, i = iter_to_agen(it).__anext__, (b := []).append, 0 188 while True: 189 for i in range(n): # noqa: B007 190 try: g(await B.wait_for(f(), item_timeout)) 191 except StopAsyncIteration: break 192 except TimeoutError: 193 if b: break 194 if b: 195 if strict and i < n-1: raise ValueError('asyncutils.iters.batch: incomplete batch') 196 yield H.copy_and_clear(b)
[docs] 197def batch2(it, n, strict=False): return A.aiter_from_f(partial(A.collect, it, n, default=A.RAISE if strict else _NO_DEFAULT), [])
198async def _asideeffect(f, it, size): 199 if size is None: 200 if B.iscoroutine(r := f(i := await anext(it))): 201 await r; yield i 202 async for i in it: await f(i); yield i 203 else: 204 yield i 205 async for i in it: f(i); yield i 206 elif B.iscoroutine(r := f(i := await anext(it := batch(it, size)))): 207 await r 208 for _ in i: yield _ 209 async for i in it: 210 await f(i) 211 for _ in i: yield _ 212 else: 213 for _ in i: yield _ 214 async for i in it: 215 f(i) 216 for _ in i: yield _
[docs] 217async def aside_effect(f, it, /, _=_asideeffect, *, size=None, before=None, after=None): 218 if before is not None: before() 219 try: await _(f, iter_to_agen(it), size) 220 finally: 221 if after is not None: after()
[docs] 222def asliced(seq, n, strict=False): 223 I = atakewhile(None, (seq[i:i+n] async for i in acount(step=n))) 224 if not strict: return I 225 async def r(): 226 async for s in I: 227 if len(s) != n: raise ValueError(f'asyncutils.iters.asliced: length of {seq!r} is not divisible by {n}') 228 yield s 229 return r()
[docs] 230def buffer(it, maxsize=0, *, timeout_get=None, timeout_put=None, cooldown=0.0, loop=None): 231 q = Z.Queue(maxsize) 232 if timeout_get is None: timeout_get = float('inf') 233 async def con(g=q.get, d=q.task_done, f=B.sleep.__get__(cooldown), e=True): # noqa: B008 234 while True: 235 if e: x = timeout_get+monotonic(); e = False 236 yield await g(); d() 237 if monotonic() > x: e = True; await f() # ty: ignore[possibly-unresolved-reference] 238 async def cons(): 239 try: await con() 240 finally: await A.safe_cancel(t) 241 async def prod(p=q.put): 242 try: 243 async for _ in iter_to_agen(it): 244 try: await B.wait_for(p(_), timeout_put) 245 except TimeoutError: break 246 finally: c.close() 247 if loop is None: loop = H.get_loop_and_set() 248 c, t = cons(), loop.create_task(prod()); return c
[docs] 249async def asplitat(it, pred, maxsplit=-1, keep_sep=False): 250 I, f = iter_to_agen(it), (b := []).append 251 if not maxsplit: yield await to_list(I); return 252 async for i in I: 253 if not pred(i): f(i); continue 254 yield b 255 if keep_sep: yield [i] 256 if maxsplit == 1: yield await to_list(I); return 257 f = (b := []).append; maxsplit -= 1 258 yield b
[docs] 259def batch_process(items, size, processor): return amap(processor, batch(items, size), await_=True)
[docs] 260async def window(it, size, step=1): 261 if not size >= 1 <= step: raise ValueError('asyncutils.iters.window: size and step should both be >=1') 262 a, c = (b := deque(maxlen=size)).append, 0 263 async for i in iter_to_agen(it): 264 a(i) 265 if len(b) == size: 266 if not c%step and (t := (yield tuple(b))) is not None: size, step = t 267 c += 1
[docs] 268async def aall(it): 269 async for _ in iter_to_agen(it): 270 if not _: return False 271 return True
[docs] 272async def aany(it): 273 async for _ in iter_to_agen(it): 274 if _: return True 275 return False
[docs] 276async def aisempty(it): 277 async for _ in iter_to_agen(it): return False 278 return True
279async def _aextreme(I, K, d, c, /): 280 if (r := await anext(I := iter_to_agen(I[0] if len(I) == 1 else I), _NO_DEFAULT)) is _NO_DEFAULT: 281 if d is _NO_DEFAULT: raise ValueError('empty (async) iterable passed to asyncutils.iters.amax or asyncutils.iters.amin with no default value') 282 return d 283 k = K(r) 284 async for i in I: 285 if c(x := K(i), k): k, r = x, i 286 return r
[docs] 287def amax(*it, key=_identity, default=_NO_DEFAULT, _=_aextreme): return _(it, key, default, O.gt)
[docs] 288def amin(*it, key=_identity, default=_NO_DEFAULT, _=_aextreme): return _(it, key, default, O.lt)
[docs] 289async def azip(*I, strict=False, _=A.ignore_stopaiteration.combined(RuntimeError)): # noqa: B008 290 I = tuple(map(iter_to_agen, I)) 291 with _: 292 while True: yield tuple(await B.gather(*map(anext, I))) # noqa: ASYNC119 293 if strict: 294 for x, y in enumerate(I): 295 with _: await anext(y); raise ValueError(f'asyncutils.iters.azip: iterable {x} longer than shortest iterable')
[docs] 296async def amap(f, /, *its, await_=False, strict=False): 297 it = azip(*its, strict=strict) 298 if await_: 299 async for _ in it: yield await f(*_) 300 else: 301 async for _ in it: yield f(*_)
[docs] 302async def afilter(f, it): 303 if f is None: f = bool 304 async for _ in iter_to_agen(it): 305 if B.iscoroutine(r := f(_)): r = await r 306 if r: yield _
[docs] 307def amapif(f, p, it, /, await_=False): return amap(f, afilter(p, it), await_)
[docs] 308def amultimapif(f, p, /, *its, await_=False): return astarmap(f, afilter(p, azip(*its)), await_)
[docs] 309async def arange(a, b=None, c=1, /): 310 if c == 0: raise ValueError('asyncutils.iters.arange: step cannot be zero') 311 if b is None: a, b = 0, a 312 f = b.__lt__ if c < 0 else b.__gt__ 313 while f(a): yield a; a += c
[docs] 314async def acount(start=0, step=1): 315 if isinstance(step, float): 316 if step.is_integer(): step = int(step) 317 else: start = float(start) 318 elif start.is_integer(): start = int(start) 319 while True: yield start; start += step
[docs] 320async def acycle(it): 321 a, I = (l := []).append, iter_to_agen(it) 322 async for i in I: yield i; a(i) 323 l = tuple(l) 324 while True: 325 for i in l: yield i
[docs] 326async def arepeat(elem, n=None): 327 if n is None or n < 0: 328 while True: yield elem 329 else: 330 while n: yield elem; n -= 1
[docs] 331async def aaccumulate(it, func=O.add, *, initial=None): 332 it = iter_to_agen(it) 333 if initial is None: 334 try: initial = await anext(it) 335 except StopAsyncIteration: return 336 yield initial 337 async for _ in it: yield (initial := func(initial, _))
[docs] 338async def acompress(data, selectors): 339 async for i, j in azip(data, selectors): 340 if j: yield i
[docs] 341async def adropwhile(pred, it, *, skip_first=False): 342 async for _ in (it := iter_to_agen(it)): 343 if pred(_): continue 344 if not skip_first: yield _ 345 break 346 async for _ in it: yield _
[docs] 347async def ac3merge(seqs): 348 seqs, g, d, c = await to_list(afilter(None, seqs)), (n := []).append, n.clear, None 349 while seqs: 350 for s in seqs: 351 c = s[0] 352 for t in seqs: 353 next(t := iter(t)) 354 if any(H.check(_, c) for _ in t): c = None; break 355 else: break 356 if c is None: raise ValueError('asyncutils.iters.ac3merge: cannot resolve sequences') 357 yield c 358 for s in seqs: 359 if s[0] == c: del s[0] 360 if s: g(s) 361 seqs = tuple(n); d()
[docs] 362def afilterfalse(f, it): return afilter(lambda i: not (f or bool)(i), it)
[docs] 363async def agroupby(it, key=_identity): 364 I, e = iter_to_agen(it), False 365 async def grouper(k): 366 nonlocal cv, ck, e; yield cv 367 async for cv in I: 368 if (ck := key(cv)) != k: return 369 yield cv 370 e = True 371 try: ck = key(cv := await anext(I)) 372 except StopAsyncIteration: return 373 while not e: 374 yield ck, (g := grouper(t := ck)) 375 if ck == t: await aconsume(g)
[docs] 376async def aislice(it, /, *a, _=lambda x: x if x is None else int(x, 0) if isinstance(x, str) else int(x)): 377 x, y, z = 0 if (s := slice(*map(_, a))).start is None else s.start, s.stop, 1 if s.step is None else s.step 378 if x < 0 or (y is not None and y < 0) or z <= 0: raise ValueError('asyncutils.iters.aislice: invalid indices') 379 async for i, j in azip(acount() if y is None else arange(max(x, y)), it): 380 if i == x: yield j; x += z
[docs] 381async def aiter_idx(it, value, start=0, stop=None, _=H.check): 382 async for i, j in aenumerate(aislice(it, start, stop), start): 383 if _(j, value): yield i
[docs] 384async def asieve(n): 385 if n < 2: return 386 yield 2; s, d = 3, bytearray((0, 1))*(n>>1) 387 async for p in aiter_idx(d, 1, s, M.isqrt(n)+1): 388 async for i in aiter_idx(d, 1, s, s := p*p): yield i 389 d[s:n:x] = bytes(len(range(s, n, x := p<<1))) 390 async for i in aiter_idx(d, 1, s): yield i
[docs] 391async def apairwise(it): 392 try: a = await anext(I := iter_to_agen(it)) 393 except StopAsyncIteration: return 394 async for b in I: yield a, b; a = b
[docs] 395@aawgenf2agenf 396async def atriplewise(it): 397 a, b, c = tee(iter_to_agen(it), 3, maxqsize=3); await B.gather(*(anext(g, None) for g in (b, c, c))) 398 return azip(a, b, c)
[docs] 399async def aproduct(*its, repeat=1): 400 if repeat < 0: raise ValueError('asyncutils.iters.aproduct: repeat cannot be negative') 401 r = [()] 402 async for p in arepeat(amap(to_tuple, its, await_=True), repeat): r = [(*x, y) for x in r async for y in p] 403 for _ in r: yield _
[docs] 404async def astarmap(f, it, /, await_=False): 405 it = iter_to_agen(it) 406 if await_: 407 async for _ in it: yield await f(*_) 408 else: 409 async for _ in it: yield f(*_)
[docs] 410async def atakewhile(pred, it): 411 if pred is None: pred = bool 412 async for _ in iter_to_agen(it): 413 if not pred(_): break 414 yield _
[docs] 415async def atakewhile_inclusive(pred, it): 416 if pred is None: pred = bool 417 async for _ in iter_to_agen(it): 418 yield _ 419 if not pred(_): break
[docs] 420async def atakewhilenot(pred, it): 421 if pred is None: pred = bool 422 async for _ in iter_to_agen(it): 423 if pred(_): break 424 yield _
[docs] 425async def atakewhilenot_inclusive(pred, it): 426 if pred is None: pred = bool 427 async for _ in iter_to_agen(it): 428 yield _ 429 if pred(_): break
[docs] 430def asquaresum(it): return asumprod(*tee(it))
[docs] 431async def aziplongest(*its, fillvalue=None): 432 n = len(I := list(map(iter_to_agen, its))) 433 while True: 434 f = (v := []).append 435 for i, a in enumerate(I): 436 try: _ = await anext(a) 437 except StopAsyncIteration: 438 n -= 1 439 if not n: return 440 I[i], _ = arepeat(fillvalue), fillvalue 441 f(_) 442 yield tuple(v)
[docs] 443def asumprod(p, q, /): return asum(amap(O.mul, p, q, strict=True))
[docs] 444async def aconvolve(signal, kernel, _=A.achain): 445 f = (w := deque((0,), n := len(K := await to_tuple(areversed(kernel))))*n).append 446 async for x in _(signal, arepeat(0, n-1)): f(x); yield await asumprod(K, w)
[docs] 447def atabulate(f, start=0, step=1, /, *, await_=False): return amap(f, acount(start, step), await_=await_)
[docs] 448async def asum(it, start=0): 449 async for i in iter_to_agen(it): start += i 450 return start
[docs] 451async def aprod(it, start=1): 452 async for i in iter_to_agen(it): start *= i 453 return start
[docs] 454async def amatprod(it, start): 455 async for i in iter_to_agen(it): start @= i 456 return start
[docs] 457def atail(n, it, /): return aislice(it, max(0, len(it)-n), None)
[docs] 458async def to_tuple(it, /): return tuple(await to_list(it))
[docs] 459async def to_set(it, /, frozen=False): r = await A.transient_block(H.get_loop_and_set(), set, it) if type(it) in Z.s else {_ async for _ in iter_to_agen(it)}; return frozenset(r) if frozen else r
[docs] 460async def to_list(it, /): return await A.transient_block(H.get_loop_and_set(), list, it) if type(it) in Z.s else [_ async for _ in iter_to_agen(it)]
[docs] 461async def aconsume(it, n=None, _=H.check_methods): 462 if n == 0: return 463 if n: it = A.take(it, n, default=A.RAISE) 464 if _(it, '__iter__'): await H.get_loop_and_set().run_in_executor(H.create_executor(aconsume) if (E := getattr(aconsume, 'executor', None)) is None else E, deque, it, 0) 465 else: 466 async for _ in it: ...
[docs] 467def anth(it, n, default=_NO_DEFAULT): return anext(aislice(it, n, None), *H.filter_out(default, s=_NO_DEFAULT))
[docs] 468async def aallequal(it, key=_identity, strict=False): 469 async for _ in (I := agroupby(it, key)): 470 async for _ in I: return False 471 return True 472 if strict: raise ValueError('asyncutils.aallequal: iterable cannot be empty with strict=True') 473 return True
[docs] 474async def acombinations(it, r): 475 if r > (n := len(p := await to_tuple(it))): return 476 I = list(range(r)); yield p[:r] 477 while True: 478 for i in range(r-1, -1, -1): 479 if I[i] != i+n-r: break 480 else: return 481 I[i] += 1 482 for j in range(i+1, r): I[j] = I[j-1]+1 483 yield tuple(p[i] for i in I)
[docs] 484async def acombinations_with_replacement(it, r): 485 if not (n := len(p := await to_tuple(it))) and r: return 486 I = [0]*r; yield (p[0],)*r 487 while True: 488 for i in range(r-1, -1, -1): 489 if I[i] != n-1: break 490 else: return 491 I[i:] = (I[i]+1,)*(r-i); yield tuple(p[i] for i in I)
[docs] 492async def apermutations(it, r=None): 493 n = len(p := await to_tuple(it)) 494 if (r := n if r is None else r) > n: return 495 I, C, x = list(range(n)), list(range(n, n-r, -1)), r-1; yield p[:r] 496 while n: 497 for i in range(x, -1, -1): 498 C[i] -= 1 499 if C[i]: I[i], I[-j] = I[-(j := C[i])], I[i]; yield tuple(p[i] for i in I[:r]); break 500 else: I[i:], C[i] = I[i+1:]+I[i:i+1], n-i 501 else: return
[docs] 502@aawgenf2agenf 503async def apowerset(it): s = await to_tuple(it); return aflatten(acombinations(s, r) for r in range(len(s)+1))
[docs] 504def aquantify(it, pred=bool): return asum(amap(pred, it))
[docs] 505def apadnone(it, _=A.achain): return _(it, arepeat(None)).__aiter__()
[docs] 506def agrouper(it, n, fillvalue=_NO_DEFAULT): I = (iter_to_agen(it),)*n; return azip(*I, strict=fillvalue is A.RAISE) if isinstance(fillvalue, type(A.RAISE)) else aziplongest(*I, fillvalue=fillvalue)
[docs] 507async def aroundrobin(*its): 508 I = (iter_to_agen(i) for i in its) 509 for i in range(len(its), 0, -1): 510 async for _ in (I := acycle(aislice(I, i))): yield await anext(_)
[docs] 511def aroundrobin2(*its): return afilter(partial(O.is_not, _NO_DEFAULT), aflatten(aziplongest(*its, fillvalue=_NO_DEFAULT)))
[docs] 512async def aunique_everseen(it, key=_identity): 513 A, a = (S := set()).add, (s := []).append 514 async for i in iter_to_agen(it): 515 k = key(i) 516 try: 517 if k not in S: A(k); yield i 518 except TypeError: 519 if k not in s: a(k); yield i
[docs] 520def aunique_justseen(it, key=_identity, _=O.itemgetter): return amap(_(0), agroupby(it)) if key is None else amap(anext, amap(_(1), agroupby(it, key)), await_=True)
[docs] 521@aawgenf2agenf 522async def aunique(it, key=None, reverse=False): return aunique_justseen(await asorted(it, key=key, reverse=reverse), key)
[docs] 523@aawgenf2agenf 524async def ancycles(it, n): return aflatten(arepeat(await to_tuple(it), n))
[docs] 525def apartition(pred, it): 526 if pred is None: pred = bool 527 async def agen(q, _=iter_to_agen(it).__anext__): # noqa: B008 528 p = q.popleft 529 while True: 530 while q: yield p() 531 try: (T if pred(v := await _()) else F).append(v) 532 except StopAsyncIteration: return 533 return map(agen, (F := deque(), T := deque()))
[docs] 534async def aiterexcept(f, exc, first=None): 535 if first is not None: yield await first() 536 with A.IgnoreErrors(exc): 537 while True: yield await f() # noqa: ASYNC119
[docs] 538async def ailen(it): 539 i = 0 540 async for _ in iter_to_agen(it): i += 1 541 return i
[docs] 542async def aiterate(f, start): 543 while True: yield start; start = await f(start)
[docs] 544async def asorted(it, *, key=_identity, reverse=False): 545 from heapq import heappop as g, heapify as f 546 b, a = (m := []).append, (r := []).append 547 async for i, j in aenumerate(it): b((key(j), i, j)) 548 f(m) 549 while m: a(g(m)) 550 if reverse: r.reverse() 551 return r
[docs] 552def acanonical(it): return asorted(it, key=id, reverse=True)
553async def _adpermfull(A, _): 554 while True: 555 yield tuple(A) 556 for i in range(_-2, -1, -1): 557 if A[i] < A[i+1]: break 558 else: return 559 for j in range(j := _-1, i, -1): # noqa: B020 560 if A[i] < A[j]: break 561 A[i], A[j] = A[j], A[i]; A[i+1:] = A[:i-_:-1] 562async def _adpermpartial(A, _): 563 h, R, l = A[:_], range(_-1, -1, -1), range(len(t := A[_:])) 564 while True: 565 yield tuple(h); p = t[-1] 566 for i in R: 567 if h[i] < p: break 568 p = h[i] 569 else: return 570 p = h[i] 571 for j in l: 572 if (c := t[j]) > p: h[i], t[j] = c, p; break 573 else: 574 for j in R: 575 if (c := h[j]) > p: h[i], h[j] = c, p; break 576 t += h[:-(x := _-i):-1]; i += 1; h[i:], t[:] = t[:x], t[x:]
[docs] 577async def empty_agen(): 578 if False: yield
[docs] 579async def agives(x): yield x
[docs] 580@aawgenf2agenf 581async def adistinct_permutations(it, r=None, f=(_adpermpartial, _adpermfull)): 582 if (S := len(I := await to_list(it))) < (_ := S if r is None else r): return agives(()) 583 if _ <= 0: return empty_agen() 584 a = f[_ == S] 585 try: I.sort(); return a(I, _) 586 except TypeError: 587 d = defaultdict(list) 588 for i in I: d[I.index(i)].append(i) 589 return amap(lambda i, E={k: acycle(v) for k, v in d.items()}: to_tuple(await anext(E[_]) for _ in i), a(await asorted(amap(I.index, I)), _), await_=True) # noqa: B008
[docs] 590async def aunique_to_each(*its): 591 p = frozenset(await to_list(amap(to_tuple, its, await_=True))) 592 for i, j in Counter(await to_list(aflatten(map(frozenset, p)))).items(): 593 if j == 1 and i in p: yield i
[docs] 594async def aderangements(it, r=None): 595 async for _ in acompress(apermutations(X := await to_tuple(it), r), amap(aall, amap(partial(amap, O.is_not), arepeat(Y := tuple(range(len(X)))), apermutations(Y, r)), await_=True)): yield _
[docs] 596def aintersperse(e, it, n=1): 597 if n <= 0: raise ValueError('asyncutils.iters.aintersperse: n must be positive') 598 return aislice(ainterleave_stopearly(arepeat(e), it), 1, None) if n == 1 else aflatten(aislice(ainterleave_stopearly(arepeat((e,)), batch(it, n)), 1, None))
[docs] 599def ainterleave_stopearly(*its): return aflatten(azip(*its))
[docs] 600def aspy(it, n=1): p, q = tee(it, maxqsize=n); return A.take(q, n), p
[docs] 601async def ainterleave_evenly(its, lengths=None): 602 I = await to_tuple(its) 603 try: 604 if (X := len(I)) != len(L := await to_tuple(lengths or map(len, I))): raise ValueError('asyncutils.iters.ainterleave_evenly: mismatch in length of its and lengths') 605 except TypeError: raise ValueError('asyncutils.iters.ainterleave_evenly: cannot determine lengths of (async) iterables') from None 606 A, *a = map(f := L.__getitem__, _ := sorted(range(X), key=f, reverse=True)); B, *b = (iter_to_agen(I[i]) for i in _); E, t = [A//X]*len(a), sum(L) 607 while t: 608 yield await anext(B); t -= 1; E[:] = map(O.sub, E, a) 609 for i, e in enumerate(E): 610 if e < 0: yield await anext(b[i]); t -= 1; E[i] += A
[docs] 611async def ainterleave_randomly(its, _=_randrange): 612 x = len(I := await to_list(amap(iter_to_agen, its))) 613 while x: 614 i = _(x) 615 try: yield await anext(I[i]) 616 except StopAsyncIteration: I[i] = I[-1]; del I[-1]; x -= 1
[docs] 617async def acollapse(it, base_typ=(str, bytes), levels=None): 618 if levels is None: levels = float('inf') 619 (g := (S := deque()).appendleft)((0, arepeat(iter_to_agen(it), 1))); f = S.popleft 620 while S: 621 l, n = N = f() 622 if l > levels: 623 async for i in n: yield i 624 continue 625 async for _ in n: 626 if isinstance(_, base_typ): yield _ 627 else: 628 try: t = iter_to_agen(_); g((l+1, t)); g(N); break 629 except TypeError: yield _
[docs] 630def afirsttrue(it, default=_NO_DEFAULT, pred=None): return anext(afilter(pred, it), *H.filter_out(default, s=_NO_DEFAULT))
[docs] 631def aprepend(val, it, _=A.achain): return _((val,), it).__aiter__()
[docs] 632def aappend(val, it, _=A.achain): return _(it, (val,)).__aiter__()
[docs] 633async def arandomproduct(*a, n=1, _=_randinst.choice): 634 async for i in ancycles(amap(to_tuple, a, await_=True), n): yield _(i)
[docs] 635async def arandomcombination(it, r, _=_sample): 636 (_ := _(range(len(p := await to_tuple(it))), r)).sort() 637 for i in _: yield p[i]
[docs] 638@aawgenf2agenf 639async def arandom_combination_with_replacement(it, r, _=_randrange): return amap((p := await to_tuple(it)).__getitem__, await asorted(arepeatfunc(_, r, len(p))))
[docs] 640async def arandom_permutation(it, r=None, _=_sample): 641 p = await to_tuple(it) 642 if r is None: r = len(p) 643 for i in _(p, r): yield i
[docs] 644async def afirst(it, default=_NO_DEFAULT): 645 async for i in iter_to_agen(it): return i 646 if default is _NO_DEFAULT: raise ValueError('asyncutils.iters.afirst called on empty iterable without default value') 647 return default
[docs] 648async def alast(it, default=_NO_DEFAULT, _=H.check_methods): 649 try: 650 if _(it, '__getitem__'): return it[-1] 651 return (await to_list(it)).pop() if (f := getattr(it, '__reversed__', None)) is None else f().__next__() 652 except (IndexError, TypeError, StopIteration, StopAsyncIteration): 653 if default is _NO_DEFAULT: raise ValueError('asyncutils.iters.alast called on empty iterable without default value') from None 654 return default
[docs] 655def anth_or_last(it, n, default=_NO_DEFAULT): return alast(aislice(it, n+1), default)
[docs] 656def abefore_and_after(pred, it): a, b = tee(it); return acompress(atakewhile(pred, a), azip(b)), b
[docs] 657async def anthcombination(it, r, idx): 658 if not 0 <= r <= (n := len(p := await to_tuple(it))): raise IndexError(f'asyncutils.iters.anthcombination: r={r} is out of range') 659 c, k = 1, min(r, n-r) 660 for i in range(1, k+1): c = c*(n-k+i)//i 661 if idx < 0: idx += c 662 if idx < 0 or idx >= c: raise IndexError(f'asyncutils.iters.anthcombination: idx={idx} is out of range') 663 while r: 664 c, n, r = c*r//n, n-1, r-1 665 while idx >= c: idx -= c; c, n = c*(n-r)//n, n-1 666 yield p[~n]
[docs] 667@aawgenf2agenf 668async def asubslices(it): return astarmap(O.getitem, azip(arepeat(s := await to_tuple(it)), astarmap(slice, acombinations(range(len(s)+1), 2))))
[docs] 669async def arepeatfunc(f, times=None, *a): 670 async def g(i=A.ignore_typeerrs, _=partial(f, *a)): # noqa: B008 671 r = _() 672 with i: r = await r 673 async for _ in aloops(times): await g()
[docs] 674async def apolynomial_from_roots(roots, _=(1,)): 675 async for r in iter_to_agen(roots): _ = aconvolve(_, (1, -r)) 676 async for i in iter_to_agen(_): yield i
[docs] 677@aawgenf2agenf 678async def atranspose(mat): return azip(*await to_list(mat), strict=True)
[docs] 679@aawgenf2agenf 680async def aflatten_tensor(tensor, base_typ=(str, bytes), _=H.check_methods): 681 I = iter_to_agen(tensor) 682 while True: 683 try: v = await anext(I) 684 except StopAsyncIteration: break 685 I = aprepend(v, I) 686 if isinstance(v, base_typ) or not (_(v, '__iter__') or _(v, '__aiter__')): break 687 I = aflatten(I) 688 return I
[docs] 689@aawgenf2agenf 690async def apolynomial_derivative(coeff): return amap(O.mul, r := await to_tuple(coeff), range(len(r)-1, 0))
[docs] 691async def apolynomial_eval(coeff, x): 692 if not (n := len(t := await to_tuple(coeff))): return type(x)(0) 693 return await asumprod(t, areversed(await A.collect(apowers(x), n-1)))
[docs] 694@aawgenf2agenf 695async def areshape(mat, shape): 696 if isinstance(shape, int): return batch(aflatten(mat), shape) 697 d = await anext(shape := iter_to_agen(shape)); return aislice(await A.areduce(batch, areversed(shape), aflatten_tensor(mat), await_=False), d)
698async def _factor_pollard(n): 699 if n == 4: return 2 700 async for b in arange(1, n): 701 x = y = 2; d = 1 702 while (d := M.gcd((x := (x*x+b)%n)-(y := ((z := (y*y+b)%n)*z+b)%n), n)) == 1: ... 703 if d != n: return d 704 raise ValueError(f'asyncutils.iters.afactor: internal error: {n} is prime') 705@lru_cache 706def _shift_to_odd(n): 707 if not ((1<<(s := ((n-1)^n).bit_length()-1))*(d := n>>s) == n and d&1 and s > -1): raise ValueError(f'asyncutils.iters.aisprime: internal error: {n} is invalid') 708 return s-1, d 709async def _probable_prime(n, base, _=_shift_to_odd): 710 s, d = _(m := n-1) 711 if (x := pow(base, d, n)) in {1, m}: return True 712 async for _ in aloops(s): 713 if (x := x*x%n) == m: return True 714 return False
[docs] 715async def aisprime(n, s=_smallprimes, p=_perfect_test, r=_randrange, f=_probable_prime): 716 if n < 210: return n in s 717 if not (n&1 and n%3 and n%5 and n%7 and n%11 and n%13 and n%17): return False 718 for l, _ in p: 719 if n < l: break 720 else: _ = arepeatfunc(r, 64, 2, n-1) 721 return await aall(amap(partial(f, n), _, await_=True))
[docs] 722async def afactor(n, _=_littleprimes, F=_factor_pollard): 723 if n < 1: raise ValueError('asyncutils.iters.afactor: no prime factors') 724 if n == 1: return 725 for p in _: 726 while not n%p: yield p; n //= p 727 if n == 1: return 728 e = (t := [n]).extend 729 for n in t: 730 if n < 44521 or await aisprime(n): yield n 731 else: e((f := await F(n), n//f))
[docs] 732async def arunning_median(it, *, maxlen=None): 733 if maxlen is None: 734 r, l, h = iter_to_agen(it).__anext__, [], []; from asyncutils._internal.compat import heappush as a, heappushpop as c; from heapq import heappush as b 735 while True: a(l, await r()); yield l[0]; b(h, c(l, await r())); yield (l[0]+h[0])/2 736 if (m := O.index(maxlen)) <= 0: raise ValueError('asyncutils.iters.arunning_median: window size should be positive') 737 w, o = deque(), []; from bisect import bisect_left as b, insort_right as f 738 async for i in iter_to_agen(it): 739 w.append(i); f(o, i) 740 if (n := len(o)) > m: del o[b(o, w.popleft())]; n -= 1 741 m = n>>1; yield o[m] if n&1 else (o[m-1]+o[m])/2
[docs] 742async def arandom_derangement(it, _=_randinst.shuffle): 743 if (l := len(s := await to_tuple(it))) < 2: 744 if s: raise ValueError('asyncutils.iters.arandom_derangement: no derangements to choose from') 745 return () 746 i = tuple(p := list(range(l))) 747 while any(map(O.is_, i, p)): _(p) 748 return O.itemgetter(*p)(s)
[docs] 749@aawgenf2agenf 750async def amatmul(*a): 751 M, N = map(iter_to_agen, a); N = aprepend(t := await to_tuple(await anext(N)), N) 752 return batch(astarmap(asumprod, aproduct(M, atranspose(N)), True), len(t))
[docs] 753@aawgenf2agenf 754async def mat_vec_mul(M, V): return amap(asumprod.__get__(await to_tuple(V)), amap(to_tuple, M, await_=True), await_=True)
[docs] 755async def vecs_eq(u, v, cmpeq=H.check, *, strict=True): 756 try: return await aall(amap(cmpeq, u, v, strict=strict)) 757 except ValueError: return False
[docs] 758async def afreivalds(A, B, C, k=None, _r=_randrange): n = len(A := await to_tuple(A)); return await aall(await vecs_eq(mat_vec_mul(A, mat_vec_mul(B, r := await to_tuple(arepeatfunc(_r, n, 2)))), mat_vec_mul(C, r), int.__eq__) async for _ in aloops(getcontext().AFRIEVALDS_DEFAULT_K if k is None else k))
[docs] 759def basic_collect(*_): return to_list(aislice(*_) if len(_) > 1 else _[0])
[docs] 760async def asubstrings(it): 761 for i in (s := await to_tuple(it)): yield i, 762 async for n in arange(2, c := len(s)+1): 763 async for i in arange(c-n): yield s[i:i+n]
[docs] 764def asubstr_indices(seq, reverse=False): 765 r = range(1, x := len(seq)+1) 766 if reverse: r = reversed(r) 767 return ((seq[i:(j := i+L)], i, j) for L in r async for i in arange(x-L))
[docs] 768def iter_task(it, summaryf=aconsume): 769 async def task(f): t = f(); await summaryf(it); return f()-t 770 return (l := H.get_loop_and_set()).create_task(task(l.time))
[docs] 771def agetitems_from_indices(it, indices, setatend=None, finish=False, _='index %r beyond the ends of (async) iterable {!r}'.format, _c=A.achain): 772 L, r, it = H.get_loop_and_set(), [], iter_to_agen(it) 773 async def consume(f=r.append): 774 s, M, m, d = L.time(), 0, 0, defaultdict(list) 775 async for x in amap(O.index, indices): 776 if M is not None: 777 if x < 0: M, m = None, min(m, x) 778 else: M = max(x, M) 779 d[x].append(F := L.create_future()); f(F) 780 async def helper(i, j, d=d): 781 async for x, F in aenumerate(d.pop(i, ())): 782 if F.cancelled(): continue 783 if F.done(): raise A.FutureCorrupted(f'asyncutils.iters.agetitems_from_indices: future at index {x} associated with index {i} called on (async) iterable {it!r} had its result/exception set by an external party') 784 F.set_result(j) 785 try: 786 if M is None: 787 b = deque(maxlen=-m) 788 async def helper2(i, j): await helper(i, j); b.append(j) 789 await aconsume(_c(astarmap(helper2, aenumerate(it)), amap(helper, acount(-1, -1), A.adisembowelleft(b), await_=True))) 790 else: await aconsume(astarmap(helper, aenumerate(A.take(it, M)), True)) 791 except A.CRITICAL: raise A.Critical 792 except BaseException as e: 793 async for F in afilterfalse(O.methodcaller('done'), r): F.set_exception(e) 794 raise 795 a = _(it) 796 for i, l in d.items(): 797 e = IndexError(a%i) 798 async for x, F in aenumerate(l): 799 if not F.cancelled(): 800 if F.done(): raise ExceptionGroup('asyncutils.iters.agetitems_from_indices: error while processing indices for which items were not successfully got', (e, A.FutureCorrupted(f'asyncutils.iters.agetitems_from_indices: future at index {x} associated with index {i} called on (async) iterable {it!r} had its result/exception set by an external party'))) 801 F.set_exception(e) 802 if finish: await aconsume(it) 803 if setatend is None: return 804 if setatend.done() and not setatend.cancelled(): raise A.FutureCorrupted(f'asyncutils.iters.agetitems_from_indices: future setatend at {id(setatend):#x} (exact type {H.fullname(setatend)}) had its result set by an external party') 805 setatend.set_result(L.time()-s) 806 c = L.create_task(consume()) 807 if setatend is not None: setatend.add_done_callback(lambda _: setattr(_, '__cancel', t := B.gather(A.safe_cancel(c), A.safe_cancel_batch(r))) or t.add_done_callback(lambda _: delattr(setatend, '__cancel'))) 808 audit('asyncutils.iters.agetitems_from_indices', H.fullname(it)); return r
[docs] 809async def aintersend(i1, i2): 810 audit('asyncutils.iters.aintersend', H.fullname(i1), H.fullname(i2)); t = None, None; f, g = i1.asend, i2.asend 811 while True: yield (t := tuple(await B.gather(f(t[1]), g(t[0]))))
[docs] 812def asendstream(i1, i2): audit('asyncutils.iters.asendstream', H.fullname(i1), H.fullname(i2)); return amap(i1.asend, i2, await_=True)
[docs] 813async def acat(first=None): 814 audit('asyncutils.iters.acat', first) 815 while True: first = yield first
[docs] 816async def aforever(): 817 audit('asyncutils.iters.aforever') 818 while True: yield
819async def _aguess(it, estlen, key, default, finish_event, cmp, _=_aextreme): 820 if (r := await _(A.take(I := iter_to_agen(it), M.ceil(estlen*A.RECIP_E)), key, _NO_DEFAULT, cmp)) is _NO_DEFAULT: 821 if default is _NO_DEFAULT: raise ValueError('empty (async) iterable passed to asyncutils.iters.aguessmax or asyncutils.iters.aguessmin with no default value') 822 return default 823 k = key(r) 824 try: 825 async for i in I: 826 if cmp(key(i), k): return i 827 return r 828 finally: 829 if not (finish_event is None or finish_event.is_set()): (t := (_ := H.get_loop_and_set().create_task)(aconsume(I))).add_done_callback(lambda _: finish_event.set()); _(finish_event.wait()).add_done_callback(t.cancel)
[docs] 830def aguessmax(it, estlen, *, key=_identity, default=_NO_DEFAULT, finish_event=None, _=_aguess): return _(it, estlen, key, default, finish_event, O.gt)
[docs] 831def aguessmin(it, estlen, *, key=_identity, default=_NO_DEFAULT, finish_event=None, _=_aguess): return _(it, estlen, key, default, finish_event, O.lt)
[docs] 832async def apowers_of_two(*, init=1, init_shift=0, shift=1): 833 init <<= init_shift 834 while True: yield init; init <<= shift
[docs] 835def apowers(base, start=1): return aprepend(start, arepeat(0)) if base == 0 else arepeat(start) if base == 1 else apowers_of_two(init=base, shift=base.bit_length()-1) if base.bit_count() == 1 else aaccumulate(arepeat(base), O.mul, initial=start)
[docs] 836async def areversed(it, /): 837 try: 838 async for i in iter_to_agen(reversed(it)): yield i 839 except TypeError: 840 f = (it := await to_list(it)).pop 841 while it: yield f()
[docs] 842async def arunlength_encode(it, /): 843 async for k, g in agroupby(it): yield k, ailen(g)
[docs] 844def arunlength_decode(it, /): return aflatten(astarmap(arepeat, it))
845async def _dfthelper(a, i=False, /): return await to_tuple(A.take(apowers(M.e**((1 if i else -1)*1j*M.tau/(N := len(a := await to_tuple(a))))), N)), N, a
[docs] 846async def adft(a, /, _=_dfthelper): 847 R, N, a = await _(a) 848 for k in range(N): yield M.sumprod(a, (R[k*i%N] for i in range(N)))
[docs] 849async def aidft(A, /, _=_dfthelper): 850 R, N, A = await _(A, True) 851 for k in range(N): yield M.sumprod(A, (R[k*n%N] for n in range(N)))/N
852async def _aargminmax(it, key, default, f): return (await f(aenumerate(amap(key, it)), key=O.itemgetter(1), default=default))[0]
[docs] 853def aargmin(it, key=_identity, default=-1, _=_aargminmax): return _(it, key, default, amin)
[docs] 854def aargmax(it, key=_identity, default=-1, _=_aargminmax): return _(it, key, default, amax)
[docs] 855def arunningmean(it): return amap(O.truediv, aaccumulate(it), acount(1))
[docs] 856@aawgenf2agenf 857async def apowersetofsets(it, *, frozen=True): S = tuple(dict.fromkeys(await to_list(amap(frozenset, azip(it))))); return aflatten(astarmap((frozenset if frozen else set).union, acombinations(S, r)) async for r in arange(len(S)+1))
[docs] 858async def aserialize(it): 859 l, n = B.Lock(), iter_to_agen(it).__anext__ 860 while True: 861 async with l: x = await n() 862 yield x # noqa: RUF070
[docs] 863async def aonline_sorter(it, *, key=_identity, reverse=False, slow=None): 864 audit('asyncutils.iters.aonline_sorter', id(it)); c = Z if reverse else __import__('heapq') 865 if slow is None: slow = getcontext().AONLINESORTER_DEFAULT_SLOW 866 if (e := getattr(aonline_sorter, 'executor', None)) is None: e = H.create_executor(aonline_sorter) 867 q = partial(p := partial(H.get_loop_and_set().run_in_executor, e), key) 868 await p(c.heapify, it := [(await q(x) if slow else key(x), i, x) async for i, x in aenumerate(it)]) 869 a, b, i = partial(c.heappop, it), partial(c.heappush, it), len(it) 870 while it: 871 if (j := (yield a()[2])) is not None: b(((await q(j)) if slow else key(j), i, j)); i += 1
872P.patch_function_signatures((aonline_sorter, 'it, *, key={}, reverse=False, slow=None'), (aside_effect, 'f, it, /, *, size=None, before=None, after=None'), (apolynomial_from_roots, 'roots'), (adistinct_permutations, 'it, r=None'), (abfs, _ := 'start, neighbours, *, include_start=True'), (adfs, _), (aaccumulate, 'it, func={}, *, initial=None'), (aconvolve, 'signal, kernel'), (aislice, 'it, /, *a'), (ainterleave_randomly, 'its'), (ahammingdist, 'i1, i2, /, cmpeq={}'), (aiter_idx, 'it, value, start=0, stop=None'), (amergesortedby, 'its, *, key={}, await_=False, reverse=False'), (amax, _ := '*it, key={}, default=_NO_DEFAULT'), (amin, _), (asample_weighted, _ := 'it, k, *, rrange={0}, rand={0}'), (asamplel, _), (arandomcombination, _ := 'it, r'), (arandom_combination_with_replacement, _), (asorted, 'it, *, key={}, reverse=False'), (aunique_justseen, _ := 'it, key={}'), (aunique_everseen, _), (agroupby, _), (vecs_eq, 'u, v, cmpeq={}, *, strict=True'), (adft, 'xarr, /'), (aidft, 'Xarr, /'), (aconsume, 'it, n=None'), (aallequal, 'it, key={}, strict=False'), (aprepend, 'val, it'), (arandomproduct, '*a, n=1'), (asattolo, 'it, /'), (aargmin, _ := 'it, key={}, default=-1'), (aargmax, _), (afactor, _ := 'n'), (agetitems_from_indices, 'it, indices, setatend=None, finish=False'), (alast, 'it, default=_NO_DEFAULT'), (aisprime, _), (aguessmax, _ := 'it, estlen, *, key={}, default=_NO_DEFAULT, finish_event=None'), (aguessmin, _), (aflatten, _ := 'it'), (arandom_derangement, _), (afreivalds, 'A, B, C, k=None'), (basic_collect, 'it, n'), (iter_task, 'it, summaryf={}'), (apadnone, 'it'), (aunzip, 'ait, put_batch=None, fillvalue={}'), (aflatten_tensor, 'tensor, base_typ={}'), (arandom_permutation, 'it, r=None')) 873del P, _asideeffect, _adpermpartial, _adpermfull, _atraverse, _aunzip_put, _aguess, _aargminmax, _aextreme, _factor_pollard, _shift_to_odd, _probable_prime, _dfthelper, _littleprimes, _randrange, _sample, _smallprimes, _perfect_test, _rand, _randinst, _identity, _