Commit ea33f606 authored by Likun Zhang's avatar Likun Zhang
Browse files

commit the code of notify-out. TODO:merge the code of secondary manager(in...

commit the code of notify-out. TODO:merge the code of secondary manager(in branch 215) to this branch, so that it's easy do the test.

git-svn-id: svn://bind10.isc.org/svn/bind10/branches/trac289@2611 e5f2f494-b856-4b98-b285-d166d9295462
parent 8b2485e5
...@@ -423,6 +423,8 @@ AC_CONFIG_FILES([Makefile ...@@ -423,6 +423,8 @@ AC_CONFIG_FILES([Makefile
src/lib/python/isc/config/tests/Makefile src/lib/python/isc/config/tests/Makefile
src/lib/python/isc/log/Makefile src/lib/python/isc/log/Makefile
src/lib/python/isc/log/tests/Makefile src/lib/python/isc/log/tests/Makefile
src/lib/python/isc/notify/Makefile
src/lib/python/isc/notify/tests/Makefile
src/lib/config/Makefile src/lib/config/Makefile
src/lib/config/tests/Makefile src/lib/config/tests/Makefile
src/lib/dns/Makefile src/lib/dns/Makefile
...@@ -469,6 +471,7 @@ AC_OUTPUT([src/bin/cfgmgr/b10-cfgmgr.py ...@@ -469,6 +471,7 @@ AC_OUTPUT([src/bin/cfgmgr/b10-cfgmgr.py
src/lib/python/isc/config/tests/config_test src/lib/python/isc/config/tests/config_test
src/lib/python/isc/cc/tests/cc_test src/lib/python/isc/cc/tests/cc_test
src/lib/python/isc/log/tests/log_test src/lib/python/isc/log/tests/log_test
src/lib/python/isc/notify/tests/notify_out_test
src/lib/dns/gen-rdatacode.py src/lib/dns/gen-rdatacode.py
src/lib/python/bind10_config.py src/lib/python/bind10_config.py
src/lib/dns/tests/testdata/gen-wiredata.py src/lib/dns/tests/testdata/gen-wiredata.py
......
...@@ -28,6 +28,7 @@ import socket ...@@ -28,6 +28,7 @@ import socket
import random import random
from optparse import OptionParser, OptionValueError from optparse import OptionParser, OptionValueError
from isc.config.ccsession import * from isc.config.ccsession import *
from isc.notify import notify_out
try: try:
from libdns_python import * from libdns_python import *
except ImportError as e: except ImportError as e:
...@@ -49,7 +50,7 @@ else: ...@@ -49,7 +50,7 @@ else:
SPECFILE_LOCATION = SPECFILE_PATH + "/xfrin.spec" SPECFILE_LOCATION = SPECFILE_PATH + "/xfrin.spec"
AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + "/auth.spec" AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + "/auth.spec"
XFROUT_MODULE_NAME = 'Xfrout'
__version__ = 'BIND10' __version__ = 'BIND10'
# define xfrin rcode # define xfrin rcode
XFRIN_OK = 0 XFRIN_OK = 0
...@@ -66,7 +67,7 @@ class XfrinException(Exception): ...@@ -66,7 +67,7 @@ class XfrinException(Exception):
class XfrinConnection(asyncore.dispatcher): class XfrinConnection(asyncore.dispatcher):
'''Do xfrin in this class. ''' '''Do xfrin in this class. '''
def __init__(self, def __init__(self, server_,
sock_map, zone_name, rrclass, db_file, shutdown_event, sock_map, zone_name, rrclass, db_file, shutdown_event,
master_addrinfo, verbose = False, idle_timeout = 60): master_addrinfo, verbose = False, idle_timeout = 60):
''' idle_timeout: max idle time for read data from socket. ''' idle_timeout: max idle time for read data from socket.
...@@ -77,6 +78,7 @@ class XfrinConnection(asyncore.dispatcher): ...@@ -77,6 +78,7 @@ class XfrinConnection(asyncore.dispatcher):
asyncore.dispatcher.__init__(self, map=sock_map) asyncore.dispatcher.__init__(self, map=sock_map)
self.create_socket(master_addrinfo[0], master_addrinfo[1]) self.create_socket(master_addrinfo[0], master_addrinfo[1])
self._zone_name = zone_name self._zone_name = zone_name
self._server = server_
self._sock_map = sock_map self._sock_map = sock_map
self._rrclass = rrclass self._rrclass = rrclass
self._db_file = db_file self._db_file = db_file
...@@ -192,6 +194,7 @@ class XfrinConnection(asyncore.dispatcher): ...@@ -192,6 +194,7 @@ class XfrinConnection(asyncore.dispatcher):
self._handle_xfrin_response) self._handle_xfrin_response)
self.log_msg(logstr + 'succeeded') self.log_msg(logstr + 'succeeded')
self._server.send_notify_command(self._zone_name)
ret = XFRIN_OK ret = XFRIN_OK
except XfrinException as e: except XfrinException as e:
...@@ -316,11 +319,11 @@ class XfrinConnection(asyncore.dispatcher): ...@@ -316,11 +319,11 @@ class XfrinConnection(asyncore.dispatcher):
sys.stdout.write('[b10-xfrin] %s\n' % str(msg)) sys.stdout.write('[b10-xfrin] %s\n' % str(msg))
def process_xfrin(xfrin_recorder, zone_name, rrclass, db_file, def process_xfrin(server, xfrin_recorder, zone_name, rrclass, db_file,
shutdown_event, master_addrinfo, check_soa, verbose): shutdown_event, master_addrinfo, check_soa, verbose):
xfrin_recorder.increment(zone_name) xfrin_recorder.increment(zone_name)
sock_map = {} sock_map = {}
conn = XfrinConnection(sock_map, zone_name, rrclass, db_file, conn = XfrinConnection(server, sock_map, zone_name, rrclass, db_file,
shutdown_event, master_addrinfo, verbose) shutdown_event, master_addrinfo, verbose)
if conn.connect_to_master(): if conn.connect_to_master():
conn.do_xfrin(check_soa) conn.do_xfrin(check_soa)
...@@ -370,17 +373,20 @@ This method is used only as part of initialization, but is implemented ...@@ -370,17 +373,20 @@ This method is used only as part of initialization, but is implemented
separately for convenience of unit tests; by letting the test code override separately for convenience of unit tests; by letting the test code override
this method we can test most of this class without requiring a command channel. this method we can test most of this class without requiring a command channel.
''' '''
self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, # Create one session for sending command to other modules, because the
# listening session will block the send operation.
self._send_cc_session = isc.cc.Session()
self._module_cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
self.config_handler, self.config_handler,
self.command_handler) self.command_handler)
self._cc.start() self._module_cc.start()
def _cc_check_command(self): def _cc_check_command(self):
''' '''
This is a straightforward wrapper for cc.check_command, but provided as This is a straightforward wrapper for cc.check_command, but provided as
a separate method for the convenience of unit tests. a separate method for the convenience of unit tests.
''' '''
self._cc.check_command() self._module_cc.check_command()
def config_handler(self, new_config): def config_handler(self, new_config):
# TODO, process new config data # TODO, process new config data
...@@ -420,6 +426,12 @@ a separate method for the convenience of unit tests. ...@@ -420,6 +426,12 @@ a separate method for the convenience of unit tests.
return answer return answer
def send_notify_command(self, zone_name):
'''Send Notify command to xfrout module.'''
param = {'zone_name': zone_name}
msg = create_command(notify_out.ZONE_NOTIFY_CMD, param)
self._send_cc_session.group_sendmsg(msg, XFROUT_MODULE_NAME)
def _parse_cmd_params(self, args): def _parse_cmd_params(self, args):
zone_name = args.get('zone_name') zone_name = args.get('zone_name')
if not zone_name: if not zone_name:
...@@ -441,14 +453,14 @@ a separate method for the convenience of unit tests. ...@@ -441,14 +453,14 @@ a separate method for the convenience of unit tests.
# should add it on start, and not remove it here # should add it on start, and not remove it here
# (or, if we have writable ds, we might not need this in # (or, if we have writable ds, we might not need this in
# the first place) # the first place)
self._cc.add_remote_config(AUTH_SPECFILE_LOCATION) self._module_cc.add_remote_config(AUTH_SPECFILE_LOCATION)
db_file, is_default = self._cc.get_remote_config_value("Auth", "database_file") db_file, is_default = self._module_cc.get_remote_config_value("Auth", "database_file")
if is_default and "B10_FROM_BUILD" in os.environ: if is_default and "B10_FROM_BUILD" in os.environ:
# this too should be unnecessary, but currently the # this too should be unnecessary, but currently the
# 'from build' override isn't stored in the config # 'from build' override isn't stored in the config
# (and we don't have writable datasources yet) # (and we don't have writable datasources yet)
db_file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3" db_file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
self._cc.remove_remote_config(AUTH_SPECFILE_LOCATION) self._module_cc.remove_remote_config(AUTH_SPECFILE_LOCATION)
return (zone_name, master_addrinfo, db_file) return (zone_name, master_addrinfo, db_file)
...@@ -469,7 +481,8 @@ a separate method for the convenience of unit tests. ...@@ -469,7 +481,8 @@ a separate method for the convenience of unit tests.
return (1, 'zone xfrin is in progress') return (1, 'zone xfrin is in progress')
xfrin_thread = threading.Thread(target = process_xfrin, xfrin_thread = threading.Thread(target = process_xfrin,
args = (self.recorder, args = (self,
self.recorder,
zone_name, rrclass, zone_name, rrclass,
db_file, db_file,
self._shutdown_event, self._shutdown_event,
......
...@@ -28,6 +28,7 @@ import os ...@@ -28,6 +28,7 @@ import os
from isc.config.ccsession import * from isc.config.ccsession import *
from isc.log.log import * from isc.log.log import *
from isc.cc import SessionError from isc.cc import SessionError
from isc.notify import notify_out
import socket import socket
import select import select
import errno import errno
...@@ -303,7 +304,7 @@ class UnixSockServer(ThreadingUnixStreamServer): ...@@ -303,7 +304,7 @@ class UnixSockServer(ThreadingUnixStreamServer):
self._log = log self._log = log
self.update_config_data(config_data) self.update_config_data(config_data)
self._cc = cc self._cc = cc
def finish_request(self, request, client_address): def finish_request(self, request, client_address):
'''Finish one request by instantiating RequestHandlerClass.''' '''Finish one request by instantiating RequestHandlerClass.'''
self.RequestHandlerClass(request, client_address, self, self._log) self.RequestHandlerClass(request, client_address, self, self._log)
...@@ -415,16 +416,25 @@ class XfroutServer: ...@@ -415,16 +416,25 @@ class XfroutServer:
self._config_data.get('log_severity'), self._config_data.get('log_versions'), self._config_data.get('log_severity'), self._config_data.get('log_versions'),
self._config_data.get('log_max_bytes'), True) self._config_data.get('log_max_bytes'), True)
self._start_xfr_query_listener() self._start_xfr_query_listener()
self._start_notifier()
def _start_xfr_query_listener(self): def _start_xfr_query_listener(self):
'''Start a new thread to accept xfr query. ''' '''Start a new thread to accept xfr query. '''
self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession, self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
self._shutdown_event, self._config_data, self._shutdown_event, self._config_data,
self._cc, self._log); self._cc, self._log);
listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,)) listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
listener.start() listener.start()
def _start_notifier(self):
datasrc = self._unix_socket_server.get_db_file()
self._notifier = notify_out.NotifyOut(datasrc, self._log)
td = threading.Thread(target = notify_out.dispatcher, args = (self._notifier,))
td.daemon = True
td.start()
def send_notify(self, zone_name):
self._notifier.send_notify(zone_name)
def config_handler(self, new_config): def config_handler(self, new_config):
'''Update config data. TODO. Do error check''' '''Update config data. TODO. Do error check'''
...@@ -466,11 +476,20 @@ class XfroutServer: ...@@ -466,11 +476,20 @@ class XfroutServer:
self._log.log_message("info", "Received shutdown command.") self._log.log_message("info", "Received shutdown command.")
self.shutdown() self.shutdown()
answer = create_answer(0) answer = create_answer(0)
elif cmd == notify_out.ZONE_NOTIFY_CMD:
zone_name = args.get('zone_name')
if zone_name:
self._log.log_message("info", "Receive notify command for zone " + zone_name)
self.send_notify(zone_name)
answer = create_answer(0)
else:
answer = create_answer(1, "Bad command parameter:" + str(args))
else: else:
answer = create_answer(1, "Unknown command:" + str(cmd)) answer = create_answer(1, "Unknown command:" + str(cmd))
return answer return answer
def run(self): def run(self):
'''Get and process all commands sent from cfgmgr or other modules. ''' '''Get and process all commands sent from cfgmgr or other modules. '''
......
SUBDIRS = datasrc cc config log # Util SUBDIRS = datasrc cc config log notify # Util
python_PYTHON = __init__.py python_PYTHON = __init__.py
......
...@@ -120,6 +120,39 @@ def get_zone_soa(zonename, dbfile): ...@@ -120,6 +120,39 @@ def get_zone_soa(zonename, dbfile):
return datas return datas
#########################################################################
# get_zone_rrset
# returns the rrset of the zone with the given zone name, rrset name
# and given rd type.
# If the zone doesn't exist or rd type doesn't exist, return an empty list.
#########################################################################
def get_zone_rrset(zonename, rr_name, rdtype, dbfile):
conn, cur = open(dbfile)
id = get_zoneid(zonename, cur)
cur.execute("SELECT * FROM records WHERE name = ? and zone_id = ? and rdtype = ?",
[rr_name, id, rdtype])
datas = cur.fetchall()
cur.close()
conn.close()
return datas
#########################################################################
# get_zones_info:
# returns all the zones' information.
#########################################################################
def get_zones_info(db_file):
conn, cur = open(db_file)
cur.execute("SELECT name, rdclass FROM zones")
info = cur.fetchone()
while info:
yield info
info = cur.fetchone()
cur.close()
conn.close()
######################################################################### #########################################################################
# get_zoneid: # get_zoneid:
# returns the zone_id for a given zone name, or an empty # returns the zone_id for a given zone name, or an empty
......
SUBDIRS = tests
python_PYTHON = __init__.py notify_out.py
pythondir = $(pyexecdir)/isc/notify
from isc.notify.notify_out import *
import select
import random
import socket
import threading
import time
from isc.datasrc import sqlite3_ds
import isc
try:
from libdns_python import *
except ImportError as e:
# C++ loadable module may not be installed;
sys.stderr.write('[b10-xfrout] failed to import DNS or XFR module: %s\n' % str(e))
ZONE_NOTIFY_CMD = 'zone_new_data_ready'
_MAX_NOTIFY_NUM = 30
_MAX_NOTIFY_TRY_NUM = 5
_EVENT_NONE = 0
_EVENT_READ = 1
_EVENT_TIMEOUT = 2
_NOTIFY_TIMEOUT = 2
def addr_to_str(addr):
return '%s#%s' % (addr[0], addr[1])
def dispatcher(notifier):
while True:
replied_zones, not_replied_zones = notifier._wait_for_notify_reply()
if len(replied_zones) == 0 and len(not_replied_zones) == 0:
time.sleep(0.5) # A better time?
continue
for name_ in replied_zones:
notifier._zone_notify_handler(replied_zones[name_], _EVENT_READ)
for name_ in not_replied_zones:
if not_replied_zones[name_].notify_timeout < time.time():
notifier._zone_notify_handler(not_replied_zones[name_], _EVENT_TIMEOUT)
class ZoneNotifyInfo:
'''This class keeps track of notify-out information for one zone.
timeout_: absolute time for next notify reply.
'''
def __init__(self, zone_name_, klass):
self._notify_slaves = []
self._notify_current = None
self._slave_index = 0
self._sock = None
self.zone_name = zone_name_
self.zone_class = klass
self.notify_msg_id = 0
self.notify_timeout = 0
# Notify times sending to one target.
self.notify_try_num = 0
def set_next_notify_target(self):
if self._slave_index < (len(self._notify_slaves) - 1):
self._slave_index += 1
self._notify_current = self._notify_slaves[self._slave_index]
else:
self._notify_current = None
def prepare_notify_out(self):
'''Create the socket and set notify timeout time to now'''
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #TODO support IPv6?
self.notify_timeout = time.time()
self.notify_try_num = 0
self._slave_index = 0
if len(self._notify_slaves) > 0:
self._notify_current = self._notify_slaves[0]
def finish_notify_out(self):
if self._sock:
self._sock.close()
self._sock = None
def get_socket(self):
return self._sock
def get_current_notify_target(self):
return self._notify_current
class NotifyOut:
def __init__(self, datasrc_file, log=None, verbose=True):
self._notify_infos = {}
self._waiting_zones = []
self._notifying_zones = []
self._log = log
self.notify_num = 0 # the count of in progress notifies
self._verbose = verbose
self._lock = threading.Lock()
self._db_file = datasrc_file
self._init_notify_out(datasrc_file)
def _init_notify_out(self, datasrc_file):
'''Get all the zones name and its notify target's address
TODO, currently the zones are got by going through the zone
table in database. There should be a better way to get them
and also the setting 'also_notify', and there should be one
mechanism to cover the changed datasrc.'''
self._db_file = datasrc_file
for zone_name, zone_class in sqlite3_ds.get_zones_info(datasrc_file):
self._notify_infos[zone_name] = ZoneNotifyInfo(zone_name, zone_class)
slaves = self._get_notify_slaves_from_ns(zone_name)
for item in slaves:
self._notify_infos[zone_name]._notify_slaves.append((item, 53))
def _get_rdata_data(self, rr):
return rr[7].strip()
def _get_notify_slaves_from_ns(self, zone_name):
'''The simplest way to get the address of slaves, but now correct.
TODO. the function should be provided by one library.'''
ns_rrset = sqlite3_ds.get_zone_rrset(zone_name, zone_name, 'NS', self._db_file)
soa_rrset = sqlite3_ds.get_zone_rrset(zone_name, zone_name, 'SOA', self._db_file)
ns_rr_name = []
for ns in ns_rrset:
ns_rr_name.append(self._get_rdata_data(ns))
sname = (soa_rrset[0][7].split(' '))[0].strip() #TODO, bad hardcode to get rdata part
if sname in ns_rr_name:
ns_rr_name.remove(sname)
addr_list = []
for rr_name in ns_rr_name:
a_rrset = sqlite3_ds.get_zone_rrset(zone_name, rr_name, 'A', self._db_file)
aaaa_rrset = sqlite3_ds.get_zone_rrset(zone_name, rr_name, 'AAAA', self._db_file)
for rr in a_rrset:
addr_list.append(self._get_rdata_data(rr))
for rr in aaaa_rrset:
addr_list.append(self._get_rdata_data(rr))
return addr_list
def send_notify(self, zone_name):
print('=============begin to send notify', zone_name, '===', self._notify_infos)
print(self._notify_infos)
if zone_name not in self._notify_infos:
print('=============not eixst')
return
print('=============begin to send notify')
with self._lock:
if (self.notify_num >= _MAX_NOTIFY_NUM) or (zone_name in self._notifying_zones):
if zone_name not in self._waiting_zones:
self._waiting_zones.append(zone_name)
else:
self._notify_infos[zone_name].prepare_notify_out()
self.notify_num += 1
self._notifying_zones.append(zone_name)
def _wait_for_notify_reply(self):
'''receive notify replies in specified time. returned value
is one tuple:(replied_zones, not_replied_zones)
replied_zones: the zones which receive notify reply.
not_replied_zones: the zones which haven't got notify reply.
'''
valid_socks = []
notifying_zones = {}
min_timeout = time.time()
for info in self._notify_infos:
sock = self._notify_infos[info].get_socket()
if sock:
valid_socks.append(sock)
notifying_zones[info] = self._notify_infos[info]
tmp_timeout = self._notify_infos[info].notify_timeout
if min_timeout > tmp_timeout:
min_timeout = tmp_timeout
block_timeout = min_timeout - time.time()
if block_timeout < 0:
block_timeout = 0
try:
r_fds, w, e = select.select(valid_socks, [], [], block_timeout)
except select.error as err:
if err.args[0] != EINTR:
return [], []
not_replied_zones = {}
replied_zones = {}
for info in notifying_zones:
if notifying_zones[info].get_socket() in r_fds:
replied_zones[info] = notifying_zones[info]
else:
not_replied_zones[info] = notifying_zones[info]
return replied_zones, not_replied_zones
def _zone_notify_handler(self, zone_notify_info, event_type):
tgt = zone_notify_info.get_current_notify_target()
if event_type == _EVENT_READ:
reply = self._get_notify_reply(zone_notify_info.get_socket(), tgt)
if reply:
if self._handle_notify_reply(zone_notify_info, reply):
self._notify_next_target(zone_notify_info)
elif event_type == _EVENT_TIMEOUT and zone_notify_info.notify_try_num > 0:
self._log_msg('info', 'notify retry to %s' % addr_to_str(tgt))
tgt = zone_notify_info.get_current_notify_target()
if tgt:
zone_notify_info.notify_try_num += 1
if zone_notify_info.notify_try_num > _MAX_NOTIFY_TRY_NUM:
self._log_msg('info', 'notify to %s: retried exceeded' % addr_to_str(tgt))
self._notify_next_target(zone_notify_info)
else:
retry_timeout = _NOTIFY_TIMEOUT * pow(2, zone_notify_info.notify_try_num)
# set exponential backoff according rfc1996 section 3.6
zone_notify_info.notify_timeout = time.time() + retry_timeout
self._send_notify_message_udp(zone_notify_info, tgt)
def _notify_next_target(self, zone_notify_info):
'''Notify next address for the same zone. If all the targets
has been notified, notify the first zone in waiting list. '''
zone_notify_info.notify_try_num = 0
zone_notify_info.set_next_notify_target()
tgt = zone_notify_info.get_current_notify_target()
if not tgt:
zone_notify_info.finish_notify_out()
with self._lock:
self.notify_num -= 1
self._notifying_zones.remove(zone_notify_info.zone_name)
# trigger notify out for waiting zones
if len(self._waiting_zones) > 0:
zone_name = self._waiting_zones.pop(0)
self._notify_infos[zone_name].prepare_notify_out()
self.notify_num += 1
def _send_notify_message_udp(self, zone_notify_info, addrinfo):
msg, qid = self._create_notify_message(zone_notify_info.zone_name,
zone_notify_info.zone_class)
render = MessageRenderer()
render.set_length_limit(512)
msg.to_wire(render)
zone_notify_info.notify_msg_id = qid
sock = zone_notify_info.get_socket()
try:
sock.sendto(render.get_data(), 0, addrinfo)
self._log_msg('info', 'sending notify to %s' % addr_to_str(addrinfo))
except socket.error as err:
self._log_msg('error', 'send notify to %s failed: %s' % (addr_to_str(addrinfo), str(err)))
return False
return True
def _create_rrset_from_db_record(self, record):
'''Create one rrset from one record of datasource, if the schema of record is changed,
This function should be updated first. TODO, the function is copied from xfrout, there
should be library for creating one rrset. '''
rrtype_ = RRType(record[5])
rdata_ = Rdata(rrtype_, RRClass("IN"), " ".join(record[7:]))
rrset_ = RRset(Name(record[2]), RRClass("IN"), rrtype_, RRTTL( int(record[4])))
rrset_.add_rdata(rdata_)
return rrset_
def _create_notify_message(self, zone_name, zone_class):
msg = Message(Message.RENDER)
qid = random.randint(0, 0xFFFF)
msg.set_qid(qid)
msg.set_opcode(Opcode.NOTIFY())
msg.set_rcode(Rcode.NOERROR())
msg.set_header_flag(MessageFlag.AA())
question = Question(Name(zone_name), RRClass(zone_class), RRType('SOA'))