Commit cbc1b123 authored by Witold Krecicki
Browse files

Do I/O directly in the network thread after an event instead of queuing an event to a separate task.

parent 778ab815
......@@ -348,20 +348,9 @@ struct isc__socket {
ISC_LIST(isc_socket_newconnev_t) accept_list;
ISC_LIST(isc_socket_connev_t) connect_list;
/*
* Internal events. Posted when a descriptor is readable or
* writable. These are statically allocated and never freed.
* They will be set to non-purgable before use.
*/
intev_t readable_ev;
intev_t writable_ev;
isc_sockaddr_t peer_address; /* remote address */
unsigned int pending_recv : 1,
pending_send : 1,
pending_accept : 1,
listener : 1, /* listener socket */
unsigned int listener : 1, /* listener socket */
connected : 1,
connecting : 1, /* connect pending */
bound : 1, /* bound to local addr */
......@@ -459,10 +448,10 @@ static void free_socket(isc__socket_t **);
static isc_result_t allocate_socket(isc__socketmgr_t *, isc_sockettype_t,
isc__socket_t **);
static void destroy(isc__socket_t **);
static void internal_accept(isc_task_t *, isc_event_t *);
static void internal_connect(isc_task_t *, isc_event_t *);
static void internal_recv(isc_task_t *, isc_event_t *);
static void internal_send(isc_task_t *, isc_event_t *);
static void internal_accept(isc__socket_t *);
static void internal_connect(isc__socket_t *);
static void internal_recv(isc__socket_t *);
static void internal_send(isc__socket_t *);
static void process_cmsg(isc__socket_t *, struct msghdr *, isc_socketevent_t *);
static void build_msghdr_send(isc__socket_t *, char *, isc_socketevent_t *,
struct msghdr *, struct iovec *, size_t *);
......@@ -1883,9 +1872,6 @@ allocate_socket(isc__socketmgr_t *manager, isc_sockettype_t type,
ISC_LIST_INIT(sock->send_list);
ISC_LIST_INIT(sock->accept_list);
ISC_LIST_INIT(sock->connect_list);
sock->pending_recv = 0;
sock->pending_send = 0;
sock->pending_accept = 0;
sock->listener = 0;
sock->connected = 0;
sock->connecting = 0;
......@@ -1902,16 +1888,6 @@ allocate_socket(isc__socketmgr_t *manager, isc_sockettype_t type,
goto error;
}
/*
* Initialize readable and writable events.
*/
ISC_EVENT_INIT(&sock->readable_ev, sizeof(intev_t),
ISC_EVENTATTR_NOPURGE, NULL, ISC_SOCKEVENT_INTR,
NULL, sock, sock, NULL, NULL);
ISC_EVENT_INIT(&sock->writable_ev, sizeof(intev_t),
ISC_EVENTATTR_NOPURGE, NULL, ISC_SOCKEVENT_INTW,
NULL, sock, sock, NULL, NULL);
sock->common.magic = ISCAPI_SOCKET_MAGIC;
sock->common.impmagic = SOCKET_MAGIC;
*socketp = sock;
......@@ -1938,9 +1914,6 @@ free_socket(isc__socket_t **socketp) {
INSIST(VALID_SOCKET(sock));
INSIST(sock->references == 0);
INSIST(!sock->connecting);
INSIST(!sock->pending_recv);
INSIST(!sock->pending_send);
INSIST(!sock->pending_accept);
INSIST(ISC_LIST_EMPTY(sock->recv_list));
INSIST(ISC_LIST_EMPTY(sock->send_list));
INSIST(ISC_LIST_EMPTY(sock->accept_list));
......@@ -2745,9 +2718,6 @@ isc_socket_close(isc_socket_t *sock0) {
REQUIRE(sock->fd >= 0 && sock->fd < (int)sock->manager->maxsocks);
INSIST(!sock->connecting);
INSIST(!sock->pending_recv);
INSIST(!sock->pending_send);
INSIST(!sock->pending_accept);
INSIST(ISC_LIST_EMPTY(sock->recv_list));
INSIST(ISC_LIST_EMPTY(sock->send_list));
INSIST(ISC_LIST_EMPTY(sock->accept_list));
......@@ -2772,117 +2742,6 @@ isc_socket_close(isc_socket_t *sock0) {
return (ISC_R_SUCCESS);
}
/*
 * A descriptor has become ready for I/O.  Rather than performing the I/O
 * here, post the socket's internal event to the task that owns the request
 * at the head of the queue; that task then runs the internal handler.  The
 * cost of the I/O is thereby charged to the task, and the select loop
 * handler can return to its own work as quickly as possible.
 *
 * The socket and manager must be locked before calling this function.
 */
static void
dispatch_recv(isc__socket_t *sock) {
	isc_socketevent_t *head;
	isc_task_t *target;
	intev_t *internal;

	INSIST(!sock->pending_recv);

	/* Nothing is queued for reading, so there is nothing to dispatch. */
	head = ISC_LIST_HEAD(sock->recv_list);
	if (head != NULL) {
		socket_log(sock, NULL, EVENT, NULL, 0, 0,
			   "dispatch_recv: event %p -> task %p",
			   head, head->ev_sender);

		target = head->ev_sender;
		sock->pending_recv = 1;

		/* The in-flight internal event holds a socket reference. */
		sock->references++;

		internal = &sock->readable_ev;
		internal->ev_sender = sock;
		internal->ev_action = internal_recv;
		internal->ev_arg = sock;

		isc_task_send(target, (isc_event_t **)&internal);
	}
}
/*
 * Post the socket's internal "writable" event to the task that owns the
 * request at the head of the send queue, so that internal_send() is
 * invoked from that task.  Does nothing when the send queue is empty.
 */
static void
dispatch_send(isc__socket_t *sock) {
	isc_socketevent_t *head;
	isc_task_t *target;
	intev_t *internal;

	INSIST(!sock->pending_send);

	/* Nothing is queued for writing, so there is nothing to dispatch. */
	head = ISC_LIST_HEAD(sock->send_list);
	if (head != NULL) {
		socket_log(sock, NULL, EVENT, NULL, 0, 0,
			   "dispatch_send: event %p -> task %p",
			   head, head->ev_sender);

		target = head->ev_sender;
		sock->pending_send = 1;

		/* The in-flight internal event holds a socket reference. */
		sock->references++;

		internal = &sock->writable_ev;
		internal->ev_sender = sock;
		internal->ev_action = internal_send;
		internal->ev_arg = sock;

		isc_task_send(target, (isc_event_t **)&internal);
	}
}
/*
 * Dispatch an internal accept event: post the socket's internal "readable"
 * event, with internal_accept() as its action, to the task that owns the
 * request at the head of the accept queue.
 */
static void
dispatch_accept(isc__socket_t *sock) {
	isc_socket_newconnev_t *head;
	intev_t *internal;

	INSIST(!sock->pending_accept);

	/*
	 * The done events may all have been canceled before the manager
	 * got the socket lock; only dispatch when one is still queued.
	 */
	head = ISC_LIST_HEAD(sock->accept_list);
	if (head != NULL) {
		sock->pending_accept = 1;

		/* Keep the socket around for this internal event. */
		sock->references++;

		internal = &sock->readable_ev;
		internal->ev_sender = sock;
		internal->ev_action = internal_accept;
		internal->ev_arg = sock;

		isc_task_send(head->ev_sender, (isc_event_t **)&internal);
	}
}
/*
 * Post the socket's internal "writable" event, with internal_connect() as
 * its action, to the task that owns the request at the head of the connect
 * queue.  The connect queue must be non-empty and the socket must be
 * marked as connecting; both conditions are asserted.
 */
static void
dispatch_connect(isc__socket_t *sock) {
	isc_socket_connev_t *head = ISC_LIST_HEAD(sock->connect_list);
	intev_t *internal = &sock->writable_ev;

	INSIST(head != NULL); /* XXX */
	INSIST(sock->connecting);

	/* Keep the socket around for this internal event. */
	sock->references++;

	internal->ev_sender = sock;
	internal->ev_action = internal_connect;
	internal->ev_arg = sock;

	isc_task_send(head->ev_sender, (isc_event_t **)&internal);
}
/*
* Dequeue an item off the given socket's read queue, set the result code
* in the done event to the one provided, and send it to the task it was
......@@ -2967,8 +2826,7 @@ send_connectdone_event(isc__socket_t *sock, isc_socket_connev_t **dev) {
* so just unlock and return.
*/
static void
internal_accept(isc_task_t *me, isc_event_t *ev) {
isc__socket_t *sock;
internal_accept(isc__socket_t *sock) {
isc__socketmgr_t *manager;
isc_socket_newconnev_t *dev;
isc_task_t *task;
......@@ -2978,9 +2836,6 @@ internal_accept(isc_task_t *me, isc_event_t *ev) {
char strbuf[ISC_STRERRORSIZE];
const char *err = "accept";
UNUSED(me);
sock = ev->ev_sender;
INSIST(VALID_SOCKET(sock));
LOCK(&sock->lock);
......@@ -2992,16 +2847,6 @@ internal_accept(isc_task_t *me, isc_event_t *ev) {
INSIST(VALID_MANAGER(manager));
INSIST(sock->listener);
INSIST(sock->pending_accept == 1);
sock->pending_accept = 0;
INSIST(sock->references > 0);
sock->references--; /* the internal event is done with this socket */
if (sock->references == 0) {
UNLOCK(&sock->lock);
destroy(&sock);
return;
}
/*
* Get the first item off the accept list.
......@@ -3135,7 +2980,7 @@ internal_accept(isc_task_t *me, isc_event_t *ev) {
* Poke watcher if there are more pending accepts.
*/
if (!ISC_LIST_EMPTY(sock->accept_list))
select_poke(sock->manager, sock->fd, SELECT_POKE_ACCEPT);
watch_fd(sock->manager, sock->fd, SELECT_POKE_ACCEPT);
UNLOCK(&sock->lock);
......@@ -3221,7 +3066,7 @@ internal_accept(isc_task_t *me, isc_event_t *ev) {
return;
soft_error:
select_poke(sock->manager, sock->fd, SELECT_POKE_ACCEPT);
watch_fd(sock->manager, sock->fd, SELECT_POKE_ACCEPT);
UNLOCK(&sock->lock);
inc_stats(manager->stats, sock->statsindex[STATID_ACCEPTFAIL]);
......@@ -3229,36 +3074,26 @@ internal_accept(isc_task_t *me, isc_event_t *ev) {
}
static void
internal_recv(isc_task_t *me, isc_event_t *ev) {
internal_recv(isc__socket_t *sock) {
isc_socketevent_t *dev;
isc__socket_t *sock;
INSIST(ev->ev_type == ISC_SOCKEVENT_INTR);
sock = ev->ev_sender;
INSIST(VALID_SOCKET(sock));
LOCK(&sock->lock);
socket_log(sock, NULL, IOEVENT,
isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_INTERNALRECV,
"internal_recv: task %p got event %p", me, ev);
INSIST(sock->pending_recv == 1);
sock->pending_recv = 0;
INSIST(sock->references > 0);
sock->references--; /* the internal event is done with this socket */
if (sock->references == 0) {
dev = ISC_LIST_HEAD(sock->recv_list);
if (dev == NULL) {
UNLOCK(&sock->lock);
destroy(&sock);
return;
}
socket_log(sock, NULL, IOEVENT,
isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_INTERNALRECV,
"internal_recv: event %p -> task %p", dev, dev->ev_sender);
/*
* Try to do as much I/O as possible on this socket. There are no
* limits here, currently.
*/
dev = ISC_LIST_HEAD(sock->recv_list);
while (dev != NULL) {
switch (doio_recv(sock, dev)) {
case DOIO_SOFT:
......@@ -3288,45 +3123,31 @@ internal_recv(isc_task_t *me, isc_event_t *ev) {
poke:
if (!ISC_LIST_EMPTY(sock->recv_list))
select_poke(sock->manager, sock->fd, SELECT_POKE_READ);
watch_fd(sock->manager, sock->fd, SELECT_POKE_READ);
UNLOCK(&sock->lock);
}
static void
internal_send(isc_task_t *me, isc_event_t *ev) {
internal_send(isc__socket_t *sock) {
isc_socketevent_t *dev;
isc__socket_t *sock;
INSIST(ev->ev_type == ISC_SOCKEVENT_INTW);
/*
* Find out what socket this is and lock it.
*/
sock = (isc__socket_t *)ev->ev_sender;
INSIST(VALID_SOCKET(sock));
LOCK(&sock->lock);
socket_log(sock, NULL, IOEVENT,
isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_INTERNALSEND,
"internal_send: task %p got event %p", me, ev);
INSIST(sock->pending_send == 1);
sock->pending_send = 0;
INSIST(sock->references > 0);
sock->references--; /* the internal event is done with this socket */
if (sock->references == 0) {
dev = ISC_LIST_HEAD(sock->send_list);
if (dev == NULL) {
UNLOCK(&sock->lock);
destroy(&sock);
return;
}
socket_log(sock, NULL, EVENT, NULL, 0, 0,
"internal_send: event %p -> task %p",
dev, dev->ev_sender);
/*
* Try to do as much I/O as possible on this socket. There are no
* limits here, currently.
*/
dev = ISC_LIST_HEAD(sock->send_list);
while (dev != NULL) {
switch (doio_send(sock, dev)) {
case DOIO_SOFT:
......@@ -3343,7 +3164,7 @@ internal_send(isc_task_t *me, isc_event_t *ev) {
poke:
if (!ISC_LIST_EMPTY(sock->send_list))
select_poke(sock->manager, sock->fd, SELECT_POKE_WRITE);
watch_fd(sock->manager, sock->fd, SELECT_POKE_WRITE);
UNLOCK(&sock->lock);
}
......@@ -3357,7 +3178,6 @@ process_fd(isc__socketmgr_t *manager, int fd, bool readable,
bool writeable)
{
isc__socket_t *sock;
bool unlock_sock;
bool unwatch_read = false, unwatch_write = false;
int lockid = FDLOCK_ID(fd);
......@@ -3374,19 +3194,20 @@ process_fd(isc__socketmgr_t *manager, int fd, bool readable,
}
sock = manager->fds[fd];
unlock_sock = false;
LOCK(&sock->lock);
sock->references++;
UNLOCK(&sock->lock);
if (readable) {
if (sock == NULL) {
unwatch_read = true;
goto check_write;
}
unlock_sock = true;
LOCK(&sock->lock);
if (!SOCK_DEAD(sock)) {
if (sock->listener)
dispatch_accept(sock);
internal_accept(sock);
else
dispatch_recv(sock);
internal_recv(sock);
}
unwatch_read = true;
}
......@@ -3396,20 +3217,14 @@ check_write:
unwatch_write = true;
goto unlock_fd;
}
if (!unlock_sock) {
unlock_sock = true;
LOCK(&sock->lock);
}
if (!SOCK_DEAD(sock)) {
if (sock->connecting)
dispatch_connect(sock);
internal_connect(sock);
else
dispatch_send(sock);
internal_send(sock);
}
unwatch_write = true;
}
if (unlock_sock)
UNLOCK(&sock->lock);
unlock_fd:
UNLOCK(&manager->fdlock[lockid]);
......@@ -3417,7 +3232,9 @@ check_write:
(void)unwatch_fd(manager, fd, SELECT_POKE_READ);
if (unwatch_write)
(void)unwatch_fd(manager, fd, SELECT_POKE_WRITE);
LOCK(&sock->lock);
sock->references--;
UNLOCK(&sock->lock);
}
#ifdef USE_KQUEUE
......@@ -4288,7 +4105,7 @@ socket_recv(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task,
* Enqueue the request. If the socket was previously not being
* watched, poke the watcher to start paying attention to it.
*/
if (ISC_LIST_EMPTY(sock->recv_list) && !sock->pending_recv)
if (ISC_LIST_EMPTY(sock->recv_list))
select_poke(sock->manager, sock->fd, SELECT_POKE_READ);
ISC_LIST_ENQUEUE(sock->recv_list, dev, ev_link);
......@@ -4435,8 +4252,7 @@ socket_send(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task,
* not being watched, poke the watcher to start
* paying attention to it.
*/
if (ISC_LIST_EMPTY(sock->send_list) &&
!sock->pending_send)
if (ISC_LIST_EMPTY(sock->send_list))
select_poke(sock->manager, sock->fd,
SELECT_POKE_WRITE);
ISC_LIST_ENQUEUE(sock->send_list, dev, ev_link);
......@@ -5168,8 +4984,7 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr,
* Called when a socket with a pending connect() finishes.
*/
static void
internal_connect(isc_task_t *me, isc_event_t *ev) {
isc__socket_t *sock;
internal_connect(isc__socket_t *sock) {
isc_socket_connev_t *dev;
int cc;
isc_result_t result;
......@@ -5177,26 +4992,10 @@ internal_connect(isc_task_t *me, isc_event_t *ev) {
char strbuf[ISC_STRERRORSIZE];
char peerbuf[ISC_SOCKADDR_FORMATSIZE];
UNUSED(me);
INSIST(ev->ev_type == ISC_SOCKEVENT_INTW);
sock = ev->ev_sender;
INSIST(VALID_SOCKET(sock));
LOCK(&sock->lock);
/*
* When the internal event was sent the reference count was bumped
* to keep the socket around for us. Decrement the count here.
*/
INSIST(sock->references > 0);
sock->references--;
if (sock->references == 0) {
UNLOCK(&sock->lock);
destroy(&sock);
return;
}
/*
* Get the first item off the connect list.
* If it is empty, unlock the socket and return.
......@@ -5228,7 +5027,7 @@ internal_connect(isc_task_t *me, isc_event_t *ev) {
*/
if (SOFT_ERROR(errno) || errno == EINPROGRESS) {
sock->connecting = 1;
select_poke(sock->manager, sock->fd,
watch_fd(sock->manager, sock->fd,
SELECT_POKE_CONNECT);
UNLOCK(&sock->lock);
......@@ -5693,18 +5492,6 @@ isc_socketmgr_renderxml(isc_socketmgr_t *mgr0, xmlTextWriterPtr writer) {
}
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "states"));
if (sock->pending_recv)
TRY0(xmlTextWriterWriteElement(writer,
ISC_XMLCHAR "state",
ISC_XMLCHAR "pending-receive"));
if (sock->pending_send)
TRY0(xmlTextWriterWriteElement(writer,
ISC_XMLCHAR "state",
ISC_XMLCHAR "pending-send"));
if (sock->pending_accept)
TRY0(xmlTextWriterWriteElement(writer,
ISC_XMLCHAR "state",
ISC_XMLCHAR "pending_accept"));
if (sock->listener)
TRY0(xmlTextWriterWriteElement(writer,
ISC_XMLCHAR "state",
......@@ -5812,24 +5599,6 @@ isc_socketmgr_renderjson(isc_socketmgr_t *mgr0, json_object *stats) {
CHECKMEM(states);
json_object_object_add(entry, "states", states);
if (sock->pending_recv) {
obj = json_object_new_string("pending-receive");
CHECKMEM(obj);
json_object_array_add(states, obj);
}
if (sock->pending_send) {
obj = json_object_new_string("pending-send");
CHECKMEM(obj);
json_object_array_add(states, obj);
}
if (sock->pending_accept) {
obj = json_object_new_string("pending-accept");
CHECKMEM(obj);
json_object_array_add(states, obj);
}
if (sock->listener) {
obj = json_object_new_string("listener");
CHECKMEM(obj);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment