Unverified Commit 0e0d28e6 authored by Michal 'vorner' Vaner's avatar Michal 'vorner' Vaner
Browse files

Merge #1914

This is the asynchronous read on ModuleCCSession.
parents 42d8e0ec 9915ecdc
......@@ -601,6 +601,11 @@ ModuleCCSession::checkCommand() {
ConstElementPtr cmd, routing, data;
if (session_.group_recvmsg(routing, data, true)) {
// In case the message is wanted asynchronously, it gets used.
if (checkAsyncRecv(routing, data)) {
return (0);
}
/* ignore result messages (in case we're out of sync, to prevent
* pingpongs */
if (data->getType() != Element::map || data->contains("result")) {
......@@ -764,5 +769,95 @@ ModuleCCSession::sendStopping() {
session_.group_sendmsg(cmd, "ConfigManager");
}
class ModuleCCSession::AsyncRecvRequest {
public: // Everything is public here, as the definition is hidden anyway
AsyncRecvRequest(const AsyncRecvCallback& cb, const string& rcp, int sq,
bool reply) :
callback(cb),
recipient(rcp),
seq(sq),
is_reply(reply)
{}
const AsyncRecvCallback callback;
const string recipient;
const int seq;
const bool is_reply;
};
ModuleCCSession::AsyncRecvRequestID
ModuleCCSession::groupRecvMsgAsync(const AsyncRecvCallback& callback,
bool is_reply, int seq,
const string& recipient) {
// This just stores the request, the handling is done in checkCommand()
// push_back would be simpler, but it does not return the iterator we need
return (async_recv_requests_.insert(async_recv_requests_.end(),
AsyncRecvRequest(callback, recipient,
seq, is_reply)));
}
bool
ModuleCCSession::checkAsyncRecv(const ConstElementPtr& envelope,
const ConstElementPtr& msg)
{
for (AsyncRecvRequestID request(async_recv_requests_.begin());
request != async_recv_requests_.end(); ++request) {
// Just go through all the requests and look for a matching one
if (requestMatch(*request, envelope)) {
// We want the request to be still alive at the time we
// call the callback. But we need to remove it on an exception
// too, so we use the class. If just C++ had the finally keyword.
class RequestDeleter {
public:
RequestDeleter(AsyncRecvRequests& requests,
AsyncRecvRequestID& request) :
requests_(requests),
request_(request)
{ }
~RequestDeleter() {
requests_.erase(request_);
}
private:
AsyncRecvRequests& requests_;
AsyncRecvRequestID& request_;
};
RequestDeleter deleter(async_recv_requests_, request);
// Call the callback
request->callback(envelope, msg, request);
return (true);
}
}
return (false);
}
bool
ModuleCCSession::requestMatch(const AsyncRecvRequest& request,
const ConstElementPtr& envelope) const
{
if (request.is_reply != envelope->contains("reply")) {
// Wrong type of message
return (false);
}
if (request.is_reply &&
(request.seq == -1 ||
request.seq == envelope->get("reply")->intValue())) {
// This is the correct reply
return (true);
}
if (!request.is_reply &&
(request.recipient.empty() ||
request.recipient == envelope->get("group")->stringValue())) {
// This is the correct command
return (true);
}
// If nothing from the above, we don't want it
return (false);
}
void
ModuleCCSession::cancelAsyncRecv(const AsyncRecvRequestID& id) {
async_recv_requests_.erase(id);
}
}
}
......@@ -15,13 +15,16 @@
#ifndef __CCSESSION_H
#define __CCSESSION_H 1
#include <string>
#include <config/config_data.h>
#include <config/module_spec.h>
#include <cc/session.h>
#include <cc/data.h>
#include <string>
#include <list>
#include <boost/function.hpp>
namespace isc {
namespace config {
......@@ -358,15 +361,140 @@ public:
return (session_.group_recvmsg(envelope, msg, nonblock, seq));
};
/// \brief Forward declaration of internal data structure.
///
/// This holds information about one asynchronous request to receive
/// a message. It is declared as public to allow declaring other derived
/// types, but without showing the internal representation.
class AsyncRecvRequest;
/// \brief List of all requests for asynchronous reads.
typedef std::list<AsyncRecvRequest> AsyncRecvRequests;
/// \brief Identifier of single request for asynchronous read.
typedef AsyncRecvRequests::iterator AsyncRecvRequestID;
/// \brief Callback which is called when an asynchronous receive finishes.
///
/// This is the callback used by groupRecvMsgAsync() function. It is called
/// when a matching message arrives. It receives following parameters when
/// called:
/// - The envelope of the message
/// - The message itself
/// - The ID of the request, as returned by corresponding groupRecvMsgAsync
/// call.
///
/// It is possible to throw exceptions from the callback, but they will not
/// be caught and they will get propagated out through the checkCommand()
/// call. This, if not handled on higher level, will likely terminate the
/// application. However, the ModuleCCSession internals will be in
/// well-defined state after the call (both the callback and the message
/// will be removed from the queues as already called).
typedef boost::function3<void, const isc::data::ConstElementPtr&,
const isc::data::ConstElementPtr&,
const AsyncRecvRequestID&>
AsyncRecvCallback;
/// \brief Receive a message from the CC session asynchronously.
///
/// This registers a callback which is called when a matching message
/// is received. This message returns immediately.
///
/// Once a matching message arrives, the callback is called with the
/// envelope of the message, the message itself and the result of this
/// function call (which might be useful for identifying which of many
/// events the recipient is waiting for this is). This makes the callback
/// used and is not called again even if a message that would match
/// arrives later (this is a single-shot callback).
///
/// The callback is never called from within this function. Even if there
/// are queued messages, the callback would be called once checkCommand()
/// is invoked (possibly from start() or the constructor).
///
/// The matching is as follows. If is_reply is true, only replies are
/// considered. In that case, if seq is -1, any reply is accepted. If
/// it is something else than -1, only the reply with matching seq is
/// taken. This may be used to receive replies to commands
/// asynchronously.
///
/// In case the is_reply is false, the function looks for command messages.
/// The seq parameter is ignored, but the recipient one is considered. If
/// it is an empty string, any command is taken. If it is non-empty, only
/// commands addressed to the recipient channel (eg. group - instance is
/// ignored for now) are taken. This can be used to receive foreign commands
/// or notifications. In such case, it might be desirable to call the
/// groupRecvMsgAsync again from within the callback, to receive any future
/// commands or events of the same type.
///
/// The interaction with other receiving functions is slightly complicated.
/// The groupRecvMsg call takes precedence. If the message matches its
/// parameters, it steals the message and no callback matching it as well
/// is called. Then, all the queued asynchronous receives are considered,
/// with the oldest active ones taking precedence (they work as FIFO).
/// If none of them matches, generic command and config handling takes
/// place. If it is not handled by that, the message is dropped. However,
/// it is better if there's just one place that wants to receive each given
/// message.
///
/// \exception std::bad_alloc if there isn't enough memory to store the
/// callback.
/// \param callback is the function to be called when a matching message
/// arrives.
/// \param is_reply specifies if the desired message should be a reply or
/// a command.
/// \param seq specifies the reply sequence number in case a reply is
/// desired. The default -1 means any reply is OK.
/// \param recipient is the CC channel to which the command should be
/// addressed to match (in case is_reply is false). Empty means any
/// command is good one.
/// \return An identifier of the request. This will be passed to the
/// callback or can be used to cancel the request by cancelAsyncRecv.
/// \todo Decide what to do with instance and what was it meant for anyway.
AsyncRecvRequestID groupRecvMsgAsync(const AsyncRecvCallback& callback,
bool is_reply, int seq = -1,
const std::string& recipient =
std::string());
/// \brief Removes yet unused request for asynchronous receive.
///
/// This function cancels a request previously queued by
/// groupRecvMsgAsync(). You may use it only before the callback was
/// already triggered. If you call it with an ID of callback that
/// already happened or was already canceled, the behaviour is undefined
/// (but something like a crash is very likely, as the function removes
/// an item from a list and this would be removing it from a list that
/// does not contain the item).
///
/// It is important to cancel requests that are no longer going to happen
/// for some reason, as the request would occupy memory forever.
///
/// \param id The id of request as returned by groupRecvMsgAsync.
void cancelAsyncRecv(const AsyncRecvRequestID& id);
private:
ModuleSpec readModuleSpecification(const std::string& filename);
void startCheck();
void sendStopping();
/// \brief Check if the message is wanted by asynchronous read
///
/// It checks if any of the previously queued requests match
/// the message. If so, the callback is dispatched and removed.
///
/// \param envelope The envelope of the message.
/// \param msg The actual message data.
/// \return True if the message was used for a callback, false
/// otherwise.
bool checkAsyncRecv(const data::ConstElementPtr& envelope,
const data::ConstElementPtr& msg);
/// \brief Checks if a message with this envelope matches the request
bool requestMatch(const AsyncRecvRequest& request,
const data::ConstElementPtr& envelope) const;
bool started_;
std::string module_name_;
isc::cc::AbstractSession& session_;
ModuleSpec module_specification_;
AsyncRecvRequests async_recv_requests_;
isc::data::ConstElementPtr handleConfigUpdate(
isc::data::ConstElementPtr new_config);
......
......@@ -27,11 +27,13 @@
#include <log/logger_name.h>
#include <boost/scoped_ptr.hpp>
#include <boost/bind.hpp>
using namespace isc::data;
using namespace isc::config;
using namespace isc::cc;
using namespace std;
using namespace boost;
namespace {
std::string
......@@ -497,10 +499,10 @@ TEST_F(CCSessionTest, remoteConfig) {
const size_t qsize(session.getMsgQueue()->size());
EXPECT_TRUE(session.getMsgQueue()->get(qsize - 2)->equals(*el(
"[ \"ConfigManager\", \"*\", { \"command\": ["
"\"get_module_spec\", { \"module_name\": \"Spec2\" } ] } ]")));
"\"get_module_spec\", { \"module_name\": \"Spec2\" } ] }, -1 ]")));
EXPECT_TRUE(session.getMsgQueue()->get(qsize - 1)->equals(*el(
"[ \"ConfigManager\", \"*\", { \"command\": [ \"get_config\","
"{ \"module_name\": \"Spec2\" } ] } ]")));
"{ \"module_name\": \"Spec2\" } ] }, -1 ]")));
EXPECT_EQ("Spec2", module_name);
// Since we returned an empty local config above, the default value
// for "item1", which is 1, should be used.
......@@ -709,13 +711,286 @@ TEST_F(CCSessionTest, doubleStartWithAddRemoteConfig) {
FakeSession::DoubleRead);
}
namespace {
/// \brief Test fixture for asynchronous receiving of messages.
///
/// This is an extension to the CCSessionTest. It would be possible to add
/// the functionality to the CCSessionTest, but it is going to be used
/// only by few tests and is non-trivial, so it is placed to a separate
/// sub-class.
class AsyncReceiveCCSessionTest : public CCSessionTest {
protected:
AsyncReceiveCCSessionTest() :
mccs_(ccspecfile("spec29.spec"), session, NULL, NULL, false, false),
msg_(el("{\"result\": [0]}")),
next_flag_(0)
{
// This is just to make sure the messages get through the fake
// session.
session.subscribe("test group");
session.subscribe("other group");
session.subscribe("<ignored>");
// Get rid of all unrelated stray messages
while (session.getMsgQueue()->size() > 0) {
session.getMsgQueue()->remove(0);
}
}
/// \brief Convenience function to queue a request to get a command
/// message.
ModuleCCSession::AsyncRecvRequestID
registerCommand(const string& recipient)
{
return (mccs_.groupRecvMsgAsync(
bind(&AsyncReceiveCCSessionTest::callback, this, next_flag_ ++, _1,
_2, _3), false, -1, recipient));
}
/// \brief Convenience function to queue a request to get a reply
/// message.
ModuleCCSession::AsyncRecvRequestID
registerReply(int seq)
{
return (mccs_.groupRecvMsgAsync(
bind(&AsyncReceiveCCSessionTest::callback, this, next_flag_ ++, _1,
_2, _3), true, seq));
}
/// \brief Check the next called callback was with this flag
void called(int flag) {
ASSERT_FALSE(called_.empty());
EXPECT_EQ(flag, *called_.begin());
called_.pop_front();
}
/// \brief Checks that no more callbacks were called.
void nothingCalled() {
EXPECT_TRUE(called_.empty());
}
/// \brief The tested session.
ModuleCCSession mccs_;
/// \brief The value of message on the last called callback.
ConstElementPtr last_msg_;
/// \brief A message that can be used
ConstElementPtr msg_;
// Shared part of the simpleCommand and similar tests.
void commandTest(const string& group) {
// Push the message inside
session.addMessage(msg_, "test group", "<unused>");
EXPECT_TRUE(mccs_.hasQueuedMsgs());
// Register the callback
registerCommand(group);
// But the callback should not be called yet
// (even if the message is there).
nothingCalled();
// But when we call the checkCommand(), it should be called.
mccs_.checkCommand();
called(0);
EXPECT_EQ(msg_, last_msg_);
// But only once
nothingCalled();
// And the message should be eaten
EXPECT_FALSE(mccs_.hasQueuedMsgs());
// The callback should have been eaten as well, inserting another
// message will not invoke it again
session.addMessage(msg_, "test group", "<unused>");
mccs_.checkCommand();
nothingCalled();
}
/// \brief Shared part of the simpleResponse and wildcardResponse tests.
void responseTest(int seq) {
// Push the message inside
session.addMessage(msg_, "<ignored>", "<unused>", 1);
EXPECT_TRUE(mccs_.hasQueuedMsgs());
// Register the callback
registerReply(seq);
// But the callback should not be called yet
// (even if the message is there).
nothingCalled();
// But when we call the checkCommand(), it should be called.
mccs_.checkCommand();
called(0);
EXPECT_EQ(msg_, last_msg_);
// But only once
nothingCalled();
// And the message should be eaten
EXPECT_FALSE(mccs_.hasQueuedMsgs());
// The callback should have been eaten as well, inserting another
// message will not invoke it again
session.addMessage(msg_, "test group", "<unused>");
mccs_.checkCommand();
nothingCalled();
}
/// \brief Shared part of the noMatch* tests
void noMatchTest(int seq, int wanted_seq, bool is_reply) {
// Push the message inside
session.addMessage(msg_, "other group", "<unused>", seq);
EXPECT_TRUE(mccs_.hasQueuedMsgs());
// Register the callback
if (is_reply) {
registerReply(wanted_seq);
} else {
registerCommand("test group");
}
// But the callback should not be called yet
// (even if the message is there).
nothingCalled();
// And even not now, because it does not match.
mccs_.checkCommand();
nothingCalled();
// And the message should be eaten by the checkCommand
EXPECT_FALSE(mccs_.hasQueuedMsgs());
}
private:
/// \brief The next flag to be handed out
int next_flag_;
/// \brief Flags of callbacks already called (as FIFO)
list<int> called_;
/// \brief This is the callback registered to the tested groupRecvMsgAsync
/// function.
void callback(int store_flag, const ConstElementPtr&,
const ConstElementPtr& msg,
const ModuleCCSession::AsyncRecvRequestID&)
{
called_.push_back(store_flag);
last_msg_ = msg;
}
};
// Test we can receive a command, without anything fancy yet
TEST_F(AsyncReceiveCCSessionTest, simpleCommand) {
commandTest("test group");
}
// Test we can receive a "wildcard" command - without specifying the
// group to subscribe to. Very similar to simpleCommand test.
TEST_F(AsyncReceiveCCSessionTest, wildcardCommand) {
commandTest("");
}
// Very similar to simpleCommand, but with a response message
TEST_F(AsyncReceiveCCSessionTest, simpleResponse) {
responseTest(1);
}
// Matching a response message with wildcard
TEST_F(AsyncReceiveCCSessionTest, wildcardResponse) {
responseTest(-1);
}
// Check that a wrong command message is not matched
TEST_F(AsyncReceiveCCSessionTest, noMatchCommand) {
noMatchTest(-1, -1, false);
}
// Check that a wrong response message is not matched
TEST_F(AsyncReceiveCCSessionTest, noMatchResponse) {
noMatchTest(2, 3, true);
}
// Check that a command will not match on a reply check and vice versa
TEST_F(AsyncReceiveCCSessionTest, noMatchResponseAgainstCommand) {
// Send a command and check it is not matched as a response
noMatchTest(-1, -1, true);
}
TEST_F(AsyncReceiveCCSessionTest, noMatchCommandAgainstResponse) {
noMatchTest(2, -1, false);
}
// We check for command several times before the message actually arrives.
TEST_F(AsyncReceiveCCSessionTest, delayedCallback) {
// First, register the callback
registerReply(1);
// And see it is not called, because the message is not there yet
EXPECT_FALSE(mccs_.hasQueuedMsgs());
for (size_t i(0); i < 100; ++ i) {
mccs_.checkCommand();
EXPECT_FALSE(mccs_.hasQueuedMsgs());
nothingCalled();
}
// Now the message finally arrives
session.addMessage(msg_, "<ignored>", "<unused>", 1);
EXPECT_TRUE(mccs_.hasQueuedMsgs());
// And now, the callback is happily triggered.
mccs_.checkCommand();
called(0);
EXPECT_EQ(msg_, last_msg_);
// But only once
nothingCalled();
}
// See that if we put multiple messages inside, and request some callbacks,
// the callbacks are called in the order of messages, not in the order they
// were registered.
TEST_F(AsyncReceiveCCSessionTest, outOfOrder) {
// First, put some messages there
session.addMessage(msg_, "<ignored>", "<unused>", 1);
session.addMessage(msg_, "test group", "<unused>");
session.addMessage(msg_, "other group", "<unused>");
session.addMessage(msg_, "<ignored>", "<unused>", 2);
session.addMessage(msg_, "<ignored>", "<unused>", 3);
session.addMessage(msg_, "<ignored>", "<unused>", 4);
// Now register some callbacks
registerReply(13); // Will not be called
registerCommand("other group"); // Matches 3rd message
registerReply(2); // Matches 4th message
registerCommand(""); // Matches the 2nd message
registerCommand("test group"); // Will not be called
registerReply(-1); // Matches the 1st message
registerReply(-1); // Matches the 5th message
// Process all messages there
while (mccs_.hasQueuedMsgs()) {
mccs_.checkCommand();
}
// These are the numbers of callbacks in the order of messages
called(5);
called(3);
called(1);
called(2);
called(6);
// The last message doesn't trigger anything, so nothing more is called
nothingCalled();
}
// We first add, then remove the callback again and check that nothing is
// matched.
TEST_F(AsyncReceiveCCSessionTest, cancel) {
// Add the callback
ModuleCCSession::AsyncRecvRequestID request(registerReply(1));
// Add corresponding message
session.addMessage(msg_, "<ignored>", "<unused>", 1);
EXPECT_TRUE(mccs_.hasQueuedMsgs());
// And now, remove the callback again
mccs_.cancelAsyncRecv(request);
// And see that Nothing Happens(TM)
mccs_.checkCommand();
EXPECT_FALSE(mccs_.hasQueuedMsgs());
nothingCalled();
}
// We add multiple requests and cancel only one of them to see the rest
// is unaffected.
TEST_F(AsyncReceiveCCSessionTest, cancelSome) {
// Register few callbacks
registerReply(1);
ModuleCCSession::AsyncRecvRequestID request(registerCommand(""));
registerCommand("test group");
// Put some messages there
session.addMessage(msg_, "test group", "<unused>");
session.addMessage(msg_, "<ignored>", "<unused>", 1);
// Cancel the second callback. Therefore the first message will be matched
// by the third callback, not by the second.
mccs_.cancelAsyncRecv(request);
// Now, process the messages
mccs_.checkCommand();
mccs_.checkCommand();
// And see how they matched
called(2);
called(0);
nothingCalled();
}
void doRelatedLoggersTest(const char* input, const char* expected) {
ConstElementPtr all_conf = isc::data::Element::fromJSON(input);
ConstElementPtr expected_conf = isc::data::Element::fromJSON(expected);
EXPECT_EQ(*expected_conf, *isc::config::getRelatedLoggers(all_conf));
}
} // end anonymous namespace
TEST(LogConfigTest, relatedLoggersTest) {
// make sure logger configs for 'other' programs are ignored,
......
......@@ -139,6 +139,9 @@ FakeSession::recvmsg(ConstElementPtr& env, ConstElementPtr& msg, bool nonblock,
ElementPtr new_env = Element::createMap();
new_env->set("group", c_m->get(0));
new_env->set("to", c_m->get(1));
if (c_m->get(3)->intValue() != -1) {
new_env->set("reply", c_m->get(3));
}
env = new_env;
msg = c_m->get(2);
to_remove = c_m;
......@@ -207,7 +210,7 @@ FakeSession::reply(ConstElementPtr envelope, ConstElementPtr newmsg) {
bool
FakeSession::hasQueuedMsgs() const {
return (false);
return (msg_queue_ && msg_queue_->size() > 0);
}
ConstElementPtr
......@@ -228,12 +231,13 @@ FakeSession::getFirstMessage(std::string& group, std::string& to) const {
void
FakeSession::addMessage(ConstElementPtr msg, const std::string& group,
const std::string& to)
const std::string& to, int seq)
{
ElementPtr m_el = Element::createList();
m_el->add(Element::create(group));
m_