......@@ -35,7 +35,7 @@ def from_wire(data):
Raises an AttributeError if the given object has no decode()
method (which should return a string).
return json.loads(data.decode('utf8'))
return json.loads(data.decode('utf8'), strict=False)
if __name__ == "__main__":
import doctest
......@@ -93,6 +93,19 @@ class Session:
def recvmsg(self, nonblock = True, seq = None):
"""Reads a message. If nonblock is true, and there is no
message to read, it returns (None, None).
If seq is not None, it should be a value as returned by
group_sendmsg(), in which case only the response to
that message is returned, and others will be queued until
the next call to this method.
If seq is None, only messages that are *not* responses
will be returned, and responses will be queued.
The queue is checked for relevant messages before data
is read from the socket.
Raises a SessionError if there is a JSON decode problem in
the message that is read, or if the session has been closed
prior to the call of recvmsg()"""
with self._lock:
if len(self._queue) > 0:
i = 0;
......@@ -109,16 +122,22 @@ class Session:
if data and len(data) > 2:
header_length = struct.unpack('>H', data[0:2])[0]
data_length = len(data) - 2 - header_length
if data_length > 0:
env =[2:header_length+2])
msg =[header_length + 2:])
if (seq == None and "reply" not in env) or (seq != None and "reply" in env and seq == env["reply"]):
return env, msg
if data_length > 0:
env =[2:header_length+2])
msg =[header_length + 2:])
if (seq == None and "reply" not in env) or (seq != None and "reply" in env and seq == env["reply"]):
return env, msg
return self.recvmsg(nonblock, seq)
return self.recvmsg(nonblock, seq)
return[2:header_length+2]), None
return[2:header_length+2]), None
except ValueError as ve:
# TODO: when we have logging here, add a debug
# message printing the data that we were unable
# to parse as JSON
raise SessionError(ve)
return None, None
def _receive_bytes(self, size):
......@@ -31,6 +31,10 @@ class MessageTest(unittest.TestCase):
self.msg2_str = "{\"aaa\": [1, 1.1, true, false, null]}";
self.msg2_wire = self.msg2_str.encode()
self.msg3 = { "aaa": [ 1, 1.1, True, False, "string\n" ] }
self.msg3_str = "{\"aaa\": [1, 1.1, true, false, \"string\n\" ]}";
self.msg3_wire = self.msg3_str.encode()
def test_encode_json(self):
......@@ -40,6 +44,7 @@ class MessageTest(unittest.TestCase):
def test_decode_json(self):
self.assertRaises(AttributeError,, 1)
self.assertRaises(ValueError,, b'\x001')
......@@ -274,6 +274,16 @@ class testSession(unittest.TestCase):
self.assertEqual({"hello": "b"}, msg)
def test_recv_bad_msg(self):
sess = MySession()
sess._socket.addrecv({'to': 'someone' }, {'hello': 'b'})
sess._socket.addrecv({'to': 'someone', 'reply': 1}, {'hello': 'a'})
# mangle the bytes a bit
sess._socket.recvqueue[5] = sess._socket.recvqueue[5] - 2
sess._socket.recvqueue = sess._socket.recvqueue[:-2]
self.assertRaises(SessionError, sess.recvmsg, True, 1)
def test_next_sequence(self):
sess = MySession()
self.assertEqual(sess._sequence, 1)
......@@ -380,6 +380,9 @@ class ConfigManager:
answer, env =, seq)
answer = ccsession.create_answer(1, "Timeout waiting for answer from " + module_name)
except as se:
logger.error(CFGMGR_BAD_UPDATE_RESPONSE_FROM_MODULE, module_name, se)
answer = ccsession.create_answer(1, "Unable to parse response from " + module_name + ": " + str(se))
if answer:
rcode, val = ccsession.parse_answer(answer)
if rcode == 0:
......@@ -20,6 +20,13 @@ An older version of the configuration database has been found, from which
there was an automatic upgrade path to the current version. These changes
are now applied, and no action from the administrator is necessary.
% CFGMGR_BAD_UPDATE_RESPONSE_FROM_MODULE Unable to parse response from module %1: %2
The configuration manager sent a configuration update to a module, but
the module responded with an answer that could not be parsed. The answer
message appears to be invalid JSON data, or not decodable to a string.
This is likely to be a problem in the module in question. The update is
assumed to have failed, and will not be stored.
% CFGMGR_CC_SESSION_ERROR Error connecting to command channel: %1
The configuration manager daemon was unable to connect to the messaging
system. The most likely cause is that msgq is not running.
