cql_connection.cc 16.7 KB
Newer Older
1
// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
2
// Copyright (C) 2015-2018 Deutsche Telekom AG.
3
//
4 5
// Authors: Razvan Becheriu <razvan.becheriu@qualitance.com>
//          Andrei Pavel <andrei.pavel@qualitance.com>
6 7 8 9 10 11 12 13 14 15 16 17 18
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//           http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

Andrei Pavel's avatar
Andrei Pavel committed
19 20
#include <config.h>

21 22
#include <cql/cql_connection.h>
#include <cql/cql_exchange.h>
23 24
#include <database/db_exceptions.h>
#include <database/db_log.h>
Andrei Pavel's avatar
Andrei Pavel committed
25

26
#include <string>
27 28

namespace isc {
29
namespace db {
30

Andrei Pavel's avatar
Andrei Pavel committed
31
// Builds an unconnected CQL connection object.
//
// The connection parameters are handed to the DatabaseConnection base class.
// No driver resource is allocated here: the cluster, session and schema
// handles all start out as NULL. The regular consistency defaults to QUORUM,
// the serial consistency to UNKNOWN, and consistency forcing is enabled.
CqlConnection::CqlConnection(const ParameterMap& parameters)
    : DatabaseConnection(parameters),
      statements_(),
      cluster_(NULL),
      session_(NULL),
      consistency_(CASS_CONSISTENCY_QUORUM),
      serial_consistency_(CASS_CONSISTENCY_UNKNOWN),
      schema_meta_(NULL),
      keyspace_meta_(NULL),
      force_consistency_(true) {
}

Tomek Mrugalski's avatar
Tomek Mrugalski committed
38
// Releases every Cassandra driver resource owned by this connection:
// prepared statements, the schema metadata snapshot, the session and the
// cluster. A failure to close the session is logged rather than thrown,
// because the connection is going away regardless.
CqlConnection::~CqlConnection() {
    CassError rc = CASS_OK;
    std::string error;

    // Free the prepared statements. Entries are visited by reference so that
    // no statement is copied and the freed handle can be cleared in place.
    for (auto& entry : statements_) {
        CqlTaggedStatement& statement = entry.second;
        if (statement.prepared_statement_) {
            cass_prepared_free(statement.prepared_statement_);
            statement.prepared_statement_ = NULL;
        }
    }

    // Free the schema snapshot whenever one is held, independently of the
    // session: openDatabase() can release the session while a snapshot is
    // still referenced. keyspace_meta_ is looked up from the snapshot, so it
    // is cleared together with it.
    if (schema_meta_) {
        cass_schema_meta_free(schema_meta_);
        schema_meta_ = NULL;
        keyspace_meta_ = NULL;
    }

    // If there's a session, tear it down and free the resources.
    if (session_) {
        CassFuture* close_future = cass_session_close(session_);
        cass_future_wait(close_future);
        error = checkFutureError(
            "CqlConnection::~CqlConnection(): cass_session_close() != CASS_OK",
            close_future);
        rc = cass_future_error_code(close_future);
        cass_future_free(close_future);
        cass_session_free(session_);
        session_ = NULL;
    }

    // Free the cluster if there's one.
    if (cluster_) {
        cass_cluster_free(cluster_);
        cluster_ = NULL;
    }

    if (rc != CASS_OK) {
        // We're closing the connection anyway. Let's not throw at this stage.
        DB_LOG_ERROR(CQL_DEALLOC_ERROR).arg(error);
    }
}

78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
std::pair<uint32_t, uint32_t>
CqlConnection::getVersion(const ParameterMap& parameters) {
    // Get a connection.
    CqlConnection conn(parameters);

    // Open the database.
    conn.openDatabase();

    // Prepare statement.
    conn.prepareStatements(CqlVersionExchange::tagged_statements_);

    std::unique_ptr<CqlVersionExchange> version_exchange(new CqlVersionExchange());
    return version_exchange->retrieveVersion(conn);
}

93 94
// Translates a textual consistency level into the driver's enum value.
//
// The lookup is an exact, case-sensitive match against the names listed in
// the table below. Any other text yields CASS_CONSISTENCY_UNKNOWN.
CassConsistency CqlConnection::parseConsistency(std::string value) {
    // Read-only table: it is const so that no lookup can ever insert into it.
    static const std::map<std::string, CassConsistency> consistency_map {
        {"any", CASS_CONSISTENCY_ANY},
        {"one", CASS_CONSISTENCY_ONE},
        {"two", CASS_CONSISTENCY_TWO},
        {"three", CASS_CONSISTENCY_THREE},
        {"quorum", CASS_CONSISTENCY_QUORUM},
        {"all", CASS_CONSISTENCY_ALL},
        {"local-quorum", CASS_CONSISTENCY_LOCAL_QUORUM},
        {"each-quorum", CASS_CONSISTENCY_EACH_QUORUM},
        {"serial", CASS_CONSISTENCY_SERIAL},
        {"local-serial", CASS_CONSISTENCY_LOCAL_SERIAL},
        {"local-one", CASS_CONSISTENCY_LOCAL_ONE}
    };

    // Single lookup; the iterator is reused for the result.
    const auto found = consistency_map.find(value);
    if (found == consistency_map.end()) {
        return CASS_CONSISTENCY_UNKNOWN;
    }
    return found->second;
}

113
void
Tomek Mrugalski's avatar
Tomek Mrugalski committed
114
CqlConnection::openDatabase() {
115 116 117
    CassError rc;
    // Set up the values of the parameters
    const char* contact_points = "127.0.0.1";
Andrei Pavel's avatar
Andrei Pavel committed
118
    std::string scontact_points;
119
    try {
120
        scontact_points = getParameter("contact-points");
121 122
        contact_points = scontact_points.c_str();
    } catch (...) {
Andrei Pavel's avatar
Andrei Pavel committed
123
        // No host. Fine, we'll use "127.0.0.1".
124 125 126
    }

    const char* port = NULL;
Andrei Pavel's avatar
Andrei Pavel committed
127
    std::string sport;
128 129 130 131
    try {
        sport = getParameter("port");
        port = sport.c_str();
    } catch (...) {
Andrei Pavel's avatar
Andrei Pavel committed
132
        // No port. Fine, we'll use the default "9042".
133 134 135
    }

    const char* user = NULL;
Andrei Pavel's avatar
Andrei Pavel committed
136
    std::string suser;
137 138 139 140
    try {
        suser = getParameter("user");
        user = suser.c_str();
    } catch (...) {
Tomek Mrugalski's avatar
Tomek Mrugalski committed
141
        // No user. Fine, we'll use NULL.
142 143 144
    }

    const char* password = NULL;
Andrei Pavel's avatar
Andrei Pavel committed
145
    std::string spassword;
146 147 148 149
    try {
        spassword = getParameter("password");
        password = spassword.c_str();
    } catch (...) {
Tomek Mrugalski's avatar
Tomek Mrugalski committed
150
        // No password. Fine, we'll use NULL.
151 152 153
    }

    const char* keyspace = "keatest";
Andrei Pavel's avatar
Andrei Pavel committed
154
    std::string skeyspace;
155 156 157 158
    try {
        skeyspace = getParameter("keyspace");
        keyspace = skeyspace.c_str();
    } catch (...) {
Andrei Pavel's avatar
Andrei Pavel committed
159 160 161
        // No keyspace name. Fine, we'll use "keatest".
    }

162 163 164
    const char* consistency = NULL;
    std::string sconsistency;
    try {
Razvan Becheriu's avatar
Razvan Becheriu committed
165
        sconsistency = getParameter("consistency");
166 167
        consistency = sconsistency.c_str();
    } catch (...) {
Razvan Becheriu's avatar
Razvan Becheriu committed
168
        // No consistency. Fine, we'll use "quorum".
169 170
    }

171 172 173 174 175 176
    const char* serial_consistency = NULL;
    std::string sserial_consistency;
    try {
        sserial_consistency = getParameter("serial-consistency");
        serial_consistency = sserial_consistency.c_str();
    } catch (...) {
Razvan Becheriu's avatar
Razvan Becheriu committed
177
        // No serial consistency. Fine, we'll use "serial".
178 179
    }

Andrei Pavel's avatar
Andrei Pavel committed
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
    const char* reconnect_wait_time = NULL;
    std::string sreconnect_wait_time;
    try {
        sreconnect_wait_time = getParameter("reconnect-wait-time");
        reconnect_wait_time = sreconnect_wait_time.c_str();
    } catch (...) {
        // No reconnect wait time. Fine, we'll use the default "2000".
    }

    const char* connect_timeout = NULL;
    std::string sconnect_timeout;
    try {
        sconnect_timeout = getParameter("connect-timeout");
        connect_timeout = sconnect_timeout.c_str();
    } catch (...) {
        // No connect timeout. Fine, we'll use the default "5000".
    }

    const char* request_timeout = NULL;
    std::string srequest_timeout;
    try {
        srequest_timeout = getParameter("request-timeout");
        request_timeout = srequest_timeout.c_str();
    } catch (...) {
        // No request timeout. Fine, we'll use the default "12000".
    }

    const char* tcp_keepalive = NULL;
    std::string stcp_keepalive;
    try {
        stcp_keepalive = getParameter("tcp-keepalive");
        tcp_keepalive = stcp_keepalive.c_str();
    } catch (...) {
        // No tcp-keepalive. Fine, we'll not use TCP keepalive.
    }

    std::string stcp_nodelay;
    try {
        stcp_nodelay = getParameter("tcp-nodelay");
    } catch (...) {
        // No tcp-nodelay. Fine, we'll use the default false.
221 222 223 224 225
    }

    cluster_ = cass_cluster_new();
    cass_cluster_set_contact_points(cluster_, contact_points);

Andrei Pavel's avatar
Andrei Pavel committed
226
    if (user && password) {
227 228 229
        cass_cluster_set_credentials(cluster_, user, password);
    }

Andrei Pavel's avatar
Andrei Pavel committed
230
    if (port) {
231
        int32_t port_number;
232
        try {
233
            port_number = boost::lexical_cast<int32_t>(port);
Andrei Pavel's avatar
Andrei Pavel committed
234
            if (port_number < 1 || port_number > 65535) {
235 236
                cass_cluster_free(cluster_);
                cluster_ = NULL;
237 238 239 240 241
                isc_throw(DbOperationError,
                          "CqlConnection::openDatabase(): "
                          "port outside of range, expected "
                          "1-65535, instead got "
                              << port);
Andrei Pavel's avatar
Andrei Pavel committed
242 243
            }
        } catch (const boost::bad_lexical_cast& ex) {
244 245
            cass_cluster_free(cluster_);
            cluster_ = NULL;
Andrei Pavel's avatar
Andrei Pavel committed
246
            isc_throw(DbOperationError,
247 248 249 250
                      "CqlConnection::openDatabase(): invalid "
                      "port, expected castable to int, instead got "
                      "\"" << port
                           << "\", " << ex.what());
251
        }
252
        cass_cluster_set_port(cluster_, port_number);
253 254
    }

255 256
    if (consistency) {
        CassConsistency desired_consistency = CqlConnection::parseConsistency(sconsistency);
257 258 259 260
        CassConsistency desired_serial_consistency = CASS_CONSISTENCY_UNKNOWN;
        if (serial_consistency) {
            desired_serial_consistency = CqlConnection::parseConsistency(sserial_consistency);
        }
261
        if (desired_consistency != CASS_CONSISTENCY_UNKNOWN) {
262
            setConsistency(true, desired_consistency, desired_serial_consistency);
263 264 265
        }
    }

Andrei Pavel's avatar
Andrei Pavel committed
266
    if (reconnect_wait_time) {
267
        int32_t reconnect_wait_time_number;
Andrei Pavel's avatar
Andrei Pavel committed
268 269
        try {
            reconnect_wait_time_number =
270
                boost::lexical_cast<int32_t>(reconnect_wait_time);
Andrei Pavel's avatar
Andrei Pavel committed
271
            if (reconnect_wait_time_number < 0) {
272 273
                cass_cluster_free(cluster_);
                cluster_ = NULL;
Andrei Pavel's avatar
Andrei Pavel committed
274
                isc_throw(DbOperationError,
275 276
                          "CqlConnection::openDatabase(): invalid reconnect "
                          "wait time, expected positive number, instead got "
Andrei Pavel's avatar
Andrei Pavel committed
277 278 279
                              << reconnect_wait_time);
            }
        } catch (const boost::bad_lexical_cast& ex) {
280 281
            cass_cluster_free(cluster_);
            cluster_ = NULL;
282 283 284 285 286
            isc_throw(DbOperationError,
                      "CqlConnection::openDatabase(): "
                      "invalid reconnect wait time, expected "
                      "castable to int, instead got \""
                          << reconnect_wait_time << "\", " << ex.what());
Andrei Pavel's avatar
Andrei Pavel committed
287 288 289 290 291 292
        }
        cass_cluster_set_reconnect_wait_time(cluster_,
                                             reconnect_wait_time_number);
    }

    if (connect_timeout) {
293
        int32_t connect_timeout_number;
Andrei Pavel's avatar
Andrei Pavel committed
294
        try {
295 296
            connect_timeout_number =
                boost::lexical_cast<int32_t>(connect_timeout);
Andrei Pavel's avatar
Andrei Pavel committed
297
            if (connect_timeout_number < 0) {
298 299
                cass_cluster_free(cluster_);
                cluster_ = NULL;
Andrei Pavel's avatar
Andrei Pavel committed
300
                isc_throw(DbOperationError,
301 302 303
                          "CqlConnection::openDatabase(): "
                          "invalid connect timeout, expected "
                          "positive number, instead got "
Andrei Pavel's avatar
Andrei Pavel committed
304 305 306
                              << connect_timeout);
            }
        } catch (const boost::bad_lexical_cast& ex) {
307 308
            cass_cluster_free(cluster_);
            cluster_ = NULL;
309 310 311 312
            isc_throw(DbOperationError,
                      "CqlConnection::openDatabase(): invalid connect timeout, "
                      "expected castable to int, instead got \""
                          << connect_timeout << "\", " << ex.what());
Andrei Pavel's avatar
Andrei Pavel committed
313 314 315 316 317
        }
        cass_cluster_set_connect_timeout(cluster_, connect_timeout_number);
    }

    if (request_timeout) {
318
        int32_t request_timeout_number;
Andrei Pavel's avatar
Andrei Pavel committed
319
        try {
320 321
            request_timeout_number =
                boost::lexical_cast<int32_t>(request_timeout);
Andrei Pavel's avatar
Andrei Pavel committed
322
            if (request_timeout_number < 0) {
323 324
                cass_cluster_free(cluster_);
                cluster_ = NULL;
Andrei Pavel's avatar
Andrei Pavel committed
325
                isc_throw(DbOperationError,
326 327 328
                          "CqlConnection::openDatabase(): "
                          "invalid request timeout, expected "
                          "positive number, instead got "
Andrei Pavel's avatar
Andrei Pavel committed
329 330 331
                              << request_timeout);
            }
        } catch (const boost::bad_lexical_cast& ex) {
332 333
            cass_cluster_free(cluster_);
            cluster_ = NULL;
334 335 336 337
            isc_throw(DbOperationError,
                      "CqlConnection::openDatabase(): invalid request timeout, "
                      "expected castable to int, instead got \""
                          << request_timeout << "\", " << ex.what());
Andrei Pavel's avatar
Andrei Pavel committed
338 339 340 341 342
        }
        cass_cluster_set_request_timeout(cluster_, request_timeout_number);
    }

    if (tcp_keepalive) {
343
        int32_t tcp_keepalive_number;
Andrei Pavel's avatar
Andrei Pavel committed
344
        try {
345
            tcp_keepalive_number = boost::lexical_cast<int32_t>(tcp_keepalive);
Andrei Pavel's avatar
Andrei Pavel committed
346
            if (tcp_keepalive_number < 0) {
347 348
                cass_cluster_free(cluster_);
                cluster_ = NULL;
Andrei Pavel's avatar
Andrei Pavel committed
349
                isc_throw(DbOperationError,
350 351 352
                          "CqlConnection::openDatabase(): "
                          "invalid TCP keepalive, expected "
                          "positive number, instead got "
Andrei Pavel's avatar
Andrei Pavel committed
353 354 355
                              << tcp_keepalive);
            }
        } catch (const boost::bad_lexical_cast& ex) {
356 357
            cass_cluster_free(cluster_);
            cluster_ = NULL;
358 359 360 361
            isc_throw(DbOperationError,
                      "CqlConnection::openDatabase(): invalid TCP keepalive, "
                      "expected castable to int, instead got \""
                          << tcp_keepalive << "\", " << ex.what());
Andrei Pavel's avatar
Andrei Pavel committed
362 363 364 365 366 367 368 369 370
        }
        cass_cluster_set_tcp_keepalive(cluster_, cass_true,
                                       tcp_keepalive_number);
    }

    if (stcp_nodelay == "true") {
        cass_cluster_set_tcp_nodelay(cluster_, cass_true);
    }

371 372
    session_ = cass_session_new();

Andrei Pavel's avatar
Andrei Pavel committed
373 374
    CassFuture* connect_future =
        cass_session_connect_keyspace(session_, cluster_, keyspace);
375
    cass_future_wait(connect_future);
376 377 378 379
    const std::string error =
        checkFutureError("CqlConnection::openDatabase(): "
                         "cass_session_connect_keyspace() != CASS_OK",
                         connect_future);
380 381 382 383 384 385 386 387 388
    rc = cass_future_error_code(connect_future);
    cass_future_free(connect_future);
    if (rc != CASS_OK) {
        cass_session_free(session_);
        session_ = NULL;
        cass_cluster_free(cluster_);
        cluster_ = NULL;
        isc_throw(DbOpenError, error);
    }
389 390 391 392 393

    // Get keyspace meta.
    schema_meta_ = cass_session_get_schema_meta(session_);
    keyspace_meta_ = cass_schema_meta_keyspace_by_name(schema_meta_, keyspace);
    if (!keyspace_meta_) {
394 395 396 397
        cass_session_free(session_);
        session_ = NULL;
        cass_cluster_free(cluster_);
        cluster_ = NULL;
398 399 400
        isc_throw(DbOpenError, "CqlConnection::openDatabase(): "
                               "!cass_schema_meta_keyspace_by_name()");
    }
401 402 403
}

void
404
CqlConnection::prepareStatements(StatementMap& statements) {
405
    CassError rc = CASS_OK;
406 407 408 409 410 411 412 413
    for (StatementMapEntry it : statements) {
        CqlTaggedStatement& tagged_statement = it.second;
        if (statements_.find(tagged_statement.name_) != statements_.end()) {
            isc_throw(DbOperationError,
                      "CqlConnection::prepareStatements(): "
                      "duplicate statement with name "
                          << tagged_statement.name_);
        }
414

415 416
        CassFuture* future =
            cass_session_prepare(session_, tagged_statement.text_);
417
        cass_future_wait(future);
418 419 420 421
        const std::string error =
            checkFutureError("CqlConnection::prepareStatements():"
                             " cass_session_prepare() != CASS_OK",
                             future, tagged_statement.name_);
422 423 424 425 426 427
        rc = cass_future_error_code(future);
        if (rc != CASS_OK) {
            cass_future_free(future);
            isc_throw(DbOperationError, error);
        }

428 429
        tagged_statement.prepared_statement_ = cass_future_get_prepared(future);
        statements_.insert(it);
430 431 432 433
        cass_future_free(future);
    }
}

Andrei Pavel's avatar
Andrei Pavel committed
434
void
435 436 437
CqlConnection::setConsistency(bool force,
                              CassConsistency consistency,
                              CassConsistency serial_consistency) {
Andrei Pavel's avatar
Andrei Pavel committed
438 439
    force_consistency_ = force;
    consistency_ = consistency;
440
    serial_consistency_ = serial_consistency;
Andrei Pavel's avatar
Andrei Pavel committed
441 442 443 444
}

void
CqlConnection::startTransaction() {
445
    DB_LOG_DEBUG(DB_DBG_TRACE_DETAIL, CQL_CONNECTION_BEGIN_TRANSACTION);
Andrei Pavel's avatar
Andrei Pavel committed
446 447
}

448
void
Tomek Mrugalski's avatar
Tomek Mrugalski committed
449
CqlConnection::commit() {
450
    DB_LOG_DEBUG(DB_DBG_TRACE_DETAIL, CQL_CONNECTION_COMMIT);
451 452 453
}

void
Tomek Mrugalski's avatar
Tomek Mrugalski committed
454
CqlConnection::rollback() {
455
    DB_LOG_DEBUG(DB_DBG_TRACE_DETAIL, CQL_CONNECTION_ROLLBACK);
456 457
}

458 459 460 461 462 463 464 465
const std::string
CqlConnection::checkFutureError(const std::string& what,
                                CassFuture* future,
                                StatementTag statement_tag /* = NULL */) {
    CassError cass_error = cass_future_error_code(future);
    const char* error_message;
    size_t error_message_size;
    cass_future_error_message(future, &error_message, &error_message_size);
466 467

    std::stringstream stream;
468 469 470 471 472 473 474
    if (statement_tag && std::strlen(statement_tag) > 0) {
        // future is from cass_session_execute() call.
        stream << "Statement ";
        stream << statement_tag;
    } else {
        // future is from cass_session_*() call.
        stream << "Session action ";
475
    }
476 477 478 479 480 481
    if (cass_error == CASS_OK) {
        stream << " executed succesfully.";
    } else {
        stream << " failed, Kea error: " << what
               << ", Cassandra error code: " << cass_error_desc(cass_error)
               << ", Cassandra future error: " << error_message;
Andrei Pavel's avatar
Andrei Pavel committed
482
    }
483
    return stream.str();
Andrei Pavel's avatar
Andrei Pavel committed
484 485 486 487
}

}  // namespace db
}  // namespace isc