Commit 04f3eb1b authored by Jelte Jansen's avatar Jelte Jansen
Browse files

cfgmgr handle non-existent db file

update copyright statements
added tests for data element helper functions
improved and cleaned up data element helper functions


git-svn-id: svn://bind10.isc.org/svn/bind10/branches/jelte-configuration@773 e5f2f494-b856-4b98-b285-d166d9295462
parent 877a52f1
# data, data_definition, config_data, module_config_data and ui_config_data classes
# we might want to split these up :)
# Copyright (C) 2010 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
# Helper functions for data elements as used in cc-channel and
# configuration. There is no python equivalent for the cpp Element
# class, since data elements are represented by native python types
# (int, real, bool, string, list and dict respectively)
#
import ast
class DataNotFoundError(Exception): pass
......@@ -9,6 +29,8 @@ def merge(orig, new):
"""Merges the contents of new into orig, think recursive update()
orig and new must both be dicts. If an element value is None in
new it will be removed in orig."""
if type(orig) != dict or type(new) != dict:
raise DataTypeError("Not a dict in merge()")
for kn in new.keys():
if kn in orig:
if new[kn]:
......@@ -23,6 +45,10 @@ def merge(orig, new):
def find(element, identifier):
"""Returns the subelement in the given data element, raises DataNotFoundError if not found"""
if type(identifier) != str or (type(element) != dict and identifier != ""):
raise DataTypeError("identifier in merge() is not a string")
if type(identifier) != str or (type(element) != dict and identifier != ""):
raise DataTypeError("element in merge() is not a dict")
id_parts = identifier.split("/")
id_parts[:] = (value for value in id_parts if value != "")
cur_el = element
......@@ -34,6 +60,17 @@ def find(element, identifier):
return cur_el
def set(element, identifier, value):
"""Sets the value at the element specified by identifier to value.
If the value is None, it is removed from the dict. If element
is not a dict, or if the identifier points to something that is
not, a DataTypeError is raised. The element is updated inline,
so if the original needs to be kept, you must make a copy before
calling set(). The updated base element is returned (so that
el.set().set().set() is possible)"""
if type(element) != dict:
raise DataTypeError("element in set() is not a dict")
if type(identifier) != str:
raise DataTypeError("identifier in set() is not a string")
id_parts = identifier.split("/")
id_parts[:] = (value for value in id_parts if value != "")
cur_el = element
......@@ -41,36 +78,46 @@ def set(element, identifier, value):
if id in cur_el.keys():
cur_el = cur_el[id]
else:
cur_el[id] = {}
cur_el = cur_el[id]
cur_el[id_parts[-1]] = value
if value:
cur_el[id] = {}
cur_el = cur_el[id]
else:
# set to none, and parent el not found, return
return element
if value:
cur_el[id_parts[-1]] = value
else:
del cur_el[id_parts[-1]]
return element
def unset(element, identifier):
id_parts = identifier.split("/")
id_parts[:] = (value for value in id_parts if value != "")
cur_el = element
for id in id_parts[:-1]:
if id in cur_el.keys():
cur_el = cur_el[id]
else:
cur_el[id] = {}
cur_el = cur_el[id]
cur_el[id_parts[-1]] = None
return element
"""Removes the element at the given identifier if it exists. Raises
a DataTypeError if element is not a dict or if identifier is not
a string. Returns the base element."""
# perhaps we can simply do with set none, and remove this whole
# function
return set(element, identifier, None)
def find_no_exc(element, identifier):
"""Returns the subelement in the given data element, returns None if not found"""
"""Returns the subelement in the given data element, returns None
if not found, or if an error occurred (i.e. this function should
never raise an exception)"""
if type(identifier) != str:
return None
id_parts = identifier.split("/")
id_parts[:] = (value for value in id_parts if value != "")
cur_el = element
for id in id_parts:
if type(cur_el) == dict and id in cur_el.keys():
if (type(cur_el) == dict and id in cur_el.keys()) or id=="":
cur_el = cur_el[id]
else:
return None
return cur_el
#
# hmm, these are more relevant for datadefition
# should we (re)move them?
#
def find_spec(element, identifier):
"""find the data definition for the given identifier
returns either a map with 'item_name' etc, or a list of those"""
......@@ -147,6 +194,11 @@ def spec_name_list(spec, prefix="", recurse=False):
return result
def parse_value_str(value_str):
"""Parses the given string to a native python object. If the
string cannot be parsed, it is returned. If it is not a string,
None is returned"""
if type(value_str) != str:
return None
try:
return ast.literal_eval(value_str)
except ValueError as ve:
......
# Copyright (C) 2010 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
# Tests for the functions in data.py
#
import unittest
import os
import data
class TestData(unittest.TestCase):
def test_merge(self):
d1 = { 'a': 'a', 'b': 1, 'c': { 'd': 'd', 'e': 2 } }
d2 = { 'a': None, 'c': { 'd': None, 'e': 3, 'f': [ 1 ] } }
d12 = { 'b': 1, 'c': { 'e': 3, 'f': [ 1 ] } }
m12 = d1
data.merge(m12, d2)
self.assertEqual(d12, m12)
self.assertRaises(data.DataTypeError, data.merge, d1, "a")
self.assertRaises(data.DataTypeError, data.merge, 1, d2)
self.assertRaises(data.DataTypeError, data.merge, None, None)
def test_find(self):
d1 = { 'a': 'a', 'b': 1, 'c': { 'd': 'd', 'e': 2, 'more': { 'data': 'here' } } }
self.assertEqual(data.find(d1, ''), d1)
self.assertEqual(data.find(d1, 'a'), 'a')
self.assertEqual(data.find(d1, 'c/e'), 2)
self.assertEqual(data.find(d1, 'c/more/'), { 'data': 'here' })
self.assertEqual(data.find(d1, 'c/more/data'), 'here')
self.assertRaises(data.DataNotFoundError, data.find, d1, 'c/f')
self.assertRaises(data.DataNotFoundError, data.find, d1, 'f')
self.assertRaises(data.DataTypeError, data.find, d1, 1)
self.assertRaises(data.DataTypeError, data.find, None, 1)
self.assertRaises(data.DataTypeError, data.find, "123", "123")
self.assertEqual(data.find("123", ""), "123")
def test_set(self):
d1 = { 'a': 'a', 'b': 1, 'c': { 'd': 'd', 'e': 2 } }
d12 = { 'b': 1, 'c': { 'e': 3, 'f': [ 1 ] } }
data.set(d1, 'a', None)
data.set(d1, 'c/d', None)
data.set(d1, 'c/e/', 3)
data.set(d1, 'c/f', [ 1 ] )
self.assertEqual(d1, d12)
self.assertRaises(data.DataTypeError, data.set, d1, 1, 2)
self.assertRaises(data.DataTypeError, data.set, 1, "", 2)
d3 = {}
e3 = data.set(d3, "does/not/exist", 123)
self.assertEqual(d3,
{ 'does': { 'not': { 'exist': 123 } } })
self.assertEqual(e3,
{ 'does': { 'not': { 'exist': 123 } } })
def test_unset(self):
d1 = { 'a': 'a', 'b': 1, 'c': { 'd': 'd', 'e': 2 } }
data.unset(d1, 'a')
data.unset(d1, 'c/d')
data.unset(d1, 'does/not/exist')
self.assertEqual(d1, { 'b': 1, 'c': { 'e': 2 } })
def test_find_no_exc(self):
d1 = { 'a': 'a', 'b': 1, 'c': { 'd': 'd', 'e': 2, 'more': { 'data': 'here' } } }
self.assertEqual(data.find_no_exc(d1, ''), d1)
self.assertEqual(data.find_no_exc(d1, 'a'), 'a')
self.assertEqual(data.find_no_exc(d1, 'c/e'), 2)
self.assertEqual(data.find_no_exc(d1, 'c/more/'), { 'data': 'here' })
self.assertEqual(data.find_no_exc(d1, 'c/more/data'), 'here')
self.assertEqual(data.find_no_exc(d1, 'c/f'), None)
self.assertEqual(data.find_no_exc(d1, 'f'), None)
self.assertEqual(data.find_no_exc(d1, 1), None)
self.assertEqual(data.find_no_exc(d1, 'more/data/here'), None)
self.assertEqual(data.find_no_exc(None, 1), None)
self.assertEqual(data.find_no_exc("123", ""), "123")
self.assertEqual(data.find_no_exc("123", ""), "123")
def test_parse_value_str(self):
self.assertEqual(data.parse_value_str("1"), 1)
self.assertEqual(data.parse_value_str("True"), True)
self.assertEqual(data.parse_value_str("None"), None)
self.assertEqual(data.parse_value_str("1.1"), 1.1)
self.assertEqual(data.parse_value_str("[]"), [])
self.assertEqual(data.parse_value_str("[ 1, None, 'asdf' ]"), [ 1, None, "asdf" ])
self.assertEqual(data.parse_value_str("{}"), {})
self.assertEqual(data.parse_value_str("{ 'a': 'b', 'c': 1 }"), { 'a': 'b', 'c': 1 })
self.assertEqual(data.parse_value_str("[ a c"), "[ a c")
if __name__ == '__main__':
#if not 'CONFIG_TESTDATA_PATH' in os.environ:
# print("You need to set the environment variable CONFIG_TESTDATA_PATH to point to the directory containing the test data files")
# exit(1)
unittest.main()
# Copyright (C) 2010 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
# This is the main class for the b10-cfgmgr daemon
#
import isc
import signal
import ast
......@@ -122,7 +141,11 @@ class ConfigManager:
def read_config(self):
"""Read the current configuration from the b10-config.db file
at the path specificied at init()"""
self.config = ConfigManagerData.read_from_file(self.data_path)
try:
self.config = ConfigManagerData.read_from_file(self.data_path)
except ConfigManagerDataEmpty:
# ok, just start with an empty config
self.config = ConfigManagerData(self.data_path)
def write_config(self):
"""Write the current configuration to the b10-config.db file
......
# Copyright (C) 2009 Internet Systems Consortium.
# Copyright (C) 2010 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment