Commit 4e8a4299 authored by Jelte Jansen's avatar Jelte Jansen
Browse files

[640] send around 'stopping' messages, and handle them

When closing a ModuleCCSession, a 'i am stopping' message is now sent to the ConfigManager, which in turn informs Cmdctl (which then causes bindctl to update its list).

This causes stopped modules to disappear from bindctl until such time that they are enabled or started again, so that config updates and most importantly, commands, do not cause weird timeouts (bindctl will immediately inform you of modules not running)
parent 789a5935
......@@ -674,6 +674,8 @@ class BoB:
if not self.__started:
raise Exception("Component failed during startup");
else:
if self.ccs is not None:
self.ccs.stop()
self.runnable = False
def shutdown(self):
......
......@@ -311,11 +311,15 @@ class CommandControl():
answer = ccsession.create_answer(0)
if command == ccsession.COMMAND_MODULE_SPECIFICATION_UPDATE:
with self._lock:
self.modules_spec[args[0]] = args[1]
if args[1] is None and args[0] in self.modules_spec:
del self.modules_spec[args[0]]
else:
self.modules_spec[args[0]] = args[1]
elif command == ccsession.COMMAND_SHUTDOWN:
#When cmdctl get 'shutdown' command from boss,
#shutdown the outer httpserver.
self._module_cc.stop()
self._httpserver.shutdown()
self._serving = False
......
......@@ -345,6 +345,29 @@ class TestCommandControl(unittest.TestCase):
self.assertEqual(rcode, 0)
self.assertTrue(msg != None)
def test_command_handler_spec_update(self):
# Should not be present
self.assertFalse("foo" in self.cmdctl.modules_spec)
answer = self.cmdctl.command_handler(
ccsession.COMMAND_MODULE_SPECIFICATION_UPDATE, [ "foo", {} ])
rcode, msg = ccsession.parse_answer(answer)
self.assertEqual(rcode, 0)
self.assertEqual(msg, None)
# Should now be present
self.assertTrue("foo" in self.cmdctl.modules_spec)
# When sending specification 'None', it should be removed
answer = self.cmdctl.command_handler(
ccsession.COMMAND_MODULE_SPECIFICATION_UPDATE, [ "foo", None ])
rcode, msg = ccsession.parse_answer(answer)
self.assertEqual(rcode, 0)
self.assertEqual(msg, None)
# Should no longer be present
self.assertFalse("foo" in self.cmdctl.modules_spec)
def test_check_config_handler(self):
answer = self.cmdctl.config_handler({'non-exist': 123})
self._check_answer(answer, 1, 'unknown config item: non-exist')
......
......@@ -150,9 +150,10 @@ class DDNSServer:
Perform any cleanup that is necessary when shutting down the server.
Do NOT call this to initialize shutdown, use trigger_shutdown().
Currently, it does nothing, but cleanup routines are expected.
Currently, it only causes the ModuleCCSession to send a message that
this module is stopping.
'''
pass
self._cc.stop()
def accept(self):
"""
......
......@@ -58,11 +58,16 @@ class MyCCSession(isc.config.ConfigData):
ddns.SPECFILE_LOCATION)
isc.config.ConfigData.__init__(self, module_spec)
self._started = False
self._stopped = False
def start(self):
'''Called by DDNSServer initialization, but not used in tests'''
self._started = True
def stop(self):
'''Called by shutdown code'''
self._stopped = True
def get_socket(self):
"""
Used to get the file number for select.
......@@ -289,6 +294,7 @@ class TestDDNSServer(unittest.TestCase):
self.__select_answer = ([3], [], [])
self.ddns_server.run()
self.assertTrue(self.ddns_server._shutdown)
self.assertTrue(self.__cc_session._stopped)
self.assertIsNone(self.__select_answer)
self.assertEqual(3, self.__hook_called)
......
......@@ -184,8 +184,11 @@ class Stats:
raise StatsError("stats spec file is incorrect: "
+ ", ".join(errors))
while self.running:
self.mccs.check_command(False)
try:
while self.running:
self.mccs.check_command(False)
finally:
self.mccs.stop()
def config_handler(self, new_config):
"""
......
......@@ -203,6 +203,7 @@ class StatsHttpd:
"""Closes a ModuleCCSession object"""
if self.mccs is None:
return
self.mccs.stop()
logger.debug(DBG_STATHTTPD_INIT, STATHTTPD_CLOSING_CC_SESSION)
self.mccs.close()
......
......@@ -106,6 +106,9 @@ class XfrinTestTimeoutException(Exception):
pass
class MockCC():
def __init__(self):
self.stop_called = False
def get_default_value(self, identifier):
# The returned values should be identical to the spec file
# XXX: these should be retrieved from the spec file
......@@ -117,6 +120,9 @@ class MockCC():
if identifier == "zones/use_ixfr":
return False
def stop(self):
self.stop_called = True
class MockDataSourceClient():
'''A simple mock data source client.
......@@ -2052,7 +2058,9 @@ class TestXfrin(unittest.TestCase):
self.args['tsig_key'] = ''
def tearDown(self):
self.assertFalse(self.xfr._module_cc.stop_called);
self.xfr.shutdown()
self.assertTrue(self.xfr._module_cc.stop_called);
sys.stderr= self.stderr_backup
def _do_parse_zone_name_class(self):
......
......@@ -1224,6 +1224,7 @@ class Xfrin:
''' shutdown the xfrin process. the thread which is doing xfrin should be
terminated.
'''
self._module_cc.stop()
self._shutdown_event.set()
main_thread = threading.currentThread()
for th in threading.enumerate():
......
......@@ -969,6 +969,7 @@ class XfroutServer:
global xfrout_server
xfrout_server = None #Avoid shutdown is called twice
self._cc.stop()
self._shutdown_event.set()
self._notifier.shutdown()
if self._unix_socket_server:
......
......@@ -489,6 +489,18 @@ ModuleCCSession::ModuleCCSession(
}
ModuleCCSession::~ModuleCCSession() {
try {
sendStopping();
} catch (const std::exception& exc) {
LOG_ERROR(config_logger,
CONFIG_CCSESSION_STOPPING).arg(exc.what());
} catch (...) {
LOG_ERROR(config_logger,
CONFIG_CCSESSION_STOPPING_UNKNOWN);
}
};
void
ModuleCCSession::start() {
if (started_) {
......@@ -741,5 +753,15 @@ ModuleCCSession::updateRemoteConfig(const std::string& module_name,
}
}
void
ModuleCCSession::sendStopping() {
// Inform the configuration manager that this module is stopping
ConstElementPtr cmd(createCommand("stopping",
Element::fromJSON("{\"module_name\": \"" +
module_name_ + "\"}")));
// It's just an FYI, configmanager is not expected to respond.
session_.group_sendmsg(cmd, "ConfigManager");
}
}
}
......@@ -192,6 +192,14 @@ public:
bool handle_logging = true
);
/**
* Destructor
*
* The desctructor automatically calls sendStopping(), which sends
* a message to the ConfigManager that this module is stopping
*/
~ModuleCCSession();
/// Start receiving new commands and configuration changes asynchronously.
///
/// This method must be called only once, and only when the ModuleCCSession
......@@ -353,6 +361,7 @@ public:
private:
ModuleSpec readModuleSpecification(const std::string& filename);
void startCheck();
void sendStopping();
bool started_;
std::string module_name_;
......
......@@ -30,6 +30,18 @@ but will not send back an answer.
The most likely cause of this error is a programming error. Please raise
a bug report.
% CONFIG_CCSESSION_STOPPING error sending stopping message: %1
There was a problem when sending a message signaling that the module using
this CCSession is stopping. This message is sent so that the rest of the
system is aware that the module is no longer running. Apart from logging
this message, the error itself is ignored, and the ModuleCCSession is
still stopped. The specific exception message is printed.
% CONFIG_CCSESSION_STOPPING_UNKNOWN unknown error sending stopping message
Similar to CONFIG_CCSESSION_STOPPING, but in this case the exception that
is seen is not a standard exception, and further information is unknown.
This is a bug.
% CONFIG_GET_FAIL error getting configuration from cfgmgr: %1
The configuration manager returned an error when this module requested
the configuration. The full error message answer from the configuration
......
......@@ -190,6 +190,33 @@ TEST_F(CCSessionTest, session2) {
EXPECT_EQ(0, session.getMsgQueue()->size());
}
TEST_F(CCSessionTest, session_close) {
// Test whether ModuleCCSession automatically sends a 'stopping'
// message when it is destroyed
ConstElementPtr msg;
std::string group, to;
EXPECT_FALSE(session.haveSubscription("Spec2", "*"));
{
ModuleCCSession mccs(ccspecfile("spec2.spec"), session, NULL, NULL,
true, false);
EXPECT_TRUE(session.haveSubscription("Spec2", "*"));
// The initial message is irrelevant for this test
// (see session2 test), drop it
session.getFirstMessage(group, to);
// Queue should now be empty
ASSERT_EQ(0, session.getMsgQueue()->size());
}
// Destructor should have cause a new message
ASSERT_EQ(1, session.getMsgQueue()->size());
msg = session.getFirstMessage(group, to);
EXPECT_EQ("{ \"command\": [ \"stopping\", "
"{ \"module_name\": \"Spec2\" } ] }", msg->str());
EXPECT_EQ("ConfigManager", group);
EXPECT_EQ("*", to);
EXPECT_EQ(0, session.getMsgQueue()->size());
}
ConstElementPtr my_config_handler(ConstElementPtr new_config) {
if (new_config && new_config->contains("item1") &&
new_config->get("item1")->intValue() == 5) {
......
......@@ -97,6 +97,7 @@ COMMAND_SET_CONFIG = "set_config"
COMMAND_GET_MODULE_SPEC = "get_module_spec"
COMMAND_MODULE_SPEC = "module_spec"
COMMAND_SHUTDOWN = "shutdown"
COMMAND_MODULE_STOPPING = "stopping"
def parse_command(msg):
"""Parses what may be a command message. If it looks like one,
......@@ -210,6 +211,13 @@ class ModuleCCSession(ConfigData):
self.__send_spec()
self.__request_config()
def stop(self):
"""Inform the system that the module using this ModuleCCSession
is stopping. This call will only cause a 'stopping'
message to be sent to the ConfigManager, it does not clear or
free any resources."""
self.__send_stopping()
def get_socket(self):
"""Returns the socket from the command channel session. This
should *only* be used for select() loops to see if there
......@@ -371,7 +379,22 @@ class ModuleCCSession(ConfigData):
except isc.cc.SessionTimeout:
# TODO: log an error?
pass
def __send_stopping(self):
"""Sends a 'stopping' message to the configuration manager. This
message is just an FYI, and no response is expected. Any errors
when sending this message (for instance if the msgq session has
previously been closed) are logged, but ignored."""
msg = create_command(COMMAND_MODULE_STOPPING,
self.get_module_spec().get_full_spec())
try:
self._session.group_sendmsg(msg, "ConfigManager")
except isc.cc.session.SessionError as se:
# If the session was previously closed, obvously trying to send
# a message fails. (TODO: check if session is open so we can
# error on real problems?)
logger.error(CONFIG_SESSION_STOPPING_FAILED, str(se))
def __request_config(self):
"""Asks the configuration manager for the current configuration, and call the config handler if set.
Raises a ModuleCCSessionError if there is no answer from the configuration manager"""
......@@ -419,6 +442,7 @@ class UIModuleCCSession(MultiConfigData):
# so changes are needed there to make this clean (we need a command to simply get the
# full specs for everything, including commands etc, not separate gets for that)
specs = self._conn.send_GET('/module_spec')
self.clear_specifications()
for module in specs.keys():
self.set_specification(isc.config.ModuleSpec(specs[module]))
......
......@@ -456,13 +456,34 @@ class ConfigManager:
# todo: error checking (like keyerrors)
answer = {}
self.set_module_spec(spec)
self._send_module_spec_to_cmdctl(spec.get_module_name(),
spec.get_full_spec())
return ccsession.create_answer(0)
# We should make one general 'spec update for module' that
# passes both specification and commands at once
def _handle_module_stopping(self, arg):
"""Private function that handles a 'stopping' command;
The argument is of the form { 'module_name': <name> }.
If the module is known, it is removed from the known list,
and a message is sent to the Cmdctl channel to remove it as well.
If it is unknown, the message is ignored."""
if arg['module_name'] in self.module_specs:
del self.module_specs[arg['module_name']]
self._send_module_spec_to_cmdctl(arg['module_name'], None)
# This command is not expected to be answered
return None
def _send_module_spec_to_cmdctl(self, module_name, spec):
"""Sends the given module spec for the given module name to Cmdctl.
Parameters:
module_name: A string with the name of the module
spec: dict containing full module specification, as returned by
ModuleSpec.get_full_spec(). This argument may also be None,
in which case it signals Cmdctl to remove said module from
its list.
No response from Cmdctl is expected."""
spec_update = ccsession.create_command(ccsession.COMMAND_MODULE_SPECIFICATION_UPDATE,
[ spec.get_module_name(), spec.get_full_spec() ])
[ module_name, spec ])
self.cc.group_sendmsg(spec_update, "Cmdctl")
return ccsession.create_answer(0)
def handle_msg(self, msg):
"""Handle a command from the cc channel to the configuration manager"""
......@@ -479,6 +500,8 @@ class ConfigManager:
answer = self._handle_get_config(arg)
elif cmd == ccsession.COMMAND_SET_CONFIG:
answer = self._handle_set_config(arg)
elif cmd == ccsession.COMMAND_MODULE_STOPPING:
answer = self._handle_module_stopping(arg)
elif cmd == ccsession.COMMAND_SHUTDOWN:
self.running = False
answer = ccsession.create_answer(0)
......
......@@ -313,6 +313,10 @@ class MultiConfigData:
self._current_config = {}
self._local_changes = {}
def clear_specifications(self):
"""Remove all known module specifications"""
self._specifications = {}
def set_specification(self, spec):
"""Add or update a ModuleSpec. Raises a ConfigDataError is spec is not a ModuleSpec"""
if type(spec) != isc.config.ModuleSpec:
......
......@@ -31,3 +31,9 @@ The configuration manager returned an error response when the module
requested its configuration. The full error message answer from the
configuration manager is appended to the log error.
% CONFIG_SESSION_STOPPING_FAILED error sending stopping message: %1
There was a problem when sending a message signaling that the module using
this CCSession is stopping. This message is sent so that the rest of the
system is aware that the module is no longer running. Apart from logging
this message, the error itself is ignored, and the ModuleCCSession is
still stopped. The specific exception message is printed.
......@@ -250,6 +250,18 @@ class TestModuleCCSession(unittest.TestCase):
self.assertEqual({'command': ['get_config', {'module_name': 'Spec2'}]},
fake_session.get_message('ConfigManager', None))
def test_stop(self):
fake_session = FakeModuleCCSession()
self.assertFalse("Spec1" in fake_session.subscriptions)
mccs = self.create_session("spec1.spec", None, None, fake_session)
self.assertTrue("Spec1" in fake_session.subscriptions)
self.assertEqual(len(fake_session.message_queue), 0)
mccs.stop()
self.assertEqual(len(fake_session.message_queue), 1)
self.assertEqual({'command': ['stopping', {'module_name': 'Spec1'}]},
fake_session.get_message('ConfigManager', None))
def test_get_socket(self):
fake_session = FakeModuleCCSession()
mccs = self.create_session("spec1.spec", None, None, fake_session)
......@@ -724,6 +736,38 @@ class TestUIModuleCCSession(unittest.TestCase):
fake_conn.set_get_answer('/config_data', { 'version': 123123 })
self.assertRaises(ModuleCCSessionError, UIModuleCCSession, fake_conn)
def test_request_specifications(self):
module_spec1 = isc.config.module_spec_from_file(
self.spec_file("spec1.spec"))
module_spec_dict1 = { "module_spec": module_spec1.get_full_spec() }
module_spec2 = isc.config.module_spec_from_file(
self.spec_file("spec2.spec"))
module_spec_dict2 = { "module_spec": module_spec2.get_full_spec() }
fake_conn = fakeUIConn()
# Set the first one in the answer
fake_conn.set_get_answer('/module_spec', module_spec_dict1)
fake_conn.set_get_answer('/config_data',
{ 'version': BIND10_CONFIG_DATA_VERSION })
uccs = UIModuleCCSession(fake_conn)
# We should now have the first one, but not the second.
self.assertTrue("Spec1" in uccs._specifications)
self.assertEqual(module_spec1.get_full_spec(),
uccs._specifications["Spec1"].get_full_spec())
self.assertFalse("Spec2" in uccs._specifications)
# Now set an answer where only the second one is present
fake_conn.set_get_answer('/module_spec', module_spec_dict2)
uccs.request_specifications()
# Now Spec1 should have been removed, and spec2 should be there
self.assertFalse("Spec1" in uccs._specifications)
self.assertTrue("Spec2" in uccs._specifications)
self.assertEqual(module_spec2.get_full_spec(),
uccs._specifications["Spec2"].get_full_spec())
def test_add_remove_value(self):
fake_conn = fakeUIConn()
uccs = self.create_uccs2(fake_conn)
......
......@@ -358,6 +358,29 @@ class TestConfigManager(unittest.TestCase):
# self.fake_session.get_message("Cmdctl", None))
#self.assertEqual({'commands_update': [ self.name, self.commands ] },
# self.fake_session.get_message("Cmdctl", None))
# drop the two messages for now
self.assertEqual(len(self.fake_session.message_queue), 2)
self.fake_session.get_message("Cmdctl", None)
self.fake_session.get_message("TestModule", None)
self.assertEqual(len(self.fake_session.message_queue), 0)
# A stopping message should get no response, but should cause another
# message to be sent, if it is a known module
self._handle_msg_helper({ "command": [ "stopping",
{ "module_name": "Spec2"}] },
None)
self.assertEqual(len(self.fake_session.message_queue), 1)
self.assertEqual({'command': [ 'module_specification_update',
['Spec2', None] ] },
self.fake_session.get_message("Cmdctl", None))
# but not if it is either unknown or not running
self._handle_msg_helper({ "command":
[ "stopping",
{ "module_name": "NoSuchModule" } ] },
None)
self.assertEqual(len(self.fake_session.message_queue), 0)
self._handle_msg_helper({ "command":
["shutdown"]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment