Commit 3ef6ad20 authored by Jelte Jansen's avatar Jelte Jansen
Browse files

use unix domain socket instead of localhost:9912

on the msgq and python side, the socket file is either:

- what is speficied as an argument to the constructor, OR
- environment variable BIND10_MSGQ_SOCKET_FILE, OR
- default (@localstatedir@/msgq_socket)

in that order.

two open items:

TODO: run_X scripts should set a local file in BIND10_MSGQ_SOCKET_FILE (so if installed dirs don't exist run from source fails right now)
TODO: the c++ boost::asio version has no 'manual override', only environment variable or default. (which is better than what it was before, only hardcoded port 9912), fixing this would mean an API change somewhere


git-svn-id: svn://bind10.isc.org/svn/bind10/branches/trac183@1901 e5f2f494-b856-4b98-b285-d166d9295462
parent 91923a0d
......@@ -436,7 +436,9 @@ AC_OUTPUT([src/bin/cfgmgr/b10-cfgmgr.py
src/lib/python/isc/config/tests/config_test
src/lib/python/isc/cc/tests/cc_test
src/lib/dns/gen-rdatacode.py
src/lib/python/isc/cc/session.py
src/lib/dns/tests/testdata/gen-wiredata.py
src/lib/cc/session_config.h.pre
], [
chmod +x src/bin/cmdctl/run_b10-cmdctl.sh
chmod +x src/bin/xfrin/run_b10-xfrin.sh
......
......@@ -158,24 +158,26 @@ class ProcessInfo:
class BoB:
"""Boss of BIND class."""
def __init__(self, c_channel_port=9912, auth_port=5300, verbose=False):
def __init__(self, msgq_socket_file=None, auth_port=5300, verbose=False):
"""Initialize the Boss of BIND. This is a singleton (only one
can run).
The c_channel_port specifies the TCP/IP port that the msgq
process listens on. If verbose is True, then the boss reports
what it is doing.
The msgq_socket_file specifies the UNIX domain socket file
that the msgq process listens on.
If verbose is True, then the boss reports what it is doing.
"""
self.verbose = verbose
self.c_channel_port = c_channel_port
self.msgq_socket_file = msgq_socket_file
self.auth_port = auth_port
self.cc_session = None
self.ccs = None
self.processes = {}
self.dead_processes = {}
self.runnable = False
os.environ['ISC_MSGQ_PORT'] = str(self.c_channel_port)
if self.msgq_socket_file is not None:
os.environ['BIND10_MSGQ_SOCKET_FILE'] = str(self.msgq_socket_file)
def config_handler(self, new_config):
if self.verbose:
......@@ -220,20 +222,20 @@ class BoB:
"""
# try to connect to the c-channel daemon,
# to see if it is already running
c_channel_env = { "ISC_MSGQ_PORT": str(self.c_channel_port), }
c_channel_env = { "BIND10_MSGQ_SOCKET_FILE": self.msgq_socket_file }
if self.verbose:
sys.stdout.write("Checking for already running b10-msgq\n")
# try to connect, and if we can't wait a short while
try:
self.cc_session = isc.cc.Session(self.c_channel_port)
return "b10-msgq already running, cannot start"
self.cc_session = isc.cc.Session(self.msgq_socket_file)
return "b10-msgq already running, or socket file not cleaned , cannot start"
except isc.cc.session.SessionError:
pass
# start the c-channel daemon
if self.verbose:
sys.stdout.write("Starting b10-msgq using port %d\n" %
self.c_channel_port)
sys.stdout.write("Starting b10-msgq using domain socket %s\n" %
self.msgq_socket_file)
try:
c_channel = ProcessInfo("b10-msgq", ["b10-msgq"], c_channel_env,
True, not self.verbose)
......@@ -252,7 +254,7 @@ class BoB:
return "Unable to connect to c-channel after 5 seconds"
# try to connect, and if we can't wait a short while
try:
self.cc_session = isc.cc.Session(self.c_channel_port)
self.cc_session = isc.cc.Session(self.msgq_socket_file)
except isc.cc.session.SessionError:
time.sleep(0.1)
#self.cc_session.group_subscribe("Boss", "boss")
......@@ -262,7 +264,7 @@ class BoB:
sys.stdout.write("[bind10] Starting b10-cfgmgr\n")
try:
bind_cfgd = ProcessInfo("b10-cfgmgr", ["b10-cfgmgr"],
{ 'ISC_MSGQ_PORT': str(self.c_channel_port)})
{ 'BIND10_MSGQ_SOCKET_FILE': str(self.msgq_socket_file)})
except Exception as e:
c_channel.process.kill()
return "Unable to start b10-cfgmgr; " + str(e)
......@@ -290,7 +292,7 @@ class BoB:
sys.stdout.write("Starting b10-xfrout\n")
try:
xfrout = ProcessInfo("b10-xfrout", ["b10-xfrout"],
{ 'ISC_MSGQ_PORT': str(self.c_channel_port)})
{ 'BIND10_MSGQ_SOCKET_FILE': str(self.msgq_socket_file)})
except Exception as e:
c_channel.process.kill()
bind_cfgd.process.kill()
......@@ -308,7 +310,7 @@ class BoB:
authargs += ['-v']
try:
auth = ProcessInfo("b10-auth", authargs,
{ 'ISC_MSGQ_PORT': str(self.c_channel_port)})
{ 'BIND10_MSGQ_SOCKET_FILE': str(self.msgq_socket_file)})
except Exception as e:
c_channel.process.kill()
bind_cfgd.process.kill()
......@@ -323,7 +325,7 @@ class BoB:
sys.stdout.write("Starting b10-xfrin\n")
try:
xfrind = ProcessInfo("b10-xfrin", ['b10-xfrin'],
{ 'ISC_MSGQ_PORT': str(self.c_channel_port)})
{ 'BIND10_MSGQ_SOCKET_FILE': str(self.msgq_socket_file)})
except Exception as e:
c_channel.process.kill()
bind_cfgd.process.kill()
......@@ -340,7 +342,7 @@ class BoB:
sys.stdout.write("Starting b10-cmdctl on port 8080\n")
try:
cmd_ctrld = ProcessInfo("b10-cmdctl", ['b10-cmdctl'],
{ 'ISC_MSGQ_PORT': str(self.c_channel_port)})
{ 'BIND10_MSGQ_SOCKET_FILE': str(self.msgq_socket_file)})
except Exception as e:
c_channel.process.kill()
bind_cfgd.process.kill()
......@@ -582,9 +584,9 @@ def main():
parser.add_option("-p", "--port", dest="auth_port", type="string",
action="callback", callback=check_port, default="5300",
help="port the b10-auth daemon will use (default 5300)")
parser.add_option("-m", "--msgq-port", dest="msgq_port", type="string",
action="callback", callback=check_port, default="9912",
help="port the b10-msgq daemon will use (default 9912)")
parser.add_option("-m", "--msgq-socket-file", dest="msgq_socket_file",
type="string", default=isc.cc.Session.SOCKET_FILE,
help="UNIX domain socket file the b10-msgq daemon will use")
(options, args) = parser.parse_args()
# Announce startup.
......@@ -607,7 +609,7 @@ def main():
signal.signal(signal.SIGTERM, fatal_signal)
# Go bob!
boss_of_bind = BoB(int(options.msgq_port), int(options.auth_port),
boss_of_bind = BoB(options.msgq_socket_file, int(options.auth_port),
options.verbose)
startup_result = boss_of_bind.startup()
if startup_result:
......
......@@ -86,25 +86,33 @@ class SubscriptionManager:
class MsgQ:
"""Message Queue class."""
def __init__(self, port=0, verbose=False):
# did we find a better way to do this?
SOCKET_FILE = os.path.join("@localstatedir@",
"@PACKAGE_NAME@",
"msgq_socket").replace("${prefix}",
"@prefix@")
def __init__(self, socket_file=None, verbose=False):
"""Initialize the MsgQ master.
The port specifies the TCP/IP port that the msgq
process listens on. If verbose is True, then the MsgQ reports
The socket_file specifies the path to the UNIX domain socket
that the msgq process listens on. If it is None, the
environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
is not set, it will default to
@localstatedir@/@PACKAGE_NAME@/msg_socket.
If verbose is True, then the MsgQ reports
what it is doing.
"""
if port == 0:
if 'ISC_MSGQ_PORT' in os.environ:
port = int(os.environ["ISC_MSGQ_PORT"])
else:
port = 9912
print(port)
if socket_file is None:
if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
else:
self.socket_file = self.SOCKET_FILE
else:
self.socket_file = socket_file
self.verbose = verbose
self.c_channel_port = port
self.poller = None
self.kqueue = None
self.runnable = False
......@@ -131,10 +139,23 @@ class MsgQ:
def setup_listener(self):
"""Set up the listener socket. Internal function."""
self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.listen_socket.bind(("127.0.0.1", self.c_channel_port))
self.listen_socket.listen(1024)
self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#self.listen_socket.bind(("127.0.0.1", self.c_channel_port))
if os.path.exists(self.socket_file):
os.remove(self.socket_file)
try:
print("[XX] SOCKET FILE: " + self.socket_file)
self.listen_socket.bind(self.socket_file)
print("[XX] LISTENING ON SOCKET FILE: " + self.socket_file)
self.listen_socket.listen(1024)
except Exception as e:
# remove the file again if something goes wrong
# (note this is a catch-all, but we reraise it)
if os.path.exists(self.socket_file):
os.remove(self.socket_file)
raise e
if self.poller:
self.poller.register(self.listen_socket, select.POLLIN)
......@@ -142,7 +163,10 @@ class MsgQ:
self.add_kqueue_socket(self.listen_socket)
def setup(self):
"""Configure listener socket, polling, etc."""
"""Configure listener socket, polling, etc.
Raises a socket.error if the socket_file cannot be
created.
"""
self.setup_poller()
self.setup_listener()
......@@ -366,6 +390,8 @@ class MsgQ:
if self.verbose:
sys.stdout.write("Stopping the server.\n")
self.listen_socket.close()
if os.path.exists(self.socket_file):
os.remove(self.socket_file)
# can signal handling and calling a destructor be done without a
# global variable?
......@@ -389,9 +415,9 @@ if __name__ == "__main__":
parser = OptionParser(version=__version__)
parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
help="display more about what is going on")
parser.add_option("-m", "--msgq-port", dest="msgq_port", type="string",
action="callback", callback=check_port, default="0",
help="port the msgq daemon will use")
parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
type="string", default=None,
help="UNIX domain socket file the msgq daemon will use")
(options, args) = parser.parse_args()
signal.signal(signal.SIGTERM, signal_handler)
......@@ -400,7 +426,7 @@ if __name__ == "__main__":
if options.verbose:
sys.stdout.write("MsgQ %s\n" % __version__)
msgq = MsgQ(int(options.msgq_port), options.verbose)
msgq = MsgQ(options.msgq_socket_file, options.verbose)
setup_result = msgq.setup()
if setup_result:
......
......@@ -2,6 +2,7 @@ from msgq import SubscriptionManager, MsgQ
import unittest
import os
import socket
#
# Currently only the subscription part is implemented... I'd have to mock
......@@ -62,6 +63,7 @@ class TestSubscriptionManager(unittest.TestCase):
def test_open_socket_parameter(self):
self.assertFalse(os.path.exists("./my_socket_file"))
msgq = MsgQ("./my_socket_file");
msgq.setup()
self.assertTrue(os.path.exists("./my_socket_file"))
msgq.shutdown();
self.assertFalse(os.path.exists("./my_socket_file"))
......@@ -70,6 +72,7 @@ class TestSubscriptionManager(unittest.TestCase):
self.assertFalse(os.path.exists("my_socket_file"))
os.environ["BIND10_MSGQ_SOCKET_FILE"] = "./my_socket_file"
msgq = MsgQ();
msgq.setup()
self.assertTrue(os.path.exists("./my_socket_file"))
msgq.shutdown();
self.assertFalse(os.path.exists("./my_socket_file"))
......@@ -80,13 +83,15 @@ class TestSubscriptionManager(unittest.TestCase):
socket_file = MsgQ.SOCKET_FILE
self.assertFalse(os.path.exists(socket_file))
msgq = MsgQ();
self.assertTrue(os.path.exists("./my_socket_file"))
msgq.setup()
self.assertTrue(os.path.exists(socket_file))
msgq.shutdown();
self.assertFalse(os.path.exists("./my_socket_file"))
self.assertFalse(os.path.exists(socket_file))
pass
def test_open_socket_bad(self):
self.assertRaises(Exception, MsgQ("/does/not/exist"))
msgq = MsgQ("/does/not/exist")
self.assertRaises(socket.error, msgq.setup)
pass
if __name__ == '__main__':
......
......@@ -5,6 +5,11 @@ libcc_a_SOURCES = data.cc data.h session.cc session.h
CLEANFILES = *.gcno *.gcda
session_config.h: session_config.h.pre
$(SED) -e "s|@@LOCALSTATEDIR@@|$(localstatedir)|" session_config.h.pre >$@
BUILT_SOURCES = session_config.h
TESTS =
if HAVE_GTEST
TESTS += run_unittests
......
......@@ -88,7 +88,7 @@ private:
private:
io_service& io_service_;
tcp::socket socket_;
boost::asio::local::stream_protocol::socket socket_;
uint32_t data_length_;
boost::function<void()> user_handler_;
boost::system::error_code error_;
......@@ -96,10 +96,13 @@ private:
void
ASIOSession::establish() {
socket_.connect(tcp::endpoint(boost::asio::ip::address_v4::loopback(),
9912), error_);
const char *socket_file = getenv("BIND10_MSGQ_SOCKET_FILE");
if (!socket_file) {
socket_file = BIND10_MSGQ_SOCKET_FILE;
}
socket_.connect(boost::asio::local::stream_protocol::endpoint(socket_file), error_);
if (error_) {
isc_throw(SessionError, "Unable to connect to message queue");
isc_throw(SessionError, "Unable to connect to message queue.");
}
}
......@@ -215,27 +218,22 @@ public:
void
SocketSession::establish() {
int s;
struct sockaddr_in sin;
struct sockaddr_un sun;
s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
s = socket(AF_UNIX, SOCK_STREAM, IPPROTO_TCP);
if (s < 0) {
isc_throw(SessionError, "socket() failed");
}
int port = atoi(getenv("ISC_MSGQ_PORT"));
if (port == 0) {
port = 9912;
const char *socket_file = getenv("BIND10_MSGQ_SOCKET_FILE");
if (!socket_file) {
socket_file = BIND10_MSGQ_SOCKET_FILE;
}
sin.sin_family = AF_INET;
sin.sin_port = htons(port);
sin.sin_addr.s_addr = INADDR_ANY;
#ifdef HAVE_SIN_LEN
sin.sin_len = sizeof(struct sockaddr_in);
#endif
sun.sun_family = AF_UNIX;
strncpy(sun.sun_path, socket_file, 107);
if (connect(s, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
if (connect(s, (struct sockaddr *)&sun, sizeof(sun)) < 0) {
close(s);
isc_throw(SessionError, "Unable to connect to message queue");
}
......
......@@ -24,6 +24,7 @@
#include <exceptions/exceptions.h>
#include "data.h"
#include "session_config.h"
namespace boost {
namespace asio {
......
#define BIND10_MSGQ_SOCKET_FILE "@@LOCALSTATEDIR@@/@PACKAGE@/msgq_socket"
......@@ -26,7 +26,12 @@ class NetworkError(Exception): pass
class SessionError(Exception): pass
class Session:
def __init__(self, port=0):
SOCKET_FILE = os.path.join("@localstatedir@",
"@PACKAGE_NAME@",
"msgq_socket").replace("${prefix}",
"@prefix@")
def __init__(self, socket_file=None):
self._socket = None
self._lname = None
self._recvbuffer = bytearray()
......@@ -36,18 +41,24 @@ class Session:
self._queue = []
self._lock = threading.RLock()
if port == 0:
if 'ISC_MSGQ_PORT' in os.environ:
port = int(os.environ["ISC_MSGQ_PORT"])
else:
port = 9912
if socket_file is None:
if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
else:
self.socket_file = self.SOCKET_FILE
else:
self.socket_file = socket_file
try:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect(tuple(['127.0.0.1', port]))
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
print("[XX] SOCKET FILE TO CONNECT TO: " + str(self.socket_file))
self._socket.connect(self.socket_file)
print("[XX] CONNECTED")
self.sendmsg({ "type": "getlname" })
print("[XX] MSG SENT")
env, msg = self.recvmsg(False)
print("[XX] MSG RECEIVED")
if not env:
raise ProtocolError("Could not get local name")
self._lname = msg["lname"]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment