diff options
author | Alan Rominger <arominge@redhat.com> | 2022-10-20 21:02:54 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-20 21:02:54 +0200 |
commit | 192f45bbd063494e52166f4798d45889275a16bd (patch) | |
tree | adebb40044199e3b0a18000451f2a774c11fd8f6 | |
parent | Merge pull request #13073 from AlanCoding/max_conn_deadlock (diff) | |
download | awx-192f45bbd063494e52166f4798d45889275a16bd.tar.xz awx-192f45bbd063494e52166f4798d45889275a16bd.zip |
Make canceling view non-atomic to fix 500 errors with job bursts (#13072)
* Make canceling view non-atomic to fix 500 errors with job bursts
* Update test calls for cancel method changes
-rw-r--r-- | awx/api/generics.py | 20 | ||||
-rw-r--r-- | awx/api/views/__init__.py | 69 | ||||
-rw-r--r-- | awx/main/dispatch/control.py | 6 | ||||
-rw-r--r-- | awx/main/models/unified_jobs.py | 16 | ||||
-rw-r--r-- | awx/main/tests/unit/models/test_unified_job_unit.py | 10 |
5 files changed, 50 insertions, 71 deletions
diff --git a/awx/api/generics.py b/awx/api/generics.py index 44d39c6e43..f5178f7fb2 100644 --- a/awx/api/generics.py +++ b/awx/api/generics.py @@ -13,7 +13,7 @@ from django.contrib.auth import views as auth_views from django.contrib.contenttypes.models import ContentType from django.core.cache import cache from django.core.exceptions import FieldDoesNotExist -from django.db import connection +from django.db import connection, transaction from django.db.models.fields.related import OneToOneRel from django.http import QueryDict from django.shortcuts import get_object_or_404 @@ -64,6 +64,7 @@ __all__ = [ 'ParentMixin', 'SubListAttachDetachAPIView', 'CopyAPIView', + 'GenericCancelView', 'BaseUsersList', ] @@ -985,6 +986,23 @@ class CopyAPIView(GenericAPIView): return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers) +class GenericCancelView(RetrieveAPIView): + # In subclass set model, serializer_class + obj_permission_type = 'cancel' + + @transaction.non_atomic_requests + def dispatch(self, *args, **kwargs): + return super(GenericCancelView, self).dispatch(*args, **kwargs) + + def post(self, request, *args, **kwargs): + obj = self.get_object() + if obj.can_cancel: + obj.cancel() + return Response(status=status.HTTP_202_ACCEPTED) + else: + return self.http_method_not_allowed(request, *args, **kwargs) + + class BaseUsersList(SubListCreateAttachDetachAPIView): def post(self, request, *args, **kwargs): ret = super(BaseUsersList, self).post(request, *args, **kwargs) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 94b27e292e..15f8359384 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -69,6 +69,7 @@ from awx.api.generics import ( APIView, BaseUsersList, CopyAPIView, + GenericCancelView, GenericAPIView, ListAPIView, ListCreateAPIView, @@ -976,20 +977,11 @@ class SystemJobEventsList(SubListAPIView): return job.get_event_queryset() -class ProjectUpdateCancel(RetrieveAPIView): +class ProjectUpdateCancel(GenericCancelView): model = models.ProjectUpdate - obj_permission_type = 'cancel' serializer_class = serializers.ProjectUpdateCancelSerializer - def post(self, request, *args, **kwargs): - obj = self.get_object() - if obj.can_cancel: - obj.cancel() - return Response(status=status.HTTP_202_ACCEPTED) - else: - return self.http_method_not_allowed(request, *args, **kwargs) - class ProjectUpdateNotificationsList(SubListAPIView): @@ -2262,20 +2254,11 @@ class InventoryUpdateCredentialsList(SubListAPIView): relationship = 'credentials' -class InventoryUpdateCancel(RetrieveAPIView): +class InventoryUpdateCancel(GenericCancelView): model = models.InventoryUpdate - obj_permission_type = 'cancel' serializer_class = serializers.InventoryUpdateCancelSerializer - def post(self, request, *args, **kwargs): - obj = self.get_object() - if obj.can_cancel: - obj.cancel() - return Response(status=status.HTTP_202_ACCEPTED) - else: - return self.http_method_not_allowed(request, *args, **kwargs) - class InventoryUpdateNotificationsList(SubListAPIView): @@ -3352,20 +3335,15 @@ class WorkflowJobWorkflowNodesList(SubListAPIView): return super(WorkflowJobWorkflowNodesList, self).get_queryset().order_by('id') -class WorkflowJobCancel(RetrieveAPIView): +class WorkflowJobCancel(GenericCancelView): model = models.WorkflowJob - obj_permission_type = 'cancel' serializer_class = serializers.WorkflowJobCancelSerializer def post(self, request, *args, **kwargs): - obj = self.get_object() - if obj.can_cancel: - obj.cancel() - ScheduleWorkflowManager().schedule() - return Response(status=status.HTTP_202_ACCEPTED) - else: - return self.http_method_not_allowed(request, *args, **kwargs) + r = super().post(request, *args, **kwargs) + ScheduleWorkflowManager().schedule() + return r class WorkflowJobNotificationsList(SubListAPIView): @@ -3521,20 +3499,11 @@ class JobActivityStreamList(SubListAPIView): search_fields = ('changes',) -class JobCancel(RetrieveAPIView): +class JobCancel(GenericCancelView): model = models.Job - obj_permission_type = 'cancel' serializer_class = serializers.JobCancelSerializer - def post(self, request, *args, **kwargs): - obj = self.get_object() - if obj.can_cancel: - obj.cancel() - return Response(status=status.HTTP_202_ACCEPTED) - else: - return self.http_method_not_allowed(request, *args, **kwargs) - class JobRelaunch(RetrieveAPIView): @@ -4005,20 +3974,11 @@ class AdHocCommandDetail(UnifiedJobDeletionMixin, RetrieveDestroyAPIView): serializer_class = serializers.AdHocCommandDetailSerializer -class AdHocCommandCancel(RetrieveAPIView): +class AdHocCommandCancel(GenericCancelView): model = models.AdHocCommand - obj_permission_type = 'cancel' serializer_class = serializers.AdHocCommandCancelSerializer - def post(self, request, *args, **kwargs): - obj = self.get_object() - if obj.can_cancel: - obj.cancel() - return Response(status=status.HTTP_202_ACCEPTED) - else: - return self.http_method_not_allowed(request, *args, **kwargs) - class AdHocCommandRelaunch(GenericAPIView): @@ -4153,20 +4113,11 @@ class SystemJobDetail(UnifiedJobDeletionMixin, RetrieveDestroyAPIView): serializer_class = serializers.SystemJobSerializer -class SystemJobCancel(RetrieveAPIView): +class SystemJobCancel(GenericCancelView): model = models.SystemJob - obj_permission_type = 'cancel' serializer_class = serializers.SystemJobCancelSerializer - def post(self, request, *args, **kwargs): - obj = self.get_object() - if obj.can_cancel: - obj.cancel() - return Response(status=status.HTTP_202_ACCEPTED) - else: - return self.http_method_not_allowed(request, *args, **kwargs) - class SystemJobNotificationsList(SubListAPIView): diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py index e6647170da..2807bdbe19 100644 --- a/awx/main/dispatch/control.py +++ b/awx/main/dispatch/control.py @@ -3,6 +3,7 @@ import uuid import json from django.conf import settings +from django.db import connection import redis from awx.main.dispatch import get_local_queuename @@ -49,7 +50,10 @@ class Control(object): reply_queue = Control.generate_reply_queue_name() self.result = None - with pg_bus_conn(new_connection=True) as conn: + if not connection.get_autocommit(): + raise RuntimeError('Control-with-reply messages can only be done in autocommit mode') + + with pg_bus_conn() as conn: conn.listen(reply_queue) send_data = {'control': command, 'reply_to': reply_queue} if extra_data: diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index a8ac64b2cc..e711b443d0 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -1465,23 +1465,23 @@ class UnifiedJob( self.job_explanation = job_explanation cancel_fields.append('job_explanation') + # Important to save here before sending cancel signal to dispatcher to cancel because + # the job control process will use the cancel_flag to distinguish a shutdown from a cancel + self.save(update_fields=cancel_fields) + controller_notified = False if self.celery_task_id: controller_notified = self.cancel_dispatcher_process() - else: - # Avoid race condition where we have stale model from pending state but job has already started, - # its checking signal but not cancel_flag, so re-send signal after this database commit - connection.on_commit(self.fallback_cancel) - # If a SIGTERM signal was sent to the control process, and acked by the dispatcher # then we want to let its own cleanup change status, otherwise change status now if not controller_notified: if self.status != 'canceled': self.status = 'canceled' - cancel_fields.append('status') - - self.save(update_fields=cancel_fields) + self.save(update_fields=['status']) + # Avoid race condition where we have stale model from pending state but job has already started, + # its checking signal but not cancel_flag, so re-send signal after updating cancel fields + self.fallback_cancel() return self.cancel_flag diff --git a/awx/main/tests/unit/models/test_unified_job_unit.py b/awx/main/tests/unit/models/test_unified_job_unit.py index b0567500fc..2fc89813d6 100644 --- a/awx/main/tests/unit/models/test_unified_job_unit.py +++ b/awx/main/tests/unit/models/test_unified_job_unit.py @@ -50,7 +50,10 @@ def test_cancel(unified_job): # Some more thought may want to go into only emitting canceled if/when the job record # status is changed to canceled. Unlike, currently, where it's emitted unconditionally. unified_job.websocket_emit_status.assert_called_with("canceled") - unified_job.save.assert_called_with(update_fields=['cancel_flag', 'start_args', 'status']) + assert [(args, kwargs) for args, kwargs in unified_job.save.call_args_list] == [ + ((), {'update_fields': ['cancel_flag', 'start_args']}), + ((), {'update_fields': ['status']}), + ] def test_cancel_job_explanation(unified_job): @@ -60,7 +63,10 @@ def test_cancel_job_explanation(unified_job): unified_job.cancel(job_explanation=job_explanation) assert unified_job.job_explanation == job_explanation - unified_job.save.assert_called_with(update_fields=['cancel_flag', 'start_args', 'job_explanation', 'status']) + assert [(args, kwargs) for args, kwargs in unified_job.save.call_args_list] == [ + ((), {'update_fields': ['cancel_flag', 'start_args', 'job_explanation']}), + ((), {'update_fields': ['status']}), + ] def test_organization_copy_to_jobs(): |