diff options
author | Quentin Young <qlyoung@cumulusnetworks.com> | 2018-04-21 22:51:54 +0200 |
---|---|---|
committer | Quentin Young <qlyoung@cumulusnetworks.com> | 2018-05-07 17:37:07 +0200 |
commit | 363e24c65140d07c2ced580698bd1427c6b99a55 (patch) | |
tree | 9fa78138b63b883535e8b1a8f099d7fd721eb17a | |
parent | Merge pull request #2156 from donaldsharp/zebra_doc (diff) | |
download | frr-363e24c65140d07c2ced580698bd1427c6b99a55.tar.xz frr-363e24c65140d07c2ced580698bd1427c6b99a55.zip |
lib: add mt-safe variants for stream_fifo ops
stream_fifo is used as our standard internal message queue. Message
queues are useful in multithreaded environments. Up until now I have
been doing my own synchronization when using stream_fifo in this way;
this patch gets rid of the need for that boilerplate and decreases the
risk of locking mistakes when working with this datastructure.
Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
-rw-r--r-- | lib/stream.c | 60 | ||||
-rw-r--r-- | lib/stream.h | 93 |
2 files changed, 147 insertions, 6 deletions
diff --git a/lib/stream.c b/lib/stream.c index c4edd3d5b..aba4c2016 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -21,6 +21,7 @@ #include <zebra.h> #include <stddef.h> +#include <pthread.h> #include "stream.h" #include "memory.h" @@ -1101,6 +1102,7 @@ struct stream_fifo *stream_fifo_new(void) struct stream_fifo *new; new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo)); + pthread_mutex_init(&new->mtx, NULL); return new; } @@ -1115,7 +1117,16 @@ void stream_fifo_push(struct stream_fifo *fifo, struct stream *s) fifo->tail = s; fifo->tail->next = NULL; - fifo->count++; + atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release); +} + +void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s) +{ + pthread_mutex_lock(&fifo->mtx); + { + stream_fifo_push(fifo, s); + } + pthread_mutex_unlock(&fifo->mtx); } /* Delete first stream from fifo. */ @@ -1131,7 +1142,8 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo) if (fifo->head == NULL) fifo->tail = NULL; - fifo->count--; + atomic_fetch_sub_explicit(&fifo->count, 1, + memory_order_release); /* ensure stream is scrubbed of references to this fifo */ s->next = NULL; @@ -1140,12 +1152,37 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo) return s; } -/* Return first fifo entry. */ +struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo) +{ + struct stream *ret; + + pthread_mutex_lock(&fifo->mtx); + { + ret = stream_fifo_pop(fifo); + } + pthread_mutex_unlock(&fifo->mtx); + + return ret; +} + struct stream *stream_fifo_head(struct stream_fifo *fifo) { return fifo->head; } +struct stream *stream_fifo_head_safe(struct stream_fifo *fifo) +{ + struct stream *ret; + + pthread_mutex_lock(&fifo->mtx); + { + ret = stream_fifo_head(fifo); + } + pthread_mutex_unlock(&fifo->mtx); + + return ret; +} + void stream_fifo_clean(struct stream_fifo *fifo) { struct stream *s; @@ -1156,11 +1193,26 @@ void stream_fifo_clean(struct stream_fifo *fifo) stream_free(s); } fifo->head = fifo->tail = NULL; - fifo->count = 0; + atomic_store_explicit(&fifo->count, 0, memory_order_release); +} + +void stream_fifo_clean_safe(struct stream_fifo *fifo) +{ + pthread_mutex_lock(&fifo->mtx); + { + stream_fifo_clean(fifo); + } + pthread_mutex_unlock(&fifo->mtx); +} + +size_t stream_fifo_count_safe(struct stream_fifo *fifo) +{ + return atomic_load_explicit(&fifo->count, memory_order_acquire); } void stream_fifo_free(struct stream_fifo *fifo) { stream_fifo_clean(fifo); + pthread_mutex_destroy(&fifo->mtx); XFREE(MTYPE_STREAM_FIFO, fifo); } diff --git a/lib/stream.h b/lib/stream.h index cc74e22a6..e5d325e43 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -22,6 +22,9 @@ #ifndef _ZEBRA_STREAM_H #define _ZEBRA_STREAM_H +#include <pthread.h> + +#include "frratomic.h" #include "mpls.h" #include "prefix.h" @@ -107,7 +110,11 @@ struct stream { /* First in first out queue structure. */ struct stream_fifo { - size_t count; + /* lock for mt-safe operations */ + pthread_mutex_t mtx; + + /* number of streams in this fifo */ + _Atomic size_t count; struct stream *head; struct stream *tail; @@ -240,12 +247,94 @@ extern int stream_empty(struct stream *); /* is the stream empty? */ /* deprecated */ extern uint8_t *stream_pnt(struct stream *); -/* Stream fifo. */ +/* + * Operations on struct stream_fifo. + * + * Each function has a safe variant, which ensures that the operation performed + * is atomic with respect to the operations performed by all other safe + * variants. In other words, the safe variants lock the stream_fifo's mutex + * before performing their action. These are provided for convenience when + * using stream_fifo in a multithreaded context, to alleviate the need for the + * caller to implement their own synchronization around the stream_fifo. + * + * The following functions do not have safe variants. The caller must ensure + * that these operations are performed safely in a multithreaded context: + * - stream_fifo_new + * - stream_fifo_free + */ + +/* + * Create a new stream_fifo. + * + * Returns: + * newly created stream_fifo + */ extern struct stream_fifo *stream_fifo_new(void); + +/* + * Push a stream onto a stream_fifo. + * + * fifo + * the stream_fifo to push onto + * + * s + * the stream to push onto the stream_fifo + */ extern void stream_fifo_push(struct stream_fifo *fifo, struct stream *s); +extern void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s); + +/* + * Pop a stream off a stream_fifo. + * + * fifo + * the stream_fifo to pop from + * + * Returns: + * the next stream in the stream_fifo + */ extern struct stream *stream_fifo_pop(struct stream_fifo *fifo); +extern struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo); + +/* + * Retrieve the next stream from a stream_fifo without popping it. + * + * fifo + * the stream_fifo to operate on + * + * Returns: + * the next stream that would be returned from stream_fifo_pop + */ extern struct stream *stream_fifo_head(struct stream_fifo *fifo); +extern struct stream *stream_fifo_head_safe(struct stream_fifo *fifo); + +/* + * Remove all streams from a stream_fifo. + * + * fifo + * the stream_fifo to clean + */ extern void stream_fifo_clean(struct stream_fifo *fifo); +extern void stream_fifo_clean_safe(struct stream_fifo *fifo); + +/* + * Retrieve number of streams on a stream_fifo. + * + * fifo + * the stream_fifo to retrieve the count for + * + * Returns: + * the number of streams on the stream_fifo + */ +extern size_t stream_fifo_count_safe(struct stream_fifo *fifo); + +/* + * Free a stream_fifo. + * + * Calls stream_fifo_clean, then deinitializes the stream_fifo and frees it. + * + * fifo + * the stream_fifo to free + */ extern void stream_fifo_free(struct stream_fifo *fifo); /* This is here because "<< 24" is particularly problematic in C. |