Commit efafe4fa authored by Brian Wellington's avatar Brian Wellington
Browse files

Non-threaded socket manager.

parent 8dd55625
......@@ -15,7 +15,7 @@
* WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/* $Id: socket.c,v 1.160 2000/08/26 01:31:56 bwelling Exp $ */
/* $Id: socket.c,v 1.161 2000/08/29 23:58:15 bwelling Exp $ */
#include <config.h>
......@@ -38,7 +38,9 @@
#include <isc/list.h>
#include <isc/log.h>
#include <isc/mem.h>
#include <isc/mutex.h>
#include <isc/net.h>
#include <isc/platform.h>
#include <isc/print.h>
#include <isc/region.h>
#include <isc/socket.h>
......@@ -46,6 +48,10 @@
#include <isc/thread.h>
#include <isc/util.h>
#ifndef ISC_PLATFORM_USETHREADS
#include "socket_p.h"
#endif
/*
* Some systems define the socket length argument as an int, some as size_t,
* some as socklen_t. This is here so it can be easily changed if needed.
......@@ -180,16 +186,24 @@ struct isc_socketmgr {
isc_mutex_t lock;
/* Locked by manager lock. */
ISC_LIST(isc_socket_t) socklist;
isc_thread_t watcher;
isc_condition_t shutdown_ok;
fd_set read_fds;
fd_set write_fds;
isc_socket_t *fds[FD_SETSIZE];
int fdstate[FD_SETSIZE];
int maxfd;
#ifdef ISC_PLATFORM_USETHREADS
isc_thread_t watcher;
isc_condition_t shutdown_ok;
int pipe_fds[2];
#else
unsigned int refs;
#endif
};
#ifndef ISC_PLATFORM_USETHREADS
static isc_socketmgr_t *socketmgr = NULL;
#endif
#define CLOSED 0 /* this one must be zero */
#define MANAGED 1
#define CLOSE_PENDING 2
......@@ -272,6 +286,51 @@ socket_log(isc_socket_t *sock, isc_sockaddr_t *address,
}
}
static void
wakeup_socket(isc_socketmgr_t *manager, int fd) {
isc_event_t *ev2;
isc_socketevent_t *rev;
isc_socket_t *sock;
/*
* This is a wakeup on a socket. Look at the event queue for both
* read and write, and decide if we need to watch on it now or not.
*/
INSIST(fd < FD_SETSIZE);
if (manager->fdstate[fd] == CLOSE_PENDING) {
manager->fdstate[fd] = CLOSED;
FD_CLR(fd, &manager->read_fds);
FD_CLR(fd, &manager->write_fds);
close(fd);
return;
}
if (manager->fdstate[fd] != MANAGED)
return;
sock = manager->fds[fd];
/*
* If there are no events, or there is an event but we
* have already queued up the internal event on a task's
* queue, clear the bit. Otherwise, set it.
*/
rev = ISC_LIST_HEAD(sock->recv_list);
ev2 = (isc_event_t *) ISC_LIST_HEAD(sock->accept_list);
if ((rev == NULL && ev2 == NULL)
|| sock->pending_recv || sock->pending_accept)
FD_CLR(sock->fd, &manager->read_fds);
else
FD_SET(sock->fd, &manager->read_fds);
rev = ISC_LIST_HEAD(sock->send_list);
if ((rev == NULL || sock->pending_send) && !sock->connecting)
FD_CLR(sock->fd, &manager->write_fds);
else
FD_SET(sock->fd, &manager->write_fds);
}
#ifdef ISC_PLATFORM_USETHREADS
/*
* Poke the select loop when there is something for us to do.
* We assume that if a write completes here, it will be inserted into the
......@@ -284,7 +343,7 @@ select_poke(isc_socketmgr_t *mgr, int msg) {
do {
cc = write(mgr->pipe_fds[1], &msg, sizeof(int));
} while (cc < 0 && SOFT_ERROR(errno));
if (cc < 0)
FATAL_ERROR(__FILE__, __LINE__,
"write() failed during watcher poke: %s",
......@@ -309,12 +368,25 @@ select_readmsg(isc_socketmgr_t *mgr) {
FATAL_ERROR(__FILE__, __LINE__,
"read() failed during watcher poke: %s",
strerror(errno));
return (SELECT_POKE_NOTHING);
}
return (msg);
}
#else
/*
* Update the state of the socketmgr when something changes.
*/
static void
select_poke(isc_socketmgr_t *manager, int msg) {
if (msg == SELECT_POKE_SHUTDOWN)
return;
else if (msg >= 0)
wakeup_socket(manager, msg);
return;
}
#endif
/*
* Make a fd non-blocking
......@@ -986,8 +1058,10 @@ destroy(isc_socket_t **sockp) {
select_poke(manager, sock->fd);
ISC_LIST_UNLINK(manager->socklist, sock, link);
#ifdef ISC_PLATFORM_USETHREADS
if (ISC_LIST_EMPTY(manager->socklist))
SIGNAL(&manager->shutdown_ok);
#endif
/*
* XXX should reset manager->maxfd here
......@@ -1782,6 +1856,75 @@ internal_send(isc_task_t *me, isc_event_t *ev) {
UNLOCK(&sock->lock);
}
static void
process_fds(isc_socketmgr_t *manager, int maxfd,
fd_set *readfds, fd_set *writefds)
{
int i;
isc_socket_t *sock;
isc_boolean_t unlock_sock;
/*
* Process read/writes on other fds here. Avoid locking
* and unlocking twice if both reads and writes are possible.
*/
for (i = 0 ; i < maxfd ; i++) {
#ifdef ISC_PLATFORM_USETHREADS
if (i == manager->pipe_fds[0] || i == manager->pipe_fds[1])
continue;
#endif
if (manager->fdstate[i] == CLOSE_PENDING) {
manager->fdstate[i] = CLOSED;
FD_CLR(i, &manager->read_fds);
FD_CLR(i, &manager->write_fds);
close(i);
continue;
}
sock = manager->fds[i];
unlock_sock = ISC_FALSE;
if (FD_ISSET(i, readfds)) {
if (sock == NULL) {
FD_CLR(i, &manager->read_fds);
goto check_write;
}
unlock_sock = ISC_TRUE;
LOCK(&sock->lock);
if (!SOCK_DEAD(sock)) {
if (sock->listener)
dispatch_accept(sock);
else
dispatch_recv(sock);
}
FD_CLR(i, &manager->read_fds);
}
check_write:
if (FD_ISSET(i, writefds)) {
if (sock == NULL) {
FD_CLR(i, &manager->write_fds);
continue;
}
if (!unlock_sock) {
unlock_sock = ISC_TRUE;
LOCK(&sock->lock);
}
if (!SOCK_DEAD(sock)) {
if (sock->connecting)
dispatch_connect(sock);
else
dispatch_send(sock);
}
FD_CLR(i, &manager->write_fds);
}
if (unlock_sock)
UNLOCK(&sock->lock);
}
}
#ifdef ISC_PLATFORM_USETHREADS
/*
* This is the thread that will loop forever, always in a select or poll
* call.
......@@ -1792,17 +1935,12 @@ internal_send(isc_task_t *me, isc_event_t *ev) {
static isc_threadresult_t
watcher(void *uap) {
isc_socketmgr_t *manager = uap;
isc_socket_t *sock;
isc_boolean_t done;
int ctlfd;
int cc;
fd_set readfds;
fd_set writefds;
int msg;
isc_boolean_t unlock_sock;
int i;
isc_socketevent_t *rev;
isc_event_t *ev2;
int maxfd;
/*
......@@ -1866,122 +2004,12 @@ watcher(void *uap) {
* and decide if we need to watch on it now
* or not.
*/
if (msg >= 0) {
INSIST(msg < FD_SETSIZE);
if (manager->fdstate[msg] ==
CLOSE_PENDING) {
manager->fdstate[msg] = CLOSED;
FD_CLR(msg,
&manager->read_fds);
FD_CLR(msg,
&manager->write_fds);
close(msg);
continue;
}
if (manager->fdstate[msg] != MANAGED)
continue;
sock = manager->fds[msg];
LOCK(&sock->lock);
/*
* If there are no events, or there
* is an event but we have already
* queued up the internal event on a
* task's queue, clear the bit.
* Otherwise, set it.
*/
rev = ISC_LIST_HEAD(sock->recv_list);
ev2 = (isc_event_t *)
ISC_LIST_HEAD(sock->accept_list);
if ((rev == NULL && ev2 == NULL)
|| sock->pending_recv
|| sock->pending_accept) {
FD_CLR(sock->fd,
&manager->read_fds);
} else {
FD_SET(sock->fd,
&manager->read_fds);
}
rev = ISC_LIST_HEAD(sock->send_list);
if ((rev == NULL
|| sock->pending_send)
&& !sock->connecting) {
FD_CLR(sock->fd,
&manager->write_fds);
} else {
FD_SET(sock->fd,
&manager->write_fds);
}
UNLOCK(&sock->lock);
}
if (msg >= 0)
wakeup_socket(manager, msg);
}
}
/*
* Process read/writes on other fds here. Avoid locking
* and unlocking twice if both reads and writes are possible.
*/
for (i = 0 ; i < maxfd ; i++) {
if (i == manager->pipe_fds[0]
|| i == manager->pipe_fds[1])
continue;
if (manager->fdstate[i] == CLOSE_PENDING) {
manager->fdstate[i] = CLOSED;
FD_CLR(i, &manager->read_fds);
FD_CLR(i, &manager->write_fds);
close(i);
continue;
}
sock = manager->fds[i];
unlock_sock = ISC_FALSE;
if (FD_ISSET(i, &readfds)) {
if (sock == NULL) {
FD_CLR(i, &manager->read_fds);
goto check_write;
}
unlock_sock = ISC_TRUE;
LOCK(&sock->lock);
if (!SOCK_DEAD(sock)) {
if (sock->listener)
dispatch_accept(sock);
else
dispatch_recv(sock);
}
FD_CLR(i, &manager->read_fds);
}
check_write:
if (FD_ISSET(i, &writefds)) {
if (sock == NULL) {
FD_CLR(i, &manager->write_fds);
continue;
}
if (!unlock_sock) {
unlock_sock = ISC_TRUE;
LOCK(&sock->lock);
}
if (!SOCK_DEAD(sock)) {
if (sock->connecting)
dispatch_connect(sock);
else
dispatch_send(sock);
}
FD_CLR(i, &manager->write_fds);
}
if (unlock_sock)
UNLOCK(&sock->lock);
}
process_fds(manager, maxfd, &readfds, &writefds);
}
manager_log(manager, TRACE, "watcher exiting");
......@@ -1989,6 +2017,7 @@ watcher(void *uap) {
UNLOCK(&manager->lock);
return ((isc_threadresult_t)0);
}
#endif
/*
* Create a new socket manager.
......@@ -1999,6 +2028,14 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) {
REQUIRE(managerp != NULL && *managerp == NULL);
#ifndef ISC_PLATFORM_USETHREADS
if (socketmgr != NULL) {
socketmgr->refs++;
*managerp = socketmgr;
return (ISC_R_SUCCESS);
}
#endif
manager = isc_mem_get(mctx, sizeof *manager);
if (manager == NULL)
return (ISC_R_NOMEMORY);
......@@ -2013,7 +2050,7 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) {
"isc_mutex_init() failed");
return (ISC_R_UNEXPECTED);
}
#ifdef ISC_PLATFORM_USETHREADS
if (isc_condition_init(&manager->shutdown_ok) != ISC_R_SUCCESS) {
DESTROYLOCK(&manager->lock);
isc_mem_put(mctx, manager, sizeof *manager);
......@@ -2040,16 +2077,24 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) {
#if 0
RUNTIME_CHECK(make_nonblock(manager->pipe_fds[1]) == ISC_R_SUCCESS);
#endif
#else
manager->refs = 1;
#endif
/*
* Set up initial state for the select loop
*/
FD_ZERO(&manager->read_fds);
FD_ZERO(&manager->write_fds);
#ifdef ISC_PLATFORM_USETHREADS
FD_SET(manager->pipe_fds[0], &manager->read_fds);
manager->maxfd = manager->pipe_fds[0];
#else
manager->maxfd = 0;
#endif
memset(manager->fdstate, 0, sizeof(manager->fdstate));
#ifdef ISC_PLATFORM_USETHREADS
/*
* Start up the select/poll thread.
*/
......@@ -2063,9 +2108,12 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) {
close(manager->pipe_fds[1]);
return (ISC_R_UNEXPECTED);
}
#endif
isc_mem_attach(mctx, &manager->mctx);
#ifndef ISC_PLATFORM_USETHREADS
socketmgr = manager;
#endif
*managerp = manager;
return (ISC_R_SUCCESS);
......@@ -2085,8 +2133,17 @@ isc_socketmgr_destroy(isc_socketmgr_t **managerp) {
manager = *managerp;
REQUIRE(VALID_MANAGER(manager));
#ifndef ISC_PLATFORM_USETHREADS
if (manager->refs > 1) {
manager->refs--;
*managerp = NULL;
return;
}
#endif
LOCK(&manager->lock);
#ifdef ISC_PLATFORM_USETHREADS
/*
* Wait for all sockets to be destroyed.
*/
......@@ -2094,33 +2151,47 @@ isc_socketmgr_destroy(isc_socketmgr_t **managerp) {
manager_log(manager, CREATION, "sockets exist");
WAIT(&manager->shutdown_ok, &manager->lock);
}
#else
/*
* Hope all sockets have been destroyed.
*/
if (!ISC_LIST_EMPTY(manager->socklist)) {
manager_log(manager, CREATION, "sockets exist");
INSIST(0);
}
#endif
UNLOCK(&manager->lock);
/*
* Here, poke our select/poll thread. Do this by closing the write
* half of the pipe, which will send EOF to the read half.
* This is currently a no-op in the non-threaded case.
*/
select_poke(manager, SELECT_POKE_SHUTDOWN);
#ifdef ISC_PLATFORM_USETHREADS
/*
* Wait for thread to exit.
*/
if (isc_thread_join(manager->watcher, NULL) != ISC_R_SUCCESS)
UNEXPECTED_ERROR(__FILE__, __LINE__,
"isc_thread_join() failed");
#endif
/*
* Clean up.
*/
#ifdef ISC_PLATFORM_USETHREADS
close(manager->pipe_fds[0]);
close(manager->pipe_fds[1]);
(void)isc_condition_destroy(&manager->shutdown_ok);
#endif
for (i = 0 ; i < FD_SETSIZE ; i++)
if (manager->fdstate[i] == CLOSE_PENDING)
close(i);
(void)isc_condition_destroy(&manager->shutdown_ok);
DESTROYLOCK(&manager->lock);
manager->magic = 0;
mctx= manager->mctx;
......@@ -2652,6 +2723,7 @@ isc_socket_accept(isc_socket_t *sock,
isc_task_t *ntask = NULL;
isc_socket_t *nsock;
isc_result_t ret;
isc_boolean_t do_poke = ISC_FALSE;
REQUIRE(VALID_SOCKET(sock));
manager = sock->manager;
......@@ -2697,10 +2769,13 @@ isc_socket_accept(isc_socket_t *sock,
* bit of time waking it up now or later won't matter all that much.
*/
if (ISC_LIST_EMPTY(sock->accept_list))
select_poke(manager, sock->fd);
do_poke = ISC_TRUE;
ISC_LIST_ENQUEUE(sock->accept_list, dev, ev_link);
if (do_poke)
select_poke(manager, sock->fd);
UNLOCK(&sock->lock);
return (ISC_R_SUCCESS);
}
......@@ -3225,3 +3300,27 @@ isc_socket_isbound(isc_socket_t *sock) {
return (val);
}
#ifndef ISC_PLATFORM_USETHREADS
void
isc__socketmgr_getfdsets(fd_set *readset, fd_set *writeset, int *maxfd) {
if (socketmgr == NULL)
*maxfd = 0;
else {
*readset = socketmgr->read_fds;
*writeset = socketmgr->write_fds;
*maxfd = socketmgr->maxfd + 1;
}
}
isc_result_t
isc__socketmgr_dispatch(fd_set *readset, fd_set *writeset, int maxfd) {
isc_socketmgr_t *manager = socketmgr;
if (manager == NULL)
return (ISC_R_NOTFOUND);
process_fds(manager, maxfd, readset, writeset);
return (ISC_R_SUCCESS);
}
#endif
/*
* Copyright (C) 2000 Internet Software Consortium.
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SOFTWARE CONSORTIUM
* DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
* INTERNET SOFTWARE CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
* INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
* FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
* NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
* WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/* $Id: socket_p.h,v 1.1 2000/08/29 23:58:17 bwelling Exp $ */
#ifndef ISC_SOCKET_P_H
#define ISC_SOCKET_P_H
void
isc__socketmgr_getfdsets(fd_set *readset, fd_set *writeset, int *maxfd);
isc_result_t
isc__socketmgr_dispatch(fd_set *readset, fd_set *writeset, int maxfd);
#endif /* ISC_SOCKET_P_H */
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment