Source code for asyncutils.config

  1from asyncutils._internal import log as l, patch as P, initialize as I
  2from asyncutils._internal.submodules import config_all as __all__
  3from asyncutils._internal.unparsed import N, c
  4import logging as L, sys as S
  5def __dir__(_=__all__): return _
  6class FaultyConfig(BaseException):
  7    def __init__(self, k, w, c, /): self.key, self.wrong, self.correct = k, w, c; super().__init__(f'asyncutils: configuration for key {k!r} from AUTILSCFGPATH is faulty: expected {", ".join(c) if isinstance(c, tuple) else c.__name__}, got {w.__name__}')
  8def f(e, _=__import__('_functools').partial(__import__, fromlist=('',)), f=frozenset(('thread', 'process', 'interpreter')), c='.', S=S): # noqa: B008,PLR0911,PLR0912 # pragma: no cover
  9    if not isinstance(e, str): raise TypeError('asyncutils: executor name should be a string')
 10    d, c, w = e.rpartition(c)
 11    if c:
 12        try: return getattr(_(d), w)
 13        except ImportError: ...
 14    else:
 15        d = 'loky'
 16        if e in f: return getattr(_('concurrent.futures.'+e), e.title()+'PoolExecutor')
 17        if e == 'dask':
 18            try: return _('distributed.client').Client
 19            except ImportError: d = 'dask.distributed'
 20        elif e == 'deadpool':
 21            try: return __import__(e).DeadPool
 22            except ImportError: ...
 23        elif e == 'loky_noreuse':
 24            try: return _('loky.process_executor').ProcessPoolExecutor
 25            except ImportError: ...
 26        elif e == d:
 27            try: (r := _('loky.reusable_executor')._ReusablePoolExecutor).__new__ = lambda cls, *a, **k: cls.get_reusable_executor(*a, **k)[0]; return r
 28            except ImportError: ...
 29        elif e == 'ipython': return _('ipyparallel.client.view').ViewExecutor
 30        else:
 31            d, *a = e.split('_')
 32            if d == 'elib':
 33                if S.version_info >= (3, 14): __import__('_warnings').warn('executorlib is not yet compatible with Python 3.14 and above', ImportWarning, 2)
 34                try: a, e = a; return getattr(_('executorlib.executor.'+a), f'{a.title()}{e.title()}Executor')
 35                except ImportError: d = 'executorlib'
 36            elif d == 'pebble':
 37                try: a = a[0]; return getattr(_('pebble.pool.'+a), a.title()+'Pool')
 38                except ImportError: ...
 39            else: raise FaultyConfig('executor', e, _('asyncutils.constants').POSSIBLE_EXECUTORS)
 40    S.stderr.write(f'Error importing {d} (maybe not installed); falling back to ThreadPoolExecutor\n'); return _('concurrent.futures.thread').ThreadPoolExecutor
 41def k(e, a=False, N=N):
 42    if isinstance(x := N[e], str):
 43        try: return int(x, 0)
 44        except ValueError:
 45            if a: raise FaultyConfig(e, str, int)
 46    return x
 47def g(e, a=False, t=(str, int, bytes), _=k):
 48    if isinstance(x := _(e), str) and ((x.startswith("b'") and x.endswith("'")) or (x.startswith('b"') and x.endswith('"'))):
 49        try: x = x.encode()[2:-1] # noqa: SIM105
 50        except UnicodeEncodeError: ...
 51    if isinstance(x, t) or (a and (x is None or isinstance(x, float))): return x
 52    raise FaultyConfig(e, type(x), t)
 53max_memerrs, e, Executor, get_past_logs, m, M, o, s, _ = k('max_memerrs'), g('seed', True), f(N.executor), str, 'a', False, __import__('os').name == 'posix', S.stderr, L.StreamHandler()
 54silent, basic_repl, loaded_all, pdb = map(N.__getitem__, ('quiet', 'basic_repl', 'load_all', 'pdb'))
 55match logging_to := g('log_to'):
 56    case 'NULL': l.disabled = True
 57    case 'MAKE':
 58        T, m, h = 'asyncutils_log%d.log', 'x', 0
 59        for h in range(1, 0x1001):
 60            try: logging_to = (s := open(T%h, m)).name; break
 61            except PermissionError as M: s.write(f'ERROR: insufficient permissions: {M}\n'); M = True; break
 62            except AttributeError: raise SystemError("Python opened a file with no 'name' attribute") from None
 63            except Exception: ...
 64        else: M = True
 65        del T, h
 66    case 'MEMORY':
 67        from _io import StringIO as J
 68        def get_past_logs(j=J, _=_):
 69            if r := _.stream.getvalue(): _.setStream(j()) # ty: ignore[unresolved-attribute]
 70            return r
 71        get_past_logs.__text_signature__, s = '()', J(); del J # ty: ignore[unresolved-attribute]
 72    case 'STDOUT': s = S.stdout
 73    case 'STDERR': ...
 74    case 1 if o: s, logging_to = S.stdout, 'STDOUT'
 75    case 2 if o: logging_to = 'STDERR'
 76    case str()|int()|bytes():
 77        try: M = True; logging_to = getattr(s := open(logging_to, m), 'name', logging_to); M = False
 78        except PermissionError as b: s.write(f'ERROR: insufficient permissions: {b}\n')
 79        except OSError as b: s.write(f'ERROR: {b}\n')
 80        except Exception as b: s.write(f'ERROR: unexpected error opening log file: {b}\n')
 81if M: s.write('ERROR: Failed to create log file; falling back to stderr\n')
 82l.addHandler(_)
 83_.setStream(s)
 84_.setFormatter(L.Formatter('%(asctime)s - asyncutils - %(levelname)s - %(message)s'))
 85(set_logger_level := lambda level, h=_, l=l: l.setLevel(level) or h.setLevel(level))(10*min(max(3-N.V+N.Q, 1), 5))
[docs] 86class debugging: 87 __slots__ = 'orig_level', 'orig_name' 88 @property 89 def level(self, _=l): return _.level # noqa: PLR0206 90 @property 91 def entered(self): return self.orig_level is not None 92 def __init__(self): self.orig_level = self.orig_name = None
[docs] 93 def __enter__(self, _s=set_logger_level, _m=L._levelToName.__getitem__, _l=l, _d=10): 94 if self.entered: _l.warning('config.debugging: context manager already entered') 95 else: 96 self.orig_name, self.orig_level = _m(l := _l.level), l; _s(_d) 97 if l != _d: _l.debug('config.debugging: debug mode entered') 98 return self
[docs] 99 def __exit__(self, /, *_, _s=set_logger_level, _l=l, _L=10): 100 if not self.entered: return _l.warning('config.debugging: context manager not entered') 101 if (l := self.orig_level) != _l.level == _L: _l.debug('config.debugging: exiting debug mode'); _s(l) 102 else: _l.warning('config.debugging: user already exited debug mode; original level was %s', self.orig_name) 103 self.orig_name = self.orig_level = None
104 def __repr__(self): return f'<asyncutils debug mode context manager (entered: {self.entered}) at {id(self):#x}>' 105 P.patch_method_signatures((__enter__, ''), (__exit__, P.xsig))
106debug = debugging() 107I.l = d = l.debug # ty: ignore[invalid-assignment] 108if N.debug: 109 debug.__enter__(); d('python %s', S.version) 110 if silent: from asyncutils import __version__ as V; d(V.representation); d('platform: %s', S.platform) 111 if c: d('config file path: %s', c) 112__import__('atexit').register(lambda s=s, _=d: None if s.closed else _('bye') or s.flush() or s.close()) 113p = (A := I.A).pop 114while A: d(*p()) 115def r(n, /): raise AttributeError(f"module 'asyncutils.config' has no attribute {n!r}") 116def __getattr__(n, /, _=e, r=r): 117 if n != '_randinst': r(n) 118 global _randinst; _randinst, __getattr__.__code__ = __import__('random').Random(_), r.__code__; return _randinst # ty: ignore[unresolved-global] 119P.patch_function_signatures((__getattr__, 'name, /'), (set_logger_level, 'level'), (__dir__, '')) 120if loaded_all: 121 i = I.Module.load 122 for _ in I.s.values(): i(_) # ty: ignore[invalid-argument-type] 123 l.debug('all submodules loaded in %.2f milliseconds', __import__('asyncutils').time_since_boot()) 124del _, e, L, M, N, S, f, m, r, s, o, P, g, k, l, c, d, FaultyConfig, I