Commit 58c93bf0 authored by Ondřej Surý


Merge branch '1428-possible-data-race-in-rbtdb-happens-occasionally-on-ppc64le-v9_14-v9_11' into 'v9_11'

Resolve "Possible data race in rbtdb, happens occasionally on ppc64le"

See merge request !3037
parents 4e739538 f71a8d11
Pipeline #33681 passed with stages in 1 minute and 28 seconds
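The race being fixed: several rwlock counters (write_requests, write_completions, cnt_and_flag, write_granted, spins) were updated atomically on some paths but read or written as plain integers on others. That is a C11 data race, and on weakly ordered CPUs such as ppc64le it can surface as stale or torn values. The patch makes every such field atomic and routes all access through a small rwl_load/rwl_store/rwl_add/rwl_sub/rwl_cmpxchg wrapper family. A minimal sketch of the before/after pattern, with illustrative names rather than code from the patch:

	#include <stdatomic.h>
	#include <stdint.h>

	static atomic_int_fast32_t write_requests;

	static void
	writer(void) {
		/* Updates like this were already atomic before the patch. */
		atomic_fetch_add(&write_requests, 1);
	}

	static int_fast32_t
	reader(void) {
		/* The old code read such counters as plain integers
		 * (rwl->write_requests != rwl->write_completions), a data
		 * race against writer().  The fix routes every read through
		 * an atomic load, which is what rwl_load() expands to. */
		return (atomic_load(&write_requests));
	}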
@@ -52,7 +52,6 @@ struct isc_rwlock {
/* Unlocked. */
unsigned int magic;
isc_mutex_t lock;
int32_t spins;
#if defined(ISC_RWLOCK_USEATOMIC)
/*
@@ -70,13 +69,17 @@ struct isc_rwlock {
/* Read or modified atomically. */
#if defined(ISC_RWLOCK_USESTDATOMIC)
atomic_int_fast32_t spins;
atomic_int_fast32_t write_requests;
atomic_int_fast32_t write_completions;
atomic_int_fast32_t cnt_and_flag;
atomic_int_fast32_t write_granted;
#else
int32_t spins;
int32_t write_requests;
int32_t write_completions;
int32_t cnt_and_flag;
int32_t write_granted;
#endif
/* Locked by lock. */
@@ -84,9 +87,6 @@ struct isc_rwlock {
isc_condition_t writeable;
unsigned int readers_waiting;
/* Locked by rwlock itself. */
unsigned int write_granted;
/* Unlocked. */
unsigned int write_quota;
@@ -107,6 +107,7 @@ struct isc_rwlock {
*/
unsigned int granted;
unsigned int spins;
unsigned int readers_waiting;
unsigned int writers_waiting;
unsigned int read_quota;
@@ -31,6 +31,37 @@
#ifdef ISC_PLATFORM_USETHREADS
#if defined(ISC_RWLOCK_USESTDATOMIC)
#define rwl_init(o, a) atomic_init(&rwl->o, a)
#define rwl_load(o) atomic_load(&rwl->o)
#define rwl_store(o, a) atomic_store(&rwl->o, (a))
#define rwl_add(o, a) atomic_fetch_add(&rwl->o, (a))
#define rwl_sub(o, a) atomic_fetch_sub(&rwl->o, (a))
#define rwl_cmpxchg(o, e, d) atomic_compare_exchange_strong(&rwl->o, &(e), (d))
#elif defined(ISC_RWLOCK_USEATOMIC)
#define rwl_init(o, a) rwl->o = (a);
#define rwl_load(o) isc_atomic_xadd(&rwl->o, 0)
#define rwl_store(o, a) isc_atomic_store(&rwl->o, (a))
#define rwl_add(o, a) isc_atomic_xadd(&rwl->o, (a))
#define rwl_sub(o, a) isc_atomic_xadd(&rwl->o, -(a))
#define rwl_cmpxchg(o, e, d) e = isc_atomic_cmpxchg(&rwl->o, e, d)
#else
#define rwl_init(o, a) rwl->o = (a)
#define rwl_load(o) (rwl->o)
#define rwl_store(o, a) rwl->o = (a)
#define rwl_add(o, a) rwl->o += (a)
#define rwl_sub(o, a) rwl->o -= (a)
#define rwl_cmpxchg(o, e, d) \
if (rwl->o == e) { e = rwl->o; rwl->o = d; } else { e = rwl->o; }
#endif
#ifndef RWLOCK_DEFAULT_READ_QUOTA
#define RWLOCK_DEFAULT_READ_QUOTA 4
#endif
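For illustration, with ISC_RWLOCK_USESTDATOMIC defined a guard such as

	if (rwl_load(write_requests) != rwl_load(write_completions))

preprocesses to

	if (atomic_load(&rwl->write_requests) != atomic_load(&rwl->write_completions))

The ISC_RWLOCK_USEATOMIC fallback gets an atomic read from isc_atomic_xadd(&rwl->write_requests, 0), an atomic add of zero standing in for an atomic load, and the plain branch compiles to direct member access for builds without threads.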
@@ -68,9 +99,9 @@ print_lock(const char *operation, isc_rwlock_t *rwl, isc_rwlocktype_t type) {
ISC_MSG_READ, "read") :
isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
ISC_MSG_WRITE, "write")),
rwl->write_requests, rwl->write_completions,
rwl->cnt_and_flag, rwl->readers_waiting,
rwl->write_granted, rwl->write_quota);
rwl_load(write_requests), rwl_load(write_completions),
rwl_load(cnt_and_flag), rwl->readers_waiting,
rwl_load(write_granted), rwl->write_quota);
#else
fprintf(stderr,
isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
@@ -108,19 +139,21 @@ isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
*/
rwl->magic = 0;
rwl->spins = 0;
rwl_init(spins, 0);
#if defined(ISC_RWLOCK_USEATOMIC)
rwl->write_requests = 0;
rwl->write_completions = 0;
rwl->cnt_and_flag = 0;
rwl_init(write_requests, 0);
rwl_init(write_completions, 0);
rwl_init(cnt_and_flag, 0);
rwl_init(write_granted, 0);
rwl->readers_waiting = 0;
rwl->write_granted = 0;
if (read_quota != 0) {
UNEXPECTED_ERROR(__FILE__, __LINE__,
"read quota is not supported");
}
if (write_quota == 0)
if (write_quota == 0) {
write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
}
rwl->write_quota = write_quota;
#else
rwl->type = isc_rwlocktype_read;
@@ -179,8 +212,8 @@ isc_rwlock_destroy(isc_rwlock_t *rwl) {
REQUIRE(VALID_RWLOCK(rwl));
#if defined(ISC_RWLOCK_USEATOMIC)
REQUIRE(rwl->write_requests == rwl->write_completions &&
rwl->cnt_and_flag == 0 && rwl->readers_waiting == 0);
REQUIRE(rwl_load(write_requests) == rwl_load(write_completions) &&
rwl_load(cnt_and_flag) == 0 && rwl->readers_waiting == 0);
#else
LOCK(&rwl->lock);
REQUIRE(rwl->active == 0 &&
@@ -264,7 +297,7 @@ isc_rwlock_destroy(isc_rwlock_t *rwl) {
static isc_result_t
isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
int32_t cntflag;
int_fast32_t cntflag;
REQUIRE(VALID_RWLOCK(rwl));
@@ -274,10 +307,12 @@ isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
#endif
if (type == isc_rwlocktype_read) {
if (rwl->write_requests != rwl->write_completions) {
if (rwl_load(write_requests) != rwl_load(write_completions)) {
/* there is a waiting or active writer */
LOCK(&rwl->lock);
if (rwl->write_requests != rwl->write_completions) {
if (rwl_load(write_requests) !=
rwl_load(write_completions))
{
rwl->readers_waiting++;
WAIT(&rwl->readable, &rwl->lock);
rwl->readers_waiting--;
@@ -285,23 +320,20 @@ isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
UNLOCK(&rwl->lock);
}
#if defined(ISC_RWLOCK_USESTDATOMIC)
cntflag = atomic_fetch_add_explicit(&rwl->cnt_and_flag,
READER_INCR,
memory_order_relaxed);
#else
cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
#endif
cntflag = rwl_add(cnt_and_flag, READER_INCR);
POST(cntflag);
while (1) {
if ((rwl->cnt_and_flag & WRITER_ACTIVE) == 0)
if ((rwl_load(cnt_and_flag) & WRITER_ACTIVE) == 0) {
break;
}
/* A writer is still working */
LOCK(&rwl->lock);
rwl->readers_waiting++;
if ((rwl->cnt_and_flag & WRITER_ACTIVE) != 0)
if ((rwl_load(cnt_and_flag) & WRITER_ACTIVE) != 0) {
WAIT(&rwl->readable, &rwl->lock);
}
rwl->readers_waiting--;
UNLOCK(&rwl->lock);
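The loop above works because cnt_and_flag packs two things into one atomic word: a writer-active flag in the low bit and the reader count in the remaining bits, each reader adding READER_INCR. A self-contained sketch of that encoding; the concrete WRITER_ACTIVE and READER_INCR values are assumptions here, since their definitions sit outside this diff:

	#include <assert.h>
	#include <stdint.h>

	/* Assumed values; the real definitions live elsewhere in this file. */
	#define WRITER_ACTIVE 0x1
	#define READER_INCR   0x2

	int
	main(void) {
		int_fast32_t cnt_and_flag = 0;

		cnt_and_flag += READER_INCR;		/* reader 1 enters */
		cnt_and_flag += READER_INCR;		/* reader 2 enters */
		assert((cnt_and_flag & WRITER_ACTIVE) == 0);	/* no writer */
		assert((cnt_and_flag & ~WRITER_ACTIVE) != 0);	/* readers present */

		cnt_and_flag -= 2 * READER_INCR;	/* both readers leave */
		cnt_and_flag |= WRITER_ACTIVE;		/* a writer claims the lock */
		assert(cnt_and_flag == WRITER_ACTIVE);
		return (0);
	}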
@@ -336,20 +368,15 @@ isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
* quota, reset the condition (race among readers doesn't
* matter).
*/
rwl->write_granted = 0;
rwl_store(write_granted, 0);
} else {
int32_t prev_writer;
int_fast32_t prev_writer;
/* enter the waiting queue, and wait for our turn */
#if defined(ISC_RWLOCK_USESTDATOMIC)
prev_writer = atomic_fetch_add_explicit(&rwl->write_requests, 1,
memory_order_relaxed);
#else
prev_writer = isc_atomic_xadd(&rwl->write_requests, 1);
#endif
while (rwl->write_completions != prev_writer) {
prev_writer = rwl_add(write_requests, 1);
while (rwl_load(write_completions) != prev_writer) {
LOCK(&rwl->lock);
if (rwl->write_completions != prev_writer) {
if (rwl_load(write_completions) != prev_writer) {
WAIT(&rwl->writeable, &rwl->lock);
UNLOCK(&rwl->lock);
continue;
@@ -359,29 +386,22 @@ isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
}
while (1) {
#if defined(ISC_RWLOCK_USESTDATOMIC)
int_fast32_t cntflag2 = 0;
atomic_compare_exchange_strong_explicit
(&rwl->cnt_and_flag, &cntflag2, WRITER_ACTIVE,
memory_order_relaxed, memory_order_relaxed);
#else
int32_t cntflag2;
cntflag2 = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
WRITER_ACTIVE);
#endif
rwl_cmpxchg(cnt_and_flag, cntflag2, WRITER_ACTIVE);
if (cntflag2 == 0)
break;
/* Another active reader or writer is working. */
LOCK(&rwl->lock);
if (rwl->cnt_and_flag != 0)
if (rwl_load(cnt_and_flag) != 0) {
WAIT(&rwl->writeable, &rwl->lock);
}
UNLOCK(&rwl->lock);
}
INSIST((rwl->cnt_and_flag & WRITER_ACTIVE) != 0);
rwl->write_granted++;
INSIST((rwl_load(cnt_and_flag) & WRITER_ACTIVE) != 0);
rwl_add(write_granted, 1);
}
#ifdef ISC_RWLOCK_TRACE
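The writer path above is a ticket lock: rwl_add(write_requests, 1) returns the previous value as this writer's ticket, and the writer proceeds only once write_completions has caught up to it, which gives FIFO ordering among writers. (isc_rwlock_trylock() and isc_rwlock_tryupgrade() below deliberately jump this queue by decrementing write_completions.) A stripped-down sketch with illustrative names, not code from the patch:

	#include <stdatomic.h>
	#include <stdint.h>

	static atomic_int_fast32_t write_requests;	/* next ticket to hand out */
	static atomic_int_fast32_t write_completions;	/* ticket now being served */

	static void
	writer_enter(void) {
		int_fast32_t ticket = atomic_fetch_add(&write_requests, 1);
		while (atomic_load(&write_completions) != ticket) {
			/* the real code WAITs on rwl->writeable, not a spin */
		}
	}

	static void
	writer_exit(void) {
		(void)atomic_fetch_add(&write_completions, 1);
	}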
@@ -394,8 +414,8 @@ isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
int32_t cnt = 0;
int32_t max_cnt = rwl->spins * 2 + 10;
int_fast32_t cnt = 0;
int_fast32_t max_cnt = rwl_load(spins) * 2 + 10;
isc_result_t result = ISC_R_SUCCESS;
if (max_cnt > RWLOCK_MAX_ADAPTIVE_COUNT)
@@ -411,14 +431,14 @@ isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
#endif
} while (isc_rwlock_trylock(rwl, type) != ISC_R_SUCCESS);
rwl->spins += (cnt - rwl->spins) / 8;
rwl_add(spins, (cnt - rwl_load(spins)) / 8);
return (result);
}
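The spins bookkeeping above is an exponentially weighted moving average with weight 1/8: each acquisition moves rwl->spins one eighth of the way toward the iteration count it actually needed, and the next attempt busy-waits up to spins * 2 + 10 iterations (capped at RWLOCK_MAX_ADAPTIVE_COUNT) before blocking. An illustrative helper, not from the source:

	#include <stdint.h>

	static int_fast32_t
	update_spins(int_fast32_t spins, int_fast32_t cnt) {
		/* e.g. spins == 16 and cnt == 40 gives 16 + (40 - 16) / 8 == 19,
		 * so the following attempt spins up to 19 * 2 + 10 == 48 times. */
		return (spins + (cnt - spins) / 8);
	}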
isc_result_t
isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
int32_t cntflag;
int_fast32_t cntflag;
REQUIRE(VALID_RWLOCK(rwl));
@@ -429,36 +449,26 @@ isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
if (type == isc_rwlocktype_read) {
/* If a writer is waiting or working, we fail. */
if (rwl->write_requests != rwl->write_completions)
if (rwl_load(write_requests) != rwl_load(write_completions)) {
return (ISC_R_LOCKBUSY);
}
/* Otherwise, be ready for reading. */
#if defined(ISC_RWLOCK_USESTDATOMIC)
cntflag = atomic_fetch_add_explicit(&rwl->cnt_and_flag,
READER_INCR,
memory_order_relaxed);
#else
cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
#endif
cntflag = rwl_add(cnt_and_flag, READER_INCR);
if ((cntflag & WRITER_ACTIVE) != 0) {
/*
* A writer is working. We lose, and cancel the read
* request.
*/
#if defined(ISC_RWLOCK_USESTDATOMIC)
cntflag = atomic_fetch_sub_explicit
(&rwl->cnt_and_flag, READER_INCR,
memory_order_relaxed);
#else
cntflag = isc_atomic_xadd(&rwl->cnt_and_flag,
-READER_INCR);
#endif
cntflag = rwl_sub(cnt_and_flag, READER_INCR);
/*
* If no other readers are waiting and we've suspended
* new writers in this short period, wake them up.
*/
if (cntflag == READER_INCR &&
rwl->write_completions != rwl->write_requests) {
rwl_load(write_completions) !=
rwl_load(write_requests))
{
LOCK(&rwl->lock);
BROADCAST(&rwl->writeable);
UNLOCK(&rwl->lock);
@@ -468,31 +478,19 @@ isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
}
} else {
/* Try locking without entering the waiting queue. */
#if defined(ISC_RWLOCK_USESTDATOMIC)
int_fast32_t zero = 0;
if (!atomic_compare_exchange_strong_explicit
(&rwl->cnt_and_flag, &zero, WRITER_ACTIVE,
memory_order_relaxed, memory_order_relaxed))
return (ISC_R_LOCKBUSY);
#else
cntflag = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
WRITER_ACTIVE);
if (cntflag != 0)
rwl_cmpxchg(cnt_and_flag, zero, WRITER_ACTIVE);
if (zero != 0) {
return (ISC_R_LOCKBUSY);
#endif
}
/*
* XXXJT: jump into the queue, possibly breaking the writer
* order.
*/
#if defined(ISC_RWLOCK_USESTDATOMIC)
(void)atomic_fetch_sub_explicit(&rwl->write_completions, 1,
memory_order_relaxed);
#else
(void)isc_atomic_xadd(&rwl->write_completions, -1);
#endif
rwl->write_granted++;
rwl_sub(write_completions, 1);
rwl_add(write_granted, 1);
}
#ifdef ISC_RWLOCK_TRACE
@@ -505,107 +503,60 @@ isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
isc_result_t
isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
REQUIRE(VALID_RWLOCK(rwl));
int_fast32_t reader_incr = READER_INCR;
#if defined(ISC_RWLOCK_USESTDATOMIC)
{
int_fast32_t reader_incr = READER_INCR;
REQUIRE(VALID_RWLOCK(rwl));
/* Try to acquire write access. */
atomic_compare_exchange_strong_explicit
(&rwl->cnt_and_flag, &reader_incr, WRITER_ACTIVE,
memory_order_relaxed, memory_order_relaxed);
/*
* There must have been no writer, and there must have
* been at least one reader.
*/
INSIST((reader_incr & WRITER_ACTIVE) == 0 &&
(reader_incr & ~WRITER_ACTIVE) != 0);
/* Try to acquire write access. */
rwl_cmpxchg(cnt_and_flag, reader_incr, WRITER_ACTIVE);
if (reader_incr == READER_INCR) {
/*
* We are the only reader and have been upgraded.
* Now jump into the head of the writer waiting queue.
*/
(void)atomic_fetch_sub_explicit(&rwl->write_completions,
1, memory_order_relaxed);
} else {
return (ISC_R_LOCKBUSY);
}
}
#else
{
int32_t prevcnt;
/*
* There must have been no writer, and there must have
* been at least one reader.
*/
INSIST((reader_incr & WRITER_ACTIVE) == 0 &&
(reader_incr & ~WRITER_ACTIVE) != 0);
/* Try to acquire write access. */
prevcnt = isc_atomic_cmpxchg(&rwl->cnt_and_flag,
READER_INCR, WRITER_ACTIVE);
if (reader_incr == READER_INCR) {
/*
* There must have been no writer, and there must have
* been at least one reader.
* We are the only reader and have been upgraded.
* Now jump into the head of the writer waiting queue.
*/
INSIST((prevcnt & WRITER_ACTIVE) == 0 &&
(prevcnt & ~WRITER_ACTIVE) != 0);
if (prevcnt == READER_INCR) {
/*
* We are the only reader and have been upgraded.
* Now jump into the head of the writer waiting queue.
*/
(void)isc_atomic_xadd(&rwl->write_completions, -1);
} else
return (ISC_R_LOCKBUSY);
rwl_sub(write_completions, 1);
} else {
return (ISC_R_LOCKBUSY);
}
#endif
return (ISC_R_SUCCESS);
}
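The upgrade above can only succeed for a sole reader: the compare-exchange installs WRITER_ACTIVE only if cnt_and_flag holds exactly READER_INCR (one reader, no writer); otherwise reader_incr is overwritten with the observed value and ISC_R_LOCKBUSY is returned. On success, the write_completions decrement puts the upgraded writer at the head of the ticket queue. A minimal sketch, again assuming the READER_INCR/WRITER_ACTIVE values from the earlier sketch:

	#include <stdatomic.h>
	#include <stdbool.h>
	#include <stdint.h>

	static bool
	try_upgrade(atomic_int_fast32_t *cnt_and_flag) {
		int_fast32_t expected = 0x2;	/* READER_INCR, assumed value */
		/* Succeeds only if we are the one and only reader. */
		return (atomic_compare_exchange_strong(cnt_and_flag, &expected,
						       0x1 /* WRITER_ACTIVE */));
	}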
void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {
int32_t prev_readers;
int_fast32_t prev_readers;
REQUIRE(VALID_RWLOCK(rwl));
#if defined(ISC_RWLOCK_USESTDATOMIC)
{
/* Become an active reader. */
prev_readers = atomic_fetch_add_explicit(&rwl->cnt_and_flag,
READER_INCR,
memory_order_relaxed);
/* We must have been a writer. */
INSIST((prev_readers & WRITER_ACTIVE) != 0);
/* Complete write */
(void)atomic_fetch_sub_explicit(&rwl->cnt_and_flag,
WRITER_ACTIVE,
memory_order_relaxed);
(void)atomic_fetch_add_explicit(&rwl->write_completions, 1,
memory_order_relaxed);
}
#else
{
/* Become an active reader. */
prev_readers = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
/* We must have been a writer. */
INSIST((prev_readers & WRITER_ACTIVE) != 0);
/* Complete write */
(void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
(void)isc_atomic_xadd(&rwl->write_completions, 1);
}
#endif
/* Become an active reader. */
prev_readers = rwl_add(cnt_and_flag, READER_INCR);
/* We must have been a writer. */
INSIST((prev_readers & WRITER_ACTIVE) != 0);
/* Complete write */
rwl_sub(cnt_and_flag, WRITER_ACTIVE);
rwl_add(write_completions, 1);
/* Resume other readers */
LOCK(&rwl->lock);
if (rwl->readers_waiting > 0)
if (rwl->readers_waiting > 0) {
BROADCAST(&rwl->readable);
}
UNLOCK(&rwl->lock);
}
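Note the ordering in the downgrade: the reader count is raised before WRITER_ACTIVE is cleared, so cnt_and_flag never passes through zero. If the steps were reversed, a competing writer's compare-exchange against zero (as in isc_rwlock_trylock() above) could grab the lock in that window, leaving a writer active alongside the downgraded reader.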
isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
int32_t prev_cnt;
int_fast32_t prev_cnt;
REQUIRE(VALID_RWLOCK(rwl));
@@ -615,20 +566,16 @@ isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
#endif
if (type == isc_rwlocktype_read) {
#if defined(ISC_RWLOCK_USESTDATOMIC)
prev_cnt = atomic_fetch_sub_explicit(&rwl->cnt_and_flag,
READER_INCR,
memory_order_relaxed);
#else
prev_cnt = isc_atomic_xadd(&rwl->cnt_and_flag, -READER_INCR);
#endif
prev_cnt = rwl_sub(cnt_and_flag, READER_INCR);
/*
* If we're the last reader and any writers are waiting, wake
* them up. We need to wake up all of them to ensure the
* FIFO order.
*/
if (prev_cnt == READER_INCR &&
rwl->write_completions != rwl->write_requests) {
rwl_load(write_completions) !=
rwl_load(write_requests))
{
LOCK(&rwl->lock);
BROADCAST(&rwl->writeable);
UNLOCK(&rwl->lock);
@@ -640,20 +587,12 @@ isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
* Reset the flag, and (implicitly) tell other writers
* we are done.
*/
#if defined(ISC_RWLOCK_USESTDATOMIC)
(void)atomic_fetch_sub_explicit(&rwl->cnt_and_flag,
WRITER_ACTIVE,
memory_order_relaxed);
(void)atomic_fetch_add_explicit(&rwl->write_completions, 1,
memory_order_relaxed);
#else
(void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
(void)isc_atomic_xadd(&rwl->write_completions, 1);
#endif
rwl_sub(cnt_and_flag, WRITER_ACTIVE);
rwl_add(write_completions, 1);
if (rwl->write_granted >= rwl->write_quota ||
rwl->write_requests == rwl->write_completions ||
(rwl->cnt_and_flag & ~WRITER_ACTIVE) != 0) {
if ((uint32_t)rwl_load(write_granted) >= rwl->write_quota ||
rwl_load(write_requests) == rwl_load(write_completions) ||
(rwl_load(cnt_and_flag) & ~WRITER_ACTIVE) != 0) {
/*
* We have passed the write quota, no writer is
* waiting, or some readers are almost ready, pending
@@ -670,7 +609,7 @@ isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
UNLOCK(&rwl->lock);
}
if (rwl->write_requests != rwl->write_completions &&
if (rwl_load(write_requests) != rwl_load(write_completions) &&
wakeup_writers) {
LOCK(&rwl->lock);
BROADCAST(&rwl->writeable);
@@ -761,8 +700,8 @@ doit(isc_rwlock_t *rwl, isc_rwlocktype_t type, bool nonblock) {
isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
int32_t cnt = 0;
int32_t max_cnt = rwl->spins * 2 + 10;
int_fast32_t cnt = 0;
int_fast32_t max_cnt = rwl_load(spins) * 2 + 10;
isc_result_t result = ISC_R_SUCCESS;
if (max_cnt > RWLOCK_MAX_ADAPTIVE_COUNT)
@@ -778,7 +717,7 @@ isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
#endif
} while (doit(rwl, type, true) != ISC_R_SUCCESS);
rwl->spins += (cnt - rwl->spins) / 8;
rwl_add(spins, (cnt - rwl_load(spins)) / 8);
return (result);
}