summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/util.py')
-rw-r--r--test/lib/ansible_test/_internal/util.py85
1 files changed, 27 insertions, 58 deletions
diff --git a/test/lib/ansible_test/_internal/util.py b/test/lib/ansible_test/_internal/util.py
index 72058cb997..2b0218fb38 100644
--- a/test/lib/ansible_test/_internal/util.py
+++ b/test/lib/ansible_test/_internal/util.py
@@ -45,6 +45,17 @@ except ImportError:
from . import types as t
+from .encoding import (
+ to_bytes,
+ to_optional_bytes,
+ to_optional_text,
+)
+
+from .io import (
+ open_binary_file,
+ read_text_file,
+)
+
try:
C = t.TypeVar('C')
except AttributeError:
@@ -95,10 +106,6 @@ MODE_FILE_WRITE = MODE_FILE | stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
MODE_DIRECTORY = MODE_READ | stat.S_IWUSR | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
MODE_DIRECTORY_WRITE = MODE_DIRECTORY | stat.S_IWGRP | stat.S_IWOTH
-ENCODING = 'utf-8'
-
-Text = type(u'')
-
REMOTE_ONLY_PYTHON_VERSIONS = (
'2.6',
)
@@ -113,38 +120,6 @@ SUPPORTED_PYTHON_VERSIONS = (
)
-def to_optional_bytes(value, errors='strict'): # type: (t.Optional[t.AnyStr], str) -> t.Optional[bytes]
- """Return the given value as bytes encoded using UTF-8 if not already bytes, or None if the value is None."""
- return None if value is None else to_bytes(value, errors)
-
-
-def to_optional_text(value, errors='strict'): # type: (t.Optional[t.AnyStr], str) -> t.Optional[t.Text]
- """Return the given value as text decoded using UTF-8 if not already text, or None if the value is None."""
- return None if value is None else to_text(value, errors)
-
-
-def to_bytes(value, errors='strict'): # type: (t.AnyStr, str) -> bytes
- """Return the given value as bytes encoded using UTF-8 if not already bytes."""
- if isinstance(value, bytes):
- return value
-
- if isinstance(value, Text):
- return value.encode(ENCODING, errors)
-
- raise Exception('value is not bytes or text: %s' % type(value))
-
-
-def to_text(value, errors='strict'): # type: (t.AnyStr, str) -> t.Text
- """Return the given value as text decoded using UTF-8 if not already text."""
- if isinstance(value, bytes):
- return value.decode(ENCODING, errors)
-
- if isinstance(value, Text):
- return value
-
- raise Exception('value is not bytes or text: %s' % type(value))
-
-
def get_docker_completion():
"""
:rtype: dict[str, dict[str, str]]
@@ -213,8 +188,7 @@ def read_lines_without_comments(path, remove_blank_lines=False, optional=False):
if optional and not os.path.exists(path):
return []
- with open(path, 'r') as path_fd:
- lines = path_fd.read().splitlines()
+ lines = read_text_file(path).splitlines()
lines = [re.sub(r' *#.*$', '', line) for line in lines]
@@ -521,17 +495,6 @@ def remove_tree(path):
raise
-def make_dirs(path):
- """
- :type path: str
- """
- try:
- os.makedirs(to_bytes(path))
- except OSError as ex:
- if ex.errno != errno.EEXIST:
- raise
-
-
def is_binary_file(path):
"""
:type path: str
@@ -587,7 +550,8 @@ def is_binary_file(path):
if ext in assume_binary:
return True
- with open(path, 'rb') as path_fd:
+ with open_binary_file(path) as path_fd:
+ # noinspection PyTypeChecker
return b'\0' in path_fd.read(1024)
@@ -658,11 +622,15 @@ class Display:
for warning in self.warnings:
self.__warning(warning)
- def warning(self, message, unique=False):
+ def warning(self, message, unique=False, verbosity=0):
"""
:type message: str
:type unique: bool
+ :type verbosity: int
"""
+ if verbosity > self.verbosity:
+ return
+
if unique:
if message in self.warnings_unique:
return
@@ -839,11 +807,11 @@ def get_subclasses(class_type): # type: (t.Type[C]) -> t.Set[t.Type[C]]
def is_subdir(candidate_path, path): # type: (str, str) -> bool
"""Returns true if candidate_path is path or a subdirectory of path."""
- if not path.endswith(os.sep):
- path += os.sep
+ if not path.endswith(os.path.sep):
+ path += os.path.sep
- if not candidate_path.endswith(os.sep):
- candidate_path += os.sep
+ if not candidate_path.endswith(os.path.sep):
+ candidate_path += os.path.sep
return candidate_path.startswith(path)
@@ -874,10 +842,10 @@ def import_plugins(directory, root=None): # type: (str, t.Optional[str]) -> Non
path = os.path.join(root, directory)
package = __name__.rsplit('.', 1)[0]
- prefix = '%s.%s.' % (package, directory.replace(os.sep, '.'))
+ prefix = '%s.%s.' % (package, directory.replace(os.path.sep, '.'))
for (_module_loader, name, _ispkg) in pkgutil.iter_modules([path], prefix=prefix):
- module_path = os.path.join(root, name[len(package) + 1:].replace('.', os.sep) + '.py')
+ module_path = os.path.join(root, name[len(package) + 1:].replace('.', os.path.sep) + '.py')
load_module(module_path, name)
@@ -912,7 +880,8 @@ def load_module(path, name): # type: (str, str) -> None
# noinspection PyDeprecation
import imp
- with open(path, 'r') as module_file:
+ # load_source (and thus load_module) require a file opened with `open` in text mode
+ with open(to_bytes(path)) as module_file:
# noinspection PyDeprecation
imp.load_module(name, module_file, path, ('.py', 'r', imp.PY_SOURCE))