Commit 4d9aec9c authored by Danny Mayer's avatar Danny Mayer
Browse files

Redesigned sockets to use I/O Completion Ports and Events and eliminating...

Redesigned sockets to use I/O Completion Ports and Events and eliminating multiple socket bugs reported
parent de4f4b6c
/*
* Copyright (C) 2000-2002 Internet Software Consortium.
* Copyright (C) 2000, 2001 Internet Software Consortium.
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
......@@ -15,7 +15,7 @@
* WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/* $Id: socket.c,v 1.18 2002/05/27 00:40:23 marka Exp $ */
/* $Id: socket.c,v 1.19 2002/08/01 03:46:21 mayer Exp $ */
#define MAKE_EXTERNAL 1
#include <config.h>
......@@ -33,6 +33,7 @@
#include <unistd.h>
#include <io.h>
#include <fcntl.h>
#include <process.h>
#include <isc/buffer.h>
#include <isc/bufferlist.h>
......@@ -43,18 +44,31 @@
#include <isc/msgs.h>
#include <isc/mutex.h>
#include <isc/net.h>
#include <isc/os.h>
#include <isc/platform.h>
#include <isc/print.h>
#include <isc/region.h>
#include <isc/socket.h>
#include <isc/strerror.h>
#include <isc/syslog.h>
#include <isc/task.h>
#include <isc/thread.h>
#include <isc/util.h>
#include <isc/win32os.h>
#include "errno2result.h"
#define MAX_SELECT_SECONDS 0
#define MAX_SELECT_MILLISECONDS 400
/*
* Define this macro to control the behavior of connection
* resets on UDP sockets. See Microsoft KnowledgeBase Article Q263823
* for details.
* NOTE: This requires that Windows 2000 systems install Service Pack 2
* or later.
*/
#ifndef SIO_UDP_CONNRESET
#define SIO_UDP_CONNRESET _WSAIOW(IOC_VENDOR,12)
#endif
/*
* Some systems define the socket length argument as an int, some as size_t,
* some as socklen_t. This is here so it can be easily changed if needed.
......@@ -72,6 +86,7 @@
* work around it here.
*/
#define SOFT_ERROR(e) ((e) == WSAEINTR || \
(e) == WSA_IO_PENDING || \
(e) == WSAEWOULDBLOCK || \
(e) == EWOULDBLOCK || \
(e) == EINTR || \
......@@ -120,19 +135,6 @@ typedef isc_event_t intev_t;
* a setsockopt() like interface to request timestamps, and if the OS
* doesn't do it for us, call gettimeofday() on every UDP receive?
*/
#ifdef SO_TIMESTAMP
#ifndef USE_CMSG
#define USE_CMSG 1
#endif
#endif
/*
* Check to see if we have even basic support for cracking messages from
* the control data returned from/sent via recvmsg()/sendmsg().
*/
#if defined(USE_CMSG) && (!defined(CMSG_LEN) || !defined(CMSG_SPACE))
#undef USE_CMSG
#endif
/*
* We really don't want to try and use these control messages. Win32
......@@ -144,10 +146,7 @@ typedef isc_event_t intev_t;
* Message header for recvmsg and sendmsg calls.
* Used value-result for recvmsg, value only for sendmsg.
*/
struct iovec {
void *iov_base; /* starting address of buffer */
size_t iov_len; /* size of buffer */
};
struct msghdr {
void *msg_name; /* optional address */
......@@ -170,11 +169,19 @@ struct isc_socket {
isc_socketmgr_t *manager;
isc_mutex_t lock;
isc_sockettype_t type;
OVERLAPPED overlapped;
/* Pointers to scatter/gather buffers */
WSABUF iov[ISC_SOCKET_MAXSCATTERGATHER];
struct msghdr *messagehdr;
size_t totalBytes;
int iEvent; /* Index into Event Array */
WSAEVENT hEvent; /* Event Handle */
long wait_type; /* Events to wait on */
/* Locked by socket lock. */
ISC_LINK(isc_socket_t) link;
unsigned int references;
int fd;
SOCKET fd;
int pf;
ISC_LIST(isc_socketevent_t) send_list;
......@@ -192,19 +199,66 @@ struct isc_socket {
isc_sockaddr_t address; /* remote address */
unsigned int pending_recv : 1,
pending_send : 1,
unsigned int pending_close : 1,
pending_accept : 1,
iocp : 1, /* I/O Completion Port */
listener : 1, /* listener socket */
connected : 1,
connecting : 1, /* connect pending */
bound : 1; /* bound to local addr */
#ifdef ISC_NET_RECVOVERFLOW
unsigned char overflow; /* used for MSG_TRUNC fake */
#endif
};
/*
* I/O Completion ports Info structures
*/
static HANDLE hHeapHandle = NULL;
static int iocp_total = 0;
/*
 * Bookkeeping record that pairs a Win32 OVERLAPPED structure with the
 * socket event it was issued for and a code describing the request.
 * NOTE(review): appears to be allocated one per outstanding overlapped
 * request from the private heap (hHeapHandle) -- confirm against the
 * send/recv paths.
 */
typedef struct IoCompletionInfo {
/* Win32 overlapped-I/O state for the request. */
OVERLAPPED overlapped;
/* Socket event associated with the request. */
isc_socketevent_t *dev;
/*
 * Kind of request; presumably one of the SOCKET_xxx / SOCK_ACCEPT
 * codes defined later in this file -- TODO confirm.
 */
int request_type;
} IoCompletionInfo;
/*
* Define a maximum number of I/O Completion Port worker threads
* to handle the load on the Completion Port
*/
#define MAX_IOCPTHREADS 20
/*
 * event_change structure to handle adds and deletes from the list of
 * events in the Wait.
 *
 * One of these is queued on sock_event_list.event_updates by
 * notify_eventlist() for every requested change, and consumed (and freed)
 * by process_eventlist().
 */
typedef struct event_change event_change_t;
struct event_change {
/* Socket the change applies to. */
isc_socket_t *sock;
/* Value of sock->iEvent captured when the change was queued. */
int iEvent;
/* Value of sock->fd captured when the change was queued. */
SOCKET fd;
/* Requested operation: EVENT_ADD or EVENT_DELETE. */
unsigned int action;
/* Linkage on the event_updates list. */
ISC_LINK(event_change_t) link;
};
/*
* Note: We are using an array here since *WaitForMultiple* wants an array
*/
#define MAX_EVENTS 64
/*
 * List of events being waited on and their associated sockets.
 *
 * aEventList[] and aSockList[] are parallel arrays: entry i of one
 * corresponds to entry i of the other. Slot 0 holds the wake-up event
 * created by socket_event_minit() and signalled by notify_eventlist();
 * it has no associated socket.
 */
typedef struct sock_event_list {
/* Number of slots in use, including the wake-up event in slot 0. */
int max_event;
/* Number of socket events added via socket_eventlist_add(). */
int total_events;
/* Socket owning each event slot (NULL when the slot has no socket). */
isc_socket_t *aSockList[MAX_EVENTS];
/* Event handles, laid out as an array for the wait call. */
WSAEVENT aEventList[MAX_EVENTS];
/* Held while the pending-change queue below is modified. */
isc_mutex_t EventLock;
/* Queue of pending add/delete requests (event_change_t). */
ISC_LIST(event_change_t) event_updates;
} sock_event_list;
#define SOCKET_MANAGER_MAGIC ISC_MAGIC('I', 'O', 'm', 'g')
#define VALID_MANAGER(m) ISC_MAGIC_VALID(m, SOCKET_MANAGER_MAGIC)
......@@ -215,16 +269,15 @@ struct isc_socketmgr {
isc_mutex_t lock;
/* Locked by manager lock. */
ISC_LIST(isc_socket_t) socklist;
fd_set read_fds;
fd_set write_fds;
fd_set except_fds;
isc_socket_t *fds[FD_SETSIZE];
int fdstate[FD_SETSIZE];
int maxfd;
int minfd;
sock_event_list sockev_list;
int event_written;
isc_boolean_t bShutdown;
isc_thread_t watcher;
isc_condition_t shutdown_ok;
int pipe_fds[2];
HANDLE hIoCompletionPort;
int maxIOCPThreads;
HANDLE hIOCPThreads[MAX_IOCPTHREADS];
DWORD dwIOCPThreadIds[MAX_IOCPTHREADS];
};
#define CLOSED 0 /* this one must be zero */
......@@ -235,23 +288,21 @@ struct isc_socketmgr {
* send() and recv() iovec counts
*/
#define MAXSCATTERGATHER_SEND (ISC_SOCKET_MAXSCATTERGATHER)
#ifdef ISC_NET_RECVOVERFLOW
# define MAXSCATTERGATHER_RECV (ISC_SOCKET_MAXSCATTERGATHER + 1)
#else
# define MAXSCATTERGATHER_RECV (ISC_SOCKET_MAXSCATTERGATHER)
#endif
#define MAXSCATTERGATHER_RECV (ISC_SOCKET_MAXSCATTERGATHER)
static isc_threadresult_t WINAPI SocketIoThread(LPVOID ThreadContext);
static void send_recvdone_event(isc_socket_t *, isc_socketevent_t **);
static void send_senddone_event(isc_socket_t *, isc_socketevent_t **);
static void free_socket(isc_socket_t **);
static isc_result_t allocate_socket(isc_socketmgr_t *, isc_sockettype_t,
isc_socket_t **);
static void destroy(isc_socket_t **);
static void internal_accept(isc_task_t *, isc_event_t *);
static void internal_connect(isc_task_t *, isc_event_t *);
static void internal_recv(isc_task_t *, isc_event_t *);
static void internal_send(isc_task_t *, isc_event_t *);
static void process_cmsg(isc_socket_t *, struct msghdr *, isc_socketevent_t *);
static isc_result_t
socket_recv(isc_socket_t *, isc_socketevent_t *, isc_task_t *, unsigned int);
static void destroy_socket(isc_socket_t **);
static void internal_accept(isc_socket_t *, int);
static void internal_connect(isc_socket_t *, int);
static void internal_recv(isc_socket_t *, isc_socketevent_t *, int, int);
static void internal_send(isc_socket_t *, isc_socketevent_t *, int, int);
static void build_msghdr_send(isc_socket_t *, isc_socketevent_t *,
struct msghdr *, char *cmsg,
WSABUF *, size_t *);
......@@ -259,44 +310,401 @@ static void build_msghdr_recv(isc_socket_t *, isc_socketevent_t *,
struct msghdr *, char *cmsg,
WSABUF *, size_t *);
#define SELECT_POKE_SHUTDOWN (-1)
#define SELECT_POKE_NOTHING (-2)
#define SELECT_POKE_READ (-3)
#define SELECT_POKE_ACCEPT (-3) /* Same as _READ */
#define SELECT_POKE_WRITE (-4)
#define SELECT_POKE_CONNECT (-5)
#define SELECT_POKE_CLOSE (-6)
long bpipe_written = 0;
/*
 * Request codes for socket operations.
 * NOTE(review): presumably the values stored in
 * IoCompletionInfo.request_type -- confirm against the I/O thread.
 * Note the last enumerator is spelled SOCK_ACCEPT, not SOCKET_ACCEPT.
 */
enum {
SOCKET_CANCEL,
SOCKET_SHUTDOWN,
SOCKET_RECV,
SOCKET_SEND,
SOCK_ACCEPT
};
/*
 * Actions carried in event_change.action and dispatched by
 * process_eventlist(): add a socket's event to the wait list, or remove it.
 */
enum {
EVENT_ADD,
EVENT_DELETE
};
#define SOCK_DEAD(s) ((s)->references == 0)
#if defined(ISC_SOCKET_DEBUG)
/*
 * Debugging aid: dump the contents of a socket structure to stdout.
 *
 * The caller should hold the socket lock while the dump is taken.
 * Because the output goes through plain printf() calls, this routine is
 * only suitable for interactive use.
 */
void
sock_dump(isc_socket_t *sock) {
	isc_socketevent_t *ldev;
	isc_socket_newconnev_t *ndev;
	isc_sockaddr_t addr;
	char socktext[256];

	isc_socket_getpeername(sock, &addr);
	isc_sockaddr_format(&addr, socktext, sizeof(socktext));
	printf("Remote Socket: %s\n", socktext);

	isc_socket_getsockname(sock, &addr);
	isc_sockaddr_format(&addr, socktext, sizeof(socktext));
	printf("This Socket: %s\n", socktext);

	printf("\n\t\tSock Dump\n");
	/* SOCKET is wider than unsigned int on 64-bit builds; narrow it for %u. */
	printf("\t\tfd: %u\n", (unsigned int)sock->fd);
	/* references is an unsigned int, so print it as one. */
	printf("\t\treferences: %u\n", sock->references);
	printf("\t\tpending_accept: %d\n", sock->pending_accept);
	printf("\t\tpending_close: %d\n", sock->pending_close);
	printf("\t\tconnecting: %d\n", sock->connecting);
	printf("\t\tconnected: %d\n", sock->connected);
	printf("\t\tbound: %d\n", sock->bound);
	printf("\t\tiocp: %d\n", sock->iocp);
	printf("\t\tsocket type: %d\n", sock->type);

	printf("\n\t\tSock Recv List\n");
	ldev = ISC_LIST_HEAD(sock->recv_list);
	while (ldev != NULL) {
		printf("\t\tdev: %p\n", (void *)ldev);
		ldev = ISC_LIST_NEXT(ldev, ev_link);
	}

	printf("\n\t\tSock Send List\n");
	ldev = ISC_LIST_HEAD(sock->send_list);
	while (ldev != NULL) {
		printf("\t\tdev: %p\n", (void *)ldev);
		ldev = ISC_LIST_NEXT(ldev, ev_link);
	}

	printf("\n\t\tSock Accept List\n");
	ndev = ISC_LIST_HEAD(sock->accept_list);
	while (ndev != NULL) {
		/*
		 * Print the accept event being visited; the send-list
		 * cursor is always NULL by the time this loop runs.
		 */
		printf("\t\tdev: %p\n", (void *)ndev);
		ndev = ISC_LIST_NEXT(ndev, ev_link);
	}
}
#endif
#define ISC_STRERRORSIZE 128
/*
 * Queue one zero-length completion packet on the manager's I/O completion
 * port for each worker thread (maxIOCPThreads of them), which signals the
 * I/O threads to exit gracefully. Failure to post a packet is reported
 * through FATAL_ERROR.
 */
static void
signal_iocompletionport_exit(isc_socketmgr_t *manager) {
	char errtext[ISC_STRERRORSIZE];
	int lasterr;
	int posted = 0;

	while (posted < manager->maxIOCPThreads) {
		BOOL queued;

		queued = PostQueuedCompletionStatus(manager->hIoCompletionPort,
						    0, 0, 0);
		if (!queued) {
			lasterr = GetLastError();
			isc__strerror(lasterr, errtext, sizeof(errtext));
			FATAL_ERROR(__FILE__, __LINE__,
				    isc_msgcat_get(isc_msgcat,
						   ISC_MSGSET_SOCKET,
						   ISC_MSG_FAILED,
						   "Can't request service thread to exit: %s"),
				    errtext);
		}
		posted++;
	}
}
/*
* Create the worker threads for the I/O Completion Port
*/
void
isc__strerror(int num, char *buf, size_t size) {
char *msg;
unsigned int unum = num;
iocompletionport_createthreads(int total_threads, isc_socketmgr_t *manager) {
int errval;
char strbuf[ISC_STRERRORSIZE];
int i = 0;
REQUIRE(buf != NULL);
/*
* We need at least one
*/
for (i = 0; i < total_threads; i++) {
manager->hIOCPThreads[i] = CreateThread( NULL, 0, SocketIoThread,
manager, 0,
&manager->dwIOCPThreadIds[i]);
if(manager->hIOCPThreads[i] == NULL) {
errval = GetLastError();
isc__strerror(errval, strbuf, sizeof(strbuf));
FATAL_ERROR(__FILE__, __LINE__,
isc_msgcat_get(isc_msgcat, ISC_MSGSET_SOCKET,
ISC_MSG_FAILED,
"Can't create IOCP thread: %s"),
strbuf);
}
}
}
msg = NTstrerror(num);
if (msg != NULL)
snprintf(buf, size, "%s", msg);
else
snprintf(buf, size, "Unknown error: %u", unum);
/*
 * Create/initialise the I/O completion port.
 *
 * Creates the private heap used for overlapped-I/O bookkeeping, sizes the
 * worker pool (number of CPUs plus one, capped at MAX_IOCPTHREADS),
 * creates the completion port itself and finally starts the worker
 * threads. Any failure is fatal.
 */
void
iocompletionport_init(isc_socketmgr_t *manager) {
	int errval;
	char strbuf[ISC_STRERRORSIZE];

	/*
	 * Create a private heap to handle the socket overlapped structure.
	 * The minimum number of structures is 10, there is no maximum.
	 */
	hHeapHandle = HeapCreate(0, 10 * sizeof(IoCompletionInfo), 0);
	if (hHeapHandle == NULL) {
		/*
		 * Every later HeapAlloc() depends on this heap, so fail
		 * here with a diagnostic rather than at first use.
		 */
		errval = GetLastError();
		isc__strerror(errval, strbuf, sizeof(strbuf));
		FATAL_ERROR(__FILE__, __LINE__,
			    isc_msgcat_get(isc_msgcat, ISC_MSGSET_SOCKET,
					   ISC_MSG_FAILED,
					   "HeapCreate() failed "
					   "during initialization: %s"),
			    strbuf);
		exit(1);
	}

	manager->maxIOCPThreads = min(isc_os_ncpus() + 1,
				      MAX_IOCPTHREADS);

	/* Now Create the Completion Port */
	manager->hIoCompletionPort = CreateIoCompletionPort(
						INVALID_HANDLE_VALUE, NULL,
						0, manager->maxIOCPThreads);
	if (manager->hIoCompletionPort == NULL) {
		errval = GetLastError();
		isc__strerror(errval, strbuf, sizeof(strbuf));
		FATAL_ERROR(__FILE__, __LINE__,
			    isc_msgcat_get(isc_msgcat, ISC_MSGSET_SOCKET,
					   ISC_MSG_FAILED,
					   "CreateIoCompletionPort() failed "
					   "during initialization: %s"),
			    strbuf);
		exit(1);
	}

	/*
	 * Worker threads for servicing the I/O
	 */
	iocompletionport_createthreads(manager->maxIOCPThreads, manager);
}
/*
 * Ask the I/O completion port service threads to exit. Does nothing when
 * the manager has no completion port.
 */
void
iocompletionport_exit(isc_socketmgr_t *manager) {
	if (manager->hIoCompletionPort == NULL)
		return;

	/* Get each of the service threads to exit. */
	signal_iocompletionport_exit(manager);
}
/*
 * Associate a socket with the manager's I/O completion port, passing the
 * socket structure itself as the completion key. The association is
 * attempted only once per socket; sock->iocp records that it was done.
 *
 * A failed association is reported through UNEXPECTED_ERROR; the flag and
 * the iocp_total counter are still updated as before so that existing
 * bookkeeping elsewhere in the file is unaffected.
 */
void
iocompletionport_update(isc_socket_t *sock) {
	HANDLE hiocp;
	int errval;
	char strbuf[ISC_STRERRORSIZE];

	if (sock->iocp != 0)
		return;

	sock->iocp = 1;

	/*
	 * The completion key is pointer-sized (ULONG_PTR); casting the
	 * socket pointer to DWORD would truncate it on 64-bit builds.
	 */
	hiocp = CreateIoCompletionPort((HANDLE) sock->fd,
				       sock->manager->hIoCompletionPort,
				       (ULONG_PTR) sock,
				       sock->manager->maxIOCPThreads);
	if (hiocp == NULL) {
		errval = GetLastError();
		isc__strerror(errval, strbuf, sizeof(strbuf));
		UNEXPECTED_ERROR(__FILE__, __LINE__,
				 "CreateIoCompletionPort() %s: %s",
				 isc_msgcat_get(isc_msgcat,
						ISC_MSGSET_GENERAL,
						ISC_MSG_FAILED, "failed"),
				 strbuf);
	}

	InterlockedIncrement(&iocp_total);
}
/*
 * Initialise a socket event list: empty every slot, set up the list lock,
 * create the wake-up event in slot 0 (left in the non-signalled state)
 * and start with an empty queue of pending changes.
 */
void
socket_event_minit(sock_event_list *evlist) {
	int slot;

	/* Start with no events and every slot cleared. */
	evlist->total_events = 0;
	evlist->max_event = 0;
	for (slot = 0; slot < MAX_EVENTS; slot++) {
		evlist->aEventList[slot] = (WSAEVENT) 0;
		evlist->aSockList[slot] = NULL;
	}

	/* Set up the lock that guards the pending-change queue. */
	if (isc_mutex_init(&(evlist->EventLock)) != ISC_R_SUCCESS) {
		UNEXPECTED_ERROR(__FILE__, __LINE__,
				 "isc_mutex_init() %s Event Lock",
				 isc_msgcat_get(isc_msgcat,
						ISC_MSGSET_GENERAL,
						ISC_MSG_FAILED, "failed"));
	}

	/* Slot 0 is the event used to wake the waiting thread. */
	evlist->aEventList[0] = WSACreateEvent();
	evlist->max_event += 1;
	(void) WSAResetEvent(evlist->aEventList[0]);

	ISC_LIST_INIT(evlist->event_updates);
}
/*
 * Append a socket and its event handle to the wait list.
 *
 * The eventLock must already be held by the caller.
 *
 * Returns ISC_R_NOSPACE when all MAX_EVENTS slots are in use, otherwise
 * ISC_R_SUCCESS after recording the chosen slot in sock->iEvent.
 */
isc_result_t
socket_eventlist_add(isc_socket_t *sock, sock_event_list *evlist) {
	int slot;

	REQUIRE(sock != NULL);
	REQUIRE(sock->hEvent != NULL);
	REQUIRE(evlist != NULL);

	/* The next free slot is the one just past the last used slot. */
	slot = evlist->max_event;
	if (slot >= MAX_EVENTS)
		return (ISC_R_NOSPACE);

	evlist->aEventList[slot] = sock->hEvent;
	evlist->aSockList[slot] = sock;
	sock->iEvent = slot;

	evlist->max_event = slot + 1;
	evlist->total_events += 1;

	return (ISC_R_SUCCESS);
}
/*
 * Remove the event in slot iEvent from the wait list, closing the event
 * handle and the associated socket descriptor.
 *
 * The eventLock must already be held by the caller.
 *
 * sock    socket being removed; may be NULL. When non-NULL its iEvent is
 *         reset to 0 and pending_close is set.
 * fd      descriptor to close; INVALID_SOCKET means nothing to close.
 * iEvent  slot to remove; slot 0 (the wake-up event) can never be removed.
 * evlist  the event list to update.
 */
void
socket_eventlist_delete(isc_socket_t *sock, SOCKET fd, int iEvent,
			sock_event_list *evlist) {
	int i;
	int last;
	WSAEVENT hEvent;

	REQUIRE(evlist != NULL);
	REQUIRE(iEvent > 0);
	REQUIRE(iEvent < MAX_EVENTS);

	hEvent = evlist->aEventList[iEvent];
	if (hEvent != NULL)
		WSACloseEvent(hEvent);

	/*
	 * Close the gap by shifting the following entries down one slot.
	 * Only slots 0..max_event-1 are in use, so stop at the last used
	 * slot: reading [i + 1] beyond it would run off the end of the
	 * arrays when the list is full.
	 */
	last = evlist->max_event - 1;
	for (i = iEvent; i < last; i++) {
		evlist->aEventList[i] = evlist->aEventList[i + 1];
		evlist->aSockList[i] = evlist->aSockList[i + 1];
		if (evlist->aSockList[i] != NULL)
			evlist->aSockList[i]->iEvent = i;
	}

	/*
	 * Clear the slot vacated at the end of the used range.
	 * NOTE(review): iEvent is captured when the change is queued, so it
	 * could be stale (beyond the used range) by the time it is applied;
	 * in that case there is nothing in the used range to clear.
	 */
	if (iEvent <= last) {
		evlist->aEventList[last] = 0;
		evlist->aSockList[last] = NULL;
	}

	if (sock != NULL) {
		sock->iEvent = 0;
		sock->pending_close = 1;
	}

	/* SOCKET is unsigned, so test against INVALID_SOCKET, not >= 0. */
	if (fd != INVALID_SOCKET)
		closesocket(fd);

	evlist->max_event--;
	evlist->total_events--;
}
/*
 * Drain the queue of pending event-list changes, applying each add or
 * delete in order and releasing the change record afterwards. Changes
 * with an unrecognised action are discarded without being applied.
 *
 * NOTE(review): the add/delete helpers expect the eventLock to be held,
 * so presumably the caller holds it here -- confirm.
 *
 * Always returns ISC_R_SUCCESS; manager->event_written is zero on return.
 */
isc_result_t
process_eventlist(sock_event_list *evlist, isc_socketmgr_t *manager) {
	event_change_t *change;

	for (;;) {
		change = ISC_LIST_HEAD(evlist->event_updates);
		if (change == NULL)
			break;

		if (change->action == EVENT_ADD) {
			socket_eventlist_add(change->sock, evlist);
		} else if (change->action == EVENT_DELETE) {
			socket_eventlist_delete(change->sock, change->fd,
						change->iEvent, evlist);
		}

		/* Unlink the record before handing it back to the heap. */
		ISC_LIST_DEQUEUE(evlist->event_updates, change, link);
		HeapFree(hHeapHandle, 0, change);
		manager->event_written--;
	}

	manager->event_written = 0;
	return (ISC_R_SUCCESS);
}
/*
 * Queue a change (EVENT_ADD or EVENT_DELETE) for the given socket on the
 * event list and signal the wake-up event in slot 0 so the event loop
 * notices it.
 *
 * The socket's current iEvent and fd are copied into the change record.
 * The record comes from the private heap; running out of memory there is
 * treated as fatal.
 */
static void
notify_eventlist(isc_socket_t *sock, sock_event_list *evlist,
		 unsigned int action) {
	event_change_t *evchange;

	evchange = HeapAlloc(hHeapHandle, HEAP_ZERO_MEMORY,
			     sizeof(event_change_t));
	if (evchange == NULL) {
		/*
		 * Without a change record the request cannot be queued;
		 * fail with a diagnostic instead of dereferencing NULL.
		 */
		FATAL_ERROR(__FILE__, __LINE__,
			    "HeapAlloc() %s for event change request",
			    isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
					   ISC_MSG_FAILED, "failed"));
		return;
	}

	evchange->sock = sock;
	evchange->action = action;
	evchange->iEvent = sock->iEvent;
	evchange->fd = sock->fd;

	LOCK(&evlist->EventLock);
	ISC_LIST_APPEND(evlist->event_updates, evchange, link);
	sock->manager->event_written++;
	UNLOCK(&evlist->EventLock);

	WSASetEvent(evlist->aEventList[0]); /* Alert the Wait List */
}
/*
 * Create a Winsock event for the socket, register it for the network
 * events given in 'type', and queue an EVENT_ADD request for the event
 * loop.
 *
 * The socket must already be locked by the caller.
 *
 * Returns ISC_R_SUCCESS, or ISC_R_UNEXPECTED when the event cannot be
 * created or registered (the event is closed again in the latter case).
 */
isc_result_t
socket_event_add(isc_socket_t *sock, long type) {
	isc_result_t result = ISC_R_UNEXPECTED;
	sock_event_list *list;
	WSAEVENT newevent;
	int wsaerr;

	REQUIRE(sock != NULL);

	list = &(sock->manager->sockev_list);

	newevent = WSACreateEvent();
	if (newevent != WSA_INVALID_EVENT) {
		if (WSAEventSelect(sock->fd, newevent, type) == 0) {
			sock->hEvent = newevent;
			sock->wait_type = type;
			notify_eventlist(sock, list, EVENT_ADD);
			result = ISC_R_SUCCESS;
		} else {
			wsaerr = WSAGetLastError();
			WSACloseEvent(newevent);
		}
	} else {
		wsaerr = WSAGetLastError();
	}

	return (result);
}
/*
* Note that the socket is not locked before calling this function
*/
void
socket_event_delete(isc_socket_t *sock) {
sock_event_list *evlist;
REQUIRE(sock != NULL);
REQUIRE(sock->hEvent != NULL);
evlist = &(sock->manager->sockev_list);
if (sock->hEvent != NULL) {