Commit 0d7644a5 authored by Marcin Siodelski's avatar Marcin Siodelski
Browse files

[5200] More proper fix for hangs in unix domain sockets tests.

parent 72827eb1
......@@ -171,15 +171,12 @@ public:
///
/// @param response Stub response to be sent from the server socket to the
/// client.
/// @param stop_after_count Number of received messages received over the
/// server socket after which the IO service should be stopped.
void bindServerSocket(const std::string& response,
const unsigned int stop_after_count = 1) {
void bindServerSocket(const std::string& response) {
server_socket_.reset(new test::TestServerUnixSocket(*getIOService(),
unixSocketFilePath(),
TEST_TIMEOUT,
response));
server_socket_->bindServerSocket(stop_after_count);
server_socket_->bindServerSocket();
}
/// @brief Creates command with no arguments.
......@@ -214,27 +211,28 @@ public:
/// @param expected_result0 Expected first result in response from the server.
/// @param expected_result1 Expected second result in response from the server.
/// @param expected_result2 Expected third result in response from the server.
/// @param stop_after_count Number of received messages received over the
/// server socket after which the IO service should be stopped.
/// @param expected_responses Number of responses after which the test finishes.
/// @param server_response Stub response to be sent by the server.
void testForward(const CtrlAgentCfgContext::ServerType& server_type,
const std::string& service,
const int expected_result0,
const int expected_result1 = -1,
const int expected_result2 = -1,
const unsigned stop_after_count = 1,
const size_t expected_responses = 1,
const std::string& server_response = "{ \"result\": 0 }") {
// Configure client side socket.
configureControlSocket(server_type);
// Create server side socket.
bindServerSocket(server_response, stop_after_count);
bindServerSocket(server_response);
// The client side communication is synchronous. To be able to respond
// to this we need to run the server side socket at the same time.
// Running IO service in a thread guarantees that the server responds
// as soon as it receives the control command.
isc::util::thread::Thread(boost::bind(&IOService::run,
getIOService().get()));
isc::util::thread::Thread(boost::bind(&CtrlAgentCommandMgrTest::runIO,
getIOService(), server_socket_,
expected_responses));
ConstElementPtr command = createCommand("foo", service);
ConstElementPtr answer = mgr_.handleCommand("foo", ConstElementPtr(),
......@@ -243,6 +241,24 @@ public:
checkAnswer(answer, expected_result0, expected_result1, expected_result2);
}
/// @brief Runs IO service until number of sent responses is lower than
/// expected.
///
/// @param server_socket Pointer to the server socket.
/// @param expected_responses Number of expected responses.
static void runIO(IOServicePtr& io_service,
const test::TestServerUnixSocketPtr& server_socket,
const size_t expected_responses) {
while (server_socket->getResponseNum() < expected_responses) {
io_service->run_one();
}
}
CtrlAgentCommandMgrTest* getTestSelf() {
return (this);
}
/// @brief a convenience reference to control agent command manager
CtrlAgentCommandMgr& mgr_;
......@@ -347,8 +363,8 @@ TEST_F(CtrlAgentCommandMgrTest, forwardListCommands) {
// to this we need to run the server side socket at the same time.
// Running IO service in a thread guarantees that the server responds
// as soon as it receives the control command.
isc::util::thread::Thread(boost::bind(&IOService::run,
getIOService().get()));
isc::util::thread::Thread(boost::bind(&CtrlAgentCommandMgrTest::runIO,
getIOService(), server_socket_, 1));
ConstElementPtr command = createCommand("list-commands", "dhcp4");
ConstElementPtr answer = mgr_.handleCommand("list-commands", ConstElementPtr(),
......
......@@ -95,7 +95,9 @@ TEST_F(UnixDomainSocketTest, sendReceive) {
ASSERT_EQ(outbound_data.size(), sent_size);
// Run IO service to generate server's response.
io_service_.run();
while (test_socket_.getResponseNum() < 1) {
io_service_.run_one();
}
// Receive response from the socket.
std::array<char, 1024> read_buf;
......
......@@ -7,11 +7,121 @@
#include <asiolink/asio_wrapper.h>
#include <asiolink/testutils/test_server_unix_socket.h>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <functional>
#include <set>
using namespace boost::asio::local;
namespace isc {
namespace asiolink {
namespace test {
typedef stream_protocol::socket UnixSocket;
typedef boost::shared_ptr<UnixSocket> UnixSocketPtr;
typedef std::function<void()> SentResponseCallback;
class Connection {
public:
Connection(const UnixSocketPtr& unix_socket,
const std::string custom_response,
const SentResponseCallback& sent_response_callback)
: socket_(unix_socket), custom_response_(custom_response),
sent_response_callback_(sent_response_callback) {
socket_->async_read_some(boost::asio::buffer(&raw_buf_[0], raw_buf_.size()),
boost::bind(&Connection::readHandler, this, _1, _2));
}
void
readHandler(const boost::system::error_code&, size_t bytes_transferred) {
if (!custom_response_.empty()) {
boost::asio::write(*socket_,
boost::asio::buffer(custom_response_.c_str(), custom_response_.size()));
} else {
std::string received(&raw_buf_[0], bytes_transferred);
std::string response("received " + received);
boost::asio::write(*socket_,
boost::asio::buffer(response.c_str(), response.size()));
}
sent_response_callback_();
}
void stop() {
socket_->close();
}
private:
UnixSocketPtr socket_;
std::string custom_response_;
/// @brief Receive buffer.
std::array<char, 1024> raw_buf_;
SentResponseCallback sent_response_callback_;
};
typedef boost::shared_ptr<Connection> ConnectionPtr;
class ConnectionPool {
public:
ConnectionPool(IOService& io_service)
: io_service_(io_service), connections_(), next_socket_(),
response_num_(0) {
}
UnixSocketPtr getSocket() {
if (!next_socket_) {
next_socket_.reset(new UnixSocket(io_service_.get_io_service()));
}
return (next_socket_);
}
void start(const std::string& custom_response) {
ConnectionPtr conn(new Connection(next_socket_, custom_response, [this] {
++response_num_;
}));
connections_.insert(conn);
next_socket_.reset();
}
void stop(const ConnectionPtr& conn) {
conn->stop();
connections_.erase(conn);
}
void stopAll() {
for (auto conn = connections_.begin(); conn != connections_.end();
++conn) {
(*conn)->stop();
}
connections_.clear();
}
size_t getResponseNum() const {
return (response_num_);
}
private:
IOService& io_service_;
std::set<ConnectionPtr> connections_;
UnixSocketPtr next_socket_;
size_t response_num_;
};
TestServerUnixSocket::TestServerUnixSocket(IOService& io_service,
const std::string& socket_file_path,
const long test_timeout,
......@@ -19,71 +129,35 @@ TestServerUnixSocket::TestServerUnixSocket(IOService& io_service,
: io_service_(io_service),
server_endpoint_(socket_file_path),
server_acceptor_(io_service_.get_io_service()),
server_sockets_(),
test_timer_(io_service_),
custom_response_(custom_response),
stop_after_count_(1),
read_count_(0) {
connection_pool_(new ConnectionPool(io_service)) {
test_timer_.setup(boost::bind(&TestServerUnixSocket::timeoutHandler, this),
test_timeout, IntervalTimer::ONE_SHOT);
}
TestServerUnixSocket::~TestServerUnixSocket() {
for (auto sock = server_sockets_.begin(); sock != server_sockets_.end(); ++sock) {
sock->close();
}
connection_pool_->stopAll();
}
void
TestServerUnixSocket::bindServerSocket(const unsigned int stop_after_count) {
TestServerUnixSocket::bindServerSocket() {
server_acceptor_.open();
server_acceptor_.bind(server_endpoint_);
server_acceptor_.listen();
accept();
stop_after_count_ = stop_after_count;
}
void
TestServerUnixSocket::acceptHandler(const boost::system::error_code&) {
server_sockets_.back().async_read_some(boost::asio::buffer(&raw_buf_[0],
raw_buf_.size()),
boost::bind(&TestServerUnixSocket::readHandler, this, _1, _2));
connection_pool_->start(custom_response_);
accept();
}
void
TestServerUnixSocket::accept() {
server_sockets_.push_back(boost::asio::local::stream_protocol::
socket(io_service_.get_io_service()));
server_acceptor_.async_accept(server_sockets_.back(),
boost::bind(&TestServerUnixSocket::
acceptHandler, this, _1));
}
void
TestServerUnixSocket::readHandler(const boost::system::error_code&,
size_t bytes_transferred) {
if (!custom_response_.empty()) {
boost::asio::write(server_sockets_.back(),
boost::asio::buffer(custom_response_.c_str(),
custom_response_.size()));
} else {
std::string received(&raw_buf_[0], bytes_transferred);
std::string response("received " + received);
boost::asio::write(server_sockets_.back(),
boost::asio::buffer(response.c_str(), response.size()));
}
// Stop IO service if we have reached the maximum number of read messages.
if (++read_count_ >= stop_after_count_) {
io_service_.stop();
} else {
// Previous connection is done, so let's accept another connection.
accept();
}
server_acceptor_.async_accept(*(connection_pool_->getSocket()),
boost::bind(&TestServerUnixSocket::acceptHandler, this, _1));
}
void
......@@ -92,6 +166,12 @@ TestServerUnixSocket::timeoutHandler() {
io_service_.stop();
}
size_t
TestServerUnixSocket::getResponseNum() const {
return (connection_pool_->getResponseNum());
}
} // end of namespace isc::asiolink::test
} // end of namespace isc::asiolink
} // end of namespace isc
......@@ -12,7 +12,6 @@
#include <asiolink/io_service.h>
#include <boost/shared_ptr.hpp>
#include <gtest/gtest.h>
#include <array>
#include <list>
#include <string>
......@@ -20,6 +19,8 @@ namespace isc {
namespace asiolink {
namespace test {
class ConnectionPool;
/// @brief Provides unix domain socket functionality for unit tests.
class TestServerUnixSocket {
public:
......@@ -41,28 +42,21 @@ public:
~TestServerUnixSocket();
/// @brief Creates and binds server socket.
///
/// @param stop_after_count Number of received messages after which the
/// IO service should be stopped.
void bindServerSocket(const unsigned int stop_after_count = 1);
void bindServerSocket();
/// @brief Server acceptor handler.
///
/// @param ec Error code.
void acceptHandler(const boost::system::error_code& ec);
/// @brief Server read handler.
///
/// @param ec Error code.
/// @param bytes_transferred Number of bytes read.
void readHandler(const boost::system::error_code& ec,
size_t bytes_transferred);
/// @brief Callback function invoke upon test timeout.
///
/// It stops the IO service and reports test timeout.
void timeoutHandler();
/// @brief Return number of responses sent so far to the clients.
size_t getResponseNum() const;
private:
/// @brief Asynchronously accept new connections.
......@@ -76,23 +70,14 @@ private:
/// @brief Server acceptor.
boost::asio::local::stream_protocol::acceptor server_acceptor_;
/// @brief Server side unix domain sockets.
std::list<boost::asio::local::stream_protocol::socket> server_sockets_;
/// @brief Receive buffer.
std::array<char, 1024> raw_buf_;
/// @brief Asynchronous timer service to detect timeouts.
IntervalTimer test_timer_;
/// @brief Holds custom response to be sent to the client.
std::string custom_response_;
/// @brief Number of messages received after which IO service gets stopped.
unsigned int stop_after_count_;
/// @brief Number of messages received so far.
unsigned int read_count_;
/// @brief Pool of connections.
boost::shared_ptr<ConnectionPool> connection_pool_;
};
/// @brief Pointer to the @ref TestServerUnixSocket.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment