#!/usr/bin/env python3
"""Qt front-end of birdvisu.

Shows a reference OSPF topology combined with a current one (read from a
file or from a running BIRD), runs the configured annotators on the
combination and draws the result in a QGraphicsScene.
"""

from collections import defaultdict
from enum import Enum, auto
from ipaddress import IPv4Address
from random import randint
import sys

from PySide6 import QtCore, QtGui, QtWidgets
from PySide6.QtCore import Slot, QObject

from birdvisu.annotations import AnnotatedTopology, AnnotatorID
from birdvisu.annotations.analysis import TopologyDifference, ShortestPathTree
from birdvisu.annotations.layout import MegaStyler
from birdvisu.ospfsock import BirdSocketConnection
from birdvisu.providers import BirdSocketTopologyProvider, OspfFileTopologyProvider, OspfFileParseError
from birdvisu.topo_v3 import TopologyV3, VertexID
from birdvisu.graphics_items import RouterGraphicsItem, NetworkGraphicsItem, EdgeGraphicsItem

# The application object has to exist before any widget is constructed.
app = QtWidgets.QApplication([])


class BirdTopologyLoader(QtWidgets.QDialog):
    """Modal dialog asking which OSPF area/instance/protocol to read from BIRD.

    After the dialog has been accepted, ``result_`` holds the tuple
    ``(area, instance, protocol)``; it stays ``(None, None, None)`` when the
    dialog was cancelled.
    """

    def __init__(self, *a, **kwa):
        super().__init__(*a, **kwa)
        self.setModal(True)
        self.result_ = (None, None, None)

        outer = QtWidgets.QVBoxLayout(self)

        # Area: either a plain integer or dotted-quad notation; -1 means "unspecified".
        self.line = QtWidgets.QLineEdit(self)
        self.line.setText('-1')
        self._add_row(outer, 'Area:', self.line)

        # Instance: offered from whatever get_instances() reports.
        self.combo = QtWidgets.QComboBox()
        self.combo.addItems(self.get_instances())
        self._add_row(outer, 'Instance:', self.combo)

        # Protocol version.
        self.combo2 = QtWidgets.QComboBox()
        self.combo2.addItems(['Guess!', 'OSPFv2', 'OSPFv3'])
        self._add_row(outer, 'Protocol:', self.combo2)

        buttons = QtWidgets.QDialogButtonBox(QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel)
        # A single slot validates the input and only then accepts, so that a
        # malformed area cannot close the dialog with an empty result.
        buttons.accepted.connect(self._acceptIfValid)
        buttons.rejected.connect(self.reject)
        outer.addWidget(buttons)

    @staticmethod
    def _add_row(outer, label, widget):
        """Append a horizontal "label + widget" row to the layout *outer*."""
        row = QtWidgets.QHBoxLayout()
        row.addWidget(QtWidgets.QLabel(label))
        row.addWidget(widget)
        outer.addLayout(row)

    @Slot()
    def _acceptIfValid(self):
        """Store the result and accept the dialog, or complain and stay open."""
        try:
            self.saveResult()
        except ValueError as e:
            # ipaddress.AddressValueError is a ValueError subclass.
            QtWidgets.QMessageBox.warning(
                self,
                'Bad area',
                f'The area must be an integer or an IPv4-style ID: {e}',
            )
            return
        self.accept()

    @Slot()
    def saveResult(self):
        """Parse the widgets into ``self.result_``.

        The area is accepted as a plain integer or in dotted-quad notation;
        ``-1`` is stored as ``None``.

        Raises:
            ValueError: the area text is neither an integer nor a valid
                IPv4-style address.
        """
        text = self.line.text().strip()
        try:
            area = int(text)
        except ValueError:
            area = int(IPv4Address(text))
        if area == -1:
            area = None
        inst = self.combo.currentText()
        proto = {
            'OSPFv2': 2,
            'OSPFv3': 3,
            'Guess!': None,
        }[self.combo2.currentText()]
        self.result_ = (area, inst, proto)

    def get_instances(self):
        """Return what ``find_running_ospf`` reports for a fresh BIRD connection."""
        # Our code is stupid, so we initialise a BirdSocketTopologyProvider just to get instances and then drop it.
        prov = BirdSocketTopologyProvider()
        bird = BirdSocketConnection()
        return prov.find_running_ospf(bird)


