summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/topotests/bgp_default_originate/bgp_default_originate_topo1.json294
-rw-r--r--tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_1.py2537
-rw-r--r--tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_2.py2437
-rw-r--r--tests/topotests/ldp_topo1/r1/show_ip_ospf_neighbor.json14
-rw-r--r--tests/topotests/ldp_topo1/r2/show_ip_ospf_neighbor.json42
-rw-r--r--tests/topotests/ldp_topo1/r3/show_ip_ospf_neighbor.json32
-rw-r--r--tests/topotests/ldp_topo1/r4/show_ip_ospf_neighbor.json24
-rw-r--r--tests/topotests/ldp_topo1/test_ldp_topo1.py49
-rw-r--r--tests/topotests/ldp_vpls_topo1/r1/ldpd.conf1
-rw-r--r--tests/topotests/ldp_vpls_topo1/r2/ldpd.conf1
-rw-r--r--tests/topotests/ldp_vpls_topo1/r3/ldpd.conf1
-rw-r--r--tests/topotests/lib/bgp.py958
-rw-r--r--tests/topotests/lib/common_config.py21
-rw-r--r--tests/topotests/lib/pim.py3
-rw-r--r--tests/topotests/multicast_pim_bsm_topo1/test_mcast_pim_bsmp_01.py100
-rw-r--r--tests/topotests/multicast_pim_bsm_topo2/test_mcast_pim_bsmp_02.py27
16 files changed, 6530 insertions, 11 deletions
diff --git a/tests/topotests/bgp_default_originate/bgp_default_originate_topo1.json b/tests/topotests/bgp_default_originate/bgp_default_originate_topo1.json
new file mode 100644
index 000000000..5fae34dd7
--- /dev/null
+++ b/tests/topotests/bgp_default_originate/bgp_default_originate_topo1.json
@@ -0,0 +1,294 @@
+{
+ "address_types": [
+ "ipv4",
+ "ipv6"
+ ],
+ "ipv4base": "192.168.0.0",
+ "ipv4mask": 3024,
+ "ipv6base": "fd00::",
+ "ipv6mask": 64,
+ "link_ip_start": {
+ "ipv4": "192.168.0.0",
+ "v4mask": 24,
+ "ipv6": "fd00::",
+ "v6mask": 64
+ },
+ "lo_prefix": {
+ "ipv4": "1.0.",
+ "v4mask": 32,
+ "ipv6": "2001:db8:f::",
+ "v6mask": 128
+ },
+ "routers": {
+ "r0": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r0": {
+ "keepalivetimer": 1,
+ "holddowntimer": 3
+ }
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r0": {
+ "keepalivetimer": 1,
+ "holddowntimer": 3
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r1": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r0": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "200",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r0": {
+ "dest_link": {
+ "r1": {
+ "keepalivetimer": 1,
+ "holddowntimer": 3
+ }
+ }
+ },
+ "r2": {
+ "dest_link": {
+ "r1": {
+ "keepalivetimer": 1,
+ "holddowntimer": 3
+ }
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r0": {
+ "dest_link": {
+ "r1": {
+ "keepalivetimer": 1,
+ "holddowntimer": 3
+ }
+ }
+ },
+ "r2": {
+ "dest_link": {
+ "r1": {
+ "keepalivetimer": 1,
+ "holddowntimer": 3
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r3": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "300",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {"keepalivetimer": 1,
+ "holddowntimer": 3}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r2": {"keepalivetimer": 1,
+ "holddowntimer": 3}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {"keepalivetimer": 1,
+ "holddowntimer": 3}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r2": {"keepalivetimer": 1,
+ "holddowntimer": 3}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r4": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "400",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {"keepalivetimer": 1,
+ "holddowntimer": 3}
+ }
+ },
+ "r4": {
+ "dest_link": {
+ "r3": {"keepalivetimer": 1,
+ "holddowntimer": 3}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {"keepalivetimer": 1,
+ "holddowntimer": 3}
+ }
+ },
+ "r4": {
+ "dest_link": {
+ "r3": {"keepalivetimer": 1,
+ "holddowntimer": 3}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r4": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r3": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "500",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link": {
+ "r4": {"keepalivetimer": 1,
+ "holddowntimer": 3}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link": {
+ "r4": {"keepalivetimer": 1,
+ "holddowntimer": 3}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_1.py b/tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_1.py
new file mode 100644
index 000000000..ee71ae16e
--- /dev/null
+++ b/tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_1.py
@@ -0,0 +1,2537 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2022 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF")
+# in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+# Shreenidhi A R <rshreenidhi@vmware.com>
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+"""
+Following tests are covered.
+1. Verify BGP default-originate route with IBGP peer
+2. Verify BGP default-originate route with EBGP peer
+3. Verify BGP default route when default-originate configured with route-map over IBGP peer
+4. Verify BGP default route when default-originate configured with route-map over EBGP peer"
+
+"""
+import os
+import sys
+import time
+import pytest
+from time import sleep
+from copy import deepcopy
+from lib.topolog import logger
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from lib.topogen import Topogen, get_topogen
+from lib.topojson import build_config_from_json
+from lib.topolog import logger
+
+from lib.bgp import (
+ verify_bgp_convergence,
+ verify_graceful_restart,
+ create_router_bgp,
+ verify_router_id,
+ modify_as_number,
+ verify_as_numbers,
+ clear_bgp_and_verify,
+ clear_bgp,
+ verify_bgp_rib,
+ get_prefix_count_route,
+ get_dut_as_number,
+ verify_rib_default_route,
+ verify_fib_default_route,
+ verify_bgp_advertised_routes_from_neighbor,
+ verify_bgp_received_routes_from_neighbor,
+)
+from lib.common_config import (
+ interface_status,
+ verify_prefix_lists,
+ verify_fib_routes,
+ kill_router_daemons,
+ start_router_daemons,
+ shutdown_bringup_interface,
+ step,
+ required_linux_kernel_version,
+ stop_router,
+ start_router,
+ create_route_maps,
+ create_prefix_lists,
+ get_frr_ipv6_linklocal,
+ start_topology,
+ write_test_header,
+ check_address_types,
+ write_test_footer,
+ reset_config_on_routers,
+ create_static_routes,
+ check_router_status,
+ delete_route_maps,
+)
+
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+sys.path.append(os.path.join(CWD, "../lib/"))
+
+# Required to instantiate the topology builder class.
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+
+# Global variables
+topo = None
+KEEPALIVETIMER = 1
+HOLDDOWNTIMER = 3
+# Global variables
+NETWORK1_1 = {"ipv4": "1.1.1.1/32", "ipv6": "1::1/128"}
+NETWORK1_2 = {"ipv4": "1.1.1.2/32", "ipv6": "1::2/128"}
+NETWORK2_1 = {"ipv4": "2.1.1.1/32", "ipv6": "2::1/128"}
+NETWORK2_2 = {"ipv4": "2.1.1.2/32", "ipv6": "2::2/128"}
+NETWORK3_1 = {"ipv4": "3.1.1.1/32", "ipv6": "3::1/128"}
+NETWORK3_2 = {"ipv4": "3.1.1.2/32", "ipv6": "3::2/128"}
+NETWORK4_1 = {"ipv4": "4.1.1.1/32", "ipv6": "4::1/128"}
+NETWORK4_2 = {"ipv4": "4.1.1.2/32", "ipv6": "4::2/128"}
+NETWORK5_1 = {"ipv4": "5.1.1.1/32", "ipv6": "5::1/128"}
+NETWORK5_2 = {"ipv4": "5.1.1.2/32", "ipv6": "5::2/128"}
+DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+NEXT_HOP_IP = {"ipv4": "Null0", "ipv6": "Null0"}
+
+IPV4_RM = "RMVIPV4"
+IPV6_RM = "RMVIPV6"
+
+IPV4_RM1 = "RMVIPV41"
+IPV6_RM1 = "RMVIPV61"
+
+IPV4_RM2 = "RMVIPV42"
+IPV6_RM2 = "RMVIPV62"
+
+IPV4_PL_1 = "PV41"
+IPV4_PL_2 = "PV42"
+
+IPV6_PL_1 = "PV61"
+IPV6_PL_2 = "PV62"
+
+
+r1_ipv4_loopback = "1.0.1.0/24"
+r2_ipv4_loopback = "1.0.2.0/24"
+r3_ipv4_loopback = "1.0.3.0/24"
+r4_ipv4_loopback = "1.0.4.0/24"
+r1_ipv6_loopback = "2001:db8:f::1:0/120"
+r2_ipv6_loopback = "2001:db8:f::2:0/120"
+r3_ipv6_loopback = "2001:db8:f::3:0/120"
+r4_ipv6_loopback = "2001:db8:f::4:0/120"
+
+r0_connected_address_ipv4 = "192.168.0.0/24"
+r0_connected_address_ipv6 = "fd00::/64"
+r1_connected_address_ipv4 = "192.168.1.0/24"
+r1_connected_address_ipv6 = "fd00:0:0:1::/64"
+r3_connected_address_ipv4 = "192.168.2.0/24"
+r3_connected_address_ipv6 = "fd00:0:0:2::/64"
+r4_connected_address_ipv4 = "192.168.3.0/24"
+r4_connected_address_ipv6 = "fd00:0:0:3::/64"
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ # Required linux kernel version for this suite to run.
+ result = required_linux_kernel_version("4.15")
+ if result is not True:
+ pytest.skip("Kernel requirements are not met")
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: {}".format(testsuite_run_time))
+ logger.info("=" * 40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ json_file = "{}/bgp_default_originate_topo1.json".format(CWD)
+ tgen = Topogen(json_file, mod.__name__)
+ global topo
+ topo = tgen.json_topo
+ # ... and here it calls Mininet initialization functions.
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start daemons and then start routers
+ start_topology(tgen)
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ global ADDR_TYPES
+ global BGP_CONVERGENCE
+ global DEFAULT_ROUTES
+ global DEFAULT_ROUTE_NXT_HOP_R1, DEFAULT_ROUTE_NXT_HOP_R3
+ global R0_NETWORK_LOOPBACK, R0_NETWORK_LOOPBACK_NXTHOP, R1_NETWORK_LOOPBACK, R1_NETWORK_LOOPBACK_NXTHOP
+ global R0_NETWORK_CONNECTED, R0_NETWORK_CONNECTED_NXTHOP, R1_NETWORK_CONNECTED, R1_NETWORK_CONNECTED_NXTHOP
+ global R4_NETWORK_LOOPBACK, R4_NETWORK_LOOPBACK_NXTHOP, R3_NETWORK_LOOPBACK, R3_NETWORK_LOOPBACK_NXTHOP
+ global R4_NETWORK_CONNECTED, R4_NETWORK_CONNECTED_NXTHOP, R3_NETWORK_CONNECTED, R3_NETWORK_CONNECTED_NXTHOP
+
+ ADDR_TYPES = check_address_types()
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format(
+ BGP_CONVERGENCE
+ )
+ # There are the global varibles used through out the file these are acheived only after building the topology.
+
+ r0_loopback_address_ipv4 = topo["routers"]["r0"]["links"]["lo"]["ipv4"]
+ r0_loopback_address_ipv4_nxt_hop = topo["routers"]["r0"]["links"]["r1"][
+ "ipv4"
+ ].split("/")[0]
+ r0_loopback_address_ipv6 = topo["routers"]["r0"]["links"]["lo"]["ipv6"]
+ r0_loopback_address_ipv6_nxt_hop = topo["routers"]["r0"]["links"]["r1"][
+ "ipv6"
+ ].split("/")[0]
+
+ r1_loopback_address_ipv4 = topo["routers"]["r1"]["links"]["lo"]["ipv4"]
+ r1_loopback_address_ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"][
+ "ipv4"
+ ].split("/")[0]
+ r1_loopback_address_ipv6 = topo["routers"]["r1"]["links"]["lo"]["ipv6"]
+ r1_loopback_address_ipv6_nxt_hop = topo["routers"]["r1"]["links"]["r2"][
+ "ipv6"
+ ].split("/")[0]
+
+ r4_loopback_address_ipv4 = topo["routers"]["r4"]["links"]["lo"]["ipv4"]
+ r4_loopback_address_ipv4_nxt_hop = topo["routers"]["r4"]["links"]["r3"][
+ "ipv4"
+ ].split("/")[0]
+ r4_loopback_address_ipv6 = topo["routers"]["r4"]["links"]["lo"]["ipv6"]
+ r4_loopback_address_ipv6_nxt_hop = topo["routers"]["r4"]["links"]["r3"][
+ "ipv6"
+ ].split("/")[0]
+
+ r3_loopback_address_ipv4 = topo["routers"]["r3"]["links"]["lo"]["ipv4"]
+ r3_loopback_address_ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"][
+ "ipv4"
+ ].split("/")[0]
+ r3_loopback_address_ipv6 = topo["routers"]["r3"]["links"]["lo"]["ipv6"]
+ r3_loopback_address_ipv6_nxt_hop = topo["routers"]["r3"]["links"]["r2"][
+ "ipv6"
+ ].split("/")[0]
+
+ R0_NETWORK_LOOPBACK = {
+ "ipv4": r0_loopback_address_ipv4,
+ "ipv6": r0_loopback_address_ipv6,
+ }
+ R0_NETWORK_LOOPBACK_NXTHOP = {
+ "ipv4": r0_loopback_address_ipv4_nxt_hop,
+ "ipv6": r0_loopback_address_ipv6_nxt_hop,
+ }
+
+ R1_NETWORK_LOOPBACK = {
+ "ipv4": r1_loopback_address_ipv4,
+ "ipv6": r1_loopback_address_ipv6,
+ }
+ R1_NETWORK_LOOPBACK_NXTHOP = {
+ "ipv4": r1_loopback_address_ipv4_nxt_hop,
+ "ipv6": r1_loopback_address_ipv6_nxt_hop,
+ }
+
+ R0_NETWORK_CONNECTED = {
+ "ipv4": r0_connected_address_ipv4,
+ "ipv6": r0_connected_address_ipv6,
+ }
+ R0_NETWORK_CONNECTED_NXTHOP = {
+ "ipv4": r0_loopback_address_ipv4_nxt_hop,
+ "ipv6": r0_loopback_address_ipv6_nxt_hop,
+ }
+
+ R1_NETWORK_CONNECTED = {
+ "ipv4": r1_connected_address_ipv4,
+ "ipv6": r1_connected_address_ipv6,
+ }
+ R1_NETWORK_CONNECTED_NXTHOP = {
+ "ipv4": r1_loopback_address_ipv4_nxt_hop,
+ "ipv6": r1_loopback_address_ipv6_nxt_hop,
+ }
+
+ R4_NETWORK_LOOPBACK = {
+ "ipv4": r4_loopback_address_ipv4,
+ "ipv6": r4_loopback_address_ipv6,
+ }
+ R4_NETWORK_LOOPBACK_NXTHOP = {
+ "ipv4": r4_loopback_address_ipv4_nxt_hop,
+ "ipv6": r4_loopback_address_ipv6_nxt_hop,
+ }
+
+ R3_NETWORK_LOOPBACK = {
+ "ipv4": r3_loopback_address_ipv4,
+ "ipv6": r3_loopback_address_ipv6,
+ }
+ R3_NETWORK_LOOPBACK_NXTHOP = {
+ "ipv4": r3_loopback_address_ipv4_nxt_hop,
+ "ipv6": r3_loopback_address_ipv6_nxt_hop,
+ }
+
+ R4_NETWORK_CONNECTED = {
+ "ipv4": r4_connected_address_ipv4,
+ "ipv6": r4_connected_address_ipv6,
+ }
+ R4_NETWORK_CONNECTED_NXTHOP = {
+ "ipv4": r4_loopback_address_ipv4_nxt_hop,
+ "ipv6": r4_loopback_address_ipv6_nxt_hop,
+ }
+
+ R3_NETWORK_CONNECTED = {
+ "ipv4": r3_connected_address_ipv4,
+ "ipv6": r3_connected_address_ipv6,
+ }
+ R3_NETWORK_CONNECTED_NXTHOP = {
+ "ipv4": r3_loopback_address_ipv4_nxt_hop,
+ "ipv6": r3_loopback_address_ipv6_nxt_hop,
+ }
+
+ # populating the nexthop for default routes
+
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+
+ interface = topo["routers"]["r1"]["links"]["r2"]["interface"]
+ ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r1", intf=interface)
+ ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0]
+ ipv6_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv6"].split("/")[0]
+ DEFAULT_ROUTE_NXT_HOP_R1 = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local}
+
+ interface = topo["routers"]["r3"]["links"]["r2"]["interface"]
+ ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r3", intf=interface)
+ ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv4"].split("/")[0]
+ ipv6_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv6"].split("/")[0]
+ DEFAULT_ROUTE_NXT_HOP_R3 = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local}
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module():
+ """Teardown the pytest environment"""
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+ logger.info(
+ "Testsuite end time: {}".format(time.asctime(time.localtime(time.time())))
+ )
+ logger.info("=" * 40)
+
+#####################################################
+#
+# Testcases
+#
+#####################################################
+
+def test_verify_bgp_default_originate_in_IBGP_p0(request):
+ """
+ Verify BGP default-originate route with IBGP peer
+ """
+ tgen = get_topogen()
+ global BGP_CONVERGENCE
+ global topo
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ check_router_status(tgen)
+ reset_config_on_routers(tgen)
+
+ if BGP_CONVERGENCE != True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ step("Configure IPv4 and IPv6 , IBGP neighbor between R1 and R2")
+ step("Configure IPv4 and IPv6 Loopback interface on R1, R0 and R2")
+ step("Configure IPv4 and IPv6 EBGP neighbor between R0 and R1")
+
+ r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"]
+ r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"]
+ r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"]
+ r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"]
+ r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"]
+
+ input_dict = {
+ "r0": {
+ "bgp": {
+ "local_as": 1000,
+ }
+ },
+ "r1": {
+ "bgp": {
+ "local_as": 2000,
+ }
+ },
+ "r2": {
+ "bgp": {
+ "local_as": 2000,
+ }
+ },
+ "r3": {
+ "bgp": {
+ "local_as": r3_local_as,
+ }
+ },
+ "r4": {
+ "bgp": {
+ "local_as": r4_local_as,
+ }
+ },
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+ try:
+ assert result is True
+ except AssertionError:
+ logger.info("Expected behaviour: {}".format(result))
+ logger.info("BGP config is not created because of invalid ASNs")
+
+ step("After changing the BGP AS Path Verify the BGP Convergence")
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, " Complete Convergence is expected after changing the ASN but failed to converge --> :Failed \n Error: {}".format(
+ BGP_CONVERGENCE
+ )
+
+ step("Configure IPv4 and IPv6 static route on R1 next-hop as NULL0")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ }
+ ]
+ }
+ }
+ result = create_static_routes(tgen, static_routes_input)
+ assert result is True, "Testcase {} : Failed to configure the static routes {} on router R1 \n Error: {}".format(
+ tc_name,static_routes_input, result
+ )
+ step("verify IPv4 and IPv6 static route are configured and up on R1")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ }
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r1", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n After configuring the static routes {} , the routes are not found in FIB \n Error: {}".format(
+ tc_name,static_routes_input, result
+ )
+
+ step(
+ "Configure redistribute static and connected on R0 and R1, for IPv4 and IPv6 address family "
+ )
+ redistribute_static = {
+ "r0": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ },
+ }
+ }
+ },
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ },
+ }
+ }
+ },
+ }
+ result = create_router_bgp(tgen, topo, redistribute_static)
+ assert result is True, "Testcase {} : Failed to configure the redistribute static configuration \n Error: {}".format(tc_name, result)
+
+ step(
+ "After configuring redistribute command , verify static and connected routes ( loopback connected routes) are advertised on R2"
+ )
+
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [R0_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R0_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R0_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R0_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : After redistributing static routes the routes {} expected in FIB but NOT FOUND ......! \n Error: {}".format(
+ tc_name, static_routes_input,result
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : After redistributing static routes the routes {} expected in RIB but NOT FOUND ......! \n Error: {}".format(
+ tc_name, static_routes_input , result
+ )
+
+ step(
+ "Taking the snapshot of the prefix count before configuring the default originate"
+ )
+ snapshot1 = get_prefix_count_route(tgen, topo, dut="r2", peer="r1")
+
+ step(
+ "Configure Default originate on R1 for R1 to R2, for IPv4 and IPv6 BGP address family "
+ )
+ local_as = get_dut_as_number(tgen, dut="r1")
+ default_originate_config = {
+ "r1": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {"unicast": {"default_originate": {"r2": {}}}},
+ "ipv6": {"unicast": {"default_originate": {"r2": {}}}},
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed Configuring default originate configuration. \n Error: {}".format(tc_name, result)
+
+ step(
+ "After configuring default-originate command , verify default routes are advertised on R2 "
+ " R1 static and loopback routes received on R2 BGP and FIB"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ ]
+ }
+ }
+
+ result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : post configuring the BGP Default originate configuration static and connected routes should not be effected but impacted on FIB .......! FAILED \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : Failedpost configuring the BGP Default originate configuration static and connected routes should not be effected but impacted on RIB......! FAILED \n Error: {}".format(
+ tc_name, result
+ )
+ step(
+ "Verify default route for IPv4 and IPv6 present with path=igp metric =0 , local-preference= 100 "
+ )
+ result = verify_rib_default_route(
+ tgen,
+ topo,
+ dut="r2",
+ routes=DEFAULT_ROUTES,
+ expected_nexthop=DEFAULT_ROUTE_NXT_HOP_R1,
+ metric=0,
+ locPrf=100,
+ )
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+ step(
+ "Taking the snapshot2 of the prefix count after configuring the default originate"
+ )
+ snapshot2 = get_prefix_count_route(tgen, topo, dut="r2", peer="r1")
+
+ step("verifying the prefix count incrementing or not ")
+ isIPv4prefix_incremented = False
+ isIPv6prefix_incremented = False
+ if snapshot1["ipv4_count"] < snapshot2["ipv4_count"]:
+ isIPv4prefix_incremented = True
+ if snapshot1["ipv6_count"] < snapshot2["ipv6_count"]:
+ isIPv6prefix_incremented = True
+
+ assert (
+ isIPv4prefix_incremented is True
+ ), "Testcase {} : Failed Error: IPV4 Prefix is not incremented on receiveing ".format(
+ tc_name
+ )
+
+ assert (
+ isIPv6prefix_incremented is True
+ ), "Testcase {} : Failed Error: IPV6 Prefix is not incremented on receiveing ".format(
+ tc_name
+ )
+ write_test_footer(tc_name)
+
+
+def test_verify_bgp_default_originate_in_EBGP_p0(request):
+ """
+ Verify BGP default-originate route with EBGP peer
+ """
+ tgen = get_topogen()
+ global BGP_CONVERGENCE
+ global topo
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ check_router_status(tgen)
+ reset_config_on_routers(tgen)
+
+ if BGP_CONVERGENCE != True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ step("Configure IPv4 and IPv6 , EBGP neighbor between R3 and R2")
+ step("Configure lPv4 and IPv6 Loopback interface on R3, R4 and R2")
+ step("Configure IPv4 and IPv6 IBGP neighbor between R4 and R3")
+ r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"]
+ r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"]
+ r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"]
+ r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"]
+ r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"]
+
+ input_dict = {
+ "r0": {
+ "bgp": {
+ "local_as": r0_local_as,
+ }
+ },
+ "r1": {
+ "bgp": {
+ "local_as": r1_local_as,
+ }
+ },
+ "r2": {
+ "bgp": {
+ "local_as": r2_local_as,
+ }
+ },
+ "r3": {
+ "bgp": {
+ "local_as": 4000,
+ }
+ },
+ "r4": {
+ "bgp": {
+ "local_as": 4000,
+ }
+ },
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+ try:
+ assert result is True
+ except AssertionError:
+ logger.info("Expected behaviour: {}".format(result))
+ logger.info("BGP config is not created because of invalid ASNs")
+ step("After changing the BGP AS Path Verify the BGP Convergence")
+
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "Complete convergence is expeceted after changing the ASN os the routes ..! :Failed \n Error: {}".format(
+ BGP_CONVERGENCE
+ )
+
+ step(" Configure IPv4 and IPv6 static route on R3 next-hop on R4 interface")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r3": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ }
+ ]
+ }
+ }
+ result = create_static_routes(tgen, static_routes_input)
+ assert result is True, "Testcase {} : Failed to configure the static routes ....! Failed \n Error: {}".format(
+ tc_name, result
+ )
+ step("verify IPv4 and IPv6 static route are configured and up on R1")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r3": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ }
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r3", static_routes_input)
+ assert result is True, "Testcase {} : Route is not found in {} in FIB ......! Failed \n Error: {}".format(
+ tc_name, static_routes_input,result
+ )
+
+ step(
+ "Configure redistribute static and connected on R3 and R4 for IPv4 and IPv6 address family "
+ )
+ redistribute_static = {
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ },
+ }
+ }
+ },
+ "r4": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ },
+ }
+ }
+ },
+ }
+ result = create_router_bgp(tgen, topo, redistribute_static)
+ assert result is True, "Testcase {} : Failed to configure redistribute configuratin \n Error: {}".format(tc_name, result)
+
+ step(
+ "After configuring redistribute command , verify static and connected routes ( loopback connected routes) are advertised on R2"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [R3_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R3_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R3_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R3_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R4_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R4_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R4_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R4_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : static & and connected routes are expected but not found in FIB .... ! \n Error: {}".format(
+ tc_name, result
+ )
+ result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : static & and connected routes are expected but not found in RIB .... ! \n Error: {}".format(
+ tc_name, result
+ )
+ snapshot1 = get_prefix_count_route(tgen, topo, dut="r2", peer="r3")
+ step(
+ "Configure Default originate on R3 for R3 to R2, on IPv4 and IPv6 BGP address family"
+ )
+ local_as = get_dut_as_number(tgen, dut="r3")
+ default_originate_config = {
+ "r3": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {"unicast": {"default_originate": {"r2": {}}}},
+ "ipv6": {"unicast": {"default_originate": {"r2": {}}}},
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed to configure the default originate configuration \n Error: {}".format(tc_name, result)
+
+ step(
+ "After configuring default-originate command , verify default routes are advertised on R2 on both BGP RIB and FIB"
+ )
+
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ },
+ ]
+ }
+ }
+
+ result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : static route from R1 {} and default route from R3 is expected in R2 FIB .....! NOT FOUND \n Error: {}".format(
+ tc_name, NETWORK1_1,result
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : static route from R1 {} and default route from R3 is expected in R2 RIB .....! NOT FOUND \n Error: {}".format(
+ tc_name,NETWORK1_1, result
+ )
+
+ step(
+ "Verify default route for IPv4 and IPv6 present with path = ebgp as path, metric =0 "
+ )
+ # local preference will bgp not applicable for eBGP
+ result = verify_rib_default_route(
+ tgen,
+ topo,
+ dut="r2",
+ routes=DEFAULT_ROUTES,
+ expected_nexthop=DEFAULT_ROUTE_NXT_HOP_R3,
+ metric=0,
+ expected_aspath="4000",
+ )
+ assert result is True, "Testcase {} : Default route from R3 is expected with attributes in R2 RIB .....! NOT FOUND Error: {}".format(tc_name, result)
+
+ step(
+ "Taking the snapshot2 of the prefix count after configuring the default originate"
+ )
+ snapshot2 = get_prefix_count_route(tgen, topo, dut="r2", peer="r3")
+ step(
+ "Verify out-prefix count is incremented default route on IPv4 and IPv6 neighbor"
+ )
+ isIPv4prefix_incremented = False
+ isIPv6prefix_incremented = False
+ if snapshot1["ipv4_count"] < snapshot2["ipv4_count"]:
+ isIPv4prefix_incremented = True
+ if snapshot1["ipv6_count"] < snapshot2["ipv6_count"]:
+ isIPv6prefix_incremented = True
+
+ assert (
+ isIPv4prefix_incremented is True
+ ), "Testcase {} : Failed Error: IPV4 Prefix is not incremented on receiveing ".format(
+ tc_name
+ )
+
+ assert (
+ isIPv6prefix_incremented is True
+ ), "Testcase {} : Failed Error: IPV6 Prefix is not incremented on receiveing ".format(
+ tc_name
+ )
+ write_test_footer(tc_name)
+
+
+def test_verify_bgp_default_originate_in_IBGP_with_route_map_p0(request):
+ """
+ test_verify_bgp_default_originate_in_IBGP_with_route_map_p0
+ """
+ tgen = get_topogen()
+ global BGP_CONVERGENCE
+ global topo
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ check_router_status(tgen)
+ reset_config_on_routers(tgen)
+
+ if BGP_CONVERGENCE != True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ step("Configure IPv4 and IPv6 , IBGP neighbor between R1 and R2")
+ step("Configure IPv4 and IPv6 , EBGP neighbor between R1 and R0")
+ r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"]
+ r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"]
+ r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"]
+ r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"]
+ r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"]
+
+ input_dict = {
+ "r0": {
+ "bgp": {
+ "local_as": r0_local_as,
+ }
+ },
+ "r1": {
+ "bgp": {
+ "local_as": 1000,
+ }
+ },
+ "r2": {
+ "bgp": {
+ "local_as": 1000,
+ }
+ },
+ "r3": {
+ "bgp": {
+ "local_as": r3_local_as,
+ }
+ },
+ "r4": {
+ "bgp": {
+ "local_as": r4_local_as,
+ }
+ },
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+ try:
+ assert result is True
+ except AssertionError:
+ logger.info("Expected behaviour: {}".format(result))
+ logger.info("BGP config is not created because of invalid ASNs")
+
+ step("After changing the BGP AS Path Verify the BGP Convergence")
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "Complete convergence is expected after changing ASN ....! ERROR :Failed \n Error: {}".format(
+ BGP_CONVERGENCE
+ )
+
+ step("Configure 2 IPv4 and 2 IPv6 Static route on R0 with next-hop as Null0")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r0": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+ result = create_static_routes(tgen, static_routes_input)
+ assert result is True, "Testcase {} : Static Configuration is Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("verify IPv4 and IPv6 static route are configured and up on R0")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r0": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r0", static_routes_input)
+ assert result is True, "Testcase {} : routes {} unable is not found in R0 FIB \n Error: {}".format(
+ tc_name, static_routes_input,result
+ )
+
+ step(
+ "Configure redistribute static on IPv4 and IPv6 address family on R0 for R0 to R1 neighbor "
+ )
+ redistribute_static = {
+ "r0": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
+ "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, redistribute_static)
+ assert result is True, "Testcase {} : Failed to configure redistribute static configuration....! \n Error: {}".format(tc_name, result)
+
+ step("verify IPv4 and IPv6 static route are configured and up on R1")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r0": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r1", static_routes_input)
+ assert result is True, "Testcase {} : Failed... Routes {} expected in r0 FIB after configuring the redistribute config \n Error: {}".format(
+ tc_name,static_routes_input, result
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, "r1", static_routes_input)
+ assert result is True, "Testcase {} : Failed... Routes {} expected in r0 RIB after configuring the redistribute config \n Error: {}".format(
+ tc_name, static_routes_input,result
+ )
+
+ step(
+ "Configure IPv4 prefix-list Pv4 and and IPv6 prefix-list Pv6 on R1 to match BGP route Sv41, Sv42, IPv6 route Sv61 Sv62 permit "
+ )
+ input_dict_3 = {
+ "r1": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv4": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv4"],
+ "action": "permit",
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv4"],
+ "action": "permit",
+ },
+ ]
+ },
+ "ipv6": {
+ "Pv6": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv6"],
+ "action": "permit",
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv6"],
+ "action": "permit",
+ },
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed to configure the prefix list \n Error: {}".format(tc_name, result)
+
+ step(
+ "Configure IPV4 and IPv6 route-map (RMv4 and RMv6 ) matching prefix-list (Pv4 and Pv6) respectively on R1"
+ )
+ input_dict_3 = {
+ "r1": {
+ "route_maps": {
+ "RMv4": [
+ {
+ "action": "permit",
+ "seq_id": "1",
+ "match": {"ipv4": {"prefix_lists": "Pv4"}},
+ },
+ ],
+ "RMv6": [
+ {
+ "action": "permit",
+ "seq_id": "1",
+ "match": {"ipv6": {"prefix_lists": "Pv6"}},
+ },
+ ],
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed to configure the route map \n Error: {}".format(tc_name, result)
+
+ step(
+ "Configure default-originate with route-map (RMv4 and RMv6) on R1, on BGP IPv4 and IPv6 address family "
+ )
+ local_as = get_dut_as_number(tgen, dut="r1")
+ default_originate_config = {
+ "r1": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {
+ "unicast": {"default_originate": {"r2": {"route_map": "RMv4"}}}
+ },
+ "ipv6": {
+ "unicast": {"default_originate": {"r2": {"route_map": "RMv6"}}}
+ },
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed to configure the default originate \n Error: {}".format(tc_name, result)
+
+ step("Verify the default route is received in BGP RIB and FIB")
+ step(
+ "After configuring default-originate command , verify default routes are advertised on R2 "
+ )
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ }
+ ]
+ }
+ }
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed...! Expected default route from R1 not found in FIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed...! Expected default route from R1 not found in RIB \n Error: {}".format(
+ tc_name, result
+ )
+ step("Remove route-map RMv4 and RMv6 from default-originate command in R1")
+ NOTE = """ Configuring the default-originate should remove the previously applied default originate with condtional route-map"""
+ local_as = get_dut_as_number(tgen, dut="r1")
+ default_originate_config = {
+ "r1": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {"unicast": {"default_originate": {"r2": {}}}},
+ "ipv6": {"unicast": {"default_originate": {"r2": {}}}},
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed to remove the default originate conditional route-map \n Error: {}".format(tc_name, result)
+
+ step(
+ "Verify BGP RIB and FIB After removing route-map , default route still present on R2"
+ )
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed Default route from R1 is not found in FIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed Default route from R1 is not found in RIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Configure default-originate with route-map (RMv4 and RMv6) on R1 ")
+ local_as = get_dut_as_number(tgen, dut="r1")
+ default_originate_config = {
+ "r1": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "default_originate": {
+ "r2": {
+ "route_map": "RMv4",
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "default_originate": {
+ "r2": {
+ "route_map": "RMv6",
+ }
+ }
+ }
+ },
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed to configure the Default originate route-map \n Error: {}".format(tc_name, result)
+
+ step(
+ "After configuring default-originate command , verify default routes are advertised on R2 "
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed Default Route from R1 is not found in FIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed Default Route from R1 is not found in RIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Delete prefix list using no prefix-list")
+ input_dict_3 = {
+ "r1": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv4": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv4"],
+ "action": "permit",
+ "delete": True,
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv4"],
+ "action": "permit",
+ "delete": True,
+ },
+ ]
+ },
+ "ipv6": {
+ "Pv6": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv6"],
+ "action": "permit",
+ "delete": True,
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv6"],
+ "action": "permit",
+ "delete": True,
+ },
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed to delete the prefix list Error: {}".format(tc_name, result)
+
+ step(
+ "Verify BGP RIB and FIB After deleting prefix-list , verify IPv4 and IPv6 default route got removed from DUT "
+ )
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ }
+ ]
+ }
+ }
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed\n After deleteing prefix default route is not expected in FIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n After deleteing prefix default route is not expected in RIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Configure prefix-list and delete route-map using no route-map")
+ input_dict_3 = {
+ "r1": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv4": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv4"],
+ "action": "permit",
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv4"],
+ "action": "permit",
+ },
+ ]
+ },
+ "ipv6": {
+ "Pv6": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv6"],
+ "action": "permit",
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv6"],
+ "action": "permit",
+ },
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed to configure the prefix lists Error: {}".format(tc_name, result)
+
+ step(
+ "After configuring the Prefixlist cross checking the BGP Default route is configured again , before deleting the route map"
+ )
+
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ }
+ ]
+ }
+ }
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ expected=True,
+ )
+ assert result is True, "Testcase {} : Failed Default route from R1 is expected in FIB but not found \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ expected=True,
+ )
+ assert result is True, "Testcase {} : Failed Default route from R1 is expected in RIB but not found \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Deleting the routemap")
+ input_dict = {"r1": {"route_maps": ["RMv4", "RMv6"]}}
+ result = delete_route_maps(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed to delete the Route-map \n Error: {}".format(tc_name, result)
+
+ step(
+ "Verify BGP RIB and FIB ,After deleting route-map , verify IPv4 and IPv6 default route got removed from DUT"
+ )
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n After deleteing route-map default route is not expected in FIB \nError: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n After deleteing route-map default route is not expected in RIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+def test_verify_bgp_default_originate_in_EBGP_with_route_map_p0(request):
+ """
+ test_verify_bgp_default_originate_in_EBGP_with_route_map_p0
+ """
+ tgen = get_topogen()
+ global BGP_CONVERGENCE
+ global topo
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ check_router_status(tgen)
+ reset_config_on_routers(tgen)
+
+ if BGP_CONVERGENCE != True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ step("Configure IPv4 and IPv6 , EBGP neighbor between R3 and R2")
+ step("Configure IPv4 and IPv6 IBGP neighbor between R3 and R4")
+ r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"]
+ r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"]
+ r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"]
+ r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"]
+ r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"]
+
+ input_dict = {
+ "r0": {
+ "bgp": {
+ "local_as": r0_local_as,
+ }
+ },
+ "r1": {
+ "bgp": {
+ "local_as": r1_local_as,
+ }
+ },
+ "r2": {
+ "bgp": {
+ "local_as": r2_local_as,
+ }
+ },
+ "r3": {
+ "bgp": {
+ "local_as": 4000,
+ }
+ },
+ "r4": {
+ "bgp": {
+ "local_as": 4000,
+ }
+ },
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+ try:
+ assert result is True
+ except AssertionError:
+ logger.info("Expected behaviour: {}".format(result))
+ logger.info("BGP config is not created because of invalid ASNs")
+ step("After changing the BGP AS Path Verify the BGP Convergence")
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format(
+ BGP_CONVERGENCE
+ )
+
+ step(
+ "Configure 2 IPv4 and 2 IPv6, Static route on R4 with next-hop as Null0 IPv4 route Sv41, Sv42, IPv6 route Sv61 Sv62"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r4": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+ result = create_static_routes(tgen, static_routes_input)
+ assert result is True, "Testcase {} : Failed to configure the static routes \n Error: {}".format(
+ tc_name, result
+ )
+ step("verify IPv4 and IPv6 static route are configured and up on R4")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r4": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r4", static_routes_input)
+ assert result is True, "Testcase {} : Failed Static route {} is not found in R4 FIB \n Error: {}".format(
+ tc_name, static_routes_input,result
+ )
+
+ step(
+ "Configure redistribute static on IPv4 and IPv6 address family on R4 for R4 to R3 neighbo"
+ )
+ redistribute_static = {
+ "r4": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
+ "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, redistribute_static)
+ assert result is True, "Testcase {} : Failed to configure the redistribute static \n Error: {}".format(tc_name, result)
+
+ step("verify IPv4 and IPv6 static route are configured and up on R3")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r3": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r3", static_routes_input)
+ assert result is True, "Testcase {} : Failed static routes from R1 and R3 is not found in FIB \n Error: {}".format(
+ tc_name, result
+ )
+ result = verify_bgp_rib(tgen, addr_type, "r3", static_routes_input)
+ assert result is True, "Testcase {} : Failed static routes from R1 and R3 is not found in RIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Configure IPv4 prefix-list Pv4 and and IPv6 prefix-list Pv6 on R3 so new route which is not present on R3"
+ )
+ input_dict_3 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv4": [
+ {
+ "seqid": "1",
+ "network": NETWORK3_1["ipv4"],
+ "action": "permit",
+ }
+ ]
+ },
+ "ipv6": {
+ "Pv6": [
+ {
+ "seqid": "1",
+ "network": NETWORK3_1["ipv6"],
+ "action": "permit",
+ }
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed to configure the prefix lists \n Error: {}".format(tc_name, result)
+
+ step("verify IPv4 and IPv6 Prefix list got configured on R3")
+ input_dict = {"r3": {"prefix_lists": ["Pv4", "Pv6"]}}
+ result = verify_prefix_lists(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed ..! configured prefix lists {} are not found \n Error: {}".format(tc_name,input_dict, result)
+
+ step(
+ "Configure IPv4 and IPv6 route-map ( RMv4 and RMv6 ) matching prefix-list (Pv4 and Pv6 ) respectively on R3"
+ )
+ input_dict_3 = {
+ "r3": {
+ "route_maps": {
+ "RMv4": [
+ {
+ "action": "permit",
+ "seq_id": "1",
+ "match": {"ipv4": {"prefix_lists": "Pv4"}},
+ },
+ ],
+ "RMv6": [
+ {
+ "action": "permit",
+ "seq_id": "1",
+ "match": {"ipv6": {"prefix_lists": "Pv6"}},
+ },
+ ],
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed to configure the route-map \n Error: {}".format(tc_name, result)
+ step(
+ "Taking the snapshot of the prefix count before configuring the default originate"
+ )
+ snapshot1 = get_prefix_count_route(tgen, topo, dut="r2", peer="r3")
+ step(
+ "Configure default-originate with IPv4 and IPv6 route-map (RMv4 and RMv6) on R3"
+ )
+ local_as = get_dut_as_number(tgen, dut="r3")
+ default_originate_config = {
+ "r3": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {
+ "unicast": {"default_originate": {"r2": {"route_map": "RMv4"}}}
+ },
+ "ipv6": {
+ "unicast": {"default_originate": {"r2": {"route_map": "RMv6"}}}
+ },
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed to configure default-originate \n Error: {}".format(tc_name, result)
+
+ step("Verify the default route is NOT received in BGP RIB and FIB on R2 ")
+ step(
+ "After configuring default-originate command , verify default routes are not Received on R2 "
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Default route is not expected due to deny in prefix list \nError: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \nDefault route is not expected due to deny in prefix list\n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Add route Sv41, Sv42, IPv6 route Sv61 Sv62 on prefix list Pv4 and Pv6")
+ input_dict_3 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv4": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv4"],
+ "action": "permit",
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv4"],
+ "action": "permit",
+ },
+ ]
+ },
+ "ipv6": {
+ "Pv6": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv6"],
+ "action": "permit",
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv6"],
+ "action": "permit",
+ },
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed to configure the prefix lists Error: {}".format(tc_name, result)
+
+ step("Verify BGP default route for IPv4 and IPv6 is received on R2")
+
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed Default routes are expected in R2 FIB from R3 but not found ....! \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed Default routes are expected in R2 RIB from R3 but not found ....! \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Remove route Sv41, Sv42, IPv6 route Sv61 Sv62 on prefix list Pv4 and Pv6")
+ input_dict_3 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv4": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv4"],
+ "action": "permit",
+ "delete": True,
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv4"],
+ "action": "permit",
+ "delete": True,
+ },
+ ]
+ },
+ "ipv6": {
+ "Pv6": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv6"],
+ "action": "permit",
+ "delete": True,
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv6"],
+ "action": "permit",
+ "delete": True,
+ },
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed to remove prefix-lists from R3 Error: {}".format(tc_name, result)
+
+ step(
+ "After Removing route BGP default route for IPv4 and IPv6 is NOT received on R2"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n After Removing route in prefix list the default route is not expected in FIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n After Removing route in prefix list the default route is not expected in RIB\n Error: {}".format(
+ tc_name, result
+ )
+
+ step(" Add route Sv41, Sv42, IPv6 route Sv61 Sv62 on prefix list Pv4 and Pv6")
+ input_dict_3 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv4": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv4"],
+ "action": "permit",
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv4"],
+ "action": "permit",
+ },
+ ]
+ },
+ "ipv6": {
+ "Pv6": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv6"],
+ "action": "permit",
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv6"],
+ "action": "permit",
+ },
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("Verify BGP default route for IPv4 and IPv6 is received on R2")
+
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Change IPv4 and IPv6 prefix-list permit and deny ")
+ input_dict_3 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv4": [
+ {"seqid": "1", "network": NETWORK1_1["ipv4"], "action": "deny"},
+ {"seqid": "2", "network": NETWORK2_1["ipv4"], "action": "deny"},
+ ]
+ },
+ "ipv6": {
+ "Pv6": [
+ {"seqid": "1", "network": NETWORK1_1["ipv6"], "action": "deny"},
+ {"seqid": "2", "network": NETWORK2_1["ipv6"], "action": "deny"},
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("Verify BGP default route for IPv4 and IPv6 is not received on R2")
+
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n after denying the prefix list default route is not expected in FIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n after denying the prefix list default route is not expected in RIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Change IPv4 and IPv6 prefix-list deny to permit ")
+ input_dict_3 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv4": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv4"],
+ "action": "permit",
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv4"],
+ "action": "permit",
+ },
+ ]
+ },
+ "ipv6": {
+ "Pv6": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv6"],
+ "action": "permit",
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv6"],
+ "action": "permit",
+ },
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ step("Verify BGP default route for IPv4 and IPv6 is received on R2")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Taking the snapshot2 of the prefix count after configuring the default originate"
+ )
+ snapshot2 = get_prefix_count_route(tgen, topo, dut="r2", peer="r3")
+
+ step("verifying the prefix count incrementing or not ")
+ isIPv4prefix_incremented = False
+ isIPv6prefix_incremented = False
+ if snapshot1["ipv4_count"] < snapshot2["ipv4_count"]:
+ isIPv4prefix_incremented = True
+ if snapshot1["ipv6_count"] < snapshot2["ipv6_count"]:
+ isIPv6prefix_incremented = True
+
+ assert (
+ isIPv4prefix_incremented is True
+ ), "Testcase {} : Failed Error: IPV4 Prefix is not incremented on receiveing ".format(
+ tc_name
+ )
+
+ assert (
+ isIPv6prefix_incremented is True
+ ), "Testcase {} : Failed Error: IPV6 Prefix is not incremented on receiveing ".format(
+ tc_name
+ )
+
+ step(
+ "Configure another IPv4 and IPv6 route-map and match same prefix-list (Sv41, Sv42, IPv6 route Sv61 Sv62) with deny statement "
+ )
+ input_dict_3 = {
+ "r3": {
+ "route_maps": {
+ "RMv41": [
+ {
+ "action": "deny",
+ "seq_id": "1",
+ "match": {"ipv4": {"prefix_lists": "Pv4"}},
+ },
+ ],
+ "RMv61": [
+ {
+ "action": "deny",
+ "seq_id": "1",
+ "match": {"ipv6": {"prefix_lists": "Pv6"}},
+ },
+ ],
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("Attach route-map on IPv4 and IP6 BGP neighbor on fly")
+ local_as = get_dut_as_number(tgen, dut="r3")
+ default_originate_config = {
+ "r3": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {
+ "unicast": {"default_originate": {"r2": {"route_map": "RMv41"}}}
+ },
+ "ipv6": {
+ "unicast": {"default_originate": {"r2": {"route_map": "RMv61"}}}
+ },
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "After attaching route-map verify IPv4 and IPv6 default route is withdrawn from the R2"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ expected=False,
+ )
+ assert result is not True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ expected=False,
+ )
+ assert result is not True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Change the recently added Routemap from deny to permit")
+ input_dict_3 = {
+ "r3": {
+ "route_maps": {
+ "RMv41": [
+ {
+ "action": "permit",
+ "seq_id": "1",
+ "match": {"ipv4": {"prefix_lists": "Pv4"}},
+ },
+ ],
+ "RMv61": [
+ {
+ "action": "permit",
+ "seq_id": "1",
+ "match": {"ipv6": {"prefix_lists": "Pv6"}},
+ },
+ ],
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("Verify IPv4 and IPv6 default route is advertised from the R2")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Delete default-originate route-map command while configuring ( neighbor x.x.x default-originate) for IPv4 and IPv6 BGP neighbor "
+ )
+ """ Configuring the Default originate on neighbor must remove the previously assigned deault-originate with routemap config """
+ local_as = get_dut_as_number(tgen, dut="r3")
+ default_originate_config = {
+ "r3": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {"unicast": {"default_originate": {"r2": {}}}},
+ "ipv6": {"unicast": {"default_originate": {"r2": {}}}},
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "Verify in running config from BGP that default-originate with route-map command is removed and default-originate command is still present and default route for IPv4 and IPv6 present in RIB and FIB"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Configure default-originate with conditional route-map command on IPv4 and IPv6 address family "
+ )
+ local_as = get_dut_as_number(tgen, dut="r3")
+ default_originate_config = {
+ "r3": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {
+ "unicast": {"default_originate": {"r2": {"route_map": "RMv41"}}}
+ },
+ "ipv6": {
+ "unicast": {"default_originate": {"r2": {"route_map": "RMv61"}}}
+ },
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "Verify in running config from BGP that default-originate with route-map command is present and default route for IPv4 and IPv6 present"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Delete default originate with 'no bgp default-originate' from IPV4 and IPV6 address family "
+ )
+ local_as = get_dut_as_number(tgen, dut="r3")
+ default_originate_config = {
+ "r3": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {
+ "unicast": {"default_originate": {"r2": {"delete": True}}}
+ },
+ "ipv6": {
+ "unicast": {"default_originate": {"r2": {"delete": True}}}
+ },
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ " Verify in running config from BGP that default-originate complete CLI is removed for IPV4 and IPV6 address family and default originate routes got deleted"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Default Route is not expected in FIB \nError: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Default Route is not expected in RIB\nError: {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
diff --git a/tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_2.py b/tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_2.py
new file mode 100644
index 000000000..a9987a8f9
--- /dev/null
+++ b/tests/topotests/bgp_default_originate/test_bgp_default_originate_topo1_2.py
@@ -0,0 +1,2437 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2022 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF")
+# in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+# Shreenidhi A R <rshreenidhi@vmware.com>
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+"""
+Following tests are covered.
+5. Verify BGP default originate route-map with OUT route-map
+6. Verify BGP default originate route-map with IN route-map
+8. Verify BGP default route after removing default-originate
+9. Verify default-originate route with GR
+"""
+import os
+import sys
+import time
+import pytest
+from time import sleep
+from copy import deepcopy
+from lib.topolog import logger
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from lib.topogen import Topogen, get_topogen
+from lib.topojson import build_config_from_json
+from lib.topolog import logger
+
+from lib.bgp import (
+ verify_bgp_convergence,
+ verify_graceful_restart,
+ create_router_bgp,
+ verify_router_id,
+ modify_as_number,
+ verify_as_numbers,
+ clear_bgp_and_verify,
+ clear_bgp,
+ verify_bgp_rib,
+ get_prefix_count_route,
+ get_dut_as_number,
+ verify_rib_default_route,
+ verify_fib_default_route,
+ verify_bgp_advertised_routes_from_neighbor,
+ verify_bgp_received_routes_from_neighbor,
+)
+from lib.common_config import (
+ interface_status,
+ verify_prefix_lists,
+ verify_fib_routes,
+ kill_router_daemons,
+ start_router_daemons,
+ shutdown_bringup_interface,
+ step,
+ required_linux_kernel_version,
+ stop_router,
+ start_router,
+ create_route_maps,
+ create_prefix_lists,
+ get_frr_ipv6_linklocal,
+ start_topology,
+ write_test_header,
+ check_address_types,
+ write_test_footer,
+ reset_config_on_routers,
+ create_static_routes,
+ check_router_status,
+ delete_route_maps,
+)
+
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+sys.path.append(os.path.join(CWD, "../lib/"))
+
+# Required to instantiate the topology builder class.
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+
+# Global variables
+topo = None
+KEEPALIVETIMER = 1
+HOLDDOWNTIMER = 3
+# Global variables
+NETWORK1_1 = {"ipv4": "1.1.1.1/32", "ipv6": "1::1/128"}
+NETWORK1_2 = {"ipv4": "1.1.1.2/32", "ipv6": "1::2/128"}
+NETWORK2_1 = {"ipv4": "2.1.1.1/32", "ipv6": "2::1/128"}
+NETWORK2_2 = {"ipv4": "2.1.1.2/32", "ipv6": "2::2/128"}
+NETWORK3_1 = {"ipv4": "3.1.1.1/32", "ipv6": "3::1/128"}
+NETWORK3_2 = {"ipv4": "3.1.1.2/32", "ipv6": "3::2/128"}
+NETWORK4_1 = {"ipv4": "4.1.1.1/32", "ipv6": "4::1/128"}
+NETWORK4_2 = {"ipv4": "4.1.1.2/32", "ipv6": "4::2/128"}
+NETWORK5_1 = {"ipv4": "5.1.1.1/32", "ipv6": "5::1/128"}
+NETWORK5_2 = {"ipv4": "5.1.1.2/32", "ipv6": "5::2/128"}
+DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+NEXT_HOP_IP = {"ipv4": "Null0", "ipv6": "Null0"}
+
+IPV4_RM = "RMVIPV4"
+IPV6_RM = "RMVIPV6"
+
+IPV4_RM1 = "RMVIPV41"
+IPV6_RM1 = "RMVIPV61"
+
+IPV4_RM2 = "RMVIPV42"
+IPV6_RM2 = "RMVIPV62"
+
+IPV4_PL_1 = "PV41"
+IPV4_PL_2 = "PV42"
+
+IPV6_PL_1 = "PV61"
+IPV6_PL_2 = "PV62"
+
+
+r1_ipv4_loopback = "1.0.1.0/24"
+r2_ipv4_loopback = "1.0.2.0/24"
+r3_ipv4_loopback = "1.0.3.0/24"
+r4_ipv4_loopback = "1.0.4.0/24"
+r1_ipv6_loopback = "2001:db8:f::1:0/120"
+r2_ipv6_loopback = "2001:db8:f::2:0/120"
+r3_ipv6_loopback = "2001:db8:f::3:0/120"
+r4_ipv6_loopback = "2001:db8:f::4:0/120"
+
+r0_connected_address_ipv4 = "192.168.0.0/24"
+r0_connected_address_ipv6 = "fd00::/64"
+r1_connected_address_ipv4 = "192.168.1.0/24"
+r1_connected_address_ipv6 = "fd00:0:0:1::/64"
+r3_connected_address_ipv4 = "192.168.2.0/24"
+r3_connected_address_ipv6 = "fd00:0:0:2::/64"
+r4_connected_address_ipv4 = "192.168.3.0/24"
+r4_connected_address_ipv6 = "fd00:0:0:3::/64"
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ # Required linux kernel version for this suite to run.
+ result = required_linux_kernel_version("4.15")
+ if result is not True:
+ pytest.skip("Kernel requirements are not met")
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: {}".format(testsuite_run_time))
+ logger.info("=" * 40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ json_file = "{}/bgp_default_originate_topo1.json".format(CWD)
+ tgen = Topogen(json_file, mod.__name__)
+ global topo
+ topo = tgen.json_topo
+ # ... and here it calls Mininet initialization functions.
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start daemons and then start routers
+ start_topology(tgen)
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ global ADDR_TYPES
+ global BGP_CONVERGENCE
+ global DEFAULT_ROUTES
+ global DEFAULT_ROUTE_NXT_HOP_R1, DEFAULT_ROUTE_NXT_HOP_R3
+ global R0_NETWORK_LOOPBACK, R0_NETWORK_LOOPBACK_NXTHOP, R1_NETWORK_LOOPBACK, R1_NETWORK_LOOPBACK_NXTHOP
+ global R0_NETWORK_CONNECTED, R0_NETWORK_CONNECTED_NXTHOP, R1_NETWORK_CONNECTED, R1_NETWORK_CONNECTED_NXTHOP
+ global R4_NETWORK_LOOPBACK, R4_NETWORK_LOOPBACK_NXTHOP, R3_NETWORK_LOOPBACK, R3_NETWORK_LOOPBACK_NXTHOP
+ global R4_NETWORK_CONNECTED, R4_NETWORK_CONNECTED_NXTHOP, R3_NETWORK_CONNECTED, R3_NETWORK_CONNECTED_NXTHOP
+
+ ADDR_TYPES = check_address_types()
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format(
+ BGP_CONVERGENCE
+ )
+ # There are the global varibles used through out the file these are acheived only after building the topology.
+
+ r0_loopback_address_ipv4 = topo["routers"]["r0"]["links"]["lo"]["ipv4"]
+ r0_loopback_address_ipv4_nxt_hop = topo["routers"]["r0"]["links"]["r1"][
+ "ipv4"
+ ].split("/")[0]
+ r0_loopback_address_ipv6 = topo["routers"]["r0"]["links"]["lo"]["ipv6"]
+ r0_loopback_address_ipv6_nxt_hop = topo["routers"]["r0"]["links"]["r1"][
+ "ipv6"
+ ].split("/")[0]
+
+ r1_loopback_address_ipv4 = topo["routers"]["r1"]["links"]["lo"]["ipv4"]
+ r1_loopback_address_ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"][
+ "ipv4"
+ ].split("/")[0]
+ r1_loopback_address_ipv6 = topo["routers"]["r1"]["links"]["lo"]["ipv6"]
+ r1_loopback_address_ipv6_nxt_hop = topo["routers"]["r1"]["links"]["r2"][
+ "ipv6"
+ ].split("/")[0]
+
+ r4_loopback_address_ipv4 = topo["routers"]["r4"]["links"]["lo"]["ipv4"]
+ r4_loopback_address_ipv4_nxt_hop = topo["routers"]["r4"]["links"]["r3"][
+ "ipv4"
+ ].split("/")[0]
+ r4_loopback_address_ipv6 = topo["routers"]["r4"]["links"]["lo"]["ipv6"]
+ r4_loopback_address_ipv6_nxt_hop = topo["routers"]["r4"]["links"]["r3"][
+ "ipv6"
+ ].split("/")[0]
+
+ r3_loopback_address_ipv4 = topo["routers"]["r3"]["links"]["lo"]["ipv4"]
+ r3_loopback_address_ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"][
+ "ipv4"
+ ].split("/")[0]
+ r3_loopback_address_ipv6 = topo["routers"]["r3"]["links"]["lo"]["ipv6"]
+ r3_loopback_address_ipv6_nxt_hop = topo["routers"]["r3"]["links"]["r2"][
+ "ipv6"
+ ].split("/")[0]
+
+ R0_NETWORK_LOOPBACK = {
+ "ipv4": r0_loopback_address_ipv4,
+ "ipv6": r0_loopback_address_ipv6,
+ }
+ R0_NETWORK_LOOPBACK_NXTHOP = {
+ "ipv4": r0_loopback_address_ipv4_nxt_hop,
+ "ipv6": r0_loopback_address_ipv6_nxt_hop,
+ }
+
+ R1_NETWORK_LOOPBACK = {
+ "ipv4": r1_loopback_address_ipv4,
+ "ipv6": r1_loopback_address_ipv6,
+ }
+ R1_NETWORK_LOOPBACK_NXTHOP = {
+ "ipv4": r1_loopback_address_ipv4_nxt_hop,
+ "ipv6": r1_loopback_address_ipv6_nxt_hop,
+ }
+
+ R0_NETWORK_CONNECTED = {
+ "ipv4": r0_connected_address_ipv4,
+ "ipv6": r0_connected_address_ipv6,
+ }
+ R0_NETWORK_CONNECTED_NXTHOP = {
+ "ipv4": r0_loopback_address_ipv4_nxt_hop,
+ "ipv6": r0_loopback_address_ipv6_nxt_hop,
+ }
+
+ R1_NETWORK_CONNECTED = {
+ "ipv4": r1_connected_address_ipv4,
+ "ipv6": r1_connected_address_ipv6,
+ }
+ R1_NETWORK_CONNECTED_NXTHOP = {
+ "ipv4": r1_loopback_address_ipv4_nxt_hop,
+ "ipv6": r1_loopback_address_ipv6_nxt_hop,
+ }
+
+ R4_NETWORK_LOOPBACK = {
+ "ipv4": r4_loopback_address_ipv4,
+ "ipv6": r4_loopback_address_ipv6,
+ }
+ R4_NETWORK_LOOPBACK_NXTHOP = {
+ "ipv4": r4_loopback_address_ipv4_nxt_hop,
+ "ipv6": r4_loopback_address_ipv6_nxt_hop,
+ }
+
+ R3_NETWORK_LOOPBACK = {
+ "ipv4": r3_loopback_address_ipv4,
+ "ipv6": r3_loopback_address_ipv6,
+ }
+ R3_NETWORK_LOOPBACK_NXTHOP = {
+ "ipv4": r3_loopback_address_ipv4_nxt_hop,
+ "ipv6": r3_loopback_address_ipv6_nxt_hop,
+ }
+
+ R4_NETWORK_CONNECTED = {
+ "ipv4": r4_connected_address_ipv4,
+ "ipv6": r4_connected_address_ipv6,
+ }
+ R4_NETWORK_CONNECTED_NXTHOP = {
+ "ipv4": r4_loopback_address_ipv4_nxt_hop,
+ "ipv6": r4_loopback_address_ipv6_nxt_hop,
+ }
+
+ R3_NETWORK_CONNECTED = {
+ "ipv4": r3_connected_address_ipv4,
+ "ipv6": r3_connected_address_ipv6,
+ }
+ R3_NETWORK_CONNECTED_NXTHOP = {
+ "ipv4": r3_loopback_address_ipv4_nxt_hop,
+ "ipv6": r3_loopback_address_ipv6_nxt_hop,
+ }
+
+ # populating the nexthop for default routes
+
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+
+ interface = topo["routers"]["r1"]["links"]["r2"]["interface"]
+ ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r1", intf=interface)
+ ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0]
+ ipv6_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv6"].split("/")[0]
+ DEFAULT_ROUTE_NXT_HOP_R1 = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local}
+
+ interface = topo["routers"]["r3"]["links"]["r2"]["interface"]
+ ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r3", intf=interface)
+ ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv4"].split("/")[0]
+ ipv6_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv6"].split("/")[0]
+ DEFAULT_ROUTE_NXT_HOP_R3 = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local}
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module():
+ """Teardown the pytest environment"""
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+ logger.info(
+ "Testsuite end time: {}".format(time.asctime(time.localtime(time.time())))
+ )
+ logger.info("=" * 40)
+
+
+#####################################################
+#
+# Local API's
+#
+#####################################################
+
+
+def configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut, peer):
+ """
+ This function groups the repetitive function calls into one function.
+ """
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+ return True
+
+
+#####################################################
+#
+# Testcases
+#
+#####################################################
+
+
+def test_verify_bgp_default_originate_route_map_in_OUT_p1(request):
+ """
+ test_verify_bgp_default_originate_route_map_in_OUT_p1
+ """
+ tgen = get_topogen()
+ global BGP_CONVERGENCE
+ global topo
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ check_router_status(tgen)
+ reset_config_on_routers(tgen)
+
+ if BGP_CONVERGENCE != True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ step("Configure IPv4 and IPv6 , EBGP neighbor between R3 and R2")
+ step("Configure IPv4 and IPv6 IBGP neighbor between R3 and R4")
+ r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"]
+ r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"]
+ r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"]
+ r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"]
+ r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"]
+ input_dict = {
+ "r0": {
+ "bgp": {
+ "local_as": r0_local_as,
+ }
+ },
+ "r1": {
+ "bgp": {
+ "local_as": r1_local_as,
+ }
+ },
+ "r2": {
+ "bgp": {
+ "local_as": r2_local_as,
+ }
+ },
+ "r3": {
+ "bgp": {
+ "local_as": 4000,
+ }
+ },
+ "r4": {
+ "bgp": {
+ "local_as": 4000,
+ }
+ },
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+ try:
+ assert result is True
+ except AssertionError:
+ logger.info("Expected behaviour: {}".format(result))
+ logger.info("BGP config is not created because of invalid ASNs")
+ step("After changing the BGP AS Path Verify the BGP Convergence")
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format(
+ BGP_CONVERGENCE
+ )
+
+ step(
+ "Configure 2 IPv4 and 2 IPv6, Static route on R4 with next-hop as Null0 IPv4 route Sv41, Sv42, IPv6 route Sv61 Sv62"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r4": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+ result = create_static_routes(tgen, static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("verify IPv4 and IPv6 static route are configured and up on R4")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r4": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r4", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Configure redistribute static knob on R4 , for R4 to R3 neighbor ")
+ redistribute_static = {
+ "r4": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
+ "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, redistribute_static)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ expected_routes = {
+ "ipv4": [
+ {"network": NETWORK1_1["ipv4"], "nexthop": NEXT_HOP_IP["ipv4"]},
+ {"network": NETWORK2_1["ipv4"], "nexthop": NEXT_HOP_IP["ipv4"]},
+ ],
+ "ipv6": [
+ {"network": NETWORK1_1["ipv6"], "nexthop": NEXT_HOP_IP["ipv4"]},
+ {"network": NETWORK2_1["ipv6"], "nexthop": NEXT_HOP_IP["ipv4"]},
+ ],
+ }
+ result = verify_bgp_advertised_routes_from_neighbor(
+ tgen, topo, dut="r4", peer="r3", expected_routes=expected_routes
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "After redistribute static verify the routes is recevied in router R3 in RIB and FIB"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r3": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r3", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ result = verify_bgp_rib(tgen, addr_type, "r3", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Configure IPv4 prefix-list Pv4 and and IPv6 prefix-list Pv6 on R3 to match BGP route Sv41, IPv6 route Sv61 with permit option "
+ )
+ input_dict_3 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv4": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv4"],
+ "action": "permit",
+ }
+ ]
+ },
+ "ipv6": {
+ "Pv6": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv6"],
+ "action": "permit",
+ }
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("verify IPv4 and IPv6 Prefix list got configured on R3")
+ input_dict = {"r3": {"prefix_lists": ["Pv4", "Pv6"]}}
+ result = verify_prefix_lists(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "Configure IPv4 and IPv6 route-map RMv4 and RMv6 matching prefix-list Pv4 and Pv6 with permit option "
+ )
+ input_dict_3 = {
+ "r3": {
+ "route_maps": {
+ "RM4": [
+ {
+ "action": "permit",
+ "seq_id": "1",
+ "match": {"ipv4": {"prefix_lists": "Pv4"}},
+ },
+ ],
+ "RM6": [
+ {
+ "action": "permit",
+ "seq_id": "1",
+ "match": {"ipv6": {"prefix_lists": "Pv6"}},
+ },
+ ],
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "Configure IPv4 prefix-list Pv42 and and IPv6 prefix-list Pv62 on R3 to match BGP route Sv42, IPv6 route Sv62 with deny option"
+ )
+ input_dict_3 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv42": [
+ {"seqid": "1", "network": NETWORK2_1["ipv4"], "action": "deny"}
+ ]
+ },
+ "ipv6": {
+ "Pv62": [
+ {"seqid": "1", "network": NETWORK2_1["ipv6"], "action": "deny"}
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("verify IPv4 and IPv6 Prefix list got configured on R3")
+ input_dict = {"r3": {"prefix_lists": ["Pv42", "Pv62"]}}
+ result = verify_prefix_lists(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "Configure IPv4 and IPv6 route-map (RMv42 and RMv62 )matching prefix-list Pv42 and Pv62 with permit option "
+ )
+ input_dict_3 = {
+ "r3": {
+ "route_maps": {
+ "RMv42": [
+ {
+ "action": "permit",
+ "seq_id": "1",
+ "match": {"ipv4": {"prefix_lists": "Pv42"}},
+ },
+ ],
+ "RMv62": [
+ {
+ "action": "permit",
+ "seq_id": "1",
+ "match": {"ipv6": {"prefix_lists": "Pv62"}},
+ },
+ ],
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "Apply IPv4 and IPv6 route-map RMv4 and RMv6 with default-originate on R3 , for R3 to R2 peers and Apply IPv4 and IPv6 out route-map RMv42 and RMv62 on R3 , for R3 to R2 peers "
+ )
+ local_as = get_dut_as_number(tgen, "r3")
+ default_originate_config = {
+ "r3": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {
+ "unicast": {"default_originate": {"r2": {"route_map": "RM4"}}}
+ },
+ "ipv6": {
+ "unicast": {"default_originate": {"r2": {"route_map": "RM6"}}}
+ },
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ updated_topo = topo
+ updated_topo["routers"]["r0"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r0")
+ updated_topo["routers"]["r1"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r1")
+ updated_topo["routers"]["r2"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r2")
+ updated_topo["routers"]["r3"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r3")
+ updated_topo["routers"]["r4"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r4")
+
+ step(
+ "Apply IPv4 and IPv6 route-map RMv42 and RMv62 on R3 (OUT Direction), for R3 to R2 peers "
+ )
+ input_dict_4 = {
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "route_maps": [
+ {"name": "RMv42", "direction": "out"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "route_maps": [
+ {"name": "RMv62", "direction": "out"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, updated_topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ NOTE = """
+ After applying route-map on neighbor verify default BGP route IPv4 IPv6 route populated in R2 BGP and routing table , verify using "show ip bgp json" "show ipv6 bgp json" "show ip route json" "show ip route json"
+ Sv42 and Sv62 route should not be present on R2
+ """
+ step(NOTE)
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen, addr_type, "r2", static_routes_input, expected=False
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Static routes are not expected due to conditions \nError: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen, addr_type, "r2", static_routes_input, expected=False
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Static routes are not expected due to conditions\n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Change IPv4 prefix-list Pv42 and and IPv6 prefix-list Pv62 deny to permit")
+ input_dict_3 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv42": [
+ {
+ "seqid": "1",
+ "network": NETWORK2_1["ipv4"],
+ "action": "permit",
+ }
+ ]
+ },
+ "ipv6": {
+ "Pv62": [
+ {
+ "seqid": "1",
+ "network": NETWORK2_1["ipv6"],
+ "action": "permit",
+ }
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("verify IPv4 and IPv6 Prefix list got configured on R3")
+ input_dict = {"r3": {"prefix_lists": ["Pv42", "Pv62"]}}
+ result = verify_prefix_lists(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ NOTE = """Default BGP route and IPv4 ( Sv42) , IPv6 (Sv62) route populated in R2 BGP and routing table"""
+ step(NOTE)
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("IPv4 prefix-list Pv4 and and IPv6 prefix-list Pv6 permit to deny ")
+ input_dict_3 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv4": [
+ {"seqid": "1", "network": NETWORK1_1["ipv4"], "action": "deny"}
+ ]
+ },
+ "ipv6": {
+ "Pv6": [
+ {"seqid": "1", "network": NETWORK1_1["ipv6"], "action": "deny"}
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ NOTE = """
+ Verify default-originate route (IPv4 and IPv6 ) not present on R2
+ IPv4 ( Sv42) , IPv6 (Sv62) route populated in R2 BGP
+ """
+ step(NOTE)
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n default-route in FIB is not expected due to conditions \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R3[addr_type],
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n default-route in RIB is not expected due to conditions \n Error: {}".format(
+ tc_name, result
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+def test_verify_bgp_default_originate_route_map_in_IN_p1(request):
+ """Verify BGP default originate route-map with IN route-map"""
+ tgen = get_topogen()
+ global BGP_CONVERGENCE
+ global topo
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ check_router_status(tgen)
+ reset_config_on_routers(tgen)
+
+ if BGP_CONVERGENCE != True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ step("Configure IPv4 and IPv6 , EBGP neighbor between R1 and R2")
+ step("Configure IPv4 and IPv6 , IBGP neighbor between R1 and R0")
+ r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"]
+ r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"]
+ r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"]
+ r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"]
+ r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"]
+ input_dict = {
+ "r0": {
+ "bgp": {
+ "local_as": 1000,
+ }
+ },
+ "r1": {
+ "bgp": {
+ "local_as": 1000,
+ }
+ },
+ "r2": {
+ "bgp": {
+ "local_as": r2_local_as,
+ }
+ },
+ "r3": {
+ "bgp": {
+ "local_as": r3_local_as,
+ }
+ },
+ "r4": {
+ "bgp": {
+ "local_as": r4_local_as,
+ }
+ },
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+ try:
+ assert result is True
+ except AssertionError:
+ logger.info("Expected behaviour: {}".format(result))
+ logger.info("BGP config is not created because of invalid ASNs")
+ step("After changing the BGP AS Path Verify the BGP Convergence")
+
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format(
+ BGP_CONVERGENCE
+ )
+
+ step(
+ "Configure 2 IPv4 and 2 IPv6, Static route on R0 with next-hop as Null0 IPv4 route Sv41, Sv42, IPv6 route Sv61 Sv62"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r0": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+ result = create_static_routes(tgen, static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("verifyIPv4 and IPv6 static routes are configure and up on R0 ")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r0": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r0", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Configure redistribute static knob on R0 , for R0 to R1 IPv4 and IPv6 neighbor"
+ )
+ redistribute_static = {
+ "r0": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
+ "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, redistribute_static)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("Verify IPv4 and IPv6 route received on R1 ")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r1", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, "r1", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Configure IPv4 prefix-list Pv4 and and IPv6 prefix-list Pv6 on R1 to match BGP route Sv41, Sv42, IPv6 route Sv61 Sv62"
+ )
+ input_dict_3 = {
+ "r1": {
+ "prefix_lists": {
+ "ipv4": {
+ "Pv4": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv4"],
+ "action": "permit",
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv4"],
+ "action": "permit",
+ },
+ ]
+ },
+ "ipv6": {
+ "Pv6": [
+ {
+ "seqid": "1",
+ "network": NETWORK1_1["ipv6"],
+ "action": "permit",
+ },
+ {
+ "seqid": "2",
+ "network": NETWORK2_1["ipv6"],
+ "action": "permit",
+ },
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("verify IPv4 and IPv6 Prefix list got configured on R1")
+ input_dict = {"r1": {"prefix_lists": ["Pv4", "Pv6"]}}
+ result = verify_prefix_lists(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "Configure IPv4 and IPv6 route-map RMv4 and RMv6 matching prefix-list Pv4 and Pv6 with deny option on R1"
+ )
+ input_dict_3 = {
+ "r1": {
+ "route_maps": {
+ "RMv4": [
+ {
+ "action": "deny",
+ "seq_id": "1",
+ "match": {"ipv4": {"prefix_lists": "Pv4"}},
+ },
+ ],
+ "RMv6": [
+ {
+ "action": "deny",
+ "seq_id": "1",
+ "match": {"ipv6": {"prefix_lists": "Pv6"}},
+ },
+ ],
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("Apply route-map IN direction in R1 (R1 to R0) IPv4 and IPv6 neighbor")
+ updated_topo = topo
+ updated_topo["routers"]["r0"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r0")
+ updated_topo["routers"]["r1"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r1")
+ updated_topo["routers"]["r2"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r2")
+ updated_topo["routers"]["r3"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r3")
+ updated_topo["routers"]["r4"]["bgp"]["local_as"] = get_dut_as_number(tgen, "r4")
+
+ local_as_r1 = get_dut_as_number(tgen, dut="r1")
+ input_dict_4 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r0": {
+ "dest_link": {
+ "r1": {
+ "route_maps": [
+ {
+ "name": "RMv4",
+ "direction": "in",
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r0": {
+ "dest_link": {
+ "r1": {
+ "route_maps": [
+ {
+ "name": "RMv6",
+ "direction": "in",
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, updated_topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ STEP = "After applying route-map verify that IPv4 route Sv41, Sv42, IPv6 route Sv61 Sv62 should not present on R1 BGP and routing table "
+ step(STEP)
+
+ step(
+ "After applying route-map verify that IPv4 route Sv41, Sv42, IPv6 route Sv61 Sv62 should not present on R1 "
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+
+ result = verify_fib_routes(
+ tgen, addr_type, "r1", static_routes_input, expected=False
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n default-route in FIB is not expected due to conditions \nError: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(
+ tgen, addr_type, "r1", static_routes_input, expected=False
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n default-route in FIB is not expected due to conditions \nError: {}".format(
+ tc_name, result
+ )
+ # Routes should come to dut but not the shown in RIB thus verifying using show ip bgp nbr xxx received route
+ step(
+ " Verify the received routes \n using 'show ip bgp nbr xxx received route' in Router R1"
+ )
+ expected_routes = {
+ "ipv4": [
+ {"network": NETWORK1_1["ipv4"], "nexthop": NEXT_HOP_IP["ipv4"]},
+ {"network": NETWORK2_1["ipv4"], "nexthop": NEXT_HOP_IP["ipv4"]},
+ ],
+ "ipv6": [
+ {"network": NETWORK1_1["ipv6"], "nexthop": NEXT_HOP_IP["ipv6"]},
+ {"network": NETWORK2_1["ipv6"], "nexthop": NEXT_HOP_IP["ipv6"]},
+ ],
+ }
+ result = verify_bgp_received_routes_from_neighbor(
+ tgen, topo, dut="r1", peer="r0", expected_routes=expected_routes
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("Configure default-originate on R1 for R1 to R2 IPv4 and IPv6 neighbor ")
+ local_as_r1 = get_dut_as_number(tgen, dut="r1")
+ default_originate_config = {
+ "r1": {
+ "bgp": {
+ "local_as": local_as_r1,
+ "address_family": {
+ "ipv4": {"unicast": {"default_originate": {"r2": {}}}},
+ "ipv6": {"unicast": {"default_originate": {"r2": {}}}},
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "Verify Default originate knob is configured and default route advertised to R2 , verify on R1 "
+ )
+ expected_routes = {
+ "ipv4": [
+ {"network": "0.0.0.0/0", "nexthop": ""},
+ ],
+ "ipv6": [
+ {"network": "::/0", "nexthop": ""},
+ ],
+ }
+ result = verify_bgp_advertised_routes_from_neighbor(
+ tgen, topo, dut="r1", peer="r2", expected_routes=expected_routes
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("Verify the Default route Route in FIB in R2")
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ }
+ ]
+ }
+ }
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Change route-map RMv4 and RMv6 from deny to permit")
+ input_dict_3 = {
+ "r1": {
+ "route_maps": {
+ "RMv4": [
+ {
+ "action": "permit",
+ "seq_id": "1",
+ "match": {"ipv4": {"prefix_lists": "Pv4"}},
+ },
+ ],
+ "RMv6": [
+ {
+ "action": "permit",
+ "seq_id": "1",
+ "match": {"ipv6": {"prefix_lists": "Pv6"}},
+ },
+ ],
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ NOTE = """After changing route-map to permit verify that IPv4 routes Sv41, Sv42, IPv6 routes Sv61 Sv62 present on R1 BGP and routing table , using "show ip route " "show ip bgp nbr xxx received route " "show ipv6 route " "show ipv6 bgp nbr xxx receied route """
+ step(NOTE)
+ expected_routes = {
+ "ipv4": [{"network": NETWORK1_1["ipv4"], "nexthop": NEXT_HOP_IP["ipv4"]}],
+ "ipv6": [{"network": NETWORK1_1["ipv6"], "nexthop": NEXT_HOP_IP["ipv4"]}],
+ }
+ result = verify_bgp_received_routes_from_neighbor(
+ tgen, topo, dut="r1", peer="r0", expected_routes=expected_routes
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ ]
+ }
+ }
+ result = verify_bgp_rib(tgen, addr_type, "r1", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ result = verify_fib_routes(tgen, addr_type, "r1", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Configure default static route (IPv4 and IPv6) on R2 nexthop as R1 ")
+ NEXT_HOP_IP_R1 = {}
+ r1_r2_ipv4_neighbor = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0]
+ r1_r2_ipv6_neighbor = topo["routers"]["r1"]["links"]["r2"]["ipv6"].split("/")[0]
+ NEXT_HOP_IP_R1["ipv4"] = r1_r2_ipv4_neighbor
+ NEXT_HOP_IP_R1["ipv6"] = r1_r2_ipv6_neighbor
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": "0.0.0.0/0",
+ "next_hop": NEXT_HOP_IP_R1["ipv4"],
+ },
+ {
+ "network": "0::0/0",
+ "next_hop": NEXT_HOP_IP_R1["ipv6"],
+ },
+ ]
+ }
+ }
+ result = create_static_routes(tgen, static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "Verify Static default route is taking preference over BGP default routes , BGP default route is inactive IN RIB and static is up and installed in RIB and FIB "
+ )
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0]
+ ipv6_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv6"].split("/")[0]
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ DEFAULT_ROUTE_NXT_HOP = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_nxt_hop}
+
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP[addr_type],
+ "protocol": "static",
+ }
+ ]
+ }
+ }
+ result = verify_bgp_rib(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ result = verify_fib_routes(
+ tgen,
+ addr_type,
+ "r2",
+ static_routes_input,
+ next_hop=DEFAULT_ROUTE_NXT_HOP[addr_type],
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ write_test_footer(tc_name)
+
+def test_verify_default_originate_after_removing_default_originate_p1(request):
+ """Verify BGP default route after removing default-originate"""
+
+ tgen = get_topogen()
+ global BGP_CONVERGENCE
+ global topo
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ check_router_status(tgen)
+ reset_config_on_routers(tgen)
+
+ if BGP_CONVERGENCE != True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ step("Configure EBGP between R0 to R1 and IBGP between R1 to R2")
+ step("Configure EBGP between R2 to R3 and IBGP between R3 to R4")
+ r0_local_as = topo["routers"]["r0"]["bgp"]["local_as"]
+ r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"]
+ r2_local_as = topo["routers"]["r2"]["bgp"]["local_as"]
+ r3_local_as = topo["routers"]["r3"]["bgp"]["local_as"]
+ r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"]
+ input_dict = {
+ "r0": {
+ "bgp": {
+ "local_as": r0_local_as,
+ }
+ },
+ "r1": {
+ "bgp": {
+ "local_as": 2000,
+ }
+ },
+ "r2": {
+ "bgp": {
+ "local_as": 2000,
+ }
+ },
+ "r3": {
+ "bgp": {
+ "local_as": 5000,
+ }
+ },
+ "r4": {
+ "bgp": {
+ "local_as": 5000,
+ }
+ },
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+ try:
+ assert result is True
+ except AssertionError:
+ logger.info("Expected behaviour: {}".format(result))
+ logger.info("BGP config is not created because of invalid ASNs")
+ step("After changing the BGP AS Path Verify the BGP Convergence")
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format(
+ BGP_CONVERGENCE
+ )
+
+ step("Configure IPv4 and IPv6 static route on R0 and R4")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r0": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ }
+ ]
+ },
+ "r4": {
+ "static_routes": [
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ }
+ ]
+ },
+ }
+ result = create_static_routes(tgen, static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("verify IPv4 and IPv6 static route are configured and up on R0 and R4")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r0": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ }
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r0", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r4": {
+ "static_routes": [
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ }
+ ]
+ }
+ }
+ result = verify_fib_routes(tgen, addr_type, "r4", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ step(
+ "Configure redistribute connected and static on R0 (R0-R1) on R4 ( R4-R3) IPv4 and IPv6 address family "
+ )
+ redistribute_static = {
+ "r0": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ },
+ }
+ }
+ },
+ "r4": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ },
+ }
+ }
+ },
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {"redistribute": [{"redist_type": "connected"}]}
+ },
+ "ipv6": {
+ "unicast": {"redistribute": [{"redist_type": "connected"}]}
+ },
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {"redistribute": [{"redist_type": "connected"}]}
+ },
+ "ipv6": {
+ "unicast": {"redistribute": [{"redist_type": "connected"}]}
+ },
+ }
+ }
+ },
+ }
+ result = create_router_bgp(tgen, topo, redistribute_static)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("verify IPv4 and IPv6 static route are configured and up on R1 and R3")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [R0_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R0_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R0_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R0_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ ]
+ }
+ }
+
+ result = verify_fib_routes(tgen, addr_type, "r1", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ result = verify_bgp_rib(tgen, addr_type, "r1", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("verify IPv4 and IPv6 static route are configured and up on R1 and R3")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r3": {
+ "static_routes": [
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [R3_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R3_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R3_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R3_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R4_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R4_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R4_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R4_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ ]
+ }
+ }
+
+ result = verify_fib_routes(tgen, addr_type, "r3", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, "r3", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Configure default-originate on R1 for R1 to R2 neighbor for IPv4 and IPv6 peer "
+ )
+ local_as = get_dut_as_number(tgen, dut="r1")
+ default_originate_config = {
+ "r1": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {"unicast": {"default_originate": {"r2": {}}}},
+ "ipv6": {"unicast": {"default_originate": {"r2": {}}}},
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+ step(
+ "Verify all the static , connected and loopback routes from R0,R1,R3 and R4 is receieved on R2 "
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [R0_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R0_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R0_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R0_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R3_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R3_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R3_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R3_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R4_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R4_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R4_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R4_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ ]
+ }
+ }
+
+ result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Verify the Default Originate on R2 nexthop as R1")
+
+ interface = topo["routers"]["r1"]["links"]["r2"]["interface"]
+ ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r1", intf=interface)
+ ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0]
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ DEFAULT_ROUTE_NXT_HOP = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local}
+
+ result = verify_rib_default_route(
+ tgen,
+ topo,
+ dut="r2",
+ routes=DEFAULT_ROUTES,
+ expected_nexthop=DEFAULT_ROUTE_NXT_HOP,
+ expected=True,
+ )
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n Error: After Deactivating the BGP neighbor the default route is expected but found in RIB -> {}".format(
+ tc_name, result
+ )
+
+ result = verify_fib_default_route(
+ tgen,
+ topo,
+ dut="r2",
+ routes=DEFAULT_ROUTES,
+ expected_nexthop=DEFAULT_ROUTE_NXT_HOP,
+ expected=True,
+ )
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n Error: After Deactivating the BGP neighbor the default route is expected but found in FIB -> {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Configure default-originate on R3 for R3 to R2 neighbor for IPv4 and IPv6 peer "
+ )
+ local_as = get_dut_as_number(tgen, dut="r3")
+ default_originate_config = {
+ "r3": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {"unicast": {"default_originate": {"r2": {}}}},
+ "ipv6": {"unicast": {"default_originate": {"r2": {}}}},
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+ STEP = """After configuring the Default Originate From R3 --> R2
+ Both Default routes from R1 and R3 Should present in R2 BGP RIB
+ The Deafult Route from iBGP is prefferedover EBGP thus
+ Default Route From R1->r2 should only present in R2 FIB """
+ step(STEP)
+
+ interface = topo["routers"]["r3"]["links"]["r2"]["interface"]
+ ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r3", intf=interface)
+ ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv4"].split("/")[0]
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ DEFAULT_ROUTE_NXT_HOP = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local}
+ result = verify_fib_default_route(
+ tgen,
+ topo,
+ dut="r2",
+ routes=DEFAULT_ROUTES,
+ expected_nexthop=DEFAULT_ROUTE_NXT_HOP,
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Error: Only IBGP default originate is expected in FIB over EBGP {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib_default_route(
+ tgen,
+ topo,
+ dut="r2",
+ routes=DEFAULT_ROUTES,
+ expected_nexthop=DEFAULT_ROUTE_NXT_HOP,
+ expected=True,
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "No change on static and connected routes which got advertised from R0, R1, R3 and R4"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [R0_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R0_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R0_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R0_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R3_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R3_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R3_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R3_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R4_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R4_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R4_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R4_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ ]
+ }
+ }
+
+ result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ " Remove default-originate on R1 for R1 to R2 neighbor for IPv4 and IPv6 peer "
+ )
+ local_as = get_dut_as_number(tgen, dut="r1")
+ default_originate_config = {
+ "r1": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {
+ "unicast": {"default_originate": {"r2": {"delete": True}}}
+ },
+ "ipv6": {
+ "unicast": {"default_originate": {"r2": {"delete": True}}}
+ },
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("Verify the Default Originate reoute from R1 to r2 is removed in R2 ")
+ interface = topo["routers"]["r1"]["links"]["r2"]["interface"]
+ ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r1", intf=interface)
+ ipv4_nxt_hop = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0]
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ DEFAULT_ROUTE_NXT_HOP = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local}
+ result = verify_fib_default_route(
+ tgen,
+ topo,
+ dut="r2",
+ routes=DEFAULT_ROUTES,
+ expected_nexthop=DEFAULT_ROUTE_NXT_HOP,
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n After removing the default originate the route should not be present in FIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib_default_route(
+ tgen,
+ topo,
+ dut="r2",
+ routes=DEFAULT_ROUTES,
+ expected_nexthop=DEFAULT_ROUTE_NXT_HOP,
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n After removing the default originate the route should not be present in RIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ NOTE = """ after removing the Default originate from R1-->R2
+ Verify the BGP Default route received from R3 is present in both BGP RIB and FIB on R2
+ """
+ interface = topo["routers"]["r3"]["links"]["r2"]["interface"]
+ ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r3", intf=interface)
+ ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv4"].split("/")[0]
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ DEFAULT_ROUTE_NXT_HOP = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local}
+ result = verify_fib_default_route(
+ tgen,
+ topo,
+ dut="r2",
+ routes=DEFAULT_ROUTES,
+ expected_nexthop=DEFAULT_ROUTE_NXT_HOP,
+ expected=True,
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ result = verify_rib_default_route(
+ tgen,
+ topo,
+ dut="r2",
+ routes=DEFAULT_ROUTES,
+ expected_nexthop=DEFAULT_ROUTE_NXT_HOP,
+ expected=True,
+ )
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "No change on static and connected routes which got advertised from R0, R1, R3 and R4"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [R0_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R0_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R0_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R0_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R3_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R3_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R3_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R3_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R4_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R4_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R4_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R4_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ ]
+ }
+ }
+
+ result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Remove default-originate on R3 for R3 to R2 neighbor for IPv4 and IPv6 peer "
+ )
+ local_as = get_dut_as_number(tgen, dut="r3")
+ default_originate_config = {
+ "r3": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {
+ "unicast": {"default_originate": {"r2": {"delete": True}}}
+ },
+ "ipv6": {
+ "unicast": {"default_originate": {"r2": {"delete": True}}}
+ },
+ },
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "After removing default originate , verify default IPv4 and IPv6 BGP routes removed on R2 from R1 ( next-hop as R3) "
+ )
+ interface = topo["routers"]["r3"]["links"]["r2"]["interface"]
+ ipv6_link_local = get_frr_ipv6_linklocal(tgen, "r3", intf=interface)
+ ipv4_nxt_hop = topo["routers"]["r3"]["links"]["r2"]["ipv4"].split("/")[0]
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ DEFAULT_ROUTE_NXT_HOP = {"ipv4": ipv4_nxt_hop, "ipv6": ipv6_link_local}
+ result = verify_fib_default_route(
+ tgen,
+ topo,
+ dut="r2",
+ routes=DEFAULT_ROUTES,
+ expected_nexthop=DEFAULT_ROUTE_NXT_HOP,
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n After removing the default originate the route should not be present in FIB \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib_default_route(
+ tgen,
+ topo,
+ dut="r2",
+ routes=DEFAULT_ROUTES,
+ expected_nexthop=DEFAULT_ROUTE_NXT_HOP,
+ expected=False,
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n After removing the default originate the route should not be present in RIB \n Error: {}".format(
+ tc_name, result
+ )
+ step(
+ "No change on static and connected routes which got advertised from R0, R1, R3 and R4"
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [NETWORK2_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ },
+ {
+ "network": [R0_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R0_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R0_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R0_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R1_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R1_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R1_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R3_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R3_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R3_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R3_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ {
+ "network": [R4_NETWORK_LOOPBACK[addr_type]],
+ "next_hop": R4_NETWORK_LOOPBACK_NXTHOP[addr_type],
+ },
+ {
+ "network": [R4_NETWORK_CONNECTED[addr_type]],
+ "next_hop": R4_NETWORK_CONNECTED_NXTHOP[addr_type],
+ },
+ ]
+ }
+ }
+
+ result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ write_test_footer(tc_name)
+
+def test_verify_default_originate_route_with_GR_p1(request):
+ """ "Verify default-originate route with GR "
+ """
+ tgen = get_topogen()
+ global BGP_CONVERGENCE
+ global topo
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ check_router_status(tgen)
+ reset_config_on_routers(tgen)
+
+ if BGP_CONVERGENCE != True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+
+ step("Configure IPV4 and IPV6 IBGP between R1 and R2 ")
+ step("Configure IPV4 and IPV6 EBGP between R2 to R3 ")
+ r0_local_as = topo['routers']['r0']['bgp']['local_as']
+ r1_local_as = topo['routers']['r1']['bgp']['local_as']
+ r2_local_as = topo['routers']['r2']['bgp']['local_as']
+ r3_local_as = topo['routers']['r3']['bgp']['local_as']
+ r4_local_as = topo['routers']['r4']['bgp']['local_as']
+ input_dict = {
+ "r0": {
+ "bgp": {
+ "local_as": r0_local_as,
+ }
+ },
+ "r1": {
+ "bgp": {
+ "local_as": 1000,
+ }
+ },
+ "r2": {
+ "bgp": {
+ "local_as": 1000,
+ }
+ },
+ "r3": {
+ "bgp": {
+ "local_as": r3_local_as,
+ }
+ },
+ "r4": {
+ "bgp": {
+ "local_as": r4_local_as,
+ }
+ },
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+ try:
+ assert result is True
+ except AssertionError:
+ logger.info("Expected behaviour: {}".format(result))
+ logger.info("BGP config is not created because of invalid ASNs")
+ step("After changing the BGP AS Path Verify the BGP Convergence")
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format(
+ BGP_CONVERGENCE
+ )
+
+ step(
+ "Configure per peer Graceful restart on R2 ( restarting router) and R3 helper router "
+ )
+ input_dict = {
+ "r2": {
+ "bgp": {
+ "local_as": get_dut_as_number(tgen, "r2"),
+ "graceful-restart": {
+ "graceful-restart": True,
+ "preserve-fw-state": True,
+ },
+ }
+ },
+ "r3": {
+ "bgp": {
+ "local_as": get_dut_as_number(tgen, "r3"),
+ "graceful-restart": {"graceful-restart-helper": True},
+ }
+ },
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r2", peer="r3")
+
+ step("verify Graceful restart at R2")
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r2", peer="r3"
+ )
+ assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+
+ step(
+ "Configure default-originate on R1 for R1-R2 neighbor for IPv4 and IPv6 BGP peers "
+ )
+ local_as = get_dut_as_number(tgen, dut="r1")
+ default_originate_config = {
+ "r1": {
+ "bgp": {
+ "local_as": local_as,
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "default_originate":{
+ "r2":{
+
+ }
+
+ }
+
+ }
+ }, "ipv6": {
+ "unicast": {
+ "default_originate":{
+ "r2":{
+
+ }
+
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, default_originate_config)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ step(
+ "R2 received default-originate routes and advertised it to R3 , verify on R2 and R3"
+ )
+ DEFAULT_ROUTES = {"ipv4": "0.0.0.0/0", "ipv6": "0::0/0"}
+ step(
+ "After configuring default-originate command , verify default routes are advertised on R2 "
+ )
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input,next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type])
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input,next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type])
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+
+ step(" Kill BGPd session on R2")
+ kill_router_daemons(tgen, "r2", ["bgpd"])
+ start_router_daemons(tgen, "r2", ["bgpd"])
+
+ step("verify default route is relearned after clear bgp on R2 on BGP RIB and")
+ for addr_type in ADDR_TYPES:
+ static_routes_input = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": [DEFAULT_ROUTES[addr_type]],
+ "next_hop": DEFAULT_ROUTE_NXT_HOP_R1[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = verify_fib_routes(tgen, addr_type, "r2", static_routes_input,next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type])
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, "r2", static_routes_input,next_hop=DEFAULT_ROUTE_NXT_HOP_R1[addr_type])
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ write_test_footer(tc_name)
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
diff --git a/tests/topotests/ldp_topo1/r1/show_ip_ospf_neighbor.json b/tests/topotests/ldp_topo1/r1/show_ip_ospf_neighbor.json
new file mode 100644
index 000000000..d9192f110
--- /dev/null
+++ b/tests/topotests/ldp_topo1/r1/show_ip_ospf_neighbor.json
@@ -0,0 +1,14 @@
+{
+ "neighbors": {
+ "2.2.2.2": [
+ {
+ "dbSummaryCounter": 0,
+ "retransmitCounter": 0,
+ "priority": 1,
+ "converged": "Full",
+ "address": "10.0.1.2",
+ "requestCounter": 0
+ }
+ ]
+ }
+}
diff --git a/tests/topotests/ldp_topo1/r2/show_ip_ospf_neighbor.json b/tests/topotests/ldp_topo1/r2/show_ip_ospf_neighbor.json
new file mode 100644
index 000000000..ea78592bd
--- /dev/null
+++ b/tests/topotests/ldp_topo1/r2/show_ip_ospf_neighbor.json
@@ -0,0 +1,42 @@
+{
+ "neighbors": {
+ "1.1.1.1": [
+ {
+ "dbSummaryCounter": 0,
+ "retransmitCounter": 0,
+ "priority": 1,
+ "converged": "Full",
+ "address": "10.0.1.1",
+ "requestCounter": 0
+ }
+ ],
+ "3.3.3.3": [
+ {
+ "dbSummaryCounter": 0,
+ "retransmitCounter": 0,
+ "priority": 1,
+ "converged": "Full",
+ "address": "10.0.2.3",
+ "requestCounter": 0
+ },
+ {
+ "dbSummaryCounter": 0,
+ "retransmitCounter": 0,
+ "priority": 1,
+ "converged": "Full",
+ "address": "10.0.3.3",
+ "requestCounter": 0
+ }
+ ],
+ "4.4.4.4": [
+ {
+ "dbSummaryCounter": 0,
+ "retransmitCounter": 0,
+ "priority": 1,
+ "converged": "Full",
+ "address": "10.0.2.4",
+ "requestCounter": 0
+ }
+ ]
+ }
+}
diff --git a/tests/topotests/ldp_topo1/r3/show_ip_ospf_neighbor.json b/tests/topotests/ldp_topo1/r3/show_ip_ospf_neighbor.json
new file mode 100644
index 000000000..d3c50247e
--- /dev/null
+++ b/tests/topotests/ldp_topo1/r3/show_ip_ospf_neighbor.json
@@ -0,0 +1,32 @@
+{
+ "neighbors": {
+ "2.2.2.2": [
+ {
+ "dbSummaryCounter": 0,
+ "retransmitCounter": 0,
+ "priority": 1,
+ "converged": "Full",
+ "address": "10.0.2.2",
+ "requestCounter": 0
+ },
+ {
+ "dbSummaryCounter": 0,
+ "retransmitCounter": 0,
+ "priority": 1,
+ "converged": "Full",
+ "address": "10.0.3.2",
+ "requestCounter": 0
+ }
+ ],
+ "4.4.4.4": [
+ {
+ "dbSummaryCounter": 0,
+ "retransmitCounter": 0,
+ "priority": 1,
+ "converged": "Full",
+ "address": "10.0.2.4",
+ "requestCounter": 0
+ }
+ ]
+ }
+}
diff --git a/tests/topotests/ldp_topo1/r4/show_ip_ospf_neighbor.json b/tests/topotests/ldp_topo1/r4/show_ip_ospf_neighbor.json
new file mode 100644
index 000000000..20751a288
--- /dev/null
+++ b/tests/topotests/ldp_topo1/r4/show_ip_ospf_neighbor.json
@@ -0,0 +1,24 @@
+{
+ "neighbors": {
+ "2.2.2.2": [
+ {
+ "dbSummaryCounter": 0,
+ "retransmitCounter": 0,
+ "priority": 1,
+ "converged": "Full",
+ "address": "10.0.2.2",
+ "requestCounter": 0
+ }
+ ],
+ "3.3.3.3": [
+ {
+ "dbSummaryCounter": 0,
+ "retransmitCounter": 0,
+ "priority": 1,
+ "converged": "Full",
+ "address": "10.0.2.3",
+ "requestCounter": 0
+ }
+ ]
+ }
+}
diff --git a/tests/topotests/ldp_topo1/test_ldp_topo1.py b/tests/topotests/ldp_topo1/test_ldp_topo1.py
index 8d6978723..cb8adfb55 100644
--- a/tests/topotests/ldp_topo1/test_ldp_topo1.py
+++ b/tests/topotests/ldp_topo1/test_ldp_topo1.py
@@ -63,9 +63,15 @@ import os
import re
import sys
import pytest
+import json
+from functools import partial
from time import sleep
from lib.topolog import logger
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lib import topotest
from lib.topogen import Topogen, get_topogen
@@ -104,6 +110,29 @@ def build_topo(tgen):
#####################################################
##
+## Helper functions
+##
+#####################################################
+
+
+def router_compare_json_output(rname, command, reference, count=60, wait=1):
+ "Compare router JSON output"
+
+ logger.info('Comparing router "%s" "%s" output', rname, command)
+
+ tgen = get_topogen()
+ filename = "{}/{}/{}".format(CWD, rname, reference)
+ expected = json.loads(open(filename).read())
+
+ # Run test function until we get an result.
+ test_func = partial(topotest.router_json_cmp, tgen.gears[rname], command, expected)
+ _, diff = topotest.run_and_expect(test_func, None, count, wait)
+ assertmsg = '"{}" JSON output mismatches the expected result'.format(rname)
+ assert diff is None, assertmsg
+
+
+#####################################################
+##
## Tests starting
##
#####################################################
@@ -218,6 +247,19 @@ def test_mpls_interfaces():
assert fatal_error == "", fatal_error
+def test_ospf_convergence():
+ logger.info("Test: check OSPF adjacencies")
+
+ # Skip if previous fatal error condition is raised
+ if fatal_error != "":
+ pytest.skip(fatal_error)
+
+ for rname in ["r1", "r2", "r3", "r4"]:
+ router_compare_json_output(
+ rname, "show ip ospf neighbor json", "show_ip_ospf_neighbor.json"
+ )
+
+
def test_mpls_ldp_neighbor_establish():
global fatal_error
net = get_topogen().net
@@ -510,9 +552,10 @@ def test_mpls_ldp_binding():
else:
print("r%s ok" % i)
- assert (
- failures == 0
- ), "MPLS LDP Interface binding output for router r%s:\n%s" % (i, diff)
+ assert failures == 0, "MPLS LDP binding output for router r%s:\n%s" % (
+ i,
+ diff,
+ )
# Make sure that all daemons are running
for i in range(1, 5):
diff --git a/tests/topotests/ldp_vpls_topo1/r1/ldpd.conf b/tests/topotests/ldp_vpls_topo1/r1/ldpd.conf
index 594ec5a58..a19e5ccac 100644
--- a/tests/topotests/ldp_vpls_topo1/r1/ldpd.conf
+++ b/tests/topotests/ldp_vpls_topo1/r1/ldpd.conf
@@ -14,6 +14,7 @@ mpls ldp
!
address-family ipv4
discovery transport-address 1.1.1.1
+ ttl-security disable
label local allocate host-routes
!
interface r1-eth1
diff --git a/tests/topotests/ldp_vpls_topo1/r2/ldpd.conf b/tests/topotests/ldp_vpls_topo1/r2/ldpd.conf
index ffb4f0974..447b3f140 100644
--- a/tests/topotests/ldp_vpls_topo1/r2/ldpd.conf
+++ b/tests/topotests/ldp_vpls_topo1/r2/ldpd.conf
@@ -14,6 +14,7 @@ mpls ldp
!
address-family ipv4
discovery transport-address 2.2.2.2
+ ttl-security disable
label local allocate host-routes
!
interface r2-eth1
diff --git a/tests/topotests/ldp_vpls_topo1/r3/ldpd.conf b/tests/topotests/ldp_vpls_topo1/r3/ldpd.conf
index c95471ffd..ab5147149 100644
--- a/tests/topotests/ldp_vpls_topo1/r3/ldpd.conf
+++ b/tests/topotests/ldp_vpls_topo1/r3/ldpd.conf
@@ -14,6 +14,7 @@ mpls ldp
!
address-family ipv4
discovery transport-address 3.3.3.3
+ ttl-security disable
label local allocate host-routes
!
interface r3-eth1
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index 4dd44e3e9..216756f51 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -29,6 +29,7 @@ from lib.common_config import (
create_common_configurations,
FRRCFG_FILE,
InvalidCLIError,
+ apply_raw_config,
check_address_types,
find_interface_with_greater_ip,
generate_ips,
@@ -74,6 +75,12 @@ def create_router_bgp(tgen, topo=None, input_dict=None, build=False, load_config
"address_family": {
"ipv4": {
"unicast": {
+ "default_originate":{
+ "neighbor":"R2",
+ "add_type":"lo"
+ "route_map":"rm"
+
+ },
"redistribute": [{
"redist_type": "static",
"attribute": {
@@ -498,6 +505,12 @@ def __create_bgp_unicast_neighbor(
topo, input_dict, router, addr_type, add_neigh
)
config_data.extend(neigh_data)
+ # configure default originate
+ if "default_originate" in addr_data:
+ default_originate_config = __create_bgp_default_originate_neighbor(
+ topo, input_dict, router, addr_type, add_neigh
+ )
+ config_data.extend(default_originate_config)
for addr_type, addr_dict in bgp_data.items():
if not addr_dict or not check_address_types(addr_type):
@@ -515,6 +528,78 @@ def __create_bgp_unicast_neighbor(
return config_data
+def __create_bgp_default_originate_neighbor(
+ topo, input_dict, router, addr_type, add_neigh=True
+):
+ """
+ Helper API to create neighbor default - originate configuration
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `router` : router id to be configured
+ """
+ tgen = get_topogen()
+ config_data = []
+ logger.debug("Entering lib API: __create_bgp_default_originate_neighbor()")
+
+ bgp_data = input_dict["address_family"]
+ neigh_data = bgp_data[addr_type]["unicast"]["default_originate"]
+ for name, peer_dict in neigh_data.items():
+ nh_details = topo[name]
+
+ neighbor_ip = None
+ if "dest-link" in neigh_data[name]:
+ dest_link = neigh_data[name]["dest-link"]
+ neighbor_ip = nh_details["links"][dest_link][addr_type].split("/")[0]
+ elif "add_type" in neigh_data[name]:
+ add_type = neigh_data[name]["add_type"]
+ neighbor_ip = nh_details["links"][add_type][addr_type].split("/")[0]
+ else:
+ neighbor_ip = nh_details["links"][router][addr_type].split("/")[0]
+
+ config_data.append("address-family {} unicast".format(addr_type))
+ if "route_map" in peer_dict:
+ route_map = peer_dict["route_map"]
+ if "delete" in peer_dict:
+ if peer_dict["delete"]:
+ config_data.append(
+ "no neighbor {} default-originate route-map {}".format(
+ neighbor_ip, route_map
+ )
+ )
+ else:
+ config_data.append(
+ " neighbor {} default-originate route-map {}".format(
+ neighbor_ip, route_map
+ )
+ )
+ else:
+ config_data.append(
+ " neighbor {} default-originate route-map {}".format(
+ neighbor_ip, route_map
+ )
+ )
+
+ else:
+ if "delete" in peer_dict:
+ if peer_dict["delete"]:
+ config_data.append(
+ "no neighbor {} default-originate".format(neighbor_ip)
+ )
+ else:
+ config_data.append(
+ "neighbor {} default-originate".format(neighbor_ip)
+ )
+ else:
+ config_data.append("neighbor {} default-originate".format(neighbor_ip))
+
+ logger.debug("Exiting lib API: __create_bgp_default_originate_neighbor()")
+ return config_data
+
+
def __create_l2vpn_evpn_address_family(
tgen, topo, input_dict, router, config_data=None
):
@@ -4574,3 +4659,876 @@ def verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss, vrf=None):
return "TCP-MSS Mismatch"
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return False
+
+
+def get_dut_as_number(tgen, dut):
+ """
+ API to get the Autonomous Number of the given DUT
+
+ params:
+ =======
+ dut : Device Under test
+
+ returns :
+ =======
+ Success : DUT Autonomous number
+ Fail : Error message with Boolean False
+ """
+ tgen = get_topogen()
+ for router, rnode in tgen.routers().items():
+ if router == dut:
+ show_bgp_json = run_frr_cmd(rnode, "sh ip bgp summary json ", isjson=True)
+ as_number = show_bgp_json["ipv4Unicast"]["as"]
+ if as_number:
+ logger.info(
+ "[dut {}] DUT contains Automnomous number :: {} ".format(
+ dut, as_number
+ )
+ )
+ return as_number
+ else:
+ logger.error(
+ "[dut {}] ERROR....! DUT doesnot contain any Automnomous number ".format(
+ dut
+ )
+ )
+ return False
+
+
+def get_prefix_count_route(
+ tgen, topo, dut, peer, vrf=None, link=None, sent=None, received=None
+):
+ """
+ API to return the prefix count of default originate the given DUT
+ dut : Device under test
+ peer : neigbor on which you are expecting the route to be received
+
+ returns :
+ prefix_count as dict with ipv4 and ipv6 value
+ """
+ # the neighbor IP address can be accessable by finding the neigborship (vice-versa)
+
+ if link:
+ neighbor_ipv4_address = topo["routers"][peer]["links"][link]["ipv4"]
+ neighbor_ipv6_address = topo["routers"][peer]["links"][link]["ipv6"]
+ else:
+ neighbor_ipv4_address = topo["routers"][peer]["links"][dut]["ipv4"]
+ neighbor_ipv6_address = topo["routers"][peer]["links"][dut]["ipv6"]
+
+ neighbor_ipv4_address = neighbor_ipv4_address.split("/")[0]
+ neighbor_ipv6_address = neighbor_ipv6_address.split("/")[0]
+ prefix_count = {}
+ tgen = get_topogen()
+ for router, rnode in tgen.routers().items():
+ if router == dut:
+
+ if vrf:
+ ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf)
+ show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True)
+ ipv6_cmd = "sh ip bgp vrf {} ipv6 unicast summary json".format(vrf)
+ show_bgp_json_ipv6 = run_frr_cmd(rnode, ipv6_cmd, isjson=True)
+
+ prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"]["peers"][
+ neighbor_ipv4_address
+ ]["pfxRcd"]
+ prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
+ neighbor_ipv6_address
+ ]["pfxRcd"]
+
+ logger.info(
+ "The Prefix Count of the [DUT:{} : vrf [{}] ] towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
+ dut,
+ vrf,
+ neighbor_ipv4_address,
+ neighbor_ipv6_address,
+ prefix_count,
+ )
+ )
+ return prefix_count
+
+ else:
+ show_bgp_json_ipv4 = run_frr_cmd(
+ rnode, "sh ip bgp summary json ", isjson=True
+ )
+ show_bgp_json_ipv6 = run_frr_cmd(
+ rnode, "sh ip bgp ipv6 unicast summary json ", isjson=True
+ )
+ if received:
+ prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][
+ "peers"
+ ][neighbor_ipv4_address]["pfxRcd"]
+ prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
+ neighbor_ipv6_address
+ ]["pfxRcd"]
+
+ elif sent:
+ prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][
+ "peers"
+ ][neighbor_ipv4_address]["pfxSnt"]
+ prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
+ neighbor_ipv6_address
+ ]["pfxSnt"]
+
+ else:
+ prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][
+ "peers"
+ ][neighbor_ipv4_address]["pfxRcd"]
+ prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
+ neighbor_ipv6_address
+ ]["pfxRcd"]
+
+ logger.info(
+ "The Prefix Count of the DUT:{} towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
+ dut, neighbor_ipv4_address, neighbor_ipv6_address, prefix_count
+ )
+ )
+ return prefix_count
+ else:
+ logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
+
+
+@retry(retry_timeout=5)
+def verify_rib_default_route(
+ tgen,
+ topo,
+ dut,
+ routes,
+ expected_nexthop,
+ metric=None,
+ origin=None,
+ locPrf=None,
+ expected_aspath=None,
+):
+ """
+ API to verify the the 'Default route" in BGP RIB with the attributes the rout carries (metric , local preference, )
+
+ param
+ =====
+ dut : device under test
+ routes : default route with expected nexthop
+ expected_nexthop : the nexthop that is expected the deafult route
+
+ """
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ tgen = get_topogen()
+ connected_routes = {}
+ for router, rnode in tgen.routers().items():
+ if router == dut:
+
+ ipv4_routes = run_frr_cmd(rnode, "sh ip bgp json", isjson=True)
+ ipv6_routes = run_frr_cmd(rnode, "sh ip bgp ipv6 unicast json", isjson=True)
+ is_ipv4_default_attrib_found = False
+ is_ipv6_default_attrib_found = False
+
+ default_ipv4_route = routes["ipv4"]
+ default_ipv6_route = "::/0"
+ ipv4_route_Origin = False
+ ipv4_route_local_pref = False
+ ipv4_route_metric = False
+
+ if default_ipv4_route in ipv4_routes["routes"].keys():
+ nxt_hop_count = len(ipv4_routes["routes"][default_ipv4_route])
+ rib_next_hops = []
+ for index in range(nxt_hop_count):
+ rib_next_hops.append(
+ ipv4_routes["routes"][default_ipv4_route][index]["nexthops"][0]["ip"]
+ )
+
+ for nxt_hop in expected_nexthop.items():
+ if nxt_hop[0] == "ipv4":
+ if nxt_hop[1] in rib_next_hops:
+ logger.info(
+ "Default routes [{}] obtained from {} .....PASSED".format(
+ default_ipv4_route, nxt_hop[1]
+ )
+ )
+ else:
+ logger.error(
+ "ERROR ...! Default routes [{}] expected is missing {}".format(
+ default_ipv4_route, nxt_hop[1]
+ )
+ )
+ return False
+
+ else:
+ pass
+
+ if "origin" in ipv4_routes["routes"][default_ipv4_route][0].keys():
+ ipv4_route_Origin = ipv4_routes["routes"][default_ipv4_route][0]["origin"]
+ if "locPrf" in ipv4_routes["routes"][default_ipv4_route][0].keys():
+ ipv4_route_local_pref = ipv4_routes["routes"][default_ipv4_route][0][
+ "locPrf"
+ ]
+ if "metric" in ipv4_routes["routes"][default_ipv4_route][0].keys():
+ ipv4_route_metric = ipv4_routes["routes"][default_ipv4_route][0]["metric"]
+ else:
+ logger.error("ERROR [ DUT {}] : The Default Route Not found in RIB".format(dut))
+ return False
+
+ origin_found = False
+ locPrf_found = False
+ metric_found = False
+ as_path_found = False
+
+ if origin:
+ if origin == ipv4_route_Origin:
+ logger.info(
+ "Dafault Route {} expected origin {} Found in RIB....PASSED".format(
+ default_ipv4_route, origin
+ )
+ )
+ origin_found = True
+ else:
+ logger.error(
+ "ERROR... IPV4::! Expected Origin is {} obtained {}".format(
+ origin, ipv4_route_Origin
+ )
+ )
+ return False
+ else:
+ origin_found = True
+
+ if locPrf:
+ if locPrf == ipv4_route_local_pref:
+ logger.info(
+ "Dafault Route {} expected local preference {} Found in RIB....PASSED".format(
+ default_ipv4_route, locPrf
+ )
+ )
+ locPrf_found = True
+ else:
+ logger.error(
+ "ERROR... IPV4::! Expected Local preference is {} obtained {}".format(
+ locPrf, ipv4_route_local_pref
+ )
+ )
+ return False
+ else:
+ locPrf_found = True
+
+ if metric:
+ if metric == ipv4_route_metric:
+ logger.info(
+ "Dafault Route {} expected metric {} Found in RIB....PASSED".format(
+ default_ipv4_route, metric
+ )
+ )
+
+ metric_found = True
+ else:
+ logger.error(
+ "ERROR... IPV4::! Expected metric is {} obtained {}".format(
+ metric, ipv4_route_metric
+ )
+ )
+ return False
+ else:
+ metric_found = True
+
+ if expected_aspath:
+ obtained_aspath = ipv4_routes["routes"]["0.0.0.0/0"][0]["path"]
+ if expected_aspath in obtained_aspath:
+ as_path_found = True
+ logger.info(
+ "Dafault Route {} expected AS path {} Found in RIB....PASSED".format(
+ default_ipv4_route, expected_aspath
+ )
+ )
+ else:
+ logger.error(
+ "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
+ expected_aspath, obtained_aspath
+ )
+ )
+ return False
+ else:
+ as_path_found = True
+
+ if origin_found and locPrf_found and metric_found and as_path_found:
+ is_ipv4_default_attrib_found = True
+ logger.info(
+ "IPV4:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
+ origin, locPrf, metric, expected_aspath
+ )
+ )
+ else:
+ is_ipv4_default_attrib_found = False
+ logger.error(
+ "IPV4:: Expected origin ['{}'] Obtained [{}]".format(
+ origin, ipv4_route_Origin
+ )
+ )
+ logger.error(
+ "IPV4:: Expected locPrf ['{}'] Obtained [{}]".format(
+ locPrf, ipv4_route_local_pref
+ )
+ )
+ logger.error(
+ "IPV4:: Expected metric ['{}'] Obtained [{}]".format(
+ metric, ipv4_route_metric
+ )
+ )
+ logger.error(
+ "IPV4:: Expected metric ['{}'] Obtained [{}]".format(
+ expected_aspath, obtained_aspath
+ )
+ )
+
+ route_Origin = False
+ route_local_pref = False
+ route_local_metric = False
+ default_ipv6_route = ""
+ try:
+ ipv6_routes["routes"]["0::0/0"]
+ default_ipv6_route = "0::0/0"
+ except:
+ ipv6_routes["routes"]["::/0"]
+ default_ipv6_route = "::/0"
+ if default_ipv6_route in ipv6_routes["routes"].keys():
+ nxt_hop_count = len(ipv6_routes["routes"][default_ipv6_route])
+ rib_next_hops = []
+ for index in range(nxt_hop_count):
+ rib_next_hops.append(
+ ipv6_routes["routes"][default_ipv6_route][index]["nexthops"][0]["ip"]
+ )
+ try:
+ rib_next_hops.append(
+ ipv6_routes["routes"][default_ipv6_route][index]["nexthops"][1][
+ "ip"
+ ]
+ )
+ except (KeyError, IndexError) as e:
+ logger.error("NO impact ..! Global IPV6 Address not found ")
+
+ for nxt_hop in expected_nexthop.items():
+ if nxt_hop[0] == "ipv6":
+ if nxt_hop[1] in rib_next_hops:
+ logger.info(
+ "Default routes [{}] obtained from {} .....PASSED".format(
+ default_ipv6_route, nxt_hop[1]
+ )
+ )
+ else:
+ logger.error(
+ "ERROR ...! Default routes [{}] expected from {} obtained {}".format(
+ default_ipv6_route, nxt_hop[1], rib_next_hops
+ )
+ )
+ return False
+
+ else:
+ pass
+ if "origin" in ipv6_routes["routes"][default_ipv6_route][0].keys():
+ route_Origin = ipv6_routes["routes"][default_ipv6_route][0]["origin"]
+ if "locPrf" in ipv6_routes["routes"][default_ipv6_route][0].keys():
+ route_local_pref = ipv6_routes["routes"][default_ipv6_route][0]["locPrf"]
+ if "metric" in ipv6_routes["routes"][default_ipv6_route][0].keys():
+ route_local_metric = ipv6_routes["routes"][default_ipv6_route][0]["metric"]
+
+ origin_found = False
+ locPrf_found = False
+ metric_found = False
+ as_path_found = False
+
+ if origin:
+ if origin == route_Origin:
+ logger.info(
+ "Dafault Route {} expected origin {} Found in RIB....PASSED".format(
+ default_ipv6_route, route_Origin
+ )
+ )
+ origin_found = True
+ else:
+ logger.error(
+ "ERROR... IPV6::! Expected Origin is {} obtained {}".format(
+ origin, route_Origin
+ )
+ )
+ return False
+ else:
+ origin_found = True
+
+ if locPrf:
+ if locPrf == route_local_pref:
+ logger.info(
+ "Dafault Route {} expected Local Preference {} Found in RIB....PASSED".format(
+ default_ipv6_route, route_local_pref
+ )
+ )
+ locPrf_found = True
+ else:
+ logger.error(
+ "ERROR... IPV6::! Expected Local Preference is {} obtained {}".format(
+ locPrf, route_local_pref
+ )
+ )
+ return False
+ else:
+ locPrf_found = True
+
+ if metric:
+ if metric == route_local_metric:
+ logger.info(
+ "Dafault Route {} expected metric {} Found in RIB....PASSED".format(
+ default_ipv4_route, metric
+ )
+ )
+
+ metric_found = True
+ else:
+ logger.error(
+ "ERROR... IPV6::! Expected metric is {} obtained {}".format(
+ metric, route_local_metric
+ )
+ )
+ return False
+ else:
+ metric_found = True
+
+ if expected_aspath:
+ obtained_aspath = ipv6_routes["routes"]["::/0"][0]["path"]
+ if expected_aspath in obtained_aspath:
+ as_path_found = True
+ logger.info(
+ "Dafault Route {} expected AS path {} Found in RIB....PASSED".format(
+ default_ipv4_route, expected_aspath
+ )
+ )
+ else:
+ logger.error(
+ "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
+ expected_aspath, obtained_aspath
+ )
+ )
+ return False
+ else:
+ as_path_found = True
+
+ if origin_found and locPrf_found and metric_found and as_path_found:
+ is_ipv6_default_attrib_found = True
+ logger.info(
+ "IPV6:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
+ origin, locPrf, metric, expected_aspath
+ )
+ )
+ else:
+ is_ipv6_default_attrib_found = False
+ logger.error(
+ "IPV6:: Expected origin ['{}'] Obtained [{}]".format(origin, route_Origin)
+ )
+ logger.error(
+ "IPV6:: Expected locPrf ['{}'] Obtained [{}]".format(
+ locPrf, route_local_pref
+ )
+ )
+ logger.error(
+ "IPV6:: Expected metric ['{}'] Obtained [{}]".format(
+ metric, route_local_metric
+ )
+ )
+ logger.error(
+ "IPV6:: Expected metric ['{}'] Obtained [{}]".format(
+ expected_aspath, obtained_aspath
+ )
+ )
+
+ if is_ipv4_default_attrib_found and is_ipv6_default_attrib_found:
+ logger.info("The attributes are found for default route in RIB ")
+ return True
+ else:
+ return False
+
+
+@retry(retry_timeout=5)
+def verify_fib_default_route(tgen, topo, dut, routes, expected_nexthop):
+ """
+ API to verify the the 'Default route" in FIB
+
+ param
+ =====
+ dut : device under test
+ routes : default route with expected nexthop
+ expected_nexthop : the nexthop that is expected the deafult route
+
+ """
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ tgen = get_topogen()
+ connected_routes = {}
+ for router, rnode in tgen.routers().items():
+ if router == dut:
+ ipv4_routes = run_frr_cmd(rnode, "sh ip route json", isjson=True)
+ ipv6_routes = run_frr_cmd(rnode, "sh ipv6 route json", isjson=True)
+
+ is_ipv4_default_route_found = False
+ is_ipv6_default_route_found = False
+ if routes["ipv4"] in ipv4_routes.keys():
+ rib_ipv4_nxt_hops = []
+ ipv4_default_route = routes["ipv4"]
+ nxt_hop_count = len(ipv4_routes[ipv4_default_route][0]["nexthops"])
+ for index in range(nxt_hop_count):
+ rib_ipv4_nxt_hops.append(
+ ipv4_routes[ipv4_default_route][0]["nexthops"][index]["ip"]
+ )
+
+ if expected_nexthop["ipv4"] in rib_ipv4_nxt_hops:
+ is_ipv4_default_route_found = True
+ logger.info(
+ "{} default route with next hop {} is found in FIB ".format(
+ ipv4_default_route, expected_nexthop
+ )
+ )
+ else:
+ logger.error(
+ "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
+ ipv4_default_route, expected_nexthop
+ )
+ )
+ return False
+
+ if routes["ipv6"] in ipv6_routes.keys() or "::/0" in ipv6_routes.keys():
+ rib_ipv6_nxt_hops = []
+ if "::/0" in ipv6_routes.keys():
+ ipv6_default_route = "::/0"
+ elif routes["ipv6"] in ipv6_routes.keys():
+ ipv6_default_route = routes["ipv6"]
+
+ nxt_hop_count = len(ipv6_routes[ipv6_default_route][0]["nexthops"])
+ for index in range(nxt_hop_count):
+ rib_ipv6_nxt_hops.append(
+ ipv6_routes[ipv6_default_route][0]["nexthops"][index]["ip"]
+ )
+
+ if expected_nexthop["ipv6"] in rib_ipv6_nxt_hops:
+ is_ipv6_default_route_found = True
+ logger.info(
+ "{} default route with next hop {} is found in FIB ".format(
+ ipv6_default_route, expected_nexthop
+ )
+ )
+ else:
+ logger.error(
+ "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
+ ipv6_default_route, expected_nexthop
+ )
+ )
+ return False
+
+ if is_ipv4_default_route_found and is_ipv6_default_route_found:
+ return True
+ else:
+ logger.error(
+ "Default Route for ipv4 and ipv6 address family is not found in FIB "
+ )
+ return False
+
+
+@retry(retry_timeout=5)
+def verify_bgp_advertised_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
+ """
+ APi is verifies the the routes that are advertised from dut to peer
+
+ command used :
+ "sh ip bgp neighbor <x.x.x.x> advertised-routes" and
+ "sh ip bgp ipv6 unicast neighbor<x::x> advertised-routes"
+
+ dut : Device Under Tests
+ Peer : Peer on which the routs is expected
+ expected_routes : dual stack IPV4-and IPv6 routes to be verified
+ expected_routes
+
+ returns: True / False
+
+ """
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ tgen = get_topogen()
+
+ peer_ipv4_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv4"].split("/")[0]
+ peer_ipv6_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv6"].split("/")[0]
+
+ for router, rnode in tgen.routers().items():
+ if router == dut:
+ ipv4_receieved_routes = run_frr_cmd(
+ rnode,
+ "sh ip bgp neighbor {} advertised-routes json".format(
+ peer_ipv4_neighbor_ip
+ ),
+ isjson=True,
+ )
+ ipv6_receieved_routes = run_frr_cmd(
+ rnode,
+ "sh ip bgp ipv6 unicast neighbor {} advertised-routes json".format(
+ peer_ipv6_neighbor_ip
+ ),
+ isjson=True,
+ )
+ ipv4_route_count = 0
+ ipv6_route_count = 0
+ if ipv4_receieved_routes:
+ for index in range(len(expected_routes["ipv4"])):
+ if (
+ expected_routes["ipv4"][index]["network"]
+ in ipv4_receieved_routes["advertisedRoutes"].keys()
+ ):
+ ipv4_route_count += 1
+ logger.info(
+ "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
+ dut, expected_routes["ipv4"][index]["network"], peer
+ )
+ )
+
+ elif (
+ expected_routes["ipv4"][index]["network"]
+ in ipv4_receieved_routes["bgpOriginatingDefaultNetwork"]
+ ):
+ ipv4_route_count += 1
+ logger.info(
+ "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
+ dut, expected_routes["ipv4"][index]["network"], peer
+ )
+ )
+
+ else:
+ logger.error(
+ "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format(
+ dut, expected_routes["ipv4"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(ipv4_receieved_routes)
+ logger.error(
+ "ERROR...! [DUT : {}] No IPV4 Routes are advertised to the peer {}".format(
+ dut, peer
+ )
+ )
+ return False
+
+ if ipv6_receieved_routes:
+ for index in range(len(expected_routes["ipv6"])):
+ if (
+ expected_routes["ipv6"][index]["network"]
+ in ipv6_receieved_routes["advertisedRoutes"].keys()
+ ):
+ ipv6_route_count += 1
+ logger.info(
+ "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
+ dut, expected_routes["ipv6"][index]["network"], peer
+ )
+ )
+ elif (
+ expected_routes["ipv6"][index]["network"]
+ in ipv6_receieved_routes["bgpOriginatingDefaultNetwork"]
+ ):
+ ipv6_route_count += 1
+ logger.info(
+ "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
+ dut, expected_routes["ipv6"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(
+ "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format(
+ dut, expected_routes["ipv6"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(ipv6_receieved_routes)
+ logger.error(
+ "ERROR...! [DUT : {}] No IPV6 Routes are advertised to the peer {}".format(
+ dut, peer
+ )
+ )
+ return False
+
+ if ipv4_route_count == len(expected_routes["ipv4"]) and ipv6_route_count == len(
+ expected_routes["ipv6"]
+ ):
+ return True
+ else:
+ logger.error(
+ "ERROR ....! IPV4 : Expected Routes -> {} obtained ->{} ".format(
+ expected_routes["ipv4"], ipv4_receieved_routes["advertisedRoutes"]
+ )
+ )
+ logger.error(
+ "ERROR ....! IPV6 : Expected Routes -> {} obtained ->{} ".format(
+ expected_routes["ipv6"], ipv6_receieved_routes["advertisedRoutes"]
+ )
+ )
+ return False
+
+
+@retry(retry_timeout=5)
+def verify_bgp_received_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
+ """
+ API to verify the bgp received routes
+
+ commad used :
+ =============
+ show ip bgp neighbor <x.x.x.x> received-routes
+ show ip bgp ipv6 unicast neighbor <x::x> received-routes
+
+ params
+ =======
+ dut : Device Under Tests
+ Peer : Peer on which the routs is expected
+ expected_routes : dual stack IPV4-and IPv6 routes to be verified
+ expected_routes
+
+ returns:
+ ========
+ True / False
+ """
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ tgen = get_topogen()
+
+ peer_ipv4_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv4"].split("/")[0]
+ peer_ipv6_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv6"].split("/")[0]
+
+ logger.info("Enabling Soft configuration to neighbor INBOUND ")
+ neigbor_dict = {"ipv4": peer_ipv4_neighbor_ip, "ipv6": peer_ipv6_neighbor_ip}
+ result = configure_bgp_soft_configuration(
+ tgen, dut, neigbor_dict, direction="inbound"
+ )
+ assert (
+ result is True
+ ), " Failed to configure the soft configuration \n Error: {}".format(result)
+
+ """sleep of 10 sec is required to get the routes on peer after soft configuration"""
+ sleep(10)
+ for router, rnode in tgen.routers().items():
+ if router == dut:
+ ipv4_receieved_routes = run_frr_cmd(
+ rnode,
+ "sh ip bgp neighbor {} received-routes json".format(
+ peer_ipv4_neighbor_ip
+ ),
+ isjson=True,
+ )
+ ipv6_receieved_routes = run_frr_cmd(
+ rnode,
+ "sh ip bgp ipv6 unicast neighbor {} received-routes json".format(
+ peer_ipv6_neighbor_ip
+ ),
+ isjson=True,
+ )
+ ipv4_route_count = 0
+ ipv6_route_count = 0
+ if ipv4_receieved_routes:
+ for index in range(len(expected_routes["ipv4"])):
+ if (
+ expected_routes["ipv4"][index]["network"]
+ in ipv4_receieved_routes["receivedRoutes"].keys()
+ ):
+ ipv4_route_count += 1
+ logger.info(
+ "Success [DUT : {}] The Expected Route {} is received from {} ".format(
+ dut, expected_routes["ipv4"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(
+ "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format(
+ dut, expected_routes["ipv4"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(ipv4_receieved_routes)
+ logger.error(
+ "ERROR...! [DUT : {}] No IPV4 Routes are received from the peer {}".format(
+ dut, peer
+ )
+ )
+ return False
+
+ if ipv6_receieved_routes:
+ for index in range(len(expected_routes["ipv6"])):
+ if (
+ expected_routes["ipv6"][index]["network"]
+ in ipv6_receieved_routes["receivedRoutes"].keys()
+ ):
+ ipv6_route_count += 1
+ logger.info(
+ "Success [DUT : {}] The Expected Route {} is received from {} ".format(
+ dut, expected_routes["ipv6"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(
+ "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format(
+ dut, expected_routes["ipv6"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(ipv6_receieved_routes)
+ logger.error(
+ "ERROR...! [DUT : {}] No IPV6 Routes are received from the peer {}".format(
+ dut, peer
+ )
+ )
+ return False
+
+ if ipv4_route_count == len(expected_routes["ipv4"]) and ipv6_route_count == len(
+ expected_routes["ipv6"]
+ ):
+ return True
+ else:
+ logger.error(
+ "ERROR ....! IPV4 : Expected Routes -> {} obtained ->{} ".format(
+ expected_routes["ipv4"], ipv4_receieved_routes["advertisedRoutes"]
+ )
+ )
+ logger.error(
+ "ERROR ....! IPV6 : Expected Routes -> {} obtained ->{} ".format(
+ expected_routes["ipv6"], ipv6_receieved_routes["advertisedRoutes"]
+ )
+ )
+ return False
+
+
+def configure_bgp_soft_configuration(tgen, dut, neighbor_dict, direction):
+ """
+ Api to configure the bgp soft configuration to show the received routes from peer
+ params
+ ======
+ dut : device under test route on which the sonfiguration to be applied
+ neighbor_dict : dict element contains ipv4 and ipv6 neigbor ip
+ direction : Directionon which it should be applied in/out
+
+ returns:
+ ========
+ boolean
+ """
+ logger.info("Enabling Soft configuration to neighbor INBOUND ")
+ local_as = get_dut_as_number(tgen, dut)
+ ipv4_neighbor = neighbor_dict["ipv4"]
+ ipv6_neighbor = neighbor_dict["ipv6"]
+ direction = direction.lower()
+ if ipv4_neighbor and ipv4_neighbor:
+ raw_config = {
+ dut: {
+ "raw_config": [
+ "router bgp {}".format(local_as),
+ "address-family ipv4 unicast",
+ "neighbor {} soft-reconfiguration {} ".format(
+ ipv4_neighbor, direction
+ ),
+ "exit-address-family",
+ "address-family ipv6 unicast",
+ "neighbor {} soft-reconfiguration {} ".format(
+ ipv6_neighbor, direction
+ ),
+ "exit-address-family",
+ ]
+ }
+ }
+ result = apply_raw_config(tgen, raw_config)
+ logger.info(
+ "Success... [DUT : {}] The soft configuration onis applied on neighbors {} ".format(
+ dut, neighbor_dict
+ )
+ )
+ return True
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index a7d8b2c9b..0e9e0ba40 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -3365,8 +3365,9 @@ def verify_rib(
nh_found = False
for st_rt in ip_list:
- st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
-
+ st_rt = str(
+ ipaddress.ip_network(frr_unicode(st_rt), strict=False)
+ )
_addr_type = validate_ip_address(st_rt)
if _addr_type != addr_type:
continue
@@ -3384,6 +3385,9 @@ def verify_rib(
next_hop = [next_hop]
for mnh in range(0, len(rib_routes_json[st_rt])):
+ if not "selected" in rib_routes_json[st_rt][mnh]:
+ continue
+
if (
"fib"
in rib_routes_json[st_rt][mnh]["nexthops"][0]
@@ -3528,8 +3532,8 @@ def verify_rib(
if nh_found:
logger.info(
- "[DUT: {}]: Found next_hop {} for all bgp"
- " routes in RIB".format(router, next_hop)
+ "[DUT: {}]: Found next_hop {} for"
+ " RIB routes: {}".format(router, next_hop, found_routes)
)
if len(missing_routes) > 0:
@@ -3593,7 +3597,7 @@ def verify_rib(
nh_found = False
for st_rt in ip_list:
- st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
+ st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
_addr_type = validate_ip_address(st_rt)
if _addr_type != addr_type:
@@ -3750,8 +3754,9 @@ def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
nh_found = False
for st_rt in ip_list:
- st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
-
+ st_rt = str(
+ ipaddress.ip_network(frr_unicode(st_rt), strict=False)
+ )
_addr_type = validate_ip_address(st_rt)
if _addr_type != addr_type:
continue
@@ -3855,7 +3860,7 @@ def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
nh_found = False
for st_rt in ip_list:
- st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
+ st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
_addr_type = validate_ip_address(st_rt)
if _addr_type != addr_type:
diff --git a/tests/topotests/lib/pim.py b/tests/topotests/lib/pim.py
index 254ce4f4e..cd070e08b 100644
--- a/tests/topotests/lib/pim.py
+++ b/tests/topotests/lib/pim.py
@@ -525,6 +525,9 @@ def verify_pim_neighbors(tgen, topo, dut=None, iface=None, nbr_ip=None, expected
if "pim" not in data:
continue
+ if "pim" in data and data["pim"] == "disable":
+ continue
+
if "pim" in data and data["pim"] == "enable":
local_interface = data["interface"]
diff --git a/tests/topotests/multicast_pim_bsm_topo1/test_mcast_pim_bsmp_01.py b/tests/topotests/multicast_pim_bsm_topo1/test_mcast_pim_bsmp_01.py
index ceac78d88..4610c5f15 100644
--- a/tests/topotests/multicast_pim_bsm_topo1/test_mcast_pim_bsmp_01.py
+++ b/tests/topotests/multicast_pim_bsm_topo1/test_mcast_pim_bsmp_01.py
@@ -84,6 +84,7 @@ from lib.common_config import (
run_frr_cmd,
required_linux_kernel_version,
topo_daemons,
+ verify_rib,
)
from lib.pim import (
@@ -106,6 +107,7 @@ from lib.pim import (
clear_pim_interface_traffic,
get_pim_interface_traffic,
McastTesterHelper,
+ verify_pim_neighbors,
)
from lib.topolog import logger
from lib.topojson import build_config_from_json
@@ -180,6 +182,10 @@ def setup_module(mod):
# Creating configuration from JSON
build_config_from_json(tgen, topo)
+ # Verify PIM neighbors
+ result = verify_pim_neighbors(tgen, topo)
+ assert result is True, " Verify PIM neighbor: Failed Error: {}".format(result)
+
# XXX Replace this using "with McastTesterHelper()... " in each test if possible.
global app_helper
app_helper = McastTesterHelper(tgen)
@@ -306,6 +312,14 @@ def pre_config_to_bsm(tgen, topo, tc_name, bsr, sender, receiver, fhr, rp, lhr,
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+ # Verifying static routes are installed
+ for dut, _nexthop in zip([fhr, rp, lhr], [next_hop, next_hop_rp, next_hop_lhr]):
+ input_routes = {dut: input_dict[dut]}
+ result = verify_rib(tgen, "ipv4", dut, input_routes, _nexthop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
# RP Mapping
rp_mapping = topo["routers"][bsr]["bsm"]["bsr_packets"][packet]["rp_mapping"]
@@ -328,11 +342,24 @@ def pre_config_to_bsm(tgen, topo, tc_name, bsr, sender, receiver, fhr, rp, lhr,
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+ # Verifying static routes are installed
+ result = verify_rib(tgen, "ipv4", fhr, input_dict, next_hop_fhr)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
input_dict = {
lhr: {"static_routes": [{"network": rp_list, "next_hop": next_hop_lhr}]},
}
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+
+ # Verifying static routes are installed
+ result = verify_rib(tgen, "ipv4", lhr, input_dict, next_hop_lhr)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
return True
@@ -443,6 +470,23 @@ def test_BSR_higher_prefer_ip_p0(request):
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+ # Verifying static routes are installed
+ for dut, _nexthop in zip(["i1", "l1"], [next_hop_rp, next_hop_lhr]):
+ input_routes = {dut: input_dict[dut]}
+ result = verify_rib(tgen, "ipv4", dut, input_routes, _nexthop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ for bsr_add, next_hop in zip([BSR1_ADDR, BSR2_ADDR], [NEXT_HOP1, NEXT_HOP2]):
+ input_routes = {
+ "f1": {"static_routes": [{"network": bsr_add, "next_hop": next_hop}]}
+ }
+ result = verify_rib(tgen, "ipv4", "f1", input_routes, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
# Use scapy to send pre-defined packet from senser to receiver
step("Send BSR packet from b1 to FHR")
result = scapy_send_bsr_raw_packet(tgen, topo, "b1", "f1", "packet9")
@@ -626,6 +670,24 @@ def test_BSR_CRP_with_blackhole_address_p1(request):
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+ # Verifying static routes are installed
+ for dut, _nexthop in zip(["i1", "l1"], [next_hop_rp, next_hop_lhr]):
+ input_routes = {dut: input_dict[dut]}
+ result = verify_rib(tgen, "ipv4", dut, input_routes, _nexthop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ input_routes = {
+ "f1": {"static_routes": [{"network": CRP, "next_hop": next_hop_fhr}]}
+ }
+ result = verify_rib(tgen, "ipv4", "f1", input_routes, expected=False)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Route is still present \n Error {}".format(
+ tc_name, result
+ )
+
# Use scapy to send pre-defined packet from senser to receiver
group = topo["routers"]["b1"]["bsm"]["bsr_packets"]["packet9"]["group"]
@@ -642,6 +704,10 @@ def test_BSR_CRP_with_blackhole_address_p1(request):
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+ # Verifying static routes are installed
+ result = verify_rib(tgen, "ipv4", "f1", input_dict)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(tc_name, result)
+
intf_f1_i1 = topo["routers"]["f1"]["links"]["i1"]["interface"]
step("Verify bsm transit count is not increamented" "show ip pim interface traffic")
state_dict = {"f1": {intf_f1_i1: ["bsmTx"]}}
@@ -694,6 +760,28 @@ def test_BSR_CRP_with_blackhole_address_p1(request):
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+ # Verifying static routes are installed
+ input_dict = {
+ "f1": {"static_routes": [{"network": BSR1_ADDR, "next_hop": NEXT_HOP1}]}
+ }
+ result = verify_rib(tgen, "ipv4", "f1", input_dict, NEXT_HOP1)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(tc_name, result)
+
+ input_dict = {
+ "f1": {
+ "static_routes": [
+ {"network": [BSR1_ADDR, CRP], "next_hop": "blackhole", "delete": True}
+ ]
+ }
+ }
+ result = verify_rib(tgen, "ipv4", "f1", input_dict, expected=False)
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "Routes:[{}, {}] are still present \n Error {}".format(
+ tc_name, BSR1_ADDR, CRP, result
+ )
+ )
+
step("Sending BSR after removing black-hole address for BSR and candidate RP")
step("Send BSR packet from b1 to FHR")
result = scapy_send_bsr_raw_packet(tgen, topo, "b1", "f1", "packet9")
@@ -1614,6 +1702,14 @@ def test_iif_join_state_p0(request):
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+ # Verifying static routes are installed
+ result = verify_rib(tgen, "ipv4", "l1", input_dict, expected=False)
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Routes:{} are still present \n Error {}".format(
+ tc_name, rp_ip, result
+ )
+
# Check RP unreachable
step("Check RP unreachability")
iif = "Unknown"
@@ -1654,6 +1750,10 @@ def test_iif_join_state_p0(request):
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+ # Verifying static routes are installed
+ result = verify_rib(tgen, "ipv4", "l1", input_dict, next_hop_lhr)
+ assert result is True, "Testcase {}:Failed \n Error: {}".format(tc_name, result)
+
# Verify that (*,G) installed in mroute again
iif = "l1-i1-eth0"
result = verify_mroutes(tgen, dut, src_addr, GROUP_ADDRESS, iif, oil)
diff --git a/tests/topotests/multicast_pim_bsm_topo2/test_mcast_pim_bsmp_02.py b/tests/topotests/multicast_pim_bsm_topo2/test_mcast_pim_bsmp_02.py
index 2d6062bf3..191615cbb 100644
--- a/tests/topotests/multicast_pim_bsm_topo2/test_mcast_pim_bsmp_02.py
+++ b/tests/topotests/multicast_pim_bsm_topo2/test_mcast_pim_bsmp_02.py
@@ -68,6 +68,7 @@ from lib.common_config import (
run_frr_cmd,
required_linux_kernel_version,
topo_daemons,
+ verify_rib,
)
from lib.pim import (
@@ -86,6 +87,7 @@ from lib.pim import (
clear_mroute,
clear_pim_interface_traffic,
McastTesterHelper,
+ verify_pim_neighbors,
)
from lib.topolog import logger
from lib.topojson import build_config_from_json
@@ -160,6 +162,10 @@ def setup_module(mod):
# Creating configuration from JSON
build_config_from_json(tgen, topo)
+ # Verify PIM neighbors
+ result = verify_pim_neighbors(tgen, topo)
+ assert result is True, " Verify PIM neighbor: Failed Error: {}".format(result)
+
# XXX Replace this using "with McastTesterHelper()... " in each test if possible.
global app_helper
app_helper = McastTesterHelper(tgen)
@@ -247,6 +253,14 @@ def pre_config_to_bsm(tgen, topo, tc_name, bsr, sender, receiver, fhr, rp, lhr,
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+ # Verifying static routes are installed
+ for dut, _nexthop in zip([fhr, rp, lhr], [next_hop, next_hop_rp, next_hop_lhr]):
+ input_routes = {dut: input_dict[dut]}
+ result = verify_rib(tgen, "ipv4", dut, input_routes, _nexthop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
# Add kernel route for source
group = topo["routers"][bsr]["bsm"]["bsr_packets"][packet]["pkt_dst"]
bsr_interface = topo["routers"][bsr]["links"][fhr]["interface"]
@@ -285,11 +299,24 @@ def pre_config_to_bsm(tgen, topo, tc_name, bsr, sender, receiver, fhr, rp, lhr,
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+ # Verifying static routes are installed
+ result = verify_rib(tgen, "ipv4", fhr, input_dict, next_hop_fhr)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
input_dict = {
lhr: {"static_routes": [{"network": rp_list, "next_hop": next_hop_lhr}]},
}
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+
+ # Verifying static routes are installed
+ result = verify_rib(tgen, "ipv4", lhr, input_dict, next_hop_lhr)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
return True