You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
birdvisu/poor_mans_visualisation.py

277 lines
9.4 KiB
Python

#!/usr/bin/env python3
import sys
from birdvisu.providers import BirdSocketTopologyProvider, OspfFileTopologyProvider, OspfFileParseError
# File name of a reference topology. Nothing in the visible part of this file
# reads this constant; the reference file is picked through a file dialog.
ref_topo_file = 'reference.ospf'
from birdvisu.topo_v3 import TopologyV3
from birdvisu.topo_v3 import VertexID
from birdvisu.annotations import AnnotatedTopology, AnnotatorID
from birdvisu.annotations.analysis import TopologyDifference, ShortestPathTree
from ipaddress import IPv4Address
from PySide6 import QtCore, QtGui, QtWidgets
# The application object is created at import time, before any widget is
# instantiated; its event loop is started at the very end of the file.
app = QtWidgets.QApplication([])
from random import randint
from collections import defaultdict
class MyGraphicsRectItem(QtWidgets.QGraphicsRectItem):
    """Rectangle item for one topology vertex that keeps its edges attached.

    The vertex identifier is read from the item's data slot 0.  ``nei`` maps a
    vertex identifier to the edges touching that vertex and ``shapes`` maps
    both vertex identifiers and edges to the graphics items drawn for them.
    Both mappings are shared with whoever builds the scene.
    """

    def __init__(self, nei, shapes, *a, **kwa):
        """Store the shared lookup tables and forward everything else to Qt.

        Args:
            nei: mapping from vertex identifier to the edges incident to it.
            shapes: mapping from vertex identifier / edge to its graphics item.
            *a, **kwa: passed unchanged to ``QGraphicsRectItem``.
        """
        self.nei = nei
        self.shapes = shapes
        super().__init__(*a, **kwa)

    def mouseMoveEvent(self, evt):
        """Move the item and re-anchor every incident edge to the new position.

        The base implementation is called first because it is what actually
        moves a movable item; reading the position before that call would make
        the edges trail the rectangle by one mouse event.
        """
        super().mouseMoveEvent(evt)

        vtxid = self.data(0)
        # Our own position is the same for every edge, so read it only once.
        own_x = self.x()
        own_y = self.y()
        for edge in self.nei[vtxid]:
            # The far end is whichever endpoint of the edge is not this vertex.
            other = edge.source if edge.source != vtxid else edge.target
            far_item = self.shapes[other]
            self.shapes[edge].setLine(
                QtCore.QLineF(own_x, own_y, far_item.x(), far_item.y())
            )
from enum import Enum, auto
from PySide6.QtCore import Slot
from birdvisu.ospfsock import BirdSocketConnection
class BirdTopologyLoader(QtWidgets.QDialog):
    """Modal dialog asking which OSPF area, instance and protocol to load.

    Once the dialog has been accepted, ``result_`` holds the tuple
    ``(area, instance, version)``.  Until then (and after a cancel) it stays
    ``(None, None, None)``.
    """

    def __init__(self, *a, **kwa):
        """Build the form; all arguments are forwarded to ``QDialog``."""
        super().__init__(*a, **kwa)
        self.setModal(True)
        self.result_ = (None, None, None)

        outer = QtWidgets.QVBoxLayout(self)

        # Area: an integer or a dotted quad; -1 is translated to None on save.
        self.line = QtWidgets.QLineEdit(self)
        self.line.setText('-1')
        outer.addLayout(self._labelled_row('Area:', self.line))

        self.combo = QtWidgets.QComboBox()
        self.combo.addItems(self.get_instances())
        outer.addLayout(self._labelled_row('Instance:', self.combo))

        self.combo2 = QtWidgets.QComboBox()
        self.combo2.addItems(['Guess!', 'OSPFv2', 'OSPFv3'])
        outer.addLayout(self._labelled_row('Protocol:', self.combo2))

        buttons = QtWidgets.QDialogButtonBox(
            QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel
        )
        # OK goes through validation first, so a bad area keeps the dialog
        # open instead of closing it with a stale ``result_``.
        buttons.accepted.connect(self._accept_if_valid)
        buttons.rejected.connect(self.reject)
        outer.addWidget(buttons)

    @staticmethod
    def _labelled_row(caption, widget):
        """Return a horizontal layout holding a caption label and *widget*."""
        row = QtWidgets.QHBoxLayout()
        row.addWidget(QtWidgets.QLabel(caption))
        row.addWidget(widget)
        return row

    @Slot()
    def _accept_if_valid(self):
        """Accept the dialog only if the form contents could be saved."""
        try:
            self.saveResult()
        except ValueError as e:
            QtWidgets.QMessageBox.warning(
                self,
                'Invalid area',
                'The area must be an integer or a dotted quad such as 0.0.0.1 '
                f'(use -1 to leave the area unspecified): {e}',
            )
            return
        self.accept()

    @Slot()
    def saveResult(self):
        """Parse the form and store ``(area, instance, version)`` in ``result_``.

        The area is read as an integer, falling back to an IPv4-style dotted
        quad converted to its integer value; -1 becomes ``None``.  The protocol
        is 2, 3 or ``None`` for 'Guess!'.

        Raises:
            ValueError: the area text is neither an integer nor a dotted quad
                (``ipaddress.AddressValueError`` is a ``ValueError``).
        """
        area_text = self.line.text().strip()
        try:
            area = int(area_text)
        except ValueError:
            area = int(IPv4Address(area_text))
        if area == -1:
            area = None

        inst = self.combo.currentText()
        proto = {
            'OSPFv2': 2,
            'OSPFv3': 3,
            'Guess!': None,
        }[self.combo2.currentText()]
        self.result_ = (area, inst, proto)

    def get_instances(self):
        """Return what ``find_running_ospf`` reports for a fresh BIRD connection.

        The result is fed to ``QComboBox.addItems`` by ``__init__``.
        """
        # Our code is stupid, so we initialise a BirdSocketTopologyProvider
        # just to get instances and then drop it.
        prov = BirdSocketTopologyProvider()
        bird = BirdSocketConnection()
        return prov.find_running_ospf(bird)
class MainWindow(QtWidgets.QMainWindow):
    """Main window drawing the combined reference and current topology.

    The user picks a reference topology (from a file) and a current topology
    (from a file or from a running BIRD).  Once both are set they are combined,
    annotated and drawn into a graphics scene.
    """

    class Tool(Enum):
        """Interaction mode selected by the user."""
        MoveTopology = auto()
        ShortestPath = auto()

    def __init__(self, *a, **kwa):
        """Create the scene, the status bar and the menus."""
        super().__init__(*a, **kwa)
        self.ref_topo_provider = None
        self.cur_topo_provider = None
        self.annotators = self.get_annotators()
        self.annot_topo = None
        self.highlighter = None
        self.tool = self.Tool.MoveTopology

        self.scene = QtWidgets.QGraphicsScene()
        self.view = QtWidgets.QGraphicsView(self.scene)
        self.view.setDragMode(self.view.DragMode.ScrollHandDrag)
        self.setCentralWidget(self.view)

        self.statusbar = self.statusBar()
        self.statusbar.showMessage('Hello!')

        self.menubar = self.menuBar()
        mode_menu = self.menubar.addMenu('&Mode')
        self._add_action(mode_menu, "Sh. path &DAG", self.shortestPathMode)

        topo_menu = self.menubar.addMenu('&Topology')
        self._add_action(topo_menu, "&Load reference", self.openRefTopology)
        cur_topo_menu = topo_menu.addMenu("Load &current")
        self._add_action(cur_topo_menu, '&BIRD', self.curTopologyFromBird)
        self._add_action(cur_topo_menu, '&from file', self.curTopologyFromFile)
        self._add_action(topo_menu, "&Refresh", self.refreshTopologies)

    def _add_action(self, menu, text, slot):
        """Create an action owned by this window, wire it to *slot*, add it to *menu*."""
        action = QtGui.QAction(text, self)
        action.triggered.connect(slot)
        menu.addAction(action)
        return action

    def get_annotators(self):
        """Return the annotators that are run on every combined topology."""
        return [
            AnnotatorID(TopologyDifference),
            # The root router of the shortest path tree is hard-coded here.
            AnnotatorID(ShortestPathTree, (VertexID(
                family=None,
                is_router=True,
                address=None,
                router_id=int(IPv4Address('172.23.100.10')),
                dr_id=None,
                discriminator=None
            ), 'current')),
        ]

    def _load_reference(self):
        """Return the reference topology, or None after reporting a parse error.

        On a parse error the user is told about it and the reference provider
        is dropped, so a new one has to be selected.
        """
        try:
            return self.ref_topo_provider.get_topology()
        except OspfFileParseError as e:
            QtWidgets.QMessageBox.critical(self, "Bad reference topology", f"The reference topology seems to be malformed: {e}.\nPlease select a valid reference topology.")
            self.ref_topo_provider = None
            return None

    def _load_current(self):
        """Return the current topology, or None after reporting a parse error.

        On a parse error the user is told about it and the current provider
        is dropped, so a new one has to be selected.
        """
        try:
            return self.cur_topo_provider.get_topology()
        except OspfFileParseError as e:
            QtWidgets.QMessageBox.critical(self, "Bad current topology", f"The current topology seems to be malformed: {e}.\nPlease select a valid topology.")
            self.cur_topo_provider = None
            return None

    @Slot()
    def shortestPathMode(self):
        """Switch the active tool to the shortest-path one."""
        self.tool = self.Tool.ShortestPath

    @Slot()
    def openRefTopology(self):
        """Ask for a reference topology file, validate it and redraw."""
        filename = QtWidgets.QFileDialog.getOpenFileName(self, 'Open reference topology', '.', 'OSPF files(*.ospf);;All files(*)')[0]
        if filename == '':
            return  # Dialog cancelled: do nothing.
        self.ref_topo_provider = OspfFileTopologyProvider(filename)
        # Validate right away so a broken file is reported even while the
        # current topology has not been chosen yet.
        if self._load_reference() is None:
            return
        self.refreshTopologies()

    @Slot()
    def curTopologyFromBird(self):
        """Ask which BIRD instance to read, then load it as the current topology."""
        loader = BirdTopologyLoader(self)
        # exec() yields the dialog result code; the rejected code is 0.
        # Without this check a cancelled dialog would still query BIRD with
        # the default (None, None, None) selection.
        if not loader.exec():
            return
        area, instance, version = loader.result_
        self.cur_topo_provider = BirdSocketTopologyProvider(instance=instance, area=area, version=version)
        if self._load_current() is None:
            return
        self.refreshTopologies()

    @Slot()
    def curTopologyFromFile(self):
        """Ask for a current topology file, validate it and redraw."""
        filename = QtWidgets.QFileDialog.getOpenFileName(self, 'Open current topology', '.', 'OSPF files(*.ospf);;All files(*)')[0]
        if filename == '':
            return  # Dialog cancelled: do nothing.
        self.cur_topo_provider = OspfFileTopologyProvider(filename)
        if self._load_current() is None:
            return
        self.refreshTopologies()

    @Slot()
    def refreshTopologies(self):
        """Re-read both topologies, combine and annotate them, and redraw.

        When a provider is missing, only a hint is shown in the status bar.
        """
        # Pre-checks:
        msg = ''
        if self.ref_topo_provider is None:
            msg += 'Please select reference topology. '
        if self.cur_topo_provider is None:
            msg += 'Please select current topology. '
        if msg:
            self.statusbar.showMessage(msg)
            # Nothing more to do.
            return

        # We just drop anything we had before, since we will be re-reading
        # all the files.
        ref_topo = self._load_reference()
        if ref_topo is None:
            return
        cur_topo = self._load_current()
        if cur_topo is None:
            return

        self.scene.clear()
        combined_topology = TopologyV3.combine_topologies(reference=ref_topo, current=cur_topo)
        combined_topology.freeze()
        self.annot_topo = AnnotatedTopology(combined_topology)
        for ann_id in self.annotators:
            self.annot_topo.run_annotator(ann_id)

        # Both topologies are loaded, so a previous "Please select ..." hint
        # would now be misleading.
        self.statusbar.clearMessage()

        # Draw it
        self.set_current_highlighter()
        self.ad_hoc_draw_visu()

    def set_current_highlighter(self):
        """Select the annotator whose results get highlighted (currently the last one)."""
        # TODO!
        self.highlighter = self.annotators[-1]

    def ad_hoc_draw_visu(self):
        """Place every vertex at a random position, connect the edges, highlight.

        Vertices become movable rectangles with a text label underneath and
        edges become straight lines between them.  Everything the current
        highlighter annotated gets a blue pen.
        """
        shapes = dict()
        self.nei = defaultdict(list)
        topology = self.annot_topo.topology

        for k, _ in topology.vertices.items():
            size = 30 if k.is_router else 10
            x, y = randint(0, 1920), randint(0, 1080)
            # The rectangle is centred on the item's origin, so pos() is the
            # vertex centre and edges can be anchored to it directly.
            shape = MyGraphicsRectItem(self.nei, shapes, -size/2, -size/2, size, size)
            shape.setPos(x, y)
            # TODO:brush
            label_text = str(IPv4Address(k.router_id)) if k.is_router else str(k.address)  # Surprisingly works for all the possible addresses.
            label = QtWidgets.QGraphicsSimpleTextItem(label_text, parent=shape)
            label.setY(size*0.8)
            # Centre the label horizontally under the rectangle.
            label.setX(-label.boundingRect().width()/2)
            shape.setData(0, k)
            # setFlags() is the call that takes a combination of flags;
            # setFlag() is meant for a single flag.
            shape.setFlags(QtWidgets.QGraphicsItem.ItemIsMovable | QtWidgets.QGraphicsItem.ItemIsSelectable)
            shapes[k] = shape

        for e in topology.edges:
            line = QtWidgets.QGraphicsLineItem(
                QtCore.QLineF(shapes[e.source].pos(), shapes[e.target].pos())
            )
            line.setData(0, e)
            # Register the edge with both endpoints so that dragging either
            # vertex can find it.
            self.nei[e.source].append(e)
            self.nei[e.target].append(e)
            shapes[e] = line

        if self.highlighter is not None and self.highlighter in self.annot_topo.annotations:
            ann = self.annot_topo.annotations[self.highlighter]
            blue_pen = QtGui.QPen(QtGui.QColor('blue'))
            for shk in ann.for_vertex.keys() | ann.for_edge.keys():
                shapes[shk].setPen(blue_pen)
                # FIXME: also color edges in opposite direction

        for sh in shapes.values():
            self.scene.addItem(sh)
# Script entry: build and show the main window, then run the Qt event loop
# of the application object created near the top of the file.
main_window = MainWindow()
main_window.show()
app.exec()