diff options
Diffstat (limited to 'bgpd/bgp_io.c')
-rw-r--r-- | bgpd/bgp_io.c | 123 |
1 files changed, 34 insertions, 89 deletions
diff --git a/bgpd/bgp_io.c b/bgpd/bgp_io.c index 5ab14d5cd..59b2d1cda 100644 --- a/bgpd/bgp_io.c +++ b/bgpd/bgp_io.c @@ -51,74 +51,13 @@ static bool validate_header(struct peer *); #define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred #define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error -/* Start and stop routines for I/O pthread + control variables - * ------------------------------------------------------------------------ */ -_Atomic bool bgp_io_thread_run; -_Atomic bool bgp_io_thread_started; +/* Thread external API ----------------------------------------------------- */ -void bgp_io_init() -{ - bgp_io_thread_run = false; - bgp_io_thread_started = false; -} - -/* Unused callback for thread_add_read() */ -static int bgp_io_dummy(struct thread *thread) { return 0; } - -void *bgp_io_start(void *arg) +void bgp_writes_on(struct peer *peer) { struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - fpt->master->owner = pthread_self(); - - // fd so we can sleep in poll() - int sleeper[2]; - pipe(sleeper); - thread_add_read(fpt->master, &bgp_io_dummy, NULL, sleeper[0], NULL); - - // we definitely don't want to handle signals - fpt->master->handle_signals = false; - - struct thread task; - - atomic_store_explicit(&bgp_io_thread_run, true, memory_order_seq_cst); - atomic_store_explicit(&bgp_io_thread_started, true, - memory_order_seq_cst); - - while (bgp_io_thread_run) { - if (thread_fetch(fpt->master, &task)) { - thread_call(&task); - } - } - - close(sleeper[1]); - close(sleeper[0]); - - return NULL; -} - -static int bgp_io_finish(struct thread *thread) -{ - atomic_store_explicit(&bgp_io_thread_run, false, memory_order_seq_cst); - return 0; -} - -int bgp_io_stop(void **result, struct frr_pthread *fpt) -{ - thread_add_event(fpt->master, &bgp_io_finish, NULL, 0, NULL); - pthread_join(fpt->thread, result); - return 0; -} - -/* Extern API -------------------------------------------------------------- */ -void bgp_io_running(void) -{ - while (!atomic_load_explicit(&bgp_io_thread_started, - memory_order_seq_cst)) - frr_pthread_yield(); -} + assert(fpt->running); -void bgp_writes_on(struct peer *peer) -{ assert(peer->status != Deleted); assert(peer->obuf); assert(peer->ibuf); @@ -127,8 +66,6 @@ void bgp_writes_on(struct peer *peer) assert(!peer->t_connect_check_w); assert(peer->fd); - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - thread_add_write(fpt->master, bgp_process_writes, peer, peer->fd, &peer->t_write); SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); @@ -137,6 +74,7 @@ void bgp_writes_on(struct peer *peer) void bgp_writes_off(struct peer *peer) { struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + assert(fpt->running); thread_cancel_async(fpt->master, &peer->t_write, NULL); THREAD_OFF(peer->t_generate_updgrp_packets); @@ -146,6 +84,9 @@ void bgp_writes_off(struct peer *peer) void bgp_reads_on(struct peer *peer) { + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + assert(fpt->running); + assert(peer->status != Deleted); assert(peer->ibuf); assert(peer->fd); @@ -155,8 +96,6 @@ void bgp_reads_on(struct peer *peer) assert(!peer->t_connect_check_w); assert(peer->fd); - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd, &peer->t_read); @@ -166,6 +105,7 @@ void bgp_reads_on(struct peer *peer) void bgp_reads_off(struct peer *peer) { struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + assert(fpt->running); thread_cancel_async(fpt->master, &peer->t_read, NULL); THREAD_OFF(peer->t_process_packet); @@ -173,9 +113,9 @@ void bgp_reads_off(struct peer *peer) UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON); } -/* Internal functions ------------------------------------------------------- */ +/* Thread internal functions ----------------------------------------------- */ -/** +/* * Called from I/O pthread when a file descriptor has become ready for writing. */ static int bgp_process_writes(struct thread *thread) @@ -198,11 +138,13 @@ static int bgp_process_writes(struct thread *thread) } pthread_mutex_unlock(&peer->io_mtx); - if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */ + /* no problem */ + if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { } + /* problem */ if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) { - reschedule = false; /* problem */ + reschedule = false; fatal = true; } @@ -217,7 +159,7 @@ static int bgp_process_writes(struct thread *thread) return 0; } -/** +/* * Called from I/O pthread when a file descriptor has become ready for reading, * or has hung up. * @@ -288,8 +230,10 @@ static int bgp_process_reads(struct thread *thread) /* if this fails we are seriously screwed */ assert(pktsize <= BGP_MAX_PACKET_SIZE); - /* If we have that much data, chuck it into its own - * stream and append to input queue for processing. */ + /* + * If we have that much data, chuck it into its own + * stream and append to input queue for processing. + */ if (ringbuf_remain(ibw) >= pktsize) { struct stream *pkt = stream_new(pktsize); assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize); @@ -323,7 +267,7 @@ static int bgp_process_reads(struct thread *thread) return 0; } -/** +/* * Flush peer output buffer. * * This function pops packets off of peer->obuf and writes them to peer->fd. @@ -342,15 +286,10 @@ static uint16_t bgp_write(struct peer *peer) int num; int update_last_write = 0; unsigned int count = 0; - uint32_t oc; - uint32_t uo; + uint32_t uo = 0; uint16_t status = 0; uint32_t wpkt_quanta_old; - // save current # updates sent - oc = atomic_load_explicit(&peer->update_out, memory_order_relaxed); - - // cache current write quanta wpkt_quanta_old = atomic_load_explicit(&peer->bgp->wpkt_quanta, memory_order_relaxed); @@ -369,7 +308,7 @@ static uint16_t bgp_write(struct peer *peer) } goto done; - } else if (num != writenum) // incomplete write + } else if (num != writenum) stream_forward_getp(s, num); } while (num != writenum); @@ -386,6 +325,7 @@ static uint16_t bgp_write(struct peer *peer) case BGP_MSG_UPDATE: atomic_fetch_add_explicit(&peer->update_out, 1, memory_order_relaxed); + uo++; break; case BGP_MSG_NOTIFY: atomic_fetch_add_explicit(&peer->notify_out, 1, @@ -397,8 +337,10 @@ static uint16_t bgp_write(struct peer *peer) if (peer->v_start >= (60 * 2)) peer->v_start = (60 * 2); - /* Handle Graceful Restart case where the state changes - * to Connect instead of Idle */ + /* + * Handle Graceful Restart case where the state changes + * to Connect instead of Idle. + */ BGP_EVENT_ADD(peer, BGP_Stop); goto done; @@ -424,9 +366,12 @@ static uint16_t bgp_write(struct peer *peer) } done : { - /* Update last_update if UPDATEs were written. */ - uo = atomic_load_explicit(&peer->update_out, memory_order_relaxed); - if (uo > oc) + /* + * Update last_update if UPDATEs were written. + * Note: that these are only updated at end, + * not per message (i.e., per loop) + */ + if (uo) atomic_store_explicit(&peer->last_update, bgp_clock(), memory_order_relaxed); @@ -439,7 +384,7 @@ done : { return status; } -/** +/* * Reads a chunk of data from peer->fd into peer->ibuf_work. * * @return status flag (see top-of-file) |