1from asyncutils._internal import compat, patch as P
2from asyncutils._internal.helpers import fullname
3from asyncutils._internal.submodules import constants_all as __all__
# Names of every executor backend recognised by the package, in declaration order.
POSSIBLE_EXECUTORS = (
    'thread',
    'process',
    'interpreter',
    'loky',
    'loky_noreuse',
    'dask',
    'ipython',
    'elib_flux_cluster',
    'elib_flux_job',
    'elib_slurm_cluster',
    'elib_slurm_job',
    'elib_single_node',
    'pebble_thread',
    'pebble_process',
    'deadpool',
)
# 1/e as a float literal.
RECIP_E = 0.36787944117144233
# Same names as POSSIBLE_EXECUTORS, as a frozenset for membership tests.
EXECUTORS_FROZENSET = frozenset(POSSIBLE_EXECUTORS)
[docs]
class sentinel_base:
    """Base class for named sentinel objects cached per subclass.

    The base class itself cannot be instantiated.  Every subclass must
    declare an empty ``__slots__``; ``__init_subclass__`` then gives it its
    own ``_cache`` dict, its own ``_lock`` and permission to instantiate.

    An instance is either created with a name, in which case it is shared
    through the subclass cache, or created without one and named later by
    ``__set_name__`` when it is assigned as a class attribute.
    """

    _can_instantiate = False
    # Private slot names are mangled like any other private attribute, which
    # is why the rest of the class can look them up as '_sentinel_base__name'.
    __slots__ = '__mod', '__name'

    def __new__(cls, name=None, _=__import__('keyword').iskeyword, g=compat.f):
        """Return the sentinel of type *cls* called *name*, creating it if needed.

        With ``name=None`` a fresh unnamed instance is returned and nothing is
        cached.  Otherwise *name* must be one identifier, or two identifiers
        joined by a single dot, none of which is a Python keyword.

        ``_`` and ``g`` are implementation details bound at definition time.

        Raises:
            TypeError: if *cls* is not currently instantiable.
            ValueError: if *name* is not an acceptable sentinel name.
        """
        cls._assert_can_instantiate()
        if name is None:
            return super().__new__(cls)
        if _(name) or not all(p.isidentifier() and not _(p) for p in name.split('.', 1)):
            raise ValueError('asyncutils.constants.sentinel_base: invalid name')
        # NOTE(review): `g` (compat.f) presumably resolves the calling module's
        # name from one frame up -- confirm. It is called directly in this frame
        # on purpose; do not move it into a helper or a comprehension.
        if (m := g(1)) is not None:
            name = f'{m}.{name}'
        cache = cls._cache
        if (o := cache.get(name)) is None:
            fresh = super().__new__(cls)
            fresh.__name, fresh.__mod = name, m
            # Publish with setdefault while holding the lock: if another thread
            # registered the same name after the lookup above, its object is
            # returned and `fresh` is discarded, so one name maps to one object.
            with cls._lock:
                o = cache.setdefault(name, fresh)
        return o

    @property
    def name(self):
        """The stored name; raises AttributeError on an instance that has none."""
        return self.__name

    @property
    def module(self):
        """The module value recorded by ``__new__``; AttributeError if never set."""
        return self.__mod  # ty: ignore[unresolved-attribute] # pragma: no cover

    @classmethod
    def _assert_can_instantiate(cls):
        """Raise TypeError unless ``cls._can_instantiate`` is true."""
        if not cls._can_instantiate:
            raise TypeError(f'cannot instantiate {fullname(cls)!r}')

    def __repr__(self):
        # An unnamed instance has no value in its name slot; show a placeholder
        # instead of letting repr() fail with AttributeError.
        n = getattr(self, '_sentinel_base__name', None)
        label = '<unbound>' if n is None else repr(n)
        return f'<{fullname(self)} {label} at {id(self):#x}>'

    def __str__(self):
        # The stored name (or '<unbound>'), tagged when the last name component
        # starts with an underscore.
        text = getattr(self, '_sentinel_base__name', '<unbound>')
        return text + (' <private>' if self.is_private else '')

    def __set_name__(
        self, owner, name, /,
        _=(
            'NOTE: The following is not allowed:\n'
            'class {0}:\n'
            '{1} = {2}({3!r})\n'
            '...\n'
            'instead, use:\n'
            'class {0}:\n'
            '{1} = {2}()\n'
        ).format,
    ):
        """Name an unnamed sentinel after the class attribute it is assigned to.

        The name is ``fullname(owner)`` plus ``.name`` with every
        ``'<locals>.'`` and ``'<lambda>.'`` fragment removed.  ``_`` is an
        implementation detail: the formatter for the note written to stderr.

        Raises:
            NameError: if another object is already cached under that name, or
                if this sentinel already has a name and cannot be rebound.
        """
        N = f'{fullname(owner)}.{name}'.replace('<locals>.', '').replace('<lambda>.', '')
        with self._lock:
            n = getattr(self, '_sentinel_base__name', None)
            if n is None:
                # First binding: claim the cache entry, refusing to displace
                # a different object that already owns it.
                if not self.is_(self._cache.setdefault(N, self)):
                    raise NameError(f'{fullname(self)} name collision', name=N)
                self.__name = N
            elif n == N and self._cache.get(N) is self:  # pragma: no cover
                # Already carries exactly this name and owns the cache entry:
                # tolerated, but the discouraged spelling is reported on stderr.
                if b := self.bound_to:
                    __import__('sys').stderr.write(_(b.rpartition('.')[-1], self.back, fullname(self), N))
                else:
                    raise NameError(f'cannot bind named unbound {fullname(self)} to class {fullname(owner)!r}', name=N)
            else:
                raise NameError(f'cannot bind named {fullname(self)} to class {fullname(owner)!r}', name=N)

    def __reduce__(self):
        """Pickle as a call of ``type(self)`` with the stored name.

        Raises:
            TypeError: if the instance has no name.
        """
        n = getattr(self, '_sentinel_base__name', None)
        if n is None:
            raise TypeError(f'cannot pickle unbound instance of {fullname(self)}')
        return type(self), (n,)

    def __init_subclass__(cls, *, lock_impl=__import__('_thread').allocate_lock):
        """Validate a new sentinel class and give it its own cache and lock.

        Raises:
            TypeError: if the subclass body does not define an empty ``__slots__``.
        """
        # A missing __slots__ defaults to True here, so it is rejected just
        # like a non-empty one.
        if cls.__dict__.get('__slots__', True):
            raise TypeError('asyncutils.constants.sentinel_base: sentinel classes should have empty __slots__')
        cls._cache, cls._lock, cls._can_instantiate = {}, lock_impl(), True

    @property
    def is_private(self):
        """True when the last component of the name starts with an underscore."""
        return (self.back or '').startswith('_')

    @property
    def bound_to(self):
        """Everything before the last dot of the name, or None if there is none."""
        return getattr(self, '_sentinel_base__name', '').rpartition('.')[0] or None

    @property
    def back(self):
        """Everything after the last dot of the name, or None if there is no name."""
        return getattr(self, '_sentinel_base__name', '').rpartition('.')[2] or None

    def is_(self, o, /):
        """Return whether *o* is this very object."""
        return self is o

    # NOTE(review): presumably adjusts the signatures reported for these
    # functions by introspection -- confirm against asyncutils._internal.patch.
    P.patch_classmethod_signatures((__new__, 'name=None'), (__init_subclass__, 'lock_impl={}'))
    P.patch_method_signatures((__set_name__, 'owner, name, /'))
class _sentinel(sentinel_base):
    """Sentinel type used for the sentinels this module creates itself."""

    __slots__ = ()

    def __init_subclass__(cls, /, **_ignored):
        """Refuse every attempt to derive from this class."""
        raise TypeError('cannot subclass the type of asyncutils-internal sentinels')

    def __reduce__(self):
        """Return the sentinel's name as a plain string.

        pickle treats a string returned from ``__reduce__`` as the name of a
        global to look up, not as a constructor call.
        """
        return self.name
# Create the module's two sentinels, in the same order as before and directly
# from module level, so the calling frame seen by the constructor is unchanged.
_NO_DEFAULT = _sentinel('_NO_DEFAULT')
RAISE = _sentinel('RAISE')
# Lock the type so no further instances can be created, then drop the helpers
# from the module namespace.
_sentinel._can_instantiate = False
del _sentinel
del P