You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

184 lines
6.0 KiB
Python

#!/usr/bin/env python3
import socket as sk
from array import array
import os
import sys
from pathlib import Path
from stat import * # suggested usage…
import selectors
from dataclasses import dataclass
import logging as log
import math
import termcolor # external dep, TODO
# Note: get/setrlimit are in resource module
def setup_logging():
log.basicConfig(
format='%(module)s: %(levelname)s: %(message)s',
level=log.DEBUG,
)
@dataclass
class Direction:
sigil: str
color: str
def hexdump(data: bytes) -> list[str]: # We will be prefixing the lines with sigils, so no need to join.
result = []
offset = 0
offset_len = math.ceil(math.log(len(data)+1, 16))
while data:
left_chunk, right_chunk, data = data[:8], data[8:16], data[16:]
left_ascii = ''.join(chr(x) if x in range(32,127) else '.' for x in left_chunk)
right_ascii = ''.join(chr(x) if x in range(32,127) else '.' for x in right_chunk)
offset_str = f'{offset:0{offset_len}x}'
line = f"{offset_str} {left_chunk.hex(' ').ljust(23)} {right_chunk.hex(' ').ljust(23)} |{left_ascii.ljust(8)}{right_ascii.ljust(8)}|"
offset += 16
result.append(line)
return result
# fd types from inode(7)
fdtypes = {
S_IFSOCK: 'socket',
S_IFLNK: 'symlink',
S_IFREG: 'regular file',
S_IFBLK: 'blockdev',
S_IFDIR: 'directory',
S_IFCHR: 'chardev',
S_IFIFO: 'fifo',
}
def fdinfo(fd: int) -> str:
result = []
stat = os.fstat(fd)
type = S_IFMT(stat.st_mode)
result.append(f'type: {fdtypes.get(type, f"UNKNOWN: {type}")}')
if S_ISREG(stat.st_mode):
result.append(f'size: {stat.st_size}')
orig_offset = os.lseek(fd, 0, os.SEEK_CUR)
os.lseek(fd, 0, os.SEEK_SET)
head = os.read(fd, 256)
result.extend(hexdump(head))
os.lseek(fd, orig_offset, os.SEEK_SET)
return result
def fd_readlink(fd: int, pid: int | None =None) -> str:
if pid is None:
pid = os.getpid()
fd_link = Path('/proc') / str(pid) / 'fd' / str(fd)
try:
return os.readlink(fd_link)
except OSError as e:
return f"Could not read {fd_link}: {e.strerror} (errno {e.errno})"
def dirprintlines(dir: Direction, lines, file=sys.stdout, indent=''):
if isinstance(lines, str): lines = [lines]
lines_to_print = [f'{dir.sigil} {indent}{line}' for line in lines]
if True or COLOR:
lines_to_print = [termcolor.colored(line, dir.color) for line in lines_to_print]
print(*lines_to_print, sep='\n', file=file)
def show_fd(dir, fd, /, indent=' '):
# Here we decide what to show and format it. We leave any particular
# decisions on other functions.
dirprintlines(dir, f'link: {fd_readlink(fd)}', indent=indent)
dirprintlines(dir, fdinfo(fd), indent=indent)
# Maps from integers to symbolic names, acc to unix(7)
sockopts = {getattr(sk, name): name for name in(
'SO_PASSCRED',
'SO_PASSSEC',
# 'SO_PEEK_OFF', # wtf python does not have this
'SO_PEERCRED',
'SO_PEERSEC',
)}
anclevels = {getattr(sk, name): name for name in(
'SOL_SOCKET',
)}
anctypes = {getattr(sk, name): name for name in(
'SCM_RIGHTS',
'SCM_CREDENTIALS',
# 'SCM_SECURITY', # wtf python does not have this
)}
def show_anc(dir, a, /, indent=' ') -> list[int]:
"returns a possible list of file descriptors"
result = []
level, type, data = a
if level == sk.SOL_SOCKET and type == sk.SCM_RIGHTS:
# A file descriptor
fds = array('i')
fds.frombytes(data)
dirprintlines(dir, f'Received {len(fds)} file descriptor{"s"*(len(fds)!=1)}', indent=indent)
result = list(fds)
for fd in fds:
show_fd(dir, fd, indent=2*indent)
else:
dirprintlines(dir, 'Cannot show details.', indent=indent)
return result
def fwd_traffic(frm: sk.socket, to: sk.socket, dir: Direction):
chunk_size = 640*1024 # … idk, ought to be enough for anyone…
fds = [] # potential fds
data, anc, flags, addr = frm.recvmsg(chunk_size, chunk_size)
dirprintlines(dir, f'Forwarding {len(data)} bytes and {len(anc)} ancillary from {addr} to {to}')
dirprintlines(dir, hexdump(data), indent=' ')
for i,a in enumerate(anc):
anclevel, anctype, _ancdata = a
dirprintlines(dir, f'Ancillary #{i}: level {anclevels[anclevel]}, type {anctypes[anctype]}')
fds = show_anc(dir, a)
to.sendmsg([data], anc, flags)
if fds: log.debug(f'Closing fds: {fds}')
for fd in fds: os.close(fd)
def main():
setup_logging()
listen_fname = sys.argv[1]
connect_fname = sys.argv[2]
listen_skt = sk.socket(sk.AF_UNIX, sk.SOCK_STREAM)
connect_skt = sk.socket(sk.AF_UNIX, sk.SOCK_STREAM)
# In case we get data on connect, we should not connect to the "server"
# socket before a "client" connects to us.
# However, we should at least "warn fast"
if not Path(connect_fname).exists():
log.warning(f"Socket {connect_fname} does not currently exist.")
elif not Path(connect_fname).is_socket():
log.warning(f"{connect_fname} is not a socket.")
listen_skt.bind(listen_fname)
listen_skt.listen()
log.info(f'Listening on {listen_fname}')
client_skt, _addrinfo = listen_skt.accept() # blocks
log.info('Incoming connection')
if True or ONE_CLIENT:
# We only support one client, so we can clean up the listening socket.
# TODO: We _could_ actually support multiple clients, by connect()ing to
# the compositor multiple times. (The code is almost ready apart from
# the fact that the select loop does not expect accepting connections.)
# However, at that point this should be probably written asynchronously…
listen_skt.close()
os.unlink(listen_fname)
connect_skt.connect(connect_fname)
log.debug(f'Succesfully connected to {connect_fname}')
sel = selectors.DefaultSelector()
sel.register(client_skt, selectors.EVENT_READ,
(client_skt, connect_skt, Direction('>', 'blue')))
sel.register(connect_skt, selectors.EVENT_READ,
(connect_skt, client_skt, Direction('<', 'yellow')))
log.debug('Starting processing loop.')
while True:
events = sel.select()
log.debug(f'We have {len(events)} event{"s"*(len(events)!=1)} to process')
for key, _what in events:
frm, to, dir = key.data # fuck python having "from" as a keyword
fwd_traffic(frm, to, dir)
log.debug('Events processed.')
if __name__ == '__main__': main()
# TODO: pcap(ng) dumping, argumets for coloring, …