summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatt Davis <6775756+nitzmahone@users.noreply.github.com>2024-09-03 20:23:38 +0200
committerGitHub <noreply@github.com>2024-09-03 20:23:38 +0200
commit24e5b0d4fcd6c1a1eb7f46ef11d35adb9f459b32 (patch)
tree93ac9c8b8480bf53e8d65df4d79d0450b27972b0
parentpsrp - Remove extras lookups (#83760) (diff)
downloadansible-24e5b0d4fcd6c1a1eb7f46ef11d35adb9f459b32.tar.xz
ansible-24e5b0d4fcd6c1a1eb7f46ef11d35adb9f459b32.zip
Add DaemonThreadPoolExecutor impl (#83880)
* Add DaemonThreadPoolExecutor impl * Provide a simple parallel execution method with the ability to abandon timed-out operations that won't block threadpool/process shutdown, and without a dependency on /dev/shm (as multiprocessing Thread/Process pools have). * Create module_utils/_internal to ensure that this is clearly not supported for public consumption.
-rw-r--r--lib/ansible/module_utils/_internal/__init__.py0
-rw-r--r--lib/ansible/module_utils/_internal/_concurrent/__init__.py0
-rw-r--r--lib/ansible/module_utils/_internal/_concurrent/_daemon_threading.py28
-rw-r--r--lib/ansible/module_utils/_internal/_concurrent/_futures.py21
-rw-r--r--test/units/module_utils/_internal/__init__.py0
-rw-r--r--test/units/module_utils/_internal/_concurrent/__init__.py0
-rw-r--r--test/units/module_utils/_internal/_concurrent/test_daemon_threading.py15
-rw-r--r--test/units/module_utils/_internal/_concurrent/test_futures.py62
8 files changed, 126 insertions, 0 deletions
diff --git a/lib/ansible/module_utils/_internal/__init__.py b/lib/ansible/module_utils/_internal/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/lib/ansible/module_utils/_internal/__init__.py
diff --git a/lib/ansible/module_utils/_internal/_concurrent/__init__.py b/lib/ansible/module_utils/_internal/_concurrent/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/lib/ansible/module_utils/_internal/_concurrent/__init__.py
diff --git a/lib/ansible/module_utils/_internal/_concurrent/_daemon_threading.py b/lib/ansible/module_utils/_internal/_concurrent/_daemon_threading.py
new file mode 100644
index 0000000000..0b32a062fe
--- /dev/null
+++ b/lib/ansible/module_utils/_internal/_concurrent/_daemon_threading.py
@@ -0,0 +1,28 @@
+"""Proxy stdlib threading module that only supports non-joinable daemon threads."""
+# NB: all new local module attrs are _ prefixed to ensure an identical public attribute surface area to the module we're proxying
+
+from __future__ import annotations as _annotations
+
+import threading as _threading
+import typing as _t
+
+
+class _DaemonThread(_threading.Thread):
+ """
+ Daemon-only Thread subclass; prevents running threads of this type from blocking interpreter shutdown and process exit.
+ The join() method is a no-op.
+ """
+
+ def __init__(self, *args, daemon: bool | None = None, **kwargs) -> None:
+ super().__init__(*args, daemon=daemon or True, **kwargs)
+
+ def join(self, timeout=None) -> None:
+ """ThreadPoolExecutor's atexit handler joins all queue threads before allowing shutdown; prevent them from blocking."""
+
+
+Thread = _DaemonThread # shadow the real Thread attr with our _DaemonThread
+
+
+def __getattr__(name: str) -> _t.Any:
+ """Delegate anything not defined locally to the real `threading` module."""
+ return getattr(_threading, name)
diff --git a/lib/ansible/module_utils/_internal/_concurrent/_futures.py b/lib/ansible/module_utils/_internal/_concurrent/_futures.py
new file mode 100644
index 0000000000..2ca493f687
--- /dev/null
+++ b/lib/ansible/module_utils/_internal/_concurrent/_futures.py
@@ -0,0 +1,21 @@
+"""Utilities for concurrent code execution using futures."""
+
+from __future__ import annotations
+
+import concurrent.futures
+import types
+
+from . import _daemon_threading
+
+
+class DaemonThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
+ """ThreadPoolExecutor subclass that creates non-joinable daemon threads for non-blocking pool and process shutdown with abandoned threads."""
+
+ atc = concurrent.futures.ThreadPoolExecutor._adjust_thread_count
+
+ # clone the base class `_adjust_thread_count` method with a copy of its globals dict
+ _adjust_thread_count = types.FunctionType(atc.__code__, atc.__globals__.copy(), name=atc.__name__, argdefs=atc.__defaults__, closure=atc.__closure__)
+ # patch the method closure's `threading` module import to use our daemon-only thread factory instead
+ _adjust_thread_count.__globals__.update(threading=_daemon_threading)
+
+ del atc # don't expose this as a class attribute
diff --git a/test/units/module_utils/_internal/__init__.py b/test/units/module_utils/_internal/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/test/units/module_utils/_internal/__init__.py
diff --git a/test/units/module_utils/_internal/_concurrent/__init__.py b/test/units/module_utils/_internal/_concurrent/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/test/units/module_utils/_internal/_concurrent/__init__.py
diff --git a/test/units/module_utils/_internal/_concurrent/test_daemon_threading.py b/test/units/module_utils/_internal/_concurrent/test_daemon_threading.py
new file mode 100644
index 0000000000..4140fae1ae
--- /dev/null
+++ b/test/units/module_utils/_internal/_concurrent/test_daemon_threading.py
@@ -0,0 +1,15 @@
+from __future__ import annotations
+
+import threading
+
+from ansible.module_utils._internal._concurrent import _daemon_threading
+
+
+def test_daemon_thread_getattr() -> None:
+ """Ensure that the threading module proxy delegates properly to the real module."""
+ assert _daemon_threading.current_thread is threading.current_thread
+
+
+def test_daemon_threading_thread_override() -> None:
+ """Ensure that the proxy module's Thread attribute is different from the real module's."""
+ assert _daemon_threading.Thread is not threading.Thread
diff --git a/test/units/module_utils/_internal/_concurrent/test_futures.py b/test/units/module_utils/_internal/_concurrent/test_futures.py
new file mode 100644
index 0000000000..71e032da27
--- /dev/null
+++ b/test/units/module_utils/_internal/_concurrent/test_futures.py
@@ -0,0 +1,62 @@
+from __future__ import annotations
+
+import concurrent.futures as _cf
+import subprocess
+import sys
+import time
+
+import pytest
+
+from ansible.module_utils._internal._concurrent import _futures
+
+
+def test_daemon_thread_pool_nonblocking_cm_exit() -> None:
+ """Ensure that the ThreadPoolExecutor context manager exit is not blocked by in-flight tasks."""
+ with _futures.DaemonThreadPoolExecutor(max_workers=1) as executor:
+ future = executor.submit(time.sleep, 5)
+
+ with pytest.raises(_cf.TimeoutError): # deprecated: description='aliased to stdlib TimeoutError in 3.11' python_version='3.10'
+ future.result(timeout=1)
+
+ assert future.running() # ensure the future is still going (ie, we didn't have to wait for it to return)
+
+
+_task_success_msg = "work completed"
+_process_success_msg = "exit success"
+_timeout_sec = 3
+_sleep_time_sec = _timeout_sec * 2
+
+
+def test_blocking_shutdown() -> None:
+ """Run with the DaemonThreadPoolExecutor patch disabled to verify that shutdown is blocked by in-flight tasks."""
+ with pytest.raises(subprocess.TimeoutExpired):
+ subprocess.run(args=[sys.executable, __file__, 'block'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True, timeout=_timeout_sec)
+
+
+def test_non_blocking_shutdown() -> None:
+ """Run with the DaemonThreadPoolExecutor patch enabled to verify that shutdown is not blocked by in-flight tasks."""
+ cp = subprocess.run(args=[sys.executable, __file__, ''], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True, timeout=_timeout_sec)
+
+ assert _task_success_msg in cp.stdout
+ assert _process_success_msg in cp.stdout
+
+
+def _run_blocking_exit_test(use_patched: bool) -> None: # pragma: nocover
+ """Helper for external process integration test."""
+ tpe_type = _futures.DaemonThreadPoolExecutor if use_patched else _cf.ThreadPoolExecutor
+
+ with tpe_type(max_workers=2) as tp:
+ fs_non_blocking = tp.submit(lambda: print(_task_success_msg))
+ assert [tp.submit(time.sleep, _sleep_time_sec) for _idx in range(4)] # not a pointless statement
+ fs_non_blocking.result(timeout=1)
+
+ print(_process_success_msg)
+
+
+def main() -> None: # pragma: nocover
+ """Used by test_(non)blocking_shutdown as a script-style run."""
+ _run_blocking_exit_test(sys.argv[1] != 'block')
+
+
+if __name__ == '__main__': # pragma: nocover
+ main()