Commit 1d32ebae authored by Razvan Becheriu's avatar Razvan Becheriu

[#892] handle parking

parent 893d4290
......@@ -816,22 +816,24 @@ Dhcpv4Srv::run_one() {
Pkt4Ptr rsp;
try {
bool read_pkt = true;
// Do not read more packets from socket if there are enough
// packets to be processed in the packet thread pool queue
const int max_queued_pkt_per_thread = Dhcpv4Srv::maxThreadQueueSize();
const auto queue_full_wait = std::chrono::milliseconds(1);
const int max_queue_size = Dhcpv4Srv::maxThreadQueueSize();
const int thread_count = Dhcpv4Srv::threadCount();
size_t pkt_queue_size = pkt_thread_pool_.count();
if (pkt_queue_size >= Dhcpv4Srv::threadCount() *
max_queued_pkt_per_thread) {
return;
if (thread_count && (pkt_queue_size >= thread_count * max_queue_size)) {
read_pkt = false;
}
// Set select() timeout to 1s. This value should not be modified
// because it is important that the select() returns control
// frequently so as the IOService can be polled for ready handlers.
uint32_t timeout = 1;
query = receivePacket(timeout);
if (read_pkt) {
// Set select() timeout to 1s. This value should not be modified
// because it is important that the select() returns control
// frequently so as the IOService can be polled for ready handlers.
uint32_t timeout = 1;
query = receivePacket(timeout);
}
// Log if packet has arrived. We can't log the detailed information
// about the DHCP message because it hasn't been unpacked/parsed
......@@ -1218,8 +1220,15 @@ Dhcpv4Srv::processPacket(Pkt4Ptr& query, Pkt4Ptr& rsp, bool allow_packet_park) {
// library unparks the packet.
HooksManager::park("leases4_committed", query,
[this, callout_handle, query, rsp]() mutable {
processPacketPktSend(callout_handle, query, rsp);
processPacketBufferSend(callout_handle, rsp);
if (Dhcpv4Srv::threadCount()) {
ThreadPool::WorkItemCallBack call_back =
std::bind(&Dhcpv4Srv::processPacketSendResponseNoThrow,
this, callout_handle, query, rsp);
pkt_thread_pool_.add(call_back);
} else {
processPacketPktSend(callout_handle, query, rsp);
processPacketBufferSend(callout_handle, rsp);
}
});
// If we have parked the packet, let's reset the pointer to the
......@@ -1232,6 +1241,20 @@ Dhcpv4Srv::processPacket(Pkt4Ptr& query, Pkt4Ptr& rsp, bool allow_packet_park) {
}
}
void
Dhcpv4Srv::processPacketSendResponseNoThrow(hooks::CalloutHandlePtr& callout_handle,
Pkt4Ptr& query, Pkt4Ptr& rsp) {
try {
processPacketPktSend(callout_handle, query, rsp);
processPacketBufferSend(callout_handle, rsp);
} catch (const std::exception& e) {
LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_STD_EXCEPTION)
.arg(e.what());
} catch (...) {
LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_EXCEPTION);
}
}
void
Dhcpv4Srv::processPacketPktSend(hooks::CalloutHandlePtr& callout_handle,
Pkt4Ptr& query, Pkt4Ptr& rsp) {
......
......@@ -485,22 +485,24 @@ void Dhcpv6Srv::run_one() {
Pkt6Ptr rsp;
try {
bool read_pkt = true;
// Do not read more packets from socket if there are enough
// packets to be processed in the packet thread pool queue
const int max_queued_pkt_per_thread = Dhcpv6Srv::maxThreadQueueSize();
const auto queue_full_wait = std::chrono::milliseconds(1);
const int max_queue_size = Dhcpv6Srv::maxThreadQueueSize();
const int thread_count = Dhcpv6Srv::threadCount();
size_t pkt_queue_size = pkt_thread_pool_.count();
if (pkt_queue_size >= Dhcpv6Srv::threadCount() *
max_queued_pkt_per_thread) {
return;
if (thread_count && (pkt_queue_size >= thread_count * max_queue_size)) {
read_pkt = false;
}
// Set select() timeout to 1s. This value should not be modified
// because it is important that the select() returns control
// frequently so as the IOService can be polled for ready handlers.
uint32_t timeout = 1;
query = receivePacket(timeout);
if (read_pkt) {
// Set select() timeout to 1s. This value should not be modified
// because it is important that the select() returns control
// frequently so as the IOService can be polled for ready handlers.
uint32_t timeout = 1;
query = receivePacket(timeout);
}
// Log if packet has arrived. We can't log the detailed information
// about the DHCP message because it hasn't been unpacked/parsed
......@@ -977,8 +979,15 @@ Dhcpv6Srv::processPacket(Pkt6Ptr& query, Pkt6Ptr& rsp) {
// library unparks the packet.
HooksManager::park("leases6_committed", query,
[this, callout_handle, query, rsp]() mutable {
processPacketPktSend(callout_handle, query, rsp);
processPacketBufferSend(callout_handle, rsp);
if (Dhcpv6Srv::threadCount()) {
ThreadPool::WorkItemCallBack call_back =
std::bind(&Dhcpv6Srv::processPacketSendResponseNoThrow,
this, callout_handle, query, rsp);
pkt_thread_pool_.add(call_back);
} else {
processPacketPktSend(callout_handle, query, rsp);
processPacketBufferSend(callout_handle, rsp);
}
});
// If we have parked the packet, let's reset the pointer to the
......@@ -991,6 +1000,20 @@ Dhcpv6Srv::processPacket(Pkt6Ptr& query, Pkt6Ptr& rsp) {
}
}
void
Dhcpv6Srv::processPacketSendResponseNoThrow(hooks::CalloutHandlePtr& callout_handle,
Pkt6Ptr& query, Pkt6Ptr& rsp) {
try {
processPacketPktSend(callout_handle, query, rsp);
processPacketBufferSend(callout_handle, rsp);
} catch (const std::exception& e) {
LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_STD_EXCEPTION)
.arg(e.what());
} catch (...) {
LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_EXCEPTION);
}
}
void
Dhcpv6Srv::processPacketPktSend(hooks::CalloutHandlePtr& callout_handle,
Pkt6Ptr& query, Pkt6Ptr& rsp) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment