Commit ff65d5d0 authored by Likun Zhang's avatar Likun Zhang
Browse files

Add the files for xfrin.

git-svn-id: svn://bind10.isc.org/svn/bind10/trunk@1236 e5f2f494-b856-4b98-b285-d166d9295462
parent 7e9fb87b
pkglibexecdir = $(libexecdir)/@PACKAGE@
pkglibexec_SCRIPTS = b10-xfrin
b10_xfrindir = $(DESTDIR)$(pkgdatadir)
b10_xfrin_DATA = xfrin.spec
CLEANFILES= b10-xfrin
# TODO: does this need $$(DESTDIR) also?
# this is done here since configure.ac AC_OUTPUT doesn't expand exec_prefix
b10-xfrin: xfrin.py
$(SED) "s|@@PYTHONPATH@@|@pyexecdir@|" xfrin.py >$@
chmod a+x $@
#! /bin/sh
PYTHON_EXEC=${PYTHON_EXEC:-@PYTHON@}
export PYTHON_EXEC
CMD_CTRLD_PATH=@abs_top_builddir@/src/bin/xfrin
PYTHONPATH=@abs_top_srcdir@/src/lib/cc/python:${abs_top_src_dir}/lib/cc/python/ISC
export PYTHONPATH
cd ${CMD_CTRLD_PATH}
${PYTHON_EXEC} b10-xfrin
#!@PYTHON@
# Copyright (C) 2010 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import sys; sys.path.append ('@@PYTHONPATH@@')
import os
import signal
import isc
import asyncore
import struct
import threading
import socket
import random
from isc.config.ccsession import *
try:
from bind10_message import *
except:
pass
# If B10_FROM_SOURCE is set in the environment, we use data files
# from a directory relative to that, otherwise we use the ones
# installed on the system
if "B10_FROM_SOURCE" in os.environ:
SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/xfrin"
else:
PREFIX = "@prefix@"
DATAROOTDIR = "@datarootdir@"
SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
SPECFILE_LOCATION = SPECFILE_PATH + "/xfrin.spec"
__version__ = 'BIND10'
# define xfrin rcode
XFRIN_OK = 0
XFRIN_RECV_TIMEOUT = 1
XFRIN_NO_NEWDATA = 2
XFRIN_QUOTA_ERROR = 3
XFRIN_IS_DOING = 4
# define xfrin state
XFRIN_QUERY_SOA = 1
XFRIN_FIRST_AXFR = 2
XFRIN_FIRST_IXFR = 3
class XfrinException(Exception):
pass
class XfrinConnection(asyncore.dispatcher):
'''Do xfrin in this class. '''
def __init__(self, zone_name, db_file,
shutdown_event,
master_addr,
port = 53,
check_soa = True,
idle_timeout = 60):
''' idle_timeout: max idle time for read data from socket.
db_file: specify the data source file.
check_soa: when it's true, check soa first before sending xfr query
'''
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self._zone_name = zone_name
self._db_file = db_file
self._records = []
self._idle_timeout = idle_timeout
self.setblocking(1)
self.connect((master_addr, port))
self._shutdown_event = shutdown_event
def _create_query(self, query_type):
'''Create dns query message. '''
msg = message(message_mode.RENDER)
query_id = random.randint(1, 0xFFFF)
self._query_id = query_id
msg.set_qid(query_id)
msg.set_opcode(op_code.QUERY())
msg.set_rcode(rcode.NOERROR())
query_question = question(name(self._zone_name), rr_class.IN(), query_type)
msg.add_question(query_question)
return msg
def _send_data(self, data):
size = len(data)
total_count = 0
while total_count < size:
count = self.send(data[total_count:])
total_count += count
def _send_query(self, query_type):
'''Send query message over TCP. '''
msg = self._create_query(query_type)
obuf = output_buffer(0)
render = message_render(obuf)
msg.to_wire(render)
header_len = struct.pack('H', socket.htons(obuf.get_length()))
self._send_data(header_len)
self._send_data(obuf.get_data())
def _get_request_response(self, size):
recv_size = 0
data = b''
while recv_size < size:
self._recv_time_out = True
self._need_recv_size = size - recv_size
asyncore.loop(self._idle_timeout, count = 1)
if self._recv_time_out:
raise XfrinException('receive data from socket time out.')
recv_size += self._recvd_size
data += self._recvd_data
return data
def handle_read(self):
'''Read query's response from socket. '''
self._recvd_data = self.recv(self._need_recv_size)
self._recvd_size = len(self._recvd_data)
self._recv_time_out = False
def _check_soa_serial(self):
''' Compare the soa serial, if soa serial in master is less than
the soa serial in local, Finish xfrin.
False: soa serial in master is less or equal to the local one.
True: soa serial in master is bigger
'''
self._send_query(rr_type.SOA())
data_size = self._get_request_response(2)
soa_reply = self._get_request_response(int(data_size))
#TODO, need select soa record from data source then compare the two
#serial
return XFRIN_OK
def do_xfrin(self, check_soa, ixfr_first = False):
try:
ret = XFRIN_OK
if check_soa:
ret = self._check_soa_serial()
print('[xfrin] transfer of \'%s\': AXFR started' % self._zone_name)
if ret == XFRIN_OK:
self._send_query(rr_type.AXFR())
ret = self._handle_xfrin_response()
self._insert_record_to_sqlite3(self._records)
print('[xfrin] transfer of \'%s\' AXFR ended' % self._zone_name)
except XfrinException as e:
print(e)
print('[xfrin] Error happened during xfrin!')
#TODO, recover data source.
finally:
self.close()
return ret
def _check_response_status(self, msg):
#TODO, check more?
if msg.get_rcode() != rcode.NOERROR():
raise XfrinException('error response: ')
if not msg.get_header_flag(message_flag.QR()):
raise XfrinException('response is not a response ')
if msg.get_qid() != self._query_id:
raise XfrinException('bad query id')
if msg.get_rr_count(section.ANSWER()) == 0:
raise XfrinException('answer section is empty')
if msg.get_rr_count(section.QUESTION()) > 1:
raise XfrinException('query section count greater than 1')
def _handle_answer_section(self, rrset_iter):
soa_count = 0
while not rrset_iter.is_last():
rrset = rrset_iter.get_rrset()
rrset_iter.next()
rrset_name = rrset.get_name().to_text()
rrset_ttl = int(rrset.get_ttl().to_text())
rrset_class = rrset.get_class().to_text()
rrset_type = rrset.get_type().to_text()
rdata_iter = rrset.get_rdata_iterator()
rdata_iter.first()
while not rdata_iter.is_last():
# Count the soa record count
if rrset.get_type() == rr_type.SOA():
soa_count += 1
rdata_text = rdata_iter.get_current().to_text()
rr_data = (rrset_name, rrset_ttl, rrset_class, rrset_type, rdata_text)
self._records.append(rr_data)
#self._insert_record_to_sqlite3(rr_data)
rdata_iter.next()
return soa_count
def _handle_xfrin_response(self):
soa_count = 0
while True:
data_len = self._get_request_response(2)
msg_len = socket.htons(struct.unpack('H', data_len)[0])
recvdata = self._get_request_response(msg_len)
msg = message(message_mode.PARSE)
msg.from_wire(input_buffer(recvdata))
self._check_response_status(msg)
rrset_iter = section_iter(msg, section.ANSWER())
soa_count += self._handle_answer_section(rrset_iter)
if soa_count == 2:
return XFRIN_OK
if self._shutdown_event.is_set():
#Check if xfrin process is shutdown.
#TODO, xfrin may be blocked in one loop.
raise XfrinException('xfrin is forced to stop')
return XFRIN_OK
def _insert_record_to_sqlite3(self, rrs):
''' The interface provided by sqlite3_ds only support insert all the records
at the end of AXFR'''
isc.auth.sqlite3_ds.load(self._db_file, self._zone_name, rrs)
def writable(self):
'''Ignore the writable socket. '''
return False
def log_info(self, msg, type = ''):
# Overwrite the log function, log nothing
pass
def process_xfrin(xfrin_recorder, zone_name, db_file,
shutdown_event, master_addr, port, check_soa):
xfrin_recorder.increment(name)
try:
conn = XfrinConnection(zone_name, db_file, shutdown_event,
master_addr, int(port), check_soa)
conn.do_xfrin(False)
except Exception as e:
print('[xfrin] Error happened:', e)
xfrin_recorder.decrement(zone_name)
class XfrinRecorder():
def __init__(self):
self._lock = threading.Lock()
self._zones = []
def increment(self, zone_name):
self._lock.acquire()
self._zones.append(zone_name)
self._lock.release()
def decrement(self, zone_name):
self._lock.acquire()
if zone_name in self._zones:
self._zones.remove(zone_name)
self._lock.release()
def xfrin_in_progress(self, zone_name):
self._lock.acquire()
ret = zone_name in self._zones
self._lock.release()
return ret
def count(self):
self._lock.acquire()
ret = len(self._zones)
self._lock.release()
return ret
class Xfrin():
def __init__(self):
self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
self._cc.start()
self._max_transfers_in = 10
self.recorder = XfrinRecorder()
self._shutdown_event = threading.Event()
def config_handler(self, new_config):
answer = create_answer(0, 'ok')
def _print_settings(self):
full_config = self._cc.get_full_config()
for item in full_config:
print(item + ":" + str(full_config[item]))
def shutdown(self):
''' shutdown the xfrin process. the thread which is doing xfrin should be
terminated.
'''
self._shutdown_event.set()
main_thread = threading.currentThread()
for th in threading.enumerate():
if th is main_thread:
continue
th.join()
def command_handler(self, command, args):
answer = create_answer(0)
cmd = command
try:
if cmd == 'print_message':
print(args)
elif cmd == 'print_settings':
self._print_settings()
elif cmd == 'shutdown':
self._shutdown_event.set()
elif cmd == 'retransfer':
zone_name, master, port, db_file = self._parse_cmd_params(args)
ret = self.xfrin_start(zone_name, db_file, master, port, False)
answer = create_answer(ret[0], ret[1])
elif cmd == 'refresh':
zone_name, master, port, db_file = self._parse_cmd_params(args)
ret = self.xfrin_start(zone_name, db_file, master, port)
answer = create_answer(ret[0], ret[1])
except XfrinException as err:
answer = create_answer(1, str(err))
return answer
def _parse_cmd_params(self, args):
zone_name = args['zone_name']
master = args['master']
check_addr(master)
port = int(args.get('port'))
if port:
check_port(port)
else:
port = 53
db_file = args.get('db_file')
if not db_file:
#TODO, the db file path should be got in auth server's configuration
db_file = '/tmp/zone.sqlite3'
return (zone_name, master, port, db_file)
def startup(self):
while not self._shutdown_event.is_set():
self._cc.check_command()
def xfrin_start(self, zone_name, db_file, master_addr,
port = 53,
check_soa = True):
if "bind10_message" not in sys.modules:
return (1, "xfrin failed, can't load dns message python library: 'bind10_message'")
# check max_transfer_in, else return quota error
if self.recorder.count() >= self._max_transfers_in:
return (1, 'xfrin quota error')
if self.recorder.xfrin_in_progress(zone_name):
return (1, 'zone xfrin is in progress')
xfrin_thread = threading.Thread(target = process_xfrin,
args = (self.recorder,
zone_name,
db_file,
self._shutdown_event,
master_addr,
port, check_soa))
xfrin_thread.start()
return (0, 'zone xfrin is started')
xfrind = None
def signal_handler(signal, frame):
if xfrind:
xfrind.shutdown()
sys.exit(0)
def set_signal_handler():
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
def check_port(value):
if (value < 0) or (value > 65535):
raise XfrinException('requires a port number (0-65535)')
def check_addr(ipstr):
ip_family = socket.AF_INET
if (ipstr.find(':') != -1):
ip_family = socket.AF_INET6
try:
socket.inet_pton(ip_family, ipstr)
except:
raise XfrinException("%s invalid ip address" % ipstr)
if __name__ == '__main__':
try:
set_signal_handler()
xfrind = Xfrin()
xfrind.startup()
except KeyboardInterrupt:
print("exit http server")
except Exception as e:
print(e)
if xfrind:
xfrind.shutdown()
{
"module_spec": {
"module_name": "Xfrin",
"config_data": [
{
"item_name": "transfers_in",
"item_type": "integer",
"item_optional": False,
"item_default": 10
}
],
"commands": [
{
"command_name": "print_message",
"command_description": "Print the given message to stdout",
"command_args": [ {
"item_name": "message",
"item_type": "string",
"item_optional": False,
"item_default": ""
} ]
},
{
"command_name": "print_settings",
"command_description": "Print some_string and some_int to stdout",
"command_args": []
},
{
'command_name': 'retransfer',
"command_description": 'retransfer a single zone without checking zone serial number',
'command_args': [ {
"item_name": "zone_name",
"item_type": "string",
"item_optional": False,
"item_default": ""
},
{
"item_name": "master",
"item_type": "string",
"item_optional": False,
"item_default": ""
},
{
"item_name": "port",
"item_type": "integer",
"item_optional": True,
"item_default": 53
},
{
"item_name": "db_file",
"item_type": "string",
"item_optional": True,
"item_default": '/tmp/zone.sqlite3'
}
]
},
{
"command_name": "shutdown",
"command_description": "Shut down BIND 10",
"command_args": []
}
]
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment