Source code for asyncutils.networking

  1import asyncutils as A, asyncio as I
  2from asyncutils._internal.compat import Queue
  3from asyncutils._internal.helpers import LoopMixinBase, audit_fullname, fullname
  4from asyncutils._internal.log import warning
  5from asyncutils._internal.submodules import networking_all as __all__
[docs] 6class LineProtocol(I.Protocol, LoopMixinBase): 7 NEWLINE, CARRIAGE_RETURN, _h = __import__('os').linesep.encode(), b'\r', A.ignore_cancellation.combined(I.InvalidStateError); __slots__ = '_buffer', '_closed', '_drain_waiter', '_eof_received', '_lines', '_paused', 'transport' 8 def __init__(self): audit_fullname(self); self._buffer, self._lines = bytearray(), Queue(); self._closed = self._paused = self._eof_received = False; self.transport = self._drain_waiter = None 9 @property 10 def connected_transport(self): 11 if (t := self.transport) is None: raise ConnectionError('asyncutils.networking.LineProtocol: no transport connected') 12 return t
[docs] 13 def connection_made(self, transport): self.transport = transport
[docs] 14 def connection_lost(self, exc): 15 if t := self.transport: 16 with self._h: t.abort() 17 self._lines.shutdown(); self._closed = True 18 if w := self._drain_waiter: 19 if not w.done(): w.set_exception(ConnectionError('asyncutils.networking.LineProtocol: transport connection lost') if exc is None else exc) 20 self._drain_waiter = None
[docs] 21 def close(self): 22 if t := self.transport: 23 with self._h: t.close(); self._closed = True 24 return self._closed
[docs] 25 def data_received(self, data, bufsize=None): 26 if bufsize is None: bufsize = A.getcontext().LINE_PROTOCOL_DEFAULT_BUFFER_SIZE 27 (b := self._buffer).extend(data); n = self.NEWLINE 28 if len(b) > bufsize: self.flush() 29 while not self._closed and n in b: l, b = b.split(n, 1); self._put_line(l) 30 self._buffer = b 31 if self._eof_received: self.flush(); self.signal_eof()
[docs] 32 def flush(self): self._put_line(b := self._buffer); b.clear()
[docs] 33 def signal_eof(self): self._lines.put_nowait(None)
[docs] 34 def pause_writing(self): 35 self._paused = True 36 if self._drain_waiter is None: self._drain_waiter = self.make_fut()
[docs] 37 def resume_writing(self): 38 self._paused = False 39 if w := self._drain_waiter: 40 if not w.done(): w.set_result(None) 41 self._drain_waiter = None
[docs] 42 def _put_line(self, data): self._lines.put_nowait(data.rstrip(self.CARRIAGE_RETURN).decode('utf-8'))
[docs] 43 def write_line(self, line): 44 with self._h: 45 if not self.connected_transport.is_closing(): self.write_literal(line.encode('utf-8', 'ignore')+self.NEWLINE)
[docs] 46 def write_literal(self, data): self.connected_transport.write(data)
[docs] 47 def eof_received(self): 48 self._eof_received = True 49 if self._buffer: self.flush() 50 if self._lines.empty() and not self._closed: self.signal_eof()
[docs] 51 async def read_line(self): L = self._lines; return None if self._closed and L.empty() else (L.task_done() if (l := await L.get()) is None else l)
[docs] 52 async def drain(self): 53 if self._paused and (w := self._drain_waiter): await w
[docs] 54 async def write_line_with_backpressure(self, line): await self.drain(); self.write_line(line)
[docs] 55 async def write_literal_with_backpressure(self, data): await self.drain(); self.write_literal(data)
[docs] 56class LFProtocol(LineProtocol): NEWLINE, __slots__ = b'\n', ()
[docs] 57class CRLFProtocol(LineProtocol): NEWLINE, __slots__ = b'\r\n', ()
[docs] 58class CRProtocol(LineProtocol): NEWLINE, __slots__ = b'\r', ()
[docs] 59class SocketTransport(I.Transport): 60 __slots__ = '_buffer', '_closing', '_limits', '_protocol', '_socket'; _h = A.IgnoreErrors(OSError)
[docs] 61 @staticmethod 62 def make_protocol(): return LineProtocol()
63 @property 64 def loop(self): return p.loop if isinstance(p := self._protocol, LineProtocol) else NotImplemented 65 def __init__(self, sock=None): 66 audit_fullname(self); self._reset_extra(); (p := self.make_protocol()).connection_made(self); self._socket, self._closing, self._buffer, self._limits, self._protocol = sock, False, bytearray(), A.getcontext().SOCKET_TRANSPORT_LIMITS, p 67 if sock: self.connect_sock(sock)
[docs] 68 def _reset_extra(self, _=('socket', 'sockname', 'peername')): super().__init__(dict.fromkeys(_))
69 def _sock_transport_read_ready(self, sock, size=None): 70 try: self._protocol.data_received(d) if (d := sock.recv(A.getcontext().LINE_PROTOCOL_DEFAULT_BUFFER_SIZE if size is None else size)) else (self._protocol.eof_received() or self.close()) 71 except OSError as e: warning('%s: read error', fullname(self)); self.close(e)
[docs] 72 def connect_sock(self, sock=None): 73 if sock is None and (sock := self._socket) is None: return 74 sock.setblocking(False); self.loop.add_reader(sock.fileno(), self._sock_transport_read_ready, sock); (e := self._extra)['sockname'] = sock.getsockname() # ty: ignore[unresolved-attribute] 75 with self._h: e['peername'] = sock.getpeername()
[docs] 76 def disconnect_sock(self): 77 if (s := self._socket) is None: return 78 with self._h: s.close() 79 self.loop.remove_reader(s.fileno()); self._socket = None; self._reset_extra(); return s
[docs] 80 @A.dualcontextmanager 81 def sock_context(self, sock): 82 try: yield self.connect_sock(sock) 83 finally: self.disconnect_sock()
84 def _writer(self, data, bufsize=None): 85 if self._closing: return 86 (b := self._buffer).extend(data) 87 if bufsize is None: bufsize = A.getcontext().SOCKET_TRANSPORT_LIMITS[1] 88 if len(b) > bufsize: 89 if (s := self._socket) is None: return 90 try: s.sendall(b); b.clear() 91 except OSError as e: warning('%s: write error', fullname(self)); self.close(e)
[docs] 92 def write(self, data): self.loop.call_soon(self._writer, data)
[docs] 93 def get_write_buffer_size(self): return len(self._buffer)
[docs] 94 def get_write_buffer_limits(self): return self._limits
[docs] 95 def set_write_buffer_limits(self, high=None, low=None): 96 if low is None: low = self._limits[0] 97 if high is None: high = self._limits[1] 98 self._limits = min(low := max(low, 0), high := min(high, A.getcontext().SOCKET_TRANSPORT_LIMITS[1])), high
[docs] 99 def write_eof(self): 100 if not (self._closing or (s := self._socket) is None): 101 with self._h: s.shutdown(1)
[docs] 102 def can_write_eof(self): return True # noqa: PLR6301
[docs] 103 def is_closing(self): return self._closing
[docs] 104 def close(self, e=None): 105 if self._closing: return 106 self._closing = True; self._protocol.connection_lost(e); self.disconnect_sock()
[docs] 107 def get_protocol(self): return self._protocol
108 def set_protocol(self, protocol): 109 if not isinstance(protocol, LineProtocol): raise TypeError('asyncutils.networking.SocketTransport: protocol should be a LineProtocol') 110 self._protocol.connection_lost(None); protocol.connection_made(self); self._protocol = protocol; self.connect_sock()
[docs] 111 def abort(self): self.loop.call_soon(self.close)