1__lazy_modules__ = frozenset(('asyncio.events',))
2from asyncutils._internal import patch as P
3import asyncio.events as E, sys as S
[docs]
4def filter_out(*a, s=None): yield from filter(lambda x, s=s: s is not x, a)
[docs]
5def get_loop_and_set(_=(lambda l: l.stop() or l.close()).__get__, f=__import__('atexit').register):
6 if (l := E._get_running_loop()) is None: f(_(l := E.new_event_loop())); E.set_event_loop(l)
7 S.audit('asyncutils/get_loop_and_set', l); return l
[docs]
8def check_methods(obj, /, *meth):
9 M = obj.__class__.__mro__
10 for m in meth:
11 for b in M:
12 if (_ := b.__dict__.get(m, obj)) is None: return False
13 if _ is not obj: break
14 else: return False
15 return True
[docs]
16def copy_and_clear(l): r = l.copy(); l.clear(); return r
[docs]
17def ismodule(o, /, _=frozenset(('asyncutils._internal.initialize.Module', 'builtins.module'))): return fullname(type(o)) in _
[docs]
18def subscriptable(cls, /, _=classmethod(type(list[int]))): cls.__class_getitem__ = _; return cls
[docs]
19def check(a, b, /): return a is b or (False if (e := b.__eq__(a)) is NotImplemented else e)
[docs]
20def coerce_callable(o, /): return o if callable(o) else type(o)
[docs]
21def create_executor(f, /, save=True): # pragma: no cover
22 S.audit('asyncutils/create_executor', f'{(F := coerce_callable(f)).__module__.removeprefix('asyncutils.')}.{F.__qualname__}'); from asyncutils import Executor; e = Executor()
23 if save: f.executor = e
24 return e
[docs]
25def fullname(f, /, rmpref=False, _=('__module__', '__qualname__')): f = coerce_callable(f); n = '.'.join(filter_out(*(getattr(f, a, None) for a in _))); return n.removeprefix('asyncutils.') if rmpref else n
[docs]
26def audit_fullname(f, /, rmpref=False): S.audit(fullname(f, rmpref))
[docs]
27def verify_compat(v, /, _='This module is only for python %s or under'):
28 a, b = v.split('.', 1)
29 if (int(a), int(b)+1) <= S.version_info: raise ImportError(_%v)
[docs]
30async def simple_wrap(aw, /): return await aw
[docs]
31class LoopMixinBase:
32 __slots__ = '_loop',
33 @property
34 def loop(self):
35 if (l := getattr(self, '_loop', None)) is None: self._loop = l = get_loop_and_set()
36 elif l is not E._get_running_loop(): raise RuntimeError('could not bind loop')
37 return l
[docs]
38 def make(self, aw, /): return self.loop.create_task(simple_wrap(aw))
[docs]
39 def make_fut(self): return self.loop.create_future()
[docs]
40 def make_multiple(self, aws, /): yield from map(self.make, aws)
[docs]
41@subscriptable
42class Bag(dict):
43 __slots__, __setattr__, __delattr__ = (), dict.__setitem__, dict.__delitem__
[docs]
44 def __getattr__(self, k, /):
45 try: return self[k]
46 except KeyError: raise AttributeError(name=k, obj=self) from None
47P.patch_function_signatures((ismodule, 'o, /'), (subscriptable, 'cls, /'), (fullname, 'f, /, rmpref=False'), (verify_compat, 'v, /'), (get_loop_and_set, ''))
48del P