1 /* 2 * ustream - library for stream buffer management 3 * 4 * Copyright (C) 2012 Felix Fietkau <nbd@openwrt.org> 5 * 6 * Permission to use, copy, modify, and/or distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <stdlib.h> 20 #include <string.h> 21 #include <unistd.h> 22 #include <stdio.h> 23 #include <stdarg.h> 24 25 #include "ustream.h" 26 27 #define CB_PENDING_READ (1 << 0) 28 29 static void ustream_init_buf(struct ustream_buf *buf, int len) 30 { 31 if (!len) 32 abort(); 33 34 memset(buf, 0, sizeof(*buf)); 35 buf->data = buf->tail = buf->head; 36 buf->end = buf->head + len; 37 *buf->head = 0; 38 } 39 40 static void ustream_add_buf(struct ustream_buf_list *l, struct ustream_buf *buf) 41 { 42 l->buffers++; 43 if (!l->tail) 44 l->head = buf; 45 else 46 l->tail->next = buf; 47 48 buf->next = NULL; 49 l->tail = buf; 50 if (!l->data_tail) 51 l->data_tail = l->head; 52 } 53 54 static bool ustream_can_alloc(struct ustream_buf_list *l) 55 { 56 if (l->max_buffers <= 0) 57 return true; 58 59 return (l->buffers < l->max_buffers); 60 } 61 62 static int ustream_alloc_default(struct ustream *s, struct ustream_buf_list *l) 63 { 64 struct ustream_buf *buf; 65 66 if (!ustream_can_alloc(l)) 67 return -1; 68 69 buf = malloc(sizeof(*buf) + l->buffer_len + s->string_data); 70 if (!buf) 71 return -1; 72 73 ustream_init_buf(buf, l->buffer_len); 74 ustream_add_buf(l, buf); 75 76 return 0; 77 } 78 79 static void ustream_free_buffers(struct ustream_buf_list *l) 80 { 81 struct ustream_buf *buf = l->head; 82 83 while (buf) { 84 struct ustream_buf *next = buf->next; 85 86 free(buf); 87 buf = next; 88 } 89 l->head = NULL; 90 l->tail = NULL; 91 l->data_tail = NULL; 92 } 93 94 void ustream_free(struct ustream *s) 95 { 96 if (s->free) 97 s->free(s); 98 99 uloop_timeout_cancel(&s->state_change); 100 ustream_free_buffers(&s->r); 101 ustream_free_buffers(&s->w); 102 } 103 104 static void ustream_state_change_cb(struct uloop_timeout *t) 105 { 106 struct ustream *s = container_of(t, struct ustream, state_change); 107 108 if (s->write_error) 109 ustream_free_buffers(&s->w); 110 if (s->notify_state) 111 s->notify_state(s); 112 } 113 114 void ustream_init_defaults(struct ustream *s) 115 { 116 #define DEFAULT_SET(_f, _default) \ 117 do { \ 118 if (!_f) \ 119 _f = _default; \ 120 } while(0) 121 122 DEFAULT_SET(s->r.alloc, ustream_alloc_default); 123 DEFAULT_SET(s->w.alloc, ustream_alloc_default); 124 125 DEFAULT_SET(s->r.min_buffers, 1); 126 DEFAULT_SET(s->r.max_buffers, 1); 127 DEFAULT_SET(s->r.buffer_len, 4096); 128 129 DEFAULT_SET(s->w.min_buffers, 2); 130 DEFAULT_SET(s->w.max_buffers, -1); 131 DEFAULT_SET(s->w.buffer_len, 256); 132 133 #undef DEFAULT_SET 134 135 s->state_change.cb = ustream_state_change_cb; 136 s->write_error = false; 137 s->eof = false; 138 s->read_blocked = 0; 139 140 s->r.buffers = 0; 141 s->r.data_bytes = 0; 142 143 s->w.buffers = 0; 144 s->w.data_bytes = 0; 145 } 146 147 static bool ustream_should_move(struct ustream_buf_list *l, struct ustream_buf *buf, int len) 148 { 149 int maxlen; 150 int offset; 151 152 /* nothing to squeeze */ 153 if (buf->data == buf->head) 154 return false; 155 156 maxlen = buf->end - buf->head; 157 offset = buf->data - buf->head; 158 159 /* less than half is available */ 160 if (offset > maxlen / 2) 161 return true; 162 163 /* less than 32 bytes data but takes more than 1/4 space */ 164 if (buf->tail - buf->data < 32 && offset > maxlen / 4) 165 return true; 166 167 /* more buf is already in list or can be allocated */ 168 if (buf != l->tail || ustream_can_alloc(l)) 169 return false; 170 171 /* no need to move if len is available at the tail */ 172 return (buf->end - buf->tail < len); 173 } 174 175 static void ustream_free_buf(struct ustream_buf_list *l, struct ustream_buf *buf) 176 { 177 if (buf == l->head) 178 l->head = buf->next; 179 180 if (buf == l->data_tail) 181 l->data_tail = buf->next; 182 183 if (buf == l->tail) 184 l->tail = NULL; 185 186 if (--l->buffers >= l->min_buffers) { 187 free(buf); 188 return; 189 } 190 191 /* recycle */ 192 ustream_init_buf(buf, buf->end - buf->head); 193 ustream_add_buf(l, buf); 194 } 195 196 static void __ustream_set_read_blocked(struct ustream *s, unsigned char val) 197 { 198 bool changed = !!s->read_blocked != !!val; 199 200 s->read_blocked = val; 201 if (changed) 202 s->set_read_blocked(s); 203 } 204 205 void ustream_set_read_blocked(struct ustream *s, bool set) 206 { 207 unsigned char val = s->read_blocked & ~READ_BLOCKED_USER; 208 209 if (set) 210 val |= READ_BLOCKED_USER; 211 212 __ustream_set_read_blocked(s, val); 213 } 214 215 void ustream_consume(struct ustream *s, int len) 216 { 217 struct ustream_buf *buf = s->r.head; 218 219 if (!len) 220 return; 221 222 s->r.data_bytes -= len; 223 if (s->r.data_bytes < 0) 224 abort(); 225 226 do { 227 struct ustream_buf *next = buf->next; 228 int buf_len = buf->tail - buf->data; 229 230 if (len < buf_len) { 231 buf->data += len; 232 break; 233 } 234 235 len -= buf_len; 236 ustream_free_buf(&s->r, buf); 237 buf = next; 238 } while(len); 239 240 __ustream_set_read_blocked(s, s->read_blocked & ~READ_BLOCKED_FULL); 241 } 242 243 static void ustream_fixup_string(struct ustream *s, struct ustream_buf *buf) 244 { 245 if (!s->string_data) 246 return; 247 248 *buf->tail = 0; 249 } 250 251 static bool ustream_prepare_buf(struct ustream *s, struct ustream_buf_list *l, int len) 252 { 253 struct ustream_buf *buf; 254 255 buf = l->data_tail; 256 if (buf) { 257 if (ustream_should_move(l, buf, len)) { 258 int len = buf->tail - buf->data; 259 260 memmove(buf->head, buf->data, len); 261 buf->data = buf->head; 262 buf->tail = buf->data + len; 263 264 if (l == &s->r) 265 ustream_fixup_string(s, buf); 266 } 267 /* some chunks available at the tail */ 268 if (buf->tail != buf->end) 269 return true; 270 /* next buf available */ 271 if (buf->next) { 272 l->data_tail = buf->next; 273 return true; 274 } 275 } 276 277 if (!ustream_can_alloc(l)) 278 return false; 279 280 if (l->alloc(s, l) < 0) 281 return false; 282 283 l->data_tail = l->tail; 284 return true; 285 } 286 287 char *ustream_reserve(struct ustream *s, int len, int *maxlen) 288 { 289 struct ustream_buf *buf; 290 291 if (!ustream_prepare_buf(s, &s->r, len)) { 292 __ustream_set_read_blocked(s, s->read_blocked | READ_BLOCKED_FULL); 293 *maxlen = 0; 294 return NULL; 295 } 296 297 buf = s->r.data_tail; 298 *maxlen = buf->end - buf->tail; 299 return buf->tail; 300 } 301 302 void ustream_fill_read(struct ustream *s, int len) 303 { 304 struct ustream_buf *buf = s->r.data_tail; 305 int maxlen; 306 307 s->r.data_bytes += len; 308 do { 309 if (!buf) 310 abort(); 311 312 maxlen = buf->end - buf->tail; 313 if (len < maxlen) 314 maxlen = len; 315 316 len -= maxlen; 317 buf->tail += maxlen; 318 ustream_fixup_string(s, buf); 319 320 s->r.data_tail = buf; 321 buf = buf->next; 322 } while (len); 323 324 if (s->notify_read) { 325 if (s->pending_cb & CB_PENDING_READ) 326 return; 327 328 s->pending_cb |= CB_PENDING_READ; 329 s->notify_read(s, s->r.data_bytes); 330 s->pending_cb &= ~CB_PENDING_READ; 331 } 332 } 333 334 char *ustream_get_read_buf(struct ustream *s, int *buflen) 335 { 336 char *data = NULL; 337 int len = 0; 338 339 if (s->r.head) { 340 len = s->r.head->tail - s->r.head->data; 341 if (len > 0) 342 data = s->r.head->data; 343 } 344 345 if (buflen) 346 *buflen = len; 347 348 return data; 349 } 350 351 int ustream_read(struct ustream *s, char *buf, int buflen) 352 { 353 char *chunk; 354 int chunk_len; 355 int len = 0; 356 357 do { 358 chunk = ustream_get_read_buf(s, &chunk_len); 359 if (!chunk) 360 break; 361 if (chunk_len > buflen - len) 362 chunk_len = buflen - len; 363 memcpy(buf + len, chunk, chunk_len); 364 ustream_consume(s, chunk_len); 365 len += chunk_len; 366 } while (len < buflen); 367 368 return len; 369 } 370 371 static void ustream_write_error(struct ustream *s) 372 { 373 if (!s->write_error) 374 ustream_state_change(s); 375 s->write_error = true; 376 } 377 378 bool ustream_write_pending(struct ustream *s) 379 { 380 struct ustream_buf *buf = s->w.head; 381 int wr = 0, len; 382 383 if (s->write_error) 384 return false; 385 386 while (buf && s->w.data_bytes) { 387 struct ustream_buf *next = buf->next; 388 int maxlen = buf->tail - buf->data; 389 390 len = s->write(s, buf->data, maxlen, !!buf->next); 391 if (len < 0) { 392 ustream_write_error(s); 393 break; 394 } 395 396 if (len == 0) 397 break; 398 399 wr += len; 400 s->w.data_bytes -= len; 401 if (len < maxlen) { 402 buf->data += len; 403 break; 404 } 405 406 ustream_free_buf(&s->w, buf); 407 buf = next; 408 } 409 410 if (s->notify_write) 411 s->notify_write(s, wr); 412 413 if (s->eof && wr && !s->w.data_bytes) 414 ustream_state_change(s); 415 416 return !s->w.data_bytes; 417 } 418 419 static int ustream_write_buffered(struct ustream *s, const char *data, int len, int wr) 420 { 421 struct ustream_buf_list *l = &s->w; 422 struct ustream_buf *buf; 423 int maxlen; 424 425 while (len) { 426 if (!ustream_prepare_buf(s, &s->w, len)) 427 break; 428 429 buf = l->data_tail; 430 431 maxlen = buf->end - buf->tail; 432 if (maxlen > len) 433 maxlen = len; 434 435 memcpy(buf->tail, data, maxlen); 436 buf->tail += maxlen; 437 data += maxlen; 438 len -= maxlen; 439 wr += maxlen; 440 l->data_bytes += maxlen; 441 } 442 443 return wr; 444 } 445 446 int ustream_write(struct ustream *s, const char *data, int len, bool more) 447 { 448 struct ustream_buf_list *l = &s->w; 449 int wr = 0; 450 451 if (s->write_error) 452 return 0; 453 454 if (!l->data_bytes) { 455 wr = s->write(s, data, len, more); 456 if (wr == len) 457 return wr; 458 459 if (wr < 0) { 460 ustream_write_error(s); 461 return wr; 462 } 463 464 data += wr; 465 len -= wr; 466 } 467 468 return ustream_write_buffered(s, data, len, wr); 469 } 470 471 #define MAX_STACK_BUFLEN 256 472 473 int ustream_vprintf(struct ustream *s, const char *format, va_list arg) 474 { 475 struct ustream_buf_list *l = &s->w; 476 char *buf; 477 va_list arg2; 478 int wr, maxlen, buflen; 479 480 if (s->write_error) 481 return 0; 482 483 if (!l->data_bytes) { 484 buf = alloca(MAX_STACK_BUFLEN); 485 va_copy(arg2, arg); 486 maxlen = vsnprintf(buf, MAX_STACK_BUFLEN, format, arg2); 487 va_end(arg2); 488 if (maxlen < MAX_STACK_BUFLEN) { 489 wr = s->write(s, buf, maxlen, false); 490 if (wr < 0) { 491 ustream_write_error(s); 492 return wr; 493 } 494 if (wr == maxlen) 495 return wr; 496 497 buf += wr; 498 maxlen -= wr; 499 return ustream_write_buffered(s, buf, maxlen, wr); 500 } else { 501 buf = malloc(maxlen + 1); 502 if (!buf) 503 return 0; 504 wr = vsnprintf(buf, maxlen + 1, format, arg); 505 wr = ustream_write(s, buf, wr, false); 506 free(buf); 507 return wr; 508 } 509 } 510 511 if (!ustream_prepare_buf(s, l, 1)) 512 return 0; 513 514 buf = l->data_tail->tail; 515 buflen = l->data_tail->end - buf; 516 517 va_copy(arg2, arg); 518 maxlen = vsnprintf(buf, buflen, format, arg2); 519 va_end(arg2); 520 521 wr = maxlen; 522 if (wr >= buflen) 523 wr = buflen - 1; 524 525 l->data_tail->tail += wr; 526 l->data_bytes += wr; 527 if (maxlen < buflen) 528 return wr; 529 530 buf = malloc(maxlen + 1); 531 if (!buf) 532 return wr; 533 maxlen = vsnprintf(buf, maxlen + 1, format, arg); 534 wr = ustream_write_buffered(s, buf + wr, maxlen - wr, wr); 535 free(buf); 536 537 return wr; 538 } 539 540 int ustream_printf(struct ustream *s, const char *format, ...) 541 { 542 va_list arg; 543 int ret; 544 545 if (s->write_error) 546 return 0; 547 548 va_start(arg, format); 549 ret = ustream_vprintf(s, format, arg); 550 va_end(arg); 551 552 return ret; 553 } 554
This page was automatically generated by LXR 0.3.1. • OpenWrt