diff options
author | Quentin Young <qlyoung@cumulusnetworks.com> | 2017-04-18 20:11:43 +0200 |
---|---|---|
committer | Quentin Young <qlyoung@cumulusnetworks.com> | 2017-11-30 22:17:59 +0100 |
commit | 56257a44e408b7491090b47e24b22beeb1cfd35e (patch) | |
tree | 7092cbbd51fcdb93b179de8cf73188fb72ccd971 /bgpd | |
parent | bgpd: use new threading infra (diff) | |
download | frr-56257a44e408b7491090b47e24b22beeb1cfd35e.tar.xz frr-56257a44e408b7491090b47e24b22beeb1cfd35e.zip |
bgpd: move bgp i/o to a separate source file
After implement threading, bgp_packet.c was serving the double purpose
of consolidating packet parsing functionality and handling actual I/O
operations. This is somewhat messy and difficult to understand. I've
thus moved all code and data structures for handling threaded packet
writes to bgp_io.[ch].
Although bgp_io.[ch] only handles writes at the moment to keep the noise
on this commit series down, for organization purposes, it's probably
best to move bgp_read() and its trappings into here as well and
restructure that code so that read()'s happen in the pthread and packet
processing happens on the main thread.
Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
Diffstat (limited to 'bgpd')
-rw-r--r-- | bgpd/Makefile.am | 3 | ||||
-rw-r--r-- | bgpd/bgp_fsm.c | 2 | ||||
-rw-r--r-- | bgpd/bgp_io.c | 307 | ||||
-rw-r--r-- | bgpd/bgp_io.h | 104 | ||||
-rw-r--r-- | bgpd/bgp_packet.c | 453 | ||||
-rw-r--r-- | bgpd/bgp_packet.h | 7 | ||||
-rw-r--r-- | bgpd/bgpd.c | 1 | ||||
-rw-r--r-- | bgpd/bgpd.h | 1 |
8 files changed, 500 insertions, 378 deletions
diff --git a/bgpd/Makefile.am b/bgpd/Makefile.am index c8791687e..b0d34dc43 100644 --- a/bgpd/Makefile.am +++ b/bgpd/Makefile.am @@ -86,7 +86,7 @@ libbgp_a_SOURCES = \ bgp_nht.c bgp_updgrp.c bgp_updgrp_packet.c bgp_updgrp_adv.c bgp_bfd.c \ bgp_encap_tlv.c $(BGP_VNC_RFAPI_SRC) bgp_attr_evpn.c \ bgp_evpn.c bgp_evpn_vty.c bgp_vpn.c bgp_label.c bgp_rd.c \ - bgp_keepalives.c + bgp_keepalives.c bgp_io.c noinst_HEADERS = \ bgp_memory.h \ @@ -99,6 +99,7 @@ noinst_HEADERS = \ bgp_updgrp.h bgp_bfd.h bgp_encap_tlv.h bgp_encap_types.h \ $(BGP_VNC_RFAPI_HD) bgp_attr_evpn.h bgp_evpn.h bgp_evpn_vty.h \ bgp_vpn.h bgp_label.h bgp_rd.h bgp_evpn_private.h bgp_keepalives.h \ + bgp_io.h bgpd_SOURCES = bgp_main.c bgpd_LDADD = libbgp.a $(BGP_VNC_RFP_LIB) ../lib/libfrr.la @LIBCAP@ @LIBM@ diff --git a/bgpd/bgp_fsm.c b/bgpd/bgp_fsm.c index d965b49c4..95e2f157c 100644 --- a/bgpd/bgp_fsm.c +++ b/bgpd/bgp_fsm.c @@ -50,6 +50,7 @@ #include "bgpd/bgp_bfd.h" #include "bgpd/bgp_memory.h" #include "bgpd/bgp_keepalives.h" +#include "bgpd/bgp_io.h" DEFINE_HOOK(peer_backward_transition, (struct peer * peer), (peer)) DEFINE_HOOK(peer_established, (struct peer * peer), (peer)) @@ -1037,6 +1038,7 @@ int bgp_stop(struct peer *peer) BGP_TIMER_OFF(peer->t_holdtime); peer_keepalives_off(peer); BGP_TIMER_OFF(peer->t_routeadv); + BGP_TIMER_OFF(peer->t_generate_updgrp_packets); /* Stream reset. */ peer->packet_size = 0; diff --git a/bgpd/bgp_io.c b/bgpd/bgp_io.c new file mode 100644 index 000000000..5d14737d2 --- /dev/null +++ b/bgpd/bgp_io.c @@ -0,0 +1,307 @@ +/* + BGP I/O. + Implements a consumer thread to flush packets destined for remote peers. + + Copyright (C) 2017 Cumulus Networks + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING; if not, write to the + Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, + MA 02110-1301 USA + */ + +#include <zebra.h> +#include <sys/time.h> +#include <pthread.h> + +#include "thread.h" +#include "hash.h" +#include "stream.h" +#include "memory.h" +#include "log.h" +#include "monotime.h" +#include "network.h" +#include "frr_pthread.h" + +#include "bgpd/bgpd.h" +#include "bgpd/bgp_io.h" +#include "bgpd/bgp_debug.h" +#include "bgpd/bgp_packet.h" +#include "bgpd/bgp_fsm.h" + +static int bgp_write(struct peer *); +static void peer_process_writes(struct hash_backet *, void *); + +bool bgp_packet_writes_thread_run = false; + +/* Hash table of peers to operate on, associated synchronization primitives and + * hash table callbacks. + * ------------------------------------------------------------------------ */ +static struct hash *peerhash; +/* Mutex to protect hash table */ +static pthread_mutex_t *peerhash_mtx; +/* Condition variable used to notify the write thread that there is work to do + */ +static pthread_cond_t *write_cond; + +static unsigned int peer_hash_key_make(void *p) +{ + struct peer *peer = p; + return sockunion_hash(&peer->su); +} + +static int peer_hash_cmp(const void *p1, const void *p2) +{ + const struct peer *peer1 = p1; + const struct peer *peer2 = p2; + return (sockunion_same(&peer1->su, &peer2->su) + && CHECK_FLAG(peer1->flags, PEER_FLAG_CONFIG_NODE) + == CHECK_FLAG(peer2->flags, PEER_FLAG_CONFIG_NODE)); +} +/* ------------------------------------------------------------------------ */ + +void peer_writes_init(void) +{ + peerhash_mtx = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_mutex_t)); + write_cond = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_cond_t)); + + // initialize mutex + pthread_mutex_init(peerhash_mtx, NULL); + + // use monotonic clock with condition variable + pthread_condattr_t attrs; + pthread_condattr_init(&attrs); + pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC); + pthread_cond_init(write_cond, &attrs); + pthread_condattr_destroy(&attrs); + + // initialize peerhash + peerhash = hash_create_size(2048, peer_hash_key_make, peer_hash_cmp); +} + +static void peer_writes_finish(void *arg) +{ + bgp_packet_writes_thread_run = false; + + if (peerhash) + hash_free(peerhash); + + peerhash = NULL; + + pthread_mutex_unlock(peerhash_mtx); + pthread_mutex_destroy(peerhash_mtx); + pthread_cond_destroy(write_cond); + + XFREE(MTYPE_PTHREAD, peerhash_mtx); + XFREE(MTYPE_PTHREAD, write_cond); +} + +void *peer_writes_start(void *arg) +{ + struct timeval currtime = {0, 0}; + struct timeval sleeptime = {0, 500}; + struct timespec next_update = {0, 0}; + + pthread_mutex_lock(peerhash_mtx); + + // register cleanup handler + pthread_cleanup_push(&peer_writes_finish, NULL); + + bgp_packet_writes_thread_run = true; + + while (bgp_packet_writes_thread_run) { + // wait around until next update time + if (peerhash->count > 0) + pthread_cond_timedwait(write_cond, peerhash_mtx, + &next_update); + else // wait around until we have some peers + while (peerhash->count == 0 + && bgp_packet_writes_thread_run) + pthread_cond_wait(write_cond, peerhash_mtx); + + hash_iterate(peerhash, peer_process_writes, NULL); + + monotime(&currtime); + timeradd(&currtime, &sleeptime, &currtime); + TIMEVAL_TO_TIMESPEC(&currtime, &next_update); + } + + // clean up + pthread_cleanup_pop(1); + + return NULL; +} + +int peer_writes_stop(void **result) +{ + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_WRITE); + bgp_packet_writes_thread_run = false; + peer_writes_wake(); + pthread_join(fpt->thread, result); + return 0; +} + +void peer_writes_on(struct peer *peer) +{ + if (peer->status == Deleted) + return; + + pthread_mutex_lock(peerhash_mtx); + { + if (!hash_lookup(peerhash, peer)) { + hash_get(peerhash, peer, hash_alloc_intern); + peer_lock(peer); + } + + SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); + } + pthread_mutex_unlock(peerhash_mtx); + peer_writes_wake(); +} + +void peer_writes_off(struct peer *peer) +{ + pthread_mutex_lock(peerhash_mtx); + { + if (hash_release(peerhash, peer)) { + peer_unlock(peer); + fprintf(stderr, "Releasing %p\n", peer); + } + + UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); + } + pthread_mutex_unlock(peerhash_mtx); +} + +void peer_writes_wake() +{ + pthread_cond_signal(write_cond); +} + +/** + * Callback for hash_iterate. Takes a hash bucket, unwraps it into a peer and + * synchronously calls bgp_write() on the peer. + */ +static void peer_process_writes(struct hash_backet *hb, void *arg) +{ + static struct peer *peer; + peer = hb->data; + pthread_mutex_lock(&peer->obuf_mtx); + { + bgp_write(peer); + } + pthread_mutex_unlock(&peer->obuf_mtx); + + // dispatch job on main thread + BGP_TIMER_ON(peer->t_generate_updgrp_packets, + bgp_generate_updgrp_packets, 100); +} + +/** + * Flush peer output buffer. + * + * This function pops packets off of peer->obuf and writes them to peer->fd. + * The amount of packets written is equal to the minimum of peer->wpkt_quanta + * and the number of packets on the output buffer. + * + * If write() returns an error, the appropriate FSM event is generated. + * + * The return value is equal to the number of packets written + * (which may be zero). + */ +static int bgp_write(struct peer *peer) +{ + u_char type; + struct stream *s; + int num; + int update_last_write = 0; + unsigned int count = 0; + unsigned int oc = 0; + + /* Write packets. The number of packets written is the value of + * bgp->wpkt_quanta or the size of the output buffer, whichever is + * smaller.*/ + while (count < peer->bgp->wpkt_quanta + && (s = stream_fifo_head(peer->obuf))) { + int writenum; + do { + writenum = stream_get_endp(s) - stream_get_getp(s); + num = write(peer->fd, STREAM_PNT(s), writenum); + + if (num < 0) { + if (!ERRNO_IO_RETRY(errno)) + BGP_EVENT_ADD(peer, TCP_fatal_error); + + goto done; + } else if (num != writenum) // incomplete write + stream_forward_getp(s, num); + + } while (num != writenum); + + /* Retrieve BGP packet type. */ + stream_set_getp(s, BGP_MARKER_SIZE + 2); + type = stream_getc(s); + + switch (type) { + case BGP_MSG_OPEN: + peer->open_out++; + break; + case BGP_MSG_UPDATE: + peer->update_out++; + break; + case BGP_MSG_NOTIFY: + peer->notify_out++; + /* Double start timer. */ + peer->v_start *= 2; + + /* Overflow check. */ + if (peer->v_start >= (60 * 2)) + peer->v_start = (60 * 2); + + /* Handle Graceful Restart case where the state changes + to + Connect instead of Idle */ + /* Flush any existing events */ + BGP_EVENT_ADD(peer, BGP_Stop); + goto done; + + case BGP_MSG_KEEPALIVE: + peer->keepalive_out++; + break; + case BGP_MSG_ROUTE_REFRESH_NEW: + case BGP_MSG_ROUTE_REFRESH_OLD: + peer->refresh_out++; + break; + case BGP_MSG_CAPABILITY: + peer->dynamic_cap_out++; + break; + } + + count++; + /* OK we send packet so delete it. */ + stream_free(stream_fifo_pop(peer->obuf)); + update_last_write = 1; + } + +done : { + /* Update last_update if UPDATEs were written. */ + if (peer->update_out > oc) + peer->last_update = bgp_clock(); + + /* If we TXed any flavor of packet update last_write */ + if (update_last_write) + peer->last_write = bgp_clock(); +} + + return count; +} diff --git a/bgpd/bgp_io.h b/bgpd/bgp_io.h new file mode 100644 index 000000000..7b81b8ee3 --- /dev/null +++ b/bgpd/bgp_io.h @@ -0,0 +1,104 @@ +/* + BGP I/O. + Implements a consumer thread to flush packets destined for remote peers. + + Copyright (C) 2017 Cumulus Networks + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING; if not, write to the + Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, + MA 02110-1301 USA + */ + +#ifndef _FRR_BGP_IO_H +#define _FRR_BGP_IO_H + +#include "bgpd/bgpd.h" + +/** + * Control variable for write thread. + * + * Setting this variable to false and calling peer_writes_wake() will + * eventually result in thread termination. + */ +extern bool bgp_packet_writes_thread_run; + +/** + * Initializes data structures and flags for the write thread. + * + * This function should be called from the main thread before + * peer_writes_start() is invoked. + */ +extern void peer_writes_init(void); + +/** + * Start function for write thread. + * + * This function should be passed to pthread_create() during BGP startup. + */ +extern void *peer_writes_start(void *arg); + +/** + * Start function for write thread. + * + * Uninitializes all resources and stops the thread. + * + * @param result -- where to store data result, unused + */ +extern int peer_writes_stop(void **result); + +/** + * Registers a peer with the write thread. + * + * This function adds the peer to an internal data structure, which must be + * locked for write access. This call will block until the structure can be + * locked. + * + * After this function is called, any packets placed on peer->obuf will be + * written to peer->fd at regular intervals. + * + * This function increments the peer reference counter with peer_lock(). + * + * If the peer is already registered, nothing happens. + * + * @param peer - peer to register + */ +extern void peer_writes_on(struct peer *peer); + +/** + * Deregisters a peer with the write thread. + * + * This function removes the peer from an internal data structure, which must + * be locked for write access. This call will block until the structure can be + * locked. + * + * After this function is called, any packets placed on peer->obuf will not be + * written to peer->fd. + * + * This function decrements the peer reference counter with peer_unlock(). + * + * If the peer is not registered, nothing happens. + * + * @param peer - peer to deregister + */ +extern void peer_writes_off(struct peer *peer); + +/** + * Notifies the write thread that there is work to be done. + * + * This function has the effect of waking the write thread if it is sleeping. + * If the thread is not sleeping, this signal will be ignored. + */ +extern void peer_writes_wake(void); + +#endif /* _FRR_BGP_IO_H */ diff --git a/bgpd/bgp_packet.c b/bgpd/bgp_packet.c index 5994df23e..a89d72cc6 100644 --- a/bgpd/bgp_packet.c +++ b/bgpd/bgp_packet.c @@ -1,4 +1,6 @@ /* BGP packet management routine. + * Contains utility functions for constructing and consuming BGP messages. + * Copyright (C) 2017 Cumulus Networks * Copyright (C) 1999 Kunihiro Ishiguro * * This file is part of GNU Zebra. @@ -34,7 +36,6 @@ #include "plist.h" #include "queue.h" #include "filter.h" -#include "lib/frr_pthread.h" #include "bgpd/bgpd.h" #include "bgpd/bgp_table.h" @@ -56,6 +57,7 @@ #include "bgpd/bgp_vty.h" #include "bgpd/bgp_updgrp.h" #include "bgpd/bgp_label.h" +#include "bgpd/bgp_io.h" /* Linked list of active peers */ static pthread_mutex_t *plist_mtx; @@ -99,17 +101,6 @@ int bgp_packet_set_size(struct stream *s) return cp; } -/** - * Push a packet onto the beginning of the peer's output queue. - * Must be externally synchronized around 'peer'. - */ -static void bgp_packet_add_unsafe(struct peer *peer, struct stream *s) -{ - /* Add packet to the end of list. */ - stream_fifo_push(peer->obuf, s); - peer_writes_wake(); -} - /* * Push a packet onto the beginning of the peer's output queue. * This function acquires the peer's write mutex before proceeding. @@ -117,17 +108,10 @@ static void bgp_packet_add_unsafe(struct peer *peer, struct stream *s) static void bgp_packet_add(struct peer *peer, struct stream *s) { pthread_mutex_lock(&peer->obuf_mtx); - bgp_packet_add_unsafe(peer, s); + stream_fifo_push(peer->obuf, s); pthread_mutex_unlock(&peer->obuf_mtx); -} -/** - * Pop a packet off the end of the peer's output queue. - * Must be externally synchronized around 'peer'. - */ -static void bgp_packet_delete_unsafe(struct peer *peer) -{ - stream_free(stream_fifo_pop(peer->obuf)); + peer_writes_wake(); } static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi, @@ -172,10 +156,18 @@ static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi, return s; } -/* Get next packet to be written. */ -static struct stream *bgp_write_packet(struct peer *peer) +/* + * Enqueue onto the peer's output buffer any packets which are pending for the + * update group it is a member of. + * + * XXX: Severely needs performance work. + */ +int bgp_generate_updgrp_packets(struct thread *thread) { - struct stream *s = NULL; + struct peer *peer = THREAD_ARG(thread); + peer->t_generate_updgrp_packets = NULL; + + struct stream *s; struct peer_af *paf; struct bpacket *next_pkt; afi_t afi; @@ -187,104 +179,86 @@ static struct stream *bgp_write_packet(struct peer *peer) * update-delay post processing). */ if (peer->status != Established) - return NULL; + return 0; if (peer->bgp && peer->bgp->main_peers_update_hold) - return NULL; - - FOREACH_AFI_SAFI (afi, safi) { - paf = peer_af_find(peer, afi, safi); - if (!paf || !PAF_SUBGRP(paf)) - continue; - next_pkt = paf->next_pkt_to_send; - - /* Try to generate a packet for the peer if we are at - * the end of - * the list. Always try to push out WITHDRAWs first. */ - if (!next_pkt || !next_pkt->buffer) { - next_pkt = subgroup_withdraw_packet(PAF_SUBGRP(paf)); - if (!next_pkt || !next_pkt->buffer) - subgroup_update_packet(PAF_SUBGRP(paf)); - next_pkt = paf->next_pkt_to_send; - } + return 0; - /* Try to generate a packet for the peer if we are at - * the end of - * the list. Always try to push out WITHDRAWs first. */ - if (!next_pkt || !next_pkt->buffer) { - next_pkt = subgroup_withdraw_packet( - PAF_SUBGRP(paf)); - if (!next_pkt || !next_pkt->buffer) - subgroup_update_packet(PAF_SUBGRP(paf)); + do { + s = NULL; + for (afi = AFI_IP; afi < AFI_MAX; afi++) + for (safi = SAFI_UNICAST; safi < SAFI_MAX; safi++) { + paf = peer_af_find(peer, afi, safi); + if (!paf || !PAF_SUBGRP(paf)) + continue; next_pkt = paf->next_pkt_to_send; - } - - /* If we still don't have a packet to send to the peer, - * then - * try to find out out if we have to send eor or if not, - * skip to - * the next AFI, SAFI. - * Don't send the EOR prematurely... if the subgroup's - * coalesce - * timer is running, the adjacency-out structure is not - * created - * yet. - */ - if (!next_pkt || !next_pkt->buffer) { - if (CHECK_FLAG(peer->cap, - PEER_CAP_RESTART_RCV)) { - if (!(PAF_SUBGRP(paf))->t_coalesce - && peer->afc_nego[afi][safi] - && peer->synctime - && !CHECK_FLAG( - peer->af_sflags[afi] - [safi], - PEER_STATUS_EOR_SEND)) { - SET_FLAG(peer->af_sflags[afi] - [safi], - PEER_STATUS_EOR_SEND); - if ((s = bgp_update_packet_eor( - peer, afi, safi))) - bgp_packet_add(peer, s); + /* Try to generate a packet for the peer if we + * are at the end of + * the list. Always try to push out WITHDRAWs + * first. */ + if (!next_pkt || !next_pkt->buffer) { + next_pkt = subgroup_withdraw_packet( + PAF_SUBGRP(paf)); + if (!next_pkt || !next_pkt->buffer) + subgroup_update_packet( + PAF_SUBGRP(paf)); + next_pkt = paf->next_pkt_to_send; + } - return s; + /* If we still don't have a packet to send to + * the peer, then + * try to find out out if we have to send eor or + * if not, skip to + * the next AFI, SAFI. + * Don't send the EOR prematurely... if the + * subgroup's coalesce + * timer is running, the adjacency-out structure + * is not created + * yet. + */ + if (!next_pkt || !next_pkt->buffer) { + if (CHECK_FLAG(peer->cap, + PEER_CAP_RESTART_RCV)) { + if (!(PAF_SUBGRP(paf)) + ->t_coalesce + && peer->afc_nego[afi][safi] + && peer->synctime + && !CHECK_FLAG( + peer->af_sflags + [afi] + [safi], + PEER_STATUS_EOR_SEND)) { + SET_FLAG( + peer->af_sflags + [afi] + [safi], + PEER_STATUS_EOR_SEND); + + if ((s = bgp_update_packet_eor( + peer, afi, + safi))) + bgp_packet_add( + peer, + s); + } } + continue; } - } - continue; - } - - /* Found a packet template to send, overwrite packet - * with appropriate - * attributes from peer and advance peer */ - s = bpacket_reformat_for_peer(next_pkt, paf); - bgp_packet_add(peer, s); - bpacket_queue_advance_peer(paf); - return s; - } - - return NULL; -} -static int bgp_generate_updgrp_packets(struct thread *thread) -{ - struct listnode *ln; - struct peer *peer; - pthread_mutex_lock(plist_mtx); - { - for (ALL_LIST_ELEMENTS_RO(plist, ln, peer)) - while (bgp_write_packet(peer)) - ; + /* Found a packet template to send, overwrite + * packet with appropriate + * attributes from peer and advance peer */ + s = bpacket_reformat_for_peer(next_pkt, paf); + bgp_packet_add(peer, s); + bpacket_queue_advance_peer(paf); + } + } while (s); - t_generate_updgrp_packets = NULL; - } - pthread_mutex_unlock(plist_mtx); return 0; } - /* * Creates a BGP Keepalive packet and appends it to the peer's output queue. */ @@ -2168,266 +2142,3 @@ done: return 0; } - -/* ------------- write thread ------------------ */ - -/** - * Flush peer output buffer. - * - * This function pops packets off of peer->obuf and writes them to peer->fd. - * The amount of packets written is equal to the minimum of peer->wpkt_quanta - * and the number of packets on the output buffer. - * - * If write() returns an error, the appropriate FSM event is generated. - * - * The return value is equal to the number of packets written - * (which may be zero). - */ -static int bgp_write(struct peer *peer) -{ - u_char type; - struct stream *s; - int num; - int update_last_write = 0; - unsigned int count = 0; - unsigned int oc = 0; - - /* Write packets. The number of packets written is the value of - * bgp->wpkt_quanta or the size of the output buffer, whichever is - * smaller.*/ - while (count < peer->bgp->wpkt_quanta - && (s = stream_fifo_head(peer->obuf))) { - int writenum; - do { // write a full packet, or return on error - writenum = stream_get_endp(s) - stream_get_getp(s); - num = write(peer->fd, STREAM_PNT(s), writenum); - - if (num < 0) { - if (!ERRNO_IO_RETRY(errno)) - BGP_EVENT_ADD(peer, TCP_fatal_error); - - goto done; - } else if (num != writenum) // incomplete write - stream_forward_getp(s, num); - - } while (num != writenum); - - /* Retrieve BGP packet type. */ - stream_set_getp(s, BGP_MARKER_SIZE + 2); - type = stream_getc(s); - - switch (type) { - case BGP_MSG_OPEN: - peer->open_out++; - break; - case BGP_MSG_UPDATE: - peer->update_out++; - break; - case BGP_MSG_NOTIFY: - peer->notify_out++; - /* Double start timer. */ - peer->v_start *= 2; - - /* Overflow check. */ - if (peer->v_start >= (60 * 2)) - peer->v_start = (60 * 2); - - /* Handle Graceful Restart case where the state changes - to - Connect instead of Idle */ - /* Flush any existing events */ - BGP_EVENT_ADD(peer, BGP_Stop); - goto done; - - case BGP_MSG_KEEPALIVE: - peer->keepalive_out++; - break; - case BGP_MSG_ROUTE_REFRESH_NEW: - case BGP_MSG_ROUTE_REFRESH_OLD: - peer->refresh_out++; - break; - case BGP_MSG_CAPABILITY: - peer->dynamic_cap_out++; - break; - } - - count++; - /* OK we send packet so delete it. */ - bgp_packet_delete_unsafe(peer); - update_last_write = 1; - } - -done : { - /* Update last_update if UPDATEs were written. */ - if (peer->update_out > oc) - peer->last_update = bgp_clock(); - - /* If we TXed any flavor of packet update last_write */ - if (update_last_write) - peer->last_write = bgp_clock(); -} - - return count; -} - -void peer_writes_init(void) -{ - plist_mtx = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_mutex_t)); - write_cond = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_cond_t)); - - // initialize mutex - pthread_mutex_init(plist_mtx, NULL); - - // use monotonic clock with condition variable - pthread_condattr_t attrs; - pthread_condattr_init(&attrs); - pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC); - pthread_cond_init(write_cond, &attrs); - pthread_condattr_destroy(&attrs); - - // initialize peerlist - plist = list_new(); -} - -static void peer_writes_finish(void *arg) -{ - bgp_packet_writes_thread_run = false; - - if (plist) - list_delete(plist); - - plist = NULL; - - pthread_mutex_unlock(plist_mtx); - pthread_mutex_destroy(plist_mtx); - pthread_cond_destroy(write_cond); - - XFREE(MTYPE_PTHREAD, plist_mtx); - XFREE(MTYPE_PTHREAD, write_cond); -} - -/** - * Entry function for peer packet flushing pthread. - * - * peer_writes_init() must be called prior to this. - */ -void *peer_writes_start(void *arg) -{ - struct timeval currtime = {0, 0}; - struct timeval sleeptime = {0, 500}; - struct timespec next_update = {0, 0}; - - struct listnode *ln; - struct peer *peer; - - pthread_mutex_lock(plist_mtx); - - // register cleanup handler - pthread_cleanup_push(&peer_writes_finish, NULL); - - bgp_packet_writes_thread_run = true; - - while (bgp_packet_writes_thread_run) { // wait around until next update - // time - if (plist->count > 0) - pthread_cond_timedwait(write_cond, plist_mtx, - &next_update); - else // wait around until we have some peers - while (plist->count == 0 - && bgp_packet_writes_thread_run) - pthread_cond_wait(write_cond, plist_mtx); - - for (ALL_LIST_ELEMENTS_RO(plist, ln, peer)) { - pthread_mutex_lock(&peer->obuf_mtx); - { - bgp_write(peer); - } - pthread_mutex_unlock(&peer->obuf_mtx); - - if (!bgp_packet_writes_thread_run) - break; - } - - // schedule update packet generation on main thread - if (!t_generate_updgrp_packets) - t_generate_updgrp_packets = thread_add_event( - bm->master, bgp_generate_updgrp_packets, NULL, - 0); - - monotime(&currtime); - timeradd(&currtime, &sleeptime, &currtime); - TIMEVAL_TO_TIMESPEC(&currtime, &next_update); - } - - // clean up - pthread_cleanup_pop(1); - - return NULL; -} - -/** - * Turns on packet writing for a peer. - */ -void peer_writes_on(struct peer *peer) -{ - if (peer->status == Deleted) - return; - - pthread_mutex_lock(plist_mtx); - { - struct listnode *ln, *nn; - struct peer *p; - - // make sure this peer isn't already in the list - for (ALL_LIST_ELEMENTS(plist, ln, nn, p)) - if (p == peer) { - pthread_mutex_unlock(plist_mtx); - return; - } - - peer_lock(peer); - listnode_add(plist, peer); - - SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); - } - pthread_mutex_unlock(plist_mtx); - peer_writes_wake(); -} - -/** - * Turns off packet writing for a peer. - */ -void peer_writes_off(struct peer *peer) -{ - struct listnode *ln, *nn; - struct peer *p; - pthread_mutex_lock(plist_mtx); - { - for (ALL_LIST_ELEMENTS(plist, ln, nn, p)) - if (p == peer) { - list_delete_node(plist, ln); - peer_unlock(peer); - break; - } - - UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); - } - pthread_mutex_unlock(plist_mtx); -} - -/** - * Wakes up the write thread to do work. - */ -void peer_writes_wake() -{ - pthread_cond_signal(write_cond); -} - -int peer_writes_stop(void **result) -{ - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_WRITE); - bgp_packet_writes_thread_run = false; - peer_writes_wake(); - pthread_join(fpt->thread, result); - return 0; -} diff --git a/bgpd/bgp_packet.h b/bgpd/bgp_packet.h index 2c252012f..d7080d7fb 100644 --- a/bgpd/bgp_packet.h +++ b/bgpd/bgp_packet.h @@ -67,11 +67,6 @@ extern int bgp_packet_set_size(struct stream *s); /* Control variable for write thread. */ extern bool bgp_packet_writes_thread_run; -extern void peer_writes_init(void); -extern void *peer_writes_start(void *arg); -extern void peer_writes_on(struct peer *peer); -extern void peer_writes_off(struct peer *peer); -extern void peer_writes_wake(void); -extern int peer_writes_stop(void **result); +extern int bgp_generate_updgrp_packets(struct thread *); #endif /* _QUAGGA_BGP_PACKET_H */ diff --git a/bgpd/bgpd.c b/bgpd/bgpd.c index 306a31e40..cfe5d5c67 100644 --- a/bgpd/bgpd.c +++ b/bgpd/bgpd.c @@ -77,6 +77,7 @@ #include "bgpd/bgp_memory.h" #include "bgpd/bgp_evpn_vty.h" #include "bgpd/bgp_keepalives.h" +#include "bgpd/bgp_io.h" DEFINE_MTYPE_STATIC(BGPD, PEER_TX_SHUTDOWN_MSG, "Peer shutdown message (TX)"); diff --git a/bgpd/bgpd.h b/bgpd/bgpd.h index 0e80226eb..4fb784e24 100644 --- a/bgpd/bgpd.h +++ b/bgpd/bgpd.h @@ -806,6 +806,7 @@ struct peer { struct thread *t_pmax_restart; struct thread *t_gr_restart; struct thread *t_gr_stale; + struct thread *t_generate_updgrp_packets; /* Thread flags. */ u_int16_t thread_flags; |