Commit ab11e239 authored by Marcin Siodelski's avatar Marcin Siodelski
Browse files

[5318] PoC for extending maximum request/response over 64k in Ctrl channel.

parent f5add97e
......@@ -30,6 +30,7 @@
#include <fstream>
#include <iostream>
#include <sstream>
#include <thread>
#include <arpa/inet.h>
#include <unistd.h>
......@@ -85,6 +86,7 @@ public:
StatsMgr::instance().removeAll();
CommandMgr::instance().closeCommandSocket();
CommandMgr::instance().deregisterAll();
server_.reset();
};
......@@ -296,6 +298,18 @@ public:
ADD_FAILURE() << "Invalid expected status: " << exp_status;
}
}
/// @brief Command handler which generates long response
static ConstElementPtr longResponseHandler(const std::string&,
const ConstElementPtr&) {
ElementPtr arguments = Element::createList();
std::string arg = "responseresponseresponseresponseresponseresponse"
"response";
for (unsigned i = 0; i < 8000; ++i) {
arguments->add(Element::create(arg));
}
return (createAnswer(0, arguments));
}
};
TEST_F(CtrlChannelDhcpv4SrvTest, commands) {
......@@ -431,7 +445,8 @@ TEST_F(CtrlChannelDhcpv4SrvTest, controlChannelNegative) {
sendUnixCommand("utter nonsense", response);
EXPECT_EQ("{ \"result\": 1, "
"\"text\": \"error: unexpected character u in <string>:1:2\" }",
"\"text\": \"invalid first character u : "
"current state: [ 12 RECEIVE_START_ST ] next event: [ 1 START_EVT ]\" }",
response);
}
......@@ -710,7 +725,7 @@ TEST_F(CtrlChannelDhcpv4SrvTest, configSet) {
// Should fail with a syntax error
EXPECT_EQ("{ \"result\": 1, "
"\"text\": \"subnet configuration failed: mandatory 'subnet' "
"parameter is missing for a subnet being configured (<string>:20:17)\" }",
"parameter is missing for a subnet being configured (<wire>:19:17)\" }",
response);
// Check that the config was not lost
......@@ -909,7 +924,7 @@ TEST_F(CtrlChannelDhcpv4SrvTest, configTest) {
// Should fail with a syntax error
EXPECT_EQ("{ \"result\": 1, "
"\"text\": \"subnet configuration failed: mandatory 'subnet' "
"parameter is missing for a subnet being configured (<string>:20:17)\" }",
"parameter is missing for a subnet being configured (<wire>:19:17)\" }",
response);
// Check that the config was not lost
......@@ -950,7 +965,7 @@ TEST_F(CtrlChannelDhcpv4SrvTest, configTest) {
// Clean up after the test.
CfgMgr::instance().clear();
}
// Tests if config-write can be called without any parameters.
TEST_F(CtrlChannelDhcpv4SrvTest, writeConfigNoFilename) {
createUnixChannelServer();
......@@ -1108,4 +1123,93 @@ TEST_F(CtrlChannelDhcpv4SrvTest, concurrentConnections) {
ASSERT_NO_THROW(getIOService()->poll());
}
TEST_F(CtrlChannelDhcpv4SrvTest, longCommand) {
createUnixChannelServer();
boost::scoped_ptr<UnixControlClient> client(new UnixControlClient());
ASSERT_TRUE(client);
ASSERT_TRUE(client->connectToServer(socket_path_));
getIOService()->run_one();
getIOService()->poll();
size_t bytes_transferred = 0;
const size_t payload_size = 1024 * 1000;
bool first_payload = true;
while (bytes_transferred < payload_size) {
if (bytes_transferred == 0) {
std::string preamble = "{ \"command\": \"foo\", \"arguments\": [ ";
ASSERT_TRUE(client->sendCommand(preamble));
bytes_transferred += preamble.size();
} else {
std::ostringstream payload;
if (!first_payload) {
payload << ", ";
}
first_payload = false;
payload << "\"blablablablablablablablablablablablablablablabla\"";
if (bytes_transferred + payload.tellp() > payload_size) {
payload << "] }";
}
ASSERT_TRUE(client->sendCommand(payload.str()));
bytes_transferred += payload.tellp();
}
getIOService()->run_one();
getIOService()->poll();
}
std::string response;
ASSERT_TRUE(client->getResponse(response));
EXPECT_EQ("{ \"result\": 2, \"text\": \"'foo' command not supported.\" }",
response);
client->disconnectFromServer();
}
TEST_F(CtrlChannelDhcpv4SrvTest, longResponse) {
ASSERT_NO_THROW(
CommandMgr::instance().registerCommand("foo",
boost::bind(&CtrlChannelDhcpv4SrvTest::longResponseHandler, _1, _2));
);
createUnixChannelServer();
std::string reference_response = longResponseHandler("foo", ConstElementPtr())->str();
std::ostringstream response;
std::thread th([this, &response, reference_response]() {
size_t long_response_size = reference_response.size();
boost::scoped_ptr<UnixControlClient> client(new UnixControlClient());
ASSERT_TRUE(client);
ASSERT_TRUE(client->connectToServer(socket_path_));
std::string command = "{ \"command\": \"foo\", \"arguments\": { } }";
ASSERT_TRUE(client->sendCommand(command));
while (response.tellp() < long_response_size) {
std::string partial;
client->getResponse(partial);
response << partial;
}
client->disconnectFromServer();
getIOService()->stop();
});
getIOService()->run();
th.join();
EXPECT_EQ(reference_response, response.str());
}
} // End of anonymous namespace
......@@ -484,7 +484,7 @@ TEST_F(CtrlChannelDhcpv6SrvTest, configSet) {
// Should fail with a syntax error
EXPECT_EQ("{ \"result\": 1, "
"\"text\": \"subnet configuration failed: mandatory 'subnet' parameter is missing for a subnet being configured (<string>:21:17)\" }",
"\"text\": \"subnet configuration failed: mandatory 'subnet' parameter is missing for a subnet being configured (<wire>:20:17)\" }",
response);
// Check that the config was not lost
......@@ -630,7 +630,7 @@ TEST_F(CtrlChannelDhcpv6SrvTest, configTest) {
// Should fail with a syntax error
EXPECT_EQ("{ \"result\": 1, "
"\"text\": \"subnet configuration failed: mandatory 'subnet' parameter "
"is missing for a subnet being configured (<string>:21:17)\" }",
"is missing for a subnet being configured (<wire>:20:17)\" }",
response);
// Check that the config was not lost
......@@ -737,7 +737,8 @@ TEST_F(CtrlChannelDhcpv6SrvTest, controlChannelNegative) {
sendUnixCommand("utter nonsense", response);
EXPECT_EQ("{ \"result\": 1, "
"\"text\": \"error: unexpected character u in <string>:1:2\" }",
"\"text\": \"invalid first character u : "
"current state: [ 12 RECEIVE_START_ST ] next event: [ 1 START_EVT ]\" }",
response);
}
......
......@@ -12,6 +12,7 @@
#include <config/command_mgr.h>
#include <cc/data.h>
#include <cc/command_interpreter.h>
#include <cc/json_feed.h>
#include <dhcp/iface_mgr.h>
#include <config/config_log.h>
#include <boost/bind.hpp>
......@@ -42,24 +43,13 @@ public:
/// a transmission over the control socket is received.
Connection(const boost::shared_ptr<UnixDomainSocket>& socket,
ConnectionPool& connection_pool)
: socket_(socket), connection_pool_(connection_pool),
response_in_progress_(false) {
: socket_(socket), buf_(), response_(), connection_pool_(connection_pool),
feed_(), response_in_progress_(false) {
// Callback value of 0 is used to indicate that callback function is
// not installed.
isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
}
/// @brief Start asynchronous read over the unix domain socket.
///
/// This method doesn't block. Once the transmission is received over the
/// socket, the @c Connection::receiveHandler callback is invoked to
/// process received data.
void start() {
socket_->asyncReceive(&buf_[0], sizeof(buf_),
boost::bind(&Connection::receiveHandler,
shared_from_this(), _1, _2));
// Initialize state model for receiving and preparsing commands.
feed_.initModel();
}
/// @brief Close current connection.
......@@ -78,6 +68,29 @@ public:
}
}
/// @brief Start asynchronous read over the unix domain socket.
///
/// This method doesn't block. Once the transmission is received over the
/// socket, the @c Connection::receiveHandler callback is invoked to
/// process received data.
void doReceive() {
socket_->asyncReceive(&buf_[0], sizeof(buf_),
boost::bind(&Connection::receiveHandler,
shared_from_this(), _1, _2));
}
/// @brief Starts asynchronous send over the unix domain socket.
///
/// This method doesn't block. Once the send operation is completed, the
/// @c Connection::sendHandler cllback is invoked.
void doSend() {
size_t chunk_size = response_.size() < 8192 ? response_.size() : 8192;
socket_->asyncSend(&response_[0], chunk_size,
boost::bind(&Connection::sendHandler, shared_from_this(), _1, _2));
}
/// @brief Handler invoked when the data is received over the control
/// socket.
///
......@@ -86,6 +99,13 @@ public:
void receiveHandler(const boost::system::error_code& ec,
size_t bytes_transferred);
/// @brief Handler invoked when the data is sent over the control socket.
///
/// @param ec Error code.
/// @param bytes_transferred Number of bytes sent.
void sendHandler(const boost::system::error_code& ec,
size_t bytes_trasferred);
private:
/// @brief Pointer to the socket used for transmission.
......@@ -94,9 +114,16 @@ private:
/// @brief Buffer used for received data.
std::array<char, 65535> buf_;
/// @brief Response created by the server.
std::string response_;
/// @brief Reference to the pool of connections.
ConnectionPool& connection_pool_;
/// @brief State model used to receive data over the connection and detect
/// when the command ends.
JSONFeed feed_;
/// @brief Boolean flag indicating if the request to stop connection is a
/// result of server reconfiguration.
bool response_in_progress_;
......@@ -114,7 +141,7 @@ public:
///
/// @param connection Pointer to the new connection object.
void start(const ConnectionPtr& connection) {
connection->start();
connection->doReceive();
connections_.insert(connection);
}
......@@ -180,17 +207,25 @@ Connection::receiveHandler(const boost::system::error_code& ec,
try {
// Try to interpret it as JSON.
std::string sbuf(&buf_[0], bytes_transferred);
cmd = Element::fromJSON(sbuf, true);
feed_.postBuffer(&buf_[0], bytes_transferred);
feed_.poll();
if (feed_.needData()) {
doReceive();
return;
}
response_in_progress_ = true;
if (feed_.feedOk()) {
cmd = feed_.toElement();
response_in_progress_ = true;
// If successful, then process it as a command.
rsp = CommandMgr::instance().processCommand(cmd);
// If successful, then process it as a command.
rsp = CommandMgr::instance().processCommand(cmd);
response_in_progress_ = false;
response_in_progress_ = false;
} else {
isc_throw(BadValue, feed_.getErrorMessage());
}
} catch (const Exception& ex) {
LOG_WARN(command_logger, COMMAND_PROCESS_ERROR1).arg(ex.what());
......@@ -203,30 +238,52 @@ Connection::receiveHandler(const boost::system::error_code& ec,
rsp = createAnswer(CONTROL_RESULT_ERROR,
"internal server error: no response generated");
}
} else {
// Let's convert JSON response to text. Note that at this stage
// the rsp pointer is always set.
std::string txt = rsp->str();
size_t len = txt.length();
if (len > 65535) {
// Hmm, our response is too large. Let's send the first
// 64KB and hope for the best.
LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len);
// Let's convert JSON response to text. Note that at this stage
// the rsp pointer is always set.
response_ = rsp->str();
len = 65535;
doSend();
return;
/* size_t len = response_.length();
if (len > 65535) {
// Hmm, our response is too large. Let's send the first
// 64KB and hope for the best.
LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len);
len = 65535;
}
try {
// Send the data back over socket.
socket_->write(response_.c_str(), len);
} catch (const std::exception& ex) {
// Response transmission failed. Since the response failed, it doesn't
// make sense to send any status codes. Let's log it and be done with
// it.
LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL)
.arg(len).arg(socket_->getNative()).arg(ex.what());
} */
}
try {
// Send the data back over socket.
socket_->write(txt.c_str(), len);
connection_pool_.stop(shared_from_this());
}
} catch (const std::exception& ex) {
// Response transmission failed. Since the response failed, it doesn't
// make sense to send any status codes. Let's log it and be done with
// it.
void
Connection::sendHandler(const boost::system::error_code& ec,
size_t bytes_transferred) {
if (ec && ec.value() != boost::asio::error::operation_aborted) {
LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL)
.arg(len).arg(socket_->getNative()).arg(ex.what());
.arg(socket_->getNative()).arg(ec.message());
} else {
response_.erase(0, bytes_transferred);
if (!response_.empty()) {
doSend();
return;
}
connection_pool_.stop(shared_from_this());
......
......@@ -88,13 +88,6 @@ over command socket identified by specified file descriptor.
This error message indicates that an error was encountered while
reading from command socket.
% COMMAND_SOCKET_RESPONSE_TOOLARGE Server's response was larger (%1) than supported 64KB
This error message indicates that the server received a command and generated
an answer for it, but that response was larger than supported 64KB. Server
will attempt to send the first 64KB of the response. Depending on the nature
of this response, this may indicate a software or configuration error. Future
Kea versions are expected to have better support for large responses.
% COMMAND_SOCKET_UNIX_CLOSE Command socket closed: UNIX, fd=%1, path=%2
This informational message indicates that the daemon closed a command
processing socket. This was a UNIX socket. It was opened with the file
......@@ -109,6 +102,6 @@ descriptor and path specified.
This debug message indicates that the specified number of bytes was sent
over command socket identifier by the specified file descriptor.
% COMMAND_SOCKET_WRITE_FAIL Error while writing %1 bytes to command socket %2 : %3
% COMMAND_SOCKET_WRITE_FAIL Error while writing to command socket %1 : %2
This error message indicates that an error was encountered while
attempting to send a response to the command socket.
......@@ -95,7 +95,6 @@ bool UnixControlClient::getResponse(std::string& response) {
return (false);
}
case 0:
ADD_FAILURE() << "No response data sent";
return (false);
default:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment