summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/ansible_util.py
blob: 385416bd77bcc6c5d0ed4ce870e1116e442ae763 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
"""Miscellaneous utility functions and classes specific to ansible cli tools."""
from __future__ import annotations

import json
import os
import typing as t

from .constants import (
    SOFT_RLIMIT_NOFILE,
)

from .io import (
    write_text_file,
)

from .util import (
    common_environment,
    ApplicationError,
    ANSIBLE_LIB_ROOT,
    ANSIBLE_TEST_DATA_ROOT,
    ANSIBLE_BIN_PATH,
    ANSIBLE_SOURCE_ROOT,
    ANSIBLE_TEST_TOOLS_ROOT,
    get_ansible_version,
)

from .util_common import (
    create_temp_dir,
    run_command,
    ResultType,
    intercept_python,
    get_injector_path,
)

from .config import (
    IntegrationConfig,
    PosixIntegrationConfig,
    EnvironmentConfig,
    CommonConfig,
)

from .data import (
    data_context,
)

from .python_requirements import (
    install_requirements,
)

from .host_configs import (
    PythonConfig,
)


def parse_inventory(args, inventory_path):  # type: (EnvironmentConfig, str) -> t.Dict[str, t.Any]
    """Run `ansible-inventory --list` against the given inventory file and return the decoded JSON document."""
    env = ansible_environment(args)
    cmd = ['ansible-inventory', '-i', inventory_path, '--list']

    # only the first element of the captured result is decoded (presumably stdout -- confirm against intercept_python)
    output = intercept_python(args, args.controller_python, cmd, env, capture=True, always=True)[0]

    return json.loads(output)


def get_hosts(inventory, group_name):  # type: (t.Dict[str, t.Any], str) -> t.Dict[str, t.Dict[str, t.Any]]
    """
    Return a dict mapping each host name in the named group to that host's variables.
    A group which is missing from the inventory, or which lists no hosts, results in an empty dict.
    A host without an entry under `_meta` -> `hostvars` is mapped to an empty dict.
    """
    all_host_vars = inventory.get('_meta', {}).get('hostvars', {})
    members = inventory.get(group_name, {}).get('hosts', [])

    return {host: all_host_vars.get(host, {}) for host in members}


def ansible_environment(args, color=True, ansible_config=None):  # type: (CommonConfig, bool, t.Optional[str]) -> t.Dict[str, str]
    """
    Build and return the environment variables to use when running Ansible commands.
    The result starts from the common environment and layers the Ansible specific settings on top of it.
    When no `ansible_config` is given, the one reported by `args.get_ansible_config()` is used.
    Raises ApplicationError if the configuration file does not exist, unless running in explain mode.
    """
    env = common_environment()

    bin_prefix = ANSIBLE_BIN_PATH + os.path.pathsep
    path = env['PATH']

    if not path.startswith(bin_prefix):
        # the ansible bin directory must be the first entry on PATH
        path = bin_prefix + path

    # use the default empty configuration unless one has been provided
    ansible_config = ansible_config or args.get_ansible_config()

    if not (args.explain or os.path.exists(ansible_config)):
        raise ApplicationError('Configuration not found: %s' % ansible_config)

    # color is only forced when enabled both globally (args.color) and by the caller (color)
    force_color = 'true' if args.color and color else 'false'

    settings = {
        'ANSIBLE_PYTHON_MODULE_RLIMIT_NOFILE': str(SOFT_RLIMIT_NOFILE),
        'ANSIBLE_FORCE_COLOR': force_color,
        'ANSIBLE_FORCE_HANDLERS': 'true',  # allow cleanup handlers to run when tests fail
        'ANSIBLE_HOST_PATTERN_MISMATCH': 'error',  # prevent tests from unintentionally passing when hosts are not found
        'ANSIBLE_INVENTORY': '/dev/null',  # force tests to provide inventory
        'ANSIBLE_DEPRECATION_WARNINGS': 'false',
        'ANSIBLE_HOST_KEY_CHECKING': 'false',
        'ANSIBLE_RETRY_FILES_ENABLED': 'false',
        'ANSIBLE_CONFIG': ansible_config,
        'ANSIBLE_LIBRARY': '/dev/null',
        'ANSIBLE_DEVEL_WARNING': 'false',  # Don't show warnings that CI is running devel
        'PYTHONPATH': get_ansible_python_path(args),
        'PAGER': '/bin/cat',
        'PATH': path,
        # give TQM worker processes time to report code coverage results
        # without this the last task in a play may write no coverage file, an empty file, or an incomplete file
        # enabled even when not using code coverage to surface warnings when worker processes do not exit cleanly
        'ANSIBLE_WORKER_SHUTDOWN_POLL_COUNT': '100',
        'ANSIBLE_WORKER_SHUTDOWN_POLL_DELAY': '0.1',
    }

    if isinstance(args, IntegrationConfig) and args.coverage:
        # standard path injection is not effective for ansible-connection, instead the location must be configured
        # ansible-connection only requires the injector for code coverage
        # the correct python interpreter is already selected using the sys.executable used to invoke ansible
        settings['ANSIBLE_CONNECTION_PATH'] = os.path.join(get_injector_path(), 'ansible-connection')

    if isinstance(args, PosixIntegrationConfig):
        # force tests to set ansible_python_interpreter in inventory
        settings['ANSIBLE_PYTHON_INTERPRETER'] = '/set/ansible_python_interpreter/in/inventory'

    env.update(settings)

    if args.debug:
        debug_log_path = os.path.join(ResultType.LOGS.name, 'debug.log')
        env['ANSIBLE_DEBUG'] = 'true'
        env['ANSIBLE_LOG_PATH'] = debug_log_path

    content = data_context().content

    if content.collection:
        env['ANSIBLE_COLLECTIONS_PATH'] = content.collection.root

    if content.is_ansible:
        # applied last, so plugin path settings take precedence over anything set above
        env.update(configure_plugin_paths(args))

    return env


