Commit ee4099c3 authored by JINMEI Tatuya's avatar JINMEI Tatuya

introduced an abstract base Session class so that we can easily create test...

introduced an abstract base Session class so that we can easily create test cases using a mock sessio implementation.

This is probably beyond the scope of this ticket, so it's probably better to make a separate ticket focusing on this refactoring.

Some tests currently fail.  They'll be fixed in the next commit.


git-svn-id: svn://bind10.isc.org/svn/bind10/branches/trac221@2393 e5f2f494-b856-4b98-b285-d166d9295462
parent 3d8cbc4a
......@@ -70,8 +70,7 @@ private:
AuthSrvImpl(const AuthSrvImpl& source);
AuthSrvImpl& operator=(const AuthSrvImpl& source);
public:
AuthSrvImpl(AbstractSession& session_with_xfrin,
AbstractXfroutClient& xfrout_client);
AuthSrvImpl(AbstractXfroutClient& xfrout_client);
~AuthSrvImpl();
isc::data::ElementPtr setDbFile(const isc::data::ElementPtr config);
......@@ -91,8 +90,7 @@ public:
bool verbose_mode_;
bool is_notify_session_established_;
AbstractSession& session_with_xfrin_;
AbstractSession* session_with_xfrin_;
bool is_axfr_connection_established_;
AbstractXfroutClient& xfrout_client_;
......@@ -101,11 +99,9 @@ public:
static const uint16_t DEFAULT_LOCAL_UDPSIZE = 4096;
};
AuthSrvImpl::AuthSrvImpl(AbstractSession& session_with_xfrin,
AbstractXfroutClient& xfrout_client) :
AuthSrvImpl::AuthSrvImpl(AbstractXfroutClient& xfrout_client) :
cs_(NULL), verbose_mode_(false),
is_notify_session_established_(false),
session_with_xfrin_(session_with_xfrin),
session_with_xfrin_(NULL),
is_axfr_connection_established_(false),
xfrout_client_(xfrout_client)
{
......@@ -118,20 +114,14 @@ AuthSrvImpl::AuthSrvImpl(AbstractSession& session_with_xfrin,
}
AuthSrvImpl::~AuthSrvImpl() {
if (is_notify_session_established_) {
session_with_xfrin_.disconnect();
is_notify_session_established_ = false;
}
if (is_axfr_connection_established_) {
xfrout_client_.disconnect();
is_axfr_connection_established_ = false;
}
}
AuthSrv::AuthSrv(AbstractSession& session_with_xfrin,
AbstractXfroutClient& xfrout_client) :
impl_(new AuthSrvImpl(session_with_xfrin, xfrout_client))
AuthSrv::AuthSrv(AbstractXfroutClient& xfrout_client) :
impl_(new AuthSrvImpl(xfrout_client))
{}
AuthSrv::~AuthSrv() {
......@@ -200,6 +190,11 @@ AuthSrv::getVerbose() const {
return (impl_->verbose_mode_);
}
void
AuthSrv::setSession(AbstractSession* session) {
impl_->session_with_xfrin_ = session;
}
void
AuthSrv::setConfigSession(ModuleCCSession* cs) {
impl_->cs_ = cs;
......@@ -411,19 +406,14 @@ AuthSrvImpl::processNotify(const IOMessage& io_message, Message& message,
// error happens rather than returning (e.g.) SERVFAIL. RFC 1996 is
// silent about such cases, but there doesn't seem to be anything we can
// improve at the primary server side by sending an error anyway.
if (!is_notify_session_established_) {
try {
session_with_xfrin_.establish(NULL);
is_notify_session_established_ = true;
} catch (const isc::cc::SessionError& err) {
if (session_with_xfrin_ == NULL) {
if (verbose_mode_) {
cerr << "[b10-auth] Error in connection with xfrin module: "
<< err.what() << endl;
cerr << "[b10-auth] "
"session interface for xfrin is not available" << endl;
}
return (false);
}
}
const string remote_ip_address =
io_message.getRemoteEndpoint().getAddress().toText();
static const string command_template_start =
......@@ -437,10 +427,10 @@ AuthSrvImpl::processNotify(const IOMessage& io_message, Message& message,
command_template_mid + remote_ip_address +
command_template_end);
const unsigned int seq =
session_with_xfrin_.group_sendmsg(notify_command, "Xfrin",
session_with_xfrin_->group_sendmsg(notify_command, "Xfrin",
"*", "*");
ElementPtr env, answer, parsed_answer;
session_with_xfrin_.group_recvmsg(env, answer, false, seq);
session_with_xfrin_->group_recvmsg(env, answer, false, seq);
int rcode;
parsed_answer = parseAnswer(rcode, answer);
if (rcode != 0) {
......
......@@ -51,8 +51,7 @@ private:
AuthSrv(const AuthSrv& source);
AuthSrv& operator=(const AuthSrv& source);
public:
AuthSrv(isc::cc::AbstractSession& session_with_xfrin,
isc::xfr::AbstractXfroutClient& xfrout_client);
AuthSrv(isc::xfr::AbstractXfroutClient& xfrout_client);
~AuthSrv();
//@}
/// \return \c true if the \message contains a response to be returned;
......@@ -64,6 +63,7 @@ public:
bool getVerbose() const;
void serve(std::string zone_name);
isc::data::ElementPtr updateConfig(isc::data::ElementPtr config);
void setSession(isc::cc::AbstractSession* session);
isc::config::ModuleCCSession* configSession() const;
void setConfigSession(isc::config::ModuleCCSession* cs);
private:
......
......@@ -134,10 +134,10 @@ main(int argc, char* argv[]) {
usage();
}
// initialize command channel
int ret = 0;
Session session_with_xfrin; // we should eventually pass io_service here.
Session* cc_session = NULL;
ModuleCCSession* cs = NULL;
XfroutClient xfrout_client(UNIX_SOCKET_FILE);
try {
string specfile;
......@@ -148,26 +148,39 @@ main(int argc, char* argv[]) {
specfile = string(AUTH_SPECFILE_LOCATION);
}
auth_server = new AuthSrv(session_with_xfrin, xfrout_client);
auth_server = new AuthSrv(xfrout_client);
auth_server->setVerbose(verbose_mode);
cout << "[b10-auth] Server created." << endl;
io_service = new asio_link::IOService(auth_server, port, use_ipv4,
use_ipv6);
ModuleCCSession cs(specfile, io_service->get_io_service(),
my_config_handler, my_command_handler);
auth_server->setConfigSession(&cs);
cout << "[b10-auth] IOService created." << endl;
cc_session = new Session(io_service->get_io_service());
cout << "[b10-auth] Session channel created." << endl;
cs = new ModuleCCSession(specfile, *cc_session, my_config_handler,
my_command_handler);
cout << "[b10-auth] Configuration channel established." << endl;
// XXX: with the current interface to asio_link we have to create
// auth_server before io_service while Session needs io_service.
// In a next step of refactoring we should make asio_link independent
// from auth_server, and create io_service, auth_server, and
// cc_session in that order.
auth_server->setSession(cc_session);
auth_server->setConfigSession(cs);
auth_server->updateConfig(ElementPtr());
cout << "[b10-auth] Server started." << endl;
io_service->run();
} catch (const std::exception& ex) {
cerr << "[b10-auth] " << ex.what() << endl;
cerr << "[b10-auth] Initialization failed: " << ex.what() << endl;
ret = 1;
}
delete cs;
delete cc_session;
delete io_service;
delete auth_server;
return (ret);
......
......@@ -16,6 +16,8 @@
#include <config.h>
#include <boost/function.hpp>
#include <gtest/gtest.h>
#include <dns/buffer.h>
......@@ -89,6 +91,20 @@ private:
string instance, string to);
virtual bool group_recvmsg(ElementPtr& envelope, ElementPtr& msg,
bool nonblock, int seq);
virtual void subscribe(string group UNUSED_PARAM,
string instance UNUSED_PARAM)
{}
virtual void unsubscribe(string group UNUSED_PARAM,
string instance UNUSED_PARAM)
{}
virtual void startRead(
boost::function<void()> read_callback UNUSED_PARAM)
{}
virtual int reply(ElementPtr& envelope UNUSED_PARAM,
ElementPtr& newmsg UNUSED_PARAM)
{ return (0); }
virtual bool hasQueuedMsgs() { return (false); }
void setMessage(ElementPtr msg) { msg_ = msg; }
bool isEstablished() const { return (is_established_); }
void disableEstablish() { establish_ok_ = false; }
......@@ -103,7 +119,7 @@ private:
};
protected:
AuthSrvTest() : server(notify_session, xfrout),
AuthSrvTest() : server(xfrout),
request_message(Message::RENDER),
parse_message(Message::PARSE), default_qid(0x1035),
opcode(Opcode(Opcode::QUERY())), qname("www.example.com"),
......@@ -111,7 +127,9 @@ protected:
io_message(NULL), endpoint(NULL), request_obuffer(0),
request_renderer(request_obuffer),
response_obuffer(0), response_renderer(response_obuffer)
{}
{
server.setSession(&notify_session);
}
~AuthSrvTest() {
delete io_message;
delete endpoint;
......
......@@ -15,7 +15,7 @@
// $Id$
#include <config.h>
#include "session_config.h"
#include <cc/session_config.h>
#include <stdint.h>
......@@ -41,8 +41,8 @@
#include <exceptions/exceptions.h>
#include "data.h"
#include "session.h"
#include <cc/data.h>
#include <cc/session.h>
using namespace std;
using namespace isc::cc;
......@@ -51,44 +51,26 @@ using namespace isc::data;
// some of the asio names conflict with socket API system calls
// (e.g. write(2)) so we don't import the entire asio namespace.
using asio::io_service;
using asio::ip::tcp;
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
namespace isc {
namespace cc {
class SessionImpl {
public:
SessionImpl() : sequence_(-1) { queue_ = Element::createFromString("[]"); }
virtual ~SessionImpl() {}
virtual void establish(const char& socket_file) = 0;
virtual int getSocket() = 0;
virtual void disconnect() = 0;
virtual void writeData(const void* data, size_t datalen) = 0;
virtual size_t readDataLength() = 0;
virtual void readData(void* data, size_t datalen) = 0;
virtual void startRead(boost::function<void()> user_handler) = 0;
SessionImpl(io_service& io_service) :
sequence_(-1), queue_(Element::createFromString("[]")),
io_service_(io_service), socket_(io_service_), data_length_(0)
{}
void establish(const char& socket_file);
void disconnect();
void writeData(const void* data, size_t datalen);
size_t readDataLength();
void readData(void* data, size_t datalen);
void startRead(boost::function<void()> user_handler);
int sequence_; // the next sequence number to use
std::string lname_;
ElementPtr queue_;
};
class ASIOSession : public SessionImpl {
public:
ASIOSession(io_service& io_service) :
io_service_(io_service), socket_(io_service_), data_length_(0)
{}
virtual void establish(const char& socket_file);
virtual void disconnect();
virtual int getSocket() { return (socket_.native()); }
virtual void writeData(const void* data, size_t datalen);
virtual size_t readDataLength();
virtual void readData(void* data, size_t datalen);
virtual void startRead(boost::function<void()> user_handler);
private:
void internalRead(const asio::error_code& error,
size_t bytes_transferred);
......@@ -101,28 +83,28 @@ private:
asio::error_code error_;
};
void
ASIOSession::establish(const char& socket_file) {
SessionImpl::establish(const char& socket_file) {
try {
socket_.connect(asio::local::stream_protocol::endpoint(&socket_file), error_);
} catch (asio::system_error& se) {
socket_.connect(asio::local::stream_protocol::endpoint(&socket_file),
error_);
} catch(const asio::system_error& se) {
isc_throw(SessionError, se.what());
}
if (error_) {
isc_throw(SessionError, "Unable to connect to message queue.");
isc_throw(SessionError, "Unable to connect to message queue: " <<
error_.message());
}
}
void
ASIOSession::disconnect() {
SessionImpl::disconnect() {
socket_.close();
data_length_ = 0;
}
void
ASIOSession::writeData(const void* data, size_t datalen) {
SessionImpl::writeData(const void* data, size_t datalen) {
try {
asio::write(socket_, asio::buffer(data, datalen));
} catch (const asio::system_error& asio_ex) {
......@@ -131,7 +113,7 @@ ASIOSession::writeData(const void* data, size_t datalen) {
}
size_t
ASIOSession::readDataLength() {
SessionImpl::readDataLength() {
size_t ret_len = data_length_;
if (ret_len == 0) {
......@@ -147,7 +129,7 @@ ASIOSession::readDataLength() {
}
void
ASIOSession::readData(void* data, size_t datalen) {
SessionImpl::readData(void* data, size_t datalen) {
try {
asio::read(socket_, asio::buffer(data, datalen));
} catch (const asio::system_error& asio_ex) {
......@@ -158,18 +140,18 @@ ASIOSession::readData(void* data, size_t datalen) {
}
void
ASIOSession::startRead(boost::function<void()> user_handler) {
SessionImpl::startRead(boost::function<void()> user_handler) {
data_length_ = 0;
user_handler_ = user_handler;
async_read(socket_, asio::buffer(&data_length_,
sizeof(data_length_)),
boost::bind(&ASIOSession::internalRead, this,
boost::bind(&SessionImpl::internalRead, this,
asio::placeholders::error,
asio::placeholders::bytes_transferred));
}
void
ASIOSession::internalRead(const asio::error_code& error,
SessionImpl::internalRead(const asio::error_code& error,
size_t bytes_transferred)
{
if (!error) {
......@@ -184,27 +166,22 @@ ASIOSession::internalRead(const asio::error_code& error,
}
}
class SocketSession : public SessionImpl {
public:
SocketSession() : sock_(-1) {}
virtual ~SocketSession() { disconnect(); }
virtual int getSocket() { return (sock_); }
void establish(const char& socket_file);
virtual void disconnect()
{
if (sock_ >= 0) {
close(sock_);
}
sock_ = -1;
}
virtual void writeData(const void* data, size_t datalen);
virtual void readData(void* data, size_t datalen);
virtual size_t readDataLength();
virtual void startRead(boost::function<void()> user_handler UNUSED_PARAM)
{} // nothing to do for this class
private:
int sock_;
};
Session::Session(io_service& io_service) : impl_(new SessionImpl(io_service))
{}
Session::~Session() {
delete impl_;
}
void
Session::disconnect() {
impl_->disconnect();
}
void
Session::startRead(boost::function<void()> read_callback) {
impl_->startRead(read_callback);
}
namespace { // maybe unnecessary.
// This is a helper class to make the establish() method (below) exception-safe
......@@ -223,83 +200,6 @@ public:
};
}
void
SocketSession::establish(const char& socket_file) {
struct sockaddr_un s_un;
#ifdef HAVE_SA_LEN
s_un.sun_len = sizeof(struct sockaddr_un);
#endif
if (strlen(&socket_file) >= sizeof(s_un.sun_path)) {
isc_throw(SessionError, "Unable to connect to message queue; "
"socket file path too long: " << socket_file);
}
s_un.sun_family = AF_UNIX;
strncpy(s_un.sun_path, &socket_file, sizeof(s_un.sun_path) - 1);
int s = socket(AF_UNIX, SOCK_STREAM, 0);
if (s < 0) {
isc_throw(SessionError, "socket() failed");
}
if (connect(s, (struct sockaddr *)&s_un, sizeof(s_un)) < 0) {
close(s);
isc_throw(SessionError, "Unable to connect to message queue");
}
sock_ = s;
}
void
SocketSession::writeData(const void* data, const size_t datalen) {
int cc = write(sock_, data, datalen);
if (cc != datalen) {
isc_throw(SessionError, "Write failed: expect " << datalen <<
", actual " << cc);
}
}
size_t
SocketSession::readDataLength() {
uint32_t length;
readData(&length, sizeof(length));
return (ntohl(length));
}
void
SocketSession::readData(void* data, const size_t datalen) {
int cc = read(sock_, data, datalen);
if (cc != datalen) {
isc_throw(SessionError, "Read failed: expect " << datalen <<
", actual " << cc);
}
}
Session::Session() : impl_(new SocketSession)
{}
Session::Session(io_service& io_service) : impl_(new ASIOSession(io_service))
{}
Session::~Session() {
delete impl_;
}
void
Session::disconnect() {
impl_->disconnect();
}
int
Session::getSocket() const {
return (impl_->getSocket());
}
void
Session::startRead(boost::function<void()> read_callback) {
impl_->startRead(read_callback);
}
void
Session::establish(const char* socket_file) {
if (socket_file == NULL) {
......@@ -333,7 +233,8 @@ Session::establish(const char* socket_file) {
}
//
// Convert to wire format and send this on the TCP stream with its length prefix
// Convert to wire format and send this via the stream socket with its length
// prefix.
//
void
Session::sendmsg(ElementPtr& msg) {
......
......@@ -68,12 +68,20 @@ namespace isc {
virtual void disconnect() = 0;
virtual int group_sendmsg(isc::data::ElementPtr msg,
std::string group,
std::string instance,
std::string to) = 0;
std::string instance = "*",
std::string to = "*") = 0;
virtual bool group_recvmsg(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& msg,
bool nonblock,
int seq) = 0;
bool nonblock = true,
int seq = -1) = 0;
virtual void subscribe(std::string group,
std::string instance = "*") = 0;
virtual void unsubscribe(std::string group,
std::string instance = "*") = 0;
virtual void startRead(boost::function<void()> read_callback) = 0;
virtual int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg) = 0;
virtual bool hasQueuedMsgs() = 0;
};
class Session : public AbstractSession {
......@@ -85,14 +93,10 @@ namespace isc {
Session& operator=(const Session& source);
public:
Session();
Session(asio::io_service& ioservice);
virtual ~Session();
// XXX: quick hack to allow the user to watch the socket directly.
int getSocket() const;
void startRead(boost::function<void()> read_callback);
virtual void startRead(boost::function<void()> read_callback);
virtual void establish(const char* socket_file = NULL);
void disconnect();
......@@ -106,9 +110,9 @@ namespace isc {
isc::data::ElementPtr& msg,
bool nonblock = true,
int seq = -1);
void subscribe(std::string group,
std::string instance = "*");
void unsubscribe(std::string group,
virtual void subscribe(std::string group,
std::string instance = "*");
virtual void unsubscribe(std::string group,
std::string instance = "*");
virtual int group_sendmsg(isc::data::ElementPtr msg,
std::string group,
......@@ -118,9 +122,9 @@ namespace isc {
isc::data::ElementPtr& msg,
bool nonblock = true,
int seq = -1);
int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg);
bool hasQueuedMsgs();
virtual int reply(isc::data::ElementPtr& envelope,
isc::data::ElementPtr& newmsg);
virtual bool hasQueuedMsgs();
};
} // namespace cc
} // namespace isc
......
......@@ -45,22 +45,3 @@ TEST(AsioSession, establish) {
);
}
TEST(Session, establish) {
Session sess;
EXPECT_THROW(
sess.establish("/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
"/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/aaaaaaaaaa/"
), isc::cc::SessionError
);
}
......@@ -198,12 +198,12 @@ ModuleCCSession::startCheck() {
ModuleCCSession::ModuleCCSession(
std::string spec_file_name,
asio::io_service& io_service,
isc::cc::AbstractSession& session,
isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config),
isc::data::ElementPtr(*command_handler)(
const std::string& command, const isc::data::ElementPtr args)
) throw (isc::cc::SessionError) :
session_(io_service)