xfrin_test.py 133 KB
Newer Older
Jelte Jansen's avatar
Jelte Jansen committed
1
# Copyright (C) 2009-2011  Internet Systems Consortium.
Likun Zhang's avatar
Likun Zhang committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import unittest
17
import re
18
import shutil
Likun Zhang's avatar
Likun Zhang committed
19
import socket
20
import sys
21
import io
22
from isc.testutils.tsigctx_mock import MockTSIGContext
Jelte Jansen's avatar
Jelte Jansen committed
23
from isc.testutils.ccsession_mock import MockModuleCCSession
24
from isc.testutils.rrset_utils import *
Likun Zhang's avatar
Likun Zhang committed
25
from xfrin import *
26
import xfrin
27
from isc.xfrin.diff import Diff
28
import isc.log
29
30
31
32
33
34
35
# If we use any python library that is basically a wrapper for
# a library we use as well (like sqlite3 in our datasources),
# we must make sure we import ours first; If we have special
# rpath or libtool rules to pick the correct version, python might
# choose the wrong one first, if those rules aren't hit first.
# This would result in missing symbols later.
import sqlite3
Likun Zhang's avatar
Likun Zhang committed
36

37
38
39
#
# Commonly used (mostly constant) test parameters
#
40
41
TEST_ZONE_NAME_STR = "example.com."
TEST_ZONE_NAME = Name(TEST_ZONE_NAME_STR)
42
TEST_RRCLASS = RRClass.IN()
43
TEST_RRCLASS_STR = 'IN'
44
45
TEST_DB_FILE = 'db_file'
TEST_MASTER_IPV4_ADDRESS = '127.0.0.1'
46
47
TEST_MASTER_IPV4_ADDRINFO = (socket.AF_INET, socket.SOCK_STREAM,
                             (TEST_MASTER_IPV4_ADDRESS, 53))
48
TEST_MASTER_IPV6_ADDRESS = '::1'
49
50
TEST_MASTER_IPV6_ADDRINFO = (socket.AF_INET6, socket.SOCK_STREAM,
                             (TEST_MASTER_IPV6_ADDRESS, 53))
51
52
53

TESTDATA_SRCDIR = os.getenv("TESTDATASRCDIR")
TESTDATA_OBJDIR = os.getenv("TESTDATAOBJDIR")
54
55
56
57
# XXX: This should be a non priviledge port that is unlikely to be used.
# If some other process uses this port test will fail.
TEST_MASTER_PORT = '53535'

58
59
TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")

60
# SOA intended to be used for the new SOA as a result of transfer.
61
62
63
soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
                  'master.example.com. admin.example.com ' +
                  '1234 3600 1800 2419200 7200')
64
soa_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(), RRTTL(3600))
65
soa_rrset.add_rdata(soa_rdata)
66
67
68
69
70
71
72
73

# SOA intended to be used for the current SOA at the secondary side.
# Note that its serial is smaller than that of soa_rdata.
begin_soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
                        'master.example.com. admin.example.com ' +
                        '1230 3600 1800 2419200 7200')
begin_soa_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(), RRTTL(3600))
begin_soa_rrset.add_rdata(begin_soa_rdata)
74
75
example_axfr_question = Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.AXFR())
example_soa_question = Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA())
76
default_questions = [example_axfr_question]
77
default_answers = [soa_rrset]
78

79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def get_fake_time_time():
    '''Returns a temporary replacement function for time.time(), which
       always returns 0.1 more than the previous call. This is to make
       sure these tests do not fail on systems where the time.time()
       function has a high minimal accuracy.
       This fake time.time() is usually set in place of the real one
       where we need testing of get_running_time(). It is done is
       as low a scope as possible, so as not to mess up unit test
       framework time related tests. It must be set before
       XfrinTransferState (or any class that initializes that) is
       initialized.
       And every time it is set up, in must be reset later (again, so
       as not to mess up the framework's concept of time).
    '''
    fake_time = 0.0
    def fake_time_time():
        nonlocal fake_time
        fake_time += 0.1
        return fake_time
    return fake_time_time

100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def check_diffs(assert_fn, expected, actual):
    '''A helper function checking the differences made in the XFR session.

    This is expected called from some subclass of unittest.TestCase and
    assert_fn is generally expected to be 'self.assertEqual' of that class.

    '''
    assert_fn(len(expected), len(actual))
    for (diffs_exp, diffs_actual) in zip(expected, actual):
        assert_fn(len(diffs_exp), len(diffs_actual))
        for (diff_exp, diff_actual) in zip(diffs_exp, diffs_actual):
            # operation should match
            assert_fn(diff_exp[0], diff_actual[0])
            # The diff as RRset should be equal (for simplicity we assume
            # all RRsets contain exactly one RDATA)
            assert_fn(diff_exp[1].get_name(), diff_actual[1].get_name())
            assert_fn(diff_exp[1].get_type(), diff_actual[1].get_type())
            assert_fn(diff_exp[1].get_class(), diff_actual[1].get_class())
            assert_fn(diff_exp[1].get_rdata_count(),
                      diff_actual[1].get_rdata_count())
            assert_fn(1, diff_exp[1].get_rdata_count())
            assert_fn(diff_exp[1].get_rdata()[0],
                      diff_actual[1].get_rdata()[0])

124
125
class XfrinTestException(Exception):
    pass
126

127
128
129
class XfrinTestTimeoutException(Exception):
    pass

Jelte Jansen's avatar
Jelte Jansen committed
130
class MockCC(MockModuleCCSession):
131
    def get_default_value(self, identifier):
132
133
134
        # The returned values should be identical to the spec file
        # XXX: these should be retrieved from the spec file
        # (see MyCCSession of xfrout_test.py.in)
135
136
137
        if identifier == "zones/master_port":
            return TEST_MASTER_PORT
        if identifier == "zones/class":
Jelte Jansen's avatar
Jelte Jansen committed
138
            return TEST_RRCLASS_STR
139
140
        if identifier == "zones/use_ixfr":
            return False
