Source code for asyncutils.iterclasses

 1import asyncutils as A
 2from asyncutils.constants import _NO_DEFAULT
 3from asyncutils._internal import helpers as H, patch as P
 4from asyncutils._internal.submodules import iterclasses_all as __all__
 5from _collections import defaultdict, deque
 6from sys import maxsize as INF
@H.subscriptable
class achain:
    """Asynchronously iterate over several iterables, one after another.

    ``achain(a, b, c)`` chains its positional arguments;
    ``achain.from_iterable(its)`` takes a single iterable of iterables.
    """
    __slots__ = '__its',

    @classmethod
    def from_iterable(cls, it_of_its):
        """Create a chain from one iterable whose items are themselves iterables.

        Every sub-iterable is passed through ``A.iter_to_agen`` by way of
        ``A.amap``; the result is stored and walked by ``__aiter__``.
        """
        # Build the mapped sources first, then allocate the bare instance.
        sources = A.amap(A.iter_to_agen, it_of_its)
        chain = super().__new__(cls)
        chain.__its = sources
        return chain

    def __new__(cls, *its):
        """Create a chain over the positional iterables ``its``."""
        return cls.from_iterable(its)

    async def __aiter__(self):
        """Yield every item of every stored source, in order."""
        async for source in self.__its:  # ty: ignore[unresolved-attribute]
            async for item in source:
                yield item
@H.subscriptable
class apeekable(H.LoopMixinBase):
    """Async iterator with look-ahead, push-back and awaitable indexing.

    Items fetched ahead of time (by ``peek`` or ``await p[...]``) and items
    pushed back with ``prepend`` live in an internal deque; ``__anext__``
    hands those out before touching the underlying source again.
    """
    __slots__ = '__cache', '__it'

    def __init__(self, it=()):
        """Wrap ``it`` (passed through ``A.iter_to_agen``) with an empty buffer."""
        self.__it = A.iter_to_agen(it)
        self.__cache = deque()
        super().__init__()

    def __aiter__(self):
        """Return ``self``; the object is its own async iterator."""
        return self

    async def can_peek(self):
        """Return ``True`` if ``peek()`` succeeds, ``False`` on ``StopAsyncIteration``."""
        try:
            await self.peek()
        except StopAsyncIteration:
            return False
        return True

    async def peek(self, default=_NO_DEFAULT):
        """Return the next item without consuming it.

        If the buffer is empty, one item is fetched from the source and
        buffered. When the source is exhausted, ``default`` is returned if it
        was supplied; otherwise ``StopAsyncIteration`` propagates.
        """
        buffered = self.__cache
        if buffered:
            return buffered[0]
        try:
            buffered.append(await anext(self.__it))
        except StopAsyncIteration:
            if default is _NO_DEFAULT:
                raise
            return default
        return buffered[0]

    def prepend(self, /, *i):
        """Push ``i`` onto the front of the buffer so that ``i[0]`` comes out first."""
        # extendleft inserts one element at a time, so feed it the reversed items.
        self.__cache.extendleft(i[::-1])

    async def __anext__(self):
        """Return the next buffered item, or the next item of the source."""
        buffered = self.__cache
        if not buffered:
            return await anext(self.__it)
        return buffered.popleft()

    async def __getitem__(self, i, /, _=~INF):
        # Awaitable indexing: ``await p[n]`` / ``await p[a:b:c]``.
        # Whatever has to be read from the source to answer the request is
        # appended to the buffer, so nothing is consumed from the caller's view.
        # ``_`` only carries the constant ~INF (== -INF - 1), the default stop
        # for negative steps; the signature patch after this method presumably
        # hides it from the advertised signature.
        buffered = self.__cache
        push = buffered.append

        if not isinstance(i, slice):
            idx = i.__index__()
            if idx < 0:
                # Counting from the right requires the whole source.
                feed = A.iter_to_agen(self.__it)
            elif idx < (have := len(buffered)):
                # Already buffered: nothing to read.
                feed = A.empty_agen()
            else:
                feed = A.take(self.__it, idx - have + 1)
            async for item in feed:
                push(item)
            return buffered[idx]

        # Normalise the slice; conversions happen in step, start, stop order.
        step = 1 if i.step is None else int(i.step)
        if step == 0:
            raise ValueError('asyncutils.iterclasses.apeekable: slice step cannot be zero')
        if step > 0:
            start_default, stop_default = 0, INF
        else:
            start_default, stop_default = -1, _
        start = start_default if i.start is None else int(i.start)
        stop = stop_default if i.stop is None else int(i.stop)

        if start < 0 or stop < 0:
            # A negative bound is relative to the end: buffer everything.
            async for item in A.iter_to_agen(self.__it):
                push(item)
        else:
            # Otherwise buffer up to the right-most index the slice can touch.
            missing = min(max(start, stop) + 1, INF) - len(buffered)
            if missing >= 0:
                async for item in A.take(self.__it, missing):
                    push(item)
        return tuple(buffered)[start:stop:step]

    P.patch_method_signatures((__getitem__, 'idx, /'))
@H.subscriptable
class abucket:
    """Lazily split one iterable into per-key async child iterators.

    ``bucket[k]`` is an async generator over the items whose ``key(item)``
    equals ``k``. Items belonging to other keys that are met while searching
    are cached per key (in arrival order) so their own children can replay them.

    Parameters:
        it: the source iterable, passed through ``A.iter_to_agen``.
        key: callable mapping an item to its bucket key.
        validator: optional callable; keys for which it returns a falsy value
            are never cached and their children are empty. Defaults to
            accepting every key.
    """
    __slots__ = '__cache', '__it', '__key', '__validator'

    def __init__(self, it, key, validator=None):
        super().__init__()
        self.__it = A.iter_to_agen(it)
        self.__key = key
        self.__cache = defaultdict(deque)
        self.__validator = validator or (lambda _: True)

    async def contains(self, k, /):
        """Return whether at least one item with key ``k`` is still available.

        The item used for the check is put back at the *front* of the cached
        group, so a later ``bucket[k]`` still yields items in source order.
        """
        if not self.__validator(k):
            return False
        probe = self[k]
        try:
            item = await anext(probe)
        except StopAsyncIteration:
            return False
        finally:
            # The probe generator is single-use; close it instead of leaving
            # a suspended async generator behind.
            await probe.aclose()
        # appendleft (not append): the probed item was the oldest one of its
        # group, so it has to go back in front of anything still cached.
        self.__cache[k].appendleft(item)
        return True

    async def __aiter__(self):
        """Drain the source into the cache, then yield every cached key."""
        key, is_valid, cache = self.__key, self.__validator, self.__cache
        async for item in self.__it:
            if is_valid(k := key(item)):
                cache[k].append(item)
        # Iterate over a snapshot: consumers typically drain ``self[k]`` between
        # yields, which removes (or, for new keys, inserts) cache entries and
        # would otherwise raise "dictionary changed size during iteration".
        for k in tuple(cache):
            yield k

    async def __getitem__(self, k, /):
        """Async-generate the items whose key equals ``k``.

        Yields nothing when the validator rejects ``k``. Cached items are
        emitted first; after that the source is advanced, caching items that
        belong to other (valid) keys, until a match is found or the source ends.
        """
        if not (is_valid := self.__validator)(k):
            return
        cache, source, key = self.__cache, self.__it, self.__key
        pending = cache[k]
        while True:
            if pending:
                yield pending.popleft()
                continue
            while True:
                try:
                    item = await anext(source)
                except StopAsyncIteration:
                    if not pending:
                        # Drop the empty group. pop() rather than del: another
                        # child generator for the same key may already have
                        # removed it, and a KeyError must not leak out here.
                        cache.pop(k, None)
                    return
                if (found := key(item)) == k:
                    yield item
                    break
                if is_valid(found):
                    cache[found].append(item)
75del P