netmgr.c 42.4 KB
Newer Older
1 2 3 4 5
/*
 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
6
 * file, you can obtain one at https://mozilla.org/MPL/2.0/.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
 *
 * See the COPYRIGHT file distributed with this work for additional
 * information regarding copyright ownership.
 */

#include <inttypes.h>
#include <unistd.h>
#include <uv.h>

#include <isc/atomic.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
#include <isc/print.h>
23
#include <isc/quota.h>
24 25 26 27 28
#include <isc/random.h>
#include <isc/refcount.h>
#include <isc/region.h>
#include <isc/result.h>
#include <isc/sockaddr.h>
29
#include <isc/stats.h>
30 31 32 33
#include <isc/thread.h>
#include <isc/util.h>

#include "netmgr-int.h"
34
#include "uv-compat.h"
35

36 37 38 39
#ifdef NETMGR_TRACE
#include <execinfo.h>
#endif

40 41 42 43 44 45 46
/*%
 * How many isc_nmhandles and isc_nm_uvreqs will we be
 * caching for reuse in a socket.
 */
#define ISC_NM_HANDLES_STACK_SIZE 600
#define ISC_NM_REQS_STACK_SIZE	  600

47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
/*%
 * Shortcut index arrays to get access to statistics counters.
 *
 * Each array is indexed by the STATID_* constants in the same fixed
 * order (open, openfail, close, bindfail, connectfail, connect,
 * acceptfail, accept, sendfail, recvfail, active -- see STATID_ACTIVE
 * usage below; confirm against netmgr-int.h).  Entries that do not
 * apply to a protocol (UDP has no accept counters) are -1.
 */

static const isc_statscounter_t udp4statsindex[] = {
	isc_sockstatscounter_udp4open,
	isc_sockstatscounter_udp4openfail,
	isc_sockstatscounter_udp4close,
	isc_sockstatscounter_udp4bindfail,
	isc_sockstatscounter_udp4connectfail,
	isc_sockstatscounter_udp4connect,
	-1,
	-1,
	isc_sockstatscounter_udp4sendfail,
	isc_sockstatscounter_udp4recvfail,
	isc_sockstatscounter_udp4active
};

static const isc_statscounter_t udp6statsindex[] = {
	isc_sockstatscounter_udp6open,
	isc_sockstatscounter_udp6openfail,
	isc_sockstatscounter_udp6close,
	isc_sockstatscounter_udp6bindfail,
	isc_sockstatscounter_udp6connectfail,
	isc_sockstatscounter_udp6connect,
	-1,
	-1,
	isc_sockstatscounter_udp6sendfail,
	isc_sockstatscounter_udp6recvfail,
	isc_sockstatscounter_udp6active
};

static const isc_statscounter_t tcp4statsindex[] = {
	isc_sockstatscounter_tcp4open,	      isc_sockstatscounter_tcp4openfail,
	isc_sockstatscounter_tcp4close,	      isc_sockstatscounter_tcp4bindfail,
	isc_sockstatscounter_tcp4connectfail, isc_sockstatscounter_tcp4connect,
	isc_sockstatscounter_tcp4acceptfail,  isc_sockstatscounter_tcp4accept,
	isc_sockstatscounter_tcp4sendfail,    isc_sockstatscounter_tcp4recvfail,
	isc_sockstatscounter_tcp4active
};

static const isc_statscounter_t tcp6statsindex[] = {
	isc_sockstatscounter_tcp6open,	      isc_sockstatscounter_tcp6openfail,
	isc_sockstatscounter_tcp6close,	      isc_sockstatscounter_tcp6bindfail,
	isc_sockstatscounter_tcp6connectfail, isc_sockstatscounter_tcp6connect,
	isc_sockstatscounter_tcp6acceptfail,  isc_sockstatscounter_tcp6accept,
	isc_sockstatscounter_tcp6sendfail,    isc_sockstatscounter_tcp6recvfail,
	isc_sockstatscounter_tcp6active
};

#if 0
/* XXX: not currently used */
static const isc_statscounter_t unixstatsindex[] = {
	isc_sockstatscounter_unixopen,
	isc_sockstatscounter_unixopenfail,
	isc_sockstatscounter_unixclose,
	isc_sockstatscounter_unixbindfail,
	isc_sockstatscounter_unixconnectfail,
	isc_sockstatscounter_unixconnect,
	isc_sockstatscounter_unixacceptfail,
	isc_sockstatscounter_unixaccept,
	isc_sockstatscounter_unixsendfail,
	isc_sockstatscounter_unixrecvfail,
	isc_sockstatscounter_unixactive
};
#endif /* if 0 */
113

114 115 116 117 118 119 120 121 122
/*
 * libuv is not thread safe, but has mechanisms to pass messages
 * between threads. Each socket is owned by a thread. For UDP
 * sockets we have a set of sockets for each interface and we can
 * choose a sibling and send the message directly. For TCP, or if
 * we're calling from a non-networking thread, we need to pass the
 * request using async_cb.
 */

123
static thread_local int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN;
124

Ondřej Surý's avatar
Ondřej Surý committed
125 126 127 128 129 130 131 132
static void
nmsocket_maybe_destroy(isc_nmsocket_t *sock);
static void
nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle);
static isc_threadresult_t
nm_thread(isc_threadarg_t worker0);
static void
async_cb(uv_async_t *handle);
133
static bool
Ondřej Surý's avatar
Ondřej Surý committed
134
process_queue(isc__networker_t *worker, isc_queue_t *queue);
135 136 137 138 139 140 141 142 143 144 145 146 147
static bool
process_priority_queue(isc__networker_t *worker);
static bool
process_normal_queue(isc__networker_t *worker);
static void
process_queues(isc__networker_t *worker);

static void
isc__nm_async_stopcb(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc__nm_async_pausecb(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc__nm_async_resumecb(isc__networker_t *worker, isc__netievent_t *ev0);
148 149

int
isc_nm_tid(void) {
	/*
	 * Return the network-manager thread ID of the calling thread,
	 * or ISC_NETMGR_TID_UNKNOWN if this is not a network thread.
	 */
	return (isc__nm_tid_v);
}

bool
isc__nm_in_netthread(void) {
	/* True iff the calling thread is one of the netmgr workers. */
	return (isc__nm_tid_v >= 0);
}

isc_nm_t *
isc_nm_start(isc_mem_t *mctx, uint32_t workers) {
	/*
	 * Create a network manager with 'workers' worker threads, each
	 * owning its own uv event loop, an async wakeup handle and two
	 * lock-free ievent queues (normal and priority).  The returned
	 * manager carries one reference owned by the caller.
	 */
	isc_nm_t *mgr = NULL;
	char name[32];

	mgr = isc_mem_get(mctx, sizeof(*mgr));
	*mgr = (isc_nm_t){ .nworkers = workers };

	isc_mem_attach(mctx, &mgr->mctx);
	isc_mutex_init(&mgr->lock);
	isc_condition_init(&mgr->wkstatecond);
	isc_refcount_init(&mgr->references, 1);
	atomic_init(&mgr->maxudp, 0);
	atomic_init(&mgr->interlocked, false);

#ifdef NETMGR_TRACE
	ISC_LIST_INIT(mgr->active_sockets);
#endif

	/*
	 * Default TCP timeout values.
	 * May be updated by isc_nm_tcptimeouts().
	 */
	mgr->init = 30000;
	mgr->idle = 30000;
	mgr->keepalive = 30000;
	mgr->advertised = 30000;

	/* Pool for isc__nm_uvreq_t allocations, shared by all workers. */
	isc_mutex_init(&mgr->reqlock);
	isc_mempool_create(mgr->mctx, sizeof(isc__nm_uvreq_t), &mgr->reqpool);
	isc_mempool_setname(mgr->reqpool, "nm_reqpool");
	isc_mempool_setfreemax(mgr->reqpool, 4096);
	isc_mempool_associatelock(mgr->reqpool, &mgr->reqlock);
	isc_mempool_setfillcount(mgr->reqpool, 32);

	/* Pool for ievent allocations, shared by all workers. */
	isc_mutex_init(&mgr->evlock);
	isc_mempool_create(mgr->mctx, sizeof(isc__netievent_storage_t),
			   &mgr->evpool);
	isc_mempool_setname(mgr->evpool, "nm_evpool");
	isc_mempool_setfreemax(mgr->evpool, 4096);
	isc_mempool_associatelock(mgr->evpool, &mgr->evlock);
	isc_mempool_setfillcount(mgr->evpool, 32);

	mgr->workers = isc_mem_get(mctx, workers * sizeof(isc__networker_t));
	for (size_t i = 0; i < workers; i++) {
		int r;
		isc__networker_t *worker = &mgr->workers[i];
		*worker = (isc__networker_t){
			.mgr = mgr,
			.id = i,
		};

		r = uv_loop_init(&worker->loop);
		RUNTIME_CHECK(r == 0);

		worker->loop.data = &mgr->workers[i];

		r = uv_async_init(&worker->loop, &worker->async, async_cb);
		RUNTIME_CHECK(r == 0);

		isc_mutex_init(&worker->lock);
		isc_condition_init(&worker->cond);

		worker->ievents = isc_queue_new(mgr->mctx, 128);
		worker->ievents_prio = isc_queue_new(mgr->mctx, 128);
		worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);

		/*
		 * We need to do this here and not in nm_thread to avoid a
		 * race - we could exit isc_nm_start, launch nm_destroy,
		 * and nm_thread would still not be up.
		 */
		mgr->workers_running++;
		isc_thread_create(nm_thread, &mgr->workers[i], &worker->thread);

		snprintf(name, sizeof(name), "isc-net-%04zu", i);
		isc_thread_setname(worker->thread, name);
	}

	mgr->magic = NM_MAGIC;
	return (mgr);
}

/*
 * Free the resources of the network manager.
 *
 * Must not be called from a network thread.  Sends a stop event to
 * every worker, waits for them all to exit, then drains the queues
 * and tears down per-worker and manager-wide resources.
 */
static void
nm_destroy(isc_nm_t **mgr0) {
	REQUIRE(VALID_NM(*mgr0));
	REQUIRE(!isc__nm_in_netthread());

	isc_nm_t *mgr = *mgr0;
	*mgr0 = NULL;

	isc_refcount_destroy(&mgr->references);

	mgr->magic = 0;

	/* Ask every worker loop to shut down. */
	for (size_t i = 0; i < mgr->nworkers; i++) {
		isc__networker_t *worker = &mgr->workers[i];
		isc__netievent_t *event = isc__nm_get_ievent(mgr,
							     netievent_stop);
		isc__nm_enqueue_ievent(worker, event);
	}

	/* Wait until all workers have left their event loops. */
	LOCK(&mgr->lock);
	while (mgr->workers_running > 0) {
		WAIT(&mgr->wkstatecond, &mgr->lock);
	}
	UNLOCK(&mgr->lock);

	for (size_t i = 0; i < mgr->nworkers; i++) {
		isc__networker_t *worker = &mgr->workers[i];
		isc__netievent_t *ievent = NULL;
		int r;

		/* Empty the async event queues */
		while ((ievent = (isc__netievent_t *)isc_queue_dequeue(
				worker->ievents)) != NULL)
		{
			isc_mempool_put(mgr->evpool, ievent);
		}

		while ((ievent = (isc__netievent_t *)isc_queue_dequeue(
				worker->ievents_prio)) != NULL)
		{
			isc_mempool_put(mgr->evpool, ievent);
		}

		r = uv_loop_close(&worker->loop);
		INSIST(r == 0);

		isc_queue_destroy(worker->ievents);
		isc_queue_destroy(worker->ievents_prio);
		isc_mutex_destroy(&worker->lock);
		isc_condition_destroy(&worker->cond);

		isc_mem_put(mgr->mctx, worker->recvbuf,
			    ISC_NETMGR_RECVBUF_SIZE);
		isc_thread_join(worker->thread, NULL);
	}

	if (mgr->stats != NULL) {
		isc_stats_detach(&mgr->stats);
	}

	isc_condition_destroy(&mgr->wkstatecond);
	isc_mutex_destroy(&mgr->lock);

	isc_mempool_destroy(&mgr->evpool);
	isc_mutex_destroy(&mgr->evlock);

	isc_mempool_destroy(&mgr->reqpool);
	isc_mutex_destroy(&mgr->reqlock);

	isc_mem_put(mgr->mctx, mgr->workers,
		    mgr->nworkers * sizeof(isc__networker_t));
	isc_mem_putanddetach(&mgr->mctx, mgr, sizeof(*mgr));
}

void
isc_nm_pause(isc_nm_t *mgr) {
	/*
	 * Pause all workers: acquire the interlock, send a pause event
	 * to every worker, and block until every running worker has
	 * reported itself paused.  Must not be called from a network
	 * thread (it would deadlock waiting for itself).
	 */
	REQUIRE(VALID_NM(mgr));
	REQUIRE(!isc__nm_in_netthread());

	isc__nm_acquire_interlocked_force(mgr);

	for (size_t i = 0; i < mgr->nworkers; i++) {
		isc__networker_t *worker = &mgr->workers[i];
		isc__netievent_t *event = isc__nm_get_ievent(mgr,
							     netievent_pause);
		isc__nm_enqueue_ievent(worker, event);
	}

	LOCK(&mgr->lock);
	while (mgr->workers_paused != mgr->workers_running) {
		WAIT(&mgr->wkstatecond, &mgr->lock);
	}
	UNLOCK(&mgr->lock);
}

void
isc_nm_resume(isc_nm_t *mgr) {
	/*
	 * Resume all paused workers and block until none remain paused,
	 * then release the interlock taken by isc_nm_pause().
	 */
	REQUIRE(VALID_NM(mgr));
	REQUIRE(!isc__nm_in_netthread());

	for (size_t i = 0; i < mgr->nworkers; i++) {
		isc__networker_t *worker = &mgr->workers[i];
		isc__netievent_t *event = isc__nm_get_ievent(mgr,
							     netievent_resume);
		isc__nm_enqueue_ievent(worker, event);
	}

	LOCK(&mgr->lock);
	while (mgr->workers_paused != 0) {
		WAIT(&mgr->wkstatecond, &mgr->lock);
	}
	UNLOCK(&mgr->lock);

	isc__nm_drop_interlocked(mgr);
}

void
isc_nm_attach(isc_nm_t *mgr, isc_nm_t **dst) {
	/* Take a new reference to 'mgr' and store it in '*dst'. */
	REQUIRE(VALID_NM(mgr));
	REQUIRE(dst != NULL && *dst == NULL);

	isc_refcount_increment(&mgr->references);

	*dst = mgr;
}

void
isc_nm_detach(isc_nm_t **mgr0) {
	/*
	 * Drop a reference to the manager; the last reference triggers
	 * nm_destroy(), which tears everything down.
	 */
	isc_nm_t *mgr = NULL;

	REQUIRE(mgr0 != NULL);
	REQUIRE(VALID_NM(*mgr0));

	mgr = *mgr0;
	*mgr0 = NULL;

	if (isc_refcount_decrement(&mgr->references) == 1) {
		nm_destroy(&mgr);
	}
}

Evan Hunt's avatar
Evan Hunt committed
386
void
isc_nm_closedown(isc_nm_t *mgr) {
	/*
	 * Mark the manager as closing and send a shutdown event to
	 * every worker so active connections get torn down.
	 */
	REQUIRE(VALID_NM(mgr));

	atomic_store(&mgr->closing, true);
	for (size_t i = 0; i < mgr->nworkers; i++) {
		isc__netievent_t *event = NULL;
		event = isc__nm_get_ievent(mgr, netievent_shutdown);
		isc__nm_enqueue_ievent(&mgr->workers[i], event);
	}
}
397 398

void
isc_nm_destroy(isc_nm_t **mgr0) {
	/*
	 * Shut down and destroy the manager: close active connections,
	 * poll (up to ~10 seconds) for external references to be
	 * released, then drop the final reference.
	 */
	isc_nm_t *mgr = NULL;
	int counter = 0;
	uint_fast32_t references;

	REQUIRE(mgr0 != NULL);
	REQUIRE(VALID_NM(*mgr0));

	mgr = *mgr0;

	/*
	 * Close active connections.
	 */
	isc_nm_closedown(mgr);

	/*
	 * Wait for the manager to be dereferenced elsewhere.
	 */
	while ((references = isc_refcount_current(&mgr->references)) > 1 &&
	       counter++ < 1000)
	{
		/* Sleep 10 ms between polls. */
#ifdef WIN32
		_sleep(10);
#else  /* ifdef WIN32 */
		usleep(10000);
#endif /* ifdef WIN32 */
	}

	/* Hard failure if someone is still holding a reference. */
	INSIST(references == 1);

	/*
	 * Detach final reference.
	 */
	isc_nm_detach(mgr0);
}

void
isc_nm_maxudp(isc_nm_t *mgr, uint32_t maxudp) {
	/* Set the manager-wide 'maxudp' limit (0 at creation time). */
	REQUIRE(VALID_NM(mgr));

	atomic_store(&mgr->maxudp, maxudp);
}

442 443
void
isc_nm_tcp_settimeouts(isc_nm_t *mgr, uint32_t init, uint32_t idle,
		       uint32_t keepalive, uint32_t advertised) {
	/*
	 * Set the TCP timeout values.  The arguments are scaled by 100
	 * before being stored (and divided by 100 when read back in
	 * isc_nm_tcp_gettimeouts()) -- presumably callers pass tenths
	 * of seconds and storage is milliseconds; confirm against the
	 * public header.
	 */
	REQUIRE(VALID_NM(mgr));

	mgr->init = init * 100;
	mgr->idle = idle * 100;
	mgr->keepalive = keepalive * 100;
	mgr->advertised = advertised * 100;
}

void
isc_nm_tcp_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle,
		       uint32_t *keepalive, uint32_t *advertised) {
	/*
	 * Read back the TCP timeout values set by
	 * isc_nm_tcp_settimeouts(), undoing its *100 scaling.
	 * Any output pointer may be NULL to skip that value.
	 */
	REQUIRE(VALID_NM(mgr));

	if (initial != NULL) {
		*initial = mgr->init / 100;
	}

	if (idle != NULL) {
		*idle = mgr->idle / 100;
	}

	if (keepalive != NULL) {
		*keepalive = mgr->keepalive / 100;
	}

	if (advertised != NULL) {
		*advertised = mgr->advertised / 100;
	}
}

475 476 477 478
/*
 * nm_thread is a single worker thread, that runs uv_run event loop
 * until asked to stop.
 */
static isc_threadresult_t
nm_thread(isc_threadarg_t worker0) {
	isc__networker_t *worker = (isc__networker_t *)worker0;
	isc_nm_t *mgr = worker->mgr;

	/* Record our thread id and pin the thread to a CPU. */
	isc__nm_tid_v = worker->id;
	isc_thread_setaffinity(isc__nm_tid_v);

	while (true) {
		int r = uv_run(&worker->loop, UV_RUN_DEFAULT);
		/* There's always the async handle until we are done */
		INSIST(r > 0 || worker->finished);

		if (worker->paused) {
			LOCK(&worker->lock);
			/* We need to lock the worker first otherwise
			 * isc_nm_resume() might slip in before WAIT() in the
			 * while loop starts and the signal never gets delivered
			 * and we are forever stuck in the paused loop.
			 */

			/* Report ourselves paused to isc_nm_pause(). */
			LOCK(&mgr->lock);
			mgr->workers_paused++;
			SIGNAL(&mgr->wkstatecond);
			UNLOCK(&mgr->lock);

			/*
			 * While paused, only priority events (stop,
			 * pause, resume) are processed.
			 */
			while (worker->paused) {
				WAIT(&worker->cond, &worker->lock);
				(void)process_priority_queue(worker);
			}

			/* Report ourselves resumed to isc_nm_resume(). */
			LOCK(&mgr->lock);
			mgr->workers_paused--;
			SIGNAL(&mgr->wkstatecond);
			UNLOCK(&mgr->lock);

			UNLOCK(&worker->lock);
		}

		if (r == 0) {
			/* uv_run() ran out of handles: we are stopping. */
			INSIST(worker->finished);
			break;
		}

		INSIST(!worker->finished);

		/*
		 * Empty the async queue.
		 */
		process_queues(worker);
	}

	/* Tell nm_destroy() that this worker has exited. */
	LOCK(&mgr->lock);
	mgr->workers_running--;
	SIGNAL(&mgr->wkstatecond);
	UNLOCK(&mgr->lock);

	return ((isc_threadresult_t)0);
}

/*
 * async_cb is a universal callback for 'async' events sent to event loop.
 * It's the only way to safely pass data to the libuv event loop. We use a
 * single async event and a lockless queue of 'isc__netievent_t' structures
 * passed from other threads.
 */
static void
async_cb(uv_async_t *handle) {
	/* The worker was stored in loop.data by isc_nm_start(). */
	isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
	process_queues(worker);
}

static void
isc__nm_async_stopcb(isc__networker_t *worker, isc__netievent_t *ev0) {
	/*
	 * Handle a netievent_stop event: mark the worker finished and
	 * close its async handle so uv_run() can return 0.
	 */
	UNUSED(ev0);
	worker->finished = true;
	/* Close the async handler */
	uv_close((uv_handle_t *)&worker->async, NULL);
	/* uv_stop(&worker->loop); */
}

static void
isc__nm_async_pausecb(isc__networker_t *worker, isc__netievent_t *ev0) {
	/* Handle netievent_pause: flag the worker and break out of uv_run. */
	UNUSED(ev0);
	REQUIRE(worker->paused == false);
	worker->paused = true;
	uv_stop(&worker->loop);
}

static void
isc__nm_async_resumecb(isc__networker_t *worker, isc__netievent_t *ev0) {
	/* Handle netievent_resume: clear the pause flag set by pausecb. */
	UNUSED(ev0);
	REQUIRE(worker->paused == true);
	worker->paused = false;
}

static bool
process_priority_queue(isc__networker_t *worker) {
	/* Drain the priority queue; false means "stop processing". */
	return (process_queue(worker, worker->ievents_prio));
}

static bool
process_normal_queue(isc__networker_t *worker) {
	/* Drain the normal queue; false means "stop processing". */
	return (process_queue(worker, worker->ievents));
}

static void
process_queues(isc__networker_t *worker) {
	/*
	 * Drain the priority queue first; the normal queue is only
	 * processed when priority processing did not request a stop.
	 */
	bool keep_going = process_priority_queue(worker);

	if (keep_going) {
		(void)process_normal_queue(worker);
	}
}

static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue) {
	/*
	 * Dequeue and dispatch every ievent on 'queue' to its async
	 * handler.  Returns false if a stop or pause event was seen
	 * (processing must halt), true if the queue was fully drained.
	 * Each ievent is returned to the manager's event pool after
	 * dispatch.
	 */
	isc__netievent_t *ievent = NULL;
	bool more = true;

	while ((ievent = (isc__netievent_t *)isc_queue_dequeue(queue)) != NULL)
	{
		switch (ievent->type) {
		case netievent_stop:
			isc__nm_async_stopcb(worker, ievent);
			/* Don't process more ievents when we are stopping */
			more = false;
			break;

		case netievent_udplisten:
			isc__nm_async_udplisten(worker, ievent);
			break;
		case netievent_udpstop:
			isc__nm_async_udpstop(worker, ievent);
			break;
		case netievent_udpsend:
			isc__nm_async_udpsend(worker, ievent);
			break;

		case netievent_tcpconnect:
			isc__nm_async_tcpconnect(worker, ievent);
			break;
		case netievent_tcplisten:
			isc__nm_async_tcplisten(worker, ievent);
			break;
		case netievent_tcpchildaccept:
			isc__nm_async_tcpchildaccept(worker, ievent);
			break;
		case netievent_tcpaccept:
			isc__nm_async_tcpaccept(worker, ievent);
			break;
		case netievent_tcpstartread:
			isc__nm_async_tcp_startread(worker, ievent);
			break;
		case netievent_tcppauseread:
			isc__nm_async_tcp_pauseread(worker, ievent);
			break;
		case netievent_tcpsend:
			isc__nm_async_tcpsend(worker, ievent);
			break;
		case netievent_tcpdnssend:
			isc__nm_async_tcpdnssend(worker, ievent);
			break;
		case netievent_tcpstop:
			isc__nm_async_tcpstop(worker, ievent);
			break;
		case netievent_tcpclose:
			isc__nm_async_tcpclose(worker, ievent);
			break;

		case netievent_tcpdnsclose:
			isc__nm_async_tcpdnsclose(worker, ievent);
			break;

		case netievent_closecb:
			isc__nm_async_closecb(worker, ievent);
			break;
		case netievent_shutdown:
			isc__nm_async_shutdown(worker, ievent);
			break;

		case netievent_resume:
			isc__nm_async_resumecb(worker, ievent);
			break;
		case netievent_pause:
			isc__nm_async_pausecb(worker, ievent);
			/* Don't process more ievents when we are pausing */
			more = false;
			break;

		default:
			/* Unknown event type: a programming error. */
			INSIST(0);
			ISC_UNREACHABLE();
		}

		isc__nm_put_ievent(worker->mgr, ievent);
		if (!more) {
			break;
		}
	}
	return (more);
}

void *
isc__nm_get_ievent(isc_nm_t *mgr, isc__netievent_type type) {
	/*
	 * Allocate an ievent from the manager's event pool and
	 * zero-initialize it except for its type.
	 */
	isc__netievent_storage_t *event = isc_mempool_get(mgr->evpool);

	*event = (isc__netievent_storage_t){ .ni.type = type };
	return (event);
}

Witold Kręcicki's avatar
Witold Kręcicki committed
688
void
isc__nm_put_ievent(isc_nm_t *mgr, void *ievent) {
	/* Return an ievent obtained via isc__nm_get_ievent() to the pool. */
	isc_mempool_put(mgr->evpool, ievent);
}

693
void
isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
	/*
	 * Queue 'event' for the worker and wake its loop.  Types above
	 * netievent_prio go to the priority queue under the worker lock
	 * (so a paused worker waiting on the condvar is woken too);
	 * everything else goes to the lock-free normal queue.
	 */
	if (event->type > netievent_prio) {
		/*
		 * We need to make sure this signal will be delivered and
		 * the queue will be processed.
		 */
		LOCK(&worker->lock);
		isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event);
		SIGNAL(&worker->cond);
		UNLOCK(&worker->lock);
	} else {
		isc_queue_enqueue(worker->ievents, (uintptr_t)event);
	}
	uv_async_send(&worker->async);
}

710
bool
Evan Hunt's avatar
Evan Hunt committed
711
isc__nmsocket_active(isc_nmsocket_t *sock) {
712 713 714 715 716 717 718 719 720
	REQUIRE(VALID_NMSOCK(sock));
	if (sock->parent != NULL) {
		return (atomic_load(&sock->parent->active));
	}

	return (atomic_load(&sock->active));
}

void
isc__nmsocket_attach(isc_nmsocket_t *sock, isc_nmsocket_t **target) {
	/*
	 * Take a reference to 'sock' and store it in '*target'.  For a
	 * child socket the reference is counted on the parent (the set
	 * is reference-counted as a whole); increment0 allows reviving
	 * a socket whose count has dropped to zero.
	 */
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(target != NULL && *target == NULL);

	if (sock->parent != NULL) {
		INSIST(sock->parent->parent == NULL); /* sanity check */
		isc_refcount_increment0(&sock->parent->references);
	} else {
		isc_refcount_increment0(&sock->references);
	}

	*target = sock;
}

/*
 * Free all resources inside a socket (including its children if any).
 *
 * 'dofree' controls whether the socket structure itself is returned
 * to the memory context (true for heap-allocated parent/standalone
 * sockets, false for children living inside the parent's array).
 * The caller must have ensured the socket is no longer active.
 */
static void
nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) {
	isc_nmhandle_t *handle = NULL;
	isc__nm_uvreq_t *uvreq = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(!isc__nmsocket_active(sock));

	atomic_store(&sock->destroying, true);

	if (sock->parent == NULL && sock->children != NULL) {
		/*
		 * We shouldn't be here unless there are no active handles,
		 * so we can clean up and free the children.
		 */
		for (int i = 0; i < sock->nchildren; i++) {
			if (!atomic_load(&sock->children[i].destroying)) {
				nmsocket_cleanup(&sock->children[i], false);
			}
		}

		/*
		 * This was a parent socket; free the children.
		 */
		isc_mem_put(sock->mgr->mctx, sock->children,
			    sock->nchildren * sizeof(*sock));
		sock->children = NULL;
		sock->nchildren = 0;
	}
	if (sock->statsindex != NULL) {
		isc__nm_decstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
	}

	sock->statichandle = NULL;

	if (sock->outerhandle != NULL) {
		isc_nmhandle_detach(&sock->outerhandle);
	}

	if (sock->outer != NULL) {
		isc__nmsocket_detach(&sock->outer);
	}

	/* Release cached inactive handles. */
	while ((handle = isc_astack_pop(sock->inactivehandles)) != NULL) {
		nmhandle_free(sock, handle);
	}

	if (sock->buf != NULL) {
		isc_mem_free(sock->mgr->mctx, sock->buf);
	}

	if (sock->quota != NULL) {
		isc_quota_detach(&sock->quota);
	}

	sock->pquota = NULL;

	if (sock->timer_initialized) {
		sock->timer_initialized = false;
		/* We might be in timer callback */
		if (!uv_is_closing((uv_handle_t *)&sock->timer)) {
			uv_timer_stop(&sock->timer);
			uv_close((uv_handle_t *)&sock->timer, NULL);
		}
	}

	isc_astack_destroy(sock->inactivehandles);

	/* Return cached uv requests to the manager's pool. */
	while ((uvreq = isc_astack_pop(sock->inactivereqs)) != NULL) {
		isc_mempool_put(sock->mgr->reqpool, uvreq);
	}

	isc_astack_destroy(sock->inactivereqs);
	sock->magic = 0;

	isc_mem_free(sock->mgr->mctx, sock->ah_frees);
	isc_mem_free(sock->mgr->mctx, sock->ah_handles);
	isc_mutex_destroy(&sock->lock);
	isc_condition_destroy(&sock->cond);
#ifdef NETMGR_TRACE
	LOCK(&sock->mgr->lock);
	ISC_LIST_UNLINK(sock->mgr->active_sockets, sock, active_link);
	UNLOCK(&sock->mgr->lock);
#endif
	if (dofree) {
		/* Keep the mgr alive until after the socket is freed. */
		isc_nm_t *mgr = sock->mgr;
		isc_mem_put(mgr->mctx, sock, sizeof(*sock));
		isc_nm_detach(&mgr);
	} else {
		isc_nm_detach(&sock->mgr);
	}
}

static void
nmsocket_maybe_destroy(isc_nmsocket_t *sock) {
	/*
	 * Destroy the socket if (and only if) it is fully quiescent:
	 * inactive, closed, unreferenced, not already being destroyed,
	 * and with no in-flight handles on it or any of its children.
	 * For a child socket, the decision is delegated to the parent.
	 */
	int active_handles;
	bool destroy = false;

	if (sock->parent != NULL) {
		/*
		 * This is a child socket and cannot be destroyed except
		 * as a side effect of destroying the parent, so let's go
		 * see if the parent is ready to be destroyed.
		 */
		nmsocket_maybe_destroy(sock->parent);
		return;
	}

	/*
	 * This is a parent socket (or a standalone). See whether the
	 * children have active handles before deciding whether to
	 * accept destruction.
	 */
	LOCK(&sock->lock);
	if (atomic_load(&sock->active) || atomic_load(&sock->destroying) ||
	    !atomic_load(&sock->closed) || atomic_load(&sock->references) != 0)
	{
		UNLOCK(&sock->lock);
		return;
	}

	active_handles = atomic_load(&sock->ah);
	if (sock->children != NULL) {
		for (int i = 0; i < sock->nchildren; i++) {
			LOCK(&sock->children[i].lock);
			active_handles += atomic_load(&sock->children[i].ah);
			UNLOCK(&sock->children[i].lock);
		}
	}

	if (active_handles == 0 || sock->statichandle != NULL) {
		destroy = true;
	}

	if (destroy) {
		/* Mark before unlocking so no one else starts a destroy. */
		atomic_store(&sock->destroying, true);
		UNLOCK(&sock->lock);
		nmsocket_cleanup(sock, true);
	} else {
		UNLOCK(&sock->lock);
	}
}

void
isc__nmsocket_prep_destroy(isc_nmsocket_t *sock) {
	REQUIRE(sock->parent == NULL);

	/*
	 * The final external reference to the socket is gone. We can try
	 * destroying the socket, but we have to wait for all the inflight
	 * handles to finish first.
	 */
	atomic_store(&sock->active, false);

	/*
	 * If the socket has children, they'll need to be marked inactive
	 * so they can be cleaned up too.
	 */
	if (sock->children != NULL) {
		for (int i = 0; i < sock->nchildren; i++) {
			atomic_store(&sock->children[i].active, false);
		}
	}

	/*
	 * If we're here then we already stopped listening; otherwise
	 * we'd have a hanging reference from the listening process.
	 *
	 * If it's a regular socket we may need to close it.
	 */
	if (!atomic_load(&sock->closed)) {
		switch (sock->type) {
		case isc_nm_tcpsocket:
			/* Close path will re-enter maybe_destroy later. */
			isc__nm_tcp_close(sock);
			return;
		case isc_nm_tcpdnssocket:
			isc__nm_tcpdns_close(sock);
			return;
		default:
			break;
		}
	}

	nmsocket_maybe_destroy(sock);
}

void
925
isc__nmsocket_detach(isc_nmsocket_t **sockp) {
926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942
	REQUIRE(sockp != NULL && *sockp != NULL);
	REQUIRE(VALID_NMSOCK(*sockp));

	isc_nmsocket_t *sock = *sockp, *rsock = NULL;
	*sockp = NULL;

	/*
	 * If the socket is a part of a set (a child socket) we are
	 * counting references for the whole set at the parent.
	 */
	if (sock->parent != NULL) {
		rsock = sock->parent;
		INSIST(rsock->parent == NULL); /* Sanity check */
	} else {
		rsock = sock;
	}

943
	if (isc_refcount_decrement(&rsock->references) == 1) {
944 945 946 947
		isc__nmsocket_prep_destroy(rsock);
	}
}

void
isc_nmsocket_close(isc_nmsocket_t **sockp) {
	/*
	 * Public close for listener sockets only: simply drops the
	 * caller's reference via isc__nmsocket_detach().
	 */
	REQUIRE(sockp != NULL);
	REQUIRE(VALID_NMSOCK(*sockp));
	REQUIRE((*sockp)->type == isc_nm_udplistener ||
		(*sockp)->type == isc_nm_tcplistener ||
		(*sockp)->type == isc_nm_tcpdnslistener);

	isc__nmsocket_detach(sockp);
}

959
void
960
isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type,
Evan Hunt's avatar
Evan Hunt committed
961
		   isc_nmiface_t *iface) {
962 963 964 965
	uint16_t family;

	REQUIRE(sock != NULL);
	REQUIRE(mgr != NULL);
966
	REQUIRE(iface != NULL);
967 968 969

	family = iface->addr.type.sa.sa_family;

970 971 972 973 974 975 976 977
	*sock = (isc_nmsocket_t){ .type = type,
				  .iface = iface,
				  .fd = -1,
				  .ah_size = 32,
				  .inactivehandles = isc_astack_new(
					  mgr->mctx, ISC_NM_HANDLES_STACK_SIZE),
				  .inactivereqs = isc_astack_new(
					  mgr->mctx, ISC_NM_REQS_STACK_SIZE) };
978

979 980 981 982 983 984 985 986 987
#ifdef NETMGR_TRACE
	sock->backtrace_size = backtrace(sock->backtrace, TRACE_SIZE);
	ISC_LINK_INIT(sock, active_link);
	ISC_LIST_INIT(sock->active_handles);
	LOCK(&mgr->lock);
	ISC_LIST_APPEND(mgr->active_sockets, sock, active_link);
	UNLOCK(&mgr->lock);
#endif

988 989 990
	isc_nm_attach(mgr, &sock->mgr);
	sock->uv_handle.handle.data = sock;

Evan Hunt's avatar
Evan Hunt committed
991 992
	sock->ah_frees = isc_mem_allocate(mgr->mctx,
					  sock->ah_size * sizeof(size_t));
993 994
	sock->ah_handles = isc_mem_allocate(
		mgr->mctx, sock->ah_size * sizeof(isc_nmhandle_t *));
995
	ISC_LINK_INIT(&sock->quotacb, link);
996 997 998 999 1000
	for (size_t i = 0; i < 32; i++) {
		sock->ah_frees[i] = i;
		sock->ah_handles[i] = NULL;
	}

1001 1002 1003 1004 1005 1006 1007 1008
	switch (type) {
	case isc_nm_udpsocket:
	case isc_nm_udplistener:
		if (family == AF_INET) {
			sock->statsindex = udp4statsindex;
		} else {
			sock->statsindex = udp6statsindex;
		}
1009
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
1010 1011 1012 1013 1014 1015 1016 1017
		break;
	case isc_nm_tcpsocket:
	case isc_nm_tcplistener:
		if (family == AF_INET) {
			sock->statsindex = tcp4statsindex;
		} else {
			sock->statsindex = tcp6statsindex;
		}
1018
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
Mark Andrews's avatar
Mark Andrews committed
1019
		break;
1020 1021 1022 1023
	default:
		break;
	}

1024 1025 1026
	isc_mutex_init(&sock->lock);
	isc_condition_init(&sock->cond);
	isc_refcount_init(&sock->references, 1);
1027

1028
	atomic_init(&sock->active, true);
1029 1030 1031 1032
	atomic_init(&sock->sequential, false);
	atomic_init(&sock->overlimit, false);
	atomic_init(&sock->processing, false);
	atomic_init(&sock->readpaused, false);
1033 1034 1035 1036

	sock->magic = NMSOCK_MAGIC;
}

1037 1038 1039 1040
void
isc__nmsocket_clearcb(isc_nmsocket_t *sock) {
	/* Clear the receive and accept callbacks and their arguments. */
	REQUIRE(VALID_NMSOCK(sock));

	sock->recv_cb = NULL;
	sock->recv_cbarg = NULL;
	sock->accept_cb = NULL;
	sock->accept_cbarg = NULL;
}

1047
void
isc__nm_free_uvbuf(isc_nmsocket_t *sock, const uv_buf_t *buf) {
	/*
	 * Release a receive buffer back to the socket's worker.  The
	 * worker owns a single shared recvbuf; this just clears the
	 * in-use flag after sanity-checking that 'buf' points at it.
	 */
	isc__networker_t *worker = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	if (buf->base == NULL) {
		/* Empty buffer: might happen in case of error. */
		return;
	}
	worker = &sock->mgr->workers[sock->tid];

	REQUIRE(worker->recvbuf_inuse);
	if (sock->type == isc_nm_udpsocket && buf->base > worker->recvbuf &&
	    buf->base <= worker->recvbuf + ISC_NETMGR_RECVBUF_SIZE)
	{
		/* Can happen in case of out-of-order recvmmsg in libuv1.36 */
		return;
	}
	REQUIRE(buf->base == worker->recvbuf);
	worker->recvbuf_inuse = false;
}

static isc_nmhandle_t *
alloc_handle(isc_nmsocket_t *sock) {
	/*
	 * Allocate a fresh handle, including the socket's per-handle
	 * extra space, and initialize it with a single reference.
	 */
	isc_nmhandle_t *handle =
		isc_mem_get(sock->mgr->mctx,
			    sizeof(isc_nmhandle_t) + sock->extrahandlesize);

	*handle = (isc_nmhandle_t){ .magic = NMHANDLE_MAGIC };
#ifdef NETMGR_TRACE
	ISC_LINK_INIT(handle, active_link);
#endif
	isc_refcount_init(&handle->references, 1);

	return (handle);
}

isc_nmhandle_t *
isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
Evan Hunt's avatar
Evan Hunt committed
1086
		  isc_sockaddr_t *local) {
1087
	isc_nmhandle_t *handle = NULL;