1# ty: ignore[unresolved-attribute]
2__lazy_modules__ = frozenset(('asyncio', 'functools'))
3from asyncutils.config import _randinst
4from asyncutils.constants import _NO_DEFAULT
5from asyncutils._internal import log, patch as P
6from asyncutils._internal.helpers import fullname, get_loop_and_set
7from asyncutils._internal.submodules import func_all as __all__
8import asyncio as I, asyncutils as A
9from collections import deque, namedtuple
10from functools import partial, update_wrapper, wraps
11from itertools import count, repeat
12from sys import audit
13from time import perf_counter
[docs]
def acompose(*F, wrap_last=True, _=I.iscoroutine):
    """Compose callables right-to-left into one coroutine function.

    The last callable in ``F`` is invoked with the call's arguments; each
    earlier callable then receives the previous result. Whenever ``_``
    (``asyncio.iscoroutine`` by default) reports true for an intermediate
    result, that result is awaited before it is passed on or returned.

    Args:
        *F: Callables to chain; ``F[-1]`` runs first, ``F[0]`` runs last.
        wrap_last: When true, copy the metadata of ``F[-1]`` onto the
            returned function with ``functools.update_wrapper``.
        _: Predicate deciding whether a result has to be awaited.

    Returns:
        A coroutine function performing the whole chain.
    """
    async def g(*a, **k):
        remaining = reversed(F)
        # The innermost callable is the only one that sees the raw arguments.
        result = next(remaining)(*a, **k)
        if _(result):
            result = await result
        for fn in remaining:
            result = fn(result)
            if _(result):
                result = await result
        return result

    if wrap_last:
        update_wrapper(g, F[-1])
    return g
[docs]
async def areduce(f, it, initial=_NO_DEFAULT, *, await_=True):
    """Fold ``it`` into a single value with the two-argument callable ``f``.

    ``it`` is iterated through ``A.iter_to_agen``. When no ``initial`` is
    given, the first item seeds the accumulator. If ``it`` yields nothing,
    ``initial`` is returned unchanged (the ``_NO_DEFAULT`` sentinel when none
    was supplied).

    Args:
        f: Called as ``f(accumulator, item)``.
        it: Source of items, passed to ``A.iter_to_agen``.
        initial: Optional starting accumulator.
        await_: When true the result of ``f`` is awaited; otherwise ``f`` is
            treated as a plain function.

    Returns:
        The final accumulator.
    """
    async for item in A.iter_to_agen(it):
        if initial is _NO_DEFAULT:
            initial = item
        elif await_:
            initial = await f(initial, item)
        else:
            initial = f(initial, item)
    return initial
[docs]
def star(f, /):
    """Adapt ``f`` so it takes one argument tuple and one keyword mapping.

    The returned coroutine function accepts up to two positional-only
    arguments, an iterable of positional arguments (default ``()``) and a
    mapping of keyword arguments (default ``None``, meaning none), and awaits
    ``f(*args, **kwargs)``. Metadata of ``f`` is copied with ``functools.wraps``.
    """
    @wraps(f)
    async def starred(a=(), k=None, /):
        kwargs = k or {}
        return await f(*a, **kwargs)

    return starred
[docs]
def unstar(f, /):
    """Adapt ``f``, which expects ``(args_tuple, kwargs_dict)``, to a normal call.

    The returned coroutine function gathers its positional and keyword
    arguments and awaits ``f(args, kwargs)``. Metadata of ``f`` is copied with
    ``functools.wraps``.
    """
    @wraps(f)
    async def unstarred(*a, **k):
        return await f(a, k)

    return unstarred
[docs]
def every(intvl, /, *, stop_when=None, count_f=True, verbose=False, stop_on_exc=True, wait_first=False, loop=None, max_iterations=None, timer=perf_counter, supplied_args=(), supplied_kwargs=None, default=_NO_DEFAULT, default_fname='<name unknown>', _='func.every: periodic coroutine %s reached the maximum of %d iterations'):
    """Build a decorator that re-runs a coroutine function every ``intvl`` seconds.

    Awaiting the decorated function starts a loop: ``f`` is awaited (with
    ``supplied_args`` / ``supplied_kwargs`` placed ahead of the call's own
    arguments), then the stop future is awaited for at most one interval.

    Args:
        intvl: Interval in seconds between iterations.
        stop_when: Future whose completion ends the loop; its result becomes
            the return value. When ``None`` one is created from ``loop`` on
            the first call. The future lives in the decorator's closure, so
            it is shared by every call of the decorated function.
        count_f: When true, the time ``f`` took (measured with ``timer``) is
            subtracted from the wait so iterations start ``intvl`` apart.
        verbose: Raise the level of the log messages emitted by the loop.
        stop_on_exc: When true, an exception from ``f`` or a cancelled stop
            future ends the loop, and exhausting ``max_iterations`` raises.
            When false, such events are logged and the loop carries on.
        wait_first: Sleep one interval before the first iteration.
        loop: Event loop used to create stop futures; defaults to
            ``get_loop_and_set()``.
        max_iterations: Upper bound on iterations, or ``None`` for no bound.
        timer: Clock used for ``count_f``.
        supplied_args: Positional arguments prepended to every call of ``f``.
        supplied_kwargs: Keyword arguments added to every call of ``f``.
        default: Value returned when ``max_iterations`` is exhausted without
            raising; ``A.RAISE`` forces ``A.MaxIterationsError`` instead.
        default_fname: Name used in log messages when ``f`` has no
            ``__qualname__``.
        _: Format string for the max-iterations message.

    Returns:
        A decorator producing the periodic coroutine function.

    Raises:
        A.Critical: (from the decorated function) when ``f`` raises one of
            ``A.CRITICAL``.
        A.MaxIterationsError: (from the decorated function) when
            ``max_iterations`` is exhausted and either ``stop_on_exc`` is true
            or ``default`` is ``A.RAISE``.
    """
    if loop is None:
        loop = get_loop_and_set()

    def dec(f, /):
        n = getattr(f, '__qualname__', default_fname)
        if stop_when and stop_when.done():
            log.warning('func.every: future to stop periodic coroutine %s is already done', n)

        async def g(*a, **k):
            nonlocal stop_when
            log.debug('func.every: periodic task started')
            q = default is _NO_DEFAULT
            if stop_when is None:
                stop_when = loop.create_future()
            if wait_first:
                await I.sleep(intvl)
            for i in count() if max_iterations is None else range(max_iterations):
                t = timer()
                try:
                    await f(*supplied_args, *a, **(supplied_kwargs or {}), **k)
                except A.CRITICAL:
                    raise A.Critical
                except:  # deliberately broad: any other failure is handled by policy below
                    if stop_on_exc:
                        if stop_when.done():
                            return stop_when.result()
                        break
                    (log.error if verbose else log.warning)('func.every: error in periodic coroutine %s on iteration %d', n, i, exc_info=True)
                try:
                    # wait_for() cancels whatever it is waiting on when the
                    # timeout expires. Without shield() the stop future would
                    # be cancelled after the very first interval, making the
                    # next iteration look like an external cancellation.
                    return await I.wait_for(I.shield(stop_when), intvl+t-timer() if count_f else intvl)
                except I.CancelledError:
                    if stop_on_exc:
                        break
                    (log.info if verbose else log.debug)('func.every: future to stop periodic coroutine %s was cancelled on iteration %d', n, i, exc_info=True)
                    # A cancelled future can never complete; start over with a fresh one.
                    stop_when = loop.create_future()
                except TimeoutError:
                    continue
            else:
                # Only reached when the iteration budget ran out (no break / return).
                T = n, max_iterations
                if stop_on_exc or default is A.RAISE:
                    raise A.MaxIterationsError(_%T)
                (log.info if verbose or q else log.debug)(_, *T)
                if not q:
                    return default

        return wraps(f)(g)

    return dec
[docs]
def everymethod(intvl, /, *, stop_when_getter=None, count_f=True, verbose=False, stop_on_exc=True, wait_first=False, loop=None, max_iterations=None, timer=perf_counter, supplied_args=(), supplied_kwargs=None, default=_NO_DEFAULT, default_fname='<name unknown>', _='func.everymethod: periodic coroutine %s reached the maximum of %d iterations'):
    """Build a decorator that re-runs a coroutine *method* every ``intvl`` seconds.

    Awaiting the decorated method starts a loop: ``f(self, ...)`` is awaited
    (with ``supplied_args`` / ``supplied_kwargs`` placed ahead of the call's
    own arguments), then the stop future is awaited for at most one interval.

    Args:
        intvl: Interval in seconds between iterations.
        stop_when_getter: Called with ``self`` on every invocation to obtain
            the future whose completion ends the loop (its result becomes the
            return value). When ``None`` a fresh future is created from
            ``loop`` for each invocation.
        count_f: When true, the time ``f`` took (measured with ``timer``) is
            subtracted from the wait so iterations start ``intvl`` apart.
        verbose: Raise the level of the log messages emitted by the loop.
        stop_on_exc: When true, an exception from ``f`` or a cancelled stop
            future ends the loop, and exhausting ``max_iterations`` raises.
            When false, such events are logged and the loop carries on.
        wait_first: Sleep one interval before the first iteration.
        loop: Event loop used to create stop futures; defaults to
            ``get_loop_and_set()``.
        max_iterations: Upper bound on iterations, or ``None`` for no bound.
        timer: Clock used for ``count_f``.
        supplied_args: Positional arguments inserted after ``self``.
        supplied_kwargs: Keyword arguments added to every call of ``f``.
        default: Value returned when ``max_iterations`` is exhausted without
            raising; ``A.RAISE`` forces ``A.MaxIterationsError`` instead.
        default_fname: Name used in log messages when ``f`` has no
            ``__qualname__``.
        _: Format string for the max-iterations message.

    Returns:
        A decorator producing the periodic coroutine method.

    Raises:
        A.Critical: (from the decorated method) when ``f`` raises one of
            ``A.CRITICAL``.
        A.MaxIterationsError: (from the decorated method) when
            ``max_iterations`` is exhausted and either ``stop_on_exc`` is true
            or ``default`` is ``A.RAISE``.
    """
    if loop is None:
        loop = get_loop_and_set()

    def dec(f, /):
        n = getattr(f, '__qualname__', default_fname)

        async def g(self, /, *a, **k):
            log.debug('func.everymethod: periodic task started')
            q = default is _NO_DEFAULT
            stop_when = loop.create_future() if stop_when_getter is None else stop_when_getter(self)
            if stop_when.done():
                log.warning('func.everymethod: future to stop periodic coroutine %s is already done', n)
            if wait_first:
                await I.sleep(intvl)
            for i in count() if max_iterations is None else range(max_iterations):
                t = timer()
                try:
                    await f(self, *supplied_args, *a, **(supplied_kwargs or {}), **k)
                except A.CRITICAL:
                    raise A.Critical
                except:  # deliberately broad: any other failure is handled by policy below
                    if stop_on_exc:
                        if stop_when.done():
                            return stop_when.result()
                        break
                    (log.error if verbose else log.warning)('func.everymethod: error in periodic coroutine %s on iteration %d', n, i, exc_info=True)
                try:
                    # wait_for() cancels whatever it is waiting on when the
                    # timeout expires. Without shield() the stop future would
                    # be cancelled after the very first interval, making the
                    # next iteration look like an external cancellation.
                    return await I.wait_for(I.shield(stop_when), intvl+t-timer() if count_f else intvl)
                except I.CancelledError:
                    if stop_on_exc:
                        break
                    (log.info if verbose else log.debug)('func.everymethod: future to stop periodic coroutine %s was cancelled on iteration %d', n, i, exc_info=True)
                    # A cancelled future can never complete; start over with a fresh one.
                    stop_when = loop.create_future()
                except TimeoutError:
                    continue
            else:
                # Only reached when the iteration budget ran out (no break / return).
                T = n, max_iterations
                if stop_on_exc or default is A.RAISE:
                    raise A.MaxIterationsError(_%T)
                (log.info if verbose or q else log.debug)(_, *T)
                if not q:
                    return default

        return wraps(f)(g)

    return dec
[docs]
def timer(f, /, *, precision=None, expected=Exception, should_log=True, timer=perf_counter, ns=False, _='nano', c='func.timer: function %s finished in %.*f %sseconds.', d='func.timer: received expected error from function %s after %.*f %sseconds: %s'):
    """Wrap coroutine function ``f`` so each call also reports how long it took.

    The wrapper returns ``(result, elapsed)``. If ``f`` raises an exception
    matching ``expected``, the wrapper returns ``(A.wrap_exc(error), elapsed)``
    instead of propagating it.

    Args:
        f: Coroutine function to time.
        precision: Digits after the decimal point in log messages. Defaults
            to the context's ``TIMER_DEFAULT_PRECISION`` minus 9 when ``ns``
            is true.
        expected: Exception class (or tuple) converted into a return value.
        should_log: Emit an info / warning record per call.
        timer: Clock; elapsed is the difference of two readings.
        ns: Label the unit as nanoseconds in log messages. The readings are
            not converted here, so presumably ``timer`` should then be a
            nanosecond clock (TODO confirm).
        _: Unit prefix used when ``ns`` is true.
        c: Log format for a successful call.
        d: Log format for an expected failure.

    Raises:
        A.Critical: When ``f`` raises one of ``A.CRITICAL``.
    """
    if precision is None:
        precision = A.getcontext().TIMER_DEFAULT_PRECISION-ns*9
    unit = _ if ns else ''

    @wraps(f)
    async def timed(*a, **k):
        began = timer()
        try:
            result = await f(*a, **k)
            elapsed = timer()-began
            if should_log:
                log.info(c, fullname(f), precision, elapsed, unit)
            return result, elapsed
        except A.CRITICAL:
            raise A.Critical
        except expected as err:
            elapsed = timer()-began
            if should_log:
                log.warning(d, fullname(f), precision, elapsed, unit, err, exc_info=True)
            return A.wrap_exc(err), elapsed

    return timed
[docs]
def retry(tries=None, delay=None, *, max_delay=None, backoff=None, jitter=None, exc=Exception, on_retry=(_ := lambda *_: None), on_success=_, random=_randinst.random):
    """Build a decorator that retries a coroutine function on failure.

    Up to ``tries`` attempts are made. The first ``tries - 1`` attempts catch
    ``exc``, call ``on_retry(attempt_index, error)`` and sleep before trying
    again; the last attempt lets any exception propagate. After any
    successful attempt, including the last one, ``on_success(attempt_index,
    last_delay)`` is called. Both hooks may return a coroutine, which is then
    awaited.

    The sleep after the ``c``-th failure is
    ``delay*b + c*delay*(1 + (random()*2 - 1)*jitter)`` clamped to
    ``[delay, max_delay]``, where ``b`` starts at 1 and is multiplied by
    ``backoff`` after every failure.

    Args:
        tries: Total number of attempts.
        delay: Base delay in seconds.
        max_delay: Upper clamp for a single sleep.
        backoff: Multiplier applied to the backoff factor per failure.
        jitter: Relative size of the random perturbation.
        exc: Exception class (or tuple) that triggers a retry.
        on_retry: Hook called after a failed attempt.
        on_success: Hook called after a successful attempt.
        random: Callable returning a float, used for the jitter term.

    Any of ``tries``, ``delay``, ``backoff``, ``max_delay`` and ``jitter``
    left as ``None`` is taken from the matching ``RETRY_DEFAULT_*`` attribute
    of ``A.getcontext()``.
    """
    c = A.getcontext()
    if tries is None:
        tries = c.RETRY_DEFAULT_TRIES
    if delay is None:
        delay = c.RETRY_DEFAULT_DELAY
    if backoff is None:
        backoff = c.RETRY_DEFAULT_BACKOFF
    if max_delay is None:
        max_delay = c.RETRY_DEFAULT_MAX_DELAY
    if jitter is None:
        jitter = c.RETRY_DEFAULT_JITTER

    def dec(f):
        @wraps(f)
        async def g(*a, **k):
            failures, last_delay, factor = 0, 0.0, 1
            for i in range(tries-1):
                try:
                    r = await f(*a, **k)
                except exc as e:
                    failures += 1
                    if I.iscoroutine(t := on_retry(i, e)):
                        await t
                    last_delay = min(max(delay*factor+(failures*delay)*(1+(random()*2-1)*jitter), delay), max_delay)
                    await I.sleep(last_delay)
                    factor *= backoff
                else:
                    if I.iscoroutine(t := on_success(i, last_delay)):
                        await t
                    return r
            # Final attempt: failures propagate to the caller, but a success
            # is reported to on_success just like on earlier attempts.
            r = await f(*a, **k)
            if I.iscoroutine(t := on_success(max(tries-1, 0), last_delay)):
                await t
            return r

        return g

    return dec
[docs]
def throttle(lim, timer=perf_counter):
    """Build a decorator that spaces calls at least ``1/lim`` seconds apart.

    Before each call the wrapper compares the current ``timer`` reading with
    the reading taken just before the previous call and sleeps for whatever
    remains of the ``1/lim`` gap. The "last call" reading is kept in this
    factory's closure, so every function decorated with the same returned
    decorator shares it.

    Args:
        lim: Maximum calls per second.
        timer: Clock used to measure the gap.
    """
    last = 0.0

    def dec(f, /):
        @wraps(f)
        async def throttled(*a, **k):
            nonlocal last
            pause = max(0, 1/lim-timer()+last)
            if pause:
                await I.sleep(pause)
            last = timer()
            return await f(*a, **k)

        return throttled

    return dec
[docs]
def debounce(wait):
    """Build a decorator that delays a call by ``wait`` seconds, restarting on re-entry.

    Each call cancels the pending delay task of the previous call (via
    ``A.safe_cancel``), starts a new ``asyncio.sleep(wait)`` task and awaits
    it before awaiting ``f``. Both awaits happen inside
    ``A.ignore_cancellation``.

    Note: decorating a function installs ``asyncio.eager_task_factory`` on the
    loop returned by ``get_loop_and_set()``.
    """
    def dec(f, /, l=None):
        loop = get_loop_and_set()
        loop.set_task_factory(I.eager_task_factory)
        spawn = loop.create_task
        nap = partial(I.sleep, wait)

        @wraps(f)
        async def debounced(*a, **k):
            nonlocal l
            if l:
                await A.safe_cancel(l)
            l = spawn(nap())
            with A.ignore_cancellation:
                await l
                return await f(*a, **k)

        return debounced

    return dec
[docs]
def iterf(n, /):
    """Build a decorator that applies a one-argument coroutine function ``n`` times.

    The wrapper feeds its argument through ``f`` repeatedly, awaiting each
    application, and returns the final value. With ``n == 0`` the argument is
    returned untouched.
    """
    def dec(f, /):
        @wraps(f)
        async def iterated(x, /):
            value = x
            for _unused in repeat(None, n):
                value = await f(value)
            return value

        return iterated

    return dec
[docs]
async def measure(f, /, *, timer=perf_counter):
    """Await ``f()`` and return ``(result, elapsed)``.

    ``elapsed`` is the difference between two ``timer`` readings taken right
    before and right after the call.
    """
    start = timer()
    result = await f()
    return result, timer()-start
[docs]
async def measure2(f, /, **k):
    """Await ``measure(f, **k)`` and return only the elapsed time."""
    outcome = await measure(f, **k)
    return outcome[1]
[docs]
async def benchmark(f, /, times=None, warmup=None, _f=namedtuple('BenchmarkResult', 'min max total avg iterations', module='asyncutils.func'), *, sequential=None):
    """Time repeated calls of the zero-argument coroutine function ``f``.

    ``warmup`` untimed calls are made first, then ``times`` calls are timed
    with ``measure2``. In sequential mode calls are awaited one after another;
    otherwise each batch is run through ``asyncio.gather``.

    Args:
        f: Zero-argument coroutine function.
        times: Number of timed calls.
        warmup: Number of untimed calls made beforehand.
        _f: Result tuple type.
        sequential: Run calls one at a time instead of concurrently.

    Any of ``times``, ``warmup`` and ``sequential`` left as ``None`` is taken
    from the matching ``BENCHMARK_DEFAULT_*`` attribute of ``A.getcontext()``.

    Returns:
        ``_f(min, max, total, avg, iterations)`` where ``avg`` is
        ``total / times`` and ``iterations`` is ``times + warmup``.
    """
    ctx = A.getcontext()
    timed_call = partial(measure2, f)
    if sequential is None:
        sequential = ctx.BENCHMARK_DEFAULT_SEQUENTIAL
    if times is None:
        times = ctx.BENCHMARK_DEFAULT_TIMES
    if warmup is None:
        warmup = ctx.BENCHMARK_DEFAULT_WARMUP

    if sequential:
        for _unused in repeat(None, warmup):
            await f()
    else:
        await I.gather(*(f() for _unused in repeat(None, warmup)))

    iterations = times+warmup
    audit('asyncutils.func.benchmark', fullname(f), iterations)

    if sequential:
        samples = [await timed_call() for _unused in repeat(None, times)]
    else:
        samples = await I.gather(*(timed_call() for _unused in repeat(None, times)))

    fastest = min(samples)
    slowest = max(samples)
    total = sum(samples)
    return _f(fastest, slowest, total, total/times, iterations)
