Commit 25ac2455 authored by Michal 'vorner' Vaner's avatar Michal 'vorner' Vaner
Browse files

Merge branch 'work/sock/cache' into work/sock/cacheinter

Conflicts:
	src/lib/python/isc/bind10/socket_cache.py
parents 2a6b5e55 e7019de8
......@@ -17,14 +17,23 @@
Here's the cache for sockets from socket creator.
"""
import os
import random
import isc.bind10.sockcreator
from copy import copy
class SocketError(Exception):
"""
Exception raised when the socket creator is unable to create requested
socket. Possible reasons might be the address it should be bound to
is already taken, the permissions are insufficient, the address family
is not supported on this computer and many more.
The errno, if not None, is passed from the socket creator.
"""
pass
def __init__(self, message, errno):
Exception.__init__(self, message)
self.errno = errno
class ShareError(Exception):
"""
......@@ -33,6 +42,65 @@ class ShareError(Exception):
"""
pass
class Socket:
"""
This represents one socket cached by the cache program. This should never
be used directly by a user, it is used internally by the Cache. Therefore
many member variables are used directly instead of by a accessor method.
Be warned that this object implements the __del__ method. It closes the
socket held inside in it. But this poses various problems with garbage
collector. In short, do not make reference cycles with this and generally
leave this class alone to live peacefully.
"""
def __init__(self, protocol, address, port, fileno):
"""
Creates the socket.
The protocol, address and port are preserved for the information.
"""
self.protocol = protocol
self.address = address
self.port = port
self.fileno = fileno
# Mapping from token -> application
self.active_tokens = {}
# The tokens which were not yet picked up
self.waiting_tokens = set()
# Share modes and names by the tokens (token -> (mode, name))
self.shares = {}
def __del__(self):
"""
Closes the file descriptor.
"""
os.close(self.fileno)
def share_compatible(self, mode, name):
"""
Checks if the given share mode and name is compatible with the ones
already installed here.
The allowed values for mode are listed in the Cache.get_token
function.
"""
if mode not in ['NO', 'SAMEAPP', 'ANY']:
raise ValueError("Mode " + mode + " is invalid")
# Go through the existing ones
for (emode, ename) in self.shares.values():
if emode == 'NO' or mode == 'NO':
# One of them can't live together with anything
return False
if (emode == 'SAMEAPP' or mode == 'SAMEAPP') and \
ename != name:
# One of them can't live together with someone of different
# name
return False
# else both are ANY or SAMEAPP with the same name, which is OK
# No problem found, so we consider it OK
return True
class Cache:
"""
This is the cache for sockets from socket creator. The purpose of cache
......@@ -47,6 +115,9 @@ class Cache:
it is removed from cache and closed.
This is expected to be part of Boss, it is not a general utility class.
It is not expected to be subclassed. The methods and members are named
as protected so tests are easier access into them.
"""
def __init__(self, creator):
"""
......@@ -54,9 +125,25 @@ class Cache:
(isc.bind10.sockcreator.Creator) which will be used to create yet
uncached sockets.
"""
# Full implementation and tests are in #1427. This is just because
# of a boss test.
self._creator = creator
# The sockets we have live here, these dicts are various ways how
# to get them. Each of them contains the Socket objects somehow
# This one is dict of token: socket for the ones that were not yet
# picked up by an application.
self._waiting_tokens = {}
# This format is the same as above, but for the tokens that were
# already picked up by the application and not yet released.
self._active_tokens = {}
# This is a dict from applications to set of tokens used by the
# application, for the sockets already picked up by an application
self._active_apps = {}
# The sockets live here to be indexed by protocol, address and
# subsequently by port
self._sockets = {}
# These are just the tokens actually in use, so we don't generate
# dupes. If one is dropped, it can be potentially reclaimed.
self._live_tokens = set()
def get_token(self, protocol, address, port, share_mode, share_name):
"""
......@@ -93,7 +180,39 @@ class Cache:
Note that it isn't guaranteed the tokens would be unique and they
should be used as an opaque handle only.
"""
pass
addr_str = str(address)
try:
socket = self._sockets[protocol][addr_str][port]
except KeyError:
# Something in the dicts is not there, so socket is to be
# created
try:
fileno = self._creator.get_socket(address, port, protocol)
except isc.bind10.sockcreator.CreatorError as ce:
if ce.fatal:
raise
else:
raise SocketError(str(ce), ce.errno)
socket = Socket(protocol, address, port, fileno)
# And cache it
if protocol not in self._sockets:
self._sockets[protocol] = {}
if addr_str not in self._sockets[protocol]:
self._sockets[protocol][addr_str] = {}
self._sockets[protocol][addr_str][port] = socket
# Now we get the token, check it is compatible
if not socket.share_compatible(share_mode, share_name):
raise ShareError("Cached socket not compatible with mode " +
share_mode + " and name " + share_name)
# Grab yet unused token
token = 't' + str(random.randint(0, 2^32-1))
while token in self._live_tokens:
token = 't' + str(random.randint(0, 2^32-1))
self._waiting_tokens[token] = socket
self._live_tokens.add(token)
socket.shares[token] = (share_mode, share_name)
socket.waiting_tokens.add(token)
return token
def get_socket(self, token, application):
"""
......@@ -111,7 +230,19 @@ class Cache:
get_token, it was already used, the socket wasn't picked up soon
enough, ...), it raises ValueError.
"""
pass
try:
socket = self._waiting_tokens[token]
except KeyError:
raise ValueError("Token " + token +
" isn't waiting to be picked up")
del self._waiting_tokens[token]
self._active_tokens[token] = socket
if application not in self._active_apps:
self._active_apps[application] = set()
self._active_apps[application].add(token)
socket.waiting_tokens.remove(token)
socket.active_tokens[token] = application
return socket.fileno
def drop_socket(self, token):
"""
......@@ -122,15 +253,50 @@ class Cache:
It raises ValueError if the token doesn't exist.
"""
pass
try:
socket = self._active_tokens[token]
except KeyError:
raise ValueError("Token " + token + " doesn't represent an " +
"active socket")
# Now, remove everything from the bookkeeping
del socket.shares[token]
app = socket.active_tokens[token]
del socket.active_tokens[token]
del self._active_tokens[token]
self._active_apps[app].remove(token)
if len(self._active_apps[app]) == 0:
del self._active_apps[app]
self._live_tokens.remove(token)
# The socket is not used by anything now, so remove it
if len(socket.active_tokens) == 0 and len(socket.waiting_tokens) == 0:
addr = str(socket.address)
port = socket.port
proto = socket.protocol
del self._sockets[proto][addr][port]
# Clean up empty branches of the structure
if len(self._sockets[proto][addr]) == 0:
del self._sockets[proto][addr]
if len(self._sockets[proto]) == 0:
del self._sockets[proto]
def drop_application(self, application):
"""
This signals the application terminated and all socket it picked up
This signals the application terminated and all sockets it picked up
should be considered unused by it now. It effectively calls drop_socket
on each of the sockets the application picked up and didn't drop yet.
If the application is invalid (no get_socket was successful with this
value of application), it raises ValueError.
"""
pass
try:
# Get a copy. Who knows how iteration works through sets if we
# delete from it during the time, so we'll just have our own copy
# to iterate
to_drop = copy(self._active_apps[application])
except KeyError:
raise ValueError("Application " + str(application) +
" doesn't hold any sockets")
for token in to_drop:
self.drop_socket(token)
# We don't call del now. The last drop_socket should have
# removed the application key as well.
PYCOVERAGE_RUN = @PYCOVERAGE_RUN@
#PYTESTS = args_test.py bind10_test.py
# NOTE: this has a generated test found in the builddir
PYTESTS = sockcreator_test.py component_test.py
PYTESTS = sockcreator_test.py component_test.py socket_cache_test.py
EXTRA_DIST = $(PYTESTS)
......
......@@ -13,9 +13,6 @@
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# This test file is generated .py.in -> .py just to be in the build dir,
# same as the rest of the tests. Saves a lot of stuff in makefile.
"""
Tests for the bind10.sockcreator module.
"""
......
# Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import unittest
import isc.log
import isc.bind10.socket_cache
import isc.bind10.sockcreator
from isc.net.addr import IPAddr
import os
class Test(unittest.TestCase):
"""
Base for the tests here. It replaces the os.close method.
"""
def setUp(self):
self._closes = []
isc.bind10.socket_cache.os.close = self.__close
def tearDown(self):
# This is not very clean solution. But when the test stops
# to exist, the method must not be used to destroy the
# object any more. And we can't restore the os.close here
# as we never work with real sockets here.
isc.bind10.socket_cache.os.close = lambda fd: None
def __close(self, fd):
"""
Just log a close was called.
"""
self._closes.append(fd)
class SocketTest(Test):
"""
Test for the Socket class.
"""
def setUp(self):
"""
Creates the socket to be tested.
It also creates other useful test variables.
"""
Test.setUp(self)
self.__address = IPAddr("192.0.2.1")
self.__socket = isc.bind10.socket_cache.Socket('UDP', self.__address,
1024, 42)
def test_init(self):
"""
Checks the intrnals of the cache just after the creation.
"""
self.assertEqual('UDP', self.__socket.protocol)
self.assertEqual(self.__address, self.__socket.address)
self.assertEqual(1024, self.__socket.port)
self.assertEqual(42, self.__socket.fileno)
self.assertEqual({}, self.__socket.active_tokens)
self.assertEqual({}, self.__socket.shares)
self.assertEqual(set(), self.__socket.waiting_tokens)
def test_del(self):
"""
Check it closes the socket when removed.
"""
# This should make the refcount 0 and call the descructor
# right away
self.__socket = None
self.assertEqual([42], self._closes)
def test_share_modes(self):
"""
Test the share mode compatibility check function.
"""
modes = ['NO', 'SAMEAPP', 'ANY']
# If there are no shares, it is compatible with everything.
for mode in modes:
self.assertTrue(self.__socket.share_compatible(mode, 'anything'))
# There's an NO already, so it is incompatible with everything.
self.__socket.shares = {'token': ('NO', 'anything')}
for mode in modes:
self.assertFalse(self.__socket.share_compatible(mode, 'anything'))
# If there's SAMEAPP, it is compatible with ANY and SAMEAPP with the
# same name.
self.__socket.shares = {'token': ('SAMEAPP', 'app')}
self.assertFalse(self.__socket.share_compatible('NO', 'app'))
self.assertFalse(self.__socket.share_compatible('SAMEAPP',
'something'))
self.assertTrue(self.__socket.share_compatible('SAMEAPP', 'app'))
self.assertTrue(self.__socket.share_compatible('ANY', 'app'))
self.assertFalse(self.__socket.share_compatible('ANY', 'something'))
# If there's ANY, then ANY and SAMEAPP with the same name is compatible
self.__socket.shares = {'token': ('ANY', 'app')}
self.assertFalse(self.__socket.share_compatible('NO', 'app'))
self.assertFalse(self.__socket.share_compatible('SAMEAPP',
'something'))
self.assertTrue(self.__socket.share_compatible('SAMEAPP', 'app'))
self.assertTrue(self.__socket.share_compatible('ANY', 'something'))
# In case there are multiple already inside
self.__socket.shares = {
'token': ('ANY', 'app'),
'another': ('SAMEAPP', 'app')
}
self.assertFalse(self.__socket.share_compatible('NO', 'app'))
self.assertFalse(self.__socket.share_compatible('SAMEAPP',
'something'))
self.assertTrue(self.__socket.share_compatible('SAMEAPP', 'app'))
self.assertFalse(self.__socket.share_compatible('ANY', 'something'))
self.assertTrue(self.__socket.share_compatible('ANY', 'app'))
# Invalid inputs are rejected
self.assertRaises(ValueError, self.__socket.share_compatible, 'bad',
'bad')
class SocketCacheTest(Test):
"""
Some tests for the isc.bind10.socket_cache.Cache.
This class, as well as being the testcase, pretends to be the
socket creator so it can hijack all the requests for sockets.
"""
def setUp(self):
"""
Creates the cache for tests with us being the socket creator.
Also creates some more variables for testing.
"""
Test.setUp(self)
self.__cache = isc.bind10.socket_cache.Cache(self)
self.__address = IPAddr("192.0.2.1")
self.__socket = isc.bind10.socket_cache.Socket('UDP', self.__address,
1024, 42)
self.__get_socket_called = False
def test_init(self):
"""
Checks the internals of the cache just after the creation.
"""
self.assertEqual(self, self.__cache._creator)
self.assertEqual({}, self.__cache._waiting_tokens)
self.assertEqual({}, self.__cache._active_tokens)
self.assertEqual({}, self.__cache._active_apps)
self.assertEqual({}, self.__cache._sockets)
self.assertEqual(set(), self.__cache._live_tokens)
def get_socket(self, address, port, socktype):
"""
Pretend to be a socket creator.
This expects to be called with the _address, port 1024 and 'UDP'.
Returns 42 and notes down it was called.
"""
self.assertEqual(self.__address, address)
self.assertEqual(1024, port)
self.assertEqual('UDP', socktype)
self.__get_socket_called = True
return 42
def test_get_token_cached(self):
"""
Check the behaviour of get_token when the requested socket is already
cached inside.
"""
self.__cache._sockets = {
'UDP': {'192.0.2.1': {1024: self.__socket}}
}
token = self.__cache.get_token('UDP', self.__address, 1024, 'ANY',
'test')
# It didn't call get_socket
self.assertFalse(self.__get_socket_called)
# It returned something
self.assertIsNotNone(token)
# The token is both in the waiting sockets and the live tokens
self.assertEqual({token: self.__socket}, self.__cache._waiting_tokens)
self.assertEqual(set([token]), self.__cache._live_tokens)
# The token got the new share to block any relevant queries
self.assertEqual({token: ('ANY', 'test')}, self.__socket.shares)
# The socket knows the token is waiting in it
self.assertEqual(set([token]), self.__socket.waiting_tokens)
# If we request one more, with incompatible share, it is rejected
self.assertRaises(isc.bind10.socket_cache.ShareError,
self.__cache.get_token, 'UDP', self.__address, 1024,
'NO', 'test')
# The internals are not changed, so the same checks
self.assertEqual({token: self.__socket}, self.__cache._waiting_tokens)
self.assertEqual(set([token]), self.__cache._live_tokens)
self.assertEqual({token: ('ANY', 'test')}, self.__socket.shares)
self.assertEqual(set([token]), self.__socket.waiting_tokens)
def test_get_token_uncached(self):
"""
Check a new socket is created when a corresponding one is missing.
"""
token = self.__cache.get_token('UDP', self.__address, 1024, 'ANY',
'test')
# The get_socket was called
self.assertTrue(self.__get_socket_called)
# It returned something
self.assertIsNotNone(token)
# Get the socket and check it looks OK
socket = self.__cache._waiting_tokens[token]
self.assertEqual(self.__address, socket.address)
self.assertEqual(1024, socket.port)
self.assertEqual(42, socket.fileno)
self.assertEqual('UDP', socket.protocol)
# The socket is properly cached
self.assertEqual({
'UDP': {'192.0.2.1': {1024: socket}}
}, self.__cache._sockets)
# The token is both in the waiting sockets and the live tokens
self.assertEqual({token: socket}, self.__cache._waiting_tokens)
self.assertEqual(set([token]), self.__cache._live_tokens)
# The token got the new share to block any relevant queries
self.assertEqual({token: ('ANY', 'test')}, socket.shares)
# The socket knows the token is waiting in it
self.assertEqual(set([token]), socket.waiting_tokens)
def test_get_token_excs(self):
"""
Test that it is handled properly if the socket creator raises
some exceptions.
"""
def raiseCreatorError(fatal):
raise isc.bind10.sockcreator.CreatorError('test error', fatal)
# First, fatal socket creator errors are passed through
self.get_socket = lambda addr, port, proto: raiseCreatorError(True)
self.assertRaises(isc.bind10.sockcreator.CreatorError,
self.__cache.get_token, 'UDP', self.__address, 1024,
'NO', 'test')
# And nonfatal are converted to SocketError
self.get_socket = lambda addr, port, proto: raiseCreatorError(False)
self.assertRaises(isc.bind10.socket_cache.SocketError,
self.__cache.get_token, 'UDP', self.__address, 1024,
'NO', 'test')
def test_get_socket(self):
"""
Test that we can pickup a socket if we know a token.
"""
token = "token"
app = 13
# No socket prepared there
self.assertRaises(ValueError, self.__cache.get_socket, token, app)
# Not changed
self.assertEqual({}, self.__cache._active_tokens)
self.assertEqual({}, self.__cache._active_apps)
self.assertEqual({}, self.__cache._sockets)
self.assertEqual(set(), self.__cache._live_tokens)
# Prepare a token there
self.__socket.waiting_tokens = set([token])
self.__socket.shares = {token: ('ANY', 'app')}
self.__cache._waiting_tokens = {token: self.__socket}
self.__cache._sockets = {'UDP': {'192.0.2.1': {1024: self.__socket}}}
self.__cache._live_tokens = set([token])
socket = self.__cache.get_socket(token, app)
# Received the fileno
self.assertEqual(42, socket)
# It moved from waiting to active ones
self.assertEqual({}, self.__cache._waiting_tokens)
self.assertEqual({token: self.__socket}, self.__cache._active_tokens)
self.assertEqual({13: set([token])}, self.__cache._active_apps)
self.assertEqual(set([token]), self.__cache._live_tokens)
self.assertEqual(set(), self.__socket.waiting_tokens)
self.assertEqual({token: 13}, self.__socket.active_tokens)
# Trying to get it again fails
self.assertRaises(ValueError, self.__cache.get_socket, token, app)
def test_drop_application(self):
"""
Test that a drop_application calls drop_socket on all the sockets
held by the application.
"""
sockets = set()
def drop_socket(token):
sockets.add(token)
# Mock the drop_socket so we know it is called
self.__cache.drop_socket = drop_socket
self.assertRaises(ValueError, self.__cache.drop_application,
13)
self.assertEqual(set(), sockets)
# Put the tokens into active_apps. Nothing else should be touched
# by this call, so leave it alone.
self.__cache._active_apps = {
1: set(['t1', 't2']),
2: set(['t3'])
}
self.__cache.drop_application(1)
# We don't check the _active_apps, as it would be cleaned by
# drop_socket and we removed it.
self.assertEqual(set(['t1', 't2']), sockets)
def test_drop_socket(self):
"""
Test the drop_socket call. It tests:
* That a socket that still has something to keep it alive is left alive
(both waiting and active).
* If not, it is deleted.
* All bookkeeping data around are properly removed.
* Of course the exception.
"""
self.assertRaises(ValueError, self.__cache.drop_socket, "bad token")
self.__socket.active_tokens = {'t1': 1}
self.__socket.waiting_tokens = set(['t2'])
self.__socket.shares = {'t1': ('ANY', 'app1'), 't2': ('ANY', 'app2')}
self.__cache._waiting_tokens = {'t2': self.__socket}
self.__cache._active_tokens = {'t1': self.__socket}
self.__cache._sockets = {'UDP': {'192.0.2.1': {1024: self.__socket}}}
self.__cache._live_tokens = set(['t1', 't2'])
self.__cache._active_apps = {1: set(['t1'])}
# We can't drop what wasn't picket up yet
self.assertRaises(ValueError, self.__cache.drop_socket, 't2')
self.assertEqual({'t1': 1}, self.__socket.active_tokens)
self.assertEqual(set(['t2']), self.__socket.waiting_tokens)
self.assertEqual({'t1': ('ANY', 'app1'), 't2': ('ANY', 'app2')},
self.__socket.shares)
self.assertEqual({'t2': self.__socket}, self.__cache._waiting_tokens)
<