Commit 50fd7903 authored by Andrei Pavel's avatar Andrei Pavel

Cassandra update

Replaced unrecommended backticks with $() in cql_version() in bash scripts.

*_execute() and *_execute_script() functions from src/bin/admin/admin-utils.sh now pass the parameters to the underlying backend binary whenever they are given rather than when there are 2 or more.

Corrected cql_version() return error in src/bin/admin/admin-utils.sh.

Removed redundant "USE" from cql_init() in src/bin/admin/kea-admin.in.

Inserted a newline in src/bin/admin/tests/Makefile.am to separate unrelated targets.

Style changes in cql_*_test() functions in src/bin/admin/tests/cql_tests.sh.in.

src/bin/admin/tests/dhcpdb_create_1.0.cql:
    "perfromance" typo
    Added comment headers
    Added index on expire since it is used in WHERE clauses (further performance testing may be required)
    Removed dhcp4_options and dhcp6_options table since they are not required for Cassandra

Added DROP INDEX in src/share/database/scripts/cql/dhcpdb_drop.cql.

Added sql_common.h
Added cql_exchange.h and cql_exchange.cc which mediate communication with Cassandra.
Added cql_lease_mgr.h and cql_lease_mgr.cc

Parameterized reconnect-wait-time, connect-timeout, request-timeout, tcp-keepalive, tcp-nodelay for Cassandra in kea.conf. Changes are in src/lib/dhcpsrv/cql_connection.cc and src/lib/dhcpsrv/parsers/dbaccess_parser.cc.

Reformated x != NULL into !x as specified in the Kea style guidelines

src/lib/dhcpsrv/cql_connection.cc:
    Added range check for port
    Added CqlConnection:setConsistency
    Added CqlConnection::startTransaction  which is a noop
    Added CqlTransaction method implementations.
    Corrected ending brace of namespace declaration, it doesn't need semicolon.

src/lib/dhcpsrv/cql_connection.h:
    Added explicit on CqlConnection constructor. Unlikely that this class will ever be derived, but it's good practice.
    Changed some comments.
    Added CqlTransaction class definition.

src/lib/dhcpsrv/cql_lease_mgr.cc:
    Formatted the entire code.
    Changed data types to cass_ types.

Added some log messages.

Moved structs, enums and typedefs from src/lib/dhcpsrv/lease_mgr.h to src/lib/dhcpsrv/sql_common.h

