Commit d35101e4 authored by Evan Hunt's avatar Evan Hunt
Browse files

Merge branch 'each-netmgr-fix-shutdown-crash' into 'main'

clean up outerhandle when a tcpdns socket is disconnected

See merge request !3726
parents bcbc7e2b 591b79b5
Pipeline #45311 passed with stages
in 3 minutes and 11 seconds
......@@ -98,14 +98,6 @@ struct isc_nmhandle {
isc_nmsocket_t *sock;
size_t ah_pos; /* Position in the socket's 'active handles' array */
/*
* The handle is 'inflight' if netmgr is not currently processing
* it in any way - it might mean that e.g. a recursive resolution
* is happening. For an inflight handle we must wait for the
* calling code to finish before we can free it.
*/
atomic_bool inflight;
isc_sockaddr_t peer;
isc_sockaddr_t local;
isc_nm_opaquecb_t doreset; /* reset extra callback, external */
......@@ -135,7 +127,9 @@ typedef enum isc__netievent_type {
netievent_tcpaccept,
netievent_tcpstop,
netievent_tcpclose,
netievent_tcpdnsclose,
netievent_tcpdnssend,
netievent_closecb,
netievent_shutdown,
......@@ -227,6 +221,7 @@ typedef struct isc__netievent__socket_req {
typedef isc__netievent__socket_req_t isc__netievent_tcpconnect_t;
typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t;
typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t;
typedef isc__netievent__socket_req_t isc__netievent_tcpdnssend_t;
typedef struct isc__netievent__socket_streaminfo_quota {
isc__netievent_type type;
......@@ -650,6 +645,12 @@ isc__nmsocket_active(isc_nmsocket_t *sock);
* or, for child sockets, 'sock->parent->active'.
*/
void
isc__nmsocket_clearcb(isc_nmsocket_t *sock);
/*%<
* Clear the recv and accept callbacks in 'sock'.
*/
void
isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0);
/*%<
......@@ -774,6 +775,9 @@ isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock);
void
isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0);
#define isc__nm_uverr2result(x) \
isc___nm_uverr2result(x, true, __FILE__, __LINE__)
isc_result_t
......
......@@ -621,6 +621,9 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
case netievent_tcpsend:
isc__nm_async_tcpsend(worker, ievent);
break;
case netievent_tcpdnssend:
isc__nm_async_tcpdnssend(worker, ievent);
break;
case netievent_tcpstop:
isc__nm_async_tcpstop(worker, ievent);
break;
......@@ -833,10 +836,13 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) {
if (active_handles == 0 || sock->tcphandle != NULL) {
destroy = true;
}
UNLOCK(&sock->lock);
if (destroy) {
atomic_store(&sock->destroying, true);
UNLOCK(&sock->lock);
nmsocket_cleanup(sock, true);
} else {
UNLOCK(&sock->lock);
}
}
......@@ -987,6 +993,16 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type,
sock->magic = NMSOCK_MAGIC;
}
void
isc__nmsocket_clearcb(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
sock->rcb.recv = NULL;
sock->rcbarg = NULL;
sock->accept_cb.accept = NULL;
sock->accept_cbarg = NULL;
}
void
isc__nm_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
......
......@@ -584,6 +584,7 @@ readtimeout_cb(uv_timer_t *handle) {
if (sock->rcb.recv != NULL) {
sock->rcb.recv(sock->tcphandle, ISC_R_TIMEDOUT, NULL,
sock->rcbarg);
isc__nmsocket_clearcb(sock);
}
}
......@@ -682,7 +683,9 @@ isc__nm_tcp_resumeread(isc_nmsocket_t *sock) {
isc__netievent_startread_t *ievent = NULL;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->rcb.recv != NULL);
if (sock->rcb.recv == NULL) {
return (ISC_R_CANCELED);
}
if (!atomic_load(&sock->readpaused)) {
return (ISC_R_SUCCESS);
......@@ -744,6 +747,7 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
if (sock->rcb.recv != NULL) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
sock->rcb.recv(sock->tcphandle, ISC_R_EOF, NULL, sock->rcbarg);
isc__nmsocket_clearcb(sock);
}
/*
......@@ -1080,6 +1084,7 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
{
sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL,
sock->rcbarg);
isc__nmsocket_clearcb(sock);
}
}
......@@ -1092,5 +1097,6 @@ isc__nm_tcp_cancelread(isc_nmsocket_t *sock) {
{
sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL,
sock->rcbarg);
isc__nmsocket_clearcb(sock);
}
}
......@@ -231,6 +231,11 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
if (dnssock->self != NULL) {
isc__nmsocket_detach(&dnssock->self);
}
isc__nmsocket_clearcb(dnssock);
if (dnssock->outerhandle != NULL) {
isc_nmhandle_unref(dnssock->outerhandle);
dnssock->outerhandle = NULL;
}
return;
}
......@@ -340,8 +345,7 @@ isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock) {
atomic_store(&sock->listening, false);
atomic_store(&sock->closed, true);
sock->rcb.recv = NULL;
sock->rcbarg = NULL;
isc__nmsocket_clearcb(sock);
if (sock->outer != NULL) {
isc__nm_tcp_stoplistening(sock->outer);
......@@ -385,15 +389,6 @@ isc_nm_tcpdns_keepalive(isc_nmhandle_t *handle) {
atomic_store(&handle->sock->outerhandle->sock->keepalive, true);
}
typedef struct tcpsend {
isc_mem_t *mctx;
isc_nmhandle_t *handle;
isc_region_t region;
isc_nmhandle_t *orighandle;
isc_nm_cb_t cb;
void *cbarg;
} tcpsend_t;
static void
resume_processing(void *arg) {
isc_nmsocket_t *sock = (isc_nmsocket_t *)arg;
......@@ -431,7 +426,11 @@ resume_processing(void *arg) {
}
isc_nmhandle_unref(handle);
} else if (sock->outerhandle != NULL) {
isc_nm_resumeread(sock->outerhandle->sock);
result = isc_nm_resumeread(sock->outerhandle->sock);
if (result != ISC_R_SUCCESS) {
isc_nmhandle_unref(sock->outerhandle);
sock->outerhandle = NULL;
}
}
return;
......@@ -466,15 +465,40 @@ resume_processing(void *arg) {
static void
tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
tcpsend_t *ts = (tcpsend_t *)cbarg;
isc__nm_uvreq_t *req = (isc__nm_uvreq_t *)cbarg;
ts->cb(ts->orighandle, result, ts->cbarg);
isc_mem_put(ts->mctx, ts->region.base, ts->region.length);
UNUSED(handle);
isc_nmhandle_unref(ts->orighandle);
isc_mem_putanddetach(&ts->mctx, ts, sizeof(*ts));
req->cb.send(req->handle, result, req->cbarg);
isc_mem_put(req->sock->mgr->mctx, req->uvbuf.base, req->uvbuf.len);
isc__nm_uvreq_put(&req, req->handle->sock);
}
void
isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) {
isc_result_t result;
isc__netievent_tcpdnssend_t *ievent =
(isc__netievent_tcpdnssend_t *)ev0;
isc__nm_uvreq_t *req = ievent->req;
isc_nmsocket_t *sock = ievent->sock;
isc_nmhandle_unref(handle);
REQUIRE(worker->id == sock->tid);
result = ISC_R_NOTCONNECTED;
if (atomic_load(&sock->active) && sock->outerhandle != NULL) {
isc_region_t r;
r.base = (unsigned char *)req->uvbuf.base;
r.length = req->uvbuf.len;
result = isc__nm_tcp_send(sock->outerhandle, &r, tcpdnssend_cb,
req);
}
if (result != ISC_R_SUCCESS) {
req->cb.send(req->handle, result, req->cbarg);
isc_mem_put(sock->mgr->mctx, req->uvbuf.base, req->uvbuf.len);
isc__nm_uvreq_put(&req, sock);
}
}
/*
......@@ -483,7 +507,7 @@ tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
isc_result_t
isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
isc_nm_cb_t cb, void *cbarg) {
tcpsend_t *t = NULL;
isc__nm_uvreq_t *uvreq = NULL;
REQUIRE(VALID_NMHANDLE(handle));
......@@ -492,31 +516,39 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpdnssocket);
if (sock->outerhandle == NULL) {
/* The socket is closed */
return (ISC_R_NOTCONNECTED);
}
uvreq = isc__nm_uvreq_get(sock->mgr, sock);
uvreq->handle = handle;
isc_nmhandle_ref(uvreq->handle);
uvreq->cb.send = cb;
uvreq->cbarg = cbarg;
t = isc_mem_get(sock->mgr->mctx, sizeof(*t));
*t = (tcpsend_t){
.cb = cb,
.cbarg = cbarg,
.handle = handle->sock->outerhandle,
};
uvreq->uvbuf.base = isc_mem_get(sock->mgr->mctx, region->length + 2);
uvreq->uvbuf.len = region->length + 2;
*(uint16_t *)uvreq->uvbuf.base = htons(region->length);
memmove(uvreq->uvbuf.base + 2, region->base, region->length);
isc_mem_attach(sock->mgr->mctx, &t->mctx);
t->orighandle = handle;
isc_nmhandle_ref(t->orighandle);
isc_nmhandle_ref(t->handle);
if (sock->tid == isc_nm_tid()) {
isc_region_t r;
t->region = (isc_region_t){ .base = isc_mem_get(t->mctx,
region->length + 2),
.length = region->length + 2 };
r.base = (unsigned char *)uvreq->uvbuf.base;
r.length = uvreq->uvbuf.len;
*(uint16_t *)t->region.base = htons(region->length);
memmove(t->region.base + 2, region->base, region->length);
return (isc__nm_tcp_send(sock->outerhandle, &r, tcpdnssend_cb,
uvreq));
} else {
isc__netievent_tcpdnssend_t *ievent = NULL;
return (isc_nm_send(t->handle, &t->region, tcpdnssend_cb, t));
ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpdnssend);
ievent->req = uvreq;
ievent->sock = sock;
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
return (ISC_R_SUCCESS);
}
return (ISC_R_UNEXPECTED);
}
static void
......@@ -524,7 +556,9 @@ tcpdns_close_direct(isc_nmsocket_t *sock) {
REQUIRE(sock->tid == isc_nm_tid());
/* We don't need atomics here, it's all in single network thread */
if (sock->timer_initialized) {
if (sock->self != NULL) {
isc__nmsocket_detach(&sock->self);
} else if (sock->timer_initialized) {
/*
* We need to fire the timer callback to clean it up,
* it will then call us again (via detach) so that we
......@@ -533,15 +567,13 @@ tcpdns_close_direct(isc_nmsocket_t *sock) {
sock->timer_initialized = false;
uv_timer_stop(&sock->timer);
uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
} else if (sock->self != NULL) {
isc__nmsocket_detach(&sock->self);
} else {
/*
* At this point we're certain that there are no external
* references, we can close everything.
*/
if (sock->outerhandle != NULL) {
sock->outerhandle->sock->rcb.recv = NULL;
isc__nmsocket_clearcb(sock->outerhandle->sock);
isc_nmhandle_unref(sock->outerhandle);
sock->outerhandle = NULL;
}
......
......@@ -210,19 +210,6 @@ static void
stoplistening(isc_nmsocket_t *sock) {
REQUIRE(sock->type == isc_nm_udplistener);
/*
* Socket is already closing; there's nothing to do.
*/
if (!isc__nmsocket_active(sock)) {
return;
}
/*
* Mark it inactive now so that all sends will be ignored
* and we won't try to stop listening again.
*/
atomic_store(&sock->active, false);
for (int i = 0; i < sock->nchildren; i++) {
isc__netievent_udpstop_t *event = NULL;
......@@ -256,6 +243,18 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_udplistener);
/*
* Socket is already closing; there's nothing to do.
*/
if (!isc__nmsocket_active(sock)) {
return;
}
/*
* Mark it inactive now so that all sends will be ignored
* and we won't try to stop listening again.
*/
atomic_store(&sock->active, false);
/*
* If the manager is interlocked, re-enqueue this as an asynchronous
* event. Otherwise, go ahead and stop listening right away.
......@@ -336,10 +335,17 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
#endif
/*
* If addr == NULL that's the end of stream - we can
* free the buffer and bail.
* Three reasons to return now without processing:
* - If addr == NULL that's the end of stream - we can
* free the buffer and bail.
* - If we're simulating a firewall blocking UDP packets
* bigger than 'maxudp' bytes for testing purposes.
* - If the socket is no longer active.
*/
if (addr == NULL) {
maxudp = atomic_load(&sock->mgr->maxudp);
if ((addr == NULL) || (maxudp != 0 && (uint32_t)nrecv > maxudp) ||
(!isc__nmsocket_active(sock)))
{
if (free_buf) {
isc__nm_free_uvbuf(sock, buf);
}
......@@ -347,16 +353,6 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
return;
}
/*
* Simulate a firewall blocking UDP packets bigger than
* 'maxudp' bytes.
*/
maxudp = atomic_load(&sock->mgr->maxudp);
if (maxudp != 0 && (uint32_t)nrecv > maxudp) {
isc__nmsocket_detach(&sock);
return;
}
result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
......@@ -398,7 +394,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
uint32_t maxudp = atomic_load(&sock->mgr->maxudp);
/*
* Simulate a firewall blocking UDP packets bigger than
* We're simulating a firewall blocking UDP packets bigger than
* 'maxudp' bytes, for testing purposes.
*
* The client would ordinarily have unreferenced the handle
......@@ -522,6 +518,9 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
REQUIRE(sock->tid == isc_nm_tid());
REQUIRE(sock->type == isc_nm_udpsocket);
if (!isc__nmsocket_active(sock)) {
return (ISC_R_CANCELED);
}
isc_nmhandle_ref(req->handle);
rv = uv_udp_send(&req->uv_req.udp_send, &sock->uv_handle.udp,
&req->uvbuf, 1, &peer->type.sa, udp_send_cb);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment