#!/usr/bin/env python # -*- coding: utf-8 -*- __author__ = 'Stefano Vissicchio ' # Copyright (C) 2013-2017 by # Stefano Vissicchio # All rights reserved. # BSD license. import unittest import networkx as nx import public_utils as utils import public_gpia as gpia import fed ################ # TOY NETWORKS # ################ class Gadgets(): _instance = None def __new__(cls, *args, **kwargs): if (cls._instance is None): cls._instance = super.__new__(cls, *args, **kwargs) return cls._instance def __init__(self): self._setUpTriangle() self._setUpImpossibleTriangle() self._setUpPylon() self._setUpTrapezoid() self._setUpEcmpLoopNHFunction() def _setUpTriangle(self): self.triangle_i = nx.Graph() self.triangle_i.add_edge('1','2',weight=1) self.triangle_i.add_edge('3','2',weight=1) self.triangle_i.add_edge('1','3',weight=10) self.triangle_f = nx.Graph(self.triangle_i) self.triangle_f['3']['2']['weight'] = 100 def _setUpImpossibleTriangle(self): self.imptria_i = nx.Graph() self.imptria_i.add_edge('u','v',weight=100) self.imptria_i.add_edge('v','d',weight=1) self.imptria_i.add_edge('v','z',weight=1) self.imptria_i.add_edge('u','z',weight=1) self.imptria_i.add_edge('d','z',weight=100) self.imptria_f = nx.Graph(self.imptria_i) self.imptria_f['v']['d']['weight'] = 100 self.imptria_f['d']['z']['weight'] = 1 self.imptria_f['u']['z']['weight'] = 100 self.imptria_f['v']['u']['weight'] = 1 def _setUpPylon(self): self.pylon_i = nx.Graph() self.pylon_i.add_edge('d1','u1',weight=1) self.pylon_i.add_edge('d1','v1',weight=100) self.pylon_i.add_edge('d2','u4',weight=1) self.pylon_i.add_edge('d2','v4',weight=100) self.pylon_i.add_edge('u1','v1',weight=1) self.pylon_i.add_edge('u2','v2',weight=1) self.pylon_i.add_edge('u3','v3',weight=1) self.pylon_i.add_edge('u4','v4',weight=1) self.pylon_i.add_edge('u1','u2',weight=1) self.pylon_i.add_edge('u3','u2',weight=1) self.pylon_i.add_edge('u3','u4',weight=1) self.pylon_i.add_edge('v1','v2',weight=100) self.pylon_i.add_edge('v3','v2',weight=100) self.pylon_i.add_edge('v3','v4',weight=100) self.pylon_f = nx.Graph(self.pylon_i) self.pylon_f['d1']['v1']['weight'] = 1 self.pylon_f['d1']['u1']['weight'] = 100 self.pylon_f['d2']['v4']['weight'] = 1 self.pylon_f['d2']['u4']['weight'] = 100 self.pylon_f['u1']['u2']['weight'] = 100 self.pylon_f['u3']['u2']['weight'] = 100 self.pylon_f['u4']['u3']['weight'] = 100 self.pylon_f['v1']['v2']['weight'] = 1 self.pylon_f['v3']['v2']['weight'] = 1 self.pylon_f['v4']['v3']['weight'] = 1 def _setUpTrapezoid(self): self.trapezoid_i = nx.Graph() self.trapezoid_i.add_edge('D','R3',weight=100) self.trapezoid_i.add_edge('D','R4',weight=1) self.trapezoid_i.add_edge('D','R2',weight=100) self.trapezoid_i.add_edge('R2','R4',weight=1) self.trapezoid_i.add_edge('R2','R3',weight=1) self.trapezoid_i.add_edge('R1','R2',weight=100) self.trapezoid_i.add_edge('R1','R3',weight=1) self.trapezoid_f = nx.Graph() self.trapezoid_f.add_edge('D','R3',weight=1) self.trapezoid_f.add_edge('D','R4',weight=1) self.trapezoid_f.add_edge('D','R2',weight=100) self.trapezoid_f.add_edge('R2','R4',weight=100) self.trapezoid_f.add_edge('R2','R3',weight=1) self.trapezoid_f.add_edge('R1','R2',weight=100) self.trapezoid_f.add_edge('R1','R3',weight=1) def _setUpEcmpLoopNHFunction(self): self.ecmpLoop_nh = {} self.ecmpLoop_nh["d"] = { "d": set(["d"]), "v": set(["d"]), "u": set(["d","s"]), "s": set(["u","v"]) } ######### # TESTS # ######### # check common functions in the abstract class class GpiaTestCase(unittest.TestCase): def setUp(self): self.instance = gpia.Gpia() self.gadgets = Gadgets() def testTimeOut(self): self.instance.set_max_cpu_time(-1) destinations = ["d"] nh_init = utils.computeNextHops(self.gadgets.imptria_i,destinations) nh_final = utils.computeNextHops(self.gadgets.imptria_f,destinations) self.assertRaises(gpia.MaxCPUTimeExceededException, self.instance.compute_sequence, self.gadgets.imptria_i,nh_init,nh_final,destinations) def testDetectPathInconsistency(self): final = set(['1','2','3']) initial = set(['4','5']) current = set(['1','2','3']) self.assertFalse(self.instance.detect_path_inconsistency(current, initial, final)) final = set(['1','2','3']) initial = set(['4','5']) current = set(['1','2']) self.assertFalse(self.instance.detect_path_inconsistency(current, initial, final)) final = set(['1','2','3']) initial = set(['4','5']) current = set(['2','4']) self.assertFalse(self.instance.detect_path_inconsistency(current, initial, final)) final = set(['1','2','3']) initial = set(['4','5']) current = set(['1','2','6']) self.assertTrue(self.instance.detect_path_inconsistency(current, initial, final)) def testBuildActualPathsWithLoops(self): destinations = ["d"] paths = self.instance.build_actual_paths(self.gadgets.ecmpLoop_nh, destinations) self.assertTrue(len(paths["d"]) == 1) self.assertFalse("s" in paths["d"]) self.assertFalse("u" in paths["d"]) self.assertFalse("v" in paths["d"]) # check correct execution of FU to FU updates class FUGpiaTestCase(unittest.TestCase): def setUp(self): self.algo = gpia.FUGpia() self.gadgets = Gadgets() def testComputeSequence(self): # setup scenario and compute sequence destinations = ["3"] nh_init = utils.computeNextHops(self.gadgets.triangle_i,destinations) nh_final = utils.computeNextHops(self.gadgets.triangle_f,destinations) seq = self.algo.compute_sequence(self.gadgets.triangle_i,nh_init,nh_final,destinations) # check that (i) all nodes are updated, (ii) the sequence consists of 2 steps, and (iii) node 1 is updated before "2" self.assertTrue(len(self.algo.get_migrated_nodes())==3) self.assertTrue(len(seq)==2) self.assertTrue("1" in seq[0] and "2" in seq[1]) def testComputeSequenceNoOrdering(self): # setup scenario and compute update sequence destinations = ["d"] nh_init = utils.computeNextHops(self.gadgets.imptria_i,destinations) nh_final = utils.computeNextHops(self.gadgets.imptria_f,destinations) seq = self.algo.compute_sequence(self.gadgets.imptria_i,nh_init,nh_final,destinations) # check the output (only d can be safely updated) self.assertTrue(len(seq)==1) self.assertTrue(seq[0] == ['d']) def testComputeSequencePylonD1(self): # setup scenario and compute update sequence destinations = ["d1"] nh_init = utils.computeNextHops(self.gadgets.pylon_i,destinations) nh_final = utils.computeNextHops(self.gadgets.pylon_f,destinations) seq = self.algo.compute_sequence(self.gadgets.pylon_i,nh_init,nh_final,destinations) # check that a sequence including all nodes is found self.assertTrue(len(utils.flatten_list(seq))==len(self.gadgets.pylon_i.nodes())) def testComputeSequencePylonD2(self): # setup scenario and compute update sequence destinations = ["d2"] nh_init = utils.computeNextHops(self.gadgets.pylon_i,destinations) nh_final = utils.computeNextHops(self.gadgets.pylon_f,destinations) seq = self.algo.compute_sequence(self.gadgets.pylon_i,nh_init,nh_final,destinations) # check that a sequence including all nodes is found self.assertTrue(len(utils.flatten_list(seq))==len(self.gadgets.pylon_i.nodes())) def testComputeSequencePylonTogether(self): # setup scenario and compute update sequence destinations = ["d1", "d2"] nh_init = utils.computeNextHops(self.gadgets.pylon_i,destinations) nh_final = utils.computeNextHops(self.gadgets.pylon_f,destinations) seq = self.algo.compute_sequence(self.gadgets.pylon_i,nh_init,nh_final,destinations) # check that no node can be updated self.assertTrue(len(seq)==0) # check correct execution of FA-involving updates class FAAndFUGpiaTestCase(unittest.TestCase): def setUp(self): self.algo = gpia.FAAndFUGpia() self.graph = nx.Graph() self.graph.add_edges_from([('a','b'),('b','d'),('c','d'),('c','b'),('c','e'),('e','f'),('f','d')]) self.gadgets = Gadgets() def testComputeSequenceTrapezoidFA2FU(self): destinations = ["D"] nh_init = utils.computeNextHops(self.gadgets.trapezoid_i,destinations) nh_final = utils.computeNextHops(self.gadgets.trapezoid_f,destinations) self.algo.set_fa2fu() seq = self.algo.compute_sequence(self.gadgets.trapezoid_i,nh_init,nh_final,destinations) self.assertTrue(len(utils.flatten_list(seq)) == len(self.gadgets.trapezoid_i.nodes())) self.assertTrue(len(seq) == len(self.gadgets.trapezoid_i.nodes())) def testComputeSequenceTrapezoidFU2FA(self): destinations = ["D"] nh_init = utils.computeNextHops(self.gadgets.trapezoid_i,destinations) nh_final = utils.computeNextHops(self.gadgets.trapezoid_f,destinations) self.algo.set_fu2fa() seq = self.algo.compute_sequence(self.gadgets.trapezoid_i,nh_init,nh_final,destinations) self.assertTrue(len(utils.flatten_list(seq)) == len(self.gadgets.trapezoid_i.nodes())) self.assertTrue(len(seq) == len(self.gadgets.trapezoid_i.nodes())) # check correct execution of FED (upon sequences computed by GPIA) class FedTestCase(unittest.TestCase): def setUp(self): self.gpia = gpia.FUGpia() self.fed = fed.Fed() self.gadgets = Gadgets() def testComputeSequence(self): # setup a case where GPIA is sufficient to complete the update destinations = ["3"] nh_init = utils.computeNextHops(self.gadgets.triangle_i,destinations) nh_final = utils.computeNextHops(self.gadgets.triangle_f,destinations) seq = self.gpia.compute_sequence(self.gadgets.triangle_i,nh_init,nh_final,destinations) # run FED non_migrated_nodes = set(self.gadgets.triangle_i.nodes()).difference(self.gpia.get_migrated_nodes()) unsafe_dests = self.gpia.get_unsafe_destinations() final_seq = self.fed.add_duplication_steps(self.gadgets.triangle_i,nh_init,nh_final,non_migrated_nodes,unsafe_dests,seq) # check that FED did not add any rule/step ## note that we cannot directly compare GPIA sequence with FED one, as the former is ## a list of lists while the latter is a list of sets self.assertTrue(len(final_seq)==len(seq)) for i in range(len(final_seq)): self.assertTrue(len(final_seq[i])==len(seq[i])) self.assertTrue(len(self.fed.get_dup_entries().values()) == 0) def testComputeSequenceNoOrdering(self): # setup a case where GPIA is sufficient to complete the update destinations = ["d"] nh_init = utils.computeNextHops(self.gadgets.imptria_i,destinations) nh_final = utils.computeNextHops(self.gadgets.imptria_f,destinations) seq = self.gpia.compute_sequence(self.gadgets.imptria_i,nh_init,nh_final,destinations) migrated_nodes = self.gpia.get_migrated_nodes() # run FED non_migrated_nodes = set(self.gadgets.imptria_i.nodes()).difference(self.gpia.get_migrated_nodes()) unsafe_dests = self.gpia.get_unsafe_destinations() final_seq = self.fed.add_duplication_steps(self.gadgets.imptria_i,nh_init,nh_final,non_migrated_nodes,unsafe_dests,seq) # check that FED updated all the nodes by adding 3 steps self.assertTrue(len(final_seq)==len(seq)+3) for n in self.gadgets.imptria_i.nodes(): self.assertTrue("upd_"+n in utils.flatten_list(final_seq)) # check that FED duplicated a minimal amount of rules self.assertTrue(len(self.fed.get_dup_entries().values()) == len(non_migrated_nodes) * len(destinations)) if __name__ == '__main__': unittest.main()