Add TopologyProviders for ospffile format

older_libs
LEdoian 2 years ago
parent c13192316b
commit e60413bdf1

@ -0,0 +1,126 @@
"""Common TopologyProviders"""
from birdvisu import ospffile
from birdvisu.maps_new import TopologyProvider, Router, Network, Link, Topology
import subprocess
import re
class OspfDataTopologyProvider(TopologyProvider):
"""Common provider for parsing BIRD's OSPF data.
At the moment it can only process static data."""
def __init__(self, data: str):
parsed = ospffile.loads(data)
self.topology = Topology()
# Networks will need changed names, since 'network' directives do not
# feel trustworthy (I don't really understand them, but I think they
# depend on designated routers and interface numbers, changes of which
# should not cause us to think that is a different network)
self.network_renames = {}
# We only care for areas:
# TODO: Should we support global visualisation configuration?
for directive, details in parsed:
if directive.startswith('area'):
self.add_area((directive, details))
def add_area(self, ospf_area):
_area, topo = ospf_area # Ignoring the area
for obj in topo:
directive, details = obj
if directive.startswith('router'):
self.add_router(obj)
elif directive.startswith('network'):
self.add_network(obj)
self.find_links()
# assert self.topology.is_valid()
def add_router(self, tree):
ident, details = tree
assert ident not in self.topology.routers
self.topology.routers[ident] = Router(
ident = ident,
links = [], # Will be filled by find_links
details = tree,
)
# Stubnets and external routes are only mentioned here, so add them now
for det in details:
n, _nd = det
if n.startswith(('stubnet', 'external')):
net_id = re.match(r'((stubnet|external) [^ ]+)', n).group(1)
if net_id not in self.topology.networks:
self.topology.networks[net_id] = Network(
ident = net_id,
links = [],
details = det,
)
def add_network(self, tree):
bird_name, details = tree
assert bird_name not in self.network_renames
details.append((bird_name, [])) # Keep old name for reference
ident_candidates = []
for d, dd in details:
if d.startswith('address'):
ident_candidates.append(d.replace('address', 'network'))
assert len(ident_candidates) > 0, "Network without address?"
ident = sorted(ident_candidates)[0]
self.network_renames[bird_name] = ident
assert ident not in self.topology.networks, "Multiple networks with same address?"
self.topology.networks[ident] = Network(
ident = ident,
links = [],
details = tree,
)
def find_links(self):
for r in self.topology.routers.values():
det = r.details[1]
for n, nd in det:
if n.startswith(('network', 'stubnet', 'external')):
net_id = re.match(r'((network|stubnet|external) [^ ]+)', n).group(1)
if net_id in self.network_renames:
net_id = self.network_renames[net_id]
metric = int(re.search(r'metric ([0-9]+)', n).group(1))
ident = (r.ident, net_id)
# I really hope that one router has at most one link to each network (incl. external)
assert ident not in self.topology.links
link = Link(
router = self.topology.routers[r.ident],
network = self.topology.networks[net_id],
metric = metric,
)
self.topology.links[ident] = link
link.router.links.append(link)
link.network.links.append(link)
def get_topology(self):
return self.topology
class OspfFileTopologyProvider(TopologyProvider):
def __init__(self, ospf_file):
ospf_data = ospf_file.read()
self.ospfdataprovider = OspfDataTopologyProvider(ospf_data)
def get_topology(self):
return self.ospfdataprovider.get_topology()
class RunningBirdTopologyProvider(TopologyProvider):
def __init__(self):
self.load_topology()
def load_topology(self):
bird = subprocess.Popen(['birdcl', 'show', 'ospf', 'state'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
# Probably not needed, but will work and is tested from interactive python shell
stdin=subprocess.PIPE,
text=True)
stdout, stderr = bird.communicate()
if bird.returncode != 0 or stderr != '':
raise RuntimeError(f'BIRD failed: returncode={bird.returncode}, stderr={stderr}')
# FIXME: this probably causes memory leaks, since TopologyCombiner will
# keep complete history from all runs that have been processed. This
# needs to be solved at TopologyCombiner level or above, since we need
# to be able to remove old data from the topology.
self.topology = OspfDataTopologyProvider(stdout).get_topology()
def get_topology(self):
return self.topology
Loading…
Cancel
Save