d2_process.cc 16.3 KB
Newer Older
1
// Copyright (C) 2013-2015  Internet Systems Consortium, Inc. ("ISC")
2 3 4 5 6 7 8 9 10 11 12 13 14
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.

15 16
#include <config.h>

17
#include <cc/command_interpreter.h>
18
#include <d2/d2_log.h>
19
#include <d2/d2_cfg_mgr.h>
20 21
#include <d2/d2_process.h>

22
#include <asio.hpp>
23 24 25 26

namespace isc {
namespace d2 {

27 28
// Setting to 80% for now. This is an arbitrary choice and should probably
// be configurable.
29
const unsigned int D2Process::QUEUE_RESTART_PERCENT =  80;
30

31
// Constructs the D2 process.  A newly allocated D2CfgMgr is handed to the
// DProcessBase constructor, the reconfigure flag starts out cleared and the
// shutdown type starts out as SD_NORMAL.  The queue manager and the update
// manager are then created.
//
// @param name value passed through to the DProcessBase constructor.
// @param io_service IO service passed through to the DProcessBase
// constructor.
D2Process::D2Process(const char* name, const asiolink::IOServicePtr& io_service)
    : DProcessBase(name, io_service, DCfgMgrBasePtr(new D2CfgMgr())),
      reconf_queue_flag_(false), shutdown_type_(SD_NORMAL) {

    // Create the queue manager, giving it the IOService for NCR IO event
    // processing.  Creating it does not start it listening; that can only
    // happen once configuration has been received.  Until then D2 will
    // neither receive nor process NameChangeRequests.
    queue_mgr_.reset(new D2QueueMgr(getIoService()));

    // Create the update manager.  It is given the queue manager, the
    // configuration manager and the IOService (the latter for DNS update
    // transaction IO event processing).
    // NOTE(review): the configuration manager pointer is held in a named
    // local rather than passed as a temporary; presumably D2UpdateMgr takes
    // it by non-const reference -- confirm before inlining.
    D2CfgMgrPtr d2_cfg_mgr = getD2CfgMgr();
    update_mgr_.reset(new D2UpdateMgr(queue_mgr_, d2_cfg_mgr, getIoService()));
}

// Initialization hook.  This implementation intentionally does nothing; the
// queue and update managers are created by the constructor.
void
D2Process::init() {
}

53
void
54
D2Process::run() {
55
    LOG_INFO(dctl_logger, DHCP_DDNS_STARTED).arg(VERSION);
56 57
    // Loop forever until we are allowed to shutdown.
    while (!canShutdown()) {
58
        try {
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
            // Check on the state of the request queue. Take any
            // actions necessary regarding it.
            checkQueueStatus();

            // Give update manager a time slice to queue new jobs and
            // process finished ones.
            update_mgr_->sweep();

            // Wait on IO event(s)  - block until one or more of the following
            // has occurred:
            //   a. NCR message has been received
            //   b. Transaction IO has completed
            //   c. Interval timer expired
            //   d. Something stopped IO service (runIO returns 0)
            if (runIO() == 0) {
                // Pretty sure this amounts to an unexpected stop and we
                // should bail out now.  Normal shutdowns do not utilize
                // stopping the IOService.
                isc_throw(DProcessBaseError,
                          "Primary IO service stopped unexpectedly");
            }
80
        } catch (const std::exception& ex) {
81
            LOG_FATAL(dctl_logger, DHCP_DDNS_FAILED).arg(ex.what());
82
            isc_throw (DProcessBaseError,
83
                       "Process run method failed: " << ex.what());
84 85 86
        }
    }

87 88 89 90
    // @todo - if queue isn't empty, we may need to persist its contents
    // this might be the place to do it, once there is a persistence mgr.
    // This may also be better in checkQueueStatus.

91
    LOG_DEBUG(dctl_logger, DBGLVL_START_SHUT, DHCP_DDNS_RUN_EXIT);
92

93 94
};

95 96 97 98 99 100 101 102 103 104 105
size_t
D2Process::runIO() {
    // We want to block until at least one handler is called.  We'll use
    // asio::io_service directly for two reasons. First off
    // asiolink::IOService::run_one is a void and asio::io_service::stopped
    // is not present in older versions of boost.  We need to know if any
    // handlers ran or if the io_service was stopped.  That latter represents
    // some form of error and the application cannot proceed with a stopped
    // service.  Secondly, asiolink::IOService does not provide the poll
    // method.  This is a handy method which runs all ready handlers without
    // blocking.
106
    asiolink::IOServicePtr& io = getIoService();
107 108 109 110 111 112 113 114 115 116 117 118
    asio::io_service& asio_io_service  = io->get_io_service();

    // Poll runs all that are ready. If none are ready it returns immediately
    // with a count of zero.
    size_t cnt = asio_io_service.poll();
    if (!cnt) {
        // Poll ran no handlers either none are ready or the service has been
        // stopped.  Either way, call run_one to wait for a IO event. If the
        // service is stopped it will return immediately with a cnt of zero.
        cnt = asio_io_service.run_one();
    }

119
    return (cnt);
120 121 122
}

bool
123
D2Process::canShutdown() const {
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
    bool all_clear = false;

    // If we have been told to shutdown, find out if we are ready to do so.
    if (shouldShutdown()) {
        switch (shutdown_type_) {
        case SD_NORMAL:
            // For a normal shutdown we need to stop the queue manager but
            // wait until we have finished all the transactions in progress.
            all_clear = (((queue_mgr_->getMgrState() != D2QueueMgr::RUNNING) &&
                          (queue_mgr_->getMgrState() != D2QueueMgr::STOPPING))
                         && (update_mgr_->getTransactionCount() == 0));
            break;

        case SD_DRAIN_FIRST:
            // For a drain first shutdown we need to stop the queue manager but
            // process all of the requests in the receive queue first.
            all_clear = (((queue_mgr_->getMgrState() != D2QueueMgr::RUNNING) &&
                          (queue_mgr_->getMgrState() != D2QueueMgr::STOPPING))
                          && (queue_mgr_->getQueueSize() == 0)
                          && (update_mgr_->getTransactionCount() == 0));
            break;

        case SD_NOW:
            // Get out right now, no niceties.
            all_clear = true;
            break;
150 151 152 153 154

        default:
            // shutdown_type_ is an enum and should only be one of the above.
            // if its getting through to this, something is whacked.
            break;
155 156 157
        }

        if (all_clear) {
158 159
            LOG_DEBUG(dctl_logger, DBGLVL_START_SHUT,
                     DHCP_DDNS_CLEARED_FOR_SHUTDOWN)
160 161 162 163 164 165 166 167 168
                     .arg(getShutdownTypeStr(shutdown_type_));
        }
    }

    return (all_clear);
}

// Handles the shutdown command.
//
// The shutdown type defaults to "normal".  If args is a map containing a
// "type" entry, that entry selects the shutdown type; it must be a string
// equal to one of the values produced by getShutdownTypeStr() for SD_NORMAL,
// SD_DRAIN_FIRST or SD_NOW.
//
// @param args optional command arguments; may be an empty pointer.
//
// @return an answer with result 0 and the chosen type when the shutdown was
// initiated, or an answer with result 1 when the "type" entry is not a
// recognized shutdown type.  In the failure case the shutdown flag is
// cleared.
isc::data::ConstElementPtr
D2Process::shutdown(isc::data::ConstElementPtr args) {
    LOG_DEBUG(dctl_logger, DBGLVL_START_SHUT, DHCP_DDNS_SHUTDOWN_COMMAND)
              .arg(args ? args->str() : "(no arguments)");

    // Default shutdown type is normal.
    std::string type_str(getShutdownTypeStr(SD_NORMAL));
    shutdown_type_ = SD_NORMAL;

    if (args && (args->getType() == isc::data::Element::map) &&
        args->contains("type")) {
        isc::data::ConstElementPtr type_elem = args->get("type");
        bool recognized = false;

        // Only ask for the string value when the element really is a
        // string: stringValue() throws on any other element type, which
        // would escape this handler instead of producing an error answer.
        if (type_elem->getType() == isc::data::Element::string) {
            type_str = type_elem->stringValue();

            if (type_str == getShutdownTypeStr(SD_NORMAL)) {
                shutdown_type_ = SD_NORMAL;
                recognized = true;
            } else if (type_str == getShutdownTypeStr(SD_DRAIN_FIRST)) {
                shutdown_type_ = SD_DRAIN_FIRST;
                recognized = true;
            } else if (type_str == getShutdownTypeStr(SD_NOW)) {
                shutdown_type_ = SD_NOW;
                recognized = true;
            }
        } else {
            // Report the offending value using its textual representation.
            type_str = type_elem->str();
        }

        if (!recognized) {
            setShutdownFlag(false);
            return (isc::config::createAnswer(1, "Invalid Shutdown type: "
                                              + type_str));
        }
    }

    // Set the base class's shutdown flag.
    setShutdownFlag(true);
    return (isc::config::createAnswer(0, "Shutdown initiated, type is: "
                                      + type_str));
}
200

201
// Processes a new configuration.
//
// The configuration is handed to the configuration manager for parsing and
// the resulting answer is returned unchanged.  The reconfigure-queue flag is
// set when parsing succeeded and cleared when it did not.
//
// @param config_set the configuration to parse.
//
// @return the answer produced by the configuration manager's parseConfig().
isc::data::ConstElementPtr
D2Process::configure(isc::data::ConstElementPtr config_set) {
    LOG_DEBUG(dctl_logger, DBGLVL_TRACE_BASIC,
              DHCP_DDNS_CONFIGURE).arg(config_set->str());

    // Parse the configuration and pull the result code out of the answer.
    isc::data::ConstElementPtr answer = getCfgMgr()->parseConfig(config_set);
    int rcode = 0;
    isc::data::ConstElementPtr comment =
        isc::config::parseAnswer(rcode, answer);

    // A non-zero code means the configuration was invalid: no further action
    // is taken and, in integrated mode, the failed response goes back to the
    // configuration backend.
    //
    // A zero code means the configuration was valid, at least in that it
    // parsed correctly and therefore contained no invalid values.  In that
    // case flag that the queue manager needs to be reconfigured.
    // Reconfiguring the queue manager may be asynchronous and require one or
    // more events to occur, so it cannot be done here; it has to happen over
    // time while events are being processed.  Remember that this method is
    // invoked as part of the configuration event callback, so we cannot wait
    // for events here -- we are already inside one.
    // (@todo NOTE This could be turned into a bitmask of flags if we find
    // other things that need reconfiguration.  It might also be useful if we
    // did some analysis to decide what if anything we need to do.)
    reconf_queue_flag_ = (rcode == 0);

    // Success or failure, the caller gets the parser's answer.
    return (answer);
}

void
D2Process::checkQueueStatus() {
    switch (queue_mgr_->getMgrState()){
    case D2QueueMgr::RUNNING:
        if (reconf_queue_flag_ || shouldShutdown()) {
            // If we need to reconfigure the queue manager or we have been
            // told to shutdown, then stop listening first.  Stopping entails
            // canceling active listening which may generate an IO event, so
            // instigate the stop and get out.
            try {
248 249
                LOG_DEBUG(dctl_logger, DBGLVL_START_SHUT,
                          DHCP_DDNS_QUEUE_MGR_STOPPING)
250
                         .arg(reconf_queue_flag_ ? "reconfiguration"
251
                                                 : "shutdown");
252 253 254
                queue_mgr_->stopListening();
            } catch (const isc::Exception& ex) {
                // It is very unlikey that we would experience an error
255
                // here, but theoretically possible.
256 257 258 259 260 261 262 263 264 265
                LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_STOP_ERROR)
                          .arg(ex.what());
            }
        }
        break;

    case D2QueueMgr::STOPPED_QUEUE_FULL: {
            // Resume receiving once the queue has decreased by twenty
            // percent.  This is an arbitrary choice. @todo this value should
            // probably be configurable.
266 267
            size_t threshold = (((queue_mgr_->getMaxQueueSize()
                                * QUEUE_RESTART_PERCENT)) / 100);
268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307
            if (queue_mgr_->getQueueSize() <= threshold) {
                LOG_INFO (dctl_logger, DHCP_DDNS_QUEUE_MGR_RESUMING)
                          .arg(threshold).arg(queue_mgr_->getMaxQueueSize());
                try {
                    queue_mgr_->startListening();
                } catch (const isc::Exception& ex) {
                    LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_RESUME_ERROR)
                              .arg(ex.what());
                }
            }

        break;
        }

    case D2QueueMgr::STOPPED_RECV_ERROR:
        // If the receive error is not due to some fallout from shutting
        // down then we will attempt to recover by reconfiguring the listener.
        // This will close and destruct the current listener and make a new
        // one with new resources.
        // @todo This may need a safety valve such as retry count or a timer
        // to keep from endlessly retrying over and over, with little time
        // in between.
        if (!shouldShutdown()) {
            LOG_INFO (dctl_logger, DHCP_DDNS_QUEUE_MGR_RECOVERING);
            reconfigureQueueMgr();
        }
        break;

    case D2QueueMgr::STOPPING:
        // We are waiting for IO to cancel, so this is a NOP.
        // @todo Possible timer for self-defense?  We could conceivably
        // get into a condition where we never get the event, which would
        // leave us stuck in stopping.  This is hugely unlikely but possible?
        break;

    default:
        // If the reconfigure flag is set, then we are in a state now where
        // we can do the reconfigure. In other words, we aren't RUNNING or
        // STOPPING.
        if (reconf_queue_flag_) {
308 309
            LOG_DEBUG(dctl_logger, DBGLVL_TRACE_BASIC,
                      DHCP_DDNS_QUEUE_MGR_RECONFIGURING);
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332
            reconfigureQueueMgr();
        }
        break;
    }
}

void
D2Process::reconfigureQueueMgr() {
    // Set reconfigure flag to false.  We are only here because we have
    // a valid configuration to work with so if we fail below, it will be
    // an operational issue, such as a busy IP address. That will leave
    // queue manager in INITTED state, which is fine.
    // What we dont' want is to continually attempt to reconfigure so set
    // the flag false now.
    // @todo This method assumes only 1 type of listener.  This will change
    // to support at least a TCP version, possibly some form of RDBMS listener
    // as well.
    reconf_queue_flag_ = false;
    try {
        // Wipe out the current listener.
        queue_mgr_->removeListener();

        // Get the configuration parameters that affect Queue Manager.
333
        const D2ParamsPtr& d2_params = getD2CfgMgr()->getD2Params();
334 335 336

        // Warn the user if the server address is not the loopback.
        /// @todo Remove this once we provide a secure mechanism.
337
        std::string ip_address =  d2_params->getIpAddress().toText();
338 339 340 341
        if (ip_address != "127.0.0.1" && ip_address != "::1") {
            LOG_WARN(dctl_logger, DHCP_DDNS_NOT_ON_LOOPBACK).arg(ip_address);
        }

342
        // Instantiate the listener.
343 344 345 346 347 348 349 350 351 352 353
        if (d2_params->getNcrProtocol() == dhcp_ddns::NCR_UDP) {
            queue_mgr_->initUDPListener(d2_params->getIpAddress(),
                                        d2_params->getPort(),
                                        d2_params->getNcrFormat(), true);
        } else {
            /// @todo Add TCP/IP once it's supported
            // We should never get this far but if we do deal with it.
            isc_throw(DProcessBaseError, "Unsupported NCR listener protocol:"
                      << dhcp_ddns::ncrProtocolToString(d2_params->
                                                        getNcrProtocol()));
        }
354 355 356 357 358 359

        // Now start it. This assumes that starting is a synchronous,
        // blocking call that executes quickly.  @todo Should that change then
        // we will have to expand the state model to accommodate this.
        queue_mgr_->startListening();
    } catch (const isc::Exception& ex) {
360 361
        // Queue manager failed to initialize and therefore not listening.
        // This is most likely due to an unavailable IP address or port,
362 363 364
        // which is a configuration issue.
        LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_START_ERROR).arg(ex.what());
    }
365 366
}

367
// Handles a command directed at the process.
//
// @todo This is the initial implementation.  If and when D2 is extended
// to support its own commands, this implementation must change. Otherwise
// it should reject all commands as it does now.
//
// @param command name of the command.
// @param args optional command arguments; may be an empty pointer.
//
// @return always an answer with result COMMAND_INVALID naming the command.
isc::data::ConstElementPtr
D2Process::command(const std::string& command,
                   isc::data::ConstElementPtr args) {
    LOG_DEBUG(dctl_logger, DBGLVL_TRACE_BASIC, DHCP_DDNS_COMMAND)
        .arg(command).arg(args ? args->str() : "(no args)");

    // No commands are supported, so every command is rejected.
    const std::string rejection =
        std::string("Unrecognized command: ") + command;
    return (isc::config::createAnswer(COMMAND_INVALID, rejection));
}

// Destructor.  No explicit clean-up is performed here.
D2Process::~D2Process() {
}

383 384 385 386 387 388 389 390
// Returns the configuration manager as a D2CfgMgr pointer.
//
// The base class only hands out a base class pointer to our configuration
// manager.  Since we are D2 and need the D2 specific extensions for some
// things, the pointer is down-cast to D2CfgMgr here.
//
// @return the result of dynamically casting getCfgMgr() to D2CfgMgr.
D2CfgMgrPtr
D2Process::getD2CfgMgr() {
    D2CfgMgrPtr d2_cfg_mgr = boost::dynamic_pointer_cast<D2CfgMgr>(getCfgMgr());
    return (d2_cfg_mgr);
}

391
const char* D2Process::getShutdownTypeStr(const ShutdownType& type) {
392
    const char* str = "invalid";
393 394
    switch (type) {
    case SD_NORMAL:
395
        str = "normal";
396 397
        break;
    case SD_DRAIN_FIRST:
398
        str = "drain_first";
399 400
        break;
    case SD_NOW:
401
        str = "now";
402
        break;
403 404
    default:
        break;
405 406 407 408 409
    }

    return (str);
}

410
}; // namespace isc::d2
411
}; // namespace isc