1import asyncutils as A, asyncio as I
2from asyncutils._internal.compat import PriorityQueue
3from asyncutils._internal.helpers import LoopMixinBase, fullname, subscriptable
4from asyncutils._internal.submodules import pools_all as __all__
5from _functools import partial
6from itertools import count, repeat
7from threading import Thread, Lock as TLock
8from time import monotonic
[docs]
class AdvancedPool(A.LoopContextMixin):
    """Pool of worker threads fed from a priority queue.

    Jobs are queued as ``(priority, tiebreak, payload)`` tuples, where
    ``payload`` is ``(func, args, kwargs, future)`` for real work or ``None``
    as a "worker, please exit" sentinel.  The tiebreak is a monotonically
    increasing counter, so two entries never compare their payloads and
    entries with equal priority are served in submission order.

    When ``scaling`` is enabled, every submission compares the
    pending-jobs-per-worker ratio against thresholds read from
    ``A.getcontext()`` and grows or shrinks the worker count between the
    configured minimum and maximum.
    """

    # '_futures' and '_tlock' are assigned in __init__, so they must be
    # declared here; otherwise construction fails whenever no base class
    # provides an instance __dict__.
    __slots__ = ('__cnt', '_current', '_futures', '_kill_at_exit', '_max', '_min', '_pending', '_queue',
                 '_scaling', '_sdevt', '_shutdown', '_start', '_tlock', '_workers', 'completed')

    @property
    def _tiebreak(self):
        """Next value of the per-pool counter used to order equal-priority entries."""
        return next(self.__cnt)

    def __init__(self, max_workers=None, min_workers=None, qsize=0, scaling=True, kill_at_exit=False):
        """Set up an idle pool.

        ``max_workers`` / ``min_workers`` default to the corresponding
        ``ADVANCED_POOL_DEFAULT_*`` values of ``A.getcontext()``.  ``qsize`` is
        passed to the queue constructor.  ``kill_at_exit`` is forwarded as
        ``cancel_pending`` when ``__cleanup__`` shuts the pool down.
        """
        super().__init__()
        ctx = A.getcontext()
        self.__cnt = count()
        self._tlock = TLock()
        self._max = ctx.ADVANCED_POOL_DEFAULT_MAX_WORKERS if max_workers is None else max_workers
        self._scaling = scaling
        self._queue = PriorityQueue(qsize)
        self._workers = set()   # Thread objects started by _scale_to
        self._futures = set()   # one completion future per worker thread
        self._pending = 0       # jobs submitted but not yet finished
        self._sdevt = I.Event()  # set once shutdown() has finished
        self._shutdown = False
        self._start = monotonic()
        self.completed = 0
        self._kill_at_exit = kill_at_exit
        # NOTE(review): _current starts at the minimum, but no thread is
        # started here; threads are only created by _scale_to.
        self._current = self._min = ctx.ADVANCED_POOL_DEFAULT_MIN_WORKERS if min_workers is None else min_workers

    def __repr__(self):
        return f'{fullname(self)}({self._max}, {self._min}, {self._queue.maxsize}, {self._scaling}, {self._kill_at_exit})'

    async def _threadsafe_get(self):
        """Take the next queue entry while holding the thread lock."""
        # NOTE(review): the thread lock stays held while this coroutine is
        # suspended in get().  If A.sync_await runs the coroutine on the
        # pool's loop thread, any other `with self._tlock` on that thread
        # would block the loop - confirm against sync_await's implementation.
        with self._tlock:
            return await self._queue.get()

    def _threadsafe_task_done(self):
        """Mark one queue entry as processed while holding the thread lock."""
        with self._tlock:
            self._queue.task_done()

    def _worker_loop(self, _):
        """Body of one worker thread.

        ``_`` is the worker's completion future: it receives the number of
        jobs this worker ran on a clean exit, or the exception that ended the
        loop (wrapped in ``A.Critical`` when it is an ``A.CRITICAL`` instance).
        """
        loop = self.loop
        notify = loop.call_soon_threadsafe
        fetch = self._threadsafe_get
        done = 0
        try:  # noqa: PLW0717
            while not self._shutdown:
                job = A.sync_await(fetch(), loop=loop)[2]
                if job is None:
                    # Exit sentinel: acknowledge it and leave the loop.
                    self._threadsafe_task_done()
                    break
                func, args, kwargs, fut = job
                try:
                    # NOTE(review): fut is resolved directly from this thread,
                    # whereas the worker future below goes through
                    # call_soon_threadsafe - confirm make_fut's futures
                    # tolerate that.
                    fut.set_result(func(*args, **kwargs))
                except BaseException as e:  # noqa: BLE001
                    fut.set_exception(e)
                finally:
                    self._threadsafe_task_done()
                    done += 1
                    with self._tlock:
                        self.completed += 1
                        self._pending -= 1
        except BaseException as e:  # noqa: BLE001
            notify(_.set_exception, A.Critical(e) if isinstance(e, A.CRITICAL) else e)
        else:
            notify(_.set_result, done)

    def _scale_to(self, new):
        """Move the worker count to ``new``.

        Growing starts one thread per missing worker; shrinking enqueues one
        priority-0 exit sentinel per surplus worker.
        """
        delta = new - self._current
        if delta > 0:
            for _ in repeat(None, delta):
                fut = self.make_fut()
                thread = Thread(target=self._worker_loop, args=(fut,))
                thread.start()
                self._workers.add(thread)
                self._futures.add(fut)
        elif delta < 0:
            for _ in repeat(None, -delta):
                self._put_nowait_priority(0, None)
        self._current = new

    def _put_nowait_priority(self, priority, item):
        """Enqueue ``item`` without waiting, tagged with ``priority`` and a fresh tiebreak."""
        self._queue.put_nowait((priority, self._tiebreak, item))

    def _set_adjuster(self):
        """Rescale the pool from the current pending-per-worker load (no-op unless scaling is on)."""
        if not self._scaling:
            return
        ctx = A.getcontext()
        workers = self._current
        load = self._pending / (workers or 1)  # `or 1` avoids dividing by zero workers
        if load > ctx.ADVANCED_POOL_THRESHOLD_HI and workers < self._max:
            # Grow by half the current size, at least one, capped at the maximum.
            self._scale_to(min(self._max, workers + max(1, workers >> 1)))
        elif load < ctx.ADVANCED_POOL_THRESHOLD_LO and workers > self._min:
            self._scale_to(max(self._min, workers - max(1, int(workers * ctx.ADVANCED_POOL_FACTOR))))

    async def wait_for_shutdown(self):
        """Wait until ``shutdown`` has finished."""
        return await self._sdevt.wait()

    def raise_for_shutdown(self):
        """Raise ``A.PoolShutDown`` if the pool has been shut down."""
        if self._shutdown:
            raise A.PoolShutDown(f'{fullname(self)} is shutting down')

    def submit_nowait(self, f, *a, _priority_=0, **k):
        """Queue ``f(*a, **k)`` without waiting and return the future for its result.

        Raises ``A.PoolShutDown`` after shutdown and ``A.PoolFull`` when the
        queue has no free slot.
        """
        self.raise_for_shutdown()
        if self.full:
            raise A.PoolFull('asyncutils.pool.AdvancedPool.submit_nowait: task queue full')
        fut = self.make_fut()
        with self._tlock:
            self._pending += 1
        try:
            self._put_nowait_priority(_priority_, (f, a, k, fut))
        except BaseException:
            # Nothing was queued, so no worker will ever undo the increment.
            with self._tlock:
                self._pending -= 1
            raise
        self._set_adjuster()
        return fut

    async def _kill_helper(self):
        """Empty the queue, cancelling the future of every queued job."""
        queue = self._queue
        take, ack = queue.get_nowait, queue.task_done
        # The loop only ends when get_nowait raises; A.ignore_qempty is
        # presumably what absorbs that exception - confirm.
        with self._tlock, A.ignore_qempty:
            while True:
                job = take()[2]
                if job is not None:
                    await A.safe_cancel(job[-1])
                ack()

    async def complete(self, *a, **k):
        """Submit a job (same arguments as ``submit``) and wait for its result."""
        return await (await self.submit(*a, **k))

    async def submit(self, f, *a, _priority_=0, **k):
        """Queue ``f(*a, **k)``, waiting for a free slot, and return the future for its result.

        Raises ``A.PoolShutDown`` after shutdown.
        """
        self.raise_for_shutdown()
        fut = self.make_fut()
        with self._tlock:
            self._pending += 1
        try:
            await self._queue.put((_priority_, self._tiebreak, (f, a, k, fut)))
        except BaseException:
            # Enqueueing failed or was cancelled: undo the bookkeeping so
            # `idle` and the scaling ratio are not skewed forever.
            with self._tlock:
                self._pending -= 1
            raise
        self._set_adjuster()
        return fut

    async def shutdown(self, cancel_pending=False, idle_timeout=None):
        """Stop the workers and wait for all of them to finish.

        With ``cancel_pending`` the queued jobs are cancelled first.
        Otherwise ``wait_for_slot(idle_timeout)`` is awaited, and queued jobs
        are cancelled only if that times out.  Returns the pool's uptime; a
        call made after shutdown has started just waits for it to finish.
        """
        if self._shutdown:
            return await self.wait_for_shutdown()
        if cancel_pending:
            await self._kill_helper()
        else:
            try:
                await self.wait_for_slot(idle_timeout)
            except A.PoolFull:
                await self._kill_helper()
        queue = self._queue
        for _ in repeat(None, self._current):
            await queue.put((0, self._tiebreak, None))
        # Flag the pool only once every sentinel is queued: workers stop
        # pulling entries as soon as they see the flag, and a bounded queue
        # needs them to keep consuming while the sentinels are being put.
        self._shutdown = True
        with self._tlock:
            queue.shutdown(True)
        try:
            await self.join()
        finally:
            # Always release wait_for_shutdown() callers, even if join() is cancelled.
            self._sdevt.set()
        return self.uptime

    async def join(self):
        """Wait for every worker's completion future; failures are returned, not raised."""
        return await I.gather(*self._futures, return_exceptions=True)

    async def map(self, f, /, *its, priority=0, strict=False):
        """Run ``f`` over ``its`` through ``A.amap``, each call as a pool job, and gather the results."""
        return await A.agather(A.amap(partial(self.complete, f, _priority_=priority), *its, strict=strict))

    async def starmap(self, f, it, /, priority=0):
        """Like ``map``, driving ``f`` through ``A.astarmap``."""
        return await A.agather(A.astarmap(partial(self.complete, f, _priority_=priority), it))

    async def doublestarmap(self, f, it, /, priority=0):
        """Like ``map``, driving ``f`` through ``A.adoublestarmap``."""
        return await A.agather(A.adoublestarmap(partial(self.complete, f, _priority_=priority), it))

    async def starmap_with_kwds(self, f, it, /, priority=0):
        """Like ``map``, driving ``f`` through ``A.astarmap_with_kwds``."""
        return await A.agather(A.astarmap_with_kwds(partial(self.complete, f, _priority_=priority), it))

    async def resize(self, min_workers, max_workers):
        """Set new worker bounds and clamp the current worker count into them.

        The minimum is raised to at least 1 and the maximum to at least the minimum.
        """
        low = max(1, min_workers)
        high = max(max_workers, low)
        self._scale_to(min(max(self._current, low), high))
        self._min, self._max = low, high

    def drain(self):
        """Return the queue's ``join()`` awaitable."""
        return self._queue.join()

    async def wait_for_slot(self, timeout=None):
        """Wait until the queue has room and return the seconds spent waiting.

        Returns ``0.0`` at once when the queue is not full.  Raises
        ``A.PoolShutDown`` after shutdown and ``A.PoolFull`` when ``timeout``
        elapses first.
        """
        self.raise_for_shutdown()
        if not self.full:
            return 0.0
        try:
            began = monotonic()
            await I.wait_for(self.drain(), timeout)
            return monotonic() - began
        except TimeoutError:
            raise A.PoolFull('asyncutils.pools.AdvancedPool: timeout waiting for queue space') from None

    async def __cleanup__(self):
        """Shut the pool down, cancelling queued jobs if ``kill_at_exit`` was set."""
        await self.shutdown(self._kill_at_exit)

    def __del__(self):
        # Best effort: only possible while the loop is still running.
        if self.loop.is_running():
            self.make(self.shutdown(True, 0.03))

    @property
    def full(self):
        """Whether the job queue is full."""
        return self._queue.full()

    @property
    def empty(self):
        """Whether the job queue is empty."""
        return self._queue.empty()

    @property
    def qsize(self):
        """Number of entries currently in the job queue."""
        return self._queue.qsize()

    @property
    def idle(self):
        """True when the queue is empty and no submitted job is unfinished."""
        return self.empty and not self._pending

    @property
    def uptime(self):
        """Seconds elapsed since the pool was constructed."""
        return monotonic() - self._start
[docs]
@subscriptable
class ConnectionPool(LoopMixinBase):
    """Pool of reusable connections built by a blocking factory.

    Idle connections live in ``_pool`` and handed-out ones in ``_in_use``
    (a set, so connections must be hashable).  A connection is reusable
    while ``healthchecker(conn)`` is truthy and it is no older than
    ``maxlife`` seconds; anything else is passed to ``cleaner``.
    """

    __slots__ = ('_available', '_cleaner', '_creation_times', '_factory', '_healthchecker', '_in_use',
                 '_lock', '_maintainer', '_pool', 'maxlife', 'maxsize', 'minsize')

    def __init__(self, factory, maxsize=None, minsize=None, maxlife=None, healthchecker=None, cleaner=None):
        """Configure the pool; no connection is created until ``start`` or ``acquire``.

        ``maxsize``, ``minsize`` and ``maxlife`` default to the matching
        ``CONNECTION_POOL_DEFAULT_*`` values of ``A.getcontext()``.  Without a
        ``healthchecker`` every connection counts as healthy; without a
        ``cleaner`` discarding a connection does nothing.
        """
        ctx = A.getcontext()
        self._factory = factory
        self.maxsize = ctx.CONNECTION_POOL_DEFAULT_MAX_SIZE if maxsize is None else maxsize
        self.minsize = ctx.CONNECTION_POOL_DEFAULT_MIN_SIZE if minsize is None else minsize
        self.maxlife = ctx.CONNECTION_POOL_DEFAULT_MAX_LIFE if maxlife is None else maxlife
        self._healthchecker = healthchecker or (lambda _: True)
        self._cleaner = cleaner or (lambda _: None)
        self._pool = []             # idle connections; always mutated in place
        self._in_use = set()
        self._creation_times = {}   # id(connection) -> monotonic creation time
        self._lock = I.Lock()
        self._available = I.Event()
        self._maintainer = None

    def _is_healthy(self, conn, /):
        """Return a truthy value if ``conn`` passes the health check and has not outlived ``maxlife``."""
        healthy = self._healthchecker(conn)
        if not healthy:
            return healthy
        born = self._creation_times.get(id(conn))
        return not (born and monotonic() - born > self.maxlife)

    def _dispose(self, conn, /):
        """Forget ``conn``'s creation time and hand it to the cleaner."""
        # Drop the timestamp first so a later object that reuses this id()
        # cannot inherit a stale age, even if the cleaner raises.
        self._creation_times.pop(id(conn), None)
        self._cleaner(conn)

    def _wake_waiters(self):
        """Wake every ``acquire`` call currently waiting for capacity."""
        self._available.set()
        self._available.clear()

    async def _replenish(self, *a, **k):
        """Add one idle connection if the pool is still below ``minsize``."""
        async with self._lock:
            # Re-check under the lock: an acquire may have created one meanwhile.
            if self.currsize < self.minsize:
                self._pool.append(await self.create_connection(*a, **k))
                self._wake_waiters()

    async def create_connection(self, *a, _executor_=None, **k):
        """Run ``factory(*a, **k)`` in ``_executor_`` and return the new connection.

        The connection is time-stamped for the ``maxlife`` check but is not
        added to the pool; the caller decides where it goes.
        """
        conn = await self.loop.run_in_executor(_executor_, partial(self._factory, *a, **k))
        # Key the timestamp by the connection itself (not by the executor
        # future), since that is what _is_healthy looks up.
        self._creation_times[id(conn)] = monotonic()
        return conn

    async def acquire(self, *a, **k):
        """Return a healthy connection, waiting for capacity if necessary.

        Idle connections are tried first (unhealthy ones are discarded).  If
        none is usable and the pool is below ``maxsize`` a new one is created
        with ``a`` / ``k``; otherwise the call waits for a release and retries.
        """
        while True:
            async with self._lock:
                idle = self._pool
                while idle:
                    conn = idle.pop()
                    if self._is_healthy(conn):
                        self._in_use.add(conn)
                        return conn
                    self._dispose(conn)
                if self.currsize < self.maxsize:
                    conn = await self.create_connection(*a, **k)
                    self._in_use.add(conn)
                    return conn
            # Wait outside the lock so releases and other acquirers can proceed.
            await self._available.wait()

    def release(self, c, /, *a, **k):
        """Give ``c`` back to the pool.

        A healthy connection is kept for reuse while there is room.  Otherwise
        it is discarded and, if the pool fell below ``minsize``, a replacement
        built with ``a`` / ``k`` is scheduled.
        """
        self._in_use.discard(c)
        if self._is_healthy(c) and len(self._pool) < self.maxsize:
            self._pool.append(c)
        else:
            self._dispose(c)
            if self.currsize < self.minsize:
                self.make(self._replenish(*a, **k))
        # Either branch frees capacity (an idle connection or a free slot),
        # so waiting acquirers must be woken in both cases.
        self._wake_waiters()

    async def _maintain(self):
        """Periodically discard idle connections that fail the health check and top up to ``minsize``."""
        interval = A.getcontext().CONNECTION_POOL_MAINTENANCE_INTERVAL
        while True:
            await I.sleep(interval)
            async with self._lock:
                idle = self._pool
                keep = []
                for conn in idle:
                    if self._healthchecker(conn):
                        keep.append(conn)
                    else:
                        self._dispose(conn)
                # Prune in place so connections released while we await below
                # land in the same list instead of being lost.
                idle[:] = keep
                missing = self.minsize - self.currsize
                if missing > 0:
                    fresh = await I.gather(*(self.create_connection() for _ in repeat(None, missing)))
                    idle.extend(fresh)
                    self._wake_waiters()

    async def start(self, akgen=None, executor=None):
        """Pre-create ``minsize`` idle connections and start the maintenance task.

        ``akgen``, when given, is consumed through ``A.take`` as an async
        source of ``(args, kwargs)`` pairs for the factory, with ``((), {})``
        as the default pair.  ``executor`` is used for every factory call.
        """
        create = self.create_connection
        if akgen is None:
            for _ in repeat(None, self.minsize):
                self._pool.append(await create(_executor_=executor))
        else:
            async for a, k in A.take(akgen, self.minsize, default=((), {})):
                self._pool.append(await create(*a, _executor_=executor, **k))
        self._maintainer = self.make(self._maintain())

    async def stop(self):
        """Cancel the maintenance task and discard every connection, idle or in use."""
        if m := self._maintainer:
            await A.safe_cancel(m)
        self._maintainer = None
        idle, busy = self._pool, self._in_use
        while idle:
            self._dispose(idle.pop())
        while busy:
            self._dispose(busy.pop())
        self._creation_times.clear()

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, /, *_):
        await self.stop()

    @property
    def currsize(self):
        """Total number of connections: idle plus in use."""
        return self.available + self.in_use

    @property
    def available(self):
        """Number of idle connections."""
        return len(self._pool)

    @property
    def in_use(self):
        """Number of connections currently handed out."""
        return len(self._in_use)