# Register display signatures for the timing helpers. ``measure2`` forwards its
# keyword arguments to ``measure``, so it shares the same signature text; the
# original call listed ``measure`` twice and never registered ``measure2``.
# NOTE(review): the ``{}`` placeholder is presumably filled in by the patch
# helper; confirm against asyncutils._internal.patch.
P.patch_function_signatures((measure, _ := 'f, /, *, timer={}'), (measure2, _), (benchmark, 'f, /, times=None, warmup=None'))
[docs]
class RateLimited:
    """Callable wrapper allowing at most ``calls`` invocations per ``period``.

    Usable directly, ``RateLimited(f, calls, period)``, or as a decorator
    factory, ``RateLimited(calls, period)``, which returns a ``partial``
    waiting for the function.

    A deque of timestamps (read from ``timer``) records the calls admitted
    within the current window. When the window is full the call either raises
    ``A.RateLimitExceeded`` (``raise_=True``) or sleeps until the oldest
    recorded call leaves the window. Bookkeeping runs under the lock; the
    wrapped coroutine function is awaited after the lock is released.
    """
    __slots__ = '_call_times', '_calls', '_func', '_lock', '_period', '_raise', '_timer'

    def __new__(cls, f, /, calls, period=None, *, raise_=False, timer=perf_counter, lock_impl=None):
        """Create the limiter, or a ``partial`` of the class when ``period`` is omitted.

        Args:
            f: Coroutine function to limit (or ``calls`` in factory form).
            calls: Calls allowed per window (or ``period`` in factory form).
            period: Window length; ``None`` selects the factory form.
            raise_: Raise instead of waiting when the limit is hit.
            timer: Clock used for timestamps.
            lock_impl: Zero-argument lock factory; defaults to ``asyncio.Lock``.
        """
        if period is None:
            # Factory form: the first two positionals were really (calls, period).
            return partial(cls, calls=f, period=calls, raise_=raise_, timer=timer, lock_impl=lock_impl)
        audit('asyncutils.func.RateLimited', fullname(f), calls, period)
        lock_factory = I.Lock if lock_impl is None else lock_impl
        state = (f, float(period), deque(), lock_factory(), int(calls), raise_, timer)
        self = super().__new__(cls)
        self._func, self._period, self._call_times, self._lock, self._calls, self._raise, self._timer = state
        return self

    async def __call__(self, *a, **k):
        """Admit one call (waiting or raising if over the limit), then await the function.

        Raises:
            A.RateLimitExceeded: When the window is full and ``raise_`` is set.
        """
        stamps = self._call_times
        period, limit, func = self._period, self._calls, self._func
        async with self._lock:
            now = self._timer()
            cutoff = now-period
            # Discard timestamps that have left the window; stop at the first live one.
            while stamps:
                oldest = stamps.popleft()
                if oldest > cutoff:
                    stamps.appendleft(oldest)
                    break
            excess = len(stamps)-limit+1
            if excess > 0:
                if self._raise:
                    raise A.RateLimitExceeded(func, a, k, limit, period, excess)
                # Wait until the oldest live call expires, freeing its slot.
                await I.sleep(stamps.popleft()-cutoff)
                # Record when the call is actually admitted. Keeping the
                # pre-sleep reading would make this call look older than it
                # is and let later calls exceed the limit.
                now = self._timer()
            stamps.append(now)
        return await func(*a, **k)

    def __repr__(self):
        return f'{fullname(self)}({self._func!r}, {self._calls}, {self._period:.6f}, raise_={self._raise}, timer={self._timer!r}, lock_impl={fullname(self._lock)})'

    P.patch_classmethod_signatures((__new__, 'f, /, calls, period=None, *, raise_=False, timer={}, lock_impl=None'))
# Drop names that were only needed while the module body ran: the scratch
# name ``_``, the clock used solely as a parameter default, and the patch
# helper alias. Defaults and patch calls above have already been evaluated.
del _, perf_counter, P