Commit 552e8267 authored by Michal Nowikowski's avatar Michal Nowikowski Committed by Michal Nowikowski

[#283,!135] Fixed request rate accuracy issue in perfdhcp introducing 2...

[#283,!135] Fixed request rate accuracy issue in perfdhcp introducing 2 threads (for sending and receiving).

Delays in sending and receiving (e.g. timeout is select) was causing time slips so sending new packets was always late.
The fix splits receiving into separate thread to not interfere with sending in main thread.

RateController was changed. Now it tracks actual request rate estimating it from the beginning of the test.
In every iteration of sending main loop it is checked if actual request rate is lower than expected.
In such case packets are sent immediatelly. This way actual request rate is always accurate.

The receiver thread receives packets from socket using select as before but do not have to worry about delaying sending.
Read packet are parsed and then passed to main sending thread for further processing.
This processing involves updating stats, matching with sent packets and issueing responses if necessary.

There have been removed some features that do not make sense after introducing these changes.
These includes: aggresivity and some custom statistics.
parent 4bbf912a
......@@ -25,6 +25,8 @@ libperfdhcp_la_SOURCES += pkt_transform.cc pkt_transform.h
libperfdhcp_la_SOURCES += rate_control.cc rate_control.h
libperfdhcp_la_SOURCES += stats_mgr.h
libperfdhcp_la_SOURCES += test_control.cc test_control.h
libperfdhcp_la_SOURCES += receiver.cc receiver.h
libperfdhcp_la_SOURCES += better_socket.cc better_socket.h
sbin_PROGRAMS = perfdhcp
perfdhcp_SOURCES = main.cc
......
// Copyright (C) 2012-2018 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include "better_socket.h"
#include <dhcp/iface_mgr.h>
#include <boost/foreach.hpp>
using namespace isc::dhcp;
namespace isc {
namespace perfdhcp {
BetterSocket::BetterSocket(const int socket) :
SocketInfo(asiolink::IOAddress("127.0.0.1"), 0, socket),
ifindex_(0), valid_(true) {
try {
initSocketData();
} catch (const Exception&) {
valid_ = false;
}
}
BetterSocket::~BetterSocket() {
IfacePtr iface = IfaceMgr::instance().getIface(ifindex_);
if (iface) {
iface->delSocket(sockfd_);
}
}
void
BetterSocket::initSocketData() {
BOOST_FOREACH(IfacePtr iface, IfaceMgr::instance().getIfaces()) {
BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
if (s.sockfd_ == sockfd_) {
ifindex_ = iface->getIndex();
addr_ = s.addr_;
return;
}
}
}
isc_throw(BadValue, "interface for for specified socket "
"descriptor not found");
}
}
}
// Copyright (C) 2012-2018 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#pragma once
#include <dhcp/socket_info.h>
namespace isc {
namespace perfdhcp {
/// \brief Socket wrapper structure.
///
/// This is the wrapper that holds descriptor of the socket
/// used to run DHCP test. The wrapped socket is closed in
/// the destructor. This prevents resource leaks when
/// function that created the socket ends (normally or
/// when exception occurs). This structure extends parent
/// structure with new field ifindex_ that holds interface
/// index where socket is bound to.
struct BetterSocket : public dhcp::SocketInfo {
/// Interface index.
uint16_t ifindex_;
/// Is socket valid. It will not be valid if the provided socket
/// descriptor does not point to valid socket.
bool valid_;
/// \brief Constructor of socket wrapper class.
///
/// This constructor uses provided socket descriptor to
/// find the name of the interface where socket has been
/// bound to. If provided socket descriptor is invalid then
/// valid_ field is set to false;
///
/// \param socket socket descriptor.
BetterSocket(const int socket);
/// \brief Destructor of the socket wrapper class.
///
/// Destructor closes wrapped socket.
~BetterSocket();
private:
/// \brief Initialize socket data.
///
/// This method initializes members of the class that Interface
/// Manager holds: interface name, local address.
///
/// \throw isc::BadValue if interface for specified socket
/// descriptor does not exist.
void initSocketData();
};
}
}
......@@ -23,6 +23,7 @@
#include <stdint.h>
#include <unistd.h>
#include <fstream>
#include <thread>
#ifdef HAVE_OPTRESET
extern int optreset;
......@@ -136,7 +137,6 @@ CommandOptions::reset() {
localname_.clear();
is_interface_ = false;
preload_ = 0;
aggressivity_ = 1;
local_port_ = 0;
seeded_ = false;
seed_ = 0;
......@@ -155,6 +155,11 @@ CommandOptions::reset() {
v6_relay_encapsulation_level_ = 0;
generateDuidTemplate();
extra_opts_.clear();
if (std::thread::hardware_concurrency() == 1) {
single_thread_mode_ = true;
} else {
single_thread_mode_ = false;
}
}
bool
......@@ -221,8 +226,8 @@ CommandOptions::initialize(int argc, char** argv, bool print_cmd_line) {
// In this section we collect argument values from command line
// they will be tuned and validated elsewhere
while((opt = getopt(argc, argv, "hv46A:r:t:R:b:n:p:d:D:l:P:a:L:M:"
"s:iBc1T:X:O:o:E:S:I:x:W:w:e:f:F:")) != -1) {
while((opt = getopt(argc, argv, "hv46A:r:t:R:b:n:p:d:D:l:P:L:M:"
"s:iBc1T:X:O:o:E:S:I:x:W:w:e:f:F:g:")) != -1) {
stream << " -" << static_cast<char>(opt);
if (optarg) {
stream << " " << optarg;
......@@ -255,11 +260,6 @@ CommandOptions::initialize(int argc, char** argv, bool print_cmd_line) {
ipversion_ = 6;
break;
case 'a':
aggressivity_ = positiveInteger("value of aggressivity: -a<value>"
" must be a positive integer");
break;
case 'b':
check(base_.size() > 3, "-b<value> already specified,"
" unexpected occurrence of 5th -b<value>");
......@@ -337,6 +337,17 @@ CommandOptions::initialize(int argc, char** argv, bool print_cmd_line) {
" positive integer");
break;
case 'g': {
auto optarg_text = std::string(optarg);
if (optarg_text == "single") {
single_thread_mode_ = true;
} else if (optarg_text == "multi") {
single_thread_mode_ = false;
} else {
isc_throw(InvalidParameter, "value of thread mode (-g) '" << optarg << "' is wrong - should be '-g single' or '-g multi'");
}
break;
}
case 'h':
usage();
return (true);
......@@ -523,7 +534,7 @@ CommandOptions::initialize(int argc, char** argv, bool print_cmd_line) {
break;
default:
isc_throw(isc::InvalidParameter, "unknown command line option");
isc_throw(isc::InvalidParameter, "wrong command line option");
}
}
......@@ -866,6 +877,17 @@ CommandOptions::validate() const {
"use -I<ip-offset>");
check((!getMacListFile().empty() && base_.size() > 0),
"Can't use -b with -M option");
auto nthreads = std::thread::hardware_concurrency();
if (nthreads == 1 && isSingleThreaded() == false) {
std::cout << "WARNING: Currently system can run only 1 thread in parallel." << std::endl
<< "WARNING: Better results are achieved when run in single-threaded mode." << std::endl
<< "WARNING: To switch use -g single option." << std::endl;
} else if (nthreads > 1 && isSingleThreaded()) {
std::cout << "WARNING: Currently system can run more than 1 thread in parallel." << std::endl
<< "WARNING: Better results are achieved when run in multi-threaded mode." << std::endl
<< "WARNING: To switch use -g multi option." << std::endl;
}
}
void
......@@ -963,7 +985,6 @@ CommandOptions::printCommandLine() const {
if (preload_ != 0) {
std::cout << "preload=" << preload_ << std::endl;
}
std::cout << "aggressivity=" << aggressivity_ << std::endl;
if (getLocalPort() != 0) {
std::cout << "local-port=" << local_port_ << std::endl;
}
......@@ -1016,6 +1037,11 @@ CommandOptions::printCommandLine() const {
if (!server_name_.empty()) {
std::cout << "server=" << server_name_ << std::endl;
}
if (single_thread_mode_) {
std::cout << "single-thread-mode" << std::endl;
} else {
std::cout << "multi-thread-mode" << std::endl;
}
}
void
......@@ -1026,7 +1052,7 @@ CommandOptions::usage() const {
" [-F<release-rate>] [-t<report>] [-R<range>] [-b<base>]\n"
" [-n<num-request>] [-p<test-period>] [-d<drop-time>]\n"
" [-D<max-drop>] [-l<local-addr|interface>] [-P<preload>]\n"
" [-a<aggressivity>] [-L<local-port>] [-s<seed>] [-i] [-B]\n"
" [-L<local-port>] [-s<seed>] [-i] [-B] [-g<single/multi>]\n"
" [-W<late-exit-delay>]\n"
" [-c] [-1] [-M<mac-list-file>] [-T<template-file>]\n"
" [-X<xid-offset>] [-O<random-offset] [-E<time-offset>]\n"
......@@ -1054,8 +1080,6 @@ CommandOptions::usage() const {
"-1: Take the server-ID option from the first received message.\n"
"-4: DHCPv4 operation (default). This is incompatible with the -6 option.\n"
"-6: DHCPv6 operation. This is incompatible with the -4 option.\n"
"-a<aggressivity>: When the target sending rate is not yet reached,\n"
" control how many exchanges are initiated before the next pause.\n"
"-b<base>: The base mac, duid, IP, etc, used to simulate different\n"
" clients. This can be specified multiple times, each instance is\n"
" in the <type>=<value> form, for instance:\n"
......@@ -1079,6 +1103,10 @@ CommandOptions::usage() const {
" with the exchange rate (given by -r<rate>). Furthermore the sum of\n"
" this value and the release-rate (given by -F<rate) must be equal\n"
" to or less than the exchange rate.\n"
"-g Select thread mode: 'single' or 'multi'. In multi-thread mode packets\n"
" are received in separate thread. This allows better utilisation of CPUs."
" If more than 1 CPU is present then multi-thread mode is default otherwise"
" single-thread is default."
"-h: Print this help.\n"
"-i: Do only the initial part of an exchange: DO or SA, depending on\n"
" whether -6 is given.\n"
......
......@@ -238,11 +238,6 @@ public:
/// \return number of preload exchanges.
int getPreload() const { return preload_; }
/// \brief Returns aggressivity value.
///
/// \return aggressivity value.
int getAggressivity() const { return aggressivity_; }
/// \brief Returns local port number.
///
/// \return local port number.
......@@ -343,6 +338,11 @@ public:
/// @return container with options
const isc::dhcp::OptionCollection& getExtraOpts() const { return extra_opts_; }
/// \brief Check if single-threaded mode is enabled.
///
/// \return true if single-threaded mode is enabled.
bool isSingleThreaded() const { return single_thread_mode_; }
/// \brief Returns server name.
///
/// \return server name.
......@@ -574,9 +574,6 @@ private:
/// measurements.
int preload_;
/// Number of exchanges sent before next pause.
int aggressivity_;
/// Local port number (host endian)
int local_port_;
......@@ -646,6 +643,9 @@ private:
/// @brief Extra options to be sent in each packet.
isc::dhcp::OptionCollection extra_opts_;
/// @brief Option to switch modes between single-threaded and multi-threaded.
bool single_thread_mode_;
};
} // namespace perfdhcp
......
......@@ -33,9 +33,9 @@ main(int argc, char* argv[]) {
}
} catch(isc::Exception& e) {
ret_code = 1;
command_options.usage();
std::cerr << "Error parsing command line options: "
<< e.what() << std::endl;
command_options.usage();
if (diags.find('e') != std::string::npos) {
std::cerr << "Fatal error" << std::endl;
}
......
......@@ -47,7 +47,6 @@
<arg choice="opt" rep="norepeat"><option>-1</option></arg>
<arg choice="opt" rep="norepeat"><option>-4|-6</option></arg>
<arg choice="opt" rep="norepeat"><option>-A <replaceable class="parameter">encapsulation-level</replaceable></option></arg>
<arg choice="opt" rep="norepeat"><option>-a <replaceable class="parameter">aggressivity</replaceable></option></arg>
<arg choice="opt" rep="norepeat"><option>-b <replaceable class="parameter">base</replaceable></option></arg>
<arg choice="opt" rep="norepeat"><option>-B</option></arg>
<arg choice="opt" rep="norepeat"><option>-c</option></arg>
......@@ -57,6 +56,7 @@
<arg choice="opt" rep="norepeat"><option>-E <replaceable class="parameter">time-offset</replaceable></option></arg>
<arg choice="opt" rep="norepeat"><option>-f <replaceable class="parameter">renew-rate</replaceable></option></arg>
<arg choice="opt" rep="norepeat"><option>-F <replaceable class="parameter">release-rate</replaceable></option></arg>
<arg choice="opt" rep="norepeat"><option>-g <replaceable class="parameter">thread-mode</replaceable></option></arg>
<arg choice="opt" rep="norepeat"><option>-h</option></arg>
<arg choice="opt" rep="norepeat"><option>-i</option></arg>
<arg choice="opt" rep="norepeat"><option>-I <replaceable class="parameter">ip-offset</replaceable></option></arg>
......@@ -248,18 +248,6 @@
</listitem>
</varlistentry>
<varlistentry>
<term><option>-a <replaceable class="parameter">aggressivity</replaceable></option></term>
<listitem>
<para>
When the target sending rate is not yet reached,
control how many exchanges are initiated before the
next pause. This is a positive integer and defaults
to 1.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-b <replaceable class="parameter">basetype=value</replaceable></option></term>
<listitem>
......@@ -345,6 +333,18 @@
</listitem>
</varlistentry>
<varlistentry>
<term><option>-g <replaceable class="parameter">thread-mode</replaceable></option></term>
<listitem>
<para>
Thread operation mode can be either 'single' or 'multi'. In multi-thread mode packets
are received in separate thread. This allows better utilisation of CPUs.
In single CPU system it is better to run in 1 thread to avoid blocking of threads each other.
If more than 1 CPU is present in the system then multi-thread mode is default otherwise
single-thread is default.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-h</option></term>
......
......@@ -15,17 +15,11 @@ namespace perfdhcp {
using namespace boost::posix_time;
RateControl::RateControl()
: send_due_(currentTime()), last_sent_(currentTime()),
aggressivity_(1), rate_(0), late_sent_(false) {
: rate_(0), total_pkts_sent_count_(0) {
}
RateControl::RateControl(const int rate, const int aggressivity)
: send_due_(currentTime()), last_sent_(currentTime()),
aggressivity_(aggressivity), rate_(rate), late_sent_(false) {
if (aggressivity_ < 1) {
isc_throw(isc::BadValue, "invalid value of aggressivity "
<< aggressivity << ", expected value is greater than 0");
}
RateControl::RateControl(const int rate)
: rate_(rate), total_pkts_sent_count_(0) {
if (rate_ < 0) {
isc_throw(isc::BadValue, "invalid value of rate " << rate
<< ", expected non-negative value");
......@@ -34,48 +28,35 @@ RateControl::RateControl(const int rate, const int aggressivity)
uint64_t
RateControl::getOutboundMessageCount() {
if (total_pkts_sent_count_ == 0) {
start_time_ = currentTime();
total_pkts_sent_count_ = 1;
return 1;
}
// If rate is not limited, then each time send 1 packet.
if (getRate() == 0) {
return 1;
}
// We need calculate the due time for sending next set of messages.
updateSendDue();
// Estimate number of packets to sent. If we are behind of time we will
// try to catch up to upkeep request rate by sending more packets in one cycle.
auto now = currentTime();
time_period period(start_time_, now);
time_duration duration = period.length();
uint64_t should_sent_pkts_count = static_cast<double>(getRate()) / static_cast<double>(time_duration::ticks_per_second()) * duration.ticks();
if (should_sent_pkts_count <= total_pkts_sent_count_) {
return 0;
}
auto pending_pkts_count = should_sent_pkts_count - total_pkts_sent_count_;
// Get current time. If we are behind due time, we have to calculate
// how many messages to send to catch up with the rate.
ptime now = currentTime();
if (now >= send_due_) {
// Reset number of exchanges.
uint64_t due_exchanges = 0;
// If rate is specified from the command line we have to
// synchronize with it.
if (getRate() != 0) {
time_period period(send_due_, now);
time_duration duration = period.length();
// due_factor indicates the number of seconds that
// sending next chunk of packets will take.
double due_factor =
static_cast<double>(duration.fractional_seconds()) /
static_cast<double>(time_duration::ticks_per_second());
due_factor += static_cast<double>(duration.total_seconds());
// Multiplying due_factor by expected rate gives the number
// of exchanges to be initiated.
due_exchanges = static_cast<uint64_t>(due_factor * getRate());
// We want to make sure that at least one packet goes out.
if (due_exchanges == 0) {
due_exchanges = 1;
}
// We should not exceed aggressivity as it could have been
// restricted from command line.
if (due_exchanges > getAggressivity()) {
due_exchanges = getAggressivity();
}
} else {
// Rate is not specified so we rely on aggressivity
// which is the number of packets to be sent in
// one chunk.
due_exchanges = getAggressivity();
}
return (due_exchanges);
// Reduce bursts to have more uniform traffic.
if (pending_pkts_count > 3) {
pending_pkts_count = 3;
}
return (0);
total_pkts_sent_count_ += pending_pkts_count;
return pending_pkts_count;
}
boost::posix_time::ptime
......@@ -83,51 +64,6 @@ RateControl::currentTime() {
return (microsec_clock::universal_time());
}
void
RateControl::updateSendDue() {
// There is no sense to update due time if the current due time is in the
// future. The due time is calculated as a duration between the moment
// when the last message of the given type was sent and the time when
// next one is supposed to be sent based on a given rate. The former value
// will not change until we send the next message, which we don't do
// until we reach the due time.
if (send_due_ > currentTime()) {
return;
}
// This is initialized in the class constructor, so if it is not initialized
// it is a programmatic error.
if (last_sent_.is_not_a_date_time()) {
isc_throw(isc::Unexpected, "timestamp of the last sent packet not"
" initialized");
}
// If rate was not specified we will wait just one clock tick to
// send next packet. This simulates best effort conditions.
long duration = 1;
if (getRate() != 0) {
// We use number of ticks instead of nanoseconds because
// nanosecond resolution may not be available on some
// machines. Number of ticks guarantees the highest possible
// timer resolution.
duration = time_duration::ticks_per_second() / getRate();
}
// Calculate due time to initiate next chunk of exchanges.
send_due_ = last_sent_ + time_duration(0, 0, 0, duration);
if (send_due_ > currentTime()) {
late_sent_ = true;
} else {
late_sent_ = false;
}
}
void
RateControl::setAggressivity(const int aggressivity) {
if (aggressivity < 1) {
isc_throw(isc::BadValue, "invalid value of aggressivity "
<< aggressivity << ", expected value is greater than 0");
}
aggressivity_ = aggressivity;
}
void
RateControl::setRate(const int rate) {
if (rate < 0) {
......@@ -137,17 +73,5 @@ RateControl::setRate(const int rate) {
rate_ = rate;
}
void
RateControl::setRelativeDue(const int offset) {
send_due_ = offset > 0 ?
currentTime() + seconds(abs(offset)) :
currentTime() - seconds(abs(offset));
}
void
RateControl::updateSendTime() {
last_sent_ = currentTime();
}
} // namespace perfdhcp
} // namespace isc
......@@ -41,21 +41,10 @@ public:
/// \brief Default constructor.
RateControl();
/// \brief Constructor which sets desired rate and aggressivity.
/// \brief Constructor which sets desired rate.
///
/// \param rate A desired rate.
/// \param aggressivity A desired aggressivity.
RateControl(const int rate, const int aggressivity);
/// \brief Returns the value of aggressivity.
int getAggressivity() const {
return (aggressivity_);
}
/// \brief Returns current due time to send next message.
boost::posix_time::ptime getDue() const {
return (send_due_);
}
RateControl(const int rate);
/// \brief Returns number of messages to be sent "now".
///
......@@ -79,14 +68,6 @@ public:
/// the timer resolution). If the calculated value is equal to 0, it is
/// rounded to 1, so as at least one message is sent.
///
/// The value of aggressivity limits the maximal number of messages to
/// be sent one after another. If the number of messages calculated with
/// the equation above exceeds the aggressivity, this function will return
/// the value equal to aggressivity.
///
/// If the rate is not specified (equal to 0), the value calculated by
/// this function is equal to aggressivity.
///
/// \return A number of messages to be sent immediately.
uint64_t getOutboundMessageCount();
......@@ -95,75 +76,33 @@ public:
return (rate_);
}
/// \brief Returns the value of the late send flag.
///
/// The flag returned by this function indicates whether the new due time
/// calculated by the \c RateControl::updateSendDue is in the past.
/// This value is used by the \c TestControl object to increment the counter
/// of the late sent messages in the \c StatsMgr.
bool isLateSent() const {
return (late_sent_);
}
/// \brief Sets the value of aggressivity.
///
/// \param aggressivity A new value of aggressivity. This value must be
/// a positive integer.
/// \throw isc::BadValue if new value is not a positive integer.
void setAggressivity(const int aggressivity);
/// \brief Sets the new rate.
///
/// \param rate A new value of rate. This value must not be negative.
/// \throw isc::BadValue if new rate is negative.
void setRate(const int rate);
/// \brief Sets the value of the due time.
///
/// This function is intended for unit testing. It manipulates the value of
/// the due time. The parameter passed to this function specifies the
/// (positive or negative) number of seconds relative to current time.
///
/// \param offset A number of seconds relative to current time which
/// constitutes the new due time.
void setRelativeDue(const int offset);
/// \brief Sets the timestamp of the last sent message to current time.
void updateSendTime();
protected:
/// \brief Convenience function returning current time.
///
/// \return current time.
static boost::posix_time::ptime currentTime();
/// \brief Calculates the send due.
///
/// This function calculates the send due timestamp using the current time
/// and desired rate. The due timestamp is calculated as a sum of the
/// timestamp when the last message was sent and the reciprocal of the rate
/// in micro or nanoseconds (depending on the timer resolution). If the rate
/// is not specified, the duration between two consecutive sends is one
/// timer tick.
void updateSendDue();
/// \brief Holds a timestamp when the next message should be sent.
boost::posix_time::ptime send_due_;
/// \brief Holds a timestamp when the last message was sent.
boost::posix_time::ptime last_sent_;
/// \brief Holds an aggressivity value.
int aggressivity_;
boost::posix_time::ptime currentTime();
/// \brief Holds a desired rate value.
int rate_;
/// \brief A flag which indicates that the calculated due time is in the
/// past.