diff --git a/src/lib/dhcpsrv/pgsql_host_data_source.cc b/src/lib/dhcpsrv/pgsql_host_data_source.cc index 7ea6f61e6b8e0308e7d25515cb4c8daaeca351df..633ea483e7df61cbaf85c745cfb72a4bcc2bb5d5 100644 --- a/src/lib/dhcpsrv/pgsql_host_data_source.cc +++ b/src/lib/dhcpsrv/pgsql_host_data_source.cc @@ -297,7 +297,7 @@ public: // most recently added host is different than the host id of the // currently processed host. if (hosts.empty() || row_host_id != hosts.back()->getHostId()) { - HostPtr host = retrieveHost(r, row, row_host_id); + HostPtr host(retrieveHost(r, row, row_host_id)); hosts.push_back(host); } } @@ -1263,7 +1263,7 @@ private: OptionPtr option_; }; -} // end of anonymous namespace +} // namespace namespace isc { namespace dhcp { @@ -1854,7 +1854,7 @@ TaggedStatementArray tagged_statements = { { // Using fixed scope_id = 3, which associates an option with host. {7, { OID_INT2, OID_BYTEA, OID_TEXT, - OID_VARCHAR, OID_BOOL, OID_TEXT, OID_INT8}, + OID_VARCHAR, OID_BOOL, OID_TEXT, OID_INT8 }, "insert_v4_host_option", "INSERT INTO dhcp4_options(code, value, formatted_value, space, " " persistent, user_context, host_id, scope_id) " @@ -1866,7 +1866,7 @@ TaggedStatementArray tagged_statements = { { // Using fixed scope_id = 3, which associates an option with host. {7, { OID_INT2, OID_BYTEA, OID_TEXT, - OID_VARCHAR, OID_BOOL, OID_TEXT, OID_INT8}, + OID_VARCHAR, OID_BOOL, OID_TEXT, OID_INT8 }, "insert_v6_host_option", "INSERT INTO dhcp6_options(code, value, formatted_value, space, " " persistent, user_context, host_id, scope_id) " @@ -1903,7 +1903,7 @@ TaggedStatementArray tagged_statements = { { } }; -}; // end anonymous namespace +} // namespace PgSqlHostDataSourceImpl:: PgSqlHostDataSourceImpl(const PgSqlConnection::ParameterMap& parameters) @@ -1927,7 +1927,7 @@ PgSqlHostDataSourceImpl(const PgSqlConnection::ParameterMap& parameters) isc_throw(DbOpenError, "PostgreSQL schema version mismatch: need version: " << code_version.first << "." << code_version.second - << " found version: " << db_version.first << "." + << " found version: " << db_version.first << "." << db_version.second); } @@ -1957,8 +1957,10 @@ uint64_t PgSqlHostDataSourceImpl::addStatement(StatementIndex stindex, PsqlBindArrayPtr& bind_array, const bool return_last_id) { + PgSqlHolder& holderHandle = conn_.handle(); uint64_t last_id = 0; - PgSqlResult r(PQexecPrepared(conn_, tagged_statements[stindex].name, + + PgSqlResult r(PQexecPrepared(holderHandle, tagged_statements[stindex].name, tagged_statements[stindex].nbparams, &bind_array->values_[0], &bind_array->lengths_[0], @@ -1987,7 +1989,9 @@ PgSqlHostDataSourceImpl::addStatement(StatementIndex stindex, bool PgSqlHostDataSourceImpl::delStatement(StatementIndex stindex, PsqlBindArrayPtr& bind_array) { - PgSqlResult r(PQexecPrepared(conn_, tagged_statements[stindex].name, + PgSqlHolder& holderHandle = conn_.handle(); + + PgSqlResult r(PQexecPrepared(holderHandle, tagged_statements[stindex].name, tagged_statements[stindex].nbparams, &bind_array->values_[0], &bind_array->lengths_[0], @@ -2062,9 +2066,10 @@ PgSqlHostDataSourceImpl:: getHostCollection(StatementIndex stindex, PsqlBindArrayPtr bind_array, boost::shared_ptr exchange, ConstHostCollection& result, bool single) const { + PgSqlHolder& holderHandle = conn_.handle(); exchange->clear(); - PgSqlResult r(PQexecPrepared(conn_, tagged_statements[stindex].name, + PgSqlResult r(PQexecPrepared(holderHandle, tagged_statements[stindex].name, tagged_statements[stindex].nbparams, &bind_array->values_[0], &bind_array->lengths_[0], @@ -2110,29 +2115,34 @@ getHost(const SubnetID& subnet_id, // Return single record if present, else clear the host. ConstHostPtr result; - if (!collection.empty()) + if (!collection.empty()) { result = *collection.begin(); + } return (result); } -std::pair PgSqlHostDataSourceImpl::getVersion() const { +pair +PgSqlHostDataSourceImpl::getVersion() const { LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL, DHCPSRV_PGSQL_HOST_DB_GET_VERSION); + + PgSqlHolder& holderHandle = conn_.handle(); const char* version_sql = "SELECT version, minor FROM schema_version;"; - PgSqlResult r(PQexec(conn_, version_sql)); + + PgSqlResult r(PQexec(holderHandle, version_sql)); if(PQresultStatus(r) != PGRES_TUPLES_OK) { isc_throw(DbOperationError, "unable to execute PostgreSQL statement <" - << version_sql << ">, reason: " << PQerrorMessage(conn_)); + << version_sql << ">, reason: " << PQerrorMessage(holderHandle)); } - uint32_t version; - PgSqlExchange::getColumnValue(r, 0, 0, version); + uint32_t major; + PgSqlExchange::getColumnValue(r, 0, 0, major); uint32_t minor; PgSqlExchange::getColumnValue(r, 0, 1, minor); - return (std::make_pair(version, minor)); + return (make_pair(major, minor)); } void @@ -2455,8 +2465,8 @@ ConstHostPtr PgSqlHostDataSource::get4(const SubnetID& subnet_id, const asiolink::IOAddress& address) const { if (!address.isV4()) { - isc_throw(BadValue, "PgSqlHostDataSource::get4(id, address) - " - " wrong address type, address supplied is an IPv6 address"); + isc_throw(BadValue, "PgSqlHostDataSource::get4(id, address): " + "wrong address type, address supplied is an IPv6 address"); } // Set up the WHERE clause value @@ -2475,8 +2485,9 @@ PgSqlHostDataSource::get4(const SubnetID& subnet_id, // Return single record if present, else clear the host. ConstHostPtr result; - if (!collection.empty()) + if (!collection.empty()) { result = *collection.begin(); + } return (result); } @@ -2550,7 +2561,8 @@ PgSqlHostDataSource::get6(const SubnetID& subnet_id, // Miscellaneous database methods. -std::string PgSqlHostDataSource::getName() const { +std::string +PgSqlHostDataSource::getName() const { std::string name = ""; try { name = impl_->conn_.getParameter("name"); @@ -2560,7 +2572,8 @@ std::string PgSqlHostDataSource::getName() const { return (name); } -std::string PgSqlHostDataSource::getDescription() const { +std::string +PgSqlHostDataSource::getDescription() const { return (std::string("Host data source that stores host information" "in PostgreSQL database")); } @@ -2583,5 +2596,5 @@ PgSqlHostDataSource::rollback() { impl_->conn_.rollback(); } -}; // end of isc::dhcp namespace -}; // end of isc namespace +} // namespace dhcp +} // namespace isc diff --git a/src/lib/dhcpsrv/pgsql_lease_mgr.cc b/src/lib/dhcpsrv/pgsql_lease_mgr.cc index 1ade37364974a196e1c400e8a6e0592f1b6b69ff..7180ea54fc1e47e9ba4fb23639396efeab956b93 100644 --- a/src/lib/dhcpsrv/pgsql_lease_mgr.cc +++ b/src/lib/dhcpsrv/pgsql_lease_mgr.cc @@ -133,7 +133,7 @@ PgSqlTaggedStatement tagged_statements[] = { "SELECT address, hwaddr, client_id, " "valid_lifetime, extract(epoch from expire)::bigint, subnet_id, " "fqdn_fwd, fqdn_rev, hostname, " - "state, user_context " + "state, user_context " "FROM lease4 " "WHERE subnet_id = $1"}, @@ -307,6 +307,7 @@ PgSqlTaggedStatement tagged_statements[] = { "hwaddr = $13, hwtype = $14, hwaddr_source = $15, " "state = $16, user_context = $17 " "WHERE address = $18"}, + // ALL_LEASE4_STATS { 0, { OID_NONE }, "all_lease4_stats", @@ -333,7 +334,7 @@ PgSqlTaggedStatement tagged_statements[] = { { 0, { OID_NONE }, "all_lease6_stats", "SELECT subnet_id, lease_type, state, leases as state_count" - " FROM lease6_stat ORDER BY subnet_id, lease_type, state" }, + " FROM lease6_stat ORDER BY subnet_id, lease_type, state"}, // SUBNET_LEASE6_STATS { 1, { OID_INT8 }, @@ -341,7 +342,7 @@ PgSqlTaggedStatement tagged_statements[] = { "SELECT subnet_id, lease_type, state, leases as state_count" " FROM lease6_stat " " WHERE subnet_id = $1 " - " ORDER BY lease_type, state" }, + " ORDER BY lease_type, state"}, // SUBNET_RANGE_LEASE6_STATS { 2, { OID_INT8, OID_INT8 }, @@ -349,7 +350,8 @@ PgSqlTaggedStatement tagged_statements[] = { "SELECT subnet_id, lease_type, state, leases as state_count" " FROM lease6_stat " " WHERE subnet_id >= $1 and subnet_id <= $2 " - " ORDER BY subnet_id, lease_type, state" }, + " ORDER BY subnet_id, lease_type, state"}, + // End of list sentinel { 0, { 0 }, NULL, NULL} }; @@ -463,8 +465,7 @@ public: lease_ = lease; try { - addr_str_ = boost::lexical_cast - (lease->addr_.toUint32()); + addr_str_ = boost::lexical_cast(lease->addr_.toUint32()); bind_array.add(addr_str_); if (lease->hwaddr_ && !lease->hwaddr_->hwaddr_.empty()) { @@ -1018,10 +1019,11 @@ public: /// parameters (for all subnets), a subnet id for a single subnet, or /// a first and last subnet id for a subnet range. void start() { + PgSqlHolder& holderHandle = conn_.handle(); if (getSelectMode() == ALL_SUBNETS) { // Run the query with no where clause parameters. - result_set_.reset(new PgSqlResult(PQexecPrepared(conn_, statement_.name, + result_set_.reset(new PgSqlResult(PQexecPrepared(holderHandle, statement_.name, 0, 0, 0, 0, 0))); } else { // Set up the WHERE clause values @@ -1039,7 +1041,7 @@ public: } // Run the query with where clause parameters. - result_set_.reset(new PgSqlResult(PQexecPrepared(conn_, statement_.name, + result_set_.reset(new PgSqlResult(PQexecPrepared(holderHandle, statement_.name, parms.size(), &parms.values_[0], &parms.lengths_[0], &parms.formats_[0], 0))); } @@ -1132,7 +1134,7 @@ PgSqlLeaseMgr::PgSqlLeaseMgr(const DatabaseConnection::ParameterMap& parameters) // Now prepare the SQL statements. int i = 0; - for( ; tagged_statements[i].text != NULL ; ++i) { + for(; tagged_statements[i].text != NULL; ++i) { conn_.prepareStatement(tagged_statements[i]); } @@ -1158,7 +1160,9 @@ PgSqlLeaseMgr::getDBVersion() { bool PgSqlLeaseMgr::addLeaseCommon(StatementIndex stindex, PsqlBindArray& bind_array) { - PgSqlResult r(PQexecPrepared(conn_, tagged_statements[stindex].name, + PgSqlHolder& holderHandle = conn_.handle(); + + PgSqlResult r(PQexecPrepared(holderHandle, tagged_statements[stindex].name, tagged_statements[stindex].nbparams, &bind_array.values_[0], &bind_array.lengths_[0], @@ -1206,8 +1210,10 @@ void PgSqlLeaseMgr::getLeaseCollection(StatementIndex stindex, Exchange& exchange, LeaseCollection& result, bool single) const { + PgSqlHolder& holderHandle = conn_.handle(); const int n = tagged_statements[stindex].nbparams; - PgSqlResult r(PQexecPrepared(conn_, tagged_statements[stindex].name, n, + + PgSqlResult r(PQexecPrepared(holderHandle, tagged_statements[stindex].name, n, n > 0 ? &bind_array.values_[0] : NULL, n > 0 ? &bind_array.lengths_[0] : NULL, n > 0 ? &bind_array.formats_[0] : NULL, 0)); @@ -1273,8 +1279,7 @@ PgSqlLeaseMgr::getLease4(const isc::asiolink::IOAddress& addr) const { PsqlBindArray bind_array; // LEASE ADDRESS - std::string addr_str = boost::lexical_cast - (addr.toUint32()); + std::string addr_str = boost::lexical_cast(addr.toUint32()); bind_array.add(addr_str); // Get the data @@ -1709,7 +1714,9 @@ PgSqlLeaseMgr::updateLeaseCommon(StatementIndex stindex, LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL, DHCPSRV_PGSQL_ADD_ADDR4).arg(tagged_statements[stindex].name); - PgSqlResult r(PQexecPrepared(conn_, tagged_statements[stindex].name, + PgSqlHolder& holderHandle = conn_.handle(); + + PgSqlResult r(PQexecPrepared(holderHandle, tagged_statements[stindex].name, tagged_statements[stindex].nbparams, &bind_array.values_[0], &bind_array.lengths_[0], @@ -1748,9 +1755,8 @@ PgSqlLeaseMgr::updateLease4(const Lease4Ptr& lease) { exchange4_->createBindForSend(lease, bind_array); // Set up the WHERE clause and append it to the SQL_BIND array - std::string addr4_ = boost::lexical_cast - (lease->addr_.toUint32()); - bind_array.add(addr4_); + std::string addr_str = boost::lexical_cast(lease->addr_.toUint32()); + bind_array.add(addr_str); // Drop to common update code updateLeaseCommon(stindex, bind_array, lease); @@ -1778,7 +1784,9 @@ PgSqlLeaseMgr::updateLease6(const Lease6Ptr& lease) { uint64_t PgSqlLeaseMgr::deleteLeaseCommon(StatementIndex stindex, PsqlBindArray& bind_array) { - PgSqlResult r(PQexecPrepared(conn_, tagged_statements[stindex].name, + PgSqlHolder& holderHandle = conn_.handle(); + + PgSqlResult r(PQexecPrepared(holderHandle, tagged_statements[stindex].name, tagged_statements[stindex].nbparams, &bind_array.values_[0], &bind_array.lengths_[0], @@ -1929,25 +1937,22 @@ PgSqlLeaseMgr::getVersion() const { LOG_DEBUG(dhcpsrv_logger, DHCPSRV_DBG_TRACE_DETAIL, DHCPSRV_PGSQL_GET_VERSION); + PgSqlHolder& holderHandle = conn_.handle(); const char* version_sql = "SELECT version, minor FROM schema_version;"; - PgSqlResult r(PQexec(conn_, version_sql)); + + PgSqlResult r(PQexec(holderHandle, version_sql)); if(PQresultStatus(r) != PGRES_TUPLES_OK) { isc_throw(DbOperationError, "unable to execute PostgreSQL statement <" - << version_sql << ", reason: " << PQerrorMessage(conn_)); + << version_sql << ">, reason: " << PQerrorMessage(holderHandle)); } - istringstream tmp; - uint32_t version; - tmp.str(PQgetvalue(r, 0, 0)); - tmp >> version; - tmp.str(""); - tmp.clear(); + uint32_t major; + PgSqlExchange::getColumnValue(r, 0, 0, major); uint32_t minor; - tmp.str(PQgetvalue(r, 0, 1)); - tmp >> minor; + PgSqlExchange::getColumnValue(r, 0, 1, minor); - return (make_pair(version, minor)); + return (make_pair(major, minor)); } void @@ -1960,5 +1965,5 @@ PgSqlLeaseMgr::rollback() { conn_.rollback(); } -}; // end of isc::dhcp namespace -}; // end of isc namespace +} // namespace dhcp +} // namespace isc diff --git a/src/lib/dhcpsrv/tests/pgsql_host_data_source_unittest.cc b/src/lib/dhcpsrv/tests/pgsql_host_data_source_unittest.cc index e8e149d48226c5ad97439d8f35a3686025694252..88238f0d88bb7537cf03e1c30a6ed7c9284aaabe 100644 --- a/src/lib/dhcpsrv/tests/pgsql_host_data_source_unittest.cc +++ b/src/lib/dhcpsrv/tests/pgsql_host_data_source_unittest.cc @@ -119,9 +119,11 @@ public: PgSqlConnection conn(params); conn.openDatabase(); - PgSqlResult r(PQexec(conn, query.c_str())); + PgSqlHolder& holderHandle = conn.handle(); + + PgSqlResult r(PQexec(holderHandle, query.c_str())); if (PQresultStatus(r) != PGRES_TUPLES_OK) { - isc_throw(DbOperationError, "Query failed:" << PQerrorMessage(conn)); + isc_throw(DbOperationError, "Query failed:" << PQerrorMessage(holderHandle)); } int numrows = PQntuples(r); @@ -644,9 +646,14 @@ TEST_F(PgSqlHostDataSourceTest, testAddRollback) { PgSqlConnection conn(params); ASSERT_NO_THROW(conn.openDatabase()); - PgSqlResult r(PQexec(conn, "DROP TABLE IF EXISTS ipv6_reservations")); - ASSERT_TRUE (PQresultStatus(r) == PGRES_COMMAND_OK) - << " drop command failed :" << PQerrorMessage(conn); + PgSqlHolder& holderHandle = conn.handle(); + + ConstHostCollection collection = hdsptr_->getAll4(0); + ASSERT_EQ(collection.size(), 0); + + PgSqlResult r(PQexec(holderHandle, "DROP TABLE IF EXISTS ipv6_reservations")); + ASSERT_TRUE(PQresultStatus(r) == PGRES_COMMAND_OK) + << " drop command failed :" << PQerrorMessage(holderHandle); // Create a host with a reservation. HostPtr host = HostDataSourceUtils::initializeHost6("2001:db8:1::1", diff --git a/src/lib/pgsql/pgsql_connection.cc b/src/lib/pgsql/pgsql_connection.cc index af98f098cb1468af6300cdb1743326d979fd1dc6..a2d7e6c56ed7c51cc736378981f1399446b1cf1c 100644 --- a/src/lib/pgsql/pgsql_connection.cc +++ b/src/lib/pgsql/pgsql_connection.cc @@ -37,6 +37,78 @@ const int PGSQL_DEFAULT_CONNECTION_TIMEOUT = 5; // seconds const char PgSqlConnection::DUPLICATE_KEY[] = ERRCODE_UNIQUE_VIOLATION; +void +PgSqlHolder::setConnection(PGconn* connection) { + // clear prepared statements associated to current connection + clearPrepared(); + // clear old database back-end object + if (pgsql_ != NULL) { + PQfinish(pgsql_); + } + // set new database back-end object + pgsql_ = connection; + // clear connected flag + connected_ = false; + // clear prepared flag + prepared_ = false; +} + +void +PgSqlHolder::clearPrepared() { + if (pgsql_ != NULL) { + // Deallocate the prepared queries. + if (PQstatus(pgsql_) == CONNECTION_OK) { + PgSqlResult r(PQexec(pgsql_, "DEALLOCATE all")); + if(PQresultStatus(r) != PGRES_COMMAND_OK) { + // Highly unlikely but we'll log it and go on. + DB_LOG_ERROR(PGSQL_DEALLOC_ERROR) + .arg(PQerrorMessage(pgsql_)); + } + } + } +} + +void +PgSqlHolder::openDatabase(PgSqlConnection& connection) { + // return if holder has already called openDatabase + if (connected_) { + return; + } + // set connected flag + connected_ = true; + // set prepared flag to true so that PgSqlConnection::handle() within + // openDatabase function does not call prepareStatements before opening + // the new connection + prepared_ = true; + // call openDatabase for this holder handle + connection.openDatabase(); + // set prepared flag to false so that PgSqlConnection::handle() will + // call prepareStatements for this holder handle + prepared_ = false; +} + +void +PgSqlHolder::prepareStatements(PgSqlConnection& connection) { + // return if holder has already called prepareStatemens + if (prepared_) { + return; + } + // clear previously prepared statements + clearPrepared(); + // Prepare all statements queries with all known fields datatype + for (auto it = connection.statements_.begin(); + it != connection.statements_.end(); ++it) { + PgSqlResult r(PQprepare(pgsql_, (*it)->name, (*it)->text, + (*it)->nbparams, (*it)->types)); + if (PQresultStatus(r) != PGRES_COMMAND_OK) { + isc_throw(DbOperationError, "unable to prepare PostgreSQL statement: " + << (*it)->text << ", reason: " << PQerrorMessage(pgsql_)); + } + } + // set prepared flag + prepared_ = true; +} + PgSqlResult::PgSqlResult(PGresult *result) : result_(result), rows_(0), cols_(0) { if (!result) { @@ -103,7 +175,10 @@ PgSqlTransaction::PgSqlTransaction(PgSqlConnection& conn) PgSqlTransaction::~PgSqlTransaction() { // If commit() wasn't explicitly called, rollback. if (!committed_) { - conn_.rollback(); + try { + conn_.rollback(); + } catch (...) { + } } } @@ -113,29 +188,27 @@ PgSqlTransaction::commit() { committed_ = true; } -PgSqlConnection::~PgSqlConnection() { - if (conn_) { - // Deallocate the prepared queries. - if (PQstatus(conn_) == CONNECTION_OK) { - PgSqlResult r(PQexec(conn_, "DEALLOCATE all")); - if(PQresultStatus(r) != PGRES_COMMAND_OK) { - // Highly unlikely but we'll log it and go on. - DB_LOG_ERROR(PGSQL_DEALLOC_ERROR) - .arg(PQerrorMessage(conn_)); - } - } +PgSqlHolder& +PgSqlConnection::handle() const { + thread_local std::shared_ptr result(std::make_shared()); + if (connected_) { + result->openDatabase(*(const_cast(this))); } + if (prepared_) { + result->prepareStatements(*(const_cast(this))); + } + return *result; +} + +PgSqlConnection::~PgSqlConnection() { + statements_.clear(); + handle().clear(); } void PgSqlConnection::prepareStatement(const PgSqlTaggedStatement& statement) { - // Prepare all statements queries with all known fields datatype - PgSqlResult r(PQprepare(conn_, statement.name, statement.text, - statement.nbparams, statement.types)); - if(PQresultStatus(r) != PGRES_COMMAND_OK) { - isc_throw(DbOperationError, "unable to prepare PostgreSQL statement: " - << statement.text << ", reason: " << PQerrorMessage(conn_)); - } + statements_.push_back(&statement); + prepared_ = true; } void @@ -276,7 +349,10 @@ PgSqlConnection::openDatabase() { } // We have a valid connection, so let's save it to our holder - conn_.setConnection(new_conn); + PgSqlHolder& holderHandle = handle(); + holderHandle.setConnection(new_conn); + holderHandle.connected_ = true; + connected_ = true; } bool @@ -296,6 +372,9 @@ PgSqlConnection::checkStatementError(const PgSqlResult& r, // error class. Note, there is a severity field, but it can be // misleadingly returned as fatal. However, a loss of connectivity // can lead to a NULL sqlstate with a status of PGRES_FATAL_ERROR. + + PgSqlHolder& holderHandle = handle(); + const char* sqlstate = PQresultErrorField(r, PG_DIAG_SQLSTATE); if ((sqlstate == NULL) || ((memcmp(sqlstate, "08", 2) == 0) || // Connection Exception @@ -305,7 +384,7 @@ PgSqlConnection::checkStatementError(const PgSqlResult& r, (memcmp(sqlstate, "58", 2) == 0))) { // System error DB_LOG_ERROR(PGSQL_FATAL_ERROR) .arg(statement.name) - .arg(PQerrorMessage(conn_)) + .arg(PQerrorMessage(holderHandle)) .arg(sqlstate ? sqlstate : ""); // If there's no lost db callback or it returns false, @@ -321,7 +400,7 @@ PgSqlConnection::checkStatementError(const PgSqlResult& r, } // Apparently it wasn't fatal, so we throw with a helpful message. - const char* error_message = PQerrorMessage(conn_); + const char* error_message = PQerrorMessage(holderHandle); isc_throw(DbOperationError, "Statement exec failed:" << " for: " << statement.name << ", status: " << s << "sqlstate:[ " << (sqlstate ? sqlstate : "") @@ -332,9 +411,12 @@ PgSqlConnection::checkStatementError(const PgSqlResult& r, void PgSqlConnection::startTransaction() { DB_LOG_DEBUG(DB_DBG_TRACE_DETAIL, PGSQL_START_TRANSACTION); - PgSqlResult r(PQexec(conn_, "START TRANSACTION")); + + PgSqlHolder& holderHandle = handle(); + + PgSqlResult r(PQexec(holderHandle, "START TRANSACTION")); if (PQresultStatus(r) != PGRES_COMMAND_OK) { - const char* error_message = PQerrorMessage(conn_); + const char* error_message = PQerrorMessage(holderHandle); isc_throw(DbOperationError, "unable to start transaction" << error_message); } @@ -343,9 +425,12 @@ PgSqlConnection::startTransaction() { void PgSqlConnection::commit() { DB_LOG_DEBUG(DB_DBG_TRACE_DETAIL, PGSQL_COMMIT); - PgSqlResult r(PQexec(conn_, "COMMIT")); + + PgSqlHolder& holderHandle = handle(); + + PgSqlResult r(PQexec(holderHandle, "COMMIT")); if (PQresultStatus(r) != PGRES_COMMAND_OK) { - const char* error_message = PQerrorMessage(conn_); + const char* error_message = PQerrorMessage(holderHandle); isc_throw(DbOperationError, "commit failed: " << error_message); } } @@ -353,12 +438,16 @@ PgSqlConnection::commit() { void PgSqlConnection::rollback() { DB_LOG_DEBUG(DB_DBG_TRACE_DETAIL, PGSQL_ROLLBACK); - PgSqlResult r(PQexec(conn_, "ROLLBACK")); + + PgSqlHolder& holderHandle = handle(); + + PgSqlResult r(PQexec(holderHandle, "ROLLBACK")); if (PQresultStatus(r) != PGRES_COMMAND_OK) { - const char* error_message = PQerrorMessage(conn_); + const char* error_message = PQerrorMessage(holderHandle); isc_throw(DbOperationError, "rollback failed: " << error_message); } } -}; // end of isc::db namespace -}; // end of isc namespace +} // namespace db +} // namespace isc + diff --git a/src/lib/pgsql/pgsql_connection.h b/src/lib/pgsql/pgsql_connection.h index 339ec7eefccc6d15f25497322db486184878fa1a..c344f71d59a5810010290624b934edfc16764bb0 100644 --- a/src/lib/pgsql/pgsql_connection.h +++ b/src/lib/pgsql/pgsql_connection.h @@ -7,6 +7,7 @@ #define PGSQL_CONNECTION_H #include +#include #include #include @@ -160,11 +161,13 @@ public: } private: - PGresult* result_; ///< Result set to be freed - int rows_; ///< Number of rows in the result set - int cols_; ///< Number of columns in the result set + PGresult* result_; ///< Result set to be freed + int rows_; ///< Number of rows in the result set + int cols_; ///< Number of columns in the result set }; +/// @brief Forward declaration to @ref PgSqlConnection. +class PgSqlConnection; /// @brief Postgresql connection handle Holder /// @@ -179,57 +182,70 @@ private: /// For this reason, the class is declared noncopyable. class PgSqlHolder : public boost::noncopyable { public: - /// @brief Constructor /// - /// Sets the Postgresql API connector handle to NULL. - /// - PgSqlHolder() : pgconn_(NULL) { + /// Sets the PgSql API connector handle to NULL. + PgSqlHolder() : connected_(false), prepared_(false), pgsql_(NULL) { } /// @brief Destructor /// - /// Frees up resources allocated by the connection. + /// Frees up resources allocated by the connection holder. ~PgSqlHolder() { - if (pgconn_ != NULL) { - PQfinish(pgconn_); - } + clear(); + } + + /// @brief Clear all resources + /// + /// Clear all resources. + void clear() { + setConnection(NULL); } + /// @brief Clear prepared statements + /// + /// Clear prepared statements. + void clearPrepared(); + /// @brief Sets the connection to the value given /// - /// @param connection - pointer to the Postgresql connection instance - void setConnection(PGconn* connection) { - if (pgconn_ != NULL) { - // Already set? Release the current connection first. - // Maybe this should be an error instead? - PQfinish(pgconn_); - } + /// Sets the database back-end object. + /// + /// @param connection - pointer to the PgSql connection instance + void setConnection(PGconn* connection); - pgconn_ = connection; - } + /// @brief Open database + /// + /// Open database and apply PgSql connection parameters. + /// + /// @param connection - associated connection which holds connection properties. + void openDatabase(PgSqlConnection& connection); + + /// @brief Prepare statements + /// + /// Prepare statements. + /// + /// @param connection - associated connection which holds the text statements. + void prepareStatements(PgSqlConnection& connection); /// @brief Conversion Operator /// /// Allows the PgSqlHolder object to be passed as the context argument to /// PQxxxx functions. operator PGconn*() const { - return (pgconn_); + return (pgsql_); } - /// @brief Boolean Operator - /// - /// Allows testing the connection for emptiness: "if (holder)" - operator bool() const { - return (pgconn_); - } + /// @brief The connected flag + bool connected_; ///< Flag to indicate openDatabase has been called private: - PGconn* pgconn_; ///< Postgresql connection -}; + /// @brief The prepared flag + bool prepared_; ///< Flag to indicate prepareStatements has been called -/// @brief Forward declaration to @ref PgSqlConnection. -class PgSqlConnection; + /// @brief The PgSql database back-end object associated to this holder + PGconn* pgsql_; ///< Postgresql connection +}; /// @brief RAII object representing a PostgreSQL transaction. /// @@ -304,8 +320,8 @@ public: /// @brief Constructor /// /// Initialize PgSqlConnection object with parameters needed for connection. - PgSqlConnection(const ParameterMap& parameters) - : DatabaseConnection(parameters) { + PgSqlConnection(const ParameterMap& parameters) : + DatabaseConnection(parameters), connected_(false), prepared_(false) { } /// @brief Destructor @@ -400,30 +416,24 @@ public: void checkStatementError(const PgSqlResult& r, PgSqlTaggedStatement& statement) const; - /// @brief PgSql connection handle + /// @brief Raw statements /// - /// This field is public, because it is used heavily from PgSqlLeaseMgr + /// This field is public, because it is used heavily from PgSqlConnection /// and from PgSqlHostDataSource. - PgSqlHolder conn_; - - /// @brief Conversion Operator - /// - /// Allows the PgConnection object to be passed as the context argument to - /// PQxxxx functions. - operator PGconn*() const { - return (conn_); - } + std::vector statements_; - /// @brief Boolean Operator + /// @brief PgSql connection handle /// - /// Allows testing the PgConnection for initialized connection - operator bool() const { - return (conn_); - } + /// This field is public, because it is used heavily from PgSqlLeaseMgr + /// and from PgSqlHostDataSource. + PgSqlHolder& handle() const; +private: + bool connected_; ///< Flag to indicate openDatabase has been called + bool prepared_; ///< Flag to indicate prepareStatements has been called }; -}; // end of isc::db namespace -}; // end of isc namespace +} // namespace db +} // namespace isc #endif // PGSQL_CONNECTION_H diff --git a/src/lib/pgsql/pgsql_exchange.h b/src/lib/pgsql/pgsql_exchange.h index 4aa82b22357491dbf0194dc39b5c998b5c45be34..fd9518a00fa2f2353137514c6386a4bd00d59cca 100644 --- a/src/lib/pgsql/pgsql_exchange.h +++ b/src/lib/pgsql/pgsql_exchange.h @@ -70,7 +70,6 @@ struct PsqlBindArray { /// @return Returns true if there are no entries in the array, false /// otherwise. bool empty() const { - return (values_.empty()); } @@ -393,7 +392,7 @@ public: protected: /// @brief Stores text labels for columns, currently only used for /// logging and errors. - std::vectorcolumns_; + std::vector columns_; }; }; // end of isc::db namespace diff --git a/src/lib/pgsql/tests/pgsql_exchange_unittest.cc b/src/lib/pgsql/tests/pgsql_exchange_unittest.cc index 173665ac94c3e768c4914527b6cb811a8749bef2..a1f597c3d7089360bd6ef0fbe07f4a6544df8927 100644 --- a/src/lib/pgsql/tests/pgsql_exchange_unittest.cc +++ b/src/lib/pgsql/tests/pgsql_exchange_unittest.cc @@ -198,18 +198,22 @@ public: " varchar_col VARCHAR(255) " "); "; - PgSqlResult r(PQexec(*conn_, sql)); + PgSqlHolder& holderHandle = conn_->handle(); + + PgSqlResult r(PQexec(holderHandle, sql)); ASSERT_EQ(PQresultStatus(r), PGRES_COMMAND_OK) - << " create basics table failed: " << PQerrorMessage(*conn_); + << " create basics table failed: " << PQerrorMessage(holderHandle); } /// @brief Destroys the basics table /// Asserts if the destruction fails void destroySchema() { if (conn_) { - PgSqlResult r(PQexec(*conn_, "DROP TABLE IF EXISTS basics;")); + PgSqlHolder& holderHandle = conn_->handle(); + + PgSqlResult r(PQexec(holderHandle, "DROP TABLE IF EXISTS basics;")); ASSERT_EQ(PQresultStatus(r), PGRES_COMMAND_OK) - << " drop basics table failed: " << PQerrorMessage(*conn_); + << " drop basics table failed: " << PQerrorMessage(holderHandle); } } @@ -227,10 +231,12 @@ public: /// Asserts if the result set status does not equal the expected outcome. void runSql(PgSqlResultPtr& r, const std::string& sql, int exp_outcome, int lineno) { - r.reset(new PgSqlResult(PQexec(*conn_, sql.c_str()))); + PgSqlHolder& holderHandle = conn_->handle(); + + r.reset(new PgSqlResult(PQexec(holderHandle, sql.c_str()))); ASSERT_EQ(PQresultStatus(*r), exp_outcome) << " runSql at line: " << lineno << " failed, sql:[" << sql - << "]\n reason: " << PQerrorMessage(*conn_); + << "]\n reason: " << PQerrorMessage(holderHandle); } /// @brief Executes a SQL statement and tests for an expected outcome @@ -250,7 +256,9 @@ public: PgSqlTaggedStatement& statement, PsqlBindArrayPtr bind_array, int exp_outcome, int lineno) { - r.reset(new PgSqlResult(PQexecPrepared(*conn_, statement.name, + PgSqlHolder& holderHandle = conn_->handle(); + + r.reset(new PgSqlResult(PQexecPrepared(holderHandle, statement.name, statement.nbparams, &bind_array->values_[0], &bind_array->lengths_[0], @@ -258,7 +266,7 @@ public: ASSERT_EQ(PQresultStatus(*r), exp_outcome) << " runPreparedStatement at line: " << lineno << " statement name:[" << statement.name - << "]\n reason: " << PQerrorMessage(*conn_); + << "]\n reason: " << PQerrorMessage(holderHandle); } /// @brief Fetches all of the rows currently in the table diff --git a/src/lib/util/Makefile.am b/src/lib/util/Makefile.am index cb643556d9d1d8cf4e88dbd7599e5976fc84cb9d..0f0e529a17e7972ed0dc1470e7fb8c53b6764812 100644 --- a/src/lib/util/Makefile.am +++ b/src/lib/util/Makefile.am @@ -28,6 +28,7 @@ libkea_util_la_SOURCES += state_model.cc state_model.h libkea_util_la_SOURCES += stopwatch.cc stopwatch.h libkea_util_la_SOURCES += stopwatch_impl.cc stopwatch_impl.h libkea_util_la_SOURCES += strutil.h strutil.cc +libkea_util_la_SOURCES += thread_resource.h libkea_util_la_SOURCES += time_utilities.h time_utilities.cc libkea_util_la_SOURCES += versioned_csv_file.h versioned_csv_file.cc libkea_util_la_SOURCES += watch_socket.cc watch_socket.h diff --git a/src/lib/util/tests/Makefile.am b/src/lib/util/tests/Makefile.am index 68b3e31ea39fbd6e6079de685ab9db471f2831a9..ce86032a402fc8d71ad0875095cb4ede0734b062 100644 --- a/src/lib/util/tests/Makefile.am +++ b/src/lib/util/tests/Makefile.am @@ -53,6 +53,7 @@ run_unittests_SOURCES += random_number_generator_unittest.cc run_unittests_SOURCES += staged_value_unittest.cc run_unittests_SOURCES += state_model_unittest.cc run_unittests_SOURCES += strutil_unittest.cc +run_unittests_SOURCES += thread_resource_unittest.cc run_unittests_SOURCES += time_utilities_unittest.cc run_unittests_SOURCES += range_utilities_unittest.cc run_unittests_SOURCES += signal_set_unittest.cc diff --git a/src/lib/util/tests/thread_resource_unittest.cc b/src/lib/util/tests/thread_resource_unittest.cc new file mode 100644 index 0000000000000000000000000000000000000000..4977ced90e3ba05bdda22ef084345167a1d6a2a7 --- /dev/null +++ b/src/lib/util/tests/thread_resource_unittest.cc @@ -0,0 +1,360 @@ +// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include + +#include + +#include + +#include + +#include +#include +#include +#include + +using namespace isc::dhcp; +using namespace std; + +namespace { + +/// @brief test class to keep track of all constructed objects of a specific +/// class type +/// +/// @template parameter class to make this functionality available for a wide +/// range of 'similar' but distinct classes +template +class Resource : public boost::noncopyable { +public: + /// @brief Constructor + Resource() : data_() { + lock_guard lk(mutex_); + // increase current number of instances of this class + Resource::count_++; + // increase the total number of instances ever created + Resource::created_count_++; + // check that this instance is not found in the verification set + EXPECT_TRUE(Resource::set_.find(&data_) == Resource::set_.end()); + // add this instance to the verification set + Resource::set_.emplace(&data_); + } + + /// @brief Destructor + virtual ~Resource() { + lock_guard lk(mutex_); + // decrease current number of instances of this class + Resource::count_--; + // increase the total number of instances ever destroyed + Resource::destroyed_count_++; + // check that this instance is found in the verification set + EXPECT_FALSE(Resource::set_.find(&data_) == Resource::set_.end()); + // remove this instance from the verification set + Resource::set_.erase(&data_); + } + + /// @brief count number of current allocated instances of the class + /// + /// @return number of current allocated instances of the class + static uint32_t count() { + lock_guard lk(mutex_); + return Resource::count_; + } + + /// @brief count number of class instances ever created + /// + /// @return number of class instances ever created + static uint32_t createdCount() { + lock_guard lk(mutex_); + return Resource::created_count_; + } + + /// @brief count number of class instances ever destroyed + /// + /// @return number of class instances ever destroyed + static uint32_t destroyedCount() { + lock_guard lk(mutex_); + return Resource::destroyed_count_; + } + + /// @brief reset all statistics for this class + static void reset() { + lock_guard lk(mutex_); + // reset all statistics for this class + Resource::count_ = 0; + Resource::created_count_ = 0; + Resource::destroyed_count_ = 0; + Resource::set_.clear(); + } + +private: + /// @brief data element + T data_; + + /// @brief total number of instances at any given time + static uint32_t count_; + + /// @brief total number of instances ever created + static uint32_t created_count_; + + /// @brief total number of instances ever destroyed + static uint32_t destroyed_count_; + + /// @brief mutex used to keep the internal state consistent + static std::mutex mutex_; + + /// @brief set to hold the distinct identification data of each instance + static std::set set_; +}; + +template +uint32_t Resource::count_; +template +uint32_t Resource::created_count_; +template +uint32_t Resource::destroyed_count_; +template +std::mutex Resource::mutex_; +template +std::set Resource::set_; + +/// @brief Test Fixture for testing isc::dhcp::ThreadResource +class ThreadResourceTest : public ::testing::Test { +public: + /// @brief Constructor + ThreadResourceTest() : wait_thread_(false), wait_(false) { + } + + /// @brief Destructor + ~ThreadResourceTest() { + } + + /// @brief flag which indicates if main thread should wait for the test + /// thread to start + /// + /// @return the wait flag + bool waitThread() { + return wait_thread_; + } + + /// @brief flag which indicates if working thread should wait for main + /// thread signal + /// + /// @return the wait flag + bool waitMain() { + return wait_; + } + + /// @brief block main thread until testing thread has processed the task + void wait() { + unique_lock lck(mutex_); + // wait for the testing thread to process + cv_.wait(lck, [&]{ return (waitThread() == false); }); + } + + /// @brief function used by main thread to unblock processing threads + void signalThreads() { + lock_guard lk(wait_mutex_); + // clear the wait flag so that threads will no longer wait for the main + // thread signal + wait_ = false; + // wake all threads if waiting for main thread signal + wait_cv_.notify_all(); + } + + /// @brief reset resource for the specific class type and perform sanity + /// checks, then reset the wait flag so threads wait for the main thread + /// signal to exit + template + void reset() { + // reset the resource + get() = make_shared>>(); + // perform sanity checks + sanityCheck(); + // reset the wait flag + wait_ = true; + } + + /// @brief reset wait thread flag + void resetWaitThread() { + wait_thread_ = true; + } + + /// @brief check statistics + /// + /// @param expected_count check equality of this value with the number of + /// class instances + /// @param expected_created check equality of this value with the number of + /// class instances ever created + /// @param expected_destroyed check equality of this value with the number + /// of class instances ever destroyed + template + void checkInstances(uint32_t expected_count, + uint32_t expected_created, + uint32_t expected_destroyed) { + ASSERT_EQ(Resource::count(), expected_count); + ASSERT_EQ(Resource::createdCount(), expected_created); + ASSERT_EQ(Resource::destroyedCount(), expected_destroyed); + } + + /// @brief get the instance of the resource responsible for a specific class + /// type + /// + /// @return the resource responsible for a specific class type + template + shared_ptr>> &get() { + static shared_ptr>> container; + return container; + } + + /// @brief run function which accesses the resource allocated for the + /// calling thread and verifies the class statistics + /// @param expected_count check equality of this value with the number of + /// class instances + /// @param expected_created check equality of this value with the number of + /// class instances ever created + /// @param expected_destroyed check equality of this value with the number + /// of class instances ever destroyed + /// @param signal indicate if the function should wait for signal from main + /// thread or exit immediately + template + void run(uint32_t expected_count, + uint32_t expected_created, + uint32_t expected_destroyed, + bool signal = false) { + // get resource for this thread + auto left = get()->resource().get(); + // verify statistics + checkInstances(expected_count, expected_created, expected_destroyed); + // get the resource for this thread once more + auto right = get()->resource().get(); + // check that it is the same resource + ASSERT_EQ(left, right); + // verify statistics which should have not changed on multiple + // sequential requests for the same resource + checkInstances(expected_count, expected_created, expected_destroyed); + + { + // make sure this thread has started + lock_guard lk(mutex_); + // reset wait thread flag + wait_thread_ = false; + // wake main thread if it is waiting for this thread to process + cv_.notify_all(); + } + + if (signal) { + unique_lock lk(wait_mutex_); + // if specified, wait for signal from main thread + wait_cv_.wait(lk, [&]{ return (waitMain() == false); }); + } + } + +private: + /// @brief sanity check that the number of created instances is equal to the + /// number of destroyed instances + template + void sanityCheck() { + // the number of created instances should match the number of destroyed + // instances + ASSERT_EQ(Resource::createdCount(), Resource::destroyedCount()); + } + + /// @brief mutex used to keep the internal state consistent + std::mutex mutex_; + + /// @brief condition variable used to signal main thread that test thread + /// has started processing + condition_variable cv_; + + /// @brief mutex used to keep the internal state consistent + /// related to the control of the main thread over the working threads exit + std::mutex wait_mutex_; + + /// @brief condition variable used to signal working threads to exit + condition_variable wait_cv_; + + /// @brief flag which indicates if main thread should wait for test thread + /// to start + bool wait_thread_; + + /// @brief flag which indicates if working thread should wait for main + /// thread signal + bool wait_; +}; + +/// @brief This test verifies that each thread can access it's own allocated +/// resource. Multiple threads are created and run in parallel. The checks are +/// done while threads are still running. +/// It is very important for the threads to run in parallel and not just run and +/// join the thread as this will cause newer threads to use the old thread id +/// and receive the same resource. +/// If destroying threads, the resource should also be reset. +TEST_F(ThreadResourceTest, testThreadResources) { + std::list> threads; + + // reset statistics for uint_32 type + reset(); + // call run function on main thread and verify statistics + run(1, 1, 0); + // configure wait for test thread + resetWaitThread(); + // call run on a different thread and verify statistics + threads.push_back(std::make_shared(std::bind( + &ThreadResourceTest::run, this, 2, 2, 0, true))); + // wait for the thread to process + wait(); + // configure wait for test thread + resetWaitThread(); + // call run again on a different thread and verify statistics + threads.push_back(std::make_shared(std::bind( + &ThreadResourceTest::run, this, 3, 3, 0, true))); + // wait for the thread to process + wait(); + // signal all threads + signalThreads(); + // wait for all threads to finish + for (auto &thread : threads) { + thread->join(); + } + // reset statistics for uint_32 type + reset(); + // verify statistics 0 instances, 3 created, 3 destroyed + checkInstances(0, 3, 3); + + threads.clear(); + + // reset statistics for bool type + reset(); + // call run function on main thread and verify statistics + run(1, 1, 0); + // configure wait for test thread + resetWaitThread(); + // call run on a different thread and verify statistics + threads.push_back(std::make_shared(std::bind( + &ThreadResourceTest::run, this, 2, 2, 0, true))); + // wait for the thread to process + wait(); + // configure wait for test thread + resetWaitThread(); + // call run again on a different thread and verify statistics + threads.push_back(std::make_shared(std::bind( + &ThreadResourceTest::run, this, 3, 3, 0, true))); + // wait for the thread to process + wait(); + // signal all threads + signalThreads(); + // wait for all threads to finish + for (auto &thread : threads) { + thread->join(); + } + // reset statistics for bool type + reset(); + // verify statistics 0 instances, 3 created, 3 destroyed + checkInstances(0, 3, 3); +} + +} // namespace diff --git a/src/lib/util/thread_resource.h b/src/lib/util/thread_resource.h new file mode 100644 index 0000000000000000000000000000000000000000..9a8e44a6049f15e6e62af514e96deb6237580760 --- /dev/null +++ b/src/lib/util/thread_resource.h @@ -0,0 +1,49 @@ +// Copyright (C) 2019 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#ifndef THREAD_RESOURCE_H +#define THREAD_RESOURCE_H + +#include +#include +#include +#include + +namespace isc { +namespace dhcp { + +template +class ThreadResource { + typedef std::shared_ptr ResourcePtr; +public: + /// @brief function to retrieve the specific resource of calling thread + /// This function returns the resource of the calling thread from the map + /// container or, in case it is not found, it creates a resource and adds it + /// to the map container + /// + /// @return the specific resource of the calling thread + ResourcePtr resource() { + std::lock_guard lock(mutex_); + auto id = std::this_thread::get_id(); + if (map_.find(id) != map_.end()) { + return map_[id]; + } + ResourcePtr result(std::make_shared()); + map_[id] = result; + return result; + } +private: + /// @brief mutex used to keep the internal state consistent + std::mutex mutex_; + + /// @brief map container which holds the resources for each thread + std::unordered_map map_; +}; + +} // namespace dhcp +} // namespace isc + +#endif // THREAD_RESOURCE_H