diff options
author | softwarefactory-project-zuul[bot] <softwarefactory-project-zuul[bot]@users.noreply.github.com> | 2019-05-28 22:15:26 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-28 22:15:26 +0200 |
commit | 41f2b83ae2ec477c8d7719226a46095119eb48ce (patch) | |
tree | 575509eaf79576bee40e7e4b26cd1937e07082a6 | |
parent | Merge pull request #3932 from shanemcd/zuul_k8s (diff) | |
parent | add the ability to disable RabbitMQ queue durability (diff) | |
download | awx-41f2b83ae2ec477c8d7719226a46095119eb48ce.tar.xz awx-41f2b83ae2ec477c8d7719226a46095119eb48ce.zip |
Merge pull request #3947 from ryanpetrello/transient-queues
RFC: add the ability to disable RabbitMQ queue durability
Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
-rw-r--r-- | awx/main/conf.py | 10 | ||||
-rw-r--r-- | awx/main/dispatch/control.py | 3 | ||||
-rw-r--r-- | awx/main/dispatch/kombu.py | 42 | ||||
-rw-r--r-- | awx/main/dispatch/publish.py | 4 | ||||
-rw-r--r-- | awx/main/management/commands/run_callback_receiver.py | 3 | ||||
-rw-r--r-- | awx/main/management/commands/run_dispatcher.py | 3 | ||||
-rw-r--r-- | awx/main/queue.py | 3 | ||||
-rw-r--r-- | awx/settings/defaults.py | 1 | ||||
-rw-r--r-- | docs/tasks.md | 2 |
9 files changed, 66 insertions, 5 deletions
diff --git a/awx/main/conf.py b/awx/main/conf.py index 72b8148136..7f7ace83f0 100644 --- a/awx/main/conf.py +++ b/awx/main/conf.py @@ -570,6 +570,16 @@ register( ) +register( + 'BROKER_DURABILITY', + field_class=fields.BooleanField, + label=_('Message Durability'), + help_text=_('When set (the default), underlying queues will be persisted to disk. Disable this to enable higher message bus throughput.'), + category=_('System'), + category_slug='system', +) + + def logging_validate(serializer, attrs): if not serializer.instance or \ not hasattr(serializer.instance, 'LOG_AGGREGATOR_HOST') or \ diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py index f836e0624c..5f081e84f2 100644 --- a/awx/main/dispatch/control.py +++ b/awx/main/dispatch/control.py @@ -4,7 +4,8 @@ import socket from django.conf import settings from awx.main.dispatch import get_local_queuename -from kombu import Connection, Queue, Exchange, Producer, Consumer +from awx.main.dispatch.kombu import Connection +from kombu import Queue, Exchange, Producer, Consumer logger = logging.getLogger('awx.main.dispatch') diff --git a/awx/main/dispatch/kombu.py b/awx/main/dispatch/kombu.py new file mode 100644 index 0000000000..94fc7a035e --- /dev/null +++ b/awx/main/dispatch/kombu.py @@ -0,0 +1,42 @@ +from amqp.exceptions import PreconditionFailed +from django.conf import settings +from kombu.connection import Connection as KombuConnection +from kombu.transport import pyamqp + +import logging + +logger = logging.getLogger('awx.main.dispatch') + + +__all__ = ['Connection'] + + +class Connection(KombuConnection): + + def __init__(self, *args, **kwargs): + super(Connection, self).__init__(*args, **kwargs) + class _Channel(pyamqp.Channel): + + def queue_declare(self, queue, *args, **kwargs): + kwargs['durable'] = settings.BROKER_DURABILITY + try: + return super(_Channel, self).queue_declare(queue, *args, **kwargs) + except PreconditionFailed as e: + if "inequivalent arg 'durable'" in getattr(e, 'reply_text', None): + logger.error( + 'queue {} durability is not {}, deleting and recreating'.format( + + queue, + kwargs['durable'] + ) + ) + self.queue_delete(queue) + return super(_Channel, self).queue_declare(queue, *args, **kwargs) + + class _Connection(pyamqp.Connection): + Channel = _Channel + + class _Transport(pyamqp.Transport): + Connection = _Connection + + self.transport_cls = _Transport diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py index 12b9664d1e..9bbd7ae45f 100644 --- a/awx/main/dispatch/publish.py +++ b/awx/main/dispatch/publish.py @@ -4,7 +4,9 @@ import sys from uuid import uuid4 from django.conf import settings -from kombu import Connection, Exchange, Producer +from kombu import Exchange, Producer + +from awx.main.dispatch.kombu import Connection logger = logging.getLogger('awx.main.dispatch') diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 3f4eae9341..51608a8b7a 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -3,8 +3,9 @@ from django.conf import settings from django.core.management.base import BaseCommand -from kombu import Connection, Exchange, Queue +from kombu import Exchange, Queue +from awx.main.dispatch.kombu import Connection from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index b881c84348..970446a0e5 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -8,10 +8,11 @@ from django.conf import settings from django.core.cache import cache as django_cache from django.core.management.base import BaseCommand from django.db import connection as django_connection, connections -from kombu import Connection, Exchange, Queue +from kombu import Exchange, Queue from awx.main.dispatch import get_local_queuename, reaper from awx.main.dispatch.control import Control +from awx.main.dispatch.kombu import Connection from awx.main.dispatch.pool import AutoscalePool from awx.main.dispatch.worker import AWXConsumer, TaskWorker diff --git a/awx/main/queue.py b/awx/main/queue.py index 867cb68a8a..0da0e22e48 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -10,7 +10,8 @@ import os from django.conf import settings # Kombu -from kombu import Connection, Exchange, Producer +from awx.main.dispatch.kombu import Connection +from kombu import Exchange, Producer from kombu.serialization import registry __all__ = ['CallbackQueueDispatcher'] diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index e72da9a8bd..8341875aa5 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -436,6 +436,7 @@ DEVSERVER_DEFAULT_PORT = '8013' # Set default ports for live server tests. os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199') +BROKER_DURABILITY = True BROKER_POOL_LIMIT = None BROKER_URL = 'amqp://guest:guest@localhost:5672//' CELERY_DEFAULT_QUEUE = 'awx_private_queue' diff --git a/docs/tasks.md b/docs/tasks.md index a84366b15b..74d824ac9f 100644 --- a/docs/tasks.md +++ b/docs/tasks.md @@ -28,6 +28,8 @@ To accomplish this, AWX makes use of a "Task Queue" abstraction. Task Queues ar AWX communicates with these worker processes to mediate between clients and workers. This is done via distributed RabbitMQ queues and the already-acknowledged local queue that the Dispatcher is working through. Simply put: to initiate a task, the client (generally, Python code in the AWX API) publishes a message to a queue, and RabbitMQ then delivers that message to one or more workers. +By default, when AWX creates queues in RabbitMQ, it creates them as *durable* queues in RabbitMQ (which allows for message persistence at the cost of lower performance). For increased message throughput (at the risk of message loss on server restarts), set BROKER_DURABILITY=False, and AWX will create _transient_ queues instead. + Clustered AWX installations consist of multiple workers spread across every node, giving way to high availability and horizontal scaling. |