1 /* 2 * runqueue.c - a simple task queueing/completion tracking helper 3 * 4 * Copyright (C) 2013 Felix Fietkau <nbd@openwrt.org> 5 * 6 * Permission to use, copy, modify, and/or distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <string.h> 20 #include <stdio.h> 21 #include "runqueue.h" 22 23 static void 24 __runqueue_empty_cb(struct uloop_timeout *timeout) 25 { 26 struct runqueue *q = container_of(timeout, struct runqueue, timeout); 27 28 q->empty_cb(q); 29 } 30 31 void runqueue_init(struct runqueue *q) 32 { 33 INIT_SAFE_LIST(&q->tasks_active); 34 INIT_SAFE_LIST(&q->tasks_inactive); 35 } 36 37 static void __runqueue_start_next(struct uloop_timeout *timeout) 38 { 39 struct runqueue *q = container_of(timeout, struct runqueue, timeout); 40 struct runqueue_task *t; 41 42 do { 43 if (q->stopped) 44 break; 45 46 if (list_empty(&q->tasks_inactive.list)) 47 break; 48 49 if (q->max_running_tasks && q->running_tasks >= q->max_running_tasks) 50 break; 51 52 t = list_first_entry(&q->tasks_inactive.list, struct runqueue_task, list.list); 53 safe_list_del(&t->list); 54 safe_list_add(&t->list, &q->tasks_active); 55 t->running = true; 56 q->running_tasks++; 57 if (t->run_timeout) 58 uloop_timeout_set(&t->timeout, t->run_timeout); 59 t->type->run(q, t); 60 } while (1); 61 62 if (!q->empty && 63 
list_empty(&q->tasks_active.list) && 64 list_empty(&q->tasks_inactive.list)) { 65 q->empty = true; 66 if (q->empty_cb) { 67 q->timeout.cb = __runqueue_empty_cb; 68 uloop_timeout_set(&q->timeout, 1); 69 } 70 } 71 } 72 73 static void runqueue_start_next(struct runqueue *q) 74 { 75 if (q->empty) 76 return; 77 78 q->timeout.cb = __runqueue_start_next; 79 uloop_timeout_set(&q->timeout, 1); 80 } 81 82 static int __runqueue_cancel(void *ctx, struct safe_list *list) 83 { 84 struct runqueue_task *t; 85 86 t = container_of(list, struct runqueue_task, list); 87 runqueue_task_cancel(t, 0); 88 89 return 0; 90 } 91 92 void runqueue_cancel_active(struct runqueue *q) 93 { 94 safe_list_for_each(&q->tasks_active, __runqueue_cancel, NULL); 95 } 96 97 void runqueue_cancel_pending(struct runqueue *q) 98 { 99 safe_list_for_each(&q->tasks_inactive, __runqueue_cancel, NULL); 100 } 101 102 void runqueue_cancel(struct runqueue *q) 103 { 104 runqueue_cancel_pending(q); 105 runqueue_cancel_active(q); 106 } 107 108 void runqueue_kill(struct runqueue *q) 109 { 110 struct runqueue_task *t; 111 112 while (!list_empty(&q->tasks_active.list)) { 113 t = list_first_entry(&q->tasks_active.list, struct runqueue_task, list.list); 114 runqueue_task_kill(t); 115 } 116 runqueue_cancel_pending(q); 117 uloop_timeout_cancel(&q->timeout); 118 } 119 120 void runqueue_task_cancel(struct runqueue_task *t, int type) 121 { 122 if (!t->queued) 123 return; 124 125 if (!t->running) { 126 runqueue_task_complete(t); 127 return; 128 } 129 130 t->cancelled = true; 131 if (t->cancel_timeout) 132 uloop_timeout_set(&t->timeout, t->cancel_timeout); 133 if (t->type->cancel) 134 t->type->cancel(t->q, t, type); 135 } 136 137 static void 138 __runqueue_task_timeout(struct uloop_timeout *timeout) 139 { 140 struct runqueue_task *t = container_of(timeout, struct runqueue_task, timeout); 141 142 if (t->cancelled) 143 runqueue_task_kill(t); 144 else 145 runqueue_task_cancel(t, t->cancel_type); 146 } 147 148 static void 
_runqueue_task_add(struct runqueue *q, struct runqueue_task *t, bool running, bool first) 149 { 150 struct safe_list *head; 151 152 if (t->queued) 153 return; 154 155 if (!t->type->run && !running) { 156 fprintf(stderr, "BUG: inactive task added without run() callback\n"); 157 return; 158 } 159 160 if (running) { 161 q->running_tasks++; 162 head = &q->tasks_active; 163 } else { 164 head = &q->tasks_inactive; 165 } 166 167 t->timeout.cb = __runqueue_task_timeout; 168 t->q = q; 169 if (first) 170 safe_list_add_first(&t->list, head); 171 else 172 safe_list_add(&t->list, head); 173 t->cancelled = false; 174 t->queued = true; 175 t->running = running; 176 q->empty = false; 177 178 runqueue_start_next(q); 179 } 180 181 void runqueue_task_add(struct runqueue *q, struct runqueue_task *t, bool running) 182 { 183 _runqueue_task_add(q, t, running, 0); 184 } 185 186 void runqueue_task_add_first(struct runqueue *q, struct runqueue_task *t, bool running) 187 { 188 _runqueue_task_add(q, t, running, 1); 189 } 190 191 void runqueue_task_kill(struct runqueue_task *t) 192 { 193 struct runqueue *q = t->q; 194 bool running = t->running; 195 196 if (!t->queued) 197 return; 198 199 if (running && t->type->kill) 200 t->type->kill(q, t); 201 202 runqueue_task_complete(t); 203 } 204 205 void runqueue_stop(struct runqueue *q) 206 { 207 q->stopped = true; 208 } 209 210 void runqueue_resume(struct runqueue *q) 211 { 212 q->stopped = false; 213 runqueue_start_next(q); 214 } 215 216 void runqueue_task_complete(struct runqueue_task *t) 217 { 218 struct runqueue *q = t->q; 219 220 if (!t->queued) 221 return; 222 223 if (t->running) 224 t->q->running_tasks--; 225 226 uloop_timeout_cancel(&t->timeout); 227 228 safe_list_del(&t->list); 229 t->queued = false; 230 t->running = false; 231 t->cancelled = false; 232 if (t->complete) 233 t->complete(q, t); 234 runqueue_start_next(q); 235 } 236 237 static void 238 __runqueue_proc_cb(struct uloop_process *p, int ret) 239 { 240 struct runqueue_process *t = 
container_of(p, struct runqueue_process, proc); 241 242 runqueue_task_complete(&t->task); 243 } 244 245 void runqueue_process_cancel_cb(struct runqueue *q, struct runqueue_task *t, int type) 246 { 247 struct runqueue_process *p = container_of(t, struct runqueue_process, task); 248 249 if (!type) 250 type = SIGTERM; 251 252 kill(p->proc.pid, type); 253 } 254 255 void runqueue_process_kill_cb(struct runqueue *q, struct runqueue_task *t) 256 { 257 struct runqueue_process *p = container_of(t, struct runqueue_process, task); 258 259 uloop_process_delete(&p->proc); 260 kill(p->proc.pid, SIGKILL); 261 } 262 263 static const struct runqueue_task_type runqueue_proc_type = { 264 .name = "process", 265 .cancel = runqueue_process_cancel_cb, 266 .kill = runqueue_process_kill_cb, 267 }; 268 269 void runqueue_process_add(struct runqueue *q, struct runqueue_process *p, pid_t pid) 270 { 271 if (p->proc.pending) 272 return; 273 274 p->proc.pid = pid; 275 p->proc.cb = __runqueue_proc_cb; 276 if (!p->task.type) 277 p->task.type = &runqueue_proc_type; 278 uloop_process_add(&p->proc); 279 if (!p->task.running) 280 runqueue_task_add(q, &p->task, true); 281 } 282
/* This page was automatically generated by LXR 0.3.1. • OpenWrt */