xfrin.py.in 79.5 KB
Newer Older
Likun Zhang's avatar
Likun Zhang committed
1
2
#!@PYTHON@

3
# Copyright (C) 2009-2013  Internet Systems Consortium.
Likun Zhang's avatar
Likun Zhang committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import sys; sys.path.append ('@@PYTHONPATH@@')
import os
import signal
import isc
import asyncore
import struct
import threading
import socket
import random
27
import time
28
from functools import reduce
29
from optparse import OptionParser, OptionValueError
Likun Zhang's avatar
Likun Zhang committed
30
from isc.config.ccsession import *
31
from isc.statistics.dns import Counters
32
from isc.notify import notify_out
33
import isc.util.process
34
from isc.util.address_formatter import AddressFormatter
35
from isc.datasrc import DataSourceClient, ZoneFinder
Michal Vaner's avatar
Michal Vaner committed
36
import isc.net.parse
37
from isc.xfrin.diff import Diff
38
from isc.server_common.auth_command import auth_loadzone_command
39
from isc.server_common.tsig_keyring import init_keyring, get_keyring
40
from isc.server_common.datasrc_clients_mgr import DataSrcClientsMgr, ConfigError
41
from isc.log_messages.xfrin_messages import *
42
from isc.dns import *
43

44
# Module-level setup: logging, process name, spec file location and the
# constants shared by the rest of this file.
isc.log.init("b10-xfrin", buffer=True)
logger = isc.log.Logger("xfrin")

# Pending system-wide debug level definitions, the ones we
# use here are hardcoded for now
DBG_PROCESS = logger.DBGLVL_TRACE_BASIC
DBG_COMMANDS = logger.DBGLVL_TRACE_DETAIL

isc.util.process.rename()

# If B10_FROM_SOURCE is set in the environment, we use data files from a
# directory relative to that, otherwise we use the ones installed on the
# system.  (Only B10_FROM_SOURCE is consulted here; the "@...@" strings
# are placeholders substituted at build time.)
SPECFILE_PATH = "@datadir@/@PACKAGE@"\
    .replace("${datarootdir}", "@datarootdir@")\
    .replace("${prefix}", "@prefix@")
if "B10_FROM_SOURCE" in os.environ:
    SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/xfrin"
SPECFILE_LOCATION = SPECFILE_PATH + "/xfrin.spec"

AUTH_MODULE_NAME = 'Auth'
XFROUT_MODULE_NAME = 'Xfrout'

# Remote module and identifiers (according to their spec files)
ZONE_MANAGER_MODULE_NAME = 'Zonemgr'

REFRESH_FROM_ZONEMGR = 'refresh_from_zonemgr'

# Constants for debug levels.
DBG_XFRIN_TRACE = logger.DBGLVL_TRACE_BASIC

# These two defaults are currently hard-coded. For config this isn't
# necessary, but we need these defaults for optional command arguments
# (TODO: have similar support to get default values for command
# arguments as we do for config options)
DEFAULT_MASTER_PORT = 53
DEFAULT_ZONE_CLASS = RRClass.IN

__version__ = 'BIND10'

# Internal result codes of an xfr session
XFRIN_OK = 0                    # normal success
XFRIN_FAIL = 1                  # general failure (internal/external)
87

88
class XfrinException(Exception):
    '''A general xfrin exception.

    Within this file it is raised to report internal inconsistencies, such
    as XfrinState.handle_rr() being invoked on the base class or a non-SOA
    RR reaching a state that only accepts an SOA.
    '''
    pass

91
92
93
94
95
class XfrinProtocolError(Exception):
    """Signals a violation of the xfrin (zone transfer) protocol.

    Raised by the protocol handling code when the received data cannot be
    accepted, e.g. a response that does not start with an SOA or that
    carries extra RRs after the end of the transfer.
    """

96
97
class XfrinZoneError(Exception):
    '''
    An exception raised when the received zone is broken enough to be unusable.
    '''
    pass

102
class XfrinZoneUptodate(Exception):
    '''
    Thrown when the zone is already up to date, so there's no need to download
    the zone. This is not really an error case (but it's still an exceptional
    condition and the control flow is different than usual).
    '''
    pass

Jelte Jansen's avatar
Jelte Jansen committed
110
class XfrinZoneInfoException(Exception):
    """This exception is raised if there is an error in the given
       configuration (part), or when a command does not have a required
       argument or has bad arguments, for instance when the zone's master
       address is not a valid IP address, when the zone does not
       have a name, or when multiple settings are given for the same
       zone."""
    pass

119
def _check_zone_name(zone_name_str):
    """Checks if the given zone name is a valid domain name, and returns
    it as a Name object.

    Parameters:
      zone_name_str (str): textual representation of the zone name.

    Raises XfrinZoneInfoException if the string cannot be parsed as a
    domain name; the original parse error is kept as the cause.
    """
    try:
        # In the _zones dict, part of the key is the zone name,
        # but due to a limitation in the Name class, we
        # cannot directly use it as a dict key, and we use to_text()
        #
        # Downcase the name here for that reason.
        return Name(zone_name_str, True)
    except (EmptyLabel, TooLongLabel, BadLabelType, BadEscape,
            TooLongName, IncompleteName) as ne:
        raise XfrinZoneInfoException("bad zone name: " + zone_name_str +
                                     " (" + str(ne) + ")") from ne
132
133
134
135

def _check_zone_class(zone_class_str):
    """If the given argument is a string: checks if the given class is
       a valid one, and returns an RRClass object if so.
       Raises XfrinZoneInfoException if not.
       If it is None, this function returns the default RRClass.IN"""
    if zone_class_str is None:
        return DEFAULT_ZONE_CLASS
    try:
        return RRClass(zone_class_str)
    except InvalidRRClass as irce:
        # str() keeps the error report itself from failing with a TypeError
        # should a non-string value have been passed in.
        raise XfrinZoneInfoException("bad zone class: " +
                                     str(zone_class_str) +
                                     " (" + str(irce) + ")") from irce
144

Jelte Jansen's avatar
Jelte Jansen committed
145
146
147
148
149
150
151
def format_zone_str(zone_name, zone_class):
    """Helper function to format a zone name and class as a string of
       the form '<name>/<class>'.
       Parameters:
       zone_name (isc.dns.Name) name to format
       zone_class (isc.dns.RRClass) class to format
    """
    # to_text() is called with True, as in the original code.
    # NOTE(review): presumably this omits the trailing dot - confirm
    # against the isc.dns.Name documentation.
    return zone_name.to_text(True) + '/' + str(zone_class)
Jelte Jansen's avatar
Jelte Jansen committed
153
154
155
156
157
158
159

def format_addrinfo(addrinfo):
    """Helper function to format the addrinfo as a string of the form
       <addr>:<port> (for IPv4) or [<addr>]:port (for IPv6). For unix domain
       sockets, and unknown address families, it returns a basic string
       conversion of the third element of the passed tuple.
       Parameters:
       addrinfo: a 3-tuple consisting of address family, socket type, and,
                 depending on the family, either a 2-tuple with the address
                 and port, or a filename
       Raises:
       TypeError if the tuple (or its address part) has too few elements.
    """
    try:
        if addrinfo[0] == socket.AF_INET:
            return str(addrinfo[2][0]) + ":" + str(addrinfo[2][1])
        elif addrinfo[0] == socket.AF_INET6:
            return "[" + str(addrinfo[2][0]) + "]:" + str(addrinfo[2][1])
        else:
            return str(addrinfo[2])
    except IndexError:
        # A too-short tuple is a caller error about the argument's shape,
        # so report it as a TypeError rather than leaking the IndexError.
        raise TypeError("addrinfo argument to format_addrinfo() does not "
                        "appear to be consisting of (family, socktype, (addr, port))")
Jelte Jansen's avatar
Jelte Jansen committed
174

175
def get_soa_serial(soa_rdata):
    '''Extract the serial field of SOA RDATA and return it as a Serial object.

    We don't have to be very efficient here, so we first dump the entire RDATA
    as a string and pick the serial out of the whitespace-separated fields.
    The textual SOA RDATA is "MNAME RNAME SERIAL REFRESH RETRY EXPIRE
    MINIMUM"; the last five fields are plain numbers, so the serial is
    located by counting from the end.  Unlike counting from the front, this
    stays correct even if the textual MNAME or RNAME happens to contain
    whitespace.  Ideally there should be a more direct and convenient way to
    get access to the SOA fields.

    Parameters:
      soa_rdata: an SOA RDATA object providing to_text().
    '''
    return Serial(int(soa_rdata.to_text().split()[-5]))
186
187
188

class XfrinState:
    '''
    The states of the incoming *XFR state machine.

    We (will) handle both IXFR and AXFR with a single integrated state
    machine because they cannot be distinguished immediately - an AXFR
    response to an IXFR request can only be detected when the first two (2)
    response RRs have already been received.

    The following diagram summarizes the state transition.  After sending
    the query, xfrin starts the process with the InitialSOA state (all
    IXFR/AXFR response begins with an SOA).  When it reaches IXFREnd
    or AXFREnd, the process successfully completes.

                             (AXFR or
            (recv SOA)        AXFR-style IXFR)  (SOA, add)
    InitialSOA------->FirstData------------->AXFR--------->AXFREnd
         |                |                  |  ^         (post xfr
         |(IXFR &&        |                  |  |        checks, then
         | recv SOA       |                  +--+        commit)
         | not new)       |            (non SOA, add)
         V                |
    IXFRUptodate          |                     (non SOA, delete)
               (pure IXFR,|                           +-------+
            keep handling)|             (Delete SOA)  V       |
                          + ->IXFRDeleteSOA------>IXFRDelete--+
                                   ^                   |
                (see SOA, not end, |          (see SOA)|
            commit, keep handling) |                   |
                                   |                   V
                      +---------IXFRAdd<----------+IXFRAddSOA
        (non SOA, add)|         ^  |    (Add SOA)
                      ----------+  |
                                   |(see SOA w/ end serial, commit changes)
                                   V
                                IXFREnd

    Note that changes are committed for every "difference sequence"
    (i.e. changes for one SOA update).  This means when an IXFR response
    contains multiple difference sequences and something goes wrong
    after several commits, these changes have been published and visible
    to clients even if the IXFR session is subsequently aborted.
    It is not clear if this is valid in terms of the protocol specification.
    Section 4 of RFC 1995 states:

       An IXFR client, should only replace an older version with a newer
       version after all the differences have been successfully processed.

    If this "replacement" is for the changes of one difference sequence
    and "all the differences" mean the changes for that sequence, this
    implementation strictly follows what RFC states.  If this is for
    the entire IXFR response (that may contain multiple sequences),
    we should implement it with one big transaction and one final commit
    at the very end.

    For now, we implement it with multiple smaller commits for two
    reasons.  First, this is what BIND 9 does, and we generally port
    the implementation logic here.  BIND 9 has been supporting IXFR
    for many years, so the fact that it still behaves this way
    probably means it at least doesn't cause a severe operational
    problem in practice.  Second, especially because BIND 10 would
    often use a database backend, a larger transaction could cause
    undesirable effects, e.g. suspending normal lookups for a longer
    period depending on the characteristics of the database.  Even if
    we find something wrong in a later sequence and abort the
    session, we can start another incremental update from what has
    been validated, or we can switch to AXFR to replace the zone
    completely.

    This implementation uses the state design pattern, where each state
    is represented as a subclass of the base XfrinState class.  Each concrete
    subclass of XfrinState is assumed to define two methods: handle_rr() and
    finish_message().  These methods handle specific part of XFR protocols
    and (if necessary) perform the state transition.

    Conceptually, XfrinState and its subclasses are a "friend" of
    XfrinConnection and are assumed to be allowed to access its internal
    information (even though Python does not have a strict access control
    between different classes).

    The XfrinState and its subclasses are designed to be stateless, and
    can be used as singleton objects.  For now, however, we always instantiate
    a new object for every state transition, partly because the introduction
    of singleton will make the code a bit complicated, and partly because
    the overhead of object instantiation wouldn't be significant for xfrin.

    '''
    def set_xfrstate(self, conn, new_state):
        '''Set the XfrConnection to a given new state.

        As a "friend" class, this method intentionally gets access to the
        connection's "private" method.

        '''
        # Spell out the name-mangled form of XfrinConnection.__set_xfrstate
        # so that it can be reached from outside that class.
        conn._XfrinConnection__set_xfrstate(new_state)

    def handle_rr(self, conn, rr=None):
        '''Handle one RR of an XFR response message.

        Depending on the state, the RR is generally added or deleted in the
        corresponding data source, or in some special cases indicates
        a specific transition, such as starting a new IXFR difference
        sequence or completing the session.

        All subclasses have their specific behaviors for this method, so
        there is no default definition.  If the base class version
        is called, it's a bug of the caller, and it's notified via
        an XfrinException exception.

        The 'rr' parameter is accepted (and ignored) here so that the base
        version has the same calling convention as the subclass versions;
        a call made the usual way, handle_rr(conn, rr), then fails with
        the intended XfrinException rather than with a TypeError about the
        number of arguments.

        This method returns a boolean value: True if the given RR was
        fully handled and the caller should go to the next RR; False
        if the caller needs to call this method with the (possibly) new
        state for the same RR again.

        '''
        raise XfrinException("Internal bug: " +
                             "XfrinState.handle_rr() called directly")

    def finish_message(self, conn):
        '''Perform any final processing after handling all RRs of a response.

        This method then returns a boolean indicating whether to continue
        receiving the message.  Unless it's in the end of the entire XFR
        session, we should continue, so this default method simply returns
        True.

        '''
        return True

316
317
class XfrinInitialSOA(XfrinState):
    def handle_rr(self, conn, rr):
        '''Handle the very first RR of an XFR response.

        The RR must be an SOA (XfrinProtocolError is raised otherwise); its
        serial is remembered in the connection as the end serial of the
        session.  If we requested IXFR and that serial is not greater than
        the serial we sent in the request, the state moves to IXFRUptodate;
        in all other cases it moves to FirstData.

        Always returns True: the RR is fully consumed here.

        '''
        if rr.get_type() != RRType.SOA:
            raise XfrinProtocolError('First RR in zone transfer must be SOA ('
                                     + rr.get_type().to_text() + ' received)')
        conn._end_serial = get_soa_serial(rr.get_rdata()[0])

        if conn._request_type == RRType.IXFR and \
                conn._end_serial <= conn._request_serial:
            logger.info(XFRIN_IXFR_UPTODATE, conn.zone_str(),
                        conn._request_serial, conn._end_serial)
            self.set_xfrstate(conn, XfrinIXFRUptodate())
        else:
            self.set_xfrstate(conn, XfrinFirstData())

        return True
332
333
334

class XfrinFirstData(XfrinState):
    def handle_rr(self, conn, rr):
        '''Handle the first RR after initial SOA in an XFR session.

        This state happens exactly once in an XFR session, where
        we decide whether it's incremental update ("real" IXFR) or
        non incremental update (AXFR or AXFR-style IXFR).
        If we initiated IXFR and the transfer begins with two SOAs
        (the serial of the second one being equal to our serial),
        it's incremental; otherwise it's non incremental.

        This method always returns False (unlike many other handle_rr()
        methods) because this first RR must be examined again in the
        determined update context.

        Note that in the non incremental case the RR should normally be
        something other than SOA, but it's still possible it's an SOA with a
        different serial than ours.  The only possible interpretation at
        this point is that it's non incremental update that only consists
        of the SOA RR.  It will result in broken zone (for example, it
        wouldn't even contain an apex NS) and should be rejected at post
        XFR processing, but in terms of the XFR session processing we
        accept it and move forward.

        Note further that, in the half-broken SOA-only transfer case,
        these two SOAs are supposed to be the same as stated in Section 2.2
        of RFC 5936.  We don't check that condition here, either; we'll
        leave whether and how to deal with that situation to the end of
        the processing of non incremental update.  See also a related
        discussion at the IETF dnsext wg:
        http://www.ietf.org/mail-archive/web/dnsext/current/msg07908.html

        '''
        if conn._request_type == RRType.IXFR and \
                rr.get_type() == RRType.SOA and \
                conn._request_serial == get_soa_serial(rr.get_rdata()[0]):
            logger.debug(DBG_XFRIN_TRACE, XFRIN_GOT_INCREMENTAL_RESP,
                         conn.zone_str())
            conn._diff = None # Will be created on-demand
            self.set_xfrstate(conn, XfrinIXFRDeleteSOA())
        else:
            logger.debug(DBG_XFRIN_TRACE, XFRIN_GOT_NONINCREMENTAL_RESP,
                         conn.zone_str())
            # We are now going to add RRs to the new zone.  We need to create
            # a Diff object.  It will be used throughout the XFR session.
            conn._diff = Diff(conn._datasrc_client, conn._zone_name, True)
            self.set_xfrstate(conn, XfrinAXFR())
        return False
381
382

class XfrinIXFRDeleteSOA(XfrinState):
    def handle_rr(self, conn, rr):
        '''Handle the SOA that opens the deletion part of a difference
        sequence.

        The SOA is recorded as deleted data, the deletion counter of the
        transfer statistics is incremented, and the state moves to
        IXFRDelete.  Returns True (the RR is consumed).

        '''
        if rr.get_type() != RRType.SOA:
            # this shouldn't happen; should this occur it means an internal
            # bug.
            raise XfrinException(rr.get_type().to_text() +
                                 ' RR is given in IXFRDeleteSOA state')
        # This is the beginning state of one difference sequence (changes
        # for one SOA update).  We may need to create a new Diff object now.
        # Note also that we (unconditionally) enable journaling here.  The
        # Diff constructor may internally disable it, however, if the
        # underlying data source doesn't support journaling.
        if conn._diff is None:
            conn._diff = Diff(conn._datasrc_client, conn._zone_name, False,
                              True)
        conn._diff.delete_data(rr)
        self.set_xfrstate(conn, XfrinIXFRDelete())
        conn.get_transfer_stats().ixfr_deletion_count += 1
        return True
401

402
403
class XfrinIXFRDelete(XfrinState):
    def handle_rr(self, conn, rr):
        '''Handle an RR in the deletion part of a difference sequence.

        An SOA ends the deletion part: its serial is stored as the
        connection's current serial, the state moves to IXFRAddSOA and
        False is returned so that the same SOA is handled again in that
        state.  Any other RR is recorded as deleted data and counted, and
        True is returned.

        '''
        if rr.get_type() == RRType.SOA:
            # This is the only place where current_serial is set
            conn._current_serial = get_soa_serial(rr.get_rdata()[0])
            self.set_xfrstate(conn, XfrinIXFRAddSOA())
            return False
        conn._diff.delete_data(rr)
        conn.get_transfer_stats().ixfr_deletion_count += 1
        return True

413
414
class XfrinIXFRAddSOA(XfrinState):
    def handle_rr(self, conn, rr):
        '''Handle the SOA that opens the addition part of a difference
        sequence.

        The SOA is recorded as added data, the addition counter of the
        transfer statistics is incremented, and the state moves to IXFRAdd.
        Returns True (the RR is consumed).

        '''
        if rr.get_type() != RRType.SOA:
            # this shouldn't happen; should this occur it means an internal
            # bug.
            raise XfrinException(rr.get_type().to_text() +
                                 ' RR is given in IXFRAddSOA state')
        conn._diff.add_data(rr)
        self.set_xfrstate(conn, XfrinIXFRAdd())
        conn.get_transfer_stats().ixfr_addition_count += 1
        return True

class XfrinIXFRAdd(XfrinState):
    def handle_rr(self, conn, rr):
        '''Handle an RR in the addition part of a difference sequence.

        A non-SOA RR is recorded as added data and counted; True is
        returned.  An SOA marks the end of the sequence:
        - if its serial equals the end serial of the session, the transfer
          is finished via conn.finish_transfer(), the state moves to
          IXFREnd and True is returned;
        - if its serial differs from the current serial (set when the
          deletion part ended), XfrinProtocolError is raised;
        - otherwise the accumulated diff is applied, the state moves back
          to IXFRDeleteSOA and False is returned so that the same SOA is
          handled again as the start of the next sequence.

        '''
        if rr.get_type() == RRType.SOA:
            # This SOA marks the end of a difference sequence
            conn.get_transfer_stats().ixfr_changeset_count += 1
            soa_serial = get_soa_serial(rr.get_rdata()[0])
            if soa_serial == conn._end_serial:
                # The final part is there. Finish the transfer by
                # checking the last TSIG (if required), the zone data and
                # committing.
                conn.finish_transfer()
                self.set_xfrstate(conn, XfrinIXFREnd())
                return True
            elif soa_serial != conn._current_serial:
                raise XfrinProtocolError('IXFR out of sync: expected ' +
                                         'serial ' +
                                         str(conn._current_serial) +
                                         ', got ' + str(soa_serial))
            else:
                # Apply a change to the database. But don't commit it yet,
                # we can't know if the message is/will be properly signed.
                # A complete commit will happen after the last bit.
                conn._diff.apply()
                self.set_xfrstate(conn, XfrinIXFRDeleteSOA())
                return False
        conn._diff.add_data(rr)
        conn.get_transfer_stats().ixfr_addition_count += 1
        return True

class XfrinIXFREnd(XfrinState):
    def handle_rr(self, conn, rr):
        '''Reject any RR: nothing may follow the end of the IXFR diffs.

        Always raises XfrinProtocolError.

        '''
        raise XfrinProtocolError('Extra data after the end of IXFR diffs: ' +
                                 rr.to_text())

    def finish_message(self, conn):
        '''Final processing after processing an entire IXFR session.

        There will be more actions here, but for now we simply return False,
        indicating there will be no more message to receive.

        '''
        return False

468
469
470
471
472
473
474
475
class XfrinIXFRUptodate(XfrinState):
    '''State entered when an IXFR response shows the zone is up to date.'''

    def handle_rr(self, conn, rr):
        '''Reject any RR: the single-SOA response must not carry more data.

        Always raises XfrinProtocolError.

        '''
        extra_rr_text = rr.to_text()
        raise XfrinProtocolError('Extra data after single IXFR response ' +
                                 extra_rr_text)

    def finish_message(self, conn):
        '''Report the "already up to date" condition via XfrinZoneUptodate.

        This never returns; the exception changes the caller's control flow.

        '''
        raise XfrinZoneUptodate()

476
class XfrinAXFR(XfrinState):
    def handle_rr(self, conn, rr):
        """
        Handle the RR by putting it into the zone.

        Every RR, including the closing SOA, is added to the diff and
        counted in the transfer statistics.  On an SOA the state moves to
        AXFREnd; a warning is logged if its serial differs from the one
        seen in the initial SOA.  Always returns True.
        """
        conn._diff.add_data(rr)
        if rr.get_type() == RRType.SOA:
            # SOA means end.  Don't commit it yet - we need to perform
            # post-transfer checks

            soa_serial = get_soa_serial(rr.get_rdata()[0])
            if conn._end_serial != soa_serial:
                logger.warn(XFRIN_AXFR_INCONSISTENT_SOA, conn.zone_str(),
                            conn._end_serial, soa_serial)

            self.set_xfrstate(conn, XfrinAXFREnd())
        conn.get_transfer_stats().axfr_rr_count += 1
        # Yes, we've eaten this RR.
        return True

class XfrinAXFREnd(XfrinState):
    def handle_rr(self, conn, rr):
        """
        Reject any RR: nothing may follow the closing SOA of an AXFR.

        Always raises XfrinProtocolError.
        """
        raise XfrinProtocolError('Extra data after the end of AXFR: ' +
                                 rr.to_text())

    def finish_message(self, conn):
        """
        Final processing after processing an entire AXFR session.

        This simply calls the finish_transfer method of the connection
        that ensures it is signed by TSIG (if required), the zone data
        is valid and commits it.

        Returns False, indicating there is no more message to receive.
        """
        conn.finish_transfer()
        return False
511

512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
class XfrinTransferStats:
    """
    Bookkeeping for one transfer, kept for logging purposes.

    Counts messages, RRs and bytes transferred, and remembers when the
    transfer started and ended.  The start time is taken when the object
    is created.  The end time is taken the first time finalize(),
    get_running_time() or get_bytes_per_second() is called, and is never
    changed after that.
    The *_count attributes are plain public attributes meant to be updated
    directly by the code collecting the statistics.
    """
    def __init__(self):
        # Timestamps first: the clock starts at instantiation.
        self._start_time = time.time()
        self._end_time = None
        # Generic counters.
        self.message_count = 0
        self.byte_count = 0
        # AXFR-specific counter.
        self.axfr_rr_count = 0
        # IXFR-specific counters.
        self.ixfr_changeset_count = 0
        self.ixfr_deletion_count = 0
        self.ixfr_addition_count = 0

    def finalize(self):
        """Record the end time (time.time()) unless already recorded."""
        if self._end_time is not None:
            return
        self._end_time = time.time()

    def get_running_time(self):
        """Finalize the stats if needed, then return the elapsed time
           between creation and finalization."""
        self.finalize()
        elapsed = self._end_time - self._start_time
        return elapsed

    def get_bytes_per_second(self):
        """Return byte_count divided by the value of get_running_time().

           If the running time is not positive, 0.0 is returned when no
           bytes were counted and infinity otherwise."""
        elapsed = self.get_running_time()
        if elapsed > 0.0:
            return float(self.byte_count) / elapsed
        # This should never happen, but if some clock is so
        # off or reset in the meantime, we do need to return
        # *something* (and not raise an error)
        return float("inf") if self.byte_count != 0 else 0.0


Likun Zhang's avatar
Likun Zhang committed
561
class XfrinConnection(asyncore.dispatcher):
562
    '''Do xfrin in this class. '''
Likun Zhang's avatar
Likun Zhang committed
563

564
    def __init__(self,
                 sock_map, zone_name, rrclass, datasrc_client,
                 shutdown_event, master_addrinfo, zone_soa, tsig_key=None,
                 idle_timeout=60):
        """Constructor of the XfrinConnection class.

        No socket is opened here; see init_socket().

        Parameters:
          sock_map: empty dict, used with asyncore.
          zone_name (dns.Name): Zone name.
          rrclass (dns.RRClass): Zone RR class.
          datasrc_client (DataSourceClient): the data source client object
            used for the XFR session.
          shutdown_event (threading.Event): used for synchronization with
            parent thread.
          master_addrinfo (tuple: (sock family, sock type, sockaddr)):
            address and port of the master server.
          zone_soa (RRset or None): SOA RRset of zone's current SOA or None
            if it's not available.
          tsig_key: key handed to the TSIG context creator when a query is
            sent; None (the default) means queries are sent unsigned.
          idle_timeout (int): max idle time for read data from socket.

        """

        asyncore.dispatcher.__init__(self, map=sock_map)
        self._sock_map = sock_map

        # Zone parameters and the zone's currently known SOA
        self._zone_name = zone_name
        self._rrclass = rrclass
        self._zone_soa = zone_soa

        # Data source handler
        self._datasrc_client = datasrc_client

        # Peer address and session control
        self._master_addrinfo = master_addrinfo
        self._shutdown_event = shutdown_event
        self._idle_timeout = idle_timeout

        # TSIG material.  tsig_ctx_creator is introduced to allow tests to
        # use a mock class for easier tests (in normal case we always use
        # the default)
        self._tsig_key = tsig_key
        self._tsig_ctx = None
        self._tsig_ctx_creator = lambda key : TSIGContext(key)

        # The XFR state.  Conceptually this is purely private, so we emphasize
        # the fact by the double underscore.  Other classes are assumed to
        # get access to this via get_xfrstate(), and only XfrinState classes
        # are assumed to be allowed to modify it via __set_xfrstate().
        self.__state = None

        # Requested transfer type (RRType.AXFR or RRType.IXFR).  The actual
        # transfer type may differ due to IXFR->AXFR fallback:
        self._request_type = None
        self._soa_rr_count = 0

        # keep a record of this specific transfer to log on success
        # (time, rr/s, etc)
        self._transfer_stats = XfrinTransferStats()
        self._counters = Counters(SPECFILE_LOCATION)
621

622
623
624
625
626
627
628
629
630
    def init_socket(self):
        '''Create the underlying socket and switch it to blocking mode.

        This is essentially a part of __init__() and is expected to be
        called immediately after the constructor.  It's separated from
        the constructor because otherwise we might not be able to close
        the socket if the constructor raises an exception after opening it.
        The socket family and type come from the first two elements of
        the master address info.
        '''
        family = self._master_addrinfo[0]
        socktype = self._master_addrinfo[1]
        self.create_socket(family, socktype)
        self.socket.setblocking(1)
Likun Zhang's avatar
Likun Zhang committed
632

633
634
635
636
637
638
    def __set_xfrstate(self, new_state):
        '''Replace the current XFR state object with new_state.'''
        self.__state = new_state

    def get_xfrstate(self):
        '''Return the current XFR state object (None until one is set).'''
        return self.__state

639
640
641
642
643
    def get_transfer_stats(self):
        """Returns the transfer stats object, used to measure transfer time,
           and number of messages/records/bytes transferred."""
        return self._transfer_stats

644
    def zone_str(self):
        '''Return the zone name and class formatted for use in log messages.

        The formatting itself is delegated to format_zone_str().
        '''
        name, rrclass = self._zone_name, self._rrclass
        return format_zone_str(name, rrclass)
647

648
    def connect_to_master(self):
        '''Connect to master in TCP.

        Returns True when the connection attempt succeeds.  If it fails
        with socket.error, the failure is logged (XFRIN_CONNECT_MASTER)
        and False is returned; other exceptions are not caught.
        '''
        master_addr = self._master_addrinfo[2]
        try:
            self.connect(master_addr)
            return True
        except socket.error as e:
            # The key name is derived from the configured TSIG key here:
            # this class never assigns a 'tsig_key_name' attribute, so
            # reading one would raise AttributeError from this handler
            # instead of reporting the connection failure.
            # NOTE(review): assumes the key object provides get_key_name()
            # (as isc.dns.TSIGKey does) - confirm.
            key_name = None
            if self._tsig_key is not None:
                key_name = self._tsig_key.get_key_name()
            logger.error(XFRIN_CONNECT_MASTER, key_name, master_addr,
                         str(e))
            return False

Likun Zhang's avatar
Likun Zhang committed
660
    def _create_query(self, query_type):
        '''Create an XFR-related query message.

        query_type is either SOA, AXFR or IXFR.  An IXFR query needs the
        zone's current SOA record.  If it's not known, an XfrinException
        is raised.  Note that this is not necessarily a broken
        configuration; for the first attempt of transfer the secondary
        may not have any boot-strap zone information, in which case IXFR
        simply won't work.  The xfrin should then fall back to AXFR.

        As side effects, the randomly chosen query ID is stored in
        _query_id and the serial of the known SOA (or None) is recorded
        in _request_serial for later use.

        '''
        msg = Message(Message.RENDER)
        self._query_id = random.randint(0, 0xFFFF)
        msg.set_qid(self._query_id)
        msg.set_opcode(Opcode.QUERY)
        msg.set_rcode(Rcode.NOERROR)
        question = Question(self._zone_name, self._rrclass, query_type)
        msg.add_question(question)

        # Remember our serial, if known
        if self._zone_soa is not None:
            soa_rdata = self._zone_soa.get_rdata()[0]
            self._request_serial = get_soa_serial(soa_rdata)
        else:
            self._request_serial = None

        # Set the authority section with our SOA for IXFR
        if query_type == RRType.IXFR:
            if self._zone_soa is None:
                # (incremental) IXFR doesn't work without known SOA
                raise XfrinException('Failed to create IXFR query due to no ' +
                                     'SOA for ' + self.zone_str())
            msg.add_rrset(Message.SECTION_AUTHORITY, self._zone_soa)

        return msg

    def _send_data(self, data):
        size = len(data)
        total_count = 0
        while total_count < size:
            count = self.send(data[total_count:])
            total_count += count

    def _send_query(self, query_type):
        '''Build a query of the given type and send it over TCP.

        The rendered message is preceded by its length as a 2-byte value
        in network byte order.  When a TSIG key is configured, a fresh TSIG
        context is created (and kept in _tsig_ctx) and used to sign the
        query.
        '''
        query = self._create_query(query_type)
        renderer = MessageRenderer()

        # XXX Currently, python wrapper doesn't accept 'None' parameter in this
        # case, we should remove the if statement and use a universal
        # interface later.
        if self._tsig_key is None:
            query.to_wire(renderer)
        else:
            self._tsig_ctx = self._tsig_ctx_creator(self._tsig_key)
            query.to_wire(renderer, self._tsig_ctx)

        wire_length = renderer.get_length()
        length_prefix = struct.pack('H', socket.htons(wire_length))
        self._send_data(length_prefix)
        self._send_data(renderer.get_data())
718
719
720

    def _asyncore_loop(self):
        '''Run one iteration of asyncore.loop() on this session's socket map.

        The idle timeout given at construction is used as the loop timeout.
        This trivial wrapper is extracted from _get_request_response so that
        the rest of the code can be tested without involving actual
        communication with a remote server.
        '''
        asyncore.loop(timeout=self._idle_timeout, map=self._sock_map,
                      count=1)
725

Likun Zhang's avatar
Likun Zhang committed
726
727
728
729
730
731
    def _get_request_response(self, size):
        recv_size = 0
        data = b''
        while recv_size < size:
            self._recv_time_out = True
            self._need_recv_size = size - recv_size
732
            self._asyncore_loop()
Likun Zhang's avatar
Likun Zhang committed
733
734
735
736
737
738
739
740
            if self._recv_time_out:
                raise XfrinException('receive data from socket time out.')

            recv_size += self._recvd_size
            data += self._recvd_data

        return data

741
742
743
744
745
    def _check_response_tsig(self, msg, response_data):
        tsig_record = msg.get_tsig_record()
        if self._tsig_ctx is not None:
            tsig_error = self._tsig_ctx.verify(tsig_record, response_data)
            if tsig_error != TSIGError.NOERROR:
