Source code for asyncutils.func

  1# ty: ignore[unresolved-attribute]
  2__lazy_modules__ = frozenset(('asyncio', 'functools'))
  3from asyncutils.config import _randinst
  4from asyncutils.constants import _NO_DEFAULT
  5from asyncutils._internal import log, patch as P
  6from asyncutils._internal.helpers import fullname, get_loop_and_set
  7from asyncutils._internal.submodules import func_all as __all__
  8import asyncio as I, asyncutils as A
  9from collections import deque, namedtuple
 10from functools import partial, update_wrapper, wraps
 11from itertools import count, repeat
 12from sys import audit
 13from time import perf_counter
[docs] 14def acompose(*F, wrap_last=True, _=I.iscoroutine): 15 async def g(*a, **k): 16 if _(r := next(i := reversed(F))(*a, **k)): r = await r 17 for f in i: 18 if _(r := f(r)): r = await r 19 return r 20 if wrap_last: update_wrapper(g, F[-1]) 21 return g
[docs] 22async def areduce(f, it, initial=_NO_DEFAULT, *, await_=True): 23 async for _ in A.iter_to_agen(it): initial = _ if initial is _NO_DEFAULT else (await f(initial, _)) if await_ else f(initial, _) 24 return initial
[docs] 25def star(f, /): 26 async def g(a=(), k=None, /): return await f(*a, **(k or {})) 27 return wraps(f)(g)
[docs] 28def unstar(f, /): 29 async def g(*a, **k): return await f(a, k) 30 return wraps(f)(g)
[docs] 31def every(intvl, /, *, stop_when=None, count_f=True, verbose=False, stop_on_exc=True, wait_first=False, loop=None, max_iterations=None, timer=perf_counter, supplied_args=(), supplied_kwargs=None, default=_NO_DEFAULT, default_fname='<name unknown>', _='func.every: periodic coroutine %s reached the maximum of %d iterations'): 32 if loop is None: loop = get_loop_and_set() 33 def dec(f, /): 34 n = getattr(f, '__qualname__', default_fname) 35 if stop_when and stop_when.done(): log.warning('func.every: future to stop periodic coroutine %s is already done', n) 36 async def g(*a, **k): 37 log.debug('func.every: periodic task started'); q = default is _NO_DEFAULT; nonlocal stop_when 38 if stop_when is None: stop_when = loop.create_future() 39 if wait_first: await I.sleep(intvl) 40 for i in count() if max_iterations is None else range(max_iterations): 41 t = timer() 42 try: await f(*supplied_args, *a, **(supplied_kwargs or {}), **k) 43 except A.CRITICAL: raise A.Critical 44 except: 45 if stop_on_exc: 46 if stop_when.done(): return stop_when.result() 47 break 48 (log.error if verbose else log.warning)('func.every: error in periodic coroutine %s on iteration %d', n, i, exc_info=True) 49 try: return await I.wait_for(stop_when, intvl+t-timer() if count_f else intvl) 50 except I.CancelledError: 51 if stop_on_exc: break 52 (log.info if verbose else log.debug)('func.every: future to stop periodic coroutine %s was cancelled on iteration %d', n, i, exc_info=True); stop_when = loop.create_future() 53 except TimeoutError: continue 54 else: 55 T = n, max_iterations 56 if stop_on_exc or default is A.RAISE: raise A.MaxIterationsError(_%T) 57 (log.info if verbose or q else log.debug)(_, *T) 58 if not q: return default 59 return wraps(f)(g) 60 return dec
[docs] 61def everymethod(intvl, /, *, stop_when_getter=None, count_f=True, verbose=False, stop_on_exc=True, wait_first=False, loop=None, max_iterations=None, timer=perf_counter, supplied_args=(), supplied_kwargs=None, default=_NO_DEFAULT, default_fname='<name unknown>', _='func.everymethod: periodic coroutine %s reached the maximum of %d iterations'): 62 if loop is None: loop = get_loop_and_set() 63 def dec(f, /): 64 n = getattr(f, '__qualname__', default_fname) 65 async def g(self, /, *a, **k): 66 log.debug('func.everymethod: periodic task started'); q = default is _NO_DEFAULT 67 if (stop_when := loop.create_future() if stop_when_getter is None else stop_when_getter(self)).done(): log.warning('func.everymethod: future to stop periodic coroutine %s is already done', n) 68 if wait_first: await I.sleep(intvl) 69 for i in count() if max_iterations is None else range(max_iterations): 70 t = timer() 71 try: await f(self, *supplied_args, *a, **(supplied_kwargs or {}), **k) 72 except A.CRITICAL: raise A.Critical 73 except: 74 if stop_on_exc: 75 if stop_when.done(): return stop_when.result() 76 break 77 (log.error if verbose else log.warning)('func.everymethod: error in periodic coroutine %s on iteration %d', n, i, exc_info=True) 78 try: return await I.wait_for(stop_when, intvl+t-timer() if count_f else intvl) 79 except I.CancelledError: 80 if stop_on_exc: break 81 (log.info if verbose else log.debug)('func.everymethod: future to stop periodic coroutine %s was cancelled on iteration %d', n, i, exc_info=True); stop_when = loop.create_future() 82 except TimeoutError: continue 83 else: 84 T = n, max_iterations 85 if stop_on_exc or default is A.RAISE: raise A.MaxIterationsError(_%T) 86 (log.info if verbose or q else log.debug)(_, *T) 87 if not q: return default 88 return wraps(f)(g) 89 return dec
[docs] 90def timer(f, /, *, precision=None, expected=Exception, should_log=True, timer=perf_counter, ns=False, _='nano', c='func.timer: function %s finished in %.*f %sseconds.', d='func.timer: received expected error from function %s after %.*f %sseconds: %s'): 91 if precision is None: precision = A.getcontext().TIMER_DEFAULT_PRECISION-ns*9 92 async def g(*a, **k): 93 s = timer() 94 try: 95 r = await f(*a, **k); e = timer()-s 96 if should_log: log.info(c, fullname(f), precision, e, _ if ns else '') 97 return r, e 98 except A.CRITICAL: raise A.Critical 99 except expected as b: 100 e = timer()-s 101 if should_log: log.warning(d, fullname(f), precision, e, _ if ns else '', b, exc_info=True) 102 return A.wrap_exc(b), e 103 return wraps(f)(g)
[docs] 104def retry(tries=None, delay=None, *, max_delay=None, backoff=None, jitter=None, exc=Exception, on_retry=(_ := lambda *_: None), on_success=_, random=_randinst.random): 105 c = A.getcontext() 106 if tries is None: tries = c.RETRY_DEFAULT_TRIES 107 if delay is None: delay = c.RETRY_DEFAULT_DELAY 108 if backoff is None: backoff = c.RETRY_DEFAULT_BACKOFF 109 if max_delay is None: max_delay = c.RETRY_DEFAULT_MAX_DELAY 110 if jitter is None: jitter = c.RETRY_DEFAULT_JITTER 111 def dec(f): 112 async def g(*a, **k): 113 c, l, b = 0, 0.0, 1 114 for i in range(tries-1): 115 try: r = await f(*a, **k) 116 except exc as e: 117 c += 1 118 if I.iscoroutine(t := on_retry(i, e)): await t 119 await I.sleep(l := min(max(delay*b+(c*delay)*(1+(random()*2-1)*jitter), delay), max_delay)); b *= backoff 120 else: 121 if I.iscoroutine(t := on_success(i, l)): await t 122 return r 123 return await f(*a, **k) 124 return wraps(f)(g) 125 return dec
[docs] 126def throttle(lim, timer=perf_counter): 127 l = 0.0 128 def dec(f, /): 129 async def g(*a, **k): 130 nonlocal l 131 if w := max(0, 1/lim-timer()+l): await I.sleep(w) 132 l = timer(); return await f(*a, **k) 133 return wraps(f)(g) 134 return dec
[docs] 135def debounce(wait): 136 def dec(f, /, l=None): 137 (L := get_loop_and_set()).set_task_factory(I.eager_task_factory); g, h = L.create_task, I.sleep.__get__(wait) 138 async def j(*a, **k): 139 nonlocal l 140 if l: await A.safe_cancel(l) 141 l = g(h()) 142 with A.ignore_cancellation: await l; return await f(*a, **k) 143 return wraps(f)(j) 144 return dec
[docs] 145def iterf(n, /): 146 def dec(f, /): 147 async def g(x, /): 148 for _ in repeat(None, n): x = await f(x) 149 return x 150 return wraps(f)(g) 151 return dec
[docs] 152async def measure(f, /, *, timer=perf_counter): s = timer(); return await f(), timer()-s
[docs] 153async def measure2(f, /, **k): return (await measure(f, **k))[1]
[docs] 154async def benchmark(f, /, times=None, warmup=None, _f=namedtuple('BenchmarkResult', 'min max total avg iterations', module='asyncutils.func'), *, sequential=None): 155 c, g = A.getcontext(), measure2.__get__(f) 156 if sequential is None: sequential = c.BENCHMARK_DEFAULT_SEQUENTIAL 157 if times is None: times = c.BENCHMARK_DEFAULT_TIMES 158 if warmup is None: warmup = c.BENCHMARK_DEFAULT_WARMUP 159 if sequential: 160 for _ in repeat(None, warmup): await f() 161 else: await I.gather(*(f() for _ in repeat(None, warmup))) 162 audit('asyncutils.func.benchmark', fullname(f), T := times+warmup); return _f(min(t := [await g() for _ in repeat(None, times)] if sequential else await I.gather(*(g() for _ in repeat(None, times)))), max(t), S := sum(t), S/times, T)
163P.patch_function_signatures((measure, _ := 'f, /, *, timer={}'), (measure, _), (benchmark, 'f, /, times=None, warmup=None'))
[docs] 164class RateLimited: 165 __slots__ = '_call_times', '_calls', '_func', '_lock', '_period', '_raise', '_timer' 166 def __new__(cls, f, /, calls, period=None, *, raise_=False, timer=perf_counter, lock_impl=None): 167 if period is None: return partial(cls, calls=f, period=calls, raise_=raise_, timer=timer, lock_impl=lock_impl) 168 audit('asyncutils.func.RateLimited', fullname(f), calls, period); (_ := super().__new__(cls))._func, _._period, _._call_times, _._lock, _._calls, _._raise, _._timer = f, float(period), deque(), (I.Lock if lock_impl is None else lock_impl)(), int(calls), raise_, timer; return _
[docs] 169 async def __call__(self, *a, **k): 170 p, m, P, C, f = (T := self._call_times).popleft, T.appendleft, self._period, self._calls, self._func 171 async with self._lock: 172 d = (n := self._timer())-P 173 while T: 174 if (x := p()) > d: m(x); break 175 if (l := len(T)-self._calls+1) > 0: 176 if self._raise: raise A.RateLimitExceeded(f, a, k, C, P, l) 177 await I.sleep(p()-d) 178 T.append(n) 179 return await f(*a, **k)
180 def __repr__(self): return f'{fullname(self)}({self._func!r}, {self._calls}, {self._period:.6f}, raise_={self._raise}, timer={self._timer!r}, lock_impl={fullname(self._lock)})' 181 P.patch_classmethod_signatures((__new__, 'f, /, calls, period=None, *, raise_=False, timer={}, lock_impl=None'))
182del _, perf_counter, P