• source navigation  • diff markup  • identifier search  • freetext search  • 

Sources/libubox/ustream.c

  1 /*
  2  * ustream - library for stream buffer management
  3  *
  4  * Copyright (C) 2012 Felix Fietkau <nbd@openwrt.org>
  5  *
  6  * Permission to use, copy, modify, and/or distribute this software for any
  7  * purpose with or without fee is hereby granted, provided that the above
  8  * copyright notice and this permission notice appear in all copies.
  9  *
 10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 17  */
 18 
 19 #include <limits.h>
 20 #include <stdlib.h>
 21 #include <string.h>
 22 #include <unistd.h>
 23 #include <stdio.h>
 24 #include <stdarg.h>
 25 
 26 #include "ustream.h"
 27 
 28 #define CB_PENDING_READ (1 << 0)
 29 
 30 static void ustream_init_buf(struct ustream_buf *buf, int len)
 31 {
 32         if (!len)
 33                 abort();
 34 
 35         memset(buf, 0, sizeof(*buf));
 36         buf->data = buf->tail = buf->head;
 37         buf->end = buf->head + len;
 38         *buf->head = 0;
 39 }
 40 
 41 static void ustream_add_buf(struct ustream_buf_list *l, struct ustream_buf *buf)
 42 {
 43         l->buffers++;
 44         if (!l->tail)
 45                 l->head = buf;
 46         else
 47                 l->tail->next = buf;
 48 
 49         buf->next = NULL;
 50         l->tail = buf;
 51         if (!l->data_tail)
 52                 l->data_tail = l->head;
 53 }
 54 
 55 static bool ustream_can_alloc(struct ustream_buf_list *l)
 56 {
 57         if (l->max_buffers <= 0)
 58                 return true;
 59 
 60         return (l->buffers < l->max_buffers);
 61 }
 62 
 63 static int ustream_alloc_default(struct ustream *s, struct ustream_buf_list *l)
 64 {
 65         struct ustream_buf *buf;
 66 
 67         if (!ustream_can_alloc(l))
 68                 return -1;
 69 
 70         buf = malloc(sizeof(*buf) + l->buffer_len + s->string_data);
 71         if (!buf)
 72                 return -1;
 73 
 74         ustream_init_buf(buf, l->buffer_len);
 75         ustream_add_buf(l, buf);
 76 
 77         return 0;
 78 }
 79 
 80 static void ustream_free_buffers(struct ustream_buf_list *l)
 81 {
 82         struct ustream_buf *buf = l->head;
 83 
 84         while (buf) {
 85                 struct ustream_buf *next = buf->next;
 86 
 87                 free(buf);
 88                 buf = next;
 89         }
 90         l->head = NULL;
 91         l->tail = NULL;
 92         l->data_tail = NULL;
 93 }
 94 
 95 void ustream_free(struct ustream *s)
 96 {
 97         if (s->free)
 98                 s->free(s);
 99 
100         uloop_timeout_cancel(&s->state_change);
101         ustream_free_buffers(&s->r);
102         ustream_free_buffers(&s->w);
103 }
104 
105 static void ustream_state_change_cb(struct uloop_timeout *t)
106 {
107         struct ustream *s = container_of(t, struct ustream, state_change);
108 
109         if (s->write_error)
110                 ustream_free_buffers(&s->w);
111         if (s->notify_state)
112                 s->notify_state(s);
113 }
114 
115 void ustream_init_defaults(struct ustream *s)
116 {
117 #define DEFAULT_SET(_f, _default)       \
118         do {                            \
119                 if (!_f)                \
120                         _f = _default;  \
121         } while(0)
122 
123         DEFAULT_SET(s->r.alloc, ustream_alloc_default);
124         DEFAULT_SET(s->w.alloc, ustream_alloc_default);
125 
126         DEFAULT_SET(s->r.min_buffers, 1);
127         DEFAULT_SET(s->r.max_buffers, 1);
128         DEFAULT_SET(s->r.buffer_len, 4096);
129 
130         DEFAULT_SET(s->w.min_buffers, 2);
131         DEFAULT_SET(s->w.max_buffers, -1);
132         DEFAULT_SET(s->w.buffer_len, 256);
133 
134 #undef DEFAULT_SET
135 
136         s->state_change.cb = ustream_state_change_cb;
137         s->write_error = false;
138         s->eof = false;
139         s->read_blocked = 0;
140 
141         s->r.buffers = 0;
142         s->r.data_bytes = 0;
143 
144         s->w.buffers = 0;
145         s->w.data_bytes = 0;
146 }
147 
148 static bool ustream_should_move(struct ustream_buf_list *l, struct ustream_buf *buf, int len)
149 {
150         int maxlen;
151         int offset;
152 
153         /* nothing to squeeze */
154         if (buf->data == buf->head)
155                 return false;
156 
157         maxlen = buf->end - buf->head;
158         offset = buf->data - buf->head;
159 
160         /* less than half is available */
161         if (offset > maxlen / 2)
162                 return true;
163 
164         /* less than 32 bytes data but takes more than 1/4 space */
165         if (buf->tail - buf->data < 32 && offset > maxlen / 4)
166                 return true;
167 
168         /* more buf is already in list or can be allocated */
169         if (buf != l->tail || ustream_can_alloc(l))
170                 return false;
171 
172         /* no need to move if len is available at the tail */
173         return (buf->end - buf->tail < len);
174 }
175 
176 static void ustream_free_buf(struct ustream_buf_list *l, struct ustream_buf *buf)
177 {
178         if (buf == l->head)
179                 l->head = buf->next;
180 
181         if (buf == l->data_tail)
182                 l->data_tail = buf->next;
183 
184         if (buf == l->tail)
185                 l->tail = NULL;
186 
187         if (--l->buffers >= l->min_buffers) {
188                 free(buf);
189                 return;
190         }
191 
192         /* recycle */
193         ustream_init_buf(buf, buf->end - buf->head);
194         ustream_add_buf(l, buf);
195 }
196 
197 static void __ustream_set_read_blocked(struct ustream *s, unsigned char val)
198 {
199         bool changed = !!s->read_blocked != !!val;
200 
201         s->read_blocked = val;
202         if (changed)
203                 s->set_read_blocked(s);
204 }
205 
206 void ustream_set_read_blocked(struct ustream *s, bool set)
207 {
208         unsigned char val = s->read_blocked & ~READ_BLOCKED_USER;
209 
210         if (set)
211                 val |= READ_BLOCKED_USER;
212 
213         __ustream_set_read_blocked(s, val);
214 }
215 
216 void ustream_consume(struct ustream *s, int len)
217 {
218         struct ustream_buf *buf = s->r.head;
219 
220         if (!len)
221                 return;
222 
223         s->r.data_bytes -= len;
224         if (s->r.data_bytes < 0)
225                 abort();
226 
227         do {
228                 struct ustream_buf *next = buf->next;
229                 int buf_len = buf->tail - buf->data;
230 
231                 if (len < buf_len) {
232                         buf->data += len;
233                         break;
234                 }
235 
236                 len -= buf_len;
237                 ustream_free_buf(&s->r, buf);
238                 buf = next;
239         } while(len);
240 
241         __ustream_set_read_blocked(s, s->read_blocked & ~READ_BLOCKED_FULL);
242 }
243 
244 static void ustream_fixup_string(struct ustream *s, struct ustream_buf *buf)
245 {
246         if (!s->string_data)
247                 return;
248 
249         *buf->tail = 0;
250 }
251 
252 static bool ustream_prepare_buf(struct ustream *s, struct ustream_buf_list *l, int len)
253 {
254         struct ustream_buf *buf;
255 
256         buf = l->data_tail;
257         if (buf) {
258                 if (ustream_should_move(l, buf, len)) {
259                         int len = buf->tail - buf->data;
260 
261                         memmove(buf->head, buf->data, len);
262                         buf->data = buf->head;
263                         buf->tail = buf->data + len;
264 
265                         if (l == &s->r)
266                                 ustream_fixup_string(s, buf);
267                 }
268                 /* some chunks available at the tail */
269                 if (buf->tail != buf->end)
270                         return true;
271                 /* next buf available */
272                 if (buf->next) {
273                         l->data_tail = buf->next;
274                         return true;
275                 }
276         }
277 
278         if (!ustream_can_alloc(l))
279                 return false;
280 
281         if (l->alloc(s, l) < 0)
282                 return false;
283 
284         l->data_tail = l->tail;
285         return true;
286 }
287 
288 char *ustream_reserve(struct ustream *s, int len, int *maxlen)
289 {
290         struct ustream_buf *buf;
291 
292         if (!ustream_prepare_buf(s, &s->r, len)) {
293                 __ustream_set_read_blocked(s, s->read_blocked | READ_BLOCKED_FULL);
294                 *maxlen = 0;
295                 return NULL;
296         }
297 
298         buf = s->r.data_tail;
299         *maxlen = buf->end - buf->tail;
300         return buf->tail;
301 }
302 
303 void ustream_fill_read(struct ustream *s, int len)
304 {
305         struct ustream_buf *buf = s->r.data_tail;
306         int maxlen;
307 
308         s->r.data_bytes += len;
309         do {
310                 if (!buf)
311                         abort();
312 
313                 maxlen = buf->end - buf->tail;
314                 if (len < maxlen)
315                         maxlen = len;
316 
317                 len -= maxlen;
318                 buf->tail += maxlen;
319                 ustream_fixup_string(s, buf);
320 
321                 s->r.data_tail = buf;
322                 buf = buf->next;
323         } while (len);
324 
325         if (s->notify_read) {
326                 if (s->pending_cb & CB_PENDING_READ)
327                         return;
328 
329                 s->pending_cb |= CB_PENDING_READ;
330                 s->notify_read(s, s->r.data_bytes);
331                 s->pending_cb &= ~CB_PENDING_READ;
332         }
333 }
334 
335 char *ustream_get_read_buf(struct ustream *s, int *buflen)
336 {
337         char *data = NULL;
338         int len = 0;
339 
340         if (s->r.head) {
341                 len = s->r.head->tail - s->r.head->data;
342                 if (len > 0)
343                         data = s->r.head->data;
344         }
345 
346         if (buflen)
347                 *buflen = len;
348 
349         return data;
350 }
351 
/*
 * Copy up to @buflen bytes of buffered read data from @s into @buf and
 * consume them from the stream.
 *
 * Returns the number of bytes copied; this is less than @buflen when the
 * stream runs out of buffered data.
 */
int ustream_read(struct ustream *s, char *buf, int buflen)
{
        int copied = 0;

        do {
                int avail;
                int room = buflen - copied;
                char *src = ustream_get_read_buf(s, &avail);

                if (src == NULL)
                        break;

                if (avail > room)
                        avail = room;

                memcpy(buf + copied, src, avail);
                ustream_consume(s, avail);
                copied += avail;
        } while (copied < buflen);

        return copied;
}
371 
372 static void ustream_write_error(struct ustream *s)
373 {
374         if (!s->write_error)
375                 ustream_state_change(s);
376         s->write_error = true;
377 }
378 
379 bool ustream_write_pending(struct ustream *s)
380 {
381         struct ustream_buf *buf = s->w.head;
382         int wr = 0, len;
383 
384         if (s->write_error)
385                 return false;
386 
387         while (buf && s->w.data_bytes) {
388                 struct ustream_buf *next = buf->next;
389                 int maxlen = buf->tail - buf->data;
390 
391                 len = s->write(s, buf->data, maxlen, !!buf->next);
392                 if (len < 0) {
393                         ustream_write_error(s);
394                         break;
395                 }
396 
397                 if (len == 0)
398                         break;
399 
400                 wr += len;
401                 s->w.data_bytes -= len;
402                 if (len < maxlen) {
403                         buf->data += len;
404                         break;
405                 }
406 
407                 ustream_free_buf(&s->w, buf);
408                 buf = next;
409         }
410 
411         if (s->notify_write)
412                 s->notify_write(s, wr);
413 
414         if (s->eof && wr && !s->w.data_bytes)
415                 ustream_state_change(s);
416 
417         return !s->w.data_bytes;
418 }
419 
420 static int ustream_write_buffered(struct ustream *s, const char *data, int len, int wr)
421 {
422         struct ustream_buf_list *l = &s->w;
423         struct ustream_buf *buf;
424         int maxlen;
425 
426         while (len) {
427                 if (!ustream_prepare_buf(s, &s->w, len))
428                         break;
429 
430                 buf = l->data_tail;
431 
432                 maxlen = buf->end - buf->tail;
433                 if (maxlen > len)
434                         maxlen = len;
435 
436                 memcpy(buf->tail, data, maxlen);
437                 buf->tail += maxlen;
438                 data += maxlen;
439                 len -= maxlen;
440                 wr += maxlen;
441                 l->data_bytes += maxlen;
442         }
443 
444         return wr;
445 }
446 
447 int ustream_write(struct ustream *s, const char *data, int len, bool more)
448 {
449         struct ustream_buf_list *l = &s->w;
450         int wr = 0;
451 
452         if (s->write_error)
453                 return 0;
454 
455         if (!l->data_bytes) {
456                 wr = s->write(s, data, len, more);
457                 if (wr == len)
458                         return wr;
459 
460                 if (wr < 0) {
461                         ustream_write_error(s);
462                         return wr;
463                 }
464 
465                 data += wr;
466                 len -= wr;
467         }
468 
469         return ustream_write_buffered(s, data, len, wr);
470 }
471 
472 #define MAX_STACK_BUFLEN        256
473 
474 int ustream_vprintf(struct ustream *s, const char *format, va_list arg)
475 {
476         struct ustream_buf_list *l = &s->w;
477         char *buf;
478         va_list arg2;
479         int wr, maxlen, buflen;
480 
481         if (s->write_error)
482                 return 0;
483 
484         if (!l->data_bytes) {
485                 buf = alloca(MAX_STACK_BUFLEN);
486                 va_copy(arg2, arg);
487                 maxlen = vsnprintf(buf, MAX_STACK_BUFLEN, format, arg2);
488                 va_end(arg2);
489                 if (maxlen < MAX_STACK_BUFLEN) {
490                         wr = s->write(s, buf, maxlen, false);
491                         if (wr < 0) {
492                                 ustream_write_error(s);
493                                 return wr;
494                         }
495                         if (wr == maxlen)
496                                 return wr;
497 
498                         buf += wr;
499                         maxlen -= wr;
500                         return ustream_write_buffered(s, buf, maxlen, wr);
501                 } else {
502                         if (maxlen == INT_MAX)
503                                 return 0;
504                         buf = malloc(maxlen + 1);
505                         if (!buf)
506                                 return 0;
507                         wr = vsnprintf(buf, maxlen + 1, format, arg);
508                         wr = ustream_write(s, buf, wr, false);
509                         free(buf);
510                         return wr;
511                 }
512         }
513 
514         if (!ustream_prepare_buf(s, l, 1))
515                 return 0;
516 
517         buf = l->data_tail->tail;
518         buflen = l->data_tail->end - buf;
519 
520         va_copy(arg2, arg);
521         maxlen = vsnprintf(buf, buflen, format, arg2);
522         va_end(arg2);
523 
524         wr = maxlen;
525         if (wr >= buflen)
526                 wr = buflen - 1;
527 
528         l->data_tail->tail += wr;
529         l->data_bytes += wr;
530         if (maxlen < buflen)
531                 return wr;
532 
533         if (maxlen == INT_MAX)
534                 return wr;
535         buf = malloc(maxlen + 1);
536         if (!buf)
537                 return wr;
538         maxlen = vsnprintf(buf, maxlen + 1, format, arg);
539         wr = ustream_write_buffered(s, buf + wr, maxlen - wr, wr);
540         free(buf);
541 
542         return wr;
543 }
544 
545 int ustream_printf(struct ustream *s, const char *format, ...)
546 {
547         va_list arg;
548         int ret;
549 
550         if (s->write_error)
551                 return 0;
552 
553         va_start(arg, format);
554         ret = ustream_vprintf(s, format, arg);
555         va_end(arg);
556 
557         return ret;
558 }
559 

This page was automatically generated by LXR 0.3.1.  •  OpenWrt