Commit 877a52f1 authored by Jelte Jansen's avatar Jelte Jansen
Browse files

bit of refactoring in cfgmgr.py

added tests for ConfigManager class
also added in these tests a fake command session class, which we might want to generalize for other tests



git-svn-id: svn://bind10.isc.org/svn/bind10/branches/jelte-configuration@748 e5f2f494-b856-4b98-b285-d166d9295462
parent 15c1f8bc
......@@ -80,137 +80,167 @@ class ConfigManagerData:
return self.data == other.data
class ConfigManager:
def __init__(self, data_path):
"""Creates a configuration manager. The data_path is the path
to the directory containing the b10-config.db file.
If session is set, this will be used as the communication
channel session. If not, a new session will be created.
The ability to specify a custom session is for testing purposes
and should not be needed for normal usage."""
def __init__(self, data_path, session = None):
self.commands = {}
self.data_definitions = {}
self.data_path = data_path
self.config = ConfigManagerData(data_path)
self.cc = isc.cc.Session()
if session:
self.cc = session
else:
self.cc = isc.cc.Session()
self.cc.group_subscribe("ConfigManager")
self.cc.group_subscribe("Boss", "ConfigManager")
self.running = False
def notify_boss(self):
"""Notifies the Boss module that the Config Manager is running"""
self.cc.group_sendmsg({"running": "configmanager"}, "Boss")
def set_config(self, module_name, data_specification):
"""Set the data specification for the given module"""
self.data_definitions[module_name] = data_specification
def remove_config(self, module_name):
"""Remove the data specification for the given module"""
self.data_definitions[module_name]
def set_commands(self, module_name, commands):
"""Set the command list for the given module"""
self.commands[module_name] = commands
def remove_commands(self, module_name):
"""Remove the command list for the given module"""
del self.commands[module_name]
def read_config(self):
print("Reading config")
"""Read the current configuration from the b10-config.db file
at the path specificied at init()"""
self.config = ConfigManagerData.read_from_file(self.data_path)
def write_config(self):
print("Writing config")
"""Write the current configuration to the b10-config.db file
at the path specificied at init()"""
self.config.write_to_file()
def _handle_get_data_spec(self, cmd):
answer = {}
if len(cmd) > 1:
if type(cmd[1]) == dict:
if 'module_name' in cmd[1] and cmd[1]['module_name'] != '':
module_name = cmd[1]['module_name']
try:
answer["result"] = [0, self.data_definitions[module_name]]
except KeyError as ke:
answer["result"] = [1, "No specification for module " + module_name]
else:
answer["result"] = [1, "Bad module_name in get_data_spec command"]
else:
answer["result"] = [1, "Bad get_data_spec command, argument not a dict"]
else:
answer["result"] = [0, self.data_definitions]
return answer
def _handle_get_config(self, cmd):
answer = {}
if len(cmd) > 1:
if type(cmd[1]) == dict:
if 'module_name' in cmd[1] and cmd[1]['module_name'] != '':
module_name = cmd[1]['module_name']
try:
answer["result"] = [0, data.find(self.config.data, module_name) ]
except data.DataNotFoundError as dnfe:
# no data is ok, that means we have nothing that
# deviates from default values
answer["result"] = [0, {} ]
else:
answer["result"] = [1, "Bad module_name in get_config command"]
else:
answer["result"] = [1, "Bad get_config command, argument not a dict"]
else:
answer["result"] = [0, self.config.data]
return answer
def _handle_set_config(self, cmd):
answer = {}
if len(cmd) == 3:
# todo: use api (and check the data against the definition?)
module_name = cmd[1]
conf_part = data.find_no_exc(self.config.data, module_name)
if conf_part:
data.merge(conf_part, cmd[2])
self.cc.group_sendmsg({ "config_update": conf_part }, module_name)
else:
conf_part = data.set(self.config.data, module_name, {})
data.merge(conf_part[module_name], cmd[2])
# send out changed info
self.cc.group_sendmsg({ "config_update": conf_part[module_name] }, module_name)
self.write_config()
answer["result"] = [ 0 ]
elif len(cmd) == 2:
# todo: use api (and check the data against the definition?)
data.merge(self.config.data, cmd[1])
# send out changed info
for module in self.config.data:
if module != "version":
self.cc.group_sendmsg({ "config_update": self.config.data[module] }, module)
self.write_config()
answer["result"] = [ 0 ]
else:
answer["result"] = [ 1, "Wrong number of arguments" ]
return answer
def _handle_data_specification(self, spec):
# todo: validate? (no direct access to spec as
# todo: use DataDefinition class
# todo: error checking (like keyerrors)
answer = {}
if "config_data" in spec:
self.set_config(spec["module_name"], spec["config_data"])
self.cc.group_sendmsg({ "specification_update": [ spec["module_name"], spec["config_data"] ] }, "Cmd-Ctrld")
if "commands" in spec:
self.set_commands(spec["module_name"], spec["commands"])
self.cc.group_sendmsg({ "commands_update": [ spec["module_name"], spec["commands"] ] }, "Cmd-Ctrld")
answer["result"] = [ 0 ]
return answer
def handle_msg(self, msg):
"""return answer message"""
"""Handle a direct command"""
answer = {}
if "command" in msg:
cmd = msg["command"]
try:
if cmd[0] == "get_commands":
print("[XX] bind-cfgd got get_commands");
print("[XX] sending:")
print(self.commands)
answer["result"] = [ 0, self.commands ]
elif cmd[0] == "get_data_spec":
if len(cmd) > 1 and cmd[1]['module_name'] != '':
module_name = cmd[1]['module_name']
try:
answer["result"] = [0, self.data_definitions[module_name]]
except KeyError as ke:
answer["result"] = [1, "No specification for module " + module_name]
else:
answer["result"] = [0, self.data_definitions]
answer = self._handle_get_data_spec(cmd)
elif cmd[0] == "get_config":
# we may not have any configuration here
conf_part = None
print("[XX] bind-cfgd got command:")
print(cmd)
if len(cmd) > 1:
try:
conf_part = data.find(self.config.data, cmd[1]['module_name'])
except data.DataNotFoundError as dnfe:
pass
else:
conf_part = self.config.data
if conf_part:
answer["result"] = [ 0, conf_part ]
else:
answer["result"] = [ 0 ]
answer = self._handle_get_config(cmd)
elif cmd[0] == "set_config":
if len(cmd) == 3:
# todo: use api (and check types?)
if cmd[1] != "":
conf_part = data.find_no_exc(self.config.data, cmd[1])
if not conf_part:
conf_part = data.set(self.config.data, cmd[1], "")
else:
conf_part = self.config.data
data.merge(conf_part, cmd[2])
print("[XX bind-cfgd] new config (part):")
print(conf_part)
# send out changed info
self.cc.group_sendmsg({ "config_update": conf_part }, cmd[1])
self.write_config()
answer["result"] = [ 0 ]
elif len(cmd) == 2:
print("[XX bind-cfgd] old config:")
print(self.config.data)
print("[XX bind-cfgd] updating with:")
print(cmd[1])
# TODO: ask module to check the config first...
data.merge(self.config.data, cmd[1])
print("[XX bind-cfgd] new config:")
print(self.config.data)
# send out changed info
for module in self.config.data:
self.cc.group_sendmsg({ "config_update": self.config.data[module] }, module)
self.write_config()
answer["result"] = [ 0 ]
else:
answer["result"] = [ 1, "Wrong number of arguments" ]
answer = self._handle_set_config(cmd)
elif cmd[0] == "shutdown":
print("[bind-cfgd] Received shutdown command")
self.running = False
answer["result"] = [ 0 ]
else:
print("[bind-cfgd] unknown command: " + str(cmd))
answer["result"] = [ 1, "Unknown command: " + str(cmd) ]
except IndexError as ie:
print("[bind-cfgd] missing argument")
answer["result"] = [ 1, "Missing argument in command: " + str(ie) ]
raise ie
elif "data_specification" in msg:
# todo: validate? (no direct access to spec as
# todo: use DataDefinition class?
print("[XX] bind-cfgd got specification:")
print(msg["data_specification"])
spec = msg["data_specification"]
if "config_data" in spec:
self.set_config(spec["module_name"], spec["config_data"])
self.cc.group_sendmsg({ "specification_update": [ spec["module_name"], spec["config_data"] ] }, "Cmd-Ctrld")
if "commands" in spec:
self.set_commands(spec["module_name"], spec["commands"])
self.cc.group_sendmsg({ "commands_update": [ spec["module_name"], spec["commands"] ] }, "Cmd-Ctrld")
answer["result"] = [ 0 ]
answer = self._handle_data_specification(msg["data_specification"])
elif 'result' in msg:
# this seems wrong, might start pingpong
answer['result'] = [0]
else:
print("[bind-cfgd] unknown message: " + str(msg))
answer["result"] = [ 1, "Unknown module: " + str(msg) ]
answer["result"] = [ 1, "Unknown message format: " + str(msg) ]
return answer
def run(self):
......
......@@ -59,26 +59,140 @@ class TestConfigManagerData(unittest.TestCase):
new_config = ConfigManagerData(self.data_path, output_file_name)
self.assertEqual(self.config_manager_data, new_config)
class TestConfigManager:
#
# We can probably use a more general version of this
#
class FakeCCSession:
def __init__(self):
self.subscriptions = {}
# each entry is of the form [ channel, instance, message ]
self.message_queue = []
def group_subscribe(self, group_name, instance_name = None):
if not group_name in self.subscriptions:
self.subscriptions[group_name] = []
if instance_name:
self.subscriptions[group_name].append(instance_name)
def has_subscription(self, group_name, instance_name = None):
if group_name in self.subscriptions:
if instance_name:
return instance_name in self.subscriptions[group_name]
else:
return True
else:
return False
def group_sendmsg(self, msg, channel, target = None):
self.message_queue.append([ channel, target, msg ])
def get_message(self, channel, target = None):
for qm in self.message_queue:
if qm[0] == channel and qm[1] == target:
self.message_queue.remove(qm)
return qm[2]
return None
class TestConfigManager(unittest.TestCase):
def setUp(self):
pass
self.data_path = os.environ['CONFIG_TESTDATA_PATH']
self.fake_session = FakeCCSession()
self.cm = ConfigManager(self.data_path, self.fake_session)
self.name = "TestModule"
self.spec = { "asdf" }
self.commands = { "bbbb" }
def test_init(self):
pass
self.assert_(self.cm.commands == {})
self.assert_(self.cm.data_definitions == {})
self.assert_(self.cm.data_path == self.data_path)
self.assert_(self.cm.config != None)
self.assert_(self.fake_session.has_subscription("ConfigManager"))
self.assert_(self.fake_session.has_subscription("Boss", "ConfigManager"))
self.assertFalse(self.cm.running)
def test_notify_boss(self):
self.cm.notify_boss()
msg = self.fake_session.get_message("Boss", None)
self.assert_(msg)
# this one is actually wrong, but 'current status quo'
self.assertEqual(msg, {"running": "configmanager"})
def test_set_config(self):
pass
self.cm.set_config(self.name, self.spec)
self.assertEqual(self.cm.data_definitions[self.name], self.spec)
def test_remove_config(self):
pass
self.assertRaises(KeyError, self.cm.remove_config, self.name)
self.cm.set_config(self.name, self.spec)
self.cm.remove_config(self.name)
def test_set_commands(self):
self.cm.set_commands(self.name, self.commands)
self.assertEqual(self.cm.commands[self.name], self.commands)
def test_write_config(self):
pass
self.assertRaises(KeyError, self.cm.remove_commands, self.name)
self.cm.set_commands(self.name, self.commands)
self.cm.remove_commands(self.name)
def _handle_msg_helper(self, msg, expected_answer):
answer = self.cm.handle_msg(msg)
#self.assertEquals(answer, expected_answer)
self.assertEqual(expected_answer, answer)
def test_handle_msg(self):
pass
self._handle_msg_helper({}, { 'result': [ 1, 'Unknown message format: {}']})
self._handle_msg_helper("", { 'result': [ 1, 'Unknown message format: ']})
self._handle_msg_helper({ "command": [ "badcommand" ] }, { 'result': [ 1, "Unknown command: ['badcommand']"]})
self._handle_msg_helper({ "command": [ "get_commands" ] }, { 'result': [ 0, {} ]})
self._handle_msg_helper({ "command": [ "get_data_spec" ] }, { 'result': [ 0, {} ]})
self._handle_msg_helper({ "command": [ "get_data_spec", { "module_name": "nosuchmodule" } ] },
{'result': [1, 'No specification for module nosuchmodule']})
self._handle_msg_helper({ "command": [ "get_data_spec", 1 ] },
{'result': [1, 'Bad get_data_spec command, argument not a dict']})
self._handle_msg_helper({ "command": [ "get_data_spec", { } ] },
{'result': [1, 'Bad module_name in get_data_spec command']})
self._handle_msg_helper({ "command": [ "get_config" ] }, { 'result': [ 0, { 'version': 1} ]})
self._handle_msg_helper({ "command": [ "get_config", { "module_name": "nosuchmodule" } ] },
{'result': [0, {}]})
self._handle_msg_helper({ "command": [ "get_config", 1 ] },
{'result': [1, 'Bad get_config command, argument not a dict']})
self._handle_msg_helper({ "command": [ "get_config", { } ] },
{'result': [1, 'Bad module_name in get_config command']})
self._handle_msg_helper({ "command": [ "set_config" ] },
{'result': [1, 'Wrong number of arguments']})
self._handle_msg_helper({ "command": [ "set_config", {} ] },
{'result': [0]})
self.assertEqual(len(self.fake_session.message_queue), 0)
self._handle_msg_helper({ "command": [ "set_config", self.name, { "test": 123 } ] },
{'result': [0]})
self.assertEqual(len(self.fake_session.message_queue), 1)
self.assertEqual({'config_update': {'test': 123}},
self.fake_session.get_message(self.name, None))
self._handle_msg_helper({ "command": [ "set_config", self.name, { "test": 124 } ] },
{'result': [0]})
#print(self.fake_session.message_queue)
self.assertEqual(len(self.fake_session.message_queue), 1)
self.assertEqual({'config_update': {'test': 124}},
self.fake_session.get_message(self.name, None))
self.assertEqual({'version': 1, 'TestModule': {'test': 124}}, self.cm.config.data)
self._handle_msg_helper({ "data_specification":
{ "module_name": self.name, "config_data": self.spec, "commands": self.commands }
},
{'result': [0]})
self.assertEqual(len(self.fake_session.message_queue), 2)
# the name here is actually wrong (and hardcoded), but needed in the current version
# TODO: fix that
self.assertEqual({'specification_update': [ self.name, self.spec ] },
self.fake_session.get_message("Cmd-Ctrld", None))
self.assertEqual({'commands_update': [ self.name, self.commands ] },
self.fake_session.get_message("Cmd-Ctrld", None))
def test_run(self):
pass
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment