Commit 7e8df799 authored by Marcin Siodelski's avatar Marcin Siodelski
Browse files

[master] Merge branch 'trac5099'

parents b4de5ae3 2835b3c5
// Copyright (C) 2011-2016 Internet Systems Consortium, Inc. ("ISC")
// Copyright (C) 2011-2017 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
......@@ -110,9 +110,24 @@ public:
/// \param endpoint Target of the send. (Unused for a TCP socket because
/// that was determined when the connection was opened.)
/// \param callback Callback object.
/// \throw BufferTooLarge on attempt to send a buffer larger than 64kB.
virtual void asyncSend(const void* data, size_t length,
const IOEndpoint* endpoint, C& callback);
/// \brief Send Asynchronously without count.
///
/// This variant of the method sends data over the TCP socket without
/// preceding the data with a data count. Eventually, we should migrate
/// the virtual method to not insert the count but there are existing
/// classes using the count. Once this migration is done, the existing
/// virtual method should be replaced by this method.
///
/// \param data Data to send
/// \param length Length of data to send
/// \param callback Callback object.
/// \throw BufferTooLarge on attempt to send a buffer larger than 64kB.
void asyncSend(const void* data, size_t length, C& callback);
/// \brief Receive Asynchronously
///
/// Calls the underlying socket's async_receive() method to read a packet
......@@ -166,7 +181,6 @@ private:
// construction, or where it is asked to manage its own socket.
boost::asio::ip::tcp::socket* socket_ptr_; ///< Pointer to own socket
boost::asio::ip::tcp::socket& socket_; ///< Socket
bool isopen_; ///< true when socket is open
// TODO: Remove temporary buffer
// The current implementation copies the buffer passed to asyncSend() into
......@@ -188,7 +202,7 @@ private:
template <typename C>
TCPSocket<C>::TCPSocket(boost::asio::ip::tcp::socket& socket) :
socket_ptr_(NULL), socket_(socket), isopen_(true), send_buffer_()
socket_ptr_(NULL), socket_(socket), send_buffer_()
{
}
......@@ -197,7 +211,7 @@ TCPSocket<C>::TCPSocket(boost::asio::ip::tcp::socket& socket) :
template <typename C>
TCPSocket<C>::TCPSocket(IOService& service) :
socket_ptr_(new boost::asio::ip::tcp::socket(service.get_io_service())),
socket_(*socket_ptr_), isopen_(false)
socket_(*socket_ptr_)
{
}
......@@ -217,14 +231,13 @@ TCPSocket<C>::open(const IOEndpoint* endpoint, C& callback) {
// Ignore opens on already-open socket. Don't throw a failure because
// of uncertainties as to what precedes whan when using asynchronous I/O.
// At also allows us a treat a passed-in socket as a self-managed socket.
if (!isopen_) {
if (!socket_.is_open()) {
if (endpoint->getFamily() == AF_INET) {
socket_.open(boost::asio::ip::tcp::v4());
}
else {
socket_.open(boost::asio::ip::tcp::v6());
}
isopen_ = true;
// Set options on the socket:
......@@ -250,11 +263,35 @@ TCPSocket<C>::open(const IOEndpoint* endpoint, C& callback) {
// Send a message. Should never do this if the socket is not open, so throw
// an exception if this is the case.
template <typename C> void
TCPSocket<C>::asyncSend(const void* data, size_t length, C& callback)
{
if (socket_.is_open()) {
try {
send_buffer_.reset(new isc::util::OutputBuffer(length));
send_buffer_->writeData(data, length);
// Send the data.
socket_.async_send(boost::asio::buffer(send_buffer_->getData(),
send_buffer_->getLength()),
callback);
} catch (boost::numeric::bad_numeric_cast&) {
isc_throw(BufferTooLarge,
"attempt to send buffer larger than 64kB");
}
} else {
isc_throw(SocketNotOpen,
"attempt to send on a TCP socket that is not open");
}
}
template <typename C> void
TCPSocket<C>::asyncSend(const void* data, size_t length,
const IOEndpoint*, C& callback)
{
if (isopen_) {
if (socket_.is_open()) {
// Need to copy the data into a temporary buffer and precede it with
// a two-byte count field.
......@@ -289,7 +326,7 @@ template <typename C> void
TCPSocket<C>::asyncReceive(void* data, size_t length, size_t offset,
IOEndpoint* endpoint, C& callback)
{
if (isopen_) {
if (socket_.is_open()) {
// Upconvert to a TCPEndpoint. We need to do this because although
// IOEndpoint is the base class of UDPEndpoint and TCPEndpoint, it
// does not contain a method for getting at the underlying endpoint
......@@ -391,7 +428,7 @@ TCPSocket<C>::processReceivedData(const void* staging, size_t length,
template <typename C> void
TCPSocket<C>::cancel() {
if (isopen_) {
if (socket_.is_open()) {
socket_.cancel();
}
}
......@@ -401,9 +438,8 @@ TCPSocket<C>::cancel() {
template <typename C> void
TCPSocket<C>::close() {
if (isopen_ && socket_ptr_) {
if (socket_.is_open() && socket_ptr_) {
socket_.close();
isopen_ = false;
}
}
......
......@@ -22,10 +22,14 @@ EXTRA_DIST = http_messages.mes
CLEANFILES = *.gcno *.gcda http_messages.h http_messages.cc s-messages
lib_LTLIBRARIES = libkea-http.la
libkea_http_la_SOURCES = date_time.cc date_time.h
libkea_http_la_SOURCES = connection.cc connection.h
libkea_http_la_SOURCES += connection_pool.cc connection_pool.h
libkea_http_la_SOURCES += date_time.cc date_time.h
libkea_http_la_SOURCES += http_log.cc http_log.h
libkea_http_la_SOURCES += header_context.h
libkea_http_la_SOURCES += http_acceptor.h
libkea_http_la_SOURCES += http_types.h
libkea_http_la_SOURCES += listener.cc listener.h
libkea_http_la_SOURCES += post_request.cc post_request.h
libkea_http_la_SOURCES += post_request_json.cc post_request_json.h
libkea_http_la_SOURCES += request.cc request.h
......@@ -33,6 +37,7 @@ libkea_http_la_SOURCES += request_context.h
libkea_http_la_SOURCES += request_parser.cc request_parser.h
libkea_http_la_SOURCES += response.cc response.h
libkea_http_la_SOURCES += response_creator.cc response_creator.h
libkea_http_la_SOURCES += response_creator_factory.h
libkea_http_la_SOURCES += response_json.cc response_json.h
nodist_libkea_http_la_SOURCES = http_messages.cc http_messages.h
......
// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <asiolink/asio_wrapper.h>
#include <http/connection.h>
#include <http/connection_pool.h>
#include <boost/bind.hpp>
#include <iostream>
using namespace isc::asiolink;
namespace isc {
namespace http {
void
HttpConnection::
SocketCallback::operator()(boost::system::error_code ec, size_t length) {
if (ec.value() == boost::asio::error::operation_aborted) {
return;
}
callback_(ec, length);
}
HttpConnection:: HttpConnection(asiolink::IOService& io_service,
HttpAcceptor& acceptor,
HttpConnectionPool& connection_pool,
const HttpResponseCreatorPtr& response_creator,
const HttpAcceptorCallback& callback,
const long request_timeout)
: request_timer_(io_service),
request_timeout_(request_timeout),
socket_(io_service),
socket_callback_(boost::bind(&HttpConnection::socketReadCallback, this,
_1, _2)),
socket_write_callback_(boost::bind(&HttpConnection::socketWriteCallback,
this, _1, _2)),
acceptor_(acceptor),
connection_pool_(connection_pool),
response_creator_(response_creator),
request_(response_creator_->createNewHttpRequest()),
parser_(new HttpRequestParser(*request_)),
acceptor_callback_(callback),
buf_() {
parser_->initModel();
}
HttpConnection::~HttpConnection() {
close();
}
void
HttpConnection::close() {
socket_.close();
}
void
HttpConnection::stopThisConnection() {
try {
connection_pool_.stop(shared_from_this());
} catch (...) {
}
}
void
HttpConnection::asyncAccept() {
HttpAcceptorCallback cb = boost::bind(&HttpConnection::acceptorCallback,
this, _1);
try {
acceptor_.asyncAccept(socket_, cb);
} catch (const std::exception& ex) {
isc_throw(HttpConnectionError, "unable to start accepting TCP "
"connections: " << ex.what());
}
}
void
HttpConnection::doRead() {
try {
TCPEndpoint endpoint;
socket_.asyncReceive(static_cast<void*>(buf_.data()), buf_.size(),
0, &endpoint, socket_callback_);
} catch (const std::exception& ex) {
stopThisConnection();
}
}
void
HttpConnection::doWrite() {
try {
if (!output_buf_.empty()) {
socket_.asyncSend(output_buf_.data(),
output_buf_.length(),
socket_write_callback_);
}
} catch (const std::exception& ex) {
stopThisConnection();
}
}
void
HttpConnection::asyncSendResponse(const ConstHttpResponsePtr& response) {
output_buf_ = response->toString();
doWrite();
}
void
HttpConnection::acceptorCallback(const boost::system::error_code& ec) {
if (!acceptor_.isOpen()) {
return;
}
if (ec) {
stopThisConnection();
}
acceptor_callback_(ec);
if (!ec) {
request_timer_.setup(boost::bind(&HttpConnection::requestTimeoutCallback, this),
request_timeout_, IntervalTimer::ONE_SHOT);
doRead();
}
}
void
HttpConnection::socketReadCallback(boost::system::error_code ec, size_t length) {
std::string s(&buf_[0], buf_[0] + length);
parser_->postBuffer(static_cast<void*>(buf_.data()), length);
parser_->poll();
if (parser_->needData()) {
doRead();
} else {
try {
request_->finalize();
} catch (...) {
}
HttpResponsePtr response = response_creator_->createHttpResponse(request_);
asyncSendResponse(response);
}
}
void
HttpConnection::socketWriteCallback(boost::system::error_code ec,
size_t length) {
if (length <= output_buf_.size()) {
output_buf_.erase(0, length);
doWrite();
} else {
output_buf_.clear();
}
}
void
HttpConnection::requestTimeoutCallback() {
HttpResponsePtr response =
response_creator_->createStockHttpResponse(request_,
HttpStatusCode::REQUEST_TIMEOUT);
asyncSendResponse(response);
}
} // end of namespace isc::http
} // end of namespace isc
// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#ifndef HTTP_CONNECTION_H
#define HTTP_CONNECTION_H
#include <asiolink/interval_timer.h>
#include <asiolink/io_service.h>
#include <http/http_acceptor.h>
#include <http/request_parser.h>
#include <http/response_creator_factory.h>
#include <boost/function.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/system/error_code.hpp>
#include <boost/shared_ptr.hpp>
#include <array>
namespace isc {
namespace http {
/// @brief Generic error reported within @ref HttpConnection class.
class HttpConnectionError : public Exception {
public:
HttpConnectionError(const char* file, size_t line, const char* what) :
isc::Exception(file, line, what) { };
};
/// @brief Forward declaration to the @ref HttpConnectionPool.
///
/// This declaration is needed because we don't include the header file
/// declaring @ref HttpConnectionPool to avoid circular inclusion.
class HttpConnectionPool;
class HttpConnection;
/// @brief Pointer to the @ref HttpConnection.
typedef boost::shared_ptr<HttpConnection> HttpConnectionPtr;
/// @brief Accepts and handles a single HTTP connection.
class HttpConnection : public boost::enable_shared_from_this<HttpConnection> {
private:
/// @brief Type of the function implementing a callback invoked by the
/// @c SocketCallback functor.
typedef boost::function<void(boost::system::error_code ec, size_t length)>
SocketCallbackFunction;
/// @brief Functor associated with the socket object.
///
/// This functor calls a callback function specified in the constructor.
class SocketCallback {
public:
/// @brief Constructor.
///
/// @param socket_callback Callback to be invoked by the functor upon
/// an event associated with the socket.
SocketCallback(SocketCallbackFunction socket_callback)
: callback_(socket_callback) {
}
/// @brief Operator called when event associated with a socket occurs.
///
/// This operator returns immediately when received error code is
/// @c boost::system::error_code is equal to
/// @c boost::asio::error::operation_aborted, i.e. the callback is not
/// invoked.
///
/// @param ec Error code.
/// @param length Data length.
void operator()(boost::system::error_code ec, size_t length = 0);
private:
/// @brief Supplied callback.
SocketCallbackFunction callback_;
};
public:
/// @brief Constructor.
///
/// @param io_service IO service to be used by the connection.
/// @param acceptor Reference to the TCP acceptor object used to listen for
/// new HTTP connections.
/// @param connection_pool Connection pool in which this connection is
/// stored.
/// @param response_creator Pointer to the response creator object used to
/// create HTTP response from the HTTP request received.
/// @param callback Callback invoked when new connection is accepted.
/// @param request_timeout Configured timeout for a HTTP request.
HttpConnection(asiolink::IOService& io_service,
HttpAcceptor& acceptor,
HttpConnectionPool& connection_pool,
const HttpResponseCreatorPtr& response_creator,
const HttpAcceptorCallback& callback,
const long request_timeout);
/// @brief Destructor.
///
/// Closes current connection.
~HttpConnection();
/// @brief Asynchronously accepts new connection.
///
/// When the connection is established successfully, the timeout timer is
/// setup and the asynchronous read from the socket is started.
void asyncAccept();
/// @brief Closes the socket.
void close();
/// @brief Starts asynchronous read from the socket.
///
/// The data received over the socket are supplied to the HTTP parser until
/// the parser signals that the entire request has been received or until
/// the parser signals an error. In the former case the server creates an
/// HTTP response using supplied response creator object.
///
/// In case of error the connection is stopped.
void doRead();
private:
/// @brief Starts asynchronous write to the socket.
///
/// The @c output_buf_ must contain the data to be sent.
///
/// In case of error the connection is stopped.
void doWrite();
/// @brief Sends HTTP response asynchronously.
///
/// Internally it calls @ref HttpConnection::doWrite to send the data.
///
/// @param response Pointer to the HTTP response to be sent.
void asyncSendResponse(const ConstHttpResponsePtr& response);
/// @brief Local callback invoked when new connection is accepted.
///
/// It invokes external (supplied via constructor) acceptor callback. If
/// the acceptor is not opened it returns immediately. If the connection
/// is accepted successfully the @ref HttpConnection::doRead is called.
///
/// @param ec Error code.
void acceptorCallback(const boost::system::error_code& ec);
/// @brief Callback invoked when new data is received over the socket.
///
/// This callback supplies the data to the HTTP parser and continues
/// parsing. When the parser signals end of the HTTP request the callback
/// prepares a response and starts asynchronous send over the socket.
///
/// @param ec Error code.
/// @param length Length of the received data.
void socketReadCallback(boost::system::error_code ec,
size_t length);
/// @brief Callback invoked when data is sent over the socket.
///
/// @param ec Error code.
/// @param length Length of the data sent.
void socketWriteCallback(boost::system::error_code ec,
size_t length);
/// @brief Callback invoked when the HTTP Request Timeout occurs.
///
/// This callback creates HTTP response with Request Timeout error code
/// and sends it to the client.
void requestTimeoutCallback();
/// @brief Stops current connection.
void stopThisConnection();
/// @brief Timer used to detect Request Timeout.
asiolink::IntervalTimer request_timer_;
/// @brief Configured Request Timeout in milliseconds.
long request_timeout_;
/// @brief Socket used by this connection.
asiolink::TCPSocket<SocketCallback> socket_;
/// @brief Callback invoked when data received over the socket.
SocketCallback socket_callback_;
/// @brief Callback invoked when data sent over the socket.
SocketCallback socket_write_callback_;
/// @brief Reference to the TCP acceptor used to accept new connections.
HttpAcceptor& acceptor_;
/// @brief Connection pool holding this connection.
HttpConnectionPool& connection_pool_;
/// @brief Pointer to the @ref HttpResponseCreator object used to create
/// HTTP responses.
HttpResponseCreatorPtr response_creator_;
/// @brief Pointer to the request received over this connection.
HttpRequestPtr request_;
/// @brief Pointer to the HTTP request parser.
HttpRequestParserPtr parser_;
/// @brief External TCP acceptor callback.
HttpAcceptorCallback acceptor_callback_;
/// @brief Buffer for received data.
std::array<char, 4096> buf_;
/// @brief Buffer used for outbound data.
std::string output_buf_;
};
} // end of namespace isc::http
} // end of namespace isc
#endif
// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <asiolink/asio_wrapper.h>
#include <http/connection_pool.h>
namespace isc {
namespace http {
void
HttpConnectionPool::start(const HttpConnectionPtr& connection) {
connections_.insert(connections_.end(), connection);
connection->asyncAccept();
}
void
HttpConnectionPool::stop(const HttpConnectionPtr& connection) {
connections_.remove(connection);
}
void
HttpConnectionPool::stopAll() {
for (auto connection = connections_.begin();
connection != connections_.end();
++connection) {
(*connection)->close();
}
connections_.clear();
}
}
}
// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#ifndef HTTP_CONNECTION_POOL_H
#define HTTP_CONNECTION_POOL_H
#include <http/connection.h>
#include <list>
namespace isc {
namespace http {
/// @brief Pool of active HTTP connections.
///
/// The HTTP server is designed to handle many connections simultaneously.
/// The communication between the client and the server may take long time
/// and the server must be able to react on other events while the communication
/// with the clients is in progress. Thus, the server must track active
/// connections and gracefully close them when needed. An obvious case when the
/// connections must be terminated by the server is when the shutdown signal
/// is received.
///
/// This object is a simple container for the server connections which provides
/// means to terminate them on request.
class HttpConnectionPool {
public:
/// @brief Start new connection.
///
/// The connection is inserted to the pool and the