msgq_test.py 44.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright (C) 2010-2013  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

16
import msgq
17
18
19
from msgq import SubscriptionManager, MsgQ

import unittest
Jelte Jansen's avatar
Jelte Jansen committed
20
import os
21
import socket
22
23
import signal
import sys
24
import time
25
26
import errno
import threading
27
import isc.cc
28
import collections
29
import isc.log
30
31
import struct
import json
32
33

#
34
35
# Currently only the subscription part and some of the sending are tested.
# Testing more would require mocking out a socket, which, while not
# impossible, is not trivial.
36
37
38
39
#

class TestSubscriptionManager(unittest.TestCase):
    """
    Tests for the SubscriptionManager bookkeeping and for the way MsgQ
    creates and removes its listening socket file.
    """

    def setUp(self):
        # Number of times the manager invoked the "cfgmgr ready" callback.
        self.__cfgmgr_ready_called = 0
        self.sm = SubscriptionManager(self.cfgmgr_ready)

    def cfgmgr_ready(self):
        """Callback handed to the SubscriptionManager; counts its calls."""
        self.__cfgmgr_ready_called += 1

    def test_subscription_add_delete_manager(self):
        self.sm.subscribe("a", "*", 'sock1')
        self.assertEqual(self.sm.find_sub("a", "*"), ['sock1'])

    def test_subscription_add_delete_other(self):
        # Unsubscribing a socket that never subscribed changes nothing.
        self.sm.subscribe("a", "*", 'sock1')
        self.sm.unsubscribe("a", "*", 'sock2')
        self.assertEqual(self.sm.find_sub("a", "*"), ['sock1'])

    def test_subscription_add_several_sockets(self):
        socks = ['s1', 's2', 's3', 's4', 's5']
        for s in socks:
            self.sm.subscribe("a", "*", s)
        self.assertEqual(self.sm.find_sub("a", "*"), socks)

    def test_unsubscribe(self):
        socks = ['s1', 's2', 's3', 's4', 's5']
        for s in socks:
            self.sm.subscribe("a", "*", s)
        self.sm.unsubscribe("a", "*", 's3')
        self.assertEqual(self.sm.find_sub("a", "*"),
                         ['s1', 's2', 's4', 's5'])

    def test_unsubscribe_all(self):
        # Subscribe both sockets to every group/instance combination.
        for group in ('g1', 'g2'):
            for instance in ('i1', 'i2'):
                for sock in ('s1', 's2'):
                    self.sm.subscribe(group, instance, sock)
        # unsubscribe_all reports the (group, instance) pairs it removed.
        self.assertEqual({('g1', 'i1'), ('g1', 'i2'), ('g2', 'i1'),
                          ('g2', 'i2')},
                         set(self.sm.unsubscribe_all('s1')))
        self.assertEqual(self.sm.find_sub("g1", "i1"), ['s2'])
        self.assertEqual(self.sm.find_sub("g1", "i2"), ['s2'])
        self.assertEqual(self.sm.find_sub("g2", "i1"), ['s2'])
        self.assertEqual(self.sm.find_sub("g2", "i2"), ['s2'])

    def test_find(self):
        # find() also returns the wildcard subscriber.
        self.sm.subscribe('g1', 'i1', 's1')
        self.sm.subscribe('g1', '*', 's2')
        self.assertEqual(set(self.sm.find("g1", "i1")), {'s1', 's2'})

    def test_find_sub(self):
        # find_sub() matches the exact instance only.
        self.sm.subscribe('g1', 'i1', 's1')
        self.sm.subscribe('g1', '*', 's2')
        self.assertEqual(self.sm.find_sub("g1", "i1"), ['s1'])

    def test_open_socket_parameter(self):
        """The socket file given to the constructor is created and removed."""
        socket_file = "./my_socket_file"
        self.assertFalse(os.path.exists(socket_file))
        queue = MsgQ(socket_file)
        try:
            queue.setup()
            self.assertTrue(os.path.exists(socket_file))
        finally:
            # Always clean up, even when an assertion above fails.
            queue.shutdown()
        self.assertFalse(os.path.exists(socket_file))

    def test_open_socket_environment_variable(self):
        """The socket file can be chosen with BIND10_MSGQ_SOCKET_FILE."""
        env_name = "BIND10_MSGQ_SOCKET_FILE"
        socket_file = "./my_socket_file"
        self.assertFalse(os.path.exists(socket_file))
        saved_env = os.environ.get(env_name)
        os.environ[env_name] = socket_file
        try:
            queue = MsgQ()
            try:
                queue.setup()
                self.assertTrue(os.path.exists(socket_file))
            finally:
                queue.shutdown()
            self.assertFalse(os.path.exists(socket_file))
        finally:
            # Do not leak the variable into the tests that follow.
            if saved_env is None:
                del os.environ[env_name]
            else:
                os.environ[env_name] = saved_env

    def test_open_socket_default(self):
        """Without parameter or environment, MsgQ.SOCKET_FILE is used."""
        env_name = "BIND10_MSGQ_SOCKET_FILE"
        # Hide the environment override for the duration of the test.
        saved_env = os.environ.pop(env_name, None)
        # Remember the class default so it can be put back afterwards.
        orig_socket_file = MsgQ.SOCKET_FILE
        try:
            # temporarily replace the class "default" not to be disrupted by
            # any running BIND 10 instance.
            if "BIND10_TEST_SOCKET_FILE" in os.environ:
                MsgQ.SOCKET_FILE = os.environ["BIND10_TEST_SOCKET_FILE"]
            socket_file = MsgQ.SOCKET_FILE
            self.assertFalse(os.path.exists(socket_file))
            queue = MsgQ()
            try:
                queue.setup()
                self.assertTrue(os.path.exists(socket_file))
                queue.shutdown()
                self.assertFalse(os.path.exists(socket_file))
            except socket.error:
                # ok, the install path doesn't exist at all,
                # so we can't check any further
                pass
        finally:
            MsgQ.SOCKET_FILE = orig_socket_file
            if saved_env is not None:
                os.environ[env_name] = saved_env

    def test_open_socket_bad(self):
        queue = MsgQ("/does/not/exist")
        self.assertRaises(socket.error, queue.setup)
        # But we can clean up after that.
        queue.shutdown()

    def test_subscribe_cfgmgr(self):
        """Test special handling of the config manager. Once it subscribes,
           the message queue needs to connect and read the config. But not
           before and only once.
        """
        self.assertEqual(0, self.__cfgmgr_ready_called)
        # Not called when something else subscribes
        self.sm.subscribe('SomethingElse', '*', 's1')
        self.assertEqual(0, self.__cfgmgr_ready_called)
        # Called whenever the config manager subscribes
        self.sm.subscribe('ConfigManager', '*', 's2')
        self.assertEqual(1, self.__cfgmgr_ready_called)
        # But not called again when it subscribes again (should not
        # happen in practice, but we make sure anyway)
        self.sm.subscribe('ConfigManager', '*', 's3')
        self.assertEqual(1, self.__cfgmgr_ready_called)

164
class MsgQTest(unittest.TestCase):
165
166
167
168
    """
    Tests for the behaviour of MsgQ. This is for the core of MsgQ, other
    subsystems are in separate test fixtures.
    """
169
170
171
    def setUp(self):
        """Give every test a fresh MsgQ built with default arguments."""
        queue_under_test = MsgQ()
        self.__msgq = queue_under_test

172
173
174
175
176
177
178
179
180
181
182
183
    def parse_msg(self, msg):
        """
        Split a wire-format message into (routing header, data payload).

        The message starts with a big-endian 4-byte total length and a
        2-byte header length, followed by the JSON header and the JSON
        payload. The message is assumed to be well formed with the payload
        present; anything else will most likely raise, which the tests here
        never rely on.
        """
        prefix_format = '>IH'
        prefix_size = struct.calcsize(prefix_format)
        # The total length is not needed; only the header length matters.
        _, header_len = struct.unpack(prefix_format, msg[:prefix_size])
        header_end = prefix_size + header_len
        raw_header = msg[prefix_size:header_end]
        raw_data = msg[header_end:]
        return (json.loads(raw_header.decode('utf-8')),
                json.loads(raw_data.decode('utf-8')))

184
185
186
187
188
189
190
191
192
    def test_unknown_command(self):
        """
        An unrecognised command makes the command handler answer with an
        error result.
        """
        # Fake we are running, to disable test workarounds
        self.__msgq.running = True
        answer = self.__msgq.command_handler('unknown', {})
        expected = {'result': [1, "unknown command: unknown"]}
        self.assertEqual(expected, answer)

193
194
195
196
197
198
199
200
201
    def test_get_members(self):
        """
        Query the members of a group, and of all connected clients, through
        the 'members' command.
        """
        # Push two dummy "clients" into msgq (the ugly way, by directly
        # tweaking relevant data structures).
        class FakeClient:
            """Stand-in for a socket; only fileno() is provided."""
            def __init__(self, fd):
                self.__fd = fd

            def fileno(self):
                return self.__fd

        for fd, lname in ((1, 'first'), (2, 'second')):
            self.__msgq.lnames[lname] = FakeClient(fd)
        for fd, lname in ((1, 'first'), (2, 'second')):
            self.__msgq.fd_to_lname[fd] = lname

        # Subscribe them to some groups
        for lname, group in (('first', 'G1'), ('second', 'G1'),
                             ('second', 'G2')):
            self.__msgq.process_command_subscribe(
                self.__msgq.lnames[lname],
                {'group': group, 'instance': '*'},
                None)

        # Now query content of some groups through the command handler.
        self.__msgq.running = True # Enable the command handler

        def check_both(result):
            """
            Check the result is a successful one and that it lists both
            lnames (in any order).
            """
            array = result['result'][1]
            self.assertEqual({'first', 'second'}, set(array))
            self.assertEqual({'result': [0, array]}, result)
            # Make sure the result can be encoded as JSON
            # (there seem to be types that look like a list but JSON chokes
            # on them)
            json.dumps(result)

        # Members of the G1 and G2
        g2_members = self.__msgq.command_handler('members', {'group': 'G2'})
        self.assertEqual({'result': [0, ['second']]}, g2_members)
        check_both(self.__msgq.command_handler('members', {'group': 'G1'}))
        # We pretend that all the possible groups exist, just that most
        # of them are empty. So requesting for Empty is request for an empty
        # group and should not fail.
        empty_members = self.__msgq.command_handler('members',
                                                    {'group': 'Empty'})
        self.assertEqual({'result': [0, []]}, empty_members)
        # Without the name of the group, we just get all the clients.
        check_both(self.__msgq.command_handler('members', {}))
        # Omitting the parameters completely in such case is OK
        check_both(self.__msgq.command_handler('members', None))

246
    def notifications_setup(self):
        """
        Shared preparation for the notification tests.

        Replaces the MsgQ's members_notify with a recorder and its poller
        with a do-nothing object, then builds a fake socket. Returns the
        tuple (notifications, sock), where notifications is the list the
        recorder appends (event, params) pairs to.
        """
        # We don't really want to send the notifications now, just see
        # that they would be sent, so record them instead.
        notifications = []

        def send_notification(event, params):
            notifications.append((event, params))

        # The poller is not needed at all (and there is no real socket to
        # give it now), so it accepts everything and does nothing.
        class FakePoller:
            def register(self, socket, mode):
                pass

            def unregister(self, sock):
                pass

        # A minimal socket: it has a file descriptor and can be closed.
        class FakeSocket:
            def __init__(self, fd):
                self.__fd = fd

            def fileno(self):
                return self.__fd

            def close(self):
                pass

        self.__msgq.members_notify = send_notification
        self.__msgq.poller = FakePoller()
        return notifications, FakeSocket(1)

    def test_notifies(self):
        """
        The message queue announces connections, disconnections and
        subscription changes through notifications.
        """
        notifications, sock = self.notifications_setup()
        subscription = {'group': 'G', 'instance': '*'}

        # We should notify about a new client when we register it
        self.__msgq.register_socket(sock)
        lname = list(self.__msgq.lnames)[0] # Steal the lname
        self.assertEqual([('connected', {'client': lname})], notifications)
        del notifications[:]

        # A notification should happen for a subscription to a group
        self.__msgq.process_command_subscribe(sock, subscription, None)
        self.assertEqual([('subscribed', {'client': lname, 'group': 'G'})],
                         notifications)
        del notifications[:]

        # As well for unsubscription
        self.__msgq.process_command_unsubscribe(sock, subscription, None)
        self.assertEqual([('unsubscribed', {'client': lname, 'group': 'G'})],
                         notifications)
        del notifications[:]

        # And, finally, for removal of client
        self.__msgq.kill_socket(sock.fileno(), sock)
        self.assertEqual([('disconnected', {'client': lname})], notifications)

307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
    def test_notifies_implicit_kill(self):
        """
        A client that is dropped without unsubscribing still gets its
        unsubscription notification sent, and it is sent before the
        disconnection one.
        """
        notifications, sock = self.notifications_setup()

        # Register and subscribe. The notifications for these are covered
        # by test_notifies, so they are simply thrown away here.
        self.__msgq.register_socket(sock)
        lname = list(self.__msgq.lnames)[0] # Steal the lname
        self.__msgq.process_command_subscribe(
            sock, {'group': 'G', 'instance': '*'}, None)
        del notifications[:]

        self.__msgq.kill_socket(sock.fileno(), sock)
        # Now, the notification for unsubscribe should be first, second for
        # the disconnection.
        expected = [
            ('unsubscribed', {'client': lname, 'group': 'G'}),
            ('disconnected', {'client': lname}),
        ]
        self.assertEqual(expected, notifications)

329
    def test_undeliverable_errors(self):
        """
        Push several packets through the MsgQ and check that it produces
        "undeliverable" errors exactly when it should.

        The test is deliberately not exhaustive: it does not try every
        combination of recipient existence, addressing scheme, want_answer
        header and reply header. These are mostly independent. For example,
        a reply with no recipient never generates an error, no matter
        whether it was addressed by lname or by group. Covering everything
        would add many scenarios with little benefit.
        """
        self.__sent_messages = []

        def fake_send_prepared_msg(socket, msg):
            # Record what would have been sent and report success.
            self.__sent_messages.append((socket, msg))
            return True
        self.__msgq.send_prepared_msg = fake_send_prepared_msg

        # These would be real sockets in the MsgQ, but we pass them as
        # parameters only, so we don't need them to be. We use simple
        # integers to tell one from another.
        sender = 1
        recipient = 2
        another_recipient = 3

        # The routing headers and data to test with.
        routing = {
            'to': '*',
            'from': 'sender',
            'group': 'group',
            'instance': '*',
            'seq': 42
        }
        data = {
            "data": "Just some data"
        }

        # Some common checking patterns
        def take_single_message():
            # Exactly one message must be waiting; hand it out and reset.
            self.assertEqual(1, len(self.__sent_messages))
            return self.__sent_messages[0]

        def check_error():
            target, raw = take_single_message()
            self.assertEqual(1, target)
            expected_header = {
                'group': 'group',
                'instance': '*',
                'reply': 42,
                'seq': 42,
                'from': 'msgq',
                'to': 'sender',
                'want_answer': True
            }
            expected_body = {'result': [-1, "No such recipient"]}
            self.assertEqual((expected_header, expected_body),
                             self.parse_msg(raw))
            self.__sent_messages = []

        def check_no_message():
            self.assertEqual([], self.__sent_messages)

        def check_delivered(rcpt_socket=recipient):
            target, raw = take_single_message()
            self.assertEqual(rcpt_socket, target)
            self.assertEqual((routing, data), self.parse_msg(raw))
            self.__sent_messages = []

        def send():
            # Always the same sender; routing is mutated between the calls.
            self.__msgq.process_command_send(sender, routing, data)

        # Send the message. No recipient, but errors are not requested,
        # so none is generated.
        send()
        check_no_message()

        # It should act the same if we explicitly say we do not want replies.
        routing["want_answer"] = False
        send()
        check_no_message()

        # Ask for errors if it can't be delivered.
        routing["want_answer"] = True
        send()
        check_error()

        # If the message is a reply itself, we never generate the errors
        routing["reply"] = 3
        send()
        check_no_message()

        # If there are recipients (but no "reply" header), the error should not
        # be sent and the message should get delivered.
        del routing["reply"]
        self.__msgq.subs.find = lambda group, instance: [recipient]
        send()
        check_delivered()

        # When we send a direct message and the recipient is not there, we get
        # the error too
        routing["to"] = "lname"
        send()
        check_error()

        # But when the recipient is there, it is delivered and no error is
        # generated.
        self.__msgq.lnames["lname"] = recipient
        send()
        check_delivered()

        # If an attempt to send fails, consider it no recipient.
        def fail_send_prepared_msg(socket, msg):
            '''
            Pretend sending a message failed. After one call, return to the
            usual mock, so the errors or other messages can be sent.
            '''
            self.__msgq.send_prepared_msg = fake_send_prepared_msg
            return False

        self.__msgq.send_prepared_msg = fail_send_prepared_msg
        send()
        check_error()

        # But if there are more recipients and only one fails, it should
        # be delivered to the other and not considered an error
        self.__msgq.send_prepared_msg = fail_send_prepared_msg
        routing["to"] = '*'
        self.__msgq.subs.find = lambda group, instance: [recipient,
                                                         another_recipient]
        send()
        check_delivered(rcpt_socket=another_recipient)
452

453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
class DummySocket:
    """
    Dummy socket class.
    This one does nothing at all, but some calls are used.
    It is mainly intended to override the listen socket for msgq, which
    we do not need in these tests.

    The methods are static so that they work both on the class itself
    (the tests assign the class, not an instance, as the listen socket)
    and on instances.
    """
    @staticmethod
    def fileno():
        """Return -1, the descriptor value of a closed socket."""
        return -1

    @staticmethod
    def close():
        """Do nothing; there is no resource to release."""
        pass

class BadSocket:
    """
    Special socket wrapper class. Once given a socket in its constructor,
    it completely behaves like that socket, except that its send() call
    will only actually send one byte per call, and optionally raise a given
    exception at a given time.
    """
    def __init__(self, real_socket, raise_on_send=0, send_exception=None):
        """
        Parameters:
        real_socket: The actual socket to wrap
        raise_on_send: integer. If higher than 0, and send_exception is
                       not None, send_exception will be raised on the
                       'raise_on_send'th call to send().
        send_exception: if not None, this exception will be raised
                        (if raise_on_send is not 0)
        """
        self.socket = real_socket
        self.send_count = 0
        self.raise_on_send = raise_on_send
        self.send_exception = send_exception

    # completely wrap all calls and member access
    # (except explicitly overridden ones)
    def __getattr__(self, name, *args):
        # __getattr__ is only consulted for attributes that are missing.
        # If 'socket' itself is missing (instance not initialised yet),
        # looking it up again below would recurse forever.
        if name == 'socket':
            raise AttributeError(name)
        attr = getattr(self.socket, name)
        # collections.Callable no longer exists in Python 3.10+; the
        # callable() builtin performs the same check on every version.
        if callable(attr):
            def callable_attr(*args, **kwargs):
                return attr(*args, **kwargs)
            return callable_attr
        return attr

    def send(self, data):
        """
        Send at most one byte of data through the wrapped socket.

        Raises send_exception instead when this is the raise_on_send'th
        call. Returns the wrapped socket's send() result, or 0 for empty
        data.
        """
        self.send_count += 1
        if (self.send_exception is not None and
                self.send_count == self.raise_on_send):
            raise self.send_exception

        if len(data) > 0:
            return self.socket.send(data[:1])
        return 0

class MsgQThread(threading.Thread):
    """
    Minimal thread that executes msgq.run() once started. Any exception
    raised by msgq.run() is kept in caught_exception rather than lost.
    """
    def __init__(self, msgq):
        super().__init__()
        self.caught_exception = None
        self.lock = threading.Lock()
        self.msgq_ = msgq

    def run(self):
        """Run the wrapped msgq, remembering the exception if it fails."""
        try:
            self.msgq_.run()
        except Exception as error:
            # Store the exception to make the test fail if necessary
            self.caught_exception = error

    def stop(self):
        """Ask the wrapped msgq to stop."""
        self.msgq_.stop()

531
532
533
534
535
class SendNonblock(unittest.TestCase):
    """
    Tests that the whole thing will not get blocked if someone does not read.
    """

536
    def terminate_check(self, task, timeout=30):
        """
        Runs task in separate process (task is a function) and checks
        it terminates sooner than timeout.

        Parameters:
        task: callable taking no arguments, executed in a forked child.
        timeout: seconds after which the child is killed by SIGALRM.

        The test fails unless the child exits with status 0, i.e. when the
        task raised, was killed by the alarm or otherwise ended abnormally.
        """
        task_pid = os.fork()
        if task_pid == 0:
            # Child process. It must never return into the test runner, so
            # every path out of this branch ends in os._exit().
            exit_code = 1
            try:
                # Kill the forked process after timeout by SIGALRM
                signal.alarm(timeout)
                task()
                # If we got here, then everything worked well and in time
                exit_code = 0
            except BaseException:
                # Report the failure and terminate with non-zero instead of
                # letting the exception propagate inside the child.
                import traceback
                traceback.print_exc()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)
        # Parent process: wait for the child and check how it ended.
        _, status = os.waitpid(task_pid, 0)
        self.assertEqual(0, status,
            "The task did not complete successfully in time")

557
558
559
560
561
562
    def get_msgq_with_sockets(self):
        '''
        Build a message queue wired to one end of a fresh socket pair.

        The write end is registered with the message queue so the tests can
        exercise it. Returns (msgq, read_end, write_end); the caller is
        expected to close the sockets afterwards.

        Registration is verified on the way: the queue must know exactly
        one lname, and it must map back to the write end.
        '''
        queue = MsgQ()
        # We do only partial setup, so we don't create the listening socket
        queue.setup_poller()
        pair = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
        read_end, write_end = pair
        queue.register_socket(write_end)
        self.assertEqual(1, len(queue.lnames))
        registered_lname = queue.fd_to_lname[write_end.fileno()]
        self.assertEqual(write_end, queue.lnames[registered_lname])
        return (queue, read_end, write_end)

576
577
578
579
580
581
    def infinite_sender(self, sender):
        """
        Keep calling sender(msgq, write_socket) until it raises.
        socket.error is swallowed, because it means the socket got closed.
        Afterwards the queue must hold no trace of the client.
        """
        (queue, reader, writer) = self.get_msgq_with_sockets()
        # Keep sending while it is not closed by the msgq
        try:
            while True:
                sender(queue, writer)
        except socket.error:
            pass

        # Explicitly close temporary socket pair as the Python
        # interpreter expects it.  It may not be 100% exception safe,
        # but since this is only for tests we prefer brevity.
        # Actually, the write end is often closed by the sender: some of
        # the senders passed here kill the socket internally, and a closed
        # socket reports -1 as its fileno(). Kill it only if still open.
        still_open = writer.fileno() != -1
        if still_open:
            queue.kill_socket(writer.fileno(), writer)
        self.assertFalse(queue.lnames)
        self.assertFalse(queue.fd_to_lname)
        reader.close()
602
603
604
605
606
607

    def test_infinite_sendmsg(self):
        """
        Keep sending messages that nobody reads. The queue must close the
        socket (correct) rather than block until the timeout (wrong).
        """
        # "data" doubled nine times over.
        data = "data" * (2 ** 9)

        def send_one(msgq, socket):
            return msgq.sendmsg(socket, {}, {"message" : data})

        def task():
            return self.infinite_sender(send_one)

        self.terminate_check(task)
613
614
615
616
617
618

    def test_infinite_sendprepared(self):
        """
        Keep sending raw data that nobody reads. The queue must close the
        socket (correct) rather than block until the timeout (wrong).
        """
        # b"data" doubled nine times over.
        data = b"data" * (2 ** 9)

        def send_one(msgq, socket):
            return msgq.send_prepared_msg(socket, data)

        def task():
            return self.infinite_sender(send_one)

        self.terminate_check(task)
624

625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
    def test_sendprepared_success(self):
        '''
        send_prepared_msg reports success while messages are being queued:
        on the first attempt (when it really writes to the socket) and on
        every later attempt, up to the point where data lands in the
        internal buffer.
        '''
        (queue, reader, writer) = self.get_msgq_with_sockets()
        # Now keep sending until we fill in something into the internal
        # buffer.
        while writer.fileno() not in queue.sendbuffs:
            sent_ok = queue.send_prepared_msg(writer, b'data')
            self.assertTrue(sent_ok)
        reader.close()
        writer.close()

    def test_sendprepared_epipe(self):
        '''
        send_prepared_msg returns false when a message is queued for a peer
        that is already gone. That should surface as EPIPE, which is not a
        fatal error.
        '''
        (queue, reader, writer) = self.get_msgq_with_sockets()
        # Close one end. It should make a EPIPE on the other.
        reader.close()
        # Now it should soft-fail
        sent_ok = queue.send_prepared_msg(writer, b'data')
        self.assertFalse(sent_ok)
        writer.close()

653
    def send_many(self, data):
        """
        Tries that sending a command many times and getting an answer works.

        A MsgQ is run in a forked process on one end of a socket pair. This
        side repeatedly sends a "ping" carrying data through the other end
        and checks that a "pong" with the same data comes back.
        """
        msgq = MsgQ()
        # msgq.run needs to compare with the listen_socket, so we provide
        # a replacement
        msgq.listen_socket = DummySocket
        (queue, out) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)

        def run():
            queue_pid = os.fork()
            if queue_pid == 0:
                # Child: serve the queue end until told to stop. The alarm
                # is a safety net in case nobody ever stops us.
                signal.alarm(120)
                msgq.setup_poller()
                msgq.setup_signalsock()
                msgq.register_socket(queue)
                msgq.run()
                msgq.cleanup_signalsock()
            else:
                try:
                    def killall(signum, frame):
                        # We ran out of time: take the queue process down
                        # with us and report failure.
                        os.kill(queue_pid, signal.SIGTERM)
                        os._exit(1)
                    signal.signal(signal.SIGALRM, killall)
                    msg = msgq.preparemsg({"type" : "ping"}, data)
                    # time.clock() was removed in Python 3.8. On Unix it
                    # measured processor time, so process_time() is the
                    # closest replacement.
                    start = time.process_time()
                    while time.process_time() - start < 0.2:
                        out.sendall(msg)
                        # Check the answer
                        (routing, received) = msgq.read_packet(out.fileno(),
                            out)
                        self.assertEqual({"type" : "pong"},
                            isc.cc.message.from_wire(routing))
                        self.assertEqual(data, received)
                finally:
                    os.kill(queue_pid, signal.SIGTERM)
        self.terminate_check(run)

        # Explicitly close temporary socket pair as the Python
        # interpreter expects it.  It may not be 100% exception safe,
        # but since this is only for tests we prefer brevity.
        queue.close()
        out.close()
697

698
699
700
701
    def test_small_sends(self):
        """
        Repeatedly send a tiny payload through the queue.
        """
        small_payload = b"data"
        self.send_many(small_payload)
703
704
705
706
707

    def test_large_sends(self):
        """
        Repeatedly send a large payload through the queue.
        """
        # b"data" doubled nineteen times over (2 MiB).
        large_payload = b"data" * (2 ** 19)
        self.send_many(large_payload)
712

713
714
    def do_send(self, write, read, control_write, control_read,
                expect_arrive=True, expect_send_exception=None):
        """
        Makes a msgq object that is talking to itself,
        run it in a separate thread so we can use and
        test run().

        It is given two sets of connected sockets; write/read, and
        control_write/control_read. The former may be throwing errors
        and mangle data to test msgq. The second is mainly used to
        send msgq the stop command.
        (Note that the terms 'read' and 'write' are from the msgq
        point of view, so the test itself writes to 'control_read')

        Parameters:
        write: a socket that is used to send the data to
        read: a socket that is used to read the data from
        control_write: a second socket for communication with msgq
        control_read: a second socket for communication with msgq
        expect_arrive: if True, the read socket is read from, and the data
                       that is read is expected to be the same as the data
                       that has been sent to the write socket.
        expect_send_exception: if not None, an exception is expected to
                               have been caught in the msgq thread; if
                               None, any exception caught there is
                               re-raised so that the test fails.
        """

        # Some message and envelope data to send and check
        env = b'{"env": "foo"}'
        msg = b'{"msg": "bar"}'

        # Named 'mq' so that the imported 'msgq' module is not shadowed.
        mq = MsgQ()
        # Don't need a listen_socket
        mq.listen_socket = DummySocket
        mq.setup_poller()
        mq.setup_signalsock()
        mq.register_socket(write)
        mq.register_socket(control_write)
        # Queue the message for sending
        mq.sendmsg(write, env, msg)

        # Run it in a thread
        msgq_thread = MsgQThread(mq)
        msgq_thread.start()

        if expect_arrive:
            (recv_env, recv_msg) = mq.read_packet(read.fileno(), read)
            self.assertEqual(env, recv_env)
            self.assertEqual(msg, recv_msg)

        # Tell msgq to stop
        stop_msg = mq.preparemsg({"type" : "stop"})
        control_read.sendall(stop_msg)

        # Wait for thread to stop if it hasn't already.
        # Put in a (long) timeout; the thread *should* stop, but if it
        # does not, we don't want the test to hang forever
        msgq_thread.join(60)
        # Fail the test if it didn't stop.  (is_alive() is used because
        # the old isAlive() alias no longer exists in Python 3.9+.)
        self.assertFalse(msgq_thread.is_alive(), "Thread did not stop")

        # Clean up some internals of msgq (usually called as part of
        # shutdown, but we skip that one here)
        mq.cleanup_signalsock()

        # Check the exception from the thread, if any
        # First, if we didn't expect it; reraise it (to make test fail and
        # show the stacktrace for debugging)
        if expect_send_exception is None:
            if msgq_thread.caught_exception is not None:
                raise msgq_thread.caught_exception
        else:
            # If we *did* expect it, fail if there was none
            self.assertIsNotNone(msgq_thread.caught_exception)
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802

    def do_send_with_send_error(self, raise_on_send, send_exception,
                                expect_answer=True,
                                expect_send_exception=None):
        """
        Sets up two connected sockets, wraps the sender socket into a BadSocket
        class, then performs a do_send() test.
        Parameters:
        raise_on_send: the byte at which send_exception should be raised
                       (see BadSocket)
        send_exception: the exception to raise (see BadSocket)
        expect_answer: whether the send is expected to complete (and hence
                       the read socket should get the message)
        expect_send_exception: the exception msgq is expected to raise when
                               send_exception is raised by BadSocket.
        """
        (write, read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
        (control_write, control_read) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
        try:
            badwrite = BadSocket(write, raise_on_send, send_exception)
            self.do_send(badwrite, read, control_write, control_read,
                         expect_answer, expect_send_exception)
        finally:
            # Close the temporary sockets even when do_send() raises (for
            # instance on a failed assertion), so a failing test does not
            # leave open descriptors behind.
            for sock in (write, read, control_write, control_read):
                sock.close()
812

813
    def test_send_raise_recoverable(self):
814
        """
815
816
        Test whether msgq survices a recoverable socket errors when sending.
        Two tests are done: one where the error is raised on the 3rd octet,
817
818
                            and one on the 23rd.
        """
819
        for err in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR ]:
820
            sockerr = socket.error(err, 'Socket error')
821
822
            self.do_send_with_send_error(3, sockerr)
            self.do_send_with_send_error(23, sockerr)
823

824
    def test_send_raise_nonrecoverable(self):
825
        """
826
827
828
        Test whether msgq survives socket errors that are nonrecoverable
        (for said socket that is, i.e. EPIPE etc).
        Two tests are done: one where the error is raised on the 3rd octet,
829
830
                            and one on the 23rd.
        """
831
        for err in [ errno.EPIPE, errno.ENOBUFS, errno.ECONNRESET ]:
832
            sockerr = socket.error(err, 'Socket error')
833
834
            self.do_send_with_send_error(3, sockerr, False)
            self.do_send_with_send_error(23, sockerr, False)
835

836
    def otest_send_raise_crash(self):
        """
        Test whether msgq does NOT survive on a general exception.
        Note, perhaps it should; but we'd have to first discuss and decide
        how it should recover (i.e. drop the socket and consider the client
        dead?).
        It may be a coding problem in msgq itself, and we certainly don't
        want to ignore those.
        """
        failure = Exception("just some general exception")
        # The exception raised on send is also the one expected from msgq.
        for octet in (3, 23):
            self.do_send_with_send_error(octet, failure, False, failure)

849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
class ThreadTests(unittest.TestCase):
    """Checks of thread synchronization around waiting for the cfgmgr."""

    def setUp(self):
        self.__msgq = MsgQ()
        # When set, the notifier reports an abort instead of readiness.
        self.__abort_wait = False
        # Return value of wait_cfgmgr(); stays None until a waiter is done.
        self.__result = None
        self.__notify_thread = self.__daemon_thread(self.__notify)
        self.__wait_thread = self.__daemon_thread(self.__wait)

    def __daemon_thread(self, target):
        """Create (but do not start) a daemon thread running target.

        The daemon flag makes sure the thread is killed if a test
        leaves it behind.
        """
        worker = threading.Thread(target=target)
        worker.daemon = True
        return worker

    def __notify(self):
        """Report to msgq that the config manager is ready (or aborted)."""
        if not self.__abort_wait:
            self.__msgq.cfgmgr_ready()
            return
        self.__msgq.cfgmgr_ready(False)

    def __wait(self):
        """Block until msgq reports the config manager; keep the answer."""
        self.__result = self.__msgq.wait_cfgmgr()

    def __start_and_join(self, first, second):
        """Start both threads in the given order, then join the waiter.

        The join has a timeout so the test terminates even on failure.
        """
        first.start()
        second.start()
        self.__wait_thread.join(60)

    def test_wait_cfgmgr(self):
        """The notifier is started first, the waiter second; the waiter
           must come back with a true result.
        """
        self.__start_and_join(self.__notify_thread, self.__wait_thread)
        self.assertTrue(self.__result)

    def test_wait_cfgmgr_2(self):
        """As test_wait_cfgmgr, with the threads started in the opposite
           order (the outcome should not differ).
        """
        self.__start_and_join(self.__wait_thread, self.__notify_thread)
        self.assertTrue(self.__result)

    def test_wait_abort(self):
        """Like test_wait_cfgmgr, except that the config manager never
           subscribes and the wait is aborted instead.
        """
        self.__abort_wait = True
        self.__start_and_join(self.__wait_thread, self.__notify_thread)
        # The waiter must have finished (not None) with a false result.
        self.assertIsNotNone(self.__result)
        self.assertFalse(self.__result)

    def __check_ready_and_abort(self):
        """Announce readiness, then an abort, and record what the wait
           returns; the first announcement is the one that should count.
        """
        self.__msgq.cfgmgr_ready()
        self.__msgq.cfgmgr_ready(False)
        self.__result = self.__msgq.wait_cfgmgr()

    def test_ready_and_abort(self):
        """Run __check_ready_and_abort in its own thread, so that a
           deadlock in the synchronisation cannot hang the test run.
        """
        checker = self.__daemon_thread(self.__check_ready_and_abort)
        checker.start()
        checker.join(60)
        self.assertTrue(self.__result)
923

924
925
926
927
class SocketTests(unittest.TestCase):
    '''Fine-grained checks of behaviors related to socket operations.

    Other fixtures already exercise some of these paths; here specific
    socket-related methods are examined in more detail, with mock
    classes standing in for real sockets to avoid expensive overhead.

    '''
    class MockSocket():
        '''A scripted stand-in used instead of standard socket objects.'''
        def __init__(self):
            self.ex_on_send = None      # when set, send() raises it
            self.recv_result = b'test'  # dummy data or exception for recv()
            self.blockings = []         # arguments given to setblocking()

        def setblocking(self, on):
            self.blockings.append(on)

        def send(self, data):
            if self.ex_on_send is None:
                return 10       # arbitrary choice
            raise self.ex_on_send

        def recv(self, len):
            pending = self.recv_result
            if isinstance(pending, Exception):
                raise pending
            # Any further call gets empty data
            self.recv_result = b''
            return pending

        def fileno(self):
            return 42           # arbitrary choice

    class LoggerWrapper():
        '''Counts calls per log level and forwards them to the logger.'''
        def __init__(self, logger):
            self.orig_logger = logger
            self.error_called = 0
            self.warn_called = 0
            self.debug_called = 0

        def error(self, *args):
            self.error_called += 1
            self.orig_logger.error(*args)

        def warn(self, *args):
            self.warn_called += 1
            self.orig_logger.warn(*args)

        def debug(self, *args):
            self.debug_called += 1
            self.orig_logger.debug(*args)

    def mock_kill_socket(self, fileno, sock):
        '''Replacement of MsgQ.kill_socket that records its arguments.'''
        self.__killed_socket = (fileno, sock)
        # Forget the socket if msgq still knows it; otherwise do nothing.
        self.__msgq.sockets.pop(fileno, None)

    def setUp(self):
        self.__msgq = MsgQ()
        self.__msgq.kill_socket = self.mock_kill_socket
        self.__sock = self.MockSocket()
        self.__data = b'dummy'
        # Register the mock under the fileno it reports (42)
        self.__msgq.sockets[42] = self.__sock
        self.__msgq.sendbuffs[42] = (None, b'testdata')
        self.__sock_error = socket.error()
        self.__killed_socket = None
        # Route module-level logging through the counting wrapper
        self.__logger = self.LoggerWrapper(msgq.logger)
        msgq.logger = self.__logger

    def tearDown(self):
        msgq.logger = self.__logger.orig_logger

    def __make_send_fail(self, eno):
        '''Have the mock socket's send() raise socket.error with eno.'''
        self.__sock_error.errno = eno
        self.__sock.ex_on_send = self.__sock_error

    def __assert_log_counts(self, errors, debugs):
        '''Compare the error and debug call counters of the logger.'''
        self.assertEqual(errors, self.__logger.error_called)
        self.assertEqual(debugs, self.__logger.debug_called)

    def __process_packet_expect_kill(self):
        '''Run process_packet() on the mock and check it got killed.'''
        self.__killed_socket = None # clear any previous value
        self.__msgq.process_packet(42, self.__sock)
        self.assertEqual((42, self.__sock), self.__killed_socket)

    def test_send_data(self):
        # Successful case: _send_data() returns the hardcoded value, and
        # setblocking() is called twice with the expected parameters
        sent = self.__msgq._send_data(self.__sock, self.__data)
        self.assertEqual(10, sent)
        self.assertEqual([0, 1], self.__sock.blockings)
        self.assertIsNone(self.__killed_socket)

    def test_send_data_interrupt(self):
        '''send() is interrupted. send_data() returns 0, sock isn't killed.'''
        history = []
        for eno in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR):
            self.__make_send_fail(eno)
            sent = self.__msgq._send_data(self.__sock, self.__data)
            self.assertEqual(0, sent)
            # Each attempt adds one more pair of setblocking() calls
            history += [0, 1]
            self.assertEqual(history, self.__sock.blockings)
            self.assertIsNone(self.__killed_socket)

    def test_send_data_error(self):
        '''Unexpected error happens on send().  The socket is killed.

        If the error is EPIPE, it's logged at the warn level; otherwise
        an error message is logged.

        '''
        history = []
        errors = 0
        warns = 0
        for eno in (errno.EPIPE, errno.ECONNRESET, errno.ENOBUFS):
            self.__make_send_fail(eno)
            self.__killed_socket = None # clear any previous value
            sent = self.__msgq._send_data(self.__sock, self.__data)
            self.assertIsNone(sent)
            self.assertEqual((42, self.__sock), self.__killed_socket)
            history += [0, 1]
            self.assertEqual(history, self.__sock.blockings)

            if eno != errno.EPIPE:
                errors += 1
            else:
                warns += 1
            self.assertEqual(errors, self.__logger.error_called)
            self.assertEqual(warns, self.__logger.warn_called)

    def test_process_fd_read_after_bad_write(self):
        '''Check the specific case of write fail followed by read attempt.

        The write failure results in kill_socket, then read shouldn't be
        tried.

        '''
        self.__make_send_fail(errno.EPIPE)
        self.__msgq.process_socket = None # if called, trigger an exception
        self.__msgq._process_fd(42, True, True, False) # shouldn't crash

        # check the socket is deleted from the fileno=>sock dictionary
        self.assertEqual({}, self.__msgq.sockets)

    def test_process_fd_close_after_bad_write(self):
        '''Similar to the previous, but for checking dup'ed kill attempt'''
        self.__make_send_fail(errno.EPIPE)
        self.__msgq._process_fd(42, True, False, True) # shouldn't crash
        self.assertEqual({}, self.__msgq.sockets)

    def test_process_fd_writer_after_close(self):
        '''Emulate a "writable" socket has been already closed and killed.'''
        # This just shouldn't crash
        self.__msgq._process_fd(4200, True, False, False)

    def test_process_packet(self):
        '''Check some failure cases in handling an incoming message.'''
        errors = 0
        debugs = 0

        # if socket.recv() fails due to socket.error, it will be logged
        # as error and the socket will be killed regardless of errno.
        for eno in (errno.ENOBUFS, errno.ECONNRESET):
            self.__sock_error.errno = eno
            self.__sock.recv_result = self.__sock_error
            self.__process_packet_expect_kill()
            errors += 1
            self.__assert_log_counts(errors, debugs)

        # if socket.recv() returns empty data, the result depends on whether
        # there's any preceding data; in the second case below, at least
        # 6 bytes of data will be expected, and the second call to our faked
        # recv() returns empty data.  In that case it will be logged as error.
        for recv_data in (b'', b'short'):
            self.__sock.recv_result = recv_data
            self.__process_packet_expect_kill()
            if recv_data:
                errors += 1
            else:
                debugs += 1
            self.__assert_log_counts(errors, debugs)

1096
if __name__ == '__main__':
    # Reset the root logger for unit testing before any test case runs.
    isc.log.resetUnitTestRootLogger()
    unittest.main()