746
747
                raise XfrinProtocolError('TSIG verify fail: %s' %
                                         str(tsig_error))
748
749
750
751
752
753
754
755
        elif tsig_record is not None:
            # If the response includes a TSIG while we didn't sign the query,
            # we treat it as an error.  RFC doesn't say anything about this
            # case, but it clearly states the server must not sign a response
            # to an unsigned request.  Although we could be flexible, no sane
            # implementation would return such a response, and since this is
            # part of security mechanism, it's probably better to be more
            # strict.
756
            raise XfrinProtocolError('Unexpected TSIG in response')
757

758
759
760
761
762
    def _check_response_tsig_last(self):
        """
        Check there's a signature at the last message.
        """
        if self._tsig_ctx is not None:
763
            if not self._tsig_ctx.last_had_signature():
764
765
766
                raise XfrinProtocolError('TSIG verify fail: no TSIG on last '+
                                         'message')

767
768
769
770
    def __validate_error(self, reason):
        '''
        Error callback handed to check_zone() by finish_transfer(); logs
        the reason together with the zone name and class.
        '''
        zone_name, zone_class = self._zone_name, self._rrclass
        logger.error(XFRIN_ZONE_INVALID, zone_name, zone_class, reason)
773

774
775
776
777
    def __validate_warning(self, reason):
        '''
        Warning callback handed to check_zone() by finish_transfer(); logs
        the reason together with the zone name and class.
        '''
        zone_name, zone_class = self._zone_name, self._rrclass
        logger.warn(XFRIN_ZONE_WARN, zone_name, zone_class, reason)
780

781
    def finish_transfer(self):
        """
        Perform the post-transfer checks, then commit the diff.

        The checks are: the last message must have been signed when TSIG
        is in use (XfrinProtocolError otherwise), and the resulting zone
        must pass check_zone() (XfrinZoneError otherwise).  Nothing is
        committed if either check fails.
        """
        self._check_response_tsig_last()

        callbacks = (self.__validate_error, self.__validate_warning)
        zone_is_valid = check_zone(self._zone_name, self._rrclass,
                                   self._diff.get_rrset_collection(),
                                   callbacks)
        if not zone_is_valid:
            raise XfrinZoneError('Validation of the new zone failed')

        self._diff.commit()

793
    def __parse_soa_response(self, msg, response_data):
        '''Validate a response to an SOA query and return its SOA.

        This is a subroutine of _check_soa_serial().  Bogus responses are
        rejected with XfrinProtocolError: TSIG or header problems, a
        non-authoritative answer, a question section that doesn't match the
        query, multiple SOAs, a CNAME in the answer, or no SOA at all (in
        which case the authority section is inspected to report a referral
        or NODATA more precisely).

        If everything is okay, it returns the SOA RR from the answer section
        of the response.

        '''
        # Check TSIG integrity and validate the header.  Unlike AXFR/IXFR,
        # we should be more strict for SOA queries and check the AA flag, too.
        self._check_response_tsig(msg, response_data)
        self._check_response_header(msg)
        if not msg.get_header_flag(Message.HEADERFLAG_AA):
            raise XfrinProtocolError('non-authoritative answer to SOA query')

        # Validate the question section
        question_count = msg.get_rr_count(Message.SECTION_QUESTION)
        if question_count != 1:
            raise XfrinProtocolError('Invalid response to SOA query: ' +
                                     '(' + str(question_count) +
                                     ' questions, 1 ' + 'expected)')
        question = msg.get_question()[0]
        question_mismatch = (question.get_name() != self._zone_name or
                             question.get_class() != self._rrclass or
                             question.get_type() != RRType.SOA)
        if question_mismatch:
            raise XfrinProtocolError('Invalid response to SOA query: '
                                     'question mismatch: ' +
                                     str(question))

        # Look into the answer section for SOA
        soa = None
        for answer_rr in msg.get_section(Message.SECTION_ANSWER):
            answer_type = answer_rr.get_type()
            if answer_type == RRType.SOA:
                if soa is not None:
                    raise XfrinProtocolError('SOA response had multiple SOAs')
                soa = answer_rr
            # There should not be a CNAME record at top of zone.
            if answer_type == RRType.CNAME:
                raise XfrinProtocolError('SOA query resulted in CNAME')

        # If SOA is not found, try to figure out the reason then report it.
        if soa is None:
            # The authority section tells a referral (NS) from NODATA (SOA).
            for authority_rr in msg.get_section(Message.SECTION_AUTHORITY):
                authority_type = authority_rr.get_type()
                if authority_type == RRType.NS:
                    raise XfrinProtocolError('SOA query resulted in referral')
                if authority_type == RRType.SOA:
                    raise XfrinProtocolError('SOA query resulted in NODATA')
            raise XfrinProtocolError('No SOA record found in response to ' +
                                     'SOA query')

        # Check if the SOA is really what we asked for
        soa_mismatch = (soa.get_name() != self._zone_name or
                        soa.get_class() != self._rrclass)
        if soa_mismatch:
            raise XfrinProtocolError("SOA response doesn't match query: " +
                                     str(soa))

        # All okay, return it
        return soa

855
    def _get_ipver_str(self):
Naoki Kambe's avatar
Naoki Kambe committed
856
        """Returns a 'v4' or 'v6' string representing a IP version
857
        depending on the socket family. This is for an internal use
858
859
860
861
        only (except for tests). This is supported only for IP sockets.
        It raises a ValueError exception on other address families.

        """
Naoki Kambe's avatar
Naoki Kambe committed
862
863
864
865
        if self.socket.family == socket.AF_INET:
            return 'v4'
        elif self.socket.family == socket.AF_INET6:
            return 'v6'
866
867
        raise ValueError("Invalid address family. "
                         "This is supported only for IP sockets")
868

Likun Zhang's avatar
Likun Zhang committed
869
    def _check_soa_serial(self):
        '''Send SOA query and compare the local and remote serials.

        The response is read as a 2-byte network-order length followed by
        the message, then validated and parsed.  If we know our local
        serial and the remote serial isn't newer than ours, the session is
        aborted with XfrinZoneUptodate (an info message is logged first when
        ours is actually ahead of the remote one).
        On success it returns XFRIN_OK for testing.  The caller won't use it.

        '''

        self._send_query(RRType.SOA)
        # count soaoutv4 or soaoutv6 requests
        self._counters.inc('zones', self._rrclass.to_text(),
                           self._zone_name.to_text(),
                           'soaout' + self._get_ipver_str())

        length_prefix = self._get_request_response(2)
        (msg_len,) = struct.unpack('!H', length_prefix)
        soa_response = self._get_request_response(msg_len)
        msg = Message(Message.PARSE)
        msg.from_wire(soa_response, Message.PRESERVE_ORDER)

        # Validate/parse the rest of the response, and extract the SOA
        # from the answer section
        soa = self.__parse_soa_response(msg, soa_response)

        # Compare the two serials.  If ours is 'new', abort with ZoneUptodate.
        primary_serial = get_soa_serial(soa.get_rdata()[0])
        if self._request_serial is not None:
            if self._request_serial >= primary_serial:
                if self._request_serial != primary_serial:
                    logger.info(XFRIN_ZONE_SERIAL_AHEAD, primary_serial,
                                self.zone_str(),
                                format_addrinfo(self._master_addrinfo),
                                self._request_serial)
                raise XfrinZoneUptodate

        return XFRIN_OK

906
    def do_xfrin(self, check_soa, request_type=RRType.AXFR):
        '''Do an xfr session by sending xfr request and parsing responses.

        If check_soa is true, the master's SOA serial is checked first and
        the connection is then closed and re-established before the actual
        transfer is requested.

        Returns XFRIN_OK on success (including the case where the zone
        turns out to be up to date) and XFRIN_FAIL otherwise.  Failures are
        logged here rather than propagated.  In all cases the
        xfrsuccess/xfrfail counter is updated and the diff is dropped.
        '''

        try:
            ret = XFRIN_OK
            self._request_type = request_type
            req_str = request_type.to_text()
            timer_name = 'last_' + req_str.lower() + '_duration'
            if check_soa:
                self._check_soa_serial()
                self.close()
                self.init_socket()
                if not self.connect_to_master():
                    raise XfrinException('Unable to reconnect to master')

            # start statistics timer
            # Note: If the timer for the zone is already started but
            # not yet stopped due to some error, the last start time
            # is overwritten at this point.
            self._counters.start_timer('zones',
                                       self._rrclass.to_text(),
                                       self._zone_name.to_text(),
                                       timer_name)
            logger.info(XFRIN_XFR_TRANSFER_STARTED, req_str, self.zone_str())
            # An AXFR or IXFR is being requested.
            self._counters.inc('zones', self._rrclass.to_text(),
                               self._zone_name.to_text(),
                               req_str.lower() + 'req' +
                               self._get_ipver_str())
            self._send_query(self._request_type)
            self.__state = XfrinInitialSOA()
            self._handle_xfrin_responses()

            # Depending what data was found, we log different status reports
            # (In case of an AXFR-style IXFR, print the 'AXFR' message)
            stats = self._transfer_stats
            if stats.axfr_rr_count == 0:
                logger.info(XFRIN_IXFR_TRANSFER_SUCCESS,
                            self.zone_str(),
                            stats.message_count,
                            stats.ixfr_changeset_count,
                            stats.ixfr_deletion_count,
                            stats.ixfr_addition_count,
                            stats.byte_count,
                            "%.3f" % stats.get_running_time(),
                            "%.f" % stats.get_bytes_per_second()
                           )
            else:
                logger.info(XFRIN_TRANSFER_SUCCESS,
                            req_str,
                            self.zone_str(),
                            stats.message_count,
                            stats.axfr_rr_count,
                            stats.byte_count,
                            "%.3f" % stats.get_running_time(),
                            "%.f" % stats.get_bytes_per_second()
                           )

        except XfrinZoneUptodate:
            # Eventually we'll probably have to treat this case as a trigger
            # of trying another primary server, etc, but for now we treat it
            # as "success".
            pass
        except XfrinZoneError:
            # The log message doesn't contain the exception text, since there's
            # only one place where the exception is thrown now and it'd be the
            # same generic message every time.
            logger.error(XFRIN_INVALID_ZONE_DATA,
                         self.zone_str(),
                         format_addrinfo(self._master_addrinfo))
            ret = XFRIN_FAIL
        except XfrinProtocolError as e:
            logger.info(XFRIN_XFR_TRANSFER_PROTOCOL_VIOLATION,
                        req_str, self.zone_str(),
                        format_addrinfo(self._master_addrinfo), str(e))
            ret = XFRIN_FAIL
        except XfrinException as e:
            logger.error(XFRIN_XFR_TRANSFER_FAILURE,
                         req_str, self.zone_str(),
                         format_addrinfo(self._master_addrinfo), str(e))
            ret = XFRIN_FAIL
        except Exception as e:
            # Catching all possible exceptions like this is generally not a
            # good practice, but handling an xfr session could result in
            # so many types of exceptions, including ones from the DNS library
            # or from the data source library.  Eventually we'd introduce a
            # hierarchy for exception classes from a base "ISC exception" and
            # catch it here, but until then we need broadest coverage so that
            # we won't miss anything.

            logger.error(XFRIN_XFR_OTHER_FAILURE, req_str,
                         self.zone_str(), str(e))
            ret = XFRIN_FAIL
        finally:
            # A xfrsuccess or xfrfail counter is incremented depending on
            # the result.
            succeeded = (ret == XFRIN_OK)
            result = 'xfrsuccess' if succeeded else 'xfrfail'
            self._counters.inc('zones', self._rrclass.to_text(),
                               self._zone_name.to_text(), result)
            # The started statistics timer is finally stopped only in
            # a successful case.
            if succeeded:
                self._counters.stop_timer('zones',
                                          self._rrclass.to_text(),
                                          self._zone_name.to_text(),
                                          timer_name)
            # Make sure any remaining transaction in the diff is closed
            # (if not yet - possible in case of xfr-level exception) as soon
            # as possible
            self._diff = None
        return ret
1015

1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
    def _check_response_header(self, msg):
        '''Perform minimal validation on a response's header.

        Raises XfrinProtocolError if the rcode isn't NOERROR, the QR flag
        isn't set, or the query ID differs from the one we sent.
        '''

        # It's not clear how strict we should be about response validation.
        # BIND 9 ignores some cases where it would normally be considered a
        # bogus response.  For example, it accepts a response even if its
        # opcode doesn't match that of the corresponding request.
        # According to an original developer of BIND 9 some of the missing
        # checks are deliberate to be kind to old implementations that would
        # cause interoperability trouble with stricter checks.

        rcode = msg.get_rcode()
        if rcode != Rcode.NOERROR:
            raise XfrinProtocolError('error response: %s' % rcode.to_text())

        is_response = msg.get_header_flag(Message.HEADERFLAG_QR)
        if not is_response:
            raise XfrinProtocolError('response is not a response')

        response_id = msg.get_qid()
        if response_id != self._query_id:
            raise XfrinProtocolError('bad query id')
Likun Zhang's avatar
Likun Zhang committed
1037

1038
1039
1040
1041
1042
    def _check_response_status(self, msg):
        '''Validate an xfr response message.

        In addition to the header checks of _check_response_header(), the
        message is rejected with XfrinProtocolError if it carries more than
        one question.
        '''

        self._check_response_header(msg)

        question_count = msg.get_rr_count(Message.SECTION_QUESTION)
        if question_count > 1:
            raise XfrinProtocolError('query section count greater than 1')
Likun Zhang's avatar
Likun Zhang committed
1045

1046
1047
1048
1049
1050
    def _handle_xfrin_responses(self):
        '''Read and process response messages until the transfer completes.

        Each message is read as a 2-byte network-order length followed by
        the message body, checked (TSIG and status), and its answer RRs are
        fed one by one to the current XFR state; an RR is offered again
        (to whatever the current state then is) until a handler reports it
        as handled.  The loop ends when the state's finish_message() returns
        a false value.  XfrinException is raised if shutdown has been
        requested by the time a message has been processed.
        '''
        while True:
            length_prefix = self._get_request_response(2)
            (msg_len,) = struct.unpack('!H', length_prefix)
            self._transfer_stats.byte_count += msg_len + 2
            recvdata = self._get_request_response(msg_len)
            msg = Message(Message.PARSE)
            msg.from_wire(recvdata, Message.PRESERVE_ORDER)
            self._transfer_stats.message_count += 1

            # TSIG related checks, including an unexpected signed response
            self._check_response_tsig(msg, recvdata)

            # Perform response status validation
            self._check_response_status(msg)

            for rr in msg.get_section(Message.SECTION_ANSWER):
                # The state may change while handling; re-read it each time
                while not self.__state.handle_rr(self, rr):
                    pass

            more_expected = self.__state.finish_message(self)

            if self._shutdown_event.is_set():
                raise XfrinException('xfrin is forced to stop')

            if not more_expected:
                break

1073
1074
1075
1076
1077
1078
1079
    def handle_read(self):
        '''Read up to _need_recv_size bytes of response data from the socket.

        The data and its length are left in _recvd_data and _recvd_size,
        and the _recv_time_out flag is cleared to signal that a read took
        place.
        '''
        received = self.recv(self._need_recv_size)
        self._recvd_data = received
        self._recvd_size = len(received)
        self._recv_time_out = False

Likun Zhang's avatar
Likun Zhang committed
1080
1081
    def writable(self):
        '''Always report the socket as not writable.

        Returning False makes write readiness of the socket be ignored.
        '''
        return False

1085
1086
1087
1088
1089
1090
1091
1092
def __get_initial_xfr_type(zone_soa, request_ixfr, zname, zclass, master_addr):
    """Determine the initial xfr request type.

    This is a dedicated subroutine of __process_xfrin.

    AXFR is chosen when there is no local SOA or when IXFR is disabled by
    request_ixfr; otherwise IXFR is chosen.  The decision is logged in each
    case.
    """
    zone_text = format_zone_str(zname, zclass)
    master = AddressFormatter(master_addr)

    if zone_soa is None:
        # This is a kind of special case, so we log it at info level.
        logger.info(XFRIN_INITIAL_AXFR, zone_text, master)
        return RRType.AXFR

    if request_ixfr == ZoneInfo.REQUEST_IXFR_DISABLED:
        logger.debug(DBG_XFRIN_TRACE, XFRIN_INITIAL_IXFR_DISABLED,
                     zone_text, master)
        return RRType.AXFR

    assert request_ixfr in (ZoneInfo.REQUEST_IXFR_FIRST,
                            ZoneInfo.REQUEST_IXFR_ONLY)
    logger.debug(DBG_XFRIN_TRACE, XFRIN_INITIAL_IXFR, zone_text, master)
    return RRType.IXFR

1108
def __process_xfrin(server, zone_name, rrclass, datasrc_client, zone_soa,
                    shutdown_event, master_addrinfo, check_soa, tsig_key,
                    request_ixfr, conn_class):
    """Run one xfrin session, including a possible IXFR->AXFR fallback.

    A connection object of conn_class is created for each attempt.  The
    final result (XFRIN_OK or XFRIN_FAIL) is published through
    server.publish_xfrin_news() after the connection has been closed.  An
    exception raised during the attempts is held back until that cleanup
    and publication are done, and is then re-raised.
    """
    conn = None
    exception = None
    ret = XFRIN_FAIL
    try:
        # Determine the initial request type: AXFR or IXFR.
        request_type = __get_initial_xfr_type(zone_soa, request_ixfr,
                                              zone_name, rrclass,
                                              master_addrinfo[2])

        # Create a TCP connection for the XFR session and perform the
        # operation.
        sock_map = {}
        # In case we were asked to do IXFR and that one fails, we try again
        # with AXFR. But only if we could actually connect to the server.
        #
        # So every path below leaves the loop, except the one for a
        # connected-but-failed IXFR whose fallback is allowed; that one
        # goes around once more with AXFR.
        while True:
            conn = conn_class(sock_map, zone_name, rrclass, datasrc_client,
                              shutdown_event, master_addrinfo, zone_soa,
                              tsig_key)
            conn.init_socket()
            ret = XFRIN_FAIL
            if not conn.connect_to_master():
                break

            ret = conn.do_xfrin(check_soa, request_type)
            if not (ret == XFRIN_FAIL and request_type == RRType.IXFR):
                break

            # IXFR failed for some reason. It might mean the server
            # can't handle it, or we don't have the zone or we are out
            # of sync or whatever else. So we retry with AXFR, as
            # it may succeed in many such cases; if "IXFR only" is
            # specified in request_ixfr, however, we suppress the
            # fallback.
            if request_ixfr == ZoneInfo.REQUEST_IXFR_ONLY:
                logger.warn(XFRIN_XFR_TRANSFER_FALLBACK_DISABLED,
                            tsig_key, conn.zone_str())
                break

            request_type = RRType.AXFR
            logger.warn(XFRIN_XFR_TRANSFER_FALLBACK,
                        tsig_key, conn.zone_str())
            conn.close()
            conn = None

    except Exception as ex:
        # If exception happens, just remember it here so that we can re-raise
        # after cleaning up things.  We don't log it here because we want
        # eliminate smallest possibility of having an exception in logging
        # itself.
        exception = ex

    # asyncore.dispatcher requires explicit close() unless its lifetime
    # from born to destruction is closed within asyncore.loop, which is not
    # the case for us.  We always close() here, whether or not do_xfrin
    # succeeds, and even when we see an unexpected exception.
    if conn is not None:
        conn.close()

    # Publish the zone transfer result news, so zonemgr can reset the
    # zone timer, and xfrout can notify the zone's slaves if the result
    # is success.
    server.publish_xfrin_news(zone_name, rrclass, ret)

    if exception is not None:
        raise exception

1179
1180
1181
def process_xfrin(server, xfrin_recorder, zone_name, rrclass, datasrc_client,
                  zone_soa, shutdown_event, master_addrinfo, check_soa,
                  tsig_key, request_ixfr, conn_class=XfrinConnection):
1182
1183
1184
1185
1186
1187
1188
    # Even if it should be rare, the main process of xfrin session can
    # raise an exception.  In order to make sure the lock in xfrin_recorder
    # is released in any cases, we delegate the main part to the helper
    # function in the try block, catch any exceptions, then release the lock.
    xfrin_recorder.increment(zone_name)
    exception = None
    try:
1189
        __process_xfrin(server, zone_name, rrclass, datasrc_client, zone_soa,
1190
                        shutdown_event, master_addrinfo, check_soa, tsig_key,