Source code for asyncutils.mixins

 1__lazy_modules__ = frozenset(('asyncio',))
 2from asyncutils import safe_cancel_batch
 3from asyncutils._internal.submodules import mixins_all as __all__
 4import functools as F, asyncutils._internal.helpers as H
 5from abc import ABCMeta, abstractmethod
 6from asyncio import iscoroutine, timeout as _timeout
class LoopContextMixin(H.LoopMixinBase):
    """Async context manager mixin that tracks the tasks it creates.

    Every task produced by :meth:`make` is stored in ``running_tasks`` and
    removed again by a done-callback when it finishes.  On ``async with`` exit
    the tasks still in the set are handed to ``safe_cancel_batch``.

    Subclasses customise entry/exit by overriding :meth:`__setup__` and
    :meth:`__cleanup__`; both default to no-ops.
    """

    __slots__ = ('running_tasks',)

    def __init__(self):
        # NOTE(review): `_loop` is not listed in this class's __slots__;
        # presumably the base class provides storage for it - confirm.
        self._loop = H.get_loop_and_set()
        self.running_tasks = set()

    @property
    def loop(self):
        """The loop object captured at construction time (read-only)."""
        return self._loop

    def make(self, aw, /):
        """Create a task for *aw* via the base class and track it.

        Returns the task produced by ``super().make(aw)``.
        """
        tracked = self.running_tasks
        task = super().make(aw)
        tracked.add(task)
        # Drop the task from the tracking set as soon as it completes.
        task.add_done_callback(tracked.discard)
        return task

    async def __setup__(self):
        """Hook awaited by ``__aenter__``; the default does nothing."""

    async def __cleanup__(self):
        """Hook awaited by ``__aexit__``; the default does nothing."""

    async def __aenter__(self):
        """Await :meth:`__setup__` and return ``self``."""
        await self.__setup__()
        return self

    async def __aexit__(self, *_):
        """Await :meth:`__cleanup__`, then cancel the still-tracked tasks.

        The cancellation step runs in a ``finally`` clause so that tracked
        tasks are not left running when ``__cleanup__`` raises; the exception
        from ``__cleanup__`` still propagates to the caller.
        """
        try:
            await self.__cleanup__()
        finally:
            await safe_cancel_batch(self.running_tasks)
@H.subscriptable
class AwaitableMixin(metaclass=ABCMeta):
    """Make instances awaitable by delegating to an abstract :meth:`wait`.

    ``await obj`` is equivalent to ``await obj.wait()``.
    """

    __slots__ = ()

    def __await__(self):
        """Drive ``self.wait()`` and return its result.

        The ``return`` matters: a bare ``yield from`` statement discards the
        delegated coroutine's return value, which would make ``await obj``
        always evaluate to ``None``.
        """
        return (yield from self.wait().__await__())

    @abstractmethod
    async def wait(self):
        """Coroutine whose result becomes the value of ``await self``."""
        raise NotImplementedError
@H.subscriptable
class AsyncContextMixin(metaclass=ABCMeta):
    """Expose a synchronous context manager through the async protocol.

    ``__aenter__`` and ``__aexit__`` call ``__enter__`` and ``__exit__``
    directly, without offloading them anywhere.  Subclasses must implement
    ``__exit__``.
    """

    __slots__ = ()

    def __enter__(self):
        """Return ``self``; override for custom entry behaviour."""
        return self

    @abstractmethod
    def __exit__(self, /, *_):
        """Synchronous exit handler; must be provided by subclasses."""
        raise NotImplementedError

    async def __aenter__(self):
        """Async entry: returns whatever ``__enter__`` returns."""
        entered = self.__enter__()
        return entered

    async def __aexit__(self, /, *exc_info):
        """Async exit: forwards the exception triple to ``__exit__``."""
        suppress = self.__exit__(*exc_info)
        return suppress
@H.subscriptable
class ExecutorRequiredAsyncContextMixin(metaclass=ABCMeta):
    """Async context manager whose sync enter/exit run in an executor.

    ``__aenter__`` and ``__aexit__`` dispatch ``__enter__`` and ``__exit__``
    through :attr:`runner`, i.e. ``loop.run_in_executor`` bound to an executor
    obtained from ``H.create_executor``.  Subclasses must implement
    ``__exit__``.
    """

    @F.cached_property
    def runner(self):
        """Partial of ``loop.run_in_executor`` bound to this object's executor.

        Loop lookup order: ``self.loop``, then ``self._loop``; when both are
        missing or ``None`` a loop is obtained from ``H.get_loop_and_set()``
        and stored on ``self.loop``.  Computed once and cached per instance.
        """
        loop = getattr(self, 'loop', None)
        if loop is None:
            # `_loop` is consulted only when `loop` is absent or None.
            loop = getattr(self, '_loop', None)
            if loop is None:
                loop = H.get_loop_and_set()
                self.loop = loop
        # NOTE(review): the meaning of the `False` flag is defined by
        # H.create_executor, which is not visible here.
        return F.partial(loop.run_in_executor, H.create_executor(self, False))  # ty: ignore[unresolved-attribute]

    def __enter__(self):
        """Return ``self``; override for custom entry behaviour."""
        return self

    @abstractmethod
    def __exit__(self, /, *_):
        """Synchronous exit handler; must be provided by subclasses."""
        raise NotImplementedError

    async def __aenter__(self):
        """Run ``__enter__`` through :attr:`runner` and return its result."""
        entered = await self.runner(self.__enter__)
        return entered

    async def __aexit__(self, /, *exc_info):
        """Run ``__exit__`` with the exception triple through :attr:`runner`."""
        suppress = await self.runner(self.__exit__, *exc_info)
        return suppress
@H.subscriptable
class LockMixin(metaclass=ABCMeta):
    """Base for lock-like objects usable with ``async with``.

    Subclasses implement :meth:`acquire`, :meth:`release` and :meth:`locked`.
    The value bound by ``async with lock as x`` is produced by the per-class
    ``_lock_factory`` (configured through the ``_lock_factory_`` class keyword
    argument), which is called with the lock instance.
    """

    __slots__ = ()

    def __init_subclass__(cls, *, _lock_factory_=lambda _: None, **kwargs):
        """Record the subclass's lock factory, then continue cooperative init.

        NOTE(review): a subclass that omits ``_lock_factory_`` gets the default
        (returns ``None``) rather than inheriting its parent's factory -
        confirm this is intended.
        """
        cls._lock_factory = _lock_factory_
        super().__init_subclass__(**kwargs)

    @abstractmethod
    async def acquire(self):
        """Acquire the lock; a falsy result makes ``__aenter__`` raise."""
        raise NotImplementedError

    @abstractmethod
    def release(self):
        """Release the lock; may return a coroutine, which ``__aexit__`` awaits."""
        raise NotImplementedError

    @abstractmethod
    def locked(self):
        """Lock-state query; must be provided by subclasses."""
        raise NotImplementedError

    async def __aenter__(self):
        """Acquire the lock and return the factory's value for this instance.

        Raises:
            RuntimeError: if :meth:`acquire` returns a falsy value.
        """
        if not await self.acquire():
            raise RuntimeError('asyncutils.mixins.LockMixin: failed to acquire lock')
        return self._lock_factory()

    async def __aexit__(self, *exc_info):
        """Release the lock, awaiting the result when it is a coroutine."""
        outcome = self.release()
        if iscoroutine(outcome):
            await outcome

    def acknowledge_locksmith_lock_held(self, _holder, /):  # noqa: PLR6301
        """Always return ``True``; the positional argument is ignored."""
        return True
@H.subscriptable
class LockWithOwnerMixin(LockMixin):
    """Lock mixin whose :meth:`release` is guarded by an ownership check.

    Subclasses supply :attr:`is_owner` and the actual release logic in
    :meth:`_release`.
    """

    __slots__ = ()

    @property
    @abstractmethod
    def is_owner(self):
        """Truthy when the caller is allowed to release the lock."""
        raise NotImplementedError

    @abstractmethod
    def _release(self):
        """Perform the release; only reached after the ownership check."""
        raise NotImplementedError

    def release(self):
        """Release the lock via :meth:`_release` if :attr:`is_owner` is truthy.

        Raises:
            RuntimeError: if :attr:`is_owner` is falsy.
        """
        if self.is_owner:
            return self._release()
        raise RuntimeError(f'{H.fullname(self)} is not acquired by current task')
@H.subscriptable
class EventMixin(AwaitableMixin, H.LoopMixinBase, metaclass=ABCMeta):
    """Base for awaitable, value-carrying event objects.

    Subclasses implement :meth:`wait_for_next`, :meth:`is_set`, :meth:`get`,
    :meth:`set` and :meth:`clear`; this mixin builds :meth:`wait` and
    :meth:`wait_for_value` on top of them.
    """

    __slots__ = ()

    @abstractmethod
    async def wait_for_next(self, timeout=None, **_):
        """Awaited by :meth:`wait` when :meth:`get` raises ``ValueError``."""
        raise NotImplementedError

    @abstractmethod
    def is_set(self):
        """State query; must be provided by subclasses."""
        raise NotImplementedError

    @abstractmethod
    def get(self):
        """Return the current value.

        :meth:`wait` treats a ``ValueError`` raised here as "no value
        available yet" and falls back to :meth:`wait_for_next`.
        """
        raise NotImplementedError

    @abstractmethod
    def set(self, value):
        """Store *value*; called by :meth:`wait_for_value` on timeout when requested."""
        raise NotImplementedError

    @abstractmethod
    def clear(self):
        """Reset hook; must be provided by subclasses."""
        raise NotImplementedError

    async def wait_for_value(self, val, timeout=None, *, set_at_timeout=False):
        """Wait until :meth:`wait` yields an object identical (``is``) to *val*.

        Args:
            val: the exact object to wait for (identity comparison).
            timeout: passed to ``asyncio.timeout``.
            set_at_timeout: when true, call ``self.set(val)`` before
                re-raising on timeout.

        Raises:
            TimeoutError: if *val* was not observed within *timeout*.
        """
        from asyncio import sleep  # local import: keeps module-level imports untouched

        try:
            async with _timeout(timeout):
                # Await wait() directly so the observed value does not depend
                # on how __await__ forwards results.
                while val is not await self.wait():
                    # wait() can return immediately when a different value is
                    # already set; yield to the event loop so the timeout can
                    # fire and other tasks can change the value instead of
                    # spinning without ever suspending.
                    await sleep(0)
        except TimeoutError:
            if set_at_timeout:
                self.set(val)
            raise

    async def wait(self, timeout=None, **k):
        """Return the current value, or wait for the next one if none is set.

        Returns ``self.get()`` when it succeeds; if it raises ``ValueError``,
        returns the result of ``self.wait_for_next(timeout, **k)``.
        """
        try:
            return self.get()
        except ValueError:
            return await self.wait_for_next(timeout, **k)