Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
Sebastian Schrader
Kea
Commits
34e8602a
Commit
34e8602a
authored
Mar 15, 2011
by
Shane Kerr
Browse files
Change from starting the server to a check that uses the "mock"
BIND 10 server. Far better suited as a unit test.
parent
9d42d008
Changes
4
Hide whitespace changes
Inline
Side-by-side
src/bin/bind10/bind10.py.in
View file @
34e8602a
...
...
@@ -139,7 +139,8 @@ class ProcessInfo:
self.restart_schedule = RestartSchedule()
self.uid = uid
self.username = username
self._spawn()
self.process = None
self.pid = None
def _preexec_work(self):
"""Function used before running a program that needs to run as a
...
...
@@ -186,6 +187,11 @@ class ProcessInfo:
self.pid = self.process.pid
self.restart_schedule.set_run_start_time()
# spawn() and respawn() are the same for now, but in the future they
# may have different functionality
def spawn(self):
self._spawn()
def respawn(self):
self._spawn()
...
...
@@ -383,6 +389,7 @@ class BoB:
c_channel = ProcessInfo("b10-msgq", ["b10-msgq"], c_channel_env,
True, not self.verbose, uid=self.uid,
username=self.username)
c_channel.spawn()
self.processes[c_channel.pid] = c_channel
self.log_started(c_channel.pid)
...
...
@@ -407,6 +414,7 @@ class BoB:
bind_cfgd = ProcessInfo("b10-cfgmgr", ["b10-cfgmgr"],
c_channel_env, uid=self.uid,
username=self.username)
bind_cfgd.spawn()
self.processes[bind_cfgd.pid] = bind_cfgd
self.log_started(bind_cfgd.pid)
...
...
@@ -441,6 +449,7 @@ class BoB:
"""
self.log_starting(name, port, address)
newproc = ProcessInfo(name, args, c_channel_env)
newproc.spawn()
self.processes[newproc.pid] = newproc
self.log_started(newproc.pid)
...
...
src/bin/bind10/tests/Makefile.am
View file @
34e8602a
PYCOVERAGE_RUN
=
@PYCOVERAGE_RUN@
#PYTESTS = args_test.py bind10_test.py
bind10_cmd_test.py
PYTESTS
=
bind10_test.py
bind10_cmd_test.py
#PYTESTS = args_test.py bind10_test.py
PYTESTS
=
bind10_test.py
EXTRA_DIST
=
$(PYTESTS)
# test using command-line arguments, so use check-local target instead of TESTS
...
...
src/bin/bind10/tests/bind10_cmd_test.py
deleted
100644 → 0
View file @
9d42d008
"""
This program checks the BIND 10 boss interaction.
"""
import
unittest
import
subprocess
import
time
import
select
import
isc.cc
BIND10_EXE
=
"../run_bind10.sh"
TIMEOUT
=
3
def
_waitForString
(
bob
,
s
):
"""Read the input from the Process object until we find the
string we're looking for or we timeout."""
found_string
=
False
start_time
=
time
.
time
()
while
time
.
time
()
<
start_time
+
TIMEOUT
:
(
r
,
w
,
x
)
=
select
.
select
((
bob
.
stdout
,),
(),
(),
TIMEOUT
)
if
bob
.
stdout
in
r
:
s
=
bob
.
stdout
.
readline
()
if
s
==
''
:
break
if
s
.
startswith
(
s
):
found_string
=
True
break
return
found_string
class
TestBossCmd
(
unittest
.
TestCase
):
@
classmethod
def
setUpClass
(
cls
):
print
(
"setupclass?"
)
# start bind10
cls
.
_bob
=
subprocess
.
Popen
(
args
=
(
BIND10_EXE
,),
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
STDOUT
)
started_ok
=
_waitForString
(
cls
.
_bob
,
'[bind10] BIND 10 started'
)
if
not
started_ok
:
cls
.
_bob
.
terminate
()
cls
.
_bob
.
wait
()
cls
.
fail
(
'Unable to start BIND 10'
)
@
classmethod
def
tearDownClass
(
cls
):
# shut down bind10
cls
.
_bob
.
terminate
()
cls
.
_bob
.
wait
()
def
testPing
(
self
):
"""Simple aliveness check"""
ping_worked
=
False
# connect to the command channel
self
.
cc
=
isc
.
cc
.
Session
()
self
.
cc
.
group_subscribe
(
'Boss'
)
# send a ping
cmd
=
{
"command"
:
[
'ping'
]}
seq
=
self
.
cc
.
group_sendmsg
(
cmd
,
'Boss'
)
# wait for a pong
env
,
msg
=
self
.
cc
.
recvmsg
(
False
,
seq
)
if
'result'
in
msg
:
result
=
msg
[
'result'
]
if
(
result
[
0
]
==
0
)
and
(
result
[
1
]
==
'pong'
):
ping_worked
=
True
# check that we were able to ping
self
.
assertEqual
(
ping_worked
,
True
)
def
_check_processes
(
self
,
process_list
):
# the msgq and cfgmgr are required, everything else is optional
msgq_pid
=
None
cfgmgr_pid
=
None
for
process
in
process_list
:
if
len
(
process
)
!=
2
:
return
False
if
type
(
process
[
0
])
!=
int
:
return
False
if
type
(
process
[
1
])
!=
str
:
return
False
if
process
[
1
]
==
'b10-msgq'
:
msgq_pid
=
process
[
0
]
elif
process
[
1
]
==
'b10-cfgmgr'
:
cfgmgr_pid
=
process
[
0
]
if
msgq_pid
and
cfgmgr_pid
:
return
True
return
False
def
testShowServices
(
self
):
"""Get a list of children"""
command_worked
=
False
# connect to the command channel
self
.
cc
=
isc
.
cc
.
Session
()
self
.
cc
.
group_subscribe
(
'Boss'
)
# send a ping
cmd
=
{
"command"
:
[
'show_processes'
]}
seq
=
self
.
cc
.
group_sendmsg
(
cmd
,
'Boss'
)
# wait for a pong
env
,
msg
=
self
.
cc
.
recvmsg
(
False
,
seq
)
if
'result'
in
msg
:
result
=
msg
[
'result'
]
if
(
result
[
0
]
==
0
)
and
self
.
_check_processes
(
result
[
1
]):
command_worked
=
True
# check that we were able to ping
self
.
assertEqual
(
command_worked
,
True
)
if
__name__
==
'__main__'
:
# Python 3.2 and later support the setUpClass() and tearDownClass()
# class methods to unittest, which are what we want to avoid having
# to start/stop BIND 10 every time we run the test. For versions of
# unittest that do not support this, we invoke them explicitly
if
not
hasattr
(
unittest
.
TestCase
,
'setUpClass'
):
TestBossCmd
.
setUpClass
()
unittest
.
main
()
if
not
hasattr
(
unittest
.
TestCase
,
'tearDownClass'
):
TestBossCmd
.
tearDownClass
()
src/bin/bind10/tests/bind10_test.py
View file @
34e8602a
...
...
@@ -30,6 +30,7 @@ class TestProcessInfo(unittest.TestCase):
def
test_init
(
self
):
pi
=
ProcessInfo
(
'Test Process'
,
[
'/bin/echo'
,
'foo'
])
pi
.
spawn
()
os
.
dup2
(
self
.
old_stdout
,
sys
.
stdout
.
fileno
())
self
.
assertEqual
(
pi
.
name
,
'Test Process'
)
self
.
assertEqual
(
pi
.
args
,
[
'/bin/echo'
,
'foo'
])
...
...
@@ -50,12 +51,14 @@ class TestProcessInfo(unittest.TestCase):
def
test_setting_null_stdout
(
self
):
pi
=
ProcessInfo
(
'Test Process'
,
[
'/bin/echo'
,
'foo'
],
dev_null_stdout
=
True
)
pi
.
spawn
()
os
.
dup2
(
self
.
old_stdout
,
sys
.
stdout
.
fileno
())
self
.
assertEqual
(
pi
.
dev_null_stdout
,
True
)
self
.
assertEqual
(
os
.
read
(
self
.
pipes
[
0
],
100
),
b
""
)
def
test_respawn
(
self
):
pi
=
ProcessInfo
(
'Test Process'
,
[
'/bin/echo'
,
'foo'
])
pi
.
spawn
()
# wait for old process to work...
self
.
assertEqual
(
os
.
read
(
self
.
pipes
[
0
],
100
),
b
"foo
\n
"
)
# respawn it
...
...
@@ -104,17 +107,19 @@ class TestBoB(unittest.TestCase):
self
.
assertEqual
(
bob
.
cfg_start_auth
,
True
)
self
.
assertEqual
(
bob
.
cfg_start_resolver
,
False
)
# Class for testing the BoB start/stop components routines.
# Class for testing the BoB without actually starting processes.
# This is used for testing the start/stop components routines and
# the BoB commands.
#
#
Although t
esting that external processes start is outside the scope
#
T
esting that external processes start is outside the scope
# of the unit test, by overriding the process start methods we can check
# that the right processes are started depending on the configuration
# options.
class
StartStopChe
ckBob
(
BoB
):
class
Mo
ckBob
(
BoB
):
def
__init__
(
self
):
BoB
.
__init__
(
self
)
# Set flags as to which of the overridden methods has been run.
# Set flags as to which of the overridden methods has been run.
self
.
msgq
=
False
self
.
cfgmgr
=
False
self
.
ccsession
=
False
...
...
@@ -126,6 +131,7 @@ class StartStopCheckBob(BoB):
self
.
stats
=
False
self
.
cmdctl
=
False
self
.
c_channel_env
=
{}
self
.
processes
=
{
}
def
read_bind10_config
(
self
):
# Configuration options are set directly
...
...
@@ -133,65 +139,95 @@ class StartStopCheckBob(BoB):
def
start_msgq
(
self
,
c_channel_env
):
self
.
msgq
=
True
self
.
processes
[
2
]
=
ProcessInfo
(
'b10-msgq'
,
[
'/bin/false'
])
def
start_cfgmgr
(
self
,
c_channel_env
):
self
.
cfgmgr
=
True
self
.
processes
[
3
]
=
ProcessInfo
(
'b10-cfgmgr'
,
[
'/bin/false'
])
def
start_ccsession
(
self
,
c_channel_env
):
self
.
ccsession
=
True
self
.
processes
[
4
]
=
ProcessInfo
(
'b10-ccsession'
,
[
'/bin/false'
])
def
start_auth
(
self
,
c_channel_env
):
self
.
auth
=
True
self
.
processes
[
5
]
=
ProcessInfo
(
'b10-auth'
,
[
'/bin/false'
])
def
start_resolver
(
self
,
c_channel_env
):
self
.
resolver
=
True
self
.
processes
[
6
]
=
ProcessInfo
(
'b10-resolver'
,
[
'/bin/false'
])
def
start_xfrout
(
self
,
c_channel_env
):
self
.
xfrout
=
True
self
.
processes
[
7
]
=
ProcessInfo
(
'b10-xfrout'
,
[
'/bin/false'
])
def
start_xfrin
(
self
,
c_channel_env
):
self
.
xfrin
=
True
self
.
processes
[
8
]
=
ProcessInfo
(
'b10-xfrin'
,
[
'/bin/false'
])
def
start_zonemgr
(
self
,
c_channel_env
):
self
.
zonemgr
=
True
self
.
processes
[
9
]
=
ProcessInfo
(
'b10-zonemgr'
,
[
'/bin/false'
])
def
start_stats
(
self
,
c_channel_env
):
self
.
stats
=
True
self
.
processes
[
10
]
=
ProcessInfo
(
'b10-stats'
,
[
'/bin/false'
])
def
start_cmdctl
(
self
,
c_channel_env
):
self
.
cmdctl
=
True
self
.
processes
[
11
]
=
ProcessInfo
(
'b10-cmdctl'
,
[
'/bin/false'
])
# We don't really use all of these stop_ methods. But it might turn out
# someone would add some stop_ method to BoB and we want that one overriden
# in case he forgets to update the tests.
def
stop_msgq
(
self
):
if
self
.
msgq
:
del
self
.
processes
[
2
]
self
.
msgq
=
False
def
stop_cfgmgr
(
self
):
if
self
.
cfgmgr
:
del
self
.
processes
[
3
]
self
.
cfgmgr
=
False
def
stop_ccsession
(
self
):
if
self
.
ccssession
:
del
self
.
processes
[
4
]
self
.
ccsession
=
False
def
stop_auth
(
self
):
if
self
.
auth
:
del
self
.
processes
[
5
]
self
.
auth
=
False
def
stop_resolver
(
self
):
if
self
.
resolver
:
del
self
.
processes
[
6
]
self
.
resolver
=
False
def
stop_xfrout
(
self
):
if
self
.
xfrout
:
del
self
.
processes
[
7
]
self
.
xfrout
=
False
def
stop_xfrin
(
self
):
if
self
.
xfrin
:
del
self
.
processes
[
8
]
self
.
xfrin
=
False
def
stop_zonemgr
(
self
):
if
self
.
zonemgr
:
del
self
.
processes
[
9
]
self
.
zonemgr
=
False
def
stop_stats
(
self
):
if
self
.
stats
:
del
self
.
processes
[
10
]
self
.
stats
=
False
def
stop_cmdctl
(
self
):
if
self
.
cmdctl
:
del
self
.
processes
[
11
]
self
.
cmdctl
=
False
class
TestStartStopProcessesBob
(
unittest
.
TestCase
):
...
...
@@ -251,7 +287,7 @@ class TestStartStopProcessesBob(unittest.TestCase):
# is specified.
def
test_start_none
(
self
):
# Create BoB and ensure correct initialization
bob
=
StartStopChe
ckBob
()
bob
=
Mo
ckBob
()
self
.
check_preconditions
(
bob
)
# Start processes and check what was started
...
...
@@ -264,7 +300,7 @@ class TestStartStopProcessesBob(unittest.TestCase):
# Checks the processes started when starting only the auth process
def
test_start_auth
(
self
):
# Create BoB and ensure correct initialization
bob
=
StartStopChe
ckBob
()
bob
=
Mo
ckBob
()
self
.
check_preconditions
(
bob
)
# Start processes and check what was started
...
...
@@ -278,7 +314,7 @@ class TestStartStopProcessesBob(unittest.TestCase):
# Checks the processes started when starting only the resolver process
def
test_start_resolver
(
self
):
# Create BoB and ensure correct initialization
bob
=
StartStopChe
ckBob
()
bob
=
Mo
ckBob
()
self
.
check_preconditions
(
bob
)
# Start processes and check what was started
...
...
@@ -292,7 +328,7 @@ class TestStartStopProcessesBob(unittest.TestCase):
# Checks the processes started when starting both auth and resolver process
def
test_start_both
(
self
):
# Create BoB and ensure correct initialization
bob
=
StartStopChe
ckBob
()
bob
=
Mo
ckBob
()
self
.
check_preconditions
(
bob
)
# Start processes and check what was started
...
...
@@ -310,7 +346,7 @@ class TestStartStopProcessesBob(unittest.TestCase):
"""
# Create BoB and ensure correct initialization
bob
=
StartStopChe
ckBob
()
bob
=
Mo
ckBob
()
self
.
check_preconditions
(
bob
)
# Start processes (nothing much should be started, as in
...
...
@@ -375,7 +411,7 @@ class TestStartStopProcessesBob(unittest.TestCase):
Tests that a process is started only once.
"""
# Create BoB and ensure correct initialization
bob
=
StartStopChe
ckBob
()
bob
=
Mo
ckBob
()
self
.
check_preconditions
(
bob
)
# Start processes (both)
...
...
@@ -401,7 +437,7 @@ class TestStartStopProcessesBob(unittest.TestCase):
Test that processes are not started by the config handler before
startup.
"""
bob
=
StartStopChe
ckBob
()
bob
=
Mo
ckBob
()
self
.
check_preconditions
(
bob
)
bob
.
start_auth
=
lambda
:
self
.
fail
(
"Started auth again"
)
...
...
@@ -412,5 +448,40 @@ class TestStartStopProcessesBob(unittest.TestCase):
bob
.
config_handler
({
'start_auth'
:
True
,
'start_resolver'
:
True
})
class
TestBossCmd
(
unittest
.
TestCase
):
def
test_ping
(
self
):
"""
Confirm simple ping command works.
"""
bob
=
MockBob
()
answer
=
bob
.
command_handler
(
"ping"
,
None
)
self
.
assertEqual
(
answer
,
{
'result'
:
[
0
,
'pong'
]})
def
test_show_processes
(
self
):
"""
Confirm getting a list of processes works.
"""
bob
=
MockBob
()
answer
=
bob
.
command_handler
(
"show_processes"
,
None
)
self
.
assertEqual
(
answer
,
{
'result'
:
[
0
,
[]]})
def
test_show_processes_started
(
self
):
"""
Confirm getting a list of processes works.
"""
bob
=
MockBob
()
bob
.
start_all_processes
()
answer
=
bob
.
command_handler
(
"show_processes"
,
None
)
processes
=
[[
2
,
'b10-msgq'
],
[
3
,
'b10-cfgmgr'
],
[
4
,
'b10-ccsession'
],
[
5
,
'b10-auth'
],
[
7
,
'b10-xfrout'
],
[
8
,
'b10-xfrin'
],
[
9
,
'b10-zonemgr'
],
[
10
,
'b10-stats'
],
[
11
,
'b10-cmdctl'
]]
self
.
assertEqual
(
answer
,
{
'result'
:
[
0
,
processes
]})
if
__name__
==
'__main__'
:
unittest
.
main
()
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment