diff options
author | Chris Meyers <chris.meyers.fsu@gmail.com> | 2024-01-15 15:14:30 +0100 |
---|---|---|
committer | Chris Meyers <chrismeyersfsu@users.noreply.github.com> | 2024-02-05 21:17:24 +0100 |
commit | 8a902debd569134c6ae10cd26d10ddbe6b4dd7dd (patch) | |
tree | c30c42f8ff53075181af63067329b5afae66d809 | |
parent | UI rename Endpoints to Listener Addresses (diff) | |
download | awx-8a902debd569134c6ae10cd26d10ddbe6b4dd7dd.tar.xz awx-8a902debd569134c6ae10cd26d10ddbe6b4dd7dd.zip |
Per-service metrics http server
* Organize metrics into their respective service
* Server per-service metrics on a per-service http server
* Increase prometheus client usage over our custom metrics fields
-rw-r--r-- | awx/main/analytics/analytics_tasks.py | 5 | ||||
-rw-r--r-- | awx/main/analytics/subsystem_metrics.py | 228 | ||||
-rw-r--r-- | awx/main/consumers.py | 2 | ||||
-rw-r--r-- | awx/main/dispatch/worker/base.py | 2 | ||||
-rw-r--r-- | awx/main/dispatch/worker/callback.py | 2 | ||||
-rw-r--r-- | awx/main/management/commands/run_callback_receiver.py | 4 | ||||
-rw-r--r-- | awx/main/management/commands/run_dispatcher.py | 3 | ||||
-rw-r--r-- | awx/main/management/commands/run_wsrelay.py | 3 | ||||
-rw-r--r-- | awx/main/scheduler/task_manager.py | 4 | ||||
-rw-r--r-- | awx/main/tasks/system.py | 4 | ||||
-rw-r--r-- | awx/main/wsrelay.py | 2 | ||||
-rw-r--r-- | awx/settings/defaults.py | 29 |
12 files changed, 217 insertions, 71 deletions
diff --git a/awx/main/analytics/analytics_tasks.py b/awx/main/analytics/analytics_tasks.py index 3072577466..6aa08ab9a4 100644 --- a/awx/main/analytics/analytics_tasks.py +++ b/awx/main/analytics/analytics_tasks.py @@ -2,7 +2,7 @@ import logging # AWX -from awx.main.analytics.subsystem_metrics import Metrics +from awx.main.analytics.subsystem_metrics import DispatcherMetrics, CallbackReceiverMetrics from awx.main.dispatch.publish import task from awx.main.dispatch import get_task_queuename @@ -11,4 +11,5 @@ logger = logging.getLogger('awx.main.scheduler') @task(queue=get_task_queuename) def send_subsystem_metrics(): - Metrics().send_metrics() + DispatcherMetrics().send_metrics() + CallbackReceiverMetrics().send_metrics() diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 9b93b98bda..2662a257fc 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -1,10 +1,15 @@ +import itertools import redis import json import time import logging +import prometheus_client +from prometheus_client.core import GaugeMetricFamily, HistogramMetricFamily +from prometheus_client.registry import CollectorRegistry from django.conf import settings -from django.apps import apps +from django.http import HttpRequest +from rest_framework.request import Request from awx.main.consumers import emit_channel_notification from awx.main.utils import is_testing @@ -13,6 +18,30 @@ root_key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX logger = logging.getLogger('awx.main.analytics') +class MetricsNamespace: + def __init__(self, namespace): + self._namespace = namespace + + +class MetricsServerSettings(MetricsNamespace): + def port(self): + return settings.METRICS_SUBSYSTEM_CONFIG['server'][self._namespace]['port'] + + +class MetricsServer(MetricsServerSettings): + def __init__(self, namespace, registry): + MetricsNamespace.__init__(self, namespace) + self._registry = registry + + def start(self): + try: + # TODO: addr for ipv6 ? + prometheus_client.start_http_server(self.port(), addr='localhost', registry=self._registry) + except Exception: + logger.error(f"MetricsServer failed to start for service '{self._namespace}.") + raise + + class BaseM: def __init__(self, field, help_text): self.field = field @@ -148,76 +177,40 @@ class HistogramM(BaseM): return output_text -class Metrics: - def __init__(self, auto_pipe_execute=False, instance_name=None): +class Metrics(MetricsNamespace): + # metric name, help_text + METRICSLIST = [] + _METRICSLIST = [ + FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'), + IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'), + FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'), + ] + + def __init__(self, namespace, auto_pipe_execute=False, instance_name=None, metrics_have_changed=True, **kwargs): + MetricsNamespace.__init__(self, namespace) + self.pipe = redis.Redis.from_url(settings.BROKER_URL).pipeline() self.conn = redis.Redis.from_url(settings.BROKER_URL) self.last_pipe_execute = time.time() # track if metrics have been modified since last saved to redis # start with True so that we get an initial save to redis - self.metrics_have_changed = True + self.metrics_have_changed = metrics_have_changed self.pipe_execute_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SAVE_TO_REDIS self.send_metrics_interval = settings.SUBSYSTEM_METRICS_INTERVAL_SEND_METRICS # auto pipe execute will commit transaction of metric data to redis # at a regular interval (pipe_execute_interval). If set to False, # the calling function should call .pipe_execute() explicitly self.auto_pipe_execute = auto_pipe_execute - Instance = apps.get_model('main', 'Instance') if instance_name: self.instance_name = instance_name elif is_testing(): self.instance_name = "awx_testing" else: - self.instance_name = Instance.objects.my_hostname() - - # metric name, help_text - METRICSLIST = [ - SetIntM('callback_receiver_events_queue_size_redis', 'Current number of events in redis queue'), - IntM('callback_receiver_events_popped_redis', 'Number of events popped from redis'), - IntM('callback_receiver_events_in_memory', 'Current number of events in memory (in transfer from redis to db)'), - IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'), - FloatM('callback_receiver_events_insert_db_seconds', 'Total time spent saving events to database'), - IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'), - IntM('callback_receiver_events_broadcast', 'Number of events broadcast to other control plane nodes'), - HistogramM( - 'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS - ), - SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'), - FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'), - IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'), - FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'), - SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading tasks from db'), - SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'), - SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'), - SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'), - SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'), - IntM('task_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'), - SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), - SetIntM('task_manager_tasks_started', 'Number of tasks started'), - SetIntM('task_manager_running_processed', 'Number of running tasks processed'), - SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'), - SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), - SetFloatM('task_manager_commit_seconds', 'Time spent in db transaction, including on_commit calls'), - SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent loading pending tasks from db'), - SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), - SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'), - IntM('dependency_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'), - SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), - SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'), - SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'), - IntM('workflow_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'), - SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), - SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow tasks'), - SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent loading workflow tasks from db'), - # dispatcher subsystem metrics - SetIntM('dispatcher_pool_scale_up_events', 'Number of times local dispatcher scaled up a worker since startup'), - SetIntM('dispatcher_pool_active_task_count', 'Number of active tasks in the worker pool when last task was submitted'), - SetIntM('dispatcher_pool_max_worker_count', 'Highest number of workers in worker pool in last collection interval, about 20s'), - SetFloatM('dispatcher_availability', 'Fraction of time (in last collection interval) dispatcher was able to receive messages'), - ] + self.instance_name = settings.CLUSTER_HOST_ID # Same as Instance.objects.my_hostname() BUT we do not need to import Instance + # turn metric list into dictionary with the metric name as a key self.METRICS = {} - for m in METRICSLIST: + for m in itertools.chain(self.METRICSLIST, self._METRICSLIST): self.METRICS[m.field] = m # track last time metrics were sent to other nodes @@ -230,7 +223,7 @@ class Metrics: m.reset_value(self.conn) self.metrics_have_changed = True self.conn.delete(root_key + "_lock") - for m in self.conn.scan_iter(root_key + '_instance_*'): + for m in self.conn.scan_iter(root_key + '-' + self._namespace + '_instance_*'): self.conn.delete(m) def inc(self, field, value): @@ -297,7 +290,7 @@ class Metrics: def send_metrics(self): # more than one thread could be calling this at the same time, so should # acquire redis lock before sending metrics - lock = self.conn.lock(root_key + '_lock') + lock = self.conn.lock(root_key + '-' + self._namespace + '_lock') if not lock.acquire(blocking=False): return try: @@ -307,9 +300,10 @@ class Metrics: payload = { 'instance': self.instance_name, 'metrics': serialized_metrics, + 'metrics_namespace': self._namespace, } # store the serialized data locally as well, so that load_other_metrics will read it - self.conn.set(root_key + '_instance_' + self.instance_name, serialized_metrics) + self.conn.set(root_key + '-' + self._namespace + '_instance_' + self.instance_name, serialized_metrics) emit_channel_notification("metrics", payload) self.previous_send_metrics.set(current_time) @@ -331,14 +325,14 @@ class Metrics: instances_filter = request.query_params.getlist("node") # get a sorted list of instance names instance_names = [self.instance_name] - for m in self.conn.scan_iter(root_key + '_instance_*'): + for m in self.conn.scan_iter(root_key + '-' + self._namespace + '_instance_*'): instance_names.append(m.decode('UTF-8').split('_instance_')[1]) instance_names.sort() # load data, including data from the this local instance instance_data = {} for instance in instance_names: if len(instances_filter) == 0 or instance in instances_filter: - instance_data_from_redis = self.conn.get(root_key + '_instance_' + instance) + instance_data_from_redis = self.conn.get(root_key + '-' + self._namespace + '_instance_' + instance) # data from other instances may not be available. That is OK. if instance_data_from_redis: instance_data[instance] = json.loads(instance_data_from_redis.decode('UTF-8')) @@ -357,6 +351,120 @@ class Metrics: return output_text +class DispatcherMetrics(Metrics): + METRICSLIST = [ + SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading tasks from db'), + SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'), + SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'), + SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'), + SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('task_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'), + SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + SetIntM('task_manager_tasks_started', 'Number of tasks started'), + SetIntM('task_manager_running_processed', 'Number of running tasks processed'), + SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'), + SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), + SetFloatM('task_manager_commit_seconds', 'Time spent in db transaction, including on_commit calls'), + SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent loading pending tasks from db'), + SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), + SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('dependency_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'), + SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'), + SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('workflow_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'), + SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow tasks'), + SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent loading workflow tasks from db'), + # dispatcher subsystem metrics + SetIntM('dispatcher_pool_scale_up_events', 'Number of times local dispatcher scaled up a worker since startup'), + SetIntM('dispatcher_pool_active_task_count', 'Number of active tasks in the worker pool when last task was submitted'), + SetIntM('dispatcher_pool_max_worker_count', 'Highest number of workers in worker pool in last collection interval, about 20s'), + SetFloatM('dispatcher_availability', 'Fraction of time (in last collection interval) dispatcher was able to receive messages'), + ] + + def __init__(self, *args, **kwargs): + super().__init__(settings.METRICS_SERVICE_DISPATCHER, *args, **kwargs) + + +class CallbackReceiverMetrics(Metrics): + METRICSLIST = [ + SetIntM('callback_receiver_events_queue_size_redis', 'Current number of events in redis queue'), + IntM('callback_receiver_events_popped_redis', 'Number of events popped from redis'), + IntM('callback_receiver_events_in_memory', 'Current number of events in memory (in transfer from redis to db)'), + IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'), + FloatM('callback_receiver_events_insert_db_seconds', 'Total time spent saving events to database'), + IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'), + IntM('callback_receiver_events_broadcast', 'Number of events broadcast to other control plane nodes'), + HistogramM( + 'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS + ), + SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'), + ] + + def __init__(self, *args, **kwargs): + super().__init__(settings.METRICS_SERVICE_CALLBACK_RECEIVER, *args, **kwargs) + + def metrics(request): - m = Metrics() - return m.generate_metrics(request) + output_text = '' + for m in [DispatcherMetrics(), CallbackReceiverMetrics()]: + output_text += m.generate_metrics(request) + return output_text + + +class CustomToPrometheusMetricsCollector(prometheus_client.registry.Collector): + """ + Takes the metric data from redis -> our custom metric fields -> prometheus + library metric fields. + + The plan is to get rid of the use of redis, our custom metric fields, and + to switch fully to the prometheus library. At that point, this translation + code will be deleted. + """ + + def __init__(self, metrics_obj, *args, **kwargs): + super().__init__(*args, **kwargs) + self._metrics = metrics_obj + + def collect(self): + my_hostname = settings.CLUSTER_HOST_ID + + instance_data = self._metrics.load_other_metrics(Request(HttpRequest())) + if not instance_data: + logger.debug(f"No metric data not found in redis for metric namespace '{self._metrics._namespace}'") + return None + + host_metrics = instance_data.get(my_hostname) + for _, metric in self._metrics.METRICS.items(): + entry = host_metrics.get(metric.field) + if not entry: + logger.debug(f"{self._metrics._namespace} metric '{metric.field}' not found in redis data payload {json.dumps(instance_data, indent=2)}") + continue + if isinstance(metric, HistogramM): + buckets = list(zip(metric.buckets, entry['counts'])) + buckets = [[str(i[0]), str(i[1])] for i in buckets] + yield HistogramMetricFamily(metric.field, metric.help_text, buckets=buckets, sum_value=entry['sum']) + else: + yield GaugeMetricFamily(metric.field, metric.help_text, value=entry) + + +class CallbackReceiverMetricsServer(MetricsServer): + def __init__(self): + registry = CollectorRegistry(auto_describe=True) + registry.register(CustomToPrometheusMetricsCollector(DispatcherMetrics(metrics_have_changed=False))) + super().__init__(settings.METRICS_SERVICE_CALLBACK_RECEIVER, registry) + + +class DispatcherMetricsServer(MetricsServer): + def __init__(self): + registry = CollectorRegistry(auto_describe=True) + registry.register(CustomToPrometheusMetricsCollector(CallbackReceiverMetrics(metrics_have_changed=False))) + super().__init__(settings.METRICS_SERVICE_DISPATCHER, registry) + + +class WebsocketsMetricsServer(MetricsServer): + def __init__(self): + registry = CollectorRegistry(auto_describe=True) + # registry.register() + super().__init__(settings.METRICS_SERVICE_WEBSOCKETS, registry) diff --git a/awx/main/consumers.py b/awx/main/consumers.py index f856ca915e..ccaf46e0d9 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -106,7 +106,7 @@ class RelayConsumer(AsyncJsonWebsocketConsumer): if group == "metrics": message = json.loads(message['text']) conn = redis.Redis.from_url(settings.BROKER_URL) - conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "_instance_" + message['instance'], message['metrics']) + conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "-" + message['metrics_namespace'] + "_instance_" + message['instance'], message['metrics']) else: await self.channel_layer.group_send(group, message) diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 05b51b2930..691028cd42 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -168,7 +168,7 @@ class AWXConsumerPG(AWXConsumerBase): init_time = time.time() self.pg_down_time = init_time - self.pg_max_wait # allow no grace period self.last_cleanup = init_time - self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) + self.subsystem_metrics = s_metrics.DispatcherMetrics(auto_pipe_execute=False) self.last_metrics_gather = init_time self.listen_cumulative_time = 0.0 if schedule: diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 8dca8dd894..199302c76c 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -72,7 +72,7 @@ class CallbackBrokerWorker(BaseWorker): def __init__(self): self.buff = {} self.redis = redis.Redis.from_url(settings.BROKER_URL) - self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) + self.subsystem_metrics = s_metrics.CallbackReceiverMetrics(auto_pipe_execute=False) self.queue_pop = 0 self.queue_name = settings.CALLBACK_QUEUE self.prof = AWXProfiler("CallbackBrokerWorker") diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index cb3ab781b5..6ab2158cae 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -3,6 +3,7 @@ from django.conf import settings from django.core.management.base import BaseCommand +from awx.main.analytics.subsystem_metrics import CallbackReceiverMetricsServer from awx.main.dispatch.control import Control from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker @@ -25,6 +26,9 @@ class Command(BaseCommand): print(Control('callback_receiver').status()) return consumer = None + + CallbackReceiverMetricsServer().start() + try: consumer = AWXConsumerRedis( 'callback_receiver', diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index 111b5ab0e1..28d954abff 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -10,6 +10,7 @@ from awx.main.dispatch import get_task_queuename from awx.main.dispatch.control import Control from awx.main.dispatch.pool import AutoscalePool from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker +from awx.main.analytics.subsystem_metrics import DispatcherMetricsServer logger = logging.getLogger('awx.main.dispatch') @@ -62,6 +63,8 @@ class Command(BaseCommand): consumer = None + DispatcherMetricsServer().start() + try: queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()] consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE) diff --git a/awx/main/management/commands/run_wsrelay.py b/awx/main/management/commands/run_wsrelay.py index 90edafdfc5..2bf7e8b898 100644 --- a/awx/main/management/commands/run_wsrelay.py +++ b/awx/main/management/commands/run_wsrelay.py @@ -16,6 +16,7 @@ from awx.main.analytics.broadcast_websocket import ( RelayWebsocketStatsManager, safe_name, ) +from awx.main.analytics.subsystem_metrics import WebsocketsMetricsServer from awx.main.wsrelay import WebSocketRelayManager @@ -91,6 +92,8 @@ class Command(BaseCommand): return host_stats def handle(self, *arg, **options): + WebsocketsMetricsServer().start() + # it's necessary to delay this import in case # database migrations are still running from awx.main.models.ha import Instance diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index d064549012..7e72d4cb8e 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -68,7 +68,7 @@ class TaskBase: # initialize each metric to 0 and force metric_has_changed to true. This # ensures each task manager metric will be overridden when pipe_execute # is called later. - self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) + self.subsystem_metrics = s_metrics.DispatcherMetrics(auto_pipe_execute=False) self.start_time = time.time() # We want to avoid calling settings in loops, so cache these settings at init time @@ -105,7 +105,7 @@ class TaskBase: try: # increment task_manager_schedule_calls regardless if the other # metrics are recorded - s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1) + s_metrics.DispatcherMetrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1) # Only record metrics if the last time recording was more # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago. # Prevents a short-duration task manager that runs directly after a diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 4614b7b4a4..4ebe1d5766 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -62,7 +62,7 @@ from awx.main.tasks.receptor import get_receptor_ctl, worker_info, worker_cleanu from awx.main.consumers import emit_channel_notification from awx.main import analytics from awx.conf import settings_registry -from awx.main.analytics.subsystem_metrics import Metrics +from awx.main.analytics.subsystem_metrics import DispatcherMetrics from rest_framework.exceptions import PermissionDenied @@ -113,7 +113,7 @@ def dispatch_startup(): cluster_node_heartbeat() reaper.startup_reaping() reaper.reap_waiting(grace_period=0) - m = Metrics() + m = DispatcherMetrics() m.reset_values() diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 1a5f727b5d..5f3d165153 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -20,7 +20,6 @@ from awx.main.analytics.broadcast_websocket import ( RelayWebsocketStats, RelayWebsocketStatsManager, ) -import awx.main.analytics.subsystem_metrics as s_metrics logger = logging.getLogger('awx.main.wsrelay') @@ -54,7 +53,6 @@ class WebsocketRelayConnection: self.protocol = protocol self.verify_ssl = verify_ssl self.channel_layer = None - self.subsystem_metrics = s_metrics.Metrics(instance_name=name) self.producers = dict() self.connected = False diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index f3f8f1881b..589c382b92 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1076,6 +1076,35 @@ HOST_METRIC_SUMMARY_TASK_LAST_TS = None HOST_METRIC_SUMMARY_TASK_INTERVAL = 7 # days +# TODO: cmeyers, replace with with register pattern +# The register pattern is particularly nice for this because we need +# to know the process to start the thread that will be the server. +# The registration location should be the same location as we would +# call MetricsServer.start() +# Note: if we don't get to this TODO, then at least create constants +# for the services strings below. +# TODO: cmeyers, break this out into a separate django app so other +# projects can take advantage. + +METRICS_SERVICE_CALLBACK_RECEIVER = 'callback_receiver' +METRICS_SERVICE_DISPATCHER = 'dispatcher' +METRICS_SERVICE_WEBSOCKETS = 'websockets' + +METRICS_SUBSYSTEM_CONFIG = { + 'server': { + METRICS_SERVICE_CALLBACK_RECEIVER: { + 'port': 8014, + }, + METRICS_SERVICE_DISPATCHER: { + 'port': 8015, + }, + METRICS_SERVICE_WEBSOCKETS: { + 'port': 8016, + }, + } +} + + # django-ansible-base ANSIBLE_BASE_TEAM_MODEL = 'main.Team' ANSIBLE_BASE_ORGANIZATION_MODEL = 'main.Organization' |