Commit c5a9ab2d authored by Michal Vaner's avatar Michal Vaner
Browse files

Remove race conditions and spurious wakeup problems

* If select created a spurious wakeup (probably only theoretical problem
  on unix domain socket), it could block indefinitely.
* Eliminated race condition on fast start-shutdown serie
* Made the run_timers()-shutdown() be callable multiple times
* Fixed a shutdown problem with empty zone manager (it didn't get to
  select when there were no data)

git-svn-id: svn://bind10.isc.org/svn/bind10/branches/trac335@3191 e5f2f494-b856-4b98-b285-d166d9295462
parent 288f0861
......@@ -40,11 +40,9 @@ class MySession():
class MyZonemgrRefresh(ZonemgrRefresh):
def __init__(self):
self._cc = MySession()
self._db_file = "initdb.file"
self._is_shut_down = threading.Event()
self._read_sock, self._write_sock = socket.socketpair()
self._master_socket, self._slave_socket = socket.socketpair()
ZonemgrRefresh.__init__(self, MySession(), "initdb.file",
self._slave_socket)
self._zonemgr_refresh_info = {
('sd.cn.', 'IN'): {
'last_refresh_time': 1280474398.822142,
......@@ -58,16 +56,6 @@ class MyZonemgrRefresh(ZonemgrRefresh):
'zone_state': 0}
}
def stop_timer(self):
''' Exit timer thread '''
self.shutdown()
main_thread = threading.currentThread()
for th in threading.enumerate():
if th is main_thread:
continue
th.join()
class TestZonemgrRefresh(unittest.TestCase):
def setUp(self):
self.stderr_backup = sys.stderr
......@@ -104,7 +92,7 @@ class TestZonemgrRefresh(unittest.TestCase):
time2 = time.time()
self.assertTrue((time1 + 7200 * 3 / 4) <= zone_timeout)
self.assertTrue(zone_timeout <= time2 + 7200)
def test_set_zone_retry_timer(self):
time1 = time.time()
self.zone_refresh._set_zone_retry_timer(ZONE_NAME_CLASS1_IN)
......@@ -150,6 +138,8 @@ class TestZonemgrRefresh(unittest.TestCase):
def test_zonemgr_reload_zone(self):
soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600'
# We need to restore this not to harm other tests
old_get_zone_soa = sqlite3_ds.get_zone_soa
def get_zone_soa(zone_name, db_file):
return (1, 2, 'sd.cn.', 'cn.sd.', 21600, 'SOA', None,
'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600')
......@@ -157,6 +147,7 @@ class TestZonemgrRefresh(unittest.TestCase):
self.zone_refresh.zonemgr_reload_zone(ZONE_NAME_CLASS1_IN)
self.assertEqual(soa_rdata, self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_soa_rdata"])
sqlite3_ds.get_zone_soa = old_get_zone_soa
def test_get_zone_notifier_master(self):
notify_master = "192.168.1.1"
......@@ -234,6 +225,9 @@ class TestZonemgrRefresh(unittest.TestCase):
def test_zonemgr_add_zone(self):
soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600'
# This needs to be restored. The following test actually failed if we left
# this unclean
old_get_zone_soa = sqlite3_ds.get_zone_soa
def get_zone_soa(zone_name, db_file):
return (1, 2, 'sd.cn.', 'cn.sd.', 21600, 'SOA', None,
......@@ -254,7 +248,8 @@ class TestZonemgrRefresh(unittest.TestCase):
return None
sqlite3_ds.get_zone_soa = get_zone_soa2
self.assertRaises(ZonemgrException, self.zone_refresh.zonemgr_add_zone, \
ZONE_NAME_CLASS1_IN)
ZONE_NAME_CLASS1_IN)
sqlite3_ds.get_zone_soa = old_get_zone_soa
def test_build_zonemgr_refresh_info(self):
soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600'
......@@ -391,7 +386,7 @@ class TestZonemgrRefresh(unittest.TestCase):
"""This case will run timer in daemon thread.
The zone's next_refresh_time is less than now, so zonemgr will do zone refresh
immediately. The zone's state will become "refreshing".
Then closing the socket ,the timer will stop, and throw a ZonemgrException."""
"""
time1 = time.time()
self.zone_refresh._zonemgr_refresh_info = {
("sd.cn.", "IN"):{
......@@ -401,13 +396,9 @@ class TestZonemgrRefresh(unittest.TestCase):
'zone_state': ZONE_OK}
}
self.zone_refresh._check_sock = self.zone_refresh._master_socket
listener = threading.Thread(target = self.zone_refresh.run_timer, args = ())
listener.setDaemon(True)
listener.start()
# Sleep 1 sec to ensure that the timer thread has enough time to run.
time.sleep(1)
listener = self.zone_refresh.run_timer(daemon=True)
# Shut down the timer thread
self.zone_refresh.stop_timer()
self.zone_refresh.shutdown()
# After running timer, the zone's state should become "refreshing".
zone_state = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"]
self.assertTrue("refresh_timeout" in self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN].keys())
......@@ -415,11 +406,10 @@ class TestZonemgrRefresh(unittest.TestCase):
def test_shutdown(self):
self.zone_refresh._check_sock = self.zone_refresh._master_socket
listener = threading.Thread(target = self.zone_refresh.run_timer)
listener.start()
listener = self.zone_refresh.run_timer()
self.assertTrue(listener.is_alive())
# Shut down the timer thread
self.zone_refresh.stop_timer()
self.zone_refresh.shutdown()
self.assertFalse(listener.is_alive())
def tearDown(self):
......
......@@ -15,7 +15,7 @@
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""\
"""
This file implements the Secondary Manager program.
The secondary manager is one of the co-operating processes
......@@ -103,10 +103,10 @@ class ZonemgrRefresh:
def __init__(self, cc, db_file, slave_socket):
self._cc = cc
self._check_sock = slave_socket
self._read_sock, self._write_sock = socket.socketpair()
self._db_file = db_file
self._zonemgr_refresh_info = {}
self._build_zonemgr_refresh_info()
self._running = False
def _random_jitter(self, max, jitter):
"""Imposes some random jitters for refresh and
......@@ -331,25 +331,21 @@ class ZonemgrRefresh:
return False
def run_timer(self):
"""Keep track of zone timers. The loop can be stopped by calling shutdown() in
another thread.
"""
while True:
def _run_timer(self):
while self._running:
# Zonemgr has no zone.
if self._zone_mgr_is_empty():
time.sleep(LOWERBOUND_RETRY) # A better time?
continue
zone_need_refresh = self._find_need_do_refresh_zone()
# If don't get zone with minimum next refresh time, set timer timeout = LOWERBOUND_REFRESH
if not zone_need_refresh:
timeout = LOWERBOUND_RETRY
else:
timeout = self._get_zone_next_refresh_time(zone_need_refresh) - self._get_current_time()
if (timeout < 0):
self._do_refresh(zone_need_refresh)
continue
zone_need_refresh = self._find_need_do_refresh_zone()
# If don't get zone with minimum next refresh time, set timer timeout = LOWERBOUND_REFRESH
if not zone_need_refresh:
timeout = LOWERBOUND_RETRY
else:
timeout = self._get_zone_next_refresh_time(zone_need_refresh) - self._get_current_time()
if (timeout < 0):
self._do_refresh(zone_need_refresh)
continue
""" Wait for the socket notification for a maximum time of timeout
in seconds (as float)."""
......@@ -364,15 +360,52 @@ class ZonemgrRefresh:
for fd in rlist:
if fd == self._read_sock: # awaken by shutdown socket
self._read_sock.recv(32)
return
# self._running will be False by now, if it is not a false
# alarm
continue
if fd == self._check_sock: # awaken by check socket
self._check_sock.recv(32)
def run_timer(self, daemon=False):
"""
Keep track of zone timers. Spawns and starts a thread. The thread object is returned.
You can stop it by calling shutdown().
"""
# Small sanity check
if self._running:
raise RuntimeError("Trying to run the timers twice at the same time")
# Prepare the launch
self._running = True
(self._read_sock, self._write_sock) = socket.socketpair()
# Start the thread
self._thread = threading.Thread(target = self._run_timer, args = ())
if daemon:
self._thread.setDaemon(True)
self._thread.start()
# Return the thread to anyone interested
return self._thread
def shutdown(self):
"""Stop the run_timer() loop. Block until the loop has finished. This must be
called when run_timer() is running in another thread, or it will deadlock."""
"""
Stop the run_timer() thread. Block until it finished. This must be
called from a different thread.
"""
if not self._running:
raise RuntimeError("Trying to shutdown, but not running")
# Ask the thread to stop
self._running = False
self._write_sock.send(b'shutdown') # make self._read_sock readble
# Wait for it to actually finnish
self._thread.join()
# Wipe out what we do not need
self._thread = None
self._read_sock = None
self._write_sock = None
class Zonemgr:
"""Zone manager class."""
......@@ -382,16 +415,11 @@ class Zonemgr:
# Create socket pair for communicating between main thread and zonemgr timer thread
self._master_socket, self._slave_socket = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
self._zone_refresh = ZonemgrRefresh(self._cc, self._db_file, self._slave_socket)
self._start_zone_refresh_timer()
self._zone_refresh.run_timer()
self._lock = threading.Lock()
self._shutdown_event = threading.Event()
def _start_zone_refresh_timer(self):
"""Start a new thread to keep track of zone timers"""
listener = threading.Thread(target = self._zone_refresh.run_timer, args = ())
listener.start()
def _setup_session(self):
"""Setup two sessions for zonemgr, one(self._module_cc) is used for receiving
commands and config data sent from other modules, another one (self._cc)
......@@ -422,12 +450,6 @@ class Zonemgr:
self._slave_socket.close()
self._master_socket.close()
self._shutdown_event.set()
main_thread = threading.currentThread()
for th in threading.enumerate():
# Stop the thread which is running zone refresh timer
if th is main_thread:
continue
th.join()
def config_handler(self, new_config):
"""Update config data."""
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment