From 682436e844799b2194eccc2ffdfb3fa3da527cea Mon Sep 17 00:00:00 2001 From: Stephen Morris Date: Mon, 28 Feb 2011 14:27:01 +0000 Subject: [PATCH] [trac499] Completed TCPSocket and unit tests --- src/lib/asiolink/Makefile.am | 1 + src/lib/asiolink/asiolink_utilities.h | 61 +++ src/lib/asiolink/io_asio_socket.h | 81 ++-- src/lib/asiolink/io_fetch.cc | 3 +- src/lib/asiolink/tcp_socket.h | 111 +++-- src/lib/asiolink/tests/Makefile.am | 1 + .../tests/asiolink_utilities_unittest.cc | 74 ++++ src/lib/asiolink/tests/tcp_socket_unittest.cc | 412 ++++++++++++------ src/lib/asiolink/tests/udp_socket_unittest.cc | 4 +- src/lib/asiolink/udp_socket.h | 63 ++- 10 files changed, 546 insertions(+), 265 deletions(-) create mode 100644 src/lib/asiolink/asiolink_utilities.h create mode 100644 src/lib/asiolink/tests/asiolink_utilities_unittest.cc diff --git a/src/lib/asiolink/Makefile.am b/src/lib/asiolink/Makefile.am index 71d31f9a6..b6133bbff 100644 --- a/src/lib/asiolink/Makefile.am +++ b/src/lib/asiolink/Makefile.am @@ -13,6 +13,7 @@ CLEANFILES = *.gcno *.gcda # which would make the build fail with -Werror (our default setting). lib_LTLIBRARIES = libasiolink.la libasiolink_la_SOURCES = asiolink.h +libasiolink_la_SOURCES += asiolink_utilities.h libasiolink_la_SOURCES += asiodef.cc asiodef.h libasiolink_la_SOURCES += dns_answer.h libasiolink_la_SOURCES += dns_lookup.h diff --git a/src/lib/asiolink/asiolink_utilities.h b/src/lib/asiolink/asiolink_utilities.h new file mode 100644 index 000000000..f7f82be69 --- /dev/null +++ b/src/lib/asiolink/asiolink_utilities.h @@ -0,0 +1,61 @@ +// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC") +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE +// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. + +#ifndef __ASIOLINK_UTILITIES_H +#define __ASIOLINK_UTILITIES_H + +#include + +namespace asiolink { + +/// \brief Read Unsigned 16-Bit Integer from Buffer +/// +/// This is essentially a copy of the isc::dns::InputBuffer::readUint16. It +/// should really be moved into a separate library. +/// +/// \param buffer Data buffer at least two bytes long of which the first two +/// bytes are assumed to represent a 16-bit integer in network-byte +/// order. +/// +/// \return Value of 16-bit integer +inline uint16_t +readUint16(const void* buffer) { + const uint8_t* byte_buffer = static_cast(buffer); + + uint16_t result = (static_cast(byte_buffer[0])) << 8; + result |= static_cast(byte_buffer[1]); + + return (result); +} + +/// \brief Write Unisgned 16-Bit Integer to Buffer +/// +/// This is essentially a copy of isc::dns::OutputBuffer::writeUint16. It +/// should really be moved into a separate library. +/// +/// \param value 16-bit value to convert +/// \param buffer Data buffer at least two bytes long into which the 16-bit +/// value is written in network-byte order. + +inline void +writeUint16(uint16_t value, void* buffer) { + uint8_t* byte_buffer = static_cast(buffer); + + byte_buffer[0] = static_cast((value & 0xff00U) >> 8); + byte_buffer[1] = static_cast(value & 0x00ffU); +} + +} // namespace asiolink + +#endif // __ASIOLINK_UTILITIES_H diff --git a/src/lib/asiolink/io_asio_socket.h b/src/lib/asiolink/io_asio_socket.h index 2e165f401..fbf749ef3 100644 --- a/src/lib/asiolink/io_asio_socket.h +++ b/src/lib/asiolink/io_asio_socket.h @@ -41,7 +41,7 @@ public: IOError(file, line, what) {} }; -/// \brief Socket not open +/// \brief Error setting socket options /// /// Thrown if attempt to change socket options fails. class SocketSetError : public IOError { @@ -50,7 +50,7 @@ public: IOError(file, line, what) {} }; -/// \brief Buffer Overflow +/// \brief Buffer overflow /// /// Thrown if an attempt is made to receive into an area beyond the end of /// the receive data buffer. @@ -108,24 +108,23 @@ public: /// \brief Return the "native" representation of the socket. /// - /// In practice, this is the file descriptor of the socket for - /// UNIX-like systems so the current implementation simply uses - /// \c int as the type of the return value. - /// We may have to need revisit this decision later. + /// In practice, this is the file descriptor of the socket for UNIX-like + /// systems so the current implementation simply uses \c int as the type of + /// the return value. We may have to need revisit this decision later. /// - /// In general, the application should avoid using this method; - /// it essentially discloses an implementation specific "handle" that - /// can change the internal state of the socket (consider the - /// application closes it, for example). - /// But we sometimes need to perform very low-level operations that - /// requires the native representation. Passing the file descriptor - /// to a different process is one example. - /// This method is provided as a necessary evil for such limited purposes. + /// In general, the application should avoid using this method; it + /// essentially discloses an implementation specific "handle" that can + /// change the internal state of the socket (consider what would happen if + /// the application closes it, for example). But we sometimes need to + /// perform very low-level operations that requires the native + /// representation. Passing the file descriptor to a different process is + /// one example. This method is provided as a necessary evil for such + //// limited purposes. /// /// This method never throws an exception. /// /// \return The native representation of the socket. This is the socket - /// file descriptor for UNIX-like systems. + /// file descriptor for UNIX-like systems. virtual int getNative() const = 0; /// \brief Return the transport protocol of the socket. @@ -135,16 +134,15 @@ public: /// /// This method never throws an exception. /// - /// \return IPPROTO_UDP for UDP sockets - /// \return IPPROTO_TCP for TCP sockets + /// \return \c IPPROTO_UDP for UDP sockets, \c IPPROTO_TCP for TCP sockets virtual int getProtocol() const = 0; /// \brief Is Open() synchronous? /// - /// On a UDP socket, an "open" operation is merely a call to "open()" on - /// the underlying socket (so completes immediately), but on a TCP socket it - /// also includings connecting to the remote end (which is done as an - /// asynchronous operation). + /// On a TCP socket, an "open" operation is a call to the socket's "open()" + /// method followed by a connection to the remote system: it is an + /// asynchronous operation. On a UDP socket, it is just a call to "open()" + /// and completes synchronously. /// /// For TCP, signalling of the completion of the operation is done by /// by calling the callback function in the normal way. This could be done @@ -154,31 +152,31 @@ public: /// asynchronously. /// /// Owing to the way that the stackless coroutines are implemented, we need - /// to know _before_ executing the operation whether or not the open is - /// asynchronous. So this method simply provides that information. + /// to know _before_ executing the "open" function whether or not it is + /// asynchronous. So this method is called to provide that information. /// /// (The reason there is a need to know is because the call to open() passes /// in the state of the coroutine at the time the call is made. On an /// asynchronous I/O, we need to set the state to point to the statement - /// after the call to open() before we pass the corotuine to the open() - /// call. Unfortunately, the macros that do this also yield control - which - /// we don't want to do if the open is synchronous. Hence we need to know - /// before we make the call to open() whether that call will complete - /// asynchronously.) + /// after the call to open() _before_ we pass the corouine to the open() + /// call. Unfortunately, the macros that set the state of the coroutine + /// also yield control - which we don't want to do if the open is + /// synchronous. Hence we need to know before we make the call to open() + /// whether that call will complete asynchronously.) virtual bool isOpenSynchronous() const = 0; /// \brief Open AsioSocket /// /// Opens the socket for asynchronous I/O. The open will complete /// synchronously on UCP or asynchronously on TCP (in which case a callback - /// will be queued): what will happen can be found by calling the method - /// isOpenSynchronous(). + /// will be queued). /// /// \param endpoint Pointer to the endpoint object. This is ignored for - /// a UDP socket (the target is specified in the send call), but should - /// be of type TCPEndpoint for a TCP connection. + /// a UDP socket (the target is specified in the send call), but + /// should be of type TCPEndpoint for a TCP connection. /// \param callback I/O Completion callback, called when the operation has - /// completed, but only if the operation was asynchronous. + /// completed, but only if the operation was asynchronous. (It is + /// ignored on a UDP socket.) virtual void open(const IOEndpoint* endpoint, C& callback) = 0; /// \brief Send Asynchronously @@ -196,7 +194,7 @@ public: /// \brief Receive Asynchronously /// - /// This correstponds to async_receive_from() for UDP sockets and + /// This corresponds to async_receive_from() for UDP sockets and /// async_receive() for TCP. In both cases, an endpoint argument is /// supplied to receive the source of the communication. For TCP it will /// be filled in with details of the connection. @@ -214,22 +212,17 @@ public: /// This applies to TCP receives, where the data is a byte stream and a /// receive is not guaranteed to receive the entire message. DNS messages /// over TCP are prefixed by a two-byte count field. This method takes the - /// amount received so far and the amount received in this I/O and checks - /// if the message is complete, returning the appropriate indication. As - /// a side-effect, it also updates the amount received. + /// amount received so far and checks if the message is complete. /// /// For a UDP receive, all the data is received in one I/O, so this is - /// effectively a no-op (although it does update the amount received). + /// effectively a no-op). /// /// \param data Data buffer containing data to date - /// \param length Amount of data received in last asynchronous I/O - /// \param cumulative On input, amount of data received before the last - /// I/O. On output, the total amount of data received to date. + /// \param length Total amount of data in the buffer. /// /// \return true if the receive is complete, false if another receive is - /// needed. - virtual bool receiveComplete(void* data, size_t length, - size_t& cumulative) = 0; + /// needed. + virtual bool receiveComplete(const void* data, size_t length) = 0; /// \brief Cancel I/O On AsioSocket virtual void cancel() = 0; diff --git a/src/lib/asiolink/io_fetch.cc b/src/lib/asiolink/io_fetch.cc index d890a52f4..212dd2646 100644 --- a/src/lib/asiolink/io_fetch.cc +++ b/src/lib/asiolink/io_fetch.cc @@ -222,8 +222,9 @@ IOFetch::operator()(asio::error_code ec, size_t length) { CORO_YIELD data_->socket->asyncReceive(data_->data.get(), static_cast(MIN_LENGTH), data_->cumulative, data_->remote.get(), *this); + data_->cumulative += length; std::cerr << "IOFetch: resuming after asynchronous receive\n"; - } while (!data_->socket->receiveComplete(data_->data.get(), length, + } while (!data_->socket->receiveComplete(data_->data.get(), data_->cumulative)); // The message is not rendered yet, so we can't print it easily diff --git a/src/lib/asiolink/tcp_socket.h b/src/lib/asiolink/tcp_socket.h index a7cc8e97d..d049f497b 100644 --- a/src/lib/asiolink/tcp_socket.h +++ b/src/lib/asiolink/tcp_socket.h @@ -24,7 +24,6 @@ #include #include // for some IPC/network system calls -#include #include #include @@ -34,6 +33,7 @@ #include +#include #include #include #include @@ -65,15 +65,15 @@ public: /// \brief Constructor from an ASIO TCP socket. /// - /// \param socket The ASIO representation of the TCP socket. It - /// is assumed that the caller will open and close the socket, so - /// these operations are a no-op for that socket. + /// \param socket The ASIO representation of the TCP socket. It is assumed + /// that the caller will open and close the socket, so these + /// operations are a no-op for that socket. TCPSocket(asio::ip::tcp::socket& socket); /// \brief Constructor /// /// Used when the TCPSocket is being asked to manage its own internal - /// socket. It is assumed that open() and close() will not be used. + /// socket. In this case, the open() and close() methods are used. /// /// \param service I/O Service object used to manage the socket. TCPSocket(IOService& service); @@ -100,10 +100,10 @@ public: /// \brief Open Socket /// - /// Opens the UDP socket. This is an asynchronous operation, completion of + /// Opens the TCP socket. This is an asynchronous operation, completion of /// which will be signalled via a call to the callback function. /// - /// \param endpoint Endpoint to which the socket will connect to. + /// \param endpoint Endpoint to which the socket will connect. /// \param callback Callback object. virtual void open(const IOEndpoint* endpoint, C& callback); @@ -115,7 +115,8 @@ public: /// /// \param data Data to send /// \param length Length of data to send - /// \param endpoint Target of the send + /// \param endpoint Target of the send. (Unused for a TCP socket because + /// that was determined when the connection was opened.) /// \param callback Callback object. virtual void asyncSend(const void* data, size_t length, const IOEndpoint* endpoint, C& callback); @@ -136,21 +137,15 @@ public: /// \brief Checks if the data received is complete. /// - /// As all the data is received in one I/O, so this is, this is effectively - /// a no-op (although it does update the amount of data received). + /// Checks if all the data has been received by checking that the amount + /// of data received is equal to the number in the first two bytes of the + /// message plus two (for the count field itself). /// - /// \param data Data buffer containing data to date. (This is ignored - /// for TCP receives.) - /// \param length Amount of data received in last asynchronous I/O - /// \param cumulative On input, amount of data received before the last - /// I/O. On output, the total amount of data received to date. + /// \param data Data buffer containing data to date (ignored) + /// \param length Amount of data in the buffer. /// - /// \return true if the receive is complete, false if another receive is - /// needed. - virtual bool receiveComplete(void*, size_t length, size_t& cumulative) { - cumulative = length; - return (true); - } + /// \return true if the receive is complete, false if not. + virtual bool receiveComplete(const void* data, size_t length); /// \brief Cancel I/O On Socket virtual void cancel(); @@ -176,6 +171,10 @@ private: // achieved by altering isc::dns::buffer to have pairs of methods: // getLength()/getTCPLength(), getData()/getTCPData(), with the getTCPXxx() // methods taking into account a two-byte count field.) + // + // The option of sending the data in two operations, the count followed by + // the data was discounted as that would lead to two callbacks which would + // cause problems with the stackless coroutine code. isc::dns::OutputBufferPtr send_buffer_; ///< Send buffer }; @@ -212,8 +211,6 @@ TCPSocket::open(const IOEndpoint* endpoint, C& callback) { // Ignore opens on already-open socket. Don't throw a failure because // of uncertainties as to what precedes whan when using asynchronous I/O. // At also allows us a treat a passed-in socket as a self-managed socket. - - std::cerr << "TCPSocket::open(): open_ flags is " << isopen_ << "\n"; if (!isopen_) { if (endpoint->getFamily() == AF_INET) { socket_.open(asio::ip::tcp::v4()); @@ -266,8 +263,6 @@ TCPSocket::asyncSend(const void* data, size_t length, const IOEndpoint*, send_buffer_->writeData(data, length); // ... and send it - std::cerr << "TCPSocket::asyncSend(): sending " << count << " data bytes\n"; - socket_.async_send(asio::buffer(send_buffer_->getData(), send_buffer_->getLength()), callback); } catch (boost::numeric::bad_numeric_cast& e) { @@ -281,34 +276,70 @@ TCPSocket::asyncSend(const void* data, size_t length, const IOEndpoint*, } } -// Receive a message. Note that the "cumulative" argument is ignored - every TCP -// receive is put into the buffer beginning at the start - there is no concept -// receiving a subsequent part of a message. Same critera as before concerning -// the need for the socket to be open. - +// Receive a message. Note that the "offset" argument is used as an index +// into the buffer in order to decide where to put the data. It is up to the +// caller to initialize the data to zero template void -TCPSocket::asyncReceive(void* data, size_t length, size_t, +TCPSocket::asyncReceive(void* data, size_t length, size_t offset, IOEndpoint* endpoint, C& callback) { if (isopen_) { - - // Upconvert the endpoint again. + // Upconvert to a TCPEndpoint. We need to do this because although + // IOEndpoint is the base class of UDPEndpoint and TCPEndpoint, it + // does not contain a method for getting at the underlying endpoint + // type - that is in the derived class and the two classes differ on + // return type. assert(endpoint->getProtocol() == IPPROTO_TCP); - const TCPEndpoint* tcp_endpoint = - static_cast(endpoint); - std::cerr << "TCPSocket::asyncReceive(): receiving from " << - tcp_endpoint->getAddress().toText() << - ", port " << tcp_endpoint->getPort() << "\n"; - - // TODO: Complete TCPSocket::asyncReceive() + TCPEndpoint* tcp_endpoint = static_cast(endpoint); + + // Write the endpoint details from the comminications link. Ideally + // we should make IOEndpoint assignable, but this runs in to all sorts + // of problems concerning the management of the underlying Boost + // endpoint (e.g. if it is not self-managed, is the copied one + // self-managed?) The most pragmatic solution is to let Boost take care + // of everything and copy details of the underlying endpoint. + tcp_endpoint->getASIOEndpoint() = socket_.remote_endpoint(); + + // Ensure we can write into the buffer and if so, set the pointer to + // where the data will be written. + if (offset >= length) { + isc_throw(BufferOverflow, "attempt to read into area beyond end of " + "TCP receive buffer"); + } + void* buffer_start = static_cast(static_cast(data) + offset); + // ... and kick off the read. + socket_.async_receive(asio::buffer(buffer_start, length - offset), callback); + } else { isc_throw(SocketNotOpen, "attempt to receive from a TCP socket that is not open"); } } +// Is the receive complete? + +template bool +TCPSocket::receiveComplete(const void* data, size_t length) { + + bool complete = false; + + // If we have read at least two bytes, we can work out how much we should be + // reading. + if (length >= 2) { + + // Convert first two bytes to a count and check that the following data + // is that length. + // TODO: Should we check to see if we have received too much data? + uint16_t expected = readUint16(data); + complete = ((expected + 2) == length); + } + + return (complete); +} + // Cancel I/O on the socket. No-op if the socket is not open. + template void TCPSocket::cancel() { if (isopen_) { diff --git a/src/lib/asiolink/tests/Makefile.am b/src/lib/asiolink/tests/Makefile.am index ded145c17..13f63e676 100644 --- a/src/lib/asiolink/tests/Makefile.am +++ b/src/lib/asiolink/tests/Makefile.am @@ -18,6 +18,7 @@ TESTS += run_unittests run_unittests_SOURCES = run_unittests.cc run_unittests_SOURCES += $(top_srcdir)/src/lib/dns/tests/unittest_util.h run_unittests_SOURCES += $(top_srcdir)/src/lib/dns/tests/unittest_util.cc +run_unittests_SOURCES += asiolink_utilities_unittest.cc run_unittests_SOURCES += io_address_unittest.cc run_unittests_SOURCES += io_endpoint_unittest.cc run_unittests_SOURCES += io_fetch_unittest.cc diff --git a/src/lib/asiolink/tests/asiolink_utilities_unittest.cc b/src/lib/asiolink/tests/asiolink_utilities_unittest.cc new file mode 100644 index 000000000..51f565f87 --- /dev/null +++ b/src/lib/asiolink/tests/asiolink_utilities_unittest.cc @@ -0,0 +1,74 @@ +// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC") +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE +// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. + +/// \brief Test of asiolink utilties +/// +/// Tests the fuctionality of the asiolink utilities code by comparing them +/// with the equivalent methods in isc::dns::[Input/Output]Buffer. + +#include + +#include + +#include +#include + +using namespace asiolink; +using namespace isc::dns; + +TEST(asioutil, readUint16) { + + // Reference buffer + uint8_t data[2]; + isc::dns::InputBuffer buffer(data, sizeof(data)); + + // Avoid possible compiler warnings by only setting uint8_t variables to + // uint8_t values. + uint8_t i8 = 0; + uint8_t j8 = 0; + for (int i = 0; i < (2 << 8); ++i, ++i8) { + for (int j = 0; j < (2 << 8); ++j, ++j8) { + data[0] = i8; + data[1] = j8; + buffer.setPosition(0); + EXPECT_EQ(buffer.readUint16(), readUint16(data)); + } + } +} + + +TEST(asioutil, writeUint16) { + + // Reference buffer + isc::dns::OutputBuffer buffer(2); + uint8_t test[2]; + + // Avoid possible compiler warnings by only setting uint16_t variables to + // uint16_t values. + uint16_t i16 = 0; + for (uint32_t i = 0; i < (2 << 16); ++i, ++i16) { + + // Write the reference data + buffer.clear(); + buffer.writeUint16(i16); + + // ... and the test data + writeUint16(i16, test); + + // ... and compare + const uint8_t* ref = static_cast(buffer.getData()); + EXPECT_EQ(ref[0], test[0]); + EXPECT_EQ(ref[1], test[1]); + } +} diff --git a/src/lib/asiolink/tests/tcp_socket_unittest.cc b/src/lib/asiolink/tests/tcp_socket_unittest.cc index d37f23677..8ecadaf5e 100644 --- a/src/lib/asiolink/tests/tcp_socket_unittest.cc +++ b/src/lib/asiolink/tests/tcp_socket_unittest.cc @@ -37,6 +37,7 @@ #include +#include #include #include #include @@ -56,11 +57,9 @@ const char OUTBOUND_DATA[] = "Data sent from client to server"; const char INBOUND_DATA[] = "Returned data from server to client"; } -/// /// An instance of this object is passed to the asynchronous I/O functions -/// and the operator() method is called when when an asynchronous I/O -/// completes. The arguments to the completion callback are stored for later -/// retrieval. +/// and the operator() method is called when when an asynchronous I/O completes. +/// The arguments to the completion callback are stored for later retrieval. class TCPCallback { public: /// \brief Operations the server is doing @@ -74,17 +73,20 @@ public: /// \brief Minimim size of buffers enum { - MIN_SIZE = 4096 + MIN_SIZE = (64 * 1024 + 2) ///< 64kB + two bytes for a count }; struct PrivateData { PrivateData() : - error_code_(), length_(0), name_(""), queued_(NONE), called_(NONE) + error_code_(), length_(0), cumulative_(0), name_(""), + queued_(NONE), called_(NONE) {} asio::error_code error_code_; ///< Completion error code - size_t length_; ///< Number of bytes transferred + size_t length_; ///< Bytes transfreed in this I/O + size_t cumulative_; ///< Cumulative bytes transferred std::string name_; ///< Which of the objects this is + uint8_t data_[MIN_SIZE]; ///< Receive buffer Operation queued_; ///< Queued operation Operation called_; ///< Which callback called }; @@ -103,7 +105,7 @@ public: /// \param which Which of the two callback objects this is TCPCallback(std::string which) : ptr_(new PrivateData()) { - setName(which); + ptr_->name_ = which; } /// \brief Destructor @@ -114,7 +116,7 @@ public: /// \brief Client Callback Function /// - /// Called when an asynchronous connect is completed by the client, this + /// Called when an asynchronous operation is completed by the client, this /// stores the origin of the operation in the client_called_ data member. /// /// \param ec I/O completion error code passed to callback function. @@ -123,8 +125,8 @@ public: size_t length = 0) { setCode(ec.value()); - setCalled(getQueued()); - setLength(length); + ptr_->called_ = ptr_->queued_; + ptr_->length_ = length; } /// \brief Get I/O completion error code @@ -140,57 +142,138 @@ public: } /// \brief Get number of bytes transferred in I/O - size_t getLength() { + size_t& length() { return (ptr_->length_); } - /// \brief Set number of bytes transferred in I/O + /// \brief Get cumulative number of bytes transferred in I/O + size_t& cumulative() { + return (ptr_->cumulative_); + } + + /// \brief Access Data Member /// - /// \param length New value of length parameter - void setLength(size_t length) { - ptr_->length_ = length; + /// \param Reference to the data member + uint8_t* data() { + return (ptr_->data_); } /// \brief Get flag to say what was queued - Operation getQueued() { + Operation& queued() { return (ptr_->queued_); } - /// \brief Set flag to say what is being queued - /// - /// \param called New value of queued parameter - void setQueued(Operation queued) { - ptr_->queued_ = queued; - } - /// \brief Get flag to say when callback was called - Operation getCalled() { + Operation& called() { return (ptr_->called_); } - /// \brief Set flag to say when callback was called - /// - /// \param called New value of called parameter - void setCalled(Operation called) { - ptr_->called_ = called; - } - /// \brief Return instance of callback name - std::string getName() { + std::string& name() { return (ptr_->name_); } - /// \brief Set callback name - /// - /// \param name New value of the callback name - void setName(const std::string& name) { - ptr_->name_ = name; - } - private: boost::shared_ptr ptr_; ///< Pointer to private data }; + +// Read Server Data +// +// Called in the part of the test that has the client send a message to the +// server, this loops until all the data has been read (synchronously) by the +// server. +// +// "All the data read" means that the server has received a message that is +// preceded by a two-byte count field and that the total amount of data received +// from the remote end is equal to the value in the count field plus two bytes +// for the count field itself. +// +// \param socket Socket on which the server is reading data +// \param server_cb Structure in which server data is held. +void +serverRead(tcp::socket& socket, TCPCallback& server_cb) { + + // Until we read something, the read is not complete. + bool complete = false; + + // As we may need to read multiple times, keep a count of the cumulative + // amount of data read and do successive reads into the appropriate part + // of the buffer. + // + // Note that there are no checks for buffer overflow - this is a test + // program and we have sized the buffer to be large enough for the test. + server_cb.cumulative() = 0; + + while (! complete) { + + // Read block of data and update cumulative amount of data received. + server_cb.length() = socket.receive( + asio::buffer(server_cb.data() + server_cb.cumulative(), + TCPCallback::MIN_SIZE - server_cb.cumulative())); + server_cb.cumulative() += server_cb.length(); + + // If we have read at least two bytes, we can work out how much we + // should be reading. + if (server_cb.cumulative() >= 2) { + uint16_t expected = readUint16(server_cb.data()); + if ((expected + 2) == server_cb.cumulative()) { + + // Amount of data read from socket equals the size of the + // message (as indicated in the first two bytes of the message) + // plus the size of the count field. Therefore we have received + // all the data. + complete = true; + } + } + } +} + +// Client read complete? +// +// This function is called when it appears that a client callback has been +// executed as the result of a read. It checks to see if all the data has been +// read and, if not, queues another asynchronous read. +// +// "All the data read" means that the client has received a message that is +// preceded by a two-byte count field and that the total amount of data received +// from the remote end is equal to the value in the count field plus two bytes +// for the count field itself. +// +// \param client TCPSocket object representing the client (i.e. the object +// under test). +// \param client_cb TCPCallback object holding information about the client. +// \param client_remote_endpoint Needed for the call to the client's asyncRead() +// method (but otherwise unused). +// +// \return true if the read is complete, false if not. +bool +clientReadComplete(TCPSocket& client, TCPCallback& client_cb, + TCPEndpoint& client_remote_endpoint) +{ + // Assume that all the data has not been read. + bool complete = false; + + // Check that the callback has in fact completed. + EXPECT_EQ(TCPCallback::READ, client_cb.called()); + EXPECT_EQ(0, client_cb.getCode()); + + // Update length of data received. + client_cb.cumulative() += client_cb.length(); + + // If the data is not complete, queue another read. + if (!client.receiveComplete(client_cb.data(), client_cb.cumulative())) { + client_cb.called() = TCPCallback::NONE; + client_cb.queued() = TCPCallback::READ; + client_cb.length() = 0; + client.asyncReceive(client_cb.data(), TCPCallback::MIN_SIZE , + client_cb.cumulative(), &client_remote_endpoint, + client_cb); + } + + return (complete); +} + // TODO: Need to add a test to check the cancel() method // Tests the operation of a TCPSocket by opening it, sending an asynchronous @@ -201,7 +284,12 @@ TEST(TCPSocket, SequenceTest) { // Common objects. IOService service; // Service object for async control - // Server + // The client - the TCPSocket being tested + TCPSocket client(service);// Socket under test + TCPCallback client_cb("Client"); // Async I/O callback function + TCPEndpoint client_remote_endpoint; // Where client receives message from + + // The server - with which the client communicates. IOAddress server_address(SERVER_ADDRESS); // Address of target server TCPCallback server_cb("Server"); // Server callback @@ -210,39 +298,23 @@ TEST(TCPSocket, SequenceTest) { TCPEndpoint server_remote_endpoint; // Address where server received message from tcp::socket server_socket(service.get_io_service()); // Socket used for server - char server_data[TCPCallback::MIN_SIZE]; - // Data received by server - ASSERT_GT(sizeof(server_data), sizeof(OUTBOUND_DATA)); - // Make sure it's large enough - // The client - the TCPSocket being tested - TCPSocket client(service);// Socket under test - TCPCallback client_cb("Client"); // Async I/O callback function - TCPEndpoint client_remote_endpoint; // Where client receives message from - char client_data[TCPCallback::MIN_SIZE]; - // Data received by client - ASSERT_GT(sizeof(client_data), sizeof(OUTBOUND_DATA)); - // Make sure it's large enough - //size_t client_cumulative = 0; // Cumulative data received - - // The server - with which the client communicates. For convenience, we - // use the same io_service, and use the endpoint object created for - // the client to send to as the endpoint object in the constructor. - - std::cerr << "Setting up acceptor\n"; - // Set up the server to accept incoming connections. - server_cb.setQueued(TCPCallback::ACCEPT); - server_cb.setCalled(TCPCallback::NONE); + // Step 1. Create the connection between the client and the server. Set + // up the server to accept incoming connections and have the client open + // a channel to it. + + // Set up server - open socket and queue an accept. + server_cb.queued() = TCPCallback::ACCEPT; + server_cb.called() = TCPCallback::NONE; server_cb.setCode(42); // Some error tcp::acceptor acceptor(service.get_io_service(), tcp::endpoint(tcp::v4(), SERVER_PORT)); acceptor.set_option(tcp::acceptor::reuse_address(true)); acceptor.async_accept(server_socket, server_cb); - std::cerr << "Setting up client\n"; - // Open the client socket - the operation should be asynchronous - client_cb.setQueued(TCPCallback::OPEN); - client_cb.setCalled(TCPCallback::NONE); + // Set up client - connect to the server. + client_cb.queued() = TCPCallback::OPEN; + client_cb.called() = TCPCallback::NONE; client_cb.setCode(43); // Some error EXPECT_FALSE(client.isOpenSynchronous()); client.open(&server_endpoint, client_cb); @@ -250,100 +322,150 @@ TEST(TCPSocket, SequenceTest) { // Run the open and the accept callback and check that they ran. service.run_one(); service.run_one(); - - EXPECT_EQ(TCPCallback::ACCEPT, server_cb.getCalled()); + + EXPECT_EQ(TCPCallback::ACCEPT, server_cb.called()); EXPECT_EQ(0, server_cb.getCode()); - EXPECT_EQ(TCPCallback::OPEN, client_cb.getCalled()); + + EXPECT_EQ(TCPCallback::OPEN, client_cb.called()); EXPECT_EQ(0, client_cb.getCode()); - // Write something to the server using the client and read it on ther server. - server_cb.setCalled(TCPCallback::NONE); - server_cb.setQueued(TCPCallback::READ); - server_cb.setCode(142); // Arbitrary number - server_cb.setLength(0); - server_socket.async_receive(buffer(server_data, sizeof(server_data)), server_cb); + // Step 2. Get the client to write to the server asynchronously. The + // server will loop reading the data synchronously. - client_cb.setCalled(TCPCallback::NONE); - client_cb.setQueued(TCPCallback::WRITE); + // Write asynchronously to the server. + client_cb.called() = TCPCallback::NONE; + client_cb.queued() = TCPCallback::WRITE; client_cb.setCode(143); // Arbitrary number - client_cb.setLength(0); + client_cb.length() = 0; client.asyncSend(OUTBOUND_DATA, sizeof(OUTBOUND_DATA), &server_endpoint, client_cb); - // Run the write and read callback and check they ran - service.run_one(); - service.run_one(); + // Synchronously read the data from the server.; + serverRead(server_socket, server_cb); - // Check lengths. As the client wrote the buffer, currently two bytes - // will be prepended by the client containing the length. - EXPECT_EQ(TCPCallback::READ, server_cb.getCalled()); - EXPECT_EQ(0, server_cb.getCode()); - EXPECT_EQ(sizeof(OUTBOUND_DATA) + 2, server_cb.getLength()); + // Wait for the client callback to complete. + service.run_one(); - EXPECT_EQ(TCPCallback::WRITE, client_cb.getCalled()); + // Check the client state + EXPECT_EQ(TCPCallback::WRITE, client_cb.called()); EXPECT_EQ(0, client_cb.getCode()); - EXPECT_EQ(sizeof(OUTBOUND_DATA) + 2, client_cb.getLength()); - - // Check that the first two bytes of the buffer are in fact the remaining - // length of the buffer (code copied from isc::dns::buffer.h) - uint16_t count = ((unsigned int)(server_data[0])) << 8; - count |= ((unsigned int)(server_data[1])); - EXPECT_EQ(sizeof(OUTBOUND_DATA), count); + EXPECT_EQ(sizeof(OUTBOUND_DATA) + 2, client_cb.length()); + + // ... and check what the server received. + EXPECT_EQ(sizeof(OUTBOUND_DATA) + 2, server_cb.cumulative()); + EXPECT_TRUE(equal(OUTBOUND_DATA, + (OUTBOUND_DATA + (sizeof(OUTBOUND_DATA) - 1)), + (server_cb.data() + 2))); + + // Step 3. Get the server to write all the data asynchronously and have the + // client loop (asynchronously) reading the data. Note that we copy the + // data into the server's internal buffer in order to precede it with a two- + // byte count field. + + // Have the server write asynchronously to the client. + server_cb.called() = TCPCallback::NONE; + server_cb.queued() = TCPCallback::WRITE; + server_cb.length() = 0; + server_cb.cumulative() = 0; + + writeUint16(sizeof(INBOUND_DATA), server_cb.data()); + copy(INBOUND_DATA, (INBOUND_DATA + sizeof(INBOUND_DATA) - 1), + (server_cb.data() + 2)); + server_socket.async_send(asio::buffer(server_cb.data(), + (sizeof(INBOUND_DATA) + 2)), + server_cb); + + // Have the client read asynchronously. + client_cb.called() = TCPCallback::NONE; + client_cb.queued() = TCPCallback::READ; + client_cb.length() = 0; + client_cb.cumulative() = 0; + client.asyncReceive(client_cb.data(), TCPCallback::MIN_SIZE , + client_cb.cumulative(), &client_remote_endpoint, + client_cb); + + // Run the callbacks. Several options are possible depending on how ASIO + // is implemented and whether the message gets fragmented: + // + // 1) The send handler may complete immediately, regardess of whether the + // data has been read by the client. (This is the most likely.) + // 2) The send handler may only run after all the data has been read by + // the client. (This could happen if the client's TCP buffers were too + // small so the data was not transferred to the "remote" system until the + // remote buffer has been emptied one or more times.) + // 3) The client handler may be run a number of times to handle the message + // fragments and the server handler may run between calls of the client + // handler. + // + // So loop, running one handler at a time until we are certain that all the + // handlers have run. + + bool server_complete = false; + bool client_complete = false; + while (!server_complete || !client_complete) { + service.run_one(); + + // Has the server run? + if (!server_complete) { + if (server_cb.called() == server_cb.queued()) { + + // Yes. Check that the send completed successfully and that + // all the data that was expected to have been sent was in fact + // sent. + EXPECT_EQ(0, server_cb.getCode()); + EXPECT_EQ((sizeof(INBOUND_DATA) + 2), server_cb.length()); + server_complete = true; + continue; + } + } + + if (!client_complete) { + + // Client callback must have run. Check that it ran OK. + EXPECT_EQ(TCPCallback::READ, client_cb.called()); + EXPECT_EQ(0, client_cb.getCode()); + + // Update length of data received. + client_cb.cumulative() += client_cb.length(); + if (client_cb.cumulative() > 2) { + + // Have at least the message length field, check if we have the + // entire message. (If we don't have the length field, the data + // is not complete.) + client_complete = ((readUint16(client_cb.data()) + 2) == + client_cb.cumulative()); + } + + // If the data is not complete, queue another read. + if (! client_complete) { + client_cb.called() = TCPCallback::NONE; + client_cb.queued() = TCPCallback::READ; + client_cb.length() = 0; + client.asyncReceive(client_cb.data(), TCPCallback::MIN_SIZE , + client_cb.cumulative(), &client_remote_endpoint, + client_cb); + } + } + } - // ... and check data received by server is what we expect - EXPECT_TRUE(equal(&server_data[2], &server_data[server_cb.getLength() - 1], - OUTBOUND_DATA)); + // Both the send and the receive have comnpleted. Check that the received + // is what was sent. - // TODO: Complete this server test - // TODO: Add in loop for server to read data - read 2 bytes, then as much as needed - - /* - // Now return data from the server to the client. Issue the read on the - // client. - client_cb.setCalled(TCPCallback::NONE); - client_cb.setQueued(TCPCallback::READ); - client_cb.setCode(143); // Arbitrary number - client_cb.setLength(0); - client.asyncReceive(OUTBOUND_DATA, sizeof(OUTBOUND_DATA), &server_endpoint, client_cb); - - client_cb.setLength(12345); // Arbitrary number - client_cb.setCalled(false); - client_cb.setCode(32); // Arbitrary number - client.asyncReceive(data, sizeof(data), client_cumulative, - &client_remote_endpoint, client_cb); - - // Issue the write on the server side to the source of the data it received. - server_cb.setLength(22345); // Arbitrary number - server_cb.setCalled(false); - server_cb.setCode(232); // Arbitrary number - server.async_send_to(buffer(INBOUND_DATA, sizeof(INBOUND_DATA)), - server_remote_endpoint.getASIOEndpoint(), server_cb); - - // Expect two callbacks to run - service.get_io_service().poll(); - //service.run_one(); - - EXPECT_TRUE(client_cb.getCalled()); + // Check the client state + EXPECT_EQ(TCPCallback::READ, client_cb.called()); EXPECT_EQ(0, client_cb.getCode()); - EXPECT_EQ(sizeof(INBOUND_DATA), client_cb.getLength()); + EXPECT_EQ(sizeof(INBOUND_DATA) + 2, client_cb.cumulative()); - EXPECT_TRUE(server_cb.getCalled()); + // ... and check what the server sent. + EXPECT_EQ(TCPCallback::WRITE, server_cb.called()); EXPECT_EQ(0, server_cb.getCode()); - EXPECT_EQ(sizeof(INBOUND_DATA), server_cb.getLength()); + EXPECT_EQ(sizeof(INBOUND_DATA) + 2, server_cb.length()); - EXPECT_TRUE(equal(&data[0], &data[server_cb.getLength() - 1], INBOUND_DATA)); - - // Check that the address/port received by the client corresponds to the - // address and port the server is listening on. - EXPECT_TRUE(server_address == client_remote_endpoint.getAddress()); - EXPECT_EQ(SERVER_PORT, client_remote_endpoint.getPort()); - - // Finally, check that the receive received a complete buffer's worth of data. - EXPECT_TRUE(client.receiveComplete(&data[0], client_cb.getLength(), - client_cumulative)); - EXPECT_EQ(client_cb.getLength(), client_cumulative); + // ... and that what was sent is what was received. + EXPECT_TRUE(equal(INBOUND_DATA, + (INBOUND_DATA + (sizeof(INBOUND_DATA) - 1)), + (client_cb.data() + 2))); // Close client and server. EXPECT_NO_THROW(client.close()); - EXPECT_NO_THROW(server.close()); - * */ -} + EXPECT_NO_THROW(server_socket.close()); +} \ No newline at end of file diff --git a/src/lib/asiolink/tests/udp_socket_unittest.cc b/src/lib/asiolink/tests/udp_socket_unittest.cc index 7b81a6205..aa46498a9 100644 --- a/src/lib/asiolink/tests/udp_socket_unittest.cc +++ b/src/lib/asiolink/tests/udp_socket_unittest.cc @@ -263,8 +263,8 @@ TEST(UDPSocket, SequenceTest) { EXPECT_EQ(SERVER_PORT, client_remote_endpoint.getPort()); // Finally, check that the receive received a complete buffer's worth of data. - EXPECT_TRUE(client.receiveComplete(&data[0], client_cb.getLength(), - client_cumulative)); + client_cumulative += client_cb.getLength(); + EXPECT_TRUE(client.receiveComplete(&data[0], client_cumulative)); EXPECT_EQ(client_cb.getLength(), client_cumulative); // Close client and server. diff --git a/src/lib/asiolink/udp_socket.h b/src/lib/asiolink/udp_socket.h index 0df6fba17..1a016d2d9 100644 --- a/src/lib/asiolink/udp_socket.h +++ b/src/lib/asiolink/udp_socket.h @@ -53,15 +53,15 @@ public: /// \brief Constructor from an ASIO UDP socket. /// - /// \param socket The ASIO representation of the UDP socket. It - /// is assumed that the caller will open and close the socket, so - /// these operations are a no-op for that socket. + /// \param socket The ASIO representation of the UDP socket. It is assumed + /// that the caller will open and close the socket, so these + /// operations are a no-op for that socket. UDPSocket(asio::ip::udp::socket& socket); /// \brief Constructor /// /// Used when the UDPSocket is being asked to manage its own internal - /// socket. It is assumed that open() and close() will not be used. + /// socket. In this case, the open() and close() methods are used. /// /// \param service I/O Service object used to manage the socket. UDPSocket(IOService& service); @@ -90,9 +90,11 @@ public: /// /// Opens the UDP socket. This is a synchronous operation. /// - /// \param endpoint Endpoint to which the socket will connect to. - /// \param callback Unused. - virtual void open(const IOEndpoint* endpoint, C&); + /// \param endpoint Endpoint to which the socket will send data. This is + /// used to determine the address family trhat should be used for the + /// underlying socket. + /// \param callback Unused as the operation is synchronous. + virtual void open(const IOEndpoint* endpoint, C& callback); /// \brief Send Asynchronously /// @@ -110,8 +112,8 @@ public: /// \brief Receive Asynchronously /// /// Calls the underlying socket's async_receive_from() method to read a - /// packet of data from a remote endpoint. Arrival of the data is - /// signalled via a call to the callback function. + /// packet of data from a remote endpoint. Arrival of the data is signalled + /// via a call to the callback function. /// /// \param data Buffer to receive incoming message /// \param length Length of the data buffer @@ -123,19 +125,15 @@ public: /// \brief Checks if the data received is complete. /// - /// As all the data is received in one I/O, so this is, this is effectively - /// a no-op (although it does update the amount of data received). + /// For a UDP socket all the data is received in one I/O, so this is + /// effectively a no-op (although it does update the amount of data + /// received). /// - /// \param data Data buffer containing data to date. (This is ignored - /// for UDP receives.) - /// \param length Amount of data received in last asynchronous I/O - /// \param cumulative On input, amount of data received before the last - /// I/O. On output, the total amount of data received to date. + /// \param data Data buffer containing data to date (ignored) + /// \param length Amount of data in the buffer. /// - /// \return true if the receive is complete, false if another receive is - /// needed. Always true for a UDP socket. - virtual bool receiveComplete(void*, size_t length, size_t& cumulative) { - cumulative = length; + /// \return Always true + virtual bool receiveComplete(const void*, size_t) { return (true); } @@ -185,10 +183,11 @@ UDPSocket::~UDPSocket() template void UDPSocket::open(const IOEndpoint* endpoint, C&) { - // Ignore opens on already-open socket. Don't throw a failure because - // of uncertainties as to what precedes whan when using asynchronous I/O. - // At also allows us a treat a passed-in socket as a self-managed socket. - + // Ignore opens on already-open socket. (Don't throw a failure because + // of uncertainties as to what precedes whan when using asynchronous I/O.) + // It also allows us a treat a passed-in socket in exactly the same way as + // a self-managed socket (in that we can call the open() and close() methods + // of this class). if (!isopen_) { if (endpoint->getFamily() == AF_INET) { socket_.open(asio::ip::udp::v4()); @@ -198,8 +197,7 @@ UDPSocket::open(const IOEndpoint* endpoint, C&) { } isopen_ = true; - // Ensure it can send and receive 4K buffers. - + // Ensure it can send and receive at least 4K buffers. asio::ip::udp::socket::send_buffer_size snd_size; socket_.get_option(snd_size); if (snd_size.value() < MIN_SIZE) { @@ -227,13 +225,14 @@ UDPSocket::asyncSend(const void* data, size_t length, // Upconvert to a UDPEndpoint. We need to do this because although // IOEndpoint is the base class of UDPEndpoint and TCPEndpoint, it - // doing cont contain a method for getting at the underlying endpoint - // type - those are in the derived class and the two classes differ on + // does not contain a method for getting at the underlying endpoint + // type - that is in the derived class and the two classes differ on // return type. - assert(endpoint->getProtocol() == IPPROTO_UDP); const UDPEndpoint* udp_endpoint = static_cast(endpoint); + + // ... and send the message. socket_.async_send_to(asio::buffer(data, length), udp_endpoint->getASIOEndpoint(), callback); } else { @@ -242,10 +241,8 @@ UDPSocket::asyncSend(const void* data, size_t length, } } -// Receive a message. Note that the "cumulative" argument is ignored - every UDP -// receive is put into the buffer beginning at the start - there is no concept -// receiving a subsequent part of a message. Same critera as before concerning -// the need for the socket to be open. +// Receive a message. Should never do this if the socket is not open, so throw +// an exception if this is the case. template void UDPSocket::asyncReceive(void* data, size_t length, size_t offset, -- GitLab