1 /* 2 * Copyright (C) 2011-2014 Felix Fietkau <nbd@openwrt.org> 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU Lesser General Public License version 2.1 6 * as published by the Free Software Foundation 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU General Public License for more details. 12 */ 13 14 #define _GNU_SOURCE 15 #include <sys/types.h> 16 #include <sys/uio.h> 17 #include <sys/socket.h> 18 19 #include <unistd.h> 20 #include <fcntl.h> 21 #include <poll.h> 22 23 #include <libubox/usock.h> 24 #include <libubox/blob.h> 25 #include <libubox/blobmsg.h> 26 27 #include "libubus.h" 28 #include "libubus-internal.h" 29 30 #define STATIC_IOV(_var) { .iov_base = (char *) &(_var), .iov_len = sizeof(_var) } 31 32 #define UBUS_MSGBUF_REDUCTION_INTERVAL 16 33 34 static const struct blob_attr_info ubus_policy[UBUS_ATTR_MAX] = { 35 [UBUS_ATTR_STATUS] = { .type = BLOB_ATTR_INT32 }, 36 [UBUS_ATTR_OBJID] = { .type = BLOB_ATTR_INT32 }, 37 [UBUS_ATTR_OBJPATH] = { .type = BLOB_ATTR_STRING }, 38 [UBUS_ATTR_METHOD] = { .type = BLOB_ATTR_STRING }, 39 [UBUS_ATTR_ACTIVE] = { .type = BLOB_ATTR_INT8 }, 40 [UBUS_ATTR_NO_REPLY] = { .type = BLOB_ATTR_INT8 }, 41 [UBUS_ATTR_SUBSCRIBERS] = { .type = BLOB_ATTR_NESTED }, 42 }; 43 44 static struct blob_attr *attrbuf[UBUS_ATTR_MAX]; 45 46 __hidden struct blob_attr **ubus_parse_msg(struct blob_attr *msg, size_t len) 47 { 48 blob_parse_untrusted(msg, len, attrbuf, ubus_policy, UBUS_ATTR_MAX); 49 return attrbuf; 50 } 51 52 static void wait_data(int fd, bool write) 53 { 54 struct pollfd pfd = { .fd = fd }; 55 56 pfd.events = write ? POLLOUT : POLLIN; 57 poll(&pfd, 1, -1); 58 } 59 60 static int writev_retry(int fd, struct iovec *iov, int iov_len, int sock_fd) 61 { 62 uint8_t fd_buf[CMSG_SPACE(sizeof(int))] = { 0 }; 63 struct msghdr msghdr = { 0 }; 64 struct cmsghdr *cmsg; 65 int len = 0; 66 int *pfd; 67 68 msghdr.msg_iov = iov, 69 msghdr.msg_iovlen = iov_len, 70 msghdr.msg_control = fd_buf; 71 msghdr.msg_controllen = sizeof(fd_buf); 72 73 cmsg = CMSG_FIRSTHDR(&msghdr); 74 cmsg->cmsg_type = SCM_RIGHTS; 75 cmsg->cmsg_level = SOL_SOCKET; 76 cmsg->cmsg_len = CMSG_LEN(sizeof(int)); 77 78 pfd = (int *) CMSG_DATA(cmsg); 79 msghdr.msg_controllen = cmsg->cmsg_len; 80 81 do { 82 ssize_t cur_len; 83 84 if (sock_fd < 0) { 85 msghdr.msg_control = NULL; 86 msghdr.msg_controllen = 0; 87 } else { 88 *pfd = sock_fd; 89 } 90 91 cur_len = sendmsg(fd, &msghdr, 0); 92 if (cur_len < 0) { 93 switch(errno) { 94 case EAGAIN: 95 wait_data(fd, true); 96 break; 97 case EINTR: 98 break; 99 default: 100 return -1; 101 } 102 continue; 103 } 104 105 if (len > 0) 106 sock_fd = -1; 107 108 len += cur_len; 109 while (cur_len >= (ssize_t) iov->iov_len) { 110 cur_len -= iov->iov_len; 111 iov_len--; 112 iov++; 113 if (!iov_len) 114 return len; 115 } 116 iov->iov_base += cur_len; 117 iov->iov_len -= cur_len; 118 msghdr.msg_iov = iov; 119 msghdr.msg_iovlen = iov_len; 120 } while (1); 121 122 /* Should never reach here */ 123 return -1; 124 } 125 126 int __hidden ubus_send_msg(struct ubus_context *ctx, uint32_t seq, 127 struct blob_attr *msg, int cmd, uint32_t peer, int fd) 128 { 129 struct ubus_msghdr hdr; 130 struct iovec iov[2] = { 131 STATIC_IOV(hdr) 132 }; 133 int ret; 134 135 hdr.version = 0; 136 hdr.type = cmd; 137 hdr.seq = cpu_to_be16(seq); 138 hdr.peer = cpu_to_be32(peer); 139 140 if (!msg) { 141 blob_buf_init(&b, 0); 142 msg = b.head; 143 } 144 145 iov[1].iov_base = (char *) msg; 146 iov[1].iov_len = blob_raw_len(msg); 147 148 ret = writev_retry(ctx->sock.fd, iov, ARRAY_SIZE(iov), fd); 149 if (ret < 0) 150 ctx->sock.eof = true; 151 152 if (fd >= 0) 153 close(fd); 154 155 return ret; 156 } 157 158 static int recv_retry(struct ubus_context *ctx, struct iovec *iov, bool wait, int *recv_fd) 159 { 160 uint8_t fd_buf[CMSG_SPACE(sizeof(int))] = { 0 }; 161 struct msghdr msghdr = { 0 }; 162 struct cmsghdr *cmsg; 163 int total = 0; 164 int bytes; 165 int *pfd; 166 int fd; 167 168 fd = ctx->sock.fd; 169 170 msghdr.msg_iov = iov, 171 msghdr.msg_iovlen = 1, 172 msghdr.msg_control = fd_buf; 173 msghdr.msg_controllen = sizeof(fd_buf); 174 175 cmsg = CMSG_FIRSTHDR(&msghdr); 176 cmsg->cmsg_type = SCM_RIGHTS; 177 cmsg->cmsg_level = SOL_SOCKET; 178 cmsg->cmsg_len = CMSG_LEN(sizeof(int)); 179 180 pfd = (int *) CMSG_DATA(cmsg); 181 182 while (iov->iov_len > 0) { 183 if (recv_fd) { 184 msghdr.msg_control = fd_buf; 185 msghdr.msg_controllen = cmsg->cmsg_len; 186 } else { 187 msghdr.msg_control = NULL; 188 msghdr.msg_controllen = 0; 189 } 190 191 *pfd = -1; 192 bytes = recvmsg(fd, &msghdr, 0); 193 if (!bytes) 194 return -1; 195 196 if (bytes < 0) { 197 bytes = 0; 198 if (errno == EINTR) 199 continue; 200 201 if (errno != EAGAIN) 202 return -1; 203 } 204 if (!wait && !bytes) 205 return 0; 206 207 if (recv_fd) 208 *recv_fd = *pfd; 209 210 recv_fd = NULL; 211 212 wait = true; 213 iov->iov_len -= bytes; 214 iov->iov_base += bytes; 215 total += bytes; 216 217 if (iov->iov_len > 0) 218 wait_data(fd, false); 219 } 220 221 return total; 222 } 223 224 bool ubus_validate_hdr(struct ubus_msghdr *hdr) 225 { 226 struct blob_attr *data = (struct blob_attr *) (hdr + 1); 227 228 if (hdr->version != 0) 229 return false; 230 231 if (blob_raw_len(data) < sizeof(*data)) 232 return false; 233 234 if (blob_pad_len(data) > UBUS_MAX_MSGLEN) 235 return false; 236 237 return true; 238 } 239 240 static bool alloc_msg_buf(struct ubus_context *ctx, int len) 241 { 242 void *ptr; 243 int buf_len = ctx->msgbuf_data_len; 244 int rem; 245 246 if (!ctx->msgbuf.data) 247 buf_len = 0; 248 249 rem = (len % UBUS_MSG_CHUNK_SIZE); 250 if (rem > 0) 251 len += UBUS_MSG_CHUNK_SIZE - rem; 252 253 if (len < buf_len && 254 ++ctx->msgbuf_reduction_counter > UBUS_MSGBUF_REDUCTION_INTERVAL) { 255 ctx->msgbuf_reduction_counter = 0; 256 buf_len = 0; 257 } 258 259 if (len <= buf_len) 260 return true; 261 262 ptr = realloc(ctx->msgbuf.data, len); 263 if (!ptr) 264 return false; 265 266 ctx->msgbuf.data = ptr; 267 ctx->msgbuf_data_len = len; 268 return true; 269 } 270 271 static bool get_next_msg(struct ubus_context *ctx, int *recv_fd) 272 { 273 struct { 274 struct ubus_msghdr hdr; 275 struct blob_attr data; 276 } hdrbuf; 277 struct iovec iov = STATIC_IOV(hdrbuf); 278 int len; 279 int r; 280 281 /* receive header + start attribute */ 282 r = recv_retry(ctx, &iov, false, recv_fd); 283 if (r <= 0) { 284 if (r < 0) 285 ctx->sock.eof = true; 286 287 return false; 288 } 289 290 hdrbuf.hdr.seq = be16_to_cpu(hdrbuf.hdr.seq); 291 hdrbuf.hdr.peer = be32_to_cpu(hdrbuf.hdr.peer); 292 293 if (!ubus_validate_hdr(&hdrbuf.hdr)) 294 return false; 295 296 len = blob_raw_len(&hdrbuf.data); 297 if (!alloc_msg_buf(ctx, len)) 298 return false; 299 300 memcpy(&ctx->msgbuf.hdr, &hdrbuf.hdr, sizeof(hdrbuf.hdr)); 301 memcpy(ctx->msgbuf.data, &hdrbuf.data, sizeof(hdrbuf.data)); 302 303 iov.iov_base = (char *)ctx->msgbuf.data + sizeof(hdrbuf.data); 304 iov.iov_len = blob_len(ctx->msgbuf.data); 305 if (iov.iov_len > 0 && 306 recv_retry(ctx, &iov, true, NULL) <= 0) 307 return false; 308 309 return true; 310 } 311 312 void __hidden ubus_handle_data(struct uloop_fd *u, unsigned int events) 313 { 314 struct ubus_context *ctx = container_of(u, struct ubus_context, sock); 315 int recv_fd = -1; 316 317 while (1) { 318 if (!ctx->stack_depth) 319 ctx->pending_timer.cb(&ctx->pending_timer); 320 321 if (!get_next_msg(ctx, &recv_fd)) 322 break; 323 ubus_process_msg(ctx, &ctx->msgbuf, recv_fd); 324 if (uloop_cancelling() || ctx->cancel_poll) 325 break; 326 } 327 328 if (!ctx->stack_depth) 329 ctx->pending_timer.cb(&ctx->pending_timer); 330 331 if (u->eof) 332 ctx->connection_lost(ctx); 333 } 334 335 void __hidden ubus_poll_data(struct ubus_context *ctx, int timeout) 336 { 337 struct pollfd pfd = { 338 .fd = ctx->sock.fd, 339 .events = POLLIN | POLLERR, 340 }; 341 342 ctx->cancel_poll = false; 343 poll(&pfd, 1, timeout ? timeout : -1); 344 ubus_handle_data(&ctx->sock, ULOOP_READ); 345 } 346 347 static void 348 ubus_auto_sub_lookup(struct ubus_context *ctx, struct ubus_object_data *obj, 349 void *priv) 350 { 351 struct ubus_subscriber *s; 352 353 list_for_each_entry(s, &ctx->auto_subscribers, list) 354 if (s->new_obj_cb(ctx, s, obj->path)) 355 ubus_subscribe(ctx, s, obj->id); 356 } 357 358 static void 359 ubus_refresh_auto_subscribe(struct ubus_context *ctx) 360 { 361 struct ubus_event_handler *ev = &ctx->auto_subscribe_event_handler; 362 363 if (list_empty(&ctx->auto_subscribers)) 364 return; 365 366 ubus_register_event_handler(ctx, ev, "ubus.object.add"); 367 ubus_lookup(ctx, NULL, ubus_auto_sub_lookup, NULL); 368 } 369 370 static void 371 ubus_refresh_state(struct ubus_context *ctx) 372 { 373 struct ubus_object *obj, *tmp; 374 struct ubus_object **objs; 375 int n, i = 0; 376 377 /* clear all type IDs, they need to be registered again */ 378 avl_for_each_element(&ctx->objects, obj, avl) 379 if (obj->type) 380 obj->type->id = 0; 381 382 /* push out all objects again */ 383 objs = alloca(ctx->objects.count * sizeof(*objs)); 384 avl_remove_all_elements(&ctx->objects, obj, avl, tmp) { 385 objs[i++] = obj; 386 obj->id = 0; 387 } 388 389 for (n = i, i = 0; i < n; i++) 390 ubus_add_object(ctx, objs[i]); 391 392 ubus_refresh_auto_subscribe(ctx); 393 } 394 395 int ubus_reconnect(struct ubus_context *ctx, const char *path) 396 { 397 struct { 398 struct ubus_msghdr hdr; 399 struct blob_attr data; 400 } hdr; 401 struct blob_attr *buf; 402 int ret = UBUS_STATUS_UNKNOWN_ERROR; 403 404 if (!path) 405 path = UBUS_UNIX_SOCKET; 406 407 if (ctx->sock.fd >= 0) { 408 if (ctx->sock.registered) 409 uloop_fd_delete(&ctx->sock); 410 411 close(ctx->sock.fd); 412 } 413 414 ctx->sock.eof = false; 415 ctx->sock.error = false; 416 ctx->sock.fd = usock(USOCK_UNIX, path, NULL); 417 if (ctx->sock.fd < 0) 418 return UBUS_STATUS_CONNECTION_FAILED; 419 420 if (read(ctx->sock.fd, &hdr, sizeof(hdr)) != sizeof(hdr)) 421 goto out_close; 422 423 if (!ubus_validate_hdr(&hdr.hdr)) 424 goto out_close; 425 426 if (hdr.hdr.type != UBUS_MSG_HELLO) 427 goto out_close; 428 429 buf = calloc(1, blob_raw_len(&hdr.data)); 430 if (!buf) 431 goto out_close; 432 433 memcpy(buf, &hdr.data, sizeof(hdr.data)); 434 if (read(ctx->sock.fd, blob_data(buf), blob_len(buf)) != (ssize_t) blob_len(buf)) 435 goto out_free; 436 437 ctx->local_id = hdr.hdr.peer; 438 if (!ctx->local_id) 439 goto out_free; 440 441 ret = UBUS_STATUS_OK; 442 fcntl(ctx->sock.fd, F_SETFL, fcntl(ctx->sock.fd, F_GETFL) | O_NONBLOCK | O_CLOEXEC); 443 444 ubus_refresh_state(ctx); 445 446 out_free: 447 free(buf); 448 out_close: 449 if (ret) 450 close(ctx->sock.fd); 451 452 return ret; 453 } 454
This page was automatically generated by LXR 0.3.1. • OpenWrt