1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
|
#!@PYTHON@
"""\
This file implements the Boss of Bind (BoB, or bob) program.
Its purpose is to start up the BIND 10 system, and then manage the
processes, by starting and stopping processes, plus restarting
processes that exit.
To start the system, it first runs the c-channel program (msgq), then
connects to that. It then runs the configuration manager, and reads
its own configuration. Then it proceeds to starting other modules.
The Python subprocess module is used for starting processes, but
because this is not efficient for managing groups of processes,
SIGCHLD signals are caught and processed using the signal module.
Most of the logic is contained in the BoB class. However, since Python
requires that signal processing happen in the main thread, we do
signal handling outside of that class, in the code running for
__main__.
"""
import sys; sys.path.append ('@@PYTHONPATH@@')
import os
# Work out where the boss's module specification (bob.spec) lives.
# Running from a source tree (B10_FROM_SOURCE set): use the copy in the tree.
# Installed: use the data directory, expanding the ${datarootdir} and
# ${prefix} placeholders that the configured @datadir@ value may contain
# (datarootdir first, since it may itself refer to ${prefix}).
if "B10_FROM_SOURCE" not in os.environ:
    PREFIX = "@prefix@"
    DATAROOTDIR = "@datarootdir@"
    SPECFILE_LOCATION = "@datadir@/@PACKAGE@/bob.spec"
    SPECFILE_LOCATION = SPECFILE_LOCATION.replace("${datarootdir}", DATAROOTDIR)
    SPECFILE_LOCATION = SPECFILE_LOCATION.replace("${prefix}", PREFIX)
else:
    SPECFILE_LOCATION = os.environ["B10_FROM_SOURCE"] + "/src/bin/bind10/bob.spec"
# TODO: start up statistics thingy
import subprocess
import signal
import re
import errno
import time
import select
import random
from optparse import OptionParser, OptionValueError
import isc.cc
# Version string shown to the user: it is passed to OptionParser (for
# --version) and printed in the verbose startup banner.
__version__ = "v20100310"
# Nothing at all to do with the 1990-12-10 article here:
# http://www.subgenius.com/subg-digest/v2/0056.html
class RestartSchedule:
    """
    Keeps state when restarting something (in this case, a process).

    When a process dies unexpectedly, we need to restart it. However, if
    it fails to restart for some reason, then we should not simply keep
    restarting it at high speed.

    A more sophisticated algorithm can be developed, but for now we choose
    a simple set of rules (restart_frequency defaults to 10 seconds):

      * If a process has been running for >= restart_frequency seconds,
        we restart it right away.
      * If a process was running for < restart_frequency seconds, we wait
        until restart_frequency seconds after it was started.

    To avoid programs getting into lockstep, we use a normal distribution
    to avoid being restarted at exactly restart_frequency seconds."""

    def __init__(self, restart_frequency=10.0):
        # Nominal delay (seconds) between a start and the earliest restart.
        self.restart_frequency = restart_frequency
        self.run_start_time = None
        self.run_stop_time = None
        # Earliest time a restart is allowed; None until the first start.
        self.restart_time = None

    def set_run_start_time(self, when=None):
        """Record a start at *when* (default: now) and schedule the earliest
        allowed restart.

        The restart time is *when* plus a normally distributed delay with
        mean restart_frequency and standard deviation 5% of it.
        """
        if when is None:
            when = time.time()
        self.run_start_time = when
        sigma = self.restart_frequency * 0.05
        self.restart_time = when + random.normalvariate(self.restart_frequency,
                                                        sigma)

    def set_run_stop_time(self, when=None):
        """Record a stop at *when* (default: now).

        We don't actually do anything with stop time now, but it
        might be useful for future algorithms."""
        if when is None:
            when = time.time()
        self.run_stop_time = when

    def get_restart_time(self, when=None):
        """Return the earliest time, no earlier than *when* (default: now),
        at which a restart is allowed.

        If no start was ever recorded there is nothing to wait for, so
        *when* itself is returned (previously this raised TypeError from
        comparing a float with None).
        """
        if when is None:
            when = time.time()
        if self.restart_time is None:
            return when
        return max(when, self.restart_time)
class ProcessInfo:
    """Information about a process started by the boss.

    Holds what is needed to (re)start a child: its name, argv, extra
    environment, whether its output is discarded, and a RestartSchedule.
    The child is spawned immediately on construction.
    """

    # Shared /dev/null handle used as stdout/stderr for children created
    # with dev_null_stdout=True.  It is opened when the class is defined and
    # is not closed anywhere in this file.
    dev_null = open("/dev/null", "w")

    def _spawn(self):
        """Start the child with subprocess.Popen and record its PID.

        The child's environment is a copy of self.env plus:
          * PATH from our environment, with @@LIBEXECDIR@@ prepended unless
            B10_FROM_SOURCE is set;
          * B10_FROM_SOURCE, PYTHON_EXEC and PYTHONPATH, copied from our
            environment when present.
        Any exception raised by Popen propagates to the caller.
        """
        if self.dev_null_stdout:
            spawn_stdout = self.dev_null
        else:
            spawn_stdout = None
        # Build the environment in a fresh dict.  Previously self.env itself
        # was modified, which changed the caller's dict (or the shared
        # default) on every spawn.
        spawn_env = dict(self.env)
        spawn_env['PATH'] = os.environ['PATH']
        if 'B10_FROM_SOURCE' in os.environ:
            spawn_env['B10_FROM_SOURCE'] = os.environ['B10_FROM_SOURCE']
        else:
            spawn_env['PATH'] = "@@LIBEXECDIR@@:" + spawn_env['PATH']
        if 'PYTHON_EXEC' in os.environ:
            spawn_env['PYTHON_EXEC'] = os.environ['PYTHON_EXEC']
        if 'PYTHONPATH' in os.environ:
            spawn_env['PYTHONPATH'] = os.environ['PYTHONPATH']
        self.process = subprocess.Popen(self.args,
                                        stdin=subprocess.PIPE,
                                        stdout=spawn_stdout,
                                        stderr=spawn_stdout,
                                        close_fds=True,
                                        env=spawn_env,)
        self.pid = self.process.pid
        self.restart_schedule.set_run_start_time()

    def __init__(self, name, args, env=None, dev_null_stdout=False):
        """Record how to run the process and start it right away.

        name: human-readable process name.
        args: argv list handed to subprocess.Popen.
        env: extra environment variables for the child (default: none).
        dev_null_stdout: if True, send the child's stdout and stderr to
            /dev/null.
        """
        self.name = name
        self.args = args
        # None rather than a {} default: a mutable default would be shared
        # by every instance.
        self.env = {} if env is None else env
        self.dev_null_stdout = dev_null_stdout
        self.restart_schedule = RestartSchedule()
        self._spawn()

    def respawn(self):
        """Start the process again (new Popen object and PID)."""
        self._spawn()
class BoB:
    """Boss of BIND class."""

    def __init__(self, c_channel_port=9912, auth_port=5300, verbose=False):
        """Initialize the Boss of BIND.

        Only one boss is meant to run at a time: startup() refuses to
        proceed if a msgq is already listening on c_channel_port.

        The c_channel_port specifies the TCP/IP port that the msgq
        process listens on. If verbose is True, then the boss reports
        what it is doing.
        """
        self.verbose = verbose
        self.c_channel_port = c_channel_port
        self.auth_port = auth_port
        self.cc_session = None
        self.ccs = None
        # Running children, keyed by PID.
        self.processes = {}
        # Children that exited and are waiting to be restarted, keyed by PID.
        self.dead_processes = {}
        # Main-loop flag: set by a successful startup(), cleared to stop.
        self.runnable = False
        os.environ['ISC_MSGQ_PORT'] = str(self.c_channel_port)

    def config_handler(self, new_config):
        """Handle a configuration update from the configuration manager.

        The new configuration is not acted upon yet; it is printed in
        verbose mode and acknowledged with a success answer.
        """
        # TODO: actually apply the new configuration.
        if self.verbose:
            print("[XX] handling new config:")
            print(new_config)
        return isc.config.ccsession.create_answer(0)

    def command_handler(self, command, args):
        """Handle a command sent to the boss over the config session.

        Supported commands are "shutdown", "print_message" and
        "print_settings".  A non-string command or an unknown command gets
        an error answer.  Returns an answer built by
        isc.config.ccsession.create_answer().
        """
        if self.verbose:
            print("[XX] Boss got command:")
            print(command)
        if not isinstance(command, str):
            return isc.config.ccsession.create_answer(1, "bad command")
        if command == "shutdown":
            print("[XX] got shutdown command")
            # The main loop stops once runnable is False.
            self.runnable = False
            return isc.config.ccsession.create_answer(0)
        if command == "print_message":
            if args:
                print(args)
            return isc.config.ccsession.create_answer(0, args)
        if command == "print_settings":
            print("Full Config:")
            full_config = self.ccs.get_full_config()
            for item in full_config:
                print(item + ": " + str(full_config[item]))
            return isc.config.ccsession.create_answer(0)
        return isc.config.ccsession.create_answer(1, "Unknown command")

    def _msgq_env(self):
        """Return a fresh environment dict telling a child which port msgq
        listens on."""
        return {"ISC_MSGQ_PORT": str(self.c_channel_port)}

    def _kill_started(self, started):
        """Kill every process in *started*, ignoring ones that are gone."""
        for proc_info in started:
            try:
                proc_info.process.kill()
            except OSError:
                pass

    def _start_process(self, name, args, env, started, dev_null_stdout=False):
        """Spawn one module and register it.

        On success the new ProcessInfo is stored in self.processes and
        appended to *started*, and None is returned.  On failure every
        process in *started* is killed, and an error string of the form
        startup() returns is given back.
        """
        try:
            proc_info = ProcessInfo(name, args, env, dev_null_stdout)
        except Exception as e:
            self._kill_started(started)
            return "Unable to start " + name + "; " + str(e)
        self.processes[proc_info.pid] = proc_info
        started.append(proc_info)
        if self.verbose:
            sys.stdout.write("Started %s (PID %d)\n" % (name, proc_info.pid))
        return None

    def startup(self):
        """Start the BoB instance.

        Starts msgq, connects to it, starts b10-cfgmgr, opens the module
        config session, then starts b10-xfrout, b10-auth, b10-xfrin and
        b10-cmdctl, in that order.

        Returns None if successful, otherwise a string describing the
        problem.  On failure, the processes this call already started are
        killed.  An exception while opening the config session is re-raised
        after that cleanup.
        """
        # Processes launched by this call, in launch order.
        started = []

        # try to connect to the c-channel daemon,
        # to see if it is already running
        if self.verbose:
            sys.stdout.write("Checking for already running msgq\n")
        try:
            self.cc_session = isc.cc.Session(self.c_channel_port)
            return "msgq already running, cannot start"
        except isc.cc.session.SessionError:
            pass

        # start the c-channel daemon
        if self.verbose:
            sys.stdout.write("Starting msgq using port %d\n" %
                             self.c_channel_port)
        error = self._start_process("msgq", ["msgq"], self._msgq_env(),
                                    started, True)
        if error:
            return error

        # now connect to the c-channel; give up after 5 seconds
        cc_connect_start = time.time()
        while self.cc_session is None:
            if (time.time() - cc_connect_start) > 5:
                self._kill_started(started)
                return "Unable to connect to c-channel after 5 seconds"
            # try to connect, and if we can't wait a short while
            try:
                self.cc_session = isc.cc.Session(self.c_channel_port)
            except isc.cc.session.SessionError:
                time.sleep(0.1)

        # start the configuration manager
        if self.verbose:
            sys.stdout.write("Starting b10-cfgmgr\n")
        error = self._start_process("b10-cfgmgr", ["b10-cfgmgr"],
                                    self._msgq_env(), started)
        if error:
            return error

        # TODO: once this interface is done, replace self.cc_session
        # by this one
        # sleep until b10-cfgmgr is fully up and running, this is a good place
        # to have a (short) timeout on synchronized groupsend/receive
        # TODO: replace the sleep by a listen for ConfigManager started
        # message
        time.sleep(1)
        if self.verbose:
            print("[XX] starting ccsession")
        try:
            self.ccs = isc.config.ModuleCCSession(SPECFILE_LOCATION,
                                                  self.config_handler,
                                                  self.command_handler)
            self.ccs.start()
        except Exception:
            # Don't leave msgq and b10-cfgmgr running behind us.
            self._kill_started(started)
            raise
        if self.verbose:
            print("[XX] ccsession started")

        # start the xfrout before auth-server, to make sure every xfr-query can
        # be processed properly.
        if self.verbose:
            sys.stdout.write("Starting b10-xfrout\n")
        error = self._start_process("b10-xfrout", ["b10-xfrout"],
                                    self._msgq_env(), started)
        if error:
            return error

        # start b10-auth
        # XXX: this must be read from the configuration manager in the future
        authargs = ['b10-auth', '-p', str(self.auth_port)]
        if self.verbose:
            sys.stdout.write("Starting b10-auth using port %d\n" %
                             self.auth_port)
            authargs += ['-v']
        error = self._start_process("b10-auth", authargs,
                                    self._msgq_env(), started)
        if error:
            return error

        # start the b10-xfrin
        if self.verbose:
            sys.stdout.write("Starting b10-xfrin\n")
        error = self._start_process("b10-xfrin", ['b10-xfrin'],
                                    self._msgq_env(), started)
        if error:
            return error

        # start the b10-cmdctl
        # XXX: we hardcode port 8080
        if self.verbose:
            sys.stdout.write("Starting b10-cmdctl on port 8080\n")
        error = self._start_process("b10-cmdctl", ['b10-cmdctl'],
                                    self._msgq_env(), started)
        if error:
            return error

        self.runnable = True
        return None

    def stop_all_processes(self):
        """Send a "shutdown" command over the c-channel to the Cmd-Ctrld,
        ConfigManager and Auth groups."""
        # NOTE(review): xfrout/xfrin are not messaged here; they are only
        # stopped by the signals sent from shutdown().
        cmd = {"command": ['shutdown']}
        self.cc_session.group_sendmsg(cmd, 'Boss', 'Cmd-Ctrld')
        self.cc_session.group_sendmsg(cmd, "Boss", "ConfigManager")
        self.cc_session.group_sendmsg(cmd, "Boss", "Auth")

    def stop_process(self, process):
        """Stop the given process, friendly-like."""
        # XXX nothing yet
        pass

    def shutdown(self):
        """Stop the BoB instance.

        Escalates in three steps:
          1. ask the modules to shut down over the c-channel;
          2. send SIGTERM to every child still running;
          3. send SIGKILL to every child still running after step 2.
        """
        if self.verbose:
            sys.stdout.write("Stopping the server.\n")
        # first try using the BIND 10 request to stop.  This is best effort:
        # the c-channel may already be gone (or was never connected).
        try:
            self.stop_all_processes()
        except Exception:
            pass
        # XXX: some delay probably useful... how much is uncertain
        time.sleep(0.1)
        self.reap_children()
        # next try sending a SIGTERM
        for proc_info in list(self.processes.values()):
            if self.verbose:
                sys.stdout.write("Sending SIGTERM to %s (PID %d).\n" %
                                 (proc_info.name, proc_info.pid))
            try:
                proc_info.process.terminate()
            except OSError:
                # ignore these (usually ESRCH because the child
                # finally exited)
                pass
        # XXX: some delay probably useful... how much is uncertain
        time.sleep(0.1)
        self.reap_children()
        # finally, send a SIGKILL (unmaskable termination) to whatever
        # reap_children() still considers alive.  (Previously this walked
        # an always-empty list, so no SIGKILL was ever sent.)
        for proc_info in list(self.processes.values()):
            if self.verbose:
                sys.stdout.write("Sending SIGKILL to %s (PID %d).\n" %
                                 (proc_info.name, proc_info.pid))
            try:
                proc_info.process.kill()
            except OSError:
                # ignore these (usually ESRCH because the child
                # finally exited)
                pass
        if self.verbose:
            sys.stdout.write("All processes ended, server done.\n")

    def reap_children(self):
        """Check to see if any of our child processes have exited,
        and note this for later handling.

        Each exited child found in self.processes is moved to
        self.dead_processes.  If it was msgq, runnable is cleared so the
        main loop stops.
        """
        while True:
            try:
                (pid, exit_status) = os.waitpid(-1, os.WNOHANG)
            except OSError as o:
                # ECHILD: we have no children at all.
                if o.errno == errno.ECHILD:
                    break
                # XXX: should be impossible to get any other error here
                raise
            # 0: children exist, but none has exited.
            if pid == 0:
                break
            if pid in self.processes:
                proc_info = self.processes.pop(pid)
                proc_info.restart_schedule.set_run_stop_time()
                self.dead_processes[proc_info.pid] = proc_info
                if self.verbose:
                    sys.stdout.write("Process %s (PID %d) died.\n" %
                                     (proc_info.name, proc_info.pid))
                if proc_info.name == "msgq":
                    if self.verbose and self.runnable:
                        sys.stdout.write(
                            "The msgq process died, shutting down.\n")
                    self.runnable = False
            else:
                sys.stdout.write("Unknown child pid %d exited.\n" % pid)

    # 'old' command style, commented out for now
    # move the handling below to command_handler please
    #def recv_and_process_cc_msg(self):
    #    """Receive and process the next message on the c-channel,
    #    if any."""
    #    self.ccs.checkCommand()
    #    msg, envelope = self.cc_session.group_recvmsg(False)
    #    print(msg)
    #    if msg is None:
    #        return
    #    if not ((type(msg) is dict) and (type(envelope) is dict)):
    #        if self.verbose:
    #            sys.stdout.write("Non-dictionary message\n")
    #        return
    #    if not "command" in msg:
    #        if self.verbose:
    #            if "msg" in envelope:
    #                del envelope['msg']
    #            sys.stdout.write("Unknown message received\n")
    #            sys.stdout.write(pprint.pformat(envelope) + "\n")
    #            sys.stdout.write(pprint.pformat(msg) + "\n")
    #        return
    #    cmd = msg['command']
    #    if not (type(cmd) is list):
    #        if self.verbose:
    #            sys.stdout.write("Non-list command\n")
    #        return
    #
    #    # done checking and extracting... time to execute the command
    #    if cmd[0] == "shutdown":
    #        if self.verbose:
    #            sys.stdout.write("shutdown command received\n")
    #        self.runnable = False
    #        # XXX: reply here?
    #    elif cmd[0] == "getProcessList":
    #        if self.verbose:
    #            sys.stdout.write("getProcessList command received\n")
    #        live_processes = [ ]
    #        for proc_info in processes:
    #            live_processes.append({ "name": proc_info.name,
    #                                    "args": proc_info.args,
    #                                    "pid": proc_info.pid, })
    #        dead_processes = [ ]
    #        for proc_info in dead_processes:
    #            dead_processes.append({ "name": proc_info.name,
    #                                    "args": proc_info.args, })
    #        cc.group_reply(envelope, { "response": cmd,
    #                                   "sent": msg["sent"],
    #                                   "live_processes": live_processes,
    #                                   "dead_processes": dead_processes, })
    #    else:
    #        if self.verbose:
    #            sys.stdout.write("Unknown command %s\n" % str(cmd))

    def restart_processes(self):
        """Restart any dead processes whose restart time has come.

        Returns the earliest time at which another restart attempt is due,
        or None if nothing is waiting (or we are shutting down).
        """
        next_restart = None
        # if we're shutting down, then don't restart
        if not self.runnable:
            return next_restart
        # otherwise look through each dead process and try to restart
        still_dead = {}
        now = time.time()
        for proc_info in self.dead_processes.values():
            restart_time = proc_info.restart_schedule.get_restart_time(now)
            if restart_time > now:
                # Too soon after its last start; remember when to retry.
                if (next_restart is None) or (next_restart > restart_time):
                    next_restart = restart_time
                still_dead[proc_info.pid] = proc_info
                continue
            if self.verbose:
                sys.stdout.write("Resurrecting dead %s process...\n" %
                                 proc_info.name)
            try:
                proc_info.respawn()
            except Exception as e:
                if self.verbose:
                    sys.stdout.write("Unable to resurrect %s: %s\n" %
                                     (proc_info.name, e))
                # Push the next attempt restart_frequency seconds out.
                # Without a new restart time, next_restart might stay None
                # and the main loop could wait forever before retrying.
                proc_info.restart_schedule.set_run_start_time(now)
                retry_time = proc_info.restart_schedule.get_restart_time(now)
                if (next_restart is None) or (next_restart > retry_time):
                    next_restart = retry_time
                # remember any processes that refuse to be resurrected
                still_dead[proc_info.pid] = proc_info
            else:
                self.processes[proc_info.pid] = proc_info
                if self.verbose:
                    sys.stdout.write("Resurrected %s (PID %d)\n" %
                                     (proc_info.name, proc_info.pid))
        self.dead_processes = still_dead
        # return the time when the next process is ready to be restarted
        return next_restart
# Module-level state shared between main() and the signal handlers;
# main() assigns both.
options = boss_of_bind = None
def reaper(signal_number, stack_frame):
    """SIGCHLD handler: a child process has died.

    Deliberately does nothing.  main() registers a wakeup fd with
    signal.set_wakeup_fd(), so the signal already wakes the select() in
    the main loop, which then calls reap_children().
    """
    return None
def get_signame(signal_number):
    """Map a signal number to its symbolic name (e.g. "SIGTERM").

    Only SIGxxx names whose fourth character is alphanumeric are
    considered, which skips SIG_DFL, SIG_IGN and similar.  Names are tried
    in dir() order and the first match wins.  When nothing matches,
    "Unknown signal N" is returned.
    """
    candidates = [name for name in dir(signal)
                  if name.startswith("SIG") and name[3].isalnum()]
    matches = (name for name in candidates
               if getattr(signal, name) == signal_number)
    found = next(matches, None)
    if found is None:
        return "Unknown signal %d" % signal_number
    return found
# XXX: perhaps register atexit() function and invoke that instead
def fatal_signal(signal_number, stack_frame):
    """Handle SIGINT/SIGTERM: we need to exit.

    Resets the SIGCHLD handler to the default and clears
    boss_of_bind.runnable, so the main loop ends and main() performs the
    shutdown.  If the boss object does not exist yet, nothing has been
    started, so the process simply exits with status 1.  Previously this
    case raised AttributeError inside the handler.
    """
    if options is not None and options.verbose:
        sys.stdout.write("Received %s.\n" % get_signame(signal_number))
    signal.signal(signal.SIGCHLD, signal.SIG_DFL)
    if boss_of_bind is None:
        sys.exit(1)
    boss_of_bind.runnable = False
def check_port(option, opt_str, value, parser):
    """Function to ensure that the port we are passed is actually
    a valid port number. Used by OptionParser() on startup.

    *value* must be a decimal number from 0 to 65535 with no leading
    zeros.  It is stored unchanged (as a string) in
    parser.values.msgq_port for -m/--msgq-port, or in
    parser.values.auth_port for -p/--port.

    Raises OptionValueError for an invalid port or an unexpected option.
    """
    # Raw string: "\d" in a plain literal is an invalid escape sequence
    # (DeprecationWarning, and SyntaxWarning since Python 3.12).
    if not re.match(r'^(6553[0-5]|655[0-2]\d|65[0-4]\d\d|6[0-4]\d{3}|[1-5]\d{4}|[1-9]\d{0,3}|0)$', value):
        raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
    if opt_str in ('-m', '--msgq-port'):
        parser.values.msgq_port = value
    elif opt_str in ('-p', '--port'):
        parser.values.auth_port = value
    else:
        raise OptionValueError("Unknown option " + opt_str)
def main():
    """Entry point of the BIND 10 boss.

    Parses the command line and starts msgq and the other BIND 10
    modules.  It then loops until boss_of_bind.runnable is cleared:
    reaping and restarting children, and dispatching c-channel commands.
    Finally it shuts everything down.  Exits with status 1 if startup
    fails.
    """
    global options
    global boss_of_bind
    import fcntl

    # Parse any command-line options.
    parser = OptionParser(version=__version__)
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                      help="display more about what is going on")
    parser.add_option("-p", "--port", dest="auth_port", type="string",
                      action="callback", callback=check_port, default="5300",
                      help="port the b10-auth daemon will use (default 5300)")
    parser.add_option("-m", "--msgq-port", dest="msgq_port", type="string",
                      action="callback", callback=check_port, default="9912",
                      help="port the msgq daemon will use (default 9912)")
    (options, args) = parser.parse_args()

    # Announce startup.
    if options.verbose:
        sys.stdout.write("BIND 10 %s\n" % __version__)

    # TODO: set process name, perhaps by:
    #       http://code.google.com/p/procname/
    #       http://github.com/lericson/procname/

    # Create the boss before the signal handlers are installed below.
    # fatal_signal() dereferences boss_of_bind, so it must already exist
    # once SIGINT/SIGTERM can reach that handler.
    boss_of_bind = BoB(int(options.msgq_port), int(options.auth_port),
                       options.verbose)

    # Create wakeup pipe for signal handlers.  The write end is made
    # non-blocking: signal.set_wakeup_fd() requires that on Python 3.5+,
    # and a full pipe must never block inside the signal handler.
    wakeup_pipe = os.pipe()
    write_flags = fcntl.fcntl(wakeup_pipe[1], fcntl.F_GETFL)
    fcntl.fcntl(wakeup_pipe[1], fcntl.F_SETFL, write_flags | os.O_NONBLOCK)
    signal.set_wakeup_fd(wakeup_pipe[1])

    # Set signal handlers for catching child termination, as well
    # as our own demise.
    signal.signal(signal.SIGCHLD, reaper)
    signal.siginterrupt(signal.SIGCHLD, False)
    signal.signal(signal.SIGINT, fatal_signal)
    signal.signal(signal.SIGTERM, fatal_signal)

    # Go bob!
    startup_result = boss_of_bind.startup()
    if startup_result:
        sys.stderr.write("Error on startup: %s\n" % startup_result)
        sys.exit(1)

    # In our main loop, we check for dead processes or messages
    # on the c-channel.
    wakeup_fd = wakeup_pipe[0]
    ccs_fd = boss_of_bind.ccs.get_socket().fileno()
    while boss_of_bind.runnable:
        # clean up any processes that exited
        boss_of_bind.reap_children()
        next_restart = boss_of_bind.restart_processes()
        if next_restart is None:
            wait_time = None
        else:
            wait_time = max(next_restart - time.time(), 0)

        # select() can raise EINTR when a signal arrives,
        # even if they are resumable, so we have to catch
        # the exception
        try:
            (rlist, wlist, xlist) = select.select([wakeup_fd, ccs_fd], [], [],
                                                  wait_time)
        except select.error as err:
            if err.args[0] == errno.EINTR:
                (rlist, wlist, xlist) = ([], [], [])
            else:
                sys.stderr.write("Error with select(); %s\n" % err)
                break

        for fd in rlist + xlist:
            if fd == ccs_fd:
                boss_of_bind.ccs.check_command()
            elif fd == wakeup_fd:
                # drain the bytes written on signal delivery
                os.read(wakeup_fd, 32)

    # shutdown
    signal.signal(signal.SIGCHLD, signal.SIG_DFL)
    boss_of_bind.shutdown()


if __name__ == "__main__":
    main()
|