1 /* 2 * ustream - library for stream buffer management 3 * 4 * Copyright (C) 2012 Felix Fietkau <nbd@openwrt.org> 5 * 6 * Permission to use, copy, modify, and/or distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <limits.h> 20 #include <stdlib.h> 21 #include <string.h> 22 #include <unistd.h> 23 #include <stdio.h> 24 #include <stdarg.h> 25 26 #include "ustream.h" 27 28 #define CB_PENDING_READ (1 << 0) 29 30 static void ustream_init_buf(struct ustream_buf *buf, int len) 31 { 32 if (!len) 33 abort(); 34 35 memset(buf, 0, sizeof(*buf)); 36 buf->data = buf->tail = buf->head; 37 buf->end = buf->head + len; 38 *buf->head = 0; 39 } 40 41 static void ustream_add_buf(struct ustream_buf_list *l, struct ustream_buf *buf) 42 { 43 l->buffers++; 44 if (!l->tail) 45 l->head = buf; 46 else 47 l->tail->next = buf; 48 49 buf->next = NULL; 50 l->tail = buf; 51 if (!l->data_tail) 52 l->data_tail = l->head; 53 } 54 55 static bool ustream_can_alloc(struct ustream_buf_list *l) 56 { 57 if (l->max_buffers <= 0) 58 return true; 59 60 return (l->buffers < l->max_buffers); 61 } 62 63 static int ustream_alloc_default(struct ustream *s, struct ustream_buf_list *l) 64 { 65 struct ustream_buf *buf; 66 67 if (!ustream_can_alloc(l)) 68 return -1; 69 70 buf = malloc(sizeof(*buf) + l->buffer_len + s->string_data); 71 if (!buf) 72 return -1; 
73 74 ustream_init_buf(buf, l->buffer_len); 75 ustream_add_buf(l, buf); 76 77 return 0; 78 } 79 80 static void ustream_free_buffers(struct ustream_buf_list *l) 81 { 82 struct ustream_buf *buf = l->head; 83 84 while (buf) { 85 struct ustream_buf *next = buf->next; 86 87 free(buf); 88 buf = next; 89 } 90 l->head = NULL; 91 l->tail = NULL; 92 l->data_tail = NULL; 93 } 94 95 void ustream_free(struct ustream *s) 96 { 97 if (s->free) 98 s->free(s); 99 100 uloop_timeout_cancel(&s->state_change); 101 ustream_free_buffers(&s->r); 102 ustream_free_buffers(&s->w); 103 } 104 105 static void ustream_state_change_cb(struct uloop_timeout *t) 106 { 107 struct ustream *s = container_of(t, struct ustream, state_change); 108 109 if (s->write_error) 110 ustream_free_buffers(&s->w); 111 if (s->notify_state) 112 s->notify_state(s); 113 } 114 115 void ustream_init_defaults(struct ustream *s) 116 { 117 #define DEFAULT_SET(_f, _default) \ 118 do { \ 119 if (!_f) \ 120 _f = _default; \ 121 } while(0) 122 123 DEFAULT_SET(s->r.alloc, ustream_alloc_default); 124 DEFAULT_SET(s->w.alloc, ustream_alloc_default); 125 126 DEFAULT_SET(s->r.min_buffers, 1); 127 DEFAULT_SET(s->r.max_buffers, 1); 128 DEFAULT_SET(s->r.buffer_len, 4096); 129 130 DEFAULT_SET(s->w.min_buffers, 2); 131 DEFAULT_SET(s->w.max_buffers, -1); 132 DEFAULT_SET(s->w.buffer_len, 256); 133 134 #undef DEFAULT_SET 135 136 s->state_change.cb = ustream_state_change_cb; 137 s->write_error = false; 138 s->eof = false; 139 s->read_blocked = 0; 140 141 s->r.buffers = 0; 142 s->r.data_bytes = 0; 143 144 s->w.buffers = 0; 145 s->w.data_bytes = 0; 146 } 147 148 static bool ustream_should_move(struct ustream_buf_list *l, struct ustream_buf *buf, int len) 149 { 150 int maxlen; 151 int offset; 152 153 /* nothing to squeeze */ 154 if (buf->data == buf->head) 155 return false; 156 157 maxlen = buf->end - buf->head; 158 offset = buf->data - buf->head; 159 160 /* less than half is available */ 161 if (offset > maxlen / 2) 162 return true; 163 164 /* 
less than 32 bytes data but takes more than 1/4 space */ 165 if (buf->tail - buf->data < 32 && offset > maxlen / 4) 166 return true; 167 168 /* more buf is already in list or can be allocated */ 169 if (buf != l->tail || ustream_can_alloc(l)) 170 return false; 171 172 /* no need to move if len is available at the tail */ 173 return (buf->end - buf->tail < len); 174 } 175 176 static void ustream_free_buf(struct ustream_buf_list *l, struct ustream_buf *buf) 177 { 178 if (buf == l->head) 179 l->head = buf->next; 180 181 if (buf == l->data_tail) 182 l->data_tail = buf->next; 183 184 if (buf == l->tail) 185 l->tail = NULL; 186 187 if (--l->buffers >= l->min_buffers) { 188 free(buf); 189 return; 190 } 191 192 /* recycle */ 193 ustream_init_buf(buf, buf->end - buf->head); 194 ustream_add_buf(l, buf); 195 } 196 197 static void __ustream_set_read_blocked(struct ustream *s, unsigned char val) 198 { 199 bool changed = !!s->read_blocked != !!val; 200 201 s->read_blocked = val; 202 if (changed) 203 s->set_read_blocked(s); 204 } 205 206 void ustream_set_read_blocked(struct ustream *s, bool set) 207 { 208 unsigned char val = s->read_blocked & ~READ_BLOCKED_USER; 209 210 if (set) 211 val |= READ_BLOCKED_USER; 212 213 __ustream_set_read_blocked(s, val); 214 } 215 216 void ustream_consume(struct ustream *s, int len) 217 { 218 struct ustream_buf *buf = s->r.head; 219 220 if (!len) 221 return; 222 223 s->r.data_bytes -= len; 224 if (s->r.data_bytes < 0) 225 abort(); 226 227 do { 228 struct ustream_buf *next = buf->next; 229 int buf_len = buf->tail - buf->data; 230 231 if (len < buf_len) { 232 buf->data += len; 233 break; 234 } 235 236 len -= buf_len; 237 ustream_free_buf(&s->r, buf); 238 buf = next; 239 } while(len); 240 241 __ustream_set_read_blocked(s, s->read_blocked & ~READ_BLOCKED_FULL); 242 } 243 244 static void ustream_fixup_string(struct ustream *s, struct ustream_buf *buf) 245 { 246 if (!s->string_data) 247 return; 248 249 *buf->tail = 0; 250 } 251 252 static bool 
ustream_prepare_buf(struct ustream *s, struct ustream_buf_list *l, int len) 253 { 254 struct ustream_buf *buf; 255 256 buf = l->data_tail; 257 if (buf) { 258 if (ustream_should_move(l, buf, len)) { 259 int len = buf->tail - buf->data; 260 261 memmove(buf->head, buf->data, len); 262 buf->data = buf->head; 263 buf->tail = buf->data + len; 264 265 if (l == &s->r) 266 ustream_fixup_string(s, buf); 267 } 268 /* some chunks available at the tail */ 269 if (buf->tail != buf->end) 270 return true; 271 /* next buf available */ 272 if (buf->next) { 273 l->data_tail = buf->next; 274 return true; 275 } 276 } 277 278 if (!ustream_can_alloc(l)) 279 return false; 280 281 if (l->alloc(s, l) < 0) 282 return false; 283 284 l->data_tail = l->tail; 285 return true; 286 } 287 288 char *ustream_reserve(struct ustream *s, int len, int *maxlen) 289 { 290 struct ustream_buf *buf; 291 292 if (!ustream_prepare_buf(s, &s->r, len)) { 293 __ustream_set_read_blocked(s, s->read_blocked | READ_BLOCKED_FULL); 294 *maxlen = 0; 295 return NULL; 296 } 297 298 buf = s->r.data_tail; 299 *maxlen = buf->end - buf->tail; 300 return buf->tail; 301 } 302 303 void ustream_fill_read(struct ustream *s, int len) 304 { 305 struct ustream_buf *buf = s->r.data_tail; 306 int maxlen; 307 308 s->r.data_bytes += len; 309 do { 310 if (!buf) 311 abort(); 312 313 maxlen = buf->end - buf->tail; 314 if (len < maxlen) 315 maxlen = len; 316 317 len -= maxlen; 318 buf->tail += maxlen; 319 ustream_fixup_string(s, buf); 320 321 s->r.data_tail = buf; 322 buf = buf->next; 323 } while (len); 324 325 if (s->notify_read) { 326 if (s->pending_cb & CB_PENDING_READ) 327 return; 328 329 s->pending_cb |= CB_PENDING_READ; 330 s->notify_read(s, s->r.data_bytes); 331 s->pending_cb &= ~CB_PENDING_READ; 332 } 333 } 334 335 char *ustream_get_read_buf(struct ustream *s, int *buflen) 336 { 337 char *data = NULL; 338 int len = 0; 339 340 if (s->r.head) { 341 len = s->r.head->tail - s->r.head->data; 342 if (len > 0) 343 data = s->r.head->data; 344 
} 345 346 if (buflen) 347 *buflen = len; 348 349 return data; 350 } 351 352 int ustream_read(struct ustream *s, char *buf, int buflen) 353 { 354 char *chunk; 355 int chunk_len; 356 int len = 0; 357 358 do { 359 chunk = ustream_get_read_buf(s, &chunk_len); 360 if (!chunk) 361 break; 362 if (chunk_len > buflen - len) 363 chunk_len = buflen - len; 364 memcpy(buf + len, chunk, chunk_len); 365 ustream_consume(s, chunk_len); 366 len += chunk_len; 367 } while (len < buflen); 368 369 return len; 370 } 371 372 static void ustream_write_error(struct ustream *s) 373 { 374 if (!s->write_error) 375 ustream_state_change(s); 376 s->write_error = true; 377 } 378 379 bool ustream_write_pending(struct ustream *s) 380 { 381 struct ustream_buf *buf = s->w.head; 382 int wr = 0, len; 383 384 if (s->write_error) 385 return false; 386 387 while (buf && s->w.data_bytes) { 388 struct ustream_buf *next = buf->next; 389 int maxlen = buf->tail - buf->data; 390 391 len = s->write(s, buf->data, maxlen, !!buf->next); 392 if (len < 0) { 393 ustream_write_error(s); 394 break; 395 } 396 397 if (len == 0) 398 break; 399 400 wr += len; 401 s->w.data_bytes -= len; 402 if (len < maxlen) { 403 buf->data += len; 404 break; 405 } 406 407 ustream_free_buf(&s->w, buf); 408 buf = next; 409 } 410 411 if (s->notify_write) 412 s->notify_write(s, wr); 413 414 if (s->eof && wr && !s->w.data_bytes) 415 ustream_state_change(s); 416 417 return !s->w.data_bytes; 418 } 419 420 static int ustream_write_buffered(struct ustream *s, const char *data, int len, int wr) 421 { 422 struct ustream_buf_list *l = &s->w; 423 struct ustream_buf *buf; 424 int maxlen; 425 426 while (len) { 427 if (!ustream_prepare_buf(s, &s->w, len)) 428 break; 429 430 buf = l->data_tail; 431 432 maxlen = buf->end - buf->tail; 433 if (maxlen > len) 434 maxlen = len; 435 436 memcpy(buf->tail, data, maxlen); 437 buf->tail += maxlen; 438 data += maxlen; 439 len -= maxlen; 440 wr += maxlen; 441 l->data_bytes += maxlen; 442 } 443 444 return wr; 445 } 446 
447 int ustream_write(struct ustream *s, const char *data, int len, bool more) 448 { 449 struct ustream_buf_list *l = &s->w; 450 int wr = 0; 451 452 if (s->write_error) 453 return 0; 454 455 if (!l->data_bytes) { 456 wr = s->write(s, data, len, more); 457 if (wr == len) 458 return wr; 459 460 if (wr < 0) { 461 ustream_write_error(s); 462 return wr; 463 } 464 465 data += wr; 466 len -= wr; 467 } 468 469 return ustream_write_buffered(s, data, len, wr); 470 } 471 472 #define MAX_STACK_BUFLEN 256 473 474 int ustream_vprintf(struct ustream *s, const char *format, va_list arg) 475 { 476 struct ustream_buf_list *l = &s->w; 477 char *buf; 478 va_list arg2; 479 int wr, maxlen, buflen; 480 481 if (s->write_error) 482 return 0; 483 484 if (!l->data_bytes) { 485 buf = alloca(MAX_STACK_BUFLEN); 486 va_copy(arg2, arg); 487 maxlen = vsnprintf(buf, MAX_STACK_BUFLEN, format, arg2); 488 va_end(arg2); 489 if (maxlen < MAX_STACK_BUFLEN) { 490 wr = s->write(s, buf, maxlen, false); 491 if (wr < 0) { 492 ustream_write_error(s); 493 return wr; 494 } 495 if (wr == maxlen) 496 return wr; 497 498 buf += wr; 499 maxlen -= wr; 500 return ustream_write_buffered(s, buf, maxlen, wr); 501 } else { 502 if (maxlen == INT_MAX) 503 return 0; 504 buf = malloc(maxlen + 1); 505 if (!buf) 506 return 0; 507 wr = vsnprintf(buf, maxlen + 1, format, arg); 508 wr = ustream_write(s, buf, wr, false); 509 free(buf); 510 return wr; 511 } 512 } 513 514 if (!ustream_prepare_buf(s, l, 1)) 515 return 0; 516 517 buf = l->data_tail->tail; 518 buflen = l->data_tail->end - buf; 519 520 va_copy(arg2, arg); 521 maxlen = vsnprintf(buf, buflen, format, arg2); 522 va_end(arg2); 523 524 wr = maxlen; 525 if (wr >= buflen) 526 wr = buflen - 1; 527 528 l->data_tail->tail += wr; 529 l->data_bytes += wr; 530 if (maxlen < buflen) 531 return wr; 532 533 if (maxlen == INT_MAX) 534 return wr; 535 buf = malloc(maxlen + 1); 536 if (!buf) 537 return wr; 538 maxlen = vsnprintf(buf, maxlen + 1, format, arg); 539 wr = 
ustream_write_buffered(s, buf + wr, maxlen - wr, wr); 540 free(buf); 541 542 return wr; 543 } 544 545 int ustream_printf(struct ustream *s, const char *format, ...) 546 { 547 va_list arg; 548 int ret; 549 550 if (s->write_error) 551 return 0; 552 553 va_start(arg, format); 554 ret = ustream_vprintf(s, format, arg); 555 va_end(arg); 556 557 return ret; 558 } 559
/* This page was automatically generated by LXR 0.3.1. • OpenWrt */