xfrin.py.in 82.9 KB
Newer Older
Likun Zhang's avatar
Likun Zhang committed
1
2
#!@PYTHON@

3
# Copyright (C) 2009-2013  Internet Systems Consortium.
Likun Zhang's avatar
Likun Zhang committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import sys; sys.path.append ('@@PYTHONPATH@@')
import os
import signal
import isc
import asyncore
import struct
import threading
import socket
import random
27
import time
28
from functools import reduce
29
from optparse import OptionParser, OptionValueError
Likun Zhang's avatar
Likun Zhang committed
30
from isc.config.ccsession import *
31
from isc.statistics import Counters
32
from isc.notify import notify_out
33
import isc.util.process
34
from isc.util.address_formatter import AddressFormatter
35
from isc.datasrc import DataSourceClient, ZoneFinder
Michal Vaner's avatar
Michal Vaner committed
36
import isc.net.parse
37
from isc.xfrin.diff import Diff
38
from isc.server_common.auth_command import auth_loadzone_command
39
from isc.server_common.tsig_keyring import init_keyring, get_keyring
40
from isc.log_messages.xfrin_messages import *
41
from isc.dns import *
42

43
# NOTE(review): buffer=True presumably holds log output back until the
# logging configuration is known -- confirm against isc.log.init().
isc.log.init("b10-xfrin", buffer=True)
logger = isc.log.Logger("xfrin")

# There are no system-wide debug level definitions yet, so the levels this
# module logs at are hard-coded from the logger's predefined trace levels.
DBG_PROCESS = logger.DBGLVL_TRACE_BASIC
DBG_COMMANDS = logger.DBGLVL_TRACE_DETAIL

# See isc.util.process.rename() for what this does to the running process.
isc.util.process.rename()

# Location of the spec files.  By default the installed copies are used;
# if B10_FROM_SOURCE / B10_FROM_BUILD is set in the environment, the files
# are taken from a directory relative to that instead.
SPECFILE_PATH = ("@datadir@/@PACKAGE@"
                 .replace("${datarootdir}", "@datarootdir@")
                 .replace("${prefix}", "@prefix@"))
# Capture the installed path for Auth *before* SPECFILE_PATH may be
# redirected to the source tree below; the two are overridden independently.
AUTH_SPECFILE_PATH = SPECFILE_PATH

if "B10_FROM_SOURCE" in os.environ:
    SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/xfrin"

if "B10_FROM_BUILD" in os.environ:
    AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"

SPECFILE_LOCATION = SPECFILE_PATH + "/xfrin.spec"
AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + "/auth.spec"

AUTH_MODULE_NAME = 'Auth'
XFROUT_MODULE_NAME = 'Xfrout'

# Remote module and identifiers (according to their spec files)
ZONE_MANAGER_MODULE_NAME = 'Zonemgr'

REFRESH_FROM_ZONEMGR = 'refresh_from_zonemgr'

# Debug level used for tracing the transfer state machine.
DBG_XFRIN_TRACE = logger.DBGLVL_TRACE_BASIC

# These two defaults are currently hard-coded.  Configuration does not need
# them, but optional command arguments do.
# (TODO: have similar support to get default values for command
# arguments as we do for config options)
DEFAULT_MASTER_PORT = 53
DEFAULT_ZONE_CLASS = RRClass.IN

__version__ = 'BIND10'

# Internal result codes of an xfr session
XFRIN_OK = 0                    # normal success
XFRIN_FAIL = 1                  # general failure (internal/external)

class XfrinException(Exception):
    '''Generic xfrin error.

    Among other things it signals internal bugs, e.g. an XfrinState
    handler being invoked in a state where it must not be.
    '''

class XfrinProtocolError(Exception):
    '''Raised when the master's XFR response violates the protocol
    (unexpected RR type, out-of-sync IXFR serials, trailing data, ...).
    '''

class XfrinZoneError(Exception):
    '''
    Raised when the received zone is too broken to be used.
    '''

class XfrinZoneUptodate(Exception):
    '''
    Signals that the local zone is already current, so nothing has to be
    downloaded.  This is not an error as such; it is an exception only
    because the control flow differs from a normal transfer.
    '''

class XfrinZoneInfoException(Exception):
    """Raised for bad zone configuration or bad command arguments.

    Examples: a required argument is missing, the zone's master address
    is not a valid IP address, the zone has no name, or the same zone is
    configured more than once.
    """

def _check_zone_name(zone_name_str):
    """Check that zone_name_str is a valid domain name and return it as a
    (downcased) Name object.

    Parameters:
      zone_name_str (str): textual zone name.

    Returns the isc.dns.Name for the zone, with all labels downcased.

    Raises XfrinZoneInfoException if the string cannot be parsed as a
    domain name.
    """
    try:
        # In the _zones dict, part of the key is the zone name,
        # but due to a limitation in the Name class, we
        # cannot directly use it as a dict key, and we use to_text()
        #
        # Downcase the name here for that reason.
        return Name(zone_name_str, True)
    except (EmptyLabel, TooLongLabel, BadLabelType, BadEscape,
            TooLongName, IncompleteName) as ne:
        raise XfrinZoneInfoException("bad zone name: " + zone_name_str +
                                     " (" + str(ne) + ")")

def _check_zone_class(zone_class_str):
    """Convert an optional textual RR class into an RRClass object.

    Parameters:
      zone_class_str (str or None): the class as text, e.g. 'IN'; None
        means no class was given.

    Returns RRClass(zone_class_str), or DEFAULT_ZONE_CLASS (RRClass.IN)
    when zone_class_str is None.

    Raises XfrinZoneInfoException if the text is not a valid class.
    """
    if zone_class_str is not None:
        try:
            return RRClass(zone_class_str)
        except InvalidRRClass as irce:
            raise XfrinZoneInfoException("bad zone class: " +
                                         zone_class_str + " (" +
                                         str(irce) + ")")
    return DEFAULT_ZONE_CLASS

def format_zone_str(zone_name, zone_class):
    """Render a zone as the string '<name>/<class>'.

    Parameters:
      zone_name (isc.dns.Name): the zone name; rendered via to_text(True),
        i.e. with the trailing dot omitted.
      zone_class (isc.dns.RRClass): the zone class; rendered via str().
    """
    name_part = zone_name.to_text(True)
    class_part = str(zone_class)
    return '/'.join((name_part, class_part))

def format_addrinfo(addrinfo):
    """Render an addrinfo tuple as a human readable address string.

    IPv4 addresses become '<addr>:<port>' and IPv6 addresses become
    '[<addr>]:<port>'.  For any other family (e.g. unix domain sockets)
    the third element of the tuple is simply converted with str().

    Parameters:
      addrinfo: a 3-tuple (address family, socket type, sockaddr), where
        sockaddr is an (address, port, ...) tuple for the inet families or
        e.g. a file name otherwise.

    Raises TypeError if addrinfo (or its sockaddr) is too short.
    """
    try:
        family = addrinfo[0]
        sockaddr = addrinfo[2]
        if family == socket.AF_INET6:
            return "[" + str(sockaddr[0]) + "]:" + str(sockaddr[1])
        if family == socket.AF_INET:
            return str(sockaddr[0]) + ":" + str(sockaddr[1])
        return str(sockaddr)
    except IndexError:
        raise TypeError("addrinfo argument to format_addrinfo() does not "
                        "appear to be consisting of (family, socktype, (addr, port))")

def get_soa_serial(soa_rdata):
    '''Return the SERIAL field of an SOA RDATA as a Serial object.

    The RDATA is rendered to text and the third whitespace-separated token
    (after MNAME and RNAME) is converted.  Efficiency is not a concern here,
    but this text-based approach could misparse an MNAME or RNAME whose
    labels contain an (escaped) space character; a direct accessor for the
    SOA fields would be preferable if one becomes available.
    '''
    tokens = soa_rdata.to_text().split()
    serial_text = tokens[2]
    return Serial(int(serial_text))

class XfrinState:
    '''
    The states of the incoming *XFR state machine.

    We handle both IXFR and AXFR with a single integrated state
    machine because they cannot be distinguished immediately - an AXFR
    response to an IXFR request can only be detected when the first two (2)
    response RRs have already been received.

    The following diagram summarizes the state transition.  After sending
    the query, xfrin starts the process with the InitialSOA state (all
    IXFR/AXFR responses begin with an SOA).  When it reaches IXFREnd
    or AXFREnd, the process successfully completes.

                             (AXFR or
            (recv SOA)        AXFR-style IXFR)  (SOA, add)
    InitialSOA------->FirstData------------->AXFR--------->AXFREnd
         |                |                  |  ^         (post xfr
         |(IXFR &&        |                  |  |        checks, then
         | recv SOA       |                  +--+        commit)
         | not new)       |            (non SOA, add)
         V                |
    IXFRUptodate          |                     (non SOA, delete)
               (pure IXFR,|                           +-------+
            keep handling)|             (Delete SOA)  V       |
                          + ->IXFRDeleteSOA------>IXFRDelete--+
                                   ^                   |
                (see SOA, not end, |          (see SOA)|
             apply, keep handling) |                   |
                                   |                   V
                      +---------IXFRAdd<----------+IXFRAddSOA
        (non SOA, add)|         ^  |    (Add SOA)
                      ----------+  |
                                   |(see SOA w/ end serial, commit changes)
                                   V
                                IXFREnd

    Note on when changes are committed.  Section 4 of RFC 1995 states:

       An IXFR client, should only replace an older version with a newer
       version after all the differences have been successfully processed.

    A single Diff object is used for the whole IXFR session: it is created
    on demand in the IXFRDeleteSOA state and then reused for every
    following "difference sequence" (the changes for one SOA update).
    When a difference sequence ends, its changes are only applied to the
    pending update (Diff.apply()); nothing is committed at that point.
    The commit happens once, in the connection's finish_transfer(), after
    the SOA with the end serial has been received and the final checks
    (the last TSIG, if required, and the zone data) have been performed.
    AXFR works the same way, with finish_transfer() called from the
    AXFREnd state.

    This implementation uses the state design pattern, where each state
    is represented as a subclass of the base XfrinState class.  Each concrete
    subclass of XfrinState is assumed to define two methods: handle_rr() and
    finish_message().  These methods handle specific parts of XFR protocols
    and (if necessary) perform the state transition.

    Conceptually, XfrinState and its subclasses are a "friend" of
    XfrinConnection and are assumed to be allowed to access its internal
    information (even though Python does not have a strict access control
    between different classes).

    The XfrinState and its subclasses are designed to be stateless, and
    can be used as singleton objects.  For now, however, we always instantiate
    a new object for every state transition, partly because introducing
    singletons would make the code a bit more complicated, and partly because
    the overhead of object instantiation wouldn't be significant for xfrin.

    '''
    def set_xfrstate(self, conn, new_state):
        '''Switch the XfrinConnection conn to new_state.

        As a "friend" class, this method intentionally gets access to the
        connection's "private" method.

        '''
        conn._XfrinConnection__set_xfrstate(new_state)

    def handle_rr(self, conn, rr=None):
        '''Handle one RR of an XFR response message.

        Depending on the state, the RR is generally added or deleted in the
        corresponding data source, or in some special cases indicates
        a specific transition, such as starting a new IXFR difference
        sequence or completing the session.

        All subclasses have their specific behaviors for this method, so
        there is no default definition.  If the base class version
        is called, it's a bug of the caller, and it's notified via
        an XfrinException exception.  The rr parameter matches the
        subclasses' signature (handle_rr(conn, rr)); it defaults to None
        so that the historical handle_rr(conn) call form also still reaches
        the XfrinException below instead of failing with a TypeError.

        This method returns a boolean value: True if the given RR was
        fully handled and the caller should go to the next RR; False
        if the caller needs to call this method with the (possibly) new
        state for the same RR again.

        '''
        raise XfrinException("Internal bug: " +
                             "XfrinState.handle_rr() called directly")

    def finish_message(self, conn):
        '''Perform any final processing after handling all RRs of a response.

        This method then returns a boolean indicating whether to continue
        receiving the message.  Unless it's in the end of the entire XFR
        session, we should continue, so this default method simply returns
        True.

        '''
        return True

class XfrinInitialSOA(XfrinState):
    '''Start state: expects the leading SOA of the XFR response.'''
    def handle_rr(self, conn, rr):
        '''Consume the leading SOA and record its serial as the end serial.

        For IXFR, if the end serial is not greater than the serial we
        requested, the zone is up to date and we go to IXFRUptodate;
        otherwise we go to FirstData.  Always returns True.

        Raises XfrinProtocolError if rr is not an SOA.
        '''
        rrtype = rr.get_type()
        if rrtype != RRType.SOA:
            raise XfrinProtocolError('First RR in zone transfer must be SOA ('
                                     + rrtype.to_text() + ' received)')

        conn._end_serial = get_soa_serial(rr.get_rdata()[0])

        already_current = (conn._request_type == RRType.IXFR and
                           conn._end_serial <= conn._request_serial)
        if already_current:
            logger.info(XFRIN_IXFR_UPTODATE, conn.zone_str(),
                        conn._request_serial, conn._end_serial)
            next_state = XfrinIXFRUptodate()
        else:
            next_state = XfrinFirstData()
        self.set_xfrstate(conn, next_state)
        return True

class XfrinFirstData(XfrinState):
    '''State for the RR right after the initial SOA.'''
    def handle_rr(self, conn, rr):
        '''Handle the first RR after initial SOA in an XFR session.

        This state happens exactly once in an XFR session, where
        we decide whether it's an incremental update ("real" IXFR) or a
        non-incremental update (AXFR or AXFR-style IXFR).
        If we initiated IXFR and the transfer begins with two SOAs
        (the serial of the second one being equal to our serial),
        it's incremental; otherwise it's non-incremental.

        This method always returns False (unlike many other handle_rr()
        methods) because this first RR must be examined again in the
        determined update context.

        Note that in the non-incremental case the RR should normally be
        something other than SOA, but it's still possible it's an SOA with a
        different serial than ours.  The only possible interpretation at
        this point is that it's a non-incremental update that only consists
        of the SOA RR.  It will result in a broken zone (for example, it
        wouldn't even contain an apex NS) and should be rejected at post
        XFR processing, but in terms of the XFR session processing we
        accept it and move forward.

        Note further that, in the half-broken SOA-only transfer case,
        these two SOAs are supposed to be the same as stated in Section 2.2
        of RFC 5936.  We don't check that condition here, either; we'll
        leave whether and how to deal with that situation to the end of
        the processing of non-incremental update.  See also a related
        discussion at the IETF dnsext wg:
        http://www.ietf.org/mail-archive/web/dnsext/current/msg07908.html

        '''
        # The SOA serial is only looked at when the RR really is an SOA
        # (short-circuit evaluation).
        incremental = (conn._request_type == RRType.IXFR and
                       rr.get_type() == RRType.SOA and
                       conn._request_serial ==
                       get_soa_serial(rr.get_rdata()[0]))

        if not incremental:
            logger.debug(DBG_XFRIN_TRACE, XFRIN_GOT_NONINCREMENTAL_RESP,
                         conn.zone_str())
            # We are now going to add RRs to the new zone, so we need a
            # Diff object.  It is used throughout the XFR session.
            conn._diff = Diff(conn._datasrc_client, conn._zone_name, True)
            self.set_xfrstate(conn, XfrinAXFR())
            return False

        logger.debug(DBG_XFRIN_TRACE, XFRIN_GOT_INCREMENTAL_RESP,
                     conn.zone_str())
        conn._diff = None  # created on demand by the IXFRDeleteSOA state
        self.set_xfrstate(conn, XfrinIXFRDeleteSOA())
        return False

class XfrinIXFRDeleteSOA(XfrinState):
    '''First state of an IXFR difference sequence: the SOA to delete.'''
    def handle_rr(self, conn, rr):
        '''Queue deletion of the old SOA and move to IXFRDelete.

        This starts one difference sequence (the changes for one SOA
        update), so the session's Diff object is created here if it does
        not exist yet.  Journaling is requested unconditionally; the Diff
        constructor may still disable it internally if the underlying
        data source doesn't support journaling.  Always returns True.

        Raises XfrinException if rr is not an SOA, which would indicate an
        internal bug.
        '''
        rrtype = rr.get_type()
        if rrtype != RRType.SOA:
            raise XfrinException(rrtype.to_text() +
                                 ' RR is given in IXFRDeleteSOA state')

        diff = conn._diff
        if diff is None:
            diff = Diff(conn._datasrc_client, conn._zone_name, False, True)
            conn._diff = diff
        diff.delete_data(rr)
        self.set_xfrstate(conn, XfrinIXFRDelete())
        conn.get_transfer_stats().ixfr_deletion_count += 1
        return True

class XfrinIXFRDelete(XfrinState):
    '''Collects the RRs deleted by the current difference sequence.'''
    def handle_rr(self, conn, rr):
        '''Queue deletion of a non-SOA RR; an SOA switches to IXFRAddSOA.

        Returns True for a consumed (deleted) RR, and False for an SOA,
        which must be re-examined in the IXFRAddSOA state.
        '''
        if rr.get_type() != RRType.SOA:
            conn._diff.delete_data(rr)
            conn.get_transfer_stats().ixfr_deletion_count += 1
            return True

        # The only place where the serial of the current sequence is set.
        conn._current_serial = get_soa_serial(rr.get_rdata()[0])
        self.set_xfrstate(conn, XfrinIXFRAddSOA())
        return False

class XfrinIXFRAddSOA(XfrinState):
    '''Start of the "add" half of a difference sequence: the new SOA.'''
    def handle_rr(self, conn, rr):
        '''Queue addition of the new SOA and move to IXFRAdd.

        Always returns True.  Raises XfrinException if rr is not an SOA,
        which would indicate an internal bug.
        '''
        rrtype = rr.get_type()
        if rrtype != RRType.SOA:
            raise XfrinException(rrtype.to_text() +
                                 ' RR is given in IXFRAddSOA state')

        conn._diff.add_data(rr)
        self.set_xfrstate(conn, XfrinIXFRAdd())
        conn.get_transfer_stats().ixfr_addition_count += 1
        return True

class XfrinIXFRAdd(XfrinState):
    '''Collects the RRs added by the current difference sequence.'''
    def handle_rr(self, conn, rr):
        '''Queue addition of a non-SOA RR; an SOA ends the sequence.

        For an SOA: if its serial is the end serial, the whole transfer is
        finished (conn.finish_transfer()) and we go to IXFREnd, returning
        True.  If it equals the current sequence's serial, the sequence's
        changes are applied (not committed) and we go back to
        IXFRDeleteSOA, returning False so the SOA is handled again there.

        Raises XfrinProtocolError for any other SOA serial.
        '''
        if rr.get_type() != RRType.SOA:
            conn._diff.add_data(rr)
            conn.get_transfer_stats().ixfr_addition_count += 1
            return True

        # This SOA marks the end of a difference sequence.
        conn.get_transfer_stats().ixfr_changeset_count += 1
        soa_serial = get_soa_serial(rr.get_rdata()[0])

        if soa_serial == conn._end_serial:
            # The final part is there.  Finish the transfer by checking the
            # last TSIG (if required), the zone data and committing.
            conn.finish_transfer()
            self.set_xfrstate(conn, XfrinIXFREnd())
            return True

        if soa_serial != conn._current_serial:
            raise XfrinProtocolError('IXFR out of sync: expected ' +
                                     'serial ' +
                                     str(conn._current_serial) +
                                     ', got ' + str(soa_serial))

        # Push this sequence's changes to the database without committing:
        # we can't yet know whether the message is/will be properly signed.
        # The single commit happens after the last part.
        conn._diff.apply()
        self.set_xfrstate(conn, XfrinIXFRDeleteSOA())
        return False

class XfrinIXFREnd(XfrinState):
    '''Terminal state of a successful incremental transfer.'''
    def handle_rr(self, conn, rr):
        '''No RR may follow the final SOA; raises XfrinProtocolError.'''
        raise XfrinProtocolError('Extra data after the end of IXFR diffs: ' +
                                 rr.to_text())

    def finish_message(self, conn):
        '''Final processing after an entire IXFR session.

        The transfer was already finished (conn.finish_transfer()) when
        the final SOA was handled in the IXFRAdd state, so nothing is left
        to do.  Returns False: no more messages are to be received.

        '''
        return False

class XfrinIXFRUptodate(XfrinState):
    '''Reached when the IXFR response shows our zone is already current.'''
    def handle_rr(self, conn, rr):
        '''Only the single SOA is expected; raises XfrinProtocolError.'''
        raise XfrinProtocolError('Extra data after single IXFR response ' +
                                 rr.to_text())

    def finish_message(self, conn):
        '''Report the up-to-date condition by raising XfrinZoneUptodate.'''
        raise XfrinZoneUptodate

class XfrinAXFR(XfrinState):
    '''Receives the body of a non-incremental (AXFR-style) transfer.'''
    def handle_rr(self, conn, rr):
        """
        Add the RR to the new zone; a trailing SOA ends the transfer.

        The closing SOA is added as well, but nothing is committed here:
        that is left to AXFREnd after the post-transfer checks.  A warning
        is logged if its serial differs from the leading SOA's.  Always
        returns True, as every RR is consumed.
        """
        conn._diff.add_data(rr)

        if rr.get_type() == RRType.SOA:
            closing_serial = get_soa_serial(rr.get_rdata()[0])
            if conn._end_serial != closing_serial:
                logger.warn(XFRIN_AXFR_INCONSISTENT_SOA, conn.zone_str(),
                            conn._end_serial, closing_serial)
            self.set_xfrstate(conn, XfrinAXFREnd())

        conn.get_transfer_stats().axfr_rr_count += 1
        return True

class XfrinAXFREnd(XfrinState):
    '''Terminal state of a non-incremental transfer.'''
    def handle_rr(self, conn, rr):
        '''No RR may follow the closing SOA; raises XfrinProtocolError.'''
        raise XfrinProtocolError('Extra data after the end of AXFR: ' +
                                 rr.to_text())

    def finish_message(self, conn):
        """
        Final processing after an entire AXFR session.

        Delegates to conn.finish_transfer(), which checks the TSIG (if
        required) and the zone data and then commits.  Returns False: no
        more messages are to be received.
        """
        conn.finish_transfer()
        return False

class XfrinTransferStats:
    """
    Record transfer data for logging purposes.

    It records the number of messages, RRs and bytes transferred, as well
    as the start and end time.  The start time is set upon instantiation of
    this class.  The end time is set the first time finalize(),
    get_running_time(), or get_bytes_per_second() is called.  The end time
    is set only once; subsequent calls to any of these methods do not
    modify it further.
    All *_count instance variables can be directly set as needed by the
    class collecting these results.
    """
    def __init__(self):
        self.message_count = 0
        self.axfr_rr_count = 0
        self.byte_count = 0
        self.ixfr_changeset_count = 0
        self.ixfr_deletion_count = 0
        self.ixfr_addition_count = 0
        self._start_time = time.time()
        self._end_time = None

    def finalize(self):
        """Set the end time to time.time() unless it is already set."""
        if self._end_time is None:
            self._end_time = time.time()

    def get_running_time(self):
        """Call finalize(), then return the time in seconds between
           creation and finalization."""
        self.finalize()
        return self._end_time - self._start_time

    def get_bytes_per_second(self):
        """Return the number of bytes per second, based on the result of
           get_running_time() and the value of byte_count."""
        runtime = self.get_running_time()
        if runtime > 0.0:
            return float(self.byte_count) / runtime
        # This should never happen, but if some clock is so off or reset
        # in the meantime, we do need to return *something* (and not
        # raise an error).
        if self.byte_count == 0:
            return 0.0
        return float("inf")


class XfrinConnection(asyncore.dispatcher):
    '''Do xfrin in this class. '''

    def __init__(self,
                 sock_map, zone_name, rrclass, datasrc_client,
                 shutdown_event, master_addrinfo, zone_soa, tsig_key=None,
                 idle_timeout=60):
        """Constructor of the XfrinConnection class.

        Parameters:
          sock_map: empty dict, used with asyncore.
          zone_name (dns.Name): Zone name.
          rrclass (dns.RRClass): Zone RR class.
          datasrc_client (DataSourceClient): the data source client object
            used for the XFR session.
          shutdown_event (threading.Event): used for synchronization with
            parent thread.
          master_addrinfo (tuple: (sock family, sock type, sockaddr)):
            address and port of the master server.
          zone_soa (RRset or None): SOA RRset of zone's current SOA or None
            if it's not available.
          tsig_key: key handed to the TSIG context creator to sign queries
            and verify responses; None means TSIG is not used.
          idle_timeout (int): max idle time for read data from socket.

        """

        asyncore.dispatcher.__init__(self, map=sock_map)

        # The XFR state.  Conceptually this is purely private, so we emphasize
        # the fact by the double underscore.  Other classes are assumed to
        # get access to this via get_xfrstate(), and only XfrinState classes
        # are assumed to be allowed to modify it via __set_xfrstate().
        self.__state = None

        # Requested transfer type (RRType.AXFR or RRType.IXFR).  The actual
        # transfer type may differ due to IXFR->AXFR fallback:
        self._request_type = None

        # Zone parameters
        self._zone_name = zone_name
        self._rrclass = rrclass

        # Data source handler
        self._datasrc_client = datasrc_client
        self._zone_soa = zone_soa

        self._sock_map = sock_map
        self._soa_rr_count = 0
        self._idle_timeout = idle_timeout
        self._shutdown_event = shutdown_event
        self._master_addrinfo = master_addrinfo
        self._tsig_key = tsig_key
        self._tsig_ctx = None
        # tsig_ctx_creator is introduced to allow tests to use a mock class for
        # easier tests (in normal case we always use the default)
        self._tsig_ctx_creator = lambda key : TSIGContext(key)

        # keep a record of this specific transfer to log on success
        # (time, rr/s, etc)
        self._transfer_stats = XfrinTransferStats()
        self._counters = Counters(SPECFILE_LOCATION)

    def init_socket(self):
        '''Initialize the underlying socket.

        This is essentially a part of __init__() and is expected to be
        called immediately after the constructor.  It's separated from
        the constructor because otherwise we might not be able to close
        it if the constructor raises an exception after opening the socket.
        '''
        self.create_socket(self._master_addrinfo[0], self._master_addrinfo[1])
        self.socket.setblocking(1)

    def __set_xfrstate(self, new_state):
        self.__state = new_state

    def get_xfrstate(self):
        return self.__state

    def get_transfer_stats(self):
        """Returns the transfer stats object, used to measure transfer time,
           and number of messages/records/bytes transferred."""
        return self._transfer_stats

    def zone_str(self):
        '''A convenience function for logging to include zone name and class'''
        return format_zone_str(self._zone_name, self._rrclass)

    def connect_to_master(self):
        '''Connect to master in TCP.

        Returns True on success; on socket.error it logs the failure and
        returns False.
        '''

        try:
            self.connect(self._master_addrinfo[2])
            return True
        except socket.error as e:
            logger.error(XFRIN_CONNECT_MASTER, self._master_addrinfo[2],
                         str(e))
            return False

    def _create_query(self, query_type):
        '''Create an XFR-related query message.

        query_type is either SOA, AXFR or IXFR.  An IXFR query needs the
        zone's current SOA record.  If it's not known, it raises an
        XfrinException exception.  Note that this is not necessarily a
        sign of a broken configuration; for the first attempt of transfer
        the secondary may not have any boot-strap zone information, in which
        case IXFR simply won't work.  The xfrin should then fall back to AXFR.
        _request_serial is recorded for later use.

        '''
        msg = Message(Message.RENDER)
        query_id = random.randint(0, 0xFFFF)
        self._query_id = query_id
        msg.set_qid(query_id)
        msg.set_opcode(Opcode.QUERY)
        msg.set_rcode(Rcode.NOERROR)
        msg.add_question(Question(self._zone_name, self._rrclass, query_type))

        # Remember our serial, if known
        self._request_serial = get_soa_serial(self._zone_soa.get_rdata()[0]) \
            if self._zone_soa is not None else None

        # Set the authority section with our SOA for IXFR
        if query_type == RRType.IXFR:
            if self._zone_soa is None:
                # (incremental) IXFR doesn't work without known SOA
                raise XfrinException('Failed to create IXFR query due to no ' +
                                     'SOA for ' + self.zone_str())
            msg.add_rrset(Message.SECTION_AUTHORITY, self._zone_soa)

        return msg

    def _send_data(self, data):
        '''Send the whole of data over the connection.

        send() may accept only part of the buffer, so this loops until
        everything has been written.  asyncore.dispatcher.send() reports a
        reset/closed connection by returning 0 instead of raising; since
        the socket is blocking and the remaining slice is never empty, a
        0 return means no further progress is possible, and without this
        check the loop would spin forever.

        Raises XfrinException if the connection stops accepting data.
        '''
        size = len(data)
        total_count = 0
        while total_count < size:
            count = self.send(data[total_count:])
            if not count:
                raise XfrinException('failed to send data to master: ' +
                                     'connection closed')
            total_count += count

    def _send_query(self, query_type):
        '''Send query message over TCP. '''

        msg = self._create_query(query_type)
        render = MessageRenderer()
        # XXX Currently, python wrapper doesn't accept 'None' parameter in this
        # case, we should remove the if statement and use a universal
        # interface later.
        if self._tsig_key is not None:
            self._tsig_ctx = self._tsig_ctx_creator(self._tsig_key)
            msg.to_wire(render, self._tsig_ctx)
        else:
            msg.to_wire(render)

        # Over TCP each DNS message is preceded by its length as a 16-bit
        # integer in network byte order.
        header_len = struct.pack('!H', render.get_length())
        self._send_data(header_len)
        self._send_data(render.get_data())

    def _asyncore_loop(self):
        '''
        This method is a trivial wrapper for asyncore.loop().  It's extracted from
        _get_request_response so that we can test the rest of the code without
        involving actual communication with a remote server.'''
        asyncore.loop(self._idle_timeout, map=self._sock_map, count=1)

    def _get_request_response(self, size):
        '''Read exactly size bytes from the master and return them.

        Each iteration runs one asyncore event round.  handle_read() clears
        _recv_time_out when it has read something, so if the flag is still
        set afterwards nothing arrived within _idle_timeout and an
        XfrinException is raised.
        '''
        recv_size = 0
        data = b''
        while recv_size < size:
            self._recv_time_out = True
            self._need_recv_size = size - recv_size
            self._asyncore_loop()
            if self._recv_time_out:
                raise XfrinException('receive data from socket time out.')

            recv_size += self._recvd_size
            data += self._recvd_data

        return data

    def _check_response_tsig(self, msg, response_data):
        '''Verify the TSIG of a response against the TSIG state of the query.

        Raises XfrinProtocolError if verification fails, or if the response
        is signed while our query was not.
        '''
        tsig_record = msg.get_tsig_record()
        if self._tsig_ctx is not None:
            tsig_error = self._tsig_ctx.verify(tsig_record, response_data)
            if tsig_error != TSIGError.NOERROR:
                raise XfrinProtocolError('TSIG verify fail: %s' %
                                         str(tsig_error))
        elif tsig_record is not None:
            # If the response includes a TSIG while we didn't sign the query,
            # we treat it as an error.  RFC doesn't say anything about this
            # case, but it clearly states the server must not sign a response
            # to an unsigned request.  Although we could be flexible, no sane
            # implementation would return such a response, and since this is
            # part of security mechanism, it's probably better to be more
            # strict.
            raise XfrinProtocolError('Unexpected TSIG in response')

    def _check_response_tsig_last(self):
        """
        Check there's a signature at the last message.
        """
        if self._tsig_ctx is not None:
            if not self._tsig_ctx.last_had_signature():
                raise XfrinProtocolError('TSIG verify fail: no TSIG on last '+
                                         'message')

    def __validate_error(self, reason):
        '''
        Used as error callback below.
        '''
        logger.error(XFRIN_ZONE_INVALID, self._zone_name, self._rrclass,
                     reason)

    def __validate_warning(self, reason):
        '''
        Used as warning callback below.
        '''
        logger.warn(XFRIN_ZONE_WARN, self._zone_name, self._rrclass, reason)

    def finish_transfer(self):
        """
        Perform any necessary checks after a transfer. Then complete the
        transfer by committing the transaction into the data source.

        Raises XfrinProtocolError if the last message lacked a required
        TSIG, and XfrinZoneError if the new zone fails validation.
        """
        self._check_response_tsig_last()
        if not check_zone(self._zone_name, self._rrclass,
                          self._diff.get_rrset_collection(),
                          (self.__validate_error, self.__validate_warning)):
            raise XfrinZoneError('Validation of the new zone failed')
        self._diff.commit()

    def __parse_soa_response(self, msg, response_data):
        '''Parse a response to SOA query and extract the SOA from answer.

        This is a subroutine of _check_soa_serial().  This method also
        validates message, and rejects bogus responses with XfrinProtocolError.

        If everything is okay, it returns the SOA RR from the answer section
        of the response.

        '''
        # Check TSIG integrity and validate the header.  Unlike AXFR/IXFR,
        # we should be more strict for SOA queries and check the AA flag, too.
        self._check_response_tsig(msg, response_data)
        self._check_response_header(msg)
        if not msg.get_header_flag(Message.HEADERFLAG_AA):
            raise XfrinProtocolError('non-authoritative answer to SOA query')

        # Validate the question section
        n_question = msg.get_rr_count(Message.SECTION_QUESTION)
        if n_question != 1:
            raise XfrinProtocolError('Invalid response to SOA query: ' +
                                     '(' + str(n_question) + ' questions, 1 ' +
                                     'expected)')
        resp_question = msg.get_question()[0]
        if resp_question.get_name() != self._zone_name or \
                resp_question.get_class() != self._rrclass or \
                resp_question.get_type() != RRType.SOA:
            raise XfrinProtocolError('Invalid response to SOA query: '
                                     'question mismatch: ' +
                                     str(resp_question))

        # Look into the answer section for SOA
        soa = None
        for rr in msg.get_section(Message.SECTION_ANSWER):
            if rr.get_type() == RRType.SOA:
                if soa is not None:
                    raise XfrinProtocolError('SOA response had multiple SOAs')
                soa = rr
            # There should not be a CNAME record at top of zone.
            if rr.get_type() == RRType.CNAME:
                raise XfrinProtocolError('SOA query resulted in CNAME')

        # If SOA is not found, try to figure out the reason then report it.
        if soa is None:
            # See if we have any SOA records in the authority section.
            for rr in msg.get_section(Message.SECTION_AUTHORITY):
                if rr.get_type() == RRType.NS:
                    raise XfrinProtocolError('SOA query resulted in referral')
                if rr.get_type() == RRType.SOA:
                    raise XfrinProtocolError('SOA query resulted in NODATA')
            raise XfrinProtocolError('No SOA record found in response to ' +
                                     'SOA query')

        # Check if the SOA is really what we asked for
        if soa.get_name() != self._zone_name or \
                soa.get_class() != self._rrclass:
            raise XfrinProtocolError("SOA response doesn't match query: " +
                                     str(soa))

        # All okay, return it
        return soa

    def _get_ipver_str(self):
        """Returns a 'v4' or 'v6' string representing a IP version
        depending on the socket family. This is for an internal use
        only (except for tests). This is supported only for IP sockets.
        It raises a ValueError exception on other address families.

        """
        if self.socket.family == socket.AF_INET:
            return 'v4'
        elif self.socket.family == socket.AF_INET6:
            return 'v6'
        raise ValueError("Invalid address family. "
                         "This is supported only for IP sockets")

    def _check_soa_serial(self):
        '''Send SOA query and compare the local and remote serials.

        If we know our local serial and the remote serial isn't newer
        than ours, we abort the session with XfrinZoneUptodate.
        On success it returns XFRIN_OK for testing.  The caller won't use it.

        '''

        self._send_query(RRType.SOA)
        # count soaoutv4 or soaoutv6 requests
        self._counters.inc('zones', self._zone_name.to_text(),
                           'soaout' + self._get_ipver_str())
        # The response is prefixed by its length as a 16-bit integer in
        # network byte order.
        data_len = self._get_request_response(2)
        msg_len = struct.unpack('!H', data_len)[0]
        soa_response = self._get_request_response(msg_len)
        msg = Message(Message.PARSE)
        msg.from_wire(soa_response, Message.PRESERVE_ORDER)

        # Validate/parse the rest of the response, and extract the SOA
        # from the answer section
        soa = self.__parse_soa_response(msg, soa_response)

        # Compare the two serials.  If ours is 'new', abort with ZoneUptodate.
        primary_serial = get_soa_serial(soa.get_rdata()[0])
        if self._request_serial is not None and \
                self._request_serial >= primary_serial:
            if self._request_serial != primary_serial:
                logger.info(XFRIN_ZONE_SERIAL_AHEAD, primary_serial,
                            self.zone_str(),
                            format_addrinfo(self._master_addrinfo),
                            self._request_serial)
            raise XfrinZoneUptodate

        return XFRIN_OK

    def do_xfrin(self, check_soa, request_type=RRType.AXFR):
        '''Do an xfr session by sending xfr request and parsing responses.

        Returns XFRIN_OK on success (including the "zone is up to date"
        case) and XFRIN_FAIL otherwise; failures are logged, not raised.
        '''

        try:
            ret = XFRIN_OK
            self._request_type = request_type
            req_str = request_type.to_text()
            if check_soa:
                self._check_soa_serial()
                self.close()
                self.init_socket()
                if not self.connect_to_master():
                    raise XfrinException('Unable to reconnect to master')

            # start statistics timer
            # Note: If the timer for the zone is already started but
            # not yet stopped due to some error, the last start time
            # is overwritten at this point.
            self._counters.start_timer('zones', self._zone_name.to_text(),
                                       'last_' + req_str.lower() + '_duration')
            logger.info(XFRIN_XFR_TRANSFER_STARTED, req_str, self.zone_str())
            # An AXFR or IXFR is being requested.
            self._counters.inc('zones', self._zone_name.to_text(),
                               req_str.lower() + 'req' + self._get_ipver_str())
            self._send_query(self._request_type)
            self.__state = XfrinInitialSOA()
            self._handle_xfrin_responses()
            # Depending what data was found, we log different status reports
            # (In case of an AXFR-style IXFR, print the 'AXFR' message)
            if self._transfer_stats.axfr_rr_count == 0:
                logger.info(XFRIN_IXFR_TRANSFER_SUCCESS,
                            self.zone_str(),
                            self._transfer_stats.message_count,
                            self._transfer_stats.ixfr_changeset_count,
                            self._transfer_stats.ixfr_deletion_count,
                            self._transfer_stats.ixfr_addition_count,
                            self._transfer_stats.byte_count,
                            "%.3f" % self._transfer_stats.get_running_time(),
                            "%.f" % self._transfer_stats.get_bytes_per_second()
                           )
            else:
                logger.info(XFRIN_TRANSFER_SUCCESS,
                            req_str,
                            self.zone_str(),
                            self._transfer_stats.message_count,
                            self._transfer_stats.axfr_rr_count,
                            self._transfer_stats.byte_count,
                            "%.3f" % self._transfer_stats.get_running_time(),
                            "%.f" % self._transfer_stats.get_bytes_per_second()
                           )

        except XfrinZoneUptodate:
            # Eventually we'll probably have to treat this case as a trigger
            # of trying another primary server, etc, but for now we treat it
            # as "success".
            pass
        except XfrinZoneError:
            # The log message doesn't contain the exception text, since there's
            # only one place where the exception is thrown now and it'd be the
            # same generic message every time.
            logger.error(XFRIN_INVALID_ZONE_DATA, self.zone_str(),
                         format_addrinfo(self._master_addrinfo))
            ret = XFRIN_FAIL
        except XfrinProtocolError as e:
            logger.info(XFRIN_XFR_TRANSFER_PROTOCOL_VIOLATION, req_str,
                        self.zone_str(),
                        format_addrinfo(self._master_addrinfo), str(e))
            ret = XFRIN_FAIL
        except XfrinException as e:
            logger.error(XFRIN_XFR_TRANSFER_FAILURE, req_str,
                         self.zone_str(),
                         format_addrinfo(self._master_addrinfo), str(e))
            ret = XFRIN_FAIL
        except Exception as e:
            # Catching all possible exceptions like this is generally not a
            # good practice, but handling an xfr session could result in
            # so many types of exceptions, including ones from the DNS library
            # or from the data source library.  Eventually we'd introduce a
            # hierarchy for exception classes from a base "ISC exception" and
            # catch it here, but until then we need broadest coverage so that
            # we won't miss anything.

            logger.error(XFRIN_XFR_OTHER_FAILURE, req_str,
                         self.zone_str(), str(e))
            ret = XFRIN_FAIL
        finally:
            # A xfrsuccess or xfrfail counter is incremented depending on
            # the result.
            result = {XFRIN_OK: 'xfrsuccess', XFRIN_FAIL: 'xfrfail'}[ret]
            self._counters.inc('zones', self._zone_name.to_text(), result)
            # The started statistics timer is finally stopped only in
            # a successful case.
            if ret == XFRIN_OK:
                self._counters.stop_timer('zones',
                                          self._zone_name.to_text(),
                                          'last_' + req_str.lower() +
                                          '_duration')
            # Make sure any remaining transaction in the diff is closed
            # (if not yet - possible in case of xfr-level exception) as soon
            # as possible
            self._diff = None
        return ret

    def _check_response_header(self, msg):
        '''Perform minimal validation on responses'''

        # It's not clear how strict we should be about response validation.
        # BIND 9 ignores some cases where it would normally be considered a
        # bogus response.  For example, it accepts a response even if its
        # opcode doesn't match that of the corresponding request.
        # According to an original developer of BIND 9 some of the missing
        # checks are deliberate to be kind to old implementations that would
        # cause interoperability trouble with stricter checks.

        msg_rcode = msg.get_rcode()
        if msg_rcode != Rcode.NOERROR:
            raise XfrinProtocolError('error response: %s' %
                                     msg_rcode.to_text())

        if not msg.get_header_flag(Message.HEADERFLAG_QR):
            raise XfrinProtocolError('response is not a response')

        if msg.get_qid() != self._query_id:
            raise XfrinProtocolError('bad query id')

    def _check_response_status(self, msg):
        '''Check validation of xfr response. '''

        self._check_response_header(msg)

        if msg.get_rr_count(Message.SECTION_QUESTION) > 1:
            raise XfrinProtocolError('query section count greater than 1')

    def _handle_xfrin_responses(self):
        '''Read and process response messages until the state machine is done.

        Each message is length-prefixed (16-bit, network byte order); every
        answer RR is fed to the current XFR state until that state reports
        it handled.  Raises XfrinException if the shutdown event is set.
        '''
        read_next_msg = True
        while read_next_msg:
            data_len = self._get_request_response(2)
            msg_len = struct.unpack('!H', data_len)[0]
            self._transfer_stats.byte_count += msg_len + 2
            recvdata = self._get_request_response(msg_len)
            msg = Message(Message.PARSE)
            msg.from_wire(recvdata, Message.PRESERVE_ORDER)
            self._transfer_stats.message_count += 1

            # TSIG related checks, including an unexpected signed response
            self._check_response_tsig(msg, recvdata)

            # Perform response status validation
            self._check_response_status(msg)

            for rr in msg.get_section(Message.SECTION_ANSWER):
                rr_handled = False
                while not rr_handled:
                    rr_handled = self.__state.handle_rr(self, rr)

            read_next_msg = self.__state.finish_message(self)

            if self._shutdown_event.is_set():
                raise XfrinException('xfrin is forced to stop')

    def handle_read(self):
        '''Read query's response from socket. '''

        self._recvd_data = self.recv(self._need_recv_size)
        self._recvd_size = len(self._recvd_data)
        self._recv_time_out = False

    def writable(self):
        '''Ignore the writable socket. '''

        return False

1077
1078
1079
1080
def _get_zone_soa(datasrc_client, zone_name, zone_class):
    """Retrieve the current SOA RR of the zone to be transferred.

    This function is essentially private to the module, but will also
1081
1082
    be called (or tweaked) from tests; no one else should use this
    function directly.
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099

    It will be used for various purposes in subsequent xfr protocol
    processing.   It is validly possible that the zone is currently
    empty and therefore doesn't have an SOA, so this method doesn't
    consider it an error and returns None in such a case.  It may or
    may not result in failure in the actual processing depending on
    how the SOA is used.

    When the zone has an SOA RR, this method makes sure that it's
    valid, i.e., it has exactly one RDATA; if it is not the case
    this method returns None.

    If the underlying data source doesn't even know the zone, this method
    tries to provide backward compatible behavior where xfrin is
    responsible for creating zone in the corresponding DB table.
    For a longer term we should deprecate this behavior by introducing
    more generic zone management framework, but at the moment we try
1100
    to not surprise existing users.
1101
1102

    """
1103
1104
1105
1106
1107
    # datasrc_client should never be None in production case (only tests could
    # specify None)
    if datasrc_client is None:
        return None

1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
    # get the zone finder.  this must be SUCCESS (not even
    # PARTIALMATCH) because we are specifying the zone origin name.
    result, finder = datasrc_client.find_zone(zone_name)
    if result != DataSourceClient.SUCCESS:
        # The data source doesn't know the zone.  For now, we provide
        # backward compatibility and creates a new one ourselves.
        # For longer term, we should probably separate this level of zone
        # management outside of xfrin.
        datasrc_client.create_zone(zone_name)
        logger.warn(XFRIN_ZONE_CREATED, format_zone_str(zone_name, zone_class))
        # try again
        result, finder = datasrc_client.find_zone(zone_name)
    if result != DataSourceClient.SUCCESS:
        return None
    result, soa_rrset, _ = finder.find(zone_name, RRType.SOA)
    if result != ZoneFinder.SUCCESS:
        logger.info(XFRIN_ZONE_NO_SOA, format_zone_str(zone_name, zone_class))
        return None
    if soa_rrset.get_rdata_count() != 1:
        logger.warn(XFRIN_ZONE_MULTIPLE_SOA,
                    format_zone_str(zone_name, zone_class),
                    soa_rrset.get_rdata_count())
        return None
    return soa_rrset

1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
def __get_initial_xfr_type(zone_soa, request_ixfr, zname, zclass, master_addr):
    """Decide whether the first transfer attempt uses AXFR or IXFR.

    This is a dedicated subroutine of __process_xfrin.  Without a local
    SOA only AXFR is possible; otherwise the zone's request_ixfr setting
    decides.
    """
    if zone_soa is None:
        # Having no local SOA is a kind of special case, so it is logged
        # at info level rather than as a debug trace.
        logger.info(XFRIN_INITIAL_AXFR, format_zone_str(zname, zclass),
                    AddressFormatter(master_addr))
        return RRType.AXFR

    if request_ixfr == ZoneInfo.REQUEST_IXFR_DISABLED:
        log_id, xfr_type = XFRIN_INITIAL_IXFR_DISABLED, RRType.AXFR
    else:
        assert(request_ixfr == ZoneInfo.REQUEST_IXFR_FIRST or
               request_ixfr == ZoneInfo.REQUEST_IXFR_ONLY)
        log_id, xfr_type = XFRIN_INITIAL_IXFR, RRType.IXFR

    logger.debug(DBG_XFRIN_TRACE, log_id, format_zone_str(zname, zclass),
                 AddressFormatter(master_addr))
    return xfr_type

1156
def __process_xfrin(server, zone_name, rrclass, db_file,
1157
                    shutdown_event, master_addrinfo, check_soa, tsig_key,
1158
                    request_ixfr, conn_class):
