xfrout.py.in 19.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!@PYTHON@

# Copyright (C) 2010  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.


import sys; sys.path.append ('@@PYTHONPATH@@')
import isc
import isc.cc
import threading
import struct
import signal
Evan Hunt's avatar
Evan Hunt committed
25
from isc.datasrc import sqlite3_ds
26
27
28
from socketserver import *
import os
from isc.config.ccsession import *
29
from isc.log.log import *
30
from isc.cc import SessionError, SessionTimeout
31
from isc.notify import notify_out
32
import socket
33
import select
34
import errno
35
36
from optparse import OptionParser, OptionValueError
try:
37
    from libxfr_python import *
Jelte Jansen's avatar
Jelte Jansen committed
38
    from pydnspp import *
39
40
41
42
except ImportError as e:
    # C++ loadable module may not be installed; even so the xfrout process
    # must keep running, so we warn about it and move forward.
    sys.stderr.write('[b10-xfrout] failed to import DNS or XFR module: %s\n' % str(e))
43

44
45
if "B10_FROM_BUILD" in os.environ:
    SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrout"
46
    AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
47
    UNIX_SOCKET_FILE= os.environ["B10_FROM_BUILD"] + "/auth_xfrout_conn"
48
49
50
51
else:
    PREFIX = "@prefix@"
    DATAROOTDIR = "@datarootdir@"
    SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
52
    AUTH_SPECFILE_PATH = SPECFILE_PATH
53
    UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/auth_xfrout_conn"
54

55
SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
56
AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + os.sep + "auth.spec"
57
MAX_TRANSFERS_OUT = 10
Jerry's avatar
Jerry committed
58
VERBOSE_MODE = False
59

60
61
62
63
64
65
66
67
68
XFROUT_MAX_MESSAGE_SIZE = 65535

def get_rrset_len(rrset):
    """Returns the wire length of the given RRset"""
    bytes = bytearray()
    rrset.to_wire(bytes)
    return len(bytes)


69
class XfroutSession(BaseRequestHandler):
Jerry's avatar
Jerry committed
70
    def __init__(self, request, client_address, server, log):
Jelte Jansen's avatar
Jelte Jansen committed
71
72
        # The initializer for the superclass may call functions
        # that need _log to be set, so we set it first
Jerry's avatar
Jerry committed
73
        self._log = log
Jelte Jansen's avatar
Jelte Jansen committed
74
        BaseRequestHandler.__init__(self, request, client_address, server)
Jerry's avatar
Jerry committed
75

76
77
    def handle(self):
        fd = recv_fd(self.request.fileno())
78
        
79
        if fd < 0:
80
81
82
            # This may happen when one xfrout process try to connect to
            # xfrout unix socket server, to check whether there is another
            # xfrout running. 
83
            self._log.log_message("error", "Failed to receive the file descriptor for XFR connection")
84
85
            return

86
        data_len = self.request.recv(2)
87
        msg_len = struct.unpack('!H', data_len)[0]
88
89
90
91
        msgdata = self.request.recv(msg_len)
        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.dns_xfrout_start(sock, msgdata)
92
            #TODO, avoid catching all exceptions
93
        except Exception as e:
Jerry's avatar
Jerry committed
94
            self._log.log_message("error", str(e))
95

96
97
98
99
100
101
102
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            # Avoid socket error caused by shutting down 
            # one non-connected socket.
            pass

103
        sock.close()
104
105
        os.close(fd)
        pass
106

107
108
109
110
    def _parse_query_message(self, mdata):
        ''' parse query message to [socket,message]'''
        #TODO, need to add parseHeader() in case the message header is invalid 
        try:
111
            msg = Message(Message.PARSE)
112
            Message.from_wire(msg, mdata)
113
        except Exception as err:
Jerry's avatar
Jerry committed
114
            self._log.log_message("error", str(err))
115
            return Rcode.FORMERR(), None
116

117
        return Rcode.NOERROR(), msg
118
119

    def _get_query_zone_name(self, msg):
120
        question = msg.get_question()[0]
121
122
123
124
125
126
127
128
129
130
131
132
        return question.get_name().to_text()


    def _send_data(self, sock, data):
        size = len(data)
        total_count = 0
        while total_count < size:
            count = sock.send(data[total_count:])
            total_count += count


    def _send_message(self, sock, msg):
133
        render = MessageRenderer()
134
        render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE)
135
        msg.to_wire(render)
136
        header_len = struct.pack('H', socket.htons(render.get_length()))
137
        self._send_data(sock, header_len)
138
        self._send_data(sock, render.get_data())
139
140
141
142
143
144
145
146


    def _reply_query_with_error_rcode(self, msg, sock, rcode_):
        msg.make_response()
        msg.set_rcode(rcode_)
        self._send_message(sock, msg)


Likun Zhang's avatar
Likun Zhang committed
147
    def _reply_query_with_format_error(self, msg, sock):
148
149
150
151
152
        '''query message format isn't legal.'''
        if not msg:
            return # query message is invalid. send nothing back. 

        msg.make_response()
153
        msg.set_rcode(Rcode.FORMERR())
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
        self._send_message(sock, msg)


    def _zone_is_empty(self, zone):
        if sqlite3_ds.get_zone_soa(zone, self.server.get_db_file()):
            return False

        return True

    def _zone_exist(self, zonename):
        # Find zone in datasource, should this works? maybe should ask 
        # config manager.
        soa = sqlite3_ds.get_zone_soa(zonename, self.server.get_db_file())
        if soa:
            return True
        return False

    
    def _check_xfrout_available(self, zone_name):
        '''Check if xfr request can be responsed.
           TODO, Get zone's configuration from cfgmgr or some other place
           eg. check allow_transfer setting, 
        '''
        if not self._zone_exist(zone_name):
178
            return Rcode.NOTAUTH()
179
180

        if self._zone_is_empty(zone_name):
181
            return Rcode.SERVFAIL() 
182
183
184

        #TODO, check allow_transfer
        if not self.server.increase_transfers_counter():
185
            return Rcode.REFUSED()
186

187
        return Rcode.NOERROR()
188
189
190
191
192


    def dns_xfrout_start(self, sock, msg_query):
        rcode_, msg = self._parse_query_message(msg_query)
        #TODO. create query message and parse header
193
        if rcode_ != Rcode.NOERROR():
Likun Zhang's avatar
Likun Zhang committed
194
            return self._reply_query_with_format_error(msg, sock)
195
196
197

        zone_name = self._get_query_zone_name(msg)
        rcode_ = self._check_xfrout_available(zone_name)
198
        if rcode_ != Rcode.NOERROR():
199
200
            self._log.log_message("info", "transfer of '%s/IN' failed: %s",
                                  zone_name, rcode_.to_text())
201
202
203
            return self. _reply_query_with_error_rcode(msg, sock, rcode_)

        try:
Jerry's avatar
Jerry committed
204
            self._log.log_message("info", "transfer of '%s/IN': AXFR started" % zone_name)
205
            self._reply_xfrout_query(msg, sock, zone_name)
Jerry's avatar
Jerry committed
206
            self._log.log_message("info", "transfer of '%s/IN': AXFR end" % zone_name)
207
        except Exception as err:
Jerry's avatar
Jerry committed
208
            self._log.log_message("error", str(err))
209
210
211
212
213
214
215
216
217
218

        self.server.decrease_transfers_counter()
        return    


    def _clear_message(self, msg):
        qid = msg.get_qid()
        opcode = msg.get_opcode()
        rcode = msg.get_rcode()
        
219
        msg.clear(Message.RENDER)
220
221
222
        msg.set_qid(qid)
        msg.set_opcode(opcode)
        msg.set_rcode(rcode)
223
224
        msg.set_header_flag(MessageFlag.AA())
        msg.set_header_flag(MessageFlag.QR())
225
226
227
228
229
230
        return msg

    def _create_rrset_from_db_record(self, record):
        '''Create one rrset from one record of datasource, if the schema of record is changed, 
        This function should be updated first.
        '''
231
232
233
        rrtype_ = RRType(record[5])
        rdata_ = Rdata(rrtype_, RRClass("IN"), " ".join(record[7:]))
        rrset_ = RRset(Name(record[2]), RRClass("IN"), rrtype_, RRTTL( int(record[4])))
234
235
236
        rrset_.add_rdata(rdata_)
        return rrset_
         
237
    def _send_message_with_last_soa(self, msg, sock, rrset_soa, message_upper_len):
238
239
240
        '''Add the SOA record to the end of message. If it can't be
        added, a new message should be created to send out the last soa .
        '''
241
        rrset_len = get_rrset_len(rrset_soa)
242

243
244
        if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
            msg.add_rrset(Section.ANSWER(), rrset_soa)
245
        else:
246
            self._send_message(sock, msg)
247
            msg = self._clear_message(msg)
248
            msg.add_rrset(Section.ANSWER(), rrset_soa)
249

250
        self._send_message(sock, msg)
251
252
253
254
255


    def _reply_xfrout_query(self, msg, sock, zone_name):
        #TODO, there should be a better way to insert rrset.
        msg.make_response()
256
        msg.set_header_flag(MessageFlag.AA())
257
258
        soa_record = sqlite3_ds.get_zone_soa(zone_name, self.server.get_db_file())
        rrset_soa = self._create_rrset_from_db_record(soa_record)
259
        msg.add_rrset(Section.ANSWER(), rrset_soa)
260

261
262
        message_upper_len = get_rrset_len(rrset_soa)

263
264
        for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()):
            if  self.server._shutdown_event.is_set(): # Check if xfrout is shutdown
Jerry's avatar
Jerry committed
265
                self._log.log_message("error", "shutdown!")
266

267
268
            # TODO: RRType.SOA() ?
            if RRType(rr_data[5]) == RRType("SOA"): #ignore soa record
269
                continue
Jelte Jansen's avatar
Jelte Jansen committed
270

271
            rrset_ = self._create_rrset_from_db_record(rr_data)
272
273
274
275
276
277
278
279

            # We calculate the maximum size of the RRset (i.e. the
            # size without compression) and use that to see if we
            # may have reached the limit
            rrset_len = get_rrset_len(rrset_)
            if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
                msg.add_rrset(Section.ANSWER(), rrset_)
                message_upper_len += rrset_len
280
281
282
283
                continue

            self._send_message(sock, msg)
            msg = self._clear_message(msg)
284
            msg.add_rrset(Section.ANSWER(), rrset_) # Add the rrset to the new message
285
            message_upper_len = rrset_len
286

287
        self._send_message_with_last_soa(msg, sock, rrset_soa, message_upper_len)
288
289
290
291

class UnixSockServer(ThreadingUnixStreamServer):
    '''The unix domain socket server which accept xfr query sent from auth server.'''

292
    def __init__(self, sock_file, handle_class, shutdown_event, config_data, cc, log):
293
        self._remove_unused_sock_file(sock_file)
294
        self._sock_file = sock_file
295
296
297
298
        ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
        self._lock = threading.Lock()
        self._transfers_counter = 0
        self._shutdown_event = shutdown_event
299
        self._log = log
300
        self.update_config_data(config_data)
301
        self._cc = cc
302
        
Jerry's avatar
Jerry committed
303
304
305
    def finish_request(self, request, client_address):
        '''Finish one request by instantiating RequestHandlerClass.'''
        self.RequestHandlerClass(request, client_address, self, self._log)
306
307
308
309
310
311
312

    def _remove_unused_sock_file(self, sock_file):
        '''Try to remove the socket file. If the file is being used 
        by one running xfrout process, exit from python. 
        If it's not a socket file or nobody is listening
        , it will be removed. If it can't be removed, exit from python. '''
        if self._sock_file_in_use(sock_file):
313
314
            sys.stderr.write("[b10-xfrout] Fail to start xfrout process, unix socket" 
                  " file '%s' is being used by another xfrout process\n" % sock_file)
315
316
317
318
319
320
321
322
            sys.exit(0)
        else:
            if not os.path.exists(sock_file):
                return

            try:
                os.unlink(sock_file)
            except OSError as err:
Jeremy C. Reed's avatar
Jeremy C. Reed committed
323
                sys.stderr.write('[b10-xfrout] Fail to remove file %s: %s\n' % (sock_file, err))
324
325
326
327
328
329
330
331
332
333
334
335
336
                sys.exit(0)
   
    def _sock_file_in_use(self, sock_file):
        '''Check whether the socket file 'sock_file' exists and 
        is being used by one running xfrout process. If it is, 
        return True, or else return False. '''
        try:
            sock = socket.socket(socket.AF_UNIX)
            sock.connect(sock_file)
        except socket.error as err:
            return False
        else:
            return True 
337

338
339
340
341
    def shutdown(self):
        ThreadingUnixStreamServer.shutdown(self)
        try:
            os.unlink(self._sock_file)
Jerry's avatar
Jerry committed
342
343
        except Exception as e:
            self._log.log_message("error", str(e))
344
345
346

    def update_config_data(self, new_config):
        '''Apply the new config setting of xfrout module. '''
Jerry's avatar
Jerry committed
347
        self._log.log_message('info', 'update config data start.')
348
349
        self._lock.acquire()
        self._max_transfers_out = new_config.get('transfers_out')
Jerry's avatar
Jerry committed
350
        self._log.log_message('info', 'max transfer out : %d', self._max_transfers_out)
351
        self._lock.release()
Jerry's avatar
Jerry committed
352
        self._log.log_message('info', 'update config data complete.')
353
354

    def get_db_file(self):
355
356
357
358
359
360
        file, is_default = self._cc.get_remote_config_value("Auth", "database_file")
        # this too should be unnecessary, but currently the
        # 'from build' override isn't stored in the config
        # (and we don't have indirect python access to datasources yet)
        if is_default and "B10_FROM_BUILD" in os.environ:
            file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
361
362
        return file

363

364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
    def increase_transfers_counter(self):
        '''Return False, if counter + 1 > max_transfers_out, or else
        return True
        '''
        ret = False
        self._lock.acquire()
        if self._transfers_counter < self._max_transfers_out:
            self._transfers_counter += 1
            ret = True
        self._lock.release()
        return ret

    def decrease_transfers_counter(self):
        self._lock.acquire()
        self._transfers_counter -= 1
        self._lock.release()

def listen_on_xfr_query(unix_socket_server):
382
383
384
    '''Listen xfr query in one single thread. Polls for shutdown 
    every 0.1 seconds, is there a better time?
    '''
385
386
387
388
389
390
391
392
393
394
395
396
397

    while True:
        try:
            unix_socket_server.serve_forever(poll_interval = 0.1)
        except select.error as err:
            # serve_forever() calls select.select(), which can be 
            # interrupted.
            # If it is interrupted, it raises select.error with the 
            # errno set to EINTR. We ignore this case, and let the
            # normal program flow continue by trying serve_forever()
            # again.
            if err.args[0] != errno.EINTR: raise

398
399
400
401
   

class XfroutServer:
    def __init__(self):
402
        self._unix_socket_server = None
403
        self._log = None
404
        self._listen_sock_file = UNIX_SOCKET_FILE 
405
406
407
408
        self._shutdown_event = threading.Event()
        self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
        self._config_data = self._cc.get_full_config()
        self._cc.start()
409
        self._cc.add_remote_config(AUTH_SPECFILE_LOCATION);
Jerry's avatar
Jerry committed
410
        self._log = isc.log.NSLogger(self._config_data.get('log_name'), self._config_data.get('log_file'),
Jerry's avatar
Jerry committed
411
412
                                self._config_data.get('log_severity'), self._config_data.get('log_versions'),
                                self._config_data.get('log_max_bytes'), True)
413
        self._start_xfr_query_listener()
414
        self._start_notifier()
415

416
417
418
    def _start_xfr_query_listener(self):
        '''Start a new thread to accept xfr query. '''
        self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession, 
419
                                                  self._shutdown_event, self._config_data,
420
                                                  self._cc, self._log);
421
422
        listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
        listener.start()
423
424
425
426
427
428
429
        
    def _start_notifier(self):
        datasrc = self._unix_socket_server.get_db_file()
        self._notifier = notify_out.NotifyOut(datasrc, self._log)
        td = threading.Thread(target = notify_out.dispatcher, args = (self._notifier,))
        td.daemon = True
        td.start()
430

431
432
    def send_notify(self, zone_name, zone_class):
        self._notifier.send_notify(zone_name, zone_class)
433
434
435
436
437
438
439
440
441
442

    def config_handler(self, new_config):
        '''Update config data. TODO. Do error check'''
        answer = create_answer(0)
        for key in new_config:
            if key not in self._config_data:
                answer = create_answer(1, "Unknown config data: " + str(key))
                continue
            self._config_data[key] = new_config[key]
        
443
        if self._log:
Jerry's avatar
Jerry committed
444
            self._log.update_config(new_config)
445

446
447
        if self._unix_socket_server:
            self._unix_socket_server.update_config_data(self._config_data)
448

449
450
451
452
        return answer


    def shutdown(self):
453
        ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
454
455
        terminated.
        '''
456
457
458

        global xfrout_server
        xfrout_server = None #Avoid shutdown is called twice
459
        self._shutdown_event.set()
460
461
        if self._unix_socket_server:
            self._unix_socket_server.shutdown()
462
463
464
465
466
467
468
469
470

        main_thread = threading.currentThread()
        for th in threading.enumerate():
            if th is main_thread:
                continue
            th.join()

    def command_handler(self, cmd, args):
        if cmd == "shutdown":
471
            self._log.log_message("info", "Received shutdown command.")
472
473
            self.shutdown()
            answer = create_answer(0)
474
        
475
        elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
476
            zone_name = args.get('zone_name')
477
478
            zone_class = args.get('zone_class')
            if zone_name and zone_class:
479
480
                self._log.log_message("info", "zone '%s/%s': receive notify others command" \
                                       % (zone_name, zone_class))
481
                self.send_notify(zone_name, zone_class)
482
483
484
485
                answer = create_answer(0)
            else:
                answer = create_answer(1, "Bad command parameter:" + str(args))

486
487
488
489
490
491
492
493
494
495
496
497
498
499
        else: 
            answer = create_answer(1, "Unknown command:" + str(cmd))

        return answer    

    def run(self):
        '''Get and process all commands sent from cfgmgr or other modules. '''
        while not self._shutdown_event.is_set():
            self._cc.check_command()


xfrout_server = None

def signal_handler(signal, frame):
500
    if xfrout_server:
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
        xfrout_server.shutdown()
        sys.exit(0)

def set_signal_handler():
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

def set_cmd_options(parser):
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
            help="display more about what is going on")

if '__main__' == __name__:
    try:
        parser = OptionParser()
        set_cmd_options(parser)
        (options, args) = parser.parse_args()
Jerry's avatar
Jerry committed
517
        VERBOSE_MODE = options.verbose
518
519
520
521
522

        set_signal_handler()
        xfrout_server = XfroutServer()
        xfrout_server.run()
    except KeyboardInterrupt:
Jeremy C. Reed's avatar
Jeremy C. Reed committed
523
        sys.stderr.write("[b10-xfrout] exit xfrout process\n")
524
    except SessionError as e:
525
        sys.stderr.write("[b10-xfrout] Error creating xfrout, "
Jeremy C. Reed's avatar
Jeremy C. Reed committed
526
                           "is the command channel daemon running?\n")
527
    except SessionTimeout as e:
528
        sys.stderr.write("[b10-xfrout] Error creating xfrout, " 
Jeremy C. Reed's avatar
Jeremy C. Reed committed
529
                           "is the configuration manager running?\n")
530
    except ModuleCCSessionError as e:
Jeremy C. Reed's avatar
Jeremy C. Reed committed
531
        sys.stderr.write("[b10-xfrout] exit xfrout process:%s\n" % str(e))
532

533
534
535
    if xfrout_server:
        xfrout_server.shutdown()