author     Alan Rominger <arominge@redhat.com>    2022-09-06 21:45:29 +0200
committer  GitHub <noreply@github.com>            2022-09-06 21:45:29 +0200
commit     15964dc3959472950db23ed6463c7f4e1978192c (patch)
tree       f63e6acd5696a8355ee81affd22104dc5c95d9ca
parent     clear output follow mode flag on search (#12791) (diff)
parent     Add project sync to job cancel chain (diff)
download   awx-15964dc3959472950db23ed6463c7f4e1978192c.tar.xz
           awx-15964dc3959472950db23ed6463c7f4e1978192c.zip
Merge pull request #11745 from AlanCoding/cancel_rework_no_close
Close database connections while processing job output
 awx/main/dispatch/control.py                         | 10
 awx/main/dispatch/worker/base.py                     | 13
 awx/main/management/commands/run_dispatcher.py       | 22
 awx/main/models/mixins.py                            |  5
 awx/main/models/unified_jobs.py                      | 69
 awx/main/models/workflow.py                          |  5
 awx/main/tasks/callback.py                           | 27
 awx/main/tasks/jobs.py                               |  5
 awx/main/tasks/receptor.py                           | 48
 awx/main/tasks/signals.py                            |  8
 awx/main/tests/functional/test_dispatch.py           |  2
 awx/main/tests/unit/models/test_unified_job_unit.py  | 13
 awx/main/utils/handlers.py                           |  2
 docs/ansible_runner_integration.md                   |  2
14 files changed, 141 insertions, 90 deletions
diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py
index cb7fabda94..e6647170da 100644
--- a/awx/main/dispatch/control.py
+++ b/awx/main/dispatch/control.py
@@ -37,18 +37,24 @@ class Control(object):
     def running(self, *args, **kwargs):
         return self.control_with_reply('running', *args, **kwargs)
 
+    def cancel(self, task_ids, *args, **kwargs):
+        return self.control_with_reply('cancel', *args, extra_data={'task_ids': task_ids}, **kwargs)
+
     @classmethod
     def generate_reply_queue_name(cls):
         return f"reply_to_{str(uuid.uuid4()).replace('-','_')}"
 
-    def control_with_reply(self, command, timeout=5):
+    def control_with_reply(self, command, timeout=5, extra_data=None):
         logger.warning('checking {} {} for {}'.format(self.service, command, self.queuename))
         reply_queue = Control.generate_reply_queue_name()
         self.result = None
 
         with pg_bus_conn(new_connection=True) as conn:
             conn.listen(reply_queue)
-            conn.notify(self.queuename, json.dumps({'control': command, 'reply_to': reply_queue}))
+            send_data = {'control': command, 'reply_to': reply_queue}
+            if extra_data:
+                send_data.update(extra_data)
+            conn.notify(self.queuename, json.dumps(send_data))
 
             for reply in conn.events(select_timeout=timeout, yield_timeouts=True):
                 if reply is None:
diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py
index b30f7ec17a..ff05d9e846 100644
--- a/awx/main/dispatch/worker/base.py
+++ b/awx/main/dispatch/worker/base.py
@@ -63,7 +63,7 @@ class AWXConsumerBase(object):
     def control(self, body):
         logger.warning(f'Received control signal:\n{body}')
         control = body.get('control')
-        if control in ('status', 'running'):
+        if control in ('status', 'running', 'cancel'):
             reply_queue = body['reply_to']
             if control == 'status':
                 msg = '\n'.join([self.listening_on, self.pool.debug()])
@@ -72,6 +72,17 @@ class AWXConsumerBase(object):
                 for worker in self.pool.workers:
                     worker.calculate_managed_tasks()
                     msg.extend(worker.managed_tasks.keys())
+            elif control == 'cancel':
+                msg = []
+                task_ids = set(body['task_ids'])
+                for worker in self.pool.workers:
+                    task = worker.current_task
+                    if task and task['uuid'] in task_ids:
+                        logger.warn(f'Sending SIGTERM to task id={task["uuid"]}, task={task.get("task")}, args={task.get("args")}')
+                        os.kill(worker.pid, signal.SIGTERM)
+                        msg.append(task['uuid'])
+                if task_ids and not msg:
+                    logger.info(f'Could not locate running tasks to cancel with ids={task_ids}')
 
             with pg_bus_conn() as conn:
                 conn.notify(reply_queue, json.dumps(msg))
diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py
index 2fc35a75d2..74a8ac3f94 100644
--- a/awx/main/management/commands/run_dispatcher.py
+++ b/awx/main/management/commands/run_dispatcher.py
@@ -1,6 +1,7 @@
 # Copyright (c) 2015 Ansible, Inc.
 # All Rights Reserved.
 import logging
+import yaml
 
 from django.conf import settings
 from django.core.cache import cache as django_cache
@@ -30,7 +31,16 @@ class Command(BaseCommand):
             '--reload',
             dest='reload',
             action='store_true',
-            help=('cause the dispatcher to recycle all of its worker processes;' 'running jobs will run to completion first'),
+            help=('cause the dispatcher to recycle all of its worker processes; running jobs will run to completion first'),
+        )
+        parser.add_argument(
+            '--cancel',
+            dest='cancel',
+            help=(
+                'Cancel a particular task id. Takes either a single id string, or a JSON list of multiple ids. '
+                'Can take in output from the --running argument as input to cancel all tasks. '
+                'Only running tasks can be canceled, queued tasks must be started before they can be canceled.'
+            ),
         )
 
     def handle(self, *arg, **options):
@@ -42,6 +52,16 @@ class Command(BaseCommand):
             return
         if options.get('reload'):
             return Control('dispatcher').control({'control': 'reload'})
+        if options.get('cancel'):
+            cancel_str = options.get('cancel')
+            try:
+                cancel_data = yaml.safe_load(cancel_str)
+            except Exception:
+                cancel_data = [cancel_str]
+            if not isinstance(cancel_data, list):
+                cancel_data = [cancel_str]
+            print(Control('dispatcher').cancel(cancel_data))
+            return
 
         # It's important to close these because we're _about_ to fork, and we
         # don't want the forked processes to inherit the open sockets
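Taken together, these three files add a `cancel` verb to the dispatcher's existing control-with-reply protocol: the management command accepts one task id or a JSON/YAML list (on a typical install, something like `awx-manage run_dispatcher --cancel '<task-id>'`), `Control.cancel` publishes the request over the pg_notify bus, and the consumer SIGTERMs any worker whose current task matches. A minimal sketch of calling the new API directly, assuming a running AWX dispatcher on the local node; the task UUID below is a hypothetical placeholder:

```python
from awx.main.dispatch.control import Control

# The reply is the list of task UUIDs the dispatcher actually located and
# SIGTERMed, so an empty list means no matching task was running.
task_ids = ['3f1a9c2e-0000-0000-0000-000000000000']  # hypothetical UUID
acknowledged = Control('dispatcher').cancel(task_ids)
print(acknowledged or 'no matching running tasks')
```
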
diff --git a/awx/main/models/mixins.py b/awx/main/models/mixins.py
index 34e05fa818..0e38d7288c 100644
--- a/awx/main/models/mixins.py
+++ b/awx/main/models/mixins.py
@@ -412,6 +412,11 @@ class TaskManagerJobMixin(TaskManagerUnifiedJobMixin):
     class Meta:
         abstract = True
 
+    def get_jobs_fail_chain(self):
+        if self.project_update_id:
+            return [self.project_update]
+        return []
+
 
 class TaskManagerUpdateOnLaunchMixin(TaskManagerUnifiedJobMixin):
     class Meta:
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index b49bcf0242..5ef8fed0f7 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -1396,23 +1396,6 @@ class UnifiedJob(
         return True
 
     @property
-    def actually_running(self):
-        # returns True if the job is running in the appropriate dispatcher process
-        running = False
-        if all([self.status == 'running', self.celery_task_id, self.execution_node]):
-            # If the job is marked as running, but the dispatcher
-            # doesn't know about it (or the dispatcher doesn't reply),
-            # then cancel the job
-            timeout = 5
-            try:
-                running = self.celery_task_id in ControlDispatcher('dispatcher', self.controller_node or self.execution_node).running(timeout=timeout)
-            except socket.timeout:
-                logger.error('could not reach dispatcher on {} within {}s'.format(self.execution_node, timeout))
-            except Exception:
-                logger.exception("error encountered when checking task status")
-        return running
-
-    @property
     def can_cancel(self):
         return bool(self.status in CAN_CANCEL)
 
@@ -1421,27 +1404,61 @@ class UnifiedJob(
             return 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (self.model_to_str(), self.name, self.id)
         return None
 
+    def fallback_cancel(self):
+        if not self.celery_task_id:
+            self.refresh_from_db(fields=['celery_task_id'])
+        self.cancel_dispatcher_process()
+
+    def cancel_dispatcher_process(self):
+        """Returns True if dispatcher running this job acknowledged request and sent SIGTERM"""
+        if not self.celery_task_id:
+            return
+        canceled = []
+        try:
+            # Use control and reply mechanism to cancel and obtain confirmation
+            timeout = 5
+            canceled = ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id])
+        except socket.timeout:
+            logger.error(f'could not reach dispatcher on {self.controller_node} within {timeout}s')
+        except Exception:
+            logger.exception("error encountered when checking task status")
+        return bool(self.celery_task_id in canceled)  # True or False, whether confirmation was obtained
+
     def cancel(self, job_explanation=None, is_chain=False):
         if self.can_cancel:
             if not is_chain:
                 for x in self.get_jobs_fail_chain():
                     x.cancel(job_explanation=self._build_job_explanation(), is_chain=True)
 
+            cancel_fields = []
             if not self.cancel_flag:
                 self.cancel_flag = True
                 self.start_args = ''  # blank field to remove encrypted passwords
-                cancel_fields = ['cancel_flag', 'start_args']
-                if self.status in ('pending', 'waiting', 'new'):
-                    self.status = 'canceled'
-                    cancel_fields.append('status')
-                if self.status == 'running' and not self.actually_running:
-                    self.status = 'canceled'
-                    cancel_fields.append('status')
+                cancel_fields.extend(['cancel_flag', 'start_args'])
+                connection.on_commit(lambda: self.websocket_emit_status("canceled"))
+
             if job_explanation is not None:
                 self.job_explanation = job_explanation
                 cancel_fields.append('job_explanation')
-            self.save(update_fields=cancel_fields)
-            self.websocket_emit_status("canceled")
+
+            controller_notified = False
+            if self.celery_task_id:
+                controller_notified = self.cancel_dispatcher_process()
+
+            else:
+                # Avoid race condition where we have stale model from pending state but job has already started,
+                # its checking signal but not cancel_flag, so re-send signal after this database commit
+                connection.on_commit(self.fallback_cancel)
+
+            # If a SIGTERM signal was sent to the control process, and acked by the dispatcher
+            # then we want to let its own cleanup change status, otherwise change status now
+            if not controller_notified:
+                if self.status != 'canceled':
+                    self.status = 'canceled'
+                    cancel_fields.append('status')
+
+            self.save(update_fields=cancel_fields)
+
         return self.cancel_flag
 
     @property
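The reworked `cancel()` hinges on `connection.on_commit`: both the websocket emit and the `fallback_cancel` re-check are deferred until the canceling transaction is durable, which closes the race where a job starts between a stale read and the flag write. A minimal illustration of that ordering, assuming a configured Django project and a `job` instance with these fields (`transaction.on_commit` is the public spelling of the `connection.on_commit` call used above):

```python
from django.db import transaction

def request_cancel(job):
    with transaction.atomic():
        job.cancel_flag = True
        job.save(update_fields=['cancel_flag'])
        # Queued, not run: fires only after COMMIT, so no listener can observe
        # the emit before the flag is visible in the database.
        transaction.on_commit(lambda: job.websocket_emit_status('canceled'))
        print('inside transaction: nothing emitted yet')
    # Leaving the atomic block commits; only now does the callback run.
```
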
diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py
index c9301f769a..4f52ade6b4 100644
--- a/awx/main/models/workflow.py
+++ b/awx/main/models/workflow.py
@@ -723,11 +723,10 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
     def preferred_instance_groups(self):
         return []
 
-    @property
-    def actually_running(self):
+    def cancel_dispatcher_process(self):
         # WorkflowJobs don't _actually_ run anything in the dispatcher, so
         # there's no point in asking the dispatcher if it knows about this task
-        return self.status == 'running'
+        return True
 
 
 class WorkflowApprovalTemplate(UnifiedJobTemplate, RelatedJobsMixin):
diff --git a/awx/main/tasks/callback.py b/awx/main/tasks/callback.py
index a4a02421a0..e131f368a5 100644
--- a/awx/main/tasks/callback.py
+++ b/awx/main/tasks/callback.py
@@ -6,17 +6,16 @@ import os
 import stat
 
 # Django
-from django.utils.timezone import now
 from django.conf import settings
 from django_guid import get_guid
 from django.utils.functional import cached_property
+from django.db import connections
 
 # AWX
 from awx.main.redact import UriCleaner
 from awx.main.constants import MINIMAL_EVENTS, ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE
 from awx.main.utils.update_model import update_model
 from awx.main.queue import CallbackQueueDispatcher
-from awx.main.tasks.signals import signal_callback
 
 logger = logging.getLogger('awx.main.tasks.callback')
 
@@ -175,28 +174,6 @@ class RunnerCallback:
 
         return False
 
-    def cancel_callback(self):
-        """
-        Ansible runner callback to tell the job when/if it is canceled
-        """
-        unified_job_id = self.instance.pk
-        if signal_callback():
-            return True
-        try:
-            self.instance = self.update_model(unified_job_id)
-        except Exception:
-            logger.exception(f'Encountered error during cancel check for {unified_job_id}, canceling now')
-            return True
-        if not self.instance:
-            logger.error('unified job {} was deleted while running, canceling'.format(unified_job_id))
-            return True
-        if self.instance.cancel_flag or self.instance.status == 'canceled':
-            cancel_wait = (now() - self.instance.modified).seconds if self.instance.modified else 0
-            if cancel_wait > 5:
-                logger.warning('Request to cancel {} took {} seconds to complete.'.format(self.instance.log_format, cancel_wait))
-            return True
-        return False
-
     def finished_callback(self, runner_obj):
         """
         Ansible runner callback triggered on finished run
@@ -227,6 +204,8 @@ class RunnerCallback:
 
             with disable_activity_stream():
                 self.instance = self.update_model(self.instance.pk, job_args=json.dumps(runner_config.command), job_cwd=runner_config.cwd, job_env=job_env)
+            # We opened a connection just for that save, close it here now
+            connections.close_all()
         elif status_data['status'] == 'failed':
             # For encrypted ssh_key_data, ansible-runner worker will open and write the
             # ssh_key_data to a named pipe. Then, once the podman container starts, ssh-agent will
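This is the change the merge title names: the old `cancel_callback` polled the database on every check, while the new code drops every open connection once runner config details are saved, and Django reopens one lazily on the next query. The pattern in isolation (a sketch; `event_stream` and `handle_event` are hypothetical stand-ins for the runner callback plumbing):

```python
from django.db import connections

def handle_event(event):
    print(event)  # hypothetical stand-in for dispatching to the callback receiver

def process_output(event_stream):
    # Release database connections before a long, ORM-free stretch so they are
    # not held idle while job output streams in; Django reconnects on the next
    # query after the loop.
    connections.close_all()
    for event in event_stream:
        handle_event(event)
```
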
diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py
index 30b0377015..35d8edf33b 100644
--- a/awx/main/tasks/jobs.py
+++ b/awx/main/tasks/jobs.py
@@ -487,6 +487,7 @@ class BaseTask(object):
             self.instance.log_lifecycle("preparing_playbook")
             if self.instance.cancel_flag or signal_callback():
                 self.instance = self.update_model(self.instance.pk, status='canceled')
+
             if self.instance.status != 'running':
                 # Stop the task chain and prevent starting the job if it has
                 # already been canceled.
@@ -589,7 +590,7 @@ class BaseTask(object):
                     event_handler=self.runner_callback.event_handler,
                     finished_callback=self.runner_callback.finished_callback,
                     status_handler=self.runner_callback.status_handler,
-                    cancel_callback=self.runner_callback.cancel_callback,
+                    cancel_callback=signal_callback,
                     **params,
                 )
             else:
@@ -1626,7 +1627,7 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask):
 
         handler = SpecialInventoryHandler(
             self.runner_callback.event_handler,
-            self.runner_callback.cancel_callback,
+            signal_callback,
             verbosity=inventory_update.verbosity,
             job_timeout=self.get_instance_timeout(self.instance),
             start_time=inventory_update.started,
diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py
index 54ff67f6cb..0350a96836 100644
--- a/awx/main/tasks/receptor.py
+++ b/awx/main/tasks/receptor.py
@@ -12,6 +12,7 @@ import yaml
 
 # Django
 from django.conf import settings
+from django.db import connections
 
 # Runner
 import ansible_runner
@@ -25,6 +26,7 @@ from awx.main.utils.common import (
     cleanup_new_process,
 )
 from awx.main.constants import MAX_ISOLATED_PATH_COLON_DELIMITER
+from awx.main.tasks.signals import signal_state, signal_callback, SignalExit
 
 # Receptorctl
 from receptorctl.socket_interface import ReceptorControl
@@ -335,24 +337,32 @@ class AWXReceptorJob:
                 shutil.rmtree(artifact_dir)
 
         resultsock, resultfile = receptor_ctl.get_work_results(self.unit_id, return_socket=True, return_sockfile=True)
-        # Both "processor" and "cancel_watcher" are spawned in separate threads.
-        # We wait for the first one to return. If cancel_watcher returns first,
-        # we yank the socket out from underneath the processor, which will cause it
-        # to exit. A reference to the processor_future is passed into the cancel_watcher_future,
-        # Which exits if the job has finished normally. The context manager ensures we do not
-        # leave any threads laying around.
-        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
+
+        connections.close_all()
+
+        # "processor" and the main thread will be separate threads.
+        # If a cancel happens, the main thread will encounter an exception, in which case
+        # we yank the socket out from underneath the processor, which will cause it to exit.
+        # The ThreadPoolExecutor context manager ensures we do not leave any threads laying around.
+        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
             processor_future = executor.submit(self.processor, resultfile)
-            cancel_watcher_future = executor.submit(self.cancel_watcher, processor_future)
-            futures = [processor_future, cancel_watcher_future]
-            first_future = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
-
-            res = list(first_future.done)[0].result()
-            if res.status == 'canceled':
+            try:
+                signal_state.raise_exception = True
+                # address race condition where SIGTERM was issued after this dispatcher task started
+                if signal_callback():
+                    raise SignalExit()
+                res = processor_future.result()
+            except SignalExit:
                 receptor_ctl.simple_command(f"work cancel {self.unit_id}")
                 resultsock.shutdown(socket.SHUT_RDWR)
                 resultfile.close()
-            elif res.status == 'error':
+                result = namedtuple('result', ['status', 'rc'])
+                res = result('canceled', 1)
+            finally:
+                signal_state.raise_exception = False
+
+            if res.status == 'error':
                 # If ansible-runner ran, but an error occured at runtime, the traceback information
                 # is saved via the status_handler passed in to the processor.
                 if 'result_traceback' in self.task.runner_callback.extra_update_fields:
@@ -446,18 +456,6 @@ class AWXReceptorJob:
             return 'local'
         return 'ansible-runner'
 
-    @cleanup_new_process
-    def cancel_watcher(self, processor_future):
-        while True:
-            if processor_future.done():
-                return processor_future.result()
-
-            if self.task.runner_callback.cancel_callback():
-                result = namedtuple('result', ['status', 'rc'])
-                return result('canceled', 1)
-
-            time.sleep(1)
-
     @property
     def pod_definition(self):
         ee = self.task.instance.execution_environment
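The receptor rework replaces the old one-second `cancel_watcher` polling thread entirely: the main thread now blocks on the processor future, and a SIGTERM delivered to the dispatcher worker raises an exception out of that wait (CPython runs signal handlers in the main thread, and lock waits there are interruptible). A self-contained POSIX sketch of the pattern, with all names local to the example:

```python
import concurrent.futures
import os
import signal
import threading
import time

class SignalExit(Exception):
    """Raised by the SIGTERM handler to interrupt the blocked main thread."""

def raise_on_sigterm(signum, frame):
    raise SignalExit()

signal.signal(signal.SIGTERM, raise_on_sigterm)

done = threading.Event()

def processor():
    # Stands in for streaming result data; checks for teardown between chunks.
    for _ in range(50):
        if done.is_set():
            return 'canceled'
        time.sleep(0.1)
    return 'successful'

# Simulate an external cancel request arriving one second in.
threading.Timer(1.0, os.kill, args=(os.getpid(), signal.SIGTERM)).start()

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(processor)
    try:
        status = future.result()  # SIGTERM interrupts this wait in the main thread
    except SignalExit:
        done.set()  # the real code yanks the receptor result socket instead
        status = 'canceled'

print(status)  # -> canceled
```

The design win is that cancellation latency drops from up to a second of polling (plus a database round trip per check) to a single signal delivery, with no watcher thread to clean up.
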
diff --git a/awx/main/tasks/signals.py b/awx/main/tasks/signals.py
index 6f0c69ca4c..95610548b9 100644
--- a/awx/main/tasks/signals.py
+++ b/awx/main/tasks/signals.py
@@ -9,12 +9,17 @@ logger = logging.getLogger('awx.main.tasks.signals')
 __all__ = ['with_signal_handling', 'signal_callback']
 
 
+class SignalExit(Exception):
+    pass
+
+
 class SignalState:
     def reset(self):
         self.sigterm_flag = False
         self.is_active = False
         self.original_sigterm = None
         self.original_sigint = None
+        self.raise_exception = False
 
     def __init__(self):
         self.reset()
@@ -22,6 +27,9 @@ class SignalState:
     def set_flag(self, *args):
         """Method to pass into the python signal.signal method to receive signals"""
         self.sigterm_flag = True
+        if self.raise_exception:
+            self.raise_exception = False  # so it is not raised a second time in error handling
+            raise SignalExit()
 
     def connect_signals(self):
         self.original_sigterm = signal.getsignal(signal.SIGTERM)
diff --git a/awx/main/tests/functional/test_dispatch.py b/awx/main/tests/functional/test_dispatch.py
index f3c9afe58b..86e90e50a0 100644
--- a/awx/main/tests/functional/test_dispatch.py
+++ b/awx/main/tests/functional/test_dispatch.py
@@ -244,7 +244,7 @@ class TestAutoScaling:
         assert not self.pool.should_grow
         alive_pid = self.pool.workers[1].pid
         self.pool.workers[0].process.terminate()
-        time.sleep(1)  # wait a moment for sigterm
+        time.sleep(2)  # wait a moment for sigterm
 
         # clean up and the dead worker
         self.pool.cleanup()
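The `raise_exception` flag makes the shared handler one-shot: the first SIGTERM after arming raises `SignalExit`, and the flag clears itself so cleanup code that runs afterward is not interrupted again. Sketched usage mirroring how `receptor.py` above arms and disarms it (`do_blocking_work` and `teardown` are hypothetical stubs):

```python
from awx.main.tasks.signals import SignalExit, signal_callback, signal_state

def do_blocking_work():
    ...  # hypothetical long-running call, e.g. waiting on a result socket

def teardown():
    ...  # hypothetical cleanup; a second SIGTERM will not re-raise here

signal_state.raise_exception = True
try:
    # Close the race where SIGTERM landed before the flag was armed.
    if signal_callback():
        raise SignalExit()
    result = do_blocking_work()
except SignalExit:
    result = teardown()
finally:
    signal_state.raise_exception = False
```
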
diff --git a/awx/main/tests/unit/models/test_unified_job_unit.py b/awx/main/tests/unit/models/test_unified_job_unit.py
index 592c457b0c..b0567500fc 100644
--- a/awx/main/tests/unit/models/test_unified_job_unit.py
+++ b/awx/main/tests/unit/models/test_unified_job_unit.py
@@ -22,6 +22,10 @@ def test_unified_job_workflow_attributes():
     assert job.workflow_job_id == 1
 
 
+def mock_on_commit(f):
+    f()
+
+
 @pytest.fixture
 def unified_job(mocker):
     mocker.patch.object(UnifiedJob, 'can_cancel', return_value=True)
@@ -30,12 +34,14 @@ def unified_job(mocker):
     j.cancel_flag = None
     j.save = mocker.MagicMock()
     j.websocket_emit_status = mocker.MagicMock()
+    j.fallback_cancel = mocker.MagicMock()
 
     return j
 
 
 def test_cancel(unified_job):
-    unified_job.cancel()
+    with mock.patch('awx.main.models.unified_jobs.connection.on_commit', wraps=mock_on_commit):
+        unified_job.cancel()
 
     assert unified_job.cancel_flag is True
     assert unified_job.status == 'canceled'
@@ -50,10 +56,11 @@ def test_cancel(unified_job):
 def test_cancel_job_explanation(unified_job):
     job_explanation = 'giggity giggity'
 
-    unified_job.cancel(job_explanation=job_explanation)
+    with mock.patch('awx.main.models.unified_jobs.connection.on_commit'):
+        unified_job.cancel(job_explanation=job_explanation)
 
     assert unified_job.job_explanation == job_explanation
-    unified_job.save.assert_called_with(update_fields=['cancel_flag', 'start_args', 'status', 'job_explanation'])
+    unified_job.save.assert_called_with(update_fields=['cancel_flag', 'start_args', 'job_explanation', 'status'])
 
 
 def test_organization_copy_to_jobs():
diff --git a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py
index c6a2b3b596..636de46cdf 100644
--- a/awx/main/utils/handlers.py
+++ b/awx/main/utils/handlers.py
@@ -76,7 +76,7 @@ class SpecialInventoryHandler(logging.Handler):
     def emit(self, record):
         # check cancel and timeout status regardless of log level
         this_time = now()
-        if (this_time - self.last_check).total_seconds() > 0.5:  # cancel callback is expensive
+        if (this_time - self.last_check).total_seconds() > 0.1:
             self.last_check = this_time
             if self.cancel_callback():
                 raise PostRunError('Inventory update has been canceled', status='canceled')
diff --git a/docs/ansible_runner_integration.md b/docs/ansible_runner_integration.md
index 5475c6877d..e7ef0df887 100644
--- a/docs/ansible_runner_integration.md
+++ b/docs/ansible_runner_integration.md
@@ -8,7 +8,7 @@ In AWX, a task of a certain job type is kicked off (_i.e._, RunJob, RunProjectUp
 The callbacks and handlers are:
 
 * `event_handler`: Called each time a new event is created in `ansible-runner`. AWX will dispatch the event to `redis` to be processed on the other end by the callback receiver.
-* `cancel_callback`: Called periodically by `ansible-runner`; this is so that AWX can inform `ansible-runner` if the job should be canceled or not.
+* `cancel_callback`: Called periodically by `ansible-runner`; this is so that AWX can inform `ansible-runner` if the job should be canceled or not. Only applies for system jobs now, and other jobs are canceled via receptor.
 * `finished_callback`: Called once by `ansible-runner` to denote that the process that was asked to run is finished. AWX will construct the special control event, `EOF`, with the associated total number of events that it observed.
 * `status_handler`: Called by `ansible-runner` as the process transitions state internally. AWX uses the `starting` status to know that `ansible-runner` has made all of its decisions around the process that it will launch. AWX gathers and associates these decisions with the Job for historical observation.
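Because `cancel()` now defers the emit and the fallback re-check to `connection.on_commit`, unit tests running outside a real transaction must run or swallow those callbacks themselves; the `mock_on_commit` helper added in the unit tests above simply invokes the callback inline. The same trick in isolation, as a sketch assuming the `unified_job` fixture from the diff:

```python
from unittest import mock

def mock_on_commit(f):
    f()  # run the deferred callback immediately, as if the transaction committed

def test_cancel_emits_status(unified_job):
    with mock.patch('awx.main.models.unified_jobs.connection.on_commit', wraps=mock_on_commit):
        unified_job.cancel()
    unified_job.websocket_emit_status.assert_called_with('canceled')
```
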