// Copyright (C) 2013-2015 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

#include <config.h>

#include <asiolink/io_service.h>
#include <asiolink/interval_timer.h>
#include <d2/d2_queue_mgr.h>
#include <dhcp_ddns/ncr_udp.h>
#include <util/time_utilities.h>

#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <gtest/gtest.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <vector>

using namespace std;
using namespace isc;
using namespace isc::dhcp_ddns;
using namespace isc::d2;

namespace {

/// @brief Defines a list of valid JSON NameChangeRequest test messages.
///
/// Each entry is one JSON object built from adjacent string literals.  In
/// order, the entries are: an add (change-type 0) for an IPv4 address, a
/// remove (change-type 1) for the same IPv4 address, and an add for an IPv6
/// address.
const char *valid_msgs[] =
{
    // Valid Add.
     "{"
     " \"change-type\" : 0 , "
     " \"forward-change\" : true , "
     " \"reverse-change\" : false , "
     " \"fqdn\" : \"walah.walah.com\" , "
     " \"ip-address\" : \"192.168.2.1\" , "
     " \"dhcid\" : \"010203040A7F8E3D\" , "
     " \"lease-expires-on\" : \"20130121132405\" , "
     " \"lease-length\" : 1300 "
     "}",
    // Valid Remove.
     "{"
     " \"change-type\" : 1 , "
     " \"forward-change\" : true , "
     " \"reverse-change\" : false , "
     " \"fqdn\" : \"walah.walah.com\" , "
     " \"ip-address\" : \"192.168.2.1\" , "
     " \"dhcid\" : \"010203040A7F8E3D\" , "
     " \"lease-expires-on\" : \"20130121132405\" , "
     " \"lease-length\" : 1300 "
     "}",
     // Valid Add with IPv6 address
     "{"
     " \"change-type\" : 0 , "
     " \"forward-change\" : true , "
     " \"reverse-change\" : false , "
     " \"fqdn\" : \"walah.walah.com\" , "
     " \"ip-address\" : \"fe80::2acf:e9ff:fe12:e56f\" , "
     " \"dhcid\" : \"010203040A7F8E3D\" , "
     " \"lease-expires-on\" : \"20130121132405\" , "
     " \"lease-length\" : 1300 "
     "}"
};

/// @brief Number of entries in valid_msgs.
///
/// Derived from the array's own element size so the count stays correct if
/// the element type of valid_msgs is ever changed.
static const int VALID_MSG_CNT = sizeof(valid_msgs) / sizeof(valid_msgs[0]);

/// @brief IP address used for both the sending and listening endpoints.
const char* const TEST_ADDRESS = "127.0.0.1";

/// @brief Port on which the queue manager's listener is opened.
const uint32_t LISTENER_PORT = 5301;

/// @brief Port from which the test sender transmits; one above the
/// listener's port.
const uint32_t SENDER_PORT = LISTENER_PORT+1;

/// @brief Interval handed to the fixture's test timer; once it elapses the
/// test is aborted.  Presumably expressed in milliseconds (i.e. 5 seconds).
const long TEST_TIMEOUT = 5 * 1000;

74 75
/// @brief Tests that construction with max queue size of zero is not allowed.
TEST(D2QueueMgrBasicTest, construction1) {
76
    asiolink::IOServicePtr io_service;
77 78 79

    // Verify that constructing with null IOServicePtr is not allowed.
    EXPECT_THROW((D2QueueMgr(io_service)), D2QueueMgrError);
80

81
    io_service.reset(new isc::asiolink::IOService());
82
    // Verify that constructing with max queue size of zero is not allowed.
83
    EXPECT_THROW(D2QueueMgr(io_service, 0), D2QueueMgrError);
84 85 86 87
}

/// @brief Tests that construction with the default queue size works.
/// This test verifies that:
/// 1. Constructing with only an IO service does not throw.
/// 2. The maximum queue size is D2QueueMgr::MAX_QUEUE_DEFAULT.
TEST(D2QueueMgrBasicTest, construction2) {
    asiolink::IOServicePtr io_service(new isc::asiolink::IOService());

    // Verify that valid constructor works.  This is an ASSERT because the
    // pointer is dereferenced below.
    D2QueueMgrPtr queue_mgr;
    ASSERT_NO_THROW(queue_mgr.reset(new D2QueueMgr(io_service)));

    // Verify queue max is defaulted correctly.
    EXPECT_EQ(D2QueueMgr::MAX_QUEUE_DEFAULT, queue_mgr->getMaxQueueSize());
}

/// @brief Tests that construction with a custom queue size works properly.
/// This test verifies that:
/// 1. Constructing with an explicit maximum queue size does not throw.
/// 2. The maximum queue size reported is the value supplied.
TEST(D2QueueMgrBasicTest, construction3) {
    asiolink::IOServicePtr io_service(new isc::asiolink::IOService());

    // Verify that custom queue size constructor works.  This is an ASSERT
    // because the pointer is dereferenced below.
    D2QueueMgrPtr queue_mgr;
    ASSERT_NO_THROW(queue_mgr.reset(new D2QueueMgr(io_service, 100)));

    // Verify queue max is the custom value.
    EXPECT_EQ(100, queue_mgr->getMaxQueueSize());
}

/// @brief Tests  QueueMgr's basic queue functions
/// This test verifies that:
/// 1. Following construction queue is empty
/// 2. Attempting to peek at an empty queue is not allowed
/// 3. Attempting to dequeue an empty queue is not allowed
113
/// 4. Peek returns the first entry on the queue without altering queue content
114
/// 5. Dequeue removes the first entry on the queue
115
TEST(D2QueueMgrBasicTest, basicQueue) {
116
    asiolink::IOServicePtr io_service(new isc::asiolink::IOService());
117 118 119 120

    // Construct the manager with max queue size set to number of messages
    // we'll use.
    D2QueueMgrPtr queue_mgr;
121 122
    ASSERT_NO_THROW(queue_mgr.reset(new D2QueueMgr(io_service, VALID_MSG_CNT)));
    ASSERT_EQ(VALID_MSG_CNT, queue_mgr->getMaxQueueSize());
123 124 125 126 127

    // Verify queue is empty after construction.
    EXPECT_EQ(0, queue_mgr->getQueueSize());

    // Verify that peek and dequeue both throw when queue is empty.
128 129
    EXPECT_THROW(queue_mgr->peek(), D2QueueMgrQueueEmpty);
    EXPECT_THROW(queue_mgr->dequeue(), D2QueueMgrQueueEmpty);
130 131 132

    // Vector to keep track of the NCRs we que.
    std::vector<NameChangeRequestPtr>ref_msgs;
133
    NameChangeRequestPtr ncr;
134 135 136 137

    // Iterate over the list of requests and add each to the queue.
    for (int i = 0; i < VALID_MSG_CNT; i++) {
        // Create the ncr and add to our reference list.
138
        ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i]));
139
        ref_msgs.push_back(ncr);
140

141 142 143 144 145 146 147
        // Verify that the request can be added to the queue and queue
        // size increments accordingly.
        EXPECT_NO_THROW(queue_mgr->enqueue(ncr));
        EXPECT_EQ(i+1, queue_mgr->getQueueSize());
    }

    // Loop through and verify that the queue contents match the
148
    // reference list.
149 150 151 152 153 154 155 156 157 158
    for (int i = 0; i < VALID_MSG_CNT; i++) {
        // Verify that peek on a non-empty queue returns first entry
        // without altering queue content.
        EXPECT_NO_THROW(ncr = queue_mgr->peek());

        // Verify the peeked entry is the one it should be.
        ASSERT_TRUE(ncr);
        EXPECT_TRUE (*(ref_msgs[i]) == *ncr);

        // Verify that peek did not alter the queue size.
159
        EXPECT_EQ(VALID_MSG_CNT - i, queue_mgr->getQueueSize());
160 161 162

        // Verify the dequeueing from non-empty queue works
        EXPECT_NO_THROW(queue_mgr->dequeue());
163 164

        // Verify queue size decrements following dequeue.
165
        EXPECT_EQ(VALID_MSG_CNT - (i + 1), queue_mgr->getQueueSize());
166
    }
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192

    // Iterate over the list of requests and add each to the queue.
    for (int i = 0; i < VALID_MSG_CNT; i++) {
        // Create the ncr and add to our reference list.
        ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i]));
        ref_msgs.push_back(ncr);
        EXPECT_NO_THROW(queue_mgr->enqueue(ncr));
    }

    // Verify queue count is correct.
    EXPECT_EQ(VALID_MSG_CNT, queue_mgr->getQueueSize());

    // Verfiy that peekAt returns the correct entry.
    EXPECT_NO_THROW(ncr = queue_mgr->peekAt(1));
    EXPECT_TRUE (*(ref_msgs[1]) == *ncr);

    // Verfiy that dequeueAt removes the correct entry.
    // Removing it, this should shift the queued entries forward by one.
    EXPECT_NO_THROW(queue_mgr->dequeueAt(1));
    EXPECT_NO_THROW(ncr = queue_mgr->peekAt(1));
    EXPECT_TRUE (*(ref_msgs[2]) == *ncr);

    // Verify the peekAt and dequeueAt throw when given indexes beyond the end.
    EXPECT_THROW(queue_mgr->peekAt(VALID_MSG_CNT + 1), D2QueueMgrInvalidIndex);
    EXPECT_THROW(queue_mgr->dequeueAt(VALID_MSG_CNT + 1),
                 D2QueueMgrInvalidIndex);
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
}

/// @brief Checks that a sent and a received NameChangeRequest match.
///
/// @param sent_ncr pointer to the request that was sent
/// @param received_ncr pointer to the request that was received
///
/// @return true only if both pointers are non-empty and the requests they
/// refer to compare equal; false otherwise.
bool checkSendVsReceived(NameChangeRequestPtr sent_ncr,
                         NameChangeRequestPtr received_ncr) {
    // An empty pointer on either side can never be a match.
    if (!sent_ncr || !received_ncr) {
        return (false);
    }

    return (*sent_ncr == *received_ncr);
}

/// @brief Text fixture that allows testing a listener and sender together
/// It derives from both the receive and send handler classes and contains
/// and instance of UDP listener and UDP sender.
class QueueMgrUDPTest : public virtual ::testing::Test,
                        NameChangeSender::RequestSendHandler {
public:
208
    asiolink::IOServicePtr io_service_;
209 210 211 212
    NameChangeSenderPtr   sender_;
    isc::asiolink::IntervalTimer test_timer_;
    D2QueueMgrPtr queue_mgr_;

213
    NameChangeSender::Result send_result_;
214 215 216
    std::vector<NameChangeRequestPtr> sent_ncrs_;
    std::vector<NameChangeRequestPtr> received_ncrs_;

217 218
    QueueMgrUDPTest() : io_service_(new isc::asiolink::IOService()),
        test_timer_(*io_service_) {
219 220
        isc::asiolink::IOAddress addr(TEST_ADDRESS);
        // Create our sender instance. Note that reuse_address is true.
221
        sender_.reset(new NameChangeUDPSender(addr, SENDER_PORT,
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
                                              addr, LISTENER_PORT,
                                              FMT_JSON, *this, 100, true));

        // Set the test timeout to break any running tasks if they hang.
        test_timer_.setup(boost::bind(&QueueMgrUDPTest::testTimeoutHandler,
                                      this),
                          TEST_TIMEOUT);
    }

    void reset_results() {
        sent_ncrs_.clear();
        received_ncrs_.clear();
    }

    /// @brief Implements the send completion handler.
    virtual void operator ()(const NameChangeSender::Result result,
                             NameChangeRequestPtr& ncr) {
        // save the result and the NCR sent.
        send_result_ = result;
        sent_ncrs_.push_back(ncr);
    }

    /// @brief Handler invoked when test timeout is hit.
    ///
    /// This callback stops all running (hanging) tasks on IO service.
    void testTimeoutHandler() {
248
        io_service_->stop();
249 250 251 252 253
        FAIL() << "Test timeout hit.";
    }
};

