1__lazy_modules__ = frozenset(('asyncio',))
2from asyncutils import AsyncContextMixin, getcontext
3from asyncutils._internal.helpers import LoopMixinBase, fullname
4from asyncutils._internal.submodules import buckets_all as __all__
5from asyncio import Lock, sleep
6from sys import audit
7from time import monotonic
[docs]
class TokenBucket:
    """Token-bucket rate limiter for coroutines.

    The bucket starts with ``capacity`` tokens and is refilled at ``rate``
    tokens per unit of ``timer`` time, never holding more than ``capacity``.
    ``consume`` removes tokens and sleeps when the bucket cannot cover the
    request.
    """

    __slots__ = '_capacity', '_last_update', '_lock', '_rate', '_timer', '_tokens'

    def __init__(self, rate, capacity, timer=monotonic):
        """Create a full bucket.

        Args:
            rate: Tokens added per unit of time reported by ``timer``. It is
                also used as a divisor when computing how long to sleep.
            capacity: Maximum and initial number of tokens.
            timer: Zero-argument callable returning the current time;
                defaults to ``time.monotonic``.
        """
        # Audit event; its name is whatever ``fullname(self)`` returns.
        audit(fullname(self), rate, capacity)
        self._capacity = self._tokens = capacity
        self._rate = rate
        self._lock = Lock()
        self._last_update = timer()
        self._timer = timer

    async def consume(self, tokens=None):
        """Remove ``tokens`` from the bucket, sleeping until they are covered.

        The internal lock is held for the whole call, including the sleep, so
        concurrent callers are handled one at a time.

        Args:
            tokens: Amount to consume. ``None`` uses the context value
                ``TOKEN_BUCKET_DEFAULT_CONSUME_TOKENS`` converted to ``float``.
        """
        if tokens is None:
            tokens = float(getcontext().TOKEN_BUCKET_DEFAULT_CONSUME_TOKENS)
        async with self._lock:
            # Refill for the time elapsed since the previous update.
            now = self._timer()
            elapsed = now - self._last_update
            self._last_update = now
            available = min(self._capacity, self._tokens + elapsed*self._rate)
            self._tokens = available

            remaining = available - tokens
            if remaining >= 0:
                self._tokens = remaining
                return

            # Not enough tokens: wait for the shortfall to be refilled.
            await sleep(-remaining/self._rate)
            self._tokens = 0
            # The refill that accrued during the sleep has just been spent on
            # this request.  Move the reference time forward so the next call
            # does not credit that same interval a second time.
            self._last_update = self._timer()

    @property
    def capacity(self):
        """Maximum number of tokens the bucket can hold (read-only)."""
        return self._capacity