141

142
143
144
    def remove_remote_config(self, module_name):
        pass

145
146
147
148
149
class MockDataSourceClient():
    '''A simple mock data source client.

    This class provides a minimal set of wrappers related the data source
    API that would be used by Diff objects.  For our testing purposes they
150
    only keep truck of the history of the changes.
151
152

    '''
153
    def __init__(self):
154
        self.force_fail = False # if True, raise an exception on commit
155
156
157
        self.committed_diffs = []
        self.diffs = []

158
159
160
161
162
163
164
165
166
167
    def get_class(self):
        '''Mock version of get_class().

        We simply return the commonly used constant RR class.  If and when
        we use this mock for a different RR class we need to adjust it
        accordingly.

        '''
        return TEST_RRCLASS

168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
    def find_zone(self, zone_name):
        '''Mock version of find_zone().

        It returns itself (subsequently acting as a mock ZoneFinder) for
        some test zone names.  For some others it returns either NOTFOUND
        or PARTIALMATCH.

        '''
        if zone_name == TEST_ZONE_NAME or \
                zone_name == Name('no-soa.example') or \
                zone_name == Name('dup-soa.example'):
            return (isc.datasrc.DataSourceClient.SUCCESS, self)
        elif zone_name == Name('no-such-zone.example'):
            return (DataSourceClient.NOTFOUND, None)
        elif zone_name == Name('partial-match-zone.example'):
            return (DataSourceClient.PARTIALMATCH, self)
        raise ValueError('Unexpected input to mock client: bug in test case?')

186
    def find(self, name, rrtype, options=ZoneFinder.FIND_DEFAULT):
187
188
189
190
191
192
193
194
        '''Mock ZoneFinder.find().

        It returns the predefined SOA RRset to queries for SOA of the common
        test zone name.  It also emulates some unusual cases for special
        zone names.

        '''
        if name == TEST_ZONE_NAME and rrtype == RRType.SOA():
195
            return (ZoneFinder.SUCCESS, begin_soa_rrset, 0)
196
        if name == Name('no-soa.example'):
197
            return (ZoneFinder.NXDOMAIN, None, 0)
198
199
200
201
        if name == Name('dup-soa.example'):
            dup_soa_rrset = RRset(name, TEST_RRCLASS, RRType.SOA(), RRTTL(0))
            dup_soa_rrset.add_rdata(begin_soa_rdata)
            dup_soa_rrset.add_rdata(soa_rdata)
202
            return (ZoneFinder.SUCCESS, dup_soa_rrset, 0)
203
204
        raise ValueError('Unexpected input to mock finder: bug in test case?')

205
    def get_updater(self, zone_name, replace, journaling=False):
206
        self._journaling_enabled = journaling
207
208
209
        return self

    def add_rrset(self, rrset):
210
        self.diffs.append(('add', rrset))
211

212
    def delete_rrset(self, rrset):
213
        self.diffs.append(('delete', rrset))
214
215

    def commit(self):
216
217
        if self.force_fail:
            raise isc.datasrc.Error('Updater.commit() failed')
218
219
        self.committed_diffs.append(self.diffs)
        self.diffs = []
220

221
class MockXfrin(Xfrin):
222
223
224
225
226
227
228
    # This is a class attribute of a callable object that specifies a non
    # default behavior triggered in _cc_check_command().  Specific test methods
    # are expected to explicitly set this attribute before creating a
    # MockXfrin object (when it needs a non default behavior).
    # See the TestMain class.
    check_command_hook = None

229
    def _cc_setup(self):
Jelte Jansen's avatar
Jelte Jansen committed
230
        self._tsig_key = None
231
        self._module_cc = MockCC()
Likun Zhang's avatar
Likun Zhang committed
232
        pass
233
234
235

    def _get_db_file(self):
        pass
chenzhengzhang's avatar
chenzhengzhang committed
236

237
238
239
240
    def _cc_check_command(self):
        self._shutdown_event.set()
        if MockXfrin.check_command_hook:
            MockXfrin.check_command_hook()
Likun Zhang's avatar
Likun Zhang committed
241

242
    def xfrin_start(self, zone_name, rrclass, db_file, master_addrinfo,
243
                    tsig_key, request_type, check_soa=True):
244
245
246
247
        # store some of the arguments for verification, then call this
        # method in the superclass
        self.xfrin_started_master_addr = master_addrinfo[2][0]
        self.xfrin_started_master_port = master_addrinfo[2][1]
248
        self.xfrin_started_request_type = request_type
249
        return Xfrin.xfrin_start(self, zone_name, rrclass, None,
Jelte Jansen's avatar
Jelte Jansen committed
250
                                 master_addrinfo, tsig_key,
251
                                 request_type, check_soa)
252

253
class MockXfrinConnection(XfrinConnection):
254
255
    def __init__(self, sock_map, zone_name, rrclass, datasrc_client,
                 shutdown_event, master_addr, tsig_key=None):
256
        super().__init__(sock_map, zone_name, rrclass, MockDataSourceClient(),
257
                         shutdown_event, master_addr, TEST_DB_FILE)
258
259
260
261
        self.query_data = b''
        self.reply_data = b''
        self.force_time_out = False
        self.force_close = False
262
        self.qlen = None
263
264
        self.qid = None
        self.response_generator = None
265

266
267
268
269
270
271
272
273
    def _asyncore_loop(self):
        if self.force_close:
            self.handle_close()
        elif not self.force_time_out:
            self.handle_read()

    def connect_to_master(self):
        return True
274

275
276
    def recv(self, size):
        data = self.reply_data[:size]
277
        self.reply_data = self.reply_data[size:]
278
279
        if len(data) == 0:
            raise XfrinTestTimeoutException('Emulated timeout')
280
        if len(data) < size:
281
282
            raise XfrinTestException('cannot get reply data (' + str(size) +
                                     ' bytes)')
283
        return data
284
285

    def send(self, data):
286
287
288
289
290
        if self.qlen != None and len(self.query_data) >= self.qlen:
            # This is a new query.  reset the internal state.
            self.qlen = None
            self.qid = None
            self.query_data = b''
291
        self.query_data += data
292
293
294
295
296
297

        # when the outgoing data is sufficiently large to contain the length
        # and the QID fields (4 octets or more), extract these fields.
        # The length will be reset the internal query data to support multiple
        # queries in a single test.
        # The QID will be used to construct a matching response.
298
        if len(self.query_data) >= 4 and self.qid == None:
299
300
            self.qlen = socket.htons(struct.unpack('H',
                                                   self.query_data[0:2])[0])
301
302
303
304
            self.qid = socket.htons(struct.unpack('H', self.query_data[2:4])[0])
            # if the response generator method is specified, invoke it now.
            if self.response_generator != None:
                self.response_generator()
305
306
        return len(data)

307
    def create_response_data(self, response=True, auth=True, bad_qid=False,
308
309
310
                             rcode=Rcode.NOERROR(),
                             questions=default_questions,
                             answers=default_answers,
311
                             authorities=[],
312
                             tsig_ctx=None):
313
        resp = Message(Message.RENDER)
314
315
316
317
        qid = self.qid
        if bad_qid:
            qid += 1
        resp.set_qid(qid)
318
        resp.set_opcode(Opcode.QUERY())
319
320
        resp.set_rcode(rcode)
        if response:
321
            resp.set_header_flag(Message.HEADERFLAG_QR)
322
323
        if auth:
            resp.set_header_flag(Message.HEADERFLAG_AA)
324
        [resp.add_question(q) for q in questions]
325
        [resp.add_rrset(Message.SECTION_ANSWER, a) for a in answers]
326
        [resp.add_rrset(Message.SECTION_AUTHORITY, a) for a in authorities]
327

328
        renderer = MessageRenderer()
329
        if tsig_ctx is not None:
330
331
332
            resp.to_wire(renderer, tsig_ctx)
        else:
            resp.to_wire(renderer)
333
334
        reply_data = struct.pack('H', socket.htons(renderer.get_length()))
        reply_data += renderer.get_data()
335

336
        return reply_data
337

338
339
340
class TestXfrinState(unittest.TestCase):
    def setUp(self):
        self.sock_map = {}
341
        self.conn = MockXfrinConnection(self.sock_map, TEST_ZONE_NAME,
342
                                        TEST_RRCLASS, None, threading.Event(),
343
                                        TEST_MASTER_IPV4_ADDRINFO)
344
        self.conn.init_socket()
345
346
347
348
        self.begin_soa = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(),
                               RRTTL(3600))
        self.begin_soa.add_rdata(Rdata(RRType.SOA(), TEST_RRCLASS,
                                       'm. r. 1230 0 0 0 0'))
349
350
351
352
        self.ns_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.NS(),
                              RRTTL(3600))
        self.ns_rrset.add_rdata(Rdata(RRType.NS(), TEST_RRCLASS,
                                      'ns.example.com'))
353
354
355
356
        self.a_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.A(),
                             RRTTL(3600))
        self.a_rrset.add_rdata(Rdata(RRType.A(), TEST_RRCLASS, '192.0.2.1'))

357
        self.conn._datasrc_client = MockDataSourceClient()
358
        self.conn._diff = Diff(self.conn._datasrc_client, TEST_ZONE_NAME)
359

360
361
362
363
364
365
366
367
368
class TestXfrinStateBase(TestXfrinState):
    def setUp(self):
        super().setUp()

    def test_handle_rr_on_base(self):
        # The base version of handle_rr() isn't supposed to be called
        # directly (the argument doesn't matter in this test)
        self.assertRaises(XfrinException, XfrinState().handle_rr, None)

369
370
371
372
373
374
375
class TestXfrinInitialSOA(TestXfrinState):
    def setUp(self):
        super().setUp()
        self.state = XfrinInitialSOA()

    def test_handle_rr(self):
        # normal case
376
        self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
377
378
        self.assertEqual(type(XfrinFirstData()),
                         type(self.conn.get_xfrstate()))
379
        self.assertEqual(1234, self.conn._end_serial.get_value())
380
381
382
383
384
385

    def test_handle_not_soa(self):
        # The given RR is not of SOA
        self.assertRaises(XfrinProtocolError, self.state.handle_rr, self.conn,
                          self.ns_rrset)

386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
    def test_handle_ixfr_uptodate(self):
        self.conn._request_type = RRType.IXFR()
        self.conn._request_serial = isc.dns.Serial(1234) # same as soa_rrset
        self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
        self.assertEqual(type(XfrinIXFRUptodate()),
                         type(self.conn.get_xfrstate()))

    def test_handle_ixfr_uptodate2(self):
        self.conn._request_type = RRType.IXFR()
        self.conn._request_serial = isc.dns.Serial(1235) # > soa_rrset
        self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
        self.assertEqual(type(XfrinIXFRUptodate()),
                         type(self.conn.get_xfrstate()))

    def test_handle_ixfr_uptodate3(self):
        # Similar to the previous case, but checking serial number arithmetic
        # comparison
        self.conn._request_type = RRType.IXFR()
        self.conn._request_serial = isc.dns.Serial(0xffffffff)
        self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
        self.assertEqual(type(XfrinFirstData()),
                         type(self.conn.get_xfrstate()))

    def test_handle_axfr_uptodate(self):
        # "request serial" should matter only for IXFR
        self.conn._request_type = RRType.AXFR()
        self.conn._request_serial = isc.dns.Serial(1234) # same as soa_rrset
        self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
        self.assertEqual(type(XfrinFirstData()),
                         type(self.conn.get_xfrstate()))

417
418
419
    def test_finish_message(self):
        self.assertTrue(self.state.finish_message(self.conn))

420
421
422
423
424
class TestXfrinFirstData(TestXfrinState):
    def setUp(self):
        super().setUp()
        self.state = XfrinFirstData()
        self.conn._request_type = RRType.IXFR()
425
426
        # arbitrary chosen serial < 1234:
        self.conn._request_serial = isc.dns.Serial(1230)
427
        self.conn._diff = None           # should be replaced in the AXFR case
428
429
430

    def test_handle_ixfr_begin_soa(self):
        self.conn._request_type = RRType.IXFR()
431
        self.assertFalse(self.state.handle_rr(self.conn, self.begin_soa))
432
433
434
435
436
437
438
        self.assertEqual(type(XfrinIXFRDeleteSOA()),
                         type(self.conn.get_xfrstate()))

    def test_handle_axfr(self):
        # If the original type is AXFR, other conditions aren't considered,
        # and AXFR processing will continue
        self.conn._request_type = RRType.AXFR()
439
        self.assertFalse(self.state.handle_rr(self.conn, self.begin_soa))
440
441
442
443
444
445
446
        self.assertEqual(type(XfrinAXFR()), type(self.conn.get_xfrstate()))

    def test_handle_ixfr_to_axfr(self):
        # Detecting AXFR-compatible IXFR response by seeing a non SOA RR after
        # the initial SOA.  Should switch to AXFR.
        self.assertFalse(self.state.handle_rr(self.conn, self.ns_rrset))
        self.assertEqual(type(XfrinAXFR()), type(self.conn.get_xfrstate()))
447
448
        # The Diff for AXFR should be created at this point
        self.assertNotEqual(None, self.conn._diff)
449
450

    def test_handle_ixfr_to_axfr_by_different_soa(self):
451
452
453
        # An unusual case: Response contains two consecutive SOA but the
        # serial of the second does not match the requested one.  See
        # the documentation for XfrinFirstData.handle_rr().
454
455
        self.assertFalse(self.state.handle_rr(self.conn, soa_rrset))
        self.assertEqual(type(XfrinAXFR()), type(self.conn.get_xfrstate()))
456
        self.assertNotEqual(None, self.conn._diff)
457

458
459
460
    def test_finish_message(self):
        self.assertTrue(self.state.finish_message(self.conn))

461
class TestXfrinIXFRDeleteSOA(TestXfrinState):
462
463
464
    def setUp(self):
        super().setUp()
        self.state = XfrinIXFRDeleteSOA()
465
466
467
        # In this state a new Diff object is expected to be created.  To
        # confirm it, we nullify it beforehand.
        self.conn._diff = None
468
469
470
471
472

    def test_handle_rr(self):
        self.assertTrue(self.state.handle_rr(self.conn, self.begin_soa))
        self.assertEqual(type(XfrinIXFRDelete()),
                         type(self.conn.get_xfrstate()))
473
        self.assertEqual([('delete', self.begin_soa)],
474
475
476
477
478
479
                         self.conn._diff.get_buffer())

    def test_handle_non_soa(self):
        self.assertRaises(XfrinException, self.state.handle_rr, self.conn,
                          self.ns_rrset)

480
481
482
    def test_finish_message(self):
        self.assertTrue(self.state.finish_message(self.conn))

483
class TestXfrinIXFRDelete(TestXfrinState):
484
485
    def setUp(self):
        super().setUp()
486
487
        # We need record the state in 'conn' to check the case where the
        # state doesn't change.
488
489
490
491
        XfrinIXFRDelete().set_xfrstate(self.conn, XfrinIXFRDelete())
        self.state = self.conn.get_xfrstate()

    def test_handle_delete_rr(self):
492
        # Non SOA RRs are simply (goting to be) deleted in this state
493
        self.assertTrue(self.state.handle_rr(self.conn, self.ns_rrset))
494
        self.assertEqual([('delete', self.ns_rrset)],
495
496
497
498
499
500
501
502
503
504
505
                         self.conn._diff.get_buffer())
        # The state shouldn't change
        self.assertEqual(type(XfrinIXFRDelete()),
                         type(self.conn.get_xfrstate()))

    def test_handle_soa(self):
        # SOA in this state means the beginning of added RRs.  This SOA
        # should also be added in the next state, so handle_rr() should return
        # false.
        self.assertFalse(self.state.handle_rr(self.conn, soa_rrset))
        self.assertEqual([], self.conn._diff.get_buffer())
506
        self.assertEqual(1234, self.conn._current_serial.get_value())
507
        self.assertEqual(type(XfrinIXFRAddSOA()),
508
509
                         type(self.conn.get_xfrstate()))

510
511
512
    def test_finish_message(self):
        self.assertTrue(self.state.finish_message(self.conn))

513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
class TestXfrinIXFRAddSOA(TestXfrinState):
    def setUp(self):
        super().setUp()
        self.state = XfrinIXFRAddSOA()

    def test_handle_rr(self):
        self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
        self.assertEqual(type(XfrinIXFRAdd()), type(self.conn.get_xfrstate()))
        self.assertEqual([('add', soa_rrset)],
                         self.conn._diff.get_buffer())

    def test_handle_non_soa(self):
        self.assertRaises(XfrinException, self.state.handle_rr, self.conn,
                          self.ns_rrset)

528
529
530
    def test_finish_message(self):
        self.assertTrue(self.state.finish_message(self.conn))

531
532
533
534
535
536
class TestXfrinIXFRAdd(TestXfrinState):
    def setUp(self):
        super().setUp()
        # We need record the state in 'conn' to check the case where the
        # state doesn't change.
        XfrinIXFRAdd().set_xfrstate(self.conn, XfrinIXFRAdd())
537
        self.conn._current_serial = isc.dns.Serial(1230)
538
539
540
541
542
543
544
545
546
547
548
        self.state = self.conn.get_xfrstate()

    def test_handle_add_rr(self):
        # Non SOA RRs are simply (goting to be) added in this state
        self.assertTrue(self.state.handle_rr(self.conn, self.ns_rrset))
        self.assertEqual([('add', self.ns_rrset)],
                         self.conn._diff.get_buffer())
        # The state shouldn't change
        self.assertEqual(type(XfrinIXFRAdd()), type(self.conn.get_xfrstate()))

    def test_handle_end_soa(self):
549
        self.conn._end_serial = isc.dns.Serial(1234)
550
551
552
553
554
555
556
557
        self.conn._diff.add_data(self.ns_rrset) # put some dummy change
        self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
        self.assertEqual(type(XfrinIXFREnd()), type(self.conn.get_xfrstate()))
        # handle_rr should have caused commit, and the buffer should now be
        # empty.
        self.assertEqual([], self.conn._diff.get_buffer())

    def test_handle_new_delete(self):
558
        self.conn._end_serial = isc.dns.Serial(1234)
559
560
561
562
563
564
565
566
        # SOA RR whose serial is the current one means we are going to a new
        # difference, starting with removing that SOA.
        self.conn._diff.add_data(self.ns_rrset) # put some dummy change
        self.assertFalse(self.state.handle_rr(self.conn, self.begin_soa))
        self.assertEqual([], self.conn._diff.get_buffer())
        self.assertEqual(type(XfrinIXFRDeleteSOA()),
                         type(self.conn.get_xfrstate()))

567
568
569
570
571
572
573
574
575
576
577
578
    def test_handle_new_delete_missing_sig(self):
        self.conn._end_serial = isc.dns.Serial(1234)
        # SOA RR whose serial is the current one means we are going to a new
        # difference, starting with removing that SOA.
        self.conn._diff.add_data(self.ns_rrset) # put some dummy change
        self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
        self.conn._tsig_ctx.last_has_signature = lambda: False
        # This would try to finish up. But the TSIG pretends not everything is
        # signed, rejecting it.
        self.assertRaises(xfrin.XfrinProtocolError, self.state.handle_rr,
                          self.conn, self.begin_soa)

579
580
    def test_handle_out_of_sync(self):
        # getting SOA with an inconsistent serial.  This is an error.
581
        self.conn._end_serial = isc.dns.Serial(1235)
582
583
584
        self.assertRaises(XfrinProtocolError, self.state.handle_rr,
                          self.conn, soa_rrset)

585
586
587
    def test_finish_message(self):
        self.assertTrue(self.state.finish_message(self.conn))

588
589
590
591
592
593
594
595
596
class TestXfrinIXFREnd(TestXfrinState):
    def setUp(self):
        super().setUp()
        self.state = XfrinIXFREnd()

    def test_handle_rr(self):
        self.assertRaises(XfrinProtocolError, self.state.handle_rr, self.conn,
                          self.ns_rrset)

597
598
599
    def test_finish_message(self):
        self.assertFalse(self.state.finish_message(self.conn))

600
class TestXfrinIXFREndUpToDate(TestXfrinState):
601
602
603
604
605
606
607
608
609
610
611
612
    def setUp(self):
        super().setUp()
        self.state = XfrinIXFRUptodate()

    def test_handle_rr(self):
        self.assertRaises(XfrinProtocolError, self.state.handle_rr, self.conn,
                          self.ns_rrset)

    def test_finish_message(self):
        self.assertRaises(XfrinZoneUptodate, self.state.finish_message,
                          self.conn)

613
614
615
616
class TestXfrinAXFR(TestXfrinState):
    def setUp(self):
        super().setUp()
        self.state = XfrinAXFR()
617
        self.conn._end_serial = isc.dns.Serial(1234)
618
619

    def test_handle_rr(self):
620
621
622
623
        """
        Test we can put data inside.
        """
        # Put some data inside
624
        self.assertTrue(self.state.handle_rr(self.conn, self.a_rrset))
625
626
627
        # This test uses internal Diff structure to check the behaviour of
        # XfrinAXFR. Maybe there could be a cleaner way, but it would be more
        # complicated.
628
629
        self.assertEqual([('add', self.a_rrset)], self.conn._diff.get_buffer())
        # This SOA terminates the transfer
630
631
632
        self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
        # It should have changed the state
        self.assertEqual(type(XfrinAXFREnd()), type(self.conn.get_xfrstate()))
633
634
635
        # At this point, the data haven't been committed yet
        self.assertEqual([('add', self.a_rrset), ('add', soa_rrset)],
                         self.conn._diff.get_buffer())
636

637
638
639
640
641
642
643
    def test_handle_rr_mismatch_soa(self):
        """ SOA with inconsistent serial - unexpected, but we accept it.

        """
        self.assertTrue(self.state.handle_rr(self.conn, begin_soa_rrset))
        self.assertEqual(type(XfrinAXFREnd()), type(self.conn.get_xfrstate()))

644
    def test_finish_message(self):
645
646
647
648
        """
        Check normal end of message.
        """
        # When a message ends, nothing happens usually
649
650
        self.assertTrue(self.state.finish_message(self.conn))

651
652
653
654
655
656
657
658
659
660
class TestXfrinAXFREnd(TestXfrinState):
    def setUp(self):
        super().setUp()
        self.state = XfrinAXFREnd()

    def test_handle_rr(self):
        self.assertRaises(XfrinProtocolError, self.state.handle_rr, self.conn,
                          self.ns_rrset)

    def test_finish_message(self):
661
662
        self.conn._diff.add_data(self.a_rrset)
        self.conn._diff.add_data(soa_rrset)
663
664
        self.assertFalse(self.state.finish_message(self.conn))

665
666
667
668
669
670
671
        # The data should have been committed
        self.assertEqual([], self.conn._diff.get_buffer())
        check_diffs(self.assertEqual, [[('add', self.a_rrset),
                                        ('add', soa_rrset)]],
                    self.conn._datasrc_client.committed_diffs)
        self.assertRaises(ValueError, self.conn._diff.commit)

672
class TestXfrinConnection(unittest.TestCase):
673
674
675
676
677
678
679
    '''Convenient parent class for XFR-protocol tests.

    This class provides common setups and helper methods for protocol related
    tests on AXFR and IXFR.

    '''

680
    def setUp(self):
681
682
        if os.path.exists(TEST_DB_FILE):
            os.remove(TEST_DB_FILE)
683
        self.sock_map = {}
684
        self.conn = MockXfrinConnection(self.sock_map, TEST_ZONE_NAME,
685
                                        TEST_RRCLASS, None, threading.Event(),
686
                                        TEST_MASTER_IPV4_ADDRINFO)
687
        self.conn.init_socket()
688
689
690
691
        self.soa_response_params = {
            'questions': [example_soa_question],
            'bad_qid': False,
            'response': True,
692
            'auth': True,
693
            'rcode': Rcode.NOERROR(),
694
            'answers': default_answers,
695
            'authorities': [],
696
            'tsig': False,
697
698
            'axfr_after_soa': self._create_normal_response_data
            }
699
        self.axfr_response_params = {
700
701
            'question_1st': default_questions,
            'question_2nd': default_questions,
702
703
            'answer_1st': [soa_rrset, self._create_ns()],
            'answer_2nd': default_answers,
704
705
            'tsig_1st': None,
            'tsig_2nd': None
706
            }
707
708
709
710
711
712

    def tearDown(self):
        self.conn.close()
        if os.path.exists(TEST_DB_FILE):
            os.remove(TEST_DB_FILE)

713
714
    def _create_normal_response_data(self):
        # This helper method creates a simple sequence of DNS messages that
715
716
        # forms a valid AXFR transaction.  It consists of two messages: the
        # first one containing SOA, NS, the second containing the trailing SOA.
717
718
        question_1st = self.axfr_response_params['question_1st']
        question_2nd = self.axfr_response_params['question_2nd']
719
720
        answer_1st = self.axfr_response_params['answer_1st']
        answer_2nd = self.axfr_response_params['answer_2nd']
721
722
        tsig_1st = self.axfr_response_params['tsig_1st']
        tsig_2nd = self.axfr_response_params['tsig_2nd']
723
        self.conn.reply_data = self.conn.create_response_data(
724
            questions=question_1st, answers=answer_1st,
725
            tsig_ctx=tsig_1st)
726
        self.conn.reply_data += \
727
            self.conn.create_response_data(questions=question_2nd,
728
                                           answers=answer_2nd,
729
                                           tsig_ctx=tsig_2nd)
730
731
732
733
734
735
736
737
738
739
740
741

    def _create_soa_response_data(self):
        # This helper method creates a DNS message that is supposed to be
        # used a valid response to SOA queries prior to XFR.
        # If tsig is True, it tries to verify the query with a locally
        # created TSIG context (which may or may not succeed) so that the
        # response will include a TSIG.
        # If axfr_after_soa is True, it resets the response_generator so that
        # a valid XFR messages will follow.

        verify_ctx = None
        if self.soa_response_params['tsig']:
Jelte Jansen's avatar
Jelte Jansen committed
742
            # xfrin (currently) always uses TCP.  strip off the length field.
743
744
745
746
747
748
749
750
751
            query_data = self.conn.query_data[2:]
            query_message = Message(Message.PARSE)
            query_message.from_wire(query_data)
            verify_ctx = TSIGContext(TSIG_KEY)
            verify_ctx.verify(query_message.get_tsig_record(), query_data)

        self.conn.reply_data = self.conn.create_response_data(
            bad_qid=self.soa_response_params['bad_qid'],
            response=self.soa_response_params['response'],
752
            auth=self.soa_response_params['auth'],
753
754
            rcode=self.soa_response_params['rcode'],
            questions=self.soa_response_params['questions'],
755
756
            answers=self.soa_response_params['answers'],
            authorities=self.soa_response_params['authorities'],
757
758
759
760
761
762
763
764
765
766
767
768
            tsig_ctx=verify_ctx)
        if self.soa_response_params['axfr_after_soa'] != None:
            self.conn.response_generator = \
                self.soa_response_params['axfr_after_soa']

    def _create_broken_response_data(self):
        # This helper method creates a bogus "DNS message" that only contains
        # 4 octets of data.  The DNS message parser will raise an exception.
        bogus_data = b'xxxx'
        self.conn.reply_data = struct.pack('H', socket.htons(len(bogus_data)))
        self.conn.reply_data += bogus_data

769
770
771
772
773
774
775
776
777
778
779
780
781
    def _create_a(self, address):
        rrset = RRset(Name('a.example.com'), TEST_RRCLASS, RRType.A(),
                      RRTTL(3600))
        rrset.add_rdata(Rdata(RRType.A(), TEST_RRCLASS, address))
        return rrset

    def _create_soa(self, serial):
        rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(),
                      RRTTL(3600))
        rdata_str = 'm. r. ' + serial + ' 3600 1800 2419200 7200'
        rrset.add_rdata(Rdata(RRType.SOA(), TEST_RRCLASS, rdata_str))
        return rrset

JINMEI Tatuya's avatar
JINMEI Tatuya committed
782
783
784
785
786
    def _create_ns(self, nsname='ns.'+TEST_ZONE_NAME_STR):
        rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.NS(), RRTTL(3600))
        rrset.add_rdata(Rdata(RRType.NS(), TEST_RRCLASS, nsname))
        return rrset

787
788
789
790
791
792
793
794
795
    def _set_test_zone(self, zone_name):
        '''Set the zone name for transfer to the specified one.

        It also make sure that the SOA RR (if exist) is correctly (re)set.

        '''
        self.conn._zone_name = zone_name
        self.conn._zone_soa = self.conn._get_zone_soa()

796
797
class TestAXFR(TestXfrinConnection):
    def setUp(self):
798
799
800
        # replace time.time with a steadily increasing fake one
        self.orig_time_time = time.time
        time.time = get_fake_time_time()
801
        super().setUp()
802
        XfrinInitialSOA().set_xfrstate(self.conn, XfrinInitialSOA())
803

804
805
806
    def tearDown(self):
        time.time = self.orig_time_time

807
    def __create_mock_tsig(self, key, error, has_last_signature=True):
808
809
810
811
812
        # This helper function creates a MockTSIGContext for a given key
        # and TSIG error to be used as a result of verify (normally faked
        # one)
        mock_ctx = MockTSIGContext(key)
        mock_ctx.error = error
813
814
        if not has_last_signature:
            mock_ctx.last_has_signature = lambda: False
815
816
        return mock_ctx

chenzhengzhang's avatar
chenzhengzhang committed
817
    def __match_exception(self, expected_exception, expected_msg, expression):
818
819
820
821
        # This helper method is a higher-granularity version of assertRaises().
        # If it's not sufficient to check the exception class (e.g., when
        # the same type of exceptions can be thrown from many places), this
        # method can be used to check it with the exception argument.
chenzhengzhang's avatar
chenzhengzhang committed
822
823
824
825
826
827
828
        try:
            expression()
        except expected_exception as ex:
            self.assertEqual(str(ex), expected_msg)
        else:
            self.assertFalse('exception is expected, but not raised')

829
830
831
832
833
834
835
836
837
    def test_close(self):
        # we shouldn't be using the global asyncore map.
        self.assertEqual(len(asyncore.socket_map), 0)
        # there should be exactly one entry in our local map
        self.assertEqual(len(self.sock_map), 1)
        # once closing the dispatch the map should become empty
        self.conn.close()
        self.assertEqual(len(self.sock_map), 0)

838
839
840
841
842
843
    def test_init_ip6(self):
        # This test simply creates a new XfrinConnection object with an
        # IPv6 address, tries to bind it to an IPv6 wildcard address/port
        # to confirm an AF_INET6 socket has been created.  A naive application
        # tends to assume it's IPv4 only and hardcode AF_INET.  This test
        # uncovers such a bug.
844
        c = MockXfrinConnection({}, TEST_ZONE_NAME, TEST_RRCLASS, None,
845
                                threading.Event(), TEST_MASTER_IPV6_ADDRINFO)
846
        c.init_socket()
847
848
849
850
        c.bind(('::', 0))
        c.close()

    def test_init_chclass(self):
851
        c = MockXfrinConnection({}, TEST_ZONE_NAME, RRClass.CH(), None,
852
                                threading.Event(), TEST_MASTER_IPV4_ADDRINFO)
853
        c.init_socket()
854
855
856
        axfrmsg = c._create_query(RRType.AXFR())
        self.assertEqual(axfrmsg.get_question()[0].get_class(),
                         RRClass.CH())
857
        c.close()
858

859
    def test_create_query(self):
860
        def check_query(expected_qtype, expected_auth):
861
862
863
864
865
866
867
868
            '''Helper method to repeat the same pattern of tests'''
            self.assertEqual(Opcode.QUERY(), msg.get_opcode())
            self.assertEqual(Rcode.NOERROR(), msg.get_rcode())
            self.assertEqual(1, msg.get_rr_count(Message.SECTION_QUESTION))
            self.assertEqual(TEST_ZONE_NAME, msg.get_question()[0].get_name())
            self.assertEqual(expected_qtype, msg.get_question()[0].get_type())
            self.assertEqual(0, msg.get_rr_count(Message.SECTION_ANSWER))
            self.assertEqual(0, msg.get_rr_count(Message.SECTION_ADDITIONAL))
869
            if expected_auth is None:
870
871
872
873
874
                self.assertEqual(0,
                                 msg.get_rr_count(Message.SECTION_AUTHORITY))
            else:
                self.assertEqual(1,
                                 msg.get_rr_count(Message.SECTION_AUTHORITY))
875
876
877
878
879
880
881
882
883
884
                auth_rr = msg.get_section(Message.SECTION_AUTHORITY)[0]
                self.assertEqual(expected_auth.get_name(), auth_rr.get_name())
                self.assertEqual(expected_auth.get_type(), auth_rr.get_type())
                self.assertEqual(expected_auth.get_class(),
                                 auth_rr.get_class())
                # In our test scenario RDATA must be 1
                self.assertEqual(1, expected_auth.get_rdata_count())
                self.assertEqual(1, auth_rr.get_rdata_count())
                self.assertEqual(expected_auth.get_rdata()[0],
                                 auth_rr.get_rdata()[0])
885
886
887
888
889
890
891
892
893

        # Actual tests start here
        # SOA query
        msg = self.conn._create_query(RRType.SOA())
        check_query(RRType.SOA(), None)

        # AXFR query
        msg = self.conn._create_query(RRType.AXFR())
        check_query(RRType.AXFR(), None)
894

895
896
897
        # IXFR query
        msg = self.conn._create_query(RRType.IXFR())
        check_query(RRType.IXFR(), begin_soa_rrset)
898
        self.assertEqual(1230, self.conn._request_serial.get_value())
899
900
901
902
903

    def test_create_ixfr_query_fail(self):
        # In these cases _create_query() will fail to find a valid SOA RR to
        # insert in the IXFR query, and should raise an exception.

904
        self._set_test_zone(Name('no-such-zone.example'))
905
906
907
        self.assertRaises(XfrinException, self.conn._create_query,
                          RRType.IXFR())

908
        self._set_test_zone(Name('partial-match-zone.example'))
909
910
911
        self.assertRaises(XfrinException, self.conn._create_query,
                          RRType.IXFR())

912
        self._set_test_zone(Name('no-soa.example'))
913
914
915
        self.assertRaises(XfrinException, self.conn._create_query,
                          RRType.IXFR())

916
917
        self._set_test_zone(Name('dup-soa.example'))
        self.conn._zone_soa = self.conn._get_zone_soa()
918
919
        self.assertRaises(XfrinException, self.conn._create_query,
                          RRType.IXFR())
920

921
    def test_send_query(self):
922
923
924
925
926
927
928
929
        def message_has_tsig(data):
            # a simple check if the actual data contains a TSIG RR.
            # At our level this simple check should suffice; other detailed
            # tests regarding the TSIG protocol are done in pydnspp.
            msg = Message(Message.PARSE)
            msg.from_wire(data)
            return msg.get_tsig_record() is not None

