diff options
Diffstat (limited to 'src/bin')
-rw-r--r-- | src/bin/Makefile.am | 2 | ||||
-rw-r--r-- | src/bin/auth/Makefile.am | 1 | ||||
-rw-r--r-- | src/bin/auth/main.cc | 107 | ||||
-rw-r--r-- | src/bin/bind10/bind10.py.in | 18 | ||||
-rw-r--r-- | src/bin/xfrout/Makefile.am | 18 | ||||
-rw-r--r-- | src/bin/xfrout/run_b10-xfrout.sh.in | 12 | ||||
-rw-r--r-- | src/bin/xfrout/xfrout.pre.in | 27 | ||||
-rw-r--r-- | src/bin/xfrout/xfrout.py.in | 425 |
8 files changed, 578 insertions, 32 deletions
diff --git a/src/bin/Makefile.am b/src/bin/Makefile.am index 7b331018f2..e91b658244 100644 --- a/src/bin/Makefile.am +++ b/src/bin/Makefile.am @@ -1 +1 @@ -SUBDIRS = bind10 bindctl cfgmgr loadzone msgq host cmdctl auth xfrin usermgr +SUBDIRS = bind10 bindctl cfgmgr loadzone msgq host cmdctl auth xfrin xfrout usermgr diff --git a/src/bin/auth/Makefile.am b/src/bin/auth/Makefile.am index ae22bd34ab..e90584f106 100644 --- a/src/bin/auth/Makefile.am +++ b/src/bin/auth/Makefile.am @@ -33,6 +33,7 @@ b10_auth_LDADD += $(top_builddir)/src/lib/config/.libs/libcfgclient.a b10_auth_LDADD += $(top_builddir)/src/lib/cc/libcc.a b10_auth_LDADD += $(top_builddir)/src/lib/exceptions/.libs/libexceptions.a b10_auth_LDADD += $(SQLITE_LIBS) +b10_auth_LDADD += $(top_builddir)/src/lib/xfr/.libs/libxfr.a if HAVE_BOOST_SYSTEM b10_auth_LDFLAGS = $(AM_LDFLAGS) $(BOOST_LDFLAGS) b10_auth_LDADD += $(BOOST_SYSTEM_LIB) diff --git a/src/bin/auth/main.cc b/src/bin/auth/main.cc index 1e1307ae99..96884dddd1 100644 --- a/src/bin/auth/main.cc +++ b/src/bin/auth/main.cc @@ -42,12 +42,14 @@ #include <cc/session.h> #include <cc/data.h> #include <config/ccsession.h> +#include <xfr/xfrout_client.h> #include "spec_config.h" #include "common.h" #include "auth_srv.h" using namespace std; +using namespace isc::xfr; #ifdef HAVE_BOOST_SYSTEM using namespace boost::asio; @@ -103,7 +105,38 @@ my_command_handler(const string& command, const ElementPtr args) { return answer; } -#ifdef HAVE_BOOST_SYSTEM +//TODO. The sample way for checking axfr query, the code should be merged to auth server class +static bool +check_axfr_query(char *msg_data, uint16_t msg_len) +{ + if (msg_len < 15) + return false; + + uint16_t query_type = *(uint16_t *)(msg_data + (msg_len - 4)); + if ( query_type == 0xFC00) + return true; + + return false; +} + +//TODO. Send the xfr query to xfrout module, the code should be merged to auth server class +static void +dispatch_axfr_query(int tcp_sock, char axfr_query[], uint16_t query_len) +{ + std::string path = "/tmp/auth_xfrout_conn"; + XfroutClient xfr_client(path); + try { + xfr_client.connect(); + xfr_client.sendXfroutRequestInfo(tcp_sock, (uint8_t *)axfr_query, query_len); + xfr_client.disconnect(); + } + catch (const std::exception & err) { + //if (verbose_mode) + cerr << "error handle xfr query:" << err.what() << endl; + } +} + +#ifdef HAVE_BOOSTLIB // // Helper classes for asynchronous I/O using boost::asio // @@ -148,17 +181,24 @@ public: { if (!error) { InputBuffer dnsbuffer(data_, bytes_transferred); - if (auth_server->processMessage(dnsbuffer, dns_message_, - response_renderer_, false)) { - responselen_buffer_.writeUint16(response_buffer_.getLength()); - async_write(socket_, - boost::asio::buffer( - responselen_buffer_.getData(), - responselen_buffer_.getLength()), - boost::bind(&TCPClient::responseWrite, this, - placeholders::error)); - } else { - delete this; + if (check_axfr_query(data_, bytes_transferred)) { + dispatch_axfr_query(socket_.native(), data_, bytes_transferred); + // start to get new query ? + start(); + } + else { + if (auth_server->processMessage(dnsbuffer, dns_message_, + response_renderer_, false)) { + responselen_buffer_.writeUint16(response_buffer_.getLength()); + async_write(socket_, + boost::asio::buffer( + responselen_buffer_.getData(), + responselen_buffer_.getLength()), + boost::bind(&TCPClient::responseWrite, this, + placeholders::error)); + } else { + delete this; + } } } else { delete this; @@ -537,25 +577,30 @@ processMessageTCP(const int fd, Message& dns_message, cc += cc0; } - InputBuffer buffer(&message_buffer[0], size); - dns_message.clear(Message::PARSE); - response_renderer.clear(); - if (auth_server->processMessage(buffer, dns_message, response_renderer, - false)) { - size = response_renderer.getLength(); - size_n = htons(size); - if (send(ts, &size_n, 2, 0) == 2) { - cc = send(ts, response_renderer.getData(), - response_renderer.getLength(), 0); - if (cc == -1) { - if (verbose_mode) { - cerr << "[AuthSrv] error in sending TCP response message" << - endl; - } - } else { - if (verbose_mode) { - cerr << "[XX] sent TCP response: " << cc << " bytes" - << endl; + if (check_axfr_query(&message_buffer[0], size)) { + dispatch_axfr_query(ts, &message_buffer[0], size); + } + else { + InputBuffer buffer(&message_buffer[0], size); + dns_message.clear(Message::PARSE); + response_renderer.clear(); + if (auth_server->processMessage(buffer, dns_message, response_renderer, + false)) { + size = response_renderer.getLength(); + size_n = htons(size); + if (send(ts, &size_n, 2, 0) == 2) { + cc = send(ts, response_renderer.getData(), + response_renderer.getLength(), 0); + if (cc == -1) { + if (verbose_mode) { + cerr << "[AuthSrv] error in sending TCP response message" << + endl; + } + } else { + if (verbose_mode) { + cerr << "[XX] sent TCP response: " << cc << " bytes" + << endl; + } } } } else { diff --git a/src/bin/bind10/bind10.py.in b/src/bin/bind10/bind10.py.in index 01223dcaa9..5ef69a5077 100644 --- a/src/bin/bind10/bind10.py.in +++ b/src/bin/bind10/bind10.py.in @@ -265,6 +265,21 @@ class BoB: if self.verbose: print("[XX] ccsession started") + # start the xfrout before auth-server, to make sure every xfr-query can be + # processed properly. + if self.verbose: + sys.stdout.write("Starting b10-xfrout\n") + try: + xfrout = ProcessInfo("b10-xfrout", ["b10-xfrout"], + { 'ISC_MSGQ_PORT': str(self.c_channel_port)}) + except Exception as e: + c_channel.process.kill() + bind_cfgd.process.kill() + return "Unable to start b10-xfrout; " + str(e) + self.processes[xfrout.pid] = xfrout + if self.verbose: + sys.stdout.write("Started b10-xfrout (PID %d)\n" % xfrout.pid) + # start b10-auth # XXX: this must be read from the configuration manager in the future authargs = ['b10-auth', '-p', str(self.auth_port)] @@ -278,6 +293,7 @@ class BoB: except Exception as e: c_channel.process.kill() bind_cfgd.process.kill() + xfrout.process.kill() return "Unable to start b10-auth; " + str(e) self.processes[auth.pid] = auth if self.verbose: @@ -292,6 +308,7 @@ class BoB: except Exception as e: c_channel.process.kill() bind_cfgd.process.kill() + xfrout.process.kill() auth.process.kill() return "Unable to start b10-xfrin; " + str(e) self.processes[xfrind.pid] = xfrind @@ -308,6 +325,7 @@ class BoB: except Exception as e: c_channel.process.kill() bind_cfgd.process.kill() + xfrout.process.kill() auth.process.kill() xfrind.process.kill() return "Unable to start b10-cmdctl; " + str(e) diff --git a/src/bin/xfrout/Makefile.am b/src/bin/xfrout/Makefile.am new file mode 100644 index 0000000000..1284ba6d5e --- /dev/null +++ b/src/bin/xfrout/Makefile.am @@ -0,0 +1,18 @@ +pkglibexecdir = $(libexecdir)/@PACKAGE@ + +pkglibexec_SCRIPTS = b10-xfrout + +b10_xfroutdir = $(DESTDIR)$(pkgdatadir) +b10_xfrout_DATA = xfrout.spec + +CLEANFILES= xfrout.py xfrout.pre xfrout.spec + +xfrout.spec: xfrout.pre + $(SED) -e "s|@@LOCALSTATEDIR@@|$(localstatedir)|" xfrout.pre >$@ + +# TODO: does this need $$(DESTDIR) also? +# this is done here since configure.ac AC_OUTPUT doesn't expand exec_prefix +b10-xfrout: xfrout.py + $(SED) -e "s|@@PYTHONPATH@@|@pyexecdir@|" \ + -e "s|@@LOCALSTATEDIR@@|$(localstatedir)|" xfrout.py >$@ + chmod a+x $@ diff --git a/src/bin/xfrout/run_b10-xfrout.sh.in b/src/bin/xfrout/run_b10-xfrout.sh.in new file mode 100644 index 0000000000..9be34921e7 --- /dev/null +++ b/src/bin/xfrout/run_b10-xfrout.sh.in @@ -0,0 +1,12 @@ +#! /bin/sh + +PYTHON_EXEC=${PYTHON_EXEC:-@PYTHON@} +export PYTHON_EXEC + +MYPATH_PATH=@abs_top_builddir@/src/bin/xfrout +PYTHONPATH=@abs_top_srcdir@/src/lib/python +export PYTHONPATH + +cd ${MYPATH_PATH} +${PYTHON_EXEC} b10-xfrout + diff --git a/src/bin/xfrout/xfrout.pre.in b/src/bin/xfrout/xfrout.pre.in new file mode 100644 index 0000000000..495d3fd2a0 --- /dev/null +++ b/src/bin/xfrout/xfrout.pre.in @@ -0,0 +1,27 @@ +{ + "module_spec": { + "module_name": "Xfrout", + "config_data": [ + { + "item_name": "transfers_out", + "item_type": "integer", + "item_optional": False, + "item_default": 10 + }, + { + "item_name": "db_file", + "item_type": "string", + "item_optional": False, + "item_default": '@@LOCALSTATEDIR@@/@PACKAGE@/zone.sqlite3' + } + ], + "commands": [ + { + "command_name": "shutdown", + "command_description": "Shut down Xfrout", + "command_args": [] + } + ] + } +} + diff --git a/src/bin/xfrout/xfrout.py.in b/src/bin/xfrout/xfrout.py.in new file mode 100644 index 0000000000..d909f43e16 --- /dev/null +++ b/src/bin/xfrout/xfrout.py.in @@ -0,0 +1,425 @@ +#!@PYTHON@ + +# Copyright (C) 2010 Internet Systems Consortium. +# +# Permission to use, copy, modify, and distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM +# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL +# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, +# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING +# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, +# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION +# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + + +import sys; sys.path.append ('@@PYTHONPATH@@') +import isc +import isc.cc +import os +import threading +import struct +import signal +from isc.auth import sqlite3_ds +from socketserver import * +import os +from isc.config.ccsession import * +import socket +from bind10_xfr import * +from bind10_dns import * +from optparse import OptionParser, OptionValueError +try: + from bind10_xfr import * + from bind10_dns import * +except: + pass + +if "B10_FROM_SOURCE" in os.environ: + SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/xfrout" +else: + PREFIX = "@prefix@" + DATAROOTDIR = "@datarootdir@" + SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX) +SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec" + +MAX_TRANSFERS_OUT = 10 +verbose_mode = True + + +class XfroutException(Exception): pass + + +class XfroutSession(BaseRequestHandler): + def handle(self): + fd = recv_fd(self.request.fileno()) + data_len = self.request.recv(2) + msg_len = struct.unpack('H', data_len)[0] + msgdata = self.request.recv(msg_len) + sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) + try: + self.dns_xfrout_start(sock, msgdata) + except Exception as e: + self.log_msg(str(e)) + + sock.close() + + def _parse_query_message(self, mdata): + ''' parse query message to [socket,message]''' + #TODO, need to add parseHeader() in case the message header is invalid + try: + msg = message(message_mode.PARSE) + msg.from_wire(input_buffer(mdata)) + except Exception as err: + self.log_msg(str(err)) + return rcode.FORMERR(), None + + return rcode.NOERROR(), msg + + + def _get_query_zone_name(self, msg): + q_iter = question_iter(msg) + question = q_iter.get_question() + return question.get_name().to_text() + + + def _send_data(self, sock, data): + size = len(data) + total_count = 0 + while total_count < size: + count = sock.send(data[total_count:]) + total_count += count + + + def _send_message(self, sock, msg): + obuf = output_buffer(0) + render = message_render(obuf) + msg.to_wire(render) + header_len = struct.pack('H', socket.htons(obuf.get_length())) + self._send_data(sock, header_len) + self._send_data(sock, obuf.get_data()) + + + def _reply_query_with_error_rcode(self, msg, sock, rcode_): + msg.make_response() + msg.set_rcode(rcode_) + self._send_message(sock, msg) + + + def _reply_query_with_format_error(self, sock, msg): + '''query message format isn't legal.''' + if not msg: + return # query message is invalid. send nothing back. + + msg.make_response() + msg.set_rcode(rcode.FORMERR()) + self._send_message(sock, msg) + + + def _zone_is_empty(self, zone): + if sqlite3_ds.get_zone_soa(zone, self.server.get_db_file()): + return False + + return True + + def _zone_exist(self, zonename): + # Find zone in datasource, should this works? maybe should ask + # config manager. + soa = sqlite3_ds.get_zone_soa(zonename, self.server.get_db_file()) + if soa: + return True + return False + + + def _check_xfrout_available(self, zone_name): + '''Check if xfr request can be responsed. + TODO, Get zone's configuration from cfgmgr or some other place + eg. check allow_transfer setting, + ''' + if not self._zone_exist(zone_name): + return rcode.NOTAUTH() + + if self._zone_is_empty(zone_name): + return rcode.SERVFAIL() + + #TODO, check allow_transfer + if not self.server.increase_transfers_counter(): + return rcode.REFUSED() + + return rcode.NOERROR() + + + def dns_xfrout_start(self, sock, msg_query): + rcode_, msg = self._parse_query_message(msg_query) + #TODO. create query message and parse header + if rcode_ != rcode.NOERROR(): + return self._reply_query_with_format_error(msg, sock, msg_query) + + zone_name = self._get_query_zone_name(msg) + rcode_ = self._check_xfrout_available(zone_name) + if rcode_ != rcode.NOERROR(): + return self. _reply_query_with_error_rcode(msg, sock, rcode_) + + try: + if verbose_mode: + self.log_msg("transfer of '%s/IN': AXFR started" % zone_name) + + self._reply_xfrout_query(msg, sock, zone_name) + + if verbose_mode: + self.log_msg("transfer of '%s/IN': AXFR end" % zone_name) + except Exception as err: + if verbose_mode: + sys.stderr.write(str(err)) + + self.server.decrease_transfers_counter() + return + + + def _clear_message(self, msg): + qid = msg.get_qid() + opcode = msg.get_opcode() + rcode = msg.get_rcode() + + msg.clear(message_mode.RENDER) + msg.set_qid(qid) + msg.set_opcode(opcode) + msg.set_rcode(rcode) + msg.set_header_flag(message_flag.AA()) + msg.set_header_flag(message_flag.QR()) + return msg + + def _create_rrset_from_db_record(self, record): + '''Create one rrset from one record of datasource, if the schema of record is changed, + This function should be updated first. + ''' + rrtype_ = rr_type(record[5]) + rdata_ = create_rdata(rrtype_, rr_class.IN(), " ".join(record[7:])) + rrset_ = rrset(name(record[2]), rr_class.IN(), rrtype_, rr_ttl( int(record[4]))) + rrset_.add_rdata(rdata_) + return rrset_ + + def _send_message_with_last_soa(self, msg, sock, rrset_soa): + '''Add the SOA record to the end of message. If it can't be + added, a new message should be created to send out the last soa . + ''' + + obuf = output_buffer(0) + render = message_render(obuf) + msg.to_wire(render) + old_message_len = obuf.get_length() + msg.add_rrset(section.ANSWER(), rrset_soa) + + msg.to_wire(render) + message_len = obuf.get_length() + + if message_len != old_message_len: + self._send_message(sock, msg) + else: + msg = self._clear_message(msg) + msg.add_rrset(section.ANSWER(), rrset_soa) + self._send_message(sock, msg) + + def _get_message_len(self, msg): + '''Get message length, every time need do like this? Actually there should be + a better way, I need check with jinmei later. + ''' + + obuf = output_buffer(0) + render = message_render(obuf) + msg.to_wire(render) + return obuf.get_length() + + + def _reply_xfrout_query(self, msg, sock, zone_name): + #TODO, there should be a better way to insert rrset. + msg.make_response() + msg.set_header_flag(message_flag.AA()) + soa_record = sqlite3_ds.get_zone_soa(zone_name, self.server.get_db_file()) + rrset_soa = self._create_rrset_from_db_record(soa_record) + msg.add_rrset(section.ANSWER(), rrset_soa) + + old_message_len = 0 + # TODO, Since add_rrset() return nothing when rrset can't be added, so I have to compare + # the message length to know if the rrset has been added sucessfully. + for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()): + if self.server._shutdown_event.is_set(): # Check if xfrout is shutdown + raise XfroutException("shutdown!") + + if rr_type(rr_data[5]) == rr_type.SOA(): #ignore soa record + continue + + rrset_ = self._create_rrset_from_db_record(rr_data) + msg.add_rrset(section.ANSWER(), rrset_) + message_len = self._get_message_len(msg) + if message_len != old_message_len: + old_message_len = message_len + continue + + self._send_message(sock, msg) + msg = self._clear_message(msg) + msg.add_rrset(section.ANSWER(), rrset_) # Add the rrset to the new message + old_message_len = 0 + + self._send_message_with_last_soa(msg, sock, rrset_soa) + + def log_msg(self, msg): + print('[b10-xfrout] ', msg) + + +class UnixSockServer(ThreadingUnixStreamServer): + '''The unix domain socket server which accept xfr query sent from auth server.''' + + def __init__(self, sock_file, handle_class, shutdown_event, config_data): + ThreadingUnixStreamServer.__init__(self, sock_file, handle_class) + self._lock = threading.Lock() + self._transfers_counter = 0 + self._shutdown_event = shutdown_event + self.update_config_data(config_data) + + + def update_config_data(self, new_config): + '''Apply the new config setting of xfrout module. ''' + + self._lock.acquire() + self._max_transfers_out = new_config.get('transfers_out') + self._db_file = new_config.get('db_file') + self._lock.release() + + def get_db_file(self): + self._lock.acquire() + file = self._db_file + self._lock.release() + return file + + def increase_transfers_counter(self): + '''Return False, if counter + 1 > max_transfers_out, or else + return True + ''' + ret = False + self._lock.acquire() + if self._transfers_counter < self._max_transfers_out: + self._transfers_counter += 1 + ret = True + self._lock.release() + return ret + + def decrease_transfers_counter(self): + self._lock.acquire() + self._transfers_counter -= 1 + self._lock.release() + + +def listen_on_xfr_query(unix_socket_server): + '''Listen xfr query in one single thread. ''' + unix_socket_server.serve_forever() + + +class XfroutServer: + def __init__(self): + self._init_config_data() + self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler) + self._cc.start() + self._lock = threading.Lock() + self._shutdown_event = threading.Event() + self._listen_sock_file = '/tmp/auth_xfrout_conn' # TODO, should this be configurable in cfgmgr + self._start_xfr_query_listener() + + def _start_xfr_query_listener(self): + '''Start a new thread to accept xfr query. ''' + try: + os.unlink(self._listen_sock_file) + except: + pass + + self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession, + self._shutdown_event, self._config_data); + listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,)) + listener.start() + + + def _init_config_data(self): + '''Init the config item here. In case there is some error in config data got from cfgmgr.''' + self._config_data = {'transfers_out': 10, 'db_file':'/tmp/zone.sqlite3'} + + + def config_handler(self, new_config): + '''Update config data. TODO. Do error check''' + + answer = create_answer(0) + for key in new_config: + if key not in self._config_data: + answer = create_answer(1, "Unknown config data: " + str(key)) + continue + self._config_data[key] = new_config[key] + + return answer + + + def shutdown(self): + ''' shutdown the xfrin process. the thread which is doing xfrin should be + terminated. + ''' + self._shutdown_event.set() + self._unix_socket_server.shutdown() + + main_thread = threading.currentThread() + for th in threading.enumerate(): + if th is main_thread: + continue + th.join() + + + def command_handler(self, cmd, args): + if cmd == "shutdown": + if verbose_mode: + log_msg("Received shutdown command") + self.shutdown() + answer = create_answer(0) + else: + answer = create_answer(1, "Unknown command:" + str(cmd)) + + return answer + + + def run(self): + '''Get and process all commands sent from cfgmgr or other modules. ''' + while not self._shutdown_event.is_set(): + self._cc.check_command() + + +xfrout_server = None + +def signal_handler(signal, frame): + if xfrout_server: + xfrout_server.shutdown() + sys.exit(0) + +def set_signal_handler(): + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + +def set_cmd_options(parser): + parser.add_option("-v", "--verbose", dest="verbose", action="store_true", + help="display more about what is going on") + +if '__main__' == __name__: + try: + parser = OptionParser() + set_cmd_options(parser) + (options, args) = parser.parse_args() + verbose_mode = options.verbose + + set_signal_handler() + xfrout_server = XfroutServer() + xfrout_server.run() + except KeyboardInterrupt: + print("[b10-xfrout] exit xfrout process") + except Exception as e: + print('[b10-xfrout] ', e) + + if xfrout_server: + xfrout_server.shutdown() + |