Commit 0ea522a2 authored by Marcin Siodelski's avatar Marcin Siodelski
Browse files

[5317] Cleanup new code added to facilitate unix domain sockets.

parent 7f14fd19
......@@ -593,7 +593,10 @@ ControlledDhcpv4Srv::ControlledDhcpv4Srv(uint16_t port /*= DHCP4_SERVER_PORT*/)
}
server_ = this; // remember this instance for later use in handlers
// TimerMgr uses IO service to run asynchronous timers.
TimerMgr::instance()->setIOService(getIOService());
// CommandMgr uses IO service to run asynchronous socket operations.
CommandMgr::instance().setIOService(getIOService());
// These are the commands always supported by the DHCPv4 server.
......
......@@ -21,8 +21,6 @@
#include <testutils/io_utils.h>
#include <testutils/unix_control_client.h>
#include <util/threads/thread.h>
#include "marker_file.h"
#include "test_libraries.h"
......@@ -87,15 +85,14 @@ public:
StatsMgr::instance().removeAll();
CommandMgr::instance().closeCommandSocket();
if (getIOService()) {
getIOService()->stopWork();
getIOService()->poll();
}
server_.reset();
};
/// @brief Returns pointer to the server's IO service.
///
/// @return Pointer to the server's IO service or null pointer if the server
/// hasn't been created.
IOServicePtr getIOService() {
return (server_ ? server_->getIOService() : IOServicePtr());
}
......@@ -134,7 +131,6 @@ public:
std::string config_txt = header + socket_path_ + footer;
server_.reset(new NakedControlledDhcpv4Srv());
CommandMgr::instance().setIOService(getIOService());
ConstElementPtr config;
ASSERT_NO_THROW(config = parseDHCP4(config_txt));
......@@ -309,7 +305,6 @@ TEST_F(CtrlChannelDhcpv4SrvTest, commands) {
ASSERT_NO_THROW(
server_.reset(new NakedControlledDhcpv4Srv());
CommandMgr::instance().setIOService(getIOService());
);
// Use empty parameters list
......@@ -394,7 +389,6 @@ TEST_F(CtrlChannelDhcpv4SrvTest, commandsRegistration) {
// Created server should register several additional commands.
ASSERT_NO_THROW(
server_.reset(new NakedControlledDhcpv4Srv());
CommandMgr::instance().setIOService(getIOService());
);
EXPECT_NO_THROW(answer = CommandMgr::instance().processCommand(list_cmds));
......@@ -696,7 +690,7 @@ TEST_F(CtrlChannelDhcpv4SrvTest, configSet) {
response);
/// Check that the config was indeed applied.
/* const Subnet4Collection* subnets =
const Subnet4Collection* subnets =
CfgMgr::instance().getCurrentCfg()->getCfgSubnets4()->getAll();
EXPECT_EQ(1, subnets->size());
......@@ -757,7 +751,7 @@ TEST_F(CtrlChannelDhcpv4SrvTest, configSet) {
EXPECT_EQ(2, subnets->size());
// Clean up after the test.
CfgMgr::instance().clear(); */
CfgMgr::instance().clear();
}
// Tests that the server properly responds to shtudown command sent
......
......@@ -17,13 +17,21 @@
namespace isc {
namespace asiolink {
/// @brief Base class for acceptor services in Kea.
///
/// This is a wrapper class for ASIO acceptor service. Classes implementing
/// services for specific protocol types should derive from this class.
///
/// @tparam ProtocolType ASIO protocol type, e.g. stream_protocol
/// @tparam CallbackType Callback function type which should have the following
/// signature: @c void(const boost::system::error_code&).
template<typename ProtocolType, typename CallbackType>
class IOAcceptor : public IOSocket {
public:
/// @brief Constructor.
///
/// @param io_service IO service.
/// @param io_service Reference to the IO service.
explicit IOAcceptor(IOService& io_service)
: IOSocket(),
acceptor_(new typename ProtocolType::acceptor(io_service.get_io_service())) {
......@@ -39,8 +47,10 @@ public:
/// @brief Opens acceptor socket given the endpoint.
///
/// @param endpoint Reference to the endpoint object which specifies the
/// address and port on which the acceptor service will run.
/// @param endpoint Reference to the endpoint object defining local
/// acceptor endpoint.
///
/// @tparam EndpointType Endpoint type.
template<typename EndpointType>
void open(const EndpointType& endpoint) {
acceptor_->open(endpoint.getASIOEndpoint().protocol());
......@@ -48,8 +58,10 @@ public:
/// @brief Binds socket to an endpoint.
///
/// @param endpoint Reference to an endpoint to which the socket is to
/// be bound.
/// @param endpoint Reference to the endpoint object defining local
/// acceptor endpoint.
///
/// @tparam EndpointType Endpoint type.
template<typename EndpointType>
void bind(const EndpointType& endpoint) {
acceptor_->bind(endpoint.getASIOEndpoint());
......@@ -66,17 +78,11 @@ public:
acceptor_->set_option(socket_option);
}
/// @brief Starts listening for the new connections.
/// @brief Starts listening new connections.
void listen() {
acceptor_->listen();
}
template<template<typename> class SocketType, typename SocketCallback>
void asyncAccept(const SocketType<SocketCallback>& socket,
const CallbackType& callback) {
acceptor_->async_accept(socket.getASIOSocket(), callback);
}
/// @brief Checks if the acceptor is open.
///
/// @return true if acceptor is open.
......@@ -94,15 +100,17 @@ protected:
/// @brief Asynchronously accept new connection.
///
/// This method accepts new connection into the specified socket. When the
/// new connection arrives or an error occurs the specified callback function
/// is invoked.
/// new connection arrives or an error occurs the specified callback
/// function is invoked.
///
/// @param socket Socket into which connection should be accepted.
/// @param callback Callback function to be invoked when the new connection
/// arrives.
/// @tparam SocketType
/// @tparam SocketType Socket type, e.g. @ref UnixDomainSocket. It must
/// implement @c getASIOSocket method.
template<typename SocketType>
void asyncAcceptInternal(const SocketType& socket, const CallbackType& callback) {
void asyncAcceptInternal(const SocketType& socket,
const CallbackType& callback) {
acceptor_->async_accept(socket.getASIOSocket(), callback);
}
......
......@@ -2,7 +2,7 @@
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file obtain one at http://mozilla.org/MPL/2.0/.
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#ifndef IO_SOCKET_H
#define IO_SOCKET_H 1
......
......@@ -11,6 +11,7 @@
#error "asio.hpp must be included before including this, see asiolink.h as to why"
#endif
#include <asiolink/io_acceptor.h>
#include <asiolink/io_service.h>
#include <asiolink/io_socket.h>
#include <asiolink/tcp_endpoint.h>
......@@ -28,23 +29,14 @@ namespace asiolink {
///
/// @tparam C Acceptor callback type.
template<typename C>
class TCPAcceptor : public IOSocket {
class TCPAcceptor : public IOAcceptor<boost::asio::ip::tcp, C> {
public:
/// @brief Constructor.
///
/// @param io_service IO service.
explicit TCPAcceptor(IOService& io_service)
: IOSocket(),
acceptor_(new boost::asio::ip::tcp::acceptor(io_service.get_io_service())) {
}
/// @brief Destructor.
virtual ~TCPAcceptor() { }
/// @brief Returns file descriptor of the underlying socket.
virtual int getNative() const final {
return (acceptor_->native());
: IOAcceptor<boost::asio::ip::tcp, C>(io_service) {
}
/// @brief Returns protocol of the socket.
......@@ -54,45 +46,6 @@ public:
return (IPPROTO_TCP);
}
/// @brief Opens acceptor socket given the endpoint.
///
/// @param endpoint Reference to the endpoint object which specifies the
/// address and port on which the acceptor service will run.
void open(const TCPEndpoint& endpoint) {
acceptor_->open(endpoint.getASIOEndpoint().protocol());
}
/// @brief Sets socket option.
///
/// Typically, this method is used to set SO_REUSEADDR option on the socket:
/// @code
/// IOService io_service;
/// TCPAcceptor<Callback> acceptor(io_service);
/// acceptor.setOption(TCPAcceptor::ReuseAddress(true))
/// @endcode
///
/// @param socket_option Reference to the object encapsulating an option to
/// be set for the socket.
/// @tparam SettableSocketOption Type of the object encapsulating socket option
/// being set.
template<typename SettableSocketOption>
void setOption(const SettableSocketOption& socket_option) {
acceptor_->set_option(socket_option);
}
/// @brief Binds socket to an endpoint.
///
/// @param endpoint Reference to an endpoint to which the socket is to
/// be bound.
void bind(const TCPEndpoint& endpoint) {
acceptor_->bind(endpoint.getASIOEndpoint());
}
/// @brief Starts listening for the new connections.
void listen() {
acceptor_->listen();
}
/// @brief Asynchronously accept new connection.
///
/// This method accepts new connection into the specified socket. When the
......@@ -105,26 +58,8 @@ public:
/// @tparam SocketCallback Type of the callback for the @ref TCPSocket.
template<typename SocketCallback>
void asyncAccept(const TCPSocket<SocketCallback>& socket, C& callback) {
acceptor_->async_accept(socket.getASIOSocket(), callback);
}
/// @brief Checks if the acceptor is open.
///
/// @return true if acceptor is open.
bool isOpen() const {
return (acceptor_->is_open());
IOAcceptor<boost::asio::ip::tcp, C>::asyncAcceptInternal(socket, callback);
}
/// @brief Closes the acceptor.
void close() const {
acceptor_->close();
}
private:
/// @brief Underlying ASIO acceptor implementation.
boost::shared_ptr<boost::asio::ip::tcp::acceptor> acceptor_;
};
......
......@@ -18,19 +18,25 @@
namespace isc {
namespace asiolink {
class UnixDomainSocketAcceptor
: public IOAcceptor<boost::asio::local::stream_protocol,
std::function<void(const boost::system::error_code&)> > {
/// @brief Implements acceptor service for @ref UnixDomainSocket.
class UnixDomainSocketAcceptor : public IOAcceptor<boost::asio::local::stream_protocol,
std::function<void(const boost::system::error_code&)> > {
public:
/// @brief Callback type used in call to @ref UnixDomainSocketAcceptor::asyncAccept.
typedef std::function<void(const boost::system::error_code&)> AcceptHandler;
/// @brief Constructor.
///
/// @param io_service Reference to the IO service.
explicit UnixDomainSocketAcceptor(IOService& io_service)
: IOAcceptor<boost::asio::local::stream_protocol,
std::function<void(const boost::system::error_code&)> >(io_service) {
}
/// @brief Returns the transport protocol of the socket.
///
/// @return AF_LOCAL.
virtual int getProtocol() const final {
return (AF_LOCAL);
}
......
......@@ -16,13 +16,20 @@
namespace isc {
namespace asiolink {
/// @brief Endpoint for @ref UnixDomainSocket.
///
/// This is a simple class encapsulating ASIO unix domain socket.
class UnixDomainSocketEndpoint {
public:
/// @brief Constructor.
///
/// @param endpoint_path Path to the socket descriptor.
explicit UnixDomainSocketEndpoint(const std::string& endpoint_path)
: endpoint_(endpoint_path) {
}
/// @brief Returns underlying ASIO endpoint.
const boost::asio::local::stream_protocol::endpoint&
getASIOEndpoint() const {
return (endpoint_);
......@@ -30,11 +37,12 @@ public:
private:
/// @brief Underlying ASIO endpoint.
boost::asio::local::stream_protocol::endpoint endpoint_;
};
}
}
} // end of namespace isc::asiolink
} // end of namespace isc
#endif // UNIX_DOMAIN_SOCKET_ENDPOINT_H
......@@ -18,8 +18,6 @@ libkea_cfgclient_la_SOURCES += module_spec.h module_spec.cc
libkea_cfgclient_la_SOURCES += base_command_mgr.cc base_command_mgr.h
libkea_cfgclient_la_SOURCES += client_connection.cc client_connection.h
libkea_cfgclient_la_SOURCES += command_mgr.cc command_mgr.h
libkea_cfgclient_la_SOURCES += command_socket.cc command_socket.h
libkea_cfgclient_la_SOURCES += command_socket_factory.cc command_socket_factory.h
libkea_cfgclient_la_SOURCES += config_log.h config_log.cc
libkea_cfgclient_la_SOURCES += hooked_command_mgr.cc hooked_command_mgr.h
......
......@@ -10,7 +10,6 @@
#include <asiolink/unix_domain_socket_acceptor.h>
#include <asiolink/unix_domain_socket_endpoint.h>
#include <config/command_mgr.h>
#include <config/command_socket_factory.h>
#include <cc/data.h>
#include <cc/command_interpreter.h>
#include <dhcp/iface_mgr.h>
......@@ -51,6 +50,10 @@ public:
void stop() {
if (!response_in_progress_) {
LOG_INFO(command_logger, COMMAND_SOCKET_CONNECTION_CLOSED)
.arg(socket_->getNative());
isc::dhcp::IfaceMgr::instance().deleteExternalSocket(socket_->getNative());
socket_->close();
}
}
......@@ -105,11 +108,27 @@ void
Connection::receiveHandler(const boost::system::error_code& ec,
size_t bytes_transferred) {
if (ec) {
if (ec.value() != boost::asio::error::operation_aborted) {
LOG_ERROR(command_logger, COMMAND_SOCKET_READ_FAIL)
.arg(ec.value()).arg(socket_->getNative());
}
/// @todo: Should we close the connection, similar to what is already
/// being done for bytes_transferred == 0.
return;
} else if (bytes_transferred == 0) {
connection_pool_.stop(shared_from_this());
return;
}
ConstElementPtr cmd, rsp;
LOG_DEBUG(command_logger, DBG_COMMAND, COMMAND_SOCKET_READ)
.arg(bytes_transferred).arg(socket_->getNative());
try {
response_in_progress_ = true;
......@@ -120,6 +139,7 @@ Connection::receiveHandler(const boost::system::error_code& ec,
// If successful, then process it as a command.
rsp = CommandMgr::instance().processCommand(cmd);
} catch (const Exception& ex) {
LOG_WARN(command_logger, COMMAND_PROCESS_ERROR1).arg(ex.what());
rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
......@@ -143,15 +163,22 @@ Connection::receiveHandler(const boost::system::error_code& ec,
len = 65535;
}
// Send the data back over socket.
socket_->write(txt.c_str(), len);
response_in_progress_ = false;
try {
// Send the data back over socket.
socket_->write(txt.c_str(), len);
} catch (const std::exception& ex) {
// Response transmission failed. Since the response failed, it doesn't
// make sense to send any status codes. Let's log it and be done with
// it.
LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL)
.arg(len).arg(socket_->getNative()).arg(ex.what());
}
isc::dhcp::IfaceMgr::instance().deleteExternalSocket(socket_->getNative());
response_in_progress_ = false;
connection_pool_.stop(shared_from_this());
}
}
......@@ -185,7 +212,11 @@ public:
void
CommandMgrImpl::openCommandSocket(const isc::data::ConstElementPtr& socket_info) {
socket_name_.clear();
if(!socket_info) {
isc_throw(BadSocketInfo, "Missing socket_info parameters, can't create socket.");
}
ConstElementPtr type = socket_info->get("socket-type");
if (!type) {
isc_throw(BadSocketInfo, "Mandatory 'socket-type' parameter missing");
......@@ -209,27 +240,37 @@ CommandMgrImpl::openCommandSocket(const isc::data::ConstElementPtr& socket_info)
socket_name_ = name->stringValue();
acceptor_.reset(new UnixDomainSocketAcceptor(*io_service_));
UnixDomainSocketEndpoint endpoint(socket_name_);
acceptor_->open(endpoint);
acceptor_->bind(endpoint);
acceptor_->listen();
try {
// Start asynchronous acceptor service.
acceptor_.reset(new UnixDomainSocketAcceptor(*io_service_));
UnixDomainSocketEndpoint endpoint(socket_name_);
acceptor_->open(endpoint);
acceptor_->bind(endpoint);
acceptor_->listen();
doAccept();
// Install this socket in Interface Manager.
isc::dhcp::IfaceMgr::instance().addExternalSocket(acceptor_->getNative(), 0);
// Install this socket in Interface Manager.
isc::dhcp::IfaceMgr::instance().addExternalSocket(acceptor_->getNative(), 0);
doAccept();
} catch (const std::exception& ex) {
isc_throw(SocketError, ex.what());
}
}
void
CommandMgrImpl::doAccept() {
// Create a socket into which the acceptor will accept new connection.
socket_.reset(new UnixDomainSocket(*io_service_));
acceptor_->asyncAccept(*socket_,
[this](const boost::system::error_code& ec) {
acceptor_->asyncAccept(*socket_, [this](const boost::system::error_code& ec) {
if (!ec) {
// New connection is arriving. Start asynchronous transmission.
ConnectionPtr connection(new Connection(socket_, connection_pool_));
connection_pool_.start(connection);
}
// Unless we're stopping the service, start accepting connections again.
if (ec.value() != boost::asio::error::operation_aborted) {
doAccept();
}
});
......@@ -245,42 +286,18 @@ CommandMgr::openCommandSocket(const isc::data::ConstElementPtr& socket_info) {
}
void CommandMgr::closeCommandSocket() {
impl_->connection_pool_.stopAll();
if (impl_->acceptor_) {
// Close acceptor if the acceptor is open.
if (impl_->acceptor_ && impl_->acceptor_->isOpen()) {
isc::dhcp::IfaceMgr::instance().deleteExternalSocket(impl_->acceptor_->getNative());
impl_->acceptor_->close();
static_cast<void>(::remove(impl_->socket_name_.c_str()));
}
/* // Now let's close all existing connections that we may have.
for (std::list<CommandSocketPtr>::iterator conn = connections_.begin();
conn != connections_.end(); ++conn) {
(*conn)->close();
}
connections_.clear(); */
}
void CommandMgr::addConnection(const CommandSocketPtr& conn) {
connections_.push_back(conn);
}
bool CommandMgr::closeConnection(int fd) {
// Let's iterate over all currently registered connections.
for (std::list<CommandSocketPtr>::iterator conn = connections_.begin();
conn != connections_.end(); ++conn) {
// If found, close it.
if ((*conn)->getFD() == fd) {
(*conn)->close();
connections_.erase(conn);
return (true);
}
}
return (false);
// Stop all connections which can be closed. The only connection that won't
// be closed is the one over which we have received a request to reconfigure
// the server. This connection will be held until the CommandMgr responds to
// such request.
impl_->connection_pool_.stopAll();
}
int
......@@ -297,107 +314,8 @@ CommandMgr::instance() {
void
CommandMgr::setIOService(const IOServicePtr& io_service) {
closeCommandSocket();
impl_->io_service_ = io_service;
}
void
CommandMgr::commandReader(int sockfd) {
/// @todo: We do not handle commands that are larger than 64K.
// We should not expect commands bigger than 64K.
char buf[65536];
memset(buf, 0, sizeof(buf));
ConstElementPtr cmd, rsp;
// Read incoming data.
int rval = read(sockfd, buf, sizeof(buf));
if (rval < 0) {
// Read failed
LOG_ERROR(command_logger, COMMAND_SOCKET_READ_FAIL).arg(rval).arg(sockfd);
/// @todo: Should we close the connection, similar to what is already
/// being done for rval == 0?
return;
} else if (rval == 0) {
// Remove it from the active connections list.
instance().closeConnection(sockfd);
return;
}
// Duplicate the connection's socket in the event, the command causes the
// channel to close (like a reconfig). This permits us to always have
// a socket on which to respond. If for some reason we can't fall back
// to the connection socket.
int rsp_fd = dup(sockfd);
if (rsp_fd < 0 ) {
// Highly unlikely
const char* errmsg = strerror(errno);
LOG_DEBUG(command_logger, DBG_COMMAND, COMMAND_SOCKET_DUP_WARN)
.arg(errmsg);
rsp_fd = sockfd;
}
LOG_DEBUG(command_logger, DBG_COMMAND, COMMAND_SOCKET_READ).arg(rval).arg(sockfd);
// Ok, we received something. Let's see if we can make any sense of it.
try {
// Try to interpret it as JSON.
std::string sbuf(buf, static_cast<size_t>(rval));
cmd = Element::fromJSON(sbuf, true);
// If successful, then process it as a command.
rsp = CommandMgr::instance().processCommand(cmd);
} catch (const Exception& ex) {
LOG_WARN(command_logger, COMMAND_PROCESS_ERROR1).arg(ex.what());
rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
}
if (!rsp) {
LOG_WARN(command_logger, COMMAND_RESPONSE_ERROR);
// Only close the duped socket if it's different (should be)
if (rsp_fd != sockfd) {
close(rsp_fd);
}