Commit 2515825a authored by Evan Hunt's avatar Evan Hunt
Browse files

Merge branch '1312-netmgr-tcp-fixes' into 'master'

Resolve "netmgr hangs on shutdown when TCP connections are still active"

Closes #1312

See merge request !2617
parents 751ad12d 8bdb5f58
Pipeline #26177 passed with stages
in 1 minute and 36 seconds
5325. [bug] Addressed several issues with TCP connections in
the netmgr: restored support for TCP connection
timeouts, restored TCP backlog support, actively
close all open sockets during shutdown. [GL #1312]
5324. [bug] Change the category of some log messages from general
to the more appropriate category of xfer-in. [GL #1394]
 
......
......@@ -939,11 +939,23 @@ create_managers(void) {
static void
destroy_managers(void) {
	/*
	 * isc_nm_closedown() closes all active connections, freeing
	 * attached clients and other resources and preventing new
	 * connections from being established, but it does not
	 * stop all processing or destroy the netmgr yet.
	 */
	isc_nm_closedown(named_g_nm);

	/*
	 * isc_taskmgr_destroy() will block until all tasks have exited.
	 */
	isc_taskmgr_destroy(&named_g_taskmgr);
	isc_timermgr_destroy(&named_g_timermgr);
	isc_socketmgr_destroy(&named_g_socketmgr);

	/*
	 * At this point it is safe to destroy the netmgr.
	 */
	isc_nm_destroy(&named_g_nm);
}
......
......@@ -8470,8 +8470,8 @@ load_configuration(const char *filename, named_server_t *server,
advertised = MAX_TCP_TIMEOUT;
}
ns_server_settimeouts(named_g_server->sctx,
initial, idle, keepalive, advertised);
isc_nm_tcp_settimeouts(named_g_nm, initial, idle,
keepalive, advertised);
/*
* Configure sets of UDP query source ports.
......@@ -15405,8 +15405,8 @@ named_server_tcptimeouts(isc_lex_t *lex, isc_buffer_t **text) {
if (ptr == NULL)
return (ISC_R_UNEXPECTEDEND);
ns_server_gettimeouts(named_g_server->sctx,
&initial, &idle, &keepalive, &advertised);
isc_nm_tcp_gettimeouts(named_g_nm, &initial, &idle,
&keepalive, &advertised);
/* Look for optional arguments. */
ptr = next_token(lex, NULL);
......@@ -15445,7 +15445,7 @@ named_server_tcptimeouts(isc_lex_t *lex, isc_buffer_t **text) {
result = isc_task_beginexclusive(named_g_server->task);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
ns_server_settimeouts(named_g_server->sctx, initial, idle,
isc_nm_tcp_settimeouts(named_g_nm, initial, idle,
keepalive, advertised);
isc_task_endexclusive(named_g_server->task);
......
......@@ -22,6 +22,7 @@ options {
recursion yes;
dnssec-validation yes;
querylog yes;
prefetch 3 9;
};
server 10.53.0.7 {
......
......@@ -452,7 +452,7 @@ n=`expr $n + 1`
echo_i "check prefetch (${n})"
ret=0
$DIG $DIGOPTS @10.53.0.5 fetch.tld txt > dig.out.1.${n} || ret=1
ttl1=`awk '/"A" "short" "ttl"/ { print $2 - 2 }' dig.out.1.${n}`
ttl1=`awk '/"A" "short" "ttl"/ { print $2 - 3 }' dig.out.1.${n}`
# sleep so we are in prefetch range
sleep ${ttl1:-0}
# trigger prefetch
......@@ -470,7 +470,7 @@ n=`expr $n + 1`
echo_i "check prefetch of validated DS's RRSIG TTL is updated (${n})"
ret=0
$DIG $DIGOPTS +dnssec @10.53.0.5 ds.example.net ds > dig.out.1.${n} || ret=1
dsttl1=`awk '$4 == "DS" && $7 == "2" { print $2 - 2 }' dig.out.1.${n}`
dsttl1=`awk '$4 == "DS" && $7 == "2" { print $2 - 3 }' dig.out.1.${n}`
# sleep so we are in prefetch range
sleep ${dsttl1:-0}
# trigger prefetch
......@@ -517,7 +517,7 @@ n=`expr $n + 1`
echo_i "check prefetch qtype * (${n})"
ret=0
$DIG $DIGOPTS @10.53.0.5 fetchall.tld any > dig.out.1.${n} || ret=1
ttl1=`awk '/"A" "short" "ttl"/ { print $2 - 2 }' dig.out.1.${n}`
ttl1=`awk '/"A" "short" "ttl"/ { print $2 - 3 }' dig.out.1.${n}`
# sleep so we are in prefetch range
sleep ${ttl1:-0}
# trigger prefetch
......
......@@ -109,7 +109,7 @@ foreach my $name(@ans) {
stop_signal($name, "TERM", 1);
}
@ans = wait_for_servers(60, @ans);
@ans = wait_for_servers(1200, @ans);
# Pass 3: SIGABRT
foreach my $name (@ns) {
......
......@@ -115,7 +115,7 @@ n=$((n + 1))
echo_i "TCP high-water: check initial statistics ($n)"
ret=0
refresh_tcp_stats
assert_int_equal "${TCP_CUR}" 1 "current TCP clients count" || ret=1
assert_int_equal "${TCP_CUR}" 0 "current TCP clients count" || ret=1
if [ $ret != 0 ]; then echo_i "failed"; fi
status=$((status + ret))
......@@ -166,12 +166,8 @@ check_stats_limit() {
assert_int_equal "${TCP_HIGH}" "${TCP_LIMIT}" "TCP high-water value" || return 1
}
retry 2 check_stats_limit || ret=1
close_connections $((TCP_LIMIT + 1)) || :
if [ $ret != 0 ]; then echo_i "failed"; fi
status=$((status + ret))
# wait for connections to close
sleep 5
echo_i "exit status: $status"
[ $status -eq 0 ] || exit 1
......@@ -177,7 +177,8 @@ LIBISC_EXTERNAL_DATA extern isc_logmodule_t isc_modules[];
#define ISC_LOGMODULE_INTERFACE (&isc_modules[2])
#define ISC_LOGMODULE_TIMER (&isc_modules[3])
#define ISC_LOGMODULE_FILE (&isc_modules[4])
#define ISC_LOGMODULE_OTHER (&isc_modules[5])
#define ISC_LOGMODULE_NETMGR (&isc_modules[5])
#define ISC_LOGMODULE_OTHER (&isc_modules[6])
ISC_LANG_BEGINDECLS
......
......@@ -46,7 +46,16 @@ isc_nm_destroy(isc_nm_t **mgr0);
* for all other references to be gone.
*/
/* Return thread id of current thread, or ISC_NETMGR_TID_UNKNOWN */
void
isc_nm_closedown(isc_nm_t *mgr);
/*%<
* Close down all active connections, freeing associated resources;
* prevent new connections from being established. This can optionally
* be called prior to shutting down the netmgr, to stop all processing
* before shutting down the task manager.
*/
/* Return thread ID of current thread, or ISC_NETMGR_TID_UNKNOWN */
int
isc_nm_tid(void);
......@@ -113,8 +122,20 @@ isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg,
isc_sockaddr_t
isc_nmhandle_peeraddr(isc_nmhandle_t *handle);
/*%<
* Return the peer address for the given handle.
*/
isc_sockaddr_t
isc_nmhandle_localaddr(isc_nmhandle_t *handle);
/*%<
* Return the local address for the given handle.
*/
isc_nm_t *
isc_nmhandle_netmgr(isc_nmhandle_t *handle);
/*%<
* Return a pointer to the netmgr object for the given handle.
*/
typedef void (*isc_nm_recv_cb_t)(isc_nmhandle_t *handle, isc_region_t *region,
void *cbarg);
......@@ -211,8 +232,9 @@ isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region,
isc_result_t
isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nm_cb_t cb, void *cbarg,
size_t extrahandlesize, isc_quota_t *quota,
isc_nmsocket_t **rv);
size_t extrahandlesize, int backlog,
isc_quota_t *quota,
isc_nmsocket_t **sockp);
/*%<
* Start listening for raw messages over the TCP interface 'iface', using
* net manager 'mgr'.
......@@ -230,8 +252,8 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
* quota. This allows us to enforce TCP client quota limits.
*
* NOTE: This is currently only called inside isc_nm_listentcpdns(), which
* creates a 'wrapper' socket that sends and receives DNS messages -
* prepended with a two-byte length field - and handles buffering.
* creates a 'wrapper' socket that sends and receives DNS messages
* prepended with a two-byte length field, and handles buffering.
*/
void
......@@ -243,15 +265,18 @@ isc_nm_tcp_stoplistening(isc_nmsocket_t *sock);
isc_result_t
isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nm_recv_cb_t cb, void *arg,
size_t extrahandlesize, isc_quota_t *quota,
size_t extrahandlesize, int backlog,
isc_quota_t *quota,
isc_nmsocket_t **sockp);
/*%<
* Start listening for DNS messages over the TCP interface 'iface', using
* net manager 'mgr'.
*
* On success, 'sockp' will be updated to contain a new listening TCPDNS
* socket. This is a wrapper around a TCP socket, and handles DNS length
* processing.
* socket. This is a wrapper around a raw TCP socket, which sends and
* receives DNS messages via that socket. It handles message buffering
* and pipelining, and automatically prepends messages with a two-byte
* length field.
*
* When a complete DNS message is received on the socket, 'cb' will be
* called with 'cbarg' as its argument.
......@@ -259,6 +284,8 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface,
* When handles are allocated for the socket, 'extrasize' additional bytes
* will be allocated along with the handle for an associated object
* (typically ns_client).
*
* 'quota' is passed to isc_nm_listentcp() when opening the raw TCP socket.
*/
void
......@@ -270,25 +297,60 @@ isc_nm_tcpdns_stoplistening(isc_nmsocket_t *sock);
void
isc_nm_tcpdns_sequential(isc_nmhandle_t *handle);
/*%<
* Disable pipelining on this connection. Each DNS packet
* will be only processed after the previous completes.
* Disable pipelining on this connection. Each DNS packet will be only
* processed after the previous completes.
*
* The socket must be unpaused after the query is processed.
* This is done the response is sent, or if we're dropping the
* query, it will be done when a handle is fully dereferenced
* by calling the socket's closehandle_cb callback.
 * The socket must be unpaused after the query is processed. This is done
 * when the response is sent, or if we're dropping the query, it will be
 * done when a handle is fully dereferenced by calling the socket's
 * closehandle_cb callback.
*
* Note: This can only be run while a message is being processed;
* if it is run before any messages are read, no messages will
* be read.
* Note: This can only be run while a message is being processed; if it is
* run before any messages are read, no messages will be read.
*
* Also note: once this has been set, it cannot be reversed for a
* given connection.
* Also note: once this has been set, it cannot be reversed for a given
* connection.
*/
void
isc_nm_tcpdns_keepalive(isc_nmhandle_t *handle);
/*%<
* Enable keepalive on this connection.
*
* When keepalive is active, we switch to using the keepalive timeout
* to determine when to close a connection, rather than the idle timeout.
*/
void
isc_nm_tcp_settimeouts(isc_nm_t *mgr, uint32_t init, uint32_t idle,
uint32_t keepalive, uint32_t advertised);
/*%<
* Sets the initial, idle, and keepalive timeout values to use for
* TCP connections, and the timeout value to advertise in responses using
* the EDNS TCP Keepalive option (which should ordinarily be the same
* as 'keepalive'), in tenths of seconds.
*
* Requires:
* \li 'mgr' is a valid netmgr.
*/
void
isc_nm_tcp_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle,
uint32_t *keepalive, uint32_t *advertised);
/*%<
* Gets the initial, idle, keepalive, or advertised timeout values,
* in tenths of seconds.
*
* Any integer pointer parameter not set to NULL will be updated to
* contain the corresponding timeout value.
*
* Requires:
* \li 'mgr' is a valid netmgr.
*/
void
isc_nm_maxudp(isc_nm_t *mgr, uint32_t maxudp);
/*%<
* Simulate a broken firewall that blocks UDP messages larger
* than a given size.
* Simulate a broken firewall that blocks UDP messages larger than a given
* size.
*/
......@@ -192,6 +192,7 @@ LIBISC_EXTERNAL_DATA isc_logmodule_t isc_modules[] = {
{ "interface", 0 },
{ "timer", 0 },
{ "file", 0 },
{ "netmgr", 0 },
{ "other", 0 },
{ NULL, 0 }
};
......
......@@ -41,7 +41,6 @@ typedef struct isc__networker {
uv_async_t async; /* async channel to send
* data to this networker */
isc_mutex_t lock;
isc_mempool_t *mpool_bufs;
isc_condition_t cond;
bool paused;
bool finished;
......@@ -116,12 +115,10 @@ typedef enum isc__netievent_type {
netievent_tcplisten,
netievent_tcpstoplisten,
netievent_tcpclose,
netievent_closecb,
netievent_shutdown,
} isc__netievent_type;
typedef struct isc__netievent_stop {
isc__netievent_type type;
} isc__netievent_stop_t;
/*
* We have to split it because we can read and write on a socket
* simultaneously.
......@@ -185,7 +182,7 @@ typedef isc__netievent__socket_t isc__netievent_tcpstoplisten_t;
typedef isc__netievent__socket_t isc__netievent_tcpclose_t;
typedef isc__netievent__socket_t isc__netievent_startread_t;
typedef isc__netievent__socket_t isc__netievent_pauseread_t;
typedef isc__netievent__socket_t isc__netievent_resumeread_t;
typedef isc__netievent__socket_t isc__netievent_closecb_t;
typedef struct isc__netievent__socket_req {
isc__netievent_type type;
......@@ -208,10 +205,13 @@ typedef struct isc__netievent {
isc__netievent_type type;
} isc__netievent_t;
typedef isc__netievent_t isc__netievent_shutdown_t;
typedef isc__netievent_t isc__netievent_stop_t;
typedef union {
isc__netievent_t ni;
isc__netievent_stop_t nis;
isc__netievent_udplisten_t niul;
isc__netievent__socket_t nis;
isc__netievent__socket_req_t nisr;
isc__netievent_udpsend_t nius;
} isc__netievent_storage_t;
......@@ -229,11 +229,24 @@ struct isc_nm {
isc_mutex_t lock;
isc_condition_t wkstatecond;
isc__networker_t *workers;
isc_mempool_t *reqpool;
isc_mutex_t reqlock;
isc_mempool_t *evpool;
isc_mutex_t evlock;
atomic_uint_fast32_t workers_running;
atomic_uint_fast32_t workers_paused;
atomic_uint_fast32_t maxudp;
atomic_bool paused;
/*
 * Active connections are being closed and new connections are
 * no longer allowed.
 */
atomic_bool closing;
/*
* A worker is actively waiting for other workers, for example to
* stop listening; that means no other thread can do the same thing
......@@ -241,6 +254,18 @@ struct isc_nm {
* event or wait for the other one to finish if we want to pause.
*/
atomic_bool interlocked;
/*
 * Timeout values for TCP connections, corresponding to
 * tcp-initial-timeout, tcp-idle-timeout, tcp-keepalive-timeout,
 * and tcp-advertised-timeout. Note that these are stored in
 * milliseconds so they can be used directly with the libuv timer,
 * but they are configured in tenths of seconds.
 */
uint32_t init;
uint32_t idle;
uint32_t keepalive;
uint32_t advertised;
};
typedef enum isc_nmsocket_type {
......@@ -266,9 +291,24 @@ struct isc_nmsocket {
isc_nmsocket_type type;
isc_nm_t *mgr;
isc_nmsocket_t *parent;
/*
* quota is the TCP client, attached when a TCP connection
* is established. pquota is a non-attached pointer to the
* TCP client quota, stored in listening sockets but only
* attached in connected sockets.
*/
isc_quota_t *quota;
isc_quota_t *pquota;
bool overquota;
/*
* TCP read timeout timer.
*/
uv_timer_t timer;
bool timer_initialized;
uint64_t read_timeout;
/*% outer socket is for 'wrapped' sockets - e.g. tcpdns in tcp */
isc_nmsocket_t *outer;
......@@ -284,6 +324,9 @@ struct isc_nmsocket {
/*% extra data allocated at the end of each isc_nmhandle_t */
size_t extrahandlesize;
/*% TCP backlog */
int backlog;
/*% libuv data */
uv_os_sock_t fd;
union uv_any_handle uv_handle;
......@@ -334,6 +377,12 @@ struct isc_nmsocket {
*/
atomic_bool readpaused;
/*%
* A TCP or TCPDNS socket has been set to use the keepalive
* timeout instead of the default idle timeout.
*/
atomic_bool keepalive;
/*%
 * 'spare' handles that can be reused to avoid allocations,
* for UDP.
......@@ -366,7 +415,7 @@ struct isc_nmsocket {
* might want to change it to something lockless in the
* future.
*/
size_t ah;
atomic_int_fast32_t ah;
size_t ah_size;
size_t *ah_frees;
isc_nmhandle_t **ah_handles;
......@@ -398,6 +447,8 @@ isc__nm_get_ievent(isc_nm_t *mgr, isc__netievent_type type);
/*%<
* Allocate an ievent and set the type.
*/
void
isc__nm_put_ievent(isc_nm_t *mgr, void *ievent);
void
isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event);
......@@ -471,6 +522,19 @@ isc__nmsocket_prep_destroy(isc_nmsocket_t *sock);
* if there are no remaining references or active handles.
*/
void
isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0);
/*%<
* Issue a 'handle closed' callback on the socket.
*/
void
isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ievent0);
/*%<
* Walk through all uv handles, get the underlying sockets and issue
* close on them.
*/
isc_result_t
isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region,
isc_nm_cb_t cb, void *cbarg);
......@@ -503,6 +567,12 @@ isc__nm_tcp_close(isc_nmsocket_t *sock);
* Close a TCP socket.
*/
void
isc__nm_tcp_shutdown(isc_nmsocket_t *sock);
/*%<
* Called on shutdown to close and clean up a listening TCP socket.
*/
void
isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0);
void
......@@ -517,15 +587,12 @@ isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0);
void
isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ievent0);
void
isc__nm_async_resumeread(isc__networker_t *worker, isc__netievent_t *ievent0);
void
isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ievent0);
/*%<
* Callback handlers for asynchronous TCP events (connect, listen,
* stoplisten, send, read, pauseread, resumeread, close).
* stoplisten, send, read, pause, close).
*/
isc_result_t
isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
isc_nm_cb_t cb, void *cbarg);
......
......@@ -94,6 +94,32 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) {
atomic_init(&mgr->paused, false);
atomic_init(&mgr->interlocked, false);
/*
* Default TCP timeout values.
* May be updated by isc_nm_tcptimeouts().
*/
mgr->init = 30000;
mgr->idle = 30000;
mgr->keepalive = 30000;
mgr->advertised = 30000;
isc_mutex_init(&mgr->reqlock);
isc_mempool_create(mgr->mctx, sizeof(isc__nm_uvreq_t), &mgr->reqpool);
isc_mempool_setname(mgr->reqpool, "nm_reqpool");
isc_mempool_setmaxalloc(mgr->reqpool, 32768);
isc_mempool_setfreemax(mgr->reqpool, 32768);
isc_mempool_associatelock(mgr->reqpool, &mgr->reqlock);
isc_mempool_setfillcount(mgr->reqpool, 32);
isc_mutex_init(&mgr->evlock);
isc_mempool_create(mgr->mctx, sizeof(isc__netievent_storage_t),
&mgr->evpool);
isc_mempool_setname(mgr->evpool, "nm_evpool");
isc_mempool_setmaxalloc(mgr->evpool, 32768);
isc_mempool_setfreemax(mgr->evpool, 32768);
isc_mempool_associatelock(mgr->evpool, &mgr->evlock);
isc_mempool_setfillcount(mgr->evpool, 32);
mgr->workers = isc_mem_get(mctx, workers * sizeof(isc__networker_t));
for (size_t i = 0; i < workers; i++) {
int r;
......@@ -114,7 +140,6 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) {
isc_mutex_init(&worker->lock);
isc_condition_init(&worker->cond);
isc_mempool_create(mgr->mctx, 65536, &worker->mpool_bufs);
worker->ievents = isc_queue_new(mgr->mctx, 128);
/*
......@@ -168,17 +193,22 @@ nm_destroy(isc_nm_t **mgr0) {
while ((ievent = (isc__netievent_t *)
isc_queue_dequeue(mgr->workers[i].ievents)) != NULL)
{
isc_mem_put(mgr->mctx, ievent,
sizeof(isc__netievent_storage_t));
isc_mempool_put(mgr->evpool, ievent);
}
int r = uv_loop_close(&mgr->workers[i].loop);
INSIST(r == 0);
isc_queue_destroy(mgr->workers[i].ievents);
isc_mempool_destroy(&mgr->workers[i].mpool_bufs);
}
isc_condition_destroy(&mgr->wkstatecond);
isc_mutex_destroy(&mgr->lock);
isc_mempool_destroy(&mgr->evpool);
isc_mutex_destroy(&mgr->evlock);
isc_mempool_destroy(&mgr->reqpool);
isc_mutex_destroy(&mgr->reqlock);
isc_mem_put(mgr->mctx, mgr->workers,
mgr->nworkers * sizeof(isc__networker_t));
isc_mem_putanddetach(&mgr->mctx, mgr, sizeof(*mgr));
......@@ -267,20 +297,34 @@ isc_nm_detach(isc_nm_t **mgr0) {
}
}
void
isc_nm_closedown(isc_nm_t *mgr) {
REQUIRE(VALID_NM(mgr));
atomic_store(&mgr->closing, true);
for (size_t i = 0; i < mgr->nworkers; i++) {
isc__netievent_t *event = NULL;
event = isc__nm_get_ievent(mgr, netievent_shutdown);
isc__nm_enqueue_ievent(&mgr->workers[i], event);
}
}
void
isc_nm_destroy(isc_nm_t **mgr0) {
isc_nm_t *mgr = NULL;
int references;
REQUIRE(mgr0 != NULL);
REQUIRE(VALID_NM(*mgr0));
mgr = *mgr0;
*mgr0 = NULL;
/*
* Wait for the manager to be dereferenced elsehwere.
* Close active connections.
*/
isc_nm_closedown(mgr);
/*
* Wait for the manager to be dereferenced elsewhere.
*/
while (isc_refcount_current(&mgr->references) > 1) {
#ifdef WIN32
......@@ -289,11 +333,11 @@ isc_nm_destroy(isc_nm_t **mgr0) {
usleep(1000000);
#endif
}
references = isc_refcount_decrement(&mgr->references);
INSIST(references > 0);