-rw-r--r--   awx/main/analytics/collectors.py                                2
-rw-r--r--   awx/main/analytics/core.py                                      4
-rw-r--r--   awx/main/dispatch/pool.py                                       4
-rw-r--r--   awx/main/migrations/0158_make_instance_cpu_decimal.py          19
-rw-r--r--   awx/main/models/ha.py                                           6
-rw-r--r--   awx/main/tests/unit/settings/test_k8s_resource_setttings.py    61
-rw-r--r--   awx/main/utils/common.py                                       147

7 files changed, 202 insertions, 41 deletions
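Taken together, the patch lets the SYSTEM_TASK_ABS_MEM and SYSTEM_TASK_ABS_CPU settings accept Kubernetes-style resource strings (for example '2Gi', '64G', '2000m') and reworks the fork-capacity math around bytes and decimal CPUs. Below is a minimal standalone sketch of that arithmetic, assuming the defaults carried by the patch (100 MB per fork, 4 forks per CPU) and the K8s code path with no 2 GiB penalty; the helper names are illustrative only, and the expected values come from the new unit tests further down.

# Illustrative sketch only -- mirrors the capacity math introduced in
# awx/main/utils/common.py (get_mem_effective_capacity / get_cpu_effective_capacity)
# for the IS_K8S case, where no 2 GiB penalty is deducted.

def forks_from_memory(mem_bytes, mem_mb_per_fork=100, memory_penalty_bytes=0):
    # convert bytes to MiB, then divide by the per-fork memory allotment
    mem_mb = (mem_bytes - memory_penalty_bytes) // 2**20
    return max(1, mem_mb // mem_mb_per_fork)


def forks_from_cpu(cpu, forks_per_cpu=4):
    return max(1, int(cpu * forks_per_cpu))


assert forks_from_memory(2_000_000_000) == 19  # SYSTEM_TASK_ABS_MEM='2G'
assert forks_from_memory(2 * 2**30) == 20      # SYSTEM_TASK_ABS_MEM='2Gi'
assert forks_from_cpu(2.0) == 8                # SYSTEM_TASK_ABS_CPU='2' or '2000m'
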
diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py
index 9fd8005a91..f8456ca2f1 100644
--- a/awx/main/analytics/collectors.py
+++ b/awx/main/analytics/collectors.py
@@ -211,7 +211,7 @@ def projects_by_scm_type(since, **kwargs):
     return counts
 
 
-@register('instance_info', '1.1', description=_('Cluster topology and capacity'))
+@register('instance_info', '1.2', description=_('Cluster topology and capacity'))
 def instance_info(since, include_hostnames=False, **kwargs):
     info = {}
     instances = models.Instance.objects.values_list('hostname').values(
diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py
index d63afdfbf3..6aa2f5090e 100644
--- a/awx/main/analytics/core.py
+++ b/awx/main/analytics/core.py
@@ -90,7 +90,7 @@ def package(target, data, timestamp):
                 if isinstance(item, str):
                     f.add(item, arcname=f'./{name}')
                 else:
-                    buf = json.dumps(item).encode('utf-8')
+                    buf = json.dumps(item, cls=DjangoJSONEncoder).encode('utf-8')
                     info = tarfile.TarInfo(f'./{name}')
                     info.size = len(buf)
                     info.mtime = timestamp.timestamp()
@@ -230,7 +230,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti
         try:
             last_entry = max(last_entries.get(key) or last_gather, until - timedelta(weeks=4))
             results = (func(since or last_entry, collection_type=collection_type, until=until), func.__awx_analytics_version__)
-            json.dumps(results)  # throwaway check to see if the data is json-serializable
+            json.dumps(results, cls=DjangoJSONEncoder)  # throwaway check to see if the data is json-serializable
             data[filename] = results
         except Exception:
             logger.exception("Could not generate metric {}".format(filename))
diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index 97e2fa630a..3d08ca3fd7 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -22,6 +22,7 @@ import psutil
 
 from awx.main.models import UnifiedJob
 from awx.main.dispatch import reaper
+from awx.main.utils.common import convert_mem_str_to_bytes
 
 if 'run_callback_receiver' in sys.argv:
     logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -319,7 +320,8 @@ class AutoscalePool(WorkerPool):
         if self.max_workers is None:
             settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None)
             if settings_absmem is not None:
-                total_memory_gb = int(settings_absmem)
+                # There are 1073741824 bytes in a gigabyte. Convert bytes to gigabytes by dividing by 2**30
+                total_memory_gb = convert_mem_str_to_bytes(settings_absmem) // 2**30
             else:
                 total_memory_gb = (psutil.virtual_memory().total >> 30) + 1  # noqa: round up
             # 5 workers per GB of total memory
diff --git a/awx/main/migrations/0158_make_instance_cpu_decimal.py b/awx/main/migrations/0158_make_instance_cpu_decimal.py
new file mode 100644
index 0000000000..b78ff1b754
--- /dev/null
+++ b/awx/main/migrations/0158_make_instance_cpu_decimal.py
@@ -0,0 +1,19 @@
+# Generated by Django 2.2.24 on 2022-02-14 17:37
+
+from decimal import Decimal
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0157_inventory_labels'),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name='instance',
+            name='cpu',
+            field=models.DecimalField(decimal_places=1, default=Decimal('0'), editable=False, max_digits=4),
+        ),
+    ]
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index 43c1567119..add2564015 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -82,8 +82,10 @@ class Instance(HasPolicyEditsMixin, BaseModel):
     modified = models.DateTimeField(auto_now=True)
     # Fields defined in health check or heartbeat
     version = models.CharField(max_length=120, blank=True)
-    cpu = models.IntegerField(
-        default=0,
+    cpu = models.DecimalField(
+        default=Decimal(0.0),
+        max_digits=4,
+        decimal_places=1,
         editable=False,
     )
     memory = models.BigIntegerField(
diff --git a/awx/main/tests/unit/settings/test_k8s_resource_setttings.py b/awx/main/tests/unit/settings/test_k8s_resource_setttings.py
new file mode 100644
index 0000000000..a2899a8561
--- /dev/null
+++ b/awx/main/tests/unit/settings/test_k8s_resource_setttings.py
@@ -0,0 +1,61 @@
+import pytest
+
+from unittest import mock
+
+from awx.main.utils.common import (
+    convert_mem_str_to_bytes,
+    get_mem_effective_capacity,
+    get_corrected_memory,
+    convert_cpu_str_to_decimal_cpu,
+    get_cpu_effective_capacity,
+    get_corrected_cpu,
+)
+
+
+@pytest.mark.parametrize(
+    "value,converted_value,mem_capacity",
+    [
+        ('2G', 2000000000, 19),
+        ('4G', 4000000000, 38),
+        ('2Gi', 2147483648, 20),
+        ('2.1G', 1, 1),  # expressing memory with non-integers is not supported, and we'll fall back to 1 fork for memory capacity.
+        ('4Gi', 4294967296, 40),
+        ('2M', 2000000, 1),
+        ('3M', 3000000, 1),
+        ('2Mi', 2097152, 1),
+        ('2048Mi', 2147483648, 20),
+        ('4096Mi', 4294967296, 40),
+        ('64G', 64000000000, 610),
+        ('64Garbage', 1, 1),
+    ],
+)
+def test_SYSTEM_TASK_ABS_MEM_conversion(value, converted_value, mem_capacity):
+    with mock.patch('django.conf.settings') as mock_settings:
+        mock_settings.SYSTEM_TASK_ABS_MEM = value
+        mock_settings.SYSTEM_TASK_FORKS_MEM = 100
+        mock_settings.IS_K8S = True
+        assert convert_mem_str_to_bytes(value) == converted_value
+        assert get_corrected_memory(-1) == converted_value
+        assert get_mem_effective_capacity(-1) == mem_capacity
+
+
+@pytest.mark.parametrize(
+    "value,converted_value,cpu_capacity",
+    [
+        ('2', 2.0, 8),
+        ('1.5', 1.5, 6),
+        ('100m', 0.1, 1),
+        ('2000m', 2.0, 8),
+        ('4MillionCPUm', 1.0, 4),  # Any suffix other than 'm' is not supported, we fall back to 1 CPU
+        ('Random', 1.0, 4),  # Any setting value other than integers, floats millicores (e.g 1, 1.0, or 1000m) is not supported, fall back to 1 CPU
+        ('2505m', 2.5, 10),
+        ('1.55', 1.6, 6),
+    ],
+)
+def test_SYSTEM_TASK_ABS_CPU_conversion(value, converted_value, cpu_capacity):
+    with mock.patch('django.conf.settings') as mock_settings:
+        mock_settings.SYSTEM_TASK_ABS_CPU = value
+        mock_settings.SYSTEM_TASK_FORKS_CPU = 4
+        assert convert_cpu_str_to_decimal_cpu(value) == converted_value
+        assert get_corrected_cpu(-1) == converted_value
+        assert get_cpu_effective_capacity(-1) == cpu_capacity
diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py
index 45f3ae66c6..49885d70c7 100644
--- a/awx/main/utils/common.py
+++ b/awx/main/utils/common.py
@@ -692,16 +692,56 @@ def parse_yaml_or_json(vars_str, silent_failure=True):
     return vars_dict
 
 
-def get_cpu_effective_capacity(cpu_count):
+def convert_cpu_str_to_decimal_cpu(cpu_str):
+    """Convert a string indicating cpu units to decimal.
+
+    Useful for dealing with cpu setting that may be expressed in units compatible with
+    kubernetes.
+
+    See https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units
+    """
+    cpu = cpu_str
+    millicores = False
+
+    if cpu_str[-1] == 'm':
+        cpu = cpu_str[:-1]
+        millicores = True
+
+    try:
+        cpu = float(cpu)
+    except ValueError:
+        cpu = 1.0
+        millicores = False
+        logger.warning(f"Could not convert SYSTEM_TASK_ABS_CPU {cpu_str} to a decimal number, falling back to default of 1 cpu")
+
+    if millicores:
+        cpu = cpu / 1000
+
+    # Per kubernetes docs, fractional CPU less than .1 are not allowed
+    return max(0.1, round(cpu, 1))
+
+
+def get_corrected_cpu(cpu_count):  # formerlly get_cpu_capacity
+    """Some environments will do a correction to the reported CPU number
+    because the given OpenShift value is a lie
+    """
     from django.conf import settings
 
     settings_abscpu = getattr(settings, 'SYSTEM_TASK_ABS_CPU', None)
     env_abscpu = os.getenv('SYSTEM_TASK_ABS_CPU', None)
 
     if env_abscpu is not None:
-        return int(env_abscpu)
+        return convert_cpu_str_to_decimal_cpu(env_abscpu)
     elif settings_abscpu is not None:
-        return int(settings_abscpu)
+        return convert_cpu_str_to_decimal_cpu(settings_abscpu)
+
+    return cpu_count  # no correction
+
+
+def get_cpu_effective_capacity(cpu_count):
+    from django.conf import settings
+
+    cpu_count = get_corrected_cpu(cpu_count)
 
     settings_forkcpu = getattr(settings, 'SYSTEM_TASK_FORKS_CPU', None)
     env_forkcpu = os.getenv('SYSTEM_TASK_FORKS_CPU', None)
@@ -713,58 +753,95 @@
     else:
         forkcpu = 4
 
-    return cpu_count * forkcpu
+    return max(1, int(cpu_count * forkcpu))
 
 
-def get_corrected_cpu(cpu_count):  # formerlly get_cpu_capacity
-    """Some environments will do a correction to the reported CPU number
-    because the given OpenShift value is a lie
-    """
-    from django.conf import settings
+def convert_mem_str_to_bytes(mem_str):
+    """Convert string with suffix indicating units to memory in bytes (base 2)
 
-    settings_abscpu = getattr(settings, 'SYSTEM_TASK_ABS_CPU', None)
-    env_abscpu = os.getenv('SYSTEM_TASK_ABS_CPU', None)
-
-    if env_abscpu is not None or settings_abscpu is not None:
-        return 0
+    Useful for dealing with memory setting that may be expressed in units compatible with
+    kubernetes.
 
-    return cpu_count  # no correction
+    See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory
+    """
+    # If there is no suffix, the memory sourced from the request is in bytes
+    if mem_str.isdigit():
+        return int(mem_str)
+
+    conversions = {
+        'Ei': lambda x: x * 2**60,
+        'E': lambda x: x * 10**18,
+        'Pi': lambda x: x * 2**50,
+        'P': lambda x: x * 10**15,
+        'Ti': lambda x: x * 2**40,
+        'T': lambda x: x * 10**12,
+        'Gi': lambda x: x * 2**30,
+        'G': lambda x: x * 10**9,
+        'Mi': lambda x: x * 2**20,
+        'M': lambda x: x * 10**6,
+        'Ki': lambda x: x * 2**10,
+        'K': lambda x: x * 10**3,
+    }
+    mem = 0
+    mem_unit = None
+    for i, char in enumerate(mem_str):
+        if not char.isdigit():
+            mem_unit = mem_str[i:]
+            mem = int(mem_str[:i])
+            break
+    if not mem_unit or mem_unit not in conversions.keys():
+        error = f"Unsupported value for SYSTEM_TASK_ABS_MEM: {mem_str}, memory must be expressed in bytes or with known suffix: {conversions.keys()}. Falling back to 1 byte"
+        logger.warning(error)
+        return 1
+    return max(1, conversions[mem_unit](mem))
 
 
-def get_mem_effective_capacity(mem_mb):
+def get_corrected_memory(memory):
     from django.conf import settings
 
     settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None)
     env_absmem = os.getenv('SYSTEM_TASK_ABS_MEM', None)
 
+    # Runner returns memory in bytes
+    # so we convert memory from settings to bytes as well.
     if env_absmem is not None:
-        return int(env_absmem)
+        return convert_mem_str_to_bytes(env_absmem)
     elif settings_absmem is not None:
-        return int(settings_absmem)
-
-    settings_forkmem = getattr(settings, 'SYSTEM_TASK_FORKS_MEM', None)
-    env_forkmem = os.getenv('SYSTEM_TASK_FORKS_MEM', None)
+        return convert_mem_str_to_bytes(settings_absmem)
 
-    if env_forkmem:
-        forkmem = int(env_forkmem)
-    elif settings_forkmem:
-        forkmem = int(settings_forkmem)
-    else:
-        forkmem = 100
-
-    return max(1, ((mem_mb // 1024 // 1024) - 2048) // forkmem)
+    return memory
 
 
-def get_corrected_memory(memory):
+def get_mem_effective_capacity(mem_bytes):
     from django.conf import settings
 
-    settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None)
-    env_absmem = os.getenv('SYSTEM_TASK_ABS_MEM', None)
+    mem_bytes = get_corrected_memory(mem_bytes)
 
-    if env_absmem is not None or settings_absmem is not None:
-        return 0
+    settings_mem_mb_per_fork = getattr(settings, 'SYSTEM_TASK_FORKS_MEM', None)
+    env_mem_mb_per_fork = os.getenv('SYSTEM_TASK_FORKS_MEM', None)
 
-    return memory
+    if env_mem_mb_per_fork:
+        mem_mb_per_fork = int(env_mem_mb_per_fork)
+    elif settings_mem_mb_per_fork:
+        mem_mb_per_fork = int(settings_mem_mb_per_fork)
+    else:
+        mem_mb_per_fork = 100
+
+    # Per docs, deduct 2GB of memory from the available memory
+    # to cover memory consumption of background tasks when redis/web etc are colocated with
+    # the other control processes
+    memory_penalty_bytes = 2147483648
+    if settings.IS_K8S:
+        # In k8s, this is dealt with differently because
+        # redis and the web containers have their own memory allocation
+        memory_penalty_bytes = 0
+
+    # convert memory to megabytes because our setting of how much memory we
+    # should allocate per fork is in megabytes
+    mem_mb = (mem_bytes - memory_penalty_bytes) // 2**20
+    max_forks_based_on_memory = mem_mb // mem_mb_per_fork
+
+    return max(1, max_forks_based_on_memory)
 
 
 _inventory_updates = threading.local()
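
For reference, a short usage sketch of the two conversion helpers this patch adds to awx/main/utils/common.py; the expected values are copied from the unit tests above, and running it assumes an AWX checkout on the Python path.

# Usage sketch; expected values are taken from test_k8s_resource_setttings.py above.
from awx.main.utils.common import convert_cpu_str_to_decimal_cpu, convert_mem_str_to_bytes

assert convert_mem_str_to_bytes('2Gi') == 2147483648    # binary suffix: 2 * 2**30
assert convert_mem_str_to_bytes('64G') == 64000000000   # decimal suffix: 64 * 10**9
assert convert_mem_str_to_bytes('64Garbage') == 1       # unknown suffix falls back to 1 byte

assert convert_cpu_str_to_decimal_cpu('100m') == 0.1    # millicores become decimal CPUs
assert convert_cpu_str_to_decimal_cpu('2505m') == 2.5   # rounded to one decimal place
assert convert_cpu_str_to_decimal_cpu('Random') == 1.0  # unparseable values fall back to 1 CPU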