#!/usr/bin/env python3
from birdvisu . annotations import AnnotatedTopology , AnnotatorID
from birdvisu . annotations . analysis import TopologyDifference , ShortestPathTree
from birdvisu . annotations . layout import MegaStyler
from birdvisu . ospfsock import BirdSocketConnection
from birdvisu . providers import BirdSocketTopologyProvider , OspfFileTopologyProvider , OspfFileParseError
from birdvisu . topo_v3 import TopologyV3 , VertexID
from collections import defaultdict
from enum import Enum , auto
from ipaddress import IPv4Address
from PySide6 import QtCore , QtGui , QtWidgets
from PySide6 . QtCore import Slot
from random import randint
import sys
app = QtWidgets . QApplication ( [ ] )
class MyGraphicsRectItem ( QtWidgets . QGraphicsRectItem ) :
def __init__ ( self , nei , shapes , * a , * * kwa ) :
self . nei = nei
self . shapes = shapes
return super ( ) . __init__ ( * a , * * kwa )
#def itemChange(self, change, val):
# return super().itemChange(change, val)
def mouseMoveEvent ( self , evt ) :
vtxid = self . data ( 0 )
for e in self . nei [ vtxid ] :
x1 = self . x ( )
y1 = self . y ( )
other = e . source if e . source != vtxid else e . target
x2 = self . shapes [ other ] . x ( )
y2 = self . shapes [ other ] . y ( )
qlinef = QtCore . QLineF ( x1 , y1 , x2 , y2 )
self . shapes [ e ] . setLine ( qlinef )
return super ( ) . mouseMoveEvent ( evt )
class BirdTopologyLoader ( QtWidgets . QDialog ) :
def __init__ ( self , * a , * * kwa ) :
super ( ) . __init__ ( * a , * * kwa )
self . setModal ( True )
self . result_ = ( None , None , None )
outer = QtWidgets . QVBoxLayout ( self )
# Area
inner = QtWidgets . QHBoxLayout ( )
inner . addWidget ( QtWidgets . QLabel ( ' Area: ' ) )
line = QtWidgets . QLineEdit ( self )
line . setText ( ' -1 ' )
self . line = line
inner . addWidget ( line )
outer . addLayout ( inner )
inner = QtWidgets . QHBoxLayout ( )
inner . addWidget ( QtWidgets . QLabel ( ' Instance: ' ) )
combo = QtWidgets . QComboBox ( )
combo . addItems ( self . get_instances ( ) )
self . combo = combo
inner . addWidget ( combo )
outer . addLayout ( inner )
inner = QtWidgets . QHBoxLayout ( )
inner . addWidget ( QtWidgets . QLabel ( ' Protocol: ' ) )
combo = QtWidgets . QComboBox ( )
combo . addItems ( [ ' Guess! ' , ' OSPFv2 ' , ' OSPFv3 ' ] )
self . combo2 = combo
inner . addWidget ( combo )
outer . addLayout ( inner )
buttons = QtWidgets . QDialogButtonBox ( QtWidgets . QDialogButtonBox . Ok | QtWidgets . QDialogButtonBox . Cancel )
buttons . accepted . connect ( self . saveResult )
buttons . accepted . connect ( self . accept )
buttons . rejected . connect ( self . reject )
outer . addWidget ( buttons )
@Slot ( )
def saveResult ( self ) :
try :
area = int ( self . line . text ( ) )
except ValueError :
area = int ( IPv4Address ( self . line . text ( ) ) )
if area == - 1 : area = None
inst = self . combo . currentText ( )
proto = {
' OSPFv2 ' : 2 ,
' OSPFv3 ' : 3 ,
' Guess! ' : None ,
} [ self . combo2 . currentText ( ) ]
self . result_ = ( area , inst , proto )
def get_instances ( self ) :
# Our code is stupid, so we initialise a BirdSocketTopologyProvider just to get instances and then drop it.
prov = BirdSocketTopologyProvider ( )
bird = BirdSocketConnection ( )
return prov . find_running_ospf ( bird )
class MainWindow ( QtWidgets . QMainWindow ) :
class Tool ( Enum ) :
MoveTopology = auto ( )
ShortestPath = auto ( )
def create_menus ( self ) :
print ( ' Creating menus… ' )
self . menubar = self . menuBar ( )
mode_menu = self . menubar . addMenu ( ' &Mode ' )
short_path_act = QtGui . QAction ( " Sh. path &DAG " , self )
short_path_act . triggered . connect ( self . shortestPathMode )
mode_menu . addAction ( short_path_act )
# Hack!
autoload_act = QtGui . QAction ( " &Load automatically " , self )
autoload_act . triggered . connect ( self . autoLoad )
self . menubar . addAction ( autoload_act )
topo_menu = self . menubar . addMenu ( ' &Topology ' )
open_ref_act = QtGui . QAction ( " &Load reference " , self )
open_ref_act . triggered . connect ( self . openRefTopology )
topo_menu . addAction ( open_ref_act )
cur_topo_menu = topo_menu . addMenu ( " Load ¤t " )
running_bird_act = QtGui . QAction ( ' &BIRD ' , self )
running_bird_act . triggered . connect ( self . curTopologyFromBird )
cur_topo_menu . addAction ( running_bird_act )
from_file_act = QtGui . QAction ( ' &from file ' , self )
from_file_act . triggered . connect ( self . curTopologyFromFile )
cur_topo_menu . addAction ( from_file_act )
refresh_act = QtGui . QAction ( " &Refresh " , self )
refresh_act . triggered . connect ( self . refreshTopologies )
topo_menu . addAction ( refresh_act )
def __init__ ( self , * a , * * kwa ) :
super ( ) . __init__ ( * a , * * kwa )
self . ref_topo_provider = None
self . cur_topo_provider = None
self . set_initial_annotators ( )
self . annot_topo = None
self . highlighter = None
self . tool = self . Tool . MoveTopology
self . scene = QtWidgets . QGraphicsScene ( )
self . view = QtWidgets . QGraphicsView ( self . scene )
self . view . setDragMode ( self . view . DragMode . ScrollHandDrag )
self . setCentralWidget ( self . view )
self . statusbar = self . statusBar ( )
self . statusbar . showMessage ( ' Hello! ' )
self . create_menus ( )
#Hack
@Slot ( )
def autoLoad ( self ) :
print ( ' Auto-loading… ' )
self . ref_topo_provider = OspfFileTopologyProvider ( ' ./empty.ospf ' )
self . cur_topo_provider = BirdSocketTopologyProvider ( instance = ' gennet4 ' , area = 1 , version = 2 )
self . refreshTopologies ( )
def set_initial_annotators ( self ) :
# We have three kinds of annotators:
# - The essential analytic ones (TopologyDifference)
# - The one that describes the current tool (when that is Annotator backed, like for ShortestPathTree)
# - The styling ones, that actually help visualise stuff (MegaStyler)
self . essential_annotators = [ AnnotatorID ( TopologyDifference ) ]
self . current_annotators = [ AnnotatorID ( ShortestPathTree , ( VertexID (
family = None ,
is_router = True ,
address = None ,
router_id = int ( IPv4Address ( ' 172.23.100.10 ' ) ) ,
dr_id = None ,
discriminator = None
) , ' current ' ) ) ,
]
self . styling_annotators = [ AnnotatorID ( MegaStyler , tuple ( self . essential_annotators + self . current_annotators ) ) ]
@Slot ( )
def shortestPathMode ( self ) :
self . tool = self . Tool . ShortestPath
@Slot ( )
def openRefTopology ( self ) :
filename = QtWidgets . QFileDialog . getOpenFileName ( self , ' Open reference topology ' , ' . ' , ' OSPF files(*.ospf);;All files(*) ' ) [ 0 ]
if filename == ' ' : return # Do nothing
self . ref_topo_provider = OspfFileTopologyProvider ( filename )
try :
ref_topo = self . ref_topo_provider . get_topology ( )
except OspfFileParseError as e :
warning = QtWidgets . QMessageBox . critical ( self , " Bad reference topology " , f " The reference topology seems to be malformed: { e } . \n Please select a valid reference topology. " )
self . ref_topo_provider = None
return
self . refreshTopologies ( )
@Slot ( )
def curTopologyFromBird ( self ) :
loader = BirdTopologyLoader ( self )
loader . exec ( )
area , instance , version = loader . result_
self . cur_topo_provider = BirdSocketTopologyProvider ( instance = instance , area = area , version = version )
try :
cur_topo = self . cur_topo_provider . get_topology ( )
except OspfFileParseError as e :
warning = QtWidgets . QMessageBox . critical ( self , " Bad current topology " , f " The current topology seems to be malformed: { e } . \n Please select a valid topology. " )
self . cur_topo_provider = None
return
self . refreshTopologies ( )
@Slot ( )
def curTopologyFromFile ( self ) :
filename = QtWidgets . QFileDialog . getOpenFileName ( self , ' Open current topology ' , ' . ' , ' OSPF files(*.ospf);;All files(*) ' ) [ 0 ]
if filename == ' ' : return # Do nothing
self . cur_topo_provider = OspfFileTopologyProvider ( filename )
try :
cur_topo = self . cur_topo_provider . get_topology ( )
except OspfFileParseError as e :
warning = QtWidgets . QMessageBox . critical ( self , " Bad current topology " , f " The current topology seems to be malformed: { e } . \n Please select a valid topology. " )
self . cur_topo_provider = None
return
self . refreshTopologies ( )
@Slot ( )
def refreshTopologies ( self ) :
# Pre-checks:
msg = ' '
if self . ref_topo_provider is None : msg + = ' Please select reference topology. '
if self . cur_topo_provider is None : msg + = ' Please select current topology. '
if msg :
self . statusbar . showMessage ( msg )
# Nothing more to do.
return
# We just drop anything we had before, since we will be re-reading all the files.
try :
ref_topo = self . ref_topo_provider . get_topology ( )
except OspfFileParseError as e :
warning = QtWidgets . QMessageBox . critical ( self , " Bad reference topology " , f " The reference topology seems to be malformed: { e } . \n Please select a valid reference topology. " )
self . ref_topo_provider = None
return
try :
cur_topo = self . cur_topo_provider . get_topology ( )
except OspfFileParseError as e :
warning = QtWidgets . QMessageBox . critical ( self , " Bad current topology " , f " The current topology seems to be malformed: { e } . \n Please select a valid topology. " )
self . cur_topo_provider = None
return
self . scene . clear ( )
combined_topology = TopologyV3 . combine_topologies ( reference = ref_topo , current = cur_topo )
combined_topology . freeze ( )
self . annot_topo = AnnotatedTopology ( combined_topology )
for ann_id in self . essential_annotators + self . current_annotators + self . styling_annotators :
self . annot_topo . run_annotator ( ann_id )
# Draw it
self . draw_visu ( )
def draw_visu ( self ) :
# just take the result of the MegaStyler and create vertices according to the style.
megastyler = self . styling_annotators [ - 1 ]
assert megastyler . annotator == MegaStyler
msann = self . annot_topo . annotations [ megastyler ]
self . graphicsitems : dict [ VertexID , QGraphicsItem ] = dict ( )
self . topologyitems : dict [ QGraphicsItem , VertexID | Edge ] = dict ( )
for vtxid , style in msann . for_vertex . items ( ) :
gritem = self . create_vertex ( vtxid , style )
self . graphicsitems [ vtxid ] = gritem
self . topologyitems [ gritem ] = vtxid
self . scene . addItem ( gritem )
for edge , style in msann . for_edge . items ( ) :
gritem = self . create_edge ( edge , style )
self . graphicsitems [ edge ] = gritem
self . topologyitems [ gritem ] = edge
self . scene . addItem ( gritem )
def create_vertex ( self , vtxid , style ) :
. . .
def create_edge ( self , edge , style ) :
. . .
def ad_hoc_draw_visu ( self ) :
shapes = dict ( )
self . nei = defaultdict ( lambda : [ ] )
for k , v in self . annot_topo . topology . vertices . items ( ) :
size = 30 if k . is_router else 10
x , y = randint ( 0 , 1920 ) , randint ( 0 , 1080 )
shape = MyGraphicsRectItem ( self . nei , shapes , - size / 2 , - size / 2 , size , size )
shape . setPos ( x , y )
# TODO:brush
label_text = str ( IPv4Address ( k . router_id ) ) if k . is_router else str ( k . address ) # Surprisingly works for all the possible addresses.
label = QtWidgets . QGraphicsSimpleTextItem ( label_text , parent = shape )
label . setY ( size * 0.8 )
text_width = label . boundingRect ( ) . width ( )
label . setX ( - text_width / 2 )
shape . setData ( 0 , k )
shape . setFlag ( QtWidgets . QGraphicsItem . ItemIsMovable | QtWidgets . QGraphicsItem . ItemIsSelectable )
shapes [ k ] = shape
for e in self . annot_topo . topology . edges :
start = shapes [ e . source ] . pos ( )
end = shapes [ e . target ] . pos ( )
qlinef = QtCore . QLineF ( start , end )
line = QtWidgets . QGraphicsLineItem ( qlinef )
line . setData ( 0 , e )
self . nei [ e . source ] . append ( e )
self . nei [ e . target ] . append ( e )
shapes [ e ] = line
if self . highlighter is not None and self . highlighter in self . annot_topo . annotations :
ann = self . annot_topo . annotations [ self . highlighter ]
for shk in ann . for_vertex . keys ( ) | ann . for_edge . keys ( ) :
pen = QtGui . QPen ( QtGui . QColor ( ' blue ' ) )
pen . setWidth ( pen . width ( ) * 3 )
c = pen . color ( )
c . setAlpha ( 128 )
pen . setColor ( c )
shapes [ shk ] . setPen ( pen )
# FIXME: also color edges in opposite direction
for sh in shapes . values ( ) : self . scene . addItem ( sh )
main_window = MainWindow ( )
main_window . show ( )
app . exec ( )