New layout annotators
Now prettier, maybe. Enjoy the ugly code. But it is more nicely separated, tbh, so that is good progress. (In this repo, I try to create reasonable commit messages, right? :-D)topo-mov
parent
f2f6784363
commit
e8e0538e59
@ -0,0 +1,368 @@
|
||||
from . import Annotator, Annotation
|
||||
import .analysis as analysis
|
||||
from ..topo_v3 import VertexID, VertexType
|
||||
from ..ospffile import load
|
||||
|
||||
from collections.abc import Sequence
|
||||
from ipaddress import ip_network, IPv4Network
|
||||
from socket import AF_INET, AF_INET6
|
||||
|
||||
import math
|
||||
import random
|
||||
|
||||
def _parse_router_id(quaddot):
|
||||
from ipaddress import IPv4Address
|
||||
return int(IPv4Address(quaddot))
|
||||
|
||||
def _parse_details(det):
|
||||
result = dict()
|
||||
for detail, _chld in det:
|
||||
tag, *d = detail.split()
|
||||
if False: 'alignment'
|
||||
elif tag == 'position':
|
||||
result['position'] = tuple(map(float, d[:2]))
|
||||
# Put other known directives here. We are deliberately ignoring unknown, since details may include identifiers (designated routers &c.)
|
||||
return result
|
||||
|
||||
class StyleAnnotator(Annotator):
|
||||
"""Quasi-interface. These annotators promise that they only annotate
|
||||
vertices and edges with styling dictionaries.
|
||||
|
||||
These dictionaries can
|
||||
consist of following styling properties:
|
||||
For vertices:
|
||||
- position (xy; set by PlaceUnplacedVertices)
|
||||
- highlight_colour (rgba)
|
||||
For edges:
|
||||
- colour (rgba)
|
||||
- width (float)
|
||||
- highlight_colour (rgba)
|
||||
|
||||
Should a StyleAnnotator fail, it should mark the whole Topology as False
|
||||
and not fill any specific tags."""
|
||||
pass # Superfluous :-)
|
||||
|
||||
class PlaceVerticesFromFile(StyleAnnotator):
|
||||
"""Annotator to load vertex positions from a file and annotate the vertices with them.
|
||||
|
||||
The filename is either given to the Annotator, or ./visualisation.visu is used."""
|
||||
idempotent = False # The layout file could have changed, this is not covered by a TopologyV3
|
||||
def __init__(self, filename):
|
||||
if filename is None:
|
||||
filename = r'./visualisation.visu'
|
||||
self.filename = filename
|
||||
def annotate(self, topo):
|
||||
self.topo = topo
|
||||
self.result = Annotation()
|
||||
try:
|
||||
with open(self.filename, 'r') as f:
|
||||
positions = load(f)
|
||||
except OSError as e:
|
||||
print(f'WARN: Could not load positions from {self.filename}: {e}')
|
||||
self.result.for_topology = False
|
||||
return self.result
|
||||
|
||||
# TODO: cope with multiple presets in one file?
|
||||
for d, chl in positions:
|
||||
if d.startswith('visualisation '):
|
||||
self.add_positions(chl)
|
||||
break
|
||||
return self.result
|
||||
def add_positions(chl) -> None:
|
||||
# level-2 are standard directives like from BIRD. For networks, level-3 may contain details (dr / address, router for stubnets maybe?)
|
||||
for directive, details in chl:
|
||||
tag, *det = directive.split()
|
||||
if False: 'alignment'
|
||||
elif tag in ['router', 'xrouter']:
|
||||
router_id = _parse_router_id(det[0])
|
||||
vtxid = VertexID(router_id=router_id, is_router=True, address=None, family=None, dr_id=None, discriminator=None)
|
||||
annot = _parse_details(det)
|
||||
self.result.for_vertex[vtxid] = annot
|
||||
elif tag == 'vlink':
|
||||
raise ValueError('Being a virtual link is not a vertex attribute.')
|
||||
elif tag in ['external', 'xnetwork', 'stubnet']:
|
||||
rid = None
|
||||
addr = ip_network(det[0])
|
||||
family = AF_INET if isinstance(addr, IPv4Network) else AF_INET6
|
||||
if tag == 'stubnet':
|
||||
for d, _l4 in details:
|
||||
t, *rid = d.split()
|
||||
if t == 'router':
|
||||
rid = _parse_router_id(rid[0])
|
||||
break
|
||||
else:
|
||||
cand = self.topo.finder.find(address=addr, type=VertexType.StubNet)
|
||||
if len(cand) != 1:
|
||||
print(f'Bad number of candidates for {directive}')
|
||||
continue # Place with other way
|
||||
rid = cand.pop().router_id
|
||||
vtxid = VertexID(router_id=rid, is_router=False, address=addr, family=family, dr_if=None, discriminator=None)
|
||||
annot = _parse_details(det)
|
||||
self.result.for_vertex[vtxid] = annot
|
||||
elif tag == 'network':
|
||||
# We do not know the OSPF version, which is sad. Let us guess
|
||||
version = 3 if '-' in directive else 2
|
||||
if version == 3:
|
||||
net_label = det[0]
|
||||
rid, discr = net_label.lstrip(r'[').rstrip(r']').split(r'-')
|
||||
rid = _parse_router_id(rid)
|
||||
# find networks
|
||||
family = None
|
||||
addresses = []
|
||||
for d, _l4 in details:
|
||||
t, *addr = d.split()
|
||||
if t == 'address':
|
||||
addr = ip_network(addr[0])
|
||||
addresses.append(addr)
|
||||
family = AF_INET if isinstance(addr, IPv4Network) else AF_INET6
|
||||
if len(addresses) == 0: fin_addr = None
|
||||
elif len(addresses) == 1: fin_addr = addresses[0]
|
||||
else: fin_addr = tuple(addresses)
|
||||
vtxid = VertexID(dr_id=rid, discriminator=discr, is_router=False, address=fin_addr, family=family, router_id=None)
|
||||
else:
|
||||
addr = ip_network(det[0])
|
||||
# find designated router
|
||||
for d, _l4 in details:
|
||||
t, *rid = d.split()
|
||||
if t == 'dr':
|
||||
rid = _parse_router_id(rid[0])
|
||||
break
|
||||
else:
|
||||
# Try using finder
|
||||
cand = self.topo.finder.find(address=addr, type=VertexType.TransitNetwork)
|
||||
if len(cand) != 1:
|
||||
print(f'Bad number of candidates for {directive}')
|
||||
continue # Place with other way
|
||||
rid = cand.pop().dr_id
|
||||
vtxid = VertexID(dr_id=rid, discriminator=None, is_router=False, address=addr, family=AF_INET, router_id=None)
|
||||
annot = _parse_details(det)
|
||||
self.result.for_vertex[vtxid] = annot
|
||||
else: raise ValueError(f'Unknown directive: {directive}')
|
||||
|
||||
class PlaceUnplacedVertices(StyleAnnotator):
|
||||
"""Determine positions for unplaced vertices
|
||||
|
||||
Our approach is to add them to random places in proximity of already placed
|
||||
vertices. This way, the user can easily find the vertex and optionally move
|
||||
it to their liking."""
|
||||
def __init__(self, previous):
|
||||
self.previous = previous # If an annotator to place vertices has been run, this is the reference.
|
||||
def annotate(self, topo):
|
||||
# First, try to find existing placements
|
||||
if self.previous is None:
|
||||
previous_annotations = list(filter(lambda ann_id: ann_id.annotator == PlaceVerticesFromFile, topo.annotations.keys()))
|
||||
if len(previous_annotations) > 0:
|
||||
# Use the most potent previous annotation?
|
||||
best = max(previous_annotations, key=lambda ann_id: len(topo.annotations[ann_id].for_vertex))
|
||||
else: best = None
|
||||
else:
|
||||
best = self.previous
|
||||
|
||||
already_placed: set[VertexID] = set(topo.annotations[best].for_vertex.keys()) if best is not None else set()
|
||||
to_be_placed: set[VertexID] = topo.topology.vertices.keys() - already_placed
|
||||
|
||||
# Let's say that a proximity is any distance shorter than 10% of the current diameter
|
||||
# TODO: Too many edge cases (no vertices placed, one vertex placed,
|
||||
# yada yada), screw that, we just say that proximity is 200 px (or whatever the units are.)
|
||||
|
||||
# BFS: First place all the known vertices at their coordinates, then their neighbours, …
|
||||
queue: list[tuple[VertexID, float, float, bool]] = [] # Vertex, x, y, should we randomly move it (i.e. False for already placed)
|
||||
found_vertices = set()
|
||||
|
||||
result = Annotation()
|
||||
|
||||
if best is not None:
|
||||
for vtxid in already_placed:
|
||||
queue.append((
|
||||
vtxid,
|
||||
topo.annotations[best].for_vertex[vtxid]['position'][0],
|
||||
topo.annotations[best].for_vertex[vtxid]['position'][1],
|
||||
False,
|
||||
))
|
||||
found_vertices.add(vtxid)
|
||||
else:
|
||||
vtxid = to_be_placed.pop()
|
||||
queue.append((vtxid, 0, 0, False))
|
||||
found_vertices.add(vtxid)
|
||||
|
||||
while len(queue) > 0:
|
||||
vtxid, x, y, move = queue.pop(0) # Maybe bad code? (cf. deque)
|
||||
if move:
|
||||
angle = random.random() * 2 * math.pi
|
||||
distance = random.randint(20, 200)
|
||||
dx = distance * math.cos(angle)
|
||||
dy = distance * math.sin(angle)
|
||||
x += dx
|
||||
y += dy
|
||||
# place
|
||||
result.for_vertex[vtxid] = {'position': (x, y)}
|
||||
# add neigh
|
||||
for e in topo.topology.vertices[vtxid].incoming_edges:
|
||||
if e.source not in found_vertices:
|
||||
queue.append((e.source, x, y, True))
|
||||
found_vertices.add(e.source)
|
||||
for e in topo.topology.vertices[vtxid].outgoing_edges:
|
||||
if e.target not in found_vertices:
|
||||
queue.append((e.target, x, y, True))
|
||||
found_vertices.add(e.target)
|
||||
# If there are any vertices left, we place one of it at random and
|
||||
# repeat. This is unlikely though, because the discovered topology can
|
||||
# not be disconnected by definition of OSPF basic algorithm.
|
||||
# Contrary to programming idioms, we solve this here, so that the
|
||||
# code is not duplicated and preparation for the BFS is not complicated further
|
||||
if len(queue) == 0:
|
||||
remaining_vertices = to_be_placed - found_vertices
|
||||
if len(remaining_vertices) > 0:
|
||||
vtxid = remaining_vertices.pop()
|
||||
queue.append((
|
||||
vtxid,
|
||||
random.randint(0, 1000), # Will it blend?
|
||||
random.randint(0, 1000),
|
||||
False))
|
||||
found_vertices.add(vtxid)
|
||||
|
||||
return result
|
||||
|
||||
def _default_width_for_cost(cost):
|
||||
from math import log2
|
||||
if cost <= 2: return 1
|
||||
return int(log2(cost))
|
||||
class EdgeWidthByCost(StyleAnnotator):
|
||||
idempotent = True
|
||||
def __init__(self, param):
|
||||
# The param is a function mapping the costs to widths.
|
||||
# This _is_ hashable, but also it _is_ ugly. But convenient :-)
|
||||
self.width_for_cost = param if param is not None else _default_width_for_cost
|
||||
def annotate(topo):
|
||||
result = Annotation()
|
||||
result.for_edge = {e: self.width_for_cost(e.cost) for e in topo.topology.edges}
|
||||
return result
|
||||
|
||||
class HighlightTopoDiff(StyleAnnotator):
|
||||
idempotent = True
|
||||
def __init__(self, _param): pass
|
||||
def annotate(topo):
|
||||
topodiff = AnnotatorID(annotator=analysis.TopologyDifference)
|
||||
topo.run_annotator(topodiff) # make sure
|
||||
td_result = topo.annotations[topodiff]
|
||||
result = Annotation()
|
||||
S = analysis.TopologyDifference.Status
|
||||
for k, v in td_result.for_vertex.items():
|
||||
result.for_vertex[k] = {'highlight_colour': {
|
||||
S.Missing: (255, 0, 0, 255),
|
||||
S.New: (0, 192, 0, 255),
|
||||
S.Discrepant: (0, 0, 255, 255),
|
||||
}[v]}
|
||||
for k, v in td_result.for_edge.items():
|
||||
result.for_edge[k] = {'colour': {
|
||||
S.Missing: (255, 0, 0, 255),
|
||||
S.New: (0, 192, 0, 255),
|
||||
S.Discrepant: (0, 0, 255, 255),
|
||||
}[v]}
|
||||
return result
|
||||
|
||||
class HighlightCurrent(StyleAnnotator):
|
||||
idempotent = False
|
||||
def __init__(self, what):
|
||||
self.what = what
|
||||
def annotate(topo):
|
||||
result = Annotation()
|
||||
if self.what is None:
|
||||
# Not going to guess.
|
||||
result.for_topology = False
|
||||
return result
|
||||
# We got an annotator, make sure it has run
|
||||
topo.run_annotator(self.what)
|
||||
current = topo.annotations[self.what]
|
||||
if False: 'alignment'
|
||||
elif self.what.annotator = analysis.ShortestPathTree:
|
||||
# Highlight edges
|
||||
result.for_edge = {e: {'highlight_colour': (200, 200, 0, 128)} for e in current.for_edge.keys()}
|
||||
return result
|
||||
# Add highlights for other tools here.
|
||||
|
||||
class MegaStyler(StyleAnnotator):
|
||||
"""This Annotator summarizes various previous StyleAnnotators into styles of graphics items.
|
||||
|
||||
Either it gets a Sequence of annotators to consider as the parameter, or we
|
||||
try to utilize most of the existing Annotations. The notable exception is
|
||||
the ShortestPathTree, since that is only relevant when it is explicitly selected.
|
||||
|
||||
We depend on StyleAnnotators' Annotations being the correct dictionaries.
|
||||
|
||||
Also, there are not enough Annotators currently shipped with Birdvisu.
|
||||
Therefore, we do not implement any priority-based colouring, but rather
|
||||
just apply styles in order, overriding the previous ones. While not
|
||||
future-proof, it is simple enough to be implemented now and substituted in the
|
||||
future.
|
||||
|
||||
It is almost like a MetaStyler, but sounds much cooler with a G!""" #lol
|
||||
idempotent = False
|
||||
def __init__(self, param):
|
||||
# The boolean determines whether we should run this annotator when it has not been run previously
|
||||
self.annotator_order: Sequence[tuple[StyleAnnotator, bool]] = [
|
||||
(PlaceVerticesFromFile, False),
|
||||
(PlaceUnplacedVertices, True),
|
||||
(EdgeWidthByCost, True),
|
||||
(HighlightTopoDiff, True),
|
||||
(HighlightCurrent, True),
|
||||
]
|
||||
if param is not None:
|
||||
self.detect = False
|
||||
self.relevant_annotators = param
|
||||
else:
|
||||
self.detect = True
|
||||
|
||||
def annotate(topo):
|
||||
# First, set some base styles
|
||||
edge_style = defaultdict(lambda: {
|
||||
'width': 1,
|
||||
'colour': (0,0,0,255),
|
||||
'highlight_colour': (0,0,0,0), # "None"
|
||||
})
|
||||
vertex_style = defaultdict(lambda: {
|
||||
'position': (0,0), # Fallback to have consistent output, PLEASE OVERRIDE!
|
||||
'highlight_colour': (0,0,0,0), # "None"
|
||||
})
|
||||
|
||||
# Walk the annotators and collect the annotations
|
||||
if not detect:
|
||||
relevant_annotators = self.relevant_annotators
|
||||
else:
|
||||
# We will be iterating over this often, so this time it is not a set.
|
||||
relevant_annotators = list(filter(
|
||||
lambda ann_id: ann_id.annotator != analysis.ShortestPathTree,
|
||||
topo.annotations.keys()))
|
||||
|
||||
for ann_cls, should_run in self.annotator_order:
|
||||
# Could this code be better?
|
||||
runs = list(filter(lambda ann_id: ann_id.annotator == ann_cls, relevant_annotators))
|
||||
if len(runs) > 0:
|
||||
annot = runs[0] # If there is not only one, we lack a hint what to choose.
|
||||
elif len(runs) == 0 and should_run:
|
||||
param = None
|
||||
# This is sooo ugly: PlaceUnplacedVertices should get a
|
||||
# parameter if a relevant placement annotator has been run.
|
||||
# HighlightCurrent needs to know the right current tool.
|
||||
if False: 'alignment'
|
||||
elif ann_cls == PlaceUnplacedVertices:
|
||||
param = next(filter(lambda ann_id: ann_id.annotator == PlaceVerticesFromFile, relevant_annotators), None)
|
||||
elif ann_cls == HighlightCurrent:
|
||||
# We hope that relevant_annotators contain a single ShortestPathTree:
|
||||
param = next(filter(lambda ann_id: ann_id.annotator == analysis.ShortestPathTree, relevant_annotators), None)
|
||||
annot = AnnotatorID(annotator=ann_cls, param=param)
|
||||
topo.run_annotator(annot)
|
||||
else: continue
|
||||
data = topo.annotations[annot]
|
||||
|
||||
for k, v in data.for_vertex.items():
|
||||
vertex_style[k] |= v
|
||||
for k, v in data.for_edge.items():
|
||||
edge_style[k] |= v
|
||||
|
||||
result = Annotation()
|
||||
for v in topo.topology.vertices.keys():
|
||||
result.for_vertex[v] = vertex_style[v]
|
||||
for e in topo.topology.edges:
|
||||
result.for_edge[e] = edge_style[e]
|
||||
return result
|
Loading…
Reference in New Issue