summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorARShreenidhi <rshreenidhi@vmware.com>2022-06-07 08:34:20 +0200
committerARShreenidhi <rshreenidhi@vmware.com>2022-06-14 17:40:19 +0200
commit0d06640b85dbfebbe0e49eb2e2b442f637b3b3f5 (patch)
treebd50c67f8eddb8041dd00db2defe889a96a8342e /tests
parentMerge pull request #11395 from opensourcerouting/fix/autocompete_for_rmap_bgp (diff)
downloadfrr-0d06640b85dbfebbe0e49eb2e2b442f637b3b3f5.tar.xz
frr-0d06640b85dbfebbe0e49eb2e2b442f637b3b3f5.zip
tests : bgp default-originate api are added
Signed-off-by: ARShreenidhi <rshreenidhi@vmware.com>
Diffstat (limited to 'tests')
-rw-r--r--tests/topotests/lib/bgp.py958
1 files changed, 958 insertions, 0 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index 4dd44e3e9..216756f51 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -29,6 +29,7 @@ from lib.common_config import (
create_common_configurations,
FRRCFG_FILE,
InvalidCLIError,
+ apply_raw_config,
check_address_types,
find_interface_with_greater_ip,
generate_ips,
@@ -74,6 +75,12 @@ def create_router_bgp(tgen, topo=None, input_dict=None, build=False, load_config
"address_family": {
"ipv4": {
"unicast": {
+ "default_originate":{
+ "neighbor":"R2",
+ "add_type":"lo"
+ "route_map":"rm"
+
+ },
"redistribute": [{
"redist_type": "static",
"attribute": {
@@ -498,6 +505,12 @@ def __create_bgp_unicast_neighbor(
topo, input_dict, router, addr_type, add_neigh
)
config_data.extend(neigh_data)
+ # configure default originate
+ if "default_originate" in addr_data:
+ default_originate_config = __create_bgp_default_originate_neighbor(
+ topo, input_dict, router, addr_type, add_neigh
+ )
+ config_data.extend(default_originate_config)
for addr_type, addr_dict in bgp_data.items():
if not addr_dict or not check_address_types(addr_type):
@@ -515,6 +528,78 @@ def __create_bgp_unicast_neighbor(
return config_data
+def __create_bgp_default_originate_neighbor(
+ topo, input_dict, router, addr_type, add_neigh=True
+):
+ """
+ Helper API to create neighbor default - originate configuration
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `router` : router id to be configured
+ """
+ tgen = get_topogen()
+ config_data = []
+ logger.debug("Entering lib API: __create_bgp_default_originate_neighbor()")
+
+ bgp_data = input_dict["address_family"]
+ neigh_data = bgp_data[addr_type]["unicast"]["default_originate"]
+ for name, peer_dict in neigh_data.items():
+ nh_details = topo[name]
+
+ neighbor_ip = None
+ if "dest-link" in neigh_data[name]:
+ dest_link = neigh_data[name]["dest-link"]
+ neighbor_ip = nh_details["links"][dest_link][addr_type].split("/")[0]
+ elif "add_type" in neigh_data[name]:
+ add_type = neigh_data[name]["add_type"]
+ neighbor_ip = nh_details["links"][add_type][addr_type].split("/")[0]
+ else:
+ neighbor_ip = nh_details["links"][router][addr_type].split("/")[0]
+
+ config_data.append("address-family {} unicast".format(addr_type))
+ if "route_map" in peer_dict:
+ route_map = peer_dict["route_map"]
+ if "delete" in peer_dict:
+ if peer_dict["delete"]:
+ config_data.append(
+ "no neighbor {} default-originate route-map {}".format(
+ neighbor_ip, route_map
+ )
+ )
+ else:
+ config_data.append(
+ " neighbor {} default-originate route-map {}".format(
+ neighbor_ip, route_map
+ )
+ )
+ else:
+ config_data.append(
+ " neighbor {} default-originate route-map {}".format(
+ neighbor_ip, route_map
+ )
+ )
+
+ else:
+ if "delete" in peer_dict:
+ if peer_dict["delete"]:
+ config_data.append(
+ "no neighbor {} default-originate".format(neighbor_ip)
+ )
+ else:
+ config_data.append(
+ "neighbor {} default-originate".format(neighbor_ip)
+ )
+ else:
+ config_data.append("neighbor {} default-originate".format(neighbor_ip))
+
+ logger.debug("Exiting lib API: __create_bgp_default_originate_neighbor()")
+ return config_data
+
+
def __create_l2vpn_evpn_address_family(
tgen, topo, input_dict, router, config_data=None
):
@@ -4574,3 +4659,876 @@ def verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss, vrf=None):
return "TCP-MSS Mismatch"
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return False
+
+
+def get_dut_as_number(tgen, dut):
+ """
+ API to get the Autonomous Number of the given DUT
+
+ params:
+ =======
+ dut : Device Under test
+
+ returns :
+ =======
+ Success : DUT Autonomous number
+ Fail : Error message with Boolean False
+ """
+ tgen = get_topogen()
+ for router, rnode in tgen.routers().items():
+ if router == dut:
+ show_bgp_json = run_frr_cmd(rnode, "sh ip bgp summary json ", isjson=True)
+ as_number = show_bgp_json["ipv4Unicast"]["as"]
+ if as_number:
+ logger.info(
+ "[dut {}] DUT contains Automnomous number :: {} ".format(
+ dut, as_number
+ )
+ )
+ return as_number
+ else:
+ logger.error(
+ "[dut {}] ERROR....! DUT doesnot contain any Automnomous number ".format(
+ dut
+ )
+ )
+ return False
+
+
+def get_prefix_count_route(
+ tgen, topo, dut, peer, vrf=None, link=None, sent=None, received=None
+):
+ """
+ API to return the prefix count of default originate the given DUT
+ dut : Device under test
+ peer : neigbor on which you are expecting the route to be received
+
+ returns :
+ prefix_count as dict with ipv4 and ipv6 value
+ """
+ # the neighbor IP address can be accessable by finding the neigborship (vice-versa)
+
+ if link:
+ neighbor_ipv4_address = topo["routers"][peer]["links"][link]["ipv4"]
+ neighbor_ipv6_address = topo["routers"][peer]["links"][link]["ipv6"]
+ else:
+ neighbor_ipv4_address = topo["routers"][peer]["links"][dut]["ipv4"]
+ neighbor_ipv6_address = topo["routers"][peer]["links"][dut]["ipv6"]
+
+ neighbor_ipv4_address = neighbor_ipv4_address.split("/")[0]
+ neighbor_ipv6_address = neighbor_ipv6_address.split("/")[0]
+ prefix_count = {}
+ tgen = get_topogen()
+ for router, rnode in tgen.routers().items():
+ if router == dut:
+
+ if vrf:
+ ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf)
+ show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True)
+ ipv6_cmd = "sh ip bgp vrf {} ipv6 unicast summary json".format(vrf)
+ show_bgp_json_ipv6 = run_frr_cmd(rnode, ipv6_cmd, isjson=True)
+
+ prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"]["peers"][
+ neighbor_ipv4_address
+ ]["pfxRcd"]
+ prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
+ neighbor_ipv6_address
+ ]["pfxRcd"]
+
+ logger.info(
+ "The Prefix Count of the [DUT:{} : vrf [{}] ] towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
+ dut,
+ vrf,
+ neighbor_ipv4_address,
+ neighbor_ipv6_address,
+ prefix_count,
+ )
+ )
+ return prefix_count
+
+ else:
+ show_bgp_json_ipv4 = run_frr_cmd(
+ rnode, "sh ip bgp summary json ", isjson=True
+ )
+ show_bgp_json_ipv6 = run_frr_cmd(
+ rnode, "sh ip bgp ipv6 unicast summary json ", isjson=True
+ )
+ if received:
+ prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][
+ "peers"
+ ][neighbor_ipv4_address]["pfxRcd"]
+ prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
+ neighbor_ipv6_address
+ ]["pfxRcd"]
+
+ elif sent:
+ prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][
+ "peers"
+ ][neighbor_ipv4_address]["pfxSnt"]
+ prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
+ neighbor_ipv6_address
+ ]["pfxSnt"]
+
+ else:
+ prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][
+ "peers"
+ ][neighbor_ipv4_address]["pfxRcd"]
+ prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
+ neighbor_ipv6_address
+ ]["pfxRcd"]
+
+ logger.info(
+ "The Prefix Count of the DUT:{} towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
+ dut, neighbor_ipv4_address, neighbor_ipv6_address, prefix_count
+ )
+ )
+ return prefix_count
+ else:
+ logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
+
+
+@retry(retry_timeout=5)
+def verify_rib_default_route(
+ tgen,
+ topo,
+ dut,
+ routes,
+ expected_nexthop,
+ metric=None,
+ origin=None,
+ locPrf=None,
+ expected_aspath=None,
+):
+ """
+ API to verify the the 'Default route" in BGP RIB with the attributes the rout carries (metric , local preference, )
+
+ param
+ =====
+ dut : device under test
+ routes : default route with expected nexthop
+ expected_nexthop : the nexthop that is expected the deafult route
+
+ """
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ tgen = get_topogen()
+ connected_routes = {}
+ for router, rnode in tgen.routers().items():
+ if router == dut:
+
+ ipv4_routes = run_frr_cmd(rnode, "sh ip bgp json", isjson=True)
+ ipv6_routes = run_frr_cmd(rnode, "sh ip bgp ipv6 unicast json", isjson=True)
+ is_ipv4_default_attrib_found = False
+ is_ipv6_default_attrib_found = False
+
+ default_ipv4_route = routes["ipv4"]
+ default_ipv6_route = "::/0"
+ ipv4_route_Origin = False
+ ipv4_route_local_pref = False
+ ipv4_route_metric = False
+
+ if default_ipv4_route in ipv4_routes["routes"].keys():
+ nxt_hop_count = len(ipv4_routes["routes"][default_ipv4_route])
+ rib_next_hops = []
+ for index in range(nxt_hop_count):
+ rib_next_hops.append(
+ ipv4_routes["routes"][default_ipv4_route][index]["nexthops"][0]["ip"]
+ )
+
+ for nxt_hop in expected_nexthop.items():
+ if nxt_hop[0] == "ipv4":
+ if nxt_hop[1] in rib_next_hops:
+ logger.info(
+ "Default routes [{}] obtained from {} .....PASSED".format(
+ default_ipv4_route, nxt_hop[1]
+ )
+ )
+ else:
+ logger.error(
+ "ERROR ...! Default routes [{}] expected is missing {}".format(
+ default_ipv4_route, nxt_hop[1]
+ )
+ )
+ return False
+
+ else:
+ pass
+
+ if "origin" in ipv4_routes["routes"][default_ipv4_route][0].keys():
+ ipv4_route_Origin = ipv4_routes["routes"][default_ipv4_route][0]["origin"]
+ if "locPrf" in ipv4_routes["routes"][default_ipv4_route][0].keys():
+ ipv4_route_local_pref = ipv4_routes["routes"][default_ipv4_route][0][
+ "locPrf"
+ ]
+ if "metric" in ipv4_routes["routes"][default_ipv4_route][0].keys():
+ ipv4_route_metric = ipv4_routes["routes"][default_ipv4_route][0]["metric"]
+ else:
+ logger.error("ERROR [ DUT {}] : The Default Route Not found in RIB".format(dut))
+ return False
+
+ origin_found = False
+ locPrf_found = False
+ metric_found = False
+ as_path_found = False
+
+ if origin:
+ if origin == ipv4_route_Origin:
+ logger.info(
+ "Dafault Route {} expected origin {} Found in RIB....PASSED".format(
+ default_ipv4_route, origin
+ )
+ )
+ origin_found = True
+ else:
+ logger.error(
+ "ERROR... IPV4::! Expected Origin is {} obtained {}".format(
+ origin, ipv4_route_Origin
+ )
+ )
+ return False
+ else:
+ origin_found = True
+
+ if locPrf:
+ if locPrf == ipv4_route_local_pref:
+ logger.info(
+ "Dafault Route {} expected local preference {} Found in RIB....PASSED".format(
+ default_ipv4_route, locPrf
+ )
+ )
+ locPrf_found = True
+ else:
+ logger.error(
+ "ERROR... IPV4::! Expected Local preference is {} obtained {}".format(
+ locPrf, ipv4_route_local_pref
+ )
+ )
+ return False
+ else:
+ locPrf_found = True
+
+ if metric:
+ if metric == ipv4_route_metric:
+ logger.info(
+ "Dafault Route {} expected metric {} Found in RIB....PASSED".format(
+ default_ipv4_route, metric
+ )
+ )
+
+ metric_found = True
+ else:
+ logger.error(
+ "ERROR... IPV4::! Expected metric is {} obtained {}".format(
+ metric, ipv4_route_metric
+ )
+ )
+ return False
+ else:
+ metric_found = True
+
+ if expected_aspath:
+ obtained_aspath = ipv4_routes["routes"]["0.0.0.0/0"][0]["path"]
+ if expected_aspath in obtained_aspath:
+ as_path_found = True
+ logger.info(
+ "Dafault Route {} expected AS path {} Found in RIB....PASSED".format(
+ default_ipv4_route, expected_aspath
+ )
+ )
+ else:
+ logger.error(
+ "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
+ expected_aspath, obtained_aspath
+ )
+ )
+ return False
+ else:
+ as_path_found = True
+
+ if origin_found and locPrf_found and metric_found and as_path_found:
+ is_ipv4_default_attrib_found = True
+ logger.info(
+ "IPV4:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
+ origin, locPrf, metric, expected_aspath
+ )
+ )
+ else:
+ is_ipv4_default_attrib_found = False
+ logger.error(
+ "IPV4:: Expected origin ['{}'] Obtained [{}]".format(
+ origin, ipv4_route_Origin
+ )
+ )
+ logger.error(
+ "IPV4:: Expected locPrf ['{}'] Obtained [{}]".format(
+ locPrf, ipv4_route_local_pref
+ )
+ )
+ logger.error(
+ "IPV4:: Expected metric ['{}'] Obtained [{}]".format(
+ metric, ipv4_route_metric
+ )
+ )
+ logger.error(
+ "IPV4:: Expected metric ['{}'] Obtained [{}]".format(
+ expected_aspath, obtained_aspath
+ )
+ )
+
+ route_Origin = False
+ route_local_pref = False
+ route_local_metric = False
+ default_ipv6_route = ""
+ try:
+ ipv6_routes["routes"]["0::0/0"]
+ default_ipv6_route = "0::0/0"
+ except:
+ ipv6_routes["routes"]["::/0"]
+ default_ipv6_route = "::/0"
+ if default_ipv6_route in ipv6_routes["routes"].keys():
+ nxt_hop_count = len(ipv6_routes["routes"][default_ipv6_route])
+ rib_next_hops = []
+ for index in range(nxt_hop_count):
+ rib_next_hops.append(
+ ipv6_routes["routes"][default_ipv6_route][index]["nexthops"][0]["ip"]
+ )
+ try:
+ rib_next_hops.append(
+ ipv6_routes["routes"][default_ipv6_route][index]["nexthops"][1][
+ "ip"
+ ]
+ )
+ except (KeyError, IndexError) as e:
+ logger.error("NO impact ..! Global IPV6 Address not found ")
+
+ for nxt_hop in expected_nexthop.items():
+ if nxt_hop[0] == "ipv6":
+ if nxt_hop[1] in rib_next_hops:
+ logger.info(
+ "Default routes [{}] obtained from {} .....PASSED".format(
+ default_ipv6_route, nxt_hop[1]
+ )
+ )
+ else:
+ logger.error(
+ "ERROR ...! Default routes [{}] expected from {} obtained {}".format(
+ default_ipv6_route, nxt_hop[1], rib_next_hops
+ )
+ )
+ return False
+
+ else:
+ pass
+ if "origin" in ipv6_routes["routes"][default_ipv6_route][0].keys():
+ route_Origin = ipv6_routes["routes"][default_ipv6_route][0]["origin"]
+ if "locPrf" in ipv6_routes["routes"][default_ipv6_route][0].keys():
+ route_local_pref = ipv6_routes["routes"][default_ipv6_route][0]["locPrf"]
+ if "metric" in ipv6_routes["routes"][default_ipv6_route][0].keys():
+ route_local_metric = ipv6_routes["routes"][default_ipv6_route][0]["metric"]
+
+ origin_found = False
+ locPrf_found = False
+ metric_found = False
+ as_path_found = False
+
+ if origin:
+ if origin == route_Origin:
+ logger.info(
+ "Dafault Route {} expected origin {} Found in RIB....PASSED".format(
+ default_ipv6_route, route_Origin
+ )
+ )
+ origin_found = True
+ else:
+ logger.error(
+ "ERROR... IPV6::! Expected Origin is {} obtained {}".format(
+ origin, route_Origin
+ )
+ )
+ return False
+ else:
+ origin_found = True
+
+ if locPrf:
+ if locPrf == route_local_pref:
+ logger.info(
+ "Dafault Route {} expected Local Preference {} Found in RIB....PASSED".format(
+ default_ipv6_route, route_local_pref
+ )
+ )
+ locPrf_found = True
+ else:
+ logger.error(
+ "ERROR... IPV6::! Expected Local Preference is {} obtained {}".format(
+ locPrf, route_local_pref
+ )
+ )
+ return False
+ else:
+ locPrf_found = True
+
+ if metric:
+ if metric == route_local_metric:
+ logger.info(
+ "Dafault Route {} expected metric {} Found in RIB....PASSED".format(
+ default_ipv4_route, metric
+ )
+ )
+
+ metric_found = True
+ else:
+ logger.error(
+ "ERROR... IPV6::! Expected metric is {} obtained {}".format(
+ metric, route_local_metric
+ )
+ )
+ return False
+ else:
+ metric_found = True
+
+ if expected_aspath:
+ obtained_aspath = ipv6_routes["routes"]["::/0"][0]["path"]
+ if expected_aspath in obtained_aspath:
+ as_path_found = True
+ logger.info(
+ "Dafault Route {} expected AS path {} Found in RIB....PASSED".format(
+ default_ipv4_route, expected_aspath
+ )
+ )
+ else:
+ logger.error(
+ "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
+ expected_aspath, obtained_aspath
+ )
+ )
+ return False
+ else:
+ as_path_found = True
+
+ if origin_found and locPrf_found and metric_found and as_path_found:
+ is_ipv6_default_attrib_found = True
+ logger.info(
+ "IPV6:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
+ origin, locPrf, metric, expected_aspath
+ )
+ )
+ else:
+ is_ipv6_default_attrib_found = False
+ logger.error(
+ "IPV6:: Expected origin ['{}'] Obtained [{}]".format(origin, route_Origin)
+ )
+ logger.error(
+ "IPV6:: Expected locPrf ['{}'] Obtained [{}]".format(
+ locPrf, route_local_pref
+ )
+ )
+ logger.error(
+ "IPV6:: Expected metric ['{}'] Obtained [{}]".format(
+ metric, route_local_metric
+ )
+ )
+ logger.error(
+ "IPV6:: Expected metric ['{}'] Obtained [{}]".format(
+ expected_aspath, obtained_aspath
+ )
+ )
+
+ if is_ipv4_default_attrib_found and is_ipv6_default_attrib_found:
+ logger.info("The attributes are found for default route in RIB ")
+ return True
+ else:
+ return False
+
+
+@retry(retry_timeout=5)
+def verify_fib_default_route(tgen, topo, dut, routes, expected_nexthop):
+ """
+ API to verify the the 'Default route" in FIB
+
+ param
+ =====
+ dut : device under test
+ routes : default route with expected nexthop
+ expected_nexthop : the nexthop that is expected the deafult route
+
+ """
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ tgen = get_topogen()
+ connected_routes = {}
+ for router, rnode in tgen.routers().items():
+ if router == dut:
+ ipv4_routes = run_frr_cmd(rnode, "sh ip route json", isjson=True)
+ ipv6_routes = run_frr_cmd(rnode, "sh ipv6 route json", isjson=True)
+
+ is_ipv4_default_route_found = False
+ is_ipv6_default_route_found = False
+ if routes["ipv4"] in ipv4_routes.keys():
+ rib_ipv4_nxt_hops = []
+ ipv4_default_route = routes["ipv4"]
+ nxt_hop_count = len(ipv4_routes[ipv4_default_route][0]["nexthops"])
+ for index in range(nxt_hop_count):
+ rib_ipv4_nxt_hops.append(
+ ipv4_routes[ipv4_default_route][0]["nexthops"][index]["ip"]
+ )
+
+ if expected_nexthop["ipv4"] in rib_ipv4_nxt_hops:
+ is_ipv4_default_route_found = True
+ logger.info(
+ "{} default route with next hop {} is found in FIB ".format(
+ ipv4_default_route, expected_nexthop
+ )
+ )
+ else:
+ logger.error(
+ "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
+ ipv4_default_route, expected_nexthop
+ )
+ )
+ return False
+
+ if routes["ipv6"] in ipv6_routes.keys() or "::/0" in ipv6_routes.keys():
+ rib_ipv6_nxt_hops = []
+ if "::/0" in ipv6_routes.keys():
+ ipv6_default_route = "::/0"
+ elif routes["ipv6"] in ipv6_routes.keys():
+ ipv6_default_route = routes["ipv6"]
+
+ nxt_hop_count = len(ipv6_routes[ipv6_default_route][0]["nexthops"])
+ for index in range(nxt_hop_count):
+ rib_ipv6_nxt_hops.append(
+ ipv6_routes[ipv6_default_route][0]["nexthops"][index]["ip"]
+ )
+
+ if expected_nexthop["ipv6"] in rib_ipv6_nxt_hops:
+ is_ipv6_default_route_found = True
+ logger.info(
+ "{} default route with next hop {} is found in FIB ".format(
+ ipv6_default_route, expected_nexthop
+ )
+ )
+ else:
+ logger.error(
+ "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
+ ipv6_default_route, expected_nexthop
+ )
+ )
+ return False
+
+ if is_ipv4_default_route_found and is_ipv6_default_route_found:
+ return True
+ else:
+ logger.error(
+ "Default Route for ipv4 and ipv6 address family is not found in FIB "
+ )
+ return False
+
+
+@retry(retry_timeout=5)
+def verify_bgp_advertised_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
+ """
+ APi is verifies the the routes that are advertised from dut to peer
+
+ command used :
+ "sh ip bgp neighbor <x.x.x.x> advertised-routes" and
+ "sh ip bgp ipv6 unicast neighbor<x::x> advertised-routes"
+
+ dut : Device Under Tests
+ Peer : Peer on which the routs is expected
+ expected_routes : dual stack IPV4-and IPv6 routes to be verified
+ expected_routes
+
+ returns: True / False
+
+ """
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ tgen = get_topogen()
+
+ peer_ipv4_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv4"].split("/")[0]
+ peer_ipv6_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv6"].split("/")[0]
+
+ for router, rnode in tgen.routers().items():
+ if router == dut:
+ ipv4_receieved_routes = run_frr_cmd(
+ rnode,
+ "sh ip bgp neighbor {} advertised-routes json".format(
+ peer_ipv4_neighbor_ip
+ ),
+ isjson=True,
+ )
+ ipv6_receieved_routes = run_frr_cmd(
+ rnode,
+ "sh ip bgp ipv6 unicast neighbor {} advertised-routes json".format(
+ peer_ipv6_neighbor_ip
+ ),
+ isjson=True,
+ )
+ ipv4_route_count = 0
+ ipv6_route_count = 0
+ if ipv4_receieved_routes:
+ for index in range(len(expected_routes["ipv4"])):
+ if (
+ expected_routes["ipv4"][index]["network"]
+ in ipv4_receieved_routes["advertisedRoutes"].keys()
+ ):
+ ipv4_route_count += 1
+ logger.info(
+ "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
+ dut, expected_routes["ipv4"][index]["network"], peer
+ )
+ )
+
+ elif (
+ expected_routes["ipv4"][index]["network"]
+ in ipv4_receieved_routes["bgpOriginatingDefaultNetwork"]
+ ):
+ ipv4_route_count += 1
+ logger.info(
+ "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
+ dut, expected_routes["ipv4"][index]["network"], peer
+ )
+ )
+
+ else:
+ logger.error(
+ "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format(
+ dut, expected_routes["ipv4"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(ipv4_receieved_routes)
+ logger.error(
+ "ERROR...! [DUT : {}] No IPV4 Routes are advertised to the peer {}".format(
+ dut, peer
+ )
+ )
+ return False
+
+ if ipv6_receieved_routes:
+ for index in range(len(expected_routes["ipv6"])):
+ if (
+ expected_routes["ipv6"][index]["network"]
+ in ipv6_receieved_routes["advertisedRoutes"].keys()
+ ):
+ ipv6_route_count += 1
+ logger.info(
+ "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
+ dut, expected_routes["ipv6"][index]["network"], peer
+ )
+ )
+ elif (
+ expected_routes["ipv6"][index]["network"]
+ in ipv6_receieved_routes["bgpOriginatingDefaultNetwork"]
+ ):
+ ipv6_route_count += 1
+ logger.info(
+ "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
+ dut, expected_routes["ipv6"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(
+ "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format(
+ dut, expected_routes["ipv6"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(ipv6_receieved_routes)
+ logger.error(
+ "ERROR...! [DUT : {}] No IPV6 Routes are advertised to the peer {}".format(
+ dut, peer
+ )
+ )
+ return False
+
+ if ipv4_route_count == len(expected_routes["ipv4"]) and ipv6_route_count == len(
+ expected_routes["ipv6"]
+ ):
+ return True
+ else:
+ logger.error(
+ "ERROR ....! IPV4 : Expected Routes -> {} obtained ->{} ".format(
+ expected_routes["ipv4"], ipv4_receieved_routes["advertisedRoutes"]
+ )
+ )
+ logger.error(
+ "ERROR ....! IPV6 : Expected Routes -> {} obtained ->{} ".format(
+ expected_routes["ipv6"], ipv6_receieved_routes["advertisedRoutes"]
+ )
+ )
+ return False
+
+
+@retry(retry_timeout=5)
+def verify_bgp_received_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
+ """
+ API to verify the bgp received routes
+
+ commad used :
+ =============
+ show ip bgp neighbor <x.x.x.x> received-routes
+ show ip bgp ipv6 unicast neighbor <x::x> received-routes
+
+ params
+ =======
+ dut : Device Under Tests
+ Peer : Peer on which the routs is expected
+ expected_routes : dual stack IPV4-and IPv6 routes to be verified
+ expected_routes
+
+ returns:
+ ========
+ True / False
+ """
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ tgen = get_topogen()
+
+ peer_ipv4_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv4"].split("/")[0]
+ peer_ipv6_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv6"].split("/")[0]
+
+ logger.info("Enabling Soft configuration to neighbor INBOUND ")
+ neigbor_dict = {"ipv4": peer_ipv4_neighbor_ip, "ipv6": peer_ipv6_neighbor_ip}
+ result = configure_bgp_soft_configuration(
+ tgen, dut, neigbor_dict, direction="inbound"
+ )
+ assert (
+ result is True
+ ), " Failed to configure the soft configuration \n Error: {}".format(result)
+
+ """sleep of 10 sec is required to get the routes on peer after soft configuration"""
+ sleep(10)
+ for router, rnode in tgen.routers().items():
+ if router == dut:
+ ipv4_receieved_routes = run_frr_cmd(
+ rnode,
+ "sh ip bgp neighbor {} received-routes json".format(
+ peer_ipv4_neighbor_ip
+ ),
+ isjson=True,
+ )
+ ipv6_receieved_routes = run_frr_cmd(
+ rnode,
+ "sh ip bgp ipv6 unicast neighbor {} received-routes json".format(
+ peer_ipv6_neighbor_ip
+ ),
+ isjson=True,
+ )
+ ipv4_route_count = 0
+ ipv6_route_count = 0
+ if ipv4_receieved_routes:
+ for index in range(len(expected_routes["ipv4"])):
+ if (
+ expected_routes["ipv4"][index]["network"]
+ in ipv4_receieved_routes["receivedRoutes"].keys()
+ ):
+ ipv4_route_count += 1
+ logger.info(
+ "Success [DUT : {}] The Expected Route {} is received from {} ".format(
+ dut, expected_routes["ipv4"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(
+ "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format(
+ dut, expected_routes["ipv4"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(ipv4_receieved_routes)
+ logger.error(
+ "ERROR...! [DUT : {}] No IPV4 Routes are received from the peer {}".format(
+ dut, peer
+ )
+ )
+ return False
+
+ if ipv6_receieved_routes:
+ for index in range(len(expected_routes["ipv6"])):
+ if (
+ expected_routes["ipv6"][index]["network"]
+ in ipv6_receieved_routes["receivedRoutes"].keys()
+ ):
+ ipv6_route_count += 1
+ logger.info(
+ "Success [DUT : {}] The Expected Route {} is received from {} ".format(
+ dut, expected_routes["ipv6"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(
+ "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format(
+ dut, expected_routes["ipv6"][index]["network"], peer
+ )
+ )
+ else:
+ logger.error(ipv6_receieved_routes)
+ logger.error(
+ "ERROR...! [DUT : {}] No IPV6 Routes are received from the peer {}".format(
+ dut, peer
+ )
+ )
+ return False
+
+ if ipv4_route_count == len(expected_routes["ipv4"]) and ipv6_route_count == len(
+ expected_routes["ipv6"]
+ ):
+ return True
+ else:
+ logger.error(
+ "ERROR ....! IPV4 : Expected Routes -> {} obtained ->{} ".format(
+ expected_routes["ipv4"], ipv4_receieved_routes["advertisedRoutes"]
+ )
+ )
+ logger.error(
+ "ERROR ....! IPV6 : Expected Routes -> {} obtained ->{} ".format(
+ expected_routes["ipv6"], ipv6_receieved_routes["advertisedRoutes"]
+ )
+ )
+ return False
+
+
+def configure_bgp_soft_configuration(tgen, dut, neighbor_dict, direction):
+ """
+ Api to configure the bgp soft configuration to show the received routes from peer
+ params
+ ======
+ dut : device under test route on which the sonfiguration to be applied
+ neighbor_dict : dict element contains ipv4 and ipv6 neigbor ip
+ direction : Directionon which it should be applied in/out
+
+ returns:
+ ========
+ boolean
+ """
+ logger.info("Enabling Soft configuration to neighbor INBOUND ")
+ local_as = get_dut_as_number(tgen, dut)
+ ipv4_neighbor = neighbor_dict["ipv4"]
+ ipv6_neighbor = neighbor_dict["ipv6"]
+ direction = direction.lower()
+ if ipv4_neighbor and ipv4_neighbor:
+ raw_config = {
+ dut: {
+ "raw_config": [
+ "router bgp {}".format(local_as),
+ "address-family ipv4 unicast",
+ "neighbor {} soft-reconfiguration {} ".format(
+ ipv4_neighbor, direction
+ ),
+ "exit-address-family",
+ "address-family ipv6 unicast",
+ "neighbor {} soft-reconfiguration {} ".format(
+ ipv6_neighbor, direction
+ ),
+ "exit-address-family",
+ ]
+ }
+ }
+ result = apply_raw_config(tgen, raw_config)
+ logger.info(
+ "Success... [DUT : {}] The soft configuration onis applied on neighbors {} ".format(
+ dut, neighbor_dict
+ )
+ )
+ return True