io_fetch.cc 15.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.

#include <config.h>

#include <unistd.h>             // for some IPC/network system calls
#include <sys/socket.h>
#include <netinet/in.h>

#include <boost/bind.hpp>
22
#include <boost/scoped_ptr.hpp>
23
#include <boost/date_time/posix_time/posix_time_types.hpp>
24 25 26 27 28

#include <dns/message.h>
#include <dns/messagerenderer.h>
#include <dns/opcode.h>
#include <dns/rcode.h>
29
#include <log/logger.h>
30

31 32
#include <asiolink/qid_gen.h>

Stephen Morris's avatar
Stephen Morris committed
33
#include <asio.hpp>
34 35
#include <asio/deadline_timer.hpp>

36 37
#include <asiolink/asiodef.h>
#include <asiolink/io_address.h>
38
#include <asiolink/io_asio_socket.h>
39
#include <asiolink/io_endpoint.h>
40 41 42 43 44
#include <asiolink/io_service.h>
#include <asiolink/tcp_endpoint.h>
#include <asiolink/tcp_socket.h>
#include <asiolink/udp_endpoint.h>
#include <asiolink/udp_socket.h>
45 46
#include <asiolink/qid_gen.h>

47 48
#include "io_fetch.h"

49
#include <stdint.h>
50 51

using namespace asio;
52
using namespace isc::asiolink;
53
using namespace isc::dns;
Stephen Morris's avatar
Stephen Morris committed
54 55
using namespace isc::log;
using namespace std;
56

57 58
namespace isc {
namespace asiodns {
59

60 61
/// Use the ASIO logger

62
isc::log::Logger logger("asiodns");
63

64 65
/// \brief IOFetch Data
///
66 67 68 69 70
/// The data for IOFetch is held in a separate struct pointed to by a shared_ptr
/// object.  This is because the IOFetch object will be copied often (it is used
/// as a coroutine and passed as callback to many async_*() functions) and we
/// want keep the same data).  Organising the data in this way keeps copying to
/// a minimum.
71 72 73 74 75
struct IOFetchData {

    // The first two members are shared pointers to a base class because what is
    // actually instantiated depends on whether the fetch is over UDP or TCP,
    // which is not known until construction of the IOFetch.  Use of a shared
76
    // pointer here is merely to ensure deletion when the data object is deleted.
77
    boost::scoped_ptr<IOAsioSocket<IOFetch> > socket;
78 79 80 81 82 83 84 85 86 87 88 89 90 91
                                             ///< Socket to use for I/O
    boost::scoped_ptr<IOEndpoint> remote_snd;///< Where the fetch is sent
    boost::scoped_ptr<IOEndpoint> remote_rcv;///< Where the response came from
    isc::dns::Question          question;    ///< Question to be asked
    isc::dns::OutputBufferPtr   msgbuf;      ///< Wire buffer for question
    isc::dns::OutputBufferPtr   received;    ///< Received data put here
    IOFetch::Callback*          callback;    ///< Called on I/O Completion
    asio::deadline_timer        timer;       ///< Timer to measure timeouts
    IOFetch::Protocol           protocol;    ///< Protocol being used
    size_t                      cumulative;  ///< Cumulative received amount
    size_t                      expected;    ///< Expected amount of data
    size_t                      offset;      ///< Offset to receive data
    bool                        stopped;     ///< Have we stopped running?
    int                         timeout;     ///< Timeout in ms
92 93 94 95 96 97 98

    // In case we need to log an error, the origin of the last asynchronous
    // I/O is recorded.  To save time and simplify the code, this is recorded
    // as the ID of the error message that would be generated if the I/O failed.
    // This means that we must make sure that all possible "origins" take the
    // same arguments in their message in the same order.
    isc::log::MessageID         origin;     ///< Origin of last asynchronous I/O
99 100
    uint8_t                     staging[IOFetch::STAGING_LENGTH];
                                            ///< Temporary array for received data
101
    isc::dns::qid_t             qid;         ///< The QID set in the query
102 103 104 105 106

    /// \brief Constructor
    ///
    /// Just fills in the data members of the IOFetchData structure
    ///
107
    /// \param proto Either IOFetch::TCP or IOFetch::UDP.
108
    /// \param service I/O Service object to handle the asynchronous
109
    ///        operations.
110 111 112 113
    /// \param query DNS question to send to the upstream server.
    /// \param address IP address of upstream server
    /// \param port Port to use for the query
    /// \param buff Output buffer into which the response (in wire format)
114
    ///        is written (if a response is received).
115
    /// \param cb Callback object containing the callback to be called
116 117
    ///        when we terminate.  The caller is responsible for managing this
    ///        object and deleting it if necessary.
118 119 120
    /// \param wait Timeout for the fetch (in ms).
    ///
    /// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
121
    IOFetchData(IOFetch::Protocol proto, IOService& service,
122 123 124 125
        const isc::dns::Question& query, const IOAddress& address,
        uint16_t port, isc::dns::OutputBufferPtr& buff, IOFetch::Callback* cb,
        int wait)
        :
126
        socket((proto == IOFetch::UDP) ?
127 128 129 130 131
            static_cast<IOAsioSocket<IOFetch>*>(
                new UDPSocket<IOFetch>(service)) :
            static_cast<IOAsioSocket<IOFetch>*>(
                new TCPSocket<IOFetch>(service))
            ),
132 133 134 135 136
        remote_snd((proto == IOFetch::UDP) ?
            static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
            static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
            ),
        remote_rcv((proto == IOFetch::UDP) ?
137 138 139 140 141
            static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
            static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
            ),
        question(query),
        msgbuf(new isc::dns::OutputBuffer(512)),
142
        received(buff),
143

144
        callback(cb),
145 146
        timer(service.get_io_service()),
        protocol(proto),
147
        cumulative(0),
148 149
        expected(0),
        offset(0),
150 151
        stopped(false),
        timeout(wait),
152
        origin(ASIO_UNKORIGIN),
153 154
        staging(),
        qid(QidGenerator::getInstance().generateQid())
155
    {}
156 157 158 159 160 161 162 163 164 165 166 167

    // Checks if the response we received was ok;
    // - data contains the buffer we read, as well as the address
    // we sent to and the address we received from.
    // length is provided by the operator() in IOFetch.
    // Addresses must match, number of octets read must be at least
    // 2, and the first two octets must match the qid of the message
    // we sent.
    bool responseOK() {
        return (*remote_snd == *remote_rcv && cumulative >= 2 &&
                readUint16(received->getData()) == qid);
    }
168 169
};

170
/// IOFetch Constructor - just initialize the private data
Stephen Morris's avatar
Stephen Morris committed
171

172
IOFetch::IOFetch(Protocol protocol, IOService& service,
Stephen Morris's avatar
Stephen Morris committed
173
    const isc::dns::Question& question, const IOAddress& address, uint16_t port,
174
    OutputBufferPtr& buff, Callback* cb, int wait)
Stephen Morris's avatar
Stephen Morris committed
175
    :
176
    data_(new IOFetchData(protocol, service, question, address,
Stephen Morris's avatar
Stephen Morris committed
177
        port, buff, cb, wait))
178 179 180
{
}

181 182 183 184 185 186 187
// Return protocol in use.

IOFetch::Protocol
IOFetch::getProtocol() const {
    return (data_->protocol);
}

188 189
/// The function operator is implemented with the "stackless coroutine"
/// pattern; see internal/coroutine.h for details.
Stephen Morris's avatar
Stephen Morris committed
190

191
void
192
IOFetch::operator()(asio::error_code ec, size_t length) {
193

194 195 196 197
    if (data_->stopped) {
        return;
    } else if (ec) {
        logIOFailure(ec);
198 199 200 201
        return;
    }

    CORO_REENTER (this) {
Stephen Morris's avatar
Stephen Morris committed
202

203 204 205 206 207
        /// Generate the upstream query and render it to wire format
        /// This is done in a different scope to allow inline variable
        /// declarations.
        {
            Message msg(Message::RENDER);
208
            msg.setQid(data_->qid);
209 210 211 212 213 214 215 216
            msg.setOpcode(Opcode::QUERY());
            msg.setRcode(Rcode::NOERROR());
            msg.setHeaderFlag(Message::HEADERFLAG_RD);
            msg.addQuestion(data_->question);
            MessageRenderer renderer(*data_->msgbuf);
            msg.toWire(renderer);
        }

217 218
        // If we timeout, we stop, which will can cancel outstanding I/Os and
        // shutdown everything.
219 220 221 222 223 224 225 226
        if (data_->timeout != -1) {
            data_->timer.expires_from_now(boost::posix_time::milliseconds(
                data_->timeout));
            data_->timer.async_wait(boost::bind(&IOFetch::stop, *this,
                TIME_OUT));
        }

        // Open a connection to the target system.  For speed, if the operation
227 228
        // is synchronous (i.e. UDP operation) we bypass the yield.
        data_->origin = ASIO_OPENSOCK;
229
        if (data_->socket->isOpenSynchronous()) {
230
            data_->socket->open(data_->remote_snd.get(), *this);
231
        } else {
232
            CORO_YIELD data_->socket->open(data_->remote_snd.get(), *this);
233 234
        }

Stephen Morris's avatar
Stephen Morris committed
235
        do {
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
            // Begin an asynchronous send, and then yield.  When the send completes,
            // we will resume immediately after this point.
            data_->origin = ASIO_SENDSOCK;
            CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
                data_->msgbuf->getLength(), data_->remote_snd.get(), *this);
    
            // Now receive the response.  Since TCP may not receive the entire
            // message in one operation, we need to loop until we have received
            // it. (This can't be done within the asyncReceive() method because
            // each I/O operation will be done asynchronously and between each one
            // we need to yield ... and we *really* don't want to set up another
            // coroutine within that method.)  So after each receive (and yield),
            // we check if the operation is complete and if not, loop to read again.
            //
            // Another concession to TCP is that the amount of is contained in the
            // first two bytes.  This leads to two problems:
            //
            // a) We don't want those bytes in the return buffer.
            // b) They may not both arrive in the first I/O.
            //
            // So... we need to loop until we have at least two bytes, then store
            // the expected amount of data.  Then we need to loop until we have
            // received all the data before copying it back to the user's buffer.
            // And we want to minimise the amount of copying...
    
            data_->origin = ASIO_RECVSOCK;
            data_->cumulative = 0;          // No data yet received
            data_->offset = 0;              // First data into start of buffer
264
            data_->received->clear();       // Clear the receive buffer
265 266 267 268 269 270 271 272 273
            do {
                CORO_YIELD data_->socket->asyncReceive(data_->staging,
                                                       static_cast<size_t>(STAGING_LENGTH),
                                                       data_->offset,
                                                       data_->remote_rcv.get(), *this);
            } while (!data_->socket->processReceivedData(data_->staging, length,
                                                         data_->cumulative, data_->offset,
                                                         data_->expected, data_->received));
        } while (!data_->responseOK());
274

275 276 277
        // Finished with this socket, so close it.  This will not generate an
        // I/O error, but reset the origin to unknown in case we change this.
        data_->origin = ASIO_UNKORIGIN;
278 279
        data_->socket->close();

280 281 282 283 284 285 286 287 288
        /// We are done
        stop(SUCCESS);
    }
}

// Function that stops the coroutine sequence.  It is called either when the
// query finishes or when the timer times out.  Either way, it sets the
// "stopped_" flag and cancels anything that is in progress.
//
289 290
// As the function may be entered multiple times as things wind down, it checks
// if the stopped_ flag is already set.  If it is, the call is a no-op.
Stephen Morris's avatar
Stephen Morris committed
291

292 293
void
IOFetch::stop(Result result) {
294

295
    if (!data_->stopped) {
296 297 298 299 300 301 302 303 304 305 306

        // Mark the fetch as stopped to prevent other completion callbacks
        // (invoked because of the calls to cancel()) from executing the
        // cancel calls again.
        //
        // In a single threaded environment, the callbacks won't be invoked
        // until this one completes. In a multi-threaded environment, they may
        // well be, in which case the testing (and setting) of the stopped_
        // variable should be done inside a mutex (and the stopped_ variable
        // declared as "volatile").
        //
307 308 309 310
        // The numeric arguments indicate the debug level, with the lower
        // numbers indicating the most important information.  The relative
        // values are somewhat arbitrary.
        //
311
        // Although Logger::debug checks the debug flag internally, doing it
312
        // below before calling Logger::debug avoids the overhead of a string
313
        // conversion in the common case when debug is not enabled.
314
        //
315 316
        // TODO: Update testing of stopped_ if threads are used.
        data_->stopped = true;
317 318
        switch (result) {
            case TIME_OUT:
319
                if (logger.isDebugEnabled(1)) {
320
                    logger.debug(20, ASIO_RECVTMO,
321 322
                                 data_->remote_snd->getAddress().toText().c_str(),
                                 static_cast<int>(data_->remote_snd->getPort()));
323
                }
324 325
                break;

326
            case SUCCESS:
327
                if (logger.isDebugEnabled(50)) {
328
                    logger.debug(30, ASIO_FETCHCOMP,
329 330
                                 data_->remote_rcv->getAddress().toText().c_str(),
                                 static_cast<int>(data_->remote_rcv->getPort()));
331
                }
332 333
                break;

334 335 336 337
            case STOPPED:
                // Fetch has been stopped for some other reason.  This is
                // allowed but as it is unusual it is logged, but with a lower
                // debug level than a timeout (which is totally normal).
338
                logger.debug(1, ASIO_FETCHSTOP,
339 340
                             data_->remote_snd->getAddress().toText().c_str(),
                             static_cast<int>(data_->remote_snd->getPort()));
341 342
                break;

343
            default:
344
                logger.error(ASIO_UNKRESULT, static_cast<int>(result),
345 346
                             data_->remote_snd->getAddress().toText().c_str(),
                             static_cast<int>(data_->remote_snd->getPort()));
347 348
        }

Stephen Morris's avatar
Stephen Morris committed
349 350 351 352 353 354 355 356
        // Stop requested, cancel and I/O's on the socket and shut it down,
        // and cancel the timer.
        data_->socket->cancel();
        data_->socket->close();

        data_->timer.cancel();

        // Execute the I/O completion callback (if present).
357
        if (data_->callback) {
Stephen Morris's avatar
Stephen Morris committed
358
            (*(data_->callback))(result);
359 360 361 362
        }
    }
}

363 364
// Log an error - called on I/O failure

365
void IOFetch::logIOFailure(asio::error_code ec) {
366

367 368 369 370 371
    // Should only get here with a known error code.
    assert((data_->origin == ASIO_OPENSOCK) ||
           (data_->origin == ASIO_SENDSOCK) ||
           (data_->origin == ASIO_RECVSOCK) ||
           (data_->origin == ASIO_UNKORIGIN));
372

373 374 375
    static const char* PROTOCOL[2] = {"TCP", "UDP"};
    logger.error(data_->origin,
                 ec.value(),
376
                 ((data_->remote_snd->getProtocol() == IPPROTO_TCP) ?
377
                     PROTOCOL[0] : PROTOCOL[1]),
378 379
                 data_->remote_snd->getAddress().toText().c_str(),
                 static_cast<int>(data_->remote_snd->getPort()));
380 381
}

382 383
} // namespace asiodns
} // namespace isc
Stephen Morris's avatar
Stephen Morris committed
384