Commit f71a8d11 authored by Ondřej Surý, committed by Ondřej Surý

Convert all atomic operations in isc_rwlock to sequentially-consistent ordering

The memory ordering in the rwlock was all wrong. For the reader's
convenience, I am copying excerpts from
https://en.cppreference.com/w/c/atomic/memory_order#Relaxed_ordering:

  Relaxed ordering

  Atomic operations tagged memory_order_relaxed are not synchronization
  operations; they do not impose an order among concurrent memory
  accesses. They only guarantee atomicity and modification order
  consistency.

  Sequentially-consistent ordering

  Atomic operations tagged memory_order_seq_cst not only order memory
  the same way as release/acquire ordering (everything that
  happened-before a store in one thread becomes a visible side effect in
  the thread that did a load), but also establish a single total
  modification order of all atomic operations that are so tagged.
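
To make the difference concrete, here is a minimal message-passing
sketch (illustrative only, not part of this commit; all names are
invented) of what relaxed ordering fails to guarantee:

  #include <stdatomic.h>
  #include <stdbool.h>

  static int payload;            /* plain, non-atomic data */
  static atomic_bool ready;

  void producer(void) {
    payload = 42;
    /* relaxed: nothing orders the payload store before this store */
    atomic_store_explicit(&ready, true, memory_order_relaxed);
  }

  void consumer(void) {
    /* relaxed: observing ready == true does not make the payload
     * store visible; reading payload here is a data race */
    while (!atomic_load_explicit(&ready, memory_order_relaxed))
      ;
    (void)payload;
  }

With plain atomic_store()/atomic_load() (both memory_order_seq_cst by
default), the store to payload is guaranteed to be visible once the
load of ready returns true.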

In short, these excerpts mean that we had weak or no synchronization
between threads using the same variables in the rwlock structure.
There should not be a significant performance drop, because the
critical sections were already protected by:

  while (1) {
    if (relaxed_atomic_operation) {
      break;
    }
    LOCK(lock);
    if (!relaxed_atomic_operation) {
      WAIT(sem, lock);
    }
    UNLOCK(lock);
  }
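
Rendered as stand-alone C11/pthreads code, the pattern might look like
this (a hypothetical sketch with invented names; the point is that the
mutex and condition variable on the slow path supply the
synchronization that the relaxed fast path lacks):

  #include <pthread.h>
  #include <stdatomic.h>
  #include <stdbool.h>

  static atomic_bool can_proceed;  /* hypothetical gate flag */
  static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
  static pthread_cond_t sem = PTHREAD_COND_INITIALIZER;

  static void wait_for_gate(void) {
    while (1) {
      /* fast path: cheap atomic check without taking the mutex */
      if (atomic_load(&can_proceed)) {
        break;
      }
      /* slow path: recheck under the mutex before sleeping, so a
       * wakeup between the check and the wait cannot be lost */
      pthread_mutex_lock(&lock);
      if (!atomic_load(&can_proceed)) {
        pthread_cond_wait(&sem, &lock);
      }
      pthread_mutex_unlock(&lock);
    }
  }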

I would add one more thing to "Don't do your own crypto, folks.":

  - Also don't do your own locking, folks.

As part of this commit, I have also cleaned up the #ifdef spaghetti
and fixed the isc_atomic API usage.
parent 542517b1
@@ -73,7 +73,7 @@ struct isc_rwlock {
atomic_int_fast32_t write_requests;
atomic_int_fast32_t write_completions;
atomic_int_fast32_t cnt_and_flag;
atomic_uint_fast32_t write_granted;
atomic_int_fast32_t write_granted;
#else
int32_t spins;
int32_t write_requests;
@@ -31,6 +31,37 @@
#ifdef ISC_PLATFORM_USETHREADS
#if defined(ISC_RWLOCK_USESTDATOMIC)
#define rwl_init(o, a) atomic_init(&rwl->o, a)
#define rwl_load(o) atomic_load(&rwl->o)
#define rwl_store(o, a) atomic_store(&rwl->o, (a))
#define rwl_add(o, a) atomic_fetch_add(&rwl->o, (a))
#define rwl_sub(o, a) atomic_fetch_sub(&rwl->o, (a))
#define rwl_cmpxchg(o, e, d) atomic_compare_exchange_strong(&rwl->o, &(e), (d))
#elif defined(ISC_RWLOCK_USEATOMIC)
#define rwl_init(o, a) rwl->o = (a);
#define rwl_load(o) isc_atomic_xadd(&rwl->o, 0)
#define rwl_store(o, a) isc_atomic_store(&rwl->o, (a))
#define rwl_add(o, a) isc_atomic_xadd(&rwl->o, (a))
#define rwl_sub(o, a) isc_atomic_xadd(&rwl->o, -(a))
#define rwl_cmpxchg(o, e, d) e = isc_atomic_cmpxchg(&rwl->o, e, d)
#else
#define rwl_init(o, a) rwl->o = (a)
#define rwl_load(o) (rwl->o)
#define rwl_store(o, a) rwl->o = (a)
#define rwl_add(o, a) rwl->o += (a)
#define rwl_sub(o, a) rwl->o -= (a)
#define rwl_cmpxchg(o, e, d) \
if (rwl->o == e) { e = rwl->o; rwl->o = d; } else { e = rwl->o; }
#endif
#ifndef RWLOCK_DEFAULT_READ_QUOTA
#define RWLOCK_DEFAULT_READ_QUOTA 4
#endif
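
For reference, the ISC_RWLOCK_USESTDATOMIC variants above expand to the
plain (non-_explicit) C11 generic functions, which default to
memory_order_seq_cst; an illustrative expansion:

  /* rwl_load(cnt_and_flag) becomes: */
  atomic_load(&rwl->cnt_and_flag);  /* implies memory_order_seq_cst */

  /* rwl_cmpxchg(cnt_and_flag, zero, WRITER_ACTIVE) becomes: */
  atomic_compare_exchange_strong(&rwl->cnt_and_flag, &zero, WRITER_ACTIVE);
  /* seq_cst on both the success and the failure path */

Dropping the _explicit relaxed calls is what upgrades every operation
to sequential consistency.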
@@ -68,9 +99,9 @@ print_lock(const char *operation, isc_rwlock_t *rwl, isc_rwlocktype_t type) {
ISC_MSG_READ, "read") :
isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
ISC_MSG_WRITE, "write")),
rwl->write_requests, rwl->write_completions,
rwl->cnt_and_flag, rwl->readers_waiting,
rwl->write_granted, rwl->write_quota);
rwl_load(write_requests), rwl_load(write_completions),
rwl_load(cnt_and_flag), rwl->readers_waiting,
rwl_load(write_granted), rwl->write_quota);
#else
fprintf(stderr,
isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
@@ -108,20 +139,13 @@ isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
*/
rwl->magic = 0;
rwl_init(spins, 0);
#if defined(ISC_RWLOCK_USEATOMIC)
#if defined(ISC_RWLOCK_USESTDATOMIC)
atomic_init(&rwl->spins, 0);
atomic_init(&rwl->write_requests, 0);
atomic_init(&rwl->write_completions, 0);
atomic_init(&rwl->cnt_and_flag, 0);
atomic_init(&rwl->write_granted, 0);
#else
rwl->spins = 0;
rwl->write_requests = 0;
rwl->write_completions = 0;
rwl->cnt_and_flag = 0;
rwl->write_granted = 0;
#endif
rwl_init(write_requests, 0);
rwl_init(write_completions, 0);
rwl_init(cnt_and_flag, 0);
rwl_init(write_granted, 0);
rwl->readers_waiting = 0;
if (read_quota != 0) {
UNEXPECTED_ERROR(__FILE__, __LINE__,
@@ -132,7 +156,6 @@ isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
}
rwl->write_quota = write_quota;
#else
rwl->spins = 0;
rwl->type = isc_rwlocktype_read;
rwl->original = isc_rwlocktype_none;
rwl->active = 0;
@@ -189,8 +212,8 @@ isc_rwlock_destroy(isc_rwlock_t *rwl) {
REQUIRE(VALID_RWLOCK(rwl));
#if defined(ISC_RWLOCK_USEATOMIC)
REQUIRE(rwl->write_requests == rwl->write_completions &&
rwl->cnt_and_flag == 0 && rwl->readers_waiting == 0);
REQUIRE(rwl_load(write_requests) == rwl_load(write_completions) &&
rwl_load(cnt_and_flag) == 0 && rwl->readers_waiting == 0);
#else
LOCK(&rwl->lock);
REQUIRE(rwl->active == 0 &&
@@ -274,7 +297,7 @@ isc_rwlock_destroy(isc_rwlock_t *rwl) {
static isc_result_t
isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
int32_t cntflag;
int_fast32_t cntflag;
REQUIRE(VALID_RWLOCK(rwl));
@@ -284,10 +307,12 @@ isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
#endif
if (type == isc_rwlocktype_read) {
if (rwl->write_requests != rwl->write_completions) {
if (rwl_load(write_requests) != rwl_load(write_completions)) {
/* there is a waiting or active writer */
LOCK(&rwl->lock);
if (rwl->write_requests != rwl->write_completions) {
if (rwl_load(write_requests) !=
rwl_load(write_completions))
{
rwl->readers_waiting++;
WAIT(&rwl->readable, &rwl->lock);
rwl->readers_waiting--;
@@ -295,23 +320,20 @@ isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
UNLOCK(&rwl->lock);
}
#if defined(ISC_RWLOCK_USESTDATOMIC)
cntflag = atomic_fetch_add_explicit(&rwl->cnt_and_flag,
READER_INCR,
memory_order_relaxed);
#else
cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
#endif
cntflag = rwl_add(cnt_and_flag, READER_INCR);
POST(cntflag);
while (1) {
if ((rwl->cnt_and_flag & WRITER_ACTIVE) == 0)
if ((rwl_load(cnt_and_flag) & WRITER_ACTIVE) == 0) {
break;
}
/* A writer is still working */
LOCK(&rwl->lock);
rwl->readers_waiting++;
if ((rwl->cnt_and_flag & WRITER_ACTIVE) != 0)
if ((rwl_load(cnt_and_flag) & WRITER_ACTIVE) != 0) {
WAIT(&rwl->readable, &rwl->lock);
}
rwl->readers_waiting--;
UNLOCK(&rwl->lock);
@@ -346,24 +368,15 @@ isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
* quota, reset the condition (race among readers doesn't
* matter).
*/
#if defined(ISC_RWLOCK_USESTDATOMIC)
atomic_store(&rwl->write_granted, 0);
#else
isc_atomic_store(&rwl->write_granted, 0);
#endif
rwl_store(write_granted, 0);
} else {
int32_t prev_writer;
int_fast32_t prev_writer;
/* enter the waiting queue, and wait for our turn */
#if defined(ISC_RWLOCK_USESTDATOMIC)
prev_writer = atomic_fetch_add_explicit(&rwl->write_requests, 1,
memory_order_relaxed);
#else
prev_writer = isc_atomic_xadd(&rwl->write_requests, 1);
#endif
while (rwl->write_completions != prev_writer) {
prev_writer = rwl_add(write_requests, 1);
while (rwl_load(write_completions) != prev_writer) {
LOCK(&rwl->lock);
if (rwl->write_completions != prev_writer) {
if (rwl_load(write_completions) != prev_writer) {
WAIT(&rwl->writeable, &rwl->lock);
UNLOCK(&rwl->lock);
continue;
@@ -373,33 +386,22 @@ isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
}
while (1) {
#if defined(ISC_RWLOCK_USESTDATOMIC)
int_fast32_t cntflag2 = 0;
atomic_compare_exchange_strong_explicit
(&rwl->cnt_and_flag, &cntflag2, WRITER_ACTIVE,
memory_order_relaxed, memory_order_relaxed);
#else
int32_t cntflag2;
cntflag2 = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
WRITER_ACTIVE);
#endif
rwl_cmpxchg(cnt_and_flag, cntflag2, WRITER_ACTIVE);
if (cntflag2 == 0)
break;
/* Another active reader or writer is working. */
LOCK(&rwl->lock);
if (rwl->cnt_and_flag != 0)
if (rwl_load(cnt_and_flag) != 0) {
WAIT(&rwl->writeable, &rwl->lock);
}
UNLOCK(&rwl->lock);
}
INSIST((rwl->cnt_and_flag & WRITER_ACTIVE) != 0);
#if defined(ISC_RWLOCK_USESTDATOMIC)
(void)atomic_fetch_add(&rwl->write_granted, 1);
#else
(void)isc_atomic_xadd(&rwl->write_granted, 1);
#endif
INSIST((rwl_load(cnt_and_flag) & WRITER_ACTIVE) != 0);
rwl_add(write_granted, 1);
}
#ifdef ISC_RWLOCK_TRACE
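
One subtlety in the loop above: C11 atomic_compare_exchange_strong()
writes the value it actually observed back into its "expected" argument
when the exchange fails, which is why testing cntflag2 == 0 after
rwl_cmpxchg() distinguishes success from failure. A minimal stand-alone
illustration (not from this commit):

  #include <assert.h>
  #include <stdatomic.h>

  int main(void) {
    atomic_int_fast32_t v;
    atomic_init(&v, 5);

    int_fast32_t expected = 0;
    /* v != expected: the CAS fails and overwrites expected with 5 */
    assert(!atomic_compare_exchange_strong(&v, &expected, 1));
    assert(expected == 5);

    /* v == expected: the CAS succeeds and leaves expected alone */
    assert(atomic_compare_exchange_strong(&v, &expected, 1));
    assert(expected == 5 && atomic_load(&v) == 1);
    return 0;
  }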
@@ -412,13 +414,8 @@ isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
int32_t cnt = 0;
#if defined(ISC_RWLOCK_USESTDATOMIC)
int32_t spins = atomic_load(&rwl->spins);
#else
int32_t spins = rwl->spins;
#endif
int32_t max_cnt = spins * 2 + 10;
int_fast32_t cnt = 0;
int_fast32_t max_cnt = rwl_load(spins) * 2 + 10;
isc_result_t result = ISC_R_SUCCESS;
if (max_cnt > RWLOCK_MAX_ADAPTIVE_COUNT)
@@ -434,18 +431,14 @@ isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
#endif
} while (isc_rwlock_trylock(rwl, type) != ISC_R_SUCCESS);
#if defined(ISC_RWLOCK_USESTDATOMIC)
atomic_fetch_add(&rwl->spins, (cnt - spins) / 8);
#else
rwl->spins += (cnt - spins) / 8;
#endif
rwl_add(spins, (cnt - rwl_load(spins)) / 8);
return (result);
}
isc_result_t
isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
int32_t cntflag;
int_fast32_t cntflag;
REQUIRE(VALID_RWLOCK(rwl));
@@ -456,36 +449,26 @@ isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
if (type == isc_rwlocktype_read) {
/* If a writer is waiting or working, we fail. */
if (rwl->write_requests != rwl->write_completions)
if (rwl_load(write_requests) != rwl_load(write_completions)) {
return (ISC_R_LOCKBUSY);
}
/* Otherwise, be ready for reading. */
#if defined(ISC_RWLOCK_USESTDATOMIC)
cntflag = atomic_fetch_add_explicit(&rwl->cnt_and_flag,
READER_INCR,
memory_order_relaxed);
#else
cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
#endif
cntflag = rwl_add(cnt_and_flag, READER_INCR);
if ((cntflag & WRITER_ACTIVE) != 0) {
/*
* A writer is working. We lose, and cancel the read
* request.
*/
#if defined(ISC_RWLOCK_USESTDATOMIC)
cntflag = atomic_fetch_sub_explicit
(&rwl->cnt_and_flag, READER_INCR,
memory_order_relaxed);
#else
cntflag = isc_atomic_xadd(&rwl->cnt_and_flag,
-READER_INCR);
#endif
cntflag = rwl_sub(cnt_and_flag, READER_INCR);
/*
* If no other readers are waiting and we've suspended
* new writers in this short period, wake them up.
*/
if (cntflag == READER_INCR &&
rwl->write_completions != rwl->write_requests) {
rwl_load(write_completions) !=
rwl_load(write_requests))
{
LOCK(&rwl->lock);
BROADCAST(&rwl->writeable);
UNLOCK(&rwl->lock);
@@ -495,32 +478,19 @@ isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
}
} else {
/* Try locking without entering the waiting queue. */
#if defined(ISC_RWLOCK_USESTDATOMIC)
int_fast32_t zero = 0;
if (!atomic_compare_exchange_strong_explicit
(&rwl->cnt_and_flag, &zero, WRITER_ACTIVE,
memory_order_relaxed, memory_order_relaxed))
return (ISC_R_LOCKBUSY);
#else
cntflag = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
WRITER_ACTIVE);
if (cntflag != 0)
rwl_cmpxchg(cnt_and_flag, zero, WRITER_ACTIVE);
if (zero != 0) {
return (ISC_R_LOCKBUSY);
#endif
}
/*
* XXXJT: jump into the queue, possibly breaking the writer
* order.
*/
#if defined(ISC_RWLOCK_USESTDATOMIC)
(void)atomic_fetch_sub_explicit(&rwl->write_completions, 1,
memory_order_relaxed);
(void)atomic_fetch_add(&rwl->write_granted, 1);
#else
(void)isc_atomic_xadd(&rwl->write_completions, -1);
(void)isc_atomic_xadd(&rwl->write_granted, 1);
#endif
rwl_sub(write_completions, 1);
rwl_add(write_granted, 1);
}
#ifdef ISC_RWLOCK_TRACE
@@ -533,107 +503,60 @@ isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
isc_result_t
isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
int_fast32_t reader_incr = READER_INCR;
REQUIRE(VALID_RWLOCK(rwl));
#if defined(ISC_RWLOCK_USESTDATOMIC)
{
int_fast32_t reader_incr = READER_INCR;
/* Try to acquire write access. */
atomic_compare_exchange_strong_explicit
(&rwl->cnt_and_flag, &reader_incr, WRITER_ACTIVE,
memory_order_relaxed, memory_order_relaxed);
/*
 * There must have been no writer, and there must have
 * been at least one reader.
 */
INSIST((reader_incr & WRITER_ACTIVE) == 0 &&
(reader_incr & ~WRITER_ACTIVE) != 0);
if (reader_incr == READER_INCR) {
/*
 * We are the only reader and have been upgraded.
 * Now jump into the head of the writer waiting queue.
 */
(void)atomic_fetch_sub_explicit(&rwl->write_completions,
1, memory_order_relaxed);
} else {
return (ISC_R_LOCKBUSY);
}
}
#else
{
int32_t prevcnt;
/* Try to acquire write access. */
prevcnt = isc_atomic_cmpxchg(&rwl->cnt_and_flag,
READER_INCR, WRITER_ACTIVE);
/*
 * There must have been no writer, and there must have
 * been at least one reader.
 */
INSIST((prevcnt & WRITER_ACTIVE) == 0 &&
(prevcnt & ~WRITER_ACTIVE) != 0);
if (prevcnt == READER_INCR) {
/*
 * We are the only reader and have been upgraded.
 * Now jump into the head of the writer waiting queue.
 */
(void)isc_atomic_xadd(&rwl->write_completions, -1);
} else
return (ISC_R_LOCKBUSY);
}
#endif
/* Try to acquire write access. */
rwl_cmpxchg(cnt_and_flag, reader_incr, WRITER_ACTIVE);
/*
 * There must have been no writer, and there must have
 * been at least one reader.
 */
INSIST((reader_incr & WRITER_ACTIVE) == 0 &&
(reader_incr & ~WRITER_ACTIVE) != 0);
if (reader_incr == READER_INCR) {
/*
 * We are the only reader and have been upgraded.
 * Now jump into the head of the writer waiting queue.
 */
rwl_sub(write_completions, 1);
} else {
return (ISC_R_LOCKBUSY);
}
return (ISC_R_SUCCESS);
}
void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {
int32_t prev_readers;
int_fast32_t prev_readers;
REQUIRE(VALID_RWLOCK(rwl));
#if defined(ISC_RWLOCK_USESTDATOMIC)
{
/* Become an active reader. */
prev_readers = atomic_fetch_add_explicit(&rwl->cnt_and_flag,
READER_INCR,
memory_order_relaxed);
/* We must have been a writer. */
INSIST((prev_readers & WRITER_ACTIVE) != 0);
/* Complete write */
(void)atomic_fetch_sub_explicit(&rwl->cnt_and_flag,
WRITER_ACTIVE,
memory_order_relaxed);
(void)atomic_fetch_add_explicit(&rwl->write_completions, 1,
memory_order_relaxed);
}
#else
{
/* Become an active reader. */
prev_readers = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
/* We must have been a writer. */
INSIST((prev_readers & WRITER_ACTIVE) != 0);
/* Complete write */
(void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
(void)isc_atomic_xadd(&rwl->write_completions, 1);
}
#endif
/* Become an active reader. */
prev_readers = rwl_add(cnt_and_flag, READER_INCR);
/* We must have been a writer. */
INSIST((prev_readers & WRITER_ACTIVE) != 0);
/* Complete write */
rwl_sub(cnt_and_flag, WRITER_ACTIVE);
rwl_add(write_completions, 1);
/* Resume other readers */
LOCK(&rwl->lock);
if (rwl->readers_waiting > 0)
if (rwl->readers_waiting > 0) {
BROADCAST(&rwl->readable);
}
UNLOCK(&rwl->lock);
}
isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
int32_t prev_cnt;
int_fast32_t prev_cnt;
REQUIRE(VALID_RWLOCK(rwl));
@@ -643,48 +566,33 @@ isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
#endif
if (type == isc_rwlocktype_read) {
#if defined(ISC_RWLOCK_USESTDATOMIC)
prev_cnt = atomic_fetch_sub_explicit(&rwl->cnt_and_flag,
READER_INCR,
memory_order_relaxed);
#else
prev_cnt = isc_atomic_xadd(&rwl->cnt_and_flag, -READER_INCR);
#endif
prev_cnt = rwl_sub(cnt_and_flag, READER_INCR);
/*
* If we're the last reader and any writers are waiting, wake
* them up. We need to wake up all of them to ensure the
* FIFO order.
*/
if (prev_cnt == READER_INCR &&
rwl->write_completions != rwl->write_requests) {
rwl_load(write_completions) !=
rwl_load(write_requests))
{
LOCK(&rwl->lock);
BROADCAST(&rwl->writeable);
UNLOCK(&rwl->lock);
}
} else {
bool wakeup_writers = true;
uint32_t write_granted;
/*
* Reset the flag, and (implicitly) tell other writers
* we are done.
*/
#if defined(ISC_RWLOCK_USESTDATOMIC)
(void)atomic_fetch_sub_explicit(&rwl->cnt_and_flag,
WRITER_ACTIVE,
memory_order_relaxed);
(void)atomic_fetch_add_explicit(&rwl->write_completions, 1,
memory_order_relaxed);
write_granted = (uint32_t)atomic_load(&rwl->write_granted);
#else
(void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
(void)isc_atomic_xadd(&rwl->write_completions, 1);
write_granted = isc_atomic_xadd(&rwl->write_granted, 0);
#endif
rwl_sub(cnt_and_flag, WRITER_ACTIVE);
rwl_add(write_completions, 1);
if (write_granted >= rwl->write_quota ||
rwl->write_requests == rwl->write_completions ||
(rwl->cnt_and_flag & ~WRITER_ACTIVE) != 0) {
if ((uint32_t)rwl_load(write_granted) >= rwl->write_quota ||
rwl_load(write_requests) == rwl_load(write_completions) ||
(rwl_load(cnt_and_flag) & ~WRITER_ACTIVE) != 0) {
/*
* We have passed the write quota, no writer is
* waiting, or some readers are almost ready, pending
@@ -701,7 +609,7 @@ isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
UNLOCK(&rwl->lock);
}
if (rwl->write_requests != rwl->write_completions &&
if (rwl_load(write_requests) != rwl_load(write_completions) &&
wakeup_writers) {
LOCK(&rwl->lock);
BROADCAST(&rwl->writeable);
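
The write_granted/write_quota test above is a simple anti-starvation
heuristic: readers are woken first once a streak of consecutive write
grants exceeds the quota, when no writer is waiting, or when readers
have already registered themselves in cnt_and_flag. Condensed into a
hypothetical helper (not the commit's code):

  #include <stdbool.h>
  #include <stdint.h>

  /* sketch of the wake-readers decision in isc_rwlock_unlock() */
  static bool
  prefer_readers(int_fast32_t granted, unsigned int quota,
                 bool writer_waiting, bool readers_registered) {
    return (granted >= (int_fast32_t)quota || !writer_waiting ||
            readers_registered);
  }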
@@ -792,13 +700,8 @@ doit(isc_rwlock_t *rwl, isc_rwlocktype_t type, bool nonblock) {
isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
int32_t cnt = 0;
#if defined(ISC_RWLOCK_USESTDATOMIC)
int32_t spins = atomic_load(&rwl->spins);
#else
int32_t spins = rwl->spins;
#endif
int32_t max_cnt = spins * 2 + 10;
int_fast32_t cnt = 0;
int_fast32_t max_cnt = rwl_load(spins) * 2 + 10;
isc_result_t result = ISC_R_SUCCESS;
if (max_cnt > RWLOCK_MAX_ADAPTIVE_COUNT)
......@@ -814,11 +717,7 @@ isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
#endif
} while (doit(rwl, type, true) != ISC_R_SUCCESS);
#if defined(ISC_RWLOCK_USESTDATOMIC)
atomic_fetch_add(&rwl->spins, (cnt - spins) / 8);
#else
rwl->spins += (cnt - spins) / 8;
#endif
rwl_add(spins, (cnt - rwl_load(spins)) / 8);
return (result);
}