Commit bbb822b1 authored by Thomas Markwalder's avatar Thomas Markwalder

[#260,!20] Packet queueing is now optional

src/bin/dhcp<4/6>/ctrl_dhcp<4/6>_srv.cc
    ControlledDhcpv<4/6>Srv::processConfig() - now calls
        IfaceMgr::configureDHCPPacketQueue

src/bin/dhcp<4/6>/dhcp<4/6>_parser.yy
    dhpc-queue-control parsing updated to enforce
    enable-queue/queue-type rules

src/bin/dhcp<4/6>/tests/config_parser_unittest.cc
    TEST_F(Dhcp<4/6>ParserTest, dhcpQueueControl)
    TEST_F(Dhcp<4/6>ParserTest, dhcpQueueControlInvalid)

src/lib/dhcp/iface_mgr.*
    IfaceMgr
    - closeSockets()  - now calls stopDHCPReceiver

    - openSockets<4/6>() -  now calls startDHCPReceiver

    - receive<4/6>Indirect() - new function which monitors receiver
    thread watch sockets, reads DHCP packets from queue

    - receive<4/6>Direct() - new function which monitors and reads DHCP
    packets from interface sockets directly

    - receive<4/6>() - rewritten to call receive<4/6>Indirect
    if receiver thread is running, otherwise it calls receive<4/6>Direct

    - configureDHCPPacketQueue() - new function which either enables queuing
    by creating a new packet queue, or disables it by destroying the
    existing queue

src/lib/dhcp/packet_queue_mgr.h
    PacketQueue::destroyPacketQueue() - new function

src/lib/dhcp/packet_queue_mgr<4/6>.cc
    PacketQueueMgr<4/6>::PacketQueueMgr<4/6>() - no longer creates a
    default packet queue

src/lib/dhcpsrv/cfg_iface.cc
    CfgIface::closeSockets() - removed call to stopDHCPReceiver
    CfgIface::openSockets() - removed call to startDHCPReceiver

src/lib/dhcpsrv/parsers/dhcp_queue_control_parser.*
    DHCPQueueControlParser
    - removed unused family_ member
    - parse() - added support for enable-queue

src/lib/dhcpsrv/tests/dhcp_queue_control_parser_unittest.cc
    - new file
parent 2f823382
......@@ -634,24 +634,15 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) {
return (isc::config::createAnswer(1, err.str()));
}
// Configure packet queue
// Configure DHCP packet queueing
try {
data::ConstElementPtr qc;
qc = CfgMgr::instance().getStagingCfg()->getDHCPQueueControl();
if (!qc) {
// @todo For now we're manually constructing default queue config
// This probably needs to be built into the PQM?
data::ElementPtr default_qc = data::Element::createMap();
default_qc->set("queue-type", data::Element::create("kea-ring4"));
default_qc->set("capacity", data::Element::create(static_cast<long int>(500)));
PacketQueueMgr4::instance().createPacketQueue(default_qc);
} else {
PacketQueueMgr4::instance().createPacketQueue(qc);
if (IfaceMgr::instance().configureDHCPPacketQueue(AF_INET, qc)) {
LOG_DEBUG(dhcp4_logger, DBG_DHCP4_BASIC, DHCP4_CONFIG_PACKET_QUEUE)
.arg(PacketQueueMgr4::instance().getPacketQueue()->getInfoStr());
}
LOG_DEBUG(dhcp4_logger, DBG_DHCP4_BASIC, DHCP4_CONFIG_PACKET_QUEUE)
.arg(PacketQueueMgr4::instance().getPacketQueue()->getInfoStr());
} catch (const std::exception& ex) {
err << "Error setting packet queue controls after server reconfiguration: "
<< ex.what();
......
......@@ -1844,13 +1844,40 @@ dhcp_queue_control: DHCP_QUEUE_CONTROL {
ElementPtr qc = $4;
ctx.stack_.back()->set("dhcp-queue-control", qc);
if (!qc->contains("queue-type")) {
// Doing this manually, because dhcp-queue-control
// content is otherwise arbitrary
if (!qc->contains("enable-queue")) {
std::stringstream msg;
msg << "'queue-type' is required: ";
msg << "'enable-queue' is required: ";
msg << qc->getPosition().str() << ")";
error(@1, msg.str());
}
ConstElementPtr enable_queue = qc->get("enable-queue");
if (enable_queue->getType() != Element::boolean) {
std::stringstream msg;
msg << "'enable-queue' must be boolean: ";
msg << qc->getPosition().str() << ")";
error(@1, msg.str());
}
if (enable_queue->boolValue()) {
if (!qc->contains("queue-type")) {
std::stringstream msg;
msg << "'queue-type' is required, when 'enable-queue' is true: ";
msg << qc->getPosition().str() << ")";
error(@1, msg.str());
}
ConstElementPtr queue_type = qc->get("queue-type");
if (queue_type->getType() != Element::string) {
std::stringstream msg;
msg << "'queue-type' must be a string: ";
msg << qc->getPosition().str() << ")";
error(@1, msg.str());
}
}
ctx.leave();
};
......
......@@ -391,7 +391,7 @@ configureDhcp4Server(Dhcpv4Srv& server, isc::data::ConstElementPtr config_set,
}
if (config_pair.first == "dhcp-queue-control") {
DHCPQueueControlParser parser(AF_INET);
DHCPQueueControlParser parser;
srv_cfg->setDHCPQueueControl(parser.parse(config_pair.second));
continue;
}
......
......@@ -6488,20 +6488,33 @@ TEST_F(Dhcp4ParserTest, dhcpQueueControl) {
""
},
{
"valid entry",
"queue disabled",
"{ \n"
" \"queue-type\": \"some-type\", \n"
" \"capacity\": 75 \n"
" \"enable-queue\": false \n"
"} \n"
},
{
"queue disabled, arbitrary content allowed",
"{ \n"
" \"enable-queue\": false, \n"
" \"foo\": \"bogus\", \n"
" \"random-int\" : 1234 \n"
"} \n"
},
{
"queue enabled, with queue-type",
"{ \n"
" \"enable-queue\": true, \n"
" \"queue-type\": \"some-type\" \n"
"} \n"
},
{
"valid arbitrary content",
"queue enabled with queue-type and arbitrary content",
"{ \n"
" \"queue-type\": \"some-type\", \n"
" \"capacity\": 90, \n"
" \"user-context\": { \"comment\": \"some text\" },\n"
" \"random-bool\" : false, \n"
" \"random-int\" : 1234 \n"
" \"enable-queue\": true, \n"
" \"queue-type\": \"some-type\", \n"
" \"foo\": \"bogus\", \n"
" \"random-int\" : 1234 \n"
"} \n"
}
};
......@@ -6511,9 +6524,7 @@ TEST_F(Dhcp4ParserTest, dhcpQueueControl) {
control = CfgMgr::instance().getStagingCfg()->getDHCPQueueControl();
ASSERT_FALSE(control);
// Iterate over the incorrect scenarios and verify they
// fail as expected. Note, we use parseDHCP4() directly
// as all of the errors above are enforced by the grammar.
// Iterate over the valid scenarios and verify they succeed.
data::ConstElementPtr exp_elems;
for (auto scenario : scenarios) {
SCOPED_TRACE(scenario.description_);
......@@ -6558,24 +6569,43 @@ TEST_F(Dhcp4ParserTest, dhcpQueueControlInvalid) {
struct Scenario {
std::string description_;
std::string json_;
std::string exp_error_;
};
std::vector<Scenario> scenarios = {
{
"not a map",
"{ " + genIfaceConfig() + ", \n" +
" \"subnet4\": [ ], \n"
" \"dhcp-queue-control\": 75 \n"
"} \n"
"not a map",
"75 \n",
"<string>:2.24-25: syntax error, unexpected integer, expecting {"
},
{
"enable-queue missing",
"{ \n"
" \"enable-type\": \"some-type\" \n"
"} \n",
"<string>:2.2-21: 'enable-queue' is required: <string>:2:24)"
},
{
"queue type missing",
"{ " + genIfaceConfig() + ", \n" +
" \"subnet4\": [ ], \n"
" \"dhcp-queue-control\": { \n"
" \"capacity\": 100 \n"
" } \n"
"} \n"
"enable-queue not boolean",
"{ \n"
" \"enable-queue\": \"always\" \n"
"} \n",
"<string>:2.2-21: 'enable-queue' must be boolean: <string>:2:24)"
},
{
"queue enabled, type missing",
"{ \n"
" \"enable-queue\": true \n"
"} \n",
"<string>:2.2-21: 'queue-type' is required, when 'enable-queue' is true: <string>:2:24)"
},
{
"queue enabled, type not a string",
"{ \n"
" \"enable-queue\": true, \n"
" \"queue-type\": 7777 \n"
"} \n",
"<string>:2.2-21: 'queue-type' must be a string: <string>:2:24)"
}
};
......@@ -6585,10 +6615,23 @@ TEST_F(Dhcp4ParserTest, dhcpQueueControlInvalid) {
for (auto scenario : scenarios) {
SCOPED_TRACE(scenario.description_);
{
EXPECT_THROW(parseDHCP4(scenario.json_), Dhcp4ParseError);
// Construct the config JSON
std::stringstream os;
os << "{ " + genIfaceConfig();
os << ",\n \"dhcp-queue-control\": " << scenario.json_;
os << "} \n";
std::string error_msg = "";
try {
ASSERT_TRUE(parseDHCP4(os.str(), false)) << "parser returned empty element";
} catch(const std::exception& ex) {
error_msg = ex.what();
}
ASSERT_FALSE(error_msg.empty()) << "parseDHCP4 should have thrown";
EXPECT_EQ(scenario.exp_error_, error_msg);
}
}
}
}
......@@ -653,27 +653,18 @@ ControlledDhcpv6Srv::processConfig(isc::data::ConstElementPtr config) {
return (isc::config::createAnswer(1, err.str()));
}
// Configure DHCP packet queue
// Configure DHCP packet queueing
try {
data::ConstElementPtr qc;
qc = CfgMgr::instance().getStagingCfg()->getDHCPQueueControl();
if (!qc) {
// @todo For now we're manually constructing default queue config
// This probably needs to be built into the PQM?
data::ElementPtr default_qc = data::Element::createMap();
default_qc->set("queue-type", data::Element::create("kea-ring6"));
default_qc->set("capacity", data::Element::create(static_cast<long int>(500)));
PacketQueueMgr6::instance().createPacketQueue(default_qc);
} else {
PacketQueueMgr6::instance().createPacketQueue(qc);
if (IfaceMgr::instance().configureDHCPPacketQueue(AF_INET6, qc)) {
LOG_DEBUG(dhcp6_logger, DBG_DHCP6_BASIC, DHCP6_CONFIG_PACKET_QUEUE)
.arg(PacketQueueMgr6::instance().getPacketQueue()->getInfoStr());
}
LOG_DEBUG(dhcp6_logger, DBG_DHCP6_BASIC, DHCP6_CONFIG_PACKET_QUEUE)
.arg(PacketQueueMgr6::instance().getPacketQueue()->getInfoStr());
} catch (const std::exception& ex) {
std::ostringstream err;
err << "Error setting DHCP packet queue controls after server reconfiguration: "
err << "Error setting packet queue controls after server reconfiguration: "
<< ex.what();
return (isc::config::createAnswer(1, err.str()));
}
......
......@@ -1933,13 +1933,41 @@ dhcp_queue_control: DHCP_QUEUE_CONTROL {
ElementPtr qc = $4;
ctx.stack_.back()->set("dhcp-queue-control", qc);
if (!qc->contains("queue-type")) {
// Doing this manually, because dhcp-queue-control
// content is otherwise arbitrary
if (!qc->contains("enable-queue")) {
std::stringstream msg;
msg << "'queue-type' is required: ";
msg << "'enable-queue' is required: ";
msg << qc->getPosition().str() << ")";
error(@1, msg.str());
}
ConstElementPtr enable_queue = qc->get("enable-queue");
if (enable_queue->getType() != Element::boolean) {
std::stringstream msg;
msg << "'enable-queue' must be boolean: ";
msg << qc->getPosition().str() << ")";
error(@1, msg.str());
}
if (enable_queue->boolValue()) {
if (!qc->contains("queue-type")) {
std::stringstream msg;
msg << "'queue-type' is required, when 'enable-queue' is true: ";
msg << qc->getPosition().str() << ")";
error(@1, msg.str());
}
ConstElementPtr queue_type = qc->get("queue-type");
if (queue_type->getType() != Element::string) {
std::stringstream msg;
msg << "'queue-type' must be a string: ";
msg << qc->getPosition().str() << ")";
error(@1, msg.str());
}
}
ctx.leave();
};
......
......@@ -484,7 +484,7 @@ configureDhcp6Server(Dhcpv6Srv& server, isc::data::ConstElementPtr config_set,
}
if (config_pair.first == "dhcp-queue-control") {
DHCPQueueControlParser parser(AF_INET);
DHCPQueueControlParser parser;
srv_config->setDHCPQueueControl(parser.parse(config_pair.second));
continue;
}
......
......@@ -6993,20 +6993,33 @@ TEST_F(Dhcp6ParserTest, dhcpQueueControl) {
""
},
{
"valid entry",
"queue disabled",
"{ \n"
" \"queue-type\": \"some-type\", \n"
" \"capacity\": 75 \n"
" \"enable-queue\": false \n"
"} \n"
},
{
"queue disabled, arbitrary content allowed",
"{ \n"
" \"enable-queue\": false, \n"
" \"foo\": \"bogus\", \n"
" \"random-int\" : 1234 \n"
"} \n"
},
{
"valid arbitrary content",
"queue enabled, with queue-type",
"{ \n"
" \"queue-type\": \"some-type\", \n"
" \"capacity\": 90, \n"
" \"user-context\": { \"comment\": \"some text\" },\n"
" \"random-bool\" : false, \n"
" \"random-int\" : 1236 \n"
" \"enable-queue\": true, \n"
" \"queue-type\": \"some-type\" \n"
"} \n"
},
{
"queue enabled with queue-type and arbitrary content",
"{ \n"
" \"enable-queue\": true, \n"
" \"queue-type\": \"some-type\", \n"
" \"foo\": \"bogus\", \n"
" \"random-int\" : 1234 \n"
"} \n"
}
};
......@@ -7016,9 +7029,7 @@ TEST_F(Dhcp6ParserTest, dhcpQueueControl) {
control = CfgMgr::instance().getStagingCfg()->getDHCPQueueControl();
ASSERT_FALSE(control);
// Iterate over the incorrect scenarios and verify they
// fail as expected. Note, we use parseDHCP6() directly
// as all of the errors above are enforced by the grammar.
// Iterate over the valid scenarios and verify they succeed.
data::ConstElementPtr exp_elems;
for (auto scenario : scenarios) {
SCOPED_TRACE(scenario.description_);
......@@ -7063,24 +7074,43 @@ TEST_F(Dhcp6ParserTest, dhcpQueueControlInvalid) {
struct Scenario {
std::string description_;
std::string json_;
std::string exp_error_;
};
std::vector<Scenario> scenarios = {
{
"not a map",
"{ " + genIfaceConfig() + ", \n" +
" \"subnet6\": [ ], \n"
" \"dhcp-queue-control\": 75 \n"
"} \n"
"not a map",
"75 \n",
"<string>:2.24-25: syntax error, unexpected integer, expecting {"
},
{
"enable-queue missing",
"{ \n"
" \"enable-type\": \"some-type\" \n"
"} \n",
"<string>:2.2-21: 'enable-queue' is required: <string>:2:24)"
},
{
"enable-queue not boolean",
"{ \n"
" \"enable-queue\": \"always\" \n"
"} \n",
"<string>:2.2-21: 'enable-queue' must be boolean: <string>:2:24)"
},
{
"queue type missing",
"{ " + genIfaceConfig() + ", \n" +
" \"subnet6\": [ ], \n"
" \"dhcp-queue-control\": { \n"
" \"capacity\": 100 \n"
" } \n"
"} \n"
"queue enabled, type missing",
"{ \n"
" \"enable-queue\": true \n"
"} \n",
"<string>:2.2-21: 'queue-type' is required, when 'enable-queue' is true: <string>:2:24)"
},
{
"queue enabled, type not a string",
"{ \n"
" \"enable-queue\": true, \n"
" \"queue-type\": 7777 \n"
"} \n",
"<string>:2.2-21: 'queue-type' must be a string: <string>:2:24)"
}
};
......@@ -7090,7 +7120,21 @@ TEST_F(Dhcp6ParserTest, dhcpQueueControlInvalid) {
for (auto scenario : scenarios) {
SCOPED_TRACE(scenario.description_);
{
EXPECT_THROW(parseDHCP6(scenario.json_), Dhcp6ParseError);
// Construct the config JSON
std::stringstream os;
os << "{ " + genIfaceConfig();
os << ",\n \"dhcp-queue-control\": " << scenario.json_;
os << "} \n";
std::string error_msg = "";
try {
ASSERT_TRUE(parseDHCP6(os.str(), false)) << "parser returned empty element";
} catch(const std::exception& ex) {
error_msg = ex.what();
}
ASSERT_FALSE(error_msg.empty()) << "parseDHCP6 should have thrown";
EXPECT_EQ(scenario.exp_error_, error_msg);
}
}
}
......
......@@ -93,6 +93,7 @@ Iface::closeSockets(const uint16_t family) {
<< " specified when requested to close all sockets"
<< " which belong to this family");
}
// Search for the socket of the specific type.
SocketCollection::iterator sock = sockets_.begin();
while (sock != sockets_.end()) {
......@@ -282,6 +283,9 @@ Iface::countActive4() const {
}
void IfaceMgr::closeSockets() {
// Stops the receiver thread if there is one.
stopDHCPReceiver();
BOOST_FOREACH(IfacePtr iface, ifaces_) {
iface->closeSockets();
}
......@@ -293,11 +297,17 @@ void IfaceMgr::stopDHCPReceiver() {
receiver_thread_->wait();
receiver_thread_.reset();
error_watch_.clearReady();
}
receiver_error_ = "no error";
getPacketQueue4()->clear();
getPacketQueue4()->clear();
if (getPacketQueue4()) {
getPacketQueue4()->clear();
}
if (getPacketQueue6()) {
getPacketQueue6()->clear();
}
}
IfaceMgr::~IfaceMgr() {
......@@ -584,6 +594,12 @@ IfaceMgr::openSockets4(const uint16_t port, const bool use_bcast,
}
}
if (count > 0) {
// starts the receiver thread (if queueing is enabled);
startDHCPReceiver(AF_INET);
}
return (count > 0);
}
......@@ -662,6 +678,11 @@ IfaceMgr::openSockets6(const uint16_t port,
}
}
if (count > 0) {
// starts the receiver thread (if queueing is enabled);
startDHCPReceiver(AF_INET6);
}
return (count > 0);
}
......@@ -674,14 +695,14 @@ IfaceMgr::startDHCPReceiver(const uint16_t family) {
switch (family) {
case AF_INET:
if(!getPacketQueue4()) {
isc_throw(Unexpected, "startDHCPRecever - no packet queue?");
return;
}
receiver_thread_.reset(new Thread(boost::bind(&IfaceMgr::receiveDHCP4Packets, this)));
break;
case AF_INET6:
if(!getPacketQueue6()) {
isc_throw(Unexpected, "startDHCPRecever - no packet queue?");
return;
}
receiver_thread_.reset(new Thread(boost::bind(&IfaceMgr::receiveDHCP6Packets, this)));
......@@ -942,8 +963,15 @@ IfaceMgr::send(const Pkt4Ptr& pkt) {
return (packet_filter_->send(*iface, getSocket(*pkt).sockfd_, pkt));
}
Pkt4Ptr IfaceMgr::receive4(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) {
if (receiver_thread_) {
return (receive4Indirect(timeout_sec, timeout_usec));
}
return (receive4Direct(timeout_sec, timeout_usec));
}
Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) {
// Sanity check for microsecond timeout.
if (timeout_usec >= 1000000) {
isc_throw(BadValue, "fractional timeout must be shorter than"
......@@ -1051,7 +1079,233 @@ Pkt4Ptr IfaceMgr::receive4(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
return (pkt);
}
Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) {
Pkt4Ptr IfaceMgr::receive4Direct(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) {
// Sanity check for microsecond timeout.
if (timeout_usec >= 1000000) {
isc_throw(BadValue, "fractional timeout must be shorter than"
" one million microseconds");
}
boost::scoped_ptr<SocketInfo> candidate;
IfacePtr iface;
fd_set sockets;
int maxfd = 0;
FD_ZERO(&sockets);
/// @todo: marginal performance optimization. We could create the set once
/// and then use its copy for select(). Please note that select() modifies
/// provided set to indicated which sockets have something to read.
BOOST_FOREACH(iface, ifaces_) {
BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
// Only deal with IPv4 addresses.
if (s.addr_.isV4()) {
// Add this socket to listening set
FD_SET(s.sockfd_, &sockets);
if (maxfd < s.sockfd_) {
maxfd = s.sockfd_;
}
}
}
}
// if there are any callbacks for external sockets registered...
if (!callbacks_.empty()) {
BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
FD_SET(s.socket_, &sockets);
if (maxfd < s.socket_) {
maxfd = s.socket_;
}
}
}
struct timeval select_timeout;
select_timeout.tv_sec = timeout_sec;
select_timeout.tv_usec = timeout_usec;
// zero out the errno to be safe
errno = 0;
int result = select(maxfd + 1, &sockets, NULL, NULL, &select_timeout);
if (result == 0) {
// nothing received and timeout has been reached
return (Pkt4Ptr()); // NULL
} else if (result < 0) {
// In most cases we would like to know whether select() returned
// an error because of a signal being received or for some other
// reason. This is because DHCP servers use signals to trigger
// certain actions, like reconfiguration or graceful shutdown.
// By catching a dedicated exception the caller will know if the
// error returned by the function is due to the reception of the
// signal or for some other reason.
if (errno == EINTR) {
isc_throw(SignalInterruptOnSelect, strerror(errno));
} else {