From abf48e00f83f55443c9c6cf9db559345ebc35938 Mon Sep 17 00:00:00 2001 From: Razvan Becheriu Date: Thu, 10 Dec 2020 22:52:58 +0200 Subject: [PATCH 1/8] [#1599] fixed race on wait and disable in ThreadPoolQueue --- src/lib/util/thread_pool.h | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index fdfce0f71b..432dc94a8c 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -185,7 +185,7 @@ private: sigaddset(&sset, SIGHUP); sigaddset(&sset, SIGTERM); pthread_sigmask(SIG_BLOCK, &sset, &osset); - queue_.enable(thread_count); + queue_.enable(); try { for (uint32_t i = 0; i < thread_count; ++i) { threads_.push_back(boost::make_shared(&ThreadPool::run, this)); @@ -253,6 +253,12 @@ private: clear(); } + /// @brief register thread so that it can be taken into account + void registerThread() { + std::lock_guard lock(mutex_); + ++working_; + } + /// @brief set maximum number of work items in the queue /// /// @return the maximum size (0 means unlimited) @@ -417,19 +423,14 @@ private: void clear() { std::lock_guard lock(mutex_); queue_ = QueueContainer(); - working_ = 0; - wait_cv_.notify_all(); } /// @brief enable the queue /// /// Sets the queue state to 'enabled' - /// - /// @param number of working threads - void enable(uint32_t thread_count) { + void enable() { std::lock_guard lock(mutex_); enabled_ = true; - working_ = thread_count; } /// @brief disable the queue @@ -490,7 +491,12 @@ private: /// @brief run function of each thread void run() { + bool register_thread = true; while (queue_.enabled()) { + if (register_thread) { + queue_.registerThread(); + register_thread = false; + } WorkItemPtr item = queue_.pop(); if (item) { try { -- GitLab From d2a8b3211f5f77a3db6323eabece654841ba79d7 Mon Sep 17 00:00:00 2001 From: Razvan Becheriu Date: Thu, 10 Dec 2020 23:19:28 +0200 Subject: [PATCH 2/8] [#1599] fixed register/unregister thread in ThreadPoolQueue --- src/lib/util/thread_pool.h | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index 432dc94a8c..710a5c9300 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -259,6 +259,12 @@ private: ++working_; } + /// @brief unregister thread so that it can be ignored + void unregisterThread() { + std::lock_guard lock(mutex_); + --working_; + } + /// @brief set maximum number of work items in the queue /// /// @return the maximum size (0 means unlimited) @@ -349,10 +355,10 @@ private: wait_cv_.notify_all(); } cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());}); + ++working_; if (!enabled_) { return (Item()); } - ++working_; size_t length = queue_.size(); stat10 = stat10 * CEXP10 + (1 - CEXP10) * length; stat100 = stat100 * CEXP100 + (1 - CEXP100) * length; @@ -506,6 +512,9 @@ private: } } } + if (!register_thread) { + queue_.unregisterThread(); + } } /// @brief list of worker threads -- GitLab From 258442631e5b4c3070d7e7a5650e1e05902f5519 Mon Sep 17 00:00:00 2001 From: Razvan Becheriu Date: Fri, 11 Dec 2020 00:21:28 +0200 Subject: [PATCH 3/8] [#1599] optimizations --- src/lib/util/thread_pool.h | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index 710a5c9300..f4c5153cb9 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -497,12 +497,12 @@ private: /// @brief run function of each thread void run() { - bool register_thread = true; - while (queue_.enabled()) { - if (register_thread) { - queue_.registerThread(); - register_thread = false; - } + bool work = queue_.enabled(); + if (!work) { + return; + } + queue_.registerThread(); + for (; work; work = queue_.enabled()) { WorkItemPtr item = queue_.pop(); if (item) { try { @@ -512,9 +512,7 @@ private: } } } - if (!register_thread) { - queue_.unregisterThread(); - } + queue_.unregisterThread(); } /// @brief list of worker threads -- GitLab From a1777659c37cbcd7a60559d599283f6afbcb2602 Mon Sep 17 00:00:00 2001 From: Razvan Becheriu Date: Fri, 11 Dec 2020 10:35:46 +0200 Subject: [PATCH 4/8] [#1599] added UT for start stop wait --- src/lib/util/tests/thread_pool_unittest.cc | 38 ++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/lib/util/tests/thread_pool_unittest.cc b/src/lib/util/tests/thread_pool_unittest.cc index 9c636c9e85..e0152a236f 100644 --- a/src/lib/util/tests/thread_pool_unittest.cc +++ b/src/lib/util/tests/thread_pool_unittest.cc @@ -570,6 +570,44 @@ TEST_F(ThreadPoolTest, wait) { // check that the number of processed tasks matches the number of items checkRunHistory(items_count); + + // calling stop should clear all threads and should keep queued items + EXPECT_NO_THROW(thread_pool.stop()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + items_count = 64; + thread_count = 16; + // prepare setup + reset(thread_count); + + // add items to stopped thread pool + for (uint32_t i = 0; i < items_count; ++i) { + bool ret = true; + EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared(call_back))); + EXPECT_TRUE(ret); + } + + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling start should create the threads and should keep the queued items + EXPECT_NO_THROW(thread_pool.start(thread_count)); + + // calling stop should clear all threads and should keep queued items + EXPECT_NO_THROW(thread_pool.stop()); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + // wait for all items to be processed + ASSERT_TRUE(thread_pool.wait(1)); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), 0); } /// @brief test ThreadPool max queue size -- GitLab From 0c5bfdf6fd01bfdc23bb43104e8d167dc57bd5fd Mon Sep 17 00:00:00 2001 From: Razvan Becheriu Date: Fri, 11 Dec 2020 12:52:03 +0200 Subject: [PATCH 5/8] [#1599] implemented pause and resume --- src/lib/util/tests/thread_pool_unittest.cc | 225 +++++++++++++++++++++ src/lib/util/thread_pool.h | 44 +++- 2 files changed, 267 insertions(+), 2 deletions(-) diff --git a/src/lib/util/tests/thread_pool_unittest.cc b/src/lib/util/tests/thread_pool_unittest.cc index e0152a236f..e3b84a62ab 100644 --- a/src/lib/util/tests/thread_pool_unittest.cc +++ b/src/lib/util/tests/thread_pool_unittest.cc @@ -606,7 +606,232 @@ TEST_F(ThreadPoolTest, wait) { ASSERT_TRUE(thread_pool.wait(1)); // the item count should be 0 ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); +} + +/// @brief test ThreadPool pause and resume +TEST_F(ThreadPoolTest, pauseAndResume) { + uint32_t items_count; + uint32_t thread_count; + CallBack call_back; + ThreadPool thread_pool; + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + items_count = 4; + thread_count = 4; + // prepare setup + reset(thread_count); + + // create tasks which block thread pool threads until signaled by main + // thread to force all threads of the thread pool to run exactly one task + call_back = std::bind(&ThreadPoolTest::runAndWait, this); + + // calling resume should do nothing if not started + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // do it once again to check if it works + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling pause should do nothing if not started + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // do it once again to check if it works + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling resume should do nothing if not started + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // add items to stopped thread pool + for (uint32_t i = 0; i < items_count; ++i) { + bool ret = true; + EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared(call_back))); + EXPECT_TRUE(ret); + } + + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling pause should do nothing if not started + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling resume should do nothing if not started + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling pause should do nothing if not started + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling start should create the threads and should keep the queued items + EXPECT_NO_THROW(thread_pool.start(thread_count)); + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // calling resume should resume threads + EXPECT_NO_THROW(thread_pool.resume()); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // wait for all items to be processed + waitTasks(thread_count, items_count); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // as each thread pool thread is still waiting on main to unblock, each + // thread should have been registered in ids list + checkIds(items_count); + // all items should have been processed + ASSERT_EQ(count(), items_count); + + // check that the number of processed tasks matches the number of items + checkRunHistory(items_count); + + // signal thread pool tasks to continue + signalThreads(); + + // do it once again to check if it works + EXPECT_NO_THROW(thread_pool.resume()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // calling stop should clear all threads and should keep queued items + EXPECT_NO_THROW(thread_pool.stop()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + items_count = 64; + thread_count = 16; + // prepare setup + reset(thread_count); + + // create tasks which do not block the thread pool threads so that several + // tasks can be run on the same thread and some of the threads never even + // having a chance to run + call_back = std::bind(&ThreadPoolTest::run, this); + + // add items to stopped thread pool + for (uint32_t i = 0; i < items_count; ++i) { + bool ret = true; + EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared(call_back))); + EXPECT_TRUE(ret); + } + + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling start should create the threads and should keep the queued items + EXPECT_NO_THROW(thread_pool.start(thread_count)); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // wait for all items to be processed + waitTasks(thread_count, items_count); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // all items should have been processed + ASSERT_EQ(count(), items_count); + + // check that the number of processed tasks matches the number of items + checkRunHistory(items_count); + + // calling pause should pause threads + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // do it once again to check if it works + EXPECT_NO_THROW(thread_pool.pause()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // prepare setup + reset(thread_count); + + // add items to paused thread pool + for (uint32_t i = 0; i < items_count; ++i) { + bool ret = true; + EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared(call_back))); + EXPECT_TRUE(ret); + } + + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // calling resume should resume threads + EXPECT_NO_THROW(thread_pool.resume()); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // wait for all items to be processed + waitTasks(thread_count, items_count); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // all items should have been processed + ASSERT_EQ(count(), items_count); + + // check that the number of processed tasks matches the number of items + checkRunHistory(items_count); + + // calling stop should clear all threads and should keep queued items + EXPECT_NO_THROW(thread_pool.stop()); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 ASSERT_EQ(thread_pool.size(), 0); } diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index f4c5153cb9..207c265d57 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -140,6 +140,20 @@ struct ThreadPool { return (queue_.wait(seconds)); } + /// @brief pause threads + /// + /// Used to pause threads so that they stop processing tasks + void pause() { + queue_.pause(); + } + + /// @brief resume threads + /// + /// Used to resume threads so that they start processing tasks + void resume() { + queue_.resume(); + } + /// @brief set maximum number of work items in the queue /// /// @param max_queue_size the maximum size (0 means unlimited) @@ -241,7 +255,7 @@ private: /// /// Creates the thread pool queue in 'disabled' state ThreadPoolQueue() - : enabled_(false), max_queue_size_(0), working_(0), + : enabled_(false), paused_(false), max_queue_size_(0), working_(0), stat10(0.), stat100(0.), stat1000(0.) { } @@ -355,6 +369,7 @@ private: wait_cv_.notify_all(); } cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());}); + pause_cv_.wait(lock, [&]() {return (!enabled_ || !paused_);}); ++working_; if (!enabled_) { return (Item()); @@ -403,6 +418,23 @@ private: return (ret); } + /// @brief pause threads + /// + /// Used to pause threads so that they stop processing tasks + void pause() { + std::unique_lock lock(mutex_); + paused_ = true; + } + + /// @brief resume threads + /// + /// Used to resume threads so that they start processing tasks + void resume() { + std::unique_lock lock(mutex_); + paused_ = false; + pause_cv_.notify_all(); + } + /// @brief get queue length statistic /// /// @param which select the statistic (10, 100 or 1000) @@ -473,11 +505,19 @@ private: /// @brief condition variable used to wait for all items to be processed std::condition_variable wait_cv_; - /// @brief the sate of the queue + /// @brief condition variable used to pause threads + std::condition_variable pause_cv_; + + /// @brief the state of the queue /// The 'enabled' state corresponds to true value /// The 'disabled' state corresponds to false value std::atomic enabled_; + /// @brief the pause state of the queue + /// The 'paused' state corresponds to true value + /// The 'resumed' state corresponds to false value + std::atomic paused_; + /// @brief maximum number of work items in the queue /// (0 means unlimited) size_t max_queue_size_; -- GitLab From b6c86fc18fa4f851e8c3b49986d396da156a5187 Mon Sep 17 00:00:00 2001 From: Razvan Becheriu Date: Sat, 12 Dec 2020 13:41:57 +0200 Subject: [PATCH 6/8] [#1599] added extra check in unittests --- src/lib/util/tests/thread_pool_unittest.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lib/util/tests/thread_pool_unittest.cc b/src/lib/util/tests/thread_pool_unittest.cc index e3b84a62ab..b44e6b06f0 100644 --- a/src/lib/util/tests/thread_pool_unittest.cc +++ b/src/lib/util/tests/thread_pool_unittest.cc @@ -597,6 +597,8 @@ TEST_F(ThreadPoolTest, wait) { // calling start should create the threads and should keep the queued items EXPECT_NO_THROW(thread_pool.start(thread_count)); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); // calling stop should clear all threads and should keep queued items EXPECT_NO_THROW(thread_pool.stop()); -- GitLab From e2b5cd3db1f16ca35d9ef13736e20d5d67a38ff1 Mon Sep 17 00:00:00 2001 From: Razvan Becheriu Date: Wed, 25 Aug 2021 09:36:24 +0300 Subject: [PATCH 7/8] [#1599] added pause with wait for threads to stop --- src/lib/util/multi_threading_mgr.cc | 8 ++++++-- src/lib/util/thread_pool.h | 24 ++++++++++++++++++++---- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/src/lib/util/multi_threading_mgr.cc b/src/lib/util/multi_threading_mgr.cc index 7efd048bca..5f8734f5e2 100644 --- a/src/lib/util/multi_threading_mgr.cc +++ b/src/lib/util/multi_threading_mgr.cc @@ -45,7 +45,11 @@ MultiThreadingMgr::enterCriticalSection() { ++critical_section_count_; if (getMode() && !inside) { if (getThreadPoolSize()) { - thread_pool_.stop(); + // We simply pause without waiting for all tasks to complete. + // We could also call pause(false) which does not wait for + // threads to stop and wait() so that all tasks are complete + // and threads are stopped. + thread_pool_.pause(); } // Now it is safe to call callbacks which can also create other CSs. callEntryCallbacks(); @@ -67,7 +71,7 @@ MultiThreadingMgr::exitCriticalSection() { --critical_section_count_; if (getMode() && !isInCriticalSection()) { if (getThreadPoolSize()) { - thread_pool_.start(getThreadPoolSize()); + thread_pool_.resume(); } // Now it is safe to call callbacks which can also create other CSs. callExitCallbacks(); diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index 207c265d57..28e66c09d9 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -143,8 +143,10 @@ struct ThreadPool { /// @brief pause threads /// /// Used to pause threads so that they stop processing tasks - void pause() { - queue_.pause(); + /// + /// @param wait the flag indicating if should wait for threads to pause. + void pause(bool wait = true) { + queue_.pause(wait); } /// @brief resume threads @@ -364,10 +366,15 @@ private: Item pop() { std::unique_lock lock(mutex_); --working_; - // Wait for push or disable functions. + // Signal thread waiting for threads to pause. + if (working_ == 0 && paused_) { + wait_threads_cv_.notify_all(); + } + // Signal thread waiting for tasks to finish. if (working_ == 0 && queue_.empty()) { wait_cv_.notify_all(); } + // Wait for push or disable functions. cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());}); pause_cv_.wait(lock, [&]() {return (!enabled_ || !paused_);}); ++working_; @@ -421,9 +428,15 @@ private: /// @brief pause threads /// /// Used to pause threads so that they stop processing tasks - void pause() { + /// + /// @param wait the flag indicating if should wait for threads to pause. + void pause(bool wait) { std::unique_lock lock(mutex_); paused_ = true; + if (wait) { + // Wait for working threads to finish. + wait_threads_cv_.wait(lock, [&]() {return (working_ == 0);}); + } } /// @brief resume threads @@ -505,6 +518,9 @@ private: /// @brief condition variable used to wait for all items to be processed std::condition_variable wait_cv_; + /// @brief condition variable used to wait for all threads to be paused + std::condition_variable wait_threads_cv_; + /// @brief condition variable used to pause threads std::condition_variable pause_cv_; -- GitLab From 2485fbfac4baa10c9ee204482eca9ab4b614b832 Mon Sep 17 00:00:00 2001 From: Razvan Becheriu Date: Tue, 5 Apr 2022 15:26:38 +0300 Subject: [PATCH 8/8] [#1599] updated unittests --- src/lib/util/multi_threading_mgr.cc | 14 ++- .../tests/multi_threading_mgr_unittest.cc | 98 ++++++++++++++++--- src/lib/util/thread_pool.h | 29 ++++++ 3 files changed, 126 insertions(+), 15 deletions(-) diff --git a/src/lib/util/multi_threading_mgr.cc b/src/lib/util/multi_threading_mgr.cc index 5f8734f5e2..56878e0c0a 100644 --- a/src/lib/util/multi_threading_mgr.cc +++ b/src/lib/util/multi_threading_mgr.cc @@ -46,9 +46,8 @@ MultiThreadingMgr::enterCriticalSection() { if (getMode() && !inside) { if (getThreadPoolSize()) { // We simply pause without waiting for all tasks to complete. - // We could also call pause(false) which does not wait for - // threads to stop and wait() so that all tasks are complete - // and threads are stopped. + // We could also call wait() and pause(false) so that all tasks are + // complete and threads are stopped. thread_pool_.pause(); } // Now it is safe to call callbacks which can also create other CSs. @@ -71,7 +70,14 @@ MultiThreadingMgr::exitCriticalSection() { --critical_section_count_; if (getMode() && !isInCriticalSection()) { if (getThreadPoolSize()) { - thread_pool_.resume(); + // If apply has been called, threads have never been started inside + // a critical section, so start them now, otherwise just resume + // paused threads. + if (!thread_pool_.enabled()) { + thread_pool_.start(getThreadPoolSize()); + } else { + thread_pool_.resume(); + } } // Now it is safe to call callbacks which can also create other CSs. callExitCallbacks(); diff --git a/src/lib/util/tests/multi_threading_mgr_unittest.cc b/src/lib/util/tests/multi_threading_mgr_unittest.cc index 2dc20759ca..b13528a58f 100644 --- a/src/lib/util/tests/multi_threading_mgr_unittest.cc +++ b/src/lib/util/tests/multi_threading_mgr_unittest.cc @@ -180,8 +180,16 @@ TEST(MultiThreadingMgrTest, criticalSection) { auto& thread_pool = MultiThreadingMgr::instance().getThreadPool(); // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); + // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); // apply multi-threading configuration with 16 threads and queue size 256 MultiThreadingMgr::instance().apply(true, 16, 256); + // thread pool should be running + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be running + EXPECT_EQ(thread_pool.paused(), false); // thread count should match EXPECT_EQ(thread_pool.size(), 16); // thread count should be 16 @@ -191,8 +199,12 @@ TEST(MultiThreadingMgrTest, criticalSection) { // use scope to test constructor and destructor { MultiThreadingCriticalSection cs; - // thread pool should be stopped - EXPECT_EQ(thread_pool.size(), 0); + // thread pool should be paused + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be paused + EXPECT_EQ(thread_pool.paused(), true); + // thread count should be 16 + EXPECT_EQ(thread_pool.size(), 16); // thread count should be 16 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16); // queue size should be 256 @@ -200,20 +212,32 @@ TEST(MultiThreadingMgrTest, criticalSection) { // use scope to test constructor and destructor { MultiThreadingCriticalSection inner_cs; - // thread pool should be stopped - EXPECT_EQ(thread_pool.size(), 0); + // thread pool should be paused + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be paused + EXPECT_EQ(thread_pool.paused(), true); + // thread count should be 16 + EXPECT_EQ(thread_pool.size(), 16); // thread count should be 16 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16); // queue size should be 256 EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256); } - // thread pool should be stopped - EXPECT_EQ(thread_pool.size(), 0); + // thread pool should be paused + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be paused + EXPECT_EQ(thread_pool.paused(), true); + // thread count should be 16 + EXPECT_EQ(thread_pool.size(), 16); // thread count should be 16 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16); // queue size should be 256 EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256); } + // thread pool should be running + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be running + EXPECT_EQ(thread_pool.paused(), false); // thread count should match EXPECT_EQ(thread_pool.size(), 16); // thread count should be 16 @@ -223,8 +247,12 @@ TEST(MultiThreadingMgrTest, criticalSection) { // use scope to test constructor and destructor { MultiThreadingCriticalSection cs; - // thread pool should be stopped - EXPECT_EQ(thread_pool.size(), 0); + // thread pool should be paused + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be paused + EXPECT_EQ(thread_pool.paused(), true); + // thread count should be 16 + EXPECT_EQ(thread_pool.size(), 16); // thread count should be 16 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16); // queue size should be 256 @@ -232,12 +260,20 @@ TEST(MultiThreadingMgrTest, criticalSection) { // apply multi-threading configuration with 64 threads and queue size 4 MultiThreadingMgr::instance().apply(true, 64, 4); // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // thread count should be 64 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 64); // queue size should be 4 EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 4); } + // thread pool should be running + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be running + EXPECT_EQ(thread_pool.paused(), false); // thread count should match EXPECT_EQ(thread_pool.size(), 64); // thread count should be 64 @@ -247,8 +283,12 @@ TEST(MultiThreadingMgrTest, criticalSection) { // use scope to test constructor and destructor { MultiThreadingCriticalSection cs; - // thread pool should be stopped - EXPECT_EQ(thread_pool.size(), 0); + // thread pool should be running + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be paused + EXPECT_EQ(thread_pool.paused(), true); + // thread count should be 64 + EXPECT_EQ(thread_pool.size(), 64); // thread count should be 64 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 64); // queue size should be 4 @@ -256,12 +296,20 @@ TEST(MultiThreadingMgrTest, criticalSection) { // apply multi-threading configuration with 0 threads MultiThreadingMgr::instance().apply(false, 64, 256); // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // thread count should be 0 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0); // queue size should be 0 EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0); } + // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); // thread count should match EXPECT_EQ(thread_pool.size(), 0); // thread count should be 0 @@ -272,6 +320,10 @@ TEST(MultiThreadingMgrTest, criticalSection) { { MultiThreadingCriticalSection cs; // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // thread count should be 0 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0); @@ -281,6 +333,10 @@ TEST(MultiThreadingMgrTest, criticalSection) { { MultiThreadingCriticalSection inner_cs; // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // thread count should be 0 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0); @@ -288,12 +344,20 @@ TEST(MultiThreadingMgrTest, criticalSection) { EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0); } // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // thread count should be 0 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0); // queue size should be 0 EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0); } + // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); // thread count should match EXPECT_EQ(thread_pool.size(), 0); // thread count should be 0 @@ -304,16 +368,28 @@ TEST(MultiThreadingMgrTest, criticalSection) { { MultiThreadingCriticalSection cs; // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // apply multi-threading configuration with 64 threads MultiThreadingMgr::instance().apply(true, 64, 256); // thread pool should be stopped + EXPECT_EQ(thread_pool.enabled(), false); + // thread pool should be stopped + EXPECT_EQ(thread_pool.paused(), false); + // thread pool should be stopped EXPECT_EQ(thread_pool.size(), 0); // thread count should be 64 EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 64); // queue size should be 256 EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256); } + // thread pool should be running + EXPECT_EQ(thread_pool.enabled(), true); + // thread pool should be running + EXPECT_EQ(thread_pool.paused(), false); // thread count should match EXPECT_EQ(thread_pool.size(), 64); // thread count should be 64 @@ -373,7 +449,7 @@ public: /// /// @return True if the pool is running, false otherwise. bool isThreadPoolRunning() { - return (MultiThreadingMgr::instance().getThreadPool().size()); + return (!MultiThreadingMgr::instance().getThreadPool().paused()); } /// @brief Checks callback invocations over a series of nested diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index 28e66c09d9..aecd261ca3 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -156,6 +156,24 @@ struct ThreadPool { queue_.resume(); } + /// @brief return the state of the queue + /// + /// Returns the state of the queue + /// + /// @return the state + bool enabled() { + return (queue_.enabled()); + } + + /// @brief return the state of the threads + /// + /// Returns the state of the threads + /// + /// @return the state + bool paused() { + return (queue_.paused()); + } + /// @brief set maximum number of work items in the queue /// /// @param max_queue_size the maximum size (0 means unlimited) @@ -490,9 +508,11 @@ private: void disable() { { std::lock_guard lock(mutex_); + paused_ = false; enabled_ = false; } // Notify pop so that it can exit. + pause_cv_.notify_all(); cv_.notify_all(); } @@ -505,6 +525,15 @@ private: return (enabled_); } + /// @brief return the state of the threads + /// + /// Returns the state of the threads + /// + /// @return the state + bool paused() { + return (paused_); + } + private: /// @brief underlying queue container QueueContainer queue_; -- GitLab