• source navigation  • diff markup  • identifier search  • freetext search  • 

Sources/omcproxy/src/groups.c

  1 /*
  2  * Author: Steven Barth <steven at midlink.org>
  3  *
  4  * Copyright 2015 Deutsche Telekom AG
  5  *
  6  * Licensed under the Apache License, Version 2.0 (the "License");
  7  * you may not use this file except in compliance with the License.
  8  * You may obtain a copy of the License at
  9  *
 10  *  http://www.apache.org/licenses/LICENSE-2.0
 11  *
 12  * Unless required by applicable law or agreed to in writing, software
 13  * distributed under the License is distributed on an "AS IS" BASIS,
 14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  * See the License for the specific language governing permissions and
 16  * limitations under the License.
 17  *
 18  */
 19 
 20 #include <string.h>
 21 #include <stdlib.h>
 22 #include <errno.h>
 23 #include "groups.h"
 24 
 25 
 26 // Group comparator for AVL-tree
 27 static int compare_groups(const void *k1, const void *k2, __unused void *ptr)
 28 {
 29         return memcmp(k1, k2, sizeof(struct in6_addr));
 30 }
 31 
 32 // Remove a source-definition for a group
 33 static void querier_remove_source(struct group *group, struct group_source *source)
 34 {
 35         --group->source_count;
 36         list_del(&source->head);
 37         free(source);
 38 }
 39 
 40 // Clear all sources of a certain group
 41 static void querier_clear_sources(struct group *group)
 42 {
 43         struct group_source *s, *n;
 44         list_for_each_entry_safe(s, n, &group->sources, head)
 45                 querier_remove_source(group, s);
 46 }
 47 
 48 // Remove a group and all associated sources from the group state
 49 static void querier_remove_group(struct groups *groups, struct group *group, omgp_time_t now)
 50 {
 51         querier_clear_sources(group);
 52         group->exclude_until = 0;
 53 
 54         if (groups->cb_update)
 55                 groups->cb_update(groups, group, now);
 56 
 57         avl_delete(&groups->groups, &group->node);
 58         free(group);
 59 }
 60 
// Expire a group and / or its associated sources depending on the current time
//
// groups:     group-state the group belongs to (provides config and callbacks)
// group:      group to process; it may be freed by this call (see the end)
// now:        current time
// next_event: earliest pending event time known so far
//
// Returns next_event, lowered to the earliest retransmission or expiry time
// found while processing this group.
static omgp_time_t expire_group(struct groups *groups, struct group *group,
                omgp_time_t now, omgp_time_t next_event)
{
        // IPv4-mapped group addresses use the v4 configuration, all others v6
        struct groups_config *cfg = IN6_IS_ADDR_V4MAPPED(&group->addr) ? &groups->cfg_v4 : &groups->cfg_v6;
        // Time of the next retransmission (one last-listener-query interval from now)
        omgp_time_t llqi = now + cfg->last_listener_query_interval;
        // Last listener query time: interval * count from now
        omgp_time_t llqt = now + (cfg->last_listener_query_interval * cfg->last_listener_query_count);

        // Handle group and source-specific query retransmission
        // Sources due for a query are temporarily parked on these local lists
        // and spliced back into group->sources further down.
        struct list_head suppressed = LIST_HEAD_INIT(suppressed);
        struct list_head unsuppressed = LIST_HEAD_INIT(unsuppressed);
        struct group_source *s, *s2;

        if (group->next_source_transmit > 0 && group->next_source_transmit <= now) {
                group->next_source_transmit = 0;

                list_for_each_entry_safe(s, s2, &group->sources, head) {
                        if (s->retransmit > 0) {
                                // Sources whose timer is still above llqt go to the "suppressed" list
                                list_move_tail(&s->head, (s->include_until > llqt) ? &suppressed : &unsuppressed);
                                --s->retransmit;
                        }

                        // Schedule another round while any source has retransmissions left
                        if (s->retransmit > 0)
                                group->next_source_transmit = llqi;
                }
        }

        if (group->next_source_transmit > 0 && group->next_source_transmit < next_event)
                next_event = group->next_source_transmit;

        // Handle group-specific query retransmission
        if (group->retransmit > 0 && group->next_generic_transmit <= now) {
                group->next_generic_transmit = 0;

                // NULL source list => group-specific query; the last argument is
                // presumably the suppress flag (confirm against the cb_query contract)
                if (groups->cb_query)
                        groups->cb_query(groups, &group->addr, NULL, group->exclude_until > llqt);

                --group->retransmit;

                if (group->retransmit > 0)
                        group->next_generic_transmit = llqi;

                // Skip suppressed source-specific query (RFC 3810 7.6.3.2)
                // (the suppressed sources are put back without being queried)
                list_splice_init(&suppressed, &group->sources);
        }

        if (group->next_generic_transmit > 0 && group->next_generic_transmit < next_event)
                next_event = group->next_generic_transmit;

        // Query the remaining parked sources, then return them to the group
        if (!list_empty(&suppressed)) {
                if (groups->cb_query)
                                groups->cb_query(groups, &group->addr, &suppressed, true);

                list_splice(&suppressed, &group->sources);
        }

        if (!list_empty(&unsuppressed)) {
                if (groups->cb_query)
                                groups->cb_query(groups, &group->addr, &unsuppressed, false);

                list_splice(&unsuppressed, &group->sources);
        }

        // Handle source and group expiry
        bool changed = false;
        if (group->exclude_until > 0) {
                if (group_is_included(group, now)) {
                        // Leaving exclude mode
                        group->exclude_until = 0;
                        changed = true;
                } else if (group->exclude_until < next_event) {
                        next_event = group->exclude_until;
                }
        }

        list_for_each_entry_safe(s, s2, &group->sources, head) {
                if (s->include_until > 0) {
                        if (!source_is_included(s, now)) {
                                // Source timer ran out
                                s->include_until = 0;
                                changed = true;
                        } else if (s->include_until < next_event) {
                                next_event = s->include_until;
                        }
                }

                // With exclude_until cleared, a source without a running timer is dropped
                if (group->exclude_until == 0 && s->include_until == 0)
                        querier_remove_source(group, s);
        }

        // A group with exclude_until cleared and no sources left is removed
        // (querier_remove_group frees it and issues the update callback itself);
        // otherwise listeners are only notified if something changed.
        if (group->exclude_until == 0 && group->source_count == 0)
                querier_remove_group(groups, group, now);
        else if (changed && groups->cb_update)
                groups->cb_update(groups, group, now);

        return next_event;
}
157 
158 // Rearm the global groups-timer if the next event is before timer expiration
159 static void rearm_timer(struct groups *groups, int msecs)
160 {
161         int64_t remain = uloop_timeout_remaining64(&groups->timer);
162         if (remain < 0 || remain >= msecs)
163                 uloop_timeout_set(&groups->timer, msecs);
164 }
165 
166 // Expire all groups of a group-state (called by timer as callback)
167 static void expire_groups(struct uloop_timeout *t)
168 {
169         struct groups *groups = container_of(t, struct groups, timer);
170         omgp_time_t now = omgp_time();
171         omgp_time_t next_event = now + 3600 * OMGP_TIME_PER_SECOND;
172 
173         struct group *group, *n;
174         avl_for_each_element_safe(&groups->groups, group, node, n)
175                 next_event = expire_group(groups, group, now, next_event);
176 
177         rearm_timer(groups, (next_event > now) ? next_event - now : 0);
178 }
179 
180 // Initialize a group-state
181 void groups_init(struct groups *groups)
182 {
183         avl_init(&groups->groups, compare_groups, false, NULL);
184         groups->timer.cb = expire_groups;
185 
186         groups_update_config(groups, false, OMGP_TIME_PER_SECOND / 10,
187                         125 * OMGP_TIME_PER_SECOND, 2);
188         groups_update_config(groups, true, OMGP_TIME_PER_SECOND / 10,
189                                 125 * OMGP_TIME_PER_SECOND, 2);
190 }
191 
192 // Cleanup a group-state
193 void groups_deinit(struct groups *groups)
194 {
195         omgp_time_t now = omgp_time();
196         struct group *group, *safe;
197         avl_for_each_element_safe(&groups->groups, group, node, safe)
198                 querier_remove_group(groups, group, now);
199         uloop_timeout_cancel(&groups->timer);
200 }
201 
202 // Get group-object for a given group, create if requested
203 static struct group* groups_get_group(struct groups *groups,
204                 const struct in6_addr *addr, bool *created)
205 {
206         struct group *group = avl_find_element(&groups->groups, addr, group, node);
207         if (!group && created && (group = calloc(1, sizeof(*group)))) {
208                 group->addr = *addr;
209                 group->node.key = &group->addr;
210                 avl_insert(&groups->groups, &group->node);
211 
212                 INIT_LIST_HEAD(&group->sources);
213                 *created = true;
214         } else if (created) {
215                 *created = false;
216         }
217         return group;
218 }
219 
220 // Get source-object for a given source, create if requested
221 static struct group_source* groups_get_source(struct groups *groups,
222                 struct group *group, const struct in6_addr *addr, bool *created)
223 {
224         struct group_source *c, *source = NULL;
225         group_for_each_source(c, group)
226                 if (IN6_ARE_ADDR_EQUAL(&c->addr, addr))
227                         source = c;
228 
229         if (!source && created && group->source_count < groups->source_limit &&
230                         (source = calloc(1, sizeof(*source)))) {
231                 source->addr = *addr;
232                 list_add_tail(&source->head, &group->sources);
233                 ++group->source_count;
234                 *created = true;
235         } else if (created) {
236                 *created = false;
237         }
238 
239         return source;
240 }
241 
242 // Update the IGMP/MLD timers of a group-state
243 void groups_update_config(struct groups *groups, bool v6,
244                 omgp_time_t query_response_interval, omgp_time_t query_interval, int robustness)
245 {
246         struct groups_config *cfg = v6 ? &groups->cfg_v6 : &groups->cfg_v4;
247         cfg->query_response_interval = query_response_interval;
248         cfg->query_interval = query_interval;
249         cfg->robustness = robustness;
250         cfg->last_listener_query_count = cfg->robustness;
251         cfg->last_listener_query_interval = 1 * OMGP_TIME_PER_SECOND;
252 }
253 
254 // Update timers for a given group (called when receiving queries from other queriers)
255 void groups_update_timers(struct groups *groups,
256                 const struct in6_addr *groupaddr,
257                 const struct in6_addr *addrs, size_t len)
258 {
259         char addrbuf[INET6_ADDRSTRLEN];
260         inet_ntop(AF_INET6, groupaddr, addrbuf, sizeof(addrbuf));
261         struct group *group = groups_get_group(groups, groupaddr, NULL);
262         if (!group) {
263                 L_WARN("%s: failed to update timer: no such group %s", __FUNCTION__, addrbuf);
264                 return;
265         }
266 
267         struct groups_config *cfg = IN6_IS_ADDR_V4MAPPED(&group->addr) ? &groups->cfg_v4 : &groups->cfg_v6;
268         omgp_time_t now = omgp_time();
269         omgp_time_t llqt = now + (cfg->last_listener_query_count * cfg->last_listener_query_interval);
270 
271         if (len == 0) {
272                 if (group->exclude_until > llqt)
273                         group->exclude_until = llqt;
274         } else {
275                 for (size_t i = 0; i < len; ++i) {
276                         struct group_source *source = groups_get_source(groups, group, &addrs[i], NULL);
277                         if (!source) {
278                                 L_WARN("%s: failed to update timer: unknown sources for group %s", __FUNCTION__, addrbuf);
279                                 continue;
280                         }
281 
282                         if (source->include_until > llqt)
283                                 source->include_until = llqt;
284                 }
285         }
286 
287         rearm_timer(groups, llqt - now);
288 }
289 
290 // Update state of a given group (on reception of node's IGMP/MLD packets)
291 void groups_update_state(struct groups *groups,
292                 const struct in6_addr *groupaddr,
293                 const struct in6_addr *addrs, size_t len,
294                 enum groups_update update)
295 {
296         bool created = false, changed = false;
297         char addrbuf[INET6_ADDRSTRLEN];
298         inet_ntop(AF_INET6, groupaddr, addrbuf, sizeof(addrbuf));
299         L_DEBUG("%s: %s (+%d sources) => %d", __FUNCTION__, addrbuf, (int)len, update);
300 
301         struct group *group = groups_get_group(groups, groupaddr, &created);
302         if (!group) {
303                 L_ERR("querier_state: failed to allocate group for %s", addrbuf);
304                 return;
305         }
306 
307         if (created)
308                 changed = true;
309 
310         omgp_time_t now = omgp_time();
311         omgp_time_t next_event = OMGP_TIME_MAX;
312         struct groups_config *cfg = IN6_IS_ADDR_V4MAPPED(&group->addr) ? &groups->cfg_v4 : &groups->cfg_v6;
313 
314         // Backwards compatibility modes
315         if (group->compat_v2_until > now || group->compat_v1_until > now) {
316                 if (update == UPDATE_BLOCK)
317                         return;
318 
319                 if (group->compat_v1_until > now && (update == UPDATE_DONE || update == UPDATE_TO_IN))
320                         return;
321 
322                 if (update == UPDATE_TO_EX)
323                         len = 0;
324         }
325 
326         if (update == UPDATE_REPORT || update == UPDATE_REPORT_V1 || update == UPDATE_DONE) {
327                 omgp_time_t compat_until = now + cfg->query_response_interval +
328                                 (cfg->robustness * cfg->query_interval);
329 
330                 if (update == UPDATE_REPORT_V1)
331                         group->compat_v1_until = compat_until;
332                 else if (update == UPDATE_REPORT)
333                         group->compat_v2_until = compat_until;
334 
335                 update = (update == UPDATE_DONE) ? UPDATE_TO_IN : UPDATE_IS_EXCLUDE;
336                 len = 0;
337         }
338 
339         bool include = group->exclude_until <= now;
340         bool is_include = update == UPDATE_IS_INCLUDE || update == UPDATE_TO_IN || update == UPDATE_ALLOW;
341 
342         int llqc = cfg->last_listener_query_count;
343         omgp_time_t mali = now + (cfg->robustness * cfg->query_interval) + cfg->query_response_interval;
344         omgp_time_t llqt = now + (cfg->last_listener_query_interval * llqc);
345 
346         // RFC 3810 7.4
347         struct list_head saved = LIST_HEAD_INIT(saved);
348         struct list_head queried = LIST_HEAD_INIT(queried);
349         for (size_t i = 0; i < len; ++i) {
350                 bool *create = (include && update == UPDATE_BLOCK) ? NULL : &created;
351                 struct group_source *source = groups_get_source(groups, group, &addrs[i], create);
352 
353                 if (include && update == UPDATE_BLOCK) {
354                         if (source)
355                                 list_move_tail(&source->head, &queried);
356                 } else {
357                         bool query = false;
358                         if (!source) {
359                                 groups_update_state(groups, groupaddr, NULL, 0, false);
360                                 L_WARN("querier: failed to allocate source for %s, fallback to ASM", addrbuf);
361                                 return;
362                         }
363 
364                         if (created)
365                                 changed = true;
366                         else if (include && update == UPDATE_TO_EX)
367                                 query = true;
368 
369                         if (source->include_until <= now && update == UPDATE_SET_IN) {
370                                 source->include_until = mali;
371                                 changed = true;
372                         } else if (source->include_until > now && update == UPDATE_SET_EX) {
373                                 source->include_until = now;
374                                 changed = true;
375                         }
376 
377                         if (!include && (update == UPDATE_BLOCK || update == UPDATE_TO_EX) &&
378                                         (created || source->include_until > now))
379                                 query = true;
380 
381                         if ((is_include || (!include && created))) {
382                                 if (source->include_until <= now)
383                                         changed = true;
384 
385                                 source->include_until = (is_include || update == UPDATE_IS_EXCLUDE)
386                                                 ? mali : group->exclude_until;
387 
388                                 if (next_event > mali)
389                                         next_event = mali;
390                         }
391 
392                         if (query)
393                                 list_move_tail(&source->head, &queried);
394                         else if (update == UPDATE_IS_EXCLUDE || update == UPDATE_TO_EX ||
395                                         update == UPDATE_SET_EX || update == UPDATE_SET_IN)
396                                 list_move_tail(&source->head, &saved);
397                 }
398         }
399 
400         if (update == UPDATE_IS_EXCLUDE || update == UPDATE_TO_EX || update == UPDATE_SET_EX) {
401                 if (include || !list_empty(&group->sources))
402                         changed = true;
403 
404                 querier_clear_sources(group);
405                 list_splice(&saved, &group->sources);
406                 group->exclude_until = mali;
407 
408                 if (next_event > mali)
409                         next_event = mali;
410         }
411 
412         if (update == UPDATE_SET_IN) {
413                 if (!include || !list_empty(&group->sources)) {
414                         changed = true;
415                         next_event = now;
416                 }
417 
418                 querier_clear_sources(group);
419                 list_splice(&saved, &group->sources);
420                 group->exclude_until = now;
421         }
422 
423         // Prepare queries
424         if (update == UPDATE_TO_IN) {
425                 struct group_source *source, *n;
426                 list_for_each_entry_safe(source, n, &group->sources, head) {
427                         if (source->include_until <= now)
428                                 continue;
429 
430                         size_t i;
431                         for (i = 0; i < len && !IN6_ARE_ADDR_EQUAL(&source->addr, &addrs[i]); ++i);
432                         if (i == len)
433                                 list_move_tail(&source->head, &queried);
434                 }
435         }
436 
437         if (!list_empty(&queried)) {
438                 struct group_source *source;
439                 list_for_each_entry(source, &queried, head) {
440                         if (source->include_until > llqt)
441                                 source->include_until = llqt;
442 
443                         group->next_source_transmit = now;
444                         source->retransmit = llqc;
445                 }
446 
447                 next_event = now;
448                 list_splice(&queried, &group->sources);
449         }
450 
451         if (!include && update == UPDATE_TO_IN) {
452                 if (group->exclude_until > llqt)
453                         group->exclude_until = llqt;
454 
455                 group->next_generic_transmit = now;
456                 group->retransmit = llqc;
457                 next_event = now;
458         }
459 
460         if (changed && groups->cb_update)
461                 groups->cb_update(groups, group, now);
462 
463         if (group_is_included(group, now) && group->source_count == 0)
464                 next_event = now;
465 
466         if (next_event < OMGP_TIME_MAX)
467                 rearm_timer(groups, next_event - now);
468 
469         if (changed)
470                 L_DEBUG("%s: %s => %s (+%d sources)", __FUNCTION__, addrbuf,
471                                 (group_is_included(group, now)) ? "included" : "excluded",
472                                 (int)group->source_count);
473 
474 }
475 
/*
 * Read-only lookup of the group object for addr.
 * Returns NULL when the group is not present; nothing is created.
 */
const struct group* groups_get(struct groups *groups, const struct in6_addr *addr)
{
        const struct group *found = groups_get_group(groups, addr, NULL);

        return found;
}
481 
482 // Test if a group (and source) is requested in the current group state
483 // (i.e. for deciding if it should be routed / forwarded)
484 bool groups_includes_group(struct groups *groups, const struct in6_addr *addr,
485                 const struct in6_addr *src, omgp_time_t time)
486 {
487         struct group *group = groups_get_group(groups, addr, NULL);
488         if (group) {
489                 if (!src && (!group_is_included(group, time) || group->source_count > 0))
490                         return true;
491 
492                 struct group_source *source = groups_get_source(groups, group, src, NULL);
493                 if ((!group_is_included(group, time) && (!source || source_is_included(source, time))) ||
494                                 (group_is_included(group, time) && source && source_is_included(source, time)))
495                         return true;
496         }
497         return false;
498 }
499 

This page was automatically generated by LXR 0.3.1.  •  OpenWrt