Added some missing tests in src/lib/dhcpsrv/tests/cql_lease_mgr_unittest.cc
parent 039622a4
......@@ -18,7 +18,7 @@
mysql_execute() {
QUERY=$1
shift
if [ $# -gt 1 ]; then
if [ $# -ge 1 ]; then
mysql -N -B $* -e "${QUERY}"
retcode=$?
else
......@@ -47,7 +47,7 @@ mysql_version() {
pgsql_execute() {
QUERY=$1
shift
if [ $# -gt 0 ]; then
if [ $# -ge 1 ]; then
echo $QUERY | psql --set ON_ERROR_STOP=1 -A -t -h localhost -q $*
retcode=$?
else
......@@ -71,7 +71,7 @@ pgsql_execute() {
pgsql_execute_script() {
file=$1
shift
if [ $# -gt 0 ]; then
if [ $# -ge 1 ]; then
psql --set ON_ERROR_STOP=1 -A -t -h localhost -q -f $file $*
retcode=$?
else
......@@ -90,8 +90,8 @@ pgsql_version() {
cql_execute() {
query=$1
shift
if [ $# -gt 1 ]; then
cqlsh $* -e "$query"
if [ $# -ge 1 ]; then
cqlsh "$@" -e "$query"
retcode=$?
else
cqlsh -u $db_user -p $db_password -k $db_name -e "$query"
......@@ -109,8 +109,8 @@ cql_execute() {
cql_execute_script() {
file=$1
shift
if [ $# -gt 1 ]; then
cqlsh $* -e "$file"
if [ $# -ge 1 ]; then
cqlsh "$@" -f "$file"
retcode=$?
else
cqlsh -u $db_user -p $db_password -k $db_name -f "$file"
......@@ -126,8 +126,9 @@ cql_execute_script() {
}
cql_version() {
version=`cql_execute "SELECT version, minor FROM schema_version" "$@"`
version=`echo "$version" | grep -A 1 "+" | grep -v "+" | tr -d ' ' | cut -d "|" -f 1-2 --output-delimiter="."`
echo $version
return $?
version=$(cql_execute "SELECT version, minor FROM schema_version" "$@")
error=$?
version=$(echo "$version" | grep -A 1 "+" | grep -v "+" | tr -d ' ' | cut -d "|" -f 1-2 --output-delimiter=".")
echo "$version"
return $error
}
......@@ -203,8 +203,8 @@ pgsql_init() {
cql_init() {
printf "Checking if there is a database initialized already... Please ignore errors.\n"
result=`cql_execute "USE $db_name; DESCRIBE tables;"`
if [ "$result"="<empty>" ]; then
result=$(cql_execute "DESCRIBE tables;")
if [ $(echo "$result" | grep "<empty>" | wc -w) -gt 0 ]; then
printf "Creating and initializing tables using script %s...\n" $scripts_dir/cql/dhcpdb_create.cql
cql_execute_script $scripts_dir/cql/dhcpdb_create.cql
else
......@@ -212,8 +212,8 @@ cql_init() {
exit 2
fi
version=`cql_version`
printf "Lease DB version reported after initialization: $version\n"
version=$(cql_version)
printf "Lease DB version reported after initialization: %s\n" "$version"
exit 0
}
......
......@@ -13,6 +13,7 @@ endif
if HAVE_CQL
SHTESTS += cql_tests.sh
endif
noinst_SCRIPTS = $(SHTESTS)
EXTRA_DIST = dhcpdb_create_1.0.mysql
......
......@@ -44,8 +44,7 @@ cql_lease_init_test() {
assert_eq 0 $? "lease4 table check failed, expected exit code: %d, actual: %d"
# Check lease6 table
cql_execute "SELECT address, duid, valid_lifetime, expire, subnet_id, pref_lifetime, lease_type, iaid, prefix_len, fqdn_fwd, fqdn_rev, hostname,\
state FROM lease6;"
cql_execute "SELECT address, duid, valid_lifetime, expire, subnet_id, pref_lifetime, lease_type, iaid, prefix_len, fqdn_fwd, fqdn_rev, hostname, state FROM lease6;"
assert_eq 0 $? "lease6 table check failed, expected exit code: %d, actual: %d"
# Check lease6_types table
......@@ -83,7 +82,7 @@ cql_lease_version_test() {
# Verfiy that kea-admin lease-version returns the correct version.
version=$($keaadmin lease-version cql -u $db_user -p $db_password -n $db_name)
assert_str_eq "1.0" $version "Expected kea-admin to return %s, returned value was %s"
assert_str_eq "2.0" $version "Expected kea-admin to return %s, returned value was %s"
# Wipe the database.
cql_execute_script $db_scripts_dir/cql/dhcpdb_drop.cql
......@@ -151,20 +150,20 @@ cql_lease4_dump_test() {
# 1430694930 corresponds to 2015-04-04 01:15:30
# 1433464245 corresponds to 2015-05-05 02:30:45
# 1436173267 corresponds to 2015-06-06 11:01:07
insert_sql="\
insert into lease4(address, hwaddr, client_id, valid_lifetime, expire, subnet_id,\
insert_cql="\
INSERT INTO lease4(address, hwaddr, client_id, valid_lifetime, expire, subnet_id,\
fqdn_fwd, fqdn_rev, hostname, state)\
values(-1073741302,textAsBlob('20'),textAsBlob('30'),40,1430694930,50,true,true,\
VALUES(-1073741302,textAsBlob('20'),textAsBlob('30'),40,1430694930,50,true,true,\
'one.example.com', 0);\
insert into lease4(address, hwaddr, client_id, valid_lifetime, expire, subnet_id,\
INSERT INTO lease4(address, hwaddr, client_id, valid_lifetime, expire, subnet_id,\
fqdn_fwd, fqdn_rev, hostname, state)\
values(-1073741301,NULL,textAsBlob('123'),40,1433464245,50,true,true,'', 1);\
insert into lease4(address, hwaddr, client_id, valid_lifetime, expire, subnet_id,\
VALUES(-1073741301,NULL,textAsBlob('123'),40,1433464245,50,true,true,'', 1);\
INSERT INTO lease4(address, hwaddr, client_id, valid_lifetime, expire, subnet_id,\
fqdn_fwd, fqdn_rev, hostname, state)\
values(-1073741300,textAsBlob('22'),NULL,40,1436173267,50,true,true,\
VALUES(-1073741300,textAsBlob('22'),NULL,40,1436173267,50,true,true,\
'three.example.com', 2);"
cql_execute "$insert_sql"
cql_execute "$insert_cql"
assert_eq 0 $? "insert into lease4 failed, expected exit code %d, actual %d"
# Dump lease4 to output_file.
......@@ -220,24 +219,24 @@ cql_lease6_dump_test() {
# 1430694930 corresponds to 2015-04-04 01:15:30
# 1433464245 corresponds to 2015-05-05 02:30:45
# 1436173267 corresponds to 2015-06-06 11:01:07
insert_sql="\
insert into lease6(address, duid, valid_lifetime, expire, subnet_id,\
insert_cql="\
INSERT INTO lease6(address, duid, valid_lifetime, expire, subnet_id,\
pref_lifetime, lease_type, iaid, prefix_len, fqdn_fwd, fqdn_rev, hostname,\
hwaddr, hwtype, hwaddr_source, state)\
values('2001:db8::10',textAsBlob('20'),30,1430694930,40,50,1,60,70,true,true,\
VALUES('2001:db8::10',textAsBlob('20'),30,1430694930,40,50,1,60,70,true,true,\
'one.example.com',textAsBlob('80'),90,16,0);\
insert into lease6(address, duid, valid_lifetime, expire, subnet_id,\
INSERT INTO lease6(address, duid, valid_lifetime, expire, subnet_id,\
pref_lifetime, lease_type, iaid, prefix_len, fqdn_fwd, fqdn_rev, hostname,\
hwaddr, hwtype, hwaddr_source, state)\
values('2001:db8::11',NULL,30,1433464245,40,50,1,60,70,true,true,\
VALUES('2001:db8::11',NULL,30,1433464245,40,50,1,60,70,true,true,\
'',textAsBlob('80'),90,1,1);\
insert into lease6(address, duid, valid_lifetime, expire, subnet_id,\
INSERT INTO lease6(address, duid, valid_lifetime, expire, subnet_id,\
pref_lifetime, lease_type, iaid, prefix_len, fqdn_fwd, fqdn_rev, hostname,\
hwaddr, hwtype, hwaddr_source, state)\
values('2001:db8::12',textAsBlob('21'),30,1436173267,40,50,1,60,70,true,true,\
VALUES('2001:db8::12',textAsBlob('21'),30,1436173267,40,50,1,60,70,true,true,\
'three.example.com',textAsBlob('80'),90,4,2);"
cql_execute "$insert_sql"
cql_execute "$insert_cql"
assert_eq 0 $? "insert into lease6 failed, expected exit code %d, actual %d"
# Dump lease4 to output_file.
......
......@@ -38,11 +38,14 @@
-- is initialized to 1.0, then upgraded to 2.0 etc. This may be somewhat
-- sub-optimal, but it ensues consistency with upgrade scripts. (It is much
-- easier to maintain init and upgrade scripts if they look the same).
-- Since initialization is done only once, it's perfromance is not an issue.
-- Since initialization is done only once, it's performance is not an issue.
-- This line starts database initialization to 1.0.
-- Holds the IPv4 leases.
-- -----------------------------------------------------
-- Table `lease4`
-- -----------------------------------------------------
CREATE TABLE lease4 (
address int,
hwaddr blob,
......@@ -61,11 +64,15 @@ CREATE TABLE lease4 (
CREATE INDEX lease4index1 ON lease4 (client_id);
CREATE INDEX lease4index2 ON lease4 (subnet_id);
CREATE INDEX lease4index3 ON lease4 (hwaddr);
CREATE INDEX lease4index4 ON lease4 (state);
CREATE INDEX lease4index4 ON lease4 (expire);
CREATE INDEX lease4index5 ON lease4 (state);
-- Holds the IPv6 leases.
-- N.B. The use of a VARCHAR for the address is temporary for development:
-- it will eventually be replaced by BINARY(16).
-- -----------------------------------------------------
-- Table `lease6`
-- -----------------------------------------------------
CREATE TABLE lease6 (
address varchar,
duid blob,
......@@ -91,13 +98,17 @@ CREATE INDEX lease6index1 ON lease6 (lease_type);
CREATE INDEX lease6index2 ON lease6 (duid);
CREATE INDEX lease6index3 ON lease6 (iaid);
CREATE INDEX lease6index4 ON lease6 (subnet_id);
CREATE INDEX lease6index5 ON lease6 (state);
CREATE INDEX lease6index5 ON lease6 (expire);
CREATE INDEX lease6index6 ON lease6 (state);
-- ... and a definition of lease6 types. This table is a convenience for
-- users of the database - if they want to view the lease table and use the
-- type names, they can join this table with the lease6 table.
-- Make sure those values match Lease6::LeaseType enum (see src/bin/dhcpsrv/
-- lease_mgr.h)
-- -----------------------------------------------------
-- Table `lease6_types`
-- -----------------------------------------------------
CREATE TABLE lease6_types (
lease_type int, -- Lease type code.
name varchar, -- Name of the lease type
......@@ -107,13 +118,15 @@ INSERT INTO lease6_types (lease_type, name) VALUES (0, 'IA_NA'); -- Non-tempor
INSERT INTO lease6_types (lease_type, name) VALUES (1, 'IA_TA'); -- Temporary v6 addresses
INSERT INTO lease6_types (lease_type, name) VALUES (2, 'IA_PD'); -- Prefix delegations
-- Kea keeps track of the hardware/MAC address source, i.e. how the address
-- was obtained. Depending on the technique and your network topology, it may
-- be more or less trustworthy. This table is a convenience for
-- users of the database - if they want to view the lease table and use the
-- type names, they can join this table with the lease6 table. For details,
-- see constants defined in src/lib/dhcp/dhcp/pkt.h for detailed explanation.
-- -----------------------------------------------------
-- Table `lease_hwaddr_source`
-- -----------------------------------------------------
CREATE TABLE lease_hwaddr_source (
hwaddr_source int,
name varchar,
......@@ -141,47 +154,12 @@ INSERT INTO lease_hwaddr_source (hwaddr_source, name) VALUES (32, 'HWADDR_SOURCE
-- Hardware address extracted from docsis options
INSERT INTO lease_hwaddr_source (hwaddr_source, name) VALUES (64, 'HWADDR_SOURCE_DOCSIS_CMTS');
-- -----------------------------------------------------
-- Table `dhcp4_options`
-- -----------------------------------------------------
CREATE TABLE dhcp4_options (
option_id int,
code int,
value blob,
formatted_value varchar,
space varchar,
persistent int,
dhcp_client_class varchar,
dhcp4_subnet_id int,
host_id int,
PRIMARY KEY (option_id)
);
-- Create search indexes for dhcp4_options table
CREATE INDEX dhcp4_optionsindex1 ON dhcp4_options (host_id);
-- -----------------------------------------------------
-- Table `dhcp6_options`
-- -----------------------------------------------------
CREATE TABLE dhcp6_options (
option_id int,
code int,
value blob,
formatted_value varchar,
space varchar,
persistent int,
dhcp_client_class varchar,
dhcp6_subnet_id int,
host_id int,
PRIMARY KEY (option_id)
);
-- Create search indexes for dhcp6_options table
CREATE INDEX dhcp6_optionsindex1 ON dhcp6_options (host_id);
-- Create table holding mapping of the lease states to their names.
-- This is not used in queries from the DHCP server but rather in
-- direct queries from the lease database management tools.
-- -----------------------------------------------------
-- Table `lease_state`
-- -----------------------------------------------------
CREATE TABLE lease_state (
state int,
name varchar,
......@@ -197,11 +175,18 @@ INSERT INTO lease_state (state, name) VALUES (2, 'expired-reclaimed');
-- This table is only modified during schema upgrades. For historical reasons
-- (related to the names of the columns in the BIND 10 DNS database file), the
-- first column is called "version" and not "major".
-- Note: This MUST be kept in step with src/share/database/scripts/cassandra/dhcpdb_create.cql,
-- which defines the schema for the unit tests.
-- Note: This MUST be kept synchronized with
-- src/share/database/scripts/cql/dhcpdb_create.cql which defines the schema for
-- the unit tests.
-- -----------------------------------------------------
-- Table `schema_version`
-- -----------------------------------------------------
CREATE TABLE schema_version (
version int,
minor int,
PRIMARY KEY (version)
);
INSERT INTO schema_version (version, minor) VALUES (1, 0);
-- This line concludes database initalization to version 1.0.
......@@ -124,6 +124,7 @@ libkea_dhcpsrv_la_SOURCES += logging.cc logging.h
libkea_dhcpsrv_la_SOURCES += logging_info.cc logging_info.h
libkea_dhcpsrv_la_SOURCES += memfile_lease_mgr.cc memfile_lease_mgr.h
libkea_dhcpsrv_la_SOURCES += memfile_lease_storage.h
libkea_dhcpsrv_la_SOURCES += sql_common.h
if HAVE_MYSQL
libkea_dhcpsrv_la_SOURCES += mysql_lease_mgr.cc mysql_lease_mgr.h
......@@ -139,10 +140,13 @@ libkea_dhcpsrv_la_SOURCES += pgsql_exchange.cc pgsql_exchange.h
libkea_dhcpsrv_la_SOURCES += pgsql_host_data_source.cc pgsql_host_data_source.h
libkea_dhcpsrv_la_SOURCES += pgsql_lease_mgr.cc pgsql_lease_mgr.h
endif
if HAVE_CQL
libkea_dhcpsrv_la_SOURCES += cql_lease_mgr.cc cql_lease_mgr.h
libkea_dhcpsrv_la_SOURCES += cql_connection.cc cql_connection.h
libkea_dhcpsrv_la_SOURCES += cql_exchange.cc cql_exchange.h
libkea_dhcpsrv_la_SOURCES += cql_lease_mgr.cc cql_lease_mgr.h
endif
libkea_dhcpsrv_la_SOURCES += pool.cc pool.h
libkea_dhcpsrv_la_SOURCES += srv_config.cc srv_config.h
libkea_dhcpsrv_la_SOURCES += subnet.cc subnet.h
......@@ -236,5 +240,3 @@ libkea_dhcpsrv_include_HEADERS = \
install-data-local:
$(mkinstalldirs) $(DESTDIR)$(dhcp_data_dir)
......@@ -14,22 +14,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <config.h>
#include <dhcpsrv/cql_connection.h>
#include <string>
#include <dhcpsrv/cql_exchange.h>
#include <boost/shared_ptr.hpp>
using namespace std;
namespace isc {
namespace dhcp {
CqlConnection::CqlConnection(const ParameterMap& parameters) :
DatabaseConnection(parameters), cluster_(NULL), session_(NULL),
tagged_statements_(NULL) {
CqlConnection::CqlConnection(const ParameterMap& parameters)
: DatabaseConnection(parameters), cluster_(NULL), session_(NULL),
force_consistency_(true), consistency_(CASS_CONSISTENCY_QUORUM),
tagged_statements_(NULL) {
}
CqlConnection::~CqlConnection() {
// Free up the prepared statements, ignoring errors.
// Session and connection resources are deallocated.
// Free up the prepared statements, ignoring errors. Session and connection
// resources are deallocated.
CassError rc = CASS_OK;
std::string error;
......@@ -43,7 +48,8 @@ CqlConnection::~CqlConnection() {
if (session_) {
CassFuture* close_future = cass_session_close(session_);
cass_future_wait(close_future);
checkStatementError(error, close_future, "could not close connection to DB");
checkStatementError(error, close_future, "could not close connection to"
" DB");
rc = cass_future_error_code(close_future);
cass_future_free(close_future);
cass_session_free(session_);
......@@ -55,10 +61,9 @@ CqlConnection::~CqlConnection() {
cluster_ = NULL;
}
// We're closing the connection anyway. Let's not throw at this
// stage
if (rc != CASS_OK) {
isc_throw(DbOpenError, error);
// We're closing the connection anyway. Let's not throw at this stage.
LOG_ERROR(dhcpsrv_logger, DHCPSRV_CQL_DEALLOC_ERROR).arg(error);
}
}
......@@ -67,25 +72,25 @@ CqlConnection::openDatabase() {
CassError rc;
// Set up the values of the parameters
const char* contact_points = "127.0.0.1";
string scontact_points;
std::string scontact_points;
try {
scontact_points = getParameter("contact_points");
contact_points = scontact_points.c_str();
} catch (...) {
// No host. Fine, we'll use "localhost".
// No host. Fine, we'll use "127.0.0.1".
}
const char* port = NULL;
string sport;
std::string sport;
try {
sport = getParameter("port");
port = sport.c_str();
} catch (...) {
// No port. Fine, we'll use "default".
// No port. Fine, we'll use the default "9042".
}
const char* user = NULL;
string suser;
std::string suser;
try {
suser = getParameter("user");
user = suser.c_str();
......@@ -94,7 +99,7 @@ CqlConnection::openDatabase() {
}
const char* password = NULL;
string spassword;
std::string spassword;
try {
spassword = getParameter("password");
password = spassword.c_str();
......@@ -103,36 +108,170 @@ CqlConnection::openDatabase() {
}
const char* keyspace = "keatest";
string skeyspace;
std::string skeyspace;
try {
skeyspace = getParameter("keyspace");
keyspace = skeyspace.c_str();
} catch (...) {
// No keyspace name. Fine, we'll use default "keatest".
// No keyspace name. Fine, we'll use "keatest".
}
const char* reconnect_wait_time = NULL;
std::string sreconnect_wait_time;
try {
sreconnect_wait_time = getParameter("reconnect-wait-time");
reconnect_wait_time = sreconnect_wait_time.c_str();
} catch (...) {
// No reconnect wait time. Fine, we'll use the default "2000".
}
const char* connect_timeout = NULL;
std::string sconnect_timeout;
try {
sconnect_timeout = getParameter("connect-timeout");
connect_timeout = sconnect_timeout.c_str();
} catch (...) {
// No connect timeout. Fine, we'll use the default "5000".
}
const char* request_timeout = NULL;
std::string srequest_timeout;
try {
srequest_timeout = getParameter("request-timeout");
request_timeout = srequest_timeout.c_str();
} catch (...) {
// No request timeout. Fine, we'll use the default "12000".
}
const char* tcp_keepalive = NULL;
std::string stcp_keepalive;
try {
stcp_keepalive = getParameter("tcp-keepalive");
tcp_keepalive = stcp_keepalive.c_str();
} catch (...) {
// No tcp-keepalive. Fine, we'll not use TCP keepalive.
}
std::string stcp_nodelay;
try {
stcp_nodelay = getParameter("tcp-nodelay");
} catch (...) {
// No tcp-nodelay. Fine, we'll use the default false.
}
cluster_ = cass_cluster_new();
cass_cluster_set_contact_points(cluster_, contact_points);
if (user != NULL && password != NULL) {
if (user && password) {
cass_cluster_set_credentials(cluster_, user, password);
}
if (port != NULL) {
if (port) {
int port_number;
try {
port_number = boost::lexical_cast<int>(port);
} catch (const std::exception& ex) {
isc_throw(DbOperationError, "Invalid int data: " << port
<< " : " << ex.what());
if (port_number < 1 || port_number > 65535) {
isc_throw(
DbOperationError,
"Port outside of range, expected 1-65535, instead got "
<< port);
}
} catch (const boost::bad_lexical_cast& ex) {
isc_throw(DbOperationError,
"Invalid port, castable to int expected, instead got \""
<< port << "\", " << ex.what());
}
cass_cluster_set_port(cluster_, port_number);
}
if (reconnect_wait_time) {
int reconnect_wait_time_number;
try {
reconnect_wait_time_number =
boost::lexical_cast<int>(reconnect_wait_time);
if (reconnect_wait_time_number < 0) {
isc_throw(DbOperationError,
"Invalid reconnect wait time, positive number "
"expected, instead got "
<< reconnect_wait_time);
}
} catch (const boost::bad_lexical_cast& ex) {
isc_throw(DbOperationError, "Invalid reconnect wait time, castable "
"to int expected, instead got \""
<< reconnect_wait_time << "\", "
<< ex.what());
}
cass_cluster_set_reconnect_wait_time(cluster_,
reconnect_wait_time_number);
}
if (connect_timeout) {
int connect_timeout_number;
try {
connect_timeout_number = boost::lexical_cast<int>(connect_timeout);
if (connect_timeout_number < 0) {
isc_throw(DbOperationError,
"Invalid connect timeout, positive number expected, "
"instead got "
<< connect_timeout);
}
} catch (const boost::bad_lexical_cast& ex) {
isc_throw(DbOperationError, "Invalid connect timeout, castable to "
"int expected, instead got \""
<< connect_timeout << "\", "
<< ex.what());
}
cass_cluster_set_connect_timeout(cluster_, connect_timeout_number);
}
if (request_timeout) {
int request_timeout_number;
try {
request_timeout_number = boost::lexical_cast<int>(request_timeout);
if (request_timeout_number < 0) {
isc_throw(DbOperationError,
"Invalid request timeout, positive number expected, "
"instead got "
<< request_timeout);
}
} catch (const boost::bad_lexical_cast& ex) {
isc_throw(DbOperationError, "Invalid request timeout, castable to "
"int expected, instead got \""
<< request_timeout << "\", "
<< ex.what());
}
cass_cluster_set_request_timeout(cluster_, request_timeout_number);
}
if (tcp_keepalive) {
int tcp_keepalive_number;
try {
tcp_keepalive_number = boost::lexical_cast<int>(tcp_keepalive);
if (tcp_keepalive_number < 0) {
isc_throw(DbOperationError,
"Invalid TCP keepalive, positive number expected, "
"instead got "
<< tcp_keepalive);
}
} catch (const boost::bad_lexical_cast& ex) {
isc_throw(
DbOperationError,
"Invalid TCP keepalive, castable to int expected, instead got "
"\"" << tcp_keepalive
<< "\", " << ex.what());
}
cass_cluster_set_tcp_keepalive(cluster_, cass_true,
tcp_keepalive_number);
}
if (stcp_nodelay == "true") {
cass_cluster_set_tcp_nodelay(cluster_, cass_true);
}
session_ = cass_session_new();
CassFuture* connect_future = cass_session_connect_keyspace(session_,
cluster_, keyspace);
CassFuture* connect_future =
cass_session_connect_keyspace(session_, cluster_, keyspace);
cass_future_wait(connect_future);
std::string error;
checkStatementError(error, connect_future, "could not connect to DB");
......@@ -148,11 +287,12 @@ CqlConnection::openDatabase() {
}
void
CqlConnection::prepareStatements(CqlTaggedStatement *statements) {
CqlConnection::prepareStatements(CqlTaggedStatement* statements) {
CassError rc = CASS_OK;
uint32_t size = 0;
tagged_statements_ = statements;
for (; tagged_statements_[size].params_; size++);
for (; tagged_statements_[size].params_; size++) {
}
statements_.resize(size);
for (uint32_t i = 0; i < size; i++) {
const char* query = tagged_statements_[i].text_;
......@@ -174,6 +314,19 @@ CqlConnection::prepareStatements(CqlTaggedStatement *statements) {
}
}
void
CqlConnection::setConsistency(bool force, CassConsistency consistency) {
force_consistency_ = force;
consistency_ = consistency;
}
void
CqlConnection::startTransaction() {
LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL,
DHCPSRV_CQL_BEGIN_TRANSACTION);
// No-op
}
void
CqlConnection::commit() {
LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL, DHCPSRV_CQL_COMMIT);
......@@ -185,27 +338,31 @@ CqlConnection::rollback() {
}
void
CqlConnection::checkStatementError(std::string& error, CassFuture* future,
uint32_t stindex, const char* what) const {
CqlConnection::checkStatementError(std::string& error,
CassFuture* future,
uint32_t stindex,
const char* what) const {
CassError rc;
const char* errorMessage;
size_t errorMessageSize;
std::stringstream stream;
stream << "no error for: " << tagged_statements_[stindex].name_;
stream << "no error for statement " << tagged_statements_[stindex].name_;
rc = cass_future_error_code(future);
cass_future_error_message(future, &errorMessage, &errorMessageSize);
if (rc != CASS_OK) {
stream.str(std::string());
stream << what << " for: " << tagged_statements_[stindex].name_
<< " reason: " << errorMessage << " error code: " << rc;
stream << what << " for statement " << tagged_statements_[stindex].name_
<< ". Future error: " << errorMessage
<< ". Error description: " << cass_error_desc(rc);
}
error = stream.str();
}
void
CqlConnection::checkStatementError(std::string& error, CassFuture* future,
CqlConnection::checkStatementError(std::string& error,
CassFuture* future,
const char* what) const {
CassError rc;
const char* errorMessage;
......@@ -218,10 +375,29 @@ CqlConnection::checkStatementError(std::string& error, CassFuture* future,
if (rc != CASS_OK) {
stream.str(std::string());
stream << what << " reason: " << errorMessage << " error code: " << rc;
stream << what << ". Future error: " << errorMessage
<< ". Error description: " << cass_error_desc(rc);
}
error = stream.str();
}
}; // end of isc::dhcp namespace
}; // end of isc namespace
CqlTransaction::CqlTransaction(CqlConnection& conn)
: conn_(conn), committed_(false) {
conn_.startTransaction();
}
CqlTransaction::~CqlTransaction() {
// Rollback if commit() wasn't explicitly called.
if (!committed_) {
conn_.rollback();
}
}
void
CqlTransaction::commit() {
conn_.commit();
committed_ = true;
}
} // namespace dhcp
} // namespace isc