Commit 770aeae6 authored by Thomas Markwalder's avatar Thomas Markwalder
Browse files

[#691,!395] Add Connection socket exposure and close_callback handler

Addes close_callback and exposes Connectin's TCP socket to it
and connect_callback.

src/lib/http/client.h b/src/lib/http/client.h
    HttpClient:
        Added second parameter, socket FD, to ConnectHandler
        Added CloseHandler typedef

        asyncSendRequest() - added close_callback parameter

src/lib/http/client.cc
    Connection - added close_callback parameter to all
    methods that accept connect_callback parameter

    Added invocation of close_callback wherever the connection's
    socket is closed.

src/lib/http/tests/server_client_unittests.cc
    TEST_F(HttpClientTest, connectCloseCallbacks) - new test that
    verifies connect and close callback operations
parent 9240a766
......@@ -1327,7 +1327,7 @@ fi
#
# Doesn't seem to be required?
CPPFLAGS="$CPPFLAGS -DBOOST_ASIO_HEADER_ONLY"
#CPPFLAGS="$CPPFLAGS -DBOOST_ASIO_HEADER_ONLY"
#
# Disable threads: they seems to break things on some systems
# As now we use threads in boost ASIO this is commented out...
......
......@@ -124,9 +124,12 @@ public:
/// transaction completes.
/// @param connect_callback Pointer to the callback function to be invoked when
/// the client connects to the server.
/// @param close_callback Pointer to the callback function to be invoked when
/// the client closes the socket to the server.
void doTransaction(const HttpRequestPtr& request, const HttpResponsePtr& response,
const long request_timeout, const HttpClient::RequestHandler& callback,
const HttpClient::ConnectHandler& connect_callback);
const HttpClient::ConnectHandler& connect_callback,
const HttpClient::CloseHandler& close_callback);
/// @brief Closes the socket and cancels the request timer.
void close();
......@@ -265,6 +268,9 @@ private:
/// @brief Identifier of the current transaction.
uint64_t current_transid_;
/// @brief User supplied callback.
HttpClient::CloseHandler close_callback_;
};
/// @brief Shared pointer to the connection.
......@@ -305,6 +311,8 @@ public:
/// @param callback Pointer to the user callback for this request.
/// @param connect_callback Pointer to the user callback invoked when
/// the client connects to the server.
/// @param close_callback Pointer to the user callback invoked when
/// the client closes the connection to the server.
///
/// @return true if the request for the given URL has been retrieved,
/// false if there are no more requests queued for this URL.
......@@ -313,7 +321,8 @@ public:
HttpResponsePtr& response,
long& request_timeout,
HttpClient::RequestHandler& callback,
HttpClient::ConnectHandler& connect_callback) {
HttpClient::ConnectHandler& connect_callback,
HttpClient::CloseHandler& close_callback) {
// Check if there is a queue for this URL. If there is no queue, there
// is no request queued either.
auto it = queue_.find(url);
......@@ -327,6 +336,7 @@ public:
request_timeout = desc.request_timeout_,
callback = desc.callback_;
connect_callback = desc.connect_callback_;
close_callback = desc.close_callback_;
return (true);
}
}
......@@ -349,12 +359,15 @@ public:
/// transaction ends.
/// @param connect_callback Pointer to the user callback to be invoked when the
/// client connects to the server.
/// @param close_callback Pointer to the user callback to be invoked when the
/// client closes the connection to the server.
void queueRequest(const Url& url,
const HttpRequestPtr& request,
const HttpResponsePtr& response,
const long request_timeout,
const HttpClient::RequestHandler& request_callback,
const HttpClient::ConnectHandler& connect_callback) {
const HttpClient::ConnectHandler& connect_callback,
const HttpClient::CloseHandler& close_callback) {
auto it = conns_.find(url);
if (it != conns_.end()) {
ConnectionPtr conn = it->second;
......@@ -364,12 +377,13 @@ public:
queue_[url].push(RequestDescriptor(request, response,
request_timeout,
request_callback,
connect_callback));
connect_callback,
close_callback));
} else {
// Connection is idle, so we can start the transaction.
conn->doTransaction(request, response, request_timeout,
request_callback, connect_callback);
request_callback, connect_callback, close_callback);
}
} else {
......@@ -378,7 +392,7 @@ public:
ConnectionPtr conn(new Connection(io_service_, shared_from_this(),
url));
conn->doTransaction(request, response, request_timeout, request_callback,
connect_callback);
connect_callback, close_callback);
conns_[url] = conn;
}
}
......@@ -434,15 +448,19 @@ private:
/// @param callback Pointer to the user callback.
/// @param connect_callback pointer to the user callback to be invoked
/// when the client connects to the server.
/// @param close_callback pointer to the user callback to be invoked
/// when the client closes the connection to the server.
RequestDescriptor(const HttpRequestPtr& request,
const HttpResponsePtr& response,
const long request_timeout,
const HttpClient::RequestHandler& callback,
const HttpClient::ConnectHandler& connect_callback)
const HttpClient::ConnectHandler& connect_callback,
const HttpClient::CloseHandler& close_callback)
: request_(request), response_(response),
request_timeout_(request_timeout),
callback_(callback),
connect_callback_(connect_callback) {
connect_callback_(connect_callback),
close_callback_(close_callback) {
}
/// @brief Holds pointer to the request.
......@@ -455,6 +473,9 @@ private:
HttpClient::RequestHandler callback_;
/// @brief Holds pointer to the user callback for connect.
HttpClient::ConnectHandler connect_callback_;
/// @brief Holds pointer to the user callback for close.
HttpClient::CloseHandler close_callback_;
};
/// @brief Holds the queue of requests for different URLs.
......@@ -466,7 +487,7 @@ Connection::Connection(IOService& io_service,
const Url& url)
: conn_pool_(conn_pool), url_(url), socket_(io_service), timer_(io_service),
current_request_(), current_response_(), parser_(), current_callback_(),
buf_(), input_buf_(), current_transid_(0) {
buf_(), input_buf_(), current_transid_(0), close_callback_() {
}
Connection::~Connection() {
......@@ -486,13 +507,15 @@ Connection::doTransaction(const HttpRequestPtr& request,
const HttpResponsePtr& response,
const long request_timeout,
const HttpClient::RequestHandler& callback,
const HttpClient::ConnectHandler& connect_callback) {
const HttpClient::ConnectHandler& connect_callback,
const HttpClient::CloseHandler& close_callback) {
try {
current_request_ = request;
current_response_ = response;
parser_.reset(new HttpResponseParser(*current_response_));
parser_->initModel();
current_callback_ = callback;
close_callback_ = close_callback;
// Starting new transaction. Generate new transaction id.
++current_transid_;
......@@ -506,6 +529,9 @@ Connection::doTransaction(const HttpRequestPtr& request,
// data over this socket, when the peer may close the connection. In this
// case we'll need to re-transmit but we don't handle it here.
if (socket_.getASIOSocket().is_open() && !socket_.isUsable()) {
if (close_callback) {
close_callback(socket_.getNative());
}
socket_.close();
}
......@@ -542,6 +568,10 @@ Connection::doTransaction(const HttpRequestPtr& request,
void
Connection::close() {
if (close_callback_) {
close_callback_(socket_.getNative());
}
timer_.cancel();
socket_.close();
resetState();
......@@ -632,10 +662,12 @@ Connection::terminate(const boost::system::error_code& ec,
long request_timeout;
HttpClient::RequestHandler callback;
HttpClient::ConnectHandler connect_callback;
HttpClient::CloseHandler close_callback;
ConnectionPoolPtr conn_pool = conn_pool_.lock();
if (conn_pool && conn_pool->getNextRequest(url_, request, response, request_timeout,
callback, connect_callback)) {
doTransaction(request, response, request_timeout, callback, connect_callback);
callback, connect_callback, close_callback)) {
doTransaction(request, response, request_timeout, callback,
connect_callback, close_callback);
}
}
......@@ -685,7 +717,7 @@ Connection::connectCallback(HttpClient::ConnectHandler connect_callback,
if (connect_callback) {
// If the user defined callback indicates that the connection
// should not be continued.
if (!connect_callback(ec)) {
if (!connect_callback(ec, socket_.getNative())) {
return;
}
}
......@@ -844,7 +876,8 @@ HttpClient::asyncSendRequest(const Url& url, const HttpRequestPtr& request,
const HttpResponsePtr& response,
const HttpClient::RequestHandler& request_callback,
const HttpClient::RequestTimeout& request_timeout,
const HttpClient::ConnectHandler& connect_callback) {
const HttpClient::ConnectHandler& connect_callback,
const HttpClient::CloseHandler& close_callback) {
if (!url.isValid()) {
isc_throw(HttpClientError, "invalid URL specified for the HTTP client");
}
......@@ -862,7 +895,7 @@ HttpClient::asyncSendRequest(const Url& url, const HttpRequestPtr& request,
}
impl_->conn_pool_->queueRequest(url, request, response, request_timeout.value_,
request_callback, connect_callback);
request_callback, connect_callback, close_callback);
}
void
......
// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
......@@ -86,9 +86,18 @@ public:
///
/// Returned boolean value indicates whether the client should continue
/// connecting to the server (if true) or not (false).
/// It is passed the IO error code along with the native socket handle of
/// the connection's TCP socket. This always the socket's event readiness
/// to be monitored via select() or epoll.
///
/// @note Beware that the IO error code can be set to "in progress"
/// so a not null error code does not always mean the connect failed.
typedef std::function<bool(const boost::system::error_code&)> ConnectHandler;
typedef std::function<bool(const boost::system::error_code&, const int)> ConnectHandler;
/// @brief Optional handler invoked when client closes the connection to the server.
///
/// It is passed the native socket handler of the connection's TCP socket.
typedef std::function<void(const int)> CloseHandler;
/// @brief Constructor.
///
......@@ -167,7 +176,9 @@ public:
const RequestTimeout& request_timeout =
RequestTimeout(10000),
const ConnectHandler& connect_callback =
ConnectHandler());
ConnectHandler(),
const CloseHandler& close_callback =
CloseHandler());
/// @brief Closes all connections.
void stop();
......
......@@ -724,7 +724,7 @@ public:
"Content-Length: 3\r\n\r\n"
"{ }";
// Use custom listener and the specialized connection object.
// Use custom listener and the specialized connection object.
HttpListenerCustom<HttpConnectionType>
listener(io_service_, IOAddress(SERVER_ADDRESS), SERVER_PORT,
factory_, HttpListener::RequestTimeout(REQUEST_TIMEOUT),
......@@ -1375,6 +1375,123 @@ public:
ASSERT_NO_THROW(runIOService());
}
/// @brief Tests that underlying TCP socket can be registered and
/// unregsitered via connection and close callbacks.
///
/// It conducts to consequetive requests over the same client.
///
/// @param version HTTP version to be used.
void testConnectCloseCallbacks(const HttpVersion& version) {
// Start the server.
ASSERT_NO_THROW(listener_.start());
// Create a client and specify the URL on which the server can be reached.
HttpClient client(io_service_);
Url url("http://127.0.0.1:18123");
// Initiate request to the server.
PostHttpRequestJsonPtr request1 = createRequest("sequence", 1, version);
HttpResponseJsonPtr response1(new HttpResponseJson());
unsigned resp_num = 0;
ExternalMonitor monitor;
ASSERT_NO_THROW(client.asyncSendRequest(url, request1, response1,
[this, &resp_num](const boost::system::error_code& ec,
const HttpResponsePtr&,
const std::string&) {
if (++resp_num > 1) {
io_service_.stop();
}
EXPECT_FALSE(ec);
},
HttpClient::RequestTimeout(10000),
boost::bind(&ExternalMonitor::connectHandler, &monitor, _1, _2),
boost::bind(&ExternalMonitor::closeHandler, &monitor, _1)
));
// Initiate another request to the destination.
PostHttpRequestJsonPtr request2 = createRequest("sequence", 2, version);
HttpResponseJsonPtr response2(new HttpResponseJson());
ASSERT_NO_THROW(client.asyncSendRequest(url, request2, response2,
[this, &resp_num](const boost::system::error_code& ec,
const HttpResponsePtr&,
const std::string&) {
if (++resp_num > 1) {
io_service_.stop();
}
EXPECT_FALSE(ec);
},
HttpClient::RequestTimeout(10000),
boost::bind(&ExternalMonitor::connectHandler, &monitor, _1, _2),
boost::bind(&ExternalMonitor::closeHandler, &monitor, _1)
));
// Actually trigger the requests. The requests should be handlded by the
// server one after another. While the first request is being processed
// the server should queue another one.
ASSERT_NO_THROW(runIOService());
// Make sure that the received responses are different. We check that by
// comparing value of the sequence parameters.
ASSERT_TRUE(response1);
ConstElementPtr sequence1 = response1->getJsonElement("sequence");
ASSERT_TRUE(sequence1);
ASSERT_TRUE(response2);
ConstElementPtr sequence2 = response2->getJsonElement("sequence");
ASSERT_TRUE(sequence2);
EXPECT_NE(sequence1->intValue(), sequence2->intValue());
}
/// @brief Simulates external registery of Connection TCP sockets
///
/// Provides methods compatible with Connection callbacks for connnect
/// and close operations.
class ExternalMonitor {
public:
/// @breif Constructor
ExternalMonitor() : registered_fd_(-1) {};
/// @brief Connect callback handler
/// @param ec Error status of the ASIO connect
/// @param tcp_native_fd socket descriptor to register
bool connectHandler(const boost::system::error_code& ec, int tcp_native_fd) {
if (!ec || ec.value() == boost::asio::error::in_progress) {
if (tcp_native_fd >= 0) {
registered_fd_ = tcp_native_fd;
return (true);
}
// Invalid fd?, this really should not be possible. EXPECT makes
// sure we log it.
EXPECT_TRUE (tcp_native_fd >= 0) << "no ec error but invalid fd?";
return (false);
} else if (ec.value() == boost::asio::error::already_connected) {
if (registered_fd_ != tcp_native_fd) {
return (false);
}
}
// ec indicates an error, return true, so that error can be handled
// by Connection logic.
return (true);
}
/// @brief Close callback handler
///
/// @param tcp_native_fd socket descriptor to register
void closeHandler(int tcp_native_fd) {
EXPECT_EQ(tcp_native_fd, registered_fd_) << "closeHandler fd mismatch";
if (tcp_native_fd >= 0) {
registered_fd_ = -1;
}
}
/// @brief Keeps track of socket currently "registered" for external monitoring.
int registered_fd_;
};
/// @brief Instance of the listener used in the tests.
HttpListener listener_;
......@@ -1384,6 +1501,7 @@ public:
/// @brief Instance of the third listener used in the tests (with short idle
/// timeout).
HttpListener listener3_;
};
// Test that two conscutive requests can be sent over the same (persistent)
......@@ -1671,7 +1789,7 @@ TEST_F(HttpClientTest, clientConnectTimeout) {
// try to send a request to the server. This simulates the
// case of connect() taking very long and should eventually
// cause the transaction to time out.
[](const boost::system::error_code& /*ec*/) {
[](const boost::system::error_code& /*ec*/, int) {
return (false);
}));
......@@ -1688,5 +1806,9 @@ TEST_F(HttpClientTest, clientConnectTimeout) {
ASSERT_NO_THROW(runIOService());
}
/// Tests that connect and close callbacks work correctly.
TEST_F(HttpClientTest, connectCloseCallbacks) {
ASSERT_NO_FATAL_FAILURE(testConnectCloseCallbacks(HttpVersion(1, 1)));
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment