msgq.py.in 20.1 KB
Newer Older
1
2
#!@PYTHON@

3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Copyright (C) 2010  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

18
19
import sys; sys.path.append ('@@PYTHONPATH@@')

Michael Graff's avatar
Michael Graff committed
20
"""This code implements the msgq daemon."""
21
22
23
24
25
26

import subprocess
import signal
import os
import socket
import sys
27
import struct
28
29
30
import errno
import time
import select
31
import random
32
from optparse import OptionParser, OptionValueError
33
import isc.util.process
34

35
import isc.cc
36

37
isc.util.process.rename()
Michal Vaner's avatar
Michal Vaner committed
38

39
# This is the version that gets displayed to the user.
40
41
# The VERSION string consists of the module name, the module version
# number, and the overall BIND 10 version number (set in configure.ac).
42
VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"
43

44
45
46
47
48
49
50
51
52
53
class MsgQReceiveError(Exception): pass

class SubscriptionManager:
    def __init__(self):
        self.subscriptions = {}

    def subscribe(self, group, instance, socket):
        """Add a subscription."""
        target = ( group, instance )
        if target in self.subscriptions:
54
            print("[b10-msgq] Appending to existing target")
Michael Graff's avatar
Michael Graff committed
55
56
            if socket not in self.subscriptions[target]:
                self.subscriptions[target].append(socket)
57
        else:
58
            print("[b10-msgq] Creating new target")
59
60
61
62
63
64
            self.subscriptions[target] = [ socket ]

    def unsubscribe(self, group, instance, socket):
        """Remove the socket from the one specific subscription."""
        target = ( group, instance )
        if target in self.subscriptions:
Michael Graff's avatar
Michael Graff committed
65
            if socket in self.subscriptions[target]:
66
67
68
69
70
                self.subscriptions[target].remove(socket)

    def unsubscribe_all(self, socket):
        """Remove the socket from all subscriptions."""
        for socklist in self.subscriptions.values():
Michael Graff's avatar
Michael Graff committed
71
            if socket in socklist:
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
                socklist.remove(socket)

    def find_sub(self, group, instance):
        """Return an array of sockets which want this specific group,
        instance."""
        target = (group, instance)
        if target in self.subscriptions:
            return self.subscriptions[target]
        else:
            return []

    def find(self, group, instance):
        """Return an array of sockets who should get something sent to
        this group, instance pair.  This includes wildcard subscriptions."""
        target = (group, instance)
        partone = self.find_sub(group, instance)
        parttwo = self.find_sub(group, "*")
        return list(set(partone + parttwo))

91
92
class MsgQ:
    """Message Queue class."""
93
94
95
96
97
    # did we find a better way to do this?
    SOCKET_FILE = os.path.join("@localstatedir@",
                               "@PACKAGE_NAME@",
                               "msgq_socket").replace("${prefix}",
                                                      "@prefix@")
Jelte Jansen's avatar
Jelte Jansen committed
98

99
    def __init__(self, socket_file=None, verbose=False):
100
        """Initialize the MsgQ master.
Jelte Jansen's avatar
Jelte Jansen committed
101

102
103
104
105
106
107
        The socket_file specifies the path to the UNIX domain socket
        that the msgq process listens on. If it is None, the
        environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
        is not set, it will default to
        @localstatedir@/@PACKAGE_NAME@/msg_socket.
        If verbose is True, then the MsgQ reports
108
109
        what it is doing.
        """
110

111
112
113
114
115
116
117
        if socket_file is None:
            if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
                self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
            else:
                self.socket_file = self.SOCKET_FILE
        else:
            self.socket_file = socket_file
118

Michael Graff's avatar
Michael Graff committed
119
        self.verbose = verbose
120
        self.poller = None
121
        self.kqueue = None
122
123
124
        self.runnable = False
        self.listen_socket = False
        self.sockets = {}
125
126
127
        self.connection_counter = random.random()
        self.hostname = socket.gethostname()
        self.subs = SubscriptionManager()
128
        self.lnames = {}
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
129
        self.sendbuffs = {}
130
131
132

    def setup_poller(self):
        """Set up the poll thing.  Internal function."""
133
134
        try:
            self.kqueue = select.kqueue()
135
136
        except AttributeError:
            self.poller = select.poll()
Jelte Jansen's avatar
Jelte Jansen committed
137

138
139
140
141
142
143
144
145
146
147
148
149
    def add_kqueue_socket(self, socket, write_filter=False):
        """Add a kquque filter for a socket.  By default the read
        filter is used; if write_filter is set to True, the write
        filter is used.  We use a boolean value instead of a specific
        filter constant, because kqueue filter values do not seem to
        be defined on some systems.  The use of boolean makes the
        interface restrictive because there are other filters, but this
        method is mostly only for our internal use, so it should be
        acceptable at least for now."""
        filter_type = select.KQ_FILTER_WRITE if write_filter else \
            select.KQ_FILTER_READ
        event = select.kevent(socket.fileno(), filter_type,
150
151
152
                              select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        self.kqueue.control([event], 0)

153
154
155
156
157
158
159
160
    def delete_kqueue_socket(self, socket, write_filter=False):
        """Delete a kqueue filter for socket.  See add_kqueue_socket()
        for the semantics and notes about write_filter."""
        filter_type = select.KQ_FILTER_WRITE if write_filter else \
            select.KQ_FILTER_READ
        event = select.kevent(socket.fileno(), filter_type,
                              select.KQ_EV_DELETE)
        self.kqueue.control([event], 0)
161
162
163

    def setup_listener(self):
        """Set up the listener socket.  Internal function."""
164
165
166
167
        if self.verbose:
            sys.stdout.write("[b10-msgq] Setting up socket at %s\n" %
                             self.socket_file)

168
        self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
Jelte Jansen's avatar
Jelte Jansen committed
169

170
171
172
173
174
175
176
177
178
179
        if os.path.exists(self.socket_file):
            os.remove(self.socket_file)
        try:
            self.listen_socket.bind(self.socket_file)
            self.listen_socket.listen(1024)
        except Exception as e:
            # remove the file again if something goes wrong
            # (note this is a catch-all, but we reraise it)
            if os.path.exists(self.socket_file):
                os.remove(self.socket_file)
180
            self.listen_socket.close()
181
            raise e
182

183
184
185
186
        if self.poller:
            self.poller.register(self.listen_socket, select.POLLIN)
        else:
            self.add_kqueue_socket(self.listen_socket)
187
188

    def setup(self):
189
190
191
192
        """Configure listener socket, polling, etc.
           Raises a socket.error if the socket_file cannot be
           created.
        """
193
194
195
196
197

        self.setup_poller()
        self.setup_listener()

        if self.verbose:
198
            sys.stdout.write("[b10-msgq] Listening\n")
Jelte Jansen's avatar
Jelte Jansen committed
199

200
201
202
        self.runnable = True

    def process_accept(self):
203
        """Process an accept on the listening socket."""
204
        newsocket, ipaddr = self.listen_socket.accept()
205
206
207
        # TODO: When we have logging, we might want
        # to add a debug message here that a new connection
        # was made
208
        self.register_socket(newsocket)
209
210
211
212
213

    def register_socket(self, newsocket):
        """
        Internal function to insert a socket. Used by process_accept and some tests.
        """
214
        self.sockets[newsocket.fileno()] = newsocket
215
216
        lname = self.newlname()
        self.lnames[lname] = newsocket
217
218
219
220
221

        if self.poller:
            self.poller.register(newsocket, select.POLLIN)
        else:
            self.add_kqueue_socket(newsocket)
222
223

    def process_socket(self, fd):
224
        """Process a read on a socket."""
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
225
        if not fd in self.sockets:
226
            sys.stderr.write("[b10-msgq] Got read on Strange Socket fd %d\n" % fd)
227
            return
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
228
        sock = self.sockets[fd]
229
#        sys.stderr.write("[b10-msgq] Got read on fd %d\n" %fd)
230
231
        self.process_packet(fd, sock)

232
233
    def kill_socket(self, fd, sock):
        """Fully close down the socket."""
234
235
        if self.poller:
            self.poller.unregister(sock)
236
        self.subs.unsubscribe_all(sock)
237
238
        lname = [ k for k, v in self.lnames.items() if v == sock ][0]
        del self.lnames[lname]
239
        sock.close()
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
240
241
242
        del self.sockets[fd]
        if fd in self.sendbuffs:
            del self.sendbuffs[fd]
243
        sys.stderr.write("[b10-msgq] Closing socket fd %d\n" % fd)
244
245
246
247
248
249

    def getbytes(self, fd, sock, length):
        """Get exactly the requested bytes, or raise an exception if
           EOF."""
        received = b''
        while len(received) < length:
Jelte Jansen's avatar
Jelte Jansen committed
250
251
252
253
            try:
                data = sock.recv(length - len(received))
            except socket.error:
                raise MsgQReceiveError(socket.error)
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
            if len(data) == 0:
                raise MsgQReceiveError("EOF")
            received += data
        return received

    def read_packet(self, fd, sock):
        """Read a correctly formatted packet.  Will raise exceptions if
           something fails."""
        lengths = self.getbytes(fd, sock, 6)
        overall_length, routing_length = struct.unpack(">IH", lengths)
        if overall_length < 2:
            raise MsgQReceiveError("overall_length < 2")
        overall_length -= 2
        if routing_length > overall_length:
            raise MsgQReceiveError("routing_length > overall_length")
        if routing_length == 0:
            raise MsgQReceiveError("routing_length == 0")
        data_length = overall_length - routing_length
        # probably need to sanity check lengths here...
        routing = self.getbytes(fd, sock, routing_length)
        if data_length > 0:
            data = self.getbytes(fd, sock, data_length)
        else:
            data = None
        return (routing, data)

280
    def process_packet(self, fd, sock):
281
282
283
284
285
        """Process one packet."""
        try:
            routing, data = self.read_packet(fd, sock)
        except MsgQReceiveError as err:
            self.kill_socket(fd, sock)
286
            sys.stderr.write("[b10-msgq] Receive error: %s\n" % err)
287
288
289
            return

        try:
290
            routingmsg = isc.cc.message.from_wire(routing)
291
292
        except DecodeError as err:
            self.kill_socket(fd, sock)
293
            sys.stderr.write("[b10-msgq] Routing decode error: %s\n" % err)
294
            return
295
296
297
298
299

        self.process_command(fd, sock, routingmsg, data)

    def process_command(self, fd, sock, routing, data):
        """Process a single command.  This will split out into one of the
Jeremy C. Reed's avatar
Jeremy C. Reed committed
300
           other functions."""
301
302
303
        # TODO: A print statement got removed here (one that prints the
        # routing envelope). When we have logging with multiple levels,
        # we might want to re-add that on a high debug verbosity.
304
        cmd = routing["type"]
305
        if cmd == 'send':
306
            self.process_command_send(sock, routing, data)
307
308
309
310
311
312
        elif cmd == 'subscribe':
            self.process_command_subscribe(sock, routing, data)
        elif cmd == 'unsubscribe':
            self.process_command_unsubscribe(sock, routing, data)
        elif cmd == 'getlname':
            self.process_command_getlname(sock, routing, data)
313
314
315
        elif cmd == 'ping':
            # Command for testing purposes
            self.process_command_ping(sock, routing, data)
316
        else:
317
            sys.stderr.write("[b10-msgq] Invalid command: %s\n" % cmd)
318

319
    def preparemsg(self, env, msg = None):
320
        if type(env) == dict:
321
            env = isc.cc.message.to_wire(env)
322
        if type(msg) == dict:
323
            msg = isc.cc.message.to_wire(msg)
324
325
326
        length = 2 + len(env);
        if msg:
            length += len(msg)
327
328
        ret = struct.pack("!IH", length, len(env))
        ret += env
329
        if msg:
330
331
332
333
            ret += msg
        return ret

    def sendmsg(self, sock, env, msg = None):
334
        self.send_prepared_msg(sock, self.preparemsg(env, msg))
335

336
337
    def __send_data(self, sock, data):
        try:
338
339
340
341
            # We set the socket nonblocking, MSG_DONTWAIT doesn't exist
            # on some OSes
            sock.setblocking(0)
            return sock.send(data)
342
343
344
345
346
        except socket.error as e:
            if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK:
                return 0
            else:
                raise e
347
348
349
        finally:
            # And set it back again
            sock.setblocking(1)
350

351
    def send_prepared_msg(self, sock, msg):
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
352
353
354
355
356
        # Try to send the data, but only if there's nothing waiting
        fileno = sock.fileno()
        if fileno in self.sendbuffs:
            amount_sent = 0
        else:
Jelte Jansen's avatar
Jelte Jansen committed
357
358
359
360
361
362
            try:
                amount_sent = self.__send_data(sock, msg)
            except socket.error as sockerr:
                # in the case the other side seems gone, kill the socket
                # and drop the send action
                if sockerr.errno == errno.EPIPE:
Jelte Jansen's avatar
Jelte Jansen committed
363
364
                    print("[b10-msgq] SIGPIPE on send, dropping message " +
                          "and closing connection")
Jelte Jansen's avatar
Jelte Jansen committed
365
366
367
368
                    self.kill_socket(fileno, sock)
                    return
                else:
                    raise
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
369
370
371
372
373
374
375
376
377
378

        # Still something to send
        if amount_sent < len(msg):
            now = time.clock()
            # Append it to buffer (but check the data go away)
            if fileno in self.sendbuffs:
                (last_sent, buff) = self.sendbuffs[fileno]
                if now - last_sent > 0.1:
                    self.kill_socket(fileno, sock)
                    return
379
                buff += msg
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
380
            else:
381
                buff = msg[amount_sent:]
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
382
                last_sent = now
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
383
                if self.poller:
384
385
                    self.poller.register(fileno, select.POLLIN |
                        select.POLLOUT)
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
386
                else:
387
                    self.add_kqueue_socket(sock, True)
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
388
389
            self.sendbuffs[fileno] = (last_sent, buff)

390
    def __process_write(self, fileno):
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
391
392
393
        # Try to send some data from the buffer
        (_, msg) = self.sendbuffs[fileno]
        sock = self.sockets[fileno]
394
        amount_sent = self.__send_data(sock, msg)
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
395
396
397
398
399
400
401
        # Keep the rest
        msg = msg[amount_sent:]
        if len(msg) == 0:
            # If there's no more, stop requesting for write availability
            if self.poller:
                self.poller.register(fileno, select.POLLIN)
            else:
402
                self.delete_kqueue_socket(sock, True)
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
403
404
405
            del self.sendbuffs[fileno]
        else:
            self.sendbuffs[fileno] = (time.clock(), msg)
406
407

    def newlname(self):
Jeremy C. Reed's avatar
Jeremy C. Reed committed
408
        """Generate a unique connection identifier for this socket.
409
410
411
412
        This is done by using an increasing counter and the current
        time."""
        self.connection_counter += 1
        return "%x_%x@%s" % (time.time(), self.connection_counter, self.hostname)
413

414
415
416
    def process_command_ping(self, sock, routing, data):
        self.sendmsg(sock, { "type" : "pong" }, data)

417
    def process_command_getlname(self, sock, routing, data):
418
419
        lname = [ k for k, v in self.lnames.items() if v == sock ][0]
        self.sendmsg(sock, { "type" : "getlname" }, { "lname" : lname })
420
421
422
423

    def process_command_send(self, sock, routing, data):
        group = routing["group"]
        instance = routing["instance"]
424
        to = routing["to"]
425
426
        if group == None or instance == None:
            return  # ignore invalid packets entirely
427
428
429
430

        if to == "*":
            sockets = self.subs.find(group, instance)
        else:
431
432
433
434
            if to in self.lnames:
                sockets = [ self.lnames[to] ]
            else:
                return # recipient doesn't exist
435
436
437
438
439
440
441
442
443
444
445

        msg = self.preparemsg(routing, data)

        if sock in sockets:
            sockets.remove(sock)
        for socket in sockets:
            self.send_prepared_msg(socket, msg)

    def process_command_subscribe(self, sock, routing, data):
        group = routing["group"]
        instance = routing["instance"]
446
        if group == None or instance == None:
447
448
449
450
451
452
453
454
455
            return  # ignore invalid packets entirely
        self.subs.subscribe(group, instance, sock)

    def process_command_unsubscribe(self, sock, routing, data):
        group = routing["group"]
        instance = routing["instance"]
        if group == None or instance == None:
            return  # ignore invalid packets entirely
        self.subs.unsubscribe(group, instance, sock)
456
457
458

    def run(self):
        """Process messages.  Forever.  Mostly."""
Jelte Jansen's avatar
Jelte Jansen committed
459

460
461
462
463
        if self.poller:
            self.run_poller()
        else:
            self.run_kqueue()
Jelte Jansen's avatar
Jelte Jansen committed
464

465
    def run_poller(self):
466
467
468
469
470
471
472
        while True:
            try:
                events = self.poller.poll()
            except select.error as err:
                if err.args[0] == errno.EINTR:
                    events = []
                else:
473
                    sys.stderr.write("[b10-msgq] Error with poll(): %s\n" % err)
474
475
476
477
478
                    break
            for (fd, event) in events:
                if fd == self.listen_socket.fileno():
                    self.process_accept()
                else:
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
479
                    if event & select.POLLOUT:
480
                        self.__process_write(fd)
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
481
482
                    if event & select.POLLIN:
                        self.process_socket(fd)
483

484
485
486
487
488
    def run_kqueue(self):
        while True:
            events = self.kqueue.control(None, 10)
            if not events:
                raise RuntimeError('serve: kqueue returned no events')
chenzhengzhang's avatar
chenzhengzhang committed
489

490
491
492
493
            for event in events:
                if event.ident == self.listen_socket.fileno():
                    self.process_accept()
                else:
494
495
496
497
                    if event.filter == select.KQ_FILTER_WRITE:
                        self.__process_write(event.ident)
                    if event.filter == select.KQ_FILTER_READ and \
                            event.data > 0:
498
499
                        self.process_socket(event.ident)
                    elif event.flags & select.KQ_EV_EOF:
500
501
                        self.kill_socket(event.ident,
                                         self.sockets[event.ident])
502

503
504
505
    def shutdown(self):
        """Stop the MsgQ master."""
        if self.verbose:
506
            sys.stdout.write("[b10-msgq] Stopping the server.\n")
507
        self.listen_socket.close()
508
509
        if os.path.exists(self.socket_file):
            os.remove(self.socket_file)
510

511
512
513
514
515
516
517
518
519
# can signal handling and calling a destructor be done without a
# global variable?
msgq = None

def signal_handler(signal, frame):
    if msgq:
        msgq.shutdown()
    sys.exit(0)

520
521
if __name__ == "__main__":
    def check_port(option, opt_str, value, parser):
Jelte Jansen's avatar
Jelte Jansen committed
522
        """Function to insure that the port we are passed is actually
523
524
525
526
527
528
529
        a valid port number. Used by OptionParser() on startup."""
        intval = int(value)
        if (intval < 0) or (intval > 65535):
            raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
        parser.values.msgq_port = intval

    # Parse any command-line options.
530
    parser = OptionParser(version=VERSION)
531
532
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                      help="display more about what is going on")
533
534
535
    parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
                      type="string", default=None,
                      help="UNIX domain socket file the msgq daemon will use")
536
537
    (options, args) = parser.parse_args()

538
539
    signal.signal(signal.SIGTERM, signal_handler)

540
541
    # Announce startup.
    if options.verbose:
542
        sys.stdout.write("[b10-msgq] %s\n" % VERSION)
543

544
    msgq = MsgQ(options.msgq_socket_file, options.verbose)
545
546
547

    setup_result = msgq.setup()
    if setup_result:
548
        sys.stderr.write("[b10-msgq] Error on startup: %s\n" % setup_result)
549
550
        sys.exit(1)

551
552
553
554
    try:
        msgq.run()
    except KeyboardInterrupt:
        pass
555
556

    msgq.shutdown()