Commit 55eb7266 authored by Marcin Siodelski's avatar Marcin Siodelski
Browse files

[#491,!363] HTTP listner associates ASIO handlers with transactions.

parent d409b8c2
......@@ -23,6 +23,7 @@ libkea_http_la_SOURCES += http_message_parser_base.cc http_message_parser_base.h
libkea_http_la_SOURCES += http_messages.cc http_messages.h
libkea_http_la_SOURCES += http_types.h
libkea_http_la_SOURCES += listener.cc listener.h
libkea_http_la_SOURCES += listener_impl.cc listener_impl.h
libkea_http_la_SOURCES += post_request.cc post_request.h
libkea_http_la_SOURCES += post_request_json.cc post_request_json.h
libkea_http_la_SOURCES += request.cc request.h
......@@ -99,6 +100,7 @@ libkea_http_include_HEADERS = \
http_messages.h \
http_types.h \
listener.h \
listener_impl.h \
post_request.h \
post_request_json.h \
request.h \
......
// Copyright (C) 2017-2018 Internet Systems Consortium, Inc. ("ISC")
// Copyright (C) 2017-2019 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
......@@ -12,6 +12,7 @@
#include <http/http_log.h>
#include <http/http_messages.h>
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
using namespace isc::asiolink;
......@@ -27,6 +28,30 @@ constexpr size_t MAX_LOGGED_MESSAGE_SIZE = 1024;
namespace isc {
namespace http {
HttpConnection::Transaction::Transaction(const HttpResponseCreatorPtr& response_creator,
const HttpRequestPtr& request)
: request_(request ? request : response_creator->createNewHttpRequest()),
parser_(new HttpRequestParser(*request_)),
input_buf_(),
output_buf_() {
parser_->initModel();
}
HttpConnection::TransactionPtr
HttpConnection::Transaction::create(const HttpResponseCreatorPtr& response_creator) {
return (boost::make_shared<Transaction>(response_creator));
}
HttpConnection::TransactionPtr
HttpConnection::Transaction::spawn(const HttpResponseCreatorPtr& response_creator,
const TransactionPtr& transaction) {
if (transaction) {
return (boost::make_shared<Transaction>(response_creator,
transaction->getRequest()));
}
return (create(response_creator));
}
void
HttpConnection::
SocketCallback::operator()(boost::system::error_code ec, size_t length) {
......@@ -36,13 +61,13 @@ SocketCallback::operator()(boost::system::error_code ec, size_t length) {
callback_(ec, length);
}
HttpConnection:: HttpConnection(asiolink::IOService& io_service,
HttpAcceptor& acceptor,
HttpConnectionPool& connection_pool,
const HttpResponseCreatorPtr& response_creator,
const HttpAcceptorCallback& callback,
const long request_timeout,
const long idle_timeout)
HttpConnection::HttpConnection(asiolink::IOService& io_service,
HttpAcceptor& acceptor,
HttpConnectionPool& connection_pool,
const HttpResponseCreatorPtr& response_creator,
const HttpAcceptorCallback& callback,
const long request_timeout,
const long idle_timeout)
: request_timer_(io_service),
request_timeout_(request_timeout),
idle_timeout_(idle_timeout),
......@@ -50,12 +75,7 @@ HttpConnection:: HttpConnection(asiolink::IOService& io_service,
acceptor_(acceptor),
connection_pool_(connection_pool),
response_creator_(response_creator),
request_(response_creator_->createNewHttpRequest()),
parser_(new HttpRequestParser(*request_)),
acceptor_callback_(callback),
buf_(),
output_buf_() {
parser_->initModel();
acceptor_callback_(callback) {
}
HttpConnection::~HttpConnection() {
......@@ -98,17 +118,26 @@ HttpConnection::asyncAccept() {
}
void
HttpConnection::doRead() {
HttpConnection::doRead(TransactionPtr transaction) {
try {
TCPEndpoint endpoint;
// Transaction is was not created if we are starting to read the
// new request.
if (!transaction) {
transaction = Transaction::create(response_creator_);
}
// Create instance of the callback. It is safe to pass the local instance
// of the callback, because the underlying boost functions make copies
// as needed.
SocketCallback cb(boost::bind(&HttpConnection::socketReadCallback,
shared_from_this(),
transaction,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
socket_.asyncReceive(static_cast<void*>(buf_.data()), buf_.size(),
socket_.asyncReceive(static_cast<void*>(transaction->getInputBufData()),
transaction->getInputBufSize(),
0, &endpoint, cb);
} catch (...) {
......@@ -117,25 +146,34 @@ HttpConnection::doRead() {
}
void
HttpConnection::doWrite() {
HttpConnection::doWrite(HttpConnection::TransactionPtr transaction) {
try {
if (!output_buf_.empty()) {
if (transaction->outputDataAvail()) {
// Create instance of the callback. It is safe to pass the local instance
// of the callback, because the underlying boost functions make copies
// as needed.
SocketCallback cb(boost::bind(&HttpConnection::socketWriteCallback,
shared_from_this(),
transaction,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
socket_.asyncSend(output_buf_.data(),
output_buf_.length(),
socket_.asyncSend(transaction->getOutputBufData(),
transaction->getOutputBufSize(),
cb);
} else {
if (!request_->isPersistent()) {
// The isPersistent() function may throw if the request hasn't
// been created, i.e. the HTTP headers weren't parsed. We catch
// this exception below and close the connection since we're
// unable to tell if the connection should remain persistent
// or not. The default is to close it.
if (!transaction->getRequest()->isPersistent()) {
stopThisConnection();
} else {
reinitProcessingState();
// The connection is persistent and we are done sending
// the previous response. Start listening for the next
// requests.
setupIdleTimer();
doRead();
}
}
......@@ -145,9 +183,10 @@ HttpConnection::doWrite() {
}
void
HttpConnection::asyncSendResponse(const ConstHttpResponsePtr& response) {
output_buf_ = response->toString();
doWrite();
HttpConnection::asyncSendResponse(const ConstHttpResponsePtr& response,
TransactionPtr transaction) {
transaction->setOutputBuf(response->toString());
doWrite(transaction);
}
......@@ -175,7 +214,8 @@ HttpConnection::acceptorCallback(const boost::system::error_code& ec) {
}
void
HttpConnection::socketReadCallback(boost::system::error_code ec, size_t length) {
HttpConnection::socketReadCallback(HttpConnection::TransactionPtr transaction,
boost::system::error_code ec, size_t length) {
if (ec) {
// IO service has been stopped and the connection is probably
// going to be shutting down.
......@@ -198,7 +238,7 @@ HttpConnection::socketReadCallback(boost::system::error_code ec, size_t length)
}
// Receiving is in progress, so push back the timeout.
setupRequestTimer();
setupRequestTimer(transaction);
if (length != 0) {
LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_DETAIL_DATA,
......@@ -206,17 +246,20 @@ HttpConnection::socketReadCallback(boost::system::error_code ec, size_t length)
.arg(length)
.arg(getRemoteEndpointAddressAsText());
std::string s(&buf_[0], buf_[0] + length);
parser_->postBuffer(static_cast<void*>(buf_.data()), length);
parser_->poll();
transaction->getParser()->postBuffer(static_cast<void*>(transaction->getInputBufData()),
length);
transaction->getParser()->poll();
}
if (parser_->needData()) {
doRead();
if (transaction->getParser()->needData()) {
// The parser indicates that the some part of the message being
// received is still missing, so continue to read.
doRead(transaction);
} else {
try {
request_->finalize();
// The whole message has been received, so let's finalize it.
transaction->getRequest()->finalize();
LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC,
HTTP_CLIENT_REQUEST_RECEIVED)
......@@ -225,7 +268,7 @@ HttpConnection::socketReadCallback(boost::system::error_code ec, size_t length)
LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC_DATA,
HTTP_CLIENT_REQUEST_RECEIVED_DETAILS)
.arg(getRemoteEndpointAddressAsText())
.arg(parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE));
.arg(transaction->getParser()->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE));
} catch (const std::exception& ex) {
LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC,
......@@ -236,13 +279,15 @@ HttpConnection::socketReadCallback(boost::system::error_code ec, size_t length)
LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC_DATA,
HTTP_BAD_CLIENT_REQUEST_RECEIVED_DETAILS)
.arg(getRemoteEndpointAddressAsText())
.arg(parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE));
.arg(transaction->getParser()->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE));
}
// Don't want to timeout if creation of the response takes long.
request_timer_.cancel();
HttpResponsePtr response = response_creator_->createHttpResponse(request_);
// Create the response from the received request using the custom
// response creator.
HttpResponsePtr response = response_creator_->createHttpResponse(transaction->getRequest());
LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC,
HTTP_SERVER_RESPONSE_SEND)
.arg(response->toBriefString())
......@@ -254,15 +299,17 @@ HttpConnection::socketReadCallback(boost::system::error_code ec, size_t length)
.arg(HttpMessageParserBase::logFormatHttpMessage(response->toString(),
MAX_LOGGED_MESSAGE_SIZE));
// Response created. Active timer again.
setupRequestTimer();
// Response created. Activate the timer again.
setupRequestTimer(transaction);
asyncSendResponse(response);
// Start sending the response.
asyncSendResponse(response, transaction);
}
}
void
HttpConnection::socketWriteCallback(boost::system::error_code ec, size_t length) {
HttpConnection::socketWriteCallback(HttpConnection::TransactionPtr transaction,
boost::system::error_code ec, size_t length) {
if (ec) {
// IO service has been stopped and the connection is probably
// going to be shutting down.
......@@ -279,49 +326,41 @@ HttpConnection::socketWriteCallback(boost::system::error_code ec, size_t length)
// read something from the socket on the next attempt.
} else {
// Sending is in progress, so push back the timeout.
setupRequestTimer();
setupRequestTimer(transaction);
doWrite();
doWrite(transaction);
}
}
// Since each transaction has its own output buffer, it is not really
// possible that the number of bytes written is larger than the size
// of the buffer. But, let's be safe and set the length to the size
// of the buffer if that unexpected condition occurs.
if (length > transaction->getOutputBufSize()) {
length = transaction->getOutputBufSize();
}
if (length <= output_buf_.size()) {
if (length <= transaction->getOutputBufSize()) {
// Sending is in progress, so push back the timeout.
setupRequestTimer();
output_buf_.erase(0, length);
doWrite();
} else {
output_buf_.clear();
if (!request_->isPersistent()) {
stopThisConnection();
} else {
reinitProcessingState();
doRead();
}
setupRequestTimer(transaction);
}
}
void
HttpConnection::reinitProcessingState() {
request_ = response_creator_->createNewHttpRequest();
parser_.reset(new HttpRequestParser(*request_));
parser_->initModel();
setupIdleTimer();
// Eat the 'length' number of bytes from the output buffer and only
// leave the part of the response that hasn't been sent.
transaction->consumeOutputBuf(length);
// Schedule the write of the unsent data.
doWrite(transaction);
}
void
HttpConnection::setupRequestTimer() {
HttpConnection::setupRequestTimer(TransactionPtr transaction) {
// Pass raw pointer rather than shared_ptr to this object,
// because IntervalTimer already passes shared pointer to the
// IntervalTimerImpl to make sure that the callback remains
// valid.
request_timer_.setup(boost::bind(&HttpConnection::requestTimeoutCallback,
this),
this, transaction),
request_timeout_, IntervalTimer::ONE_SHOT);
}
......@@ -333,14 +372,39 @@ HttpConnection::setupIdleTimer() {
}
void
HttpConnection::requestTimeoutCallback() {
HttpConnection::requestTimeoutCallback(TransactionPtr transaction) {
LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_DETAIL,
HTTP_CLIENT_REQUEST_TIMEOUT_OCCURRED)
.arg(getRemoteEndpointAddressAsText());
// We need to differentiate the transactions between a normal response and the
// timeout. We create new transaction from the current transaction. It is
// to preserve the request we're responding to.
transaction = Transaction::spawn(response_creator_, transaction);
// The new transaction inherits the request from the original transaction
// if such transaction exists.
auto request = transaction->getRequest();
// Depending on when the timeout occured, the HTTP version of the request
// may or may not be available. Therefore we check if the HTTP version is
// set in the request. If it is not available, we need to create a dummy
// request with the default HTTP/1.0 version. This version will be used
// in the response.
if (request->context()->http_version_major_ == 0) {
request.reset(new HttpRequest(HttpRequest::Method::HTTP_POST, "/",
HttpVersion::HTTP_10(),
HostHttpHeader("dummy")));
request->finalize();
}
// Create the timeout response.
HttpResponsePtr response =
response_creator_->createStockHttpResponse(request_,
response_creator_->createStockHttpResponse(request,
HttpStatusCode::REQUEST_TIMEOUT);
asyncSendResponse(response);
// Send the HTTP 408 status.
asyncSendResponse(response, transaction);
}
void
......
// Copyright (C) 2017-2018 Internet Systems Consortium, Inc. ("ISC")
// Copyright (C) 2017-2019 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
......@@ -78,6 +78,136 @@ private:
SocketCallbackFunction callback_;
};
protected:
class Transaction;
/// @brief Shared pointer to the @c Transaction.
typedef boost::shared_ptr<Transaction> TransactionPtr;
/// @brief Represents a single exchange of the HTTP messages.
///
/// In HTTP/1.1 multiple HTTP message exchanges may be conducted
/// over the same persistent connection before the connection is
/// closed. Since ASIO handlers for these exchanges may be sometimes
/// executed out of order, there is a need to associate the states of
/// the exchanges with the appropriate ASIO handlers. This object
/// represents such state and includes: received request, request
/// parser (being in the particular state of parsing), input buffer
/// and the output buffer.
///
/// A pointer to the @c Transaction object is passed to the ASIO
/// callbacks when the new message exchange begins. It is passed
/// between the callbacks until the message exchange is completed.
class Transaction {
public:
/// @brief Constructor.
///
/// @param response_creator Pointer to the response creator being
/// used for generating a response from the request.
/// @param request Pointer to the HTTP request. If the request is
/// null, the constructor creates new request instance using the
/// provided response creator.
Transaction(const HttpResponseCreatorPtr& response_creator,
const HttpRequestPtr& request = HttpRequestPtr());
/// @brief Creates new transaction instance.
///
/// It is called when the HTTP server has just scheduled asynchronous
/// reading of the HTTP message.
///
/// @param response_creator Pointer to the response creator to be passed
/// to the transaction's constructor.
///
/// @return Pointer to the created transaction instance.
static TransactionPtr create(const HttpResponseCreatorPtr& response_creator);
/// @brief Creates new transaction from the current transaction.
///
/// This method creates new transaction and inherits the request
/// from the existing transaction. This is used when the timeout
/// occurs during the messages exchange. The server creates the new
/// transaction to handle the timeout but this new transaction must
/// include the request instance so as HTTP version information can
/// be inferred from it while sending the timeout response. The
/// HTTP version information should match between the request and
/// the response.
///
/// @param response_creator Pointer to the response creator.
/// @param transaction Existing transaction from which the request
/// should be inherited. If the transaction is null, the new (dummy)
/// request is created for the new transaction.
static TransactionPtr spawn(const HttpResponseCreatorPtr& response_creator,
const TransactionPtr& transaction);
/// @brief Returns request instance associated with the transaction.
HttpRequestPtr getRequest() const {
return (request_);
}
/// @brief Returns parser instance associated with the transaction.
HttpRequestParserPtr getParser() const {
return (parser_);
}
/// @brief Returns pointer to the first byte of the input buffer.
char* getInputBufData() {
return (input_buf_.data());
}
/// @brief Returns input buffer size.
size_t getInputBufSize() const {
return (input_buf_.size());
}
/// @brief Checks if the output buffer contains some data to be
/// sent.
///
/// @return true if the output buffer contains data to be sent,
/// false otherwise.
bool outputDataAvail() const {
return (!output_buf_.empty());
}
/// @brief Returns pointer to the first byte of the output buffer.
const char* getOutputBufData() const {
return (output_buf_.data());
}
/// @brief Returns size of the output buffer.
size_t getOutputBufSize() const {
return (output_buf_.size());
}
/// @brief Replaces output buffer contents with new contents.
///
/// @param response New contents for the output buffer.
void setOutputBuf(const std::string& response) {
output_buf_ = response;
}
/// @brief Erases n bytes from the beginning of the output buffer.
///
/// @param length Number of bytes to be erased.
void consumeOutputBuf(const size_t length) {
output_buf_.erase(0, length);
}
private:
/// @brief Pointer to the request received over this connection.
HttpRequestPtr request_;
/// @brief Pointer to the HTTP request parser.
HttpRequestParserPtr parser_;
/// @brief Buffer for received data.
std::array<char, 32768> input_buf_;
/// @brief Buffer used for outbound data.
std::string output_buf_;
};
public:
......@@ -105,7 +235,7 @@ public:
/// @brief Destructor.
///
/// Closes current connection.
~HttpConnection();
virtual ~HttpConnection();
/// @brief Asynchronously accepts new connection.
///
......@@ -124,23 +254,32 @@ public:
/// HTTP response using supplied response creator object.
///
/// In case of error the connection is stopped.
void doRead();
///
/// @param transaction Pointer to the transaction for which the read
/// operation should be performed. It defaults to null pointer which
/// indicates that this function should create new transaction.
void doRead(TransactionPtr transaction = TransactionPtr());
private:
protected:
/// @brief Starts asynchronous write to the socket.
///
/// The @c output_buf_ must contain the data to be sent.
///
/// In case of error the connection is stopped.
void doWrite();
///
/// @param transaction Pointer to the transaction for which the write
/// operation should be performed.
void doWrite(TransactionPtr transaction);
/// @brief Sends HTTP response asynchronously.
///
/// Internally it calls @ref HttpConnection::doWrite to send the data.
///
/// @param response Pointer to the HTTP response to be sent.
void asyncSendResponse(const ConstHttpResponsePtr& response);
/// @param transaction Pointer to the transaction.
void asyncSendResponse(const ConstHttpResponsePtr& response,
TransactionPtr transaction);
/// @brief Local callback invoked when new connection is accepted.
///
......@@ -157,28 +296,28 @@ private:
/// parsing. When the parser signals end of the HTTP request the callback
/// prepares a response and starts asynchronous send over the socket.
///
/// @param transaction Pointer to the transaction for which the callback
/// is invoked.
/// @param ec Error code.
/// @param length Length of the received data.
void socketReadCallback(boost::system::error_code ec,
void socketReadCallback(TransactionPtr transaction,
boost::system::error_code ec,
size_t length);
/// @brief Callback invoked when data is sent over the socket.
///
/// @param transaction Pointer to the transaction for which the callback
/// is invoked.
/// @param ec Error code.
/// @param length Length of the data sent.
void socketWriteCallback(boost::system::error_code ec,
size_t length);
/// @brief Reinitializes request processing state after sending a response.
///
/// This method is only called for persistent connections, when the response
/// to a previous command has been sent. It initializes the state machine to
/// be able to process the next request. It also sets the persistent connection
/// idle timer to monitor the connection timeout.
void reinitProcessingState();
virtual void socketWriteCallback(TransactionPtr transaction,
boost::system::error_code ec,
size_t length);
/// @brief Reset timer for detecting request timeouts.
void setupRequestTimer();
///
/// @param transaction Pointer to the transaction to be guarded by the timeout.
void setupRequestTimer(TransactionPtr transaction = TransactionPtr());
/// @brief Reset timer for detecing idle timeout in persistent connections.
void setupIdleTimer();
......@@ -187,7 +326,9 @@ private:
///
/// This callback creates HTTP response with Request Timeout error code
/// and sends it to the client.
void requestTimeoutCallback();
///
/// @param transaction Pointer to the transaction for which timeout occurs.
void requestTimeoutCallback(TransactionPtr transaction);
void idleTimeoutCallback();
......@@ -220,20 +361,8 @@ private:
/// HTTP responses.
HttpResponseCreatorPtr response_creator_;
/// @brief Pointer to the request received over this connection.
HttpRequestPtr request_;
/// @brief Pointer to the HTTP request parser.
HttpRequestParserPtr parser_;
/// @brief External TCP acceptor callback.
HttpAcceptorCallback acceptor_callback_;