bind10.py 12.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
"""\
This file implements the Boss of Bind (BoB, or bob) program.

Its purpose is to start up the BIND 10 system, and then manage the
processes, by starting, stopping, and restarting processes that exit.

To start the system, it first runs the c-channel program (msgq), then
connects to that. It then runs the configuration manager, and reads
its own configuration. Then it proceeds to starting other modules.

The Python subprocess module is used for starting processes, but
because this is not efficient for managing groups of processes,
SIGCHLD signals are caught and processed using the signal module.

Most of the logic is contained in the BoB class. However, since Python
requires that signal processing happen in the main thread, we do
signal handling outside of that class, in the code running for
__main__.
"""

import subprocess
import signal
import os
import sys
import re
import errno
import time
28
import select
29
30
from optparse import OptionParser, OptionValueError

31
import ISC.CC
32
33

# This is the version that gets displayed to the user.
34
__version__ = "v20091030 (Paving the DNS Parking Lot)"
35

36
37
38
# Nothing at all to do with the 1990-12-10 article here:
# http://www.subgenius.com/subg-digest/v2/0056.html

39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class ProcessInfo:
    """Information about a process

    The child is spawned as soon as the object is created.  The name,
    argument list and environment are remembered so that the very same
    command can be started again later with respawn().

    Attributes set by this class:
      name    -- label given by the caller
      args    -- program and arguments handed to subprocess.Popen
      env     -- environment handed to subprocess.Popen
      process -- the subprocess.Popen object of the latest spawn
      pid     -- PID of the latest spawn
    """

    # Shared sink for stdout and stderr of every child.
    dev_null = open("/dev/null", "w")

    def _spawn(self):
        """Start the child and record its Popen object and PID."""
        self.process = subprocess.Popen(self.args,
                                        stdin=subprocess.PIPE,
                                        stdout=self.dev_null,
                                        stderr=self.dev_null,
                                        close_fds=True,
                                        env=self.env,)
        self.pid = self.process.pid

    def __init__(self, name, args, env={}):
        """Remember how to run the process, then start it.

        name is a label for the process, args is passed to
        subprocess.Popen unchanged, and env is the environment for the
        child (by default an empty one).  Whatever subprocess.Popen
        raises when the child cannot be started propagates to the
        caller.
        """
        self.name = name
        self.args = args
        # Keep a private copy: storing the argument itself made every
        # instance built with the default share one dictionary (and
        # aliased dictionaries owned by callers).  The default dict is
        # never modified because it is only ever copied.  None is passed
        # through untouched, as before.
        self.env = None if env is None else dict(env)
        self._spawn()

    def respawn(self):
        """Start the process again with the original arguments."""
        self._spawn()

62
63
class BoB:
    """Boss of BIND class.

    Starts the msgq, bind-cfgd and parkinglot child processes, keeps
    track of them by PID, restarts the ones that die and stops all of
    them on shutdown.
    """

    def __init__(self, c_channel_port=9912, verbose=False):
        """Initialize the Boss of BIND. This is a singleton (only one
        can run).

        The c_channel_port specifies the TCP/IP port that the msgq
        process listens on. If verbose is True, then the boss reports
        what it is doing.
        """
        # Honour the caller's choice; this used to be forced to True.
        self.verbose = verbose
        self.c_channel_port = c_channel_port
        self.cc_session = None
        # Running children, keyed by PID.
        self.processes = {}
        # Children that have exited and wait to be restarted, keyed by
        # the PID they had when they died.
        self.dead_processes = {}
        # The main loop keeps going for as long as this is True.
        self.runnable = False

    def startup(self):
        """Start the BoB instance.

        Returns None if successful, otherwise a string describing the
        problem.
        """
        # start the c-channel daemon
        if self.verbose:
            sys.stdout.write("Starting msgq using port %d\n" % self.c_channel_port)
        c_channel_env = { "ISC_MSGQ_PORT": str(self.c_channel_port), }
        try:
            c_channel = ProcessInfo("msgq", "msgq", c_channel_env)
        except Exception:
            return "Unable to start msgq"
        self.processes[c_channel.pid] = c_channel
        if self.verbose:
            sys.stdout.write("Started msgq (PID %d)\n" % c_channel.pid)

        # now connect to the c-channel
        cc_connect_start = time.time()
        while self.cc_session is None:
            # if we have been trying for "a while" give up
            if (time.time() - cc_connect_start) > 5:
                # ProcessInfo has no kill() of its own; the Popen
                # object it holds does.
                c_channel.process.kill()
                return "Unable to connect to c-channel after 5 seconds"
            # try to connect, and if we can't wait a short while
            try:
                self.cc_session = ISC.CC.Session(self.c_channel_port)
            except ISC.CC.session.SessionError:
                time.sleep(0.1)
        self.cc_session.group_subscribe("Boss")

        # start the configuration manager
        if self.verbose:
            sys.stdout.write("Starting bind-cfgd\n")
        try:
            bind_cfgd = ProcessInfo("bind-cfgd", "bind-cfgd")
        except Exception:
            c_channel.process.kill()
            return "Unable to start bind-cfgd"
        self.processes[bind_cfgd.pid] = bind_cfgd
        if self.verbose:
            sys.stdout.write("Started bind-cfgd (PID %d)\n" % bind_cfgd.pid)

        # start the parking lot
        # XXX: this must be read from the configuration manager in the future
        # XXX: we hardcode port 5300
        if self.verbose:
            sys.stdout.write("Starting parkinglot on port 5300\n")
        try:
            parkinglot = ProcessInfo("parkinglot", ["parkinglot", "-p", "5300"])
        except Exception:
            c_channel.process.kill()
            bind_cfgd.process.kill()
            return "Unable to start parkinglot"
        self.processes[parkinglot.pid] = parkinglot
        if self.verbose:
            sys.stdout.write("Started parkinglot (PID %d)\n" % parkinglot.pid)

        self.runnable = True
        return None

    def stop_all_processes(self):
        """Stop all processes."""
        self.cc_session.group_sendmsg({ "shutdown": True }, "Boss")

    def stop_process(self, process):
        """Stop the given process, friendly-like."""
        # XXX nothing yet
        pass

    def shutdown(self):
        """Stop the BoB instance.

        Escalates in three steps: a shutdown message on the c-channel,
        then SIGTERM to every known process, then SIGKILL to whatever
        is still running after that.
        """
        if self.verbose:
            sys.stdout.write("Stopping the server.\n")
        # first try using the BIND 10 request to stop
        try:
            self.stop_all_processes()
        except Exception:
            # Best effort only: the signals below are sent regardless of
            # whether the message could be delivered.
            pass
        time.sleep(0.1)  # XXX: some delay probably useful... how much is uncertain
        # next try sending a SIGTERM
        processes_to_stop = list(self.processes.values())
        for proc_info in processes_to_stop:
            if self.verbose:
                sys.stdout.write("Sending SIGTERM to %s (PID %d).\n" %
                                 (proc_info.name, proc_info.pid))
            try:
                proc_info.process.terminate()
            except OSError:
                # ignore these (usually ESRCH because the child
                # finally exited)
                pass
        time.sleep(0.1)  # XXX: some delay probably useful... how much is uncertain
        unstopped_processes = []
        for proc_info in processes_to_stop:
            try:
                (pid, exit_status) = os.waitpid(proc_info.pid, os.WNOHANG)
            except OSError as o:
                if o.errno != errno.ECHILD:
                    raise
                # The child has been waited for already (possibly by
                # the subprocess module itself), so it is gone.
                continue
            if pid == 0:
                unstopped_processes.append(proc_info)
        # finally, send a SIGKILL (unmaskable termination)
        for proc_info in unstopped_processes:
            if self.verbose:
                sys.stdout.write("Sending SIGKILL to %s (PID %d).\n" %
                                 (proc_info.name, proc_info.pid))
            try:
                proc_info.process.kill()
            except OSError:
                # ignore these (usually ESRCH because the child
                # finally exited)
                pass
        if self.verbose:
            sys.stdout.write("All processes ended, server done.\n")

    def reap(self, pid, exit_status):
        """The process specified by pid has exited with the value
        exit_status, so perform any action necessary (cleanup,
        restart, and so on).

        Returns True if everything is okay, or False if a fatal error
        has been detected and the program should exit.
        """
        proc_info = self.processes.pop(pid, None)
        if proc_info is None:
            # Not a process we are tracking; nothing to clean up.
            return True
        self.dead_processes[proc_info.pid] = proc_info
        if self.verbose:
            sys.stdout.write("Process %s (PID %d) died.\n" %
                             (proc_info.name, proc_info.pid))
        if proc_info.name == "msgq":
            if self.verbose:
                sys.stdout.write("The msgq process died, shutting down.\n")
            self.runnable = False
            return False
        return True

    def recv_and_process_cc_msg(self):
        """Receive and process the next message on the c-channel,
        if any."""
        routing, data = self.cc_session.group_recvmsg(False)
        print("routing", routing)
        print("data", data)

    def restart_processes(self):
        """Restart any dead processes."""
        # XXX: this needs a back-off algorithm
        # Detach the current set before walking it.  reap() runs from a
        # signal handler and may add entries at any moment; working on
        # a detached dictionary means it cannot change under the loop
        # and entries added meanwhile are kept for the next call.
        dead, self.dead_processes = self.dead_processes, {}
        for proc_info in dead.values():
            if self.verbose:
                sys.stdout.write("Resurrecting dead %s process...\n" %
                                 proc_info.name)
            try:
                proc_info.respawn()
            except Exception:
                # remember any processes that refuse to be resurrected
                self.dead_processes[proc_info.pid] = proc_info
                continue
            self.processes[proc_info.pid] = proc_info
            if self.verbose:
                sys.stdout.write("Resurrected %s (PID %d)\n" %
                                 (proc_info.name, proc_info.pid))
236
237
238
239
240
241
242
243
244
245
246
247

if __name__ == "__main__":
    def reaper(signal_number, stack_frame):
        """SIGCHLD handler: collect every child that has exited.

        Each collected PID and exit status is handed to
        boss_of_bind.reap().  Stops as soon as no further child is
        waiting to be collected.
        """
        global boss_of_bind
        while True:
            try:
                child_pid, status = os.waitpid(-1, os.WNOHANG)
            except OSError as err:
                if err.errno != errno.ECHILD:
                    raise
                # there are no children at all
                return
            if child_pid == 0:
                # children exist, but none of them has exited
                return
            if boss_of_bind:
                boss_of_bind.reap(child_pid, status)
250
                   
251
252
253
254
255
256
257
258
    def get_signame(signal_number):
        """Look up the symbolic name of a signal number.

        Only names of the signal module that start with "SIG" followed
        by a letter or digit are considered.  When none of them has the
        given value, an "Unknown signal N" string is returned.
        """
        matching = (name for name in dir(signal)
                    if name.startswith("SIG")
                    and name[3].isalnum()
                    and getattr(signal, name) == signal_number)
        for name in matching:
            return name
        return "Unknown signal %d" % signal_number

259
    # XXX: perhaps register atexit() function and invoke that instead
260
261
262
263
264
265
    def fatal_signal(signal_number, stack_frame):
        """SIGINT/SIGTERM handler: ask the main loop to finish."""
        global options
        if options.verbose:
            signame = get_signame(signal_number)
            sys.stdout.write("Received %s.\n" % signame)
        # Put SIGCHLD back to its default action before stopping.
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
        boss_of_bind.runnable = False
267
268

    def check_port(option, opt_str, value, parser):
        """Function to ensure that the port we are passed is actually
        a valid port number. Used by OptionParser() on startup.

        value must be the decimal text of a number from 0 to 65535
        without leading zeros.  It is stored unchanged (as a string) in
        parser.values.msgq_port.  Raises OptionValueError otherwise.
        """
        # Raw string: "\d" is not a valid escape in an ordinary string
        # literal and newer Python versions warn about it.
        if not re.match(r'^(6553[0-5]|655[0-2]\d|65[0-4]\d\d|6[0-4]\d{3}|[1-5]\d{4}|[1-9]\d{0,3}|0)$', value):
            raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
        parser.values.msgq_port = value

275
    # Parse any command-line options.
    parser = OptionParser(version=__version__)
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                      help="display more about what is going on")
    parser.add_option("-m", "--msgq-port", dest="msgq_port", type="string",
                      action="callback", callback=check_port, default="9912",
                      help="port the msgq daemon will use")
    (options, args) = parser.parse_args()

    # Announce startup.
    if options.verbose:
        sys.stdout.write("BIND 10 %s\n" % __version__)

    # TODO: set process name, perhaps by:
    #       http://code.google.com/p/procname/
    #       http://github.com/lericson/procname/

    # Create wakeup pipe for signal handlers
    wakeup_pipe = os.pipe()
    # set_wakeup_fd() requires a non-blocking descriptor, so switch the
    # write end of the pipe to non-blocking mode before registering it.
    import fcntl
    wakeup_flags = fcntl.fcntl(wakeup_pipe[1], fcntl.F_GETFL)
    fcntl.fcntl(wakeup_pipe[1], fcntl.F_SETFL, wakeup_flags | os.O_NONBLOCK)
    signal.set_wakeup_fd(wakeup_pipe[1])

    # Set signal handlers for catching child termination, as well
    # as our own demise.
    signal.signal(signal.SIGCHLD, reaper)
    signal.siginterrupt(signal.SIGCHLD, False)
    signal.signal(signal.SIGINT, fatal_signal)
    signal.signal(signal.SIGTERM, fatal_signal)

    # Go bob!
    boss_of_bind = BoB(int(options.msgq_port), options.verbose)
    startup_result = boss_of_bind.startup()
    if startup_result:
        sys.stderr.write("Error on startup: %s\n" % startup_result)
        sys.exit(1)

    # In our main loop, we check for dead processes or messages
    # on the c-channel.
    event_poller = select.poll()
    wakeup_fd = wakeup_pipe[0]
    event_poller.register(wakeup_fd, select.POLLIN)
    cc_fd = boss_of_bind.cc_session._socket.fileno()
    event_poller.register(cc_fd, select.POLLIN)
    while boss_of_bind.runnable:
        # XXX: get time for next restart for poll

        # poll() can raise EINTR when a signal arrives,
        # even if they are resumable, so we have to catch
        # the exception
        try:
            events = event_poller.poll()
        except select.error as err:
            if err.args[0] == errno.EINTR:
                events = []
            else:
                sys.stderr.write("Error with poll(); %s\n" % err)
                break

        for (fd, event) in events:
            if fd == cc_fd:
                boss_of_bind.recv_and_process_cc_msg()
            elif fd == wakeup_fd:
                # Drain the bytes written for the signals that woke us
                # up; the signal handlers have done the actual work.
                os.read(wakeup_fd, 32)

        boss_of_bind.restart_processes()

    # shutdown
    signal.signal(signal.SIGCHLD, signal.SIG_DFL)
    boss_of_bind.shutdown()