1159
1160
    conn = None
    exception = None
1161
    ret = XFRIN_FAIL
1162
1163
    try:
        # Create a data source client used in this XFR session.  Right now we
1164
1165
        # still assume an sqlite3-based data source, and use both the old and
        # new data source APIs.  We also need to use a mock client for tests.
1166
1167
1168
1169
1170
1171
1172
        # For a temporary workaround to deal with these situations, we skip the
        # creation when the given file is none (the test case).  Eventually
        # this code will be much cleaner.
        datasrc_client = None
        if db_file is not None:
            # temporary hardcoded sqlite initialization. Once we decide on
            # the config specification, we need to update this (TODO)
1173
1174
            # this may depend on #1207, or any follow-up ticket created for
            # #1207
1175
1176
1177
1178
            datasrc_type = "sqlite3"
            datasrc_config = "{ \"database_file\": \"" + db_file + "\"}"
            datasrc_client = DataSourceClient(datasrc_type, datasrc_config)

1179
1180
1181
1182
1183
1184
        # Get the current zone SOA (if available) and determine the initial
        # request type: AXFR or IXFR.
        zone_soa = _get_zone_soa(datasrc_client, zone_name, rrclass)
        request_type = __get_initial_xfr_type(zone_soa, request_ixfr,
                                              zone_name, rrclass,
                                              master_addrinfo[2])
1185

1186
1187
        # Create a TCP connection for the XFR session and perform the
        # operation.
1188
        sock_map = {}
1189
1190
        # In case we were asked to do IXFR and that one fails, we try again
        # with AXFR. But only if we could actually connect to the server.
1191
        #
1192
1193
1194
        # So we start with retry as True, which is set to false on each
        # attempt. In the case of connected but failed IXFR, we set it to true
        # once again.
1195
1196
1197
1198
        retry = True
        while retry:
            retry = False
            conn = conn_class(sock_map, zone_name, rrclass, datasrc_client,
1199
1200
                              shutdown_event, master_addrinfo, zone_soa,
                              tsig_key)
1201
1202
1203
1204
            conn.init_socket()
            ret = XFRIN_FAIL
            if conn.connect_to_master():
                ret = conn.do_xfrin(check_soa, request_type)
1205
                if ret == XFRIN_FAIL and request_type == RRType.IXFR:
1206
1207
1208
                    # IXFR failed for some reason. It might mean the server
                    # can't handle it, or we don't have the zone or we are out
                    # of sync or whatever else. So we retry with AXFR, as
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
                    # it may succeed in many such cases; if "IXFR only" is
                    # specified in request_ixfr, however, we suppress the
                    # fallback.
                    if request_ixfr == ZoneInfo.REQUEST_IXFR_ONLY:
                        logger.warn(XFRIN_XFR_TRANSFER_FALLBACK_DISABLED,
                                    conn.zone_str())
                    else:
                        retry = True
                        request_type = RRType.AXFR
                        logger.warn(XFRIN_XFR_TRANSFER_FALLBACK,
                                    conn.zone_str())
                        conn.close()
                        conn = None
1222

1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
    except Exception as ex:
        # If exception happens, just remember it here so that we can re-raise
        # after cleaning up things.  We don't log it here because we want to
        # eliminate even the smallest possibility of having an exception in
        # logging itself.
        exception = ex

    # asyncore.dispatcher requires explicit close() unless its lifetime
    # from birth to destruction is contained within asyncore.loop, which is not
    # the case for us.  We always close() here, whether or not do_xfrin
    # succeeds, and even when we see an unexpected exception.
    if conn is not None:
        conn.close()
1236