Commit a003908a authored by Witold Krecicki, committed by Witold Krecicki

Fix a race in access to manager->tasks in taskmgr

Make taskmgr->mode and boolean state flags (exclusive, paused, exiting) atomic.
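Background for the change: in C11, unsynchronized concurrent reads and writes of a plain bool or enum are a data race (undefined behavior), so flags that were checked outside the manager lock have to become atomics, and the "any tasks left?" check moves from walking a lock-protected list to an atomic counter. A minimal standalone sketch of the pattern (names are illustrative, not BIND's):

/*
 * Sketch of the commit's pattern, not BIND code: shared flags become
 * C11 atomics so they can be checked without taking the manager lock,
 * and an atomic counter replaces the EMPTY(list) test.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_bool exiting = false;          /* was: plain bool */
static atomic_uint_fast32_t tasks_count = 0; /* replaces walking a list */

static void task_create(void) {
        atomic_fetch_add(&tasks_count, 1);
}

static void task_finished(void) {
        atomic_fetch_sub(&tasks_count, 1);
}

static bool finished(void) {
        /* Mirrors the reworked FINISHED() macro in the diff below. */
        return (atomic_load(&exiting) && atomic_load(&tasks_count) == 0);
}

int main(void) {
        task_create();
        atomic_store(&exiting, true);
        task_finished();
        printf("finished: %d\n", finished());
        return (0);
}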
parent d0222edb
Pipeline #9145 passed with stages in 23 minutes and 44 seconds
@@ -12,6 +12,7 @@
 #pragma once
 
 #include <inttypes.h>
+#include <stdbool.h>
 
 #if !defined(__has_feature)
 #define __has_feature(x) 0
@@ -75,6 +76,7 @@ typedef int_fast32_t atomic_int_fast32_t;
 typedef uint_fast32_t atomic_uint_fast32_t;
 typedef int_fast64_t atomic_int_fast64_t;
 typedef uint_fast64_t atomic_uint_fast64_t;
+typedef bool atomic_bool;
 
 #if defined(__CLANG_ATOMICS) /* __c11_atomic builtins */
 #define atomic_init(obj, desired) \
......
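The diff below uses atomic_load_relaxed() and atomic_store_relaxed() helpers. Their definitions are not shown in this diff; the assumption is that they are thin wrappers over the C11 explicit-memory-order forms, roughly:

/* Assumed shape of the *_relaxed helpers used throughout this diff. */
#include <stdatomic.h>

#define atomic_load_relaxed(o) \
        atomic_load_explicit((o), memory_order_relaxed)
#define atomic_store_relaxed(o, v) \
        atomic_store_explicit((o), (v), memory_order_relaxed)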
@@ -147,15 +147,16 @@ struct isc__taskmgr {
         atomic_uint_fast32_t tasks_running;
         atomic_uint_fast32_t tasks_ready;
         atomic_uint_fast32_t curq;
+        atomic_uint_fast32_t tasks_count;
         isc__taskqueue_t *queues;
 
         /* Locked by task manager lock. */
         unsigned int default_quantum;
         LIST(isc__task_t) tasks;
-        isc_taskmgrmode_t mode;
-        bool pause_requested;
-        bool exclusive_requested;
-        bool exiting;
+        atomic_uint_fast32_t mode;
+        atomic_bool pause_req;
+        atomic_bool exclusive_req;
+        atomic_bool exiting;
 
         /* Locked by halt_lock */
         unsigned int halted;
@@ -176,7 +177,8 @@ isc__taskmgr_resume(isc_taskmgr_t *manager0);
 #define DEFAULT_DEFAULT_QUANTUM 25
-#define FINISHED(m) ((m)->exiting && EMPTY((m)->tasks))
+#define FINISHED(m) (atomic_load_relaxed(&((m)->exiting)) == true && \
+                     atomic_load(&(m)->tasks_count) == 0)
 
 /*%
  * The following are intended for internal use (indicated by "isc__"
@@ -228,6 +230,7 @@ task_finished(isc__task_t *task) {
         LOCK(&manager->lock);
         UNLINK(manager->tasks, task, link);
+        atomic_fetch_sub(&manager->tasks_count, 1);
         UNLOCK(&manager->lock);
 
         if (FINISHED(manager)) {
                 /*
@@ -303,8 +306,9 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum,
         exiting = false;
 
         LOCK(&manager->lock);
-        if (!manager->exiting) {
+        if (!atomic_load_relaxed(&manager->exiting)) {
                 APPEND(manager->tasks, task, link);
+                atomic_fetch_add(&manager->tasks_count, 1);
         } else {
                 exiting = true;
         }
@@ -397,7 +401,8 @@ task_ready(isc__task_t *task) {
         XTRACE("task_ready");
 
         LOCK(&manager->queues[task->threadid].lock);
         push_readyq(manager, task, task->threadid);
-        if (manager->mode == isc_taskmgrmode_normal || has_privilege) {
+        if (atomic_load(&manager->mode) == isc_taskmgrmode_normal ||
+            has_privilege) {
                 SIGNAL(&manager->queues[task->threadid].work_available);
         }
         UNLOCK(&manager->queues[task->threadid].lock);
@@ -512,6 +517,13 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
         REQUIRE(VALID_TASK(task));
         XTRACE("isc_task_send");
 
+        /*
+         * We're trying hard to hold locks for as short a time as possible.
+         * We're also trying to hold as few locks as possible.  This is why
+         * some processing is deferred until after the lock is released.
+         */
+        LOCK(&task->lock);
+
         /* If task is bound ignore provided cpu. */
         if (task->bound) {
                 c = task->threadid;
@@ -520,13 +532,6 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
                                memory_order_relaxed);
         }
         c %= task->manager->workers;
 
-        /*
-         * We're trying hard to hold locks for as short a time as possible.
-         * We're also trying to hold as few locks as possible.  This is why
-         * some processing is deferred until after the lock is released.
-         */
-        LOCK(&task->lock);
-
         was_idle = task_send(task, eventp, c);
         UNLOCK(&task->lock);
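The two hunks above hoist LOCK(&task->lock) so that task->bound and task->threadid, which that lock guards, are no longer read before the lock is taken. A standalone sketch of the fixed ordering (hypothetical names, not BIND code):

/*
 * Sketch of the reordering above: fields guarded by task->lock must
 * be read under the lock, otherwise queue selection can race with a
 * concurrent bind.
 */
#include <pthread.h>
#include <stdbool.h>

struct task {
        pthread_mutex_t lock;
        bool bound;   /* guarded by lock */
        int threadid; /* guarded by lock */
};

static int select_queue(struct task *task, int c, int workers) {
        pthread_mutex_lock(&task->lock);
        if (task->bound) {
                c = task->threadid; /* now read under the lock */
        }
        c %= workers;
        /* ...the send itself would happen here, still under the lock... */
        pthread_mutex_unlock(&task->lock);
        return (c);
}

int main(void) {
        struct task t = { PTHREAD_MUTEX_INITIALIZER, true, 3 };
        return (select_queue(&t, -1, 4) == 3 ? 0 : 1);
}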
@@ -565,6 +570,7 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
         REQUIRE(VALID_TASK(task));
         XTRACE("isc_task_sendanddetach");
 
+        LOCK(&task->lock);
         if (task->bound) {
                 c = task->threadid;
         } else if (c < 0) {
@@ -572,8 +578,6 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
                                memory_order_relaxed);
         }
         c %= task->manager->workers;
-
-        LOCK(&task->lock);
         idle1 = task_send(task, eventp, c);
         idle2 = task_detach(task);
         UNLOCK(&task->lock);
@@ -902,11 +906,11 @@ static inline bool
 empty_readyq(isc__taskmgr_t *manager, int c) {
         isc__tasklist_t queue;
 
-        if (manager->mode == isc_taskmgrmode_normal)
+        if (atomic_load_relaxed(&manager->mode) == isc_taskmgrmode_normal) {
                 queue = manager->queues[c].ready_tasks;
-        else
+        } else {
                 queue = manager->queues[c].ready_priority_tasks;
+        }
         return (EMPTY(queue));
 }
@@ -922,7 +926,7 @@ static inline isc__task_t *
 pop_readyq(isc__taskmgr_t *manager, int c) {
         isc__task_t *task;
 
-        if (manager->mode == isc_taskmgrmode_normal) {
+        if (atomic_load_relaxed(&manager->mode) == isc_taskmgrmode_normal) {
                 task = HEAD(manager->queues[c].ready_tasks);
         } else {
                 task = HEAD(manager->queues[c].ready_priority_tasks);
@@ -1029,15 +1033,15 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
                  * until it's been released.
                  */
                 while ((empty_readyq(manager, threadid) &&
-                        !manager->pause_requested &&
-                        !manager->exclusive_requested) &&
+                        !atomic_load_relaxed(&manager->pause_req) &&
+                        !atomic_load_relaxed(&manager->exclusive_req)) &&
                        !FINISHED(manager))
                 {
                         XTHREADTRACE("wait");
-                        XTHREADTRACE(manager->pause_requested
+                        XTHREADTRACE(atomic_load_relaxed(&manager->pause_req)
                                      ? "paused"
                                      : "notpaused");
-                        XTHREADTRACE(manager->exclusive_requested
+                        XTHREADTRACE(atomic_load_relaxed(&manager->exclusive_req)
                                      ? "excreq"
                                      : "notexcreq");
                         WAIT(&manager->queues[threadid].work_available,
@@ -1046,15 +1050,16 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
                 }
                 XTHREADTRACE("working");
 
-                if (manager->pause_requested || manager->exclusive_requested) {
+                if (atomic_load_relaxed(&manager->pause_req) ||
+                    atomic_load_relaxed(&manager->exclusive_req)) {
                         UNLOCK(&manager->queues[threadid].lock);
                         XTHREADTRACE("halting");
 
                         /*
                          * Switching to exclusive mode is done as a
                          * 2-phase-lock, checking if we have to switch is
-                         * done without any locks on pause_requested and
-                         * exclusive_requested to save time - the worst
+                         * done without any locks on pause_req and
+                         * exclusive_req to save time - the worst
                          * thing that can happen is that we'll launch one
                          * task more and exclusive task will be postponed a
                          * bit.
@@ -1066,8 +1071,8 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
                         LOCK(&manager->halt_lock);
                         manager->halted++;
                         BROADCAST(&manager->halt_cond);
-                        while (manager->pause_requested ||
-                               manager->exclusive_requested)
+                        while (atomic_load_relaxed(&manager->pause_req) ||
+                               atomic_load_relaxed(&manager->exclusive_req))
                         {
                                 WAIT(&manager->halt_cond, &manager->halt_lock);
                         }
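The comment in the previous hunk describes a two-phase check: the flags are sampled without any lock (cheap, and a stale read only delays the switch by one task), then re-checked under halt_lock while the worker parks on the condition variable. A standalone sketch of that pattern (simplified, hypothetical names):

/* Two-phase halt check, as described in the comment above. */
#include <pthread.h>
#include <stdatomic.h>

static atomic_bool exclusive_req;
static pthread_mutex_t halt_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t halt_cond = PTHREAD_COND_INITIALIZER;
static unsigned int halted;

static void maybe_halt(void) {
        /* Phase 1: cheap, lock-free check. */
        if (!atomic_load_explicit(&exclusive_req, memory_order_relaxed)) {
                return;
        }
        /* Phase 2: re-check under halt_lock and park until released. */
        pthread_mutex_lock(&halt_lock);
        halted++;
        pthread_cond_broadcast(&halt_cond);
        while (atomic_load_explicit(&exclusive_req, memory_order_relaxed)) {
                pthread_cond_wait(&halt_cond, &halt_lock);
        }
        halted--;
        pthread_mutex_unlock(&halt_lock);
}

int main(void) {
        maybe_halt(); /* returns immediately: no exclusive request pending */
        return (0);
}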
@@ -1260,7 +1265,8 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
                                 UNLOCK(&manager->queues[i].lock);
                         }
                         if (empty) {
-                                manager->mode = isc_taskmgrmode_normal;
+                                atomic_store(&manager->mode,
+                                             isc_taskmgrmode_normal);
                                 wake_all_queues(manager);
                         }
                 }
@@ -1331,7 +1337,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
         RUNTIME_CHECK(manager != NULL);
         manager->common.impmagic = TASK_MANAGER_MAGIC;
         manager->common.magic = ISCAPI_TASKMGR_MAGIC;
-        manager->mode = isc_taskmgrmode_normal;
+        atomic_store(&manager->mode, isc_taskmgrmode_normal);
         manager->mctx = NULL;
         isc_mutex_init(&manager->lock);
         isc_mutex_init(&manager->excl_lock);
@@ -1346,6 +1352,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
         }
         manager->default_quantum = default_quantum;
         INIT_LIST(manager->tasks);
+        atomic_store(&manager->tasks_count, 0);
         manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
         RUNTIME_CHECK(manager->queues != NULL);
@@ -1355,8 +1362,8 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
         manager->exiting = false;
         manager->excl = NULL;
         manager->halted = 0;
-        manager->exclusive_requested = false;
-        manager->pause_requested = false;
+        atomic_store_relaxed(&manager->exclusive_req, false);
+        atomic_store_relaxed(&manager->pause_req, false);
 
         isc_mem_attach(mctx, &manager->mctx);
@@ -1433,13 +1440,13 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
         /*
          * Make sure we only get called once.
          */
-        INSIST(!manager->exiting);
-        manager->exiting = true;
+        INSIST(!atomic_load(&manager->exiting));
+        atomic_store(&manager->exiting, true);
 
         /*
          * If privileged mode was on, turn it off.
         */
-        manager->mode = isc_taskmgrmode_normal;
+        atomic_store(&manager->mode, isc_taskmgrmode_normal);
 
         /*
          * Post shutdown event(s) to every task (if they haven't already been
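Aside: the INSIST-then-store pair above is a check-then-act, safe only because isc_taskmgr_destroy() is documented to run once. For illustration, the same guarantee could be expressed as a single atomic step with compare-exchange (an alternative sketch, not what the commit does):

#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

static atomic_bool exiting;

static void begin_shutdown(void) {
        bool expected = false;
        /* Atomically flips exiting from false to true exactly once. */
        bool first = atomic_compare_exchange_strong(&exiting, &expected, true);
        assert(first);
        (void)first;
}

int main(void) {
        begin_shutdown();
        return (0);
}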
@@ -1481,19 +1488,13 @@ void
 isc_taskmgr_setprivilegedmode(isc_taskmgr_t *manager0) {
         isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
 
-        LOCK(&manager->lock);
-        manager->mode = isc_taskmgrmode_privileged;
-        UNLOCK(&manager->lock);
+        atomic_store(&manager->mode, isc_taskmgrmode_privileged);
 }
 
 isc_taskmgrmode_t
 isc_taskmgr_mode(isc_taskmgr_t *manager0) {
         isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
-        isc_taskmgrmode_t mode;
-        LOCK(&manager->lock);
-        mode = manager->mode;
-        UNLOCK(&manager->lock);
-        return (mode);
+        return (atomic_load(&manager->mode));
 }
 
 void
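Once mode is an atomic integer, the setter and getter above collapse to a single store and load, and manager->lock is no longer needed for them. Note that the enum value is stored in an atomic_uint_fast32_t rather than an atomic enum. A standalone sketch of that idiom (hypothetical names):

/* An enum stored in an atomic integer, as done for mode above. */
#include <stdatomic.h>

typedef enum { mode_normal = 0, mode_privileged = 1 } mgrmode_t;

static atomic_uint_fast32_t mode;

static void set_privileged(void) {
        atomic_store(&mode, mode_privileged);
}

static mgrmode_t get_mode(void) {
        return ((mgrmode_t)atomic_load(&mode));
}

int main(void) {
        set_privileged();
        return (get_mode() == mode_privileged ? 0 : 1);
}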
@@ -1501,14 +1502,15 @@ isc__taskmgr_pause(isc_taskmgr_t *manager0) {
         isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
 
         LOCK(&manager->halt_lock);
-        while (manager->exclusive_requested || manager->pause_requested) {
+        while (atomic_load_relaxed(&manager->exclusive_req) ||
+               atomic_load_relaxed(&manager->pause_req)) {
                 UNLOCK(&manager->halt_lock);
                 /* This is ugly but pause is used EXCLUSIVELY in tests */
                 isc_thread_yield();
                 LOCK(&manager->halt_lock);
         }
-        manager->pause_requested = true;
+        atomic_store_relaxed(&manager->pause_req, true);
         while (manager->halted < manager->workers) {
                 wake_all_queues(manager);
                 WAIT(&manager->halt_cond, &manager->halt_lock);
@@ -1520,8 +1522,8 @@ void
 isc__taskmgr_resume(isc_taskmgr_t *manager0) {
         isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
 
         LOCK(&manager->halt_lock);
-        if (manager->pause_requested) {
-                manager->pause_requested = false;
+        if (manager->pause_req) {
+                manager->pause_req = false;
         while (manager->halted > 0) {
                 BROADCAST(&manager->halt_cond);
                 WAIT(&manager->halt_cond, &manager->halt_lock);
@@ -1573,16 +1575,19 @@ isc_task_beginexclusive(isc_task_t *task0) {
         LOCK(&manager->excl_lock);
         REQUIRE(task == task->manager->excl ||
-                (task->manager->exiting && task->manager->excl == NULL));
+                (atomic_load_relaxed(&task->manager->exiting) &&
+                 task->manager->excl == NULL));
         UNLOCK(&manager->excl_lock);
 
-        if (manager->exclusive_requested || manager->pause_requested) {
+        if (atomic_load_relaxed(&manager->exclusive_req) ||
+            atomic_load_relaxed(&manager->pause_req)) {
                 return (ISC_R_LOCKBUSY);
         }
 
         LOCK(&manager->halt_lock);
-        INSIST(!manager->exclusive_requested && !manager->pause_requested);
-        manager->exclusive_requested = true;
+        INSIST(!atomic_load_relaxed(&manager->exclusive_req) &&
+               !atomic_load_relaxed(&manager->pause_req));
+        atomic_store_relaxed(&manager->exclusive_req, true);
         while (manager->halted + 1 < manager->workers) {
                 wake_all_queues(manager);
                 WAIT(&manager->halt_cond, &manager->halt_lock);
@@ -1599,8 +1604,8 @@ isc_task_endexclusive(isc_task_t *task0) {
         REQUIRE(VALID_TASK(task));
         REQUIRE(task->state == task_state_running);
 
         LOCK(&manager->halt_lock);
-        REQUIRE(manager->exclusive_requested);
-        manager->exclusive_requested = false;
+        REQUIRE(atomic_load_relaxed(&manager->exclusive_req) == true);
+        atomic_store_relaxed(&manager->exclusive_req, false);
         while (manager->halted > 0) {
                 BROADCAST(&manager->halt_cond);
                 WAIT(&manager->halt_cond, &manager->halt_lock);
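For context, a usage sketch of the exclusive-mode API touched by the two hunks above: isc_task_beginexclusive() parks every other worker on halt_cond before returning ISC_R_SUCCESS, and isc_task_endexclusive() releases them. Here reconfigure_shared_state() is a hypothetical placeholder:

#include <isc/result.h>
#include <isc/task.h>

static void reconfigure_shared_state(void) {
        /* Placeholder: mutate state normally touched by many workers. */
}

static void with_exclusive(isc_task_t *task) {
        isc_result_t result = isc_task_beginexclusive(task);
        if (result == ISC_R_SUCCESS) {
                /* All other worker threads are halted at this point. */
                reconfigure_shared_state();
                isc_task_endexclusive(task);
        } else {
                /* ISC_R_LOCKBUSY: a pause/exclusive request is already active. */
        }
}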
@@ -1685,14 +1690,19 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr0, xmlTextWriterPtr writer) {
                                             mgr->default_quantum));
         TRY0(xmlTextWriterEndElement(writer)); /* default-quantum */
 
+        TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-count"));
+        TRY0(xmlTextWriterWriteFormatString(writer, "%d",
+                (int) atomic_load_relaxed(&mgr->tasks_count)));
+        TRY0(xmlTextWriterEndElement(writer)); /* tasks-count */
+
         TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-running"));
         TRY0(xmlTextWriterWriteFormatString(writer, "%d",
-                (int) mgr->tasks_running));
+                (int) atomic_load_relaxed(&mgr->tasks_running)));
         TRY0(xmlTextWriterEndElement(writer)); /* tasks-running */
 
         TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-ready"));
         TRY0(xmlTextWriterWriteFormatString(writer, "%d",
-                (int) mgr->tasks_ready));
+                (int) atomic_load_relaxed(&mgr->tasks_ready)));
         TRY0(xmlTextWriterEndElement(writer)); /* tasks-ready */
 
         TRY0(xmlTextWriterEndElement(writer)); /* thread-model */
@@ -1785,11 +1795,15 @@ isc_taskmgr_renderjson(isc_taskmgr_t *mgr0, json_object *tasks) {
         CHECKMEM(obj);
         json_object_object_add(tasks, "default-quantum", obj);
 
-        obj = json_object_new_int(mgr->tasks_running);
+        obj = json_object_new_int(atomic_load_relaxed(&mgr->tasks_count));
+        CHECKMEM(obj);
+        json_object_object_add(tasks, "tasks-count", obj);
+
+        obj = json_object_new_int(atomic_load_relaxed(&mgr->tasks_running));
         CHECKMEM(obj);
         json_object_object_add(tasks, "tasks-running", obj);
 
-        obj = json_object_new_int(mgr->tasks_ready);
+        obj = json_object_new_int(atomic_load_relaxed(&mgr->tasks_ready));
         CHECKMEM(obj);
         json_object_object_add(tasks, "tasks-ready", obj);
......