Commit 76881598 authored by Marcin Siodelski's avatar Marcin Siodelski

[5189] Implemented ClientConnection to communicate over unix socket.

parent a61bff09
......@@ -95,7 +95,8 @@ TEST_F(UnixDomainSocketTest, sendReceive) {
ASSERT_EQ(outbound_data.size(), sent_size);
// Run IO service to generate server's response.
while (test_socket_.getResponseNum() < 1) {
while ((test_socket_->getResponseNum() < 1) &&
(!test_socket_->isStopped())) {
io_service_.run_one();
}
......@@ -136,7 +137,7 @@ TEST_F(UnixDomainSocketTest, asyncSendReceive) {
}
));
// Run IO service until connect handler is invoked.
while (!connect_handler_invoked) {
while (!connect_handler_invoked && (!test_socket_->isStopped())) {
io_service_.run_one();
}
......@@ -158,7 +159,8 @@ TEST_F(UnixDomainSocketTest, asyncSendReceive) {
));
// Run IO service to generate server's response.
while (test_socket_->getResponseNum() < 1) {
while ((test_socket_->getResponseNum() < 1) &&
(!test_socket_->isStopped())) {
io_service_.run_one();
}
......@@ -189,7 +191,7 @@ TEST_F(UnixDomainSocketTest, asyncSendReceive) {
}));
// Run IO service until we get some response from the server.
while (!receive_handler_invoked) {
while (!receive_handler_invoked && !test_socket_->isStopped()) {
io_service_.run_one();
}
......@@ -230,7 +232,7 @@ TEST_F(UnixDomainSocketTest, asyncClientErrors) {
connect_handler_invoked = true;
EXPECT_TRUE(ec);
});
while (!connect_handler_invoked) {
while (!connect_handler_invoked && !test_socket_->isStopped()) {
io_service_.run_one();
}
......@@ -243,7 +245,7 @@ TEST_F(UnixDomainSocketTest, asyncClientErrors) {
send_handler_invoked = true;
EXPECT_TRUE(ec);
});
while (!send_handler_invoked) {
while (!send_handler_invoked && !test_socket_->isStopped()) {
io_service_.run_one();
}
......@@ -256,7 +258,7 @@ TEST_F(UnixDomainSocketTest, asyncClientErrors) {
receive_handler_invoked = true;
EXPECT_TRUE(ec);
});
while (!receive_handler_invoked) {
while (!receive_handler_invoked && !test_socket_->isStopped()) {
io_service_.run_one();
}
}
......
......@@ -10,6 +10,7 @@
#include <boost/shared_ptr.hpp>
#include <functional>
#include <set>
#include <sstream>
using namespace boost::asio::local;
......@@ -200,22 +201,38 @@ private:
TestServerUnixSocket::TestServerUnixSocket(IOService& io_service,
const std::string& socket_file_path,
const long test_timeout,
const std::string& custom_response)
: io_service_(io_service),
server_endpoint_(socket_file_path),
server_acceptor_(io_service_.get_io_service()),
test_timer_(io_service_),
custom_response_(custom_response),
connection_pool_(new ConnectionPool(io_service)) {
test_timer_.setup(boost::bind(&TestServerUnixSocket::timeoutHandler, this),
test_timeout, IntervalTimer::ONE_SHOT);
connection_pool_(new ConnectionPool(io_service)),
stopped_(false) {
}
TestServerUnixSocket::~TestServerUnixSocket() {
connection_pool_->stopAll();
}
void
TestServerUnixSocket::generateCustomResponse(const uint64_t response_size) {
std::ostringstream s;
s << "{";
while (s.tellp() < response_size) {
s << "\"param\": \"value\",";
}
s << "}";
custom_response_ = s.str();
}
void
TestServerUnixSocket::startTimer(const long test_timeout) {
test_timer_.setup(boost::bind(&TestServerUnixSocket::timeoutHandler,
shared_from_this()),
test_timeout, IntervalTimer::ONE_SHOT);
}
void
TestServerUnixSocket::bindServerSocket() {
server_acceptor_.open();
......@@ -233,13 +250,14 @@ TestServerUnixSocket::acceptHandler(const boost::system::error_code&) {
void
TestServerUnixSocket::accept() {
server_acceptor_.async_accept(*(connection_pool_->getSocket()),
boost::bind(&TestServerUnixSocket::acceptHandler, this, _1));
boost::bind(&TestServerUnixSocket::acceptHandler, shared_from_this(), _1));
}
void
TestServerUnixSocket::timeoutHandler() {
ADD_FAILURE() << "Timeout occurred while running the test!";
io_service_.stop();
stopped_ = true;
}
size_t
......
......@@ -13,6 +13,7 @@
#include <boost/shared_ptr.hpp>
#include <gtest/gtest.h>
#include <list>
#include <stdint.h>
#include <string>
namespace isc {
......@@ -45,11 +46,9 @@ public:
///
/// @param io_service IO service.
/// @param socket_file_path Socket file path.
/// @param test_timeout Test timeout in milliseconds.
/// @param custom_response Custom response to be sent to the client.
TestServerUnixSocket(IOService& io_service,
const std::string& socket_file_path,
const long test_timeout,
const std::string& custom_response = "");
/// @brief Destructor.
......@@ -57,6 +56,16 @@ public:
/// Closes active connections.
~TestServerUnixSocket();
/// @brief Starts timer for detecting test timeout.
///
/// @param test_timeout Test timeout in milliseconds.
void startTimer(const long test_timeout);
/// @brief Generates response of a given length.
///
/// @param response_size Desired response size.
void generateCustomResponse(const uint64_t response_size);
/// @brief Creates and binds server socket.
void bindServerSocket();
......@@ -73,6 +82,12 @@ public:
/// @brief Return number of responses sent so far to the clients.
size_t getResponseNum() const;
/// @brief Checks if IO service has been stopped as a result of the
/// timeout.
bool isStopped() const {
return (stopped_);
}
private:
/// @brief Asynchronously accept new connections.
......@@ -94,6 +109,10 @@ private:
/// @brief Pool of connections.
boost::shared_ptr<ConnectionPool> connection_pool_;
/// @brief Indicates if IO service has been stopped as a result of
/// a timeout.
bool stopped_;
};
/// @brief Pointer to the @ref TestServerUnixSocket.
......
......@@ -7,7 +7,6 @@
#include <cc/data.h>
#include <cc/json_feed.h>
#include <boost/bind.hpp>
#include <iostream>
using namespace isc::data;
using namespace isc::util;
......
......@@ -7,8 +7,10 @@
#ifndef JSON_FEED_H
#define JSON_FEED_H
#include <cc/data.h>
#include <exceptions/exceptions.h>
#include <util/state_model.h>
#include <boost/shared_ptr.hpp>
#include <list>
#include <stdint.h>
#include <string>
......@@ -16,6 +18,14 @@
namespace isc {
namespace config {
class JSONFeed;
/// @brief Pointer to the @ref JSONFeed.
typedef boost::shared_ptr<JSONFeed> JSONFeedPtr;
/// @brief Pointer to the const @ref JSONFeed.
typedef boost::shared_ptr<const JSONFeed> ConstJSONFeedPtr;
/// @brief A generic exception thrown upon an error in the @ref JSONFeed.
class JSONFeedError : public Exception {
public:
......@@ -140,7 +150,9 @@ public:
bool feedOk() const;
/// @brief Returns error string when data processing has failed.
std::string getErrorMessage() const;
std::string getErrorMessage() const {
return (error_message_);
}
/// @brief Returns processed data as a structure of @ref isc::data::Element
/// objects.
......
......@@ -16,6 +16,7 @@ lib_LTLIBRARIES = libkea-cfgclient.la
libkea_cfgclient_la_SOURCES = config_data.h config_data.cc
libkea_cfgclient_la_SOURCES += module_spec.h module_spec.cc
libkea_cfgclient_la_SOURCES += base_command_mgr.cc base_command_mgr.h
libkea_cfgclient_la_SOURCES += client_connection.cc client_connection.h
libkea_cfgclient_la_SOURCES += command_mgr.cc command_mgr.h
libkea_cfgclient_la_SOURCES += command_socket.cc command_socket.h
libkea_cfgclient_la_SOURCES += command_socket_factory.cc command_socket_factory.h
......
// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <asiolink/asio_wrapper.h>
#include <asiolink/interval_timer.h>
#include <asiolink/unix_domain_socket.h>
#include <cc/json_feed.h>
#include <config/client_connection.h>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <array>
using namespace isc::asiolink;
namespace isc {
namespace config {
/// @brief Implementation of the @ref ClientConnection.
class ClientConnectionImpl : public boost::enable_shared_from_this<ClientConnectionImpl> {
public:
/// @brief Constructor.
///
/// @param io_service Reference to the IO service.
explicit ClientConnectionImpl(IOService& io_service);
/// @brief Starts asynchronous transaction with a remote endpoint.
///
/// See @ref ClientConnection::start documentation for the details.
///
/// @param socket_path Path to the socket description that the server
/// is bound to.
/// @param command Control command to be sent to the server.
/// @param handler Pointer to the user suppiled callback function which
/// should be invoked when transaction completes or when an error has
/// occurred during the transaction.
/// @param timeout Connection timeout in milliseconds.
void start(const ClientConnection::SocketPath& socket_path,
const ClientConnection::ControlCommand& command,
const ClientConnection::Handler& handler,
const ClientConnection::Timeout& timeout);
/// @brief Closes the socket.
void stop();
/// @brief Starts asynchronous send.
///
/// This method may be called multiple times internally when the command
/// is large and can't be sent all at once.
///
/// @param buffer Pointer to the buffer holding input data.
/// @param length Length of the data in the input buffer.
/// @param handler User supplied callback.
void doSend(const void* buffer, const size_t length,
const ClientConnection::Handler& handler);
/// @brief Starts asynchronous receive from the server.
///
/// This method may be called multiple times internally if the response
/// is large. The @ref JSONFeed instance is used to detect the boundaries
/// of the command within the stream. Once the entire command has been
/// received the user callback is invoked and the instance of the
/// @ref JSONFeed is returned.
///
/// @param handler User supplied callback.
void doReceive(const ClientConnection::Handler& handler);
/// @brief Terminates the connection and invokes a user callback indicating
/// an error.
///
/// @param ec Error code.
/// @param handler User callback.
void terminate(const boost::system::error_code& ec,
const ClientConnection::Handler& handler);
/// @brief Callback invoked when the timeout occurs.
///
/// It calls @ref terminate with the @c boost::asio::error::timed_out.
void timeoutCallback(const ClientConnection::Handler& handler);
private:
/// @brief Unix domain socket used for communication with a server.
UnixDomainSocket socket_;
/// @brief Pointer to the @ref JSONFeed holding a response.
///
///It may be a null pointer until some part of a response has been received.
JSONFeedPtr feed_;
/// @brief Holds the entire command being transmitted over the unix
/// socket.
std::string current_command_;
/// @brief Buffer into which chunks of the response are received.
std::array<char, 1024> read_buf_;
/// @brief Instance of the interval timer protecting against timeouts.
IntervalTimer timer_;
};
ClientConnectionImpl::ClientConnectionImpl(IOService& io_service)
: socket_(io_service), feed_(), current_command_(), timer_(io_service) {
}
void
ClientConnectionImpl::start(const ClientConnection::SocketPath& socket_path,
const ClientConnection::ControlCommand& command,
const ClientConnection::Handler& handler,
const ClientConnection::Timeout& timeout) {
// Start the timer protecting against timeouts.
timer_.setup(boost::bind(&ClientConnectionImpl::timeoutCallback,
shared_from_this(), handler),
timeout.timeout_, IntervalTimer::ONE_SHOT);
// Store the command in the class member to make sure it is valid
// the entire time.
current_command_.assign(command.control_command_);
// Pass self to lambda to make sure that the instance of this class
// lives as long as the lambda is held for async connect.
auto self(shared_from_this());
// Start asynchronous connect. This will return immediatelly.
socket_.asyncConnect(socket_path.socket_path_,
[this, self, command, handler](const boost::system::error_code& ec) {
// We failed to connect so we can't proceed. Simply clean up
// and invoke the user callback to signal an error.
if (ec) {
// This doesn't throw.
terminate(ec, handler);
} else {
// Connection successful. Transmit the command to the remote
// endpoint asynchronously.
doSend(current_command_.c_str(), current_command_.length(),
handler);
}
});
}
void
ClientConnectionImpl::stop() {
try {
socket_.close();
} catch (...) {
// Suppress errors related to closing a socket. We can't really help
// if an error occurred.
}
}
void
ClientConnectionImpl::doSend(const void* buffer, const size_t length,
const ClientConnection::Handler& handler) {
// Pass self to lambda to make sure that the instance of this class
// lives as long as the lambda is held for async send.
auto self(shared_from_this());
// Start asynchronous transmission of the command. This will return
// immediatelly.
socket_.asyncSend(buffer, length,
[this, self, buffer, length, handler]
(const boost::system::error_code& ec, size_t bytes_transferred) {
// An error has occurred while sending. Close the connection and
// signal an error.
if (ec) {
// This doesn't throw.
terminate(ec, handler);
} else {
// If the number of bytes we have managed to send so far is
// lower than the amount of data we're trying to send, we
// have to schedule another send to deliver the rest of
// the data.
if (bytes_transferred < length) {
doSend(static_cast<const char*>(buffer) + bytes_transferred,
length - bytes_transferred, handler);
} else {
// We have sent all the data. Start receiving a response.
doReceive(handler);
}
}
});
}
void
ClientConnectionImpl::doReceive(const ClientConnection::Handler& handler) {
// Pass self to lambda to make sure that the instance of this class
// lives as long as the lambda is held for async receive.
auto self(shared_from_this());
socket_.asyncReceive(&read_buf_[0], read_buf_.size(),
[this, self, handler]
(const boost::system::error_code& ec, size_t length) {
// An error has occurred while receiving the data. Close the connection
// and signal an error.
if (ec) {
// This doesn't throw.
terminate(ec, handler);
} else {
// Lazy initialization of the JSONFeed. The feed will be "parsing"
// received JSON stream and will detect when the whole response
// has been received.
if (!feed_) {
feed_.reset(new JSONFeed());
feed_->initModel();
}
// Put everything we have received so far into the feed and process
// the data.
feed_->postBuffer(&read_buf_[0], length);
feed_->poll();
// If the feed indicates that only a part of the response has been
// received, schedule another receive to get more data.
if (feed_->needData()) {
doReceive(handler);
} else {
// We have received the entire response, let's call the handler
// and indicate success.
terminate(ec, handler);
}
}
});
}
void
ClientConnectionImpl::terminate(const boost::system::error_code& ec,
const ClientConnection::Handler& handler) {
try {
stop();
current_command_.clear();
handler(ec, feed_);
} catch (...) {
// None of these operations should throw. In particular, the handler
// should not throw but if it has been misimplemented, we want to make
// sure we don't emit any exceptions from here.
}
}
void
ClientConnectionImpl::timeoutCallback(const ClientConnection::Handler& handler) {
// Timeout has occurred. The remote server didn't provide the entire
// response within the given time frame. Let's close the connection
// and signal the timeout.
terminate(boost::asio::error::timed_out, handler);
}
ClientConnection::ClientConnection(asiolink::IOService& io_service)
: impl_(new ClientConnectionImpl(io_service)) {
}
ClientConnection::~ClientConnection() {
stop();
}
void
ClientConnection::start(const ClientConnection::SocketPath& socket_path,
const ClientConnection::ControlCommand& command,
const Handler& handler,
const ClientConnection::Timeout& timeout) {
impl_->start(socket_path, command, handler, timeout);
}
void
ClientConnection::stop() {
impl_->stop();
}
} // end of namespace config
} // end of namespace isc
// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#ifndef CLIENT_CONNECTION_H
#define CLIENT_CONNECTION_H
#include <asiolink/io_service.h>
#include <cc/json_feed.h>
#include <boost/shared_ptr.hpp>
#include <functional>
namespace isc {
namespace config {
class ClientConnectionImpl;
/// @brief Represents client side connection over the unix domain socket.
///
/// This class represents a client side connection between the controlling
/// client and the server exposing control API over a unix domain socket.
/// In particular, this class is used by the Kea Control Agent to establish
/// connections with respective Kea services to forward received commands.
/// As of Kea 1.2 the servers can handle a single connection at the time.
/// In the future, we're planning to support multiple simulatenous connections.
/// In this case, each connection will be handled by a unique instance of the
/// @ref ClientConnection class.
///
/// The @ref ClientConnection supports asynchronous connections. A caller
/// creates an instance of the @ref ClientConnection and calls
/// @ref ClientConnection::start to start asynchronous communication with
/// a remote server. The caller provides a pointer to the callback function
/// (handler) which will be called when the communication with the server
/// completes, i.e. the command is sent to the server and the response
/// from the server is received. If an error occurs, the callback is
/// invoked with an error code indicating a reason for the failure.
///
/// The documentation of the @ref ClientConnection::start explains the
/// sequence of operations performed by this class.
///
/// Even though the @ref ClientConnection is asynchronous in nature, it
/// can also be used in cases requiring synchronous communication. As it
/// has been already mentioned, the servers in Kea 1.2 do not support
/// multiple concurrent connections. The following pseudo code demonstrate
/// how to perform synchronous transaction using this class.
///
/// @code
/// IOService io_service;
/// ClientConnection conn(io_service);
/// bool cb_invoked = false;
/// conn.start(ClientConnection::SocketPath("/tmp/kea.sock"),
/// ClientConnection::ControlCommand(command),
/// [this, &cb_invoked](const boost::system::error_code& ec,
/// const ConstJSONFeedPtr& feed) {
/// cb_invoked = true;
/// if (ec) {
/// ... handle error here ...
/// } else {
/// ... use feed to retrieve the response ...
/// }
/// }
/// );
/// while (!cb_invoked) {
/// io_service.run_one();
/// }
/// @endcode
///
class ClientConnection {
public:
/// @name Structures used for strong typing.
///
//@{
/// @brief Encapsulates socket path.
struct SocketPath {
explicit SocketPath(const std::string& socket_path)
: socket_path_(socket_path) { }
std::string socket_path_;
};
/// @brief Encapsulates control command.
struct ControlCommand {
explicit ControlCommand(const std::string control_command)
: control_command_(control_command) { }
std::string control_command_;
};
/// @brief Encapsulates timeout value.
struct Timeout {
explicit Timeout(const long timeout)
: timeout_(timeout) { }
long timeout_;
};
//@}
/// @brief Type of the callback invoked when the communication with
/// the server is complete or an error has occurred.
typedef std::function<void(const boost::system::error_code& ec,
const ConstJSONFeedPtr& feed)> Handler;
/// @brief Constructor.
///
/// @param io_service Reference to the IO service.
explicit ClientConnection(asiolink::IOService& io_service);
/// @brief Destructor.
///
/// Closes current connection.
~ClientConnection();
/// @brief Starts asynchronous transaction with a remote endpoint.
///
/// Starts asynchronous connection with the remote endpoint. If the
/// connection is successful, the control command is asynchronously
/// sent to the remote endpoint. When the entire command has been sent,
/// the response is read asynchronously, possibly in multiple chunks.
///
/// The timeout is specified for the entire transaction in milliseconds.
/// If the transaction takes longer than the timeout value the connection
/// is closed and the callback is called with the error code of
/// @c boost::asio::error::timed_out.
///
/// In other cases, the callback is called with the error code returned
/// by the boost asynchronous operations. If the transaction is successful