diff options
author | Matt Clay <matt@mystile.com> | 2022-08-05 07:29:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-05 07:29:38 +0200 |
commit | 85acf4d1e55e95c266a35c49f74af3c0f251de08 (patch) | |
tree | e30d4ad68ac64f342f0c4371c1d76e8b776419b4 /test/lib/ansible_test/_internal/util.py | |
parent | ansible-test - More type hint updates. (#78455) (diff) | |
download | ansible-85acf4d1e55e95c266a35c49f74af3c0f251de08.tar.xz ansible-85acf4d1e55e95c266a35c49f74af3c0f251de08.zip |
ansible-test - Avoid use of deprecated type hints. (#78456)
* ansible-test - Avoid use of deprecated type hints.
PEP 585 deprecated many container types in the `typing` module in favor of the actual types, which support subscripting as of Python 3.9.
Conversion of `t.Type` was skipped since PyCharm does not currently recognize it.
* ansible-test - Fix `t` and `c` imports/shadowing.
Diffstat (limited to 'test/lib/ansible_test/_internal/util.py')
-rw-r--r-- | test/lib/ansible_test/_internal/util.py | 65 |
1 files changed, 33 insertions, 32 deletions
diff --git a/test/lib/ansible_test/_internal/util.py b/test/lib/ansible_test/_internal/util.py index 5b1149fe17..42cf77a66b 100644 --- a/test/lib/ansible_test/_internal/util.py +++ b/test/lib/ansible_test/_internal/util.py @@ -2,6 +2,7 @@ from __future__ import annotations import abc +import collections.abc as c import errno import enum import fcntl @@ -62,7 +63,7 @@ TBase = t.TypeVar('TBase') TKey = t.TypeVar('TKey') TValue = t.TypeVar('TValue') -PYTHON_PATHS: t.Dict[str, str] = {} +PYTHON_PATHS: dict[str, str] = {} COVERAGE_CONFIG_NAME = 'coveragerc' @@ -141,9 +142,9 @@ def is_valid_identifier(value: str) -> bool: return value.isidentifier() and not keyword.iskeyword(value) -def cache(func: t.Callable[[], TValue]) -> t.Callable[[], TValue]: +def cache(func: c.Callable[[], TValue]) -> c.Callable[[], TValue]: """Enforce exclusive access on a decorated function and cache the result.""" - storage: t.Dict[None, TValue] = {} + storage: dict[None, TValue] = {} sentinel = object() @functools.wraps(func) @@ -162,7 +163,7 @@ def cache(func: t.Callable[[], TValue]) -> t.Callable[[], TValue]: @mutex def detect_architecture(python: str) -> t.Optional[str]: """Detect the architecture of the specified Python and return a normalized version, or None if it cannot be determined.""" - results: t.Dict[str, t.Optional[str]] + results: dict[str, t.Optional[str]] try: results = detect_architecture.results # type: ignore[attr-defined] @@ -211,7 +212,7 @@ def detect_architecture(python: str) -> t.Optional[str]: return architecture -def filter_args(args: t.List[str], filters: t.Dict[str, int]) -> t.List[str]: +def filter_args(args: list[str], filters: dict[str, int]) -> list[str]: """Return a filtered version of the given command line arguments.""" remaining = 0 result = [] @@ -235,7 +236,7 @@ def filter_args(args: t.List[str], filters: t.Dict[str, int]) -> t.List[str]: return result -def read_lines_without_comments(path: str, remove_blank_lines: bool = False, optional: bool = False) -> t.List[str]: +def read_lines_without_comments(path: str, remove_blank_lines: bool = False, optional: bool = False) -> list[str]: """ Returns lines from the specified text file with comments removed. Comments are any content from a hash symbol to the end of a line. @@ -254,7 +255,7 @@ def read_lines_without_comments(path: str, remove_blank_lines: bool = False, opt return lines -def exclude_none_values(data: t.Dict[TKey, t.Optional[TValue]]) -> t.Dict[TKey, TValue]: +def exclude_none_values(data: dict[TKey, t.Optional[TValue]]) -> dict[TKey, TValue]: """Return the provided dictionary with any None values excluded.""" return dict((key, value) for key, value in data.items() if value is not None) @@ -339,15 +340,15 @@ def get_ansible_version() -> str: @cache -def get_available_python_versions() -> t.Dict[str, str]: +def get_available_python_versions() -> dict[str, str]: """Return a dictionary indicating which supported Python versions are available.""" return dict((version, path) for version, path in ((version, find_python(version, required=False)) for version in SUPPORTED_PYTHON_VERSIONS) if path) def raw_command( - cmd: t.Iterable[str], + cmd: c.Iterable[str], capture: bool, - env: t.Optional[t.Dict[str, str]] = None, + env: t.Optional[dict[str, str]] = None, data: t.Optional[str] = None, cwd: t.Optional[str] = None, explain: bool = False, @@ -357,8 +358,8 @@ def raw_command( output_stream: t.Optional[OutputStream] = None, cmd_verbosity: int = 1, str_errors: str = 'strict', - error_callback: t.Optional[t.Callable[[SubprocessError], None]] = None, -) -> t.Tuple[t.Optional[str], t.Optional[str]]: + error_callback: t.Optional[c.Callable[[SubprocessError], None]] = None, +) -> tuple[t.Optional[str], t.Optional[str]]: """Run the specified command and return stdout and stderr as a tuple.""" output_stream = output_stream or OutputStream.AUTO @@ -463,7 +464,7 @@ def raw_command( try: try: - cmd_bytes = [to_bytes(c) for c in cmd] + cmd_bytes = [to_bytes(arg) for arg in cmd] env_bytes = dict((to_bytes(k), to_bytes(v)) for k, v in env.items()) process = subprocess.Popen(cmd_bytes, env=env_bytes, stdin=stdin, stdout=stdout, stderr=stderr, cwd=cwd) # pylint: disable=consider-using-with except OSError as ex: @@ -504,9 +505,9 @@ def communicate_with_process( stderr: bool, capture: bool, output_stream: OutputStream, -) -> t.Tuple[bytes, bytes]: +) -> tuple[bytes, bytes]: """Communicate with the specified process, handling stdin/stdout/stderr as requested.""" - threads: t.List[WrappedThread] = [] + threads: list[WrappedThread] = [] reader: t.Type[ReaderThread] if capture: @@ -577,7 +578,7 @@ class ReaderThread(WrappedThread, metaclass=abc.ABCMeta): self.handle = handle self.buffer = buffer - self.lines: t.List[bytes] = [] + self.lines: list[bytes] = [] @abc.abstractmethod def _run(self) -> None: @@ -665,7 +666,7 @@ def report_locale() -> None: display.warning(LOCALE_WARNING) -def pass_vars(required: t.Collection[str], optional: t.Collection[str]) -> t.Dict[str, str]: +def pass_vars(required: c.Collection[str], optional: c.Collection[str]) -> dict[str, str]: """Return a filtered dictionary of environment variables based on the current environment.""" env = {} @@ -895,12 +896,12 @@ class SubprocessError(ApplicationError): """Error resulting from failed subprocess execution.""" def __init__( self, - cmd: t.List[str], + cmd: list[str], status: int = 0, stdout: t.Optional[str] = None, stderr: t.Optional[str] = None, runtime: t.Optional[float] = None, - error_callback: t.Optional[t.Callable[[SubprocessError], None]] = None, + error_callback: t.Optional[c.Callable[[SubprocessError], None]] = None, ) -> None: message = 'Command "%s" returned exit status %s.\n' % (shlex.join(cmd), status) @@ -949,7 +950,7 @@ def retry(func, ex_type=SubprocessError, sleep=10, attempts=10, warn=True): return func() -def parse_to_list_of_dict(pattern: str, value: str) -> t.List[t.Dict[str, str]]: +def parse_to_list_of_dict(pattern: str, value: str) -> list[dict[str, str]]: """Parse lines from the given value using the specified pattern and return the extracted list of key/value pair dictionaries.""" matched = [] unmatched = [] @@ -968,10 +969,10 @@ def parse_to_list_of_dict(pattern: str, value: str) -> t.List[t.Dict[str, str]]: return matched -def get_subclasses(class_type: t.Type[C]) -> t.List[t.Type[C]]: +def get_subclasses(class_type: t.Type[C]) -> list[t.Type[C]]: """Returns a list of types that are concrete subclasses of the given type.""" - subclasses: t.Set[t.Type[C]] = set() - queue: t.List[t.Type[C]] = [class_type] + subclasses: set[t.Type[C]] = set() + queue: list[t.Type[C]] = [class_type] while queue: parent = queue.pop() @@ -996,7 +997,7 @@ def is_subdir(candidate_path: str, path: str) -> bool: return candidate_path.startswith(path) -def paths_to_dirs(paths: t.List[str]) -> t.List[str]: +def paths_to_dirs(paths: list[str]) -> list[str]: """Returns a list of directories extracted from the given list of paths.""" dir_names = set() @@ -1012,17 +1013,17 @@ def paths_to_dirs(paths: t.List[str]) -> t.List[str]: return sorted(dir_names) -def str_to_version(version: str) -> t.Tuple[int, ...]: +def str_to_version(version: str) -> tuple[int, ...]: """Return a version tuple from a version string.""" return tuple(int(n) for n in version.split('.')) -def version_to_str(version: t.Tuple[int, ...]) -> str: +def version_to_str(version: tuple[int, ...]) -> str: """Return a version string from a version tuple.""" return '.'.join(str(n) for n in version) -def sorted_versions(versions: t.List[str]) -> t.List[str]: +def sorted_versions(versions: list[str]) -> list[str]: """Return a sorted copy of the given list of versions.""" return [version_to_str(version) for version in sorted(str_to_version(version) for version in versions)] @@ -1044,12 +1045,12 @@ def import_plugins(directory: str, root: t.Optional[str] = None) -> None: load_module(module_path, name) -def load_plugins(base_type: t.Type[C], database: t.Dict[str, t.Type[C]]) -> None: +def load_plugins(base_type: t.Type[C], database: dict[str, t.Type[C]]) -> None: """ Load plugins of the specified type and track them in the specified database. Only plugins which have already been imported will be loaded. """ - plugins: t.Dict[str, t.Type[C]] = dict((sc.__module__.rsplit('.', 1)[1], sc) for sc in get_subclasses(base_type)) + plugins: dict[str, t.Type[C]] = dict((sc.__module__.rsplit('.', 1)[1], sc) for sc in get_subclasses(base_type)) for plugin in plugins: database[plugin] = plugins[plugin] @@ -1078,12 +1079,12 @@ def get_generic_type(base_type: t.Type, generic_base_type: t.Type[TValue]) -> t. return None if isinstance(type_arg, generic_base_type) else type_arg -def get_type_associations(base_type: t.Type[TBase], generic_base_type: t.Type[TValue]) -> t.List[t.Tuple[t.Type[TValue], t.Type[TBase]]]: +def get_type_associations(base_type: t.Type[TBase], generic_base_type: t.Type[TValue]) -> list[tuple[t.Type[TValue], t.Type[TBase]]]: """Create and return a list of tuples associating generic_base_type derived types with a corresponding base_type derived type.""" return [item for item in [(get_generic_type(sc_type, generic_base_type), sc_type) for sc_type in get_subclasses(base_type)] if item[1]] -def get_type_map(base_type: t.Type[TBase], generic_base_type: t.Type[TValue]) -> t.Dict[t.Type[TValue], t.Type[TBase]]: +def get_type_map(base_type: t.Type[TBase], generic_base_type: t.Type[TValue]) -> dict[t.Type[TValue], t.Type[TBase]]: """Create and return a mapping of generic_base_type derived types to base_type derived types.""" return {item[0]: item[1] for item in get_type_associations(base_type, generic_base_type)} @@ -1104,7 +1105,7 @@ def verify_sys_executable(path: str) -> t.Optional[str]: return expected_executable -def type_guard(sequence: t.Sequence[t.Any], guard_type: t.Type[C]) -> TypeGuard[t.Sequence[C]]: +def type_guard(sequence: c.Sequence[t.Any], guard_type: t.Type[C]) -> TypeGuard[c.Sequence[C]]: """ Raises an exception if any item in the given sequence does not match the specified guard type. Use with assert so that type checkers are aware of the type guard. |