| author | Rick Elrod <rick@elrod.me> | 2023-07-18 17:43:46 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-07-18 17:43:46 +0200 |
| commit | 48edb15a03bf3cae94405559693ea14a0ee6f2fd (patch) | |
| tree | f2d341bd2bde9ba00351e9962dff319a7075f008 | |
| parent | Changing how associations work in awx collection (#13626) (diff) | |
| download | awx-48edb15a03bf3cae94405559693ea14a0ee6f2fd.tar.xz, awx-48edb15a03bf3cae94405559693ea14a0ee6f2fd.zip | |
Prevent Dispatcher deadlock when Redis disappears (#14249)
This fixes https://github.com/ansible/awx/issues/14245, which has
more background on the issue.
This change addresses two problems:
- A clashing signal handler: a callback was registered to fire when
  the task manager times out, but it could also fire in cases where
  we did not expect it to. The dispatcher timeout now uses SIGUSR1
  instead of SIGTERM.
- A failure to report metrics should not crash the dispatcher; those
  errors are now caught and logged instead.
Signed-off-by: Rick Elrod <rick@elrod.me>
Co-authored-by: Alan Rominger <arominge@redhat.com>
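
Taken together, the fix comes down to the two patterns described in the bullets above. The sketch below is illustrative only and is not the AWX code; `run_with_timeout_signal`, `timeout_handler`, and `record_metrics_safely` are hypothetical names used for this example. It shows the timeout path using SIGUSR1 with the previous handler saved and restored, and metrics failures being logged rather than allowed to crash the process.

```python
import logging
import signal
import sys

logger = logging.getLogger(__name__)


def record_metrics_safely():
    """Hypothetical stand-in for the dispatcher's metrics write (e.g. pipe_execute)."""
    try:
        raise ConnectionError("redis is unreachable")  # simulate a metrics backend outage
    except Exception:
        # a metrics outage is logged, never allowed to crash the caller
        logger.exception("error saving metrics; continuing without them")


def timeout_handler(signum, frame):
    # on timeout, save whatever metrics we can, then exit non-zero
    record_metrics_safely()
    sys.exit(1)


def run_with_timeout_signal(work):
    """Run work() with a SIGUSR1 timeout handler installed, then restore the old handler."""
    original = signal.getsignal(signal.SIGUSR1)  # remember whatever handler was there before
    signal.signal(signal.SIGUSR1, timeout_handler)
    try:
        work()
    finally:
        # restore the previous handler so nothing else inherits the timeout path
        signal.signal(signal.SIGUSR1, original)


if __name__ == "__main__":
    run_with_timeout_signal(lambda: print("scheduling work"))
```

Keeping the timeout on SIGUSR1 leaves SIGTERM free for ordinary process shutdown, which is what removes the handler clash described above.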
-rw-r--r-- | awx/main/dispatch/pool.py | 8 |
-rw-r--r-- | awx/main/dispatch/worker/base.py | 16 |
-rw-r--r-- | awx/main/scheduler/task_manager.py | 47 |
3 files changed, 42 insertions, 29 deletions
diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index b8208012b6..16366c3e0c 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -417,16 +417,16 @@ class AutoscalePool(WorkerPool):
             # the task manager to never do more work
             current_task = w.current_task
             if current_task and isinstance(current_task, dict):
-                endings = ['tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager']
+                endings = ('tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager')
                 current_task_name = current_task.get('task', '')
-                if any(current_task_name.endswith(e) for e in endings):
+                if current_task_name.endswith(endings):
                     if 'started' not in current_task:
                         w.managed_tasks[current_task['uuid']]['started'] = time.time()
                     age = time.time() - current_task['started']
                     w.managed_tasks[current_task['uuid']]['age'] = age
                     if age > self.task_manager_timeout:
-                        logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}')
-                        os.kill(w.pid, signal.SIGTERM)
+                        logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGUSR1 to {w.pid}')
+                        os.kill(w.pid, signal.SIGUSR1)

         for m in orphaned:
             # if all the workers are dead, spawn at least one
diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py
index 65538bcabf..f11c8b1fa7 100644
--- a/awx/main/dispatch/worker/base.py
+++ b/awx/main/dispatch/worker/base.py
@@ -121,10 +121,9 @@ class AWXConsumerBase(object):
         if time.time() - self.last_stats > 1:  # buffer stat recording to once per second
             try:
                 self.redis.set(f'awx_{self.name}_statistics', self.pool.debug())
-                self.last_stats = time.time()
             except Exception:
                 logger.exception(f"encountered an error communicating with redis to store {self.name} statistics")
-                self.last_stats = time.time()
+            self.last_stats = time.time()

     def run(self, *args, **kwargs):
         signal.signal(signal.SIGINT, self.stop)
@@ -175,9 +174,12 @@ class AWXConsumerPG(AWXConsumerBase):

                 # record subsystem metrics for the dispatcher
                 if current_time - self.last_metrics_gather > 20:
-                    self.pool.produce_subsystem_metrics(self.subsystem_metrics)
-                    self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather))
-                    self.subsystem_metrics.pipe_execute()
+                    try:
+                        self.pool.produce_subsystem_metrics(self.subsystem_metrics)
+                        self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather))
+                        self.subsystem_metrics.pipe_execute()
+                    except Exception:
+                        logger.exception(f"encountered an error trying to store {self.name} metrics")
                     self.listen_cumulative_time = 0.0
                     self.last_metrics_gather = current_time

@@ -250,8 +252,8 @@ class BaseWorker(object):
                     break
             except QueueEmpty:
                 continue
-            except Exception as e:
-                logger.error("Exception on worker {}, restarting: ".format(idx) + str(e))
+            except Exception:
+                logger.exception("Exception on worker {}, reconnecting: ".format(idx))
                 continue
             try:
                 for conn in db.connections.all():
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 880d1e9e71..8695c5823f 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -102,27 +102,33 @@ class TaskBase:

     def record_aggregate_metrics(self, *args):
         if not is_testing():
-            # increment task_manager_schedule_calls regardless if the other
-            # metrics are recorded
-            s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
-            # Only record metrics if the last time recording was more
-            # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
-            # Prevents a short-duration task manager that runs directly after a
-            # long task manager to override useful metrics.
-            current_time = time.time()
-            time_last_recorded = current_time - self.subsystem_metrics.decode(f"{self.prefix}_recorded_timestamp")
-            if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
-                logger.debug(f"recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
-                self.subsystem_metrics.set(f"{self.prefix}_recorded_timestamp", current_time)
-                self.subsystem_metrics.pipe_execute()
-            else:
-                logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
+            try:
+                # increment task_manager_schedule_calls regardless if the other
+                # metrics are recorded
+                s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
+                # Only record metrics if the last time recording was more
+                # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
+                # Prevents a short-duration task manager that runs directly after a
+                # long task manager to override useful metrics.
+                current_time = time.time()
+                time_last_recorded = current_time - self.subsystem_metrics.decode(f"{self.prefix}_recorded_timestamp")
+                if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
+                    logger.debug(f"recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
+                    self.subsystem_metrics.set(f"{self.prefix}_recorded_timestamp", current_time)
+                    self.subsystem_metrics.pipe_execute()
+                else:
+                    logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
+            except Exception:
+                logger.exception(f"Error saving metrics for {self.prefix}")

     def record_aggregate_metrics_and_exit(self, *args):
         self.record_aggregate_metrics()
         sys.exit(1)

     def schedule(self):
+        # Always be able to restore the original signal handler if we finish
+        original_sigusr1 = signal.getsignal(signal.SIGUSR1)
+
         # Lock
         with task_manager_bulk_reschedule():
             with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired:
@@ -131,9 +137,14 @@ class TaskBase:
                     logger.debug(f"Not running {self.prefix} scheduler, another task holds lock")
                     return
                 logger.debug(f"Starting {self.prefix} Scheduler")
-                # if sigterm due to timeout, still record metrics
-                signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
-                self._schedule()
+                # if sigusr1 due to timeout, still record metrics
+                signal.signal(signal.SIGUSR1, self.record_aggregate_metrics_and_exit)
+                try:
+                    self._schedule()
+                finally:
+                    # Reset the signal handler back to the default just in case anything
+                    # else uses the same signal for other purposes
+                    signal.signal(signal.SIGUSR1, original_sigusr1)
                 commit_start = time.time()

                 if self.prefix == "task_manager":
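
One detail worth calling out from the pool.py hunk: `endings` switches from a list to a tuple because `str.endswith` accepts a tuple of suffixes natively (a list raises `TypeError`), which is what lets the `any(...)` generator be dropped. A standalone illustration, separate from the AWX code itself:

```python
# str.endswith accepts a tuple of suffixes, so no generator expression is needed
endings = ('tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager')

task_name = 'awx.main.tasks.task_manager'
print(task_name.endswith(endings))       # True

try:
    task_name.endswith(list(endings))    # a list is not accepted
except TypeError as exc:
    print(f"TypeError: {exc}")
```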