bind10.py 14.2 KB
Newer Older
1
2
3
4
"""\
This file implements the Boss of Bind (BoB, or bob) program.

Its purpose is to start up the BIND 10 system, and then manage the
processes, by starting and stopping processes, plus restarting
processes that exit.

To start the system, it first runs the c-channel program (msgq), then
connects to that. It then runs the configuration manager, and reads
its own configuration. Then it proceeds to starting other modules.

The Python subprocess module is used for starting processes, but
because this is not efficient for managing groups of processes,
SIGCHLD signals are caught and processed using the signal module.

Most of the logic is contained in the BoB class. However, since Python
requires that signal processing happen in the main thread, we do
signal handling outside of that class, in the code running for
__main__.
"""

22
23
# TODO: start up statistics thingy

24
25
26
27
28
29
30
import subprocess
import signal
import os
import sys
import re
import errno
import time
31
import select
Shane Kerr's avatar
Shane Kerr committed
32
import pprint
33
34
from optparse import OptionParser, OptionValueError

35
import ISC.CC
36
37

# This is the version that gets displayed to the user.
38
__version__ = "v20091030 (Paving the DNS Parking Lot)"
39

40
41
42
# Nothing at all to do with the 1990-12-10 article here:
# http://www.subgenius.com/subg-digest/v2/0056.html

43
44
45
46
47
48
class ProcessInfo:
    """Information about a process.

    Creating an instance starts the process immediately.  The
    following attributes are available afterwards:

    name            -- the name given by the caller
    args            -- the command passed to subprocess.Popen()
    env             -- the extra environment given by the caller
    dev_null_stdout -- True if stdout/stderr are sent to /dev/null
    process         -- the subprocess.Popen object of the current run
    pid             -- the PID of the current run
    """

    # Shared sink used for children whose output should be discarded.
    dev_null = open("/dev/null", "w")

    def _spawn(self):
        """Start the process and record its Popen object and PID."""
        if self.dev_null_stdout:
            spawn_stdout = self.dev_null
        else:
            spawn_stdout = None
        # Work on a copy, so that adding PATH does not modify the
        # dictionary handed in by the caller.
        spawn_env = dict(self.env)
        # The child gets our PATH (when we have one).
        if 'PATH' in os.environ:
            spawn_env['PATH'] = os.environ['PATH']
        self.process = subprocess.Popen(self.args,
                                        stdin=subprocess.PIPE,
                                        stdout=spawn_stdout,
                                        stderr=spawn_stdout,
                                        close_fds=True,
                                        env=spawn_env)
        self.pid = self.process.pid

    def __init__(self, name, args, env=None, dev_null_stdout=False):
        """Remember how to run the process, then start it.

        name is a label for the process, args is the program (and
        arguments) as accepted by subprocess.Popen(), env holds
        environment variables for the child, and dev_null_stdout
        selects whether the child's stdout and stderr are discarded.

        Any exception raised by subprocess.Popen() is passed on to
        the caller.
        """
        self.name = name
        self.args = args
        # None rather than {} as the default, so that instances never
        # share one dictionary.
        self.env = {} if env is None else env
        self.dev_null_stdout = dev_null_stdout
        self._spawn()

    def respawn(self):
        """Start the process again, updating process and pid."""
        self._spawn()

73
74
class BoB:
    """Boss of BIND class.

    Starts the msgq, bind-cfgd and parkinglot processes, keeps track
    of them by PID, restarts the ones that exit, and stops all of
    them on shutdown.
    """

    def __init__(self, c_channel_port=9912, verbose=False):
        """Initialize the Boss of BIND. This is a singleton (only one
        can run).

        The c_channel_port specifies the TCP/IP port that the msgq
        process listens on. If verbose is True, then the boss reports
        what it is doing.
        """
        self.verbose = verbose
        self.c_channel_port = c_channel_port
        self.cc_session = None
        # running children, indexed by PID
        self.processes = {}
        # children that have exited and wait to be restarted, indexed
        # by the PID they had when they exited
        self.dead_processes = {}
        # the main loop runs for as long as this is True
        self.runnable = False

    def startup(self):
        """Start the BoB instance.

        Returns None if successful, otherwise a string describing the
        problem.
        """
        # start the c-channel daemon
        if self.verbose:
            sys.stdout.write("Starting msgq using port %d\n" %
                             self.c_channel_port)
        c_channel_env = { "ISC_MSGQ_PORT": str(self.c_channel_port), }
        try:
            c_channel = ProcessInfo("msgq", "msgq", c_channel_env, True)
        except Exception as e:
            return "Unable to start msgq; " + str(e)
        self.processes[c_channel.pid] = c_channel
        if self.verbose:
            sys.stdout.write("Started msgq (PID %d)\n" % c_channel.pid)

        # now connect to the c-channel
        cc_connect_start = time.time()
        while self.cc_session is None:
            # if we have been trying for "a while" give up
            if (time.time() - cc_connect_start) > 5:
                # ProcessInfo has no kill() of its own; the Popen
                # object it holds does
                c_channel.process.kill()
                return "Unable to connect to c-channel after 5 seconds"
            # try to connect, and if we can't wait a short while
            try:
                self.cc_session = ISC.CC.Session(self.c_channel_port)
            except ISC.CC.session.SessionError:
                time.sleep(0.1)
        self.cc_session.group_subscribe("Boss", "boss")

        # start the configuration manager
        if self.verbose:
            sys.stdout.write("Starting bind-cfgd\n")
        try:
            bind_cfgd = ProcessInfo("bind-cfgd", "bind-cfgd")
        except Exception as e:
            c_channel.process.kill()
            return "Unable to start bind-cfgd; " + str(e)
        self.processes[bind_cfgd.pid] = bind_cfgd
        if self.verbose:
            sys.stdout.write("Started bind-cfgd (PID %d)\n" % bind_cfgd.pid)

        # start the parking lot
        # XXX: this must be read from the configuration manager in the future
        # XXX: we hardcode port 5300
        if self.verbose:
            sys.stdout.write("Starting parkinglot on port 5300\n")
        try:
            parkinglot = ProcessInfo("parkinglot", ["parkinglot", "-p", "5300"])
        except Exception as e:
            c_channel.process.kill()
            bind_cfgd.process.kill()
            return "Unable to start parkinglot; " + str(e)
        self.processes[parkinglot.pid] = parkinglot
        if self.verbose:
            sys.stdout.write("Started parkinglot (PID %d)\n" % parkinglot.pid)

        self.runnable = True
        return None

    def stop_all_processes(self):
        """Stop all processes."""
        cmd = { "command": "shutdown" }
        self.cc_session.group_sendmsg(cmd, "Boss", "*")

    def stop_process(self, process):
        """Stop the given process, friendly-like."""
        # XXX nothing yet
        pass

    def shutdown(self):
        """Stop the BoB instance.

        The children are first asked to stop over the c-channel, then
        sent SIGTERM, and the ones still running after that are sent
        SIGKILL.
        """
        if self.verbose:
            sys.stdout.write("Stopping the server.\n")
        # first try using the BIND 10 request to stop
        try:
            self.stop_all_processes()
        except Exception:
            # best effort only (there may be no c-channel session at
            # all); the signals below take care of the rest
            pass
        # XXX: some delay probably useful... how much is uncertain
        time.sleep(0.1)
        # next try sending a SIGTERM
        processes_to_stop = list(self.processes.values())
        for proc_info in processes_to_stop:
            if self.verbose:
                sys.stdout.write("Sending SIGTERM to %s (PID %d).\n" %
                                 (proc_info.name, proc_info.pid))
            try:
                proc_info.process.terminate()
            except OSError:
                # ignore these (usually ESRCH because the child
                # finally exited)
                pass
        # XXX: some delay probably useful... how much is uncertain
        time.sleep(0.1)
        unstopped_processes = []
        for proc_info in processes_to_stop:
            try:
                (pid, exit_status) = os.waitpid(proc_info.pid, os.WNOHANG)
            except OSError as o:
                if o.errno != errno.ECHILD:
                    raise
                # the child has been waited for already, so it is gone
                continue
            if pid == 0:
                unstopped_processes.append(proc_info)
        # finally, send a SIGKILL (unmaskable termination)
        for proc_info in unstopped_processes:
            if self.verbose:
                sys.stdout.write("Sending SIGKILL to %s (PID %d).\n" %
                                 (proc_info.name, proc_info.pid))
            try:
                proc_info.process.kill()
            except OSError:
                # ignore these (usually ESRCH because the child
                # finally exited)
                pass
        if self.verbose:
            sys.stdout.write("All processes ended, server done.\n")

    def reap(self, pid, exit_status):
        """The process specified by pid has exited with the value
        exit_status, so perform any action necessary (cleanup,
        restart, and so on).

        Returns True if everything is okay, or False if a fatal error
        has been detected and the program should exit.
        """
        if pid not in self.processes:
            sys.stdout.write("Unknown child pid %d exited.\n" % pid)
            return True
        proc_info = self.processes.pop(pid)
        self.dead_processes[proc_info.pid] = proc_info
        if self.verbose:
            sys.stdout.write("Process %s (PID %d) died.\n" %
                             (proc_info.name, proc_info.pid))
        if proc_info.name == "msgq":
            # nothing works without the c-channel, so stop the boss
            if self.verbose:
                sys.stdout.write("The msgq process died, shutting down.\n")
            self.runnable = False
            return False
        return True

    def recv_and_process_cc_msg(self):
        """Receive and process the next message on the c-channel,
        if any.

        A ["boss", "shutdown"] command clears self.runnable; anything
        else is only reported (when verbose).
        """
        msg, data = self.cc_session.group_recvmsg(False)
        if msg is None:
            return
        # check the types before looking inside either part
        if not (isinstance(msg, dict) and isinstance(data, dict)):
            if self.verbose:
                sys.stdout.write("Non-dictionary message\n")
            return
        if "command" not in msg:
            if self.verbose:
                # the 'msg' entry is left out of the report
                data.pop('msg', None)
                sys.stdout.write("Unknown message received\n")
                sys.stdout.write(pprint.pformat(data) + "\n")
                sys.stdout.write(pprint.pformat(msg) + "\n")
            return
        cmd = msg['command']
        try:
            is_shutdown = (cmd[0] == "boss") and (cmd[1] == "shutdown")
        except (IndexError, KeyError, TypeError):
            # too short, or not indexable at all: not a command we know
            is_shutdown = False
        if is_shutdown:
            if self.verbose:
                sys.stdout.write("Shutdown command received\n")
            self.runnable = False
        else:
            if self.verbose:
                sys.stdout.write("Unknown command %s\n" % str(cmd))

    def restart_processes(self):
        """Restart any dead processes."""
        # XXX: this needs a back-off algorithm
        # Take the current set of dead processes and start a new one
        # right away: reap() may add entries while we are busy here,
        # and those must neither disturb the loop nor get lost.
        to_restart = self.dead_processes
        self.dead_processes = {}
        for proc_info in list(to_restart.values()):
            if self.verbose:
                sys.stdout.write("Resurrecting dead %s process...\n" %
                                 proc_info.name)
            try:
                proc_info.respawn()
            except Exception:
                # remember any processes that refuse to be resurrected
                self.dead_processes[proc_info.pid] = proc_info
                continue
            self.processes[proc_info.pid] = proc_info
            if self.verbose:
                sys.stdout.write("Resurrected %s (PID %d)\n" %
                                 (proc_info.name, proc_info.pid))
276
277
278
279
280
281
282
283
284
285
286
287

if __name__ == "__main__":
    # Only needed here, to prepare the signal wakeup pipe.
    import fcntl

    def reaper(signal_number, stack_frame):
        """A child process has died (SIGCHLD received).

        Collects every child that has exited and hands each one to
        the boss.
        """
        while True:
            try:
                (pid, exit_status) = os.waitpid(-1, os.WNOHANG)
            except OSError as o:
                # no children left at all
                if o.errno == errno.ECHILD:
                    break
                raise
            # children exist, but none of them has exited
            if pid == 0:
                break
            if boss_of_bind:
                boss_of_bind.reap(pid, exit_status)

    def get_signame(signal_number):
        """Return the symbolic name for a signal."""
        for sig in dir(signal):
            # SIGxxx names only; this skips SIG_DFL, SIG_IGN and so on
            if sig.startswith("SIG") and len(sig) > 3 and sig[3].isalnum():
                if getattr(signal, sig) == signal_number:
                    return sig
        return "Unknown signal %d" % signal_number

    # XXX: perhaps register atexit() function and invoke that instead
    def fatal_signal(signal_number, stack_frame):
        """We need to exit (SIGINT or SIGTERM received)."""
        if options.verbose:
            sys.stdout.write("Received %s.\n" % get_signame(signal_number))
        # shutdown() collects the children itself from here on
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
        boss_of_bind.runnable = False

    def check_port(option, opt_str, value, parser):
        """Function to ensure that the port we are passed is actually
        a valid port number. Used by OptionParser() on startup."""
        if not re.match(r'^(6553[0-5]|655[0-2]\d|65[0-4]\d\d|6[0-4]\d{3}|[1-5]\d{4}|[1-9]\d{0,3}|0)$', value):
            raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
        parser.values.msgq_port = value

    # Parse any command-line options.
    parser = OptionParser(version=__version__)
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                      help="display more about what is going on")
    parser.add_option("-m", "--msgq-port", dest="msgq_port", type="string",
                      action="callback", callback=check_port, default="9912",
                      help="port the msgq daemon will use")
    (options, args) = parser.parse_args()

    # Announce startup.
    if options.verbose:
        sys.stdout.write("BIND 10 %s\n" % __version__)

    # TODO: set process name, perhaps by:
    #       http://code.google.com/p/procname/
    #       http://github.com/lericson/procname/

    # Create the boss before any signal handler is installed, as the
    # handlers use it. Nothing is started until startup() is called.
    boss_of_bind = BoB(int(options.msgq_port), options.verbose)

    # Create wakeup pipe for signal handlers. The write end has to be
    # non-blocking before it may be used as the wakeup descriptor.
    wakeup_pipe = os.pipe()
    wakeup_flags = fcntl.fcntl(wakeup_pipe[1], fcntl.F_GETFL)
    fcntl.fcntl(wakeup_pipe[1], fcntl.F_SETFL, wakeup_flags | os.O_NONBLOCK)
    signal.set_wakeup_fd(wakeup_pipe[1])

    # Set signal handlers for catching child termination, as well
    # as our own demise.
    signal.signal(signal.SIGCHLD, reaper)
    signal.siginterrupt(signal.SIGCHLD, False)
    signal.signal(signal.SIGINT, fatal_signal)
    signal.signal(signal.SIGTERM, fatal_signal)

    # Go bob!
    startup_result = boss_of_bind.startup()
    if startup_result:
        sys.stderr.write("Error on startup: %s\n" % startup_result)
        sys.exit(1)

    # In our main loop, we check for dead processes or messages
    # on the c-channel.
    event_poller = select.poll()
    wakeup_fd = wakeup_pipe[0]
    event_poller.register(wakeup_fd, select.POLLIN)
    cc_fd = boss_of_bind.cc_session._socket.fileno()
    event_poller.register(cc_fd, select.POLLIN)
    while boss_of_bind.runnable:
        # XXX: get time for next restart for poll

        # poll() can raise EINTR when a signal arrives,
        # even if they are resumable, so we have to catch
        # the exception
        try:
            events = event_poller.poll()
        except select.error as err:
            if err.args[0] == errno.EINTR:
                events = []
            else:
                sys.stderr.write("Error with poll(); %s\n" % err)
                break

        for (fd, event) in events:
            if fd == cc_fd:
                boss_of_bind.recv_and_process_cc_msg()
            elif fd == wakeup_fd:
                # drain the bytes written on signal arrival; what
                # they say is of no interest
                os.read(wakeup_fd, 32)

        boss_of_bind.restart_processes()

    # shutdown
    signal.signal(signal.SIGCHLD, signal.SIG_DFL)
    boss_of_bind.shutdown()