1import asyncutils as A, asyncio as I
2from asyncutils._internal.compat import Queue
3from asyncutils._internal.helpers import LoopMixinBase, audit_fullname, fullname
4from asyncutils._internal.log import warning
5from asyncutils._internal.submodules import networking_all as __all__
[docs]
6class LineProtocol(I.Protocol, LoopMixinBase):
7 NEWLINE, CARRIAGE_RETURN, _h = __import__('os').linesep.encode(), b'\r', A.ignore_cancellation.combined(I.InvalidStateError); __slots__ = '_buffer', '_closed', '_drain_waiter', '_eof_received', '_lines', '_paused', 'transport'
8 def __init__(self): audit_fullname(self); self._buffer, self._lines = bytearray(), Queue(); self._closed = self._paused = self._eof_received = False; self.transport = self._drain_waiter = None
9 @property
10 def connected_transport(self):
11 if (t := self.transport) is None: raise ConnectionError('asyncutils.networking.LineProtocol: no transport connected')
12 return t
[docs]
13 def connection_made(self, transport): self.transport = transport
[docs]
14 def connection_lost(self, exc):
15 if t := self.transport:
16 with self._h: t.abort()
17 self._lines.shutdown(); self._closed = True
18 if w := self._drain_waiter:
19 if not w.done(): w.set_exception(ConnectionError('asyncutils.networking.LineProtocol: transport connection lost') if exc is None else exc)
20 self._drain_waiter = None
[docs]
21 def close(self):
22 if t := self.transport:
23 with self._h: t.close(); self._closed = True
24 return self._closed
[docs]
25 def data_received(self, data, bufsize=None):
26 if bufsize is None: bufsize = A.getcontext().LINE_PROTOCOL_DEFAULT_BUFFER_SIZE
27 (b := self._buffer).extend(data); n = self.NEWLINE
28 if len(b) > bufsize: self.flush()
29 while not self._closed and n in b: l, b = b.split(n, 1); self._put_line(l)
30 self._buffer = b
31 if self._eof_received: self.flush(); self.signal_eof()
[docs]
32 def flush(self): self._put_line(b := self._buffer); b.clear()
[docs]
33 def signal_eof(self): self._lines.put_nowait(None)
[docs]
34 def pause_writing(self):
35 self._paused = True
36 if self._drain_waiter is None: self._drain_waiter = self.make_fut()
[docs]
37 def resume_writing(self):
38 self._paused = False
39 if w := self._drain_waiter:
40 if not w.done(): w.set_result(None)
41 self._drain_waiter = None
[docs]
42 def _put_line(self, data): self._lines.put_nowait(data.rstrip(self.CARRIAGE_RETURN).decode('utf-8'))
[docs]
43 def write_line(self, line):
44 with self._h:
45 if not self.connected_transport.is_closing(): self.write_literal(line.encode('utf-8', 'ignore')+self.NEWLINE)
[docs]
46 def write_literal(self, data): self.connected_transport.write(data)
[docs]
47 def eof_received(self):
48 self._eof_received = True
49 if self._buffer: self.flush()
50 if self._lines.empty() and not self._closed: self.signal_eof()
[docs]
51 async def read_line(self): L = self._lines; return None if self._closed and L.empty() else (L.task_done() if (l := await L.get()) is None else l)
[docs]
52 async def drain(self):
53 if self._paused and (w := self._drain_waiter): await w
[docs]
54 async def write_line_with_backpressure(self, line): await self.drain(); self.write_line(line)
[docs]
55 async def write_literal_with_backpressure(self, data): await self.drain(); self.write_literal(data)
[docs]
56class LFProtocol(LineProtocol): NEWLINE, __slots__ = b'\n', ()
[docs]
57class CRLFProtocol(LineProtocol): NEWLINE, __slots__ = b'\r\n', ()
[docs]
58class CRProtocol(LineProtocol): NEWLINE, __slots__ = b'\r', ()
[docs]
59class SocketTransport(I.Transport):
60 __slots__ = '_buffer', '_closing', '_limits', '_protocol', '_socket'; _h = A.IgnoreErrors(OSError)
[docs]
61 @staticmethod
62 def make_protocol(): return LineProtocol()
63 @property
64 def loop(self): return p.loop if isinstance(p := self._protocol, LineProtocol) else NotImplemented
65 def __init__(self, sock=None):
66 audit_fullname(self); self._reset_extra(); (p := self.make_protocol()).connection_made(self); self._socket, self._closing, self._buffer, self._limits, self._protocol = sock, False, bytearray(), A.getcontext().SOCKET_TRANSPORT_LIMITS, p
67 if sock: self.connect_sock(sock)
69 def _sock_transport_read_ready(self, sock, size=None):
70 try: self._protocol.data_received(d) if (d := sock.recv(A.getcontext().LINE_PROTOCOL_DEFAULT_BUFFER_SIZE if size is None else size)) else (self._protocol.eof_received() or self.close())
71 except OSError as e: warning('%s: read error', fullname(self)); self.close(e)
[docs]
72 def connect_sock(self, sock=None):
73 if sock is None and (sock := self._socket) is None: return
74 sock.setblocking(False); self.loop.add_reader(sock.fileno(), self._sock_transport_read_ready, sock); (e := self._extra)['sockname'] = sock.getsockname() # ty: ignore[unresolved-attribute]
75 with self._h: e['peername'] = sock.getpeername()
[docs]
76 def disconnect_sock(self):
77 if (s := self._socket) is None: return
78 with self._h: s.close()
79 self.loop.remove_reader(s.fileno()); self._socket = None; self._reset_extra(); return s
[docs]
80 @A.dualcontextmanager
81 def sock_context(self, sock):
82 try: yield self.connect_sock(sock)
83 finally: self.disconnect_sock()
84 def _writer(self, data, bufsize=None):
85 if self._closing: return
86 (b := self._buffer).extend(data)
87 if bufsize is None: bufsize = A.getcontext().SOCKET_TRANSPORT_LIMITS[1]
88 if len(b) > bufsize:
89 if (s := self._socket) is None: return
90 try: s.sendall(b); b.clear()
91 except OSError as e: warning('%s: write error', fullname(self)); self.close(e)
[docs]
92 def write(self, data): self.loop.call_soon(self._writer, data)
[docs]
93 def get_write_buffer_size(self): return len(self._buffer)
[docs]
94 def get_write_buffer_limits(self): return self._limits
[docs]
95 def set_write_buffer_limits(self, high=None, low=None):
96 if low is None: low = self._limits[0]
97 if high is None: high = self._limits[1]
98 self._limits = min(low := max(low, 0), high := min(high, A.getcontext().SOCKET_TRANSPORT_LIMITS[1])), high
[docs]
99 def write_eof(self):
100 if not (self._closing or (s := self._socket) is None):
101 with self._h: s.shutdown(1)
[docs]
102 def can_write_eof(self): return True # noqa: PLR6301
[docs]
103 def is_closing(self): return self._closing
[docs]
104 def close(self, e=None):
105 if self._closing: return
106 self._closing = True; self._protocol.connection_lost(e); self.disconnect_sock()
[docs]
107 def get_protocol(self): return self._protocol
108 def set_protocol(self, protocol):
109 if not isinstance(protocol, LineProtocol): raise TypeError('asyncutils.networking.SocketTransport: protocol should be a LineProtocol')
110 self._protocol.connection_lost(None); protocol.connection_made(self); self._protocol = protocol; self.connect_sock()
[docs]
111 def abort(self): self.loop.call_soon(self.close)