Commit 583203a8 authored by zhanglikun's avatar zhanglikun

[trac598_new] Reimplement the ticket based on new branch. Implement the...

[trac598_new] Reimplement the ticket based on new branch. Implement the simplest forwarder by refactoring the code
parent e87e34cd
......@@ -144,7 +144,7 @@ public:
void resolve(const isc::dns::QuestionPtr& question,
const isc::resolve::ResolverInterface::CallbackPtr& callback);
void processNormalQuery(const Question& question,
void processNormalQuery(ConstMessagePtr query_message,
MessagePtr answer_message,
OutputBufferPtr buffer,
DNSServer* server);
......@@ -468,7 +468,7 @@ Resolver::processMessage(const IOMessage& io_message,
// The RecursiveQuery object will post the "resume" event to the
// DNSServer when an answer arrives, so we don't have to do it now.
sendAnswer = false;
impl_->processNormalQuery(*question, answer_message,
impl_->processNormalQuery(query_message, answer_message,
buffer, server);
}
}
......@@ -486,13 +486,19 @@ ResolverImpl::resolve(const QuestionPtr& question,
}
void
ResolverImpl::processNormalQuery(const Question& question,
ResolverImpl::processNormalQuery(ConstMessagePtr query_message,
MessagePtr answer_message,
OutputBufferPtr buffer,
DNSServer* server)
{
dlog("Processing normal query");
rec_query_->resolve(question, answer_message, buffer, server);
if (upstream_.empty()) {
dlog("Processing normal query");
ConstQuestionPtr question = *query_message->beginQuestion();
rec_query_->resolve(*question, answer_message, buffer, server);
} else {
dlog("Processing forward query");
rec_query_->forward(query_message, answer_message, buffer, server);
}
}
ConstElementPtr
......
......@@ -35,7 +35,6 @@
#include <asiolink/udp_endpoint.h>
#include <asiolink/udp_socket.h>
#include <dns/message.h>
#include <dns/messagerenderer.h>
#include <dns/opcode.h>
#include <dns/rcode.h>
......@@ -80,7 +79,6 @@ struct IOFetchData {
///< Socket to use for I/O
boost::scoped_ptr<IOEndpoint> remote_snd;///< Where the fetch is sent
boost::scoped_ptr<IOEndpoint> remote_rcv;///< Where the response came from
isc::dns::Question question; ///< Question to be asked
OutputBufferPtr msgbuf; ///< Wire buffer for question
OutputBufferPtr received; ///< Received data put here
IOFetch::Callback* callback; ///< Called on I/O Completion
......@@ -110,7 +108,6 @@ struct IOFetchData {
/// \param proto Either IOFetch::TCP or IOFetch::UDP.
/// \param service I/O Service object to handle the asynchronous
/// operations.
/// \param query DNS question to send to the upstream server.
/// \param address IP address of upstream server
/// \param port Port to use for the query
/// \param buff Output buffer into which the response (in wire format)
......@@ -122,8 +119,8 @@ struct IOFetchData {
///
/// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
IOFetchData(IOFetch::Protocol proto, IOService& service,
const isc::dns::Question& query, const IOAddress& address,
uint16_t port, OutputBufferPtr& buff, IOFetch::Callback* cb, int wait)
const IOAddress& address, uint16_t port, OutputBufferPtr& buff,
IOFetch::Callback* cb, int wait)
:
socket((proto == IOFetch::UDP) ?
static_cast<IOAsioSocket<IOFetch>*>(
......@@ -139,7 +136,6 @@ struct IOFetchData {
static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
),
question(query),
msgbuf(new OutputBuffer(512)),
received(buff),
callback(cb),
......@@ -174,10 +170,10 @@ struct IOFetchData {
IOFetch::IOFetch(Protocol protocol, IOService& service,
const isc::dns::Question& question, const IOAddress& address, uint16_t port,
OutputBufferPtr& buff, Callback* cb, int wait)
:
data_(new IOFetchData(protocol, service, question, address,
port, buff, cb, wait))
{
MessagePtr query_msg(new Message(Message::RENDER));
initIOFetch(query_msg, protocol, service, question, address, port, buff,
cb, wait);
}
IOFetch::IOFetch(Protocol protocol, IOService& service,
......@@ -185,14 +181,55 @@ IOFetch::IOFetch(Protocol protocol, IOService& service,
OutputBufferPtr& buff, Callback* cb, int wait)
:
data_(new IOFetchData(protocol, service,
isc::dns::Question(isc::dns::Name("dummy.example.org"),
isc::dns::RRClass::IN(), isc::dns::RRType::A()),
address, port, buff, cb, wait))
{
data_->msgbuf = outpkt;
data_->packet = true;
}
IOFetch::IOFetch(Protocol protocol, IOService& service,
ConstMessagePtr query_message, const IOAddress& address, uint16_t port,
OutputBufferPtr& buff, Callback* cb, int wait)
{
MessagePtr msg(new Message(Message::RENDER));
Message::HeaderFlag flag = Message::HEADERFLAG_RD;
msg->setHeaderFlag(flag, query_message->getHeaderFlag(flag));
flag = Message::HEADERFLAG_CD;
msg->setHeaderFlag(flag, query_message->getHeaderFlag(flag));
ConstEDNSPtr edns(query_message->getEDNS());
const bool dnssec_ok = edns && edns->getDNSSECAwareness();
if (edns) {
EDNSPtr edns_response(new EDNS());
edns_response->setDNSSECAwareness(dnssec_ok);
// TODO: We should make our own edns bufsize length configurable
edns_response->setUDPSize(Message::DEFAULT_MAX_EDNS0_UDPSIZE);
msg->setEDNS(edns_response);
}
initIOFetch(msg, protocol, service,
**(query_message->beginQuestion()),
address, port, buff, cb, wait);
}
void
IOFetch::initIOFetch(MessagePtr& query_msg, Protocol protocol, IOService& service,
const isc::dns::Question& question,
const IOAddress& address, uint16_t port,
OutputBufferPtr& buff, Callback* cb, int wait)
{
data_ = boost::shared_ptr<IOFetchData>(new IOFetchData(
protocol, service, address, port, buff, cb, wait));
query_msg->setQid(data_->qid);
query_msg->setOpcode(Opcode::QUERY());
query_msg->setRcode(Rcode::NOERROR());
query_msg->setHeaderFlag(Message::HEADERFLAG_RD);
query_msg->addQuestion(question);
MessageRenderer renderer(*data_->msgbuf);
query_msg->toWire(renderer);
}
// Return protocol in use.
IOFetch::Protocol
......@@ -224,17 +261,7 @@ IOFetch::operator()(asio::error_code ec, size_t length) {
// first two bytes of the packet).
data_->msgbuf->writeUint16At(data_->qid, 0);
} else {
// A question was given, construct the packet
Message msg(Message::RENDER);
msg.setQid(data_->qid);
msg.setOpcode(Opcode::QUERY());
msg.setRcode(Rcode::NOERROR());
msg.setHeaderFlag(Message::HEADERFLAG_RD);
msg.addQuestion(data_->question);
MessageRenderer renderer(*data_->msgbuf);
msg.toWire(renderer);
}
}
}
// If we timeout, we stop, which will can cancel outstanding I/Os and
......@@ -404,4 +431,3 @@ void IOFetch::logIOFailure(asio::error_code ec) {
} // namespace asiodns
} // namespace isc {
......@@ -29,6 +29,7 @@
#include <util/buffer.h>
#include <dns/question.h>
#include <dns/message.h>
namespace isc {
namespace asiodns {
......@@ -136,6 +137,20 @@ public:
uint16_t port, isc::util::OutputBufferPtr& buff, Callback* cb,
int wait = -1);
/// \brief Constructor
/// This constructor has one parameter "query_message", which
/// is the shared_ptr to a full query message. It's different
/// with above contructor which has only question section. All
/// other parameters are same.
///
/// \param query_message the shared_ptr to a full query message
/// got from a query client.
IOFetch(Protocol protocol, isc::asiolink::IOService& service,
isc::dns::ConstMessagePtr query_message,
const isc::asiolink::IOAddress& address,
uint16_t port, isc::util::OutputBufferPtr& buff, Callback* cb,
int wait = -1);
/// \brief Constructor.
///
/// Creates the object that will handle the upstream fetch.
......@@ -184,6 +199,15 @@ public:
void stop(Result reason = STOPPED);
private:
/// \brief IOFetch Initialization Function.
/// All the parameters are same with the constructor, except
/// parameter "query_message"
/// \param query_message the message to be sent out.
void initIOFetch(isc::dns::MessagePtr& query_message, Protocol protocol,
isc::asiolink::IOService& service, const isc::dns::Question& question,
const isc::asiolink::IOAddress& address, uint16_t port,
isc::util::OutputBufferPtr& buff, Callback* cb, int wait);
/// \brief Log I/O Failure
///
/// Records an I/O failure to the log file
......
......@@ -562,6 +562,7 @@ private:
/// that ongoing state information will not be lost if the object
/// that originated the asynchronous call falls out of scope.
typedef boost::shared_ptr<Message> MessagePtr;
typedef boost::shared_ptr<const Message> ConstMessagePtr;
std::ostream& operator<<(std::ostream& os, const Message& message);
}
......
......@@ -131,13 +131,12 @@ private:
// Info for (re)sending the query (the question and destination)
Question question_;
// This is the query message got from client
ConstMessagePtr query_message_;
// This is where we build and store our final answer
MessagePtr answer_message_;
// currently we use upstream as the current list of NS records
// we should differentiate between forwarding and resolving
boost::shared_ptr<AddressVector> upstream_;
// Test server - only used for testing. This takes precedence over all
// other servers if the port is non-zero.
std::pair<std::string, uint16_t> test_server_;
......@@ -282,13 +281,8 @@ private:
}
}
// 'general' send; if we are in forwarder mode, send a query to
// a random nameserver in our forwarders list. If we are in
// recursive mode, ask the NSAS to give us an address.
// 'general' send, ask the NSAS to give us an address.
void send(IOFetch::Protocol protocol = IOFetch::UDP) {
// If are in forwarder mode, send it to a random
// forwarder. If not, ask the NSAS for an address
const int uc = upstream_->size();
protocol_ = protocol; // Store protocol being used for this
if (test_server_.second != 0) {
dlog("Sending upstream query (" + question_.toText() +
......@@ -300,18 +294,6 @@ private:
test_server_.second, buffer_, this,
query_timeout_);
io_.get_io_service().post(query);
} else if (uc > 0) {
// TODO: use boost, or rand()-utility function we provide
int serverIndex = rand() % uc;
dlog("Sending upstream query (" + question_.toText() +
") to " + upstream_->at(serverIndex).first);
++outstanding_events_;
gettimeofday(&current_ns_qsent_time, NULL);
IOFetch query(protocol, io_, question_,
upstream_->at(serverIndex).first,
upstream_->at(serverIndex).second, buffer_, this,
query_timeout_);
io_.get_io_service().post(query);
} else {
// Ask the NSAS for an address for the current zone,
// the callback will call the actual sendTo()
......@@ -486,7 +468,6 @@ public:
RunningQuery(IOService& io,
const Question& question,
MessagePtr answer_message,
boost::shared_ptr<AddressVector> upstream,
std::pair<std::string, uint16_t>& test_server,
OutputBufferPtr buffer,
isc::resolve::ResolverInterface::CallbackPtr cb,
......@@ -498,8 +479,8 @@ public:
:
io_(io),
question_(question),
query_message_(),
answer_message_(answer_message),
upstream_(upstream),
test_server_(test_server),
buffer_(buffer),
resolvercallback_(cb),
......@@ -647,8 +628,7 @@ public:
incoming.fromWire(ibuf);
buffer_->clear();
if (recursive_mode() &&
incoming.getRcode() == Rcode::NOERROR()) {
if (incoming.getRcode() == Rcode::NOERROR()) {
done_ = handleRecursiveAnswer(incoming);
} else {
isc::resolve::copyResponseMessage(incoming, answer_message_);
......@@ -682,13 +662,11 @@ public:
} else if (!done_ && retries_--) {
// Query timed out, but we have some retries, so send again
dlog("Timeout for " + question_.toText() + " to " + current_ns_address.getAddress().toText() + ", resending query");
if (recursive_mode()) {
current_ns_address.updateRTT(isc::nsas::AddressEntry::UNREACHABLE);
}
current_ns_address.updateRTT(isc::nsas::AddressEntry::UNREACHABLE);
send();
} else {
// We are either already done, or out of retries
if (recursive_mode() && result == IOFetch::TIME_OUT) {
if (result == IOFetch::TIME_OUT) {
dlog("Timeout for " + question_.toText() + " to " + current_ns_address.getAddress().toText() + ", giving up");
current_ns_address.updateRTT(isc::nsas::AddressEntry::UNREACHABLE);
}
......@@ -705,12 +683,148 @@ public:
void makeSERVFAIL() {
isc::resolve::makeErrorMessage(answer_message_, Rcode::SERVFAIL());
}
// Returns true if we are in 'recursive' mode
// Returns false if we are in 'forwarding' mode
// (i.e. if we have anything in upstream_)
bool recursive_mode() const {
return upstream_->empty();
};
class ForwardQuery : public IOFetch::Callback {
private:
// The io service to handle async calls
IOService& io_;
// This is the query message got from client
ConstMessagePtr query_message_;
// This is where we build and store our final answer
MessagePtr answer_message_;
// List of nameservers to forward to
boost::shared_ptr<AddressVector> upstream_;
// Buffer to store the result.
OutputBufferPtr buffer_;
// This will be notified when we succeed or fail
isc::resolve::ResolverInterface::CallbackPtr resolvercallback_;
/*
* TODO Do something more clever with timeouts. In the long term, some
* computation of average RTT, increase with each retry, etc.
*/
// Timeout information
int query_timeout_;
// TODO: replace by our wrapper
asio::deadline_timer client_timer;
asio::deadline_timer lookup_timer;
// If we have a client timeout, we send back an answer, but don't
// stop. We use this variable to make sure we don't send another
// answer if we do find one later (or if we have a lookup_timeout)
bool answer_sent_;
// send the query to the server.
void send(IOFetch::Protocol protocol = IOFetch::UDP) {
const int uc = upstream_->size();
buffer_->clear();
int serverIndex = rand() % uc;
ConstQuestionPtr question = *(query_message_->beginQuestion());
dlog("Sending upstream query (" + question->toText() +
") to " + upstream_->at(serverIndex).first);
// Forward the query, create the IOFetch with
// query message, so that query flags can be forwarded
// together.
IOFetch query(protocol, io_, query_message_,
upstream_->at(serverIndex).first,
upstream_->at(serverIndex).second,
buffer_, this, query_timeout_);
io_.get_io_service().post(query);
}
public:
ForwardQuery(IOService& io,
ConstMessagePtr query_message,
MessagePtr answer_message,
boost::shared_ptr<AddressVector> upstream,
OutputBufferPtr buffer,
isc::resolve::ResolverInterface::CallbackPtr cb,
int query_timeout, int client_timeout, int lookup_timeout) :
io_(io),
query_message_(query_message),
answer_message_(answer_message),
upstream_(upstream),
buffer_(buffer),
resolvercallback_(cb),
query_timeout_(query_timeout),
client_timer(io.get_io_service()),
lookup_timer(io.get_io_service()),
answer_sent_(false)
{
// Setup the timer to stop trying (lookup_timeout)
if (lookup_timeout >= 0) {
lookup_timer.expires_from_now(
boost::posix_time::milliseconds(lookup_timeout));
lookup_timer.async_wait(boost::bind(&ForwardQuery::stop, this, false));
}
// Setup the timer to send an answer (client_timeout)
if (client_timeout >= 0) {
client_timer.expires_from_now(
boost::posix_time::milliseconds(client_timeout));
client_timer.async_wait(boost::bind(&ForwardQuery::clientTimeout, this));
}
send();
}
virtual void clientTimeout() {
// Return a SERVFAIL, but do not stop until
// we have an answer or timeout ourselves
isc::resolve::makeErrorMessage(answer_message_,
Rcode::SERVFAIL());
if (!answer_sent_) {
answer_sent_ = true;
resolvercallback_->success(answer_message_);
}
}
virtual void stop(bool resume) {
// if we cancel our timers, we will still get an event for
// that, so we cannot delete ourselves just yet (those events
// would be bound to a deleted object)
// cancel them one by one, both cancels should get us back
// here again.
// same goes if we have an outstanding query (can't delete
// until that one comes back to us)
if (resume && !answer_sent_) {
answer_sent_ = true;
resolvercallback_->success(answer_message_);
} else {
resolvercallback_->failure();
}
if (lookup_timer.cancel() != 0) {
return;
}
if (client_timer.cancel() != 0) {
return;
}
delete this;
}
// This function is used as callback from DNSQuery.
virtual void operator()(IOFetch::Result result) {
// XXX is this the place for TCP retry?
if (result != IOFetch::TIME_OUT) {
// we got an answer
Message incoming(Message::PARSE);
InputBuffer ibuf(buffer_->getData(), buffer_->getLength());
incoming.fromWire(ibuf);
isc::resolve::copyResponseMessage(incoming, answer_message_);
stop(true);
} else {
// timeout, give up for now
stop(false);
}
}
};
......@@ -753,7 +867,7 @@ RecursiveQuery::resolve(const QuestionPtr& question,
} else {
dlog("Message not found in cache, starting recursive query");
// It will delete itself when it is done
new RunningQuery(io, *question, answer_message, upstream_,
new RunningQuery(io, *question, answer_message,
test_server_, buffer, callback,
query_timeout_, client_timeout_,
lookup_timeout_, retries_, nsas_,
......@@ -807,7 +921,7 @@ RecursiveQuery::resolve(const Question& question,
} else {
dlog("Message not found in cache, starting recursive query");
// It will delete itself when it is done
new RunningQuery(io, question, answer_message, upstream_,
new RunningQuery(io, question, answer_message,
test_server_, buffer, crs, query_timeout_,
client_timeout_, lookup_timeout_, retries_,
nsas_, cache_, rtt_recorder_);
......@@ -815,5 +929,36 @@ RecursiveQuery::resolve(const Question& question,
}
}
void
RecursiveQuery::forward(ConstMessagePtr query_message,
MessagePtr answer_message,
OutputBufferPtr buffer,
DNSServer* server,
isc::resolve::ResolverInterface::CallbackPtr callback)
{
// XXX: eventually we will need to be able to determine whether
// the message should be sent via TCP or UDP, or sent initially via
// UDP and then fall back to TCP on failure, but for the moment
// we're only going to handle UDP.
IOService& io = dns_service_.getIOService();
if (!callback) {
callback.reset(new isc::resolve::ResolverCallbackServer(server));
}
// TODO: general 'prepareinitialanswer'
answer_message->setOpcode(isc::dns::Opcode::QUERY());
ConstQuestionPtr question = *query_message->beginQuestion();
answer_message->addQuestion(*question);
// implement the simplest forwarder, which will pass
// everything throught without interpretation, except
// QID, port number. The response will not be cached.
// It will delete itself when it is done
new ForwardQuery(io, query_message, answer_message,
upstream_, buffer, callback, query_timeout_,
client_timeout_, lookup_timeout_);
}
} // namespace asiodns
} // namespace isc
......@@ -141,6 +141,20 @@ public:
isc::util::OutputBufferPtr buffer,
DNSServer* server);
/// \brief Initiates forwarding for the given query.
///
/// Others parameters are same with the parameters of
/// function resolve().
///
/// \param query_message the full query got from client.
/// \param callback callback object
void forward(isc::dns::ConstMessagePtr query_message,
isc::dns::MessagePtr answer_message,
isc::util::OutputBufferPtr buffer,
DNSServer* server,
isc::resolve::ResolverInterface::CallbackPtr callback =
isc::resolve::ResolverInterface::CallbackPtr());
/// \brief Set Test Server
///
/// This method is *only* for unit testing the class. If set, it enables
......
......@@ -35,6 +35,7 @@
#include <nsas/nameserver_address_store.h>
#include <cache/resolver_cache.h>
#include <resolve/resolve.h>
// IMPORTANT: We shouldn't directly use ASIO definitions in this test.
// In particular, we must not include asio.hpp in this file.
......@@ -567,9 +568,12 @@ TEST_F(RecursiveQueryTest, forwarderSend) {
singleAddress(TEST_IPV4_ADDR, port));
Question q(Name("example.com"), RRClass::IN(), RRType::TXT());
Message query_message(Message::RENDER);
isc::resolve::initResponseMessage(q, query_message);
OutputBufferPtr buffer(new OutputBuffer(0));
MessagePtr answer(new Message(Message::RENDER));
rq.resolve(q, answer, buffer, &server);
rq.forward(ConstMessagePtr(&query_message), answer, buffer, &server);
char data[4096];
size_t size = sizeof(data);
......@@ -634,8 +638,41 @@ bool tryRead(int sock_, int recv_options, size_t max, int* num) {
return true;
}
// Mock resolver callback for testing forward query.
class MockResolverCallback : public isc::resolve::ResolverInterface::Callback {
public:
enum ResultValue {
DEFAULT = 0,
SUCCESS = 1,
FAILURE = 2
};
MockResolverCallback(DNSServer* server):
result(DEFAULT),
server_(server->clone())
{}
~MockResolverCallback() {
delete server_;
}
void success(const isc::dns::MessagePtr response) {
result = SUCCESS;
server_->resume(true);
}
void failure() {
result = FAILURE;
server_->resume(false);
}
uint32_t result;
private:
DNSServer* server_;
};
// Test it tries the correct amount of times before giving up
// Test query timeout, set query timeout is lower than client timeout
// and lookup timeout.
TEST_F(RecursiveQueryTest, forwardQueryTimeout) {
// Prepare the service (we do not use the common setup, we do not answer
setDNSService();
......@@ -657,26 +694,20 @@ TEST_F(RecursiveQueryTest, forwardQueryTimeout) {
Question question(Name("example.net"), RRClass::IN(), RRType::A());
OutputBufferPtr buffer(new OutputBuffer(0));
MessagePtr answer(new Message(Message::RENDER));
query.resolve(question, answer, buffer, &server);
Message query_message(Message::RENDER);
isc::resolve::initResponseMessage(question, query_message);
boost::shared_ptr<MockResolverCallback> callback(new MockResolverCallback(&server));
query.forward(ConstMessagePtr(&query_message), answer, buffer, &server, callback);
// Run the test
io_service_->run();
// Read up to 3 packets. Use some ad hoc timeout to prevent an infinite
// block (see also recvUDP()).
int recv_options = setSocketTimeout(sock_, 10, 0);
int num = 0;
bool read_success = tryRead(sock_, recv_options, 3, &num);
// The query should 'succeed' with an error response
EXPECT_TRUE(done);
EXPECT_EQ(3, num);
EXPECT_TRUE(read_success);
EXPECT_EQ(callback->result, MockResolverCallback::FAILURE);