diff options
Diffstat (limited to 'src/bin/xfrin/xfrin.py.in')
-rwxr-xr-x | src/bin/xfrin/xfrin.py.in | 811 |
1 files changed, 475 insertions, 336 deletions
diff --git a/src/bin/xfrin/xfrin.py.in b/src/bin/xfrin/xfrin.py.in index 206640439a..a894d550f7 100755 --- a/src/bin/xfrin/xfrin.py.in +++ b/src/bin/xfrin/xfrin.py.in @@ -28,14 +28,17 @@ import time from functools import reduce from optparse import OptionParser, OptionValueError from isc.config.ccsession import * -from isc.statistics import Counters +from isc.statistics.dns import Counters from isc.notify import notify_out import isc.util.process +import isc.util.traceback_handler +from isc.util.address_formatter import AddressFormatter from isc.datasrc import DataSourceClient, ZoneFinder import isc.net.parse from isc.xfrin.diff import Diff from isc.server_common.auth_command import auth_loadzone_command from isc.server_common.tsig_keyring import init_keyring, get_keyring +from isc.server_common.datasrc_clients_mgr import DataSrcClientsMgr, ConfigError from isc.log_messages.xfrin_messages import * from isc.dns import * @@ -55,13 +58,9 @@ isc.util.process.rename() SPECFILE_PATH = "@datadir@/@PACKAGE@"\ .replace("${datarootdir}", "@datarootdir@")\ .replace("${prefix}", "@prefix@") -AUTH_SPECFILE_PATH = SPECFILE_PATH if "B10_FROM_SOURCE" in os.environ: SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/xfrin" -if "B10_FROM_BUILD" in os.environ: - AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth" SPECFILE_LOCATION = SPECFILE_PATH + "/xfrin.spec" -AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + "/auth.spec" AUTH_MODULE_NAME = 'Auth' XFROUT_MODULE_NAME = 'Xfrout' @@ -565,18 +564,26 @@ class XfrinConnection(asyncore.dispatcher): def __init__(self, sock_map, zone_name, rrclass, datasrc_client, - shutdown_event, master_addrinfo, db_file, tsig_key=None, - idle_timeout=60): - '''Constructor of the XfirnConnection class. + shutdown_event, master_addrinfo, zone_soa, counters, + tsig_key=None, idle_timeout=60): + """Constructor of the XfirnConnection class. + + Parameters: + sock_map: empty dict, used with asyncore. + zone_name (dns.Name): Zone name. + rrclass (dns.RRClass): Zone RR class. + datasrc_client (DataSourceClient): the data source client object + used for the XFR session. + shutdown_event (threading.Event): used for synchronization with + parent thread. + master_addrinfo (tuple: (sock family, sock type, sockaddr)): + address and port of the master server. + zone_soa (RRset or None): SOA RRset of zone's current SOA or None + if it's not available. + counters (Counters): used for statistics counters + idle_timeout (int): max idle time for read data from socket. - db_file: SQLite3 DB file. Unforutnately we still need this for - temporary workaround in _get_zone_soa(). This should be - removed when we eliminate the need for the workaround. - idle_timeout: max idle time for read data from socket. - datasrc_client: the data source client object used for the XFR session. - This will eventually replace db_file completely. - - ''' + """ asyncore.dispatcher.__init__(self, map=sock_map) @@ -595,9 +602,8 @@ class XfrinConnection(asyncore.dispatcher): self._rrclass = rrclass # Data source handler - self._db_file = db_file self._datasrc_client = datasrc_client - self._zone_soa = self._get_zone_soa() + self._zone_soa = zone_soa self._sock_map = sock_map self._soa_rr_count = 0 @@ -605,6 +611,11 @@ class XfrinConnection(asyncore.dispatcher): self._shutdown_event = shutdown_event self._master_addrinfo = master_addrinfo self._tsig_key = tsig_key + # self.tsig_key_name is used for outputting an error massage in + # connect_to_master(). + self.tsig_key_name = None + if tsig_key: + self.tsig_key_name = self._tsig_key.get_key_name() self._tsig_ctx = None # tsig_ctx_creator is introduced to allow tests to use a mock class for # easier tests (in normal case we always use the default) @@ -613,7 +624,75 @@ class XfrinConnection(asyncore.dispatcher): # keep a record of this specific transfer to log on success # (time, rr/s, etc) self._transfer_stats = XfrinTransferStats() - self._counters = Counters(SPECFILE_LOCATION) + self._counters = counters + + def create_socket(self, family, type): + """create_socket() overridden from the super class for + statistics counter open and openfail""" + try: + ret = super().create_socket(family, type) + # count open + self._counters.inc('socket', + 'ip' + self._get_ipver_str(), + 'tcp', 'open') + return ret + except: + # count openfail + self._counters.inc('socket', + 'ip' + self._get_ipver_str(), + 'tcp', 'openfail') + raise + + def close(self): + """close() overridden from the super class for + statistics counter close""" + ret = super().close() + # count close + self._counters.inc('socket', + 'ip' + self._get_ipver_str(), + 'tcp', 'close') + return ret + + def connect(self, address): + """connect() overridden from the super class for + statistics counter conn and connfail""" + try: + ret = super().connect(address) + # count conn + self._counters.inc('socket', + 'ip' + self._get_ipver_str(), + 'tcp', 'conn') + return ret + except: + # count connfail + self._counters.inc('socket', + 'ip' + self._get_ipver_str(), + 'tcp', 'connfail') + raise + + def send(self, data): + """send() overridden from the super class for + statistics counter senderr""" + try: + return super().send(data) + except: + # count senderr + self._counters.inc('socket', + 'ip' + self._get_ipver_str(), + 'tcp', 'senderr') + raise + + def recv(self, buffer_size): + """recv() overridden from the super class for + statistics counter senderr""" + try: + return super().recv(buffer_size) + except: + # count recverr + self._counters.inc('socket', + 'ip' + self._get_ipver_str(), + 'tcp', 'recverr') + raise def init_socket(self): '''Initialize the underlyig socket. @@ -626,54 +705,6 @@ class XfrinConnection(asyncore.dispatcher): self.create_socket(self._master_addrinfo[0], self._master_addrinfo[1]) self.socket.setblocking(1) - def _get_zone_soa(self): - '''Retrieve the current SOA RR of the zone to be transferred. - - It will be used for various purposes in subsequent xfr protocol - processing. It is validly possible that the zone is currently - empty and therefore doesn't have an SOA, so this method doesn't - consider it an error and returns None in such a case. It may or - may not result in failure in the actual processing depending on - how the SOA is used. - - When the zone has an SOA RR, this method makes sure that it's - valid, i.e., it has exactly one RDATA; if it is not the case - this method returns None. - - If the underlying data source doesn't even know the zone, this method - tries to provide backward compatible behavior where xfrin is - responsible for creating zone in the corresponding DB table. - For a longer term we should deprecate this behavior by introducing - more generic zone management framework, but at the moment we try - to not surprise existing users. (Note also that the part of - providing the compatible behavior uses the old data source API. - We'll deprecate this API in a near future, too). - - ''' - # get the zone finder. this must be SUCCESS (not even - # PARTIALMATCH) because we are specifying the zone origin name. - result, finder = self._datasrc_client.find_zone(self._zone_name) - if result != DataSourceClient.SUCCESS: - # The data source doesn't know the zone. For now, we provide - # backward compatibility and creates a new one ourselves. - isc.datasrc.sqlite3_ds.load(self._db_file, - self._zone_name.to_text(), - lambda : []) - logger.warn(XFRIN_ZONE_CREATED, self.zone_str()) - # try again - result, finder = self._datasrc_client.find_zone(self._zone_name) - if result != DataSourceClient.SUCCESS: - return None - result, soa_rrset, _ = finder.find(self._zone_name, RRType.SOA) - if result != ZoneFinder.SUCCESS: - logger.info(XFRIN_ZONE_NO_SOA, self.zone_str()) - return None - if soa_rrset.get_rdata_count() != 1: - logger.warn(XFRIN_ZONE_MULTIPLE_SOA, self.zone_str(), - soa_rrset.get_rdata_count()) - return None - return soa_rrset - def __set_xfrstate(self, new_state): self.__state = new_state @@ -696,7 +727,8 @@ class XfrinConnection(asyncore.dispatcher): self.connect(self._master_addrinfo[2]) return True except socket.error as e: - logger.error(XFRIN_CONNECT_MASTER, self._master_addrinfo[2], + logger.error(XFRIN_CONNECT_MASTER, self.tsig_key_name, + self._master_addrinfo[2], str(e)) return False @@ -746,8 +778,9 @@ class XfrinConnection(asyncore.dispatcher): msg = self._create_query(query_type) render = MessageRenderer() - # XXX Currently, python wrapper doesn't accept 'None' parameter in this case, - # we should remove the if statement and use a universal interface later. + # XXX Currently, python wrapper doesn't accept 'None' parameter in this + # case, we should remove the if statement and use a universal + # interface later. if self._tsig_key is not None: self._tsig_ctx = self._tsig_ctx_creator(self._tsig_key) msg.to_wire(render, self._tsig_ctx) @@ -810,14 +843,15 @@ class XfrinConnection(asyncore.dispatcher): ''' Used as error callback below. ''' - logger.error(XFRIN_ZONE_INVALID, self._zone_name, self._rrclass, - reason) + logger.error(XFRIN_ZONE_INVALID, self._zone_name, + self._rrclass, reason) def __validate_warning(self, reason): ''' Used as warning callback below. ''' - logger.warn(XFRIN_ZONE_WARN, self._zone_name, self._rrclass, reason) + logger.warn(XFRIN_ZONE_WARN, self._zone_name, + self._rrclass, reason) def finish_transfer(self): """ @@ -900,9 +934,9 @@ class XfrinConnection(asyncore.dispatcher): It raises a ValueError exception on other address families. """ - if self.socket.family == socket.AF_INET: + if self._master_addrinfo[0] == socket.AF_INET: return 'v4' - elif self.socket.family == socket.AF_INET6: + elif self._master_addrinfo[0] == socket.AF_INET6: return 'v6' raise ValueError("Invalid address family. " "This is supported only for IP sockets") @@ -916,36 +950,44 @@ class XfrinConnection(asyncore.dispatcher): ''' - self._send_query(RRType.SOA) - # count soaoutv4 or soaoutv6 requests - self._counters.inc('zones', self._zone_name.to_text(), - 'soaout' + self._get_ipver_str()) - data_len = self._get_request_response(2) - msg_len = socket.htons(struct.unpack('H', data_len)[0]) - soa_response = self._get_request_response(msg_len) - msg = Message(Message.PARSE) - msg.from_wire(soa_response, Message.PRESERVE_ORDER) - - # Validate/parse the rest of the response, and extract the SOA - # from the answer section - soa = self.__parse_soa_response(msg, soa_response) - - # Compare the two serials. If ours is 'new', abort with ZoneUptodate. - primary_serial = get_soa_serial(soa.get_rdata()[0]) - if self._request_serial is not None and \ - self._request_serial >= primary_serial: - if self._request_serial != primary_serial: - logger.info(XFRIN_ZONE_SERIAL_AHEAD, primary_serial, - self.zone_str(), - format_addrinfo(self._master_addrinfo), - self._request_serial) - raise XfrinZoneUptodate - - return XFRIN_OK + # increment SOA query in progress + self._counters.inc('soa_in_progress') + try: + self._send_query(RRType.SOA) + # count soaoutv4 or soaoutv6 requests + self._counters.inc('zones', self._rrclass.to_text(), + self._zone_name.to_text(), 'soaout' + + self._get_ipver_str()) + data_len = self._get_request_response(2) + msg_len = socket.htons(struct.unpack('H', data_len)[0]) + soa_response = self._get_request_response(msg_len) + msg = Message(Message.PARSE) + msg.from_wire(soa_response, Message.PRESERVE_ORDER) + + # Validate/parse the rest of the response, and extract the SOA + # from the answer section + soa = self.__parse_soa_response(msg, soa_response) + + # Compare the two serials. If ours is 'new', abort with ZoneUptodate. + primary_serial = get_soa_serial(soa.get_rdata()[0]) + if self._request_serial is not None and \ + self._request_serial >= primary_serial: + if self._request_serial != primary_serial: + logger.info(XFRIN_ZONE_SERIAL_AHEAD, primary_serial, + self.zone_str(), + format_addrinfo(self._master_addrinfo), + self._request_serial) + raise XfrinZoneUptodate + + return XFRIN_OK + finally: + # decrement SOA query in progress + self._counters.dec('soa_in_progress') def do_xfrin(self, check_soa, request_type=RRType.AXFR): '''Do an xfr session by sending xfr request and parsing responses.''' + xfer_started = False # Don't set True until xfer is started try: ret = XFRIN_OK self._request_type = request_type @@ -957,16 +999,24 @@ class XfrinConnection(asyncore.dispatcher): if not self.connect_to_master(): raise XfrinException('Unable to reconnect to master') + xfer_started = True + # increment xfer running + self._counters.inc(req_str.lower() + '_running') # start statistics timer # Note: If the timer for the zone is already started but # not yet stopped due to some error, the last start time # is overwritten at this point. - self._counters.start_timer('zones', self._zone_name.to_text(), - 'last_' + req_str.lower() + '_duration') + self._counters.start_timer('zones', + self._rrclass.to_text(), + self._zone_name.to_text(), + 'last_' + req_str.lower() + + '_duration') logger.info(XFRIN_XFR_TRANSFER_STARTED, req_str, self.zone_str()) # An AXFR or IXFR is being requested. - self._counters.inc('zones', self._zone_name.to_text(), - req_str.lower() + 'req' + self._get_ipver_str()) + self._counters.inc('zones', self._rrclass.to_text(), + self._zone_name.to_text(), + req_str.lower() + 'req' + + self._get_ipver_str()) self._send_query(self._request_type) self.__state = XfrinInitialSOA() self._handle_xfrin_responses() @@ -1002,17 +1052,18 @@ class XfrinConnection(asyncore.dispatcher): # The log message doesn't contain the exception text, since there's # only one place where the exception is thrown now and it'd be the # same generic message every time. - logger.error(XFRIN_INVALID_ZONE_DATA, self.zone_str(), + logger.error(XFRIN_INVALID_ZONE_DATA, + self.zone_str(), format_addrinfo(self._master_addrinfo)) ret = XFRIN_FAIL except XfrinProtocolError as e: - logger.info(XFRIN_XFR_TRANSFER_PROTOCOL_VIOLATION, req_str, - self.zone_str(), + logger.info(XFRIN_XFR_TRANSFER_PROTOCOL_VIOLATION, + req_str, self.zone_str(), format_addrinfo(self._master_addrinfo), str(e)) ret = XFRIN_FAIL except XfrinException as e: - logger.error(XFRIN_XFR_TRANSFER_FAILURE, req_str, - self.zone_str(), + logger.error(XFRIN_XFR_TRANSFER_FAILURE, + req_str, self.zone_str(), format_addrinfo(self._master_addrinfo), str(e)) ret = XFRIN_FAIL except Exception as e: @@ -1031,14 +1082,19 @@ class XfrinConnection(asyncore.dispatcher): # A xfrsuccess or xfrfail counter is incremented depending on # the result. result = {XFRIN_OK: 'xfrsuccess', XFRIN_FAIL: 'xfrfail'}[ret] - self._counters.inc('zones', self._zone_name.to_text(), result) + self._counters.inc('zones', self._rrclass.to_text(), + self._zone_name.to_text(), result) # The started statistics timer is finally stopped only in # a successful case. if ret == XFRIN_OK: self._counters.stop_timer('zones', + self._rrclass.to_text(), self._zone_name.to_text(), 'last_' + req_str.lower() + '_duration') + # decrement xfer running only if started + if xfer_started: + self._counters.dec(req_str.lower() + '_running') # Make sure any remaining transaction in the diff is closed # (if not yet - possible in case of xfr-level exception) as soon # as possible @@ -1114,55 +1170,77 @@ class XfrinConnection(asyncore.dispatcher): return False -def __process_xfrin(server, zone_name, rrclass, db_file, +def __get_initial_xfr_type(zone_soa, request_ixfr, zname, zclass, master_addr): + """Determine the initial xfr request type. + + This is a dedicated subroutine of __process_xfrin. + """ + if zone_soa is None: + # This is a kind of special case, so we log it at info level. + logger.info(XFRIN_INITIAL_AXFR, format_zone_str(zname, zclass), + AddressFormatter(master_addr)) + return RRType.AXFR + if request_ixfr == ZoneInfo.REQUEST_IXFR_DISABLED: + logger.debug(DBG_XFRIN_TRACE, XFRIN_INITIAL_IXFR_DISABLED, + format_zone_str(zname, zclass), + AddressFormatter(master_addr)) + return RRType.AXFR + + assert(request_ixfr == ZoneInfo.REQUEST_IXFR_FIRST or + request_ixfr == ZoneInfo.REQUEST_IXFR_ONLY) + logger.debug(DBG_XFRIN_TRACE, XFRIN_INITIAL_IXFR, + format_zone_str(zname, zclass), + AddressFormatter(master_addr)) + return RRType.IXFR + +def __process_xfrin(server, zone_name, rrclass, datasrc_client, zone_soa, shutdown_event, master_addrinfo, check_soa, tsig_key, - request_type, conn_class): + request_ixfr, counters, conn_class): conn = None exception = None ret = XFRIN_FAIL try: - # Create a data source client used in this XFR session. Right now we - # still assume an sqlite3-based data source, and use both the old and new - # data source APIs. We also need to use a mock client for tests. - # For a temporary workaround to deal with these situations, we skip the - # creation when the given file is none (the test case). Eventually - # this code will be much cleaner. - datasrc_client = None - if db_file is not None: - # temporary hardcoded sqlite initialization. Once we decide on - # the config specification, we need to update this (TODO) - # this may depend on #1207, or any follow-up ticket created for #1207 - datasrc_type = "sqlite3" - datasrc_config = "{ \"database_file\": \"" + db_file + "\"}" - datasrc_client = DataSourceClient(datasrc_type, datasrc_config) - - # Create a TCP connection for the XFR session and perform the operation. + # Determine the initialreuqest type: AXFR or IXFR. + request_type = __get_initial_xfr_type(zone_soa, request_ixfr, + zone_name, rrclass, + master_addrinfo[2]) + + # Create a TCP connection for the XFR session and perform the + # operation. sock_map = {} - # In case we were asked to do IXFR and that one fails, we try again with - # AXFR. But only if we could actually connect to the server. + # In case we were asked to do IXFR and that one fails, we try again + # with AXFR. But only if we could actually connect to the server. # - # So we start with retry as True, which is set to false on each attempt. - # In the case of connected but failed IXFR, we set it to true once again. + # So we start with retry as True, which is set to false on each + # attempt. In the case of connected but failed IXFR, we set it to true + # once again. retry = True while retry: retry = False conn = conn_class(sock_map, zone_name, rrclass, datasrc_client, - shutdown_event, master_addrinfo, db_file, - tsig_key) + shutdown_event, master_addrinfo, zone_soa, + counters, tsig_key) conn.init_socket() ret = XFRIN_FAIL if conn.connect_to_master(): ret = conn.do_xfrin(check_soa, request_type) if ret == XFRIN_FAIL and request_type == RRType.IXFR: - # IXFR failed for some reason. It might mean the server can't - # handle it, or we don't have the zone or we are out of sync or - # whatever else. So we retry with with AXFR, as it may succeed - # in many such cases. - retry = True - request_type = RRType.AXFR - logger.warn(XFRIN_XFR_TRANSFER_FALLBACK, conn.zone_str()) - conn.close() - conn = None + # IXFR failed for some reason. It might mean the server + # can't handle it, or we don't have the zone or we are out + # of sync or whatever else. So we retry with with AXFR, as + # it may succeed in many such cases; if "IXFR only" is + # specified in request_ixfr, however, we suppress the + # fallback. + if request_ixfr == ZoneInfo.REQUEST_IXFR_ONLY: + logger.warn(XFRIN_XFR_TRANSFER_FALLBACK_DISABLED, + tsig_key, conn.zone_str()) + else: + retry = True + request_type = RRType.AXFR + logger.warn(XFRIN_XFR_TRANSFER_FALLBACK, + tsig_key, conn.zone_str()) + conn.close() + conn = None except Exception as ex: # If exception happens, just remember it here so that we can re-raise @@ -1186,9 +1264,9 @@ def __process_xfrin(server, zone_name, rrclass, db_file, if exception is not None: raise exception -def process_xfrin(server, xfrin_recorder, zone_name, rrclass, db_file, - shutdown_event, master_addrinfo, check_soa, tsig_key, - request_type, conn_class=XfrinConnection): +def process_xfrin(server, xfrin_recorder, zone_name, rrclass, datasrc_client, + zone_soa, shutdown_event, master_addrinfo, check_soa, + tsig_key, request_ixfr, counters, conn_class=XfrinConnection): # Even if it should be rare, the main process of xfrin session can # raise an exception. In order to make sure the lock in xfrin_recorder # is released in any cases, we delegate the main part to the helper @@ -1196,16 +1274,19 @@ def process_xfrin(server, xfrin_recorder, zone_name, rrclass, db_file, xfrin_recorder.increment(zone_name) exception = None try: - __process_xfrin(server, zone_name, rrclass, db_file, + __process_xfrin(server, zone_name, rrclass, datasrc_client, zone_soa, shutdown_event, master_addrinfo, check_soa, tsig_key, - request_type, conn_class) + request_ixfr, counters, conn_class) except Exception as ex: # don't log it until we complete decrement(). exception = ex xfrin_recorder.decrement(zone_name) if exception is not None: - typestr = "AXFR" if request_type == RRType.AXFR else "IXFR" + if request_ixfr == ZoneInfo.REQUEST_IXFR_DISABLED: + typestr = "AXFR" + else: + typestr = "IXFR" logger.error(XFRIN_XFR_PROCESS_FAILURE, typestr, zone_name.to_text(), str(rrclass), str(exception)) @@ -1238,10 +1319,26 @@ class XfrinRecorder: return ret class ZoneInfo: + # Internal values corresponding to request_ixfr + REQUEST_IXFR_FIRST = 0 # request_ixfr=yes, use IXFR 1st then AXFR + REQUEST_IXFR_ONLY = 1 # request_ixfr=only, use IXFR only + REQUEST_IXFR_DISABLED = 2 # request_ixfr=no, AXFR-only + + # Map from configuration values for request_ixfr to internal values + # This is a constant; don't modify. + REQUEST_IXFR_CFG_TO_VAL = { 'yes': REQUEST_IXFR_FIRST, + 'only': REQUEST_IXFR_ONLY, + 'no': REQUEST_IXFR_DISABLED } + def __init__(self, config_data, module_cc): """Creates a zone_info with the config data element as specified by the 'zones' list in xfrin.spec. Module_cc is needed to get the defaults from the specification""" + # Handle deprecated config parameter explicitly for the moment. + if config_data.get('use_ixfr') is not None: + raise XfrinZoneInfoException('"use_ixfr" was deprecated, ' + + 'use "request_ixfr"') + self._module_cc = module_cc self.set_name(config_data.get('name')) self.set_master_addr(config_data.get('master_addr')) @@ -1249,7 +1346,17 @@ class ZoneInfo: self.set_master_port(config_data.get('master_port')) self.set_zone_class(config_data.get('class')) self.set_tsig_key_name(config_data.get('tsig_key')) - self.set_use_ixfr(config_data.get('use_ixfr')) + self.set_request_ixfr(config_data.get('request_ixfr')) + + @property + def request_ixfr(self): + """Policy on the use of IXFR. + + Possible values are REQUEST_IXFR_xxx, internally stored in + __request_ixfr, read-only outside of the class. + + """ + return self.__request_ixfr def set_name(self, name_str): """Set the name for this zone given a name string. @@ -1336,16 +1443,15 @@ class ZoneInfo: else: return key - def set_use_ixfr(self, use_ixfr): - """Set use_ixfr. If set to True, it will use - IXFR for incoming transfers. If set to False, it will use AXFR. - At this moment there is no automatic fallback""" - # TODO: http://bind10.isc.org/ticket/1279 - if use_ixfr is None: - self.use_ixfr = \ - self._module_cc.get_default_value("zones/use_ixfr") - else: - self.use_ixfr = use_ixfr + def set_request_ixfr(self, request_ixfr): + if request_ixfr is None: + request_ixfr = \ + self._module_cc.get_default_value("zones/request_ixfr") + try: + self.__request_ixfr = self.REQUEST_IXFR_CFG_TO_VAL[request_ixfr] + except KeyError: + raise XfrinZoneInfoException('invalid value for request_ixfr: ' + + request_ixfr) def get_master_addr_info(self): return (self.master_addr.family, socket.SOCK_STREAM, @@ -1365,15 +1471,22 @@ class Xfrin: def __init__(self): self._max_transfers_in = 10 self._zones = {} - # This is a set of (zone/class) tuples (both as strings), - # representing the in-memory zones maintaned by Xfrin. It - # is used to trigger Auth/in-memory so that it reloads - # zones when they have been transfered in - self._memory_zones = set() - self._cc_setup() self.recorder = XfrinRecorder() self._shutdown_event = threading.Event() self._counters = Counters(SPECFILE_LOCATION) + # This is essentially private, but we allow tests to customize it. + self._datasrc_clients_mgr = DataSrcClientsMgr() + + # Initial configuration + self._cc_setup() + config_data = self._module_cc.get_full_config() + self.config_handler(config_data) + # data_sources configuration should be ready with cfgmgr, so this + # shouldn't fail; if it ever does we simply propagate the exception + # to terminate the program. + self._module_cc.add_remote_config_by_name('data_sources', + self._datasrc_config_handler) + init_keyring(self._module_cc) def _cc_setup(self): '''This method is used only as part of initialization, but is @@ -1384,14 +1497,9 @@ class Xfrin: # listening session will block the send operation. self._send_cc_session = isc.cc.Session() self._module_cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, - self.config_handler, - self.command_handler) + self.config_handler, + self.command_handler) self._module_cc.start() - config_data = self._module_cc.get_full_config() - self.config_handler(config_data) - self._module_cc.add_remote_config(AUTH_SPECFILE_LOCATION, - self._auth_config_handler) - init_keyring(self._module_cc) def _cc_check_command(self): '''This is a straightforward wrapper for cc.check_command, @@ -1423,7 +1531,8 @@ class Xfrin: old_max_transfers_in = self._max_transfers_in old_zones = self._zones - self._max_transfers_in = new_config.get("transfers_in") or self._max_transfers_in + self._max_transfers_in = \ + new_config.get("transfers_in") or self._max_transfers_in if 'zones' in new_config: self._clear_zone_info() @@ -1438,78 +1547,25 @@ class Xfrin: return create_answer(0) - def _auth_config_handler(self, new_config, config_data): - # Config handler for changes in Auth configuration - self._set_db_file() - self._set_memory_zones(new_config, config_data) - - def _clear_memory_zones(self): - """Clears the memory_zones set; called before processing the - changed list of memory datasource zones that have file type - sqlite3""" - self._memory_zones.clear() - - def _is_memory_zone(self, zone_name_str, zone_class_str): - """Returns true if the given zone/class combination is configured - in the in-memory datasource of the Auth process with file type - 'sqlite3'. - Note: this method is not thread-safe. We are considering - changing the threaded model here, but if we do not, take - care in accessing and updating the memory zone set (or add - locks) - """ - # Normalize them first, if either conversion fails, return false - # (they won't be in the set anyway) - try: - zone_name_str = Name(zone_name_str).to_text().lower() - zone_class_str = RRClass(zone_class_str).to_text() - except Exception: - return False - return (zone_name_str, zone_class_str) in self._memory_zones - - def _set_memory_zones(self, new_config, config_data): - """Part of the _auth_config_handler function, keeps an internal set - of zones in the datasources config subset that have 'sqlite3' as - their file type. - Note: this method is not thread-safe. We are considering - changing the threaded model here, but if we do not, take - care in accessing and updating the memory zone set (or add - locks) + def _datasrc_config_handler(self, new_config, config_data): + """Configuration handler of the 'data_sources' module. + + The actual handling is deletegated to the DataSrcClientsMgr class; + this method is a simple wrapper. + + This is essentially private, but implemented as 'protected' so tests + can refer to it; other external use is prohibited. + """ - # walk through the data and collect the memory zones - # If this causes any exception, assume we were passed bad data - # and keep the original set - new_memory_zones = set() try: - if "datasources" in new_config: - for datasource in new_config["datasources"]: - if "class" in datasource: - ds_class = RRClass(datasource["class"]) - else: - # Get the default - ds_class = RRClass(config_data.get_default_value( - "datasources/class")) - if datasource["type"] == "memory": - for zone in datasource["zones"]: - if "filetype" in zone and \ - zone["filetype"] == "sqlite3": - zone_name = Name(zone["origin"]) - zone_name_str = zone_name.to_text().lower() - new_memory_zones.add((zone_name_str, - ds_class.to_text())) - # Ok, we can use the data, update our list - self._memory_zones = new_memory_zones - except Exception: - # Something is wrong with the data. If this data even reached us, - # we cannot do more than assume the real module has logged and - # reported an error. Keep the old set. - return + self._datasrc_clients_mgr.reconfigure(new_config, config_data) + except isc.server_common.datasrc_clients_mgr.ConfigError as ex: + logger.error(XFRIN_DATASRC_CONFIG_ERROR, ex) def shutdown(self): ''' shutdown the xfrin process. the thread which is doing xfrin should be terminated. ''' - self._module_cc.remove_remote_config(AUTH_SPECFILE_LOCATION) self._module_cc.send_stopping() self._shutdown_event.set() main_thread = threading.currentThread() @@ -1518,6 +1574,85 @@ class Xfrin: continue th.join() + def __validate_notify_addr(self, notify_addr, zone_str, zone_info): + """Validate notify source as a destination for xfr source. + + This is called from __handle_xfr_command in case xfr is triggered + by ZoneMgr either due to incoming Notify or periodic refresh event. + + """ + if zone_info is None: + # TODO what to do? no info known about zone. defaults? + errmsg = "Got notification to retransfer unknown zone " + zone_str + logger.info(XFRIN_RETRANSFER_UNKNOWN_ZONE, zone_str) + return create_answer(1, errmsg) + else: + master_addr = zone_info.get_master_addr_info() + if (notify_addr[0] != master_addr[0] or + notify_addr[2] != master_addr[2]): + notify_addr_str = format_addrinfo(notify_addr) + master_addr_str = format_addrinfo(master_addr) + errmsg = "Got notification for " + zone_str\ + + "from unknown address: " + notify_addr_str; + logger.info(XFRIN_NOTIFY_UNKNOWN_MASTER, zone_str, + notify_addr_str, master_addr_str) + return create_answer(1, errmsg) + + # Notified address is okay + return None + + def __get_running_request_ixfr(self, arg_request_ixfr, zone_info): + """Determine the request_ixfr policy for a specific transfer. + + This is a dedicated subroutine of __handle_xfr_command. + + """ + # If explicitly specified, use it. + if arg_request_ixfr is not None: + return arg_request_ixfr + # Otherwise, if zone info is known, use its value. + if zone_info is not None: + return zone_info.request_ixfr + # Otherwise, use the default value for ZoneInfo + request_ixfr_def = \ + self._module_cc.get_default_value("zones/request_ixfr") + return ZoneInfo.REQUEST_IXFR_CFG_TO_VAL[request_ixfr_def] + + def __handle_xfr_command(self, args, check_soa, addr_validator, + request_ixfr): + """Common subroutine for handling transfer commands. + + This helper method unifies both cases of transfer command from + ZoneMgr or from a user. Depending on who invokes the transfer, + details of validation and parameter selection slightly vary. + These conditions are passed through parameters and handled in the + unified code of this method accordingly. + + If this is from the ZoneMgr due to incoming notify, zone transfer + should start from the notify's source address as long as it's + configured as a master address, according to RFC1996. The current + implementation conforms to it in a limited way: we can only set one + master address. Once we add the ability to have multiple master + addresses, we should check if it matches one of them, and then use it. + + In case of transfer command from the user, if the command specifies + the master address, use that one; otherwise try to use a configured + master address for the zone. + + """ + (zone_name, rrclass) = self._parse_zone_name_and_class(args) + master_addr = self._parse_master_and_port(args, zone_name, rrclass) + zone_info = self._get_zone_info(zone_name, rrclass) + tsig_key = None if zone_info is None else zone_info.get_tsig_key() + zone_str = format_zone_str(zone_name, rrclass) # for logging + answer = addr_validator(master_addr, zone_str, zone_info) + if answer is not None: + return answer + request_ixfr = self.__get_running_request_ixfr(request_ixfr, zone_info) + ret = self.xfrin_start(zone_name, rrclass, master_addr, tsig_key, + request_ixfr, check_soa) + return create_answer(ret[0], ret[1]) + def command_handler(self, command, args): logger.debug(DBG_XFRIN_TRACE, XFRIN_RECEIVED_COMMAND, command) answer = create_answer(0) @@ -1525,69 +1660,24 @@ class Xfrin: if command == 'shutdown': self._shutdown_event.set() elif command == 'notify' or command == REFRESH_FROM_ZONEMGR: - # Xfrin receives the refresh/notify command from zone manager. - # notify command maybe has the parameters which - # specify the notifyfrom address and port, according the RFC1996, zone - # transfer should starts first from the notifyfrom, but now, let 'TODO' it. - # (using the value now, while we can only set one master address, would be - # a security hole. Once we add the ability to have multiple master addresses, - # we should check if it matches one of them, and then use it.) - (zone_name, rrclass) = self._parse_zone_name_and_class(args) - zone_str = format_zone_str(zone_name, rrclass) - zone_info = self._get_zone_info(zone_name, rrclass) - notify_addr = self._parse_master_and_port(args, zone_name, - rrclass) - if zone_info is None: - # TODO what to do? no info known about zone. defaults? - errmsg = "Got notification to retransfer unknown zone " + zone_str - logger.info(XFRIN_RETRANSFER_UNKNOWN_ZONE, zone_str) - answer = create_answer(1, errmsg) - else: - request_type = RRType.AXFR - if zone_info.use_ixfr: - request_type = RRType.IXFR - master_addr = zone_info.get_master_addr_info() - if notify_addr[0] == master_addr[0] and\ - notify_addr[2] == master_addr[2]: - ret = self.xfrin_start(zone_name, - rrclass, - self._get_db_file(), - master_addr, - zone_info.get_tsig_key(), request_type, - True) - answer = create_answer(ret[0], ret[1]) - else: - notify_addr_str = format_addrinfo(notify_addr) - master_addr_str = format_addrinfo(master_addr) - errmsg = "Got notification for " + zone_str\ - + "from unknown address: " + notify_addr_str; - logger.info(XFRIN_NOTIFY_UNKNOWN_MASTER, zone_str, - notify_addr_str, master_addr_str) - answer = create_answer(1, errmsg) - - elif command == 'retransfer' or command == 'refresh': - # Xfrin receives the retransfer/refresh from cmdctl(sent by bindctl). - # If the command has specified master address, do transfer from the - # master address, or else do transfer from the configured masters. - (zone_name, rrclass) = self._parse_zone_name_and_class(args) - master_addr = self._parse_master_and_port(args, zone_name, - rrclass) - zone_info = self._get_zone_info(zone_name, rrclass) - tsig_key = None - request_type = RRType.AXFR - if zone_info: - tsig_key = zone_info.get_tsig_key() - if zone_info.use_ixfr: - request_type = RRType.IXFR - db_file = args.get('db_file') or self._get_db_file() - ret = self.xfrin_start(zone_name, - rrclass, - db_file, - master_addr, - tsig_key, request_type, - (False if command == 'retransfer' else True)) - answer = create_answer(ret[0], ret[1]) - + # refresh/notify command from zone manager. + # The address has to be validated and always perform SOA check. + addr_validator = \ + lambda x, y, z: self.__validate_notify_addr(x, y, z) + answer = self.__handle_xfr_command(args, True, addr_validator, + None) + elif command == 'retransfer': + # retransfer from cmdctl (sent by bindctl). + # No need for address validation, skip SOA check, and always + # use AXFR. + answer = self.__handle_xfr_command( + args, False, lambda x, y, z: None, + ZoneInfo.REQUEST_IXFR_DISABLED) + elif command == 'refresh': + # retransfer from cmdctl (sent by bindctl). similar to + # retransfer, but do SOA check, and honor request_ixfr config. + answer = self.__handle_xfr_command( + args, True, lambda x, y, z: None, None) # return statistics data to the stats daemon elif command == "getstats": # The log level is here set to debug in order to avoid @@ -1608,7 +1698,8 @@ class Xfrin: if zone_name_str is None: raise XfrinException('zone name should be provided') - return (_check_zone_name(zone_name_str), _check_zone_class(args.get('zone_class'))) + return (_check_zone_name(zone_name_str), + _check_zone_class(args.get('zone_class'))) def _parse_master_and_port(self, args, zone_name, zone_class): """ @@ -1650,21 +1741,6 @@ class Xfrin: return (addr.family, socket.SOCK_STREAM, (str(addr), port)) - def _get_db_file(self): - return self._db_file - - def _set_db_file(self): - db_file, is_default =\ - self._module_cc.get_remote_config_value(AUTH_MODULE_NAME, "database_file") - if is_default and "B10_FROM_BUILD" in os.environ: - # override the local database setting if it is default and we - # are running from the source tree - # This should be hidden inside the data source library and/or - # done as a configuration, and this special case should be gone). - db_file = os.environ["B10_FROM_BUILD"] + os.sep +\ - "bind10_zones.sqlite3" - self._db_file = db_file - def publish_xfrin_news(self, zone_name, zone_class, xfr_result): '''Send command to xfrout/zone manager module. If xfrin has finished successfully for one zone, tell the good @@ -1703,7 +1779,8 @@ class Xfrin: except isc.cc.session.SessionTimeout: pass # for now we just ignore the failure except socket.error as err: - logger.error(XFRIN_MSGQ_SEND_ERROR, XFROUT_MODULE_NAME, ZONE_MANAGER_MODULE_NAME) + logger.error(XFRIN_MSGQ_SEND_ERROR, self.tsig_key_name, + XFROUT_MODULE_NAME, ZONE_MANAGER_MODULE_NAME) else: msg = create_command(notify_out.ZONE_XFRIN_FAILED, param) @@ -1717,18 +1794,16 @@ class Xfrin: except isc.cc.session.SessionTimeout: pass # for now we just ignore the failure except socket.error as err: - logger.error(XFRIN_MSGQ_SEND_ERROR_ZONE_MANAGER, ZONE_MANAGER_MODULE_NAME) + logger.error(XFRIN_MSGQ_SEND_ERROR_ZONE_MANAGER, self.tsig_key_name, + ZONE_MANAGER_MODULE_NAME) def startup(self): logger.debug(DBG_PROCESS, XFRIN_STARTED) while not self._shutdown_event.is_set(): self._cc_check_command() - def xfrin_start(self, zone_name, rrclass, db_file, master_addrinfo, - tsig_key, request_type, check_soa=True): - if "pydnspp" not in sys.modules: - return (1, "xfrin failed, can't load dns message python library: 'pydnspp'") - + def xfrin_start(self, zone_name, rrclass, master_addrinfo, tsig_key, + request_ixfr, check_soa=True): # check max_transfer_in, else return quota error if self.recorder.count() >= self._max_transfers_in: return (1, 'xfrin quota error') @@ -1736,19 +1811,83 @@ class Xfrin: if self.recorder.xfrin_in_progress(zone_name): return (1, 'zone xfrin is in progress') - xfrin_thread = threading.Thread(target = process_xfrin, - args = (self, - self.recorder, - zone_name, - rrclass, - db_file, - self._shutdown_event, - master_addrinfo, check_soa, - tsig_key, request_type)) + # Identify the data source to which the zone content is transferred, + # and get the current zone SOA from the data source (if available). + # Note that we do this before spawning the xfrin session thread. + # find() on the client list and use of ZoneFinder (in _get_zone_soa()) + # should be completed within the same single thread. + datasrc_client = None + clist = self._datasrc_clients_mgr.get_client_list(rrclass) + if clist is None: + return (1, 'no data source is configured for class %s' % rrclass) + + try: + datasrc_client = clist.find(zone_name, True, False)[0] + if datasrc_client is None: # can happen, so log it separately. + logger.error(XFRIN_DATASRC_UNKNOWN, + format_zone_str(zone_name, rrclass)) + return (1, 'data source to transfer %s to is unknown' % + format_zone_str(zone_name, rrclass)) + zone_soa = _get_zone_soa(datasrc_client, zone_name, rrclass) + except isc.datasrc.Error as ex: + # rare case error. re-raise as XfrinException so it'll be logged + # in command_handler(). + raise XfrinException('unexpected failure in datasrc module: ' + + str(ex)) + + xfrin_thread = threading.Thread(target=process_xfrin, + args=(self, self.recorder, + zone_name, rrclass, + datasrc_client, zone_soa, + self._shutdown_event, + master_addrinfo, check_soa, + tsig_key, request_ixfr, + self._counters)) xfrin_thread.start() return (0, 'zone xfrin is started') +def _get_zone_soa(datasrc_client, zone_name, zone_class): + """Retrieve the current SOA RR of the zone to be transferred. + + This function is essentially private to the module, but will also + be called (or tweaked) from tests; no one else should use this + function directly. + + The specified zone is expected to exist in the data source referenced + by the given datasrc_client at the point of the call to this function. + If this is not met XfrinException exception will be raised. + + It will be used for various purposes in subsequent xfr protocol + processing. It is validly possible that the zone is currently + empty and therefore doesn't have an SOA, so this method doesn't + consider it an error and returns None in such a case. It may or + may not result in failure in the actual processing depending on + how the SOA is used. + + When the zone has an SOA RR, this method makes sure that it's + valid, i.e., it has exactly one RDATA; if it is not the case + this method returns None. + + """ + # get the zone finder. this must be SUCCESS (not even + # PARTIALMATCH) because we are specifying the zone origin name. + result, finder = datasrc_client.find_zone(zone_name) + if result != DataSourceClient.SUCCESS: + # The data source doesn't know the zone. In the context of this + # function is called, this shouldn't happen. + raise XfrinException("unexpected result: zone %s doesn't exist" % + format_zone_str(zone_name, zone_class)) + result, soa_rrset, _ = finder.find(zone_name, RRType.SOA) + if result != ZoneFinder.SUCCESS: + logger.info(XFRIN_ZONE_NO_SOA, format_zone_str(zone_name, zone_class)) + return None + if soa_rrset.get_rdata_count() != 1: + logger.warn(XFRIN_ZONE_MULTIPLE_SOA, + format_zone_str(zone_name, zone_class), + soa_rrset.get_rdata_count()) + return None + return soa_rrset xfrind = None @@ -1797,4 +1936,4 @@ def main(xfrin_class, use_signal=True): logger.info(XFRIN_EXITING) if __name__ == '__main__': - main(Xfrin) + isc.util.traceback_handler.traceback_handler(lambda: main(Xfrin)) |