Commit 3a6202f1 authored by Bob Halley's avatar Bob Halley

checkpoint

parent 7ee52cc7
......@@ -6,18 +6,26 @@
#include "memcluster.h"
#include "task.h"
/*ARGSUSED*/
boolean_t
my_callback(generic_event_t event) {
int i;
my_callback(task_t task, void *arg, generic_event_t event) {
int i, j;
char *name = arg;
printf("my callback, event type %d\n", event->type);
for (i = 0; i < 1000000; i++);
j = 0;
for (i = 0; i < 100000000; i++)
j += 100;
printf("task %s: %d\n", name, j);
return (FALSE);
}
/*ARGSUSED*/
boolean_t
my_shutdown(generic_event_t event) {
printf("shutdown\n");
my_shutdown(task_t task, void *arg, generic_event_t event) {
char *name = arg;
printf("shutdown %s\n", name);
return (TRUE);
}
......@@ -32,6 +40,7 @@ event_allocate(mem_context_t mctx, event_type_t type, event_action_t action,
if (event == NULL)
return (NULL);
event->mctx = mctx;
event->size = size;
event->type = type;
event->action = action;
......@@ -39,33 +48,49 @@ event_allocate(mem_context_t mctx, event_type_t type, event_action_t action,
}
void
main(void) {
main(int argc, char *argv[]) {
mem_context_t mctx = NULL;
task_manager_t manager = NULL;
task_t task = NULL;
task_t t1 = NULL, t2 = NULL;
generic_event_t event;
unsigned int workers;
if (argc > 1)
workers = atoi(argv[1]);
else
workers = 2;
printf("%d workers\n", workers);
INSIST(mem_context_create(0, 0, &mctx) == 0);
INSIST(task_manager_create(mctx, 2, 0, &manager) == 2);
INSIST(task_allocate(manager, my_shutdown, 0, &task));
INSIST(task_manager_create(mctx, workers, 0, &manager) == workers);
INSIST(task_allocate(manager, "1", my_shutdown, 0, &t1));
INSIST(task_allocate(manager, "2", my_shutdown, 0, &t2));
event = event_allocate(mctx, 1, my_callback, sizeof *event);
task_send_event(task, event);
task_send_event(t1, event);
event = event_allocate(mctx, 1, my_callback, sizeof *event);
task_send_event(task, event);
task_send_event(t2, event);
event = event_allocate(mctx, 1, my_callback, sizeof *event);
task_send_event(task, event);
task_send_event(t1, event);
event = event_allocate(mctx, 1, my_callback, sizeof *event);
task_send_event(task, event);
printf("presleep\n");
sleep(4);
printf("postsleep\n");
task_send_event(t2, event);
event = event_allocate(mctx, 1, my_callback, sizeof *event);
task_send_event(t1, event);
event = event_allocate(mctx, 1, my_callback, sizeof *event);
task_send_event(t2, event);
event = event_allocate(mctx, 1, my_callback, sizeof *event);
task_send_event(t1, event);
event = event_allocate(mctx, 1, my_callback, sizeof *event);
task_send_event(t2, event);
task_shutdown(task);
task_detach(&task);
task_shutdown(t1);
task_shutdown(t2);
task_detach(&t1);
task_detach(&t2);
printf("destroy\n");
task_manager_destroy(&manager);
printf("destroyed\n");
mem_stats(mctx, stdout);
}
......@@ -18,6 +18,11 @@
#define FINISHED(m) ((m)->exiting && EMPTY((m)->tasks))
#ifdef DEBUGTRACE
#define XTRACE(m) printf("%s %p\n", (m), pthread_self())
#else
#define XTRACE(m)
#endif
/***
*** Tasks.
......@@ -27,7 +32,7 @@ static void
task_free(task_t task) {
task_manager_t manager = task->manager;
printf("free task\n");
XTRACE("free task");
REQUIRE(EMPTY(task->events));
LOCK(&manager->lock);
......@@ -48,8 +53,10 @@ task_free(task_t task) {
}
boolean_t
task_allocate(task_manager_t manager, event_action_t shutdown_action,
u_int32_t quantum, task_t *taskp) {
task_allocate(task_manager_t manager, void *arg,
event_action_t shutdown_action, unsigned int quantum,
task_t *taskp)
{
task_t task;
REQUIRE(VALID_MANAGER(manager));
......@@ -67,6 +74,7 @@ task_allocate(task_manager_t manager, event_action_t shutdown_action,
INIT_LIST(task->events);
task->quantum = quantum;
task->shutdown_pending = FALSE;
task->arg = arg;
task->shutdown_action = shutdown_action;
INIT_LINK(task, link);
INIT_LINK(task, ready_link);
......@@ -103,7 +111,7 @@ task_detach(task_t *taskp) {
task_manager_t manager;
task_t task;
printf("task_detach\n");
XTRACE("task_detach");
REQUIRE(taskp != NULL);
task = *taskp;
......@@ -112,7 +120,7 @@ task_detach(task_t *taskp) {
LOCK(&task->lock);
REQUIRE(task->references > 0);
task->references--;
if (task->state == task_state_zombie &&
if (task->state == task_state_shutdown &&
task->references == 0) {
manager = task->manager;
INSIST(VALID_MANAGER(manager));
......@@ -136,14 +144,14 @@ task_send_event(task_t task, generic_event_t event) {
REQUIRE(VALID_TASK(task));
REQUIRE(event != NULL);
printf("sending\n");
XTRACE("sending");
/*
* We're trying hard to hold locks for as short a time as possible.
* We're also trying to hold as few locks as possible. This is why
* some processing is deferred until after a lock is released.
*/
LOCK(&task->lock);
if (task->state != task_state_zombie && !task->shutdown_pending) {
if (task->state != task_state_shutdown && !task->shutdown_pending) {
if (task->state == task_state_idle) {
was_idle = TRUE;
INSIST(EMPTY(task->events));
......@@ -157,7 +165,7 @@ task_send_event(task_t task, generic_event_t event) {
UNLOCK(&task->lock);
if (discard) {
mem_put(event->mctx, event, sizeof *event);
mem_put(event->mctx, event, event->size);
return (TRUE);
}
......@@ -201,14 +209,14 @@ task_send_event(task_t task, generic_event_t event) {
BROADCAST(&manager->work_available);
}
printf("sent\n");
XTRACE("sent");
return (TRUE);
}
boolean_t
task_shutdown(task_t task) {
boolean_t was_idle = FALSE;
boolean_t zombie = FALSE;
boolean_t discard = FALSE;
REQUIRE(VALID_TASK(task));
......@@ -217,7 +225,7 @@ task_shutdown(task_t task) {
*/
LOCK(&task->lock);
if (task->state != task_state_zombie) {
if (task->state != task_state_shutdown && !task->shutdown_pending) {
if (task->state == task_state_idle) {
was_idle = TRUE;
INSIST(EMPTY(task->events));
......@@ -227,10 +235,10 @@ task_shutdown(task_t task) {
task->state == task_state_running);
task->shutdown_pending = TRUE;
} else
zombie = TRUE;
discard = TRUE;
UNLOCK(&task->lock);
if (zombie)
if (discard)
return (TRUE);
if (was_idle) {
......@@ -262,9 +270,8 @@ void *task_manager_run(void *uap) {
task_manager_t manager = uap;
task_t task;
boolean_t no_workers = FALSE;
int spin = 0;
printf("start %p\n", pthread_self());
XTRACE("start");
REQUIRE(VALID_MANAGER(manager));
......@@ -328,21 +335,22 @@ void *task_manager_run(void *uap) {
* task lock.
*/
while (EMPTY(manager->ready_tasks) && !FINISHED(manager)) {
printf("wait %p\n", pthread_self());
XTRACE("wait");
WAIT(&manager->work_available, &manager->lock);
printf("awake %p\n", pthread_self());
XTRACE("awake");
}
printf("working %p\n", pthread_self());
XTRACE("working");
task = HEAD(manager->ready_tasks);
if (task != NULL) {
u_int32_t dispatch_count = 0;
unsigned int dispatch_count = 0;
boolean_t done = FALSE;
boolean_t requeue = FALSE;
boolean_t wants_shutdown;
boolean_t free_task = FALSE;
generic_event_t event;
void *arg;
event_action_t action;
generic_event_t event;
event_list_t remaining_events;
boolean_t discard_remaining = FALSE;
......@@ -370,30 +378,50 @@ void *task_manager_run(void *uap) {
action = event->action;
DEQUEUE(task->events, event, link);
}
arg = task->arg;
UNLOCK(&task->lock);
printf("dispatch %p\n", pthread_self());
/*
* Execute the event action.
*/
XTRACE("execute action");
if (action != NULL)
wants_shutdown = (*action)(event);
wants_shutdown = (*action)(task,
arg,
event);
else
wants_shutdown = FALSE;
dispatch_count++;
/*
* If this wasn't a shutdown event, we
* need to free it.
*
* Also, if we've delivered the shutdown
* event to the task, then we are going
* to shut it down no matter what the task
* callback returned.
*/
if (event != NULL)
mem_put(event->mctx, event,
sizeof *event);
event->size);
else
wants_shutdown = TRUE;
LOCK(&task->lock);
if (wants_shutdown) {
printf("wants shutdown\n");
/*
* The task has either had the
* shutdown event sent to it, or
* an event action requested shutdown.
*
* Since no more events can be
* delivered to the task, we purge
* any remaining events (but defer
* freeing them until we've released
* the lock).
*/
XTRACE("wants shutdown");
if (!EMPTY(task->events)) {
remaining_events =
task->events;
......@@ -402,10 +430,14 @@ void *task_manager_run(void *uap) {
}
if (task->references == 0)
free_task = TRUE;
task->state = task_state_zombie;
task->state = task_state_shutdown;
done = TRUE;
} else if (EMPTY(task->events) &&
!task->shutdown_pending) {
/*
* Nothing else to do for this task.
* Put it to sleep.
*/
task->state = task_state_idle;
done = TRUE;
} else if (dispatch_count >= task->quantum) {
......@@ -434,7 +466,7 @@ void *task_manager_run(void *uap) {
event = next_event) {
next_event = NEXT(event, link);
mem_put(event->mctx, event,
sizeof *event);
event->size);
}
}
......@@ -476,7 +508,7 @@ void *task_manager_run(void *uap) {
if (no_workers)
BROADCAST(&manager->no_workers);
printf("exit %p\n", pthread_self());
XTRACE("exit");
return (NULL);
}
......@@ -490,11 +522,11 @@ manager_free(task_manager_t manager) {
mem_put(manager->mctx, manager, sizeof *manager);
}
u_int32_t
task_manager_create(mem_context_t mctx, int workers, int default_quantum,
task_manager_t *managerp) {
int i;
u_int32_t started = 0;
unsigned int
task_manager_create(mem_context_t mctx, unsigned int workers,
unsigned int default_quantum, task_manager_t *managerp)
{
unsigned int i, started = 0;
task_manager_t manager;
os_thread_t thread;
......@@ -546,7 +578,7 @@ task_manager_destroy(task_manager_t *managerp) {
manager = *managerp;
REQUIRE(VALID_MANAGER(manager));
printf("task_manager_destroy %p\n", pthread_self());
XTRACE("task_manager_destroy");
/*
* Only one non-worker thread may ever call this routine.
* If a worker thread wants to initiate shutdown of the
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment