diff options
author | Quentin Young <qlyoung@cumulusnetworks.com> | 2017-06-08 23:14:18 +0200 |
---|---|---|
committer | Quentin Young <qlyoung@cumulusnetworks.com> | 2017-11-30 22:18:01 +0100 |
commit | b750b0ba76767ce5eed186cdb53c44001d08a7ef (patch) | |
tree | 1c3b749def6e669e967ed5d5683cab4c29f1e639 /bgpd/bgp_io.c | |
parent | bgpd: bye bye THREAD_BACKGROUND (diff) | |
download | frr-b750b0ba76767ce5eed186cdb53c44001d08a7ef.tar.xz frr-b750b0ba76767ce5eed186cdb53c44001d08a7ef.zip |
bgpd: small i/o threading improvements
* Start bit flags at 1, not 2
* Make run-flags atomic for i/o thread
* Remove work_cond mutex, it should no longer be necessary
* Add asserts to ensure proper ordering in bgp_connect()
* Use true/false with booleans, not 1/0
Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
Diffstat (limited to 'bgpd/bgp_io.c')
-rw-r--r-- | bgpd/bgp_io.c | 105 |
1 files changed, 49 insertions, 56 deletions
diff --git a/bgpd/bgp_io.c b/bgpd/bgp_io.c index 63559467e..581973d37 100644 --- a/bgpd/bgp_io.c +++ b/bgpd/bgp_io.c @@ -44,18 +44,18 @@ static int bgp_process_reads(struct thread *); static bool validate_header(struct peer *); /* generic i/o status codes */ -#define BGP_IO_TRANS_ERR (1 << 1) // EAGAIN or similar occurred -#define BGP_IO_FATAL_ERR (1 << 2) // some kind of fatal TCP error +#define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred +#define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error /* Start and stop routines for I/O pthread + control variables * ------------------------------------------------------------------------ */ -bool bgp_packet_write_thread_run = false; -pthread_mutex_t *work_mtx; +_Atomic bool bgp_io_thread_run; +_Atomic bool bgp_io_thread_started; void bgp_io_init() { - work_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t)); - pthread_mutex_init(work_mtx, NULL); + bgp_io_thread_run = false; + bgp_io_thread_started = false; } void *bgp_io_start(void *arg) @@ -66,16 +66,15 @@ void *bgp_io_start(void *arg) // we definitely don't want to handle signals fpt->master->handle_signals = false; - bgp_packet_write_thread_run = true; struct thread task; - while (bgp_packet_write_thread_run) { + atomic_store_explicit(&bgp_io_thread_run, true, memory_order_relaxed); + atomic_store_explicit(&bgp_io_thread_started, true, + memory_order_relaxed); + + while (bgp_io_thread_run) { if (thread_fetch(fpt->master, &task)) { - pthread_mutex_lock(work_mtx); - { - thread_call(&task); - } - pthread_mutex_unlock(work_mtx); + thread_call(&task); } } @@ -84,21 +83,24 @@ void *bgp_io_start(void *arg) int bgp_io_stop(void **result, struct frr_pthread *fpt) { + + bgp_io_thread_run = false; + /* let the loop break */ fpt->master->spin = false; - bgp_packet_write_thread_run = false; + /* break poll */ pthread_kill(fpt->thread, SIGINT); pthread_join(fpt->thread, result); - pthread_mutex_unlock(work_mtx); - pthread_mutex_destroy(work_mtx); - - XFREE(MTYPE_TMP, work_mtx); return 0; } /* ------------------------------------------------------------------------ */ void bgp_writes_on(struct peer *peer) { + while (!atomic_load_explicit(&bgp_io_thread_started, + memory_order_relaxed)) + ; + assert(peer->status != Deleted); assert(peer->obuf); assert(peer->ibuf); @@ -108,33 +110,31 @@ void bgp_writes_on(struct peer *peer) struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - pthread_mutex_lock(work_mtx); - { - thread_add_write(fpt->master, bgp_process_writes, peer, - peer->fd, &peer->t_write); - SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); - } - pthread_mutex_unlock(work_mtx); + thread_add_write(fpt->master, bgp_process_writes, peer, peer->fd, + &peer->t_write); + SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); } void bgp_writes_off(struct peer *peer) { + while (!atomic_load_explicit(&bgp_io_thread_started, + memory_order_relaxed)) + ; + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - pthread_mutex_lock(work_mtx); - { - thread_cancel_async(fpt->master, &peer->t_write); - THREAD_OFF(peer->t_generate_updgrp_packets); + thread_cancel_async(fpt->master, &peer->t_write, NULL); + THREAD_OFF(peer->t_generate_updgrp_packets); - // peer access by us after this point will result in pain - UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); - } - pthread_mutex_unlock(work_mtx); - /* upon return, i/o thread must not access the peer */ + UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); } void bgp_reads_on(struct peer *peer) { + while (!atomic_load_explicit(&bgp_io_thread_started, + memory_order_relaxed)) + ; + assert(peer->status != Deleted); assert(peer->ibuf); assert(peer->fd); @@ -146,31 +146,24 @@ void bgp_reads_on(struct peer *peer) struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - pthread_mutex_lock(work_mtx); - { - thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd, - &peer->t_read); - thread_add_timer_msec(bm->master, bgp_process_packet, peer, 0, - &peer->t_process_packet); - SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON); - } - pthread_mutex_unlock(work_mtx); + thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd, + &peer->t_read); + + SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON); } void bgp_reads_off(struct peer *peer) { + while (!atomic_load_explicit(&bgp_io_thread_started, + memory_order_relaxed)) + ; + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - /* this mutex ensures t_read is not being modified */ - pthread_mutex_lock(work_mtx); - { - thread_cancel_async(fpt->master, &peer->t_read); - THREAD_OFF(peer->t_process_packet); + thread_cancel_async(fpt->master, &peer->t_read, NULL); + THREAD_OFF(peer->t_process_packet); - // peer access by us after this point will result in pain - UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON); - } - pthread_mutex_unlock(work_mtx); + UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON); } /** @@ -182,13 +175,13 @@ static int bgp_process_writes(struct thread *thread) static struct peer *peer; peer = THREAD_ARG(thread); uint16_t status; + bool reschedule; if (peer->fd < 0) return -1; struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - bool reschedule; pthread_mutex_lock(&peer->io_mtx); { status = bgp_write(peer); @@ -200,7 +193,7 @@ static int bgp_process_writes(struct thread *thread) } if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) - reschedule = 0; // problem + reschedule = false; /* problem */ if (reschedule) { thread_add_write(fpt->master, bgp_process_writes, peer, @@ -451,8 +444,8 @@ done : { */ static uint16_t bgp_read(struct peer *peer) { - int readsize; // how many bytes we want to read - int nbytes; // how many bytes we actually read + size_t readsize; // how many bytes we want to read + ssize_t nbytes; // how many bytes we actually read uint16_t status = 0; readsize = STREAM_WRITEABLE(peer->ibuf_work); |