Commit d913e9a7 authored by Marcin Siodelski's avatar Marcin Siodelski

[5317] Cleanup of the CommandMgr's code.

parent c4fb6001
......@@ -1066,7 +1066,13 @@ TEST_F(CtrlChannelDhcpv4SrvTest, configReloadValid) {
::remove("test8.json");
}
// Verify that server returns an error if more than one connection is established.
/// Verify that concurrent connections over the control channel can be
/// established.
/// @todo Future Kea 1.3 tickets will modify the behavior of the CommandMgr
/// such that the server will be able to send response in multiple chunks.
/// This test will need to be extended. For now, the receive and write
/// operations are atomic and there is no conflict between concurrent
/// connections.
TEST_F(CtrlChannelDhcpv4SrvTest, concurrentConnections) {
createUnixChannelServer();
......@@ -1088,28 +1094,15 @@ TEST_F(CtrlChannelDhcpv4SrvTest, concurrentConnections) {
ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }"));
ASSERT_NO_THROW(getIOService()->poll());
// The server should not allow for concurrent connections and should send
// out an error message.
std::string response;
ASSERT_TRUE(client2->getResponse(response));
EXPECT_EQ("{ \"result\": 1, \"text\": \"exceeded maximum number of concurrent"
" connections\" }", response);
// Now disconnect the first server and retry.
client1->disconnectFromServer();
ASSERT_NO_THROW(getIOService()->poll());
ASSERT_TRUE(client2->connectToServer(socket_path_));
ASSERT_NO_THROW(getIOService()->poll());
ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }"));
ASSERT_NO_THROW(getIOService()->poll());
// The server should now respond ok.
// The server should respond ok.
ASSERT_TRUE(client2->getResponse(response));
EXPECT_TRUE(response.find("\"result\": 0") != std::string::npos);
// Disconnect the servers.
client1->disconnectFromServer();
client2->disconnectFromServer();
ASSERT_NO_THROW(getIOService()->poll());
}
} // End of anonymous namespace
......@@ -1088,7 +1088,13 @@ TEST_F(CtrlChannelDhcpv6SrvTest, configReloadValid) {
::remove("test8.json");
}
// Verify that server returns an error if more than one connection is established.
/// Verify that concurrent connections over the control channel can be
/// established.
/// @todo Future Kea 1.3 tickets will modify the behavior of the CommandMgr
/// such that the server will be able to send response in multiple chunks.
/// This test will need to be extended. For now, the receive and write
/// operations are atomic and there is no conflict between concurrent
/// connections.
TEST_F(CtrlChannelDhcpv6SrvTest, concurrentConnections) {
createUnixChannelServer();
......@@ -1110,28 +1116,15 @@ TEST_F(CtrlChannelDhcpv6SrvTest, concurrentConnections) {
ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }"));
ASSERT_NO_THROW(getIOService()->poll());
// The server should not allow for concurrent connections and should send
// out an error message.
std::string response;
ASSERT_TRUE(client2->getResponse(response));
EXPECT_EQ("{ \"result\": 1, \"text\": \"exceeded maximum number of concurrent"
" connections\" }", response);
// Now disconnect the first server and retry.
client1->disconnectFromServer();
ASSERT_NO_THROW(getIOService()->poll());
ASSERT_TRUE(client2->connectToServer(socket_path_));
ASSERT_NO_THROW(getIOService()->poll());
ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }"));
ASSERT_NO_THROW(getIOService()->poll());
// The server should now respond ok.
// The server should respond ok.
ASSERT_TRUE(client2->getResponse(response));
EXPECT_TRUE(response.find("\"result\": 0") != std::string::npos);
// Disconnect the servers.
client1->disconnectFromServer();
client2->disconnectFromServer();
ASSERT_NO_THROW(getIOService()->poll());
}
......
......@@ -28,18 +28,32 @@ namespace {
class ConnectionPool;
/// @brief Represents a single connection over control socket.
///
/// An instance of this object is created when the @c CommandMgr acceptor
/// receives new connection from a controlling client.
class Connection : public boost::enable_shared_from_this<Connection> {
public:
/// @brief Constructor.
///
/// This constructor registers a socket of this connection in the Interface
/// Manager to cause the blocking call to @c select() to return as soon as
/// a transmission over the control socket is received.
Connection(const boost::shared_ptr<UnixDomainSocket>& socket,
ConnectionPool& connection_pool)
: socket_(socket), connection_pool_(connection_pool),
response_in_progress_(false) {
isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(),
0);
// Callback value of 0 is used to indicate that callback function is
// not installed.
isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
}
/// @brief Start asynchronous read over the unix domain socket.
///
/// This method doesn't block. Once the transmission is received over the
/// socket, the @c Connection::receiveHandler callback is invoked to
/// process received data.
void start() {
socket_->asyncReceive(&buf_[0], sizeof(buf_),
boost::bind(&Connection::receiveHandler,
......@@ -48,6 +62,12 @@ public:
}
/// @brief Close current connection.
///
/// Connection is not closed if the invocation of this method is a result of
/// server reconfiguration. The connection will be closed once a response is
/// sent to the client. Closing a socket during processing a request would
/// cause the server to not send a response to the client.
void stop() {
if (!response_in_progress_) {
LOG_INFO(command_logger, COMMAND_SOCKET_CONNECTION_CLOSED)
......@@ -58,37 +78,55 @@ public:
}
}
/// @brief Handler invoked when the data is received over the control
/// socket.
///
/// @param ec Error code.
/// @param bytes_transferred Number of bytes received.
void receiveHandler(const boost::system::error_code& ec,
size_t bytes_transferred);
private:
/// @brief Pointer to the socket used for transmission.
boost::shared_ptr<UnixDomainSocket> socket_;
/// @brief Buffer used for received data.
std::array<char, 65535> buf_;
/// @brief Reference to the pool of connections.
ConnectionPool& connection_pool_;
/// @brief Boolean flag indicating if the request to stop connection is a
/// result of server reconfiguration.
bool response_in_progress_;
};
/// @brief Pointer to the @c Connection.
typedef boost::shared_ptr<Connection> ConnectionPtr;
/// @brief Holds all open connections.
class ConnectionPool {
public:
/// @brief Starts new connection.
///
/// @param connection Pointer to the new connection object.
void start(const ConnectionPtr& connection) {
connection->start();
connections_.insert(connection);
}
/// @brief Stops running connection.
///
/// @param connection Pointer to the new connection object.
void stop(const ConnectionPtr& connection) {
connection->stop();
connections_.erase(connection);
}
/// @brief Stops all connections which are allowed to stop.
void stopAll() {
for (auto conn = connections_.begin(); conn != connections_.end();
++conn) {
......@@ -103,6 +141,7 @@ public:
private:
/// @brief Pool of connections.
std::set<ConnectionPtr> connections_;
};
......@@ -112,7 +151,6 @@ void
Connection::receiveHandler(const boost::system::error_code& ec,
size_t bytes_transferred) {
if (ec) {
if (ec.value() == boost::asio::error::eof) {
// Foreign host has closed the connection. We should remove it from the
// connection pool.
......@@ -130,71 +168,67 @@ Connection::receiveHandler(const boost::system::error_code& ec,
return;
} else if (bytes_transferred == 0) {
// Nothing received. Close the connection.
connection_pool_.stop(shared_from_this());
return;
}
ConstElementPtr cmd, rsp;
LOG_DEBUG(command_logger, DBG_COMMAND, COMMAND_SOCKET_READ)
.arg(bytes_transferred).arg(socket_->getNative());
ConstElementPtr cmd, rsp;
try {
response_in_progress_ = true;
// Try to interpret it as JSON.
std::string sbuf(&buf_[0], bytes_transferred);
cmd = Element::fromJSON(sbuf, true);
if (connection_pool_.getConnectionsNum() > 1) {
rsp = createAnswer(CONTROL_RESULT_ERROR, "exceeded maximum number of concurrent"
" connections");
} else {
// If successful, then process it as a command.
rsp = CommandMgr::instance().processCommand(cmd);
}
response_in_progress_ = true;
// If successful, then process it as a command.
rsp = CommandMgr::instance().processCommand(cmd);
response_in_progress_ = false;
} catch (const Exception& ex) {
LOG_WARN(command_logger, COMMAND_PROCESS_ERROR1).arg(ex.what());
rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
}
// No response generated. Connection will be closed.
if (!rsp) {
response_in_progress_ = false;
LOG_WARN(command_logger, COMMAND_RESPONSE_ERROR);
return;
}
// Let's convert JSON response to text. Note that at this stage
// the rsp pointer is always set.
std::string txt = rsp->str();
size_t len = txt.length();
if (len > 65535) {
// Hmm, our response is too large. Let's send the first
// 64KB and hope for the best.
LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len);
} else {
len = 65535;
}
// Let's convert JSON response to text. Note that at this stage
// the rsp pointer is always set.
std::string txt = rsp->str();
size_t len = txt.length();
if (len > 65535) {
// Hmm, our response is too large. Let's send the first
// 64KB and hope for the best.
LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len);
try {
// Send the data back over socket.
socket_->write(txt.c_str(), len);
len = 65535;
}
} catch (const std::exception& ex) {
// Response transmission failed. Since the response failed, it doesn't
// make sense to send any status codes. Let's log it and be done with
// it.
LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL)
.arg(len).arg(socket_->getNative()).arg(ex.what());
}
try {
// Send the data back over socket.
socket_->write(txt.c_str(), len);
response_in_progress_ = false;
} catch (const std::exception& ex) {
// Response transmission failed. Since the response failed, it doesn't
// make sense to send any status codes. Let's log it and be done with
// it.
LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL)
.arg(len).arg(socket_->getNative()).arg(ex.what());
}
}
connection_pool_.stop(shared_from_this());
}
}
......@@ -202,26 +236,42 @@ Connection::receiveHandler(const boost::system::error_code& ec,
namespace isc {
namespace config {
/// @brief Implementation of the @c CommandMgr.
class CommandMgrImpl {
public:
/// @brief Constructor.
CommandMgrImpl()
: io_service_(), acceptor_(), socket_(), socket_name_(),
connection_pool_() {
}
/// @brief Opens acceptor service allowing the control clients to connect.
///
/// @param socket_info Configuration information for the control socket.
/// @throw BadSocketInfo When socket configuration is invalid.
/// @throw SocketError When socket operation fails.
void openCommandSocket(const isc::data::ConstElementPtr& socket_info);
/// @brief Asynchronously accepts next connection.
void doAccept();
/// @brief Pointer to the IO service used by the server process for running
/// asynchronous tasks.
IOServicePtr io_service_;
/// @brief Pointer to the acceptor service.
boost::shared_ptr<UnixDomainSocketAcceptor> acceptor_;
/// @brief Pointer to the socket into which the new connection is accepted.
boost::shared_ptr<UnixDomainSocket> socket_;
/// @brief Path to the unix domain socket descriptor.
///
/// This is used to remove the socket file once the connection terminates.
std::string socket_name_;
/// @brief Pool of connections.
ConnectionPool connection_pool_;
};
......@@ -238,6 +288,7 @@ CommandMgrImpl::openCommandSocket(const isc::data::ConstElementPtr& socket_info)
isc_throw(BadSocketInfo, "Mandatory 'socket-type' parameter missing");
}
// Only supporting unix sockets right now.
if (type->stringValue() != "unix") {
isc_throw(BadSocketInfo, "Invalid 'socket-type' parameter value "
<< type->stringValue());
......
......@@ -49,6 +49,8 @@ public:
/// @brief Sets IO service to be used by the command manager.
///
/// The server should use this method to provide the Command Manager with the
/// common IO service used by the server.
/// @param io_service Pointer to the IO service.
void setIOService(const asiolink::IOServicePtr& io_service);
......@@ -57,13 +59,10 @@ public:
/// Currently supported types are:
/// - unix (required parameters: socket-type: unix, socket-name:/unix/path)
///
/// This method will close previously open command socket (if exists).
/// @throw BadSocketInfo When socket configuration is invalid.
/// @throw SocketError When socket operation fails.
///
/// @throw CommandSocketError if socket creation fails.
/// @throw SocketError if command socket is already open.
///
/// @param socket_info describes control socket parameters
/// @return object representing a socket
/// @param socket_info Configuration information for the control socket.
void
openCommandSocket(const isc::data::ConstElementPtr& socket_info);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment