diff --git a/src/lib/util/multi_threading_mgr.cc b/src/lib/util/multi_threading_mgr.cc index 7efd048bca96ecac1515f314e24ad1774a05daae..56878e0c0af251539ff8bc3a7cad786086f6c6c1 100644 --- a/src/lib/util/multi_threading_mgr.cc +++ b/src/lib/util/multi_threading_mgr.cc @@ -45,7 +45,10 @@ MultiThreadingMgr::enterCriticalSection() { ++critical_section_count_; if (getMode() && !inside) { if (getThreadPoolSize()) { - thread_pool_.stop(); + // We simply pause without waiting for all tasks to complete. + // We could also call wait() and pause(false) so that all tasks are + // complete and threads are stopped. + thread_pool_.pause(); } // Now it is safe to call callbacks which can also create other CSs. callEntryCallbacks(); @@ -67,7 +70,14 @@ MultiThreadingMgr::exitCriticalSection() { --critical_section_count_; if (getMode() && !isInCriticalSection()) { if (getThreadPoolSize()) { - thread_pool_.start(getThreadPoolSize()); + // If apply has been called, threads have never been started inside + // a critical section, so start them now, otherwise just resume + // paused threads. + if (!thread_pool_.enabled()) { + thread_pool_.start(getThreadPoolSize()); + } else { + thread_pool_.resume(); + } } // Now it is safe to call callbacks which can also create other CSs. callExitCallbacks(); diff --git a/src/lib/util/tests/multi_threading_mgr_unittest.cc b/src/lib/util/tests/multi_threading_mgr_unittest.cc index 2dc20759ca0e67810a3f697acf5174b862a52686..b13528a58fd322d8e2a79ea16d3878b86787e201 100644 --- a/src/lib/util/tests/multi_threading_mgr_unittest.cc +++ b/src/lib/util/tests/multi_threading_mgr_unittest.cc @@ -180,8 +180,16 @@ TEST(MultiThreadingMgrTest, criticalSection) { auto& thread_pool = MultiThreadingMgr::instance().getThreadPool(); // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); + // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); // apply multi-threading configuration with 16 threads and queue size 256 MultiThreadingMgr::instance().apply(true, 16, 256); + // thread pool should be running + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be running + EXPECT_EQ(thread_pool.paused(), false); // thread count should match EXPECT_EQ(thread_pool.size(), 16); // thread count should be 16 @@ -191,8 +199,12 @@ TEST(MultiThreadingMgrTest, criticalSection) { // use scope to test constructor and destructor { MultiThreadingCriticalSection cs; - // thread pool should be stopped - EXPECT_EQ(thread_pool.size(), 0); + // thread pool should be paused + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be paused + EXPECT_EQ(thread_pool.paused(), true); + // thread count should be 16 + EXPECT_EQ(thread_pool.size(), 16); // thread count should be 16 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16); // queue size should be 256 @@ -200,20 +212,32 @@ TEST(MultiThreadingMgrTest, criticalSection) { // use scope to test constructor and destructor { MultiThreadingCriticalSection inner_cs; - // thread pool should be stopped - EXPECT_EQ(thread_pool.size(), 0); + // thread pool should be paused + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be paused + EXPECT_EQ(thread_pool.paused(), true); + // thread count should be 16 + EXPECT_EQ(thread_pool.size(), 16); // thread count should be 16 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16); // queue size should be 256 EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256); } - // thread pool should be stopped - EXPECT_EQ(thread_pool.size(), 0); + // thread pool should be paused + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be paused + EXPECT_EQ(thread_pool.paused(), true); + // thread count should be 16 + EXPECT_EQ(thread_pool.size(), 16); // thread count should be 16 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16); // queue size should be 256 EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256); } + // thread pool should be running + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be running + EXPECT_EQ(thread_pool.paused(), false); // thread count should match EXPECT_EQ(thread_pool.size(), 16); // thread count should be 16 @@ -223,8 +247,12 @@ TEST(MultiThreadingMgrTest, criticalSection) { // use scope to test constructor and destructor { MultiThreadingCriticalSection cs; - // thread pool should be stopped - EXPECT_EQ(thread_pool.size(), 0); + // thread pool should be paused + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be paused + EXPECT_EQ(thread_pool.paused(), true); + // thread count should be 16 + EXPECT_EQ(thread_pool.size(), 16); // thread count should be 16 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16); // queue size should be 256 @@ -232,12 +260,20 @@ TEST(MultiThreadingMgrTest, criticalSection) { // apply multi-threading configuration with 64 threads and queue size 4 MultiThreadingMgr::instance().apply(true, 64, 4); // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // thread count should be 64 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 64); // queue size should be 4 EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 4); } + // thread pool should be running + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be running + EXPECT_EQ(thread_pool.paused(), false); // thread count should match EXPECT_EQ(thread_pool.size(), 64); // thread count should be 64 @@ -247,8 +283,12 @@ TEST(MultiThreadingMgrTest, criticalSection) { // use scope to test constructor and destructor { MultiThreadingCriticalSection cs; - // thread pool should be stopped - EXPECT_EQ(thread_pool.size(), 0); + // thread pool should be running + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be paused + EXPECT_EQ(thread_pool.paused(), true); + // thread count should be 64 + EXPECT_EQ(thread_pool.size(), 64); // thread count should be 64 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 64); // queue size should be 4 @@ -256,12 +296,20 @@ TEST(MultiThreadingMgrTest, criticalSection) { // apply multi-threading configuration with 0 threads MultiThreadingMgr::instance().apply(false, 64, 256); // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // thread count should be 0 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0); // queue size should be 0 EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0); } + // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); // thread count should match EXPECT_EQ(thread_pool.size(), 0); // thread count should be 0 @@ -272,6 +320,10 @@ TEST(MultiThreadingMgrTest, criticalSection) { { MultiThreadingCriticalSection cs; // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // thread count should be 0 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0); @@ -281,6 +333,10 @@ TEST(MultiThreadingMgrTest, criticalSection) { { MultiThreadingCriticalSection inner_cs; // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // thread count should be 0 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0); @@ -288,12 +344,20 @@ TEST(MultiThreadingMgrTest, criticalSection) { EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0); } // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // thread count should be 0 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0); // queue size should be 0 EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0); } + // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); // thread count should match EXPECT_EQ(thread_pool.size(), 0); // thread count should be 0 @@ -304,16 +368,28 @@ TEST(MultiThreadingMgrTest, criticalSection) { { MultiThreadingCriticalSection cs; // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // apply multi-threading configuration with 64 threads MultiThreadingMgr::instance().apply(true, 64, 256); // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // thread count should be 64 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 64); // queue size should be 256 EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256); } + // thread pool should be running + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be running + EXPECT_EQ(thread_pool.paused(), false); // thread count should match EXPECT_EQ(thread_pool.size(), 64); // thread count should be 64 @@ -373,7 +449,7 @@ public: /// /// @return True if the pool is running, false otherwise. bool isThreadPoolRunning() { - return (MultiThreadingMgr::instance().getThreadPool().size()); + return (!MultiThreadingMgr::instance().getThreadPool().paused()); } /// @brief Checks callback invocations over a series of nested diff --git a/src/lib/util/tests/thread_pool_unittest.cc b/src/lib/util/tests/thread_pool_unittest.cc index 9c636c9e85f42b2d8507a1c67f3a96603f0549aa..b44e6b06f0c92d6d95a381ef49c5fc82387a06cc 100644 --- a/src/lib/util/tests/thread_pool_unittest.cc +++ b/src/lib/util/tests/thread_pool_unittest.cc @@ -570,6 +570,271 @@ TEST_F(ThreadPoolTest, wait) { // check that the number of processed tasks matches the number of items checkRunHistory(items_count); + + // calling stop should clear all threads and should keep queued items + EXPECT_NO_THROW(thread_pool.stop()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + items_count = 64; + thread_count = 16; + // prepare setup + reset(thread_count); + + // add items to stopped thread pool + for (uint32_t i = 0; i < items_count; ++i) { + bool ret = true; + EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared(call_back))); + EXPECT_TRUE(ret); + } + + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling start should create the threads and should keep the queued items + EXPECT_NO_THROW(thread_pool.start(thread_count)); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // calling stop should clear all threads and should keep queued items + EXPECT_NO_THROW(thread_pool.stop()); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + // wait for all items to be processed + ASSERT_TRUE(thread_pool.wait(1)); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); +} + +/// @brief test ThreadPool pause and resume +TEST_F(ThreadPoolTest, pauseAndResume) { + uint32_t items_count; + uint32_t thread_count; + CallBack call_back; + ThreadPool thread_pool; + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + items_count = 4; + thread_count = 4; + // prepare setup + reset(thread_count); + + // create tasks which block thread pool threads until signaled by main + // thread to force all threads of the thread pool to run exactly one task + call_back = std::bind(&ThreadPoolTest::runAndWait, this); + + // calling resume should do nothing if not started + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // do it once again to check if it works + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling pause should do nothing if not started + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // do it once again to check if it works + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling resume should do nothing if not started + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // add items to stopped thread pool + for (uint32_t i = 0; i < items_count; ++i) { + bool ret = true; + EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared(call_back))); + EXPECT_TRUE(ret); + } + + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling pause should do nothing if not started + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling resume should do nothing if not started + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling pause should do nothing if not started + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling start should create the threads and should keep the queued items + EXPECT_NO_THROW(thread_pool.start(thread_count)); + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // calling resume should resume threads + EXPECT_NO_THROW(thread_pool.resume()); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // wait for all items to be processed + waitTasks(thread_count, items_count); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // as each thread pool thread is still waiting on main to unblock, each + // thread should have been registered in ids list + checkIds(items_count); + // all items should have been processed + ASSERT_EQ(count(), items_count); + + // check that the number of processed tasks matches the number of items + checkRunHistory(items_count); + + // signal thread pool tasks to continue + signalThreads(); + + // do it once again to check if it works + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // calling stop should clear all threads and should keep queued items + EXPECT_NO_THROW(thread_pool.stop()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + items_count = 64; + thread_count = 16; + // prepare setup + reset(thread_count); + + // create tasks which do not block the thread pool threads so that several + // tasks can be run on the same thread and some of the threads never even + // having a chance to run + call_back = std::bind(&ThreadPoolTest::run, this); + + // add items to stopped thread pool + for (uint32_t i = 0; i < items_count; ++i) { + bool ret = true; + EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared(call_back))); + EXPECT_TRUE(ret); + } + + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling start should create the threads and should keep the queued items + EXPECT_NO_THROW(thread_pool.start(thread_count)); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // wait for all items to be processed + waitTasks(thread_count, items_count); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // all items should have been processed + ASSERT_EQ(count(), items_count); + + // check that the number of processed tasks matches the number of items + checkRunHistory(items_count); + + // calling pause should pause threads + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // do it once again to check if it works + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // prepare setup + reset(thread_count); + + // add items to paused thread pool + for (uint32_t i = 0; i < items_count; ++i) { + bool ret = true; + EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared(call_back))); + EXPECT_TRUE(ret); + } + + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // calling resume should resume threads + EXPECT_NO_THROW(thread_pool.resume()); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // wait for all items to be processed + waitTasks(thread_count, items_count); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // all items should have been processed + ASSERT_EQ(count(), items_count); + + // check that the number of processed tasks matches the number of items + checkRunHistory(items_count); + + // calling stop should clear all threads and should keep queued items + EXPECT_NO_THROW(thread_pool.stop()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); } /// @brief test ThreadPool max queue size diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index fdfce0f71bb3538a1acebfe620548243d79a841d..aecd261ca313af7018066050d27888172bba2cd8 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -140,6 +140,40 @@ struct ThreadPool { return (queue_.wait(seconds)); } + /// @brief pause threads + /// + /// Used to pause threads so that they stop processing tasks + /// + /// @param wait the flag indicating if should wait for threads to pause. + void pause(bool wait = true) { + queue_.pause(wait); + } + + /// @brief resume threads + /// + /// Used to resume threads so that they start processing tasks + void resume() { + queue_.resume(); + } + + /// @brief return the state of the queue + /// + /// Returns the state of the queue + /// + /// @return the state + bool enabled() { + return (queue_.enabled()); + } + + /// @brief return the state of the threads + /// + /// Returns the state of the threads + /// + /// @return the state + bool paused() { + return (queue_.paused()); + } + /// @brief set maximum number of work items in the queue /// /// @param max_queue_size the maximum size (0 means unlimited) @@ -185,7 +219,7 @@ private: sigaddset(&sset, SIGHUP); sigaddset(&sset, SIGTERM); pthread_sigmask(SIG_BLOCK, &sset, &osset); - queue_.enable(thread_count); + queue_.enable(); try { for (uint32_t i = 0; i < thread_count; ++i) { threads_.push_back(boost::make_shared(&ThreadPool::run, this)); @@ -241,7 +275,7 @@ private: /// /// Creates the thread pool queue in 'disabled' state ThreadPoolQueue() - : enabled_(false), max_queue_size_(0), working_(0), + : enabled_(false), paused_(false), max_queue_size_(0), working_(0), stat10(0.), stat100(0.), stat1000(0.) { } @@ -253,6 +287,18 @@ private: clear(); } + /// @brief register thread so that it can be taken into account + void registerThread() { + std::lock_guard lock(mutex_); + ++working_; + } + + /// @brief unregister thread so that it can be ignored + void unregisterThread() { + std::lock_guard lock(mutex_); + --working_; + } + /// @brief set maximum number of work items in the queue /// /// @return the maximum size (0 means unlimited) @@ -338,15 +384,21 @@ private: Item pop() { std::unique_lock lock(mutex_); --working_; - // Wait for push or disable functions. + // Signal thread waiting for threads to pause. + if (working_ == 0 && paused_) { + wait_threads_cv_.notify_all(); + } + // Signal thread waiting for tasks to finish. if (working_ == 0 && queue_.empty()) { wait_cv_.notify_all(); } + // Wait for push or disable functions. cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());}); + pause_cv_.wait(lock, [&]() {return (!enabled_ || !paused_);}); + ++working_; if (!enabled_) { return (Item()); } - ++working_; size_t length = queue_.size(); stat10 = stat10 * CEXP10 + (1 - CEXP10) * length; stat100 = stat100 * CEXP100 + (1 - CEXP100) * length; @@ -391,6 +443,29 @@ private: return (ret); } + /// @brief pause threads + /// + /// Used to pause threads so that they stop processing tasks + /// + /// @param wait the flag indicating if should wait for threads to pause. + void pause(bool wait) { + std::unique_lock lock(mutex_); + paused_ = true; + if (wait) { + // Wait for working threads to finish. + wait_threads_cv_.wait(lock, [&]() {return (working_ == 0);}); + } + } + + /// @brief resume threads + /// + /// Used to resume threads so that they start processing tasks + void resume() { + std::unique_lock lock(mutex_); + paused_ = false; + pause_cv_.notify_all(); + } + /// @brief get queue length statistic /// /// @param which select the statistic (10, 100 or 1000) @@ -417,19 +492,14 @@ private: void clear() { std::lock_guard lock(mutex_); queue_ = QueueContainer(); - working_ = 0; - wait_cv_.notify_all(); } /// @brief enable the queue /// /// Sets the queue state to 'enabled' - /// - /// @param number of working threads - void enable(uint32_t thread_count) { + void enable() { std::lock_guard lock(mutex_); enabled_ = true; - working_ = thread_count; } /// @brief disable the queue @@ -438,9 +508,11 @@ private: void disable() { { std::lock_guard lock(mutex_); + paused_ = false; enabled_ = false; } // Notify pop so that it can exit. + pause_cv_.notify_all(); cv_.notify_all(); } @@ -453,6 +525,15 @@ private: return (enabled_); } + /// @brief return the state of the threads + /// + /// Returns the state of the threads + /// + /// @return the state + bool paused() { + return (paused_); + } + private: /// @brief underlying queue container QueueContainer queue_; @@ -466,11 +547,22 @@ private: /// @brief condition variable used to wait for all items to be processed std::condition_variable wait_cv_; - /// @brief the sate of the queue + /// @brief condition variable used to wait for all threads to be paused + std::condition_variable wait_threads_cv_; + + /// @brief condition variable used to pause threads + std::condition_variable pause_cv_; + + /// @brief the state of the queue /// The 'enabled' state corresponds to true value /// The 'disabled' state corresponds to false value std::atomic enabled_; + /// @brief the pause state of the queue + /// The 'paused' state corresponds to true value + /// The 'resumed' state corresponds to false value + std::atomic paused_; + /// @brief maximum number of work items in the queue /// (0 means unlimited) size_t max_queue_size_; @@ -490,7 +582,12 @@ private: /// @brief run function of each thread void run() { - while (queue_.enabled()) { + bool work = queue_.enabled(); + if (!work) { + return; + } + queue_.registerThread(); + for (; work; work = queue_.enabled()) { WorkItemPtr item = queue_.pop(); if (item) { try { @@ -500,6 +597,7 @@ private: } } } + queue_.unregisterThread(); } /// @brief list of worker threads