Commit a3888f31 authored by Jelte Jansen's avatar Jelte Jansen

merge branches/trac58

(message queuing on cc channel)
see also http://bind10.isc.org/ticket/58


git-svn-id: svn://bind10.isc.org/svn/bind10/trunk@1870 e5f2f494-b856-4b98-b285-d166d9295462
parent e6e1366f
...@@ -664,6 +664,9 @@ run_server(const char* port, const bool use_ipv4, const bool use_ipv6, ...@@ -664,6 +664,9 @@ run_server(const char* port, const bool use_ipv4, const bool use_ipv6,
FD_SET(ss, &fds); FD_SET(ss, &fds);
++nfds; ++nfds;
if (srv->configSession()->hasQueuedMsgs()) {
srv->configSession()->checkCommand();
}
int n = select(nfds, &fds, NULL, NULL, NULL); int n = select(nfds, &fds, NULL, NULL, NULL);
if (n < 0) { if (n < 0) {
if (errno != EINTR) { if (errno != EINTR) {
......
...@@ -54,7 +54,7 @@ namespace cc { ...@@ -54,7 +54,7 @@ namespace cc {
class SessionImpl { class SessionImpl {
public: public:
SessionImpl() : sequence_(-1) {} SessionImpl() : sequence_(-1) { queue_ = Element::createFromString("[]"); }
virtual ~SessionImpl() {} virtual ~SessionImpl() {}
virtual void establish() = 0; virtual void establish() = 0;
virtual int getSocket() = 0; virtual int getSocket() = 0;
...@@ -63,9 +63,10 @@ public: ...@@ -63,9 +63,10 @@ public:
virtual size_t readDataLength() = 0; virtual size_t readDataLength() = 0;
virtual void readData(void* data, size_t datalen) = 0; virtual void readData(void* data, size_t datalen) = 0;
virtual void startRead(boost::function<void()> user_handler) = 0; virtual void startRead(boost::function<void()> user_handler) = 0;
int sequence_; // the next sequence number to use int sequence_; // the next sequence number to use
std::string lname_; std::string lname_;
ElementPtr queue_;
}; };
#ifdef HAVE_BOOST_SYSTEM #ifdef HAVE_BOOST_SYSTEM
...@@ -352,35 +353,35 @@ Session::sendmsg(ElementPtr& env, ElementPtr& msg) { ...@@ -352,35 +353,35 @@ Session::sendmsg(ElementPtr& env, ElementPtr& msg) {
} }
bool bool
Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM) { Session::recvmsg(ElementPtr& msg, bool nonblock, int seq) {
size_t length = impl_->readDataLength(); ElementPtr l_env;
return recvmsg(l_env, msg, nonblock, seq);
unsigned short header_length_net;
impl_->readData(&header_length_net, sizeof(header_length_net));
unsigned short header_length = ntohs(header_length_net);
if (header_length != length) {
isc_throw(SessionError, "Length parameters invalid: total=" << length
<< ", header=" << header_length);
}
std::vector<char> buffer(length);
impl_->readData(&buffer[0], length);
std::string wire = std::string(&buffer[0], length);
std::stringstream wire_stream;
wire_stream << wire;
msg = Element::fromWire(wire_stream, length);
return (true);
// XXXMLG handle non-block here, and return false for short reads
} }
bool bool
Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM) { Session::recvmsg(ElementPtr& env, ElementPtr& msg,
bool nonblock, int seq) {
size_t length = impl_->readDataLength(); size_t length = impl_->readDataLength();
ElementPtr l_env, l_msg;
if (hasQueuedMsgs()) {
ElementPtr q_el;
for (int i = 0; i < impl_->queue_->size(); i++) {
q_el = impl_->queue_->get(i);
if (( seq == -1 &&
!q_el->get(0)->contains("reply")
) || (
q_el->get(0)->contains("reply") &&
q_el->get(0)->get("reply")->intValue() == seq
)
) {
env = q_el->get(0);
msg = q_el->get(1);
impl_->queue_->remove(i);
return true;
}
}
}
unsigned short header_length_net; unsigned short header_length_net;
impl_->readData(&header_length_net, sizeof(header_length_net)); impl_->readData(&header_length_net, sizeof(header_length_net));
...@@ -400,13 +401,28 @@ Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM) { ...@@ -400,13 +401,28 @@ Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM) {
length - header_length); length - header_length);
std::stringstream header_wire_stream; std::stringstream header_wire_stream;
header_wire_stream << header_wire; header_wire_stream << header_wire;
env = Element::fromWire(header_wire_stream, header_length); l_env = Element::fromWire(header_wire_stream, header_length);
std::stringstream body_wire_stream; std::stringstream body_wire_stream;
body_wire_stream << body_wire; body_wire_stream << body_wire;
msg = Element::fromWire(body_wire_stream, length - header_length); l_msg = Element::fromWire(body_wire_stream, length - header_length);
if ((seq == -1 &&
return (true); !l_env->contains("reply")
) || (
l_env->contains("reply") &&
l_env->get("reply")->intValue() == seq
)
) {
env = l_env;
msg = l_msg;
return true;
} else {
ElementPtr q_el = Element::createFromString("[]");
q_el->add(l_env);
q_el->add(l_msg);
impl_->queue_->add(q_el);
return recvmsg(env, msg, nonblock, seq);
}
// XXXMLG handle non-block here, and return false for short reads // XXXMLG handle non-block here, and return false for short reads
} }
...@@ -432,47 +448,55 @@ Session::unsubscribe(std::string group, std::string instance) { ...@@ -432,47 +448,55 @@ Session::unsubscribe(std::string group, std::string instance) {
sendmsg(env); sendmsg(env);
} }
unsigned int int
Session::group_sendmsg(ElementPtr msg, std::string group, Session::group_sendmsg(ElementPtr msg, std::string group,
std::string instance, std::string to) std::string instance, std::string to)
{ {
ElementPtr env = Element::create(std::map<std::string, ElementPtr>()); ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
int nseq = ++impl_->sequence_;
env->set("type", Element::create("send")); env->set("type", Element::create("send"));
env->set("from", Element::create(impl_->lname_)); env->set("from", Element::create(impl_->lname_));
env->set("to", Element::create(to)); env->set("to", Element::create(to));
env->set("group", Element::create(group)); env->set("group", Element::create(group));
env->set("instance", Element::create(instance)); env->set("instance", Element::create(instance));
env->set("seq", Element::create(impl_->sequence_)); env->set("seq", Element::create(nseq));
//env->set("msg", Element::create(msg->toWire())); //env->set("msg", Element::create(msg->toWire()));
sendmsg(env, msg); sendmsg(env, msg);
return nseq;
return (++impl_->sequence_);
} }
bool bool
Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg, Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg,
bool nonblock) bool nonblock, int seq)
{ {
return (recvmsg(envelope, msg, nonblock)); return (recvmsg(envelope, msg, nonblock, seq));
} }
unsigned int int
Session::reply(ElementPtr& envelope, ElementPtr& newmsg) { Session::reply(ElementPtr& envelope, ElementPtr& newmsg) {
ElementPtr env = Element::create(std::map<std::string, ElementPtr>()); ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
int nseq = ++impl_->sequence_;
env->set("type", Element::create("send")); env->set("type", Element::create("send"));
env->set("from", Element::create(impl_->lname_)); env->set("from", Element::create(impl_->lname_));
env->set("to", Element::create(envelope->get("from")->stringValue())); env->set("to", Element::create(envelope->get("from")->stringValue()));
env->set("group", Element::create(envelope->get("group")->stringValue())); env->set("group", Element::create(envelope->get("group")->stringValue()));
env->set("instance", Element::create(envelope->get("instance")->stringValue())); env->set("instance", Element::create(envelope->get("instance")->stringValue()));
env->set("seq", Element::create(impl_->sequence_)); env->set("seq", Element::create(nseq));
env->set("reply", Element::create(envelope->get("seq")->intValue())); env->set("reply", Element::create(envelope->get("seq")->intValue()));
sendmsg(env, newmsg); sendmsg(env, newmsg);
return (++impl_->sequence_); return nseq;
} }
bool
Session::hasQueuedMsgs()
{
return (impl_->queue_->size() > 0);
}
} }
} }
...@@ -65,23 +65,27 @@ namespace isc { ...@@ -65,23 +65,27 @@ namespace isc {
void sendmsg(isc::data::ElementPtr& env, void sendmsg(isc::data::ElementPtr& env,
isc::data::ElementPtr& msg); isc::data::ElementPtr& msg);
bool recvmsg(isc::data::ElementPtr& msg, bool recvmsg(isc::data::ElementPtr& msg,
bool nonblock = true); bool nonblock = true,
int seq = -1);
bool recvmsg(isc::data::ElementPtr& env, bool recvmsg(isc::data::ElementPtr& env,
isc::data::ElementPtr& msg, isc::data::ElementPtr& msg,
bool nonblock = true); bool nonblock = true,
int seq = -1);
void subscribe(std::string group, void subscribe(std::string group,
std::string instance = "*"); std::string instance = "*");
void unsubscribe(std::string group, void unsubscribe(std::string group,
std::string instance = "*"); std::string instance = "*");
unsigned int group_sendmsg(isc::data::ElementPtr msg, int group_sendmsg(isc::data::ElementPtr msg,
std::string group, std::string group,
std::string instance = "*", std::string instance = "*",
std::string to = "*"); std::string to = "*");
bool group_recvmsg(isc::data::ElementPtr& envelope, bool group_recvmsg(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& msg, isc::data::ElementPtr& msg,
bool nonblock = true); bool nonblock = true,
unsigned int reply(isc::data::ElementPtr& envelope, int seq = -1);
int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg); isc::data::ElementPtr& newmsg);
bool hasQueuedMsgs();
}; };
} // namespace cc } // namespace cc
} // namespace isc } // namespace isc
......
...@@ -248,8 +248,8 @@ ModuleCCSession::init( ...@@ -248,8 +248,8 @@ ModuleCCSession::init(
//session_.subscribe("statistics", "*"); //session_.subscribe("statistics", "*");
// send the data specification // send the data specification
ElementPtr spec_msg = createCommand("module_spec", module_specification_.getFullSpec()); ElementPtr spec_msg = createCommand("module_spec", module_specification_.getFullSpec());
session_.group_sendmsg(spec_msg, "ConfigManager"); unsigned int seq = session_.group_sendmsg(spec_msg, "ConfigManager");
session_.group_recvmsg(env, answer, false); session_.group_recvmsg(env, answer, false, seq);
int rcode; int rcode;
ElementPtr err = parseAnswer(rcode, answer); ElementPtr err = parseAnswer(rcode, answer);
if (rcode != 0) { if (rcode != 0) {
...@@ -260,8 +260,8 @@ ModuleCCSession::init( ...@@ -260,8 +260,8 @@ ModuleCCSession::init(
// get any stored configuration from the manager // get any stored configuration from the manager
if (config_handler_) { if (config_handler_) {
ElementPtr cmd = Element::createFromString("{ \"command\": [\"get_config\", {\"module_name\":\"" + module_name_ + "\"} ] }"); ElementPtr cmd = Element::createFromString("{ \"command\": [\"get_config\", {\"module_name\":\"" + module_name_ + "\"} ] }");
session_.group_sendmsg(cmd, "ConfigManager"); seq = session_.group_sendmsg(cmd, "ConfigManager");
session_.group_recvmsg(env, answer, false); session_.group_recvmsg(env, answer, false, seq);
ElementPtr new_config = parseAnswer(rcode, answer); ElementPtr new_config = parseAnswer(rcode, answer);
if (rcode == 0) { if (rcode == 0) {
handleConfigUpdate(new_config); handleConfigUpdate(new_config);
...@@ -310,6 +310,12 @@ ModuleCCSession::getSocket() ...@@ -310,6 +310,12 @@ ModuleCCSession::getSocket()
return (session_.getSocket()); return (session_.getSocket());
} }
bool
ModuleCCSession::hasQueuedMsgs()
{
return (session_.hasQueuedMsgs());
}
int int
ModuleCCSession::checkCommand() ModuleCCSession::checkCommand()
{ {
...@@ -365,8 +371,8 @@ ModuleCCSession::addRemoteConfig(const std::string& spec_file_name) ...@@ -365,8 +371,8 @@ ModuleCCSession::addRemoteConfig(const std::string& spec_file_name)
ElementPtr env, answer; ElementPtr env, answer;
int rcode; int rcode;
session_.group_sendmsg(cmd, "ConfigManager"); unsigned int seq = session_.group_sendmsg(cmd, "ConfigManager");
session_.group_recvmsg(env, answer, false); session_.group_recvmsg(env, answer, false, seq);
ElementPtr new_config = parseAnswer(rcode, answer); ElementPtr new_config = parseAnswer(rcode, answer);
if (rcode == 0) { if (rcode == 0) {
rmod_config.setLocalConfig(new_config); rmod_config.setLocalConfig(new_config);
......
...@@ -149,6 +149,16 @@ public: ...@@ -149,6 +149,16 @@ public:
*/ */
int getSocket(); int getSocket();
/**
* Optional optimization for checkCommand loop; returns true
* if there are unhandled queued messages in the cc session.
* (if either this is true or there is data on the socket found
* by the select() call on getSocket(), run checkCommand())
*
* @return true if there are unhandled queued messages
*/
bool hasQueuedMsgs();
/** /**
* Check if there is a command or config change on the command * Check if there is a command or config change on the command
* session. If so, the appropriate handler is called if set. * session. If so, the appropriate handler is called if set.
......
...@@ -14,9 +14,9 @@ run_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES) ...@@ -14,9 +14,9 @@ run_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES)
run_unittests_LDFLAGS = $(AM_LDFLAGS) $(GTEST_LDFLAGS) run_unittests_LDFLAGS = $(AM_LDFLAGS) $(GTEST_LDFLAGS)
run_unittests_LDADD = $(GTEST_LDADD) run_unittests_LDADD = $(GTEST_LDADD)
run_unittests_LDADD += $(top_builddir)/src/lib/exceptions/libexceptions.la run_unittests_LDADD += $(top_builddir)/src/lib/exceptions/libexceptions.la
run_unittests_LDADD += libfake_session.la
run_unittests_LDADD += $(top_builddir)/src/lib/config/libcfgclient.la run_unittests_LDADD += $(top_builddir)/src/lib/config/libcfgclient.la
run_unittests_LDADD += $(top_builddir)/src/lib/cc/data.o run_unittests_LDADD += $(top_builddir)/src/lib/cc/data.o
run_unittests_LDADD += libfake_session.la
if HAVE_BOOST_SYSTEM if HAVE_BOOST_SYSTEM
run_unittests_LDFLAGS += $(AM_LDFLAGS) $(BOOST_LDFLAGS) run_unittests_LDFLAGS += $(AM_LDFLAGS) $(BOOST_LDFLAGS)
......
...@@ -153,6 +153,11 @@ Session::Session(io_service& io_service UNUSED_PARAM) ...@@ -153,6 +153,11 @@ Session::Session(io_service& io_service UNUSED_PARAM)
Session::~Session() { Session::~Session() {
} }
bool
Session::connect() {
return true;
}
void void
Session::disconnect() { Session::disconnect() {
} }
...@@ -188,7 +193,7 @@ Session::sendmsg(ElementPtr& env, ElementPtr& msg) { ...@@ -188,7 +193,7 @@ Session::sendmsg(ElementPtr& env, ElementPtr& msg) {
} }
bool bool
Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM) { Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM, int seq UNUSED_PARAM) {
//cout << "[XX] client asks for message " << endl; //cout << "[XX] client asks for message " << endl;
if (initial_messages && if (initial_messages &&
initial_messages->getType() == Element::list && initial_messages->getType() == Element::list &&
...@@ -202,7 +207,7 @@ Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM) { ...@@ -202,7 +207,7 @@ Session::recvmsg(ElementPtr& msg, bool nonblock UNUSED_PARAM) {
} }
bool bool
Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM) { Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock UNUSED_PARAM, int seq UNUSED_PARAM) {
//cout << "[XX] client asks for message and env" << endl; //cout << "[XX] client asks for message and env" << endl;
env = ElementPtr(); env = ElementPtr();
if (initial_messages && if (initial_messages &&
...@@ -269,9 +274,9 @@ Session::group_sendmsg(ElementPtr msg, std::string group, ...@@ -269,9 +274,9 @@ Session::group_sendmsg(ElementPtr msg, std::string group,
bool bool
Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg, Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg,
bool nonblock) bool nonblock, int seq)
{ {
return (recvmsg(envelope, msg, nonblock)); return (recvmsg(envelope, msg, nonblock, seq));
} }
unsigned int unsigned int
...@@ -282,5 +287,10 @@ Session::reply(ElementPtr& envelope, ElementPtr& newmsg) { ...@@ -282,5 +287,10 @@ Session::reply(ElementPtr& envelope, ElementPtr& newmsg) {
return 1; return 1;
} }
bool
Session::hasQueuedMsgs() {
return false;
}
} }
} }
...@@ -74,15 +74,16 @@ namespace isc { ...@@ -74,15 +74,16 @@ namespace isc {
void startRead(boost::function<void()> read_callback); void startRead(boost::function<void()> read_callback);
void establish(); void establish();
bool connect();
void disconnect(); void disconnect();
void sendmsg(isc::data::ElementPtr& msg); void sendmsg(isc::data::ElementPtr& msg);
void sendmsg(isc::data::ElementPtr& env, void sendmsg(isc::data::ElementPtr& env,
isc::data::ElementPtr& msg); isc::data::ElementPtr& msg);
bool recvmsg(isc::data::ElementPtr& msg, bool recvmsg(isc::data::ElementPtr& msg,
bool nonblock = true); bool nonblock = true, int seq = -1);
bool recvmsg(isc::data::ElementPtr& env, bool recvmsg(isc::data::ElementPtr& env,
isc::data::ElementPtr& msg, isc::data::ElementPtr& msg,
bool nonblock = true); bool nonblock = true, int seq = -1);
void subscribe(std::string group, void subscribe(std::string group,
std::string instance = "*"); std::string instance = "*");
void unsubscribe(std::string group, void unsubscribe(std::string group,
...@@ -93,9 +94,11 @@ namespace isc { ...@@ -93,9 +94,11 @@ namespace isc {
std::string to = "*"); std::string to = "*");
bool group_recvmsg(isc::data::ElementPtr& envelope, bool group_recvmsg(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& msg, isc::data::ElementPtr& msg,
bool nonblock = true); bool nonblock = true,
int seq = -1);
unsigned int reply(isc::data::ElementPtr& envelope, unsigned int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg); isc::data::ElementPtr& newmsg);
bool hasQueuedMsgs();
}; };
} // namespace cc } // namespace cc
......
...@@ -17,6 +17,7 @@ import sys ...@@ -17,6 +17,7 @@ import sys
import socket import socket
import struct import struct
import os import os
import threading
import isc.cc.message import isc.cc.message
...@@ -33,6 +34,7 @@ class Session: ...@@ -33,6 +34,7 @@ class Session:
self._sequence = 1 self._sequence = 1
self._closed = False self._closed = False
self._queue = [] self._queue = []
self._lock = threading.RLock()
if port == 0: if port == 0:
if 'ISC_MSGQ_PORT' in os.environ: if 'ISC_MSGQ_PORT' in os.environ:
...@@ -64,56 +66,54 @@ class Session: ...@@ -64,56 +66,54 @@ class Session:
self._closed = True self._closed = True
def sendmsg(self, env, msg = None): def sendmsg(self, env, msg = None):
XXmsg = msg with self._lock:
XXenv = env if self._closed:
if self._closed: raise SessionError("Session has been closed.")
raise SessionError("Session has been closed.") if type(env) == dict:
if type(env) == dict: env = isc.cc.message.to_wire(env)
env = isc.cc.message.to_wire(env) if type(msg) == dict:
if type(msg) == dict: msg = isc.cc.message.to_wire(msg)
msg = isc.cc.message.to_wire(msg) self._socket.setblocking(1)
self._socket.setblocking(1) length = 2 + len(env);
length = 2 + len(env); if msg:
if msg: length += len(msg)
length += len(msg) self._socket.send(struct.pack("!I", length))
self._socket.send(struct.pack("!I", length)) self._socket.send(struct.pack("!H", len(env)))
self._socket.send(struct.pack("!H", len(env))) self._socket.send(env)
self._socket.send(env) if msg:
if msg: self._socket.send(msg)
self._socket.send(msg)
def recvmsg(self, nonblock = True, seq = None): def recvmsg(self, nonblock = True, seq = None):
#print("[XX] queue len: " + str(len(self._queue))) with self._lock:
if len(self._queue) > 0: if len(self._queue) > 0:
if seq == None:
#print("[XX] return first")
return self._queue.pop(0)
else:
i = 0; i = 0;
#print("[XX] check rest")
for env, msg in self._queue: for env, msg in self._queue:
if "reply" in env and seq == env["reply"]: if seq != None and "reply" in env and seq == env["reply"]: