diff options
author | whitespace / reindent <invalid@invalid.invalid> | 2017-07-17 14:03:14 +0200 |
---|---|---|
committer | whitespace / reindent <invalid@invalid.invalid> | 2017-07-17 14:04:07 +0200 |
commit | d62a17aedeb0eebdba98238874bb13d62c48dbf9 (patch) | |
tree | 3b319b1d61c8b85b4d1f06adf8b844bb8a9b5107 /lib/workqueue.c | |
parent | *: add indent control files (diff) | |
download | frr-d62a17aedeb0eebdba98238874bb13d62c48dbf9.tar.xz frr-d62a17aedeb0eebdba98238874bb13d62c48dbf9.zip |
*: reindentreindent-master-after
indent.py `git ls-files | pcregrep '\.[ch]$' | pcregrep -v '^(ldpd|babeld|nhrpd)/'`
Signed-off-by: David Lamparter <equinox@opensourcerouting.org>
Diffstat (limited to 'lib/workqueue.c')
-rw-r--r-- | lib/workqueue.c | 573 |
1 files changed, 272 insertions, 301 deletions
diff --git a/lib/workqueue.c b/lib/workqueue.c index 60119f1f4..612421c80 100644 --- a/lib/workqueue.c +++ b/lib/workqueue.c @@ -1,4 +1,4 @@ -/* +/* * Quagga Work Queue Support. * * Copyright (C) 2005 Sun Microsystems, Inc. @@ -28,7 +28,7 @@ #include "command.h" #include "log.h" -DEFINE_MTYPE(LIB, WORK_QUEUE, "Work queue") +DEFINE_MTYPE(LIB, WORK_QUEUE, "Work queue") DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_ITEM, "Work queue item") DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_NAME, "Work queue name string") @@ -41,145 +41,131 @@ static struct list *work_queues = &_work_queues; #define WORK_QUEUE_MIN_GRANULARITY 1 -static struct work_queue_item * -work_queue_item_new (struct work_queue *wq) +static struct work_queue_item *work_queue_item_new(struct work_queue *wq) { - struct work_queue_item *item; - assert (wq); + struct work_queue_item *item; + assert(wq); - item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, - sizeof (struct work_queue_item)); - - return item; + item = XCALLOC(MTYPE_WORK_QUEUE_ITEM, sizeof(struct work_queue_item)); + + return item; } -static void -work_queue_item_free (struct work_queue_item *item) +static void work_queue_item_free(struct work_queue_item *item) { - XFREE (MTYPE_WORK_QUEUE_ITEM, item); - return; + XFREE(MTYPE_WORK_QUEUE_ITEM, item); + return; } /* create new work queue */ -struct work_queue * -work_queue_new (struct thread_master *m, const char *queue_name) +struct work_queue *work_queue_new(struct thread_master *m, + const char *queue_name) { - struct work_queue *new; - - new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue)); - - if (new == NULL) - return new; - - new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name); - new->master = m; - SET_FLAG (new->flags, WQ_UNPLUGGED); - - if ( (new->items = list_new ()) == NULL) - { - XFREE (MTYPE_WORK_QUEUE_NAME, new->name); - XFREE (MTYPE_WORK_QUEUE, new); - - return NULL; - } - - new->items->del = (void (*)(void *)) work_queue_item_free; - - listnode_add (work_queues, new); - - new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; - - /* Default values, can be overriden by caller */ - new->spec.hold = WORK_QUEUE_DEFAULT_HOLD; - new->spec.yield = THREAD_YIELD_TIME_SLOT; - - return new; + struct work_queue *new; + + new = XCALLOC(MTYPE_WORK_QUEUE, sizeof(struct work_queue)); + + if (new == NULL) + return new; + + new->name = XSTRDUP(MTYPE_WORK_QUEUE_NAME, queue_name); + new->master = m; + SET_FLAG(new->flags, WQ_UNPLUGGED); + + if ((new->items = list_new()) == NULL) { + XFREE(MTYPE_WORK_QUEUE_NAME, new->name); + XFREE(MTYPE_WORK_QUEUE, new); + + return NULL; + } + + new->items->del = (void (*)(void *))work_queue_item_free; + + listnode_add(work_queues, new); + + new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; + + /* Default values, can be overriden by caller */ + new->spec.hold = WORK_QUEUE_DEFAULT_HOLD; + new->spec.yield = THREAD_YIELD_TIME_SLOT; + + return new; } -void -work_queue_free (struct work_queue *wq) +void work_queue_free(struct work_queue *wq) { - if (wq->thread != NULL) - thread_cancel(wq->thread); - - /* list_delete frees items via callback */ - list_delete (wq->items); - listnode_delete (work_queues, wq); - - XFREE (MTYPE_WORK_QUEUE_NAME, wq->name); - XFREE (MTYPE_WORK_QUEUE, wq); - return; + if (wq->thread != NULL) + thread_cancel(wq->thread); + + /* list_delete frees items via callback */ + list_delete(wq->items); + listnode_delete(work_queues, wq); + + XFREE(MTYPE_WORK_QUEUE_NAME, wq->name); + XFREE(MTYPE_WORK_QUEUE, wq); + return; } -bool -work_queue_is_scheduled (struct work_queue *wq) +bool work_queue_is_scheduled(struct work_queue *wq) { - return (wq->thread != NULL); + return (wq->thread != NULL); } -static int -work_queue_schedule (struct work_queue *wq, unsigned int delay) +static int work_queue_schedule(struct work_queue *wq, unsigned int delay) { - /* if appropriate, schedule work queue thread */ - if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED) - && (wq->thread == NULL) - && (listcount (wq->items) > 0) ) - { - wq->thread = NULL; - thread_add_timer_msec (wq->master, work_queue_run, wq, delay, - &wq->thread); - /* set thread yield time, if needed */ - if (wq->thread && wq->spec.yield != THREAD_YIELD_TIME_SLOT) - thread_set_yield_time (wq->thread, wq->spec.yield); - return 1; - } - else - return 0; + /* if appropriate, schedule work queue thread */ + if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) && (wq->thread == NULL) + && (listcount(wq->items) > 0)) { + wq->thread = NULL; + thread_add_timer_msec(wq->master, work_queue_run, wq, delay, + &wq->thread); + /* set thread yield time, if needed */ + if (wq->thread && wq->spec.yield != THREAD_YIELD_TIME_SLOT) + thread_set_yield_time(wq->thread, wq->spec.yield); + return 1; + } else + return 0; } - -void -work_queue_add (struct work_queue *wq, void *data) + +void work_queue_add(struct work_queue *wq, void *data) { - struct work_queue_item *item; - - assert (wq); - - if (!(item = work_queue_item_new (wq))) - { - zlog_err ("%s: unable to get new queue item", __func__); - return; - } - - item->data = data; - listnode_add (wq->items, item); - - work_queue_schedule (wq, wq->spec.hold); - - return; + struct work_queue_item *item; + + assert(wq); + + if (!(item = work_queue_item_new(wq))) { + zlog_err("%s: unable to get new queue item", __func__); + return; + } + + item->data = data; + listnode_add(wq->items, item); + + work_queue_schedule(wq, wq->spec.hold); + + return; } -static void -work_queue_item_remove (struct work_queue *wq, struct listnode *ln) +static void work_queue_item_remove(struct work_queue *wq, struct listnode *ln) { - struct work_queue_item *item = listgetdata (ln); + struct work_queue_item *item = listgetdata(ln); - assert (item && item->data); + assert(item && item->data); - /* call private data deletion callback if needed */ - if (wq->spec.del_item_data) - wq->spec.del_item_data (wq, item->data); + /* call private data deletion callback if needed */ + if (wq->spec.del_item_data) + wq->spec.del_item_data(wq, item->data); - list_delete_node (wq->items, ln); - work_queue_item_free (item); - - return; + list_delete_node(wq->items, ln); + work_queue_item_free(item); + + return; } -static void -work_queue_item_requeue (struct work_queue *wq, struct listnode *ln) +static void work_queue_item_requeue(struct work_queue *wq, struct listnode *ln) { - LISTNODE_DETACH (wq->items, ln); - LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */ + LISTNODE_DETACH(wq->items, ln); + LISTNODE_ATTACH(wq->items, ln); /* attach to end of list */ } DEFUN (show_work_queues, @@ -188,227 +174,212 @@ DEFUN (show_work_queues, SHOW_STR "Work Queue information\n") { - struct listnode *node; - struct work_queue *wq; - - vty_out (vty, - "%c %8s %5s %8s %8s %21s\n", - ' ', "List","(ms) ","Q. Runs","Yields","Cycle Counts "); - vty_out (vty, - "%c %8s %5s %8s %8s %7s %6s %8s %6s %s\n", - 'P', - "Items", - "Hold", - "Total","Total", - "Best","Gran.","Total","Avg.", - "Name"); - - for (ALL_LIST_ELEMENTS_RO (work_queues, node, wq)) - { - vty_out (vty,"%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n", - (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'), - listcount (wq->items), - wq->spec.hold, - wq->runs, wq->yields, - wq->cycles.best, wq->cycles.granularity, wq->cycles.total, - (wq->runs) ? - (unsigned int) (wq->cycles.total / wq->runs) : 0, - wq->name); - } - - return CMD_SUCCESS; + struct listnode *node; + struct work_queue *wq; + + vty_out(vty, "%c %8s %5s %8s %8s %21s\n", ' ', "List", "(ms) ", + "Q. Runs", "Yields", "Cycle Counts "); + vty_out(vty, "%c %8s %5s %8s %8s %7s %6s %8s %6s %s\n", 'P', "Items", + "Hold", "Total", "Total", "Best", "Gran.", "Total", "Avg.", + "Name"); + + for (ALL_LIST_ELEMENTS_RO(work_queues, node, wq)) { + vty_out(vty, "%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n", + (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'), + listcount(wq->items), wq->spec.hold, wq->runs, + wq->yields, wq->cycles.best, wq->cycles.granularity, + wq->cycles.total, + (wq->runs) ? (unsigned int)(wq->cycles.total / wq->runs) + : 0, + wq->name); + } + + return CMD_SUCCESS; } -void -workqueue_cmd_init (void) +void workqueue_cmd_init(void) { - install_element (VIEW_NODE, &show_work_queues_cmd); + install_element(VIEW_NODE, &show_work_queues_cmd); } /* 'plug' a queue: Stop it from being scheduled, * ie: prevent the queue from draining. */ -void -work_queue_plug (struct work_queue *wq) +void work_queue_plug(struct work_queue *wq) { - if (wq->thread) - thread_cancel (wq->thread); - - wq->thread = NULL; - - UNSET_FLAG (wq->flags, WQ_UNPLUGGED); + if (wq->thread) + thread_cancel(wq->thread); + + wq->thread = NULL; + + UNSET_FLAG(wq->flags, WQ_UNPLUGGED); } /* unplug queue, schedule it again, if appropriate * Ie: Allow the queue to be drained again */ -void -work_queue_unplug (struct work_queue *wq) +void work_queue_unplug(struct work_queue *wq) { - SET_FLAG (wq->flags, WQ_UNPLUGGED); + SET_FLAG(wq->flags, WQ_UNPLUGGED); - /* if thread isnt already waiting, add one */ - work_queue_schedule (wq, wq->spec.hold); + /* if thread isnt already waiting, add one */ + work_queue_schedule(wq, wq->spec.hold); } /* timer thread to process a work queue * will reschedule itself if required, - * otherwise work_queue_item_add + * otherwise work_queue_item_add */ -int -work_queue_run (struct thread *thread) +int work_queue_run(struct thread *thread) { - struct work_queue *wq; - struct work_queue_item *item; - wq_item_status ret; - unsigned int cycles = 0; - struct listnode *node, *nnode; - char yielded = 0; - - wq = THREAD_ARG (thread); - wq->thread = NULL; - - assert (wq && wq->items); - - /* calculate cycle granularity: - * list iteration == 1 run - * listnode processing == 1 cycle - * granularity == # cycles between checks whether we should yield. - * - * granularity should be > 0, and can increase slowly after each run to - * provide some hysteris, but not past cycles.best or 2*cycles. - * - * Best: starts low, can only increase - * - * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased - * if we run to end of time slot, can increase otherwise - * by a small factor. - * - * We could use just the average and save some work, however we want to be - * able to adjust quickly to CPU pressure. Average wont shift much if - * daemon has been running a long time. - */ - if (wq->cycles.granularity == 0) - wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; - - for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item)) - { - assert (item && item->data); - - /* dont run items which are past their allowed retries */ - if (item->ran > wq->spec.max_retries) - { - /* run error handler, if any */ - if (wq->spec.errorfunc) - wq->spec.errorfunc (wq, item->data); - work_queue_item_remove (wq, node); - continue; - } - - /* run and take care of items that want to be retried immediately */ - do - { - ret = wq->spec.workfunc (wq, item->data); - item->ran++; - } - while ((ret == WQ_RETRY_NOW) - && (item->ran < wq->spec.max_retries)); - - switch (ret) - { - case WQ_QUEUE_BLOCKED: - { - /* decrement item->ran again, cause this isn't an item - * specific error, and fall through to WQ_RETRY_LATER - */ - item->ran--; - } - case WQ_RETRY_LATER: - { - goto stats; - } - case WQ_REQUEUE: - { - item->ran--; - work_queue_item_requeue (wq, node); - /* If a single node is being used with a meta-queue (e.g., zebra), - * update the next node as we don't want to exit the thread and - * reschedule it after every node. By definition, WQ_REQUEUE is - * meant to continue the processing; the yield logic will kick in - * to terminate the thread when time has exceeded. - */ - if (nnode == NULL) - nnode = node; - break; + struct work_queue *wq; + struct work_queue_item *item; + wq_item_status ret; + unsigned int cycles = 0; + struct listnode *node, *nnode; + char yielded = 0; + + wq = THREAD_ARG(thread); + wq->thread = NULL; + + assert(wq && wq->items); + + /* calculate cycle granularity: + * list iteration == 1 run + * listnode processing == 1 cycle + * granularity == # cycles between checks whether we should yield. + * + * granularity should be > 0, and can increase slowly after each run to + * provide some hysteris, but not past cycles.best or 2*cycles. + * + * Best: starts low, can only increase + * + * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased + * if we run to end of time slot, can increase otherwise + * by a small factor. + * + * We could use just the average and save some work, however we want to + * be + * able to adjust quickly to CPU pressure. Average wont shift much if + * daemon has been running a long time. + */ + if (wq->cycles.granularity == 0) + wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; + + for (ALL_LIST_ELEMENTS(wq->items, node, nnode, item)) { + assert(item && item->data); + + /* dont run items which are past their allowed retries */ + if (item->ran > wq->spec.max_retries) { + /* run error handler, if any */ + if (wq->spec.errorfunc) + wq->spec.errorfunc(wq, item->data); + work_queue_item_remove(wq, node); + continue; + } + + /* run and take care of items that want to be retried + * immediately */ + do { + ret = wq->spec.workfunc(wq, item->data); + item->ran++; + } while ((ret == WQ_RETRY_NOW) + && (item->ran < wq->spec.max_retries)); + + switch (ret) { + case WQ_QUEUE_BLOCKED: { + /* decrement item->ran again, cause this isn't an item + * specific error, and fall through to WQ_RETRY_LATER + */ + item->ran--; + } + case WQ_RETRY_LATER: { + goto stats; + } + case WQ_REQUEUE: { + item->ran--; + work_queue_item_requeue(wq, node); + /* If a single node is being used with a meta-queue + * (e.g., zebra), + * update the next node as we don't want to exit the + * thread and + * reschedule it after every node. By definition, + * WQ_REQUEUE is + * meant to continue the processing; the yield logic + * will kick in + * to terminate the thread when time has exceeded. + */ + if (nnode == NULL) + nnode = node; + break; + } + case WQ_RETRY_NOW: + /* a RETRY_NOW that gets here has exceeded max_tries, same as + * ERROR */ + case WQ_ERROR: { + if (wq->spec.errorfunc) + wq->spec.errorfunc(wq, item); + } + /* fallthru */ + case WQ_SUCCESS: + default: { + work_queue_item_remove(wq, node); + break; + } + } + + /* completed cycle */ + cycles++; + + /* test if we should yield */ + if (!(cycles % wq->cycles.granularity) + && thread_should_yield(thread)) { + yielded = 1; + goto stats; + } } - case WQ_RETRY_NOW: - /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */ - case WQ_ERROR: - { - if (wq->spec.errorfunc) - wq->spec.errorfunc (wq, item); - } - /* fallthru */ - case WQ_SUCCESS: - default: - { - work_queue_item_remove (wq, node); - break; - } - } - - /* completed cycle */ - cycles++; - - /* test if we should yield */ - if ( !(cycles % wq->cycles.granularity) - && thread_should_yield (thread)) - { - yielded = 1; - goto stats; - } - } stats: #define WQ_HYSTERESIS_FACTOR 4 - /* we yielded, check whether granularity should be reduced */ - if (yielded && (cycles < wq->cycles.granularity)) - { - wq->cycles.granularity = ((cycles > 0) ? cycles - : WORK_QUEUE_MIN_GRANULARITY); - } - /* otherwise, should granularity increase? */ - else if (cycles >= (wq->cycles.granularity)) - { - if (cycles > wq->cycles.best) - wq->cycles.best = cycles; - - /* along with yielded check, provides hysteresis for granularity */ - if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR - * WQ_HYSTERESIS_FACTOR)) - wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */ - else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR)) - wq->cycles.granularity += WQ_HYSTERESIS_FACTOR; - } + /* we yielded, check whether granularity should be reduced */ + if (yielded && (cycles < wq->cycles.granularity)) { + wq->cycles.granularity = + ((cycles > 0) ? cycles : WORK_QUEUE_MIN_GRANULARITY); + } + /* otherwise, should granularity increase? */ + else if (cycles >= (wq->cycles.granularity)) { + if (cycles > wq->cycles.best) + wq->cycles.best = cycles; + + /* along with yielded check, provides hysteresis for granularity + */ + if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR + * WQ_HYSTERESIS_FACTOR)) + wq->cycles.granularity *= + WQ_HYSTERESIS_FACTOR; /* quick ramp-up */ + else if (cycles + > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR)) + wq->cycles.granularity += WQ_HYSTERESIS_FACTOR; + } #undef WQ_HYSTERIS_FACTOR - - wq->runs++; - wq->cycles.total += cycles; - if (yielded) - wq->yields++; + + wq->runs++; + wq->cycles.total += cycles; + if (yielded) + wq->yields++; #if 0 printf ("%s: cycles %d, new: best %d, worst %d\n", __func__, cycles, wq->cycles.best, wq->cycles.granularity); #endif - - /* Is the queue done yet? If it is, call the completion callback. */ - if (listcount (wq->items) > 0) - work_queue_schedule (wq, 0); - else if (wq->spec.completion_func) - wq->spec.completion_func (wq); - - return 0; + + /* Is the queue done yet? If it is, call the completion callback. */ + if (listcount(wq->items) > 0) + work_queue_schedule(wq, 0); + else if (wq->spec.completion_func) + wq->spec.completion_func(wq); + + return 0; } |