io_fetch.cc 13.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.

#include <config.h>

#include <unistd.h>             // for some IPC/network system calls
#include <sys/socket.h>
#include <netinet/in.h>

#include <boost/bind.hpp>
22 23 24
#include <boost/shared_array.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
25 26 27 28 29

#include <dns/message.h>
#include <dns/messagerenderer.h>
#include <dns/opcode.h>
#include <dns/rcode.h>
Stephen Morris's avatar
Stephen Morris committed
30
#include <log/dummylog.h>
31
#include <log/logger.h>
32

Stephen Morris's avatar
Stephen Morris committed
33
#include <asio.hpp>
34 35
#include <asio/deadline_timer.hpp>

36 37
#include <asiolink/asiodef.h>
#include <asiolink/io_address.h>
38
#include <asiolink/io_asio_socket.h>
39
#include <asiolink/io_endpoint.h>
Stephen Morris's avatar
Stephen Morris committed
40
#include <asiolink/io_fetch.h>
41 42 43 44 45
#include <asiolink/io_service.h>
#include <asiolink/tcp_endpoint.h>
#include <asiolink/tcp_socket.h>
#include <asiolink/udp_endpoint.h>
#include <asiolink/udp_socket.h>
46 47 48

using namespace asio;
using namespace isc::dns;
Stephen Morris's avatar
Stephen Morris committed
49 50
using namespace isc::log;
using namespace std;
51 52 53

namespace asiolink {

54 55 56 57
/// Logger through which this file reports its debug and error messages.  It
/// is created with the logger name "asio".

isc::log::Logger logger("asio");

58 59
/// \brief IOFetch Data
///
60 61 62 63 64
/// The data for IOFetch is held in a separate struct pointed to by a shared_ptr
/// object.  This is because the IOFetch object will be copied often (it is used
/// as a coroutine and passed as callback to many async_*() functions) and we
/// want keep the same data).  Organising the data in this way keeps copying to
/// a minimum.
65 66 67 68 69
struct IOFetchData {

    // The first two members are shared pointers to a base class because what is
    // actually instantiated depends on whether the fetch is over UDP or TCP,
    // which is not known until construction of the IOFetch.  Use of a shared
70
    // pointer here is merely to ensure deletion when the data object is deleted.
71 72 73 74 75 76 77 78 79 80 81 82
    boost::shared_ptr<IOAsioSocket<IOFetch> > socket;
                                            ///< Socket to use for I/O
    boost::shared_ptr<IOEndpoint> remote;   ///< Where the fetch was sent
    isc::dns::Question          question;   ///< Question to be asked
    isc::dns::OutputBufferPtr   msgbuf;     ///< Wire buffer for question
    isc::dns::OutputBufferPtr   buffer;     ///< Received data held here
    boost::shared_array<char>   data;       ///< Temporary array for data
    IOFetch::Callback*          callback;   ///< Called on I/O Completion
    size_t                      cumulative; ///< Cumulative received amount
    bool                        stopped;    ///< Have we stopped running?
    asio::deadline_timer        timer;      ///< Timer to measure timeouts
    int                         timeout;    ///< Timeout in ms
83 84 85 86 87 88 89

    // In case we need to log an error, the origin of the last asynchronous
    // I/O is recorded.  To save time and simplify the code, this is recorded
    // as the ID of the error message that would be generated if the I/O failed.
    // This means that we must make sure that all possible "origins" take the
    // same arguments in their message in the same order.
    isc::log::MessageID         origin;     ///< Origin of last asynchronous I/O
90 91 92 93 94

