Commit 62de4b87 authored by Tomek Mrugalski's avatar Tomek Mrugalski 🛰
Browse files

[1651] Support for msgq implemented in b10-dhcp4 component.

parent 977c4a2b
...@@ -2041,7 +2041,7 @@ then change those defaults with config set Resolver/forward_addresses[0]/address ...@@ -2041,7 +2041,7 @@ then change those defaults with config set Resolver/forward_addresses[0]/address
run under BIND10 framework. To add a DHCPv4 process to the set of running run under BIND10 framework. To add a DHCPv4 process to the set of running
BIND10 services, you can use following commands in <command>bindctl</command>: BIND10 services, you can use following commands in <command>bindctl</command>:
<screen>&gt; <userinput>config add Boss/components b10-dhcp4</userinput> <screen>&gt; <userinput>config add Boss/components b10-dhcp4</userinput>
&gt; <userinput>config set Boss/components/b10-resolver/kind dispensable</userinput> &gt; <userinput>config set Boss/components/b10-dhcp4/kind dispensable</userinput>
&gt; <userinput>config commit</userinput></screen></para> &gt; <userinput>config commit</userinput></screen></para>
<para> <para>
......
...@@ -52,15 +52,23 @@ Dhcpv4Srv::Dhcpv4Srv(uint16_t port) { ...@@ -52,15 +52,23 @@ Dhcpv4Srv::Dhcpv4Srv(uint16_t port) {
} }
Dhcpv4Srv::~Dhcpv4Srv() { Dhcpv4Srv::~Dhcpv4Srv() {
cout << "DHCPv4 server shutdown." << endl; cout << "b10-dhcp4: DHCPv4 server terminating." << endl;
IfaceMgr::instance().closeSockets(); IfaceMgr::instance().closeSockets();
} }
void Dhcpv4Srv::shutdown() {
cout << "b10-dhcp4: DHCPv4 server shutdown." << endl;
shutdown_ = true;
}
bool bool
Dhcpv4Srv::run() { Dhcpv4Srv::run() {
while (!shutdown_) { while (!shutdown_) {
/// @todo: calculate actual timeout once we have lease database
int timeout = 1000;
// client's message and server's response // client's message and server's response
Pkt4Ptr query = IfaceMgr::instance().receive4(); Pkt4Ptr query = IfaceMgr::instance().receive4(timeout);
Pkt4Ptr rsp; Pkt4Ptr rsp;
if (query) { if (query) {
......
...@@ -60,6 +60,9 @@ class Dhcpv4Srv : public boost::noncopyable { ...@@ -60,6 +60,9 @@ class Dhcpv4Srv : public boost::noncopyable {
/// critical error. /// critical error.
bool run(); bool run();
/// @brief instructs server to shut down.
void shutdown();
protected: protected:
/// @brief Processes incoming DISCOVER and returns response. /// @brief Processes incoming DISCOVER and returns response.
/// ///
......
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include <dhcp4/spec_config.h> #include <dhcp4/spec_config.h>
#include <dhcp4/dhcp4_srv.h> #include <dhcp4/dhcp4_srv.h>
#include <dhcp/iface_mgr.h>
#include <asiolink/asiolink.h> #include <asiolink/asiolink.h>
#include <log/logger_support.h> #include <log/logger_support.h>
...@@ -65,8 +66,13 @@ usage() { ...@@ -65,8 +66,13 @@ usage() {
} }
} // end of anonymous namespace } // end of anonymous namespace
// Global objects are ugly, but that is the most convenient way of
// having it accessible from handlers.
IOService io_service; IOService io_service;
// The same applies to global pointers. Ugly, but useful.
Dhcpv4Srv* server = NULL;
ConstElementPtr ConstElementPtr
dhcp4_config_handler(ConstElementPtr new_config) { dhcp4_config_handler(ConstElementPtr new_config) {
cout << "b10-dhcp4: Received new config:" << new_config->str() << endl; cout << "b10-dhcp4: Received new config:" << new_config->str() << endl;
...@@ -80,6 +86,9 @@ dhcp4_command_handler(const string& command, ConstElementPtr args) { ...@@ -80,6 +86,9 @@ dhcp4_command_handler(const string& command, ConstElementPtr args) {
cout << "b10-dhcp4: Received new command: [" << command << "], args=" cout << "b10-dhcp4: Received new command: [" << command << "], args="
<< args->str() << endl; << args->str() << endl;
if (command == "shutdown") { if (command == "shutdown") {
if (server) {
server->shutdown();
}
io_service.stop(); io_service.stop();
ConstElementPtr answer = isc::config::createAnswer(0, ConstElementPtr answer = isc::config::createAnswer(0,
"Shutting down."); "Shutting down.");
...@@ -92,6 +101,9 @@ dhcp4_command_handler(const string& command, ConstElementPtr args) { ...@@ -92,6 +101,9 @@ dhcp4_command_handler(const string& command, ConstElementPtr args) {
return (answer); return (answer);
} }
void session_reader(void) {
io_service.run_one();
}
void establish_session() { void establish_session() {
...@@ -113,14 +125,18 @@ void establish_session() { ...@@ -113,14 +125,18 @@ void establish_session() {
config_session = new ModuleCCSession(specfile, *cc_session, config_session = new ModuleCCSession(specfile, *cc_session,
dhcp4_config_handler, dhcp4_config_handler,
dhcp4_command_handler, false); dhcp4_command_handler, false);
cout << "b10-dhcp4: hasQueuedMsgs()=" << config_session->hasQueuedMsgs() << endl;
config_session->start(); config_session->start();
cout << "b10-dhcp4: After session start." << endl;
cout << "b10-dhcp4: About to call io_service.run()" << endl; int ctrl_socket = cc_session->getSocketDesc();
io_service.run(); cout << "b10-dhcp4: Control session started, socket="
cout << "b10-dhcp4: Returned from io_service.run()" << endl; << ctrl_socket << endl;
IfaceMgr::instance().set_session_socket(ctrl_socket, session_reader);
// cout << "b10-dhcp4: About to call io_service.run()" << endl;
// io_service.run();
// cout << "b10-dhcp4: Returned from io_service.run()" << endl;
} }
int int
...@@ -159,9 +175,11 @@ main(int argc, char* argv[]) { ...@@ -159,9 +175,11 @@ main(int argc, char* argv[]) {
cout << "[b10-dhcp4] Initiating DHCPv4 server operation." << endl; cout << "[b10-dhcp4] Initiating DHCPv4 server operation." << endl;
Dhcpv4Srv* srv = new Dhcpv4Srv(); server = new Dhcpv4Srv();
server->run();
srv->run(); delete server;
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
cerr << "[b10-dhcp4] Server failed: " << ex.what() << endl; cerr << "[b10-dhcp4] Server failed: " << ex.what() << endl;
......
...@@ -88,6 +88,7 @@ public: ...@@ -88,6 +88,7 @@ public:
void startRead(boost::function<void()> user_handler); void startRead(boost::function<void()> user_handler);
void setTimeout(size_t seconds) { timeout_ = seconds; }; void setTimeout(size_t seconds) { timeout_ = seconds; };
size_t getTimeout() const { return timeout_; }; size_t getTimeout() const { return timeout_; };
int getSocketDesc();
long int sequence_; // the next sequence number to use long int sequence_; // the next sequence number to use
std::string lname_; std::string lname_;
...@@ -254,6 +255,13 @@ SessionImpl::internalRead(const asio::error_code& error, ...@@ -254,6 +255,13 @@ SessionImpl::internalRead(const asio::error_code& error,
} }
} }
int
SessionImpl::getSocketDesc() {
/// @todo boost 1.42 uses native() method, but it is deprecated
/// in 1.49 and native_handle() is recommended instead
return socket_.native();
}
Session::Session(asio::io_service& io_service) : Session::Session(asio::io_service& io_service) :
impl_(new SessionImpl(io_service)) impl_(new SessionImpl(io_service))
{} {}
...@@ -273,6 +281,11 @@ Session::startRead(boost::function<void()> read_callback) { ...@@ -273,6 +281,11 @@ Session::startRead(boost::function<void()> read_callback) {
impl_->startRead(read_callback); impl_->startRead(read_callback);
} }
int
Session::getSocketDesc() const {
return impl_->getSocketDesc();
}
namespace { // maybe unnecessary. namespace { // maybe unnecessary.
// This is a helper class to make the establish() method (below) exception-safe // This is a helper class to make the establish() method (below) exception-safe
// with the RAII approach. // with the RAII approach.
......
...@@ -141,6 +141,11 @@ namespace isc { ...@@ -141,6 +141,11 @@ namespace isc {
virtual bool hasQueuedMsgs() const; virtual bool hasQueuedMsgs() const;
virtual void setTimeout(size_t milliseconds); virtual void setTimeout(size_t milliseconds);
virtual size_t getTimeout() const; virtual size_t getTimeout() const;
/// @brief returns socket descriptor from underlying socket connection
///
/// @param returns socket descriptor used for session connection
virtual int getSocketDesc() const;
private: private:
void sendmsg(isc::data::ConstElementPtr msg); void sendmsg(isc::data::ConstElementPtr msg);
void sendmsg(isc::data::ConstElementPtr env, void sendmsg(isc::data::ConstElementPtr env,
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <string.h> #include <string.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <sys/select.h>
#include <dhcp/dhcp4.h> #include <dhcp/dhcp4.h>
#include <dhcp/dhcp6.h> #include <dhcp/dhcp6.h>
...@@ -121,7 +122,8 @@ bool IfaceMgr::Iface::delSocket(uint16_t sockfd) { ...@@ -121,7 +122,8 @@ bool IfaceMgr::Iface::delSocket(uint16_t sockfd) {
IfaceMgr::IfaceMgr() IfaceMgr::IfaceMgr()
:control_buf_len_(CMSG_SPACE(sizeof(struct in6_pktinfo))), :control_buf_len_(CMSG_SPACE(sizeof(struct in6_pktinfo))),
control_buf_(new char[control_buf_len_]) control_buf_(new char[control_buf_len_]),
session_socket_(0), session_callback_(NULL)
{ {
cout << "IfaceMgr initialization." << endl; cout << "IfaceMgr initialization." << endl;
...@@ -230,11 +232,13 @@ bool IfaceMgr::openSockets4(const uint16_t port) { ...@@ -230,11 +232,13 @@ bool IfaceMgr::openSockets4(const uint16_t port) {
iface!=ifaces_.end(); iface!=ifaces_.end();
++iface) { ++iface) {
cout << "Trying interface " << iface->getFullName() << endl; cout << "Trying opening socket on interface " << iface->getFullName() << endl;
if (iface->flag_loopback_ || if (iface->flag_loopback_ ||
!iface->flag_up_ || !iface->flag_up_ ||
!iface->flag_running_) { !iface->flag_running_) {
cout << "Interface " << iface->getFullName()
<< " not suitable: is loopback, is down or not running" << endl;
continue; continue;
} }
...@@ -681,13 +685,19 @@ IfaceMgr::send(const Pkt4Ptr& pkt) ...@@ -681,13 +685,19 @@ IfaceMgr::send(const Pkt4Ptr& pkt)
boost::shared_ptr<Pkt4> boost::shared_ptr<Pkt4>
IfaceMgr::receive4() { IfaceMgr::receive4(unsigned int timeout) {
const SocketInfo* candidate = 0; const SocketInfo* candidate = 0;
IfaceCollection::const_iterator iface; IfaceCollection::const_iterator iface;
fd_set sockets;
FD_ZERO(&sockets);
int maxfd = 0;
stringstream names;
for (iface = ifaces_.begin(); iface != ifaces_.end(); ++iface) { for (iface = ifaces_.begin(); iface != ifaces_.end(); ++iface) {
/// @todo: rewrite this as part of #1555
for (SocketCollection::const_iterator s = iface->sockets_.begin(); for (SocketCollection::const_iterator s = iface->sockets_.begin();
s != iface->sockets_.end(); ++s) { s != iface->sockets_.end(); ++s) {
...@@ -696,20 +706,82 @@ IfaceMgr::receive4() { ...@@ -696,20 +706,82 @@ IfaceMgr::receive4() {
continue; continue;
} }
// This address looks good. names << s->sockfd_ << "(" << iface->getName() << ") ";
if (!candidate) {
// add this socket to listening set
FD_SET(s->sockfd_, &sockets);
if (maxfd < s->sockfd_)
maxfd = s->sockfd_;
}
}
// if there is session socket registered...
if (session_socket_) {
// at it to the set as well
FD_SET(session_socket_, &sockets);
if (maxfd < session_socket_)
maxfd = session_socket_;
names << session_socket_ << "(session)";
}
/// @todo: implement sub-second precision one day
struct timeval select_timeout;
select_timeout.tv_sec = timeout;
select_timeout.tv_usec = 0;
cout << "Trying to receive data on sockets: " << names.str()
<< ". Timeout is " << timeout << " seconds." << endl;
int result = select(maxfd + 1, &sockets, NULL, NULL, &select_timeout);
cout << "select returned " << result << endl;
if (result == 0) {
// nothing received and timeout has been reached
return (Pkt4Ptr()); // NULL
}
if (result < 0) {
char buf[512];
strncpy(buf, strerror(errno), 512);
cout << "Socket read error: " << buf << endl;
/// @todo: perhaps throw here?
return (Pkt4Ptr()); // NULL
}
// Let's find out which socket has the data
if (session_socket_ && (FD_ISSET(session_socket_, &sockets))) {
// something received over session socket
cout << "BIND10 command or config available over session socket." << endl;
if (session_callback_) {
// in theory we could call io_service.run_one() here, instead of
// implementing callback mechanism, but that would introduce
// asiolink dependency to libdhcp++ and that is something we want
// to avoid (see CPE market and out long term plans for minimalistic
// implementations.
session_callback_();
}
return (Pkt4Ptr()); // NULL
}
// Let's find out which interface/socket has the data
for (iface = ifaces_.begin(); iface != ifaces_.end(); ++iface) {
for (SocketCollection::const_iterator s = iface->sockets_.begin();
s != iface->sockets_.end(); ++s) {
if (FD_ISSET(s->sockfd_, &sockets)) {
candidate = &(*s); candidate = &(*s);
break; break;
} }
} }
if (candidate) { if (candidate) {
break; break;
} }
} }
if (!candidate) { if (!candidate) {
isc_throw(Unexpected, "Failed to find any suitable sockets on all interfaces."); cout << "Received data over unknown socket." << endl;
return (Pkt4Ptr()); // NULL
} }
cout << "Trying to receive over UDP4 socket " << candidate->sockfd_ << " bound to " cout << "Trying to receive over UDP4 socket " << candidate->sockfd_ << " bound to "
...@@ -746,7 +818,7 @@ IfaceMgr::receive4() { ...@@ -746,7 +818,7 @@ IfaceMgr::receive4() {
m.msg_control = &control_buf_[0]; m.msg_control = &control_buf_[0];
m.msg_controllen = control_buf_len_; m.msg_controllen = control_buf_len_;
int result = recvmsg(candidate->sockfd_, &m, 0); result = recvmsg(candidate->sockfd_, &m, 0);
if (result < 0) { if (result < 0) {
cout << "Failed to receive UDP4 data." << endl; cout << "Failed to receive UDP4 data." << endl;
return (Pkt4Ptr()); // NULL return (Pkt4Ptr()); // NULL
......
...@@ -39,6 +39,9 @@ public: ...@@ -39,6 +39,9 @@ public:
/// type that defines list of addresses /// type that defines list of addresses
typedef std::vector<isc::asiolink::IOAddress> AddressCollection; typedef std::vector<isc::asiolink::IOAddress> AddressCollection;
/// defines callback used when commands are received over control session
typedef void (*SessionCallback) (void);
/// maximum MAC address length (Infiniband uses 20 bytes) /// maximum MAC address length (Infiniband uses 20 bytes)
static const unsigned int MAX_MAC_LEN = 20; static const unsigned int MAX_MAC_LEN = 20;
...@@ -351,12 +354,10 @@ public: ...@@ -351,12 +354,10 @@ public:
/// If reception is successful and all information about its sender /// If reception is successful and all information about its sender
/// are obtained, Pkt4 object is created and returned. /// are obtained, Pkt4 object is created and returned.
/// ///
/// TODO Start using select() and add timeout to be able /// @param timeout specifies timeout (in seconds)
/// to not wait infinitely, but rather do something useful
/// (e.g. remove expired leases)
/// ///
/// @return Pkt4 object representing received packet (or NULL) /// @return Pkt4 object representing received packet (or NULL)
Pkt4Ptr receive4(); Pkt4Ptr receive4(unsigned int timeout);
/// Opens UDP/IP socket and binds it to address, interface and port. /// Opens UDP/IP socket and binds it to address, interface and port.
/// ///
...@@ -401,6 +402,18 @@ public: ...@@ -401,6 +402,18 @@ public:
/// @return number of detected interfaces /// @return number of detected interfaces
uint16_t countIfaces() { return ifaces_.size(); } uint16_t countIfaces() { return ifaces_.size(); }
/// @brief Sets session socket and a callback
///
/// Specifies session socket and a callback that will be called
/// when data will be received over that socket.
///
/// @param socketfd socket descriptor
/// @param callback callback function
void set_session_socket(int socketfd, SessionCallback callback) {
session_socket_ = socketfd;
session_callback_ = callback;
}
// don't use private, we need derived classes in tests // don't use private, we need derived classes in tests
protected: protected:
...@@ -487,7 +500,6 @@ protected: ...@@ -487,7 +500,6 @@ protected:
/// control-buffer, used in transmission and reception /// control-buffer, used in transmission and reception
boost::scoped_array<char> control_buf_; boost::scoped_array<char> control_buf_;
/// @brief A wrapper for OS-specific operations before sending IPv4 packet /// @brief A wrapper for OS-specific operations before sending IPv4 packet
/// ///
/// @param m message header (will be later used for sendmsg() call) /// @param m message header (will be later used for sendmsg() call)
...@@ -505,6 +517,11 @@ protected: ...@@ -505,6 +517,11 @@ protected:
/// @return true if successful, false otherwise /// @return true if successful, false otherwise
bool os_receive4(struct msghdr& m, Pkt4Ptr& pkt); bool os_receive4(struct msghdr& m, Pkt4Ptr& pkt);
/// socket descriptor of the session socket
int session_socket_;
/// a callback that will be called when data arrives over session_socket_
SessionCallback session_callback_;
private: private:
/// @brief Creates a single instance of this class (a singleton implementation) /// @brief Creates a single instance of this class (a singleton implementation)
......
...@@ -428,7 +428,7 @@ TEST_F(IfaceMgrTest, sendReceive4) { ...@@ -428,7 +428,7 @@ TEST_F(IfaceMgrTest, sendReceive4) {
EXPECT_EQ(true, ifacemgr->send(sendPkt)); EXPECT_EQ(true, ifacemgr->send(sendPkt));
rcvPkt = ifacemgr->receive4(); rcvPkt = ifacemgr->receive4(10);
ASSERT_TRUE(rcvPkt); // received our own packet ASSERT_TRUE(rcvPkt); // received our own packet
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment