1from asyncutils import aenumerate, getcontext, iter_to_agen
2from asyncutils.config import _randinst
3from asyncutils.constants import _NO_DEFAULT
4from asyncutils._internal import compat as Z, helpers as H, patch as P
5from asyncutils._internal.submodules import iters_all as __all__
6import asyncutils as A, asyncio as B, operator as O, math as M
7from collections import Counter, defaultdict, deque
8from functools import partial, lru_cache, wraps
9from itertools import repeat
10from sys import audit
11from time import monotonic
12_rand, _randrange, _sample, _smallprimes, _perfect_test, _identity = _randinst.random, _randinst.randrange, _randinst.sample, frozenset(_littleprimes := (2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199)), ((0x7ff, (2,)), (0x8a8d7f, (31, 73)), (0x11baa74c5, (2, 7, 61)), (0x1053cb094c1, (2, 13, 23, 0x195f53)), (0x1f51f3fee3b, _littleprimes[:5]), (0x32907381cdf, _littleprimes[:6]), (1<<64, (2, 0x145, 0x249f, 0x6e12, 0x6e0d7, 0x953d18, 0x6b0191fe)), (0x2be6951adc5b22410a5fd, _littleprimes[:13]), (0x4c16c7697197146a6b8eb49518c5, _littleprimes[:18])), lambda _, /: _
[docs]
13def fmap(fs, /, *a, **k): return agather(f(*a, **k) async for f in iter_to_agen(fs))
[docs]
14async def fmap_sequential(fs, /, *a, **k):
15 async for f in iter_to_agen(fs): yield await f(*a, **k)
[docs]
16async def fmap_parallel(fs, /, *a, **k):
17 t = H.get_loop_and_set().create_task
18 for r in await to_list(t(f(*a, **k)) async for f in iter_to_agen(fs)): yield await r
[docs]
19async def map_on_map(outer, inner, it, *, inner_await=False, outer_await=False):
20 async for _ in amap(inner, it, await_=inner_await): yield await to_tuple(amap(outer, _, await_=outer_await))
[docs]
21def aevery(it, n, *, skip_first=False): return aislice(it, skip_first, None, n)
[docs]
22def aeveryother(it, *, skip_first=False): return aevery(it, 2, skip_first=skip_first)
[docs]
23async def agather(it_of_its, return_exceptions=False): return await B.gather(*await to_list(it_of_its), return_exceptions=return_exceptions)
[docs]
def aawgenf2agenf(f, /):
    """Turn a coroutine function that *returns* an async iterable into an async generator function.

    The wrapped callable awaits ``f(*args, **kwargs)`` on first iteration and
    then re-yields every item of the async iterable it produced.  Metadata of
    ``f`` is copied onto the wrapper with ``functools.wraps``.
    """
    @wraps(f)
    async def unwrapped(*args, **kwargs):
        inner = await f(*args, **kwargs)
        async for item in inner:
            yield item
    return unwrapped
[docs]
28def tee(it, n=2, *, maxqsize=None, put_exc=None, loop=None):
29 if n <= 0: raise ValueError('asyncutils.iters.tee: n must be positive')
30 if n == 1: return iter_to_agen(it),
31 if loop is None: loop = H.get_loop_and_set()
32 C = getcontext()
33 if put_exc is None: put_exc = C.TEE_DEFAULT_PUT_EXC
34 if maxqsize is None: maxqsize = C.TEE_DEFAULT_MAX_QSIZE
35 Q = tuple(Z.Queue(maxqsize) for _ in repeat(None, n))
36 async def iterator(q):
37 nonlocal n
38 while True:
39 try:
40 if A.exception_occurred(i := await q.get()) and put_exc: raise A.unwrap_exc(i)
41 yield i
42 except Z.QueueShutDown:
43 n -= 1
44 if n == 0: await A.safe_cancel(t)
45 break
46 async def feed():
47 async def helper(i): await agather((q.put(i) for q in Q), True)
48 try: await agather(amap(helper, it))
49 except A.CRITICAL: raise A.Critical
50 except BaseException as e:
51 if put_exc: await helper(A.wrap_exc(e))
52 else:
53 for q in Q: q.shutdown(True)
54 raise
55 finally:
56 for q in Q: q.shutdown()
57 t = loop.create_task(feed()); return tuple(map(iterator, Q))
[docs]
58async def adoublestarmap(f, it, /, await_=False):
59 it = iter_to_agen(it)
60 if await_:
61 async for _ in it: yield await f(**_)
62 else:
63 async for _ in it: yield f(**_)
[docs]
64async def astarmap_with_kwds(f, it, /, await_=False):
65 it = iter_to_agen(it)
66 if await_:
67 async for a, k in it: yield await f(*a, **k)
68 else:
69 async for a, k in it: yield f(*a, **k)
[docs]
async def aloops(n):
    """Yield ``None`` exactly ``n`` times, pausing for the event loop every 1024 iterations.

    After each full block of 1024 yields, ``A.yield_to_event_loop`` is awaited
    so long loops do not monopolise the loop.
    """
    chunks, leftover = divmod(n, 1024)
    for _chunk in range(chunks):
        for _step in range(1024):
            yield
        await A.yield_to_event_loop
    for _step in range(leftover):
        yield
76async def _aunzip_put(*_):
77 for q, i in zip(*_):
78 with A.ignore_qshutdown: await q.put(i)
[docs]
79async def aunzip(ait, *, fillvalue=_NO_DEFAULT, put_batch=None, maxqsize=None, _a=_aunzip_put, _b=_identity):
80 audit('asyncutils.iters.aunzip', H.fullname(ait)); l = len(t := await anext(ait := iter_to_agen(ait), ())); C = getcontext()
81 if maxqsize is None: maxqsize = C.AUNZIP_DEFAULT_MAX_QSIZE
82 if put_batch is None: put_batch = C.AUNZIP_DEFAULT_PUT_BATCH
83 if maxqsize < put_batch: raise ValueError('asyncutils.iters.aunzip: maxqsize cannot be less than put_batch')
84 f = partial(Z.Queue, maxqsize)
85 class aunzip_consumer:
86 __slots__ = '__q',
87 def __init__(self): self.__q = f()
88 async def __anext__(self, l=B.Lock(), f=partial(A.take, ait, put_batch, default=A.RAISE)): # noqa: B008
89 if self.__q.empty():
90 async with l:
91 try:
92 async for _ in f(): await _a(Q, _)
93 except A.ItemsExhausted:
94 for q in Q: q.close()
95 try: r = await self.__q.get()
96 except Z.QueueShutDown: raise StopAsyncIteration from None
97 if r is fillvalue: raise StopAsyncIteration
98 return r
99 def close(self): self.__q.shutdown()
100 __aiter__, __anext__.__text_signature__ = _b, '($self)' # ty: ignore[unresolved-attribute]
101 await _a(Q := await to_tuple(aunzip_consumer() async for _ in aloops(l)), t); return Q
[docs]
102async def merge(*I, reverse=False, maxqsize=None, _=lambda p: lambda i: aconsume(amap(p, i, await_=True))):
103 audit('asyncutils.iters.merge', I); p, g, l, a = (q := (Z.LifoQueue if reverse else Z.Queue)(getcontext().MERGE_DEFAULT_MAX_QSIZE if maxqsize is None else maxqsize)).put, q.get, H.get_loop_and_set(), object()
104 async def close():
105 await B.gather(*map(_(p), I))
106 if not reverse: await p(a)
107 if reverse: q.put_nowait(a)
108 t = l.create_task(close())
109 if reverse: await t
110 while True:
111 if (c := await g()) is a: break
112 yield c
[docs]
113def aflatten(it, _=A.achain.from_iterable): return _(it).__aiter__()
[docs]
114def acountdown(n, step=1, *, include_zero=False): return arange(n, -include_zero, -step)
115async def _atraverse(s, n, q, f, i, /):
116 a, c, g = (v := {s}).add, v.__contains__, q.append
117 if i: yield s
118 while q:
119 async for _ in afilterfalse(c, n(f())): a(_); g(_); yield _
[docs]
120def abfs(start, neighbours, *, _=_atraverse, include_start=True): return _(start, lambda x, /, _=neighbours: areversed(_(x)), q := deque((start,)), q.popleft, include_start)
[docs]
121def adfs(start, neighbours, *, _=_atraverse, include_start=True): return _(start, neighbours, q := [start], q.pop, include_start)
[docs]
122async def asattolo(it, /, _=_randrange):
123 i = len(a := await to_list(it))
124 while i > 1:
125 i -= 1
126 a[j], a[i] = a[i], a[j := _(i)]
127 return a
[docs]
128async def abrent(f, start, /):
129 p = l = 1; t, h, m = start, await f(start), 0
130 while t is not h:
131 if p == l: t, l, p = h, 0, p<<1
132 h, l = await f(h), l+1
133 a = start, await A.iterf(l)(f)(start)
134 while a[0] is not a[1]: a, m = await B.gather(*map(f, a)), m+1
135 return a[0], l, m
[docs]
136async def asamplel(it, k, *, rrange=_randrange, rand=_rand):
137 if k < 0: raise ValueError('asyncutils.iters.asamplel: expected non-negative sample size')
138 if k == 0: return []
139 R, W, i = await A.collect(it := iter_to_agen(it), k, default=A.RAISE), 1.0, k
140 while True:
141 W *= rand()**(1.0/k); i += (s := M.floor(M.log(rand(), 1-W))+1)
142 try: R[rrange(k)] = await anth(it, s)
143 except A.ItemsExhausted: return R
[docs]
144async def asample_weighted(it, k, *, rrange=_randrange, rand=_rand):
145 if k < 0: raise ValueError('asyncutils.iters.asample_weighted: expected non-negative sample size')
146 if k == 0: return []
147 W, u, p = 0.0, rand(), 1.0
148 async def agen(it):
149 nonlocal W
150 async for i, w in iter_to_agen(it):
151 if w < 0: raise ValueError(f'asyncutils.iters.asample_weighted: weight {w} for item {i!r} is negative')
152 W += w; yield i, w
153 r = await A.collect(it := agen(it), k, default=A.RAISE)
154 async for i, w in it:
155 w /= W; u -= w*p; p *= 1-w # noqa: PLW2901
156 if u <= 0: r[rrange(k)], u, p = i, rand(), 1.0
157 return r
[docs]
158async def astarfilter(pred, it):
159 if pred is None: pred = bool
160 async for t in iter_to_agen(it):
161 if (await r) if B.iscoroutine(r := pred(*await to_list(t))) else r: yield t
[docs]
162async def astarfilterfalse(pred, it):
163 if pred is None: pred = bool
164 async for t in iter_to_agen(it):
165 if B.iscoroutine(r := pred(*await to_list(t))): r = await r
166 if not r: yield t
[docs]
167def amultistarfilter(pred, /, *its, strict=False): return astarfilter(pred, azip(*its, strict=strict))
[docs]
def amultistarfilterfalse(pred, /, *its, strict=False):
    """Zip ``its`` together and keep the tuples for which ``pred(*tuple)`` is falsy.

    This is a plain function returning the async generator directly, matching
    ``amultistarfilter``.  It used to be declared ``async def``, which made the
    call return a coroutine that ``ahammingdist`` then passed straight to
    ``ailen`` without awaiting.
    """
    return astarfilterfalse(pred, azip(*its, strict=strict))
[docs]
169def amultifilter(pred, /, *its, strict=False): return afilter(pred, azip(*its, strict=strict))
[docs]
170def amultifilterfalse(pred, /, *its, strict=False): return afilterfalse(pred, azip(*its, strict=strict))
[docs]
171def ahammingdist(i1, i2, /, cmpeq=H.check): return ailen(amultistarfilterfalse(cmpeq, i1, i2))
[docs]
172async def amergesortedby(its, *, key=_identity, await_=False, reverse=False, _=A.ignore_stopaiteration):
173 f = (h := []).append
174 for i, it in enumerate(its := await to_tuple(amap(iter_to_agen, its))):
175 with _:
176 k = key(v := await anext(it))
177 f(((await k) if await_ else k, i, v))
178 if reverse: from asyncutils._internal.compat import heapify as a, heappop as b, heappush as c
179 else: from heapq import heapify as a, heappop as b, heappush as c
180 a(h)
181 while h:
182 k, i, v = b(h); yield v
183 with _:
184 k = key(v := await anext(its[i]))
185 c(h, ((await k) if await_ else k, i, v))
[docs]
async def batch(it, n, *, item_timeout=None, strict=False):
    """Group items of ``it`` into lists of up to ``n`` items.

    Parameters:
        it: sync or async iterable (normalised with ``iter_to_agen``).
        n: batch size; must be at least 1.
        item_timeout: seconds to wait for each item.  On timeout a partially
            filled batch is flushed; with nothing buffered the wait continues.
        strict: raise ``ValueError`` when a batch with fewer than ``n`` items
            would be emitted.

    Raises:
        ValueError: if ``n < 1``, or on a short batch when ``strict`` is true.
    """
    if n < 1: raise ValueError('asyncutils.iters.batch: n must be at least 1')
    fetch = iter_to_agen(it).__anext__
    pending = []
    exhausted = False
    # The previous version looped forever once the source was exhausted,
    # because the StopAsyncIteration only left the inner loop.
    while not exhausted:
        while len(pending) < n:
            try: pending.append(await B.wait_for(fetch(), item_timeout))
            except StopAsyncIteration:
                exhausted = True
                break
            except TimeoutError:
                if pending: break
        if pending:
            # Compare the real batch length: the old loop-index test let a
            # final batch of n-1 items through in strict mode.
            if strict and len(pending) < n: raise ValueError('asyncutils.iters.batch: incomplete batch')
            yield H.copy_and_clear(pending)
[docs]
197def batch2(it, n, strict=False): return A.aiter_from_f(partial(A.collect, it, n, default=A.RAISE if strict else _NO_DEFAULT), [])
198async def _asideeffect(f, it, size):
199 if size is None:
200 if B.iscoroutine(r := f(i := await anext(it))):
201 await r; yield i
202 async for i in it: await f(i); yield i
203 else:
204 yield i
205 async for i in it: f(i); yield i
206 elif B.iscoroutine(r := f(i := await anext(it := batch(it, size)))):
207 await r
208 for _ in i: yield _
209 async for i in it:
210 await f(i)
211 for _ in i: yield _
212 else:
213 for _ in i: yield _
214 async for i in it:
215 f(i)
216 for _ in i: yield _
[docs]
async def aside_effect(f, it, /, _=_asideeffect, *, size=None, before=None, after=None):
    """Yield the items of ``it`` while invoking ``f`` on each item (or each batch of ``size`` items).

    Parameters:
        f: callable run for its side effect; may return a coroutine.
        it: sync or async iterable.
        size: when given, ``f`` receives batches of up to ``size`` items.
        before: optional zero-argument callable run when iteration starts.
        after: optional zero-argument callable run when iteration ends,
            including on error or early close.

    The helper ``_`` is an async generator, so it is iterated and re-yielded.
    The earlier ``await _(...)`` raised ``TypeError`` and produced no items.
    """
    if before is not None: before()
    try:
        async for item in _(f, iter_to_agen(it), size): yield item
    finally:
        if after is not None: after()
[docs]
222def asliced(seq, n, strict=False):
223 I = atakewhile(None, (seq[i:i+n] async for i in acount(step=n)))
224 if not strict: return I
225 async def r():
226 async for s in I:
227 if len(s) != n: raise ValueError(f'asyncutils.iters.asliced: length of {seq!r} is not divisible by {n}')
228 yield s
229 return r()
[docs]
def buffer(it, maxsize=0, *, timeout_get=None, timeout_put=None, cooldown=0.0, loop=None):
    """Decouple producer and consumer of ``it`` through a queue of ``maxsize``.

    A background task created on ``loop`` pulls items from ``it`` into the
    queue; the returned async generator yields them.

    Parameters:
        timeout_get: length in seconds of the consumer's active window; once
            it has elapsed the consumer sleeps for ``cooldown`` seconds and a
            new window starts.  ``None`` means the window never elapses.
        timeout_put: seconds the producer waits for queue space before it
            gives up and stops producing.
        cooldown: pause applied after each elapsed window.
        loop: event loop for the producer task (default: ``H.get_loop_and_set()``).

    Fixes relative to the previous version: the consumer is a real async
    generator (the old wrapper awaited an async generator object, which raises
    ``TypeError``), and the producer signals completion by shutting the queue
    down so buffered items are drained before the stream ends.
    """
    q = Z.Queue(maxsize)
    if timeout_get is None: timeout_get = float('inf')
    async def cons():
        deadline = None
        try:
            while True:
                if deadline is None: deadline = timeout_get+monotonic()
                try: item = await q.get()
                except Z.QueueShutDown: return  # producer finished and queue drained
                yield item
                q.task_done()
                if monotonic() > deadline:
                    deadline = None
                    await B.sleep(cooldown)
        finally: await A.safe_cancel(t)
    async def prod():
        try:
            async for item in iter_to_agen(it):
                try: await B.wait_for(q.put(item), timeout_put)
                except TimeoutError: break
        # Non-immediate shutdown, the same way tee() in this module ends its queues.
        finally: q.shutdown()
    if loop is None: loop = H.get_loop_and_set()
    t = loop.create_task(prod())
    return cons()
[docs]
249async def asplitat(it, pred, maxsplit=-1, keep_sep=False):
250 I, f = iter_to_agen(it), (b := []).append
251 if not maxsplit: yield await to_list(I); return
252 async for i in I:
253 if not pred(i): f(i); continue
254 yield b
255 if keep_sep: yield [i]
256 if maxsplit == 1: yield await to_list(I); return
257 f = (b := []).append; maxsplit -= 1
258 yield b
[docs]
259def batch_process(items, size, processor): return amap(processor, batch(items, size), await_=True)
[docs]
260async def window(it, size, step=1):
261 if not size >= 1 <= step: raise ValueError('asyncutils.iters.window: size and step should both be >=1')
262 a, c = (b := deque(maxlen=size)).append, 0
263 async for i in iter_to_agen(it):
264 a(i)
265 if len(b) == size:
266 if not c%step and (t := (yield tuple(b))) is not None: size, step = t
267 c += 1
[docs]
268async def aall(it):
269 async for _ in iter_to_agen(it):
270 if not _: return False
271 return True
[docs]
272async def aany(it):
273 async for _ in iter_to_agen(it):
274 if _: return True
275 return False
[docs]
276async def aisempty(it):
277 async for _ in iter_to_agen(it): return False
278 return True
279async def _aextreme(I, K, d, c, /):
280 if (r := await anext(I := iter_to_agen(I[0] if len(I) == 1 else I), _NO_DEFAULT)) is _NO_DEFAULT:
281 if d is _NO_DEFAULT: raise ValueError('empty (async) iterable passed to asyncutils.iters.amax or asyncutils.iters.amin with no default value')
282 return d
283 k = K(r)
284 async for i in I:
285 if c(x := K(i), k): k, r = x, i
286 return r
[docs]
287def amax(*it, key=_identity, default=_NO_DEFAULT, _=_aextreme): return _(it, key, default, O.gt)
[docs]
288def amin(*it, key=_identity, default=_NO_DEFAULT, _=_aextreme): return _(it, key, default, O.lt)
[docs]
async def azip(*I, strict=False, _=A.ignore_stopaiteration.combined(RuntimeError)): # noqa: B008
    """Asynchronous ``zip``: yield tuples built from one item of each iterable.

    Each round advances all iterables concurrently with ``asyncio.gather`` and
    iteration stops as soon as any of them is exhausted.

    Parameters:
        strict: raise ``ValueError`` if the iterables do not all end in the
            same round.
        _: context manager wrapped around the main loop, as before.

    Exhaustion is now recorded per iterable within the round.  The old strict
    check re-probed the iterables after the loop, but ``gather`` had already
    advanced the longer ones, so a one-item length difference went unnoticed.
    """
    I = tuple(map(iter_to_agen, I))
    done = object()  # private marker: this iterable had nothing left
    async def pull(it):
        try: return await anext(it)
        except StopAsyncIteration: return done
    row = ()
    with _:
        while True:
            row = await B.gather(*map(pull, I))
            if any(v is done for v in row): break
            yield tuple(row) # noqa: ASYNC119
    if strict and any(v is done for v in row):
        for x, v in enumerate(row):
            if v is not done: raise ValueError(f'asyncutils.iters.azip: iterable {x} longer than shortest iterable')
[docs]
296async def amap(f, /, *its, await_=False, strict=False):
297 it = azip(*its, strict=strict)
298 if await_:
299 async for _ in it: yield await f(*_)
300 else:
301 async for _ in it: yield f(*_)
[docs]
302async def afilter(f, it):
303 if f is None: f = bool
304 async for _ in iter_to_agen(it):
305 if B.iscoroutine(r := f(_)): r = await r
306 if r: yield _
[docs]
def amapif(f, p, it, /, await_=False):
    """Apply ``f`` to the items of ``it`` that satisfy predicate ``p``.

    ``await_`` is forwarded by keyword.  ``amap`` takes ``*its`` positionally,
    so passing it positionally made the flag a second iterable to zip over.
    """
    return amap(f, afilter(p, it), await_=await_)
[docs]
308def amultimapif(f, p, /, *its, await_=False): return astarmap(f, afilter(p, azip(*its)), await_)
[docs]
async def arange(a, b=None, c=1, /):
    """Asynchronous ``range``: yield ``a, a+c, a+2c, ...`` while short of ``b``.

    With a single argument the sequence runs from 0 up to ``a``.  Non-integer
    bounds and steps are accepted.

    Raises:
        ValueError: if the step ``c`` is zero (raised on first iteration).
    """
    if c == 0: raise ValueError('asyncutils.iters.arange: step cannot be zero')
    if b is None: a, b = 0, a
    # Use the comparison operators, not b.__gt__/b.__lt__: those dunders return
    # the truthy NotImplemented for mixed int/float operands, which never ends.
    if c < 0:
        while a > b:
            yield a
            a += c
    else:
        while a < b:
            yield a
            a += c
[docs]
async def acount(start=0, step=1):
    """Asynchronous ``itertools.count``: yield ``start, start+step, ...`` forever.

    Integral values are normalised before counting: an integral float ``step``
    becomes an ``int``; a fractional float ``step`` makes ``start`` a float;
    for any other ``step`` an integral ``start`` becomes an ``int``.
    """
    if not isinstance(step, float):
        if start.is_integer():
            start = int(start)
    elif step.is_integer():
        step = int(step)
    else:
        start = float(start)
    current = start
    while True:
        yield current
        current += step
[docs]
320async def acycle(it):
321 a, I = (l := []).append, iter_to_agen(it)
322 async for i in I: yield i; a(i)
323 l = tuple(l)
324 while True:
325 for i in l: yield i
[docs]
async def arepeat(elem, n=None):
    """Yield ``elem`` ``n`` times, or endlessly when ``n`` is ``None`` or negative."""
    if n is not None and not n < 0:
        remaining = n
        while remaining:
            yield elem
            remaining -= 1
        return
    while True:
        yield elem
[docs]
331async def aaccumulate(it, func=O.add, *, initial=None):
332 it = iter_to_agen(it)
333 if initial is None:
334 try: initial = await anext(it)
335 except StopAsyncIteration: return
336 yield initial
337 async for _ in it: yield (initial := func(initial, _))
[docs]
338async def acompress(data, selectors):
339 async for i, j in azip(data, selectors):
340 if j: yield i
[docs]
341async def adropwhile(pred, it, *, skip_first=False):
342 async for _ in (it := iter_to_agen(it)):
343 if pred(_): continue
344 if not skip_first: yield _
345 break
346 async for _ in it: yield _
[docs]
347async def ac3merge(seqs):
348 seqs, g, d, c = await to_list(afilter(None, seqs)), (n := []).append, n.clear, None
349 while seqs:
350 for s in seqs:
351 c = s[0]
352 for t in seqs:
353 next(t := iter(t))
354 if any(H.check(_, c) for _ in t): c = None; break
355 else: break
356 if c is None: raise ValueError('asyncutils.iters.ac3merge: cannot resolve sequences')
357 yield c
358 for s in seqs:
359 if s[0] == c: del s[0]
360 if s: g(s)
361 seqs = tuple(n); d()
[docs]
def afilterfalse(f, it):
    """Yield the items of ``it`` for which ``f(item)`` is falsy (``bool`` when ``f`` is ``None``).

    Like ``afilter`` and ``astarfilterfalse``, a predicate that returns a
    coroutine is awaited before its result is negated.  Previously the
    coroutine object itself was negated, so every item was dropped and the
    coroutine was never awaited.
    """
    if f is None: f = bool
    async def rejected(item):
        verdict = f(item)
        if B.iscoroutine(verdict): verdict = await verdict
        return not verdict
    return afilter(rejected, it)
[docs]
363async def agroupby(it, key=_identity):
364 I, e = iter_to_agen(it), False
365 async def grouper(k):
366 nonlocal cv, ck, e; yield cv
367 async for cv in I:
368 if (ck := key(cv)) != k: return
369 yield cv
370 e = True
371 try: ck = key(cv := await anext(I))
372 except StopAsyncIteration: return
373 while not e:
374 yield ck, (g := grouper(t := ck))
375 if ck == t: await aconsume(g)
[docs]
376async def aislice(it, /, *a, _=lambda x: x if x is None else int(x, 0) if isinstance(x, str) else int(x)):
377 x, y, z = 0 if (s := slice(*map(_, a))).start is None else s.start, s.stop, 1 if s.step is None else s.step
378 if x < 0 or (y is not None and y < 0) or z <= 0: raise ValueError('asyncutils.iters.aislice: invalid indices')
379 async for i, j in azip(acount() if y is None else arange(max(x, y)), it):
380 if i == x: yield j; x += z
[docs]
381async def aiter_idx(it, value, start=0, stop=None, _=H.check):
382 async for i, j in aenumerate(aislice(it, start, stop), start):
383 if _(j, value): yield i
[docs]
async def asieve(n):
    """Yield the primes strictly below ``n`` in increasing order (sieve of Eratosthenes).

    ``d`` is a byte map in which only odd indices start out as candidates.
    For each prime ``p`` up to ``isqrt(n)``, the surviving candidates below
    ``p*p`` are emitted and then the odd multiples of ``p`` from ``p*p`` on are
    cleared.
    """
    # n == 2 must produce nothing: the bound is exclusive for every other n.
    if n <= 2: return
    yield 2
    s, d = 3, bytearray((0, 1))*(n>>1)
    async for p in aiter_idx(d, 1, s, M.isqrt(n)+1):
        stop = p*p
        async for i in aiter_idx(d, 1, s, stop): yield i
        s, x = stop, p<<1
        d[s:n:x] = bytes(len(range(s, n, x)))
    async for i in aiter_idx(d, 1, s): yield i
[docs]
391async def apairwise(it):
392 try: a = await anext(I := iter_to_agen(it))
393 except StopAsyncIteration: return
394 async for b in I: yield a, b; a = b
[docs]
395@aawgenf2agenf
396async def atriplewise(it):
397 a, b, c = tee(iter_to_agen(it), 3, maxqsize=3); await B.gather(*(anext(g, None) for g in (b, c, c)))
398 return azip(a, b, c)
[docs]
async def aproduct(*its, repeat=1):
    """Asynchronous ``itertools.product``: yield the Cartesian product of ``its`` as tuples.

    Every input is materialised into a tuple first; ``repeat`` repeats the
    whole sequence of pools.

    Raises:
        ValueError: if ``repeat`` is negative (raised on first iteration).

    The previous implementation iterated the generator of pools where it
    should have iterated each pool, so it yielded one-tuples of whole pools
    and nothing at all for ``repeat > 1``.
    """
    if repeat < 0: raise ValueError('asyncutils.iters.aproduct: repeat cannot be negative')
    pools = [await to_tuple(pool) for pool in its]
    r = [()]
    for pool in pools*repeat: r = [(*x, y) for x in r for y in pool]
    for _ in r: yield _
[docs]
404async def astarmap(f, it, /, await_=False):
405 it = iter_to_agen(it)
406 if await_:
407 async for _ in it: yield await f(*_)
408 else:
409 async for _ in it: yield f(*_)
[docs]
410async def atakewhile(pred, it):
411 if pred is None: pred = bool
412 async for _ in iter_to_agen(it):
413 if not pred(_): break
414 yield _
[docs]
415async def atakewhile_inclusive(pred, it):
416 if pred is None: pred = bool
417 async for _ in iter_to_agen(it):
418 yield _
419 if not pred(_): break
[docs]
420async def atakewhilenot(pred, it):
421 if pred is None: pred = bool
422 async for _ in iter_to_agen(it):
423 if pred(_): break
424 yield _
[docs]
425async def atakewhilenot_inclusive(pred, it):
426 if pred is None: pred = bool
427 async for _ in iter_to_agen(it):
428 yield _
429 if pred(_): break
[docs]
430def asquaresum(it): return asumprod(*tee(it))
[docs]
431async def aziplongest(*its, fillvalue=None):
432 n = len(I := list(map(iter_to_agen, its)))
433 while True:
434 f = (v := []).append
435 for i, a in enumerate(I):
436 try: _ = await anext(a)
437 except StopAsyncIteration:
438 n -= 1
439 if not n: return
440 I[i], _ = arepeat(fillvalue), fillvalue
441 f(_)
442 yield tuple(v)
[docs]
443def asumprod(p, q, /): return asum(amap(O.mul, p, q, strict=True))
[docs]
444async def aconvolve(signal, kernel, _=A.achain):
445 f = (w := deque((0,), n := len(K := await to_tuple(areversed(kernel))))*n).append
446 async for x in _(signal, arepeat(0, n-1)): f(x); yield await asumprod(K, w)
[docs]
447def atabulate(f, start=0, step=1, /, *, await_=False): return amap(f, acount(start, step), await_=await_)
[docs]
448async def asum(it, start=0):
449 async for i in iter_to_agen(it): start += i
450 return start
[docs]
451async def aprod(it, start=1):
452 async for i in iter_to_agen(it): start *= i
453 return start
[docs]
454async def amatprod(it, start):
455 async for i in iter_to_agen(it): start @= i
456 return start
[docs]
457def atail(n, it, /): return aislice(it, max(0, len(it)-n), None)
[docs]
458async def to_tuple(it, /): return tuple(await to_list(it))
[docs]
459async def to_set(it, /, frozen=False): r = await A.transient_block(H.get_loop_and_set(), set, it) if type(it) in Z.s else {_ async for _ in iter_to_agen(it)}; return frozenset(r) if frozen else r
[docs]
460async def to_list(it, /): return await A.transient_block(H.get_loop_and_set(), list, it) if type(it) in Z.s else [_ async for _ in iter_to_agen(it)]
[docs]
461async def aconsume(it, n=None, _=H.check_methods):
462 if n == 0: return
463 if n: it = A.take(it, n, default=A.RAISE)
464 if _(it, '__iter__'): await H.get_loop_and_set().run_in_executor(H.create_executor(aconsume) if (E := getattr(aconsume, 'executor', None)) is None else E, deque, it, 0)
465 else:
466 async for _ in it: ...
[docs]
467def anth(it, n, default=_NO_DEFAULT): return anext(aislice(it, n, None), *H.filter_out(default, s=_NO_DEFAULT))
[docs]
468async def aallequal(it, key=_identity, strict=False):
469 async for _ in (I := agroupby(it, key)):
470 async for _ in I: return False
471 return True
472 if strict: raise ValueError('asyncutils.aallequal: iterable cannot be empty with strict=True')
473 return True
[docs]
474async def acombinations(it, r):
475 if r > (n := len(p := await to_tuple(it))): return
476 I = list(range(r)); yield p[:r]
477 while True:
478 for i in range(r-1, -1, -1):
479 if I[i] != i+n-r: break
480 else: return
481 I[i] += 1
482 for j in range(i+1, r): I[j] = I[j-1]+1
483 yield tuple(p[i] for i in I)
[docs]
async def acombinations_with_replacement(it, r):
    """Asynchronous ``itertools.combinations_with_replacement`` over the materialised ``it``.

    Yields ``r``-length tuples in lexicographic index order, allowing repeated
    elements.  An empty input with ``r > 0`` yields nothing; ``r == 0`` yields
    a single empty tuple, including for an empty input.
    """
    n = len(p := await to_tuple(it))
    if not n and r: return
    I = [0]*r
    # Build the first tuple from the indices rather than p[0], which raised
    # IndexError for an empty pool with r == 0.
    yield tuple(p[i] for i in I)
    while True:
        for i in range(r-1, -1, -1):
            if I[i] != n-1: break
        else: return
        I[i:] = (I[i]+1,)*(r-i)
        yield tuple(p[i] for i in I)
[docs]
492async def apermutations(it, r=None):
493 n = len(p := await to_tuple(it))
494 if (r := n if r is None else r) > n: return
495 I, C, x = list(range(n)), list(range(n, n-r, -1)), r-1; yield p[:r]
496 while n:
497 for i in range(x, -1, -1):
498 C[i] -= 1
499 if C[i]: I[i], I[-j] = I[-(j := C[i])], I[i]; yield tuple(p[i] for i in I[:r]); break
500 else: I[i:], C[i] = I[i+1:]+I[i:i+1], n-i
501 else: return
[docs]
502@aawgenf2agenf
503async def apowerset(it): s = await to_tuple(it); return aflatten(acombinations(s, r) for r in range(len(s)+1))
[docs]
504def aquantify(it, pred=bool): return asum(amap(pred, it))
[docs]
505def apadnone(it, _=A.achain): return _(it, arepeat(None)).__aiter__()
[docs]
506def agrouper(it, n, fillvalue=_NO_DEFAULT): I = (iter_to_agen(it),)*n; return azip(*I, strict=fillvalue is A.RAISE) if isinstance(fillvalue, type(A.RAISE)) else aziplongest(*I, fillvalue=fillvalue)
[docs]
async def aroundrobin(*its):
    """Yield one item from each iterable in turn, dropping iterables as they run out.

    Async port of the ``itertools`` round-robin recipe: the active iterators
    are cycled, and each time one is exhausted the cycle is rebuilt with one
    fewer member.
    """
    I = (iter_to_agen(i) for i in its)
    for i in range(len(its), 0, -1):
        async for _ in (I := acycle(aislice(I, i))):
            # Catch exhaustion here: a StopAsyncIteration escaping an async
            # generator body is turned into RuntimeError by the interpreter.
            try: item = await anext(_)
            except StopAsyncIteration: break
            yield item
[docs]
511def aroundrobin2(*its): return afilter(partial(O.is_not, _NO_DEFAULT), aflatten(aziplongest(*its, fillvalue=_NO_DEFAULT)))
[docs]
512async def aunique_everseen(it, key=_identity):
513 A, a = (S := set()).add, (s := []).append
514 async for i in iter_to_agen(it):
515 k = key(i)
516 try:
517 if k not in S: A(k); yield i
518 except TypeError:
519 if k not in s: a(k); yield i
[docs]
520def aunique_justseen(it, key=_identity, _=O.itemgetter): return amap(_(0), agroupby(it)) if key is None else amap(anext, amap(_(1), agroupby(it, key)), await_=True)
[docs]
@aawgenf2agenf
async def aunique(it, key=None, reverse=False):
    """Yield the distinct items of ``it`` in sorted order.

    The input is sorted with ``asorted`` and consecutive duplicates are then
    collapsed with ``aunique_justseen``.  ``key`` of ``None`` means the items
    are compared themselves; it is mapped to the identity function for the
    sort, because ``asorted`` calls its ``key`` unconditionally.
    """
    sort_key = _identity if key is None else key
    return aunique_justseen(await asorted(it, key=sort_key, reverse=reverse), key)
[docs]
523@aawgenf2agenf
524async def ancycles(it, n): return aflatten(arepeat(await to_tuple(it), n))
[docs]
525def apartition(pred, it):
526 if pred is None: pred = bool
527 async def agen(q, _=iter_to_agen(it).__anext__): # noqa: B008
528 p = q.popleft
529 while True:
530 while q: yield p()
531 try: (T if pred(v := await _()) else F).append(v)
532 except StopAsyncIteration: return
533 return map(agen, (F := deque(), T := deque()))
[docs]
534async def aiterexcept(f, exc, first=None):
535 if first is not None: yield await first()
536 with A.IgnoreErrors(exc):
537 while True: yield await f() # noqa: ASYNC119
[docs]
538async def ailen(it):
539 i = 0
540 async for _ in iter_to_agen(it): i += 1
541 return i
[docs]
async def aiterate(f, start):
    """Yield ``start``, ``await f(start)``, ``await f(await f(start))``, ... without end."""
    value = start
    while True:
        yield value
        value = await f(value)
[docs]
async def asorted(it, *, key=_identity, reverse=False):
    """Collect ``it`` and return its items as a sorted list.

    Parameters:
        key: one-argument function computing the sort key (called
            synchronously, once per item).
        reverse: sort in descending key order.

    Items are never compared with each other, only their keys.  The sort is
    stable, so equal-key items keep their input order, also when ``reverse``
    is true (as with built-in ``sorted``).

    The previous implementation returned its internal ``(key, index, item)``
    tuples instead of the items.
    """
    decorated = [(key(item), item) async for item in iter_to_agen(it)]
    decorated.sort(key=O.itemgetter(0), reverse=reverse)
    return [item for _, item in decorated]
[docs]
552def acanonical(it): return asorted(it, key=id, reverse=True)
async def _adpermfull(A, _):
    """Yield every distinct full-length permutation of the sorted list ``A`` in lexicographic order.

    ``_`` is ``len(A)``.  ``A`` is permuted in place using the classic
    next-permutation step.
    """
    size = _
    while True:
        yield tuple(A)
        # Rightmost position whose element is smaller than its successor.
        pivot = next((i for i in range(size - 2, -1, -1) if A[i] < A[i + 1]), None)
        if pivot is None:
            return
        # Rightmost element greater than the pivot; pivot+1 always qualifies.
        swap = next((j for j in range(size - 1, pivot, -1) if A[pivot] < A[j]), pivot + 1)
        A[pivot], A[swap] = A[swap], A[pivot]
        # Reverse the suffix after the pivot.
        A[pivot + 1:] = A[:pivot - size:-1]
async def _adpermpartial(A, _):
    """Yield every distinct ``_``-length permutation of the sorted list ``A`` in lexicographic order.

    ``A`` is split into a ``head`` of ``_`` items (the current permutation)
    and a ``tail`` holding the unused items.  Each step finds the rightmost
    head position that can be increased, swaps in the smallest larger value
    (from the tail first, else from the head), and refills the positions to
    its right from the tail.
    """
    r = _
    head, tail = A[:r], A[r:]
    right_head = range(r - 1, -1, -1)
    left_tail = range(len(tail))
    while True:
        yield tuple(head)
        pivot = tail[-1]
        for i in right_head:
            if head[i] < pivot: break
            pivot = head[i]
        else: return
        pivot = head[i]
        for j in left_tail:
            if tail[j] > pivot:
                head[i], tail[j] = tail[j], pivot
                break
        else:
            for j in right_head:
                if head[j] > pivot:
                    head[i], head[j] = head[j], pivot
                    break
        tail += head[:i - r:-1]
        i += 1
        # The slice length must use the incremented i; computing it before the
        # increment moved one item too many and made head grow past r.
        head[i:], tail[:] = tail[:r - i], tail[r - i:]
[docs]
async def empty_agen():
    """Async generator that finishes immediately without yielding anything."""
    return
    yield  # unreachable; its presence is what makes this an async generator
[docs]
579async def agives(x): yield x
[docs]
@aawgenf2agenf
async def adistinct_permutations(it, r=None, f=(_adpermpartial, _adpermfull)):
    """Yield the distinct ``r``-length permutations of ``it`` (all items when ``r`` is ``None``).

    Sortable inputs are sorted and handed to the full or partial permutation
    helper in ``f``.  If sorting raises ``TypeError``, the helpers are run on
    first-occurrence indices instead and mapped back to the items.

    Edge cases, following ``more_itertools.distinct_permutations``: ``r == 0``
    yields one empty tuple, while ``r`` negative or larger than the input
    yields nothing.  These two results used to be swapped.
    """
    S = len(I := await to_list(it))
    size = S if r is None else r
    if size == 0: return agives(())
    if not 0 < size <= S: return empty_agen()
    a = f[size == S]
    try: I.sort(); return a(I, size)
    except TypeError:
        d = defaultdict(list)
        for i in I: d[I.index(i)].append(i)
        return amap(lambda i, E={k: acycle(v) for k, v in d.items()}: to_tuple(await anext(E[_]) for _ in i), a(await asorted(amap(I.index, I)), size), await_=True) # noqa: B008
[docs]
590async def aunique_to_each(*its):
591 p = frozenset(await to_list(amap(to_tuple, its, await_=True)))
592 for i, j in Counter(await to_list(aflatten(map(frozenset, p)))).items():
593 if j == 1 and i in p: yield i
[docs]
594async def aderangements(it, r=None):
595 async for _ in acompress(apermutations(X := await to_tuple(it), r), amap(aall, amap(partial(amap, O.is_not), arepeat(Y := tuple(range(len(X)))), apermutations(Y, r)), await_=True)): yield _
[docs]
596def aintersperse(e, it, n=1):
597 if n <= 0: raise ValueError('asyncutils.iters.aintersperse: n must be positive')
598 return aislice(ainterleave_stopearly(arepeat(e), it), 1, None) if n == 1 else aflatten(aislice(ainterleave_stopearly(arepeat((e,)), batch(it, n)), 1, None))
[docs]
599def ainterleave_stopearly(*its): return aflatten(azip(*its))
[docs]
600def aspy(it, n=1): p, q = tee(it, maxqsize=n); return A.take(q, n), p
[docs]
601async def ainterleave_evenly(its, lengths=None):
602 I = await to_tuple(its)
603 try:
604 if (X := len(I)) != len(L := await to_tuple(lengths or map(len, I))): raise ValueError('asyncutils.iters.ainterleave_evenly: mismatch in length of its and lengths')
605 except TypeError: raise ValueError('asyncutils.iters.ainterleave_evenly: cannot determine lengths of (async) iterables') from None
606 A, *a = map(f := L.__getitem__, _ := sorted(range(X), key=f, reverse=True)); B, *b = (iter_to_agen(I[i]) for i in _); E, t = [A//X]*len(a), sum(L)
607 while t:
608 yield await anext(B); t -= 1; E[:] = map(O.sub, E, a)
609 for i, e in enumerate(E):
610 if e < 0: yield await anext(b[i]); t -= 1; E[i] += A
[docs]
611async def ainterleave_randomly(its, _=_randrange):
612 x = len(I := await to_list(amap(iter_to_agen, its)))
613 while x:
614 i = _(x)
615 try: yield await anext(I[i])
616 except StopAsyncIteration: I[i] = I[-1]; del I[-1]; x -= 1
[docs]
async def acollapse(it, base_typ=(str, bytes), levels=None):
    """Flatten arbitrarily nested (async) iterables, yielding the leaves in order.

    Parameters:
        it: the nested structure.
        base_typ: type or tuple of types whose instances are yielded whole.
        levels: maximum nesting depth to descend into; ``None`` means no limit.

    Items that ``iter_to_agen`` rejects with ``TypeError`` are yielded as
    leaves.
    """
    if levels is None: levels = float('inf')
    stack = deque(((0, arepeat(iter_to_agen(it), 1)),))
    while stack:
        group = level, nodes = stack.popleft()
        if level > levels:
            async for node in nodes: yield node
            continue
        async for node in nodes:
            if isinstance(node, base_typ):
                yield node
                continue
            try: child = iter_to_agen(node)
            except TypeError: yield node
            else:
                # Park the parent first and put the child on top, so the
                # child's contents come out before the parent's remaining
                # siblings.  The old push order emitted siblings first.
                stack.appendleft(group)
                stack.appendleft((level+1, child))
                break
[docs]
630def afirsttrue(it, default=_NO_DEFAULT, pred=None): return anext(afilter(pred, it), *H.filter_out(default, s=_NO_DEFAULT))
[docs]
631def aprepend(val, it, _=A.achain): return _((val,), it).__aiter__()
[docs]
632def aappend(val, it, _=A.achain): return _(it, (val,)).__aiter__()
[docs]
633async def arandomproduct(*a, n=1, _=_randinst.choice):
634 async for i in ancycles(amap(to_tuple, a, await_=True), n): yield _(i)
[docs]
635async def arandomcombination(it, r, _=_sample):
636 (_ := _(range(len(p := await to_tuple(it))), r)).sort()
637 for i in _: yield p[i]
[docs]
638@aawgenf2agenf
639async def arandom_combination_with_replacement(it, r, _=_randrange): return amap((p := await to_tuple(it)).__getitem__, await asorted(arepeatfunc(_, r, len(p))))
[docs]
640async def arandom_permutation(it, r=None, _=_sample):
641 p = await to_tuple(it)
642 if r is None: r = len(p)
643 for i in _(p, r): yield i
[docs]
644async def afirst(it, default=_NO_DEFAULT):
645 async for i in iter_to_agen(it): return i
646 if default is _NO_DEFAULT: raise ValueError('asyncutils.iters.afirst called on empty iterable without default value')
647 return default
[docs]
648async def alast(it, default=_NO_DEFAULT, _=H.check_methods):
649 try:
650 if _(it, '__getitem__'): return it[-1]
651 return (await to_list(it)).pop() if (f := getattr(it, '__reversed__', None)) is None else f().__next__()
652 except (IndexError, TypeError, StopIteration, StopAsyncIteration):
653 if default is _NO_DEFAULT: raise ValueError('asyncutils.iters.alast called on empty iterable without default value') from None
654 return default
[docs]
655def anth_or_last(it, n, default=_NO_DEFAULT): return alast(aislice(it, n+1), default)
[docs]
656def abefore_and_after(pred, it): a, b = tee(it); return acompress(atakewhile(pred, a), azip(b)), b
[docs]
657async def anthcombination(it, r, idx):
658 if not 0 <= r <= (n := len(p := await to_tuple(it))): raise IndexError(f'asyncutils.iters.anthcombination: r={r} is out of range')
659 c, k = 1, min(r, n-r)
660 for i in range(1, k+1): c = c*(n-k+i)//i
661 if idx < 0: idx += c
662 if idx < 0 or idx >= c: raise IndexError(f'asyncutils.iters.anthcombination: idx={idx} is out of range')
663 while r:
664 c, n, r = c*r//n, n-1, r-1
665 while idx >= c: idx -= c; c, n = c*(n-r)//n, n-1
666 yield p[~n]
[docs]
667@aawgenf2agenf
668async def asubslices(it): return astarmap(O.getitem, azip(arepeat(s := await to_tuple(it)), astarmap(slice, acombinations(range(len(s)+1), 2))))
[docs]
async def arepeatfunc(f, times=None, *a):
    """Yield the result of ``f(*a)`` repeatedly: ``times`` times, or forever when ``times`` is ``None``.

    A result that can be awaited is awaited before it is yielded; the
    ``TypeError`` from awaiting anything else is suppressed by
    ``A.ignore_typeerrs`` and the value is yielded as is.

    The previous version computed each result but never returned or yielded
    it, and ``times=None`` failed inside ``aloops``.
    """
    call = partial(f, *a)
    async def invoke(ignore=A.ignore_typeerrs): # noqa: B008
        r = call()
        with ignore: r = await r
        return r
    if times is None:
        while True: yield await invoke()
    else:
        async for _ in aloops(times): yield await invoke()
[docs]
674async def apolynomial_from_roots(roots, _=(1,)):
675 async for r in iter_to_agen(roots): _ = aconvolve(_, (1, -r))
676 async for i in iter_to_agen(_): yield i
[docs]
677@aawgenf2agenf
678async def atranspose(mat): return azip(*await to_list(mat), strict=True)
[docs]
@aawgenf2agenf
async def aflatten_tensor(tensor, base_typ=(str, bytes), _=H.check_methods):
    """Flatten nesting levels of ``tensor`` until the leading item is a scalar.

    An item counts as a scalar when it is an instance of ``base_typ`` or passes
    neither the ``__iter__`` nor the ``__aiter__`` check.
    """
    stream = iter_to_agen(tensor)
    while True:
        try:
            head = await anext(stream)
        except StopAsyncIteration:
            return stream
        # Put the peeked item back before deciding whether to go deeper.
        stream = aprepend(head, stream)
        if isinstance(head, base_typ):
            return stream
        if not _(head, '__iter__') and not _(head, '__aiter__'):
            return stream
        stream = aflatten(stream)
[docs]
@aawgenf2agenf
async def apolynomial_derivative(coeff):
    """Produce the coefficients of the derivative of a polynomial.

    ``coeff`` lists coefficients from the highest power down to the constant
    term; each is multiplied by its exponent and the constant term is dropped.
    """
    r = await to_tuple(coeff)
    # Exponents run len(r)-1 down to 1; the original omitted the -1 step,
    # which made the range empty for any non-trivial polynomial.
    return amap(O.mul, r, range(len(r)-1, 0, -1))
[docs]
async def apolynomial_eval(coeff, x):
    """Evaluate the polynomial with coefficients ``coeff`` at ``x``.

    Returns ``type(x)(0)`` when there are no coefficients.
    """
    coefficients = await to_tuple(coeff)
    n = len(coefficients)
    if n == 0:
        return type(x)(0)
    # NOTE(review): relies on A.collect(..., n-1) supplying the powers needed
    # for n coefficients — confirm against A.collect's contract.
    powers = await A.collect(apowers(x), n-1)
    return await asumprod(coefficients, areversed(powers))
[docs]
@aawgenf2agenf
async def areshape(mat, shape):
    """Regroup the items of ``mat`` according to ``shape``.

    An integer ``shape`` batches the once-flattened matrix into rows of that
    size. Otherwise the fully flattened stream is batched by the trailing
    dimensions (innermost first) and cut to the leading dimension.
    """
    if isinstance(shape, int):
        return batch(aflatten(mat), shape)
    dims = iter_to_agen(shape)
    leading = await anext(dims)
    nested = await A.areduce(batch, areversed(dims), aflatten_tensor(mat), await_=False)
    return aislice(nested, leading)
async def _factor_pollard(n):
    """Return a non-trivial factor of ``n`` found by cycle detection.

    For each offset ``b`` a slow and a fast sequence of ``v -> (v*v+b) % n``
    are advanced until their difference shares a factor with ``n``.

    Raises:
        ValueError: every offset only produced the trivial factor ``n``.
    """
    if n == 4:
        return 2
    async for b in arange(1, n):
        x = y = 2
        d = 1
        while d == 1:
            x = (x*x+b)%n
            # The fast sequence takes two steps per iteration.
            y = (y*y+b)%n
            y = (y*y+b)%n
            d = M.gcd(x-y, n)
        if d != n:
            return d
    raise ValueError(f'asyncutils.iters.afactor: internal error: {n} is prime')
@lru_cache
def _shift_to_odd(n):
    """Split ``n`` as ``2**s * d`` with ``d`` odd and return ``(s-1, d)``.

    Raises:
        ValueError: the decomposition does not hold (for example ``n == 0``).
    """
    s = ((n-1)^n).bit_length()-1
    d = n>>s
    if (1<<s)*d != n or not d&1 or s <= -1:
        raise ValueError(f'asyncutils.iters.aisprime: internal error: {n} is invalid')
    return s-1, d
async def _probable_prime(n, base, _=_shift_to_odd):
    """Return whether ``n`` passes the strong probable-prime test for ``base``."""
    m = n-1
    s, d = _(m)
    x = pow(base, d, n)
    if x == 1 or x == m:
        return True
    async for _round in aloops(s):
        x = x*x%n
        if x == m:
            return True
    return False
[docs]
async def aisprime(n, s=_smallprimes, p=_perfect_test, r=_randrange, f=_probable_prime):
    """Return whether ``n`` is prime (probabilistically for very large ``n``).

    Values below 210 are looked up in ``s``. Multiples of the primes up to 17
    are rejected next. The remaining candidates are tested with ``f`` against
    the witness set of the first entry of ``p`` whose bound exceeds ``n``,
    or against 64 random bases when ``n`` exceeds every bound.
    """
    if n < 210:
        return n in s
    if not n&1 or any(not n%q for q in (3, 5, 7, 11, 13, 17)):
        return False
    for bound, bases in p:
        if n < bound:
            break
    else:
        bases = arepeatfunc(r, 64, 2, n-1)
    witness_test = partial(f, n)
    return await aall(amap(witness_test, bases, await_=True))
[docs]
async def afactor(n, _=_littleprimes, F=_factor_pollard):
    """Yield the prime factors of ``n`` with multiplicity.

    Small primes from ``_`` are divided out first; what remains is split with
    ``F`` until every piece is below 44521 or reported prime by ``aisprime``.

    Raises:
        ValueError: ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError('asyncutils.iters.afactor: no prime factors')
    if n == 1:
        return
    for p in _:
        while n%p == 0:
            yield p
            n //= p
        if n == 1:
            return
    # First-in, first-out worklist of cofactors still to be classified.
    pending = deque((n,))
    while pending:
        m = pending.popleft()
        if m < 44521 or await aisprime(m):
            yield m
        else:
            f = await F(m)
            pending.append(f)
            pending.append(m//f)
[docs]
async def arandom_derangement(it, _=_randinst.shuffle):
    """Return a random rearrangement of ``it`` that leaves no item in place.

    Returns ``()`` for empty input.

    Raises:
        ValueError: ``it`` has exactly one item.
    """
    items = await to_tuple(it)
    size = len(items)
    if size < 2:
        if items:
            raise ValueError('asyncutils.iters.arandom_derangement: no derangements to choose from')
        return ()
    perm = list(range(size))
    # ``fixed`` shares its int objects with ``perm``, so the identity test
    # below detects any position still holding its original index.
    fixed = tuple(perm)
    while any(map(O.is_, fixed, perm)):
        _(perm)
    return O.itemgetter(*perm)(items)
[docs]
@aawgenf2agenf
async def amatmul(*a):
    """Multiply two matrices given as (async) iterables of rows.

    The first row of the right operand is read eagerly to learn the width of
    the output rows.
    """
    left, right = map(iter_to_agen, a)
    first_row = await to_tuple(await anext(right))
    right = aprepend(first_row, right)
    entries = astarmap(asumprod, aproduct(left, atranspose(right)), True)
    return batch(entries, len(first_row))
[docs]
@aawgenf2agenf
async def mat_vec_mul(M, V):
    """Multiply matrix ``M`` (iterable of rows) by vector ``V``.

    ``V`` is collected once; every row is collected and combined with it
    through ``asumprod``.
    """
    vector = await to_tuple(V)
    dot_with_vector = asumprod.__get__(vector)
    rows = amap(to_tuple, M, await_=True)
    return amap(dot_with_vector, rows, await_=True)
[docs]
async def vecs_eq(u, v, cmpeq=H.check, *, strict=True):
    """Return whether ``cmpeq`` holds for every aligned pair from ``u`` and ``v``.

    A ``ValueError`` raised while comparing is reported as ``False``.
    """
    try:
        outcome = await aall(amap(cmpeq, u, v, strict=strict))
    except ValueError:
        outcome = False
    return outcome
[docs]
async def afreivalds(A, B, C, k=None, _r=_randrange):
    """Probabilistically check that ``A @ B == C`` by multiplying with random vectors.

    Args:
        A, B, C: Matrices as (async) iterables of rows. Each is collected once
            so it can be reused in every round.
        k: Number of rounds; defaults to ``getcontext().AFRIEVALDS_DEFAULT_K``.

    Returns:
        ``False`` as soon as one round finds ``A(Br) != Cr``, else ``True``.
    """
    A = await to_tuple(amap(to_tuple, A, await_=True))
    B = await to_tuple(amap(to_tuple, B, await_=True))
    C = await to_tuple(amap(to_tuple, C, await_=True))
    # r multiplies both B and C from the right, so its length is their column
    # count (equal to len(A) only for square matrices).
    width = len(C[0]) if C else 0
    rounds = getcontext().AFRIEVALDS_DEFAULT_K if k is None else k
    async for _ in aloops(rounds):
        r = tuple(_r(2) for _ in range(width))
        # operator.eq instead of int.__eq__: the latter returns the truthy
        # NotImplemented for non-int entries, making every comparison pass.
        if not await vecs_eq(mat_vec_mul(A, mat_vec_mul(B, r)), mat_vec_mul(C, r), O.eq):
            return False
    return True
[docs]
def basic_collect(*_):
    """Collect into a list: the sole argument, or ``aislice(*args)`` when several are given."""
    if len(_) > 1:
        return to_list(aislice(*_))
    return to_list(_[0])
[docs]
async def asubstrings(it):
    """Yield every contiguous run of the collected input as a tuple, shortest first."""
    seq = await to_tuple(it)
    for item in seq:
        yield (item,)
    bound = len(seq)+1
    async for width in arange(2, bound):
        async for start in arange(bound-width):
            yield seq[start:start+width]
[docs]
def asubstr_indices(seq, reverse=False):
    """Return an async generator of ``(seq[i:j], i, j)`` for every non-empty slice.

    Slices are grouped by length, ascending unless ``reverse`` is true.
    """
    bound = len(seq)+1
    lengths = range(1, bound)
    if reverse:
        lengths = reversed(lengths)
    async def gen():
        for L in lengths:
            async for i in arange(bound-L):
                j = i+L
                yield seq[i:j], i, j
    return gen()
[docs]
def iter_task(it, summaryf=aconsume):
    """Start a task that awaits ``summaryf(it)`` and resolves to the elapsed loop time."""
    loop = H.get_loop_and_set()
    async def task(clock):
        started = clock()
        await summaryf(it)
        return clock()-started
    return loop.create_task(task(loop.time))
[docs]
def agetitems_from_indices(it, indices, setatend=None, finish=False, _='index %r beyond the ends of (async) iterable {!r}'.format, _c=A.achain):
    """Schedule retrieval of the items of ``it`` found at ``indices``.

    A background task reads ``indices`` (each passed through ``operator.index``),
    creates one future per index and appends it to the returned list, then walks
    ``it`` once, resolving each future with the item at its index. Futures whose
    index is never reached receive an ``IndexError``.

    Args:
        it: (Async) iterable to read; wrapped with ``iter_to_agen``.
        indices: (Async) iterable of integer-like indices. Negative indices are
            served from a buffer holding the trailing items of ``it``.
        setatend: Optional future that receives the elapsed loop time when the
            work completes. Once it is done, the task and the item futures are
            cancelled through ``A.safe_cancel``/``A.safe_cancel_batch``.
        finish: When true, drain what is left of ``it`` afterwards.

    Returns:
        The list the background task fills with one future per index. It is
        returned immediately and may still be empty at that point.
    """
    L, r, it = H.get_loop_and_set(), [], iter_to_agen(it)
    async def consume(f=r.append):
        s, M, m, d = L.time(), 0, 0, defaultdict(list)
        async for x in amap(O.index, indices):
            # Track the smallest negative index for every negative x; the
            # original stopped updating it after the first negative index,
            # leaving the trailing buffer too short for later ones.
            if x < 0: M, m = None, min(m, x)
            elif M is not None: M = max(x, M)
            d[x].append(F := L.create_future()); f(F)
        async def helper(i, j, d=d):
            async for x, F in aenumerate(d.pop(i, ())):
                if F.cancelled(): continue
                if F.done(): raise A.FutureCorrupted(f'asyncutils.iters.agetitems_from_indices: future at index {x} associated with index {i} called on (async) iterable {it!r} had its result/exception set by an external party')
                F.set_result(j)
        try:
            if M is None:
                b = deque(maxlen=-m)
                async def helper2(i, j): await helper(i, j); b.append(j)
                # helper2 is a coroutine function, so it needs the same await
                # flag the non-negative branch passes for helper.
                await aconsume(_c(astarmap(helper2, aenumerate(it), True), amap(helper, acount(-1, -1), A.adisembowelleft(b), await_=True)))
            # NOTE(review): if A.take(it, M) yields only M items, index M itself
            # is never reached — confirm whether this should be M+1.
            else: await aconsume(astarmap(helper, aenumerate(A.take(it, M)), True))
        except A.CRITICAL: raise A.Critical
        except BaseException as e:
            async for F in afilterfalse(O.methodcaller('done'), r): F.set_exception(e)
            raise
        a = _(it)
        # Whatever is still in d was never reached while walking ``it``.
        for i, l in d.items():
            e = IndexError(a%i)
            async for x, F in aenumerate(l):
                if not F.cancelled():
                    if F.done(): raise ExceptionGroup('asyncutils.iters.agetitems_from_indices: error while processing indices for which items were not successfully got', (e, A.FutureCorrupted(f'asyncutils.iters.agetitems_from_indices: future at index {x} associated with index {i} called on (async) iterable {it!r} had its result/exception set by an external party')))
                    F.set_exception(e)
        if finish: await aconsume(it)
        if setatend is None: return
        # set_result on a cancelled future raises InvalidStateError.
        if setatend.cancelled(): return
        if setatend.done(): raise A.FutureCorrupted(f'asyncutils.iters.agetitems_from_indices: future setatend at {id(setatend):#x} (exact type {H.fullname(setatend)}) had its result set by an external party')
        setatend.set_result(L.time()-s)
    c = L.create_task(consume())
    if setatend is not None: setatend.add_done_callback(lambda _: setattr(_, '__cancel', t := B.gather(A.safe_cancel(c), A.safe_cancel_batch(r))) or t.add_done_callback(lambda _: delattr(setatend, '__cancel')))
    audit('asyncutils.iters.agetitems_from_indices', H.fullname(it)); return r
[docs]
async def aintersend(i1, i2):
    """Drive two async generators against each other.

    Each round sends the other generator's previous output (``None`` at first)
    into both generators concurrently and yields the pair of new outputs. The
    stream ends when either generator is exhausted.
    """
    audit('asyncutils.iters.aintersend', H.fullname(i1), H.fullname(i2))
    t = None, None
    f, g = i1.asend, i2.asend
    while True:
        try:
            t = tuple(await B.gather(f(t[1]), g(t[0])))
        except StopAsyncIteration:
            # Letting this escape would turn normal exhaustion into a
            # RuntimeError raised by the async-generator machinery.
            return
        yield t
[docs]
def asendstream(i1, i2):
    """Send every item of ``i2`` into ``i1`` and stream the awaited replies."""
    audit('asyncutils.iters.asendstream', H.fullname(i1), H.fullname(i2))
    sender = i1.asend
    return amap(sender, i2, await_=True)
[docs]
async def acat(first=None):
    """Echo generator: yield ``first``, then yield back whatever is sent in."""
    audit('asyncutils.iters.acat', first)
    value = first
    while True:
        value = yield value
[docs]
async def aforever():
    """Yield ``None`` endlessly."""
    audit('asyncutils.iters.aforever')
    while True:
        yield None
async def _aguess(it, estlen, key, default, finish_event, cmp, _=_aextreme):
    """Pick an item by observing a prefix, then taking the first item that beats it.

    The extreme (per ``cmp``) of the first ``ceil(estlen * A.RECIP_E)`` items
    is the benchmark. The first later item whose key compares favourably is
    returned, falling back to the benchmark. When ``finish_event`` is given
    and unset, the remainder of the stream is drained in the background and
    the event is set afterwards.

    Raises:
        ValueError: the prefix was empty and no ``default`` was supplied.
    """
    stream = iter_to_agen(it)
    prefix_len = M.ceil(estlen*A.RECIP_E)
    best = await _(A.take(stream, prefix_len), key, _NO_DEFAULT, cmp)
    if best is _NO_DEFAULT:
        if default is _NO_DEFAULT:
            raise ValueError('empty (async) iterable passed to asyncutils.iters.aguessmax or asyncutils.iters.aguessmin with no default value')
        return default
    benchmark = key(best)
    try:
        async for item in stream:
            if cmp(key(item), benchmark):
                return item
        return best
    finally:
        if finish_event is not None and not finish_event.is_set():
            spawn = H.get_loop_and_set().create_task
            drain = spawn(aconsume(stream))
            drain.add_done_callback(lambda _: finish_event.set())
            # Stop draining as soon as somebody else sets the event.
            spawn(finish_event.wait()).add_done_callback(drain.cancel)
[docs]
def aguessmax(it, estlen, *, key=_identity, default=_NO_DEFAULT, finish_event=None, _=_aguess):
    """Guess a large item of ``it``: delegates to the shared helper with ``operator.gt``."""
    comparator = O.gt
    return _(it, estlen, key, default, finish_event, comparator)
[docs]
def aguessmin(it, estlen, *, key=_identity, default=_NO_DEFAULT, finish_event=None, _=_aguess):
    """Guess a small item of ``it``: delegates to the shared helper with ``operator.lt``."""
    comparator = O.lt
    return _(it, estlen, key, default, finish_event, comparator)
[docs]
async def apowers_of_two(*, init=1, init_shift=0, shift=1):
    """Yield ``init << init_shift`` and then keep shifting left by ``shift`` forever."""
    value = init
    value <<= init_shift
    while True:
        yield value
        value <<= shift
[docs]
def apowers(base, start=1):
    """Return an endless async stream ``start, start*base, start*base**2, ...``.

    Bases 0 and 1 are special-cased. Positive integer powers of two with an
    integer ``start`` use bit shifts; everything else multiplies cumulatively.
    """
    if base == 0:
        return aprepend(start, arepeat(0))
    if base == 1:
        return arepeat(start)
    # The shift fast path only makes sense for positive ints. The original
    # called .bit_count() on any base (failing for float/complex, misfiring
    # for negative ints) and started the stream at ``base`` instead of ``start``.
    if isinstance(base, int) and isinstance(start, int) and base > 1 and base.bit_count() == 1:
        return apowers_of_two(init=start, shift=base.bit_length()-1)
    return aaccumulate(arepeat(base), O.mul, initial=start)
[docs]
async def areversed(it, /):
    """Yield the items of ``it`` in reverse order.

    Uses ``reversed(it)`` when the object supports it; otherwise collects the
    whole (async) iterable and pops from the end.
    """
    # Only the reversed() call is guarded: with the try around the whole loop,
    # a TypeError raised mid-iteration restarted via the fallback after items
    # had already been yielded.
    try:
        backwards = reversed(it)
    except TypeError:
        backwards = None
    if backwards is None:
        items = await to_list(it)
        pop = items.pop
        while items:
            yield pop()
    else:
        async for i in iter_to_agen(backwards):
            yield i
[docs]
async def arunlength_encode(it, /):
    """Yield ``(value, run_length)`` for each run of equal consecutive items."""
    async for k, g in agroupby(it):
        # Await the count before the grouper advances; the original yielded
        # the un-awaited ``ailen(g)`` object instead of a number.
        yield k, await ailen(g)
[docs]
def arunlength_decode(it, /):
    """Expand ``(value, count)`` pairs by star-mapping ``arepeat`` and flattening."""
    runs = astarmap(arepeat, it)
    return aflatten(runs)
async def _dfthelper(a, i=False, /):
    """Collect ``a`` and build the roots of unity needed for a (inverse) DFT.

    Returns:
        ``(roots, N, a)`` where ``a`` is the collected input, ``N`` its length
        and ``roots[n] == exp(s*1j*tau*n/N)`` with ``s`` = +1 when ``i`` is
        true and -1 otherwise. Empty input gives ``((), 0, ())``.
    """
    a = await to_tuple(a)
    N = len(a)
    sign = 1 if i else -1
    # Each root is computed directly: no division by zero for empty input, no
    # accumulated rounding from repeated multiplication, and no dependency on
    # apowers() accepting a complex base.
    roots = tuple(complex(M.cos(M.tau*n/N), sign*M.sin(M.tau*n/N)) for n in range(N))
    return roots, N, a
[docs]
async def adft(a, /, _=_dfthelper):
    """Yield the discrete Fourier transform of ``a``, one output bin at a time."""
    roots, size, signal = await _(a)
    for k in range(size):
        twiddles = [roots[k*i%size] for i in range(size)]
        yield M.sumprod(signal, twiddles)
[docs]
async def aidft(A, /, _=_dfthelper):
    """Yield the inverse discrete Fourier transform of ``A``, one sample at a time."""
    roots, size, spectrum = await _(A, True)
    for k in range(size):
        twiddles = [roots[k*n%size] for n in range(size)]
        yield M.sumprod(spectrum, twiddles)/size
async def _aargminmax(it, key, default, f):
    """Return the position of the extreme item of ``it`` as selected by ``f``.

    ``f`` receives ``(index, key(item))`` pairs and picks by the second field.
    ``default`` is returned for an empty iterable.
    """
    # ``f`` hands back its default unchanged on empty input, so wrap it in a
    # pair; subscripting a bare default such as -1 with [0] raised TypeError.
    fallback = default if default is _NO_DEFAULT else (default, None)
    winner = await f(aenumerate(amap(key, it)), key=O.itemgetter(1), default=fallback)
    return winner[0]
[docs]
def aargmin(it, key=_identity, default=-1, _=_aargminmax):
    """Return an awaitable for the index of the smallest item of ``it`` under ``key``."""
    selector = amin
    return _(it, key, default, selector)
[docs]
def aargmax(it, key=_identity, default=-1, _=_aargminmax):
    """Return an awaitable for the index of the largest item of ``it`` under ``key``."""
    selector = amax
    return _(it, key, default, selector)
[docs]
def arunningmean(it):
    """Stream running totals of ``it`` divided by the running count 1, 2, 3, ..."""
    totals = aaccumulate(it)
    counts = acount(1)
    return amap(O.truediv, totals, counts)
[docs]
@aawgenf2agenf
async def apowersetofsets(it, *, frozen=True):
    """Produce every subset of the distinct items of ``it`` as a set.

    Args:
        it: (Async) iterable of hashable items; duplicates are dropped while
            keeping first-seen order.
        frozen: Emit ``frozenset`` objects when true, ``set`` objects otherwise.
    """
    S = tuple(dict.fromkeys(await to_list(amap(frozenset, azip(it)))))
    # Bind union to an empty instance: the unbound method fails for the empty
    # combination (no self) and, for ``set``, rejects frozenset operands.
    union = (frozenset if frozen else set)().union
    return aflatten(astarmap(union, acombinations(S, r)) async for r in arange(len(S)+1))
[docs]
async def aserialize(it):
    """Re-yield the items of ``it``, fetching each one while holding a lock."""
    l, n = B.Lock(), iter_to_agen(it).__anext__
    while True:
        async with l:
            try:
                x = await n()
            except StopAsyncIteration:
                # End cleanly; letting this escape would surface to the
                # consumer as a RuntimeError instead of normal exhaustion.
                return
        yield x # noqa: RUF070
[docs]
async def aonline_sorter(it, *, key=_identity, reverse=False, slow=None):
    """Yield the items of ``it`` in sorted order while accepting new items via ``asend``.

    All items are collected into a heap of ``(key, serial, item)`` entries.
    Each step pops and yields the next item; a non-``None`` value sent into
    the generator is pushed onto the heap. When ``slow`` is true, ``key`` is
    evaluated in an executor.
    """
    audit('asyncutils.iters.aonline_sorter', id(it))
    heap_ops = Z if reverse else __import__('heapq')
    if slow is None:
        slow = getcontext().AONLINESORTER_DEFAULT_SLOW
    executor = getattr(aonline_sorter, 'executor', None)
    if executor is None:
        executor = H.create_executor(aonline_sorter)
    offload = partial(H.get_loop_and_set().run_in_executor, executor)
    offload_key = partial(offload, key)
    heap = [(await offload_key(x) if slow else key(x), i, x) async for i, x in aenumerate(it)]
    await offload(heap_ops.heapify, heap)
    pop = partial(heap_ops.heappop, heap)
    push = partial(heap_ops.heappush, heap)
    # The serial number keeps entries with equal keys comparable.
    serial = len(heap)
    while heap:
        sent = yield pop()[2]
        if sent is not None:
            push(((await offload_key(sent)) if slow else key(sent), serial, sent))
            serial += 1
# Register the public signature text for functions whose real signatures carry
# private default-argument bindings. ``_ :=`` lets adjacent entries share a string.
# NOTE(review): how the ``{}``/``{0}`` placeholders are filled is decided by
# patch_function_signatures — confirm there.
P.patch_function_signatures(
    (aonline_sorter, 'it, *, key={}, reverse=False, slow=None'),
    (aside_effect, 'f, it, /, *, size=None, before=None, after=None'),
    (apolynomial_from_roots, 'roots'),
    (adistinct_permutations, 'it, r=None'),
    (abfs, _ := 'start, neighbours, *, include_start=True'),
    (adfs, _),
    (aaccumulate, 'it, func={}, *, initial=None'),
    (aconvolve, 'signal, kernel'),
    (aislice, 'it, /, *a'),
    (ainterleave_randomly, 'its'),
    (ahammingdist, 'i1, i2, /, cmpeq={}'),
    (aiter_idx, 'it, value, start=0, stop=None'),
    (amergesortedby, 'its, *, key={}, await_=False, reverse=False'),
    (amax, _ := '*it, key={}, default=_NO_DEFAULT'),
    (amin, _),
    (asample_weighted, _ := 'it, k, *, rrange={0}, rand={0}'),
    (asamplel, _),
    (arandomcombination, _ := 'it, r'),
    (arandom_combination_with_replacement, _),
    (asorted, 'it, *, key={}, reverse=False'),
    (aunique_justseen, _ := 'it, key={}'),
    (aunique_everseen, _),
    (agroupby, _),
    (vecs_eq, 'u, v, cmpeq={}, *, strict=True'),
    (adft, 'xarr, /'),
    (aidft, 'Xarr, /'),
    (aconsume, 'it, n=None'),
    (aallequal, 'it, key={}, strict=False'),
    (aprepend, 'val, it'),
    (arandomproduct, '*a, n=1'),
    (asattolo, 'it, /'),
    (aargmin, _ := 'it, key={}, default=-1'),
    (aargmax, _),
    (afactor, _ := 'n'),
    (agetitems_from_indices, 'it, indices, setatend=None, finish=False'),
    (alast, 'it, default=_NO_DEFAULT'),
    (aisprime, _),
    (aguessmax, _ := 'it, estlen, *, key={}, default=_NO_DEFAULT, finish_event=None'),
    (aguessmin, _),
    (aflatten, _ := 'it'),
    (arandom_derangement, _),
    (afreivalds, 'A, B, C, k=None'),
    (basic_collect, 'it, n'),
    (iter_task, 'it, summaryf={}'),
    (apadnone, 'it'),
    (aunzip, 'ait, put_batch=None, fillvalue={}'),
    (aflatten_tensor, 'tensor, base_typ={}'),
    (arandom_permutation, 'it, r=None'),
)
# Remove private helpers from the module namespace; the functions above keep
# the ones they need through their default arguments.
del (
    P, _asideeffect, _adpermpartial, _adpermfull, _atraverse, _aunzip_put,
    _aguess, _aargminmax, _aextreme, _factor_pollard, _shift_to_odd,
    _probable_prime, _dfthelper, _littleprimes, _randrange, _sample,
    _smallprimes, _perfect_test, _rand, _randinst, _identity, _,
)