diff options
Diffstat (limited to 'tests')
16 files changed, 6530 insertions, 11 deletions
diff --git a/tests/topotests/bgp_default_originate/bgp_default_originate_topo1.json b/tests/topotests/bgp_default_originate/bgp_default_originate_topo1.json new file mode 100644 index 000000000..5fae34dd7 --- /dev/null +++ b/tests/topotests/bgp_default_originate/bgp_default_originate_topo1.json @@ -0,0 +1,294 @@ +{ + "address_types": [ + "ipv4", + "ipv6" + ], + "ipv4base": "192.168.0.0", + "ipv4mask": 3024, + "ipv6base": "fd00::", + "ipv6mask": 64, + "link_ip_start": { + "ipv4": "192.168.0.0", + "v4mask": 24, + "ipv6": "fd00::", + "v6mask": 64 + }, + "lo_prefix": { + "ipv4": "1.0.", + "v4mask": 32, + "ipv6": "2001:db8:f::", + "v6mask": 128 + }, + "routers": { + "r0": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r1": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r0": { + "keepalivetimer": 1, + "holddowntimer": 3 + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r0": { + "keepalivetimer": 1, + "holddowntimer": 3 + } + } + } + } + } + } + } + } + }, + "r1": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r0": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r2": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "200", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r0": { + "dest_link": { + "r1": { + "keepalivetimer": 1, + "holddowntimer": 3 + } + } + }, + "r2": { + "dest_link": { + "r1": { + "keepalivetimer": 1, + "holddowntimer": 3 + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r0": { + "dest_link": { + "r1": { + "keepalivetimer": 1, + "holddowntimer": 3 + } + } + }, + "r2": { + "dest_link": { + "r1": { + "keepalivetimer": 1, + "holddowntimer": 3 + } + } + } + } + } + } + } + } + }, + "r2": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r1": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r3": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "300", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {"keepalivetimer": 1, + "holddowntimer": 3} + } + }, + "r3": { + "dest_link": { + "r2": {"keepalivetimer": 1, + "holddowntimer": 3} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {"keepalivetimer": 1, + "holddowntimer": 3} + } + }, + "r3": { + "dest_link": { + "r2": {"keepalivetimer": 1, + "holddowntimer": 3} + } + } + } + } + } + } + } + }, + "r3": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r2": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r4": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "400", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": {"keepalivetimer": 1, + "holddowntimer": 3} + } + }, + "r4": { + "dest_link": { + "r3": {"keepalivetimer": 1, + "holddowntimer": 3} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": {"keepalivetimer": 1, + "holddowntimer": 3} + } + }, + "r4": { + "dest_link": { + "r3": {"keepalivetimer": 1, + "holddowntimer": 3} + } + } + } + } + } + } + } + }, + "r4": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r3": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "500", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r4": {"keepalivetimer": 1, + "holddowntimer": 3} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r4": {"keepalivetimer": 1, + "holddowntimer": 3} + } + } + } + } + } + } + } + } + } +}
\ No newline at end of file diff --git a/tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_1.py b/tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_1.py new file mode 100644 index 000000000..ee71ae16e --- /dev/null +++ b/tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_1.py @@ -0,0 +1,2537 @@ +#!/usr/bin/env python +# +# Copyright (c) 2022 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF") +# in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# Shreenidhi A R <rshreenidhi@vmware.com> +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# +""" +Following tests are covered. +1. Verify BGP default-originate route with IBGP peer +2. Verify BGP default-originate route with EBGP peer +3. Verify BGP default route when default-originate configured with route-map over IBGP peer +4. Verify BGP default route when default-originate configured with route-map over EBGP peer" + +""" +import os +import sys +import time +import pytest +from time import sleep +from copy import deepcopy +from lib.topolog import logger + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from lib.topogen import Topogen, get_topogen +from lib.topojson import build_config_from_json +from lib.topolog import logger + +from lib.bgp import ( + verify_bgp_convergence, + verify_graceful_restart, + create_router_bgp, + verify_router_id, + modify_as_number, + verify_as_numbers, + clear_bgp_and_verify, + clear_bgp, + verify_bgp_rib, + get_prefix_count_route, + get_dut_as_number, + verify_rib_default_route, + verify_fib_default_route, + verify_bgp_advertised_routes_from_neighbor, + verify_bgp_received_routes_from_neighbor, +) +from lib.common_config import ( + interface_status, + verify_prefix_lists, + verify_fib_routes, + kill_router_daemons, + start_router_daemons, + shutdown_bringup_interface, + step, + required_linux_kernel_version, + stop_router, + start_router, + create_route_maps, + create_prefix_lists, + get_frr_ipv6_linklocal, + start_topology, + write_test_header, + check_address_types, + write_test_footer, + reset_config_on_routers, + create_static_routes, + check_router_status, + delete_route_maps, +) + + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) +sys.path.append(os.path.join(CWD, "../lib/")) + +# Required to instantiate the topology builder class. + +# pylint: disable=C0413 +# Import topogen and topotest helpers + +# Global variables +topo = None +KEEPALIVETIMER = 1 +HOLDDOWNTIMER = 3 +# Global variables +NETWORK1_1 = {"ipv4": "1.1.1.1/32", "ipv6": "1::1/128"} +NETWORK1_2 = {"ipv4": "1.1.1.2/32", "ipv6": "1::2/128"} +NETWORK2_1 = {"ipv4": "2.1.1.1/32", "ipv6": "2::1/128"} +NETWORK2_2 = {"ipv4": "2.1.1.2/32", "ipv6": "2::2/128"} +NETWORK3_1 = {"ipv4": "3.1.1.1/32", "ipv6": "3::1/128"} +NETWORK3_2 = {"ipv4": "3.1.1.2/32", "ipv6": "3::2/128"} +NETWORK4_1 = {"ipv4": "4.1.1.1/32", "ipv6": "4::1/128"} +NETWORK4_2 = {"ipv4": "4.1.1.2/32", "ipv6": "4::2/128"} +NETWORK5_1 = {"ipv4": "5.1.1.1/32", "ipv6": "5::1/128"} +NETWORK5_2 = {"ipv4": "5.1.1.2/32", "ipv6": "5::2/128"} +DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} +NEXT_HOP_IP = {"ipv4": "Null0", "ipv6": "Null0"} + +IPV4_RM = "RMVIPV4" +IPV6_RM = "RMVIPV6" + +IPV4_RM1 = "RMVIPV41" +IPV6_RM1 = "RMVIPV61" + +IPV4_RM2 = "RMVIPV42" +IPV6_RM2 = "RMVIPV62" + +IPV4_PL_1 = "PV41" +IPV4_PL_2 = "PV42" + +IPV6_PL_1 = "PV61" +IPV6_PL_2 = "PV62" + + +r1_ipv4_loopback = "1.0.1.0/24" +r2_ipv4_loopback = "1.0.2.0/24" +r3_ipv4_loopback = "1.0.3.0/24" +r4_ipv4_loopback = "1.0.4.0/24" +r1_ipv6_loopback = "2001:db8:f::1:0/120" +r2_ipv6_loopback = "2001:db8:f::2:0/120" +r3_ipv6_loopback = "2001:db8:f::3:0/120" +r4_ipv6_loopback = "2001:db8:f::4:0/120" + +r0_connected_address_ipv4 = "192.168.0.0/24" +r0_connected_address_ipv6 = "fd00::/64" +r1_connected_address_ipv4 = "192.168.1.0/24" +r1_connected_address_ipv6 = "fd00:0:0:1::/64" +r3_connected_address_ipv4 = "192.168.2.0/24" +r3_connected_address_ipv6 = "fd00:0:0:2::/64" +r4_connected_address_ipv4 = "192.168.3.0/24" +r4_connected_address_ipv6 = "fd00:0:0:3::/64" + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + + # Required linux kernel version for this suite to run. + result = required_linux_kernel_version("4.15") + if result is not True: + pytest.skip("Kernel requirements are not met") + + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("=" * 40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + json_file = "{}/bgp_default_originate_topo1.json".format(CWD) + tgen = Topogen(json_file, mod.__name__) + global topo + topo = tgen.json_topo + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start daemons and then start routers + start_topology(tgen) + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + global ADDR_TYPES + global BGP_CONVERGENCE + global DEFAULT_ROUTES + global DEFAULT_ROUTE_NXT_HOP_R1, DEFAULT_ROUTE_NXT_HOP_R3 + global R0_NETWORK_LOOPBACK, R0_NETWORK_LOOPBACK_NXTHOP, R1_NETWORK_LOOPBACK, R1_NETWORK_LOOPBACK_NXTHOP + global R0_NETWORK_CONNECTED, R0_NETWORK_CONNECTED_NXTHOP, R1_NETWORK_CONNECTED, R1_NETWORK_CONNECTED_NXTHOP + global R4_NETWORK_LOOPBACK, R4_NETWORK_LOOPBACK_NXTHOP, R3_NETWORK_LOOPBACK, R3_NETWORK_LOOPBACK_NXTHOP + global R4_NETWORK_CONNECTED, R4_NETWORK_CONNECTED_NXTHOP, R3_NETWORK_CONNECTED, R3_NETWORK_CONNECTED_NXTHOP + + ADDR_TYPES = check_address_types() + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format( + BGP_CONVERGENCE + ) + # There are the global varibles used through out the file these are acheived only after building the topology. + + r0_loopback_address_ipv4 = topo["routers"]["r0"]["links"]["lo"]["ipv4"] + r0_loopback_address_ipv4_nxt_hop = topo["routers"]["r0"]["links"]["r1"][ + "ipv4" + ].split("/")[0] + r0_loopback_address_ipv6 = topo["routers"]["r0"]["links"]["lo"]["ipv6"] + r0_loopback_address_ipv6_nxt_hop = topo["routers"]["r0"]["links"]["r1"][ + "ipv6" + ].split("/")[0] + + r1_loopback_address_ipv4 = topo["routers"]["r1"]["links"]["lo"]["ipv4"] + r1_loopback_address_ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"][ + "ipv4" + ].split("/")[0] + r1_loopback_address_ipv6 = topo["routers"]["r1"]["links"]["lo"]["ipv6"] + r1_loopback_address_ipv6_nxt_hop = topo["routers"]["r1"]["links"]["r2"][ + "ipv6" + ].split("/")[0] + + r4_loopback_address_ipv4 = topo["routers"]["r4"]["links"]["lo"]["ipv4"] + r4_loopback_address_ipv4_nxt_hop = topo["routers"]["r4"]["links"]["r3"][ + "ipv4" + ].split("/")[0] + r4_loopback_address_ipv6 = topo["routers"]["r4"]["links"]["lo"]["ipv6"] + r4_loopback_address_ipv6_nxt_hop = topo["routers"]["r4"]["links"]["r3"][ + "ipv6" + ].split("/")[0] + + r3_loopback_address_ipv4 = topo["routers"]["r3"]["links"]["lo"]["ipv4"] + r3_loopback_address_ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"][ + "ipv4" + ].split("/")[0] + r3_loopback_address_ipv6 = topo["routers"]["r3"]["links"]["lo"]["ipv6"] + r3_loopback_address_ipv6_nxt_hop = topo["routers"]["r3"]["links"]["r2"][ + "ipv6" + ].split("/")[0] + + R0_NETWORK_LOOPBACK = { + "ipv4": r0_loopback_address_ipv4, + "ipv6": r0_loopback_address_ipv6, + } + R0_NETWORK_LOOPBACK_NXTHOP = { + "ipv4": r0_loopback_address_ipv4_nxt_hop, + "ipv6": r0_loopback_address_ipv6_nxt_hop, + } + + R1_NETWORK_LOOPBACK = { + "ipv4": r1_loopback_address_ipv4, + "ipv6": r1_loopback_address_ipv6, + } + R1_NETWORK_LOOPBACK_NXTHOP = { + "ipv4": r1_loopback_address_ipv4_nxt_hop, + "ipv6": r1_loopback_address_ipv6_nxt_hop, + } + + R0_NETWORK_CONNECTED = { + "ipv4": r0_connected_address_ipv4, + "ipv6": r0_connected_address_ipv6, + } + R0_NETWORK_CONNECTED_NXTHOP = { + "ipv4": r0_loopback_address_ipv4_nxt_hop, + "ipv6": r0_loopback_address_ipv6_nxt_hop, + } + + R1_NETWORK_CONNECTED = { + "ipv4": r1_connected_address_ipv4, + "ipv6": r1_connected_address_ipv6, + } + R1_NETWORK_CONNECTED_NXTHOP = { + "ipv4": r1_loopback_address_ipv4_nxt_hop, + "ipv6": r1_loopback_address_ipv6_nxt_hop, + } + + R4_NETWORK_LOOPBACK = { + "ipv4": r4_loopback_address_ipv4, + "ipv6": r4_loopback_address_ipv6, + } + R4_NETWORK_LOOPBACK_NXTHOP = { + "ipv4": r4_loopback_address_ipv4_nxt_hop, + "ipv6": r4_loopback_address_ipv6_nxt_hop, + } + + R3_NETWORK_LOOPBACK = { + "ipv4": r3_loopback_address_ipv4, + "ipv6": r3_loopback_address_ipv6, + } + R3_NETWORK_LOOPBACK_NXTHOP = { + "ipv4": r3_loopback_address_ipv4_nxt_hop, + "ipv6": r3_loopback_address_ipv6_nxt_hop, + } + + R4_NETWORK_CONNECTED = { + "ipv4": r4_connected_address_ipv4, + "ipv6": r4_connected_address_ipv6, + } + R4_NETWORK_CONNECTED_NXTHOP = { + "ipv4": r4_loopback_address_ipv4_nxt_hop, + "ipv6": r4_loopback_address_ipv6_nxt_hop, + } + + R3_NETWORK_CONNECTED = { + "ipv4": r3_connected_address_ipv4, + "ipv6": r3_connected_address_ipv6, + } + R3_NETWORK_CONNECTED_NXTHOP = { + "ipv4": r3_loopback_address_ipv4_nxt_hop, + "ipv6": r3_loopback_address_ipv6_nxt_hop, + } + + # populating the nexthop for default routes + + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + + interface = topo["routers"]["r1"]["links"]["r2"]["interface"] + ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r1", intf=interface) + ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0] + ipv6_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv6"].split("/")[0] + DEFAULT_ROUTE_NXT_HOP_R1 = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local} + + interface = topo["routers"]["r3"]["links"]["r2"]["interface"] + ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r3", intf=interface) + ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv4"].split("/")[0] + ipv6_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv6"].split("/")[0] + DEFAULT_ROUTE_NXT_HOP_R3 = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local} + + logger.info("Running setup_module() done") + + +def teardown_module(): + """Teardown the pytest environment""" + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + logger.info( + "Testsuite end time: {}".format(time.asctime(time.localtime(time.time()))) + ) + logger.info("=" * 40) + +##################################################### +# +# Testcases +# +##################################################### + +def test_verify_bgp_default_originate_in_IBGP_p0(request): + """ + Verify BGP default-originate route with IBGP peer + """ + tgen = get_topogen() + global BGP_CONVERGENCE + global topo + # test case name + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + check_router_status(tgen) + reset_config_on_routers(tgen) + + if BGP_CONVERGENCE != True: + pytest.skip("skipped because of BGP Convergence failure") + + step("Configure IPv4 and IPv6 , IBGP neighbor between R1 and R2") + step("Configure IPv4 and IPv6 Loopback interface on R1, R0 and R2") + step("Configure IPv4 and IPv6 EBGP neighbor between R0 and R1") + + r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"] + r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"] + r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"] + r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"] + r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"] + + input_dict = { + "r0": { + "bgp": { + "local_as": 1000, + } + }, + "r1": { + "bgp": { + "local_as": 2000, + } + }, + "r2": { + "bgp": { + "local_as": 2000, + } + }, + "r3": { + "bgp": { + "local_as": r3_local_as, + } + }, + "r4": { + "bgp": { + "local_as": r4_local_as, + } + }, + } + result = modify_as_number(tgen, topo, input_dict) + try: + assert result is True + except AssertionError: + logger.info("Expected behaviour: {}".format(result)) + logger.info("BGP config is not created because of invalid ASNs") + + step("After changing the BGP AS Path Verify the BGP Convergence") + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, " Complete Convergence is expected after changing the ASN but failed to converge --> :Failed \n Error: {}".format( + BGP_CONVERGENCE + ) + + step("Configure IPv4 and IPv6 static route on R1 next-hop as NULL0") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r1": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + } + ] + } + } + result = create_static_routes(tgen, static_routes_input) + assert result is True, "Testcase {} : Failed to configure the static routes {} on router R1 \n Error: {}".format( + tc_name,static_routes_input, result + ) + step("verify IPv4 and IPv6 static route are configured and up on R1") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r1": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + } + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r1", static_routes_input) + assert result is True, "Testcase {} : Failed \n After configuring the static routes {} , the routes are not found in FIB \n Error: {}".format( + tc_name,static_routes_input, result + ) + + step( + "Configure redistribute static and connected on R0 and R1, for IPv4 and IPv6 address family " + ) + redistribute_static = { + "r0": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + }, + } + } + }, + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + }, + } + } + }, + } + result = create_router_bgp(tgen, topo, redistribute_static) + assert result is True, "Testcase {} : Failed to configure the redistribute static configuration \n Error: {}".format(tc_name, result) + + step( + "After configuring redistribute command , verify static and connected routes ( loopback connected routes) are advertised on R2" + ) + + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [R0_NETWORK_LOOPBACK[addr_type]], + "next_hop": R0_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R0_NETWORK_CONNECTED[addr_type]], + "next_hop": R0_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R1_NETWORK_LOOPBACK[addr_type]], + "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R1_NETWORK_CONNECTED[addr_type]], + "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : After redistributing static routes the routes {} expected in FIB but NOT FOUND ......! \n Error: {}".format( + tc_name, static_routes_input,result + ) + + result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : After redistributing static routes the routes {} expected in RIB but NOT FOUND ......! \n Error: {}".format( + tc_name, static_routes_input , result + ) + + step( + "Taking the snapshot of the prefix count before configuring the default originate" + ) + snapshot1 = get_prefix_count_route(tgen, topo, dut="r2", peer="r1") + + step( + "Configure Default originate on R1 for R1 to R2, for IPv4 and IPv6 BGP address family " + ) + local_as = get_dut_as_number(tgen, dut="r1") + default_originate_config = { + "r1": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": {"unicast": {"default_originate": {"r2": {}}}}, + "ipv6": {"unicast": {"default_originate": {"r2": {}}}}, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed Configuring default originate configuration. \n Error: {}".format(tc_name, result) + + step( + "After configuring default-originate command , verify default routes are advertised on R2 " + " R1 static and loopback routes received on R2 BGP and FIB" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [R1_NETWORK_LOOPBACK[addr_type]], + "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R1_NETWORK_CONNECTED[addr_type]], + "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + ] + } + } + + result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : post configuring the BGP Default originate configuration static and connected routes should not be effected but impacted on FIB .......! FAILED \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : Failedpost configuring the BGP Default originate configuration static and connected routes should not be effected but impacted on RIB......! FAILED \n Error: {}".format( + tc_name, result + ) + step( + "Verify default route for IPv4 and IPv6 present with path=igp metric =0 , local-preference= 100 " + ) + result = verify_rib_default_route( + tgen, + topo, + dut="r2", + routes=DEFAULT_ROUTES, + expected_nexthop=DEFAULT_ROUTE_NXT_HOP_R1, + metric=0, + locPrf=100, + ) + assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result) + step( + "Taking the snapshot2 of the prefix count after configuring the default originate" + ) + snapshot2 = get_prefix_count_route(tgen, topo, dut="r2", peer="r1") + + step("verifying the prefix count incrementing or not ") + isIPv4prefix_incremented = False + isIPv6prefix_incremented = False + if snapshot1["ipv4_count"] < snapshot2["ipv4_count"]: + isIPv4prefix_incremented = True + if snapshot1["ipv6_count"] < snapshot2["ipv6_count"]: + isIPv6prefix_incremented = True + + assert ( + isIPv4prefix_incremented is True + ), "Testcase {} : Failed Error: IPV4 Prefix is not incremented on receiveing ".format( + tc_name + ) + + assert ( + isIPv6prefix_incremented is True + ), "Testcase {} : Failed Error: IPV6 Prefix is not incremented on receiveing ".format( + tc_name + ) + write_test_footer(tc_name) + + +def test_verify_bgp_default_originate_in_EBGP_p0(request): + """ + Verify BGP default-originate route with EBGP peer + """ + tgen = get_topogen() + global BGP_CONVERGENCE + global topo + # test case name + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + check_router_status(tgen) + reset_config_on_routers(tgen) + + if BGP_CONVERGENCE != True: + pytest.skip("skipped because of BGP Convergence failure") + + step("Configure IPv4 and IPv6 , EBGP neighbor between R3 and R2") + step("Configure lPv4 and IPv6 Loopback interface on R3, R4 and R2") + step("Configure IPv4 and IPv6 IBGP neighbor between R4 and R3") + r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"] + r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"] + r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"] + r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"] + r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"] + + input_dict = { + "r0": { + "bgp": { + "local_as": r0_local_as, + } + }, + "r1": { + "bgp": { + "local_as": r1_local_as, + } + }, + "r2": { + "bgp": { + "local_as": r2_local_as, + } + }, + "r3": { + "bgp": { + "local_as": 4000, + } + }, + "r4": { + "bgp": { + "local_as": 4000, + } + }, + } + result = modify_as_number(tgen, topo, input_dict) + try: + assert result is True + except AssertionError: + logger.info("Expected behaviour: {}".format(result)) + logger.info("BGP config is not created because of invalid ASNs") + step("After changing the BGP AS Path Verify the BGP Convergence") + + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "Complete convergence is expeceted after changing the ASN os the routes ..! :Failed \n Error: {}".format( + BGP_CONVERGENCE + ) + + step(" Configure IPv4 and IPv6 static route on R3 next-hop on R4 interface") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r3": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + } + ] + } + } + result = create_static_routes(tgen, static_routes_input) + assert result is True, "Testcase {} : Failed to configure the static routes ....! Failed \n Error: {}".format( + tc_name, result + ) + step("verify IPv4 and IPv6 static route are configured and up on R1") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r3": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + } + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r3", static_routes_input) + assert result is True, "Testcase {} : Route is not found in {} in FIB ......! Failed \n Error: {}".format( + tc_name, static_routes_input,result + ) + + step( + "Configure redistribute static and connected on R3 and R4 for IPv4 and IPv6 address family " + ) + redistribute_static = { + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + }, + } + } + }, + "r4": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + }, + } + } + }, + } + result = create_router_bgp(tgen, topo, redistribute_static) + assert result is True, "Testcase {} : Failed to configure redistribute configuratin \n Error: {}".format(tc_name, result) + + step( + "After configuring redistribute command , verify static and connected routes ( loopback connected routes) are advertised on R2" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [R3_NETWORK_LOOPBACK[addr_type]], + "next_hop": R3_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R3_NETWORK_CONNECTED[addr_type]], + "next_hop": R3_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R4_NETWORK_LOOPBACK[addr_type]], + "next_hop": R4_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R4_NETWORK_CONNECTED[addr_type]], + "next_hop": R4_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : static & and connected routes are expected but not found in FIB .... ! \n Error: {}".format( + tc_name, result + ) + result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : static & and connected routes are expected but not found in RIB .... ! \n Error: {}".format( + tc_name, result + ) + snapshot1 = get_prefix_count_route(tgen, topo, dut="r2", peer="r3") + step( + "Configure Default originate on R3 for R3 to R2, on IPv4 and IPv6 BGP address family" + ) + local_as = get_dut_as_number(tgen, dut="r3") + default_originate_config = { + "r3": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": {"unicast": {"default_originate": {"r2": {}}}}, + "ipv6": {"unicast": {"default_originate": {"r2": {}}}}, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed to configure the default originate configuration \n Error: {}".format(tc_name, result) + + step( + "After configuring default-originate command , verify default routes are advertised on R2 on both BGP RIB and FIB" + ) + + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + }, + ] + } + } + + result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : static route from R1 {} and default route from R3 is expected in R2 FIB .....! NOT FOUND \n Error: {}".format( + tc_name, NETWORK1_1,result + ) + + result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : static route from R1 {} and default route from R3 is expected in R2 RIB .....! NOT FOUND \n Error: {}".format( + tc_name,NETWORK1_1, result + ) + + step( + "Verify default route for IPv4 and IPv6 present with path = ebgp as path, metric =0 " + ) + # local preference will bgp not applicable for eBGP + result = verify_rib_default_route( + tgen, + topo, + dut="r2", + routes=DEFAULT_ROUTES, + expected_nexthop=DEFAULT_ROUTE_NXT_HOP_R3, + metric=0, + expected_aspath="4000", + ) + assert result is True, "Testcase {} : Default route from R3 is expected with attributes in R2 RIB .....! NOT FOUND Error: {}".format(tc_name, result) + + step( + "Taking the snapshot2 of the prefix count after configuring the default originate" + ) + snapshot2 = get_prefix_count_route(tgen, topo, dut="r2", peer="r3") + step( + "Verify out-prefix count is incremented default route on IPv4 and IPv6 neighbor" + ) + isIPv4prefix_incremented = False + isIPv6prefix_incremented = False + if snapshot1["ipv4_count"] < snapshot2["ipv4_count"]: + isIPv4prefix_incremented = True + if snapshot1["ipv6_count"] < snapshot2["ipv6_count"]: + isIPv6prefix_incremented = True + + assert ( + isIPv4prefix_incremented is True + ), "Testcase {} : Failed Error: IPV4 Prefix is not incremented on receiveing ".format( + tc_name + ) + + assert ( + isIPv6prefix_incremented is True + ), "Testcase {} : Failed Error: IPV6 Prefix is not incremented on receiveing ".format( + tc_name + ) + write_test_footer(tc_name) + + +def test_verify_bgp_default_originate_in_IBGP_with_route_map_p0(request): + """ + test_verify_bgp_default_originate_in_IBGP_with_route_map_p0 + """ + tgen = get_topogen() + global BGP_CONVERGENCE + global topo + # test case name + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + check_router_status(tgen) + reset_config_on_routers(tgen) + + if BGP_CONVERGENCE != True: + pytest.skip("skipped because of BGP Convergence failure") + + step("Configure IPv4 and IPv6 , IBGP neighbor between R1 and R2") + step("Configure IPv4 and IPv6 , EBGP neighbor between R1 and R0") + r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"] + r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"] + r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"] + r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"] + r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"] + + input_dict = { + "r0": { + "bgp": { + "local_as": r0_local_as, + } + }, + "r1": { + "bgp": { + "local_as": 1000, + } + }, + "r2": { + "bgp": { + "local_as": 1000, + } + }, + "r3": { + "bgp": { + "local_as": r3_local_as, + } + }, + "r4": { + "bgp": { + "local_as": r4_local_as, + } + }, + } + result = modify_as_number(tgen, topo, input_dict) + try: + assert result is True + except AssertionError: + logger.info("Expected behaviour: {}".format(result)) + logger.info("BGP config is not created because of invalid ASNs") + + step("After changing the BGP AS Path Verify the BGP Convergence") + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "Complete convergence is expected after changing ASN ....! ERROR :Failed \n Error: {}".format( + BGP_CONVERGENCE + ) + + step("Configure 2 IPv4 and 2 IPv6 Static route on R0 with next-hop as Null0") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r0": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + result = create_static_routes(tgen, static_routes_input) + assert result is True, "Testcase {} : Static Configuration is Failed \n Error: {}".format( + tc_name, result + ) + + step("verify IPv4 and IPv6 static route are configured and up on R0") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r0": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r0", static_routes_input) + assert result is True, "Testcase {} : routes {} unable is not found in R0 FIB \n Error: {}".format( + tc_name, static_routes_input,result + ) + + step( + "Configure redistribute static on IPv4 and IPv6 address family on R0 for R0 to R1 neighbor " + ) + redistribute_static = { + "r0": { + "bgp": { + "address_family": { + "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, + "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, + } + } + } + } + result = create_router_bgp(tgen, topo, redistribute_static) + assert result is True, "Testcase {} : Failed to configure redistribute static configuration....! \n Error: {}".format(tc_name, result) + + step("verify IPv4 and IPv6 static route are configured and up on R1") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r0": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r1", static_routes_input) + assert result is True, "Testcase {} : Failed... Routes {} expected in r0 FIB after configuring the redistribute config \n Error: {}".format( + tc_name,static_routes_input, result + ) + + result = verify_bgp_rib(tgen, addr_type, "r1", static_routes_input) + assert result is True, "Testcase {} : Failed... Routes {} expected in r0 RIB after configuring the redistribute config \n Error: {}".format( + tc_name, static_routes_input,result + ) + + step( + "Configure IPv4 prefix-list Pv4 and and IPv6 prefix-list Pv6 on R1 to match BGP route Sv41, Sv42, IPv6 route Sv61 Sv62 permit " + ) + input_dict_3 = { + "r1": { + "prefix_lists": { + "ipv4": { + "Pv4": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv4"], + "action": "permit", + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv4"], + "action": "permit", + }, + ] + }, + "ipv6": { + "Pv6": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv6"], + "action": "permit", + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv6"], + "action": "permit", + }, + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed to configure the prefix list \n Error: {}".format(tc_name, result) + + step( + "Configure IPV4 and IPv6 route-map (RMv4 and RMv6 ) matching prefix-list (Pv4 and Pv6) respectively on R1" + ) + input_dict_3 = { + "r1": { + "route_maps": { + "RMv4": [ + { + "action": "permit", + "seq_id": "1", + "match": {"ipv4": {"prefix_lists": "Pv4"}}, + }, + ], + "RMv6": [ + { + "action": "permit", + "seq_id": "1", + "match": {"ipv6": {"prefix_lists": "Pv6"}}, + }, + ], + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed to configure the route map \n Error: {}".format(tc_name, result) + + step( + "Configure default-originate with route-map (RMv4 and RMv6) on R1, on BGP IPv4 and IPv6 address family " + ) + local_as = get_dut_as_number(tgen, dut="r1") + default_originate_config = { + "r1": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": { + "unicast": {"default_originate": {"r2": {"route_map": "RMv4"}}} + }, + "ipv6": { + "unicast": {"default_originate": {"r2": {"route_map": "RMv6"}}} + }, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed to configure the default originate \n Error: {}".format(tc_name, result) + + step("Verify the default route is received in BGP RIB and FIB") + step( + "After configuring default-originate command , verify default routes are advertised on R2 " + ) + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + } + ] + } + } + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + ) + assert result is True, "Testcase {} : Failed...! Expected default route from R1 not found in FIB \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + ) + assert result is True, "Testcase {} : Failed...! Expected default route from R1 not found in RIB \n Error: {}".format( + tc_name, result + ) + step("Remove route-map RMv4 and RMv6 from default-originate command in R1") + NOTE = """ Configuring the default-originate should remove the previously applied default originate with condtional route-map""" + local_as = get_dut_as_number(tgen, dut="r1") + default_originate_config = { + "r1": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": {"unicast": {"default_originate": {"r2": {}}}}, + "ipv6": {"unicast": {"default_originate": {"r2": {}}}}, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed to remove the default originate conditional route-map \n Error: {}".format(tc_name, result) + + step( + "Verify BGP RIB and FIB After removing route-map , default route still present on R2" + ) + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + ) + assert result is True, "Testcase {} : Failed Default route from R1 is not found in FIB \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + ) + assert result is True, "Testcase {} : Failed Default route from R1 is not found in RIB \n Error: {}".format( + tc_name, result + ) + + step("Configure default-originate with route-map (RMv4 and RMv6) on R1 ") + local_as = get_dut_as_number(tgen, dut="r1") + default_originate_config = { + "r1": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": { + "unicast": { + "default_originate": { + "r2": { + "route_map": "RMv4", + } + } + } + }, + "ipv6": { + "unicast": { + "default_originate": { + "r2": { + "route_map": "RMv6", + } + } + } + }, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed to configure the Default originate route-map \n Error: {}".format(tc_name, result) + + step( + "After configuring default-originate command , verify default routes are advertised on R2 " + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + ) + assert result is True, "Testcase {} : Failed Default Route from R1 is not found in FIB \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + ) + assert result is True, "Testcase {} : Failed Default Route from R1 is not found in RIB \n Error: {}".format( + tc_name, result + ) + + step("Delete prefix list using no prefix-list") + input_dict_3 = { + "r1": { + "prefix_lists": { + "ipv4": { + "Pv4": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv4"], + "action": "permit", + "delete": True, + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv4"], + "action": "permit", + "delete": True, + }, + ] + }, + "ipv6": { + "Pv6": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv6"], + "action": "permit", + "delete": True, + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv6"], + "action": "permit", + "delete": True, + }, + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed to delete the prefix list Error: {}".format(tc_name, result) + + step( + "Verify BGP RIB and FIB After deleting prefix-list , verify IPv4 and IPv6 default route got removed from DUT " + ) + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + } + ] + } + } + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed\n After deleteing prefix default route is not expected in FIB \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n After deleteing prefix default route is not expected in RIB \n Error: {}".format( + tc_name, result + ) + + step("Configure prefix-list and delete route-map using no route-map") + input_dict_3 = { + "r1": { + "prefix_lists": { + "ipv4": { + "Pv4": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv4"], + "action": "permit", + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv4"], + "action": "permit", + }, + ] + }, + "ipv6": { + "Pv6": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv6"], + "action": "permit", + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv6"], + "action": "permit", + }, + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed to configure the prefix lists Error: {}".format(tc_name, result) + + step( + "After configuring the Prefixlist cross checking the BGP Default route is configured again , before deleting the route map" + ) + + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + } + ] + } + } + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + expected=True, + ) + assert result is True, "Testcase {} : Failed Default route from R1 is expected in FIB but not found \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + expected=True, + ) + assert result is True, "Testcase {} : Failed Default route from R1 is expected in RIB but not found \n Error: {}".format( + tc_name, result + ) + + step("Deleting the routemap") + input_dict = {"r1": {"route_maps": ["RMv4", "RMv6"]}} + result = delete_route_maps(tgen, input_dict) + assert result is True, "Testcase {} : Failed to delete the Route-map \n Error: {}".format(tc_name, result) + + step( + "Verify BGP RIB and FIB ,After deleting route-map , verify IPv4 and IPv6 default route got removed from DUT" + ) + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n After deleteing route-map default route is not expected in FIB \nError: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n After deleteing route-map default route is not expected in RIB \n Error: {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_verify_bgp_default_originate_in_EBGP_with_route_map_p0(request): + """ + test_verify_bgp_default_originate_in_EBGP_with_route_map_p0 + """ + tgen = get_topogen() + global BGP_CONVERGENCE + global topo + # test case name + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + check_router_status(tgen) + reset_config_on_routers(tgen) + + if BGP_CONVERGENCE != True: + pytest.skip("skipped because of BGP Convergence failure") + + step("Configure IPv4 and IPv6 , EBGP neighbor between R3 and R2") + step("Configure IPv4 and IPv6 IBGP neighbor between R3 and R4") + r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"] + r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"] + r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"] + r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"] + r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"] + + input_dict = { + "r0": { + "bgp": { + "local_as": r0_local_as, + } + }, + "r1": { + "bgp": { + "local_as": r1_local_as, + } + }, + "r2": { + "bgp": { + "local_as": r2_local_as, + } + }, + "r3": { + "bgp": { + "local_as": 4000, + } + }, + "r4": { + "bgp": { + "local_as": 4000, + } + }, + } + result = modify_as_number(tgen, topo, input_dict) + try: + assert result is True + except AssertionError: + logger.info("Expected behaviour: {}".format(result)) + logger.info("BGP config is not created because of invalid ASNs") + step("After changing the BGP AS Path Verify the BGP Convergence") + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format( + BGP_CONVERGENCE + ) + + step( + "Configure 2 IPv4 and 2 IPv6, Static route on R4 with next-hop as Null0 IPv4 route Sv41, Sv42, IPv6 route Sv61 Sv62" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r4": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + result = create_static_routes(tgen, static_routes_input) + assert result is True, "Testcase {} : Failed to configure the static routes \n Error: {}".format( + tc_name, result + ) + step("verify IPv4 and IPv6 static route are configured and up on R4") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r4": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r4", static_routes_input) + assert result is True, "Testcase {} : Failed Static route {} is not found in R4 FIB \n Error: {}".format( + tc_name, static_routes_input,result + ) + + step( + "Configure redistribute static on IPv4 and IPv6 address family on R4 for R4 to R3 neighbo" + ) + redistribute_static = { + "r4": { + "bgp": { + "address_family": { + "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, + "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, + } + } + } + } + result = create_router_bgp(tgen, topo, redistribute_static) + assert result is True, "Testcase {} : Failed to configure the redistribute static \n Error: {}".format(tc_name, result) + + step("verify IPv4 and IPv6 static route are configured and up on R3") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r3": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r3", static_routes_input) + assert result is True, "Testcase {} : Failed static routes from R1 and R3 is not found in FIB \n Error: {}".format( + tc_name, result + ) + result = verify_bgp_rib(tgen, addr_type, "r3", static_routes_input) + assert result is True, "Testcase {} : Failed static routes from R1 and R3 is not found in RIB \n Error: {}".format( + tc_name, result + ) + + step( + "Configure IPv4 prefix-list Pv4 and and IPv6 prefix-list Pv6 on R3 so new route which is not present on R3" + ) + input_dict_3 = { + "r3": { + "prefix_lists": { + "ipv4": { + "Pv4": [ + { + "seqid": "1", + "network": NETWORK3_1["ipv4"], + "action": "permit", + } + ] + }, + "ipv6": { + "Pv6": [ + { + "seqid": "1", + "network": NETWORK3_1["ipv6"], + "action": "permit", + } + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed to configure the prefix lists \n Error: {}".format(tc_name, result) + + step("verify IPv4 and IPv6 Prefix list got configured on R3") + input_dict = {"r3": {"prefix_lists": ["Pv4", "Pv6"]}} + result = verify_prefix_lists(tgen, input_dict) + assert result is True, "Testcase {} : Failed ..! configured prefix lists {} are not found \n Error: {}".format(tc_name,input_dict, result) + + step( + "Configure IPv4 and IPv6 route-map ( RMv4 and RMv6 ) matching prefix-list (Pv4 and Pv6 ) respectively on R3" + ) + input_dict_3 = { + "r3": { + "route_maps": { + "RMv4": [ + { + "action": "permit", + "seq_id": "1", + "match": {"ipv4": {"prefix_lists": "Pv4"}}, + }, + ], + "RMv6": [ + { + "action": "permit", + "seq_id": "1", + "match": {"ipv6": {"prefix_lists": "Pv6"}}, + }, + ], + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed to configure the route-map \n Error: {}".format(tc_name, result) + step( + "Taking the snapshot of the prefix count before configuring the default originate" + ) + snapshot1 = get_prefix_count_route(tgen, topo, dut="r2", peer="r3") + step( + "Configure default-originate with IPv4 and IPv6 route-map (RMv4 and RMv6) on R3" + ) + local_as = get_dut_as_number(tgen, dut="r3") + default_originate_config = { + "r3": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": { + "unicast": {"default_originate": {"r2": {"route_map": "RMv4"}}} + }, + "ipv6": { + "unicast": {"default_originate": {"r2": {"route_map": "RMv6"}}} + }, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed to configure default-originate \n Error: {}".format(tc_name, result) + + step("Verify the default route is NOT received in BGP RIB and FIB on R2 ") + step( + "After configuring default-originate command , verify default routes are not Received on R2 " + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n Default route is not expected due to deny in prefix list \nError: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \nDefault route is not expected due to deny in prefix list\n Error: {}".format( + tc_name, result + ) + + step("Add route Sv41, Sv42, IPv6 route Sv61 Sv62 on prefix list Pv4 and Pv6") + input_dict_3 = { + "r3": { + "prefix_lists": { + "ipv4": { + "Pv4": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv4"], + "action": "permit", + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv4"], + "action": "permit", + }, + ] + }, + "ipv6": { + "Pv6": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv6"], + "action": "permit", + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv6"], + "action": "permit", + }, + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed to configure the prefix lists Error: {}".format(tc_name, result) + + step("Verify BGP default route for IPv4 and IPv6 is received on R2") + + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed Default routes are expected in R2 FIB from R3 but not found ....! \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed Default routes are expected in R2 RIB from R3 but not found ....! \n Error: {}".format( + tc_name, result + ) + + step("Remove route Sv41, Sv42, IPv6 route Sv61 Sv62 on prefix list Pv4 and Pv6") + input_dict_3 = { + "r3": { + "prefix_lists": { + "ipv4": { + "Pv4": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv4"], + "action": "permit", + "delete": True, + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv4"], + "action": "permit", + "delete": True, + }, + ] + }, + "ipv6": { + "Pv6": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv6"], + "action": "permit", + "delete": True, + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv6"], + "action": "permit", + "delete": True, + }, + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed to remove prefix-lists from R3 Error: {}".format(tc_name, result) + + step( + "After Removing route BGP default route for IPv4 and IPv6 is NOT received on R2" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n After Removing route in prefix list the default route is not expected in FIB \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n After Removing route in prefix list the default route is not expected in RIB\n Error: {}".format( + tc_name, result + ) + + step(" Add route Sv41, Sv42, IPv6 route Sv61 Sv62 on prefix list Pv4 and Pv6") + input_dict_3 = { + "r3": { + "prefix_lists": { + "ipv4": { + "Pv4": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv4"], + "action": "permit", + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv4"], + "action": "permit", + }, + ] + }, + "ipv6": { + "Pv6": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv6"], + "action": "permit", + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv6"], + "action": "permit", + }, + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result) + + step("Verify BGP default route for IPv4 and IPv6 is received on R2") + + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Change IPv4 and IPv6 prefix-list permit and deny ") + input_dict_3 = { + "r3": { + "prefix_lists": { + "ipv4": { + "Pv4": [ + {"seqid": "1", "network": NETWORK1_1["ipv4"], "action": "deny"}, + {"seqid": "2", "network": NETWORK2_1["ipv4"], "action": "deny"}, + ] + }, + "ipv6": { + "Pv6": [ + {"seqid": "1", "network": NETWORK1_1["ipv6"], "action": "deny"}, + {"seqid": "2", "network": NETWORK2_1["ipv6"], "action": "deny"}, + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result) + + step("Verify BGP default route for IPv4 and IPv6 is not received on R2") + + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n after denying the prefix list default route is not expected in FIB \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n after denying the prefix list default route is not expected in RIB \n Error: {}".format( + tc_name, result + ) + + step("Change IPv4 and IPv6 prefix-list deny to permit ") + input_dict_3 = { + "r3": { + "prefix_lists": { + "ipv4": { + "Pv4": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv4"], + "action": "permit", + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv4"], + "action": "permit", + }, + ] + }, + "ipv6": { + "Pv6": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv6"], + "action": "permit", + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv6"], + "action": "permit", + }, + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result) + + step("Verify BGP default route for IPv4 and IPv6 is received on R2") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "Taking the snapshot2 of the prefix count after configuring the default originate" + ) + snapshot2 = get_prefix_count_route(tgen, topo, dut="r2", peer="r3") + + step("verifying the prefix count incrementing or not ") + isIPv4prefix_incremented = False + isIPv6prefix_incremented = False + if snapshot1["ipv4_count"] < snapshot2["ipv4_count"]: + isIPv4prefix_incremented = True + if snapshot1["ipv6_count"] < snapshot2["ipv6_count"]: + isIPv6prefix_incremented = True + + assert ( + isIPv4prefix_incremented is True + ), "Testcase {} : Failed Error: IPV4 Prefix is not incremented on receiveing ".format( + tc_name + ) + + assert ( + isIPv6prefix_incremented is True + ), "Testcase {} : Failed Error: IPV6 Prefix is not incremented on receiveing ".format( + tc_name + ) + + step( + "Configure another IPv4 and IPv6 route-map and match same prefix-list (Sv41, Sv42, IPv6 route Sv61 Sv62) with deny statement " + ) + input_dict_3 = { + "r3": { + "route_maps": { + "RMv41": [ + { + "action": "deny", + "seq_id": "1", + "match": {"ipv4": {"prefix_lists": "Pv4"}}, + }, + ], + "RMv61": [ + { + "action": "deny", + "seq_id": "1", + "match": {"ipv6": {"prefix_lists": "Pv6"}}, + }, + ], + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Attach route-map on IPv4 and IP6 BGP neighbor on fly") + local_as = get_dut_as_number(tgen, dut="r3") + default_originate_config = { + "r3": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": { + "unicast": {"default_originate": {"r2": {"route_map": "RMv41"}}} + }, + "ipv6": { + "unicast": {"default_originate": {"r2": {"route_map": "RMv61"}}} + }, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "After attaching route-map verify IPv4 and IPv6 default route is withdrawn from the R2" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + expected=False, + ) + assert result is not True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + expected=False, + ) + assert result is not True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Change the recently added Routemap from deny to permit") + input_dict_3 = { + "r3": { + "route_maps": { + "RMv41": [ + { + "action": "permit", + "seq_id": "1", + "match": {"ipv4": {"prefix_lists": "Pv4"}}, + }, + ], + "RMv61": [ + { + "action": "permit", + "seq_id": "1", + "match": {"ipv6": {"prefix_lists": "Pv6"}}, + }, + ], + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Verify IPv4 and IPv6 default route is advertised from the R2") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "Delete default-originate route-map command while configuring ( neighbor x.x.x default-originate) for IPv4 and IPv6 BGP neighbor " + ) + """ Configuring the Default originate on neighbor must remove the previously assigned deault-originate with routemap config """ + local_as = get_dut_as_number(tgen, dut="r3") + default_originate_config = { + "r3": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": {"unicast": {"default_originate": {"r2": {}}}}, + "ipv6": {"unicast": {"default_originate": {"r2": {}}}}, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "Verify in running config from BGP that default-originate with route-map command is removed and default-originate command is still present and default route for IPv4 and IPv6 present in RIB and FIB" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "Configure default-originate with conditional route-map command on IPv4 and IPv6 address family " + ) + local_as = get_dut_as_number(tgen, dut="r3") + default_originate_config = { + "r3": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": { + "unicast": {"default_originate": {"r2": {"route_map": "RMv41"}}} + }, + "ipv6": { + "unicast": {"default_originate": {"r2": {"route_map": "RMv61"}}} + }, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "Verify in running config from BGP that default-originate with route-map command is present and default route for IPv4 and IPv6 present" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "Delete default originate with 'no bgp default-originate' from IPV4 and IPV6 address family " + ) + local_as = get_dut_as_number(tgen, dut="r3") + default_originate_config = { + "r3": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": { + "unicast": {"default_originate": {"r2": {"delete": True}}} + }, + "ipv6": { + "unicast": {"default_originate": {"r2": {"delete": True}}} + }, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + " Verify in running config from BGP that default-originate complete CLI is removed for IPV4 and IPV6 address family and default originate routes got deleted" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n Default Route is not expected in FIB \nError: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n Default Route is not expected in RIB\nError: {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) diff --git a/tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_2.py b/tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_2.py new file mode 100644 index 000000000..a9987a8f9 --- /dev/null +++ b/tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_2.py @@ -0,0 +1,2437 @@ +#!/usr/bin/env python +# +# Copyright (c) 2022 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF") +# in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# Shreenidhi A R <rshreenidhi@vmware.com> +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# +""" +Following tests are covered. +5. Verify BGP default originate route-map with OUT route-map +6. Verify BGP default originate route-map with IN route-map +8. Verify BGP default route after removing default-originate +9. Verify default-originate route with GR +""" +import os +import sys +import time +import pytest +from time import sleep +from copy import deepcopy +from lib.topolog import logger + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from lib.topogen import Topogen, get_topogen +from lib.topojson import build_config_from_json +from lib.topolog import logger + +from lib.bgp import ( + verify_bgp_convergence, + verify_graceful_restart, + create_router_bgp, + verify_router_id, + modify_as_number, + verify_as_numbers, + clear_bgp_and_verify, + clear_bgp, + verify_bgp_rib, + get_prefix_count_route, + get_dut_as_number, + verify_rib_default_route, + verify_fib_default_route, + verify_bgp_advertised_routes_from_neighbor, + verify_bgp_received_routes_from_neighbor, +) +from lib.common_config import ( + interface_status, + verify_prefix_lists, + verify_fib_routes, + kill_router_daemons, + start_router_daemons, + shutdown_bringup_interface, + step, + required_linux_kernel_version, + stop_router, + start_router, + create_route_maps, + create_prefix_lists, + get_frr_ipv6_linklocal, + start_topology, + write_test_header, + check_address_types, + write_test_footer, + reset_config_on_routers, + create_static_routes, + check_router_status, + delete_route_maps, +) + + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) +sys.path.append(os.path.join(CWD, "../lib/")) + +# Required to instantiate the topology builder class. + +# pylint: disable=C0413 +# Import topogen and topotest helpers + +# Global variables +topo = None +KEEPALIVETIMER = 1 +HOLDDOWNTIMER = 3 +# Global variables +NETWORK1_1 = {"ipv4": "1.1.1.1/32", "ipv6": "1::1/128"} +NETWORK1_2 = {"ipv4": "1.1.1.2/32", "ipv6": "1::2/128"} +NETWORK2_1 = {"ipv4": "2.1.1.1/32", "ipv6": "2::1/128"} +NETWORK2_2 = {"ipv4": "2.1.1.2/32", "ipv6": "2::2/128"} +NETWORK3_1 = {"ipv4": "3.1.1.1/32", "ipv6": "3::1/128"} +NETWORK3_2 = {"ipv4": "3.1.1.2/32", "ipv6": "3::2/128"} +NETWORK4_1 = {"ipv4": "4.1.1.1/32", "ipv6": "4::1/128"} +NETWORK4_2 = {"ipv4": "4.1.1.2/32", "ipv6": "4::2/128"} +NETWORK5_1 = {"ipv4": "5.1.1.1/32", "ipv6": "5::1/128"} +NETWORK5_2 = {"ipv4": "5.1.1.2/32", "ipv6": "5::2/128"} +DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} +NEXT_HOP_IP = {"ipv4": "Null0", "ipv6": "Null0"} + +IPV4_RM = "RMVIPV4" +IPV6_RM = "RMVIPV6" + +IPV4_RM1 = "RMVIPV41" +IPV6_RM1 = "RMVIPV61" + +IPV4_RM2 = "RMVIPV42" +IPV6_RM2 = "RMVIPV62" + +IPV4_PL_1 = "PV41" +IPV4_PL_2 = "PV42" + +IPV6_PL_1 = "PV61" +IPV6_PL_2 = "PV62" + + +r1_ipv4_loopback = "1.0.1.0/24" +r2_ipv4_loopback = "1.0.2.0/24" +r3_ipv4_loopback = "1.0.3.0/24" +r4_ipv4_loopback = "1.0.4.0/24" +r1_ipv6_loopback = "2001:db8:f::1:0/120" +r2_ipv6_loopback = "2001:db8:f::2:0/120" +r3_ipv6_loopback = "2001:db8:f::3:0/120" +r4_ipv6_loopback = "2001:db8:f::4:0/120" + +r0_connected_address_ipv4 = "192.168.0.0/24" +r0_connected_address_ipv6 = "fd00::/64" +r1_connected_address_ipv4 = "192.168.1.0/24" +r1_connected_address_ipv6 = "fd00:0:0:1::/64" +r3_connected_address_ipv4 = "192.168.2.0/24" +r3_connected_address_ipv6 = "fd00:0:0:2::/64" +r4_connected_address_ipv4 = "192.168.3.0/24" +r4_connected_address_ipv6 = "fd00:0:0:3::/64" + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + + # Required linux kernel version for this suite to run. + result = required_linux_kernel_version("4.15") + if result is not True: + pytest.skip("Kernel requirements are not met") + + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("=" * 40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + json_file = "{}/bgp_default_originate_topo1.json".format(CWD) + tgen = Topogen(json_file, mod.__name__) + global topo + topo = tgen.json_topo + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start daemons and then start routers + start_topology(tgen) + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + global ADDR_TYPES + global BGP_CONVERGENCE + global DEFAULT_ROUTES + global DEFAULT_ROUTE_NXT_HOP_R1, DEFAULT_ROUTE_NXT_HOP_R3 + global R0_NETWORK_LOOPBACK, R0_NETWORK_LOOPBACK_NXTHOP, R1_NETWORK_LOOPBACK, R1_NETWORK_LOOPBACK_NXTHOP + global R0_NETWORK_CONNECTED, R0_NETWORK_CONNECTED_NXTHOP, R1_NETWORK_CONNECTED, R1_NETWORK_CONNECTED_NXTHOP + global R4_NETWORK_LOOPBACK, R4_NETWORK_LOOPBACK_NXTHOP, R3_NETWORK_LOOPBACK, R3_NETWORK_LOOPBACK_NXTHOP + global R4_NETWORK_CONNECTED, R4_NETWORK_CONNECTED_NXTHOP, R3_NETWORK_CONNECTED, R3_NETWORK_CONNECTED_NXTHOP + + ADDR_TYPES = check_address_types() + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format( + BGP_CONVERGENCE + ) + # There are the global varibles used through out the file these are acheived only after building the topology. + + r0_loopback_address_ipv4 = topo["routers"]["r0"]["links"]["lo"]["ipv4"] + r0_loopback_address_ipv4_nxt_hop = topo["routers"]["r0"]["links"]["r1"][ + "ipv4" + ].split("/")[0] + r0_loopback_address_ipv6 = topo["routers"]["r0"]["links"]["lo"]["ipv6"] + r0_loopback_address_ipv6_nxt_hop = topo["routers"]["r0"]["links"]["r1"][ + "ipv6" + ].split("/")[0] + + r1_loopback_address_ipv4 = topo["routers"]["r1"]["links"]["lo"]["ipv4"] + r1_loopback_address_ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"][ + "ipv4" + ].split("/")[0] + r1_loopback_address_ipv6 = topo["routers"]["r1"]["links"]["lo"]["ipv6"] + r1_loopback_address_ipv6_nxt_hop = topo["routers"]["r1"]["links"]["r2"][ + "ipv6" + ].split("/")[0] + + r4_loopback_address_ipv4 = topo["routers"]["r4"]["links"]["lo"]["ipv4"] + r4_loopback_address_ipv4_nxt_hop = topo["routers"]["r4"]["links"]["r3"][ + "ipv4" + ].split("/")[0] + r4_loopback_address_ipv6 = topo["routers"]["r4"]["links"]["lo"]["ipv6"] + r4_loopback_address_ipv6_nxt_hop = topo["routers"]["r4"]["links"]["r3"][ + "ipv6" + ].split("/")[0] + + r3_loopback_address_ipv4 = topo["routers"]["r3"]["links"]["lo"]["ipv4"] + r3_loopback_address_ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"][ + "ipv4" + ].split("/")[0] + r3_loopback_address_ipv6 = topo["routers"]["r3"]["links"]["lo"]["ipv6"] + r3_loopback_address_ipv6_nxt_hop = topo["routers"]["r3"]["links"]["r2"][ + "ipv6" + ].split("/")[0] + + R0_NETWORK_LOOPBACK = { + "ipv4": r0_loopback_address_ipv4, + "ipv6": r0_loopback_address_ipv6, + } + R0_NETWORK_LOOPBACK_NXTHOP = { + "ipv4": r0_loopback_address_ipv4_nxt_hop, + "ipv6": r0_loopback_address_ipv6_nxt_hop, + } + + R1_NETWORK_LOOPBACK = { + "ipv4": r1_loopback_address_ipv4, + "ipv6": r1_loopback_address_ipv6, + } + R1_NETWORK_LOOPBACK_NXTHOP = { + "ipv4": r1_loopback_address_ipv4_nxt_hop, + "ipv6": r1_loopback_address_ipv6_nxt_hop, + } + + R0_NETWORK_CONNECTED = { + "ipv4": r0_connected_address_ipv4, + "ipv6": r0_connected_address_ipv6, + } + R0_NETWORK_CONNECTED_NXTHOP = { + "ipv4": r0_loopback_address_ipv4_nxt_hop, + "ipv6": r0_loopback_address_ipv6_nxt_hop, + } + + R1_NETWORK_CONNECTED = { + "ipv4": r1_connected_address_ipv4, + "ipv6": r1_connected_address_ipv6, + } + R1_NETWORK_CONNECTED_NXTHOP = { + "ipv4": r1_loopback_address_ipv4_nxt_hop, + "ipv6": r1_loopback_address_ipv6_nxt_hop, + } + + R4_NETWORK_LOOPBACK = { + "ipv4": r4_loopback_address_ipv4, + "ipv6": r4_loopback_address_ipv6, + } + R4_NETWORK_LOOPBACK_NXTHOP = { + "ipv4": r4_loopback_address_ipv4_nxt_hop, + "ipv6": r4_loopback_address_ipv6_nxt_hop, + } + + R3_NETWORK_LOOPBACK = { + "ipv4": r3_loopback_address_ipv4, + "ipv6": r3_loopback_address_ipv6, + } + R3_NETWORK_LOOPBACK_NXTHOP = { + "ipv4": r3_loopback_address_ipv4_nxt_hop, + "ipv6": r3_loopback_address_ipv6_nxt_hop, + } + + R4_NETWORK_CONNECTED = { + "ipv4": r4_connected_address_ipv4, + "ipv6": r4_connected_address_ipv6, + } + R4_NETWORK_CONNECTED_NXTHOP = { + "ipv4": r4_loopback_address_ipv4_nxt_hop, + "ipv6": r4_loopback_address_ipv6_nxt_hop, + } + + R3_NETWORK_CONNECTED = { + "ipv4": r3_connected_address_ipv4, + "ipv6": r3_connected_address_ipv6, + } + R3_NETWORK_CONNECTED_NXTHOP = { + "ipv4": r3_loopback_address_ipv4_nxt_hop, + "ipv6": r3_loopback_address_ipv6_nxt_hop, + } + + # populating the nexthop for default routes + + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + + interface = topo["routers"]["r1"]["links"]["r2"]["interface"] + ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r1", intf=interface) + ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0] + ipv6_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv6"].split("/")[0] + DEFAULT_ROUTE_NXT_HOP_R1 = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local} + + interface = topo["routers"]["r3"]["links"]["r2"]["interface"] + ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r3", intf=interface) + ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv4"].split("/")[0] + ipv6_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv6"].split("/")[0] + DEFAULT_ROUTE_NXT_HOP_R3 = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local} + + logger.info("Running setup_module() done") + + +def teardown_module(): + """Teardown the pytest environment""" + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + logger.info( + "Testsuite end time: {}".format(time.asctime(time.localtime(time.time()))) + ) + logger.info("=" * 40) + + +##################################################### +# +# Local API's +# +##################################################### + + +def configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut, peer): + """ + This function groups the repetitive function calls into one function. + """ + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + return True + + +##################################################### +# +# Testcases +# +##################################################### + + +def test_verify_bgp_default_originate_route_map_in_OUT_p1(request): + """ + test_verify_bgp_default_originate_route_map_in_OUT_p1 + """ + tgen = get_topogen() + global BGP_CONVERGENCE + global topo + # test case name + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + check_router_status(tgen) + reset_config_on_routers(tgen) + + if BGP_CONVERGENCE != True: + pytest.skip("skipped because of BGP Convergence failure") + + step("Configure IPv4 and IPv6 , EBGP neighbor between R3 and R2") + step("Configure IPv4 and IPv6 IBGP neighbor between R3 and R4") + r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"] + r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"] + r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"] + r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"] + r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"] + input_dict = { + "r0": { + "bgp": { + "local_as": r0_local_as, + } + }, + "r1": { + "bgp": { + "local_as": r1_local_as, + } + }, + "r2": { + "bgp": { + "local_as": r2_local_as, + } + }, + "r3": { + "bgp": { + "local_as": 4000, + } + }, + "r4": { + "bgp": { + "local_as": 4000, + } + }, + } + result = modify_as_number(tgen, topo, input_dict) + try: + assert result is True + except AssertionError: + logger.info("Expected behaviour: {}".format(result)) + logger.info("BGP config is not created because of invalid ASNs") + step("After changing the BGP AS Path Verify the BGP Convergence") + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format( + BGP_CONVERGENCE + ) + + step( + "Configure 2 IPv4 and 2 IPv6, Static route on R4 with next-hop as Null0 IPv4 route Sv41, Sv42, IPv6 route Sv61 Sv62" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r4": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + result = create_static_routes(tgen, static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("verify IPv4 and IPv6 static route are configured and up on R4") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r4": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r4", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Configure redistribute static knob on R4 , for R4 to R3 neighbor ") + redistribute_static = { + "r4": { + "bgp": { + "address_family": { + "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, + "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, + } + } + } + } + result = create_router_bgp(tgen, topo, redistribute_static) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + expected_routes = { + "ipv4": [ + {"network": NETWORK1_1["ipv4"], "nexthop": NEXT_HOP_IP["ipv4"]}, + {"network": NETWORK2_1["ipv4"], "nexthop": NEXT_HOP_IP["ipv4"]}, + ], + "ipv6": [ + {"network": NETWORK1_1["ipv6"], "nexthop": NEXT_HOP_IP["ipv4"]}, + {"network": NETWORK2_1["ipv6"], "nexthop": NEXT_HOP_IP["ipv4"]}, + ], + } + result = verify_bgp_advertised_routes_from_neighbor( + tgen, topo, dut="r4", peer="r3", expected_routes=expected_routes + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "After redistribute static verify the routes is recevied in router R3 in RIB and FIB" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r3": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r3", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + result = verify_bgp_rib(tgen, addr_type, "r3", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "Configure IPv4 prefix-list Pv4 and and IPv6 prefix-list Pv6 on R3 to match BGP route Sv41, IPv6 route Sv61 with permit option " + ) + input_dict_3 = { + "r3": { + "prefix_lists": { + "ipv4": { + "Pv4": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv4"], + "action": "permit", + } + ] + }, + "ipv6": { + "Pv6": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv6"], + "action": "permit", + } + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("verify IPv4 and IPv6 Prefix list got configured on R3") + input_dict = {"r3": {"prefix_lists": ["Pv4", "Pv6"]}} + result = verify_prefix_lists(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "Configure IPv4 and IPv6 route-map RMv4 and RMv6 matching prefix-list Pv4 and Pv6 with permit option " + ) + input_dict_3 = { + "r3": { + "route_maps": { + "RM4": [ + { + "action": "permit", + "seq_id": "1", + "match": {"ipv4": {"prefix_lists": "Pv4"}}, + }, + ], + "RM6": [ + { + "action": "permit", + "seq_id": "1", + "match": {"ipv6": {"prefix_lists": "Pv6"}}, + }, + ], + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "Configure IPv4 prefix-list Pv42 and and IPv6 prefix-list Pv62 on R3 to match BGP route Sv42, IPv6 route Sv62 with deny option" + ) + input_dict_3 = { + "r3": { + "prefix_lists": { + "ipv4": { + "Pv42": [ + {"seqid": "1", "network": NETWORK2_1["ipv4"], "action": "deny"} + ] + }, + "ipv6": { + "Pv62": [ + {"seqid": "1", "network": NETWORK2_1["ipv6"], "action": "deny"} + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("verify IPv4 and IPv6 Prefix list got configured on R3") + input_dict = {"r3": {"prefix_lists": ["Pv42", "Pv62"]}} + result = verify_prefix_lists(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "Configure IPv4 and IPv6 route-map (RMv42 and RMv62 )matching prefix-list Pv42 and Pv62 with permit option " + ) + input_dict_3 = { + "r3": { + "route_maps": { + "RMv42": [ + { + "action": "permit", + "seq_id": "1", + "match": {"ipv4": {"prefix_lists": "Pv42"}}, + }, + ], + "RMv62": [ + { + "action": "permit", + "seq_id": "1", + "match": {"ipv6": {"prefix_lists": "Pv62"}}, + }, + ], + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "Apply IPv4 and IPv6 route-map RMv4 and RMv6 with default-originate on R3 , for R3 to R2 peers and Apply IPv4 and IPv6 out route-map RMv42 and RMv62 on R3 , for R3 to R2 peers " + ) + local_as = get_dut_as_number(tgen, "r3") + default_originate_config = { + "r3": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": { + "unicast": {"default_originate": {"r2": {"route_map": "RM4"}}} + }, + "ipv6": { + "unicast": {"default_originate": {"r2": {"route_map": "RM6"}}} + }, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + updated_topo = topo + updated_topo["routers"]["r0"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r0") + updated_topo["routers"]["r1"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r1") + updated_topo["routers"]["r2"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r2") + updated_topo["routers"]["r3"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r3") + updated_topo["routers"]["r4"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r4") + + step( + "Apply IPv4 and IPv6 route-map RMv42 and RMv62 on R3 (OUT Direction), for R3 to R2 peers " + ) + input_dict_4 = { + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": { + "route_maps": [ + {"name": "RMv42", "direction": "out"} + ] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": { + "route_maps": [ + {"name": "RMv62", "direction": "out"} + ] + } + } + } + } + } + }, + } + } + } + } + + result = create_router_bgp(tgen, updated_topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + NOTE = """ + After applying route-map on neighbor verify default BGP route IPv4 IPv6 route populated in R2 BGP and routing table , verify using "show ip bgp json" "show ipv6 bgp json" "show ip route json" "show ip route json" + Sv42 and Sv62 route should not be present on R2 + """ + step(NOTE) + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + + result = verify_fib_routes( + tgen, addr_type, "r2", static_routes_input, expected=False + ) + assert ( + result is not True + ), "Testcase {} : Failed \n Static routes are not expected due to conditions \nError: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, addr_type, "r2", static_routes_input, expected=False + ) + assert ( + result is not True + ), "Testcase {} : Failed \n Static routes are not expected due to conditions\n Error: {}".format( + tc_name, result + ) + + step("Change IPv4 prefix-list Pv42 and and IPv6 prefix-list Pv62 deny to permit") + input_dict_3 = { + "r3": { + "prefix_lists": { + "ipv4": { + "Pv42": [ + { + "seqid": "1", + "network": NETWORK2_1["ipv4"], + "action": "permit", + } + ] + }, + "ipv6": { + "Pv62": [ + { + "seqid": "1", + "network": NETWORK2_1["ipv6"], + "action": "permit", + } + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("verify IPv4 and IPv6 Prefix list got configured on R3") + input_dict = {"r3": {"prefix_lists": ["Pv42", "Pv62"]}} + result = verify_prefix_lists(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + NOTE = """Default BGP route and IPv4 ( Sv42) , IPv6 (Sv62) route populated in R2 BGP and routing table""" + step(NOTE) + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + } + ] + } + } + + result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("IPv4 prefix-list Pv4 and and IPv6 prefix-list Pv6 permit to deny ") + input_dict_3 = { + "r3": { + "prefix_lists": { + "ipv4": { + "Pv4": [ + {"seqid": "1", "network": NETWORK1_1["ipv4"], "action": "deny"} + ] + }, + "ipv6": { + "Pv6": [ + {"seqid": "1", "network": NETWORK1_1["ipv6"], "action": "deny"} + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + NOTE = """ + Verify default-originate route (IPv4 and IPv6 ) not present on R2 + IPv4 ( Sv42) , IPv6 (Sv62) route populated in R2 BGP + """ + step(NOTE) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + } + ] + } + } + + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n default-route in FIB is not expected due to conditions \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type], + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n default-route in RIB is not expected due to conditions \n Error: {}".format( + tc_name, result + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + } + ] + } + } + + result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + +def test_verify_bgp_default_originate_route_map_in_IN_p1(request): + """Verify BGP default originate route-map with IN route-map""" + tgen = get_topogen() + global BGP_CONVERGENCE + global topo + # test case name + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + check_router_status(tgen) + reset_config_on_routers(tgen) + + if BGP_CONVERGENCE != True: + pytest.skip("skipped because of BGP Convergence failure") + + step("Configure IPv4 and IPv6 , EBGP neighbor between R1 and R2") + step("Configure IPv4 and IPv6 , IBGP neighbor between R1 and R0") + r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"] + r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"] + r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"] + r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"] + r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"] + input_dict = { + "r0": { + "bgp": { + "local_as": 1000, + } + }, + "r1": { + "bgp": { + "local_as": 1000, + } + }, + "r2": { + "bgp": { + "local_as": r2_local_as, + } + }, + "r3": { + "bgp": { + "local_as": r3_local_as, + } + }, + "r4": { + "bgp": { + "local_as": r4_local_as, + } + }, + } + result = modify_as_number(tgen, topo, input_dict) + try: + assert result is True + except AssertionError: + logger.info("Expected behaviour: {}".format(result)) + logger.info("BGP config is not created because of invalid ASNs") + step("After changing the BGP AS Path Verify the BGP Convergence") + + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format( + BGP_CONVERGENCE + ) + + step( + "Configure 2 IPv4 and 2 IPv6, Static route on R0 with next-hop as Null0 IPv4 route Sv41, Sv42, IPv6 route Sv61 Sv62" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r0": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + result = create_static_routes(tgen, static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("verifyIPv4 and IPv6 static routes are configure and up on R0 ") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r0": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r0", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "Configure redistribute static knob on R0 , for R0 to R1 IPv4 and IPv6 neighbor" + ) + redistribute_static = { + "r0": { + "bgp": { + "address_family": { + "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, + "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, + } + } + } + } + result = create_router_bgp(tgen, topo, redistribute_static) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Verify IPv4 and IPv6 route received on R1 ") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r1": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r1", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib(tgen, addr_type, "r1", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "Configure IPv4 prefix-list Pv4 and and IPv6 prefix-list Pv6 on R1 to match BGP route Sv41, Sv42, IPv6 route Sv61 Sv62" + ) + input_dict_3 = { + "r1": { + "prefix_lists": { + "ipv4": { + "Pv4": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv4"], + "action": "permit", + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv4"], + "action": "permit", + }, + ] + }, + "ipv6": { + "Pv6": [ + { + "seqid": "1", + "network": NETWORK1_1["ipv6"], + "action": "permit", + }, + { + "seqid": "2", + "network": NETWORK2_1["ipv6"], + "action": "permit", + }, + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("verify IPv4 and IPv6 Prefix list got configured on R1") + input_dict = {"r1": {"prefix_lists": ["Pv4", "Pv6"]}} + result = verify_prefix_lists(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "Configure IPv4 and IPv6 route-map RMv4 and RMv6 matching prefix-list Pv4 and Pv6 with deny option on R1" + ) + input_dict_3 = { + "r1": { + "route_maps": { + "RMv4": [ + { + "action": "deny", + "seq_id": "1", + "match": {"ipv4": {"prefix_lists": "Pv4"}}, + }, + ], + "RMv6": [ + { + "action": "deny", + "seq_id": "1", + "match": {"ipv6": {"prefix_lists": "Pv6"}}, + }, + ], + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Apply route-map IN direction in R1 (R1 to R0) IPv4 and IPv6 neighbor") + updated_topo = topo + updated_topo["routers"]["r0"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r0") + updated_topo["routers"]["r1"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r1") + updated_topo["routers"]["r2"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r2") + updated_topo["routers"]["r3"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r3") + updated_topo["routers"]["r4"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r4") + + local_as_r1 = get_dut_as_number(tgen, dut="r1") + input_dict_4 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r0": { + "dest_link": { + "r1": { + "route_maps": [ + { + "name": "RMv4", + "direction": "in", + } + ] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r0": { + "dest_link": { + "r1": { + "route_maps": [ + { + "name": "RMv6", + "direction": "in", + } + ] + } + } + } + } + } + }, + } + } + } + } + + result = create_router_bgp(tgen, updated_topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + STEP = "After applying route-map verify that IPv4 route Sv41, Sv42, IPv6 route Sv61 Sv62 should not present on R1 BGP and routing table " + step(STEP) + + step( + "After applying route-map verify that IPv4 route Sv41, Sv42, IPv6 route Sv61 Sv62 should not present on R1 " + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r1": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + + result = verify_fib_routes( + tgen, addr_type, "r1", static_routes_input, expected=False + ) + assert ( + result is not True + ), "Testcase {} : Failed \n default-route in FIB is not expected due to conditions \nError: {}".format( + tc_name, result + ) + + result = verify_bgp_rib( + tgen, addr_type, "r1", static_routes_input, expected=False + ) + assert ( + result is not True + ), "Testcase {} : Failed \n default-route in FIB is not expected due to conditions \nError: {}".format( + tc_name, result + ) + # Routes should come to dut but not the shown in RIB thus verifying using show ip bgp nbr xxx received route + step( + " Verify the received routes \n using 'show ip bgp nbr xxx received route' in Router R1" + ) + expected_routes = { + "ipv4": [ + {"network": NETWORK1_1["ipv4"], "nexthop": NEXT_HOP_IP["ipv4"]}, + {"network": NETWORK2_1["ipv4"], "nexthop": NEXT_HOP_IP["ipv4"]}, + ], + "ipv6": [ + {"network": NETWORK1_1["ipv6"], "nexthop": NEXT_HOP_IP["ipv6"]}, + {"network": NETWORK2_1["ipv6"], "nexthop": NEXT_HOP_IP["ipv6"]}, + ], + } + result = verify_bgp_received_routes_from_neighbor( + tgen, topo, dut="r1", peer="r0", expected_routes=expected_routes + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Configure default-originate on R1 for R1 to R2 IPv4 and IPv6 neighbor ") + local_as_r1 = get_dut_as_number(tgen, dut="r1") + default_originate_config = { + "r1": { + "bgp": { + "local_as": local_as_r1, + "address_family": { + "ipv4": {"unicast": {"default_originate": {"r2": {}}}}, + "ipv6": {"unicast": {"default_originate": {"r2": {}}}}, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "Verify Default originate knob is configured and default route advertised to R2 , verify on R1 " + ) + expected_routes = { + "ipv4": [ + {"network": "0.0.0.0/0", "nexthop": ""}, + ], + "ipv6": [ + {"network": "::/0", "nexthop": ""}, + ], + } + result = verify_bgp_advertised_routes_from_neighbor( + tgen, topo, dut="r1", peer="r2", expected_routes=expected_routes + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Verify the Default route Route in FIB in R2") + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + + for addr_type in ADDR_TYPES: + static_routes_input = { + "r1": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + } + ] + } + } + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Change route-map RMv4 and RMv6 from deny to permit") + input_dict_3 = { + "r1": { + "route_maps": { + "RMv4": [ + { + "action": "permit", + "seq_id": "1", + "match": {"ipv4": {"prefix_lists": "Pv4"}}, + }, + ], + "RMv6": [ + { + "action": "permit", + "seq_id": "1", + "match": {"ipv6": {"prefix_lists": "Pv6"}}, + }, + ], + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + NOTE = """After changing route-map to permit verify that IPv4 routes Sv41, Sv42, IPv6 routes Sv61 Sv62 present on R1 BGP and routing table , using "show ip route " "show ip bgp nbr xxx received route " "show ipv6 route " "show ipv6 bgp nbr xxx receied route """ + step(NOTE) + expected_routes = { + "ipv4": [{"network": NETWORK1_1["ipv4"], "nexthop": NEXT_HOP_IP["ipv4"]}], + "ipv6": [{"network": NETWORK1_1["ipv6"], "nexthop": NEXT_HOP_IP["ipv4"]}], + } + result = verify_bgp_received_routes_from_neighbor( + tgen, topo, dut="r1", peer="r0", expected_routes=expected_routes + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r1": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + ] + } + } + result = verify_bgp_rib(tgen, addr_type, "r1", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + result = verify_fib_routes(tgen, addr_type, "r1", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Configure default static route (IPv4 and IPv6) on R2 nexthop as R1 ") + NEXT_HOP_IP_R1 = {} + r1_r2_ipv4_neighbor = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0] + r1_r2_ipv6_neighbor = topo["routers"]["r1"]["links"]["r2"]["ipv6"].split("/")[0] + NEXT_HOP_IP_R1["ipv4"] = r1_r2_ipv4_neighbor + NEXT_HOP_IP_R1["ipv6"] = r1_r2_ipv6_neighbor + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": "0.0.0.0/0", + "next_hop": NEXT_HOP_IP_R1["ipv4"], + }, + { + "network": "0::0/0", + "next_hop": NEXT_HOP_IP_R1["ipv6"], + }, + ] + } + } + result = create_static_routes(tgen, static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "Verify Static default route is taking preference over BGP default routes , BGP default route is inactive IN RIB and static is up and installed in RIB and FIB " + ) + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0] + ipv6_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv6"].split("/")[0] + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + DEFAULT_ROUTE_NXT_HOP = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_nxt_hop} + + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP[addr_type], + "protocol": "static", + } + ] + } + } + result = verify_bgp_rib( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + result = verify_fib_routes( + tgen, + addr_type, + "r2", + static_routes_input, + next_hop=DEFAULT_ROUTE_NXT_HOP[addr_type], + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + write_test_footer(tc_name) + +def test_verify_default_originate_after_removing_default_originate_p1(request): + """Verify BGP default route after removing default-originate""" + + tgen = get_topogen() + global BGP_CONVERGENCE + global topo + # test case name + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + check_router_status(tgen) + reset_config_on_routers(tgen) + + if BGP_CONVERGENCE != True: + pytest.skip("skipped because of BGP Convergence failure") + + step("Configure EBGP between R0 to R1 and IBGP between R1 to R2") + step("Configure EBGP between R2 to R3 and IBGP between R3 to R4") + r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"] + r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"] + r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"] + r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"] + r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"] + input_dict = { + "r0": { + "bgp": { + "local_as": r0_local_as, + } + }, + "r1": { + "bgp": { + "local_as": 2000, + } + }, + "r2": { + "bgp": { + "local_as": 2000, + } + }, + "r3": { + "bgp": { + "local_as": 5000, + } + }, + "r4": { + "bgp": { + "local_as": 5000, + } + }, + } + result = modify_as_number(tgen, topo, input_dict) + try: + assert result is True + except AssertionError: + logger.info("Expected behaviour: {}".format(result)) + logger.info("BGP config is not created because of invalid ASNs") + step("After changing the BGP AS Path Verify the BGP Convergence") + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format( + BGP_CONVERGENCE + ) + + step("Configure IPv4 and IPv6 static route on R0 and R4") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r0": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + } + ] + }, + "r4": { + "static_routes": [ + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + } + ] + }, + } + result = create_static_routes(tgen, static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("verify IPv4 and IPv6 static route are configured and up on R0 and R4") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r0": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + } + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r0", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r4": { + "static_routes": [ + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + } + ] + } + } + result = verify_fib_routes(tgen, addr_type, "r4", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + step( + "Configure redistribute connected and static on R0 (R0-R1) on R4 ( R4-R3) IPv4 and IPv6 address family " + ) + redistribute_static = { + "r0": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + }, + } + } + }, + "r4": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + }, + } + } + }, + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": {"redistribute": [{"redist_type": "connected"}]} + }, + "ipv6": { + "unicast": {"redistribute": [{"redist_type": "connected"}]} + }, + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": {"redistribute": [{"redist_type": "connected"}]} + }, + "ipv6": { + "unicast": {"redistribute": [{"redist_type": "connected"}]} + }, + } + } + }, + } + result = create_router_bgp(tgen, topo, redistribute_static) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("verify IPv4 and IPv6 static route are configured and up on R1 and R3") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r1": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [R0_NETWORK_LOOPBACK[addr_type]], + "next_hop": R0_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R0_NETWORK_CONNECTED[addr_type]], + "next_hop": R0_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R1_NETWORK_LOOPBACK[addr_type]], + "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R1_NETWORK_CONNECTED[addr_type]], + "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + ] + } + } + + result = verify_fib_routes(tgen, addr_type, "r1", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + result = verify_bgp_rib(tgen, addr_type, "r1", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("verify IPv4 and IPv6 static route are configured and up on R1 and R3") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r3": { + "static_routes": [ + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [R3_NETWORK_LOOPBACK[addr_type]], + "next_hop": R3_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R3_NETWORK_CONNECTED[addr_type]], + "next_hop": R3_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R4_NETWORK_LOOPBACK[addr_type]], + "next_hop": R4_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R4_NETWORK_CONNECTED[addr_type]], + "next_hop": R4_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + ] + } + } + + result = verify_fib_routes(tgen, addr_type, "r3", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib(tgen, addr_type, "r3", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "Configure default-originate on R1 for R1 to R2 neighbor for IPv4 and IPv6 peer " + ) + local_as = get_dut_as_number(tgen, dut="r1") + default_originate_config = { + "r1": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": {"unicast": {"default_originate": {"r2": {}}}}, + "ipv6": {"unicast": {"default_originate": {"r2": {}}}}, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + step( + "Verify all the static , connected and loopback routes from R0,R1,R3 and R4 is receieved on R2 " + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [R0_NETWORK_LOOPBACK[addr_type]], + "next_hop": R0_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R0_NETWORK_CONNECTED[addr_type]], + "next_hop": R0_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R1_NETWORK_LOOPBACK[addr_type]], + "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R1_NETWORK_CONNECTED[addr_type]], + "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R3_NETWORK_LOOPBACK[addr_type]], + "next_hop": R3_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R3_NETWORK_CONNECTED[addr_type]], + "next_hop": R3_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R4_NETWORK_LOOPBACK[addr_type]], + "next_hop": R4_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R4_NETWORK_CONNECTED[addr_type]], + "next_hop": R4_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + ] + } + } + + result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Verify the Default Originate on R2 nexthop as R1") + + interface = topo["routers"]["r1"]["links"]["r2"]["interface"] + ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r1", intf=interface) + ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0] + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + DEFAULT_ROUTE_NXT_HOP = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local} + + result = verify_rib_default_route( + tgen, + topo, + dut="r2", + routes=DEFAULT_ROUTES, + expected_nexthop=DEFAULT_ROUTE_NXT_HOP, + expected=True, + ) + assert ( + result is True + ), "Testcase {} : Failed \n Error: After Deactivating the BGP neighbor the default route is expected but found in RIB -> {}".format( + tc_name, result + ) + + result = verify_fib_default_route( + tgen, + topo, + dut="r2", + routes=DEFAULT_ROUTES, + expected_nexthop=DEFAULT_ROUTE_NXT_HOP, + expected=True, + ) + assert ( + result is True + ), "Testcase {} : Failed \n Error: After Deactivating the BGP neighbor the default route is expected but found in FIB -> {}".format( + tc_name, result + ) + + step( + "Configure default-originate on R3 for R3 to R2 neighbor for IPv4 and IPv6 peer " + ) + local_as = get_dut_as_number(tgen, dut="r3") + default_originate_config = { + "r3": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": {"unicast": {"default_originate": {"r2": {}}}}, + "ipv6": {"unicast": {"default_originate": {"r2": {}}}}, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + STEP = """After configuring the Default Originate From R3 --> R2 + Both Default routes from R1 and R3 Should present in R2 BGP RIB + The Deafult Route from iBGP is prefferedover EBGP thus + Default Route From R1->r2 should only present in R2 FIB """ + step(STEP) + + interface = topo["routers"]["r3"]["links"]["r2"]["interface"] + ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r3", intf=interface) + ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv4"].split("/")[0] + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + DEFAULT_ROUTE_NXT_HOP = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local} + result = verify_fib_default_route( + tgen, + topo, + dut="r2", + routes=DEFAULT_ROUTES, + expected_nexthop=DEFAULT_ROUTE_NXT_HOP, + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n Error: Only IBGP default originate is expected in FIB over EBGP {}".format( + tc_name, result + ) + + result = verify_rib_default_route( + tgen, + topo, + dut="r2", + routes=DEFAULT_ROUTES, + expected_nexthop=DEFAULT_ROUTE_NXT_HOP, + expected=True, + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "No change on static and connected routes which got advertised from R0, R1, R3 and R4" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [R0_NETWORK_LOOPBACK[addr_type]], + "next_hop": R0_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R0_NETWORK_CONNECTED[addr_type]], + "next_hop": R0_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R1_NETWORK_LOOPBACK[addr_type]], + "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R1_NETWORK_CONNECTED[addr_type]], + "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R3_NETWORK_LOOPBACK[addr_type]], + "next_hop": R3_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R3_NETWORK_CONNECTED[addr_type]], + "next_hop": R3_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R4_NETWORK_LOOPBACK[addr_type]], + "next_hop": R4_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R4_NETWORK_CONNECTED[addr_type]], + "next_hop": R4_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + ] + } + } + + result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + " Remove default-originate on R1 for R1 to R2 neighbor for IPv4 and IPv6 peer " + ) + local_as = get_dut_as_number(tgen, dut="r1") + default_originate_config = { + "r1": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": { + "unicast": {"default_originate": {"r2": {"delete": True}}} + }, + "ipv6": { + "unicast": {"default_originate": {"r2": {"delete": True}}} + }, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Verify the Default Originate reoute from R1 to r2 is removed in R2 ") + interface = topo["routers"]["r1"]["links"]["r2"]["interface"] + ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r1", intf=interface) + ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0] + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + DEFAULT_ROUTE_NXT_HOP = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local} + result = verify_fib_default_route( + tgen, + topo, + dut="r2", + routes=DEFAULT_ROUTES, + expected_nexthop=DEFAULT_ROUTE_NXT_HOP, + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n After removing the default originate the route should not be present in FIB \n Error: {}".format( + tc_name, result + ) + + result = verify_rib_default_route( + tgen, + topo, + dut="r2", + routes=DEFAULT_ROUTES, + expected_nexthop=DEFAULT_ROUTE_NXT_HOP, + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n After removing the default originate the route should not be present in RIB \n Error: {}".format( + tc_name, result + ) + + NOTE = """ after removing the Default originate from R1-->R2 + Verify the BGP Default route received from R3 is present in both BGP RIB and FIB on R2 + """ + interface = topo["routers"]["r3"]["links"]["r2"]["interface"] + ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r3", intf=interface) + ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv4"].split("/")[0] + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + DEFAULT_ROUTE_NXT_HOP = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local} + result = verify_fib_default_route( + tgen, + topo, + dut="r2", + routes=DEFAULT_ROUTES, + expected_nexthop=DEFAULT_ROUTE_NXT_HOP, + expected=True, + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + result = verify_rib_default_route( + tgen, + topo, + dut="r2", + routes=DEFAULT_ROUTES, + expected_nexthop=DEFAULT_ROUTE_NXT_HOP, + expected=True, + ) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "No change on static and connected routes which got advertised from R0, R1, R3 and R4" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [R0_NETWORK_LOOPBACK[addr_type]], + "next_hop": R0_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R0_NETWORK_CONNECTED[addr_type]], + "next_hop": R0_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R1_NETWORK_LOOPBACK[addr_type]], + "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R1_NETWORK_CONNECTED[addr_type]], + "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R3_NETWORK_LOOPBACK[addr_type]], + "next_hop": R3_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R3_NETWORK_CONNECTED[addr_type]], + "next_hop": R3_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R4_NETWORK_LOOPBACK[addr_type]], + "next_hop": R4_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R4_NETWORK_CONNECTED[addr_type]], + "next_hop": R4_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + ] + } + } + + result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "Remove default-originate on R3 for R3 to R2 neighbor for IPv4 and IPv6 peer " + ) + local_as = get_dut_as_number(tgen, dut="r3") + default_originate_config = { + "r3": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": { + "unicast": {"default_originate": {"r2": {"delete": True}}} + }, + "ipv6": { + "unicast": {"default_originate": {"r2": {"delete": True}}} + }, + }, + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "After removing default originate , verify default IPv4 and IPv6 BGP routes removed on R2 from R1 ( next-hop as R3) " + ) + interface = topo["routers"]["r3"]["links"]["r2"]["interface"] + ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r3", intf=interface) + ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv4"].split("/")[0] + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + DEFAULT_ROUTE_NXT_HOP = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local} + result = verify_fib_default_route( + tgen, + topo, + dut="r2", + routes=DEFAULT_ROUTES, + expected_nexthop=DEFAULT_ROUTE_NXT_HOP, + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n After removing the default originate the route should not be present in FIB \n Error: {}".format( + tc_name, result + ) + + result = verify_rib_default_route( + tgen, + topo, + dut="r2", + routes=DEFAULT_ROUTES, + expected_nexthop=DEFAULT_ROUTE_NXT_HOP, + expected=False, + ) + assert ( + result is not True + ), "Testcase {} : Failed \n After removing the default originate the route should not be present in RIB \n Error: {}".format( + tc_name, result + ) + step( + "No change on static and connected routes which got advertised from R0, R1, R3 and R4" + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [NETWORK2_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + }, + { + "network": [R0_NETWORK_LOOPBACK[addr_type]], + "next_hop": R0_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R0_NETWORK_CONNECTED[addr_type]], + "next_hop": R0_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R1_NETWORK_LOOPBACK[addr_type]], + "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R1_NETWORK_CONNECTED[addr_type]], + "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R3_NETWORK_LOOPBACK[addr_type]], + "next_hop": R3_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R3_NETWORK_CONNECTED[addr_type]], + "next_hop": R3_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + { + "network": [R4_NETWORK_LOOPBACK[addr_type]], + "next_hop": R4_NETWORK_LOOPBACK_NXTHOP[addr_type], + }, + { + "network": [R4_NETWORK_CONNECTED[addr_type]], + "next_hop": R4_NETWORK_CONNECTED_NXTHOP[addr_type], + }, + ] + } + } + + result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + write_test_footer(tc_name) + +def test_verify_default_originate_route_with_GR_p1(request): + """ "Verify default-originate route with GR " + """ + tgen = get_topogen() + global BGP_CONVERGENCE + global topo + # test case name + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + check_router_status(tgen) + reset_config_on_routers(tgen) + + if BGP_CONVERGENCE != True: + pytest.skip("skipped because of BGP Convergence failure") + + + step("Configure IPV4 and IPV6 IBGP between R1 and R2 ") + step("Configure IPV4 and IPV6 EBGP between R2 to R3 ") + r0_local_as = topo['routers']['r0']['bgp']['local_as'] + r1_local_as = topo['routers']['r1']['bgp']['local_as'] + r2_local_as = topo['routers']['r2']['bgp']['local_as'] + r3_local_as = topo['routers']['r3']['bgp']['local_as'] + r4_local_as = topo['routers']['r4']['bgp']['local_as'] + input_dict = { + "r0": { + "bgp": { + "local_as": r0_local_as, + } + }, + "r1": { + "bgp": { + "local_as": 1000, + } + }, + "r2": { + "bgp": { + "local_as": 1000, + } + }, + "r3": { + "bgp": { + "local_as": r3_local_as, + } + }, + "r4": { + "bgp": { + "local_as": r4_local_as, + } + }, + } + result = modify_as_number(tgen, topo, input_dict) + try: + assert result is True + except AssertionError: + logger.info("Expected behaviour: {}".format(result)) + logger.info("BGP config is not created because of invalid ASNs") + step("After changing the BGP AS Path Verify the BGP Convergence") + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format( + BGP_CONVERGENCE + ) + + step( + "Configure per peer Graceful restart on R2 ( restarting router) and R3 helper router " + ) + input_dict = { + "r2": { + "bgp": { + "local_as": get_dut_as_number(tgen, "r2"), + "graceful-restart": { + "graceful-restart": True, + "preserve-fw-state": True, + }, + } + }, + "r3": { + "bgp": { + "local_as": get_dut_as_number(tgen, "r3"), + "graceful-restart": {"graceful-restart-helper": True}, + } + }, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r2", peer="r3") + + step("verify Graceful restart at R2") + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r2", peer="r3" + ) + assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result) + + step( + "Configure default-originate on R1 for R1-R2 neighbor for IPv4 and IPv6 BGP peers " + ) + local_as = get_dut_as_number(tgen, dut="r1") + default_originate_config = { + "r1": { + "bgp": { + "local_as": local_as, + "address_family": { + "ipv4": { + "unicast": { + "default_originate":{ + "r2":{ + + } + + } + + } + }, "ipv6": { + "unicast": { + "default_originate":{ + "r2":{ + + } + + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, default_originate_config) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + step( + "R2 received default-originate routes and advertised it to R3 , verify on R2 and R3" + ) + DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"} + step( + "After configuring default-originate command , verify default routes are advertised on R2 " + ) + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + } + ] + } + } + + result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input,next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type]) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input,next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type]) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + + step(" Kill BGPd session on R2") + kill_router_daemons(tgen, "r2", ["bgpd"]) + start_router_daemons(tgen, "r2", ["bgpd"]) + + step("verify default route is relearned after clear bgp on R2 on BGP RIB and") + for addr_type in ADDR_TYPES: + static_routes_input = { + "r2": { + "static_routes": [ + { + "network": [DEFAULT_ROUTES[addr_type]], + "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type], + } + ] + } + } + + result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input,next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type]) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input,next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type]) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + write_test_footer(tc_name) + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) diff --git a/tests/topotests/ldp_topo1/r1/show_ip_ospf_neighbor.json b/tests/topotests/ldp_topo1/r1/show_ip_ospf_neighbor.json new file mode 100644 index 000000000..d9192f110 --- /dev/null +++ b/tests/topotests/ldp_topo1/r1/show_ip_ospf_neighbor.json @@ -0,0 +1,14 @@ +{ + "neighbors": { + "2.2.2.2": [ + { + "dbSummaryCounter": 0, + "retransmitCounter": 0, + "priority": 1, + "converged": "Full", + "address": "10.0.1.2", + "requestCounter": 0 + } + ] + } +} diff --git a/tests/topotests/ldp_topo1/r2/show_ip_ospf_neighbor.json b/tests/topotests/ldp_topo1/r2/show_ip_ospf_neighbor.json new file mode 100644 index 000000000..ea78592bd --- /dev/null +++ b/tests/topotests/ldp_topo1/r2/show_ip_ospf_neighbor.json @@ -0,0 +1,42 @@ +{ + "neighbors": { + "1.1.1.1": [ + { + "dbSummaryCounter": 0, + "retransmitCounter": 0, + "priority": 1, + "converged": "Full", + "address": "10.0.1.1", + "requestCounter": 0 + } + ], + "3.3.3.3": [ + { + "dbSummaryCounter": 0, + "retransmitCounter": 0, + "priority": 1, + "converged": "Full", + "address": "10.0.2.3", + "requestCounter": 0 + }, + { + "dbSummaryCounter": 0, + "retransmitCounter": 0, + "priority": 1, + "converged": "Full", + "address": "10.0.3.3", + "requestCounter": 0 + } + ], + "4.4.4.4": [ + { + "dbSummaryCounter": 0, + "retransmitCounter": 0, + "priority": 1, + "converged": "Full", + "address": "10.0.2.4", + "requestCounter": 0 + } + ] + } +} diff --git a/tests/topotests/ldp_topo1/r3/show_ip_ospf_neighbor.json b/tests/topotests/ldp_topo1/r3/show_ip_ospf_neighbor.json new file mode 100644 index 000000000..d3c50247e --- /dev/null +++ b/tests/topotests/ldp_topo1/r3/show_ip_ospf_neighbor.json @@ -0,0 +1,32 @@ +{ + "neighbors": { + "2.2.2.2": [ + { + "dbSummaryCounter": 0, + "retransmitCounter": 0, + "priority": 1, + "converged": "Full", + "address": "10.0.2.2", + "requestCounter": 0 + }, + { + "dbSummaryCounter": 0, + "retransmitCounter": 0, + "priority": 1, + "converged": "Full", + "address": "10.0.3.2", + "requestCounter": 0 + } + ], + "4.4.4.4": [ + { + "dbSummaryCounter": 0, + "retransmitCounter": 0, + "priority": 1, + "converged": "Full", + "address": "10.0.2.4", + "requestCounter": 0 + } + ] + } +} diff --git a/tests/topotests/ldp_topo1/r4/show_ip_ospf_neighbor.json b/tests/topotests/ldp_topo1/r4/show_ip_ospf_neighbor.json new file mode 100644 index 000000000..20751a288 --- /dev/null +++ b/tests/topotests/ldp_topo1/r4/show_ip_ospf_neighbor.json @@ -0,0 +1,24 @@ +{ + "neighbors": { + "2.2.2.2": [ + { + "dbSummaryCounter": 0, + "retransmitCounter": 0, + "priority": 1, + "converged": "Full", + "address": "10.0.2.2", + "requestCounter": 0 + } + ], + "3.3.3.3": [ + { + "dbSummaryCounter": 0, + "retransmitCounter": 0, + "priority": 1, + "converged": "Full", + "address": "10.0.2.3", + "requestCounter": 0 + } + ] + } +} diff --git a/tests/topotests/ldp_topo1/test_ldp_topo1.py b/tests/topotests/ldp_topo1/test_ldp_topo1.py index 8d6978723..cb8adfb55 100644 --- a/tests/topotests/ldp_topo1/test_ldp_topo1.py +++ b/tests/topotests/ldp_topo1/test_ldp_topo1.py @@ -63,9 +63,15 @@ import os import re import sys import pytest +import json +from functools import partial from time import sleep from lib.topolog import logger +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) + sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from lib import topotest from lib.topogen import Topogen, get_topogen @@ -104,6 +110,29 @@ def build_topo(tgen): ##################################################### ## +## Helper functions +## +##################################################### + + +def router_compare_json_output(rname, command, reference, count=60, wait=1): + "Compare router JSON output" + + logger.info('Comparing router "%s" "%s" output', rname, command) + + tgen = get_topogen() + filename = "{}/{}/{}".format(CWD, rname, reference) + expected = json.loads(open(filename).read()) + + # Run test function until we get an result. + test_func = partial(topotest.router_json_cmp, tgen.gears[rname], command, expected) + _, diff = topotest.run_and_expect(test_func, None, count, wait) + assertmsg = '"{}" JSON output mismatches the expected result'.format(rname) + assert diff is None, assertmsg + + +##################################################### +## ## Tests starting ## ##################################################### @@ -218,6 +247,19 @@ def test_mpls_interfaces(): assert fatal_error == "", fatal_error +def test_ospf_convergence(): + logger.info("Test: check OSPF adjacencies") + + # Skip if previous fatal error condition is raised + if fatal_error != "": + pytest.skip(fatal_error) + + for rname in ["r1", "r2", "r3", "r4"]: + router_compare_json_output( + rname, "show ip ospf neighbor json", "show_ip_ospf_neighbor.json" + ) + + def test_mpls_ldp_neighbor_establish(): global fatal_error net = get_topogen().net @@ -510,9 +552,10 @@ def test_mpls_ldp_binding(): else: print("r%s ok" % i) - assert ( - failures == 0 - ), "MPLS LDP Interface binding output for router r%s:\n%s" % (i, diff) + assert failures == 0, "MPLS LDP binding output for router r%s:\n%s" % ( + i, + diff, + ) # Make sure that all daemons are running for i in range(1, 5): diff --git a/tests/topotests/ldp_vpls_topo1/r1/ldpd.conf b/tests/topotests/ldp_vpls_topo1/r1/ldpd.conf index 594ec5a58..a19e5ccac 100644 --- a/tests/topotests/ldp_vpls_topo1/r1/ldpd.conf +++ b/tests/topotests/ldp_vpls_topo1/r1/ldpd.conf @@ -14,6 +14,7 @@ mpls ldp ! address-family ipv4 discovery transport-address 1.1.1.1 + ttl-security disable label local allocate host-routes ! interface r1-eth1 diff --git a/tests/topotests/ldp_vpls_topo1/r2/ldpd.conf b/tests/topotests/ldp_vpls_topo1/r2/ldpd.conf index ffb4f0974..447b3f140 100644 --- a/tests/topotests/ldp_vpls_topo1/r2/ldpd.conf +++ b/tests/topotests/ldp_vpls_topo1/r2/ldpd.conf @@ -14,6 +14,7 @@ mpls ldp ! address-family ipv4 discovery transport-address 2.2.2.2 + ttl-security disable label local allocate host-routes ! interface r2-eth1 diff --git a/tests/topotests/ldp_vpls_topo1/r3/ldpd.conf b/tests/topotests/ldp_vpls_topo1/r3/ldpd.conf index c95471ffd..ab5147149 100644 --- a/tests/topotests/ldp_vpls_topo1/r3/ldpd.conf +++ b/tests/topotests/ldp_vpls_topo1/r3/ldpd.conf @@ -14,6 +14,7 @@ mpls ldp ! address-family ipv4 discovery transport-address 3.3.3.3 + ttl-security disable label local allocate host-routes ! interface r3-eth1 diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index 4dd44e3e9..216756f51 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -29,6 +29,7 @@ from lib.common_config import ( create_common_configurations, FRRCFG_FILE, InvalidCLIError, + apply_raw_config, check_address_types, find_interface_with_greater_ip, generate_ips, @@ -74,6 +75,12 @@ def create_router_bgp(tgen, topo=None, input_dict=None, build=False, load_config "address_family": { "ipv4": { "unicast": { + "default_originate":{ + "neighbor":"R2", + "add_type":"lo" + "route_map":"rm" + + }, "redistribute": [{ "redist_type": "static", "attribute": { @@ -498,6 +505,12 @@ def __create_bgp_unicast_neighbor( topo, input_dict, router, addr_type, add_neigh ) config_data.extend(neigh_data) + # configure default originate + if "default_originate" in addr_data: + default_originate_config = __create_bgp_default_originate_neighbor( + topo, input_dict, router, addr_type, add_neigh + ) + config_data.extend(default_originate_config) for addr_type, addr_dict in bgp_data.items(): if not addr_dict or not check_address_types(addr_type): @@ -515,6 +528,78 @@ def __create_bgp_unicast_neighbor( return config_data +def __create_bgp_default_originate_neighbor( + topo, input_dict, router, addr_type, add_neigh=True +): + """ + Helper API to create neighbor default - originate configuration + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : json file data + * `input_dict` : Input dict data, required when configuring from testcase + * `router` : router id to be configured + """ + tgen = get_topogen() + config_data = [] + logger.debug("Entering lib API: __create_bgp_default_originate_neighbor()") + + bgp_data = input_dict["address_family"] + neigh_data = bgp_data[addr_type]["unicast"]["default_originate"] + for name, peer_dict in neigh_data.items(): + nh_details = topo[name] + + neighbor_ip = None + if "dest-link" in neigh_data[name]: + dest_link = neigh_data[name]["dest-link"] + neighbor_ip = nh_details["links"][dest_link][addr_type].split("/")[0] + elif "add_type" in neigh_data[name]: + add_type = neigh_data[name]["add_type"] + neighbor_ip = nh_details["links"][add_type][addr_type].split("/")[0] + else: + neighbor_ip = nh_details["links"][router][addr_type].split("/")[0] + + config_data.append("address-family {} unicast".format(addr_type)) + if "route_map" in peer_dict: + route_map = peer_dict["route_map"] + if "delete" in peer_dict: + if peer_dict["delete"]: + config_data.append( + "no neighbor {} default-originate route-map {}".format( + neighbor_ip, route_map + ) + ) + else: + config_data.append( + " neighbor {} default-originate route-map {}".format( + neighbor_ip, route_map + ) + ) + else: + config_data.append( + " neighbor {} default-originate route-map {}".format( + neighbor_ip, route_map + ) + ) + + else: + if "delete" in peer_dict: + if peer_dict["delete"]: + config_data.append( + "no neighbor {} default-originate".format(neighbor_ip) + ) + else: + config_data.append( + "neighbor {} default-originate".format(neighbor_ip) + ) + else: + config_data.append("neighbor {} default-originate".format(neighbor_ip)) + + logger.debug("Exiting lib API: __create_bgp_default_originate_neighbor()") + return config_data + + def __create_l2vpn_evpn_address_family( tgen, topo, input_dict, router, config_data=None ): @@ -4574,3 +4659,876 @@ def verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss, vrf=None): return "TCP-MSS Mismatch" logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return False + + +def get_dut_as_number(tgen, dut): + """ + API to get the Autonomous Number of the given DUT + + params: + ======= + dut : Device Under test + + returns : + ======= + Success : DUT Autonomous number + Fail : Error message with Boolean False + """ + tgen = get_topogen() + for router, rnode in tgen.routers().items(): + if router == dut: + show_bgp_json = run_frr_cmd(rnode, "sh ip bgp summary json ", isjson=True) + as_number = show_bgp_json["ipv4Unicast"]["as"] + if as_number: + logger.info( + "[dut {}] DUT contains Automnomous number :: {} ".format( + dut, as_number + ) + ) + return as_number + else: + logger.error( + "[dut {}] ERROR....! DUT doesnot contain any Automnomous number ".format( + dut + ) + ) + return False + + +def get_prefix_count_route( + tgen, topo, dut, peer, vrf=None, link=None, sent=None, received=None +): + """ + API to return the prefix count of default originate the given DUT + dut : Device under test + peer : neigbor on which you are expecting the route to be received + + returns : + prefix_count as dict with ipv4 and ipv6 value + """ + # the neighbor IP address can be accessable by finding the neigborship (vice-versa) + + if link: + neighbor_ipv4_address = topo["routers"][peer]["links"][link]["ipv4"] + neighbor_ipv6_address = topo["routers"][peer]["links"][link]["ipv6"] + else: + neighbor_ipv4_address = topo["routers"][peer]["links"][dut]["ipv4"] + neighbor_ipv6_address = topo["routers"][peer]["links"][dut]["ipv6"] + + neighbor_ipv4_address = neighbor_ipv4_address.split("/")[0] + neighbor_ipv6_address = neighbor_ipv6_address.split("/")[0] + prefix_count = {} + tgen = get_topogen() + for router, rnode in tgen.routers().items(): + if router == dut: + + if vrf: + ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf) + show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True) + ipv6_cmd = "sh ip bgp vrf {} ipv6 unicast summary json".format(vrf) + show_bgp_json_ipv6 = run_frr_cmd(rnode, ipv6_cmd, isjson=True) + + prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"]["peers"][ + neighbor_ipv4_address + ]["pfxRcd"] + prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][ + neighbor_ipv6_address + ]["pfxRcd"] + + logger.info( + "The Prefix Count of the [DUT:{} : vrf [{}] ] towards neighbor ipv4 : {} and ipv6 : {} is : {}".format( + dut, + vrf, + neighbor_ipv4_address, + neighbor_ipv6_address, + prefix_count, + ) + ) + return prefix_count + + else: + show_bgp_json_ipv4 = run_frr_cmd( + rnode, "sh ip bgp summary json ", isjson=True + ) + show_bgp_json_ipv6 = run_frr_cmd( + rnode, "sh ip bgp ipv6 unicast summary json ", isjson=True + ) + if received: + prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][ + "peers" + ][neighbor_ipv4_address]["pfxRcd"] + prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][ + neighbor_ipv6_address + ]["pfxRcd"] + + elif sent: + prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][ + "peers" + ][neighbor_ipv4_address]["pfxSnt"] + prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][ + neighbor_ipv6_address + ]["pfxSnt"] + + else: + prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][ + "peers" + ][neighbor_ipv4_address]["pfxRcd"] + prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][ + neighbor_ipv6_address + ]["pfxRcd"] + + logger.info( + "The Prefix Count of the DUT:{} towards neighbor ipv4 : {} and ipv6 : {} is : {}".format( + dut, neighbor_ipv4_address, neighbor_ipv6_address, prefix_count + ) + ) + return prefix_count + else: + logger.error("ERROR...! Unknown dut {} in topolgy".format(dut)) + + +@retry(retry_timeout=5) +def verify_rib_default_route( + tgen, + topo, + dut, + routes, + expected_nexthop, + metric=None, + origin=None, + locPrf=None, + expected_aspath=None, +): + """ + API to verify the the 'Default route" in BGP RIB with the attributes the rout carries (metric , local preference, ) + + param + ===== + dut : device under test + routes : default route with expected nexthop + expected_nexthop : the nexthop that is expected the deafult route + + """ + result = False + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + tgen = get_topogen() + connected_routes = {} + for router, rnode in tgen.routers().items(): + if router == dut: + + ipv4_routes = run_frr_cmd(rnode, "sh ip bgp json", isjson=True) + ipv6_routes = run_frr_cmd(rnode, "sh ip bgp ipv6 unicast json", isjson=True) + is_ipv4_default_attrib_found = False + is_ipv6_default_attrib_found = False + + default_ipv4_route = routes["ipv4"] + default_ipv6_route = "::/0" + ipv4_route_Origin = False + ipv4_route_local_pref = False + ipv4_route_metric = False + + if default_ipv4_route in ipv4_routes["routes"].keys(): + nxt_hop_count = len(ipv4_routes["routes"][default_ipv4_route]) + rib_next_hops = [] + for index in range(nxt_hop_count): + rib_next_hops.append( + ipv4_routes["routes"][default_ipv4_route][index]["nexthops"][0]["ip"] + ) + + for nxt_hop in expected_nexthop.items(): + if nxt_hop[0] == "ipv4": + if nxt_hop[1] in rib_next_hops: + logger.info( + "Default routes [{}] obtained from {} .....PASSED".format( + default_ipv4_route, nxt_hop[1] + ) + ) + else: + logger.error( + "ERROR ...! Default routes [{}] expected is missing {}".format( + default_ipv4_route, nxt_hop[1] + ) + ) + return False + + else: + pass + + if "origin" in ipv4_routes["routes"][default_ipv4_route][0].keys(): + ipv4_route_Origin = ipv4_routes["routes"][default_ipv4_route][0]["origin"] + if "locPrf" in ipv4_routes["routes"][default_ipv4_route][0].keys(): + ipv4_route_local_pref = ipv4_routes["routes"][default_ipv4_route][0][ + "locPrf" + ] + if "metric" in ipv4_routes["routes"][default_ipv4_route][0].keys(): + ipv4_route_metric = ipv4_routes["routes"][default_ipv4_route][0]["metric"] + else: + logger.error("ERROR [ DUT {}] : The Default Route Not found in RIB".format(dut)) + return False + + origin_found = False + locPrf_found = False + metric_found = False + as_path_found = False + + if origin: + if origin == ipv4_route_Origin: + logger.info( + "Dafault Route {} expected origin {} Found in RIB....PASSED".format( + default_ipv4_route, origin + ) + ) + origin_found = True + else: + logger.error( + "ERROR... IPV4::! Expected Origin is {} obtained {}".format( + origin, ipv4_route_Origin + ) + ) + return False + else: + origin_found = True + + if locPrf: + if locPrf == ipv4_route_local_pref: + logger.info( + "Dafault Route {} expected local preference {} Found in RIB....PASSED".format( + default_ipv4_route, locPrf + ) + ) + locPrf_found = True + else: + logger.error( + "ERROR... IPV4::! Expected Local preference is {} obtained {}".format( + locPrf, ipv4_route_local_pref + ) + ) + return False + else: + locPrf_found = True + + if metric: + if metric == ipv4_route_metric: + logger.info( + "Dafault Route {} expected metric {} Found in RIB....PASSED".format( + default_ipv4_route, metric + ) + ) + + metric_found = True + else: + logger.error( + "ERROR... IPV4::! Expected metric is {} obtained {}".format( + metric, ipv4_route_metric + ) + ) + return False + else: + metric_found = True + + if expected_aspath: + obtained_aspath = ipv4_routes["routes"]["0.0.0.0/0"][0]["path"] + if expected_aspath in obtained_aspath: + as_path_found = True + logger.info( + "Dafault Route {} expected AS path {} Found in RIB....PASSED".format( + default_ipv4_route, expected_aspath + ) + ) + else: + logger.error( + "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format( + expected_aspath, obtained_aspath + ) + ) + return False + else: + as_path_found = True + + if origin_found and locPrf_found and metric_found and as_path_found: + is_ipv4_default_attrib_found = True + logger.info( + "IPV4:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format( + origin, locPrf, metric, expected_aspath + ) + ) + else: + is_ipv4_default_attrib_found = False + logger.error( + "IPV4:: Expected origin ['{}'] Obtained [{}]".format( + origin, ipv4_route_Origin + ) + ) + logger.error( + "IPV4:: Expected locPrf ['{}'] Obtained [{}]".format( + locPrf, ipv4_route_local_pref + ) + ) + logger.error( + "IPV4:: Expected metric ['{}'] Obtained [{}]".format( + metric, ipv4_route_metric + ) + ) + logger.error( + "IPV4:: Expected metric ['{}'] Obtained [{}]".format( + expected_aspath, obtained_aspath + ) + ) + + route_Origin = False + route_local_pref = False + route_local_metric = False + default_ipv6_route = "" + try: + ipv6_routes["routes"]["0::0/0"] + default_ipv6_route = "0::0/0" + except: + ipv6_routes["routes"]["::/0"] + default_ipv6_route = "::/0" + if default_ipv6_route in ipv6_routes["routes"].keys(): + nxt_hop_count = len(ipv6_routes["routes"][default_ipv6_route]) + rib_next_hops = [] + for index in range(nxt_hop_count): + rib_next_hops.append( + ipv6_routes["routes"][default_ipv6_route][index]["nexthops"][0]["ip"] + ) + try: + rib_next_hops.append( + ipv6_routes["routes"][default_ipv6_route][index]["nexthops"][1][ + "ip" + ] + ) + except (KeyError, IndexError) as e: + logger.error("NO impact ..! Global IPV6 Address not found ") + + for nxt_hop in expected_nexthop.items(): + if nxt_hop[0] == "ipv6": + if nxt_hop[1] in rib_next_hops: + logger.info( + "Default routes [{}] obtained from {} .....PASSED".format( + default_ipv6_route, nxt_hop[1] + ) + ) + else: + logger.error( + "ERROR ...! Default routes [{}] expected from {} obtained {}".format( + default_ipv6_route, nxt_hop[1], rib_next_hops + ) + ) + return False + + else: + pass + if "origin" in ipv6_routes["routes"][default_ipv6_route][0].keys(): + route_Origin = ipv6_routes["routes"][default_ipv6_route][0]["origin"] + if "locPrf" in ipv6_routes["routes"][default_ipv6_route][0].keys(): + route_local_pref = ipv6_routes["routes"][default_ipv6_route][0]["locPrf"] + if "metric" in ipv6_routes["routes"][default_ipv6_route][0].keys(): + route_local_metric = ipv6_routes["routes"][default_ipv6_route][0]["metric"] + + origin_found = False + locPrf_found = False + metric_found = False + as_path_found = False + + if origin: + if origin == route_Origin: + logger.info( + "Dafault Route {} expected origin {} Found in RIB....PASSED".format( + default_ipv6_route, route_Origin + ) + ) + origin_found = True + else: + logger.error( + "ERROR... IPV6::! Expected Origin is {} obtained {}".format( + origin, route_Origin + ) + ) + return False + else: + origin_found = True + + if locPrf: + if locPrf == route_local_pref: + logger.info( + "Dafault Route {} expected Local Preference {} Found in RIB....PASSED".format( + default_ipv6_route, route_local_pref + ) + ) + locPrf_found = True + else: + logger.error( + "ERROR... IPV6::! Expected Local Preference is {} obtained {}".format( + locPrf, route_local_pref + ) + ) + return False + else: + locPrf_found = True + + if metric: + if metric == route_local_metric: + logger.info( + "Dafault Route {} expected metric {} Found in RIB....PASSED".format( + default_ipv4_route, metric + ) + ) + + metric_found = True + else: + logger.error( + "ERROR... IPV6::! Expected metric is {} obtained {}".format( + metric, route_local_metric + ) + ) + return False + else: + metric_found = True + + if expected_aspath: + obtained_aspath = ipv6_routes["routes"]["::/0"][0]["path"] + if expected_aspath in obtained_aspath: + as_path_found = True + logger.info( + "Dafault Route {} expected AS path {} Found in RIB....PASSED".format( + default_ipv4_route, expected_aspath + ) + ) + else: + logger.error( + "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format( + expected_aspath, obtained_aspath + ) + ) + return False + else: + as_path_found = True + + if origin_found and locPrf_found and metric_found and as_path_found: + is_ipv6_default_attrib_found = True + logger.info( + "IPV6:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format( + origin, locPrf, metric, expected_aspath + ) + ) + else: + is_ipv6_default_attrib_found = False + logger.error( + "IPV6:: Expected origin ['{}'] Obtained [{}]".format(origin, route_Origin) + ) + logger.error( + "IPV6:: Expected locPrf ['{}'] Obtained [{}]".format( + locPrf, route_local_pref + ) + ) + logger.error( + "IPV6:: Expected metric ['{}'] Obtained [{}]".format( + metric, route_local_metric + ) + ) + logger.error( + "IPV6:: Expected metric ['{}'] Obtained [{}]".format( + expected_aspath, obtained_aspath + ) + ) + + if is_ipv4_default_attrib_found and is_ipv6_default_attrib_found: + logger.info("The attributes are found for default route in RIB ") + return True + else: + return False + + +@retry(retry_timeout=5) +def verify_fib_default_route(tgen, topo, dut, routes, expected_nexthop): + """ + API to verify the the 'Default route" in FIB + + param + ===== + dut : device under test + routes : default route with expected nexthop + expected_nexthop : the nexthop that is expected the deafult route + + """ + result = False + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + tgen = get_topogen() + connected_routes = {} + for router, rnode in tgen.routers().items(): + if router == dut: + ipv4_routes = run_frr_cmd(rnode, "sh ip route json", isjson=True) + ipv6_routes = run_frr_cmd(rnode, "sh ipv6 route json", isjson=True) + + is_ipv4_default_route_found = False + is_ipv6_default_route_found = False + if routes["ipv4"] in ipv4_routes.keys(): + rib_ipv4_nxt_hops = [] + ipv4_default_route = routes["ipv4"] + nxt_hop_count = len(ipv4_routes[ipv4_default_route][0]["nexthops"]) + for index in range(nxt_hop_count): + rib_ipv4_nxt_hops.append( + ipv4_routes[ipv4_default_route][0]["nexthops"][index]["ip"] + ) + + if expected_nexthop["ipv4"] in rib_ipv4_nxt_hops: + is_ipv4_default_route_found = True + logger.info( + "{} default route with next hop {} is found in FIB ".format( + ipv4_default_route, expected_nexthop + ) + ) + else: + logger.error( + "ERROR .. ! {} default route with next hop {} is not found in FIB ".format( + ipv4_default_route, expected_nexthop + ) + ) + return False + + if routes["ipv6"] in ipv6_routes.keys() or "::/0" in ipv6_routes.keys(): + rib_ipv6_nxt_hops = [] + if "::/0" in ipv6_routes.keys(): + ipv6_default_route = "::/0" + elif routes["ipv6"] in ipv6_routes.keys(): + ipv6_default_route = routes["ipv6"] + + nxt_hop_count = len(ipv6_routes[ipv6_default_route][0]["nexthops"]) + for index in range(nxt_hop_count): + rib_ipv6_nxt_hops.append( + ipv6_routes[ipv6_default_route][0]["nexthops"][index]["ip"] + ) + + if expected_nexthop["ipv6"] in rib_ipv6_nxt_hops: + is_ipv6_default_route_found = True + logger.info( + "{} default route with next hop {} is found in FIB ".format( + ipv6_default_route, expected_nexthop + ) + ) + else: + logger.error( + "ERROR .. ! {} default route with next hop {} is not found in FIB ".format( + ipv6_default_route, expected_nexthop + ) + ) + return False + + if is_ipv4_default_route_found and is_ipv6_default_route_found: + return True + else: + logger.error( + "Default Route for ipv4 and ipv6 address family is not found in FIB " + ) + return False + + +@retry(retry_timeout=5) +def verify_bgp_advertised_routes_from_neighbor(tgen, topo, dut, peer, expected_routes): + """ + APi is verifies the the routes that are advertised from dut to peer + + command used : + "sh ip bgp neighbor <x.x.x.x> advertised-routes" and + "sh ip bgp ipv6 unicast neighbor<x::x> advertised-routes" + + dut : Device Under Tests + Peer : Peer on which the routs is expected + expected_routes : dual stack IPV4-and IPv6 routes to be verified + expected_routes + + returns: True / False + + """ + result = False + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + tgen = get_topogen() + + peer_ipv4_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv4"].split("/")[0] + peer_ipv6_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv6"].split("/")[0] + + for router, rnode in tgen.routers().items(): + if router == dut: + ipv4_receieved_routes = run_frr_cmd( + rnode, + "sh ip bgp neighbor {} advertised-routes json".format( + peer_ipv4_neighbor_ip + ), + isjson=True, + ) + ipv6_receieved_routes = run_frr_cmd( + rnode, + "sh ip bgp ipv6 unicast neighbor {} advertised-routes json".format( + peer_ipv6_neighbor_ip + ), + isjson=True, + ) + ipv4_route_count = 0 + ipv6_route_count = 0 + if ipv4_receieved_routes: + for index in range(len(expected_routes["ipv4"])): + if ( + expected_routes["ipv4"][index]["network"] + in ipv4_receieved_routes["advertisedRoutes"].keys() + ): + ipv4_route_count += 1 + logger.info( + "Success [DUT : {}] The Expected Route {} is advertised to {} ".format( + dut, expected_routes["ipv4"][index]["network"], peer + ) + ) + + elif ( + expected_routes["ipv4"][index]["network"] + in ipv4_receieved_routes["bgpOriginatingDefaultNetwork"] + ): + ipv4_route_count += 1 + logger.info( + "Success [DUT : {}] The Expected Route {} is advertised to {} ".format( + dut, expected_routes["ipv4"][index]["network"], peer + ) + ) + + else: + logger.error( + "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format( + dut, expected_routes["ipv4"][index]["network"], peer + ) + ) + else: + logger.error(ipv4_receieved_routes) + logger.error( + "ERROR...! [DUT : {}] No IPV4 Routes are advertised to the peer {}".format( + dut, peer + ) + ) + return False + + if ipv6_receieved_routes: + for index in range(len(expected_routes["ipv6"])): + if ( + expected_routes["ipv6"][index]["network"] + in ipv6_receieved_routes["advertisedRoutes"].keys() + ): + ipv6_route_count += 1 + logger.info( + "Success [DUT : {}] The Expected Route {} is advertised to {} ".format( + dut, expected_routes["ipv6"][index]["network"], peer + ) + ) + elif ( + expected_routes["ipv6"][index]["network"] + in ipv6_receieved_routes["bgpOriginatingDefaultNetwork"] + ): + ipv6_route_count += 1 + logger.info( + "Success [DUT : {}] The Expected Route {} is advertised to {} ".format( + dut, expected_routes["ipv6"][index]["network"], peer + ) + ) + else: + logger.error( + "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format( + dut, expected_routes["ipv6"][index]["network"], peer + ) + ) + else: + logger.error(ipv6_receieved_routes) + logger.error( + "ERROR...! [DUT : {}] No IPV6 Routes are advertised to the peer {}".format( + dut, peer + ) + ) + return False + + if ipv4_route_count == len(expected_routes["ipv4"]) and ipv6_route_count == len( + expected_routes["ipv6"] + ): + return True + else: + logger.error( + "ERROR ....! IPV4 : Expected Routes -> {} obtained ->{} ".format( + expected_routes["ipv4"], ipv4_receieved_routes["advertisedRoutes"] + ) + ) + logger.error( + "ERROR ....! IPV6 : Expected Routes -> {} obtained ->{} ".format( + expected_routes["ipv6"], ipv6_receieved_routes["advertisedRoutes"] + ) + ) + return False + + +@retry(retry_timeout=5) +def verify_bgp_received_routes_from_neighbor(tgen, topo, dut, peer, expected_routes): + """ + API to verify the bgp received routes + + commad used : + ============= + show ip bgp neighbor <x.x.x.x> received-routes + show ip bgp ipv6 unicast neighbor <x::x> received-routes + + params + ======= + dut : Device Under Tests + Peer : Peer on which the routs is expected + expected_routes : dual stack IPV4-and IPv6 routes to be verified + expected_routes + + returns: + ======== + True / False + """ + result = False + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + tgen = get_topogen() + + peer_ipv4_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv4"].split("/")[0] + peer_ipv6_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv6"].split("/")[0] + + logger.info("Enabling Soft configuration to neighbor INBOUND ") + neigbor_dict = {"ipv4": peer_ipv4_neighbor_ip, "ipv6": peer_ipv6_neighbor_ip} + result = configure_bgp_soft_configuration( + tgen, dut, neigbor_dict, direction="inbound" + ) + assert ( + result is True + ), " Failed to configure the soft configuration \n Error: {}".format(result) + + """sleep of 10 sec is required to get the routes on peer after soft configuration""" + sleep(10) + for router, rnode in tgen.routers().items(): + if router == dut: + ipv4_receieved_routes = run_frr_cmd( + rnode, + "sh ip bgp neighbor {} received-routes json".format( + peer_ipv4_neighbor_ip + ), + isjson=True, + ) + ipv6_receieved_routes = run_frr_cmd( + rnode, + "sh ip bgp ipv6 unicast neighbor {} received-routes json".format( + peer_ipv6_neighbor_ip + ), + isjson=True, + ) + ipv4_route_count = 0 + ipv6_route_count = 0 + if ipv4_receieved_routes: + for index in range(len(expected_routes["ipv4"])): + if ( + expected_routes["ipv4"][index]["network"] + in ipv4_receieved_routes["receivedRoutes"].keys() + ): + ipv4_route_count += 1 + logger.info( + "Success [DUT : {}] The Expected Route {} is received from {} ".format( + dut, expected_routes["ipv4"][index]["network"], peer + ) + ) + else: + logger.error( + "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format( + dut, expected_routes["ipv4"][index]["network"], peer + ) + ) + else: + logger.error(ipv4_receieved_routes) + logger.error( + "ERROR...! [DUT : {}] No IPV4 Routes are received from the peer {}".format( + dut, peer + ) + ) + return False + + if ipv6_receieved_routes: + for index in range(len(expected_routes["ipv6"])): + if ( + expected_routes["ipv6"][index]["network"] + in ipv6_receieved_routes["receivedRoutes"].keys() + ): + ipv6_route_count += 1 + logger.info( + "Success [DUT : {}] The Expected Route {} is received from {} ".format( + dut, expected_routes["ipv6"][index]["network"], peer + ) + ) + else: + logger.error( + "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format( + dut, expected_routes["ipv6"][index]["network"], peer + ) + ) + else: + logger.error(ipv6_receieved_routes) + logger.error( + "ERROR...! [DUT : {}] No IPV6 Routes are received from the peer {}".format( + dut, peer + ) + ) + return False + + if ipv4_route_count == len(expected_routes["ipv4"]) and ipv6_route_count == len( + expected_routes["ipv6"] + ): + return True + else: + logger.error( + "ERROR ....! IPV4 : Expected Routes -> {} obtained ->{} ".format( + expected_routes["ipv4"], ipv4_receieved_routes["advertisedRoutes"] + ) + ) + logger.error( + "ERROR ....! IPV6 : Expected Routes -> {} obtained ->{} ".format( + expected_routes["ipv6"], ipv6_receieved_routes["advertisedRoutes"] + ) + ) + return False + + +def configure_bgp_soft_configuration(tgen, dut, neighbor_dict, direction): + """ + Api to configure the bgp soft configuration to show the received routes from peer + params + ====== + dut : device under test route on which the sonfiguration to be applied + neighbor_dict : dict element contains ipv4 and ipv6 neigbor ip + direction : Directionon which it should be applied in/out + + returns: + ======== + boolean + """ + logger.info("Enabling Soft configuration to neighbor INBOUND ") + local_as = get_dut_as_number(tgen, dut) + ipv4_neighbor = neighbor_dict["ipv4"] + ipv6_neighbor = neighbor_dict["ipv6"] + direction = direction.lower() + if ipv4_neighbor and ipv4_neighbor: + raw_config = { + dut: { + "raw_config": [ + "router bgp {}".format(local_as), + "address-family ipv4 unicast", + "neighbor {} soft-reconfiguration {} ".format( + ipv4_neighbor, direction + ), + "exit-address-family", + "address-family ipv6 unicast", + "neighbor {} soft-reconfiguration {} ".format( + ipv6_neighbor, direction + ), + "exit-address-family", + ] + } + } + result = apply_raw_config(tgen, raw_config) + logger.info( + "Success... [DUT : {}] The soft configuration onis applied on neighbors {} ".format( + dut, neighbor_dict + ) + ) + return True diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index a7d8b2c9b..0e9e0ba40 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -3365,8 +3365,9 @@ def verify_rib( nh_found = False for st_rt in ip_list: - st_rt = str(ipaddress.ip_network(frr_unicode(st_rt))) - + st_rt = str( + ipaddress.ip_network(frr_unicode(st_rt), strict=False) + ) _addr_type = validate_ip_address(st_rt) if _addr_type != addr_type: continue @@ -3384,6 +3385,9 @@ def verify_rib( next_hop = [next_hop] for mnh in range(0, len(rib_routes_json[st_rt])): + if not "selected" in rib_routes_json[st_rt][mnh]: + continue + if ( "fib" in rib_routes_json[st_rt][mnh]["nexthops"][0] @@ -3528,8 +3532,8 @@ def verify_rib( if nh_found: logger.info( - "[DUT: {}]: Found next_hop {} for all bgp" - " routes in RIB".format(router, next_hop) + "[DUT: {}]: Found next_hop {} for" + " RIB routes: {}".format(router, next_hop, found_routes) ) if len(missing_routes) > 0: @@ -3593,7 +3597,7 @@ def verify_rib( nh_found = False for st_rt in ip_list: - st_rt = str(ipaddress.ip_network(frr_unicode(st_rt))) + st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False)) _addr_type = validate_ip_address(st_rt) if _addr_type != addr_type: @@ -3750,8 +3754,9 @@ def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None): nh_found = False for st_rt in ip_list: - st_rt = str(ipaddress.ip_network(frr_unicode(st_rt))) - + st_rt = str( + ipaddress.ip_network(frr_unicode(st_rt), strict=False) + ) _addr_type = validate_ip_address(st_rt) if _addr_type != addr_type: continue @@ -3855,7 +3860,7 @@ def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None): nh_found = False for st_rt in ip_list: - st_rt = str(ipaddress.ip_network(frr_unicode(st_rt))) + st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False)) _addr_type = validate_ip_address(st_rt) if _addr_type != addr_type: diff --git a/tests/topotests/lib/pim.py b/tests/topotests/lib/pim.py index 254ce4f4e..cd070e08b 100644 --- a/tests/topotests/lib/pim.py +++ b/tests/topotests/lib/pim.py @@ -525,6 +525,9 @@ def verify_pim_neighbors(tgen, topo, dut=None, iface=None, nbr_ip=None, expected if "pim" not in data: continue + if "pim" in data and data["pim"] == "disable": + continue + if "pim" in data and data["pim"] == "enable": local_interface = data["interface"] diff --git a/tests/topotests/multicast_pim_bsm_topo1/test_mcast_pim_bsmp_01.py b/tests/topotests/multicast_pim_bsm_topo1/test_mcast_pim_bsmp_01.py index ceac78d88..4610c5f15 100644 --- a/tests/topotests/multicast_pim_bsm_topo1/test_mcast_pim_bsmp_01.py +++ b/tests/topotests/multicast_pim_bsm_topo1/test_mcast_pim_bsmp_01.py @@ -84,6 +84,7 @@ from lib.common_config import ( run_frr_cmd, required_linux_kernel_version, topo_daemons, + verify_rib, ) from lib.pim import ( @@ -106,6 +107,7 @@ from lib.pim import ( clear_pim_interface_traffic, get_pim_interface_traffic, McastTesterHelper, + verify_pim_neighbors, ) from lib.topolog import logger from lib.topojson import build_config_from_json @@ -180,6 +182,10 @@ def setup_module(mod): # Creating configuration from JSON build_config_from_json(tgen, topo) + # Verify PIM neighbors + result = verify_pim_neighbors(tgen, topo) + assert result is True, " Verify PIM neighbor: Failed Error: {}".format(result) + # XXX Replace this using "with McastTesterHelper()... " in each test if possible. global app_helper app_helper = McastTesterHelper(tgen) @@ -306,6 +312,14 @@ def pre_config_to_bsm(tgen, topo, tc_name, bsr, sender, receiver, fhr, rp, lhr, result = create_static_routes(tgen, input_dict) assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result) + # Verifying static routes are installed + for dut, _nexthop in zip([fhr, rp, lhr], [next_hop, next_hop_rp, next_hop_lhr]): + input_routes = {dut: input_dict[dut]} + result = verify_rib(tgen, "ipv4", dut, input_routes, _nexthop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + # RP Mapping rp_mapping = topo["routers"][bsr]["bsm"]["bsr_packets"][packet]["rp_mapping"] @@ -328,11 +342,24 @@ def pre_config_to_bsm(tgen, topo, tc_name, bsr, sender, receiver, fhr, rp, lhr, result = create_static_routes(tgen, input_dict) assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result) + # Verifying static routes are installed + result = verify_rib(tgen, "ipv4", fhr, input_dict, next_hop_fhr) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + input_dict = { lhr: {"static_routes": [{"network": rp_list, "next_hop": next_hop_lhr}]}, } result = create_static_routes(tgen, input_dict) assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result) + + # Verifying static routes are installed + result = verify_rib(tgen, "ipv4", lhr, input_dict, next_hop_lhr) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + return True @@ -443,6 +470,23 @@ def test_BSR_higher_prefer_ip_p0(request): result = create_static_routes(tgen, input_dict) assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result) + # Verifying static routes are installed + for dut, _nexthop in zip(["i1", "l1"], [next_hop_rp, next_hop_lhr]): + input_routes = {dut: input_dict[dut]} + result = verify_rib(tgen, "ipv4", dut, input_routes, _nexthop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + for bsr_add, next_hop in zip([BSR1_ADDR, BSR2_ADDR], [NEXT_HOP1, NEXT_HOP2]): + input_routes = { + "f1": {"static_routes": [{"network": bsr_add, "next_hop": next_hop}]} + } + result = verify_rib(tgen, "ipv4", "f1", input_routes, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + # Use scapy to send pre-defined packet from senser to receiver step("Send BSR packet from b1 to FHR") result = scapy_send_bsr_raw_packet(tgen, topo, "b1", "f1", "packet9") @@ -626,6 +670,24 @@ def test_BSR_CRP_with_blackhole_address_p1(request): result = create_static_routes(tgen, input_dict) assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result) + # Verifying static routes are installed + for dut, _nexthop in zip(["i1", "l1"], [next_hop_rp, next_hop_lhr]): + input_routes = {dut: input_dict[dut]} + result = verify_rib(tgen, "ipv4", dut, input_routes, _nexthop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + input_routes = { + "f1": {"static_routes": [{"network": CRP, "next_hop": next_hop_fhr}]} + } + result = verify_rib(tgen, "ipv4", "f1", input_routes, expected=False) + assert ( + result is not True + ), "Testcase {} : Failed \n " "Route is still present \n Error {}".format( + tc_name, result + ) + # Use scapy to send pre-defined packet from senser to receiver group = topo["routers"]["b1"]["bsm"]["bsr_packets"]["packet9"]["group"] @@ -642,6 +704,10 @@ def test_BSR_CRP_with_blackhole_address_p1(request): result = create_static_routes(tgen, input_dict) assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result) + # Verifying static routes are installed + result = verify_rib(tgen, "ipv4", "f1", input_dict) + assert result is True, "Testcase {} : Failed \n Error {}".format(tc_name, result) + intf_f1_i1 = topo["routers"]["f1"]["links"]["i1"]["interface"] step("Verify bsm transit count is not increamented" "show ip pim interface traffic") state_dict = {"f1": {intf_f1_i1: ["bsmTx"]}} @@ -694,6 +760,28 @@ def test_BSR_CRP_with_blackhole_address_p1(request): result = create_static_routes(tgen, input_dict) assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result) + # Verifying static routes are installed + input_dict = { + "f1": {"static_routes": [{"network": BSR1_ADDR, "next_hop": NEXT_HOP1}]} + } + result = verify_rib(tgen, "ipv4", "f1", input_dict, NEXT_HOP1) + assert result is True, "Testcase {} : Failed \n Error {}".format(tc_name, result) + + input_dict = { + "f1": { + "static_routes": [ + {"network": [BSR1_ADDR, CRP], "next_hop": "blackhole", "delete": True} + ] + } + } + result = verify_rib(tgen, "ipv4", "f1", input_dict, expected=False) + assert result is not True, ( + "Testcase {} : Failed \n " + "Routes:[{}, {}] are still present \n Error {}".format( + tc_name, BSR1_ADDR, CRP, result + ) + ) + step("Sending BSR after removing black-hole address for BSR and candidate RP") step("Send BSR packet from b1 to FHR") result = scapy_send_bsr_raw_packet(tgen, topo, "b1", "f1", "packet9") @@ -1614,6 +1702,14 @@ def test_iif_join_state_p0(request): result = create_static_routes(tgen, input_dict) assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result) + # Verifying static routes are installed + result = verify_rib(tgen, "ipv4", "l1", input_dict, expected=False) + assert ( + result is not True + ), "Testcase {} : Failed \n " "Routes:{} are still present \n Error {}".format( + tc_name, rp_ip, result + ) + # Check RP unreachable step("Check RP unreachability") iif = "Unknown" @@ -1654,6 +1750,10 @@ def test_iif_join_state_p0(request): result = create_static_routes(tgen, input_dict) assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result) + # Verifying static routes are installed + result = verify_rib(tgen, "ipv4", "l1", input_dict, next_hop_lhr) + assert result is True, "Testcase {}:Failed \n Error: {}".format(tc_name, result) + # Verify that (*,G) installed in mroute again iif = "l1-i1-eth0" result = verify_mroutes(tgen, dut, src_addr, GROUP_ADDRESS, iif, oil) diff --git a/tests/topotests/multicast_pim_bsm_topo2/test_mcast_pim_bsmp_02.py b/tests/topotests/multicast_pim_bsm_topo2/test_mcast_pim_bsmp_02.py index 2d6062bf3..191615cbb 100644 --- a/tests/topotests/multicast_pim_bsm_topo2/test_mcast_pim_bsmp_02.py +++ b/tests/topotests/multicast_pim_bsm_topo2/test_mcast_pim_bsmp_02.py @@ -68,6 +68,7 @@ from lib.common_config import ( run_frr_cmd, required_linux_kernel_version, topo_daemons, + verify_rib, ) from lib.pim import ( @@ -86,6 +87,7 @@ from lib.pim import ( clear_mroute, clear_pim_interface_traffic, McastTesterHelper, + verify_pim_neighbors, ) from lib.topolog import logger from lib.topojson import build_config_from_json @@ -160,6 +162,10 @@ def setup_module(mod): # Creating configuration from JSON build_config_from_json(tgen, topo) + # Verify PIM neighbors + result = verify_pim_neighbors(tgen, topo) + assert result is True, " Verify PIM neighbor: Failed Error: {}".format(result) + # XXX Replace this using "with McastTesterHelper()... " in each test if possible. global app_helper app_helper = McastTesterHelper(tgen) @@ -247,6 +253,14 @@ def pre_config_to_bsm(tgen, topo, tc_name, bsr, sender, receiver, fhr, rp, lhr, result = create_static_routes(tgen, input_dict) assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result) + # Verifying static routes are installed + for dut, _nexthop in zip([fhr, rp, lhr], [next_hop, next_hop_rp, next_hop_lhr]): + input_routes = {dut: input_dict[dut]} + result = verify_rib(tgen, "ipv4", dut, input_routes, _nexthop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + # Add kernel route for source group = topo["routers"][bsr]["bsm"]["bsr_packets"][packet]["pkt_dst"] bsr_interface = topo["routers"][bsr]["links"][fhr]["interface"] @@ -285,11 +299,24 @@ def pre_config_to_bsm(tgen, topo, tc_name, bsr, sender, receiver, fhr, rp, lhr, result = create_static_routes(tgen, input_dict) assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result) + # Verifying static routes are installed + result = verify_rib(tgen, "ipv4", fhr, input_dict, next_hop_fhr) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + input_dict = { lhr: {"static_routes": [{"network": rp_list, "next_hop": next_hop_lhr}]}, } result = create_static_routes(tgen, input_dict) assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result) + + # Verifying static routes are installed + result = verify_rib(tgen, "ipv4", lhr, input_dict, next_hop_lhr) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + return True |