Commit 26b0ae80 authored by Mukund Sivaraman's avatar Mukund Sivaraman
Browse files

Merge branch 'trac2856'

parents 6606b6d0 e3e92f01
......@@ -1295,6 +1295,7 @@ AC_CONFIG_FILES([Makefile
src/lib/python/isc/ddns/tests/Makefile
src/lib/python/isc/memmgr/Makefile
src/lib/python/isc/memmgr/tests/Makefile
src/lib/python/isc/memmgr/tests/testdata/Makefile
src/lib/python/isc/xfrin/Makefile
src/lib/python/isc/xfrin/tests/Makefile
src/lib/python/isc/server_common/Makefile
......
......@@ -162,7 +162,7 @@ class Memmgr(BIND10Server):
# This makes the MemorySegmentBuilder exit its main loop. It
# should make the builder thread joinable.
with self._builder_cv:
self._builder_command_queue.append('shutdown')
self._builder_command_queue.append(('shutdown',))
self._builder_cv.notify_all()
self._builder_thread.join()
......
......@@ -4,6 +4,7 @@ EXTRA_DIST = __init__.py
EXTRA_DIST += init_messages.py
EXTRA_DIST += cmdctl_messages.py
EXTRA_DIST += ddns_messages.py
EXTRA_DIST += libmemmgr_messages.py
EXTRA_DIST += memmgr_messages.py
EXTRA_DIST += stats_messages.py
EXTRA_DIST += stats_httpd_messages.py
......@@ -25,6 +26,7 @@ CLEANFILES = __init__.pyc
CLEANFILES += init_messages.pyc
CLEANFILES += cmdctl_messages.pyc
CLEANFILES += ddns_messages.pyc
CLEANFILES += libmemmgr_messages.pyc
CLEANFILES += memmgr_messages.pyc
CLEANFILES += stats_messages.pyc
CLEANFILES += stats_httpd_messages.pyc
......
from work.libmemmgr_messages import *
SUBDIRS = . tests
python_PYTHON = __init__.py builder.py datasrc_info.py
python_PYTHON = __init__.py builder.py datasrc_info.py logger.py
pythondir = $(pyexecdir)/isc/memmgr
BUILT_SOURCES = $(PYTHON_LOGMSGPKG_DIR)/work/libmemmgr_messages.py
nodist_pylogmessage_PYTHON = $(PYTHON_LOGMSGPKG_DIR)/work/libmemmgr_messages.py
pylogmessagedir = $(pyexecdir)/isc/log_messages/
CLEANFILES = $(PYTHON_LOGMSGPKG_DIR)/work/libmemmgr_messages.py
CLEANFILES += $(PYTHON_LOGMSGPKG_DIR)/work/libmemmgr_messages.pyc
EXTRA_DIST = libmemmgr_messages.mes
$(PYTHON_LOGMSGPKG_DIR)/work/libmemmgr_messages.py : libmemmgr_messages.mes
$(top_builddir)/src/lib/log/compiler/message \
-d $(PYTHON_LOGMSGPKG_DIR)/work -p $(srcdir)/libmemmgr_messages.mes
CLEANDIRS = __pycache__
clean-local:
......
......@@ -13,6 +13,13 @@
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import json
from isc.datasrc import ConfigurableClientList
from isc.memmgr.datasrc_info import SegmentInfo
from isc.log_messages.libmemmgr_messages import *
from isc.memmgr.logger import logger
class MemorySegmentBuilder:
"""The builder runs in a different thread in the memory manager. It
waits for commands from the memory manager, and then executes them
......@@ -50,6 +57,83 @@ class MemorySegmentBuilder:
self._response_queue = response_queue
self._shutdown = False
def __handle_shutdown(self):
# This method is called when handling the 'shutdown' command. The
# following tuple is passed:
#
# ('shutdown',)
self._shutdown = True
def __handle_bad_command(self, bad_command):
# A bad command was received. Raising an exception is not useful
# in this case as we are likely running in a different thread
# from the main thread which would need to be notified. Instead
# return this in the response queue.
logger.error(LIBMEMMGR_BUILDER_BAD_COMMAND_ERROR, bad_command)
self._response_queue.append(('bad_command',))
self._shutdown = True
def __handle_load(self, zone_name, dsrc_info, rrclass, dsrc_name):
# This method is called when handling the 'load' command. The
# following tuple is passed:
#
# ('load', zone_name, dsrc_info, rrclass, dsrc_name)
#
# where:
#
# * zone_name is None or isc.dns.Name, specifying the zone name
# to load. If it's None, it means all zones to be cached in
# the specified data source (used for initialization).
#
# * dsrc_info is a DataSrcInfo object corresponding to the
# generation ID of the set of data sources for this loading.
#
# * rrclass is an isc.dns.RRClass object, the RR class of the
# data source.
#
# * dsrc_name is a string, specifying a data source name.
clist = dsrc_info.clients_map[rrclass]
sgmt_info = dsrc_info.segment_info_map[(rrclass, dsrc_name)]
params = json.dumps(sgmt_info.get_reset_param(SegmentInfo.WRITER))
clist.reset_memory_segment(dsrc_name,
ConfigurableClientList.READ_WRITE,
params)
if zone_name is not None:
zones = [(None, zone_name)]
else:
zones = clist.get_zone_table_accessor(dsrc_name, True)
for _, zone_name in zones:
catch_load_error = (zone_name is None) # install empty zone initially
result, writer = clist.get_cached_zone_writer(zone_name, catch_load_error,
dsrc_name)
if result != ConfigurableClientList.CACHE_STATUS_ZONE_SUCCESS:
logger.error(LIBMEMMGR_BUILDER_GET_ZONE_WRITER_ERROR, zone_name, dsrc_name)
continue
try:
error = writer.load()
if error is not None:
logger.error(LIBMEMMGR_BUILDER_ZONE_WRITER_LOAD_1_ERROR, zone_name, dsrc_name, error)
continue
except Exception as e:
logger.error(LIBMEMMGR_BUILDER_ZONE_WRITER_LOAD_2_ERROR, zone_name, dsrc_name, str(e))
continue
writer.install()
writer.cleanup()
# need to reset the segment so readers can read it (note: memmgr
# itself doesn't have to keep it open, but there's currently no
# public API to just clear the segment)
clist.reset_memory_segment(dsrc_name,
ConfigurableClientList.READ_ONLY,
params)
self._response_queue.append(('load-completed', dsrc_info, rrclass,
dsrc_name))
def run(self):
""" This is the method invoked when the builder thread is
started. In this thread, be careful when modifying
......@@ -64,7 +148,7 @@ class MemorySegmentBuilder:
# Acquire the condition variable while running the loop.
with self._cv:
while not self._shutdown:
while len(self._command_queue) == 0:
while not self._command_queue:
self._cv.wait()
# Move the queue content to a local queue. Be careful of
# not making assignments to reference variables.
......@@ -74,26 +158,28 @@ class MemorySegmentBuilder:
# Run commands passed in the command queue sequentially
# in the given order. For now, it only supports the
# "shutdown" command, which just exits the thread.
for command in local_command_queue:
if command == 'shutdown':
self._shutdown = True
for command_tuple in local_command_queue:
command = command_tuple[0]
if command == 'load':
# See the comments for __handle_load() for
# details of the tuple passed to the "load"
# command.
_, zone_name, dsrc_info, rrclass, dsrc_name = command_tuple
self.__handle_load(zone_name, dsrc_info, rrclass, dsrc_name)
elif command == 'shutdown':
self.__handle_shutdown()
# When the shutdown command is received, we do
# not process any further commands.
break
else:
# A bad command was received. Raising an
# exception is not useful in this case as we are
# likely running in a different thread from the
# main thread which would need to be
# notified. Instead return this in the response
# queue.
self._response_queue.append(('bad_command',))
self._shutdown = True
self.__handle_bad_command(command)
# When a bad command is received, we do not
# process any further commands.
break
# Notify (any main thread) on the socket about a
# response. Otherwise, the main thread may wait in its
# loop without knowing there was a problem.
if len(self._response_queue) > 0:
if self._response_queue:
while self._sock.send(b'x') != 1:
continue
......@@ -14,6 +14,7 @@
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import os
from collections import deque
class SegmentInfoError(Exception):
"""An exception raised for general errors in the SegmentInfo class."""
......@@ -33,16 +34,208 @@ class SegmentInfo:
segment-type specific details. Such details are expected to be
delegated to subclasses corresponding to specific types of segments.
The implementation is still incomplete. It will have more attributes
such as a set of current readers, methods for adding or deleting
the readers. These will probably be implemented in this base class
as they will be independent from segment-type specific details.
A summarized (and simplified) state transition diagram (for __state)
would be as follows:
+--sync_reader()/remove_reader()
| still have old readers
| |
UPDATING-----complete_--->SYNCHRONIZING<---+
^ update() |
start_update()| | sync_reader()/remove_reader()
events | V no more old reader
exist READY<------complete_----------COPYING
update()
"""
# Common constants of user type: reader or writer
READER = 0
WRITER = 1
# Enumerated values for state:
UPDATING = 0 # the segment is being updated (by the builder thread,
# although SegmentInfo won't care about this level of
# details).
SYNCHRONIZING = 1 # one pair of underlying segments has been
# updated, and readers are now migrating to the
# updated version of the segment.
COPYING = 2 # all readers that used the old version of segment have
# been migrated to the updated version, and the old
# segment is now being updated.
READY = 3 # both segments of the pair have been updated. it can now
# handle further updates (e.g., from xfrin).
def __init__(self):
# Holds the state of SegmentInfo. See the class description
# above for the state transition diagram.
self.__state = self.READY
# __readers is a set of 'reader_session_id' private to
# SegmentInfo. It consists of the (ID of) reader modules that
# are using the "current" reader version of the segment.
self.__readers = set()
# __old_readers is a set of 'reader_session_id' private to
# SegmentInfo for write (update), but publicly readable. It can
# be non empty only in the SYNCHRONIZING state, and consists of
# (ID of) reader modules that are using the old version of the
# segments (and have to migrate to the updated version).
self.__old_readers = set()
# __events is a FIFO queue of opaque data for pending update
# events. Update events can come at any time (e.g., after
# xfr-in), but can be only handled if SegmentInfo is in the
# READY state. This maintains such pending events in the order
# they arrived. SegmentInfo doesn't have to know the details of
# the stored data; it only matters for the memmgr.
self.__events = deque()
def get_state(self):
"""Returns the state of SegmentInfo (UPDATING, SYNCHRONIZING,
COPYING or READY)."""
return self.__state
def get_readers(self):
"""Returns a set of IDs of the reader modules that are using the
"current" reader version of the segment. This method is mainly
useful for testing purposes."""
return self.__readers
def get_old_readers(self):
"""Returns a set of IDs of reader modules that are using the old
version of the segments and have to be migrated to the updated
version."""
return self.__old_readers
def get_events(self):
"""Returns a list of pending events in the order they arrived."""
return list(self.__events)
# Helper method used in complete_update(), sync_reader() and
# remove_reader().
def __sync_reader_helper(self, new_state):
if not self.__old_readers:
self.__state = new_state
if self.__events:
return self.__events.popleft()
return None
def add_event(self, event_data):
"""Add an event to the end of the pending events queue. The
event_data is not used internally by this class, and is returned
as-is by other methods. The format of event_data only matters in
the memmgr. This method must be called by memmgr when it
receives a request for reloading a zone. No state transition
happens."""
self.__events.append(event_data)
def add_reader(self, reader_session_id):
"""Add the reader module ID to an internal set of reader modules
that are using the "current" reader version of the segment. It
must be called by memmgr when it first gets the pre-existing
readers or when it's notified of a new reader. No state
transition happens.
When the SegmentInfo is not in the READY state, if memmgr gets
notified of a new reader (such as b10-auth) subscribing to the
readers group and calls add_reader(), we assume the new reader
is using the new mapped file and not the old one. For making
sure there is no race, memmgr should make SegmentInfo updates in
the main thread itself (which also handles communications) and
only have the builder in a different thread."""
if reader_session_id in self.__readers:
raise SegmentInfoError('Reader session ID is already in readers set: ' +
str(reader_session_id))
self.__readers.add(reader_session_id)
def start_update(self):
"""If the current state is READY and there are pending events,
it changes the state to UPDATING and returns the head (oldest)
event (without removing it from the pending events queue). This
tells the caller (memmgr) that it should initiate the update
process with the builder. In all other cases it returns None."""
if self.__state == self.READY:
if self.__events:
self.__state = self.UPDATING
return self.__events[0]
else:
return None
raise SegmentInfoError('start_update() called in ' +
'incorrect state: ' + str(self.__state))
def complete_update(self):
"""This method should be called when memmgr is notified by the
builder of the completion of segment update. It changes the
state from UPDATING to SYNCHRONIZING, and COPYING to READY. In
the former case, set of reader modules that are using the
"current" reader version of the segment are moved to the set
that are using an "old" version of segment. If there are no such
readers using the "old" version of segment, it pops the head
(oldest) event from the pending events queue and returns it. It
is an error if this method is called in other states than
UPDATING and COPYING."""
if self.__state == self.UPDATING:
self.__state = self.SYNCHRONIZING
self.__old_readers = self.__readers
self.__readers = set()
return self.__sync_reader_helper(self.READY)
elif self.__state == self.COPYING:
self.__state = self.READY
return None
else:
raise SegmentInfoError('complete_update() called in ' +
'incorrect state: ' + str(self.__state))
def sync_reader(self, reader_session_id):
"""This method must only be called in the SYNCHRONIZING
state. memmgr should call it when it receives the
"segment_update_ack" message from a reader module. It moves the
given ID from the set of reader modules that are using the "old"
version of the segment to the set of reader modules that are
using the "current" version of the segment, and if there are no
reader modules using the "old" version of the segment, the state
is changed to COPYING. If the state has changed to COPYING, it
pops the head (oldest) event from the pending events queue and
returns it; otherwise it returns None."""
if self.__state != self.SYNCHRONIZING:
raise SegmentInfoError('sync_reader() called in ' +
'incorrect state: ' + str(self.__state))
if reader_session_id not in self.__old_readers:
raise SegmentInfoError('Reader session ID is not in old readers set: ' +
str(reader_session_id))
if reader_session_id in self.__readers:
raise SegmentInfoError('Reader session ID is already in readers set: ' +
str(reader_session_id))
self.__old_readers.remove(reader_session_id)
self.__readers.add(reader_session_id)
return self.__sync_reader_helper(self.COPYING)
def remove_reader(self, reader_session_id):
"""This method must only be called in the SYNCHRONIZING
state. memmgr should call it when it's notified that an existing
reader has unsubscribed. It removes the given reader ID from
either the set of readers that use the "current" version of the
segment or the "old" version of the segment (wherever the reader
belonged), and in the latter case, if there are no reader
modules using the "old" version of the segment, the state is
changed to COPYING. If the state has changed to COPYING, it pops
the head (oldest) event from the pending events queue and
returns it; otherwise it returns None."""
if self.__state != self.SYNCHRONIZING:
raise SegmentInfoError('remove_reader() called in ' +
'incorrect state: ' + str(self.__state))
if reader_session_id in self.__old_readers:
self.__old_readers.remove(reader_session_id)
return self.__sync_reader_helper(self.COPYING)
elif reader_session_id in self.__readers:
self.__readers.remove(reader_session_id)
return None
else:
raise SegmentInfoError('Reader session ID is not in current ' +
'readers or old readers set: ' +
str(reader_session_id))
def create(type, genid, rrclass, datasrc_name, mgr_config):
"""Factory of specific SegmentInfo subclass instance based on the
segment type.
......
# Copyright (C) 2013 Internet Systems Consortium, Inc. ("ISC")
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
# No namespace declaration - these constants go in the global namespace
# of the config_messages python module.
% LIBMEMMGR_BUILDER_BAD_COMMAND_ERROR MemorySegmentBuilder received bad command '%1'
The MemorySegmentBuilder has received a bad command in its input command
queue. This is likely a programming error. If the builder runs in a
separate thread, this would cause it to exit the thread.
% LIBMEMMGR_BUILDER_GET_ZONE_WRITER_ERROR Unable to get zone writer for zone '%1', data source '%2'. Skipping.
The MemorySegmentBuilder was unable to get a ZoneWriter for the
specified zone when handling the load command. This zone will be
skipped.
% LIBMEMMGR_BUILDER_ZONE_WRITER_LOAD_1_ERROR Error loading zone '%1', data source '%2': '%3'
The MemorySegmentBuilder failed to load the specified zone when handling
the load command. This zone will be skipped.
% LIBMEMMGR_BUILDER_ZONE_WRITER_LOAD_2_ERROR Error loading zone '%1', data source '%2': '%3'
An exception occured when the MemorySegmentBuilder tried to load the
specified zone when handling the load command. This zone will be
skipped.
# Copyright (C) 2013 Internet Systems Consortium, Inc. ("ISC")
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
'''Common definitions regarding logging for the memmgr package.'''
import isc.log
logger = isc.log.Logger("libmemmgr")
SUBDIRS = testdata
PYCOVERAGE_RUN = @PYCOVERAGE_RUN@
PYTESTS = builder_tests.py datasrc_info_tests.py
EXTRA_DIST = $(PYTESTS)
......@@ -26,7 +27,8 @@ endif
for pytest in $(PYTESTS) ; do \
echo Running test: $$pytest ; \
$(LIBRARY_PATH_PLACEHOLDER) \
TESTDATA_PATH=$(builddir) \
TESTDATA_PATH=$(abs_srcdir)/testdata \
TESTDATA_WRITE_PATH=$(builddir) \
B10_FROM_BUILD=$(abs_top_builddir) \
HAVE_SHARED_MEMORY=$(HAVE_SHARED_MEMORY) \
PYTHONPATH=$(COMMON_PYTHON_PATH):$(abs_top_builddir)/src/lib/dns/python/.libs \
......
......@@ -14,12 +14,28 @@
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import unittest
import os
import socket
import select
import threading
import isc.log
from isc.dns import *
import isc.datasrc
from isc.memmgr.builder import *
from isc.server_common.datasrc_clients_mgr import DataSrcClientsMgr
from isc.memmgr.datasrc_info import *
TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
# Defined for easier tests with DataSrcClientsMgr.reconfigure(), which
# only needs get_value() method
class MockConfigData:
def __init__(self, data):
self.__data = data
def get_value(self, identifier):
return self.__data[identifier], False
class TestMemorySegmentBuilder(unittest.TestCase):
def _create_builder_thread(self):
......@@ -29,7 +45,8 @@ class TestMemorySegmentBuilder(unittest.TestCase):
self._builder_command_queue = []
self._builder_response_queue = []
self._builder_cv = threading.Condition()
self._builder_lock = threading.Lock()
self._builder_cv = threading.Condition(lock=self._builder_lock)
self._builder = MemorySegmentBuilder(self._builder_sock,
self._builder_cv,
......@@ -39,6 +56,7 @@ class TestMemorySegmentBuilder(unittest.TestCase):
def setUp(self):
self._create_builder_thread()
self.__mapped_file_path = None
def tearDown(self):
# It's the tests' responsibility to stop and join the builder
......@@ -48,6 +66,10 @@ class TestMemorySegmentBuilder(unittest.TestCase):
self._master_sock.close()
self._builder_sock.close()
if self.__mapped_file_path is not None:
if os.path.exists(self.__mapped_file_path):
os.unlink(self.__mapped_file_path)
def test_bad_command(self):
"""Tests what happens when a bad command is passed to the
MemorySegmentBuilder.
......@@ -58,7 +80,7 @@ class TestMemorySegmentBuilder(unittest.TestCase):
# Now that the builder thread is running, send it a bad
# command. The thread should exit its main loop and be joinable.
with self._builder_cv:
self._builder_command_queue.append('bad_command')
self._builder_command_queue.append(('bad_command',))
self._builder_cv.notify_all()
# Wait 5 seconds to receive a notification on the socket from
......@@ -95,13 +117,110 @@ class TestMemorySegmentBuilder(unittest.TestCase):
self._builder_thread.start()
# Now that the builder thread is running, send it the shutdown
# Now that the builder thread is running, send it the "shutdown"
# command. The thread should exit its main loop and be joinable.
with self._builder_cv:
self._builder_command_queue.append('shutdown')
self._builder_command_queue.append(('shutdown',))
# Commands after 'shutdown' must be ignored.
self._builder_command_queue.append('bad_command_1')
self._builder_command_queue.append('bad_command_2')
self._builder_command_queue.append(('bad_command_1',))
self._builder_command_queue.append(('bad_command_2',))
self._builder_cv.notify_all()
# Wait 5 seconds at most for the main loop of the builder to
# exit.
self._builder_thread.join(5)
self.assertFalse(self._builder_thread.isAlive())
# The command queue must be cleared, and the response queue must
# be untouched (we don't use it in this test). The thread is no
# longer running, so we can use the queues without a lock.
self.assertEqual(len(self._builder_command_queue), 0)
self.assertEqual(len(self._builder_response_queue), 0)
@unittest.skipIf(os.environ['HAVE_SHARED_MEMORY'] != 'yes',
'shared memory is not available')
def test_load(self):
"""
Test "load" command.
"""
mapped_file_dir = os.environ['TESTDATA_WRITE_PATH']
mgr_config = {'mapped_file_dir': mapped_file_dir}
cfg_data = MockConfigData(
{"classes":
{"IN": [{"type": "MasterFiles",
"params": { "example.com": TESTDATA_PATH + "example.com.zone" },