1# ty: ignore[unresolved-attribute]
2from asyncutils._internal import helpers as H, patch as P
3from asyncutils._internal.submodules import io_all as __all__
4import asyncutils as A, errno as E, os as O, sys as S
5from _functools import partial
6from asyncio import Lock, gather
7from contextlib import asynccontextmanager
8from itertools import count, starmap
9from mmap import mmap
def f(a, b, f=S.audit, _=O.pipe, /):
    """Build a ``double_ended_pipe`` factory for one pair of open modes.

    ``a`` is the mode used to open the read ends and ``b`` the mode used to open
    the write ends. ``f`` is the audit callable and ``_`` the default pipe
    implementation; both are bound as defaults of the returned function.
    """
    # The event name depends only on ``a``, so it is computed once per factory.
    event = f'asyncutils.io.double_ended_{"text" if a == "r" else "binary"}_pipe'
    def double_ended_pipe(*, pipe_impl=_, x=partial(open, mode=a), y=partial(open, mode=b), _=f):
        """Create two pipes and cross them into a pair of ``AsyncReadWriteCouple`` objects.

        Each couple reads from one pipe and writes into the other one.
        """
        first_read, first_write, second_read, second_write = *pipe_impl(), *pipe_impl()
        _(event, first_read, second_write, second_read, first_write)
        readers = map(x, (first_read, second_read))
        writers = map(y, (second_write, first_write))
        return tuple(map(AsyncReadWriteCouple, readers, writers))
    return double_ended_pipe
# Keyword filter: returns the keyword arguments whose value is not the sentinel ``s`` (None by default).
_ = lambda s=None, /, **d: {k: v for k, v in d.items() if v is not s}
# Signature text handed to the signature patcher for both pipe factories.
s = '*, pipe_impl={}'
# One factory per mode pair: text ('r'/'w') first, binary ('rb'/'wb') second.
t = f('r', 'w'), f('rb', 'wb')
double_ended_text_pipe, double_ended_binary_pipe = t
P.patch_function_signatures(*((pipe, s) for pipe in t))
[docs]
@H.subscriptable
class AsyncReadWriteCouple(A.LoopContextMixin):
    """Present a readable stream and a writable stream as one asynchronous file-like object.

    Blocking stream calls are handed to ``self.loop.run_in_executor`` together
    with the couple's executor. Attributes not defined on the couple are looked
    up on the two wrapped streams (see ``__getattr__``).
    """
    __slots__ = '_aso', '_executor', 'reader', 'writer'
    # Class-level fallback executor, consulted when the constructor is given none.
    executor = None
    AMBIGUOUS_CALL_TIP = 'tip: delegate to reader or writer as appropriate'

    def __init__(self, r, w, /, executor=None, _=H.create_executor, *, find_attr_on_writer_first=False):
        """Wrap reader ``r`` and writer ``w``.

        The executor is, in order of preference: the ``executor`` argument, the
        class attribute ``executor``, or the result of calling ``_`` with the class.
        ``find_attr_on_writer_first`` reverses the attribute fallback order.

        Raises:
            TypeError: if ``r`` is not readable or ``w`` is not writable.
        """
        if not r.readable(): raise TypeError(f'asyncutils.io.AsyncReadWriteCouple: reader {r!r} is not readable')
        if not w.writable(): raise TypeError(f'asyncutils.io.AsyncReadWriteCouple: writer {w!r} is not writable')
        super().__init__()
        if executor is None:
            executor = (cls := type(self)).executor
            if executor is None: executor = _(cls)
        self._executor = executor
        self.reader, self.writer = r, w
        # Streams in the order in which __getattr__ consults them.
        self._aso = (w, r) if find_attr_on_writer_first else (r, w)

    async def _run(self, f, *a):
        """Run ``f(*a)`` in the couple's executor and return its result."""
        return await self.loop.run_in_executor(self._executor, f, *a)

    def read(self, n=-1, /):
        """Return an awaitable for ``reader.read(n)``."""
        return self._run(self.reader.read, n)

    def read1(self, n=-1, /):
        """Return an awaitable for ``reader.read1(n)``."""
        return self._run(self.reader.read1, n)

    def readall(self):
        """Return an awaitable for ``reader.readall()``, or ``reader.read()`` when the reader has no ``readall``."""
        reader = self.reader
        readall = getattr(reader, 'readall', None)
        return self._run(reader.read if readall is None else readall)

    async def readinto(self, b, /):
        """Read into buffer ``b`` and return the number of bytes stored.

        Uses ``reader.readinto`` when the reader has one; otherwise reads up to
        the buffer's size in bytes and copies the data in.

        Raises:
            TypeError: if the fallback path is given a read-only buffer.
        """
        if (f := getattr(r := self.reader, 'readinto', None)) is not None: return await self._run(f, b)
        if (m := memoryview(b)).readonly: raise TypeError('asyncutils.io.AsyncReadWriteCouple: cannot read into a read-only buffer')
        # View the target as flat unsigned bytes so that slice positions are byte positions
        # and assigning a bytes object is structurally valid. This happens before any data
        # is consumed, so an unsuitable buffer cannot cause data loss.
        if m.ndim != 1 or m.format != 'B': m = m.cast('B')
        if l := len(d := await self._run(r.read, m.nbytes)): m[:l] = d
        return l

    def readinto1(self, b, /):
        """Return an awaitable for ``reader.readinto1(b)``."""
        return self._run(self.reader.readinto1, b)

    def readline(self, limit=-1, /):
        """Return an awaitable for ``reader.readline(limit)``."""
        return self._run(self.reader.readline, limit)

    def readlines(self, hint=-1, /):
        """Return an awaitable for ``reader.readlines(hint)``."""
        return self._run(self.reader.readlines, hint)

    def write(self, s, /):
        """Return an awaitable for ``writer.write(s)``."""
        return self._run(self.writer.write, s)

    def writelines(self, lines, /):
        """Return an awaitable for ``writer.writelines(lines)``."""
        return self._run(self.writer.writelines, lines)

    def fileno(self):
        """Always raise ``OSError`` (EBADF): the couple has two descriptors, not one."""
        A.raise_exc(OSError, E.EBADF, 'asyncutils.io.AsyncReadWriteCouple: ambiguous file descriptor query', notes=self.AMBIGUOUS_CALL_TIP)

    def isatty(self):
        """Always raise ``OSError`` (ENOTSUP): the answer may differ between the two streams."""
        A.raise_exc(OSError, E.ENOTSUP, 'asyncutils.io.AsyncReadWriteCouple: ambiguous isatty query', notes=self.AMBIGUOUS_CALL_TIP)

    # Both capabilities were validated by the constructor.
    readable = writable = lambda _, /: True

    def flush(self):
        """Return an awaitable for ``writer.flush()``."""
        return self._run(self.writer.flush)

    def seekable(self):
        """Return False; see ``seek``."""
        return False

    def seek(self, offset, whence=0, /): A.raise_exc(OSError, E.ESPIPE, 'asyncutils.io.AsyncReadWriteCouple: cannot use seek on read-write couple', notes=self.AMBIGUOUS_CALL_TIP) # noqa: ARG002

    def tell(self):
        """Always raise ``OSError`` (ESPIPE)."""
        A.raise_exc(OSError, E.ESPIPE, 'asyncutils.io.AsyncReadWriteCouple: tell is not supported', notes=self.AMBIGUOUS_CALL_TIP)

    def truncate(self, size=None, /):
        """Return an awaitable for ``writer.truncate(size)``."""
        return self._run(self.writer.truncate, size)

    async def aclose(self):
        """Close both streams in the executor, then shut the executor down."""
        # The shutdown sits in ``finally`` so a failing close() cannot leak the executor.
        try: await gather(*map(self._run, (self.reader.close, self.writer.close)))
        finally: self._executor.shutdown()
    __cleanup__ = aclose

    @property
    def closed(self):
        """True only when both the reader and the writer report being closed."""
        return self.reader.closed and self.writer.closed

    def __getattr__(self, n, /):
        """Look ``n`` up on the wrapped streams, in the order chosen at construction.

        Raises:
            AttributeError: if neither stream has the attribute. The individual
                lookup failures are attached as an ``ExceptionGroup`` in ``__cause__``.
        """
        msg = f'asyncutils.io.AsyncReadWriteCouple: did not find attribute {n!r}'
        # Read the slot without re-entering __getattr__: on a partially initialised
        # instance ``self._aso`` would otherwise recurse without bound.
        try: streams = object.__getattribute__(self, '_aso')
        except AttributeError: raise AttributeError(msg, name=n, obj=self) from None
        errors = []
        for stream in streams:
            try: return getattr(stream, n)
            except AttributeError as e: errors.append(e)
        # hasattr() and getattr() with a default only recognise AttributeError, so that
        # is what is raised; the per-stream errors are kept as the cause.
        raise AttributeError(msg, name=n, obj=self) from ExceptionGroup(msg, errors)
class File(A.LoopContextMixin): # noqa: PLR0904
    """Memory-mapped access to an already-open binary file object.

    Usable subclasses are created with the class keywords ``m`` and ``r`` (see
    ``__init_subclass__``), which supply the ``mgr`` registry, the ``run``
    dispatcher and the ``open_files`` table used throughout this class.
    Offsets and sizes are byte positions in the mapping.
    """
    __slots__ = '_f', '_fn', '_mmap'
    if S.platform != 'win32':
        def madvise(self, option, start=0, length=None, _=H.filter_out):
            """Forward to ``mmap.madvise`` on the mapping; ``length`` is passed through ``_`` first."""
            return self._mmap.madvise(option, start, *_(length))

    def read(self, offset=0, size=-1):
        """Return an awaitable for the bytes at ``offset``; a negative ``size`` reads to the end."""
        return self.run(self._read, offset, size)

    def write(self, data, offset=0):
        """Return an awaitable that stores ``data`` at ``offset``, growing the mapping if needed."""
        return self.run(self._write, data, offset)

    async def readline(self, offset=0, size=None, incl_newline=False):
        """Return the line starting at ``offset``, scanning at most ``size`` bytes when given."""
        return (await self.run(self._readline, offset, size, incl_newline))[0]

    async def readlines(self, hint=-1):
        """Return the lines of the mapping as a list (see ``_readlines`` for ``hint``)."""
        # list() is what drives the generator, so it is the call that is dispatched.
        return await self.run(list, self._readlines(hint))

    async def flush(self, offset=0, size=None, /):
        """Flush the file object and the mapping."""
        return await self.run(self._flush, offset, size)

    def move(self, dest, src, count):
        """Return an awaitable for ``mmap.move(dest, src, count)``."""
        return self.run(self._mmap.move, dest, src, count)

    async def __setup__(self):
        """Map the whole file (length 0, access=2) and register the mapping with ``mgr``."""
        self._mmap = m = mmap(self._fn, 0, access=2).__enter__()
        self.mgr.add(m)

    async def __cleanup__(self):
        """Close the mapping and the file, then drop the mapping from ``mgr``."""
        await self.aclose()
        self.mgr.discard(self._mmap)

    def seek(self, pos, whence=0):
        """Return an awaitable for ``mmap.seek(pos, whence)``."""
        return self.run(self._mmap.seek, pos, whence)

    def __new__(cls, file, /):
        """Return the instance registered in ``open_files`` for ``file`` under any of the modes, else a new one."""
        lookup = cls.open_files.get
        for mode in ('r+b', 'w+b', 'x+b'):
            if (existing := lookup((file, mode))) is not None: return existing
        inst = super().__new__(cls)
        inst._f, inst._fn = file, file.fileno()
        return inst

    def __iter__(self): return self._f.__iter__()
    def __aiter__(self): return A.iter_to_agen(self._f)
    def __del__(self): self.make(self.aclose())

    @property
    def closed(self):
        """Whether the underlying file object is closed."""
        return self._f.closed

    def fileno(self): return self._fn

    def sync(self, _=O.fsync):
        """Flush synchronously, then call ``_`` (``os.fsync`` by default) on the descriptor."""
        self._flush(0, None)
        _(self._fn)

    async def aclose(self):
        """Close the mapping and the file object through ``run``."""
        await gather(*map(self.run, (self._mmap.close, self._f.close)))

    def close(self):
        """Close the mapping and the file object synchronously."""
        self._mmap.close()
        self._f.close()

    def read_byte(self): return self._mmap.read_byte()
    def write_byte(self, b, /): self._mmap.write_byte(b)
    def resize(self, newsize): self._mmap.resize(newsize)

    def _span(self, start, end):
        """Build the positional ``start``/``end`` arguments for ``mmap.find`` and ``mmap.rfind``."""
        if end is None: return () if start is None else (start,)
        # ``end`` cannot be passed without ``start``; mmap's own default start is the current position.
        return (self._mmap.tell() if start is None else start, end)

    # mmap.find/rfind take their bounds positionally, so the bounds are not passed as keywords.
    # The trailing ``_`` parameter is unused and kept only so the signature stays unchanged.
    def find(self, sub, start=None, end=None, _=_): # noqa: ARG002
        """Return the lowest index of ``sub`` within the optional bounds, or -1."""
        return self._mmap.find(sub, *self._span(start, end))

    def rfind(self, sub, start=None, end=None, _=_): # noqa: ARG002
        """Return the highest index of ``sub`` within the optional bounds, or -1."""
        return self._mmap.rfind(sub, *self._span(start, end))

    def tell(self): return self._mmap.tell()
    def size(self): return self._mmap.size()
    def isatty(self): return self._f.isatty()
    readable = writable = seekable = AsyncReadWriteCouple.readable

    def _flush(self, offset, size, _=H.filter_out):
        """Flush the file object, then the mapping from ``offset``; ``size`` is passed through ``_`` first."""
        self._f.flush()
        self._mmap.flush(offset, *_(size))

    def _trunc_from(self, data, offset):
        """Resize the mapping so ``data`` fits at ``offset``; return the end index of the write.

        The current position of the mapping is restored afterwards.
        """
        m = self._mmap
        saved = m.tell()
        m.seek(0, 2)
        end = offset+len(data)
        m.resize(max(m.tell(), end))
        m.seek(saved)
        return end

    def _read(self, offset, size):
        return self._mmap[offset:None if size < 0 else offset+size]

    def _write(self, data, offset):
        m = self._mmap
        m[offset:self._trunc_from(data, offset)] = data
        m.flush()

    def _readline(self, offset, size, incl_newline):
        """Return ``(line, stop)`` for the line starting at ``offset``; ``stop`` is the index where the line ends."""
        m = self._mmap
        total = len(m)
        if offset >= total: return b'', 0
        limit = total if size is None else min(offset+size, total)
        newline = m.find(b'\n', offset, limit)
        stop = limit if newline == -1 else newline+incl_newline
        return m[offset:stop], stop

    def _readlines(self, hint, /):
        """Yield the lines of the mapping in order, without their newline terminators.

        A negative ``hint`` yields every line. Otherwise lines are yielded while
        the number of bytes consumed so far (terminators included) is below
        ``hint``, so a ``hint`` of 0 yields nothing.
        """
        m = self._mmap
        offset, end, budget = 0, len(m), hint
        while offset < end and (hint < 0 or budget > 0):
            newline = m.find(b'\n', offset, end)
            stop = end if newline == -1 else newline
            yield m[offset:stop]
            # Step past the terminator so that every iteration makes progress.
            budget -= stop+1-offset
            offset = stop+1

    async def writelines(self, lines, /, *, sep=b'', minimize_writes=None):
        """Write ``lines`` from offset 0, joined by ``sep``.

        With ``minimize_writes`` (default taken from the context) everything is
        joined and written in one call; otherwise each item is written in turn
        at the offset following the previous one.
        """
        write, lines = self.write, A.iter_to_agen(lines)
        if minimize_writes is None: minimize_writes = A.getcontext().MEMORY_MAPPED_IO_MANAGER_DEFAULT_MINIMIZE_WRITES
        if minimize_writes: return await write(sep.join(await A.to_list(lines)))
        offset, first = 0, True
        async for line in lines:
            # The separator goes between items only, matching the joined form above.
            if sep and not first:
                await write(sep, offset)
                offset += len(sep)
            await write(line, offset)
            offset += len(line)
            first = False

    async def read_str(self, offset=0, size=-1, encoding='utf-8', errors='strict'):
        return (await self.read(offset, size)).decode(encoding, errors)

    def write_str(self, text, offset=0, encoding='utf-8', errors='strict'):
        return self.write(text.encode(encoding, errors), offset)

    def smart_write(self, data, offset=0, encoding='utf-8', errors='strict'):
        """Like ``write``, but a ``str`` is encoded first."""
        return self.write(data.encode(encoding, errors) if isinstance(data, str) else data, offset)

    async def copy_range(self, src_offset, dest_offset, size):
        """Copy ``size`` bytes inside the mapping; return True on success, False on failure."""
        try:
            await self.write(await self.read(src_offset, size), dest_offset)
            return True
        # Best effort, but cancellation and interpreter exit must still propagate.
        except Exception: return False # noqa: BLE001

    def fill(self, pattern, offset=0, count=1): return self.write(pattern*count, offset)

    async def compare(self, other, /, size=-1, offset_self=0, offset_other=0):
        return (await self.read(offset_self, size)) == (await other.read(offset_other, size))

    async def hamming_dist(self, other, /, size=-1, offset_self=0, offset_other=0, _=tuple(map(int.bit_count, range(0x100)))): # noqa: B008
        """Count differing bits between the two ranges; ``_`` is a per-byte popcount table."""
        mine, theirs = await self.read(offset_self, size), await other.read(offset_other, size)
        return sum(_[i^j] for i, j in zip(mine, theirs, strict=size > 0))

    async def hamming_dist_bytes(self, other, /, size=-1, offset_self=0, offset_other=0):
        """Count differing bytes between the two ranges."""
        mine, theirs = await self.read(offset_self, size), await other.read(offset_other, size)
        return sum(i != j for i, j in zip(mine, theirs, strict=size > 0))

    async def read_until(self, delim, offset=0, maxsize=-1):
        """Return ``(data, next_offset)``, reading through ``delim`` when it occurs within the range."""
        data = await self.read(offset, maxsize)
        pos = data.find(delim)
        if pos == -1: return data, offset+len(data)
        stop = pos+len(delim)
        return data[:stop], offset+stop

    async def insert(self, data, offset):
        """Insert ``data`` at ``offset``, moving the existing tail after it."""
        await self.write(data if offset > await self.run(self.size) else data+await self.read(offset), offset)

    async def delete(self, offset, size):
        """Remove up to ``size`` bytes starting at ``offset`` and shrink the mapping accordingly."""
        if size <= 0 or offset >= (total := await self.run(self._mmap.size)): return
        # Never remove more than exists past ``offset``; otherwise bytes before it would be cut off.
        size = min(size, total-offset)
        if (tail := offset+size) < total: await self.write(await self.read(tail), offset)
        await self.run(self.resize, max(0, total-size))

    async def replace(self, old, new, offset=0, count=None):
        """Replace up to ``count`` occurrences of ``old`` with ``new`` from ``offset``; return how many were replaced."""
        done, cursor, find = 0, offset, partial(self.run, self.find, old)
        old_len, new_len = len(old), len(new)
        if count is None: count = float('inf')
        while done < count:
            if (pos := await find(cursor)) == -1: break
            await self.delete(pos, old_len)
            await self.insert(new, pos)
            done += 1
            cursor = pos+new_len
        return done

    async def search_lazy(self, pattern, offset=0):
        """Yield the index of every occurrence of ``pattern`` from ``offset``, overlapping ones included."""
        find = partial(self.run, self.find, pattern)
        # ``pos >= offset`` ends the scan if the search start was clamped to the end of the map.
        while (pos := await find(offset)) != -1 and pos >= offset:
            yield pos
            offset = pos+1

    async def search_lazy_nonoverlapping(self, pattern, offset=0):
        """Yield the index of every non-overlapping occurrence of ``pattern`` from ``offset``."""
        find, step = partial(self.run, self.find, pattern), max(1, len(pattern))
        while (pos := await find(offset)) != -1 and pos >= offset:
            yield pos
            offset = pos+step

    def search(self, pattern, offset=0, max_results=None):
        return A.collect(self.search_lazy(pattern, offset), max_results)

    def search_nonoverlapping(self, pattern, offset=0, max_results=None):
        return A.collect(self.search_lazy_nonoverlapping(pattern, offset), max_results)

    async def compact(self):
        """Drop trailing zero bytes and return how many were removed.

        Returns None without resizing when the content is empty or all zero.
        """
        data = await self.read()
        kept = len(data.rstrip(b'\0'))
        if kept:
            await self.run(self.resize, kept)
            return len(data)-kept

    def __init_subclass__(cls, *, m, r):
        """Bind the registry ``m``, a ``run`` dispatcher around ``r`` and a fresh ``open_files`` dict to ``cls``."""
        @staticmethod
        async def run(f, /, *a, r=r): return await r(f, *a)
        cls.mgr, cls.run, cls.open_files = m, run, {}