/// @brief Tests D2QueueMgr's state model.
254
/// This test verifies that:
255 256 257
/// 1. Upon construction, initial state is NOT_INITTED.
/// 2. Cannot start listening from while state is NOT_INITTED.
/// 3. Successful listener initialization transitions from NOT_INITTED
258
/// to INITTED.
259 260 261 262 263
/// 4. Attempting to initialize the listener from INITTED state is not
/// allowed.
/// 5. Starting listener from INITTED transitions to RUNNING.
/// 6. Stopping the  listener transitions from RUNNING to STOPPED.
/// 7. Starting listener from STOPPED transitions to RUNNING.
264
TEST_F (QueueMgrUDPTest, stateModel) {
265
    // Create the queue manager.
266
    ASSERT_NO_THROW(queue_mgr_.reset(new D2QueueMgr(io_service_,
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
                                     VALID_MSG_CNT)));

    // Verify that the initial state is NOT_INITTED.
    EXPECT_EQ(D2QueueMgr::NOT_INITTED, queue_mgr_->getMgrState());

    // Verify that trying to listen before when not initialized fails.
    EXPECT_THROW(queue_mgr_->startListening(), D2QueueMgrError);

    // Verify that initializing the listener moves us to INITTED state.
    isc::asiolink::IOAddress addr(TEST_ADDRESS);
    EXPECT_NO_THROW(queue_mgr_->initUDPListener(addr, LISTENER_PORT,
                                              FMT_JSON, true));
    EXPECT_EQ(D2QueueMgr::INITTED, queue_mgr_->getMgrState());

    // Verify that attempting to initialize the listener, from INITTED
    // is not allowed.
    EXPECT_THROW(queue_mgr_->initUDPListener(addr, LISTENER_PORT,
284
                                              FMT_JSON, true),
285
                 D2QueueMgrError);
286

287 288 289 290 291
    // Verify that we can enter the RUNNING from INITTED by starting the
    // listener.
    EXPECT_NO_THROW(queue_mgr_->startListening());
    EXPECT_EQ(D2QueueMgr::RUNNING, queue_mgr_->getMgrState());

292
    // Verify that we can move from RUNNING to STOPPING by stopping the
293 294
    // listener.
    EXPECT_NO_THROW(queue_mgr_->stopListening());
295 296 297 298
    EXPECT_EQ(D2QueueMgr::STOPPING, queue_mgr_->getMgrState());

    // Stopping requires IO cancel, which result in a callback.
    // So process one event and verify we are STOPPED.
299
    io_service_->run_one();
300 301 302 303 304 305 306 307 308 309 310 311
    EXPECT_EQ(D2QueueMgr::STOPPED, queue_mgr_->getMgrState());

    // Verify that we can re-enter the RUNNING from STOPPED by starting the
    // listener.
    EXPECT_NO_THROW(queue_mgr_->startListening());
    EXPECT_EQ(D2QueueMgr::RUNNING, queue_mgr_->getMgrState());

    // Verify that we cannot remove the listener in the RUNNING state
    EXPECT_THROW(queue_mgr_->removeListener(), D2QueueMgrError);

    // Stop the listener.
    EXPECT_NO_THROW(queue_mgr_->stopListening());
312 313 314 315
    EXPECT_EQ(D2QueueMgr::STOPPING, queue_mgr_->getMgrState());

    // Stopping requires IO cancel, which result in a callback.
    // So process one event and verify we are STOPPED.
316
    io_service_->run_one();
317 318 319 320 321 322 323 324
    EXPECT_EQ(D2QueueMgr::STOPPED, queue_mgr_->getMgrState());

    // Verify that we can remove the listener in the STOPPED state and
    // end up back in NOT_INITTED.
    EXPECT_NO_THROW(queue_mgr_->removeListener());
    EXPECT_EQ(D2QueueMgr::NOT_INITTED, queue_mgr_->getMgrState());
}

325 326
/// @brief Tests D2QueueMgr's ability to manage received requests
/// This test verifies that:
327 328 329 330 331 332 333 334 335
/// 1. Requests can be received, queued, and dequeued
/// 2. Once the queue is full, a subsequent request transitions
/// manager to STOPPED_QUEUE_FULL state.
/// 3. Starting listener returns manager to the RUNNING state.
/// 4. Queue contents are preserved across state transitions.
/// 5. Clearing the queue via the clearQueue() method works.
/// 6. Requests can be received and queued normally after the queue
/// has been emptied.
/// 7. setQueueMax disallows values of 0 or less than current queue size.
336
TEST_F (QueueMgrUDPTest, liveFeed) {
337 338 339 340
    NameChangeRequestPtr send_ncr;
    NameChangeRequestPtr received_ncr;

    // Create the queue manager and start listening..
341
    ASSERT_NO_THROW(queue_mgr_.reset(new D2QueueMgr(io_service_,
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357
                                                    VALID_MSG_CNT)));
    ASSERT_EQ(D2QueueMgr::NOT_INITTED, queue_mgr_->getMgrState());

    // Verify that setting max queue size to 0 is not allowed.
    EXPECT_THROW(queue_mgr_->setMaxQueueSize(0), D2QueueMgrError);
    EXPECT_EQ(VALID_MSG_CNT, queue_mgr_->getMaxQueueSize());

    isc::asiolink::IOAddress addr(TEST_ADDRESS);
    ASSERT_NO_THROW(queue_mgr_->initUDPListener(addr, LISTENER_PORT,
                                              FMT_JSON, true));
    ASSERT_EQ(D2QueueMgr::INITTED, queue_mgr_->getMgrState());

    ASSERT_NO_THROW(queue_mgr_->startListening());
    ASSERT_EQ(D2QueueMgr::RUNNING, queue_mgr_->getMgrState());

    // Place the sender into sending state.
358
    ASSERT_NO_THROW(sender_->startSending(*io_service_));
359 360 361 362 363 364
    ASSERT_TRUE(sender_->amSending());

    // Iterate over the list of requests sending and receiving
    // each one.  Verify and dequeue as they arrive.
    for (int i = 0; i < VALID_MSG_CNT; i++) {
        // Create the ncr and add to our reference list.
365
        ASSERT_NO_THROW(send_ncr = NameChangeRequest::fromJSON(valid_msgs[i]));
366 367 368
        ASSERT_NO_THROW(sender_->sendRequest(send_ncr));

        // running two should do the send then the receive
369 370
        io_service_->run_one();
        io_service_->run_one();
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388

        // Verify that the request can be added to the queue and queue
        // size increments accordingly.
        EXPECT_EQ(1, queue_mgr_->getQueueSize());

        // Verify that peek shows the NCR we just sent
        EXPECT_NO_THROW(received_ncr = queue_mgr_->peek());
        EXPECT_TRUE(checkSendVsReceived(send_ncr, received_ncr));

        // Verify that we and dequeue the request.
        EXPECT_NO_THROW(queue_mgr_->dequeue());
        EXPECT_EQ(0, queue_mgr_->getQueueSize());
    }

    // Iterate over the list of requests, sending and receiving
    // each one. Allow them to accumulate in the queue.
    for (int i = 0; i < VALID_MSG_CNT; i++) {
        // Create the ncr and add to our reference list.
389
        ASSERT_NO_THROW(send_ncr = NameChangeRequest::fromJSON(valid_msgs[i]));
390 391 392
        ASSERT_NO_THROW(sender_->sendRequest(send_ncr));

        // running two should do the send then the receive
393 394
        EXPECT_NO_THROW(io_service_->run_one());
        EXPECT_NO_THROW(io_service_->run_one());
395 396 397 398 399 400 401 402
        EXPECT_EQ(i+1, queue_mgr_->getQueueSize());
    }

    // Verify that the queue is at max capacity.
    EXPECT_EQ(queue_mgr_->getMaxQueueSize(), queue_mgr_->getQueueSize());

    // Send another. The send should succeed.
    ASSERT_NO_THROW(sender_->sendRequest(send_ncr));
403
    EXPECT_NO_THROW(io_service_->run_one());
404 405 406

    // Now execute the receive which should not throw but should move us
    // to STOPPED_QUEUE_FULL state.
407
    EXPECT_NO_THROW(io_service_->run_one());
408 409
    EXPECT_EQ(D2QueueMgr::STOPPED_QUEUE_FULL, queue_mgr_->getMgrState());

410
    // Verify queue size did not increase beyond max.
411 412 413
    EXPECT_EQ(VALID_MSG_CNT, queue_mgr_->getQueueSize());

    // Verify that setting max queue size to a value less than current size of
414
    // the queue is not allowed.
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432
    EXPECT_THROW(queue_mgr_->setMaxQueueSize(VALID_MSG_CNT-1), D2QueueMgrError);
    EXPECT_EQ(VALID_MSG_CNT, queue_mgr_->getQueueSize());

    // Verify that we can re-enter RUNNING from STOPPED_QUEUE_FULL.
    EXPECT_NO_THROW(queue_mgr_->startListening());
    EXPECT_EQ(D2QueueMgr::RUNNING, queue_mgr_->getMgrState());

    // Verify that the queue contents were preserved.
    EXPECT_EQ(queue_mgr_->getMaxQueueSize(), queue_mgr_->getQueueSize());

    // Verify that clearQueue works.
    EXPECT_NO_THROW(queue_mgr_->clearQueue());
    EXPECT_EQ(0, queue_mgr_->getQueueSize());


    // Verify that we can again receive requests.
    // Send should be fine.
    ASSERT_NO_THROW(sender_->sendRequest(send_ncr));
433
    EXPECT_NO_THROW(io_service_->run_one());
434 435

    // Receive should succeed.
436
    EXPECT_NO_THROW(io_service_->run_one());
437 438 439 440
    EXPECT_EQ(1, queue_mgr_->getQueueSize());
}

} // end of anonymous namespace