cql_connection.cc 6.57 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Copyright (C) 2015 - 2016 Deutsche Telekom AG.
//
// Author: Razvan Becheriu <razvan.becheriu@qualitance.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//           http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

Tomek Mrugalski's avatar
Tomek Mrugalski committed
17
#include <dhcpsrv/cql_connection.h>
18 19 20 21 22 23 24
#include <string>

using namespace std;

namespace isc {
namespace dhcp {

Tomek Mrugalski's avatar
Tomek Mrugalski committed
25 26 27
// Constructs the connection object without contacting any database.
//
// The parameters are handed to the DatabaseConnection base class; the
// cluster, session and tagged-statement pointers all start out as NULL.
CqlConnection::CqlConnection(const ParameterMap& parameters)
    : DatabaseConnection(parameters),
      cluster_(NULL),
      session_(NULL),
      tagged_statements_(NULL) {
}

Tomek Mrugalski's avatar
Tomek Mrugalski committed
30
// Releases every Cassandra resource held by this object: the prepared
// statements first, then the session (after asking it to close), and
// finally the cluster.
//
// This destructor never throws. We are closing the connection anyway, so a
// failure reported by the close request is deliberately ignored; throwing
// from a destructor would terminate the process instead of reporting it.
CqlConnection::~CqlConnection() {
    // Free up the prepared statements and clear their slots.
    for (size_t i = 0; i < statements_.size(); ++i) {
        if (statements_[i]) {
            cass_prepared_free(statements_[i]);
            statements_[i] = NULL;
        }
    }

    if (session_) {
        // Wait for the close request to complete before freeing the
        // session. The outcome of the close is intentionally not inspected
        // (see the note above about not throwing here).
        CassFuture* close_future = cass_session_close(session_);
        cass_future_wait(close_future);
        cass_future_free(close_future);
        cass_session_free(session_);
        session_ = NULL;
    }

    if (cluster_) {
        cass_cluster_free(cluster_);
        cluster_ = NULL;
    }
}

void
Tomek Mrugalski's avatar
Tomek Mrugalski committed
66
CqlConnection::openDatabase() {
67 68 69 70 71
    CassError rc;
    // Set up the values of the parameters
    const char* contact_points = "127.0.0.1";
    string scontact_points;
    try {
72
        scontact_points = getParameter("contact-points");
73 74
        contact_points = scontact_points.c_str();
    } catch (...) {
Tomek Mrugalski's avatar
Tomek Mrugalski committed
75
        // No host. Fine, we'll use "localhost".
76 77 78 79 80 81 82 83
    }

    const char* port = NULL;
    string sport;
    try {
        sport = getParameter("port");
        port = sport.c_str();
    } catch (...) {
Tomek Mrugalski's avatar
Tomek Mrugalski committed
84
        // No port. Fine, we'll use "default".
85 86 87 88 89 90 91 92
    }

    const char* user = NULL;
    string suser;
    try {
        suser = getParameter("user");
        user = suser.c_str();
    } catch (...) {
Tomek Mrugalski's avatar
Tomek Mrugalski committed
93
        // No user. Fine, we'll use NULL.
94 95 96 97 98 99 100 101
    }

    const char* password = NULL;
    string spassword;
    try {
        spassword = getParameter("password");
        password = spassword.c_str();
    } catch (...) {
Tomek Mrugalski's avatar
Tomek Mrugalski committed
102
        // No password. Fine, we'll use NULL.
103 104 105 106 107 108 109 110
    }

    const char* keyspace = "keatest";
    string skeyspace;
    try {
        skeyspace = getParameter("keyspace");
        keyspace = skeyspace.c_str();
    } catch (...) {
111
        // No keyspace name. Fine, we'll use default "keatest".
112 113 114 115 116 117 118 119 120 121
    }

    cluster_ = cass_cluster_new();
    cass_cluster_set_contact_points(cluster_, contact_points);

    if (user != NULL && password != NULL) {
        cass_cluster_set_credentials(cluster_, user, password);
    }

    if (port != NULL) {
122
        int port_number;
123
        try {
124
            port_number = boost::lexical_cast<int>(port);
125 126 127 128
        } catch (const std::exception& ex) {
            isc_throw(DbOperationError, "Invalid int data: " << port
                      << " : " << ex.what());
        }
129
        cass_cluster_set_port(cluster_, port_number);
130 131 132 133
    }

    session_ = cass_session_new();

134 135
    CassFuture* connect_future = cass_session_connect_keyspace(session_,
        cluster_, keyspace);
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
    cass_future_wait(connect_future);
    std::string error;
    checkStatementError(error, connect_future, "could not connect to DB");
    rc = cass_future_error_code(connect_future);
    cass_future_free(connect_future);
    if (rc != CASS_OK) {
        cass_session_free(session_);
        session_ = NULL;
        cass_cluster_free(cluster_);
        cluster_ = NULL;
        isc_throw(DbOpenError, error);
    }
}

void
Tomek Mrugalski's avatar
Tomek Mrugalski committed
151
CqlConnection::prepareStatements(CqlTaggedStatement *statements) {
152 153 154 155 156 157 158 159
    CassError rc = CASS_OK;
    uint32_t size = 0;
    tagged_statements_ = statements;
    for (; tagged_statements_[size].params_; size++);
    statements_.resize(size);
    for (uint32_t i = 0; i < size; i++) {
        const char* query = tagged_statements_[i].text_;

160
        CassFuture* future = cass_session_prepare(session_, query);
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
        cass_future_wait(future);
        std::string error;
        checkStatementError(error, future, i, "could not prepare statement");
        rc = cass_future_error_code(future);
        if (rc != CASS_OK) {
            cass_future_free(future);
            statements_[i] = NULL;
            isc_throw(DbOperationError, error);
        } else {
            statements_[i] = cass_future_get_prepared(future);
        }

        cass_future_free(future);
    }
}

void
Tomek Mrugalski's avatar
Tomek Mrugalski committed
178 179
CqlConnection::commit() {
    LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL, DHCPSRV_CQL_COMMIT);
180 181 182
}

void
Tomek Mrugalski's avatar
Tomek Mrugalski committed
183 184
CqlConnection::rollback() {
    LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL, DHCPSRV_CQL_ROLLBACK);
185 186 187
}

void
188
CqlConnection::checkStatementError(std::string& error, CassFuture* future,
189
                                   uint32_t stindex, const char* what) const {
190 191 192 193 194 195 196 197 198 199 200
    CassError rc;
    const char* errorMessage;
    size_t errorMessageSize;
    std::stringstream stream;
    stream << "no error for: " << tagged_statements_[stindex].name_;

    rc = cass_future_error_code(future);
    cass_future_error_message(future, &errorMessage, &errorMessageSize);

    if (rc != CASS_OK) {
        stream.str(std::string());
201 202
        stream << what << " for: " << tagged_statements_[stindex].name_
               << " reason: " << errorMessage << " error code: " << rc;
203 204 205 206 207
    }
    error = stream.str();
}

void
Tomek Mrugalski's avatar
Tomek Mrugalski committed
208
CqlConnection::checkStatementError(std::string& error, CassFuture* future,
209
                                   const char* what) const {
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
    CassError rc;
    const char* errorMessage;
    size_t errorMessageSize;
    std::stringstream stream;
    stream << "no error";

    rc = cass_future_error_code(future);
    cass_future_error_message(future, &errorMessage, &errorMessageSize);

    if (rc != CASS_OK) {
        stream.str(std::string());
        stream << what << " reason: " << errorMessage << " error code: " << rc;
    }
    error = stream.str();
}

}; // end of isc::dhcp namespace
}; // end of isc namespace