......@@ -368,7 +368,7 @@ public:
/// \return The number of zones known to this datasource
virtual unsigned int getZoneCount() const;
/// \brief Create a zone in the database
/// \brief Create a zone in the data source
/// Creates a new (empty) zone in the data source backend, which
/// can subsequently be filled with data (through getUpdater()).
......@@ -88,6 +88,35 @@ Return Value(s): A tuple containing a result value and a ZoneFinder object or\n\
const char* const DataSourceClient_createZone_doc = "\
create_zone(name) -> bool\n\
Create a zone in the data source.\n\
Creates a new (empty) zone in the data source backend, which can\n\
subsequently be filled with data (through get_updater()).\n\
Note: This is a tentative API, and this method is likely to change or\n\
be removed in the near future. For that reason, it currently provides\n\
a default implementation that throws NotImplemented.\n\
Apart from the two exceptions mentioned below, in theory this call can\n\
throw anything, depending on the implementation of the datasource\n\
NotImplemented If the datasource backend does not support direct\n\
zone creation.\n\
DataSourceError If something goes wrong in the data source while\n\
creating the zone.\n\
name The (fully qualified) name of the zone to create\n\
Return Value(s): True if the zone was added, False if it already\n\
const char* const DataSourceClient_getIterator_doc = "\
get_iterator(name, separate_rrs=False) -> ZoneIterator\n\
......@@ -91,6 +91,37 @@ DataSourceClient_findZone(PyObject* po_self, PyObject* args) {
DataSourceClient_createZone(PyObject* po_self, PyObject* args) {
s_DataSourceClient* const self = static_cast<s_DataSourceClient*>(po_self);
PyObject* name;
if (PyArg_ParseTuple(args, "O!", &name_type, &name)) {
try {
if (self->client->createZone(PyName_ToName(name))) {
} else {
} catch (const DataSourceError& dse) {
PyErr_SetString(getDataSourceException("Error"), dse.what());
return (NULL);
} catch (const isc::NotImplemented& ni) {
return (NULL);
} catch (const std::exception& exc) {
PyErr_SetString(getDataSourceException("Error"), exc.what());
return (NULL);
} catch (...) {
"Unexpected exception");
return (NULL);
} else {
return (NULL);
DataSourceClient_getIterator(PyObject* po_self, PyObject* args) {
s_DataSourceClient* const self = static_cast<s_DataSourceClient*>(po_self);
......@@ -230,6 +261,8 @@ DataSourceClient_getJournalReader(PyObject* po_self, PyObject* args) {
PyMethodDef DataSourceClient_methods[] = {
{ "find_zone", DataSourceClient_findZone, METH_VARARGS,
DataSourceClient_findZone_doc },
{ "create_zone", DataSourceClient_createZone, METH_VARARGS,
DataSourceClient_createZone_doc },
{ "get_iterator",
DataSourceClient_getIterator, METH_VARARGS,
DataSourceClient_getIterator_doc },
......@@ -200,7 +200,7 @@ class DataSrcClient(unittest.TestCase):
# For RRSIGS, we can't add the fake data through the API, so we
# simply pass no rdata at all (which is skipped by the check later)
# Since we passed separate_rrs = True to get_iterator, we get several
# sets of RRSIGs, one for each TTL
add_rrset(expected_rrset_list, name, rrclass,
......@@ -634,6 +634,53 @@ class DataSrcUpdater(unittest.TestCase):
self.assertEqual(None, iterator.get_soa())
self.assertEqual(None, iterator.get_next_rrset())
def test_create_zone_args(self):
dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
self.assertRaises(TypeError, dsc.create_zone)
self.assertRaises(TypeError, dsc.create_zone, 1)
self.assertRaises(TypeError, dsc.create_zone, None)
self.assertRaises(TypeError, dsc.create_zone, "foo.")
self.assertRaises(TypeError, dsc.create_zone,
isc.dns.Name(""), 1)
def test_create_zone(self):
dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
# Note, using here, which should not exist
zone_name = isc.dns.Name("")
self.assertIsNone(dsc.get_updater(zone_name, True))
self.assertRaises(isc.datasrc.Error, dsc.get_iterator, zone_name)
# should exist now, we should be able to get an updater
# and currently it should be empty
self.assertIsNotNone(dsc.get_updater(zone_name, True))
iterator = dsc.get_iterator(zone_name)
self.assertEqual(None, iterator.get_soa())
self.assertEqual(None, iterator.get_next_rrset())
# Trying to create it again should return False
def test_create_zone_locked(self):
zone_name = isc.dns.Name("")
dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
updater = dsc.get_updater(isc.dns.Name(""), True)
# Should fail since db is locked
self.assertRaises(isc.datasrc.Error, dsc.create_zone, zone_name)
# Cancel updater, then create should succeed
updater = None
def test_create_zone_not_implemented(self):
mem_cfg = '{ "type": "memory", "class": "IN", "zones": [] }';
dsc = isc.datasrc.DataSourceClient("memory", mem_cfg)
self.assertRaises(isc.datasrc.NotImplemented, dsc.create_zone,
class JournalWrite(unittest.TestCase):
def setUp(self):
# Make a fresh copy of the writable database with all original content
