1 /* 2 * Copyright (C) 2011-2014 Felix Fietkau <nbd@openwrt.org> 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU Lesser General Public License version 2.1 6 * as published by the Free Software Foundation 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU General Public License for more details. 12 */ 13 14 #include <unistd.h> 15 #include "libubus.h" 16 #include "libubus-internal.h" 17 18 struct ubus_pending_data { 19 struct list_head list; 20 int type; 21 struct blob_attr data[]; 22 }; 23 24 static void req_data_cb(struct ubus_request *req, int type, struct blob_attr *data) 25 { 26 struct blob_attr **attr; 27 28 if (req->raw_data_cb) 29 req->raw_data_cb(req, type, data); 30 31 if (!req->data_cb) 32 return; 33 34 attr = ubus_parse_msg(data, blob_raw_len(data)); 35 if (!attr[UBUS_ATTR_DATA]) 36 return; 37 38 req->data_cb(req, type, attr[UBUS_ATTR_DATA]); 39 } 40 41 static void __ubus_process_req_data(struct ubus_request *req) 42 { 43 struct ubus_pending_data *data, *tmp; 44 45 list_for_each_entry_safe(data, tmp, &req->pending, list) { 46 list_del(&data->list); 47 if (!req->cancelled) 48 req_data_cb(req, data->type, data->data); 49 free(data); 50 } 51 } 52 53 int __hidden __ubus_start_request(struct ubus_context *ctx, struct ubus_request *req, 54 struct blob_attr *msg, int cmd, uint32_t peer) 55 { 56 57 if (msg && blob_pad_len(msg) > UBUS_MAX_MSGLEN) 58 return -1; 59 60 INIT_LIST_HEAD(&req->list); 61 INIT_LIST_HEAD(&req->pending); 62 req->ctx = ctx; 63 req->peer = peer; 64 req->seq = ++ctx->request_seq; 65 66 return ubus_send_msg(ctx, req->seq, msg, cmd, peer, req->fd); 67 } 68 69 int __hidden ubus_start_request(struct ubus_context *ctx, struct ubus_request *req, 70 struct blob_attr *msg, int cmd, uint32_t peer) 71 { 72 memset(req, 0, 
sizeof(*req)); 73 74 req->fd = -1; 75 76 return __ubus_start_request(ctx, req, msg, cmd, peer); 77 } 78 79 80 void ubus_abort_request(struct ubus_context *ctx, struct ubus_request *req) 81 { 82 if (list_empty(&req->list)) 83 return; 84 85 req->cancelled = true; 86 __ubus_process_req_data(req); 87 list_del_init(&req->list); 88 } 89 90 void ubus_complete_request_async(struct ubus_context *ctx, struct ubus_request *req) 91 { 92 if (!list_empty(&req->list)) 93 return; 94 95 list_add(&req->list, &ctx->requests); 96 } 97 98 static void 99 ubus_req_complete_cb(struct ubus_request *req) 100 { 101 ubus_complete_handler_t cb = req->complete_cb; 102 103 if (!cb) 104 return; 105 106 req->complete_cb = NULL; 107 cb(req, req->status_code); 108 } 109 110 static void 111 ubus_set_req_status(struct ubus_request *req, int ret) 112 { 113 if (!list_empty(&req->list)) 114 list_del_init(&req->list); 115 116 req->status_msg = true; 117 req->status_code = ret; 118 if (!req->blocked) 119 ubus_req_complete_cb(req); 120 } 121 122 static void ubus_sync_req_cb(struct ubus_request *req, int ret) 123 { 124 req->status_msg = true; 125 req->status_code = ret; 126 req->ctx->cancel_poll = true; 127 } 128 129 static int64_t get_time_msec(void) 130 { 131 struct timespec ts; 132 int64_t val; 133 134 clock_gettime(CLOCK_MONOTONIC, &ts); 135 val = (int64_t) ts.tv_sec * 1000LL; 136 val += ts.tv_nsec / 1000000LL; 137 return val; 138 } 139 140 int ubus_complete_request(struct ubus_context *ctx, struct ubus_request *req, 141 int req_timeout) 142 { 143 ubus_complete_handler_t complete_cb = req->complete_cb; 144 int status = UBUS_STATUS_NO_DATA; 145 int64_t timeout = 0, time_end = 0; 146 147 if (req_timeout) 148 time_end = get_time_msec() + req_timeout; 149 150 ubus_complete_request_async(ctx, req); 151 req->complete_cb = ubus_sync_req_cb; 152 153 ctx->stack_depth++; 154 while (!req->status_msg) { 155 if (req_timeout) { 156 timeout = time_end - get_time_msec(); 157 if (timeout <= 0) { 158 
ubus_set_req_status(req, UBUS_STATUS_TIMEOUT); 159 break; 160 } 161 } 162 163 ubus_poll_data(ctx, (unsigned int) timeout); 164 165 if (ctx->sock.eof) { 166 ubus_set_req_status(req, UBUS_STATUS_CONNECTION_FAILED); 167 ctx->cancel_poll = true; 168 break; 169 } 170 } 171 172 ctx->stack_depth--; 173 if (ctx->stack_depth) 174 ctx->cancel_poll = true; 175 176 if (req->status_msg) 177 status = req->status_code; 178 179 req->complete_cb = complete_cb; 180 if (req->complete_cb) 181 req->complete_cb(req, status); 182 183 if (!ctx->stack_depth && !ctx->sock.registered) 184 ctx->pending_timer.cb(&ctx->pending_timer); 185 186 return status; 187 } 188 189 void ubus_complete_deferred_request(struct ubus_context *ctx, struct ubus_request_data *req, int ret) 190 { 191 blob_buf_init(&b, 0); 192 blob_put_int32(&b, UBUS_ATTR_STATUS, ret); 193 blob_put_int32(&b, UBUS_ATTR_OBJID, req->object); 194 ubus_send_msg(ctx, req->seq, b.head, UBUS_MSG_STATUS, req->peer, req->fd); 195 } 196 197 static void ubus_put_data(struct blob_buf *buf, struct blob_attr *msg) 198 { 199 if (msg) 200 blob_put(buf, UBUS_ATTR_DATA, blob_data(msg), blob_len(msg)); 201 else 202 blob_put(buf, UBUS_ATTR_DATA, NULL, 0); 203 } 204 205 int ubus_send_reply(struct ubus_context *ctx, struct ubus_request_data *req, 206 struct blob_attr *msg) 207 { 208 int ret; 209 210 blob_buf_init(&b, 0); 211 blob_put_int32(&b, UBUS_ATTR_OBJID, req->object); 212 ubus_put_data(&b, msg); 213 ret = ubus_send_msg(ctx, req->seq, b.head, UBUS_MSG_DATA, req->peer, -1); 214 if (ret < 0) 215 return UBUS_STATUS_NO_DATA; 216 217 return 0; 218 } 219 220 int ubus_invoke_async_fd(struct ubus_context *ctx, uint32_t obj, 221 const char *method, struct blob_attr *msg, 222 struct ubus_request *req, int fd) 223 { 224 blob_buf_init(&b, 0); 225 blob_put_int32(&b, UBUS_ATTR_OBJID, obj); 226 blob_put_string(&b, UBUS_ATTR_METHOD, method); 227 ubus_put_data(&b, msg); 228 229 memset(req, 0, sizeof(*req)); 230 req->fd = fd; 231 if (__ubus_start_request(ctx, req, 
b.head, UBUS_MSG_INVOKE, obj) < 0) 232 return UBUS_STATUS_INVALID_ARGUMENT; 233 return 0; 234 } 235 236 int ubus_invoke_fd(struct ubus_context *ctx, uint32_t obj, const char *method, 237 struct blob_attr *msg, ubus_data_handler_t cb, void *priv, 238 int timeout, int fd) 239 { 240 struct ubus_request req; 241 int rc; 242 243 rc = ubus_invoke_async_fd(ctx, obj, method, msg, &req, fd); 244 if (rc) 245 return rc; 246 247 req.data_cb = cb; 248 req.priv = priv; 249 return ubus_complete_request(ctx, &req, timeout); 250 } 251 252 static void 253 ubus_notify_complete_cb(struct ubus_request *req, int ret) 254 { 255 struct ubus_notify_request *nreq; 256 257 nreq = container_of(req, struct ubus_notify_request, req); 258 if (!nreq->complete_cb) 259 return; 260 261 nreq->complete_cb(nreq, 0, 0); 262 } 263 264 static void 265 ubus_notify_data_cb(struct ubus_request *req, int type, struct blob_attr *msg) 266 { 267 struct ubus_notify_request *nreq; 268 269 nreq = container_of(req, struct ubus_notify_request, req); 270 if (!nreq->data_cb) 271 return; 272 273 nreq->data_cb(nreq, type, msg); 274 } 275 276 static int 277 __ubus_notify_async(struct ubus_context *ctx, struct ubus_object *obj, 278 const char *type, struct blob_attr *msg, 279 struct ubus_notify_request *req, bool reply) 280 { 281 memset(req, 0, sizeof(*req)); 282 283 blob_buf_init(&b, 0); 284 blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id); 285 blob_put_string(&b, UBUS_ATTR_METHOD, type); 286 ubus_put_data(&b, msg); 287 288 if (!reply) 289 blob_put_int8(&b, UBUS_ATTR_NO_REPLY, true); 290 291 if (ubus_start_request(ctx, &req->req, b.head, UBUS_MSG_NOTIFY, obj->id) < 0) 292 return UBUS_STATUS_INVALID_ARGUMENT; 293 294 /* wait for status message from ubusd first */ 295 req->req.notify = true; 296 req->pending = 1; 297 req->id[0] = obj->id; 298 req->req.complete_cb = ubus_notify_complete_cb; 299 req->req.data_cb = ubus_notify_data_cb; 300 301 return 0; 302 } 303 304 int ubus_notify_async(struct ubus_context *ctx, struct 
ubus_object *obj, 305 const char *type, struct blob_attr *msg, 306 struct ubus_notify_request *req) 307 { 308 return __ubus_notify_async(ctx, obj, type, msg, req, true); 309 } 310 311 int ubus_notify(struct ubus_context *ctx, struct ubus_object *obj, 312 const char *type, struct blob_attr *msg, int timeout) 313 { 314 struct ubus_notify_request req; 315 int ret; 316 317 ret = __ubus_notify_async(ctx, obj, type, msg, &req, timeout >= 0); 318 if (ret < 0) 319 return ret; 320 321 if (timeout < 0) { 322 ubus_abort_request(ctx, &req.req); 323 return 0; 324 } 325 326 return ubus_complete_request(ctx, &req.req, timeout); 327 } 328 329 static bool ubus_get_status(struct ubus_msghdr_buf *buf, int *ret) 330 { 331 struct blob_attr **attrbuf = ubus_parse_msg(buf->data, blob_raw_len(buf->data)); 332 333 if (!attrbuf[UBUS_ATTR_STATUS]) 334 return false; 335 336 *ret = blob_get_u32(attrbuf[UBUS_ATTR_STATUS]); 337 return true; 338 } 339 340 static int 341 ubus_process_req_status(struct ubus_request *req, struct ubus_msghdr_buf *buf) 342 { 343 int ret = UBUS_STATUS_INVALID_ARGUMENT; 344 345 ubus_get_status(buf, &ret); 346 req->peer = buf->hdr.peer; 347 ubus_set_req_status(req, ret); 348 349 return ret; 350 } 351 352 static void 353 ubus_process_req_data(struct ubus_request *req, struct ubus_msghdr_buf *buf) 354 { 355 struct ubus_pending_data *data; 356 int len; 357 358 if (!req->blocked) { 359 req->blocked = true; 360 req_data_cb(req, buf->hdr.type, buf->data); 361 __ubus_process_req_data(req); 362 req->blocked = false; 363 364 if (req->status_msg) 365 ubus_req_complete_cb(req); 366 367 return; 368 } 369 370 len = blob_raw_len(buf->data); 371 data = calloc(1, sizeof(*data) + len); 372 if (!data) 373 return; 374 375 data->type = buf->hdr.type; 376 memcpy(data->data, buf->data, len); 377 list_add(&data->list, &req->pending); 378 } 379 380 static int 381 ubus_find_notify_id(struct ubus_notify_request *n, uint32_t objid) 382 { 383 uint32_t pending = n->pending; 384 int i; 385 386 for (i 
= 0; pending; i++, pending >>= 1) { 387 if (!(pending & 1)) 388 continue; 389 390 if (n->id[i] == objid) 391 return i; 392 } 393 394 return -1; 395 } 396 397 static struct ubus_request * 398 ubus_find_request(struct ubus_context *ctx, uint32_t seq, uint32_t peer, int *id) 399 { 400 struct ubus_request *req; 401 402 list_for_each_entry(req, &ctx->requests, list) { 403 struct ubus_notify_request *nreq; 404 nreq = container_of(req, struct ubus_notify_request, req); 405 406 if (seq != req->seq) 407 continue; 408 409 if (req->notify) { 410 if (!nreq->pending) 411 continue; 412 413 *id = ubus_find_notify_id(nreq, peer); 414 if (*id < 0) 415 continue; 416 } else if (peer != req->peer) 417 continue; 418 419 return req; 420 } 421 return NULL; 422 } 423 424 static void ubus_process_notify_status(struct ubus_request *req, int id, struct ubus_msghdr_buf *buf) 425 { 426 struct ubus_notify_request *nreq; 427 struct blob_attr **tb; 428 struct blob_attr *cur; 429 size_t rem; 430 int idx = 1; 431 int ret = 0; 432 433 nreq = container_of(req, struct ubus_notify_request, req); 434 nreq->pending &= ~(1 << id); 435 436 if (!id) { 437 /* first id: ubusd's status message with a list of ids */ 438 tb = ubus_parse_msg(buf->data, blob_raw_len(buf->data)); 439 if (tb[UBUS_ATTR_SUBSCRIBERS]) { 440 blob_for_each_attr(cur, tb[UBUS_ATTR_SUBSCRIBERS], rem) { 441 if (!blob_check_type(blob_data(cur), blob_len(cur), BLOB_ATTR_INT32)) 442 continue; 443 444 nreq->pending |= (1 << idx); 445 nreq->id[idx] = blob_get_int32(cur); 446 idx++; 447 448 if (idx == UBUS_MAX_NOTIFY_PEERS + 1) 449 break; 450 } 451 } 452 } else { 453 ubus_get_status(buf, &ret); 454 if (nreq->status_cb) 455 nreq->status_cb(nreq, id, ret); 456 } 457 458 if (!nreq->pending) 459 ubus_set_req_status(req, 0); 460 } 461 462 void __hidden ubus_process_req_msg(struct ubus_context *ctx, struct ubus_msghdr_buf *buf, int fd) 463 { 464 struct ubus_msghdr *hdr = &buf->hdr; 465 struct ubus_request *req; 466 int id = -1; 467 468 switch(hdr->type) 
{ 469 case UBUS_MSG_STATUS: 470 req = ubus_find_request(ctx, hdr->seq, hdr->peer, &id); 471 if (!req) 472 break; 473 474 if (fd >= 0) { 475 if (req->fd_cb) 476 req->fd_cb(req, fd); 477 else 478 close(fd); 479 } 480 481 if (id >= 0) 482 ubus_process_notify_status(req, id, buf); 483 else 484 ubus_process_req_status(req, buf); 485 break; 486 487 case UBUS_MSG_DATA: 488 req = ubus_find_request(ctx, hdr->seq, hdr->peer, &id); 489 if (req && (req->data_cb || req->raw_data_cb)) 490 ubus_process_req_data(req, buf); 491 break; 492 } 493 } 494 495 int __ubus_monitor(struct ubus_context *ctx, const char *type) 496 { 497 blob_buf_init(&b, 0); 498 return ubus_invoke(ctx, UBUS_SYSTEM_OBJECT_MONITOR, type, b.head, NULL, NULL, 1000); 499 } 500
/* This page was automatically generated by LXR 0.3.1. • OpenWrt */