Commit 6c1f3af0 authored by Stephen Morris's avatar Stephen Morris

[trac499] Correct problem with receiving large responses

The TCP handling reads into an intermediate staging buffer.  The
problem was that this buffer wasn't being emptied between multiple
reads, only after the last one.  If the total amount of data
received was more than the staging buffer size, the code failed
attempting to write beyond the buffer end.

This fix empties the staging buffer after (more or less) every
read from the network.
parent 1f682b81
......@@ -203,43 +203,74 @@ public:
///
/// \param data Buffer to receive incoming message
/// \param length Length of the data buffer
/// \param offset Offset into buffer where data is to be put
/// \param offset Offset into buffer where data is to be put. Although the
/// offset could be implied by adjusting "data" and "length"
/// appropriately, using this argument allows data to be specified as
/// "const void*" - the overhead of converting it to a pointer to a
/// set of bytes is hidden away here.
/// \param endpoint Source of the communication
/// \param callback Callback object
virtual void asyncReceive(void* data, size_t length, size_t offset,
IOEndpoint* endpoint, C& callback) = 0;
/// \brief Checks if the data received is complete.
///
/// This applies to TCP receives, where the data is a byte stream and a
/// receive is not guaranteed to receive the entire message. DNS messages
/// over TCP are prefixed by a two-byte count field. This method takes the
/// amount received so far and checks if the message is complete.
///
/// For a UDP receive, all the data is received in one I/O, so this is
/// effectively a no-op).
///
/// \param data Data buffer containing data to date
/// \param length Total amount of data in the buffer.
/// \brief Processes received data
///
/// In the IOFetch code, data is received into a staging buffer before being
/// copied into the target buffer. (This is because (a) we don't know how
/// much data we will be receiving, so don't know how to size the output
/// buffer and (b) TCP data is preceded by a two-byte count field that needs
/// to be discarded before being returned to the user.)
///
/// An additional consideration is that TCP data is not received in one
/// I/O - it may take a number of I/Os - each receiving any non-zero number
/// of bytes - to read the entire message.
///
/// So the IOFetch code has to loop until it determines that all the data
/// has been read. This is where this method comes in. It has several
/// functions:
///
/// - It checks if the received data is complete.
/// - If data is not complete, decides if the next set of data is to go into
/// the start of the staging buffer or at some offset into it. (This
/// simplifies the case we could have in a TCP receive where the two-byte
/// count field is received in one-byte chunks: we put off interpreting
/// the count until we have all of it. The alternative - copying the
/// data to the output buffer and interpreting the count from there -
/// would require moving the data in the output buffer by two bytes before
/// returning it to the caller.)
/// - Copies data from the staging buffer into the output buffer.
///
/// This functionality mainly applies to TCP receives. For UDP, all the
/// data is received in one I/O, so this just copies the data into the
/// output buffer.
///
/// \param staging Pointer to the start of the staging buffer.
/// \param length Amount of data in the staging buffer.
/// \param cumulative Amount of data received before the staging buffer is
/// processed (this includes the TCP count field if appropriate).
/// The value should be set to zero before the receive loop is
/// entered, and it will be updated by this method as required.
/// \param offset Offset into the staging buffer where the next read should
/// put the received data. It should be set to zero before the first
/// call and may be updated by this method.
/// \param expected Expected amount of data to be received. This is
/// really the TCP count field and is set to that value when enough
/// of a TCP message is received. It should be initialized to -1
/// before the first read is executed.
/// \param outbuff Output buffer. Data in the staging buffer may be copied
/// to this output buffer in the call.
///
/// \return true if the receive is complete, false if another receive is
/// needed.
virtual bool receiveComplete(const void* data, size_t length) = 0;
/// \brief Append Normalized Data
///
/// When a UDP buffer is received, the entire buffer contains the data.
/// When a TCP buffer is received, the first two bytes of the buffer hold
/// a length count. This method removes those bytes from the buffer.
///
/// \param inbuf Input buffer. This contains the data received over the
/// network connection.
/// \param length Amount of data in the input buffer. If TCP, this includes
/// the two-byte count field.
/// \param outbuf Pointer to output buffer to which the data will be
/// appended.
virtual void appendNormalizedData(const void* inbuf, size_t length,
isc::dns::OutputBufferPtr outbuf) = 0;
/// needed. This is always true for UDP, but for TCP involves
/// checking the amount of data received so far against the amount
/// expected (as indicated by the two-byte count field). If this
/// method returns false, another read should be queued and data
/// should be read into the staging buffer at offset given by the
/// "offset" parameter.
virtual bool processReceivedData(const void* staging, size_t length,
size_t& cumulative, size_t& offset,
size_t& expected,
isc::dns::OutputBufferPtr& outbuff) = 0;
/// \brief Cancel I/O On AsioSocket
virtual void cancel() = 0;
......@@ -330,24 +361,23 @@ public:
/// \brief Checks if the data received is complete.
///
/// \param data Unused
/// \param staging Unused
/// \param length Unused
/// \param cumulative Unused
/// \param offset Unused.
/// \param expected Unused.
/// \param outbuff Unused.
///
/// \return Always true
virtual bool receiveComplete(void*, size_t, size_t&) {
return (true);
}
/// \brief Append Normalized Data
///
/// \param inbuf Unused.
/// \param length Unused.
/// \param outbuf unused.
virtual void appendNormalizedData(const void*, size_t,
isc::dns::OutputBufferPtr)
virtual bool receiveComplete(const void* staging, size_t length,
size_t& cumulative, size_t& offset,
size_t& expected,
isc::dns::OutputBufferPtr& outbuff)
{
return (true);
}
/// \brief Cancel I/O On AsioSocket
///
/// Must be supplied as it is abstract in the base class.
......
......@@ -78,6 +78,8 @@ struct IOFetchData {
asio::deadline_timer timer; ///< Timer to measure timeouts
IOFetch::Protocol protocol; ///< Protocol being used
size_t cumulative; ///< Cumulative received amount
size_t expected; ///< Expected amount of data
size_t offset; ///< Offset to receive data
bool stopped; ///< Have we stopped running?
int timeout; ///< Timeout in ms
......@@ -129,6 +131,8 @@ struct IOFetchData {
timer(service.get_io_service()),
protocol(proto),
cumulative(0),
expected(0),
offset(0),
stopped(false),
timeout(wait),
origin(ASIO_UNKORIGIN)
......@@ -182,9 +186,6 @@ IOFetch::operator()(asio::error_code ec, size_t length) {
msg.addQuestion(data_->question);
MessageRenderer renderer(*data_->msgbuf);
msg.toWire(renderer);
// As this is a new fetch, clear the amount of data received
data_->cumulative = 0;
}
// If we timeout, we stop, which will can cancel outstanding I/Os and
......@@ -218,22 +219,29 @@ IOFetch::operator()(asio::error_code ec, size_t length) {
// we need to yield ... and we *really* don't want to set up another
// coroutine within that method.) So after each receive (and yield),
// we check if the operation is complete and if not, loop to read again.
//
// Another concession to TCP is that the amount of is contained in the
// first two bytes. This leads to two problems:
//
// a) We don't want those bytes in the return buffer.
// b) They may not both arrive in the first I/O.
//
// So... we need to loop until we have at least two bytes, then store
// the expected amount of data. Then we need to loop until we have
// received all the data before copying it back to the user's buffer.
// And we want to minimise the amount of copying...
data_->origin = ASIO_RECVSOCK;
data_->cumulative = 0; // No data yet received
data_->offset = 0; // First data into start of buffer
do {
CORO_YIELD data_->socket->asyncReceive(data_->staging.get(),
static_cast<size_t>(MIN_LENGTH), data_->cumulative,
data_->remote.get(), *this);
data_->cumulative += length;
} while (!data_->socket->receiveComplete(data_->staging.get(),
data_->cumulative));
/// Copy the answer into the response buffer. (TODO: If the
/// OutputBuffer object were made to meet the requirements of a
/// MutableBufferSequence, then it could be written to directly by
/// async_receive_from() and this additional copy step would be
/// unnecessary.)
data_->socket->appendNormalizedData(data_->staging.get(),
data_->cumulative, data_->received);
static_cast<size_t>(MIN_LENGTH),
data_->offset,
data_->remote.get(), *this);
} while (!data_->socket->processReceivedData(data_->staging.get(), length,
data_->cumulative, data_->offset,
data_->expected, data_->received));
// Finished with this socket, so close it. This will not generate an
// I/O error, but reset the origin to unknown in case we change this.
......
......@@ -24,6 +24,8 @@
#include <sys/socket.h>
#include <unistd.h> // for some IPC/network system calls
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <boost/bind.hpp>
......@@ -135,32 +137,25 @@ public:
virtual void asyncReceive(void* data, size_t length, size_t offset,
IOEndpoint* endpoint, C& callback);
/// \brief Checks if the data received is complete.
/// \brief Process received data packet
///
/// Checks if all the data has been received by checking that the amount
/// of data received is equal to the number in the first two bytes of the
/// message plus two (for the count field itself).
/// See the description of IOAsioSocket::receiveComplete for a complete
/// description of this method.
///
/// \param data Data buffer containing data to date (ignored)
/// \param length Amount of data in the buffer.
/// \param staging Pointer to the start of the staging buffer.
/// \param length Amount of data in the staging buffer.
/// \param cumulative Amount of data received before the staging buffer is
/// processed.
/// \param offset Unused.
/// \param expected unused.
/// \param outbuff Output buffer. Data in the staging buffer is be copied
/// to this output buffer in the call.
///
/// \return true if the receive is complete, false if not.
virtual bool receiveComplete(const void* data, size_t length);
/// \brief Append Normalized Data
///
/// When a UDP buffer is received, the entire buffer contains the data.
/// When a TCP buffer is received, the first two bytes of the buffer hold
/// a length count. This method removes those bytes from the buffer.
///
/// \param inbuf Input buffer. This contains the data received over the
/// network connection.
/// \param length Amount of data in the input buffer. If TCP, this includes
/// the two-byte count field.
/// \param outbuf Pointer to output buffer to which the data will be
/// appended
virtual void appendNormalizedData(const void* inbuf, size_t length,
isc::dns::OutputBufferPtr outbuf);
/// \return Always true
virtual bool processReceivedData(const void* staging, size_t length,
size_t& cumulative, size_t& offset,
size_t& expected,
isc::dns::OutputBufferPtr& outbuff);
/// \brief Cancel I/O On Socket
virtual void cancel();
......@@ -335,32 +330,65 @@ TCPSocket<C>::asyncReceive(void* data, size_t length, size_t offset,
// Is the receive complete?
template <typename C> bool
TCPSocket<C>::receiveComplete(const void* data, size_t length) {
TCPSocket<C>::processReceivedData(const void* staging, size_t length,
size_t& cumulative, size_t& offset,
size_t& expected,
isc::dns::OutputBufferPtr& outbuff)
{
// Point to the data in the staging buffer and note how much there is.
const uint8_t* data = static_cast<const uint8_t*>(staging);
size_t data_length = length;
// Is the number is "expected" valid? It won't be unless we have received
// at least two bytes of data in total for this set of receives.
if (cumulative < 2) {
// "expected" is not valid. Did this read give us enough data to
// work it out?
cumulative += length;
if (cumulative < 2) {
// Nope, still not valid. This must have been the first packet and
// was only one byte long. Tell the fetch code to read the next
// packet into the staging buffer beyond the data that is already
// there so that the next time we are called we have a complete
// TCP count.
offset = cumulative;
return (false);
}
bool complete = false;
// Have enough data to interpret the packet count, so do so now.
expected = readUint16(data);
// If we have read at least two bytes, we can work out how much we should be
// reading.
if (length >= 2) {
// We have two bytes less of data to process. Point to the start of the
// data and adjust the packet size. Note that at this point,
// "cumulative" is the true amount of data in the staging buffer, not
// "length".
data += 2;
data_length = cumulative - 2;
} else {
// Convert first two bytes to a count and check that the following data
// is that length.
// TODO: Should we check to see if we have received too much data?
uint16_t expected = readUint16(data);
complete = ((expected + 2) == length);
// Update total amount of data received.
cumulative += length;
}
return (complete);
}
// Regardless of anything else, the next read goes into the start of the
// staging buffer.
offset = 0;
// Copy buffer less leading two bytes to the target buffer.
// Work out how much data we still have to put in the output buffer. (This
// could be zero if we have just interpreted the TCP count and that was
// set to zero.)
if (expected >= outbuff->getLength()) {
template <typename C> void
TCPSocket<C>::appendNormalizedData(const void* inbuf, size_t length,
isc::dns::OutputBufferPtr outbuf)
{
const uint8_t* bytebuff = static_cast<const uint8_t*>(inbuf);
outbuf->writeData(bytebuff + 2, length - 2);
// Still need data in the output packet. Copy what we can from the
// staging buffer to the output buffer.
size_t copy_amount = std::min(expected - outbuff->getLength(), data_length);
outbuff->writeData(data, copy_amount);
}
// We can now say if we have all the data.
return (expected == outbuff->getLength());
}
// Cancel I/O on the socket. No-op if the socket is not open.
......
......@@ -16,6 +16,9 @@
#include <cstdlib>
#include <string>
#include <iostream>
#include <iomanip>
#include <iterator>
#include <vector>
#include <gtest/gtest.h>
#include <boost/bind.hpp>
......@@ -48,7 +51,11 @@ const asio::ip::address TEST_HOST(asio::ip::address::from_string("127.0.0.1"));
const uint16_t TEST_PORT(5301);
// FIXME Shouldn't we send something that is real message?
const char TEST_DATA[] = "Test response from server to client (longer than 30 bytes)";
const int SEND_INTERVAL = 500; // Interval in ms between TCP sends
const int SEND_INTERVAL = 250; // Interval in ms between TCP sends
// The tests are complex, so debug output has been left in (although disabled).
// Set this to true to enable it.
const bool DEBUG = false;
/// \brief Test fixture for the asiolink::IOFetch.
class IOFetchTest : public virtual ::testing::Test, public virtual IOFetch::Callback
......@@ -70,10 +77,13 @@ public:
// response handler methods in this class) receives the question sent by the
// fetch object.
uint8_t receive_buffer_[512]; ///< Server receive buffer
uint8_t send_buffer_[512]; ///< Server send buffer
uint16_t send_size_; ///< Amount of data to sent
vector<uint8_t> send_buffer_; ///< Server send buffer
uint16_t send_cumulative_; ///< Data sent so far
// Other data.
string return_data_; ///< Data returned by server
bool debug_; ///< true to enable debug output
/// \brief Constructor
IOFetchTest() :
service_(),
......@@ -92,8 +102,9 @@ public:
timer_(service_.get_io_service()),
receive_buffer_(),
send_buffer_(),
send_size_(0),
send_cumulative_(0)
send_cumulative_(0),
return_data_(TEST_DATA),
debug_(DEBUG)
{
// Construct the data buffer for question we expect to receive.
Message msg(Message::RENDER);
......@@ -119,6 +130,10 @@ public:
/// \param length Amount of data received.
void udpReceiveHandler(udp::endpoint* remote, udp::socket* socket,
error_code ec = error_code(), size_t length = 0) {
if (debug_) {
cout << "udpReceiveHandler(): error = " << ec.value() <<
", length = " << length << endl;
}
// The QID in the incoming data is random so set it to 0 for the
// data comparison check. (It is set to 0 in the buffer containing
......@@ -132,7 +147,8 @@ public:
static_cast<const uint8_t*>(msgbuf_->getData())));
// Return a message back to the IOFetch object.
socket->send_to(asio::buffer(TEST_DATA, sizeof TEST_DATA), *remote);
socket->send_to(asio::buffer(return_data_.c_str(), return_data_.size()),
*remote);
}
/// \brief Completion Handler for accepting TCP data
......@@ -144,6 +160,10 @@ public:
/// \param ec Boost error code, value should be zero.
void tcpAcceptHandler(tcp::socket* socket, error_code ec = error_code())
{
if (debug_) {
cout << "tcpAcceptHandler(): error = " << ec.value() << endl;
}
// Expect that the accept completed without a problem.
EXPECT_EQ(0, ec.value());
......@@ -167,6 +187,10 @@ public:
void tcpReceiveHandler(tcp::socket* socket, error_code ec = error_code(),
size_t length = 0)
{
if (debug_) {
cout << "tcpReceiveHandler(): error = " << ec.value() <<
", length = " << length << endl;
}
// Expect that the receive completed without a problem.
EXPECT_EQ(0, ec.value());
......@@ -196,11 +220,12 @@ public:
static_cast<const uint8_t*>(msgbuf_->getData())));
// ... and return a message back. This has to be preceded by a two-byte
// count field. Construct the message.
assert(sizeof(send_buffer_) > (sizeof(TEST_DATA) + 2));
writeUint16(sizeof(TEST_DATA), send_buffer_);
copy(TEST_DATA, TEST_DATA + sizeof(TEST_DATA) - 1, send_buffer_ + 2);
send_size_ = sizeof(TEST_DATA) + 2;
// count field.
send_buffer_.clear();
send_buffer_.push_back(0);
send_buffer_.push_back(0);
writeUint16(return_data_.size(), &send_buffer_[0]);
copy(return_data_.begin(), return_data_.end(), back_inserter(send_buffer_));
// Send the data. This is done in multiple writes with a delay between
// each to check that the reassembly of TCP packets from fragments works.
......@@ -216,12 +241,16 @@ public:
///
/// \param socket Socket over which send should take place
void tcpSendData(tcp::socket* socket) {
if (debug_) {
cout << "tcpSendData()" << endl;
}
// Decide what to send based on the cumulative count
uint8_t* send_ptr = &send_buffer_[send_cumulative_];
// Pointer to data to send
size_t amount = 16; // Amount of data to send
if (send_cumulative_ > 30) {
amount = send_size_ - send_cumulative_;
amount = send_buffer_.size() - send_cumulative_;
}
// ... and send it. The amount sent is also passed as the first argument
......@@ -252,12 +281,17 @@ public:
void tcpSendHandler(size_t expected, tcp::socket* socket,
error_code ec = error_code(), size_t length = 0)
{
if (debug_) {
cout << "tcpSendHandler(): error = " << ec.value() <<
", length = " << length << endl;
}
EXPECT_EQ(0, ec.value()); // Expect no error
EXPECT_EQ(expected, length); // And that amount sent is as expected
// Do we need to send more?
send_cumulative_ += length;
if (send_cumulative_ < send_size_) {
if (send_cumulative_ < send_buffer_.size()) {
// Yes - set up a timer: the callback handler for the timer is
// tcpSendData, which will then send the next chunk. We pass the
......@@ -277,20 +311,22 @@ public:
///
/// \param result Result indicated by the callback
void operator()(IOFetch::Result result) {
if (debug_) {
cout << "operator()(): result = " << result << endl;
}
EXPECT_EQ(expected_, result); // Check correct result returned
EXPECT_FALSE(run_); // Check it is run only once
run_ = true; // Note success
// If the expected result for SUCCESS, then this should have been called
// when one of the "servers" in this class has sent back the TEST_DATA.
// when one of the "servers" in this class has sent back return_data_.
// Check the data is as expected/
if (expected_ == IOFetch::SUCCESS) {
EXPECT_EQ(sizeof(TEST_DATA), result_buff_->getLength());
EXPECT_EQ(return_data_.size(), result_buff_->getLength());
const uint8_t* start = static_cast<const uint8_t*>(result_buff_->getData());
EXPECT_TRUE(equal(TEST_DATA, (TEST_DATA + sizeof(TEST_DATA) - 1),
start));
EXPECT_TRUE(equal(return_data_.begin(), return_data_.end(), start));
}
// ... and cause the run loop to exit.
......@@ -360,6 +396,38 @@ public:
service_.run();
EXPECT_TRUE(run_);
}
/// \brief Send/Receive Test
///
/// Send a query to the server then receives a response.
///
/// \param Test data to return to client
void tcpSendReturnTest(const std::string& return_data) {
return_data_ = return_data;
protocol_ = IOFetch::TCP;
expected_ = IOFetch::SUCCESS;
// Socket into which the connection will be accepted
tcp::socket socket(service_.get_io_service());
// Acceptor object - called when the connection is made, the handler
// will initiate a read on the socket.
tcp::acceptor acceptor(service_.get_io_service(),
tcp::endpoint(tcp::v4(), TEST_PORT));
acceptor.async_accept(socket,
boost::bind(&IOFetchTest::tcpAcceptHandler, this, &socket, _1));
// Post the TCP fetch object to send the query and receive the response.
service_.get_io_service().post(tcp_fetch_);
// ... and execute all the callbacks. This exits when the fetch
// completes.
service_.run();
EXPECT_TRUE(run_); // Make sure the callback did execute
// Tidy up
socket.close();
}
};
// Check the protocol
......@@ -421,28 +489,27 @@ TEST_F(IOFetchTest, TcpTimeout) {
timeoutTest(IOFetch::TCP, tcp_fetch_);
}
TEST_F(IOFetchTest, TcpSendReceive) {
protocol_ = IOFetch::TCP;
expected_ = IOFetch::SUCCESS;
// Do a send and return with a small amount of data
// Socket into which the connection will be accepted
tcp::socket socket(service_.get_io_service());
TEST_F(IOFetchTest, TcpSendReceiveShort) {
tcpSendReturnTest(TEST_DATA);
}
// Acceptor object - called when the connection is made, the handler will
// initiate a read on the socket.
tcp::acceptor acceptor(service_.get_io_service(),
tcp::endpoint(tcp::v4(), TEST_PORT));
acceptor.async_accept(socket,
boost::bind(&IOFetchTest::tcpAcceptHandler, this, &socket, _1));
// Now with at least 16kB of data.
// Post the TCP fetch object to send the query and receive the response.
service_.get_io_service().post(tcp_fetch_);
TEST_F(IOFetchTest, TcpSendReceiveLong) {
const size_t REQUIRED_SIZE = 16 * 1024;
// ... and execute all the callbacks. This exits when the fetch completes.
service_.run();
EXPECT_TRUE(run_); // Make sure the callback did execute
// We could initialize the string with a repeat of a single character, but
// we choose to enure that there are different characters as an added test.
string data;
data.reserve(REQUIRED_SIZE);
while (data.size() < REQUIRED_SIZE) {
data += "An abritrary message that is returned to the IOFetch object!";
}
socket.close();
// ... and do the test with this data.
tcpSendReturnTest(data);
}
} // namespace asiolink
......@@ -35,6 +35,8 @@
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <dns/buffer.h>
#include <asio.hpp>
#include <asiolink/asiolink_utilities.h>
......@@ -45,6 +47,7 @@
using namespace asio;
using namespace asio::ip;
using namespace asiolink;
using namespace isc::dns;
using namespace std;
namespace {
......@@ -78,17 +81,20 @@ public:
struct PrivateData {
PrivateData() :
error_code_(), length_(0), cumulative_(0), name_(""),
queued_(NONE), called_(NONE)
error_code_(), length_(0), cumulative_(0), expected_(0), offset_(0),
name_(""), queued_(NONE), called_(NONE)
{}
asio::error_code error_code_; ///< Completion error code
size_t length_; ///< Bytes transfreed in this I/O
size_t length_; ///< Bytes transferred in this I/O
size_t cumulative_; ///< Cumulative bytes transferred
size_t expected_; ///< Expected amount of data
size_t offset_; ///< Where to put data in buffer
std::string name_; ///< Which of the objects this is
uint8_t data_[MIN_SIZE]; ///< Receive buffer
Operation queued_; ///< Queued operation
Operation called_; ///< Which callback called
uint8_t data_[MIN_SIZE]; ///< Receive buffer