From a3888f316e115a9dd39cf5013efcd9a6936a4a7f Mon Sep 17 00:00:00 2001 From: Jelte Jansen Date: Thu, 20 May 2010 10:34:26 +0000 Subject: [PATCH] merge branches/trac58 (message queuing on cc channel) see also http://bind10.isc.org/ticket/58 git-svn-id: svn://bind10.isc.org/svn/bind10/trunk@1870 e5f2f494-b856-4b98-b285-d166d9295462 --- src/bin/auth/main.cc | 3 + src/lib/cc/session.cc | 108 ++++++++++++-------- src/lib/cc/session.h | 14 ++- src/lib/config/ccsession.cc | 18 ++-- src/lib/config/ccsession.h | 10 ++ src/lib/config/tests/Makefile.am | 2 +- src/lib/config/tests/fake_session.cc | 18 +++- src/lib/config/tests/fake_session.h | 9 +- src/lib/python/isc/cc/session.py | 88 ++++++++-------- src/lib/python/isc/cc/tests/session_test.py | 17 +++ 10 files changed, 183 insertions(+), 104 deletions(-) diff --git a/src/bin/auth/main.cc b/src/bin/auth/main.cc index 1203a9cb4..ddb2a4493 100644 --- a/src/bin/auth/main.cc +++ b/src/bin/auth/main.cc @@ -664,6 +664,9 @@ run_server(const char* port, const bool use_ipv4, const bool use_ipv6, FD_SET(ss, &fds); ++nfds; + if (srv->configSession()->hasQueuedMsgs()) { + srv->configSession()->checkCommand(); + } int n = select(nfds, &fds, NULL, NULL, NULL); if (n < 0) { if (errno != EINTR) { diff --git a/src/lib/cc/session.cc b/src/lib/cc/session.cc index 68da5a83e..d049b3a59 100644 --- a/src/lib/cc/session.cc +++ b/src/lib/cc/session.cc @@ -54,7 +54,7 @@ namespace cc { class SessionImpl { public: - SessionImpl() : sequence_(-1) {} + SessionImpl() : sequence_(-1) { queue_ = Element::createFromString("[]"); } virtual ~SessionImpl() {} virtual void establish() = 0; virtual int getSocket() = 0; @@ -63,9 +63,10 @@ public: virtual size_t readDataLength() = 0; virtual void readData(void* data, size_t datalen) = 0; virtual void startRead(boost::function user_handler) = 0; - + int sequence_; // the next sequence number to use std::string lname_; + ElementPtr queue_; }; #ifdef HAVE_BOOST_SYSTEM @@ -352,35 +353,35 @@ Session::sendmsg(ElementPtr& env, ElementPtr& msg) { } bool -Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM) { - size_t length = impl_->readDataLength(); - - unsigned short header_length_net; - impl_->readData(&header_length_net, sizeof(header_length_net)); - - unsigned short header_length = ntohs(header_length_net); - if (header_length != length) { - isc_throw(SessionError, "Length parameters invalid: total=" << length - << ", header=" << header_length); - } - - std::vector buffer(length); - impl_->readData(&buffer[0], length); - - std::string wire = std::string(&buffer[0], length); - std::stringstream wire_stream; - wire_stream << wire; - - msg = Element::fromWire(wire_stream, length); - - return (true); - // XXXMLG handle non-block here, and return false for short reads +Session::recvmsg(ElementPtr& msg, bool nonblock, int seq) { + ElementPtr l_env; + return recvmsg(l_env, msg, nonblock, seq); } bool -Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM) { +Session::recvmsg(ElementPtr& env, ElementPtr& msg, + bool nonblock, int seq) { size_t length = impl_->readDataLength(); - + ElementPtr l_env, l_msg; + if (hasQueuedMsgs()) { + ElementPtr q_el; + for (int i = 0; i < impl_->queue_->size(); i++) { + q_el = impl_->queue_->get(i); + if (( seq == -1 && + !q_el->get(0)->contains("reply") + ) || ( + q_el->get(0)->contains("reply") && + q_el->get(0)->get("reply")->intValue() == seq + ) + ) { + env = q_el->get(0); + msg = q_el->get(1); + impl_->queue_->remove(i); + return true; + } + } + } + unsigned short header_length_net; impl_->readData(&header_length_net, sizeof(header_length_net)); @@ -400,13 +401,28 @@ Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM) { length - header_length); std::stringstream header_wire_stream; header_wire_stream << header_wire; - env = Element::fromWire(header_wire_stream, header_length); + l_env = Element::fromWire(header_wire_stream, header_length); std::stringstream body_wire_stream; body_wire_stream << body_wire; - msg = Element::fromWire(body_wire_stream, length - header_length); - - return (true); + l_msg = Element::fromWire(body_wire_stream, length - header_length); + if ((seq == -1 && + !l_env->contains("reply") + ) || ( + l_env->contains("reply") && + l_env->get("reply")->intValue() == seq + ) + ) { + env = l_env; + msg = l_msg; + return true; + } else { + ElementPtr q_el = Element::createFromString("[]"); + q_el->add(l_env); + q_el->add(l_msg); + impl_->queue_->add(q_el); + return recvmsg(env, msg, nonblock, seq); + } // XXXMLG handle non-block here, and return false for short reads } @@ -432,47 +448,55 @@ Session::unsubscribe(std::string group, std::string instance) { sendmsg(env); } -unsigned int +int Session::group_sendmsg(ElementPtr msg, std::string group, std::string instance, std::string to) { ElementPtr env = Element::create(std::map()); - + int nseq = ++impl_->sequence_; + env->set("type", Element::create("send")); env->set("from", Element::create(impl_->lname_)); env->set("to", Element::create(to)); env->set("group", Element::create(group)); env->set("instance", Element::create(instance)); - env->set("seq", Element::create(impl_->sequence_)); + env->set("seq", Element::create(nseq)); //env->set("msg", Element::create(msg->toWire())); sendmsg(env, msg); - - return (++impl_->sequence_); + return nseq; } bool Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg, - bool nonblock) + bool nonblock, int seq) { - return (recvmsg(envelope, msg, nonblock)); + return (recvmsg(envelope, msg, nonblock, seq)); } -unsigned int +int Session::reply(ElementPtr& envelope, ElementPtr& newmsg) { ElementPtr env = Element::create(std::map()); - + int nseq = ++impl_->sequence_; + env->set("type", Element::create("send")); env->set("from", Element::create(impl_->lname_)); env->set("to", Element::create(envelope->get("from")->stringValue())); env->set("group", Element::create(envelope->get("group")->stringValue())); env->set("instance", Element::create(envelope->get("instance")->stringValue())); - env->set("seq", Element::create(impl_->sequence_)); + env->set("seq", Element::create(nseq)); env->set("reply", Element::create(envelope->get("seq")->intValue())); sendmsg(env, newmsg); - return (++impl_->sequence_); + return nseq; } + +bool +Session::hasQueuedMsgs() +{ + return (impl_->queue_->size() > 0); +} + } } diff --git a/src/lib/cc/session.h b/src/lib/cc/session.h index 509cf35fe..1768e6a7a 100644 --- a/src/lib/cc/session.h +++ b/src/lib/cc/session.h @@ -65,23 +65,27 @@ namespace isc { void sendmsg(isc::data::ElementPtr& env, isc::data::ElementPtr& msg); bool recvmsg(isc::data::ElementPtr& msg, - bool nonblock = true); + bool nonblock = true, + int seq = -1); bool recvmsg(isc::data::ElementPtr& env, isc::data::ElementPtr& msg, - bool nonblock = true); + bool nonblock = true, + int seq = -1); void subscribe(std::string group, std::string instance = "*"); void unsubscribe(std::string group, std::string instance = "*"); - unsigned int group_sendmsg(isc::data::ElementPtr msg, + int group_sendmsg(isc::data::ElementPtr msg, std::string group, std::string instance = "*", std::string to = "*"); bool group_recvmsg(isc::data::ElementPtr& envelope, isc::data::ElementPtr& msg, - bool nonblock = true); - unsigned int reply(isc::data::ElementPtr& envelope, + bool nonblock = true, + int seq = -1); + int reply(isc::data::ElementPtr& envelope, isc::data::ElementPtr& newmsg); + bool hasQueuedMsgs(); }; } // namespace cc } // namespace isc diff --git a/src/lib/config/ccsession.cc b/src/lib/config/ccsession.cc index abe6ba42a..97e8e86dc 100644 --- a/src/lib/config/ccsession.cc +++ b/src/lib/config/ccsession.cc @@ -248,8 +248,8 @@ ModuleCCSession::init( //session_.subscribe("statistics", "*"); // send the data specification ElementPtr spec_msg = createCommand("module_spec", module_specification_.getFullSpec()); - session_.group_sendmsg(spec_msg, "ConfigManager"); - session_.group_recvmsg(env, answer, false); + unsigned int seq = session_.group_sendmsg(spec_msg, "ConfigManager"); + session_.group_recvmsg(env, answer, false, seq); int rcode; ElementPtr err = parseAnswer(rcode, answer); if (rcode != 0) { @@ -260,8 +260,8 @@ ModuleCCSession::init( // get any stored configuration from the manager if (config_handler_) { ElementPtr cmd = Element::createFromString("{ \"command\": [\"get_config\", {\"module_name\":\"" + module_name_ + "\"} ] }"); - session_.group_sendmsg(cmd, "ConfigManager"); - session_.group_recvmsg(env, answer, false); + seq = session_.group_sendmsg(cmd, "ConfigManager"); + session_.group_recvmsg(env, answer, false, seq); ElementPtr new_config = parseAnswer(rcode, answer); if (rcode == 0) { handleConfigUpdate(new_config); @@ -310,6 +310,12 @@ ModuleCCSession::getSocket() return (session_.getSocket()); } +bool +ModuleCCSession::hasQueuedMsgs() +{ + return (session_.hasQueuedMsgs()); +} + int ModuleCCSession::checkCommand() { @@ -365,8 +371,8 @@ ModuleCCSession::addRemoteConfig(const std::string& spec_file_name) ElementPtr env, answer; int rcode; - session_.group_sendmsg(cmd, "ConfigManager"); - session_.group_recvmsg(env, answer, false); + unsigned int seq = session_.group_sendmsg(cmd, "ConfigManager"); + session_.group_recvmsg(env, answer, false, seq); ElementPtr new_config = parseAnswer(rcode, answer); if (rcode == 0) { rmod_config.setLocalConfig(new_config); diff --git a/src/lib/config/ccsession.h b/src/lib/config/ccsession.h index 4030364d2..faa6ae928 100644 --- a/src/lib/config/ccsession.h +++ b/src/lib/config/ccsession.h @@ -149,6 +149,16 @@ public: */ int getSocket(); + /** + * Optional optimization for checkCommand loop; returns true + * if there are unhandled queued messages in the cc session. + * (if either this is true or there is data on the socket found + * by the select() call on getSocket(), run checkCommand()) + * + * @return true if there are unhandled queued messages + */ + bool hasQueuedMsgs(); + /** * Check if there is a command or config change on the command * session. If so, the appropriate handler is called if set. diff --git a/src/lib/config/tests/Makefile.am b/src/lib/config/tests/Makefile.am index 1cfda66e5..2c767478d 100644 --- a/src/lib/config/tests/Makefile.am +++ b/src/lib/config/tests/Makefile.am @@ -14,9 +14,9 @@ run_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES) run_unittests_LDFLAGS = $(AM_LDFLAGS) $(GTEST_LDFLAGS) run_unittests_LDADD = $(GTEST_LDADD) run_unittests_LDADD += $(top_builddir)/src/lib/exceptions/libexceptions.la +run_unittests_LDADD += libfake_session.la run_unittests_LDADD += $(top_builddir)/src/lib/config/libcfgclient.la run_unittests_LDADD += $(top_builddir)/src/lib/cc/data.o -run_unittests_LDADD += libfake_session.la if HAVE_BOOST_SYSTEM run_unittests_LDFLAGS += $(AM_LDFLAGS) $(BOOST_LDFLAGS) diff --git a/src/lib/config/tests/fake_session.cc b/src/lib/config/tests/fake_session.cc index 2a83bb18c..65eed3b40 100644 --- a/src/lib/config/tests/fake_session.cc +++ b/src/lib/config/tests/fake_session.cc @@ -153,6 +153,11 @@ Session::Session(io_service& io_service UNUSED_PARAM) Session::~Session() { } +bool +Session::connect() { + return true; +} + void Session::disconnect() { } @@ -188,7 +193,7 @@ Session::sendmsg(ElementPtr& env, ElementPtr& msg) { } bool -Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM) { +Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM, int seq UNUSED_PARAM) { //cout << "[XX] client asks for message " << endl; if (initial_messages && initial_messages->getType() == Element::list && @@ -202,7 +207,7 @@ Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM) { } bool -Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM) { +Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM, int seq UNUSED_PARAM) { //cout << "[XX] client asks for message and env" << endl; env = ElementPtr(); if (initial_messages && @@ -269,9 +274,9 @@ Session::group_sendmsg(ElementPtr msg, std::string group, bool Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg, - bool nonblock) + bool nonblock, int seq) { - return (recvmsg(envelope, msg, nonblock)); + return (recvmsg(envelope, msg, nonblock, seq)); } unsigned int @@ -282,5 +287,10 @@ Session::reply(ElementPtr& envelope, ElementPtr& newmsg) { return 1; } +bool +Session::hasQueuedMsgs() { + return false; +} + } } diff --git a/src/lib/config/tests/fake_session.h b/src/lib/config/tests/fake_session.h index 18ee92ef0..7c0d94a37 100644 --- a/src/lib/config/tests/fake_session.h +++ b/src/lib/config/tests/fake_session.h @@ -74,15 +74,16 @@ namespace isc { void startRead(boost::function read_callback); void establish(); + bool connect(); void disconnect(); void sendmsg(isc::data::ElementPtr& msg); void sendmsg(isc::data::ElementPtr& env, isc::data::ElementPtr& msg); bool recvmsg(isc::data::ElementPtr& msg, - bool nonblock = true); + bool nonblock = true, int seq = -1); bool recvmsg(isc::data::ElementPtr& env, isc::data::ElementPtr& msg, - bool nonblock = true); + bool nonblock = true, int seq = -1); void subscribe(std::string group, std::string instance = "*"); void unsubscribe(std::string group, @@ -93,9 +94,11 @@ namespace isc { std::string to = "*"); bool group_recvmsg(isc::data::ElementPtr& envelope, isc::data::ElementPtr& msg, - bool nonblock = true); + bool nonblock = true, + int seq = -1); unsigned int reply(isc::data::ElementPtr& envelope, isc::data::ElementPtr& newmsg); + bool hasQueuedMsgs(); }; } // namespace cc diff --git a/src/lib/python/isc/cc/session.py b/src/lib/python/isc/cc/session.py index 119a38456..56f303048 100644 --- a/src/lib/python/isc/cc/session.py +++ b/src/lib/python/isc/cc/session.py @@ -17,6 +17,7 @@ import sys import socket import struct import os +import threading import isc.cc.message @@ -33,6 +34,7 @@ class Session: self._sequence = 1 self._closed = False self._queue = [] + self._lock = threading.RLock() if port == 0: if 'ISC_MSGQ_PORT' in os.environ: @@ -64,56 +66,54 @@ class Session: self._closed = True def sendmsg(self, env, msg = None): - XXmsg = msg - XXenv = env - if self._closed: - raise SessionError("Session has been closed.") - if type(env) == dict: - env = isc.cc.message.to_wire(env) - if type(msg) == dict: - msg = isc.cc.message.to_wire(msg) - self._socket.setblocking(1) - length = 2 + len(env); - if msg: - length += len(msg) - self._socket.send(struct.pack("!I", length)) - self._socket.send(struct.pack("!H", len(env))) - self._socket.send(env) - if msg: - self._socket.send(msg) + with self._lock: + if self._closed: + raise SessionError("Session has been closed.") + if type(env) == dict: + env = isc.cc.message.to_wire(env) + if type(msg) == dict: + msg = isc.cc.message.to_wire(msg) + self._socket.setblocking(1) + length = 2 + len(env); + if msg: + length += len(msg) + self._socket.send(struct.pack("!I", length)) + self._socket.send(struct.pack("!H", len(env))) + self._socket.send(env) + if msg: + self._socket.send(msg) def recvmsg(self, nonblock = True, seq = None): - #print("[XX] queue len: " + str(len(self._queue))) - if len(self._queue) > 0: - if seq == None: - #print("[XX] return first") - return self._queue.pop(0) - else: + with self._lock: + if len(self._queue) > 0: i = 0; - #print("[XX] check rest") for env, msg in self._queue: - if "reply" in env and seq == env["reply"]: + if seq != None and "reply" in env and seq == env["reply"]: + return self._queue.pop(i) + elif seq == None and "reply" not in env: return self._queue.pop(i) else: i = i + 1 - #print("[XX] not found") - if self._closed: - raise SessionError("Session has been closed.") - data = self._receive_full_buffer(nonblock) - if data and len(data) > 2: - header_length = struct.unpack('>H', data[0:2])[0] - data_length = len(data) - 2 - header_length - if data_length > 0: - env = isc.cc.message.from_wire(data[2:header_length+2]) - msg = isc.cc.message.from_wire(data[header_length + 2:]) - if seq == None or "reply" in env and seq == env["reply"]: - return env, msg + if self._closed: + raise SessionError("Session has been closed.") + data = self._receive_full_buffer(nonblock) + if data and len(data) > 2: + header_length = struct.unpack('>H', data[0:2])[0] + data_length = len(data) - 2 - header_length + if data_length > 0: + env = isc.cc.message.from_wire(data[2:header_length+2]) + msg = isc.cc.message.from_wire(data[header_length + 2:]) + if (seq == None and "reply" not in env) or (seq != None and "reply" in env and seq == env["reply"]): + return env, msg + else: + tmp = None + if "reply" in env: + tmp = env["reply"] + self._queue.append((env,msg)) + return self.recvmsg(nonblock, seq) else: - self._queue.append((env,msg)) - return self.recvmsg(nonblock, seq) - else: - return isc.cc.message.from_wire(data[2:header_length+2]), None - return None, None + return isc.cc.message.from_wire(data[2:header_length+2]), None + return None, None def _receive_full_buffer(self, nonblock): if nonblock: @@ -130,7 +130,6 @@ class Session: return None if data == "": # server closed connection raise ProtocolError("Read of 0 bytes: connection closed") - self._recvbuffer += data if len(self._recvbuffer) < 4: return None @@ -182,6 +181,9 @@ class Session: }, isc.cc.message.to_wire(msg)) return seq + def has_queued_msgs(self): + return len(self._queue) > 0 + def group_recvmsg(self, nonblock = True, seq = None): env, msg = self.recvmsg(nonblock, seq) if env == None: diff --git a/src/lib/python/isc/cc/tests/session_test.py b/src/lib/python/isc/cc/tests/session_test.py index 413d1d391..5dae626bb 100644 --- a/src/lib/python/isc/cc/tests/session_test.py +++ b/src/lib/python/isc/cc/tests/session_test.py @@ -179,65 +179,82 @@ class testSession(unittest.TestCase): #print("sending message {'to': 'someone', 'reply': 1}, {'hello': 'a'}") # simply get the message without asking for a specific sequence number reply + self.assertFalse(sess.has_queued_msgs()) sess._socket.addrecv(b'\x00\x00\x00(\x00\x19Skan\x02to(\x07someone\x05reply&\x011Skan\x05hello(\x01a') env, msg = sess.recvmsg(False) self.assertEqual({'to': 'someone', 'reply': 1}, env) self.assertEqual({"hello": "a"}, msg) + self.assertFalse(sess.has_queued_msgs()) # simply get the message, asking for a specific sequence number reply + self.assertFalse(sess.has_queued_msgs()) sess._socket.addrecv(b'\x00\x00\x00(\x00\x19Skan\x02to(\x07someone\x05reply&\x011Skan\x05hello(\x01a') env, msg = sess.recvmsg(False, 1) self.assertEqual({'to': 'someone', 'reply': 1}, env) self.assertEqual({"hello": "a"}, msg) + self.assertFalse(sess.has_queued_msgs()) # ask for a differe sequence number reply (that doesn't exist) # then ask for the one that is there + self.assertFalse(sess.has_queued_msgs()) sess._socket.addrecv(b'\x00\x00\x00(\x00\x19Skan\x02to(\x07someone\x05reply&\x011Skan\x05hello(\x01a') env, msg = sess.recvmsg(False, 2) self.assertEqual(None, env) self.assertEqual(None, msg) + self.assertTrue(sess.has_queued_msgs()) env, msg = sess.recvmsg(False, 1) self.assertEqual({'to': 'someone', 'reply': 1}, env) self.assertEqual({"hello": "a"}, msg) + self.assertFalse(sess.has_queued_msgs()) # ask for a differe sequence number reply (that doesn't exist) # then ask for any message + self.assertFalse(sess.has_queued_msgs()) sess._socket.addrecv(b'\x00\x00\x00(\x00\x19Skan\x02to(\x07someone\x05reply&\x011Skan\x05hello(\x01a') env, msg = sess.recvmsg(False, 2) self.assertEqual(None, env) self.assertEqual(None, msg) + self.assertTrue(sess.has_queued_msgs()) env, msg = sess.recvmsg(False) self.assertEqual({'to': 'someone', 'reply': 1}, env) self.assertEqual({"hello": "a"}, msg) + self.assertFalse(sess.has_queued_msgs()) #print("sending message {'to': 'someone', 'reply': 1}, {'hello': 'a'}") # ask for a differe sequence number reply (that doesn't exist) # send a new message, ask for any message (get the first) # then ask for any message (get the second) + self.assertFalse(sess.has_queued_msgs()) sess._socket.addrecv(b'\x00\x00\x00(\x00\x19Skan\x02to(\x07someone\x05reply&\x011Skan\x05hello(\x01a') env, msg = sess.recvmsg(False, 2) self.assertEqual(None, env) self.assertEqual(None, msg) + self.assertTrue(sess.has_queued_msgs()) sess._socket.addrecv(b'\x00\x00\x00\x1f\x00\x10Skan\x02to(\x07someoneSkan\x05hello(\x01b') env, msg = sess.recvmsg(False) self.assertEqual({'to': 'someone', 'reply': 1}, env) self.assertEqual({"hello": "a"}, msg) + self.assertFalse(sess.has_queued_msgs()) env, msg = sess.recvmsg(False) self.assertEqual({'to': 'someone'}, env) self.assertEqual({"hello": "b"}, msg) + self.assertFalse(sess.has_queued_msgs()) # send a message, then one with specific reply value # ask for that specific message (get the second) # then ask for any message (get the first) + self.assertFalse(sess.has_queued_msgs()) sess._socket.addrecv(b'\x00\x00\x00\x1f\x00\x10Skan\x02to(\x07someoneSkan\x05hello(\x01b') sess._socket.addrecv(b'\x00\x00\x00(\x00\x19Skan\x02to(\x07someone\x05reply&\x011Skan\x05hello(\x01a') env, msg = sess.recvmsg(False, 1) self.assertEqual({'to': 'someone', 'reply': 1}, env) self.assertEqual({"hello": "a"}, msg) + self.assertTrue(sess.has_queued_msgs()) env, msg = sess.recvmsg(False) self.assertEqual({'to': 'someone'}, env) self.assertEqual({"hello": "b"}, msg) + self.assertFalse(sess.has_queued_msgs()) def test_next_sequence(self): sess = MySession() -- GitLab