xfrout.py.in 27.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!@PYTHON@

# Copyright (C) 2010  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.


import sys; sys.path.append ('@@PYTHONPATH@@')
import isc
import isc.cc
import threading
import struct
import signal
Evan Hunt's avatar
Evan Hunt committed
25
from isc.datasrc import sqlite3_ds
26
27
28
from socketserver import *
import os
from isc.config.ccsession import *
29
from isc.cc import SessionError, SessionTimeout
30
from isc.notify import notify_out
31
import isc.util.process
32
import socket
33
import select
34
import errno
35
from optparse import OptionParser, OptionValueError
Likun Zhang's avatar
Likun Zhang committed
36
from isc.util import socketserver_mixin
37

38
39
40
41
42
from xfrout_messages import *

# Set up logging before anything else so that import problems below can be
# reported through the regular logger.
isc.log.init("b10-xfrout")
logger = isc.log.Logger("xfrout")

try:
    from libutil_io_python import *
    from pydnspp import *
except ImportError as e:
    # C++ loadable module may not be installed; even so the xfrout process
    # must keep running, so we warn about it and move forward.
    # (This must go through 'logger'; no object named 'log' exists here.)
    logger.error(XFROUT_IMPORT, str(e))

from isc.acl.acl import ACCEPT, REJECT, DROP
from isc.acl.dns import REQUEST_LOADER

isc.util.process.rename()
55

56
57
58
59
60
61
62
63
64
65
66
67
def init_paths():
    '''Set the module globals SPECFILE_PATH, AUTH_SPECFILE_PATH and
    UNIX_SOCKET_FILE.

    When the B10_FROM_BUILD environment variable is set, all three are
    derived from it (the socket file location can additionally be
    overridden with B10_FROM_SOURCE_LOCALSTATEDIR).  Otherwise the
    configure-time substitutions are used, and the socket file location
    can be overridden with BIND10_XFROUT_SOCKET_FILE.
    '''
    global SPECFILE_PATH
    global AUTH_SPECFILE_PATH
    global UNIX_SOCKET_FILE
    if "B10_FROM_BUILD" in os.environ:
        SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrout"
        AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
        if "B10_FROM_SOURCE_LOCALSTATEDIR" in os.environ:
            UNIX_SOCKET_FILE = os.environ["B10_FROM_SOURCE_LOCALSTATEDIR"] + \
                "/auth_xfrout_conn"
        else:
            UNIX_SOCKET_FILE = os.environ["B10_FROM_BUILD"] + "/auth_xfrout_conn"
    else:
        PREFIX = "@prefix@"
        DATAROOTDIR = "@datarootdir@"
        # The substituted datadir may itself still contain the literal
        # ${datarootdir} / ${prefix} placeholders; expand them here.
        SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
        AUTH_SPECFILE_PATH = SPECFILE_PATH
        if "BIND10_XFROUT_SOCKET_FILE" in os.environ:
            UNIX_SOCKET_FILE = os.environ["BIND10_XFROUT_SOCKET_FILE"]
        else:
            UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/auth_xfrout_conn"

init_paths()
79

80
# Locations of the module specification files, built from the directories
# chosen by init_paths().
SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + os.sep + "auth.spec"
MAX_TRANSFERS_OUT = 10
VERBOSE_MODE = False
# tsig sign every N axfr packets.
TSIG_SIGN_EVERY_NTH = 96

# Upper bound handed to the message renderer and used when deciding whether
# another RRset still fits into the current response message.
XFROUT_MAX_MESSAGE_SIZE = 65535

89

90
91
92
93
94
95
96
def get_rrset_len(rrset):
    """Return the number of bytes the given RRset renders to.

    The RRset is asked to write itself into a fresh, empty buffer via its
    to_wire() method; the resulting buffer length is the answer.
    """
    wire_buffer = bytearray()
    rrset.to_wire(wire_buffer)
    return len(wire_buffer)


97
class XfroutSession():
    '''Serve one zone transfer request.

    Creating an instance immediately runs handle(): the request is parsed
    and checked (TSIG, ACL, zone availability), the response is written to
    sock_fd, and sock_fd is closed afterwards.
    '''

    def __init__(self, sock_fd, request_data, server, tsig_key_ring, remote,
                 acl):
        '''Store the request context and process the request right away.

        sock_fd       -- file descriptor the response is written to; it is
                         closed by handle()
        request_data  -- wire-format request message
        server        -- owning server object; used for the database file,
                         the transfer counter and the shutdown event
        tsig_key_ring -- key ring used to verify/sign TSIG
        remote        -- remote address, passed to the ACL request context
                         and used (elements 0 and 1) in log messages
        acl           -- ACL object whose execute() decides
                         ACCEPT/REJECT/DROP
        '''
        self._sock_fd = sock_fd
        self._request_data = request_data
        self._server = server
        self._tsig_key_ring = tsig_key_ring
        self._tsig_ctx = None
        self._tsig_len = 0
        self._remote = remote
        self._acl = acl
        self.handle()

    def create_tsig_ctx(self, tsig_record, tsig_key_ring):
        '''Build a TSIG context for the key name and algorithm named in the
        given TSIG record, backed by tsig_key_ring.'''
        return TSIGContext(tsig_record.get_name(), tsig_record.get_rdata().get_algorithm(),
                           tsig_key_ring)

    def handle(self):
        ''' Handle a xfrout query, send xfrout response '''
        try:
            self.dns_xfrout_start(self._sock_fd, self._request_data)
            #TODO, avoid catching all exceptions
        except Exception as e:
            logger.error(XFROUT_HANDLE_QUERY_ERROR, e)
        finally:
            # This session owns the descriptor; release it whatever
            # happened above.
            os.close(self._sock_fd)

    def _check_request_tsig(self, msg, request_data):
        ''' If request has a tsig record, perform tsig related checks '''
        tsig_record = msg.get_tsig_record()
        if tsig_record is not None:
            # Remember the TSIG size and context; they are needed later to
            # reserve space in, and to sign, the response messages.
            self._tsig_len = tsig_record.get_length()
            self._tsig_ctx = self.create_tsig_ctx(tsig_record, self._tsig_key_ring)
            tsig_error = self._tsig_ctx.verify(tsig_record, request_data)
            if tsig_error != TSIGError.NOERROR:
                return Rcode.NOTAUTH()

        return Rcode.NOERROR()

    def _parse_query_message(self, mdata):
        '''Parse the wire-format query and run the TSIG and ACL checks.

        Returns a (rcode, message) pair:
          (None, None)       the ACL asked to drop the query silently
          (REFUSED, msg)     the ACL rejected the query
          (FORMERR, None)    parsing or checking raised an exception
          (rcode, msg)       otherwise, rcode being the TSIG check result
        '''
        #TODO, need to add parseHeader() in case the message header is invalid
        try:
            msg = Message(Message.PARSE)
            Message.from_wire(msg, mdata)

            # TSIG related checks
            rcode = self._check_request_tsig(msg, mdata)

            if rcode == Rcode.NOERROR():
                # ACL checks
                acl_result = self._acl.execute(
                    isc.acl.dns.RequestContext(self._remote))
                if acl_result == DROP:
                    logger.info(XFROUT_QUERY_DROPPED,
                                self._get_query_zone_name(msg),
                                self._get_query_zone_class(msg),
                                self._remote[0], self._remote[1])
                    return None, None
                elif acl_result == REJECT:
                    logger.info(XFROUT_QUERY_REJECTED,
                                self._get_query_zone_name(msg),
                                self._get_query_zone_class(msg),
                                self._remote[0], self._remote[1])
                    return Rcode.REFUSED(), msg

        except Exception as err:
            logger.error(XFROUT_PARSE_QUERY_ERROR, err)
            return Rcode.FORMERR(), None

        return rcode, msg

    def _get_query_zone_name(self, msg):
        '''Return the name of the first question of msg, as text.'''
        question = msg.get_question()[0]
        return question.get_name().to_text()

    def _get_query_zone_class(self, msg):
        '''Return the class of the first question of msg, as text.'''
        question = msg.get_question()[0]
        return question.get_class().to_text()

    def _send_data(self, sock_fd, data):
        '''Write all of data to sock_fd, retrying after partial writes.'''
        size = len(data)
        total_count = 0
        while total_count < size:
            count = os.write(sock_fd, data[total_count:])
            total_count += count

    def _send_message(self, sock_fd, msg, tsig_ctx=None):
        '''Render msg (signing it with tsig_ctx when one is given) and send
        it to sock_fd, preceded by its length as a 2-byte prefix in network
        byte order.'''
        render = MessageRenderer()
        # As defined in RFC5936 section3.4, perform case-preserving name
        # compression for AXFR message.
        render.set_compress_mode(MessageRenderer.CASE_SENSITIVE)
        render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE)

        # XXX Currently, python wrapper doesn't accept 'None' parameter in this case,
        # we should remove the if statement and use a universal interface later.
        if tsig_ctx is not None:
            msg.to_wire(render, tsig_ctx)
        else:
            msg.to_wire(render)

        # '!H' is an unsigned 16-bit integer in network byte order; it
        # yields the same bytes as native 'H' applied to htons(length).
        header_len = struct.pack('!H', render.get_length())
        self._send_data(sock_fd, header_len)
        self._send_data(sock_fd, render.get_data())

    def _reply_query_with_error_rcode(self, msg, sock_fd, rcode_):
        '''Turn msg into a response carrying rcode_ and send it.  Nothing is
        sent when there is no usable query message.'''
        if not msg:
            return # query message is invalid. send nothing back.

        msg.make_response()
        msg.set_rcode(rcode_)
        self._send_message(sock_fd, msg, self._tsig_ctx)

    def _zone_has_soa(self, zone):
        '''Judge if the zone has an SOA record.'''
        # In some sense, the SOA defines a zone.
        # If the current name server has authority for the
        # specific zone, we need to judge if the zone has an SOA record;
        # if not, we consider the zone has incomplete data, so xfrout can't
        # serve for it.
        if sqlite3_ds.get_zone_soa(zone, self._server.get_db_file()):
            return True

        return False

    def _zone_exist(self, zonename):
        '''Judge if the zone is configured by config manager.'''
        # Currently, if we find the zone in datasource successfully, we
        # consider the zone is configured, and the current name server has
        # authority for the specific zone.
        # TODO: should get zone's configuration from cfgmgr or other place
        # in future.
        return sqlite3_ds.zone_exist(zonename, self._server.get_db_file())

    def _check_xfrout_available(self, zone_name):
        '''Check if xfr request can be responsed.

        Returns NOERROR when the transfer may proceed; in that case (and
        only then) a transfer slot has been taken on the server and the
        caller must give it back with decrease_transfers_counter().

           TODO, Get zone's configuration from cfgmgr or some other place
           eg. check allow_transfer setting,
        '''
        # If the current name server does not have authority for the
        # zone, xfrout can't serve for it, return rcode NOTAUTH.
        if not self._zone_exist(zone_name):
            return Rcode.NOTAUTH()

        # If we are an authoritative name server for the zone, but fail
        # to find the zone's SOA record in datasource, xfrout can't
        # provide zone transfer for it.
        if not self._zone_has_soa(zone_name):
            return Rcode.SERVFAIL()

        #TODO, check allow_transfer
        if not self._server.increase_transfers_counter():
            return Rcode.REFUSED()

        return Rcode.NOERROR()

    def dns_xfrout_start(self, sock_fd, msg_query):
        '''Process one transfer request end to end: parse and check the
        query, then either answer with an error rcode or stream the zone.'''
        rcode_, msg = self._parse_query_message(msg_query)
        #TODO. create query message and parse header
        if rcode_ is None: # Dropped by ACL
            return
        elif rcode_ == Rcode.NOTAUTH() or rcode_ == Rcode.REFUSED():
            return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
        elif rcode_ != Rcode.NOERROR():
            return self._reply_query_with_error_rcode(msg, sock_fd,
                                                      Rcode.FORMERR())

        zone_name = self._get_query_zone_name(msg)
        zone_class_str = self._get_query_zone_class(msg)
        # TODO: should we not also include class in the check?
        rcode_ = self._check_xfrout_available(zone_name)

        if rcode_ != Rcode.NOERROR():
            logger.info(XFROUT_AXFR_TRANSFER_FAILED, zone_name,
                        zone_class_str, rcode_.to_text())
            return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)

        try:
            logger.info(XFROUT_AXFR_TRANSFER_STARTED, zone_name, zone_class_str)
            self._reply_xfrout_query(msg, sock_fd, zone_name)
        except Exception as err:
            logger.error(XFROUT_AXFR_TRANSFER_ERROR, zone_name,
                         zone_class_str, str(err))
        finally:
            # Give back the slot taken in _check_xfrout_available(), no
            # matter how the transfer ended.
            self._server.decrease_transfers_counter()
        logger.info(XFROUT_AXFR_TRANSFER_DONE, zone_name, zone_class_str)
        return

    def _clear_message(self, msg):
        '''Reset msg for rendering the next response packet, keeping its
        qid, opcode and rcode and setting the AA and QR flags again.'''
        qid = msg.get_qid()
        opcode = msg.get_opcode()
        rcode = msg.get_rcode()

        msg.clear(Message.RENDER)
        msg.set_qid(qid)
        msg.set_opcode(opcode)
        msg.set_rcode(rcode)
        msg.set_header_flag(Message.HEADERFLAG_AA)
        msg.set_header_flag(Message.HEADERFLAG_QR)
        return msg

    def _create_rrset_from_db_record(self, record):
        '''Create one rrset from one record of datasource, if the schema of record is changed,
        This function should be updated first.

        Fields used: record[2] owner name, record[4] TTL, record[5] RR
        type, record[7:] rdata fields (joined with single spaces).  The
        class is always IN.
        '''
        rrtype_ = RRType(record[5])
        rdata_ = Rdata(rrtype_, RRClass("IN"), " ".join(record[7:]))
        rrset_ = RRset(Name(record[2]), RRClass("IN"), rrtype_, RRTTL( int(record[4])))
        rrset_.add_rdata(rdata_)
        return rrset_

    def _send_message_with_last_soa(self, msg, sock_fd, rrset_soa, message_upper_len,
                                    count_since_last_tsig_sign):
        '''Add the SOA record to the end of message. If it can't be
        added, a new message should be created to send out the last soa .
        '''
        rrset_len = get_rrset_len(rrset_soa)

        if (count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH and
            message_upper_len + rrset_len >= XFROUT_MAX_MESSAGE_SIZE):
            # If tsig context exist, sign the packet with serial number TSIG_SIGN_EVERY_NTH
            self._send_message(sock_fd, msg, self._tsig_ctx)
            msg = self._clear_message(msg)
        elif (count_since_last_tsig_sign != TSIG_SIGN_EVERY_NTH and
              message_upper_len + rrset_len + self._tsig_len >= XFROUT_MAX_MESSAGE_SIZE):
            # Here no TSIG space was reserved for the pending packet, so
            # the TSIG of the (always signed) last packet is counted too.
            self._send_message(sock_fd, msg)
            msg = self._clear_message(msg)

        # If tsig context exist, sign the last packet
        msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
        self._send_message(sock_fd, msg, self._tsig_ctx)

    def _reply_xfrout_query(self, msg, sock_fd, zone_name):
        '''Stream the zone: the SOA first, then every non-SOA record, then
        the SOA again.  Records are packed into as few messages as the size
        limit allows.  Returns early, without sending the pending message,
        when the server's shutdown event is set.'''
        #TODO, there should be a better way to insert rrset.
        # Starting at TSIG_SIGN_EVERY_NTH makes the first packet a signed one.
        count_since_last_tsig_sign = TSIG_SIGN_EVERY_NTH
        msg.make_response()
        msg.set_header_flag(Message.HEADERFLAG_AA)
        soa_record = sqlite3_ds.get_zone_soa(zone_name, self._server.get_db_file())
        rrset_soa = self._create_rrset_from_db_record(soa_record)
        msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)

        message_upper_len = get_rrset_len(rrset_soa) + self._tsig_len

        for rr_data in sqlite3_ds.get_zone_datas(zone_name, self._server.get_db_file()):
            if  self._server._shutdown_event.is_set(): # Check if xfrout is shutdown
                logger.info(XFROUT_STOPPING)
                return
            # TODO: RRType.SOA() ?
            if RRType(rr_data[5]) == RRType("SOA"): #ignore soa record
                continue

            rrset_ = self._create_rrset_from_db_record(rr_data)

            # We calculate the maximum size of the RRset (i.e. the
            # size without compression) and use that to see if we
            # may have reached the limit
            rrset_len = get_rrset_len(rrset_)
            if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
                msg.add_rrset(Message.SECTION_ANSWER, rrset_)
                message_upper_len += rrset_len
                continue

            # If tsig context exist, sign every N packets
            if count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH:
                count_since_last_tsig_sign = 0
                self._send_message(sock_fd, msg, self._tsig_ctx)
            else:
                self._send_message(sock_fd, msg)

            count_since_last_tsig_sign += 1
            msg = self._clear_message(msg)
            msg.add_rrset(Message.SECTION_ANSWER, rrset_) # Add the rrset to the new message

            # Reserve tsig space for signed packet
            if count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH:
                message_upper_len = rrset_len + self._tsig_len
            else:
                message_upper_len = rrset_len

        self._send_message_with_last_soa(msg, sock_fd, rrset_soa, message_upper_len,
                                         count_since_last_tsig_sign)
385

386
class UnixSockServer(socketserver_mixin.NoPollMixIn, ThreadingUnixStreamServer):
    '''The unix domain socket server which accept xfr query sent from auth server.'''

    def __init__(self, sock_file, handle_class, shutdown_event, config_data, cc):
        '''Claim sock_file (removing a stale one first), start listening on
        it and apply the initial configuration.

        sock_file      -- path of the unix domain socket to listen on
        handle_class   -- class instantiated for every transfer request
        shutdown_event -- event that is set when the process is stopping
        config_data    -- initial configuration, see update_config_data()
        cc             -- config session, used by get_db_file()
        '''
        self._remove_unused_sock_file(sock_file)
        self._sock_file = sock_file
        socketserver_mixin.NoPollMixIn.__init__(self)
        ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
        self._shutdown_event = shutdown_event
        # shutdown() writes to _write_sock; handle_request() selects on
        # _read_sock so that blocked connection threads wake up.
        self._write_sock, self._read_sock = socket.socketpair()
        self._common_init()
        self.update_config_data(config_data)
        self._cc = cc

    def _common_init(self):
        '''Create the lock, the transfer counter and the default ACL.'''
        self._lock = threading.Lock()
        self._transfers_counter = 0
        # This default value will probably get overwritten by the (same)
        # default value from the spec file. This is here just to make
        # sure and to make the default value in tests consistent.
        self._acl = REQUEST_LOADER.load('[{"action": "ACCEPT"}]')

    def _recv_exactly(self, sock, size):
        '''Read exactly size bytes from sock.  Returns the bytes read, or
        None if the peer closed the connection before that many arrived.'''
        chunks = []
        remaining = size
        while remaining > 0:
            data = sock.recv(remaining)
            if not data:
                return None
            chunks.append(data)
            remaining -= len(data)
        return b''.join(chunks)

    def _receive_query_message(self, sock):
        ''' receive request message from sock

        The message is preceded by its length as a 2-byte integer in
        network byte order.  Returns the message bytes, or None when the
        connection is closed before the whole message was received.
        '''
        # receive data length; recv() may legitimately hand back just one
        # of the two bytes, so keep reading until both are here.
        data_len = self._recv_exactly(sock, 2)
        if data_len is None:
            return None
        msg_len = struct.unpack('!H', data_len)[0]
        # receive data
        return self._recv_exactly(sock, msg_len)

    def handle_request(self):
        ''' Enable server handle a request until shutdown or auth is closed.'''
        try:
            request, client_address = self.get_request()
        except socket.error:
            logger.error(XFROUT_FETCH_REQUEST_ERROR)
            return

        try:
            # Check self._shutdown_event to ensure the real shutdown comes.
            # Linux could trigger a spurious readable event on the _read_sock
            # due to a bug, so we need perform a double check.
            while not self._shutdown_event.is_set(): # Check if xfrout is shutdown
                try:
                    (rlist, wlist, xlist) = select.select([self._read_sock, request], [], [])
                except select.error as e:
                    if e.args[0] == errno.EINTR:
                        continue
                    logger.error(XFROUT_SOCKET_SELECT_ERROR, str(e))
                    break

                # self.server._shutdown_event will be set by now, if it is not a false
                # alarm
                if self._read_sock in rlist:
                    continue

                try:
                    # Only an explicit False ends the loop, so that an
                    # override returning nothing keeps the connection
                    # going as before.
                    if self.process_request(request) is False:
                        break
                except Exception as pre:
                    logger.error(XFROUT_PROCESS_REQUEST_ERROR, str(pre))
                    break
        finally:
            # Nothing else closes the accepted connection.
            request.close()

    def _handle_request_noblock(self):
        """Override the function _handle_request_noblock(), it creates a new
        thread to handle requests for each auth"""
        td = threading.Thread(target=self.handle_request)
        td.daemon = True
        td.start()

    def process_request(self, request):
        """Receive socket fd and query message from auth, then
        start a new thread to process the request.

        Returns True when the connection can go on being served and False
        when it cannot (no descriptor could be received, or the connection
        ended in the middle of a message); handle_request() then stops
        polling that connection instead of spinning on it.
        """
        sock_fd = recv_fd(request.fileno())
        if sock_fd < 0:
            # This may happen when one xfrout process try to connect to
            # xfrout unix socket server, to check whether there is another
            # xfrout running.
            if sock_fd == FD_COMM_ERROR:
                logger.error(XFROUT_RECEIVE_FILE_DESCRIPTOR_ERROR)
            return False

        # receive request msg
        try:
            request_data = self._receive_query_message(request)
        except Exception:
            # No session will take over the descriptor; do not leak it.
            os.close(sock_fd)
            raise
        if not request_data:
            os.close(sock_fd)
            # None: the peer went away.  Empty message: nothing to answer,
            # but the connection itself is still usable.
            return request_data is not None

        t = threading.Thread(target = self.finish_request,
                             args = (sock_fd, request_data))
        if self.daemon_threads:
            t.daemon = True
        t.start()
        return True

    def _guess_remote(self, sock_fd):
        """
           Guess remote address and port of the socket. The sock_fd must be a
           socket
        """
        # This uses a trick. If the socket is IPv4 in reality and we pretend
        # it to be IPv6, it returns IPv4 address anyway. This doesn't seem
        # to care about the SOCK_STREAM parameter at all (which it really is,
        # except for testing)
        if socket.has_ipv6:
            sock = socket.fromfd(sock_fd, socket.AF_INET6, socket.SOCK_STREAM)
        else:
            # To make it work even on hosts without IPv6 support
            # (Any idea how to simulate this in test?)
            sock = socket.fromfd(sock_fd, socket.AF_INET, socket.SOCK_STREAM)
        try:
            return sock.getpeername()
        finally:
            # fromfd() works on a duplicate of sock_fd; closing the
            # duplicate leaves sock_fd itself open.
            sock.close()

    def finish_request(self, sock_fd, request_data):
        '''Finish one request by instantiating RequestHandlerClass.'''
        self.RequestHandlerClass(sock_fd, request_data, self,
                                 self.tsig_key_ring,
                                 self._guess_remote(sock_fd), self._acl)

    def _remove_unused_sock_file(self, sock_file):
        '''Try to remove the socket file. If the file is being used
        by one running xfrout process, exit from python.
        If it's not a socket file or nobody is listening
        , it will be removed. If it can't be removed, exit from python. '''
        if self._sock_file_in_use(sock_file):
            logger.error(XFROUT_UNIX_SOCKET_FILE_IN_USE, sock_file)
            sys.exit(0)
        else:
            if not os.path.exists(sock_file):
                return

            try:
                os.unlink(sock_file)
            except OSError as err:
                logger.error(XFROUT_REMOVE_OLD_UNIX_SOCKET_FILE_ERROR, sock_file, str(err))
                sys.exit(0)

    def _sock_file_in_use(self, sock_file):
        '''Check whether the socket file 'sock_file' exists and
        is being used by one running xfrout process. If it is,
        return True, or else return False. '''
        try:
            sock = socket.socket(socket.AF_UNIX)
        except socket.error:
            return False
        try:
            sock.connect(sock_file)
        except socket.error:
            return False
        finally:
            # The connection is only a probe; never keep it around.
            sock.close()
        return True

    def shutdown(self):
        '''Wake up the connection threads, stop the server loop and remove
        the socket file.'''
        self._write_sock.send(b"shutdown") #terminate the xfrout session thread
        super().shutdown() # call the shutdown() of class socketserver_mixin.NoPollMixIn
        try:
            os.unlink(self._sock_file)
        except Exception as e:
            # Best effort: report the failure and carry on shutting down.
            logger.error(XFROUT_REMOVE_UNIX_SOCKET_FILE_ERROR, self._sock_file, str(e))

    def update_config_data(self, new_config):
        '''Apply the new config setting of xfrout module.

        Keys read from new_config: 'query_acl' (optional; replaces the
        ACL), 'transfers_out' and 'tsig_key_ring'.
        '''
        logger.info(XFROUT_NEW_CONFIG)
        if 'query_acl' in new_config:
            self._acl = REQUEST_LOADER.load(new_config['query_acl'])
        # 'with' releases the lock even if building the key ring raises.
        with self._lock:
            self._max_transfers_out = new_config.get('transfers_out')
            self.set_tsig_key_ring(new_config.get('tsig_key_ring'))
        logger.info(XFROUT_NEW_CONFIG_DONE)

    def set_tsig_key_ring(self, key_list):
        """Set the tsig_key_ring , given a TSIG key string list representation. """

        # XXX add values to configure zones/tsig options
        self.tsig_key_ring = TSIGKeyRing()
        # If key string list is empty, create a empty tsig_key_ring
        if not key_list:
            return

        for key_item in key_list:
            try:
                self.tsig_key_ring.add(TSIGKey(key_item))
            except InvalidParameter as ipe:
                # A bad key is reported and skipped; the others still load.
                logger.error(XFROUT_BAD_TSIG_KEY_STRING, str(key_item))

    def get_db_file(self):
        '''Return the path of the zone database file, as configured for the
        "Auth" module.'''
        file, is_default = self._cc.get_remote_config_value("Auth", "database_file")
        # this too should be unnecessary, but currently the
        # 'from build' override isn't stored in the config
        # (and we don't have indirect python access to datasources yet)
        if is_default and "B10_FROM_BUILD" in os.environ:
            file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
        return file

    def increase_transfers_counter(self):
        '''Return False, if counter + 1 > max_transfers_out, or else
        return True
        '''
        with self._lock:
            if self._transfers_counter < self._max_transfers_out:
                self._transfers_counter += 1
                return True
            return False

    def decrease_transfers_counter(self):
        '''Give back one transfer slot.'''
        with self._lock:
            self._transfers_counter -= 1

class XfroutServer:
    '''The xfrout process itself: it owns the config/command session, the
    unix socket server receiving transfer requests, and the notifier.'''

    def __init__(self):
        '''Connect to the config session, then start the transfer request
        listener and the notifier.'''
        self._unix_socket_server = None
        self._listen_sock_file = UNIX_SOCKET_FILE
        self._shutdown_event = threading.Event()
        self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler, None, True)
        self._config_data = self._cc.get_full_config()
        self._cc.start()
        self._cc.add_remote_config(AUTH_SPECFILE_LOCATION)
        self._start_xfr_query_listener()
        self._start_notifier()

    def _start_xfr_query_listener(self):
        '''Start a new thread to accept xfr query. '''
        self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
                                                  self._shutdown_event, self._config_data,
                                                  self._cc)
        listener = threading.Thread(target=self._unix_socket_server.serve_forever)
        listener.start()

    def _start_notifier(self):
        '''Create the notifier on top of the zone database file and start
        its dispatcher.'''
        datasrc = self._unix_socket_server.get_db_file()
        self._notifier = notify_out.NotifyOut(datasrc)
        self._notifier.dispatcher()

    def send_notify(self, zone_name, zone_class):
        '''Ask the notifier to send notifies for the given zone.'''
        self._notifier.send_notify(zone_name, zone_class)

    def config_handler(self, new_config):
        '''Update config data. TODO. Do error check

        Known keys are merged into the stored configuration, which is then
        handed to the socket server.  An unknown key, or a failure while
        applying the configuration, turns the answer into an error answer;
        the remaining keys are processed all the same.
        '''
        answer = create_answer(0)
        for key in new_config:
            if key not in self._config_data:
                answer = create_answer(1, "Unknown config data: " + str(key))
                continue
            self._config_data[key] = new_config[key]

        if self._unix_socket_server:
            try:
                self._unix_socket_server.update_config_data(self._config_data)
            except Exception as e:
                answer = create_answer(1,
                                       "Failed to handle new configuration: " +
                                       str(e))

        return answer

    def shutdown(self):
        ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
        terminated.
        '''

        global xfrout_server
        xfrout_server = None #Avoid shutdown is called twice
        self._shutdown_event.set()
        self._notifier.shutdown()
        if self._unix_socket_server:
            self._unix_socket_server.shutdown()

        # Wait for all threads to terminate
        main_thread = threading.current_thread()
        for th in threading.enumerate():
            # A thread cannot join itself.
            if th is main_thread:
                continue
            th.join()

    def command_handler(self, cmd, args):
        '''Handle a command received over the command channel and return
        the answer for it: "shutdown", the notifier's "new zone data ready"
        command, or an error answer for anything else.'''
        if cmd == "shutdown":
            logger.info(XFROUT_RECEIVED_SHUTDOWN_COMMAND)
            self.shutdown()
            answer = create_answer(0)

        elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
            zone_name = args.get('zone_name')
            zone_class = args.get('zone_class')
            if zone_name and zone_class:
                logger.info(XFROUT_NOTIFY_COMMAND, zone_name, zone_class)
                self.send_notify(zone_name, zone_class)
                answer = create_answer(0)
            else:
                answer = create_answer(1, "Bad command parameter:" + str(args))

        else:
            answer = create_answer(1, "Unknown command:" + str(cmd))

        return answer

    def run(self):
        '''Get and process all commands sent from cfgmgr or other modules. '''
        while not self._shutdown_event.is_set():
            self._cc.check_command(False)
697
698
699
700
701


# The running server instance, or None when there is none (not started
# yet, or already shut down).
xfrout_server = None

def signal_handler(signal, frame):
    '''Signal handler: shut the running server down and exit with status 0.
    Does nothing when no server is running.'''
    if xfrout_server:
        xfrout_server.shutdown()
        sys.exit(0)

def set_signal_handler():
    '''Route SIGTERM and SIGINT to signal_handler().'''
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, signal_handler)

def set_cmd_options(parser):
    '''Register the command line options of xfrout on parser: the
    -v/--verbose switch, stored as a boolean in 'verbose'.'''
    verbose_flags = ("-v", "--verbose")
    parser.add_option(*verbose_flags,
                      action="store_true",
                      dest="verbose",
                      help="display more about what is going on")

if '__main__' == __name__:
    # Script entry point: parse the command line, install the signal
    # handlers and run the server until it is told to stop.
    try:
        parser = OptionParser()
        set_cmd_options(parser)
        (options, args) = parser.parse_args()
        VERBOSE_MODE = options.verbose

        set_signal_handler()
        xfrout_server = XfroutServer()
        xfrout_server.run()
    except KeyboardInterrupt:
        # The logger method is 'info', as used throughout this file.
        logger.info(XFROUT_STOPPED_BY_KEYBOARD)
    except SessionError as e:
        logger.error(XFROUT_CC_SESSION_ERROR, str(e))
    except SessionTimeout as e:
        logger.error(XFROUT_CC_SESSION_TIMEOUT_ERROR)

    # Still set only if the server was not shut down on the way out
    # (XfroutServer.shutdown() resets it to None).
    if xfrout_server:
        xfrout_server.shutdown()