Commit f5c6cae3 authored by Michal 'vorner' Vaner's avatar Michal 'vorner' Vaner
Browse files

Merge #2690

Using select instead of poll/kqueue in message queue daemon.
parents 6c968df2 b177e3f9
......@@ -182,8 +182,6 @@ class MsgQ:
self.socket_file = socket_file
self.verbose = verbose
self.poller = None
self.kqueue = None
self.runnable = False
self.listen_socket = False
self.sockets = {}
......@@ -204,6 +202,7 @@ class MsgQ:
# side.
self.__lock = threading.Lock()
self._session = None
self.__poller_sock = None
def members_notify(self, event, params):
"""
......@@ -264,37 +263,6 @@ class MsgQ:
self.__cfgmgr_ready_cond.wait()
return self.__cfgmgr_ready
def setup_poller(self):
"""Set up the poll thing. Internal function."""
try:
self.kqueue = select.kqueue()
except AttributeError:
self.poller = select.poll()
def add_kqueue_socket(self, socket, write_filter=False):
"""Add a kqueue filter for a socket. By default the read
filter is used; if write_filter is set to True, the write
filter is used. We use a boolean value instead of a specific
filter constant, because kqueue filter values do not seem to
be defined on some systems. The use of boolean makes the
interface restrictive because there are other filters, but this
method is mostly only for our internal use, so it should be
acceptable at least for now."""
filter_type = select.KQ_FILTER_WRITE if write_filter else \
select.KQ_FILTER_READ
event = select.kevent(socket.fileno(), filter_type,
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
self.kqueue.control([event], 0)
def delete_kqueue_socket(self, socket, write_filter=False):
"""Delete a kqueue filter for socket. See add_kqueue_socket()
for the semantics and notes about write_filter."""
filter_type = select.KQ_FILTER_WRITE if write_filter else \
select.KQ_FILTER_READ
event = select.kevent(socket.fileno(), filter_type,
select.KQ_EV_DELETE)
self.kqueue.control([event], 0)
def setup_listener(self):
"""Set up the listener socket. Internal function."""
logger.debug(TRACE_BASIC, MSGQ_LISTENER_SETUP, self.socket_file)
......@@ -315,11 +283,6 @@ class MsgQ:
logger.fatal(MSGQ_LISTENER_FAILED, self.socket_file, e)
raise e
if self.poller:
self.poller.register(self.listen_socket, select.POLLIN)
else:
self.add_kqueue_socket(self.listen_socket)
def setup_signalsock(self):
"""Create a socket pair used to signal when we want to finish.
Using a socket is easy and thread/signal safe way to signal
......@@ -329,18 +292,12 @@ class MsgQ:
# closed, we should shut down.
(self.__poller_sock, self.__control_sock) = socket.socketpair()
if self.poller:
self.poller.register(self.__poller_sock, select.POLLIN)
else:
self.add_kqueue_socket(self.__poller_sock)
def setup(self):
"""Configure listener socket, polling, etc.
Raises a socket.error if the socket_file cannot be
created.
"""
self.setup_poller()
self.setup_signalsock()
self.setup_listener()
......@@ -369,20 +326,10 @@ class MsgQ:
logger.debug(TRACE_BASIC, MSGQ_SOCKET_REGISTERED, newsocket.fileno(),
lname)
if self.poller:
self.poller.register(newsocket, select.POLLIN)
else:
self.add_kqueue_socket(newsocket)
self.members_notify('connected', {'client': lname})
def kill_socket(self, fd, sock):
"""Fully close down the socket."""
# Unregister events on the socket. Note that we don't have to do
# this for kqueue because the registered events are automatically
# deleted when the corresponding socket is closed.
if self.poller:
self.poller.unregister(sock)
unsubscribed_from = self.subs.unsubscribe_all(sock)
lname = self.fd_to_lname[fd]
......@@ -584,15 +531,10 @@ class MsgQ:
else:
buff = msg[amount_sent:]
last_sent = now
if self.poller:
self.poller.register(fileno, select.POLLIN |
select.POLLOUT)
else:
self.add_kqueue_socket(sock, True)
self.sendbuffs[fileno] = (last_sent, buff)
return True
def __process_write(self, fileno):
def _process_write(self, fileno):
# Try to send some data from the buffer
(_, msg) = self.sendbuffs[fileno]
sock = self.sockets[fileno]
......@@ -602,10 +544,6 @@ class MsgQ:
msg = msg[amount_sent:]
if len(msg) == 0:
# If there's no more, stop requesting for write availability
if self.poller:
self.poller.register(fileno, select.POLLIN)
else:
self.delete_kqueue_socket(sock, True)
del self.sendbuffs[fileno]
else:
self.sendbuffs[fileno] = (time.clock(), msg)
......@@ -717,90 +655,46 @@ class MsgQ:
"""Process messages. Forever. Mostly."""
self.running = True
if self.poller:
self.run_poller()
else:
self.run_kqueue()
self.run_select()
def run_poller(self):
def run_select(self):
while self.running:
reads = list(self.fd_to_lname.keys())
if self.listen_socket.fileno() != -1: # Skip in tests
reads.append(self.listen_socket.fileno())
if self.__poller_sock and self.__poller_sock.fileno() != -1:
reads.append(self.__poller_sock.fileno())
writes = list(self.sendbuffs.keys())
(read_ready, write_ready) = ([], [])
try:
# Poll with a timeout so that every once in a while,
# the loop checks for self.running.
events = self.poller.poll()
(read_ready, write_ready, _) = select.select(reads, writes,
[]);
except select.error as err:
if err.args[0] == errno.EINTR:
events = []
continue # Just try it again if interrupted.
else:
logger.fatal(MSGQ_POLL_ERROR, err)
logger.fatal(MSGQ_SELECT_ERROR, err)
break
with self.__lock:
for (fd, event) in events:
write_ready = set(write_ready)
for fd in read_ready:
# Do only one operation per loop iteration on the given fd.
# It could be possible to perform both, but it may have
# undesired side effects in special situations (like, if the
# read closes the socket).
if fd in write_ready:
write_ready.remove(fd)
if fd == self.listen_socket.fileno():
self.process_accept()
elif fd == self.__poller_sock.fileno():
# If it's the signal socket, we should terminate now.
elif self.__poller_sock and fd == \
self.__poller_sock.fileno():
# The signal socket. We should terminate now.
self.running = False
break
else:
writable = event & select.POLLOUT
# Note: it may be okay to read data if available
# immediately after write some, but due to unexpected
# regression (see comments on the kqueue version below)
# we restrict one operation per iteration for now.
# In future we may clarify the point and enable the
# "read/write" mode.
readable = not writable and (event & select.POLLIN)
if not writable and not readable:
logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
self._process_fd(fd, writable, readable, False)
def run_kqueue(self):
while self.running:
# Check with a timeout so that every once in a while,
# the loop checks for self.running.
events = self.kqueue.control(None, 10)
if not events:
raise RuntimeError('serve: kqueue returned no events')
with self.__lock:
for event in events:
if event.ident == self.listen_socket.fileno():
self.process_accept()
elif event.ident == self.__poller_sock.fileno():
# If it's the signal socket, we should terminate now.
self.running = False
break;
else:
fd = event.ident
writable = event.filter == select.KQ_FILTER_WRITE
readable = (event.filter == select.KQ_FILTER_READ and
event.data > 0)
# It seems to break some of our test cases if we
# immediately close the socket on EOF after reading
# some data. It may be possible to avoid by tweaking
# the test, but unless we can be sure we'll hold off.
closed = (not readable and
(event.flags & select.KQ_EV_EOF))
self._process_fd(fd, writable, readable, closed)
def _process_fd(self, fd, writable, readable, closed):
'''Process a single FD: unified subroutine of run_kqueue/poller.
closed can be True only in the case of kqueue. This is essentially
private but is defined as if it were "protected" so it's callable
from tests.
'''
# We need to check if FD is still in the sockets dict, because
# it's possible that the socket has been "killed" while processing
# other FDs; it's even possible it's killed within this method.
if writable and fd in self.sockets:
self.__process_write(fd)
if readable and fd in self.sockets:
self.process_packet(fd, self.sockets[fd])
if closed and fd in self.sockets:
self.kill_socket(fd, self.sockets[fd])
self.process_packet(fd, self.sockets[fd])
for fd in write_ready:
self._process_write(fd)
def stop(self):
# Signal it should terminate.
......
......@@ -85,17 +85,6 @@ Debug message. The listener is trying to open a listening socket.
Debug message. The message queue successfully opened a listening socket and
waits for incoming connections.
% MSGQ_POLL_ERROR Error while polling for events: %1
A low-level error happened when waiting for events, the error is logged. The
reason for this varies, but it usually means the system is short on some
resources.
% MSGQ_POLL_UNKNOWN_EVENT Got an unknown event from the poller for fd %1: %2
An unknown event got out from the poll() system call. This should generally not
happen and it is either a programmer error or OS bug. The event is ignored. The
number noted as the event is the raw encoded value, which might be useful to
the authors when figuring the problem out.
% MSGQ_RECV_ERROR Error reading from socket %1: %2
There was a low-level error when reading from a socket. The error is logged and
the corresponding socket is dropped. The errors include receiving
......@@ -119,6 +108,11 @@ on shutdown unless there's really something unexpected.
% MSGQ_RECV_HDR Received header: %1
Debug message. This message includes the whole routing header of a packet.
% MSGQ_SELECT_ERROR Error while waiting for events: %1
A low-level error happened when waiting for events, the error is logged. The
reason for this varies, but it usually means the system is short on some
resources.
% MSGQ_SEND_ERROR Error while sending to socket %1: %2
There was a low-level error when sending data to a socket. The error is logged
and the corresponding socket is dropped.
......
......@@ -274,8 +274,6 @@ class MsgQTest(unittest.TestCase):
sock = Sock(1)
return notifications, sock
@unittest.skipUnless('POLLIN' in select.__dict__,
'cannot perform tests requiring select.poll')
def test_notifies(self):
"""
Test the message queue sends notifications about connecting,
......@@ -315,8 +313,6 @@ class MsgQTest(unittest.TestCase):
self.__msgq.kill_socket(sock.fileno(), sock)
self.assertEqual([('disconnected', {'client': lname})], notifications)
@unittest.skipUnless('POLLIN' in select.__dict__,
'cannot perform tests requiring select.poll')
def test_notifies_implicit_kill(self):
"""
Test that the unsubscription notifications are sent before the socket
......@@ -579,7 +575,6 @@ class SendNonblock(unittest.TestCase):
'''
msgq = MsgQ()
# We do only partial setup, so we don't create the listening socket
msgq.setup_poller()
(read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
msgq.register_socket(write)
self.assertEqual(1, len(msgq.lnames))
......@@ -677,7 +672,6 @@ class SendNonblock(unittest.TestCase):
queue_pid = os.fork()
if queue_pid == 0:
signal.alarm(120)
msgq.setup_poller()
msgq.setup_signalsock()
msgq.register_socket(queue)
msgq.run()
......@@ -754,7 +748,6 @@ class SendNonblock(unittest.TestCase):
msgq = MsgQ()
# Don't need a listen_socket
msgq.listen_socket = DummySocket
msgq.setup_poller()
msgq.setup_signalsock()
msgq.register_socket(write)
msgq.register_socket(control_write)
......@@ -997,9 +990,11 @@ class SocketTests(unittest.TestCase):
self.__killed_socket = None
self.__logger = self.LoggerWrapper(msgq.logger)
msgq.logger = self.__logger
self.__orig_select = msgq.select.select
def tearDown(self):
msgq.logger = self.__logger.orig_logger
msgq.select.select = self.__orig_select
def test_send_data(self):
# Successful case: _send_data() returns the hardcoded value, and
......@@ -1047,32 +1042,6 @@ class SocketTests(unittest.TestCase):
self.assertEqual(expected_errors, self.__logger.error_called)
self.assertEqual(expected_warns, self.__logger.warn_called)
def test_process_fd_read_after_bad_write(self):
'''Check the specific case of write fail followed by read attempt.
The write failure results in kill_socket, then read shouldn't tried.
'''
self.__sock_error.errno = errno.EPIPE
self.__sock.ex_on_send = self.__sock_error
self.__msgq.process_socket = None # if called, trigger an exception
self.__msgq._process_fd(42, True, True, False) # shouldn't crash
# check the socket is deleted from the fileno=>sock dictionary
self.assertEqual({}, self.__msgq.sockets)
def test_process_fd_close_after_bad_write(self):
'''Similar to the previous, but for checking dup'ed kill attempt'''
self.__sock_error.errno = errno.EPIPE
self.__sock.ex_on_send = self.__sock_error
self.__msgq._process_fd(42, True, False, True) # shouldn't crash
self.assertEqual({}, self.__msgq.sockets)
def test_process_fd_writer_after_close(self):
'''Emulate a "writable" socket has been already closed and killed.'''
# This just shouldn't crash
self.__msgq._process_fd(4200, True, False, False)
def test_process_packet(self):
'''Check some failure cases in handling an incoming message.'''
expected_errors = 0
......@@ -1106,6 +1075,47 @@ class SocketTests(unittest.TestCase):
self.assertEqual(expected_errors, self.__logger.error_called)
self.assertEqual(expected_debugs, self.__logger.debug_called)
def test_do_select(self):
"""
Check the behaviour of the run_select method.
In particular, check that we skip writing to the sockets we read,
because a read may have side effects (like closing the socket) and
we want to prevent strange behavior.
"""
self.__read_called = []
self.__write_called = []
self.__reads = None
self.__writes = None
def do_read(fd, socket):
self.__read_called.append(fd)
self.__msgq.running = False
def do_write(fd):
self.__write_called.append(fd)
self.__msgq.running = False
self.__msgq.process_packet = do_read
self.__msgq._process_write = do_write
self.__msgq.fd_to_lname = {42: 'lname', 44: 'other', 45: 'unused'}
# The do_select does index it, but just passes the value. So reuse
# the dict to safe typing in the test.
self.__msgq.sockets = self.__msgq.fd_to_lname
self.__msgq.sendbuffs = {42: 'data', 43: 'data'}
def my_select(reads, writes, errors):
self.__reads = reads
self.__writes = writes
self.assertEqual([], errors)
return ([42, 44], [42, 43], [])
msgq.select.select = my_select
self.__msgq.listen_socket = DummySocket
self.__msgq.running = True
self.__msgq.run_select()
self.assertEqual([42, 44], self.__read_called)
self.assertEqual([43], self.__write_called)
self.assertEqual({42, 44, 45}, set(self.__reads))
self.assertEqual({42, 43}, set(self.__writes))
if __name__ == '__main__':
isc.log.resetUnitTestRootLogger()
unittest.main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment