summaryrefslogtreecommitdiffstats
path: root/src/cephadm/cephadmlib/call_wrappers.py
blob: 3fe2171e99d5a1a292a29726b8e8d26ee2b66da4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# call_wrappers.py - functions to wrap calling external commands


import asyncio
import logging
import os
import subprocess
import sys

from enum import Enum
from typing import Callable, List, Dict, Optional, Any, Tuple, NoReturn

from .constants import QUIET_LOG_LEVEL, DEFAULT_TIMEOUT
from .context import CephadmContext
from .exceptions import TimeoutExpired

logger = logging.getLogger()


async def run_func(func: Callable, cmd: str) -> subprocess.CompletedProcess:
    """Call ``func(cmd)`` and return its result, logging the call first.

    Although this is a coroutine, ``func`` is invoked synchronously inside
    it; nothing is awaited here.

    :param func: callable that accepts the command as its only argument
    :param cmd: value passed through to ``func``
    :return: whatever ``func(cmd)`` returns
    """
    # Not every callable carries a __name__ (functools.partial objects and
    # instances implementing __call__ do not), so fall back to repr() rather
    # than letting a debug log line abort the actual call.
    func_name = getattr(func, '__name__', repr(func))
    logger.debug(f'running function {func_name}, with parms: {cmd}')
    return func(cmd)


async def concurrent_tasks(func: Callable, cmd_list: List[str]) -> List[Any]:
    """Run ``func`` once per entry of ``cmd_list`` and gather the results.

    One ``run_func`` coroutine is created per command and all of them are
    handed to ``asyncio.gather``.

    NOTE(review): ``run_func`` calls ``func`` synchronously, so the calls
    presumably do not overlap in time - confirm before relying on real
    concurrency here.

    :param func: callable invoked with each command
    :param cmd_list: commands to pass to ``func``, one call per entry
    :return: the values produced by ``asyncio.gather`` for the coroutines
    """
    pending = [run_func(func, cmd) for cmd in cmd_list]
    return await asyncio.gather(*pending)


class CallVerbosity(Enum):
    """Selects the log levels used when logging a command's output.

    Format of the per-member comments below:
    Normal Operation: <log-level-when-no-errors>, Errors: <log-level-when-error>

    NOTE: QUIET log level is custom level only used when --verbose is passed
    """

    # Normal Operation: None, Errors: None
    SILENT = 0
    # Normal Operation: QUIET, Error: QUIET
    QUIET = 1
    # Normal Operation: DEBUG, Error: DEBUG
    DEBUG = 2
    # Normal Operation: QUIET, Error: INFO
    QUIET_UNLESS_ERROR = 3
    # Normal Operation: DEBUG, Error: INFO
    VERBOSE_ON_FAILURE = 4
    # Normal Operation: INFO, Error: INFO
    VERBOSE = 5

    def success_log_level(self) -> int:
        """Return the log level to use when the command did not fail."""
        # The table is built on each call so that QUIET_LOG_LEVEL is only
        # looked up when the method is actually used.
        levels_on_success = {
            CallVerbosity.SILENT: 0,
            CallVerbosity.QUIET: QUIET_LOG_LEVEL,
            CallVerbosity.DEBUG: logging.DEBUG,
            CallVerbosity.QUIET_UNLESS_ERROR: QUIET_LOG_LEVEL,
            CallVerbosity.VERBOSE_ON_FAILURE: logging.DEBUG,
            CallVerbosity.VERBOSE: logging.INFO,
        }
        return levels_on_success[self]

    def error_log_level(self) -> int:
        """Return the log level to use when the command failed."""
        levels_on_error = {
            CallVerbosity.SILENT: 0,
            CallVerbosity.QUIET: QUIET_LOG_LEVEL,
            CallVerbosity.DEBUG: logging.DEBUG,
            CallVerbosity.QUIET_UNLESS_ERROR: logging.INFO,
            CallVerbosity.VERBOSE_ON_FAILURE: logging.INFO,
            CallVerbosity.VERBOSE: logging.INFO,
        }
        return levels_on_error[self]


# Disable coverage for the next block. It is a copy-n-paste
# from other code for compatibility with older python versions.
if sys.version_info < (3, 8):  # pragma: no cover
    import itertools
    import threading
    import warnings
    from asyncio import events

    class ThreadedChildWatcher(asyncio.AbstractChildWatcher):
        """Threaded child watcher implementation.

        The watcher uses a thread per process
        for waiting for the process finish.
        It doesn't require subscription on POSIX signal
        but a thread creation is not free.
        The watcher has O(1) complexity, its performance doesn't depend
        on amount of spawn processes.
        """

        def __init__(self) -> None:
            """Set up the thread-name counter and the pid -> thread table."""
            # Only used to give each waiter thread a unique name.
            self._pid_counter = itertools.count(0)
            # Maps a child pid to the thread currently waiting on it.
            self._threads: Dict[Any, Any] = {}

        def is_active(self) -> bool:
            """Always report the watcher as active."""
            return True

        def close(self) -> None:
            """Close the watcher by joining its joinable threads."""
            self._join_threads()

        def _join_threads(self) -> None:
            """Internal: Join all non-daemon threads"""
            # NOTE(review): add_child_handler() creates its threads with
            # daemon=True, so none of those threads pass this filter.
            threads = [
                thread
                for thread in list(self._threads.values())
                if thread.is_alive() and not thread.daemon
            ]
            for thread in threads:
                thread.join()

        def __enter__(self) -> Any:
            """Return the watcher itself for use in a ``with`` statement."""
            return self

        def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
            """Do nothing on leaving the ``with`` block."""
            pass

        def __del__(self, _warn: Any = warnings.warn) -> None:
            """Warn if waiter threads are still alive when collected."""
            # _warn is bound as a default argument - presumably so the
            # function stays reachable during interpreter shutdown.
            threads = [
                thread
                for thread in list(self._threads.values())
                if thread.is_alive()
            ]
            if threads:
                _warn(
                    f'{self.__class__} has registered but not finished child processes',
                    ResourceWarning,
                    source=self,
                )

        def add_child_handler(
            self, pid: Any, callback: Any, *args: Any
        ) -> None:
            """Start a daemon thread that waits for ``pid`` to finish.

            When the child is reaped, ``callback(pid, returncode, *args)``
            is scheduled on the event loop that was current at the time of
            this call.
            """
            loop = events.get_event_loop()
            thread = threading.Thread(
                target=self._do_waitpid,
                name=f'waitpid-{next(self._pid_counter)}',
                args=(loop, pid, callback, args),
                daemon=True,
            )
            self._threads[pid] = thread
            thread.start()

        def remove_child_handler(self, pid: Any) -> bool:
            """No-op that always returns True."""
            # asyncio never calls remove_child_handler() !!!
            # The method is no-op but is implemented because
            # abstract base class requires it
            return True

        def attach_loop(self, loop: Any) -> None:
            """No-op; the loop is looked up in add_child_handler() instead."""
            pass

        def _do_waitpid(
            self, loop: Any, expected_pid: Any, callback: Any, args: Any
        ) -> None:
            """Thread body: wait for the child and report its return code.

            Blocks in ``os.waitpid`` until ``expected_pid`` finishes, turns
            the wait status into a return code (exit status, or the negated
            signal number if the child was killed by a signal) and schedules
            ``callback(pid, returncode, *args)`` on ``loop`` unless the loop
            has already been closed.
            """
            assert expected_pid > 0

            try:
                pid, status = os.waitpid(expected_pid, 0)
            except ChildProcessError:
                # The child process is already reaped
                # (may happen if waitpid() is called elsewhere).
                pid = expected_pid
                returncode = 255
                logger.warning(
                    'Unknown child process pid %d, will report returncode 255',
                    pid,
                )
            else:
                if os.WIFEXITED(status):
                    returncode = os.WEXITSTATUS(status)
                elif os.WIFSIGNALED(status):
                    returncode = -os.WTERMSIG(status)
                else:
                    # NOTE(review): raising here ends the thread before the
                    # callback is scheduled and before the pid is removed
                    # from self._threads.
                    raise ValueError(f'unknown wait status {status}')
                if loop.get_debug():
                    logger.debug(
                        'process %s exited with returncode %s',
                        expected_pid,
                        returncode,
                    )

            if loop.is_closed():
                logger.warning(
                    'Loop %r that handles pid %r is closed', loop, pid
                )
            else:
                # This runs in the waiter thread, so the callback has to be
                # handed to the loop through its thread-safe entry point.
                loop.call_soon_threadsafe(callback, pid, returncode, *args)

            # The wait is over; drop the bookkeeping entry for this pid.
            self._threads.pop(expected_pid)

    # unlike SafeChildWatcher which handles SIGCHLD in the main thread,
    # ThreadedChildWatcher runs in a separated thread, hence allows us to
    # run create_subprocess_exec() in non-main thread, see
    # https://bugs.python.org/issue35621
    asyncio.set_child_watcher(ThreadedChildWatcher())


# Prefer the standard library's asyncio.run; fall back to a local
# equivalent when the import is not available.
try:
    from asyncio import run as async_run  # type: ignore[attr-defined]
