Commit 21cac7d0 authored by Thomas Markwalder's avatar Thomas Markwalder

[#42,!103] Initial receiver thread and packet queuing

New files:
    src/lib/dhcp -
    packet_queue.h - defines packet queuing template classes
    socket_info.h - contains existing class extracted iface_mgr.h
    tests/packet_queue4_unittest.cc
    tests/packet_queue6_unittest.cc

src/lib/dhcp/iface_mgr.*
IfaceMgr:: - new functions
    - receiveDHCP<4/6>Packets() - thread worker function which
    monitors interface sockets, enqueues packets as they are read

    - receiveDHCP<4/6>Packet() - reads a single packet from a socket

    - startDHCPReceiver(const uint16_t family) - runs
    receiveDHCP<4/6?appropriate worker function in a thread

    - stopReceiver() - stops the receiver thread

    - setPacketQueue<4/6> - replaces the default packet queue instance

    receiveDHCP<4/6>() - modified to monitor receiver watch
    socekts rather than interface sockets.  Dequeue packets
    from packet queue.

src/lib/dhcp/tests/iface_mgr_unittest.cc
    TEST_F(IfaceMgrTest, packetQueue4)
    TEST_F(IfaceMgrTest, packetQueue6)

src/lib/dhcpsrv/cfg_iface.cc
    CfgIface::openSockets() - starts DHCP receiver
    CfgIface::closeSockets() - stops DHCP receiver
parent 1d34b296
......@@ -43,6 +43,7 @@ libkea_dhcp___la_SOURCES += option_space_container.h
libkea_dhcp___la_SOURCES += option_string.cc option_string.h
libkea_dhcp___la_SOURCES += option_vendor.cc option_vendor.h
libkea_dhcp___la_SOURCES += option_vendor_class.cc option_vendor_class.h
libkea_dhcp___la_SOURCES += packet_queue.h
libkea_dhcp___la_SOURCES += pkt.cc pkt.h
libkea_dhcp___la_SOURCES += pkt4.cc pkt4.h
libkea_dhcp___la_SOURCES += pkt4o6.cc pkt4o6.h
......@@ -51,6 +52,7 @@ libkea_dhcp___la_SOURCES += pkt_filter.h pkt_filter.cc
libkea_dhcp___la_SOURCES += pkt_filter6.h pkt_filter6.cc
libkea_dhcp___la_SOURCES += pkt_filter_inet.cc pkt_filter_inet.h
libkea_dhcp___la_SOURCES += pkt_filter_inet6.cc pkt_filter_inet6.h
libkea_dhcp___la_SOURCES += socket_info.h
# Utilize Linux Packet Filtering on Linux.
if OS_LINUX
......
......@@ -19,6 +19,7 @@
#include <boost/foreach.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/bind.hpp>
#include <cstring>
#include <errno.h>
......@@ -28,11 +29,21 @@
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#ifndef FD_COPY
#define FD_COPY(orig, copy) \
do { \
memmove(copy, orig, sizeof(fd_set)); \
} while (0)
#endif
using namespace std;
using namespace isc::asiolink;
using namespace isc::util;
using namespace isc::util::thread;
using namespace isc::util::io;
using namespace isc::util::io::internal;
namespace isc {
......@@ -173,16 +184,16 @@ IfaceMgr::IfaceMgr()
packet_filter_(new PktFilterInet()),
packet_filter6_(new PktFilterInet6()),
test_mode_(false),
allow_loopback_(false)
{
allow_loopback_(false),
receiver_error_("no error"),
packet_queue4_(new PacketQueueRing4()),
packet_queue6_(new PacketQueueRing6()) {
try {
// required for sending/receiving packets
// let's keep it in front, just in case someone
// wants to send anything during initialization
// control_buf_ = boost::scoped_array<char>();
detectIfaces();
} catch (const std::exception& ex) {
......@@ -269,17 +280,33 @@ void IfaceMgr::closeSockets() {
}
}
void
IfaceMgr::closeSockets(const uint16_t family) {
BOOST_FOREACH(IfacePtr iface, ifaces_) {
iface->closeSockets(family);
void IfaceMgr::stopReceiver() {
if (receiver_thread_) {
terminate_watch_.markReady();
receiver_thread_->wait();
receiver_thread_.reset();
error_watch_.clearReady();
}
receiver_error_ = "no error";
if (packet_queue4_) {
packet_queue4_->clear();
}
if (packet_queue6_) {
packet_queue6_->clear();
}
}
void
IfaceMgr::closeSockets(const uint16_t) {
isc_throw(NotImplemented, "closeSockets(family) is obsolete");
}
IfaceMgr::~IfaceMgr() {
// control_buf_ is deleted automatically (scoped_ptr)
control_buf_len_ = 0;
stopReceiver();
closeSockets();
}
......@@ -636,6 +663,25 @@ IfaceMgr::openSockets6(const uint16_t port,
return (count > 0);
}
void
IfaceMgr::startDHCPReceiver(const uint16_t family) {
if (receiver_thread_) {
isc_throw(Unexpected, "a receiver thread already exits");
}
switch (family) {
case AF_INET:
receiver_thread_.reset(new Thread(boost::bind(&IfaceMgr::receiveDHCP4Packets, this)));
break;
case AF_INET6:
receiver_thread_.reset(new Thread(boost::bind(&IfaceMgr::receiveDHCP6Packets, this)));
break;
default:
isc_throw (BadValue, "startDHCPReceiver: invalid family: " << family);
break;
}
}
void
IfaceMgr::printIfaces(std::ostream& out /*= std::cout*/) {
BOOST_FOREACH(IfacePtr iface, ifaces_) {
......@@ -893,31 +939,11 @@ Pkt4Ptr IfaceMgr::receive4(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
isc_throw(BadValue, "fractional timeout must be shorter than"
" one million microseconds");
}
boost::scoped_ptr<SocketInfo> candidate;
IfacePtr iface;
fd_set sockets;
int maxfd = 0;
FD_ZERO(&sockets);
/// @todo: marginal performance optimization. We could create the set once
/// and then use its copy for select(). Please note that select() modifies
/// provided set to indicated which sockets have something to read.
BOOST_FOREACH(iface, ifaces_) {
BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
// Only deal with IPv4 addresses.
if (s.addr_.isV4()) {
// Add this socket to listening set
FD_SET(s.sockfd_, &sockets);
if (maxfd < s.sockfd_) {
maxfd = s.sockfd_;
}
}
}
}
// if there are any callbacks for external sockets registered...
if (!callbacks_.empty()) {
BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
......@@ -928,16 +954,31 @@ Pkt4Ptr IfaceMgr::receive4(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
}
}
// add watch sockets.
FD_SET(receive_watch_.getSelectFd(), &sockets);
if (maxfd < receive_watch_.getSelectFd()) {
maxfd = receive_watch_.getSelectFd();
}
FD_SET(error_watch_.getSelectFd(), &sockets);
if (maxfd < error_watch_.getSelectFd()) {
maxfd = error_watch_.getSelectFd();
}
struct timeval select_timeout;
select_timeout.tv_sec = timeout_sec;
select_timeout.tv_usec = timeout_usec;
if (packet_queue4_->empty()) {
select_timeout.tv_sec = timeout_sec;
select_timeout.tv_usec = timeout_usec;
} else {
select_timeout.tv_sec = 0;
select_timeout.tv_usec = 0;
}
// zero out the errno to be safe
errno = 0;
int result = select(maxfd + 1, &sockets, NULL, NULL, &select_timeout);
if (result == 0) {
if ((result == 0) && packet_queue4_->empty()) {
// nothing received and timeout has been reached
return (Pkt4Ptr()); // NULL
......@@ -956,6 +997,13 @@ Pkt4Ptr IfaceMgr::receive4(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
}
}
// Check errors.
if (FD_ISSET(error_watch_.getSelectFd(), &sockets)) {
string msg = receiver_error_;
error_watch_.clearReady();
isc_throw(SocketReadError, msg);
}
// Let's find out which socket has the data
BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
if (!FD_ISSET(s.socket_, &sockets)) {
......@@ -974,26 +1022,16 @@ Pkt4Ptr IfaceMgr::receive4(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
return (Pkt4Ptr());
}
// Let's find out which interface/socket has the data
BOOST_FOREACH(iface, ifaces_) {
BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
if (FD_ISSET(s.sockfd_, &sockets)) {
candidate.reset(new SocketInfo(s));
break;
}
}
if (candidate) {
break;
// Protected packet queue access.
{
Mutex::Locker lock(receiver_lock_);
Pkt4Ptr pkt = packet_queue4_->dequeuePacket();
if (!pkt) {
receive_watch_.clearReady();
}
}
if (!candidate) {
isc_throw(SocketReadError, "received data over unknown socket");
return (pkt);
}
// Now we have a socket, let's get some data from it!
// Assuming that packet filter is not NULL, because its modifier checks it.
return (packet_filter_->receive(*iface, *candidate));
}
Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) {
......@@ -1003,30 +1041,11 @@ Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
" one million microseconds");
}
boost::scoped_ptr<SocketInfo> candidate;
fd_set sockets;
int maxfd = 0;
FD_ZERO(&sockets);
/// @todo: marginal performance optimization. We could create the set once
/// and then use its copy for select(). Please note that select() modifies
/// provided set to indicated which sockets have something to read.
BOOST_FOREACH(IfacePtr iface, ifaces_) {
BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
// Only deal with IPv6 addresses.
if (s.addr_.isV6()) {
// Add this socket to listening set
FD_SET(s.sockfd_, &sockets);
if (maxfd < s.sockfd_) {
maxfd = s.sockfd_;
}
}
}
}
// if there are any callbacks for external sockets registered...
if (!callbacks_.empty()) {
BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
......@@ -1038,16 +1057,31 @@ Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
}
}
// add watch sockets.
FD_SET(receive_watch_.getSelectFd(), &sockets);
if (maxfd < receive_watch_.getSelectFd()) {
maxfd = receive_watch_.getSelectFd();
}
FD_SET(error_watch_.getSelectFd(), &sockets);
if (maxfd < error_watch_.getSelectFd()) {
maxfd = error_watch_.getSelectFd();
}
struct timeval select_timeout;
select_timeout.tv_sec = timeout_sec;
select_timeout.tv_usec = timeout_usec;
if (packet_queue6_->empty()) {
select_timeout.tv_sec = timeout_sec;
select_timeout.tv_usec = timeout_usec;
} else {
select_timeout.tv_sec = 0;
select_timeout.tv_usec = 0;
}
// zero out the errno to be safe
errno = 0;
int result = select(maxfd + 1, &sockets, NULL, NULL, &select_timeout);
if (result == 0) {
if ((result == 0) && packet_queue6_->empty()) {
// nothing received and timeout has been reached
return (Pkt6Ptr()); // NULL
......@@ -1066,6 +1100,13 @@ Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
}
}
// Check errors.
if (FD_ISSET(error_watch_.getSelectFd(), &sockets)) {
string msg = receiver_error_;
error_watch_.clearReady();
isc_throw(SocketReadError, msg);
}
// Let's find out which socket has the data
BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
if (!FD_ISSET(s.socket_, &sockets)) {
......@@ -1084,24 +1125,245 @@ Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
return (Pkt6Ptr());
}
// Let's find out which interface/socket has the data
BOOST_FOREACH(IfacePtr iface, ifaces_) {
// Protected DHCP packet queue access.
{
Mutex::Locker lock(receiver_lock_);
Pkt6Ptr pkt = packet_queue6_->dequeuePacket();
if (!pkt) {
receive_watch_.clearReady();
}
return (pkt);
}
}
void IfaceMgr::receiveDHCP4Packets() {
IfacePtr iface;
fd_set sockets;
int maxfd = 0;
FD_ZERO(&sockets);
// Add terminate watch socket.
FD_SET(terminate_watch_.getSelectFd(), &sockets);
if (maxfd < terminate_watch_.getSelectFd()) {
maxfd = terminate_watch_.getSelectFd();
}
// Add Interface sockets.
BOOST_FOREACH(iface, ifaces_) {
BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
// Only deal with IPv4 addresses.
if (s.addr_.isV4()) {
// Add this socket to listening set.
FD_SET(s.sockfd_, &sockets);
if (maxfd < s.sockfd_) {
maxfd = s.sockfd_;
}
}
}
}
for (;;) {
// Check the watch socket.
if (terminate_watch_.isReady()) {
terminate_watch_.clearReady();
return;
}
fd_set rd_set;
FD_COPY(&sockets, &rd_set);
// zero out the errno to be safe.
errno = 0;
// Note we wait until something happen.
int result = select(maxfd + 1, &rd_set, 0, 0, 0);
// Re-check the watch socket.
if (terminate_watch_.isReady()) {
terminate_watch_.clearReady();
return;
}
if (result == 0) {
// nothing received?
continue;
} else if (result < 0) {
// This thread should not get signals?
if (errno != EINTR) {
// Signal the error to receive4.
receiver_error_ = strerror(errno);
error_watch_.markReady();
sleep(1);
}
continue;
}
// Let's find out which interface/socket has data.
BOOST_FOREACH(iface, ifaces_) {
BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
if (FD_ISSET(s.sockfd_, &sockets)) {
receiveDHCP4Packet(*iface, s);
// Can take time so check one more time the watch socket.
if (terminate_watch_.isReady()) {
terminate_watch_.clearReady();
return;
}
}
}
}
}
}
void IfaceMgr::receiveDHCP6Packets() {
IfacePtr iface;
fd_set sockets;
int maxfd = 0;
FD_ZERO(&sockets);
// Add terminate watch socket.
FD_SET(terminate_watch_.getSelectFd(), &sockets);
if (maxfd < terminate_watch_.getSelectFd()) {
maxfd = terminate_watch_.getSelectFd();
}
// Add Interface sockets.
BOOST_FOREACH(iface, ifaces_) {
BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
if (FD_ISSET(s.sockfd_, &sockets)) {
candidate.reset(new SocketInfo(s));
break;
// Only deal with IPv6 addresses.
if (s.addr_.isV6()) {
// Add this socket to listening set.
FD_SET(s.sockfd_, &sockets);
if (maxfd < s.sockfd_) {
maxfd = s.sockfd_;
}
}
}
if (candidate) {
break;
}
for (;;) {
// Check the watch socket.
if (terminate_watch_.isReady()) {
terminate_watch_.clearReady();
return;
}
fd_set rd_set;
FD_COPY(&sockets, &rd_set);
// zero out the errno to be safe.
errno = 0;
// Note we wait until something happen.
int result = select(maxfd + 1, &rd_set, 0, 0, 0);
// Re-check the watch socket.
if (terminate_watch_.isReady()) {
terminate_watch_.clearReady();
return;
}
if (result == 0) {
// nothing received?
continue;
} else if (result < 0) {
// This thread should not get signals?
if (errno != EINTR) {
// Signal the error to receive6.
receiver_error_ = strerror(errno);
error_watch_.markReady();
sleep(1);
}
continue;
}
// Let's find out which interface/socket has data.
BOOST_FOREACH(iface, ifaces_) {
BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
if (FD_ISSET(s.sockfd_, &sockets)) {
receiveDHCP6Packet(s);
// Can take time so check one more time the watch socket.
if (terminate_watch_.isReady()) {
terminate_watch_.clearReady();
return;
}
}
}
}
}
}
void IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) {
int len;
int result = ioctl(socket_info.sockfd_, FIONREAD, &len);
if (result < 0) {
// Signal the error to receive4.
receiver_error_ = strerror(errno);
error_watch_.markReady();
return;
}
if (len == 0) {
// Nothing to read.
return;
}
Pkt4Ptr pkt;
try {
pkt = packet_filter_->receive(iface, socket_info);
} catch (const std::exception& ex) {
receiver_error_ = ex.what();
error_watch_.markReady();
} catch (...) {
receiver_error_ = "packet filter receive() failed";
error_watch_.markReady();
}
if (!candidate) {
isc_throw(SocketReadError, "received data over unknown socket");
if (pkt) {
Mutex::Locker lock(receiver_lock_);
packet_queue4_->enqueuePacket(pkt, socket_info);
receive_watch_.markReady();
}
}
void IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) {
int len;
int result = ioctl(socket_info.sockfd_, FIONREAD, &len);
if (result < 0) {
// Signal the error to receive6.
receiver_error_ = strerror(errno);
error_watch_.markReady();
return;
}
if (len == 0) {
// Nothing to read.
return;
}
Pkt6Ptr pkt;
try {
pkt = packet_filter6_->receive(socket_info);
} catch (const std::exception& ex) {
receiver_error_ = ex.what();
error_watch_.markReady();
} catch (...) {
receiver_error_ = "packet filter receive() failed";
error_watch_.markReady();
}
if (pkt) {
Mutex::Locker lock(receiver_lock_);
packet_queue6_->enqueuePacket(pkt, socket_info);
receive_watch_.markReady();
}
// Assuming that packet filter is not NULL, because its modifier checks it.
return (packet_filter6_->receive(*candidate));
}
uint16_t IfaceMgr::getSocket(const isc::dhcp::Pkt6& pkt) {
......@@ -1193,5 +1455,51 @@ IfaceMgr::getSocket(isc::dhcp::Pkt4 const& pkt) {
return (*candidate);
}
void
IfaceMgr::setPacketQueue4(PacketQueue4Ptr& packet_queue4) {
if (!packet_queue4) {
isc_throw(BadValue, "IfaceMgr::setPacketQueue4 "
" queue pointer cannot be empty");
}
// On the off chance the existing impl doesn't clear on
// destruction, we will as a safe guard.
packet_queue4_->clear();
packet_queue4_ = packet_queue4;
}
void
IfaceMgr::setPacketQueue6(PacketQueue6Ptr& packet_queue6) {
if (!packet_queue6) {
isc_throw(BadValue, "IfaceMgr::setPacketQueue6 "
" queue pointer cannot be empty");
}
// On the off chance the existing impl doesn't clear on
// destruction, we will as a safe guard.
packet_queue6_->clear();
packet_queue6_ = packet_queue6;
}
size_t
IfaceMgr::getPacketQueueCapacity4() const {
return (packet_queue4_->getCapacity());
}
void
IfaceMgr::setPacketQueueCapacity4(const size_t new_capacity) {
packet_queue4_->setCapacity(new_capacity);
}
size_t
IfaceMgr::getPacketQueueCapacity6() const {
return (packet_queue6_->getCapacity());
}
void
IfaceMgr::setPacketQueueCapacity6(const size_t new_capacity) {
packet_queue6_->setCapacity(new_capacity);
}
} // end of namespace isc::dhcp
} // end of namespace isc
......@@ -12,14 +12,19 @@
#include <dhcp/dhcp6.h>
#include <dhcp/pkt4.h>
#include <dhcp/pkt6.h>
#include <dhcp/packet_queue.h>
#include <dhcp/pkt_filter.h>
#include <dhcp/pkt_filter6.h>
#include <util/optional_value.h>
#include <util/watch_socket.h>
#include <util/threads/thread.h>
#include <util/threads/sync.h>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_array.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/circular_buffer.hpp>