1import asyncutils as A
2from asyncutils.constants import _NO_DEFAULT
3from asyncutils._internal import helpers as H, patch as P
4from asyncutils._internal.submodules import iterclasses_all as __all__
5from _collections import defaultdict, deque
6from sys import maxsize as INF
@H.subscriptable
class achain:
    """Async iterable that yields the items of several iterables back to back.

    ``achain(a, b, ...)`` chains its positional arguments; use
    :meth:`from_iterable` when the iterables are themselves delivered by a
    single iterable.
    """

    __slots__ = ('__its',)

    @classmethod
    def from_iterable(cls, it_of_its):
        """Alternate constructor taking one iterable of iterables.

        The sources are stored as ``A.amap(A.iter_to_agen, it_of_its)``.
        NOTE(review): presumably ``amap`` maps lazily and ``iter_to_agen``
        adapts each source to an async generator -- confirm against the
        helpers' definitions.
        """
        # Build the mapped sources first, then the bare instance, matching the
        # evaluation order of the original single-statement form.
        sources = A.amap(A.iter_to_agen, it_of_its)
        chain = super().__new__(cls)
        chain.__its = sources
        return chain

    def __new__(cls, *its):
        """Create a chain over the positional iterables ``its``."""
        return cls.from_iterable(its)

    async def __aiter__(self):
        """Yield every item of each stored source, one source after another."""
        async for source in self.__its:  # ty: ignore[unresolved-attribute]
            async for item in source:
                yield item
@H.subscriptable
class apeekable(H.LoopMixinBase):
    """Async iterator wrapper with look-ahead, push-back and indexing.

    Items fetched from the wrapped source before they are consumed are held
    in a deque; ``__anext__`` serves that deque first and only then pulls
    from the source.
    """

    __slots__ = ('__cache', '__it')

    def __init__(self, it=()):
        """Wrap ``it`` (passed through ``A.iter_to_agen``) with an empty cache."""
        self.__it = A.iter_to_agen(it)
        self.__cache = deque()
        super().__init__()

    def __aiter__(self):
        """Return ``self``; the object is its own async iterator."""
        return self

    async def can_peek(self):
        """Return ``True`` if :meth:`peek` succeeds, ``False`` if the source is exhausted."""
        try:
            await self.peek()
        except StopAsyncIteration:
            return False
        return True

    async def peek(self, default=_NO_DEFAULT):
        """Return the next item without consuming it.

        If nothing is cached, one item is fetched from the source and cached.
        When the source is exhausted, ``default`` is returned if it was
        supplied; otherwise ``StopAsyncIteration`` propagates.
        """
        buffered = self.__cache
        if buffered:
            return buffered[0]
        try:
            buffered.append(await anext(self.__it))
        except StopAsyncIteration:
            if default is _NO_DEFAULT:
                raise
            return default
        return buffered[0]

    def prepend(self, /, *i):
        """Push ``i`` onto the front of the cache so they are served next, in the given order."""
        # extendleft inserts one element at a time (reversing its input), so
        # feed it the arguments back to front.
        self.__cache.extendleft(i[::-1])

    async def __anext__(self):
        """Return the next item: from the cache if any, else from the source."""
        buffered = self.__cache
        if not buffered:
            return await anext(self.__it)
        return buffered.popleft()

    async def __getitem__(self, i, /, _=~INF):
        """Return the item(s) at index or slice ``i`` without consuming them.

        Whatever must be read from the source to answer the request is
        appended to the cache.  A negative index, or a slice with a negative
        start/stop, drains the whole source first.  ``_`` is the stand-in stop
        used for negative-step slices with no explicit stop.

        Raises ``ValueError`` for a zero slice step; an out-of-range integer
        index surfaces as the deque's ``IndexError``.
        """
        cache = self.__cache
        stash = cache.append

        if not isinstance(i, slice):
            i = i.__index__()
            if i < 0:
                # Counting from the end requires the whole source.
                feed = A.iter_to_agen(self.__it)
            elif i < (cached := len(cache)):
                feed = A.empty_agen()
            else:
                # NOTE(review): relies on A.take yielding at most that many
                # items from the source -- confirm.
                feed = A.take(self.__it, i - cached + 1)
            async for item in feed:
                stash(item)
            return cache[i]

        raw_step, raw_start, raw_stop = i.step, i.start, i.stop
        step = 1 if raw_step is None else int(raw_step)
        if step > 0:
            start = 0 if raw_start is None else int(raw_start)
            stop = INF if raw_stop is None else int(raw_stop)
        elif step < 0:
            start = -1 if raw_start is None else int(raw_start)
            stop = _ if raw_stop is None else int(raw_stop)
        else:
            raise ValueError('asyncutils.iterclasses.apeekable: slice step cannot be zero')

        if start < 0 or stop < 0:
            # A bound relative to the end requires the whole source.
            async for item in A.iter_to_agen(self.__it):
                stash(item)
        elif (missing := min(max(start, stop) + 1, INF) - len(cache)) >= 0:
            # Cache just far enough to cover the right-most requested index.
            async for item in A.take(self.__it, missing):
                stash(item)
        return tuple(cache)[start:stop:step]

    P.patch_method_signatures((__getitem__, 'idx, /'))
@H.subscriptable
class abucket:
    """Lazily group the items of an iterable by ``key(item)``.

    ``bucket[k]`` is an async generator over the items whose key equals
    ``k``.  Items with a different key that are met while searching are
    parked in a per-key deque (if ``validator`` accepts that key) and served
    later.  ``validator`` defaults to accepting every key.
    """

    __slots__ = ('__cache', '__it', '__key', '__validator')

    def __init__(self, it, key, validator=None):
        """Wrap ``it`` (passed through ``A.iter_to_agen``).

        ``key`` maps an item to its bucket key; ``validator`` is an optional
        predicate on keys, and a falsy value means "accept everything".
        """
        super().__init__()
        self.__it = A.iter_to_agen(it)
        self.__key = key
        self.__cache = defaultdict(deque)
        self.__validator = validator or (lambda _: True)

    async def contains(self, k, /):
        """Return whether at least one item with key ``k`` is still available.

        Returns ``False`` immediately if the validator rejects ``k``.
        Otherwise one item is pulled from ``self[k]`` and, if found, put back
        so that the probe does not consume it.
        """
        if not self.__validator(k):
            return False
        try:
            probed = await anext(self[k])
        except StopAsyncIteration:
            return False
        # The probed item was the *first* item of the bucket, so it must go
        # back to the front; appending it to the back would reorder the
        # bucket whenever more items for ``k`` are already cached.
        self.__cache[k].appendleft(probed)
        return True

    async def __aiter__(self):
        """Drain the source into the cache, then yield every cached key.

        Keys rejected by the validator are not cached and so not yielded.
        """
        key_of, is_valid, cache = self.__key, self.__validator, self.__cache
        async for item in self.__it:
            if is_valid(k := key_of(item)):
                cache[k].append(item)
        # Iterate a snapshot: a consumer that drains ``self[k]`` between two
        # yields makes ``__getitem__`` delete that key, and mutating the dict
        # while iterating it directly raises RuntimeError.
        for k in tuple(cache):
            yield k

    async def __getitem__(self, k, /):
        """Async generator over the items whose key equals ``k``.

        Cached items are served first; after that the source is advanced,
        parking items with other valid keys in their own deques.  Yields
        nothing if the validator rejects ``k``.
        """
        if not (is_valid := self.__validator)(k):
            return
        cache, source, key_of = self.__cache, self.__it, self.__key
        bucket = cache[k]  # defaultdict: creates the deque on first access
        take_cached = bucket.popleft
        while True:
            if bucket:
                yield take_cached()
                continue
            while True:
                try:
                    item = await anext(source)
                except StopAsyncIteration:
                    # Drop the now-useless empty deque, but only if the
                    # mapping still holds *this* deque: another generator for
                    # the same key may already have removed it (a plain
                    # ``del`` would raise KeyError) or a newer one may have
                    # taken its place.
                    if not bucket and cache.get(k) is bucket:
                        del cache[k]
                    return
                if (found := key_of(item)) == k:
                    yield item
                    break
                elif is_valid(found):
                    cache[found].append(item)
75del P