Commit 1747c854 authored by Jelte Jansen's avatar Jelte Jansen
Browse files

merged branches/trac296 (ticket #296, timeouts on msgq's command channel)

There is one open issue; there is a difference in behaviour between the different versions, in c++, an exception is thrown, while in the python version None is returned. The python version should probably be updated, but this also requires changes in the modules that use it, and is hence left out for now (and left for a future ticket item)


git-svn-id: svn://bind10.isc.org/svn/bind10/trunk@2761 e5f2f494-b856-4b98-b285-d166d9295462
parents 394c4bf5 aa79e7e9
......@@ -487,6 +487,7 @@ AC_OUTPUT([src/bin/cfgmgr/b10-cfgmgr.py
src/lib/python/bind10_config.py
src/lib/dns/tests/testdata/gen-wiredata.py
src/lib/cc/session_config.h.pre
src/lib/cc/tests/session_unittests_config.h
], [
chmod +x src/bin/cmdctl/run_b10-cmdctl.sh
chmod +x src/bin/xfrin/run_b10-xfrin.sh
......
......@@ -94,6 +94,8 @@ private:
virtual void startRead(boost::function<void()> read_callback);
virtual int reply(ElementPtr& envelope, ElementPtr& newmsg);
virtual bool hasQueuedMsgs();
virtual void setTimeout(size_t timeout UNUSED_PARAM) {};
virtual size_t getTimeout() const { return 0; };
void setMessage(ElementPtr msg) { msg_ = msg; }
void disableSend() { send_ok_ = false; }
......
......@@ -212,7 +212,10 @@ class MsgQ:
EOF."""
received = b''
while len(received) < length:
data = sock.recv(length - len(received))
try:
data = sock.recv(length - len(received))
except socket.error:
raise MsgQReceiveError(socket.error)
if len(data) == 0:
raise MsgQReceiveError("EOF")
received += data
......
......@@ -28,6 +28,7 @@
#include <unistd.h> // for some IPC/network system calls
#include <asio.hpp>
#include <asio/error_code.hpp>
#include <asio/deadline_timer.hpp>
#include <asio/system_error.hpp>
#include <cstdio>
......@@ -38,7 +39,9 @@
#include <sys/un.h>
#include <boost/bind.hpp>
#include <boost/optional.hpp>
#include <boost/function.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <exceptions/exceptions.h>
......@@ -53,20 +56,39 @@ using namespace isc::data;
// (e.g. write(2)) so we don't import the entire asio namespace.
using asio::io_service;
namespace {
/// \brief Sets the given Optional 'result' to the given error code
/// Used as a callback for emulating sync reads with async calls
/// \param result Pointer to the optional to set
/// \param err The error code to set it to
void
setResult(boost::optional<asio::error_code>* result,
const asio::error_code& err)
{
result->reset(err);
}
}
namespace isc {
namespace cc {
class SessionImpl {
public:
SessionImpl(io_service& io_service) :
sequence_(-1), queue_(Element::createList()),
io_service_(io_service), socket_(io_service_), data_length_(0)
io_service_(io_service), socket_(io_service_), data_length_(0),
timeout_(MSGQ_DEFAULT_TIMEOUT)
{}
void establish(const char& socket_file);
void disconnect();
void writeData(const void* data, size_t datalen);
size_t readDataLength();
// Blocking read. Will throw a SessionTimeout if the timeout value
// (in seconds) is thrown. If timeout is 0 it will block forever
void readData(void* data, size_t datalen);
void startRead(boost::function<void()> user_handler);
void setTimeout(size_t seconds) { timeout_ = seconds; };
size_t getTimeout() const { return timeout_; };
long int sequence_; // the next sequence number to use
std::string lname_;
......@@ -82,6 +104,17 @@ private:
uint32_t data_length_;
boost::function<void()> user_handler_;
asio::error_code error_;
size_t timeout_;
// By default, unless changed or disabled, blocking reads on
// the msgq channel will time out after 4 seconds in this
// implementation.
// This number is chosen to be low enough so that whatever
// component is blocking does not seem to be hanging, but
// still gives enough time for other modules to respond if they
// are busy. If this choice turns out to be a bad one, we can
// change it later.
static const size_t MSGQ_DEFAULT_TIMEOUT = 4000;
};
void
......@@ -131,8 +164,51 @@ SessionImpl::readDataLength() {
void
SessionImpl::readData(void* data, size_t datalen) {
boost::optional<asio::error_code> read_result;
boost::optional<asio::error_code> timer_result;
try {
asio::read(socket_, asio::buffer(data, datalen));
asio::async_read(socket_, asio::buffer(data, datalen),
boost::bind(&setResult, &read_result, _1));
asio::deadline_timer timer(socket_.io_service());
if (getTimeout() != 0) {
timer.expires_from_now(boost::posix_time::milliseconds(getTimeout()));
timer.async_wait(boost::bind(&setResult, &timer_result, _1));
}
// wait until either we have read the data we want, the
// timer expires, or one of the two is triggered with an error.
// When one of them has a result, cancel the other, and wait
// until the cancel is processed before we continue
while (!read_result && !timer_result) {
socket_.io_service().run_one();
// Don't cancel the timer if we haven't set it
if (read_result && getTimeout() != 0) {
timer.cancel();
while (!timer_result) {
socket_.io_service().run_one();
}
} else if (timer_result) {
socket_.cancel();
while (!read_result) {
socket_.io_service().run_one();
}
}
}
// asio::error_code evaluates to false if there was no error
if (*read_result) {
if (*read_result == asio::error::operation_aborted) {
isc_throw(SessionTimeout,
"Timeout while reading data from cc session");
} else {
isc_throw(SessionError,
"Error while reading data from cc session: " <<
read_result->message());
}
}
} catch (const asio::system_error& asio_ex) {
// to hide ASIO specific exceptions, we catch them explicitly
// and convert it to SessionError.
......@@ -144,11 +220,11 @@ void
SessionImpl::startRead(boost::function<void()> user_handler) {
data_length_ = 0;
user_handler_ = user_handler;
async_read(socket_, asio::buffer(&data_length_,
sizeof(data_length_)),
boost::bind(&SessionImpl::internalRead, this,
asio::placeholders::error,
asio::placeholders::bytes_transferred));
asio::async_read(socket_, asio::buffer(&data_length_,
sizeof(data_length_)),
boost::bind(&SessionImpl::internalRead, this,
asio::placeholders::error,
asio::placeholders::bytes_transferred));
}
void
......@@ -410,5 +486,14 @@ Session::hasQueuedMsgs() {
return (impl_->queue_->size() > 0);
}
void
Session::setTimeout(size_t milliseconds) {
impl_->setTimeout(milliseconds);
}
size_t
Session::getTimeout() const {
return (impl_->getTimeout());
}
}
}
......@@ -40,6 +40,15 @@ namespace isc {
isc::Exception(file, line, what) {}
};
/// \brief A standard Exception class that is thrown when a
/// blocking readData call does not read the given number of
/// bytes before the timeout expires
class SessionTimeout : public isc::Exception {
public:
SessionTimeout(const char* file, size_t line, const char* what) :
isc::Exception(file, line, what) {}
};
/// \brief The AbstractSession class is an abstract base class that
/// defines the interfaces of Session.
/// The intended primary usage of abstraction is to allow tests for the
......@@ -88,6 +97,17 @@ namespace isc {
virtual int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg) = 0;
virtual bool hasQueuedMsgs() = 0;
/// \brief Sets the default timeout for blocking reads
/// in this session to the given number of milliseconds
/// \param milliseconds the timeout for blocking reads in
/// milliseconds, if this is set to 0, reads will block
/// forever.
virtual void setTimeout(size_t milliseconds) = 0;
/// \brief Returns the current timeout for blocking reads
/// \return The timeout (in milliseconds)
virtual size_t getTimeout() const = 0;
};
class Session : public AbstractSession {
......@@ -121,6 +141,8 @@ namespace isc {
virtual int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg);
virtual bool hasQueuedMsgs();
virtual void setTimeout(size_t milliseconds);
virtual size_t getTimeout() const;
private:
void sendmsg(isc::data::ElementPtr& msg);
void sendmsg(isc::data::ElementPtr& env,
......
......@@ -22,10 +22,13 @@
#include <asio.hpp>
#include <gtest/gtest.h>
#include <boost/bind.hpp>
#include <exceptions/exceptions.h>
#include <cc/session.h>
#include <cc/data.h>
#include <session_unittests_config.h>
using namespace isc::cc;
......@@ -48,3 +51,190 @@ TEST(AsioSession, establish) {
);
}
// This class sets up a domain socket for the session to connect to
// it will impersonate the msgq a tiny bit (if setSendLname() has
// been called, it will send an 'answer' to the lname query that is
// sent in the initialization of Session objects)
class TestDomainSocket {
public:
TestDomainSocket(asio::io_service& io_service, const char* file) :
io_service_(io_service),
ep_(file),
acceptor_(io_service_, ep_),
socket_(io_service_)
{
acceptor_.async_accept(socket_,
boost::bind(&TestDomainSocket::acceptHandler,
this, _1));
}
~TestDomainSocket() {
socket_.close();
unlink(BIND10_TEST_SOCKET_FILE);
}
void
acceptHandler(const asio::error_code& error UNUSED_PARAM) {
}
void
sendmsg(isc::data::ElementPtr& env, isc::data::ElementPtr& msg) {
const std::string header_wire = env->toWire();
const std::string body_wire = msg->toWire();
const unsigned int length = 2 + header_wire.length() +
body_wire.length();
const unsigned int length_net = htonl(length);
const unsigned short header_length = header_wire.length();
const unsigned short header_length_net = htons(header_length);
socket_.send(asio::buffer(&length_net, sizeof(length_net)));
socket_.send(asio::buffer(&header_length_net,
sizeof(header_length_net)));
socket_.send(asio::buffer(header_wire.data(), header_length));
socket_.send(asio::buffer(body_wire.data(), body_wire.length()));
}
void
sendLname() {
isc::data::ElementPtr lname_answer1 =
isc::data::Element::fromJSON("{ \"type\": \"lname\" }");
isc::data::ElementPtr lname_answer2 =
isc::data::Element::fromJSON("{ \"lname\": \"foobar\" }");
sendmsg(lname_answer1, lname_answer2);
}
void
setSendLname() {
// ignore whatever data we get, send back an lname
asio::async_read(socket_, asio::buffer(data_buf, 0),
boost::bind(&TestDomainSocket::sendLname, this));
}
private:
asio::io_service& io_service_;
asio::local::stream_protocol::endpoint ep_;
asio::local::stream_protocol::acceptor acceptor_;
asio::local::stream_protocol::socket socket_;
char data_buf[1024];
};
class SessionTest : public ::testing::Test {
protected:
SessionTest() : sess(my_io_service), work(my_io_service) {
// The TestDomainSocket is held as a 'new'-ed pointer,
// so we can call unlink() first.
unlink(BIND10_TEST_SOCKET_FILE);
tds = new TestDomainSocket(my_io_service, BIND10_TEST_SOCKET_FILE);
}
~SessionTest() {
delete tds;
}
public:
// used in the handler test
// This handler first reads (and ignores) whatever message caused
// it to be invoked. Then it calls group_recv for a second message.
// If this message is { "command": "stop" } it'll tell the
// io_service it is done. Otherwise it'll re-register this handler
void someHandler() {
isc::data::ElementPtr env, msg;
sess.group_recvmsg(env, msg, false, -1);
sess.group_recvmsg(env, msg, false, -1);
if (msg && msg->contains("command") &&
msg->get("command")->stringValue() == "stop") {
my_io_service.stop();
} else {
sess.startRead(boost::bind(&SessionTest::someHandler, this));
}
}
protected:
asio::io_service my_io_service;
TestDomainSocket* tds;
Session sess;
// Keep run() from stopping right away by informing it it has work to do
asio::io_service::work work;
};
TEST_F(SessionTest, timeout_on_connect) {
// set to a short timeout so the test doesn't take too long
EXPECT_EQ(4000, sess.getTimeout());
sess.setTimeout(100);
EXPECT_EQ(100, sess.getTimeout());
// no answer, should timeout
EXPECT_THROW(sess.establish(BIND10_TEST_SOCKET_FILE), SessionTimeout);
}
TEST_F(SessionTest, connect_ok) {
tds->setSendLname();
sess.establish(BIND10_TEST_SOCKET_FILE);
}
TEST_F(SessionTest, connect_ok_no_timeout) {
tds->setSendLname();
sess.setTimeout(0);
sess.establish(BIND10_TEST_SOCKET_FILE);
}
TEST_F(SessionTest, connect_ok_connection_reset) {
tds->setSendLname();
sess.establish(BIND10_TEST_SOCKET_FILE);
// Close the session again, so the next recv() should throw
sess.disconnect();
isc::data::ElementPtr env, msg;
EXPECT_THROW(sess.group_recvmsg(env, msg, false, -1), SessionError);
}
TEST_F(SessionTest, run_with_handler) {
tds->setSendLname();
sess.establish(BIND10_TEST_SOCKET_FILE);
sess.startRead(boost::bind(&SessionTest::someHandler, this));
isc::data::ElementPtr env = isc::data::Element::fromJSON("{ \"to\": \"me\" }");
isc::data::ElementPtr msg = isc::data::Element::fromJSON("{ \"some\": \"message\" }");
tds->sendmsg(env, msg);
msg = isc::data::Element::fromJSON("{ \"another\": \"message\" }");
tds->sendmsg(env, msg);
msg = isc::data::Element::fromJSON("{ \"a third\": \"message\" }");
tds->sendmsg(env, msg);
msg = isc::data::Element::fromJSON("{ \"command\": \"stop\" }");
tds->sendmsg(env, msg);
size_t count = my_io_service.run();
ASSERT_EQ(2, count);
}
TEST_F(SessionTest, run_with_handler_timeout) {
tds->setSendLname();
sess.establish(BIND10_TEST_SOCKET_FILE);
sess.startRead(boost::bind(&SessionTest::someHandler, this));
sess.setTimeout(100);
isc::data::ElementPtr env = isc::data::Element::fromJSON("{ \"to\": \"me\" }");
isc::data::ElementPtr msg = isc::data::Element::fromJSON("{ \"some\": \"message\" }");
tds->sendmsg(env, msg);
msg = isc::data::Element::fromJSON("{ \"another\": \"message\" }");
tds->sendmsg(env, msg);
msg = isc::data::Element::fromJSON("{ \"a third\": \"message\" }");
tds->sendmsg(env, msg);
// No followup message, should time out.
ASSERT_THROW(my_io_service.run(), SessionTimeout);
}
#define BIND10_TEST_SOCKET_FILE "@builddir@/test_socket.sock"
......@@ -195,7 +195,7 @@ ModuleCCSession::ModuleCCSession(
isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config),
isc::data::ElementPtr(*command_handler)(
const std::string& command, const isc::data::ElementPtr args)
) throw (isc::cc::SessionError) :
) :
session_(session)
{
module_specification_ = readModuleSpecification(spec_file_name);
......
......@@ -138,7 +138,7 @@ public:
isc::data::ElementPtr(*command_handler)(
const std::string& command,
const isc::data::ElementPtr args) = NULL
) throw (isc::cc::SessionError);
);
/**
* Optional optimization for checkCommand loop; returns true
......
......@@ -72,6 +72,8 @@ public:
virtual int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg);
virtual bool hasQueuedMsgs();
virtual void setTimeout(size_t milliseconds) {};
virtual size_t getTimeout() const { return 0; };
isc::data::ElementPtr getFirstMessage(std::string& group, std::string& to);
void addMessage(isc::data::ElementPtr, const std::string& group,
const std::string& to);
......
......@@ -29,6 +29,10 @@ class SessionError(Exception): pass
class Session:
def __init__(self, socket_file=None):
self._socket = None
# store the current timeout value in seconds (the way
# settimeout() wants them, our API takes milliseconds
# so that it is consistent with the C++ version)
self._socket_timeout = 4;
self._lname = None
self._recvbuffer = bytearray()
self._recvlength = 0
......@@ -44,7 +48,6 @@ class Session:
self.socket_file = bind10_config.BIND10_MSGQ_SOCKET_FILE
else:
self.socket_file = socket_file
try:
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
......@@ -123,6 +126,10 @@ class Session:
self._socket.setblocking(0)
else:
self._socket.setblocking(1)
if self._socket_timeout == 0.0:
self._socket.settimeout(None)
else:
self._socket.settimeout(self._socket_timeout)
if self._recvlength == 0:
length = 4
......@@ -208,6 +215,15 @@ class Session:
}, isc.cc.message.to_wire(msg))
return seq
def set_timeout(self, milliseconds):
"""Sets the socket timeout for blocking reads to the given
number of milliseconds"""
self._socket_timeout = milliseconds / 1000.0
def get_timeout(self):
"""Returns the current timeout for blocking reads (in milliseconds)"""
return self._socket_timeout * 1000.0
if __name__ == "__main__":
import doctest
doctest.testmod()
......@@ -11,5 +11,6 @@ check-local:
for pytest in $(PYTESTS) ; do \
echo Running test: $$pytest ; \
env PYTHONPATH=$(abs_top_srcdir)/src/lib/python:$(abs_top_builddir)/src/lib/python \
BIND10_TEST_SOCKET_FILE=$(builddir)/test_socket.sock \
$(PYCOVERAGE) $(abs_srcdir)/$$pytest ; \
done
......@@ -89,13 +89,20 @@ class MySocket():
if msg:
self.recvqueue.extend(msg)
def settimeout(self, val):
pass
def gettimeout(self):
return 0
#
# We subclass the Session class we're testing here, only
# to override the __init__() method, which wants a socket,
# and we need to use our fake socket
class MySession(Session):
def __init__(self, port=9912):
def __init__(self, port=9912, s=None):
self._socket = None
self._socket_timeout = 1
self._lname = None
self._recvbuffer = bytearray()
self._recvlength = 0
......@@ -104,13 +111,16 @@ class MySession(Session):
self._queue = []
self._lock = threading.RLock()
try:
self._socket = MySocket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect(tuple(['127.0.0.1', port]))
self._lname = "test_name"
# testing getlname here isn't useful, code removed
except socket.error as se:
raise SessionError(se)
if s is not None:
self._socket = s
else:
try:
self._socket = MySocket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect(tuple(['127.0.0.1', port]))
self._lname = "test_name"
# testing getlname here isn't useful, code removed
except socket.error as se:
raise SessionError(se)
class testSession(unittest.TestCase):
......@@ -323,7 +333,27 @@ class testSession(unittest.TestCase):
sess.group_reply({ 'from': 'me', 'group': 'our_group', 'instance': 'other_instance', 'seq': 9}, {"hello": "a"})
sent = sess._socket.readsentmsg();
self.assertEqual(sent, b'\x00\x00\x00\x8b\x00{{"from": "test_name", "seq": 3, "to": "me", "instance": "other_instance", "reply": 9, "group": "our_group", "type": "send"}{"hello": "a"}')
def test_timeout(self):
if "BIND10_TEST_SOCKET_FILE" not in os.environ:
self.assertEqual("", "This test can only run if the value BIND10_TEST_SOCKET_FILE is set in the environment")
TEST_SOCKET_FILE = os.environ["BIND10_TEST_SOCKET_FILE"]
# create a read domain socket to pass into the session
s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
if os.path.exists(TEST_SOCKET_FILE):
os.remove(TEST_SOCKET_FILE)
s1.bind(TEST_SOCKET_FILE)
s1.listen(1)
s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s2.connect(TEST_SOCKET_FILE)
sess = MySession(1, s2)
# set timeout to 100 msec, so test does not take too long
sess.set_timeout(100)
env, msg = sess.group_recvmsg(False)
self.assertEqual(None, env)
self.assertEqual(None, msg)
if __name__ == "__main__":
unittest.main()
......
......@@ -395,8 +395,9 @@ class ConfigManager:
self.running = True
while (self.running):
msg, env = self.cc.group_recvmsg(False)
if msg and not 'result' in msg:
# ignore 'None' value (current result of timeout)
# and messages that are answers to questions we did