Commit 3771057f authored by Likun Zhang's avatar Likun Zhang
Browse files

Commit code for module xfrout, AXFR-out works now. Notes:

1. When auth server get axfr query in tcp, the query message and tcp socket will be sent to xfrout process over sendmsg/recvmsg, then auth server will close the tcp socket, or get the next tcp query on the socket. Xfrout send the query response over the tcp socket it received from auth server.
2. Since python3.1 didn't provide the wrapper of sendmsg() and recvmsg() for C socket module(Seems they plan do it in python3.2), I have to write the code in c++, and make python binding for it. See the code in file /src/lib/xfr/fd_share.cc and /src/lib/xfr/python_xfr.cc.

git-svn-id: svn://bind10.isc.org/svn/bind10/trunk@1630 e5f2f494-b856-4b98-b285-d166d9295462
parent 714cd6f3
......@@ -361,6 +361,7 @@ AC_CONFIG_FILES([Makefile
src/bin/auth/Makefile
src/bin/auth/tests/Makefile
src/bin/xfrin/Makefile
src/bin/xfrout/Makefile
src/bin/usermgr/Makefile
src/lib/Makefile
src/lib/cc/Makefile
......@@ -376,6 +377,7 @@ AC_CONFIG_FILES([Makefile
src/lib/exceptions/Makefile
src/lib/auth/Makefile
src/lib/auth/tests/Makefile
src/lib/xfr/Makefile
])
AC_OUTPUT([src/bin/cfgmgr/b10-cfgmgr.py
src/bin/cmdctl/cmdctl.py
......@@ -385,6 +387,9 @@ AC_OUTPUT([src/bin/cfgmgr/b10-cfgmgr.py
src/bin/xfrin/xfrin.py
src/bin/xfrin/xfrin.pre
src/bin/xfrin/run_b10-xfrin.sh
src/bin/xfrout/xfrout.py
src/bin/xfrout/xfrout.pre
src/bin/xfrout/run_b10-xfrout.sh
src/bin/bind10/bind10.py
src/bin/bind10/tests/bind10_test
src/bin/bind10/run_bind10.sh
......@@ -408,6 +413,7 @@ AC_OUTPUT([src/bin/cfgmgr/b10-cfgmgr.py
], [
chmod +x src/bin/cmdctl/run_b10-cmdctl.sh
chmod +x src/bin/xfrin/run_b10-xfrin.sh
chmod +x src/bin/xfrout/run_b10-xfrout.sh
chmod +x src/bin/bind10/run_bind10.sh
chmod +x src/bin/cmdctl/unittest/cmdctl_test
chmod +x src/bin/xfrin/unittest/xfrin_test
......
SUBDIRS = bind10 bindctl cfgmgr loadzone msgq host cmdctl auth xfrin usermgr
SUBDIRS = bind10 bindctl cfgmgr loadzone msgq host cmdctl auth xfrin xfrout usermgr
......@@ -33,6 +33,7 @@ b10_auth_LDADD += $(top_builddir)/src/lib/config/.libs/libcfgclient.a
b10_auth_LDADD += $(top_builddir)/src/lib/cc/libcc.a
b10_auth_LDADD += $(top_builddir)/src/lib/exceptions/.libs/libexceptions.a
b10_auth_LDADD += $(SQLITE_LIBS)
b10_auth_LDADD += $(top_builddir)/src/lib/xfr/.libs/libxfr.a
if HAVE_BOOST_SYSTEM
b10_auth_LDFLAGS = $(AM_LDFLAGS) $(BOOST_LDFLAGS)
b10_auth_LDADD += $(BOOST_SYSTEM_LIB)
......
......@@ -42,12 +42,14 @@
#include <cc/session.h>
#include <cc/data.h>
#include <config/ccsession.h>
#include <xfr/xfrout_client.h>
#include "spec_config.h"
#include "common.h"
#include "auth_srv.h"
using namespace std;
using namespace isc::xfr;
#ifdef HAVE_BOOST_SYSTEM
using namespace boost::asio;
......@@ -103,7 +105,38 @@ my_command_handler(const string& command, const ElementPtr args) {
return answer;
}
#ifdef HAVE_BOOST_SYSTEM
//TODO. The sample way for checking axfr query, the code should be merged to auth server class
static bool
check_axfr_query(char *msg_data, uint16_t msg_len)
{
if (msg_len < 15)
return false;
uint16_t query_type = *(uint16_t *)(msg_data + (msg_len - 4));
if ( query_type == 0xFC00)
return true;
return false;
}
//TODO. Send the xfr query to xfrout module, the code should be merged to auth server class
static void
dispatch_axfr_query(int tcp_sock, char axfr_query[], uint16_t query_len)
{
std::string path = "/tmp/auth_xfrout_conn";
XfroutClient xfr_client(path);
try {
xfr_client.connect();
xfr_client.sendXfroutRequestInfo(tcp_sock, (uint8_t *)axfr_query, query_len);
xfr_client.disconnect();
}
catch (const std::exception & err) {
//if (verbose_mode)
cerr << "error handle xfr query:" << err.what() << endl;
}
}
#ifdef HAVE_BOOSTLIB
//
// Helper classes for asynchronous I/O using boost::asio
//
......@@ -148,17 +181,24 @@ public:
{
if (!error) {
InputBuffer dnsbuffer(data_, bytes_transferred);
if (auth_server->processMessage(dnsbuffer, dns_message_,
response_renderer_, false)) {
responselen_buffer_.writeUint16(response_buffer_.getLength());
async_write(socket_,
boost::asio::buffer(
responselen_buffer_.getData(),
responselen_buffer_.getLength()),
boost::bind(&TCPClient::responseWrite, this,
placeholders::error));
} else {
delete this;
if (check_axfr_query(data_, bytes_transferred)) {
dispatch_axfr_query(socket_.native(), data_, bytes_transferred);
// start to get new query ?
start();
}
else {
if (auth_server->processMessage(dnsbuffer, dns_message_,
response_renderer_, false)) {
responselen_buffer_.writeUint16(response_buffer_.getLength());
async_write(socket_,
boost::asio::buffer(
responselen_buffer_.getData(),
responselen_buffer_.getLength()),
boost::bind(&TCPClient::responseWrite, this,
placeholders::error));
} else {
delete this;
}
}
} else {
delete this;
......@@ -537,25 +577,30 @@ processMessageTCP(const int fd, Message& dns_message,
cc += cc0;
}
InputBuffer buffer(&message_buffer[0], size);
dns_message.clear(Message::PARSE);
response_renderer.clear();
if (auth_server->processMessage(buffer, dns_message, response_renderer,
false)) {
size = response_renderer.getLength();
size_n = htons(size);
if (send(ts, &size_n, 2, 0) == 2) {
cc = send(ts, response_renderer.getData(),
response_renderer.getLength(), 0);
if (cc == -1) {
if (verbose_mode) {
cerr << "[AuthSrv] error in sending TCP response message" <<
endl;
}
} else {
if (verbose_mode) {
cerr << "[XX] sent TCP response: " << cc << " bytes"
<< endl;
if (check_axfr_query(&message_buffer[0], size)) {
dispatch_axfr_query(ts, &message_buffer[0], size);
}
else {
InputBuffer buffer(&message_buffer[0], size);
dns_message.clear(Message::PARSE);
response_renderer.clear();
if (auth_server->processMessage(buffer, dns_message, response_renderer,
false)) {
size = response_renderer.getLength();
size_n = htons(size);
if (send(ts, &size_n, 2, 0) == 2) {
cc = send(ts, response_renderer.getData(),
response_renderer.getLength(), 0);
if (cc == -1) {
if (verbose_mode) {
cerr << "[AuthSrv] error in sending TCP response message" <<
endl;
}
} else {
if (verbose_mode) {
cerr << "[XX] sent TCP response: " << cc << " bytes"
<< endl;
}
}
}
} else {
......
......@@ -265,6 +265,21 @@ class BoB:
if self.verbose:
print("[XX] ccsession started")
# start the xfrout before auth-server, to make sure every xfr-query can be
# processed properly.
if self.verbose:
sys.stdout.write("Starting b10-xfrout\n")
try:
xfrout = ProcessInfo("b10-xfrout", ["b10-xfrout"],
{ 'ISC_MSGQ_PORT': str(self.c_channel_port)})
except Exception as e:
c_channel.process.kill()
bind_cfgd.process.kill()
return "Unable to start b10-xfrout; " + str(e)
self.processes[xfrout.pid] = xfrout
if self.verbose:
sys.stdout.write("Started b10-xfrout (PID %d)\n" % xfrout.pid)
# start b10-auth
# XXX: this must be read from the configuration manager in the future
authargs = ['b10-auth', '-p', str(self.auth_port)]
......@@ -278,6 +293,7 @@ class BoB:
except Exception as e:
c_channel.process.kill()
bind_cfgd.process.kill()
xfrout.process.kill()
return "Unable to start b10-auth; " + str(e)
self.processes[auth.pid] = auth
if self.verbose:
......@@ -292,6 +308,7 @@ class BoB:
except Exception as e:
c_channel.process.kill()
bind_cfgd.process.kill()
xfrout.process.kill()
auth.process.kill()
return "Unable to start b10-xfrin; " + str(e)
self.processes[xfrind.pid] = xfrind
......@@ -308,6 +325,7 @@ class BoB:
except Exception as e:
c_channel.process.kill()
bind_cfgd.process.kill()
xfrout.process.kill()
auth.process.kill()
xfrind.process.kill()
return "Unable to start b10-cmdctl; " + str(e)
......
pkglibexecdir = $(libexecdir)/@PACKAGE@
pkglibexec_SCRIPTS = b10-xfrout
b10_xfroutdir = $(DESTDIR)$(pkgdatadir)
b10_xfrout_DATA = xfrout.spec
CLEANFILES= xfrout.py xfrout.pre xfrout.spec
xfrout.spec: xfrout.pre
$(SED) -e "s|@@LOCALSTATEDIR@@|$(localstatedir)|" xfrout.pre >$@
# TODO: does this need $$(DESTDIR) also?
# this is done here since configure.ac AC_OUTPUT doesn't expand exec_prefix
b10-xfrout: xfrout.py
$(SED) -e "s|@@PYTHONPATH@@|@pyexecdir@|" \
-e "s|@@LOCALSTATEDIR@@|$(localstatedir)|" xfrout.py >$@
chmod a+x $@
#! /bin/sh
PYTHON_EXEC=${PYTHON_EXEC:-@PYTHON@}
export PYTHON_EXEC
MYPATH_PATH=@abs_top_builddir@/src/bin/xfrout
PYTHONPATH=@abs_top_srcdir@/src/lib/python
export PYTHONPATH
cd ${MYPATH_PATH}
${PYTHON_EXEC} b10-xfrout
{
"module_spec": {
"module_name": "Xfrout",
"config_data": [
{
"item_name": "transfers_out",
"item_type": "integer",
"item_optional": False,
"item_default": 10
},
{
"item_name": "db_file",
"item_type": "string",
"item_optional": False,
"item_default": '@@LOCALSTATEDIR@@/@PACKAGE@/zone.sqlite3'
}
],
"commands": [
{
"command_name": "shutdown",
"command_description": "Shut down Xfrout",
"command_args": []
}
]
}
}
#!@PYTHON@
# Copyright (C) 2010 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import sys; sys.path.append ('@@PYTHONPATH@@')
import isc
import isc.cc
import os
import threading
import struct
import signal
from isc.auth import sqlite3_ds
from socketserver import *
import os
from isc.config.ccsession import *
import socket
from bind10_xfr import *
from bind10_dns import *
from optparse import OptionParser, OptionValueError
try:
from bind10_xfr import *
from bind10_dns import *
except:
pass
if "B10_FROM_SOURCE" in os.environ:
SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/xfrout"
else:
PREFIX = "@prefix@"
DATAROOTDIR = "@datarootdir@"
SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
MAX_TRANSFERS_OUT = 10
verbose_mode = True
class XfroutException(Exception): pass
class XfroutSession(BaseRequestHandler):
def handle(self):
fd = recv_fd(self.request.fileno())
data_len = self.request.recv(2)
msg_len = struct.unpack('H', data_len)[0]
msgdata = self.request.recv(msg_len)
sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
try:
self.dns_xfrout_start(sock, msgdata)
except Exception as e:
self.log_msg(str(e))
sock.close()
def _parse_query_message(self, mdata):
''' parse query message to [socket,message]'''
#TODO, need to add parseHeader() in case the message header is invalid
try:
msg = message(message_mode.PARSE)
msg.from_wire(input_buffer(mdata))
except Exception as err:
self.log_msg(str(err))
return rcode.FORMERR(), None
return rcode.NOERROR(), msg
def _get_query_zone_name(self, msg):
q_iter = question_iter(msg)
question = q_iter.get_question()
return question.get_name().to_text()
def _send_data(self, sock, data):
size = len(data)
total_count = 0
while total_count < size:
count = sock.send(data[total_count:])
total_count += count
def _send_message(self, sock, msg):
obuf = output_buffer(0)
render = message_render(obuf)
msg.to_wire(render)
header_len = struct.pack('H', socket.htons(obuf.get_length()))
self._send_data(sock, header_len)
self._send_data(sock, obuf.get_data())
def _reply_query_with_error_rcode(self, msg, sock, rcode_):
msg.make_response()
msg.set_rcode(rcode_)
self._send_message(sock, msg)
def _reply_query_with_format_error(self, sock, msg):
'''query message format isn't legal.'''
if not msg:
return # query message is invalid. send nothing back.
msg.make_response()
msg.set_rcode(rcode.FORMERR())
self._send_message(sock, msg)
def _zone_is_empty(self, zone):
if sqlite3_ds.get_zone_soa(zone, self.server.get_db_file()):
return False
return True
def _zone_exist(self, zonename):
# Find zone in datasource, should this works? maybe should ask
# config manager.
soa = sqlite3_ds.get_zone_soa(zonename, self.server.get_db_file())
if soa:
return True
return False
def _check_xfrout_available(self, zone_name):
'''Check if xfr request can be responsed.
TODO, Get zone's configuration from cfgmgr or some other place
eg. check allow_transfer setting,
'''
if not self._zone_exist(zone_name):
return rcode.NOTAUTH()
if self._zone_is_empty(zone_name):
return rcode.SERVFAIL()
#TODO, check allow_transfer
if not self.server.increase_transfers_counter():
return rcode.REFUSED()
return rcode.NOERROR()
def dns_xfrout_start(self, sock, msg_query):
rcode_, msg = self._parse_query_message(msg_query)
#TODO. create query message and parse header
if rcode_ != rcode.NOERROR():
return self._reply_query_with_format_error(msg, sock, msg_query)
zone_name = self._get_query_zone_name(msg)
rcode_ = self._check_xfrout_available(zone_name)
if rcode_ != rcode.NOERROR():
return self. _reply_query_with_error_rcode(msg, sock, rcode_)
try:
if verbose_mode:
self.log_msg("transfer of '%s/IN': AXFR started" % zone_name)
self._reply_xfrout_query(msg, sock, zone_name)
if verbose_mode:
self.log_msg("transfer of '%s/IN': AXFR end" % zone_name)
except Exception as err:
if verbose_mode:
sys.stderr.write(str(err))
self.server.decrease_transfers_counter()
return
def _clear_message(self, msg):
qid = msg.get_qid()
opcode = msg.get_opcode()
rcode = msg.get_rcode()
msg.clear(message_mode.RENDER)
msg.set_qid(qid)
msg.set_opcode(opcode)
msg.set_rcode(rcode)
msg.set_header_flag(message_flag.AA())
msg.set_header_flag(message_flag.QR())
return msg
def _create_rrset_from_db_record(self, record):
'''Create one rrset from one record of datasource, if the schema of record is changed,
This function should be updated first.
'''
rrtype_ = rr_type(record[5])
rdata_ = create_rdata(rrtype_, rr_class.IN(), " ".join(record[7:]))
rrset_ = rrset(name(record[2]), rr_class.IN(), rrtype_, rr_ttl( int(record[4])))
rrset_.add_rdata(rdata_)
return rrset_
def _send_message_with_last_soa(self, msg, sock, rrset_soa):
'''Add the SOA record to the end of message. If it can't be
added, a new message should be created to send out the last soa .
'''
obuf = output_buffer(0)
render = message_render(obuf)
msg.to_wire(render)
old_message_len = obuf.get_length()
msg.add_rrset(section.ANSWER(), rrset_soa)
msg.to_wire(render)
message_len = obuf.get_length()
if message_len != old_message_len:
self._send_message(sock, msg)
else:
msg = self._clear_message(msg)
msg.add_rrset(section.ANSWER(), rrset_soa)
self._send_message(sock, msg)
def _get_message_len(self, msg):
'''Get message length, every time need do like this? Actually there should be
a better way, I need check with jinmei later.
'''
obuf = output_buffer(0)
render = message_render(obuf)
msg.to_wire(render)
return obuf.get_length()
def _reply_xfrout_query(self, msg, sock, zone_name):
#TODO, there should be a better way to insert rrset.
msg.make_response()
msg.set_header_flag(message_flag.AA())
soa_record = sqlite3_ds.get_zone_soa(zone_name, self.server.get_db_file())
rrset_soa = self._create_rrset_from_db_record(soa_record)
msg.add_rrset(section.ANSWER(), rrset_soa)
old_message_len = 0
# TODO, Since add_rrset() return nothing when rrset can't be added, so I have to compare
# the message length to know if the rrset has been added sucessfully.
for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()):
if self.server._shutdown_event.is_set(): # Check if xfrout is shutdown
raise XfroutException("shutdown!")
if rr_type(rr_data[5]) == rr_type.SOA(): #ignore soa record
continue
rrset_ = self._create_rrset_from_db_record(rr_data)
msg.add_rrset(section.ANSWER(), rrset_)
message_len = self._get_message_len(msg)
if message_len != old_message_len:
old_message_len = message_len
continue
self._send_message(sock, msg)
msg = self._clear_message(msg)
msg.add_rrset(section.ANSWER(), rrset_) # Add the rrset to the new message
old_message_len = 0
self._send_message_with_last_soa(msg, sock, rrset_soa)
def log_msg(self, msg):
print('[b10-xfrout] ', msg)
class UnixSockServer(ThreadingUnixStreamServer):
'''The unix domain socket server which accept xfr query sent from auth server.'''
def __init__(self, sock_file, handle_class, shutdown_event, config_data):
ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
self._lock = threading.Lock()
self._transfers_counter = 0
self._shutdown_event = shutdown_event
self.update_config_data(config_data)
def update_config_data(self, new_config):
'''Apply the new config setting of xfrout module. '''
self._lock.acquire()
self._max_transfers_out = new_config.get('transfers_out')
self._db_file = new_config.get('db_file')
self._lock.release()
def get_db_file(self):
self._lock.acquire()
file = self._db_file
self._lock.release()
return file
def increase_transfers_counter(self):
'''Return False, if counter + 1 > max_transfers_out, or else
return True
'''
ret = False
self._lock.acquire()
if self._transfers_counter < self._max_transfers_out:
self._transfers_counter += 1
ret = True
self._lock.release()
return ret
def decrease_transfers_counter(self):
self._lock.acquire()
self._transfers_counter -= 1
self._lock.release()
def listen_on_xfr_query(unix_socket_server):
'''Listen xfr query in one single thread. '''
unix_socket_server.serve_forever()
class XfroutServer:
def __init__(self):
self._init_config_data()
self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
self._cc.start()
self._lock = threading.Lock()
self._shutdown_event = threading.Event()
self._listen_sock_file = '/tmp/auth_xfrout_conn' # TODO, should this be configurable in cfgmgr
self._start_xfr_query_listener()
def _start_xfr_query_listener(self):
'''Start a new thread to accept xfr query. '''
try:
os.unlink(self._listen_sock_file)
except:
pass
self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
self._shutdown_event, self._config_data);
listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
listener.start()
def _init_config_data(self):
'''Init the config item here. In case there is some error in config data got from cfgmgr.'''
self._config_data = {'transfers_out': 10, 'db_file':'/tmp/zone.sqlite3'}
def config_handler(self, new_config):