[docs]
class LeakyBucket(AsyncContextMixin, LoopMixinBase):
    """Leaky bucket whose leak rate is scaled by an adjustable factor.

    ``_tokens`` is the current fill level.  It starts at ``capacity``, falls
    at ``leak * factor`` per unit of ``timer`` time (never below 0) and rises
    when an amount is admitted.  An amount is admitted only if the resulting
    level does not exceed ``capacity``.

    ``__enter__`` hands the ``_drain()`` coroutine to ``self.make`` and
    ``__exit__`` cancels what ``make`` returned.
    NOTE(review): ``make`` comes from a base class not shown here; presumably
    it schedules the coroutine as a task -- confirm.
    """

    __slots__ = '_capacity', '_drainer', '_efs', '_factor', '_last', '_leak', '_max_factor', '_min_factor', '_timer', '_tokens'

    def __init__(self, capacity, leak, min_factor=None, max_factor=None, external_factor_settable=None, timer=monotonic):
        """Create a bucket whose level starts at ``capacity``.

        Args:
            capacity: Upper bound for the fill level.
            leak: Base leak rate per unit of ``timer`` time.
            min_factor: Lower bound for the factor; ``None`` uses the context
                value ``LEAKY_BUCKET_DEFAULT_MIN_FACTOR``.
            max_factor: Upper bound for the factor; ``None`` uses the context
                value ``LEAKY_BUCKET_DEFAULT_MAX_FACTOR``.
            external_factor_settable: Whether assigning to ``factor`` is
                allowed; ``None`` uses the context value
                ``LEAKY_BUCKET_DEFAULT_EXT_CAN_SET_FACTOR``.
            timer: Zero-argument callable returning the current time;
                defaults to ``time.monotonic``.
        """
        # Audit event; its name is whatever ``fullname(self)`` returns.
        audit(fullname(self), capacity, leak)
        ctx = getcontext()
        self._leak = leak
        self._last = timer()
        self._drainer = None
        self._factor = 1.0
        self._min_factor = ctx.LEAKY_BUCKET_DEFAULT_MIN_FACTOR if min_factor is None else min_factor
        self._max_factor = ctx.LEAKY_BUCKET_DEFAULT_MAX_FACTOR if max_factor is None else max_factor
        self._efs = ctx.LEAKY_BUCKET_DEFAULT_EXT_CAN_SET_FACTOR if external_factor_settable is None else external_factor_settable
        self._timer = timer
        self._capacity = self._tokens = capacity

    def _adjust_from_params(self, a, b, c, d, /):
        """Recompute the factor from the current fill ratio.

        With ``ratio = tokens / capacity``:

        * ``ratio <= a``: candidate is ``factor * b``, capped at ``max_factor``;
        * ``ratio >= c``: candidate is ``factor * d``, floored at ``min_factor``;
        * otherwise a factor below 1 moves towards 1 in 5% steps, and any
          other factor is kept.

        The candidate replaces the factor only when it differs from the
        current factor by more than 1% of the current factor.
        """
        current = self._factor
        ratio = self._tokens/self._capacity
        if ratio <= a:
            candidate = min(current*b, self._max_factor)
        elif ratio >= c:
            candidate = max(current*d, self._min_factor)
        elif current < 1:
            candidate = min(current*1.05, 1)
        else:
            candidate = current
        if current < abs(candidate - current)*100:
            self._factor = candidate

    def _add_tokens(self, amount):
        """Raise the level by ``amount`` if it stays within capacity.

        Returns:
            ``True`` if the amount was added, ``False`` if adding it would
            exceed ``capacity`` (the level is then left unchanged).
        """
        level = self._tokens + amount
        if level <= self._capacity:
            self._tokens = level
            return True
        return False

    def _set_tokens(self):
        """Apply the leak for the time elapsed since the last update."""
        now = self._timer()
        elapsed = now - self._last
        self._last = now
        # The level never drops below zero.
        self._tokens = max(0, self._tokens - elapsed*self._leak*self._factor)

    async def _drain(self):
        """Leak and re-adjust forever, sleeping at most one second per round."""
        while True:
            self._set_tokens()
            self._adjust()
            await sleep(min(1/(self._leak*self._factor), 1))

    async def acquire(self, amount=None):
        """Try once to admit ``amount`` without waiting.

        Args:
            amount: Amount to add; ``None`` uses the context value
                ``LEAKY_BUCKET_DEFAULT_ACQUIRE_TOKENS``.

        Returns:
            ``True`` if the amount was admitted, ``False`` otherwise.
        """
        self._set_tokens()
        return self._add_tokens(getcontext().LEAKY_BUCKET_DEFAULT_ACQUIRE_TOKENS if amount is None else amount)

    async def wait_for_tokens(self, amount=None):
        """Wait until ``amount`` can be admitted, then admit it.

        Args:
            amount: Amount to add; ``None`` uses the context value
                ``LEAKY_BUCKET_DEFAULT_WAIT_FOR_TOKENS_TOKENS``.

        Returns:
            The sum of the sleep durations that were requested while waiting
            (``0.0`` if the amount was admitted immediately).

        Raises:
            ValueError: If ``amount`` is greater than the capacity.
        """
        ctx = getcontext()
        if amount is None:
            amount = ctx.LEAKY_BUCKET_DEFAULT_WAIT_FOR_TOKENS_TOKENS
        waited, tick = 0.0, ctx.LEAKY_BUCKET_WAIT_FOR_TOKENS_TICK
        # The level never drops below zero, so an amount larger than the
        # capacity can never fit; fail fast instead of sleeping forever.
        if amount > self._capacity:
            raise ValueError(f'{fullname(self)}: amount {amount!r} exceeds capacity {self._capacity!r}; it can never be admitted')
        while True:
            self._set_tokens()
            self._adjust()
            if self._add_tokens(amount):
                return waited
            # Sleep until enough has leaked at the current rate, but never
            # longer than one tick so the factor is re-evaluated regularly.
            delay = min(tick, (amount - self._capacity + self._tokens)/(self._leak*self._factor))
            await sleep(delay)
            waited += delay

    @property
    def factor(self):
        """Current multiplier applied to the leak rate."""
        return self._factor

    @factor.setter
    def factor(self, value, /):
        # Assignment is clamped to [min_factor, max_factor] and is allowed
        # only when external setting was enabled at construction.
        if self._efs:
            self._factor = max(self._min_factor, min(self._max_factor, value))
        else:
            raise ValueError(f'{fullname(self)}: external_factor_settable=True not passed; cannot set factor')

    @factor.deleter
    def factor(self):
        # Deleting resets the factor to 1; no permission check is applied.
        self._factor = 1

    def _adjust(self):
        """Adjust the factor using the first matching ``LEAKY_BUCKET_ADJMAP`` entry.

        Entries are ``(bound, params)`` pairs read from the context; the first
        one whose ``bound`` is not greater than the capacity supplies the
        arguments for ``_adjust_from_params``.  Nothing happens if no entry
        matches.
        """
        capacity = self._capacity
        for bound, params in getcontext().LEAKY_BUCKET_ADJMAP:
            if capacity >= bound:
                return self._adjust_from_params(*params)

    def __enter__(self):
        """Start the drain loop if it is not already running; return ``self``."""
        if self._drainer is None:
            self._drainer = self.make(self._drain())
        return self

    def __exit__(self, /, *_):
        """Cancel the drain loop, if any, and forget it."""
        if drainer := self._drainer:
            drainer.cancel()
            self._drainer = None