summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsoftwarefactory-project-zuul[bot] <softwarefactory-project-zuul[bot]@users.noreply.github.com>2019-05-28 22:15:26 +0200
committerGitHub <noreply@github.com>2019-05-28 22:15:26 +0200
commit41f2b83ae2ec477c8d7719226a46095119eb48ce (patch)
tree575509eaf79576bee40e7e4b26cd1937e07082a6
parentMerge pull request #3932 from shanemcd/zuul_k8s (diff)
parentadd the ability to disable RabbitMQ queue durability (diff)
downloadawx-41f2b83ae2ec477c8d7719226a46095119eb48ce.tar.xz
awx-41f2b83ae2ec477c8d7719226a46095119eb48ce.zip
Merge pull request #3947 from ryanpetrello/transient-queues
RFC: add the ability to disable RabbitMQ queue durability Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
-rw-r--r--awx/main/conf.py10
-rw-r--r--awx/main/dispatch/control.py3
-rw-r--r--awx/main/dispatch/kombu.py42
-rw-r--r--awx/main/dispatch/publish.py4
-rw-r--r--awx/main/management/commands/run_callback_receiver.py3
-rw-r--r--awx/main/management/commands/run_dispatcher.py3
-rw-r--r--awx/main/queue.py3
-rw-r--r--awx/settings/defaults.py1
-rw-r--r--docs/tasks.md2
9 files changed, 66 insertions, 5 deletions
diff --git a/awx/main/conf.py b/awx/main/conf.py
index 72b8148136..7f7ace83f0 100644
--- a/awx/main/conf.py
+++ b/awx/main/conf.py
@@ -570,6 +570,16 @@ register(
)
+register(
+ 'BROKER_DURABILITY',
+ field_class=fields.BooleanField,
+ label=_('Message Durability'),
+ help_text=_('When set (the default), underlying queues will be persisted to disk. Disable this to enable higher message bus throughput.'),
+ category=_('System'),
+ category_slug='system',
+)
+
+
def logging_validate(serializer, attrs):
if not serializer.instance or \
not hasattr(serializer.instance, 'LOG_AGGREGATOR_HOST') or \
diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py
index f836e0624c..5f081e84f2 100644
--- a/awx/main/dispatch/control.py
+++ b/awx/main/dispatch/control.py
@@ -4,7 +4,8 @@ import socket
from django.conf import settings
from awx.main.dispatch import get_local_queuename
-from kombu import Connection, Queue, Exchange, Producer, Consumer
+from awx.main.dispatch.kombu import Connection
+from kombu import Queue, Exchange, Producer, Consumer
logger = logging.getLogger('awx.main.dispatch')
diff --git a/awx/main/dispatch/kombu.py b/awx/main/dispatch/kombu.py
new file mode 100644
index 0000000000..94fc7a035e
--- /dev/null
+++ b/awx/main/dispatch/kombu.py
@@ -0,0 +1,42 @@
+from amqp.exceptions import PreconditionFailed
+from django.conf import settings
+from kombu.connection import Connection as KombuConnection
+from kombu.transport import pyamqp
+
+import logging
+
+logger = logging.getLogger('awx.main.dispatch')
+
+
+__all__ = ['Connection']
+
+
+class Connection(KombuConnection):
+
+ def __init__(self, *args, **kwargs):
+ super(Connection, self).__init__(*args, **kwargs)
+ class _Channel(pyamqp.Channel):
+
+ def queue_declare(self, queue, *args, **kwargs):
+ kwargs['durable'] = settings.BROKER_DURABILITY
+ try:
+ return super(_Channel, self).queue_declare(queue, *args, **kwargs)
+ except PreconditionFailed as e:
+ if "inequivalent arg 'durable'" in getattr(e, 'reply_text', None):
+ logger.error(
+ 'queue {} durability is not {}, deleting and recreating'.format(
+
+ queue,
+ kwargs['durable']
+ )
+ )
+ self.queue_delete(queue)
+ return super(_Channel, self).queue_declare(queue, *args, **kwargs)
+
+ class _Connection(pyamqp.Connection):
+ Channel = _Channel
+
+ class _Transport(pyamqp.Transport):
+ Connection = _Connection
+
+ self.transport_cls = _Transport
diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py
index 12b9664d1e..9bbd7ae45f 100644
--- a/awx/main/dispatch/publish.py
+++ b/awx/main/dispatch/publish.py
@@ -4,7 +4,9 @@ import sys
from uuid import uuid4
from django.conf import settings
-from kombu import Connection, Exchange, Producer
+from kombu import Exchange, Producer
+
+from awx.main.dispatch.kombu import Connection
logger = logging.getLogger('awx.main.dispatch')
diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py
index 3f4eae9341..51608a8b7a 100644
--- a/awx/main/management/commands/run_callback_receiver.py
+++ b/awx/main/management/commands/run_callback_receiver.py
@@ -3,8 +3,9 @@
from django.conf import settings
from django.core.management.base import BaseCommand
-from kombu import Connection, Exchange, Queue
+from kombu import Exchange, Queue
+from awx.main.dispatch.kombu import Connection
from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker
diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py
index b881c84348..970446a0e5 100644
--- a/awx/main/management/commands/run_dispatcher.py
+++ b/awx/main/management/commands/run_dispatcher.py
@@ -8,10 +8,11 @@ from django.conf import settings
from django.core.cache import cache as django_cache
from django.core.management.base import BaseCommand
from django.db import connection as django_connection, connections
-from kombu import Connection, Exchange, Queue
+from kombu import Exchange, Queue
from awx.main.dispatch import get_local_queuename, reaper
from awx.main.dispatch.control import Control
+from awx.main.dispatch.kombu import Connection
from awx.main.dispatch.pool import AutoscalePool
from awx.main.dispatch.worker import AWXConsumer, TaskWorker
diff --git a/awx/main/queue.py b/awx/main/queue.py
index 867cb68a8a..0da0e22e48 100644
--- a/awx/main/queue.py
+++ b/awx/main/queue.py
@@ -10,7 +10,8 @@ import os
from django.conf import settings
# Kombu
-from kombu import Connection, Exchange, Producer
+from awx.main.dispatch.kombu import Connection
+from kombu import Exchange, Producer
from kombu.serialization import registry
__all__ = ['CallbackQueueDispatcher']
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index e72da9a8bd..8341875aa5 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -436,6 +436,7 @@ DEVSERVER_DEFAULT_PORT = '8013'
# Set default ports for live server tests.
os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
+BROKER_DURABILITY = True
BROKER_POOL_LIMIT = None
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_DEFAULT_QUEUE = 'awx_private_queue'
diff --git a/docs/tasks.md b/docs/tasks.md
index a84366b15b..74d824ac9f 100644
--- a/docs/tasks.md
+++ b/docs/tasks.md
@@ -28,6 +28,8 @@ To accomplish this, AWX makes use of a "Task Queue" abstraction. Task Queues ar
AWX communicates with these worker processes to mediate between clients and workers. This is done via distributed RabbitMQ queues and the already-acknowledged local queue that the Dispatcher is working through. Simply put: to initiate a task, the client (generally, Python code in the AWX API) publishes a message to a queue, and RabbitMQ then delivers that message to one or more workers.
+By default, when AWX creates queues in RabbitMQ, it creates them as *durable* queues in RabbitMQ (which allows for message persistence at the cost of lower performance). For increased message throughput (at the risk of message loss on server restarts), set BROKER_DURABILITY=False, and AWX will create _transient_ queues instead.
+
Clustered AWX installations consist of multiple workers spread across every
node, giving way to high availability and horizontal scaling.