Commit bc5aae15 authored by Witold Krecicki's avatar Witold Krecicki Committed by Evan Hunt

netmgr: make tcp listening multithreaded.

When listening for TCP connections we create a socket, bind it
and then pass it over IPC to all threads - which then listen on
in and accept connections. This sounds broken, but it's the
official way of dealing with multithreaded TCP listeners in libuv,
and works on all platforms supported by libuv.
parent 09c2dbff
......@@ -114,7 +114,9 @@ typedef enum isc__netievent_type {
netievent_tcpstartread,
netievent_tcppauseread,
netievent_tcplisten,
netievent_tcpchildlisten,
netievent_tcpstoplisten,
netievent_tcpstopchildlisten,
netievent_tcpclose,
netievent_closecb,
netievent_shutdown,
......@@ -159,6 +161,7 @@ typedef struct isc__nm_uvreq {
isc_sockaddr_t peer; /* peer address */
isc__nm_cb_t cb; /* callback */
void * cbarg; /* callback argument */
uv_pipe_t pipe;
union {
uv_req_t req;
uv_getaddrinfo_t getaddrinfo;
......@@ -180,6 +183,7 @@ typedef struct isc__netievent__socket {
typedef isc__netievent__socket_t isc__netievent_udplisten_t;
typedef isc__netievent__socket_t isc__netievent_udpstoplisten_t;
typedef isc__netievent__socket_t isc__netievent_tcpstoplisten_t;
typedef isc__netievent__socket_t isc__netievent_tcpstopchildlisten_t;
typedef isc__netievent__socket_t isc__netievent_tcpclose_t;
typedef isc__netievent__socket_t isc__netievent_startread_t;
typedef isc__netievent__socket_t isc__netievent_pauseread_t;
......@@ -193,6 +197,7 @@ typedef struct isc__netievent__socket_req {
typedef isc__netievent__socket_req_t isc__netievent_tcpconnect_t;
typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t;
typedef isc__netievent__socket_req_t isc__netievent_tcpchildlisten_t;
typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t;
typedef struct isc__netievent_udpsend {
......@@ -274,6 +279,7 @@ typedef enum isc_nmsocket_type {
isc_nm_udplistener, /* Aggregate of nm_udpsocks */
isc_nm_tcpsocket,
isc_nm_tcplistener,
isc_nm_tcpchildlistener,
isc_nm_tcpdnslistener,
isc_nm_tcpdnssocket
} isc_nmsocket_type;
......@@ -322,6 +328,11 @@ struct isc_nmsocket {
isc_nmiface_t *iface;
isc_nmhandle_t *tcphandle;
/* used to send listening TCP sockets to children */
uv_pipe_t ipc;
char ipc_pipe_name[32];
atomic_int_fast32_t schildren;
/*% extra data allocated at the end of each isc_nmhandle_t */
size_t extrahandlesize;
......@@ -579,9 +590,14 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0);
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0);
void
isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0);
void
isc__nm_async_tcpstoplisten(isc__networker_t *worker,
isc__netievent_t *ievent0);
void
isc__nm_async_tcpstopchildlisten(isc__networker_t *worker,
isc__netievent_t *ievent0);
void
isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ievent0);
void
isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0);
......
......@@ -42,6 +42,12 @@
ISC_THREAD_LOCAL int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN;
#ifdef WIN32
#define NAMED_PIPE_PREFIX "\\\\.\\pipe\\named-ipc"
#else
#define NAMED_PIPE_PREFIX ".named-ipc"
#endif
static void
nmsocket_maybe_destroy(isc_nmsocket_t *sock);
static void
......@@ -497,6 +503,9 @@ async_cb(uv_async_t *handle) {
case netievent_tcplisten:
isc__nm_async_tcplisten(worker, ievent);
break;
case netievent_tcpchildlisten:
isc__nm_async_tcpchildlisten(worker, ievent);
break;
case netievent_tcpstartread:
isc__nm_async_startread(worker, ievent);
break;
......@@ -509,6 +518,9 @@ async_cb(uv_async_t *handle) {
case netievent_tcpstoplisten:
isc__nm_async_tcpstoplisten(worker, ievent);
break;
case netievent_tcpstopchildlisten:
isc__nm_async_tcpstopchildlisten(worker, ievent);
break;
case netievent_tcpclose:
isc__nm_async_tcpclose(worker, ievent);
break;
......@@ -790,6 +802,16 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr,
sock->ah_handles[i] = NULL;
}
/*
* XXXWPK Maybe it should be in tmp, maybe it should not
* be random?
*/
strcpy(sock->ipc_pipe_name, NAMED_PIPE_PREFIX);
for (int i=strlen(sock->ipc_pipe_name); i<31; i++) {
sock->ipc_pipe_name[i] = isc_random8()%24 + 'a';
}
sock->ipc_pipe_name[31] = '\0';
isc_mutex_init(&sock->lock);
isc_condition_init(&sock->cond);
isc_refcount_init(&sock->references, 1);
......
......@@ -50,6 +50,21 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);
static void
tcp_close_cb(uv_handle_t *uvhandle);
static void
ipc_connection_cb(uv_stream_t *stream, int status);
static void
ipc_write_cb(uv_write_t* uvreq, int status);
static void
parent_pipe_close_cb(uv_handle_t *handle);
static void
childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status);
static void
childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);
static void
stoplistening(isc_nmsocket_t *sock);
static void
tcp_listenclose_cb(uv_handle_t *handle);
static int
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker;
......@@ -71,7 +86,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
return (r);
}
}
sock->uv_handle.tcp.data = sock;
r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp,
&req->peer.type.sa, tcp_connect_cb);
return (r);
......@@ -134,7 +149,6 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_quota_t *quota,
isc_nmsocket_t **sockp)
{
isc__netievent_tcplisten_t *ievent = NULL;
isc_nmsocket_t *nsock = NULL;
REQUIRE(VALID_NM(mgr));
......@@ -142,6 +156,11 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener);
nsock->iface = iface;
nsock->nchildren = mgr->nworkers;
atomic_init(&nsock->rchildren, mgr->nworkers);
nsock->children = isc_mem_get(mgr->mctx,
mgr->nworkers * sizeof(*nsock));
memset(nsock->children, 0, mgr->nworkers * sizeof(*nsock));
nsock->rcb.accept = cb;
nsock->rcbarg = cbarg;
nsock->extrahandlesize = extrahandlesize;
......@@ -156,18 +175,27 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
nsock->tid = isc_random_uniform(mgr->nworkers);
/*
* Listening to TCP is rare enough not to care about the
* added overhead from passing this to another thread.
*/
ievent = isc__nm_get_ievent(mgr, netievent_tcplisten);
* Listening to TCP is rare enough not to care about the
* added overhead from passing this to another thread.
*/
isc__netievent_tcplisten_t *ievent = isc__nm_get_ievent(mgr, netievent_tcplisten);
ievent->sock = nsock;
isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
(isc__netievent_t *) ievent);
*sockp = nsock;
return (ISC_R_SUCCESS);
}
/*
* For TCP listening we create a single socket, bind it, and then pass it
* to `ncpu` child sockets - the passing is done over IPC.
* XXXWPK This design pattern is ugly but it's "the way to do it" recommended
* by libuv documentation - which also mentions that there should be
* uv_export/uv_import functions which would simplify this greatly.
*/
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
isc__netievent_tcplisten_t *ievent =
......@@ -184,10 +212,56 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
}
uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0);
r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog,
tcp_connection_cb);
if (r != 0) {
return;
sock->uv_handle.tcp.data = sock;
/*
* This is not properly documented in libuv, and the example
* (benchmark-multi-accept) is wrong:
* 'ipc' parameter must be '0' for 'listening' IPC socket, '1'
* only for the sockets are really passing the FDs between
* threads. This works without any problems on Unices, but
* breaks horribly on Windows.
*/
r = uv_pipe_init(&worker->loop, &sock->ipc, 0);
INSIST(r == 0);
sock->ipc.data = sock;
r = uv_pipe_bind(&sock->ipc, sock->ipc_pipe_name);
INSIST(r == 0);
r = uv_listen((uv_stream_t *) &sock->ipc, sock->nchildren,
ipc_connection_cb);
INSIST(r == 0);
/*
* We launch n 'tcpchildlistener' that will receive
* sockets to be listened on over ipc.
*/
for (int i = 0; i < sock->nchildren; i++) {
isc__netievent_tcpchildlisten_t *event = NULL;
isc_nmsocket_t *csock = &sock->children[i];
isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener);
csock->parent = sock;
csock->iface = sock->iface;
csock->tid = i;
csock->pquota = sock->pquota;
csock->backlog = sock->backlog;
csock->extrahandlesize = sock->extrahandlesize;
INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL);
csock->rcb.accept = sock->rcb.accept;
csock->rcbarg = sock->rcbarg;
csock->fd = -1;
event = isc__nm_get_ievent(csock->mgr,
netievent_tcpchildlisten);
event->sock = csock;
if (csock->tid == isc_nm_tid()) {
isc__nm_async_tcpchildlisten(&sock->mgr->workers[i],
(isc__netievent_t *) event);
isc__nm_put_ievent(sock->mgr, event);
} else {
isc__nm_enqueue_ievent(&sock->mgr->workers[i],
(isc__netievent_t *) event);
}
}
atomic_store(&sock->listening, true);
......@@ -195,6 +269,117 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
return;
}
/* Parent got an IPC connection from child */
static void
ipc_connection_cb(uv_stream_t *stream, int status) {
int r;
REQUIRE(status == 0);
isc_nmsocket_t *sock = stream->data;
isc__networker_t *worker = &sock->mgr->workers[isc_nm_tid()];
isc__nm_uvreq_t *nreq = isc__nm_uvreq_get(sock->mgr, sock);
/*
* The buffer can be anything, it will be ignored, but it has to
* be something that won't disappear.
*/
nreq->uvbuf = uv_buf_init((char *)nreq, 1);
uv_pipe_init(&worker->loop, &nreq->pipe, 1);
nreq->pipe.data = nreq;
/* Failure here is critical */
r = uv_accept((uv_stream_t *) &sock->ipc,
(uv_stream_t*) &nreq->pipe);
INSIST(r == 0);
r = uv_write2(&nreq->uv_req.write,
(uv_stream_t*) &nreq->pipe,
&nreq->uvbuf,
1,
(uv_stream_t*) &sock->uv_handle.stream,
ipc_write_cb);
INSIST(r == 0);
}
static void
ipc_write_cb(uv_write_t* uvreq, int status) {
UNUSED(status);
isc__nm_uvreq_t *req = uvreq->data;
/*
* We want all children to get the socket. If we're done we can stop
* listening on the IPC socket.
*/
if (atomic_fetch_add(&req->sock->schildren, 1) ==
req->sock->nchildren - 1) {
uv_close((uv_handle_t*) &req->sock->ipc, NULL);
}
uv_close((uv_handle_t*) &req->pipe, parent_pipe_close_cb);
}
static void
parent_pipe_close_cb(uv_handle_t *handle) {
isc__nm_uvreq_t *req = handle->data;
isc__nm_uvreq_put(&req, req->sock);
}
void
isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0) {
isc__netievent_tcplisten_t *ievent =
(isc__netievent_tcplisten_t *) ievent0;
isc_nmsocket_t *sock = ievent->sock;
int r;
REQUIRE(isc__nm_in_netthread());
REQUIRE(sock->type == isc_nm_tcpchildlistener);
r = uv_pipe_init(&worker->loop, &sock->ipc, 1);
INSIST(r == 0);
sock->ipc.data = sock;
isc__nm_uvreq_t * req = isc__nm_uvreq_get(sock->mgr, sock);
uv_pipe_connect(&req->uv_req.connect,
&sock->ipc,
sock->parent->ipc_pipe_name,
childlisten_ipc_connect_cb);
}
/* child connected to parent over IPC */
static void
childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status) {
UNUSED(status);
isc__nm_uvreq_t *req = uvreq->data;
isc_nmsocket_t *sock = req->sock;
isc__nm_uvreq_put(&req, sock);
int r = uv_read_start((uv_stream_t*) &sock->ipc,
isc__nm_alloc_cb,
childlisten_read_cb);
INSIST(r == 0);
}
/* child got the socket over IPC */
static void
childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
UNUSED(nread);
int r;
isc_nmsocket_t *sock = stream->data;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(buf != NULL);
uv_pipe_t* ipc = (uv_pipe_t*) stream;
uv_handle_type type = uv_pipe_pending_type(ipc);
INSIST(type == UV_TCP);
isc__nm_free_uvbuf(sock, buf);
isc__networker_t * worker = &sock->mgr->workers[isc_nm_tid()];
uv_tcp_init(&worker->loop, (uv_tcp_t*) &sock->uv_handle.tcp);
sock->uv_handle.tcp.data = sock;
uv_accept(stream, &sock->uv_handle.stream);
r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog,
tcp_connection_cb);
uv_close((uv_handle_t*) ipc, NULL);
if (r != 0) {
/* XXX log it? */
return;
}
}
void
isc_nm_tcp_stoplistening(isc_nmsocket_t *sock) {
isc__netievent_tcpstoplisten_t *ievent = NULL;
......@@ -208,24 +393,71 @@ isc_nm_tcp_stoplistening(isc_nmsocket_t *sock) {
(isc__netievent_t *) ievent);
}
void
isc__nm_async_tcpstoplisten(isc__networker_t *worker,
isc__netievent_t *ievent0)
{
isc__netievent_tcpstoplisten_t *ievent =
(isc__netievent_tcpstoplisten_t *) ievent0;
isc_nmsocket_t *sock = ievent->sock;
UNUSED(worker);
REQUIRE(isc__nm_in_netthread());
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcplistener);
/*
* If network manager is interlocked, re-enqueue the event for later.
*/
if (!isc__nm_acquire_interlocked(sock->mgr)) {
isc__netievent_tcpstoplisten_t *event = NULL;
event = isc__nm_get_ievent(sock->mgr,
netievent_tcpstoplisten);
event->sock = sock;
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *) event);
} else {
stoplistening(sock);
isc__nm_drop_interlocked(sock->mgr);
}
}
static void
stoplistening_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = handle->data;
stoplistening(isc_nmsocket_t *sock) {
for (int i = 0; i < sock->nchildren; i++) {
/*
* Stoplistening is a rare event, we can ignore the overhead
* caused by allocating an event, and doing it this way
* simplifies sock reference counting.
*/
isc__netievent_tcpstopchildlisten_t *event = NULL;
event = isc__nm_get_ievent(sock->mgr,
netievent_tcpstopchildlisten);
isc_nmsocket_attach(&sock->children[i], &event->sock);
if (i == sock->tid) {
isc__nm_async_tcpstopchildlisten(&sock->mgr->workers[i],
(isc__netievent_t *) event);
isc__nm_put_ievent(sock->mgr, event);
} else {
isc__nm_enqueue_ievent(&sock->mgr->workers[i],
(isc__netievent_t *) event);
}
}
LOCK(&sock->lock);
atomic_store(&sock->listening, false);
atomic_store(&sock->closed, true);
SIGNAL(&sock->cond);
while (atomic_load_relaxed(&sock->rchildren) > 0) {
WAIT(&sock->cond, &sock->lock);
}
UNLOCK(&sock->lock);
sock->pquota = NULL;
isc_nmsocket_detach(&sock);
uv_close((uv_handle_t *) &sock->uv_handle.tcp, tcp_listenclose_cb);
}
void
isc__nm_async_tcpstoplisten(isc__networker_t *worker,
isc__netievent_t *ievent0)
isc__nm_async_tcpstopchildlisten(isc__networker_t *worker,
isc__netievent_t *ievent0)
{
isc__netievent_tcpstoplisten_t *ievent =
(isc__netievent_tcpstoplisten_t *) ievent0;
......@@ -233,11 +465,39 @@ isc__nm_async_tcpstoplisten(isc__networker_t *worker,
UNUSED(worker);
REQUIRE(isc__nm_in_netthread());
REQUIRE(isc_nm_tid() == sock->tid);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcplistener);
REQUIRE(sock->type == isc_nm_tcpchildlistener);
REQUIRE(sock->parent != NULL);
/*
* rchildren is atomic but we still need to change it
* under a lock as the parent is waiting on conditional
* and without it we might deadlock.
*/
LOCK(&sock->parent->lock);
atomic_fetch_sub(&sock->parent->rchildren, 1);
UNLOCK(&sock->parent->lock);
uv_close((uv_handle_t *) &sock->uv_handle.tcp, tcp_listenclose_cb);
BROADCAST(&sock->parent->cond);
}
uv_close(&sock->uv_handle.handle, stoplistening_cb);
/*
* This callback is used for closing child and parent listening sockets -
* that's why we need to choose the proper lock.
*/
static void
tcp_listenclose_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = handle->data;
isc_mutex_t * lock = (sock->parent != NULL) ?
&sock->parent->lock : &sock->lock;
LOCK(lock);
atomic_store(&sock->closed, true);
atomic_store(&sock->listening, false);
sock->pquota = NULL;
UNLOCK(lock);
isc_nmsocket_detach(&sock);
}
static void
......
......@@ -171,7 +171,7 @@ stoplistening(isc_nmsocket_t *sock) {
INSIST(sock->type == isc_nm_udplistener);
for (int i = 0; i < sock->nchildren; i++) {
isc__netievent_udplisten_t *event = NULL;
isc__netievent_udpstoplisten_t *event = NULL;
if (i == sock->tid) {
stop_udp_child(&sock->children[i]);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment