Commit e01aeb05 authored by Stephen Morris's avatar Stephen Morris
Browse files

[trac554] First stage of adding protocol-dependent upstream fetch

Admin tasks:
* Split out io_error.h from asiolink.h
* Made test files follow naming convention of main files
* More discriminatory includes in some files
* Updates tests/Makefile.am

Coding tasks:
* Add additional methods to io_socket.* and put in dummies in
  {tcp,udp}_socket.h
* Incorporated basic IO Fetch code from Scott
parent bbb0031f
...@@ -21,6 +21,7 @@ libasiolink_la_SOURCES += dns_answer.h ...@@ -21,6 +21,7 @@ libasiolink_la_SOURCES += dns_answer.h
libasiolink_la_SOURCES += simple_callback.h libasiolink_la_SOURCES += simple_callback.h
libasiolink_la_SOURCES += interval_timer.h interval_timer.cc libasiolink_la_SOURCES += interval_timer.h interval_timer.cc
libasiolink_la_SOURCES += recursive_query.h recursive_query.cc libasiolink_la_SOURCES += recursive_query.h recursive_query.cc
libasiolink_la_SOURCES += io_error.h
libasiolink_la_SOURCES += io_socket.cc io_socket.h libasiolink_la_SOURCES += io_socket.cc io_socket.h
libasiolink_la_SOURCES += io_message.h libasiolink_la_SOURCES += io_message.h
libasiolink_la_SOURCES += io_address.cc io_address.h libasiolink_la_SOURCES += io_address.cc io_address.h
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include <asiolink/io_endpoint.h> #include <asiolink/io_endpoint.h>
#include <asiolink/io_message.h> #include <asiolink/io_message.h>
#include <asiolink/io_socket.h> #include <asiolink/io_socket.h>
#include <asiolink/io_error.h>
/// \namespace asiolink /// \namespace asiolink
/// \brief A wrapper interface for the ASIO library. /// \brief A wrapper interface for the ASIO library.
...@@ -83,20 +84,6 @@ ...@@ -83,20 +84,6 @@
/// the placeholder of callback handlers: /// the placeholder of callback handlers:
/// http://think-async.com/Asio/asio-1.3.1/doc/asio/reference/asio_handler_allocate.html /// http://think-async.com/Asio/asio-1.3.1/doc/asio/reference/asio_handler_allocate.html
namespace asiolink {
/// \brief An exception that is thrown if an error occurs within the IO
/// module. This is mainly intended to be a wrapper exception class for
/// ASIO specific exceptions.
class IOError : public isc::Exception {
public:
IOError(const char* file, size_t line, const char* what) :
isc::Exception(file, line, what) {}
};
} // asiolink
#endif // __ASIOLINK_H #endif // __ASIOLINK_H
// Local Variables: // Local Variables:
......
// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#ifndef __IO_COMPLETION_CB_H
#define __IO_COMPLETION_CB_H
#include <asio/error_code.hpp>
#include <coroutine.h>
/// \brief Asynchronous I/O Completion Callback
///
/// The stackless coroutine code requires that there be an "entry function"
/// containing the coroutine macros. When the coroutine yields, its state is
/// stored and when the "entry function" is called again, it jumps to the
/// line when processing left off last time. In BIND-10, that "entry function"
/// is the Boost asynchronous I/O completion callback - in essence operator().
///
/// This class solves the problem of circularity in class definitions. In
/// BIND10, classes such as IOFetch contain the coroutine code. These include
/// objects of classes such as IOSocket, whose signature has to include the
/// callback object - IOFetch. By abstracting the I/O completion callback into
/// this class, that circularity is broken.
///
/// One more point: the asynchronous I/O functions take the callback object by
/// reference. But if a derived class object is passed as a reference to its
/// base class, "class slicing" takes place - the derived part of the class is
/// lost and only the base class functionality remains. By storing a pointer
/// to the true object and having the base class method call the derived class
/// method through that, the behaviour of class inheritance is restored. In
/// other words:
/// \code
/// class derived: public class base {
/// :
/// };
/// derived drv;
///
/// // Call with pointer to base class
/// void f(base* b, asio::error_code& ec, size_t length) {
/// b->operator()(ec, length);
/// }
///
/// // Call with reference to base class
/// void g(base& b, asio::error_code& ec, size_t length) {
/// b.operator()(ec, length);
/// }
///
/// void function xyz(derived *d, asio::error_code& ec, size_t length) {
/// f(d, ec, length); // Calls derived::operator()
/// g(*d, ec, length); // Also calls derived::operator()
/// }
/// \endcode
class IOCompletionCallback : public coroutine {
public:
/// \brief Constructor
IOCompletionCallback() : self_(this)
{}
/// \brief Virtual Destructor
virtual ~IOCompletionCallback()
{}
/// \brief Callback Method
virtual void operator()(asio::error_code ec = asio::error_code(),
size_t length = 0) {
(*self_)(ec, length);
}
private:
IOCompletionCallback* self_; ///< Pointer to real object
};
#endif // __IO_COMPLETION_CB_H
// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#ifndef __IO_ERROR_H
#define __IO_ERROR_H
#include <exceptions/exceptions.h>
namespace asiolink {
/// \brief An exception that is thrown if an error occurs within the IO
/// module. This is mainly intended to be a wrapper exception class for
/// ASIO specific exceptions.
class IOError : public isc::Exception {
public:
IOError(const char* file, size_t line, const char* what) :
isc::Exception(file, line, what) {}
};
} // asiolink
#endif // __IO_ERROR_H
...@@ -18,13 +18,12 @@ ...@@ -18,13 +18,12 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <boost/bind.hpp>
#include <asio.hpp> #include <asio.hpp>
#include <asio/deadline_timer.hpp> #include <asio/deadline_timer.hpp>
#include <asio/ip/address.hpp> #include <asio/ip/address.hpp>
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp> #include <boost/date_time/posix_time/posix_time_types.hpp>
#include <dns/buffer.h> #include <dns/buffer.h>
...@@ -35,7 +34,7 @@ ...@@ -35,7 +34,7 @@
#include <dns/rcode.h> #include <dns/rcode.h>
#include <asiolink.h> #include <asiolink.h>
#include <internal/coroutine.h> #include <coroutine.h>
#include <internal/udpdns.h> #include <internal/udpdns.h>
#include <internal/tcpdns.h> #include <internal/tcpdns.h>
#include <internal/iofetch.h> #include <internal/iofetch.h>
...@@ -50,137 +49,59 @@ using namespace isc::dns; ...@@ -50,137 +49,59 @@ using namespace isc::dns;
namespace asiolink { namespace asiolink {
struct TcpFetch::UdpData { // Constructor for the IOFetchData member
// UDP Socket we send query to and expect reply from there
udp::socket socket;
// Where was the query sent
udp::endpoint remote;
// What we ask the server
Question question;
// We will store the answer here
OutputBufferPtr buffer;
OutputBufferPtr msgbuf;
// Temporary buffer for answer
boost::shared_array<char> data;
// This will be called when the data arrive or timeouts
Callback* callback;
//Callback* callback;
// Did we already stop operating (data arrived, we timed out, someone
// called stop). This can be so when we are cleaning up/there are
// still pointers to us.
bool stopped;
// Timer to measure timeouts.
deadline_timer timer;
// How many milliseconds are we willing to wait for answer?
int timeout;
UdpData(io_service& service,
const udp::socket::protocol_type& protocol,
const Question &q,
OutputBufferPtr b, Callback *c) :
socket(service, protocol),
question(q),
buffer(b),
msgbuf(new OutputBuffer(512)),
callback(c),
stopped(false),
timer(service)
{ }
};
struct TcpFetch::TcpData {
// TCP Socket
tcp::socket socket;
// tcp endpoint
tcp::endpoint remote;
// What we ask the server
Question question;
// We will store the answer here
OutputBufferPtr buffer;
OutputBufferPtr msgbuf;
// Temporary buffer for answer
boost::shared_array<char> data;
// This will be called when the data arrive or timeouts
Callback* callback;
// Did we already stop operating (data arrived, we timed out, someone
// called stop). This can be so when we are cleaning up/there are
// still pointers to us.
bool stopped;
// Timer to measure timeouts.
deadline_timer timer;
// How many milliseconds are we willing to wait for answer?
int timeout;
TcpData(io_service& service, /// \brief Constructor
const tcp::socket::protocol_type& protocol, ///
const Question &q, /// Just fills in the data members of the IOFetchData structure
OutputBufferPtr b, Callback *c) : ///
socket(service, protocol), /// \param io_service I/O Service object to handle the asynchronous
question(q), /// operations.
buffer(b), /// \param question DNS question to send to the upstream server.
msgbuf(new OutputBuffer(512)), /// \param address IP address of upstream server
callback(c), /// \param port Port to use for the query
stopped(false), /// \param buffer Output buffer into which the response (in wire format)
timer(service) /// is written (if a response is received).
{ } /// \param callback Callback object containing the callback to be called
/// when we terminate. The caller is responsible for managing this
/// object and deleting it if necessary.
}; /// \param timeout Timeout for the fetch (in ms). The default value of
/// -1 indicates no timeout.
UdpFetch::UdpFetch(io_service& io_service, const Question& q, /// \param protocol Protocol to use for the fetch. The default is UDP
const IOAddress& addr, uint16_t port,
OutputBufferPtr buffer, Callback *callback, int timeout) : IOFetch::IOFetchData::IOFetchData(IOService& io_service,
data_(new UdpData(io_service, const isc::dns::Question& query, const IOAddress& address, uint16_t port,
addr.getFamily() == AF_INET ? udp::v4() : udp::v6(), isc::dns::OutputBufferPtr buff, Callback* cb, int wait, int protocol)
q, buffer, callback)) :
{ socket((protocol == IPPROTO_UDP) ?
data_->remote = UDPEndpoint(addr, port).getASIOEndpoint(); static_cast<IOSocket*>(new UDPSocket(io_service, address)) :
data_->timeout = timeout; static_cast<IOSocket*>(new TCPSocket(io_service, address))
} ),
TcpFetch::TcpFetch(io_service& io_service, const Question& q, remote((protocol == IPPROTO_UDP) ?
const IOAddress& addr, uint16_t port, static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
OutputBufferPtr buffer, Callback *callback, int timeout) : static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
tcp_data_(new TcpData(io_service, ),
addr.getFamily() == AF_INET ? udp::v4() : udp::v6(), question(query),
q, buffer, callback)) buffer(buff),
msgbuf(new OutputBuffer(512)), // TODO: Why this number?
data(new char[IOFetch::MAX_LENGTH]),
callback(cb),
stopped(false),
timer(io_service.get_io_service()),
timeout(wait)
{ {
tcp_data_->remote = TCPEndpoint(addr, port).getASIOEndpoint();
tcp_data_->timeout = timeout;
} }
//
/// The following functions implement the \c IOFetch class.
/// /// IOFetch Constructor - just initialize the private data
/// The constructor IOFetch::IOFetch(IOService& io_service, const Question& question,
IOFetch::IOFetch(io_service& io_service, const Question& q, const IOAddress& address, uint16_t port, OutputBufferPtr buffer,
const IOAddress& addr, uint16_t port, Callback *callback, int timeout, int protocol) :
OutputBufferPtr buffer, Callback *callback, int timeout, data_(new IOFetch::IOFetchData(io_service, question, address, port,
int protocol) buffer, callback, timeout, protocol)
)
{ {
if (protocol == IPPROTO_TCP)
{
TcpFetch(io_service, q, addr, port, buffer, callback, timeout);
/*
tcp_data_ = new TcpData(io_service,
addr.getFamily() == AF_INET ? udp::v4() : udp::v6(),
q, buffer, callback);
tcp_data_->remote = TCPEndpoint(addr, port).getASIOEndpoint();
tcp_data_->timeout = timeout;
*/
}
else
{
UdpFetch(io_service, q, addr, port, buffer, callback, timeout);
/*
data_(new UdpData(io_service,
addr.getFamily() == AF_INET ? udp::v4() : udp::v6(),
q, buffer, callback));
data_->remote = UDPEndpoint(addr, port).getASIOEndpoint();
data_->timeout = timeout;
*/
}
} }
/// The function operator is implemented with the "stackless coroutine" /// The function operator is implemented with the "stackless coroutine"
...@@ -198,7 +119,7 @@ IOFetch::operator()(error_code ec, size_t length) { ...@@ -198,7 +119,7 @@ IOFetch::operator()(error_code ec, size_t length) {
{ {
Message msg(Message::RENDER); Message msg(Message::RENDER);
// XXX: replace with boost::random or some other suitable PRNG // TODO: replace with boost::random or some other suitable PRNG
msg.setQid(0); msg.setQid(0);
msg.setOpcode(Opcode::QUERY()); msg.setOpcode(Opcode::QUERY());
msg.setRcode(Rcode::NOERROR()); msg.setRcode(Rcode::NOERROR());
...@@ -207,7 +128,7 @@ IOFetch::operator()(error_code ec, size_t length) { ...@@ -207,7 +128,7 @@ IOFetch::operator()(error_code ec, size_t length) {
MessageRenderer renderer(*data_->msgbuf); MessageRenderer renderer(*data_->msgbuf);
msg.toWire(renderer); msg.toWire(renderer);
dlog("Sending " + msg.toText() + " to " + dlog("Sending " + msg.toText() + " to " +
data_->remote.address().to_string()); data_->remote->getAddress().toText());
} }
...@@ -220,10 +141,17 @@ IOFetch::operator()(error_code ec, size_t length) { ...@@ -220,10 +141,17 @@ IOFetch::operator()(error_code ec, size_t length) {
TIME_OUT)); TIME_OUT));
} }
// Open a connection to the target system. For speed, if the operation
// was a no-op (i.e. UDP operation) we bypass the yield.
bool do_yield = data_->socket->open(data->remote.get(), *this);
if (do_yield) {
CORO_YIELD;
}
// Begin an asynchronous send, and then yield. When the // Begin an asynchronous send, and then yield. When the
// send completes, we will resume immediately after this point. // send completes, we will resume immediately after this point.
CORO_YIELD data_->socket.async_send_to(buffer(data_->msgbuf->getData(), CORO_YIELD data_->socket->async_send(data_->msgbuf->getData(),
data_->msgbuf->getLength()), data_->remote, *this); data_->msgbuf->getLength(), data_->remote.get(), *this);
/// Allocate space for the response. (XXX: This should be /// Allocate space for the response. (XXX: This should be
/// optimized by maintaining a free list of pre-allocated blocks) /// optimized by maintaining a free list of pre-allocated blocks)
...@@ -231,15 +159,16 @@ IOFetch::operator()(error_code ec, size_t length) { ...@@ -231,15 +159,16 @@ IOFetch::operator()(error_code ec, size_t length) {
/// Begin an asynchronous receive, and yield. When the receive /// Begin an asynchronous receive, and yield. When the receive
/// completes, we will resume immediately after this point. /// completes, we will resume immediately after this point.
CORO_YIELD data_->socket.async_receive_from(buffer(data_->data.get(), CORO_YIELD data_->socket->async_receive(data_->data.get(),
MAX_LENGTH), data_->remote, *this); static_cast<size_t>(MAX_LENGTH), data_->remote.get(), *this);
// The message is not rendered yet, so we can't print it easilly // The message is not rendered yet, so we can't print it easilly
dlog("Received response from " + data_->remote.address().to_string()); dlog("Received response from " + data_->remote->getAddress().toText());
/// Copy the answer into the response buffer. (XXX: If the /// Copy the answer into the response buffer. (TODO: If the
/// OutputBuffer object were made to meet the requirements of /// OutputBuffer object were made to meet the requirements of
/// a MutableBufferSequence, then it could be written to directly /// a MutableBufferSequence, then it could be written to directly
/// by async_recieve_from() and this additional copy step would /// by async_receive_from() and this additional copy step would
/// be unnecessary.) /// be unnecessary.)
data_->buffer->writeData(data_->data.get(), length); data_->buffer->writeData(data_->data.get(), length);
...@@ -248,6 +177,13 @@ IOFetch::operator()(error_code ec, size_t length) { ...@@ -248,6 +177,13 @@ IOFetch::operator()(error_code ec, size_t length) {
} }
} }
// Function that stops the coroutine sequence. It is called either when the
// query finishes or when the timer times out. Either way, it sets the
// "stopped_" flag and cancels anything that is in progress.
//
// As the function may be entered multiple times as things wind down, the
// stopped_ flag checks if stop() has already been called. If it has,
// subsequent calls are no-ops.
void void
IOFetch::stop(Result result) { IOFetch::stop(Result result) {
if (!data_->stopped) { if (!data_->stopped) {
...@@ -255,19 +191,23 @@ IOFetch::stop(Result result) { ...@@ -255,19 +191,23 @@ IOFetch::stop(Result result) {
case TIME_OUT: case TIME_OUT:
dlog("Query timed out"); dlog("Query timed out");
break; break;
case STOPPED: case STOPPED:
dlog("Query stopped"); dlog("Query stopped");
break; break;
default:;
default:
;
} }
data_->stopped = true; data_->stopped = true;
data_->socket.cancel(); data_->socket->cancel(); // Cancel outstanding I/O
data_->socket.close(); data_->socket->close(); // ... and close the socket
data_->timer.cancel(); data_->timer.cancel(); // Cancel timeout timer
if (data_->callback) { if (data_->callback) {
(*data_->callback)(result); (*(data_->callback))(result); // Call callback
} }
} }
} }
} } // namespace asiolink
// Copyright (C) 2010 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#ifndef __IOFETCH_H
#define __IOFETCH_H 1
#include <netinet/in.h>
#include <config.h>
#include <asio.hpp>
#include <boost/shared_array.hpp>
#include <boost/shared_ptr.hpp>
#include <asio/deadline_timer.hpp>
#include <dns/buffer.h>
#include <dns/question.h>
#include <asiolink/asiolink.h>
#include <asiolink/ioaddress.h>
#include <asiolink/iocompletioncb.h>
#include <asiolink/iocompletioncb.h>
#include <asiolink/iosocket.h>
#include <asiolink/ioendpoint.h>
#include <coroutine.h>
namespace asiolink {
/// \brief Upstream Fetch Processing
///
/// IOFetch is the class used to send upstream fetches and to handle responses.
/// It is a base class containing most of the logic, although the ASIO will
/// actually instantiate one of the derived classes TCPFetch or UDPFetch.
/// (These differ in the type of socket and endpoint.)
class IOFetch : public IOCompletionCallback {
public:
/// \brief Result of Upstream Fetch
///
/// Note that this applies to the status of I/Os in the fetch - a fetch
/// that resulted in a packet being received from the server is a SUCCESS,
/// even if the contents of the packet indicate that some error occurred.
enum Result {
SUCCESS = 0, ///< Success, fetch completed
TIME_OUT, ///< Failure, fetch timed out
STOPPED ///< Control code, fetch has been stopped
};
// The next enum is a "trick" to allow constants to be defined in a class
// declaration.
/// \brief Integer Constants
enum {
MAX_LENGTH = 4096 ///< Maximum size of receive buffer
};
/// \brief I/O Fetch Callback
///
/// Callback object for when the fetch itself has completed. Note that this
/// is different to the IOCompletionCallback; that is used to signal the
/// completion of an asynchronous I/O call. The IOFetch::Callback is called
/// when an upstream fetch - which may have involved several asynchronous
/// I/O operations - has completed.
///
/// This is an abstract class.
class Callback {
public:
/// \brief Default Constructor
Callback()
{}
/// \brief Virtual Destructor
virtual ~Callback()
{}
/// \brief Callback method called when the fetch completes