io_fetch.cc 9.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.

#include <config.h>

#include <unistd.h>             // for some IPC/network system calls
#include <sys/socket.h>
#include <netinet/in.h>

#include <boost/bind.hpp>

#include <dns/message.h>
#include <dns/messagerenderer.h>
#include <dns/opcode.h>
#include <dns/rcode.h>
Stephen Morris's avatar
Stephen Morris committed
27
#include <log/dummylog.h>
28
#include <log/logger.h>
29

Stephen Morris's avatar
Stephen Morris committed
30
#include <asio.hpp>
31 32 33
#include <asiolink/asiodef.h>
#include <asiolink/io_address.h>
#include <asiolink/io_endpoint.h>
Stephen Morris's avatar
Stephen Morris committed
34
#include <asiolink/io_fetch.h>
35 36 37

using namespace asio;
using namespace isc::dns;
Stephen Morris's avatar
Stephen Morris committed
38 39
using namespace isc::log;
using namespace std;
40 41 42

namespace asiolink {

43 44 45 46
/// Use the ASIO logger

isc::log::Logger logger("asio");

47
/// IOFetch Constructor - just initialize the private data
Stephen Morris's avatar
Stephen Morris committed
48

49
IOFetch::IOFetch(Protocol protocol, IOService& service,
Stephen Morris's avatar
Stephen Morris committed
50 51 52 53 54
    const isc::dns::Question& question, const IOAddress& address, uint16_t port,
    isc::dns::OutputBufferPtr& buff, Callback* cb, int wait)
    :
    data_(new IOFetch::IOFetchData(protocol, service, question, address,
        port, buff, cb, wait))
55 56 57 58 59
{
}

/// The function operator is implemented with the "stackless coroutine"
/// pattern; see internal/coroutine.h for details.
Stephen Morris's avatar
Stephen Morris committed
60

61 62
void
IOFetch::operator()(error_code ec, size_t length) {
63 64 65 66
    if (data_->stopped) {
        return;
    } else if (ec) {
        logIOFailure(ec);
67 68 69 70
        return;
    }

    CORO_REENTER (this) {
Stephen Morris's avatar
Stephen Morris committed
71

72 73 74 75 76 77 78 79 80 81 82 83 84 85
        /// Generate the upstream query and render it to wire format
        /// This is done in a different scope to allow inline variable
        /// declarations.
        {
            Message msg(Message::RENDER);
            
            // TODO: replace with boost::random or some other suitable PRNG
            msg.setQid(0);
            msg.setOpcode(Opcode::QUERY());
            msg.setRcode(Rcode::NOERROR());
            msg.setHeaderFlag(Message::HEADERFLAG_RD);
            msg.addQuestion(data_->question);
            MessageRenderer renderer(*data_->msgbuf);
            msg.toWire(renderer);
86 87

            // As this is a new fetch, clear the amount of data received
Stephen Morris's avatar
Stephen Morris committed
88
            data_->cumulative = 0;
89

90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
            dlog("Sending " + msg.toText() + " to " +
                data_->remote->getAddress().toText());
        }


        // If we timeout, we stop, which will shutdown everything and
        // cancel all other attempts to run inside the coroutine
        if (data_->timeout != -1) {
            data_->timer.expires_from_now(boost::posix_time::milliseconds(
                data_->timeout));
            data_->timer.async_wait(boost::bind(&IOFetch::stop, *this,
                TIME_OUT));
        }

        // Open a connection to the target system.  For speed, if the operation
105
        // was completed synchronously (i.e. UDP operation) we bypass the yield.
Stephen Morris's avatar
Stephen Morris committed
106
        if (data_->socket->open(data_->remote.get(), *this)) {
107
            data_->origin = OPEN;
108 109 110
            CORO_YIELD;
        }

Stephen Morris's avatar
Stephen Morris committed
111
        // Begin an asynchronous send, and then yield.  When the send completes
112
        // send completes, we will resume immediately after this point.
113
        data_->origin = SEND;
Stephen Morris's avatar
Stephen Morris committed
114
        CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
115 116
            data_->msgbuf->getLength(), data_->remote.get(), *this);

Stephen Morris's avatar
Stephen Morris committed
117 118 119 120 121 122 123
        // Now receive the response.  Since TCP may not receive the entire
        // message in one operation, we need to loop until we have received
        // it. (This can't be done within the asyncReceive() method because
        // each I/O operation will be done asynchronously and between each one
        // we need to yield ... and we *really* don't want to set up another
        // coroutine within that method.)  So after each receive (and yield),
        // we check if the operation is complete and if not, loop to read again.
124
        data_->origin = RECEIVE;
Stephen Morris's avatar
Stephen Morris committed
125 126 127 128 129 130 131 132
        do {
            CORO_YIELD data_->socket->asyncReceive(data_->data.get(),
                static_cast<size_t>(MAX_LENGTH), data_->cumulative,
                data_->remote.get(), *this);
        } while (!data_->socket->receiveComplete(data_->data.get(), length,
            data_->cumulative));

        // The message is not rendered yet, so we can't print it easily
133 134 135 136 137 138 139 140 141
        dlog("Received response from " + data_->remote->getAddress().toText());

        /// Copy the answer into the response buffer.  (TODO: If the
        /// OutputBuffer object were made to meet the requirements of
        /// a MutableBufferSequence, then it could be written to directly
        /// by async_receive_from() and this additional copy step would
        /// be unnecessary.)
        data_->buffer->writeData(data_->data.get(), length);

142
        // Finished with this socket, so close it.
143
        data_->origin = CLOSE;
144 145
        data_->socket->close();

146 147 148 149 150 151 152 153 154 155 156 157
        /// We are done
        stop(SUCCESS);
    }
}

// Function that stops the coroutine sequence.  It is called either when the
// query finishes or when the timer times out.  Either way, it sets the
// "stopped_" flag and cancels anything that is in progress.
//
// As the function may be entered multiple times as things wind down, the
// stopped_ flag checks if stop() has already been called.  If it has,
// subsequent calls are no-ops.
Stephen Morris's avatar
Stephen Morris committed
158

159 160
void
IOFetch::stop(Result result) {
161

162
    if (!data_->stopped) {
163 164 165 166 167 168 169 170 171 172 173

        // Mark the fetch as stopped to prevent other completion callbacks
        // (invoked because of the calls to cancel()) from executing the
        // cancel calls again.
        //
        // In a single threaded environment, the callbacks won't be invoked
        // until this one completes. In a multi-threaded environment, they may
        // well be, in which case the testing (and setting) of the stopped_
        // variable should be done inside a mutex (and the stopped_ variable
        // declared as "volatile").
        //
174 175 176 177
        // The numeric arguments indicate the debug level, with the lower
        // numbers indicating the most important information.  The relative
        // values are somewhat arbitrary.
        //
178
        // Although Logger::debug checks the debug flag internally, doing it
179 180 181
        // below before calling Logger::debug avoids the overhead of a string
        // conversion in the common paths and in the common case when debug is
        // not enabled.
182
        //
183 184 185
        // TODO: Update testing of stopped_ if threads are used.
        data_->stopped = true;

186 187
        switch (result) {
            case TIME_OUT:
188 189 190 191
                if (logger.isDebugEnabled(1)) {
                    logger.debug(1, ASIO_RECVTMO,
                                 data_->remote->getAddress().toText().c_str());
                }
192 193
                break;

194
            case SUCCESS:
195
                if (logger.isDebugEnabled(50)) {
196
                    logger.debug(50, ASIO_FETCHCOMP,
197 198
                                 data_->remote->getAddress().toText().c_str());
                }
199 200
                break;

201 202 203 204 205 206 207 208
            case STOPPED:
                // Fetch has been stopped for some other reason.  This is
                // allowed but as it is unusual it is logged, but with a lower
                // debug level than a timeout (which is totally normal).
                logger.debug(10, ASIO_FETCHSTOP,
                             data_->remote->getAddress().toText().c_str());
                break;

209
            default:
210 211
                logger.error(ASIO_UNKRESULT, static_cast<int>(result),
                             data_->remote->getAddress().toText().c_str());
212 213
        }

Stephen Morris's avatar
Stephen Morris committed
214 215 216 217 218 219 220 221
        // Stop requested, cancel and I/O's on the socket and shut it down,
        // and cancel the timer.
        data_->socket->cancel();
        data_->socket->close();

        data_->timer.cancel();

        // Execute the I/O completion callback (if present).
222
        if (data_->callback) {
Stephen Morris's avatar
Stephen Morris committed
223
            (*(data_->callback))(result);
224
        }
Stephen Morris's avatar
Stephen Morris committed
225 226

        // Mark that stop() has now been called.
227

228 229 230
    }
}

231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
// Log an error - called on I/O failure

void IOFetch::logIOFailure(asio::error_code& ec) {

    // Get information that will be in all messages
    static const char* PROTOCOL[2] = {"TCP", "UDP"};
    const char* prot = (data_->remote->getProtocol() == IPPROTO_TCP) ?
                        PROTOCOL[0] : PROTOCOL[1];

    int errcode = ec.value();

    std::string str_address = data_->remote->getAddress().toText();
    const char* address = str_address.c_str();

    switch (data_->origin) {
    case OPEN:
        logger.error(ASIO_OPENSOCK, errcode, prot, address);
        break;

    case SEND:
        logger.error(ASIO_SENDSOCK, errcode, prot, address);
        break;

    case RECEIVE:
        logger.error(ASIO_RECVSOCK, errcode, prot, address);
        break;

    default:
        logger.error(ASIO_UNKORIGIN, errcode, prot, address);
    }
}

263
} // namespace asiolink
Stephen Morris's avatar
Stephen Morris committed
264