Source code for asyncutils.console

  1from asyncutils import __version__ as V, config as C, exceptions as E
  2from asyncutils._internal import patch as P, running_console as R, log
  3from asyncutils._internal.helpers import fullname
  4from asyncutils._internal.submodules import console_all as __all__
  5import abc, sys as S
  6from asyncio import iscoroutine
  7from asyncio.futures import _chain_future
  8from itertools import repeat
  9from os import getenv as g
 10try: from _pyrepl.console import InteractiveColoredConsole as B # ty: ignore[unresolved-import]
 11except ImportError: from code import InteractiveConsole as B; C.basic_repl = True
 12_s = object()  # unique sentinel: the guarded loop.stop/loop.close installed by ConsoleBase.__init__ only act when called with it
 13_f = '',  # one-element tuple (''); passed as a non-empty ``fromlist`` to ``__import__`` so a dotted name yields the submodule itself
class ConsoleBase(B, metaclass=abc.ABCMeta):
    """Abstract base for interactive consoles that execute code on an event loop.

    Code handed to :meth:`runcode` is compiled with top-level ``await`` enabled
    and executed as a callback of the event loop passed to the constructor.
    In interactive mode :meth:`run` drives that loop in the calling thread
    while the prompt (:meth:`interact`) runs in a daemon thread.

    Subclasses must implement :meth:`prehook` and are configured through the
    keyword arguments of :meth:`__init_subclass__`, which sets ``NAME`` and
    ``BANNER`` on the subclass.
    """

    # Layered mapping of module name -> locals handler; __init_subclass__ pushes one child layer per subclass.
    LOCALS_HANDLERS = __import__('collections').ChainMap()
    # Hook tuples are extended per subclass by __init_subclass__.
    # NOTE(review): nothing in this class calls interrupt_hooks (memerr_hooks are called by memoryerror()) - confirm who runs them.
    interrupt_hooks = ()
    # Default hook: clear the interpreter's internal caches when available, report the error, then collect garbage.
    # Every step returns None, so the ``or`` chain evaluates all three in order.
    memerr_hooks = (
        lambda self, f=getattr(S, '_clear_internal_caches' if S.version_info >= (3, 13) else '_clear_type_cache', None), g=__import__('gc').collect, d=log.debug: (
            (f and f())
            or self.write('MemoryError\n')
            or d('Emergency garbage collection after MemoryError: %s objects collected in total', g())
        ),
    )
    disallow_subclass_msg = 'cannot subclass %s'
    default_local_exit = _unsubclassable = False
    if C.basic_repl:
        CAN_USE_PYREPL = False # pragma: no cover
    else:
        from _pyrepl.main import CAN_USE_PYREPL # ty: ignore[unresolved-import]

    def __init__(self, loop, mod=None, modname=None, *, context_factory=__import__('_contextvars').copy_context, _f=_f, _s=_s, _m='cannot %s event loop within REPL', g=globals().get, _={'__cached__': 'cached', '__file__': 'origin', '__package__': 'parent', '__loader__': 'loader'}, _r=E.raise_exc): # noqa: B006
        """Bind the console to *loop* and build the namespace code runs in.

        :param loop: event loop on which submitted code is executed. Its
            ``stop`` and ``close`` attributes are replaced by guards that raise
            :exc:`RuntimeError` unless they are called with the private
            sentinel this module passes.
        :param mod: module object stored in the namespace; imported from
            *modname* when omitted.
        :param modname: key under which *mod* is stored; defaults to ``NAME``.
        :param context_factory: zero-argument callable producing the context
            used for loop callbacks and tasks.
        :raises TypeError: if the locals handler registered for *modname* is
            neither callable nor ``None``.

        The underscore-prefixed parameters and ``g`` only capture module-level
        helpers as default values; they are left out of the signature
        installed by ``P.patch_method_signatures`` at the end of the class.
        """
        S.audit(fullname(type(self)), loop)
        if modname is None:
            modname = self.NAME
        if mod is None:
            # A non-empty fromlist makes __import__ return the dotted submodule instead of the top-level package.
            mod = __import__(modname, fromlist=_f if '.' in modname else ())

        # The original bound methods are captured as defaults before the attributes are overwritten below.
        def stop(p=None, /, _=loop.stop, *, asap=False):
            if p is not _s:
                raise RuntimeError(_m%'stop')
            if asap:
                _()
            else:
                loop.call_soon_threadsafe(_)

        def close(p=None, /, _=loop.close):
            if p is not _s:
                raise RuntimeError(_m%'close')
            _()

        # Create the context before touching the loop so a failing factory leaves the loop unpatched.
        context = context_factory()
        loop.stop, loop.close = stop, close
        self._internal_is_running = False
        self.memory_errors = 0
        self._loop = loop
        self.context = context
        self.exc = None
        self._fut = None
        d = {modname: mod}
        H = S.hexversion
        # ``local_exit`` is only passed on 3.13+, where code.InteractiveConsole accepts it.
        super().__init__(d, '<stdin>', **({'local_exit': self.default_local_exit} if H > 0x30d00a0 else {}))
        self.compile.compiler.flags |= 0x2000 # PyCF_ALLOW_TOP_LEVEL_AWAIT
        d.update(__name__='__main__', __doc__='A console with top-level await support, much like the asyncio REPL, and some preloaded names.', __spec__=__spec__, __annotations__={}, __builtins__=__builtins__)
        if H > 0x30e00a0: d['__annotate__'] = g('__annotate__') # cover: off
        if H < 0x30f00a1:
            # Before 3.15.0a1: copy the import-related globals of this module as they are.
            for k in _: d[k] = g(k)
        elif H < 0x30f00f0:
            # 3.15 pre-releases: take the values from the matching ModuleSpec attributes instead.
            for k, v in _.items(): d[k] = getattr(__spec__, v) # cover: on
        h = self.LOCALS_HANDLERS.get(modname)
        if callable(h):
            h(d)
        elif h is not None:
            raise TypeError(f'asyncutils.console.ConsoleBase: locals handler for module {modname!r} should be callable, not {fullname(h)!r}')

    def refresh(self):
        """Cancel the most recently created task if it has not finished yet."""
        F = self._fut
        if F is not None and not F.done():
            F.cancel()

    def __callback(self, fut, code, /, *, makef=type(refresh), corocheck=iscoroutine, futchain=_chain_future):
        """Execute *code* as a loop callback and report the outcome through *fut*.

        *code* is wrapped in a function whose globals are the console locals
        and called. A plain result resolves *fut* directly; a coroutine is
        turned into a task on the loop whose outcome is chained into *fut*.
        """
        try:
            c = makef(code, self.locals)()
        except SystemExit as e:
            # fut is left pending here; set_return_code() stops the loop instead.
            return self.set_return_code(e)
        except BaseException as e:
            if isinstance(e, KeyboardInterrupt): self.interrupt()
            elif isinstance(e, MemoryError): self.memoryerror()
            return fut.set_exception(e)
        if not corocheck(c):
            return fut.set_result(c)
        try:
            task = self._loop.create_task(c, context=self.context)
            futchain(task, fut)
            self._fut = task
        except BaseException as e:
            fut.set_exception(e)

    def showtraceback(self):
        """Display the exception currently being handled, if it has a traceback."""
        t = S.exc_info()
        if t[2] is not None:
            self._showtraceback(*t, '')

    def runcode(self, code, *, futimpl=__import__('concurrent.futures._base', fromlist=_f).Future, dont_show_traceback=(KeyboardInterrupt, MemoryError, SyntaxError), threadsafe=True):
        """Schedule *code* on the loop and block until its result is available.

        :param code: code object to execute.
        :param futimpl: factory of the blocking future used to wait for the result.
        :param dont_show_traceback: exception types for which no traceback is displayed.
        :param threadsafe: schedule with ``call_soon_threadsafe`` (default) or ``call_soon``.
        :return: the result of the code on success, otherwise the console's
            ``STATEMENT_FAILED`` marker (``None`` when the base console has none).
        """
        schedule = getattr(self._loop, 'call_soon_threadsafe' if threadsafe else 'call_soon')
        schedule(self.__callback, F := futimpl(), code, context=self.context)
        try:
            return F.result()
        except SystemExit as e:
            self.set_return_code(e)
        except BaseException as e:
            if not isinstance(e, dont_show_traceback):
                self.showtraceback()
        return getattr(self, 'STATEMENT_FAILED', None)

    def interact(self, banner=None, *, ps1='>>> ', _f=_f, _s=_s, _q=C.silent, _o=type('', (), {'write': lambda *_: None, 'flush': lambda _, /: None})(), p=g('PYTHONSTARTUP')): # noqa: B008
        """Run the prompt until the user leaves it, then stop the event loop.

        :param banner: text written before the first prompt; ``BANNER`` when ``None``.
        :param ps1: primary prompt used when ``sys.ps1`` is not set.

        The file named by ``PYTHONSTARTUP`` (read when this method was
        defined) is executed in the console namespace first unless the
        interpreter ignores environment variables. ``sys.ps1`` is put back to
        its previous value, or set to *ps1* if it was unset, on exit.
        """
        x = False
        self.write_special(self.BANNER if banner is None else banner)
        if p and not S.flags.ignore_environment: # pragma: no cover
            with __import__('tokenize').open(p) as f:
                # In silent mode the startup script's output goes to a null writer.
                if _q: S.stdout, _o = _o, S.stdout
                try:
                    S.audit('cpython.run_startup', p)
                    exec(compile(f.read(), p, 'exec'), self.locals) # noqa: S102
                finally:
                    # Put the real stdout back even if the startup script raised.
                    if _q: S.stdout = _o
        p = getattr(S, 'ps1', None)
        if p is None:
            p, x = ps1, True
        c = self.CAN_USE_PYREPL
        if c:
            t = __import__('_colorize').get_theme().syntax
            r = t.reset
            hook_args = (f'{t.prompt}{p}{r}'.lstrip(), t.keyword, r, t.builtin)
        else:
            hook_args = (p, '', '', '')
        self._interact_hook(*hook_args)
        try:
            if c:
                __import__('_pyrepl.simple_interact', fromlist=_f).run_multiline_interactive_console(self)
            else:
                super().interact('', '')
        finally:
            self._loop.stop(_s)
            S.ps1 = ps1 if x else getattr(S, 'ps1', ps1) if p is None else p

    def _interact_hook(self, ps1, kcolor, reset, fcolor): # noqa: ARG002
        """Set ``sys.ps1`` to *ps1* and echo the import statements for ``NAME`` as if they had been typed."""
        n, S.ps1 = self.NAME, ps1
        self.write_special(f'{ps1}{kcolor}import{reset} {n}\n{ps1}{kcolor}from{reset} {n} {kcolor}import{reset} *\n')

    @abc.abstractmethod
    def prehook(self, max_memerrs):
        """Prepare for :meth:`run`: store the MemoryError threshold (3 when ``None``) and mark the console running."""
        self._max_memerrs, self._internal_is_running = 3 if max_memerrs is None else max_memerrs, True

    def posthook(self):
        """Mark the console as no longer running; called by :meth:`run` before it returns."""
        self._internal_is_running = False

    def write_special(self, msg):
        """Write banner/echo text *msg*; the base implementation forwards to ``write``."""
        self.write(msg)

    def interrupt(self, _=_f, m='\nKeyboardInterrupt\n'):
        """React to a KeyboardInterrupt: notify the front end, then cancel the current task."""
        if not self.CAN_USE_PYREPL:
            self.write(m)
        else:
            f = getattr(__import__('_pyrepl.readline', fromlist=_)._get_reader().threading_hook, 'add', None)
            if callable(f):
                f('')
        self.refresh()

    def memoryerror(self):
        """Count a MemoryError and run the memory-error hooks.

        When the count has already reached the threshold stored by
        :meth:`prehook`, an exit is requested through :meth:`set_return_code`
        instead.
        """
        m = self.memory_errors
        if m == self._max_memerrs:
            return self.set_return_code(f'ERROR: Exceeded MemoryError threshold: {m}\n')
        self.memory_errors = m+1
        for hook in self.memerr_hooks:
            hook(self)
        self.refresh()

    def set_return_code(self, e, /, _=_s):
        """Record an exit request in ``self.exc`` and stop the event loop.

        :param e: a :exc:`SystemExit` (stored as is), any other exception
            instance (its ``args`` are wrapped in a new :exc:`SystemExit`), or
            an arbitrary value used as the exit code/message.
        """
        if isinstance(e, SystemExit):
            self.exc = e
        elif isinstance(e, BaseException):
            self.exc = SystemExit(*e.args)
        else:
            self.exc = SystemExit(e)
        self._loop.stop(_)

    def __init_subclass__(cls, *, name=None, native_handler=None, default_local_exit=True, disallow_subclass_msg=None, other_handlers=None, additional_interrupt_hooks=(), additional_memerr_hooks=(), template=f'%(name)s REPL (version %(version)s) running on {S.platform}\nType "help", "copyright", "credits" or "license" for more information, "clear" to clear the terminal, and "exit" or "quit" to exit.\n%(description)s\n', **k):
        """Configure a console subclass from its class keyword arguments.

        :param name: value of ``NAME``; derived from the class name
            (case-folded, without a trailing ``console``) when omitted.
        :param native_handler: locals handler registered under *name*.
        :param default_local_exit: value stored as ``default_local_exit``.
        :param disallow_subclass_msg: when not ``None`` the new class cannot
            be subclassed further; a non-empty value replaces the error
            message template (``%s`` receives the sealed class name).
        :param other_handlers: mapping of extra locals handlers; it becomes
            the new front layer of ``LOCALS_HANDLERS``.
        :param additional_interrupt_hooks: appended to ``interrupt_hooks``.
        :param additional_memerr_hooks: appended to ``memerr_hooks``.
        :param template: ``%``-format template of ``BANNER``.
        :param k: extra values available to *template*; ``version`` and
            ``description`` default to ``'unknown'`` and ``'Enjoy!'``.
        :raises TypeError: if an ancestor of the new class is sealed.
        """
        if cls._unsubclassable:
            # Report the sealed ancestor rather than the class that is being created.
            sealed = next((b for b in cls.__mro__[1:] if vars(b).get('_unsubclassable')), cls)
            raise TypeError(cls.disallow_subclass_msg%fullname(sealed))
        if name is None:
            name = cls.__qualname__.casefold().removesuffix('console')
        if other_handlers is None:
            other_handlers = {}
        k['name'] = cls.NAME = name
        k.setdefault('version', 'unknown')
        k.setdefault('description', 'Enjoy!')
        # Compute every new value before assigning so a failure leaves the class attributes untouched.
        banner = template%k
        handlers = cls.LOCALS_HANDLERS.new_child(other_handlers)
        interrupt_hooks = (*cls.interrupt_hooks, *additional_interrupt_hooks)
        memerr_hooks = (*cls.memerr_hooks, *additional_memerr_hooks)
        cls.BANNER = banner
        cls.LOCALS_HANDLERS = handlers
        cls.interrupt_hooks = interrupt_hooks
        cls.memerr_hooks = memerr_hooks
        cls.default_local_exit = default_local_exit
        cls._unsubclassable = disallow_subclass_msg is not None
        # other_handlers is the front map of the new ChainMap layer, so this registers the handler there.
        other_handlers[name] = native_handler
        if disallow_subclass_msg:
            cls.disallow_subclass_msg = disallow_subclass_msg

    def __repr__(self):
        return f'{fullname(self)}({self._loop!r}, local_exit={self.local_exit})'

    @property
    def is_running(self):
        """Whether the console is between :meth:`prehook` and :meth:`posthook`."""
        return self._internal_is_running

    def run(self, *, exitmsg='Thank you for using %s!\nExiting REPL...\n', threadname='<%s REPL thread>', max_memerrs=None, always_run_interactive=bool(S.flags.inspect), always_install_completer=False, suppress_asyncio_warnings=False, suppress_unawaited_coroutine_warnings=False, _=frozenset(('win32', 'cygwin', 'android', 'ios', 'wasi'))):
        """Run the console to completion and return its exit code.

        :param exitmsg: ``%``-template (receives ``NAME``) written at the end.
        :param threadname: ``%``-template (receives ``NAME``) naming the prompt thread.
        :param max_memerrs: forwarded to :meth:`prehook`.
        :param always_run_interactive: start the prompt even if stdin is not a
            terminal; defaults to the interpreter's ``-i`` flag.
        :param always_install_completer: install the readline completer after
            a callable ``sys.__interactivehook__`` ran, even if that hook is
            not ``site.register_readline``.
        :param suppress_asyncio_warnings: call ``P.patch_aio_logs()`` after the post-hook.
        :param suppress_unawaited_coroutine_warnings: call
            ``P.patch_unawaited_coroutine_warnings()`` after the post-hook.
        :return: :attr:`retcode`.
        """
        self.prehook(max_memerrs)
        S.audit(f'{fullname(self)}.run', id(self))
        loop, w, n = self._loop, S.stderr.write, self.NAME
        if always_run_interactive or S.stdin.isatty():
            S.audit('cpython.run_stdin')
            __import__('threading').Thread(name=threadname%n, target=self.interact, daemon=True).start()
            if callable(h := getattr(S, i := '__interactivehook__', None)): # pragma: no cover
                S.audit('cpython.run_interactivehook', h)
                try:
                    h()
                except BaseException:
                    # Best effort: a failing hook is reported and must not prevent the console from starting.
                    w(f'Error running {self!r}!\nFailed calling sys.__interactivehook__\n')
                    __import__('traceback').print_exc()
                if always_install_completer or (S.platform not in _ and h.__module__ == 'site' and h.__name__ == h.__qualname__ == 'register_readline'):
                    # Point tab completion at the console namespace.
                    try:
                        __import__('readline').set_completer(__import__('rlcompleter').Completer(self.locals).complete) # ty: ignore[possibly-missing-attribute]
                    except ImportError:
                        w('Failed to install readline completer\n')
            elif h is not None:
                w('Removing sys.__interactivehook__ since it is not callable\n')
                delattr(S, i)
            # Keep the loop alive across interrupts and memory errors; leave once it is stopped.
            while True:
                try:
                    loop.run_forever()
                    break
                except KeyboardInterrupt:
                    self.interrupt()
                except MemoryError:
                    self.memoryerror()
        else:
            # NOTE(review): this branch never starts the loop itself while runcode() waits on the scheduled callback - confirm what drives the loop here.
            self.write_special(self.BANNER)
            stdin = S.stdin
            self.runcode(compile(stdin.read(), getattr(stdin, 'name', '<stdin>'), 'exec'))
        try:
            self.posthook()
        except SystemExit:
            raise
        except BaseException as e:
            w(f'{fullname(e)} occurred in posthook of {self!r}: {e}\n')
        finally:
            if suppress_asyncio_warnings: P.patch_aio_logs()
            if suppress_unawaited_coroutine_warnings: P.patch_unawaited_coroutine_warnings()
            self.write_special(exitmsg%n)
        return self.retcode

    @property
    def retcode(self):
        """Exit code: ``0`` if no exit was recorded, otherwise the ``code`` of the stored :exc:`SystemExit`."""
        e = self.exc
        return 0 if e is None else e.code

    P.patch_method_signatures((run, '*, exitmsg=None, threadname=None, max_memerrs=None, always_run_interactive=None, always_install_completer=False, suppress_asyncio_warnings=False, suppress_unawaited_coroutine_warnings=False'), (interrupt, ''), (set_return_code, 'e, /'), (__init__, 'loop, mod=None, modname=None, *, context_factory={}'), (__callback, 'fut, code, /, *, makef={0}, corocheck={0}, futchain={0}'), (interact, "banner=None, *, ps1='>>> '")); P.patch_classmethod_signatures((__init_subclass__, '*, name=None, native_handler=None, default_local_exit=True, disallow_subclass_msg=None, other_handlers=None, additional_interrupt_hooks=(), additional_memerr_hooks=(), template={}, version=None, description=None, **k'))
120def _(d, /): 121 def load_all(_=d): 122 for k, v in _.items(): _[k] = v if (g := getattr(v, 'load', None)) is None else g() 123 load_all.__qualname__, load_all.__module__, load_all.__text_signature__ = load_all.__name__, 'asyncutils', '()'; return load_all # ty: ignore[unresolved-attribute]
class AsyncUtilsConsole(
    ConsoleBase,
    version=V,
    description='asyncutils is a multi-purpose and efficient asynchronous utilities library.\nYou can use await statements directly instead of asyncio.run for quick testing.\nAll the submodules of asyncutils are also loaded into the namespace.',
    # Locals handler: merge the mapping ``s`` of asyncutils._internal.initialize into the namespace, then add
    # ``__version__`` and ``load_all``. dict.update returns None, so the ``or`` always evaluates the second call.
    native_handler=lambda d, /, v=V, _=_f, r=_: (u := d.update)(m := __import__('asyncutils._internal.initialize', fromlist=_).s) or u(__version__=v, load_all=r(m)),
    default_local_exit=True,
    disallow_subclass_msg='cannot subclass %s; subclass asyncutils.console.ConsoleBase instead',
):
    """The asyncutils REPL: a :class:`ConsoleBase` that cannot be subclassed further.

    The console registers itself through the ``R`` helpers while running;
    :meth:`prehook` refuses to start while a console is registered.
    """

    def __repr__(self):
        return f'<{"running" if self.is_running else "idle"} asyncutils console at {id(self):#x}>'

    @property
    def is_running(self):
        """Whether this console is running, cross-checked against the loop and the registered console.

        If the internal flag disagrees with the registration, a warning is
        written to stderr and ``False`` is returned; the flag is corrected when
        this console is the registered one, otherwise an exit with code 1 is
        requested.
        """
        if not self._loop.is_running():
            self._internal_is_running = False
            return False
        b = R.getc() is self
        if self._internal_is_running == b:
            return b
        if b:
            self._internal_is_running = True
        else:
            self.set_return_code(1)
        S.stderr.write('User tampered with console-internal state!\n')
        return False

    def _interact_hook(self, ps1, kcolor, reset, fcolor):
        """Echo the import lines and, when ``R.should_write_load_all()`` says so, a ``load_all()`` call."""
        super()._interact_hook(ps1, kcolor, reset, fcolor)
        if R.should_write_load_all():
            self.write_special(f'{ps1}{fcolor}load_all{reset}()\n')

    def write_special(self, msg, _=C.silent):
        """Write *msg* unless the ``silent`` configuration value (read at definition time) is set."""
        if not _:
            self.write(msg)

    def prehook(self, max_memerrs, _=C.max_memerrs, _r='this console is already running', _a='another console is running'):
        """Register this console as the running one.

        :param max_memerrs: MemoryError threshold; the configured
            ``max_memerrs`` (read at definition time) is used when ``None``.
        :raises RuntimeError: if this or another console is already running.
        """
        if self._internal_is_running:
            raise RuntimeError(_r)
        r = R.getc()
        if r:
            raise RuntimeError(_r if r is self else _a)
        R.setc(self)
        super().prehook(_ if max_memerrs is None else max_memerrs) # ty: ignore[invalid-argument-type]

    def posthook(self, _m='WARNING: user tampered with asyncutils module state\n', _=C.pdb, _e=E.StateCorrupted('console-internal', "attribute 'exc' of console was set to a non-SystemExit exception")):
        """Unregister the console and, when the ``pdb`` configuration value is set, debug the recorded exit.

        :raises StateCorrupted: if ``self.exc`` holds a non-:exc:`SystemExit` exception.
        :raises SystemExit: the recorded exit, if it carries no traceback.
        """
        if R.unsetc() is not self:
            # Another object was registered: warn and drop this module from sys.modules.
            S.stderr.write(_m)
            del S.modules[__name__]
        if _ and isinstance(e := self.exc, BaseException):
            if not isinstance(e, SystemExit): raise _e
            if (t := e.__traceback__) is None: raise e
            __import__('pdb').post_mortem(t)
        super().posthook()

    def showtraceback(self, s=3, a=('asyncutils\\console.py', 'asyncutils/console.py'), f=ConsoleBase._ConsoleBase__callback.__code__.co_firstlineno, m=S.intern('__callback')):
        """Display the current exception without the console's own leading frames.

        :param s: number of leading traceback entries that are always dropped.
        :param a: filename suffixes identifying this module.
        :param f: first source line of ``ConsoleBase.__callback``; read from
            the method's code object so it follows edits to the file.
        :param m: name of the callback function.

        After dropping *s* entries, one more is dropped if it belongs to the
        callback described by *a*, *f* and *m*. Nothing is shown if no entry
        remains.
        """
        t, v, b = S.exc_info()
        if b is None:
            return
        for _ in repeat(None, s):
            b = b.tb_next
            if b is None:
                return
        c = b.tb_frame.f_code
        if c.co_filename.endswith(a) and c.co_firstlineno == f and c.co_name == m:
            b = b.tb_next
            if b is None:
                return
        self._showtraceback(t, v, b, '')

    P.patch_method_signatures((showtraceback, ''), (posthook, ''), (prehook, 'max_memerrs'), (write_special, 'msg'))
157del _f, _s, g, C, V, B, _, iscoroutine, E, log  # drop import-time helpers from the module namespace; the definitions above captured the ones they need as default argument values