Commit 3551d3ff authored by Evan Hunt's avatar Evan Hunt

convert rndc and control channel to use netmgr

- updated libisccc to use netmgr events
- updated rndc to use isc_nm_tcpconnect() to establish connections
- updated control channel to use isc_nm_listentcp()

open issues:

- the control channel timeout was previously 60 seconds, but it is now
  overridden by the TCP idle timeout setting, which defaults to 30
  seconds. we should add a function that sets the timeout value for
  a specific listener socket, instead of always using the global value
  set in the netmgr. (for the moment, since 30 seconds is a reasonable
  timeout for the control channel, I'm not prioritizing this.)
- the netmgr currently has no support for UNIX-domain sockets; until
  this is addressed, it will not be possible to configure rndc to use
  them. we will need to either fix this or document the change in
  behavior.
parent 002c3284
This diff is collapsed.
......@@ -58,6 +58,9 @@
const char *progname = NULL;
bool verbose;
static isc_taskmgr_t *taskmgr = NULL;
static isc_task_t *rndc_task = NULL;
static const char *admin_conffile = NULL;
static const char *admin_keyfile = NULL;
static const char *version = PACKAGE_VERSION;
......@@ -68,9 +71,9 @@ static bool local4set = false, local6set = false;
static int nserveraddrs;
static int currentaddr = 0;
static unsigned int remoteport = 0;
static isc_socketmgr_t *socketmgr = NULL;
static isc_nm_t *netmgr = NULL;
static isc_buffer_t *databuf = NULL;
static isccc_ccmsg_t ccmsg;
static isccc_ccmsg_t rndc_ccmsg;
static uint32_t algorithm;
static isccc_region_t secret;
static bool failed = false;
......@@ -82,13 +85,13 @@ static atomic_uint_fast32_t connects = ATOMIC_VAR_INIT(0);
static char *command = NULL;
static char *args = NULL;
static char program[256];
static isc_socket_t *sock = NULL;
static uint32_t serial;
static bool quiet = false;
static bool showresult = false;
static bool shuttingdown = false;
static void
rndc_startconnect(isc_sockaddr_t *addr, isc_task_t *task);
rndc_startconnect(isc_sockaddr_t *addr);
ISC_NORETURN static void
usage(int status);
......@@ -278,51 +281,54 @@ get_addresses(const char *host, in_port_t port) {
}
static void
rndc_senddone(isc_task_t *task, isc_event_t *event) {
isc_socketevent_t *sevent = (isc_socketevent_t *)event;
UNUSED(task);
rndc_senddone(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
UNUSED(arg);
if (sevent->result != ISC_R_SUCCESS) {
fatal("send failed: %s", isc_result_totext(sevent->result));
if (result != ISC_R_SUCCESS) {
fatal("send failed: %s", isc_result_totext(result));
}
isc_event_free(&event);
if (atomic_fetch_sub_release(&sends, 1) == 1 &&
atomic_load_acquire(&recvs) == 0)
{
isc_socket_detach(&sock);
isc_task_shutdown(task);
shuttingdown = true;
isc_task_shutdown(rndc_task);
isc_app_shutdown();
isc_nmhandle_unref(handle);
}
}
static void
rndc_recvdone(isc_task_t *task, isc_event_t *event) {
rndc_recvdone(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
isccc_ccmsg_t *ccmsg = (isccc_ccmsg_t *)arg;
isccc_sexpr_t *response = NULL;
isccc_sexpr_t *data;
isccc_region_t source;
char *errormsg = NULL;
char *textmsg = NULL;
isc_result_t result;
REQUIRE(ccmsg != NULL);
atomic_fetch_sub_release(&recvs, 1);
if (ccmsg.result == ISC_R_EOF) {
fatal("connection to remote host closed\n"
"This may indicate that\n"
"* the remote server is using an older version of"
" the command protocol,\n"
if (shuttingdown && (result == ISC_R_EOF || result == ISC_R_CANCELED)) {
isc_nmhandle_unref(handle);
return;
} else if (result == ISC_R_EOF) {
fatal("connection to remote host closed.\n"
"* This may indicate that the\n"
"* remote server is using an older\n"
"* version of the command protocol,\n"
"* this host is not authorized to connect,\n"
"* the clocks are not synchronized, or\n"
"* the key is invalid.");
}
if (ccmsg.result != ISC_R_SUCCESS) {
fatal("recv failed: %s", isc_result_totext(ccmsg.result));
"* the clocks are not synchronized,\n"
"* the key signing algorithm is incorrect,\n"
"* or the key is invalid.");
} else if (result != ISC_R_SUCCESS && result != ISC_R_CANCELED) {
fatal("recv failed: %s", isc_result_totext(result));
}
source.rstart = isc_buffer_base(&ccmsg.buffer);
source.rend = isc_buffer_used(&ccmsg.buffer);
source.rstart = isc_buffer_base(ccmsg->buffer);
source.rend = isc_buffer_used(ccmsg->buffer);
DO("parse message",
isccc_cc_fromwire(&source, &response, algorithm, &secret));
......@@ -362,22 +368,23 @@ rndc_recvdone(isc_task_t *task, isc_event_t *event) {
}
}
isc_event_free(&event);
isccc_sexpr_free(&response);
if (atomic_load_acquire(&sends) == 0 &&
atomic_load_acquire(&recvs) == 0) {
isc_socket_detach(&sock);
isc_task_shutdown(task);
shuttingdown = true;
isc_task_shutdown(rndc_task);
isc_app_shutdown();
isc_nmhandle_unref(handle);
}
}
static void
rndc_recvnonce(isc_task_t *task, isc_event_t *event) {
rndc_recvnonce(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
isccc_ccmsg_t *ccmsg = (isccc_ccmsg_t *)arg;
isccc_sexpr_t *response = NULL;
isccc_sexpr_t *_ctrl;
isccc_region_t source;
isc_result_t result;
uint32_t nonce;
isccc_sexpr_t *request = NULL;
isccc_time_t now;
......@@ -385,25 +392,28 @@ rndc_recvnonce(isc_task_t *task, isc_event_t *event) {
isccc_sexpr_t *data;
isc_buffer_t b;
REQUIRE(ccmsg != NULL);
atomic_fetch_sub_release(&recvs, 1);
if (ccmsg.result == ISC_R_EOF) {
fatal("connection to remote host closed\n"
"This may indicate that\n"
"* the remote server is using an older version of"
" the command protocol,\n"
if (shuttingdown && result == ISC_R_EOF) {
isc_nmhandle_unref(handle);
return;
} else if (result == ISC_R_EOF) {
fatal("connection to remote host closed.\n"
"* This may indicate that the\n"
"* remote server is using an older\n"
"* version of the command protocol,\n"
"* this host is not authorized to connect,\n"
"* the clocks are not synchronized,\n"
"* the key signing algorithm is incorrect, or\n"
"* the key is invalid.");
}
if (ccmsg.result != ISC_R_SUCCESS) {
fatal("recv failed: %s", isc_result_totext(ccmsg.result));
"* the key signing algorithm is incorrect\n"
"* or the key is invalid.");
} else if (result != ISC_R_SUCCESS) {
fatal("recv failed: %s", isc_result_totext(result));
}
source.rstart = isc_buffer_base(&ccmsg.buffer);
source.rend = isc_buffer_used(&ccmsg.buffer);
source.rstart = isc_buffer_base(ccmsg->buffer);
source.rend = isc_buffer_used(ccmsg->buffer);
DO("parse message",
isccc_cc_fromwire(&source, &response, algorithm, &secret));
......@@ -451,48 +461,44 @@ rndc_recvnonce(isc_task_t *task, isc_event_t *event) {
r.base = databuf->base;
r.length = databuf->used;
isccc_ccmsg_cancelread(&ccmsg);
DO("schedule recv",
isccc_ccmsg_readmessage(&ccmsg, task, rndc_recvdone, NULL));
isccc_ccmsg_readmessage(ccmsg, rndc_recvdone, ccmsg));
atomic_fetch_add_relaxed(&recvs, 1);
DO("send message",
isc_socket_send(sock, &r, task, rndc_senddone, NULL));
DO("send message", isc_nm_send(handle, &r, rndc_senddone, NULL));
atomic_fetch_add_relaxed(&sends, 1);
isc_event_free(&event);
isccc_sexpr_free(&response);
isccc_sexpr_free(&request);
return;
}
static void
rndc_connected(isc_task_t *task, isc_event_t *event) {
rndc_connected(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
isccc_ccmsg_t *ccmsg = (isccc_ccmsg_t *)arg;
char socktext[ISC_SOCKADDR_FORMATSIZE];
isc_socketevent_t *sevent = (isc_socketevent_t *)event;
isccc_sexpr_t *request = NULL;
isccc_sexpr_t *data;
isccc_time_t now;
isc_region_t r;
isc_buffer_t b;
isc_result_t result;
REQUIRE(ccmsg != NULL);
atomic_fetch_sub_release(&connects, 1);
if (sevent->result != ISC_R_SUCCESS) {
if (result != ISC_R_SUCCESS) {
isc_sockaddr_format(&serveraddrs[currentaddr], socktext,
sizeof(socktext));
if (sevent->result != ISC_R_CANCELED &&
++currentaddr < nserveraddrs) {
if (++currentaddr < nserveraddrs) {
notify("connection failed: %s: %s", socktext,
isc_result_totext(sevent->result));
isc_socket_detach(&sock);
isc_event_free(&event);
rndc_startconnect(&serveraddrs[currentaddr], task);
isc_result_totext(result));
rndc_startconnect(&serveraddrs[currentaddr]);
return;
} else {
fatal("connect failed: %s: %s", socktext,
isc_result_totext(sevent->result));
}
fatal("connect failed: %s: %s", socktext,
isc_result_totext(result));
}
isc_stdtime_get(&now);
......@@ -519,50 +525,52 @@ rndc_connected(isc_task_t *task, isc_event_t *event) {
r.base = databuf->base;
r.length = databuf->used;
isccc_ccmsg_init(rndc_mctx, sock, &ccmsg);
isccc_ccmsg_setmaxsize(&ccmsg, 1024 * 1024);
isccc_ccmsg_init(rndc_mctx, handle, ccmsg);
isccc_ccmsg_setmaxsize(ccmsg, 1024 * 1024);
isc_nmhandle_ref(handle);
DO("schedule recv",
isccc_ccmsg_readmessage(&ccmsg, task, rndc_recvnonce, NULL));
isccc_ccmsg_readmessage(ccmsg, rndc_recvnonce, ccmsg));
atomic_fetch_add_relaxed(&recvs, 1);
DO("send message",
isc_socket_send(sock, &r, task, rndc_senddone, NULL));
DO("send message", isc_nm_send(handle, &r, rndc_senddone, NULL));
atomic_fetch_add_relaxed(&sends, 1);
isc_event_free(&event);
isccc_sexpr_free(&request);
}
static void
rndc_startconnect(isc_sockaddr_t *addr, isc_task_t *task) {
rndc_startconnect(isc_sockaddr_t *addr) {
isc_result_t result;
int pf;
isc_sockettype_t type;
char socktext[ISC_SOCKADDR_FORMATSIZE];
isc_sockaddr_t *local = NULL;
isc_sockaddr_format(addr, socktext, sizeof(socktext));
notify("using server %s (%s)", servername, socktext);
pf = isc_sockaddr_pf(addr);
if (pf == AF_INET || pf == AF_INET6) {
type = isc_sockettype_tcp;
} else {
type = isc_sockettype_unix;
}
DO("create socket", isc_socket_create(socketmgr, pf, type, &sock));
switch (isc_sockaddr_pf(addr)) {
case AF_INET:
DO("bind socket", isc_socket_bind(sock, &local4, 0));
local = &local4;
break;
case AF_INET6:
DO("bind socket", isc_socket_bind(sock, &local6, 0));
local = &local6;
break;
case AF_UNIX:
/*
* TODO: support UNIX domain sockets in netgmr.
*/
fatal("UNIX domain sockets not currently supported");
default:
break;
INSIST(0);
ISC_UNREACHABLE();
}
DO("connect",
isc_socket_connect(sock, addr, task, rndc_connected, NULL));
DO("create connection",
isc_nm_tcpconnect(netmgr, (isc_nmiface_t *)local,
(isc_nmiface_t *)addr, rndc_connected, &rndc_ccmsg,
0));
atomic_fetch_add_relaxed(&connects, 1);
}
......@@ -570,8 +578,10 @@ static void
rndc_start(isc_task_t *task, isc_event_t *event) {
isc_event_free(&event);
UNUSED(task);
currentaddr = 0;
rndc_startconnect(&serveraddrs[currentaddr], task);
rndc_startconnect(&serveraddrs[currentaddr]);
}
static void
......@@ -853,8 +863,6 @@ int
main(int argc, char **argv) {
isc_result_t result = ISC_R_SUCCESS;
bool show_final_mem = false;
isc_taskmgr_t *taskmgr = NULL;
isc_task_t *task = NULL;
isc_log_t *log = NULL;
isc_logconfig_t *logconfig = NULL;
isc_logdestination_t logdest;
......@@ -980,18 +988,23 @@ main(int argc, char **argv) {
argc -= isc_commandline_index;
argv += isc_commandline_index;
if (argc < 1) {
if (argv[0] == NULL) {
usage(1);
} else {
command = argv[0];
if (strcmp(command, "restart") == 0) {
fatal("'%s' is not implemented", command);
}
notify("%s", command);
}
serial = isc_random32();
isc_mem_create(&rndc_mctx);
DO("create socket manager",
isc_socketmgr_create(rndc_mctx, &socketmgr));
netmgr = isc_nm_start(rndc_mctx, 1);
DO("create task manager",
isc_taskmgr_create(rndc_mctx, 1, 0, NULL, &taskmgr));
DO("create task", isc_task_create(taskmgr, 0, &task));
DO("create task", isc_task_create(taskmgr, 0, &rndc_task));
isc_log_create(rndc_mctx, &log, &logconfig);
isc_log_setcontext(log);
isc_log_settag(logconfig, progname);
......@@ -1009,8 +1022,6 @@ main(int argc, char **argv) {
isccc_result_register();
command = *argv;
isc_buffer_allocate(rndc_mctx, &databuf, 2048);
/*
......@@ -1037,32 +1048,30 @@ main(int argc, char **argv) {
*p++ = '\0';
INSIST(p == args + argslen);
notify("%s", command);
if (strcmp(command, "restart") == 0) {
fatal("'%s' is not implemented", command);
}
if (nserveraddrs == 0 && servername != NULL) {
get_addresses(servername, (in_port_t)remoteport);
}
DO("post event", isc_app_onrun(rndc_mctx, task, rndc_start, NULL));
DO("post event", isc_app_onrun(rndc_mctx, rndc_task, rndc_start, NULL));
result = isc_app_run();
if (result != ISC_R_SUCCESS) {
fatal("isc_app_run() failed: %s", isc_result_totext(result));
}
if (atomic_load_acquire(&connects) > 0 ||
atomic_load_acquire(&sends) > 0 || atomic_load_acquire(&recvs) > 0)
{
isc_socket_cancel(sock, task, ISC_SOCKCANCEL_ALL);
}
isc_task_detach(&task);
isc_task_detach(&rndc_task);
isc_taskmgr_destroy(&taskmgr);
isc_socketmgr_destroy(&socketmgr);
isc_nm_destroy(&netmgr);
/*
* Note: when TCP connections are shut down, there will be a final
* call to the isccc callback routine with &rndc_ccmsg as its
* argument. We therefore need to delay invalidating it until
* after the netmgr is destroyed.
*/
isccc_ccmsg_invalidate(&rndc_ccmsg);
isc_log_destroy(&log);
isc_log_setcontext(NULL);
......@@ -1070,7 +1079,6 @@ main(int argc, char **argv) {
cfg_parser_destroy(&pctx);
isc_mem_put(rndc_mctx, args, argslen);
isccc_ccmsg_invalidate(&ccmsg);
isc_buffer_free(&databuf);
......
......@@ -39,7 +39,7 @@ level margin: \\n[rst2man-indent\\n[rst2man-indent-level]]
.sp
\fBdnssec\-importkey\fP reads a public DNSKEY record and generates a pair
of .key/.private files. The DNSKEY record may be read from an existing
.key file, in which case a corresponding .private file is
\&.key file, in which case a corresponding .private file is
generated, or it may be read from any other file or from the standard
input, in which case both .key and .private files are generated.
.sp
......
......@@ -28,8 +28,9 @@
#include <inttypes.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
#include <isc/result.h>
#include <isc/task.h>
#include <isc/string.h>
#include <isc/util.h>
#include <isccc/ccmsg.h>
......@@ -39,109 +40,140 @@
#define VALID_CCMSG(foo) ISC_MAGIC_VALID(foo, CCMSG_MAGIC)
static void
recv_length(isc_task_t *, isc_event_t *);
static void
recv_message(isc_task_t *, isc_event_t *);
recv_message(isc_nmhandle_t *handle, isc_result_t eresult, isc_region_t *region,
void *arg);
static void
recv_length(isc_task_t *task, isc_event_t *ev_in) {
isc_socketevent_t *ev = (isc_socketevent_t *)ev_in;
isc_event_t *dev = NULL;
isccc_ccmsg_t *ccmsg = ev_in->ev_arg;
isc_region_t region;
recv_nonce(isc_nmhandle_t *handle, isc_result_t eresult, isc_region_t *region,
void *arg) {
isccc_ccmsg_t *ccmsg = arg;
isc_result_t result;
if (eresult == ISC_R_CANCELED || eresult == ISC_R_EOF) {
ccmsg->result = eresult;
goto done;
}
INSIST(VALID_CCMSG(ccmsg));
dev = &ccmsg->event;
if (region == NULL && eresult == ISC_R_SUCCESS) {
ccmsg->result = ISC_R_EOF;
goto done;
} else if (eresult != ISC_R_SUCCESS) {
ccmsg->result = eresult;
goto done;
} else {
ccmsg->result = eresult;
}
if (ev->result != ISC_R_SUCCESS) {
ccmsg->result = ev->result;
goto send_and_free;
if (region->length < sizeof(uint32_t)) {
ccmsg->result = ISC_R_UNEXPECTEDEND;
goto done;
}
/*
* Success.
*/
ccmsg->size = ntohl(ccmsg->size);
ccmsg->size = ntohl(*(uint32_t *)region->base);
if (ccmsg->size == 0) {
ccmsg->result = ISC_R_UNEXPECTEDEND;
goto send_and_free;
goto done;
}
if (ccmsg->size > ccmsg->maxsize) {
ccmsg->result = ISC_R_RANGE;
goto send_and_free;
goto done;
}
region.base = isc_mem_get(ccmsg->mctx, ccmsg->size);
region.length = ccmsg->size;
if (region.base == NULL) {
ccmsg->result = ISC_R_NOMEMORY;
goto send_and_free;
isc_region_consume(region, sizeof(uint32_t));
isc_buffer_allocate(ccmsg->mctx, &ccmsg->buffer, ccmsg->size);
/*
* If there's more of the message waiting, pass it to
* recv_message() directly.
*/
if (region->length != 0) {
recv_message(handle, ISC_R_SUCCESS, region, ccmsg);
return;
}
isc_buffer_init(&ccmsg->buffer, region.base, region.length);
result = isc_socket_recv(ccmsg->sock, &region, 0, task, recv_message,
ccmsg);
if (result != ISC_R_SUCCESS) {
ccmsg->result = result;
goto send_and_free;
/*
* Otherwise, continue reading and handle it in
* recv_message().
*/
result = isc_nm_read(handle, recv_message, ccmsg);
if (result == ISC_R_SUCCESS) {
return;
}
isc_event_free(&ev_in);
return;
ccmsg->result = result;
send_and_free:
isc_task_send(ccmsg->task, &dev);
ccmsg->task = NULL;
isc_event_free(&ev_in);
return;
done:
ccmsg->cb(handle, ccmsg->result, ccmsg->cbarg);
isc_nmhandle_unref(handle);
}
static void
recv_message(isc_task_t *task, isc_event_t *ev_in) {
isc_socketevent_t *ev = (isc_socketevent_t *)ev_in;
isc_event_t *dev = NULL;
isccc_ccmsg_t *ccmsg = ev_in->ev_arg;
recv_message(isc_nmhandle_t *handle, isc_result_t eresult, isc_region_t *region,
void *arg) {
isc_result_t result;
isccc_ccmsg_t *ccmsg = arg;
size_t size;
UNUSED(task);
if (eresult == ISC_R_CANCELED || eresult == ISC_R_EOF) {
ccmsg->result = eresult;
goto done;
}
INSIST(VALID_CCMSG(ccmsg));
dev = &ccmsg->event;
if (region == NULL && eresult == ISC_R_SUCCESS) {
ccmsg->result = ISC_R_EOF;
goto done;
} else if (eresult != ISC_R_SUCCESS) {
ccmsg->result = eresult;
goto done;
} else {
ccmsg->result = eresult;
}
if (region->length == 0) {
ccmsg->result = ISC_R_UNEXPECTEDEND;
goto done;
}
size = ISC_MIN(isc_buffer_availablelength(ccmsg->buffer),
region->length);
isc_buffer_putmem(ccmsg->buffer, region->base, size);
isc_region_consume(region, size);
if (isc_buffer_usedlength(ccmsg->buffer) == ccmsg->size) {
ccmsg->result = ISC_R_SUCCESS;
goto done;
}
if (ev->result != ISC_R_SUCCESS) {
ccmsg->result = ev->result;
goto send_and_free;
result = isc_nm_read(handle, recv_message, ccmsg);
if (result == ISC_R_SUCCESS) {
return;
}
ccmsg->result = ISC_R_SUCCESS;
isc_buffer_add(&ccmsg->buffer, ev->n);
ccmsg->address = ev->address;
ccmsg->result = result;
send_and_free:
isc_task_send(ccmsg->task, &dev);
ccmsg->task = NULL;
isc_event_free(&ev_in);
done:
ccmsg->cb(handle, ccmsg->result, ccmsg->cbarg);
isc_nmhandle_unref(handle);
}
void
isccc_ccmsg_init(isc_mem_t *mctx, isc_socket_t *sock, isccc_ccmsg_t *ccmsg) {
isccc_ccmsg_init(isc_mem_t *mctx, isc_nmhandle_t *handle,
isccc_ccmsg_t *ccmsg) {
REQUIRE(mctx != NULL);
REQUIRE(sock != NULL);
REQUIRE(handle != NULL);
REQUIRE(ccmsg != NULL);
*ccmsg = (isccc_ccmsg_t){
.magic = CCMSG_MAGIC,
.maxsize = 0xffffffffU, /* Largest message possible. */
.mctx = mctx,
.sock = sock,
.handle = handle,
.result = ISC_R_UNEXPECTED /* None yet. */
};
/*