1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# Copyright (c) 2017 Ansible by Red Hat
# All Rights Reserved.
import mock
import pytest
from django.utils.timezone import now as tz_now
from django.db import DatabaseError
from awx.main.scheduler import TaskManager
from awx.main.models import (
Job,
Instance,
InstanceGroup,
)
from django.core.cache import cache
class TestCleanupInconsistentCeleryTasks():
@mock.patch.object(cache, 'get', return_value=None)
@mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
@mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, []))
@mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
@mock.patch.object(Instance.objects, 'filter', return_value=mock.MagicMock(first=lambda: None))
@mock.patch('awx.main.scheduler.task_manager.logger')
def test_instance_does_not_exist(self, logger_mock, *args):
logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))
tm = TaskManager()
with pytest.raises(RuntimeError) as excinfo:
tm.cleanup_inconsistent_celery_tasks()
assert "mocked" in str(excinfo.value)
logger_mock.error.assert_called_once_with("Execution node Instance host1 not found in database. "
"The node is currently executing jobs ['job 2 (new)', "
"'job 3 (new)']")
@mock.patch.object(cache, 'get', return_value=None)
@mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {'host1': []}))
@mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
@mock.patch.object(TaskManager, 'get_running_tasks')
@mock.patch('awx.main.scheduler.task_manager.logger')
def test_save_failed(self, logger_mock, get_running_tasks, *args):
logger_mock.error = mock.MagicMock()
job = Job(id=2, modified=tz_now(), status='running', celery_task_id='blah', execution_node='host1')
job.websocket_emit_status = mock.MagicMock()
get_running_tasks.return_value = ({'host1': [job]}, [])
tm = TaskManager()
with mock.patch.object(job, 'save', side_effect=DatabaseError):
tm.cleanup_inconsistent_celery_tasks()
job.save.assert_called_once()
logger_mock.error.assert_called_once_with("Task job 2 (failed) DB error in marking failed. Job possibly deleted.")
@mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
@mock.patch('awx.main.scheduler.task_manager.inspect')
def test_multiple_active_instances_sanity_check(self, inspect_mock, *args):
class MockInspector:
pass
mock_inspector = MockInspector()
mock_inspector.active = lambda: {
'celery@host1': [],
'celery@host2': []
}
inspect_mock.return_value = mock_inspector
tm = TaskManager()
active_task_queues, queues = tm.get_active_tasks()
assert 'host1' in queues
assert 'host2' in queues
|