1import asyncutils as A, signal as B, sys as M
2from asyncutils._internal import log, helpers as H
3from asyncutils._internal.running_console import getc
4from asyncutils._internal.submodules import signals_all as __all__
5from asyncio.tasks import wait_for
[docs]
6async def wait_for_signal(p, /, *S, timeout=None, raise_on_timeout=False, loop=None, possible_errors=(Exception,), default_on_processor_failure=None, sigs=None, logger=log): # noqa: PLR0912,PLR0915
7 if loop is None: loop = H.get_loop_and_set()
8 P, x, a, h = (S := {*S, *(A.getcontext().WAIT_FOR_SIGNAL_DEFAULT_SIGNALS if sigs is None else sigs)}).pop, 0, (F := loop.create_future()).add_done_callback, lambda s, _=None, F=F: F.done() or F.set_result(B.Signals(s)); M.audit('asyncutils.signals.wait_for_signal', S)
9 if M.platform == 'win32':
10 if getc() is None: __import__('_warnings').warn('signals.wait_for_signal has limited functionality on Windows', RuntimeWarning, 2)
11 while S:
12 s = P()
13 try: o = B.signal(s := B.Signals(s), h)
14 except ValueError: logger.exception('signals.wait_for_signal: invalid signal %r', s)
15 except OSError: logger.exception('signals.wait_for_signal: OS-level error for signal %s', s.name) # ty: ignore[unresolved-attribute]
16 else: a(lambda _, s=s, o=o: B.signal(s, o)); x += 1; logger.debug('signals.wait_for_signal: registered handler for signal %s', s.name)
17 else:
18 while S:
19 s = P()
20 try: o = B.getsignal(s := B.Signals(s))
21 except ValueError: logger.exception('signals.wait_for_signal: invalid signal %r', s); continue
22 try: loop.add_signal_handler(s, h, s)
23 except NotImplementedError: break
24 except PermissionError: logger.exception('signals.wait_for_signal: insufficient permissions for signal %s', s.name)
25 except OSError: logger.exception('signals.wait_for_signal: OS-level error for signal %s', s.name)
26 except RuntimeError: logger.exception('signals.wait_for_signal: error registering signal handler for signal %s', s.name)
27 else: a(lambda _, s=s, o=o: loop.remove_signal_handler(s) and B.signal(s, o)); x += 1; logger.debug('signals.wait_for_signal: registered handler for signal %s', s.name)
28 try:
29 if x: logger.info('signals.wait_for_signal: signal handler registered successfully for total of %d signals', x); del x
30 else: raise RuntimeError('asyncutils.signals.wait_for_signal: failed to register signal handler')
31 try: s = await wait_for(F, timeout)
32 except TimeoutError:
33 if raise_on_timeout: raise
34 return logger.warning('signals.wait_for_signal: timed out')
35 logger.info('signals.wait_for_signal: signal received: %s', s.name)
36 try:
37 r = p(s)
38 with A.ignore_typeerrs: r = await r
39 except possible_errors: logger.exception('signals.wait_for_signal processor %r encountered error for signal %s', p, s); return default_on_processor_failure
40 except A.CRITICAL: raise A.Critical
41 except BaseException as e: raise RuntimeError(f'asyncutils.signals.wait_for_signal: unexpected {H.fullname(e)} in processor {p!r} for signal {s.name}') from e
42 return r
43 finally: await A.safe_cancel(F)
44wait_for_signal.__text_signature__ = '(processor, /, *signals, sigs=None, timeout=None, raise_on_timeout=False, loop=None, possible_errors={0}, default_on_processor_failure=None, logger={0})' # ty: ignore[unresolved-attribute]