Commit e68c127f authored by Naoki Kambe's avatar Naoki Kambe
Browse files

[master] Merge branch 'trac2158'

Conflicts:
	ChangeLog
parents 1db4a396 2c05e5b0
475. [func] naokikambe
Added Xfrout statistics counters: notifyoutv4, notifyoutv6, xfrrej, and
xfrreqdone. These are per-zone type counters. The value of these
counters can be seen with zone name by invoking "Stats show Xfrout" via
bindctl.
(Trac #2158, git TBD)
474. [func] stephen
DHCP servers now use the BIND 10 logging system for messages.
(Trac #1545, git de69a92613b36bd3944cb061e1b7c611c3c85506)
......
......@@ -153,6 +153,54 @@
</refsect1>
<refsect1>
<title>STATISTICS DATA</title>
<para>
The statistics data collected by the <command>b10-xfrout</command>
daemon for <quote>Xfrout</quote> include:
</para>
<variablelist>
<varlistentry>
<term>notifyoutv4</term>
<listitem><simpara>
Number of IPv4 notifies per zone name sent out from Xfrout
</simpara></listitem>
</varlistentry>
<varlistentry>
<term>notifyoutv6</term>
<listitem><simpara>
Number of IPv6 notifies per zone name sent out from Xfrout
</simpara></listitem>
</varlistentry>
<varlistentry>
<term>xfrrej</term>
<listitem><simpara>
Number of XFR requests per zone name rejected by Xfrout
</simpara></listitem>
</varlistentry>
<varlistentry>
<term>xfrreqdone</term>
<listitem><simpara>
Number of requested zone transfers per zone name completed
</simpara></listitem>
</varlistentry>
</variablelist>
<para>
In per-zone counters the special zone name '_SERVER_' exists. It doesn't
mean a specific zone. It represents an entire server and its value means
a total count of all zones.
</para>
</refsect1>
<!--
<refsect1>
<title>OPTIONS</title>
......
......@@ -277,13 +277,23 @@ class TestXfroutSessionBase(unittest.TestCase):
# When not testing ACLs, simply accept
isc.acl.dns.REQUEST_LOADER.load(
[{"action": "ACCEPT"}]),
{})
{},
counter_xfrrej=self._counter_xfrrej,
counter_xfrreqdone=self._counter_xfrreqdone)
self.set_request_type(RRType.AXFR()) # test AXFR by default
self.mdata = self.create_request_data()
self.soa_rrset = create_soa(SOA_CURRENT_VERSION)
# some test replaces a module-wide function. We should ensure the
# original is used elsewhere.
self.orig_get_rrset_len = xfrout.get_rrset_len
self._zone_name_xfrrej = None
self._zone_name_xfrreqdone = None
def _counter_xfrrej(self, zone_name):
self._zone_name_xfrrej = zone_name
def _counter_xfrreqdone(self, zone_name):
self._zone_name_xfrreqdone = zone_name
def tearDown(self):
xfrout.get_rrset_len = self.orig_get_rrset_len
......@@ -458,7 +468,28 @@ class TestXfroutSession(TestXfroutSessionBase):
# ACL checks only with the default ACL
def acl_setter(acl):
self.xfrsess._acl = acl
self.assertIsNone(self._zone_name_xfrrej)
self.check_transfer_acl(acl_setter)
self.assertEqual(self._zone_name_xfrrej, TEST_ZONE_NAME_STR)
def test_transfer_acl_with_nonetype_xfrrej(self):
# ACL checks only with the default ACL and NoneType xfrrej
# counter
def acl_setter(acl):
self.xfrsess._acl = acl
self.xfrsess._counter_xfrrej = None
self.assertIsNone(self._zone_name_xfrrej)
self.check_transfer_acl(acl_setter)
self.assertIsNone(self._zone_name_xfrrej)
def test_transfer_acl_with_notcallable_xfrrej(self):
# ACL checks only with the default ACL and not callable xfrrej
# counter
def acl_setter(acl):
self.xfrsess._acl = acl
self.xfrsess._counter_xfrrej = 'NOT CALLABLE'
self.assertRaises(TypeError,
self.check_transfer_acl, acl_setter)
def test_transfer_zoneacl(self):
# ACL check with a per zone ACL + default ACL. The per zone ACL
......@@ -469,7 +500,9 @@ class TestXfroutSession(TestXfroutSessionBase):
self.xfrsess._zone_config[zone_key]['transfer_acl'] = acl
self.xfrsess._acl = isc.acl.dns.REQUEST_LOADER.load([
{"from": "127.0.0.1", "action": "DROP"}])
self.assertIsNone(self._zone_name_xfrrej)
self.check_transfer_acl(acl_setter)
self.assertEqual(self._zone_name_xfrrej, TEST_ZONE_NAME_STR)
def test_transfer_zoneacl_nomatch(self):
# similar to the previous one, but the per zone doesn't match the
......@@ -481,7 +514,9 @@ class TestXfroutSession(TestXfroutSessionBase):
isc.acl.dns.REQUEST_LOADER.load([
{"from": "127.0.0.1", "action": "DROP"}])
self.xfrsess._acl = acl
self.assertIsNone(self._zone_name_xfrrej)
self.check_transfer_acl(acl_setter)
self.assertEqual(self._zone_name_xfrrej, TEST_ZONE_NAME_STR)
def test_get_transfer_acl(self):
# set the default ACL. If there's no specific zone ACL, this one
......@@ -831,9 +866,39 @@ class TestXfroutSession(TestXfroutSessionBase):
def myreply(msg, sock):
self.sock.send(b"success")
self.assertIsNone(self._zone_name_xfrreqdone)
self.xfrsess._reply_xfrout_query = myreply
self.xfrsess.dns_xfrout_start(self.sock, self.mdata)
self.assertEqual(self.sock.readsent(), b"success")
self.assertEqual(self._zone_name_xfrreqdone, TEST_ZONE_NAME_STR)
def test_dns_xfrout_start_with_nonetype_xfrreqdone(self):
def noerror(msg, name, rrclass):
return Rcode.NOERROR()
self.xfrsess._xfrout_setup = noerror
def myreply(msg, sock):
self.sock.send(b"success")
self.assertIsNone(self._zone_name_xfrreqdone)
self.xfrsess._reply_xfrout_query = myreply
self.xfrsess._counter_xfrreqdone = None
self.xfrsess.dns_xfrout_start(self.sock, self.mdata)
self.assertIsNone(self._zone_name_xfrreqdone)
def test_dns_xfrout_start_with_notcallable_xfrreqdone(self):
def noerror(msg, name, rrclass):
return Rcode.NOERROR()
self.xfrsess._xfrout_setup = noerror
def myreply(msg, sock):
self.sock.send(b"success")
self.xfrsess._reply_xfrout_query = myreply
self.xfrsess._counter_xfrreqdone = 'NOT CALLABLE'
self.assertRaises(TypeError,
self.xfrsess.dns_xfrout_start, self.sock,
self.mdata)
def test_reply_xfrout_query_axfr(self):
self.xfrsess._soa = self.soa_rrset
......@@ -1153,6 +1218,7 @@ class MyUnixSockServer(UnixSockServer):
self._common_init()
self._cc = MyCCSession()
self.update_config_data(self._cc.get_full_config())
self._counters = {}
class TestUnixSockServer(unittest.TestCase):
def setUp(self):
......@@ -1504,6 +1570,80 @@ class MyXfroutServer(XfroutServer):
self._unix_socket_server = None
# Disable the wait for threads
self._wait_for_threads = lambda : None
self._cc.get_module_spec = lambda:\
isc.config.module_spec_from_file(xfrout.SPECFILE_LOCATION)
# setup an XfroutCount object
self._counter = XfroutCounter(
self._cc.get_module_spec().get_statistics_spec())
class TestXfroutCounter(unittest.TestCase):
def setUp(self):
statistics_spec = \
isc.config.module_spec_from_file(\
xfrout.SPECFILE_LOCATION).get_statistics_spec()
self.xfrout_counter = XfroutCounter(statistics_spec)
self._counters = isc.config.spec_name_list(\
isc.config.find_spec_part(\
statistics_spec, XfroutCounter.perzone_prefix)\
['named_set_item_spec']['map_item_spec'])
self._started = threading.Event()
self._number = 3 # number of the threads
self._cycle = 10000 # number of counting per thread
def test_get_default_statistics_data(self):
self.assertEqual(self.xfrout_counter._get_default_statistics_data(),
{XfroutCounter.perzone_prefix: {
XfroutCounter.entire_server: \
dict([(cnt, 0) for cnt in self._counters])
}})
def setup_incrementer(self, incrementer):
self._started.wait()
for i in range(self._cycle): incrementer(TEST_ZONE_NAME_STR)
def start_incrementer(self, incrementer):
threads = []
for i in range(self._number):
threads.append(threading.Thread(\
target=self.setup_incrementer,\
args=(incrementer,)\
))
for th in threads: th.start()
self._started.set()
for th in threads: th.join()
def get_count(self, zone_name, counter_name):
return isc.cc.data.find(\
self.xfrout_counter.get_statistics(),\
'%s/%s/%s' % (XfroutCounter.perzone_prefix,\
zone_name, counter_name))
def test_incrementers(self):
result = { XfroutCounter.entire_server: {},
TEST_ZONE_NAME_STR: {} }
for counter_name in self._counters:
incrementer = getattr(self.xfrout_counter, 'inc_%s' % counter_name)
self.start_incrementer(incrementer)
self.assertEqual(self.get_count(\
TEST_ZONE_NAME_STR, counter_name), \
self._number * self._cycle)
self.assertEqual(self.get_count(\
XfroutCounter.entire_server, counter_name), \
self._number * self._cycle)
result[XfroutCounter.entire_server][counter_name] = \
result[TEST_ZONE_NAME_STR][counter_name] = \
self._number * self._cycle
self.assertEqual(
self.xfrout_counter.get_statistics(),
{XfroutCounter.perzone_prefix: result})
def test_add_perzone_counter(self):
for counter_name in self._counters:
self.assertRaises(isc.cc.data.DataNotFoundError,\
self.get_count, TEST_ZONE_NAME_STR, counter_name)
self.xfrout_counter._add_perzone_counter(TEST_ZONE_NAME_STR)
for counter_name in self._counters:
self.assertEqual(self.get_count(TEST_ZONE_NAME_STR, counter_name), 0)
class TestXfroutServer(unittest.TestCase):
def setUp(self):
......@@ -1514,6 +1654,11 @@ class TestXfroutServer(unittest.TestCase):
self.assertTrue(self.xfrout_server._notifier.shutdown_called)
self.assertTrue(self.xfrout_server._cc.stopped)
def test_getstats(self):
self.assertEqual(
self.xfrout_server.command_handler('getstats', None), \
create_answer(0, {}))
if __name__== "__main__":
isc.log.resetUnitTestRootLogger()
unittest.main()
......@@ -153,7 +153,8 @@ def get_soa_serial(soa_rdata):
class XfroutSession():
def __init__(self, sock_fd, request_data, server, tsig_key_ring, remote,
default_acl, zone_config, client_class=DataSourceClient):
default_acl, zone_config, client_class=DataSourceClient,
counter_xfrrej=None, counter_xfrreqdone=None):
self._sock_fd = sock_fd
self._request_data = request_data
self._server = server
......@@ -168,6 +169,10 @@ class XfroutSession():
self.ClientClass = client_class # parameterize this for testing
self._soa = None # will be set in _xfrout_setup or in tests
self._jnl_reader = None # will be set to a reader for IXFR
# Set counter handlers for counting Xfr requests. An argument
# is required for zone name.
self._counter_xfrrej = counter_xfrrej
self._counter_xfrreqdone = counter_xfrreqdone
self._handle()
def create_tsig_ctx(self, tsig_record, tsig_key_ring):
......@@ -270,6 +275,9 @@ class XfroutSession():
format_zone_str(zone_name, zone_class))
return None, None
elif acl_result == REJECT:
if self._counter_xfrrej is not None:
# count rejected Xfr request by each zone name
self._counter_xfrrej(zone_name.to_text())
logger.debug(DBG_XFROUT_TRACE, XFROUT_QUERY_REJECTED,
self._request_type, format_addrinfo(self._remote),
format_zone_str(zone_name, zone_class))
......@@ -525,6 +533,9 @@ class XfroutSession():
except Exception as err:
logger.error(XFROUT_XFR_TRANSFER_ERROR, self._request_typestr,
format_addrinfo(self._remote), zone_str, err)
if self._counter_xfrreqdone is not None:
# count done Xfr requests by each zone name
self._counter_xfrreqdone(zone_name.to_text())
logger.info(XFROUT_XFR_TRANSFER_DONE, self._request_typestr,
format_addrinfo(self._remote), zone_str)
......@@ -634,7 +645,7 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn,
'''The unix domain socket server which accept xfr query sent from auth server.'''
def __init__(self, sock_file, handle_class, shutdown_event, config_data,
cc):
cc, **counters):
self._remove_unused_sock_file(sock_file)
self._sock_file = sock_file
socketserver_mixin.NoPollMixIn.__init__(self)
......@@ -644,6 +655,8 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn,
self._common_init()
self._cc = cc
self.update_config_data(config_data)
# handlers for statistics use
self._counters = counters
def _common_init(self):
'''Initialization shared with the mock server class used for tests'''
......@@ -798,7 +811,8 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn,
self._lock.release()
self.RequestHandlerClass(sock_fd, request_data, self,
isc.server_common.tsig_keyring.get_keyring(),
self._guess_remote(sock_fd), acl, zone_config)
self._guess_remote(sock_fd), acl, zone_config,
**self._counters)
def _remove_unused_sock_file(self, sock_file):
'''Try to remove the socket file. If the file is being used
......@@ -926,6 +940,107 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn,
self._transfers_counter -= 1
self._lock.release()
class XfroutCounter:
"""A class for handling all statistics counters of Xfrout. In
this class, the structure of per-zone counters is assumed to be
like this:
zones/example.com./notifyoutv4
zones/example.com./notifyoutv6
zones/example.com./xfrrej
zones/example.com./xfrreqdone
"""
# '_SERVER_' is a special zone name representing an entire
# count. It doesn't mean a specific zone, but it means an
# entire count in the server.
entire_server = '_SERVER_'
# zone names are contained under this dirname in the spec file.
perzone_prefix = 'zones'
def __init__(self, statistics_spec):
self._statistics_spec = statistics_spec
# holding statistics data for Xfrout module
self._statistics_data = {}
self._lock = threading.RLock()
self._create_perzone_incrementers()
def get_statistics(self):
"""Calculates an entire server counts, and returns statistics
data format to send out the stats module including each
counter. If there is no counts, then it returns an empty
dictionary. Locks the thread because it is considered to be
invoked by a multi-threading caller."""
# If self._statistics_data contains nothing of zone name, it
# returns an empty dict.
if len(self._statistics_data) == 0: return {}
zones = {}
with self._lock:
zones = self._statistics_data[self.perzone_prefix].copy()
# Start calculation for '_SERVER_' counts
attrs = self._get_default_statistics_data()[self.perzone_prefix][self.entire_server]
statistics_data = {self.perzone_prefix: {}}
for attr in attrs:
sum_ = 0
for name in zones:
if name == self.entire_server: continue
if attr in zones[name]:
if name not in statistics_data[self.perzone_prefix]:
statistics_data[self.perzone_prefix][name] = {}
statistics_data[self.perzone_prefix][name].update(
{attr: zones[name][attr]}
)
sum_ += zones[name][attr]
if sum_ > 0:
if self.entire_server not in statistics_data[self.perzone_prefix]:
statistics_data[self.perzone_prefix][self.entire_server] = {}
statistics_data[self.perzone_prefix][self.entire_server].update({attr: sum_})
return statistics_data
def _get_default_statistics_data(self):
"""Returns default statistics data from the spec file"""
statistics_data = {}
for id_ in isc.config.spec_name_list(self._statistics_spec):
spec = isc.config.find_spec_part(self._statistics_spec, id_)
statistics_data.update({id_: spec['item_default']})
return statistics_data
def _create_perzone_incrementers(self):
"""Creates increment method of each per-zone counter based on
the spec file. Incrementer can be accessed by name
"inc_${item_name}".Incrementers are passed to the
XfroutSession and NotifyOut class as counter handlers."""
# add a new element under the named_set item for the zone
zones_spec = isc.config.find_spec_part(
self._statistics_spec, self.perzone_prefix)
item_list = isc.config.spec_name_list(\
zones_spec['named_set_item_spec']['map_item_spec'])
# can be accessed by the name 'inc_xxx'
for item in item_list:
def __perzone_incrementer(zone_name, counter_name=item, step=1):
"""A per-zone incrementer for counter_name. Locks the thread
because it is considered to be invoked by a multi-threading
caller."""
with self._lock:
self._add_perzone_counter(zone_name)
self._statistics_data[self.perzone_prefix][zone_name][counter_name] += step
setattr(self, 'inc_%s' % item, __perzone_incrementer)
def _add_perzone_counter(self, zone):
"""Adds named_set-type counter for each zone name"""
try:
self._statistics_data[self.perzone_prefix][zone]
except KeyError:
# add a new element under the named_set item for the zone
map_spec = isc.config.find_spec_part(
self._statistics_spec, '%s/%s' % \
(self.perzone_prefix, zone))['map_item_spec']
id_list = isc.config.spec_name_list(map_spec)
for id_ in id_list:
spec = isc.config.find_spec_part(map_spec, id_)
isc.cc.data.set(self._statistics_data,
'%s/%s/%s' % \
(self.perzone_prefix, zone, id_),
spec['item_default'])
class XfroutServer:
def __init__(self):
self._unix_socket_server = None
......@@ -933,6 +1048,8 @@ class XfroutServer:
self._shutdown_event = threading.Event()
self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
self._config_data = self._cc.get_full_config()
self._counter = XfroutCounter(
self._cc.get_module_spec().get_statistics_spec())
self._cc.start()
self._cc.add_remote_config(AUTH_SPECFILE_LOCATION)
isc.server_common.tsig_keyring.init_keyring(self._cc)
......@@ -941,17 +1058,25 @@ class XfroutServer:
def _start_xfr_query_listener(self):
'''Start a new thread to accept xfr query. '''
self._unix_socket_server = UnixSockServer(self._listen_sock_file,
XfroutSession,
self._shutdown_event,
self._config_data,
self._cc)
self._unix_socket_server = UnixSockServer(
self._listen_sock_file,
XfroutSession,
self._shutdown_event,
self._config_data,
self._cc,
counter_xfrrej=self._counter.inc_xfrrej,
counter_xfrreqdone=self._counter.inc_xfrreqdone
)
listener = threading.Thread(target=self._unix_socket_server.serve_forever)
listener.start()
def _start_notifier(self):
datasrc = self._unix_socket_server.get_db_file()
self._notifier = notify_out.NotifyOut(datasrc)
self._notifier = notify_out.NotifyOut(
datasrc,
counter_notifyoutv4=self._counter.inc_notifyoutv4,
counter_notifyoutv6=self._counter.inc_notifyoutv6
)
if 'also_notify' in self._config_data:
for slave in self._config_data['also_notify']:
self._notifier.add_slave(slave['address'], slave['port'])
......@@ -1027,6 +1152,15 @@ class XfroutServer:
else:
answer = create_answer(1, "Bad command parameter:" + str(args))
# return statistics data to the stats daemon
elif cmd == "getstats":
# The log level is here set to debug in order to avoid
# that a log becomes too verbose. Because the b10-stats
# daemon is periodically asking to the b10-xfrout daemon.
logger.debug(DBG_XFROUT_TRACE, \
XFROUT_RECEIVED_GETSTATS_COMMAND)
answer = create_answer(0, self._counter.get_statistics())
else:
answer = create_answer(1, "Unknown command:" + str(cmd))
......
......@@ -114,6 +114,65 @@
"item_default": "IN"
} ]
}
],
"statistics": [
{
"item_name": "zones",
"item_type": "named_set",
"item_optional": false,
"item_default": {
"_SERVER_" : {
"notifyoutv4" : 0,
"notifyoutv6" : 0,
"xfrrej" : 0,
"xfrreqdone" : 0
}
},
"item_title": "Zone names",
"item_description": "Zone names for Xfrout statistics",
"named_set_item_spec": {
"item_name": "zonename",
"item_type": "map",
"item_optional": false,
"item_default": {},
"item_title": "Zone name",
"item_description": "Zone name for Xfrout statistics",
"map_item_spec": [
{
"item_name": "notifyoutv4",
"item_type": "integer",
"item_optional": false,
"item_default": 0,
"item_title": "IPv4 notifies",
"item_description": "Number of IPv4 notifies per zone name sent out from Xfrout"
},
{
"item_name": "notifyoutv6",
"item_type": "integer",
"item_optional": false,
"item_default": 0,
"item_title": "IPv6 notifies",
"item_description": "Number of IPv6 notifies per zone name sent out from Xfrout"
},
{
"item_name": "xfrrej",
"item_type": "integer",
"item_optional": false,
"item_default": 0,
"item_title": "XFR rejected requests",
"item_description": "Number of XFR requests per zone name rejected by Xfrout"
},
{
"item_name": "xfrreqdone",
"item_type": "integer",
"item_optional": false,
"item_default": 0,
"item_title": "Requested zone transfers",
"item_description": "Number of requested zone transfers completed per zone name"
}
]
}
}
]
}
}
......
......@@ -107,6 +107,10 @@ received from the configuration manager.
The xfrout daemon received a command on the command channel that
NOTIFY packets should be sent for the given zone.
% XFROUT_RECEIVED_GETSTATS_COMMAND received command to get statistics data
The xfrout daemon received a command on the command channel that
statistics data should be sent to the stats daemon.
% XFROUT_PARSE_QUERY_ERROR error parsing query: %1
There was a parse error while reading an incoming query. The parse
error is shown in the log message. A remote client sent a packet we
......
......@@ -125,9 +125,10 @@ class ZoneNotifyInfo:
class NotifyOut:
'''This class is used to handle notify logic for all zones(sending
notify message to its slaves). notify service can be started by
calling dispatcher(), and it can be stoped by calling shutdown()
calling dispatcher(), and it can be stopped by calling shutdown()
in another thread. '''
def __init__(self, datasrc_file, verbose=True):
def __init__(self, datasrc_file, counter_handler=None, verbose=True,
counter_notifyoutv4=None, counter_notifyoutv6=None):
self._notify_infos = {} # key is (zone_name, zone_class)
self._waiting_zones = []
self._notifying_zones = []
......@@ -142,6 +143,10 @@ class NotifyOut:
# Use nonblock event to eliminate busy loop
# If there are no notifying zones, clear the event bit and wait.
self._nonblock_event = threading.Event()
# Set counter handlers for counting notifies. An argument is
# required for zone name.
self._counter_notifyoutv4 = counter_notifyoutv4
self._counter_notifyoutv6 = counter_notifyoutv6
def _init_notify_out(self, datasrc_file):
'''Get all the zones name and its notify target's address.
......@@ -478,6 +483,15 @@ class NotifyOut:
try:
sock = zone_notify_info.create_socket(addrinfo[0])
sock.sendto(render.get_data(), 0, addrinfo)
# count notifying by IPv4 or IPv6 for statistics
if zone_notify_info.get_socket().family \
== socket.AF_INET \
and self._counter_notifyoutv4 is not None:
self._counter_notifyoutv4(zone_notify_info.zone_name)
elif zone_notify_info.get_socket().family \
== socket.AF_INET6 \
and self._counter_notifyoutv6 is not None:
self._counter_notifyoutv6(zone_notify_info.zone_name)
logger.info(NOTIFY_OUT_SENDING_NOTIFY, addrinfo[0],
addrinfo[1])
except (socket.error, addr.InvalidAddress) as err:
......
......@@ -61,6 +61,7 @@ class MockZoneNotifyInfo(notify_out.ZoneNotifyInfo):
self.sock_family = self._sock.family
self._sock.close()
self._sock = MockSocket()
self._sock.family = self.sock_family
return self._sock
class TestZoneNotifyInfo(unittest.TestCase):
......@@ -95,7 +96,13 @@ class TestZoneNotifyInfo(unittest.TestCase):
class TestNotifyOut(unittest.TestCase):
def setUp(self):
self._db_file = TESTDATA_SRCDIR + '/test.sqlite3'
self._notify = notify_out.NotifyOut(self._db_file)