summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatt Martz <matt@sivel.net>2021-03-18 20:12:29 +0100
committerGitHub <noreply@github.com>2021-03-18 20:12:29 +0100
commit78f34786dd468c42d7a222468685590207e74679 (patch)
tree976813b5d508c6c27e841b18a712ff1a0b40c991
parentUpdated COLLECTIONS_4.rst to include ansible-inclusion repo link for new coll... (diff)
downloadansible-78f34786dd468c42d7a222468685590207e74679.tar.xz
ansible-78f34786dd468c42d7a222468685590207e74679.zip
Send callbacks directly from the TaskExecutor instead of TaskResults masquerading as callbacks (#73927)
-rw-r--r--changelogs/fragments/73899-more-te-callbacks.yml5
-rw-r--r--lib/ansible/executor/task_executor.py25
-rw-r--r--lib/ansible/plugins/strategy/__init__.py66
-rw-r--r--test/units/plugins/strategy/test_strategy.py32
4 files changed, 74 insertions, 54 deletions
diff --git a/changelogs/fragments/73899-more-te-callbacks.yml b/changelogs/fragments/73899-more-te-callbacks.yml
new file mode 100644
index 0000000000..4980325980
--- /dev/null
+++ b/changelogs/fragments/73899-more-te-callbacks.yml
@@ -0,0 +1,5 @@
+minor_changes:
+- Callbacks - Migrate more places in the ``TaskExecutor`` to sending callbacks directly
+ over the queue, instead of sending them as ``TaskResult`` and short circuiting in the
+ Strategy to send the callback. This enables closer to real time callbacks of retries
+ and loop results (https://github.com/ansible/ansible/issues/73899)
diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py
index 55db183016..876e8c834b 100644
--- a/lib/ansible/executor/task_executor.py
+++ b/lib/ansible/executor/task_executor.py
@@ -382,12 +382,21 @@ class TaskExecutor:
'msg': 'Failed to template loop_control.label: %s' % to_text(e)
})
- self._final_q.send_task_result(
+ tr = TaskResult(
self._host.name,
self._task._uuid,
res,
task_fields=task_fields,
)
+ if tr.is_failed() or tr.is_unreachable():
+ self._final_q.send_callback('v2_runner_item_on_failed', tr)
+ elif tr.is_skipped():
+ self._final_q.send_callback('v2_runner_item_on_skipped', tr)
+ else:
+ if getattr(self._task, 'diff', False):
+ self._final_q.send_callback('v2_on_file_diff', tr)
+ self._final_q.send_callback('v2_runner_item_on_ok', tr)
+
results.append(res)
del task_vars[loop_var]
@@ -673,7 +682,15 @@ class TaskExecutor:
result['_ansible_retry'] = True
result['retries'] = retries
display.debug('Retrying task, attempt %d of %d' % (attempt, retries))
- self._final_q.send_task_result(self._host.name, self._task._uuid, result, task_fields=self._task.dump_attrs())
+ self._final_q.send_callback(
+ 'v2_runner_retry',
+ TaskResult(
+ self._host.name,
+ self._task._uuid,
+ result,
+ task_fields=self._task.dump_attrs()
+ )
+ )
time.sleep(delay)
self._handler = self._get_action_handler(connection=self._connection, templar=templar)
else:
@@ -782,8 +799,8 @@ class TaskExecutor:
self._final_q.send_callback(
'v2_runner_on_async_poll',
TaskResult(
- self._host,
- async_task,
+ self._host.name,
+ async_task, # We send the full task here, because the controller knows nothing about it, the TE created it
async_result,
task_fields=self._task.dump_attrs(),
),
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index a1377f8863..70dea1ea6f 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -94,8 +94,13 @@ def results_thread_main(strategy):
if isinstance(result, StrategySentinel):
break
elif isinstance(result, CallbackSend):
+ for arg in result.args:
+ if isinstance(arg, TaskResult):
+ strategy.normalize_task_result(arg)
+ break
strategy._tqm.send_callback(result.method_name, *result.args, **result.kwargs)
elif isinstance(result, TaskResult):
+ strategy.normalize_task_result(result)
with strategy._results_lock:
# only handlers have the listen attr, so this must be a handler
# we split up the results into two queues here to make sure
@@ -446,6 +451,31 @@ class StrategyBase:
for target_host in host_list:
_set_host_facts(target_host, always_facts)
+ def normalize_task_result(self, task_result):
+ """Normalize a TaskResult to reference actual Host and Task objects
+ when only given the ``Host.name``, or the ``Task._uuid``
+
+ Only the ``Host.name`` and ``Task._uuid`` are commonly sent back from
+ the ``TaskExecutor`` or ``WorkerProcess`` due to performance concerns
+
+ Mutates the original object
+ """
+
+ if isinstance(task_result._host, string_types):
+ # If the value is a string, it is ``Host.name``
+ task_result._host = self._inventory.get_host(to_text(task_result._host))
+
+ if isinstance(task_result._task, string_types):
+ # If the value is a string, it is ``Task._uuid``
+ queue_cache_entry = (task_result._host.name, task_result._task)
+ found_task = self._queued_task_cache.get(queue_cache_entry)['task']
+ original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
+ original_task._parent = found_task._parent
+ original_task.from_attrs(task_result._task_fields)
+ task_result._task = original_task
+
+ return task_result
+
@debug_closure
def _process_pending_results(self, iterator, one_pass=False, max_passes=None, do_handlers=False):
'''
@@ -456,14 +486,6 @@ class StrategyBase:
ret_results = []
handler_templar = Templar(self._loader)
- def get_original_host(host_name):
- # FIXME: this should not need x2 _inventory
- host_name = to_text(host_name)
- if host_name in self._inventory.hosts:
- return self._inventory.hosts[host_name]
- else:
- return self._inventory.get_host(host_name)
-
def search_handler_blocks_by_name(handler_name, handler_blocks):
# iterate in reversed order since last handler loaded with the same name wins
for handler_block in reversed(handler_blocks):
@@ -512,32 +534,8 @@ class StrategyBase:
finally:
self._results_lock.release()
- # get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
- original_host = get_original_host(task_result._host)
- queue_cache_entry = (original_host.name, task_result._task)
- found_task = self._queued_task_cache.get(queue_cache_entry)['task']
- original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
- original_task._parent = found_task._parent
- original_task.from_attrs(task_result._task_fields)
-
- task_result._host = original_host
- task_result._task = original_task
-
- # send callbacks for 'non final' results
- if '_ansible_retry' in task_result._result:
- self._tqm.send_callback('v2_runner_retry', task_result)
- continue
- elif '_ansible_item_result' in task_result._result:
- if task_result.is_failed() or task_result.is_unreachable():
- self._tqm.send_callback('v2_runner_item_on_failed', task_result)
- elif task_result.is_skipped():
- self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
- else:
- if 'diff' in task_result._result:
- if self._diff or getattr(original_task, 'diff', False):
- self._tqm.send_callback('v2_on_file_diff', task_result)
- self._tqm.send_callback('v2_runner_item_on_ok', task_result)
- continue
+ original_host = task_result._host
+ original_task = task_result._task
# all host status messages contain 2 entries: (msg, task_result)
role_ran = False
diff --git a/test/units/plugins/strategy/test_strategy.py b/test/units/plugins/strategy/test_strategy.py
index 9a2574d279..bfb694f2fb 100644
--- a/test/units/plugins/strategy/test_strategy.py
+++ b/test/units/plugins/strategy/test_strategy.py
@@ -20,7 +20,6 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from units.mock.loader import DictDataLoader
-from copy import deepcopy
import uuid
from units.compat import unittest
@@ -254,7 +253,7 @@ class TestStrategyBase(unittest.TestCase):
mock_task._parent = None
mock_task.ignore_errors = False
mock_task.ignore_unreachable = False
- mock_task._uuid = uuid.uuid4()
+ mock_task._uuid = str(uuid.uuid4())
mock_task.loop = None
mock_task.copy.return_value = mock_task
@@ -319,16 +318,17 @@ class TestStrategyBase(unittest.TestCase):
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
- mock_queued_task_cache = {
- (mock_host.name, mock_task._uuid): {
- 'task': mock_task,
- 'host': mock_host,
- 'task_vars': {},
- 'play_context': {},
+ def mock_queued_task_cache():
+ return {
+ (mock_host.name, mock_task._uuid): {
+ 'task': mock_task,
+ 'host': mock_host,
+ 'task_vars': {},
+ 'play_context': {},
+ }
}
- }
- strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
+ strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
@@ -340,7 +340,7 @@ class TestStrategyBase(unittest.TestCase):
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
mock_iterator.is_failed.return_value = True
- strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
+ strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
@@ -354,7 +354,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
- strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
+ strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
@@ -367,7 +367,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
- strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
+ strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
@@ -377,7 +377,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_host=dict(host_name='newhost01', new_groups=['foo']))))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
- strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
+ strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 0)
@@ -386,7 +386,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_group=dict(group_name='foo'))))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
- strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
+ strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 0)
@@ -395,7 +395,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True, _ansible_notify=['test handler'])))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
- strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
+ strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 0)