"""Simple utility to interface with BIRD using the control socket."""

from dataclasses import dataclass, field
import socket as sk
from string import digits
from select import select


class BirdError(Exception):
    """Error reply found in a BIRD response.

    Attributes:
        code: The four-character reply code, kept as a string.
        start: First line (1-based, inclusive) of the reply within the response.
        end: Last line (1-based, inclusive) of the reply within the response.
        text: The text of the lines covered by the reply.
    """

    def __init__(self, code, where, text):
        # Pass the arguments on explicitly so ``args`` matches the constructor.
        super().__init__(code, where, text)
        self.code = code
        self.start = where[0]
        self.end = where[1]
        self.text = text


@dataclass
class BirdResponse:
    """Parsed response: the payload text plus the reply code of each line range."""

    text: str = ''
    # List of line ranges (inclusive) and the respective code.
    codes: list[tuple[tuple[int, int], str]] = field(default_factory=list)

    def raise_exceptions(self) -> None:
        """Raise a BirdError for the first reply whose code starts with '9'.

        Does nothing when no such reply is present.

        NOTE(review): BIRD's reply-code list appears to reserve 8xxx for
        runtime errors as well; only 9xxx is raised here -- confirm that
        this is intended.

        Raises:
            BirdError: for the first reply with a 9xxx code.
            AssertionError: if the response holds no reply codes at all.
        """
        # TODO Py3.11: use ExceptionGroups?
        # Until then, let's just report the first one?
        assert len(self.codes) >= 1, "Not a valid response."
        for where, code in self.codes:
            if code.startswith('9'):
                # Ranges are 1-based and inclusive; convert to a slice.
                first, last = where
                errorlines = self.text.splitlines(keepends=True)[first - 1:last]
                raise BirdError(code, where, ''.join(errorlines))


class BirdSocketConnection:
    """Connection to a BIRD control socket using a request/response protocol."""

    def __init__(self, sockpath='/run/bird/bird.ctl'):
        """Connect to the control socket at *sockpath* and read BIRD's greeting.

        Raises:
            OSError: if the socket cannot be connected.
            BirdError: if the greeting contains an error reply.
        """
        self.socket = sk.socket(sk.AF_UNIX, sk.SOCK_STREAM)
        try:
            # The socket is deliberately left in blocking mode:
            # socket.makefile(), used for reading, requires a blocking socket.
            self.socket.connect(sockpath)
            # Bird announces itself by printing version. We save the response
            # just in case.
            self.bird_prologue = self._parse_response()
        except BaseException:
            # Do not leak the descriptor when setup fails.
            self.socket.close()
            raise

    def close(self) -> None:
        """Close the underlying socket."""
        self.socket.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def request(self, req: str) -> BirdResponse:
        """Send one command and return the parsed response.

        A trailing newline is appended to *req* when missing.

        Raises:
            BirdError: if the response contains an error reply.
        """
        if not req.endswith('\n'):
            req += '\n'
        # sendall() keeps writing until the whole request is out; a plain
        # send() may transmit only part of it.
        self.socket.sendall(req.encode())
        return self._parse_response()

    def _parse_response(self) -> BirdResponse:
        """Read one complete response from the socket and parse it.

        We even read from the socket here in order to have the whole protocol
        described in one place. Therefore other parts of the code do not need
        to know anything about the protocol (maybe apart from the meaning of
        status codes, if they are interested...)

        A line starting with a digit carries a four-character code, then '-'
        (more lines follow) or any other character (last line), then the text.
        Any other line continues the current reply; its first character is
        dropped.

        Raises:
            BirdError: if the response contains an error reply.
            AssertionError: if the stream ends before a final line was seen.
        """
        result = BirdResponse()
        # string because of leading zeroes, and it has no numeric
        # interpretation anyway
        code: str | None = None
        start = None
        cont = True
        chunks = []
        digit_prefixes = tuple(digits)

        # The response might not yet be ready
        rrdy, _, _ = select([self.socket], [], [], 1)
        if not rrdy:
            print('WTF, no response in 1 second?')

        # Closing the file wrapper does not close the socket itself.
        with self.socket.makefile('r') as f:
            for i, line in enumerate(f, start=1):
                assert cont, "WTF bad format (should not continue)."
                if line.startswith(digit_prefixes):
                    if start is not None:
                        # A new code begins: the previous reply ended one
                        # line earlier.
                        result.codes.append(((start, i - 1), code))
                    code = line[:4]
                    start = i
                    cont = line[4] == '-'
                    text = line[5:]
                else:
                    assert cont
                    text = line[1:]
                chunks.append(text)  # Invariant: LF as line terminator
                if not cont:
                    # Finalization. We do not have i after the end of the
                    # cycle, but this is the last iteration.
                    result.codes.append(((start, i), code))
                    break

        result.text = ''.join(chunks)
        result.raise_exceptions()
        return result