Commit 0c0161c6 authored by Michal 'vorner' Vaner's avatar Michal 'vorner' Vaner
Browse files

]1599] Changed the server to be synchronous

It compiles, but has some problems in tests.
parent ecd7dbe0
// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")
// Copyright (C) 2012 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
......@@ -12,312 +12,162 @@
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h> // for some IPC/network system calls
#include <errno.h>
#include <boost/shared_array.hpp>
#include <config.h>
#include <log/dummylog.h>
#include <asio.hpp>
#include <asio/error.hpp>
#include "sync_udp_server.h"
#include "logger.h"
#include <dns/message.h>
#include <util/buffer.h>
#include <asiolink/dummy_io_cb.h>
#include <asiolink/udp_endpoint.h>
#include <asiolink/udp_socket.h>
#include "sync_udp_server.h"
#include "logger.h"
#include <dns/opcode.h>
#include <boost/bind.hpp>
using namespace asio;
using asio::ip::udp;
using isc::log::dlog;
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h> // for some IPC/network system calls
#include <errno.h>
using namespace std;
using namespace isc::dns;
using namespace isc::util;
using namespace boost;
using namespace isc::asiolink;
namespace isc {
namespace asiodns {
/*
* Some of the member variables here are shared_ptrs and some are
* auto_ptrs. There will be one instance of Data for the lifetime
* of packet. The variables that are state only for a single packet
* use auto_ptr, as it is more lightweight. In the case of shared
* configuration (eg. the callbacks, socket), we use shared_ptrs.
*/
struct SyncUDPServer::Data {
/*
* Constructors from parameters passed to UDPServer constructor.
* This instance will not be used to retrieve and answer the actual
* query, it will only hold parameters until we wait for the
* first packet. But we do initialize the socket in here.
*/
Data(io_service& io_service, const ip::address& addr, const uint16_t port,
SimpleCallback* checkin, DNSLookup* lookup, DNSAnswer* answer) :
io_(io_service), done_(false),
checkin_callback_(checkin),lookup_callback_(lookup),
answer_callback_(answer)
{
// We must use different instantiations for v4 and v6;
// otherwise ASIO will bind to both
udp proto = addr.is_v4() ? udp::v4() : udp::v6();
socket_.reset(new udp::socket(io_service, proto));
socket_->set_option(socket_base::reuse_address(true));
if (addr.is_v6()) {
socket_->set_option(asio::ip::v6_only(true));
}
socket_->bind(udp::endpoint(addr, port));
}
Data(io_service& io_service, int fd, int af, SimpleCallback* checkin,
DNSLookup* lookup, DNSAnswer* answer) :
io_(io_service), done_(false),
checkin_callback_(checkin),lookup_callback_(lookup),
answer_callback_(answer)
{
if (af != AF_INET && af != AF_INET6) {
isc_throw(InvalidParameter, "Address family must be either AF_INET "
"or AF_INET6, not " << af);
}
LOG_DEBUG(logger, DBGLVL_TRACE_BASIC, ASIODNS_FD_ADD_UDP).arg(fd);
try {
socket_.reset(new udp::socket(io_service));
socket_->assign(af == AF_INET6 ? udp::v6() : udp::v4(), fd);
} catch (const std::exception& exception) {
// Whatever the thing throws, it is something from ASIO and we
// convert it
isc_throw(IOError, exception.what());
}
SyncUDPServer::SyncUDPServer(asio::io_service& io_service,
const asio::ip::address& addr,
const uint16_t port,
asiolink::SimpleCallback* checkin,
DNSLookup* lookup, DNSAnswer* answer) :
io_(io_service), checkin_callback_(checkin), lookup_callback_(lookup),
answer_callback_(answer)
{
// We must use different instantiations for v4 and v6;
// otherwise ASIO will bind to both
asio::ip::udp proto = addr.is_v4() ? asio::ip::udp::v4() :
asio::ip::udp::v6();
socket_.reset(new asio::ip::udp::socket(io_service, proto));
socket_->set_option(asio::socket_base::reuse_address(true));
if (addr.is_v6()) {
socket_->set_option(asio::ip::v6_only(true));
}
socket_->bind(asio::ip::udp::endpoint(addr, port));
}
/*
* Copy constructor. Default one would probably do, but it is unnecessary
* to copy many of the member variables every time we fork to handle
* another packet.
*
* We also allocate data for receiving the packet here.
*/
Data(const Data& other) :
io_(other.io_), socket_(other.socket_), done_(false),
checkin_callback_(other.checkin_callback_),
lookup_callback_(other.lookup_callback_),
answer_callback_(other.answer_callback_)
{
// Instantiate the data buffer and endpoint that will
// be used by the asynchronous receive call.
data_.reset(new char[MAX_LENGTH]);
sender_.reset(new udp::endpoint());
SyncUDPServer::SyncUDPServer(asio::io_service& io_service, const int fd,
const int af, asiolink::SimpleCallback* checkin,
DNSLookup* lookup, DNSAnswer* answer) :
io_(io_service), checkin_callback_(checkin), lookup_callback_(lookup),
answer_callback_(answer)
{
if (af != AF_INET && af != AF_INET6) {
isc_throw(InvalidParameter, "Address family must be either AF_INET "
"or AF_INET6, not " << af);
}
LOG_DEBUG(logger, DBGLVL_TRACE_BASIC, ASIODNS_FD_ADD_UDP).arg(fd);
try {
socket_.reset(new asio::ip::udp::socket(io_service));
socket_->assign(af == AF_INET6 ? asio::ip::udp::v6() :
asio::ip::udp::v4(), fd);
} catch (const std::exception& exception) {
// Whatever the thing throws, it is something from ASIO and we
// convert it
isc_throw(IOError, exception.what());
}
}
// The ASIO service object
asio::io_service& io_;
// Class member variables which are dynamic, and changes to which
// need to accessible from both sides of a coroutine fork or from
// outside of the coroutine (i.e., from an asynchronous I/O call),
// should be declared here as pointers and allocated in the
// constructor or in the coroutine. This allows state information
// to persist when an individual copy of the coroutine falls out
// scope while waiting for an event, *so long as* there is another
// object that is referencing the same data. As a side-benefit, using
// pointers also reduces copy overhead for coroutine objects.
//
// Note: Currently these objects are allocated by "new" in the
// constructor, or in the function operator while processing a query.
// Repeated allocations from the heap for every incoming query is
// clearly a performance issue; this must be optimized in the future.
// The plan is to have a structure pre-allocate several "Data"
// objects which can be pulled off a free list and placed on an in-use
// list whenever a query comes in. This will serve the dual purpose
// of improving performance and guaranteeing that state information
// will *not* be destroyed when any one instance of the coroutine
// falls out of scope while waiting for an event.
//
// Socket used to for listen for queries. Created in the
// constructor and stored in a shared_ptr because socket objects
// are not copyable.
boost::shared_ptr<asio::ip::udp::socket> socket_;
// The ASIO-internal endpoint object representing the client
std::auto_ptr<asio::ip::udp::endpoint> sender_;
// \c IOMessage and \c Message objects to be passed to the
// DNS lookup and answer providers
std::auto_ptr<asiolink::IOMessage> io_message_;
// The original query as sent by the client
isc::dns::MessagePtr query_message_;
// The response message we are building
isc::dns::MessagePtr answer_message_;
// The buffer into which the response is written
isc::util::OutputBufferPtr respbuf_;
// The buffer into which the query packet is written
boost::shared_array<char> data_;
// State information that is entirely internal to a given instance
// of the coroutine can be declared here.
size_t bytes_;
bool done_;
// Callback functions provided by the caller
const SimpleCallback* checkin_callback_;
const DNSLookup* lookup_callback_;
const DNSAnswer* answer_callback_;
std::auto_ptr<IOEndpoint> peer_;
std::auto_ptr<IOSocket> iosock_;
};
/// The following functions implement the \c UDPServer class.
///
/// The constructor. It just creates new internal state object
/// and lets it handle the initialization.
SyncUDPServer::SyncUDPServer(io_service& io_service, const ip::address& addr,
const uint16_t port, SimpleCallback* checkin,
DNSLookup* lookup, DNSAnswer* answer) :
data_(new Data(io_service, addr, port, checkin, lookup, answer))
{ }
SyncUDPServer::SyncUDPServer(io_service& io_service, int fd, int af,
SimpleCallback* checkin, DNSLookup* lookup,
DNSAnswer* answer) :
data_(new Data(io_service, fd, af, checkin, lookup, answer))
{ }
/// The function operator is implemented with the "stackless coroutine"
/// pattern; see internal/coroutine.h for details.
void
SyncUDPServer::operator()(asio::error_code ec, size_t length) {
/// Because the coroutine reentry block is implemented as
/// a switch statement, inline variable declarations are not
/// permitted. Certain variables used below can be declared here.
CORO_REENTER (this) {
do {
/*
* This is preparation for receiving a packet. We get a new
* state object for the lifetime of the next packet to come.
* It allocates the buffers to receive data into.
*/
data_.reset(new Data(*data_));
do {
// Begin an asynchronous receive, then yield.
// When the receive event is posted, the coroutine
// will resume immediately after this point.
CORO_YIELD data_->socket_->async_receive_from(
buffer(data_->data_.get(), MAX_LENGTH), *data_->sender_,
*this);
// Abort on fatal errors
// TODO: add log
if (ec) {
using namespace asio::error;
if (ec.value() != would_block && ec.value() != try_again &&
ec.value() != interrupted) {
return;
}
}
} while (ec || length == 0);
data_->bytes_ = length;
/*
* We fork the coroutine now. One (the child) will keep
* the current state and handle the packet, then die and
* drop ownership of the state. The other (parent) will just
* go into the loop again and replace the current state with
* a new one for a new object.
*
* Actually, both of the coroutines will be a copy of this
* one, but that's just internal implementation detail.
*/
CORO_FORK data_->io_.post(SyncUDPServer(*this));
} while (is_parent());
// Create an \c IOMessage object to store the query.
//
// (XXX: It would be good to write a factory function
// that would quickly generate an IOMessage object without
// all these calls to "new".)
data_->peer_.reset(new UDPEndpoint(*data_->sender_));
// The UDP socket class has been extended with asynchronous functions
// and takes as a template parameter a completion callback class. As
// UDPServer does not use these extended functions (only those defined
// in the IOSocket base class) - but needs a UDPSocket to get hold of
// the underlying Boost UDP socket - DummyIOCallback is used. This
// provides the appropriate operator() but is otherwise functionless.
data_->iosock_.reset(
new UDPSocket<DummyIOCallback>(*data_->socket_));
data_->io_message_.reset(new IOMessage(data_->data_.get(),
data_->bytes_, *data_->iosock_, *data_->peer_));
SyncUDPServer::scheduleRead() {
socket_->async_receive_from(asio::buffer(data_, MAX_LENGTH), sender_,
boost::bind(&SyncUDPServer::handleRead, this,
_1, _2));
}
// Perform any necessary operations prior to processing an incoming
// query (e.g., checking for queued configuration messages).
//
// (XXX: it may be a performance issue to check in for every single
// incoming query; we may wish to throttle this in the future.)
if (data_->checkin_callback_ != NULL) {
(*data_->checkin_callback_)(*data_->io_message_);
void
SyncUDPServer::handleRead(const asio::error_code& ec, const size_t length) {
// Abort on fatal errors
if (ec) {
using namespace asio::error;
if (ec.value() != would_block && ec.value() != try_again &&
ec.value() != interrupted) {
return;
}
}
// Some kind of interrupt, spurious wakeup, or like that. Just try reading
// again.
if (ec || length == 0) {
scheduleRead();
return;
}
// OK, we have a real packet of data. Let's dig into it!
// XXX: This is taken (and ported) from UDPSocket class. What the hell does
// it really mean?
// The UDP socket class has been extended with asynchronous functions
// and takes as a template parameter a completion callback class. As
// UDPServer does not use these extended functions (only those defined
// in the IOSocket base class) - but needs a UDPSocket to get hold of
// the underlying Boost UDP socket - DummyIOCallback is used. This
// provides the appropriate operator() but is otherwise functionless.
UDPSocket<DummyIOCallback> socket(*socket_);
UDPEndpoint endpoint(sender_);
IOMessage message(data_, length, socket, endpoint);
if (checkin_callback_ != NULL) {
(*checkin_callback_)(message);
}
// If we don't have a DNS Lookup provider, there's no point in
// continuing; we exit the coroutine permanently.
if (data_->lookup_callback_ == NULL) {
CORO_YIELD return;
}
// If we don't have a DNS Lookup provider, there's no point in
// continuing; we exit the coroutine permanently.
if (lookup_callback_ == NULL) {
scheduleRead();
return;
}
// Instantiate objects that will be needed by the
// asynchronous DNS lookup and/or by the send call.
data_->respbuf_.reset(new OutputBuffer(0));
data_->query_message_.reset(new Message(Message::PARSE));
data_->answer_message_.reset(new Message(Message::RENDER));
// TODO: Can any of these be put to the object and reused?
isc::util::OutputBufferPtr output(new isc::util::OutputBuffer(0));
isc::dns::MessagePtr
query(new isc::dns::Message(isc::dns::Message::PARSE));
isc::dns::MessagePtr
answer(new isc::dns::Message(isc::dns::Message::RENDER));
// Schedule a DNS lookup, and yield. When the lookup is
// finished, the coroutine will resume immediately after
// this point.
CORO_YIELD data_->io_.post(AsyncLookup<SyncUDPServer>(*this));
// Mark that we don't have an answer yet.
done_ = false;
resume_called_ = false;
// The 'done_' flag indicates whether we have an answer
// to send back. If not, exit the coroutine permanently.
if (!data_->done_) {
CORO_YIELD return;
}
// Call the actual lookup
(*lookup_callback_)(message, query, answer, output, this);
// Call the DNS answer provider to render the answer into
// wire format
(*data_->answer_callback_)(*data_->io_message_, data_->query_message_,
data_->answer_message_, data_->respbuf_);
if (!resume_called_) {
isc_throw(isc::Unexpected,
"No resume called from the lookup callback");
}
if (done_) {
// Good, there's an answer.
// Call the answer callback to render it.
(*answer_callback_)(message, query, answer, output);
// Begin an asynchronous send, and then yield. When the
// send completes, we will resume immediately after this point
// (though we have nothing further to do, so the coroutine
// will simply exit at that time).
CORO_YIELD data_->socket_->async_send_to(
buffer(data_->respbuf_->getData(), data_->respbuf_->getLength()),
*data_->sender_, *this);
socket_->send_to(asio::buffer(output->getData(), output->getLength()),
sender_);
}
// And schedule handling another socket.
scheduleRead();
}
/// Call the DNS lookup provider. (Expected to be called by the
/// AsyncLookup<UDPServer> handler.)
void
SyncUDPServer::asyncLookup() {
(*data_->lookup_callback_)(*data_->io_message_,
data_->query_message_, data_->answer_message_, data_->respbuf_, this);
SyncUDPServer::operator()(asio::error_code, size_t) {
// To start the server, we just schedule reading of data when they
// arrive.
scheduleRead();
}
/// Stop the UDPServer
......@@ -331,7 +181,7 @@ SyncUDPServer::stop() {
/// for it won't be scheduled by io service not matter it is
/// submit to io serice before or after close call. And we will
//. get bad_descriptor error
data_->socket_->close();
socket_->close();
}
/// Post this coroutine on the ASIO service queue so that it will
......@@ -339,13 +189,13 @@ SyncUDPServer::stop() {
/// whether there is an answer to return to the client.
void
SyncUDPServer::resume(const bool done) {
data_->done_ = done;
data_->io_.post(*this);
resume_called_ = true;
done_ = done;
}
bool
SyncUDPServer::hasAnswer() {
return (data_->done_);
return (done_);
}
} // namespace asiodns
......
// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")
// Copyright (C) 2012 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
......@@ -19,25 +19,24 @@
#error "asio.hpp must be included before including this, see asiolink.h as to why"
#endif
#include "dns_answer.h"
#include "dns_lookup.h"
#include "dns_server.h"
#include <asiolink/simple_callback.h>
#include <asiodns/dns_answer.h>
#include <asiodns/dns_lookup.h>
#include <asiodns/dns_server.h>
#include <exceptions.h>
#include <coroutine.h>
#include <boost/noncopyable.hpp>
namespace isc {
namespace asiodns {
//
// Asynchronous UDP server coroutine
//
/// \brief An UDP server that doesn't asynchronous lookup handlers.
///
/// \brief This class implements the coroutine to handle UDP
/// DNS query event. As such, it is both a \c DNSServer and
/// a \c coroutine
///
class SyncUDPServer : public virtual DNSServer, public virtual coroutine {
/// That means, the lookup handler must provide the answer right away.
/// This allows for implementation with less overhead, compared with
/// the UDPClass.
class SyncUDPServer : public DNSServer, public boost::noncopyable {
public:
/// \brief Constructor
/// \param io_service the asio::io_service to work with
......@@ -47,10 +46,10 @@ public:
/// \param lookup the callbackprovider for DNS lookup events
/// \param answer the callbackprovider for DNS answer events
explicit SyncUDPServer(asio::io_service& io_service,
const asio::ip::address& addr, const uint16_t port,
isc::asiolink::SimpleCallback* checkin = NULL,
DNSLookup* lookup = NULL,
DNSAnswer* answer = NULL);
const asio::ip::address& addr, const uint16_t port,
isc::asiolink::SimpleCallback* checkin = NULL,
DNSLookup* lookup = NULL,
DNSAnswer* answer = NULL);
/// \brief Constructor
/// \param io_service the asio::io_service to work with
......@@ -62,59 +61,64 @@ public:
/// \throw isc::InvalidParameter if af is neither AF_INET nor AF_INET6
/// \throw isc::asiolink::IOError when a low-level error happens, like the
/// fd is not a valid descriptor.
SyncUDPServer(asio::io_service& io_service, int fd, int af,
isc::asiolink::SimpleCallback* checkin = NULL,
DNSLookup* lookup = NULL, DNSAnswer* answer = NULL);
SyncUDPServer(asio::io_service& io_service, const int fd, const int af,
isc::asiolink::SimpleCallback* checkin = NULL,
DNSLookup* lookup = NULL, DNSAnswer* answer = NULL);
/// \brief The function operator
void operator()(asio::error_code ec = asio::error_code(),
/// \brief Start the SyncUDPServer.
///
/// This is the function operator to keep interface with other server
/// classes. They need that because they're coroutines.
virtual void operator()(asio::error_code ec = asio::error_code(),
size_t length = 0);
/// \brief Calls the lookup callback
void asyncLookup();
virtual void asyncLookup() {
isc_throw(Unexpected,
"SyncUDPServer doesn't support asyncLookup by design, use "
"UDPServer if you need it.");
}
/// \brief Stop the running server
/// \note once the server stopped, it can't restart
void stop();
virtual void stop();
/// \brief Resume operation
///
/// Note that unlike other servers, this one expects it to be called
/// directly from the lookup callback. If it isn't, the server will
/// throw an Unexpected exception (probably to the event loop, which
/// would usually lead to termination of the program, but that's OK,
/// as it would be serious programmer error).
///
/// \param done Set this to true if the lookup action is done and
/// we have an answer
void resume(const bool done);
virtual void resume(const bool done);
/// \brief Check if we have an answer
///
/// \return true if we have an answer
bool hasAnswer();
/// \brief Returns the coroutine state value
///
/// \return the coroutine state value
int value() { return (get_value()); }
virtual bool hasAnswer();
/// \brief Clones the object
///
/// \return a newly allocated copy of this object
DNSServer* clone() {
SyncUDPServer* s = new SyncUDPServer(*this);
return (s);
virtual DNSServer* clone() {
isc_throw(Unexpected, "SyncUDPServer can't be cloned.");
}
private:
enum { MAX_LENGTH = 4096 };
/**
* \brief Internal state and data.
*
* We use the pimple design pattern, but not because we need to hide
* internal data. This class and whole header is for private use anyway.
* It turned out that SyncUDPServer is copied a lot, because it is a coroutine.
* This way the overhead of copying is lower, we copy only one shared
* pointer instead of about 10 of them.
*/
struct Data;
boost::shared_ptr<Data> data_;
static const size_t MAX_LENGTH = 4096;
uint8_t data_[MAX_LENGTH];
std::auto_ptr<asio::ip::udp::socket> socket_;
asio::io_service& io_;
asio::ip::udp::endpoint sender_;
const asiolink::SimpleCallback* checkin_callback_;
const DNSLookup* lookup_callback_;
const DNSAnswer* answer_callback_;
bool resume_called_, done_;
void scheduleRead();
void handleRead(const asio::error_code& ec, const size_t length);