You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
339 lines
12 KiB
Python
339 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from birdvisu.annotations import AnnotatedTopology, AnnotatorID
|
|
from birdvisu.annotations.analysis import TopologyDifference, ShortestPathTree
|
|
from birdvisu.annotations.layout import PlaceVerticesFromFile, PlaceUnplacedVertices, EdgeWidthByCost, HighlightTopoDiff, HighlightSPDAG, HighlightShortestPath
|
|
from birdvisu.ospfsock import BirdSocketConnection
|
|
from birdvisu.providers import BirdSocketTopologyProvider, OspfFileTopologyProvider, OspfFileParseError
|
|
from birdvisu.topo_v3 import TopologyV3, VertexID
|
|
from birdvisu.graphics_items import RouterGraphicsItem, NetworkGraphicsItem, EdgeGraphicsItem
|
|
|
|
from collections import defaultdict
|
|
from enum import Enum, auto
|
|
from ipaddress import IPv4Address
|
|
from PySide6 import QtCore, QtGui, QtWidgets
|
|
from PySide6.QtCore import Slot, QObject
|
|
from random import randint
|
|
import sys
|
|
|
|
app = QtWidgets.QApplication([])
|
|
|
|
class BirdTopologyLoader(QtWidgets.QDialog):
|
|
def __init__(self, *a, **kwa):
|
|
super().__init__(*a, **kwa)
|
|
self.setModal(True)
|
|
self.result_ = (None, None, None)
|
|
outer = QtWidgets.QVBoxLayout(self)
|
|
# Area
|
|
inner = QtWidgets.QHBoxLayout()
|
|
inner.addWidget(QtWidgets.QLabel('Area:'))
|
|
line = QtWidgets.QLineEdit(self)
|
|
line.setText('-1')
|
|
self.line = line
|
|
inner.addWidget(line)
|
|
outer.addLayout(inner)
|
|
|
|
inner = QtWidgets.QHBoxLayout()
|
|
inner.addWidget(QtWidgets.QLabel('Instance:'))
|
|
combo = QtWidgets.QComboBox()
|
|
combo.addItems(self.get_instances())
|
|
self.combo = combo
|
|
inner.addWidget(combo)
|
|
outer.addLayout(inner)
|
|
|
|
inner = QtWidgets.QHBoxLayout()
|
|
inner.addWidget(QtWidgets.QLabel('Protocol:'))
|
|
combo = QtWidgets.QComboBox()
|
|
combo.addItems(['Guess!', 'OSPFv2', 'OSPFv3'])
|
|
self.combo2 = combo
|
|
inner.addWidget(combo)
|
|
outer.addLayout(inner)
|
|
|
|
buttons = QtWidgets.QDialogButtonBox(QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel)
|
|
buttons.accepted.connect(self.saveResult)
|
|
buttons.accepted.connect(self.accept)
|
|
buttons.rejected.connect(self.reject)
|
|
outer.addWidget(buttons)
|
|
|
|
@Slot()
|
|
def saveResult(self):
|
|
try:
|
|
area = int(self.line.text())
|
|
except ValueError:
|
|
area = int(IPv4Address(self.line.text()))
|
|
if area == -1: area = None
|
|
inst = self.combo.currentText()
|
|
proto = {
|
|
'OSPFv2': 2,
|
|
'OSPFv3': 3,
|
|
'Guess!': None,
|
|
}[self.combo2.currentText()]
|
|
self.result_ = (area, inst, proto)
|
|
|
|
def get_instances(self):
|
|
# Our code is stupid, so we initialise a BirdSocketTopologyProvider just to get instances and then drop it.
|
|
prov = BirdSocketTopologyProvider()
|
|
bird = BirdSocketConnection()
|
|
return prov.find_running_ospf(bird)
|
|
|
|
|
|
class MainWindow(QtWidgets.QMainWindow):
|
|
class Mode(Enum):
|
|
ShortestPath = auto()
|
|
ShortestPathDAG = auto()
|
|
TopologyDifference = auto()
|
|
EdgeWeight = auto()
|
|
|
|
def create_menus(self):
|
|
print('Creating menus…')
|
|
self.menubar = self.menuBar()
|
|
mode_menu = self.menubar.addMenu('&Highlight')
|
|
edge_weight_act = QtGui.QAction("Edge costs", self)
|
|
edge_weight_act.triggered.connect(self.edgeWeightMode)
|
|
mode_menu.addAction(edge_weight_act)
|
|
topodiff_act = QtGui.QAction("Topology differences", self)
|
|
topodiff_act.triggered.connect(self.topoDiffMode)
|
|
mode_menu.addAction(topodiff_act)
|
|
|
|
# Hack!
|
|
autoload_act = QtGui.QAction("&Load automatically", self)
|
|
autoload_act.triggered.connect(self.autoLoad)
|
|
self.menubar.addAction(autoload_act)
|
|
|
|
topo_menu = self.menubar.addMenu('&Topology')
|
|
open_ref_act = QtGui.QAction("&Load reference", self)
|
|
open_ref_act.triggered.connect(self.openRefTopology)
|
|
topo_menu.addAction(open_ref_act)
|
|
|
|
cur_topo_menu = topo_menu.addMenu("Load ¤t")
|
|
running_bird_act = QtGui.QAction('&BIRD', self)
|
|
running_bird_act.triggered.connect(self.curTopologyFromBird)
|
|
cur_topo_menu.addAction(running_bird_act)
|
|
from_file_act = QtGui.QAction('&from file', self)
|
|
from_file_act.triggered.connect(self.curTopologyFromFile)
|
|
cur_topo_menu.addAction(from_file_act)
|
|
|
|
refresh_act = QtGui.QAction("&Refresh", self)
|
|
refresh_act.triggered.connect(self.refreshTopologies)
|
|
topo_menu.addAction(refresh_act)
|
|
|
|
positions = self.menubar.addMenu('&Positions')
|
|
loadp = QtGui.QAction('Load from file', self)
|
|
loadp.triggered.connect(self.load_positions)
|
|
positions.addAction(loadp)
|
|
|
|
def __init__(self, *a, **kwa):
|
|
super().__init__(*a, **kwa)
|
|
self.ref_topo_provider = None
|
|
self.cur_topo_provider = None
|
|
self.annot_topo = None
|
|
# The actual graph to show: list of neighbours and of edges
|
|
self.visu_graph: tuple[dict[VertexID, set[VertexID]], set[VertexID, VertexID]] = None
|
|
self.highlighter = None
|
|
self.mode = None
|
|
self.positions: dict[VertexID, tuple[float, float]] = dict()
|
|
self.scene = QtWidgets.QGraphicsScene()
|
|
self.view = QtWidgets.QGraphicsView(self.scene)
|
|
self.view.setDragMode(self.view.DragMode.ScrollHandDrag)
|
|
self.setCentralWidget(self.view)
|
|
self.statusbar = self.statusBar()
|
|
self.statusbar.showMessage('Hello!')
|
|
self.create_menus()
|
|
self.edgeWeightMode()
|
|
self.autoLoad()
|
|
|
|
#Hack
|
|
@Slot()
|
|
def autoLoad(self):
|
|
print('Auto-loading…')
|
|
self.ref_topo_provider = OspfFileTopologyProvider('./empty.ospf')
|
|
self.cur_topo_provider = BirdSocketTopologyProvider(instance='gennet4', area=1, version=2)
|
|
self.refreshTopologies()
|
|
|
|
@Slot()
|
|
def load_positions(self):
|
|
filename = QtWidgets.QFileDialog.getOpenFileName(self, 'Open vertex positions', '.', 'OSPF visualisation files (*.visu);;All files(*)')[0]
|
|
if filename == '': return # Do nothing
|
|
self.positions_from_file(filename)
|
|
|
|
def positions_from_file(self, fn):
|
|
if self.visu_graph is None: return # nothing to do yet.
|
|
# Let's be frank: this used to be StyleAnnotators and it is not re-implemented.
|
|
pvff = AnnotatorID(PlaceVerticesFromFile, fn)
|
|
self.annot_topo.run_annotator(pvff)
|
|
placements = PlaceUnplacedVertices(pvff).annotate(self.annot_topo)
|
|
self.positions = {v: d['position'] for v, d in placements.for_vertex.items()}
|
|
# Apply the positions:
|
|
for v, pos in self.positions.items():
|
|
x, y = pos
|
|
self.graphicsitems[v].setPos(x, y)
|
|
# Fix all the edges probably
|
|
for e in self.visu_graph[1]:
|
|
self.graphicsitems[e].update_line()
|
|
|
|
|
|
@Slot()
|
|
def savePositions(self):
|
|
...
|
|
|
|
@Slot()
|
|
def apply_styles(self):
|
|
if self.visu_graph is None: return # First need graph.
|
|
styles_ant = self.highlighter.annotate(self.annot_topo)
|
|
for_vertex = styles_ant.for_vertex
|
|
for_edge_ant = styles_ant.for_edge
|
|
# We must resolve conflicts
|
|
for_edge: dict[tuple[VertexID, VertexID], tuple[Edge, dict]] = dict() # → edge, styling dict.
|
|
for e, sty in for_edge_ant.items():
|
|
a, b = tuple(sorted((e.source, e.target)))
|
|
if (a,b) not in for_edge:
|
|
for_edge[(a,b)] = (e, sty)
|
|
if e.cost == 0: continue # A collision that we do not care about
|
|
oe, _sty = for_edge[(a,b)]
|
|
if oe.cost == 0 or oe.cost > e.cost:
|
|
for_edge[(a,b)] = (e, sty)
|
|
# Actually apply the style:
|
|
for v in self.visu_graph[0].keys():
|
|
sty = for_vertex.get(v, {})
|
|
self.graphicsitems[v].apply_style(sty)
|
|
for e in self.visu_graph[1]:
|
|
sty = for_edge[e][1] if e in for_edge else {}
|
|
self.graphicsitems[e].apply_style(sty)
|
|
|
|
@Slot()
|
|
def dagMode(self, vtxid):
|
|
self.mode = self.Mode.ShortestPathDAG
|
|
self.highlighter = HighlightSPDAG(vtxid)
|
|
self.start_vertex = vtxid
|
|
self.apply_styles()
|
|
|
|
@Slot()
|
|
def topoDiffMode(self):
|
|
self.mode = self.Mode.TopologyDifference
|
|
self.highlighter = HighlightTopoDiff(None)
|
|
self.apply_styles()
|
|
|
|
@Slot()
|
|
def edgeWeightMode(self):
|
|
self.mode = self.Mode.EdgeWeight
|
|
self.highlighter = EdgeWidthByCost(None)
|
|
self.apply_styles()
|
|
|
|
@Slot()
|
|
def shortestPathMode(self, vtxid):
|
|
self.mode = self.Mode.ShortestPath
|
|
self.end_vertex = vtxid
|
|
self.highlighter = HighlightShortestPath((self.start_vertex, self.end_vertex))
|
|
self.apply_styles()
|
|
|
|
@Slot()
|
|
def openRefTopology(self):
|
|
filename = QtWidgets.QFileDialog.getOpenFileName(self, 'Open reference topology', '.', 'OSPF files (*.ospf);;All files(*)')[0]
|
|
if filename == '': return # Do nothing
|
|
self.ref_topo_provider = OspfFileTopologyProvider(filename)
|
|
try:
|
|
ref_topo = self.ref_topo_provider.get_topology()
|
|
except OspfFileParseError as e:
|
|
warning = QtWidgets.QMessageBox.critical(self, "Bad reference topology", f"The reference topology seems to be malformed: {e}.\nPlease select a valid reference topology.")
|
|
self.ref_topo_provider = None
|
|
return
|
|
self.refreshTopologies()
|
|
|
|
@Slot()
|
|
def curTopologyFromBird(self):
|
|
loader = BirdTopologyLoader(self)
|
|
loader.exec()
|
|
area, instance, version = loader.result_
|
|
self.cur_topo_provider = BirdSocketTopologyProvider(instance=instance, area=area, version=version)
|
|
try:
|
|
cur_topo = self.cur_topo_provider.get_topology()
|
|
except OspfFileParseError as e:
|
|
warning = QtWidgets.QMessageBox.critical(self, "Bad current topology", f"The current topology seems to be malformed: {e}.\nPlease select a valid topology.")
|
|
self.cur_topo_provider = None
|
|
return
|
|
self.refreshTopologies()
|
|
|
|
@Slot()
|
|
def curTopologyFromFile(self):
|
|
filename = QtWidgets.QFileDialog.getOpenFileName(self, 'Open current topology', '.', 'OSPF files (*.ospf);;All files(*)')[0]
|
|
if filename == '': return # Do nothing
|
|
self.cur_topo_provider = OspfFileTopologyProvider(filename)
|
|
try:
|
|
cur_topo = self.cur_topo_provider.get_topology()
|
|
except OspfFileParseError as e:
|
|
warning = QtWidgets.QMessageBox.critical(self, "Bad current topology", f"The current topology seems to be malformed: {e}.\nPlease select a valid topology.")
|
|
self.cur_topo_provider = None
|
|
return
|
|
self.refreshTopologies()
|
|
|
|
@Slot()
|
|
def refreshTopologies(self):
|
|
# Pre-checks:
|
|
msg = ''
|
|
if self.ref_topo_provider is None: msg += 'Please select reference topology. '
|
|
if self.cur_topo_provider is None: msg += 'Please select current topology. '
|
|
if msg:
|
|
self.statusbar.showMessage(msg)
|
|
# Nothing more to do.
|
|
return
|
|
# We just drop anything we had before, since we will be re-reading all the files.
|
|
try:
|
|
ref_topo = self.ref_topo_provider.get_topology()
|
|
except OspfFileParseError as e:
|
|
warning = QtWidgets.QMessageBox.critical(self, "Bad reference topology", f"The reference topology seems to be malformed: {e}.\nPlease select a valid reference topology.")
|
|
self.ref_topo_provider = None
|
|
return
|
|
try:
|
|
cur_topo = self.cur_topo_provider.get_topology()
|
|
except OspfFileParseError as e:
|
|
warning = QtWidgets.QMessageBox.critical(self, "Bad current topology", f"The current topology seems to be malformed: {e}.\nPlease select a valid topology.")
|
|
self.cur_topo_provider = None
|
|
return
|
|
self.scene.clear()
|
|
combined_topology = TopologyV3.combine_topologies(reference=ref_topo, current=cur_topo)
|
|
combined_topology.freeze()
|
|
self.annot_topo = AnnotatedTopology(combined_topology)
|
|
self.topo_to_graph()
|
|
self.draw_visu()
|
|
|
|
def topo_to_graph(self):
|
|
"""Converts an AnnotatedTopology to a graph"""
|
|
# We actually do not care about the annotations
|
|
topo = self.annot_topo.topology
|
|
neighbours = {v: set() for v in topo.vertices.keys()} # Neighbour lists
|
|
edges: set[tuple[VertexID, VertexID]] = set()
|
|
for e in topo.edges:
|
|
neighbours[e.source].add(e.target)
|
|
neighbours[e.target].add(e.source)
|
|
first = min([e.source, e.target])
|
|
second = max([e.source, e.target])
|
|
edges.add((first, second))
|
|
self.visu_graph = (neighbours, edges)
|
|
|
|
def draw_visu(self):
|
|
self.graphicsitems: dict[VertexID|tuple[VertexID, VertexID], QGraphicsItem] = dict()
|
|
for vtxid in self.visu_graph[0].keys():
|
|
gritem = self.create_vertex(vtxid)
|
|
self.graphicsitems[vtxid] = gritem
|
|
self.scene.addItem(gritem)
|
|
for edge in self.visu_graph[1]:
|
|
gritem = self.create_edge(edge)
|
|
self.graphicsitems[edge] = gritem
|
|
self.scene.addItem(gritem)
|
|
self.positions_from_file(None)
|
|
self.apply_styles()
|
|
|
|
def create_vertex(self, vtxid):
|
|
if vtxid.is_router:
|
|
return RouterGraphicsItem(vtxid, self)
|
|
else:
|
|
return NetworkGraphicsItem(vtxid, self)
|
|
|
|
def create_edge(self, edge):
|
|
return EdgeGraphicsItem(edge, self)
|
|
|
|
|
|
main_window = MainWindow()
|
|
main_window.show()
|
|
app.exec()
|