Commit 01f159a6 authored by Michal 'vorner' Vaner
Browse files

Merge #2861

Synchronize the work thread and the main thread in the authoritative
server by providing callbacks that are executed in the main thread once
a task in the work thread has completed.
parents dd01c785 c2dfa379
......@@ -151,6 +151,10 @@ A separate thread for maintaining data source clients has been started.
% AUTH_DATASRC_CLIENTS_BUILDER_STOPPED data source builder thread stopped
The separate thread for maintaining data source clients has been stopped.
% AUTH_DATASRC_CLIENTS_BUILDER_WAKE_ERR failed to wake up main thread: %1
A low-level error happened when trying to send data to the main thread to wake
it up. Terminating to prevent an inconsistent state and possible hang-ups.
% AUTH_DATASRC_CLIENTS_SHUTDOWN_ERROR error on waiting for data source builder thread: %1
This indicates that the separate thread for maintaining data source
clients had been terminated due to an uncaught exception, and the
......
......@@ -319,6 +319,7 @@ AuthSrvImpl::AuthSrvImpl(AbstractXfroutClient& xfrout_client,
xfrin_session_(NULL),
counters_(),
keyring_(NULL),
datasrc_clients_mgr_(io_service_),
ddns_base_forwarder_(ddns_forwarder),
ddns_forwarder_(NULL),
xfrout_connected_(false),
......
......@@ -29,6 +29,9 @@
#include <datasrc/client_list.h>
#include <datasrc/memory/zone_writer.h>
#include <asiolink/io_service.h>
#include <asiolink/local_socket.h>
#include <auth/auth_log.h>
#include <auth/datasrc_config.h>
......@@ -36,11 +39,16 @@
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/noncopyable.hpp>
#include <boost/function.hpp>
#include <boost/foreach.hpp>
#include <exception>
#include <cassert>
#include <cerrno>
#include <list>
#include <utility>
#include <sys/types.h>
#include <sys/socket.h>
namespace isc {
namespace auth {
......@@ -77,13 +85,40 @@ enum CommandID {
NUM_COMMANDS
};
/// \brief Callback to be called when the command is completed.
typedef boost::function<void ()> FinishedCallback;
/// \brief The data type passed from DataSrcClientsMgr to
/// DataSrcClientsBuilder.
/// DataSrcClientsBuilder.
///
/// The first element of the pair is the command ID, and the second element
/// is its argument. If the command doesn't take an argument it should be
/// a null pointer.
typedef std::pair<CommandID, data::ConstElementPtr> Command;
/// This just holds the data items together, no logic or protection
/// is present here.
struct Command {
    /// \brief Identifier of the command to execute.
    CommandID id;

    /// \brief Argument of the command.
    ///
    /// A null pointer when the command takes no argument.
    data::ConstElementPtr params;

    /// \brief Callback to be called once the command finishes.
    ///
    /// It may be an empty boost::function; in that case no callback
    /// is called after completion.
    FinishedCallback callback;

    /// \brief Constructor
    ///
    /// Every parameter is stored, unmodified, in the member variable
    /// of the same name.
    Command(CommandID id, const data::ConstElementPtr& params,
            const FinishedCallback& callback) :
        id(id), params(params), callback(callback)
    {}
};
} // namespace datasrc_clientmgr_internal
/// \brief Frontend to the manager object for data source clients.
......@@ -113,6 +148,24 @@ private:
boost::shared_ptr<datasrc::ConfigurableClientList> >
ClientListsMap;
/// \brief Guard that closes the manager's wakeup descriptors.
///
/// The guard stores only a pointer to the manager. The descriptors
/// themselves (read_fd_ and write_fd_) are looked up when the guard is
/// destroyed, and each one that is not -1 at that moment is closed.
class FDGuard : boost::noncopyable {
public:
    /// \brief Constructor
    ///
    /// It is explicit so that a manager pointer is never silently
    /// converted into a guard.
    ///
    /// \param mgr The manager whose descriptors are closed on destruction.
    ///        The pointer is dereferenced in the destructor, so the
    ///        manager must still be accessible at that point.
    explicit FDGuard(DataSrcClientsMgrBase* mgr) :
        mgr_(mgr)
    {}
    /// \brief Destructor; closes whichever descriptors are set.
    ~FDGuard() {
        // -1 marks a descriptor that is not set; skip those.
        if (mgr_->read_fd_ != -1) {
            close(mgr_->read_fd_);
        }
        if (mgr_->write_fd_ != -1) {
            close(mgr_->write_fd_);
        }
    }
private:
    DataSrcClientsMgrBase* mgr_; // Manager owning the descriptors
};
friend class FDGuard;
public:
/// \brief Thread-safe accessor to the data source client lists.
///
......@@ -176,12 +229,20 @@ public:
///
/// \throw std::bad_alloc internal memory allocation failure.
/// \throw isc::Unexpected general unexpected system errors.
DataSrcClientsMgrBase() :
DataSrcClientsMgrBase(asiolink::IOService& service) :
clients_map_(new ClientListsMap),
builder_(&command_queue_, &cond_, &queue_mutex_, &clients_map_,
&map_mutex_),
builder_thread_(boost::bind(&BuilderType::run, &builder_))
{}
fd_guard_(new FDGuard(this)),
read_fd_(-1), write_fd_(-1),
builder_(&command_queue_, &callback_queue_, &cond_, &queue_mutex_,
&clients_map_, &map_mutex_, createFds()),
builder_thread_(boost::bind(&BuilderType::run, &builder_)),
wakeup_socket_(service, read_fd_)
{
// Schedule wakeups when callbacks are pushed.
wakeup_socket_.asyncRead(
boost::bind(&DataSrcClientsMgrBase::processCallbacks, this, _1),
buffer, 1);
}
/// \brief The destructor.
///
......@@ -220,6 +281,7 @@ public:
AUTH_DATASRC_CLIENTS_SHUTDOWN_UNEXPECTED_ERROR);
}
processCallbacks(); // Any leftover callbacks
cleanup(); // see below
}
......@@ -234,11 +296,18 @@ public:
/// \brief std::bad_alloc
///
/// \param config_arg The new data source configuration. Must not be NULL.
void reconfigure(data::ConstElementPtr config_arg) {
/// \param callback Called once the reconfigure command completes. It is
/// called in the main thread (not in the work one). It should be
/// exceptionless.
void reconfigure(const data::ConstElementPtr& config_arg,
const datasrc_clientmgr_internal::FinishedCallback&
callback = datasrc_clientmgr_internal::FinishedCallback())
{
if (!config_arg) {
isc_throw(InvalidParameter, "Invalid null config argument");
}
sendCommand(datasrc_clientmgr_internal::RECONFIGURE, config_arg);
sendCommand(datasrc_clientmgr_internal::RECONFIGURE, config_arg,
callback);
reconfigureHook(); // for test's customization
}
......@@ -257,12 +326,18 @@ public:
/// \param args Element argument that should be a map of the form
/// { "class": "IN", "origin": "example.com" }
/// (but class is optional and will default to IN)
/// \param callback Called once the loadZone command completes. It
/// is called in the main thread, not in the work thread. It should
/// be exceptionless.
///
/// \exception CommandError if the args value is null, or not in
/// the expected format, or contains
/// a bad origin or class string
void
loadZone(data::ConstElementPtr args) {
loadZone(const data::ConstElementPtr& args,
const datasrc_clientmgr_internal::FinishedCallback& callback =
datasrc_clientmgr_internal::FinishedCallback())
{
if (!args) {
isc_throw(CommandError, "loadZone argument empty");
}
......@@ -303,7 +378,7 @@ public:
// implement it would be to factor out the code from
// the start of doLoadZone(), and call it here too
sendCommand(datasrc_clientmgr_internal::LOADZONE, args);
sendCommand(datasrc_clientmgr_internal::LOADZONE, args, callback);
}
private:
......@@ -317,30 +392,79 @@ private:
void reconfigureHook() {}
void sendCommand(datasrc_clientmgr_internal::CommandID command,
data::ConstElementPtr arg)
const data::ConstElementPtr& arg,
const datasrc_clientmgr_internal::FinishedCallback&
callback = datasrc_clientmgr_internal::FinishedCallback())
{
// The lock will be held until the end of this method. Only
// push_back has to be protected, but we can avoid having an extra
// block this way.
typename MutexType::Locker locker(queue_mutex_);
command_queue_.push_back(
datasrc_clientmgr_internal::Command(command, arg));
datasrc_clientmgr_internal::Command(command, arg, callback));
cond_.signal();
}
/// \brief Create the socket pair used for waking up.
///
/// The first socket of the pair is stored in read_fd_, the second one in
/// write_fd_.
///
/// \throw isc::Unexpected if the socket pair can't be created.
/// \return The value stored in write_fd_.
int createFds() {
    int sockets[2];
    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets) != 0) {
        isc_throw(Unexpected, "Can't create socket pair: " <<
                  strerror(errno));
    }
    read_fd_ = sockets[0];
    write_fd_ = sockets[1];
    return (write_fd_);
}
void processCallbacks(const std::string& error = std::string()) {
// Schedule the next read.
wakeup_socket_.asyncRead(
boost::bind(&DataSrcClientsMgrBase::processCallbacks, this, _1),
buffer, 1);
if (!error.empty()) {
// Generally, there should be no errors (as we are the other end
// as well), but check just in case.
isc_throw(Unexpected, error);
}
// Steal the callbacks into local copy.
std::list<datasrc_clientmgr_internal::FinishedCallback> queue;
{
typename MutexType::Locker locker(queue_mutex_);
queue.swap(callback_queue_);
}
// Execute the callbacks
BOOST_FOREACH(const datasrc_clientmgr_internal::FinishedCallback&
callback, queue) {
callback();
}
}
//
// The following are shared with the builder.
//
// The list is used as a one-way queue: back-in, front-out
std::list<datasrc_clientmgr_internal::Command> command_queue_;
// Similar to above, for the callbacks that are ready to be called.
// While the command queue is for sending commands from the main thread
// to the work thread, this one is for the other direction. Protected
// by the same mutex (queue_mutex_).
std::list<datasrc_clientmgr_internal::FinishedCallback> callback_queue_;
CondVarType cond_; // condition variable for queue operations
MutexType queue_mutex_; // mutex to protect the queue
datasrc::ClientListMapPtr clients_map_;
// map of actual data source client objects
boost::scoped_ptr<FDGuard> fd_guard_; // A guard to close the fds.
int read_fd_, write_fd_; // Descriptors for wakeup
MutexType map_mutex_; // mutex to protect the clients map
BuilderType builder_;
ThreadType builder_thread_; // for safety this should be placed last
isc::asiolink::LocalSocket wakeup_socket_; // For integration of read_fd_
// to the asio loop
char buffer[1]; // Buffer for the wakeup socket.
};
namespace datasrc_clientmgr_internal {
......@@ -385,12 +509,15 @@ public:
///
/// \throw None
DataSrcClientsBuilderBase(std::list<Command>* command_queue,
std::list<FinishedCallback>* callback_queue,
CondVarType* cond, MutexType* queue_mutex,
datasrc::ClientListMapPtr* clients_map,
MutexType* map_mutex
MutexType* map_mutex,
int wake_fd
) :
command_queue_(command_queue), cond_(cond), queue_mutex_(queue_mutex),
clients_map_(clients_map), map_mutex_(map_mutex)
command_queue_(command_queue), callback_queue_(callback_queue),
cond_(cond), queue_mutex_(queue_mutex),
clients_map_(clients_map), map_mutex_(map_mutex), wake_fd_(wake_fd)
{}
/// \brief The main loop.
......@@ -457,10 +584,12 @@ private:
// The following are shared with the manager
std::list<Command>* command_queue_;
std::list<FinishedCallback> *callback_queue_;
CondVarType* cond_;
MutexType* queue_mutex_;
datasrc::ClientListMapPtr* clients_map_;
MutexType* map_mutex_;
int wake_fd_;
};
// Shortcut typedef for normal use
......@@ -494,6 +623,31 @@ DataSrcClientsBuilderBase<MutexType, CondVarType>::run() {
AUTH_DATASRC_CLIENTS_BUILDER_COMMAND_ERROR).
arg(e.what());
}
if (current_commands.front().callback) {
// Lock the queue
typename MutexType::Locker locker(*queue_mutex_);
callback_queue_->
push_back(current_commands.front().callback);
// Wake up the other end. If it would block, there are data
// and it'll wake anyway.
int result = send(wake_fd_, "w", 1, MSG_DONTWAIT);
if (result == -1 &&
(errno != EWOULDBLOCK && errno != EAGAIN)) {
// Note: the strerror might not be thread safe, as
// subsequent call to it might change the returned
// string. But that is unlikely and strerror_r is
// not portable and we are going to terminate anyway,
// so that's better than nothing.
//
// Also, this error handler is not tested. It should
// be generally impossible to happen, so it is hard
// to trigger in controlled way.
LOG_FATAL(auth_logger,
AUTH_DATASRC_CLIENTS_BUILDER_WAKE_ERR).
arg(strerror(errno));
std::terminate();
}
}
current_commands.pop_front();
}
}
......@@ -515,7 +669,7 @@ bool
DataSrcClientsBuilderBase<MutexType, CondVarType>::handleCommand(
const Command& command)
{
const CommandID cid = command.first;
const CommandID cid = command.id;
if (cid >= NUM_COMMANDS) {
// This shouldn't happen except for a bug within this file.
isc_throw(Unexpected, "internal bug: invalid command, ID: " << cid);
......@@ -526,12 +680,12 @@ DataSrcClientsBuilderBase<MutexType, CondVarType>::handleCommand(
};
LOG_DEBUG(auth_logger, DBGLVL_TRACE_BASIC,
AUTH_DATASRC_CLIENTS_BUILDER_COMMAND).arg(command_desc.at(cid));
switch (command.first) {
switch (command.id) {
case RECONFIGURE:
doReconfigure(command.second);
doReconfigure(command.params);
break;
case LOADZONE:
doLoadZone(command.second);
doLoadZone(command.params);
break;
case SHUTDOWN:
return (false);
......
......@@ -36,9 +36,13 @@
#include <boost/function.hpp>
#include <sys/types.h>
#include <sys/socket.h>
#include <cstdlib>
#include <string>
#include <sstream>
#include <cerrno>
using isc::data::ConstElementPtr;
using namespace isc::dns;
......@@ -54,17 +58,24 @@ protected:
DataSrcClientsBuilderTest() :
clients_map(new std::map<RRClass,
boost::shared_ptr<ConfigurableClientList> >),
builder(&command_queue, &cond, &queue_mutex, &clients_map, &map_mutex),
write_end(-1), read_end(-1),
builder(&command_queue, &callback_queue, &cond, &queue_mutex,
&clients_map, &map_mutex, generateSockets()),
cond(command_queue, delayed_command_queue), rrclass(RRClass::IN()),
shutdown_cmd(SHUTDOWN, ConstElementPtr()),
noop_cmd(NOOP, ConstElementPtr())
shutdown_cmd(SHUTDOWN, ConstElementPtr(), FinishedCallback()),
noop_cmd(NOOP, ConstElementPtr(), FinishedCallback())
{}
// NOTE(review): the body is empty, so the descriptors stored in write_end
// and read_end by generateSockets() are not closed here -- confirm whether
// leaving them open after each test case is intended.
~ DataSrcClientsBuilderTest() {
}
void configureZones(); // used for loadzone related tests
ClientListMapPtr clients_map; // configured clients
std::list<Command> command_queue; // test command queue
std::list<Command> delayed_command_queue; // commands available after wait
std::list<FinishedCallback> callback_queue; // Callbacks from commands
int write_end, read_end;
TestDataSrcClientsBuilder builder;
TestCondVar cond;
TestMutex queue_mutex;
......@@ -72,6 +83,15 @@ protected:
const RRClass rrclass;
const Command shutdown_cmd;
const Command noop_cmd;
private:
// Create the socket pair for the test. The first socket becomes
// write_end, the second one read_end. Returns write_end.
int generateSockets() {
    int fds[2];
    const int rc = socketpair(AF_LOCAL, SOCK_STREAM, 0, fds);
    assert(rc == 0);
    write_end = fds[0];
    read_end = fds[1];
    return (write_end);
}
};
TEST_F(DataSrcClientsBuilderTest, runSingleCommand) {
......@@ -82,6 +102,44 @@ TEST_F(DataSrcClientsBuilderTest, runSingleCommand) {
EXPECT_EQ(0, cond.wait_count); // no wait because command queue is not empty
EXPECT_EQ(1, queue_mutex.lock_count);
EXPECT_EQ(1, queue_mutex.unlock_count);
// No callback scheduled, none called.
EXPECT_TRUE(callback_queue.empty());
// Not woken up.
char c;
int result = recv(read_end, &c, 1, MSG_DONTWAIT);
EXPECT_EQ(-1, result);
EXPECT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK);
}
// A do-nothing function; it only exists so the tests have a valid
// callback to pass.
void
emptyCallsback() {
}
// A command carrying a callback: once it is handled, the callback must be
// in the callback queue and a wake-up must be readable from the socket.
TEST_F(DataSrcClientsBuilderTest, commandFinished) {
    const Command shutdown_with_callback(SHUTDOWN, ConstElementPtr(),
                                         emptyCallsback);
    command_queue.push_back(shutdown_with_callback);
    builder.run();

    // The command queue is not empty, so there is no wait.
    EXPECT_EQ(0, cond.wait_count);
    // Two lock/unlock rounds: one to pick up the command, one to put
    // the callback into the callback queue.
    EXPECT_EQ(2, queue_mutex.lock_count);
    EXPECT_EQ(2, queue_mutex.unlock_count);

    // Exactly one callback is queued, and it is the one we passed.
    ASSERT_EQ(1, callback_queue.size());
    EXPECT_EQ(emptyCallsback, callback_queue.front());

    // The wake-up byte is there to be read.
    char wake_byte;
    int received = recv(read_end, &wake_byte, 1, MSG_DONTWAIT);
    EXPECT_EQ(1, received);
}
// Test that low-level errors on the synchronization socket (an unexpected
// condition) are detected and that the program is aborted.
TEST_F(DataSrcClientsBuilderTest, finishedCrash) {
    const Command shutdown_with_callback(SHUTDOWN, ConstElementPtr(),
                                         emptyCallsback);
    command_queue.push_back(shutdown_with_callback);
    // Break the socket: close the descriptor that was given to the builder.
    close(write_end);
    EXPECT_DEATH_IF_SUPPORTED(builder.run(), "");
}
TEST_F(DataSrcClientsBuilderTest, runMultiCommands) {
......@@ -138,7 +196,7 @@ TEST_F(DataSrcClientsBuilderTest, reconfigure) {
// the error handling
// A command structure we'll modify to send different commands
Command reconfig_cmd(RECONFIGURE, ConstElementPtr());
Command reconfig_cmd(RECONFIGURE, ConstElementPtr(), FinishedCallback());
// Initially, no clients should be there
EXPECT_TRUE(clients_map->empty());
......@@ -166,7 +224,7 @@ TEST_F(DataSrcClientsBuilderTest, reconfigure) {
"}"
);
reconfig_cmd.second = good_config;
reconfig_cmd.params = good_config;
EXPECT_TRUE(builder.handleCommand(reconfig_cmd));
EXPECT_EQ(1, clients_map->size());
EXPECT_EQ(1, map_mutex.lock_count);
......@@ -177,7 +235,7 @@ TEST_F(DataSrcClientsBuilderTest, reconfigure) {
// If a 'bad' command argument got here, the config validation should
// have failed already, but still, the handler should return true,
// and the clients_map should not be updated.
reconfig_cmd.second = Element::create("{ \"foo\": \"bar\" }");
reconfig_cmd.params = Element::create("{ \"foo\": \"bar\" }");
EXPECT_TRUE(builder.handleCommand(reconfig_cmd));
EXPECT_EQ(working_config_clients, clients_map);
// Building failed, so map mutex should not have been locked again
......@@ -185,7 +243,7 @@ TEST_F(DataSrcClientsBuilderTest, reconfigure) {
// The same for a configuration that has bad data for the type it
// specifies
reconfig_cmd.second = bad_config;
reconfig_cmd.params = bad_config;
builder.handleCommand(reconfig_cmd);
EXPECT_TRUE(builder.handleCommand(reconfig_cmd));
EXPECT_EQ(working_config_clients, clients_map);
......@@ -194,21 +252,21 @@ TEST_F(DataSrcClientsBuilderTest, reconfigure) {
// The same goes for an empty parameter (it should at least be
// an empty map)
reconfig_cmd.second = ConstElementPtr();
reconfig_cmd.params = ConstElementPtr();
EXPECT_TRUE(builder.handleCommand(reconfig_cmd));
EXPECT_EQ(working_config_clients, clients_map);
EXPECT_EQ(1, map_mutex.lock_count);
// Reconfigure again with the same good clients, the result should
// be a different map than the original, but not an empty one.
reconfig_cmd.second = good_config;
reconfig_cmd.params = good_config;
EXPECT_TRUE(builder.handleCommand(reconfig_cmd));
EXPECT_NE(working_config_clients, clients_map);
EXPECT_EQ(1, clients_map->size());
EXPECT_EQ(2, map_mutex.lock_count);
// And finally, try an empty config to disable all datasource clients
reconfig_cmd.second = Element::createMap();
reconfig_cmd.params = Element::createMap();
EXPECT_TRUE(builder.handleCommand(reconfig_cmd));
EXPECT_EQ(0, clients_map->size());
EXPECT_EQ(3, map_mutex.lock_count);
......@@ -224,7 +282,8 @@ TEST_F(DataSrcClientsBuilderTest, shutdown) {
TEST_F(DataSrcClientsBuilderTest, badCommand) {
// out-of-range command ID
EXPECT_THROW(builder.handleCommand(Command(NUM_COMMANDS,
ConstElementPtr())),
ConstElementPtr(),
FinishedCallback())),
isc::Unexpected);
}
......@@ -308,7 +367,8 @@ TEST_F(DataSrcClientsBuilderTest, loadZone) {
const Command loadzone_cmd(LOADZONE, Element::fromJSON(
"{\"class\": \"IN\","
" \"origin\": \"test1.example\"}"));
" \"origin\": \"test1.example\"}"),
FinishedCallback());
EXPECT_TRUE(builder.handleCommand(loadzone_cmd));
// loadZone involves two critical sections: one for getting the zone
......@@ -369,7 +429,8 @@ TEST_F(DataSrcClientsBuilderTest,
// Now send the command to reload it
const Command loadzone_cmd(LOADZONE, Element::fromJSON(
"{\"class\": \"IN\","
" \"origin\": \"example.org\"}"));
" \"origin\": \"example.org\"}"),
FinishedCallback());
EXPECT_TRUE(builder.handleCommand(loadzone_cmd));
// And now it should be present too.
EXPECT_EQ(ZoneFinder::SUCCESS,
......@@ -380,7 +441,8 @@ TEST_F(DataSrcClientsBuilderTest,
// An error case: the zone has no configuration. (note .com here)
const Command nozone_cmd(LOADZONE, Element::fromJSON(
"{\"class\": \"IN\","
" \"origin\": \"example.com\"}"));
" \"origin\": \"example.com\"}"),
FinishedCallback());
EXPECT_THROW(builder.handleCommand(nozone_cmd),
TestDataSrcClientsBuilder::InternalCommandError);
// The previous zone is not hurt in any way
......@@ -403,7 +465,8 @@ TEST_F(DataSrcClientsBuilderTest,
builder.handleCommand(
Command(LOADZONE, Element::fromJSON(
"{\"class\": \"IN\","
" \"origin\": \"example.org\"}")));
" \"origin\": \"example.org\"}"),
FinishedCallback()));
// Only one mutex was needed because there was no actual reload.
EXPECT_EQ(orig_lock_count + 1, map_mutex.lock_count);
EXPECT_EQ(orig_unlock_count + 1, map_mutex.unlock_count);
......@@ -421,7 +484,8 @@ TEST_F(DataSrcClientsBuilderTest,
builder.handleCommand(
Command(LOADZONE, Element::fromJSON(
"{\"class\": \"IN\","
" \"origin\": \"nosuchzone.example\"}"))),
" \"origin\": \"nosuchzone.example\"}"),
FinishedCallback())),
TestDataSrcClientsBuilder::InternalCommandError);
// basically impossible case: in-memory cache is completely disabled.
......@@ -441,7 +505,8 @@ TEST_F(DataSrcClientsBuilderTest,
EXPECT_THROW(builder.handleCommand(
Command(LOADZONE, Element::fromJSON(
"{\"class\": \"IN\","
" \"origin\": \"example.org\"}"))),
" \"origin\": \"example.org\"}"),
FinishedCallback())),
TestDataSrcClientsBuilder::InternalCommandError);
}
......@@ -454,7 +519,8 @@ TEST_F(DataSrcClientsBuilderTest, loadBrokenZone) {
// there's an error in the new zone file. reload will be rejected.
const Command loadzone_cmd(LOADZONE, Element::fromJSON(
"{\"class\": \"IN\","
" \"origin\": \"test1.example\"}"));
" \"origin\": \"test1.example\"}"),
FinishedCallback());
EXPECT_THROW(builder.handleCommand(loadzone_cmd),
TestDataSrcClientsBuilder::InternalCommandError);
zoneChecks(clients_map, rrclass); // zone shouldn't be replaced
......@@ -469,7 +535,8 @@ TEST_F(DataSrcClientsBuilderTest, loadUnreadableZone) {
TEST_DATA_BUILDDIR "/test1.zone.copied"));
const Command loadzone_cmd(LOADZONE, Element::fromJSON(
"{\"class\": \"IN\","
" \"origin\": \"test1.example\"}"));
" \"origin\": \"test1.example\"}"),
FinishedCallback());
EXPECT_THROW(builder.handleCommand(loadzone_cmd),
TestDataSrcClientsBuilder::InternalCommandError);
zoneChecks(clients_map, rrclass); // zone shouldn't be replaced
......@@ -482,7 +549,8 @@ TEST_F(DataSrcClientsBuilderTest, loadZoneWithoutDataSrc) {
Command(LOADZONE,
Element::fromJSON(