msgq.py.in 34.9 KB
Newer Older
1
2
#!@PYTHON@

3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Copyright (C) 2010  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

18
19
import sys; sys.path.append ('@@PYTHONPATH@@')

Michael Graff's avatar
Michael Graff committed
20
"""This code implements the msgq daemon."""
21
22
23
24
25
26

import subprocess
import signal
import os
import socket
import sys
27
import struct
28
29
30
import errno
import time
import select
31
import random
32
import threading
33
import isc.config.ccsession
34
from optparse import OptionParser, OptionValueError
35
import isc.util.process
36
import isc.util.traceback_handler
37
from isc.cc.proto_defs import *
38
import isc.log
39
from isc.log_messages.msgq_messages import *
40

41
import isc.cc
42

43
# Rename the running process via isc.util.process.rename() before doing
# anything else.
isc.util.process.rename()

isc.log.init("b10-msgq", buffer=True)

# Two independent instances of the "msgq" logger:
#  * logger is used in the actual msgq handling - startup, shutdown and the
#    poller thread;
#  * config_logger is used by the master/config thread while the poller
#    thread runs.  A separate instance is used because the logger itself is
#    not required to be thread safe.
logger = isc.log.Logger("msgq")
config_logger = isc.log.Logger("msgq")

# Debug levels passed to logger.debug() throughout this module.
TRACE_START, TRACE_BASIC, TRACE_DETAIL = (logger.DBGLVL_START_SHUT,
                                          logger.DBGLVL_TRACE_BASIC,
                                          logger.DBGLVL_TRACE_DETAIL)
Michal Vaner's avatar
Michal Vaner committed
56

57
# This is the version that gets displayed to the user.
# The VERSION string consists of the module name, the module version
# number, and the overall BIND 10 version number (set in configure.ac).
VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"

# If B10_FROM_SOURCE is set in the environment, we use data files
# from a directory relative to that, otherwise we use the ones
# installed on the system.
if "B10_FROM_SOURCE" in os.environ:
    SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/msgq"
else:
    PREFIX = "@prefix@"
    DATAROOTDIR = "@datarootdir@"
    # The configure-substituted @datadir@ may still contain literal
    # ${datarootdir} / ${prefix} references; expand them by hand.
    # ${datarootdir} goes first because it can itself contain ${prefix}.
    SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}",
                                                  DATAROOTDIR). \
                                                  replace("${prefix}", PREFIX)

SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"

75
76
class MsgQReceiveError(Exception):
    """Raised when reading a packet from a client fails: either the socket
    reported an error or the packet's length fields are inconsistent."""

77
78
class MsgQRunningError(Exception):
    """Raised by setup_listener() when another msgq already answers on the
    socket file."""

79
class MsgQCloseOnReceive(Exception):
    """Raised when a read from a client socket returns no data (EOF).

    partial_read tells whether the EOF hit in the middle of a packet, i.e.
    after some of it had already been read.  It is True if and only if so,
    and lets the code catching the exception decide how severe the event
    is: a clean close between packets is routine, a truncated packet is
    not.
    """

    def __init__(self, reason, partial_read):
        super().__init__(reason, partial_read)
        self.partial_read = partial_read
        self.__reason = reason

    def __str__(self):
        # Only the reason, not the (reason, partial_read) args tuple.
        return self.__reason

96
class SubscriptionManager:
    """Track which sockets are subscribed to which (group, instance) pairs."""

    def __init__(self, cfgmgr_ready):
        """
        Initialize the subscription manager.
        parameters:
        * cfgmgr_ready: A callable object run once the config manager
            subscribes. This is a hackish solution, but we can't read
            the configuration sooner.
        """
        # Maps (group, instance) tuples to the list of subscribed sockets.
        # Targets with no subscribers left are removed from the dict.
        self.subscriptions = {}
        self.__cfgmgr_ready = cfgmgr_ready
        self.__cfgmgr_ready_called = False

    def subscribe(self, group, instance, socket):
        """Add a subscription.

        Subscribing a socket to a target it is already subscribed to is a
        no-op.  The first subscription to the "ConfigManager" group runs the
        cfgmgr_ready callback (only once for the lifetime of the manager).
        """
        target = (group, instance)
        if target in self.subscriptions:
            logger.debug(TRACE_BASIC, MSGQ_SUBS_APPEND_TARGET, group, instance)
            if socket not in self.subscriptions[target]:
                self.subscriptions[target].append(socket)
        else:
            logger.debug(TRACE_BASIC, MSGQ_SUBS_NEW_TARGET, group, instance)
            self.subscriptions[target] = [socket]

        if group == "ConfigManager" and not self.__cfgmgr_ready_called:
            logger.debug(TRACE_BASIC, MSGQ_CFGMGR_SUBSCRIBED)
            self.__cfgmgr_ready_called = True
            self.__cfgmgr_ready()

    def unsubscribe(self, group, instance, socket):
        """Remove the socket from the one specific subscription.

        Returns True if the socket was subscribed to (group, instance),
        False otherwise.
        """
        target = (group, instance)
        socklist = self.subscriptions.get(target)
        if socklist is None or socket not in socklist:
            return False
        socklist.remove(socket)
        if not socklist:
            # Drop the now-empty target so stale keys don't pile up for
            # every group/instance ever subscribed to.
            del self.subscriptions[target]
        return True

    def unsubscribe_all(self, socket):
        """Remove the socket from all subscriptions.

        Returns the list of (group, instance) targets the socket was removed
        from.
        """
        removed_from = []
        # Iterate over a snapshot: empty targets are deleted from the dict
        # inside the loop.
        for target, socklist in list(self.subscriptions.items()):
            if socket in socklist:
                socklist.remove(socket)
                removed_from.append(target)
                if not socklist:
                    del self.subscriptions[target]
        return removed_from

    def find_sub(self, group, instance):
        """Return an array of sockets which want this specific group,
        instance."""
        return self.subscriptions.get((group, instance), [])

    def find(self, group, instance):
        """Return an array of sockets who should get something sent to
        this group, instance pair.  This includes wildcard subscriptions."""
        partone = self.find_sub(group, instance)
        parttwo = self.find_sub(group, CC_INSTANCE_WILDCARD)
        # A socket subscribed both specifically and by wildcard gets the
        # message only once.
        return list(set(partone + parttwo))

159
160
class MsgQ:
    """Message Queue class."""
161
162
163
164
165
    # Default path of the listening socket, used by __init__ when neither the
    # socket_file argument nor BIND10_MSGQ_SOCKET_FILE is given.  The @...@
    # tokens are filled in by configure when this .in file is processed, and
    # the substituted @localstatedir@ may still contain a literal "${prefix}",
    # which is expanded here by hand.
    # NOTE(review): the hand expansion is a known hack (the original comment
    # asked "did we find a better way to do this?").
    SOCKET_FILE = os.path.join("@localstatedir@",
                               "@PACKAGE_NAME@",
                               "msgq_socket").replace("${prefix}",
                                                      "@prefix@")
Jelte Jansen's avatar
Jelte Jansen committed
166

167
    def __init__(self, socket_file=None, verbose=False):
        """Initialize the MsgQ master.

        The socket_file specifies the path to the UNIX domain socket
        that the msgq process listens on. If it is None, the
        environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
        is not set, it will default to
        @localstatedir@/@PACKAGE_NAME@/msgq_socket.
        If verbose is True, then the MsgQ reports
        what it is doing.
        """
        if socket_file is None:
            if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
                self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
            else:
                self.socket_file = self.SOCKET_FILE
        else:
            self.socket_file = socket_file

        self.verbose = verbose

        self.runnable = False
        # Replaced by the real listening socket in setup_listener().
        self.listen_socket = False
        # fd -> socket for every registered client.
        self.sockets = {}
        # Incremented for each new client and used to build its lname.
        self.connection_counter = random.random()
        self.hostname = socket.gethostname()
        self.subs = SubscriptionManager(self.cfgmgr_ready)
        # lname -> socket and fd -> lname; register_socket() and
        # kill_socket() keep both in step.
        self.lnames = {}
        self.fd_to_lname = {}
        # fd -> (timestamp, bytes still waiting to be written).
        self.sendbuffs = {}
        self.running = False

        # None until cfgmgr_ready() records True (config manager subscribed)
        # or False (gave up waiting); guarded by the condition below.
        self.__cfgmgr_ready = None
        self.__cfgmgr_ready_cond = threading.Condition()
        # A lock used when the message queue does anything more complicated.
        # It is mostly a safety measure, the threads doing so should be mostly
        # independent, and the one with config session should be read only,
        # but with threads, one never knows. We use threads for concurrency,
        # not for performance, so we use wide lock scopes to be on the safe
        # side.
        self.__lock = threading.Lock()
        self._session = None
        # Both ends of the termination socket pair are created by
        # setup_signalsock(); start them out as None so the attributes exist
        # even if setup() never ran.
        self.__poller_sock = None
        self.__control_sock = None
209

210
211
212
213
214
    def members_notify(self, event, params):
        """
        Thin wrapper around ccs's notify. Send a notification about change
        of some list that can be requested by the members command.

Paul Selkirk's avatar
Paul Selkirk committed
215
        The event is one of:
216
217
218
219
220
221
222
        - connected (client connected to MsgQ)
        - disconected (client disconnected from MsgQ)
        - subscribed (client subscribed to a group)
        - unsubscribed (client unsubscribed from a group)

        The params is dict containing:
        - client: The lname of the client in question.
Paul Selkirk's avatar
Paul Selkirk committed
223
224
        - group (for 'subscribed' and 'unsubscribed' events):
          The group the client subscribed or unsubscribed from.
225

Paul Selkirk's avatar
Paul Selkirk committed
226
227
        The notification occurs after the event, so client a subscribing for
        notifications will get a notification about its own subscription, but
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
228
        will not get a notification when it unsubscribes.
229
        """
230
231
232
233
234
        # Due to the interaction between threads (and fear it might influence
        # sending stuff), we test this method in msgq_run_test, instead of
        # mocking the ccs.
        if self._session: # Don't send before we have started up
            self._session.notify('cc_members', event, params)
235

236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
    def cfgmgr_ready(self, ready=True):
        """Notify that the config manager is either subscribed, or
           that the msgq is shutting down and it won't connect, but
           anybody waiting for it should stop anyway.

           The ready parameter signifies if the config manager is subscribed.

           This method can be called multiple times, but second and any
           following call is simply ignored. This means the "abort" version
           of the call can be used on any stop unconditionally, even when
           the config manager already connected.
        """
        with self.__cfgmgr_ready_cond:
            if self.__cfgmgr_ready is not None:
                # This is a second call to this method. In that case it does
                # nothing.
                return
            self.__cfgmgr_ready = ready
            self.__cfgmgr_ready_cond.notify_all()

    def wait_cfgmgr(self):
        """Wait for msgq to subscribe.

Jelte Jansen's avatar
Jelte Jansen committed
259
           When this returns, the config manager is either subscribed, or
260
261
262
263
264
265
266
267
           msgq gave up waiting for it. Success is signified by the return
           value.
        """
        with self.__cfgmgr_ready_cond:
            # Wait until it either aborts or subscribes
            while self.__cfgmgr_ready is None:
                self.__cfgmgr_ready_cond.wait()
            return self.__cfgmgr_ready
268

269
270
    def setup_listener(self):
        """Set up the listener socket.  Internal function.

        If the socket file already exists, first check whether another msgq
        answers on it and raise MsgQRunningError if so; otherwise the stale
        file is removed.  Then a UNIX stream socket is bound to the file and
        put into listening mode.  If that fails, the socket file is removed
        again and the original exception is re-raised.
        """
        logger.debug(TRACE_BASIC, MSGQ_LISTENER_SETUP, self.socket_file)

        if os.path.exists(self.socket_file):
            # Rather than just blindly removing the socket file, attempt to
            # connect to the existing socket to see if there is an existing
            # msgq running. Only if that fails do we remove the file and
            # attempt to create a new socket.
            existing_msgq = None
            try:
                existing_msgq = isc.cc.Session(self.socket_file)
            except isc.cc.session.SessionError:
                existing_msgq = None

            if existing_msgq:
                existing_msgq.close()
                logger.fatal(MSGQ_ALREADY_RUNNING)
                raise MsgQRunningError("b10-msgq already running")

            os.remove(self.socket_file)

        listen_socket = None
        try:
            listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.listen_socket = listen_socket
            listen_socket.bind(self.socket_file)
            listen_socket.listen(1024)
        except Exception as e:
            # remove the file again if something goes wrong
            # (note this is a catch-all, but we reraise it)
            if os.path.exists(self.socket_file):
                os.remove(self.socket_file)
            # Only close a socket we actually created.  If socket() itself
            # failed, self.listen_socket still holds its previous value
            # (False after __init__), and calling close() on it would raise
            # AttributeError and hide the real error.
            if listen_socket is not None:
                listen_socket.close()
            logger.fatal(MSGQ_LISTENER_FAILED, self.socket_file, e)
            # Bare raise keeps the original traceback.
            raise
303

304
305
306
307
308
309
310
311
312
    def setup_signalsock(self):
        """Create a socket pair used to signal when we want to finish.
           Using a socket is easy and thread/signal safe way to signal
           the termination.
        """
        # The __poller_sock will be the end in the poller. When it is
        # closed, we should shut down.
        (self.__poller_sock, self.__control_sock) = socket.socketpair()

313
    def setup(self):
        """Prepare msgq for running.

        Creates the termination socket pair and the listening socket, then
        marks the queue as runnable.  Raises a socket.error if the
        socket_file cannot be created.  MsgQRunningError from
        setup_listener() is propagated as well.
        """
        self.setup_signalsock()
        self.setup_listener()
        logger.debug(TRACE_START, MSGQ_LISTENER_STARTED)
        self.runnable = True

    def process_accept(self):
        """Accept a pending connection on the listening socket and register
        it as a new client."""
        # The peer address is not used.  register_socket() logs the new
        # connection (MSGQ_SOCKET_REGISTERED).
        connection, _ = self.listen_socket.accept()
        self.register_socket(connection)
333
334
335

    def register_socket(self, newsocket):
        """
        Internal function to insert a socket. Used by process_accept and some
        tests.

        The socket gets a fresh lname.  It is recorded in the fd -> socket,
        lname -> socket and fd -> lname tables, and a 'connected' members
        notification is sent for it.
        """
        fd = newsocket.fileno()
        self.sockets[fd] = newsocket

        lname = self.newlname()
        self.lnames[lname] = newsocket
        self.fd_to_lname[fd] = lname

        logger.debug(TRACE_BASIC, MSGQ_SOCKET_REGISTERED, fd, lname)
        self.members_notify('connected', {'client': lname})

349
350
    def kill_socket(self, fd, sock):
        """Fully close down the socket.

        The socket is removed from all subscriptions and lookup tables, its
        pending outgoing data is dropped, and it is closed.  Members are then
        notified about every group it left and about the disconnection.
        """
        unsubscribed_from = self.subs.unsubscribe_all(sock)

        lname = self.fd_to_lname.pop(fd)
        del self.lnames[lname]
        sock.close()
        del self.sockets[fd]
        self.sendbuffs.pop(fd, None)
        logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd)

        # The entries are (group, instance) pairs; notifications are per
        # group only, so collapse them.
        for group in {target[0] for target in unsubscribed_from}:
            self.members_notify('unsubscribed', {
                'client': lname,
                'group': group,
            })
        self.members_notify('disconnected', {'client': lname})
369

370
    def __getbytes(self, fd, sock, length, continued):
371
        """Get exactly the requested bytes, or raise an exception if
372
373
374
375
376
           EOF.

           continued is set to True if this method is called to complete
           already read data.
           """
377
378
        received = b''
        while len(received) < length:
Jelte Jansen's avatar
Jelte Jansen committed
379
380
            try:
                data = sock.recv(length - len(received))
381

382
            except socket.error as err:
383
384
385
386
387
                # This case includes ECONNRESET, which seems to happen when
                # the remote client has closed its socket at some subtle
                # timing (it should normally result in receiving empty data).
                # Since we didn't figure out how exactly that could happen,
                # we treat it just like other really-unexpected socket errors.
388
                raise MsgQReceiveError(str(err))
389
            if len(data) == 0:
390
                raise MsgQCloseOnReceive("EOF", continued)
391
            received += data
392
            continued = True
393
394
395
396
397
        return received

    def read_packet(self, fd, sock):
        """Read a correctly formatted packet.  Will raise exceptions if
           something fails."""
398
        lengths = self.__getbytes(fd, sock, 6, False)
399
400
401
402
403
404
405
406
407
408
        overall_length, routing_length = struct.unpack(">IH", lengths)
        if overall_length < 2:
            raise MsgQReceiveError("overall_length < 2")
        overall_length -= 2
        if routing_length > overall_length:
            raise MsgQReceiveError("routing_length > overall_length")
        if routing_length == 0:
            raise MsgQReceiveError("routing_length == 0")
        data_length = overall_length - routing_length
        # probably need to sanity check lengths here...
409
        routing = self.__getbytes(fd, sock, routing_length, True)
410
        if data_length > 0:
411
            data = self.__getbytes(fd, sock, data_length, True)
412
413
414
415
        else:
            data = None
        return (routing, data)

416
    def process_packet(self, fd, sock):
        """Process one packet.

        Reads a packet from sock, decodes its routing header and hands it to
        process_command().  A read failure or an undecodable header closes
        the client's socket instead.
        """
        try:
            routing, data = self.read_packet(fd, sock)
        except (MsgQReceiveError, MsgQCloseOnReceive) as err:
            # If it's MsgQCloseOnReceive and that happens without reading
            # any data, it basically means the remote client has closed the
            # socket, so we log it as debug information.  Otherwise, it's
            # a somewhat unexpected event, so we consider it an "error".
            if isinstance(err, MsgQCloseOnReceive) and not err.partial_read:
                logger.debug(TRACE_BASIC, MSGQ_CLOSE_ON_RECV, fd)
            else:
                logger.error(MSGQ_RECV_ERROR, fd, err)
            self.kill_socket(fd, sock)
            return

        try:
            routingmsg = isc.cc.message.from_wire(routing)
        except Exception as err:
            # NOTE(review): this used to catch a bare ``DecodeError``, a name
            # not defined or imported by name in this module (only a star
            # import could provide it).  When it is missing, a malformed
            # header raises NameError and takes down the main loop.  The
            # header comes from an untrusted client, so any decode failure
            # is handled by dropping that client.
            self.kill_socket(fd, sock)
            logger.error(MSGQ_HDR_DECODE_ERROR, fd, err)
            return

        if not isinstance(routingmsg, dict):
            # process_command() indexes the header by key; anything other
            # than a dict would raise out of the main loop.
            self.kill_socket(fd, sock)
            logger.error(MSGQ_HDR_DECODE_ERROR, fd,
                         "routing header is not a dictionary")
            return

        self.process_command(fd, sock, routingmsg, data)

    def process_command(self, fd, sock, routing, data):
        """Process a single command.  This will split out into one of the
           other functions.

           routing is the decoded routing header.  Its CC_HEADER_TYPE entry
           selects the handler; an unknown or missing type is logged as an
           invalid command and otherwise ignored.
        """
        logger.debug(TRACE_DETAIL, MSGQ_RECV_HDR, routing)
        # get() so that a header without a type is reported as an invalid
        # command rather than raising KeyError out of run_select().
        cmd = routing.get(CC_HEADER_TYPE)
        if cmd == CC_COMMAND_SEND:
            self.process_command_send(sock, routing, data)
        elif cmd == CC_COMMAND_SUBSCRIBE:
            self.process_command_subscribe(sock, routing, data)
        elif cmd == CC_COMMAND_UNSUBSCRIBE:
            self.process_command_unsubscribe(sock, routing, data)
        elif cmd == CC_COMMAND_GET_LNAME:
            self.process_command_getlname(sock, routing, data)
        elif cmd == CC_COMMAND_PING:
            # Command for testing purposes
            self.process_command_ping(sock, routing, data)
        elif cmd == CC_COMMAND_STOP:
            self.stop()
        else:
            logger.error(MSGQ_INVALID_CMD, cmd)
461

462
    def preparemsg(self, env, msg = None):
463
        if type(env) == dict:
464
            env = isc.cc.message.to_wire(env)
465
        if type(msg) == dict:
466
            msg = isc.cc.message.to_wire(msg)
467
468
469
        length = 2 + len(env);
        if msg:
            length += len(msg)
470
471
        ret = struct.pack("!IH", length, len(env))
        ret += env
472
        if msg:
473
474
475
476
            ret += msg
        return ret

    def sendmsg(self, sock, env, msg = None):
477
        self.send_prepared_msg(sock, self.preparemsg(env, msg))
478

479
    def _send_data(self, sock, data):
480
        """
481
482
483
484
        Send a piece of data to the given socket.  This method is
        essentially "private" to MsgQ, but defined as if it were "protected"
        for easier access from tests.

485
486
487
488
489
490
491
492
493
        Parameters:
        sock: The socket to send to
        data: The list of bytes to send
        Returns:
        An integer or None. If an integer (which can be 0), it signals
        the number of bytes sent. If None, the socket appears to have
        been closed on the other end, and it has been killed on this
        side too.
        """
494
        try:
495
496
497
498
            # We set the socket nonblocking, MSG_DONTWAIT doesn't exist
            # on some OSes
            sock.setblocking(0)
            return sock.send(data)
499
        except socket.error as e:
500
            if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR ]:
501
                return 0
502
503
504
505
506
507
508
            elif e.errno in [ errno.EPIPE, errno.ECONNRESET, errno.ENOBUFS ]:
                # EPIPE happens if the remote module has terminated by the time
                # of this send; its severity can vary, but in many cases it
                # shouldn't be critical, so we log it separately as a warning.
                if e.errno == errno.EPIPE:
                    logger.warn(MSGQ_CLOSE_ON_SEND, sock.fileno())
                else:
509
                    logger.error(MSGQ_SEND_ERROR, sock.fileno(),
510
                                 errno.errorcode[e.errno])
511
512
                self.kill_socket(sock.fileno(), sock)
                return None
513
514
            else:
                raise e
515
516
517
        finally:
            # And set it back again
            sock.setblocking(1)
518

519
    def send_prepared_msg(self, sock, msg):
        '''
        Add a message to the queue. If there's nothing waiting, try
        to send it right away.

        Return if the socket is still alive. It can return false if the
        socket dies (for example due to EPIPE in the attempt to send).
        Returning true does not guarantee the message will be delivered,
        but returning false means it won't.

        If data is already queued for the socket and none of it has gone out
        for more than 0.1 (as measured by _send_timestamp()) when another
        message arrives, the socket is considered stuck and is killed.
        '''
        # Try to send the data, but only if there's nothing waiting
        fileno = sock.fileno()
        if fileno in self.sendbuffs:
            amount_sent = 0
        else:
            amount_sent = self._send_data(sock, msg)
            if amount_sent is None:
                # Socket has been killed, drop the send
                return False

        # Still something to send, add it to outgoing queue
        if amount_sent < len(msg):
            now = self._send_timestamp()
            # Append it to buffer (but check the data go away)
            if fileno in self.sendbuffs:
                (last_sent, buff) = self.sendbuffs[fileno]
                if now - last_sent > 0.1:
                    self.kill_socket(fileno, sock)
                    return False
                buff += msg
            else:
                buff = msg[amount_sent:]
                last_sent = now
            self.sendbuffs[fileno] = (last_sent, buff)
        return True

    @staticmethod
    def _send_timestamp():
        """Return the current value of the clock used to detect stalled
        send buffers.

        time.clock() was deprecated in Python 3.3 and removed in 3.8, which
        made every partial send fail with AttributeError.  Use its documented
        replacement, time.process_time().  On Unix, clock() also measured
        processor time, so the 0.1 threshold keeps its meaning.  Fall back
        to clock() on interpreters that predate process_time().
        """
        timer = getattr(time, 'process_time', None)
        if timer is None:
            timer = time.clock
        return timer()

    def _process_write(self, fileno):
        """Try to flush the data queued for fileno by send_prepared_msg().

        Called when select() reported the socket writable.  The stall
        timestamp is refreshed only when some data actually went out.
        """
        if fileno not in self.sendbuffs or fileno not in self.sockets:
            # The socket was killed (which drops its buffer) after select()
            # returned, e.g. while another client's packet was processed in
            # the same loop iteration.
            return
        (_, msg) = self.sendbuffs[fileno]
        sock = self.sockets[fileno]
        amount_sent = self._send_data(sock, msg)
        if amount_sent is None:
            # The socket died; kill_socket() already discarded the buffer.
            return
        # Keep the rest
        msg = msg[amount_sent:]
        if len(msg) == 0:
            # If there's no more, stop requesting for write availability
            del self.sendbuffs[fileno]
        elif amount_sent > 0:
            self.sendbuffs[fileno] = (self._send_timestamp(), msg)
568
569

    def newlname(self):
Jeremy C. Reed's avatar
Jeremy C. Reed committed
570
        """Generate a unique connection identifier for this socket.
571
572
573
        This is done by using an increasing counter and the current
        time."""
        self.connection_counter += 1
574
575
        return "%x_%x@%s" % (time.time(), self.connection_counter,
                             self.hostname)
576

577
    def process_command_ping(self, sock, routing, data):
        """Answer a ping (a testing aid) with a pong that echoes the
        payload back to the sender."""
        pong_header = {CC_HEADER_TYPE: CC_COMMAND_PONG}
        self.sendmsg(sock, pong_header, data)
579

580
    def process_command_getlname(self, sock, routing, data):
        """Reply to sock with the lname msgq assigned to it."""
        # fd_to_lname is kept in step with lnames by register_socket() and
        # kill_socket().  Looking the name up by descriptor is O(1) and
        # matches the subscribe/unsubscribe handlers, unlike scanning every
        # lname for the matching socket.
        lname = self.fd_to_lname[sock.fileno()]
        self.sendmsg(sock, { CC_HEADER_TYPE : CC_COMMAND_GET_LNAME },
                     { CC_PAYLOAD_LNAME : lname })
584
585

    def process_command_send(self, sock, routing, data):
        """Deliver a message to its recipients.

        If the 'to' header is the wildcard, the message goes to every socket
        subscribed to the (group, instance) pair, including wildcard-instance
        subscribers.  Otherwise it goes to the client with that lname, if
        one exists.  It is never bounced back to the sender.

        An error answer is sent back to the sender on msgq's behalf when all
        of these hold:
        - nobody received the message;
        - the sender asked for an answer;
        - the message is not itself a reply.
        """
        # get() rather than [] so packets missing these headers are ignored
        # by the check below instead of raising KeyError out of the main
        # loop.
        group = routing.get(CC_HEADER_GROUP)
        instance = routing.get(CC_HEADER_INSTANCE)
        to = routing.get(CC_HEADER_TO)
        if group is None or instance is None:
            # FIXME: Should we log them instead?
            return  # ignore invalid packets entirely

        if to == CC_TO_WILDCARD:
            recipients = self.subs.find(group, instance)
        elif to in self.lnames:
            recipients = [self.lnames[to]]
        else:
            recipients = []

        msg = self.preparemsg(routing, data)

        if sock in recipients:
            # Don't bounce to self
            recipients.remove(sock)

        has_recipient = False
        # (Named "recipient", not "socket", so the socket module isn't
        # shadowed.)
        for recipient in recipients:
            if self.send_prepared_msg(recipient, msg):
                has_recipient = True
        if not has_recipient and routing.get(CC_HEADER_WANT_ANSWER) and \
            CC_HEADER_REPLY not in routing:
            # We have no recipients. But the sender insists on a reply
            # (and the message isn't a reply itself). We need to send
            # an error to satisfy the request, since there's nobody
            # else who can.
            #
            # We omit the replies on purpose. The recipient might generate
            # the response by copying and mangling the header of incoming
            # message (just like we do below) and would include the want_answer
            # by accident. And we want to avoid loops of errors. Also, it
            # is unclear if the knowledge of undeliverable reply would be
            # of any use to the sender, and it should be much rarer situation.

            # The real errors would be positive, 1 most probably. We use
            # negative errors for delivery errors to distinguish them a
            # little. We probably should have a way to provide more data
            # in the error message.
            payload = isc.config.ccsession.create_answer(CC_REPLY_NO_RECPT,
                                                         "No such recipient")
            # We create the header based on the current one. But we don't
            # want to mangle it for the caller, so we get a copy. A shallow
            # one should be enough, we modify the dict only.
            header = routing.copy()
            # get() so a malformed request lacking seq/from still gets an
            # answer instead of crashing the daemon.
            header[CC_HEADER_REPLY] = routing.get(CC_HEADER_SEQ)
            # Dummy lname not assigned to clients
            header[CC_HEADER_FROM] = "msgq"
            header[CC_HEADER_TO] = routing.get(CC_HEADER_FROM)
            # We keep the seq as it is. We don't need to track the message
            # and we will not confuse the sender. The sender would use an
            # unique id for each message, so we won't return one twice to it.
            errmsg = self.preparemsg(header, payload)
            # Send it back.
            self.send_prepared_msg(sock, errmsg)
645
646

    def process_command_subscribe(self, sock, routing, data):
        """Subscribe sock to the (group, instance) named in the routing
        header and send a 'subscribed' members notification."""
        # get() so a header lacking these fields is ignored as intended
        # instead of raising KeyError out of the main loop.
        group = routing.get(CC_HEADER_GROUP)
        instance = routing.get(CC_HEADER_INSTANCE)
        if group is None or instance is None:
            return  # ignore invalid packets entirely
        self.subs.subscribe(group, instance, sock)
        lname = self.fd_to_lname[sock.fileno()]
        self.members_notify('subscribed',
                            {
                                'client': lname,
                                'group': group
                            })
658
659

    def process_command_unsubscribe(self, sock, routing, data):
        """Unsubscribe sock from the (group, instance) named in the routing
        header.  An 'unsubscribed' members notification is sent only if the
        socket was actually subscribed."""
        # get() so a header lacking these fields is ignored as intended
        # instead of raising KeyError out of the main loop.
        group = routing.get(CC_HEADER_GROUP)
        instance = routing.get(CC_HEADER_INSTANCE)
        if group is None or instance is None:
            return  # ignore invalid packets entirely
        if self.subs.unsubscribe(group, instance, sock):
            lname = self.fd_to_lname[sock.fileno()]
            self.members_notify('unsubscribed',
                                {
                                    'client': lname,
                                    'group': group
                                })
671
672
673

    def run(self):
        """Process messages.  Forever.  Mostly.

        Marks msgq as running and enters the select loop.  The loop returns
        once termination is signalled through the signal socket, or after a
        fatal select() error.
        """
        self.running = True
        self.run_select()

    def run_select(self):
        """Run the select()-based poller loop while self.running is true.

        Each iteration does the following:
        - waits for readability on every known client fd, the listening
          socket and the internal poller (signal) socket;
        - waits for writability on every fd that has buffered outgoing data;
        - dispatches accepts, incoming packets and pending writes.

        All dispatching happens under self.__lock, because the config and
        command handlers run in another thread and use the same state.  The
        loop ends when the poller socket becomes readable (the stop signal)
        or when select() fails with anything other than EINTR.
        """
        while self.running:
            reads = list(self.fd_to_lname.keys())
            # The listening fd does not change while accepting, so look it
            # up once per iteration instead of once per ready fd.
            listen_fd = self.listen_socket.fileno()
            if listen_fd != -1: # Skip in tests
                reads.append(listen_fd)
            if self.__poller_sock and self.__poller_sock.fileno() != -1:
                reads.append(self.__poller_sock.fileno())
            writes = list(self.sendbuffs.keys())
            try:
                (read_ready, write_ready, _) = select.select(reads, writes, [])
            except select.error as err:
                if err.args[0] == errno.EINTR:
                    continue # Just try it again if interrupted.
                logger.fatal(MSGQ_SELECT_ERROR, err)
                break
            with self.__lock:
                write_ready = set(write_ready)
                for fd in read_ready:
                    # Do only one operation per loop iteration on the given fd.
                    # It could be possible to perform both, but it may have
                    # undesired side effects in special situations (like, if the
                    # read closes the socket).
                    write_ready.discard(fd)
                    if fd == listen_fd:
                        self.process_accept()
                    elif self.__poller_sock and \
                        fd == self.__poller_sock.fileno():
                        # The signal socket. We should terminate now.
                        self.running = False
                        break
                    else:
                        # Handling an earlier fd in this same batch may have
                        # dropped this socket already; skip it rather than
                        # let a KeyError kill the poller thread.
                        sock = self.sockets.get(fd)
                        if sock is None:
                            continue
                        self.process_packet(fd, sock)
                for fd in write_ready:
                    self._process_write(fd)
716

717
    def stop(self):
        """Ask the poller loop to terminate.

        Closes the control socket and drops the reference to it.  The control
        socket is presumably the peer of the poller socket that run_select()
        treats as its termination signal -- the pairing is set up outside
        this view.  It also calls cfgmgr_ready(False), so that nothing
        waiting on the config-manager condition stays blocked forever.

        Safe to call more than once, e.g. when SIGTERM is delivered twice or
        cleanup_signalsock() has already run.
        """
        # Signal it should terminate.  The socket is None if we already did.
        if self.__control_sock is not None:
            self.__control_sock.close()
            self.__control_sock = None

        # Abort anything waiting on the condition, just to make sure it's not
        # blocked forever
        self.cfgmgr_ready(False)
724
725
726
727
728
729
730
731
732
733
734

    def cleanup_signalsock(self):
        """Close both internal signal sockets (poller and control side).

        Each socket that is still set is closed and its attribute reset to
        None.  shutdown() calls this, and tests reuse it on its own.
        """
        poller, control = self.__poller_sock, self.__control_sock
        if poller:
            poller.close()
            self.__poller_sock = None
        if control:
            control.close()
            self.__control_sock = None
735

736
737
    def shutdown(self):
        """Stop the MsgQ master and release everything it holds.

        Closes the listening socket, the internal signal sockets and every
        client socket, then removes the UNIX socket file if it still exists.
        """
        logger.debug(TRACE_START, MSGQ_SHUTDOWN)

        self.listen_socket.close()
        self.cleanup_signalsock()

        # In real operation no clients should be left at this point (msgq is
        # meant to be the last one running), but some tests create a fresh
        # MsgQ per test, and without this they leak a lot of sockets.  Some
        # tests also store non-socket objects here, so any failure to close
        # is deliberately ignored: the process is about to exit anyway.
        for client in self.sockets.values():
            try:
                client.close()
            except Exception:
                pass

        sock_path = self.socket_file
        if os.path.exists(sock_path):
            os.remove(sock_path)
755

756
757
758
759
    def config_handler(self, new_config):
        """Handle new configuration for msgq (called from a separate thread).

        No configuration is actually processed yet; the data is only logged.
        Returns isc.config.create_answer(0) while the daemon is running, and
        None once it has stopped.  Not tested.
        """
        config_logger.debug(TRACE_DETAIL, MSGQ_CONFIG_DATA, new_config)

        # Shared state is guarded by the same lock the poller thread holds.
        with self.__lock:
            if self.running:
                # TODO: Any config handling goes here.
                return isc.config.create_answer(0)
769
770

    def command_handler(self, command, args):
        """Handle a command sent to msgq (called from a separate thread).

        Only 'members' is supported.  Without a 'group' argument it answers
        with the lnames of all clients.  With one, it answers with the lnames
        of the sockets subscribed to that group (instance '').  Any other
        command is logged as an error and answered with code 1.  Returns
        None once the daemon has stopped running.
        """
        config_logger.debug(TRACE_DETAIL, MSGQ_COMMAND, command, args)

        with self.__lock:
            if not self.running:
                return

            # TODO: Who does validation? The ModuleCCSession or must we?

            if command != 'members':
                config_logger.error(MSGQ_COMMAND_UNKNOWN, command)
                return isc.config.create_answer(1,
                                                'unknown command: ' + command)

            # List all members of MsgQ or of a group.
            if args is None:
                args = {}
            group = args.get('group')
            if not group:
                return isc.config.create_answer(0, list(self.lnames.keys()))

            subscribers = self.subs.find(group, '')
            member_names = [self.fd_to_lname[sub.fileno()]
                            for sub in subscribers]
            return isc.config.create_answer(0, member_names)
795

796
def signal_handler(msgq, signal, frame):
    """Signal callback: ask msgq (if any) to stop.

    The signal number and frame are accepted to match the handler signature
    but are not used.
    """
    if not msgq:
        return
    msgq.stop()
799

800
def main():
    """Entry point of the msgq daemon.

    Parses the command line and sets up MsgQ (exiting with status 1 if that
    fails).  The message loop then runs in a separate poller thread.  Once
    the config manager is available, main attaches a ModuleCCSession for
    msgq's own configuration and commands, served from a daemon thread.
    Returns after the poller thread ends (e.g. after SIGTERM) or on
    KeyboardInterrupt, having shut MsgQ down.
    """
    # Parse any command-line options.
    parser = OptionParser(version=VERSION)
    # TODO: Should we remove the option?
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                      help="display more about what is going on")
    parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
                      type="string", default=None,
                      help="UNIX domain socket file the msgq daemon will use")
    (options, _) = parser.parse_args()

    # Announce startup.
    logger.debug(TRACE_START, MSGQ_START, VERSION)

    msgq = MsgQ(options.msgq_socket_file, options.verbose)

    # On SIGTERM ask MsgQ to stop; the poller thread then ends and the
    # join() below returns.  (Parameter named signum so it does not shadow
    # the signal module.)
    signal.signal(signal.SIGTERM,
                  lambda signum, frame: signal_handler(msgq, signum, frame))

    try:
        msgq.setup()
    except Exception as e:
        logger.fatal(MSGQ_START_FAIL, e)
        sys.exit(1)

    # We run the processing in a separate thread. This is because we want to
    # connect to the msgq ourself. But the cc library is unfortunately blocking
    # in many places and waiting for the processing part to answer, it would
    # deadlock.
    poller_thread = threading.Thread(target=msgq.run)
    poller_thread.daemon = True
    try:
        poller_thread.start()
        if msgq.wait_cfgmgr():
            # Once we get the config manager, we can read our own config.
            session = isc.config.ModuleCCSession(SPECFILE_LOCATION,
                                                 msgq.config_handler,
                                                 msgq.command_handler,
                                                 None, True,
                                                 msgq.socket_file)
            msgq._session = session
            session.start()
            # And we create a thread that'll just wait for commands and
            # handle them. We don't terminate the thread, we set it to
            # daemon. Once the main thread terminates, it'll just die.
            def run_session():
                while True:
                    # As the check_command has internal mutex that is shared
                    # with sending part (which includes notify). So we don't
                    # want to hold it long-term and block using select.
                    fileno = session.get_socket().fileno()
                    try:
                        # Only wait for readability; the result is not needed.
                        select.select([fileno], [], [])
                    except select.error as se:
                        if se.args[0] != errno.EINTR:
                            raise
                    session.check_command(True)

            background_thread = threading.Thread(target=run_session)
            background_thread.daemon = True
            background_thread.start()
        poller_thread.join()
    except KeyboardInterrupt:
        pass

    msgq.shutdown()

    logger.info(MSGQ_EXITING)
876
877
878

if __name__ == '__main__':
    # Run main() wrapped by the ISC traceback handler.
    isc.util.traceback_handler.traceback_handler(main)