summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/util.py')
-rw-r--r--test/lib/ansible_test/_internal/util.py65
1 files changed, 33 insertions, 32 deletions
diff --git a/test/lib/ansible_test/_internal/util.py b/test/lib/ansible_test/_internal/util.py
index 5b1149fe17..42cf77a66b 100644
--- a/test/lib/ansible_test/_internal/util.py
+++ b/test/lib/ansible_test/_internal/util.py
@@ -2,6 +2,7 @@
from __future__ import annotations
import abc
+import collections.abc as c
import errno
import enum
import fcntl
@@ -62,7 +63,7 @@ TBase = t.TypeVar('TBase')
TKey = t.TypeVar('TKey')
TValue = t.TypeVar('TValue')
-PYTHON_PATHS: t.Dict[str, str] = {}
+PYTHON_PATHS: dict[str, str] = {}
COVERAGE_CONFIG_NAME = 'coveragerc'
@@ -141,9 +142,9 @@ def is_valid_identifier(value: str) -> bool:
return value.isidentifier() and not keyword.iskeyword(value)
-def cache(func: t.Callable[[], TValue]) -> t.Callable[[], TValue]:
+def cache(func: c.Callable[[], TValue]) -> c.Callable[[], TValue]:
"""Enforce exclusive access on a decorated function and cache the result."""
- storage: t.Dict[None, TValue] = {}
+ storage: dict[None, TValue] = {}
sentinel = object()
@functools.wraps(func)
@@ -162,7 +163,7 @@ def cache(func: t.Callable[[], TValue]) -> t.Callable[[], TValue]:
@mutex
def detect_architecture(python: str) -> t.Optional[str]:
"""Detect the architecture of the specified Python and return a normalized version, or None if it cannot be determined."""
- results: t.Dict[str, t.Optional[str]]
+ results: dict[str, t.Optional[str]]
try:
results = detect_architecture.results # type: ignore[attr-defined]
@@ -211,7 +212,7 @@ def detect_architecture(python: str) -> t.Optional[str]:
return architecture
-def filter_args(args: t.List[str], filters: t.Dict[str, int]) -> t.List[str]:
+def filter_args(args: list[str], filters: dict[str, int]) -> list[str]:
"""Return a filtered version of the given command line arguments."""
remaining = 0
result = []
@@ -235,7 +236,7 @@ def filter_args(args: t.List[str], filters: t.Dict[str, int]) -> t.List[str]:
return result
-def read_lines_without_comments(path: str, remove_blank_lines: bool = False, optional: bool = False) -> t.List[str]:
+def read_lines_without_comments(path: str, remove_blank_lines: bool = False, optional: bool = False) -> list[str]:
"""
Returns lines from the specified text file with comments removed.
Comments are any content from a hash symbol to the end of a line.
@@ -254,7 +255,7 @@ def read_lines_without_comments(path: str, remove_blank_lines: bool = False, opt
return lines
-def exclude_none_values(data: t.Dict[TKey, t.Optional[TValue]]) -> t.Dict[TKey, TValue]:
+def exclude_none_values(data: dict[TKey, t.Optional[TValue]]) -> dict[TKey, TValue]:
"""Return the provided dictionary with any None values excluded."""
return dict((key, value) for key, value in data.items() if value is not None)
@@ -339,15 +340,15 @@ def get_ansible_version() -> str:
@cache
-def get_available_python_versions() -> t.Dict[str, str]:
+def get_available_python_versions() -> dict[str, str]:
"""Return a dictionary indicating which supported Python versions are available."""
return dict((version, path) for version, path in ((version, find_python(version, required=False)) for version in SUPPORTED_PYTHON_VERSIONS) if path)
def raw_command(
- cmd: t.Iterable[str],
+ cmd: c.Iterable[str],
capture: bool,
- env: t.Optional[t.Dict[str, str]] = None,
+ env: t.Optional[dict[str, str]] = None,
data: t.Optional[str] = None,
cwd: t.Optional[str] = None,
explain: bool = False,
@@ -357,8 +358,8 @@ def raw_command(
output_stream: t.Optional[OutputStream] = None,
cmd_verbosity: int = 1,
str_errors: str = 'strict',
- error_callback: t.Optional[t.Callable[[SubprocessError], None]] = None,
-) -> t.Tuple[t.Optional[str], t.Optional[str]]:
+ error_callback: t.Optional[c.Callable[[SubprocessError], None]] = None,
+) -> tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command and return stdout and stderr as a tuple."""
output_stream = output_stream or OutputStream.AUTO
@@ -463,7 +464,7 @@ def raw_command(
try:
try:
- cmd_bytes = [to_bytes(c) for c in cmd]
+ cmd_bytes = [to_bytes(arg) for arg in cmd]
env_bytes = dict((to_bytes(k), to_bytes(v)) for k, v in env.items())
process = subprocess.Popen(cmd_bytes, env=env_bytes, stdin=stdin, stdout=stdout, stderr=stderr, cwd=cwd) # pylint: disable=consider-using-with
except OSError as ex:
@@ -504,9 +505,9 @@ def communicate_with_process(
stderr: bool,
capture: bool,
output_stream: OutputStream,
-) -> t.Tuple[bytes, bytes]:
+) -> tuple[bytes, bytes]:
"""Communicate with the specified process, handling stdin/stdout/stderr as requested."""
- threads: t.List[WrappedThread] = []
+ threads: list[WrappedThread] = []
reader: t.Type[ReaderThread]
if capture:
@@ -577,7 +578,7 @@ class ReaderThread(WrappedThread, metaclass=abc.ABCMeta):
self.handle = handle
self.buffer = buffer
- self.lines: t.List[bytes] = []
+ self.lines: list[bytes] = []
@abc.abstractmethod
def _run(self) -> None:
@@ -665,7 +666,7 @@ def report_locale() -> None:
display.warning(LOCALE_WARNING)
-def pass_vars(required: t.Collection[str], optional: t.Collection[str]) -> t.Dict[str, str]:
+def pass_vars(required: c.Collection[str], optional: c.Collection[str]) -> dict[str, str]:
"""Return a filtered dictionary of environment variables based on the current environment."""
env = {}
@@ -895,12 +896,12 @@ class SubprocessError(ApplicationError):
"""Error resulting from failed subprocess execution."""
def __init__(
self,
- cmd: t.List[str],
+ cmd: list[str],
status: int = 0,
stdout: t.Optional[str] = None,
stderr: t.Optional[str] = None,
runtime: t.Optional[float] = None,
- error_callback: t.Optional[t.Callable[[SubprocessError], None]] = None,
+ error_callback: t.Optional[c.Callable[[SubprocessError], None]] = None,
) -> None:
message = 'Command "%s" returned exit status %s.\n' % (shlex.join(cmd), status)
@@ -949,7 +950,7 @@ def retry(func, ex_type=SubprocessError, sleep=10, attempts=10, warn=True):
return func()
-def parse_to_list_of_dict(pattern: str, value: str) -> t.List[t.Dict[str, str]]:
+def parse_to_list_of_dict(pattern: str, value: str) -> list[dict[str, str]]:
"""Parse lines from the given value using the specified pattern and return the extracted list of key/value pair dictionaries."""
matched = []
unmatched = []
@@ -968,10 +969,10 @@ def parse_to_list_of_dict(pattern: str, value: str) -> t.List[t.Dict[str, str]]:
return matched
-def get_subclasses(class_type: t.Type[C]) -> t.List[t.Type[C]]:
+def get_subclasses(class_type: t.Type[C]) -> list[t.Type[C]]:
"""Returns a list of types that are concrete subclasses of the given type."""
- subclasses: t.Set[t.Type[C]] = set()
- queue: t.List[t.Type[C]] = [class_type]
+ subclasses: set[t.Type[C]] = set()
+ queue: list[t.Type[C]] = [class_type]
while queue:
parent = queue.pop()
@@ -996,7 +997,7 @@ def is_subdir(candidate_path: str, path: str) -> bool:
return candidate_path.startswith(path)
-def paths_to_dirs(paths: t.List[str]) -> t.List[str]:
+def paths_to_dirs(paths: list[str]) -> list[str]:
"""Returns a list of directories extracted from the given list of paths."""
dir_names = set()
@@ -1012,17 +1013,17 @@ def paths_to_dirs(paths: t.List[str]) -> t.List[str]:
return sorted(dir_names)
-def str_to_version(version: str) -> t.Tuple[int, ...]:
+def str_to_version(version: str) -> tuple[int, ...]:
"""Return a version tuple from a version string."""
return tuple(int(n) for n in version.split('.'))
-def version_to_str(version: t.Tuple[int, ...]) -> str:
+def version_to_str(version: tuple[int, ...]) -> str:
"""Return a version string from a version tuple."""
return '.'.join(str(n) for n in version)
-def sorted_versions(versions: t.List[str]) -> t.List[str]:
+def sorted_versions(versions: list[str]) -> list[str]:
"""Return a sorted copy of the given list of versions."""
return [version_to_str(version) for version in sorted(str_to_version(version) for version in versions)]
@@ -1044,12 +1045,12 @@ def import_plugins(directory: str, root: t.Optional[str] = None) -> None:
load_module(module_path, name)
-def load_plugins(base_type: t.Type[C], database: t.Dict[str, t.Type[C]]) -> None:
+def load_plugins(base_type: t.Type[C], database: dict[str, t.Type[C]]) -> None:
"""
Load plugins of the specified type and track them in the specified database.
Only plugins which have already been imported will be loaded.
"""
- plugins: t.Dict[str, t.Type[C]] = dict((sc.__module__.rsplit('.', 1)[1], sc) for sc in get_subclasses(base_type))
+ plugins: dict[str, t.Type[C]] = dict((sc.__module__.rsplit('.', 1)[1], sc) for sc in get_subclasses(base_type))
for plugin in plugins:
database[plugin] = plugins[plugin]
@@ -1078,12 +1079,12 @@ def get_generic_type(base_type: t.Type, generic_base_type: t.Type[TValue]) -> t.
return None if isinstance(type_arg, generic_base_type) else type_arg
-def get_type_associations(base_type: t.Type[TBase], generic_base_type: t.Type[TValue]) -> t.List[t.Tuple[t.Type[TValue], t.Type[TBase]]]:
+def get_type_associations(base_type: t.Type[TBase], generic_base_type: t.Type[TValue]) -> list[tuple[t.Type[TValue], t.Type[TBase]]]:
"""Create and return a list of tuples associating generic_base_type derived types with a corresponding base_type derived type."""
return [item for item in [(get_generic_type(sc_type, generic_base_type), sc_type) for sc_type in get_subclasses(base_type)] if item[1]]
-def get_type_map(base_type: t.Type[TBase], generic_base_type: t.Type[TValue]) -> t.Dict[t.Type[TValue], t.Type[TBase]]:
+def get_type_map(base_type: t.Type[TBase], generic_base_type: t.Type[TValue]) -> dict[t.Type[TValue], t.Type[TBase]]:
"""Create and return a mapping of generic_base_type derived types to base_type derived types."""
return {item[0]: item[1] for item in get_type_associations(base_type, generic_base_type)}
@@ -1104,7 +1105,7 @@ def verify_sys_executable(path: str) -> t.Optional[str]:
return expected_executable
-def type_guard(sequence: t.Sequence[t.Any], guard_type: t.Type[C]) -> TypeGuard[t.Sequence[C]]:
+def type_guard(sequence: c.Sequence[t.Any], guard_type: t.Type[C]) -> TypeGuard[c.Sequence[C]]:
"""
Raises an exception if any item in the given sequence does not match the specified guard type.
Use with assert so that type checkers are aware of the type guard.