Source code for asyncutils.mixins
1__lazy_modules__ = frozenset(('asyncio',))
2from asyncutils import safe_cancel_batch
3from asyncutils._internal.submodules import mixins_all as __all__
4import functools as F, asyncutils._internal.helpers as H
5from abc import ABCMeta, abstractmethod
6from asyncio import iscoroutine, timeout as _timeout
[docs]
class LoopContextMixin(H.LoopMixinBase):
    """Async context manager mixin that tracks the tasks it creates.

    Tasks produced by :meth:`make` are kept in ``running_tasks`` until they
    finish. On exit from the ``async with`` block, ``__cleanup__`` runs and
    whatever is still in ``running_tasks`` is passed to ``safe_cancel_batch``.
    """

    __slots__ = 'running_tasks',

    def __init__(self):
        # NOTE(review): `_loop` is not listed in this class's __slots__;
        # presumably the base class provides storage for it -- confirm.
        self._loop, self.running_tasks = H.get_loop_and_set(), set()

    @property
    def loop(self):
        """The loop obtained from ``H.get_loop_and_set()`` at construction."""
        return self._loop

    def make(self, aw, /):
        """Delegate to the base ``make`` and track the result until it is done.

        Returns whatever the base ``make`` returned.
        """
        tracked = self.running_tasks
        task = super().make(aw)
        tracked.add(task)
        # Drop the task from the tracking set as soon as it completes.
        task.add_done_callback(tracked.discard)
        return task

    async def __setup__(self):
        """Hook awaited on context entry; does nothing by default."""
        ...

    async def __cleanup__(self):
        """Hook awaited on context exit; does nothing by default."""
        ...

    async def __aenter__(self):
        """Await ``__setup__`` and return the instance."""
        await self.__setup__()
        return self

    async def __aexit__(self, *_):
        """Await ``__cleanup__``, then cancel the tasks still being tracked.

        The cancellation sits in a ``finally`` clause so that an exception
        raised by ``__cleanup__`` still propagates to the caller but no longer
        leaves the tracked tasks running.
        """
        try:
            await self.__cleanup__()
        finally:
            await safe_cancel_batch(self.running_tasks)
[docs]
17@H.subscriptable
class AwaitableMixin(metaclass=ABCMeta):
    """Mixin that makes instances awaitable by delegating to :meth:`wait`."""

    __slots__ = ()

    def __await__(self):
        """Drive ``self.wait()`` and hand its result to the awaiting caller.

        The value of the ``yield from`` expression is returned explicitly;
        with a bare ``yield from`` statement the generator would finish with
        ``None`` and ``await obj`` would lose whatever ``wait()`` returned.
        """
        return (yield from self.wait().__await__())

    @abstractmethod
    async def wait(self):
        """Coroutine awaited by ``await obj``; subclasses must override."""
        raise NotImplementedError
[docs]
23@H.subscriptable
class AsyncContextMixin(metaclass=ABCMeta):
    """Expose a synchronous context manager through the async protocol too.

    ``__aenter__`` and ``__aexit__`` call the synchronous ``__enter__`` and
    ``__exit__`` directly, so a subclass only has to provide ``__exit__``
    (and may override ``__enter__``).
    """

    __slots__ = ()

    def __enter__(self):
        """Enter the context; the default returns the instance itself."""
        return self

    @abstractmethod
    def __exit__(self, /, *exc_info):
        """Leave the context; subclasses must override."""
        raise NotImplementedError

    async def __aenter__(self):
        """Async entry: return the result of the synchronous ``__enter__``."""
        entered = self.__enter__()
        return entered

    async def __aexit__(self, /, *exc_info):
        """Async exit: forward the arguments to ``__exit__`` and return its result."""
        outcome = self.__exit__(*exc_info)
        return outcome
[docs]
31@H.subscriptable
class ExecutorRequiredAsyncContextMixin(metaclass=ABCMeta):
    """Async context manager that runs synchronous enter/exit via an executor.

    ``__aenter__`` and ``__aexit__`` submit the synchronous ``__enter__`` and
    ``__exit__`` through :attr:`runner` and await the result.
    """

    @F.cached_property
    def runner(self):
        """Callable that submits work to this object's executor on its loop.

        The loop is taken from ``self.loop`` when that is not ``None``,
        otherwise from ``self._loop``; if neither provides one, a loop is
        obtained from ``H.get_loop_and_set()`` and stored on ``self.loop``.
        The executor comes from ``H.create_executor(self, False)``. The value
        is computed once per instance (``cached_property``).
        """
        loop = getattr(self, 'loop', None)
        if loop is None:
            loop = getattr(self, '_loop', None)
            if loop is None:
                loop = H.get_loop_and_set()
                self.loop = loop
        submit = loop.run_in_executor  # ty: ignore[unresolved-attribute]
        return F.partial(submit, H.create_executor(self, False))

    def __enter__(self):
        """Enter the context; the default returns the instance itself."""
        return self

    @abstractmethod
    def __exit__(self, /, *exc_info):
        """Leave the context; subclasses must override."""
        raise NotImplementedError

    async def __aenter__(self):
        """Run ``__enter__`` through :attr:`runner` and return its result."""
        entered = await self.runner(self.__enter__)
        return entered

    async def __aexit__(self, /, *exc_info):
        """Run ``__exit__`` with the given arguments through :attr:`runner`."""
        outcome = await self.runner(self.__exit__, *exc_info)
        return outcome
