First version

Very MVP, but it seems to be able to pass both data and fds in both
directions. No dumping yet.
master
LEdoian 10 months ago
parent 3eefff68cf
commit 497dc20949

@ -0,0 +1,88 @@
#!/usr/bin/env python3
import socket as sk
from array import array
import os
import sys
from pathlib import Path
from stat import * # suggested usage…
import selectors
from dataclasses import dataclass
import logging as log
# Note: get/setrlimit are in resource module
def setup_logging():
log.basicConfig(
format='%(module)s: %(levelname)s: %(message)s',
level=log.DEBUG,
)
@dataclass
class Direction:
sigil: str
color: str
def hexdump(data: bytes) -> str:
# TODO: dump somehow
...
def fdinfo(fd: int) -> str:
stat = os.fstat(fd)
# TODO: return something
def fd_readlink(fd: int, pid: int | None =None) -> str:
if pid is None:
pid = os.getpid()
fd_link = Path('/proc') / str(pid) / 'fd' / str(fd)
try:
return os.readlink(fd_link)
except OSError as e:
return f"Could not read {fd_link}: {e.strerror} (errno {e.errno})"
def fwd_traffic(frm: sk.socket, to: sk.socket, dir: Direction):
print(f'{dir.sigil} there be data from {frm}')
chunk_size = 640*1024 # … idk, ought to be enough for anyone…
data, anc, flags, _addr = frm.recvmsg(chunk_size, chunk_size)
# TODO: inspect
to.sendmsg([data], anc, flags)
def main():
setup_logging()
listen_fname = sys.argv[1]
connect_fname = sys.argv[2]
listen_skt = sk.socket(sk.AF_UNIX, sk.SOCK_STREAM)
connect_skt = sk.socket(sk.AF_UNIX, sk.SOCK_STREAM)
# In case we get data on connect, we should not connect to the "server"
# socket before a "client" connects to us.
# However, we should at least "warn fast"
if not Path(connect_fname).exists():
log.warning(f"Socket {connect_fname} does not currently exist.")
elif not Path(connect_fname).is_socket():
log.warning(f"{connect_fname} is not a socket.")
listen_skt.bind(listen_fname)
listen_skt.listen()
log.info(f'Listening on {listen_fname}')
client_skt, _addrinfo = listen_skt.accept() # blocks
log.info('Incoming connection')
connect_skt.connect(connect_fname)
log.debug(f'Succesfully connected to {connect_fname}')
sel = selectors.DefaultSelector()
sel.register(client_skt, selectors.EVENT_READ,
(client_skt, connect_skt, Direction('>', 'blue')))
sel.register(connect_skt, selectors.EVENT_READ,
(connect_skt, client_skt, Direction('<', 'yellow')))
log.debug('Starting processing loop.')
while True:
events = sel.select()
log.debug(f'We have {len(events)} event{"s"*(len(events)!=1)} to process')
for key, _what in events:
frm, to, dir = key.data # fuck python having "from" as a keyword
fwd_traffic(frm, to, dir)
log.debug('Events processed.')
if __name__ == '__main__': main()
# TODO: pcap(ng) dumping, argumets for coloring, …
Loading…
Cancel
Save