[docs]
class MemoryMappedIOManager(A.LoopContextMixin):
    """Open files as memory-mapped ``File`` objects and run bulk operations over them.

    Each manager builds its own ``File`` subclass, which carries the registry of
    live mappings, the executor-backed ``run`` dispatcher and the table of open
    files keyed by ``(path, mode)``.
    """
    __slots__ = '_factory', '_lock'

    def __init__(self, executor=None, _f=(File,), _=H.create_executor):
        """Create the per-manager ``File`` subclass; ``executor`` defaults to ``_(self, False)``."""
        super().__init__()
        if executor is None: executor = _(self, False)
        self._factory = type('_factory', _f, {}, m=__import__('_weakrefset').WeakSet(), r=partial(self.loop.run_in_executor, executor))
        self._lock = Lock()

    @property
    def open_mmaps(self):
        """The registry of mappings held by the factory class."""
        return self._factory.mgr

    def _run(self, f, /, *a):
        """Dispatch ``f(*a)`` through the factory's ``run``."""
        return self._factory.run(f, *a)

    @property
    def currently_open(self):
        """Number of registered mappings."""
        return len(self.open_mmaps)

    @property
    def open_paths(self):
        """Dict built from the ``(path, mode)`` keys of ``open_files``."""
        return dict(self.open_files.keys())

    @property
    def open_files(self):
        """The ``(path, mode)`` -> ``File`` table of the factory class; deleting it clears the table."""
        return self._factory.open_files

    @open_files.deleter
    def open_files(self): self.open_files.clear()

    @asynccontextmanager
    async def _open(self, s, /, *k):
        """Yield the ``File`` for key ``k`` = ``(path, mode)``, opening it when not already open.

        A positive ``s`` truncates the freshly opened file to that size before it is mapped.
        """
        table = self.open_files
        if (existing := table.get(k)):
            yield existing
            return
        run = self._run
        with await run(open, *k) as f:
            if s > 0: await run(f.truncate, s)
            async with self._factory(f) as mapped:
                table[k] = mapped
                try: yield mapped
                finally: table.pop(k, None)

    def open(self, path, init_size=0):
        """Context manager opening the existing file ``path`` in mode ``'r+b'``."""
        return self._open(init_size, path, 'r+b')

    def create(self, path, init_size=0, *, exclusive=True):
        """Context manager creating ``path`` in mode ``'x+b'``, or ``'w+b'`` when not ``exclusive``."""
        return self._open(init_size, path, 'x+b' if exclusive else 'w+b')

    async def __cleanup__(self):
        """Under the lock: empty the mapping registry, close every open file and clear the table."""
        async with self._lock:
            self.open_mmaps.clear()
            # gather() needs awaitables, so the asynchronous close is the one used here.
            await gather(*(f.aclose() for f in self.open_files.values()))
            del self.open_files

    def __del__(self): self.make(self.__cleanup__())

    async def copy_file(self, srcp, destp, *, flush=False):
        """Copy the content of ``srcp`` into the newly created file ``destp``."""
        async with self.open(srcp) as src:
            data = await src.read()
            # The destination is created at its final size: an empty file cannot be memory-mapped.
            async with self.create(destp, len(data)) as dest:
                await dest.write(data)
                if flush: await dest.flush()

    async def checksum(self, path, alg=None, _=partial(__import__('hashlib').new, usedforsecurity=False)):
        """Return the hex digest of the file's content; ``alg`` defaults to the context's algorithm."""
        if alg is None: alg = A.getcontext().MEMORY_MAPPED_IO_MANAGER_DEFAULT_CHECKSUM_ALG
        async with self.open(path) as f:
            # Parenthesised so that the hash object, not the pending call, receives .hexdigest().
            return (await self._run(_, alg, await f.read())).hexdigest()

    async def approx_memory_usage(self):
        """Return the sum of ``size()`` over the registered mappings, computed under the lock."""
        async with self._lock: return await self._run(self._memusage_helper)

    def _memusage_helper(self): return sum(m.size() for m in self.open_mmaps)

    @asynccontextmanager
    async def prefetch_files(self, *P, init_size=0, _=S.exc_info):
        """Open all given paths concurrently and yield the list of ``File`` objects."""
        managers = tuple(map(partial(self.open, init_size=init_size), P))
        try: yield await gather(*(c.__aenter__() for c in managers))
        finally:
            exc = _()
            await gather(*(c.__aexit__(*exc) for c in managers))

    @asynccontextmanager
    async def create_sparsef(self, path, total_size, chunks):
        """Create ``path`` with ``total_size`` bytes, write each ``offset -> data`` chunk, then yield the file."""
        async with self.create(path, total_size) as f:
            write = f.smart_write
            for offset, data in chunks.items(): await write(data, offset)
            yield f

    async def _bulk_reader(self, path, offsets):
        """Return ``(path, chunks)``: the whole content when ``offsets`` is None, else one chunk per ``(offset, size)``."""
        chunks = []
        async with self.open(path) as f:
            read = f.read
            if offsets is None: chunks.append(await read())
            else:
                for offset, size in offsets: chunks.append(await read(offset, size))
        return path, chunks

    async def _bulk_writer(self, path, data):
        """Apply every argument tuple in ``data`` to ``File.write`` on ``path``, concurrently."""
        async with self.open(path) as f: await gather(*starmap(f.write, data))

    async def _bulk_creator(self, path, size, chunks, exclusive):
        """Create ``path`` with ``size`` bytes and write each ``offset -> data`` chunk."""
        async with self.create(path, size, exclusive=exclusive) as f:
            write = f.smart_write
            for offset, data in chunks.items(): await write(data, offset)

    async def _checksum_helper(self, alg, path): return path, await self.checksum(path, alg)

    async def _resize_helper(self, path, size):
        async with self.open(path) as f: await self._run(f.resize, size)

    async def _compact_helper(self, path):
        async with self.open(path) as f: await f.compact()

    async def bulk_read(self, file_offsets):
        """Read several files concurrently; return ``{path: chunks}``."""
        return dict(await gather(*starmap(self._bulk_reader, file_offsets.items())))

    async def bulk_write(self, file_data):
        """Write to several files concurrently."""
        await gather(*starmap(self._bulk_writer, file_data.items()))

    async def bulk_checksum(self, paths, alg=None):
        """Return ``{path: hex digest}`` for every path."""
        if alg is None: alg = A.getcontext().MEMORY_MAPPED_IO_MANAGER_DEFAULT_CHECKSUM_ALG
        return dict(await gather(*map(partial(self._checksum_helper, alg), paths)))

    async def bulk_copy(self, pairs):
        """Run ``copy_file`` for every ``(source, destination)`` pair concurrently."""
        await gather(*starmap(self.copy_file, pairs))

    async def bulk_resize(self, sizes):
        """Resize every path in the ``path -> size`` mapping concurrently."""
        await gather(*starmap(self._resize_helper, sizes.items()))

    async def compact_files(self, paths):
        """Run ``File.compact`` on every path concurrently."""
        await gather(*map(self._compact_helper, paths))

    async def find_in_files(self, pattern, paths, max_per_file=None, *, allow_overlapping=False):
        """Search each file of the ``path -> start offset`` mapping; return only the paths with results."""
        async def searchf(p, o):
            async with self.open(p) as f:
                search = f.search if allow_overlapping else f.search_nonoverlapping
                return p, await search(pattern, o, max_per_file)
        return {k: v for k, v in await gather(*starmap(searchf, paths.items())) if v}

    P.patch_method_signatures((__init__, 'executor=None'), (prefetch_files, '*paths, init_size=0'), (_open, 'init_size, path, mode, /'))
def __getattr__(n, /, _=S, a=frozenset(('ainput', 'stdcoup')), g=globals()):
    """Create ``stdcoup`` and ``ainput`` on first access to either name.

    ``stdcoup`` couples ``sys.stdin`` with ``sys.stdout``. Both names are stored
    as module globals, so later accesses no longer reach this hook.

    Raises:
        AttributeError: for any other name.
    """
    if n not in a: raise AttributeError(f'module {__name__!r} has no attribute {n!r}')
    global ainput, stdcoup
    stdcoup = AsyncReadWriteCouple(_.stdin, _.stdout) # ty: ignore[unresolved-global]
    async def ainput(prompt='', assert_tty=False):
        """Write ``prompt`` if given, then read one line and return it without its trailing newline.

        Raises:
            OSError: (ENOTTY) if ``assert_tty`` is set and standard input is not a TTY.
            EOFError: if nothing at all could be read.
        """
        if prompt:
            await stdcoup.write(prompt)
            await stdcoup.flush()
        if assert_tty and not stdcoup.reader.isatty(): raise OSError(E.ENOTTY, 'asyncutils.io.ainput: standard input is not a TTY')
        # Only an empty read means end of input; a final line without a newline is still returned.
        if not (d := await stdcoup.readline()): raise EOFError
        return d.removesuffix('\n')
    return g[n]
# Remove construction-time helpers from the module namespace. The definitions above
# captured whatever they need from these names through default arguments.
del H, P, S, O
del f, _, t
del File