Commit 236adec4 authored by Stephen Morris's avatar Stephen Morris

[trac554] Removed UDPQuery

Replaced with IOFetch with a protocol selection of UDP.
parent 85b6fa72
......@@ -19,6 +19,7 @@ libasiolink_la_SOURCES += dns_server.h
libasiolink_la_SOURCES += dns_service.h dns_service.cc
libasiolink_la_SOURCES += interval_timer.h interval_timer.cc
libasiolink_la_SOURCES += io_address.h io_address.cc
libasiolink_la_SOURCES += io_completion_cb.h
libasiolink_la_SOURCES += io_endpoint.h io_endpoint.cc
libasiolink_la_SOURCES += io_error.h
libasiolink_la_SOURCES += io_fetch.h io_fetch.cc
......@@ -31,7 +32,6 @@ libasiolink_la_SOURCES += tcp_endpoint.h
libasiolink_la_SOURCES += tcp_server.h tcp_server.cc
libasiolink_la_SOURCES += tcp_socket.h
libasiolink_la_SOURCES += udp_endpoint.h
libasiolink_la_SOURCES += udp_query.h udp_query.cc
libasiolink_la_SOURCES += udp_server.h udp_server.cc
libasiolink_la_SOURCES += udp_socket.h
# Note: the ordering matters: -Wno-... must follow -Wextra (defined in
......
......@@ -97,6 +97,12 @@ public:
/// It will eventually be removed once the wrapper interface is
/// generalized.
asio::io_service& get_io_service() { return io_service_.get_io_service(); }
/// \brief Return the IO Service Object
///
/// \return IOService object for this DNS service.
asiolink::IOService& getIOService() { return (io_service_);}
private:
DNSServiceImpl* impl_;
IOService& io_service_;
......
......@@ -18,9 +18,10 @@
#include <asio.hpp>
#include <asiolink/recursive_query.h>
#include <asiolink/dns_service.h>
#include <asiolink/udp_query.h>
#include <asiolink/io_fetch.h>
#include <asiolink/io_service.h>
#include <asiolink/recursive_query.h>
#include <log/dummylog.h>
......@@ -65,10 +66,10 @@ typedef std::pair<std::string, uint16_t> addr_t;
*
* Used by RecursiveQuery::sendQuery.
*/
class RunningQuery : public UDPQuery::Callback {
class RunningQuery : public IOFetch::Callback {
private:
// The io service to handle async calls
asio::io_service& io_;
IOService& io_;
// Info for (re)sending the query (the question and destination)
Question question_;
......@@ -138,22 +139,22 @@ private:
int serverIndex = rand() % uc;
dlog("Sending upstream query (" + question_.toText() +
") to " + upstream_->at(serverIndex).first);
UDPQuery query(io_, question_,
IOFetch query(IPPROTO_UDP, io_, question_,
upstream_->at(serverIndex).first,
upstream_->at(serverIndex).second, buffer_, this,
query_timeout_);
++queries_out_;
io_.post(query);
io_.get_io_service().post(query);
} else if (zs > 0) {
int serverIndex = rand() % zs;
dlog("Sending query to zone server (" + question_.toText() +
") to " + zone_servers_.at(serverIndex).first);
UDPQuery query(io_, question_,
IOFetch query(IPPROTO_IDP, io_, question_,
zone_servers_.at(serverIndex).first,
zone_servers_.at(serverIndex).second, buffer_, this,
query_timeout_);
++queries_out_;
io_.post(query);
io_.get_io_service().post(query);
} else {
dlog("Error, no upstream servers to send to.");
}
......@@ -280,7 +281,7 @@ private:
}
public:
RunningQuery(asio::io_service& io,
RunningQuery(IOService& io,
const Question &question,
MessagePtr answer_message,
boost::shared_ptr<AddressVector> upstream,
......@@ -299,8 +300,8 @@ public:
cname_count_(0),
query_timeout_(query_timeout),
retries_(retries),
client_timer(io),
lookup_timer(io),
client_timer(io.get_io_service()),
lookup_timer(io.get_io_service()),
queries_out_(0),
done_(false),
answer_sent_(false)
......@@ -380,10 +381,10 @@ public:
}
// This function is used as callback from DNSQuery.
virtual void operator()(UDPQuery::Result result) {
virtual void operator()(IOFetch::Result result) {
// XXX is this the place for TCP retry?
--queries_out_;
if (!done_ && result != UDPQuery::TIME_OUT) {
if (!done_ && result != IOFetch::TIME_OUT) {
// we got an answer
Message incoming(Message::PARSE);
InputBuffer ibuf(buffer_->getData(), buffer_->getLength());
......@@ -417,7 +418,7 @@ void
RecursiveQuery::resolve(const QuestionPtr& question,
const isc::resolve::ResolverInterface::CallbackPtr callback)
{
asio::io_service& io = dns_service_.get_io_service();
IOService& io = dns_service_.getIOService();
MessagePtr answer_message(new Message(Message::RENDER));
OutputBufferPtr buffer(new OutputBuffer(0));
......@@ -438,7 +439,7 @@ RecursiveQuery::resolve(const Question& question,
// the message should be sent via TCP or UDP, or sent initially via
// UDP and then fall back to TCP on failure, but for the moment
// we're only going to handle UDP.
asio::io_service& io = dns_service_.get_io_service();
IOService& io = dns_service_.getIOService();
isc::resolve::ResolverInterface::CallbackPtr crs(
new isc::resolve::ResolverCallbackServer(server));
......
......@@ -26,7 +26,6 @@ run_unittests_SOURCES += io_service_unittest.cc
run_unittests_SOURCES += interval_timer_unittest.cc
run_unittests_SOURCES += recursive_query_unittest.cc
run_unittests_SOURCES += udp_endpoint_unittest.cc
run_unittests_SOURCES += udp_query_unittest.cc
run_unittests_SOURCES += udp_socket_unittest.cc
run_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES)
......
// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#include <gtest/gtest.h>
#include <asio.hpp>
#include <boost/bind.hpp>
#include <cstdlib>
#include <dns/question.h>
#include <asiolink/udp_query.h>
using namespace asio;
using namespace isc::dns;
using asio::ip::udp;
namespace {
const asio::ip::address TEST_HOST(asio::ip::address::from_string("127.0.0.1"));
const uint16_t TEST_PORT(5301);
// FIXME Shouldn't we send something that is real message?
const char TEST_DATA[] = "TEST DATA";
// Test fixture for the asiolink::UDPQuery.
class UDPQueryTest : public ::testing::Test,
public asiolink::UDPQuery::Callback
{
public:
// Expected result of the callback
asiolink::UDPQuery::Result expected_;
// Did the callback run already?
bool run_;
// We use an io_service to run the query
io_service service_;
// Something to ask
Question question_;
// Buffer where the UDPQuery will store response
OutputBufferPtr buffer_;
// The query we are testing
asiolink::UDPQuery query_;
UDPQueryTest() :
run_(false),
question_(Name("example.net"), RRClass::IN(), RRType::A()),
buffer_(new OutputBuffer(512)),
query_(service_, question_, asiolink::IOAddress(TEST_HOST),
TEST_PORT, buffer_, this, 100)
{ }
// This is the callback's (), so it can be called.
void operator()(asiolink::UDPQuery::Result result) {
// We check the query returns the correct result
EXPECT_EQ(expected_, result);
// Check it is called only once
EXPECT_FALSE(run_);
// And mark the callback was called
run_ = true;
}
// A response handler, pretending to be remote DNS server
void respond(udp::endpoint* remote, udp::socket* socket) {
// Some data came, just send something back.
socket->send_to(asio::buffer(TEST_DATA, sizeof TEST_DATA),
*remote);
socket->close();
}
};
/*
* Test that when we run the query and stop it after it was run,
* it returns "stopped" correctly.
*
* That is why stop() is posted to the service_ as well instead
* of calling it.
*/
TEST_F(UDPQueryTest, stop) {
expected_ = asiolink::UDPQuery::STOPPED;
// Post the query
service_.post(query_);
// Post query_.stop() (yes, the boost::bind thing is just
// query_.stop()).
service_.post(boost::bind(&asiolink::UDPQuery::stop, query_,
asiolink::UDPQuery::STOPPED));
// Run both of them
service_.run();
EXPECT_TRUE(run_);
}
/*
* Test that when we queue the query to service_ and call stop()
* before it gets executed, it acts sanely as well (eg. has the
* same result as running stop() after - calls the callback).
*/
TEST_F(UDPQueryTest, prematureStop) {
expected_ = asiolink::UDPQuery::STOPPED;
// Stop before it is started
query_.stop();
service_.post(query_);
service_.run();
EXPECT_TRUE(run_);
}
/*
* Test that it will timeout when no answer will arrive.
*/
TEST_F(UDPQueryTest, timeout) {
expected_ = asiolink::UDPQuery::TIME_OUT;
service_.post(query_);
service_.run();
EXPECT_TRUE(run_);
}
/*
* Test that it will succeed when we fake an answer and
* stores the same data we send.
*
* This is done through a real socket on loopback address.
*/
TEST_F(UDPQueryTest, receive) {
expected_ = asiolink::UDPQuery::SUCCESS;
udp::socket socket(service_, udp::v4());
socket.set_option(socket_base::reuse_address(true));
socket.bind(udp::endpoint(TEST_HOST, TEST_PORT));
char inbuff[512];
udp::endpoint remote;
socket.async_receive_from(asio::buffer(inbuff, 512), remote, boost::bind(
&UDPQueryTest::respond, this, &remote, &socket));
service_.post(query_);
service_.run();
EXPECT_TRUE(run_);
ASSERT_EQ(sizeof TEST_DATA, buffer_->getLength());
EXPECT_EQ(0, memcmp(TEST_DATA, buffer_->getData(), sizeof TEST_DATA));
}
}
// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#include <config.h>
#include <asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_array.hpp>
#include <dns/messagerenderer.h>
#include <dns/opcode.h>
#include <dns/rcode.h>
#include <log/dummylog.h>
#include <asio.hpp>
#include <asiolink.h>
#include <coroutine.h>
#include <asiolink/udp_endpoint.h>
#include <asiolink/udp_query.h>
using namespace asio;
using asio::ip::udp;
using asio::ip::tcp;
using isc::log::dlog;
using namespace std;
using namespace isc::dns;
namespace asiolink {
// Private UDPQuery data (see internal/udpdns.h for reasons)
struct UDPQuery::PrivateData {
// Socket we send query to and expect reply from there
udp::socket socket;
// Where was the query sent
udp::endpoint remote;
// What we ask the server
Question question;
// We will store the answer here
OutputBufferPtr buffer;
OutputBufferPtr msgbuf;
// Temporary buffer for answer
boost::shared_array<char> data;
// This will be called when the data arrive or timeouts
Callback* callback;
// Did we already stop operating (data arrived, we timed out, someone
// called stop). This can be so when we are cleaning up/there are
// still pointers to us.
bool stopped;
// Timer to measure timeouts.
deadline_timer timer;
// How many milliseconds are we willing to wait for answer?
int timeout;
PrivateData(io_service& service,
const udp::socket::protocol_type& protocol, const Question &q,
OutputBufferPtr b, Callback *c) :
socket(service, protocol),
question(q),
buffer(b),
msgbuf(new OutputBuffer(512)),
callback(c),
stopped(false),
timer(service)
{ }
};
/// The following functions implement the \c UDPQuery class.
///
/// The constructor
UDPQuery::UDPQuery(io_service& io_service,
const Question& q, const IOAddress& addr, uint16_t port,
OutputBufferPtr buffer, Callback *callback, int timeout) :
data_(new PrivateData(io_service,
addr.getFamily() == AF_INET ? udp::v4() : udp::v6(), q, buffer,
callback))
{
data_->remote = UDPEndpoint(addr, port).getASIOEndpoint();
data_->timeout = timeout;
}
/// The function operator is implemented with the "stackless coroutine"
/// pattern; see internal/coroutine.h for details.
void
UDPQuery::operator()(error_code ec, size_t length) {
if (ec || data_->stopped) {
return;
}
CORO_REENTER (this) {
/// Generate the upstream query and render it to wire format
/// This is done in a different scope to allow inline variable
/// declarations.
{
Message msg(Message::RENDER);
// XXX: replace with boost::random or some other suitable PRNG
msg.setQid(0);
msg.setOpcode(Opcode::QUERY());
msg.setRcode(Rcode::NOERROR());
msg.setHeaderFlag(Message::HEADERFLAG_RD);
msg.addQuestion(data_->question);
MessageRenderer renderer(*data_->msgbuf);
msg.toWire(renderer);
dlog("Sending " + msg.toText() + " to " +
data_->remote.address().to_string());
}
// If we timeout, we stop, which will shutdown everything and
// cancel all other attempts to run inside the coroutine
if (data_->timeout != -1) {
data_->timer.expires_from_now(boost::posix_time::milliseconds(
data_->timeout));
data_->timer.async_wait(boost::bind(&UDPQuery::stop, *this,
TIME_OUT));
}
// Begin an asynchronous send, and then yield. When the
// send completes, we will resume immediately after this point.
CORO_YIELD data_->socket.async_send_to(buffer(data_->msgbuf->getData(),
data_->msgbuf->getLength()), data_->remote, *this);
/// Allocate space for the response. (XXX: This should be
/// optimized by maintaining a free list of pre-allocated blocks)
data_->data.reset(new char[MAX_LENGTH]);
/// Begin an asynchronous receive, and yield. When the receive
/// completes, we will resume immediately after this point.
CORO_YIELD data_->socket.async_receive_from(buffer(data_->data.get(),
MAX_LENGTH), data_->remote, *this);
// The message is not rendered yet, so we can't print it easilly
dlog("Received response from " + data_->remote.address().to_string());
/// Copy the answer into the response buffer. (XXX: If the
/// OutputBuffer object were made to meet the requirements of
/// a MutableBufferSequence, then it could be written to directly
/// by async_recieve_from() and this additional copy step would
/// be unnecessary.)
data_->buffer->writeData(data_->data.get(), length);
/// We are done
stop(SUCCESS);
}
}
void
UDPQuery::stop(Result result) {
if (!data_->stopped) {
switch (result) {
case TIME_OUT:
dlog("Query timed out");
break;
case STOPPED:
dlog("Query stopped");
break;
default:;
}
data_->stopped = true;
data_->socket.cancel();
data_->socket.close();
data_->timer.cancel();
if (data_->callback) {
(*data_->callback)(result);
}
}
}
} // namespace asiolink
// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#ifndef __UDP_QUERY_H
#define __UDP_QUERY_H 1
#ifndef ASIO_HPP
#error "asio.hpp must be included before including this, see asiolink.h as to why"
#endif
#include <dns/buffer.h>
#include <asiolink/io_address.h>
#include <coroutine.h>
namespace asiolink {
//
// Asynchronous UDP coroutine for upstream queries
//
class UDPQuery : public coroutine {
public:
// TODO Maybe this should be more generic than just for UDPQuery?
///
/// \brief Result of the query
///
/// This is related only to contacting the remote server. If the answer
///indicates error, it is still counted as SUCCESS here, if it comes back.
///
enum Result {
SUCCESS,
TIME_OUT,
STOPPED
};
/// Abstract callback for the UDPQuery.
class Callback {
public:
virtual ~Callback() {}
/// This will be called when the UDPQuery is completed
virtual void operator()(Result result) = 0;
};
///
/// \brief Constructor.
///
/// It creates the query.
/// @param callback will be called when we terminate. It is your task to
/// delete it if allocated on heap.
///@param timeout in ms.
///
explicit UDPQuery(asio::io_service& io_service,
const isc::dns::Question& q,
const IOAddress& addr, uint16_t port,
isc::dns::OutputBufferPtr buffer,
Callback* callback, int timeout = -1);
void operator()(asio::error_code ec = asio::error_code(),
size_t length = 0);
/// Terminate the query.
void stop(Result reason = STOPPED);
private:
enum { MAX_LENGTH = 4096 };
///
/// \short Private data
///
/// They are not private because of stability of the
/// interface (this is private class anyway), but because this class
/// will be copyed often (it is used as a coroutine and passed as callback
/// to many async_*() functions) and we want keep the same data. Some of
/// the data is not copyable too.
///
struct PrivateData;
boost::shared_ptr<PrivateData> data_;
};
} // namespace asiolink
#endif // __UDP_QUERY_H
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment