Commit 232548d5 authored by Bob Halley's avatar Bob Halley

add event purging

parent 8f9001ec
......@@ -45,7 +45,7 @@ my_tick(task_t __attribute__((unused)) task,
{
char *name = event->arg;
printf("tick %s\n", name);
printf("task %p tick %s\n", task, name);
return (FALSE);
}
......@@ -58,7 +58,8 @@ simple_timer_run(void *arg) {
for (i = 0; i < 10; i++) {
sleep(1);
printf("sending timer to %p\n", task);
event = task_event_allocate(mctx, 2, my_tick, "foo",
event = task_event_allocate(mctx, simple_timer_run,
2, my_tick, "foo",
sizeof *event);
INSIST(event != NULL);
(void)task_send_event(task, &event);
......@@ -108,34 +109,52 @@ main(int argc, char *argv[]) {
printf("task 2 = %p\n", t2);
sleep(2);
event = task_event_allocate(mctx, 1, my_callback, "1", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "1",
sizeof *event);
task_send_event(t1, &event);
event = task_event_allocate(mctx, 1, my_callback, "1", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "1",
sizeof *event);
task_send_event(t1, &event);
event = task_event_allocate(mctx, 1, my_callback, "1", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "1",
sizeof *event);
task_send_event(t1, &event);
event = task_event_allocate(mctx, 1, my_callback, "1", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "1",
sizeof *event);
task_send_event(t1, &event);
event = task_event_allocate(mctx, 1, my_callback, "1", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "1",
sizeof *event);
task_send_event(t1, &event);
event = task_event_allocate(mctx, 1, my_callback, "1", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "1",
sizeof *event);
task_send_event(t1, &event);
event = task_event_allocate(mctx, 1, my_callback, "1", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "1",
sizeof *event);
task_send_event(t1, &event);
event = task_event_allocate(mctx, 1, my_callback, "1", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "1",
sizeof *event);
task_send_event(t1, &event);
event = task_event_allocate(mctx, 1, my_callback, "2", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "1",
sizeof *event);
task_send_event(t1, &event);
event = task_event_allocate(mctx, main, 1, my_callback, "2",
sizeof *event);
task_send_event(t2, &event);
event = task_event_allocate(mctx, 1, my_callback, "3", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "3",
sizeof *event);
task_send_event(t3, &event);
event = task_event_allocate(mctx, 1, my_callback, "4", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "4",
sizeof *event);
task_send_event(t4, &event);
event = task_event_allocate(mctx, 1, my_callback, "2", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "2",
sizeof *event);
task_send_event(t2, &event);
event = task_event_allocate(mctx, 1, my_callback, "3", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "3",
sizeof *event);
task_send_event(t3, &event);
event = task_event_allocate(mctx, 1, my_callback, "4", sizeof *event);
event = task_event_allocate(mctx, main, 1, my_callback, "4",
sizeof *event);
task_send_event(t4, &event);
task_purge_events(t3, NULL, 0);
task_detach(&t1);
task_detach(&t2);
......
......@@ -27,6 +27,8 @@ typedef struct task_manager * task_manager_t;
/*
* Negative event types are reserved for use by the task manager.
*
* Type 0 means "any type".
*/
typedef int task_eventtype_t;
......@@ -39,6 +41,7 @@ typedef boolean_t (*task_action_t)(task_t, task_event_t);
struct task_event {
mem_context_t mctx;
size_t size;
void * sender;
task_eventtype_t type;
task_action_t action;
void * arg;
......@@ -51,6 +54,7 @@ struct task_event {
typedef LIST(struct task_event) task_eventlist_t;
task_event_t task_event_allocate(mem_context_t,
void *,
task_eventtype_t,
task_action_t,
void *arg,
......@@ -71,6 +75,8 @@ void task_attach(task_t, task_t *);
void task_detach(task_t *);
boolean_t task_send_event(task_t,
task_event_t *);
void task_purge_events(task_t, void *,
task_eventtype_t);
void task_shutdown(task_t);
void task_destroy(task_t *);
......
......@@ -24,7 +24,8 @@
#define BROADCAST(cvp) INSIST(os_condition_broadcast((cvp)))
#ifdef DEBUGTRACE
#define XTRACE(m) printf("%s %p\n", (m), os_thread_self())
#define XTRACE(m) printf("%s task %p thread %p\n", (m), \
task, os_thread_self())
#else
#define XTRACE(m)
#endif
......@@ -88,7 +89,7 @@ struct task_manager {
***/
static inline task_event_t
event_allocate(mem_context_t mctx, task_eventtype_t type,
event_allocate(mem_context_t mctx, void *sender, task_eventtype_t type,
task_action_t action, void *arg, size_t size)
{
task_event_t event;
......@@ -98,6 +99,7 @@ event_allocate(mem_context_t mctx, task_eventtype_t type,
return (NULL);
event->mctx = mctx;
event->size = size;
event->sender = sender;
event->type = type;
event->action = action;
event->arg = arg;
......@@ -106,7 +108,7 @@ event_allocate(mem_context_t mctx, task_eventtype_t type,
}
task_event_t
task_event_allocate(mem_context_t mctx, task_eventtype_t type,
task_event_allocate(mem_context_t mctx, void *sender, task_eventtype_t type,
task_action_t action, void *arg, size_t size)
{
if (size < sizeof (struct task_event))
......@@ -116,7 +118,7 @@ task_event_allocate(mem_context_t mctx, task_eventtype_t type,
if (action == NULL)
return (NULL);
return (event_allocate(mctx, type, action, arg, size));
return (event_allocate(mctx, sender, type, action, arg, size));
}
void
......@@ -187,6 +189,7 @@ task_create(task_manager_t manager, task_action_t shutdown_action,
task->quantum = quantum;
task->enqueue_allowed = TRUE;
task->shutdown_event = event_allocate(manager->mctx,
NULL,
TASK_EVENT_SHUTDOWN,
shutdown_action,
shutdown_arg,
......@@ -257,7 +260,8 @@ task_send_event(task_t task, task_event_t *eventp) {
REQUIRE(eventp != NULL);
event = *eventp;
REQUIRE(event != NULL);
REQUIRE(event->type >= 0);
REQUIRE(event->sender != NULL);
REQUIRE(event->type > 0);
XTRACE("sending");
/*
......@@ -331,6 +335,43 @@ task_send_event(task_t task, task_event_t *eventp) {
return (TRUE);
}
void
task_purge_events(task_t task, void *sender, task_eventtype_t type) {
task_event_t event, next_event;
task_eventlist_t purgeable;
REQUIRE(VALID_TASK(task));
REQUIRE(type >= 0);
/*
* Purge events matching 'sender' and 'type'. sender == NULL means
* "any sender". type == NULL means any type. Task manager events
* cannot be purged.
*/
INIT_LIST(purgeable);
LOCK(&task->lock);
for (event = HEAD(task->events);
event != NULL;
event = next_event) {
next_event = NEXT(event, link);
if ((sender == NULL || event->sender == sender) &&
((type == 0 && event->type > 0) || event->type == type)) {
DEQUEUE(task->events, event, link);
ENQUEUE(purgeable, event, link);
}
}
UNLOCK(&task->lock);
for (event = HEAD(purgeable);
event != NULL;
event = next_event) {
next_event = NEXT(event, link);
task_event_free(&event);
}
}
void
task_shutdown(task_t task) {
boolean_t was_idle = FALSE;
......@@ -493,7 +534,18 @@ void *task_manager_run(void *uap) {
UNLOCK(&manager->lock);
LOCK(&task->lock);
task->state = task_state_running;
INSIST(task->state == task_state_ready);
if (EMPTY(task->events)) {
/*
* The task became runnable, but all events
* in the run queue were subsequently purged.
* Put the task to sleep.
*/
task->state = task_state_idle;
done = TRUE;
XTRACE("ready but empty");
} else
task->state = task_state_running;
while (!done) {
INSIST(!EMPTY(task->events));
event = HEAD(task->events);
......@@ -549,6 +601,7 @@ void *task_manager_run(void *uap) {
* Nothing else to do for this task.
* Put it to sleep.
*/
XTRACE("empty");
task->state = task_state_idle;
done = TRUE;
} else if (dispatch_count >= task->quantum) {
......@@ -562,6 +615,7 @@ void *task_manager_run(void *uap) {
* dispatching at least one event,
* so the minimum quantum is one.
*/
XTRACE("quantum");
task->state = task_state_ready;
requeue = TRUE;
done = TRUE;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment