io_fetch.cc 17.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.

#include <config.h>

#include <netinet/in.h>
18 19 20
#include <stdint.h>
#include <sys/socket.h>
#include <unistd.h>             // for some IPC/network system calls
21 22

#include <boost/bind.hpp>
23
#include <boost/scoped_ptr.hpp>
24
#include <boost/date_time/posix_time/posix_time_types.hpp>
25

Stephen Morris's avatar
Stephen Morris committed
26
#include <asio.hpp>
27 28
#include <asio/deadline_timer.hpp>

29
#include <asiolink/io_address.h>
30
#include <asiolink/io_asio_socket.h>
31
#include <asiolink/io_endpoint.h>
32 33 34 35 36
#include <asiolink/io_service.h>
#include <asiolink/tcp_endpoint.h>
#include <asiolink/tcp_socket.h>
#include <asiolink/udp_endpoint.h>
#include <asiolink/udp_socket.h>
37

38 39 40 41 42 43 44
#include <dns/messagerenderer.h>
#include <dns/opcode.h>
#include <dns/rcode.h>
#include <log/logger.h>

#include <asiodns/asiodef.h>
#include <asiodns/io_fetch.h>
45 46 47

#include <util/buffer.h>
#include <util/random/qid_gen.h>
48

49 50

using namespace asio;
51
using namespace isc::asiolink;
52
using namespace isc::dns;
53
using namespace isc::util;
54
using namespace isc::util::random;
Stephen Morris's avatar
Stephen Morris committed
55 56
using namespace isc::log;
using namespace std;
57

58 59
namespace isc {
namespace asiodns {
60

61 62
/// Use the ASIO logger

63
isc::log::Logger logger("asiolink");
64

65 66
/// \brief IOFetch Data
///
67 68 69 70 71
/// The data for IOFetch is held in a separate struct pointed to by a shared_ptr
/// object.  This is because the IOFetch object will be copied often (it is used
/// as a coroutine and passed as callback to many async_*() functions) and we
/// want keep the same data).  Organising the data in this way keeps copying to
/// a minimum.
72 73 74 75 76
struct IOFetchData {

    // The first two members are shared pointers to a base class because what is
    // actually instantiated depends on whether the fetch is over UDP or TCP,
    // which is not known until construction of the IOFetch.  Use of a shared
77
    // pointer here is merely to ensure deletion when the data object is deleted.
78
    boost::scoped_ptr<IOAsioSocket<IOFetch> > socket;
79 80 81
                                             ///< Socket to use for I/O
    boost::scoped_ptr<IOEndpoint> remote_snd;///< Where the fetch is sent
    boost::scoped_ptr<IOEndpoint> remote_rcv;///< Where the response came from
82 83
    OutputBufferPtr   msgbuf;      ///< Wire buffer for question
    OutputBufferPtr   received;    ///< Received data put here
84 85 86 87 88 89 90 91
    IOFetch::Callback*          callback;    ///< Called on I/O Completion
    asio::deadline_timer        timer;       ///< Timer to measure timeouts
    IOFetch::Protocol           protocol;    ///< Protocol being used
    size_t                      cumulative;  ///< Cumulative received amount
    size_t                      expected;    ///< Expected amount of data
    size_t                      offset;      ///< Offset to receive data
    bool                        stopped;     ///< Have we stopped running?
    int                         timeout;     ///< Timeout in ms
Stephen Morris's avatar
Stephen Morris committed
92
    bool                        packet;      ///< true if packet was supplied
93 94 95 96 97 98 99

    // In case we need to log an error, the origin of the last asynchronous
    // I/O is recorded.  To save time and simplify the code, this is recorded
    // as the ID of the error message that would be generated if the I/O failed.
    // This means that we must make sure that all possible "origins" take the
    // same arguments in their message in the same order.
    isc::log::MessageID         origin;     ///< Origin of last asynchronous I/O
100 101
    uint8_t                     staging[IOFetch::STAGING_LENGTH];
                                            ///< Temporary array for received data
102
    isc::dns::qid_t             qid;         ///< The QID set in the query
103 104 105 106 107