class MainWindow(QtWidgets.QMainWindow):
    """Main window: menus, topology providers and the graphics scene."""

    class Tool(Enum):
        """Interaction mode currently selected by the user."""
        MoveTopology = auto()
        ShortestPath = auto()

    def create_menus(self):
        """Build the menu bar and wire the actions to their slots."""
        print('Creating menus…')
        self.menubar = self.menuBar()

        mode_menu = self.menubar.addMenu('&Mode')
        short_path_act = QtGui.QAction("Sh. path &DAG", self)
        short_path_act.triggered.connect(self.shortestPathMode)
        mode_menu.addAction(short_path_act)

        # Hack!
        autoload_act = QtGui.QAction("&Load automatically", self)
        autoload_act.triggered.connect(self.autoLoad)
        self.menubar.addAction(autoload_act)

        topo_menu = self.menubar.addMenu('&Topology')
        open_ref_act = QtGui.QAction("&Load reference", self)
        open_ref_act.triggered.connect(self.openRefTopology)
        topo_menu.addAction(open_ref_act)

        # The title used to read "Load ¤t": "&curren" had been decoded as an
        # HTML entity, destroying the "&c" mnemonic.
        cur_topo_menu = topo_menu.addMenu("Load &current")
        running_bird_act = QtGui.QAction('&BIRD', self)
        running_bird_act.triggered.connect(self.curTopologyFromBird)
        cur_topo_menu.addAction(running_bird_act)
        from_file_act = QtGui.QAction('&from file', self)
        from_file_act.triggered.connect(self.curTopologyFromFile)
        cur_topo_menu.addAction(from_file_act)

        refresh_act = QtGui.QAction("&Refresh", self)
        refresh_act.triggered.connect(self.refreshTopologies)
        topo_menu.addAction(refresh_act)

    def __init__(self, *a, **kwa):
        super().__init__(*a, **kwa)
        self.ref_topo_provider = None
        self.cur_topo_provider = None
        self.set_initial_annotators()
        self.annot_topo = None
        self.highlighter = None
        self.tool = self.Tool.MoveTopology

        self.scene = QtWidgets.QGraphicsScene()
        self.view = QtWidgets.QGraphicsView(self.scene)
        self.view.setDragMode(self.view.DragMode.ScrollHandDrag)
        self.setCentralWidget(self.view)

        self.statusbar = self.statusBar()
        self.statusbar.showMessage('Hello!')
        self.create_menus()

        self.autoLoad()  # Hack

    @Slot()
    def autoLoad(self):
        """Load a hard-coded reference file and BIRD instance (development hack)."""
        print('Auto-loading…')
        self.ref_topo_provider = OspfFileTopologyProvider('./empty.ospf')
        self.cur_topo_provider = BirdSocketTopologyProvider(instance='gennet4', area=1, version=2)
        self.refreshTopologies()

    def set_initial_annotators(self):
        """Populate the three annotator lists run on every refresh."""
        # We have three kinds of annotators:
        # - The essential analytic ones (TopologyDifference)
        # - The one that describes the current tool (when that is Annotator backed, like for ShortestPathTree)
        # - The styling ones, that actually help visualise stuff (MegaStyler)
        self.essential_annotators = [AnnotatorID(TopologyDifference)]
        self.current_annotators = [
            AnnotatorID(ShortestPathTree, (VertexID(
                family=None,
                is_router=True,
                address=None,
                router_id=int(IPv4Address('172.23.100.10')),
                dr_id=None,
                discriminator=None
            ), 'current')),
        ]
        self.styling_annotators = [AnnotatorID(MegaStyler, tuple(self.essential_annotators + self.current_annotators))]

    def _load_topology(self, provider, role):
        """Return ``provider.get_topology()``, or ``None`` after reporting a parse error.

        *role* is ``'reference'`` or ``'current'`` and only selects the wording
        of the error dialog shown when OspfFileParseError is raised.
        """
        try:
            return provider.get_topology()
        except OspfFileParseError as e:
            if role == 'reference':
                title = "Bad reference topology"
                text = f"The reference topology seems to be malformed: {e}.\nPlease select a valid reference topology."
            else:
                title = "Bad current topology"
                text = f"The current topology seems to be malformed: {e}.\nPlease select a valid topology."
            QtWidgets.QMessageBox.critical(self, title, text)
            return None

    @Slot()
    def shortestPathMode(self):
        """Switch the active tool to the shortest-path one."""
        self.tool = self.Tool.ShortestPath

    @Slot()
    def openRefTopology(self):
        """Ask for a reference topology file, validate it and redraw."""
        filename = QtWidgets.QFileDialog.getOpenFileName(self, 'Open reference topology', '.', 'OSPF files(*.ospf);;All files(*)')[0]
        if filename == '':
            return  # Do nothing
        self.ref_topo_provider = OspfFileTopologyProvider(filename)
        # Validate right away so the user hears about a broken file even when
        # no current topology has been chosen yet.
        if self._load_topology(self.ref_topo_provider, 'reference') is None:
            self.ref_topo_provider = None
            return
        self.refreshTopologies()

    @Slot()
    def curTopologyFromBird(self):
        """Ask which BIRD instance to read the current topology from and redraw."""
        loader = BirdTopologyLoader(self)
        # QDialog.exec() returns the dialog code; Rejected is 0. A cancelled
        # dialog must not replace the current provider.
        if not loader.exec():
            return
        area, instance, version = loader.result_
        self.cur_topo_provider = BirdSocketTopologyProvider(instance=instance, area=area, version=version)
        if self._load_topology(self.cur_topo_provider, 'current') is None:
            self.cur_topo_provider = None
            return
        self.refreshTopologies()

    @Slot()
    def curTopologyFromFile(self):
        """Ask for a current topology file, validate it and redraw."""
        filename = QtWidgets.QFileDialog.getOpenFileName(self, 'Open current topology', '.', 'OSPF files(*.ospf);;All files(*)')[0]
        if filename == '':
            return  # Do nothing
        self.cur_topo_provider = OspfFileTopologyProvider(filename)
        if self._load_topology(self.cur_topo_provider, 'current') is None:
            self.cur_topo_provider = None
            return
        self.refreshTopologies()

    @Slot()
    def refreshTopologies(self):
        """Re-read both topologies, re-run all annotators and redraw the scene.

        When a provider is missing, only a prompt is shown in the status bar.
        A provider whose topology fails to parse is reset to ``None``.
        """
        # Pre-checks:
        msg = ''
        if self.ref_topo_provider is None:
            msg += 'Please select reference topology. '
        if self.cur_topo_provider is None:
            msg += 'Please select current topology. '
        if msg:
            self.statusbar.showMessage(msg)
            # Nothing more to do.
            return

        # We just drop anything we had before, since we will be re-reading all the files.
        ref_topo = self._load_topology(self.ref_topo_provider, 'reference')
        if ref_topo is None:
            self.ref_topo_provider = None
            return
        cur_topo = self._load_topology(self.cur_topo_provider, 'current')
        if cur_topo is None:
            self.cur_topo_provider = None
            return

        self.scene.clear()

        combined_topology = TopologyV3.combine_topologies(reference=ref_topo, current=cur_topo)
        combined_topology.freeze()
        self.annot_topo = AnnotatedTopology(combined_topology)

        for ann_id in self.essential_annotators + self.current_annotators + self.styling_annotators:
            self.annot_topo.run_annotator(ann_id)

        # Draw it
        self.draw_visu()
        # Both topologies are loaded, so any earlier "Please select …" prompt is stale.
        self.statusbar.clearMessage()

    def draw_visu(self):
        """Create one graphics item per styled vertex and edge and add it to the scene."""
        # just take the result of the MegaStyler and create vertices according to the style.
        megastyler = self.styling_annotators[-1]
        assert megastyler.annotator == MegaStyler
        msann = self.annot_topo.annotations[megastyler]

        # Two-way lookup between topology objects (vertex IDs as well as
        # edges) and the graphics items that represent them.
        self.graphicsitems: dict[object, QtWidgets.QGraphicsItem] = {}
        self.topologyitems: dict[QtWidgets.QGraphicsItem, object] = {}

        for vtxid, style in msann.for_vertex.items():
            gritem = self.create_vertex(vtxid, style)
            self.graphicsitems[vtxid] = gritem
            self.topologyitems[gritem] = vtxid
            self.scene.addItem(gritem)

        for edge, style in msann.for_edge.items():
            gritem = self.create_edge(edge, style)
            self.graphicsitems[edge] = gritem
            self.topologyitems[gritem] = edge
            self.scene.addItem(gritem)

    def create_vertex(self, vtxid, style):
        """Return a router or network graphics item, depending on ``vtxid.is_router``."""
        if vtxid.is_router:
            return RouterGraphicsItem(vtxid, style, self)
        else:
            return NetworkGraphicsItem(vtxid, style, self)

    def create_edge(self, edge, style):
        """Return the graphics item for *edge*."""
        return EdgeGraphicsItem(edge, style, self)


main_window = MainWindow()
main_window.show()

# Propagate the event loop's return code as the process exit status.
sys.exit(app.exec())