-rw-r--r--   awx/main/tasks.py   92
1 file changed, 49 insertions(+), 43 deletions(-)
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index f4e8a6ea4c..d8fa4a88c0 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -435,53 +435,59 @@ def awx_isolated_heartbeat():
@task()
def awx_periodic_scheduler():
- run_now = now()
- state = TowerScheduleState.get_solo()
- last_run = state.schedule_last_run
- logger.debug("Last scheduler run was: %s", last_run)
- state.schedule_last_run = run_now
- state.save()
-
- old_schedules = Schedule.objects.enabled().before(last_run)
- for schedule in old_schedules:
- schedule.save()
- schedules = Schedule.objects.enabled().between(last_run, run_now)
-
- invalid_license = False
- try:
- access_registry[Job](None).check_license()
- except PermissionDenied as e:
- invalid_license = e
-
- for schedule in schedules:
- template = schedule.unified_job_template
- schedule.save() # To update next_run timestamp.
- if template.cache_timeout_blocked:
- logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
- continue
- try:
- job_kwargs = schedule.get_job_kwargs()
- new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
- logger.info('Spawned {} from schedule {}-{}.'.format(
- new_unified_job.log_format, schedule.name, schedule.pk))
-            if invalid_license:
+    with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
+        if acquired is False:
+            logger.debug("Not running periodic scheduler, another task holds lock")
+            return
+        logger.debug("Starting periodic scheduler")
+ run_now = now()
+ state = TowerScheduleState.get_solo()
+ last_run = state.schedule_last_run
+ logger.debug("Last scheduler run was: %s", last_run)
+ state.schedule_last_run = run_now
+ state.save()
+
+ old_schedules = Schedule.objects.enabled().before(last_run)
+ for schedule in old_schedules:
+ schedule.save()
+ schedules = Schedule.objects.enabled().between(last_run, run_now)
+
+ invalid_license = False
+ try:
+ access_registry[Job](None).check_license()
+ except PermissionDenied as e:
+ invalid_license = e
+
+ for schedule in schedules:
+ template = schedule.unified_job_template
+ schedule.save() # To update next_run timestamp.
+ if template.cache_timeout_blocked:
+ logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
+ continue
+ try:
+ job_kwargs = schedule.get_job_kwargs()
+ new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
+                logger.info(six.text_type('Spawned {} from schedule {}-{}.').format(
+                    new_unified_job.log_format, schedule.name, schedule.pk))
+
+ if invalid_license:
+ new_unified_job.status = 'failed'
+ new_unified_job.job_explanation = str(invalid_license)
+ new_unified_job.save(update_fields=['status', 'job_explanation'])
+ new_unified_job.websocket_emit_status("failed")
+ raise invalid_license
+ can_start = new_unified_job.signal_start()
+ except Exception:
+ logger.exception('Error spawning scheduled job.')
+ continue
+ if not can_start:
                new_unified_job.status = 'failed'
-                new_unified_job.job_explanation = str(invalid_license)
+                new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
                new_unified_job.save(update_fields=['status', 'job_explanation'])
                new_unified_job.websocket_emit_status("failed")
- raise invalid_license
- can_start = new_unified_job.signal_start()
- except Exception:
- logger.exception('Error spawning scheduled job.')
- continue
- if not can_start:
- new_unified_job.status = 'failed'
- new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
- new_unified_job.save(update_fields=['status', 'job_explanation'])
- new_unified_job.websocket_emit_status("failed")
- emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
- state.save()
+ emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
+ state.save()
@task()
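
Note: the substance of this change is that awx_periodic_scheduler now runs its whole body inside a non-blocking database advisory lock, so when several cluster nodes (or overlapping beat ticks) fire the task at once, only the process that wins the lock performs the scheduling pass and the others return immediately. A minimal sketch of that pattern, assuming advisory_lock is the context manager from the django-pglocks package; the lock name is taken from the diff, while do_scheduling_pass is an illustrative stand-in:

# Sketch of the non-blocking advisory-lock guard used above; assumes
# django-pglocks (PostgreSQL advisory locks). do_scheduling_pass() is a
# placeholder for the real scheduling work.
from django_pglocks import advisory_lock


def do_scheduling_pass():
    print("only the lock winner reaches this point")


def periodic_scheduler():
    # wait=False makes acquisition non-blocking: instead of waiting for the
    # current holder, the context manager yields a boolean saying whether
    # this process got the lock.
    with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
        if acquired is False:
            # Another process is already running the scheduler; skip this
            # tick rather than dispatching the same schedules twice.
            return
        do_scheduling_pass()
    # The lock is released when the with-block exits; with PostgreSQL
    # session-level advisory locks it also goes away if the database session
    # ends, so a crashed holder cannot wedge later runs indefinitely.

Because the lock is taken with wait=False, a slow scheduling pass causes later ticks to be skipped rather than queued up behind it.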