asyncutils.io

Provides asynchronous file-like interfaces to coupled readers and writers, double-ended pipes (write at one end, read at the other), and memory maps.
Does not depend on aiofiles or any similar library; blocking calls are run in executors as determined by the module configuration.
This library is not primarily designed for I/O operations, and the functionality in this submodule is far from comprehensive.
For more complete asynchronous I/O, see aiostream or a similar library.

Attributes

stdcoup

Instance of AsyncReadWriteCouple wrapping standard input and output.

Classes

AsyncReadWriteCouple

An asynchronous file-like interface to a readable and writable object.

MemoryMappedIOManager

An asynchronous object-oriented manager interface to memory-mapped I/O that optimizes batch operations using an event loop.

Functions

ainput(→ str)

Asynchronously write prompt to standard output and read a line from standard input, returning it without the trailing newline. If assert_tty is True, raise OSError if standard input is not a TTY (you need only pass this the first time you call the function).

double_ended_binary_pipe(...)

Like double_ended_text_pipe(), but in binary mode.

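double_ended_text_pipe(...)

Return a tuple of two AsyncReadWriteCouple instances in text mode, such that each can read what the other writes.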

Module Contents

class asyncutils.io.AsyncReadWriteCouple[T: (str, bytes), R: (str, bytes)](
reader: asyncutils._internal.types.Reader[T],
writer: asyncutils._internal.types.Writer[R],
/,
executor: asyncutils.config.Executor | None = ...,
*,
find_attr_on_writer_first: bool = ...,
)[source]

Bases: asyncutils.mixins.LoopContextMixin

An asynchronous file-like interface to a readable and writable object.
Delegates its methods to the underlying reader and writer, but achieves non-blocking behaviour by running them in an executor.
Preferably, the executor should be a thread pool, but that is ultimately determined by the library configuration.

Warning

This class is not designed to wrap asyncio streams, because their methods are different and already asynchronous.

See also

io.BufferedRWPair

The standard library synchronous equivalent. Some of its methods may not be present here.

Initialize the couple with the given reader and writer, and optionally a PEP 3148 executor in which to call the file methods, after checking that the reader is readable and the writer is writable.
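The executor-based delegation described above can be sketched with the standard library alone. ExecutorCouple is a hypothetical name, and this is a simplified illustration that assumes each method is a thin loop.run_in_executor() wrapper around the corresponding file method; it is not the library's actual implementation and omits most methods.

```python
import asyncio
import io
from concurrent.futures import ThreadPoolExecutor


class ExecutorCouple:
    """Hypothetical, simplified executor-backed reader/writer couple."""

    def __init__(self, reader, writer, executor=None):
        # Check the ends up front, as the documented initializer does.
        if not reader.readable():
            raise ValueError("reader is not readable")
        if not writer.writable():
            raise ValueError("writer is not writable")
        self.reader, self.writer = reader, writer
        self.executor = executor if executor is not None else ThreadPoolExecutor(max_workers=1)

    async def _run(self, func, *args):
        # Hand the blocking call to the executor so the event loop stays responsive.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, func, *args)

    async def read(self, n=-1):
        return await self._run(self.reader.read, n)

    async def write(self, s):
        return await self._run(self.writer.write, s)

    async def flush(self):
        await self._run(self.writer.flush)

    async def aclose(self):
        await self._run(self.reader.close)
        await self._run(self.writer.close)
        self.executor.shutdown(wait=False)


async def main():
    couple = ExecutorCouple(io.StringIO("hello"), io.StringIO())
    data = await couple.read()
    await couple.write(data.upper())
    written = couple.writer.getvalue()  # inspect before closing the StringIO
    await couple.aclose()
    return data, written


if __name__ == "__main__":
    print(asyncio.run(main()))  # ('hello', 'HELLO')
```

A single-worker pool keeps the reader's and writer's calls in submission order in this sketch; whether the library's configured executor behaves the same way is not stated here.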

async __cleanup__() None
__getattr__(name: str, /) Any[source]

Look up the attribute on the writer first if find_attr_on_writer_first=True was passed, and on the reader first otherwise.
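The lookup order can be illustrated with a small stand-alone class. Delegator is a hypothetical name, and the fall-through to the other object when the first lookup fails is an assumption inferred from the word "first" above, not confirmed behaviour.

```python
from types import SimpleNamespace


class Delegator:
    """Hypothetical sketch of forwarding unknown attributes to a reader and a writer."""

    def __init__(self, reader, writer, *, find_attr_on_writer_first=False):
        self.reader = reader
        self.writer = writer
        self.find_attr_on_writer_first = find_attr_on_writer_first

    def __getattr__(self, name):
        # __getattr__ only runs after normal lookup fails; guard our own fields
        # so a half-initialized instance cannot recurse forever.
        if name in {"reader", "writer", "find_attr_on_writer_first"}:
            raise AttributeError(name)
        if self.find_attr_on_writer_first:
            first, second = self.writer, self.reader
        else:
            first, second = self.reader, self.writer
        try:
            return getattr(first, name)
        except AttributeError:
            return getattr(second, name)  # assumption: fall back to the other end


reader = SimpleNamespace(name="stdin", encoding="utf-8")
writer = SimpleNamespace(name="stdout", line_buffering=True)
print(Delegator(reader, writer).name)  # stdin
print(Delegator(reader, writer, find_attr_on_writer_first=True).name)  # stdout
print(Delegator(reader, writer).line_buffering)  # True
```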

async aclose() None[source]

Close the reader and writer asynchronously and shut down the underlying executor. It is safe to call this method multiple times, but no other methods should be called after closing.

fileno() NoReturn[source]

Raise OSError with errno EBADF.

async flush() None[source]

Asynchronously flush the writer.

isatty() NoReturn[source]

Raise OSError with errno ENOTSUP.

async read(n: int = ..., /) T[source]

Read up to n characters (or bytes, in binary mode) from the reader.

async read1(n: int = ..., /) T[source]

Call the read1() method on the reader.

readable() Literal[True]

Return True, because the reader was verified to be readable at initialization, and that is assumed to remain valid.

async readall() T[source]

Read all characters from the reader until EOF. If the readall() method is not present, call read() with no arguments without handling the non-blocking case.

async readinto(b: collections.abc.Buffer, /) int[source]

Read into the writable bytes-like object b from the reader, returning the number of bytes read.
Calls the readinto() method of the reader if it exists, and otherwise falls back to the read() method.
The case where the underlying implementation of read() returns None or raises BlockingIOError is not considered.
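The readinto() fallback can be sketched as a synchronous helper. readinto_with_fallback and ReadOnly are hypothetical names; the sketch only mirrors the documented behaviour (prefer readinto(), else copy the result of read() into the buffer) and, like it, does not handle read() returning None.

```python
import io


def readinto_with_fallback(reader, b):
    """Hypothetical helper: fill the writable buffer b from reader and return the byte count."""
    readinto = getattr(reader, "readinto", None)
    if readinto is not None:
        return readinto(b)
    # Fallback: read at most len(b) bytes and copy them into the buffer.
    # Like the documented method, a read() returning None is not handled.
    view = memoryview(b).cast("B")
    data = reader.read(len(view))
    view[:len(data)] = data
    return len(data)


class ReadOnly:
    """A reader exposing read() but not readinto(), to exercise the fallback."""

    def __init__(self, data):
        self._buf = io.BytesIO(data)

    def read(self, n=-1):
        return self._buf.read(n)


buf = bytearray(4)
print(readinto_with_fallback(ReadOnly(b"abcdef"), buf), buf)  # 4 bytearray(b'abcd')
```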

async readinto1(b: collections.abc.Buffer, /) int[source]

Call the readinto1() method on the reader. There is no fallback implementation.

async readline(limit: int = ..., /) T[source]

Read a line, of length at most limit, from the reader.

async readlines(hint: int = ..., /) list[T][source]

Collect lines from the reader into a list until at least hint characters have been read (if available) and a line boundary is reached.
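The hint rule above can be sketched synchronously. readlines_with_hint is a hypothetical helper; it assumes, as io.IOBase.readlines() does, that a hint of 0 or less means "read to EOF", and it always stops on a line boundary because whole lines are collected.

```python
import io


def readlines_with_hint(reader, hint=-1):
    """Hypothetical sketch: collect whole lines until at least hint characters are read."""
    lines = []
    total = 0
    while line := reader.readline():  # an empty string or bytes object means EOF
        lines.append(line)
        total += len(line)
        if 0 < hint <= total:  # a non-positive hint means "read to EOF"
            break
    return lines


print(readlines_with_hint(io.StringIO("ab\ncd\nef\n"), 4))  # ['ab\n', 'cd\n']
```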

seek(offset: int, whence: int = ..., /) NoReturn[source]

Raise OSError with errno ESPIPE.

seekable() Literal[False][source]

The couple itself is not seekable, but the reader and writer may be.

tell() NoReturn[source]

Raise OSError with errno ESPIPE.

async truncate(size: int | None = ..., /) int[source]

Truncate the file at size (or the current position if not passed), and return the new file size.

writable() Literal[True]

Return True, because the writer was verified to be writable at initialization, and that is assumed to remain valid.

async write(s: R, /) int[source]

Write s into the writer, returning the number of characters written.

async writelines(lines: collections.abc.Iterable[R], /) None[source]

Write the lines from the iterable into the writer without adding newlines as separators.

property closed: bool

Whether both the reader and writer have been closed.

property executor: asyncutils.config.Executor

The underlying executor.

property reader: asyncutils._internal.types.Reader[T]

The underlying reader.

property writer: asyncutils._internal.types.Writer[R]

The underlying writer.

class asyncutils.io.MemoryMappedIOManager(executor: asyncutils.config.Executor | None = ...)[source]

Bases: asyncutils.mixins.LoopContextMixin

An asynchronous object-oriented manager interface to memory-mapped I/O that optimizes batch operations using an event loop.

Tip

You will probably only ever need one instance of this.

Initialize the manager with the executor to be used for its operations.

async __cleanup__() None[source]
__del__() None[source]
async approx_memory_usage() int[source]

Compute the approximate memory used by the currently open memory maps in bytes.

async bulk_checksum[O: asyncutils._internal.types.Openable](paths: collections.abc.Iterable[O], alg: asyncutils._internal.types.HashAlgorithm = ...) dict[O, str][source]

Compute checksums of the files at paths using the specified algorithm, defaulting to context.MEMORY_MAPPED_IO_MANAGER_DEFAULT_CHECKSUM_ALG. The same remarks as for checksum() apply here.

async bulk_copy(
pairs: collections.abc.Iterable[tuple[asyncutils._internal.types.Openable, asyncutils._internal.types.Openable]],
) None[source]

For each (source, destination) tuple in pairs, asynchronously copy the contents of the source file into the destination file. If you have a dictionary mapping sources to destinations, pass in its items view.
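The batching idea can be sketched with asyncio.gather() over executor calls. bulk_copy_sketch is a hypothetical name, and it swaps in shutil.copyfile() instead of memory-mapped copying, so it only illustrates how the pairs are consumed concurrently, not how the manager copies data.

```python
import asyncio
import shutil


async def bulk_copy_sketch(pairs, executor=None):
    """Hypothetical stand-in: copy every (source, destination) pair concurrently."""
    loop = asyncio.get_running_loop()
    # Each copy blocks, so each runs in the executor; gather() waits for all of them.
    await asyncio.gather(
        *(loop.run_in_executor(executor, shutil.copyfile, src, dest) for src, dest in pairs)
    )


# With a dictionary {source: destination}, pass its items view, as documented above:
# await bulk_copy_sketch(mapping.items())
```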

async bulk_read[O: asyncutils._internal.types.Openable](file_offsets: collections.abc.Mapping[O, collections.abc.Iterable[tuple[int, int]]]) dict[O, list[bytes]][source]

Read from each file at the offsets given by its value in the file_offsets mapping, which should be an iterable of (offset, size) tuples, or None if the whole file is to be read.
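A rough sketch of the (offset, size) convention using the standard mmap module. read_slices and bulk_read_sketch are hypothetical names, and returning the whole file as a one-item list when the value is None is an assumption chosen to fit the documented dict[O, list[bytes]] return type.

```python
import asyncio
import mmap
import os


def read_slices(path, offsets):
    """Hypothetical helper: read (offset, size) slices, or the whole file if offsets is None."""
    with open(path, "rb") as f:
        if os.fstat(f.fileno()).st_size == 0:
            # mmap cannot map an empty file, and every slice of it is empty anyway.
            return [b""] if offsets is None else [b"" for _ in offsets]
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
            if offsets is None:
                return [m[:]]  # assumption: the whole file comes back as a one-item list
            return [m[offset:offset + size] for offset, size in offsets]


async def bulk_read_sketch(file_offsets, executor=None):
    """Hypothetical stand-in: read from several files concurrently."""
    loop = asyncio.get_running_loop()
    paths = list(file_offsets)
    results = await asyncio.gather(
        *(loop.run_in_executor(executor, read_slices, p, file_offsets[p]) for p in paths)
    )
    return dict(zip(paths, results))
```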

async bulk_resize(sizes: collections.abc.Mapping[asyncutils._internal.types.Openable, int]) None[source]

Asynchronously resize each file whose path is a key of sizes to the corresponding value.

async bulk_write(
file_data: collections.abc.Mapping[asyncutils._internal.types.Openable, collections.abc.Iterable[tuple[bytes, int]]],
) None[source]

Write into each file at the offsets given by its value in the file_data mapping, which should be an iterable of (data, offset) tuples.
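The (data, offset) convention can be sketched with mmap slice assignment. write_chunks and bulk_write_sketch are hypothetical names; the sketch assumes each file already exists and is large enough, because a memory map cannot grow through slice assignment (mmap raises IndexError on a size mismatch).

```python
import asyncio
import mmap


def write_chunks(path, chunks):
    """Hypothetical helper: write each (data, offset) pair into an existing, non-empty file."""
    with open(path, "r+b") as f, mmap.mmap(f.fileno(), 0) as m:
        for data, offset in chunks:
            # Slice assignment cannot grow the map, so the file must already be large enough;
            # otherwise mmap raises IndexError.
            m[offset:offset + len(data)] = data
        m.flush()


async def bulk_write_sketch(file_data, executor=None):
    """Hypothetical stand-in: writes to one file run in order, different files concurrently."""
    loop = asyncio.get_running_loop()
    await asyncio.gather(
        *(loop.run_in_executor(executor, write_chunks, p, c) for p, c in file_data.items())
    )
```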

async checksum(path: asyncutils._internal.types.Openable, alg: asyncutils._internal.types.HashAlgorithm = ...) str[source]

Compute a checksum from the file at path using the specified algorithm (default context.MEMORY_MAPPED_IO_MANAGER_DEFAULT_CHECKSUM_ALG).

Changed in version 0.9.8: Started passing usedforsecurity=False to the hashlib.new() constructor to bypass FIPS restrictions, so that broken algorithms like MD5 are always allowed if explicitly requested, since MD5 is fast and sufficient to detect accidental file corruption in simple cases.

Changed in version 0.9.8: Offloaded checksum computation to the executor as well.
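Both 0.9.8 changes can be illustrated together: hash a memory-mapped file with usedforsecurity=False, and run the hashing in an executor. checksum_sketch and _mmap_digest are hypothetical names, and the "blake2b" default here is an assumption standing in for the context setting; this is not the library's implementation.

```python
import asyncio
import hashlib
import mmap
import os


def _mmap_digest(path, alg):
    # usedforsecurity=False (Python 3.9+) keeps algorithms such as MD5 usable on FIPS builds.
    h = hashlib.new(alg, usedforsecurity=False)
    with open(path, "rb") as f:
        if os.fstat(f.fileno()).st_size:  # mmap refuses to map an empty file
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
                h.update(m)
    return h.hexdigest()


async def checksum_sketch(path, alg="blake2b", executor=None):
    """Hypothetical stand-in: hash a memory-mapped file without blocking the event loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, _mmap_digest, path, alg)
```

Passing alg explicitly, as in await checksum_sketch("data.bin", "sha256"), avoids depending on whatever default is configured.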

Caution

If you need cryptographic checksums, use the factory default BLAKE2s (32-bit) or BLAKE2b (64-bit), SHA-256, or a similar algorithm.

Danger

Calling without the alg parameter may cause a weak algorithm to be chosen if the value of MEMORY_MAPPED_IO_MANAGER_DEFAULT_CHECKSUM_ALG in the current context dictates so.

See also

hashlib.algorithms_available

For the set of supported algorithms on your system and installation.

The relevant FIPS specification

For the security requirements of the algorithms.

The Wikipedia article

For basic information pertaining to checksums and their use cases.

async compact_files(paths: collections.abc.Iterable[asyncutils._internal.types.Openable]) None[source]

Asynchronously reduce the size of each file at paths by truncating its trailing null bytes.
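A minimal sketch of trailing-null truncation, assuming "null bytes" means zero bytes at the end of the file. compact_file and compact_files_sketch are hypothetical names; interior null bytes are preserved.

```python
import asyncio
import mmap
import os


def compact_file(path):
    """Hypothetical helper: drop trailing NUL bytes from a file and return its new size."""
    with open(path, "r+b") as f:
        end = os.fstat(f.fileno()).st_size
        if end:  # mmap refuses to map an empty file
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
                # Indexing an mmap yields an int; scan backwards past the zero bytes.
                while end and m[end - 1] == 0:
                    end -= 1
        # The map is closed first, because Windows will not truncate a mapped file.
        f.truncate(end)
    return end


async def compact_files_sketch(paths, executor=None):
    loop = asyncio.get_running_loop()
    await asyncio.gather(*(loop.run_in_executor(executor, compact_file, p) for p in paths))
```

The byte-by-byte scan keeps the sketch short; for files with very long null tails, scanning backwards in larger blocks would be faster.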

async copy_file(
srcp: asyncutils._internal.types.Openable,
destp: asyncutils._internal.types.Openable,
*,
flush: bool = ...,
) None[source]

Copy the contents of the file at srcp into the file at destp asynchronously, and flush the destination if flush is True. Uses open() and create() internally.

create(
path: asyncutils._internal.types.Openable,
init_size: int = ...,
*,
exclusive: bool = ...,
) asyncutils._internal.types.OpenRV[source]

An async context manager that opens a file at path for memory-mapped writing and reading on entry and closes it on exit. Any existing contents are truncated, and the file must not already exist if exclusive is True (the default behaviour).

create_sparsef(
path: asyncutils._internal.types.Openable,
total_size: int,
chunks: collections.abc.Mapping[int, bytes | str],
) asyncutils._internal.types.OpenRV[source]

An async context manager that creates a file of size total_size at path and writes chunks into it, where chunks maps offsets to the data to be written there. Where chunks overlap, data from later chunks overwrites that from earlier ones.
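The chunk layout can be sketched synchronously with plain file operations. create_sparse_sketch is a hypothetical name; encoding str chunks as UTF-8 and rejecting chunks that extend past total_size are assumptions of this sketch, not documented behaviour. Chunks are written in mapping order, so a later overlapping chunk overwrites an earlier one.

```python
def create_sparse_sketch(path, total_size, chunks):
    """Hypothetical helper: make a total_size-byte file and write chunks at their offsets."""
    with open(path, "w+b") as f:
        # Growing the file with truncate() leaves a hole that many file systems store sparsely.
        f.truncate(total_size)
        for offset, data in chunks.items():  # mapping order: later chunks overwrite earlier ones
            if isinstance(data, str):
                data = data.encode("utf-8")  # assumption: str chunks are UTF-8 encoded
            if offset < 0 or offset + len(data) > total_size:
                raise ValueError(f"chunk at offset {offset} does not fit in {total_size} bytes")
            f.seek(offset)
            f.write(data)
```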

async find_in_files[O: asyncutils._internal.types.Openable](
pattern: bytes,
paths: collections.abc.Mapping[O, int],
max_per_file: int = ...,
*,
allow_overlapping: bool = ...,
) dict[O, list[int]][source]

Find all occurrences of pattern in the files at paths, returning a mapping from each file to a list of offsets where the pattern was found.
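A sketch of the search using mmap.find(). find_all and find_in_files_sketch are hypothetical names, and the semantics are assumptions: a negative max_per_file means "no limit", allow_overlapping lets a match start one byte after the previous one, and the int values of the paths mapping are ignored because their meaning is not documented in this section.

```python
import asyncio
import mmap
import os


def find_all(path, pattern, max_per_file=-1, allow_overlapping=False):
    """Hypothetical helper: return the offsets of pattern in the file at path."""
    offsets = []
    with open(path, "rb") as f:
        if not pattern or os.fstat(f.fileno()).st_size == 0:
            return offsets
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
            pos = m.find(pattern)
            while pos != -1 and (max_per_file < 0 or len(offsets) < max_per_file):
                offsets.append(pos)
                # Overlapping matches may start one byte later; otherwise skip past this match.
                pos = m.find(pattern, pos + (1 if allow_overlapping else len(pattern)))
    return offsets


async def find_in_files_sketch(pattern, paths, max_per_file=-1, *,
                               allow_overlapping=False, executor=None):
    loop = asyncio.get_running_loop()
    keys = list(paths)  # iterating a mapping yields its keys; the int values are ignored here
    results = await asyncio.gather(*(
        loop.run_in_executor(executor, find_all, p, pattern, max_per_file, allow_overlapping)
        for p in keys
    ))
    return dict(zip(keys, results))
```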

open(path: asyncutils._internal.types.Openable, init_size: int = ...) asyncutils._internal.types.OpenRV[source]

An async context manager that opens a file at path for memory-mapped reading and writing on entry and closes it on exit. The file must exist and is not truncated.

prefetch_files(
*paths: asyncutils._internal.types.Openable,
init_size: int = ...,
) contextlib.AbstractAsyncContextManager[list[asyncutils._internal.types.MemoryMappedFile], None][source]

Open the existing files at paths for memory-mapped I/O and prefetch them into memory at once, closing them all together on exit.

property currently_open: int

Number of currently open memory maps.

property open_files: asyncutils._internal.types.OpenFiles

Dictionary mapping tuples of the form (file, mode) to the managed file objects.

property open_mmaps: weakref.WeakSet[mmap.mmap]

Instance of WeakSet containing the maps managed by this manager.

property open_paths: dict[asyncutils._internal.types.Openable, Literal['r+b', 'w+b', 'x+b']]

Dictionary mapping file paths as passed to open(), create() or create_sparsef() to the mode the file was opened with.

async asyncutils.io.ainput(prompt: str = ..., assert_tty: bool = ...) str

Asynchronously write prompt to standard output and read a line from standard input, returning it without the trailing newline. If assert_tty is True, raise OSError if standard input is not a TTY (you need only pass this the first time you call the function).
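The prompt-then-read pattern can be sketched by running sys.stdin.readline() in the default executor. ainput_sketch is a hypothetical name; it omits the assert_tty check, and raising EOFError at end of input is an assumption borrowed from the built-in input().

```python
import asyncio
import sys


async def ainput_sketch(prompt=""):
    """Hypothetical stand-in: prompt on stdout and read one line from stdin in an executor."""
    loop = asyncio.get_running_loop()
    sys.stdout.write(prompt)
    sys.stdout.flush()
    # readline() blocks, so it runs in the default executor instead of the event loop.
    line = await loop.run_in_executor(None, sys.stdin.readline)
    if not line:
        raise EOFError("EOF when reading a line")  # mirrors the built-in input()
    return line[:-1] if line.endswith("\n") else line
```

Note that cancelling the await does not interrupt the worker thread, which stays blocked in readline() until a line or EOF arrives.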

asyncutils.io.double_ended_binary_pipe(
*,
pipe_impl: collections.abc.Callable[[], tuple[int, int]] = ...,
) tuple[AsyncReadWriteCouple[bytes, bytes], AsyncReadWriteCouple[bytes, bytes]]

Like double_ended_text_pipe(), but in binary mode.

asyncutils.io.double_ended_text_pipe(
*,
pipe_impl: collections.abc.Callable[[], tuple[int, int]] = ...,
) tuple[AsyncReadWriteCouple[str, str], AsyncReadWriteCouple[str, str]]

Return a tuple of two AsyncReadWriteCouple instances in text mode, such that each can read what the other writes.
Two operating-system-level pipes are created.
Pass a function that returns a tuple of two integer file descriptors as pipe_impl (default os.pipe()) to customize this behaviour.
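The cross-wiring of the two pipes can be sketched with plain file objects. text_pipe_pair is a hypothetical name, and it returns (reader, writer) tuples rather than AsyncReadWriteCouple instances; it assumes pipe_impl returns (read_fd, write_fd) in the same order as os.pipe().

```python
import os


def text_pipe_pair(pipe_impl=os.pipe):
    """Hypothetical helper: two (reader, writer) ends, each reading what the other writes."""
    r1, w1 = pipe_impl()  # pipe 1: end A writes, end B reads
    r2, w2 = pipe_impl()  # pipe 2: end B writes, end A reads
    end_a = (open(r2, "r", encoding="utf-8"), open(w1, "w", encoding="utf-8"))
    end_b = (open(r1, "r", encoding="utf-8"), open(w2, "w", encoding="utf-8"))
    return end_a, end_b
```

Writes must be flushed before the other end can see them, which is what the flush() method of each couple is for.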

asyncutils.io.stdcoup: AsyncReadWriteCouple[str, str]

Instance of AsyncReadWriteCouple wrapping standard input and output.