#!/usr/bin/env python3 import socket as sk from array import array import os import sys from pathlib import Path from stat import * # suggested usage… import selectors from dataclasses import dataclass import logging as log import math import termcolor # external dep, TODO # Note: get/setrlimit are in resource module def setup_logging(): log.basicConfig( format='%(module)s: %(levelname)s: %(message)s', level=log.DEBUG, ) @dataclass class Direction: sigil: str color: str def hexdump(data: bytes) -> list[str]: # We will be prefixing the lines with sigils, so no need to join. result = [] offset = 0 offset_len = math.ceil(math.log(len(data)+1, 16)) while data: left_chunk, right_chunk, data = data[:8], data[8:16], data[16:] left_ascii = ''.join(chr(x) if x in range(32,127) else '.' for x in left_chunk) right_ascii = ''.join(chr(x) if x in range(32,127) else '.' for x in right_chunk) offset_str = f'{offset:0{offset_len}x}' line = f"{offset_str} {left_chunk.hex(' ').ljust(23)} {right_chunk.hex(' ').ljust(23)} |{left_ascii.ljust(8)}{right_ascii.ljust(8)}|" offset += 16 result.append(line) return result # fd types from inode(7) fdtypes = { S_IFSOCK: 'socket', S_IFLNK: 'symlink', S_IFREG: 'regular file', S_IFBLK: 'blockdev', S_IFDIR: 'directory', S_IFCHR: 'chardev', S_IFIFO: 'fifo', } def fdinfo(fd: int) -> str: result = [] stat = os.fstat(fd) type = S_IFMT(stat.st_mode) result.append(f'type: {fdtypes.get(type, f"UNKNOWN: {type}")}') if S_ISREG(stat.st_mode): result.append(f'size: {stat.st_size}') orig_offset = os.lseek(fd, 0, os.SEEK_CUR) os.lseek(fd, 0, os.SEEK_SET) head = os.read(fd, 256) result.extend(hexdump(head)) os.lseek(fd, orig_offset, os.SEEK_SET) return result def fd_readlink(fd: int, pid: int | None =None) -> str: if pid is None: pid = os.getpid() fd_link = Path('/proc') / str(pid) / 'fd' / str(fd) try: return os.readlink(fd_link) except OSError as e: return f"Could not read {fd_link}: {e.strerror} (errno {e.errno})" def dirprintlines(dir: Direction, lines, file=sys.stdout, indent=''): if isinstance(lines, str): lines = [lines] lines_to_print = [f'{dir.sigil} {indent}{line}' for line in lines] if True or COLOR: lines_to_print = [termcolor.colored(line, dir.color) for line in lines_to_print] print(*lines_to_print, sep='\n', file=file) def show_fd(dir, fd, /, indent=' '): # Here we decide what to show and format it. We leave any particular # decisions on other functions. dirprintlines(dir, f'link: {fd_readlink(fd)}', indent=indent) dirprintlines(dir, fdinfo(fd), indent=indent) # Maps from integers to symbolic names, acc to unix(7) sockopts = {getattr(sk, name): name for name in( 'SO_PASSCRED', 'SO_PASSSEC', # 'SO_PEEK_OFF', # wtf python does not have this 'SO_PEERCRED', 'SO_PEERSEC', )} anclevels = {getattr(sk, name): name for name in( 'SOL_SOCKET', )} anctypes = {getattr(sk, name): name for name in( 'SCM_RIGHTS', 'SCM_CREDENTIALS', # 'SCM_SECURITY', # wtf python does not have this )} def show_anc(dir, a, /, indent=' ') -> list[int]: "returns a possible list of file descriptors" result = [] level, type, data = a if level == sk.SOL_SOCKET and type == sk.SCM_RIGHTS: # A file descriptor fds = array('i') fds.frombytes(data) dirprintlines(dir, f'Received {len(fds)} file descriptor{"s"*(len(fds)!=1)}', indent=indent) result = list(fds) for fd in fds: show_fd(dir, fd, indent=2*indent) else: dirprintlines(dir, 'Cannot show details.', indent=indent) return result def fwd_traffic(frm: sk.socket, to: sk.socket, dir: Direction): chunk_size = 640*1024 # … idk, ought to be enough for anyone… fds = [] # potential fds data, anc, flags, addr = frm.recvmsg(chunk_size, chunk_size) dirprintlines(dir, f'Forwarding {len(data)} bytes and {len(anc)} ancillary from {addr} to {to}') dirprintlines(dir, hexdump(data), indent=' ') for i,a in enumerate(anc): anclevel, anctype, _ancdata = a dirprintlines(dir, f'Ancillary #{i}: level {anclevels[anclevel]}, type {anctypes[anctype]}') fds = show_anc(dir, a) to.sendmsg([data], anc, flags) if fds: log.debug(f'Closing fds: {fds}') for fd in fds: os.close(fd) def main(): setup_logging() listen_fname = sys.argv[1] connect_fname = sys.argv[2] listen_skt = sk.socket(sk.AF_UNIX, sk.SOCK_STREAM) connect_skt = sk.socket(sk.AF_UNIX, sk.SOCK_STREAM) # In case we get data on connect, we should not connect to the "server" # socket before a "client" connects to us. # However, we should at least "warn fast" if not Path(connect_fname).exists(): log.warning(f"Socket {connect_fname} does not currently exist.") elif not Path(connect_fname).is_socket(): log.warning(f"{connect_fname} is not a socket.") listen_skt.bind(listen_fname) listen_skt.listen() log.info(f'Listening on {listen_fname}') client_skt, _addrinfo = listen_skt.accept() # blocks log.info('Incoming connection') if True or ONE_CLIENT: # We only support one client, so we can clean up the listening socket. # TODO: We _could_ actually support multiple clients, by connect()ing to # the compositor multiple times. (The code is almost ready apart from # the fact that the select loop does not expect accepting connections.) # However, at that point this should be probably written asynchronously… listen_skt.close() os.unlink(listen_fname) connect_skt.connect(connect_fname) log.debug(f'Succesfully connected to {connect_fname}') sel = selectors.DefaultSelector() sel.register(client_skt, selectors.EVENT_READ, (client_skt, connect_skt, Direction('>', 'blue'))) sel.register(connect_skt, selectors.EVENT_READ, (connect_skt, client_skt, Direction('<', 'yellow'))) log.debug('Starting processing loop.') while True: events = sel.select() log.debug(f'We have {len(events)} event{"s"*(len(events)!=1)} to process') for key, _what in events: frm, to, dir = key.data # fuck python having "from" as a keyword fwd_traffic(frm, to, dir) log.debug('Events processed.') if __name__ == '__main__': main() # TODO: pcap(ng) dumping, argumets for coloring, …