summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRick Elrod <rick@elrod.me>2023-07-18 17:43:46 +0200
committerGitHub <noreply@github.com>2023-07-18 17:43:46 +0200
commit48edb15a03bf3cae94405559693ea14a0ee6f2fd (patch)
treef2d341bd2bde9ba00351e9962dff319a7075f008
parentChanging how associations work in awx collection (#13626) (diff)
downloadawx-48edb15a03bf3cae94405559693ea14a0ee6f2fd.tar.xz
awx-48edb15a03bf3cae94405559693ea14a0ee6f2fd.zip
Prevent Dispatcher deadlock when Redis disappears (#14249)
This fixes https://github.com/ansible/awx/issues/14245 which has more information about this issue. This change addresses both: - A clashing signal handler (registering a callback to fire when the task manager times out, and hitting that callback in cases where we didn't expect to). Make dispatcher timeout use SIGUSR1, not SIGTERM. - Metrics not being reported should not make us crash, so that is now fixed as well. Signed-off-by: Rick Elrod <rick@elrod.me> Co-authored-by: Alan Rominger <arominge@redhat.com>
-rw-r--r--awx/main/dispatch/pool.py8
-rw-r--r--awx/main/dispatch/worker/base.py16
-rw-r--r--awx/main/scheduler/task_manager.py47
3 files changed, 42 insertions, 29 deletions
diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index b8208012b6..16366c3e0c 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -417,16 +417,16 @@ class AutoscalePool(WorkerPool):
# the task manager to never do more work
current_task = w.current_task
if current_task and isinstance(current_task, dict):
- endings = ['tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager']
+ endings = ('tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager')
current_task_name = current_task.get('task', '')
- if any(current_task_name.endswith(e) for e in endings):
+ if current_task_name.endswith(endings):
if 'started' not in current_task:
w.managed_tasks[current_task['uuid']]['started'] = time.time()
age = time.time() - current_task['started']
w.managed_tasks[current_task['uuid']]['age'] = age
if age > self.task_manager_timeout:
- logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}')
- os.kill(w.pid, signal.SIGTERM)
+ logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGUSR1 to {w.pid}')
+ os.kill(w.pid, signal.SIGUSR1)
for m in orphaned:
# if all the workers are dead, spawn at least one
diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py
index 65538bcabf..f11c8b1fa7 100644
--- a/awx/main/dispatch/worker/base.py
+++ b/awx/main/dispatch/worker/base.py
@@ -121,10 +121,9 @@ class AWXConsumerBase(object):
if time.time() - self.last_stats > 1: # buffer stat recording to once per second
try:
self.redis.set(f'awx_{self.name}_statistics', self.pool.debug())
- self.last_stats = time.time()
except Exception:
logger.exception(f"encountered an error communicating with redis to store {self.name} statistics")
- self.last_stats = time.time()
+ self.last_stats = time.time()
def run(self, *args, **kwargs):
signal.signal(signal.SIGINT, self.stop)
@@ -175,9 +174,12 @@ class AWXConsumerPG(AWXConsumerBase):
# record subsystem metrics for the dispatcher
if current_time - self.last_metrics_gather > 20:
- self.pool.produce_subsystem_metrics(self.subsystem_metrics)
- self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather))
- self.subsystem_metrics.pipe_execute()
+ try:
+ self.pool.produce_subsystem_metrics(self.subsystem_metrics)
+ self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather))
+ self.subsystem_metrics.pipe_execute()
+ except Exception:
+ logger.exception(f"encountered an error trying to store {self.name} metrics")
self.listen_cumulative_time = 0.0
self.last_metrics_gather = current_time
@@ -250,8 +252,8 @@ class BaseWorker(object):
break
except QueueEmpty:
continue
- except Exception as e:
- logger.error("Exception on worker {}, restarting: ".format(idx) + str(e))
+ except Exception:
+ logger.exception("Exception on worker {}, reconnecting: ".format(idx))
continue
try:
for conn in db.connections.all():
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 880d1e9e71..8695c5823f 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -102,27 +102,33 @@ class TaskBase:
def record_aggregate_metrics(self, *args):
if not is_testing():
- # increment task_manager_schedule_calls regardless if the other
- # metrics are recorded
- s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
- # Only record metrics if the last time recording was more
- # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
- # Prevents a short-duration task manager that runs directly after a
- # long task manager to override useful metrics.
- current_time = time.time()
- time_last_recorded = current_time - self.subsystem_metrics.decode(f"{self.prefix}_recorded_timestamp")
- if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
- logger.debug(f"recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
- self.subsystem_metrics.set(f"{self.prefix}_recorded_timestamp", current_time)
- self.subsystem_metrics.pipe_execute()
- else:
- logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
+ try:
+ # increment task_manager_schedule_calls regardless if the other
+ # metrics are recorded
+ s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
+ # Only record metrics if the last time recording was more
+ # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
+ # Prevents a short-duration task manager that runs directly after a
+ # long task manager to override useful metrics.
+ current_time = time.time()
+ time_last_recorded = current_time - self.subsystem_metrics.decode(f"{self.prefix}_recorded_timestamp")
+ if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
+ logger.debug(f"recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
+ self.subsystem_metrics.set(f"{self.prefix}_recorded_timestamp", current_time)
+ self.subsystem_metrics.pipe_execute()
+ else:
+ logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
+ except Exception:
+ logger.exception(f"Error saving metrics for {self.prefix}")
def record_aggregate_metrics_and_exit(self, *args):
self.record_aggregate_metrics()
sys.exit(1)
def schedule(self):
+ # Always be able to restore the original signal handler if we finish
+ original_sigusr1 = signal.getsignal(signal.SIGUSR1)
+
# Lock
with task_manager_bulk_reschedule():
with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired:
@@ -131,9 +137,14 @@ class TaskBase:
logger.debug(f"Not running {self.prefix} scheduler, another task holds lock")
return
logger.debug(f"Starting {self.prefix} Scheduler")
- # if sigterm due to timeout, still record metrics
- signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
- self._schedule()
+ # if sigusr1 due to timeout, still record metrics
+ signal.signal(signal.SIGUSR1, self.record_aggregate_metrics_and_exit)
+ try:
+ self._schedule()
+ finally:
+ # Reset the signal handler back to the default just in case anything
+ # else uses the same signal for other purposes
+ signal.signal(signal.SIGUSR1, original_sigusr1)
commit_start = time.time()
if self.prefix == "task_manager":