summaryrefslogtreecommitdiffstats
path: root/awx/main/analytics/core.py
blob: d63afdfbf31f4bae765c82abf4c1c7fb0d66e8fa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
import inspect
import io
import json
import logging
import os
import os.path
import pathlib
import shutil
import tarfile
import tempfile

from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder
from django.utils.timezone import now, timedelta
from rest_framework.exceptions import PermissionDenied
import requests

from awx.conf.license import get_license
from awx.main.models import Job
from awx.main.access import access_registry
from awx.main.utils import get_awx_http_client_headers, set_environ, datetime_hook
from awx.main.utils.pglock import advisory_lock

__all__ = ['register', 'gather', 'ship']


logger = logging.getLogger('awx.main.analytics')


def _valid_license():
    try:
        if get_license().get('license_type', 'UNLICENSED') == 'open':
            return False
        access_registry[Job](None).check_license()
    except PermissionDenied:
        logger.exception("A valid license was not found:")
        return False
    return True


def all_collectors():
    from awx.main.analytics import collectors

    return {
        func.__awx_analytics_key__: {
            'name': func.__awx_analytics_key__,
            'version': func.__awx_analytics_version__,
            'description': func.__awx_analytics_description__ or '',
        }
        for name, func in inspect.getmembers(collectors)
        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__')
    }


def register(key, version, description=None, format='json', expensive=None):
    """
    A decorator used to register a function as a metric collector.

    Decorated functions should do the following based on format:
    - json: return JSON-serializable objects.
    - csv: write CSV data to a filename named 'key'

    @register('projects_by_scm_type', 1)
    def projects_by_scm_type():
        return {'git': 5, 'svn': 1}
    """

    def decorate(f):
        f.__awx_analytics_key__ = key
        f.__awx_analytics_version__ = version
        f.__awx_analytics_description__ = description
        f.__awx_analytics_type__ = format
        f.__awx_expensive__ = expensive
        return f

    return decorate


def package(target, data, timestamp):
    try:
        tarname_base = f'{settings.SYSTEM_UUID}-{timestamp.strftime("%Y-%m-%d-%H%M%S%z")}'
        path = pathlib.Path(target)
        index = len(list(path.glob(f'{tarname_base}-*.*')))
        tarname = f'{tarname_base}-{index}.tar.gz'

        manifest = {}
        with tarfile.open(target.joinpath(tarname), 'w:gz') as f:
            for name, (item, version) in data.items():
                try:
                    if isinstance(item, str):
                        f.add(item, arcname=f'./{name}')
                    else:
                        buf = json.dumps(item).encode('utf-8')
                        info = tarfile.TarInfo(f'./{name}')
                        info.size = len(buf)
                        info.mtime = timestamp.timestamp()
                        f.addfile(info, fileobj=io.BytesIO(buf))
                    manifest[name] = version
                except Exception:
                    logger.exception(f"Could not generate metric {name}")
                    return None

            try:
                buf = json.dumps(manifest).encode('utf-8')
                info = tarfile.TarInfo('./manifest.json')
                info.size = len(buf)
                info.mtime = timestamp.timestamp()
                f.addfile(info, fileobj=io.BytesIO(buf))
            except Exception:
                logger.exception("Could not generate manifest.json")
                return None

        return f.name
    except Exception:
        logger.exception("Failed to write analytics archive file")
        return None


def calculate_collection_interval(since, until):
    _now = now()

    # Make sure that the endpoints are not in the future.
    if until is not None and until > _now:
        until = _now
        logger.warning(f"End of the collection interval is in the future, setting to {_now}.")
    if since is not None and since > _now:
        since = _now
        logger.warning(f"Start of the collection interval is in the future, setting to {_now}.")

    # The value of `until` needs to be concrete, so resolve it.  If it wasn't passed in,
    # set it to `now`, but only if that isn't more than 4 weeks ahead of a passed-in
    # `since` parameter.
    if since is not None:
        if until is not None:
            if until > since + timedelta(weeks=4):
                until = since + timedelta(weeks=4)
                logger.warning(f"End of the collection interval is greater than 4 weeks from start, setting end to {until}.")
        else:  # until is None
            until = min(since + timedelta(weeks=4), _now)
    elif until is None:
        until = _now

    if since and since >= until:
        logger.warning("Start of the collection interval is later than the end, ignoring request.")
        raise ValueError

    # The ultimate beginning of the interval needs to be compared to 4 weeks prior to
    # `until`, but we want to keep `since` empty if it wasn't passed in because we use that
    # case to know whether to use the bookkeeping settings variables to decide the start of
    # the interval.
    horizon = until - timedelta(weeks=4)
    if since is not None and since < horizon:
        since = horizon
        logger.warning(f"Start of the collection interval is more than 4 weeks prior to {until}, setting to {horizon}.")

    last_gather = settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon
    if last_gather < horizon:
        last_gather = horizon
        logger.warning(f"Last analytics run was more than 4 weeks prior to {until}, using {horizon} instead.")

    return since, until, last_gather


def gather(dest=None, module=None, subset=None, since=None, until=None, collection_type='scheduled'):
    """
    Gather all defined metrics and write them as JSON files in a .tgz

    :param dest:   the (optional) absolute path to write a compressed tarball
    :param module: the module to search for registered analytic collector
                   functions; defaults to awx.main.analytics.collectors
    """
    log_level = logging.ERROR if collection_type != 'scheduled' else logging.DEBUG

    if not _valid_license():
        logger.log(log_level, "Invalid License provided, or No License Provided")
        return None

    if collection_type != 'dry-run':
        if not settings.INSIGHTS_TRACKING_STATE:
            logger.log(log_level, "Insights for Ansible Automation Platform not enabled. Use --dry-run to gather locally without sending.")
            return None

        if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD):
            logger.log(log_level, "Not gathering analytics, configuration is invalid. Use --dry-run to gather locally without sending.")
            return None

    with advisory_lock('gather_analytics_lock', wait=False) as acquired:
        if not acquired:
            logger.log(log_level, "Not gathering analytics, another task holds lock")
            return None

        from awx.conf.models import Setting
        from awx.main.analytics import collectors
        from awx.main.signals import disable_activity_stream

        logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER))

        try:
            since, until, last_gather = calculate_collection_interval(since, until)
        except ValueError:
            return None

        last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
        last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook)

        collector_module = module if module else collectors
        collector_list = [
            func
            for name, func in inspect.getmembers(collector_module)
            if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset)
        ]
        if not any(c.__awx_analytics_key__ == 'config' for c in collector_list):
            # In order to ship to analytics, we must include the output of the built-in 'config' collector.
            collector_list.append(collectors.config)

        json_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'json']
        csv_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'csv']

        dest = pathlib.Path(dest or tempfile.mkdtemp(prefix='awx_analytics'))
        gather_dir = dest.joinpath('stage')
        gather_dir.mkdir(mode=0o700)
        tarfiles = []
        succeeded = True

        # These json collectors are pretty compact, so collect all of them before shipping to analytics.
        data = {}
        for func in json_collectors:
            key = func.__awx_analytics_key__
            filename = f'{key}.json'
            try:
                last_entry = max(last_entries.get(key) or last_gather, until - timedelta(weeks=4))
                results = (func(since or last_entry, collection_type=collection_type, until=until), func.__awx_analytics_version__)
                json.dumps(results)  # throwaway check to see if the data is json-serializable
                data[filename] = results
            except Exception:
                logger.exception("Could not generate metric {}".format(filename))
        if data:
            if data.get('config.json') is None:
                logger.error("'config' collector data is missing.")
                return None

            tgzfile = package(dest.parent, data, until)
            if tgzfile is not None:
                tarfiles.append(tgzfile)
                if collection_type != 'dry-run':
                    if ship(tgzfile):
                        with disable_activity_stream():
                            for filename in data:
                                key = filename.replace('.json', '')
                                last_entries[key] = max(last_entries[key], until) if last_entries.get(key) else until
                            settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder)
                    else:
                        succeeded = False

        for func in csv_collectors:
            key = func.__awx_analytics_key__
            filename = f'{key}.csv'
            try:
                # These slicer functions may return a generator. The `since` parameter is
                # allowed to be None, and will fall back to LAST_ENTRIES[key] or to
                # LAST_GATHER (truncated appropriately to match the 4-week limit).
                if func.__awx_expensive__:
                    slices = func.__awx_expensive__(key, since, until, last_gather)
                else:
                    slices = collectors.trivial_slicing(key, since, until, last_gather)

                for start, end in slices:
                    files = func(start, full_path=gather_dir, until=end)

                    if not files:
                        if collection_type != 'dry-run':
                            with disable_activity_stream():
                                entry = last_entries.get(key)
                                last_entries[key] = max(entry, end) if entry and type(entry) == type(end) else end
                                settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder)
                        continue

                    slice_succeeded = True
                    for fpath in files:
                        payload = {filename: (fpath, func.__awx_analytics_version__)}

                        payload['config.json'] = data.get('config.json')
                        if payload['config.json'] is None:
                            logger.error("'config' collector data is missing, and is required to ship.")
                            return None

                        tgzfile = package(dest.parent, payload, until)
                        if tgzfile is not None:
                            tarfiles.append(tgzfile)
                            if collection_type != 'dry-run':
                                if not ship(tgzfile):
                                    slice_succeeded, succeeded = False, False
                                    break

                    if slice_succeeded and collection_type != 'dry-run':
                        with disable_activity_stream():
                            entry = last_entries.get(key)
                            last_entries[key] = max(entry, end) if entry and type(entry) == type(end) else end
                            settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder)
            except Exception:
                succeeded = False
                logger.exception("Could not generate metric {}".format(filename))

        if collection_type != 'dry-run':
            if succeeded:
                for fpath in tarfiles:
                    if os.path.exists(fpath):
                        os.remove(fpath)
            with disable_activity_stream():
                if not settings.AUTOMATION_ANALYTICS_LAST_GATHER or until > settings.AUTOMATION_ANALYTICS_LAST_GATHER:
                    # `AUTOMATION_ANALYTICS_LAST_GATHER` is set whether collection succeeds or fails;
                    # if collection fails because of a persistent, underlying issue and we do not set last_gather,
                    # we risk the collectors hitting an increasingly greater workload while the underlying issue
                    # remains unresolved. Put simply, if collection fails, we just move on.

                    # All that said, `AUTOMATION_ANALYTICS_LAST_GATHER` plays a much smaller role in determining
                    # what is actually collected than it used to; collectors now mostly rely on their respective entry
                    # under `last_entries` to determine what should be collected.
                    settings.AUTOMATION_ANALYTICS_LAST_GATHER = until

        shutil.rmtree(dest, ignore_errors=True)  # clean up individual artifact files
        if not tarfiles:
            # No data was collected
            logger.warning("No data from {} to {}".format(since or last_gather, until))
            return None

        return tarfiles


def ship(path):
    """
    Ship gathered metrics to the Insights API
    """
    if not path:
        logger.error('Insights for Ansible Automation Platform TAR not found')
        return False
    if not os.path.exists(path):
        logger.error('Insights for Ansible Automation Platform TAR {} not found'.format(path))
        return False
    if "Error:" in str(path):
        return False

    logger.debug('shipping analytics file: {}'.format(path))
    url = getattr(settings, 'AUTOMATION_ANALYTICS_URL', None)
    if not url:
        logger.error('AUTOMATION_ANALYTICS_URL is not set')
        return False
    rh_user = getattr(settings, 'REDHAT_USERNAME', None)
    rh_password = getattr(settings, 'REDHAT_PASSWORD', None)
    if not rh_user:
        logger.error('REDHAT_USERNAME is not set')
        return False
    if not rh_password:
        logger.error('REDHAT_PASSWORD is not set')
        return False
    with open(path, 'rb') as f:
        files = {'file': (os.path.basename(path), f, settings.INSIGHTS_AGENT_MIME)}
        s = requests.Session()
        s.headers = get_awx_http_client_headers()
        s.headers.pop('Content-Type')
        with set_environ(**settings.AWX_TASK_ENV):
            response = s.post(
                url, files=files, verify="/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", auth=(rh_user, rh_password), headers=s.headers, timeout=(31, 31)
            )
        # Accept 2XX status_codes
        if response.status_code >= 300:
            logger.error('Upload failed with status {}, {}'.format(response.status_code, response.text))
            return False

        return True