summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatt Clay <matt@mystile.com>2023-02-23 22:19:19 +0100
committerGitHub <noreply@github.com>2023-02-23 22:19:19 +0100
commit086046d4780c306b4892d4d1d053470a64612ae7 (patch)
treee12608d84e74c38c8b86b60355bc969fa7335698
parentansible-test - Fix vendoring support (#80074) (diff)
downloadansible-086046d4780c306b4892d4d1d053470a64612ae7.tar.xz
ansible-086046d4780c306b4892d4d1d053470a64612ae7.zip
Remove unused network test support files (#80080)
* Remove unused network test support files * Remove obsolete ignores
-rw-r--r--test/sanity/ignore.txt4
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/action/net_base.py90
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/become/enable.py42
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/httpapi.py324
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/netconf.py404
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/netconf.py66
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/network_agnostic.py14
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py1186
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/network.py531
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/httpapi/restconf.py91
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/netconf/netconf.py147
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/restconf/restconf.py61
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_get.py71
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_put.py82
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/netconf/default.py70
15 files changed, 0 insertions, 3183 deletions
diff --git a/test/sanity/ignore.txt b/test/sanity/ignore.txt
index 577e59d0c8..272b59bc20 100644
--- a/test/sanity/ignore.txt
+++ b/test/sanity/ignore.txt
@@ -183,7 +183,6 @@ test/support/integration/plugins/modules/sefcontext.py pylint:unused-import
test/support/integration/plugins/modules/zypper.py pylint:unused-import
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/utils.py pylint:unused-import
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/network_cli.py pylint:pointless-string-statement
-test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py pylint:pointless-string-statement
test/support/windows-integration/plugins/action/win_reboot.py pylint:unused-import
test/support/integration/plugins/modules/timezone.py pylint:disallowed-name
test/support/integration/plugins/module_utils/compat/ipaddress.py future-import-boilerplate
@@ -192,12 +191,9 @@ test/support/integration/plugins/module_utils/compat/ipaddress.py no-unicode-lit
test/support/integration/plugins/module_utils/network/common/utils.py future-import-boilerplate
test/support/integration/plugins/module_utils/network/common/utils.py metaclass-boilerplate
test/support/integration/plugins/module_utils/network/common/utils.py pylint:use-a-generator
-test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/netconf/netconf.py pylint:used-before-assignment
-test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/network.py pylint:consider-using-dict-comprehension
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/compat/ipaddress.py no-unicode-literals
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/facts/facts.py pylint:unnecessary-comprehension
test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/utils.py pylint:use-a-generator
-test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/netconf/default.py pylint:unnecessary-comprehension
test/support/network-integration/collections/ansible_collections/cisco/ios/plugins/cliconf/ios.py pylint:arguments-renamed
test/support/network-integration/collections/ansible_collections/cisco/ios/plugins/modules/ios_config.py pep8:E501
test/support/network-integration/collections/ansible_collections/cisco/ios/plugins/modules/ios_config.py pylint:used-before-assignment
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/action/net_base.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/action/net_base.py
deleted file mode 100644
index 542dcfef87..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/action/net_base.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# Copyright: (c) 2015, Ansible Inc,
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import absolute_import, division, print_function
-
-__metaclass__ = type
-
-import copy
-
-from ansible.errors import AnsibleError
-from ansible.plugins.action import ActionBase
-from ansible.utils.display import Display
-
-display = Display()
-
-
-class ActionModule(ActionBase):
- def run(self, tmp=None, task_vars=None):
- del tmp # tmp no longer has any effect
-
- result = {}
- play_context = copy.deepcopy(self._play_context)
- play_context.network_os = self._get_network_os(task_vars)
- new_task = self._task.copy()
-
- module = self._get_implementation_module(
- play_context.network_os, self._task.action
- )
- if not module:
- if self._task.args["fail_on_missing_module"]:
- result["failed"] = True
- else:
- result["failed"] = False
-
- result["msg"] = (
- "Could not find implementation module %s for %s"
- % (self._task.action, play_context.network_os)
- )
- return result
-
- new_task.action = module
-
- action = self._shared_loader_obj.action_loader.get(
- play_context.network_os,
- task=new_task,
- connection=self._connection,
- play_context=play_context,
- loader=self._loader,
- templar=self._templar,
- shared_loader_obj=self._shared_loader_obj,
- )
- display.vvvv("Running implementation module %s" % module)
- return action.run(task_vars=task_vars)
-
- def _get_network_os(self, task_vars):
- if "network_os" in self._task.args and self._task.args["network_os"]:
- display.vvvv("Getting network OS from task argument")
- network_os = self._task.args["network_os"]
- elif self._play_context.network_os:
- display.vvvv("Getting network OS from inventory")
- network_os = self._play_context.network_os
- elif (
- "network_os" in task_vars.get("ansible_facts", {})
- and task_vars["ansible_facts"]["network_os"]
- ):
- display.vvvv("Getting network OS from fact")
- network_os = task_vars["ansible_facts"]["network_os"]
- else:
- raise AnsibleError(
- "ansible_network_os must be specified on this host to use platform agnostic modules"
- )
-
- return network_os
-
- def _get_implementation_module(self, network_os, platform_agnostic_module):
- module_name = (
- network_os.split(".")[-1]
- + "_"
- + platform_agnostic_module.partition("_")[2]
- )
- if "." in network_os:
- fqcn_module = ".".join(network_os.split(".")[0:-1])
- implementation_module = fqcn_module + "." + module_name
- else:
- implementation_module = module_name
-
- if implementation_module not in self._shared_loader_obj.module_loader:
- implementation_module = None
-
- return implementation_module
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/become/enable.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/become/enable.py
deleted file mode 100644
index 33938fd1e7..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/become/enable.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright: (c) 2018, Ansible Project
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-from __future__ import absolute_import, division, print_function
-
-__metaclass__ = type
-
-DOCUMENTATION = """become: enable
-short_description: Switch to elevated permissions on a network device
-description:
-- This become plugins allows elevated permissions on a remote network device.
-author: ansible (@core)
-options:
- become_pass:
- description: password
- ini:
- - section: enable_become_plugin
- key: password
- vars:
- - name: ansible_become_password
- - name: ansible_become_pass
- - name: ansible_enable_pass
- env:
- - name: ANSIBLE_BECOME_PASS
- - name: ANSIBLE_ENABLE_PASS
-notes:
-- enable is really implemented in the network connection handler and as such can only
- be used with network connections.
-- This plugin ignores the 'become_exe' and 'become_user' settings as it uses an API
- and not an executable.
-"""
-
-from ansible.plugins.become import BecomeBase
-
-
-class BecomeModule(BecomeBase):
-
- name = "ansible.netcommon.enable"
-
- def build_become_command(self, cmd, shell):
- # enable is implemented inside the network connection plugins
- return cmd
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/httpapi.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/httpapi.py
deleted file mode 100644
index b063ef0d60..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/httpapi.py
+++ /dev/null
@@ -1,324 +0,0 @@
-# (c) 2018 Red Hat Inc.
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import absolute_import, division, print_function
-
-__metaclass__ = type
-
-DOCUMENTATION = """author: Ansible Networking Team
-connection: httpapi
-short_description: Use httpapi to run command on network appliances
-description:
-- This connection plugin provides a connection to remote devices over a HTTP(S)-based
- api.
-options:
- host:
- description:
- - Specifies the remote device FQDN or IP address to establish the HTTP(S) connection
- to.
- default: inventory_hostname
- vars:
- - name: ansible_host
- port:
- type: int
- description:
- - Specifies the port on the remote device that listens for connections when establishing
- the HTTP(S) connection.
- - When unspecified, will pick 80 or 443 based on the value of use_ssl.
- ini:
- - section: defaults
- key: remote_port
- env:
- - name: ANSIBLE_REMOTE_PORT
- vars:
- - name: ansible_httpapi_port
- network_os:
- description:
- - Configures the device platform network operating system. This value is used
- to load the correct httpapi plugin to communicate with the remote device
- vars:
- - name: ansible_network_os
- remote_user:
- description:
- - The username used to authenticate to the remote device when the API connection
- is first established. If the remote_user is not specified, the connection will
- use the username of the logged in user.
- - Can be configured from the CLI via the C(--user) or C(-u) options.
- ini:
- - section: defaults
- key: remote_user
- env:
- - name: ANSIBLE_REMOTE_USER
- vars:
- - name: ansible_user
- password:
- description:
- - Configures the user password used to authenticate to the remote device when
- needed for the device API.
- vars:
- - name: ansible_password
- - name: ansible_httpapi_pass
- - name: ansible_httpapi_password
- use_ssl:
- type: boolean
- description:
- - Whether to connect using SSL (HTTPS) or not (HTTP).
- default: false
- vars:
- - name: ansible_httpapi_use_ssl
- validate_certs:
- type: boolean
- description:
- - Whether to validate SSL certificates
- default: true
- vars:
- - name: ansible_httpapi_validate_certs
- use_proxy:
- type: boolean
- description:
- - Whether to use https_proxy for requests.
- default: true
- vars:
- - name: ansible_httpapi_use_proxy
- become:
- type: boolean
- description:
- - The become option will instruct the CLI session to attempt privilege escalation
- on platforms that support it. Normally this means transitioning from user mode
- to C(enable) mode in the CLI session. If become is set to True and the remote
- device does not support privilege escalation or the privilege has already been
- elevated, then this option is silently ignored.
- - Can be configured from the CLI via the C(--become) or C(-b) options.
- default: false
- ini:
- - section: privilege_escalation
- key: become
- env:
- - name: ANSIBLE_BECOME
- vars:
- - name: ansible_become
- become_method:
- description:
- - This option allows the become method to be specified in for handling privilege
- escalation. Typically the become_method value is set to C(enable) but could
- be defined as other values.
- default: sudo
- ini:
- - section: privilege_escalation
- key: become_method
- env:
- - name: ANSIBLE_BECOME_METHOD
- vars:
- - name: ansible_become_method
- persistent_connect_timeout:
- type: int
- description:
- - Configures, in seconds, the amount of time to wait when trying to initially
- establish a persistent connection. If this value expires before the connection
- to the remote device is completed, the connection will fail.
- default: 30
- ini:
- - section: persistent_connection
- key: connect_timeout
- env:
- - name: ANSIBLE_PERSISTENT_CONNECT_TIMEOUT
- vars:
- - name: ansible_connect_timeout
- persistent_command_timeout:
- type: int
- description:
- - Configures, in seconds, the amount of time to wait for a command to return from
- the remote device. If this timer is exceeded before the command returns, the
- connection plugin will raise an exception and close.
- default: 30
- ini:
- - section: persistent_connection
- key: command_timeout
- env:
- - name: ANSIBLE_PERSISTENT_COMMAND_TIMEOUT
- vars:
- - name: ansible_command_timeout
- persistent_log_messages:
- type: boolean
- description:
- - This flag will enable logging the command executed and response received from
- target device in the ansible log file. For this option to work 'log_path' ansible
- configuration option is required to be set to a file path with write access.
- - Be sure to fully understand the security implications of enabling this option
- as it could create a security vulnerability by logging sensitive information
- in log file.
- default: false
- ini:
- - section: persistent_connection
- key: log_messages
- env:
- - name: ANSIBLE_PERSISTENT_LOG_MESSAGES
- vars:
- - name: ansible_persistent_log_messages
-"""
-
-from io import BytesIO
-
-from ansible.errors import AnsibleConnectionFailure
-from ansible.module_utils._text import to_bytes
-from ansible.module_utils.six import PY3
-from ansible.module_utils.six.moves import cPickle
-from ansible.module_utils.six.moves.urllib.error import HTTPError, URLError
-from ansible.module_utils.urls import open_url
-from ansible.playbook.play_context import PlayContext
-from ansible.plugins.loader import httpapi_loader
-from ansible.plugins.connection import NetworkConnectionBase, ensure_connect
-
-
-class Connection(NetworkConnectionBase):
- """Network API connection"""
-
- transport = "ansible.netcommon.httpapi"
- has_pipelining = True
-
- def __init__(self, play_context, new_stdin, *args, **kwargs):
- super(Connection, self).__init__(
- play_context, new_stdin, *args, **kwargs
- )
-
- self._url = None
- self._auth = None
-
- if self._network_os:
-
- self.httpapi = httpapi_loader.get(self._network_os, self)
- if self.httpapi:
- self._sub_plugin = {
- "type": "httpapi",
- "name": self.httpapi._load_name,
- "obj": self.httpapi,
- }
- self.queue_message(
- "vvvv",
- "loaded API plugin %s from path %s for network_os %s"
- % (
- self.httpapi._load_name,
- self.httpapi._original_path,
- self._network_os,
- ),
- )
- else:
- raise AnsibleConnectionFailure(
- "unable to load API plugin for network_os %s"
- % self._network_os
- )
-
- else:
- raise AnsibleConnectionFailure(
- "Unable to automatically determine host network os. Please "
- "manually configure ansible_network_os value for this host"
- )
- self.queue_message("log", "network_os is set to %s" % self._network_os)
-
- def update_play_context(self, pc_data):
- """Updates the play context information for the connection"""
- pc_data = to_bytes(pc_data)
- if PY3:
- pc_data = cPickle.loads(pc_data, encoding="bytes")
- else:
- pc_data = cPickle.loads(pc_data)
- play_context = PlayContext()
- play_context.deserialize(pc_data)
-
- self.queue_message("vvvv", "updating play_context for connection")
- if self._play_context.become ^ play_context.become:
- self.set_become(play_context)
- if play_context.become is True:
- self.queue_message("vvvv", "authorizing connection")
- else:
- self.queue_message("vvvv", "deauthorizing connection")
-
- self._play_context = play_context
-
- def _connect(self):
- if not self.connected:
- protocol = "https" if self.get_option("use_ssl") else "http"
- host = self.get_option("host")
- port = self.get_option("port") or (
- 443 if protocol == "https" else 80
- )
- self._url = "%s://%s:%s" % (protocol, host, port)
-
- self.queue_message(
- "vvv",
- "ESTABLISH HTTP(S) CONNECTFOR USER: %s TO %s"
- % (self._play_context.remote_user, self._url),
- )
- self.httpapi.set_become(self._play_context)
- self._connected = True
-
- self.httpapi.login(
- self.get_option("remote_user"), self.get_option("password")
- )
-
- def close(self):
- """
- Close the active session to the device
- """
- # only close the connection if its connected.
- if self._connected:
- self.queue_message("vvvv", "closing http(s) connection to device")
- self.logout()
-
- super(Connection, self).close()
-
- @ensure_connect
- def send(self, path, data, **kwargs):
- """
- Sends the command to the device over api
- """
- url_kwargs = dict(
- timeout=self.get_option("persistent_command_timeout"),
- validate_certs=self.get_option("validate_certs"),
- use_proxy=self.get_option("use_proxy"),
- headers={},
- )
- url_kwargs.update(kwargs)
- if self._auth:
- # Avoid modifying passed-in headers
- headers = dict(kwargs.get("headers", {}))
- headers.update(self._auth)
- url_kwargs["headers"] = headers
- else:
- url_kwargs["force_basic_auth"] = True
- url_kwargs["url_username"] = self.get_option("remote_user")
- url_kwargs["url_password"] = self.get_option("password")
-
- try:
- url = self._url + path
- self._log_messages(
- "send url '%s' with data '%s' and kwargs '%s'"
- % (url, data, url_kwargs)
- )
- response = open_url(url, data=data, **url_kwargs)
- except HTTPError as exc:
- is_handled = self.handle_httperror(exc)
- if is_handled is True:
- return self.send(path, data, **kwargs)
- elif is_handled is False:
- raise
- else:
- response = is_handled
- except URLError as exc:
- raise AnsibleConnectionFailure(
- "Could not connect to {0}: {1}".format(
- self._url + path, exc.reason
- )
- )
-
- response_buffer = BytesIO()
- resp_data = response.read()
- self._log_messages("received response: '%s'" % resp_data)
- response_buffer.write(resp_data)
-
- # Try to assign a new auth token if one is given
- self._auth = self.update_auth(response, response_buffer) or self._auth
-
- response_buffer.seek(0)
-
- return response, response_buffer
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/netconf.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/netconf.py
deleted file mode 100644
index 1e2d3caa48..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/connection/netconf.py
+++ /dev/null
@@ -1,404 +0,0 @@
-# (c) 2016 Red Hat Inc.
-# (c) 2017 Ansible Project
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import absolute_import, division, print_function
-
-__metaclass__ = type
-
-DOCUMENTATION = """author: Ansible Networking Team
-connection: netconf
-short_description: Provides a persistent connection using the netconf protocol
-description:
-- This connection plugin provides a connection to remote devices over the SSH NETCONF
- subsystem. This connection plugin is typically used by network devices for sending
- and receiving RPC calls over NETCONF.
-- Note this connection plugin requires ncclient to be installed on the local Ansible
- controller.
-requirements:
-- ncclient
-options:
- host:
- description:
- - Specifies the remote device FQDN or IP address to establish the SSH connection
- to.
- default: inventory_hostname
- vars:
- - name: ansible_host
- port:
- type: int
- description:
- - Specifies the port on the remote device that listens for connections when establishing
- the SSH connection.
- default: 830
- ini:
- - section: defaults
- key: remote_port
- env:
- - name: ANSIBLE_REMOTE_PORT
- vars:
- - name: ansible_port
- network_os:
- description:
- - Configures the device platform network operating system. This value is used
- to load a device specific netconf plugin. If this option is not configured
- (or set to C(auto)), then Ansible will attempt to guess the correct network_os
- to use. If it can not guess a network_os correctly it will use C(default).
- vars:
- - name: ansible_network_os
- remote_user:
- description:
- - The username used to authenticate to the remote device when the SSH connection
- is first established. If the remote_user is not specified, the connection will
- use the username of the logged in user.
- - Can be configured from the CLI via the C(--user) or C(-u) options.
- ini:
- - section: defaults
- key: remote_user
- env:
- - name: ANSIBLE_REMOTE_USER
- vars:
- - name: ansible_user
- password:
- description:
- - Configures the user password used to authenticate to the remote device when
- first establishing the SSH connection.
- vars:
- - name: ansible_password
- - name: ansible_ssh_pass
- - name: ansible_ssh_password
- - name: ansible_netconf_password
- private_key_file:
- description:
- - The private SSH key or certificate file used to authenticate to the remote device
- when first establishing the SSH connection.
- ini:
- - section: defaults
- key: private_key_file
- env:
- - name: ANSIBLE_PRIVATE_KEY_FILE
- vars:
- - name: ansible_private_key_file
- look_for_keys:
- default: true
- description:
- - Enables looking for ssh keys in the usual locations for ssh keys (e.g. :file:`~/.ssh/id_*`).
- env:
- - name: ANSIBLE_PARAMIKO_LOOK_FOR_KEYS
- ini:
- - section: paramiko_connection
- key: look_for_keys
- type: boolean
- host_key_checking:
- description: Set this to "False" if you want to avoid host key checking by the
- underlying tools Ansible uses to connect to the host
- type: boolean
- default: true
- env:
- - name: ANSIBLE_HOST_KEY_CHECKING
- - name: ANSIBLE_SSH_HOST_KEY_CHECKING
- - name: ANSIBLE_NETCONF_HOST_KEY_CHECKING
- ini:
- - section: defaults
- key: host_key_checking
- - section: paramiko_connection
- key: host_key_checking
- vars:
- - name: ansible_host_key_checking
- - name: ansible_ssh_host_key_checking
- - name: ansible_netconf_host_key_checking
- persistent_connect_timeout:
- type: int
- description:
- - Configures, in seconds, the amount of time to wait when trying to initially
- establish a persistent connection. If this value expires before the connection
- to the remote device is completed, the connection will fail.
- default: 30
- ini:
- - section: persistent_connection
- key: connect_timeout
- env:
- - name: ANSIBLE_PERSISTENT_CONNECT_TIMEOUT
- vars:
- - name: ansible_connect_timeout
- persistent_command_timeout:
- type: int
- description:
- - Configures, in seconds, the amount of time to wait for a command to return from
- the remote device. If this timer is exceeded before the command returns, the
- connection plugin will raise an exception and close.
- default: 30
- ini:
- - section: persistent_connection
- key: command_timeout
- env:
- - name: ANSIBLE_PERSISTENT_COMMAND_TIMEOUT
- vars:
- - name: ansible_command_timeout
- netconf_ssh_config:
- description:
- - This variable is used to enable bastion/jump host with netconf connection. If
- set to True the bastion/jump host ssh settings should be present in ~/.ssh/config
- file, alternatively it can be set to custom ssh configuration file path to read
- the bastion/jump host settings.
- ini:
- - section: netconf_connection
- key: ssh_config
- version_added: '2.7'
- env:
- - name: ANSIBLE_NETCONF_SSH_CONFIG
- vars:
- - name: ansible_netconf_ssh_config
- version_added: '2.7'
- persistent_log_messages:
- type: boolean
- description:
- - This flag will enable logging the command executed and response received from
- target device in the ansible log file. For this option to work 'log_path' ansible
- configuration option is required to be set to a file path with write access.
- - Be sure to fully understand the security implications of enabling this option
- as it could create a security vulnerability by logging sensitive information
- in log file.
- default: false
- ini:
- - section: persistent_connection
- key: log_messages
- env:
- - name: ANSIBLE_PERSISTENT_LOG_MESSAGES
- vars:
- - name: ansible_persistent_log_messages
-"""
-
-import os
-import logging
-import json
-
-from ansible.errors import AnsibleConnectionFailure, AnsibleError
-from ansible.module_utils._text import to_bytes, to_native, to_text
-from ansible.module_utils.basic import missing_required_lib
-from ansible.module_utils.parsing.convert_bool import (
- BOOLEANS_TRUE,
- BOOLEANS_FALSE,
-)
-from ansible.plugins.loader import netconf_loader
-from ansible.plugins.connection import NetworkConnectionBase, ensure_connect
-
-try:
- from ncclient import manager
- from ncclient.operations import RPCError
- from ncclient.transport.errors import SSHUnknownHostError
- from ncclient.xml_ import to_ele, to_xml
-
- HAS_NCCLIENT = True
- NCCLIENT_IMP_ERR = None
-except (
- ImportError,
- AttributeError,
-) as err: # paramiko and gssapi are incompatible and raise AttributeError not ImportError
- HAS_NCCLIENT = False
- NCCLIENT_IMP_ERR = err
-
-logging.getLogger("ncclient").setLevel(logging.INFO)
-
-
-class Connection(NetworkConnectionBase):
- """NetConf connections"""
-
- transport = "ansible.netcommon.netconf"
- has_pipelining = False
-
- def __init__(self, play_context, new_stdin, *args, **kwargs):
- super(Connection, self).__init__(
- play_context, new_stdin, *args, **kwargs
- )
-
- # If network_os is not specified then set the network os to auto
- # This will be used to trigger the use of guess_network_os when connecting.
- self._network_os = self._network_os or "auto"
-
- self.netconf = netconf_loader.get(self._network_os, self)
- if self.netconf:
- self._sub_plugin = {
- "type": "netconf",
- "name": self.netconf._load_name,
- "obj": self.netconf,
- }
- self.queue_message(
- "vvvv",
- "loaded netconf plugin %s from path %s for network_os %s"
- % (
- self.netconf._load_name,
- self.netconf._original_path,
- self._network_os,
- ),
- )
- else:
- self.netconf = netconf_loader.get("default", self)
- self._sub_plugin = {
- "type": "netconf",
- "name": "default",
- "obj": self.netconf,
- }
- self.queue_message(
- "display",
- "unable to load netconf plugin for network_os %s, falling back to default plugin"
- % self._network_os,
- )
-
- self.queue_message("log", "network_os is set to %s" % self._network_os)
- self._manager = None
- self.key_filename = None
- self._ssh_config = None
-
- def exec_command(self, cmd, in_data=None, sudoable=True):
- """Sends the request to the node and returns the reply
- The method accepts two forms of request. The first form is as a byte
- string that represents xml string be send over netconf session.
- The second form is a json-rpc (2.0) byte string.
- """
- if self._manager:
- # to_ele operates on native strings
- request = to_ele(to_native(cmd, errors="surrogate_or_strict"))
-
- if request is None:
- return "unable to parse request"
-
- try:
- reply = self._manager.rpc(request)
- except RPCError as exc:
- error = self.internal_error(
- data=to_text(to_xml(exc.xml), errors="surrogate_or_strict")
- )
- return json.dumps(error)
-
- return reply.data_xml
- else:
- return super(Connection, self).exec_command(cmd, in_data, sudoable)
-
- @property
- @ensure_connect
- def manager(self):
- return self._manager
-
- def _connect(self):
- if not HAS_NCCLIENT:
- raise AnsibleError(
- "%s: %s"
- % (
- missing_required_lib("ncclient"),
- to_native(NCCLIENT_IMP_ERR),
- )
- )
-
- self.queue_message("log", "ssh connection done, starting ncclient")
-
- allow_agent = True
- if self._play_context.password is not None:
- allow_agent = False
- setattr(self._play_context, "allow_agent", allow_agent)
-
- self.key_filename = (
- self._play_context.private_key_file
- or self.get_option("private_key_file")
- )
- if self.key_filename:
- self.key_filename = str(os.path.expanduser(self.key_filename))
-
- self._ssh_config = self.get_option("netconf_ssh_config")
- if self._ssh_config in BOOLEANS_TRUE:
- self._ssh_config = True
- elif self._ssh_config in BOOLEANS_FALSE:
- self._ssh_config = None
-
- # Try to guess the network_os if the network_os is set to auto
- if self._network_os == "auto":
- for cls in netconf_loader.all(class_only=True):
- network_os = cls.guess_network_os(self)
- if network_os:
- self.queue_message(
- "vvv", "discovered network_os %s" % network_os
- )
- self._network_os = network_os
-
- # If we have tried to detect the network_os but were unable to i.e. network_os is still 'auto'
- # then use default as the network_os
-
- if self._network_os == "auto":
- # Network os not discovered. Set it to default
- self.queue_message(
- "vvv",
- "Unable to discover network_os. Falling back to default.",
- )
- self._network_os = "default"
- try:
- ncclient_device_handler = self.netconf.get_option(
- "ncclient_device_handler"
- )
- except KeyError:
- ncclient_device_handler = "default"
- self.queue_message(
- "vvv",
- "identified ncclient device handler: %s."
- % ncclient_device_handler,
- )
- device_params = {"name": ncclient_device_handler}
-
- try:
- port = self._play_context.port or 830
- self.queue_message(
- "vvv",
- "ESTABLISH NETCONF SSH CONNECTION FOR USER: %s on PORT %s TO %s WITH SSH_CONFIG = %s"
- % (
- self._play_context.remote_user,
- port,
- self._play_context.remote_addr,
- self._ssh_config,
- ),
- )
- self._manager = manager.connect(
- host=self._play_context.remote_addr,
- port=port,
- username=self._play_context.remote_user,
- password=self._play_context.password,
- key_filename=self.key_filename,
- hostkey_verify=self.get_option("host_key_checking"),
- look_for_keys=self.get_option("look_for_keys"),
- device_params=device_params,
- allow_agent=self._play_context.allow_agent,
- timeout=self.get_option("persistent_connect_timeout"),
- ssh_config=self._ssh_config,
- )
-
- self._manager._timeout = self.get_option(
- "persistent_command_timeout"
- )
- except SSHUnknownHostError as exc:
- raise AnsibleConnectionFailure(to_native(exc))
- except ImportError:
- raise AnsibleError(
- "connection=netconf is not supported on {0}".format(
- self._network_os
- )
- )
-
- if not self._manager.connected:
- return 1, b"", b"not connected"
-
- self.queue_message(
- "log", "ncclient manager object created successfully"
- )
-
- self._connected = True
-
- super(Connection, self)._connect()
-
- return (
- 0,
- to_bytes(self._manager.session_id, errors="surrogate_or_strict"),
- b"",
- )
-
- def close(self):
- if self._manager:
- self._manager.close_session()
- super(Connection, self).close()
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/netconf.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/netconf.py
deleted file mode 100644
index 8789075af8..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/netconf.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-
-class ModuleDocFragment(object):
-
- # Standard files documentation fragment
- DOCUMENTATION = r"""options:
- host:
- description:
- - Specifies the DNS host name or address for connecting to the remote device over
- the specified transport. The value of host is used as the destination address
- for the transport.
- type: str
- required: true
- port:
- description:
- - Specifies the port to use when building the connection to the remote device. The
- port value will default to port 830.
- type: int
- default: 830
- username:
- description:
- - Configures the username to use to authenticate the connection to the remote
- device. This value is used to authenticate the SSH session. If the value is
- not specified in the task, the value of environment variable C(ANSIBLE_NET_USERNAME)
- will be used instead.
- type: str
- password:
- description:
- - Specifies the password to use to authenticate the connection to the remote device. This
- value is used to authenticate the SSH session. If the value is not specified
- in the task, the value of environment variable C(ANSIBLE_NET_PASSWORD) will
- be used instead.
- type: str
- timeout:
- description:
- - Specifies the timeout in seconds for communicating with the network device for
- either connecting or sending commands. If the timeout is exceeded before the
- operation is completed, the module will error.
- type: int
- default: 10
- ssh_keyfile:
- description:
- - Specifies the SSH key to use to authenticate the connection to the remote device. This
- value is the path to the key used to authenticate the SSH session. If the value
- is not specified in the task, the value of environment variable C(ANSIBLE_NET_SSH_KEYFILE)
- will be used instead.
- type: path
- hostkey_verify:
- description:
- - If set to C(yes), the ssh host key of the device must match a ssh key present
- on the host if set to C(no), the ssh host key of the device is not checked.
- type: bool
- default: true
- look_for_keys:
- description:
- - Enables looking in the usual locations for the ssh keys (e.g. :file:`~/.ssh/id_*`)
- type: bool
- default: true
-notes:
-- For information on using netconf see the :ref:`Platform Options guide using Netconf<netconf_enabled_platform_options>`
-- For more information on using Ansible to manage network devices see the :ref:`Ansible
- Network Guide <network_guide>`
-"""
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/network_agnostic.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/network_agnostic.py
deleted file mode 100644
index ad65f6ef73..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/doc_fragments/network_agnostic.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright: (c) 2019 Ansible, Inc
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-
-class ModuleDocFragment(object):
-
- # Standard files documentation fragment
- DOCUMENTATION = r"""options: {}
-notes:
-- This module is supported on C(ansible_network_os) network platforms. See the :ref:`Network
- Platform Options <platform_options>` for details.
-"""
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py
deleted file mode 100644
index 6ae47a7302..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/ipaddr.py
+++ /dev/null
@@ -1,1186 +0,0 @@
-# (c) 2014, Maciej Delmanowski <drybjed@gmail.com>
-#
-# This file is part of Ansible
-#
-# Ansible is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Ansible is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-
-# Make coding more python3-ish
-from __future__ import absolute_import, division, print_function
-
-__metaclass__ = type
-
-from functools import partial
-import types
-
-try:
- import netaddr
-except ImportError:
- # in this case, we'll make the filters return error messages (see bottom)
- netaddr = None
-else:
-
- class mac_linux(netaddr.mac_unix):
- pass
-
- mac_linux.word_fmt = "%.2x"
-
-from ansible import errors
-
-
-# ---- IP address and network query helpers ----
-def _empty_ipaddr_query(v, vtype):
- # We don't have any query to process, so just check what type the user
- # expects, and return the IP address in a correct format
- if v:
- if vtype == "address":
- return str(v.ip)
- elif vtype == "network":
- return str(v)
-
-
-def _first_last(v):
- if v.size == 2:
- first_usable = int(netaddr.IPAddress(v.first))
- last_usable = int(netaddr.IPAddress(v.last))
- return first_usable, last_usable
- elif v.size > 1:
- first_usable = int(netaddr.IPAddress(v.first + 1))
- last_usable = int(netaddr.IPAddress(v.last - 1))
- return first_usable, last_usable
-
-
-def _6to4_query(v, vtype, value):
- if v.version == 4:
-
- if v.size == 1:
- ipconv = str(v.ip)
- elif v.size > 1:
- if v.ip != v.network:
- ipconv = str(v.ip)
- else:
- ipconv = False
-
- if ipaddr(ipconv, "public"):
- numbers = list(map(int, ipconv.split(".")))
-
- try:
- return "2002:{:02x}{:02x}:{:02x}{:02x}::1/48".format(*numbers)
- except Exception:
- return False
-
- elif v.version == 6:
- if vtype == "address":
- if ipaddr(str(v), "2002::/16"):
- return value
- elif vtype == "network":
- if v.ip != v.network:
- if ipaddr(str(v.ip), "2002::/16"):
- return value
- else:
- return False
-
-
-def _ip_query(v):
- if v.size == 1:
- return str(v.ip)
- if v.size > 1:
- # /31 networks in netaddr have no broadcast address
- if v.ip != v.network or not v.broadcast:
- return str(v.ip)
-
-
-def _gateway_query(v):
- if v.size > 1:
- if v.ip != v.network:
- return str(v.ip) + "/" + str(v.prefixlen)
-
-
-def _address_prefix_query(v):
- if v.size > 1:
- if v.ip != v.network:
- return str(v.ip) + "/" + str(v.prefixlen)
-
-
-def _bool_ipaddr_query(v):
- if v:
- return True
-
-
-def _broadcast_query(v):
- if v.size > 2:
- return str(v.broadcast)
-
-
-def _cidr_query(v):
- return str(v)
-
-
-def _cidr_lookup_query(v, iplist, value):
- try:
- if v in iplist:
- return value
- except Exception:
- return False
-
-
-def _first_usable_query(v, vtype):
- if vtype == "address":
- "Does it make sense to raise an error"
- raise errors.AnsibleFilterError("Not a network address")
- elif vtype == "network":
- if v.size == 2:
- return str(netaddr.IPAddress(int(v.network)))
- elif v.size > 1:
- return str(netaddr.IPAddress(int(v.network) + 1))
-
-
-def _host_query(v):
- if v.size == 1:
- return str(v)
- elif v.size > 1:
- if v.ip != v.network:
- return str(v.ip) + "/" + str(v.prefixlen)
-
-
-def _hostmask_query(v):
- return str(v.hostmask)
-
-
-def _int_query(v, vtype):
- if vtype == "address":
- return int(v.ip)
- elif vtype == "network":
- return str(int(v.ip)) + "/" + str(int(v.prefixlen))
-
-
-def _ip_prefix_query(v):
- if v.size == 2:
- return str(v.ip) + "/" + str(v.prefixlen)
- elif v.size > 1:
- if v.ip != v.network:
- return str(v.ip) + "/" + str(v.prefixlen)
-
-
-def _ip_netmask_query(v):
- if v.size == 2:
- return str(v.ip) + " " + str(v.netmask)
- elif v.size > 1:
- if v.ip != v.network:
- return str(v.ip) + " " + str(v.netmask)
-
-
-"""
-def _ip_wildcard_query(v):
- if v.size == 2:
- return str(v.ip) + ' ' + str(v.hostmask)
- elif v.size > 1:
- if v.ip != v.network:
- return str(v.ip) + ' ' + str(v.hostmask)
-"""
-
-
-def _ipv4_query(v, value):
- if v.version == 6:
- try:
- return str(v.ipv4())
- except Exception:
- return False
- else:
- return value
-
-
-def _ipv6_query(v, value):
- if v.version == 4:
- return str(v.ipv6())
- else:
- return value
-
-
-def _last_usable_query(v, vtype):
- if vtype == "address":
- "Does it make sense to raise an error"
- raise errors.AnsibleFilterError("Not a network address")
- elif vtype == "network":
- if v.size > 1:
- first_usable, last_usable = _first_last(v)
- return str(netaddr.IPAddress(last_usable))
-
-
-def _link_local_query(v, value):
- v_ip = netaddr.IPAddress(str(v.ip))
- if v.version == 4:
- if ipaddr(str(v_ip), "169.254.0.0/24"):
- return value
-
- elif v.version == 6:
- if ipaddr(str(v_ip), "fe80::/10"):
- return value
-
-
-def _loopback_query(v, value):
- v_ip = netaddr.IPAddress(str(v.ip))
- if v_ip.is_loopback():
- return value
-
-
-def _multicast_query(v, value):
- if v.is_multicast():
- return value
-
-
-def _net_query(v):
- if v.size > 1:
- if v.ip == v.network:
- return str(v.network) + "/" + str(v.prefixlen)
-
-
-def _netmask_query(v):
- return str(v.netmask)
-
-
-def _network_query(v):
- """Return the network of a given IP or subnet"""
- return str(v.network)
-
-
-def _network_id_query(v):
- """Return the network of a given IP or subnet"""
- return str(v.network)
-
-
-def _network_netmask_query(v):
- return str(v.network) + " " + str(v.netmask)
-
-
-def _network_wildcard_query(v):
- return str(v.network) + " " + str(v.hostmask)
-
-
-def _next_usable_query(v, vtype):
- if vtype == "address":
- "Does it make sense to raise an error"
- raise errors.AnsibleFilterError("Not a network address")
- elif vtype == "network":
- if v.size > 1:
- first_usable, last_usable = _first_last(v)
- next_ip = int(netaddr.IPAddress(int(v.ip) + 1))
- if next_ip >= first_usable and next_ip <= last_usable:
- return str(netaddr.IPAddress(int(v.ip) + 1))
-
-
-def _peer_query(v, vtype):
- if vtype == "address":
- raise errors.AnsibleFilterError("Not a network address")
- elif vtype == "network":
- if v.size == 2:
- return str(netaddr.IPAddress(int(v.ip) ^ 1))
- if v.size == 4:
- if int(v.ip) % 4 == 0:
- raise errors.AnsibleFilterError(
- "Network address of /30 has no peer"
- )
- if int(v.ip) % 4 == 3:
- raise errors.AnsibleFilterError(
- "Broadcast address of /30 has no peer"
- )
- return str(netaddr.IPAddress(int(v.ip) ^ 3))
- raise errors.AnsibleFilterError("Not a point-to-point network")
-
-
-def _prefix_query(v):
- return int(v.prefixlen)
-
-
-def _previous_usable_query(v, vtype):
- if vtype == "address":
- "Does it make sense to raise an error"
- raise errors.AnsibleFilterError("Not a network address")
- elif vtype == "network":
- if v.size > 1:
- first_usable, last_usable = _first_last(v)
- previous_ip = int(netaddr.IPAddress(int(v.ip) - 1))
- if previous_ip >= first_usable and previous_ip <= last_usable:
- return str(netaddr.IPAddress(int(v.ip) - 1))
-
-
-def _private_query(v, value):
- if v.is_private():
- return value
-
-
-def _public_query(v, value):
- v_ip = netaddr.IPAddress(str(v.ip))
- if (
- v_ip.is_unicast()
- and not v_ip.is_private()
- and not v_ip.is_loopback()
- and not v_ip.is_netmask()
- and not v_ip.is_hostmask()
- ):
- return value
-
-
-def _range_usable_query(v, vtype):
- if vtype == "address":
- "Does it make sense to raise an error"
- raise errors.AnsibleFilterError("Not a network address")
- elif vtype == "network":
- if v.size > 1:
- first_usable, last_usable = _first_last(v)
- first_usable = str(netaddr.IPAddress(first_usable))
- last_usable = str(netaddr.IPAddress(last_usable))
- return "{0}-{1}".format(first_usable, last_usable)
-
-
-def _revdns_query(v):
- v_ip = netaddr.IPAddress(str(v.ip))
- return v_ip.reverse_dns
-
-
-def _size_query(v):
- return v.size
-
-
-def _size_usable_query(v):
- if v.size == 1:
- return 0
- elif v.size == 2:
- return 2
- return v.size - 2
-
-
-def _subnet_query(v):
- return str(v.cidr)
-
-
-def _type_query(v):
- if v.size == 1:
- return "address"
- if v.size > 1:
- if v.ip != v.network:
- return "address"
- else:
- return "network"
-
-
-def _unicast_query(v, value):
- if v.is_unicast():
- return value
-
-
-def _version_query(v):
- return v.version
-
-
-def _wrap_query(v, vtype, value):
- if v.version == 6:
- if vtype == "address":
- return "[" + str(v.ip) + "]"
- elif vtype == "network":
- return "[" + str(v.ip) + "]/" + str(v.prefixlen)
- else:
- return value
-
-
-# ---- HWaddr query helpers ----
-def _bare_query(v):
- v.dialect = netaddr.mac_bare
- return str(v)
-
-
-def _bool_hwaddr_query(v):
- if v:
- return True
-
-
-def _int_hwaddr_query(v):
- return int(v)
-
-
-def _cisco_query(v):
- v.dialect = netaddr.mac_cisco
- return str(v)
-
-
-def _empty_hwaddr_query(v, value):
- if v:
- return value
-
-
-def _linux_query(v):
- v.dialect = mac_linux
- return str(v)
-
-
-def _postgresql_query(v):
- v.dialect = netaddr.mac_pgsql
- return str(v)
-
-
-def _unix_query(v):
- v.dialect = netaddr.mac_unix
- return str(v)
-
-
-def _win_query(v):
- v.dialect = netaddr.mac_eui48
- return str(v)
-
-
-# ---- IP address and network filters ----
-
-# Returns a minified list of subnets or a single subnet that spans all of
-# the inputs.
-def cidr_merge(value, action="merge"):
- if not hasattr(value, "__iter__"):
- raise errors.AnsibleFilterError(
- "cidr_merge: expected iterable, got " + repr(value)
- )
-
- if action == "merge":
- try:
- return [str(ip) for ip in netaddr.cidr_merge(value)]
- except Exception as e:
- raise errors.AnsibleFilterError(
- "cidr_merge: error in netaddr:\n%s" % e
- )
-
- elif action == "span":
- # spanning_cidr needs at least two values
- if len(value) == 0:
- return None
- elif len(value) == 1:
- try:
- return str(netaddr.IPNetwork(value[0]))
- except Exception as e:
- raise errors.AnsibleFilterError(
- "cidr_merge: error in netaddr:\n%s" % e
- )
- else:
- try:
- return str(netaddr.spanning_cidr(value))
- except Exception as e:
- raise errors.AnsibleFilterError(
- "cidr_merge: error in netaddr:\n%s" % e
- )
-
- else:
- raise errors.AnsibleFilterError(
- "cidr_merge: invalid action '%s'" % action
- )
-
-
-def ipaddr(value, query="", version=False, alias="ipaddr"):
- """ Check if string is an IP address or network and filter it """
-
- query_func_extra_args = {
- "": ("vtype",),
- "6to4": ("vtype", "value"),
- "cidr_lookup": ("iplist", "value"),
- "first_usable": ("vtype",),
- "int": ("vtype",),
- "ipv4": ("value",),
- "ipv6": ("value",),
- "last_usable": ("vtype",),
- "link-local": ("value",),
- "loopback": ("value",),
- "lo": ("value",),
- "multicast": ("value",),
- "next_usable": ("vtype",),
- "peer": ("vtype",),
- "previous_usable": ("vtype",),
- "private": ("value",),
- "public": ("value",),
- "unicast": ("value",),
- "range_usable": ("vtype",),
- "wrap": ("vtype", "value"),
- }
-
- query_func_map = {
- "": _empty_ipaddr_query,
- "6to4": _6to4_query,
- "address": _ip_query,
- "address/prefix": _address_prefix_query, # deprecate
- "bool": _bool_ipaddr_query,
- "broadcast": _broadcast_query,
- "cidr": _cidr_query,
- "cidr_lookup": _cidr_lookup_query,
- "first_usable": _first_usable_query,
- "gateway": _gateway_query, # deprecate
- "gw": _gateway_query, # deprecate
- "host": _host_query,
- "host/prefix": _address_prefix_query, # deprecate
- "hostmask": _hostmask_query,
- "hostnet": _gateway_query, # deprecate
- "int": _int_query,
- "ip": _ip_query,
- "ip/prefix": _ip_prefix_query,
- "ip_netmask": _ip_netmask_query,
- # 'ip_wildcard': _ip_wildcard_query, built then could not think of use case
- "ipv4": _ipv4_query,
- "ipv6": _ipv6_query,
- "last_usable": _last_usable_query,
- "link-local": _link_local_query,
- "lo": _loopback_query,
- "loopback": _loopback_query,
- "multicast": _multicast_query,
- "net": _net_query,
- "next_usable": _next_usable_query,
- "netmask": _netmask_query,
- "network": _network_query,
- "network_id": _network_id_query,
- "network/prefix": _subnet_query,
- "network_netmask": _network_netmask_query,
- "network_wildcard": _network_wildcard_query,
- "peer": _peer_query,
- "prefix": _prefix_query,
- "previous_usable": _previous_usable_query,
- "private": _private_query,
- "public": _public_query,
- "range_usable": _range_usable_query,
- "revdns": _revdns_query,
- "router": _gateway_query, # deprecate
- "size": _size_query,
- "size_usable": _size_usable_query,
- "subnet": _subnet_query,
- "type": _type_query,
- "unicast": _unicast_query,
- "v4": _ipv4_query,
- "v6": _ipv6_query,
- "version": _version_query,
- "wildcard": _hostmask_query,
- "wrap": _wrap_query,
- }
-
- vtype = None
-
- if not value:
- return False
-
- elif value is True:
- return False
-
- # Check if value is a list and parse each element
- elif isinstance(value, (list, tuple, types.GeneratorType)):
-
- _ret = []
- for element in value:
- if ipaddr(element, str(query), version):
- _ret.append(ipaddr(element, str(query), version))
-
- if _ret:
- return _ret
- else:
- return list()
-
- # Check if value is a number and convert it to an IP address
- elif str(value).isdigit():
-
- # We don't know what IP version to assume, so let's check IPv4 first,
- # then IPv6
- try:
- if (not version) or (version and version == 4):
- v = netaddr.IPNetwork("0.0.0.0/0")
- v.value = int(value)
- v.prefixlen = 32
- elif version and version == 6:
- v = netaddr.IPNetwork("::/0")
- v.value = int(value)
- v.prefixlen = 128
-
- # IPv4 didn't work the first time, so it definitely has to be IPv6
- except Exception:
- try:
- v = netaddr.IPNetwork("::/0")
- v.value = int(value)
- v.prefixlen = 128
-
- # The value is too big for IPv6. Are you a nanobot?
- except Exception:
- return False
-
- # We got an IP address, let's mark it as such
- value = str(v)
- vtype = "address"
-
- # value has not been recognized, check if it's a valid IP string
- else:
- try:
- v = netaddr.IPNetwork(value)
-
- # value is a valid IP string, check if user specified
- # CIDR prefix or just an IP address, this will indicate default
- # output format
- try:
- address, prefix = value.split("/")
- vtype = "network"
- except Exception:
- vtype = "address"
-
- # value hasn't been recognized, maybe it's a numerical CIDR?
- except Exception:
- try:
- address, prefix = value.split("/")
- address.isdigit()
- address = int(address)
- prefix.isdigit()
- prefix = int(prefix)
-
- # It's not numerical CIDR, give up
- except Exception:
- return False
-
- # It is something, so let's try and build a CIDR from the parts
- try:
- v = netaddr.IPNetwork("0.0.0.0/0")
- v.value = address
- v.prefixlen = prefix
-
- # It's not a valid IPv4 CIDR
- except Exception:
- try:
- v = netaddr.IPNetwork("::/0")
- v.value = address
- v.prefixlen = prefix
-
- # It's not a valid IPv6 CIDR. Give up.
- except Exception:
- return False
-
- # We have a valid CIDR, so let's write it in correct format
- value = str(v)
- vtype = "network"
-
- # We have a query string but it's not in the known query types. Check if
- # that string is a valid subnet, if so, we can check later if given IP
- # address/network is inside that specific subnet
- try:
- # ?? 6to4 and link-local were True here before. Should they still?
- if (
- query
- and (query not in query_func_map or query == "cidr_lookup")
- and not str(query).isdigit()
- and ipaddr(query, "network")
- ):
- iplist = netaddr.IPSet([netaddr.IPNetwork(query)])
- query = "cidr_lookup"
- except Exception:
- pass
-
- # This code checks if value maches the IP version the user wants, ie. if
- # it's any version ("ipaddr()"), IPv4 ("ipv4()") or IPv6 ("ipv6()")
- # If version does not match, return False
- if version and v.version != version:
- return False
-
- extras = []
- for arg in query_func_extra_args.get(query, tuple()):
- extras.append(locals()[arg])
- try:
- return query_func_map[query](v, *extras)
- except KeyError:
- try:
- float(query)
- if v.size == 1:
- if vtype == "address":
- return str(v.ip)
- elif vtype == "network":
- return str(v)
-
- elif v.size > 1:
- try:
- return str(v[query]) + "/" + str(v.prefixlen)
- except Exception:
- return False
-
- else:
- return value
-
- except Exception:
- raise errors.AnsibleFilterError(
- alias + ": unknown filter type: %s" % query
- )
-
- return False
-
-
-def ipmath(value, amount):
- try:
- if "/" in value:
- ip = netaddr.IPNetwork(value).ip
- else:
- ip = netaddr.IPAddress(value)
- except (netaddr.AddrFormatError, ValueError):
- msg = "You must pass a valid IP address; {0} is invalid".format(value)
- raise errors.AnsibleFilterError(msg)
-
- if not isinstance(amount, int):
- msg = (
- "You must pass an integer for arithmetic; "
- "{0} is not a valid integer"
- ).format(amount)
- raise errors.AnsibleFilterError(msg)
-
- return str(ip + amount)
-
-
-def ipwrap(value, query=""):
- try:
- if isinstance(value, (list, tuple, types.GeneratorType)):
- _ret = []
- for element in value:
- if ipaddr(element, query, version=False, alias="ipwrap"):
- _ret.append(ipaddr(element, "wrap"))
- else:
- _ret.append(element)
-
- return _ret
- else:
- _ret = ipaddr(value, query, version=False, alias="ipwrap")
- if _ret:
- return ipaddr(_ret, "wrap")
- else:
- return value
-
- except Exception:
- return value
-
-
-def ipv4(value, query=""):
- return ipaddr(value, query, version=4, alias="ipv4")
-
-
-def ipv6(value, query=""):
- return ipaddr(value, query, version=6, alias="ipv6")
-
-
-# Split given subnet into smaller subnets or find out the biggest subnet of
-# a given IP address with given CIDR prefix
-# Usage:
-#
-# - address or address/prefix | ipsubnet
-# returns CIDR subnet of a given input
-#
-# - address/prefix | ipsubnet(cidr)
-# returns number of possible subnets for given CIDR prefix
-#
-# - address/prefix | ipsubnet(cidr, index)
-# returns new subnet with given CIDR prefix
-#
-# - address | ipsubnet(cidr)
-# returns biggest subnet with given CIDR prefix that address belongs to
-#
-# - address | ipsubnet(cidr, index)
-# returns next indexed subnet which contains given address
-#
-# - address/prefix | ipsubnet(subnet/prefix)
-# return the index of the subnet in the subnet
-def ipsubnet(value, query="", index="x"):
- """ Manipulate IPv4/IPv6 subnets """
-
- try:
- vtype = ipaddr(value, "type")
- if vtype == "address":
- v = ipaddr(value, "cidr")
- elif vtype == "network":
- v = ipaddr(value, "subnet")
-
- value = netaddr.IPNetwork(v)
- except Exception:
- return False
- query_string = str(query)
- if not query:
- return str(value)
-
- elif query_string.isdigit():
- vsize = ipaddr(v, "size")
- query = int(query)
-
- try:
- float(index)
- index = int(index)
-
- if vsize > 1:
- try:
- return str(list(value.subnet(query))[index])
- except Exception:
- return False
-
- elif vsize == 1:
- try:
- return str(value.supernet(query)[index])
- except Exception:
- return False
-
- except Exception:
- if vsize > 1:
- try:
- return str(len(list(value.subnet(query))))
- except Exception:
- return False
-
- elif vsize == 1:
- try:
- return str(value.supernet(query)[0])
- except Exception:
- return False
-
- elif query_string:
- vtype = ipaddr(query, "type")
- if vtype == "address":
- v = ipaddr(query, "cidr")
- elif vtype == "network":
- v = ipaddr(query, "subnet")
- else:
- msg = "You must pass a valid subnet or IP address; {0} is invalid".format(
- query_string
- )
- raise errors.AnsibleFilterError(msg)
- query = netaddr.IPNetwork(v)
- for i, subnet in enumerate(query.subnet(value.prefixlen), 1):
- if subnet == value:
- return str(i)
- msg = "{0} is not in the subnet {1}".format(value.cidr, query.cidr)
- raise errors.AnsibleFilterError(msg)
- return False
-
-
-# Returns the nth host within a network described by value.
-# Usage:
-#
-# - address or address/prefix | nthhost(nth)
-# returns the nth host within the given network
-def nthhost(value, query=""):
- """ Get the nth host within a given network """
- try:
- vtype = ipaddr(value, "type")
- if vtype == "address":
- v = ipaddr(value, "cidr")
- elif vtype == "network":
- v = ipaddr(value, "subnet")
-
- value = netaddr.IPNetwork(v)
- except Exception:
- return False
-
- if not query:
- return False
-
- try:
- nth = int(query)
- if value.size > nth:
- return value[nth]
-
- except ValueError:
- return False
-
- return False
-
-
-# Returns the next nth usable ip within a network described by value.
-def next_nth_usable(value, offset):
- try:
- vtype = ipaddr(value, "type")
- if vtype == "address":
- v = ipaddr(value, "cidr")
- elif vtype == "network":
- v = ipaddr(value, "subnet")
-
- v = netaddr.IPNetwork(v)
- except Exception:
- return False
-
- if type(offset) != int:
- raise errors.AnsibleFilterError("Must pass in an integer")
- if v.size > 1:
- first_usable, last_usable = _first_last(v)
- nth_ip = int(netaddr.IPAddress(int(v.ip) + offset))
- if nth_ip >= first_usable and nth_ip <= last_usable:
- return str(netaddr.IPAddress(int(v.ip) + offset))
-
-
-# Returns the previous nth usable ip within a network described by value.
-def previous_nth_usable(value, offset):
- try:
- vtype = ipaddr(value, "type")
- if vtype == "address":
- v = ipaddr(value, "cidr")
- elif vtype == "network":
- v = ipaddr(value, "subnet")
-
- v = netaddr.IPNetwork(v)
- except Exception:
- return False
-
- if type(offset) != int:
- raise errors.AnsibleFilterError("Must pass in an integer")
- if v.size > 1:
- first_usable, last_usable = _first_last(v)
- nth_ip = int(netaddr.IPAddress(int(v.ip) - offset))
- if nth_ip >= first_usable and nth_ip <= last_usable:
- return str(netaddr.IPAddress(int(v.ip) - offset))
-
-
-def _range_checker(ip_check, first, last):
- """
- Tests whether an ip address is within the bounds of the first and last address.
-
- :param ip_check: The ip to test if it is within first and last.
- :param first: The first IP in the range to test against.
- :param last: The last IP in the range to test against.
-
- :return: bool
- """
- if ip_check >= first and ip_check <= last:
- return True
- else:
- return False
-
-
-def _address_normalizer(value):
- """
- Used to validate an address or network type and return it in a consistent format.
- This is being used for future use cases not currently available such as an address range.
-
- :param value: The string representation of an address or network.
-
- :return: The address or network in the normalized form.
- """
- try:
- vtype = ipaddr(value, "type")
- if vtype == "address" or vtype == "network":
- v = ipaddr(value, "subnet")
- except Exception:
- return False
-
- return v
-
-
-def network_in_usable(value, test):
- """
- Checks whether 'test' is a useable address or addresses in 'value'
-
- :param: value: The string representation of an address or network to test against.
- :param test: The string representation of an address or network to validate if it is within the range of 'value'.
-
- :return: bool
- """
- # normalize value and test variables into an ipaddr
- v = _address_normalizer(value)
- w = _address_normalizer(test)
-
- # get first and last addresses as integers to compare value and test; or cathes value when case is /32
- v_first = ipaddr(ipaddr(v, "first_usable") or ipaddr(v, "address"), "int")
- v_last = ipaddr(ipaddr(v, "last_usable") or ipaddr(v, "address"), "int")
- w_first = ipaddr(ipaddr(w, "network") or ipaddr(w, "address"), "int")
- w_last = ipaddr(ipaddr(w, "broadcast") or ipaddr(w, "address"), "int")
-
- if _range_checker(w_first, v_first, v_last) and _range_checker(
- w_last, v_first, v_last
- ):
- return True
- else:
- return False
-
-
-def network_in_network(value, test):
- """
- Checks whether the 'test' address or addresses are in 'value', including broadcast and network
-
- :param: value: The network address or range to test against.
- :param test: The address or network to validate if it is within the range of 'value'.
-
- :return: bool
- """
- # normalize value and test variables into an ipaddr
- v = _address_normalizer(value)
- w = _address_normalizer(test)
-
- # get first and last addresses as integers to compare value and test; or cathes value when case is /32
- v_first = ipaddr(ipaddr(v, "network") or ipaddr(v, "address"), "int")
- v_last = ipaddr(ipaddr(v, "broadcast") or ipaddr(v, "address"), "int")
- w_first = ipaddr(ipaddr(w, "network") or ipaddr(w, "address"), "int")
- w_last = ipaddr(ipaddr(w, "broadcast") or ipaddr(w, "address"), "int")
-
- if _range_checker(w_first, v_first, v_last) and _range_checker(
- w_last, v_first, v_last
- ):
- return True
- else:
- return False
-
-
-def reduce_on_network(value, network):
- """
- Reduces a list of addresses to only the addresses that match a given network.
-
- :param: value: The list of addresses to filter on.
- :param: network: The network to validate against.
-
- :return: The reduced list of addresses.
- """
- # normalize network variable into an ipaddr
- n = _address_normalizer(network)
-
- # get first and last addresses as integers to compare value and test; or cathes value when case is /32
- n_first = ipaddr(ipaddr(n, "network") or ipaddr(n, "address"), "int")
- n_last = ipaddr(ipaddr(n, "broadcast") or ipaddr(n, "address"), "int")
-
- # create an empty list to fill and return
- r = []
-
- for address in value:
- # normalize address variables into an ipaddr
- a = _address_normalizer(address)
-
- # get first and last addresses as integers to compare value and test; or cathes value when case is /32
- a_first = ipaddr(ipaddr(a, "network") or ipaddr(a, "address"), "int")
- a_last = ipaddr(ipaddr(a, "broadcast") or ipaddr(a, "address"), "int")
-
- if _range_checker(a_first, n_first, n_last) and _range_checker(
- a_last, n_first, n_last
- ):
- r.append(address)
-
- return r
-
-
-# Returns the SLAAC address within a network for a given HW/MAC address.
-# Usage:
-#
-# - prefix | slaac(mac)
-def slaac(value, query=""):
- """ Get the SLAAC address within given network """
- try:
- vtype = ipaddr(value, "type")
- if vtype == "address":
- v = ipaddr(value, "cidr")
- elif vtype == "network":
- v = ipaddr(value, "subnet")
-
- if ipaddr(value, "version") != 6:
- return False
-
- value = netaddr.IPNetwork(v)
- except Exception:
- return False
-
- if not query:
- return False
-
- try:
- mac = hwaddr(query, alias="slaac")
-
- eui = netaddr.EUI(mac)
- except Exception:
- return False
-
- return eui.ipv6(value.network)
-
-
-# ---- HWaddr / MAC address filters ----
-def hwaddr(value, query="", alias="hwaddr"):
- """ Check if string is a HW/MAC address and filter it """
-
- query_func_extra_args = {"": ("value",)}
-
- query_func_map = {
- "": _empty_hwaddr_query,
- "bare": _bare_query,
- "bool": _bool_hwaddr_query,
- "int": _int_hwaddr_query,
- "cisco": _cisco_query,
- "eui48": _win_query,
- "linux": _linux_query,
- "pgsql": _postgresql_query,
- "postgresql": _postgresql_query,
- "psql": _postgresql_query,
- "unix": _unix_query,
- "win": _win_query,
- }
-
- try:
- v = netaddr.EUI(value)
- except Exception:
- if query and query != "bool":
- raise errors.AnsibleFilterError(
- alias + ": not a hardware address: %s" % value
- )
-
- extras = []
- for arg in query_func_extra_args.get(query, tuple()):
- extras.append(locals()[arg])
- try:
- return query_func_map[query](v, *extras)
- except KeyError:
- raise errors.AnsibleFilterError(
- alias + ": unknown filter type: %s" % query
- )
-
- return False
-
-
-def macaddr(value, query=""):
- return hwaddr(value, query, alias="macaddr")
-
-
-def _need_netaddr(f_name, *args, **kwargs):
- raise errors.AnsibleFilterError(
- "The %s filter requires python's netaddr be "
- "installed on the ansible controller" % f_name
- )
-
-
-def ip4_hex(arg, delimiter=""):
- """ Convert an IPv4 address to Hexadecimal notation """
- numbers = list(map(int, arg.split(".")))
- return "{0:02x}{sep}{1:02x}{sep}{2:02x}{sep}{3:02x}".format(
- *numbers, sep=delimiter
- )
-
-
-# ---- Ansible filters ----
-class FilterModule(object):
- """ IP address and network manipulation filters """
-
- filter_map = {
- # IP addresses and networks
- "cidr_merge": cidr_merge,
- "ipaddr": ipaddr,
- "ipmath": ipmath,
- "ipwrap": ipwrap,
- "ip4_hex": ip4_hex,
- "ipv4": ipv4,
- "ipv6": ipv6,
- "ipsubnet": ipsubnet,
- "next_nth_usable": next_nth_usable,
- "network_in_network": network_in_network,
- "network_in_usable": network_in_usable,
- "reduce_on_network": reduce_on_network,
- "nthhost": nthhost,
- "previous_nth_usable": previous_nth_usable,
- "slaac": slaac,
- # MAC / HW addresses
- "hwaddr": hwaddr,
- "macaddr": macaddr,
- }
-
- def filters(self):
- if netaddr:
- return self.filter_map
- else:
- # Need to install python's netaddr for these filters to work
- return dict(
- (f, partial(_need_netaddr, f)) for f in self.filter_map
- )
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/network.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/network.py
deleted file mode 100644
index 72d6c8684c..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/filter/network.py
+++ /dev/null
@@ -1,531 +0,0 @@
-#
-# {c) 2017 Red Hat, Inc.
-#
-# This file is part of Ansible
-#
-# Ansible is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Ansible is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-
-# Make coding more python3-ish
-from __future__ import absolute_import, division, print_function
-
-__metaclass__ = type
-
-import re
-import os
-import traceback
-import string
-
-from collections.abc import Mapping
-from xml.etree.ElementTree import fromstring
-
-from ansible.module_utils._text import to_native, to_text
-from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import (
- Template,
-)
-from ansible.module_utils.six import iteritems, string_types
-from ansible.errors import AnsibleError, AnsibleFilterError
-from ansible.utils.display import Display
-from ansible.utils.encrypt import passlib_or_crypt, random_password
-
-try:
- import yaml
-
- HAS_YAML = True
-except ImportError:
- HAS_YAML = False
-
-try:
- import textfsm
-
- HAS_TEXTFSM = True
-except ImportError:
- HAS_TEXTFSM = False
-
-display = Display()
-
-
-def re_matchall(regex, value):
- objects = list()
- for match in re.findall(regex.pattern, value, re.M):
- obj = {}
- if regex.groupindex:
- for name, index in iteritems(regex.groupindex):
- if len(regex.groupindex) == 1:
- obj[name] = match
- else:
- obj[name] = match[index - 1]
- objects.append(obj)
- return objects
-
-
-def re_search(regex, value):
- obj = {}
- match = regex.search(value, re.M)
- if match:
- items = list(match.groups())
- if regex.groupindex:
- for name, index in iteritems(regex.groupindex):
- obj[name] = items[index - 1]
- return obj
-
-
-def parse_cli(output, tmpl):
- if not isinstance(output, string_types):
- raise AnsibleError(
- "parse_cli input should be a string, but was given a input of %s"
- % (type(output))
- )
-
- if not os.path.exists(tmpl):
- raise AnsibleError("unable to locate parse_cli template: %s" % tmpl)
-
- try:
- template = Template()
- except ImportError as exc:
- raise AnsibleError(to_native(exc))
-
- with open(tmpl) as tmpl_fh:
- tmpl_content = tmpl_fh.read()
-
- spec = yaml.safe_load(tmpl_content)
- obj = {}
-
- for name, attrs in iteritems(spec["keys"]):
- value = attrs["value"]
-
- try:
- variables = spec.get("vars", {})
- value = template(value, variables)
- except Exception:
- pass
-
- if "start_block" in attrs and "end_block" in attrs:
- start_block = re.compile(attrs["start_block"])
- end_block = re.compile(attrs["end_block"])
-
- blocks = list()
- lines = None
- block_started = False
-
- for line in output.split("\n"):
- match_start = start_block.match(line)
- match_end = end_block.match(line)
-
- if match_start:
- lines = list()
- lines.append(line)
- block_started = True
-
- elif match_end:
- if lines:
- lines.append(line)
- blocks.append("\n".join(lines))
- block_started = False
-
- elif block_started:
- if lines:
- lines.append(line)
-
- regex_items = [re.compile(r) for r in attrs["items"]]
- objects = list()
-
- for block in blocks:
- if isinstance(value, Mapping) and "key" not in value:
- items = list()
- for regex in regex_items:
- match = regex.search(block)
- if match:
- item_values = match.groupdict()
- item_values["match"] = list(match.groups())
- items.append(item_values)
- else:
- items.append(None)
-
- obj = {}
- for k, v in iteritems(value):
- try:
- obj[k] = template(
- v, {"item": items}, fail_on_undefined=False
- )
- except Exception:
- obj[k] = None
- objects.append(obj)
-
- elif isinstance(value, Mapping):
- items = list()
- for regex in regex_items:
- match = regex.search(block)
- if match:
- item_values = match.groupdict()
- item_values["match"] = list(match.groups())
- items.append(item_values)
- else:
- items.append(None)
-
- key = template(value["key"], {"item": items})
- values = dict(
- [
- (k, template(v, {"item": items}))
- for k, v in iteritems(value["values"])
- ]
- )
- objects.append({key: values})
-
- return objects
-
- elif "items" in attrs:
- regexp = re.compile(attrs["items"])
- when = attrs.get("when")
- conditional = (
- "{%% if %s %%}True{%% else %%}False{%% endif %%}" % when
- )
-
- if isinstance(value, Mapping) and "key" not in value:
- values = list()
-
- for item in re_matchall(regexp, output):
- entry = {}
-
- for item_key, item_value in iteritems(value):
- entry[item_key] = template(item_value, {"item": item})
-
- if when:
- if template(conditional, {"item": entry}):
- values.append(entry)
- else:
- values.append(entry)
-
- obj[name] = values
-
- elif isinstance(value, Mapping):
- values = dict()
-
- for item in re_matchall(regexp, output):
- entry = {}
-
- for item_key, item_value in iteritems(value["values"]):
- entry[item_key] = template(item_value, {"item": item})
-
- key = template(value["key"], {"item": item})
-
- if when:
- if template(
- conditional, {"item": {"key": key, "value": entry}}
- ):
- values[key] = entry
- else:
- values[key] = entry
-
- obj[name] = values
-
- else:
- item = re_search(regexp, output)
- obj[name] = template(value, {"item": item})
-
- else:
- obj[name] = value
-
- return obj
-
-
-def parse_cli_textfsm(value, template):
- if not HAS_TEXTFSM:
- raise AnsibleError(
- "parse_cli_textfsm filter requires TextFSM library to be installed"
- )
-
- if not isinstance(value, string_types):
- raise AnsibleError(
- "parse_cli_textfsm input should be a string, but was given a input of %s"
- % (type(value))
- )
-
- if not os.path.exists(template):
- raise AnsibleError(
- "unable to locate parse_cli_textfsm template: %s" % template
- )
-
- try:
- template = open(template)
- except IOError as exc:
- raise AnsibleError(to_native(exc))
-
- re_table = textfsm.TextFSM(template)
- fsm_results = re_table.ParseText(value)
-
- results = list()
- for item in fsm_results:
- results.append(dict(zip(re_table.header, item)))
-
- return results
-
-
-def _extract_param(template, root, attrs, value):
-
- key = None
- when = attrs.get("when")
- conditional = "{%% if %s %%}True{%% else %%}False{%% endif %%}" % when
- param_to_xpath_map = attrs["items"]
-
- if isinstance(value, Mapping):
- key = value.get("key", None)
- if key:
- value = value["values"]
-
- entries = dict() if key else list()
-
- for element in root.findall(attrs["top"]):
- entry = dict()
- item_dict = dict()
- for param, param_xpath in iteritems(param_to_xpath_map):
- fields = None
- try:
- fields = element.findall(param_xpath)
- except Exception:
- display.warning(
- "Failed to evaluate value of '%s' with XPath '%s'.\nUnexpected error: %s."
- % (param, param_xpath, traceback.format_exc())
- )
-
- tags = param_xpath.split("/")
-
- # check if xpath ends with attribute.
- # If yes set attribute key/value dict to param value in case attribute matches
- # else if it is a normal xpath assign matched element text value.
- if len(tags) and tags[-1].endswith("]"):
- if fields:
- if len(fields) > 1:
- item_dict[param] = [field.attrib for field in fields]
- else:
- item_dict[param] = fields[0].attrib
- else:
- item_dict[param] = {}
- else:
- if fields:
- if len(fields) > 1:
- item_dict[param] = [field.text for field in fields]
- else:
- item_dict[param] = fields[0].text
- else:
- item_dict[param] = None
-
- if isinstance(value, Mapping):
- for item_key, item_value in iteritems(value):
- entry[item_key] = template(item_value, {"item": item_dict})
- else:
- entry = template(value, {"item": item_dict})
-
- if key:
- expanded_key = template(key, {"item": item_dict})
- if when:
- if template(
- conditional,
- {"item": {"key": expanded_key, "value": entry}},
- ):
- entries[expanded_key] = entry
- else:
- entries[expanded_key] = entry
- else:
- if when:
- if template(conditional, {"item": entry}):
- entries.append(entry)
- else:
- entries.append(entry)
-
- return entries
-
-
-def parse_xml(output, tmpl):
- if not os.path.exists(tmpl):
- raise AnsibleError("unable to locate parse_xml template: %s" % tmpl)
-
- if not isinstance(output, string_types):
- raise AnsibleError(
- "parse_xml works on string input, but given input of : %s"
- % type(output)
- )
-
- root = fromstring(output)
- try:
- template = Template()
- except ImportError as exc:
- raise AnsibleError(to_native(exc))
-
- with open(tmpl) as tmpl_fh:
- tmpl_content = tmpl_fh.read()
-
- spec = yaml.safe_load(tmpl_content)
- obj = {}
-
- for name, attrs in iteritems(spec["keys"]):
- value = attrs["value"]
-
- try:
- variables = spec.get("vars", {})
- value = template(value, variables)
- except Exception:
- pass
-
- if "items" in attrs:
- obj[name] = _extract_param(template, root, attrs, value)
- else:
- obj[name] = value
-
- return obj
-
-
-def type5_pw(password, salt=None):
- if not isinstance(password, string_types):
- raise AnsibleFilterError(
- "type5_pw password input should be a string, but was given a input of %s"
- % (type(password).__name__)
- )
-
- salt_chars = u"".join(
- (to_text(string.ascii_letters), to_text(string.digits), u"./")
- )
- if salt is not None and not isinstance(salt, string_types):
- raise AnsibleFilterError(
- "type5_pw salt input should be a string, but was given a input of %s"
- % (type(salt).__name__)
- )
- elif not salt:
- salt = random_password(length=4, chars=salt_chars)
- elif not set(salt) <= set(salt_chars):
- raise AnsibleFilterError(
- "type5_pw salt used inproper characters, must be one of %s"
- % (salt_chars)
- )
-
- encrypted_password = passlib_or_crypt(password, "md5_crypt", salt=salt)
-
- return encrypted_password
-
-
-def hash_salt(password):
-
- split_password = password.split("$")
- if len(split_password) != 4:
- raise AnsibleFilterError(
- "Could not parse salt out password correctly from {0}".format(
- password
- )
- )
- else:
- return split_password[2]
-
-
-def comp_type5(
- unencrypted_password, encrypted_password, return_original=False
-):
-
- salt = hash_salt(encrypted_password)
- if type5_pw(unencrypted_password, salt) == encrypted_password:
- if return_original is True:
- return encrypted_password
- else:
- return True
- return False
-
-
-def vlan_parser(vlan_list, first_line_len=48, other_line_len=44):
-
- """
- Input: Unsorted list of vlan integers
- Output: Sorted string list of integers according to IOS-like vlan list rules
-
- 1. Vlans are listed in ascending order
- 2. Runs of 3 or more consecutive vlans are listed with a dash
- 3. The first line of the list can be first_line_len characters long
- 4. Subsequent list lines can be other_line_len characters
- """
-
- # Sort and remove duplicates
- sorted_list = sorted(set(vlan_list))
-
- if sorted_list[0] < 1 or sorted_list[-1] > 4094:
- raise AnsibleFilterError("Valid VLAN range is 1-4094")
-
- parse_list = []
- idx = 0
- while idx < len(sorted_list):
- start = idx
- end = start
- while end < len(sorted_list) - 1:
- if sorted_list[end + 1] - sorted_list[end] == 1:
- end += 1
- else:
- break
-
- if start == end:
- # Single VLAN
- parse_list.append(str(sorted_list[idx]))
- elif start + 1 == end:
- # Run of 2 VLANs
- parse_list.append(str(sorted_list[start]))
- parse_list.append(str(sorted_list[end]))
- else:
- # Run of 3 or more VLANs
- parse_list.append(
- str(sorted_list[start]) + "-" + str(sorted_list[end])
- )
- idx = end + 1
-
- line_count = 0
- result = [""]
- for vlans in parse_list:
- # First line (" switchport trunk allowed vlan ")
- if line_count == 0:
- if len(result[line_count] + vlans) > first_line_len:
- result.append("")
- line_count += 1
- result[line_count] += vlans + ","
- else:
- result[line_count] += vlans + ","
-
- # Subsequent lines (" switchport trunk allowed vlan add ")
- else:
- if len(result[line_count] + vlans) > other_line_len:
- result.append("")
- line_count += 1
- result[line_count] += vlans + ","
- else:
- result[line_count] += vlans + ","
-
- # Remove trailing orphan commas
- for idx in range(0, len(result)):
- result[idx] = result[idx].rstrip(",")
-
- # Sometimes text wraps to next line, but there are no remaining VLANs
- if "" in result:
- result.remove("")
-
- return result
-
-
-class FilterModule(object):
- """Filters for working with output from network devices"""
-
- filter_map = {
- "parse_cli": parse_cli,
- "parse_cli_textfsm": parse_cli_textfsm,
- "parse_xml": parse_xml,
- "type5_pw": type5_pw,
- "hash_salt": hash_salt,
- "comp_type5": comp_type5,
- "vlan_parser": vlan_parser,
- }
-
- def filters(self):
- return self.filter_map
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/httpapi/restconf.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/httpapi/restconf.py
deleted file mode 100644
index 8afb3e5e28..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/httpapi/restconf.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# Copyright (c) 2018 Cisco and/or its affiliates.
-#
-# This file is part of Ansible
-#
-# Ansible is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Ansible is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-#
-
-from __future__ import absolute_import, division, print_function
-
-__metaclass__ = type
-
-DOCUMENTATION = """author: Ansible Networking Team
-httpapi: restconf
-short_description: HttpApi Plugin for devices supporting Restconf API
-description:
-- This HttpApi plugin provides methods to connect to Restconf API endpoints.
-options:
- root_path:
- type: str
- description:
- - Specifies the location of the Restconf root.
- default: /restconf
- vars:
- - name: ansible_httpapi_restconf_root
-"""
-
-import json
-
-from ansible.module_utils._text import to_text
-from ansible.module_utils.connection import ConnectionError
-from ansible.module_utils.six.moves.urllib.error import HTTPError
-from ansible.plugins.httpapi import HttpApiBase
-
-
-CONTENT_TYPE = "application/yang-data+json"
-
-
-class HttpApi(HttpApiBase):
- def send_request(self, data, **message_kwargs):
- if data:
- data = json.dumps(data)
-
- path = "/".join(
- [
- self.get_option("root_path").rstrip("/"),
- message_kwargs.get("path", "").lstrip("/"),
- ]
- )
-
- headers = {
- "Content-Type": message_kwargs.get("content_type") or CONTENT_TYPE,
- "Accept": message_kwargs.get("accept") or CONTENT_TYPE,
- }
- response, response_data = self.connection.send(
- path, data, headers=headers, method=message_kwargs.get("method")
- )
-
- return handle_response(response, response_data)
-
-
-def handle_response(response, response_data):
- try:
- response_data = json.loads(response_data.read())
- except ValueError:
- response_data = response_data.read()
-
- if isinstance(response, HTTPError):
- if response_data:
- if "errors" in response_data:
- errors = response_data["errors"]["error"]
- error_text = "\n".join(
- (error["error-message"] for error in errors)
- )
- else:
- error_text = response_data
-
- raise ConnectionError(error_text, code=response.code)
- raise ConnectionError(to_text(response), code=response.code)
-
- return response_data
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/netconf/netconf.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/netconf/netconf.py
deleted file mode 100644
index 1f03299bea..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/netconf/netconf.py
+++ /dev/null
@@ -1,147 +0,0 @@
-#
-# (c) 2018 Red Hat, Inc.
-#
-# This file is part of Ansible
-#
-# Ansible is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Ansible is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-#
-import json
-
-from copy import deepcopy
-from contextlib import contextmanager
-
-try:
- from lxml.etree import fromstring, tostring
-except ImportError:
- from xml.etree.ElementTree import fromstring, tostring
-
-from ansible.module_utils._text import to_text, to_bytes
-from ansible.module_utils.connection import Connection, ConnectionError
-from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.netconf import (
- NetconfConnection,
-)
-
-
-IGNORE_XML_ATTRIBUTE = ()
-
-
-def get_connection(module):
- if hasattr(module, "_netconf_connection"):
- return module._netconf_connection
-
- capabilities = get_capabilities(module)
- network_api = capabilities.get("network_api")
- if network_api == "netconf":
- module._netconf_connection = NetconfConnection(module._socket_path)
- else:
- module.fail_json(msg="Invalid connection type %s" % network_api)
-
- return module._netconf_connection
-
-
-def get_capabilities(module):
- if hasattr(module, "_netconf_capabilities"):
- return module._netconf_capabilities
-
- capabilities = Connection(module._socket_path).get_capabilities()
- module._netconf_capabilities = json.loads(capabilities)
- return module._netconf_capabilities
-
-
-def lock_configuration(module, target=None):
- conn = get_connection(module)
- return conn.lock(target=target)
-
-
-def unlock_configuration(module, target=None):
- conn = get_connection(module)
- return conn.unlock(target=target)
-
-
-@contextmanager
-def locked_config(module, target=None):
- try:
- lock_configuration(module, target=target)
- yield
- finally:
- unlock_configuration(module, target=target)
-
-
-def get_config(module, source, filter=None, lock=False):
- conn = get_connection(module)
- try:
- locked = False
- if lock:
- conn.lock(target=source)
- locked = True
- response = conn.get_config(source=source, filter=filter)
-
- except ConnectionError as e:
- module.fail_json(
- msg=to_text(e, errors="surrogate_then_replace").strip()
- )
-
- finally:
- if locked:
- conn.unlock(target=source)
-
- return response
-
-
-def get(module, filter, lock=False):
- conn = get_connection(module)
- try:
- locked = False
- if lock:
- conn.lock(target="running")
- locked = True
-
- response = conn.get(filter=filter)
-
- except ConnectionError as e:
- module.fail_json(
- msg=to_text(e, errors="surrogate_then_replace").strip()
- )
-
- finally:
- if locked:
- conn.unlock(target="running")
-
- return response
-
-
-def dispatch(module, request):
- conn = get_connection(module)
- try:
- response = conn.dispatch(request)
- except ConnectionError as e:
- module.fail_json(
- msg=to_text(e, errors="surrogate_then_replace").strip()
- )
-
- return response
-
-
-def sanitize_xml(data):
- tree = fromstring(
- to_bytes(deepcopy(data), errors="surrogate_then_replace")
- )
- for element in tree.getiterator():
- # remove attributes
- attribute = element.attrib
- if attribute:
- for key in list(attribute):
- if key not in IGNORE_XML_ATTRIBUTE:
- attribute.pop(key)
- return to_text(tostring(tree), errors="surrogate_then_replace").strip()
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/restconf/restconf.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/restconf/restconf.py
deleted file mode 100644
index fba46be0d6..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/restconf/restconf.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# This code is part of Ansible, but is an independent component.
-# This particular file snippet, and this file snippet only, is BSD licensed.
-# Modules you write using this snippet, which is embedded dynamically by Ansible
-# still belong to the author of the module, and may assign their own license
-# to the complete work.
-#
-# (c) 2018 Red Hat Inc.
-#
-# Redistribution and use in source and binary forms, with or without modification,
-# are permitted provided that the following conditions are met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
-# and/or other materials provided with the distribution.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
-# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
-# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
-# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-#
-
-from ansible.module_utils.connection import Connection
-
-
-def get(module, path=None, content=None, fields=None, output="json"):
- if path is None:
- raise ValueError("path value must be provided")
- if content:
- path += "?" + "content=%s" % content
- if fields:
- path += "?" + "field=%s" % fields
-
- accept = None
- if output == "xml":
- accept = "application/yang-data+xml"
-
- connection = Connection(module._socket_path)
- return connection.send_request(
- None, path=path, method="GET", accept=accept
- )
-
-
-def edit_config(module, path=None, content=None, method="GET", format="json"):
- if path is None:
- raise ValueError("path value must be provided")
-
- content_type = None
- if format == "xml":
- content_type = "application/yang-data+xml"
-
- connection = Connection(module._socket_path)
- return connection.send_request(
- content, path=path, method=method, content_type=content_type
- )
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_get.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_get.py
deleted file mode 100644
index f0910f52e6..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_get.py
+++ /dev/null
@@ -1,71 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-# (c) 2018, Ansible by Red Hat, inc
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import absolute_import, division, print_function
-
-__metaclass__ = type
-
-
-ANSIBLE_METADATA = {
- "metadata_version": "1.1",
- "status": ["preview"],
- "supported_by": "network",
-}
-
-
-DOCUMENTATION = """module: net_get
-author: Deepak Agrawal (@dagrawal)
-short_description: Copy a file from a network device to Ansible Controller
-description:
-- This module provides functionality to copy file from network device to ansible controller.
-extends_documentation_fragment:
-- ansible.netcommon.network_agnostic
-options:
- src:
- description:
- - Specifies the source file. The path to the source file can either be the full
- path on the network device or a relative path as per path supported by destination
- network device.
- required: true
- protocol:
- description:
- - Protocol used to transfer file.
- default: scp
- choices:
- - scp
- - sftp
- dest:
- description:
- - Specifies the destination file. The path to the destination file can either
- be the full path on the Ansible control host or a relative path from the playbook
- or role root directory.
- default:
- - Same filename as specified in I(src). The path will be playbook root or role
- root directory if playbook is part of a role.
-requirements:
-- scp
-notes:
-- Some devices need specific configurations to be enabled before scp can work These
- configuration should be pre-configured before using this module e.g ios - C(ip scp
- server enable).
-- User privilege to do scp on network device should be pre-configured e.g. ios - need
- user privilege 15 by default for allowing scp.
-- Default destination of source file.
-"""
-
-EXAMPLES = """
-- name: copy file from the network device to Ansible controller
- net_get:
- src: running_cfg_ios1.txt
-
-- name: copy file from ios to common location at /tmp
- net_get:
- src: running_cfg_sw1.txt
- dest : /tmp/ios1.txt
-"""
-
-RETURN = """
-"""
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_put.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_put.py
deleted file mode 100644
index 2fc4a98c01..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/modules/net_put.py
+++ /dev/null
@@ -1,82 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-# (c) 2018, Ansible by Red Hat, inc
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import absolute_import, division, print_function
-
-__metaclass__ = type
-
-
-ANSIBLE_METADATA = {
- "metadata_version": "1.1",
- "status": ["preview"],
- "supported_by": "network",
-}
-
-
-DOCUMENTATION = """module: net_put
-author: Deepak Agrawal (@dagrawal)
-short_description: Copy a file from Ansible Controller to a network device
-description:
-- This module provides functionality to copy file from Ansible controller to network
- devices.
-extends_documentation_fragment:
-- ansible.netcommon.network_agnostic
-options:
- src:
- description:
- - Specifies the source file. The path to the source file can either be the full
- path on the Ansible control host or a relative path from the playbook or role
- root directory.
- required: true
- protocol:
- description:
- - Protocol used to transfer file.
- default: scp
- choices:
- - scp
- - sftp
- dest:
- description:
- - Specifies the destination file. The path to destination file can either be the
- full path or relative path as supported by network_os.
- default:
- - Filename from src and at default directory of user shell on network_os.
- required: false
- mode:
- description:
- - Set the file transfer mode. If mode is set to I(text) then I(src) file will
- go through Jinja2 template engine to replace any vars if present in the src
- file. If mode is set to I(binary) then file will be copied as it is to destination
- device.
- default: binary
- choices:
- - binary
- - text
-requirements:
-- scp
-notes:
-- Some devices need specific configurations to be enabled before scp can work These
- configuration should be pre-configured before using this module e.g ios - C(ip scp
- server enable).
-- User privilege to do scp on network device should be pre-configured e.g. ios - need
- user privilege 15 by default for allowing scp.
-- Default destination of source file.
-"""
-
-EXAMPLES = """
-- name: copy file from ansible controller to a network device
- net_put:
- src: running_cfg_ios1.txt
-
-- name: copy file at root dir of flash in slot 3 of sw1(ios)
- net_put:
- src: running_cfg_sw1.txt
- protocol: sftp
- dest : flash3:/running_cfg_sw1.txt
-"""
-
-RETURN = """
-"""
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/netconf/default.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/netconf/default.py
deleted file mode 100644
index e9332f26d9..0000000000
--- a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/netconf/default.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# (c) 2017 Red Hat Inc.
-#
-# This file is part of Ansible
-#
-# Ansible is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Ansible is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-#
-from __future__ import absolute_import, division, print_function
-
-__metaclass__ = type
-
-DOCUMENTATION = """author: Ansible Networking Team
-netconf: default
-short_description: Use default netconf plugin to run standard netconf commands as
- per RFC
-description:
-- This default plugin provides low level abstraction apis for sending and receiving
- netconf commands as per Netconf RFC specification.
-options:
- ncclient_device_handler:
- type: str
- default: default
- description:
- - Specifies the ncclient device handler name for network os that support default
- netconf implementation as per Netconf RFC specification. To identify the ncclient
- device handler name refer ncclient library documentation.
-"""
-import json
-
-from ansible.module_utils._text import to_text
-from ansible.plugins.netconf import NetconfBase
-
-
-class Netconf(NetconfBase):
- def get_text(self, ele, tag):
- try:
- return to_text(
- ele.find(tag).text, errors="surrogate_then_replace"
- ).strip()
- except AttributeError:
- pass
-
- def get_device_info(self):
- device_info = dict()
- device_info["network_os"] = "default"
- return device_info
-
- def get_capabilities(self):
- result = dict()
- result["rpc"] = self.get_base_rpc()
- result["network_api"] = "netconf"
- result["device_info"] = self.get_device_info()
- result["server_capabilities"] = [c for c in self.m.server_capabilities]
- result["client_capabilities"] = [c for c in self.m.client_capabilities]
- result["session_id"] = self.m.session_id
- result["device_operations"] = self.get_device_operations(
- result["server_capabilities"]
- )
- return json.dumps(result)