#!/usr/bin/env python3
from birdvisu . annotations import AnnotatedTopology , AnnotatorID
from birdvisu . annotations . analysis import TopologyDifference , ShortestPathTree
from birdvisu . annotations . layout import PlaceVerticesFromFile , PlaceUnplacedVertices , EdgeWidthByCost , HighlightTopoDiff , HighlightSPDAG , HighlightShortestPath
from birdvisu . ospfsock import BirdSocketConnection
from birdvisu . providers import BirdSocketTopologyProvider , OspfFileTopologyProvider , OspfFileParseError
from birdvisu . topo_v3 import TopologyV3 , VertexID
from birdvisu . graphics_items import RouterGraphicsItem , NetworkGraphicsItem , EdgeGraphicsItem
from collections import defaultdict
from enum import Enum , auto
from ipaddress import IPv4Address
from PySide6 import QtCore , QtGui , QtWidgets
from PySide6 . QtCore import Slot , QObject
from random import randint
import sys
app = QtWidgets . QApplication ( [ ] )
class BirdTopologyLoader ( QtWidgets . QDialog ) :
def __init__ ( self , * a , * * kwa ) :
super ( ) . __init__ ( * a , * * kwa )
self . setModal ( True )
self . result_ = ( None , None , None )
outer = QtWidgets . QVBoxLayout ( self )
# Area
inner = QtWidgets . QHBoxLayout ( )
inner . addWidget ( QtWidgets . QLabel ( ' Area: ' ) )
line = QtWidgets . QLineEdit ( self )
line . setText ( ' -1 ' )
self . line = line
inner . addWidget ( line )
outer . addLayout ( inner )
inner = QtWidgets . QHBoxLayout ( )
inner . addWidget ( QtWidgets . QLabel ( ' Instance: ' ) )
combo = QtWidgets . QComboBox ( )
combo . addItems ( self . get_instances ( ) )
self . combo = combo
inner . addWidget ( combo )
outer . addLayout ( inner )
inner = QtWidgets . QHBoxLayout ( )
inner . addWidget ( QtWidgets . QLabel ( ' Protocol: ' ) )
combo = QtWidgets . QComboBox ( )
combo . addItems ( [ ' Guess! ' , ' OSPFv2 ' , ' OSPFv3 ' ] )
self . combo2 = combo
inner . addWidget ( combo )
outer . addLayout ( inner )
buttons = QtWidgets . QDialogButtonBox ( QtWidgets . QDialogButtonBox . Ok | QtWidgets . QDialogButtonBox . Cancel )
buttons . accepted . connect ( self . saveResult )
buttons . accepted . connect ( self . accept )
buttons . rejected . connect ( self . reject )
outer . addWidget ( buttons )
@Slot ( )
def saveResult ( self ) :
try :
area = int ( self . line . text ( ) )
except ValueError :
area = int ( IPv4Address ( self . line . text ( ) ) )
if area == - 1 : area = None
inst = self . combo . currentText ( )
proto = {
' OSPFv2 ' : 2 ,
' OSPFv3 ' : 3 ,
' Guess! ' : None ,
} [ self . combo2 . currentText ( ) ]
self . result_ = ( area , inst , proto )
def get_instances ( self ) :
# Our code is stupid, so we initialise a BirdSocketTopologyProvider just to get instances and then drop it.
prov = BirdSocketTopologyProvider ( )
bird = BirdSocketConnection ( )
return prov . find_running_ospf ( bird )
class MainWindow ( QtWidgets . QMainWindow ) :
class Mode ( Enum ) :
ShortestPath = auto ( )
ShortestPathDAG = auto ( )
TopologyDifference = auto ( )
EdgeWeight = auto ( )
def create_menus ( self ) :
print ( ' Creating menus… ' )
self . menubar = self . menuBar ( )
mode_menu = self . menubar . addMenu ( ' &Highlight ' )
edge_weight_act = QtGui . QAction ( " Edge costs " , self )
edge_weight_act . triggered . connect ( self . edgeWeightMode )
mode_menu . addAction ( edge_weight_act )
topodiff_act = QtGui . QAction ( " Topology differences " , self )
topodiff_act . triggered . connect ( self . topoDiffMode )
mode_menu . addAction ( topodiff_act )
# Hack!
autoload_act = QtGui . QAction ( " &Load automatically " , self )
autoload_act . triggered . connect ( self . autoLoad )
self . menubar . addAction ( autoload_act )
topo_menu = self . menubar . addMenu ( ' &Topology ' )
open_ref_act = QtGui . QAction ( " &Load reference " , self )
open_ref_act . triggered . connect ( self . openRefTopology )
topo_menu . addAction ( open_ref_act )
cur_topo_menu = topo_menu . addMenu ( " Load ¤t " )
running_bird_act = QtGui . QAction ( ' &BIRD ' , self )
running_bird_act . triggered . connect ( self . curTopologyFromBird )
cur_topo_menu . addAction ( running_bird_act )
from_file_act = QtGui . QAction ( ' &from file ' , self )
from_file_act . triggered . connect ( self . curTopologyFromFile )
cur_topo_menu . addAction ( from_file_act )
refresh_act = QtGui . QAction ( " &Refresh " , self )
refresh_act . triggered . connect ( self . refreshTopologies )
topo_menu . addAction ( refresh_act )
positions = self . menubar . addMenu ( ' &Positions ' )
loadp = QtGui . QAction ( ' Load from file ' , self )
loadp . triggered . connect ( self . load_positions )
positions . addAction ( loadp )
def __init__ ( self , * a , * * kwa ) :
super ( ) . __init__ ( * a , * * kwa )
self . ref_topo_provider = None
self . cur_topo_provider = None
self . annot_topo = None
# The actual graph to show: list of neighbours and of edges
self . visu_graph : tuple [ dict [ VertexID , set [ VertexID ] ] , set [ VertexID , VertexID ] ] = None
self . highlighter = None
self . mode = None
self . positions : dict [ VertexID , tuple [ float , float ] ] = dict ( )
self . scene = QtWidgets . QGraphicsScene ( )
self . view = QtWidgets . QGraphicsView ( self . scene )
self . view . setDragMode ( self . view . DragMode . ScrollHandDrag )
self . setCentralWidget ( self . view )
self . statusbar = self . statusBar ( )
self . statusbar . showMessage ( ' Hello! ' )
self . create_menus ( )
self . edgeWeightMode ( )
self . autoLoad ( )
#Hack
@Slot ( )
def autoLoad ( self ) :
print ( ' Auto-loading… ' )
self . ref_topo_provider = OspfFileTopologyProvider ( ' ./empty.ospf ' )
self . cur_topo_provider = BirdSocketTopologyProvider ( instance = ' gennet4 ' , area = 1 , version = 2 )
self . refreshTopologies ( )
@Slot ( )
def load_positions ( self ) :
filename = QtWidgets . QFileDialog . getOpenFileName ( self , ' Open vertex positions ' , ' . ' , ' OSPF visualisation files (*.visu);;All files(*) ' ) [ 0 ]
if filename == ' ' : return # Do nothing
self . positions_from_file ( filename )
def positions_from_file ( self , fn ) :
if self . visu_graph is None : return # nothing to do yet.
# Let's be frank: this used to be StyleAnnotators and it is not re-implemented.
pvff = AnnotatorID ( PlaceVerticesFromFile , fn )
self . annot_topo . run_annotator ( pvff )
placements = PlaceUnplacedVertices ( pvff ) . annotate ( self . annot_topo )
self . positions = { v : d [ ' position ' ] for v , d in placements . for_vertex . items ( ) }
# Apply the positions:
for v , pos in self . positions . items ( ) :
x , y = pos
self . graphicsitems [ v ] . setPos ( x , y )
# Fix all the edges probably
for e in self . visu_graph [ 1 ] :
self . graphicsitems [ e ] . update_line ( )
@Slot ( )
def savePositions ( self ) :
. . .
@Slot ( )
def apply_styles ( self ) :
if self . visu_graph is None : return # First need graph.
styles_ant = self . highlighter . annotate ( self . annot_topo )
for_vertex = styles_ant . for_vertex
for_edge_ant = styles_ant . for_edge
# We must resolve conflicts
for_edge : dict [ tuple [ VertexID , VertexID ] , tuple [ Edge , dict ] ] = dict ( ) # → edge, styling dict.
for e , sty in for_edge_ant . items ( ) :
a , b = tuple ( sorted ( ( e . source , e . target ) ) )
if ( a , b ) not in for_edge :
for_edge [ ( a , b ) ] = ( e , sty )
if e . cost == 0 : continue # A collision that we do not care about
oe , _sty = for_edge [ ( a , b ) ]
if oe . cost == 0 or oe . cost > e . cost :
for_edge [ ( a , b ) ] = ( e , sty )
# Actually apply the style:
for v in self . visu_graph [ 0 ] . keys ( ) :
sty = for_vertex . get ( v , { } )
self . graphicsitems [ v ] . apply_style ( sty )
for e in self . visu_graph [ 1 ] :
sty = for_edge [ e ] [ 1 ] if e in for_edge else { }
self . graphicsitems [ e ] . apply_style ( sty )
@Slot ( )
def dagMode ( self , vtxid ) :
self . mode = self . Mode . ShortestPathDAG
self . highlighter = HighlightSPDAG ( vtxid )
self . start_vertex = vtxid
self . apply_styles ( )
@Slot ( )
def topoDiffMode ( self ) :
self . mode = self . Mode . TopologyDifference
self . highlighter = HighlightTopoDiff ( None )
self . apply_styles ( )
@Slot ( )
def edgeWeightMode ( self ) :
self . mode = self . Mode . EdgeWeight
self . highlighter = EdgeWidthByCost ( None )
self . apply_styles ( )
@Slot ( )
def shortestPathMode ( self , vtxid ) :
self . mode = self . Mode . ShortestPath
self . end_vertex = vtxid
self . highlighter = HighlightShortestPath ( ( self . start_vertex , self . end_vertex ) )
self . apply_styles ( )
@Slot ( )
def openRefTopology ( self ) :
filename = QtWidgets . QFileDialog . getOpenFileName ( self , ' Open reference topology ' , ' . ' , ' OSPF files (*.ospf);;All files(*) ' ) [ 0 ]
if filename == ' ' : return # Do nothing
self . ref_topo_provider = OspfFileTopologyProvider ( filename )
try :
ref_topo = self . ref_topo_provider . get_topology ( )
except OspfFileParseError as e :
warning = QtWidgets . QMessageBox . critical ( self , " Bad reference topology " , f " The reference topology seems to be malformed: { e } . \n Please select a valid reference topology. " )
self . ref_topo_provider = None
return
self . refreshTopologies ( )
@Slot ( )
def curTopologyFromBird ( self ) :
loader = BirdTopologyLoader ( self )
loader . exec ( )
area , instance , version = loader . result_
self . cur_topo_provider = BirdSocketTopologyProvider ( instance = instance , area = area , version = version )
try :
cur_topo = self . cur_topo_provider . get_topology ( )
except OspfFileParseError as e :
warning = QtWidgets . QMessageBox . critical ( self , " Bad current topology " , f " The current topology seems to be malformed: { e } . \n Please select a valid topology. " )
self . cur_topo_provider = None
return
self . refreshTopologies ( )
@Slot ( )
def curTopologyFromFile ( self ) :
filename = QtWidgets . QFileDialog . getOpenFileName ( self , ' Open current topology ' , ' . ' , ' OSPF files (*.ospf);;All files(*) ' ) [ 0 ]
if filename == ' ' : return # Do nothing
self . cur_topo_provider = OspfFileTopologyProvider ( filename )
try :
cur_topo = self . cur_topo_provider . get_topology ( )
except OspfFileParseError as e :
warning = QtWidgets . QMessageBox . critical ( self , " Bad current topology " , f " The current topology seems to be malformed: { e } . \n Please select a valid topology. " )
self . cur_topo_provider = None
return
self . refreshTopologies ( )
@Slot ( )
def refreshTopologies ( self ) :
# Pre-checks:
msg = ' '
if self . ref_topo_provider is None : msg + = ' Please select reference topology. '
if self . cur_topo_provider is None : msg + = ' Please select current topology. '
if msg :
self . statusbar . showMessage ( msg )
# Nothing more to do.
return
# We just drop anything we had before, since we will be re-reading all the files.
try :
ref_topo = self . ref_topo_provider . get_topology ( )
except OspfFileParseError as e :
warning = QtWidgets . QMessageBox . critical ( self , " Bad reference topology " , f " The reference topology seems to be malformed: { e } . \n Please select a valid reference topology. " )
self . ref_topo_provider = None
return
try :
cur_topo = self . cur_topo_provider . get_topology ( )
except OspfFileParseError as e :
warning = QtWidgets . QMessageBox . critical ( self , " Bad current topology " , f " The current topology seems to be malformed: { e } . \n Please select a valid topology. " )
self . cur_topo_provider = None
return
self . scene . clear ( )
combined_topology = TopologyV3 . combine_topologies ( reference = ref_topo , current = cur_topo )
combined_topology . freeze ( )
self . annot_topo = AnnotatedTopology ( combined_topology )
self . topo_to_graph ( )
self . draw_visu ( )
def topo_to_graph ( self ) :
""" Converts an AnnotatedTopology to a graph """
# We actually do not care about the annotations
topo = self . annot_topo . topology
neighbours = { v : set ( ) for v in topo . vertices . keys ( ) } # Neighbour lists
edges : set [ tuple [ VertexID , VertexID ] ] = set ( )
for e in topo . edges :
neighbours [ e . source ] . add ( e . target )
neighbours [ e . target ] . add ( e . source )
first = min ( [ e . source , e . target ] )
second = max ( [ e . source , e . target ] )
edges . add ( ( first , second ) )
self . visu_graph = ( neighbours , edges )
def draw_visu ( self ) :
self . graphicsitems : dict [ VertexID | tuple [ VertexID , VertexID ] , QGraphicsItem ] = dict ( )
for vtxid in self . visu_graph [ 0 ] . keys ( ) :
gritem = self . create_vertex ( vtxid )
self . graphicsitems [ vtxid ] = gritem
self . scene . addItem ( gritem )
for edge in self . visu_graph [ 1 ] :
gritem = self . create_edge ( edge )
self . graphicsitems [ edge ] = gritem
self . scene . addItem ( gritem )
self . positions_from_file ( None )
self . apply_styles ( )
def create_vertex ( self , vtxid ) :
if vtxid . is_router :
return RouterGraphicsItem ( vtxid , self )
else :
return NetworkGraphicsItem ( vtxid , self )
def create_edge ( self , edge ) :
return EdgeGraphicsItem ( edge , self )
main_window = MainWindow ( )
main_window . show ( )
app . exec ( )