Source code for asyncutils.io

  1# ty: ignore[unresolved-attribute]
  2from asyncutils._internal import helpers as H, patch as P
  3from asyncutils._internal.submodules import io_all as __all__
  4import asyncutils as A, errno as E, os as O, sys as S
  5from _functools import partial
  6from asyncio import Lock, gather
  7from contextlib import asynccontextmanager
  8from itertools import count, starmap
  9from mmap import mmap
 10def f(a, b, f=S.audit, _=O.pipe, /):
 11    def double_ended_pipe(*, pipe_impl=_, x=partial(open, mode=a), y=partial(open, mode=b), _=f): r, W, R, w = *pipe_impl(), *pipe_impl(); _(f'asyncutils.io.double_ended_{"text" if a == "r" else "binary"}_pipe', r, w, R, W); return tuple(map(AsyncReadWriteCouple, map(x, (r, R)), map(y, (w, W))))
 12    return double_ended_pipe
 13_, s = lambda s=None, /, **d: {k: v for k, v in d.items() if v is not s}, '*, pipe_impl={}'
 14double_ended_text_pipe, double_ended_binary_pipe = t = tuple(map(f, ('r', 'rb'), ('w', 'wb')))
 15P.patch_function_signatures(*((_, s) for _ in t))
@H.subscriptable
class AsyncReadWriteCouple(A.LoopContextMixin):
    """Expose a readable stream and a writable stream as one asynchronous object.

    Blocking stream calls are run through ``self.loop.run_in_executor`` on the
    couple's executor.  Attributes not defined on this class are looked up on
    the wrapped streams, in the order chosen by ``find_attr_on_writer_first``.
    """
    __slots__ = '_aso', '_executor', 'reader', 'writer'
    # Class-level fallback executor, consulted when the constructor gets none.
    executor = None
    AMBIGUOUS_CALL_TIP = 'tip: delegate to reader or writer as appropriate'

    def __init__(self, r, w, /, executor=None, _=H.create_executor, *, find_attr_on_writer_first=False):
        """Wrap reader ``r`` and writer ``w``.

        Raises:
            TypeError: if ``r.readable()`` or ``w.writable()`` is false.
        """
        if not r.readable(): raise TypeError(f'asyncutils.io.AsyncReadWriteCouple: reader {r!r} is not readable')
        if not w.writable(): raise TypeError(f'asyncutils.io.AsyncReadWriteCouple: writer {w!r} is not writable')
        super().__init__()
        if executor is None:
            # Fall back to the class-level executor, then to a newly created one.
            executor = (t := type(self)).executor
            if executor is None: executor = _(t)
        self._executor, self.reader, self.writer = executor, r, w
        # Attribute search order used by __getattr__.
        self._aso = (w, r) if find_attr_on_writer_first else (r, w)

    async def _run(self, f, *a):
        """Call ``f(*a)`` in the couple's executor and return its result."""
        return await self.loop.run_in_executor(self._executor, f, *a)

    def read(self, n=-1, /):
        """Awaitable for ``reader.read(n)``."""
        return self._run(self.reader.read, n)

    def read1(self, n=-1, /):
        """Awaitable for ``reader.read1(n)``."""
        return self._run(self.reader.read1, n)

    def readall(self):
        """Awaitable for ``reader.readall()``, or ``reader.read()`` if the reader has no ``readall``."""
        r = self.reader
        f = getattr(r, 'readall', None)
        return self._run(r.read if f is None else f)

    async def readinto(self, b, /):
        """Read into buffer ``b`` and return the number of items stored.

        Uses ``reader.readinto`` when the reader has one; otherwise reads
        ``memoryview(b).nbytes`` items with ``reader.read`` and copies them in.

        Raises:
            TypeError: in the fallback path, if ``b`` is read-only.
        """
        r = self.reader
        if (f := getattr(r, 'readinto', None)) is not None: return await self._run(f, b)
        if (m := memoryview(b)).readonly: raise TypeError('asyncutils.io.AsyncReadWriteCouple: cannot read into a read-only buffer')
        d = await self._run(r.read, m.nbytes)
        if l := len(d): m[:l] = d
        return l

    def readinto1(self, b, /):
        """Awaitable for ``reader.readinto1(b)``."""
        return self._run(self.reader.readinto1, b)

    def readline(self, limit=-1, /):
        """Awaitable for ``reader.readline(limit)``."""
        return self._run(self.reader.readline, limit)

    def readlines(self, hint=-1, /):
        """Awaitable for ``reader.readlines(hint)``."""
        return self._run(self.reader.readlines, hint)

    def write(self, s, /):
        """Awaitable for ``writer.write(s)``."""
        return self._run(self.writer.write, s)

    def writelines(self, lines, /):
        """Awaitable for ``writer.writelines(lines)``."""
        return self._run(self.writer.writelines, lines)

    def fileno(self):
        """Always raise ``OSError(EBADF)``: a couple wraps two streams, so the query is ambiguous."""
        A.raise_exc(OSError, E.EBADF, 'asyncutils.io.AsyncReadWriteCouple: ambiguous file descriptor query', notes=self.AMBIGUOUS_CALL_TIP)

    def isatty(self):
        """Always raise ``OSError(ENOTSUP)``: the query is ambiguous for a couple."""
        A.raise_exc(OSError, E.ENOTSUP, 'asyncutils.io.AsyncReadWriteCouple: ambiguous isatty query', notes=self.AMBIGUOUS_CALL_TIP)

    # A couple is always both readable and writable (checked in __init__).
    readable = writable = lambda _, /: True

    def flush(self):
        """Awaitable for ``writer.flush()``."""
        return self._run(self.writer.flush)

    def seekable(self):
        """Couples are never seekable."""
        return False

    def seek(self, offset, whence=0, /):  # noqa: ARG002
        """Always raise ``OSError(ESPIPE)``."""
        A.raise_exc(OSError, E.ESPIPE, 'asyncutils.io.AsyncReadWriteCouple: cannot use seek on read-write couple', notes=self.AMBIGUOUS_CALL_TIP)

    def tell(self):
        """Always raise ``OSError(ESPIPE)``."""
        A.raise_exc(OSError, E.ESPIPE, 'asyncutils.io.AsyncReadWriteCouple: tell is not supported', notes=self.AMBIGUOUS_CALL_TIP)

    def truncate(self, size=None, /):
        """Awaitable for ``writer.truncate(size)``."""
        return self._run(self.writer.truncate, size)

    async def aclose(self):
        """Close both streams in the executor, then shut the executor down."""
        await gather(*map(self._run, (self.reader.close, self.writer.close)))
        self._executor.shutdown()

    __cleanup__ = aclose

    @property
    def closed(self):
        """True only when both the reader and the writer report ``closed``."""
        return self.reader.closed and self.writer.closed

    def __getattr__(self, n, /):
        """Look ``n`` up on the wrapped streams in the configured order.

        Raises:
            AttributeError: if the lookup order itself (``_aso``) is not set yet.
            ExceptionGroup: wrapping each stream's ``AttributeError`` when
                neither stream has the attribute.
        """
        # If the `_aso` slot is unset (e.g. __init__ raised before assigning
        # it), reading `self._aso` below would re-enter this method forever.
        if n == '_aso': raise AttributeError(f'{type(self).__name__!r} object has no attribute {n!r}')
        failures = []
        for stream in self._aso:
            try: return getattr(stream, n)
            except AttributeError as e: failures.append(e)
        raise ExceptionGroup(f'asyncutils.io.AsyncReadWriteCouple: did not find attribute {n!r}', failures) from None
