Source code for asyncutils.pools

  1import asyncutils as A, asyncio as I
  2from asyncutils._internal.compat import PriorityQueue
  3from asyncutils._internal.helpers import LoopMixinBase, fullname, subscriptable
  4from asyncutils._internal.submodules import pools_all as __all__
  5from _functools import partial
  6from itertools import count, repeat
  7from threading import Thread, Lock as TLock
  8from time import monotonic
class AdvancedPool(A.LoopContextMixin):
    """Thread-backed worker pool fed from a priority queue.

    Work items are queued as ``(priority, tiebreak, payload)`` where the payload
    is ``(func, args, kwargs, future)``; a payload of ``None`` is a sentinel that
    tells one worker thread to exit.  Worker threads run the callables and
    resolve the associated futures.

    NOTE(review): ``make_fut``, ``make`` and ``loop`` are inherited and not
    visible here; the futures are assumed to belong to ``self.loop``.
    """

    # '_tlock' and '_futures' are assigned in __init__ and therefore must be
    # declared here as well (they were missing).
    __slots__ = (
        '__cnt', '_current', '_futures', '_kill_at_exit', '_max', '_min',
        '_pending', '_queue', '_scaling', '_sdevt', '_shutdown', '_start',
        '_tlock', '_workers', 'completed',
    )

    @property
    def _tiebreak(self):
        """Next value of a monotonically increasing counter (keeps queue entries orderable)."""
        return next(self.__cnt)

    def __init__(self, max_workers=None, min_workers=None, qsize=0, scaling=True, kill_at_exit=False):
        """Configure the pool.

        ``max_workers`` / ``min_workers`` fall back to the context defaults when
        ``None``; ``qsize`` is passed to the priority queue; ``scaling`` enables
        automatic resizing on submit; ``kill_at_exit`` is forwarded to
        :meth:`shutdown` by :meth:`__cleanup__`.
        """
        super().__init__()
        ctx = A.getcontext()
        self.__cnt = count()
        self._tlock = TLock()
        self._max = ctx.ADVANCED_POOL_DEFAULT_MAX_WORKERS if max_workers is None else max_workers
        self._scaling = scaling
        self._queue = PriorityQueue(qsize)
        self._workers = set()   # Thread objects
        self._futures = set()   # one exit future per worker thread
        self._pending = 0
        self._sdevt = I.Event()
        self._shutdown = False
        self._start = monotonic()
        self.completed = 0
        self._kill_at_exit = kill_at_exit
        # NOTE(review): no threads are started here even though _current is
        # initialised to the minimum; threads are only created by _scale_to().
        self._current = self._min = ctx.ADVANCED_POOL_DEFAULT_MIN_WORKERS if min_workers is None else min_workers

    def __repr__(self):
        return f'{fullname(self)}({self._max}, {self._min}, {self._queue.maxsize}, {self._scaling}, {self._kill_at_exit})'

    async def _threadsafe_get(self):
        """Fetch the next queue entry while holding the thread lock."""
        # NOTE(review): the thread lock stays held while this coroutine is
        # suspended on an empty queue; anything else taking _tlock meanwhile
        # will block. Left unchanged - confirm against A.sync_await semantics.
        with self._tlock:
            return await self._queue.get()

    def _threadsafe_task_done(self):
        """Mark one queue entry as processed while holding the thread lock."""
        with self._tlock:
            self._queue.task_done()

    @staticmethod
    def _resolve(fut, setter, value):
        """Apply ``setter(value)`` unless ``fut`` is already done (e.g. cancelled)."""
        if not fut.done():
            setter(value)

    def _worker_loop(self, _):
        """Thread target: run queued callables until a sentinel or shutdown.

        ``_`` is this worker's exit future; it receives the number of handled
        tasks, or the exception that terminated the loop.
        """
        handled = 0
        loop = self.loop
        post = loop.call_soon_threadsafe
        get = self._threadsafe_get
        try:  # noqa: PLW0717
            while not self._shutdown:
                payload = A.sync_await(get(), loop=loop)[2]
                if payload is None:  # sentinel: this worker should exit
                    self._threadsafe_task_done()
                    break
                func, args, kwargs, fut = payload
                try:
                    result = func(*args, **kwargs)
                except BaseException as e:  # noqa: BLE001
                    # Futures are resolved on the loop thread, exactly like the
                    # exit future below, instead of directly from this thread.
                    post(self._resolve, fut, fut.set_exception, e)
                else:
                    post(self._resolve, fut, fut.set_result, result)
                finally:
                    self._threadsafe_task_done()
                    handled += 1
                    with self._tlock:
                        self.completed += 1
                        self._pending -= 1
        except BaseException as e:  # noqa: BLE001
            post(_.set_exception, A.Critical(e) if isinstance(e, A.CRITICAL) else e)
        else:
            post(_.set_result, handled)

    def _scale_to(self, new):
        """Grow by starting threads, or shrink by queueing one sentinel per surplus worker."""
        delta = new - self._current
        if delta > 0:
            for _ in range(delta):
                exit_future = self.make_fut()
                thread = Thread(target=self._worker_loop, args=(exit_future,))
                thread.start()
                self._workers.add(thread)
                self._futures.add(exit_future)
        elif delta < 0:
            for _ in range(-delta):
                self._put_nowait_priority(0, None)
        self._current = new

    def _put_nowait_priority(self, priority, item):
        """Enqueue ``item`` without waiting, tagged with ``priority`` and a tiebreak."""
        self._queue.put_nowait((priority, self._tiebreak, item))

    def _set_adjuster(self):
        """Rescale the pool from the pending-tasks-per-worker ratio, if scaling is enabled."""
        if not self._scaling:
            return
        ctx = A.getcontext()
        current = self._current
        load = self._pending / (current or 1)  # avoid dividing by zero workers
        if load > ctx.ADVANCED_POOL_THRESHOLD_HI and current < self._max:
            # grow by half the current size, at least one, capped at the maximum
            self._scale_to(min(self._max, current + max(1, current >> 1)))
        elif load < ctx.ADVANCED_POOL_THRESHOLD_LO and current > self._min:
            self._scale_to(max(self._min, current - max(1, int(current * ctx.ADVANCED_POOL_FACTOR))))

    async def wait_for_shutdown(self):
        """Wait until :meth:`shutdown` has set the shutdown event."""
        return await self._sdevt.wait()

    def raise_for_shutdown(self):
        """Raise ``A.PoolShutDown`` if the pool has been shut down."""
        if self._shutdown:
            raise A.PoolShutDown(f'{fullname(self)} is shutting down')

    def submit_nowait(self, f, *a, _priority_=0, **k):
        """Queue ``f(*a, **k)`` without waiting and return its future.

        Raises ``A.PoolShutDown`` after shutdown and ``A.PoolFull`` when the
        queue is full.
        """
        self.raise_for_shutdown()
        if self.full:
            raise A.PoolFull('asyncutils.pools.AdvancedPool.submit_nowait: task queue full')
        fut = self.make_fut()
        with self._tlock:
            self._pending += 1
        try:
            self._put_nowait_priority(_priority_, (f, a, k, fut))
        except BaseException:
            # the task never reached the queue: undo the bookkeeping
            with self._tlock:
                self._pending -= 1
            raise
        self._set_adjuster()
        return fut

    async def _kill_helper(self):
        """Empty the queue, cancelling the future of every task still waiting in it."""
        queue = self._queue
        with self._tlock, A.ignore_qempty:
            while True:  # left when get_nowait() raises and A.ignore_qempty handles it
                payload = queue.get_nowait()[2]
                if payload is not None:
                    await A.safe_cancel(payload[-1])
                    self._pending -= 1  # dropped task no longer counts as pending (lock is held)
                queue.task_done()

    async def complete(self, *a, **k):
        """Submit a task and wait for its result."""
        return await (await self.submit(*a, **k))

    async def submit(self, f, *a, _priority_=0, **k):
        """Queue ``f(*a, **k)``, waiting for queue space if needed, and return its future.

        Raises ``A.PoolShutDown`` after shutdown.
        """
        self.raise_for_shutdown()
        fut = self.make_fut()
        with self._tlock:
            self._pending += 1
        try:
            await self._queue.put((_priority_, self._tiebreak, (f, a, k, fut)))
        except BaseException:
            # cancelled or failed before the task was queued: undo the bookkeeping
            with self._tlock:
                self._pending -= 1
            raise
        self._set_adjuster()
        return fut

    async def shutdown(self, cancel_pending=False, idle_timeout=None):
        """Stop the workers and return the pool's uptime.

        With ``cancel_pending`` the queued tasks are cancelled first; otherwise
        :meth:`wait_for_slot` is given ``idle_timeout`` and queued tasks are
        cancelled only if it reports ``A.PoolFull``.  If the pool is already
        shut down this just waits for the shutdown event.
        """
        if self._shutdown:
            return await self.wait_for_shutdown()
        if cancel_pending:
            await self._kill_helper()
        else:
            try:
                await self.wait_for_slot(idle_timeout)
            except A.PoolFull:
                await self._kill_helper()
        queue = self._queue
        for _ in range(self._current):  # one sentinel per worker
            await queue.put((0, self._tiebreak, None))
        # Mark the pool as shut down (the flag was previously never set). It is
        # set only here because wait_for_slot() above raises once it is True and
        # workers stop reading the queue as soon as they see it.
        self._shutdown = True
        with self._tlock:
            queue.shutdown(True)
        await self.join()
        self._sdevt.set()
        return self.uptime

    async def join(self):
        """Wait for every worker's exit future; exceptions are returned, not raised."""
        return await I.gather(*self._futures, return_exceptions=True)

    async def map(self, f, /, *its, priority=0, strict=False):
        """Run ``f`` over the zipped iterables through the pool and gather the results."""
        return await A.agather(A.amap(partial(self.complete, f, _priority_=priority), *its, strict=strict))

    async def starmap(self, f, it, /, priority=0):
        """Like :meth:`map`, delegating argument unpacking to ``A.astarmap``."""
        return await A.agather(A.astarmap(partial(self.complete, f, _priority_=priority), it))

    async def doublestarmap(self, f, it, /, priority=0):
        """Like :meth:`map`, delegating argument unpacking to ``A.adoublestarmap``."""
        return await A.agather(A.adoublestarmap(partial(self.complete, f, _priority_=priority), it))

    async def starmap_with_kwds(self, f, it, /, priority=0):
        """Like :meth:`map`, delegating argument unpacking to ``A.astarmap_with_kwds``."""
        return await A.agather(A.astarmap_with_kwds(partial(self.complete, f, _priority_=priority), it))

    async def resize(self, min_workers, max_workers):
        """Set new bounds (minimum at least 1, maximum at least the minimum) and rescale into them."""
        low = max(1, min_workers)
        high = max(max_workers, low)
        self._scale_to(min(max(self._current, low), high))
        self._min, self._max = low, high

    def drain(self):
        """Return the awaitable of the queue's ``join()``."""
        return self._queue.join()

    async def wait_for_slot(self, timeout=None):
        """Return the seconds spent waiting for the queue to drain (``0.0`` if it was not full).

        Raises ``A.PoolShutDown`` after shutdown and ``A.PoolFull`` on timeout.
        """
        self.raise_for_shutdown()
        if not self.full:
            return 0.0
        try:
            began = monotonic()
            await I.wait_for(self.drain(), timeout)
            return monotonic() - began
        except TimeoutError:
            raise A.PoolFull('asyncutils.pools.AdvancedPool: timeout waiting for queue space') from None

    async def __cleanup__(self):
        """Shut the pool down, cancelling queued tasks if ``kill_at_exit`` was set."""
        await self.shutdown(self._kill_at_exit)

    def __del__(self):
        """Schedule a cancelling shutdown if the loop is still running."""
        if self.loop.is_running():
            self.make(self.shutdown(True, 0.03))

    @property
    def full(self):
        """Whether the task queue is full."""
        return self._queue.full()

    @property
    def empty(self):
        """Whether the task queue is empty."""
        return self._queue.empty()

    @property
    def qsize(self):
        """Number of entries currently in the task queue."""
        return self._queue.qsize()

    @property
    def idle(self):
        """True when the queue is empty and no submitted task is outstanding."""
        return self.empty and not self._pending

    @property
    def uptime(self):
        """Seconds elapsed since the pool was constructed."""
        return monotonic() - self._start
@subscriptable
class ConnectionPool(LoopMixinBase):
    """Pool of reusable connections produced by a blocking ``factory``.

    Idle connections live in ``_pool`` and checked-out ones in ``_in_use``.
    A connection is healthy when ``healthchecker`` accepts it and it is not
    older than ``maxlife`` seconds; unhealthy ones are handed to ``cleaner``.

    NOTE(review): ``loop`` and ``make`` are inherited and not visible here;
    ``make`` is assumed to schedule a coroutine on the loop.
    """

    __slots__ = (
        '_available', '_cleaner', '_creation_times', '_factory', '_healthchecker',
        '_in_use', '_lock', '_maintainer', '_pool', 'maxlife', 'maxsize', 'minsize',
    )

    def __init__(self, factory, maxsize=None, minsize=None, maxlife=None, healthchecker=None, cleaner=None):
        """Store the configuration; sizes and ``maxlife`` default to the context values when ``None``."""
        ctx = A.getcontext()
        self._factory = factory
        self.maxsize = ctx.CONNECTION_POOL_DEFAULT_MAX_SIZE if maxsize is None else maxsize
        self.minsize = ctx.CONNECTION_POOL_DEFAULT_MIN_SIZE if minsize is None else minsize
        self.maxlife = ctx.CONNECTION_POOL_DEFAULT_MAX_LIFE if maxlife is None else maxlife
        self._healthchecker = healthchecker or (lambda _: True)
        self._cleaner = cleaner or (lambda _: None)
        self._pool = []             # idle connections; always mutated in place
        self._in_use = set()        # checked-out connections
        self._creation_times = {}   # id(connection) -> monotonic creation time
        self._lock = I.Lock()
        self._available = I.Event()
        self._maintainer = None

    def _is_healthy(self, conn, /):
        """Truthy when the health checker accepts ``conn`` and it has not outlived ``maxlife``."""
        born = self._creation_times.get(id(conn))
        expired = born is not None and self.maxlife is not None and monotonic() - born > self.maxlife
        return self._healthchecker(conn) and not expired

    def _discard(self, conn):
        """Forget ``conn``'s creation time and hand it to the cleaner."""
        self._creation_times.pop(id(conn), None)
        self._cleaner(conn)

    def _notify(self):
        """Wake every coroutine currently waiting in :meth:`acquire`."""
        self._available.set()
        self._available.clear()

    async def create_connection(self, *a, _executor_=None, **k):
        """Run ``factory(*a, **k)`` in an executor and return the new connection.

        The connection is not added to the pool by this method.
        """
        conn = await self.loop.run_in_executor(_executor_, partial(self._factory, *a, **k))
        # Key the timestamp by the connection itself (previously the id of the
        # executor future was used, so the age check never matched).
        self._creation_times[id(conn)] = monotonic()
        return conn

    async def acquire(self, *a, **k):
        """Return a healthy connection, creating one if below ``maxsize``, else wait for a release."""
        while True:
            async with self._lock:
                pool = self._pool
                while pool:
                    conn = pool.pop()
                    if self._is_healthy(conn):
                        self._in_use.add(conn)
                        return conn
                    self._discard(conn)
                if self.currsize < self.maxsize:
                    conn = await self.create_connection(*a, **k)
                    self._in_use.add(conn)
                    return conn
            # pool exhausted: wait (outside the lock) until release() signals
            await self._available.wait()

    def release(self, c, /, *a, **k):
        """Return ``c`` to the pool, or discard it if unhealthy or the pool is full.

        When discarding drops the pool below ``minsize`` a replacement is
        created in the background with ``*a, **k``.
        """
        self._in_use.discard(c)
        if self._is_healthy(c) and len(self._pool) < self.maxsize:
            self._pool.append(c)
        else:
            self._discard(c)
            if self.currsize < self.minsize:
                self.make(self._replenish(*a, **k))
        # Either a connection became idle or capacity was freed: wake waiters.
        self._notify()

    async def _replenish(self, *a, **k):
        """Create one connection and pool it if the pool is still below ``minsize``."""
        async with self._lock:
            if self.currsize < self.minsize:
                self._pool.append(await self.create_connection(*a, **k))
        self._notify()

    async def _maintain(self):
        """Periodically drop unhealthy idle connections and top the pool up to ``minsize``."""
        interval = A.getcontext().CONNECTION_POOL_MAINTENANCE_INTERVAL
        while True:
            await I.sleep(interval)
            async with self._lock:
                pool = self._pool
                healthy = []
                for conn in pool:
                    if self._is_healthy(conn):
                        healthy.append(conn)
                    else:
                        self._discard(conn)
                # In-place update: release() may append while we await below.
                pool[:] = healthy
                deficit = self.minsize - self.currsize
                if deficit > 0:
                    pool.extend(await I.gather(*(self.create_connection() for _ in range(deficit))))

    async def start(self, akgen=None, executor=None):
        """Create ``minsize`` pooled connections and start the maintenance task.

        ``akgen`` may be an async iterable of ``(args, kwargs)`` pairs for the
        factory; ``executor`` is forwarded to :meth:`create_connection`.
        """
        create = self.create_connection
        if akgen is None:
            for _ in range(self.minsize):
                self._pool.append(await create(_executor_=executor))
        else:
            async for a, k in A.take(akgen, self.minsize, default=((), {})):
                self._pool.append(await create(*a, _executor_=executor, **k))
        self._maintainer = self.make(self._maintain())

    async def stop(self):
        """Cancel the maintenance task and hand every known connection to the cleaner."""
        if m := self._maintainer:
            await A.safe_cancel(m)
        self._maintainer = None
        pool, in_use = self._pool, self._in_use
        while pool:
            self._discard(pool.pop())
        while in_use:
            self._discard(in_use.pop())
        self._creation_times.clear()

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, /, *_):
        await self.stop()

    @property
    def currsize(self):
        """Total number of connections, idle plus in use."""
        return self.available + self.in_use

    @property
    def available(self):
        """Number of idle connections."""
        return len(self._pool)

    @property
    def in_use(self):
        """Number of checked-out connections."""
        return len(self._in_use)