diff options
author | Quentin Young <qlyoung@cumulusnetworks.com> | 2017-02-07 00:39:06 +0100 |
---|---|---|
committer | Quentin Young <qlyoung@cumulusnetworks.com> | 2017-11-30 22:17:57 +0100 |
commit | d3ecc69e5fba1873872a1f4dc359ff1934f81848 (patch) | |
tree | bcdf393161c7f7ca5e8fffbb1208362904bb80a6 /bgpd/bgp_packet.c | |
parent | Merge pull request #1493 from donaldsharp/plist_stuff (diff) | |
download | frr-d3ecc69e5fba1873872a1f4dc359ff1934f81848.tar.xz frr-d3ecc69e5fba1873872a1f4dc359ff1934f81848.zip |
bgpd: move packet writes into dedicated pthread
* BGP_WRITE_ON() removed
* BGP_WRITE_OFF() removed
* peer_writes_on() added
* peer_writes_off() added
* bgp_write_proceed_actions() removed
Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
Diffstat (limited to 'bgpd/bgp_packet.c')
-rw-r--r-- | bgpd/bgp_packet.c | 581 |
1 file changed, 319 insertions, 262 deletions
diff --git a/bgpd/bgp_packet.c b/bgpd/bgp_packet.c index a955b3512..e27b416d0 100644 --- a/bgpd/bgp_packet.c +++ b/bgpd/bgp_packet.c @@ -19,6 +19,7 @@ */ #include <zebra.h> +#include <sys/time.h> #include "thread.h" #include "stream.h" @@ -55,6 +56,13 @@ #include "bgpd/bgp_updgrp.h" #include "bgpd/bgp_label.h" +/* Linked list of active peers */ +static pthread_mutex_t plist_mtx = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; +static struct list *plist; + +bool bgp_packet_writes_thread_run; + /* Set up BGP packet marker and packet type. */ int bgp_packet_set_marker(struct stream *s, u_char type) { @@ -87,21 +95,40 @@ int bgp_packet_set_size(struct stream *s) return cp; } -/* Add new packet to the peer. */ -void bgp_packet_add(struct peer *peer, struct stream *s) +/** + * Push a packet onto the beginning of the peer's output queue. + * Must be externally synchronized around 'peer'. + */ +static void bgp_packet_add_unsafe(struct peer *peer, struct stream *s) { /* Add packet to the end of list. */ stream_fifo_push(peer->obuf, s); + peer_writes_wake(); +} + +/* + * Push a packet onto the beginning of the peer's output queue. + * This function acquires the peer's write mutex before proceeding. + */ +static void bgp_packet_add(struct peer *peer, struct stream *s) +{ + pthread_mutex_lock(&peer->obuf_mtx); + bgp_packet_add_unsafe(peer, s); + pthread_mutex_unlock(&peer->obuf_mtx); } -/* Free first packet. */ -static void bgp_packet_delete(struct peer *peer) +/** + * Pop a packet off the end of the peer's output queue. + * Must be externally synchronized around 'peer'. + */ +static void bgp_packet_delete_unsafe(struct peer *peer) { stream_free(stream_fifo_pop(peer->obuf)); } + /* Check file descriptor whether connect is established. 
*/ -int bgp_connect_check(struct peer *peer, int change_state) +static int bgp_connect_check(struct peer *peer, int change_state) { int status; socklen_t slen; @@ -109,7 +136,6 @@ int bgp_connect_check(struct peer *peer, int change_state) /* Anyway I have to reset read and write thread. */ BGP_READ_OFF(peer->t_read); - BGP_WRITE_OFF(peer->t_write); /* Check file descriptor. */ slen = sizeof(status); @@ -176,7 +202,7 @@ static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi, } bgp_packet_set_size(s); - bgp_packet_add(peer, s); + bgp_packet_add_unsafe(peer, s); return s; } @@ -248,246 +274,21 @@ static struct stream *bgp_write_packet(struct peer *peer) } - /* - * Found a packet template to send, overwrite packet - * with appropriate - * attributes from peer and advance peer - */ - s = bpacket_reformat_for_peer(next_pkt, paf); - bpacket_queue_advance_peer(paf); - return s; - } - - return NULL; -} - -/* The next action for the peer from a write perspective */ -static void bgp_write_proceed_actions(struct peer *peer) -{ - afi_t afi; - safi_t safi; - struct peer_af *paf; - struct bpacket *next_pkt; - int fullq_found = 0; - struct update_subgroup *subgrp; - - if (stream_fifo_head(peer->obuf)) { - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); - return; - } - - FOREACH_AFI_SAFI (afi, safi) { - paf = peer_af_find(peer, afi, safi); - if (!paf) - continue; - subgrp = paf->subgroup; - if (!subgrp) - continue; - - next_pkt = paf->next_pkt_to_send; - if (next_pkt && next_pkt->buffer) { - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); - return; - } - - /* No packets readily available for AFI/SAFI, are there - * subgroup packets - * that need to be generated? 
*/ - if (bpacket_queue_is_full(SUBGRP_INST(subgrp), - SUBGRP_PKTQ(subgrp))) - fullq_found = 1; - else if (subgroup_packets_to_build(subgrp)) { - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); - return; - } - - /* No packets to send, see if EOR is pending */ - if (CHECK_FLAG(peer->cap, PEER_CAP_RESTART_RCV)) { - if (!subgrp->t_coalesce && peer->afc_nego[afi][safi] - && peer->synctime - && !CHECK_FLAG(peer->af_sflags[afi][safi], - PEER_STATUS_EOR_SEND) - && safi != SAFI_MPLS_VPN) { - BGP_WRITE_ON(peer->t_write, bgp_write, - peer->fd); - return; - } - } - } - if (fullq_found) { - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); - return; - } -} - -/* Write packet to the peer. */ -int bgp_write(struct thread *thread) -{ - struct peer *peer; - u_char type; - struct stream *s; - int num; - int update_last_write = 0; - unsigned int count = 0; - unsigned int oc = 0; - - /* Yes first of all get peer pointer. */ - peer = THREAD_ARG(thread); - peer->t_write = NULL; - - /* For non-blocking IO check. */ - if (peer->status == Connect) { - bgp_connect_check(peer, 1); - return 0; - } - - s = bgp_write_packet(peer); - if (!s) { - bgp_write_proceed_actions(peer); - return 0; - } - - sockopt_cork(peer->fd, 1); - - oc = peer->update_out; - - /* Nonblocking write until TCP output buffer is full. */ - do { - int writenum; - - /* Number of bytes to be sent. */ - writenum = stream_get_endp(s) - stream_get_getp(s); - - /* Call write() system call. */ - num = write(peer->fd, STREAM_PNT(s), writenum); - if (num < 0) { - /* write failed either retry needed or error */ - if (ERRNO_IO_RETRY(errno)) - break; - - BGP_EVENT_ADD(peer, TCP_fatal_error); - return 0; - } - - if (num != writenum) { - /* Partial write */ - stream_forward_getp(s, num); - break; - } - - /* Retrieve BGP packet type. 
*/ - stream_set_getp(s, BGP_MARKER_SIZE + 2); - type = stream_getc(s); - - switch (type) { - case BGP_MSG_OPEN: - peer->open_out++; - break; - case BGP_MSG_UPDATE: - peer->update_out++; - break; - case BGP_MSG_NOTIFY: - peer->notify_out++; - /* Double start timer. */ - peer->v_start *= 2; - - /* Overflow check. */ - if (peer->v_start >= (60 * 2)) - peer->v_start = (60 * 2); - - /* Flush any existing events */ - BGP_EVENT_ADD(peer, BGP_Stop); - goto done; - - case BGP_MSG_KEEPALIVE: - peer->keepalive_out++; - break; - case BGP_MSG_ROUTE_REFRESH_NEW: - case BGP_MSG_ROUTE_REFRESH_OLD: - peer->refresh_out++; - break; - case BGP_MSG_CAPABILITY: - peer->dynamic_cap_out++; - break; + /* Found a packet template to send, overwrite packet + * with appropriate + * attributes from peer and advance peer */ + s = bpacket_reformat_for_peer(next_pkt, paf); + bgp_packet_add_unsafe(peer, s); + bpacket_queue_advance_peer(paf); + return s; } - /* OK we send packet so delete it. */ - bgp_packet_delete(peer); - update_last_write = 1; - } while (++count < peer->bgp->wpkt_quanta - && (s = bgp_write_packet(peer)) != NULL); - - bgp_write_proceed_actions(peer); - -done: - /* Update last_update if UPDATEs were written. */ - if (peer->update_out > oc) - peer->last_update = bgp_clock(); - - /* If we TXed any flavor of packet update last_write */ - if (update_last_write) - peer->last_write = bgp_clock(); - - sockopt_cork(peer->fd, 0); - return 0; -} - -/* This is only for sending NOTIFICATION message to neighbor. */ -static int bgp_write_notify(struct peer *peer) -{ - int ret, val; - u_char type; - struct stream *s; - - /* There should be at least one packet. */ - s = stream_fifo_head(peer->obuf); - if (!s) - return 0; - assert(stream_get_endp(s) >= BGP_HEADER_SIZE); - - /* Stop collecting data within the socket */ - sockopt_cork(peer->fd, 0); - - /* socket is in nonblocking mode, if we can't deliver the NOTIFY, well, - * we only care about getting a clean shutdown at this point. 
*/ - ret = write(peer->fd, STREAM_DATA(s), stream_get_endp(s)); - - /* only connection reset/close gets counted as TCP_fatal_error, failure - * to write the entire NOTIFY doesn't get different FSM treatment */ - if (ret <= 0) { - BGP_EVENT_ADD(peer, TCP_fatal_error); - return 0; - } - - /* Disable Nagle, make NOTIFY packet go out right away */ - val = 1; - (void)setsockopt(peer->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val, - sizeof(val)); - - /* Retrieve BGP packet type. */ - stream_set_getp(s, BGP_MARKER_SIZE + 2); - type = stream_getc(s); - - assert(type == BGP_MSG_NOTIFY); - - /* Type should be notify. */ - peer->notify_out++; - - /* Double start timer. */ - peer->v_start *= 2; - - /* Overflow check. */ - if (peer->v_start >= (60 * 2)) - peer->v_start = (60 * 2); - - /* Handle Graceful Restart case where the state changes to - Connect instead of Idle */ - BGP_EVENT_ADD(peer, BGP_Stop); - - return 0; + return NULL; } -/* Make keepalive packet and send it to the peer. */ +/* + * Creates a BGP Keepalive packet and appends it to the peer's output queue. + */ void bgp_keepalive_send(struct peer *peer) { struct stream *s; @@ -508,11 +309,12 @@ void bgp_keepalive_send(struct peer *peer) /* Add packet to the peer. */ bgp_packet_add(peer, s); - - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); } -/* Make open packet and send it to the peer. */ +/* + * Creates a BGP Open packet and appends it to the peer's output queue. + * Sets capabilities as necessary. + */ void bgp_open_send(struct peer *peer) { struct stream *s; @@ -560,11 +362,20 @@ void bgp_open_send(struct peer *peer) /* Add packet to the peer. */ bgp_packet_add(peer, s); - - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); } -/* Send BGP notify packet with data potion. */ +/* + * Creates a BGP Notify and appends it to the peer's output queue. + * + * This function awakens the write thread to ensure the packet + * gets out ASAP. 
+ * + * @param peer + * @param code BGP error code + * @param sub_code BGP error subcode + * @param data Data portion + * @param datalen length of data portion + */ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code, u_char *data, size_t datalen) { @@ -574,7 +385,7 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code, /* Allocate new stream. */ s = stream_new(BGP_MAX_PACKET_SIZE); - /* Make nitify packet. */ + /* Make notify packet. */ bgp_packet_set_marker(s, BGP_MSG_NOTIFY); /* Set notify packet values. */ @@ -589,8 +400,9 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code, length = bgp_packet_set_size(s); /* Add packet to the peer. */ + pthread_mutex_lock(&peer->obuf_mtx); stream_fifo_clean(peer->obuf); - bgp_packet_add(peer, s); + pthread_mutex_unlock(&peer->obuf_mtx); /* For debug */ { @@ -641,19 +453,37 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code, } else peer->last_reset = PEER_DOWN_NOTIFY_SEND; - /* Call immediately. */ - BGP_WRITE_OFF(peer->t_write); - - bgp_write_notify(peer); + /* Add packet to peer's output queue */ + bgp_packet_add(peer, s); + /* Wake up the write thread to get the notify out ASAP */ + peer_writes_wake(); } -/* Send BGP notify packet. */ +/* + * Creates a BGP Notify and appends it to the peer's output queue. + * + * This function awakens the write thread to ensure the packet + * gets out ASAP. + * + * @param peer + * @param code BGP error code + * @param sub_code BGP error subcode + */ void bgp_notify_send(struct peer *peer, u_char code, u_char sub_code) { bgp_notify_send_with_data(peer, code, sub_code, NULL, 0); } -/* Send route refresh message to the peer. */ +/* + * Creates BGP Route Refresh packet and appends it to the peer's output queue. 
+ * + * @param peer + * @param afi Address Family Identifier + * @param safi Subsequent Address Family Identifier + * @param orf_type Outbound Route Filtering type + * @param when_to_refresh Whether to refresh immediately or defer + * @param remove Whether to remove ORF for specified AFI/SAFI + */ void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi, u_char orf_type, u_char when_to_refresh, int remove) { @@ -741,11 +571,17 @@ void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi, /* Add packet to the peer. */ bgp_packet_add(peer, s); - - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); } -/* Send capability message to the peer. */ +/* + * Create a BGP Capability packet and append it to the peer's output queue. + * + * @param peer + * @param afi Address Family Identifier + * @param safi Subsequent Address Family Identifier + * @param capability_code BGP Capability Code + * @param action Set or Remove capability + */ void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi, int capability_code, int action) { @@ -784,8 +620,6 @@ void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi, /* Add packet to the peer. */ bgp_packet_add(peer, s); - - BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd); } /* RFC1771 6.8 Connection collision detection. */ @@ -2340,3 +2174,226 @@ done: return 0; } + +/* ------------- write thread ------------------ */ + +/** + * Flush peer output buffer. + * + * This function pops packets off of peer->obuf and writes them to peer->fd. + * The amount of packets written is equal to the minimum of peer->wpkt_quanta + * and the number of packets on the output buffer. + * + * If write() returns an error, the appropriate FSM event is generated. + * + * The return value is equal to the number of packets written + * (which may be zero). 
+ */ +static int bgp_write(struct peer *peer) +{ + u_char type; + struct stream *s; + int num; + int update_last_write = 0; + unsigned int count = 0; + unsigned int oc = 0; + + /* For non-blocking IO check. */ + if (peer->status == Connect) { + bgp_connect_check(peer, 1); + return 0; + } + + /* Write packets. The number of packets written is the value of + * bgp->wpkt_quanta or the size of the output buffer, whichever is + * smaller.*/ + while (count < peer->bgp->wpkt_quanta + && (s = bgp_write_packet(peer)) != NULL) { + int writenum; + do { // write a full packet, or return on error + writenum = stream_get_endp(s) - stream_get_getp(s); + num = write(peer->fd, STREAM_PNT(s), writenum); + + if (num < 0) { + if (ERRNO_IO_RETRY(errno)) + continue; + + BGP_EVENT_ADD(peer, TCP_fatal_error); + goto done; + } else if (num != writenum) // incomplete write + stream_forward_getp(s, num); + + } while (num != writenum); + + /* Retrieve BGP packet type. */ + stream_set_getp(s, BGP_MARKER_SIZE + 2); + type = stream_getc(s); + + switch (type) { + case BGP_MSG_OPEN: + peer->open_out++; + break; + case BGP_MSG_UPDATE: + peer->update_out++; + break; + case BGP_MSG_NOTIFY: + peer->notify_out++; + /* Double start timer. */ + peer->v_start *= 2; + + /* Overflow check. */ + if (peer->v_start >= (60 * 2)) + peer->v_start = (60 * 2); + + /* Handle Graceful Restart case where the state changes + to + Connect instead of Idle */ + /* Flush any existing events */ + BGP_EVENT_ADD(peer, BGP_Stop); + goto done; + + case BGP_MSG_KEEPALIVE: + peer->keepalive_out++; + break; + case BGP_MSG_ROUTE_REFRESH_NEW: + case BGP_MSG_ROUTE_REFRESH_OLD: + peer->refresh_out++; + break; + case BGP_MSG_CAPABILITY: + peer->dynamic_cap_out++; + break; + } + + count++; + /* OK we send packet so delete it. */ + bgp_packet_delete_unsafe(peer); + update_last_write = 1; + } + +done : { + /* Update last_update if UPDATEs were written. 
*/ + if (peer->update_out > oc) + peer->last_update = bgp_clock(); + + /* If we TXed any flavor of packet update last_write */ + if (update_last_write) + peer->last_write = bgp_clock(); +} + + return count; +} + +static void cleanup_handler(void *arg) +{ + if (plist) + list_delete(plist); + + plist = NULL; + + pthread_mutex_unlock(&plist_mtx); +} + +/** + * Entry function for peer packet flushing pthread. + * + * The plist must be initialized before calling this. + */ +void *peer_writes_start(void *arg) +{ + struct timeval currtime = {0, 0}; + struct timeval sleeptime = {0, 500}; + struct timespec next_update = {0, 0}; + + // initialize + pthread_mutex_lock(&plist_mtx); + plist = list_new(); + + struct listnode *ln; + struct peer *peer; + + pthread_cleanup_push(&cleanup_handler, NULL); + + bgp_packet_writes_thread_run = true; + + while (bgp_packet_writes_thread_run) { // wait around until next update + // time + if (plist->count > 0) + pthread_cond_timedwait(&write_cond, &plist_mtx, + &next_update); + else // wait around until we have some peers + while (plist->count == 0 + && bgp_packet_writes_thread_run) + pthread_cond_wait(&write_cond, &plist_mtx); + + for (ALL_LIST_ELEMENTS_RO(plist, ln, peer)) { + pthread_mutex_lock(&peer->obuf_mtx); + { + bgp_write(peer); + } + pthread_mutex_unlock(&peer->obuf_mtx); + } + + gettimeofday(&currtime, NULL); + timeradd(&currtime, &sleeptime, &currtime); + TIMEVAL_TO_TIMESPEC(&currtime, &next_update); + } + + // clean up + pthread_cleanup_pop(1); + + return NULL; +} + +/** + * Turns on packet writing for a peer. 
+ */ +void peer_writes_on(struct peer *peer) +{ + if (peer->status == Deleted) + return; + + pthread_mutex_lock(&plist_mtx); + { + struct listnode *ln, *nn; + struct peer *p; + + // make sure this peer isn't already in the list + for (ALL_LIST_ELEMENTS(plist, ln, nn, p)) + if (p == peer) { + pthread_mutex_unlock(&plist_mtx); + return; + } + + peer_lock(peer); + listnode_add(plist, peer); + } + pthread_mutex_unlock(&plist_mtx); + peer_writes_wake(); +} + +/** + * Turns off packet writing for a peer. + */ +void peer_writes_off(struct peer *peer) +{ + struct listnode *ln, *nn; + struct peer *p; + pthread_mutex_lock(&plist_mtx); + { + for (ALL_LIST_ELEMENTS(plist, ln, nn, p)) + if (p == peer) { + list_delete_node(plist, ln); + peer_unlock(peer); + break; + } + } + pthread_mutex_unlock(&plist_mtx); +} + +/** + * Wakes up the write thread to do work. + */ +void peer_writes_wake() +{ + pthread_cond_signal(&write_cond); +} |