Commit d53ab435 authored by Razvan Becheriu's avatar Razvan Becheriu

[#892] refactored

parent 9b500ee9
......@@ -19,6 +19,7 @@
#include <dhcpsrv/cfg_db_access.h>
#include <dhcpsrv/cfgmgr.h>
#include <dhcpsrv/db_type.h>
#include <dhcpsrv/multi_threading_utils.h>
#include <hooks/hooks.h>
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
......@@ -168,6 +169,17 @@ ControlledDhcpv4Srv::loadConfigFile(const std::string& file_name) {
"processCommand(\"config-set\", json)");
}
if (MultiThreadingUtil::threadCount()) {
auto& thread_pool = MultiThreadingMgr::instance().getPktThreadPool();
if (thread_pool.size()) {
thread_pool.stop();
}
MultiThreadingMgr::instance().setMode(true);
thread_pool.start(MultiThreadingUtil::threadCount());
} else {
MultiThreadingMgr::instance().setMode(false);
}
// Now check is the returned result is successful (rcode=0) or not
// (see @ref isc::config::parseAnswer).
int rcode;
......@@ -621,16 +633,6 @@ ControlledDhcpv4Srv::processCommand(const string& command,
return (no_srv);
}
if (Dhcpv4Srv::threadCount()) {
if (srv->pkt_thread_pool_.size()) {
srv->pkt_thread_pool_.stop();
}
MultiThreadingMgr::instance().setMode(true);
srv->pkt_thread_pool_.start(Dhcpv4Srv::threadCount());
} else {
MultiThreadingMgr::instance().setMode(false);
}
try {
if (command == "shutdown") {
return (srv->commandShutdownHandler(command, args));
......
......@@ -34,6 +34,7 @@
#include <dhcpsrv/fuzz.h>
#include <dhcpsrv/lease_mgr.h>
#include <dhcpsrv/lease_mgr_factory.h>
#include <dhcpsrv/multi_threading_utils.h>
#include <dhcpsrv/ncr_generator.h>
#include <dhcpsrv/shared_network.h>
#include <dhcpsrv/subnet.h>
......@@ -802,8 +803,8 @@ Dhcpv4Srv::run() {
}
// destroying the thread pool
if (Dhcpv4Srv::threadCount()) {
pkt_thread_pool_.reset();
if (MultiThreadingUtil::threadCount()) {
MultiThreadingMgr::instance().getPktThreadPool().reset();
}
return (true);
......@@ -820,9 +821,9 @@ Dhcpv4Srv::run_one() {
// Do not read more packets from socket if there are enough
// packets to be processed in the packet thread pool queue
const int max_queue_size = Dhcpv4Srv::maxThreadQueueSize();
const int thread_count = Dhcpv4Srv::threadCount();
size_t pkt_queue_size = pkt_thread_pool_.count();
const int max_queue_size = MultiThreadingUtil::maxThreadQueueSize();
const int thread_count = MultiThreadingUtil::threadCount();
size_t pkt_queue_size = MultiThreadingMgr::instance().getPktThreadPool().count();
if (thread_count && (pkt_queue_size >= thread_count * max_queue_size)) {
read_pkt = false;
}
......@@ -902,12 +903,12 @@ Dhcpv4Srv::run_one() {
.arg(query->getLabel());
return;
} else {
if (Dhcpv4Srv::threadCount()) {
if (MultiThreadingUtil::threadCount()) {
typedef function<void()> CallBack;
boost::shared_ptr<CallBack> call_back =
boost::make_shared<CallBack>(std::bind(&Dhcpv4Srv::processPacketAndSendResponseNoThrow,
this, query, rsp));
pkt_thread_pool_.add(call_back);
MultiThreadingMgr::instance().getPktThreadPool().add(call_back);
} else {
processPacketAndSendResponse(query, rsp);
}
......@@ -1220,12 +1221,12 @@ Dhcpv4Srv::processPacket(Pkt4Ptr& query, Pkt4Ptr& rsp, bool allow_packet_park) {
// library unparks the packet.
HooksManager::park("leases4_committed", query,
[this, callout_handle, query, rsp]() mutable {
if (Dhcpv4Srv::threadCount()) {
if (MultiThreadingUtil::threadCount()) {
typedef function<void()> CallBack;
boost::shared_ptr<CallBack> call_back =
boost::make_shared<CallBack>(std::bind(&Dhcpv4Srv::sendResponseNoThrow,
this, callout_handle, query, rsp));
pkt_thread_pool_.add(call_back);
MultiThreadingMgr::instance().getPktThreadPool().add(call_back);
} else {
processPacketPktSend(callout_handle, query, rsp);
processPacketBufferSend(callout_handle, rsp);
......@@ -3864,23 +3865,6 @@ int Dhcpv4Srv::getHookIndexLease4Decline() {
return (Hooks.hook_index_lease4_decline_);
}
uint32_t Dhcpv4Srv::threadCount() {
uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount();
if (sys_threads) {
return sys_threads;
}
sys_threads = std::thread::hardware_concurrency();
return sys_threads * 0;
}
uint32_t Dhcpv4Srv::maxThreadQueueSize() {
uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize();
if (max_thread_queue_size) {
return max_thread_queue_size;
}
return 4;
}
void Dhcpv4Srv::discardPackets() {
// Clear any packets held by the callhout handle store and
// all parked packets
......
......@@ -24,7 +24,6 @@
#include <dhcpsrv/subnet.h>
#include <hooks/callout_handle.h>
#include <process/daemon.h>
#include <util/thread_pool.h>
#include <functional>
#include <iostream>
......@@ -268,12 +267,6 @@ public:
/// redeclaration/redefinition. @ref isc::process::Daemon::getVersion()
static std::string getVersion(bool extended);
/// @brief returns Kea DHCPv4 server thread count.
static uint32_t threadCount();
/// @brief returns Kea DHCPv4 server max thread queue size.
static uint32_t maxThreadQueueSize();
/// @brief Main server processing loop.
///
/// Main server processing loop. Call the processing step routine
......@@ -1039,9 +1032,6 @@ protected:
/// @brief Controls access to the configuration backends.
CBControlDHCPv4Ptr cb_control_;
/// @brief Packet processing thread pool
isc::util::ThreadPool<std::function<void()>> pkt_thread_pool_;
public:
/// Class methods for DHCPv4-over-DHCPv6 handler
......
......@@ -19,6 +19,7 @@
#include <dhcpsrv/cfg_db_access.h>
#include <dhcpsrv/cfgmgr.h>
#include <dhcpsrv/db_type.h>
#include <dhcpsrv/multi_threading_utils.h>
#include <hooks/hooks.h>
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
......@@ -140,6 +141,17 @@ ControlledDhcpv6Srv::loadConfigFile(const std::string& file_name) {
"processCommand(\"config-set\", json)");
}
if (MultiThreadingUtil::threadCount()) {
auto& thread_pool = MultiThreadingMgr::instance().getPktThreadPool();
if (thread_pool.size()) {
thread_pool.stop();
}
MultiThreadingMgr::instance().setMode(true);
thread_pool.start(MultiThreadingUtil::threadCount());
} else {
MultiThreadingMgr::instance().setMode(false);
}
// Now check is the returned result is successful (rcode=0) or not
// (see @ref isc::config::parseAnswer).
int rcode;
......@@ -254,7 +266,8 @@ ControlledDhcpv6Srv::commandConfigGetHandler(const string&,
}
ConstElementPtr
ControlledDhcpv6Srv::commandConfigWriteHandler(const string&, ConstElementPtr args) {
ControlledDhcpv6Srv::commandConfigWriteHandler(const string&,
ConstElementPtr args) {
string filename;
if (args) {
......@@ -508,7 +521,8 @@ ControlledDhcpv6Srv::commandVersionGetHandler(const string&, ConstElementPtr) {
}
ConstElementPtr
ControlledDhcpv6Srv::commandBuildReportHandler(const string&, ConstElementPtr) {
ControlledDhcpv6Srv::commandBuildReportHandler(const string&,
ConstElementPtr) {
ConstElementPtr answer =
isc::config::createAnswer(0, isc::detail::getConfigReport());
return (answer);
......@@ -605,9 +619,9 @@ ControlledDhcpv6Srv::commandStatusGetHandler(const string&,
return (createAnswer(0, status));
}
isc::data::ConstElementPtr
ControlledDhcpv6Srv::processCommand(const std::string& command,
isc::data::ConstElementPtr args) {
ConstElementPtr
ControlledDhcpv6Srv::processCommand(const string& command,
ConstElementPtr args) {
string txt = args ? args->str() : "(none)";
LOG_DEBUG(dhcp6_logger, DBG_DHCP6_COMMAND, DHCP6_COMMAND_RECEIVED)
......@@ -622,16 +636,6 @@ ControlledDhcpv6Srv::processCommand(const std::string& command,
return (no_srv);
}
if (Dhcpv6Srv::threadCount()) {
if (srv->pkt_thread_pool_.size()) {
srv->pkt_thread_pool_.stop();
}
MultiThreadingMgr::instance().setMode(true);
srv->pkt_thread_pool_.start(Dhcpv6Srv::threadCount());
} else {
MultiThreadingMgr::instance().setMode(false);
}
try {
if (command == "shutdown") {
return (srv->commandShutdownHandler(command, args));
......
......@@ -35,6 +35,7 @@
#include <dhcpsrv/cfgmgr.h>
#include <dhcpsrv/lease_mgr.h>
#include <dhcpsrv/lease_mgr_factory.h>
#include <dhcpsrv/multi_threading_utils.h>
#include <dhcpsrv/ncr_generator.h>
#include <dhcpsrv/subnet.h>
#include <dhcpsrv/subnet_selector.h>
......@@ -472,8 +473,8 @@ bool Dhcpv6Srv::run() {
}
// destroying the thread pool
if (Dhcpv6Srv::threadCount()) {
pkt_thread_pool_.reset();
if (MultiThreadingUtil::threadCount()) {
MultiThreadingMgr::instance().getPktThreadPool().reset();
}
return (true);
......@@ -489,9 +490,9 @@ void Dhcpv6Srv::run_one() {
// Do not read more packets from socket if there are enough
// packets to be processed in the packet thread pool queue
const int max_queue_size = Dhcpv6Srv::maxThreadQueueSize();
const int thread_count = Dhcpv6Srv::threadCount();
size_t pkt_queue_size = pkt_thread_pool_.count();
const int max_queue_size = MultiThreadingUtil::maxThreadQueueSize();
const int thread_count = MultiThreadingUtil::threadCount();
size_t pkt_queue_size = MultiThreadingMgr::instance().getPktThreadPool().count();
if (thread_count && (pkt_queue_size >= thread_count * max_queue_size)) {
read_pkt = false;
}
......@@ -575,12 +576,12 @@ void Dhcpv6Srv::run_one() {
.arg(query->getLabel());
return;
} else {
if (Dhcpv6Srv::threadCount()) {
if (MultiThreadingUtil::threadCount()) {
typedef function<void()> CallBack;
boost::shared_ptr<CallBack> call_back =
boost::make_shared<CallBack>(std::bind(&Dhcpv6Srv::processPacketAndSendResponseNoThrow,
this, query, rsp));
pkt_thread_pool_.add(call_back);
MultiThreadingMgr::instance().getPktThreadPool().add(call_back);
} else {
processPacketAndSendResponse(query, rsp);
}
......@@ -979,12 +980,12 @@ Dhcpv6Srv::processPacket(Pkt6Ptr& query, Pkt6Ptr& rsp) {
// library unparks the packet.
HooksManager::park("leases6_committed", query,
[this, callout_handle, query, rsp]() mutable {
if (Dhcpv6Srv::threadCount()) {
if (MultiThreadingUtil::threadCount()) {
typedef function<void()> CallBack;
boost::shared_ptr<CallBack> call_back =
boost::make_shared<CallBack>(std::bind(&Dhcpv6Srv::sendResponseNoThrow,
this, callout_handle, query, rsp));
pkt_thread_pool_.add(call_back);
MultiThreadingMgr::instance().getPktThreadPool().add(call_back);
} else {
processPacketPktSend(callout_handle, query, rsp);
processPacketBufferSend(callout_handle, rsp);
......@@ -4060,23 +4061,6 @@ Dhcpv6Srv::requestedInORO(const Pkt6Ptr& query, const uint16_t code) const {
return (false);
}
uint32_t Dhcpv6Srv::threadCount() {
uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount();
if (sys_threads) {
return sys_threads;
}
sys_threads = std::thread::hardware_concurrency();
return sys_threads * 0;
}
uint32_t Dhcpv6Srv::maxThreadQueueSize() {
uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize();
if (max_thread_queue_size) {
return max_thread_queue_size;
}
return 4;
}
void Dhcpv6Srv::discardPackets() {
// Dump all of our current packets, anything that is mid-stream
isc::dhcp::Pkt6Ptr pkt6ptr_empty;
......
......@@ -27,7 +27,6 @@
#include <dhcpsrv/subnet.h>
#include <hooks/callout_handle.h>
#include <process/daemon.h>
#include <util/thread_pool.h>
#include <functional>
#include <iostream>
......@@ -130,12 +129,6 @@ public:
/// redeclaration/redefinition. @ref isc::process::Daemon::getVersion()
static std::string getVersion(bool extended);
/// @brief returns Kea DHCPv6 server thread count.
static uint32_t threadCount();
/// @brief returns Kea DHCPv6 server max thread queue size.
static uint32_t maxThreadQueueSize();
/// @brief Returns server-identifier option.
///
/// @return server-id option
......@@ -1105,9 +1098,6 @@ protected:
/// @brief Controls access to the configuration backends.
CBControlDHCPv6Ptr cb_control_;
/// @brief Packet processing thread pool
isc::util::ThreadPool<std::function<void()>> pkt_thread_pool_;
};
} // namespace dhcp
......
......@@ -6,9 +6,11 @@
#include <config.h>
#include <dhcpsrv/cfgmgr.h>
#include <dhcpsrv/multi_threading_utils.h>
#include <util/multi_threading_mgr.h>
#include <exceptions/exceptions.h>
#include <util/multi_threading_mgr.h>
using namespace isc::util;
......@@ -41,5 +43,24 @@ MultiThreadingCriticalSection::~MultiThreadingCriticalSection() {
}
}
uint32_t
MultiThreadingUtil::threadCount() {
uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount();
if (sys_threads) {
return sys_threads;
}
sys_threads = std::thread::hardware_concurrency();
return sys_threads * 0;
}
uint32_t
MultiThreadingUtil::maxThreadQueueSize() {
uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize();
if (max_thread_queue_size) {
return max_thread_queue_size;
}
return 4;
}
} // dhcp
} // isc
......@@ -9,6 +9,8 @@
#include <boost/noncopyable.hpp>
#include <stdint.h>
namespace isc {
namespace dhcp {
......@@ -46,6 +48,16 @@ public:
static void startPktProcessing();
};
class MultiThreadingUtil {
public:
/// @brief returns Kea DHCPv4 server thread count.
static uint32_t threadCount();
/// @brief returns Kea DHCPv4 server max thread queue size.
static uint32_t maxThreadQueueSize();
};
}
}
#endif // MULTI_THREADING_UTIL_H
......@@ -31,5 +31,20 @@ MultiThreadingMgr::setMode(bool enabled) {
enabled_ = enabled;
}
} // namespace isc::util
} // namespace isc
ThreadPool<std::function<void()>>&
MultiThreadingMgr::getPktThreadPool() {
return pkt_thread_pool_;
}
uint32_t
MultiThreadingMgr::getPktThreadPoolSize() const {
return (pkt_thread_pool_size_);
}
void
MultiThreadingMgr::setPktThreadPoolSize(uint32_t size) {
pkt_thread_pool_size_ = size;
}
} // namespace util
} // namespace isc
......@@ -7,8 +7,12 @@
#ifndef MULTI_THREADING_MGR_H
#define MULTI_THREADING_MGR_H
#include <util/thread_pool.h>
#include <boost/noncopyable.hpp>
#include <stdint.h>
namespace isc {
namespace util {
......@@ -62,6 +66,21 @@ public:
/// @param enabled The new mode.
void setMode(bool enabled);
/// @brief Get the packet thread pool.
///
/// @return The packet thread pool of this binary instance.
ThreadPool<std::function<void()>>& getPktThreadPool();
/// @brief Get the configured packet thread pool size.
///
/// @return The packet thread pool size of this binary instance.
uint32_t getPktThreadPoolSize() const;
/// @brief Set the configured packet thread pool size.
///
/// @param size The packet thread pool size of this binary instance.
void setPktThreadPoolSize(uint32_t size);
protected:
/// @brief Constructor.
......@@ -71,11 +90,18 @@ protected:
virtual ~MultiThreadingMgr();
private:
/// @brief the current mode.
/// @brief The current mode.
bool enabled_;
/// @brief The configured size of the packet thread pool.
uint32_t pkt_thread_pool_size_;
/// @brief Packet processing thread pool.
ThreadPool<std::function<void()>> pkt_thread_pool_;
};
} // namespace isc::util
} // namespace isc
} // namespace util
} // namespace isc
#endif // MULTI_THREADING_MGR_H
......@@ -230,7 +230,7 @@ private:
/// @return the state
bool enabled() {
return (enabled_);
}
}
private:
/// @brief underlying queue container
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment