bind10.py 14.5 KB
Newer Older
1 2 3 4
"""\
This file implements the Boss of Bind (BoB, or bob) program.

Its purpose is to start up the BIND 10 system, and then manage the
processes, by starting and stopping processes, plus restarting
processes that exit.

To start the system, it first runs the c-channel program (msgq), then
connects to that. It then runs the configuration manager, and reads
its own configuration. Then it proceeds to starting other modules.

The Python subprocess module is used for starting processes, but
because this is not efficient for managing groups of processes,
SIGCHLD signals are caught and processed using the signal module.

Most of the logic is contained in the BoB class. However, since Python
requires that signal processing happen in the main thread, we do
signal handling outside of that class, in the code running for
__main__.
"""

22 23
# TODO: start up statistics thingy

24 25 26 27 28 29 30
import subprocess
import signal
import os
import sys
import re
import errno
import time
31
import select
Shane Kerr's avatar
Shane Kerr committed
32
import pprint
33 34
from optparse import OptionParser, OptionValueError

35
import ISC.CC
36 37

# This is the version that gets displayed to the user.
38
__version__ = "v20091030 (Paving the DNS Parking Lot)"
39

40 41 42
# Nothing at all to do with the 1990-12-10 article here:
# http://www.subgenius.com/subg-digest/v2/0056.html

43 44 45 46 47 48
class ProcessInfo:
    """Information about a child process.

    Creating an instance spawns the process immediately; respawn()
    starts a new copy using the same name, arguments and environment.

    Attributes set by this class:
      name            -- label for the process, as given by the caller
      args            -- program and arguments handed to subprocess.Popen
      env             -- extra environment variables given by the caller
      dev_null_stdout -- if True, stdout and stderr go to /dev/null
      process         -- subprocess.Popen object of the latest spawn
      pid             -- process ID of the latest spawn
    """

    # Single write handle on /dev/null, shared by every instance that
    # was created with dev_null_stdout=True.
    dev_null = open("/dev/null", "w")

    def _spawn(self):
        """Start the process and record its Popen object and PID.

        The child receives the caller-supplied variables plus PATH and
        PYTHON_EXEC copied from our own environment. A KeyError is
        raised if either of those is missing from os.environ, and
        subprocess.Popen may raise OSError if the program cannot be
        started.
        """
        if self.dev_null_stdout:
            spawn_stdout = self.dev_null
        else:
            spawn_stdout = None
        # Build the child environment on a copy, so that neither the
        # dictionary passed in by the caller nor self.env is modified.
        spawn_env = dict(self.env)
        spawn_env['PATH'] = os.environ['PATH']
        spawn_env['PYTHON_EXEC'] = os.environ['PYTHON_EXEC']
        self.process = subprocess.Popen(self.args,
                                        stdin=subprocess.PIPE,
                                        stdout=spawn_stdout,
                                        stderr=spawn_stdout,
                                        close_fds=True,
                                        env=spawn_env,)
        self.pid = self.process.pid

    def __init__(self, name, args, env=None, dev_null_stdout=False):
        """Remember how to start the process, then start it.

        name is a label for the process, args is passed unchanged to
        subprocess.Popen, env holds extra environment variables for the
        child (None means no extra variables), and dev_null_stdout
        selects whether the child's stdout and stderr are discarded.
        """
        self.name = name
        self.args = args
        # A fresh dictionary per instance; a literal {} default would be
        # shared between every instance created without an env.
        self.env = {} if env is None else env
        self.dev_null_stdout = dev_null_stdout
        self._spawn()

    def respawn(self):
        """Start the process again with the original settings."""
        self._spawn()

74 75
class BoB:
    """Boss of BIND class.

    Starts the msgq, bind-cfgd and parkinglot processes, keeps track of
    them by PID, restarts the ones that die and stops them on shutdown.

    Attributes:
      verbose        -- if true, progress is reported on stdout
      c_channel_port -- port number handed to msgq and to the session
      cc_session     -- c-channel session, None until startup() connects
      processes      -- running children, ProcessInfo keyed by PID
      dead_processes -- exited children awaiting restart, keyed by PID
      runnable       -- True while the main loop should keep running
    """

    def __init__(self, c_channel_port=9912, verbose=False):
        """Initialize the Boss of BIND. This is a singleton (only one
        can run).

        The c_channel_port specifies the TCP/IP port that the msgq
        process listens on. If verbose is True, then the boss reports
        what it is doing.
        """
        # Honour the caller's choice (this used to be forced to True).
        self.verbose = verbose
        self.c_channel_port = c_channel_port
        self.cc_session = None
        self.processes = {}
        self.dead_processes = {}
        self.runnable = False

    def startup(self):
        """Start the BoB instance.

        Returns None if successful, otherwise a string describing the
        problem.
        """
        # start the c-channel daemon
        if self.verbose:
            sys.stdout.write("Starting msgq using port %d\n" %
                             self.c_channel_port)
        c_channel_env = { "ISC_MSGQ_PORT": str(self.c_channel_port), }
        try:
            c_channel = ProcessInfo("msgq", "msgq", c_channel_env, True)
        except Exception as e:
            return "Unable to start msgq; " + str(e)
        self.processes[c_channel.pid] = c_channel
        if self.verbose:
            sys.stdout.write("Started msgq (PID %d)\n" % c_channel.pid)

        # now connect to the c-channel
        cc_connect_start = time.time()
        while self.cc_session is None:
            # if we have been trying for "a while" give up
            if (time.time() - cc_connect_start) > 5:
                c_channel.process.kill()
                return "Unable to connect to c-channel after 5 seconds"
            # try to connect, and if we can't wait a short while
            try:
                self.cc_session = ISC.CC.Session(self.c_channel_port)
            except ISC.CC.session.SessionError:
                time.sleep(0.1)
        self.cc_session.group_subscribe("Boss", "boss")

        # start the configuration manager
        if self.verbose:
            sys.stdout.write("Starting bind-cfgd\n")
        try:
            bind_cfgd = ProcessInfo("bind-cfgd", "bind-cfgd")
        except Exception as e:
            c_channel.process.kill()
            return "Unable to start bind-cfgd; " + str(e)
        self.processes[bind_cfgd.pid] = bind_cfgd
        if self.verbose:
            sys.stdout.write("Started bind-cfgd (PID %d)\n" % bind_cfgd.pid)

        # start the parking lot
        # XXX: this must be read from the configuration manager in the future
        # XXX: we hardcode port 5300
        if self.verbose:
            sys.stdout.write("Starting parkinglot on port 5300\n")
        try:
            parkinglot = ProcessInfo("parkinglot", ["parkinglot", "-p", "5300"])
        except Exception as e:
            # kill() lives on the Popen object, not on ProcessInfo
            c_channel.process.kill()
            bind_cfgd.process.kill()
            return "Unable to start parkinglot; " + str(e)
        self.processes[parkinglot.pid] = parkinglot
        if self.verbose:
            sys.stdout.write("Started parkinglot (PID %d)\n" % parkinglot.pid)

        self.runnable = True
        return None

    def stop_all_processes(self):
        """Stop all processes.

        Sends a "shutdown" command over the c-channel to the
        ConfigManager and ParkingLot groups.
        """
        cmd = { "command": "shutdown" }
        self.cc_session.group_sendmsg(cmd, "Boss", "ConfigManager")
        self.cc_session.group_sendmsg(cmd, "Boss", "ParkingLot")

    def stop_process(self, process):
        """Stop the given process, friendly-like."""
        # XXX nothing yet
        pass

    def shutdown(self):
        """Stop the BoB instance.

        Escalates in three steps: a shutdown command over the
        c-channel, then SIGTERM to every known process, then SIGKILL to
        any process that is still running after a short delay.
        """
        if self.verbose:
            sys.stdout.write("Stopping the server.\n")
        # first try using the BIND 10 request to stop
        try:
            self.stop_all_processes()
        except Exception:
            # best effort only (we may never have connected); the
            # signals below still get sent
            pass
        # XXX: some delay probably useful... how much is uncertain
        time.sleep(0.1)
        # next try sending a SIGTERM
        processes_to_stop = list(self.processes.values())
        unstopped_processes = []
        for proc_info in processes_to_stop:
            if self.verbose:
                sys.stdout.write("Sending SIGTERM to %s (PID %d).\n" %
                                 (proc_info.name, proc_info.pid))
            try:
                proc_info.process.terminate()
            except OSError:
                # ignore these (usually ESRCH because the child
                # finally exited)
                pass
        # XXX: some delay probably useful... how much is uncertain
        time.sleep(0.1)
        for proc_info in processes_to_stop:
            try:
                (pid, exit_status) = os.waitpid(proc_info.pid, os.WNOHANG)
            except OSError as o:
                if o.errno != errno.ECHILD:
                    raise
                # the child was already collected elsewhere, so there
                # is nothing left to kill
                continue
            if pid == 0:
                unstopped_processes.append(proc_info)
        # finally, send a SIGKILL (unmaskable termination)
        processes_to_stop = unstopped_processes
        for proc_info in processes_to_stop:
            if self.verbose:
                sys.stdout.write("Sending SIGKILL to %s (PID %d).\n" %
                                 (proc_info.name, proc_info.pid))
            try:
                proc_info.process.kill()
            except OSError:
                # ignore these (usually ESRCH because the child
                # finally exited)
                pass
        if self.verbose:
            sys.stdout.write("All processes ended, server done.\n")

    def reap(self, pid, exit_status):
        """The process specified by pid has exited with the value
        exit_status, so perform any action necessary (cleanup,
        restart, and so on).

        A known process is moved from processes to dead_processes. If
        the msgq process died, runnable is cleared as well.

        Returns True if everything is okay, or False if a fatal error
        has been detected and the program should exit.
        """
        if pid not in self.processes:
            sys.stdout.write("Unknown child pid %d exited.\n" % pid)
            return True
        proc_info = self.processes.pop(pid)
        self.dead_processes[proc_info.pid] = proc_info
        if self.verbose:
            sys.stdout.write("Process %s (PID %d) died.\n" %
                             (proc_info.name, proc_info.pid))
        if proc_info.name == "msgq":
            if self.verbose:
                sys.stdout.write("The msgq process died, shutting down.\n")
            self.runnable = False
            return False
        return True

    def recv_and_process_cc_msg(self):
        """Receive and process the next message on the c-channel,
        if any.

        The only command acted upon is ["boss", "shutdown"], which
        clears runnable. Anything else is reported when verbose is set
        and otherwise ignored.
        """
        msg, data = self.cc_session.group_recvmsg(False)
        if msg is None:
            return
        # check the types before touching the contents, so that a
        # badly formatted message is reported instead of raising
        if not (isinstance(msg, dict) and isinstance(data, dict)):
            if self.verbose:
                sys.stdout.write("Non-dictionary message\n")
            return
        if "command" not in msg:
            if self.verbose:
                # the envelope may or may not carry a 'msg' entry
                data.pop('msg', None)
                sys.stdout.write("Unknown message received\n")
                sys.stdout.write(pprint.pformat(data) + "\n")
                sys.stdout.write(pprint.pformat(msg) + "\n")
            return
        cmd = msg['command']
        # only a sequence with at least two elements can be the
        # shutdown command; anything else counts as unknown
        is_shutdown = (isinstance(cmd, (list, tuple)) and len(cmd) >= 2 and
                       cmd[0] == "boss" and cmd[1] == "shutdown")
        if is_shutdown:
            if self.verbose:
                sys.stdout.write("Shutdown command received\n")
            self.runnable = False
        elif self.verbose:
            sys.stdout.write("Unknown command %s\n" % str(cmd))

    def restart_processes(self):
        """Restart any dead processes.

        Processes that start again are put back in processes under
        their new PID; the ones that fail to start stay in
        dead_processes for the next attempt.
        """
        # XXX: this needs a back-off algorithm
        still_dead = {}
        for proc_info in self.dead_processes.values():
            if self.verbose:
                sys.stdout.write("Resurrecting dead %s process...\n" %
                                 proc_info.name)
            try:
                proc_info.respawn()
                self.processes[proc_info.pid] = proc_info
                if self.verbose:
                    sys.stdout.write("Resurrected %s (PID %d)\n" %
                                     (proc_info.name, proc_info.pid))
            except Exception:
                # Exception rather than a bare except, so SystemExit
                # and KeyboardInterrupt are not swallowed here
                still_dead[proc_info.pid] = proc_info
        # remember any processes that refuse to be resurrected
        self.dead_processes = still_dead
278 279 280 281

if __name__ == "__main__":
    # Script entry point: parse the command line, start the boss, then
    # loop handling child exits and c-channel messages until told to
    # stop, and finally shut everything down.

    # Local import: only the wakeup pipe setup below needs it.
    import fcntl

    def reaper(signal_number, stack_frame):
        """A child process has died (SIGCHLD received)."""
        # don't do anything...
        # the Python signal handler has been set up to write
        # down a pipe, waking up our select() bit
        pass

    def get_signame(signal_number):
        """Return the symbolic name for a signal."""
        for sig in dir(signal):
            # sig[3].isalnum() skips names such as SIG_DFL and SIG_IGN
            if sig.startswith("SIG") and sig[3].isalnum():
                if getattr(signal, sig) == signal_number:
                    return sig
        return "Unknown signal %d" % signal_number

    # XXX: perhaps register atexit() function and invoke that instead
    def fatal_signal(signal_number, stack_frame):
        """We need to exit (SIGINT or SIGTERM received)."""
        if options.verbose:
            sys.stdout.write("Received %s.\n" % get_signame(signal_number))
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
        # the main loop notices this and falls through to shutdown
        boss_of_bind.runnable = False

    def check_port(option, opt_str, value, parser):
        """Function to ensure that the port we are passed is actually
        a valid port number. Used by OptionParser() on startup."""
        # raw string, so that \d reaches the regular expression engine
        # without being treated as a string escape first
        if not re.match(r'^(6553[0-5]|655[0-2]\d|65[0-4]\d\d|6[0-4]\d{3}|[1-5]\d{4}|[1-9]\d{0,3}|0)$', value):
            raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
        parser.values.msgq_port = value

    # Parse any command-line options.
    parser = OptionParser(version=__version__)
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                      help="display more about what is going on")
    parser.add_option("-m", "--msgq-port", dest="msgq_port", type="string",
                      action="callback", callback=check_port, default="9912",
                      help="port the msgq daemon will use")
    (options, args) = parser.parse_args()

    # Announce startup.
    if options.verbose:
        sys.stdout.write("BIND 10 %s\n" % __version__)

    # TODO: set process name, perhaps by:
    #       http://code.google.com/p/procname/
    #       http://github.com/lericson/procname/

    # Create wakeup pipe for signal handlers. signal.set_wakeup_fd()
    # requires a non-blocking descriptor, so switch the write end over
    # before registering it.
    wakeup_pipe = os.pipe()
    write_flags = fcntl.fcntl(wakeup_pipe[1], fcntl.F_GETFL)
    fcntl.fcntl(wakeup_pipe[1], fcntl.F_SETFL, write_flags | os.O_NONBLOCK)
    signal.set_wakeup_fd(wakeup_pipe[1])

    # Create the boss before installing the handlers, because
    # fatal_signal() refers to boss_of_bind. No child is started until
    # startup() is called below.
    boss_of_bind = BoB(int(options.msgq_port), options.verbose)

    # Set signal handlers for catching child termination, as well
    # as our own demise.
    signal.signal(signal.SIGCHLD, reaper)
    signal.siginterrupt(signal.SIGCHLD, False)
    signal.signal(signal.SIGINT, fatal_signal)
    signal.signal(signal.SIGTERM, fatal_signal)

    # Go bob!
    startup_result = boss_of_bind.startup()
    if startup_result:
        sys.stderr.write("Error on startup: %s\n" % startup_result)
        sys.exit(1)

    # In our main loop, we check for dead processes or messages
    # on the c-channel.
    event_poller = select.poll()
    wakeup_fd = wakeup_pipe[0]
    event_poller.register(wakeup_fd, select.POLLIN)
    cc_fd = boss_of_bind.cc_session._socket.fileno()
    event_poller.register(cc_fd, select.POLLIN)
    while boss_of_bind.runnable:
        # XXX: get time for next restart for poll

        # poll() can raise EINTR when a signal arrives,
        # even if they are resumable, so we have to catch
        # the exception
        try:
            events = event_poller.poll()
        except select.error as err:
            if err.args[0] == errno.EINTR:
                events = []
            else:
                sys.stderr.write("Error with poll(); %s\n" % err)
                break

        for (fd, event) in events:
            if fd == cc_fd:
                boss_of_bind.recv_and_process_cc_msg()
            elif fd == wakeup_fd:
                # drain the bytes written for the signals received
                os.read(wakeup_fd, 32)

        # clean up any processes that exited
        while True:
            try:
                (pid, exit_status) = os.waitpid(-1, os.WNOHANG)
            except OSError as o:
                if o.errno == errno.ECHILD:
                    # no children left at all
                    break
                # XXX: should be impossible to get any other error here
                raise
            if pid == 0:
                # children exist, but none of them has exited
                break
            boss_of_bind.reap(pid, exit_status)

        boss_of_bind.restart_processes()

    # shutdown
    signal.signal(signal.SIGCHLD, signal.SIG_DFL)
    boss_of_bind.shutdown()