Commit b13c2fd0 authored by Stephen Morris's avatar Stephen Morris

[trac499] Checkpoint of work to end of Friday 25 Feb 2011

parent 7c419681
......@@ -36,6 +36,14 @@ namespace asiolink {
class DummyIOCallback {
public:
/// \brief Asynchronous I/O callback method
///
/// \param error Unused
void operator()(asio::error_code)
{
// TODO: log an error if this method ever gets called.
}
/// \brief Asynchronous I/O callback method
///
/// \param error Unused
......
......@@ -50,6 +50,16 @@ public:
IOError(file, line, what) {}
};
/// \brief Buffer Overflow
///
/// Thrown if an attempt is made to receive into an area beyond the end of
/// the receive data buffer.
class BufferOverflow : public IOError {
public:
BufferOverflow(const char* file, size_t line, const char* what) :
IOError(file, line, what) {}
};
/// Forward declaration of an IOEndpoint
class IOEndpoint;
......@@ -129,32 +139,47 @@ public:
/// \return IPPROTO_TCP for TCP sockets
virtual int getProtocol() const = 0;
/// \brief Open AsioSocket
/// \brief Is Open() synchronous?
///
/// Opens the socket for asynchronous I/O. On a UDP socket, this is merely
/// an "open()" on the underlying socket (so completes immediately), but on
/// a TCP socket it also connects to the remote end (which is done as an
/// On a UDP socket, an "open" operation is merely a call to "open()" on
/// the underlying socket (so completes immediately), but on a TCP socket it
/// also includings connecting to the remote end (which is done as an
/// asynchronous operation).
///
/// For TCP, signalling of the completion of the operation is done by
/// by calling the callback function in the normal way. This could be done
/// for UDP (by posting en event on the event queue); however, that will
/// incur additional overhead in the most common case. Instead, the return
/// value indicates whether the operation was asynchronous or not. If yes,
/// (i.e. TCP) the callback has been posted to the event queue: if no (UDP),
/// no callback has been posted (in which case it is up to the caller as to
/// whether they want to manually post the callback themself.)
/// incur additional overhead in the most common case. So we give the
/// caller the choice for calling this open() method synchronously or
/// asynchronously.
///
/// Owing to the way that the stackless coroutines are implemented, we need
/// to know _before_ executing the operation whether or not the open is
/// asynchronous. So this method simply provides that information.
///
/// (The reason there is a need to know is because the call to open() passes
/// in the state of the coroutine at the time the call is made. On an
/// asynchronous I/O, we need to set the state to point to the statement
/// after the call to open() before we pass the corotuine to the open()
/// call. Unfortunately, the macros that do this also yield control - which
/// we don't want to do if the open is synchronous. Hence we need to know
/// before we make the call to open() whether that call will complete
/// asynchronously.)
virtual bool isOpenSynchronous() const = 0;
/// \brief Open AsioSocket
///
/// Opens the socket for asynchronous I/O. The open will complete
/// synchronously on UCP or asynchronously on TCP (in which case a callback
/// will be queued): what will happen can be found by calling the method
/// isOpenSynchronous().
///
/// \param endpoint Pointer to the endpoint object. This is ignored for
/// a UDP socket (the target is specified in the send call), but should
/// be of type TCPEndpoint for a TCP connection.
/// \param callback I/O Completion callback, called when the operation has
/// completed, but only if the operation was asynchronous.
///
/// \return true if an asynchronous operation was started and the caller
/// should yield and wait for completion, false if the operation was
/// completed synchronously and no callback was queued.
virtual bool open(const IOEndpoint* endpoint, C& callback) = 0;
virtual void open(const IOEndpoint* endpoint, C& callback) = 0;
/// \brief Send Asynchronously
///
......@@ -167,7 +192,7 @@ public:
/// \param endpoint Target of the send
/// \param callback Callback object.
virtual void asyncSend(const void* data, size_t length,
const IOEndpoint* endpoint, C& callback) = 0;
const IOEndpoint* endpoint, C& callback) = 0;
/// \brief Receive Asynchronously
///
......@@ -178,11 +203,11 @@ public:
///
/// \param data Buffer to receive incoming message
/// \param length Length of the data buffer
/// \param cumulative Amount of data that should already be in the buffer.
/// \param offset Offset into buffer where data is to be put
/// \param endpoint Source of the communication
/// \param callback Callback object
virtual void asyncReceive(void* data, size_t length, size_t cumulative,
IOEndpoint* endpoint, C& callback) = 0;
virtual void asyncReceive(void* data, size_t length, size_t offset,
IOEndpoint* endpoint, C& callback) = 0;
/// \brief Checks if the data received is complete.
///
......@@ -204,7 +229,7 @@ public:
/// \return true if the receive is complete, false if another receive is
/// needed.
virtual bool receiveComplete(void* data, size_t length,
size_t& cumulative) = 0;
size_t& cumulative) = 0;
/// \brief Cancel I/O On AsioSocket
virtual void cancel() = 0;
......@@ -251,6 +276,13 @@ public:
virtual int getProtocol() const { return (protocol_); }
/// \brief Is socket opening synchronous?
///
/// \return true - it is for this class.
bool isOpenSynchronous() const {
return true;
}
/// \brief Open AsioSocket
///
/// A call that is a no-op on UDP sockets, this opens a connection to the
......@@ -280,7 +312,7 @@ public:
///
/// \param data Unused
/// \param length Unused
/// \param cumulative Unused
/// \param offset Unused
/// \param endpoint Unused
/// \param callback Unused
virtual void asyncReceive(void* data, size_t, size_t, IOEndpoint*, C&) {
......
......@@ -22,6 +22,7 @@
#include <asiolink/io_address.h>
#include <asiolink/io_error.h>
#include <asiolink/io_endpoint.h>
#include <asiolink/tcp_endpoint.h>
#include <asiolink/udp_endpoint.h>
......
......@@ -19,6 +19,9 @@
#include <netinet/in.h>
#include <boost/bind.hpp>
#include <boost/shared_array.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <dns/message.h>
#include <dns/messagerenderer.h>
......@@ -28,10 +31,18 @@
#include <log/logger.h>
#include <asio.hpp>
#include <asio/deadline_timer.hpp>
#include <asiolink/asiodef.h>
#include <asiolink/io_address.h>
#include <asiolink/io_asio_socket.h>
#include <asiolink/io_endpoint.h>
#include <asiolink/io_fetch.h>
#include <asiolink/io_service.h>
#include <asiolink/tcp_endpoint.h>
#include <asiolink/tcp_socket.h>
#include <asiolink/udp_endpoint.h>
#include <asiolink/udp_socket.h>
using namespace asio;
using namespace isc::dns;
......@@ -44,13 +55,87 @@ namespace asiolink {
isc::log::Logger logger("asio");
/// \brief IOFetch Data
///
/// The data for IOFetch is held in a separate struct pointed to by a
/// shared_ptr object. This is because the IOFetch object will be copied
/// often (it is used as a coroutine and passed as callback to many
/// async_*() functions) and we want keep the same data). Organising the
/// data in this way keeps copying to a minimum.
struct IOFetchData {
// The first two members are shared pointers to a base class because what is
// actually instantiated depends on whether the fetch is over UDP or TCP,
// which is not known until construction of the IOFetch. Use of a shared
//pointer here is merely to ensure deletion when the data object is deleted.
boost::shared_ptr<IOAsioSocket<IOFetch> > socket;
///< Socket to use for I/O
boost::shared_ptr<IOEndpoint> remote; ///< Where the fetch was sent
isc::dns::Question question; ///< Question to be asked
isc::dns::OutputBufferPtr msgbuf; ///< Wire buffer for question
isc::dns::OutputBufferPtr buffer; ///< Received data held here
boost::shared_array<char> data; ///< Temporary array for data
IOFetch::Callback* callback; ///< Called on I/O Completion
size_t cumulative; ///< Cumulative received amount
bool stopped; ///< Have we stopped running?
asio::deadline_timer timer; ///< Timer to measure timeouts
int timeout; ///< Timeout in ms
IOFetch::Origin origin; ///< Origin of last asynchronous I/O
/// \brief Constructor
///
/// Just fills in the data members of the IOFetchData structure
///
/// \param protocol Either IOFetch::TCP or IOFetch::UDP
/// \param service I/O Service object to handle the asynchronous
/// operations.
/// \param query DNS question to send to the upstream server.
/// \param address IP address of upstream server
/// \param port Port to use for the query
/// \param buff Output buffer into which the response (in wire format)
/// is written (if a response is received).
/// \param cb Callback object containing the callback to be called
/// when we terminate. The caller is responsible for managing this
/// object and deleting it if necessary.
/// \param wait Timeout for the fetch (in ms).
///
/// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
IOFetchData(IOFetch::Protocol protocol, IOService& service,
const isc::dns::Question& query, const IOAddress& address,
uint16_t port, isc::dns::OutputBufferPtr& buff, IOFetch::Callback* cb,
int wait)
:
socket((protocol == IOFetch::UDP) ?
static_cast<IOAsioSocket<IOFetch>*>(
new UDPSocket<IOFetch>(service)) :
static_cast<IOAsioSocket<IOFetch>*>(
new TCPSocket<IOFetch>(service))
),
remote((protocol == IOFetch::UDP) ?
static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
),
question(query),
msgbuf(new isc::dns::OutputBuffer(512)),
buffer(buff),
data(new char[IOFetch::MIN_LENGTH]),
callback(cb),
cumulative(0),
stopped(false),
timer(service.get_io_service()),
timeout(wait),
origin(IOFetch::NONE)
{}
};
/// IOFetch Constructor - just initialize the private data
IOFetch::IOFetch(Protocol protocol, IOService& service,
const isc::dns::Question& question, const IOAddress& address, uint16_t port,
isc::dns::OutputBufferPtr& buff, Callback* cb, int wait)
OutputBufferPtr& buff, Callback* cb, int wait)
:
data_(new IOFetch::IOFetchData(protocol, service, question, address,
data_(new IOFetchData(protocol, service, question, address,
port, buff, cb, wait))
{
}
......@@ -59,7 +144,9 @@ IOFetch::IOFetch(Protocol protocol, IOService& service,
/// pattern; see internal/coroutine.h for details.
void
IOFetch::operator()(error_code ec, size_t length) {
IOFetch::operator()(asio::error_code ec, size_t length) {
std::cerr << "IOFetch::operator() [" << this << "], origin = " <<
data_->origin << ", coroutine = " << get_value() << "\n";
if (data_->stopped) {
return;
} else if (ec) {
......@@ -91,7 +178,6 @@ IOFetch::operator()(error_code ec, size_t length) {
data_->remote->getAddress().toText());
}
// If we timeout, we stop, which will shutdown everything and
// cancel all other attempts to run inside the coroutine
if (data_->timeout != -1) {
......@@ -103,17 +189,26 @@ IOFetch::operator()(error_code ec, size_t length) {
// Open a connection to the target system. For speed, if the operation
// was completed synchronously (i.e. UDP operation) we bypass the yield.
if (data_->socket->open(data_->remote.get(), *this)) {
data_->origin = OPEN;
CORO_YIELD;
data_->origin = OPEN;
if (data_->socket->isOpenSynchronous()) {
std::cerr << "IOFetch: Opening socket synchronously\n";
data_->socket->open(data_->remote.get(), *this);
} else {
std::cerr << "IOFetch: Opening socket asynchronously and yeilding\n";
CORO_YIELD data_->socket->open(data_->remote.get(), *this);
std::cerr << "IOFetch: Resuming after Opening socket asynchronously\n";
}
// Begin an asynchronous send, and then yield. When the send completes
// send completes, we will resume immediately after this point.
// Note: A TCP message may not be sent in one piece (depends on the
// implementation in TCP socket). Therefore there may be
data_->origin = SEND;
std::cerr << "IOFetch: asynchronous send\n";
CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
data_->msgbuf->getLength(), data_->remote.get(), *this);
std::cerr << "IOFetch: resuming after asynchronous send\n";
// Now receive the response. Since TCP may not receive the entire
// message in one operation, we need to loop until we have received
// it. (This can't be done within the asyncReceive() method because
......@@ -123,9 +218,11 @@ IOFetch::operator()(error_code ec, size_t length) {
// we check if the operation is complete and if not, loop to read again.
data_->origin = RECEIVE;
do {
std::cerr << "IOFetch: asynchronous receive\n";
CORO_YIELD data_->socket->asyncReceive(data_->data.get(),
static_cast<size_t>(MAX_LENGTH), data_->cumulative,
static_cast<size_t>(MIN_LENGTH), data_->cumulative,
data_->remote.get(), *this);
std::cerr << "IOFetch: resuming after asynchronous receive\n";
} while (!data_->socket->receiveComplete(data_->data.get(), length,
data_->cumulative));
......@@ -141,6 +238,7 @@ IOFetch::operator()(error_code ec, size_t length) {
// Finished with this socket, so close it.
data_->origin = CLOSE;
std::cerr << "IOFetch: close\n";
data_->socket->close();
/// We are done
......@@ -230,7 +328,7 @@ IOFetch::stop(Result result) {
// Log an error - called on I/O failure
void IOFetch::logIOFailure(asio::error_code& ec) {
void IOFetch::logIOFailure(asio::error_code ec) {
// Get information that will be in all messages
static const char* PROTOCOL[2] = {"TCP", "UDP"};
......
......@@ -17,31 +17,24 @@
#include <config.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h> // for some IPC/network system calls
#include <boost/shared_array.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <asio/deadline_timer.hpp>
#include <asio/error_code.hpp>
#include <coroutine.h>
#include <dns/buffer.h>
#include <dns/question.h>
#include <asiolink/io_asio_socket.h>
#include <asiolink/io_endpoint.h>
#include <asiolink/io_service.h>
#include <asiolink/tcp_socket.h>
#include <asiolink/tcp_endpoint.h>
#include <asiolink/udp_socket.h>
#include <asiolink/udp_endpoint.h>
namespace asiolink {
// Forward declarations
class IOAddress;
class IOFetchData;
class IOService;
/// \brief Upstream Fetch Processing
///
......@@ -76,9 +69,9 @@ public:
/// even if the contents of the packet indicate that some error occurred.
enum Result {
SUCCESS = 0, ///< Success, fetch completed
TIME_OUT, ///< Failure, fetch timed out
STOPPED, ///< Control code, fetch has been stopped
NOTSET ///< For testing, indicates value not set
TIME_OUT = 1, ///< Failure, fetch timed out
STOPPED = 2, ///< Control code, fetch has been stopped
NOTSET = 3 ///< For testing, indicates value not set
};
// The next enum is a "trick" to allow constants to be defined in a class
......@@ -86,7 +79,7 @@ public:
/// \brief Integer Constants
enum {
MAX_LENGTH = 4096 ///< Maximum size of receive buffer
MIN_LENGTH = 4096 ///< Minimum size of receive buffer
};
/// \brief I/O Fetch Callback
......@@ -112,89 +105,12 @@ public:
virtual ~Callback()
{}
/// \brief Callback method called when the fetch completes /// \brief Origin of Asynchronous I/O Call
///
// The next enum is a "trick" to allow constants to be defined in a class
// declaration.
///
/// \brief result Result of the fetch
virtual void operator()(Result result) = 0;
};
/// \brief IOFetch Data
///
/// The data for IOFetch is held in a separate struct pointed to by a
/// shared_ptr object. This is because the IOFetch object will be copied
/// often (it is used as a coroutine and passed as callback to many
/// async_*() functions) and we want keep the same data). Organising the
/// data in this way keeps copying to a minimum.
struct IOFetchData {
// The next two members are shared pointers to a base class because what
// is actually instantiated depends on whether the fetch is over UDP or
// TCP, which is not known until construction of the IOFetch. Use of
// a shared pointer here is merely to ensure deletion when the data
// object is deleted.
boost::shared_ptr<IOAsioSocket<IOFetch> > socket;
///< Socket to use for I/O
boost::shared_ptr<IOEndpoint> remote; ///< Where the fetch was sent
isc::dns::Question question; ///< Question to be asked
isc::dns::OutputBufferPtr msgbuf; ///< Wire buffer for question
isc::dns::OutputBufferPtr buffer; ///< Received data held here
boost::shared_array<char> data; ///< Temporary array for data
IOFetch::Callback* callback; ///< Called on I/O Completion
size_t cumulative; ///< Cumulative received amount
bool stopped; ///< Have we stopped running?
asio::deadline_timer timer; ///< Timer to measure timeouts
int timeout; ///< Timeout in ms
Origin origin; ///< Origin of last asynchronous I/O
/// \brief Constructor
/// \brief Callback method
///
/// Just fills in the data members of the IOFetchData structure
/// This is the method called when the fecth completes.
///
/// \param proto Protocol: either IOFetch::TCP or IOFetch::UDP
/// \param service I/O Service object to handle the asynchronous
/// operations.
/// \param query DNS question to send to the upstream server.
/// \param address IP address of upstream server
/// \param port Port to use for the query
/// \param buff Output buffer into which the response (in wire format)
/// is written (if a response is received).
/// \param cb Callback object containing the callback to be called
/// when we terminate. The caller is responsible for managing this
/// object and deleting it if necessary.
/// \param wait Timeout for the fetch (in ms).
///
/// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
IOFetchData(Protocol proto, IOService& service,
const isc::dns::Question& query, const IOAddress& address,
uint16_t port, isc::dns::OutputBufferPtr& buff, Callback* cb,
int wait)
:
socket((proto == UDP) ?
static_cast<IOAsioSocket<IOFetch>*>(
new UDPSocket<IOFetch>(service)) :
static_cast<IOAsioSocket<IOFetch>*>(
new TCPSocket<IOFetch>(service))
),
remote((proto == UDP) ?
static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
),
question(query),
msgbuf(new isc::dns::OutputBuffer(512)),
buffer(buff),
data(new char[IOFetch::MAX_LENGTH]),
callback(cb),
cumulative(0),
stopped(false),
timer(service.get_io_service()),
timeout(wait),
origin(NONE)
{}
/// \param result Result of the fetch
virtual void operator()(Result result) = 0;
};
/// \brief Constructor.
......@@ -229,8 +145,16 @@ public:
///
/// \param ec Error code, the result of the last asynchronous I/O operation.
/// \param length Amount of data received on the last asynchronous read
void operator()(asio::error_code ec = asio::error_code(),
size_t length = 0);
void operator()(asio::error_code ec, size_t length);
void operator()(asio::error_code ec) {
operator()(ec, 0);
}
void operator()() {
asio::error_code ec;
operator()(ec);
}
/// \brief Terminate query
///
......@@ -246,7 +170,7 @@ private:
/// Records an I/O failure to the log file
///
/// \param ec ASIO error code
void logIOFailure(asio::error_code& ec);
void logIOFailure(asio::error_code ec);
boost::shared_ptr<IOFetchData> data_; ///< Private data
......
......@@ -24,32 +24,33 @@
namespace asiolink {
/// \brief The \c TCPEndpoint class is a concrete derived class of
/// \c IOEndpoint that represents an endpoint of a TCP connection.
/// \c IOEndpoint that represents an endpoint of a TCP packet.
///
/// In the current implementation, an object of this class is always
/// instantiated within the wrapper routines. Applications are expected to
/// get access to the object via the abstract base class, \c IOEndpoint.
/// This design may be changed when we generalize the wrapper interface.
///
/// Note: this implementation is optimized for the case where this object
/// is created from an ASIO endpoint object in a receiving code path
/// by avoiding to make a copy of the base endpoint. For TCP it may not be
/// a big deal, but when we receive UDP packets at a high rate, the copy
/// overhead might be significant.
/// Other notes about \c TCPEndpoint applies to this class, too.
class TCPEndpoint : public IOEndpoint {
public:
///
/// \name Constructors and Destructor
/// \name Constructors and Destructor.
///
//@{
/// \brief Default Constructor
///
/// Creates an internal endpoint. This is expected to be set by some
/// external call.
TCPEndpoint() :
asio_endpoint_placeholder_(new asio::ip::tcp::endpoint()),
asio_endpoint_(*asio_endpoint_placeholder_)
{}
/// \brief Constructor from a pair of address and port.
///
/// \param address The IP address of the endpoint.
/// \param port The TCP port number of the endpoint.
TCPEndpoint(const IOAddress& address, const unsigned short port) :
asio_endpoint_placeholder_(
new asio::ip::tcp::endpoint(
asio::ip::address::from_string(address.toText()), port)),
new asio::ip::tcp::endpoint(asio::ip::address::from_string(address.toText()),
port)),
asio_endpoint_(*asio_endpoint_placeholder_)
{}
......@@ -59,39 +60,53 @@ public:
/// corresponding ASIO class, \c tcp::endpoint.
///
/// \param asio_endpoint The ASIO representation of the TCP endpoint.
TCPEndpoint(const asio::ip::tcp::endpoint& asio_endpoint) :
TCPEndpoint(asio::ip::tcp::endpoint& asio_endpoint) :
asio_endpoint_placeholder_(NULL), asio_endpoint_(asio_endpoint)
{}
/// \brief Constructor from an ASIO TCP endpoint.
///
/// This constructor is designed to be an efficient wrapper for the
/// corresponding ASIO class, \c tcp::endpoint.
///
/// \param asio_endpoint The ASIO representation of the TCP endpoint.
TCPEndpoint(const asio::ip::tcp::endpoint& asio_endpoint) :
asio_endpoint_placeholder_(new asio::ip::tcp::endpoint(asio_endpoint)),
asio_endpoint_(*asio_endpoint_placeholder_)
{}
/// \brief The destructor.
~TCPEndpoint() { delete asio_endpoint_placeholder_; }
virtual ~TCPEndpoint() { delete asio_endpoint_placeholder_; }
//@}
IOAddress getAddress() const {
virtual IOAddress getAddress() const {
return (asio_endpoint_.address());
}
uint16_t getPort() const {
virtual uint16_t getPort() const {
return (asio_endpoint_.port());
}
short getProtocol() const {
virtual short getProtocol() const {
return (asio_endpoint_.protocol().protocol());
}
short getFamily() const {
virtual short getFamily() const {
return (asio_endpoint_.protocol().family());
}
// This is not part of the exosed IOEndpoint API but allows
// direct access to the ASIO implementation of the endpoint
const asio::ip::tcp::endpoint& getASIOEndpoint() const {
inline const asio::ip::tcp::endpoint& getASIOEndpoint() const {
return (asio_endpoint_);
}
inline asio::ip::tcp::endpoint& getASIOEndpoint() {
return (asio_endpoint_);
}
private:
const asio::ip::tcp::endpoint* asio_endpoint_placeholder_;
const asio::ip::tcp::endpoint& asio_endpoint_;
asio::ip::tcp::endpoint* asio_endpoint_placeholder_;
asio::ip::tcp::endpoint& asio_endpoint_;
};
} // namespace asiolink
......
......@@ -27,8 +27,13 @@
#include <iostream>
#include <cstddef>
#include <boost/bind.hpp>
#include <boost/numeric/conversion/cast.hpp>