summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Alan Rominger <arominge@redhat.com> 2022-10-20 21:02:54 +0200
committer GitHub <noreply@github.com> 2022-10-20 21:02:54 +0200
commit 192f45bbd063494e52166f4798d45889275a16bd (patch)
tree adebb40044199e3b0a18000451f2a774c11fd8f6
parent Merge pull request #13073 from AlanCoding/max_conn_deadlock (diff)
download awx-192f45bbd063494e52166f4798d45889275a16bd.tar.xz
awx-192f45bbd063494e52166f4798d45889275a16bd.zip
Make canceling view non-atomic to fix 500 errors with job bursts (#13072)
* Make canceling view non-atomic to fix 500 errors with job bursts
* Update test calls for cancel method changes
-rw-r--r-- awx/api/generics.py 20
-rw-r--r-- awx/api/views/__init__.py 69
-rw-r--r-- awx/main/dispatch/control.py 6
-rw-r--r-- awx/main/models/unified_jobs.py 16
-rw-r--r-- awx/main/tests/unit/models/test_unified_job_unit.py 10
5 files changed, 50 insertions, 71 deletions
diff --git a/awx/api/generics.py b/awx/api/generics.py
index 44d39c6e43..f5178f7fb2 100644
--- a/awx/api/generics.py
+++ b/awx/api/generics.py
@@ -13,7 +13,7 @@ from django.contrib.auth import views as auth_views
from django.contrib.contenttypes.models import ContentType
from django.core.cache import cache
from django.core.exceptions import FieldDoesNotExist
-from django.db import connection
+from django.db import connection, transaction
from django.db.models.fields.related import OneToOneRel
from django.http import QueryDict
from django.shortcuts import get_object_or_404
@@ -64,6 +64,7 @@ __all__ = [
'ParentMixin',
'SubListAttachDetachAPIView',
'CopyAPIView',
+ 'GenericCancelView',
'BaseUsersList',
]
@@ -985,6 +986,23 @@ class CopyAPIView(GenericAPIView):
return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
+class GenericCancelView(RetrieveAPIView):
+ # In subclass set model, serializer_class
+ obj_permission_type = 'cancel'
+
+ @transaction.non_atomic_requests
+ def dispatch(self, *args, **kwargs):
+ return super(GenericCancelView, self).dispatch(*args, **kwargs)
+
+ def post(self, request, *args, **kwargs):
+ obj = self.get_object()
+ if obj.can_cancel:
+ obj.cancel()
+ return Response(status=status.HTTP_202_ACCEPTED)
+ else:
+ return self.http_method_not_allowed(request, *args, **kwargs)
+
+
class BaseUsersList(SubListCreateAttachDetachAPIView):
def post(self, request, *args, **kwargs):
ret = super(BaseUsersList, self).post(request, *args, **kwargs)
diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py
index 94b27e292e..15f8359384 100644
--- a/awx/api/views/__init__.py
+++ b/awx/api/views/__init__.py
@@ -69,6 +69,7 @@ from awx.api.generics import (
APIView,
BaseUsersList,
CopyAPIView,
+ GenericCancelView,
GenericAPIView,
ListAPIView,
ListCreateAPIView,
@@ -976,20 +977,11 @@ class SystemJobEventsList(SubListAPIView):
return job.get_event_queryset()
-class ProjectUpdateCancel(RetrieveAPIView):
+class ProjectUpdateCancel(GenericCancelView):
model = models.ProjectUpdate
- obj_permission_type = 'cancel'
serializer_class = serializers.ProjectUpdateCancelSerializer
- def post(self, request, *args, **kwargs):
- obj = self.get_object()
- if obj.can_cancel:
- obj.cancel()
- return Response(status=status.HTTP_202_ACCEPTED)
- else:
- return self.http_method_not_allowed(request, *args, **kwargs)
-
class ProjectUpdateNotificationsList(SubListAPIView):
@@ -2262,20 +2254,11 @@ class InventoryUpdateCredentialsList(SubListAPIView):
relationship = 'credentials'
-class InventoryUpdateCancel(RetrieveAPIView):
+class InventoryUpdateCancel(GenericCancelView):
model = models.InventoryUpdate
- obj_permission_type = 'cancel'
serializer_class = serializers.InventoryUpdateCancelSerializer
- def post(self, request, *args, **kwargs):
- obj = self.get_object()
- if obj.can_cancel:
- obj.cancel()
- return Response(status=status.HTTP_202_ACCEPTED)
- else:
- return self.http_method_not_allowed(request, *args, **kwargs)
-
class InventoryUpdateNotificationsList(SubListAPIView):
@@ -3352,20 +3335,15 @@ class WorkflowJobWorkflowNodesList(SubListAPIView):
return super(WorkflowJobWorkflowNodesList, self).get_queryset().order_by('id')
-class WorkflowJobCancel(RetrieveAPIView):
+class WorkflowJobCancel(GenericCancelView):
model = models.WorkflowJob
- obj_permission_type = 'cancel'
serializer_class = serializers.WorkflowJobCancelSerializer
def post(self, request, *args, **kwargs):
- obj = self.get_object()
- if obj.can_cancel:
- obj.cancel()
- ScheduleWorkflowManager().schedule()
- return Response(status=status.HTTP_202_ACCEPTED)
- else:
- return self.http_method_not_allowed(request, *args, **kwargs)
+ r = super().post(request, *args, **kwargs)
+ ScheduleWorkflowManager().schedule()
+ return r
class WorkflowJobNotificationsList(SubListAPIView):
@@ -3521,20 +3499,11 @@ class JobActivityStreamList(SubListAPIView):
search_fields = ('changes',)
-class JobCancel(RetrieveAPIView):
+class JobCancel(GenericCancelView):
model = models.Job
- obj_permission_type = 'cancel'
serializer_class = serializers.JobCancelSerializer
- def post(self, request, *args, **kwargs):
- obj = self.get_object()
- if obj.can_cancel:
- obj.cancel()
- return Response(status=status.HTTP_202_ACCEPTED)
- else:
- return self.http_method_not_allowed(request, *args, **kwargs)
-
class JobRelaunch(RetrieveAPIView):
@@ -4005,20 +3974,11 @@ class AdHocCommandDetail(UnifiedJobDeletionMixin, RetrieveDestroyAPIView):
serializer_class = serializers.AdHocCommandDetailSerializer
-class AdHocCommandCancel(RetrieveAPIView):
+class AdHocCommandCancel(GenericCancelView):
model = models.AdHocCommand
- obj_permission_type = 'cancel'
serializer_class = serializers.AdHocCommandCancelSerializer
- def post(self, request, *args, **kwargs):
- obj = self.get_object()
- if obj.can_cancel:
- obj.cancel()
- return Response(status=status.HTTP_202_ACCEPTED)
- else:
- return self.http_method_not_allowed(request, *args, **kwargs)
-
class AdHocCommandRelaunch(GenericAPIView):
@@ -4153,20 +4113,11 @@ class SystemJobDetail(UnifiedJobDeletionMixin, RetrieveDestroyAPIView):
serializer_class = serializers.SystemJobSerializer
-class SystemJobCancel(RetrieveAPIView):
+class SystemJobCancel(GenericCancelView):
model = models.SystemJob
- obj_permission_type = 'cancel'
serializer_class = serializers.SystemJobCancelSerializer
- def post(self, request, *args, **kwargs):
- obj = self.get_object()
- if obj.can_cancel:
- obj.cancel()
- return Response(status=status.HTTP_202_ACCEPTED)
- else:
- return self.http_method_not_allowed(request, *args, **kwargs)
-
class SystemJobNotificationsList(SubListAPIView):
diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py
index e6647170da..2807bdbe19 100644
--- a/awx/main/dispatch/control.py
+++ b/awx/main/dispatch/control.py
@@ -3,6 +3,7 @@ import uuid
import json
from django.conf import settings
+from django.db import connection
import redis
from awx.main.dispatch import get_local_queuename
@@ -49,7 +50,10 @@ class Control(object):
reply_queue = Control.generate_reply_queue_name()
self.result = None
- with pg_bus_conn(new_connection=True) as conn:
+ if not connection.get_autocommit():
+ raise RuntimeError('Control-with-reply messages can only be done in autocommit mode')
+
+ with pg_bus_conn() as conn:
conn.listen(reply_queue)
send_data = {'control': command, 'reply_to': reply_queue}
if extra_data:
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index a8ac64b2cc..e711b443d0 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -1465,23 +1465,23 @@ class UnifiedJob(
self.job_explanation = job_explanation
cancel_fields.append('job_explanation')
+ # Important to save here before sending cancel signal to dispatcher to cancel because
+ # the job control process will use the cancel_flag to distinguish a shutdown from a cancel
+ self.save(update_fields=cancel_fields)
+
controller_notified = False
if self.celery_task_id:
controller_notified = self.cancel_dispatcher_process()
- else:
- # Avoid race condition where we have stale model from pending state but job has already started,
- # its checking signal but not cancel_flag, so re-send signal after this database commit
- connection.on_commit(self.fallback_cancel)
-
# If a SIGTERM signal was sent to the control process, and acked by the dispatcher
# then we want to let its own cleanup change status, otherwise change status now
if not controller_notified:
if self.status != 'canceled':
self.status = 'canceled'
- cancel_fields.append('status')
-
- self.save(update_fields=cancel_fields)
+ self.save(update_fields=['status'])
+ # Avoid race condition where we have stale model from pending state but job has already started,
+ # its checking signal but not cancel_flag, so re-send signal after updating cancel fields
+ self.fallback_cancel()
return self.cancel_flag
diff --git a/awx/main/tests/unit/models/test_unified_job_unit.py b/awx/main/tests/unit/models/test_unified_job_unit.py
index b0567500fc..2fc89813d6 100644
--- a/awx/main/tests/unit/models/test_unified_job_unit.py
+++ b/awx/main/tests/unit/models/test_unified_job_unit.py
@@ -50,7 +50,10 @@ def test_cancel(unified_job):
# Some more thought may want to go into only emitting canceled if/when the job record
# status is changed to canceled. Unlike, currently, where it's emitted unconditionally.
unified_job.websocket_emit_status.assert_called_with("canceled")
- unified_job.save.assert_called_with(update_fields=['cancel_flag', 'start_args', 'status'])
+ assert [(args, kwargs) for args, kwargs in unified_job.save.call_args_list] == [
+ ((), {'update_fields': ['cancel_flag', 'start_args']}),
+ ((), {'update_fields': ['status']}),
+ ]
def test_cancel_job_explanation(unified_job):
@@ -60,7 +63,10 @@ def test_cancel_job_explanation(unified_job):
unified_job.cancel(job_explanation=job_explanation)
assert unified_job.job_explanation == job_explanation
- unified_job.save.assert_called_with(update_fields=['cancel_flag', 'start_args', 'job_explanation', 'status'])
+ assert [(args, kwargs) for args, kwargs in unified_job.save.call_args_list] == [
+ ((), {'update_fields': ['cancel_flag', 'start_args', 'job_explanation']}),
+ ((), {'update_fields': ['status']}),
+ ]
def test_organization_copy_to_jobs():