Source code for asyncutils.signals

 1import asyncutils as A, signal as B, sys as M
 2from asyncutils._internal import log, helpers as H
 3from asyncutils._internal.running_console import getc
 4from asyncutils._internal.submodules import signals_all as __all__
 5from asyncio.tasks import wait_for
async def wait_for_signal(p, /, *S, timeout=None, raise_on_timeout=False, loop=None, possible_errors=(Exception,), default_on_processor_failure=None, sigs=None, logger=log):  # noqa: PLR0912,PLR0915
    """Wait for the first of a set of signals and pass it to processor ``p``.

    The signals waited on are the union of the positional ``*S`` and either
    ``sigs`` or, when ``sigs`` is None, the current context's
    ``WAIT_FOR_SIGNAL_DEFAULT_SIGNALS``. Handlers are installed with
    ``signal.signal`` on Windows and with ``loop.add_signal_handler``
    elsewhere. Signals that cannot be registered are logged and skipped.
    When the internal future completes, done callbacks restore the handler
    that was in place before registration, where one can be restored.

    Args:
        p: Callable invoked with the received ``signal.Signals`` member. Its
            result is awaited inside ``ignore_typeerrs`` (presumably so that
            non-awaitable results are returned as-is; confirm against that
            helper).
        *S: Signals to wait for.
        timeout: Seconds to wait, passed to ``asyncio.wait_for``; None waits
            indefinitely.
        raise_on_timeout: Re-raise ``TimeoutError`` instead of logging a
            warning and returning.
        loop: Event loop to use; defaults to ``H.get_loop_and_set()``.
        possible_errors: Exception type(s) from the processor that are logged
            and turned into ``default_on_processor_failure``.
        default_on_processor_failure: Value returned when the processor raises
            one of ``possible_errors``.
        sigs: Extra iterable of signals; replaces the context defaults.
        logger: Logger used for all diagnostics.

    Returns:
        The processor's result, ``default_on_processor_failure`` on a handled
        processor error, or the result of ``logger.warning`` (None for
        standard loggers) on a suppressed timeout.

    Raises:
        RuntimeError: No handler could be registered, or the processor raised
            something outside ``possible_errors`` and ``A.CRITICAL``.
        TimeoutError: The wait timed out and ``raise_on_timeout`` is true.
        A.Critical: The processor raised one of ``A.CRITICAL``.
    """
    if loop is None:
        loop = H.get_loop_and_set()
    pending = {*S, *(A.getcontext().WAIT_FOR_SIGNAL_DEFAULT_SIGNALS if sigs is None else sigs)}
    received = loop.create_future()
    registered = 0

    def on_signal(signum, _frame=None):
        # Works both as a signal.signal handler (signum, frame) and as an
        # add_signal_handler callback (signum); only the first signal wins.
        if not received.done():
            received.set_result(B.Signals(signum))

    M.audit('asyncutils.signals.wait_for_signal', pending)
    if M.platform == 'win32':
        if getc() is None:
            # Kept in this frame so that stacklevel=2 still points at the caller.
            __import__('_warnings').warn('signals.wait_for_signal has limited functionality on Windows', RuntimeWarning, 2)
        while pending:
            sig = pending.pop()
            try:
                sig = B.Signals(sig)
                previous = B.signal(sig, on_signal)
            except ValueError:
                logger.exception('signals.wait_for_signal: invalid signal %r', sig)
            except OSError:
                logger.exception('signals.wait_for_signal: OS-level error for signal %s', sig.name)  # ty: ignore[unresolved-attribute]
            else:
                def restore(_future, sig=sig, previous=previous):
                    # signal.signal() returns None when the previous handler was
                    # not installed from Python; passing None back would raise
                    # TypeError inside the done callback, so skip the restore.
                    if previous is not None:
                        B.signal(sig, previous)

                received.add_done_callback(restore)
                registered += 1
                logger.debug('signals.wait_for_signal: registered handler for signal %s', sig.name)
    else:
        while pending:
            sig = pending.pop()
            try:
                sig = B.Signals(sig)
                previous = B.getsignal(sig)
            except ValueError:
                logger.exception('signals.wait_for_signal: invalid signal %r', sig)
                continue
            try:
                loop.add_signal_handler(sig, on_signal, sig)
            except NotImplementedError:
                # This loop cannot install signal handlers; stop trying the rest.
                break
            except PermissionError:
                logger.exception('signals.wait_for_signal: insufficient permissions for signal %s', sig.name)
            except OSError:
                logger.exception('signals.wait_for_signal: OS-level error for signal %s', sig.name)
            except RuntimeError:
                logger.exception('signals.wait_for_signal: error registering signal handler for signal %s', sig.name)
            else:
                def restore(_future, sig=sig, previous=previous):
                    # Only restore when the loop handler was actually removed, and
                    # only when the earlier handler is restorable: getsignal()
                    # returns None for handlers not installed from Python, which
                    # signal.signal() rejects with TypeError.
                    if loop.remove_signal_handler(sig) and previous is not None:
                        B.signal(sig, previous)

                received.add_done_callback(restore)
                registered += 1
                logger.debug('signals.wait_for_signal: registered handler for signal %s', sig.name)
    try:
        if not registered:
            raise RuntimeError('asyncutils.signals.wait_for_signal: failed to register signal handler')
        logger.info('signals.wait_for_signal: signal handler registered successfully for total of %d signals', registered)
        try:
            sig = await wait_for(received, timeout)
        except TimeoutError:
            if raise_on_timeout:
                raise
            return logger.warning('signals.wait_for_signal: timed out')
        logger.info('signals.wait_for_signal: signal received: %s', sig.name)
        try:
            result = p(sig)
            with A.ignore_typeerrs:
                result = await result
        except possible_errors:
            logger.exception('signals.wait_for_signal processor %r encountered error for signal %s', p, sig)
            return default_on_processor_failure
        except A.CRITICAL:
            raise A.Critical
        except BaseException as e:
            # NOTE(review): this also wraps cancellation unless A.CRITICAL covers it; confirm.
            raise RuntimeError(f'asyncutils.signals.wait_for_signal: unexpected {H.fullname(e)} in processor {p!r} for signal {sig.name}') from e
        return result
    finally:
        # Completing the future on every exit path lets the done callbacks
        # above run and undo the handler registrations.
        await A.safe_cancel(received)
# Attach a signature string that uses descriptive parameter names
# (`processor`, `signals`) in place of the terse internal ones (`p`, `S`).
# NOTE(review): the `{0}` placeholders for `possible_errors` and `logger` are
# stored literally, with no `.format(...)` applied here; confirm this is intended.
wait_for_signal.__text_signature__ = '(processor, /, *signals, sigs=None, timeout=None, raise_on_timeout=False, loop=None, possible_errors={0}, default_on_processor_failure=None, logger={0})' # ty: ignore[unresolved-attribute]