# Process each packet on a separate thread using a thread pool

---

An initial implementation (proof of concept) can be found in the branch **new-multi-threading-pkt-processing**.

The branch might be outdated relative to this design document.

## Proof of concept results

---

The tests were performed on a system with the following hardware specifications:

```
CPUs: 12 x Intel(R) Xeon(R) CPU E5-2680 v3 @ 2.50GHz
RAM: 128GB (4 x 32GB)
Disk: SCSI 1TB HDD, XFS filesystem
```

The implementation uses ThreadResource instead of thread_local variables (more details below).

The server process was configured to use 12 threads for parallel packet processing.

The server uses only one subnet and one address pool.

The perf-dhcp process simulates discrete clients, each requesting one address, without any extra requested options.

The measurements are made by counting the leases in the database, not by counting the packets received back by the perf-dhcp process.

The full four-message exchange (solicit/discover - advertise/offer - request - confirm/reply) is completed for each client when a lease is inserted in the database back-end.

Results:

```
mysql v4
3200 leases/sec

mysql v6
3450 leases/sec

pgsql v4
2750 leases/sec

pgsql v6
2700 leases/sec
```

## Initial considerations

---

As the implementation of this feature will have an impact on many aspects of the server, not all existing functionality may be compatible with multi-threaded packet processing.

The solution is either to disable multi-threading when the respective functionality is used, or simply to warn the user that the functionality is disabled while the multi-threading feature is in use.

This design considers adding multi-threaded packet processing to the DHCPv4 and DHCPv6 servers, for the MySql, PgSql, Memfile and Cassandra (scheduled last) back-ends.

The use of Memfile will be supported, but it is not a priority as its performance is already satisfactory.

The Cassandra back-end is not a priority as it is not widely used.

It is safe to assume that not all hook libraries will initially be compatible with the multi-threading feature (referring to the first iteration of the implementation), but the target is to fully support hooks in multi-threading mode when this feature is enabled.

A performance impact is expected when using hooks that require multiple critical sections (usually libraries which modify the internal state of the server).

These restrictions will be updated during development as new insight into the compatibility between features is gained and verified.

## Requirements and assumptions

---

The list of requirements can be found at:

https://gitlab.isc.org/isc-projects/kea/wikis/kea-multithreading-packet-processing-requirements

The list of supported deployment scenarios which must not break the DHCP protocol:

```
Scenario 0: single server, one thread only (i.e. current code).
Scenario 1: single server, single threaded (one main thread + one worker thread)
Scenario 2: single server, multi-threaded
Scenario 3: two servers, connected to a single database, using different pools
Scenario 4: two servers, connected to a single database, HA
Scenario 5: two servers, connected to a single database, LB
Scenario 6: two servers, using a shared database, the same pool
Scenario 7: two servers, leases stored in mysql, hosts stored in something other than mysql, e.g. pgsql
```

The assumption is that if the listed scenarios are fully supported and compliant with valid DHCP protocol behavior, then the multi-threaded implementation can rely on this functionality and will at most require the same restrictions as the listed scenarios involving multiple processes using the same or different server IDs (usually fewer, as a single process has more control and can easily synchronize its own threads and internal state).

## Configuration options

---

The design targets the possibility for the user to switch the feature on and off, and to change the number of threads even at run time, using the command channel, just as for the other supported settings.

This design takes into consideration the possibility of implementing, in the future, a way for the server to automatically reconfigure itself according to traffic load.

The initial configuration options will include:

- enabling multi-threaded packet processing, which will use predefined default values for the thread count and packet queue size if these are not explicitly specified

- configuring the number of threads used to process packets (0 translates to automatic detection of the number of threads to use, 1 or more enables multi-threaded packet processing with the respective thread count; the maximum allowed value should be 65535)

- configuring the size of the packet queue used to read packets from the network adapter driver (kernel driver queue) through the socket and feed them to the thread pool (0 translates to unlimited; the maximum allowed value should be 65535).

```
// eg: server v4

"Dhcp4": {
    ...
    "multi-threading": {
        "enable-multi-threading": true,
        "thread-pool-size": 64,
        "packet-queue-size": 256
    },
    ...
}

// eg: server v6

"Dhcp6": {
    ...
    "multi-threading": {
        "enable-multi-threading": true,
        "thread-pool-size": 64,
        "packet-queue-size": 256
    },
    ...
}
```

## High level design

---

The main idea of this feature is to split packet processing into a completely separate and independent action.

If this action is viewed as a 'stream', it should not interfere with other 'streams'. This means that all the resources needed to process one packet should be 'reserved' and 'fully available' throughout the whole duration of the action.

If this is accomplished, each 'stream' can be processed in parallel on a different thread.

To make this possible, the current implementation must be changed to offer a way to reserve and manage such resources on demand for each 'stream'.

However, some resources need to be shared; access to these resources must be kept to a minimum.

Some of these resources are the socket used to read and write packets, the running configuration, the allocation process (if it is not made independent as well, by splitting the allocation space) and statistics.

The following image displays the timeline and interaction of the main components.

![Untitled_Diagram](uploads/d88e280a2ad438ccfb1f04ccc54dfeba/Untitled_Diagram.png)

As the image shows, the main thread is the only thread whose lifetime spans from the beginning to the end of the process. The main thread handles the configure action, which consists of reading the configuration from the file or, as displayed, from the config database.

It first destroys the resources used up to the configuration process (old managers, old database handles, old processing threads), then starts loading and applying the new configuration: creating managers, creating threads, creating a new config database handle to retrieve the remote configuration, merging and applying the database configuration, etc.

After creating the new resources, the main thread continues with the usual processing actions: reading packets, handling IO events and handling signals. The only major difference is that the processing of each packet has moved to the thread pool threads.

Next to the main thread is the queue control thread, which reads packets, adds them to an internal queue, and then signals the waiting receivePacket function on the main thread. If direct reading is used, receivePacket simply waits for a packet to be read from the socket (kernel driver queue).

Initially, the queue control feature cannot be used when multi-threading is enabled, but these features can share common code in future releases.

The lifespan and actions of these threads can be seen in the image by following the vertical lines associated with each thread.

The main thread usually blocks on the read packet action (a direct read of the packet, or waiting for the queue control thread to add a new packet to the queue). Whenever it reads a new packet, it adds it to the thread pool queue and signals one of the threads to start processing.

The thread pool threads usually wait for packets to be inserted into the queue. When that happens, they process packets until the queue is empty and then wait again.

Each thread creates its own lease and host database handle (inside the context object of the respective manager), which it uses independently, without any synchronization or interaction with other threads. This resource is available to the respective thread until the next reconfiguration action.

There are some critical actions that the server needs to execute with the threads stopped (or paused) so that the internal state is preserved. Because the initial implementation does not pause threads but stops them instead, a connection pool is used to manage the resources needed by each thread. These resources are managed by the respective database manager (lease manager/host manager) and remain available even if the threads are destroyed.

In the future, pausing threads will make this redundant, as the resources can be managed automatically, without locking, by using thread_local variables.

The resource handle (context) is created inside the respective manager using lazy initialization (only when the thread reaches the manager code requiring the handle).

This can also be seen in the image by following the vertical lines.

As can be observed, between the destruction of the old resources and the creation of the new ones, the only thread running is the main thread, which can safely update the state of the configuration and other global, shared 'thread unsafe' objects. This is the only 'window' that can be used to perform such 'unsafe' actions.

At reconfiguration time, all resources are destroyed and recreated using the new configuration, meaning that all managers, connection pools, connections and database back-end handles are destroyed and then recreated.

The handling of lease reclamation and expired lease deletion is performed on the main thread in the handle io events block. As shown, the lease database handle is created when the event is first handled, and it remains available to the thread until the next reconfiguration action.

## Implementation details

---

The single-threaded server implementation handles the following tasks on each loop iteration:

```
- receivePacket (read packets from the interface socket or from the packet queue)
- handleSignal (handle signals synchronously, including processCommand config-reload on SIGHUP)
- processPacket
- processPacketBufferSend
- io service poll
```

Processing packets on different threads requires separating the stages:

```
- receivePacket
- handleSignal
- io service poll
```

from the rest of the stages:

```
- processPacket
- processPacketBufferSend
```

which are strictly tied to the data received from the first stage (receivePacket).

*As the first stage (receivePacket) might also be a bottleneck, that performance enhancement is not discussed in this document.*

##### To make each packet computation independent, the processPacket and processPacketBufferSend functions must be thread safe.

##### To make this possible, some simple guidelines must be taken into consideration:

- all global classes accessed by these functions must be thread safe

- all global classes accessed by these functions must not change state relative to any packet (unless that is the intended use of the class, like StatsMgr)

- all thread-specific variables must be created on the stack, so each thread can use its own local stack variables, or use thread_local if initialization is too expensive

- all function static variables must be removed or moved to the stack, or use thread_local if initialization is too expensive

- std::shared_ptr offers thread safe access to the underlying object as long as no threads overwrite the same smart pointer (there are no functions returning a reference to the same shared pointer which might be modified in separate threads by calling reset or assigning)

- as long as functions return shared_ptr objects by value, the implementation can be considered safe, as the underlying state of the smart pointers remains consistent (a small sketch follows below)

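To illustrate the last two shared_ptr guidelines, a minimal sketch; the CfgHolder/Config names are hypothetical, not actual server classes:

```
#include <memory>
#include <utility>

struct Config {};

class CfgHolder {
public:
    // Safe among concurrent readers: each caller gets its own shared_ptr
    // copy by value; the reference count updates are thread safe.
    std::shared_ptr<const Config> getConfig() const {
        return config_;
    }

    // Unsafe if it runs while other threads call getConfig(): it overwrites
    // the same shared_ptr instance. It must only run inside the
    // reconfiguration 'window' when the worker threads are stopped.
    void setConfig(std::shared_ptr<const Config> cfg) {
        config_ = std::move(cfg);
    }

private:
    std::shared_ptr<const Config> config_;
};
```
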
Given the design of the application and the requirements of a valid packet response, some global classes can change their internal state (like CfgMgr), as long as these changes do not occur while any packet is being processed.

During the processing of each packet, the following global classes (static function calls or singleton implementations) are used:

```
- StatsMgr (updates different statistics related to packet processing)
- CfgMgr (global server configuration, including configured subnets, options and reservations)
- LibDHCP (global option definitions)
- IfaceMgr (handles DHCP traffic, reading and writing packets)
- AllocEngine (allocation engine which generates new addresses and handles different packet requests)
- LeaseMgr (database back-end storing leases)
- HostMgr (database back-end storing reservations)
- HooksManager (handles hook points and creates hook library contexts and parameters)
- CalloutManager (handles hook points and hook libraries)
- Dhcp4to6Ipc (handles send/receive of DHCPv4-over-DHCPv6 packets between the v4 server and the v6 server)
- Dhcp6to4Ipc (handles send/receive of DHCPv4-over-DHCPv6 packets between the v4 server and the v6 server)
- D2ClientMgr (handles D2 requests)
```

The io service poll handles the lease reclamation and lease deletion events. These events only call functions within AllocEngine and LeaseMgr, so their thread safety is covered by the requirements on the processPacket and processPacketBufferSend functions.

Some specific optimizations for each class:

*** StatsMgr

* statistics updates on each thread should be non-blocking (a sketch follows below):

* each thread should have its own statistics and only merge statistics on command request

* could use read-write locks (a shared/read lock on each statistics update, an exclusive/write lock when merging statistics)

* the initial implementation can use standard locks for each statistic and lock all of them when merging statistics on retrieval

* std::atomic (which is platform dependent, with different performance depending on the implementation and hardware support) can not be used, as time-based statistics and count-limited history have been implemented, so statistics are no longer stored in a single variable

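A minimal sketch of the per-thread statistics idea, under stated assumptions (a hypothetical ThreadedStats class with integer-only statistics, not the actual StatsMgr): each thread updates only its own container while holding the shared lock, and the merge takes the exclusive lock.

```
#include <cstdint>
#include <map>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <thread>

class ThreadedStats {
public:
    void update(const std::string& name, int64_t delta) {
        auto id = std::this_thread::get_id();
        {
            // Shared lock: concurrent updaters each write only their own entry.
            std::shared_lock<std::shared_mutex> lock(mutex_);
            auto it = stats_.find(id);
            if (it != stats_.end()) {
                it->second[name] += delta;
                return;
            }
        }
        // First update from this thread: create its container exclusively.
        std::unique_lock<std::shared_mutex> lock(mutex_);
        stats_[id][name] += delta;
    }

    // Merge all per-thread statistics, e.g. on a command request.
    std::map<std::string, int64_t> merge() {
        std::unique_lock<std::shared_mutex> lock(mutex_);
        std::map<std::string, int64_t> total;
        for (auto const& per_thread : stats_) {
            for (auto const& stat : per_thread.second) {
                total[stat.first] += stat.second;
            }
        }
        return total;
    }

private:
    std::shared_mutex mutex_;
    std::map<std::thread::id, std::map<std::string, int64_t>> stats_;
};
```
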
*** CfgMgr

* thread safety is not absolutely necessary, as the thread pool can be stopped before and started after server reconfiguration

* the reconfiguration of the server is done at specific locations in the execution path, like processCommand

* before modifying any run-time configuration, the packet thread pool must be stopped, including in cases like hook points

* a RAII class MultiThreadingCriticalSection can be used to stop and start the thread pool around any configuration changes

```
// eg:

MultiThreadingCriticalSection::MultiThreadingCriticalSection() {
    MultiThreadingMgr::instance().enterCriticalSection();
}

MultiThreadingCriticalSection::~MultiThreadingCriticalSection() {
    MultiThreadingMgr::instance().exitCriticalSection();
}

void
MultiThreadingMgr::stopPktProcessing() {
    if (getMode() && getThreadPoolSize() && !isInCriticalSection()) {
        thread_pool_.stop();
    }
}

void
MultiThreadingMgr::startPktProcessing() {
    if (getMode() && getThreadPoolSize() && !isInCriticalSection()) {
        thread_pool_.start(getThreadPoolSize());
    }
}
```

*** IfaceMgr

* IfaceMgr must handle the sockets related to TCP/UDP DHCP traffic

* several changes to the current code are necessary so that the handling of IO events is done by another manager (eg: EventMgr) within the server

* IfaceMgr must register the interface sockets with EventMgr and wait for a notification from EventMgr whenever a DHCP packet is received (the indirect read code path becomes the primary and only way to read packets from the socket)

* the indirect read should be performed only if there is enough space in the thread pool task queue; otherwise the main thread just continues to the io service poll

* the initial implementation can disable queue control when using multi-threading, so that only the direct path is used

*** LeaseMgr

* create the thread context and all database exchanges in a pool, or use thread_local (the context will be created only once for each thread)

* design a context class holding the per-thread back-end connection and exchanges

* each context should have a thread_local specific handle

* if thread_local is not available, each thread should have a ThreadResource which returns the specific context handle, or should be able to extract an available context from a context pool

* the ThreadResource implementation (if required) is discussed below

* the openDatabase and prepareStatements functions should only perform these operations for the newly created resource retrieved through the context handle (thread_local, ThreadResource handle object, or a context from the context pool of resources)

* the reconnect event recreates the managers and, with them, the respective contexts, resources, connections and all database back-end handles

* the initial implementation will use a context pool which allocates a new context on demand

* each context will have its own back-end database connection and respective exchanges

* each context is popped out of the pool when a database operation is needed, and pushed back into the pool after the operation is performed (using the XXXLeaseContextAlloc RAII class)

* this behavior does not guarantee that all queries used to process a packet will use the same context and resources; it requires each query to be independent from the others (using transactions is not possible)

* this can easily be optimized in the future by retaining a 'cached' reference in a thread_local variable so that the pop and push operations are no longer needed, or even by completely removing the context pool and letting thread_local and the compiler manage resources automatically, effectively permitting the use of transactions when processing packets

The context class contains a database connection handle and the respective exchange objects.

```
class XXXLeaseContext {
public:

    /// @brief Constructor
    XXXLeaseContext(const db::DatabaseConnection::ParameterMap& parameters);

    boost::scoped_ptr<XXXLease4Exchange> exchange4_; ///< Exchange object
    boost::scoped_ptr<XXXLease6Exchange> exchange6_; ///< Exchange object

    /// @brief connection
    db::XXXConnection conn_;
};
```

The lease manager creates a context pool, along with an initial context used in single-threaded mode.

```
XXXLeaseMgr::XXXLeaseMgr(const XXXConnection::ParameterMap& parameters)
    : parameters_(parameters) {

    ...

    // Create an initial context.
    pool_.reset(new XXXLeaseContextPool());
    pool_->pool_.push_back(createContext());
}
```

The manager's context pool stores all allocated and available contexts.

```
class XXXLeaseContextPool {
public:

    /// @brief The vector of available contexts.
    std::vector<XXXLeaseContextPtr> pool_;

    /// @brief The mutex to protect pool access.
    std::mutex mutex_;
};
```

The RAII XXXLeaseContextAlloc class is responsible for creating new contexts on demand and for popping/pushing the current context from/to the context pool.

```
XXXLeaseMgr::XXXLeaseContextAlloc::XXXLeaseContextAlloc(
    const XXXLeaseMgr& mgr) : ctx_(), mgr_(mgr) {

    if (MultiThreadingMgr::instance().getMode()) {
        // multi-threaded
        {
            // we need to protect the whole pool_ operation, hence extra scope {}
            lock_guard<mutex> lock(mgr_.pool_->mutex_);
            if (!mgr_.pool_->pool_.empty()) {
                ctx_ = mgr_.pool_->pool_.back();
                mgr_.pool_->pool_.pop_back();
            }
        }
        if (!ctx_) {
            ctx_ = mgr_.createContext();
        }
    } else {
        // single-threaded
        if (mgr_.pool_->pool_.empty()) {
            isc_throw(Unexpected, "No available XXX lease context?!");
        }
        ctx_ = mgr_.pool_->pool_.back();
    }
}

XXXLeaseMgr::XXXLeaseContextAlloc::~XXXLeaseContextAlloc() {
    if (MultiThreadingMgr::instance().getMode()) {
        // multi-threaded
        lock_guard<mutex> lock(mgr_.pool_->mutex_);
        mgr_.pool_->pool_.push_back(ctx_);
    }
    // If running in single-threaded mode, there's nothing to do here.
}
```

The lease manager's create context function:

```
XXXLeaseContextPtr
XXXLeaseMgr::createContext() const {
    XXXLeaseContextPtr ctx(new XXXLeaseContext(parameters_));

    // Open the database.
    ctx->conn_.openDatabase();

    // Prepare all statements likely to be used.
    ctx->conn_.prepareStatements();

    // Create the exchange objects for use in exchanging data between the
    // program and the database.
    ctx->exchange4_.reset(new XXXLease4Exchange());
    ctx->exchange6_.reset(new XXXLease6Exchange());

    return (ctx);
}
```

*** HostMgr

* similar to LeaseMgr

*** ThreadResource

* although it might not end up being used, it can be useful for managing resources specific to each thread

* should create a resource for the calling thread if the resource does not exist, or return the already existing resource

* in some cases using thread_local variables will suffice, but if thread_local is not supported by the platform/compiler, this custom implementation can be used

* should be a generic class which can be used to allocate the respective class members needed for each thread

```
// eg:

template <typename Resource>
class ThreadResource {
    typedef std::shared_ptr<Resource> ResourcePtr;
public:
    ResourcePtr resource() {
        std::lock_guard<std::mutex> lock(mutex_);
        auto id = std::this_thread::get_id();
        if (map_.find(id) != map_.end()) {
            return map_[id];
        }
        ResourcePtr result(std::make_shared<Resource>());
        map_[id] = result;
        return result;
    }
private:
    std::mutex mutex_;
    std::unordered_map<std::thread::id, ResourcePtr> map_;
};
```

*** ThreadPool

* is responsible for creating and managing a specified number of threads which all do the same type of processing

* it should be possible to stop the thread pool and resize it without losing the already queued tasks, as waiting for the entire queue to be processed would take too long (see the sketch after the interface below)

* implementing a wait function would be a great advantage: the threads would just be paused, not destroyed, so thread_local resources would not need to be recreated in some scenarios, and the resume operation would be faster, simply signaling the threads instead of recreating them

```
using WorkItemCallBack = std::function<void()>;

void create(uint32_t worker_threads); // create a specified number of threads and create a new task queue

void destroy(); // stop all threads and destroy the task queue

void start(uint32_t worker_threads); // create a specified number of threads without creating a new task queue

void stop(); // stop all threads without destroying the task queue

void add(WorkItemCallBack& call_back); // add a new item to be processed by any available thread

size_t count(); // number of items in the queue

size_t size(); // number of configured threads
```

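A minimal sketch of how this interface could be implemented with a mutex/condition-variable protected queue. It covers start/stop/add/count/size only (create/destroy are omitted), and everything beyond the interface names above is an assumption, not the actual implementation:

```
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool {
public:
    using WorkItemCallBack = std::function<void()>;

    // Create the worker threads; the task queue is reused across restarts.
    void start(uint32_t worker_threads) {
        std::lock_guard<std::mutex> lock(mutex_);
        exit_ = false;
        for (uint32_t i = 0; i < worker_threads; ++i) {
            threads_.emplace_back([this] { run(); });
        }
    }

    // Stop all threads; already queued tasks are kept for the next start().
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            exit_ = true;
        }
        cv_.notify_all();
        for (auto& thread : threads_) {
            thread.join();
        }
        threads_.clear();
    }

    void add(WorkItemCallBack call_back) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(call_back));
        }
        cv_.notify_one();
    }

    size_t count() {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }

    size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return threads_.size();
    }

private:
    // Worker loop: sleep until a task arrives or the pool is stopped.
    void run() {
        while (true) {
            WorkItemCallBack task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return exit_ || !queue_.empty(); });
                if (exit_) {
                    return;
                }
                task = std::move(queue_.front());
                queue_.pop();
            }
            task();
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<WorkItemCallBack> queue_;
    std::vector<std::thread> threads_;
    bool exit_ = false;
};
```
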
*** CalloutManager

* it must be able to handle hook callouts independently on each thread

* 'current_hook' and 'current_library' must be independent and available for each thread

##### Integration with the server code

*** possible implementation

- the server code should only take packets out of the socket (kernel driver queue) if there is enough space in the thread pool queue to add the new packets, so the call to receivePacket is made after this check

- the transaction id or 'client identifier' and (if necessary and available) the address of interest (ClientTxdAddressUUID) can be used as a slot id for the packets in the queue, so if a newer packet is received, the older unprocessed packet can be overwritten

- if a packet has been received from the driver, update the server packet map with the current packet if such a mapping already exists (updating the client request with the latest request), or add the new mapping and create an associated thread pool task

- as packets added to the queue are processed in the order of the tasks associated with their ClientTxdAddressUUID, packets are processed in the order of the thread pool tasks

- a single packet queue can be used for all threads, which naturally balances the 'virtual' queues of all the threads (packets are not pre-scheduled to any specific thread)

```
// eg: server v4

void
Dhcpv4Srv::run_one() {
    // client's message and server's response
    Pkt4Ptr query;
    Pkt4Ptr rsp;

    try {

        // Do not read more packets from socket if there are enough
        // packets to be processed in the packet thread pool queue
        const int max_queued_pkt_per_thread = Dhcpv4Srv::maxThreadQueueSize();
        size_t pkt_queue_size = pkt_thread_pool_.count();
        bool read_packet = true;
        if (MultiThreadingMgr::instance().getMode() && pkt_queue_size >=
            Dhcpv4Srv::threadCount() * max_queued_pkt_per_thread) {
            read_packet = false;
        }

        // Set select() timeout to 1s. This value should not be modified
        // because it is important that the select() returns control
        // frequently so as the IOService can be polled for ready handlers.
        if (read_packet) {
            uint32_t timeout = 1;
            // LOG_DEBUG(packet4_logger, DBG_DHCP4_DETAIL, DHCP4_BUFFER_WAIT).arg(timeout);
            query = receivePacket(timeout);
        }
        ...

// eg: server v6

void
Dhcpv6Srv::run_one() {
    // client's message and server's response
    Pkt6Ptr query;
    Pkt6Ptr rsp;

    try {

        // Do not read more packets from socket if there are enough
        // packets to be processed in the packet thread pool queue
        const int max_queued_pkt_per_thread = Dhcpv6Srv::maxThreadQueueSize();
        size_t pkt_queue_size = pkt_thread_pool_.count();
        bool read_packet = true;
        if (MultiThreadingMgr::instance().getMode() && pkt_queue_size >=
            Dhcpv6Srv::threadCount() * max_queued_pkt_per_thread) {
            read_packet = false;
        }

        // Set select() timeout to 1s. This value should not be modified
        // because it is important that the select() returns control
        // frequently so as the IOService can be polled for ready handlers.
        if (read_packet) {
            uint32_t timeout = 1;
            // LOG_DEBUG(packet6_logger, DBG_DHCP6_DETAIL, DHCP6_BUFFER_WAIT).arg(timeout);
            query = receivePacket(timeout);
        }
        ...
```

##### Calling the processPacket and processPacketBufferSend functions

Each DHCP packet should be uniquely identified by the transaction id or 'client identifier' and (if necessary and available) the address of interest for the specific DHCP transaction.

The Pkt class should implement a method to retrieve such a unique identifier (eg. getClientTxdAddressUUID()).

The server will keep a mapping of each ClientTxdAddressUUID to the corresponding received packet so that it can update the packet with the latest version of the request. This ensures that the server does not process the same request several times, causing possible transaction failures (if using transactions) on the database back-end queries, and does not waste processing time on old packets which might already have 'expired'.

Each read packet is looked up in the server map; if it is found, the old packet is overwritten by the new packet and no new task is added to the queue. If the packet is not found, it is inserted into the mapping and a new thread pool task is added.

```
// eg: server 4

    // If the DHCP service has been globally disabled, drop the packet.
    if (!network_state_->isServiceEnabled()) {
        LOG_DEBUG(bad_packet4_logger, DBG_DHCP4_BASIC, DHCP4_PACKET_DROP_0008)
            .arg(query->getLabel());
        return;
    } else {
        if (MultiThreadingMgr::instance().getMode()) {
            bool create_task = true;
            auto id = query->getClientTxdAddressUUID();
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (pkt_map_.find(id) != pkt_map_.end()) {
                    pkt_map_[id] = std::make_pair(query, rsp);
                    create_task = false;
                } else {
                    pkt_map_[id] = std::make_pair(query, rsp);
                }
            }
            if (create_task) {
                ThreadPool::WorkItemCallBack call_back =
                    std::bind(&Dhcpv4Srv::processPacketAndSendResponseNoThrow, this, id);
                pkt_thread_pool_.add(call_back);
            }
        } else {
            processPacketAndSendResponse(query, rsp);
        }
    }
}

void
Dhcpv4Srv::processPacketAndSendResponseNoThrow(ClientTxdAddressUUID& id) {
    try {
        Pkt4Ptr query;
        Pkt4Ptr rsp;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            auto pkt_data = pkt_map_[id];
            query = pkt_data.first;
            rsp = pkt_data.second;
            pkt_map_.erase(id);
        }
        processPacketAndSendResponse(query, rsp);
    } catch (const std::exception& e) {
        LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_STD_EXCEPTION)
            .arg(e.what());
    } catch (...) {
        LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_EXCEPTION);
    }
}

void
Dhcpv4Srv::processPacketAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp) {
    processPacket(query, rsp);
    if (!rsp) {
        return;
    }

    CalloutHandlePtr callout_handle = getCalloutHandle(query);
    processPacketBufferSend(callout_handle, rsp);
}

// eg: server 6

    // If the DHCP service has been globally disabled, drop the packet.
    if (!network_state_->isServiceEnabled()) {
        LOG_DEBUG(bad_packet6_logger, DBG_DHCP6_DETAIL_DATA, DHCP6_PACKET_DROP_DHCP_DISABLED)
            .arg(query->getLabel());
        return;
    } else {
        if (MultiThreadingMgr::instance().getMode()) {
            bool create_task = true;
            auto id = query->getClientTxdAddressUUID();
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (pkt_map_.find(id) != pkt_map_.end()) {
                    pkt_map_[id] = std::make_pair(query, rsp);
                    create_task = false;
                } else {
                    pkt_map_[id] = std::make_pair(query, rsp);
                }
            }
            if (create_task) {
                ThreadPool::WorkItemCallBack call_back =
                    std::bind(&Dhcpv6Srv::processPacketAndSendResponseNoThrow, this, id);
                pkt_thread_pool_.add(call_back);
            }
        } else {
            processPacketAndSendResponse(query, rsp);
        }
    }
}

void
Dhcpv6Srv::processPacketAndSendResponseNoThrow(ClientTxdAddressUUID& id) {
    try {
        Pkt6Ptr query;
        Pkt6Ptr rsp;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            auto pkt_data = pkt_map_[id];
            query = pkt_data.first;
            rsp = pkt_data.second;
            pkt_map_.erase(id);
        }
        processPacketAndSendResponse(query, rsp);
    } catch (const std::exception& e) {
        LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_STD_EXCEPTION)
            .arg(e.what());
    } catch (...) {
        LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_EXCEPTION);
    }
}

void
Dhcpv6Srv::processPacketAndSendResponse(Pkt6Ptr& query, Pkt6Ptr& rsp) {
    processPacket(query, rsp);
    if (!rsp) {
        return;
    }

    CalloutHandlePtr callout_handle = getCalloutHandle(query);
    processPacketBufferSend(callout_handle, rsp);
}
```

The server packet map is defined as follows (a note on hashing follows the definition):

```
// eg: server 4

std::unordered_map<ClientTxdAddressUUID, std::pair<Pkt4Ptr, Pkt4Ptr>> pkt_map_;

// eg: server 6

std::unordered_map<ClientTxdAddressUUID, std::pair<Pkt6Ptr, Pkt6Ptr>> pkt_map_;
```

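Because the packet map is an unordered_map, ClientTxdAddressUUID needs equality and hash support. A hedged sketch, assuming the identifier reduces to a transaction id plus an address of interest (both fields hypothetical):

```
#include <cstdint>
#include <functional>

// Hypothetical key type: transaction id + address of interest.
struct ClientTxdAddressUUID {
    uint32_t transid_;
    uint32_t address_;   // eg. an IPv4 address as a host-order integer

    bool operator==(const ClientTxdAddressUUID& other) const {
        return transid_ == other.transid_ && address_ == other.address_;
    }
};

namespace std {
template <>
struct hash<ClientTxdAddressUUID> {
    size_t operator()(const ClientTxdAddressUUID& id) const {
        // Combine the two fields; any reasonable mixing works here.
        return hash<uint64_t>()((uint64_t(id.transid_) << 32) | id.address_);
    }
};
}
```
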
The sendPacket function called from processPacketBufferSend calls in turn the IfaceMgr send function.

*As this might also be a bottleneck, that performance enhancement is not discussed in this document.*

##### Handling High Availability parked packets

```
// eg: server 4

void
Dhcpv4Srv::processPacket(Pkt4Ptr& query, Pkt4Ptr& rsp, bool allow_packet_park) {
    ...
    // PARKING SPOT after leases4_committed hook point.
    CalloutHandlePtr callout_handle = getCalloutHandle(query);
    if (packet_park) {
        LOG_DEBUG(hooks_logger, DBG_DHCP4_HOOKS, DHCP4_HOOK_LEASES4_COMMITTED_PARK)
            .arg(query->getLabel());

        // Park the packet. The function we bind here will be executed when the hook
        // library unparks the packet.
        HooksManager::park("leases4_committed", query,
                           [this, callout_handle, query, rsp]() mutable {
            ThreadPool::WorkItemCallBack call_back =
                std::bind(&Dhcpv4Srv::processPacketSendResponseNoThrow, this, callout_handle, query, rsp);
            pkt_thread_pool_.add(call_back);
        });

        // If we have parked the packet, let's reset the pointer to the
        // response to indicate to the caller that it should return, as
        // the packet processing will continue via the callback.
        rsp.reset();

    } else {
        processPacketPktSend(callout_handle, query, rsp);
    }
    ...
}

void
Dhcpv4Srv::processPacketSendResponseNoThrow(hooks::CalloutHandlePtr& callout_handle,
                                            Pkt4Ptr& query, Pkt4Ptr& rsp) {
    try {
        processPacketPktSend(callout_handle, query, rsp);
        processPacketBufferSend(callout_handle, rsp);
    } catch (const std::exception& e) {
        LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_STD_EXCEPTION)
            .arg(e.what());
    } catch (...) {
        LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_EXCEPTION);
    }
}

// eg: server 6

void
Dhcpv6Srv::processPacket(Pkt6Ptr& query, Pkt6Ptr& rsp) {
    ...
    // PARKING SPOT after leases6_committed hook point.
    CalloutHandlePtr callout_handle = getCalloutHandle(query);
    if (packet_park) {
        LOG_DEBUG(hooks_logger, DBG_DHCP6_HOOKS, DHCP6_HOOK_LEASES6_COMMITTED_PARK)
            .arg(query->getLabel());

        // Park the packet. The function we bind here will be executed when the hook
        // library unparks the packet.
        HooksManager::park("leases6_committed", query,
                           [this, callout_handle, query, rsp]() mutable {
            ThreadPool::WorkItemCallBack call_back =
                std::bind(&Dhcpv6Srv::processPacketSendResponseNoThrow, this, callout_handle, query, rsp);
            pkt_thread_pool_.add(call_back);
        });

        // If we have parked the packet, let's reset the pointer to the
        // response to indicate to the caller that it should return, as
        // the packet processing will continue via the callback.
        rsp.reset();

    } else {
        processPacketPktSend(callout_handle, query, rsp);
    }
    ...
}

void
Dhcpv6Srv::processPacketSendResponseNoThrow(hooks::CalloutHandlePtr& callout_handle,
                                            Pkt6Ptr& query, Pkt6Ptr& rsp) {
    try {
        processPacketPktSend(callout_handle, query, rsp);
        processPacketBufferSend(callout_handle, rsp);
    } catch (const std::exception& e) {
        LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_STD_EXCEPTION)
            .arg(e.what());
    } catch (...) {
        LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_EXCEPTION);
    }
}
```

The callback function called after un-parking the packet creates an additional thread pool task which handles only the remaining part of the packet processing.

##### Handling race conditions

The initial implementation does not require transactions when performing lease updates, so they will not be added while implementing this feature; however, they are fully supported by the proposed design and can be added later if needed.

The initial implementation will consist of dropping requests which might lead to race conditions in the database back-end.

This can be achieved by storing information related to the requested address for each received packet and dropping requests that require a resource already requested by another packet being processed.

There are several types of races that need to be handled:

* handling multiple requests from the same client

* handling multiple requests that request/require the same resource (IP address)

The first race avoidance mechanism can be implemented using the ClientHandler class, which verifies whether a request from a specific client is already being processed and, if so, queues the packet back into the thread pool.

```
// eg: server 4

void
Dhcpv4Srv::processDhcp4Query(Pkt4Ptr& query, Pkt4Ptr& rsp,
                             bool allow_packet_park) {
    // Create a client race avoidance RAII handler.
    ClientHandler client_handler;

    // Check for lease modifier queries from the same client being processed.
    if (MultiThreadingMgr::instance().getMode() &&
        ((query->getType() == DHCPDISCOVER) ||
         (query->getType() == DHCPREQUEST) ||
         (query->getType() == DHCPRELEASE) ||
         (query->getType() == DHCPDECLINE))) {
        ContinuationPtr cont =
            makeContinuation(std::bind(&Dhcpv4Srv::processDhcp4QueryAndSendResponse,
                                       this, query, rsp, allow_packet_park));
        if (!client_handler.tryLock(query, cont)) {
            return;
        }
    }
    ...
}

void
Dhcpv4Srv::processDhcp4QueryAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp,
                                            bool allow_packet_park) {
    try {
        processDhcp4Query(query, rsp, allow_packet_park);
        if (!rsp) {
            return;
        }

        CalloutHandlePtr callout_handle = getCalloutHandle(query);
        processPacketBufferSend(callout_handle, rsp);
    } catch (const std::exception& e) {
        LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_STD_EXCEPTION)
            .arg(e.what());
    } catch (...) {
        LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_EXCEPTION);
    }
}

// eg: server 6

void
Dhcpv6Srv::processDhcp6Query(Pkt6Ptr& query, Pkt6Ptr& rsp) {
    // Create a client race avoidance RAII handler.
    ClientHandler client_handler;

    // Check for lease modifier queries from the same client being processed.
    if (MultiThreadingMgr::instance().getMode() &&
        ((query->getType() == DHCPV6_SOLICIT) ||
         (query->getType() == DHCPV6_REQUEST) ||
         (query->getType() == DHCPV6_RENEW) ||
         (query->getType() == DHCPV6_REBIND) ||
         (query->getType() == DHCPV6_RELEASE) ||
         (query->getType() == DHCPV6_DECLINE))) {
        ContinuationPtr cont =
            makeContinuation(std::bind(&Dhcpv6Srv::processDhcp6QueryAndSendResponse,
                                       this, query, rsp));
        if (!client_handler.tryLock(query, cont)) {
            return;
        }
    }
    ...
}

void
Dhcpv6Srv::processDhcp6QueryAndSendResponse(Pkt6Ptr& query, Pkt6Ptr& rsp) {
    try {
        processDhcp6Query(query, rsp);
        if (!rsp) {
            return;
        }

        CalloutHandlePtr callout_handle = getCalloutHandle(query);
        processPacketBufferSend(callout_handle, rsp);
    } catch (const std::exception& e) {
        LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_STD_EXCEPTION)
            .arg(e.what());
    } catch (...) {
        LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_EXCEPTION);
    }
}
```

The second race avoidance mechanism consists of using the ResourceHandler class to avoid parallel operations on the same address.

```
...
// Check if the resource is busy, i.e. it may be in the process of being
// allocated by another thread for another client.
ResourceHandler resource_handler;
if (MultiThreadingMgr::instance().getMode() &&
    !resource_handler.tryLock(candidate)) {
    // Don't allocate.
    continue;
}
...
```

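A minimal sketch of what the RAII/tryLock behavior of ResourceHandler could look like; the container and member names are assumptions, not the actual class:

```
#include <mutex>
#include <set>
#include <string>

class ResourceHandler {
public:
    // RAII: release everything this handler locked.
    ~ResourceHandler() {
        std::lock_guard<std::mutex> lock(mutex_);
        for (auto const& addr : owned_) {
            busy_.erase(addr);
        }
    }

    // Try to mark the candidate address busy; return false when another
    // thread is already operating on it.
    bool tryLock(const std::string& addr) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (busy_.count(addr)) {
            return (false);
        }
        busy_.insert(addr);
        owned_.insert(addr);
        return (true);
    }

private:
    std::set<std::string> owned_;          // addresses owned by this handler
    static std::mutex mutex_;              // guards busy_
    static std::set<std::string> busy_;    // addresses in use process-wide
};

std::mutex ResourceHandler::mutex_;
std::set<std::string> ResourceHandler::busy_;
```
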
To prevent the reclamation process from reclaiming a lease while a client packet is using it, RW locks will be used.

Enabling transactions for packet processing will require using ThreadResource or thread_local variables.

The current code can use database back-end transactions when performing actions on the host or configuration back-end databases. The same should be applied to the lease database, as this takes care of any race condition related to lease allocation and guarantees a consistent internal state of the server.

As each thread will use its own connection and each packet transaction will be done on a separate connection, the queries related to each transaction will be independent.

Opening a transaction for each packet processing is safe and easy to do. It is robust and less error-prone, as new features will seamlessly integrate with the multi-threading implementation and the transactions will handle any new queries added on top of the initial implementation.

This also supports a full transaction rollback of all operations performed by a specific thread, as the connection is used by exactly one thread; a sketch of this pattern follows.

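A hedged sketch of the per-packet transaction pattern, using stub types (the Connection API names here are assumptions, not an actual back-end interface):

```
// Stub types standing in for the real connection and packet classes.
struct Connection {
    void startTransaction();
    void commit();
    void rollback();
};
struct Pkt {};
using PktPtr = Pkt*;

void processPacket(Connection& conn, const PktPtr& query, PktPtr& rsp);

// Wrap one packet's queries in a transaction on this thread's own
// connection; roll back on any failure so no partial state is left behind.
void processWithTransaction(Connection& conn, const PktPtr& query, PktPtr& rsp) {
    conn.startTransaction();
    try {
        processPacket(conn, query, rsp);  // all lease queries use conn
        conn.commit();
    } catch (...) {
        conn.rollback();
        throw;
    }
}
```
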
##### Configuration options

- each server should be able to configure the following parameters (as described at the beginning of the document):

- enabling multi-threaded packet processing

- configuring the number of threads used to process packets

- configuring the size of the packet queue

```
// eg: server v4

"Dhcp4": {
    ...
    "multi-threading": {
        "enable-multi-threading": true,
        "thread-pool-size": 64,
        "packet-queue-size": 256
    },
    ...
}

// eg: server v6

"Dhcp6": {
    ...
    "multi-threading": {
        "enable-multi-threading": true,
        "thread-pool-size": 64,
        "packet-queue-size": 256
    },
    ...
}
```

##### Supported back-ends

All currently available back-ends will support multi-threading.

Given the design and the origin of the problem, the best performance improvement will probably be observed when using remote databases, as using Memfile will cause threads to wait for synchronization with RAM and/or disk.

The Memfile back-end will most likely use locks for synchronization between threads, but it should still perform better than in single-threaded mode.

The priority of multi-threading support for the back-ends will be:

* MySql

* PgSql

* Memfile

* Cassandra

##### Hook points and hook libraries

The HooksManager must be able to create an individual context for each packet (if it does not already), and must also be thread safe. Each hook point could access non-thread-safe objects through the singletons provided by the server code.

Just as in the case of LeaseMgr and HostMgr, variables must be created on the stack, so all static function variables must be removed.

Code using static function variables:

```
// eg:

static CalloutHandlePtr callout_handle;
if (!callout_handle) {
    callout_handle = HooksManager::createCalloutHandle();
}
```

must create the variables on the stack:

```
// eg:

CalloutHandlePtr callout_handle = HooksManager::createCalloutHandle();
```

or create a thread_local variable:

```
thread_local CalloutHandlePtr callout_handle = HooksManager::createCalloutHandle();
```

Because most of the headers are exported to the install directory of the server, the hook library owner/developer has no restrictions regarding the way functions are called and which objects are modified during any hook point.

There are two approaches to this problem.

The first would be:

- document the list of thread safe classes or the list of non-thread-safe classes (whichever is easier to maintain)

- document the restrictions the owner/developer of the library must respect when calling internal server code

If the hooks are designed to change internal, non-thread-safe objects, it is the responsibility of the owner/developer of the library to ensure the correct behavior of the server.

This is also the case with the current implementation, as hooks can create multiple threads which can end up corrupting the internal state of the server (race conditions, deadlocks and other related issues).

The other approach would be:

- acquire a lock at each hook point, so that all actions executed inside these functions are thread safe

This introduces several problems:

- a performance impact regardless of whether the hook libraries change the internal state of the server

- an increased probability of deadlock if there are other class functions with their own locks and the basic rules of parallel programming are not followed (as with the first approach, this is the responsibility of the hook library owner/developer)

As the first approach has fewer disadvantages and is easier to implement, it should be the one to choose.

Each hook library should also define a function which tells the server whether the library is thread safe and can be used with the multi-threading feature enabled. If there is no such function, or if the result returned by the function is negative, then the library will not be loaded when the multi-threading feature is enabled; alternatively, the server can call all hooks corresponding to this library using locks (with a performance impact).

```
// eg:

unsigned char multi_threading_compatible();
```

The server code should also provide functions to start and stop the packet thread pool if the respective hook point/command makes any run-time configuration change. These functions should not be available (should throw) if called from within a hook point inside a function related to packet processing, as this would cause a deadlock.

```
// eg:

void startPktProcessing();

void stopPktProcessing();
```

The 'util' library should provide a header which offers get and set access to a global state (variable) of the process indicating whether multi-threading mode is used.

```
// eg: multi_threading_mgr.h

class MultiThreadingMgr : public boost::noncopyable {
public:
    static MultiThreadingMgr& instance() {
        static MultiThreadingMgr manager;
        return manager;
    }

    void setMode(bool enabled) {
        enabled_ = enabled;
    }

    bool getMode() {
        return enabled_;
    }

protected:
    MultiThreadingMgr() : enabled_(false) {
    }

    virtual ~MultiThreadingMgr();

private:
    bool enabled_;
};
```

##### Sanity checks at run time

Some classes will be considered safe to use without locking even while parallel processing calls their member functions (none of the functions that actually modify the state are called). The functions that modify the internal state must be updated to just log an error if they are called while the context is not safe (eg: while processing packets).

```
// eg:

if (getLockState()) {
    LOG_ERROR(xxx_logger, INVALID_ACTION_VIOLATING_INTERNAL_SERVER_STATE).arg(__PRETTY_FUNCTION__);
    return;
}
```

A function should be called on the instances of these classes whenever the code exits the reconfiguration process and is ready to perform packet processing, and also when entering such a process.

```
// eg: on exiting the configuration process

MultiThreadingMgr::instance().setMode(CfgMgr::instance().getCurrentCfg()->getServerThreadCount() != 0);

CfgMgr::instance().lockInternalState();
LibDHCP::lockInternalState();
HooksManager::getHooksManager().lockInternalState();
CalloutManager::lockInternalState();
IfaceMgr::instance().lockInternalState();

// eg: on entering the configuration process

CfgMgr::instance().unlockInternalState();
LibDHCP::unlockInternalState();
HooksManager::getHooksManager().unlockInternalState();
CalloutManager::unlockInternalState();
IfaceMgr::instance().unlockInternalState();

MultiThreadingMgr::instance().setMode(false);
```

Another option would be to create a ConfigCriticalSection RAII class which sets a flag indicating that the respective section is modifying the configuration; any other processing thread should throw an exception if run in parallel.

```
/// @brief specialization of @ref enter for @ref CriticalSectionBase using
/// @ref ConfigSection
template<>
void
CriticalSectionBase<ConfigSection>::enter() {
    if (!CriticalSectionBase<ConfigSection>::getCriticalSectionCount()) {
        MultiThreadingMgr::instance().setReadOnlyConfig(false);
    }
}

/// @brief specialization of @ref leave for @ref CriticalSectionBase using
/// @ref ConfigSection
template<>
void
CriticalSectionBase<ConfigSection>::leave() {
    if (!CriticalSectionBase<ConfigSection>::getCriticalSectionCount()) {
        MultiThreadingMgr::instance().setReadOnlyConfig(true);
    }
}


/// @throw @ref InvalidOperation if the configuration is read only.
ReadOnlyConfigProbe() {
    if (MultiThreadingMgr::instance().getReadOnlyConfig()) {
        isc_throw(isc::InvalidOperation,
                  "configuration modification is not allowed.");
    }
}

...
// Check that configuration changes are permitted.
ReadOnlyConfigProbe ck;
...
```

Some of the classes which should implement such a safety mechanism are:

* CfgMgr

* LibDHCP

* HooksManager

* CalloutManager

* IfaceMgr

There should be a discussion regarding which actions are, and should be, allowed to be performed at run time by the hook libraries. Once this is documented, the hook developer will know which functions a hook may call at each hook point. Invalid actions performed by a hook will also be reflected in the server logs.

---

##### Things to consider and to investigate:

- enforce restrictions and document the classes exposed by 'exported' headers in the install include directory

- investigate hooks and hook points:

* flex_id

* radius

* forensic_log

* high_availability

* user_chk

Some other approaches mentioned were:

- splitting the processing of each packet into a pipeline of separate and parallel independent stages; however, this introduces some problems:

* slower on a machine with fewer cores than the number of pipeline stages

* cannot scale up if the number of CPUs exceeds the length of the pipeline

---

##### Implementing locks which do not have performance impact in single-threaded mode

There are several ways to design classes which use locks when the multi-threading feature is enabled but have low or no impact on performance when used in single-threaded mode.

* implement a mutex base class interface which can be derived into:

- a dummy mutex implementation which does nothing on lock and unlock when using single-threaded mode

- a wrapper mutex implementation over std::mutex when using multi-threaded mode

```
struct DummyLock {
    void lock();
    void unlock();
};

class MutexInterface {
public:
    virtual ~MutexInterface() = default;
    virtual void lock() = 0;
    virtual void unlock() = 0;
};

template <typename Lock>
class MutexImpl : public MutexInterface {
public:
    void lock() {
        mutex_.lock();
    }
    void unlock() {
        mutex_.unlock();
    }
private:
    Lock mutex_;
};

typedef MutexImpl<DummyLock> DummyMutex;

typedef MutexImpl<std::mutex> StandardMutex;
```

*** Limitations:

- on a reconfiguration event, all mutex members of all classes must be re-instantiated with the proper type

* implement wrappers for lock_guard, unique_lock and scoped_lock which lock the mutex only in multi-threaded mode (or, if using pointers, only if the mutex argument is not null)

```
// eg:

template <typename Lock>
class LockGuard {
public:
    LockGuard(Lock& lock) : lk_(lock) {
        if (MultiThreadingMgr::instance().getMode()) {
            lk_.lock();
        }
    }

    ~LockGuard() {
        if (MultiThreadingMgr::instance().getMode()) {
            lk_.unlock();
        }
    }

    LockGuard(const LockGuard&) = delete;
    LockGuard& operator=(const LockGuard&) = delete;

    LockGuard(LockGuard&&) = delete;
    LockGuard& operator=(LockGuard&&) = delete;

private:
    Lock& lk_;
};
```

or

```
// eg:

template <typename Lock>
class LockGuard {
public:
    LockGuard(Lock* lock) : lk_(lock) {
        if (lk_) {
            lk_->lock();
        }
    }

    ~LockGuard() {
        if (lk_) {
            lk_->unlock();
        }
    }

    LockGuard(const LockGuard&) = delete;
    LockGuard& operator=(const LockGuard&) = delete;

    LockGuard(LockGuard&&) = delete;
    LockGuard& operator=(LockGuard&&) = delete;

private:
    Lock* lk_;
};

class Foo {
public:
    Foo() {
        if (MultiThreadingMgr::instance().getMode()) {
            mutex_ = std::make_shared<std::mutex>();
        }
    }

    void f() {
        LockGuard<std::mutex> lock(mutex_.get());
    }

private:
    std::shared_ptr<std::mutex> mutex_;
};
```

*** The second implementation can be used in scenarios beyond the code affected by the multi-threading feature.

*** Limitations:

- on a reconfiguration event, all mutex members of all classes must be re-instantiated

* all classes using locks in multi-threaded mode can be defined with a template lock type:

- in single-threaded mode, the template class does nothing

- in multi-threaded mode, the template class locks the mutex

```
// eg:

struct DummyLock {
    void lock();
    void unlock();
};

template <typename Lock>
class Foo {
public:
    void f() {
        std::unique_lock<Lock> lk(mutex_);
        ...
    }

private:
    Lock mutex_;
};

void bar() {
    ...
    Foo<std::mutex> mt;
    mt.f();
    ...
    Foo<DummyLock> st;
    st.f();
    ...
}
```

*** Limitations:

- on a reconfiguration event, all classes must be re-instantiated with the proper template class

- increased binary size

* instantiate both the single-threaded dummy lock and the multi-threaded standard lock and switch between them at run time

```
// eg:

class Mutex {
public:
    void lock() {
        data_[MultiThreadingMgr::instance().getMode()]->lock();
    }
    void unlock() {
        data_[MultiThreadingMgr::instance().getMode()]->unlock();
    }

private:
    struct DummyLock {
        void lock();
        void unlock();
    };

    class MutexInterface {
    public:
        virtual ~MutexInterface() = default;
        virtual void lock() = 0;
        virtual void unlock() = 0;
    };

    template <typename Lock>
    class MutexImpl : public MutexInterface {
    public:
        void lock() {
            mutex_.lock();
        }
        void unlock() {
            mutex_.unlock();
        }
    private:
        Lock mutex_;
    };

    MutexImpl<DummyLock> dummy_;

    MutexImpl<std::mutex> mutex_;

    // Index 0 (single-threaded mode) selects the dummy lock, index 1 the real one.
    std::array<MutexInterface*, 2> data_ = {{&dummy_, &mutex_}};
};
```

*** Limitations:

- improper reconfiguration would lead to an inconsistent state of the multi-threaded lock

- harder to debug

---

**Any suggestions and comments are appreciated!**

## Data race avoidance proposal

Problem statement: the multi-threaded server should behave the same as the single-threaded/sequential one from an external observer's point of view, but the current lease management shows a lot of races: a lease is read, the result is used to make a decision, and a lease is written; between the read and the write another thread can modify the lease, leading to incorrect behavior.

Goal: avoid all possible data races without introducing resource contention, i.e. locks held over long lifetimes.

What races: all races follow the same schema: read then write. The keys for reads are the hardware address, the client identifier or DUID, and the address (which includes prefixes and is the primary key in SQL).

Standard solution: the standard solution is to lock the corresponding keys so that no change is possible between the read and the write; in other words, this makes read+write pairs sequential. While this avoids races, it introduces contention, i.e. the "second" thread waiting to get the lock does nothing.

Proposed solution: instead of locking the keys, a record of what is currently being processed is kept. Here "being processed" includes packets in parking lots when they were parked between the read and the write, not only DHCP server threads. The record structure (likely a multi index) uses at least the same keys as lease fetching.

Recording details: to simplify coding, the record should be created in a RAII way when processing begins. There should be an easy way to add or modify the address part, as in some parts of the allocation engine even the wanted address can change. When a key is busy, the only constraint is to not continue: possible actions are to drop the query (this should use a new exception, to avoid confusion with a negative acknowledgment), to queue the packet context to the thread holding the key (an easy way to enforce sequential processing), a mix of both (i.e. a queue of one), a general queue of packet contexts waiting for a key to become no longer busy, etc.

Note that this record (and only it) must be thread safe (e.g. by protecting access with a guard lock).

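A hedged sketch of such a record using Boost.Multi-Index, with the two kinds of lookup keys mentioned above (client identifier and address); all names and field types are assumptions:

```
#include <mutex>
#include <string>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>

// One entry per query currently being processed (including parked packets).
struct InProgress {
    std::string client_id_;   // hardware address, client identifier or DUID
    std::string address_;     // address/prefix of interest (may change)
};

namespace mi = boost::multi_index;

using InProgressContainer = mi::multi_index_container<
    InProgress,
    mi::indexed_by<
        mi::hashed_unique<mi::member<InProgress, std::string,
                                     &InProgress::client_id_>>,
        mi::hashed_non_unique<mi::member<InProgress, std::string,
                                         &InProgress::address_>>
    >
>;

// The record itself (and only it) must be thread safe: a single mutex
// guarding all insert/lookup/erase operations is enough, as they are short.
InProgressContainer in_progress_;
std::mutex in_progress_mutex_;
```
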
Busy address: when looking for a free resource, this record will cause "busy resources" to be rejected, just as a check with a call to the back-end would.

Performance: there is a small overhead to maintain the record, but on the other hand trivial duplicates are dropped. Without conflicts this overhead is the main source of performance decrease. With conflicts, only the conflicting processing is performed sequentially, so the behavior is correct and performance remains nearly the same as with a single thread. Not only is this acceptable, but there is no easy way to perform better in such cases.

High Availability compatibility: first, the parking lot used by the HA hook is "lease4_committed"/"lease6_committed", so as the name suggests it is after the processing phase where a race can occur. With a correct setup of the load balancing mode there is no possible race between the two HA peers (clients and pools are partitioned between peers), so lease updates performed by the ASIO I/O service never conflict with the DHCP service performed by members of the thread pool. It is not a bad idea to check, but IMHO there is no need to protect each update by adding a record.