Commit abbb79f9 authored by Evan Hunt's avatar Evan Hunt Committed by Witold Krecicki

implement isc_nm_tcpconnect()

the isc_nm_tcpconnect() function establishes a client connection via
TCP.  once the connection is established, a callback function will be
called with a newly created network manager handle.
parent cd79b495
...@@ -238,6 +238,21 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb, ...@@ -238,6 +238,21 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb,
* prepended with a two-byte length field, and handles buffering. * prepended with a two-byte length field, and handles buffering.
*/ */
isc_result_t
isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize);
/*%<
* Create a socket using netmgr 'mgr', bind it to the address 'local',
* and connect it to the address 'peer'.
*
* When the connection is complete, call 'cb' with argument 'cbarg'.
* Allocate 'extrahandlesize' additional bytes along with the handle to use
* for an associated object.
*
* The connected socket can only be accessed via the handle passed to
* 'cb'.
*/
isc_result_t isc_result_t
isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
void *cbarg, isc_nm_cb_t accept_cb, void *accept_cbarg, void *cbarg, isc_nm_cb_t accept_cb, void *accept_cbarg,
......
...@@ -96,8 +96,7 @@ struct isc_nmhandle { ...@@ -96,8 +96,7 @@ struct isc_nmhandle {
* the socket. * the socket.
*/ */
isc_nmsocket_t *sock; isc_nmsocket_t *sock;
size_t ah_pos; /* Position in the socket's size_t ah_pos; /* Position in the socket's 'active handles' array */
* 'active handles' array */
/* /*
* The handle is 'inflight' if netmgr is not currently processing * The handle is 'inflight' if netmgr is not currently processing
...@@ -141,6 +140,7 @@ typedef enum isc__netievent_type { ...@@ -141,6 +140,7 @@ typedef enum isc__netievent_type {
netievent_closecb, netievent_closecb,
netievent_shutdown, netievent_shutdown,
netievent_stop, netievent_stop,
netievent_prio = 0xff, /* event type values higher than this netievent_prio = 0xff, /* event type values higher than this
* will be treated as high-priority * will be treated as high-priority
* events, which can be processed * events, which can be processed
...@@ -443,6 +443,8 @@ struct isc_nmsocket { ...@@ -443,6 +443,8 @@ struct isc_nmsocket {
atomic_bool closed; atomic_bool closed;
atomic_bool listening; atomic_bool listening;
atomic_bool listen_error; atomic_bool listen_error;
atomic_bool connected;
atomic_bool connect_error;
isc_refcount_t references; isc_refcount_t references;
/*% /*%
......
...@@ -1140,6 +1140,9 @@ nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { ...@@ -1140,6 +1140,9 @@ nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
static void static void
nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
size_t handlenum;
bool reuse = false;
/* /*
* We do all of this under lock to avoid races with socket * We do all of this under lock to avoid races with socket
* destruction. We have to do this now, because at this point the * destruction. We have to do this now, because at this point the
...@@ -1152,10 +1155,9 @@ nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { ...@@ -1152,10 +1155,9 @@ nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
INSIST(atomic_load(&sock->ah) > 0); INSIST(atomic_load(&sock->ah) > 0);
sock->ah_handles[handle->ah_pos] = NULL; sock->ah_handles[handle->ah_pos] = NULL;
size_t handlenum = atomic_fetch_sub(&sock->ah, 1) - 1; handlenum = atomic_fetch_sub(&sock->ah, 1) - 1;
sock->ah_frees[handlenum] = handle->ah_pos; sock->ah_frees[handlenum] = handle->ah_pos;
handle->ah_pos = 0; handle->ah_pos = 0;
bool reuse = false;
if (atomic_load(&sock->active)) { if (atomic_load(&sock->active)) {
reuse = isc_astack_trypush(sock->inactivehandles, handle); reuse = isc_astack_trypush(sock->inactivehandles, handle);
} }
......
...@@ -88,6 +88,10 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { ...@@ -88,6 +88,10 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
if (r != 0) { if (r != 0) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
/* Socket was never opened; no need for tcp_close_direct() */
atomic_store(&sock->closed, true);
sock->result = isc__nm_uverr2result(r);
atomic_store(&sock->connect_error, true);
return (r); return (r);
} }
...@@ -96,13 +100,23 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { ...@@ -96,13 +100,23 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
if (r != 0) { if (r != 0) {
isc__nm_incstats(sock->mgr, isc__nm_incstats(sock->mgr,
sock->statsindex[STATID_BINDFAIL]); sock->statsindex[STATID_BINDFAIL]);
sock->result = isc__nm_uverr2result(r);
atomic_store(&sock->connect_error, true);
tcp_close_direct(sock); tcp_close_direct(sock);
return (r); return (r);
} }
} }
uv_handle_set_data(&sock->uv_handle.handle, sock); uv_handle_set_data(&sock->uv_handle.handle, sock);
r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp, r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp,
&req->peer.type.sa, tcp_connect_cb); &req->peer.type.sa, tcp_connect_cb);
if (r != 0) {
isc__nm_incstats(sock->mgr,
sock->statsindex[STATID_CONNECTFAIL]);
sock->result = isc__nm_uverr2result(r);
atomic_store(&sock->connect_error, true);
tcp_close_direct(sock);
}
return (r); return (r);
} }
...@@ -114,14 +128,21 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { ...@@ -114,14 +128,21 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__nm_uvreq_t *req = ievent->req; isc__nm_uvreq_t *req = ievent->req;
int r; int r;
REQUIRE(sock->type == isc_nm_tcpsocket); UNUSED(worker);
REQUIRE(worker->id == ievent->req->sock->mgr->workers[isc_nm_tid()].id);
r = tcp_connect_direct(sock, req); r = tcp_connect_direct(sock, req);
if (r != 0) { if (r != 0) {
/* We need to issue callbacks ourselves */ /* We need to issue callbacks ourselves */
tcp_connect_cb(&req->uv_req.connect, r); tcp_connect_cb(&req->uv_req.connect, r);
goto done;
} }
atomic_store(&sock->connected, true);
done:
LOCK(&sock->lock);
SIGNAL(&sock->cond);
UNLOCK(&sock->lock);
} }
static void static void
...@@ -138,6 +159,7 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { ...@@ -138,6 +159,7 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
struct sockaddr_storage ss; struct sockaddr_storage ss;
isc_nmhandle_t *handle = NULL; isc_nmhandle_t *handle = NULL;
sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss, uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
&(int){ sizeof(ss) }); &(int){ sizeof(ss) });
...@@ -165,13 +187,61 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { ...@@ -165,13 +187,61 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
* TODO: * TODO:
* Handle the connect error properly and free the socket. * Handle the connect error properly and free the socket.
*/ */
isc__nm_incstats(sock->mgr,
sock->statsindex[STATID_CONNECTFAIL]);
req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg); req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg);
isc__nm_uvreq_put(&req, sock); isc__nm_uvreq_put(&req, sock);
} }
} }
isc_result_t
isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize) {
/*
 * Establish a client TCP connection via netmgr 'mgr': create a socket
 * bound to 'local' and connect it to 'peer'.  When the connect
 * completes (or fails), 'cb' is invoked with 'cbarg'; the handle
 * passed to 'cb' carries 'extrahandlesize' extra bytes for an
 * associated object.  Synchronous setup failures are returned here;
 * asynchronous connect results are reported only through 'cb'.
 */
isc_nmsocket_t *nsock = NULL;
isc__netievent_tcpconnect_t *ievent = NULL;
isc__nm_uvreq_t *req = NULL;
REQUIRE(VALID_NM(mgr));
/*
 * Allocate and initialize the client socket.  'result' starts as
 * SUCCESS and is overwritten by the connect path on failure.
 * NOTE(review): isc_mem_get() is assumed to never return NULL
 * here (allocator aborts on OOM) — confirm against isc_mem policy.
 */
nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local);
nsock->extrahandlesize = extrahandlesize;
nsock->result = ISC_R_SUCCESS;
/* Package the callback and peer address into a uv request. */
req = isc__nm_uvreq_get(mgr, nsock);
req->cb.connect = cb;
req->cbarg = cbarg;
req->peer = peer->addr;
/* Hand the actual connect work to a network thread via an event. */
ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect);
ievent->sock = nsock;
ievent->req = req;
if (isc__nm_in_netthread()) {
/*
 * Already on a netmgr thread: run the connect synchronously on
 * this thread's loop and release the event ourselves.
 */
nsock->tid = isc_nm_tid();
isc__nm_async_tcpconnect(&mgr->workers[nsock->tid],
(isc__netievent_t *)ievent);
isc__nm_put_ievent(mgr, ievent);
} else {
/*
 * Dispatch to a randomly chosen worker, then block until the
 * worker signals that the connect was either queued
 * ('connected') or failed synchronously ('connect_error').
 * Note that 'connected' is set once uv_tcp_connect() has been
 * successfully issued, not when the TCP handshake completes;
 * completion is delivered later via 'cb'.
 */
nsock->tid = isc_random_uniform(mgr->nworkers);
isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
(isc__netievent_t *)ievent);
LOCK(&nsock->lock);
while (!atomic_load(&nsock->connected) &&
!atomic_load(&nsock->connect_error)) {
WAIT(&nsock->cond, &nsock->lock);
}
UNLOCK(&nsock->lock);
}
/*
 * A synchronous failure (uv_tcp_init/bind/connect) was recorded in
 * nsock->result by tcp_connect_direct(); copy it out before dropping
 * our socket reference, since the detach may destroy nsock.
 */
if (nsock->result != ISC_R_SUCCESS) {
isc_result_t result = nsock->result;
isc__nmsocket_detach(&nsock);
return (result);
}
return (ISC_R_SUCCESS);
}
isc_result_t isc_result_t
isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb, isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb,
void *cbarg, size_t extrahandlesize, int backlog, void *cbarg, size_t extrahandlesize, int backlog,
...@@ -245,8 +315,8 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { ...@@ -245,8 +315,8 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
if (r != 0) { if (r != 0) {
/* It was never opened */
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
/* The socket was never opened, so no need for uv_close() */
atomic_store(&sock->closed, true); atomic_store(&sock->closed, true);
sock->result = isc__nm_uverr2result(r); sock->result = isc__nm_uverr2result(r);
atomic_store(&sock->listen_error, true); atomic_store(&sock->listen_error, true);
...@@ -866,6 +936,7 @@ static void ...@@ -866,6 +936,7 @@ static void
tcp_send_cb(uv_write_t *req, int status) { tcp_send_cb(uv_write_t *req, int status) {
isc_result_t result = ISC_R_SUCCESS; isc_result_t result = ISC_R_SUCCESS;
isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data; isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data;
isc_nmsocket_t *sock = NULL;
REQUIRE(VALID_UVREQ(uvreq)); REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle)); REQUIRE(VALID_NMHANDLE(uvreq->handle));
...@@ -877,8 +948,10 @@ tcp_send_cb(uv_write_t *req, int status) { ...@@ -877,8 +948,10 @@ tcp_send_cb(uv_write_t *req, int status) {
} }
uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
sock = uvreq->handle->sock;
isc_nmhandle_unref(uvreq->handle); isc_nmhandle_unref(uvreq->handle);
isc__nm_uvreq_put(&uvreq, uvreq->handle->sock); isc__nm_uvreq_put(&uvreq, sock);
} }
/* /*
...@@ -931,6 +1004,7 @@ tcp_close_cb(uv_handle_t *uvhandle) { ...@@ -931,6 +1004,7 @@ tcp_close_cb(uv_handle_t *uvhandle) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);
atomic_store(&sock->closed, true); atomic_store(&sock->closed, true);
atomic_store(&sock->connected, false);
isc__nmsocket_prep_destroy(sock); isc__nmsocket_prep_destroy(sock);
} }
......
...@@ -459,6 +459,7 @@ isc_nm_send ...@@ -459,6 +459,7 @@ isc_nm_send
isc_nm_setstats isc_nm_setstats
isc_nm_start isc_nm_start
isc_nm_stoplistening isc_nm_stoplistening
isc_nm_tcpconnect
isc_nm_tcp_gettimeouts isc_nm_tcp_gettimeouts
isc_nm_tcp_settimeouts isc_nm_tcp_settimeouts
isc_nm_tcpdns_keepalive isc_nm_tcpdns_keepalive
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment