Commit 4139a2f4 authored by Marcin Siodelski's avatar Marcin Siodelski
Browse files

[master] Merge branch 'trac5317a'

parents f5e88bca c7f86fda
......@@ -494,16 +494,6 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) {
return (isc::config::createAnswer(1, err.str()));
}
// We're going to modify the timers configuration. This is not allowed
// when the thread is running.
try {
TimerMgr::instance()->stopThread();
} catch (const std::exception& ex) {
err << "Unable to stop worker thread running timers: "
<< ex.what() << ".";
return (isc::config::createAnswer(1, err.str()));
}
ConstElementPtr answer = configureDhcp4Server(*srv, config);
// Check that configuration was successful. If not, do not reopen sockets
......@@ -573,17 +563,6 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) {
return (isc::config::createAnswer(1, err.str()));
}
// Start worker thread if there are any timers installed.
if (TimerMgr::instance()->timersCount() > 0) {
try {
TimerMgr::instance()->startThread();
} catch (const std::exception& ex) {
err << "Unable to start worker thread running timers: "
<< ex.what() << ".";
return (isc::config::createAnswer(1, err.str()));
}
}
return (answer);
}
......@@ -614,6 +593,12 @@ ControlledDhcpv4Srv::ControlledDhcpv4Srv(uint16_t port /*= DHCP4_SERVER_PORT*/)
}
server_ = this; // remember this instance for later use in handlers
// TimerMgr uses IO service to run asynchronous timers.
TimerMgr::instance()->setIOService(getIOService());
// CommandMgr uses IO service to run asynchronous socket operations.
CommandMgr::instance().setIOService(getIOService());
// These are the commands always supported by the DHCPv4 server.
// Please keep the list in alphabetic order.
CommandMgr::instance().registerCommand("build-report",
......@@ -676,9 +661,6 @@ ControlledDhcpv4Srv::~ControlledDhcpv4Srv() {
try {
cleanup();
// Stop worker thread running timers, if it is running. Then
// unregister any timers.
timer_mgr_->stopThread();
timer_mgr_->unregisterTimers();
// Close the command socket (if it exists).
......
......@@ -5,7 +5,6 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <config.h>
#include <asiolink/io_address.h>
#include <dhcp/dhcp4.h>
#include <dhcp/duid.h>
#include <dhcp/hwaddr.h>
......@@ -418,7 +417,7 @@ const std::string Dhcpv4Srv::VENDOR_CLASS_PREFIX("VENDOR_CLASS_");
Dhcpv4Srv::Dhcpv4Srv(uint16_t port, const bool use_bcast,
const bool direct_response_desired)
: shutdown_(true), alloc_engine_(), port_(port),
: io_service_(new IOService()), shutdown_(true), alloc_engine_(), port_(port),
use_bcast_(use_bcast) {
LOG_DEBUG(dhcp4_logger, DBG_DHCP4_START, DHCP4_OPEN_SOCKET).arg(port);
......@@ -717,6 +716,7 @@ Dhcpv4Srv::run() {
while (!shutdown_) {
try {
run_one();
getIOService()->poll();
} catch (const std::exception& e) {
// General catch-all exception that are not caught by more specific
// catches. This one is for exceptions derived from std::exception.
......@@ -740,7 +740,10 @@ Dhcpv4Srv::run_one() {
Pkt4Ptr rsp;
try {
uint32_t timeout = 1000;
// Set select() timeout to 1s. This value should not be modified
// because it is important that the select() returns control
// frequently so as the IOService can be polled for ready handlers.
uint32_t timeout = 1;
LOG_DEBUG(packet4_logger, DBG_DHCP4_DETAIL, DHCP4_BUFFER_WAIT).arg(timeout);
query = receivePacket(timeout);
......
// Copyright (C) 2011-2016 Internet Systems Consortium, Inc. ("ISC")
// Copyright (C) 2011-2017 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
......@@ -7,6 +7,7 @@
#ifndef DHCPV4_SRV_H
#define DHCPV4_SRV_H
#include <asiolink/io_service.h>
#include <dhcp/dhcp4.h>
#include <dhcp/pkt4.h>
#include <dhcp/option.h>
......@@ -188,6 +189,10 @@ typedef boost::shared_ptr<Dhcpv4Exchange> Dhcpv4ExchangePtr;
/// See the derived \ref ControlledDhcpv4Srv class for support for
/// command and configuration updates over msgq.
class Dhcpv4Srv : public Daemon {
private:
/// @brief Pointer to IO service used by the server.
asiolink::IOServicePtr io_service_;
public:
......@@ -222,6 +227,11 @@ public:
/// @brief Destructor. Used during DHCPv4 service shutdown.
virtual ~Dhcpv4Srv();
/// @brief Returns pointer to the IO service used by the server.
asiolink::IOServicePtr& getIOService() {
return (io_service_);
}
/// @brief returns Kea version on stdout and exit.
/// redeclaration/redefinition. @ref Daemon::getVersion()
static std::string getVersion(bool extended);
......
......@@ -6,6 +6,7 @@
#include <config.h>
#include <asiolink/io_service.h>
#include <cc/command_interpreter.h>
#include <config/command_mgr.h>
#include <dhcp/dhcp4.h>
......@@ -82,10 +83,20 @@ public:
~CtrlChannelDhcpv4SrvTest() {
LeaseMgrFactory::destroy();
StatsMgr::instance().removeAll();
CommandMgr::instance().closeCommandSocket();
server_.reset();
reset();
};
/// @brief Returns pointer to the server's IO service.
///
/// @return Pointer to the server's IO service or null pointer if the server
/// hasn't been created.
IOServicePtr getIOService() {
return (server_ ? server_->getIOService() : IOServicePtr());
}
void createUnixChannelServer() {
::remove(socket_path_.c_str());
......@@ -179,15 +190,16 @@ public:
client.reset(new UnixControlClient());
ASSERT_TRUE(client);
// Connect and then call server's receivePacket() so it can
// detect the control socket connect and call the accept handler
// Connect to the server. This is expected to trigger server's acceptor
// handler when IOService::poll() is run.
ASSERT_TRUE(client->connectToServer(socket_path_));
ASSERT_NO_THROW(server_->receivePacket(0));
ASSERT_NO_THROW(getIOService()->poll());
// Send the command and then call server's receivePacket() so it can
// detect the inbound data and call the read handler
// Send the command. This will trigger server's handler which receives
// data over the unix domain socket. The server will start sending
// response to the client.
ASSERT_TRUE(client->sendCommand(command));
ASSERT_NO_THROW(server_->receivePacket(0));
ASSERT_NO_THROW(getIOService()->poll());
// Read the response generated by the server. Note that getResponse
// only fails if there an IO error or no response data was present.
......@@ -196,7 +208,8 @@ public:
// Now disconnect and process the close event
client->disconnectFromServer();
ASSERT_NO_THROW(server_->receivePacket(0));
ASSERT_NO_THROW(getIOService()->poll());
}
/// @brief Checks response for list-commands
......@@ -1056,4 +1069,43 @@ TEST_F(CtrlChannelDhcpv4SrvTest, configReloadValid) {
::remove("test8.json");
}
/// Verify that concurrent connections over the control channel can be
/// established.
/// @todo Future Kea 1.3 tickets will modify the behavior of the CommandMgr
/// such that the server will be able to send response in multiple chunks.
/// This test will need to be extended. For now, the receive and write
/// operations are atomic and there is no conflict between concurrent
/// connections.
TEST_F(CtrlChannelDhcpv4SrvTest, concurrentConnections) {
createUnixChannelServer();
boost::scoped_ptr<UnixControlClient> client1(new UnixControlClient());
ASSERT_TRUE(client1);
boost::scoped_ptr<UnixControlClient> client2(new UnixControlClient());
ASSERT_TRUE(client2);
// Client 1 connects.
ASSERT_TRUE(client1->connectToServer(socket_path_));
ASSERT_NO_THROW(getIOService()->poll());
// Client 2 connects.
ASSERT_TRUE(client2->connectToServer(socket_path_));
ASSERT_NO_THROW(getIOService()->poll());
// Send the command while another client is connected.
ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }"));
ASSERT_NO_THROW(getIOService()->poll());
std::string response;
// The server should respond ok.
ASSERT_TRUE(client2->getResponse(response));
EXPECT_TRUE(response.find("\"result\": 0") != std::string::npos);
// Disconnect the servers.
client1->disconnectFromServer();
client2->disconnectFromServer();
ASSERT_NO_THROW(getIOService()->poll());
}
} // End of anonymous namespace
......@@ -6,7 +6,9 @@
#include <config.h>
#include <asiolink/interval_timer.h>
#include <asiolink/io_address.h>
#include <asiolink/io_service.h>
#include <cc/command_interpreter.h>
#include <dhcp/dhcp4.h>
#include <dhcp/hwaddr.h>
......@@ -85,17 +87,15 @@ public:
/// @brief Runs timers for specified time.
///
/// Internally, this method calls @c IfaceMgr::receive4 to run the
/// callbacks for the installed timers.
///
/// @param io_service Pointer to the IO service to be ran.
/// @param timeout_ms Amount of time after which the method returns.
void runTimersWithTimeout(const long timeout_ms) {
isc::util::Stopwatch stopwatch;
while (stopwatch.getTotalMilliseconds() < timeout_ms) {
// Block for up to one millisecond waiting for the timers'
// callbacks to be executed.
IfaceMgr::instancePtr()->receive4(0, 1000);
}
void runTimersWithTimeout(const IOServicePtr& io_service, const long timeout_ms) {
IntervalTimer timer(*io_service);
timer.setup([this, &io_service]() {
io_service->stop();
}, timeout_ms, IntervalTimer::ONE_SHOT);
io_service->run();
io_service->get_io_service().reset();
}
/// Name of a config file used during tests
......@@ -571,7 +571,7 @@ TEST_F(JSONFileBackendTest, timers) {
// Poll the timers for a while to make sure that each of them is executed
// at least once.
ASSERT_NO_THROW(runTimersWithTimeout(5000));
ASSERT_NO_THROW(runTimersWithTimeout(srv->getIOService(), 5000));
// Verify that the leases in the database have been processed as expected.
......
......@@ -495,17 +495,6 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) {
return (no_srv);
}
// We're going to modify the timers configuration. This is not allowed
// when the thread is running.
try {
TimerMgr::instance()->stopThread();
} catch (const std::exception& ex) {
std::ostringstream err;
err << "Unable to stop worker thread running timers: "
<< ex.what() << ".";
return (isc::config::createAnswer(1, err.str()));
}
ConstElementPtr answer = configureDhcp6Server(*srv, config);
// Check that configuration was successful. If not, do not reopen sockets
......@@ -594,18 +583,6 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) {
return (isc::config::createAnswer(1, err.str()));
}
// Start worker thread if there are any timers installed.
if (TimerMgr::instance()->timersCount() > 0) {
try {
TimerMgr::instance()->startThread();
} catch (const std::exception& ex) {
std::ostringstream err;
err << "Unable to start worker thread running timers: "
<< ex.what() << ".";
return (isc::config::createAnswer(1, err.str()));
}
}
// Finally, we can commit runtime option definitions in libdhcp++. This is
// exception free.
LibDHCP::commitRuntimeOptionDefs();
......@@ -638,6 +615,12 @@ ControlledDhcpv6Srv::ControlledDhcpv6Srv(uint16_t port)
}
server_ = this; // remember this instance for use in callback
// TimerMgr uses IO service to run asynchronous timers.
TimerMgr::instance()->setIOService(getIOService());
// CommandMgr uses IO service to run asynchronous socket operations.
CommandMgr::instance().setIOService(getIOService());
// These are the commands always supported by the DHCPv6 server.
// Please keep the list in alphabetic order.
CommandMgr::instance().registerCommand("build-report",
......@@ -699,9 +682,6 @@ ControlledDhcpv6Srv::~ControlledDhcpv6Srv() {
try {
cleanup();
// Stop worker thread running timers, if it is running. Then
// unregister any timers.
timer_mgr_->stopThread();
timer_mgr_->unregisterTimers();
// Close the command socket (if it exists).
......
......@@ -177,7 +177,8 @@ namespace dhcp {
const std::string Dhcpv6Srv::VENDOR_CLASS_PREFIX("VENDOR_CLASS_");
Dhcpv6Srv::Dhcpv6Srv(uint16_t port)
: port_(port), serverid_(), shutdown_(true), alloc_engine_()
: io_service_(new IOService()), port_(port), serverid_(), shutdown_(true),
alloc_engine_()
{
LOG_DEBUG(dhcp6_logger, DBG_DHCP6_START, DHCP6_OPEN_SOCKET).arg(port);
......@@ -374,6 +375,7 @@ bool Dhcpv6Srv::run() {
while (!shutdown_) {
try {
run_one();
getIOService()->poll();
} catch (const std::exception& e) {
// General catch-all standard exceptions that are not caught by more
// specific catches.
......@@ -395,7 +397,10 @@ void Dhcpv6Srv::run_one() {
Pkt6Ptr rsp;
try {
uint32_t timeout = 1000;
// Set select() timeout to 1s. This value should not be modified
// because it is important that the select() returns control
// frequently so as the IOService can be polled for ready handlers.
uint32_t timeout = 1;
LOG_DEBUG(packet6_logger, DBG_DHCP6_DETAIL, DHCP6_BUFFER_WAIT).arg(timeout);
query = receivePacket(timeout);
......
......@@ -7,6 +7,7 @@
#ifndef DHCPV6_SRV_H
#define DHCPV6_SRV_H
#include <asiolink/io_service.h>
#include <dhcp_ddns/ncr_msg.h>
#include <dhcp/dhcp6.h>
#include <dhcp/duid.h>
......@@ -53,6 +54,10 @@ public:
/// packets, processes them, manages leases assignment and generates
/// appropriate responses.
class Dhcpv6Srv : public Daemon {
private:
/// @brief Pointer to IO service used by the server.
asiolink::IOServicePtr io_service_;
public:
/// @brief defines if certain option may, must or must not appear
......@@ -78,6 +83,11 @@ public:
/// @brief Destructor. Used during DHCPv6 service shutdown.
virtual ~Dhcpv6Srv();
/// @brief Returns pointer to the IO service used by the server.
asiolink::IOServicePtr& getIOService() {
return (io_service_);
}
/// @brief returns Kea version on stdout and exit.
/// redeclaration/redefinition. @ref Daemon::getVersion()
static std::string getVersion(bool extended);
......
......@@ -114,6 +114,14 @@ public:
reset();
};
/// @brief Returns pointer to the server's IO service.
///
/// @return Pointer to the server's IO service or null pointer if the server
/// hasn't been created.
IOServicePtr getIOService() {
return (server_ ? server_->getIOService() : IOServicePtr());
}
void createUnixChannelServer() {
static_cast<void>(::remove(socket_path_.c_str()));
......@@ -192,15 +200,16 @@ public:
client.reset(new UnixControlClient());
ASSERT_TRUE(client);
// Connect and then call server's receivePacket() so it can
// detect the control socket connect and call the accept handler
// Connect to the server. This is expected to trigger server's acceptor
// handler when IOService::poll() is run.
ASSERT_TRUE(client->connectToServer(socket_path_));
ASSERT_NO_THROW(server_->receivePacket(0));
ASSERT_NO_THROW(getIOService()->poll());
// Send the command and then call server's receivePacket() so it can
// detect the inbound data and call the read handler
// Send the command. This will trigger server's handler which receives
// data over the unix domain socket. The server will start sending
// response to the client.
ASSERT_TRUE(client->sendCommand(command));
ASSERT_NO_THROW(server_->receivePacket(0));
ASSERT_NO_THROW(getIOService()->poll());
// Read the response generated by the server. Note that getResponse
// only fails if there an IO error or no response data was present.
......@@ -209,7 +218,8 @@ public:
// Now disconnect and process the close event
client->disconnectFromServer();
ASSERT_NO_THROW(server_->receivePacket(0));
ASSERT_NO_THROW(getIOService()->poll());
}
/// @brief Checks response for list-commands
......@@ -1081,4 +1091,44 @@ TEST_F(CtrlChannelDhcpv6SrvTest, configReloadValid) {
::remove("test8.json");
}
/// Verify that concurrent connections over the control channel can be
/// established.
/// @todo Future Kea 1.3 tickets will modify the behavior of the CommandMgr
/// such that the server will be able to send response in multiple chunks.
/// This test will need to be extended. For now, the receive and write
/// operations are atomic and there is no conflict between concurrent
/// connections.
TEST_F(CtrlChannelDhcpv6SrvTest, concurrentConnections) {
createUnixChannelServer();
boost::scoped_ptr<UnixControlClient> client1(new UnixControlClient());
ASSERT_TRUE(client1);
boost::scoped_ptr<UnixControlClient> client2(new UnixControlClient());
ASSERT_TRUE(client2);
// Client 1 connects.
ASSERT_TRUE(client1->connectToServer(socket_path_));
ASSERT_NO_THROW(getIOService()->poll());
// Client 2 connects.
ASSERT_TRUE(client2->connectToServer(socket_path_));
ASSERT_NO_THROW(getIOService()->poll());
// Send the command while another client is connected.
ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }"));
ASSERT_NO_THROW(getIOService()->poll());
std::string response;
// The server should respond ok.
ASSERT_TRUE(client2->getResponse(response));
EXPECT_TRUE(response.find("\"result\": 0") != std::string::npos);
// Disconnect the servers.
client1->disconnectFromServer();
client2->disconnectFromServer();
ASSERT_NO_THROW(getIOService()->poll());
}
} // End of anonymous namespace
......@@ -74,17 +74,15 @@ public:
/// @brief Runs timers for specified time.
///
/// Internally, this method calls @c IfaceMgr::receive6 to run the
/// callbacks for the installed timers.
///
/// @param io_service Pointer to the IO service to be ran.
/// @param timeout_ms Amount of time after which the method returns.
void runTimersWithTimeout(const long timeout_ms) {
isc::util::Stopwatch stopwatch;
while (stopwatch.getTotalMilliseconds() < timeout_ms) {
// Block for up to one millisecond waiting for the timers'
// callbacks to be executed.
IfaceMgr::instancePtr()->receive6(0, 1000);
}
void runTimersWithTimeout(const IOServicePtr& io_service, const long timeout_ms) {
IntervalTimer timer(*io_service);
timer.setup([this, &io_service]() {
io_service->stop();
}, timeout_ms, IntervalTimer::ONE_SHOT);
io_service->run();
io_service->get_io_service().reset();
}
static const char* TEST_FILE;
......@@ -521,7 +519,7 @@ TEST_F(JSONFileBackendTest, timers) {
// Poll the timers for a while to make sure that each of them is executed
// at least once.
ASSERT_NO_THROW(runTimersWithTimeout(5000));
ASSERT_NO_THROW(runTimersWithTimeout(srv->getIOService(), 5000));
// Verify that the leases in the database have been processed as expected.
......
......@@ -76,8 +76,8 @@ IntervalTimerImpl::setup(const IntervalTimer::Callback& cbfunc,
const long interval,
const IntervalTimer::Mode& mode)
{
// Interval should not be less than or equal to 0.
if (interval <= 0) {
// Interval should not be less than 0.
if (interval < 0) {
isc_throw(isc::BadValue, "Interval should not be less than or "
"equal to 0");
}
......
// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#ifndef IO_ACCEPTOR_H
#define IO_ACCEPTOR_H
#ifndef BOOST_ASIO_HPP
#error "asio.hpp must be included before including this, see asiolink.h as to why"
#endif
#include <asiolink/io_service.h>
#include <asiolink/io_socket.h>
namespace isc {
namespace asiolink {
/// @brief Base class for acceptor services in Kea.
///
/// This is a wrapper class for ASIO acceptor service. Classes implementing
/// services for specific protocol types should derive from this class.
///
/// Acceptor is an IO object which accepts incoming connections into a socket
/// object. This socket is then used for data transmission from the client
/// to server and back. The acceptor is continued to be used to accept new
/// connections while the accepted connection is active.
///
/// @tparam ProtocolType ASIO protocol type, e.g. stream_protocol
/// @tparam CallbackType Callback function type which should have the following
/// signature: @c void(const boost::system::error_code&).
template<typename ProtocolType, typename CallbackType>
class IOAcceptor : public IOSocket {
public:
/// @brief Constructor.
///
/// @param io_service Reference to the IO service.
explicit IOAcceptor(IOService& io_service)
: IOSocket(),
acceptor_(new typename ProtocolType::acceptor(io_service.get_io_service())) {
}
/// @brief Destructor.
virtual ~IOAcceptor() { }
/// @brief Returns file descriptor of the underlying socket.
virtual int getNative() const {
return (acceptor_->native());
}
/// @brief Opens acceptor socket given the endpoint.
///
/// @param endpoint Reference to the endpoint object defining local
/// acceptor endpoint.
///
/// @tparam EndpointType Endpoint type.
template<typename EndpointType>
void open(const EndpointType& endpoint) {
acceptor_->open(endpoint.getASIOEndpoint().protocol());
}
/// @brief Binds socket to an endpoint.
///
/// @param endpoint Reference to the endpoint object defining local
/// acceptor endpoint.
///
/// @tparam EndpointType Endpoint type.
template<typename EndpointType>
void bind(const EndpointType& endpoint) {
acceptor_->bind(endpoint.getASIOEndpoint());
}