summaryrefslogtreecommitdiffstats
path: root/src/bin
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin')
-rw-r--r--src/bin/Makefile.am2
-rw-r--r--src/bin/auth/Makefile.am1
-rw-r--r--src/bin/auth/main.cc107
-rw-r--r--src/bin/bind10/bind10.py.in18
-rw-r--r--src/bin/xfrout/Makefile.am18
-rw-r--r--src/bin/xfrout/run_b10-xfrout.sh.in12
-rw-r--r--src/bin/xfrout/xfrout.pre.in27
-rw-r--r--src/bin/xfrout/xfrout.py.in425
8 files changed, 578 insertions, 32 deletions
diff --git a/src/bin/Makefile.am b/src/bin/Makefile.am
index 7b331018f2..e91b658244 100644
--- a/src/bin/Makefile.am
+++ b/src/bin/Makefile.am
@@ -1 +1 @@
-SUBDIRS = bind10 bindctl cfgmgr loadzone msgq host cmdctl auth xfrin usermgr
+SUBDIRS = bind10 bindctl cfgmgr loadzone msgq host cmdctl auth xfrin xfrout usermgr
diff --git a/src/bin/auth/Makefile.am b/src/bin/auth/Makefile.am
index ae22bd34ab..e90584f106 100644
--- a/src/bin/auth/Makefile.am
+++ b/src/bin/auth/Makefile.am
@@ -33,6 +33,7 @@ b10_auth_LDADD += $(top_builddir)/src/lib/config/.libs/libcfgclient.a
b10_auth_LDADD += $(top_builddir)/src/lib/cc/libcc.a
b10_auth_LDADD += $(top_builddir)/src/lib/exceptions/.libs/libexceptions.a
b10_auth_LDADD += $(SQLITE_LIBS)
+b10_auth_LDADD += $(top_builddir)/src/lib/xfr/.libs/libxfr.a
if HAVE_BOOST_SYSTEM
b10_auth_LDFLAGS = $(AM_LDFLAGS) $(BOOST_LDFLAGS)
b10_auth_LDADD += $(BOOST_SYSTEM_LIB)
diff --git a/src/bin/auth/main.cc b/src/bin/auth/main.cc
index 1e1307ae99..96884dddd1 100644
--- a/src/bin/auth/main.cc
+++ b/src/bin/auth/main.cc
@@ -42,12 +42,14 @@
#include <cc/session.h>
#include <cc/data.h>
#include <config/ccsession.h>
+#include <xfr/xfrout_client.h>
#include "spec_config.h"
#include "common.h"
#include "auth_srv.h"
using namespace std;
+using namespace isc::xfr;
#ifdef HAVE_BOOST_SYSTEM
using namespace boost::asio;
@@ -103,7 +105,38 @@ my_command_handler(const string& command, const ElementPtr args) {
return answer;
}
-#ifdef HAVE_BOOST_SYSTEM
+//TODO. The sample way for checking axfr query, the code should be merged to auth server class
+static bool
+check_axfr_query(char *msg_data, uint16_t msg_len)
+{
+ if (msg_len < 15)
+ return false;
+
+ uint16_t query_type = *(uint16_t *)(msg_data + (msg_len - 4));
+ if ( query_type == 0xFC00)
+ return true;
+
+ return false;
+}
+
+//TODO. Send the xfr query to xfrout module, the code should be merged to auth server class
+static void
+dispatch_axfr_query(int tcp_sock, char axfr_query[], uint16_t query_len)
+{
+ std::string path = "/tmp/auth_xfrout_conn";
+ XfroutClient xfr_client(path);
+ try {
+ xfr_client.connect();
+ xfr_client.sendXfroutRequestInfo(tcp_sock, (uint8_t *)axfr_query, query_len);
+ xfr_client.disconnect();
+ }
+ catch (const std::exception & err) {
+ //if (verbose_mode)
+ cerr << "error handle xfr query:" << err.what() << endl;
+ }
+}
+
+#ifdef HAVE_BOOSTLIB
//
// Helper classes for asynchronous I/O using boost::asio
//
@@ -148,17 +181,24 @@ public:
{
if (!error) {
InputBuffer dnsbuffer(data_, bytes_transferred);
- if (auth_server->processMessage(dnsbuffer, dns_message_,
- response_renderer_, false)) {
- responselen_buffer_.writeUint16(response_buffer_.getLength());
- async_write(socket_,
- boost::asio::buffer(
- responselen_buffer_.getData(),
- responselen_buffer_.getLength()),
- boost::bind(&TCPClient::responseWrite, this,
- placeholders::error));
- } else {
- delete this;
+ if (check_axfr_query(data_, bytes_transferred)) {
+ dispatch_axfr_query(socket_.native(), data_, bytes_transferred);
+ // start to get new query ?
+ start();
+ }
+ else {
+ if (auth_server->processMessage(dnsbuffer, dns_message_,
+ response_renderer_, false)) {
+ responselen_buffer_.writeUint16(response_buffer_.getLength());
+ async_write(socket_,
+ boost::asio::buffer(
+ responselen_buffer_.getData(),
+ responselen_buffer_.getLength()),
+ boost::bind(&TCPClient::responseWrite, this,
+ placeholders::error));
+ } else {
+ delete this;
+ }
}
} else {
delete this;
@@ -537,25 +577,30 @@ processMessageTCP(const int fd, Message& dns_message,
cc += cc0;
}
- InputBuffer buffer(&message_buffer[0], size);
- dns_message.clear(Message::PARSE);
- response_renderer.clear();
- if (auth_server->processMessage(buffer, dns_message, response_renderer,
- false)) {
- size = response_renderer.getLength();
- size_n = htons(size);
- if (send(ts, &size_n, 2, 0) == 2) {
- cc = send(ts, response_renderer.getData(),
- response_renderer.getLength(), 0);
- if (cc == -1) {
- if (verbose_mode) {
- cerr << "[AuthSrv] error in sending TCP response message" <<
- endl;
- }
- } else {
- if (verbose_mode) {
- cerr << "[XX] sent TCP response: " << cc << " bytes"
- << endl;
+ if (check_axfr_query(&message_buffer[0], size)) {
+ dispatch_axfr_query(ts, &message_buffer[0], size);
+ }
+ else {
+ InputBuffer buffer(&message_buffer[0], size);
+ dns_message.clear(Message::PARSE);
+ response_renderer.clear();
+ if (auth_server->processMessage(buffer, dns_message, response_renderer,
+ false)) {
+ size = response_renderer.getLength();
+ size_n = htons(size);
+ if (send(ts, &size_n, 2, 0) == 2) {
+ cc = send(ts, response_renderer.getData(),
+ response_renderer.getLength(), 0);
+ if (cc == -1) {
+ if (verbose_mode) {
+ cerr << "[AuthSrv] error in sending TCP response message" <<
+ endl;
+ }
+ } else {
+ if (verbose_mode) {
+ cerr << "[XX] sent TCP response: " << cc << " bytes"
+ << endl;
+ }
}
}
} else {
diff --git a/src/bin/bind10/bind10.py.in b/src/bin/bind10/bind10.py.in
index 01223dcaa9..5ef69a5077 100644
--- a/src/bin/bind10/bind10.py.in
+++ b/src/bin/bind10/bind10.py.in
@@ -265,6 +265,21 @@ class BoB:
if self.verbose:
print("[XX] ccsession started")
+ # start the xfrout before auth-server, to make sure every xfr-query can be
+ # processed properly.
+ if self.verbose:
+ sys.stdout.write("Starting b10-xfrout\n")
+ try:
+ xfrout = ProcessInfo("b10-xfrout", ["b10-xfrout"],
+ { 'ISC_MSGQ_PORT': str(self.c_channel_port)})
+ except Exception as e:
+ c_channel.process.kill()
+ bind_cfgd.process.kill()
+ return "Unable to start b10-xfrout; " + str(e)
+ self.processes[xfrout.pid] = xfrout
+ if self.verbose:
+ sys.stdout.write("Started b10-xfrout (PID %d)\n" % xfrout.pid)
+
# start b10-auth
# XXX: this must be read from the configuration manager in the future
authargs = ['b10-auth', '-p', str(self.auth_port)]
@@ -278,6 +293,7 @@ class BoB:
except Exception as e:
c_channel.process.kill()
bind_cfgd.process.kill()
+ xfrout.process.kill()
return "Unable to start b10-auth; " + str(e)
self.processes[auth.pid] = auth
if self.verbose:
@@ -292,6 +308,7 @@ class BoB:
except Exception as e:
c_channel.process.kill()
bind_cfgd.process.kill()
+ xfrout.process.kill()
auth.process.kill()
return "Unable to start b10-xfrin; " + str(e)
self.processes[xfrind.pid] = xfrind
@@ -308,6 +325,7 @@ class BoB:
except Exception as e:
c_channel.process.kill()
bind_cfgd.process.kill()
+ xfrout.process.kill()
auth.process.kill()
xfrind.process.kill()
return "Unable to start b10-cmdctl; " + str(e)
diff --git a/src/bin/xfrout/Makefile.am b/src/bin/xfrout/Makefile.am
new file mode 100644
index 0000000000..1284ba6d5e
--- /dev/null
+++ b/src/bin/xfrout/Makefile.am
@@ -0,0 +1,18 @@
+pkglibexecdir = $(libexecdir)/@PACKAGE@
+
+pkglibexec_SCRIPTS = b10-xfrout
+
+b10_xfroutdir = $(DESTDIR)$(pkgdatadir)
+b10_xfrout_DATA = xfrout.spec
+
+CLEANFILES= xfrout.py xfrout.pre xfrout.spec
+
+xfrout.spec: xfrout.pre
+ $(SED) -e "s|@@LOCALSTATEDIR@@|$(localstatedir)|" xfrout.pre >$@
+
+# TODO: does this need $$(DESTDIR) also?
+# this is done here since configure.ac AC_OUTPUT doesn't expand exec_prefix
+b10-xfrout: xfrout.py
+ $(SED) -e "s|@@PYTHONPATH@@|@pyexecdir@|" \
+ -e "s|@@LOCALSTATEDIR@@|$(localstatedir)|" xfrout.py >$@
+ chmod a+x $@
diff --git a/src/bin/xfrout/run_b10-xfrout.sh.in b/src/bin/xfrout/run_b10-xfrout.sh.in
new file mode 100644
index 0000000000..9be34921e7
--- /dev/null
+++ b/src/bin/xfrout/run_b10-xfrout.sh.in
@@ -0,0 +1,12 @@
+#! /bin/sh
+
+PYTHON_EXEC=${PYTHON_EXEC:-@PYTHON@}
+export PYTHON_EXEC
+
+MYPATH_PATH=@abs_top_builddir@/src/bin/xfrout
+PYTHONPATH=@abs_top_srcdir@/src/lib/python
+export PYTHONPATH
+
+cd ${MYPATH_PATH}
+${PYTHON_EXEC} b10-xfrout
+
diff --git a/src/bin/xfrout/xfrout.pre.in b/src/bin/xfrout/xfrout.pre.in
new file mode 100644
index 0000000000..495d3fd2a0
--- /dev/null
+++ b/src/bin/xfrout/xfrout.pre.in
@@ -0,0 +1,27 @@
+{
+ "module_spec": {
+ "module_name": "Xfrout",
+ "config_data": [
+ {
+ "item_name": "transfers_out",
+ "item_type": "integer",
+ "item_optional": False,
+ "item_default": 10
+ },
+ {
+ "item_name": "db_file",
+ "item_type": "string",
+ "item_optional": False,
+ "item_default": '@@LOCALSTATEDIR@@/@PACKAGE@/zone.sqlite3'
+ }
+ ],
+ "commands": [
+ {
+ "command_name": "shutdown",
+ "command_description": "Shut down Xfrout",
+ "command_args": []
+ }
+ ]
+ }
+}
+
diff --git a/src/bin/xfrout/xfrout.py.in b/src/bin/xfrout/xfrout.py.in
new file mode 100644
index 0000000000..d909f43e16
--- /dev/null
+++ b/src/bin/xfrout/xfrout.py.in
@@ -0,0 +1,425 @@
+#!@PYTHON@
+
+# Copyright (C) 2010 Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+
+import sys; sys.path.append ('@@PYTHONPATH@@')
+import isc
+import isc.cc
+import os
+import threading
+import struct
+import signal
+from isc.auth import sqlite3_ds
+from socketserver import *
+import os
+from isc.config.ccsession import *
+import socket
+from bind10_xfr import *
+from bind10_dns import *
+from optparse import OptionParser, OptionValueError
+try:
+ from bind10_xfr import *
+ from bind10_dns import *
+except:
+ pass
+
+if "B10_FROM_SOURCE" in os.environ:
+ SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/xfrout"
+else:
+ PREFIX = "@prefix@"
+ DATAROOTDIR = "@datarootdir@"
+ SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
+SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
+
+MAX_TRANSFERS_OUT = 10
+verbose_mode = True
+
+
+class XfroutException(Exception): pass
+
+
+class XfroutSession(BaseRequestHandler):
+ def handle(self):
+ fd = recv_fd(self.request.fileno())
+ data_len = self.request.recv(2)
+ msg_len = struct.unpack('H', data_len)[0]
+ msgdata = self.request.recv(msg_len)
+ sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ self.dns_xfrout_start(sock, msgdata)
+ except Exception as e:
+ self.log_msg(str(e))
+
+ sock.close()
+
+ def _parse_query_message(self, mdata):
+ ''' parse query message to [socket,message]'''
+ #TODO, need to add parseHeader() in case the message header is invalid
+ try:
+ msg = message(message_mode.PARSE)
+ msg.from_wire(input_buffer(mdata))
+ except Exception as err:
+ self.log_msg(str(err))
+ return rcode.FORMERR(), None
+
+ return rcode.NOERROR(), msg
+
+
+ def _get_query_zone_name(self, msg):
+ q_iter = question_iter(msg)
+ question = q_iter.get_question()
+ return question.get_name().to_text()
+
+
+ def _send_data(self, sock, data):
+ size = len(data)
+ total_count = 0
+ while total_count < size:
+ count = sock.send(data[total_count:])
+ total_count += count
+
+
+ def _send_message(self, sock, msg):
+ obuf = output_buffer(0)
+ render = message_render(obuf)
+ msg.to_wire(render)
+ header_len = struct.pack('H', socket.htons(obuf.get_length()))
+ self._send_data(sock, header_len)
+ self._send_data(sock, obuf.get_data())
+
+
+ def _reply_query_with_error_rcode(self, msg, sock, rcode_):
+ msg.make_response()
+ msg.set_rcode(rcode_)
+ self._send_message(sock, msg)
+
+
+ def _reply_query_with_format_error(self, sock, msg):
+ '''query message format isn't legal.'''
+ if not msg:
+ return # query message is invalid. send nothing back.
+
+ msg.make_response()
+ msg.set_rcode(rcode.FORMERR())
+ self._send_message(sock, msg)
+
+
+ def _zone_is_empty(self, zone):
+ if sqlite3_ds.get_zone_soa(zone, self.server.get_db_file()):
+ return False
+
+ return True
+
+ def _zone_exist(self, zonename):
+ # Find zone in datasource, should this works? maybe should ask
+ # config manager.
+ soa = sqlite3_ds.get_zone_soa(zonename, self.server.get_db_file())
+ if soa:
+ return True
+ return False
+
+
+ def _check_xfrout_available(self, zone_name):
+ '''Check if xfr request can be responsed.
+ TODO, Get zone's configuration from cfgmgr or some other place
+ eg. check allow_transfer setting,
+ '''
+ if not self._zone_exist(zone_name):
+ return rcode.NOTAUTH()
+
+ if self._zone_is_empty(zone_name):
+ return rcode.SERVFAIL()
+
+ #TODO, check allow_transfer
+ if not self.server.increase_transfers_counter():
+ return rcode.REFUSED()
+
+ return rcode.NOERROR()
+
+
+ def dns_xfrout_start(self, sock, msg_query):
+ rcode_, msg = self._parse_query_message(msg_query)
+ #TODO. create query message and parse header
+ if rcode_ != rcode.NOERROR():
+ return self._reply_query_with_format_error(msg, sock, msg_query)
+
+ zone_name = self._get_query_zone_name(msg)
+ rcode_ = self._check_xfrout_available(zone_name)
+ if rcode_ != rcode.NOERROR():
+ return self. _reply_query_with_error_rcode(msg, sock, rcode_)
+
+ try:
+ if verbose_mode:
+ self.log_msg("transfer of '%s/IN': AXFR started" % zone_name)
+
+ self._reply_xfrout_query(msg, sock, zone_name)
+
+ if verbose_mode:
+ self.log_msg("transfer of '%s/IN': AXFR end" % zone_name)
+ except Exception as err:
+ if verbose_mode:
+ sys.stderr.write(str(err))
+
+ self.server.decrease_transfers_counter()
+ return
+
+
+ def _clear_message(self, msg):
+ qid = msg.get_qid()
+ opcode = msg.get_opcode()
+ rcode = msg.get_rcode()
+
+ msg.clear(message_mode.RENDER)
+ msg.set_qid(qid)
+ msg.set_opcode(opcode)
+ msg.set_rcode(rcode)
+ msg.set_header_flag(message_flag.AA())
+ msg.set_header_flag(message_flag.QR())
+ return msg
+
+ def _create_rrset_from_db_record(self, record):
+ '''Create one rrset from one record of datasource, if the schema of record is changed,
+ This function should be updated first.
+ '''
+ rrtype_ = rr_type(record[5])
+ rdata_ = create_rdata(rrtype_, rr_class.IN(), " ".join(record[7:]))
+ rrset_ = rrset(name(record[2]), rr_class.IN(), rrtype_, rr_ttl( int(record[4])))
+ rrset_.add_rdata(rdata_)
+ return rrset_
+
+ def _send_message_with_last_soa(self, msg, sock, rrset_soa):
+ '''Add the SOA record to the end of message. If it can't be
+ added, a new message should be created to send out the last soa .
+ '''
+
+ obuf = output_buffer(0)
+ render = message_render(obuf)
+ msg.to_wire(render)
+ old_message_len = obuf.get_length()
+ msg.add_rrset(section.ANSWER(), rrset_soa)
+
+ msg.to_wire(render)
+ message_len = obuf.get_length()
+
+ if message_len != old_message_len:
+ self._send_message(sock, msg)
+ else:
+ msg = self._clear_message(msg)
+ msg.add_rrset(section.ANSWER(), rrset_soa)
+ self._send_message(sock, msg)
+
+ def _get_message_len(self, msg):
+ '''Get message length, every time need do like this? Actually there should be
+ a better way, I need check with jinmei later.
+ '''
+
+ obuf = output_buffer(0)
+ render = message_render(obuf)
+ msg.to_wire(render)
+ return obuf.get_length()
+
+
+ def _reply_xfrout_query(self, msg, sock, zone_name):
+ #TODO, there should be a better way to insert rrset.
+ msg.make_response()
+ msg.set_header_flag(message_flag.AA())
+ soa_record = sqlite3_ds.get_zone_soa(zone_name, self.server.get_db_file())
+ rrset_soa = self._create_rrset_from_db_record(soa_record)
+ msg.add_rrset(section.ANSWER(), rrset_soa)
+
+ old_message_len = 0
+ # TODO, Since add_rrset() return nothing when rrset can't be added, so I have to compare
+ # the message length to know if the rrset has been added sucessfully.
+ for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()):
+ if self.server._shutdown_event.is_set(): # Check if xfrout is shutdown
+ raise XfroutException("shutdown!")
+
+ if rr_type(rr_data[5]) == rr_type.SOA(): #ignore soa record
+ continue
+
+ rrset_ = self._create_rrset_from_db_record(rr_data)
+ msg.add_rrset(section.ANSWER(), rrset_)
+ message_len = self._get_message_len(msg)
+ if message_len != old_message_len:
+ old_message_len = message_len
+ continue
+
+ self._send_message(sock, msg)
+ msg = self._clear_message(msg)
+ msg.add_rrset(section.ANSWER(), rrset_) # Add the rrset to the new message
+ old_message_len = 0
+
+ self._send_message_with_last_soa(msg, sock, rrset_soa)
+
+ def log_msg(self, msg):
+ print('[b10-xfrout] ', msg)
+
+
+class UnixSockServer(ThreadingUnixStreamServer):
+ '''The unix domain socket server which accept xfr query sent from auth server.'''
+
+ def __init__(self, sock_file, handle_class, shutdown_event, config_data):
+ ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
+ self._lock = threading.Lock()
+ self._transfers_counter = 0
+ self._shutdown_event = shutdown_event
+ self.update_config_data(config_data)
+
+
+ def update_config_data(self, new_config):
+ '''Apply the new config setting of xfrout module. '''
+
+ self._lock.acquire()
+ self._max_transfers_out = new_config.get('transfers_out')
+ self._db_file = new_config.get('db_file')
+ self._lock.release()
+
+ def get_db_file(self):
+ self._lock.acquire()
+ file = self._db_file
+ self._lock.release()
+ return file
+
+ def increase_transfers_counter(self):
+ '''Return False, if counter + 1 > max_transfers_out, or else
+ return True
+ '''
+ ret = False
+ self._lock.acquire()
+ if self._transfers_counter < self._max_transfers_out:
+ self._transfers_counter += 1
+ ret = True
+ self._lock.release()
+ return ret
+
+ def decrease_transfers_counter(self):
+ self._lock.acquire()
+ self._transfers_counter -= 1
+ self._lock.release()
+
+
+def listen_on_xfr_query(unix_socket_server):
+ '''Listen xfr query in one single thread. '''
+ unix_socket_server.serve_forever()
+
+
+class XfroutServer:
+ def __init__(self):
+ self._init_config_data()
+ self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
+ self._cc.start()
+ self._lock = threading.Lock()
+ self._shutdown_event = threading.Event()
+ self._listen_sock_file = '/tmp/auth_xfrout_conn' # TODO, should this be configurable in cfgmgr
+ self._start_xfr_query_listener()
+
+ def _start_xfr_query_listener(self):
+ '''Start a new thread to accept xfr query. '''
+ try:
+ os.unlink(self._listen_sock_file)
+ except:
+ pass
+
+ self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
+ self._shutdown_event, self._config_data);
+ listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
+ listener.start()
+
+
+ def _init_config_data(self):
+ '''Init the config item here. In case there is some error in config data got from cfgmgr.'''
+ self._config_data = {'transfers_out': 10, 'db_file':'/tmp/zone.sqlite3'}
+
+
+ def config_handler(self, new_config):
+ '''Update config data. TODO. Do error check'''
+
+ answer = create_answer(0)
+ for key in new_config:
+ if key not in self._config_data:
+ answer = create_answer(1, "Unknown config data: " + str(key))
+ continue
+ self._config_data[key] = new_config[key]
+
+ return answer
+
+
+ def shutdown(self):
+ ''' shutdown the xfrin process. the thread which is doing xfrin should be
+ terminated.
+ '''
+ self._shutdown_event.set()
+ self._unix_socket_server.shutdown()
+
+ main_thread = threading.currentThread()
+ for th in threading.enumerate():
+ if th is main_thread:
+ continue
+ th.join()
+
+
+ def command_handler(self, cmd, args):
+ if cmd == "shutdown":
+ if verbose_mode:
+ log_msg("Received shutdown command")
+ self.shutdown()
+ answer = create_answer(0)
+ else:
+ answer = create_answer(1, "Unknown command:" + str(cmd))
+
+ return answer
+
+
+ def run(self):
+ '''Get and process all commands sent from cfgmgr or other modules. '''
+ while not self._shutdown_event.is_set():
+ self._cc.check_command()
+
+
+xfrout_server = None
+
+def signal_handler(signal, frame):
+ if xfrout_server:
+ xfrout_server.shutdown()
+ sys.exit(0)
+
+def set_signal_handler():
+ signal.signal(signal.SIGTERM, signal_handler)
+ signal.signal(signal.SIGINT, signal_handler)
+
+def set_cmd_options(parser):
+ parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
+ help="display more about what is going on")
+
+if '__main__' == __name__:
+ try:
+ parser = OptionParser()
+ set_cmd_options(parser)
+ (options, args) = parser.parse_args()
+ verbose_mode = options.verbose
+
+ set_signal_handler()
+ xfrout_server = XfroutServer()
+ xfrout_server.run()
+ except KeyboardInterrupt:
+ print("[b10-xfrout] exit xfrout process")
+ except Exception as e:
+ print('[b10-xfrout] ', e)
+
+ if xfrout_server:
+ xfrout_server.shutdown()
+