Commit 6f075ba7 authored by Marcin Siodelski's avatar Marcin Siodelski

[5318] Addressed review comments.

Also, when timeout occurs in control channel connection, all async tasks
are cancelled.
parent 378a7e74
......@@ -28,7 +28,9 @@
#include <boost/scoped_ptr.hpp>
#include <gtest/gtest.h>
#include <cstdlib>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <thread>
......@@ -330,14 +332,50 @@ public:
}
}
/// @brief Handler for long command.
///
/// It checks whether the received command is equal to the one specified
/// as an argument.
///
/// @param expected_command String representing an expected command.
/// @param command_name Command name received by the handler.
/// @param arguments Command arguments received by the handler.
///
/// @returns Success answer.
static ConstElementPtr
longCommandHandler(const std::string& expected_command,
const std::string& command_name,
const ConstElementPtr& arguments) {
// The handler is called with a command name and the structure holding
// command arguments. We have to rebuild the command from those
// two arguments so as it can be compared against expected_command.
ElementPtr entire_command = Element::createMap();
entire_command->set("command", Element::create(command_name));
entire_command->set("arguments", (arguments));
// The rebuilt command will have a different order of parameters so
// let's parse expected_command back to JSON to guarantee that
// both structures are built using the same order.
EXPECT_EQ(Element::fromJSON(expected_command)->str(),
entire_command->str());
return (createAnswer(0, "long command received ok"));
}
/// @brief Command handler which generates long response
///
/// This handler generates a large response (over 400kB). It includes
/// a list of randomly generated strings to make sure that the test
/// can catch out of order delivery.
static ConstElementPtr longResponseHandler(const std::string&,
const ConstElementPtr&) {
// By seeding the generator with the constant value we will always
// get the same sequence of generated strings.
std::srand(1);
ElementPtr arguments = Element::createList();
std::string arg = "responseresponseresponseresponseresponseresponse"
"response";
for (unsigned i = 0; i < 8000; ++i) {
arguments->add(Element::create(arg));
for (unsigned i = 0; i < 40000; ++i) {
std::ostringstream s;
s << std::setw(10) << std::rand();
arguments->add(Element::create(s.str()));
}
return (createAnswer(0, arguments));
}
......@@ -476,8 +514,7 @@ TEST_F(CtrlChannelDhcpv4SrvTest, controlChannelNegative) {
sendUnixCommand("utter nonsense", response);
EXPECT_EQ("{ \"result\": 1, "
"\"text\": \"invalid first character u : "
"current state: [ 12 RECEIVE_START_ST ] next event: [ 1 START_EVT ]\" }",
"\"text\": \"invalid first character u\" }",
response);
}
......@@ -1156,10 +1193,48 @@ TEST_F(CtrlChannelDhcpv4SrvTest, concurrentConnections) {
// This test verifies that the server can receive and process a large command.
TEST_F(CtrlChannelDhcpv4SrvTest, longCommand) {
std::ostringstream command;
// This is the desired size of the command sent to the server (1MB). The
// actual size sent will be slightly greater than that.
const size_t command_size = 1024 * 1000;
while (command.tellp() < command_size) {
// We're sending command 'foo' with arguments being a list of
// strings. If this is the first transmission, send command name
// and open the arguments list. Also insert the first argument
// so as all subsequent arguments can be prefixed with a comma.
if (command.tellp() == 0) {
command << "{ \"command\": \"foo\", \"arguments\": [ \"begin\"";
} else {
// Generate a random number and insert it into the stream as
// 10 digits long string.
std::ostringstream arg;
arg << setw(10) << std::rand();
// Append the argument in the command.
command << ", \"" << arg.str() << "\"\n";
// If we have hit the limit of the command size, close braces to
// get appropriate JSON.
if (command.tellp() > command_size) {
command << "] }";
}
}
}
ASSERT_NO_THROW(
CommandMgr::instance().registerCommand("foo",
boost::bind(&CtrlChannelDhcpv4SrvTest::longCommandHandler,
command.str(), _1, _2));
);
createUnixChannelServer();
std::string response;
std::thread th([this, &response]() {
std::thread th([this, &response, &command]() {
// IO service will be stopped automatically when this object goes
// out of scope and is destroyed. This is useful because we use
......@@ -1174,50 +1249,14 @@ TEST_F(CtrlChannelDhcpv4SrvTest, longCommand) {
// server side and create a new connection.
ASSERT_TRUE(client->connectToServer(socket_path_));
// This counter will hold the number of bytes transferred to the server
// so far.
size_t bytes_transferred = 0;
// This is the desired size of the command sent to the server (1MB). The
// actual size sent will be slightly greater than that.
const size_t command_size = 1024 * 1000;
bool first_payload = true;
// If we still haven't sent the entire command, continue sending.
while (bytes_transferred < command_size) {
// We're sending command 'foo' with arguments being a list of
// strings. If this is the first transmission, send command name
// and open the arguments list.
if (bytes_transferred == 0) {
std::string preamble = "{ \"command\": \"foo\", \"arguments\": [ ";
ASSERT_TRUE(client->sendCommand(preamble));
// Store the number of bytes sent.
bytes_transferred += preamble.size();
} else {
// We have already transmitted command name and arguments. Now
// we send the list of 'blabla' strings.
std::ostringstream payload;
// If this is not the first parameter in on the list it must be
// prefixed with a comma.
if (!first_payload) {
payload << ", ";
}
first_payload = false;
payload << "\"blablablablablablablablablablablablablablablabla\"";
// If we have hit the limit of the command size, close braces to
// get appropriate JSON.
if (bytes_transferred + payload.tellp() > command_size) {
payload << "] }";
}
// Send the payload.
ASSERT_TRUE(client->sendCommand(payload.str()));
// Update the number of bytes sent.
bytes_transferred += payload.tellp();
}
// Initially the remaining_string holds the entire command and we
// will be erasing the portions that we have sent.
std::string remaining_data = command.str();
while (!remaining_data.empty()) {
// Send the command in chunks of 1024 bytes.
const size_t l = remaining_data.size() < 1024 ? remaining_data.size() : 1024;
ASSERT_TRUE(client->sendCommand(remaining_data.substr(0, l)));
remaining_data.erase(0, l);
}
// Set timeout to 5 seconds to allow the time for the server to send
......@@ -1236,7 +1275,7 @@ TEST_F(CtrlChannelDhcpv4SrvTest, longCommand) {
// Wait for the thread to complete.
th.join();
EXPECT_EQ("{ \"result\": 2, \"text\": \"'foo' command not supported.\" }",
EXPECT_EQ("{ \"result\": 0, \"text\": \"long command received ok\" }",
response);
}
......
......@@ -26,6 +26,9 @@
#include <boost/scoped_ptr.hpp>
#include <gtest/gtest.h>
#include <iomanip>
#include <sstream>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
......@@ -94,6 +97,7 @@ public:
virtual ~CtrlDhcpv6SrvTest() {
LeaseMgrFactory::destroy();
StatsMgr::instance().removeAll();
CommandMgr::instance().deregisterAll();
CommandMgr::instance().setConnectionTimeout(DEFAULT_CONNECTION_TIMEOUT);
reset();
......@@ -338,14 +342,50 @@ public:
}
}
/// @brief Handler for long command.
///
/// It checks whether the received command is equal to the one specified
/// as an argument.
///
/// @param expected_command String representing an expected command.
/// @param command_name Command name received by the handler.
/// @param arguments Command arguments received by the handler.
///
/// @returns Success answer.
static ConstElementPtr
longCommandHandler(const std::string& expected_command,
const std::string& command_name,
const ConstElementPtr& arguments) {
// The handler is called with a command name and the structure holding
// command arguments. We have to rebuild the command from those
// two arguments so as it can be compared against expected_command.
ElementPtr entire_command = Element::createMap();
entire_command->set("command", Element::create(command_name));
entire_command->set("arguments", (arguments));
// The rebuilt command will have a different order of parameters so
// let's parse expected_command back to JSON to guarantee that
// both structures are built using the same order.
EXPECT_EQ(Element::fromJSON(expected_command)->str(),
entire_command->str());
return (createAnswer(0, "long command received ok"));
}
/// @brief Command handler which generates long response
///
/// This handler generates a large response (over 400kB). It includes
/// a list of randomly generated strings to make sure that the test
/// can catch out of order delivery.
static ConstElementPtr longResponseHandler(const std::string&,
const ConstElementPtr&) {
// By seeding the generator with the constant value we will always
// get the same sequence of generated strings.
std::srand(1);
ElementPtr arguments = Element::createList();
std::string arg = "responseresponseresponseresponseresponseresponse"
"response";
for (unsigned i = 0; i < 8000; ++i) {
arguments->add(Element::create(arg));
for (unsigned i = 0; i < 40000; ++i) {
std::ostringstream s;
s << std::setw(10) << std::rand();
arguments->add(Element::create(s.str()));
}
return (createAnswer(0, arguments));
}
......@@ -780,8 +820,7 @@ TEST_F(CtrlChannelDhcpv6SrvTest, controlChannelNegative) {
sendUnixCommand("utter nonsense", response);
EXPECT_EQ("{ \"result\": 1, "
"\"text\": \"invalid first character u : "
"current state: [ 12 RECEIVE_START_ST ] next event: [ 1 START_EVT ]\" }",
"\"text\": \"invalid first character u\" }",
response);
}
......@@ -1176,10 +1215,48 @@ TEST_F(CtrlChannelDhcpv6SrvTest, concurrentConnections) {
// This test verifies that the server can receive and process a large command.
TEST_F(CtrlChannelDhcpv6SrvTest, longCommand) {
std::ostringstream command;
// This is the desired size of the command sent to the server (1MB). The
// actual size sent will be slightly greater than that.
const size_t command_size = 1024 * 1000;
while (command.tellp() < command_size) {
// We're sending command 'foo' with arguments being a list of
// strings. If this is the first transmission, send command name
// and open the arguments list. Also insert the first argument
// so as all subsequent arguments can be prefixed with a comma.
if (command.tellp() == 0) {
command << "{ \"command\": \"foo\", \"arguments\": [ \"begin\"";
} else {
// Generate a random number and insert it into the stream as
// 10 digits long string.
std::ostringstream arg;
arg << setw(10) << std::rand();
// Append the argument in the command.
command << ", \"" << arg.str() << "\"\n";
// If we have hit the limit of the command size, close braces to
// get appropriate JSON.
if (command.tellp() > command_size) {
command << "] }";
}
}
}
ASSERT_NO_THROW(
CommandMgr::instance().registerCommand("foo",
boost::bind(&CtrlChannelDhcpv6SrvTest::longCommandHandler,
command.str(), _1, _2));
);
createUnixChannelServer();
std::string response;
std::thread th([this, &response]() {
std::thread th([this, &response, &command]() {
// IO service will be stopped automatically when this object goes
// out of scope and is destroyed. This is useful because we use
......@@ -1194,50 +1271,14 @@ TEST_F(CtrlChannelDhcpv6SrvTest, longCommand) {
// server side and create a new connection.
ASSERT_TRUE(client->connectToServer(socket_path_));
// This counter will hold the number of bytes transferred to the server
// so far.
size_t bytes_transferred = 0;
// This is the desired size of the command sent to the server (1MB). The
// actual size sent will be slightly greater than that.
const size_t command_size = 1024 * 1000;
bool first_payload = true;
// If we still haven't sent the entire command, continue sending.
while (bytes_transferred < command_size) {
// We're sending command 'foo' with arguments being a list of
// strings. If this is the first transmission, send command name
// and open the arguments list.
if (bytes_transferred == 0) {
std::string preamble = "{ \"command\": \"foo\", \"arguments\": [ ";
ASSERT_TRUE(client->sendCommand(preamble));
// Store the number of bytes sent.
bytes_transferred += preamble.size();
} else {
// We have already transmitted command name and arguments. Now
// we send the list of 'blabla' strings.
std::ostringstream payload;
// If this is not the first parameter in on the list it must be
// prefixed with a comma.
if (!first_payload) {
payload << ", ";
}
first_payload = false;
payload << "\"blablablablablablablablablablablablablablablabla\"";
// If we have hit the limit of the command size, close braces to
// get appropriate JSON.
if (bytes_transferred + payload.tellp() > command_size) {
payload << "] }";
}
// Send the payload.
ASSERT_TRUE(client->sendCommand(payload.str()));
// Update the number of bytes sent.
bytes_transferred += payload.tellp();
}
// Initially the remaining_string holds the entire command and we
// will be erasing the portions that we have sent.
std::string remaining_data = command.str();
while (!remaining_data.empty()) {
// Send the command in chunks of 1024 bytes.
const size_t l = remaining_data.size() < 1024 ? remaining_data.size() : 1024;
ASSERT_TRUE(client->sendCommand(remaining_data.substr(0, l)));
remaining_data.erase(0, l);
}
// Set timeout to 5 seconds to allow the time for the server to send
......@@ -1256,7 +1297,7 @@ TEST_F(CtrlChannelDhcpv6SrvTest, longCommand) {
// Wait for the thread to complete.
th.join();
EXPECT_EQ("{ \"result\": 2, \"text\": \"'foo' command not supported.\" }",
EXPECT_EQ("{ \"result\": 0, \"text\": \"long command received ok\" }",
response);
}
......
......@@ -148,6 +148,9 @@ public:
/// @brief Disables read and write operations on the socket.
void shutdown();
/// @brief Cancels asynchronous operations on the socket.
void cancel();
/// @brief Closes the socket.
void close();
......@@ -256,6 +259,15 @@ UnixDomainSocketImpl::shutdown() {
}
}
void
UnixDomainSocketImpl::cancel() {
boost::system::error_code ec;
static_cast<void>(socket_.cancel(ec));
if (ec) {
isc_throw(UnixDomainSocketError, ec.message());
}
}
void
UnixDomainSocketImpl::close() {
boost::system::error_code ec;
......@@ -333,6 +345,11 @@ UnixDomainSocket::shutdown() {
impl_->shutdown();
}
void
UnixDomainSocket::cancel() {
impl_->cancel();
}
void
UnixDomainSocket::close() {
impl_->close();
......
......@@ -109,6 +109,11 @@ public:
/// @throw UnixDomainSocketError if an error occurs during shutdown.
void shutdown();
/// @brief Cancels scheduled asynchronous operations on the socket.
///
/// @throw UnixDomainSocketError if an error occurs during cancel operation.
void cancel();
/// @brief Closes the socket.
///
/// @throw UnixDomainSocketError if an error occurs during closure.
......
......@@ -141,7 +141,7 @@ JSONFeed::defineStates() {
void
JSONFeed::feedFailure(const std::string& error_msg) {
error_message_ = error_msg + " : " + getContextStr();
error_message_ = error_msg;
transition(FEED_FAILED_ST, FEED_FAILED_EVT);
}
......
......@@ -136,6 +136,13 @@ public:
/// @brief Handler invoked when the data is received over the control
/// socket.
///
/// It collects received data into the @c isc::config::JSONFeed object and
/// schedules additional asynchronous read of data if this object signals
/// that command is incomplete. When the entire command is received, the
/// handler processes this command and asynchronously responds to the
/// controlling client.
//
///
/// @param ec Error code.
/// @param bytes_transferred Number of bytes received.
void receiveHandler(const boost::system::error_code& ec,
......@@ -144,12 +151,19 @@ public:
/// @brief Handler invoked when the data is sent over the control socket.
///
/// If there are still data to be sent another asynchronous send is
/// scheduled. When the entire command is sent, the connection is shutdown
/// and closed.
///
/// @param ec Error code.
/// @param bytes_transferred Number of bytes sent.
void sendHandler(const boost::system::error_code& ec,
size_t bytes_trasferred);
/// @brief Handler invoked when timeout has occurred.
///
/// Asynchrnously Sends a response to the client indicating that the
/// timeout has occurred.
void timeoutHandler();
private:
......@@ -359,6 +373,15 @@ Connection::timeoutHandler() {
LOG_INFO(command_logger, COMMAND_SOCKET_CONNECTION_TIMEOUT)
.arg(socket_->getNative());
try {
socket_->cancel();
} catch (const std::exception& ex) {
LOG_ERROR(command_logger, COMMAND_SOCKET_CONNECTION_CANCEL_FAIL)
.arg(socket_->getNative())
.arg(ex.what());
}
ConstElementPtr rsp = createAnswer(CONTROL_RESULT_ERROR, "Connection over"
" control channel timed out");
response_ = rsp->str();
......
......@@ -59,6 +59,12 @@ information may be provided by the system as second parameter.
This is an information message indicating that the command connection has been
closed by a command control client.
% COMMAND_SOCKET_CONNECTION_CANCEL_FAIL Failed to cancel read operation on socket %1: %2
This error message is issued to indicate an error to cancel asynchronous read
of the control command over the control socket. The cancel operation is performed
when the timeout occurs during communication with a client. The error message
includes details about the reason for failure.
% COMMAND_SOCKET_CONNECTION_CLOSED Closed socket %1 for existing command connection
This is an informational message that the socket created for handling
client's connection is closed. This usually means that the client disconnected,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment