diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/util.py')
-rw-r--r-- | test/lib/ansible_test/_internal/util.py | 85 |
1 files changed, 27 insertions, 58 deletions
diff --git a/test/lib/ansible_test/_internal/util.py b/test/lib/ansible_test/_internal/util.py index 72058cb997..2b0218fb38 100644 --- a/test/lib/ansible_test/_internal/util.py +++ b/test/lib/ansible_test/_internal/util.py @@ -45,6 +45,17 @@ except ImportError: from . import types as t +from .encoding import ( + to_bytes, + to_optional_bytes, + to_optional_text, +) + +from .io import ( + open_binary_file, + read_text_file, +) + try: C = t.TypeVar('C') except AttributeError: @@ -95,10 +106,6 @@ MODE_FILE_WRITE = MODE_FILE | stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH MODE_DIRECTORY = MODE_READ | stat.S_IWUSR | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH MODE_DIRECTORY_WRITE = MODE_DIRECTORY | stat.S_IWGRP | stat.S_IWOTH -ENCODING = 'utf-8' - -Text = type(u'') - REMOTE_ONLY_PYTHON_VERSIONS = ( '2.6', ) @@ -113,38 +120,6 @@ SUPPORTED_PYTHON_VERSIONS = ( ) -def to_optional_bytes(value, errors='strict'): # type: (t.Optional[t.AnyStr], str) -> t.Optional[bytes] - """Return the given value as bytes encoded using UTF-8 if not already bytes, or None if the value is None.""" - return None if value is None else to_bytes(value, errors) - - -def to_optional_text(value, errors='strict'): # type: (t.Optional[t.AnyStr], str) -> t.Optional[t.Text] - """Return the given value as text decoded using UTF-8 if not already text, or None if the value is None.""" - return None if value is None else to_text(value, errors) - - -def to_bytes(value, errors='strict'): # type: (t.AnyStr, str) -> bytes - """Return the given value as bytes encoded using UTF-8 if not already bytes.""" - if isinstance(value, bytes): - return value - - if isinstance(value, Text): - return value.encode(ENCODING, errors) - - raise Exception('value is not bytes or text: %s' % type(value)) - - -def to_text(value, errors='strict'): # type: (t.AnyStr, str) -> t.Text - """Return the given value as text decoded using UTF-8 if not already text.""" - if isinstance(value, bytes): - return value.decode(ENCODING, errors) - - if isinstance(value, Text): - return value - - raise Exception('value is not bytes or text: %s' % type(value)) - - def get_docker_completion(): """ :rtype: dict[str, dict[str, str]] @@ -213,8 +188,7 @@ def read_lines_without_comments(path, remove_blank_lines=False, optional=False): if optional and not os.path.exists(path): return [] - with open(path, 'r') as path_fd: - lines = path_fd.read().splitlines() + lines = read_text_file(path).splitlines() lines = [re.sub(r' *#.*$', '', line) for line in lines] @@ -521,17 +495,6 @@ def remove_tree(path): raise -def make_dirs(path): - """ - :type path: str - """ - try: - os.makedirs(to_bytes(path)) - except OSError as ex: - if ex.errno != errno.EEXIST: - raise - - def is_binary_file(path): """ :type path: str @@ -587,7 +550,8 @@ def is_binary_file(path): if ext in assume_binary: return True - with open(path, 'rb') as path_fd: + with open_binary_file(path) as path_fd: + # noinspection PyTypeChecker return b'\0' in path_fd.read(1024) @@ -658,11 +622,15 @@ class Display: for warning in self.warnings: self.__warning(warning) - def warning(self, message, unique=False): + def warning(self, message, unique=False, verbosity=0): """ :type message: str :type unique: bool + :type verbosity: int """ + if verbosity > self.verbosity: + return + if unique: if message in self.warnings_unique: return @@ -839,11 +807,11 @@ def get_subclasses(class_type): # type: (t.Type[C]) -> t.Set[t.Type[C]] def is_subdir(candidate_path, path): # type: (str, str) -> bool """Returns true if candidate_path is path or a subdirectory of path.""" - if not path.endswith(os.sep): - path += os.sep + if not path.endswith(os.path.sep): + path += os.path.sep - if not candidate_path.endswith(os.sep): - candidate_path += os.sep + if not candidate_path.endswith(os.path.sep): + candidate_path += os.path.sep return candidate_path.startswith(path) @@ -874,10 +842,10 @@ def import_plugins(directory, root=None): # type: (str, t.Optional[str]) -> Non path = os.path.join(root, directory) package = __name__.rsplit('.', 1)[0] - prefix = '%s.%s.' % (package, directory.replace(os.sep, '.')) + prefix = '%s.%s.' % (package, directory.replace(os.path.sep, '.')) for (_module_loader, name, _ispkg) in pkgutil.iter_modules([path], prefix=prefix): - module_path = os.path.join(root, name[len(package) + 1:].replace('.', os.sep) + '.py') + module_path = os.path.join(root, name[len(package) + 1:].replace('.', os.path.sep) + '.py') load_module(module_path, name) @@ -912,7 +880,8 @@ def load_module(path, name): # type: (str, str) -> None # noinspection PyDeprecation import imp - with open(path, 'r') as module_file: + # load_source (and thus load_module) require a file opened with `open` in text mode + with open(to_bytes(path)) as module_file: # noinspection PyDeprecation imp.load_module(name, module_file, path, ('.py', 'r', imp.PY_SOURCE)) |