Commit f4b20fc5 authored by Stephen Morris's avatar Stephen Morris

[trac499] Finish TCPSocket and IOFetch and associated unit tests

parent 682436e8
// File created from asiodef.msg on Thu Feb 24 11:52:42 2011
// File created from asiodef.msg on Mon Feb 28 17:15:30 2011
#include <cstddef>
#include <log/message_types.h>
......@@ -20,14 +20,14 @@ extern const isc::log::MessageID ASIO_UNKRESULT = "UNKRESULT";
namespace {
const char* values[] = {
"FETCHCOMP", "upstream fetch to %s has now completed",
"FETCHSTOP", "upstream fetch to %s has been stopped",
"OPENSOCK", "error %d opening %s socket to %s",
"RECVSOCK", "error %d reading data from %s via a %s socket",
"RECVTMO", "receive timeout while waiting for data from %s",
"SENDSOCK", "error %d sending data to %s via a %s socket",
"FETCHCOMP", "upstream fetch to %s(%d) has now completed",
"FETCHSTOP", "upstream fetch to %s(%d) has been stopped",
"OPENSOCK", "error %d opening %s socket to %s(%d)",
"RECVSOCK", "error %d reading %s data from %s(%d)",
"RECVTMO", "receive timeout while waiting for data from %s(%d)",
"SENDSOCK", "error %d sending data using %s to %s(%d)",
"UNKORIGIN", "unknown origin for ASIO error code %d (protocol: %s, address %s)",
"UNKRESULT", "unknown result (%d) when IOFetch::stop() was executed for I/O to %s",
"UNKRESULT", "unknown result (%d) when IOFetch::stop() was executed for I/O to %s(%d)",
NULL
};
......
// File created from asiodef.msg on Thu Feb 24 11:52:42 2011
// File created from asiodef.msg on Mon Feb 28 17:15:30 2011
#ifndef __ASIODEF_H
#define __ASIODEF_H
......
......@@ -15,32 +15,32 @@
$PREFIX ASIO_
$NAMESPACE asiolink
FETCHCOMP upstream fetch to %s has now completed
FETCHCOMP upstream fetch to %s(%d) has now completed
+ A debug message, this records the the upstream fetch (a query made by the
+ resolver on behalf of its client) to the specified address has completed.
FETCHSTOP upstream fetch to %s has been stopped
FETCHSTOP upstream fetch to %s(%d) has been stopped
+ An external component has requested the halting of an upstream fetch. This
+ is an allowed operation, and the message should only appear if debug is
+ enabled.
OPENSOCK error %d opening %s socket to %s
OPENSOCK error %d opening %s socket to %s(%d)
+ The asynchronous I/O code encountered an error when trying to open a socket
+ of the specified protocol in order to send a message to the target address.
+ The the number of the system error that cause the problem is given in the
+ message.
RECVSOCK error %d reading data from %s via a %s socket
RECVSOCK error %d reading %s data from %s(%d)
+ The asynchronous I/O code encountered an error when trying read data from
+ the specified address on the given protocol. The the number of the system
+ error that cause the problem is given in the message.
SENDSOCK error %d sending data to %s via a %s socket
SENDSOCK error %d sending data using %s to %s(%d)
+ The asynchronous I/O code encountered an error when trying send data to
+ the specified address on the given protocol. The the number of the system
+ error that cause the problem is given in the message.
RECVTMO receive timeout while waiting for data from %s
RECVTMO receive timeout while waiting for data from %s(%d)
+ An upstream fetch from the specified address timed out. This may happen for
+ any number of reasons and is most probably a problem at the remote server
+ or a problem on the network. The message will only appear if debug is
......@@ -50,7 +50,7 @@ UNKORIGIN unknown origin for ASIO error code %d (protocol: %s, address %s)
+ This message should not appear and indicates an internal error if it does.
+ Please enter a bug report.
UNKRESULT unknown result (%d) when IOFetch::stop() was executed for I/O to %s
UNKRESULT unknown result (%d) when IOFetch::stop() was executed for I/O to %s(%d)
+ The termination method of the resolver's upstream fetch class was called with
+ an unknown result code (which is given in the message). This message should
+ not appear and may indicate an internal error. Please enter a bug report.
......@@ -57,17 +57,17 @@ isc::log::Logger logger("asio");
/// \brief IOFetch Data
///
/// The data for IOFetch is held in a separate struct pointed to by a
/// shared_ptr object. This is because the IOFetch object will be copied
/// often (it is used as a coroutine and passed as callback to many
/// async_*() functions) and we want keep the same data). Organising the
/// data in this way keeps copying to a minimum.
/// The data for IOFetch is held in a separate struct pointed to by a shared_ptr
/// object. This is because the IOFetch object will be copied often (it is used
/// as a coroutine and passed as callback to many async_*() functions) and we
/// want keep the same data). Organising the data in this way keeps copying to
/// a minimum.
struct IOFetchData {
// The first two members are shared pointers to a base class because what is
// actually instantiated depends on whether the fetch is over UDP or TCP,
// which is not known until construction of the IOFetch. Use of a shared
//pointer here is merely to ensure deletion when the data object is deleted.
// pointer here is merely to ensure deletion when the data object is deleted.
boost::shared_ptr<IOAsioSocket<IOFetch> > socket;
///< Socket to use for I/O
boost::shared_ptr<IOEndpoint> remote; ///< Where the fetch was sent
......@@ -80,23 +80,29 @@ struct IOFetchData {
bool stopped; ///< Have we stopped running?
asio::deadline_timer timer; ///< Timer to measure timeouts
int timeout; ///< Timeout in ms
IOFetch::Origin origin; ///< Origin of last asynchronous I/O
// In case we need to log an error, the origin of the last asynchronous
// I/O is recorded. To save time and simplify the code, this is recorded
// as the ID of the error message that would be generated if the I/O failed.
// This means that we must make sure that all possible "origins" take the
// same arguments in their message in the same order.
isc::log::MessageID origin; ///< Origin of last asynchronous I/O
/// \brief Constructor
///
/// Just fills in the data members of the IOFetchData structure
///
/// \param protocol Either IOFetch::TCP or IOFetch::UDP
/// \param protocol Either IOFetch::TCP or IOFetch::UDP.
/// \param service I/O Service object to handle the asynchronous
/// operations.
/// operations.
/// \param query DNS question to send to the upstream server.
/// \param address IP address of upstream server
/// \param port Port to use for the query
/// \param buff Output buffer into which the response (in wire format)
/// is written (if a response is received).
/// is written (if a response is received).
/// \param cb Callback object containing the callback to be called
/// when we terminate. The caller is responsible for managing this
/// object and deleting it if necessary.
/// when we terminate. The caller is responsible for managing this
/// object and deleting it if necessary.
/// \param wait Timeout for the fetch (in ms).
///
/// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
......@@ -124,11 +130,10 @@ struct IOFetchData {
stopped(false),
timer(service.get_io_service()),
timeout(wait),
origin(IOFetch::NONE)
origin(ASIO_UNKORIGIN)
{}
};
/// IOFetch Constructor - just initialize the private data
IOFetch::IOFetch(Protocol protocol, IOService& service,
......@@ -145,8 +150,7 @@ IOFetch::IOFetch(Protocol protocol, IOService& service,
void
IOFetch::operator()(asio::error_code ec, size_t length) {
std::cerr << "IOFetch::operator() [" << this << "], origin = " <<
data_->origin << ", coroutine = " << get_value() << "\n";
if (data_->stopped) {
return;
} else if (ec) {
......@@ -161,7 +165,7 @@ IOFetch::operator()(asio::error_code ec, size_t length) {
/// declarations.
{
Message msg(Message::RENDER);
// TODO: replace with boost::random or some other suitable PRNG
msg.setQid(0);
msg.setOpcode(Opcode::QUERY());
......@@ -178,8 +182,8 @@ IOFetch::operator()(asio::error_code ec, size_t length) {
data_->remote->getAddress().toText());
}
// If we timeout, we stop, which will shutdown everything and
// cancel all other attempts to run inside the coroutine
// If we timeout, we stop, which will can cancel outstanding I/Os and
// shutdown everything.
if (data_->timeout != -1) {
data_->timer.expires_from_now(boost::posix_time::milliseconds(
data_->timeout));
......@@ -188,27 +192,20 @@ IOFetch::operator()(asio::error_code ec, size_t length) {
}
// Open a connection to the target system. For speed, if the operation
// was completed synchronously (i.e. UDP operation) we bypass the yield.
data_->origin = OPEN;
// is synchronous (i.e. UDP operation) we bypass the yield.
data_->origin = ASIO_OPENSOCK;
if (data_->socket->isOpenSynchronous()) {
std::cerr << "IOFetch: Opening socket synchronously\n";
data_->socket->open(data_->remote.get(), *this);
} else {
std::cerr << "IOFetch: Opening socket asynchronously and yeilding\n";
CORO_YIELD data_->socket->open(data_->remote.get(), *this);
std::cerr << "IOFetch: Resuming after Opening socket asynchronously\n";
}
// Begin an asynchronous send, and then yield. When the send completes
// send completes, we will resume immediately after this point.
// Note: A TCP message may not be sent in one piece (depends on the
// implementation in TCP socket). Therefore there may be
data_->origin = SEND;
std::cerr << "IOFetch: asynchronous send\n";
// Begin an asynchronous send, and then yield. When the send completes,
// we will resume immediately after this point.
data_->origin = ASIO_SENDSOCK;
CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
data_->msgbuf->getLength(), data_->remote.get(), *this);
std::cerr << "IOFetch: resuming after asynchronous send\n";
// Now receive the response. Since TCP may not receive the entire
// message in one operation, we need to loop until we have received
// it. (This can't be done within the asyncReceive() method because
......@@ -216,30 +213,25 @@ IOFetch::operator()(asio::error_code ec, size_t length) {
// we need to yield ... and we *really* don't want to set up another
// coroutine within that method.) So after each receive (and yield),
// we check if the operation is complete and if not, loop to read again.
data_->origin = RECEIVE;
data_->origin = ASIO_RECVSOCK;
do {
std::cerr << "IOFetch: asynchronous receive\n";
CORO_YIELD data_->socket->asyncReceive(data_->data.get(),
static_cast<size_t>(MIN_LENGTH), data_->cumulative,
data_->remote.get(), *this);
data_->cumulative += length;
std::cerr << "IOFetch: resuming after asynchronous receive\n";
} while (!data_->socket->receiveComplete(data_->data.get(),
data_->cumulative));
// The message is not rendered yet, so we can't print it easily
dlog("Received response from " + data_->remote->getAddress().toText());
/// Copy the answer into the response buffer. (TODO: If the
/// OutputBuffer object were made to meet the requirements of
/// a MutableBufferSequence, then it could be written to directly
/// by async_receive_from() and this additional copy step would
/// be unnecessary.)
/// OutputBuffer object were made to meet the requirements of a
/// MutableBufferSequence, then it could be written to directly by
/// async_receive_from() and this additional copy step would be
/// unnecessary.)
data_->buffer->writeData(data_->data.get(), length);
// Finished with this socket, so close it.
data_->origin = CLOSE;
std::cerr << "IOFetch: close\n";
// Finished with this socket, so close it. This will not generate an
// I/O error, but reset the origin to unknown in case we change this.
data_->origin = ASIO_UNKORIGIN;
data_->socket->close();
/// We are done
......@@ -251,9 +243,8 @@ IOFetch::operator()(asio::error_code ec, size_t length) {
// query finishes or when the timer times out. Either way, it sets the
// "stopped_" flag and cancels anything that is in progress.
//
// As the function may be entered multiple times as things wind down, the
// stopped_ flag checks if stop() has already been called. If it has,
// subsequent calls are no-ops.
// As the function may be entered multiple times as things wind down, it checks
// if the stopped_ flag is already set. If it is, the call is a no-op.
void
IOFetch::stop(Result result) {
......@@ -276,24 +267,24 @@ IOFetch::stop(Result result) {
//
// Although Logger::debug checks the debug flag internally, doing it
// below before calling Logger::debug avoids the overhead of a string
// conversion in the common paths and in the common case when debug is
// not enabled.
// conversion in the common case when debug is not enabled.
//
// TODO: Update testing of stopped_ if threads are used.
data_->stopped = true;
switch (result) {
case TIME_OUT:
if (logger.isDebugEnabled(1)) {
logger.debug(1, ASIO_RECVTMO,
data_->remote->getAddress().toText().c_str());
logger.debug(20, ASIO_RECVTMO,
data_->remote->getAddress().toText().c_str(),
static_cast<int>(data_->remote->getPort()));
}
break;
case SUCCESS:
if (logger.isDebugEnabled(50)) {
logger.debug(50, ASIO_FETCHCOMP,
data_->remote->getAddress().toText().c_str());
logger.debug(30, ASIO_FETCHCOMP,
data_->remote->getAddress().toText().c_str(),
static_cast<int>(data_->remote->getPort()));
}
break;
......@@ -301,13 +292,15 @@ IOFetch::stop(Result result) {
// Fetch has been stopped for some other reason. This is
// allowed but as it is unusual it is logged, but with a lower
// debug level than a timeout (which is totally normal).
logger.debug(10, ASIO_FETCHSTOP,
data_->remote->getAddress().toText().c_str());
logger.debug(1, ASIO_FETCHSTOP,
data_->remote->getAddress().toText().c_str(),
static_cast<int>(data_->remote->getPort()));
break;
default:
logger.error(ASIO_UNKRESULT, static_cast<int>(result),
data_->remote->getAddress().toText().c_str());
data_->remote->getAddress().toText().c_str(),
static_cast<int>(data_->remote->getPort()));
}
// Stop requested, cancel and I/O's on the socket and shut it down,
......@@ -321,9 +314,6 @@ IOFetch::stop(Result result) {
if (data_->callback) {
(*(data_->callback))(result);
}
// Mark that stop() has now been called.
}
}
......@@ -331,32 +321,19 @@ IOFetch::stop(Result result) {
void IOFetch::logIOFailure(asio::error_code ec) {
// Get information that will be in all messages
static const char* PROTOCOL[2] = {"TCP", "UDP"};
const char* prot = (data_->remote->getProtocol() == IPPROTO_TCP) ?
PROTOCOL[0] : PROTOCOL[1];
int errcode = ec.value();
std::string str_address = data_->remote->getAddress().toText();
const char* address = str_address.c_str();
switch (data_->origin) {
case OPEN:
logger.error(ASIO_OPENSOCK, errcode, prot, address);
break;
// Should only get here with a known error code.
assert((data_->origin == ASIO_OPENSOCK) ||
(data_->origin == ASIO_SENDSOCK) ||
(data_->origin == ASIO_RECVSOCK) ||
(data_->origin == ASIO_UNKORIGIN));
case SEND:
logger.error(ASIO_SENDSOCK, errcode, prot, address);
break;
case RECEIVE:
logger.error(ASIO_RECVSOCK, errcode, prot, address);
break;
default:
logger.error(ASIO_UNKORIGIN, errcode, prot, address);
}
static const char* PROTOCOL[2] = {"TCP", "UDP"};
logger.error(data_->origin,
ec.value(),
((data_->remote->getProtocol() == IPPROTO_TCP) ?
PROTOCOL[0] : PROTOCOL[1]),
data_->remote->getAddress().toText().c_str(),
static_cast<int>(data_->remote->getPort()));
}
} // namespace asiolink
......
......@@ -62,7 +62,7 @@ private:
TCPSocket& operator=(const TCPSocket&);
public:
/// \brief Constructor from an ASIO TCP socket.
///
/// \param socket The ASIO representation of the TCP socket. It is assumed
......
......@@ -12,13 +12,14 @@
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#include <gtest/gtest.h>
#include <boost/bind.hpp>
#include <algorithm>
#include <cstdlib>
#include <string>
#include <iostream>
#include <string.h>
#include <gtest/gtest.h>
#include <boost/bind.hpp>
#include <asio.hpp>
......@@ -30,6 +31,7 @@
#include <dns/name.h>
#include <dns/rcode.h>
#include <asiolink/asiolink_utilities.h>
#include <asiolink/io_address.h>
#include <asiolink/io_endpoint.h>
#include <asiolink/io_fetch.h>
......@@ -38,6 +40,7 @@
using namespace asio;
using namespace isc::dns;
using namespace asio::ip;
using namespace std;
namespace asiolink {
......@@ -59,6 +62,7 @@ public:
IOFetch udp_fetch_; ///< For UDP query test
IOFetch tcp_fetch_; ///< For TCP query test
IOFetch::Protocol protocol_; ///< Protocol being tested
size_t cumulative_; ///< Cumulative data received by "server".
// The next member is the buffer in which the "server" (implemented by the
// response handler methods in this class) receives the question sent by the
......@@ -77,7 +81,8 @@ public:
TEST_PORT, result_buff_, this, 100),
tcp_fetch_(IOFetch::TCP, service_, question_, IOAddress(TEST_HOST),
TEST_PORT, result_buff_, this, 1000),
protocol_(IOFetch::TCP) // for initialization - will be changed
protocol_(IOFetch::TCP), // for initialization - will be changed
cumulative_(0)
{
// Construct the data buffer for question we expect to receive.
Message msg(Message::RENDER);
......@@ -140,7 +145,8 @@ public:
// Check that length of the received data and the expected data are
// identical, then check that the data is identical as well.
EXPECT_EQ(msgbuf_->getLength(), length);
EXPECT_TRUE(memcmp(msgbuf_->getData(), server_buff_, length) == 0);
EXPECT_TRUE(equal(server_buff_, (server_buff_ + length - 1),
static_cast<const uint8_t*>(msgbuf_->getData())));
// Return a message back to the IOFetch object.
socket->send_to(asio::buffer(TEST_DATA, sizeof TEST_DATA), *remote);
......@@ -155,10 +161,11 @@ public:
/// \param ec Boost error code, value should be zero.
void tcpAcceptHandler(tcp::socket* socket, error_code ec = error_code())
{
std::cerr << "TCP Accept Handler\n";
EXPECT_EQ(0, ec.value()); // Expect no error
// Expect that the accept completed without a problem.
EXPECT_EQ(0, ec.value());
// Initiate a read on the socket
// Initiate a read on the socket.
cumulative_ = 0;
socket->async_receive(asio::buffer(server_buff_, sizeof(server_buff_)),
boost::bind(&IOFetchTest::tcpReceiveHandler, this, socket, _1, _2));
}
......@@ -166,8 +173,9 @@ public:
/// \brief Completion handler for receiving TCP data
///
/// When IOFetch is sending data, this response handler emulates the remote
/// DNS server. It checks that the data sent by the IOFetch object is what
/// was expected to have been sent, then sends back a known buffer of data.
/// DNS server. It that all the data sent by the IOFetch object has been
/// received, issuing another read if not. If the data is complete, it is
/// compared to what is expected and a reply sent back to the IOFetch.
///
/// \param socket Socket to use to send the answer
/// \param ec ASIO error code, completion code of asynchronous I/O issued
......@@ -176,36 +184,48 @@ public:
void tcpReceiveHandler(tcp::socket* socket, error_code ec = error_code(),
size_t length = 0)
{
std::cerr << "TCP Receive Handler\n";
// TODO - need to loop until all the data is received.
// Interpret the received data. The first two bytes, when converted
// to host byte order, are the count of the length of the message.
EXPECT_GE(2, length);
uint16_t dns_length = readUint16(server_buff_);
EXPECT_EQ(length, dns_length + 2);
// Check that length of the DNS message received is that expected.
EXPECT_EQ(msgbuf_->getLength(), dns_length);
// Compare buffers, zeroing the QID in the received buffer to match
// Expect that the receive completed without a problem.
EXPECT_EQ(0, ec.value());
// If we haven't received all the data, issue another read.
cumulative_ += length;
bool complete = false;
if (cumulative_ > 2) {
uint16_t dns_length = readUint16(server_buff_);
complete = ((dns_length + 2) == cumulative_);
}
if (!complete) {
socket->async_receive(asio::buffer((server_buff_ + cumulative_),
(sizeof(server_buff_) - cumulative_)),
boost::bind(&IOFetchTest::tcpReceiveHandler, this, socket, _1, _2));
return;
}
// Check that length of the DNS message received is that expected, then
// compare buffers, zeroing the QID in the received buffer to match
// that set in our expected question. Note that due to the length
// field the QID in the received buffer is in the thrid and fourth
// field the QID in the received buffer is in the third and fourth
// bytes.
EXPECT_EQ(msgbuf_->getLength() + 2, cumulative_);
server_buff_[2] = server_buff_[3] = 0;
EXPECT_TRUE(memcmp(msgbuf_->getData(), server_buff_ + 2, dns_length) == 0);
EXPECT_TRUE(equal((server_buff_ + 2), (server_buff_ + cumulative_ - 2),
static_cast<const uint8_t*>(msgbuf_->getData())));
// ... and return a message back. This has to be preceded by a two-byte
// count field. It's simpler to do this as two writes - it shouldn't
// make any difference to the IOFetch object.
//
// When specifying the callback handler, the expected size of the
// data written is passed as the first parameter.
uint8_t count[2];
writeUint16(sizeof(TEST_DATA), count);
socket->async_send(asio::buffer(count, 2),
boost::bind(&IOFetchTest::tcpSendHandler, this,
sizeof(count), _1, _2));
2, _1, _2));
socket->async_send(asio::buffer(TEST_DATA, sizeof(TEST_DATA)),
boost::bind(&IOFetchTest::tcpSendHandler, this,
sizeof(count), _1, _2));
sizeof(TEST_DATA), _1, _2));
}
/// \brief Completion Handler for Sending TCP data
......@@ -221,7 +241,6 @@ public:
void tcpSendHandler(size_t expected = 0, error_code ec = error_code(),
size_t length = 0)
{
std::cerr << "TCP Send Handler\n";
EXPECT_EQ(0, ec.value()); // Expect no error
EXPECT_EQ(expected, length); // And that amount sent is as expected
}
......@@ -234,7 +253,7 @@ public:
///
/// \param result Result indicated by the callback
void operator()(IOFetch::Result result) {
std::cerr << "Fetch completion\n";
EXPECT_EQ(expected_, result); // Check correct result returned
EXPECT_FALSE(run_); // Check it is run only once
run_ = true; // Note success
......@@ -360,6 +379,7 @@ TEST_F(IOFetchTest, UdpSendReceive) {
protocol_ = IOFetch::UDP;
expected_ = IOFetch::SUCCESS;
// Set up the server.
udp::socket socket(service_.get_io_service(), udp::v4());
socket.set_option(socket_base::reuse_address(true));
socket.bind(udp::endpoint(TEST_HOST, TEST_PORT));
......@@ -395,21 +415,17 @@ TEST_F(IOFetchTest, TcpSendReceive) {
protocol_ = IOFetch::TCP;
expected_ = IOFetch::SUCCESS;
std::cerr << "Creating socket\n";
// Socket into which the connection will be accepted
tcp::socket socket(service_.get_io_service());
std::cerr << "Creating acceptor\n";
// Acceptor object - called when the connection is made, the handler will
// initiate a read on the socket.
tcp::acceptor acceptor(service_.get_io_service(),
tcp::endpoint(tcp::v4(), TEST_PORT));
std::cerr << "Issuing async accept call\n";
acceptor.async_accept(socket,
boost::bind(&IOFetchTest::tcpAcceptHandler, this, &socket, _1));
// Post the TCP fetch object to send the query and receive the response.
std::cerr << "Posting TCP fetch\n";
service_.get_io_service().post(tcp_fetch_);
// ... and execute all the callbacks. This exits when the fetch completes.
......
......@@ -318,14 +318,14 @@ TEST(TCPSocket, SequenceTest) {
client_cb.setCode(43); // Some error
EXPECT_FALSE(client.isOpenSynchronous());
client.open(&server_endpoint, client_cb);
// Run the open and the accept callback and check that they ran.
service.run_one();
service.run_one();
EXPECT_EQ(TCPCallback::ACCEPT, server_cb.called());
EXPECT_EQ(0, server_cb.getCode());
EXPECT_EQ(TCPCallback::OPEN, client_cb.called());
EXPECT_EQ(0, client_cb.getCode());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment