#!@PYTHON@

# Copyright (C) 2010  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.


import sys; sys.path.append ('@@PYTHONPATH@@')
import os
import signal
import isc
import asyncore
import struct
import threading
import socket
import random
from optparse import OptionParser, OptionValueError
from isc.config.ccsession import *
try:
    from bind10_dns import *
except ImportError:
    # The DNS message library is allowed to be missing at import time:
    # xfrin_start() checks sys.modules for 'bind10_dns' and reports the
    # problem to the caller instead of crashing the daemon here.
    pass

# Locate the module specification file.  When B10_FROM_SOURCE is present
# in the environment the file is looked up below that directory;
# otherwise the path substituted at build time is used.
try:
    SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/xfrin"
except KeyError:
    PREFIX = "@prefix@"
    DATAROOTDIR = "@datarootdir@"
    SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
SPECFILE_LOCATION = "%s/xfrin.spec" % SPECFILE_PATH


# Version string handed to OptionParser for the --version option.
__version__ = 'BIND10'
# define xfrin rcode
# Result codes of a transfer attempt.  XFRIN_OK is what the transfer
# methods in this file return on success; the other values are defined
# here but not referenced elsewhere in this file.
XFRIN_OK = 0
XFRIN_RECV_TIMEOUT = 1
XFRIN_NO_NEWDATA = 2
XFRIN_QUOTA_ERROR = 3
XFRIN_IS_DOING = 4

# define xfrin state
# NOTE(review): these state values are not referenced elsewhere in this
# file; presumably reserved for SOA-check/IXFR support -- confirm.
XFRIN_QUERY_SOA = 1
XFRIN_FIRST_AXFR = 2
XFRIN_FIRST_IXFR = 3

def log_error(msg):
    '''Write msg to standard error as one "[b10-xfrin] "-prefixed line.

    msg may be any object; it is converted with str().  The line is
    emitted with a single write() call so that messages logged from
    different transfer threads are not split into interleaved pieces.
    '''
    sys.stderr.write('[b10-xfrin] %s\n' % str(msg))


class XfrinException(Exception):
    '''Error raised by this module when a transfer fails or when a
    command parameter is missing or invalid.'''


class XfrinConnection(asyncore.dispatcher):
    '''Do xfrin in this class. '''

    def __init__(self, zone_name, db_file,
                 shutdown_event,
                 master_addr,
                 port = 53,
                 check_soa = True,
                 verbose = False,
                 idle_timeout = 60):
        ''' Open a blocking TCP connection to the master server.

            zone_name: name of the zone to transfer.
            db_file: specify the data source file.
            shutdown_event: object with is_set(); once set, the running
                transfer is aborted.
            master_addr: IPv4 or IPv6 address literal of the master.
            port: TCP port of the master.
            check_soa: when it's true, check soa first before sending xfr
                query.  Accepted here but not read by the constructor;
                do_xfrin() takes its own check_soa argument.
            verbose: stored as self._verbose.
            idle_timeout: max idle time for read data from socket.

            Any error raised while connecting is propagated after the
            socket has been closed.
        '''
        asyncore.dispatcher.__init__(self)
        self._zone_name = zone_name
        self._db_file = db_file
        self._axfrin_db = isc.auth.sqlite3_ds.AXFRInDB(self._db_file, self._zone_name)
        self._soa_rr_count = 0
        self._idle_timeout = idle_timeout
        self._shutdown_event = shutdown_event
        self._verbose = verbose

        # check_addr() in this file accepts addresses containing ':' as
        # IPv6, so the socket family has to follow the address form.
        if ':' in master_addr:
            addr_family = socket.AF_INET6
        else:
            addr_family = socket.AF_INET
        self.create_socket(addr_family, socket.SOCK_STREAM)
        self.setblocking(1)
        try:
            self.connect((master_addr, port))
        except Exception:
            # Do not leave the socket (and its asyncore registration)
            # behind when the master cannot be reached.
            self.close()
            raise

    def _create_query(self, query_type):
        '''Build a query message for this zone.

        A random query id between 1 and 0xFFFF is chosen and remembered
        in self._query_id.  The returned message carries a single
        question: (zone name, class IN, query_type).
        '''
        request = message(message_mode.RENDER)
        self._query_id = random.randint(1, 0xFFFF)
        request.set_qid(self._query_id)
        request.set_opcode(op_code.QUERY())
        request.set_rcode(rcode.NOERROR())
        zone_question = question(name(self._zone_name), rr_class.IN(), query_type)
        request.add_question(zone_question)
        return request

    def _send_data(self, data):
        size = len(data)
        total_count = 0
        while total_count < size:
            count = self.send(data[total_count:])
            total_count += count


    def _send_query(self, query_type):
        '''Render a query of the given type and send it over TCP.

        The rendered message is preceded on the wire by its length,
        packed as a two-byte value.
        '''
        query = self._create_query(query_type)
        wire = output_buffer(0)
        query.to_wire(message_render(wire))

        length_prefix = struct.pack('H', socket.htons(wire.get_length()))
        self._send_data(length_prefix)
        self._send_data(wire.get_data())

    def _get_request_response(self, size):
        recv_size = 0
        data = b''
        while recv_size < size:
            self._recv_time_out = True
            self._need_recv_size = size - recv_size
            asyncore.loop(self._idle_timeout, count = 1)
            if self._recv_time_out:
                raise XfrinException('receive data from socket time out.')

            recv_size += self._recvd_size
            data += self._recvd_data

        return data


    def handle_read(self):
        '''Read query's response from socket. '''
        self._recvd_data = self.recv(self._need_recv_size)
        self._recvd_size = len(self._recvd_data)
        self._recv_time_out = False


    def _check_soa_serial(self):
        ''' Query the master for the zone's SOA record.

        The intent is to compare the master's SOA serial with the local
        one and to stop the transfer when the master has nothing newer.
        That comparison is not implemented yet (see the TODO below): the
        reply is read from the socket and discarded.

        Returns XFRIN_OK.
        '''
        self._send_query(rr_type.SOA())
        # The reply is preceded by a two-byte length in network byte
        # order; it arrives as bytes and must be unpacked, not int()-ed.
        data_size = self._get_request_response(2)
        reply_len = struct.unpack('!H', data_size)[0]
        self._get_request_response(reply_len)
        #TODO, need select soa record from data source then compare the two
        #serial
        return XFRIN_OK

    def do_xfrin(self, check_soa, ixfr_first = False):
        '''Run one AXFR transfer for this zone and close the connection.

        check_soa: when true, _check_soa_serial() is called first and the
            transfer only proceeds if it returns XFRIN_OK.
        ixfr_first: accepted but not used by this method.

        The received data is committed with finish_axfrin() only when the
        whole transfer completed successfully.  An XfrinException is
        logged and turned into a non-OK return value; other exceptions
        propagate after the connection has been closed.

        Returns XFRIN_OK on success, another XFRIN_* value otherwise.
        '''
        ret = XFRIN_OK
        transfer_done = False
        try:
            if check_soa:
                ret = self._check_soa_serial()

            self.log_msg('transfer of \'%s\': AXFR started' % self._zone_name)
            if ret == XFRIN_OK:
                self._axfrin_db.prepare_axfrin()
                self._send_query(rr_type.AXFR())
                ret = self._handle_xfrin_response()
                transfer_done = (ret == XFRIN_OK)

            self.log_msg('transfer of \'%s\' AXFR ended' % self._zone_name)
        except XfrinException as e:
            self.log_msg(e)
            self.log_msg('Error happened during xfrin!')
            # NOTE(review): this file defines no generic failure code, so
            # the nearest existing non-OK value is reported.  Consider
            # adding a dedicated code.
            ret = XFRIN_RECV_TIMEOUT
            #TODO, recover data source.
        finally:
            self.close()
            # Never commit a partial zone: finish only after the final
            # SOA has been seen.
            if transfer_done:
                self._axfrin_db.finish_axfrin()

        return ret
    
    def _check_response_status(self, msg):
        '''Validate a parsed response message.

        Raises XfrinException for the first check that fails.  The checks
        run in the order listed, and later ones are not evaluated once
        one has failed.
        '''
        #TODO, check more?
        checks = (
            (lambda: msg.get_rcode() != rcode.NOERROR(),
             'error response: '),
            (lambda: not msg.get_header_flag(message_flag.QR()),
             'response is not a response '),
            (lambda: msg.get_qid() != self._query_id,
             'bad query id'),
            (lambda: msg.get_rr_count(section.ANSWER()) == 0,
             'answer section is empty'),
            (lambda: msg.get_rr_count(section.QUESTION()) > 1,
             'query section count greater than 1'),
        )
        for has_failed, reason in checks:
            if has_failed():
                raise XfrinException(reason)


    def _handle_answer_section(self, rrset_iter):
        '''Store every record of an answer section in the zone database.

        rrset_iter: iterator over the RRsets of the answer section.

        SOA records are counted in self._soa_rr_count.  Processing stops
        as soon as the second SOA is reached, and that second SOA is not
        inserted.
        '''
        while not rrset_iter.is_last():
            rrset = rrset_iter.get_rrset()
            rrset_iter.next()
            rrset_name = rrset.get_name().to_text()
            rrset_ttl = int(rrset.get_ttl().to_text())
            rrset_class = rrset.get_class().to_text()
            rrset_type = rrset.get_type().to_text()
            # The type is the same for every rdata of the RRset.
            is_soa = rrset.get_type() == rr_type.SOA()

            rdata_iter = rrset.get_rdata_iterator()
            rdata_iter.first()
            while not rdata_iter.is_last():
                # Count the soa record count
                if is_soa:
                    self._soa_rr_count += 1
                    if self._soa_rr_count == 2:
                        # Avoid inserting soa record twice
                        return

                rdata_text = rdata_iter.get_current().to_text()
                rr_data = (rrset_name, rrset_ttl, rrset_class, rrset_type, rdata_text)
                self._axfrin_db.insert_axfr_record([rr_data])
                rdata_iter.next()


    def _handle_xfrin_response(self):
        '''Receive and store response messages until the transfer is done.

        Each message is read as a two-byte length followed by that many
        bytes, parsed, validated and passed to _handle_answer_section().
        The loop ends when the second SOA record has been seen.

        Returns XFRIN_OK.  Raises XfrinException if a response is invalid,
        reading times out, or the shutdown event is set.
        '''
        while True:
            data_len = self._get_request_response(2)
            # The length prefix is in network byte order.
            msg_len = struct.unpack('!H', data_len)[0]
            recvdata = self._get_request_response(msg_len)
            msg = message(message_mode.PARSE)
            msg.from_wire(input_buffer(recvdata))
            self._check_response_status(msg)

            rrset_iter = section_iter(msg, section.ANSWER())
            self._handle_answer_section(rrset_iter)
            if self._soa_rr_count == 2:
                return XFRIN_OK

            if self._shutdown_event.is_set():
                #Check if xfrin process is shutdown.
                #TODO, xfrin may be blocked in one loop.
                raise XfrinException('xfrin is forced to stop')


    def writable(self):
        '''Ignore the writable socket. '''
        return False


    def log_info(self, msg, type='info'):
