Commit 70c463a6 authored by Witold Krecicki's avatar Witold Krecicki

Make isc_task_pause/isc_task_unpause thread safe.

isc_task_pause/unpause were inherently thread-unsafe - a task
could be paused only once by one thread, if the task was running
while we paused it it led to races. Fix it by making sure that
the task will pause if requested to, and by using a 'pause reference
counter' to count task pause requests - a task will be unpaused
iff all threads unpause it.

Don't remove from queue when pausing task - we lock the queue lock
(expensive), while it's unlikely that the task will be running -
and we'll remove it anyway in dispatcher
parent c823ed4f
Pipeline #33940 passed with stages
in 19 minutes and 55 seconds
......@@ -107,6 +107,7 @@ struct isc__task {
isc_mutex_t lock;
/* Locked by task lock. */
task_state_t state;
int pause_cnt;
isc_refcount_t references;
isc_eventlist_t events;
isc_eventlist_t on_shutdown;
......@@ -309,6 +310,7 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum,
isc_mutex_init(&task->lock);
task->state = task_state_idle;
task->pause_cnt = 0;
isc_refcount_init(&task->references, 1);
INIT_LIST(task->events);
......@@ -409,7 +411,7 @@ task_shutdown(isc__task_t *task)
/*
* Moves a task onto the appropriate run queue.
*
* Caller must NOT hold manager lock.
* Caller must NOT hold queue lock.
*/
static inline void
task_ready(isc__task_t *task)
......@@ -418,7 +420,6 @@ task_ready(isc__task_t *task)
bool has_privilege = isc_task_privilege((isc_task_t *)task);
REQUIRE(VALID_MANAGER(manager));
REQUIRE(task->state == task_state_ready);
XTRACE("task_ready");
LOCK(&manager->queues[task->threadid].lock);
......@@ -973,11 +974,14 @@ pop_readyq(isc__taskmgr_t *manager, int c)
* Push 'task' onto the ready_tasks queue. If 'task' has the privilege
* flag set, then also push it onto the ready_priority_tasks queue.
*
* Caller must hold the task manager lock.
* Caller must hold the task queue lock.
*/
static inline void
push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c)
{
if (ISC_LINK_LINKED(task, ready_link)) {
return;
}
ENQUEUE(manager->queues[c].ready_tasks, task, ready_link);
if (TASK_PRIVILEGED(task)) {
ENQUEUE(manager->queues[c].ready_priority_tasks, task,
......@@ -1135,6 +1139,17 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid)
memory_order_acquire);
LOCK(&task->lock);
/*
* It is possible because that we have a paused task
* in the queue - it might have been paused in the
* meantime and we never hold both queue and task lock
* to avoid deadlocks, just bail then.
*/
if (task->state != task_state_ready) {
UNLOCK(&task->lock);
LOCK(&manager->queues[threadid].lock);
continue;
}
INSIST(task->state == task_state_ready);
task->state = task_state_running;
XTRACE("running");
......@@ -1222,6 +1237,15 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid)
}
}
done = true;
} else if (task->state == task_state_pausing) {
/*
* We got a pause request on this task,
* stop working on it and switch the
* state to paused.
*/
XTRACE("pausing");
task->state = task_state_paused;
done = true;
} else if (dispatch_count >= task->quantum) {
/*
* Our quantum has expired, but
......@@ -1234,17 +1258,8 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid)
* so the minimum quantum is one.
*/
XTRACE("quantum");
if (task->state == task_state_running) {
/*
* We requeue only if it's
* not paused.
*/
task->state = task_state_ready;
requeue = true;
} else if (task->state ==
task_state_pausing) {
task->state = task_state_paused;
}
task->state = task_state_ready;
requeue = true;
done = true;
}
} while (!done);
......@@ -1698,32 +1713,28 @@ void
isc_task_pause(isc_task_t *task0)
{
REQUIRE(ISCAPI_TASK_VALID(task0));
isc__task_t * task = (isc__task_t *)task0;
isc__taskmgr_t *manager = task->manager;
bool running = false;
isc__task_t *task = (isc__task_t *)task0;
LOCK(&task->lock);
task->pause_cnt++;
if (task->pause_cnt > 1) {
/*
* Someone already paused this thread, just increase
* the number of pausing clients.
*/
UNLOCK(&task->lock);
return;
}
INSIST(task->state == task_state_idle ||
task->state == task_state_ready ||
task->state == task_state_running);
if (task->state == task_state_running) {
running = true;
task->state = task_state_pausing;
} else {
task->state = task_state_paused;
}
UNLOCK(&task->lock);
if (running) {
return;
}
LOCK(&manager->queues[task->threadid].lock);
if (ISC_LINK_LINKED(task, ready_link)) {
DEQUEUE(manager->queues[task->threadid].ready_tasks, task,
ready_link);
}
UNLOCK(&manager->queues[task->threadid].lock);
}
void
......@@ -1735,6 +1746,13 @@ isc_task_unpause(isc_task_t *task0)
REQUIRE(ISCAPI_TASK_VALID(task0));
LOCK(&task->lock);
task->pause_cnt--;
INSIST(task->pause_cnt >= 0);
if (task->pause_cnt > 0) {
UNLOCK(&task->lock);
return;
}
INSIST(task->state == task_state_paused ||
task->state == task_state_pausing);
/* If the task was pausing we can't reschedule it */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment