Commit 318dceaa authored by Michal 'vorner' Vaner's avatar Michal 'vorner' Vaner
Browse files

Merge branch #802

Conflicts:
	src/lib/python/isc/bind10/socket_cache.py
parents 83ecce54 25ac2455
......@@ -99,6 +99,12 @@ The boss module is sending a kill signal to process with the given name,
as part of the process of killing all started processes during a failed
startup, as described for BIND10_KILLING_ALL_PROCESSES
% BIND10_LOST_SOCKET_CONSUMER consumer %1 of sockets disconnected, considering all its sockets closed
A connection from one of the applications which requested a socket was
closed. This means the application has terminated, so all the sockets it was
using are now closed and the bind10 process can release them as well, unless
the same sockets are used by yet another application.
% BIND10_MSGQ_ALREADY_RUNNING msgq daemon already running, cannot start
There already appears to be a message bus daemon running. Either an
old process was not shut down correctly, and needs to be killed, or
......@@ -110,6 +116,11 @@ While listening on the message bus channel for messages, it suddenly
disappeared. The msgq daemon may have died. This might lead to an
inconsistent state of the system, and BIND 10 will now shut down.
% BIND10_NO_SOCKET couldn't send a socket for token %1 because of error: %2
An error occurred when the bind10 process was asked to send a socket file
descriptor. The error is included in the message. The most common reason is
that the request is invalid and may not come from a bind10 process at all.
% BIND10_PROCESS_ENDED process %2 of %1 ended with status %3
This indicates a process started previously terminated. The process id
and component owning the process are indicated, as well as the exit code.
......
......@@ -72,6 +72,9 @@ import isc.log
from isc.log_messages.bind10_messages import *
import isc.bind10.component
import isc.bind10.special_component
import isc.bind10.socket_cache
import libutil_io_python
import tempfile
isc.log.init("b10-boss")
logger = isc.log.Logger("boss")
......@@ -81,6 +84,10 @@ logger = isc.log.Logger("boss")
DBG_PROCESS = logger.DBGLVL_TRACE_BASIC
DBG_COMMANDS = logger.DBGLVL_TRACE_DETAIL
# Messages sent over the unix domain socket to indicate if it is followed by a real socket
CREATOR_SOCKET_OK = "1\n"
CREATOR_SOCKET_UNAVAILABLE = "0\n"
# Assign this process some longer name
isc.util.process.rename(sys.argv[0])
......@@ -241,6 +248,12 @@ class BoB:
# If -v was set, enable full debug logging.
if self.verbose:
logger.set_severity("DEBUG", 99)
# This is set in init_socket_srv
self._socket_path = None
self._socket_cache = None
self._tmpdir = None
self._srv_socket = None
self._unix_sockets = {}
def __propagate_component_config(self, config):
comps = dict(config)
......@@ -315,6 +328,18 @@ class BoB:
elif command == "show_processes":
answer = isc.config.ccsession. \
create_answer(0, self.get_processes())
elif command == "get_socket":
answer = self._get_socket(args)
elif command == "drop_socket":
if "token" not in args:
answer = isc.config.ccsession. \
create_answer(1, "Missing token parameter")
else:
try:
self._socket_cache.drop_socket(args["token"])
answer = isc.config.ccsession.create_answer(0)
except Exception as e:
answer = isc.config.ccsession.create_answer(1, str(e))
else:
answer = isc.config.ccsession.create_answer(1,
"Unknown command")
......@@ -769,6 +794,209 @@ class BoB:
return next_restart_time
def _get_socket(self, args):
    """
    Implementation of the get_socket CC command.

    The parameters are taken from args ('address', 'port', 'protocol',
    'share_mode' and 'share_name'), validated, and handed to the socket
    cache, which provides a token. The answer carries the token together
    with the path of the unix domain socket stored in _socket_path.

    Any problem (a missing parameter, an invalid value or an exception
    from the cache) is turned into an error answer with code 1; nothing
    is propagated to the caller.
    """
    make_answer = isc.config.ccsession.create_answer
    try:
        try:
            # The parameters are read in this fixed order, so the first
            # problem found is the one that is reported.
            addr = isc.net.parse.addr_parse(args['address'])
            port = isc.net.parse.port_parse(args['port'])
            protocol = args['protocol']
            if protocol not in ('UDP', 'TCP'):
                raise ValueError("Protocol must be either UDP or TCP")
            share_mode = args['share_mode']
            if share_mode not in ('ANY', 'SAMEAPP', 'NO'):
                raise ValueError(
                    "Share mode must be one of ANY, SAMEAPP or NO")
            share_name = args['share_name']
        except KeyError as missing:
            return make_answer(1, "Missing parameter " + str(missing))

        # FIXME: This call contains blocking IPC. It is expected to be
        # short, but if it turns out to be problem, we'll need to do
        # something about it.
        token = self._socket_cache.get_token(protocol, addr, port,
                                             share_mode, share_name)
        reply = {'token': token, 'path': self._socket_path}
        return make_answer(0, reply)
    except Exception as error:
        return make_answer(1, str(error))
def socket_request_handler(self, token, unix_socket):
    """
    Handle a token that came over a unix domain socket.

    The socket identified by the token is looked up in the _socket_cache
    and its file descriptor is sent back over unix_socket, preceded by
    CREATOR_SOCKET_OK. If anything goes wrong, the problem is logged and
    CREATOR_SOCKET_UNAVAILABLE is sent instead.

    Parameters:
    token -- identifies the requested socket. _socket_data collects it
        from recv() and therefore passes bytes; such a token is decoded
        to str before the lookup. A str token is used as it is.
    unix_socket -- the connection to the requesting application. Its
        fileno() is what identifies the application to the cache.
    """
    try:
        # Tokens read from the wire are bytes. Presumably the cache knows
        # them as str (they are handed out in a CC answer) -- confirm
        # against socket_cache. A token that is not valid ASCII cannot be
        # a known one, so the decoding error is reported like any other
        # failure below.
        if isinstance(token, bytes) and not isinstance(token, str):
            token = token.decode('ASCII')
        fd = self._socket_cache.get_socket(token, unix_socket.fileno())
        # FIXME: These two calls are blocking in their nature. An OS-level
        # buffer is likely to be large enough to hold all these data, but
        # if it wasn't and the remote application got stuck, we would have
        # a problem. If there appear such problems, we should do something
        # about it.
        # NOTE(review): the CREATOR_SOCKET_* constants are str; a real
        # socket under Python 3 wants bytes in sendall(). Changing them
        # needs a matching change in the tests -- confirm and follow up.
        unix_socket.sendall(CREATOR_SOCKET_OK)
        libutil_io_python.send_fd(unix_socket.fileno(), fd)
    except Exception as e:
        logger.info(BIND10_NO_SOCKET, token, e)
        unix_socket.sendall(CREATOR_SOCKET_UNAVAILABLE)
def socket_consumer_dead(self, unix_socket):
    """
    React to a unix_socket being closed.

    Every socket that was sent over it is to be considered closed from
    now on, so the _socket_cache is told to drop the application, which
    is identified by the file number of unix_socket.
    """
    application = unix_socket.fileno()
    logger.info(BIND10_LOST_SOCKET_CONSUMER, application)
    try:
        self._socket_cache.drop_application(application)
    except ValueError:
        # The application holds no sockets. This is harmless and can
        # happen in real life - for example, the application requests a
        # socket, get_socket doesn't find it and the application dies.
        # It should be rare, though.
        pass
def set_creator(self, creator):
    """
    Register a socket creator with the boss.

    The creator is never used directly, only through a cache, and that
    cache is created here and stored in _socket_cache.

    A creator can be set only once; a second call raises ValueError.
    """
    already_set = self._socket_cache is not None
    if already_set:
        raise ValueError("A creator was inserted previously")
    self._socket_cache = isc.bind10.socket_cache.Cache(creator)
def init_socket_srv(self):
    """
    Create the unix domain socket used to hand out sockets and start
    listening on it.

    The socket is named "sockcreator" and lives in a freshly created
    temporary directory. The socket, the directory and the full path are
    stored in _srv_socket, _tmpdir and _socket_path.

    Call this only after switching the user, otherwise the applications
    running as the switched user won't be able to access the socket.
    """
    self._srv_socket = socket.socket(socket.AF_UNIX)
    # A temporary directory gives us a safe and unique place without
    # having to pick one ourselves or bother the users with it. As the
    # directory is private, it also secures the socket on some platforms.
    self._tmpdir = tempfile.mkdtemp()
    socket_name = "sockcreator"
    self._socket_path = os.path.join(self._tmpdir, socket_name)
    # Give the socket its name and wait for connections on it
    self._srv_socket.bind(self._socket_path)
    self._srv_socket.listen(5)
def remove_socket_srv(self):
    """
    Close the listening socket and remove both the socket file and the
    directory holding it, as all of them were created by us.

    When _srv_socket is not set (eg. the socket was not initialized
    yet), nothing is done.
    """
    if self._srv_socket is None:
        return
    self._srv_socket.close()
    # The file has to go first, the directory must be empty for rmdir
    os.remove(self._socket_path)
    os.rmdir(self._tmpdir)
def _srv_accept(self):
    """
    Accept a connection on the unix domain socket server and put it among
    the sockets we watch.

    The connection is stored in _unix_sockets under its file number,
    together with an empty buffer for the data received so far.
    """
    # accept() returns a (connection, address) pair. Only the connection
    # is needed here. It is also deliberately not named "socket", so the
    # socket module is not shadowed.
    (connection, _) = self._srv_socket.accept()
    self._unix_sockets[connection.fileno()] = (connection, b'')
def _socket_data(self, socket_fileno):
    """
    Serve a socket, identified by socket_fileno, that needs attention.

    Data are read from it byte by byte without blocking. Every newline
    ends a token, which is passed to socket_request_handler. An
    unfinished token is stored in _unix_sockets for the next call. When
    the socket turns out to be closed, it is removed from _unix_sockets,
    reported through socket_consumer_dead and closed.
    """
    (sock, pending) = self._unix_sockets[socket_fileno]
    # These two might be different on some systems
    no_data_now = (errno.EAGAIN, errno.EWOULDBLOCK)
    while True:
        try:
            byte = sock.recv(1, socket.MSG_DONTWAIT)
        except socket.error as se:
            if se.errno in no_data_now:
                # Nothing more to read for now, keep what we have so far
                self._unix_sockets[socket_fileno] = (sock, pending)
                return
            # Any other error is handled as if the socket got closed
            byte = b''
        if not byte:
            # The socket got to its end
            del self._unix_sockets[socket_fileno]
            self.socket_consumer_dead(sock)
            sock.close()
            return
        if byte == b"\n":
            # The token is complete, handle it and start a new one
            self.socket_request_handler(pending, sock)
            pending = b''
        else:
            pending += byte
def run(self, wakeup_fd):
    """
    The main loop. It waits for sockets, commands and dead processes and
    keeps going for as long as runnable is true.

    wakeup_fd is the read end of the pipe the CHLD signal handler writes
    to.
    """
    ccs_fd = self.ccs.get_socket().fileno()
    while self.runnable:
        # Clean up any processes that exited and restart what is due
        self.reap_children()
        next_restart = self.restart_processes()
        if next_restart is not None:
            wait_time = max(next_restart - time.time(), 0)
        else:
            # Nothing is waiting for a restart, sleep until something
            # happens
            wait_time = None

        # select() can raise EINTR when a signal arrives, even if the
        # signals are resumable, so the exception has to be caught
        try:
            watched = [wakeup_fd, ccs_fd, self._srv_socket.fileno()]
            watched += list(self._unix_sockets.keys())
            (rlist, wlist, xlist) = select.select(watched, [], [],
                                                  wait_time)
        except select.error as err:
            if err.args[0] != errno.EINTR:
                logger.fatal(BIND10_SELECT_ERROR, err)
                break
            (rlist, wlist, xlist) = ([], [], [])

        for fd in rlist + xlist:
            if fd == ccs_fd:
                try:
                    self.ccs.check_command()
                except isc.cc.session.ProtocolError:
                    logger.fatal(BIND10_MSGQ_DISAPPEARED)
                    self.runnable = False
                    break
            elif fd == wakeup_fd:
                # Only drain the pipe here, the children are reaped at
                # the top of the loop
                os.read(wakeup_fd, 32)
            elif fd == self._srv_socket.fileno():
                self._srv_accept()
            elif fd in self._unix_sockets:
                self._socket_data(fd)
# Global variables, needed for signal handlers. Both start out as None;
# presumably main() fills them in once the options are parsed and the boss
# is created -- confirm against main().
options = None
boss_of_bind = None
......@@ -931,60 +1159,32 @@ def main():
# Block SIGPIPE, as we don't want it to end this process
signal.signal(signal.SIGPIPE, signal.SIG_IGN)
# Go bob!
boss_of_bind = BoB(options.msgq_socket_file, options.data_path,
options.config_file, options.nocache, options.verbose,
setuid, username, options.cmdctl_port,
options.wait_time)
startup_result = boss_of_bind.startup()
if startup_result:
logger.fatal(BIND10_STARTUP_ERROR, startup_result)
sys.exit(1)
logger.info(BIND10_STARTUP_COMPLETE)
dump_pid(options.pid_file)
# In our main loop, we check for dead processes or messages
# on the c-channel.
wakeup_fd = wakeup_pipe[0]
ccs_fd = boss_of_bind.ccs.get_socket().fileno()
while boss_of_bind.runnable:
# clean up any processes that exited
boss_of_bind.reap_children()
next_restart = boss_of_bind.restart_processes()
if next_restart is None:
wait_time = None
else:
wait_time = max(next_restart - time.time(), 0)
# select() can raise EINTR when a signal arrives,
# even if they are resumable, so we have to catch
# the exception
try:
(rlist, wlist, xlist) = select.select([wakeup_fd, ccs_fd], [], [],
wait_time)
except select.error as err:
if err.args[0] == errno.EINTR:
(rlist, wlist, xlist) = ([], [], [])
else:
logger.fatal(BIND10_SELECT_ERROR, err)
break
for fd in rlist + xlist:
if fd == ccs_fd:
try:
boss_of_bind.ccs.check_command()
except isc.cc.session.ProtocolError:
logger.fatal(BIND10_MSGQ_DISAPPEARED)
self.runnable = False
break
elif fd == wakeup_fd:
os.read(wakeup_fd, 32)
# shutdown
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
boss_of_bind.shutdown()
unlink_pid_file(options.pid_file)
sys.exit(0)
try:
# Go bob!
boss_of_bind = BoB(options.msgq_socket_file, options.data_path,
options.config_file, options.nocache,
options.verbose, setuid, username,
options.cmdctl_port, options.wait_time)
startup_result = boss_of_bind.startup()
if startup_result:
logger.fatal(BIND10_STARTUP_ERROR, startup_result)
sys.exit(1)
boss_of_bind.init_socket_srv()
logger.info(BIND10_STARTUP_COMPLETE)
dump_pid(options.pid_file)
# Let it run
boss_of_bind.run(wakeup_pipe[0])
# shutdown
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
boss_of_bind.shutdown()
finally:
# Clean up the filesystem
unlink_pid_file(options.pid_file)
if boss_of_bind is not None:
boss_of_bind.remove_socket_srv()
sys.exit(boss_of_bind.exitcode)
if __name__ == "__main__":
main()
......@@ -13,7 +13,11 @@
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# Most of the time, we omit the "bind10_src" for brevity. Sometimes,
# we want to be explicit about what we do, like when hijacking a library
# call used by the bind10_src.
from bind10_src import ProcessInfo, BoB, parse_args, dump_pid, unlink_pid_file, _BASETIME
import bind10_src
# XXX: environment tests are currently disabled, due to the preprocessor
# setup that we have now complicating the environment
......@@ -28,6 +32,8 @@ from isc.net.addr import IPAddr
import time
import isc
import isc.log
import isc.bind10.socket_cache
import errno
from isc.testutils.parse_args import TestOptParser, OptsError
......@@ -97,6 +103,232 @@ class TestProcessInfo(unittest.TestCase):
self.assertTrue(type(pi.pid) is int)
self.assertNotEqual(pi.pid, old_pid)
class TestCacheCommands(unittest.TestCase):
    """
    Test methods of boss related to the socket cache and socket handling.

    The test case itself pretends to be the socket cache of the boss: it
    provides drop_application, get_socket, get_token and drop_socket,
    which only note how they were called (or raise the exception stored
    in __raise_exception).
    """
    def setUp(self):
        """
        Prepare the boss for some tests.

        Also prepare some variables we need.
        """
        self.__boss = BoB()
        # Fake the cache here so we can pretend it is us and hijack the
        # calls to its methods.
        self.__boss._socket_cache = self
        self.__boss._socket_path = '/socket/path'
        self.__raise_exception = None
        self.__socket_args = {
            "port": 53,
            "address": "::",
            "protocol": "UDP",
            "share_mode": "ANY",
            "share_name": "app"
        }
        # What was and wasn't called.
        self.__drop_app_called = None
        self.__get_socket_called = None
        self.__send_fd_called = None
        self.__get_token_called = None
        self.__drop_socket_called = None
        # Hijack send_fd in the library used by bind10_src. The original
        # is remembered, so tearDown can put it back.
        self.__orig_send_fd = bind10_src.libutil_io_python.send_fd
        bind10_src.libutil_io_python.send_fd = self.__send_fd

    def tearDown(self):
        """
        Return the real send_fd, so the fake one installed in setUp does
        not leak into tests run after this one.
        """
        bind10_src.libutil_io_python.send_fd = self.__orig_send_fd

    def __send_fd(self, to, socket):
        """
        A function to hook the send_fd in the bind10_src.
        """
        self.__send_fd_called = (to, socket)

    class FalseSocket:
        """
        A socket where we can fake methods we need instead of having a real
        socket.
        """
        def __init__(self):
            # Everything passed to sendall is collected here
            self.send = ""

        def fileno(self):
            """
            The file number. Used for identifying the remote application.
            """
            return 42

        def sendall(self, data):
            """
            Adds data to the self.send.
            """
            self.send += data

    def drop_application(self, application):
        """
        Part of pretending to be the cache. Logs the parameter to
        self.__drop_app_called.

        In the case self.__raise_exception is set, the exception there
        is raised instead.
        """
        if self.__raise_exception is not None:
            raise self.__raise_exception
        self.__drop_app_called = application

    def test_consumer_dead(self):
        """
        Test that it calls the drop_application method of the cache.
        """
        self.__boss.socket_consumer_dead(self.FalseSocket())
        self.assertEqual(42, self.__drop_app_called)

    def test_consumer_dead_invalid(self):
        """
        Test that the boss doesn't crash in case the application is not
        known to the cache, as this actually can happen in practice.
        """
        self.__raise_exception = ValueError("This application is unknown")
        # This doesn't crash
        self.__boss.socket_consumer_dead(self.FalseSocket())

    def get_socket(self, token, application):
        """
        Part of pretending to be the cache. If there's anything in
        __raise_exception, it is raised. Otherwise, the call is logged
        into __get_socket_called and a number is returned.
        """
        if self.__raise_exception is not None:
            raise self.__raise_exception
        self.__get_socket_called = (token, application)
        return 13

    def test_request_handler(self):
        """
        Test that a request for socket is forwarded and the socket is sent
        back, if it returns a socket.
        """
        socket = self.FalseSocket()
        # An exception from the cache
        self.__raise_exception = ValueError("Test value error")
        self.__boss.socket_request_handler("token", socket)
        # It was called, but it threw, so it is not noted here
        self.assertIsNone(self.__get_socket_called)
        self.assertEqual("0\n", socket.send)
        # It should not have sent any socket.
        self.assertIsNone(self.__send_fd_called)
        # Now prepare a valid scenario
        self.__raise_exception = None
        socket.send = ""
        self.__boss.socket_request_handler("token", socket)
        self.assertEqual("1\n", socket.send)
        self.assertEqual((42, 13), self.__send_fd_called)
        self.assertEqual(("token", 42), self.__get_socket_called)

    def get_token(self, protocol, address, port, share_mode, share_name):
        """
        Part of pretending to be the cache. If there's anything in
        __raise_exception, it is raised. Otherwise, the parameters are
        logged into __get_token_called and a token is returned.
        """
        if self.__raise_exception is not None:
            raise self.__raise_exception
        self.__get_token_called = (protocol, address, port, share_mode,
                                   share_name)
        return "token"

    def test_get_socket_ok(self):
        """
        Test the successful scenario of getting a socket.
        """
        result = self.__boss._get_socket(self.__socket_args)
        [code, answer] = result['result']
        self.assertEqual(0, code)
        self.assertEqual({
            'token': 'token',
            'path': '/socket/path'
        }, answer)
        addr = self.__get_token_called[1]
        self.assertTrue(isinstance(addr, IPAddr))
        self.assertEqual("::", str(addr))
        self.assertEqual(("UDP", addr, 53, "ANY", "app"),
                         self.__get_token_called)

    def test_get_socket_error(self):
        """
        Test that bad inputs are handled correctly, etc.
        """
        def check_code(code, args):
            """
            Pass the args there and check if it returns success or not.

            The rest is not tested, as it is already checked in the
            test_get_socket_ok.
            """
            [rcode, ranswer] = self.__boss._get_socket(args)['result']
            self.assertEqual(code, rcode)
            if code == 1:
                # This should be an error message. The exact formatting
                # is unknown, but we check it is string at least
                self.assertTrue(isinstance(ranswer, str))

        def mod_args(name, value):
            """
            Override a parameter in the args.
            """
            result = dict(self.__socket_args)
            result[name] = value
            return result

        # Port too large
        check_code(1, mod_args('port', 65536))
        # Not numeric address
        check_code(1, mod_args('address', 'example.org.'))
        # Some bad values of enum-like params
        check_code(1, mod_args('protocol', 'BAD PROTO'))
        check_code(1, mod_args('share_mode', 'BAD SHARE'))
        # Check missing parameters
        for param in self.__socket_args.keys():
            args = dict(self.__socket_args)
            del args[param]
            check_code(1, args)
        # These are OK values for the enum-like parameters
        # The ones from test_get_socket_ok are not tested here
        check_code(0, mod_args('protocol', 'TCP'))
        check_code(0, mod_args('share_mode', 'SAMEAPP'))
        check_code(0, mod_args('share_mode', 'NO'))
        # If an exception is raised from within the cache, it is converted
        # to an error, not propagated
        self.__raise_exception = Exception("Test exception")
        check_code(1, self.__socket_args)

    def drop_socket(self, token):
        """
        Part of pretending to be the cache. If there's anything in
        __raise_exception, it is raised. Otherwise, the parameter is stored
        in __drop_socket_called.
        """
        if self.__raise_exception is not None:
            raise self.__raise_exception
        self.__drop_socket_called = token

    def test_drop_socket(self):
        """
        Check the drop_socket command. It should directly call the method
        on the cache. Exceptions should be translated to error messages.
        """
        # This should be OK and just propagated to the call.
        self.assertEqual({"result": [0]},
                         self.__boss.command_handler("drop_socket",
                                                     {"token": "token"}))
        self.assertEqual("token", self.__drop_socket_called)
        self.__drop_socket_called = None
        # Missing parameter
        self.assertEqual({"result": [1, "Missing token parameter"]},
                         self.__boss.command_handler("drop_socket", {}))
        self.assertIsNone(self.__drop_socket_called)
        # An exception is raised from within the cache
        self.__raise_exception = ValueError("Test error")
        self.assertEqual({"result": [1, "Test error"]},
                         self.__boss.command_handler("drop_socket",
                                                     {"token": "token"}))
class TestBoB(unittest.TestCase):
def test_init(self):
bob = BoB()
......@@ -109,6 +341,22 @@ class TestBoB(unittest.TestCase):
self.assertEqual(bob.uid, None)
self.assertEqual(bob.username, None)
self.assertEqual(bob.nocache, False)
self.assertIsNone(bob._socket_cache)
def test_set_creator(self):
"""
Test the call to set_creator. First time, the cache is created
with the passed creator. The next time, it throws an exception.
"""
bob = BoB()
# The cache doesn't use it at start, so just create an empty class
class Creator: pass
creator = Creator()
bob.set_creator(creator)
self.assertTrue(isinstance(bob._socket_cache,
isc.bind10.socket_cache.Cache))