[docs]
42@H.subscriptable
class LockMixin(metaclass=ABCMeta):
    """Async-context-manager plumbing shared by lock-like classes.

    Subclasses implement ``acquire``, ``release`` and ``locked``; this mixin
    supplies ``async with`` support on top of them. The value bound by
    ``async with ... as x`` comes from the subclass's lock factory.
    """

    __slots__ = ()

    def __init_subclass__(cls, *, _lock_factory_=lambda _: None, **extra):
        """Record the subclass's lock factory and pass other keywords upward.

        The factory is stored on the class and later called as a method, so it
        receives the lock instance as its only argument.

        NOTE: the assignment happens for every subclass, so a subclass that
        omits ``_lock_factory_`` gets the default (which returns ``None``)
        even when one of its parents supplied a factory of its own.
        """
        cls._lock_factory = _lock_factory_
        super().__init_subclass__(**extra)

    @abstractmethod
    async def acquire(self):
        """Acquire the lock; ``__aenter__`` expects a truthy result on success."""
        raise NotImplementedError

    @abstractmethod
    def release(self):
        """Release the lock; may return a coroutine, which ``__aexit__`` awaits."""
        raise NotImplementedError

    @abstractmethod
    def locked(self):
        """Subclasses must override."""
        raise NotImplementedError

    async def __aenter__(self):
        """Acquire the lock and return the lock factory's result.

        Raises:
            RuntimeError: if ``acquire()`` returns a falsy value.
        """
        acquired = await self.acquire()
        if not acquired:
            raise RuntimeError('asyncutils.mixins.LockMixin: failed to acquire lock')
        return self._lock_factory()

    async def __aexit__(self, *exc_info):
        """Release the lock, awaiting ``release()``'s result if it is a coroutine."""
        pending = self.release()
        if iscoroutine(pending):
            await pending

    def acknowledge_locksmith_lock_held(self, _, /):  # noqa: PLR6301
        """Always return ``True``; the positional argument is ignored."""
        return True
[docs]
58@H.subscriptable
class LockWithOwnerMixin(LockMixin):
    """Lock mixin whose ``release`` first checks the ``is_owner`` property."""

    __slots__ = ()

    @property
    @abstractmethod
    def is_owner(self):
        """Truthy when ``release`` is permitted; subclasses must override."""
        raise NotImplementedError

    @abstractmethod
    def _release(self):
        """Perform the actual release; subclasses must override."""
        raise NotImplementedError

    def release(self):
        """Return ``_release()``'s result after verifying ownership.

        Raises:
            RuntimeError: if ``is_owner`` is falsy.
        """
        if self.is_owner:
            return self._release()
        raise RuntimeError(f'{H.fullname(self)} is not acquired by current task')
[docs]
69@H.subscriptable
class EventMixin(AwaitableMixin, H.LoopMixinBase, metaclass=ABCMeta):
    """Interface and shared waiting helpers for value-carrying events.

    Subclasses implement ``wait_for_next``, ``is_set``, ``get``, ``set`` and
    ``clear``; this mixin builds ``wait`` and ``wait_for_value`` on top.
    """

    __slots__ = ()

    @abstractmethod
    async def wait_for_next(self, timeout=None, **_):
        """Subclasses must override; ``wait`` returns this coroutine's result
        when no value is available from ``get()``."""
        raise NotImplementedError

    @abstractmethod
    def is_set(self):
        """Subclasses must override."""
        raise NotImplementedError

    @abstractmethod
    def get(self):
        """Return the current value; ``wait`` treats ``ValueError`` raised
        here as "no value available yet". Subclasses must override."""
        raise NotImplementedError

    @abstractmethod
    def set(self, value):
        """Subclasses must override."""
        raise NotImplementedError

    @abstractmethod
    def clear(self):
        """Subclasses must override."""
        raise NotImplementedError

    async def wait_for_value(self, val, timeout=None, *, set_at_timeout=False):
        """Wait until the event's value is ``val`` (compared by identity).

        Args:
            val: the object to wait for.
            timeout: passed to ``asyncio.timeout``; ``None`` disables it.
            set_at_timeout: when true, call ``self.set(val)`` before
                re-raising on timeout.

        Raises:
            TimeoutError: re-raised when it occurs while waiting.
        """
        try:
            async with _timeout(timeout):
                # Check the value that is available now (or the first one to
                # arrive), then block on wait_for_next() for later ones.
                # Re-awaiting wait() in a loop would return synchronously from
                # get() once any other value is set, spinning without ever
                # yielding to the event loop, so the timeout could never fire.
                # NOTE(review): assumes wait_for_next() suspends until a later
                # set() and returns the new value, as wait() already relies
                # on -- confirm against the concrete implementations.
                current = await self.wait()
                while current is not val:
                    current = await self.wait_for_next()
        except TimeoutError:
            if set_at_timeout:
                self.set(val)
            raise

    async def wait(self, timeout=None, **k):
        """Return ``get()``'s value, or wait for the next one if ``get()`` raises ``ValueError``.

        ``timeout`` and any extra keywords are forwarded to ``wait_for_next``.
        """
        try:
            return self.get()
        except ValueError:
            return await self.wait_for_next(timeout, **k)
91 except ValueError: return await self.wait_for_next(timeout, **k)