Commit c22bd7bc authored by Michal Vaner's avatar Michal Vaner
Browse files

Race conditions in xfrout

Similar changes as in previous commit, just in the notify_out module.

git-svn-id: svn:// e5f2f494-b856-4b98-b285-d166d9295462
parent c5a9ab2d
......@@ -426,8 +426,7 @@ class XfroutServer:
def _start_notifier(self):
datasrc = self._unix_socket_server.get_db_file()
self._notifier = notify_out.NotifyOut(datasrc, self._log)
td = threading.Thread(target=self._notifier.dispatcher)
def send_notify(self, zone_name, zone_class):
self._notifier.send_notify(zone_name, zone_class)
......@@ -440,7 +439,7 @@ class XfroutServer:
answer = create_answer(1, "Unknown config data: " + str(key))
self._config_data[key] = new_config[key]
if self._log:
......@@ -462,19 +461,12 @@ class XfroutServer:
if self._unix_socket_server:
main_thread = threading.currentThread()
# close the thread which's doing zone transfer.
for th in threading.enumerate():
if th is main_thread:
def command_handler(self, cmd, args):
if cmd == "shutdown":
self._log.log_message("info", "Received shutdown command.")
answer = create_answer(0)
elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
zone_name = args.get('zone_name')
zone_class = args.get('zone_class')
......@@ -489,7 +481,7 @@ class XfroutServer:
answer = create_answer(1, "Unknown command:" + str(cmd))
return answer
return answer
def run(self):
'''Get and process all commands sent from cfgmgr or other modules. '''
......@@ -105,8 +105,7 @@ class NotifyOut:
self._notifying_zones = []
self._log = log
self._serving = False
self._is_shut_down = threading.Event()
self._read_sock, self._write_sock = socket.socketpair()
self._read_sock = None
self.notify_num = 0 # the count of in progress notifies
self._verbose = verbose
self._lock = threading.Lock()
......@@ -149,24 +148,13 @@ class NotifyOut:
self.notify_num += 1
def dispatcher(self):
'''The loop function for handling notify related events.
If one zone get the notify reply before timeout, call the
handle to process the reply. If one zone can't get the notify
before timeout, call the handler to resend notify or notify
next slave.
The loop can be stoped by calling shutdown() in another
thread. '''
self._serving = True
def _dispatcher(self):
while self._serving:
replied_zones, not_replied_zones = self._wait_for_notify_reply()
if replied_zones is None:
# Let the master know we are alive already
if self._started_event:
if len(replied_zones) == 0 and len(not_replied_zones) == 0:
time.sleep(_IDLE_SLEEP_TIME) #TODO set a better time for idle sleep
replied_zones, not_replied_zones = self._wait_for_notify_reply()
for name_ in replied_zones:
self._zone_notify_handler(replied_zones[name_], _EVENT_READ)
......@@ -175,15 +163,61 @@ class NotifyOut:
if not_replied_zones[name_].notify_timeout <= time.time():
self._zone_notify_handler(not_replied_zones[name_], _EVENT_TIMEOUT)
def dispatcher(self, daemon=False):
Spawns a thread that will handle notify related events.
If one zone get the notify reply before timeout, call the
handle to process the reply. If one zone can't get the notify
before timeout, call the handler to resend notify or notify
next slave.
The thread can be stopped by calling shutdown().
Returns the thread object to anyone interested.
if self._serving:
raise RuntimeError(
'Dispatcher already running, tried to start twice')
# Prepare for launch
self._serving = True
self._started_event = threading.Event()
self._read_sock, self._write_sock = socket.socketpair()
# Start
self._thread = threading.Thread(target=self._dispatcher, args=())
if daemon:
self._thread.daemon = daemon
# Wait for it to get started
self._started_event = None
# Return it to anyone listening
return self._thread
def shutdown(self):
'''Stop the dispatcher() loop. Blocks until the loop has finished. This
must be called when dispatcher() is running in anther thread, or it
will deadlock. '''
Stop the dispatcher() thread. Blocks until the thread stopped.
if not self._serving:
raise RuntimeError('Tried to stop while not running')
# Ask it to stop
self._serving = False
self._write_sock.send(b'shutdown') # make self._read_sock be readable.
# Wait for it
# Clean up
self._write_sock = None
self._read_sock = None
self._thread = None
def _get_rdata_data(self, rr):
return rr[7].strip()
......@@ -220,56 +254,62 @@ class NotifyOut:
return addr_list
def _prepare_select_info(self):
'''Prepare the information for select(), returned
value is one tuple
Prepare the information for select(), returned
value is one tuple
(block_timeout, valid_socks, notifying_zones)
block_timeout: the timeout for select()
valid_socks: sockets list for waiting ready reading.
notifying_zones: the zones which have been triggered
for notify. '''
notifying_zones: the zones which have been triggered
for notify.
valid_socks = []
notifying_zones = {}
min_timeout = None
min_timeout = None
for info in self._notify_infos:
sock = self._notify_infos[info].get_socket()
if sock:
notifying_zones[info] = self._notify_infos[info]
tmp_timeout = self._notify_infos[info].notify_timeout
if min_timeout:
if min_timeout is not None:
if tmp_timeout < min_timeout:
min_timeout = tmp_timeout
min_timeout = tmp_timeout
block_timeout = 0
if min_timeout:
block_timeout = _IDLE_SLEEP_TIME
if min_timeout is not None:
block_timeout = min_timeout - time.time()
if block_timeout < 0:
block_timeout = 0
return (block_timeout, valid_socks, notifying_zones)
def _wait_for_notify_reply(self):
'''receive notify replies in specified time. returned value
is one tuple:(replied_zones, not_replied_zones). (None, None)
will be returned when self._read_sock is readable, since user
has called shutdown().
Receive notify replies in specified time. returned value
is one tuple:(replied_zones, not_replied_zones). ({}, {}) is
returned if shutdown() was called.
replied_zones: the zones which receive notify reply.
not_replied_zones: the zones which haven't got notify reply.
(block_timeout, valid_socks, notifying_zones) = self._prepare_select_info()
(block_timeout, valid_socks, notifying_zones) = \
# This is None only during some tests
if self._read_sock is not None:
r_fds, w, e =, [], [], block_timeout)
except select.error as err:
if err.args[0] != EINTR:
return {}, {}
if self._read_sock in r_fds:
return None, None # user has called shutdown()
return {}, {} # user has called shutdown()
not_replied_zones = {}
replied_zones = {}
for info in notifying_zones:
......@@ -128,10 +128,11 @@ class TestNotifyOut(unittest.TestCase):
# Now make one socket be readable
self._notify._notify_infos[('cn.', 'IN')].notify_timeout = time.time() + 10
self._notify._notify_infos[('com.', 'IN')].notify_timeout = time.time() + 10
self._notify._read_sock, self._notify._write_sock = socket.socketpair()
replied_zones, timeout_zones = self._notify._wait_for_notify_reply()
self.assertEqual(0, len(replied_zones))
self.assertEqual(0, len(timeout_zones))
def test_notify_next_target(self):
......@@ -268,7 +269,7 @@ class TestNotifyOut(unittest.TestCase):
def test_prepare_select_info(self):
timeout, valid_fds, notifying_zones = self._notify._prepare_select_info()
self.assertEqual(0, timeout)
self.assertEqual(notify_out._IDLE_SLEEP_TIME, timeout)
self.assertListEqual([], valid_fds)
self._notify._notify_infos[('cn.', 'IN')]._sock = 1
......@@ -290,12 +291,10 @@ class TestNotifyOut(unittest.TestCase):
self.assertListEqual([2, 1], valid_fds)
def test_shutdown(self):
import threading
td = threading.Thread(target=self._notify.dispatcher)
thread = self._notify.dispatcher()
if __name__== "__main__":
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment