1import asyncio as I, asyncutils as A
2from asyncutils._internal.compat import Queue, QueueEmpty, QueueFull, QueueShutDown
3from asyncutils._internal.helpers import copy_and_clear, fullname, subscriptable
4from asyncutils._internal.submodules import processors_all as __all__
5from _functools import partial
6from time import monotonic
[docs]
@subscriptable
class BoundedBatchProcessor:
    """Feed an iterable to an async ``processor`` in fixed-size batches.

    Each processor call runs while holding a semaphore created with
    ``max_concurrent`` permits. Defaults for ``batch`` and ``max_concurrent``
    are read from the context returned by ``asyncutils.getcontext()``.
    """

    __slots__ = '_batch', '_processor', '_sem'

    def __init__(self, processor, batch=None, max_concurrent=None):
        """Store the processor, the batch size and the concurrency semaphore.

        Args:
            processor: Async callable invoked with one batch at a time.
            batch: Batch size; ``None`` selects
                ``BOUNDED_BATCH_PROCESSOR_DEFAULT_BATCH_SIZE`` from the context.
            max_concurrent: Semaphore size; ``None`` selects
                ``BOUNDED_BATCH_PROCESSOR_DEFAULT_MAX_CONCURRENT`` from the context.
        """
        ctx = A.getcontext()
        if batch is None:
            batch = ctx.BOUNDED_BATCH_PROCESSOR_DEFAULT_BATCH_SIZE
        if max_concurrent is None:
            max_concurrent = ctx.BOUNDED_BATCH_PROCESSOR_DEFAULT_MAX_CONCURRENT
        # Build the semaphore first so nothing is assigned if it rejects the value.
        gate = I.Semaphore(max_concurrent)
        self._processor = processor
        self._batch = batch
        self._sem = gate

    async def process(self, items):
        """Asynchronously yield the processor's result for each batch of ``items``.

        Batches are pulled with ``A.collect`` from ``A.iter_to_agen(items)``;
        iteration stops at the first falsy batch.
        NOTE(review): presumably ``A.collect(agen, n)`` returns up to ``n`` items
        and an empty container on exhaustion -- confirm against its definition.
        """
        collect = A.collect
        source = A.iter_to_agen(items)
        size = self._batch
        handle = self._processor
        gate = self._sem
        while True:
            chunk = await collect(source, size)
            if not chunk:
                break
            async with gate:
                outcome = await handle(chunk)
            # Yield only after the semaphore is released, so a slow consumer
            # does not keep a permit occupied.
            yield outcome
[docs]
@subscriptable
class BatchProcessor(A.LoopContextMixin):
    """Accumulate items and pass them to an async ``processor`` in batches.

    A batch is dispatched as soon as it holds ``maxsize`` items. Otherwise a
    background flusher sleeps for ``maxtime`` seconds and then flushes whatever
    has accumulated. Defaults come from ``asyncutils.getcontext()``.
    """

    __slots__ = '_batch', '_flusher', '_last_process', '_lock', '_maxsize', '_processor', '_sleep', '_timer'

    def __init__(self, processor, *, maxsize=None, maxtime=None, timer=monotonic):
        """Initialise the processor.

        Args:
            processor: Async callable invoked with a list of accumulated items.
            maxsize: Batch size that triggers immediate processing; ``None``
                selects ``BATCH_PROCESSOR_DEFAULT_MAX_SIZE`` from the context.
            maxtime: Delay in seconds before a timed flush; ``None`` selects
                ``BATCH_PROCESSOR_DEFAULT_MAX_TIME`` from the context.
            timer: Zero-argument clock used for ``time_since_last_process``.
        """
        C = A.getcontext()
        self._processor = processor
        self._maxsize = C.BATCH_PROCESSOR_DEFAULT_MAX_SIZE if maxsize is None else maxsize
        # Binding asyncio.sleep to the delay gives a zero-argument callable:
        # self._sleep() is equivalent to asyncio.sleep(delay).
        self._sleep = I.sleep.__get__(C.BATCH_PROCESSOR_DEFAULT_MAX_TIME if maxtime is None else maxtime)
        self._batch = []
        self._last_process = timer()
        self._lock = I.Lock()
        self._timer = timer
        self._flusher = None

    async def add(self, item):
        """Append ``item``; process at once if the batch is full, else arm the timed flush."""
        async with self._lock:
            (b := self._batch).append(item)
            if len(b) >= self._maxsize:
                return await self._process()
        if self._flusher is None:
            # NOTE(review): ``make`` comes from LoopContextMixin; presumably it
            # schedules the coroutine as a task -- confirm.
            self._flusher = self.make(self._schedule_flush())

    async def _schedule_flush(self):
        """Sleep for the configured delay, flush, and always disarm the flusher slot.

        The reset sits in ``finally`` so that a failing processor or a cancelled
        flusher cannot leave ``_flusher`` set, which would stop ``add`` from
        ever scheduling another timed flush.
        """
        try:
            await self._sleep()
            await self.flush()
        finally:
            self._flusher = None

    async def _process(self):
        """Hand the current batch to the processor; callers hold ``_lock``."""
        if not (b := self._batch):
            return
        # Detach the items before awaiting, and record the dispatch time.
        b, self._last_process = copy_and_clear(b), self._timer()
        await self._processor(b)

    async def flush(self):
        """Process any accumulated items now."""
        async with self._lock:
            if self._batch:
                await self._process()

    @property
    def time_since_last_process(self):
        """Time elapsed on ``timer`` since the last dispatch (or since construction)."""
        return self._timer()-self._last_process

    async def __setup__(self):
        """Run the mixin's initialiser when the context is entered."""
        super().__init__()

    async def __cleanup__(self):
        """Cancel a pending timed flush, if any, and clear the flusher slot."""
        if (f := self._flusher) is not None:
            try:
                await A.safe_cancel(f)
            finally:
                # A task cancelled before its first step never reaches its own
                # ``finally``; reset here too, but only if no newer flusher has
                # replaced it in the meantime.
                if self._flusher is f:
                    self._flusher = None
[docs]
class Bulkhead(A.LoopContextMixin):
    """Limit concurrent execution of coroutines, with a bounded waiting queue.

    ``execute`` enqueues a coroutine, waits for one of ``max_concurrent``
    semaphore slots and then awaits the next queued coroutine. When the queue
    is full the call is rejected; once ``max_rej`` rejections have been
    recorded, the next rejection shuts the bulkhead down.
    """

    __slots__ = '_exc', '_init_val', '_max_rej', '_mtevt', '_processor', '_queue', '_rejected', '_sdfut', '_sem'

    def __init__(self, max_concurrent, *, max_queue=None, max_rej=None, exc=Exception, processor=None):
        """Initialise the bulkhead.

        Args:
            max_concurrent: Number of semaphore slots; must be positive.
            max_queue: Queue capacity; must be positive. ``None`` selects
                ``BULKHEAD_DEFAULT_MAX_QUEUE`` from ``asyncutils.getcontext()``.
            max_rej: Rejection count at which the next rejection triggers a
                shutdown. ``None`` selects ``BULKHEAD_DEFAULT_MAX_REJ``.
            exc: Exception type(s) caught when an executed coroutine fails.
            processor: Optional async callable invoked with a caught exception.

        Raises:
            ValueError: If ``max_concurrent`` or ``max_queue`` is not positive.
        """
        if max_concurrent <= 0:
            raise ValueError('asyncutils.processors.Bulkhead: max_concurrent must be positive')
        C = A.getcontext()
        if max_queue is None:
            max_queue = C.BULKHEAD_DEFAULT_MAX_QUEUE
        if max_rej is None:
            max_rej = C.BULKHEAD_DEFAULT_MAX_REJ
        if max_queue <= 0:
            raise ValueError('asyncutils.processors.Bulkhead: max_queue must be positive')
        super().__init__()
        self._sem = I.Semaphore(max_concurrent)
        self._queue = Queue(max_queue)
        self._rejected = 0
        self._init_val = max_concurrent
        self._exc = exc
        self._processor = processor
        self._sdfut = self.make_fut()
        # A new bulkhead has no running tasks, so it starts out idle. Without
        # this, shutdown() and wait_until_idle() would block forever on a
        # bulkhead that never executed anything.
        self._mtevt = I.Event()
        self._mtevt.set()
        self._max_rej = max_rej

    async def execute(self, coro):
        """Enqueue ``coro`` and, once a slot is free, await the next queued coroutine.

        The coroutine awaited is whatever the queue yields next, which need not
        be ``coro`` itself when several callers are waiting. Its result is
        discarded. Exceptions matching ``exc`` are passed to ``processor`` (if
        set) instead of propagating.

        Raises:
            BulkheadFull: The queue is full and the rejection limit is not reached.
            BulkheadShutDown: The bulkhead is shut down, or this rejection
                reached the limit and triggered the shutdown.
        """
        try:
            self._queue.put_nowait(coro)
        except QueueFull as e:
            if (x := self._rejected) == self._max_rej:
                await self.shutdown()
                raise A.BulkheadShutDown(f'{fullname(self)} has been shutdown because too many tasks were rejected') from e
            self._rejected = x+1
            raise A.BulkheadFull(f'{fullname(self)} queue full') from None
        # NOTE(review): shutdown() shuts the queue down in the same synchronous
        # step that marks the future done, so put_nowait above presumably raises
        # QueueShutDown before this check can fire -- confirm intended type.
        if self.is_shutdown:
            raise A.BulkheadShutDown(f'{fullname(self)} is shutting down')
        try:
            async with self._sem:
                # A slot is now occupied, so the bulkhead is no longer idle.
                self._mtevt.clear()
                try:
                    await (await self._queue.get())
                except (QueueEmpty, QueueShutDown, I.CancelledError):
                    raise A.BulkheadShutDown(f'{fullname(self)} is shutting down') from None
                except self._exc as e:
                    if p := self._processor:
                        await p(e)
        finally:
            # Refresh the idle flag on every exit path so that waiters are not
            # left blocked when the last running task ends with an exception.
            if self.active_tasks:
                self._mtevt.clear()
            else:
                self._mtevt.set()

    async def __cleanup__(self):
        """Shut the bulkhead down when the context exits."""
        await self.shutdown()

    @property
    def available_slots(self):
        """Number of semaphore slots currently free."""
        return self._sem._value

    @property
    def active_tasks(self):
        """Number of semaphore slots currently taken."""
        return self._init_val-self.available_slots

    @property
    def curr_qsize(self):
        """Number of coroutines currently queued."""
        return self._queue.qsize()

    @property
    def max_qsize(self):
        """Capacity of the queue."""
        return self._queue.maxsize

    @property
    def available_qslots(self):
        """Free queue capacity, or ``inf`` when the queue reports no positive maxsize."""
        return m-self.curr_qsize if (m := self.max_qsize) > 0 else float('inf')

    @property
    def is_shutdown(self):
        """Whether the shutdown future is done."""
        return self._sdfut.done()

    @property
    def rejected(self):
        """Number of rejections recorded so far."""
        return self._rejected

    async def wait_until_idle(self, timeout=None):
        """Wait until the idle event is set, or raise ``TimeoutError`` after ``timeout``."""
        await I.wait_for(self._mtevt.wait(), timeout)

    def wait_for_shutdown(self, timeout=None):
        """Return an awaitable that completes once shutdown has been requested.

        The future is shielded because ``asyncio.wait_for`` cancels what it
        waits on when the timeout expires; an unshielded timeout would cancel
        the shutdown future and make the bulkhead look shut down.
        """
        return I.wait_for(I.shield(self._sdfut), timeout)

    async def shutdown(self, timeout=None):
        """Stop accepting work, wait for idleness, and take all free slots.

        Safe to call more than once. If the bulkhead does not go idle within
        ``timeout`` seconds, the queued coroutines that never ran are removed
        and returned, and the queue is shut down immediately.

        Returns:
            list: Coroutines drained from the queue after a timeout, else ``[]``.
        """
        # set_result on a finished future raises InvalidStateError, so a repeat
        # call (e.g. __cleanup__ after an earlier shutdown) must skip it.
        if not (fut := self._sdfut).done():
            fut.set_result(None)
        (h := (q := self._queue).shutdown)()
        r = []
        try:
            async with I.timeout(timeout):
                await self._mtevt.wait()
                a = (s := self._sem).acquire
                # Take every remaining slot so nothing new can start.
                while s._value:
                    await a()
        except TimeoutError:
            while True:
                try:
                    r.append(q.get_nowait())
                except (QueueEmpty, QueueShutDown):
                    # Queue drained: switch to immediate shutdown and stop.
                    h(True)
                    break
        return r