    /// \brief Constructor
    ///
    /// Just fills in the data members of the IOFetchData structure
    ///
108
    /// \param proto Either IOFetch::TCP or IOFetch::UDP.
109
    /// \param service I/O Service object to handle the asynchronous
110
    ///        operations.
111 112 113
    /// \param address IP address of upstream server
    /// \param port Port to use for the query
    /// \param buff Output buffer into which the response (in wire format)
114
    ///        is written (if a response is received).
115
    /// \param cb Callback object containing the callback to be called
116 117
    ///        when we terminate.  The caller is responsible for managing this
    ///        object and deleting it if necessary.
118 119 120
    /// \param wait Timeout for the fetch (in ms).
    ///
    /// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
121
    IOFetchData(IOFetch::Protocol proto, IOService& service,
122 123
        const IOAddress& address, uint16_t port, OutputBufferPtr& buff,
        IOFetch::Callback* cb, int wait)
124
        :
125
        socket((proto == IOFetch::UDP) ?
126 127 128 129 130
            static_cast<IOAsioSocket<IOFetch>*>(
                new UDPSocket<IOFetch>(service)) :
            static_cast<IOAsioSocket<IOFetch>*>(
                new TCPSocket<IOFetch>(service))
            ),
131 132 133 134 135
        remote_snd((proto == IOFetch::UDP) ?
            static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
            static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
            ),
        remote_rcv((proto == IOFetch::UDP) ?
136 137 138
            static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
            static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
            ),
139
        msgbuf(new OutputBuffer(512)),
140
        received(buff),
141
        callback(cb),
142 143
        timer(service.get_io_service()),
        protocol(proto),
144
        cumulative(0),
145 146
        expected(0),
        offset(0),
147 148
        stopped(false),
        timeout(wait),
Stephen Morris's avatar
Stephen Morris committed
149
        packet(false),
150
        origin(ASIODNS_UNKORIGIN),
151 152
        staging(),
        qid(QidGenerator::getInstance().generateQid())
153
    {}
154 155 156 157 158 159 160 161 162 163 164 165

    // Checks if the response we received was ok;
    // - data contains the buffer we read, as well as the address
    // we sent to and the address we received from.
    // length is provided by the operator() in IOFetch.
    // Addresses must match, number of octets read must be at least
    // 2, and the first two octets must match the qid of the message
    // we sent.
    bool responseOK() {
        return (*remote_snd == *remote_rcv && cumulative >= 2 &&
                readUint16(received->getData()) == qid);
    }
166 167
};

168
/// IOFetch Constructor - just initialize the private data
Stephen Morris's avatar
Stephen Morris committed
169

170
IOFetch::IOFetch(Protocol protocol, IOService& service,
Stephen Morris's avatar
Stephen Morris committed
171
    const isc::dns::Question& question, const IOAddress& address, uint16_t port,
172
    OutputBufferPtr& buff, Callback* cb, int wait)
173
{
174 175 176
    MessagePtr query_msg(new Message(Message::RENDER));
    initIOFetch(query_msg, protocol, service, question, address, port, buff,
                cb, wait);
177 178
}

Stephen Morris's avatar
Stephen Morris committed
179 180 181 182 183 184 185 186 187 188 189
IOFetch::IOFetch(Protocol protocol, IOService& service,
    OutputBufferPtr& outpkt, const IOAddress& address, uint16_t port,
    OutputBufferPtr& buff, Callback* cb, int wait)
    :
    data_(new IOFetchData(protocol, service,
          address, port, buff, cb, wait))
{
    data_->msgbuf = outpkt;
    data_->packet = true;
}

190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
IOFetch::IOFetch(Protocol protocol, IOService& service,
    ConstMessagePtr query_message, const IOAddress& address, uint16_t port,
    OutputBufferPtr& buff, Callback* cb, int wait)
{
    MessagePtr msg(new Message(Message::RENDER));
    Message::HeaderFlag flag = Message::HEADERFLAG_RD;
    msg->setHeaderFlag(flag, query_message->getHeaderFlag(flag));
    flag = Message::HEADERFLAG_CD;
    msg->setHeaderFlag(flag, query_message->getHeaderFlag(flag));

    ConstEDNSPtr edns(query_message->getEDNS());
    const bool dnssec_ok = edns && edns->getDNSSECAwareness();
    if (edns) {
        EDNSPtr edns_response(new EDNS());
        edns_response->setDNSSECAwareness(dnssec_ok);
        // TODO: We should make our own edns bufsize length configurable
        edns_response->setUDPSize(Message::DEFAULT_MAX_EDNS0_UDPSIZE);
        msg->setEDNS(edns_response);
    }

    initIOFetch(msg, protocol, service,
                **(query_message->beginQuestion()),
                address, port, buff, cb, wait);
}

void
IOFetch::initIOFetch(MessagePtr& query_msg, Protocol protocol, IOService& service,
                     const isc::dns::Question& question,
                     const IOAddress& address, uint16_t port,
                     OutputBufferPtr& buff, Callback* cb, int wait)
{
    data_ = boost::shared_ptr<IOFetchData>(new IOFetchData(
        protocol, service, address, port, buff, cb, wait));

    query_msg->setQid(data_->qid);
    query_msg->setOpcode(Opcode::QUERY());
    query_msg->setRcode(Rcode::NOERROR());
    query_msg->setHeaderFlag(Message::HEADERFLAG_RD);
    query_msg->addQuestion(question);
    MessageRenderer renderer(*data_->msgbuf);
    query_msg->toWire(renderer);
}

233 234 235 236 237 238 239
// Return protocol in use.

IOFetch::Protocol
IOFetch::getProtocol() const {
    return (data_->protocol);
}

240 241
/// The function operator is implemented with the "stackless coroutine"
/// pattern; see internal/coroutine.h for details.
Stephen Morris's avatar
Stephen Morris committed
242

243
void
244
IOFetch::operator()(asio::error_code ec, size_t length) {
245

246 247 248 249
    if (data_->stopped) {
        return;
    } else if (ec) {
        logIOFailure(ec);
250 251 252 253
        return;
    }

    CORO_REENTER (this) {
Stephen Morris's avatar
Stephen Morris committed
254

255 256 257 258
        /// Generate the upstream query and render it to wire format
        /// This is done in a different scope to allow inline variable
        /// declarations.
        {
Stephen Morris's avatar
Stephen Morris committed
259 260 261 262 263
            if (data_->packet) {
                // A packet was given, overwrite the QID (which is in the
                // first two bytes of the packet).
                data_->msgbuf->writeUint16At(data_->qid, 0);

264
            } 
265 266
        }

267 268
        // If we timeout, we stop, which will can cancel outstanding I/Os and
        // shutdown everything.
269 270 271 272 273 274 275 276
        if (data_->timeout != -1) {
            data_->timer.expires_from_now(boost::posix_time::milliseconds(
                data_->timeout));
            data_->timer.async_wait(boost::bind(&IOFetch::stop, *this,
                TIME_OUT));
        }

        // Open a connection to the target system.  For speed, if the operation
277
        // is synchronous (i.e. UDP operation) we bypass the yield.
278
        data_->origin = ASIODNS_OPENSOCK;
279
        if (data_->socket->isOpenSynchronous()) {
280
            data_->socket->open(data_->remote_snd.get(), *this);
281
        } else {
282
            CORO_YIELD data_->socket->open(data_->remote_snd.get(), *this);
283 284
        }

Stephen Morris's avatar
Stephen Morris committed
285
        do {
286 287
            // Begin an asynchronous send, and then yield.  When the send completes,
            // we will resume immediately after this point.
288
            data_->origin = ASIODNS_SENDSOCK;
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
            CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
                data_->msgbuf->getLength(), data_->remote_snd.get(), *this);
    
            // Now receive the response.  Since TCP may not receive the entire
            // message in one operation, we need to loop until we have received
            // it. (This can't be done within the asyncReceive() method because
            // each I/O operation will be done asynchronously and between each one
            // we need to yield ... and we *really* don't want to set up another
            // coroutine within that method.)  So after each receive (and yield),
            // we check if the operation is complete and if not, loop to read again.
            //
            // Another concession to TCP is that the amount of is contained in the
            // first two bytes.  This leads to two problems:
            //
            // a) We don't want those bytes in the return buffer.
            // b) They may not both arrive in the first I/O.
            //
            // So... we need to loop until we have at least two bytes, then store
            // the expected amount of data.  Then we need to loop until we have
            // received all the data before copying it back to the user's buffer.
            // And we want to minimise the amount of copying...
    
311
            data_->origin = ASIODNS_RECVSOCK;
312 313
            data_->cumulative = 0;          // No data yet received
            data_->offset = 0;              // First data into start of buffer
314
            data_->received->clear();       // Clear the receive buffer
315 316 317 318 319 320 321 322 323
            do {
                CORO_YIELD data_->socket->asyncReceive(data_->staging,
                                                       static_cast<size_t>(STAGING_LENGTH),
                                                       data_->offset,
                                                       data_->remote_rcv.get(), *this);
            } while (!data_->socket->processReceivedData(data_->staging, length,
                                                         data_->cumulative, data_->offset,
                                                         data_->expected, data_->received));
        } while (!data_->responseOK());
324

325 326
        // Finished with this socket, so close it.  This will not generate an
        // I/O error, but reset the origin to unknown in case we change this.
327
        data_->origin = ASIODNS_UNKORIGIN;
328 329
        data_->socket->close();

330 331 332 333 334 335 336 337 338
        /// We are done
        stop(SUCCESS);
    }
}

// Function that stops the coroutine sequence.  It is called either when the
// query finishes or when the timer times out.  Either way, it sets the
// "stopped_" flag and cancels anything that is in progress.
//
339 340
// As the function may be entered multiple times as things wind down, it checks
// if the stopped_ flag is already set.  If it is, the call is a no-op.
Stephen Morris's avatar
Stephen Morris committed
341

342 343
void
IOFetch::stop(Result result) {
344

345
    if (!data_->stopped) {
346 347 348 349 350 351 352 353 354 355 356

        // Mark the fetch as stopped to prevent other completion callbacks
        // (invoked because of the calls to cancel()) from executing the
        // cancel calls again.
        //
        // In a single threaded environment, the callbacks won't be invoked
        // until this one completes. In a multi-threaded environment, they may
        // well be, in which case the testing (and setting) of the stopped_
        // variable should be done inside a mutex (and the stopped_ variable
        // declared as "volatile").
        //
357 358 359 360
        // The numeric arguments indicate the debug level, with the lower
        // numbers indicating the most important information.  The relative
        // values are somewhat arbitrary.
        //
361
        // Although Logger::debug checks the debug flag internally, doing it
362
        // below before calling Logger::debug avoids the overhead of a string
363
        // conversion in the common case when debug is not enabled.
364
        //
365 366
        // TODO: Update testing of stopped_ if threads are used.
        data_->stopped = true;
367 368
        switch (result) {
            case TIME_OUT:
369
                if (logger.isDebugEnabled(1)) {
370
                    logger.debug(20, ASIODNS_RECVTMO,
371 372
                                 data_->remote_snd->getAddress().toText().c_str(),
                                 static_cast<int>(data_->remote_snd->getPort()));
373
                }
374 375
                break;

376
            case SUCCESS:
377
                if (logger.isDebugEnabled(50)) {
378
                    logger.debug(30, ASIODNS_FETCHCOMP,
379 380
                                 data_->remote_rcv->getAddress().toText().c_str(),
                                 static_cast<int>(data_->remote_rcv->getPort()));
381
                }
382 383
                break;

384 385 386 387
            case STOPPED:
                // Fetch has been stopped for some other reason.  This is
                // allowed but as it is unusual it is logged, but with a lower
                // debug level than a timeout (which is totally normal).
388
                logger.debug(1, ASIODNS_FETCHSTOP,
389 390
                             data_->remote_snd->getAddress().toText().c_str(),
                             static_cast<int>(data_->remote_snd->getPort()));
391 392
                break;

393
            default:
394
                logger.error(ASIODNS_UNKRESULT, static_cast<int>(result),
395 396
                             data_->remote_snd->getAddress().toText().c_str(),
                             static_cast<int>(data_->remote_snd->getPort()));
397 398
        }

Stephen Morris's avatar
Stephen Morris committed
399 400 401 402 403 404 405 406
        // Stop requested, cancel and I/O's on the socket and shut it down,
        // and cancel the timer.
        data_->socket->cancel();
        data_->socket->close();

        data_->timer.cancel();

        // Execute the I/O completion callback (if present).
407
        if (data_->callback) {
Stephen Morris's avatar
Stephen Morris committed
408
            (*(data_->callback))(result);
409 410 411 412
        }
    }
}

413 414
// Log an error - called on I/O failure

415
void IOFetch::logIOFailure(asio::error_code ec) {
416

417
    // Should only get here with a known error code.
418 419 420 421
    assert((data_->origin == ASIODNS_OPENSOCK) ||
           (data_->origin == ASIODNS_SENDSOCK) ||
           (data_->origin == ASIODNS_RECVSOCK) ||
           (data_->origin == ASIODNS_UNKORIGIN));
422

423 424 425
    static const char* PROTOCOL[2] = {"TCP", "UDP"};
    logger.error(data_->origin,
                 ec.value(),
426
                 ((data_->remote_snd->getProtocol() == IPPROTO_TCP) ?
427
                     PROTOCOL[0] : PROTOCOL[1]),
428 429
                 data_->remote_snd->getAddress().toText().c_str(),
                 static_cast<int>(data_->remote_snd->getPort()));
430 431
}

432
} // namespace asiodns
433
} // namespace isc {