Commit a52058ff authored by JINMEI Tatuya's avatar JINMEI Tatuya

merged r2393


git-svn-id: svn://bind10.isc.org/svn/bind10/branches/trac275@2404 e5f2f494-b856-4b98-b285-d166d9295462
parent 03fd62de
......@@ -56,6 +56,7 @@ public:
bool getVerbose() const;
void serve(std::string zone_name);
isc::data::ElementPtr updateConfig(isc::data::ElementPtr config);
void setSession(isc::cc::AbstractSession* session);
isc::config::ModuleCCSession* configSession() const;
void setConfigSession(isc::config::ModuleCCSession* cs);
private:
......
......@@ -138,8 +138,9 @@ main(int argc, char* argv[]) {
usage();
}
// initialize command channel
int ret = 0;
Session* cc_session = NULL;
ModuleCCSession* cs = NULL;
try {
string specfile;
if (getenv("B10_FROM_BUILD")) {
......@@ -151,23 +152,31 @@ main(int argc, char* argv[]) {
auth_server = new AuthSrv(cache);
auth_server->setVerbose(verbose_mode);
cout << "[b10-auth] Server created." << endl;
io_service = new asio_link::IOService(auth_server, address, port,
use_ipv4, use_ipv6);
cout << "[b10-auth] IOService created." << endl;
ModuleCCSession cs(specfile, io_service->get_io_service(),
my_config_handler, my_command_handler);
cc_session = new Session(io_service->get_io_service());
cout << "[b10-auth] Session channel created." << endl;
auth_server->setConfigSession(&cs);
cs = new ModuleCCSession(specfile, *cc_session, my_config_handler,
my_command_handler);
cout << "[b10-auth] Configuration channel established." << endl;
auth_server->setConfigSession(cs);
auth_server->updateConfig(ElementPtr());
cout << "[b10-auth] Server started." << endl;
io_service->run();
} catch (const std::exception& ex) {
cerr << "[b10-auth] " << ex.what() << endl;
cerr << "[b10-auth] Initialization failed: " << ex.what() << endl;
ret = 1;
}
delete cs;
delete cc_session;
delete io_service;
delete auth_server;
return (ret);
......
......@@ -15,7 +15,7 @@
// $Id$
#include <config.h>
#include "session_config.h"
#include <cc/session_config.h>
#include <stdint.h>
......@@ -42,8 +42,8 @@
#include <exceptions/exceptions.h>
#include "data.h"
#include "session.h"
#include <cc/data.h>
#include <cc/session.h>
using namespace std;
using namespace isc::cc;
......@@ -52,44 +52,26 @@ using namespace isc::data;
// some of the asio names conflict with socket API system calls
// (e.g. write(2)) so we don't import the entire asio namespace.
using asio::io_service;
using asio::ip::tcp;
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
namespace isc {
namespace cc {
class SessionImpl {
public:
SessionImpl() : sequence_(-1) { queue_ = Element::createList(); }
virtual ~SessionImpl() {}
virtual void establish(const char& socket_file) = 0;
virtual int getSocket() = 0;
virtual void disconnect() = 0;
virtual void writeData(const void* data, size_t datalen) = 0;
virtual size_t readDataLength() = 0;
virtual void readData(void* data, size_t datalen) = 0;
virtual void startRead(boost::function<void()> user_handler) = 0;
SessionImpl(io_service& io_service) :
sequence_(-1), queue_(Element::createList()),
io_service_(io_service), socket_(io_service_), data_length_(0)
{}
void establish(const char& socket_file);
void disconnect();
void writeData(const void* data, size_t datalen);
size_t readDataLength();
void readData(void* data, size_t datalen);
void startRead(boost::function<void()> user_handler);
long int sequence_; // the next sequence number to use
std::string lname_;
ElementPtr queue_;
};
class ASIOSession : public SessionImpl {
public:
ASIOSession(io_service& io_service) :
io_service_(io_service), socket_(io_service_), data_length_(0)
{}
virtual void establish(const char& socket_file);
virtual void disconnect();
virtual int getSocket() { return (socket_.native()); }
virtual void writeData(const void* data, size_t datalen);
virtual size_t readDataLength();
virtual void readData(void* data, size_t datalen);
virtual void startRead(boost::function<void()> user_handler);
private:
void internalRead(const asio::error_code& error,
size_t bytes_transferred);
......@@ -102,28 +84,28 @@ private:
asio::error_code error_;
};
void
ASIOSession::establish(const char& socket_file) {
SessionImpl::establish(const char& socket_file) {
try {
socket_.connect(asio::local::stream_protocol::endpoint(&socket_file), error_);
} catch (asio::system_error& se) {
socket_.connect(asio::local::stream_protocol::endpoint(&socket_file),
error_);
} catch(const asio::system_error& se) {
isc_throw(SessionError, se.what());
}
if (error_) {
isc_throw(SessionError, "Unable to connect to message queue.");
isc_throw(SessionError, "Unable to connect to message queue: " <<
error_.message());
}
}
void
ASIOSession::disconnect() {
SessionImpl::disconnect() {
socket_.close();
data_length_ = 0;
}
void
ASIOSession::writeData(const void* data, size_t datalen) {
SessionImpl::writeData(const void* data, size_t datalen) {
try {
asio::write(socket_, asio::buffer(data, datalen));
} catch (const asio::system_error& asio_ex) {
......@@ -132,7 +114,7 @@ ASIOSession::writeData(const void* data, size_t datalen) {
}
size_t
ASIOSession::readDataLength() {
SessionImpl::readDataLength() {
size_t ret_len = data_length_;
if (ret_len == 0) {
......@@ -148,7 +130,7 @@ ASIOSession::readDataLength() {
}
void
ASIOSession::readData(void* data, size_t datalen) {
SessionImpl::readData(void* data, size_t datalen) {
try {
asio::read(socket_, asio::buffer(data, datalen));
} catch (const asio::system_error& asio_ex) {
......@@ -159,18 +141,18 @@ ASIOSession::readData(void* data, size_t datalen) {
}
void
ASIOSession::startRead(boost::function<void()> user_handler) {
SessionImpl::startRead(boost::function<void()> user_handler) {
data_length_ = 0;
user_handler_ = user_handler;
async_read(socket_, asio::buffer(&data_length_,
sizeof(data_length_)),
boost::bind(&ASIOSession::internalRead, this,
boost::bind(&SessionImpl::internalRead, this,
asio::placeholders::error,
asio::placeholders::bytes_transferred));
}
void
ASIOSession::internalRead(const asio::error_code& error,
SessionImpl::internalRead(const asio::error_code& error,
size_t bytes_transferred)
{
if (!error) {
......@@ -185,27 +167,22 @@ ASIOSession::internalRead(const asio::error_code& error,
}
}
class SocketSession : public SessionImpl {
public:
SocketSession() : sock_(-1) {}
virtual ~SocketSession() { disconnect(); }
virtual int getSocket() { return (sock_); }
void establish(const char& socket_file);
virtual void disconnect()
{
if (sock_ >= 0) {
close(sock_);
}
sock_ = -1;
}
virtual void writeData(const void* data, size_t datalen);
virtual void readData(void* data, size_t datalen);
virtual size_t readDataLength();
virtual void startRead(boost::function<void()> user_handler UNUSED_PARAM)
{} // nothing to do for this class
private:
int sock_;
};
Session::Session(io_service& io_service) : impl_(new SessionImpl(io_service))
{}
Session::~Session() {
delete impl_;
}
void
Session::disconnect() {
impl_->disconnect();
}
void
Session::startRead(boost::function<void()> read_callback) {
impl_->startRead(read_callback);
}
namespace { // maybe unnecessary.
// This is a helper class to make the establish() method (below) exception-safe
......@@ -224,83 +201,6 @@ public:
};
}
void
SocketSession::establish(const char& socket_file) {
struct sockaddr_un s_un;
#ifdef HAVE_SA_LEN
s_un.sun_len = sizeof(struct sockaddr_un);
#endif
if (strlen(&socket_file) >= sizeof(s_un.sun_path)) {
isc_throw(SessionError, "Unable to connect to message queue; "
"socket file path too long: " << socket_file);
}
s_un.sun_family = AF_UNIX;
strncpy(s_un.sun_path, &socket_file, sizeof(s_un.sun_path) - 1);
int s = socket(AF_UNIX, SOCK_STREAM, 0);
if (s < 0) {
isc_throw(SessionError, "socket() failed");
}
if (connect(s, (struct sockaddr *)&s_un, sizeof(s_un)) < 0) {
close(s);
isc_throw(SessionError, "Unable to connect to message queue");
}
sock_ = s;
}
void
SocketSession::writeData(const void* data, const size_t datalen) {
int cc = write(sock_, data, datalen);
if (cc != datalen) {
isc_throw(SessionError, "Write failed: expect " << datalen <<
", actual " << cc);
}
}
size_t
SocketSession::readDataLength() {
uint32_t length;
readData(&length, sizeof(length));
return (ntohl(length));
}
void
SocketSession::readData(void* data, const size_t datalen) {
int cc = read(sock_, data, datalen);
if (cc != datalen) {
isc_throw(SessionError, "Read failed: expect " << datalen <<
", actual " << cc);
}
}
Session::Session() : impl_(new SocketSession)
{}
Session::Session(io_service& io_service) : impl_(new ASIOSession(io_service))
{}
Session::~Session() {
delete impl_;
}
void
Session::disconnect() {
impl_->disconnect();
}
int
Session::getSocket() const {
return (impl_->getSocket());
}
void
Session::startRead(boost::function<void()> read_callback) {
impl_->startRead(read_callback);
}
void
Session::establish(const char* socket_file) {
if (socket_file == NULL) {
......@@ -334,7 +234,8 @@ Session::establish(const char* socket_file) {
}
//
// Convert to wire format and send this on the TCP stream with its length prefix
// Convert to wire format and send this via the stream socket with its length
// prefix.
//
void
Session::sendmsg(ElementPtr& msg) {
......
......@@ -40,7 +40,36 @@ namespace isc {
isc::Exception(file, line, what) {}
};
class Session {
class AbstractSession {
private:
AbstractSession(const AbstractSession& source);
AbstractSession& operator=(const AbstractSession& source);
protected:
AbstractSession() {}
public:
virtual ~AbstractSession() {}
//@}
virtual void establish(const char* socket_file) = 0;
virtual void disconnect() = 0;
virtual int group_sendmsg(isc::data::ElementPtr msg,
std::string group,
std::string instance = "*",
std::string to = "*") = 0;
virtual bool group_recvmsg(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& msg,
bool nonblock = true,
int seq = -1) = 0;
virtual void subscribe(std::string group,
std::string instance = "*") = 0;
virtual void unsubscribe(std::string group,
std::string instance = "*") = 0;
virtual void startRead(boost::function<void()> read_callback) = 0;
virtual int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg) = 0;
virtual bool hasQueuedMsgs() = 0;
};
class Session : public AbstractSession {
private:
SessionImpl* impl_;
......@@ -49,14 +78,10 @@ namespace isc {
Session& operator=(const Session& source);
public:
Session();
Session(asio::io_service& ioservice);
~Session();
// XXX: quick hack to allow the user to watch the socket directly.
int getSocket() const;
void startRead(boost::function<void()> read_callback);
virtual void startRead(boost::function<void()> read_callback);
void establish(const char* socket_file = NULL);
void disconnect();
......@@ -70,9 +95,9 @@ namespace isc {
isc::data::ElementPtr& msg,
bool nonblock = true,
int seq = -1);
void subscribe(std::string group,
std::string instance = "*");
void unsubscribe(std::string group,
virtual void subscribe(std::string group,
std::string instance = "*");
virtual void unsubscribe(std::string group,
std::string instance = "*");
int group_sendmsg(isc::data::ElementPtr msg,
std::string group,
......@@ -82,9 +107,9 @@ namespace isc {
isc::data::ElementPtr& msg,
bool nonblock = true,
int seq = -1);
int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg);
bool hasQueuedMsgs();
virtual int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg);
virtual bool hasQueuedMsgs();
};
} // namespace cc
} // namespace isc
......
......@@ -47,22 +47,3 @@ TEST(AsioSession, establish) {
);
}
TEST(Session, establish) {
Session sess;
EXPECT_THROW(
sess.establish("/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
), isc::cc::SessionError
);
}
......@@ -198,36 +198,12 @@ ModuleCCSession::startCheck() {
ModuleCCSession::ModuleCCSession(
std::string spec_file_name,
asio::io_service& io_service,
isc::cc::AbstractSession& session,
isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config),
isc::data::ElementPtr(*command_handler)(
const std::string& command, const isc::data::ElementPtr args)
) throw (isc::cc::SessionError) :
session_(io_service)
{
init(spec_file_name, config_handler, command_handler);
// register callback for asynchronous read
session_.startRead(boost::bind(&ModuleCCSession::startCheck, this));
}
ModuleCCSession::ModuleCCSession(
std::string spec_file_name,
isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config),
isc::data::ElementPtr(*command_handler)(
const std::string& command, const isc::data::ElementPtr args)
) throw (isc::cc::SessionError)
{
init(spec_file_name, config_handler, command_handler);
}
void
ModuleCCSession::init(
std::string spec_file_name,
isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config),
isc::data::ElementPtr(*command_handler)(
const std::string& command, const isc::data::ElementPtr args)
) throw (isc::cc::SessionError)
session_(session)
{
module_specification_ = readModuleSpecification(spec_file_name);
setModuleSpec(module_specification_);
......@@ -238,7 +214,7 @@ ModuleCCSession::init(
ElementPtr answer, env;
session_.establish();
session_.establish(NULL);
session_.subscribe(module_name_, "*");
//session_.subscribe("Boss", "*");
//session_.subscribe("statistics", "*");
......@@ -265,6 +241,9 @@ ModuleCCSession::init(
std::cerr << "[" << module_name_ << "] Error getting config: " << new_config << std::endl;
}
}
// register callback for asynchronous read
session_.startRead(boost::bind(&ModuleCCSession::startCheck, this));
}
/// Validates the new config values, if they are correct,
......@@ -300,12 +279,6 @@ ModuleCCSession::handleConfigUpdate(ElementPtr new_config)
return answer;
}
int
ModuleCCSession::getSocket()
{
return (session_.getSocket());
}
bool
ModuleCCSession::hasQueuedMsgs()
{
......
......@@ -24,10 +24,6 @@
#include <cc/session.h>
#include <cc/data.h>
namespace asio {
class io_service;
}
namespace isc {
namespace config {
......@@ -127,26 +123,14 @@ public:
* module specification.
*/
ModuleCCSession(std::string spec_file_name,
isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config) = NULL,
isc::data::ElementPtr(*command_handler)(const std::string& command, const isc::data::ElementPtr args) = NULL
) throw (isc::cc::SessionError);
ModuleCCSession(std::string spec_file_name,
asio::io_service& io_service,
isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config) = NULL,
isc::data::ElementPtr(*command_handler)(const std::string& command, const isc::data::ElementPtr args) = NULL
isc::cc::AbstractSession& session,
isc::data::ElementPtr(*config_handler)(
isc::data::ElementPtr new_config) = NULL,
isc::data::ElementPtr(*command_handler)(
const std::string& command,
const isc::data::ElementPtr args) = NULL
) throw (isc::cc::SessionError);
/**
* Returns the socket that is used to communicate with the msgq
* command channel. This socket should *only* be used to run a
* select() loop over it. And if not time-critical, it is strongly
* recommended to only use checkCommand() to check for messages
*
* @return The socket used to communicate with the msgq command
* channel.
*/
int getSocket();
/**
* Optional optimization for checkCommand loop; returns true
* if there are unhandled queued messages in the cc session.
......@@ -227,18 +211,11 @@ public:
ElementPtr getRemoteConfigValue(const std::string& module_name, const std::string& identifier);
private:
void init(
std::string spec_file_name,
isc::data::ElementPtr(*config_handler)(
isc::data::ElementPtr new_config),
isc::data::ElementPtr(*command_handler)(
const std::string& command, const isc::data::ElementPtr args)
) throw (isc::cc::SessionError);
ModuleSpec readModuleSpecification(const std::string& filename);
void startCheck();
std::string module_name_;
isc::cc::Session session_;
isc::cc::AbstractSession& session_;
ModuleSpec module_specification_;
ElementPtr handleConfigUpdate(ElementPtr new_config);
......
......@@ -14,11 +14,11 @@
// $Id: module_spec_unittests.cc 1321 2010-03-11 10:17:03Z jelte $
#include "config.h"
#include <config.h>
#include <gtest/gtest.h>
#include "fake_session.h"
#include <config/tests/fake_session.h>
#include <config/ccsession.h>
......@@ -28,42 +28,35 @@
using namespace isc::data;
using namespace isc::config;
using namespace isc::cc;
using namespace std;
std::string ccspecfile(const std::string name) {
namespace {
std::string
ccspecfile(const std::string name) {
return std::string(TEST_DATA_PATH) + "/" + name;
}
static ElementPtr
el(const std::string& str)
{
ElementPtr
el(const std::string& str) {
return Element::fromJSON(str);
}
// upon creation of a ModuleCCSession, the class
// sends its specification to the config manager
// it expects an ok answer back, so everytime we
// create a ModuleCCSession, we must set an initial
// ok answer