except ImportError:  # pragma: no cover
    # Disable coverage for this block. It should be a copy-n-paste
    # from newer libs for compatibility with older python versions.
    def async_run(coro):  # type: ignore
        """Run ``coro`` to completion on a fresh event loop.

        A new loop is created and installed as the current loop, the
        coroutine is run until it completes and its result is returned.
        Afterwards asynchronous generators are shut down, the current loop
        is reset to None and the loop is closed, even if the coroutine
        raised.
        """
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            return loop.run_until_complete(coro)
        finally:
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                # Always detach and close the loop, even if shutting down
                # the async generators failed.
                asyncio.set_event_loop(None)
                loop.close()


def call(
    ctx: CephadmContext,
    command: List[str],
    desc: Optional[str] = None,
    verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
    timeout: Optional[int] = DEFAULT_TIMEOUT,
    **kwargs: Any,
) -> Tuple[str, str, int]:
    """
    Run an external command through asyncio's subprocess support and

    - log stdout/stderr to a logger,
    - decode utf-8
    - cleanly return out, err, returncode

    If the command does not finish within the timeout it is killed and
    ``('', '', 124)`` is returned.

    :param ctx: context providing the fallback timeout (``ctx.timeout``)
    :param command: program and arguments to execute
    :param desc: prefix for the log lines; defaults to ``command[0]``
    :param verbosity: selects the log level for success and for failure
    :param timeout: timeout in seconds; a falsy value selects ``ctx.timeout``
    :param kwargs: accepted but not used by this function
    :return: tuple of (stdout, stderr, returncode)
    """

    label = desc if desc is not None else command[0]
    if label:
        label += ': '
    limit = timeout or ctx.timeout

    async def _execute() -> Tuple[str, str, int]:
        proc = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=os.environ.copy(),
        )
        assert proc.stdout
        assert proc.stderr
        try:
            raw_out, raw_err = await asyncio.wait_for(
                proc.communicate(),
                limit,
            )
        except asyncio.TimeoutError:
            # try to terminate the process assuming it is still running.  It's
            # possible that even after killing the process it will not
            # complete, particularly if it is D-state.  If that happens the
            # process.wait call will block, but we're no worse off than before
            # when the timeout did not work.  Additionally, there are other
            # corner-cases we could try and handle here but we decided to start
            # simple.
            proc.kill()
            await proc.wait()
            logger.info(f'{label}timeout after {limit} seconds')
            return '', '', 124
        assert proc.returncode is not None
        return (
            raw_out.decode('utf-8'),
            raw_err.decode('utf-8'),
            proc.returncode,
        )

    stdout, stderr, returncode = async_run(_execute())

    # Start from the success level and switch to the error level only when
    # the command reported a failure.
    level = verbosity.success_log_level()
    if returncode != 0:
        level = verbosity.error_log_level()
        logger.log(
            level,
            f'Non-zero exit code {returncode} from {" ".join(command)}',
        )
    for stream_name, text in (('stdout', stdout), ('stderr', stderr)):
        for line in text.splitlines():
            logger.log(level, f'{label}{stream_name} {line}')
    return stdout, stderr, returncode


def call_throws(
    ctx: CephadmContext,
    command: List[str],
    desc: Optional[str] = None,
    verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
    timeout: Optional[int] = DEFAULT_TIMEOUT,
    **kwargs: Any,
) -> Tuple[str, str, int]:
    """
    Run ``command`` via ``call`` and raise if it returns a non-zero code.

    All arguments are forwarded to ``call`` unchanged.

    :return: tuple of (stdout, stderr, returncode) when the code is zero
    :raises RuntimeError: when the return code is non-zero. The message
        names the command and, if stdout or stderr (checked in that order)
        is non-blank and at most two lines long, includes that output.
    """
    out, err, ret = call(ctx, command, desc, verbosity, timeout, **kwargs)
    if not ret:
        return out, err, ret

    cmdline = ' '.join(command)
    for text in (out, err):
        # readable message?
        if text.strip() and len(text.splitlines()) <= 2:
            raise RuntimeError(f'Failed command: {cmdline}: {text}')
    raise RuntimeError(f'Failed command: {cmdline}')


def call_timeout(ctx, command, timeout):
    # type: (CephadmContext, List[str], int) -> int
    """
    Run ``command`` with ``subprocess.call`` and return its exit code.

    The command's output is not captured. ``ctx`` is not used by this
    function.

    :param timeout: timeout in seconds handed to ``subprocess.call``
    :raises TimeoutExpired: the cephadm exception, raised when
        ``subprocess.call`` reports that the timeout expired
    """
    cmdline = ' '.join(command)
    logger.debug(f'Running command (timeout={timeout}): {cmdline}')

    try:
        return subprocess.call(
            command, timeout=timeout, env=os.environ.copy()
        )
    except subprocess.TimeoutExpired:
        # Translate the stdlib exception into the project's own type; the
        # message shows the command in its list form.
        msg = f'Command `{command}` timed out after {timeout} seconds'
        logger.debug(msg)
        raise TimeoutExpired(msg)