• source navigation  • diff markup  • identifier search  • freetext search  • 

Sources/libubox/ustream.c

  1 /*
  2  * ustream - library for stream buffer management
  3  *
  4  * Copyright (C) 2012 Felix Fietkau <nbd@openwrt.org>
  5  *
  6  * Permission to use, copy, modify, and/or distribute this software for any
  7  * purpose with or without fee is hereby granted, provided that the above
  8  * copyright notice and this permission notice appear in all copies.
  9  *
 10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 17  */
 18 
 19 #include <stdlib.h>
 20 #include <string.h>
 21 #include <unistd.h>
 22 #include <stdio.h>
 23 #include <stdarg.h>
 24 
 25 #include "ustream.h"
 26 
 27 #define CB_PENDING_READ (1 << 0)
 28 
 29 static void ustream_init_buf(struct ustream_buf *buf, int len)
 30 {
 31         if (!len)
 32                 abort();
 33 
 34         memset(buf, 0, sizeof(*buf));
 35         buf->data = buf->tail = buf->head;
 36         buf->end = buf->head + len;
 37         *buf->head = 0;
 38 }
 39 
 40 static void ustream_add_buf(struct ustream_buf_list *l, struct ustream_buf *buf)
 41 {
 42         l->buffers++;
 43         if (!l->tail)
 44                 l->head = buf;
 45         else
 46                 l->tail->next = buf;
 47 
 48         buf->next = NULL;
 49         l->tail = buf;
 50         if (!l->data_tail)
 51                 l->data_tail = l->head;
 52 }
 53 
 54 static bool ustream_can_alloc(struct ustream_buf_list *l)
 55 {
 56         if (l->max_buffers <= 0)
 57                 return true;
 58 
 59         return (l->buffers < l->max_buffers);
 60 }
 61 
 62 static int ustream_alloc_default(struct ustream *s, struct ustream_buf_list *l)
 63 {
 64         struct ustream_buf *buf;
 65 
 66         if (!ustream_can_alloc(l))
 67                 return -1;
 68 
 69         buf = malloc(sizeof(*buf) + l->buffer_len + s->string_data);
 70         if (!buf)
 71                 return -1;
 72 
 73         ustream_init_buf(buf, l->buffer_len);
 74         ustream_add_buf(l, buf);
 75 
 76         return 0;
 77 }
 78 
 79 static void ustream_free_buffers(struct ustream_buf_list *l)
 80 {
 81         struct ustream_buf *buf = l->head;
 82 
 83         while (buf) {
 84                 struct ustream_buf *next = buf->next;
 85 
 86                 free(buf);
 87                 buf = next;
 88         }
 89         l->head = NULL;
 90         l->tail = NULL;
 91         l->data_tail = NULL;
 92 }
 93 
 94 void ustream_free(struct ustream *s)
 95 {
 96         if (s->free)
 97                 s->free(s);
 98 
 99         uloop_timeout_cancel(&s->state_change);
100         ustream_free_buffers(&s->r);
101         ustream_free_buffers(&s->w);
102 }
103 
104 static void ustream_state_change_cb(struct uloop_timeout *t)
105 {
106         struct ustream *s = container_of(t, struct ustream, state_change);
107 
108         if (s->write_error)
109                 ustream_free_buffers(&s->w);
110         if (s->notify_state)
111                 s->notify_state(s);
112 }
113 
114 void ustream_init_defaults(struct ustream *s)
115 {
116 #define DEFAULT_SET(_f, _default)       \
117         do {                            \
118                 if (!_f)                \
119                         _f = _default;  \
120         } while(0)
121 
122         DEFAULT_SET(s->r.alloc, ustream_alloc_default);
123         DEFAULT_SET(s->w.alloc, ustream_alloc_default);
124 
125         DEFAULT_SET(s->r.min_buffers, 1);
126         DEFAULT_SET(s->r.max_buffers, 1);
127         DEFAULT_SET(s->r.buffer_len, 4096);
128 
129         DEFAULT_SET(s->w.min_buffers, 2);
130         DEFAULT_SET(s->w.max_buffers, -1);
131         DEFAULT_SET(s->w.buffer_len, 256);
132 
133 #undef DEFAULT_SET
134 
135         s->state_change.cb = ustream_state_change_cb;
136         s->write_error = false;
137         s->eof = false;
138         s->read_blocked = 0;
139 
140         s->r.buffers = 0;
141         s->r.data_bytes = 0;
142 
143         s->w.buffers = 0;
144         s->w.data_bytes = 0;
145 }
146 
147 static bool ustream_should_move(struct ustream_buf_list *l, struct ustream_buf *buf, int len)
148 {
149         int maxlen;
150         int offset;
151 
152         /* nothing to squeeze */
153         if (buf->data == buf->head)
154                 return false;
155 
156         maxlen = buf->end - buf->head;
157         offset = buf->data - buf->head;
158 
159         /* less than half is available */
160         if (offset > maxlen / 2)
161                 return true;
162 
163         /* less than 32 bytes data but takes more than 1/4 space */
164         if (buf->tail - buf->data < 32 && offset > maxlen / 4)
165                 return true;
166 
167         /* more buf is already in list or can be allocated */
168         if (buf != l->tail || ustream_can_alloc(l))
169                 return false;
170 
171         /* no need to move if len is available at the tail */
172         return (buf->end - buf->tail < len);
173 }
174 
175 static void ustream_free_buf(struct ustream_buf_list *l, struct ustream_buf *buf)
176 {
177         if (buf == l->head)
178                 l->head = buf->next;
179 
180         if (buf == l->data_tail)
181                 l->data_tail = buf->next;
182 
183         if (buf == l->tail)
184                 l->tail = NULL;
185 
186         if (--l->buffers >= l->min_buffers) {
187                 free(buf);
188                 return;
189         }
190 
191         /* recycle */
192         ustream_init_buf(buf, buf->end - buf->head);
193         ustream_add_buf(l, buf);
194 }
195 
196 static void __ustream_set_read_blocked(struct ustream *s, unsigned char val)
197 {
198         bool changed = !!s->read_blocked != !!val;
199 
200         s->read_blocked = val;
201         if (changed)
202                 s->set_read_blocked(s);
203 }
204 
205 void ustream_set_read_blocked(struct ustream *s, bool set)
206 {
207         unsigned char val = s->read_blocked & ~READ_BLOCKED_USER;
208 
209         if (set)
210                 val |= READ_BLOCKED_USER;
211 
212         __ustream_set_read_blocked(s, val);
213 }
214 
215 void ustream_consume(struct ustream *s, int len)
216 {
217         struct ustream_buf *buf = s->r.head;
218 
219         if (!len)
220                 return;
221 
222         s->r.data_bytes -= len;
223         if (s->r.data_bytes < 0)
224                 abort();
225 
226         do {
227                 struct ustream_buf *next = buf->next;
228                 int buf_len = buf->tail - buf->data;
229 
230                 if (len < buf_len) {
231                         buf->data += len;
232                         break;
233                 }
234 
235                 len -= buf_len;
236                 ustream_free_buf(&s->r, buf);
237                 buf = next;
238         } while(len);
239 
240         __ustream_set_read_blocked(s, s->read_blocked & ~READ_BLOCKED_FULL);
241 }
242 
243 static void ustream_fixup_string(struct ustream *s, struct ustream_buf *buf)
244 {
245         if (!s->string_data)
246                 return;
247 
248         *buf->tail = 0;
249 }
250 
251 static bool ustream_prepare_buf(struct ustream *s, struct ustream_buf_list *l, int len)
252 {
253         struct ustream_buf *buf;
254 
255         buf = l->data_tail;
256         if (buf) {
257                 if (ustream_should_move(l, buf, len)) {
258                         int len = buf->tail - buf->data;
259 
260                         memmove(buf->head, buf->data, len);
261                         buf->data = buf->head;
262                         buf->tail = buf->data + len;
263 
264                         if (l == &s->r)
265                                 ustream_fixup_string(s, buf);
266                 }
267                 /* some chunks available at the tail */
268                 if (buf->tail != buf->end)
269                         return true;
270                 /* next buf available */
271                 if (buf->next) {
272                         l->data_tail = buf->next;
273                         return true;
274                 }
275         }
276 
277         if (!ustream_can_alloc(l))
278                 return false;
279 
280         if (l->alloc(s, l) < 0)
281                 return false;
282 
283         l->data_tail = l->tail;
284         return true;
285 }
286 
287 char *ustream_reserve(struct ustream *s, int len, int *maxlen)
288 {
289         struct ustream_buf *buf;
290 
291         if (!ustream_prepare_buf(s, &s->r, len)) {
292                 __ustream_set_read_blocked(s, s->read_blocked | READ_BLOCKED_FULL);
293                 *maxlen = 0;
294                 return NULL;
295         }
296 
297         buf = s->r.data_tail;
298         *maxlen = buf->end - buf->tail;
299         return buf->tail;
300 }
301 
302 void ustream_fill_read(struct ustream *s, int len)
303 {
304         struct ustream_buf *buf = s->r.data_tail;
305         int maxlen;
306 
307         s->r.data_bytes += len;
308         do {
309                 if (!buf)
310                         abort();
311 
312                 maxlen = buf->end - buf->tail;
313                 if (len < maxlen)
314                         maxlen = len;
315 
316                 len -= maxlen;
317                 buf->tail += maxlen;
318                 ustream_fixup_string(s, buf);
319 
320                 s->r.data_tail = buf;
321                 buf = buf->next;
322         } while (len);
323 
324         if (s->notify_read) {
325                 if (s->pending_cb & CB_PENDING_READ)
326                         return;
327 
328                 s->pending_cb |= CB_PENDING_READ;
329                 s->notify_read(s, s->r.data_bytes);
330                 s->pending_cb &= ~CB_PENDING_READ;
331         }
332 }
333 
334 char *ustream_get_read_buf(struct ustream *s, int *buflen)
335 {
336         char *data = NULL;
337         int len = 0;
338 
339         if (s->r.head) {
340                 len = s->r.head->tail - s->r.head->data;
341                 if (len > 0)
342                         data = s->r.head->data;
343         }
344 
345         if (buflen)
346                 *buflen = len;
347 
348         return data;
349 }
350 
/*
 * Copy up to buflen buffered bytes into buf and consume them from the
 * stream.
 *
 * Returns the number of bytes copied, which is lower than buflen when the
 * read buffers run empty first.
 */
int ustream_read(struct ustream *s, char *buf, int buflen)
{
        int copied = 0;

        for (;;) {
                int chunk_len;
                char *chunk = ustream_get_read_buf(s, &chunk_len);
                int room = buflen - copied;

                if (chunk == NULL)
                        break;

                /* never copy more than the caller's buffer can still take */
                if (chunk_len > room)
                        chunk_len = room;

                memcpy(buf + copied, chunk, chunk_len);
                ustream_consume(s, chunk_len);
                copied += chunk_len;

                if (copied >= buflen)
                        break;
        }

        return copied;
}
370 
371 static void ustream_write_error(struct ustream *s)
372 {
373         if (!s->write_error)
374                 ustream_state_change(s);
375         s->write_error = true;
376 }
377 
378 bool ustream_write_pending(struct ustream *s)
379 {
380         struct ustream_buf *buf = s->w.head;
381         int wr = 0, len;
382 
383         if (s->write_error)
384                 return false;
385 
386         while (buf && s->w.data_bytes) {
387                 struct ustream_buf *next = buf->next;
388                 int maxlen = buf->tail - buf->data;
389 
390                 len = s->write(s, buf->data, maxlen, !!buf->next);
391                 if (len < 0) {
392                         ustream_write_error(s);
393                         break;
394                 }
395 
396                 if (len == 0)
397                         break;
398 
399                 wr += len;
400                 s->w.data_bytes -= len;
401                 if (len < maxlen) {
402                         buf->data += len;
403                         break;
404                 }
405 
406                 ustream_free_buf(&s->w, buf);
407                 buf = next;
408         }
409 
410         if (s->notify_write)
411                 s->notify_write(s, wr);
412 
413         if (s->eof && wr && !s->w.data_bytes)
414                 ustream_state_change(s);
415 
416         return !s->w.data_bytes;
417 }
418 
419 static int ustream_write_buffered(struct ustream *s, const char *data, int len, int wr)
420 {
421         struct ustream_buf_list *l = &s->w;
422         struct ustream_buf *buf;
423         int maxlen;
424 
425         while (len) {
426                 if (!ustream_prepare_buf(s, &s->w, len))
427                         break;
428 
429                 buf = l->data_tail;
430 
431                 maxlen = buf->end - buf->tail;
432                 if (maxlen > len)
433                         maxlen = len;
434 
435                 memcpy(buf->tail, data, maxlen);
436                 buf->tail += maxlen;
437                 data += maxlen;
438                 len -= maxlen;
439                 wr += maxlen;
440                 l->data_bytes += maxlen;
441         }
442 
443         return wr;
444 }
445 
446 int ustream_write(struct ustream *s, const char *data, int len, bool more)
447 {
448         struct ustream_buf_list *l = &s->w;
449         int wr = 0;
450 
451         if (s->write_error)
452                 return 0;
453 
454         if (!l->data_bytes) {
455                 wr = s->write(s, data, len, more);
456                 if (wr == len)
457                         return wr;
458 
459                 if (wr < 0) {
460                         ustream_write_error(s);
461                         return wr;
462                 }
463 
464                 data += wr;
465                 len -= wr;
466         }
467 
468         return ustream_write_buffered(s, data, len, wr);
469 }
470 
471 #define MAX_STACK_BUFLEN        256
472 
473 int ustream_vprintf(struct ustream *s, const char *format, va_list arg)
474 {
475         struct ustream_buf_list *l = &s->w;
476         char *buf;
477         va_list arg2;
478         int wr, maxlen, buflen;
479 
480         if (s->write_error)
481                 return 0;
482 
483         if (!l->data_bytes) {
484                 buf = alloca(MAX_STACK_BUFLEN);
485                 va_copy(arg2, arg);
486                 maxlen = vsnprintf(buf, MAX_STACK_BUFLEN, format, arg2);
487                 va_end(arg2);
488                 if (maxlen < MAX_STACK_BUFLEN) {
489                         wr = s->write(s, buf, maxlen, false);
490                         if (wr < 0) {
491                                 ustream_write_error(s);
492                                 return wr;
493                         }
494                         if (wr == maxlen)
495                                 return wr;
496 
497                         buf += wr;
498                         maxlen -= wr;
499                         return ustream_write_buffered(s, buf, maxlen, wr);
500                 } else {
501                         buf = malloc(maxlen + 1);
502                         if (!buf)
503                                 return 0;
504                         wr = vsnprintf(buf, maxlen + 1, format, arg);
505                         wr = ustream_write(s, buf, wr, false);
506                         free(buf);
507                         return wr;
508                 }
509         }
510 
511         if (!ustream_prepare_buf(s, l, 1))
512                 return 0;
513 
514         buf = l->data_tail->tail;
515         buflen = l->data_tail->end - buf;
516 
517         va_copy(arg2, arg);
518         maxlen = vsnprintf(buf, buflen, format, arg2);
519         va_end(arg2);
520 
521         wr = maxlen;
522         if (wr >= buflen)
523                 wr = buflen - 1;
524 
525         l->data_tail->tail += wr;
526         l->data_bytes += wr;
527         if (maxlen < buflen)
528                 return wr;
529 
530         buf = malloc(maxlen + 1);
531         if (!buf)
532                 return wr;
533         maxlen = vsnprintf(buf, maxlen + 1, format, arg);
534         wr = ustream_write_buffered(s, buf + wr, maxlen - wr, wr);
535         free(buf);
536 
537         return wr;
538 }
539 
540 int ustream_printf(struct ustream *s, const char *format, ...)
541 {
542         va_list arg;
543         int ret;
544 
545         if (s->write_error)
546                 return 0;
547 
548         va_start(arg, format);
549         ret = ustream_vprintf(s, format, arg);
550         va_end(arg);
551 
552         return ret;
553 }
554 

This page was automatically generated by LXR 0.3.1.  •  OpenWrt