Commit 8c17b6f1 authored by Danny Mayer's avatar Danny Mayer
Browse files

Updated code to support more than 63 accepts and connects by adding capability...

Updated code to support more than 63 accepts and connects by adding capability to add more event_wait threads on demand
parent a6211a2f
......@@ -15,7 +15,40 @@
* WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/* $Id: socket.c,v 1.20 2002/08/02 03:45:56 mayer Exp $ */
/* $Id: socket.c,v 1.21 2002/08/06 03:32:53 mayer Exp $ */
/* This code has been rewritten to take advantage of Windows Sockets
* I/O Completion Ports and Events. I/O Completion Ports is ONLY
* available on Windows NT, Windows 2000 and Windows XP series of
* the Windows Operating Systems. In CANNOT run on Windows 95, Windows 98
* or the follow-ons to those Systems.
*
* This code is by nature multithreaded and takes advantage of various
* features to pass on information through the completion port for
* when I/O is completed. All sends and receives are completed through
* the completion port. Due to an implementation bug in Windows 2000,
* Service Pack 2 must installed on the system for this code to run correctly.
* For details on this problem see Knowledge base article Q263823.
* The code checks for this. The number of Completion Port Worker threads
* used is the total number of CPU's + 1. This increases the likelihood that
* a Worker Thread is available for processing a completed request.
*
* All accepts and connects are accomplished through the WSAEventSelect()
* function and the event_wait loop. Events are added to and deleted from
* each event_wait thread via a common event_update stack owned by the socket
* manager. If the event_wait thread runs out of array space in the events
* array it will look for another event_wait thread to add the event. If it
* fails to find another one it will create a new thread to handle the
* outstanding event.
*
* A future enhancement is to use AcceptEx to take avantage of Overlapped
* I/O which allows for enhanced performance of TCP connections.
* This will also reduce the number of events that are waited on by the
* event_wait threads to just the connect sockets and reduce the number
* additional threads required.
*
* XXXPDM 5 August, 2002
*/
#define MAKE_EXTERNAL 1
#include <config.h>
......@@ -173,9 +206,11 @@ struct isc_socket {
/* Pointers to scatter/gather buffers */
WSABUF iov[ISC_SOCKET_MAXSCATTERGATHER];
size_t totalBytes;
int iEvent; /* Index into Event Array */
WSAEVENT hEvent; /* Event Handle */
long wait_type; /* Events to wait on */
WSAEVENT hAlert; /* Alert Event Handle */
DWORD evthread_id; /* Event Thread Id for socket */
/* Locked by socket lock. */
ISC_LINK(isc_socket_t) link;
......@@ -223,7 +258,8 @@ typedef struct IoCompletionInfo {
/*
* Define a maximum number of I/O Completion Port worker threads
* to handle the load on the Completion Port
* to handle the load on the Completion Port. The actual number
* used is the number of CPU's + 1.
*/
#define MAX_IOCPTHREADS 20
......@@ -235,7 +271,8 @@ typedef struct event_change event_change_t;
struct event_change {
isc_socket_t *sock;
int iEvent;
WSAEVENT hEvent;
DWORD evthread_id;
SOCKET fd;
unsigned int action;
ISC_LINK(event_change_t) link;
......@@ -243,6 +280,8 @@ struct event_change {
/*
* Note: We are using an array here since *WaitForMultiple* wants an array
* WARNING: This value may not be greater than 64 since the
* WSAWaitForMultipleEvents function is limited to 64 events.
*/
#define MAX_EVENTS 64
......@@ -255,29 +294,41 @@ typedef struct sock_event_list {
int total_events;
isc_socket_t *aSockList[MAX_EVENTS];
WSAEVENT aEventList[MAX_EVENTS];
isc_mutex_t EventLock;
ISC_LIST(event_change_t) event_updates;
} sock_event_list;
/*
* Thread Event structure for managing the threads handling events
*/
typedef struct events_thread events_thread_t;
struct events_thread {
isc_thread_t thread_handle; /* Thread's handle */
DWORD thread_id; /* Thread's id */
sock_event_list sockev_list;
isc_socketmgr_t *manager;
ISC_LINK(events_thread_t) link;
};
#define SOCKET_MANAGER_MAGIC ISC_MAGIC('I', 'O', 'm', 'g')
#define VALID_MANAGER(m) ISC_MAGIC_VALID(m, SOCKET_MANAGER_MAGIC)
struct isc_socketmgr {
/* Not locked. */
unsigned int magic;
isc_mem_t *mctx;
isc_mutex_t lock;
unsigned int magic;
isc_mem_t *mctx;
isc_mutex_t lock;
/* Locked by manager lock. */
ISC_LIST(isc_socket_t) socklist;
sock_event_list sockev_list;
int event_written;
isc_boolean_t bShutdown;
isc_thread_t watcher;
isc_condition_t shutdown_ok;
HANDLE hIoCompletionPort;
int maxIOCPThreads;
HANDLE hIOCPThreads[MAX_IOCPTHREADS];
DWORD dwIOCPThreadIds[MAX_IOCPTHREADS];
ISC_LIST(event_change_t) event_updates;
ISC_LIST(isc_socket_t) socklist;
int event_written;
WSAEVENT prime_alert;
isc_boolean_t bShutdown;
ISC_LIST(events_thread_t) ev_threads;
isc_condition_t shutdown_ok;
HANDLE hIoCompletionPort;
int maxIOCPThreads;
HANDLE hIOCPThreads[MAX_IOCPTHREADS];
DWORD dwIOCPThreadIds[MAX_IOCPTHREADS];
};
#define CLOSED 0 /* this one must be zero */
......@@ -290,6 +341,7 @@ struct isc_socketmgr {
#define MAXSCATTERGATHER_SEND (ISC_SOCKET_MAXSCATTERGATHER)
#define MAXSCATTERGATHER_RECV (ISC_SOCKET_MAXSCATTERGATHER)
static isc_threadresult_t WINAPI event_wait(void *uap);
static isc_threadresult_t WINAPI SocketIoThread(LPVOID ThreadContext);
static void free_socket(isc_socket_t **);
......@@ -311,7 +363,7 @@ enum {
#if defined(ISC_SOCKET_DEBUG)
/*
* This is used to duump the contents of the sock structure
* This is used to dump the contents of the sock structure
* You should make sure that the sock is locked before
* dumping it. Since the code uses simple printf() statements
* it should only be used interactively.
......@@ -372,8 +424,10 @@ signal_iocompletionport_exit(isc_socketmgr_t *manager) {
int errval;
char strbuf[ISC_STRERRORSIZE];
REQUIRE(VALID_MANAGER(manager));
for (i = 0; i < manager->maxIOCPThreads; i++) {
if (!PostQueuedCompletionStatus(manager->hIoCompletionPort, 0, 0, 0)) {
if (!PostQueuedCompletionStatus(manager->hIoCompletionPort,
0, 0, 0)) {
errval = GetLastError();
isc__strerror(errval, strbuf, sizeof(strbuf));
FATAL_ERROR(__FILE__, __LINE__,
......@@ -392,8 +446,10 @@ void
iocompletionport_createthreads(int total_threads, isc_socketmgr_t *manager) {
int errval;
char strbuf[ISC_STRERRORSIZE];
int i = 0;
int i;
INSIST(total_threads > 0);
REQUIRE(VALID_MANAGER(manager));
/*
* We need at least one
*/
......@@ -420,6 +476,8 @@ void
iocompletionport_init(isc_socketmgr_t *manager) {
int errval;
char strbuf[ISC_STRERRORSIZE];
REQUIRE(VALID_MANAGER(manager));
/*
* Create a private heap to handle the socket overlapped structure
* The miniumum number of structures is 10, there is no maximum
......@@ -453,6 +511,8 @@ iocompletionport_init(isc_socketmgr_t *manager) {
void
iocompletionport_exit(isc_socketmgr_t *manager) {
REQUIRE(VALID_MANAGER(manager));
if (manager->hIoCompletionPort != NULL) {
/* Get each of the service threads to exit
*/
......@@ -465,8 +525,9 @@ iocompletionport_exit(isc_socketmgr_t *manager) {
*/
void
iocompletionport_update(isc_socket_t *sock) {
HANDLE hiocp;
REQUIRE(sock != NULL);
if(sock->iocp == 0) {
sock->iocp = 1;
hiocp = CreateIoCompletionPort((HANDLE) sock->fd,
......@@ -482,6 +543,7 @@ socket_event_minit(sock_event_list *evlist) {
BOOL bReset;
int i;
REQUIRE(evlist != NULL);
/* Initialize the Event List */
evlist->max_event = 0;
evlist->total_events = 0;
......@@ -490,109 +552,213 @@ socket_event_minit(sock_event_list *evlist) {
evlist->aEventList[i] = (WSAEVENT) 0;
}
evlist->aEventList[0] = WSACreateEvent();
(evlist->max_event)++;
bReset = WSAResetEvent(evlist->aEventList[0]);
}
/*
* Event Thread Initialization
*/
isc_result_t
event_thread_create(events_thread_t **evthreadp, isc_socketmgr_t *manager) {
events_thread_t *evthread;
REQUIRE(VALID_MANAGER(manager));
REQUIRE(evthreadp != NULL && *evthreadp == NULL);
evthread = isc_mem_get(manager->mctx, sizeof(*evthread));
socket_event_minit(&evthread->sockev_list);
ISC_LINK_INIT(evthread, link);
evthread->manager = manager;
ISC_LIST_APPEND(manager->ev_threads, evthread, link);
/*
* initialize the lock
* Start up the event wait thread.
*/
if (isc_mutex_init(&(evlist->EventLock)) != ISC_R_SUCCESS) {
if (isc_thread_create(event_wait, evthread, &evthread->thread_handle) !=
ISC_R_SUCCESS) {
isc_mem_put(manager->mctx, evthread, sizeof(*evthread));
UNEXPECTED_ERROR(__FILE__, __LINE__,
"isc_mutex_init() %s Event Lock",
"isc_thread_create() %s",
isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
ISC_MSG_FAILED, "failed"));
return (ISC_R_UNEXPECTED);
}
evlist->aEventList[0] = WSACreateEvent();
(evlist->max_event)++;
bReset = WSAResetEvent(evlist->aEventList[0]);
ISC_LIST_INIT(evlist->event_updates);
*evthreadp = evthread;
return (ISC_R_SUCCESS);
}
/*
* Note that the eventLock is already locked before calling this function
* Locate a thread with space for additional events or create one if
* necessary. The manager is locked at this point so the information
* cannot be changed by another thread while we are searching.
*/
isc_result_t
socket_eventlist_add(isc_socket_t *sock, sock_event_list *evlist) {
void
locate_available_thread(isc_socketmgr_t *manager) {
events_thread_t *evthread;
DWORD threadid = GetCurrentThreadId();
evthread = ISC_LIST_HEAD(manager->ev_threads);
while (evthread != NULL) {
/*
* We need to find a thread with space to add an event
* If we find it, alert it to process the event change
* list
*/
if(threadid != evthread->thread_id &&
evthread->sockev_list.max_event < MAX_EVENTS) {
WSASetEvent(evthread->sockev_list.aEventList[0]);
return;
}
evthread = ISC_LIST_NEXT(evthread, link);
}
/*
* We need to create a new thread as other threads are full.
* If we succeed in creating the thread, alert it to
* process the event change list since it will have space.
* If we are unable to create one, the event will stay on the
* list and the next event_wait thread will try again to add
* the event. It will call here again if it has no space.
*/
if (event_thread_create(&evthread, manager) == ISC_R_SUCCESS) {
WSASetEvent(evthread->sockev_list.aEventList[0]);
}
}
isc_boolean_t
socket_eventlist_add(event_change_t *evchange, sock_event_list *evlist,
isc_socketmgr_t *manager) {
int max_event;
isc_socket_t *sock;
REQUIRE(evchange != NULL);
sock = evchange->sock;
REQUIRE(sock != NULL);
REQUIRE(sock->hEvent != NULL);
REQUIRE(evlist != NULL);
max_event = evlist->max_event;
if(max_event >= MAX_EVENTS) {
return(ISC_R_NOSPACE);
locate_available_thread(manager);
return (ISC_FALSE);
}
sock->iEvent = max_event;
evlist->aSockList[max_event] = sock;
evlist->aEventList[max_event] = sock->hEvent;
evlist->max_event++;
evlist->total_events++;
return (ISC_R_SUCCESS);
sock->hAlert = evlist->aEventList[0];
sock->evthread_id = GetCurrentThreadId();
return (ISC_TRUE);
}
/*
* Note that the eventLock is locked before calling this function
* All Events and associated sockets are closed here
* All Events and associated sockes are closed here
*/
void
socket_eventlist_delete(isc_socket_t *sock, SOCKET fd, int iEvent,
sock_event_list *evlist) {
isc_boolean_t
socket_eventlist_delete(event_change_t *evchange, sock_event_list *evlist) {
int i;
WSAEVENT hEvent;
int iEvent = -1;
REQUIRE(evlist != NULL);
REQUIRE(iEvent > 0);
REQUIRE(evchange != NULL);
/* Make sure this is the right thread from which to delete the event */
if(evchange->evthread_id != GetCurrentThreadId())
return (ISC_FALSE);
hEvent = evlist->aEventList[iEvent];
if (hEvent != NULL)
WSACloseEvent(hEvent);
REQUIRE(evlist != NULL);
REQUIRE(evchange->hEvent != NULL);
hEvent = evchange->hEvent;
/* Find the Event */
for (i = 1; i < evlist->max_event; i++) {
if (evlist->aEventList[i] == hEvent) {
iEvent = i;
break;
}
}
/* Actual event start at 1 */
if (iEvent < 1)
return (ISC_FALSE);
for(i = iEvent; i < evlist->max_event; i++) {
for(i = iEvent; i < (evlist->max_event - 1); i++) {
evlist->aEventList[i] = evlist->aEventList[i + 1];
evlist->aSockList[i] = evlist->aSockList[i + 1];
if(evlist->aSockList[i] != NULL)
evlist->aSockList[i]->iEvent = i;
}
evlist->aEventList[evlist->max_event] = 0;
evlist->aSockList[evlist->max_event] = NULL;
if(sock != NULL) {
sock->iEvent = 0;
sock->pending_close = 1;
}
if (fd >= 0)
closesocket(fd);
evlist->aEventList[evlist->max_event - 1] = 0;
evlist->aSockList[evlist->max_event - 1] = NULL;
/* Cleanup */
WSACloseEvent(hEvent);
if (evchange->fd >= 0)
closesocket(evchange->fd);
evlist->max_event--;
evlist->total_events--;
return (ISC_TRUE);
}
/*
* Get the event changes off of the list and apply the
* requested changes
* requested changes. The manager lock is taken out at
* the start of this function to prevent other event_wait
* threads processing the same information at the same
* time. The queue may not be empty on exit since other
* threads may be involved in processing the queue.
*
* The deletes are done first in order that there be space
* available for the events being added in the same thread
* in case the event list is almost full. This reduces the
* probability of having to create another thread which would
* increase overhead costs.
*/
isc_result_t
process_eventlist(sock_event_list *evlist, isc_socketmgr_t *manager) {
event_change_t *evchange;
event_change_t *next;
isc_boolean_t del;
REQUIRE(evlist != NULL);
LOCK(&manager->lock);
evchange = ISC_LIST_HEAD(evlist->event_updates);
/* First the deletes */
evchange = ISC_LIST_HEAD(manager->event_updates);
while (evchange != NULL) {
switch (evchange->action) {
case EVENT_ADD:
socket_eventlist_add(evchange->sock, evlist);
break;
case EVENT_DELETE:
socket_eventlist_delete(evchange->sock, evchange->fd,
evchange->iEvent, evlist);
break;
default:
break;
next = ISC_LIST_NEXT(evchange, link);
del = ISC_FALSE;
if(evchange->action == EVENT_DELETE) {
del = socket_eventlist_delete(evchange, evlist);
/* Delete only if this thread's socket list was updated */
if (del) {
ISC_LIST_DEQUEUE(manager->event_updates,
evchange, link);
HeapFree(hHeapHandle, 0, evchange);
manager->event_written--;
}
}
ISC_LIST_DEQUEUE(evlist->event_updates, evchange, link);
HeapFree(hHeapHandle, 0, evchange);
manager->event_written--;
evchange = ISC_LIST_HEAD(evlist->event_updates);
evchange = next;
}
manager->event_written = 0;
/* Now the adds */
evchange = ISC_LIST_HEAD(manager->event_updates);
while (evchange != NULL) {
next = ISC_LIST_NEXT(evchange, link);
del = ISC_FALSE;
if(evchange->action == EVENT_ADD) {
del = socket_eventlist_add(evchange, evlist, manager);
/* Delete only if this thread's socket list was updated */
if (del) {
ISC_LIST_DEQUEUE(manager->event_updates,
evchange, link);
HeapFree(hHeapHandle, 0, evchange);
manager->event_written--;
}
}
evchange = next;
}
UNLOCK(&manager->lock);
return (ISC_R_SUCCESS);
}
/*
......@@ -600,24 +766,32 @@ process_eventlist(sock_event_list *evlist, isc_socketmgr_t *manager) {
* event loop
*/
static void
notify_eventlist(isc_socket_t *sock, sock_event_list *evlist,
notify_eventlist(isc_socket_t *sock, isc_socketmgr_t *manager,
unsigned int action) {
event_change_t *evchange;
REQUIRE(VALID_MANAGER(manager));
REQUIRE(sock != NULL);
evchange = HeapAlloc(hHeapHandle, HEAP_ZERO_MEMORY,
sizeof(event_change_t));
evchange->sock = sock;
evchange->action = action;
evchange->iEvent = sock->iEvent;
evchange->hEvent = sock->hEvent;
evchange->fd = sock->fd;
evchange->evthread_id = sock->evthread_id;
LOCK(&evlist->EventLock);
ISC_LIST_APPEND(evlist->event_updates, evchange, link);
LOCK(&manager->lock);
ISC_LIST_APPEND(manager->event_updates, evchange, link);
sock->manager->event_written++;
UNLOCK(&evlist->EventLock);
WSASetEvent(evlist->aEventList[0]); /* Alert the Wait List */
UNLOCK(&manager->lock);
/* Alert the Wait List */
if (sock->hAlert != NULL)
WSASetEvent(sock->hAlert);
else
WSASetEvent(manager->prime_alert);
}
/*
* Note that the socket is already locked before calling this function
......@@ -626,10 +800,8 @@ isc_result_t
socket_event_add(isc_socket_t *sock, long type) {
int stat;
WSAEVENT hEvent;
sock_event_list *evlist;
REQUIRE(sock != NULL);
evlist = &(sock->manager->sockev_list);
hEvent = WSACreateEvent();
if (hEvent == WSA_INVALID_EVENT) {
......@@ -644,7 +816,7 @@ socket_event_add(isc_socket_t *sock, long type) {
sock->hEvent = hEvent;
sock->wait_type = type;
notify_eventlist(sock, evlist, EVENT_ADD);
notify_eventlist(sock, sock->manager, EVENT_ADD);
return (ISC_R_SUCCESS);
}
/*
......@@ -652,17 +824,17 @@ socket_event_add(isc_socket_t *sock, long type) {
*/
void
socket_event_delete(isc_socket_t *sock) {
sock_event_list *evlist;
REQUIRE(sock != NULL);
REQUIRE(sock->hEvent != NULL);
evlist = &(sock->manager->sockev_list);
if (sock->hEvent != NULL) {
sock->wait_type = 0;
sock->pending_close = 1;
notify_eventlist(sock, evlist, EVENT_DELETE);
notify_eventlist(sock, sock->manager, EVENT_DELETE);
sock->hEvent = NULL;
sock->hAlert = NULL;
sock->evthread_id = 0;
}
}
......@@ -675,6 +847,8 @@ socket_event_delete(isc_socket_t *sock) {
*/
void
socket_close(isc_socket_t *sock) {
REQUIRE(sock != NULL);
sock->pending_close = 1;
if (sock->hEvent != NULL)
socket_event_delete(sock);
......@@ -701,8 +875,6 @@ BOOL InitSockets() {
err = WSAStartup(wVersionRequested, &wsaData);
if ( err != 0 ) {
/* Tell the user that we could not find a usable Winsock DLL */
NTReportError("named",
"Application Requires Winsock 2.0 or later. Exiting");
return(FALSE);
}
return(TRUE);
......@@ -894,7 +1066,7 @@ connection_reset_fix(SOCKET fd) {
if(isc_win32os_majorversion() < 5)
return (ISC_R_SUCCESS); /* NT 4.0 has no problem */
/* disable new behavior using IOCTL: SIO_UDP_CONNRESET */
/* disable bad behavior using IOCTL: SIO_UDP_CONNRESET */
status = WSAIoctl(fd, SIO_UDP_CONNRESET, &bNewBehavior,
sizeof(bNewBehavior), NULL, 0,
&dwBytesReturned, NULL, NULL);
......@@ -1243,7 +1415,7 @@ completeio_recv(isc_socket_t *sock, isc_socketevent_t *dev,
/*
* If we read less than we expected, update counters,
* and let the upper layer poke the descriptor.
* and let the upper layer handle it.
*/
if (((size_t)cc != sock->totalBytes) && (dev->n < dev->minimum))
return (DOIO_SOFT);
......@@ -1468,6 +1640,8 @@ destroy_socket(isc_socket_t **sockp) {
isc_socket_t *sock = *sockp;
isc_socketmgr_t *manager = sock->manager;
REQUIRE(sock != NULL);
socket_log(sock, NULL, CREATION, isc_msgcat, ISC_MSGSET_SOCKET,
ISC_MSG_DESTROYING, "destroying socket %d", sock->fd);
......@@ -1475,7 +1649,6 @@ destroy_socket(isc_socket_t **sockp) {
INSIST(ISC_LIST_EMPTY(sock->recv_list));
INSIST(ISC_LIST_EMPTY(sock->send_list));
INSIST(sock->connect_ev == NULL);
REQUIRE(sock->fd >= 0);
LOCK(&manager->lock);
......@@ -1537,8 +1710,9 @@ allocate_socket(isc_socketmgr_t *manager, isc_sockettype_t type,
sock->connected = 0;
sock->connecting = 0;
sock->bound = 0;
sock->iEvent = 0;
sock->hEvent = NULL;
sock->hAlert = NULL;
sock->evthread_id = 0;
sock->wait_type = 0;
/*
......@@ -1867,7 +2041,7 @@ internal_accept(isc_socket_t *sock, int accept_errno) {
INSIST(VALID_MANAGER(manager));
INSIST(sock->listener);
INSIST(sock->iEvent);
INSIST(sock->hEvent != NULL);
INSIST(sock->pending_accept == 1);
sock->pending_accept = 0;
......@@ -1891,8 +2065,8 @@ internal_accept(isc_socket_t *sock, int accept_errno) {
/*
* Try to accept the new connection. If the accept fails with
* EAGAIN or EINTR, simply poke the watcher to watch this socket
* again.
* EAGAIN or EINTR, the event wait will be notified again since