thread_pool.h 8.01 KB
Newer Older
1 2
// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
//
3 4 5
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 7 8 9

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

10
#include <exceptions/exceptions.h>
11 12
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
13

14 15 16 17 18 19 20 21
#include <atomic>
#include <condition_variable>
#include <list>
#include <mutex>
#include <queue>
#include <thread>

namespace isc {
22
namespace util {
23

24
/// @brief Defines a thread pool which uses a thread pool queue for managing
25
/// work items. Each work item is a 'functor' object.
26 27 28
///
/// @tparam WorkItem a functor
/// @tparam Container a 'queue like' container
29
template <typename WorkItem, typename Container = std::queue<boost::shared_ptr<WorkItem>>>
30
struct ThreadPool {
31
    typedef typename boost::shared_ptr<WorkItem> WorkItemPtr;
32

33
    /// @brief Constructor
34
    ThreadPool() {
35
    }
36 37

    /// @brief Destructor
38 39 40
    ~ThreadPool() {
        reset();
    }
41

42 43
    /// @brief reset the thread pool stopping threads and clearing the internal
    /// queue
44 45
    ///
    /// It can be called several times even when the thread pool is stopped
46
    void reset() {
47
        stopInternal();
48 49
        queue_.clear();
    }
50

51 52
    /// @brief start all the threads
    ///
53 54
    /// @param thread_count specifies the number of threads to be created and
    /// started
55 56 57
    ///
    /// @throw InvalidOperation if thread pool already started
    /// @throw InvalidParameter if thread count is 0
58
    void start(uint32_t thread_count) {
59 60 61
        if (!thread_count) {
            isc_throw(InvalidParameter, "thread count is 0");
        }
62
        if (queue_.enabled()) {
63
            isc_throw(InvalidOperation, "thread pool already started");
64
        }
65 66 67
        startInternal(thread_count);
    }

68
    /// @brief stop all the threads
69 70
    ///
    /// @throw InvalidOperation if thread pool already stopped
71
    void stop() {
72
        if (!queue_.enabled()) {
73
            isc_throw(InvalidOperation, "thread pool already stopped");
74
        }
75 76 77
        stopInternal();
    }

78
    /// @brief add a work item to the thread pool
79
    ///
80
    /// @param item the 'functor' object to be added to the queue
81
    void add(const WorkItemPtr& item) {
82 83
        queue_.push(item);
    }
84

85 86 87
    /// @brief count number of work items in the queue
    ///
    /// @return the number of work items in the queue
88
    size_t count() {
89
        return (queue_.count());
90
    }
91

92 93 94
    /// @brief size number of thread pool threads
    ///
    /// @return the number of threads
95
    size_t size() {
96
        return (threads_.size());
97
    }
98

99
private:
Razvan Becheriu's avatar
Razvan Becheriu committed
100 101 102 103 104 105 106
    /// @brief start all the threads
    ///
    /// @param thread_count specifies the number of threads to be created and
    /// started
    void startInternal(uint32_t thread_count) {
        queue_.enable();
        for (uint32_t i = 0; i < thread_count; ++i) {
107
            threads_.push_back(boost::make_shared<std::thread>(&ThreadPool::run, this));
Razvan Becheriu's avatar
Razvan Becheriu committed
108 109 110 111 112 113 114 115 116 117 118 119
        }
    }

    /// @brief stop all the threads
    void stopInternal() {
        queue_.disable();
        for (auto thread : threads_) {
            thread->join();
        }
        threads_.clear();
    }

120 121 122 123 124 125 126 127
    /// @brief Defines a generic thread pool queue.
    ///
    /// The main purpose is to safely manage thread pool tasks.
    /// The thread pool queue can be 'disabled', which means that no items can be
    /// removed from the queue, or 'enabled', which guarantees that inserting or
    /// removing items are thread safe.
    /// In 'disabled' state, all threads waiting on the queue are unlocked and all
    /// operations are non blocking.
128 129 130 131
    ///
    /// @tparam Item a 'smart pointer' to a functor
    /// @tparam QueueContainer a 'queue like' container
    template <typename Item, typename QueueContainer = std::queue<Item>>
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
    struct ThreadPoolQueue {
        /// @brief Constructor
        ///
        /// Creates the thread pool queue in 'disabled' state
        ThreadPoolQueue() : enabled_(false) {
        }

        /// @brief Destructor
        ///
        /// Destroys the thread pool queue
        ~ThreadPoolQueue() {
            disable();
            clear();
        }

        /// @brief push work item to the queue
        ///
149
        /// Used to add work items to the queue.
150 151 152 153
        /// This function adds an item to the queue and wakes up at least one thread
        /// waiting on the queue.
        ///
        /// @param item the new item to be added to the queue
154
        void push(const Item& item) {
155 156 157
            if (!item) {
                return;
            }
158 159 160 161
            {
                std::lock_guard<std::mutex> lock(mutex_);
                queue_.push(item);
            }
162
            // Notify pop function so that it can effectively remove a work item.
163
            cv_.notify_one();
164 165 166 167 168
        }

        /// @brief pop work item from the queue or block waiting
        ///
        /// Used to retrieve and remove a work item from the queue
169 170
        /// If the queue is 'disabled', this function returns immediately an empty
        /// element.
171 172 173 174
        /// If the queue is 'enabled', this function returns the first element in
        /// the queue or blocks the calling thread if there are no work items
        /// available.
        ///
175
        /// @return the first work item from the queue or an empty element.
176 177
        Item pop() {
            std::unique_lock<std::mutex> lock(mutex_);
178 179 180 181
            // Wait for push or disable functions.
            cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());});
            if (!enabled_) {
                return (Item());
182
            }
183 184 185
            Item item = queue_.front();
            queue_.pop();
            return (item);
186 187 188 189 190 191 192 193 194
        }

        /// @brief count number of work items in the queue
        ///
        /// Returns the number of work items in the queue
        ///
        /// @return the number of work items
        size_t count() {
            std::lock_guard<std::mutex> lock(mutex_);
195
            return (queue_.size());
196 197 198 199 200 201 202
        }

        /// @brief clear remove all work items
        ///
        /// Removes all queued work items
        void clear() {
            std::lock_guard<std::mutex> lock(mutex_);
203
            queue_ = QueueContainer();
204 205
        }

206
        /// @brief enable the queue
207 208 209
        ///
        /// Sets the queue state to 'enabled'
        void enable() {
210
            std::lock_guard<std::mutex> lock(mutex_);
211 212 213
            enabled_ = true;
        }

214
        /// @brief disable the queue
215
        ///
216
        /// Sets the queue state to 'disabled'
217
        void disable() {
218 219 220 221
            {
                std::lock_guard<std::mutex> lock(mutex_);
                enabled_ = false;
            }
222 223 224 225
            // Notify pop so that it can exit.
            cv_.notify_all();
        }

226 227 228 229 230 231 232
        /// @brief return the state of the queue
        ///
        /// Returns the state of the queue
        ///
        /// @return the state
        bool enabled() {
            return (enabled_);
Razvan Becheriu's avatar
Razvan Becheriu committed
233
        }
234

235 236
    private:
        /// @brief underlying queue container
237
        QueueContainer queue_;
238 239 240 241 242 243 244 245

        /// @brief mutex used for critical sections
        std::mutex mutex_;

        /// @brief condition variable used to signal waiting threads
        std::condition_variable cv_;

        /// @brief the sate of the queue
246 247
        /// The 'enabled' state corresponds to true value
        /// The 'disabled' state corresponds to false value
248
        std::atomic<bool> enabled_;
249 250
    };

251
    /// @brief run function of each thread
252
    void run() {
253
        while (queue_.enabled()) {
254
            WorkItemPtr item = queue_.pop();
255
            if (item) {
256
                try {
257
                    (*item)();
258
                } catch (...) {
259
                    // catch all exceptions
260
                }
261 262 263
            }
        }
    }
264

265
    /// @brief list of worker threads
266
    std::vector<boost::shared_ptr<std::thread>> threads_;
267 268

    /// @brief underlying work items queue
269
    ThreadPoolQueue<WorkItemPtr, Container> queue_;
270 271
};

272
}  // namespace util
273 274 275
}  // namespace isc

#endif  // THREAD_POOL_H