Commit 57f7044d authored by JINMEI Tatuya

[1165] refactor the tests for later use: separate ACL related tests

into a single function and provide capability of specifying the ACL context.
parent 383b6b28
......@@ -101,20 +101,24 @@ class TestXfroutSession(unittest.TestCase):
def message_has_tsig(self, msg):
return msg.get_tsig_record() is not None
def create_request_data_with_tsig(self):
def create_request_data(self, with_tsig=False):
msg = Message(Message.RENDER)
query_id = 0x1035
msg.set_qid(query_id)
msg.set_opcode(Opcode.QUERY())
msg.set_rcode(Rcode.NOERROR())
query_question = Question(Name("example.com."), RRClass.IN(), RRType.AXFR())
query_question = Question(Name("example.com"), RRClass.IN(),
RRType.AXFR())
msg.add_question(query_question)
renderer = MessageRenderer()
tsig_ctx = MockTSIGContext(TSIG_KEY)
msg.to_wire(renderer, tsig_ctx)
reply_data = renderer.get_data()
return reply_data
if with_tsig:
tsig_ctx = MockTSIGContext(TSIG_KEY)
msg.to_wire(renderer, tsig_ctx)
else:
msg.to_wire(renderer)
request_data = renderer.get_data()
return request_data
def setUp(self):
self.sock = MySocket(socket.AF_INET,socket.SOCK_STREAM)
......@@ -123,7 +127,7 @@ class TestXfroutSession(unittest.TestCase):
# When not testing ACLs, simply accept
isc.acl.dns.REQUEST_LOADER.load(
[{"action": "ACCEPT"}]))
self.mdata = bytes(b'\xd6=\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01')
self.mdata = self.create_request_data(False)
self.soa_record = (4, 3, 'example.com.', 'com.example.', 3600, 'SOA', None, 'master.example.com. admin.example.com. 1234 3600 1800 2419200 7200')
def test_parse_query_message(self):
......@@ -131,7 +135,7 @@ class TestXfroutSession(unittest.TestCase):
self.assertEqual(get_rcode.to_text(), "NOERROR")
# tsig signed query message
request_data = self.create_request_data_with_tsig()
request_data = self.create_request_data(True)
# BADKEY
[rcode, msg] = self.xfrsess._parse_query_message(request_data)
self.assertEqual(rcode.to_text(), "NOTAUTH")
......@@ -143,8 +147,9 @@ class TestXfroutSession(unittest.TestCase):
self.assertEqual(rcode.to_text(), "NOERROR")
self.assertTrue(self.xfrsess._tsig_ctx is not None)
def check_transfer_acl(self, acl_setter):
# ACL checks, put some ACL inside
self.xfrsess._acl = isc.acl.dns.REQUEST_LOADER.load([
acl_setter(isc.acl.dns.REQUEST_LOADER.load([
{
"from": "127.0.0.1",
"action": "ACCEPT"
......@@ -153,7 +158,7 @@ class TestXfroutSession(unittest.TestCase):
"from": "192.0.2.1",
"action": "DROP"
}
])
]))
# Localhost (the default in this test) is accepted
rcode, msg = self.xfrsess._parse_query_message(self.mdata)
self.assertEqual(rcode.to_text(), "NOERROR")
......@@ -165,6 +170,10 @@ class TestXfroutSession(unittest.TestCase):
self.xfrsess._remote = ('192.0.2.2', 12345)
rcode, msg = self.xfrsess._parse_query_message(self.mdata)
self.assertEqual(rcode.to_text(), "REFUSED")
# TSIG signed request
request_data = self.create_request_data(True)
# If the TSIG check fails, it should not check ACL
# (If it checked ACL as well, it would just drop the request)
self.xfrsess._remote = ('192.0.2.1', 12345)
......@@ -174,36 +183,36 @@ class TestXfroutSession(unittest.TestCase):
self.assertTrue(self.xfrsess._tsig_ctx is not None)
# ACL using TSIG: successful case
self.xfrsess._acl = isc.acl.dns.REQUEST_LOADER.load([
acl_setter(isc.acl.dns.REQUEST_LOADER.load([
{"key": "example.com", "action": "ACCEPT"}, {"action": "REJECT"}
])
]))
self.assertEqual(TSIGKeyRing.SUCCESS,
self.xfrsess._tsig_key_ring.add(TSIG_KEY))
[rcode, msg] = self.xfrsess._parse_query_message(request_data)
self.assertEqual(rcode.to_text(), "NOERROR")
# ACL using TSIG: key name doesn't match; should be rejected
self.xfrsess._acl = isc.acl.dns.REQUEST_LOADER.load([
acl_setter(isc.acl.dns.REQUEST_LOADER.load([
{"key": "example.org", "action": "ACCEPT"}, {"action": "REJECT"}
])
]))
[rcode, msg] = self.xfrsess._parse_query_message(request_data)
self.assertEqual(rcode.to_text(), "REFUSED")
# ACL using TSIG: no TSIG; should be rejected
self.xfrsess._acl = isc.acl.dns.REQUEST_LOADER.load([
acl_setter(isc.acl.dns.REQUEST_LOADER.load([
{"key": "example.org", "action": "ACCEPT"}, {"action": "REJECT"}
])
]))
[rcode, msg] = self.xfrsess._parse_query_message(self.mdata)
self.assertEqual(rcode.to_text(), "REFUSED")
#
# ACL using IP + TSIG: both should match
#
self.xfrsess._acl = isc.acl.dns.REQUEST_LOADER.load([
acl_setter(isc.acl.dns.REQUEST_LOADER.load([
{"ALL": [{"key": "example.com"}, {"from": "192.0.2.1"}],
"action": "ACCEPT"},
{"action": "REJECT"}
])
]))
# both matches
self.xfrsess._remote = ('192.0.2.1', 12345)
[rcode, msg] = self.xfrsess._parse_query_message(request_data)
......@@ -221,6 +230,11 @@ class TestXfroutSession(unittest.TestCase):
[rcode, msg] = self.xfrsess._parse_query_message(self.mdata)
self.assertEqual(rcode.to_text(), "REFUSED")
def test_transfer_acl(self):
def acl_setter(acl):
self.xfrsess._acl = acl
self.check_transfer_acl(acl_setter)
def test_get_query_zone_name(self):
msg = self.getmsg()
self.assertEqual(self.xfrsess._get_query_zone_name(msg), "example.com.")
......
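Review bot: check_transfer_acl is meant to be reused with other ACL contexts, but it leaves session state behind: it reassigns `self.xfrsess._remote` several times and adds TSIG_KEY to `self.xfrsess._tsig_key_ring` while asserting that the add returns `TSIGKeyRing.SUCCESS`. A second call within the same test method would start from a non-default remote address, so the "Localhost (the default in this test) is accepted" check would no longer test what its comment says, and the key-ring assertion would presumably fail because the key is already present. State the contract in a docstring, e.g. `"""Run the transfer ACL checks; acl_setter(acl) must install acl where the session consults it. Call once per test: changes _remote and the TSIG key ring."""`, or restore both at the end of the helper. Passing the flag by keyword at the call sites (`self.create_request_data(with_tsig=True)`) would also make them self-explanatory. Parts of the helper's body are collapsed between hunks, so this is based on the visible lines only.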