#!/usr/bin/env python3
import sys
from birdvisu . providers import BirdSocketTopologyProvider , OspfFileTopologyProvider , OspfFileParseError
# Presumably the default reference-topology file name; nothing in the visible
# part of this file reads it -- TODO confirm whether it is still needed.
ref_topo_file = ' reference.ospf '
from birdvisu . topo_v3 import TopologyV3
from birdvisu . topo_v3 import VertexID
from birdvisu . annotations import AnnotatedTopology , AnnotatorID
from birdvisu . annotations . analysis import TopologyDifference , ShortestPathTree
from ipaddress import IPv4Address
from PySide6 import QtCore , QtGui , QtWidgets
# The application object is created at import time, before any of the widget
# classes defined below is instantiated; it is started at the end of the file.
app = QtWidgets . QApplication ( [ ] )
from random import randint
from collections import defaultdict
class MyGraphicsRectItem(QtWidgets.QGraphicsRectItem):
    """Rectangle standing for one topology vertex that drags its edges along.

    The item holds references to two containers shared with the code that
    builds the scene:

    * ``nei``    -- maps a vertex id to the list of edges touching it;
    * ``shapes`` -- maps every vertex id and every edge to its graphics item.

    The vertex id this rectangle represents is read from ``self.data(0)``,
    so the creator must store it there with ``setData(0, vertex_id)``.
    """

    def __init__(self, nei, shapes, *a, **kwa):
        """Store the shared lookup tables; pass everything else on to Qt.

        :param nei: mapping vertex id -> list of incident edges.
        :param shapes: mapping vertex id / edge -> graphics item.
        :param a: positional arguments for ``QGraphicsRectItem``.
        :param kwa: keyword arguments for ``QGraphicsRectItem``.
        """
        super().__init__(*a, **kwa)
        self.nei = nei
        self.shapes = shapes

    # NOTE(review): only the item that actually receives the mouse event
    # refreshes its edges here; an itemChange()-based approach was sketched in
    # an earlier revision but never enabled.
    def mouseMoveEvent(self, evt):
        """Move the item, then re-route every edge attached to its vertex.

        The base-class handler runs first so that ``pos()`` already reflects
        this event; computing the lines before it would leave every edge one
        mouse event behind the rectangle.
        """
        super().mouseMoveEvent(evt)

        vtxid = self.data(0)
        here = self.pos()
        for edge in self.nei[vtxid]:
            # The far end is whichever endpoint is not this vertex.
            other = edge.source if edge.source != vtxid else edge.target
            far_end = self.shapes[other].pos()
            self.shapes[edge].setLine(QtCore.QLineF(here, far_end))
