Commit 381ad791 authored by Andrei Pavel's avatar Andrei Pavel

Merge branch 'cassandra-host-data-source' into cassandra-host-data-source-stress-test

parents d552aa37 765c0d79
......@@ -82,7 +82,7 @@ cql_lease_version_test() {
# Verify that kea-admin lease-version returns the correct version.
version=$($keaadmin lease-version cql -u $db_user -p $db_password -n $db_name)
assert_str_eq "2.0" $version "Expected kea-admin to return %s, returned value was %s"
assert_str_eq "1.0" $version "Expected kea-admin to return %s, returned value was %s"
# Wipe the database.
cql_execute_script $db_scripts_dir/cql/dhcpdb_drop.cql
......
-- Copyright (C) 2015 - 2016 Deutsche Telekom AG.
-- Copyright (C) 2015-2017 Deutsche Telekom AG.
-- Author: Razvan Becheriu <razvan.becheriu@qualitance.com>
......@@ -46,7 +46,7 @@
-- -----------------------------------------------------
-- Table `lease4`
-- -----------------------------------------------------
CREATE TABLE lease4 (
CREATE TABLE IF NOT EXISTS lease4 (
address int,
hwaddr blob,
client_id blob,
......@@ -57,15 +57,15 @@ CREATE TABLE lease4 (
fqdn_rev boolean,
hostname varchar,
state int,
PRIMARY KEY (address)
PRIMARY KEY ((address))
);
-- Create search indexes for lease4 table
CREATE INDEX lease4index1 ON lease4 (client_id);
CREATE INDEX lease4index2 ON lease4 (subnet_id);
CREATE INDEX lease4index3 ON lease4 (hwaddr);
CREATE INDEX lease4index4 ON lease4 (expire);
CREATE INDEX lease4index5 ON lease4 (state);
CREATE INDEX IF NOT EXISTS lease4index1 ON lease4 (client_id);
CREATE INDEX IF NOT EXISTS lease4index2 ON lease4 (subnet_id);
CREATE INDEX IF NOT EXISTS lease4index3 ON lease4 (hwaddr);
CREATE INDEX IF NOT EXISTS lease4index4 ON lease4 (expire);
CREATE INDEX IF NOT EXISTS lease4index5 ON lease4 (state);
-- Holds the IPv6 leases.
-- N.B. The use of a VARCHAR for the address is temporary for development:
......@@ -73,15 +73,15 @@ CREATE INDEX lease4index5 ON lease4 (state);
-- -----------------------------------------------------
-- Table `lease6`
-- -----------------------------------------------------
CREATE TABLE lease6 (
CREATE TABLE IF NOT EXISTS lease6 (
address varchar,
duid blob,
valid_lifetime bigint,
expire bigint,
subnet_id int,
pref_lifetime bigint,
lease_type int,
duid blob,
iaid int,
lease_type int,
prefix_len int,
fqdn_fwd boolean,
fqdn_rev boolean,
......@@ -90,16 +90,16 @@ CREATE TABLE lease6 (
hwtype int,
hwaddr_source int,
state int,
PRIMARY KEY (address)
PRIMARY KEY ((address))
);
-- Create search indexes for lease6 table
CREATE INDEX lease6index1 ON lease6 (lease_type);
CREATE INDEX lease6index2 ON lease6 (duid);
CREATE INDEX lease6index3 ON lease6 (iaid);
CREATE INDEX lease6index4 ON lease6 (subnet_id);
CREATE INDEX lease6index5 ON lease6 (expire);
CREATE INDEX lease6index6 ON lease6 (state);
CREATE INDEX IF NOT EXISTS lease6index1 ON lease6 (duid);
CREATE INDEX IF NOT EXISTS lease6index2 ON lease6 (iaid);
CREATE INDEX IF NOT EXISTS lease6index3 ON lease6 (lease_type);
CREATE INDEX IF NOT EXISTS lease6index4 ON lease6 (subnet_id);
CREATE INDEX IF NOT EXISTS lease6index5 ON lease6 (expire);
CREATE INDEX IF NOT EXISTS lease6index6 ON lease6 (state);
-- ... and a definition of lease6 types. This table is a convenience for
-- users of the database - if they want to view the lease table and use the
......@@ -109,10 +109,10 @@ CREATE INDEX lease6index6 ON lease6 (state);
-- -----------------------------------------------------
-- Table `lease6_types`
-- -----------------------------------------------------
CREATE TABLE lease6_types (
CREATE TABLE IF NOT EXISTS lease6_types (
lease_type int, -- Lease type code.
name varchar, -- Name of the lease type
PRIMARY KEY (lease_type)
PRIMARY KEY ((lease_type))
);
INSERT INTO lease6_types (lease_type, name) VALUES (0, 'IA_NA'); -- Non-temporary v6 addresses
INSERT INTO lease6_types (lease_type, name) VALUES (1, 'IA_TA'); -- Temporary v6 addresses
......@@ -127,10 +127,10 @@ INSERT INTO lease6_types (lease_type, name) VALUES (2, 'IA_PD'); -- Prefix del
-- -----------------------------------------------------
-- Table `lease_hwaddr_source`
-- -----------------------------------------------------
CREATE TABLE lease_hwaddr_source (
CREATE TABLE IF NOT EXISTS lease_hwaddr_source (
hwaddr_source int,
name varchar,
PRIMARY KEY (hwaddr_source)
PRIMARY KEY ((hwaddr_source))
);
-- Hardware address obtained from raw sockets
......@@ -160,10 +160,10 @@ INSERT INTO lease_hwaddr_source (hwaddr_source, name) VALUES (64, 'HWADDR_SOURCE
-- -----------------------------------------------------
-- Table `lease_state`
-- -----------------------------------------------------
CREATE TABLE lease_state (
CREATE TABLE IF NOT EXISTS lease_state (
state int,
name varchar,
PRIMARY KEY (state)
PRIMARY KEY ((state))
);
-- Insert currently defined state names.
......@@ -180,12 +180,13 @@ INSERT INTO lease_state (state, name) VALUES (2, 'expired-reclaimed');
-- -----------------------------------------------------
-- Table `schema_version`
-- -----------------------------------------------------
CREATE TABLE schema_version (
CREATE TABLE IF NOT EXISTS schema_version (
version int,
minor int,
PRIMARY KEY (version)
PRIMARY KEY ((version))
);
INSERT INTO schema_version (version, minor) VALUES (1, 0);
-- This line concludes database initalization to version 1.0.
-- This line concludes database initialization to version 1.0.
// Copyright (C) 2015 - 2016 Deutsche Telekom AG.
// Copyright (C) 2015-2017 Deutsche Telekom AG.
//
// Author: Razvan Becheriu <razvan.becheriu@qualitance.com>
// Authors: Razvan Becheriu <razvan.becheriu@qualitance.com>
// Andrei Pavel <andrei.pavel@qualitance.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
......@@ -18,18 +19,18 @@
#include <dhcpsrv/cql_connection.h>
#include <dhcpsrv/cql_exchange.h>
#include <dhcpsrv/db_exceptions.h>
#include <dhcpsrv/dhcpsrv_log.h>
#include <boost/shared_ptr.hpp>
using namespace std;
#include <memory> // for std::unique_ptr
namespace isc {
namespace dhcp {
CqlConnection::CqlConnection(const ParameterMap& parameters)
: DatabaseConnection(parameters), cluster_(NULL), session_(NULL),
force_consistency_(true), consistency_(CASS_CONSISTENCY_QUORUM),
tagged_statements_(NULL) {
: DatabaseConnection(parameters), statements_(), cluster_(NULL),
session_(NULL), consistency_(CASS_CONSISTENCY_QUORUM), schema_meta_(NULL),
keyspace_meta_(NULL), force_consistency_(true) {
}
CqlConnection::~CqlConnection() {
......@@ -38,18 +39,21 @@ CqlConnection::~CqlConnection() {
CassError rc = CASS_OK;
std::string error;
for (int i = 0; i < statements_.size(); i++) {
if (statements_[i]) {
cass_prepared_free(statements_[i]);
for (StatementMapEntry s : statements_) {
// typeid(s.second.first) is CassPrepared*
CqlTaggedStatement statement = s.second;
if (statement.prepared_statement_) {
cass_prepared_free(statement.prepared_statement_);
}
statements_[i] = NULL;
}
if (session_) {
cass_schema_meta_free(schema_meta_);
CassFuture* close_future = cass_session_close(session_);
cass_future_wait(close_future);
checkStatementError(error, close_future, "could not close connection to"
" DB");
error = checkFutureError(
"CqlConnection::~CqlConnection(): cass_sesssion_close() != CASS_OK",
close_future);
rc = cass_future_error_code(close_future);
cass_future_free(close_future);
cass_session_free(session_);
......@@ -167,98 +171,106 @@ CqlConnection::openDatabase() {
}
if (port) {
int port_number;
int32_t port_number;
try {
port_number = boost::lexical_cast<int>(port);
port_number = boost::lexical_cast<int32_t>(port);
if (port_number < 1 || port_number > 65535) {
isc_throw(
DbOperationError,
"Port outside of range, expected 1-65535, instead got "
<< port);
isc_throw(DbOperationError,
"CqlConnection::openDatabase(): "
"port outside of range, expected "
"1-65535, instead got "
<< port);
}
} catch (const boost::bad_lexical_cast& ex) {
isc_throw(DbOperationError,
"Invalid port, castable to int expected, instead got \""
<< port << "\", " << ex.what());
"CqlConnection::openDatabase(): invalid "
"port, expected castable to int, instead got "
"\"" << port
<< "\", " << ex.what());
}
cass_cluster_set_port(cluster_, port_number);
}
if (reconnect_wait_time) {
int reconnect_wait_time_number;
int32_t reconnect_wait_time_number;
try {
reconnect_wait_time_number =
boost::lexical_cast<int>(reconnect_wait_time);
boost::lexical_cast<int32_t>(reconnect_wait_time);
if (reconnect_wait_time_number < 0) {
isc_throw(DbOperationError,
"Invalid reconnect wait time, positive number "
"expected, instead got "
"CqlConnection::openDatabase(): invalid reconnect "
"wait time, expected positive number, instead got "
<< reconnect_wait_time);
}
} catch (const boost::bad_lexical_cast& ex) {
isc_throw(DbOperationError, "Invalid reconnect wait time, castable "
"to int expected, instead got \""
<< reconnect_wait_time << "\", "
<< ex.what());
isc_throw(DbOperationError,
"CqlConnection::openDatabase(): "
"invalid reconnect wait time, expected "
"castable to int, instead got \""
<< reconnect_wait_time << "\", " << ex.what());
}
cass_cluster_set_reconnect_wait_time(cluster_,
reconnect_wait_time_number);
}
if (connect_timeout) {
int connect_timeout_number;
int32_t connect_timeout_number;
try {
connect_timeout_number = boost::lexical_cast<int>(connect_timeout);
connect_timeout_number =
boost::lexical_cast<int32_t>(connect_timeout);
if (connect_timeout_number < 0) {
isc_throw(DbOperationError,
"Invalid connect timeout, positive number expected, "
"instead got "
"CqlConnection::openDatabase(): "
"invalid connect timeout, expected "
"positive number, instead got "
<< connect_timeout);
}
} catch (const boost::bad_lexical_cast& ex) {
isc_throw(DbOperationError, "Invalid connect timeout, castable to "
"int expected, instead got \""
<< connect_timeout << "\", "
<< ex.what());
isc_throw(DbOperationError,
"CqlConnection::openDatabase(): invalid connect timeout, "
"expected castable to int, instead got \""
<< connect_timeout << "\", " << ex.what());
}
cass_cluster_set_connect_timeout(cluster_, connect_timeout_number);
}
if (request_timeout) {
int request_timeout_number;
int32_t request_timeout_number;
try {
request_timeout_number = boost::lexical_cast<int>(request_timeout);
request_timeout_number =
boost::lexical_cast<int32_t>(request_timeout);
if (request_timeout_number < 0) {
isc_throw(DbOperationError,
"Invalid request timeout, positive number expected, "
"instead got "
"CqlConnection::openDatabase(): "
"invalid request timeout, expected "
"positive number, instead got "
<< request_timeout);
}
} catch (const boost::bad_lexical_cast& ex) {
isc_throw(DbOperationError, "Invalid request timeout, castable to "
"int expected, instead got \""
<< request_timeout << "\", "
<< ex.what());
isc_throw(DbOperationError,
"CqlConnection::openDatabase(): invalid request timeout, "
"expected castable to int, instead got \""
<< request_timeout << "\", " << ex.what());
}
cass_cluster_set_request_timeout(cluster_, request_timeout_number);
}
if (tcp_keepalive) {
int tcp_keepalive_number;
int32_t tcp_keepalive_number;
try {
tcp_keepalive_number = boost::lexical_cast<int>(tcp_keepalive);
tcp_keepalive_number = boost::lexical_cast<int32_t>(tcp_keepalive);
if (tcp_keepalive_number < 0) {
isc_throw(DbOperationError,
"Invalid TCP keepalive, positive number expected, "
"instead got "
"CqlConnection::openDatabase(): "
"invalid TCP keepalive, expected "
"positive number, instead got "
<< tcp_keepalive);
}
} catch (const boost::bad_lexical_cast& ex) {
isc_throw(
DbOperationError,
"Invalid TCP keepalive, castable to int expected, instead got "
"\"" << tcp_keepalive
<< "\", " << ex.what());
isc_throw(DbOperationError,
"CqlConnection::openDatabase(): invalid TCP keepalive, "
"expected castable to int, instead got \""
<< tcp_keepalive << "\", " << ex.what());
}
cass_cluster_set_tcp_keepalive(cluster_, cass_true,
tcp_keepalive_number);
......@@ -273,8 +285,10 @@ CqlConnection::openDatabase() {
CassFuture* connect_future =
cass_session_connect_keyspace(session_, cluster_, keyspace);
cass_future_wait(connect_future);
std::string error;
checkStatementError(error, connect_future, "could not connect to DB");
const std::string error =
checkFutureError("CqlConnection::openDatabase(): "
"cass_session_connect_keyspace() != CASS_OK",
connect_future);
rc = cass_future_error_code(connect_future);
cass_future_free(connect_future);
if (rc != CASS_OK) {
......@@ -284,32 +298,43 @@ CqlConnection::openDatabase() {
cluster_ = NULL;
isc_throw(DbOpenError, error);
}
// Get keyspace meta.
schema_meta_ = cass_session_get_schema_meta(session_);
keyspace_meta_ = cass_schema_meta_keyspace_by_name(schema_meta_, keyspace);
if (!keyspace_meta_) {
isc_throw(DbOpenError, "CqlConnection::openDatabase(): "
"!cass_schema_meta_keyspace_by_name()");
}
}
void
CqlConnection::prepareStatements(CqlTaggedStatement* statements) {
CqlConnection::prepareStatements(StatementMap& statements) {
CassError rc = CASS_OK;
uint32_t size = 0;
tagged_statements_ = statements;
for (; tagged_statements_[size].params_; size++) {
}
statements_.resize(size);
for (uint32_t i = 0; i < size; i++) {
const char* query = tagged_statements_[i].text_;
for (StatementMapEntry it : statements) {
CqlTaggedStatement& tagged_statement = it.second;
if (statements_.find(tagged_statement.name_) != statements_.end()) {
isc_throw(DbOperationError,
"CqlConnection::prepareStatements(): "
"duplicate statement with name "
<< tagged_statement.name_);
}
CassFuture* future = cass_session_prepare(session_, query);
CassFuture* future =
cass_session_prepare(session_, tagged_statement.text_);
cass_future_wait(future);
std::string error;
checkStatementError(error, future, i, "could not prepare statement");
const std::string error =
checkFutureError("CqlConnection::prepareStatements():"
" cass_session_prepare() != CASS_OK",
future, tagged_statement.name_);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
cass_future_free(future);
statements_[i] = NULL;
isc_throw(DbOperationError, error);
} else {
statements_[i] = cass_future_get_prepared(future);
}
tagged_statement.prepared_statement_ = cass_future_get_prepared(future);
statements_.insert(it);
cass_future_free(future);
}
}
......@@ -323,80 +348,47 @@ CqlConnection::setConsistency(bool force, CassConsistency consistency) {
void
CqlConnection::startTransaction() {
LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL,
DHCPSRV_CQL_BEGIN_TRANSACTION);
// No-op
DHCPSRV_CQL_CONNECTION_BEGIN_TRANSACTION);
}
void
CqlConnection::commit() {
LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL, DHCPSRV_CQL_COMMIT);
LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL,
DHCPSRV_CQL_CONNECTION_COMMIT);
}
void
CqlConnection::rollback() {
LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL, DHCPSRV_CQL_ROLLBACK);
LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL,
DHCPSRV_CQL_CONNECTION_ROLLBACK);
}
void
CqlConnection::checkStatementError(std::string& error,
CassFuture* future,
uint32_t stindex,
const char* what) const {
CassError rc;
const char* errorMessage;
size_t errorMessageSize;
std::stringstream stream;
stream << "no error for statement " << tagged_statements_[stindex].name_;
rc = cass_future_error_code(future);
cass_future_error_message(future, &errorMessage, &errorMessageSize);
if (rc != CASS_OK) {
stream.str(std::string());
stream << what << " for statement " << tagged_statements_[stindex].name_
<< ". Future error: " << errorMessage
<< ". Error description: " << cass_error_desc(rc);
}
error = stream.str();
}
const std::string
CqlConnection::checkFutureError(const std::string& what,
CassFuture* future,
StatementTag statement_tag /* = NULL */) {
CassError cass_error = cass_future_error_code(future);
const char* error_message;
size_t error_message_size;
cass_future_error_message(future, &error_message, &error_message_size);
void
CqlConnection::checkStatementError(std::string& error,
CassFuture* future,
const char* what) const {
CassError rc;
const char* errorMessage;
size_t errorMessageSize;
std::stringstream stream;
stream << "no error";
rc = cass_future_error_code(future);
cass_future_error_message(future, &errorMessage, &errorMessageSize);
if (rc != CASS_OK) {
stream.str(std::string());
stream << what << ". Future error: " << errorMessage
<< ". Error description: " << cass_error_desc(rc);
if (statement_tag && std::strlen(statement_tag) > 0) {
// future is from cass_session_execute() call.
stream << "Statement ";
stream << statement_tag;
} else {
// future is from cass_session_*() call.
stream << "Session action ";
}
error = stream.str();
}
CqlTransaction::CqlTransaction(CqlConnection& conn)
: conn_(conn), committed_(false) {
conn_.startTransaction();
}
CqlTransaction::~CqlTransaction() {
// Rollback if commit() wasn't explicitly called.
if (!committed_) {
conn_.rollback();
if (cass_error == CASS_OK) {
stream << " executed succesfully.";
} else {
stream << " failed, Kea error: " << what
<< ", Cassandra error code: " << cass_error_desc(cass_error)
<< ", Cassandra future error: " << error_message;
}
}
void
CqlTransaction::commit() {
conn_.commit();
committed_ = true;
return stream.str();
}
} // namespace dhcp
......
// Copyright (C) 2015 - 2016 Deutsche Telekom AG.
// Copyright (C) 2015-2017 Deutsche Telekom AG.
//
// Author: Razvan Becheriu <razvan.becheriu@qualitance.com>
// Authors: Razvan Becheriu <razvan.becheriu@qualitance.com>
// Andrei Pavel <andrei.pavel@qualitance.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
......@@ -18,46 +19,101 @@
#define CQL_CONNECTION_H
#include <dhcpsrv/database_connection.h>
#include <dhcpsrv/dhcpsrv_log.h>
#include <cassandra.h>
#include <boost/scoped_ptr.hpp>
#include <inttypes.h>
#include <cstring>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
namespace isc {
namespace dhcp {
/// @brief Defines a single statement
/// @brief Pair containing major and minor versions
typedef std::pair<uint32_t, uint32_t> VersionPair;
/// @brief Statement index representing the statement name
typedef char const* const StatementTag;
/// @brief Define CQL backend version: 2.3
/// @{
constexpr uint32_t CQL_DRIVER_VERSION_MAJOR = CASS_VERSION_MAJOR;
constexpr uint32_t CQL_DRIVER_VERSION_MINOR = CASS_VERSION_MINOR;
/// @}
/// Define CQL schema version: 2.0
/// @{
constexpr uint32_t CQL_SCHEMA_VERSION_MAJOR = 2u;
constexpr uint32_t CQL_SCHEMA_VERSION_MINOR = 0u;
/// @}
/// @brief Defines a single statement or query
///
/// @param params_ parameter names
/// @param name_ short description of the query
/// @param text_ text representation of the actual query
/// @param prepared_statement_ internal Cassandra object representing the
/// prepared statement
/// @param is_raw_statement_ shows if statement should be executed rawly or with
/// binds
struct CqlTaggedStatement {
const char** params_;
const char* name_;
const char* text_;
StatementTag name_;
char const* const text_;
const CassPrepared* prepared_statement_;
bool is_raw_statement_;
/// @brief Constructor
CqlTaggedStatement(StatementTag name, char const* const text)
: name_(name), text_(text), prepared_statement_(NULL),
is_raw_statement_(false) {
}
/// @brief Constructor
CqlTaggedStatement(StatementTag name,
char const* const text,
bool const& is_raw_statement)
: name_(name), text_(text), prepared_statement_(NULL),
is_raw_statement_(is_raw_statement) {
}
};
/// @brief Hash function for StatementMap keys
///
/// Delegates to std::hash<std::string>.
struct StatementTagHash {
size_t operator()(StatementTag const& key) const {
return std::hash<std::string>{}(std::string(key));
}
};
/// @brief Equality function for StatementMap keys
struct StatementTagEqual {
bool operator()(StatementTag const& lhs, StatementTag const& rhs) const {
return std::strcmp(lhs, rhs) == 0;
}
};
// Define CQL backend version: 2.3
const uint32_t CQL_DRIVER_VERSION_MAJOR = CASS_VERSION_MAJOR;
const uint32_t CQL_DRIVER_VERSION_MINOR = CASS_VERSION_MINOR;
/// @brief Contains all statements.
typedef std::unordered_map<StatementTag,
CqlTaggedStatement,
StatementTagHash,
StatementTagEqual>
StatementMap;