#!/usr/bin/env python3 from birdvisu.annotations import AnnotatedTopology, AnnotatorID from birdvisu.annotations.analysis import TopologyDifference, ShortestPathTree from birdvisu.annotations.layout import PlaceVerticesFromFile, PlaceUnplacedVertices, EdgeWidthByCost, HighlightTopoDiff, HighlightSPDAG, HighlightShortestPath from birdvisu.ospfsock import BirdSocketConnection from birdvisu.providers import BirdSocketTopologyProvider, OspfFileTopologyProvider, OspfFileParseError from birdvisu.topo_v3 import TopologyV3, VertexID from birdvisu.graphics_items import RouterGraphicsItem, NetworkGraphicsItem, EdgeGraphicsItem from collections import defaultdict from enum import Enum, auto from ipaddress import IPv4Address from PySide6 import QtCore, QtGui, QtWidgets from PySide6.QtCore import Slot, QObject from random import randint import sys app = QtWidgets.QApplication([]) class BirdTopologyLoader(QtWidgets.QDialog): def __init__(self, *a, **kwa): super().__init__(*a, **kwa) self.setModal(True) self.result_ = (None, None, None) outer = QtWidgets.QVBoxLayout(self) # Area inner = QtWidgets.QHBoxLayout() inner.addWidget(QtWidgets.QLabel('Area:')) line = QtWidgets.QLineEdit(self) line.setText('-1') self.line = line inner.addWidget(line) outer.addLayout(inner) inner = QtWidgets.QHBoxLayout() inner.addWidget(QtWidgets.QLabel('Instance:')) combo = QtWidgets.QComboBox() combo.addItems(self.get_instances()) self.combo = combo inner.addWidget(combo) outer.addLayout(inner) inner = QtWidgets.QHBoxLayout() inner.addWidget(QtWidgets.QLabel('Protocol:')) combo = QtWidgets.QComboBox() combo.addItems(['Guess!', 'OSPFv2', 'OSPFv3']) self.combo2 = combo inner.addWidget(combo) outer.addLayout(inner) buttons = QtWidgets.QDialogButtonBox(QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel) buttons.accepted.connect(self.saveResult) buttons.accepted.connect(self.accept) buttons.rejected.connect(self.reject) outer.addWidget(buttons) @Slot() def saveResult(self): try: area = int(self.line.text()) except ValueError: area = int(IPv4Address(self.line.text())) if area == -1: area = None inst = self.combo.currentText() proto = { 'OSPFv2': 2, 'OSPFv3': 3, 'Guess!': None, }[self.combo2.currentText()] self.result_ = (area, inst, proto) def get_instances(self): # Our code is stupid, so we initialise a BirdSocketTopologyProvider just to get instances and then drop it. prov = BirdSocketTopologyProvider() bird = BirdSocketConnection() return prov.find_running_ospf(bird) class MainWindow(QtWidgets.QMainWindow): class Mode(Enum): ShortestPath = auto() ShortestPathDAG = auto() TopologyDifference = auto() EdgeWeight = auto() def create_menus(self): print('Creating menus…') self.menubar = self.menuBar() mode_menu = self.menubar.addMenu('&Mode') short_path_act = QtGui.QAction("Sh. path &DAG", self) short_path_act.triggered.connect(self.shortestPathMode) mode_menu.addAction(short_path_act) # Hack! autoload_act = QtGui.QAction("&Load automatically", self) autoload_act.triggered.connect(self.autoLoad) self.menubar.addAction(autoload_act) topo_menu = self.menubar.addMenu('&Topology') open_ref_act = QtGui.QAction("&Load reference", self) open_ref_act.triggered.connect(self.openRefTopology) topo_menu.addAction(open_ref_act) cur_topo_menu = topo_menu.addMenu("Load ¤t") running_bird_act = QtGui.QAction('&BIRD', self) running_bird_act.triggered.connect(self.curTopologyFromBird) cur_topo_menu.addAction(running_bird_act) from_file_act = QtGui.QAction('&from file', self) from_file_act.triggered.connect(self.curTopologyFromFile) cur_topo_menu.addAction(from_file_act) refresh_act = QtGui.QAction("&Refresh", self) refresh_act.triggered.connect(self.refreshTopologies) topo_menu.addAction(refresh_act) positions = self.menubar.addMenu('&Positions') loadp = QtGui.QAction('Load from file', self) loadp.triggered.connect(self.load_positions) positions.addAction(loadp) def __init__(self, *a, **kwa): super().__init__(*a, **kwa) self.ref_topo_provider = None self.cur_topo_provider = None self.annot_topo = None # The actual graph to show: list of neighbours and of edges self.visu_graph: tuple[dict[VertexID, set[VertexID]], set[VertexID, VertexID]] = None self.highlighter = None self.mode = None self.positions: dict[VertexID, tuple[float, float]] = dict() self.scene = QtWidgets.QGraphicsScene() self.view = QtWidgets.QGraphicsView(self.scene) self.view.setDragMode(self.view.DragMode.ScrollHandDrag) self.setCentralWidget(self.view) self.statusbar = self.statusBar() self.statusbar.showMessage('Hello!') self.create_menus() self.edgeWeightMode() self.autoLoad() #Hack @Slot() def autoLoad(self): print('Auto-loading…') self.ref_topo_provider = OspfFileTopologyProvider('./empty.ospf') self.cur_topo_provider = BirdSocketTopologyProvider(instance='gennet4', area=1, version=2) self.refreshTopologies() @Slot() def load_positions(self): filename = QtWidgets.QFileDialog.getOpenFileName(self, 'Open vertex positions', '.', 'OSPF files(*.visu);;All files(*)')[0] if filename == '': return # Do nothing self.positions_from_file(filename) def positions_from_file(self, fn): if self.visu_graph is None: return # nothing to do yet. # Let's be frank: this used to be StyleAnnotators and it is not re-implemented. pvff = AnnotatorID(PlaceVerticesFromFile, fn) self.annot_topo.run_annotator(pvff) placements = PlaceUnplacedVertices(pvff).annotate(self.annot_topo) self.positions = {v: d['position'] for v, d in placements.for_vertex.items()} # Apply the positions: for v, pos in self.positions.items(): x, y = pos self.graphicsitems[v].setPos(x, y) # Fix all the edges probably for e in self.visu_graph[1]: self.graphicsitems[e].update_line() @Slot() def savePositions(self): ... @Slot() def apply_styles(self): if self.visu_graph is None: return # First need graph. styles_ant = self.highlighter.annotate(self.annot_topo) for_vertex = styles_ant.for_vertex for_edge_ant = styles_ant.for_edge # We must resolve conflicts for_edge: dict[tuple[VertexID, VertexID], tuple[Edge, dict]] = dict() # → edge, styling dict. for e, sty in for_edge_ant.items(): a, b = tuple(sorted((e.source, e.target))) if (a,b) not in for_edge: for_edge[(a,b)] = (e, sty) if e.cost == 0: continue # A collision that we do not care about oe, _sty = for_edge[(a,b)] if oe.cost == 0 or oe.cost > e.cost: for_edge[(a,b)] = (e, sty) # Actually apply the style: for v, sty in for_vertex.items(): self.graphicsitems[v].apply_style(sty) for e, tup in for_edge.items(): self.graphicsitems[e].apply_style(tup[1]) @Slot() def dagMode(self): self.mode = self.Mode.ShortestPathDAG self.highlighter = HighlightSPDAG(...) self.apply_styles() @Slot() def topoDiffMode(self): self.mode = self.Mode.TopologyDifference self.highlighter = HighlightTopoDiff(None) self.apply_styles() @Slot() def edgeWeightMode(self): self.mode = self.Mode.EdgeWeight self.highlighter = EdgeWidthByCost(None) self.apply_styles() @Slot() def shortestPathMode(self): self.mode = self.Mode.ShortestPath ... @Slot() def openRefTopology(self): filename = QtWidgets.QFileDialog.getOpenFileName(self, 'Open reference topology', '.', 'OSPF files(*.ospf);;All files(*)')[0] if filename == '': return # Do nothing self.ref_topo_provider = OspfFileTopologyProvider(filename) try: ref_topo = self.ref_topo_provider.get_topology() except OspfFileParseError as e: warning = QtWidgets.QMessageBox.critical(self, "Bad reference topology", f"The reference topology seems to be malformed: {e}.\nPlease select a valid reference topology.") self.ref_topo_provider = None return self.refreshTopologies() @Slot() def curTopologyFromBird(self): loader = BirdTopologyLoader(self) loader.exec() area, instance, version = loader.result_ self.cur_topo_provider = BirdSocketTopologyProvider(instance=instance, area=area, version=version) try: cur_topo = self.cur_topo_provider.get_topology() except OspfFileParseError as e: warning = QtWidgets.QMessageBox.critical(self, "Bad current topology", f"The current topology seems to be malformed: {e}.\nPlease select a valid topology.") self.cur_topo_provider = None return self.refreshTopologies() @Slot() def curTopologyFromFile(self): filename = QtWidgets.QFileDialog.getOpenFileName(self, 'Open current topology', '.', 'OSPF files(*.ospf);;All files(*)')[0] if filename == '': return # Do nothing self.cur_topo_provider = OspfFileTopologyProvider(filename) try: cur_topo = self.cur_topo_provider.get_topology() except OspfFileParseError as e: warning = QtWidgets.QMessageBox.critical(self, "Bad current topology", f"The current topology seems to be malformed: {e}.\nPlease select a valid topology.") self.cur_topo_provider = None return self.refreshTopologies() @Slot() def refreshTopologies(self): # Pre-checks: msg = '' if self.ref_topo_provider is None: msg += 'Please select reference topology. ' if self.cur_topo_provider is None: msg += 'Please select current topology. ' if msg: self.statusbar.showMessage(msg) # Nothing more to do. return # We just drop anything we had before, since we will be re-reading all the files. try: ref_topo = self.ref_topo_provider.get_topology() except OspfFileParseError as e: warning = QtWidgets.QMessageBox.critical(self, "Bad reference topology", f"The reference topology seems to be malformed: {e}.\nPlease select a valid reference topology.") self.ref_topo_provider = None return try: cur_topo = self.cur_topo_provider.get_topology() except OspfFileParseError as e: warning = QtWidgets.QMessageBox.critical(self, "Bad current topology", f"The current topology seems to be malformed: {e}.\nPlease select a valid topology.") self.cur_topo_provider = None return self.scene.clear() combined_topology = TopologyV3.combine_topologies(reference=ref_topo, current=cur_topo) combined_topology.freeze() self.annot_topo = AnnotatedTopology(combined_topology) self.topo_to_graph() self.draw_visu() def topo_to_graph(self): """Converts an AnnotatedTopology to a graph""" # We actually do not care about the annotations topo = self.annot_topo.topology neighbours = {v: set() for v in topo.vertices.keys()} # Neighbour lists edges: set[tuple[VertexID, VertexID]] = set() for e in topo.edges: neighbours[e.source].add(e.target) neighbours[e.target].add(e.source) first = min([e.source, e.target]) second = max([e.source, e.target]) edges.add((first, second)) self.visu_graph = (neighbours, edges) def draw_visu(self): self.graphicsitems: dict[VertexID|tuple[VertexID, VertexID], QGraphicsItem] = dict() for vtxid in self.visu_graph[0].keys(): gritem = self.create_vertex(vtxid) self.graphicsitems[vtxid] = gritem self.scene.addItem(gritem) for edge in self.visu_graph[1]: gritem = self.create_edge(edge) self.graphicsitems[edge] = gritem self.scene.addItem(gritem) self.positions_from_file(None) self.apply_styles() def create_vertex(self, vtxid): if vtxid.is_router: return RouterGraphicsItem(vtxid, self) else: return NetworkGraphicsItem(vtxid, self) def create_edge(self, edge): return EdgeGraphicsItem(edge, self) main_window = MainWindow() main_window.show() app.exec()