class File(A.LoopContextMixin):  # noqa: PLR0904
    """Asynchronous view of an open binary file backed by a memory map.

    Concrete subclasses must be created with the class keywords ``m`` (the
    collection that tracks live maps) and ``r`` (an awaitable-returning
    runner); see ``__init_subclass__``.  Public operations that touch the map
    go through ``self.run``.
    """
    __slots__ = '_f', '_fn', '_mmap'
    if S.platform != 'win32':
        def madvise(self, option, start=0, length=None, _=H.filter_out):
            """Forward to ``mmap.madvise``; ``length`` is passed through ``H.filter_out`` first."""
            # Fixed: the map lives in the `_mmap` slot; there is no `mmap` attribute.
            return self._mmap.madvise(option, start, *_(length))

    def read(self, offset=0, size=-1):
        """Awaitable for the bytes at ``[offset, offset+size)``; a negative ``size`` reads to the end."""
        return self.run(self._read, offset, size)

    def write(self, data, offset=0):
        """Awaitable that stores ``data`` at ``offset``, growing the map if needed."""
        return self.run(self._write, data, offset)

    async def readline(self, offset=0, size=None, incl_newline=False):
        """Return one line starting at ``offset``, scanning at most ``size`` bytes when given."""
        return (await self.run(self._readline, offset, size, incl_newline))[0]

    async def readlines(self, hint=-1):
        """Return the lines of the map as a list (see ``_readlines`` for ``hint``)."""
        return list(await self.run(self._readlines, hint))

    async def flush(self, offset=0, size=None, /):
        """Flush the file object and then the map."""
        return await self.run(self._flush, offset, size)

    def move(self, dest, src, count):
        """Awaitable for ``mmap.move(dest, src, count)``."""
        return self.run(self._mmap.move, dest, src, count)

    async def __setup__(self):
        # access=2 is mmap.ACCESS_WRITE; length 0 maps the whole file.
        self._mmap = m = mmap(self._fn, 0, access=2).__enter__()
        self.mgr.add(m)

    async def __cleanup__(self):
        await self.aclose()
        self.mgr.discard(self._mmap)

    def seek(self, pos, whence=0):
        """Awaitable for ``mmap.seek(pos, whence)``."""
        return self.run(self._mmap.seek, pos, whence)

    def __new__(cls, file, /):
        """Reuse an instance registered in ``open_files`` for ``file`` under any mode, else create one."""
        lookup = cls.open_files.get
        for mode in ('r+b', 'w+b', 'x+b'):
            if (existing := lookup((file, mode))) is not None: return existing
        fn = file.fileno()
        self = super().__new__(cls)
        self._f, self._fn = file, fn
        return self

    def __iter__(self): return self._f.__iter__()
    def __aiter__(self): return A.iter_to_agen(self._f)
    def __del__(self): self.make(self.aclose())

    @property
    def closed(self):
        """Whether the underlying file object is closed."""
        return self._f.closed

    def fileno(self):
        """File descriptor captured when the instance was created."""
        return self._fn

    def sync(self, _=O.fsync):
        """Flush the file object and map, then ``fsync`` the descriptor (blocking)."""
        self._flush(0, None)
        _(self._fn)

    async def aclose(self):
        """Close the map and the file object through ``self.run``."""
        await gather(*map(self.run, (self._mmap.close, self._f.close)))

    def close(self):
        """Synchronously close the map and then the file object."""
        self._mmap.close()
        self._f.close()

    def read_byte(self): return self._mmap.read_byte()
    def write_byte(self, b, /): self._mmap.write_byte(b)
    def resize(self, newsize): self._mmap.resize(newsize)

    def _span(self, start, end):
        """Turn optional ``start``/``end`` into positional arguments for ``mmap.find``/``rfind``."""
        m = self._mmap
        if end is None: return () if start is None else (start,)
        # mmap searches default `start` to the current position, so reproduce
        # that when only `end` was supplied.
        return (m.tell() if start is None else start, end)

    def find(self, sub, start=None, end=None, _=_):
        """Lowest index of ``sub`` in the map, or -1.

        Fixed: ``mmap.find`` takes its bounds positionally only, so the old
        keyword call raised ``TypeError`` whenever a bound was supplied.  The
        ``_`` parameter is unused and kept only for signature compatibility.
        """
        return self._mmap.find(sub, *self._span(start, end))

    def rfind(self, sub, start=None, end=None, _=_):
        """Highest index of ``sub`` in the map, or -1 (same fix as ``find``)."""
        return self._mmap.rfind(sub, *self._span(start, end))

    def tell(self): return self._mmap.tell()
    def size(self): return self._mmap.size()
    def isatty(self): return self._f.isatty()
    readable = writable = seekable = AsyncReadWriteCouple.readable

    def _flush(self, offset, size, _=H.filter_out):
        self._f.flush()
        self._mmap.flush(offset, *_(size))

    def _trunc_from(self, data, offset):
        """Resize the map so ``data`` fits at ``offset``; return the end index of the write."""
        m = self._mmap
        pos = m.tell()
        m.seek(0, 2)  # jump to the end to learn the current length
        end = offset+len(data)
        m.resize(max(m.tell(), end))
        m.seek(pos)  # restore the caller's position
        return end

    def _read(self, offset, size):
        return self._mmap[offset:None if size < 0 else offset+size]

    def _write(self, data, offset):
        m = self._mmap
        m[offset:self._trunc_from(data, offset)] = data
        m.flush()

    def _readline(self, offset, size, incl_newline):
        """Return ``(line, end_index)`` for the line starting at ``offset``.

        ``(b'', 0)`` is returned when ``offset`` is at or past the end.
        ``end_index`` stops at the newline, or one past it when
        ``incl_newline`` is true.
        """
        m = self._mmap
        total = len(m)
        if offset >= total: return b'', 0
        limit = total if size is None else min(offset+size, total)
        nl = m.find(b'\n', offset, limit)
        end = limit if nl == -1 else nl+incl_newline
        return m[offset:end], end

    def _readlines(self, hint, /):
        """Yield successive lines without their newline.

        A negative ``hint`` yields every line; a positive ``hint`` stops once
        at least that many bytes have been consumed; ``0`` yields nothing.

        Fixed: the old loop always rescanned from offset 0 (never terminating
        on an empty map), and the negative-hint path iterated the map itself,
        which yields single bytes rather than lines.
        """
        unlimited = hint < 0
        offset, total = 0, len(self._mmap)
        while (unlimited or hint > 0) and offset < total:
            line, end = self._readline(offset, None, False)
            yield line
            # `end < total` means the line stopped at a newline: step over it.
            consumed = end-offset+(end < total)
            offset += consumed
            hint -= consumed

    async def writelines(self, lines, /, *, sep=b'', minimize_writes=None):
        """Write ``lines`` joined by ``sep``, starting at offset 0.

        With ``minimize_writes`` true (or, when ``None``, the context default)
        everything is joined and written once; otherwise each piece is written
        separately.

        Fixed: the piecewise path used to write every piece at offset 0, so
        pieces overwrote one another; it now advances the offset and produces
        the same content as the joined path.
        """
        write, lines = self.write, A.iter_to_agen(lines)
        if minimize_writes is None: minimize_writes = A.getcontext().MEMORY_MAPPED_IO_MANAGER_DEFAULT_MINIMIZE_WRITES
        if minimize_writes: return await write(sep.join(await A.to_list(lines)))
        offset, first = 0, True
        async for line in lines:
            if sep and not first:
                await write(sep, offset)
                offset += len(sep)
            await write(line, offset)
            offset += len(line)
            first = False

    async def read_str(self, offset=0, size=-1, encoding='utf-8', errors='strict'):
        """Read bytes and decode them."""
        return (await self.read(offset, size)).decode(encoding, errors)

    def write_str(self, text, offset=0, encoding='utf-8', errors='strict'):
        """Awaitable that encodes ``text`` and writes it at ``offset``."""
        return self.write(text.encode(encoding, errors), offset)

    def smart_write(self, data, offset=0, encoding='utf-8', errors='strict'):
        """Like ``write``, but encodes ``data`` first when it is a ``str``."""
        return self.write(data.encode(encoding, errors) if isinstance(data, str) else data, offset)

    async def copy_range(self, src_offset, dest_offset, size):
        """Copy ``size`` bytes within the file; return ``True`` on success, ``False`` on error."""
        try:
            await self.write(await self.read(src_offset, size), dest_offset)
            return True
        # Fixed: a bare `except` also swallowed task cancellation and
        # KeyboardInterrupt; only ordinary errors are reported as False now.
        except Exception: return False

    def fill(self, pattern, offset=0, count=1):
        """Awaitable that writes ``pattern`` repeated ``count`` times at ``offset``."""
        return self.write(pattern*count, offset)

    async def compare(self, other, /, size=-1, offset_self=0, offset_other=0):
        """Whether the selected ranges of the two files hold equal bytes."""
        return (await self.read(offset_self, size)) == (await other.read(offset_other, size))

    async def hamming_dist(self, other, /, size=-1, offset_self=0, offset_other=0, _=tuple(map(int.bit_count, range(0x100)))):  # noqa: B008
        """Number of differing bits; ``_`` is a precomputed popcount table for one byte."""
        return sum(_[i^j] for i, j in zip(await self.read(offset_self, size), await other.read(offset_other, size), strict=size > 0))

    async def hamming_dist_bytes(self, other, /, size=-1, offset_self=0, offset_other=0):
        """Number of differing byte positions."""
        return sum(i != j for i, j in zip(await self.read(offset_self, size), await other.read(offset_other, size), strict=size > 0))

    async def read_until(self, delim, offset=0, maxsize=-1):
        """Return ``(data, next_offset)``; ``data`` ends with ``delim`` when it was found."""
        d = await self.read(offset, maxsize)
        p = d.find(delim)
        if p == -1: return d, offset+len(d)
        l = len(delim)
        return d[:p+l], offset+p+l

    async def insert(self, data, offset):
        """Insert ``data`` at ``offset``, shifting the existing tail towards the end."""
        await self.write(data if offset > await self.run(self.size) else data+await self.read(offset), offset)

    async def delete(self, offset, size):
        """Remove up to ``size`` bytes starting at ``offset`` and shrink the file.

        Fixed: a range running past the end used to shrink the file by the
        full ``size``, cutting bytes before ``offset``; the range is now
        clamped to the end of the file.
        """
        if size <= 0 or offset >= (s := await self.run(self._mmap.size)): return
        size = min(size, s-offset)
        if (t := offset+size) < s: await self.write(await self.read(t), offset)
        await self.run(self.resize, max(0, s-size))

    async def replace(self, old, new, offset=0, count=None):
        """Replace up to ``count`` occurrences of ``old`` with ``new``; return how many were replaced."""
        r, c, o, n, f, g, h = 0, offset, len(old), len(new), partial(self.run, self.find, old), self.delete, self.insert
        if count is None: count = float('inf')
        while r < count:
            if (p := await f(c)) == -1: break
            await g(p, o)
            await h(new, p)
            r += 1
            c = p+n  # resume just after the inserted replacement
        return r

    async def search_lazy(self, pattern, offset=0):
        """Asynchronously yield the start of every (possibly overlapping) match.

        Fixed: the old loop advanced the start by one per iteration regardless
        of where the match was, so each match was yielded repeatedly.
        """
        f = partial(self.run, self.find, pattern)
        while (offset := await f(offset)) != -1:
            yield offset
            offset += 1

    async def search_lazy_nonoverlapping(self, pattern, offset=0):
        """Asynchronously yield the start of every non-overlapping match.

        Fixed: the search used to restart at the match it had just yielded and
        therefore never terminated.
        """
        f, step = partial(self.run, self.find, pattern), max(len(pattern), 1)
        while (offset := await f(offset)) != -1:
            yield offset
            offset += step

    def search(self, pattern, offset=0, max_results=None): return A.collect(self.search_lazy(pattern, offset), max_results)
    def search_nonoverlapping(self, pattern, offset=0, max_results=None): return A.collect(self.search_lazy_nonoverlapping(pattern, offset), max_results)

    async def compact(self):
        """Strip trailing zero bytes; return how many were removed.

        Returns ``None`` without resizing when the map holds no non-zero byte.

        Fixed: the new size was computed as ``bytes - int``, which raised
        ``TypeError``.
        """
        c = await self.read()
        total = len(c)
        for i in range(total):
            if c[~i]:
                await self.run(self.resize, total-i)
                return i

    def __init_subclass__(cls, *, m, r):
        """Bind a subclass to its map registry ``m`` and runner ``r``; each subclass gets its own ``open_files``."""
        @staticmethod
        async def run(f, /, *a, r=r): return await r(f, *a)
        cls.mgr, cls.run, cls.open_files = m, run, {}
class MemoryMappedIOManager(A.LoopContextMixin):
    """Open, track and batch-process memory-mapped files on an executor.

    Each manager creates its own private ``File`` subclass, so the registry of
    open files and the set of live maps are per-manager.
    """
    __slots__ = '_factory', '_lock'

    def __init__(self, executor=None, _f=(File,), _=H.create_executor):
        super().__init__()
        # The per-manager File subclass carries the live-map set (`m`) and the
        # runner (`r`) that dispatches blocking calls to the executor.
        runner = partial(self.loop.run_in_executor, _(self, False) if executor is None else executor)
        self._factory = type('_factory', _f, {}, m=__import__('_weakrefset').WeakSet(), r=runner)
        self._lock = Lock()

    @property
    def open_mmaps(self):
        """The set of live maps tracked by this manager's file class."""
        return self._factory.mgr

    def _run(self, f, /, *a):
        """Coroutine that runs ``f(*a)`` through the file class's runner."""
        return self._factory.run(f, *a)

    @property
    def currently_open(self):
        """Number of live maps."""
        return len(self.open_mmaps)

    @property
    def open_paths(self):
        """Dict built from the ``(path, mode)`` keys of ``open_files``."""
        return dict(self.open_files.keys())

    @property
    def open_files(self):
        """Registry mapping ``(path, mode)`` to the open file object."""
        return self._factory.open_files

    @open_files.deleter
    def open_files(self):
        self.open_files.clear()

    @asynccontextmanager
    async def _open(self, s, /, *k):
        """Yield the file registered under key ``k``, opening it first if necessary.

        ``k`` is ``(path, mode)``.  A positive ``s`` truncates a newly opened
        file to ``s`` bytes before it is mapped.
        """
        F = self.open_files
        if (x := F.get(k)):
            yield x
            return
        r = self._run
        with await r(open, *k) as f:
            if s > 0: await r(f.truncate, s)
            async with self._factory(f) as x:
                F[k] = x
                try: yield x
                finally: F.pop(k, None)

    def open(self, path, init_size=0):
        """Async context manager for ``path`` opened in ``'r+b'`` mode."""
        return self._open(init_size, path, 'r+b')

    def create(self, path, init_size=0, *, exclusive=True):
        """Async context manager creating ``path`` (``'x+b'`` if ``exclusive``, else ``'w+b'``)."""
        return self._open(init_size, path, 'x+b' if exclusive else 'w+b')

    async def __cleanup__(self):
        """Forget all live maps, close every registered file and clear the registry."""
        async with self._lock:
            self.open_mmaps.clear()
            # Fixed: the synchronous close() returns None, which gather()
            # rejects with TypeError; aclose() returns the awaitable it needs.
            await gather(*(f.aclose() for f in self.open_files.values()))
            del self.open_files

    def __del__(self): self.make(self.__cleanup__())

    async def copy_file(self, srcp, destp, *, flush=False):
        """Copy the whole content of ``srcp`` into a newly created ``destp``."""
        async with self.open(srcp) as src, self.create(destp) as dest:
            await dest.write(await src.read())
            if flush: await dest.flush()

    async def checksum(self, path, alg=None, _=partial(__import__('hashlib').new, usedforsecurity=False)):
        """Return the hex digest of the content of ``path``.

        ``alg`` defaults to the context's
        ``MEMORY_MAPPED_IO_MANAGER_DEFAULT_CHECKSUM_ALG``.
        """
        async with self.open(path) as f:
            name = A.getcontext().MEMORY_MAPPED_IO_MANAGER_DEFAULT_CHECKSUM_ALG if alg is None else alg
            # Fixed: `.hexdigest()` used to be applied to the un-awaited
            # coroutine, because attribute access binds tighter than `await`.
            digest = await self._run(_, name, await f.read())
            return digest.hexdigest()

    async def approx_memory_usage(self):
        """Sum of ``size()`` over all live maps, computed under the manager lock."""
        async with self._lock: return await self._run(self._memusage_helper)

    def _memusage_helper(self): return sum(m.size() for m in self.open_mmaps)

    @asynccontextmanager
    async def prefetch_files(self, *P, init_size=0, _=S.exc_info):
        """Open all the given paths concurrently and yield the list of open files."""
        l = tuple(map(partial(self.open, init_size=init_size), P))
        try: yield await gather(*(c.__aenter__() for c in l))
        finally:
            # Hand the in-flight exception (if any) to every context manager.
            t = _()
            await gather(*(c.__aexit__(*t) for c in l))

    @asynccontextmanager
    async def create_sparsef(self, path, total_size, chunks):
        """Create ``path`` with ``total_size`` bytes, write each ``offset: data`` chunk, and yield the file."""
        async with self.create(path, total_size) as f:
            g = f.smart_write
            for o, d in chunks.items(): await g(d, o)
            yield f

    async def _bulk_reader(self, path, offsets):
        """Return ``(path, chunks)``; ``offsets`` of ``None`` reads the whole file as one chunk."""
        r = []
        async with self.open(path) as f:
            read = f.read
            if offsets is None: r.append(await read())
            else:
                for o, s in offsets: r.append(await read(o, s))
        return path, r

    async def _bulk_writer(self, path, data):
        async with self.open(path) as f: await gather(*starmap(f.write, data))

    async def _bulk_creator(self, path, size, chunks, exclusive):
        async with self.create(path, size, exclusive=exclusive) as f:
            write = f.smart_write
            for o, d in chunks.items(): await write(d, o)

    async def _checksum_helper(self, alg, path): return path, await self.checksum(path, alg)

    async def _resize_helper(self, path, size):
        async with self.open(path) as f: await self._run(f.resize, size)

    async def _compact_helper(self, path):
        async with self.open(path) as f: await f.compact()

    async def bulk_read(self, file_offsets):
        """Read several files concurrently; returns ``{path: [chunk, ...]}``."""
        return dict(await gather(*starmap(self._bulk_reader, file_offsets.items())))

    async def bulk_write(self, file_data):
        """For each ``path: iterable`` entry, call ``write(*item)`` for every item concurrently."""
        await gather(*starmap(self._bulk_writer, file_data.items()))

    async def bulk_checksum(self, paths, alg=None):
        """Checksum several files concurrently; returns ``{path: hexdigest}``."""
        return dict(await gather(*map(partial(self._checksum_helper, A.getcontext().MEMORY_MAPPED_IO_MANAGER_DEFAULT_CHECKSUM_ALG if alg is None else alg), paths)))

    async def bulk_copy(self, pairs):
        """Run ``copy_file(*pair)`` concurrently for every pair."""
        await gather(*starmap(self.copy_file, pairs))

    async def bulk_resize(self, sizes):
        """Resize every ``path: size`` entry concurrently."""
        await gather(*starmap(self._resize_helper, sizes.items()))

    async def compact_files(self, paths):
        """Run ``compact()`` on every path concurrently."""
        await gather(*map(self._compact_helper, paths))

    async def find_in_files(self, pattern, paths, max_per_file=None, *, allow_overlapping=False):
        """Search ``pattern`` in each ``path: start_offset`` entry; files with no result are omitted."""
        async def searchf(p, o):
            async with self.open(p) as f: return p, await (f.search if allow_overlapping else f.search_nonoverlapping)(pattern, o, max_per_file)
        return {k: v for k, v in await gather(*starmap(searchf, paths.items())) if v}

    P.patch_method_signatures((__init__, 'executor=None'), (prefetch_files, '*paths, init_size=0'), (_open, 'init_size, path, mode, /'))
def __getattr__(n, /, _=S, a=frozenset(('ainput', 'stdcoup')), g=globals()):
    """Create the module attributes ``ainput`` and ``stdcoup`` on first access.

    Any other name raises ``AttributeError``.  Both objects are built
    together and stored as module globals.
    """
    global ainput, stdcoup
    if n in a:
        stdcoup = AsyncReadWriteCouple(_.stdin, _.stdout) # ty: ignore[unresolved-global]
        async def ainput(prompt='', assert_tty=False):
            """Write ``prompt`` and read one line from standard input.

            Raises ``OSError(ENOTTY)`` when ``assert_tty`` is set and stdin is
            not a TTY, and ``EOFError`` when the line read does not end with a
            newline.
            """
            if prompt:
                await stdcoup.write(prompt)
                await stdcoup.flush()
            if assert_tty and not stdcoup.reader.isatty():
                raise OSError(E.ENOTTY, 'asyncutils.io.ainput: standard input is not a TTY')
            line = await stdcoup.readline()
            if not line.endswith('\n'):
                raise EOFError
            return line[:-1]
        return g[n]
    raise AttributeError(f'module {__name__!r} has no attribute {n!r}')
# Drop the import aliases and build-time helpers so they are not left as module attributes.
del f, H, P, S, _, File, O, t