def configure_plugin_paths(args):  # type: (CommonConfig) -> t.Dict[str, str]
    """
    Return environment variables pointing at the private plugin and collection copies for the current command.
    An empty dict is returned unless `args` is an IntegrationConfig.
    Only variables whose target directory exists are included in the result.
    """
    if not isinstance(args, IntegrationConfig):
        return {}

    support_path = os.path.join(ANSIBLE_SOURCE_ROOT, 'test', 'support', args.command)
    plugin_root = os.path.join(support_path, 'plugins')

    standard_plugin_types = (
        'action',
        'become',
        'cache',
        'callback',
        'cliconf',
        'connection',
        'filter',
        'httpapi',
        'inventory',
        'lookup',
        'netconf',
        # 'shell' is not configurable
        'strategy',
        'terminal',
        'test',
        'vars',
    )

    # maps the lower case environment variable suffix to the plugin directory name
    # most plugins follow a standard naming convention
    directory_by_setting = {'%s_plugins' % plugin_type: plugin_type for plugin_type in standard_plugin_types}

    # these plugins do not follow the standard naming convention
    directory_by_setting['doc_fragment'] = 'doc_fragments'
    directory_by_setting['library'] = 'modules'
    directory_by_setting['module_utils'] = 'module_utils'

    # provide private copies of collections for integration tests
    candidates = {
        'ANSIBLE_COLLECTIONS_PATH': os.path.join(support_path, 'collections'),
    }

    # provide private copies of plugins for integration tests
    for setting, directory in directory_by_setting.items():
        candidates['ANSIBLE_%s' % setting.upper()] = os.path.join(plugin_root, directory)

    # only configure directories which exist
    return {name: location for name, location in candidates.items() if os.path.isdir(location)}


def get_ansible_python_path(args):  # type: (CommonConfig) -> str
    """
    Return a directory suitable for PYTHONPATH which exposes only the ansible package.
    The result is computed once and stored as an attribute on this function, so later calls return the same path.
    When a temporary directory is needed, it is created with create_temp_dir, which is relied upon to clean it up at exit.
    """
    try:
        cached_path = get_ansible_python_path.python_path
    except AttributeError:
        pass  # first call, nothing has been cached yet
    else:
        return cached_path

    if not ANSIBLE_SOURCE_ROOT:
        # when not running from source the installed directory is unsafe to add to PYTHONPATH
        # doing so would expose many unwanted packages on sys.path
        # instead a temporary directory is created which contains only ansible using a symlink
        python_path = create_temp_dir(prefix='ansible-test-')
        link_path = os.path.join(python_path, 'ansible')

        os.symlink(ANSIBLE_LIB_ROOT, link_path)
    else:
        # when running from source there is no need for a temporary directory to isolate the ansible package
        python_path = os.path.dirname(ANSIBLE_LIB_ROOT)

    if not args.explain:
        # explain mode skips writing the egg-info stub
        generate_egg_info(python_path)

    get_ansible_python_path.python_path = python_path

    return python_path


def generate_egg_info(path):  # type: (str) -> None
    """Write a stub `ansible_core.egg-info/PKG-INFO` file under the given base directory, unless the file already exists."""
    # minimal PKG-INFO stub following the format defined in PEP 241
    # required for older setuptools versions to avoid a traceback when importing pkg_resources from packages like cryptography
    # newer setuptools versions are happy with an empty directory
    # including a stub here means we don't need to locate the existing file or have setup.py generate it when running from source
    template = '''
Metadata-Version: 1.0
Name: ansible
Version: %s
Platform: UNKNOWN
Summary: Radically simple IT automation
Author-email: info@ansible.com
License: GPLv3+
'''

    contents = template % get_ansible_version()
    target = os.path.join(path, 'ansible_core.egg-info', 'PKG-INFO')

    if not os.path.exists(target):
        # the template starts with a newline for readability, which is stripped before writing
        write_text_file(target, contents.lstrip(), create_directories=True)


class CollectionDetail:
    """Container for detail about a collection. The only detail currently tracked is the version."""
    def __init__(self):  # type: () -> None
        # starts out as None and is assigned after construction, see get_collection_detail()
        self.version = None  # type: t.Optional[str]


class CollectionDetailError(ApplicationError):
    """Raised when detail about a collection could not be retrieved."""
    def __init__(self, reason):  # type: (str) -> None
        message = 'Error collecting collection detail: %s' % reason

        super().__init__(message)

        # the bare reason is kept separately from the formatted message
        self.reason = reason


def get_collection_detail(args, python):  # type: (EnvironmentConfig, PythonConfig) -> CollectionDetail
    """
    Return detail about the current collection, gathered by running the `collection_detail.py` tool with the given Python.
    Raises CollectionDetailError if the tool reports an error.
    """
    collection = data_context().content.collection
    script = os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'collection_detail.py')
    cmd = [python.path, script, os.path.join(collection.root, collection.directory)]

    # the tool reports its findings as a JSON document on stdout
    result = json.loads(run_command(args, cmd, capture=True, always=True)[0])

    error = result.get('error')

    if error:
        raise CollectionDetailError(error)

    detail = CollectionDetail()
    version = result.get('version')

    if version is not None:
        # a missing version leaves the default of None in place
        detail.version = str(version)

    return detail


def run_playbook(
        args,  # type: EnvironmentConfig
        inventory_path,  # type: str
        playbook,   # type: str
        run_playbook_vars=None,  # type: t.Optional[t.Dict[str, t.Any]]
        capture=False,  # type: bool
):  # type: (...) -> None
    """
    Run one of the playbooks bundled under the ansible-test data directory.
    The playbook is run with the given inventory file, with any playbook variables passed as JSON extra vars.
    Requirements are installed for the controller Python before the playbook is run.
    """
    cmd = [
        'ansible-playbook',
        '-i', inventory_path,
        os.path.join(ANSIBLE_TEST_DATA_ROOT, 'playbooks', playbook),
    ]

    if run_playbook_vars:
        cmd += ['-e', json.dumps(run_playbook_vars)]

    if args.verbosity:
        # one 'v' for each level of verbosity, e.g. -vvv
        cmd.append('-' + 'v' * args.verbosity)

    install_requirements(args, args.controller_python, ansible=True)  # run_playbook()

    intercept_python(args, args.controller_python, cmd, ansible_environment(args), capture=capture)