author     Chris Meyers <chris.meyers.fsu@gmail.com>                2024-01-15 15:14:30 +0100
committer  Chris Meyers <chrismeyersfsu@users.noreply.github.com>   2024-02-05 21:17:24 +0100
commit     8a902debd569134c6ae10cd26d10ddbe6b4dd7dd (patch)
tree       c30c42f8ff53075181af63067329b5afae66d809
parent     UI rename Endpoints to Listener Addresses (diff)
Per-service metrics http server
* Organize metrics into their respective services
* Serve per-service metrics on a per-service http server
* Increase prometheus client usage over our custom metrics fields
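
The change boils down to each service (dispatcher, callback receiver, websocket relay) exposing its own metrics from its own prometheus_client registry on its own localhost port. As a rough orientation sketch only (none of this code appears in the diff; the counter name is invented and only the port matches the new METRICS_SUBSYSTEM_CONFIG defaults), the underlying prometheus_client pattern looks like this:

    # Illustrative sketch of the per-service exporter pattern, not code from this commit.
    import prometheus_client
    from prometheus_client import CollectorRegistry, Counter

    # one registry per service keeps each service's metrics isolated
    dispatcher_registry = CollectorRegistry(auto_describe=True)
    events_seen = Counter('dispatcher_events_seen', 'invented example counter', registry=dispatcher_registry)

    # the service process starts its own exporter thread on its own port
    prometheus_client.start_http_server(8015, addr='localhost', registry=dispatcher_registry)
    events_seen.inc()
    # http://localhost:8015/metrics now serves only this registry
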
-rw-r--r--   awx/main/analytics/analytics_tasks.py                      5
-rw-r--r--   awx/main/analytics/subsystem_metrics.py                  228
-rw-r--r--   awx/main/consumers.py                                       2
-rw-r--r--   awx/main/dispatch/worker/base.py                            2
-rw-r--r--   awx/main/dispatch/worker/callback.py                        2
-rw-r--r--   awx/main/management/commands/run_callback_receiver.py      4
-rw-r--r--   awx/main/management/commands/run_dispatcher.py              3
-rw-r--r--   awx/main/management/commands/run_wsrelay.py                 3
-rw-r--r--   awx/main/scheduler/task_manager.py                          4
-rw-r--r--   awx/main/tasks/system.py                                    4
-rw-r--r--   awx/main/wsrelay.py                                         2
-rw-r--r--   awx/settings/defaults.py                                   29
12 files changed, 217 insertions, 71 deletions
diff --git a/awx/main/analytics/analytics_tasks.py b/awx/main/analytics/analytics_tasks.py
index 3072577466..6aa08ab9a4 100644
--- a/awx/main/analytics/analytics_tasks.py
+++ b/awx/main/analytics/analytics_tasks.py
@@ -2,7 +2,7 @@
import logging
# AWX
-from awx.main.analytics.subsystem_metrics import Metrics
+from awx.main.analytics.subsystem_metrics import DispatcherMetrics, CallbackReceiverMetrics
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename
@@ -11,4 +11,5 @@ logger = logging.getLogger('awx.main.scheduler')
@task(queue=get_task_queuename)
def send_subsystem_metrics():
- Metrics().send_metrics()
+ DispatcherMetrics().send_metrics()
+ CallbackReceiverMetrics().send_metrics()
diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py
index 9b93b98bda..2662a257fc 100644
--- a/awx/main/analytics/subsystem_metrics.py
+++ b/awx/main/analytics/subsystem_metrics.py
@@ -1,10 +1,15 @@
+import itertools
import redis
import json
import time
import logging
+import prometheus_client
+from prometheus_client.core import GaugeMetricFamily, HistogramMetricFamily
+from prometheus_client.registry import CollectorRegistry
from django.conf import settings
-from django.apps import apps
+from django.http import HttpRequest
+from rest_framework.request import Request
from awx.main.consumers import emit_channel_notification
from awx.main.utils import is_testing
@@ -13,6 +18,30 @@ root_key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX
logger = logging.getLogger('awx.main.analytics')
+class MetricsNamespace:
+ def __init__(self, namespace):
+ self._namespace = namespace
+
+
+class MetricsServerSettings(MetricsNamespace):
+ def port(self):
+ return settings.METRICS_SUBSYSTEM_CONFIG['server'][self._namespace]['port']
+
+
+class MetricsServer(MetricsServerSettings):
+ def __init__(self, namespace, registry):
+ MetricsNamespace.__init__(self, namespace)
+ self._registry = registry
+
+ def start(self):
+ try:
+ # TODO: addr for ipv6 ?
+ prometheus_client.start_http_server(self.port(), addr='localhost', registry=self._registry)
+ except Exception:
+ logger.error(f"MetricsServer failed to start for service '{self._namespace}.")
+ raise
+
+
class BaseM:
def __init__(self, field, help_text):
self.field = field
@@ -148,76 +177,40 @@ class HistogramM(BaseM):
return output_text
-class Metrics:
- def __init__(self, auto_pipe_execute=False, instance_name=None):
+class Metrics(MetricsNamespace):
+ # metric name, help_text
+ METRICSLIST = []
+ _METRICSLIST = [
+ FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'),
+ IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'),
+ FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'),
+ ]
+
+ def __init__(self, namespace, auto_pipe_execute=False, instance_name=None, metrics_have_changed=True, **kwargs):
+ MetricsNamespace.__init__(self, namespace)
+
self.pipe = redis.Redis.from_url(settings.BROKER_URL).pipeline()
self.conn = redis.Redis.from_url(settings.BROKER_URL)
self.last_pipe_execute = time.time()
# track if metrics have been modified since last saved to redis
# start with True so that we get an initial save to redis
- self.metrics_have_changed = True
+ self.metrics_have_changed = metrics_have_changed
self.pipe_execute_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SAVE_TO_REDIS
self.send_metrics_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SEND_METRICS
# auto pipe execute will commit transaction of metric data to redis
# at a regular interval (pipe_execute_interval). If set to False,
# the calling function should call .pipe_execute() explicitly
self.auto_pipe_execute = auto_pipe_execute
- Instance = apps.get_model('main', 'Instance')
if instance_name:
self.instance_name = instance_name
elif is_testing():
self.instance_name = "awx_testing"
else:
- self.instance_name = Instance.objects.my_hostname()
-
- # metric name, help_text
- METRICSLIST = [
- SetIntM('callback_receiver_events_queue_size_redis', 'Current number of events in redis queue'),
- IntM('callback_receiver_events_popped_redis', 'Number of events popped from redis'),
- IntM('callback_receiver_events_in_memory', 'Current number of events in memory (in transfer from redis to db)'),
- IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'),
- FloatM('callback_receiver_events_insert_db_seconds', 'Total time spent saving events to database'),
- IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'),
- IntM('callback_receiver_events_broadcast', 'Number of events broadcast to other control plane nodes'),
- HistogramM(
- 'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS
- ),
- SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'),
- FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'),
- IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'),
- FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'),
- SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading tasks from db'),
- SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'),
- SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'),
- SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'),
- SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
- IntM('task_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
- SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
- SetIntM('task_manager_tasks_started', 'Number of tasks started'),
- SetIntM('task_manager_running_processed', 'Number of running tasks processed'),
- SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'),
- SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'),
- SetFloatM('task_manager_commit_seconds', 'Time spent in db transaction, including on_commit calls'),
- SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent loading pending tasks from db'),
- SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
- SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
- IntM('dependency_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
- SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
- SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'),
- SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
- IntM('workflow_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
- SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
- SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow tasks'),
- SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent loading workflow tasks from db'),
- # dispatcher subsystem metrics
- SetIntM('dispatcher_pool_scale_up_events', 'Number of times local dispatcher scaled up a worker since startup'),
- SetIntM('dispatcher_pool_active_task_count', 'Number of active tasks in the worker pool when last task was submitted'),
- SetIntM('dispatcher_pool_max_worker_count', 'Highest number of workers in worker pool in last collection interval, about 20s'),
- SetFloatM('dispatcher_availability', 'Fraction of time (in last collection interval) dispatcher was able to receive messages'),
- ]
+ self.instance_name = settings.CLUSTER_HOST_ID # Same as Instance.objects.my_hostname() BUT we do not need to import Instance
+
# turn metric list into dictionary with the metric name as a key
self.METRICS = {}
- for m in METRICSLIST:
+ for m in itertools.chain(self.METRICSLIST, self._METRICSLIST):
self.METRICS[m.field] = m
# track last time metrics were sent to other nodes
@@ -230,7 +223,7 @@ class Metrics:
m.reset_value(self.conn)
self.metrics_have_changed = True
self.conn.delete(root_key + "_lock")
- for m in self.conn.scan_iter(root_key + '_instance_*'):
+ for m in self.conn.scan_iter(root_key + '-' + self._namespace + '_instance_*'):
self.conn.delete(m)
def inc(self, field, value):
@@ -297,7 +290,7 @@ class Metrics:
def send_metrics(self):
# more than one thread could be calling this at the same time, so should
# acquire redis lock before sending metrics
- lock = self.conn.lock(root_key + '_lock')
+ lock = self.conn.lock(root_key + '-' + self._namespace + '_lock')
if not lock.acquire(blocking=False):
return
try:
@@ -307,9 +300,10 @@ class Metrics:
payload = {
'instance': self.instance_name,
'metrics': serialized_metrics,
+ 'metrics_namespace': self._namespace,
}
# store the serialized data locally as well, so that load_other_metrics will read it
- self.conn.set(root_key + '_instance_' + self.instance_name, serialized_metrics)
+ self.conn.set(root_key + '-' + self._namespace + '_instance_' + self.instance_name, serialized_metrics)
emit_channel_notification("metrics", payload)
self.previous_send_metrics.set(current_time)
@@ -331,14 +325,14 @@ class Metrics:
instances_filter = request.query_params.getlist("node")
# get a sorted list of instance names
instance_names = [self.instance_name]
- for m in self.conn.scan_iter(root_key + '_instance_*'):
+ for m in self.conn.scan_iter(root_key + '-' + self._namespace + '_instance_*'):
instance_names.append(m.decode('UTF-8').split('_instance_')[1])
instance_names.sort()
# load data, including data from this local instance
instance_data = {}
for instance in instance_names:
if len(instances_filter) == 0 or instance in instances_filter:
- instance_data_from_redis = self.conn.get(root_key + '_instance_' + instance)
+ instance_data_from_redis = self.conn.get(root_key + '-' + self._namespace + '_instance_' + instance)
# data from other instances may not be available. That is OK.
if instance_data_from_redis:
instance_data[instance] = json.loads(instance_data_from_redis.decode('UTF-8'))
@@ -357,6 +351,120 @@ class Metrics:
return output_text
+class DispatcherMetrics(Metrics):
+ METRICSLIST = [
+ SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading tasks from db'),
+ SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'),
+ SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'),
+ SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'),
+ SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
+ IntM('task_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
+ SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
+ SetIntM('task_manager_tasks_started', 'Number of tasks started'),
+ SetIntM('task_manager_running_processed', 'Number of running tasks processed'),
+ SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'),
+ SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'),
+ SetFloatM('task_manager_commit_seconds', 'Time spent in db transaction, including on_commit calls'),
+ SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent loading pending tasks from db'),
+ SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
+ SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
+ IntM('dependency_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
+ SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
+ SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'),
+ SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
+ IntM('workflow_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
+ SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
+ SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow tasks'),
+ SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent loading workflow tasks from db'),
+ # dispatcher subsystem metrics
+ SetIntM('dispatcher_pool_scale_up_events', 'Number of times local dispatcher scaled up a worker since startup'),
+ SetIntM('dispatcher_pool_active_task_count', 'Number of active tasks in the worker pool when last task was submitted'),
+ SetIntM('dispatcher_pool_max_worker_count', 'Highest number of workers in worker pool in last collection interval, about 20s'),
+ SetFloatM('dispatcher_availability', 'Fraction of time (in last collection interval) dispatcher was able to receive messages'),
+ ]
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(settings.METRICS_SERVICE_DISPATCHER, *args, **kwargs)
+
+
+class CallbackReceiverMetrics(Metrics):
+ METRICSLIST = [
+ SetIntM('callback_receiver_events_queue_size_redis', 'Current number of events in redis queue'),
+ IntM('callback_receiver_events_popped_redis', 'Number of events popped from redis'),
+ IntM('callback_receiver_events_in_memory', 'Current number of events in memory (in transfer from redis to db)'),
+ IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'),
+ FloatM('callback_receiver_events_insert_db_seconds', 'Total time spent saving events to database'),
+ IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'),
+ IntM('callback_receiver_events_broadcast', 'Number of events broadcast to other control plane nodes'),
+ HistogramM(
+ 'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS
+ ),
+ SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'),
+ ]
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(settings.METRICS_SERVICE_CALLBACK_RECEIVER, *args, **kwargs)
+
+
def metrics(request):
- m = Metrics()
- return m.generate_metrics(request)
+ output_text = ''
+ for m in [DispatcherMetrics(), CallbackReceiverMetrics()]:
+ output_text += m.generate_metrics(request)
+ return output_text
+
+
+class CustomToPrometheusMetricsCollector(prometheus_client.registry.Collector):
+ """
+ Takes the metric data from redis -> our custom metric fields -> prometheus
+ library metric fields.
+
+    The plan is to get rid of redis and our custom metric fields, and to
+    switch fully to the prometheus library. At that point, this translation
+ code will be deleted.
+ """
+
+ def __init__(self, metrics_obj, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._metrics = metrics_obj
+
+ def collect(self):
+ my_hostname = settings.CLUSTER_HOST_ID
+
+ instance_data = self._metrics.load_other_metrics(Request(HttpRequest()))
+ if not instance_data:
+ logger.debug(f"No metric data not found in redis for metric namespace '{self._metrics._namespace}'")
+ return None
+
+ host_metrics = instance_data.get(my_hostname)
+ for _, metric in self._metrics.METRICS.items():
+ entry = host_metrics.get(metric.field)
+ if not entry:
+ logger.debug(f"{self._metrics._namespace} metric '{metric.field}' not found in redis data payload {json.dumps(instance_data, indent=2)}")
+ continue
+ if isinstance(metric, HistogramM):
+ buckets = list(zip(metric.buckets, entry['counts']))
+ buckets = [[str(i[0]), str(i[1])] for i in buckets]
+ yield HistogramMetricFamily(metric.field, metric.help_text, buckets=buckets, sum_value=entry['sum'])
+ else:
+ yield GaugeMetricFamily(metric.field, metric.help_text, value=entry)
+
+
+class CallbackReceiverMetricsServer(MetricsServer):
+ def __init__(self):
+ registry = CollectorRegistry(auto_describe=True)
+        registry.register(CustomToPrometheusMetricsCollector(CallbackReceiverMetrics(metrics_have_changed=False)))
+ super().__init__(settings.METRICS_SERVICE_CALLBACK_RECEIVER, registry)
+
+
+class DispatcherMetricsServer(MetricsServer):
+ def __init__(self):
+ registry = CollectorRegistry(auto_describe=True)
+        registry.register(CustomToPrometheusMetricsCollector(DispatcherMetrics(metrics_have_changed=False)))
+ super().__init__(settings.METRICS_SERVICE_DISPATCHER, registry)
+
+
+class WebsocketsMetricsServer(MetricsServer):
+ def __init__(self):
+ registry = CollectorRegistry(auto_describe=True)
+ # registry.register()
+ super().__init__(settings.METRICS_SERVICE_WEBSOCKETS, registry)
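
The CustomToPrometheusMetricsCollector above is the bridge its docstring describes: a custom prometheus_client Collector whose collect() turns the redis-backed field values into prometheus metric families on every scrape. A minimal self-contained sketch of that same Collector pattern follows; the class and its dict-backed source are illustrative stand-ins, not the collector from this diff, which reads its values through load_other_metrics() as shown above.

    # Minimal sketch of the custom Collector pattern; names are illustrative.
    import prometheus_client
    from prometheus_client.core import GaugeMetricFamily
    from prometheus_client.registry import CollectorRegistry

    class DictBackedCollector(prometheus_client.registry.Collector):
        # 'source' stands in for the redis-backed Metrics object in the real code
        def __init__(self, source):
            self._source = source

        def collect(self):
            # called on every scrape; yield one metric family per stored value
            for name, value in self._source.items():
                yield GaugeMetricFamily(name, 'bridged value', value=value)

    registry = CollectorRegistry(auto_describe=True)
    registry.register(DictBackedCollector({'subsystem_metrics_pipe_execute_calls': 3.0}))
    print(prometheus_client.generate_latest(registry).decode())
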
diff --git a/awx/main/consumers.py b/awx/main/consumers.py
index f856ca915e..ccaf46e0d9 100644
--- a/awx/main/consumers.py
+++ b/awx/main/consumers.py
@@ -106,7 +106,7 @@ class RelayConsumer(AsyncJsonWebsocketConsumer):
if group == "metrics":
message = json.loads(message['text'])
conn = redis.Redis.from_url(settings.BROKER_URL)
- conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "_instance_" + message['instance'], message['metrics'])
+ conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "-" + message['metrics_namespace'] + "_instance_" + message['instance'], message['metrics'])
else:
await self.channel_layer.group_send(group, message)
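
Because the replicated metrics are now namespaced per service, the key each relay writes to redis changes shape as well. Purely illustrative (it assumes the default SUBSYSTEM_METRICS_REDIS_KEY_PREFIX of 'awx_metrics' and an invented hostname):

    # old vs. new replicated-metrics key names
    prefix, namespace, instance = 'awx_metrics', 'dispatcher', 'node1.example.org'
    old_key = prefix + '_instance_' + instance                    # awx_metrics_instance_node1.example.org
    new_key = prefix + '-' + namespace + '_instance_' + instance  # awx_metrics-dispatcher_instance_node1.example.org
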
diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py
index 05b51b2930..691028cd42 100644
--- a/awx/main/dispatch/worker/base.py
+++ b/awx/main/dispatch/worker/base.py
@@ -168,7 +168,7 @@ class AWXConsumerPG(AWXConsumerBase):
init_time = time.time()
self.pg_down_time = init_time - self.pg_max_wait # allow no grace period
self.last_cleanup = init_time
- self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
+ self.subsystem_metrics = s_metrics.DispatcherMetrics(auto_pipe_execute=False)
self.last_metrics_gather = init_time
self.listen_cumulative_time = 0.0
if schedule:
diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py
index 8dca8dd894..199302c76c 100644
--- a/awx/main/dispatch/worker/callback.py
+++ b/awx/main/dispatch/worker/callback.py
@@ -72,7 +72,7 @@ class CallbackBrokerWorker(BaseWorker):
def __init__(self):
self.buff = {}
self.redis = redis.Redis.from_url(settings.BROKER_URL)
- self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
+ self.subsystem_metrics = s_metrics.CallbackReceiverMetrics(auto_pipe_execute=False)
self.queue_pop = 0
self.queue_name = settings.CALLBACK_QUEUE
self.prof = AWXProfiler("CallbackBrokerWorker")
diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py
index cb3ab781b5..6ab2158cae 100644
--- a/awx/main/management/commands/run_callback_receiver.py
+++ b/awx/main/management/commands/run_callback_receiver.py
@@ -3,6 +3,7 @@
from django.conf import settings
from django.core.management.base import BaseCommand
+from awx.main.analytics.subsystem_metrics import CallbackReceiverMetricsServer
from awx.main.dispatch.control import Control
from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker
@@ -25,6 +26,9 @@ class Command(BaseCommand):
print(Control('callback_receiver').status())
return
consumer = None
+
+ CallbackReceiverMetricsServer().start()
+
try:
consumer = AWXConsumerRedis(
'callback_receiver',
diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py
index 111b5ab0e1..28d954abff 100644
--- a/awx/main/management/commands/run_dispatcher.py
+++ b/awx/main/management/commands/run_dispatcher.py
@@ -10,6 +10,7 @@ from awx.main.dispatch import get_task_queuename
from awx.main.dispatch.control import Control
from awx.main.dispatch.pool import AutoscalePool
from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker
+from awx.main.analytics.subsystem_metrics import DispatcherMetricsServer
logger = logging.getLogger('awx.main.dispatch')
@@ -62,6 +63,8 @@ class Command(BaseCommand):
consumer = None
+ DispatcherMetricsServer().start()
+
try:
queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE)
diff --git a/awx/main/management/commands/run_wsrelay.py b/awx/main/management/commands/run_wsrelay.py
index 90edafdfc5..2bf7e8b898 100644
--- a/awx/main/management/commands/run_wsrelay.py
+++ b/awx/main/management/commands/run_wsrelay.py
@@ -16,6 +16,7 @@ from awx.main.analytics.broadcast_websocket import (
RelayWebsocketStatsManager,
safe_name,
)
+from awx.main.analytics.subsystem_metrics import WebsocketsMetricsServer
from awx.main.wsrelay import WebSocketRelayManager
@@ -91,6 +92,8 @@ class Command(BaseCommand):
return host_stats
def handle(self, *arg, **options):
+ WebsocketsMetricsServer().start()
+
# it's necessary to delay this import in case
# database migrations are still running
from awx.main.models.ha import Instance
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index d064549012..7e72d4cb8e 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -68,7 +68,7 @@ class TaskBase:
# initialize each metric to 0 and force metric_has_changed to true. This
# ensures each task manager metric will be overridden when pipe_execute
# is called later.
- self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
+ self.subsystem_metrics = s_metrics.DispatcherMetrics(auto_pipe_execute=False)
self.start_time = time.time()
# We want to avoid calling settings in loops, so cache these settings at init time
@@ -105,7 +105,7 @@ class TaskBase:
try:
# increment task_manager_schedule_calls regardless if the other
# metrics are recorded
- s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
+ s_metrics.DispatcherMetrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
# Only record metrics if the last time recording was more
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
# Prevents a short-duration task manager that runs directly after a
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 4614b7b4a4..4ebe1d5766 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -62,7 +62,7 @@ from awx.main.tasks.receptor import get_receptor_ctl, worker_info, worker_cleanu
from awx.main.consumers import emit_channel_notification
from awx.main import analytics
from awx.conf import settings_registry
-from awx.main.analytics.subsystem_metrics import Metrics
+from awx.main.analytics.subsystem_metrics import DispatcherMetrics
from rest_framework.exceptions import PermissionDenied
@@ -113,7 +113,7 @@ def dispatch_startup():
cluster_node_heartbeat()
reaper.startup_reaping()
reaper.reap_waiting(grace_period=0)
- m = Metrics()
+ m = DispatcherMetrics()
m.reset_values()
diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py
index 1a5f727b5d..5f3d165153 100644
--- a/awx/main/wsrelay.py
+++ b/awx/main/wsrelay.py
@@ -20,7 +20,6 @@ from awx.main.analytics.broadcast_websocket import (
RelayWebsocketStats,
RelayWebsocketStatsManager,
)
-import awx.main.analytics.subsystem_metrics as s_metrics
logger = logging.getLogger('awx.main.wsrelay')
@@ -54,7 +53,6 @@ class WebsocketRelayConnection:
self.protocol = protocol
self.verify_ssl = verify_ssl
self.channel_layer = None
- self.subsystem_metrics = s_metrics.Metrics(instance_name=name)
self.producers = dict()
self.connected = False
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index f3f8f1881b..589c382b92 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -1076,6 +1076,35 @@ HOST_METRIC_SUMMARY_TASK_LAST_TS = None
HOST_METRIC_SUMMARY_TASK_INTERVAL = 7 # days
+# TODO: cmeyers, replace with the register pattern
+# The register pattern is particularly nice for this because we need
+# to know the process to start the thread that will be the server.
+# The registration location should be the same location as we would
+# call MetricsServer.start()
+# Note: if we don't get to this TODO, then at least create constants
+# for the services strings below.
+# TODO: cmeyers, break this out into a separate django app so other
+# projects can take advantage.
+
+METRICS_SERVICE_CALLBACK_RECEIVER = 'callback_receiver'
+METRICS_SERVICE_DISPATCHER = 'dispatcher'
+METRICS_SERVICE_WEBSOCKETS = 'websockets'
+
+METRICS_SUBSYSTEM_CONFIG = {
+ 'server': {
+ METRICS_SERVICE_CALLBACK_RECEIVER: {
+ 'port': 8014,
+ },
+ METRICS_SERVICE_DISPATCHER: {
+ 'port': 8015,
+ },
+ METRICS_SERVICE_WEBSOCKETS: {
+ 'port': 8016,
+ },
+ }
+}
+
+
# django-ansible-base
ANSIBLE_BASE_TEAM_MODEL = 'main.Team'
ANSIBLE_BASE_ORGANIZATION_MODEL = 'main.Organization'
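
With these defaults, each service's exporter listens on localhost at its configured port: 8014 for the callback receiver, 8015 for the dispatcher, 8016 for the websocket relay. A quick, purely illustrative way to check which exporters are reachable on a node (it assumes the services are running locally with the default ports above):

    # Illustrative check only; not part of this commit.
    from urllib.request import urlopen

    for service, port in (('callback_receiver', 8014), ('dispatcher', 8015), ('websockets', 8016)):
        try:
            body = urlopen(f'http://localhost:{port}/metrics', timeout=2).read().decode()
            print(service, body.splitlines()[0] if body else '<empty response>')
        except OSError as exc:
            print(service, 'not reachable:', exc)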