Commit 4b87d904 authored by Thomas Markwalder's avatar Thomas Markwalder
Browse files

[3221] Added use of dhcp_ddns::WatchSocket and additonal queue methods

Integrated use of WatchSocket to NameChangeSender and NameChangeUDPSender.
Added addtional methods for accessing sender's send queue.
Created include and source for common dhcp_ddns lib test components.
parent 1ebd0b78
......@@ -23,7 +23,7 @@ namespace dhcp_ddns {
NameChangeProtocol stringToNcrProtocol(const std::string& protocol_str) {
if (boost::iequals(protocol_str, "UDP")) {
return (NCR_UDP);
}
}
if (boost::iequals(protocol_str, "TCP")) {
return (NCR_TCP);
......@@ -162,10 +162,7 @@ NameChangeSender::NameChangeSender(RequestSendHandler& send_handler,
send_queue_max_(send_queue_max) {
// Queue size must be big enough to hold at least 1 entry.
if (send_queue_max == 0) {
isc_throw(NcrSenderError, "NameChangeSender constructor"
" queue size must be greater than zero");
}
setQueueMaxSize(send_queue_max);
}
void
......@@ -318,5 +315,57 @@ NameChangeSender::clearSendQueue() {
send_queue_.clear();
}
void
NameChangeSender::setQueueMaxSize(const size_t new_max) {
if (new_max == 0) {
isc_throw(NcrSenderError, "NameChangeSender:"
" queue size must be greater than zero");
}
send_queue_max_ = new_max;
}
const NameChangeRequestPtr&
NameChangeSender::peekAt(const size_t index) const {
if (index >= getQueueSize()) {
isc_throw(NcrSenderError,
"NameChangeSender::peekAt peek beyond end of queue attempted"
<< " index: " << index << " queue size: " << getQueueSize());
}
return (send_queue_.at(index));
}
void
NameChangeSender::assumeQueue(NameChangeSender& sourceSender) {
if (sourceSender.amSending()) {
isc_throw(NcrSenderError, "Cannot assume queue:"
" source sender is actively sending");
}
if (amSending()) {
isc_throw(NcrSenderError, "Cannot assume queue:"
" target sender is actively sending");
}
if (getQueueMaxSize() < sourceSender.getQueueSize()) {
isc_throw(NcrSenderError, "Cannot assume queue:"
" source queue count exceeds target queue max");
}
if (send_queue_.size() != 0) {
isc_throw(NcrSenderError, "Cannot assume queue:"
" target queue is not empty");
}
send_queue_.swap(sourceSender.getSendQueue());
}
int
NameChangeSender::getSelectFd() {
isc_throw(NotImplemented, "NameChangeSender::getSelectFd is not supported");
}
} // namespace isc::dhcp_ddns
} // namespace isc
......@@ -547,6 +547,36 @@ public:
/// capacity.
void sendRequest(NameChangeRequestPtr& ncr);
/// @brief Move all queued requests from a given sender into the send queue
///
/// Moves all of the entries in the given sender's queue and places them
/// into send queue. This provides a mechanism of reassigning queued
/// messages from one sender to another. This is useful for dealing with
/// dynamic configuration changes.
///
/// @param Sender from whom the queued messages will be taken
///
/// @throw NcrSenderError if either sender is in send mode, if the number of
/// messages in the source sender's queue is larger than this sender's
/// maxium queue size, or if this sender's queue is not empty.
void assumeQueue(NameChangeSender& fromSender);
/// @brief Returns a file description suitable for use with select
///
/// The value returned is an open file descriptor which can be used with
/// select() system call to monitor the sender for IO events. This allows
/// NameChangeSenders to be used in applications which use select, rather
/// than IOService to wait for IO events to occur.
///
/// @note Attempting other use of this value may lead to unpredictable
/// behavior in the sender.
///
/// @return Returns an "open" file descriptor
///
/// @throw NcrSenderError if the sender is not in send mode,
/// NotImplemented if the implementation does not support such an fd.
virtual int getSelectFd();
protected:
/// @brief Dequeues and sends the next request on the send queue.
///
......@@ -659,11 +689,39 @@ public:
return (send_queue_max_);
}
/// @brief Sets the maxium queue size to the given value.
///
/// Sets the maximum number of entries allowed in the queue to the
/// the given value.
///
/// @param new_max the new value to use as the maximum
///
/// @throw NcrSenderError if the value is less than one.
void setQueueMaxSize(const size_t new_max);
/// @brief Returns the number of entries currently in the send queue.
size_t getQueueSize() const {
return (send_queue_.size());
}
/// @brief Returns the entry at a given position in the queue.
///
/// Note that the entry is not removed from the queue.
/// @param index the index of the entry in the queue to fetch.
/// Valid values are 0 (front of the queue) to (queue size - 1).
///
/// @return Pointer reference to the queue entry.
///
/// @throw NcrSenderError if the given index is beyond the
/// end of the queue.
const NameChangeRequestPtr& peekAt(const size_t index) const;
protected:
/// @brief Returns a reference to the send queue.
SendQueue& getSendQueue() {
return (send_queue_);
}
private:
/// @brief Sets the sending indicator to the given value.
///
......
......@@ -256,6 +256,8 @@ NameChangeUDPSender::open(isc::asiolink::IOService& io_service) {
server_port_));
send_callback_->setDataSource(server_endpoint_);
watch_socket_.reset(new WatchSocket());
}
void
......@@ -282,6 +284,8 @@ NameChangeUDPSender::close() {
}
socket_.reset();
watch_socket_.reset();
}
void
......@@ -298,11 +302,17 @@ NameChangeUDPSender::doSend(NameChangeRequestPtr& ncr) {
// Call the socket's asychronous send, passing our callback
socket_->asyncSend(send_callback_->getData(), send_callback_->getPutLen(),
send_callback_->getDataSource().get(), *send_callback_);
// Set IO ready marker so sender activity is visible to select() or poll().
watch_socket_->markReady();
}
void
NameChangeUDPSender::sendCompletionHandler(const bool successful,
const UDPCallback *send_callback) {
// Clear the IO ready marker.
watch_socket_->clearReady();
Result result;
if (successful) {
result = SUCCESS;
......@@ -324,5 +334,17 @@ NameChangeUDPSender::sendCompletionHandler(const bool successful,
// Call the application's registered request send handler.
invokeSendHandler(result);
}
int
NameChangeUDPSender::getSelectFd() {
if (!amSending()) {
isc_throw(NotImplemented, "NameChangeUDPSender::getSelectFd"
" not in send mode");
}
return(watch_socket_->getSelectFd());
}
}; // end of isc::dhcp_ddns namespace
}; // end of isc namespace
......@@ -112,10 +112,12 @@
#include <asiolink/udp_endpoint.h>
#include <asiolink/udp_socket.h>
#include <dhcp_ddns/ncr_io.h>
#include <dhcp_ddns/watch_socket.h>
#include <util/buffer.h>
#include <boost/shared_array.hpp>
/// responsibility of the completion handler to perform the steps necessary
/// to interpret the raw data provided by the service outcome. The
/// UDPCallback operator implementation is mostly a pass through.
......@@ -524,6 +526,21 @@ public:
void sendCompletionHandler(const bool successful,
const UDPCallback* send_callback);
/// @brief Returns a file description suitable for use with select
///
/// The value returned is an open file descriptor which can be used with
/// select() system call to monitor the sender for IO events. This allows
/// NameChangeUDPSenders to be used in applications which use select,
/// rather than IOService to wait for IO events to occur.
///
/// @note Attempting other use of this value may lead to unpredictable
/// behavior in the sender.
///
/// @return Returns an "open" file descriptor
///
/// @throw NcrSenderError if the sender is not in send mode,
virtual int getSelectFd();
private:
/// @brief IP address from which to send.
isc::asiolink::IOAddress ip_address_;
......@@ -554,6 +571,8 @@ private:
/// @brief Flag which enables the reuse address socket option if true.
bool reuse_address_;
WatchSocketPtr watch_socket_;
};
} // namespace isc::dhcp_ddns
......
......@@ -29,6 +29,7 @@ TESTS += libdhcp_ddns_unittests
libdhcp_ddns_unittests_SOURCES = run_unittests.cc
libdhcp_ddns_unittests_SOURCES += ncr_unittests.cc
libdhcp_ddns_unittests_SOURCES += ncr_udp_unittests.cc
libdhcp_ddns_unittests_SOURCES += test_utils.cc test_utils.h
libdhcp_ddns_unittests_SOURCES += watch_socket_unittests.cc
libdhcp_ddns_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES) $(LOG4CPLUS_INCLUDES)
......
......@@ -16,6 +16,7 @@
#include <dhcp_ddns/ncr_io.h>
#include <dhcp_ddns/ncr_udp.h>
#include <util/time_utilities.h>
#include <test_utils.h>
#include <asio/ip/udp.hpp>
#include <boost/function.hpp>
......@@ -23,6 +24,8 @@
#include <gtest/gtest.h>
#include <algorithm>
#include <sys/select.h>
using namespace std;
using namespace isc;
using namespace isc::dhcp_ddns;
......@@ -68,6 +71,7 @@ const char *valid_msgs[] =
};
const char* TEST_ADDRESS = "127.0.0.1";
//const char* TEST_ADDRESS = "192.0.2.10";
const uint32_t LISTENER_PORT = 5301;
const uint32_t SENDER_PORT = LISTENER_PORT+1;
const long TEST_TIMEOUT = 5 * 1000;
......@@ -113,6 +117,7 @@ TEST(NameChangeUDPListenerBasicTest, basicListenTests) {
// Verify that we can start listening.
EXPECT_NO_THROW(listener->startListening(io_service));
// Verify that we are in listening mode.
EXPECT_TRUE(listener->amListening());
// Verify that a read is in progress.
......@@ -310,8 +315,8 @@ TEST(NameChangeUDPSenderBasicTest, constructionTests) {
/// @brief Tests NameChangeUDPSender basic send functionality
/// This test verifies that:
TEST(NameChangeUDPSenderBasicTest, basicSendTests) {
isc::asiolink::IOAddress ip_address(TEST_ADDRESS);
uint32_t port = SENDER_PORT;
isc::asiolink::IOAddress ip_address("127.0.0.1");
uint32_t port = 5301;
isc::asiolink::IOService io_service;
SimpleSendHandler ncr_handler;
......@@ -320,7 +325,8 @@ TEST(NameChangeUDPSenderBasicTest, basicSendTests) {
// Create the sender, setting the queue max equal to the number of
// messages we will have in the list.
NameChangeUDPSender sender(ip_address, port, ip_address, port,
isc::asiolink::IOAddress any("0.0.0.0");
NameChangeUDPSender sender(any, 0, ip_address, port,
FMT_JSON, ncr_handler, num_msgs);
// Verify that we can start sending.
......@@ -341,30 +347,55 @@ TEST(NameChangeUDPSenderBasicTest, basicSendTests) {
EXPECT_NO_THROW(sender.startSending(io_service));
EXPECT_TRUE(sender.amSending());
// Fetch the sender's select-fd.
int select_fd = sender.getSelectFd();
// Verify select_fd is valid and currently shows no ready to read.
ASSERT_NE(dhcp_ddns::WatchSocket::INVALID_SOCKET, select_fd);
ASSERT_EQ(0, selectCheck(select_fd));
// Iterate over a series of messages, sending each one. Since we
// do not invoke IOService::run, then the messages should accumulate
// in the queue.
NameChangeRequestPtr ncr;
NameChangeRequestPtr ncr2;
for (int i = 0; i < num_msgs; i++) {
ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i]));
EXPECT_NO_THROW(sender.sendRequest(ncr));
// Verify that the queue count increments in step with each send.
EXPECT_EQ(i+1, sender.getQueueSize());
// Verify that peekAt(i) returns the NCR we just added.
ASSERT_NO_THROW(ncr2 = sender.peekAt(i));
ASSERT_TRUE(ncr2);
EXPECT_TRUE(*ncr == *ncr2);
}
// Verify that attempting to peek beyond the end of the queue, throws.
ASSERT_THROW(sender.peekAt(sender.getQueueSize()+1), NcrSenderError);
// Verify that attempting to send an additional message results in a
// queue full exception.
EXPECT_THROW(sender.sendRequest(ncr), NcrSenderQueueFull);
// Loop for the number of valid messages and invoke IOService::run_one.
// This should send exactly one message and the queue count should
// decrement accordingly.
// Loop for the number of valid messages. So long as there is at least
// on NCR in the queue, select-fd indicate ready to read. Invoke
// IOService::run_one. This should complete the send of exactly one
// message and the queue count should decrement accordingly.
for (int i = num_msgs; i > 0; i--) {
// Verify that sender shows IO ready.
ASSERT_TRUE(selectCheck(select_fd) > 0);
// Execute at one ready handler.
io_service.run_one();
// Verify that the queue count decrements in step with each run.
EXPECT_EQ(i-1, sender.getQueueSize());
}
// Verify that sender shows no IO ready.
EXPECT_EQ(0, selectCheck(select_fd));
// Verify that the queue is empty.
EXPECT_EQ(0, sender.getQueueSize());
......@@ -395,6 +426,72 @@ TEST(NameChangeUDPSenderBasicTest, basicSendTests) {
EXPECT_EQ(0, sender.getQueueSize());
}
/// @brief Test the NameChangeSender::assumeQueue method.
TEST(NameChangeSender, assumeQueue) {
isc::asiolink::IOAddress ip_address(TEST_ADDRESS);
uint32_t port = SENDER_PORT;
isc::asiolink::IOService io_service;
SimpleSendHandler ncr_handler;
NameChangeRequestPtr ncr;
// Tests are based on a list of messages, get the count now.
int num_msgs = sizeof(valid_msgs)/sizeof(char*);
// Create two senders with queue max equal to the number of
// messages we will have in the list.
NameChangeUDPSender sender1(ip_address, port, ip_address, port,
FMT_JSON, ncr_handler, num_msgs);
NameChangeUDPSender sender2(ip_address, port+1, ip_address, port,
FMT_JSON, ncr_handler, num_msgs);
// Place sender1 into send mode and queue up messages.
ASSERT_NO_THROW(sender1.startSending(io_service));
for (int i = 0; i < num_msgs; i++) {
ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i]));
ASSERT_NO_THROW(sender1.sendRequest(ncr));
}
// Make sure sender1's queue count is as expected.
ASSERT_EQ(num_msgs, sender1.getQueueSize());
// Verify sender1 is sending, sender2 is not.
ASSERT_TRUE(sender1.amSending());
ASSERT_FALSE(sender2.amSending());
// Transfer from sender1 to sender2 should fail because
// sender1 is in send mode.
ASSERT_THROW(sender2.assumeQueue(sender1), NcrSenderError);
// Take sender1 out of send mode.
ASSERT_NO_THROW(sender1.stopSending());
ASSERT_FALSE(sender1.amSending());
// Transfer should succeed. Verify sender1 has none,
// and sender2 has num_msgs queued.
EXPECT_NO_THROW(sender2.assumeQueue(sender1));
EXPECT_EQ(0, sender1.getQueueSize());
EXPECT_EQ(num_msgs, sender2.getQueueSize());
// Reduce sender1's max queue size.
ASSERT_NO_THROW(sender1.setQueueMaxSize(num_msgs - 1));
// Transfer should fail as sender1's queue is not large enough.
ASSERT_THROW(sender1.assumeQueue(sender2), NcrSenderError);
// Place sender1 into send mode and queue up a message.
ASSERT_NO_THROW(sender1.startSending(io_service));
ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[0]));
ASSERT_NO_THROW(sender1.sendRequest(ncr));
// Take sender1 out of send mode.
ASSERT_NO_THROW(sender1.stopSending());
// Try to transfer from sender1 to sender2. This should fail
// as sender2's queue is not empty.
ASSERT_THROW(sender2.assumeQueue(sender1), NcrSenderError);
}
/// @brief Text fixture that allows testing a listener and sender together
/// It derives from both the receive and send handler classes and contains
/// and instance of UDP listener and UDP sender.
......
// Copyright (C) 2014 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#include <gtest/gtest.h>
#include <sys/select.h>
#include <sys/ioctl.h>
using namespace std;
namespace isc {
namespace dhcp_ddns {
int selectCheck(int fd_to_check) {
fd_set read_fds;
int maxfd = 0;
FD_ZERO(&read_fds);
// Add this socket to listening set
FD_SET(fd_to_check, &read_fds);
maxfd = fd_to_check;
struct timeval select_timeout;
select_timeout.tv_sec = 0;
select_timeout.tv_usec = 0;
return (select(maxfd + 1, &read_fds, NULL, NULL, &select_timeout));
}
}; // namespace isc::d2
}; // namespace isc
// Copyright (C) 2014 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#ifndef TEST_UTILS_H
#define TEST_UTILS_H
/// @file test_utils.h Common dhcp_ddns testing elements
#include <gtest/gtest.h>
namespace isc {
namespace dhcp_ddns {
/// @brief Returns the result of select() given an fd to check for read status.
///
/// @param fd_to_check The file descriptor to test
///
/// @return Returns less than one on an error, 0 if the fd is not ready to
/// read, > 0 if it is ready to read.
int selectCheck(int fd_to_check);
}; // namespace isc::dhcp_ddns;
}; // namespace isc;
#endif
......@@ -13,6 +13,7 @@
// PERFORMANCE OF THIS SOFTWARE.
#include <dhcp_ddns/watch_socket.h>
#include <test_utils.h>
#include <gtest/gtest.h>
......@@ -25,31 +26,6 @@ using namespace isc::dhcp_ddns;
namespace {
/// @brief Returns the result of select() given an fd to check for read status.
///
/// @param fd_to_check The file descriptor to test
/// @return Returns less than one on an error, 0 if the fd is not ready to
/// read, > 0 if it is ready to read.
int selectCheck(int fd_to_check).
int selectCheck(int fd_to_check) {
fd_set read_fds;
int maxfd = 0;
FD_ZERO(&read_fds);
// Add this socket to listening set
FD_SET(fd_to_check, &read_fds);
maxfd = fd_to_check;
struct timeval select_timeout;
select_timeout.tv_sec = 0;
select_timeout.tv_usec = 0;
return (select(maxfd + 1, &read_fds, NULL, NULL, &select_timeout));
}
/// @brief Tests the basic functionality of WatchSocket.
TEST(WatchSocketTest, basics) {
WatchSocketPtr watch;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment