Commit 4c67dd80 authored by Naoki Kambe's avatar Naoki Kambe
Browse files

[2136] do group_sendmsg too all modules then do group_recvmsg in

do_polling for efficiency

Also added setting timeout value to one second for precise and
refactoring timeout handling
parent da81d3d6
......@@ -23,6 +23,8 @@ import sys; sys.path.append ('@@PYTHONPATH@@')
import os
from time import time, strftime, gmtime
from optparse import OptionParser, OptionValueError
import errno
import select
import isc
import isc.util.process
......@@ -151,90 +153,117 @@ class Stats:
(itm['item_name'], self.mccs.get_value(itm['item_name'])[0])
for itm in self.mccs.get_module_spec().get_config_spec()
])
# set a absolute timestamp polling at next time
self.next_polltime = get_timestamp() + self.get_interval()
# initialized Statistics data
if self.update_statistics_data(
self.module_name,
self.cc_session.lname,
{'lname': self.cc_session.lname,
'boot_time': get_datetime(_BASETIME),
'last_update_time': get_datetime()}):
logger.warn(STATS_RECEIVED_INVALID_STATISTICS_DATA,
self.module_name)
# try to do polling firstly
self.do_polling()
def get_interval(self):
"""return the current value of 'poll-interval'"""
return self.config['poll-interval']
def do_polling(self):
"""Polls modules for statistics data. Return nothing. First
search multiple instances of same module. Second requests
each module to invoke 'getstats' with a 'trees' argument.
Finally updates internal statistics data every time it gets
from each instance."""
# count the number of instances of same module by examing
# 'components' of Boss via ConfigManager
seq = self.cc_session.group_sendmsg(
isc.config.ccsession.create_command(
isc.config.ccsession.COMMAND_GET_CONFIG,
{"module_name": "Boss"}), 'ConfigManager')
(answer, env) = self.cc_session.group_recvmsg(False, seq)
modules = []
if answer:
(rcode, value) = isc.config.ccsession.parse_answer(answer)
if rcode == 0 and 'components' in value:
modules = [ c['special'].capitalize() \
for c in value['components'].values() \
if 'special' in c ]
# start requesting each module to collect statistics data
sequences = []
for (module_name, data) in self.get_statistics_data().items():
# skip if module_name is 'Stats'
if module_name == self.module_name: continue
logger.debug(DBG_STATS_MESSAGING, STATS_SEND_STATISTICS_REQUEST,
module_name)
cmd = isc.config.ccsession.create_command(
"getstats", {'trees': [k for k in data.keys()]})
seq = self.cc_session.group_sendmsg(cmd, module_name)
sequences.append((module_name, seq))
cnt = modules.count(module_name)
if cnt > 1:
sequences = sequences + [ (module_name, seq) \
for i in range(cnt-1) ]
# start receiving statistics data
while len(sequences) > 0:
try:
(module_name, seq) = sequences.pop(0)
answer, env = self.cc_session.group_recvmsg(
False, seq)
if answer:
rcode, args = isc.config.ccsession.parse_answer(
answer)
if rcode == 0:
if self.update_statistics_data(
module_name, env['from'], args):
logger.warn(
STATS_RECEIVED_INVALID_STATISTICS_DATA,
module_name)
else:
if self.update_statistics_data(
self.module_name,
self.cc_session.lname,
{'last_update_time': get_datetime()}):
logger.warn(
STATS_RECEIVED_INVALID_STATISTICS_DATA,
self.module_name)
# skip this module if SessionTimeout raised
except isc.cc.session.SessionTimeout:
pass
def start(self):
"""
Start stats module
"""
self.running = True
logger.info(STATS_STARTING)
# initialized Statistics data
if self.update_statistics_data(
self.module_name,
lname=self.cc_session.lname,
boot_time=get_datetime(_BASETIME),
last_update_time=get_datetime()):
logger.warn(STATS_RECEIVED_INVALID_STATISTICS_DATA,
self.module_name)
def _poll_modules():
"""poll modules for statistics data"""
# exam number of multi-type module by getting
# components of boss config
num_of_modules = {}
seq = self.cc_session.group_sendmsg(
isc.config.ccsession.create_command(
isc.config.ccsession.COMMAND_GET_CONFIG,
{"module_name": "Boss"}), 'ConfigManager')
(answer, env) = self.cc_session.group_recvmsg(False, seq)
if answer:
(rcode, value) = isc.config.ccsession.parse_answer(answer)
if rcode == 0 and 'components' in value:
for c in value['components'].values():
if 'special' in c:
mname = c['special'].capitalize()
if mname in num_of_modules:
num_of_modules[mname] += 1
else:
num_of_modules[mname] = 1
# start requesting each module to collect statistics data
for (module_name, data) in self.get_statistics_data().items():
# skip if module_name is 'Stats'
if module_name == self.module_name: continue
logger.debug(DBG_STATS_MESSAGING, STATS_SEND_STATISTICS_REQUEST,
module_name)
cmd = isc.config.ccsession.create_command(
"getstats", {'trees': [k for k in data.keys()]})
seq = self.cc_session.group_sendmsg(cmd, module_name)
try:
n = 1
if module_name in num_of_modules \
and num_of_modules[module_name] > 1:
n = num_of_modules[module_name]
for i in range(n):
answer, env = self.cc_session.group_recvmsg(False, seq)
if answer:
rcode, args = isc.config.ccsession.parse_answer(answer)
if rcode == 0:
if self.update_statistics_data(
module_name, env['from'], **args):
logger.warn(STATS_RECEIVED_INVALID_STATISTICS_DATA,
module_name)
else:
if self.update_statistics_data(
self.module_name,
last_update_time=get_datetime()):
logger.warn(STATS_RECEIVED_INVALID_STATISTICS_DATA,
self.module_name)
# skip this module if SessionTimeout raised
except isc.cc.session.SessionTimeout:
pass
def _check_command(nonblock=False):
"""check invoked command by waiting for 'poll-interval'
seconds"""
# backup original timeout
orig_timeout = self.cc_session.get_timeout()
# set config['poll-interval'] * 1000 (milliseconds) to
# timeout of cc-sesson
self.cc_session.set_timeout(self.get_interval()*1000)
try:
answer, env = self.cc_session.group_recvmsg(nonblock)
self.mccs.check_command_without_recvmsg(answer, env)
except isc.cc.session.SessionTimeout:
pass # waited for poll-interval seconds
# restore timeout
self.cc_session.set_timeout(orig_timeout)
try:
start_poll = get_timestamp() - self.config['poll-interval']
self.running = True
while self.running:
# don't do polling if 'poll-interval' is 0
if self.config['poll-interval'] > 0 and \
get_timestamp() - start_poll >= self.config['poll-interval']:
_poll_modules()
start_poll = get_timestamp()
try:
answer, env = self.cc_session.group_recvmsg(False)
self.mccs.check_command_without_recvmsg(answer, env)
except isc.cc.session.SessionTimeout:
pass
_check_command()
if self.get_interval() > 0 and get_timestamp() >= self.next_polltime:
# update the next polling timestamp
self.next_polltime = get_timestamp() + self.get_interval()
self.do_polling()
finally:
self.mccs.send_stopping()
......@@ -256,6 +285,9 @@ class Stats:
1, "Negative integer ignored")
self.config.update(new_config)
if 'poll-interval' in self.config:
# update next polling timestamp
self.next_polltime = get_timestamp() + self.get_interval()
return isc.config.create_answer(0)
def command_handler(self, command, kwargs):
......@@ -324,7 +356,7 @@ class Stats:
+ "owner: " + str(owner) + ", "
+ "name: " + str(name))
def update_statistics_data(self, owner=None, mid=None, **data):
def update_statistics_data(self, owner=None, mid=None, data=None):
"""
change statistics date of specified module into specified
data. It updates information of each module first, and it
......@@ -455,8 +487,9 @@ class Stats:
STATS_RECEIVED_SHOW_ALL_COMMAND)
errors = self.update_statistics_data(
self.module_name,
timestamp=get_timestamp(),
report_time=get_datetime()
self.cc_session.lname,
{'timestamp': get_timestamp(),
'report_time': get_datetime()}
)
if errors:
raise StatsError("stats spec file is incorrect: "
......
......@@ -217,11 +217,6 @@ class TestStats(unittest.TestCase):
# Also temporarily disabled for #1668, see above
#self.assertTrue(self.stats.mccs.stopped)
# start with err
self.stats = stats.Stats()
self.stats.update_statistics_data = lambda x,**y: ['an error']
self.assertRaises(stats.StatsError, self.stats.start)
def test_handlers(self):
self.stats_server = ThreadingServerManager(MyStats)
self.stats = self.stats_server.server
......@@ -744,39 +739,39 @@ class TestStats(unittest.TestCase):
isc.config.create_answer(
1, "module name is not specified"))
def test_statistics_data(self):
def test_polling(self):
stats_server = ThreadingServerManager(MyStats)
stats = stats_server.server
stat = stats_server.server
stats_server.run()
self.assertEqual(
send_command('status', 'Stats'),
(0, "Stats is up. (PID " + str(os.getpid()) + ")"))
# check statistics data of 'Boss'
boss = self.base.boss.server
self.assertEqual(
stats.statistics_data_bymid['Boss']\
[self.base.boss.server.cc_session.lname],
stat.statistics_data_bymid['Boss'][boss.cc_session.lname],
{'boot_time': self.const_datetime})
self.assertEqual(
len(stats.statistics_data_bymid['Boss']), 1)
len(stat.statistics_data_bymid['Boss']), 1)
self.assertEqual(
stats.statistics_data['Boss'],
stat.statistics_data['Boss'],
{'boot_time': self.const_datetime})
# check statistics data of each 'Auth' instances
list_auth = ['', '2', '3', '4']
for i in list_auth:
auth = getattr(self.base,"auth"+i).server
self.assertEqual(
stats.statistics_data_bymid['Auth']\
stat.statistics_data_bymid['Auth']\
[auth.cc_session.lname],
{'queries.perzone': auth.queries_per_zone,
'queries.tcp': auth.queries_tcp,
'queries.udp': auth.queries_udp})
n = len(stats.statistics_data_bymid['Auth'])
n = len(stat.statistics_data_bymid['Auth'])
self.assertEqual(n, len(list_auth))
# check consolidation of statistics data of the auth
# instances
self.assertEqual(
stats.statistics_data['Auth'],
stat.statistics_data['Auth'],
{'queries.perzone': [
{'zonename':
auth.queries_per_zone[0]['zonename'],
......@@ -784,8 +779,38 @@ class TestStats(unittest.TestCase):
auth.queries_per_zone[0]['queries.tcp']*n,
'queries.udp':
auth.queries_per_zone[0]['queries.udp']*n}],
'queries.tcp': auth.queries_tcp*n,
'queries.udp': auth.queries_udp*n})
'queries.tcp': auth.queries_tcp*n,
'queries.udp': auth.queries_udp*n})
# check statistics data of 'Stats'
self.assertEqual(
len(stat.statistics_data['Stats']), 5)
self.assertTrue('boot_time' in
stat.statistics_data['Stats'])
self.assertTrue('last_update_time' in
stat.statistics_data['Stats'])
self.assertTrue('report_time' in
stat.statistics_data['Stats'])
self.assertTrue('timestamp' in
stat.statistics_data['Stats'])
self.assertEqual(
stat.statistics_data['Stats']['lname'],
stat.cc_session.lname)
stats_server.shutdown()
def test_polling2(self):
stats_server = ThreadingServerManager(MyStats)
stat = stats_server.server
boss = self.base.boss.server
# set invalid statistics
boss.statistics_data = {'boot_time':1}
stats_server.run()
self.assertEqual(
send_command('status', 'Stats'),
(0, "Stats is up. (PID " + str(os.getpid()) + ")"))
# check default statistics data of 'Boss'
self.assertEqual(
stat.statistics_data['Boss'],
{'boot_time': self.const_default_datetime})
stats_server.shutdown()
class TestOSEnv(unittest.TestCase):
......
......@@ -249,6 +249,9 @@ class MockBoss:
[ 9998, "b10-auth-2" ],
[ 9997, "b10-auth-3" ],
[ 9996, "b10-auth-4" ]]
self.statistics_data = {
'boot_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', self._BASETIME)
}
def run(self):
self.mccs.start()
......@@ -269,9 +272,7 @@ class MockBoss:
def command_handler(self, command, *args, **kwargs):
self._started.set()
self.got_command_name = command
sdata = {
'boot_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', self._BASETIME)
}
sdata = self.statistics_data
params = { "owner": "Boss",
"data": sdata
}
......@@ -426,7 +427,7 @@ class MyStats(stats.Stats):
try:
self.start()
except Exception:
raise
pass
def shutdown(self):
self.command_shutdown()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment