Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
ISC Open Source Projects
Kea
Commits
2636154f
Commit
2636154f
authored
Jun 10, 2011
by
chenzhengzhang
Browse files
[trac955] refactor unittest
parent
e95a07de
Changes
2
Hide whitespace changes
Inline
Side-by-side
src/bin/xfrin/tests/xfrin_test.py
View file @
2636154f
...
...
@@ -79,7 +79,7 @@ class MockXfrin(Xfrin):
def
_get_db_file
(
self
):
pass
def
_cc_check_command
(
self
):
self
.
_shutdown_event
.
set
()
if
MockXfrin
.
check_command_hook
:
...
...
@@ -208,6 +208,14 @@ class TestXfrinConnection(unittest.TestCase):
mock_ctx
.
error
=
error
return
mock_ctx
def
__match_exception
(
self
,
expected_exception
,
expected_msg
,
expression
):
try
:
expression
()
except
expected_exception
as
ex
:
self
.
assertEqual
(
str
(
ex
),
expected_msg
)
else
:
self
.
assertFalse
(
'exception is expected, but not raised'
)
def
test_close
(
self
):
# we shouldn't be using the global asyncore map.
self
.
assertEqual
(
len
(
asyncore
.
socket_map
),
0
)
...
...
@@ -303,12 +311,9 @@ class TestXfrinConnection(unittest.TestCase):
rcode
=
Rcode
.
SERVFAIL
())
# xfrin should check TSIG before other part of incoming message
# validate log message for XfrinException
self
.
conn
.
_verbose
=
True
err_output
=
io
.
StringIO
()
sys
.
stdout
=
err_output
self
.
assertRaises
(
XfrinException
,
self
.
_handle_xfrin_response
)
self
.
assertEqual
(
"[b10-xfrin] TSIG verify fail: BADSIG
\n
"
,
err_output
.
getvalue
())
err_output
.
close
()
self
.
__match_exception
(
XfrinException
,
"TSIG verify fail: BADSIG"
,
self
.
_handle_xfrin_response
)
def
test_response_bad_qid_bad_key
(
self
):
self
.
conn
.
_tsig_key
=
TSIG_KEY
...
...
@@ -318,12 +323,9 @@ class TestXfrinConnection(unittest.TestCase):
self
.
conn
.
reply_data
=
self
.
conn
.
create_response_data
(
bad_qid
=
True
)
# xfrin should check TSIG before other part of incoming message
# validate log message for XfrinException
self
.
conn
.
_verbose
=
True
err_output
=
io
.
StringIO
()
sys
.
stdout
=
err_output
self
.
assertRaises
(
XfrinException
,
self
.
_handle_xfrin_response
)
self
.
assertEqual
(
"[b10-xfrin] TSIG verify fail: BADKEY
\n
"
,
err_output
.
getvalue
())
err_output
.
close
()
self
.
__match_exception
(
XfrinException
,
"TSIG verify fail: BADKEY"
,
self
.
_handle_xfrin_response
)
def
test_response_non_response
(
self
):
self
.
conn
.
_send_query
(
RRType
.
AXFR
())
...
...
@@ -377,12 +379,9 @@ class TestXfrinConnection(unittest.TestCase):
self
.
conn
.
response_generator
=
self
.
_create_soa_response_data
# xfrin should check TSIG before other part of incoming message
# validate log message for XfrinException
self
.
conn
.
_verbose
=
True
err_output
=
io
.
StringIO
()
sys
.
stdout
=
err_output
self
.
assertRaises
(
XfrinException
,
self
.
conn
.
_check_soa_serial
)
self
.
assertEqual
(
"[b10-xfrin] TSIG verify fail: BADSIG
\n
"
,
err_output
.
getvalue
())
err_output
.
close
()
self
.
__match_exception
(
XfrinException
,
"TSIG verify fail: BADSIG"
,
self
.
conn
.
_check_soa_serial
)
def
test_soacheck_non_response
(
self
):
self
.
soa_response_params
[
'response'
]
=
False
...
...
src/bin/xfrin/xfrin.py.in
View file @
2636154f
...
...
@@ -218,9 +218,7 @@ class XfrinConnection(asyncore.dispatcher):
if self._tsig_ctx is not None:
tsig_error = self._tsig_ctx.verify(tsig_record, response_data)
if tsig_error != TSIGError.NOERROR:
errmsg = 'TSIG verify fail: ' + str(tsig_error)
self.log_msg(errmsg)
raise XfrinException(errmsg)
raise XfrinException('TSIG verify fail: %s' % str(tsig_error))
elif tsig_record is not None:
# If the response includes a TSIG while we didn't sign the query,
# we treat it as an error. RFC doesn't say anything about this
...
...
@@ -229,9 +227,7 @@ class XfrinConnection(asyncore.dispatcher):
# implementation would return such a response, and since this is
# part of security mechanism, it's probably better to be more
# strict.
errmsg = 'Unexpected TSIG in response'
self.log_msg(errmsg)
raise XfrinException(errmsg)
raise XfrinException('Unexpected TSIG in response')
def _check_soa_serial(self):
''' Compare the soa serial, if soa serial in master is less than
...
...
@@ -312,19 +308,13 @@ class XfrinConnection(asyncore.dispatcher):
msg_rcode = msg.get_rcode()
if msg_rcode != Rcode.NOERROR():
errmsg = 'error response: ' + msg_rcode.to_text()
self.log_msg(errmsg)
raise XfrinException(errmsg)
raise XfrinException('error response: %s' % msg_rcode.to_text())
if not msg.get_header_flag(Message.HEADERFLAG_QR):
errmsg = 'response is not a response'
self.log_msg(errmsg)
raise XfrinException(errmsg)
raise XfrinException('response is not a response ')
if msg.get_qid() != self._query_id:
errmsg = 'bad query id'
self.log_msg(errmsg)
raise XfrinException(errmsg)
raise XfrinException('bad query id')
def _check_response_status(self, msg):
'''Check validation of xfr response. '''
...
...
@@ -332,14 +322,10 @@ class XfrinConnection(asyncore.dispatcher):
self._check_response_header(msg)
if msg.get_rr_count(Message.SECTION_ANSWER) == 0:
errmsg = 'answer section is empty'
self.log_msg(errmsg)
raise XfrinException(errmsg)
raise XfrinException('answer section is empty')
if msg.get_rr_count(Message.SECTION_QUESTION) > 1:
errmsg = 'query section count greater than 1'
self.log_msg(errmsg)
raise XfrinException(errmsg)
raise XfrinException('query section count greater than 1')
def _handle_answer_section(self, answer_section):
'''Return a generator for the reponse in one tcp package to a zone transfer.'''
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment