Commit a07e3014 authored by Francis Dupont's avatar Francis Dupont

[4031] Cleaned up asiolink (tentative)

parent 6c1ef544
......@@ -24,15 +24,12 @@ libkea_asiolink_la_SOURCES += io_address.cc io_address.h
libkea_asiolink_la_SOURCES += io_asio_socket.h
libkea_asiolink_la_SOURCES += io_endpoint.cc io_endpoint.h
libkea_asiolink_la_SOURCES += io_error.h
libkea_asiolink_la_SOURCES += io_message.h
libkea_asiolink_la_SOURCES += io_service.h io_service.cc
libkea_asiolink_la_SOURCES += io_socket.h io_socket.cc
libkea_asiolink_la_SOURCES += simple_callback.h
libkea_asiolink_la_SOURCES += tcp_endpoint.h
libkea_asiolink_la_SOURCES += tcp_socket.h
libkea_asiolink_la_SOURCES += udp_endpoint.h
libkea_asiolink_la_SOURCES += udp_socket.h
libkea_asiolink_la_SOURCES += local_socket.h local_socket.cc
# Note: the ordering matters: -Wno-... must follow -Wextra (defined in
# KEA_CXXFLAGS)
......
......@@ -20,12 +20,10 @@
// See the description of the namespace below.
#include <asiolink/io_service.h>
#include <asiolink/simple_callback.h>
#include <asiolink/interval_timer.h>
#include <asiolink/io_address.h>
#include <asiolink/io_endpoint.h>
#include <asiolink/io_message.h>
#include <asiolink/io_socket.h>
#include <asiolink/io_error.h>
......
// Copyright (C) 2010 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#ifndef IO_MESSAGE_H
#define IO_MESSAGE_H 1
// IMPORTANT NOTE: only very few ASIO headers files can be included in
// this file. In particular, asio.hpp should never be included here.
// See the description of the namespace below.
#include <unistd.h> // for some network system calls
#include <functional>
#include <string>
#include <exceptions/exceptions.h>
#include <asiolink/io_endpoint.h>
#include <asiolink/io_socket.h>
namespace isc {
namespace asiolink {
/// \brief The \c IOMessage class encapsulates an incoming message received
/// on a socket.
///
/// An \c IOMessage object represents a tuple of a chunk of data
/// (a UDP packet or some segment of TCP stream), the socket over which the
/// data is passed, the information about the other end point of the
/// communication, and perhaps more.
///
/// The current design and interfaces of this class is tentative.
/// It only provides a minimal level of support that is necessary for
/// the current implementation of the authoritative server.
/// A future version of this class will definitely support more.
class IOMessage {
///
/// \name Constructors and Destructor
///
/// Note: The copy constructor and the assignment operator are
/// intentionally defined as private, making this class non-copyable.
//@{
private:
IOMessage(const IOMessage& source);
IOMessage& operator=(const IOMessage& source);
public:
/// \brief Constructor from message data
///
/// This constructor needs to handle the ASIO \c ip::address class,
/// and is intended to be used within this wrapper implementation.
/// Once the \c IOMessage object is created, the application can
/// get access to the information via the wrapper interface such as
/// \c getRemoteAddress().
///
/// This constructor never throws an exception.
///
/// \param data A pointer to the message data.
/// \param data_size The size of the message data in bytes.
/// \param io_socket The socket over which the data is given.
/// \param remote_endpoint The other endpoint of the socket, that is,
/// the sender of the message.
IOMessage(const void* data, const size_t data_size,
const IOSocket& io_socket, const IOEndpoint& remote_endpoint) :
data_(data), data_size_(data_size), io_socket_(io_socket),
remote_endpoint_(remote_endpoint)
{}
//@}
/// \brief Returns a pointer to the received data.
const void* getData() const { return (data_); }
/// \brief Returns the size of the received data in bytes.
size_t getDataSize() const { return (data_size_); }
/// \brief Returns the socket on which the message arrives.
const IOSocket& getSocket() const { return (io_socket_); }
/// \brief Returns the endpoint that sends the message.
const IOEndpoint& getRemoteEndpoint() const { return (remote_endpoint_); }
private:
const void* data_;
const size_t data_size_;
const IOSocket& io_socket_;
const IOEndpoint& remote_endpoint_;
};
} // namespace asiolink
} // namespace isc
#endif // IO_MESSAGE_H
// Copyright (C) 2013 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#include <asiolink/local_socket.h>
#include <asiolink/io_service.h>
#include <asiolink/io_error.h>
#include <asio.hpp>
#include <boost/bind.hpp>
#include <string>
#include <sys/socket.h>
namespace isc {
namespace asiolink {
class LocalSocket::Impl {
public:
Impl(IOService& io_service, int fd) :
asio_sock_(io_service.get_io_service(),
asio::local::stream_protocol(), fd)
{
// Depending on the underlying demultiplex API, the constructor may or
// may not throw in case fd is invalid. To catch such cases sooner,
// we try to get the local endpoint (we don't need it in the rest of
// this implementation).
asio_sock_.local_endpoint(ec_);
if (ec_) {
isc_throw(IOError, "failed to open local socket with FD " << fd
<< " (local endpoint unknown): " << ec_.message());
}
}
asio::local::stream_protocol::socket asio_sock_;
asio::error_code ec_;
};
LocalSocket::LocalSocket(IOService& io_service, int fd) :
impl_(NULL)
{
try {
impl_ = new Impl(io_service, fd);
} catch (const asio::system_error& error) {
// Catch and convert any exception from asio's constructor
isc_throw(IOError, "failed to open local socket with FD " << fd
<< ": " << error.what());
}
}
LocalSocket::~LocalSocket() {
delete impl_;
}
int
LocalSocket::getNative() const {
return (impl_->asio_sock_.native());
}
int
LocalSocket::getProtocol() const {
return (AF_UNIX);
}
namespace {
// Wrapper callback for async_read that simply adjusts asio-native parameters
// for the LocalSocket interface. Note that this is a free function and
// doesn't rely on internal member variables of LocalSocket.
// So it can be called safely even after the LocalSocket object on which
// asyncRead() was called is destroyed.
void
readCompleted(const asio::error_code& ec,
LocalSocket::ReadCallback user_callback)
{
// assumption check: we pass non empty string iff ec indicates an error.
const std::string err_msg = ec ? ec.message() : std::string();
assert(ec || err_msg.empty());
user_callback(err_msg);
}
}
void
LocalSocket::asyncRead(const ReadCallback& callback, void* buf,
size_t buflen)
{
asio::async_read(impl_->asio_sock_, asio::buffer(buf, buflen),
boost::bind(readCompleted, _1, callback));
}
} // namespace asiolink
} // namespace isc
// Copyright (C) 2013 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#ifndef LOCAL_SOCKET_H
#define LOCAL_SOCKET_H 1
#include <asiolink/io_socket.h>
#include <asiolink/io_service.h>
#include <boost/noncopyable.hpp>
namespace isc {
namespace asiolink {
/// \brief A wrapper for ASIO stream socket in the local (AF_UNIX) domain.
///
/// This class provides a simple, limited set of wrapper interfaces to an
/// ASIO stream socket object in the local domain. Unlike other concrete
/// derived classes of \c IOSocket, this class is intended to be instantiated
/// directly. Right now it only provides read interface due to the limited
/// expected usage, but it can be extended as we see need for other operations
/// on this socket.
///
/// Note that in the initial implementation there's even no stop() or cancel()
/// method; for these cases users are expected to just destroy the socket
/// object (this may be extended in future, too).
class LocalSocket : boost::noncopyable, public IOSocket {
public:
/// \brief Constructor from a native file descriptor of AF_UNIX stream
/// socket.
///
/// Parameter \c fd must be an open stream-type socket of the AF_UNIX
/// domain. The constructor tries to detect some invalid cases, but
/// it may not reject all invalid cases. It's generally the
/// responsibility of the caller.
///
/// \throw IOError Failed to create the socket object, most likely because
/// the given file descriptor is invalid.
///
/// \param io_service The IO service object to handle events on this
/// socket.
/// \param fd File descriptor of an AF_UNIX socket.
LocalSocket(IOService& io_service, int fd);
/// \brief Destructor.
///
/// \throw None.
virtual ~LocalSocket();
/// \brief Local socket version of getNative().
///
/// \throw None.
virtual int getNative() const;
/// \brief Local socket version of getProtocol().
///
/// It always returns \c AF_UNIX.
///
/// \throw None.
virtual int getProtocol() const;
/// \brief The callback functor for the \c asyncRead method.
///
/// The callback takes one parameter, \c error. It will be set to
/// non empty string if read operation fails and the string explains
/// the reason for the failure. On success \c error will be empty.
typedef boost::function<void(const std::string& error)> ReadCallback;
/// \brief Start asynchronous read on the socket.
///
/// This method registers an interest on a new read event on the local
/// socket for the specified length of data (\c buflen bytes). This
/// method returns immediately. When the specified amount of data
/// are available for read from the socket or an error happens, the
/// specified callback will be called. In the former case the data are
/// copied into the given buffer (pointed to by \c buf); in the latter
/// case, the \c error parameter of the callback will be set to a non
/// empty string.
///
/// In the case of error, this socket should be considered
/// unusable anymore, because this class doesn't provide a feasible way
/// to identify where in the input stream to restart reading. So,
/// in practice, the user of this socket should destroy this socket,
/// and, if necessary to continue, create a new one.
///
/// \c buf must point to a memory region that has at least \c buflen
/// bytes of valid space. That region must be kept valid until the
/// callback is called or the \c IOService passed to the constructor
/// is stopped. This method and class do not check these conditions;
/// it's the caller's responsibility to guarantee them.
///
/// \note If asyncRead() has been called and hasn't been completed (with
/// the callback being called), it's possible that the callback is called
/// even after the \c LocalSocket object is destroyed. So the caller
/// has to make sure that either \c LocalSocket is valid until the
/// callback is called or the callback does not depend on \c LocalSocket;
/// alternatively, the caller can stop the \c IOService. This will make
/// sure the callback will not be called regardless of when and how
/// the \c LocalSocket is destroyed.
///
/// \throw None.
///
/// \brief callback The callback functor to be called on the completion
/// of read.
/// \brief buf Buffer to read in data from the socket.
/// \brief buflen Length of data to read.
void asyncRead(const ReadCallback& callback, void* buf, size_t buflen);
private:
class Impl;
Impl* impl_;
};
} // namespace asiolink
} // namespace isc
#endif // LOCAL_SOCKET_H
// Local Variables:
// mode: c++
// End:
// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#ifndef ASIOLINK_SIMPLE_CALLBACK_H
#define ASIOLINK_SIMPLE_CALLBACK_H 1
#include <asiolink/io_message.h>
namespace isc {
namespace asiolink {
/// \brief The \c SimpleCallback class is an abstract base class for a
/// simple callback function with the signature:
///
/// void simpleCallback(const IOMessage& io_message) const;
///
/// Specific derived class implementations are hidden within the
/// implementation. Instances of the derived classes can be called
/// as functions via the operator() interface. Pointers to these
/// instances can then be provided to the \c IOService class
/// via its constructor.
///
/// The \c SimpleCallback is expected to be used for basic, generic
/// tasks such as checking for configuration changes. It may also be
/// used for testing purposes.
class SimpleCallback {
///
/// \name Constructors and Destructor
///
/// Note: The copy constructor and the assignment operator are
/// intentionally defined as private, making this class non-copyable.
//@{
private:
SimpleCallback(const SimpleCallback& source);
SimpleCallback& operator=(const SimpleCallback& source);
protected:
/// \brief The default constructor.
///
/// This is intentionally defined as \c protected as this base class
/// should never be instantiated (except as part of a derived class).
SimpleCallback() {
self_ = this;
}
public:
/// \brief The destructor
virtual ~SimpleCallback() {}
/// \brief The function operator
//@}
///
/// This makes its call indirectly via the "self" pointer, ensuring
/// that the function ultimately invoked will be the one in the derived
/// class.
///
/// \param io_message The event message to handle
virtual void operator()(const IOMessage& io_message) const {
(*self_)(io_message);
}
private:
SimpleCallback* self_;
};
} // namespace asiolink
} // namespace isc
#endif // ASIOLINK_SIMPLE_CALLBACK_H
......@@ -34,7 +34,6 @@ run_unittests_SOURCES += tcp_socket_unittest.cc
run_unittests_SOURCES += udp_endpoint_unittest.cc
run_unittests_SOURCES += udp_socket_unittest.cc
run_unittests_SOURCES += io_service_unittest.cc
run_unittests_SOURCES += local_socket_unittest.cc
run_unittests_SOURCES += dummy_io_callback_unittest.cc
run_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES)
......
// Copyright (C) 2013 Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.
#include <asiolink/local_socket.h>
#include <asiolink/io_error.h>
#include <gtest/gtest.h>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/bind.hpp>
#include <csignal>
#include <cstring>
#include <vector>
#include <sys/socket.h>
#include <stdint.h>
#include <unistd.h> // for alarm(3)
using namespace isc::asiolink;
namespace {
// duration (in seconds) until we break possible hangup; value is an
// arbitrary choice.
const unsigned IO_TIMEOUT = 10;
// A simple RAII wrapper for a file descriptor so test sockets are safely
// closed in each test.
class ScopedSocket : boost::noncopyable {
public:
ScopedSocket() : fd_(-1) {}
~ScopedSocket() {
if (fd_ >= 0) {
EXPECT_EQ(0, ::close(fd_));
}
}
void set(int fd) {
assert(fd_ == -1);
fd_ = fd;
}
int get() { return (fd_); }
int release() {
const int ret = fd_;
fd_ = -1;
return (ret);
}
private:
int fd_;
};
class LocalSocketTest : public ::testing::Test {
protected:
LocalSocketTest() {
int sock_pair[2];
EXPECT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, sock_pair));
sock_pair_[0].set(sock_pair[0]);
sock_pair_[1].set(sock_pair[1]);
// For tests using actual I/O we use a timer to prevent hangup
// due to a bug. Set up the signal handler for the timer here.
g_io_service_ = &io_service_;
prev_handler_ = std::signal(SIGALRM, stopIOService);
}
~LocalSocketTest() {
alarm(0);
// reset the global to NULL to detect any invalid access to freed
// io_service (this shouldn't happen, so we don't change stopIOService
// itself)
g_io_service_ = NULL;
std::signal(SIGALRM, prev_handler_);
}
// Common set of tests for async read
void checkAsyncRead(size_t data_len);
IOService io_service_;
ScopedSocket sock_pair_[2];
std::vector<uint8_t> read_buf_;
private:
static IOService* g_io_service_; // will be set to &io_service_
void (*prev_handler_)(int);
// SIGALRM handler to prevent hangup. This must be a static method
// so it can be passed to std::signal().
static void stopIOService(int) {
g_io_service_->stop();
}
};
IOService* LocalSocketTest::g_io_service_ = NULL;
TEST_F(LocalSocketTest, construct) {
const int fd = sock_pair_[0].release();
LocalSocket sock(io_service_, fd);
EXPECT_EQ(fd, sock.getNative());
EXPECT_EQ(AF_UNIX, sock.getProtocol());
}
TEST_F(LocalSocketTest, constructError) {
// try to construct a LocalSocket object with a closed socket. It should
// fail.
const int fd = sock_pair_[0].release();
EXPECT_EQ(0, close(fd));
EXPECT_THROW(LocalSocket(io_service_, fd), IOError);
}
TEST_F(LocalSocketTest, autoClose) {
// Confirm that passed FD will be closed on destruction of LocalSocket
const int fd = sock_pair_[0].release();
{
LocalSocket sock(io_service_, fd);
}
// fd should have been closed, so close() should fail (we assume there's
// no other open() call since then)
EXPECT_EQ(-1, ::close(fd));
}
void
callback(const std::string& error, IOService* io_service, bool* called,
bool expect_error)
{
if (expect_error) {
EXPECT_NE("", error);
} else {
EXPECT_EQ("", error);
}
*called = true;
io_service->stop();
}
void
LocalSocketTest::checkAsyncRead(size_t data_len) {
LocalSocket sock(io_service_, sock_pair_[0].release());
bool callback_called = false;
read_buf_.resize(data_len);
sock.asyncRead(boost::bind(&callback, _1, &io_service_, &callback_called,
false), &read_buf_[0], data_len);
std::vector<uint8_t> expected_data(data_len);
for (size_t i = 0; i < data_len; ++i) {
expected_data[i] = i & 0xff;
}
alarm(IO_TIMEOUT);
// If write blocks, it will eventually fail due to signal interruption.
// Since io_service has been stopped already, run() would immediately
// return and test should complete (with failure). But to make very sure
// it never cause hangup we rather return from the test at the point of
// failure of write. In either case it signals a failure and need for
// a fix.
ASSERT_EQ(data_len, write(sock_pair_[1].get(), &expected_data[0],
data_len));
io_service_.run();
EXPECT_TRUE(callback_called);
EXPECT_EQ(0, std::memcmp(&expected_data[0], &read_buf_[0], data_len));
}
TEST_F(LocalSocketTest, asyncRead) {
// A simple case of asynchronous read: wait for 1 byte and successfully
// read it in the run() loop.
checkAsyncRead(1);
}
TEST_F(LocalSocketTest, asyncLargeRead) {
// Similar to the previous case, but for moderately larger data.
// (for the moment) we don't expect to use this interface with much
// larger data that could cause blocking write.
checkAsyncRead(1024);
}
TEST_F(LocalSocketTest, asyncPartialRead) {
alarm(IO_TIMEOUT);
// specify reading 4 bytes of data, and send 3 bytes. It shouldn't cause
// callback. while we actually don't use the buffer, we'll initialize it
// to make valgrind happy.
char recv_buf[4];
std::memset(recv_buf, 0, sizeof(recv_buf));
bool callback_called = false;
LocalSocket sock(io_service_, sock_pair_[0].release());
sock.asyncRead(boost::bind(&callback, _1, &io_service_, &callback_called,
false), recv_buf, sizeof(recv_buf));
EXPECT_EQ(3, write(sock_pair_[1].get(), recv_buf, 3));
// open another pair of sockets so we can stop the IO service after run.
int socks[2];
char ch = 0;
EXPECT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, socks));
ScopedSocket aux_sockpair[2];
aux_sockpair[0].set(socks[0]);
aux_sockpair[1].set(socks[1]);
LocalSocket aux_sock(io_service_, aux_sockpair[0].get());
aux_sockpair[0].release(); // on successful construction we should release
bool aux_callback_called = false;
aux_sock.asyncRead(boost::bind(&callback, _1, &io_service_,
&aux_callback_called, false), &ch, 1);
EXPECT_EQ(1, write(aux_sockpair[1].get(), &ch, 1));
// run the IO service, it will soon be stopped via the auxiliary callback.
// the main callback shouldn't be called.
io_service_.run();
EXPECT_FALSE(callback_called);
EXPECT_TRUE(aux_callback_called);
}
TEST_F(LocalSocketTest, asyncReadError) {
const int sock_fd = sock_pair_[0].release();
LocalSocket sock(io_service_, sock_fd);
bool callback_called = false;
read_buf_.resize(1);
read_buf_.at(0) = 53; // dummy data to check it later
const char ch = 35; // send different data to the read socket with data
EXPECT_EQ(1, write(sock_pair_[1].get(), &ch, 1));
close(sock_fd); // invalidate the read socket
// we'll get callback with an error (e.g. 'bad file descriptor)
sock.asyncRead(boost::bind(&callback, _1, &io_service_, &callback_called,
true), &read_buf_[0], 1);
io_service_.run();
EXPECT_TRUE(callback_called);