Commit 1ef232f9 authored by Ondřej Surý's avatar Ondřej Surý
Browse files

Merge the common parts between udp, tcpdns and tlsdns protocol

The udp, tcpdns and tlsdns contained lot of cut&paste code or code that
was very similar making the stack harder to maintain as any change to
one would have to be copied to the the other protocols.

In this commit, we merge the common parts into the common functions
under isc__nm_<foo> namespace and just keep the little differences based
on the socket type.
parent caa5b654
......@@ -41,6 +41,8 @@
#define ISC_NETMGR_TID_UNKNOWN -1
#define ISC_NETMGR_TLSBUF_SIZE 65536
#if !defined(WIN32)
/*
* New versions of libuv support recvmmsg on unices.
......@@ -1850,3 +1852,49 @@ void
isc__nm_tcpdns_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result);
void
isc__nm_tlsdns_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result);
isc_result_t
isc__nm_tcpdns_processbuffer(isc_nmsocket_t *sock);
isc_result_t
isc__nm_tlsdns_processbuffer(isc_nmsocket_t *sock);
isc__nm_uvreq_t *
isc__nm_get_read_req(isc_nmsocket_t *sock, isc_sockaddr_t *sockaddr);
void
isc__nm_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf);
void
isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
const struct sockaddr *addr, unsigned flags);
void
isc__nm_tcpdns_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);
void
isc__nm_tlsdns_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);
void
isc__nm_start_reading(isc_nmsocket_t *sock);
void
isc__nm_stop_reading(isc_nmsocket_t *sock);
void
isc__nm_process_sock_buffer(isc_nmsocket_t *sock);
void
isc__nm_resume_processing(void *arg);
bool
isc__nm_inactive(isc_nmsocket_t *sock);
void
isc__nm_alloc_dnsbuf(isc_nmsocket_t *sock, size_t len);
void
isc__nm_failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
isc_result_t eresult);
void
isc__nm_failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult);
void
isc__nm_failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
isc_result_t eresult);
void
isc__nm_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result);
#define STREAM_CLIENTS_PER_CONN 23
......@@ -1583,8 +1583,86 @@ isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg,
handle->dofree = dofree;
}
static void
isc__nmsocket_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
void
isc__nm_alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
REQUIRE(len <= NM_BIG_BUF);
if (sock->buf == NULL) {
/* We don't have the buffer at all */
size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF;
sock->buf = isc_mem_allocate(sock->mgr->mctx, alloc_len);
sock->buf_size = alloc_len;
} else {
/* We have the buffer but it's too small */
sock->buf = isc_mem_reallocate(sock->mgr->mctx, sock->buf,
NM_BIG_BUF);
sock->buf_size = NM_BIG_BUF;
}
}
void
isc__nm_failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
isc_result_t eresult) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(req));
if (req->cb.send != NULL) {
isc__nm_sendcb(sock, req, eresult, true);
} else {
isc__nm_uvreq_put(&req, sock);
}
}
void
isc__nm_failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
REQUIRE(sock->accepting);
REQUIRE(sock->server);
/*
* Detach the quota early to make room for other connections;
* otherwise it'd be detached later asynchronously, and clog
* the quota unnecessarily.
*/
if (sock->quota != NULL) {
isc_quota_detach(&sock->quota);
}
isc__nmsocket_detach(&sock->server);
sock->accepting = false;
switch (eresult) {
case ISC_R_NOTCONNECTED:
/* IGNORE: The client disconnected before we could accept */
break;
default:
isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
"Accepting TCP connection failed: %s",
isc_result_totext(eresult));
}
}
void
isc__nm_failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
isc_result_t eresult) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(req));
REQUIRE(sock->tid == isc_nm_tid());
REQUIRE(atomic_load(&sock->connecting));
REQUIRE(req->cb.connect != NULL);
atomic_store(&sock->connecting, false);
isc__nmsocket_clearcb(sock);
isc__nm_connectcb(sock, req, eresult);
isc__nmsocket_prep_destroy(sock);
}
void
isc__nm_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
REQUIRE(VALID_NMSOCK(sock));
switch (sock->type) {
case isc_nm_udpsocket:
......@@ -1613,7 +1691,7 @@ isc__nmsocket_readtimeout_cb(uv_timer_t *timer) {
REQUIRE(sock->tid == isc_nm_tid());
REQUIRE(sock->reading);
isc__nmsocket_failed_read_cb(sock, ISC_R_TIMEDOUT);
isc__nm_failed_read_cb(sock, ISC_R_TIMEDOUT);
}
void
......
......@@ -34,7 +34,6 @@
#include "netmgr-int.h"
#include "uv-compat.h"
#define TCPDNS_CLIENTS_PER_CONN 23
/*%<
*
* Maximum number of simultaneous handles in flight supported for a single
......@@ -57,12 +56,6 @@ can_log_tcpdns_quota(void) {
return (false);
}
static void
tcpdns_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf);
static void
resume_processing(void *arg);
static isc_result_t
tcpdns_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);
......@@ -75,9 +68,6 @@ tcpdns_connect_cb(uv_connect_t *uvreq, int status);
static void
tcpdns_connection_cb(uv_stream_t *server, int status);
static void
read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);
static void
tcpdns_close_cb(uv_handle_t *uvhandle);
......@@ -87,98 +77,11 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota);
static void
quota_accept_cb(isc_quota_t *quota, void *sock0);
static void
failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult);
static void
failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
isc_result_t eresult);
static void
stop_tcpdns_parent(isc_nmsocket_t *sock);
static void
stop_tcpdns_child(isc_nmsocket_t *sock);
static void
process_sock_buffer(isc_nmsocket_t *sock);
static void
stop_reading(isc_nmsocket_t *sock);
static isc__nm_uvreq_t *
get_read_req(isc_nmsocket_t *sock);
static inline void
alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
REQUIRE(len <= NM_BIG_BUF);
if (sock->buf == NULL) {
/* We don't have the buffer at all */
size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF;
sock->buf = isc_mem_allocate(sock->mgr->mctx, alloc_len);
sock->buf_size = alloc_len;
} else {
/* We have the buffer but it's too small */
sock->buf = isc_mem_reallocate(sock->mgr->mctx, sock->buf,
NM_BIG_BUF);
sock->buf_size = NM_BIG_BUF;
}
}
static bool
inactive(isc_nmsocket_t *sock) {
return (!isc__nmsocket_active(sock) || atomic_load(&sock->closing) ||
atomic_load(&sock->mgr->closing) ||
(sock->server != NULL && !isc__nmsocket_active(sock->server)));
}
static void
failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
REQUIRE(sock->accepting);
REQUIRE(sock->server);
/*
* Detach the quota early to make room for other connections;
* otherwise it'd be detached later asynchronously, and clog
* the quota unnecessarily.
*/
if (sock->quota != NULL) {
isc_quota_detach(&sock->quota);
}
isc__nmsocket_detach(&sock->server);
sock->accepting = false;
switch (eresult) {
case ISC_R_NOTCONNECTED:
/* IGNORE: The client disconnected before we could accept */
break;
default:
isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
"Accepting TCP connection failed: %s",
isc_result_totext(eresult));
}
}
static void
failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
isc_result_t eresult) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(req));
REQUIRE(sock->tid == isc_nm_tid());
REQUIRE(atomic_load(&sock->connecting));
REQUIRE(req->cb.connect != NULL);
atomic_store(&sock->connecting, false);
isc__nmsocket_clearcb(sock);
isc__nm_connectcb(sock, req, eresult);
isc__nmsocket_prep_destroy(sock);
}
static isc_result_t
tcpdns_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker = NULL;
......@@ -326,7 +229,7 @@ tcpdns_connect_cb(uv_connect_t *uvreq, int status) {
return;
error:
failed_connect_cb(sock, req, result);
isc__nm_failed_connect_cb(sock, req, result);
}
isc_result_t
......@@ -643,7 +546,7 @@ tcpdns_connection_cb(uv_stream_t *server, int status) {
REQUIRE(VALID_NMSOCK(ssock));
REQUIRE(ssock->tid == isc_nm_tid());
if (inactive(ssock)) {
if (isc__nm_inactive(ssock)) {
result = ISC_R_CANCELED;
goto done;
}
......@@ -720,13 +623,13 @@ isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) {
}
}
static void
failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
void
isc__nm_tcpdns_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(result != ISC_R_SUCCESS);
isc__nmsocket_timer_stop(sock);
stop_reading(sock);
isc__nm_stop_reading(sock);
if (!sock->recv_read) {
goto destroy;
......@@ -734,7 +637,7 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
sock->recv_read = false;
if (sock->recv_cb != NULL) {
isc__nm_uvreq_t *req = get_read_req(sock);
isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL);
isc__nmsocket_clearcb(sock);
isc__nm_readcb(sock, req, result);
}
......@@ -749,67 +652,6 @@ destroy:
}
}
void
isc__nm_tcpdns_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
failed_read_cb(sock, result);
}
static void
failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
isc_result_t eresult) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(req));
if (req->cb.send != NULL) {
isc__nm_sendcb(sock, req, eresult, true);
} else {
isc__nm_uvreq_put(&req, sock);
}
}
static isc__nm_uvreq_t *
get_read_req(isc_nmsocket_t *sock) {
isc__nm_uvreq_t *req = NULL;
req = isc__nm_uvreq_get(sock->mgr, sock);
req->cb.recv = sock->recv_cb;
req->cbarg = sock->recv_cbarg;
if (atomic_load(&sock->client)) {
isc_nmhandle_attach(sock->statichandle, &req->handle);
} else {
req->handle = isc__nmhandle_get(sock, NULL, NULL);
}
return (req);
}
static void
start_reading(isc_nmsocket_t *sock) {
int r;
if (sock->reading) {
return;
}
r = uv_read_start(&sock->uv_handle.stream, tcpdns_alloc_cb, read_cb);
RUNTIME_CHECK(r == 0);
sock->reading = true;
}
static void
stop_reading(isc_nmsocket_t *sock) {
int r;
if (!sock->reading) {
return;
}
r = uv_read_stop(&sock->uv_handle.stream);
RUNTIME_CHECK(r == 0);
sock->reading = false;
}
void
isc__nm_tcpdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
REQUIRE(VALID_NMHANDLE(handle));
......@@ -847,31 +689,6 @@ isc__nm_tcpdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
return;
}
/*%<
* Allocator for TCP read operations. Limited to size 2^16.
*
* Note this doesn't actually allocate anything, it just assigns the
* worker's receive buffer to a socket, and marks it as "in use".
*/
static void
tcpdns_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
isc__networker_t *worker = NULL;
UNUSED(size);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpdnssocket);
REQUIRE(isc__nm_in_netthread());
worker = &sock->mgr->workers[sock->tid];
INSIST(!worker->recvbuf_inuse);
buf->base = worker->recvbuf;
buf->len = size;
worker->recvbuf_inuse = true;
}
void
isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpdnsread_t *ievent =
......@@ -883,13 +700,13 @@ isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid());
if (inactive(sock)) {
if (isc__nm_inactive(sock)) {
sock->reading = true;
failed_read_cb(sock, ISC_R_CANCELED);
isc__nm_failed_read_cb(sock, ISC_R_CANCELED);
return;
}
process_sock_buffer(sock);
isc__nm_process_sock_buffer(sock);
}
/*
......@@ -901,8 +718,8 @@ isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) {
*
* The caller will need to unreference the handle.
*/
static isc_result_t
processbuffer(isc_nmsocket_t *sock) {
isc_result_t
isc__nm_tcpdns_processbuffer(isc_nmsocket_t *sock) {
size_t len;
isc__nm_uvreq_t *req = NULL;
isc_nmhandle_t *handle = NULL;
......@@ -910,7 +727,7 @@ processbuffer(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid());
if (inactive(sock)) {
if (isc__nm_inactive(sock)) {
return (ISC_R_CANCELED);
}
......@@ -931,7 +748,7 @@ processbuffer(isc_nmsocket_t *sock) {
return (ISC_R_NOMORE);
}
req = get_read_req(sock);
req = isc__nm_get_read_req(sock, NULL);
REQUIRE(VALID_UVREQ(req));
/*
......@@ -975,8 +792,9 @@ processbuffer(isc_nmsocket_t *sock) {
return (ISC_R_SUCCESS);
}
static void
read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
void
isc__nm_tcpdns_read_cb(uv_stream_t *stream, ssize_t nread,
const uv_buf_t *buf) {
isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)stream);
uint8_t *base = NULL;
size_t len;
......@@ -986,8 +804,8 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
REQUIRE(sock->reading);
REQUIRE(buf != NULL);
if (inactive(sock)) {
failed_read_cb(sock, ISC_R_CANCELED);
if (isc__nm_inactive(sock)) {
isc__nm_failed_read_cb(sock, ISC_R_CANCELED);
goto free;
}
......@@ -997,8 +815,7 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
sock->statsindex[STATID_RECVFAIL]);
}
failed_read_cb(sock, isc__nm_uverr2result(nread));
isc__nm_failed_read_cb(sock, isc__nm_uverr2result(nread));
goto free;
}
......@@ -1015,7 +832,7 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
*/
if (sock->buf_len + len > sock->buf_size) {
alloc_dnsbuf(sock, sock->buf_len + len);
isc__nm_alloc_dnsbuf(sock, sock->buf_len + len);
}
memmove(sock->buf + sock->buf_len, base, len);
sock->buf_len += len;
......@@ -1024,7 +841,7 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
sock->read_timeout = atomic_load(&sock->mgr->idle);
}
process_sock_buffer(sock);
isc__nm_process_sock_buffer(sock);
free:
isc__nm_free_uvbuf(sock, buf);
}
......@@ -1086,7 +903,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
REQUIRE(VALID_NMSOCK(ssock));
REQUIRE(ssock->tid == isc_nm_tid());
if (inactive(ssock)) {
if (isc__nm_inactive(ssock)) {
if (quota != NULL) {
isc_quota_detach(&quota);
}
......@@ -1168,7 +985,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
csock->read_timeout = atomic_load(&csock->mgr->init);
csock->closehandle_cb = resume_processing;
csock->closehandle_cb = isc__nm_resume_processing;
/*
* We need to keep the handle alive until we fail to read or connection
......@@ -1176,7 +993,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
* prep_destroy()->tcpdns_close_direct().
*/
isc_nmhandle_attach(handle, &csock->recv_handle);
process_sock_buffer(csock);
isc__nm_process_sock_buffer(csock);
/*
* The initial timer has been set, update the read timeout for the next
......@@ -1199,7 +1016,7 @@ failure:
atomic_store(&csock->active, false);
failed_accept_cb(csock, result);
isc__nm_failed_accept_cb(csock, result);
isc__nmsocket_prep_destroy(csock);
......@@ -1247,7 +1064,8 @@ tcpdns_send_cb(uv_write_t *req, int status) {
if (status < 0) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
failed_send_cb(sock, uvreq, isc__nm_uverr2result(status));
isc__nm_failed_send_cb(sock, uvreq,
isc__nm_uverr2result(status));
return;
}
......@@ -1279,7 +1097,7 @@ isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(worker);
if (inactive(sock)) {
if (isc__nm_inactive(sock)) {
result = ISC_R_CANCELED;
goto fail;
}
......@@ -1321,7 +1139,7 @@ isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) {
fail:
if (result != ISC_R_SUCCESS) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
failed_send_cb(sock, uvreq, result);
isc__nm_failed_send_cb(sock, uvreq, result);
}
}
......@@ -1452,7 +1270,7 @@ tcpdns_close_direct(isc_nmsocket_t *sock) {
}
isc__nmsocket_timer_stop(sock);
stop_reading(sock);
isc__nm_stop_reading(sock);
uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
}
......@@ -1514,7 +1332,7 @@ isc__nm_tcpdns_shutdown(isc_nmsocket_t *sock) {
}
if (sock->statichandle) {
failed_read_cb(sock, ISC_R_CANCELED);
isc__nm_failed_read_cb(sock, ISC_R_CANCELED);
return;
}
......@@ -1554,7 +1372,7 @@ isc__nm_async_tcpdnscancel(isc__networker_t *worker, isc__netievent_t *ev0) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid());
failed_read_cb(sock, ISC_R_EOF);
isc__nm_failed_read_cb(sock, ISC_R_EOF);
}
void
......@@ -1577,7 +1395,7 @@ isc_nm_tcpdns_sequential(isc_nmhandle_t *handle) {
*/
isc__nmsocket_timer_stop(sock);
stop_reading(sock);
isc__nm_stop_reading(sock);
atomic_store(&sock->sequential, true);
}
......@@ -1593,74 +1411,3 @@ isc_nm_tcpdns_keepalive(isc_nmhandle_t *handle, bool value) {
atomic_store(&sock->keepalive, value);
}
/*
* Process a DNS message.
*
* If we only have an incomplete DNS message, we don't touch any
* timers. If we do have a full message, reset the timer.
*
* Stop reading if this is a client socket, or if the server socket
* has been set to sequential mode, or the number of queries we are
* processing simultaneously has reached the clients-per-connection
* limit. In this event we'll be called again by resume_processing()
* later.
*/
static void
process_sock_buffer(isc_nmsocket_t *sock) {