Source code for asyncutils.buckets

 1__lazy_modules__ = frozenset(('asyncio',))
 2from asyncutils import AsyncContextMixin, getcontext
 3from asyncutils._internal.helpers import LoopMixinBase, fullname
 4from asyncutils._internal.submodules import buckets_all as __all__
 5from asyncio import Lock, sleep
 6from sys import audit
 7from time import monotonic
[docs] 8class TokenBucket: 9 __slots__ = '_capacity', '_last_update', '_lock', '_rate', '_timer', '_tokens' 10 def __init__(self, rate, capacity, timer=monotonic): audit(fullname(self), rate, capacity); self._capacity = self._tokens = capacity; self._rate, self._lock, self._last_update, self._timer = rate, Lock(), timer(), timer
[docs] 11 async def consume(self, tokens=None): 12 if tokens is None: tokens = float(getcontext().TOKEN_BUCKET_DEFAULT_CONSUME_TOKENS) 13 async with self._lock: 14 e, self._last_update = (n := self._timer())-self._last_update, n; self._tokens = d = min(self._capacity, self._tokens+e*self._rate) 15 if (d := d-tokens) >= 0: self._tokens = d 16 else: await sleep(-d/self._rate); self._tokens = 0
17 @property 18 def capacity(self): return self._capacity
class LeakyBucket(AsyncContextMixin, LoopMixinBase):
    """Leaky-bucket limiter with a self-adjusting leak-rate multiplier.

    ``_tokens`` is the current fill level. It drains at ``leak * factor`` per
    unit of ``timer`` time and requests are admitted only while the level
    stays at or below ``capacity``. Entering the context starts a background
    drain task; leaving it cancels that task.

    NOTE(review): the level is initialised to ``capacity`` (a full bucket), so
    nothing is admitted until some of it has leaked -- confirm this is intended.
    """

    __slots__ = '_capacity', '_drainer', '_efs', '_factor', '_last', '_leak', '_max_factor', '_min_factor', '_timer', '_tokens'

    def __init__(self, capacity, leak, min_factor=None, max_factor=None, external_factor_settable=None, timer=monotonic):
        """Create the bucket.

        Args:
            capacity: Maximum fill level (also the initial level).
            leak: Base drain rate per unit of ``timer`` time.
            min_factor: Lower bound for the leak multiplier; ``None`` uses
                ``LEAKY_BUCKET_DEFAULT_MIN_FACTOR`` from the current context.
            max_factor: Upper bound for the leak multiplier; ``None`` uses
                ``LEAKY_BUCKET_DEFAULT_MAX_FACTOR`` from the current context.
            external_factor_settable: Whether ``factor`` may be assigned from
                outside; ``None`` uses
                ``LEAKY_BUCKET_DEFAULT_EXT_CAN_SET_FACTOR``.
            timer: Zero-argument callable returning the current time.
        """
        audit(fullname(self), capacity, leak)
        C = getcontext()
        self._leak = leak
        self._last = timer()
        self._drainer = None
        self._factor = 1.0
        self._min_factor = C.LEAKY_BUCKET_DEFAULT_MIN_FACTOR if min_factor is None else min_factor
        self._max_factor = C.LEAKY_BUCKET_DEFAULT_MAX_FACTOR if max_factor is None else max_factor
        self._efs = C.LEAKY_BUCKET_DEFAULT_EXT_CAN_SET_FACTOR if external_factor_settable is None else external_factor_settable
        self._timer = timer
        self._capacity = self._tokens = capacity

    def _adjust_from_params(self, a, b, c, d, /):
        """Recompute the leak multiplier from the current fill ratio.

        With ``r = tokens / capacity``: if ``r <= a`` the factor is multiplied
        by ``b`` (capped at ``max_factor``); if ``r >= c`` it is multiplied by
        ``d`` (floored at ``min_factor``); otherwise a factor below 1 moves
        5% back toward 1 and any other factor is left alone.
        """
        current = self._factor
        ratio = self._tokens/self._capacity
        if ratio <= a:
            proposed = min(current*b, self._max_factor)
        elif ratio >= c:
            proposed = max(current*d, self._min_factor)
        elif current < 1:
            proposed = min(current*1.05, 1)
        else:
            proposed = current
        # Only commit changes larger than 1% of the current factor.
        if current < abs(proposed - current)*100:
            self._factor = proposed

    def _add_tokens(self, amount):
        """Add ``amount`` to the level if it fits; return whether it did."""
        level = self._tokens + amount
        if level <= self._capacity:
            self._tokens = level
            return True
        return False

    def _set_tokens(self):
        """Drain the level for the time elapsed since the last update."""
        now = self._timer()
        elapsed, self._last = now - self._last, now
        self._tokens = max(0, self._tokens - elapsed*self._leak*self._factor)

    async def _drain(self):
        """Background loop: drain, re-tune the factor, then sleep.

        The sleep is the time needed to leak one unit at the current
        effective rate, capped at one second.
        """
        while True:
            self._set_tokens()
            self._adjust()
            await sleep(min(1/(self._leak*self._factor), 1))

    async def acquire(self, amount=None):
        """Try to admit ``amount`` immediately, without waiting.

        Args:
            amount: Amount to add; ``None`` uses
                ``LEAKY_BUCKET_DEFAULT_ACQUIRE_TOKENS`` from the context.

        Returns:
            ``True`` if the amount fit in the bucket, ``False`` otherwise.
        """
        self._set_tokens()
        return self._add_tokens(getcontext().LEAKY_BUCKET_DEFAULT_ACQUIRE_TOKENS if amount is None else amount)

    async def wait_for_tokens(self, amount=None):
        """Wait until ``amount`` fits in the bucket, then admit it.

        Args:
            amount: Amount to add; ``None`` uses
                ``LEAKY_BUCKET_DEFAULT_WAIT_FOR_TOKENS_TOKENS`` from the
                context.

        Returns:
            The sum of the sleep durations requested while waiting (``0.0``
            when the amount was admitted immediately).

        Raises:
            ValueError: If ``amount`` exceeds the capacity. The level never
                drops below zero, so such a request could never be admitted
                and the loop would otherwise spin forever.
        """
        c = getcontext()
        if amount is None:
            amount = c.LEAKY_BUCKET_DEFAULT_WAIT_FOR_TOKENS_TOKENS
        if amount > self._capacity:
            raise ValueError(f'{fullname(self)}: amount {amount!r} exceeds capacity {self._capacity!r}; cannot wait for tokens')
        waited, tick = 0.0, c.LEAKY_BUCKET_WAIT_FOR_TOKENS_TICK
        while True:
            self._set_tokens()
            self._adjust()
            if self._add_tokens(amount):
                return waited
            # Time for the overflow to leak at the current rate, capped at
            # one tick so the state is re-evaluated regularly.
            pause = min(tick, (amount - self._capacity + self._tokens)/(self._leak*self._factor))
            await sleep(pause)
            waited += pause

    @property
    def factor(self):
        """Current multiplier applied to the base leak rate."""
        return self._factor

    @factor.setter
    def factor(self, value, /):
        # Assignment is allowed only when external setting was enabled; the
        # value is clamped to [min_factor, max_factor].
        if self._efs:
            self._factor = max(self._min_factor, min(self._max_factor, value))
        else:
            raise ValueError(f'{fullname(self)}: external_factor_settable=True not passed; cannot set factor')

    @factor.deleter
    def factor(self):
        # Deleting resets the multiplier to 1; the settable flag is not checked.
        self._factor = 1

    def _adjust(self):
        """Apply the first adjustment-map entry whose bound the capacity meets.

        Iterates ``getcontext().LEAKY_BUCKET_ADJMAP`` as ``(bound, params)``
        pairs and uses the first one with ``capacity >= bound``; does nothing
        if none matches.
        """
        capacity = self._capacity
        for bound, params in getcontext().LEAKY_BUCKET_ADJMAP:
            if capacity >= bound:
                return self._adjust_from_params(*params)

    def __enter__(self):
        """Start the background drain task if it is not running; return self."""
        if self._drainer is None:
            # ``make`` is not defined in this class; presumably inherited from
            # LoopMixinBase and schedules the coroutine -- confirm.
            self._drainer = self.make(self._drain())
        return self

    def __exit__(self, /, *_):
        """Cancel the background drain task, if any."""
        if drainer := self._drainer:
            drainer.cancel()
            self._drainer = None