Source code for asyncutils.constants

 1from asyncutils._internal import compat, patch as P
 2from asyncutils._internal.helpers import fullname
 3from asyncutils._internal.submodules import constants_all as __all__
 4RECIP_E, EXECUTORS_FROZENSET = 0.36787944117144233, frozenset(POSSIBLE_EXECUTORS := ('thread', 'process', 'interpreter', 'loky', 'loky_noreuse', 'dask', 'ipython', 'elib_flux_cluster', 'elib_flux_job', 'elib_slurm_cluster', 'elib_slurm_job', 'elib_single_node', 'pebble_thread', 'pebble_process', 'deadpool'))
class sentinel_base:
    # Base class for named sentinel objects that are cached per subclass.
    #
    # The base itself cannot be instantiated (_can_instantiate is False); every
    # subclass receives its own _cache dict, its own _lock and
    # _can_instantiate = True from __init_subclass__.
    #
    # An instance is either "named" (created with a name, cached under it) or
    # "unbound" (created with name=None); an unbound instance acquires its name
    # later through __set_name__ when it is assigned as a class attribute.
    _can_instantiate = False
    # Private names are mangled, so the actual slots are
    # '_sentinel_base__mod' and '_sentinel_base__name'.
    __slots__ = '__mod', '__name'

    def __new__(cls, name=None, _=__import__('keyword').iskeyword, g=compat.f):
        # Create the sentinel called `name`, or return the cached one.
        #
        # name: None for an unbound sentinel, otherwise an identifier with at
        #       most one dot whose parts are not Python keywords.
        # _, g: implementation details bound at definition time (keyword test
        #       and compat.f); not meant to be passed by callers.
        # Raises TypeError if cls may not be instantiated and ValueError if
        # the name is invalid.
        cls._assert_can_instantiate()
        if name is None:
            return super().__new__(cls)
        if _(name) or not all(p.isidentifier() and not _(p) for p in name.split('.', 1)):
            raise ValueError('asyncutils.constants.sentinel_base: invalid name')
        # NOTE(review): g(1) presumably yields the calling module's name (or
        # None) -- confirm against compat.f. It must be called directly from
        # this frame, so do not move it into a helper.
        if (m := g(1)) is not None:
            name = f'{m}.{name}'
        if (o := (c := cls._cache).get(name)) is None:
            # Re-check under the lock: without it two threads could both miss
            # the cache and create two distinct objects for one name.
            with cls._lock:
                if (o := c.get(name)) is None:
                    (o := super().__new__(cls)).__name, o.__mod = name, m
                    c[name] = o
        return o

    @property
    def name(self):
        # Full name of the sentinel; AttributeError while still unbound.
        return self.__name

    @property
    def module(self):  # pragma: no cover
        # Value returned by g(1) at creation time. Only set for sentinels
        # created with an explicit name, so this raises AttributeError for
        # sentinels that were named through __set_name__.
        return self.__mod  # ty: ignore[unresolved-attribute]

    @classmethod
    def _assert_can_instantiate(cls):
        # Raise TypeError unless instances of cls may currently be created.
        if not cls._can_instantiate:
            raise TypeError(f'cannot instantiate {fullname(cls)!r}')

    def __repr__(self):
        # Unbound instances have no name slot set yet; fall back to a
        # placeholder instead of raising AttributeError from repr().
        n = getattr(self, '_sentinel_base__name', None)
        label = '<unbound>' if n is None else repr(n)
        return f'<{fullname(self)} {label} at {id(self):#x}>'

    def __str__(self):
        # The name (or '<unbound>'), followed by ' <private>' when the last
        # name component starts with an underscore.
        return getattr(self, '_sentinel_base__name', '<unbound>') + (' <private>' * self.is_private)

    def __set_name__(self, owner, name, /, _='NOTE: The following is not allowed:\nclass {0}:\n{1} = {2}({3!r})\n...\ninstead, use:\nclass {0}:\n{1} = {2}()\n'.format):
        # Bind an unbound sentinel to '<owner qualified name>.<attribute>'.
        #
        # Raises NameError when another sentinel already owns that name, or
        # when this sentinel already carries a different name.
        N = f'{fullname(owner)}.{name}'.replace('<locals>.', '').replace('<lambda>.', '')
        with self._lock:
            if (n := getattr(self, '_sentinel_base__name', None)) is None:
                # setdefault returns the existing entry if N is already taken.
                if not self.is_(self._cache.setdefault(N, self)):
                    raise NameError(f'{fullname(self)} name collision', name=N)
                # Only the name is recorded here; the module slot stays unset.
                self.__name = N
            elif n == N and self._cache.get(N) is self:  # pragma: no cover
                # Already registered under exactly this name: write a usage
                # note to stderr rather than failing.
                if b := self.bound_to:
                    __import__('sys').stderr.write(_(b.rpartition('.')[-1], self.back, fullname(self), N))
                else:
                    raise NameError(f'cannot bind named unbound {fullname(self)} to class {fullname(owner)!r}', name=N)
            else:
                raise NameError(f'cannot bind named {fullname(self)} to class {fullname(owner)!r}', name=N)

    def __reduce__(self):
        # Pickle as a call to the class with the stored name; unbound
        # instances cannot be pickled.
        if (n := getattr(self, '_sentinel_base__name', None)) is None:
            raise TypeError(f'cannot pickle unbound instance of {fullname(self)}')
        return type(self), (n,)

    def __init_subclass__(cls, *, lock_impl=__import__('_thread').allocate_lock):
        # Subclasses must declare an empty __slots__ in their own body; a
        # missing declaration is treated like a non-empty one.
        if cls.__dict__.get('__slots__', True):
            raise TypeError('asyncutils.constants.sentinel_base: sentinel classes should have empty __slots__')
        cls._cache, cls._lock, cls._can_instantiate = {}, lock_impl(), True

    @property
    def is_private(self):
        # True when the last name component starts with an underscore.
        return (self.back or '').startswith('_')

    @property
    def bound_to(self):
        # Everything before the last dot of the name, or None.
        return getattr(self, '_sentinel_base__name', '').rpartition('.')[0] or None

    @property
    def back(self):
        # Everything after the last dot of the name, or None.
        return getattr(self, '_sentinel_base__name', '').rpartition('.')[2] or None

    def is_(self, o, /):
        # Identity comparison against o.
        return self is o

    # Runs inside the class body so the raw functions can be handed over
    # before they are bound to the class.
    P.patch_classmethod_signatures((__new__, 'name=None'), (__init_subclass__, 'lock_impl={}'))
    P.patch_method_signatures((__set_name__, 'owner, name, /'))
class _sentinel(sentinel_base):
    # Concrete sentinel type for the package's own module-level sentinels.
    __slots__ = ()

    def __reduce__(self):
        # Reduce to the bare name string instead of a constructor call.
        return self.name

    def __init_subclass__(cls, /, **_):
        # This type is final: any attempt to derive from it is rejected.
        raise TypeError('cannot subclass the type of asyncutils-internal sentinels')


# Build both sentinels while the type is still instantiable. map() is kept so
# that construction happens directly from module scope.
_NO_DEFAULT, RAISE = map(_sentinel, ('_NO_DEFAULT', 'RAISE'))
# Lock the type so no further instances can be created.
_sentinel._can_instantiate = False
# Drop the helper names from the module namespace.
del _sentinel, P