#!/usr/bin/env awx-python
#
# !!! READ BEFORE POINTING THIS AT YOUR FOOT !!!
#
# This script attempts to connect to an AWX database and insert (by default)
# a billion main_jobevent rows as screamingly fast as possible.
#
# tl;dr for best results, feed it high IOPS.
#
# this script exists *solely* for the purpose of generating *test* data very
# quickly; do *not* point this at a production installation or you *will* be
# very unhappy
#
# Before running this script, you should give postgres *GOBS* of memory
# and disk so it can create indexes and constraints as quickly as possible.
# In fact, it's probably not smart to attempt this on anything with less than
# 8 cores, 32GB of RAM, and tens of thousands of IOPS.
#
# Also, a billion events is a *lot* of data; make sure you've
# provisioned *at least* 750GB of disk space
#
# if you want this script to complete in a few hours, a good starting point
# is something like m5.4xlarge w/ 1TB provisioned IOPS SSD (io1)
#
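# Example invocation (assuming the script is saved as firehose.py; the flags
# shown are just the defaults, which work out to roughly 744K jobs and ~1B
# events over the 31-day window):
#
#   awx-python firehose.py --jobs-per-hour 1000 --events-per-job 1345 \
#       --batch-size 100 --days-delta 31
#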
import math
import argparse
import datetime
import itertools
import json
import multiprocessing
import pkg_resources
import random
import subprocess
import sys
from io import StringIO
from time import time
from uuid import uuid4
import psycopg2
from django import setup as setup_django
from django.db import connection
from django.db.models.sql import InsertQuery
from django.utils.timezone import now
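
# Pull the database connection details out of the running AWX install via
# awx-manage so psycopg2 can talk to the same postgres database directly.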
db = json.loads(subprocess.check_output(['awx-manage', 'print_settings', 'DATABASES', '--format', 'json']))
name = db['DATABASES']['default']['NAME']
user = db['DATABASES']['default']['USER']
pw = db['DATABASES']['default']['PASSWORD']
host = db['DATABASES']['default']['HOST']
dsn = f'dbname={name} user={user} password={pw} host={host}'
u = str(uuid4())
STATUS_OPTIONS = ('successful', 'failed', 'error', 'canceled')
EVENT_OPTIONS = ('runner_on_ok', 'runner_on_failed', 'runner_on_changed', 'runner_on_skipped', 'runner_on_unreachable')
MODULE_OPTIONS = ('yup', 'stonchronize', 'templotz', 'deboog')
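
# YieldedRows is the file-like object handed to psycopg2's copy_expert():
# it pre-renders one tab-separated row per (event, module) combination and
# then serves randomly chosen rows back in large repeated chunks, so the
# COPY never has to wait on per-row Python formatting.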
class YieldedRows(StringIO):
def __init__(self, job_id, rows, created_stamp, modified_stamp, *args, **kwargs):
self.rows = rows
self.rowlist = []
for (event, module) in itertools.product(EVENT_OPTIONS, MODULE_OPTIONS):
event_data_json = {"task_action": module, "name": "Do a {} thing".format(module), "task": "Do a {} thing".format(module)}
row = (
"\t".join(
[
f"{created_stamp}",
f"{modified_stamp}",
f"{created_stamp}",
event,
json.dumps(event_data_json),
str(event in ('runner_on_failed', 'runner_on_unreachable')),
str(event == 'runner_on_changed'),
"localhost",
"Example Play",
"Hello World",
"",
"0",
"1",
job_id,
u,
"",
"1",
"hello_world.yml",
"0",
"X",
"1",
]
)
+ '\n'
)
self.rowlist.append(row)
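
    # called repeatedly by copy_expert(); each call returns one pre-rendered
    # row repeated up to 1,000 times, decrementing self.rows until the
    # requested count is exhausted, then returns '' to signal EOF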
def read(self, x):
if self.rows <= 0:
self.close()
return ''
elif self.rows >= 1 and self.rows < 1000:
event_rows = self.rowlist[random.randrange(len(self.rowlist))] * self.rows
self.rows -= self.rows
return event_rows
self.rows -= 1000
return self.rowlist[random.randrange(len(self.rowlist))] * 1000
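
# Bulk-load `count` events for a single job over one connection using
# COPY ... FROM STDIN, which is far faster than issuing individual INSERTs.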
def firehose(job, count, created_stamp, modified_stamp):
conn = psycopg2.connect(dsn)
f = YieldedRows(job, count, created_stamp, modified_stamp)
with conn.cursor() as cursor:
cursor.copy_expert(
(
'COPY '
'main_jobevent('
'created, modified, job_created, event, event_data, failed, changed, '
'host_name, play, role, task, counter, host_id, job_id, uuid, '
'parent_uuid, end_line, playbook, start_line, stdout, verbosity'
') '
'FROM STDIN'
),
f,
size=1024 * 1000,
)
conn.commit()
conn.close()
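
# Run a single DDL statement on its own connection; used at the end of the
# run to rebuild the dropped indexes and constraints (one process per index).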
def cleanup(sql):
print(sql)
conn = psycopg2.connect(dsn)
with conn.cursor() as cursor:
cursor.execute(sql)
conn.commit()
conn.close()
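
# Create `jobs` Job rows in batches: bulk_create() the UnifiedJob base rows,
# then write the matching rows for the Job child table with a hand-built
# InsertQuery so Django's per-object save() overhead is skipped entirely.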
def generate_jobs(jobs, batch_size, time_delta):
print(f'inserting {jobs} job(s)')
sys.path.insert(0, pkg_resources.get_distribution('awx').module_path)
from awx import prepare_env
prepare_env()
setup_django()
from awx.main.models import UnifiedJob, Job, JobTemplate
fields = list(set(Job._meta.fields) - set(UnifiedJob._meta.fields))
job_field_names = set([f.attname for f in fields])
# extra unified job field names from base class
for field_name in ('name', 'created_by_id', 'modified_by_id'):
job_field_names.add(field_name)
jt_count = JobTemplate.objects.count()
def make_batch(N, jt_pos=0):
jt = None
while not jt:
try:
jt = JobTemplate.objects.all()[jt_pos % jt_count]
except IndexError as e:
# seems to happen every now and then due to some race condition
print('Warning: IndexError on {} JT, error: {}'.format(jt_pos % jt_count, e))
jt_pos += 1
jt_defaults = dict(
(f.attname, getattr(jt, f.attname))
for f in JobTemplate._meta.get_fields()
if f.concrete and f.attname in job_field_names and getattr(jt, f.attname)
)
jt_defaults['job_template_id'] = jt.pk
jt_defaults['unified_job_template_id'] = jt.pk # populated by save method
jobs = [
Job(
status=STATUS_OPTIONS[i % len(STATUS_OPTIONS)],
started=now() - time_delta,
created=now() - time_delta,
modified=now() - time_delta,
finished=now() - time_delta,
elapsed=0.0,
**jt_defaults,
)
for i in range(N)
]
ujs = UnifiedJob.objects.bulk_create(jobs)
for uj in ujs:
uj.unifiedjob_ptr_id = uj.id # hack around the polymorphic id field not being picked up
query = InsertQuery(Job)
query.insert_values(fields, ujs)
with connection.cursor() as cursor:
query, params = query.sql_with_params()[0]
cursor.execute(query, params)
return ujs[-1], jt_pos, [uj.pk for uj in ujs]
i = 1
jt_pos = 0
created_job_ids = []
s = time()
from awx.main.models import JobEvent
from awx.main.utils.common import create_partition
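    # the job event table is partitioned by job_created, so make sure a
    # partition exists for the backdated hour we are about to write into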
start_partition = (now() - time_delta).replace(minute=0, second=0, microsecond=0)
create_partition(JobEvent._meta.db_table, start_partition)
while jobs > 0:
s_loop = time()
print('running batch {}, runtime {}'.format(i, time() - s))
created, jt_pos, ujs_pk = make_batch(min(jobs, batch_size), jt_pos)
print('took {}'.format(time() - s_loop))
i += 1
jobs -= batch_size
created_job_ids += ujs_pk
print('Created Job IDS: {}'.format(created_job_ids))
# return created
return created_job_ids
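
# Fan out one firehose() worker per core (capped at the event count) to COPY
# the events for a single job, then renumber counter/start_line/end_line so
# every event looks plausibly sequential.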
def generate_events(events, job, time_delta):
conn = psycopg2.connect(dsn)
cursor = conn.cursor()
created_time = datetime.datetime.today() - time_delta - datetime.timedelta(seconds=5)
modified_time = datetime.datetime.today() - time_delta
created_stamp = created_time.strftime("%Y-%m-%d %H:%M:%S")
modified_stamp = modified_time.strftime("%Y-%m-%d %H:%M:%S")
print(f'attaching {events} events to job {job}')
cores = multiprocessing.cpu_count()
workers = []
num_procs = min(cores, events)
    # each worker COPYs an even share of the events; math.ceil() can overshoot
    # the requested total by a few rows, which is fine for throwaway test data
    num_events = math.ceil(events / num_procs)
for i in range(num_procs):
p = multiprocessing.Process(target=firehose, args=(job, num_events, created_stamp, modified_stamp))
p.daemon = True
workers.append(p)
for w in workers:
w.start()
for w in workers:
w.join()
workers = []
print('generating unique start/end line counts')
cursor.execute('CREATE SEQUENCE IF NOT EXISTS firehose_seq;')
cursor.execute('CREATE SEQUENCE IF NOT EXISTS firehose_line_seq MINVALUE 0;')
cursor.execute('ALTER SEQUENCE firehose_seq RESTART WITH 1;')
cursor.execute('ALTER SEQUENCE firehose_line_seq RESTART WITH 0;')
cursor.execute("SELECT nextval('firehose_line_seq')")
conn.commit()
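    # single pass over the new rows: counter gets a globally increasing value
    # from firehose_seq, while start_line/end_line are derived from
    # firehose_line_seq so each event appears to span a couple lines of stdout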
cursor.execute(
"UPDATE main_jobevent SET "
"counter=nextval('firehose_seq')::integer,"
"start_line=nextval('firehose_line_seq')::integer,"
"end_line=currval('firehose_line_seq')::integer + 2 "
f"WHERE job_id={job}"
)
conn.commit()
conn.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--jobs-per-hour', type=int, help='Number of jobs to create for each hour of history.', default=1000)  # ~744K jobs over the default 31 days
    parser.add_argument('--events-per-job', type=int, help='Number of events to attach to each job.', default=1345)  # ~1B events in total at the defaults
parser.add_argument('--batch-size', type=int, help='Number of jobs to create in a single batch.', default=100)
    parser.add_argument('--days-delta', type=int, help='Number of days of job/event history to backfill.', default=31)
params = parser.parse_args()
jobs = params.jobs_per_hour
events = params.events_per_job
days_delta = params.days_delta
batch_size = params.batch_size
try:
conn = psycopg2.connect(dsn)
cursor = conn.cursor()
# Drop all the indexes before generating jobs
print('removing indexes and constraints')
# get all the indexes for main_jobevent
# disable WAL to drastically increase write speed
# we're not doing replication, and the goal of this script is to just
# insert data as quickly as possible without concern for the risk of
# data loss on crash
# see: https://www.compose.com/articles/faster-performance-with-unlogged-tables-in-postgresql/
cursor.execute('ALTER TABLE main_jobevent SET UNLOGGED')
cursor.execute("SELECT indexname, indexdef FROM pg_indexes WHERE tablename='main_jobevent' AND indexname != 'main_jobevent_pkey1';")
indexes = cursor.fetchall()
cursor.execute(
"SELECT conname, contype, pg_catalog.pg_get_constraintdef(r.oid, true) as condef FROM pg_catalog.pg_constraint r WHERE r.conrelid = 'main_jobevent'::regclass AND conname != 'main_jobevent_pkey1';"
) # noqa
constraints = cursor.fetchall()
# drop all indexes for speed
for indexname, indexdef in indexes:
if indexname == 'main_jobevent_pkey_new': # Dropped by the constraint
continue
cursor.execute(f'DROP INDEX IF EXISTS {indexname}')
print(f'DROP INDEX IF EXISTS {indexname}')
for conname, contype, condef in constraints:
cursor.execute(f'ALTER TABLE main_jobevent DROP CONSTRAINT IF EXISTS {conname}')
print(f'ALTER TABLE main_jobevent DROP CONSTRAINT IF EXISTS {conname}')
conn.commit()
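
        # for every hour in the requested window, create a batch of backdated
        # jobs and then attach the requested number of events to each of them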
for i_day in range(days_delta, 0, -1):
for j_hour in range(24):
time_delta = datetime.timedelta(days=i_day, hours=j_hour, seconds=0)
created_job_ids = generate_jobs(jobs, batch_size=batch_size, time_delta=time_delta)
if events > 0:
for k_id in created_job_ids:
generate_events(events, str(k_id), time_delta)
print(datetime.datetime.utcnow().isoformat())
conn.close()
finally:
# restore all indexes
print(datetime.datetime.utcnow().isoformat())
        print('restoring indexes and constraints (this may take a while)')
workers = []
for indexname, indexdef in indexes:
if indexname == 'main_jobevent_pkey_new': # Created by the constraint
continue
p = multiprocessing.Process(target=cleanup, args=(indexdef,))
p.daemon = True
workers.append(p)
for w in workers:
w.start()
for w in workers:
w.join()
for conname, contype, condef in constraints:
if contype == 'c':
# if there are any check constraints, don't add them back
# (historically, these are > 0 checks, which are basically
# worthless, because Ansible doesn't emit counters, line
# numbers, verbosity, etc... < 0)
continue
sql = f'ALTER TABLE main_jobevent ADD CONSTRAINT {conname} {condef}'
cleanup(sql)
print(datetime.datetime.utcnow().isoformat())