    /// \brief Constructor
    ///
    /// Just fills in the data members of the IOFetchData structure
    ///
95
    /// \param protocol Either IOFetch::TCP or IOFetch::UDP.
96
    /// \param service I/O Service object to handle the asynchronous
97
    ///        operations.
98 99 100 101
    /// \param query DNS question to send to the upstream server.
    /// \param address IP address of upstream server
    /// \param port Port to use for the query
    /// \param buff Output buffer into which the response (in wire format)
102
    ///        is written (if a response is received).
103
    /// \param cb Callback object containing the callback to be called
104 105
    ///        when we terminate.  The caller is responsible for managing this
    ///        object and deleting it if necessary.
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
    /// \param wait Timeout for the fetch (in ms).
    ///
    /// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
    IOFetchData(IOFetch::Protocol protocol, IOService& service,
        const isc::dns::Question& query, const IOAddress& address,
        uint16_t port, isc::dns::OutputBufferPtr& buff, IOFetch::Callback* cb,
        int wait)
        :
        socket((protocol == IOFetch::UDP) ?
            static_cast<IOAsioSocket<IOFetch>*>(
                new UDPSocket<IOFetch>(service)) :
            static_cast<IOAsioSocket<IOFetch>*>(
                new TCPSocket<IOFetch>(service))
            ),
        remote((protocol == IOFetch::UDP) ?
            static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
            static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
            ),
        question(query),
        msgbuf(new isc::dns::OutputBuffer(512)),
        buffer(buff),
        data(new char[IOFetch::MIN_LENGTH]),
        callback(cb),
        cumulative(0),
        stopped(false),
        timer(service.get_io_service()),
        timeout(wait),
133
        origin(ASIO_UNKORIGIN)
134 135 136
    {}
};

137
/// IOFetch Constructor - just initialize the private data
Stephen Morris's avatar
Stephen Morris committed
138

139
IOFetch::IOFetch(Protocol protocol, IOService& service,
Stephen Morris's avatar
Stephen Morris committed
140
    const isc::dns::Question& question, const IOAddress& address, uint16_t port,
141
    OutputBufferPtr& buff, Callback* cb, int wait)
Stephen Morris's avatar
Stephen Morris committed
142
    :
143
    data_(new IOFetchData(protocol, service, question, address,
Stephen Morris's avatar
Stephen Morris committed
144
        port, buff, cb, wait))
145 146 147 148 149
{
}

/// The function operator is implemented with the "stackless coroutine"
/// pattern; see internal/coroutine.h for details.
Stephen Morris's avatar
Stephen Morris committed
150

151
void
152
IOFetch::operator()(asio::error_code ec, size_t length) {
153

154 155 156 157
    if (data_->stopped) {
        return;
    } else if (ec) {
        logIOFailure(ec);
158 159 160 161
        return;
    }

    CORO_REENTER (this) {
Stephen Morris's avatar
Stephen Morris committed
162

163 164 165 166 167
        /// Generate the upstream query and render it to wire format
        /// This is done in a different scope to allow inline variable
        /// declarations.
        {
            Message msg(Message::RENDER);
168

169 170 171 172 173 174 175 176
            // TODO: replace with boost::random or some other suitable PRNG
            msg.setQid(0);
            msg.setOpcode(Opcode::QUERY());
            msg.setRcode(Rcode::NOERROR());
            msg.setHeaderFlag(Message::HEADERFLAG_RD);
            msg.addQuestion(data_->question);
            MessageRenderer renderer(*data_->msgbuf);
            msg.toWire(renderer);
177 178

            // As this is a new fetch, clear the amount of data received
Stephen Morris's avatar
Stephen Morris committed
179
            data_->cumulative = 0;
180

181 182 183 184
            dlog("Sending " + msg.toText() + " to " +
                data_->remote->getAddress().toText());
        }

185 186
        // If we timeout, we stop, which will can cancel outstanding I/Os and
        // shutdown everything.
187 188 189 190 191 192 193 194
        if (data_->timeout != -1) {
            data_->timer.expires_from_now(boost::posix_time::milliseconds(
                data_->timeout));
            data_->timer.async_wait(boost::bind(&IOFetch::stop, *this,
                TIME_OUT));
        }

        // Open a connection to the target system.  For speed, if the operation
195 196
        // is synchronous (i.e. UDP operation) we bypass the yield.
        data_->origin = ASIO_OPENSOCK;
197 198 199 200
        if (data_->socket->isOpenSynchronous()) {
            data_->socket->open(data_->remote.get(), *this);
        } else {
            CORO_YIELD data_->socket->open(data_->remote.get(), *this);
201 202
        }

203 204 205
        // Begin an asynchronous send, and then yield.  When the send completes,
        // we will resume immediately after this point.
        data_->origin = ASIO_SENDSOCK;
Stephen Morris's avatar
Stephen Morris committed
206
        CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
207
            data_->msgbuf->getLength(), data_->remote.get(), *this);
208

Stephen Morris's avatar
Stephen Morris committed
209 210 211 212 213 214 215
        // Now receive the response.  Since TCP may not receive the entire
        // message in one operation, we need to loop until we have received
        // it. (This can't be done within the asyncReceive() method because
        // each I/O operation will be done asynchronously and between each one
        // we need to yield ... and we *really* don't want to set up another
        // coroutine within that method.)  So after each receive (and yield),
        // we check if the operation is complete and if not, loop to read again.
216
        data_->origin = ASIO_RECVSOCK;
Stephen Morris's avatar
Stephen Morris committed
217 218
        do {
            CORO_YIELD data_->socket->asyncReceive(data_->data.get(),
219
                static_cast<size_t>(MIN_LENGTH), data_->cumulative,
Stephen Morris's avatar
Stephen Morris committed
220
                data_->remote.get(), *this);
221 222
            data_->cumulative += length;
        } while (!data_->socket->receiveComplete(data_->data.get(),
Stephen Morris's avatar
Stephen Morris committed
223 224
            data_->cumulative));

225
        /// Copy the answer into the response buffer.  (TODO: If the
226 227 228 229
        /// OutputBuffer object were made to meet the requirements of a
        /// MutableBufferSequence, then it could be written to directly by
        /// async_receive_from() and this additional copy step would be
        /// unnecessary.)
230 231
        data_->buffer->writeData(data_->data.get(), length);

232 233 234
        // Finished with this socket, so close it.  This will not generate an
        // I/O error, but reset the origin to unknown in case we change this.
        data_->origin = ASIO_UNKORIGIN;
235 236
        data_->socket->close();

237 238 239 240 241 242 243 244 245
        /// We are done
        stop(SUCCESS);
    }
}

// Function that stops the coroutine sequence.  It is called either when the
// query finishes or when the timer times out.  Either way, it sets the
// "stopped_" flag and cancels anything that is in progress.
//
246 247
// As the function may be entered multiple times as things wind down, it checks
// if the stopped_ flag is already set.  If it is, the call is a no-op.
Stephen Morris's avatar
Stephen Morris committed
248

249 250
void
IOFetch::stop(Result result) {
251

252
    if (!data_->stopped) {
253 254 255 256 257 258 259 260 261 262 263

        // Mark the fetch as stopped to prevent other completion callbacks
        // (invoked because of the calls to cancel()) from executing the
        // cancel calls again.
        //
        // In a single threaded environment, the callbacks won't be invoked
        // until this one completes. In a multi-threaded environment, they may
        // well be, in which case the testing (and setting) of the stopped_
        // variable should be done inside a mutex (and the stopped_ variable
        // declared as "volatile").
        //
264 265 266 267
        // The numeric arguments indicate the debug level, with the lower
        // numbers indicating the most important information.  The relative
        // values are somewhat arbitrary.
        //
268
        // Although Logger::debug checks the debug flag internally, doing it
269
        // below before calling Logger::debug avoids the overhead of a string
270
        // conversion in the common case when debug is not enabled.
271
        //
272 273
        // TODO: Update testing of stopped_ if threads are used.
        data_->stopped = true;
274 275
        switch (result) {
            case TIME_OUT:
276
                if (logger.isDebugEnabled(1)) {
277 278 279
                    logger.debug(20, ASIO_RECVTMO,
                                 data_->remote->getAddress().toText().c_str(),
                                 static_cast<int>(data_->remote->getPort()));
280
                }
281 282
                break;

283
            case SUCCESS:
284
                if (logger.isDebugEnabled(50)) {
285 286 287
                    logger.debug(30, ASIO_FETCHCOMP,
                                 data_->remote->getAddress().toText().c_str(),
                                 static_cast<int>(data_->remote->getPort()));
288
                }
289 290
                break;

291 292 293 294
            case STOPPED:
                // Fetch has been stopped for some other reason.  This is
                // allowed but as it is unusual it is logged, but with a lower
                // debug level than a timeout (which is totally normal).
295 296 297
                logger.debug(1, ASIO_FETCHSTOP,
                             data_->remote->getAddress().toText().c_str(),
                             static_cast<int>(data_->remote->getPort()));
298 299
                break;

300
            default:
301
                logger.error(ASIO_UNKRESULT, static_cast<int>(result),
302 303
                             data_->remote->getAddress().toText().c_str(),
                             static_cast<int>(data_->remote->getPort()));
304 305
        }

Stephen Morris's avatar
Stephen Morris committed
306 307 308 309 310 311 312 313
        // Stop requested, cancel and I/O's on the socket and shut it down,
        // and cancel the timer.
        data_->socket->cancel();
        data_->socket->close();

        data_->timer.cancel();

        // Execute the I/O completion callback (if present).
314
        if (data_->callback) {
Stephen Morris's avatar
Stephen Morris committed
315
            (*(data_->callback))(result);
316 317 318 319
        }
    }
}

320 321
// Log an error - called on I/O failure

322
void IOFetch::logIOFailure(asio::error_code ec) {
323

324 325 326 327 328
    // Should only get here with a known error code.
    assert((data_->origin == ASIO_OPENSOCK) ||
           (data_->origin == ASIO_SENDSOCK) ||
           (data_->origin == ASIO_RECVSOCK) ||
           (data_->origin == ASIO_UNKORIGIN));
329

330 331 332 333 334 335 336
    static const char* PROTOCOL[2] = {"TCP", "UDP"};
    logger.error(data_->origin,
                 ec.value(),
                 ((data_->remote->getProtocol() == IPPROTO_TCP) ?
                     PROTOCOL[0] : PROTOCOL[1]),
                 data_->remote->getAddress().toText().c_str(),
                 static_cast<int>(data_->remote->getPort()));
337 338
}

339
} // namespace asiolink
Stephen Morris's avatar
Stephen Morris committed
340