// Copyright (C) 2013-2017 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

7
#include <config.h>
8 9 10 11 12 13 14 15 16 17
#include <d2/d2_log.h>
#include <d2/d2_queue_mgr.h>
#include <dhcp_ddns/ncr_udp.h>

namespace isc {
namespace d2 {

// Namespace-scope definition of the class constant MAX_QUEUE_DEFAULT.
// Makes constant visible to Google test macros.
// NOTE(review): presumably the in-class declaration carries the initializer
// and the test macros bind the value by reference, which requires this
// definition -- confirm against the header.
const size_t D2QueueMgr::MAX_QUEUE_DEFAULT;

18
// Constructs a queue manager whose current state and target stop state are
// both NOT_INITTED.
//
// io_service      IO service handle the listener will later run under;
//                 must not be null.
// max_queue_size  maximum number of requests the queue may hold; it is
//                 validated by setMaxQueueSize().
//
// Throws D2QueueMgrError if io_service is null or if setMaxQueueSize()
// rejects max_queue_size.
D2QueueMgr::D2QueueMgr(asiolink::IOServicePtr& io_service, const size_t max_queue_size)
    : io_service_(io_service),
      max_queue_size_(max_queue_size),
      mgr_state_(NOT_INITTED),
      target_stop_state_(NOT_INITTED) {
    // startListening() dereferences the IO service, so refuse a null one
    // right away.
    if (!io_service_) {
        isc_throw(D2QueueMgrError, "IOServicePtr cannot be null");
    }

    // Route the size through the setter so that it gets validated.
    setMaxQueueSize(max_queue_size);
}

// Destructor.  Performs no explicit work of its own.
D2QueueMgr::~D2QueueMgr() {
    // Intentionally empty.
}

// Receive-completion handler invoked with the outcome of a listener receive.
//
// result  status reported for the receive.
// ncr     the received request; it is queued only when result is SUCCESS
//         and the queue has room.
//
// This is the application level "link" in the callback chain, so error
// conditions must be handled here without throwing: an exception escaping
// from here would "break" the io_service "run" we are operating under.
// On a problem the listener is stopped and the manager moves to the
// matching stopped state.  Upper layer(s) must monitor our state as well
// as our queue size.
void
D2QueueMgr::operator()(const dhcp_ddns::NameChangeListener::Result result,
                       dhcp_ddns::NameChangeRequestPtr& ncr) {
    try {
        if (result == dhcp_ddns::NameChangeListener::SUCCESS) {
            // Receive was successful, attempt to queue the request.
            if (getQueueSize() >= getMaxQueueSize()) {
                // Queue is full, stop the listener.  We can go straight
                // to a STOPPED state as there is no receive in progress.
                LOG_ERROR(dhcp_to_d2_logger, DHCP_DDNS_QUEUE_MGR_QUEUE_FULL)
                          .arg(max_queue_size_);
                stopListening(STOPPED_QUEUE_FULL);
            } else {
                // There is room: append the request, then log its arrival.
                enqueue(ncr);
                LOG_DEBUG(dhcp_to_d2_logger, DBGLVL_TRACE_DETAIL_DATA,
                          DHCP_DDNS_QUEUE_MGR_QUEUE_RECEIVE)
                          .arg(ncr->getRequestId());
            }
        } else if (result == dhcp_ddns::NameChangeListener::STOPPED) {
            if (mgr_state_ != STOPPING) {
                // A "stopped" completion is only expected when we canceled
                // the read as part of stopping.  Anything else is most
                // likely an unforeseen programmatic issue, so treat it as
                // a receive error.
                LOG_ERROR(dhcp_to_d2_logger, DHCP_DDNS_QUEUE_MGR_UNEXPECTED_STOP)
                          .arg(mgr_state_);
                stopListening(STOPPED_RECV_ERROR);
            } else {
                // Confirmation that the listener has stopped; its callback
                // will not be called again unless it is restarted.
                updateStopState();
            }
        } else {
            // Receive failed, stop the listener.  We can go straight to a
            // STOPPED state as there is no receive in progress.
            LOG_ERROR(dhcp_to_d2_logger, DHCP_DDNS_QUEUE_MGR_RECV_ERROR);
            stopListening(STOPPED_RECV_ERROR);
        }
    } catch (const std::exception& ex) {
        // On the outside chance a throw occurs, log it and swallow it.
        LOG_ERROR(dhcp_to_d2_logger, DHCP_DDNS_QUEUE_MGR_UNEXPECTED_HANDLER_ERROR)
                  .arg(ex.what());
    }
}

// Creates the UDP listener and moves the manager to the INITTED state.
//
// ip_address, port, format and reuse_address are handed unchanged to the
// NameChangeUDPListener constructor; this manager is passed along as well
// (as *this).
//
// Throws D2QueueMgrError if a listener already exists.
void
D2QueueMgr::initUDPListener(const isc::asiolink::IOAddress& ip_address,
                            const uint32_t port,
                            const dhcp_ddns::NameChangeFormat format,
                            const bool reuse_address) {
    // Only one listener at a time; it has to be removed before a new one
    // can be set up.
    if (listener_) {
        isc_throw(D2QueueMgrError, "D2QueueMgr listener is already initialized");
    }

    // Instantiate the UDP listener, then record the new state.
    // Note UDP listener constructor does not throw.
    listener_.reset(
        new dhcp_ddns::NameChangeUDPListener(ip_address, port, format,
                                             *this, reuse_address));
    mgr_state_ = INITTED;
}

void
D2QueueMgr::startListening() {
    // We can't listen if we haven't initialized the listener yet.
    if (!listener_) {
        isc_throw(D2QueueMgrError, "D2QueueMgr "
                  "listener is not initialized, cannot start listening");
    }

    // If we are already listening, we do not want to "reopen" the listener
    // and really we shouldn't be trying.
    if (mgr_state_ == RUNNING) {
        isc_throw(D2QueueMgrError, "D2QueueMgr "
                  "cannot call startListening from the RUNNING state");
    }

    // Instruct the listener to start listening and set state accordingly.
    try {
133
        listener_->startListening(*io_service_);
134 135 136 137 138
        mgr_state_ = RUNNING;
    } catch (const isc::Exception& ex) {
        isc_throw(D2QueueMgrError, "D2QueueMgr listener start failed: "
                  << ex.what());
    }
139

140
    LOG_DEBUG(d2_logger, DBGLVL_START_SHUT, DHCP_DDNS_QUEUE_MGR_STARTED);
141 142 143
}

void
144
D2QueueMgr::stopListening(const State target_stop_state) {
145
    if (listener_) {
146
        // Enforce only valid "stop" states.
147
        // This is purely a programmatic error and should never happen.
148 149 150 151 152 153 154 155
        if (target_stop_state != STOPPED &&
            target_stop_state != STOPPED_QUEUE_FULL &&
            target_stop_state != STOPPED_RECV_ERROR) {
            isc_throw(D2QueueMgrError,
                      "D2QueueMgr invalid value for stop state: "
                      << target_stop_state);
        }

156
        // Remember the state we want to achieve.
157 158 159 160 161 162 163 164 165 166 167 168
        target_stop_state_ = target_stop_state;

        // Instruct the listener to stop.  If the listener reports that  it
        // has IO pending, then we transition to STOPPING to wait for the
        // cancellation event.  Otherwise, we can move directly to the targeted
        // state.
        listener_->stopListening();
        if (listener_->isIoPending()) {
            mgr_state_ = STOPPING;
        } else {
            updateStopState();
        }
169
    }
170
}
171

172 173 174
void
D2QueueMgr::updateStopState() {
    mgr_state_ = target_stop_state_;
175
    LOG_DEBUG(d2_logger, DBGLVL_TRACE_BASIC, DHCP_DDNS_QUEUE_MGR_STOPPED);
176 177
}

178

179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
// Releases the listener and returns the manager to the NOT_INITTED state.
//
// Throws D2QueueMgrError if the manager is RUNNING.
void
D2QueueMgr::removeListener() {
    // The managing layer(s) have to stop us properly before the listener
    // may be removed.
    if (mgr_state_ == RUNNING) {
        isc_throw(D2QueueMgrError,
                  "D2QueueMgr cannot delete listener while state is RUNNING");
    }

    // Drop our reference first, then record that no listener is set up.
    listener_.reset();
    mgr_state_ = NOT_INITTED;
}

// Returns a reference to the request at the front of the queue without
// removing it.
//
// Throws D2QueueMgrQueueEmpty if the queue holds no entries.
const dhcp_ddns::NameChangeRequestPtr&
D2QueueMgr::peek() const {
    const bool queue_is_empty = (getQueueSize() == 0);
    if (queue_is_empty) {
        isc_throw(D2QueueMgrQueueEmpty,
                  "D2QueueMgr peek attempted on an empty queue");
    }

    return (ncr_queue_.front());
}

201
// Returns a reference to the request stored at the given queue position
// without removing it.
//
// index  zero-based position; must be less than the current queue size.
//
// Throws D2QueueMgrInvalidIndex if index is not less than the queue size.
const dhcp_ddns::NameChangeRequestPtr&
D2QueueMgr::peekAt(const size_t index) const {
    const size_t queue_size = getQueueSize();
    if (index >= queue_size) {
        isc_throw(D2QueueMgrInvalidIndex,
                  "D2QueueMgr peek beyond end of queue attempted"
                  << " index: " << index << " queue size: " << queue_size);
    }

    return (ncr_queue_.at(index));
}

void
213
D2QueueMgr::dequeueAt(const size_t index) {
214 215
    if (index >= getQueueSize()) {
        isc_throw(D2QueueMgrInvalidIndex,
216 217
                  "D2QueueMgr dequeue beyond end of queue attempted"
                  << " index: " << index << " queue size: " << getQueueSize());
218 219 220 221 222 223 224
    }

    RequestQueue::iterator pos = ncr_queue_.begin() + index;
    ncr_queue_.erase(pos);
}


225 226 227
void
D2QueueMgr::dequeue() {
    if (getQueueSize() ==  0) {
228
        isc_throw(D2QueueMgrQueueEmpty,
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
                  "D2QueueMgr dequeue attempted on an empty queue");
    }

    ncr_queue_.pop_front();
}

// Appends the given request to the back of the queue.  The maximum queue
// size is not checked here; operator() checks it before calling.
void
D2QueueMgr::enqueue(dhcp_ddns::NameChangeRequestPtr& ncr) {
    ncr_queue_.push_back(ncr);
}

// Discards every request currently held in the queue.
void
D2QueueMgr::clearQueue() {
    ncr_queue_.clear();
}

void
D2QueueMgr::setMaxQueueSize(const size_t new_queue_max) {
    if (new_queue_max < 1) {
        isc_throw(D2QueueMgrError,
                  "D2QueueMgr maximum queue size must be greater than zero");
    }

    if (new_queue_max < getQueueSize()) {
        isc_throw(D2QueueMgrError, "D2QueueMgr maximum queue size value cannot"
                  " be less than the current queue size :" << getQueueSize());
    }

    max_queue_size_ = new_queue_max;
}

} // namespace isc::d2
} // namespace isc