Commit 6feb688f authored by Witold Krecicki's avatar Witold Krecicki Committed by Witold Krecicki
Browse files

Use multiple network event loop threads with separate data structures.

parent cbc1b123
......@@ -824,7 +824,7 @@ create_managers(void) {
}
result = isc_socketmgr_create2(named_g_mctx, &named_g_socketmgr,
maxsocks);
maxsocks, named_g_udpdisp);
if (result != ISC_R_SUCCESS) {
UNEXPECTED_ERROR(__FILE__, __LINE__,
"isc_socketmgr_create() failed: %s",
......
......@@ -813,7 +813,7 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp);
isc_result_t
isc_socketmgr_create2(isc_mem_t *mctx, isc_socketmgr_t **managerp,
unsigned int maxsocks);
unsigned int maxsocks, int nthreads);
/*%<
* Create a socket manager. If "maxsocks" is non-zero, it specifies the
* maximum number of sockets that the created manager should handle.
......
......@@ -324,6 +324,7 @@ typedef isc_event_t intev_t;
typedef struct isc__socket isc__socket_t;
typedef struct isc__socketmgr isc__socketmgr_t;
typedef struct isc__socketthread isc__socketthread_t;
#define NEWCONNSOCK(ev) ((isc__socket_t *)(ev)->newsocket)
......@@ -340,6 +341,7 @@ struct isc__socket {
unsigned int references;
int fd;
int pf;
int threadid;
char name[16];
void * tag;
......@@ -373,8 +375,26 @@ struct isc__socketmgr {
isc_socketmgr_t common;
isc_mem_t *mctx;
isc_mutex_t lock;
isc_mutex_t *fdlock;
isc_stats_t *stats;
int nthreads;
isc__socketthread_t *threads;
unsigned int maxsocks;
/* Locked by manager lock. */
ISC_LIST(isc__socket_t) socklist;
int reserved; /* unlocked */
isc_condition_t shutdown_ok;
int maxudp;
};
struct isc__socketthread {
isc__socketmgr_t * manager;
int threadid;
isc_thread_t thread;
int pipe_fds[2];
isc_mutex_t *fdlock;
/* Locked by fdlock. */
isc__socket_t **fds;
int *fdstate;
#ifdef USE_KQUEUE
int kqueue_fd;
int nevents;
......@@ -384,6 +404,7 @@ struct isc__socketmgr {
int epoll_fd;
int nevents;
struct epoll_event *events;
uint32_t *epoll_events;
#endif /* USE_EPOLL */
#ifdef USE_DEVPOLL
int devpoll_fd;
......@@ -391,38 +412,19 @@ struct isc__socketmgr {
unsigned int calls;
int nevents;
struct pollfd *events;
pollinfo_t *fdpollinfo;
#endif /* USE_DEVPOLL */
#ifdef USE_SELECT
int fd_bufsize;
#endif /* USE_SELECT */
unsigned int maxsocks;
int pipe_fds[2];
/* Locked by fdlock. */
isc__socket_t **fds;
int *fdstate;
#if defined(USE_EPOLL)
uint32_t *epoll_events;
#endif
#ifdef USE_DEVPOLL
pollinfo_t *fdpollinfo;
#endif
/* Locked by manager lock. */
ISC_LIST(isc__socket_t) socklist;
#ifdef USE_SELECT
fd_set *read_fds;
fd_set *read_fds_copy;
fd_set *write_fds;
fd_set *write_fds_copy;
int maxfd;
#endif /* USE_SELECT */
int reserved; /* unlocked */
isc_thread_t watcher;
isc_condition_t shutdown_ok;
int maxudp;
};
#define CLOSED 0 /* this one must be zero */
#define MANAGED 1
#define CLOSE_PENDING 2
......@@ -457,7 +459,7 @@ static void build_msghdr_send(isc__socket_t *, char *, isc_socketevent_t *,
struct msghdr *, struct iovec *, size_t *);
static void build_msghdr_recv(isc__socket_t *, char *, isc_socketevent_t *,
struct msghdr *, struct iovec *, size_t *);
static bool process_ctlfd(isc__socketmgr_t *manager);
static bool process_ctlfd(isc__socketthread_t *thread);
static void setdscp(isc__socket_t *sock, isc_dscp_t dscp);
#define SELECT_POKE_SHUTDOWN (-1)
......@@ -588,6 +590,29 @@ manager_log(isc__socketmgr_t *sockmgr,
"sockmgr %p: %s", sockmgr, msgbuf);
}
static void
thread_log(isc__socketthread_t *thread,
isc_logcategory_t *category, isc_logmodule_t *module, int level,
const char *fmt, ...) ISC_FORMAT_PRINTF(5, 6);
static void
thread_log(isc__socketthread_t *thread,
isc_logcategory_t *category, isc_logmodule_t *module, int level,
const char *fmt, ...)
{
char msgbuf[2048];
va_list ap;
if (! isc_log_wouldlog(isc_lctx, level))
return;
va_start(ap, fmt);
vsnprintf(msgbuf, sizeof(msgbuf), fmt, ap);
va_end(ap);
isc_log_write(isc_lctx, category, module, level,
"sockmgr %p thread %d: %s", thread->manager, thread->threadid, msgbuf);
}
static void
socket_log(isc__socket_t *sock, const isc_sockaddr_t *address,
isc_logcategory_t *category, isc_logmodule_t *module, int level,
......@@ -645,7 +670,7 @@ dec_stats(isc_stats_t *stats, isc_statscounter_t counterid) {
}
static inline isc_result_t
watch_fd(isc__socketmgr_t *manager, int fd, int msg) {
watch_fd(isc__socketthread_t *thread, int fd, int msg) {
isc_result_t result = ISC_R_SUCCESS;
#ifdef USE_KQUEUE
......@@ -658,7 +683,7 @@ watch_fd(isc__socketmgr_t *manager, int fd, int msg) {
evchange.filter = EVFILT_WRITE;
evchange.flags = EV_ADD;
evchange.ident = fd;
if (kevent(manager->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0)
if (kevent(thread->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0)
result = isc__errno2result(errno);
return (result);
......@@ -668,18 +693,18 @@ watch_fd(isc__socketmgr_t *manager, int fd, int msg) {
int ret;
int op;
oldevents = manager->epoll_events[fd];
oldevents = thread->epoll_events[fd];
if (msg == SELECT_POKE_READ)
manager->epoll_events[fd] |= EPOLLIN;
thread->epoll_events[fd] |= EPOLLIN;
else
manager->epoll_events[fd] |= EPOLLOUT;
thread->epoll_events[fd] |= EPOLLOUT;
event.events = manager->epoll_events[fd];
event.events = thread->epoll_events[fd];
memset(&event.data, 0, sizeof(event.data));
event.data.fd = fd;
op = (oldevents == 0U) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ret = epoll_ctl(manager->epoll_fd, op, fd, &event);
ret = epoll_ctl(thread->epoll_fd, op, fd, &event);
if (ret == -1) {
if (errno == EEXIST)
UNEXPECTED_ERROR(__FILE__, __LINE__,
......@@ -691,6 +716,7 @@ watch_fd(isc__socketmgr_t *manager, int fd, int msg) {
return (result);
#elif defined(USE_DEVPOLL)
struct pollfd pfd;
INSIST(threadid == 0);
int lockid = FDLOCK_ID(fd);
memset(&pfd, 0, sizeof(pfd));
......@@ -700,32 +726,32 @@ watch_fd(isc__socketmgr_t *manager, int fd, int msg) {
pfd.events = POLLOUT;
pfd.fd = fd;
pfd.revents = 0;
LOCK(&manager->fdlock[lockid]);
if (write(manager->devpoll_fd, &pfd, sizeof(pfd)) == -1)
LOCK(&thread->fdlock[lockid]);
if (write(thread->devpoll_fd, &pfd, sizeof(pfd)) == -1)
result = isc__errno2result(errno);
else {
if (msg == SELECT_POKE_READ)
manager->fdpollinfo[fd].want_read = 1;
thread->fdpollinfo[fd].want_read = 1;
else
manager->fdpollinfo[fd].want_write = 1;
thread->fdpollinfo[fd].want_write = 1;
}
UNLOCK(&manager->fdlock[lockid]);
UNLOCK(&thread->fdlock[lockid]);
return (result);
#elif defined(USE_SELECT)
LOCK(&manager->lock);
LOCK(&thread->manager->lock);
if (msg == SELECT_POKE_READ)
FD_SET(fd, manager->read_fds);
FD_SET(fd, thread->read_fds);
if (msg == SELECT_POKE_WRITE)
FD_SET(fd, manager->write_fds);
UNLOCK(&manager->lock);
FD_SET(fd, thread->write_fds);
UNLOCK(&thread->manager->lock);
return (result);
#endif
}
static inline isc_result_t
unwatch_fd(isc__socketmgr_t *manager, int fd, int msg) {
unwatch_fd(isc__socketthread_t *thread, int fd, int msg) {
isc_result_t result = ISC_R_SUCCESS;
#ifdef USE_KQUEUE
......@@ -738,7 +764,7 @@ unwatch_fd(isc__socketmgr_t *manager, int fd, int msg) {
evchange.filter = EVFILT_WRITE;
evchange.flags = EV_DELETE;
evchange.ident = fd;
if (kevent(manager->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0)
if (kevent(thread->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0)
result = isc__errno2result(errno);
return (result);
......@@ -747,17 +773,18 @@ unwatch_fd(isc__socketmgr_t *manager, int fd, int msg) {
int ret;
int op;
if (msg == SELECT_POKE_READ)
manager->epoll_events[fd] &= ~(EPOLLIN);
else
manager->epoll_events[fd] &= ~(EPOLLOUT);
if (msg == SELECT_POKE_READ) {
thread->epoll_events[fd] &= ~(EPOLLIN);
} else {
thread->epoll_events[fd] &= ~(EPOLLOUT);
}
event.events = manager->epoll_events[fd];
event.events = thread->epoll_events[fd];
memset(&event.data, 0, sizeof(event.data));
event.data.fd = fd;
op = (event.events == 0U) ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
ret = epoll_ctl(manager->epoll_fd, op, fd, &event);
ret = epoll_ctl(thread->epoll_fd, op, fd, &event);
if (ret == -1 && errno != ENOENT) {
char strbuf[ISC_STRERRORSIZE];
strerror_r(errno, strbuf, sizeof(strbuf));
......@@ -780,45 +807,45 @@ unwatch_fd(isc__socketmgr_t *manager, int fd, int msg) {
* only provides a way of canceling per FD, we may need to re-poll the
* socket for the other operation.
*/
LOCK(&manager->fdlock[lockid]);
LOCK(&thread->fdlock[lockid]);
if (msg == SELECT_POKE_READ &&
manager->fdpollinfo[fd].want_write == 1) {
thread->fdpollinfo[fd].want_write == 1) {
pfds[1].events = POLLOUT;
pfds[1].fd = fd;
writelen += sizeof(pfds[1]);
}
if (msg == SELECT_POKE_WRITE &&
manager->fdpollinfo[fd].want_read == 1) {
thread->fdpollinfo[fd].want_read == 1) {
pfds[1].events = POLLIN;
pfds[1].fd = fd;
writelen += sizeof(pfds[1]);
}
if (write(manager->devpoll_fd, pfds, writelen) == -1)
if (write(thread->devpoll_fd, pfds, writelen) == -1)
result = isc__errno2result(errno);
else {
if (msg == SELECT_POKE_READ)
manager->fdpollinfo[fd].want_read = 0;
thread->fdpollinfo[fd].want_read = 0;
else
manager->fdpollinfo[fd].want_write = 0;
thread->fdpollinfo[fd].want_write = 0;
}
UNLOCK(&manager->fdlock[lockid]);
UNLOCK(&thread->fdlock[lockid]);
return (result);
#elif defined(USE_SELECT)
LOCK(&manager->lock);
LOCK(&thread->manager->lock);
if (msg == SELECT_POKE_READ)
FD_CLR(fd, manager->read_fds);
FD_CLR(fd, thread->read_fds);
else if (msg == SELECT_POKE_WRITE)
FD_CLR(fd, manager->write_fds);
UNLOCK(&manager->lock);
FD_CLR(fd, thread->write_fds);
UNLOCK(&thread->manager->lock);
return (result);
#endif
}
static void
wakeup_socket(isc__socketmgr_t *manager, int fd, int msg) {
wakeup_socket(isc__socketthread_t *thread, int fd, int msg) {
isc_result_t result;
int lockid = FDLOCK_ID(fd);
......@@ -828,21 +855,21 @@ wakeup_socket(isc__socketmgr_t *manager, int fd, int msg) {
* or writes.
*/
INSIST(fd >= 0 && fd < (int)manager->maxsocks);
INSIST(fd >= 0 && fd < (int)thread->manager->maxsocks);
if (msg == SELECT_POKE_CLOSE) {
/* No one should be updating fdstate, so no need to lock it */
INSIST(manager->fdstate[fd] == CLOSE_PENDING);
manager->fdstate[fd] = CLOSED;
(void)unwatch_fd(manager, fd, SELECT_POKE_READ);
(void)unwatch_fd(manager, fd, SELECT_POKE_WRITE);
INSIST(thread->fdstate[fd] == CLOSE_PENDING);
thread->fdstate[fd] = CLOSED;
(void)unwatch_fd(thread, fd, SELECT_POKE_READ);
(void)unwatch_fd(thread, fd, SELECT_POKE_WRITE);
(void)close(fd);
return;
}
LOCK(&manager->fdlock[lockid]);
if (manager->fdstate[fd] == CLOSE_PENDING) {
UNLOCK(&manager->fdlock[lockid]);
LOCK(&thread->fdlock[lockid]);
if (thread->fdstate[fd] == CLOSE_PENDING) {
UNLOCK(&thread->fdlock[lockid]);
/*
* We accept (and ignore) any error from unwatch_fd() as we are
......@@ -852,20 +879,20 @@ wakeup_socket(isc__socketmgr_t *manager, int fd, int msg) {
* fdlock; otherwise it could cause deadlock due to a lock order
* reversal.
*/
(void)unwatch_fd(manager, fd, SELECT_POKE_READ);
(void)unwatch_fd(manager, fd, SELECT_POKE_WRITE);
(void)unwatch_fd(thread, fd, SELECT_POKE_READ);
(void)unwatch_fd(thread, fd, SELECT_POKE_WRITE);
return;
}
if (manager->fdstate[fd] != MANAGED) {
UNLOCK(&manager->fdlock[lockid]);
if (thread->fdstate[fd] != MANAGED) {
UNLOCK(&thread->fdlock[lockid]);
return;
}
UNLOCK(&manager->fdlock[lockid]);
UNLOCK(&thread->fdlock[lockid]);
/*
* Set requested bit.
*/
result = watch_fd(manager, fd, msg);
result = watch_fd(thread, fd, msg);
if (result != ISC_R_SUCCESS) {
/*
* XXXJT: what should we do? Ignoring the failure of watching
......@@ -885,7 +912,7 @@ wakeup_socket(isc__socketmgr_t *manager, int fd, int msg) {
* will not get partial writes.
*/
static void
select_poke(isc__socketmgr_t *mgr, int fd, int msg) {
select_poke(isc__socketmgr_t *mgr, int threadid, int fd, int msg) {
int cc;
int buf[2];
char strbuf[ISC_STRERRORSIZE];
......@@ -894,7 +921,7 @@ select_poke(isc__socketmgr_t *mgr, int fd, int msg) {
buf[1] = msg;
do {
cc = write(mgr->pipe_fds[1], buf, sizeof(buf));
cc = write(mgr->threads[threadid].pipe_fds[1], buf, sizeof(buf));
#ifdef ENOSR
/*
* Treat ENOSR as EAGAIN but loop slowly as it is
......@@ -924,12 +951,12 @@ select_poke(isc__socketmgr_t *mgr, int fd, int msg) {
* Read a message on the internal fd.
*/
static void
select_readmsg(isc__socketmgr_t *mgr, int *fd, int *msg) {
select_readmsg(isc__socketthread_t *thread, int *fd, int *msg) {
int buf[2];
int cc;
char strbuf[ISC_STRERRORSIZE];
cc = read(mgr->pipe_fds[0], buf, sizeof(buf));
cc = read(thread->pipe_fds[0], buf, sizeof(buf));
if (cc < 0) {
*msg = SELECT_POKE_NOTHING;
*fd = -1; /* Silence compiler. */
......@@ -1755,16 +1782,16 @@ doio_send(isc__socket_t *sock, isc_socketevent_t *dev) {
static void
socketclose(isc__socketmgr_t *manager, isc__socket_t *sock, int fd) {
int lockid = FDLOCK_ID(fd);
isc__socketthread_t *thread = &manager->threads[sock->threadid];
/*
* No one has this socket open, so the watcher doesn't have to be
* poked, and the socket doesn't have to be locked.
*/
LOCK(&manager->fdlock[lockid]);
manager->fds[fd] = NULL;
manager->fdstate[fd] = CLOSE_PENDING;
UNLOCK(&manager->fdlock[lockid]);
select_poke(manager, fd, SELECT_POKE_CLOSE);
LOCK(&thread->fdlock[lockid]);
thread->fds[fd] = NULL;
thread->fdstate[fd] = CLOSE_PENDING;
UNLOCK(&thread->fdlock[lockid]);
select_poke(manager, sock->threadid, fd, SELECT_POKE_CLOSE);
inc_stats(manager->stats, sock->statsindex[STATID_CLOSE]);
if (sock->active == 1) {
......@@ -1778,23 +1805,23 @@ socketclose(isc__socketmgr_t *manager, isc__socket_t *sock, int fd) {
*/
#ifdef USE_SELECT
LOCK(&manager->lock);
if (manager->maxfd == fd) {
if (thread->maxfd == fd) {
int i;
manager->maxfd = 0;
thread->maxfd = 0;
for (i = fd - 1; i >= 0; i--) {
lockid = FDLOCK_ID(i);
LOCK(&manager->fdlock[lockid]);
if (manager->fdstate[i] == MANAGED) {
manager->maxfd = i;
UNLOCK(&manager->fdlock[lockid]);
LOCK(&thread->fdlock[lockid]);
if (thread->fdstate[i] == MANAGED) {
thread->maxfd = i;
UNLOCK(&thread->fdlock[lockid]);
break;
}
UNLOCK(&manager->fdlock[lockid]);
UNLOCK(&thread->fdlock[lockid]);
}
if (manager->maxfd < manager->pipe_fds[0])
manager->maxfd = manager->pipe_fds[0];
if (thread->maxfd < thread->pipe_fds[0])
thread->maxfd = thread->pipe_fds[0];
}
UNLOCK(&manager->lock);
......@@ -2509,6 +2536,7 @@ socket_create(isc_socketmgr_t *manager0, int pf, isc_sockettype_t type,
{
isc__socket_t *sock = NULL;
isc__socketmgr_t *manager = (isc__socketmgr_t *)manager0;
isc__socketthread_t *thread;
isc_result_t result;
int lockid;
......@@ -2549,7 +2577,9 @@ socket_create(isc_socketmgr_t *manager0, int pf, isc_sockettype_t type,
return (result);
}
sock->threadid = sock->fd % manager->nthreads; // TODO?
sock->references = 1;
thread = &manager->threads[sock->threadid];
*socketp = (isc_socket_t *)sock;
/*
......@@ -2558,23 +2588,24 @@ socket_create(isc_socketmgr_t *manager0, int pf, isc_sockettype_t type,
*/
lockid = FDLOCK_ID(sock->fd);
LOCK(&manager->fdlock[lockid]);
manager->fds[sock->fd] = sock;
manager->fdstate[sock->fd] = MANAGED;
LOCK(&thread->fdlock[lockid]);
thread->fds[sock->fd] = sock;
thread->fdstate[sock->fd] = MANAGED;
#if defined(USE_EPOLL)
manager->epoll_events[sock->fd] = 0;
thread->epoll_events[sock->fd] = 0;
#endif
#ifdef USE_DEVPOLL
INSIST(sock->manager->fdpollinfo[sock->fd].want_read == 0 &&
sock->manager->fdpollinfo[sock->fd].want_write == 0);
INSIST(thread->fdpollinfo[sock->fd].want_read == 0 &&
thread->fdpollinfo[sock->fd].want_write == 0);
#endif
UNLOCK(&manager->fdlock[lockid]);
UNLOCK(&thread->fdlock[lockid]);
LOCK(&manager->lock);
ISC_LIST_APPEND(manager->socklist, sock, link);
#ifdef USE_SELECT
if (manager->maxfd < sock->fd)
manager->maxfd = sock->fd;
if (thread->maxfd < sock->fd) {
thread->maxfd = sock->fd;
}
#endif
UNLOCK(&manager->lock);
......@@ -2617,6 +2648,7 @@ isc_result_t
isc_socket_open(isc_socket_t *sock0) {
isc_result_t result;
isc__socket_t *sock = (isc__socket_t *)sock0;
isc__socketthread_t *thread;
REQUIRE(VALID_SOCKET(sock));
......@@ -2628,30 +2660,32 @@ isc_socket_open(isc_socket_t *sock0) {
* this socket.
*/
REQUIRE(sock->fd == -1);
REQUIRE(sock->threadid == -1);
result = opensocket(sock->manager, sock, NULL);
if (result != ISC_R_SUCCESS)
if (result != ISC_R_SUCCESS) {
sock->fd = -1;
if (result == ISC_R_SUCCESS) {
} else {
sock->threadid = sock->fd % sock->manager->nthreads; // TODO?
thread = &sock->manager->threads[sock->threadid];
int lockid = FDLOCK_ID(sock->fd);
LOCK(&sock->manager->fdlock[lockid]);
sock->manager->fds[sock->fd] = sock;
sock->manager->fdstate[sock->fd] = MANAGED;
LOCK(&thread->fdlock[lockid]);
thread->fds[sock->fd] = sock;
thread->fdstate[sock->fd] = MANAGED;
#if defined(USE_EPOLL)
sock->manager->epoll_events[sock->fd] = 0;
thread->epoll_events[sock->fd] = 0;
#endif
#ifdef USE_DEVPOLL
INSIST(sock->manager->fdpollinfo[sock->fd].want_read == 0 &&
sock->manager->fdpollinfo[sock->fd].want_write == 0);
INSIST(thread->fdpollinfo[sock->fd].want_read == 0 &&
thread->fdpollinfo[sock->fd].want_write == 0);
#endif
UNLOCK(&sock->manager->fdlock[lockid]);
UNLOCK(&thread->fdlock[lockid]);
#ifdef USE_SELECT
LOCK(&sock->manager->lock);
if (sock->manager->maxfd < sock->fd)
sock->manager->maxfd = sock->fd;
if (thread->maxfd < sock->fd)
thread->maxfd = sock->fd;
UNLOCK(&sock->manager->lock);
#endif
}
......@@ -2828,6 +2862,7 @@ send_connectdone_event(isc__socket_t *sock, isc_socket_connev_t **dev) {
static void
internal_accept(isc__socket_t *sock) {
isc__socketmgr_t *manager;
isc__socketthread_t *thread, *nthread;
isc_socket_newconnev_t *dev;
isc_task_t *task;
socklen_t addrlen;
......@@ -2845,6 +2880,7 @@ internal_accept(isc__socket_t *sock) {
manager = sock->manager;
INSIST(VALID_MANAGER(manager));
thread = &manager->threads[sock->threadid];
INSIST(sock->listener);
......@@ -2980,7 +3016,8 @@ internal_accept(isc__socket_t *sock) {
* Poke watcher if there are more pending accepts.
*/
if (!ISC_LIST_EMPTY(sock->accept_list))
watch_fd(sock->manager, sock->fd, SELECT_POKE_ACCEPT);
watch_fd(thread, sock->fd,
SELECT_POKE_ACCEPT);
UNLOCK(&sock->lock);
......@@ -2999,8 +3036,10 @@ internal_accept(isc__socket_t *sock) {
int lockid = FDLOCK_ID(fd);
NEWCONNSOCK(dev)->fd = fd;
NEWCONNSOCK(dev)->threadid = fd % manager->nthreads; // TODO
NEWCONNSOCK(dev)->bound = 1;
NEWCONNSOCK(dev)->connected = 1;
nthread = &manager->threads[NEWCONNSOCK(dev)->threadid];
/*
* Use minimum mtu if possible.
......@@ -3024,19 +3063,19 @@ internal_accept(isc__socket_t *sock) {
NEWCONNSOCK(dev)->active = 1;
}
LOCK(&manager->fdlock[lockid]);