930
        # soa request with tsig
931
        self.conn._tsig_key = TSIG_KEY
932
        self.conn._send_query(RRType.SOA())
933
        self.assertTrue(message_has_tsig(self.conn.query_data[2:]))
934
935
936

        # axfr request with tsig
        self.conn._send_query(RRType.AXFR())
937
        self.assertTrue(message_has_tsig(self.conn.query_data[2:]))
938

939
    def test_response_with_invalid_msg(self):
940
        self.conn.reply_data = b'aaaxxxx'
941
942
        self.assertRaises(XfrinTestException,
                          self.conn._handle_xfrin_responses)
943

944
    def test_response_with_tsigfail(self):
945
        self.conn._tsig_key = TSIG_KEY
946
947
        # server tsig check fail, return with RCODE 9 (NOTAUTH)
        self.conn._send_query(RRType.SOA())
948
949
950
951
        self.conn.reply_data = \
            self.conn.create_response_data(rcode=Rcode.NOTAUTH())
        self.assertRaises(XfrinProtocolError,
                          self.conn._handle_xfrin_responses)
952

953
    def test_response_without_end_soa(self):
954
        self.conn._send_query(RRType.AXFR())
955
        self.conn.reply_data = self.conn.create_response_data()
956
957
958
959
        # This should result in timeout in the asyncore loop.  We emulate
        # that situation in recv() by emptying the reply data buffer.
        self.assertRaises(XfrinTestTimeoutException,
                          self.conn._handle_xfrin_responses)
960
961

    def test_response_bad_qid(self):
962
        self.conn._send_query(RRType.AXFR())
963
        self.conn.reply_data = self.conn.create_response_data(bad_qid=True)
964
965
        self.assertRaises(XfrinProtocolError,
                          self.conn._handle_xfrin_responses)
966

967
968
969
970
971
972
973
974
975
    def test_response_error_code_bad_sig(self):
        self.conn._tsig_key = TSIG_KEY
        self.conn._tsig_ctx_creator = \
            lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
        self.conn._send_query(RRType.AXFR())
        self.conn.reply_data = self.conn.create_response_data(
                rcode=Rcode.SERVFAIL())
        # xfrin should check TSIG before other part of incoming message
        # validate log message for XfrinException
976
        self.__match_exception(XfrinProtocolError,
chenzhengzhang's avatar
chenzhengzhang committed
977
                               "TSIG verify fail: BADSIG",
978
                               self.conn._handle_xfrin_responses)
979
980
981
982
983
984

    def test_response_bad_qid_bad_key(self):
        self.conn._tsig_key = TSIG_KEY
        self.conn._tsig_ctx_creator = \
            lambda key: self.__create_mock_tsig(key, TSIGError.BAD_KEY)
        self.conn._send_query(RRType.AXFR())
985
        self.conn.reply_data = self.conn.create_response_data(bad_qid=True)
986
987
        # xfrin should check TSIG before other part of incoming message
        # validate log message for XfrinException
988
        self.__match_exception(XfrinProtocolError,
chenzhengzhang's avatar
chenzhengzhang committed
989
                               "TSIG verify fail: BADKEY",
990
                               self.conn._handle_xfrin_responses)
991

992
    def test_response_non_response(self):
993
        self.conn._send_query(RRType.AXFR())
994
995
        self.conn.reply_data = self.conn.create_response_data(response=False)
        self.assertRaises(XfrinException, self.conn._handle_xfrin_responses)
996
997

    def test_response_error_code(self):
998
        self.conn._send_query(RRType.AXFR())
999
        self.conn.reply_data = self.conn.create_response_data(
1000
            rcode=Rcode.SERVFAIL())
1001
1002
        self.assertRaises(XfrinProtocolError,
                          self.conn._handle_xfrin_responses)
1003
1004

    def test_response_multi_question(self):
1005
        self.conn._send_query(RRType.AXFR())
1006
        self.conn.reply_data = self.conn.create_response_data(
1007
            questions=[example_axfr_question, example_axfr_question])
1008
1009
        self.assertRaises(XfrinProtocolError,
                          self.conn._handle_xfrin_responses)
1010
1011

    def test_response_non_response(self):
1012
        self.conn._send_query(RRType.AXFR())
1013
        self.conn.reply_data = self.conn.create_response_data(response = False)
1014
1015
        self.assertRaises(XfrinProtocolError,
                          self.conn._handle_xfrin_responses)
1016
1017
1018
1019
1020
1021
1022
1023
1024

    def test_soacheck(self):
        # we need to defer the creation until we know the QID, which is
        # determined in _check_soa_serial(), so we use response_generator.
        self.conn.response_generator = self._create_soa_response_data
        self.assertEqual(self.conn._check_soa_serial(), XFRIN_OK)

    def test_soacheck_with_bad_response(self):
        self.conn.response_generator = self._create_broken_response_data
1025
        self.assertRaises(MessageTooShort, self.conn._check_soa_serial)
1026
1027
1028
1029

    def test_soacheck_badqid(self):
        self.soa_response_params['bad_qid'] = True
        self.conn.response_generator = self._create_soa_response_data
1030
        self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
1031

1032
1033
1034
1035
1036
1037
1038
1039
    def test_soacheck_bad_qid_bad_sig(self):
        self.conn._tsig_key = TSIG_KEY
        self.conn._tsig_ctx_creator = \
            lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
        self.soa_response_params['bad_qid'] = True
        self.conn.response_generator = self._create_soa_response_data
        # xfrin should check TSIG before other part of incoming message
        # validate log message for XfrinException
1040
        self.__match_exception(XfrinProtocolError,
chenzhengzhang's avatar
chenzhengzhang committed
1041
1042
                               "TSIG verify fail: BADSIG",
                               self.conn._check_soa_serial)
1043

1044
1045
1046
    def test_soacheck_non_response(self):
        self.soa_response_params['response'] = False
        self.conn.response_generator = self._create_soa_response_data
1047
        self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)