Likun Zhang's avatar
Likun Zhang committed
259
260
261
        # Overwrite the log function, log nothing
        pass

262
263
    def log_msg(self, msg):
        sys.stdout.write('[b10-xfrin] ')
264
        sys.stdout.write(str(msg))
265
266
        sys.stdout.write('\n')

Likun Zhang's avatar
Likun Zhang committed
267
268

def process_xfrin(xfrin_recorder, zone_name, db_file,
                  shutdown_event, master_addr, port, check_soa, verbose):
    '''Run one zone transfer; used as the target of a transfer thread.

    The zone is registered in xfrin_recorder for the duration of the
    transfer and is always removed again, whatever the outcome.  Any
    exception raised while connecting or transferring is logged with
    log_error() and not propagated.
    '''
    # Register under the zone name, so that decrement() below and
    # XfrinRecorder.xfrin_in_progress() find the same entry.
    xfrin_recorder.increment(zone_name)
    try:
        conn = XfrinConnection(zone_name, db_file, shutdown_event,
                               master_addr, int(port), check_soa, verbose)
        # NOTE(review): the SOA check is not requested here regardless of
        # check_soa, as in the original code -- presumably because the
        # serial comparison is still a TODO; confirm before enabling.
        conn.do_xfrin(False)
    except Exception as e:
        log_error(str(e))
    finally:
        xfrin_recorder.decrement(zone_name)


class XfrinRecorder():
    '''Lock-protected list of the zones that are being transferred.'''

    def __init__(self):
        self._lock = threading.Lock()
        self._zones = []

    def increment(self, zone_name):
        '''Record that a transfer of zone_name has started.'''
        with self._lock:
            self._zones.append(zone_name)

    def decrement(self, zone_name):
        '''Remove one entry for zone_name; do nothing if it is absent.'''
        with self._lock:
            if zone_name in self._zones:
                self._zones.remove(zone_name)

    def xfrin_in_progress(self, zone_name):
        '''Return True if zone_name is currently recorded.'''
        with self._lock:
            return zone_name in self._zones

    def count(self):
        '''Return the number of recorded transfers.'''
        with self._lock:
            return len(self._zones)

class Xfrin():
    def __init__(self, verbose = False):
        '''Set up the transfer bookkeeping and start the command session.

        verbose: stored as self._verbose and handed to transfer threads.
        '''
        # Initialise all state before the session is started: the
        # handlers registered with it read these attributes.
        self._max_transfers_in = 10
        self.recorder = XfrinRecorder()
        self._shutdown_event = threading.Event()
        self._verbose = verbose
        self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
        self._cc.start()


    def config_handler(self, new_config):
        '''Accept a configuration update.

        new_config is currently ignored; an answer with result code 0 is
        always returned.
        '''
        # TODO, process new config data
        return create_answer(0)


    def _print_settings(self):
        full_config = self._cc.get_full_config()
        for item in full_config:
            print(item + ":" + str(full_config[item]))


    def shutdown(self):
        ''' shutdown the xfrin process. the thread which is doing xfrin should be 
        terminated.
        '''
        self._shutdown_event.set()
        main_thread = threading.currentThread()
        for th in threading.enumerate():
            if th is main_thread:
                continue
            th.join()


    def command_handler(self, command, args):
        '''Execute a command received on the command channel.

        'retransfer' starts a transfer without the SOA check, 'refresh'
        starts one with it.  An XfrinException raised while handling the
        command is reported as an answer with result code 1.  Commands
        that are not recognised get the default answer (code 0).
        '''
        answer = create_answer(0)
        try:
            if command == 'print_message':
                print(args)
            elif command == 'print_settings':
                self._print_settings()
            elif command == 'shutdown':
                self._shutdown_event.set()
            elif command in ('retransfer', 'refresh'):
                zone_name, master, port, db_file = self._parse_cmd_params(args)
                if command == 'retransfer':
                    result = self.xfrin_start(zone_name, db_file, master, port, False)
                else:
                    result = self.xfrin_start(zone_name, db_file, master, port)
                answer = create_answer(result[0], result[1])
        except XfrinException as err:
            answer = create_answer(1, str(err))

        return answer

    def _parse_cmd_params(self, args):
        '''Extract and validate the parameters of a transfer command.

        args: mapping that may contain 'zone_name', 'master', 'port' and
            'db_file'.

        Returns the tuple (zone_name, master, port, db_file).  port
        defaults to 53 and db_file to the installed zone database.

        Raises XfrinException if the zone name or master address is
        missing, or if the master address or port is invalid.
        '''
        zone_name = args.get('zone_name')
        if not zone_name:
            raise XfrinException('zone name should be provided')

        master = args.get('master')
        if not master:
            raise XfrinException('master address should be provided')

        check_addr(master)
        port = 53
        port_str = args.get('port')
        if port_str:
            try:
                port = int(port_str)
            except (TypeError, ValueError):
                # Report a malformed port like every other bad parameter;
                # command_handler() only handles XfrinException.
                raise XfrinException('invalid port number: %s' % str(port_str))
            check_port(port)

        db_file = args.get('db_file')
        if not db_file:
            #TODO, the db file path should be got in auth server's configuration
            db_file = '@@LOCALSTATEDIR@@/@PACKAGE@/zone.sqlite3'

        return (zone_name, master, port, db_file)
            

    def startup(self):
        while not self._shutdown_event.is_set():
            self._cc.check_command()


    def xfrin_start(self, zone_name, db_file, master_addr, 
                    port = 53, 
                    check_soa = True):
402
403
        if "bind10_dns" not in sys.modules:
            return (1, "xfrin failed, can't load dns message python library: 'bind10_dns'")
Likun Zhang's avatar
Likun Zhang committed
404
405
406
407
408
409
410
411
412
413
414
415
416
417

        # check max_transfer_in, else return quota error
        if self.recorder.count() >= self._max_transfers_in:
            return (1, 'xfrin quota error')

        if self.recorder.xfrin_in_progress(zone_name):
            return (1, 'zone xfrin is in progress')

        xfrin_thread = threading.Thread(target = process_xfrin, 
                                        args = (self.recorder, 
                                                zone_name, 
                                                db_file, 
                                                self._shutdown_event,
                                                master_addr, 
418
419
                                                port, check_soa, self._verbose))
                                                
Likun Zhang's avatar
Likun Zhang committed
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
        xfrin_thread.start()
        return (0, 'zone xfrin is started')


# The running Xfrin instance; stays None until the main script creates it.
xfrind = None

def signal_handler(signal, frame):
    '''Shut down the running Xfrin instance, if any, and exit with status 0.'''
    daemon = xfrind
    if daemon:
        daemon.shutdown()
    raise SystemExit(0)

def set_signal_handler():
    '''Install signal_handler for SIGTERM and SIGINT.'''
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, signal_handler)

def check_port(value):
    '''Raise XfrinException unless value lies in the range 0-65535.'''
    if value > 65535 or value < 0:
        raise XfrinException('requires a port number (0-65535)')

def check_addr(ipstr):
    '''Raise XfrinException unless ipstr is a valid IP address literal.

    A string containing ':' is validated as IPv6, anything else as IPv4.
    '''
    ip_family = socket.AF_INET
    if ':' in ipstr:
        ip_family = socket.AF_INET6

    try:
        socket.inet_pton(ip_family, ipstr)
    except (socket.error, ValueError, TypeError):
        # Only conversion failures mean "invalid address"; SystemExit and
        # KeyboardInterrupt must not be swallowed here.
        raise XfrinException("%s invalid ip address" % ipstr)


def set_cmd_options(parser):
    '''Register the -v/--verbose flag on the given option parser.'''
    parser.add_option(
        "-v", "--verbose",
        action="store_true",
        dest="verbose",
        help="display more about what is going on",
    )

    
if __name__ == '__main__':
    # Parse the command line, install the signal handlers and serve
    # commands until shutdown is requested.  Errors are logged rather
    # than propagated, so that the final shutdown below still runs.
    try:
        parser = OptionParser(version = __version__)
        set_cmd_options(parser)
        (options, args) = parser.parse_args()

        set_signal_handler()
        xfrind = Xfrin(verbose = options.verbose)
        xfrind.startup()
    except KeyboardInterrupt:
        log_error("exit b10-xfrin")
    except isc.cc.session.SessionError as e:
        log_error(str(e))
        log_error('Error happened! is the command channel daemon running?')
    except Exception as e:
        log_error(str(e))

    # xfrind is still None when creating Xfrin itself failed.
    if xfrind:
        xfrind.shutdown()