asyncutils.locksmiths

Implementation of a base class for locksmiths, magical entities that can compel not intentionally uncooperative locks to be released while
limiting collateral damage and hindrance of the control flow of the program as much as possible and allowing customization of behaviour in
different steps regarding some locks.

Classes

ForceResult

The possible results of a force attempt.

LocksmithBase

Instances can attempt to force specific locks asynchronously, and run cleanup or emergency tasks under it as soon as possible; this is especially useful in deadlock scenarios.

RecognitionResult

The possible results of a recognition attempt.

Functions

succeeded(→ TypeGuard[Literal[ForceResult.SUCCESS, ...)

Return whether the given result is a successful one.

Module Contents

class asyncutils.locksmiths.ForceResult

Bases: enum.IntEnum

The possible results of a force attempt.

Initialize self. See help(type(self)) for accurate signature.

ALREADY_BEING_FORCED = 4
FAILURE = 5
NO_CURRENT_TASK = 2
OWNER_COMPLETED = 3
RELEASED = 8
RELEASED_WITH_FALSE = 6
SUCCESS = 7
UNFORCABLE = 1
class asyncutils.locksmiths.LocksmithBase(loop: asyncio.AbstractEventLoop | None = ..., ltyp: type[asyncutils._internal.types.AsyncLockLike[Any]] = ...)[source]

Instances can attempt to force specific locks asynchronously, and run cleanup or emergency tasks under it as soon as possible; this is especially useful in deadlock scenarios.

Initialize the locksmith. loop is the event loop to use, defaulting to the current running loop. ltyp is the type of locks that this locksmith will attempt to force, defaulting to asyncio.Lock.

async _wait_on[T](task: collections.abc.Awaitable[T], lock: asyncutils._internal.types.AsyncLockLike[Any], /) T[source]

Wait on the task and release the lock. Only called by host() when it successfully acquires the lock; not for public use.

async already_forcing(lock: asyncutils._internal.types.AsyncLockLike[Any], /) ForceResult[source]

Called when the locksmith attempts to force a lock but it is already being forced by another locksmith, which is detected when the task that owns the lock raises the exception thrown by force() instead of handling it. Its return value is returned by force().

async answer_received(lock: asyncutils._internal.types.AsyncLockLike[Any], answer: object, /) None[source]

Called when a task that was forced to release the lock responds to the exception thrown by force() by setting a value on it, which is detected by host(). answer is the value that the task set on the exception.

can_force_lock_held(lock: asyncutils._internal.types.AsyncLockLike[Any], /) bool[source]

Return whether the locksmith can force the lock, given that the internal lock is held. The default implementation allows forcing if the lock is recognized and not currently locked.

async eager_fallback(lock: asyncutils._internal.types.AsyncLockLike[Any], /) ForceResult[source]

Called when the locksmith attempts to force a lock but the owner task appears to have completed, since it has no coroutine. Its return value is returned by force().

find_owner(lock: asyncutils._internal.types.AsyncLockLike[Any], /) asyncio.Task[Any] | None[source]

Return the owner of the lock, if it can be found. The default implementation assumes that the _owner attribute of the lock, if present, points to the task that owns it or None.

async force(
lock: asyncutils._internal.types.AsyncLockLike[Any],
/,
info: object = ...,
*,
purge_waiters: bool = ...,
) ForceResult[source]
The main feature of the locksmith; that is, to try to force the lock lock.
This method cannot be overridden, because it already delegates lock-specific behaviour to overridable methods and handlers in its core logic.
info, if passed, should be an object representing the context of the force attempt, and will be passed to the exception thrown to the
task asking to release the lock.
async get_info(lock: asyncutils._internal.types.AsyncLockLike[Any], /) Any[source]

Return information about the lock that will be passed to the forcing request.

async host[T](
task: collections.abc.Awaitable[T],
lock: asyncutils._internal.types.AsyncLockLike[Any],
/,
*,
timeout1: float | None = ...,
timeout2: float | None = ...,
timeout3: float | None = ...,
) T[source]

Run task holding lock immediately after forcing it. The default values of the timeouts are taken from context.LOCKSMITH_BASE_DEFAULT_TIMEOUTS.

async lock_busy(lock: asyncutils._internal.types.AsyncLockLike[Any], requester: LocksmithBase, context: dict[str, Any], /) None[source]

Called when a LockForceRequest by a different locksmith propagates, meaning that another locksmith is trying to do the same thing. The context parameter is a dictionary that can be passed to extra of a log record, for example. The implementation is allowed to call lock_busy() on the requester in this method with a modified context, but care should be taken to avoid infinite recursion.

patch_owner(task: asyncio.Task[Any], lock: asyncutils._internal.types.AsyncLockLike[Any], /) None[source]

Change the owner of the lock to the given task. The default implementation sets the _owner attribute of the lock to the task, if it exists.

preliminary_check_lock(lock: asyncutils._internal.types.AsyncLockLike[Any], /) bool[source]

Return whether the lock passes preliminary checks for recognition. The default implementation checks whether the lock has the acquire(), release(), and locked() methods.

async purge_waiters(lock: asyncutils._internal.types.AsyncLockLike[Any], /) None[source]

Clear all waiters on the lock after a force attempt, if necessary. The default implementation assumes this data is attached to the lock as its _waiters attribute, which is a data structure that evaluates to whether it still has any items in a boolean context, with a pop() method.

async recognize_lock(lock: asyncutils._internal.types.AsyncLockLike[Any], /) RecognitionResult[source]

Recognize the given lock as one that this locksmith can handle.

classmethod register_handler[T: asyncutils._internal.types.AsyncLockLike[Any]](
h: collections.abc.Callable[[T], object],
/,
*,
shadow: bool = ...,
) collections.abc.Callable[[type[T]], type[T]][source]

A decorator factory for async lock classes, taking a handler function and returning an identity decorator.

async release_returned_false(lock: asyncutils._internal.types.AsyncLockLike[Any], /) ForceResult[source]

Called when the locksmith attempts to force a lock and its release method returns False, which is a common convention for indicating that the lock was not actually released. Its return value is returned by force().

task_raised_critical(lock: asyncutils._internal.types.AsyncLockLike[Any], exc: BaseException, /) Literal[ForceResult][source]

Called when a task raises a critical exception in response to a force request. The default implementation throws the exception wrapped in exceptions.Critical, rather than returning any value to force(). This is not async because it is imperative that the exception be dealt with quickly. Subclasses can choose to return ForceResult.FAILURE after some handling as well.

async task_raised_other(lock: asyncutils._internal.types.AsyncLockLike[Any], exc: BaseException, /) None[source]

Called when a task raises a non-critical exception in response to a force request. The default implementation logs the exception if it is not a RuntimeError, since those are commonly raised when a task is cancelled.

async task_reraised_request(lock: asyncutils._internal.types.AsyncLockLike[Any], /) None[source]

Called when a task that was forced to release the lock does not handle or reraises the exception.

async throw_fallback(lock: asyncutils._internal.types.AsyncLockLike[Any], /) ForceResult[source]

Called when the locksmith attempts to force a lock but there is no current task that owns it or it could not be found, and no task appears to be running. Its return value is returned by force().

wrap_task[T](aw: collections.abc.Awaitable[T], /) asyncio.Task[T][source]

Wrap the given awaitable in a task using the locksmith’s event loop.

property currently_recognized: frozenset[asyncutils._internal.types.AsyncLockLike[Any]]

Return a frozenset of locks that this locksmith currently recognizes.

handlers: ClassVar[dict[type[asyncutils._internal.types.AsyncLockLike[Any]], collections.abc.Callable[[asyncutils._internal.types.AsyncLockLike[Any]], Any]]]

A mapping of lock types to handler functions, which are callables that take a lock of the corresponding type and perform any lock-specific logic necessary for forcing it.

class asyncutils.locksmiths.RecognitionResult

Bases: enum.IntEnum

The possible results of a recognition attempt.

Initialize self. See help(type(self)) for accurate signature.

ALREADY_RECOGNIZED = 3
FAILED_ACK = 2
FAILED_PRELIM = 1
SUCCESS = 4
asyncutils.locksmiths.succeeded(
result: object,
/,
) TypeGuard[Literal[ForceResult.SUCCESS, ForceResult.RELEASED, RecognitionResult.ALREADY_RECOGNIZED, RecognitionResult.SUCCESS]]

Return whether the given result is a successful one.