diff --git a/birdvisu/annotations/analysis.py b/birdvisu/annotations/analysis.py index 121b375..b67c7b1 100644 --- a/birdvisu/annotations/analysis.py +++ b/birdvisu/annotations/analysis.py @@ -16,6 +16,7 @@ class TopologyDifference(Annotator): Currently, we only support the "reference vs. current" comparison, since that is the most useful case and it is clear which vertices are new and which old.""" + idempotent = True class Status(Enum): Missing = 'missing' New = 'new' @@ -77,6 +78,7 @@ class ShortestPathTree(Annotator): the vertices too. The annotations are of form (metric_type, distance) If the start vertex is not found, annotates the whole topology with None.""" + idempotent = True def __init__(self, param): vertex, ancestor = param self.start_vtxid = vertex diff --git a/birdvisu/annotations/layout.py b/birdvisu/annotations/layout.py new file mode 100644 index 0000000..cfb1416 --- /dev/null +++ b/birdvisu/annotations/layout.py @@ -0,0 +1,368 @@ +from . import Annotator, Annotation +import .analysis as analysis +from ..topo_v3 import VertexID, VertexType +from ..ospffile import load + +from collections.abc import Sequence +from ipaddress import ip_network, IPv4Network +from socket import AF_INET, AF_INET6 + +import math +import random + +def _parse_router_id(quaddot): + from ipaddress import IPv4Address + return int(IPv4Address(quaddot)) + +def _parse_details(det): + result = dict() + for detail, _chld in det: + tag, *d = detail.split() + if False: 'alignment' + elif tag == 'position': + result['position'] = tuple(map(float, d[:2])) + # Put other known directives here. We are deliberately ignoring unknown, since details may include identifiers (designated routers &c.) + return result + +class StyleAnnotator(Annotator): + """Quasi-interface. These annotators promise that they only annotate + vertices and edges with styling dictionaries. + + These dictionaries can + consist of following styling properties: + For vertices: + - position (xy; set by PlaceUnplacedVertices) + - highlight_colour (rgba) + For edges: + - colour (rgba) + - width (float) + - highlight_colour (rgba) + + Should a StyleAnnotator fail, it should mark the whole Topology as False + and not fill any specific tags.""" + pass # Superfluous :-) + +class PlaceVerticesFromFile(StyleAnnotator): + """Annotator to load vertex positions from a file and annotate the vertices with them. + + The filename is either given to the Annotator, or ./visualisation.visu is used.""" + idempotent = False # The layout file could have changed, this is not covered by a TopologyV3 + def __init__(self, filename): + if filename is None: + filename = r'./visualisation.visu' + self.filename = filename + def annotate(self, topo): + self.topo = topo + self.result = Annotation() + try: + with open(self.filename, 'r') as f: + positions = load(f) + except OSError as e: + print(f'WARN: Could not load positions from {self.filename}: {e}') + self.result.for_topology = False + return self.result + + # TODO: cope with multiple presets in one file? + for d, chl in positions: + if d.startswith('visualisation '): + self.add_positions(chl) + break + return self.result + def add_positions(chl) -> None: + # level-2 are standard directives like from BIRD. For networks, level-3 may contain details (dr / address, router for stubnets maybe?) + for directive, details in chl: + tag, *det = directive.split() + if False: 'alignment' + elif tag in ['router', 'xrouter']: + router_id = _parse_router_id(det[0]) + vtxid = VertexID(router_id=router_id, is_router=True, address=None, family=None, dr_id=None, discriminator=None) + annot = _parse_details(det) + self.result.for_vertex[vtxid] = annot + elif tag == 'vlink': + raise ValueError('Being a virtual link is not a vertex attribute.') + elif tag in ['external', 'xnetwork', 'stubnet']: + rid = None + addr = ip_network(det[0]) + family = AF_INET if isinstance(addr, IPv4Network) else AF_INET6 + if tag == 'stubnet': + for d, _l4 in details: + t, *rid = d.split() + if t == 'router': + rid = _parse_router_id(rid[0]) + break + else: + cand = self.topo.finder.find(address=addr, type=VertexType.StubNet) + if len(cand) != 1: + print(f'Bad number of candidates for {directive}') + continue # Place with other way + rid = cand.pop().router_id + vtxid = VertexID(router_id=rid, is_router=False, address=addr, family=family, dr_if=None, discriminator=None) + annot = _parse_details(det) + self.result.for_vertex[vtxid] = annot + elif tag == 'network': + # We do not know the OSPF version, which is sad. Let us guess + version = 3 if '-' in directive else 2 + if version == 3: + net_label = det[0] + rid, discr = net_label.lstrip(r'[').rstrip(r']').split(r'-') + rid = _parse_router_id(rid) + # find networks + family = None + addresses = [] + for d, _l4 in details: + t, *addr = d.split() + if t == 'address': + addr = ip_network(addr[0]) + addresses.append(addr) + family = AF_INET if isinstance(addr, IPv4Network) else AF_INET6 + if len(addresses) == 0: fin_addr = None + elif len(addresses) == 1: fin_addr = addresses[0] + else: fin_addr = tuple(addresses) + vtxid = VertexID(dr_id=rid, discriminator=discr, is_router=False, address=fin_addr, family=family, router_id=None) + else: + addr = ip_network(det[0]) + # find designated router + for d, _l4 in details: + t, *rid = d.split() + if t == 'dr': + rid = _parse_router_id(rid[0]) + break + else: + # Try using finder + cand = self.topo.finder.find(address=addr, type=VertexType.TransitNetwork) + if len(cand) != 1: + print(f'Bad number of candidates for {directive}') + continue # Place with other way + rid = cand.pop().dr_id + vtxid = VertexID(dr_id=rid, discriminator=None, is_router=False, address=addr, family=AF_INET, router_id=None) + annot = _parse_details(det) + self.result.for_vertex[vtxid] = annot + else: raise ValueError(f'Unknown directive: {directive}') + +class PlaceUnplacedVertices(StyleAnnotator): + """Determine positions for unplaced vertices + + Our approach is to add them to random places in proximity of already placed + vertices. This way, the user can easily find the vertex and optionally move + it to their liking.""" + def __init__(self, previous): + self.previous = previous # If an annotator to place vertices has been run, this is the reference. + def annotate(self, topo): + # First, try to find existing placements + if self.previous is None: + previous_annotations = list(filter(lambda ann_id: ann_id.annotator == PlaceVerticesFromFile, topo.annotations.keys())) + if len(previous_annotations) > 0: + # Use the most potent previous annotation? + best = max(previous_annotations, key=lambda ann_id: len(topo.annotations[ann_id].for_vertex)) + else: best = None + else: + best = self.previous + + already_placed: set[VertexID] = set(topo.annotations[best].for_vertex.keys()) if best is not None else set() + to_be_placed: set[VertexID] = topo.topology.vertices.keys() - already_placed + + # Let's say that a proximity is any distance shorter than 10% of the current diameter + # TODO: Too many edge cases (no vertices placed, one vertex placed, + # yada yada), screw that, we just say that proximity is 200 px (or whatever the units are.) + + # BFS: First place all the known vertices at their coordinates, then their neighbours, … + queue: list[tuple[VertexID, float, float, bool]] = [] # Vertex, x, y, should we randomly move it (i.e. False for already placed) + found_vertices = set() + + result = Annotation() + + if best is not None: + for vtxid in already_placed: + queue.append(( + vtxid, + topo.annotations[best].for_vertex[vtxid]['position'][0], + topo.annotations[best].for_vertex[vtxid]['position'][1], + False, + )) + found_vertices.add(vtxid) + else: + vtxid = to_be_placed.pop() + queue.append((vtxid, 0, 0, False)) + found_vertices.add(vtxid) + + while len(queue) > 0: + vtxid, x, y, move = queue.pop(0) # Maybe bad code? (cf. deque) + if move: + angle = random.random() * 2 * math.pi + distance = random.randint(20, 200) + dx = distance * math.cos(angle) + dy = distance * math.sin(angle) + x += dx + y += dy + # place + result.for_vertex[vtxid] = {'position': (x, y)} + # add neigh + for e in topo.topology.vertices[vtxid].incoming_edges: + if e.source not in found_vertices: + queue.append((e.source, x, y, True)) + found_vertices.add(e.source) + for e in topo.topology.vertices[vtxid].outgoing_edges: + if e.target not in found_vertices: + queue.append((e.target, x, y, True)) + found_vertices.add(e.target) + # If there are any vertices left, we place one of it at random and + # repeat. This is unlikely though, because the discovered topology can + # not be disconnected by definition of OSPF basic algorithm. + # Contrary to programming idioms, we solve this here, so that the + # code is not duplicated and preparation for the BFS is not complicated further + if len(queue) == 0: + remaining_vertices = to_be_placed - found_vertices + if len(remaining_vertices) > 0: + vtxid = remaining_vertices.pop() + queue.append(( + vtxid, + random.randint(0, 1000), # Will it blend? + random.randint(0, 1000), + False)) + found_vertices.add(vtxid) + + return result + +def _default_width_for_cost(cost): + from math import log2 + if cost <= 2: return 1 + return int(log2(cost)) +class EdgeWidthByCost(StyleAnnotator): + idempotent = True + def __init__(self, param): + # The param is a function mapping the costs to widths. + # This _is_ hashable, but also it _is_ ugly. But convenient :-) + self.width_for_cost = param if param is not None else _default_width_for_cost + def annotate(topo): + result = Annotation() + result.for_edge = {e: self.width_for_cost(e.cost) for e in topo.topology.edges} + return result + +class HighlightTopoDiff(StyleAnnotator): + idempotent = True + def __init__(self, _param): pass + def annotate(topo): + topodiff = AnnotatorID(annotator=analysis.TopologyDifference) + topo.run_annotator(topodiff) # make sure + td_result = topo.annotations[topodiff] + result = Annotation() + S = analysis.TopologyDifference.Status + for k, v in td_result.for_vertex.items(): + result.for_vertex[k] = {'highlight_colour': { + S.Missing: (255, 0, 0, 255), + S.New: (0, 192, 0, 255), + S.Discrepant: (0, 0, 255, 255), + }[v]} + for k, v in td_result.for_edge.items(): + result.for_edge[k] = {'colour': { + S.Missing: (255, 0, 0, 255), + S.New: (0, 192, 0, 255), + S.Discrepant: (0, 0, 255, 255), + }[v]} + return result + +class HighlightCurrent(StyleAnnotator): + idempotent = False + def __init__(self, what): + self.what = what + def annotate(topo): + result = Annotation() + if self.what is None: + # Not going to guess. + result.for_topology = False + return result + # We got an annotator, make sure it has run + topo.run_annotator(self.what) + current = topo.annotations[self.what] + if False: 'alignment' + elif self.what.annotator = analysis.ShortestPathTree: + # Highlight edges + result.for_edge = {e: {'highlight_colour': (200, 200, 0, 128)} for e in current.for_edge.keys()} + return result + # Add highlights for other tools here. + +class MegaStyler(StyleAnnotator): + """This Annotator summarizes various previous StyleAnnotators into styles of graphics items. + + Either it gets a Sequence of annotators to consider as the parameter, or we + try to utilize most of the existing Annotations. The notable exception is + the ShortestPathTree, since that is only relevant when it is explicitly selected. + + We depend on StyleAnnotators' Annotations being the correct dictionaries. + + Also, there are not enough Annotators currently shipped with Birdvisu. + Therefore, we do not implement any priority-based colouring, but rather + just apply styles in order, overriding the previous ones. While not + future-proof, it is simple enough to be implemented now and substituted in the + future. + + It is almost like a MetaStyler, but sounds much cooler with a G!""" #lol + idempotent = False + def __init__(self, param): + # The boolean determines whether we should run this annotator when it has not been run previously + self.annotator_order: Sequence[tuple[StyleAnnotator, bool]] = [ + (PlaceVerticesFromFile, False), + (PlaceUnplacedVertices, True), + (EdgeWidthByCost, True), + (HighlightTopoDiff, True), + (HighlightCurrent, True), + ] + if param is not None: + self.detect = False + self.relevant_annotators = param + else: + self.detect = True + + def annotate(topo): + # First, set some base styles + edge_style = defaultdict(lambda: { + 'width': 1, + 'colour': (0,0,0,255), + 'highlight_colour': (0,0,0,0), # "None" + }) + vertex_style = defaultdict(lambda: { + 'position': (0,0), # Fallback to have consistent output, PLEASE OVERRIDE! + 'highlight_colour': (0,0,0,0), # "None" + }) + + # Walk the annotators and collect the annotations + if not detect: + relevant_annotators = self.relevant_annotators + else: + # We will be iterating over this often, so this time it is not a set. + relevant_annotators = list(filter( + lambda ann_id: ann_id.annotator != analysis.ShortestPathTree, + topo.annotations.keys())) + + for ann_cls, should_run in self.annotator_order: + # Could this code be better? + runs = list(filter(lambda ann_id: ann_id.annotator == ann_cls, relevant_annotators)) + if len(runs) > 0: + annot = runs[0] # If there is not only one, we lack a hint what to choose. + elif len(runs) == 0 and should_run: + param = None + # This is sooo ugly: PlaceUnplacedVertices should get a + # parameter if a relevant placement annotator has been run. + # HighlightCurrent needs to know the right current tool. + if False: 'alignment' + elif ann_cls == PlaceUnplacedVertices: + param = next(filter(lambda ann_id: ann_id.annotator == PlaceVerticesFromFile, relevant_annotators), None) + elif ann_cls == HighlightCurrent: + # We hope that relevant_annotators contain a single ShortestPathTree: + param = next(filter(lambda ann_id: ann_id.annotator == analysis.ShortestPathTree, relevant_annotators), None) + annot = AnnotatorID(annotator=ann_cls, param=param) + topo.run_annotator(annot) + else: continue + data = topo.annotations[annot] + + for k, v in data.for_vertex.items(): + vertex_style[k] |= v + for k, v in data.for_edge.items(): + edge_style[k] |= v + + result = Annotation() + for v in topo.topology.vertices.keys(): + result.for_vertex[v] = vertex_style[v] + for e in topo.topology.edges: + result.for_edge[e] = edge_style[e] + return result diff --git a/birdvisu/visualisation/annotators.py b/birdvisu/visualisation/annotators.py index 74ff73b..2039476 100644 --- a/birdvisu/visualisation/annotators.py +++ b/birdvisu/visualisation/annotators.py @@ -1,146 +1,4 @@ -"""Annotators for visualising. - -Many of these should be somewhere else, because they make assumptions not -related to visualisation. Also, only the currently needed annotators were -written.""" - -from enum import Enum, auto -from dataclasses import dataclass -import re -import random -from PySide6 import QtCore, QtGui, QtWidgets -from .qt_widgets import MyGraphicsRectItem - -# Classification - -class DifferenceStatus(Enum): - """Describes differences between two topologies""" - NORMAL = auto() - MISSING = auto() - EXTRA = auto() - DISCREPANCY = auto() - -def difference_annotator(at, reference_src='reference', actual_src='actual'): - """Adds DifferenceStatuses according to which sources provided which part - of topology""" - - topo = at.topology - for data, annot in [ - (topo.routers, at.router_annotations), - (topo.networks, at.network_annotations), - (topo.links, at.link_annotations), - ]: - for k, v in data.items(): - verdict = DifferenceStatus.NORMAL - if actual_src not in v.sources: - verdict = DifferenceStatus.MISSING - if reference_src not in v.sources: - verdict = DifferenceStatus.EXTRA - # Nodes currently cannot have discrepant information (provided the - # Topology.is_valid()) - if data is topo.links: - if v.metric < 0: - verdict = DifferenceStatus.DISCREPANCY - - # TODO: We should probably disallow mutating previous AnnotatedTopologies. - annot[k].append(verdict) - return at - -# TODO: annotator for routing trees - - - -# Layouting - -@dataclass -class Position: - x: float - y: float - -def extract_positions(at, directive='visualisation default'): - topo = at.topology - for data, annot in [ - (topo.routers, at.router_annotations), - (topo.networks, at.network_annotations), - ]: - for k, v in data.items(): - visu_directive = None - for details in v.all_details: - visu_directives = list(filter(lambda x: x[0] == directive, details[1])) - if len(visu_directives) > 0: - visu_directive = visu_directives[-1] # Use the last one found. - position = None - if visu_directive is not None: - for pos in visu_directive[1]: - m = re.match(r'position \[([0-9.]+) ([0-9.]+)\]', pos[0]) - if pos is not None: - x, y = m.groups() - position = Position(x = float(x), y = float(y)) - if position is not None: - annot[k].append(position) - return at - -def random_position(at): - # Fallback when no position could be extracted - # TODO: this should use some kind of heuristic or existing layout engine. - topo = at.topology - for data, annot in [ - (topo.routers, at.router_annotations), - (topo.networks, at.network_annotations), - ]: - for k, v in data.items(): - if len(annot[k]) > 0 and not isinstance(annot[k][-1], Position): - annot[k].append(Position( - # This is really last resort, so expecting the canvas to be - # FullHD is as good assumption as any. - x = float(random.randint(0, 1920)), - y = float(random.randint(0, 1080)), - )) - return at - - -# Rendering - -def assign_brushes(at): - for annot in [ - at.router_annotations, - at.network_annotations, - at.link_annotations, - ]: - for tags in annot.values(): - statuses = list(filter(lambda x: isinstance(x, DifferenceStatus), tags)) - status = statuses[-1] if len(statuses) > 0 else DifferenceStatus.DISCREPANCY # should always have something. - color = { - DifferenceStatus.NORMAL: 'black', - DifferenceStatus.EXTRA: 'green', - DifferenceStatus.MISSING: 'red', - DifferenceStatus.DISCREPANCY: 'blue', - }[status] - tags.append(QtGui.QBrush(QtGui.QColor(color))) - return at - def create_qgritems(at): - # qgritem = QGraphicsItem - # TODO: reasonable visualisation, not just squares :-) - # - this probably involves creatin custom qgritems - # - We will need own qgritems anyway to handle interaction (e.g. - # resolving to Router objects) - topo = at.topology - for rk, r in topo.routers.items(): - size = 30 - brush = None - for tag in at.router_annotations[rk][-1::-1]: - if isinstance(tag, QtGui.QBrush): - brush = tag - break - pos = None - for tag in at.router_annotations[rk][-1::-1]: - if isinstance(tag, Position): - pos = tag - break - x = pos.x - y = pos.y - shape = MyGraphicsRectItem(-size/2, -size/2, size, size) shape.setBrush(brush) shape.setPos(x, y) @@ -199,11 +57,4 @@ def create_qgritems(at): npos = tag break - line = QtWidgets.QGraphicsLineItem(rpos.x, rpos.y, npos.x, npos.y) - ritem.setData(0, line) - nitem.setData(0, line) - line.setData(0, nitem) - line.setData(1, ritem) - at.link_annotations[lk].append(line) - return at