You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
birdvisu/poor_mans_visualisation.py

267 lines
9.5 KiB
Python

#!/usr/bin/env python3
from birdvisu.annotations import AnnotatedTopology, AnnotatorID
from birdvisu.annotations.analysis import TopologyDifference, ShortestPathTree
from birdvisu.annotations.layout import MegaStyler
from birdvisu.ospfsock import BirdSocketConnection
from birdvisu.providers import BirdSocketTopologyProvider, OspfFileTopologyProvider, OspfFileParseError
from birdvisu.topo_v3 import TopologyV3, VertexID
from birdvisu.graphics_items import RouterGraphicsItem, NetworkGraphicsItem, EdgeGraphicsItem
from collections import defaultdict
from enum import Enum, auto
from ipaddress import IPv4Address
from PySide6 import QtCore, QtGui, QtWidgets
from PySide6.QtCore import Slot, QObject
from random import randint
import sys
# The Qt application object is created at import time, ahead of the widget
# classes defined below and of the MainWindow instantiated at the end of the file.
app = QtWidgets.QApplication([])
class BirdTopologyLoader(QtWidgets.QDialog):
    """Modal dialog asking which OSPF instance of a running BIRD to load.

    The dialog offers three inputs: an area (plain integer or dotted-quad
    notation, ``-1`` being stored as ``None``), an instance name chosen from
    the instances reported by BIRD, and the protocol version.

    After the dialog has been accepted, ``result_`` holds the tuple
    ``(area, instance, version)``.  It stays ``(None, None, None)`` when the
    dialog is cancelled.
    """

    # Protocol combo-box entries mapped to the version stored in ``result_``.
    # The insertion order is the order shown in the combo box.
    _PROTOCOL_VERSIONS = {
        'Guess!': None,
        'OSPFv2': 2,
        'OSPFv3': 3,
    }

    def __init__(self, *a, **kwa):
        """Build the dialog; all arguments are forwarded to ``QDialog``."""
        super().__init__(*a, **kwa)
        self.setModal(True)
        self.result_ = (None, None, None)

        outer = QtWidgets.QVBoxLayout(self)

        # Area
        self.line = QtWidgets.QLineEdit(self)
        self.line.setText('-1')
        outer.addLayout(self._labelled_row('Area:', self.line))

        # Instance
        self.combo = QtWidgets.QComboBox()
        self.combo.addItems(self.get_instances())
        outer.addLayout(self._labelled_row('Instance:', self.combo))

        # Protocol
        self.combo2 = QtWidgets.QComboBox()
        self.combo2.addItems(list(self._PROTOCOL_VERSIONS))
        outer.addLayout(self._labelled_row('Protocol:', self.combo2))

        buttons = QtWidgets.QDialogButtonBox(QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel)
        # OK goes through a single slot so that the dialog is only accepted
        # once the input has been parsed successfully.
        buttons.accepted.connect(self._accept_if_valid)
        buttons.rejected.connect(self.reject)
        outer.addWidget(buttons)

    @staticmethod
    def _labelled_row(caption, widget):
        """Return a horizontal layout holding a label followed by *widget*."""
        row = QtWidgets.QHBoxLayout()
        row.addWidget(QtWidgets.QLabel(caption))
        row.addWidget(widget)
        return row

    @Slot()
    def _accept_if_valid(self):
        """Store the result and accept; on bad input, warn and stay open."""
        try:
            self.saveResult()
        except ValueError as e:
            QtWidgets.QMessageBox.warning(
                self,
                'Invalid area',
                f'The area must be an integer or a dotted-quad address: {e}',
            )
            return
        self.accept()

    @Slot()
    def saveResult(self):
        """Parse the widgets into ``result_`` as ``(area, instance, version)``.

        Raises:
            ValueError: when the area text is neither an integer nor a valid
                dotted-quad address.  ``result_`` is left unchanged then.
        """
        text = self.line.text().strip()
        try:
            area = int(text)
        except ValueError:
            # Not a plain number, so try the dotted-quad notation.
            # AddressValueError is a ValueError subclass and propagates.
            area = int(IPv4Address(text))
        if area == -1:
            area = None
        inst = self.combo.currentText()
        proto = self._PROTOCOL_VERSIONS[self.combo2.currentText()]
        self.result_ = (area, inst, proto)

    def get_instances(self):
        """Return what ``find_running_ospf`` reports for a fresh BIRD connection."""
        # Our code is stupid, so we initialise a BirdSocketTopologyProvider just to get instances and then drop it.
        prov = BirdSocketTopologyProvider()
        bird = BirdSocketConnection()
        return prov.find_running_ospf(bird)
class MainWindow(QtWidgets.QMainWindow):
    """Main window: loads two topologies, annotates and draws their combination.

    A *reference* and a *current* topology provider are kept.  Whenever both
    are set, ``refreshTopologies`` re-reads them, combines them, runs the
    configured annotators and redraws the scene from the MegaStyler result.
    """

    class Tool(Enum):
        MoveTopology = auto()
        ShortestPath = auto()

    def __init__(self, *a, **kwa):
        """Build the window; all arguments are forwarded to ``QMainWindow``."""
        super().__init__(*a, **kwa)
        self.ref_topo_provider = None
        self.cur_topo_provider = None
        self.set_initial_annotators()
        self.annot_topo = None
        self.highlighter = None
        self.tool = self.Tool.MoveTopology

        # Filled by draw_visu(); created here as well so that the attributes
        # exist even when the first load fails before anything is drawn.
        self.graphicsitems = {}
        self.topologyitems = {}

        self.scene = QtWidgets.QGraphicsScene()
        self.view = QtWidgets.QGraphicsView(self.scene)
        self.view.setDragMode(self.view.DragMode.ScrollHandDrag)
        self.setCentralWidget(self.view)

        self.statusbar = self.statusBar()
        self.statusbar.showMessage('Hello!')

        self.create_menus()
        # Hack: load the hard-coded topologies right away.
        self.autoLoad()

    def create_menus(self):
        """Populate the menu bar with the Mode and Topology menus."""
        print('Creating menus…')
        self.menubar = self.menuBar()

        mode_menu = self.menubar.addMenu('&Mode')
        short_path_act = QtGui.QAction("Sh. path &DAG", self)
        short_path_act.triggered.connect(self.shortestPathMode)
        mode_menu.addAction(short_path_act)

        # Hack!
        autoload_act = QtGui.QAction("&Load automatically", self)
        autoload_act.triggered.connect(self.autoLoad)
        self.menubar.addAction(autoload_act)

        topo_menu = self.menubar.addMenu('&Topology')
        open_ref_act = QtGui.QAction("&Load reference", self)
        open_ref_act.triggered.connect(self.openRefTopology)
        topo_menu.addAction(open_ref_act)

        cur_topo_menu = topo_menu.addMenu("Load &current")
        running_bird_act = QtGui.QAction('&BIRD', self)
        running_bird_act.triggered.connect(self.curTopologyFromBird)
        cur_topo_menu.addAction(running_bird_act)
        from_file_act = QtGui.QAction('&from file', self)
        from_file_act.triggered.connect(self.curTopologyFromFile)
        cur_topo_menu.addAction(from_file_act)

        refresh_act = QtGui.QAction("&Refresh", self)
        refresh_act.triggered.connect(self.refreshTopologies)
        topo_menu.addAction(refresh_act)

    # Hack
    @Slot()
    def autoLoad(self):
        """Set both providers to hard-coded sources and refresh."""
        print('Auto-loading…')
        self.ref_topo_provider = OspfFileTopologyProvider('./empty.ospf')
        self.cur_topo_provider = BirdSocketTopologyProvider(instance='gennet4', area=1, version=2)
        self.refreshTopologies()

    def set_initial_annotators(self):
        """Create the three annotator lists used by ``refreshTopologies``."""
        # We have three kinds of annotators:
        #  - The essential analytic ones (TopologyDifference)
        #  - The one that describes the current tool (when that is Annotator backed, like for ShortestPathTree)
        #  - The styling ones, that actually help visualise stuff (MegaStyler)
        self.essential_annotators = [AnnotatorID(TopologyDifference)]
        # The start vertex of the shortest-path tree is hard-coded here.
        start_vertex = VertexID(
            family=None,
            is_router=True,
            address=None,
            router_id=int(IPv4Address('172.23.100.10')),
            dr_id=None,
            discriminator=None
        )
        self.current_annotators = [
            AnnotatorID(ShortestPathTree, (start_vertex, 'current')),
        ]
        self.styling_annotators = [AnnotatorID(MegaStyler, tuple(self.essential_annotators + self.current_annotators))]

    @Slot()
    def shortestPathMode(self):
        """Switch the active tool to ``Tool.ShortestPath``."""
        self.tool = self.Tool.ShortestPath

    def _load_topology(self, reference):
        """Read the topology from the reference or the current provider.

        Args:
            reference: true to use ``ref_topo_provider``, false to use
                ``cur_topo_provider``.

        Returns:
            ``(True, topology)`` on success.  On ``OspfFileParseError`` an
            error box is shown, the affected provider is reset to ``None``
            and ``(False, None)`` is returned.
        """
        provider = self.ref_topo_provider if reference else self.cur_topo_provider
        try:
            return True, provider.get_topology()
        except OspfFileParseError as e:
            if reference:
                QtWidgets.QMessageBox.critical(
                    self,
                    "Bad reference topology",
                    f"The reference topology seems to be malformed: {e}.\nPlease select a valid reference topology.",
                )
                self.ref_topo_provider = None
            else:
                QtWidgets.QMessageBox.critical(
                    self,
                    "Bad current topology",
                    f"The current topology seems to be malformed: {e}.\nPlease select a valid topology.",
                )
                self.cur_topo_provider = None
            return False, None

    @Slot()
    def openRefTopology(self):
        """Let the user pick a reference topology file, then refresh."""
        filename = QtWidgets.QFileDialog.getOpenFileName(self, 'Open reference topology', '.', 'OSPF files(*.ospf);;All files(*)')[0]
        if filename == '':
            return  # Do nothing
        self.ref_topo_provider = OspfFileTopologyProvider(filename)
        # Validate right away so that a malformed file is reported even when
        # the other provider is still missing and the refresh would bail out early.
        ok, _ = self._load_topology(reference=True)
        if ok:
            self.refreshTopologies()

    @Slot()
    def curTopologyFromBird(self):
        """Ask for a BIRD instance and use it as the current topology."""
        loader = BirdTopologyLoader(self)
        # QDialog.exec() returns 0 (Rejected) when the dialog is cancelled;
        # in that case the present provider must stay untouched.
        if not loader.exec():
            return
        area, instance, version = loader.result_
        self.cur_topo_provider = BirdSocketTopologyProvider(instance=instance, area=area, version=version)
        ok, _ = self._load_topology(reference=False)
        if ok:
            self.refreshTopologies()

    @Slot()
    def curTopologyFromFile(self):
        """Let the user pick a current topology file, then refresh."""
        filename = QtWidgets.QFileDialog.getOpenFileName(self, 'Open current topology', '.', 'OSPF files(*.ospf);;All files(*)')[0]
        if filename == '':
            return  # Do nothing
        self.cur_topo_provider = OspfFileTopologyProvider(filename)
        ok, _ = self._load_topology(reference=False)
        if ok:
            self.refreshTopologies()

    @Slot()
    def refreshTopologies(self):
        """Re-read both topologies, re-run all annotators and redraw.

        When a provider is missing, only a status-bar hint is shown.  When a
        topology fails to parse, the scene is left as it was.
        """
        # Pre-checks:
        msg = ''
        if self.ref_topo_provider is None:
            msg += 'Please select reference topology. '
        if self.cur_topo_provider is None:
            msg += 'Please select current topology. '
        if msg:
            self.statusbar.showMessage(msg)
            # Nothing more to do.
            return

        # We just drop anything we had before, since we will be re-reading all the files.
        ok, ref_topo = self._load_topology(reference=True)
        if not ok:
            return
        ok, cur_topo = self._load_topology(reference=False)
        if not ok:
            return

        self.scene.clear()
        combined_topology = TopologyV3.combine_topologies(reference=ref_topo, current=cur_topo)
        combined_topology.freeze()
        self.annot_topo = AnnotatedTopology(combined_topology)
        for ann_id in self.essential_annotators + self.current_annotators + self.styling_annotators:
            self.annot_topo.run_annotator(ann_id)

        # Draw it
        self.draw_visu()

    def draw_visu(self):
        """Create graphics items for every styled vertex and edge."""
        # just take the result of the MegaStyler and create vertices according to the style.
        megastyler = self.styling_annotators[-1]
        assert megastyler.annotator == MegaStyler
        msann = self.annot_topo.annotations[megastyler]

        # These two dictionaries are used from outside, so that QGraphicsItems
        # can resolve graph relations (e.g. to draw the line between the
        # correct Vertices)
        #  - graphicsitems: VertexID or edge -> graphics item
        #  - topologyitems: graphics item -> VertexID or edge
        self.graphicsitems = {}
        self.topologyitems = {}

        for vtxid, style in msann.for_vertex.items():
            gritem = self.create_vertex(vtxid, style)
            self.graphicsitems[vtxid] = gritem
            self.topologyitems[gritem] = vtxid
            self.scene.addItem(gritem)

        for edge, style in msann.for_edge.items():
            gritem = self.create_edge(edge, style)
            self.graphicsitems[edge] = gritem
            self.topologyitems[gritem] = edge
            self.scene.addItem(gritem)

    def create_vertex(self, vtxid, style):
        """Return a router or network graphics item depending on ``vtxid.is_router``."""
        if vtxid.is_router:
            return RouterGraphicsItem(vtxid, style, self)
        return NetworkGraphicsItem(vtxid, style, self)

    def create_edge(self, edge, style):
        """Return the graphics item for *edge*."""
        return EdgeGraphicsItem(edge, style, self)
# Script entry point: build the main window, show it and hand control to
# the application's exec() call.
main_window = MainWindow()
main_window.show()
app.exec()