io_fetch.cc 16.4 KB
Newer Older
1
// Copyright (C) 2011-2017 Internet Systems Consortium, Inc. ("ISC")
2
//
3 4 5
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 7

#include <config.h>
8
#include <asiolink/asio_wrapper.h>
9
#include <asiolink/io_address.h>
10
#include <asiolink/io_asio_socket.h>
11
#include <asiolink/io_endpoint.h>
12 13 14 15 16
#include <asiolink/io_service.h>
#include <asiolink/tcp_endpoint.h>
#include <asiolink/tcp_socket.h>
#include <asiolink/udp_endpoint.h>
#include <asiolink/udp_socket.h>
17 18
#include <asiodns/io_fetch.h>
#include <asiodns/logger.h>
19 20 21
#include <dns/messagerenderer.h>
#include <dns/opcode.h>
#include <dns/rcode.h>
22 23
#include <util/buffer.h>
#include <util/random/qid_gen.h>
24

25 26 27 28 29 30 31 32
#include <boost/bind.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>

#include <unistd.h>             // for some IPC/network system calls
#include <netinet/in.h>
#include <stdint.h>
#include <sys/socket.h>
33

34
using namespace boost::asio;
35
using namespace isc::asiolink;
36
using namespace isc::dns;
37
using namespace isc::util;
38
using namespace isc::util::random;
Stephen Morris's avatar
Stephen Morris committed
39 40
using namespace isc::log;
using namespace std;
41

42 43
namespace isc {
namespace asiodns {
44

45 46
// Log debug verbosity

47 48 49
const int DBG_IMPORTANT = DBGLVL_TRACE_BASIC;
const int DBG_COMMON = DBGLVL_TRACE_DETAIL;
const int DBG_ALL = DBGLVL_TRACE_DETAIL + 20;
50

51 52
/// \brief IOFetch Data
///
53 54 55 56 57
/// The data for IOFetch is held in a separate struct pointed to by a shared_ptr
/// object.  This is because the IOFetch object will be copied often (it is used
/// as a coroutine and passed as callback to many async_*() functions) and we
/// want keep the same data).  Organising the data in this way keeps copying to
/// a minimum.
58 59 60 61 62
struct IOFetchData {

    // The first two members are shared pointers to a base class because what is
    // actually instantiated depends on whether the fetch is over UDP or TCP,
    // which is not known until construction of the IOFetch.  Use of a shared
63
    // pointer here is merely to ensure deletion when the data object is deleted.
64
    boost::scoped_ptr<IOAsioSocket<IOFetch> > socket;
65 66 67
                                             ///< Socket to use for I/O
    boost::scoped_ptr<IOEndpoint> remote_snd;///< Where the fetch is sent
    boost::scoped_ptr<IOEndpoint> remote_rcv;///< Where the response came from
68 69
    OutputBufferPtr   msgbuf;      ///< Wire buffer for question
    OutputBufferPtr   received;    ///< Received data put here
70
    IOFetch::Callback*          callback;    ///< Called on I/O Completion
71
    boost::asio::deadline_timer timer;       ///< Timer to measure timeouts
72 73 74 75 76 77
    IOFetch::Protocol           protocol;    ///< Protocol being used
    size_t                      cumulative;  ///< Cumulative received amount
    size_t                      expected;    ///< Expected amount of data
    size_t                      offset;      ///< Offset to receive data
    bool                        stopped;     ///< Have we stopped running?
    int                         timeout;     ///< Timeout in ms
Stephen Morris's avatar
Stephen Morris committed
78
    bool                        packet;      ///< true if packet was supplied
79 80 81 82 83 84 85

    // In case we need to log an error, the origin of the last asynchronous
    // I/O is recorded.  To save time and simplify the code, this is recorded
    // as the ID of the error message that would be generated if the I/O failed.
    // This means that we must make sure that all possible "origins" take the
    // same arguments in their message in the same order.
    isc::log::MessageID         origin;     ///< Origin of last asynchronous I/O
86 87
    uint8_t                     staging[IOFetch::STAGING_LENGTH];
                                            ///< Temporary array for received data
88
    isc::dns::qid_t             qid;         ///< The QID set in the query
89 90 91 92 93

    /// \brief Constructor
    ///
    /// Just fills in the data members of the IOFetchData structure
    ///
94
    /// \param proto Either IOFetch::TCP or IOFetch::UDP.
95
    /// \param service I/O Service object to handle the asynchronous
96
    ///        operations.
97 98 99
    /// \param address IP address of upstream server
    /// \param port Port to use for the query
    /// \param buff Output buffer into which the response (in wire format)
100
    ///        is written (if a response is received).
101
    /// \param cb Callback object containing the callback to be called
102 103
    ///        when we terminate.  The caller is responsible for managing this
    ///        object and deleting it if necessary.
104 105 106
    /// \param wait Timeout for the fetch (in ms).
    ///
    /// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
107
    IOFetchData(IOFetch::Protocol proto, IOService& service,
108 109
        const IOAddress& address, uint16_t port, OutputBufferPtr& buff,
        IOFetch::Callback* cb, int wait)
110
        :
111
        socket((proto == IOFetch::UDP) ?
112 113 114 115 116
            static_cast<IOAsioSocket<IOFetch>*>(
                new UDPSocket<IOFetch>(service)) :
            static_cast<IOAsioSocket<IOFetch>*>(
                new TCPSocket<IOFetch>(service))
            ),
117 118 119 120 121
        remote_snd((proto == IOFetch::UDP) ?
            static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
            static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
            ),
        remote_rcv((proto == IOFetch::UDP) ?
122 123 124
            static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
            static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
            ),
125
        msgbuf(new OutputBuffer(512)),
126
        received(buff),
127
        callback(cb),
128 129
        timer(service.get_io_service()),
        protocol(proto),
130
        cumulative(0),
131 132
        expected(0),
        offset(0),
133 134
        stopped(false),
        timeout(wait),
Stephen Morris's avatar
Stephen Morris committed
135
        packet(false),
136
        origin(ASIODNS_UNKNOWN_ORIGIN),
137 138
        staging(),
        qid(QidGenerator::getInstance().generateQid())
139
    {}
140 141 142 143 144 145 146 147 148 149

    // Checks if the response we received was ok;
    // - data contains the buffer we read, as well as the address
    // we sent to and the address we received from.
    // length is provided by the operator() in IOFetch.
    // Addresses must match, number of octets read must be at least
    // 2, and the first two octets must match the qid of the message
    // we sent.
    bool responseOK() {
        return (*remote_snd == *remote_rcv && cumulative >= 2 &&
150
                readUint16(received->getData(), received->getLength()) == qid);
151
    }
152 153
};

154
/// IOFetch Constructor - just initialize the private data
Stephen Morris's avatar
Stephen Morris committed
155

156
IOFetch::IOFetch(Protocol protocol, IOService& service,
Dima Volodin's avatar
Dima Volodin committed
157 158
    const isc::dns::Question& question, const IOAddress& address,
    uint16_t port, OutputBufferPtr& buff, Callback* cb, int wait, bool edns)
159
{
160 161
    MessagePtr query_msg(new Message(Message::RENDER));
    initIOFetch(query_msg, protocol, service, question, address, port, buff,
Dima Volodin's avatar
Dima Volodin committed
162
                cb, wait, edns);
163 164
}

Stephen Morris's avatar
Stephen Morris committed
165 166 167 168 169 170 171 172 173 174 175
IOFetch::IOFetch(Protocol protocol, IOService& service,
    OutputBufferPtr& outpkt, const IOAddress& address, uint16_t port,
    OutputBufferPtr& buff, Callback* cb, int wait)
    :
    data_(new IOFetchData(protocol, service,
          address, port, buff, cb, wait))
{
    data_->msgbuf = outpkt;
    data_->packet = true;
}

176 177 178 179 180
IOFetch::IOFetch(Protocol protocol, IOService& service,
    ConstMessagePtr query_message, const IOAddress& address, uint16_t port,
    OutputBufferPtr& buff, Callback* cb, int wait)
{
    MessagePtr msg(new Message(Message::RENDER));
181 182 183 184 185

    msg->setHeaderFlag(Message::HEADERFLAG_RD,
                       query_message->getHeaderFlag(Message::HEADERFLAG_RD));
    msg->setHeaderFlag(Message::HEADERFLAG_CD,
                       query_message->getHeaderFlag(Message::HEADERFLAG_CD));
186 187 188 189 190 191 192

    initIOFetch(msg, protocol, service,
                **(query_message->beginQuestion()),
                address, port, buff, cb, wait);
}

void
193 194
IOFetch::initIOFetch(MessagePtr& query_msg, Protocol protocol,
                     IOService& service,
195 196
                     const isc::dns::Question& question,
                     const IOAddress& address, uint16_t port,
Dima Volodin's avatar
Dima Volodin committed
197
                     OutputBufferPtr& buff, Callback* cb, int wait, bool edns)
198 199 200 201 202 203 204 205 206
{
    data_ = boost::shared_ptr<IOFetchData>(new IOFetchData(
        protocol, service, address, port, buff, cb, wait));

    query_msg->setQid(data_->qid);
    query_msg->setOpcode(Opcode::QUERY());
    query_msg->setRcode(Rcode::NOERROR());
    query_msg->setHeaderFlag(Message::HEADERFLAG_RD);
    query_msg->addQuestion(question);
Dima Volodin's avatar
Dima Volodin committed
207 208 209 210 211 212 213

    if (edns) {
        EDNSPtr edns_query(new EDNS());
        edns_query->setUDPSize(Message::DEFAULT_MAX_EDNS0_UDPSIZE);
        query_msg->setEDNS(edns_query);
    }

214 215
    MessageRenderer renderer;
    renderer.setBuffer(data_->msgbuf.get());
216
    query_msg->toWire(renderer);
217
    renderer.setBuffer(NULL);
218 219
}

220 221 222 223 224 225 226
// Return protocol in use.

IOFetch::Protocol
IOFetch::getProtocol() const {
    return (data_->protocol);
}

227 228
/// The function operator is implemented with the "stackless coroutine"
/// pattern; see internal/coroutine.h for details.
Stephen Morris's avatar
Stephen Morris committed
229

230
void
231
IOFetch::operator()(boost::system::error_code ec, size_t length) {
232

233 234
    if (data_->stopped) {
        return;
235 236

    // On Debian it has been often observed that boost::asio async
Josh Soref's avatar
Josh Soref committed
237
    // operations result in EINPROGRESS. This doesn't necessarily
238 239
    // indicate an issue. Thus, we continue as if no error occurred.
    } else if (ec && (ec.value() != boost::asio::error::in_progress)) {
240
        logIOFailure(ec);
241 242 243 244
        return;
    }

    CORO_REENTER (this) {
Stephen Morris's avatar
Stephen Morris committed
245

246 247 248 249
        /// Generate the upstream query and render it to wire format
        /// This is done in a different scope to allow inline variable
        /// declarations.
        {
Stephen Morris's avatar
Stephen Morris committed
250 251 252 253 254
            if (data_->packet) {
                // A packet was given, overwrite the QID (which is in the
                // first two bytes of the packet).
                data_->msgbuf->writeUint16At(data_->qid, 0);

255
            } 
256 257
        }

258 259
        // If we timeout, we stop, which will can cancel outstanding I/Os and
        // shutdown everything.
260 261 262 263 264 265 266 267
        if (data_->timeout != -1) {
            data_->timer.expires_from_now(boost::posix_time::milliseconds(
                data_->timeout));
            data_->timer.async_wait(boost::bind(&IOFetch::stop, *this,
                TIME_OUT));
        }

        // Open a connection to the target system.  For speed, if the operation
268
        // is synchronous (i.e. UDP operation) we bypass the yield.
269
        data_->origin = ASIODNS_OPEN_SOCKET;
270
        if (data_->socket->isOpenSynchronous()) {
271
            data_->socket->open(data_->remote_snd.get(), *this);
272
        } else {
273
            CORO_YIELD data_->socket->open(data_->remote_snd.get(), *this);
274 275
        }

Stephen Morris's avatar
Stephen Morris committed
276
        do {
277 278
            // Begin an asynchronous send, and then yield.  When the send completes,
            // we will resume immediately after this point.
279
            data_->origin = ASIODNS_SEND_DATA;
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
            CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
                data_->msgbuf->getLength(), data_->remote_snd.get(), *this);
    
            // Now receive the response.  Since TCP may not receive the entire
            // message in one operation, we need to loop until we have received
            // it. (This can't be done within the asyncReceive() method because
            // each I/O operation will be done asynchronously and between each one
            // we need to yield ... and we *really* don't want to set up another
            // coroutine within that method.)  So after each receive (and yield),
            // we check if the operation is complete and if not, loop to read again.
            //
            // Another concession to TCP is that the amount of is contained in the
            // first two bytes.  This leads to two problems:
            //
            // a) We don't want those bytes in the return buffer.
            // b) They may not both arrive in the first I/O.
            //
            // So... we need to loop until we have at least two bytes, then store
            // the expected amount of data.  Then we need to loop until we have
            // received all the data before copying it back to the user's buffer.
Josh Soref's avatar
Josh Soref committed
300
            // And we want to minimize the amount of copying...
301
    
302
            data_->origin = ASIODNS_READ_DATA;
303 304
            data_->cumulative = 0;          // No data yet received
            data_->offset = 0;              // First data into start of buffer
305
            data_->received->clear();       // Clear the receive buffer
306 307 308 309 310 311 312 313 314
            do {
                CORO_YIELD data_->socket->asyncReceive(data_->staging,
                                                       static_cast<size_t>(STAGING_LENGTH),
                                                       data_->offset,
                                                       data_->remote_rcv.get(), *this);
            } while (!data_->socket->processReceivedData(data_->staging, length,
                                                         data_->cumulative, data_->offset,
                                                         data_->expected, data_->received));
        } while (!data_->responseOK());
315

316 317
        // Finished with this socket, so close it.  This will not generate an
        // I/O error, but reset the origin to unknown in case we change this.
318
        data_->origin = ASIODNS_UNKNOWN_ORIGIN;
319 320
        data_->socket->close();

321 322 323 324 325 326 327 328 329
        /// We are done
        stop(SUCCESS);
    }
}

// Function that stops the coroutine sequence.  It is called either when the
// query finishes or when the timer times out.  Either way, it sets the
// "stopped_" flag and cancels anything that is in progress.
//
330 331
// As the function may be entered multiple times as things wind down, it checks
// if the stopped_ flag is already set.  If it is, the call is a no-op.
Stephen Morris's avatar
Stephen Morris committed
332

333 334
void
IOFetch::stop(Result result) {
335

336
    if (!data_->stopped) {
337 338 339 340 341 342 343 344 345 346 347 348 349

        // Mark the fetch as stopped to prevent other completion callbacks
        // (invoked because of the calls to cancel()) from executing the
        // cancel calls again.
        //
        // In a single threaded environment, the callbacks won't be invoked
        // until this one completes. In a multi-threaded environment, they may
        // well be, in which case the testing (and setting) of the stopped_
        // variable should be done inside a mutex (and the stopped_ variable
        // declared as "volatile").
        //
        // TODO: Update testing of stopped_ if threads are used.
        data_->stopped = true;
350 351
        switch (result) {
            case TIME_OUT:
352
                LOG_DEBUG(logger, DBG_COMMON, ASIODNS_READ_TIMEOUT).
353 354
                    arg(data_->remote_snd->getAddress().toText()).
                    arg(data_->remote_snd->getPort());
355 356
                break;

357
            case SUCCESS:
358
                LOG_DEBUG(logger, DBG_ALL, ASIODNS_FETCH_COMPLETED).
359 360
                    arg(data_->remote_rcv->getAddress().toText()).
                    arg(data_->remote_rcv->getPort());
361 362
                break;

363 364 365 366
            case STOPPED:
                // Fetch has been stopped for some other reason.  This is
                // allowed but as it is unusual it is logged, but with a lower
                // debug level than a timeout (which is totally normal).
367
                LOG_DEBUG(logger, DBG_IMPORTANT, ASIODNS_FETCH_STOPPED).
368 369
                    arg(data_->remote_snd->getAddress().toText()).
                    arg(data_->remote_snd->getPort());
370 371
                break;

372
            default:
373
                LOG_ERROR(logger, ASIODNS_UNKNOWN_RESULT).
374 375
                    arg(data_->remote_snd->getAddress().toText()).
                    arg(data_->remote_snd->getPort());
376 377
        }

Stephen Morris's avatar
Stephen Morris committed
378 379 380 381 382 383 384 385
        // Stop requested, cancel and I/O's on the socket and shut it down,
        // and cancel the timer.
        data_->socket->cancel();
        data_->socket->close();

        data_->timer.cancel();

        // Execute the I/O completion callback (if present).
386
        if (data_->callback) {
Stephen Morris's avatar
Stephen Morris committed
387
            (*(data_->callback))(result);
388 389 390 391
        }
    }
}

392 393
// Log an error - called on I/O failure

394
void IOFetch::logIOFailure(boost::system::error_code ec) {
395

396
    // Should only get here with a known error code.
397 398 399
    assert((data_->origin == ASIODNS_OPEN_SOCKET) ||
           (data_->origin == ASIODNS_SEND_DATA) ||
           (data_->origin == ASIODNS_READ_DATA) ||
400
           (data_->origin == ASIODNS_UNKNOWN_ORIGIN));
401

402 403
    LOG_ERROR(logger, data_->origin).arg(ec.value()).
        arg((data_->remote_snd->getProtocol() == IPPROTO_TCP) ?
404
                     "TCP" : "UDP").
405 406
        arg(data_->remote_snd->getAddress().toText()).
        arg(data_->remote_snd->getPort());
407 408
}

409
} // namespace asiodns
410
} // namespace isc {