Commit f205b568 authored by Thomas Markwalder's avatar Thomas Markwalder
Browse files

[#260,!120] Moved thread creation from Receiver ctor to new start function

    dhcp::Receiver - moved creating receiver's thread out of the constructor
    to new Receier::start() method.  This alleviates issues with thread worker
    functions referring to Receiver members before Receiver fully exists.
parent f72ce56b
......@@ -179,7 +179,8 @@ bool Iface::delSocket(const uint16_t sockfd) {
return (false); // socket not found
}
Receiver::Receiver(const boost::function<void()>& thread_main) {
void
Receiver::start(const boost::function<void()>& thread_main) {
clearReady(RCV_ERROR);
clearReady(RCV_READY);
clearReady(RCV_TERMINATE);
......@@ -219,9 +220,12 @@ Receiver::shouldTerminate() {
void
Receiver::stop() {
markReady(RCV_TERMINATE);
thread_->wait();
thread_.reset();
if (thread_) {
markReady(RCV_TERMINATE);
thread_->wait();
thread_.reset();
}
clearReady(RCV_ERROR);
clearReady(RCV_READY);
last_error_ = "thread stopped";
......@@ -353,14 +357,14 @@ void IfaceMgr::stopDHCPReceiver() {
if (isReceiverRunning()) {
receiver_->stop();
receiver_.reset();
}
if (getPacketQueue4()) {
getPacketQueue4()->clear();
}
if (getPacketQueue4()) {
getPacketQueue4()->clear();
}
if (getPacketQueue6()) {
getPacketQueue6()->clear();
if (getPacketQueue6()) {
getPacketQueue6()->clear();
}
}
}
......@@ -369,7 +373,6 @@ IfaceMgr::~IfaceMgr() {
control_buf_len_ = 0;
closeSockets();
// Explicitly delete PQM singletons.
PacketQueueMgr4::destroy();
PacketQueueMgr6::destroy();
......@@ -754,7 +757,9 @@ IfaceMgr::startDHCPReceiver(const uint16_t family) {
return;
}
receiver_.reset(new Receiver(boost::bind(boost::bind(&IfaceMgr::receiveDHCP4Packets, this))));
receiver_.reset(new Receiver());
receiver_->start(boost::bind(boost::bind(&IfaceMgr::receiveDHCP4Packets, this)));
break;
case AF_INET6:
// If there's no queue, then has been disabled, simply return.
......@@ -762,7 +767,8 @@ IfaceMgr::startDHCPReceiver(const uint16_t family) {
return;
}
receiver_.reset(new Receiver(boost::bind(boost::bind(&IfaceMgr::receiveDHCP6Packets, this))));
receiver_.reset(new Receiver());
receiver_->start(boost::bind(boost::bind(&IfaceMgr::receiveDHCP6Packets, this)));
break;
default:
isc_throw (BadValue, "startDHCPReceiver: invalid family: " << family);
......
......@@ -475,15 +475,10 @@ public:
};
/// @brief Constructor
///
/// It initializes the watch sockets and then instantiates and
/// starts the receiver's worker thread.
///
/// @param thread_main function the receiver's thread should run
Receiver(const boost::function<void()>& thread_main);
Receiver(){};
/// @brief Virtual destructor
virtual ~Receiver() {}
virtual ~Receiver(){}
/// @brief Fetches the fd of a watch socket
///
......@@ -516,6 +511,22 @@ public:
/// @return true if the terminate watch socket is ready
bool shouldTerminate();
/// @brief Creates and runs the thread.
///
/// Creates teh receiver's thread, passing into it the given
/// function to run.
///
/// @param thread_main function the receiver's thread should run
void start(const boost::function<void()>& thread_main);
/// @brief Returns true if the receiver thread is running
/// @todo - this may need additional logic to handle cases where
/// a thread function exits w/o the caller invoking @c
/// Receiver::stop().
bool isRunning() {
return (thread_ != 0);
}
/// @brief Terminates the receiver thread
///
/// It marks the terminate watch socket ready, and then waits for the
......@@ -1152,9 +1163,10 @@ public:
/// the packet queue is flushed.
void stopDHCPReceiver();
/// @brief Returns true if there is a receiver currently running.
/// @brief Returns true if there is a receiver exists and its
/// thread is currently running.
bool isReceiverRunning() const {
return (receiver_ != 0);
return (receiver_ != 0 && receiver_->isRunning());
}
/// @brief Configures DHCP packet queue
......
......@@ -68,7 +68,7 @@ public:
/// @return true if the queue type has been successfully registered, false
/// if the type already exists.
bool registerPacketQueueFactory(const std::string& queue_type,
const Factory& factory) {
Factory factory) {
// Check if this backend has been already registered.
if (factories_.count(queue_type)) {
return (false);
......
......@@ -3265,8 +3265,11 @@ TEST_F(ReceiverTest, receiverClassBasics) {
/// We'll create a receiver and let it run until it expires. (Note this is more
/// of a test of ReceiverTest itself and ensures our tests later for why we
/// exited are sound.)
receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
Receiver::RCV_TERMINATE))));
receiver_.reset(new Receiver());
ASSERT_FALSE(receiver_->isRunning());
receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_TERMINATE));
ASSERT_TRUE(receiver_->isRunning());
// Wait long enough for thread to expire.
nap(WORKER_MAX_PASSES + 1);
......@@ -3279,10 +3282,18 @@ TEST_F(ReceiverTest, receiverClassBasics) {
ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
EXPECT_EQ("thread expired", receiver_->getLastError());
// This one is a little wonky, as a thread function expiring needs to be
// supported in Receiver. There needs to be something in Receiver, so it
// nows the thread exited. Thread exists but I think it's underlying
// impl does not.
EXPECT_TRUE(receiver_->isRunning());
ASSERT_NO_THROW(receiver_->stop());
/// Now we'll test stopping a thread.
/// We'll create a Receiver, let it run a little and then tell it to stop.
receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
Receiver::RCV_TERMINATE))));
/// Start the receiver, let it run a little and then tell it to stop.
receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_TERMINATE));
ASSERT_TRUE(receiver_->isRunning());
// No watches should be ready.
ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
......@@ -3293,6 +3304,7 @@ TEST_F(ReceiverTest, receiverClassBasics) {
// Tell it to stop.
receiver_->stop();
ASSERT_FALSE(receiver_->isRunning());
// It should have done less than the maximum number of passes.
EXPECT_LT(passes_, WORKER_MAX_PASSES);
......@@ -3305,9 +3317,10 @@ TEST_F(ReceiverTest, receiverClassBasics) {
// Next we'll test error notification.
// We'll create a receiver that sets an error on the second pass.
receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
Receiver::RCV_ERROR))));
// Start the receiver with a thread that sets an error on the second pass.
receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_ERROR));
ASSERT_TRUE(receiver_->isRunning());
// No watches should be ready.
ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
......@@ -3322,6 +3335,7 @@ TEST_F(ReceiverTest, receiverClassBasics) {
// Tell it to stop.
receiver_->stop();
ASSERT_FALSE(receiver_->isRunning());
// It should have done less than the maximum number of passes.
EXPECT_LT(passes_, WORKER_MAX_PASSES);
......@@ -3334,9 +3348,10 @@ TEST_F(ReceiverTest, receiverClassBasics) {
// Finally, we'll test data ready notification.
// We'll create a receiver that indicates data ready on its second pass.
receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
Receiver::RCV_READY))));
// We'll start the receiver with a thread that indicates data ready on its second pass.
receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_READY));
ASSERT_TRUE(receiver_->isRunning());
// No watches should be ready.
ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
......@@ -3350,6 +3365,7 @@ TEST_F(ReceiverTest, receiverClassBasics) {
// Tell it to stop.
receiver_->stop();
ASSERT_FALSE(receiver_->isRunning());
// It should have done less than the maximum number of passes.
EXPECT_LT(passes_, WORKER_MAX_PASSES);
......
......@@ -47,7 +47,6 @@ IfacesConfigParser::parse(const CfgIfacePtr& cfg,
if (re_detect) {
// Interface clear will drop opened socket information
// so close them if the caller did not.
IfaceMgr::instance().stopDHCPReceiver();
IfaceMgr::instance().closeSockets();
IfaceMgr::instance().clearIfaces();
IfaceMgr::instance().detectIfaces();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment