d2_process.cc 15.8 KB
Newer Older
1
// Copyright (C) 2013-2016 Internet Systems Consortium, Inc. ("ISC")
2
//
3 4 5
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6

7
#include <config.h>
8
#include <asiolink/asio_wrapper.h>
9
#include <cc/command_interpreter.h>
10
#include <d2/d2_log.h>
11
#include <d2/d2_cfg_mgr.h>
12 13
#include <d2/d2_process.h>

14 15
using namespace isc::process;

16 17 18
namespace isc {
namespace d2 {

19 20
// Setting to 80% for now. This is an arbitrary choice and should probably
// be configurable.
21
// Percentage of the queue manager's maximum queue size that
// checkQueueStatus() uses as its restart threshold: when the queue manager
// is in the STOPPED_QUEUE_FULL state, listening is resumed once the current
// queue size is at or below (max queue size * this value) / 100.
const unsigned int D2Process::QUEUE_RESTART_PERCENT =  80;
22

23
D2Process::D2Process(const char* name, const asiolink::IOServicePtr& io_service)
    : DProcessBase(name, io_service, DCfgMgrBasePtr(new D2CfgMgr())),
      reconf_queue_flag_(false),
      shutdown_type_(SD_NORMAL) {

    // Create the queue manager, handing it the IOService used for NCR IO
    // event processing.  Creating it does not make it listen: listening can
    // only begin once a configuration has been received, so until then D2
    // neither receives nor processes NameChangeRequests.
    queue_mgr_.reset(new D2QueueMgr(getIoService()));

    // Create the update manager.  It is given the queue manager, the D2
    // configuration manager and the IOService used for DNS update
    // transaction IO event processing.  The configuration manager pointer
    // is held in a named local so that it is passed as an lvalue.
    D2CfgMgrPtr cfg_mgr = getD2CfgMgr();
    update_mgr_.reset(new D2UpdateMgr(queue_mgr_, cfg_mgr, getIoService()));
}

void
D2Process::init() {
    // Nothing to do: this implementation performs no initialization work.
}

45
void
46
D2Process::run() {
47
    LOG_INFO(d2_logger, DHCP_DDNS_STARTED).arg(VERSION);
48 49
    // Loop forever until we are allowed to shutdown.
    while (!canShutdown()) {
50
        try {
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
            // Check on the state of the request queue. Take any
            // actions necessary regarding it.
            checkQueueStatus();

            // Give update manager a time slice to queue new jobs and
            // process finished ones.
            update_mgr_->sweep();

            // Wait on IO event(s)  - block until one or more of the following
            // has occurred:
            //   a. NCR message has been received
            //   b. Transaction IO has completed
            //   c. Interval timer expired
            //   d. Something stopped IO service (runIO returns 0)
            if (runIO() == 0) {
                // Pretty sure this amounts to an unexpected stop and we
                // should bail out now.  Normal shutdowns do not utilize
                // stopping the IOService.
                isc_throw(DProcessBaseError,
                          "Primary IO service stopped unexpectedly");
            }
72
        } catch (const std::exception& ex) {
73
            LOG_FATAL(d2_logger, DHCP_DDNS_FAILED).arg(ex.what());
74
            isc_throw (DProcessBaseError,
75
                       "Process run method failed: " << ex.what());
76 77 78
        }
    }

79 80 81 82
    // @todo - if queue isn't empty, we may need to persist its contents
    // this might be the place to do it, once there is a persistence mgr.
    // This may also be better in checkQueueStatus.

83
    LOG_DEBUG(d2_logger, DBGLVL_START_SHUT, DHCP_DDNS_RUN_EXIT);
84

85 86
};

87 88 89
size_t
D2Process::runIO() {
    // We want to block until at least one handler is called.  We'll use
90 91
    // boost::asio::io_service directly for two reasons. First off
    // asiolink::IOService::run_one is a void and boost::asio::io_service::stopped
92 93 94 95 96 97
    // is not present in older versions of boost.  We need to know if any
    // handlers ran or if the io_service was stopped.  That latter represents
    // some form of error and the application cannot proceed with a stopped
    // service.  Secondly, asiolink::IOService does not provide the poll
    // method.  This is a handy method which runs all ready handlers without
    // blocking.
98
    asiolink::IOServicePtr& io = getIoService();
99
    boost::asio::io_service& asio_io_service  = io->get_io_service();
100 101 102 103 104 105 106 107 108 109 110

    // Poll runs all that are ready. If none are ready it returns immediately
    // with a count of zero.
    size_t cnt = asio_io_service.poll();
    if (!cnt) {
        // Poll ran no handlers either none are ready or the service has been
        // stopped.  Either way, call run_one to wait for a IO event. If the
        // service is stopped it will return immediately with a cnt of zero.
        cnt = asio_io_service.run_one();
    }

111
    return (cnt);
112 113 114
}

bool
115
D2Process::canShutdown() const {
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
    bool all_clear = false;

    // If we have been told to shutdown, find out if we are ready to do so.
    if (shouldShutdown()) {
        switch (shutdown_type_) {
        case SD_NORMAL:
            // For a normal shutdown we need to stop the queue manager but
            // wait until we have finished all the transactions in progress.
            all_clear = (((queue_mgr_->getMgrState() != D2QueueMgr::RUNNING) &&
                          (queue_mgr_->getMgrState() != D2QueueMgr::STOPPING))
                         && (update_mgr_->getTransactionCount() == 0));
            break;

        case SD_DRAIN_FIRST:
            // For a drain first shutdown we need to stop the queue manager but
            // process all of the requests in the receive queue first.
            all_clear = (((queue_mgr_->getMgrState() != D2QueueMgr::RUNNING) &&
                          (queue_mgr_->getMgrState() != D2QueueMgr::STOPPING))
                          && (queue_mgr_->getQueueSize() == 0)
                          && (update_mgr_->getTransactionCount() == 0));
            break;

        case SD_NOW:
            // Get out right now, no niceties.
            all_clear = true;
            break;
142 143 144 145 146

        default:
            // shutdown_type_ is an enum and should only be one of the above.
            // if its getting through to this, something is whacked.
            break;
147 148 149
        }

        if (all_clear) {
150
            LOG_DEBUG(d2_logger, DBGLVL_START_SHUT,
151
                     DHCP_DDNS_CLEARED_FOR_SHUTDOWN)
152 153 154 155 156 157 158 159 160
                     .arg(getShutdownTypeStr(shutdown_type_));
        }
    }

    return (all_clear);
}

isc::data::ConstElementPtr
D2Process::shutdown(isc::data::ConstElementPtr args) {
    LOG_DEBUG(d2_logger, DBGLVL_START_SHUT, DHCP_DDNS_SHUTDOWN_COMMAND)
              .arg(args ? args->str() : "(no arguments)");

    // Unless the arguments say otherwise, the shutdown type is normal.
    shutdown_type_ = SD_NORMAL;
    std::string type_str(getShutdownTypeStr(SD_NORMAL));

    // A type is only looked for when the arguments are a map that holds a
    // "type" entry; any other form of arguments leaves the default in place.
    const bool type_given = args &&
                            (args->getType() == isc::data::Element::map) &&
                            args->contains("type");
    if (type_given) {
        type_str = args->get("type")->stringValue();

        // Find the shutdown type whose text form matches the request.
        const ShutdownType known_types[] = { SD_NORMAL, SD_DRAIN_FIRST, SD_NOW };
        const size_t known_count = sizeof(known_types) / sizeof(known_types[0]);

        size_t match = 0;
        while ((match < known_count) &&
               (type_str != getShutdownTypeStr(known_types[match]))) {
            ++match;
        }

        if (match == known_count) {
            // Unknown type: clear the shutdown flag and reject the command.
            setShutdownFlag(false);
            return (isc::config::createAnswer(1, "Invalid Shutdown type: "
                                              + type_str));
        }

        shutdown_type_ = known_types[match];
    }

    // Set the base class's shutdown flag.
    setShutdownFlag(true);
    return (isc::config::createAnswer(0, "Shutdown initiated, type is: "
                                      + type_str));
}
192

193
isc::data::ConstElementPtr
D2Process::configure(isc::data::ConstElementPtr config_set) {
    LOG_DEBUG(d2_logger, DBGLVL_TRACE_BASIC,
              DHCP_DDNS_CONFIGURE).arg(config_set->str());

    // Hand the configuration to the configuration manager and extract the
    // result code from its answer.  Only the code is needed here; the
    // accompanying comment element is not used.
    isc::data::ConstElementPtr answer = getCfgMgr()->parseConfig(config_set);
    int rcode = 0;
    static_cast<void>(isc::config::parseAnswer(rcode, answer));

    // A non-zero code means the configuration was invalid: no further
    // action is taken and the flag is cleared.  In integrated mode the
    // failed answer goes back to the configuration backend.
    //
    // A zero code means the configuration parsed correctly and therefore
    // contained no invalid values, so the flag is set to indicate that the
    // queue manager needs to be reconfigured.  Reconfiguring it may be
    // asynchronous and require one or more events to occur, so it cannot be
    // done here: this method is invoked as part of the configuration event
    // callback, and one cannot wait for events while already inside one.
    // The work is done over time, while events are being processed.
    // (@todo NOTE This could be turned into a bitmask of flags if we find
    // other things that need reconfiguration.  It might also be useful to
    // do some analysis to decide what, if anything, needs to be done.)
    reconf_queue_flag_ = (rcode == 0);

    // In both cases the caller receives the answer produced by the parse.
    return (answer);
}

void
D2Process::checkQueueStatus() {
    switch (queue_mgr_->getMgrState()){
    case D2QueueMgr::RUNNING:
        if (reconf_queue_flag_ || shouldShutdown()) {
            // If we need to reconfigure the queue manager or we have been
            // told to shutdown, then stop listening first.  Stopping entails
            // canceling active listening which may generate an IO event, so
            // instigate the stop and get out.
            try {
240
                LOG_DEBUG(d2_logger, DBGLVL_START_SHUT,
241
                          DHCP_DDNS_QUEUE_MGR_STOPPING)
242
                         .arg(reconf_queue_flag_ ? "reconfiguration"
243
                                                 : "shutdown");
244 245
                queue_mgr_->stopListening();
            } catch (const isc::Exception& ex) {
Andrei Pavel's avatar
Andrei Pavel committed
246
                // It is very unlikely that we would experience an error
247
                // here, but theoretically possible.
248
                LOG_ERROR(d2_logger, DHCP_DDNS_QUEUE_MGR_STOP_ERROR)
249 250 251 252 253 254 255 256 257
                          .arg(ex.what());
            }
        }
        break;

    case D2QueueMgr::STOPPED_QUEUE_FULL: {
            // Resume receiving once the queue has decreased by twenty
            // percent.  This is an arbitrary choice. @todo this value should
            // probably be configurable.
258 259
            size_t threshold = (((queue_mgr_->getMaxQueueSize()
                                * QUEUE_RESTART_PERCENT)) / 100);
260
            if (queue_mgr_->getQueueSize() <= threshold) {
261
                LOG_INFO (d2_logger, DHCP_DDNS_QUEUE_MGR_RESUMING)
262 263 264 265
                          .arg(threshold).arg(queue_mgr_->getMaxQueueSize());
                try {
                    queue_mgr_->startListening();
                } catch (const isc::Exception& ex) {
266
                    LOG_ERROR(d2_logger, DHCP_DDNS_QUEUE_MGR_RESUME_ERROR)
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
                              .arg(ex.what());
                }
            }

        break;
        }

    case D2QueueMgr::STOPPED_RECV_ERROR:
        // If the receive error is not due to some fallout from shutting
        // down then we will attempt to recover by reconfiguring the listener.
        // This will close and destruct the current listener and make a new
        // one with new resources.
        // @todo This may need a safety valve such as retry count or a timer
        // to keep from endlessly retrying over and over, with little time
        // in between.
        if (!shouldShutdown()) {
283
            LOG_INFO (d2_logger, DHCP_DDNS_QUEUE_MGR_RECOVERING);
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
            reconfigureQueueMgr();
        }
        break;

    case D2QueueMgr::STOPPING:
        // We are waiting for IO to cancel, so this is a NOP.
        // @todo Possible timer for self-defense?  We could conceivably
        // get into a condition where we never get the event, which would
        // leave us stuck in stopping.  This is hugely unlikely but possible?
        break;

    default:
        // If the reconfigure flag is set, then we are in a state now where
        // we can do the reconfigure. In other words, we aren't RUNNING or
        // STOPPING.
        if (reconf_queue_flag_) {
300
            LOG_DEBUG(d2_logger, DBGLVL_TRACE_BASIC,
301
                      DHCP_DDNS_QUEUE_MGR_RECONFIGURING);
302 303 304 305 306 307 308 309 310 311 312 313
            reconfigureQueueMgr();
        }
        break;
    }
}

void
D2Process::reconfigureQueueMgr() {
    // Set reconfigure flag to false.  We are only here because we have
    // a valid configuration to work with so if we fail below, it will be
    // an operational issue, such as a busy IP address. That will leave
    // queue manager in INITTED state, which is fine.
314
    // What we don't want is to continually attempt to reconfigure so set
315 316 317 318 319 320 321 322 323 324
    // the flag false now.
    // @todo This method assumes only 1 type of listener.  This will change
    // to support at least a TCP version, possibly some form of RDBMS listener
    // as well.
    reconf_queue_flag_ = false;
    try {
        // Wipe out the current listener.
        queue_mgr_->removeListener();

        // Get the configuration parameters that affect Queue Manager.
325
        const D2ParamsPtr& d2_params = getD2CfgMgr()->getD2Params();
326 327 328

        // Warn the user if the server address is not the loopback.
        /// @todo Remove this once we provide a secure mechanism.
329
        std::string ip_address =  d2_params->getIpAddress().toText();
330
        if (ip_address != "127.0.0.1" && ip_address != "::1") {
331
            LOG_WARN(d2_logger, DHCP_DDNS_NOT_ON_LOOPBACK).arg(ip_address);
332 333
        }

334
        // Instantiate the listener.
335 336 337 338 339 340 341 342 343 344 345
        if (d2_params->getNcrProtocol() == dhcp_ddns::NCR_UDP) {
            queue_mgr_->initUDPListener(d2_params->getIpAddress(),
                                        d2_params->getPort(),
                                        d2_params->getNcrFormat(), true);
        } else {
            /// @todo Add TCP/IP once it's supported
            // We should never get this far but if we do deal with it.
            isc_throw(DProcessBaseError, "Unsupported NCR listener protocol:"
                      << dhcp_ddns::ncrProtocolToString(d2_params->
                                                        getNcrProtocol()));
        }
346 347 348 349 350 351

        // Now start it. This assumes that starting is a synchronous,
        // blocking call that executes quickly.  @todo Should that change then
        // we will have to expand the state model to accommodate this.
        queue_mgr_->startListening();
    } catch (const isc::Exception& ex) {
352 353
        // Queue manager failed to initialize and therefore not listening.
        // This is most likely due to an unavailable IP address or port,
354
        // which is a configuration issue.
355
        LOG_ERROR(d2_logger, DHCP_DDNS_QUEUE_MGR_START_ERROR).arg(ex.what());
356
    }
357 358
}

359
isc::data::ConstElementPtr
D2Process::command(const std::string& command,
                   isc::data::ConstElementPtr args) {
    // @todo This is the initial implementation.  If and when D2 is extended
    // to support its own commands, this implementation must change.
    // Otherwise it should reject all commands, as it does now.
    LOG_DEBUG(d2_logger, DBGLVL_TRACE_BASIC, DHCP_DDNS_COMMAND)
        .arg(command).arg(args ? args->str() : "(no args)");

    // Every command is answered with COMMAND_INVALID.
    const std::string reply_text = "Unrecognized command: " + command;
    return (isc::config::createAnswer(COMMAND_INVALID, reply_text));
}

D2Process::~D2Process() {
    // No explicit cleanup is performed here.
}

375 376 377 378 379 380 381 382
D2CfgMgrPtr
D2Process::getD2CfgMgr() {
    // getCfgMgr() yields a base class pointer to the configuration manager.
    // D2-specific extensions require a pointer to D2CfgMgr, so cast it down.
    D2CfgMgrPtr d2_cfg_mgr = boost::dynamic_pointer_cast<D2CfgMgr>(getCfgMgr());
    return (d2_cfg_mgr);
}

383
const char* D2Process::getShutdownTypeStr(const ShutdownType& type) {
384
    const char* str = "invalid";
385 386
    switch (type) {
    case SD_NORMAL:
387
        str = "normal";
388 389
        break;
    case SD_DRAIN_FIRST:
390
        str = "drain_first";
391 392
        break;
    case SD_NOW:
393
        str = "now";
394
        break;
395 396
    default:
        break;
397 398 399 400 401
    }

    return (str);
}

402
}; // namespace isc::d2
403
}; // namespace isc