from enum import Enum , auto
from PySide6 . QtCore import Slot
from birdvisu . ospfsock import BirdSocketConnection
class BirdTopologyLoader(QtWidgets.QDialog):
    """Modal dialog asking which OSPF instance of a running BIRD to load.

    ``result_`` holds ``(area, instance, version)``.  It starts out as
    ``(None, None, None)`` and is only overwritten when the user confirms the
    dialog with valid input.
    """

    # Protocol combo-box entry -> OSPF version stored in ``result_``.
    # The insertion order is also the order of the combo-box entries.
    _PROTOCOLS = {
        ' Guess! ': None,
        ' OSPFv2 ': 2,
        ' OSPFv3 ': 3,
    }

    def __init__(self, *a, **kwa):
        """Build the form: area line edit, instance and protocol combo boxes.

        All arguments are forwarded to ``QDialog``.
        """
        super().__init__(*a, **kwa)
        self.setModal(True)
        self.result_ = (None, None, None)

        outer = QtWidgets.QVBoxLayout(self)

        # Area
        self.line = QtWidgets.QLineEdit(self)
        self.line.setText(' -1 ')
        self._add_row(outer, ' Area: ', self.line)

        # Instance
        self.combo = QtWidgets.QComboBox()
        self.combo.addItems(self.get_instances())
        self._add_row(outer, ' Instance: ', self.combo)

        # Protocol
        self.combo2 = QtWidgets.QComboBox()
        self.combo2.addItems(list(self._PROTOCOLS))
        self._add_row(outer, ' Protocol: ', self.combo2)

        buttons = QtWidgets.QDialogButtonBox(
            QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel
        )
        # A single slot validates and then accepts.  Wiring ``accepted`` to
        # both saveResult and accept closed the dialog even when saveResult
        # failed on bad input.
        buttons.accepted.connect(self._save_and_accept)
        buttons.rejected.connect(self.reject)
        outer.addWidget(buttons)

    @staticmethod
    def _add_row(outer, caption, widget):
        """Append a horizontal "caption: widget" row to the layout *outer*."""
        row = QtWidgets.QHBoxLayout()
        row.addWidget(QtWidgets.QLabel(caption))
        row.addWidget(widget)
        outer.addLayout(row)

    @Slot()
    def saveResult(self):
        """Parse the form and store ``(area, instance, version)`` in ``result_``.

        The area may be typed as a plain integer or as a dotted-quad address;
        ``-1`` is stored as ``None``.

        :raises ValueError: if the area text is neither an integer nor an IPv4
            address (``ipaddress.AddressValueError`` is a ``ValueError``).
        """
        text = self.line.text().strip()
        try:
            area = int(text)
        except ValueError:
            area = int(IPv4Address(text))
        if area == -1:
            area = None
        inst = self.combo.currentText()
        proto = self._PROTOCOLS[self.combo2.currentText()]
        self.result_ = (area, inst, proto)

    @Slot()
    def _save_and_accept(self):
        """Accept the dialog only if the form could be parsed."""
        try:
            self.saveResult()
        except ValueError as exc:
            QtWidgets.QMessageBox.warning(
                self,
                'Bad area',
                f'The area must be an integer or a dotted-quad address: {exc}',
            )
            return
        self.accept()

    def get_instances(self):
        """Return what ``find_running_ospf`` reports for a fresh BIRD connection."""
        # A throw-away BirdSocketTopologyProvider is created only because
        # find_running_ospf() lives on it; it is dropped right afterwards.
        prov = BirdSocketTopologyProvider()
        bird = BirdSocketConnection()
        return prov.find_running_ospf(bird)
class MainWindow(QtWidgets.QMainWindow):
    """Main window: loads a reference and a current topology and draws them.

    Attributes set in ``__init__``:

    * ``ref_topo_provider`` / ``cur_topo_provider`` -- objects offering
      ``get_topology()``, or ``None`` until one is chosen from the menu;
    * ``annotators`` -- annotator ids run on every refresh;
    * ``annot_topo`` -- ``AnnotatedTopology`` built by the last successful
      refresh, ``None`` before that;
    * ``highlighter`` -- annotator id whose results are drawn highlighted;
    * ``tool`` -- the currently selected ``Tool``.
    """

    class Tool(Enum):
        MoveTopology = auto()
        ShortestPath = auto()

    def __init__(self, *a, **kwa):
        """Create the scene, the view, the status bar and the menus.

        All arguments are forwarded to ``QMainWindow``.
        """
        super().__init__(*a, **kwa)
        self.ref_topo_provider = None
        self.cur_topo_provider = None
        self.annotators = self.get_annotators()
        self.annot_topo = None
        self.highlighter = None
        self.tool = self.Tool.MoveTopology

        self.scene = QtWidgets.QGraphicsScene()
        self.view = QtWidgets.QGraphicsView(self.scene)
        self.view.setDragMode(self.view.DragMode.ScrollHandDrag)
        self.setCentralWidget(self.view)

        self.statusbar = self.statusBar()
        self.statusbar.showMessage(' Hello! ')

        self.menubar = self.menuBar()

        mode_menu = self.menubar.addMenu(' &Mode ')
        self._add_action(mode_menu, " Sh. path &DAG ", self.shortestPathMode)

        topo_menu = self.menubar.addMenu(' &Topology ')
        self._add_action(topo_menu, " &Load reference ", self.openRefTopology)
        # The label used to read "Load ¤t": "&curren" had been decoded as an
        # HTML entity, which destroyed the "&current" mnemonic.
        cur_topo_menu = topo_menu.addMenu(" Load &current ")
        self._add_action(cur_topo_menu, ' &BIRD ', self.curTopologyFromBird)
        self._add_action(cur_topo_menu, ' &from file ', self.curTopologyFromFile)
        self._add_action(topo_menu, " &Refresh ", self.refreshTopologies)

    def _add_action(self, menu, text, slot):
        """Create a QAction labelled *text*, connect it to *slot*, add it to *menu*."""
        action = QtGui.QAction(text, self)
        action.triggered.connect(slot)
        menu.addAction(action)
        return action

    def get_annotators(self):
        """Return the annotator ids that are run on every refresh."""
        # NOTE(review): the root router of the shortest-path tree is
        # hard-coded.  The literal must not carry surrounding whitespace,
        # otherwise IPv4Address() rejects it and the window cannot be built.
        root_router = VertexID(
            family=None,
            is_router=True,
            address=None,
            router_id=int(IPv4Address('172.23.100.10')),
            dr_id=None,
            discriminator=None,
        )
        # NOTE(review): the second parameter presumably names the ``current``
        # topology given to combine_topologies() -- confirm the exact spelling
        # ShortestPathTree expects.
        return [
            AnnotatorID(TopologyDifference),
            AnnotatorID(ShortestPathTree, (root_router, ' current ')),
        ]

    @Slot()
    def shortestPathMode(self):
        """Switch the active tool to ``Tool.ShortestPath``."""
        self.tool = self.Tool.ShortestPath

    def _read_reference_topology(self):
        """Return the reference topology, or ``None`` if it cannot be parsed.

        On a parse error the user is told and ``ref_topo_provider`` is reset
        to ``None``.
        """
        try:
            return self.ref_topo_provider.get_topology()
        except OspfFileParseError as e:
            QtWidgets.QMessageBox.critical(
                self,
                " Bad reference topology ",
                f" The reference topology seems to be malformed: {e} . \n Please select a valid reference topology. ",
            )
            self.ref_topo_provider = None
            return None

    def _read_current_topology(self):
        """Return the current topology, or ``None`` if it cannot be parsed.

        On a parse error the user is told and ``cur_topo_provider`` is reset
        to ``None``.
        """
        try:
            return self.cur_topo_provider.get_topology()
        except OspfFileParseError as e:
            QtWidgets.QMessageBox.critical(
                self,
                " Bad current topology ",
                f" The current topology seems to be malformed: {e} . \n Please select a valid topology. ",
            )
            self.cur_topo_provider = None
            return None

    @Slot()
    def openRefTopology(self):
        """Ask for a reference topology file, validate it and redraw."""
        filename = QtWidgets.QFileDialog.getOpenFileName(
            self, ' Open reference topology ', ' . ', ' OSPF files(*.ospf);;All files(*) '
        )[0]
        if not filename:
            return  # Dialog cancelled: do nothing.
        self.ref_topo_provider = OspfFileTopologyProvider(filename)
        # Parsed once here only to reject a broken file early;
        # refreshTopologies() reads it again.
        if self._read_reference_topology() is None:
            return
        self.refreshTopologies()

    @Slot()
    def curTopologyFromBird(self):
        """Ask which BIRD instance to use as current topology and redraw."""
        loader = BirdTopologyLoader(self)
        # exec() returns QDialog.Rejected (0) when the user cancels; without
        # this check a provider was built from the untouched default result.
        if not loader.exec():
            return
        area, instance, version = loader.result_
        self.cur_topo_provider = BirdSocketTopologyProvider(instance=instance, area=area, version=version)
        if self._read_current_topology() is None:
            return
        self.refreshTopologies()

    @Slot()
    def curTopologyFromFile(self):
        """Ask for a current topology file, validate it and redraw."""
        filename = QtWidgets.QFileDialog.getOpenFileName(
            self, ' Open current topology ', ' . ', ' OSPF files(*.ospf);;All files(*) '
        )[0]
        if not filename:
            return  # Dialog cancelled: do nothing.
        self.cur_topo_provider = OspfFileTopologyProvider(filename)
        if self._read_current_topology() is None:
            return
        self.refreshTopologies()

    @Slot()
    def refreshTopologies(self):
        """Re-read both topologies, annotate their combination and redraw.

        If a provider is missing, only a hint is shown in the status bar.  If
        a topology fails to parse, its provider is dropped and nothing is
        redrawn.
        """
        # Pre-checks.  The hints are collected in a list that starts empty, so
        # the "anything missing?" test is false when both providers are set.
        missing = []
        if self.ref_topo_provider is None:
            missing.append(' Please select reference topology. ')
        if self.cur_topo_provider is None:
            missing.append(' Please select current topology. ')
        if missing:
            self.statusbar.showMessage(''.join(missing))
            # Nothing more to do.
            return

        # We just drop anything we had before, since we will be re-reading all the files.
        ref_topo = self._read_reference_topology()
        if ref_topo is None:
            return
        cur_topo = self._read_current_topology()
        if cur_topo is None:
            return

        self.scene.clear()
        combined_topology = TopologyV3.combine_topologies(reference=ref_topo, current=cur_topo)
        combined_topology.freeze()
        self.annot_topo = AnnotatedTopology(combined_topology)
        for ann_id in self.annotators:
            self.annot_topo.run_annotator(ann_id)

        # Draw it
        self.set_current_highlighter()
        self.ad_hoc_draw_visu()

    def set_current_highlighter(self):
        """Select the annotator to highlight (for now always the last one)."""
        # TODO!
        self.highlighter = self.annotators[-1]

    def ad_hoc_draw_visu(self):
        """Put one rectangle per vertex and one line per edge into the scene.

        Vertices are placed at random positions.  Every vertex and edge that
        the selected highlighter annotated gets a blue pen.  ``self.nei`` is
        rebuilt as a vertex id -> incident edges mapping, shared with the
        rectangle items.
        """
        shapes = {}
        self.nei = defaultdict(list)
        item_flags = QtWidgets.QGraphicsItem.GraphicsItemFlag
        topology = self.annot_topo.topology

        for k, _vertex in topology.vertices.items():
            size = 30 if k.is_router else 10
            x, y = randint(0, 1920), randint(0, 1080)
            shape = MyGraphicsRectItem(self.nei, shapes, -size / 2, -size / 2, size, size)
            shape.setPos(x, y)
            # TODO:brush
            # Surprisingly works for all the possible addresses.
            label_text = str(IPv4Address(k.router_id)) if k.is_router else str(k.address)
            label = QtWidgets.QGraphicsSimpleTextItem(label_text, parent=shape)
            # Centre the label horizontally, just below the rectangle.
            label.setY(size * 0.8)
            label.setX(-label.boundingRect().width() / 2)
            shape.setData(0, k)
            # setFlag() takes a single flag, so enable them one at a time.
            shape.setFlag(item_flags.ItemIsMovable)
            shape.setFlag(item_flags.ItemIsSelectable)
            shapes[k] = shape

        for e in topology.edges:
            line = QtWidgets.QGraphicsLineItem(
                QtCore.QLineF(shapes[e.source].pos(), shapes[e.target].pos())
            )
            line.setData(0, e)
            self.nei[e.source].append(e)
            self.nei[e.target].append(e)
            shapes[e] = line

        if self.highlighter is not None and self.highlighter in self.annot_topo.annotations:
            ann = self.annot_topo.annotations[self.highlighter]
            pen = QtGui.QPen(QtGui.QColor(' blue '))
            for shk in ann.for_vertex.keys() | ann.for_edge.keys():
                shapes[shk].setPen(pen)

        for sh in shapes.values():
            self.scene.addItem(sh)
# Script entry: build the main window, show it and run the event loop of the
# module-level ``app`` object.
main_window = MainWindow ( )
main_window . show ( )
app . exec ( )