diff options
author | Donald Sharp <sharpd@nvidia.com> | 2021-04-08 19:04:26 +0200 |
---|---|---|
committer | Donald Sharp <sharpd@nvidia.com> | 2021-04-09 14:35:05 +0200 |
commit | 0b25370e95306a6b1631fea8d521aa6f04e75fe4 (patch) | |
tree | 0daabb39ecf1d051a878e8be2153ef816a7959ff /tests | |
parent | tests: Add some more pytest marks for bgpd (diff) | |
download | frr-0b25370e95306a6b1631fea8d521aa6f04e75fe4.tar.xz frr-0b25370e95306a6b1631fea8d521aa6f04e75fe4.zip |
tests: More black fixups
Just another round of fixups found by running black on the code
Signed-off-by: Donald Sharp <sharpd@nvidia.com>
Diffstat (limited to 'tests')
35 files changed, 1078 insertions, 872 deletions
diff --git a/tests/topotests/all-protocol-startup/test_all_protocol_startup.py b/tests/topotests/all-protocol-startup/test_all_protocol_startup.py index c85857125..c10e32ad0 100644 --- a/tests/topotests/all-protocol-startup/test_all_protocol_startup.py +++ b/tests/topotests/all-protocol-startup/test_all_protocol_startup.py @@ -346,9 +346,9 @@ def test_converge_protocols(): print("Show that v4 routes are right\n") v4_routesFile = "%s/r%s/ipv4_routes.ref" % (thisDir, i) - expected = net["r%s" % i].cmd( - "sort {} 2> /dev/null".format(v4_routesFile) - ).rstrip() + expected = ( + net["r%s" % i].cmd("sort {} 2> /dev/null".format(v4_routesFile)).rstrip() + ) expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) actual = ( @@ -379,9 +379,9 @@ def test_converge_protocols(): print("Show that v6 routes are right\n") v6_routesFile = "%s/r%s/ipv6_routes.ref" % (thisDir, i) - expected = net["r%s" % i].cmd( - "sort {} 2> /dev/null".format(v6_routesFile) - ).rstrip() + expected = ( + net["r%s" % i].cmd("sort {} 2> /dev/null".format(v6_routesFile)).rstrip() + ) expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) actual = ( diff --git a/tests/topotests/bgp-snmp-mplsl3vpn/test_bgp_snmp_mplsvpn.py b/tests/topotests/bgp-snmp-mplsl3vpn/test_bgp_snmp_mplsvpn.py index db4eab9d3..b830e16b9 100755 --- a/tests/topotests/bgp-snmp-mplsl3vpn/test_bgp_snmp_mplsvpn.py +++ b/tests/topotests/bgp-snmp-mplsl3vpn/test_bgp_snmp_mplsvpn.py @@ -505,8 +505,10 @@ def test_r1_mplsvpn_VrfTable(): associated_int = r1_snmp.get( "mplsL3VpnVrfAssociatedInterfaces.{}".format(snmp_str_to_oid("VRF-a")) ) - assertmsg = "mplsL3VpnVrfAssociatedInterfaces incorrect should be 3 value {}".format( - associated_int + assertmsg = ( + "mplsL3VpnVrfAssociatedInterfaces incorrect should be 3 value {}".format( + associated_int + ) ) assert associated_int == "3", assertmsg diff --git a/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py b/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py index aa99ebf6d..b70626fcc 100644 --- a/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py +++ b/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py @@ -137,7 +137,7 @@ from lib.common_config import ( kill_mininet_routers_process, get_frr_ipv6_linklocal, create_route_maps, - required_linux_kernel_version + required_linux_kernel_version, ) # Reading the data from JSON File for topology and configuration creation @@ -1329,20 +1329,22 @@ def test_BGP_GR_TC_4_p0(request): result = verify_bgp_rib( tgen, addr_type, dut, input_topo, next_hop, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying RIB routes result = verify_rib( tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info(" Expected behavior: {}".format(result)) logger.info("[Phase 5] : R2 is about to come up now ") @@ -1802,10 +1804,11 @@ def test_BGP_GR_TC_6_1_2_p1(request): result = verify_r_bit( tgen, topo, addr_type, input_dict, dut="r2", peer="r1", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r2: R-bit is set to True\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r2: R-bit is set to True\n Error: {}".format( tc_name, result - )) + ) logger.info("Restart BGPd on R2 ") kill_router_daemons(tgen, "r2", ["bgpd"]) @@ -1823,10 +1826,11 @@ def test_BGP_GR_TC_6_1_2_p1(request): result = verify_r_bit( tgen, topo, addr_type, input_dict, dut="r2", peer="r1", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r2: R-bit is set to True\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r2: R-bit is set to True\n Error: {}".format( tc_name, result - )) + ) write_test_footer(tc_name) @@ -2108,20 +2112,22 @@ def test_BGP_GR_TC_17_p1(request): result = verify_bgp_rib( tgen, addr_type, dut, input_topo, next_hop, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying RIB routes result = verify_rib( tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info(" Expected behavior: {}".format(result)) logger.info("[Phase 5] : R2 is about to come up now ") @@ -2140,10 +2146,11 @@ def test_BGP_GR_TC_17_p1(request): result = verify_r_bit( tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: R-bit is set to True\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: R-bit is set to True\n Error: {}".format( tc_name, result - )) + ) # Verifying BGP RIB routes next_hop = next_hop_per_address_family( @@ -2469,20 +2476,22 @@ def test_BGP_GR_TC_20_p1(request): result = verify_bgp_rib( tgen, addr_type, dut, input_topo, next_hop, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying RIB routes result = verify_rib( tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info(" Expected behavior: {}".format(result)) logger.info("[Phase 5] : R2 is about to come up now ") @@ -2755,10 +2764,10 @@ def test_BGP_GR_TC_31_1_p1(request): result = verify_rib( tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info("[Phase 4] : R1 is about to come up now ") start_router_daemons(tgen, "r1", ["bgpd"]) @@ -3237,10 +3246,12 @@ def test_BGP_GR_TC_9_p1(request): result = verify_bgp_rib( tgen, addr_type, dut, input_topo, next_hop, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying RIB routes @@ -3248,10 +3259,10 @@ def test_BGP_GR_TC_9_p1(request): result = verify_rib( tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info(" Expected behavior: {}".format(result)) logger.info("[Phase 5] : R2 is about to come up now ") @@ -3281,10 +3292,11 @@ def test_BGP_GR_TC_9_p1(request): result = verify_f_bit( tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: F-bit is set to True\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: F-bit is set to True\n Error: {}".format( tc_name, result - )) + ) write_test_footer(tc_name) @@ -3416,10 +3428,12 @@ def test_BGP_GR_TC_17_p1(request): result = verify_bgp_rib( tgen, addr_type, dut, input_topo, next_hop, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying RIB routes @@ -3427,10 +3441,10 @@ def test_BGP_GR_TC_17_p1(request): result = verify_rib( tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info(" Expected behavior: {}".format(result)) logger.info("[Phase 5] : R2 is about to come up now ") @@ -3452,10 +3466,11 @@ def test_BGP_GR_TC_17_p1(request): result = verify_r_bit( tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: R-bit is set to True\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: R-bit is set to True\n Error: {}".format( tc_name, result - )) + ) # Verifying BGP RIB routes next_hop = next_hop_per_address_family( @@ -3675,10 +3690,12 @@ def test_BGP_GR_TC_43_p1(request): result = verify_bgp_rib( tgen, addr_type, dut, input_topo, next_hop, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r2: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) protocol = "bgp" result = verify_rib( tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False @@ -3983,10 +4000,12 @@ def test_BGP_GR_TC_44_p1(request): result = verify_bgp_rib( tgen, addr_type, dut, input_topo, next_hop, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) result = verify_rib( tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False ) @@ -5011,10 +5030,10 @@ def test_BGP_GR_TC_48_p1(request): result = verify_rib( tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) dut = "r2" peer = "r1" @@ -5025,17 +5044,19 @@ def test_BGP_GR_TC_48_p1(request): result = verify_bgp_rib( tgen, addr_type, dut, input_topo, next_hop, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r2: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) result = verify_rib( tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r2: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r2: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) step("Bring up BGP on R1 and remove Peer-level GR config from R1") @@ -5394,17 +5415,19 @@ def BGP_GR_TC_52_p1(request): result = verify_bgp_rib( tgen, addr_type, dut, input_topo, next_hop, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) result = verify_rib( tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) step("Bring up BGP on R2 and remove Peer-level GR config from R1") diff --git a/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py b/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py index c7dca7257..9438b90ef 100644 --- a/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py +++ b/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py @@ -136,7 +136,7 @@ from lib.common_config import ( kill_mininet_routers_process, get_frr_ipv6_linklocal, create_route_maps, - required_linux_kernel_version + required_linux_kernel_version, ) # Reading the data from JSON File for topology and configuration creation @@ -557,10 +557,11 @@ def test_BGP_GR_TC_3_p0(request): result = verify_eor( tgen, topo, addr_type, input_dict, dut="r2", peer="r1", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r2: EOR is set to True\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r2: EOR is set to True\n Error: {}".format( tc_name, result - )) + ) logger.info( "Waiting for selection deferral timer({} sec)..".format(GR_SELECT_DEFER_TIMER) @@ -703,10 +704,11 @@ def test_BGP_GR_TC_11_p0(request): result = verify_eor( tgen, topo, addr_type, input_dict, dut="r1", peer="r3", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: EOR is set to True\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: EOR is set to True\n Error: {}".format( tc_name, result - )) + ) logger.info( "Waiting for selection deferral timer({} sec).. ".format( @@ -733,10 +735,11 @@ def test_BGP_GR_TC_11_p0(request): result = verify_eor( tgen, topo, addr_type, input_dict, dut="r3", peer="r1", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r3: EOR is set to True\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r3: EOR is set to True\n Error: {}".format( tc_name, result - )) + ) write_test_footer(tc_name) @@ -1470,35 +1473,39 @@ def test_BGP_GR_18_p1(request): dut = "r6" input_dict_1 = {key: topo["routers"][key] for key in ["r1"]} result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r6: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying RIB routes before shutting down BGPd daemon result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r6: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r6: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying BGP RIB routes dut = "r2" result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r2: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying RIB routes before shutting down BGPd daemon result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r6: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r6: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info(" Expected behavior: {}".format(result)) write_test_footer(tc_name) @@ -1959,18 +1966,20 @@ def test_BGP_GR_chaos_29_p1(request): # Verifying BGP RIB routes before shutting down BGPd daemon input_dict = {key: topo["routers"][key] for key in ["r1"]} result = verify_bgp_rib(tgen, addr_type, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying RIB routes before shutting down BGPd daemon result = verify_rib(tgen, addr_type, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r3: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info(" Expected behavior: {}".format(result)) logger.info("[Step 4] : Start BGPd daemon on R1..") @@ -2212,10 +2221,12 @@ def test_BGP_GR_chaos_33_p1(request): result = verify_rib( tgen, addr_type, dut, input_dict_2, next_hop_4, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) if addr_type == "ipv6": @@ -2227,10 +2238,12 @@ def test_BGP_GR_chaos_33_p1(request): result = verify_rib( tgen, addr_type, dut, input_dict_2, next_hop_6, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) logger.info("[Step 4] : Start BGPd daemon on R1 and R4..") @@ -2411,27 +2424,30 @@ def test_BGP_GR_chaos_34_2_p1(request): result = verify_f_bit( tgen, topo, addr_type, input_dict, "r3", "r1", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r3: F-bit is set to True\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r3: F-bit is set to True\n Error: {}".format( tc_name, result - )) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying BGP RIB routes after starting BGPd daemon input_dict_1 = {key: topo["routers"][key] for key in ["r1"]} result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying RIB routes result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r3: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info(" Expected behavior: {}".format(result)) write_test_footer(tc_name) @@ -2568,10 +2584,11 @@ def test_BGP_GR_chaos_34_1_p1(request): result = verify_f_bit( tgen, topo, addr_type, input_dict_2, "r3", "r1", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r3: F-bit is set to True\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r3: F-bit is set to True\n Error: {}".format( tc_name, result - )) + ) logger.info(" Expected behavior: {}".format(result)) logger.info("[Step 3] : Kill BGPd daemon on R1..") @@ -2587,18 +2604,20 @@ def test_BGP_GR_chaos_34_1_p1(request): # Verifying BGP RIB routes input_dict = {key: topo["routers"][key] for key in ["r1"]} result = verify_bgp_rib(tgen, addr_type, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying RIB routes result = verify_rib(tgen, addr_type, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r3: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info(" Expected behavior: {}".format(result)) # Start BGPd daemon on R1 @@ -2772,27 +2791,30 @@ def test_BGP_GR_chaos_32_p1(request): result = verify_eor( tgen, topo, addr_type, input_dict_3, dut="r5", peer="r1", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r5: EOR is set to TRUE\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r5: EOR is set to TRUE\n Error: {}".format( tc_name, result - )) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying BGP RIB routes after starting BGPd daemon input_dict_1 = {key: topo["routers"][key] for key in ["r5"]} result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying RIB routes result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r3: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info(" Expected behavior: {}".format(result)) write_test_footer(tc_name) @@ -2898,10 +2920,11 @@ def test_BGP_GR_chaos_37_p1(request): result = verify_eor( tgen, topo, addr_type, input_dict, dut="r3", peer="r1", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r3: EOR is set to True\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r3: EOR is set to True\n Error: {}".format( tc_name, result - )) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying BGP RIB routes after starting BGPd daemon @@ -2964,10 +2987,11 @@ def test_BGP_GR_chaos_37_p1(request): result = verify_eor( tgen, topo, addr_type, input_dict_3, dut="r1", peer="r3", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: EOR is set to True\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: EOR is set to True\n Error: {}".format( tc_name, result - )) + ) write_test_footer(tc_name) @@ -3119,18 +3143,20 @@ def test_BGP_GR_chaos_30_p1(request): # Verifying BGP RIB routes before shutting down BGPd daemon input_dict = {key: topo["routers"][key] for key in ["r3"]} result = verify_bgp_rib(tgen, addr_type, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying RIB routes before shutting down BGPd daemon result = verify_rib(tgen, addr_type, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info(" Expected behavior: {}".format(result)) write_test_footer(tc_name) @@ -3532,10 +3558,10 @@ def BGP_GR_TC_7_p1(request): dut = "r1" input_dict_1 = {key: topo["routers"][key] for key in ["r3"]} result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) write_test_footer(tc_name) @@ -3709,10 +3735,11 @@ def test_BGP_GR_TC_23_p1(request): result = verify_eor( tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: EOR is set to True\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: EOR is set to True\n Error: {}".format( tc_name, result - )) + ) # Verifying BGP RIB routes received from router R1 dut = "r1" @@ -3833,18 +3860,20 @@ def test_BGP_GR_20_p1(request): dut = "r3" input_dict_1 = {key: topo["routers"][key] for key in ["r1"]} result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: routes are still present in BGP RIB\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info(" Expected behavior: {}".format(result)) # Verifying RIB routes before shutting down BGPd daemon result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r3: routes are still present in ZEBRA\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result) + ) logger.info(" Expected behavior: {}".format(result)) # Start BGPd daemon on R1 diff --git a/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py b/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py index 8e5ffe10b..84d9c48f3 100644 --- a/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py +++ b/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py @@ -598,10 +598,12 @@ def test_large_community_lists_with_rmap_apply_and_remove(request): result = verify_bgp_community( tgen, adt, dut, NETWORKS[adt], input_dict_4, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "largeCommunity is still present after deleting route-map \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) write_test_footer(tc_name) @@ -899,10 +901,10 @@ def test_large_community_lists_with_rmap_set_none(request): dut = "r6" for adt in ADDR_TYPES: result = verify_bgp_community(tgen, adt, dut, NETWORKS[adt], expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "Community-list is still present \n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "Community-list is still present \n Error: {}".format(tc_name, result) + ) write_test_footer(tc_name) @@ -2238,10 +2240,10 @@ def test_large_community_lists_with_rmap_match_regex(request): result = verify_bgp_community( tgen, adt, dut, NETWORKS[adt], input_dict_7, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "largeCommunity is still present \n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "largeCommunity is still present \n Error: {}".format(tc_name, result) + ) write_test_footer(tc_name) diff --git a/tests/topotests/bgp_multi_vrf_topo1/test_bgp_multi_vrf_topo1.py b/tests/topotests/bgp_multi_vrf_topo1/test_bgp_multi_vrf_topo1.py index 464d6eb47..5ecaee2ec 100644 --- a/tests/topotests/bgp_multi_vrf_topo1/test_bgp_multi_vrf_topo1.py +++ b/tests/topotests/bgp_multi_vrf_topo1/test_bgp_multi_vrf_topo1.py @@ -508,10 +508,10 @@ def test_ambiguous_overlapping_addresses_in_different_vrfs_p0(request): ) result = verify_rib(tgen, addr_type, dut, input_dict_1, tag=500, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "Routes are present with tag value 500 \n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "Routes are present with tag value 500 \n Error: {}".format(tc_name, result) + ) logger.info("Expected Behavior: {}".format(result)) step( @@ -1147,10 +1147,12 @@ def test_prefixes_leaking_p0(request): result = verify_rib( tgen, addr_type, dut, input_dict_1, metric=123, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "Routes are present with metric value 123 \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info("Expected Behavior: {}".format(result)) result = verify_rib(tgen, addr_type, dut, input_dict_2, metric=123) @@ -1161,10 +1163,12 @@ def test_prefixes_leaking_p0(request): result = verify_rib( tgen, addr_type, dut, input_dict_2, metric=0, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "Routes are present with metric value 0 \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info("Expected Behavior: {}".format(result)) write_test_footer(tc_name) diff --git a/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py b/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py index 6d43cc0cd..c8d133012 100644 --- a/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py +++ b/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py @@ -2222,16 +2222,20 @@ def test_restart_bgpd_daemon_p1(request): } result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "Routes are still present in VRF RED_A and RED_B \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) result = verify_rib(tgen, addr_type, dut, input_dict_2, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "Routes are still present in VRF BLUE_A and BLUE_B \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("Bring up BGPd daemon on R1.") start_router_daemons(tgen, "r1", ["bgpd"]) diff --git a/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py b/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py index 9b241bb5d..2a98cb341 100644 --- a/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py +++ b/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py @@ -369,10 +369,11 @@ def test_recursive_routes_iBGP_peer_p1(request): protocol="bgp", expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " - "Routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format( tc_name, result - )) + ) step("Reconfigure the same static route on R2 again") dut = "r2" @@ -490,10 +491,11 @@ def test_recursive_routes_iBGP_peer_p1(request): result = verify_rib( tgen, addr_type, "r2", input_dict_4, protocol="bgp", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "Routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format( tc_name, result - )) + ) write_test_footer(tc_name) @@ -602,10 +604,11 @@ def test_next_hop_as_self_ip_p1(request): next_hop=topo["routers"]["r2"]["links"]["r4"][addr_type].split("/")[0], expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " - "Routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format( tc_name, result - )) + ) step("Shut interface on R2 that has IP from the subnet as BGP next-hop") intf_r2_r4 = topo["routers"]["r2"]["links"]["r4"]["interface"] @@ -680,10 +683,11 @@ def test_next_hop_as_self_ip_p1(request): next_hop=topo["routers"]["r2"]["links"]["r4"][addr_type].split("/")[0], expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " - "Routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format( tc_name, result - )) + ) write_test_footer(tc_name) @@ -1626,10 +1630,11 @@ def test_BGP_peering_bw_loopback_and_physical_p1(request): step("Verify that once eBGP multi-hop is removed, BGP session goes down") result = verify_bgp_convergence_from_running_config(tgen, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "BGP is converged \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format( tc_name, result - )) + ) step("Add ebgp-multihop command on R3 again") for addr_type in ADDR_TYPES: @@ -1667,10 +1672,11 @@ def test_BGP_peering_bw_loopback_and_physical_p1(request): step("Verify that BGP session goes down, when update-source is removed") result = verify_bgp_convergence_from_running_config(tgen, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "BGP is converged \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format( tc_name, result - )) + ) step("Add update-source command on R1 again") for addr_type in ADDR_TYPES: @@ -1719,18 +1725,20 @@ def test_BGP_peering_bw_loopback_and_physical_p1(request): next_hop=topo["routers"]["r1"]["links"]["r3"][addr_type].split("/")[0], expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " - "Routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format( tc_name, result - )) + ) sleep(3) step("Verify that BGP session goes down, when static route is removed") result = verify_bgp_convergence_from_running_config(tgen, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "BGP is converged \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format( tc_name, result - )) + ) step("Add static route on R3 again") for addr_type in ADDR_TYPES: @@ -1772,10 +1780,11 @@ def test_BGP_peering_bw_loopback_and_physical_p1(request): sleep(3) step("Verify that BGP neighborship between R1 and R3 goes down") result = verify_bgp_convergence_from_running_config(tgen, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "BGP is converged \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format( tc_name, result - )) + ) intf_r1_r3 = topo["routers"]["r1"]["links"]["r3"]["interface"] shutdown_bringup_interface(tgen, "r1", intf_r1_r3, True) @@ -2091,10 +2100,11 @@ def test_BGP_active_standby_preemption_and_ecmp_p1(request): ], expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " - "Routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format( tc_name, result - )) + ) step("Reconfigure multipath-relax command on R4") result = create_router_bgp(tgen, topo, maxpath_relax) @@ -2151,10 +2161,11 @@ def test_BGP_active_standby_preemption_and_ecmp_p1(request): ], expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " - "Routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format( tc_name, result - )) + ) step("Re-configure maximum-path 2 command on R4") input_dict_8 = { @@ -2342,10 +2353,11 @@ def test_password_authentication_for_eBGP_and_iBGP_peers_p1(request): "configured but not peer routers" ) result = verify_bgp_convergence(tgen, topo, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "BGP is converged \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format( tc_name, result - )) + ) step("configure same password on R2 and R3") for routerN in ["r2", "r3"]: @@ -2372,10 +2384,11 @@ def test_password_authentication_for_eBGP_and_iBGP_peers_p1(request): "strings are in CAPs on R2 and R3" ) result = verify_bgp_convergence(tgen, topo, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "BGP is converged \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format( tc_name, result - )) + ) step("Configure same password on R2 and R3 without CAPs") for routerN in ["r2", "r3"]: @@ -2399,10 +2412,11 @@ def test_password_authentication_for_eBGP_and_iBGP_peers_p1(request): step("Verify if password is removed from R1, both sessions go down again") result = verify_bgp_convergence(tgen, topo, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "BGP is converged \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format( tc_name, result - )) + ) step("Configure alphanumeric password on R1 and peer routers R2,R3") for bgp_neighbor in ["r2", "r3"]: diff --git a/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo1.py b/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo1.py index b99f1a741..291a6e7c3 100644 --- a/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo1.py +++ b/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo1.py @@ -498,6 +498,7 @@ def disable_route_map_to_prefer_global_next_hop(tgen, topo): # ##################################################### + def test_dynamic_imported_routes_advertised_to_iBGP_peer_p0(request): """ TC5_FUNC_5: @@ -762,9 +763,7 @@ def test_dynamic_imported_routes_advertised_to_iBGP_peer_p0(request): for addr_type in ADDR_TYPES: - step( - "On router R1 delete static routes in vrf ISR to LOOPBACK_1" - ) + step("On router R1 delete static routes in vrf ISR to LOOPBACK_1") input_routes_r1 = { "r1": { @@ -772,7 +771,7 @@ def test_dynamic_imported_routes_advertised_to_iBGP_peer_p0(request): { "network": [NETWORK1_3[addr_type], NETWORK1_4[addr_type]], "next_hop": (intf_r2_r1[addr_type]).split("/")[0], - "delete": True + "delete": True, } ] } diff --git a/tests/topotests/conftest.py b/tests/topotests/conftest.py index 7ad5d8c9a..cf64956bf 100755 --- a/tests/topotests/conftest.py +++ b/tests/topotests/conftest.py @@ -11,6 +11,7 @@ from lib.topotest import json_cmp_result from lib.topotest import g_extra_config as topotest_extra_config from lib.topolog import logger + def pytest_addoption(parser): """ Add topology-only option to the topology tester. This option makes pytest @@ -143,9 +144,7 @@ def pytest_configure(config): vtysh_on_error = config.getoption("--vtysh-on-error") topotest_extra_config["vtysh_on_error"] = vtysh_on_error - topotest_extra_config["pause_after"] = ( - pause_after or shell or vtysh - ) + topotest_extra_config["pause_after"] = pause_after or shell or vtysh topotest_extra_config["topology_only"] = config.getoption("--topology-only") @@ -177,9 +176,11 @@ def pytest_runtest_makereport(item, call): else: error = True # Handle assert failures - parent._previousfailed = item # pylint: disable=W0212 + parent._previousfailed = item # pylint: disable=W0212 logger.error( - 'assert failed at "{}/{}": {}'.format(modname, item.name, call.excinfo.value) + 'assert failed at "{}/{}": {}'.format( + modname, item.name, call.excinfo.value + ) ) # (topogen) Set topology error to avoid advancing in the test. @@ -188,7 +189,6 @@ def pytest_runtest_makereport(item, call): # This will cause topogen to report error on `routers_have_failure`. tgen.set_error("{}/{}".format(modname, item.name)) - if error and topotest_extra_config["shell_on_error"]: for router in tgen.routers(): pause = True diff --git a/tests/topotests/isis-snmp/test_isis_snmp.py b/tests/topotests/isis-snmp/test_isis_snmp.py index 1bcd0eefc..07f3335e2 100755 --- a/tests/topotests/isis-snmp/test_isis_snmp.py +++ b/tests/topotests/isis-snmp/test_isis_snmp.py @@ -124,7 +124,6 @@ class TemplateTopo(Topo): switch.add_link(tgen.gears["r3"]) - def setup_module(mod): "Sets up the pytest environment" @@ -148,20 +147,24 @@ def setup_module(mod): # Don't start the following in the CE nodes if router.name[0] == "r": router.load_config( - TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname)), + TopoRouter.RD_ISIS, + os.path.join(CWD, "{}/isisd.conf".format(rname)), "-M snmp", ) router.load_config( - TopoRouter.RD_LDP, os.path.join(CWD, "{}/ldpd.conf".format(rname)), + TopoRouter.RD_LDP, + os.path.join(CWD, "{}/ldpd.conf".format(rname)), ) router.load_config( - TopoRouter.RD_SNMP, os.path.join(CWD, "{}/snmpd.conf".format(rname)), + TopoRouter.RD_SNMP, + os.path.join(CWD, "{}/snmpd.conf".format(rname)), "-Le -Ivacm_conf,usmConf,iquery -V -DAgentX,trap", ) # After loading the configurations, this function loads configured daemons. tgen.start_router() + def teardown_module(mod): "Teardown the pytest environment" tgen = get_topogen() @@ -169,6 +172,7 @@ def teardown_module(mod): # This function tears down the whole topology. tgen.stop_topology() + def router_compare_json_output(rname, command, reference): "Compare router JSON output" @@ -184,6 +188,7 @@ def router_compare_json_output(rname, command, reference): assertmsg = '"{}" JSON output mismatches the expected result'.format(rname) assert diff is None, assertmsg + def generate_oid(numoids, index1, index2): if numoids == 1: oid = "{}".format(index1) @@ -200,7 +205,9 @@ def test_isis_convergence(): router_compare_json_output( rname, "show yang operational-data /frr-interface:lib isisd", - "show_yang_interface_isis_adjacencies.ref") + "show_yang_interface_isis_adjacencies.ref", + ) + def test_r1_scalar_snmp(): "Wait for protocol convergence" @@ -213,26 +220,26 @@ def test_r1_scalar_snmp(): r1 = tgen.net.get("r1") r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c") - assert r1_snmp.test_oid('isisSysVersion', "one(1)") - assert r1_snmp.test_oid('isisSysLevelType', "level1and2(3)") - assert r1_snmp.test_oid('isisSysID',"00 00 00 00 00 01") - assert r1_snmp.test_oid('isisSysMaxPathSplits',"32") - assert r1_snmp.test_oid('isisSysMaxLSPGenInt',"900 seconds") - assert r1_snmp.test_oid('isisSysAdminState',"on(1)") - assert r1_snmp.test_oid('isisSysMaxAge',"1200 seconds") - assert r1_snmp.test_oid('isisSysProtSupported',"07 5 6 7") + assert r1_snmp.test_oid("isisSysVersion", "one(1)") + assert r1_snmp.test_oid("isisSysLevelType", "level1and2(3)") + assert r1_snmp.test_oid("isisSysID", "00 00 00 00 00 01") + assert r1_snmp.test_oid("isisSysMaxPathSplits", "32") + assert r1_snmp.test_oid("isisSysMaxLSPGenInt", "900 seconds") + assert r1_snmp.test_oid("isisSysAdminState", "on(1)") + assert r1_snmp.test_oid("isisSysMaxAge", "1200 seconds") + assert r1_snmp.test_oid("isisSysProtSupported", "07 5 6 7") r2 = tgen.net.get("r2") r2_snmp = SnmpTester(r2, "2.2.2.2", "public", "2c") - - assert r2_snmp.test_oid('isisSysVersion', "one(1)") - assert r2_snmp.test_oid('isisSysLevelType', "level1and2(3)") - assert r2_snmp.test_oid('isisSysID',"00 00 00 00 00 02") - assert r2_snmp.test_oid('isisSysMaxPathSplits',"32") - assert r2_snmp.test_oid('isisSysMaxLSPGenInt',"900 seconds") - assert r2_snmp.test_oid('isisSysAdminState',"on(1)") - assert r2_snmp.test_oid('isisSysMaxAge',"1200 seconds") - assert r2_snmp.test_oid('isisSysProtSupported',"07 5 6 7") + + assert r2_snmp.test_oid("isisSysVersion", "one(1)") + assert r2_snmp.test_oid("isisSysLevelType", "level1and2(3)") + assert r2_snmp.test_oid("isisSysID", "00 00 00 00 00 02") + assert r2_snmp.test_oid("isisSysMaxPathSplits", "32") + assert r2_snmp.test_oid("isisSysMaxLSPGenInt", "900 seconds") + assert r2_snmp.test_oid("isisSysAdminState", "on(1)") + assert r2_snmp.test_oid("isisSysMaxAge", "1200 seconds") + assert r2_snmp.test_oid("isisSysProtSupported", "07 5 6 7") circtable_test = { @@ -245,7 +252,8 @@ circtable_test = { "isisCircMeshGroupEnabled": ["inactive(1)", "inactive(1)", "inactive(1)"], "isisCircSmallHellos": ["false(2)", "false(2)", "false(2)"], "isisCirc3WayEnabled": ["false(2)", "false(2)", "false(2)"], - } +} + def test_r1_isisCircTable(): tgen = get_topogen() @@ -256,9 +264,9 @@ def test_r1_isisCircTable(): r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c") oids = [] - oids.append(generate_oid(1,1,0)) - oids.append(generate_oid(1,2,0)) - oids.append(generate_oid(1,3,0)) + oids.append(generate_oid(1, 1, 0)) + oids.append(generate_oid(1, 2, 0)) + oids.append(generate_oid(1, 3, 0)) # check items for item in circtable_test.keys(): @@ -267,14 +275,26 @@ def test_r1_isisCircTable(): ) assert r1_snmp.test_oid_walk(item, circtable_test[item], oids), assertmsg + circleveltable_test = { "isisCircLevelMetric": ["10", "10", "10", "10"], "isisCircLevelWideMetric": ["10", "10", "0", "0"], "isisCircLevelISPriority": ["64", "64", "64", "64"], "isisCircLevelHelloMultiplier": ["10", "10", "10", "10"], - "isisCircLevelHelloTimer": ["3000 milliseconds", "3000 milliseconds", "3000 milliseconds", "3000 milliseconds"], - "isisCircLevelMinLSPRetransInt": ["1 seconds", "1 seconds", "0 seconds", "0 seconds"], - } + "isisCircLevelHelloTimer": [ + "3000 milliseconds", + "3000 milliseconds", + "3000 milliseconds", + "3000 milliseconds", + ], + "isisCircLevelMinLSPRetransInt": [ + "1 seconds", + "1 seconds", + "0 seconds", + "0 seconds", + ], +} + def test_r1_isislevelCircTable(): tgen = get_topogen() @@ -285,10 +305,10 @@ def test_r1_isislevelCircTable(): r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c") oids = [] - oids.append(generate_oid(2,1,"area")) - oids.append(generate_oid(2,2,"area")) - oids.append(generate_oid(2,3,"area")) - oids.append(generate_oid(2,3,"domain")) + oids.append(generate_oid(2, 1, "area")) + oids.append(generate_oid(2, 2, "area")) + oids.append(generate_oid(2, 3, "area")) + oids.append(generate_oid(2, 3, "domain")) # check items for item in circleveltable_test.keys(): @@ -316,6 +336,7 @@ adjtable_down_test = { "isisISAdjNeighPriority": ["64"], } + def test_r1_isisAdjTable(): "check ISIS Adjacency Table" tgen = get_topogen() @@ -324,11 +345,11 @@ def test_r1_isisAdjTable(): r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c") oids = [] - oids.append(generate_oid(2,1,1)) - oids.append(generate_oid(2,2,1)) + oids.append(generate_oid(2, 1, 1)) + oids.append(generate_oid(2, 2, 1)) oids_down = [] - oids_down.append(generate_oid(2,1,1)) + oids_down.append(generate_oid(2, 1, 1)) # check items for item in adjtable_test.keys(): @@ -337,7 +358,6 @@ def test_r1_isisAdjTable(): ) assert r1_snmp.test_oid_walk(item, adjtable_test[item], oids), assertmsg - # shutdown interface and one adjacency should be removed "check ISIS adjacency is removed when interface is shutdown" r1_cmd.vtysh_cmd("conf t\ninterface r1-eth1\nshutdown") @@ -347,7 +367,9 @@ def test_r1_isisAdjTable(): assertmsg = "{} should be {} oids {} full dict {}:".format( item, adjtable_down_test[item], oids_down, r1_snmp.walk(item) ) - assert r1_snmp.test_oid_walk(item, adjtable_down_test[item], oids_down), assertmsg + assert r1_snmp.test_oid_walk( + item, adjtable_down_test[item], oids_down + ), assertmsg # no shutdown interface and adjacency should be restored r1_cmd.vtysh_cmd("conf t\ninterface r1-eth1\nno shutdown") diff --git a/tests/topotests/isis-topo1-vrf/test_isis_topo1_vrf.py b/tests/topotests/isis-topo1-vrf/test_isis_topo1_vrf.py index 281e1f6a0..b7fe0c2dd 100644 --- a/tests/topotests/isis-topo1-vrf/test_isis_topo1_vrf.py +++ b/tests/topotests/isis-topo1-vrf/test_isis_topo1_vrf.py @@ -42,8 +42,8 @@ from lib.topogen import Topogen, TopoRouter, get_topogen from lib.topolog import logger from lib.topotest import iproute2_is_vrf_capable from lib.common_config import ( - required_linux_kernel_version, - adjust_router_l3mdev, + required_linux_kernel_version, + adjust_router_l3mdev, ) from mininet.topo import Topo @@ -51,18 +51,19 @@ from mininet.topo import Topo pytestmark = [pytest.mark.isisd] VERTEX_TYPE_LIST = [ - "pseudo_IS", - "pseudo_TE-IS", - "IS", - "TE-IS", - "ES", - "IP internal", - "IP external", - "IP TE", - "IP6 internal", - "IP6 external", - "UNKNOWN" - ] + "pseudo_IS", + "pseudo_TE-IS", + "IS", + "TE-IS", + "ES", + "IP internal", + "IP external", + "IP TE", + "IP6 internal", + "IP6 external", + "UNKNOWN", +] + class ISISTopo1(Topo): "Simple two layer ISIS vrf topology" @@ -330,8 +331,9 @@ def parse_topology(lines, level): ipv = "ipv4" continue - item_match = re.match(r"([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+)" - , line) + item_match = re.match( + r"([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+)", line + ) if ( item_match is not None and item_match.group(1) == "Vertex" @@ -344,8 +346,12 @@ def parse_topology(lines, level): # Skip header continue - item_match = re.match(r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+) ([^\s]+) ([^\s]+)" - .format(vertex_type_regex), line) + item_match = re.match( + r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+) ([^\s]+) ([^\s]+)".format( + vertex_type_regex + ), + line, + ) if item_match is not None: areas[area][level][ipv].append( { @@ -359,8 +365,10 @@ def parse_topology(lines, level): ) continue - item_match = re.match(r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+)" - .format(vertex_type_regex), line) + item_match = re.match( + r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+)".format(vertex_type_regex), + line, + ) if item_match is not None: areas[area][level][ipv].append( diff --git a/tests/topotests/ldp-snmp/test_ldp_snmp_topo1.py b/tests/topotests/ldp-snmp/test_ldp_snmp_topo1.py index 4144f9b26..f47d90615 100644 --- a/tests/topotests/ldp-snmp/test_ldp_snmp_topo1.py +++ b/tests/topotests/ldp-snmp/test_ldp_snmp_topo1.py @@ -141,15 +141,16 @@ def setup_module(mod): TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname)) ) router.load_config( - TopoRouter.RD_LDP, os.path.join(CWD, "{}/ldpd.conf".format(rname)), - "-M snmp" + TopoRouter.RD_LDP, + os.path.join(CWD, "{}/ldpd.conf".format(rname)), + "-M snmp", ) router.load_config( - TopoRouter.RD_SNMP, os.path.join(CWD, "{}/snmpd.conf".format(rname)), - "-Le -Ivacm_conf,usmConf,iquery -V -DAgentX,trap" + TopoRouter.RD_SNMP, + os.path.join(CWD, "{}/snmpd.conf".format(rname)), + "-Le -Ivacm_conf,usmConf,iquery -V -DAgentX,trap", ) - tgen.start_router() @@ -199,7 +200,7 @@ def test_rib(): # Skip if previous fatal error condition is raised # TODO: disabling this check to avoid 'snmpd not running' errors - #if tgen.routers_have_failure(): + # if tgen.routers_have_failure(): # pytest.skip(tgen.errors) for rname in ["r1", "r2", "r3"]: @@ -212,7 +213,7 @@ def test_ldp_adjacencies(): # Skip if previous fatal error condition is raised # TODO: disabling this check to avoid 'snmpd not running' errors - #if tgen.routers_have_failure(): + # if tgen.routers_have_failure(): # pytest.skip(tgen.errors) for rname in ["r1", "r2", "r3"]: @@ -226,7 +227,7 @@ def test_ldp_neighbors(): tgen = get_topogen() # Skip if previous fatal error condition is raised - #if tgen.routers_have_failure(): + # if tgen.routers_have_failure(): # pytest.skip(tgen.errors) for rname in ["r1", "r2", "r3"]: @@ -242,8 +243,8 @@ def test_r1_ldp_lsr_objects(): r1 = tgen.net.get("r1") r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c") - assert r1_snmp.test_oid('mplsLdpLsrId', "01 01 01 01") - assert r1_snmp.test_oid('mplsLdpLsrLoopDetectionCapable', 'none(1)') + assert r1_snmp.test_oid("mplsLdpLsrId", "01 01 01 01") + assert r1_snmp.test_oid("mplsLdpLsrLoopDetectionCapable", "none(1)") def test_r1_ldp_entity_table(): @@ -253,52 +254,31 @@ def test_r1_ldp_entity_table(): r1 = tgen.net.get("r1") r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c") - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityLdpId', ['1.1.1.1:0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityIndex', ['1']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityProtocolVersion', ['1']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityAdminStatus', ['enable(1)']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityOperStatus', ['enabled(2)']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityTcpPort', ['646']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityUdpDscPort', ['646']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityMaxPduLength', ['4096 octets']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityKeepAliveHoldTimer', ['180 seconds']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityHelloHoldTimer', ['0 seconds']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityInitSessionThreshold', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityLabelDistMethod', ['downstreamUnsolicited(2)']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityLabelRetentionMode', ['liberal(2)']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityPathVectorLimit', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityHopCountLimit', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityTransportAddrKind', ['loopback(2)']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityTargetPeer', ['true(1)']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityTargetPeerAddrType', ['ipv4(1)']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityTargetPeerAddr', ['01 01 01 01']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityLabelType', ['generic(1)']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityDiscontinuityTime', ['(0) 0:00:00.00']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStorageType', ['nonVolatile(3)']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityRowStatus', ['createAndGo(4)']) + assert r1_snmp.test_oid_walk("mplsLdpEntityLdpId", ["1.1.1.1:0"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityIndex", ["1"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityProtocolVersion", ["1"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityAdminStatus", ["enable(1)"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityOperStatus", ["enabled(2)"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityTcpPort", ["646"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityUdpDscPort", ["646"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityMaxPduLength", ["4096 octets"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityKeepAliveHoldTimer", ["180 seconds"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityHelloHoldTimer", ["0 seconds"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityInitSessionThreshold", ["0"]) + assert r1_snmp.test_oid_walk( + "mplsLdpEntityLabelDistMethod", ["downstreamUnsolicited(2)"] + ) + assert r1_snmp.test_oid_walk("mplsLdpEntityLabelRetentionMode", ["liberal(2)"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityPathVectorLimit", ["0"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityHopCountLimit", ["0"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityTransportAddrKind", ["loopback(2)"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityTargetPeer", ["true(1)"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityTargetPeerAddrType", ["ipv4(1)"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityTargetPeerAddr", ["01 01 01 01"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityLabelType", ["generic(1)"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityDiscontinuityTime", ["(0) 0:00:00.00"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityStorageType", ["nonVolatile(3)"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityRowStatus", ["createAndGo(4)"]) def test_r1_ldp_entity_stats_table(): @@ -308,32 +288,23 @@ def test_r1_ldp_entity_stats_table(): r1 = tgen.net.get("r1") r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c") + assert r1_snmp.test_oid_walk("mplsLdpEntityStatsSessionAttempts", ["0"]) assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStatsSessionAttempts', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStatsSessionRejectedNoHelloErrors', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStatsSessionRejectedAdErrors', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStatsSessionRejectedMaxPduErrors', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStatsSessionRejectedLRErrors', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStatsBadLdpIdentifierErrors', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStatsBadPduLengthErrors', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStatsBadMessageLengthErrors', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStatsBadTlvLengthErrors', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStatsMalformedTlvValueErrors', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStatsKeepAliveTimerExpErrors', ['0']) + "mplsLdpEntityStatsSessionRejectedNoHelloErrors", ["0"] + ) + assert r1_snmp.test_oid_walk("mplsLdpEntityStatsSessionRejectedAdErrors", ["0"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityStatsSessionRejectedMaxPduErrors", ["0"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityStatsSessionRejectedLRErrors", ["0"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityStatsBadLdpIdentifierErrors", ["0"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityStatsBadPduLengthErrors", ["0"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityStatsBadMessageLengthErrors", ["0"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityStatsBadTlvLengthErrors", ["0"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityStatsMalformedTlvValueErrors", ["0"]) + assert r1_snmp.test_oid_walk("mplsLdpEntityStatsKeepAliveTimerExpErrors", ["0"]) assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStatsShutdownReceivedNotifications', ['0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpEntityStatsShutdownSentNotifications', ['0']) + "mplsLdpEntityStatsShutdownReceivedNotifications", ["0"] + ) + assert r1_snmp.test_oid_walk("mplsLdpEntityStatsShutdownSentNotifications", ["0"]) def test_r1_ldp_peer_table(): @@ -343,17 +314,16 @@ def test_r1_ldp_peer_table(): r1 = tgen.net.get("r1") r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c") + assert r1_snmp.test_oid_walk("mplsLdpPeerLdpId", ["2.2.2.2:0", "3.3.3.3:0"]) assert r1_snmp.test_oid_walk( - 'mplsLdpPeerLdpId', ['2.2.2.2:0', '3.3.3.3:0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpPeerLabelDistMethod', - ['downstreamUnsolicited(2)', 'downstreamUnsolicited(2)']) - assert r1_snmp.test_oid_walk( - 'mplsLdpPeerPathVectorLimit', ['0', '0']) + "mplsLdpPeerLabelDistMethod", + ["downstreamUnsolicited(2)", "downstreamUnsolicited(2)"], + ) + assert r1_snmp.test_oid_walk("mplsLdpPeerPathVectorLimit", ["0", "0"]) + assert r1_snmp.test_oid_walk("mplsLdpPeerTransportAddrType", ["ipv4(1)", "ipv4(1)"]) assert r1_snmp.test_oid_walk( - 'mplsLdpPeerTransportAddrType', ['ipv4(1)', 'ipv4(1)']) - assert r1_snmp.test_oid_walk( - 'mplsLdpPeerTransportAddr', ['02 02 02 02', '03 03 03 03']) + "mplsLdpPeerTransportAddr", ["02 02 02 02", "03 03 03 03"] + ) def test_r1_ldp_session_table(): @@ -363,18 +333,20 @@ def test_r1_ldp_session_table(): r1 = tgen.net.get("r1") r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c") - assert r1_snmp.test_oid_walk('mplsLdpSessionState', - ['operational(5)', 'operational(5)']) - assert r1_snmp.test_oid_walk('mplsLdpSessionRole', - ['passive(3)', 'passive(3)']) - assert r1_snmp.test_oid_walk('mplsLdpSessionProtocolVersion', - ['1', '1']) - assert r1_snmp.test_oid_walk('mplsLdpSessionKeepAliveTime', - ['180 seconds', '180 seconds']) - assert r1_snmp.test_oid_walk('mplsLdpSessionMaxPduLength', - ['4096 octets', '4096 octets']) - assert r1_snmp.test_oid_walk('mplsLdpSessionDiscontinuityTime', - ['(0) 0:00:00.00', '(0) 0:00:00.00']) + assert r1_snmp.test_oid_walk( + "mplsLdpSessionState", ["operational(5)", "operational(5)"] + ) + assert r1_snmp.test_oid_walk("mplsLdpSessionRole", ["passive(3)", "passive(3)"]) + assert r1_snmp.test_oid_walk("mplsLdpSessionProtocolVersion", ["1", "1"]) + assert r1_snmp.test_oid_walk( + "mplsLdpSessionKeepAliveTime", ["180 seconds", "180 seconds"] + ) + assert r1_snmp.test_oid_walk( + "mplsLdpSessionMaxPduLength", ["4096 octets", "4096 octets"] + ) + assert r1_snmp.test_oid_walk( + "mplsLdpSessionDiscontinuityTime", ["(0) 0:00:00.00", "(0) 0:00:00.00"] + ) def test_r1_ldp_session_stats_table(): @@ -384,10 +356,8 @@ def test_r1_ldp_session_stats_table(): r1 = tgen.net.get("r1") r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c") - assert r1_snmp.test_oid_walk( - 'mplsLdpSessionStatsUnknownMesTypeErrors', ['0', '0']) - assert r1_snmp.test_oid_walk( - 'mplsLdpSessionStatsUnknownTlvErrors', ['0', '0']) + assert r1_snmp.test_oid_walk("mplsLdpSessionStatsUnknownMesTypeErrors", ["0", "0"]) + assert r1_snmp.test_oid_walk("mplsLdpSessionStatsUnknownTlvErrors", ["0", "0"]) def test_r1_ldp_hello_adjacency_table(): @@ -397,12 +367,11 @@ def test_r1_ldp_hello_adjacency_table(): r1 = tgen.net.get("r1") r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c") - assert r1_snmp.test_oid_walk('mplsLdpHelloAdjacencyIndex', - ['1', '2', '1']) - assert r1_snmp.test_oid_walk('mplsLdpHelloAdjacencyHoldTime', - ['15', '45', '15']) - assert r1_snmp.test_oid_walk('mplsLdpHelloAdjacencyType', - ['link(1)', 'targeted(2)', 'link(1)']) + assert r1_snmp.test_oid_walk("mplsLdpHelloAdjacencyIndex", ["1", "2", "1"]) + assert r1_snmp.test_oid_walk("mplsLdpHelloAdjacencyHoldTime", ["15", "45", "15"]) + assert r1_snmp.test_oid_walk( + "mplsLdpHelloAdjacencyType", ["link(1)", "targeted(2)", "link(1)"] + ) # Memory leak test template diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index 83ee86663..d2212d180 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -43,7 +43,7 @@ from lib.common_config import ( run_frr_cmd, FRRCFG_FILE, retry, - get_ipv6_linklocal_address + get_ipv6_linklocal_address, ) LOGDIR = "/tmp/topotests/" diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index 55266594e..ead593d2c 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -75,54 +75,54 @@ config_section = "topogen" # Debug logs for daemons DEBUG_LOGS = { "pimd": [ - 'debug msdp events', - 'debug msdp packets', - 'debug igmp events', - 'debug igmp trace', - 'debug mroute', - 'debug mroute detail', - 'debug pim events', - 'debug pim packets', - 'debug pim trace', - 'debug pim zebra', - 'debug pim bsm', - 'debug pim packets joins', - 'debug pim packets register', - 'debug pim nht' + "debug msdp events", + "debug msdp packets", + "debug igmp events", + "debug igmp trace", + "debug mroute", + "debug mroute detail", + "debug pim events", + "debug pim packets", + "debug pim trace", + "debug pim zebra", + "debug pim bsm", + "debug pim packets joins", + "debug pim packets register", + "debug pim nht", ], "bgpd": [ - 'debug bgp neighbor-events', - 'debug bgp updates', - 'debug bgp zebra', - 'debug bgp nht', - 'debug bgp neighbor-events', - 'debug bgp graceful-restart', - 'debug bgp update-groups', - 'debug bgp vpn leak-from-vrf', - 'debug bgp vpn leak-to-vrf', - 'debug bgp zebr', - 'debug bgp updates', - 'debug bgp nht', - 'debug bgp neighbor-events', - 'debug vrf' + "debug bgp neighbor-events", + "debug bgp updates", + "debug bgp zebra", + "debug bgp nht", + "debug bgp neighbor-events", + "debug bgp graceful-restart", + "debug bgp update-groups", + "debug bgp vpn leak-from-vrf", + "debug bgp vpn leak-to-vrf", + "debug bgp zebr", + "debug bgp updates", + "debug bgp nht", + "debug bgp neighbor-events", + "debug vrf", ], "zebra": [ - 'debug zebra events', - 'debug zebra rib', - 'debug zebra vxlan', - 'debug zebra nht' + "debug zebra events", + "debug zebra rib", + "debug zebra vxlan", + "debug zebra nht", ], "ospf": [ - 'debug ospf event', - 'debug ospf ism', - 'debug ospf lsa', - 'debug ospf nsm', - 'debug ospf nssa', - 'debug ospf packet all', - 'debug ospf sr', - 'debug ospf te', - 'debug ospf zebra', - ] + "debug ospf event", + "debug ospf ism", + "debug ospf lsa", + "debug ospf nsm", + "debug ospf nssa", + "debug ospf packet all", + "debug ospf sr", + "debug ospf te", + "debug ospf zebra", + ], } if config.has_option("topogen", "verbosity"): @@ -1155,10 +1155,8 @@ def create_debug_log_config(tgen, input_dict, build=False): log_file = debug_dict.setdefault("log_file", None) if log_file: - _log_file = os.path.join(LOGDIR, tgen.modname, - log_file) - debug_config.append("log file {} \n".\ - format(_log_file)) + _log_file = os.path.join(LOGDIR, tgen.modname, log_file) + debug_config.append("log file {} \n".format(_log_file)) if type(enable_logs) is list: for daemon in enable_logs: @@ -1178,10 +1176,9 @@ def create_debug_log_config(tgen, input_dict, build=False): for debug_log in debug_logs: debug_config.append("no {}".format(debug_log)) - result = create_common_configuration(tgen, router, - debug_config, - "debug_log_config", - build=build) + result = create_common_configuration( + tgen, router, debug_config, "debug_log_config", build=build + ) except InvalidCLIError: # Traceback errormsg = traceback.format_exc() @@ -3868,7 +3865,7 @@ def get_ipv6_linklocal_address(topo, node, intf): """ tgen = get_topogen() ext_nh = tgen.net[node].get_ipv6_linklocal() - req_nh = topo[node]['links'][intf]['interface'] + req_nh = topo[node]["links"][intf]["interface"] llip = None for llips in ext_nh: if llips[0] == req_nh: @@ -3876,8 +3873,9 @@ def get_ipv6_linklocal_address(topo, node, intf): logger.info("Link local ip found = %s", llip) return llip - errormsg = "Failed: Link local ip not found on router {}, "\ - "interface {}".format(node, intf) + errormsg = "Failed: Link local ip not found on router {}, " "interface {}".format( + node, intf + ) return errormsg @@ -4553,9 +4551,7 @@ def adjust_router_l3mdev(tgen, router): ) output = tgen.net[router].cmd("sysctl -n net.ipv4.tcp_l3mdev_accept") - logger.info( - "router {0}: existing tcp_l3mdev_accept was {1}".format(router, output) - ) + logger.info("router {0}: existing tcp_l3mdev_accept was {1}".format(router, output)) tgen.net[router].cmd( "sysctl -w net.ipv4.tcp_l3mdev_accept={}".format(l3mdev_accept) diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py index 9f642411b..04a12d0ee 100644 --- a/tests/topotests/lib/ospf.py +++ b/tests/topotests/lib/ospf.py @@ -344,9 +344,7 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config= for lnk in input_dict[router]["links"].keys(): if "ospf" not in input_dict[router]["links"][lnk]: logger.debug( - "Router %s: ospf config is not present in" - "input_dict", - router + "Router %s: ospf config is not present in" "input_dict", router ) continue ospf_data = input_dict[router]["links"][lnk]["ospf"] diff --git a/tests/topotests/lib/topotest.py b/tests/topotests/lib/topotest.py index aaa2c70ad..2a4611585 100644 --- a/tests/topotests/lib/topotest.py +++ b/tests/topotests/lib/topotest.py @@ -54,6 +54,7 @@ from mininet.term import makeTerm g_extra_config = {} + def gdb_core(obj, daemon, corefiles): gdbcmds = """ info threads @@ -544,7 +545,7 @@ def iproute2_is_vrf_capable(): ["ip", "route", "show", "vrf"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, - stdin=subprocess.PIPE + stdin=subprocess.PIPE, ) iproute2_err = subp.communicate()[1].splitlines()[0].split()[0] @@ -1345,15 +1346,9 @@ class Router(Node): # Run a command in a new window (gnome-terminal, screen, tmux, xterm) def runInWindow(self, cmd, title=None): topo_terminal = os.getenv("FRR_TOPO_TERMINAL") - if topo_terminal or ( - "TMUX" not in os.environ and "STY" not in os.environ - ): + if topo_terminal or ("TMUX" not in os.environ and "STY" not in os.environ): term = topo_terminal if topo_terminal else "xterm" - makeTerm( - self, - title=title if title else cmd, - term=term, - cmd=cmd) + makeTerm(self, title=title if title else cmd, term=term, cmd=cmd) else: nscmd = "sudo nsenter -m -n -t {} {}".format(self.pid, cmd) if "TMUX" in os.environ: @@ -1362,9 +1357,7 @@ class Router(Node): cmd = "{} {}".format(wcmd, nscmd) elif "STY" in os.environ: if os.path.exists( - "/run/screen/S-{}/{}".format( - os.environ['USER'], os.environ['STY'] - ) + "/run/screen/S-{}/{}".format(os.environ["USER"], os.environ["STY"]) ): wcmd = "screen" else: @@ -1372,7 +1365,6 @@ class Router(Node): cmd = "{} {}".format(wcmd, nscmd) self.cmd(cmd) - def startRouter(self, tgen=None): # Disable integrated-vtysh-config self.cmd( @@ -1459,7 +1451,7 @@ class Router(Node): def startRouterDaemons(self, daemons=None): "Starts all FRR daemons for this router." - gdb_breakpoints = g_extra_config["gdb_breakpoints"] + gdb_breakpoints = g_extra_config["gdb_breakpoints"] gdb_daemons = g_extra_config["gdb_daemons"] gdb_routers = g_extra_config["gdb_routers"] @@ -1520,12 +1512,10 @@ class Router(Node): if ( (gdb_routers or gdb_daemons) - and (not gdb_routers - or self.name in gdb_routers - or "all" in gdb_routers) - and (not gdb_daemons - or daemon in gdb_daemons - or "all" in gdb_daemons) + and ( + not gdb_routers or self.name in gdb_routers or "all" in gdb_routers + ) + and (not gdb_daemons or daemon in gdb_daemons or "all" in gdb_daemons) ): if daemon == "snmpd": cmdopt += " -f " @@ -1546,7 +1536,6 @@ class Router(Node): self.cmd(" ".join([cmdenv, binary, cmdopt])) logger.info("{}: {} {} started".format(self, self.routertype, daemon)) - # Start Zebra first if "zebra" in daemons_list: start_daemon("zebra", "-s 90000000") diff --git a/tests/topotests/multicast-pim-bsm-topo1/test_mcast_pim_bsmp_01.py b/tests/topotests/multicast-pim-bsm-topo1/test_mcast_pim_bsmp_01.py index 6b7180978..cd398a511 100644 --- a/tests/topotests/multicast-pim-bsm-topo1/test_mcast_pim_bsmp_01.py +++ b/tests/topotests/multicast-pim-bsm-topo1/test_mcast_pim_bsmp_01.py @@ -648,7 +648,11 @@ def test_BSR_CRP_with_blackhole_address_p1(request): input_dict = { "i1": {"static_routes": [{"network": BSR1_ADDR, "next_hop": next_hop_rp}]}, "l1": {"static_routes": [{"network": BSR1_ADDR, "next_hop": next_hop_lhr}]}, - "f1": {"static_routes": [{"network": CRP, "next_hop": next_hop_fhr, "delete": True}]}, + "f1": { + "static_routes": [ + {"network": CRP, "next_hop": next_hop_fhr, "delete": True} + ] + }, } result = create_static_routes(tgen, input_dict) @@ -692,10 +696,11 @@ def test_BSR_CRP_with_blackhole_address_p1(request): step("Verify if b1 chosen as BSR in l1") result = verify_pim_bsr(tgen, topo, "l1", BSR_IP_1, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "b1 is not chosen as BSR in l1 \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "b1 is not chosen as BSR in l1 \n Error: {}".format( tc_name, result - )) + ) state_after = verify_pim_interface_traffic(tgen, state_dict) assert isinstance( @@ -841,10 +846,12 @@ def test_new_router_fwd_p0(request): # Verify bsr state in l1 step("Verify no BSR in l1 as i1 would not forward the no-forward bsm") result = verify_pim_bsr(tgen, topo, "l1", bsr_ip, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "BSR data is present after no-forward bsm also \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) # unconfigure unicast bsm on f1-i1-eth2 step("unconfigure unicast bsm on f1-i1-eth2, will forward with only mcast") @@ -966,10 +973,11 @@ def test_int_bsm_config_p1(request): result = verify_ip_mroutes( tgen, "i1", src_addr, GROUP_ADDRESS, iif, oil, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "Mroutes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "Mroutes are still present \n Error: {}".format( tc_name, result - )) + ) # unconfigure bsm processing on f1 on f1-i1-eth2 step("unconfigure bsm processing on f1 in f1-i1-eth2, will drop bsm") @@ -989,20 +997,21 @@ def test_int_bsm_config_p1(request): # Verify bsr state in i1 step("Verify if b1 is not chosen as BSR in i1") result = verify_pim_bsr(tgen, topo, "i1", bsr_ip, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "b1 is chosen as BSR in i1 \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "b1 is chosen as BSR in i1 \n Error: {}".format( tc_name, result - )) + ) # check if mroute still not installed because of rp not available step("check if mroute still not installed because of rp not available") result = verify_ip_mroutes( tgen, "i1", src_addr, GROUP_ADDRESS, iif, oil, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "mroute installed but rp not available \n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "mroute installed but rp not available \n Error: {}".format(tc_name, result) + ) # configure bsm processing on i1 on f1-i1-eth2 step("configure bsm processing on f1 in f1-i1-eth2, will accept bsm") @@ -1464,10 +1473,11 @@ def test_BSM_timeout_p0(request): tgen, topo, "f1", group, rp_source="BSR", expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "bsr has not aged out in f1 \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "bsr has not aged out in f1 \n Error: {}".format( tc_name, result - )) + ) # Verify RP mapping removed after hold timer expires group = "225.1.1.1/32" @@ -1491,20 +1501,23 @@ def test_BSM_timeout_p0(request): result = verify_join_state_and_timer( tgen, dut, iif, src_addr, GROUP_ADDRESS, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "join state is up and join timer is running in l1 \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) # Verify ip mroute is not installed step("Verify mroute not installed in l1") result = verify_ip_mroutes( tgen, dut, src_addr, GROUP_ADDRESS, iif, oil, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "mroute installed in l1 \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "mroute installed in l1 \n Error: {}".format( tc_name, result - )) + ) step("clear BSM database before moving to next case") clear_bsrp_data(tgen, topo) @@ -1657,10 +1670,11 @@ def test_iif_join_state_p0(request): result = verify_ip_mroutes( tgen, dut, src_addr, GROUP_ADDRESS, iif, oil, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "mroute installed in l1 \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "mroute installed in l1 \n Error: {}".format( tc_name, result - )) + ) # Add back route for RP to make it reachable step("Add back route for RP to make it reachable") diff --git a/tests/topotests/multicast-pim-bsm-topo2/test_mcast_pim_bsmp_02.py b/tests/topotests/multicast-pim-bsm-topo2/test_mcast_pim_bsmp_02.py index 1a80c0d46..199746d5f 100644 --- a/tests/topotests/multicast-pim-bsm-topo2/test_mcast_pim_bsmp_02.py +++ b/tests/topotests/multicast-pim-bsm-topo2/test_mcast_pim_bsmp_02.py @@ -458,10 +458,11 @@ def test_starg_mroute_p0(request): result = verify_ip_mroutes( tgen, dut, src_addr, GROUP_ADDRESS, iif, oil, wait=20, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "mroute installed in l1 \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "mroute installed in l1 \n Error: {}".format( tc_name, result - )) + ) # Send BSM again to configure rp step("Add back RP by sending BSM from b1") @@ -811,10 +812,11 @@ def test_BSR_election_p0(request): # Verify bsr state in FHR step("Verify if b2 is not chosen as bsr in f1") result = verify_pim_bsr(tgen, topo, "f1", bsr_ip2, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "b2 is chosen as bsr in f1 \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "b2 is chosen as bsr in f1 \n Error: {}".format( tc_name, result - )) + ) # Verify if b1 is still chosen as bsr step("Verify if b1 is still chosen as bsr in f1") diff --git a/tests/topotests/multicast-pim-sm-topo3/test_multicast_pim_sm_topo3.py b/tests/topotests/multicast-pim-sm-topo3/test_multicast_pim_sm_topo3.py index 1c2265454..33f476de4 100755 --- a/tests/topotests/multicast-pim-sm-topo3/test_multicast_pim_sm_topo3.py +++ b/tests/topotests/multicast-pim-sm-topo3/test_multicast_pim_sm_topo3.py @@ -3487,11 +3487,11 @@ def test_prune_sent_to_LHR_and_FHR_when_PIMnbr_down_p2(request): IGMP_JOIN_RANGE_1, expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "upstream is still present after shut the link from " - "FHR to RP from RP node \n Error: {}".format( - tc_name, result - )) + "FHR to RP from RP node \n Error: {}".format(tc_name, result) + ) step(" No shut the link from FHR to RP from RP node") @@ -3638,11 +3638,11 @@ def test_prune_sent_to_LHR_and_FHR_when_PIMnbr_down_p2(request): IGMP_JOIN_RANGE_1, expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "upstream is still present after shut the link from " - "FHR to RP from FHR node \n Error: {}".format( - tc_name, result - )) + "FHR to RP from FHR node \n Error: {}".format(tc_name, result) + ) step(" No shut the link from FHR to RP from FHR node") diff --git a/tests/topotests/multicast-pim-sm-topo3/test_multicast_pim_sm_topo4.py b/tests/topotests/multicast-pim-sm-topo3/test_multicast_pim_sm_topo4.py index 68b7849c2..1081b764a 100755 --- a/tests/topotests/multicast-pim-sm-topo3/test_multicast_pim_sm_topo4.py +++ b/tests/topotests/multicast-pim-sm-topo3/test_multicast_pim_sm_topo4.py @@ -490,10 +490,12 @@ def test_mroute_when_RP_reachable_default_route_p2(request): data["oil"], expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "mroutes(S,G) are present after delete of static routes on c1 \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) result = verify_upstream_iif( tgen, @@ -503,10 +505,12 @@ def test_mroute_when_RP_reachable_default_route_p2(request): IGMP_JOIN_RANGE_1, expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "upstream is present after delete of static routes on c1 \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) for data in input_dict_starg: result = verify_ip_mroutes( @@ -518,10 +522,12 @@ def test_mroute_when_RP_reachable_default_route_p2(request): data["oil"], expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "mroutes(*,G) are present after delete of static routes on c1 \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) result = verify_upstream_iif( tgen, @@ -531,10 +537,12 @@ def test_mroute_when_RP_reachable_default_route_p2(request): IGMP_JOIN_RANGE_1, expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "upstream is present after delete of static routes on c1 \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("Configure default routes on c2") @@ -557,10 +565,12 @@ def test_mroute_when_RP_reachable_default_route_p2(request): result = verify_pim_rp_info( tgen, topo, dut, GROUP_RANGE_1, "Unknown", rp_address, SOURCE, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "RP info is unknown after removing static route from c2 \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("Verify (s,g) populated after adding default route ") @@ -787,10 +797,11 @@ def test_mroute_with_RP_default_route_all_nodes_p2(request): data["oil"], expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " - "mroutes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "mroutes are still present \n Error: {}".format( tc_name, result - )) + ) result = verify_upstream_iif( tgen, @@ -800,10 +811,11 @@ def test_mroute_with_RP_default_route_all_nodes_p2(request): IGMP_JOIN_RANGE_1, expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " - "upstream is still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "upstream is still present \n Error: {}".format( tc_name, result - )) + ) step("Configure default routes on all the nodes") @@ -840,10 +852,12 @@ def test_mroute_with_RP_default_route_all_nodes_p2(request): result = verify_pim_rp_info( tgen, topo, dut, GROUP_RANGE_1, "Unknown", rp_address, SOURCE, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "RP info is unknown after removing static route from c2 \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("Verify (s,g) populated after adding default route ") diff --git a/tests/topotests/multicast-pim-static-rp-topo1/test_multicast_pim_static_rp.py b/tests/topotests/multicast-pim-static-rp-topo1/test_multicast_pim_static_rp.py index 1317ec67b..e90230eb3 100755 --- a/tests/topotests/multicast-pim-static-rp-topo1/test_multicast_pim_static_rp.py +++ b/tests/topotests/multicast-pim-static-rp-topo1/test_multicast_pim_static_rp.py @@ -423,10 +423,12 @@ def test_add_delete_static_RP_p0(request): dut = "r1" interface = "r1-r0-eth0" result = verify_igmp_groups(tgen, dut, interface, GROUP_ADDRESS, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: igmp group present without any IGMP join \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r1: Verify show ip pim interface traffic without any IGMP join") state_dict = {"r1": {"r1-r2-eth1": ["pruneTx"]}} @@ -492,26 +494,29 @@ def test_add_delete_static_RP_p0(request): result = verify_pim_rp_info( tgen, topo, dut, GROUP_RANGE_ALL, iif, rp_address, SOURCE, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: RP info present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: RP info present \n Error: {}".format( tc_name, result - )) + ) step("r1: Verify upstream IIF interface") result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: upstream IIF interface present \n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: upstream IIF interface present \n Error: {}".format(tc_name, result) + ) step("r1: Verify upstream join state and join timer") result = verify_join_state_and_timer( tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: upstream join state is up and join timer is running \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r1: Verify PIM state") result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS, expected=False) @@ -521,10 +526,11 @@ def test_add_delete_static_RP_p0(request): step("r1: Verify ip mroutes") result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: mroutes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: mroutes are still present \n Error: {}".format( tc_name, result - )) + ) step("r1: Verify show ip pim interface traffic without any IGMP join") state_after = verify_pim_interface_traffic(tgen, state_dict) @@ -681,10 +687,12 @@ def test_SPT_RPT_path_same_p1(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: (S, G) upstream join state is up and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r3: Verify (S, G) ip mroutes") oif = "r3-r2-eth1" @@ -811,17 +819,19 @@ def test_not_reachable_static_RP_p0(request): "using show ip pim state" ) result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "OIL is not same and IIF is not cleared on R1 \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r1: upstream IIF should be unknown , verify using show ip pim" "upstream") result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: upstream IIF is not unknown \n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: upstream IIF is not unknown \n Error: {}".format(tc_name, result) + ) step( "r1: join state should not be joined and join timer should stop," @@ -830,10 +840,12 @@ def test_not_reachable_static_RP_p0(request): result = verify_join_state_and_timer( tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: join state is joined and timer is not stopped \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step( "r1: (*,G) prune is sent towards the RP interface, verify using" @@ -850,10 +862,12 @@ def test_not_reachable_static_RP_p0(request): step("r1: (*, G) cleared from mroute table using show ip mroute") result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: (*, G) are not cleared from mroute table \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) logger.info("Expected behavior: {}".format(result)) # Uncomment next line for debugging @@ -920,10 +934,11 @@ def test_add_RP_after_join_received_p1(request): result = verify_pim_rp_info( tgen, topo, dut, GROUP_RANGE_ALL, iif, rp_address, SOURCE, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: rp-info is present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: rp-info is present \n Error: {}".format( tc_name, result - )) + ) step("joinTx value before join sent") state_dict = {"r1": {"r1-r2-eth1": ["joinTx"]}} @@ -944,34 +959,38 @@ def test_add_RP_after_join_received_p1(request): step("r1: Verify upstream IIF interface") result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: upstream IFF interface is present \n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: upstream IFF interface is present \n Error: {}".format(tc_name, result) + ) step("r1: Verify upstream join state and join timer") result = verify_join_state_and_timer( tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: upstream join state is joined and timer is running \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r1: Verify PIM state") result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: PIM state is up\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: PIM state is up\n Error: {}".format( tc_name, result - )) + ) step("r1: Verify ip mroutes") result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: mroutes are still present\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: mroutes are still present\n Error: {}".format( tc_name, result - )) + ) step("r1: Configure static RP") input_dict = { @@ -1095,33 +1114,37 @@ def test_reachable_static_RP_after_join_p0(request): step("r1 : Verify upstream IIF interface") iif = "r1-r2-eth1" result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: upstream IIF interface is present\n Error: {}".format( - tc_name, result - )) + assert result is not True, ( + "Testcase {} : Failed \n " + "r1: upstream IIF interface is present\n Error: {}".format(tc_name, result) + ) step("r1 : Verify upstream join state and join timer") result = verify_join_state_and_timer( tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: upstream join state is joined and timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r1 : Verify PIM state") result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: PIM state is up\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: PIM state is up\n Error: {}".format( tc_name, result - )) + ) step("r1 : Verify ip mroutes") result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: mroutes are still present\n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: mroutes are still present\n Error: {}".format( tc_name, result - )) + ) step("r1: Make RP reachable") intf = "r1-r2-eth1" @@ -1362,10 +1385,12 @@ def test_send_join_on_higher_preffered_rp_p1(request): result = verify_pim_rp_info( tgen, topo, dut, GROUP_RANGE, oif, rp_address_2, SOURCE, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: rp-info is present for group 225.1.1.1 \n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step( "r1 : Verify RPF interface updated in mroute when higher preferred" @@ -1619,11 +1644,11 @@ def test_RP_configured_as_LHR_1_p1(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: (S, G) upstream join state is joined and join" - " timer is running \n Error: {}".format( - tc_name, result - )) + " timer is running \n Error: {}".format(tc_name, result) + ) step("r3: Verify (S, G) ip mroutes") oif = "r3-r1-eth0" @@ -1828,10 +1853,12 @@ def test_RP_configured_as_LHR_2_p1(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r3: Verify (S, G) ip mroutes") oif = "r3-r1-eth0" @@ -2036,10 +2063,12 @@ def test_RP_configured_as_FHR_1_p1(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r3: Verify (S, G) ip mroutes") oif = "r3-r1-eth0" @@ -2245,10 +2274,12 @@ def test_RP_configured_as_FHR_2_p2(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r3: Verify (S, G) ip mroutes") oif = "r3-r1-eth0" @@ -2372,10 +2403,12 @@ def test_SPT_RPT_path_different_p1(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r3: Verify (S, G) ip mroutes") oif = "r3-r1-eth0" @@ -2394,10 +2427,12 @@ def test_SPT_RPT_path_different_p1(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r2: Verify (S, G) ip mroutes") oif = "none" @@ -2623,10 +2658,12 @@ def test_restart_pimd_process_p2(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r3: Verify (S, G) ip mroutes") oif = "r3-r1-eth0" @@ -2791,10 +2828,12 @@ def test_multiple_groups_same_RP_address_p2(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r3: Verify (S, G) ip mroutes") oif = "r3-r1-eth0" @@ -2813,10 +2852,12 @@ def test_multiple_groups_same_RP_address_p2(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r2: Verify (S, G) ip mroutes") oif = "none" @@ -2932,10 +2973,12 @@ def test_multiple_groups_same_RP_address_p2(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r2: Verify (S, G) ip mroutes") oif = "none" @@ -2952,10 +2995,12 @@ def test_multiple_groups_same_RP_address_p2(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r3: Verify (S, G) ip mroutes") oif = "r3-r1-eth0" @@ -3132,10 +3177,12 @@ def test_multiple_groups_different_RP_address_p2(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r2: Verify (S, G) ip mroutes") oif = "none" @@ -3154,10 +3201,12 @@ def test_multiple_groups_different_RP_address_p2(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r3: Verify (S, G) ip mroutes") oif = "r3-r1-eth0" @@ -3224,10 +3273,12 @@ def test_multiple_groups_different_RP_address_p2(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r4: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r4: Verify (S, G) ip mroutes") oif = "none" @@ -3399,10 +3450,12 @@ def test_multiple_groups_different_RP_address_p2(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r2: Verify (S, G) ip mroutes") oif = "none" @@ -3421,10 +3474,12 @@ def test_multiple_groups_different_RP_address_p2(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r3: Verify (S, G) ip mroutes") oif = "r3-r1-eth0" @@ -3491,10 +3546,12 @@ def test_multiple_groups_different_RP_address_p2(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r4: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r4: Verify (S, G) ip mroutes") oif = "none" @@ -3513,10 +3570,12 @@ def test_multiple_groups_different_RP_address_p2(request): result = verify_join_state_and_timer( tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r3: Verify (S, G) ip mroutes") oif = "r3-r1-eth0" @@ -3643,30 +3702,36 @@ def test_shutdown_primary_path_p1(request): iif = "r1-r3-eth2" oif = "r1-r0-eth0" result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: (*,G) mroutes are not cleared after shut of R1 to R3 link\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r2: Verify (*, G) ip mroutes") dut = "r2" iif = "lo" oif = "r2-r3-eth1" result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r2: (*,G) mroutes are not cleared after shut of R1 to R3 link\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r3: Verify (*, G) ip mroutes") dut = "r3" iif = "r3-r2-eth1" oif = "r3-r1-eth0" result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r3: (*,G) mroutes are not cleared after shut of R1 to R3 link\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r3: No shutdown the link from R1 to R3 from R3 node") dut = "r3" @@ -3826,20 +3891,24 @@ def test_delete_RP_shut_noshut_upstream_interface_p1(request): iif = "r1-r2-eth1" oif = "r1-r0-eth0" result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: (*,G) mroutes are not cleared after shut of R1 to R0 link\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r2: Verify (*, G) ip mroutes cleared") dut = "r2" iif = "lo" oif = "r2-r1-eth0" result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r2: (*,G) mroutes are not cleared after shut of R1 to R0 link\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) write_test_footer(tc_name) @@ -3949,20 +4018,24 @@ def test_delete_RP_shut_noshut_RP_interface_p1(request): iif = "r1-r2-eth1" oif = "r1-r0-eth0" result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: (*,G) mroutes are not cleared after shut of R1 to R2 and R3 link\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) step("r2: Verify (*, G) ip mroutes cleared") dut = "r2" iif = "lo" oif = "r2-r1-eth0" result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r2: (*,G) mroutes are not cleared after shut of R1 to R2 and R3 link\n Error: {}".format( - tc_name, result - )) + tc_name, result + ) + ) write_test_footer(tc_name) diff --git a/tests/topotests/ospf-sr-topo1/test_ospf_sr_topo1.py b/tests/topotests/ospf-sr-topo1/test_ospf_sr_topo1.py index 57b93c3fd..b6e5e1483 100644 --- a/tests/topotests/ospf-sr-topo1/test_ospf_sr_topo1.py +++ b/tests/topotests/ospf-sr-topo1/test_ospf_sr_topo1.py @@ -86,6 +86,7 @@ from mininet.topo import Topo pytestmark = [pytest.mark.ospfd] + class TemplateTopo(Topo): "Test topology builder" @@ -385,9 +386,7 @@ def test_rib_ipv4_step5(): pytest.skip(tgen.errors) logger.info("Disabling SR on rt6") - tgen.net["rt6"].cmd( - 'vtysh -c "conf t" -c "router ospf" -c "no segment-routing on"' - ) + tgen.net["rt6"].cmd('vtysh -c "conf t" -c "router ospf" -c "no segment-routing on"') for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: router_compare_json_output( @@ -652,7 +651,7 @@ def test_mpls_lib_step10(): # Expected changes: # -All commands should be rejected # -#def test_ospf_invalid_config_step11(): +# def test_ospf_invalid_config_step11(): # logger.info("Test (step 11): check if invalid configuration is rejected") # tgen = get_topogen() # diff --git a/tests/topotests/ospf-te-topo1/test_ospf_te_topo1.py b/tests/topotests/ospf-te-topo1/test_ospf_te_topo1.py index 1d69f5d69..32f9b3453 100644 --- a/tests/topotests/ospf-te-topo1/test_ospf_te_topo1.py +++ b/tests/topotests/ospf-te-topo1/test_ospf_te_topo1.py @@ -79,6 +79,7 @@ import pytest pytestmark = [pytest.mark.ospfd] + class OspfTeTopo(Topo): "Test topology builder" @@ -175,6 +176,7 @@ def setup_testcase(msg): # Note that all routers must discover the same Network Topology, so the same TED. + def test_step1(): "Step1: Check initial topology" @@ -190,12 +192,8 @@ def test_step2(): tgen = setup_testcase("Step2: Shutdown interface between r1 & r2") - tgen.net["r1"].cmd( - 'vtysh -c "conf t" -c "interface r1-eth1" -c "shutdown"' - ) - tgen.net["r2"].cmd( - 'vtysh -c "conf t" -c "interface r2-eth1" -c "shutdown"' - ) + tgen.net["r1"].cmd('vtysh -c "conf t" -c "interface r1-eth1" -c "shutdown"') + tgen.net["r2"].cmd('vtysh -c "conf t" -c "interface r2-eth1" -c "shutdown"') for rname in ["r1", "r2", "r3", "r4"]: compare_ted_json_output(tgen, rname, "ted_step2.json") @@ -207,9 +205,7 @@ def test_step3(): tgen = setup_testcase("Step3: Disable Inter-AS on r3") - tgen.net["r3"].cmd( - 'vtysh -c "conf t" -c "router ospf" -c "no mpls-te inter-as"' - ) + tgen.net["r3"].cmd('vtysh -c "conf t" -c "router ospf" -c "no mpls-te inter-as"') for rname in ["r1", "r2", "r3", "r4"]: compare_ted_json_output(tgen, rname, "ted_step3.json") @@ -221,18 +217,14 @@ def test_step4(): tgen = setup_testcase("Step4: Enable Segment Routing on r1 & r2") - tgen.net["r1"].cmd( - 'vtysh -c "conf t" -c "router ospf" -c "segment-routing on"' - ) + tgen.net["r1"].cmd('vtysh -c "conf t" -c "router ospf" -c "segment-routing on"') tgen.net["r1"].cmd( 'vtysh -c "conf t" -c "router ospf" -c "segment-routing global-block 20000 23999"' ) tgen.net["r1"].cmd( 'vtysh -c "conf t" -c "router ospf" -c "segment-routing prefix 10.0.255.1/32 index 10"' ) - tgen.net["r2"].cmd( - 'vtysh -c "conf t" -c "router ospf" -c "segment-routing on"' - ) + tgen.net["r2"].cmd('vtysh -c "conf t" -c "router ospf" -c "segment-routing on"') tgen.net["r2"].cmd( 'vtysh -c "conf t" -c "router ospf" -c "segment-routing node-msd 16"' ) @@ -253,12 +245,8 @@ def test_step5(): tgen = setup_testcase("Step5: Re-enable interface between r1 & r2") - tgen.net["r1"].cmd( - 'vtysh -c "conf t" -c "interface r1-eth1" -c "no shutdown"' - ) - tgen.net["r2"].cmd( - 'vtysh -c "conf t" -c "interface r2-eth1" -c "no shutdown"' - ) + tgen.net["r1"].cmd('vtysh -c "conf t" -c "interface r1-eth1" -c "no shutdown"') + tgen.net["r2"].cmd('vtysh -c "conf t" -c "interface r2-eth1" -c "no shutdown"') for rname in ["r1", "r2", "r3", "r4"]: compare_ted_json_output(tgen, rname, "ted_step5.json") @@ -291,9 +279,7 @@ def test_step7(): tgen = setup_testcase("Step7: Disable OSPF on r4") - tgen.net["r4"].cmd( - 'vtysh -c "conf t" -c "no router ospf"' - ) + tgen.net["r4"].cmd('vtysh -c "conf t" -c "no router ospf"') for rname in ["r1", "r2", "r3"]: compare_ted_json_output(tgen, rname, "ted_step7.json") diff --git a/tests/topotests/ospf-topo1/test_ospf_topo1.py b/tests/topotests/ospf-topo1/test_ospf_topo1.py index 9117247eb..42634ce90 100644 --- a/tests/topotests/ospf-topo1/test_ospf_topo1.py +++ b/tests/topotests/ospf-topo1/test_ospf_topo1.py @@ -135,25 +135,35 @@ def test_wait_protocol_convergence(): * Full/DROther * Full/Backup """ - result = tgen.gears[router].vtysh_cmd('show ip ospf neighbor json', - isjson=True) - if topotest.json_cmp(result, {"neighbors": {neighbor: [ - {"state": "Full/DR"}]}}) is None: + result = tgen.gears[router].vtysh_cmd( + "show ip ospf neighbor json", isjson=True + ) + if ( + topotest.json_cmp( + result, {"neighbors": {neighbor: [{"state": "Full/DR"}]}} + ) + is None + ): return None - if topotest.json_cmp(result, {"neighbors": {neighbor: [ - {"state": "Full/DROther"}]}}) is None: + if ( + topotest.json_cmp( + result, {"neighbors": {neighbor: [{"state": "Full/DROther"}]}} + ) + is None + ): return None - return topotest.json_cmp(result, {"neighbors": {neighbor: [ - {"state": "Full/Backup"}]}}) + return topotest.json_cmp( + result, {"neighbors": {neighbor: [{"state": "Full/Backup"}]}} + ) - _, result = topotest.run_and_expect(run_command_and_expect, None, - count=130, wait=1) + _, result = topotest.run_and_expect( + run_command_and_expect, None, count=130, wait=1 + ) assertmsg = '"{}" convergence failure'.format(router) assert result is None, assertmsg - def expect_ospfv3_neighbor_full(router, neighbor): "Wait until OSPFv3 convergence." logger.info("waiting OSPFv3 router '{}'".format(router)) diff --git a/tests/topotests/ospf6-topo2/test_ospf6_topo2.py b/tests/topotests/ospf6-topo2/test_ospf6_topo2.py index 32aff0598..efc8565bb 100644 --- a/tests/topotests/ospf6-topo2/test_ospf6_topo2.py +++ b/tests/topotests/ospf6-topo2/test_ospf6_topo2.py @@ -141,12 +141,19 @@ def test_ospf6_default_route(): topotest.router_json_cmp, tgen.gears[router], "show ipv6 ospf6 database inter-prefix detail json", - {"areaScopedLinkStateDb": [{ - "areaId": area, - "lsa": [{ - "prefix": prefix, - "metric": metric, - }]}]}, + { + "areaScopedLinkStateDb": [ + { + "areaId": area, + "lsa": [ + { + "prefix": prefix, + "metric": metric, + } + ], + } + ] + }, ) _, result = topotest.run_and_expect(test_func, None, count=4, wait=1) assertmsg = '"{}" convergence failure'.format(router) diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_chaos.py b/tests/topotests/ospf_basic_functionality/test_ospf_chaos.py index cebe55b39..c117fc6a7 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_chaos.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_chaos.py @@ -249,17 +249,20 @@ def test_ospf_chaos_tc31_p1(request): dut = "r1" protocol = "ospf" result = verify_ospf_rib(tgen, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) - result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, - expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present \n Error: {}".format( + result = verify_rib( + tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False + ) + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format( tc_name, result - )) + ) step("Bring up OSPFd daemon on R0.") start_router_daemons(tgen, "r0", ["ospfd"]) @@ -482,17 +485,20 @@ def test_ospf_chaos_tc34_p1(request): dut = "r1" protocol = "ospf" result = verify_ospf_rib(tgen, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) - result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, - expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present \n Error: {}".format( + result = verify_rib( + tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False + ) + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format( tc_name, result - )) + ) step("Bring up staticd daemon on R0.") start_router_daemons(tgen, "r0", ["staticd"]) diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py b/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py index adf82a5e8..1aabc06db 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py @@ -259,19 +259,21 @@ def test_ospf_ecmp_tc16_p0(request): shutdown_bringup_interface(tgen, dut, intf, False) result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) protocol = "ospf" result = verify_rib( tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format( tc_name, result - )) + ) for intfr in range(1, 7): intf = topo["routers"]["r1"]["links"]["r0-link{}".format(intfr)]["interface"] @@ -326,10 +328,11 @@ def test_ospf_ecmp_tc16_p0(request): result = verify_ospf_rib( tgen, dut, input_dict, next_hop=nh, attempts=5, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) protocol = "ospf" result = verify_rib( @@ -342,10 +345,11 @@ def test_ospf_ecmp_tc16_p0(request): attempts=5, expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format( tc_name, result - )) + ) step("Re configure the static route in R0.") dut = "r0" @@ -432,10 +436,11 @@ def test_ospf_ecmp_tc17_p0(request): result = verify_ospf_rib( tgen, dut, input_dict, next_hop=nh, attempts=5, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) protocol = "ospf" result = verify_rib( @@ -448,10 +453,11 @@ def test_ospf_ecmp_tc17_p0(request): attempts=5, expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format( tc_name, result - )) + ) step("Reconfigure the static route in R0.Change ECMP value to 2.") dut = "r0" diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_ecmp_lan.py b/tests/topotests/ospf_basic_functionality/test_ospf_ecmp_lan.py index c5230d661..e6dc18a43 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_ecmp_lan.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_ecmp_lan.py @@ -307,10 +307,11 @@ def test_ospf_lan_ecmp_tc18_p0(request): result = verify_ospf_rib( tgen, dut, input_dict, next_hop=nh, attempts=5, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) protocol = "ospf" result = verify_rib( @@ -323,10 +324,11 @@ def test_ospf_lan_ecmp_tc18_p0(request): attempts=5, expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format( tc_name, result - )) + ) write_test_footer(tc_name) diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_lan.py b/tests/topotests/ospf_basic_functionality/test_ospf_lan.py index 2fbb27f4f..d9b90a132 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_lan.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_lan.py @@ -397,10 +397,11 @@ def test_ospf_lan_tc1_p0(request): shutdown_bringup_interface(tgen, dut, intf, False) result = verify_ospf_neighbor(tgen, topo, dut, lan=True, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r0: OSPF neighbors-hip is up \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r0: OSPF neighbors-hip is up \n Error: {}".format( tc_name, result - )) + ) step("No Shut interface on R0") dut = "r0" diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py b/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py index b99ce6cfb..7864d0307 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py @@ -332,18 +332,20 @@ def test_ospf_routemaps_functionality_tc19_p0(request): } } result = verify_ospf_rib(tgen, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) result = verify_rib( tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are present in fib \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: routes are present in fib \n Error: {}".format( tc_name, result - )) + ) step("Delete and reconfigure prefix list.") # Create ip prefix list @@ -383,18 +385,20 @@ def test_ospf_routemaps_functionality_tc19_p0(request): } } result = verify_ospf_rib(tgen, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) result = verify_rib( tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) pfx_list = { "r0": { @@ -438,18 +442,20 @@ def test_ospf_routemaps_functionality_tc19_p0(request): } } result = verify_ospf_rib(tgen, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) result = verify_rib( tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format( tc_name, result - )) + ) write_test_footer(tc_name) @@ -496,18 +502,20 @@ def test_ospf_routemaps_functionality_tc20_p0(request): dut = "r1" protocol = "ospf" result = verify_ospf_rib(tgen, dut, input_dict, attempts=2, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) result = verify_rib( tgen, "ipv4", dut, input_dict, protocol=protocol, attempts=2, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format( tc_name, result - )) + ) step( "configure the route map with the same name that is used " @@ -523,18 +531,20 @@ def test_ospf_routemaps_functionality_tc20_p0(request): dut = "r1" protocol = "ospf" result = verify_ospf_rib(tgen, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) result = verify_rib( tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format( tc_name, result - )) + ) # Create route map routemaps = {"r0": {"route_maps": {"rmap_ipv4": [{"action": "deny"}]}}} @@ -545,18 +555,20 @@ def test_ospf_routemaps_functionality_tc20_p0(request): dut = "r1" protocol = "ospf" result = verify_ospf_rib(tgen, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) result = verify_rib( tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format( tc_name, result - )) + ) step("Delete the route map.") # Create route map @@ -573,18 +585,20 @@ def test_ospf_routemaps_functionality_tc20_p0(request): dut = "r1" protocol = "ospf" result = verify_ospf_rib(tgen, dut, input_dict, expected=False) - assert result is not True, ("Testcase {} : Failed \n " - "r1: OSPF routes are present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format( tc_name, result - )) + ) result = verify_rib( tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False ) - assert result is not True, ("Testcase {} : Failed \n " - "r1: routes are still present \n Error: {}".format( + assert ( + result is not True + ), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format( tc_name, result - )) + ) write_test_footer(tc_name) diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py b/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py index fb6b28ce5..1432a82b1 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py @@ -247,11 +247,11 @@ def test_ospf_redistribution_tc5_p0(request): if result is not True: break - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: OSPF routes are present after deleting ip address of newly " - "configured interface of R0 \n Error: {}".format( - tc_name, result - )) + "configured interface of R0 \n Error: {}".format(tc_name, result) + ) protocol = "ospf" result = verify_rib( @@ -264,11 +264,11 @@ def test_ospf_redistribution_tc5_p0(request): attempts=5, expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: OSPF routes are present in fib after deleting ip address of newly " - "configured interface of R0 \n Error: {}".format( - tc_name, result - )) + "configured interface of R0 \n Error: {}".format(tc_name, result) + ) step("Add back the deleted ip address on newly configured interface of R0") topo1 = { @@ -370,11 +370,11 @@ def test_ospf_redistribution_tc6_p0(request): result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh, expected=False) if result is not True: break - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: OSPF routes are present after deleting ip address of newly " - "configured loopback of R0 \n Error: {}".format( - tc_name, result - )) + "configured loopback of R0 \n Error: {}".format(tc_name, result) + ) protocol = "ospf" result = verify_rib( @@ -386,11 +386,11 @@ def test_ospf_redistribution_tc6_p0(request): next_hop=nh, expected=False, ) - assert result is not True, ("Testcase {} : Failed \n " + assert result is not True, ( + "Testcase {} : Failed \n " "r1: OSPF routes are present in fib after deleting ip address of newly " - "configured loopback of R0 \n Error: {}".format( - tc_name, result - )) + "configured loopback of R0 \n Error: {}".format(tc_name, result) + ) step("Add back the deleted ip address on newly configured interface of R0") topo1 = { diff --git a/tests/topotests/ospf_suppress_fa/test_ospf_suppress_fa.py b/tests/topotests/ospf_suppress_fa/test_ospf_suppress_fa.py index 74d609c57..76e50beb5 100644 --- a/tests/topotests/ospf_suppress_fa/test_ospf_suppress_fa.py +++ b/tests/topotests/ospf_suppress_fa/test_ospf_suppress_fa.py @@ -94,12 +94,14 @@ def setup_module(mod): tgen.start_router() + def teardown_module(_mod): "Teardown the pytest environment" tgen = get_topogen() tgen.stop_topology() + def test_converge_protocols(): "Wait for protocol convergence" @@ -110,19 +112,26 @@ def test_converge_protocols(): topotest.sleep(10, "Waiting for OSPF convergence") + def ospf_configure_suppress_fa(router_name, area): "Configure OSPF suppress-fa in router_name" tgen = get_topogen() router = tgen.gears[router_name] - router.vtysh_cmd("conf t\nrouter ospf\narea {} nssa suppress-fa\nexit\n".format(area)) + router.vtysh_cmd( + "conf t\nrouter ospf\narea {} nssa suppress-fa\nexit\n".format(area) + ) + def ospf_unconfigure_suppress_fa(router_name, area): "Remove OSPF suppress-fa in router_name" tgen = get_topogen() router = tgen.gears[router_name] - router.vtysh_cmd("conf t\nrouter ospf\nno area {} nssa suppress-fa\nexit\n".format(area)) + router.vtysh_cmd( + "conf t\nrouter ospf\nno area {} nssa suppress-fa\nexit\n".format(area) + ) + def ospf_get_lsa_type5(router_name): "Return a dict with link state id as key and forwarding addresses as value" @@ -141,7 +150,8 @@ def ospf_get_lsa_type5(router_name): result[lsa] = re1.group(1) return result -@pytest.fixture(scope='module', name='original') + +@pytest.fixture(scope="module", name="original") def test_ospf_set_suppress_fa(): "Test OSPF area [x] nssa suppress-fa" @@ -162,6 +172,7 @@ def test_ospf_set_suppress_fa(): # in the test_ospf_unset_supress_fa return initial + def test_ospf_unset_supress_fa(original): "Test OSPF no area [x] nssa suppress-fa" diff --git a/tests/topotests/static_routing_with_ebgp/test_static_routes_topo4_ebgp.py b/tests/topotests/static_routing_with_ebgp/test_static_routes_topo4_ebgp.py index dc4e29ebd..5d4950a70 100644 --- a/tests/topotests/static_routing_with_ebgp/test_static_routes_topo4_ebgp.py +++ b/tests/topotests/static_routing_with_ebgp/test_static_routes_topo4_ebgp.py @@ -949,10 +949,11 @@ def static_routes_rmap_pfxlist_p0_tc7_ebgp(request): result4 = verify_rib( tgen, addr_type, dut, input_dict, protocol=protocol, expected=False ) - assert result4 is not True, ("Testcase {} : Failed \n" - "routes are still present \n Error: {}".format( + assert ( + result4 is not True + ), "Testcase {} : Failed \n" "routes are still present \n Error: {}".format( tc_name, result4 - )) + ) step("vm4 should be present in FRR1") dut = "r1" diff --git a/tests/topotests/static_routing_with_ibgp/test_static_routes_topo4_ibgp.py b/tests/topotests/static_routing_with_ibgp/test_static_routes_topo4_ibgp.py index 14db72919..09c437c3c 100644 --- a/tests/topotests/static_routing_with_ibgp/test_static_routes_topo4_ibgp.py +++ b/tests/topotests/static_routing_with_ibgp/test_static_routes_topo4_ibgp.py @@ -947,10 +947,11 @@ def test_static_routes_rmap_pfxlist_p0_tc7_ibgp(request): result4 = verify_rib( tgen, addr_type, dut, input_dict, protocol=protocol, expected=False ) - assert result4 is not True, ("Testcase {} : Failed \n" - "routes are still present \n Error: {}".format( + assert ( + result4 is not True + ), "Testcase {} : Failed \n" "routes are still present \n Error: {}".format( tc_name, result4 - )) + ) step("vm4 should be present in FRR1") dut = "r1" |