cql_connection.cc 6.57 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Copyright (C) 2015 - 2016 Deutsche Telekom AG.
//
// Author: Razvan Becheriu <razvan.becheriu@qualitance.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//           http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

Tomek Mrugalski's avatar
Tomek Mrugalski committed
17
#include <dhcpsrv/cql_connection.h>
18
19
20
21
22
23
24
#include <string>

using namespace std;

namespace isc {
namespace dhcp {

Tomek Mrugalski's avatar
Tomek Mrugalski committed
25
26
27
// Constructor.
//
// Hands the connection parameters over to the DatabaseConnection base class
// and starts with no cluster, no session and no tagged statement table
// attached. Nothing else is done here; the body is intentionally empty.
CqlConnection::CqlConnection(const ParameterMap& parameters)
    : DatabaseConnection(parameters),
      cluster_(NULL),
      session_(NULL),
      tagged_statements_(NULL) {
}

Tomek Mrugalski's avatar
Tomek Mrugalski committed
30
// Destructor.
//
// Releases every prepared statement held in statements_, closes and frees
// the session (if one exists) and finally frees the cluster object (if one
// exists). All pointers are reset to NULL after being released.
//
// This destructor never throws: a failure to close the session cleanly is
// deliberately not reported, because the connection is being discarded
// anyway and an exception escaping a destructor can terminate the program.
CqlConnection::~CqlConnection() {
    // Free up the prepared statements, ignoring errors.
    for (size_t i = 0; i < statements_.size(); ++i) {
        if (statements_[i]) {
            cass_prepared_free(statements_[i]);
        }
        statements_[i] = NULL;
    }

    // Session and connection resources are deallocated.
    if (session_) {
        CassFuture* close_future = cass_session_close(session_);
        // Wait for the close request to finish before releasing the session.
        // Its outcome is intentionally ignored: we're closing the connection
        // anyway, so let's not throw at this stage.
        cass_future_wait(close_future);
        cass_future_free(close_future);
        cass_session_free(session_);
        session_ = NULL;
    }

    if (cluster_) {
        cass_cluster_free(cluster_);
        cluster_ = NULL;
    }
}

void
Tomek Mrugalski's avatar
Tomek Mrugalski committed
66
CqlConnection::openDatabase() {
67
68
69
70
71
72
73
74
    CassError rc;
    // Set up the values of the parameters
    const char* contact_points = "127.0.0.1";
    string scontact_points;
    try {
        scontact_points = getParameter("contact_points");
        contact_points = scontact_points.c_str();
    } catch (...) {
Tomek Mrugalski's avatar
Tomek Mrugalski committed
75
        // No host. Fine, we'll use "localhost".
76
77
78
79
80
81
82
83
    }

    const char* port = NULL;
    string sport;
    try {
        sport = getParameter("port");
        port = sport.c_str();
    } catch (...) {
Tomek Mrugalski's avatar
Tomek Mrugalski committed
84
        // No port. Fine, we'll use "default".
85
86
87
88
89
90
91
92
    }

    const char* user = NULL;
    string suser;
    try {
        suser = getParameter("user");
        user = suser.c_str();
    } catch (...) {
Tomek Mrugalski's avatar
Tomek Mrugalski committed
93
        // No user. Fine, we'll use NULL.
94
95
96
97
98
99
100
101
    }

    const char* password = NULL;
    string spassword;
    try {
        spassword = getParameter("password");
        password = spassword.c_str();
    } catch (...) {
Tomek Mrugalski's avatar
Tomek Mrugalski committed
102
        // No password. Fine, we'll use NULL.
103
104
105
106
107
108
109
110
    }

    const char* keyspace = "keatest";
    string skeyspace;
    try {
        skeyspace = getParameter("keyspace");
        keyspace = skeyspace.c_str();
    } catch (...) {
111
        // No keyspace name. Fine, we'll use default "keatest".
112
113
114
115
116
117
118
119
120
121
    }

    cluster_ = cass_cluster_new();
    cass_cluster_set_contact_points(cluster_, contact_points);

    if (user != NULL && password != NULL) {
        cass_cluster_set_credentials(cluster_, user, password);
    }

    if (port != NULL) {
122
        int port_number;
123
        try {
124
            port_number = boost::lexical_cast<int>(port);
125
126
127
128
        } catch (const std::exception& ex) {
            isc_throw(DbOperationError, "Invalid int data: " << port
                      << " : " << ex.what());
        }
129
        cass_cluster_set_port(cluster_, port_number);
130
131
132
133
    }

    session_ = cass_session_new();

134
135
    CassFuture* connect_future = cass_session_connect_keyspace(session_,
        cluster_, keyspace);
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
    cass_future_wait(connect_future);
    std::string error;
    checkStatementError(error, connect_future, "could not connect to DB");
    rc = cass_future_error_code(connect_future);
    cass_future_free(connect_future);
    if (rc != CASS_OK) {
        cass_session_free(session_);
        session_ = NULL;
        cass_cluster_free(cluster_);
        cluster_ = NULL;
        isc_throw(DbOpenError, error);
    }
}

void
Tomek Mrugalski's avatar
Tomek Mrugalski committed
151
CqlConnection::prepareStatements(CqlTaggedStatement *statements) {
152
153
154
155
156
157
158
159
    CassError rc = CASS_OK;
    uint32_t size = 0;
    tagged_statements_ = statements;
    for (; tagged_statements_[size].params_; size++);
    statements_.resize(size);
    for (uint32_t i = 0; i < size; i++) {
        const char* query = tagged_statements_[i].text_;

160
        CassFuture* future = cass_session_prepare(session_, query);
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
        cass_future_wait(future);
        std::string error;
        checkStatementError(error, future, i, "could not prepare statement");
        rc = cass_future_error_code(future);
        if (rc != CASS_OK) {
            cass_future_free(future);
            statements_[i] = NULL;
            isc_throw(DbOperationError, error);
        } else {
            statements_[i] = cass_future_get_prepared(future);
        }

        cass_future_free(future);
    }
}

void
Tomek Mrugalski's avatar
Tomek Mrugalski committed
178
179
CqlConnection::commit() {
    LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL, DHCPSRV_CQL_COMMIT);
180
181
182
}

void
Tomek Mrugalski's avatar
Tomek Mrugalski committed
183
184
CqlConnection::rollback() {
    LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL, DHCPSRV_CQL_ROLLBACK);
185
186
187
}

void
188
CqlConnection::checkStatementError(std::string& error, CassFuture* future,
189
                                   uint32_t stindex, const char* what) const {
190
191
192
193
194
195
196
197
198
199
200
    CassError rc;
    const char* errorMessage;
    size_t errorMessageSize;
    std::stringstream stream;
    stream << "no error for: " << tagged_statements_[stindex].name_;

    rc = cass_future_error_code(future);
    cass_future_error_message(future, &errorMessage, &errorMessageSize);

    if (rc != CASS_OK) {
        stream.str(std::string());
201
202
        stream << what << " for: " << tagged_statements_[stindex].name_
               << " reason: " << errorMessage << " error code: " << rc;
203
204
205
206
207
    }
    error = stream.str();
}

void
Tomek Mrugalski's avatar
Tomek Mrugalski committed
208
CqlConnection::checkStatementError(std::string& error, CassFuture* future,
209
                                   const char* what) const {
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
    CassError rc;
    const char* errorMessage;
    size_t errorMessageSize;
    std::stringstream stream;
    stream << "no error";

    rc = cass_future_error_code(future);
    cass_future_error_message(future, &errorMessage, &errorMessageSize);

    if (rc != CASS_OK) {
        stream.str(std::string());
        stream << what << " reason: " << errorMessage << " error code: " << rc;
    }
    error = stream.str();
}

}; // end of isc::dhcp namespace
}; // end of isc namespace