summaryrefslogtreecommitdiffstats
path: root/bgpd/bgp_packet.c
diff options
context:
space:
mode:
authorQuentin Young <qlyoung@cumulusnetworks.com>2017-02-07 00:39:06 +0100
committerQuentin Young <qlyoung@cumulusnetworks.com>2017-11-30 22:17:57 +0100
commitd3ecc69e5fba1873872a1f4dc359ff1934f81848 (patch)
treebcdf393161c7f7ca5e8fffbb1208362904bb80a6 /bgpd/bgp_packet.c
parentMerge pull request #1493 from donaldsharp/plist_stuff (diff)
downloadfrr-d3ecc69e5fba1873872a1f4dc359ff1934f81848.tar.xz
frr-d3ecc69e5fba1873872a1f4dc359ff1934f81848.zip
bgpd: move packet writes into dedicated pthread
* BGP_WRITE_ON() removed * BGP_WRITE_OFF() removed * peer_writes_on() added * peer_writes_off() added * bgp_write_proceed_actions() removed Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
Diffstat (limited to 'bgpd/bgp_packet.c')
-rw-r--r--bgpd/bgp_packet.c581
1 files changed, 319 insertions, 262 deletions
diff --git a/bgpd/bgp_packet.c b/bgpd/bgp_packet.c
index a955b3512..e27b416d0 100644
--- a/bgpd/bgp_packet.c
+++ b/bgpd/bgp_packet.c
@@ -19,6 +19,7 @@
*/
#include <zebra.h>
+#include <sys/time.h>
#include "thread.h"
#include "stream.h"
@@ -55,6 +56,13 @@
#include "bgpd/bgp_updgrp.h"
#include "bgpd/bgp_label.h"
+/* Linked list of active peers */
+static pthread_mutex_t plist_mtx = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
+static struct list *plist;
+
+bool bgp_packet_writes_thread_run;
+
/* Set up BGP packet marker and packet type. */
int bgp_packet_set_marker(struct stream *s, u_char type)
{
@@ -87,21 +95,40 @@ int bgp_packet_set_size(struct stream *s)
return cp;
}
-/* Add new packet to the peer. */
-void bgp_packet_add(struct peer *peer, struct stream *s)
+/**
+ * Push a packet onto the beginning of the peer's output queue.
+ * Must be externally synchronized around 'peer'.
+ */
+static void bgp_packet_add_unsafe(struct peer *peer, struct stream *s)
{
/* Add packet to the end of list. */
stream_fifo_push(peer->obuf, s);
+ peer_writes_wake();
+}
+
+/*
+ * Push a packet onto the beginning of the peer's output queue.
+ * This function acquires the peer's write mutex before proceeding.
+ */
+static void bgp_packet_add(struct peer *peer, struct stream *s)
+{
+ pthread_mutex_lock(&peer->obuf_mtx);
+ bgp_packet_add_unsafe(peer, s);
+ pthread_mutex_unlock(&peer->obuf_mtx);
}
-/* Free first packet. */
-static void bgp_packet_delete(struct peer *peer)
+/**
+ * Pop a packet off the end of the peer's output queue.
+ * Must be externally synchronized around 'peer'.
+ */
+static void bgp_packet_delete_unsafe(struct peer *peer)
{
stream_free(stream_fifo_pop(peer->obuf));
}
+
/* Check file descriptor whether connect is established. */
-int bgp_connect_check(struct peer *peer, int change_state)
+static int bgp_connect_check(struct peer *peer, int change_state)
{
int status;
socklen_t slen;
@@ -109,7 +136,6 @@ int bgp_connect_check(struct peer *peer, int change_state)
/* Anyway I have to reset read and write thread. */
BGP_READ_OFF(peer->t_read);
- BGP_WRITE_OFF(peer->t_write);
/* Check file descriptor. */
slen = sizeof(status);
@@ -176,7 +202,7 @@ static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi,
}
bgp_packet_set_size(s);
- bgp_packet_add(peer, s);
+ bgp_packet_add_unsafe(peer, s);
return s;
}
@@ -248,246 +274,21 @@ static struct stream *bgp_write_packet(struct peer *peer)
}
- /*
- * Found a packet template to send, overwrite packet
- * with appropriate
- * attributes from peer and advance peer
- */
- s = bpacket_reformat_for_peer(next_pkt, paf);
- bpacket_queue_advance_peer(paf);
- return s;
- }
-
- return NULL;
-}
-
-/* The next action for the peer from a write perspective */
-static void bgp_write_proceed_actions(struct peer *peer)
-{
- afi_t afi;
- safi_t safi;
- struct peer_af *paf;
- struct bpacket *next_pkt;
- int fullq_found = 0;
- struct update_subgroup *subgrp;
-
- if (stream_fifo_head(peer->obuf)) {
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
- return;
- }
-
- FOREACH_AFI_SAFI (afi, safi) {
- paf = peer_af_find(peer, afi, safi);
- if (!paf)
- continue;
- subgrp = paf->subgroup;
- if (!subgrp)
- continue;
-
- next_pkt = paf->next_pkt_to_send;
- if (next_pkt && next_pkt->buffer) {
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
- return;
- }
-
- /* No packets readily available for AFI/SAFI, are there
- * subgroup packets
- * that need to be generated? */
- if (bpacket_queue_is_full(SUBGRP_INST(subgrp),
- SUBGRP_PKTQ(subgrp)))
- fullq_found = 1;
- else if (subgroup_packets_to_build(subgrp)) {
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
- return;
- }
-
- /* No packets to send, see if EOR is pending */
- if (CHECK_FLAG(peer->cap, PEER_CAP_RESTART_RCV)) {
- if (!subgrp->t_coalesce && peer->afc_nego[afi][safi]
- && peer->synctime
- && !CHECK_FLAG(peer->af_sflags[afi][safi],
- PEER_STATUS_EOR_SEND)
- && safi != SAFI_MPLS_VPN) {
- BGP_WRITE_ON(peer->t_write, bgp_write,
- peer->fd);
- return;
- }
- }
- }
- if (fullq_found) {
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
- return;
- }
-}
-
-/* Write packet to the peer. */
-int bgp_write(struct thread *thread)
-{
- struct peer *peer;
- u_char type;
- struct stream *s;
- int num;
- int update_last_write = 0;
- unsigned int count = 0;
- unsigned int oc = 0;
-
- /* Yes first of all get peer pointer. */
- peer = THREAD_ARG(thread);
- peer->t_write = NULL;
-
- /* For non-blocking IO check. */
- if (peer->status == Connect) {
- bgp_connect_check(peer, 1);
- return 0;
- }
-
- s = bgp_write_packet(peer);
- if (!s) {
- bgp_write_proceed_actions(peer);
- return 0;
- }
-
- sockopt_cork(peer->fd, 1);
-
- oc = peer->update_out;
-
- /* Nonblocking write until TCP output buffer is full. */
- do {
- int writenum;
-
- /* Number of bytes to be sent. */
- writenum = stream_get_endp(s) - stream_get_getp(s);
-
- /* Call write() system call. */
- num = write(peer->fd, STREAM_PNT(s), writenum);
- if (num < 0) {
- /* write failed either retry needed or error */
- if (ERRNO_IO_RETRY(errno))
- break;
-
- BGP_EVENT_ADD(peer, TCP_fatal_error);
- return 0;
- }
-
- if (num != writenum) {
- /* Partial write */
- stream_forward_getp(s, num);
- break;
- }
-
- /* Retrieve BGP packet type. */
- stream_set_getp(s, BGP_MARKER_SIZE + 2);
- type = stream_getc(s);
-
- switch (type) {
- case BGP_MSG_OPEN:
- peer->open_out++;
- break;
- case BGP_MSG_UPDATE:
- peer->update_out++;
- break;
- case BGP_MSG_NOTIFY:
- peer->notify_out++;
- /* Double start timer. */
- peer->v_start *= 2;
-
- /* Overflow check. */
- if (peer->v_start >= (60 * 2))
- peer->v_start = (60 * 2);
-
- /* Flush any existing events */
- BGP_EVENT_ADD(peer, BGP_Stop);
- goto done;
-
- case BGP_MSG_KEEPALIVE:
- peer->keepalive_out++;
- break;
- case BGP_MSG_ROUTE_REFRESH_NEW:
- case BGP_MSG_ROUTE_REFRESH_OLD:
- peer->refresh_out++;
- break;
- case BGP_MSG_CAPABILITY:
- peer->dynamic_cap_out++;
- break;
+ /* Found a packet template to send, overwrite packet
+ * with appropriate
+ * attributes from peer and advance peer */
+ s = bpacket_reformat_for_peer(next_pkt, paf);
+ bgp_packet_add_unsafe(peer, s);
+ bpacket_queue_advance_peer(paf);
+ return s;
}
- /* OK we send packet so delete it. */
- bgp_packet_delete(peer);
- update_last_write = 1;
- } while (++count < peer->bgp->wpkt_quanta
- && (s = bgp_write_packet(peer)) != NULL);
-
- bgp_write_proceed_actions(peer);
-
-done:
- /* Update last_update if UPDATEs were written. */
- if (peer->update_out > oc)
- peer->last_update = bgp_clock();
-
- /* If we TXed any flavor of packet update last_write */
- if (update_last_write)
- peer->last_write = bgp_clock();
-
- sockopt_cork(peer->fd, 0);
- return 0;
-}
-
-/* This is only for sending NOTIFICATION message to neighbor. */
-static int bgp_write_notify(struct peer *peer)
-{
- int ret, val;
- u_char type;
- struct stream *s;
-
- /* There should be at least one packet. */
- s = stream_fifo_head(peer->obuf);
- if (!s)
- return 0;
- assert(stream_get_endp(s) >= BGP_HEADER_SIZE);
-
- /* Stop collecting data within the socket */
- sockopt_cork(peer->fd, 0);
-
- /* socket is in nonblocking mode, if we can't deliver the NOTIFY, well,
- * we only care about getting a clean shutdown at this point. */
- ret = write(peer->fd, STREAM_DATA(s), stream_get_endp(s));
-
- /* only connection reset/close gets counted as TCP_fatal_error, failure
- * to write the entire NOTIFY doesn't get different FSM treatment */
- if (ret <= 0) {
- BGP_EVENT_ADD(peer, TCP_fatal_error);
- return 0;
- }
-
- /* Disable Nagle, make NOTIFY packet go out right away */
- val = 1;
- (void)setsockopt(peer->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val,
- sizeof(val));
-
- /* Retrieve BGP packet type. */
- stream_set_getp(s, BGP_MARKER_SIZE + 2);
- type = stream_getc(s);
-
- assert(type == BGP_MSG_NOTIFY);
-
- /* Type should be notify. */
- peer->notify_out++;
-
- /* Double start timer. */
- peer->v_start *= 2;
-
- /* Overflow check. */
- if (peer->v_start >= (60 * 2))
- peer->v_start = (60 * 2);
-
- /* Handle Graceful Restart case where the state changes to
- Connect instead of Idle */
- BGP_EVENT_ADD(peer, BGP_Stop);
-
- return 0;
+ return NULL;
}
-/* Make keepalive packet and send it to the peer. */
+/*
+ * Creates a BGP Keepalive packet and appends it to the peer's output queue.
+ */
void bgp_keepalive_send(struct peer *peer)
{
struct stream *s;
@@ -508,11 +309,12 @@ void bgp_keepalive_send(struct peer *peer)
/* Add packet to the peer. */
bgp_packet_add(peer, s);
-
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
}
-/* Make open packet and send it to the peer. */
+/*
+ * Creates a BGP Open packet and appends it to the peer's output queue.
+ * Sets capabilities as necessary.
+ */
void bgp_open_send(struct peer *peer)
{
struct stream *s;
@@ -560,11 +362,20 @@ void bgp_open_send(struct peer *peer)
/* Add packet to the peer. */
bgp_packet_add(peer, s);
-
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
}
-/* Send BGP notify packet with data potion. */
+/*
+ * Creates a BGP Notify and appends it to the peer's output queue.
+ *
+ * This function awakens the write thread to ensure the packet
+ * gets out ASAP.
+ *
+ * @param peer
+ * @param code BGP error code
+ * @param sub_code BGP error subcode
+ * @param data Data portion
+ * @param datalen length of data portion
+ */
void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
u_char *data, size_t datalen)
{
@@ -574,7 +385,7 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
/* Allocate new stream. */
s = stream_new(BGP_MAX_PACKET_SIZE);
- /* Make nitify packet. */
+ /* Make notify packet. */
bgp_packet_set_marker(s, BGP_MSG_NOTIFY);
/* Set notify packet values. */
@@ -589,8 +400,9 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
length = bgp_packet_set_size(s);
/* Add packet to the peer. */
+ pthread_mutex_lock(&peer->obuf_mtx);
stream_fifo_clean(peer->obuf);
- bgp_packet_add(peer, s);
+ pthread_mutex_unlock(&peer->obuf_mtx);
/* For debug */
{
@@ -641,19 +453,37 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
} else
peer->last_reset = PEER_DOWN_NOTIFY_SEND;
- /* Call immediately. */
- BGP_WRITE_OFF(peer->t_write);
-
- bgp_write_notify(peer);
+ /* Add packet to peer's output queue */
+ bgp_packet_add(peer, s);
+ /* Wake up the write thread to get the notify out ASAP */
+ peer_writes_wake();
}
-/* Send BGP notify packet. */
+/*
+ * Creates a BGP Notify and appends it to the peer's output queue.
+ *
+ * This function awakens the write thread to ensure the packet
+ * gets out ASAP.
+ *
+ * @param peer
+ * @param code BGP error code
+ * @param sub_code BGP error subcode
+ */
void bgp_notify_send(struct peer *peer, u_char code, u_char sub_code)
{
bgp_notify_send_with_data(peer, code, sub_code, NULL, 0);
}
-/* Send route refresh message to the peer. */
+/*
+ * Creates BGP Route Refresh packet and appends it to the peer's output queue.
+ *
+ * @param peer
+ * @param afi Address Family Identifier
+ * @param safi Subsequent Address Family Identifier
+ * @param orf_type Outbound Route Filtering type
+ * @param when_to_refresh Whether to refresh immediately or defer
+ * @param remove Whether to remove ORF for specified AFI/SAFI
+ */
void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi,
u_char orf_type, u_char when_to_refresh, int remove)
{
@@ -741,11 +571,17 @@ void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi,
/* Add packet to the peer. */
bgp_packet_add(peer, s);
-
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
}
-/* Send capability message to the peer. */
+/*
+ * Create a BGP Capability packet and append it to the peer's output queue.
+ *
+ * @param peer
+ * @param afi Address Family Identifier
+ * @param safi Subsequent Address Family Identifier
+ * @param capability_code BGP Capability Code
+ * @param action Set or Remove capability
+ */
void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi,
int capability_code, int action)
{
@@ -784,8 +620,6 @@ void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi,
/* Add packet to the peer. */
bgp_packet_add(peer, s);
-
- BGP_WRITE_ON(peer->t_write, bgp_write, peer->fd);
}
/* RFC1771 6.8 Connection collision detection. */
@@ -2340,3 +2174,226 @@ done:
return 0;
}
+
+/* ------------- write thread ------------------ */
+
+/**
+ * Flush peer output buffer.
+ *
+ * This function pops packets off of peer->obuf and writes them to peer->fd.
+ * The amount of packets written is equal to the minimum of peer->wpkt_quanta
+ * and the number of packets on the output buffer.
+ *
+ * If write() returns an error, the appropriate FSM event is generated.
+ *
+ * The return value is equal to the number of packets written
+ * (which may be zero).
+ */
+static int bgp_write(struct peer *peer)
+{
+ u_char type;
+ struct stream *s;
+ int num;
+ int update_last_write = 0;
+ unsigned int count = 0;
+ unsigned int oc = 0;
+
+ /* For non-blocking IO check. */
+ if (peer->status == Connect) {
+ bgp_connect_check(peer, 1);
+ return 0;
+ }
+
+ /* Write packets. The number of packets written is the value of
+ * bgp->wpkt_quanta or the size of the output buffer, whichever is
+ * smaller.*/
+ while (count < peer->bgp->wpkt_quanta
+ && (s = bgp_write_packet(peer)) != NULL) {
+ int writenum;
+ do { // write a full packet, or return on error
+ writenum = stream_get_endp(s) - stream_get_getp(s);
+ num = write(peer->fd, STREAM_PNT(s), writenum);
+
+ if (num < 0) {
+ if (ERRNO_IO_RETRY(errno))
+ continue;
+
+ BGP_EVENT_ADD(peer, TCP_fatal_error);
+ goto done;
+ } else if (num != writenum) // incomplete write
+ stream_forward_getp(s, num);
+
+ } while (num != writenum);
+
+ /* Retrieve BGP packet type. */
+ stream_set_getp(s, BGP_MARKER_SIZE + 2);
+ type = stream_getc(s);
+
+ switch (type) {
+ case BGP_MSG_OPEN:
+ peer->open_out++;
+ break;
+ case BGP_MSG_UPDATE:
+ peer->update_out++;
+ break;
+ case BGP_MSG_NOTIFY:
+ peer->notify_out++;
+ /* Double start timer. */
+ peer->v_start *= 2;
+
+ /* Overflow check. */
+ if (peer->v_start >= (60 * 2))
+ peer->v_start = (60 * 2);
+
+ /* Handle Graceful Restart case where the state changes
+ to
+ Connect instead of Idle */
+ /* Flush any existing events */
+ BGP_EVENT_ADD(peer, BGP_Stop);
+ goto done;
+
+ case BGP_MSG_KEEPALIVE:
+ peer->keepalive_out++;
+ break;
+ case BGP_MSG_ROUTE_REFRESH_NEW:
+ case BGP_MSG_ROUTE_REFRESH_OLD:
+ peer->refresh_out++;
+ break;
+ case BGP_MSG_CAPABILITY:
+ peer->dynamic_cap_out++;
+ break;
+ }
+
+ count++;
+ /* OK we send packet so delete it. */
+ bgp_packet_delete_unsafe(peer);
+ update_last_write = 1;
+ }
+
+done : {
+ /* Update last_update if UPDATEs were written. */
+ if (peer->update_out > oc)
+ peer->last_update = bgp_clock();
+
+ /* If we TXed any flavor of packet update last_write */
+ if (update_last_write)
+ peer->last_write = bgp_clock();
+}
+
+ return count;
+}
+
+static void cleanup_handler(void *arg)
+{
+ if (plist)
+ list_delete(plist);
+
+ plist = NULL;
+
+ pthread_mutex_unlock(&plist_mtx);
+}
+
+/**
+ * Entry function for peer packet flushing pthread.
+ *
+ * The plist must be initialized before calling this.
+ */
+void *peer_writes_start(void *arg)
+{
+ struct timeval currtime = {0, 0};
+ struct timeval sleeptime = {0, 500};
+ struct timespec next_update = {0, 0};
+
+ // initialize
+ pthread_mutex_lock(&plist_mtx);
+ plist = list_new();
+
+ struct listnode *ln;
+ struct peer *peer;
+
+ pthread_cleanup_push(&cleanup_handler, NULL);
+
+ bgp_packet_writes_thread_run = true;
+
+ while (bgp_packet_writes_thread_run) { // wait around until next update
+ // time
+ if (plist->count > 0)
+ pthread_cond_timedwait(&write_cond, &plist_mtx,
+ &next_update);
+ else // wait around until we have some peers
+ while (plist->count == 0
+ && bgp_packet_writes_thread_run)
+ pthread_cond_wait(&write_cond, &plist_mtx);
+
+ for (ALL_LIST_ELEMENTS_RO(plist, ln, peer)) {
+ pthread_mutex_lock(&peer->obuf_mtx);
+ {
+ bgp_write(peer);
+ }
+ pthread_mutex_unlock(&peer->obuf_mtx);
+ }
+
+ gettimeofday(&currtime, NULL);
+ timeradd(&currtime, &sleeptime, &currtime);
+ TIMEVAL_TO_TIMESPEC(&currtime, &next_update);
+ }
+
+ // clean up
+ pthread_cleanup_pop(1);
+
+ return NULL;
+}
+
+/**
+ * Turns on packet writing for a peer.
+ */
+void peer_writes_on(struct peer *peer)
+{
+ if (peer->status == Deleted)
+ return;
+
+ pthread_mutex_lock(&plist_mtx);
+ {
+ struct listnode *ln, *nn;
+ struct peer *p;
+
+ // make sure this peer isn't already in the list
+ for (ALL_LIST_ELEMENTS(plist, ln, nn, p))
+ if (p == peer) {
+ pthread_mutex_unlock(&plist_mtx);
+ return;
+ }
+
+ peer_lock(peer);
+ listnode_add(plist, peer);
+ }
+ pthread_mutex_unlock(&plist_mtx);
+ peer_writes_wake();
+}
+
+/**
+ * Turns off packet writing for a peer.
+ */
+void peer_writes_off(struct peer *peer)
+{
+ struct listnode *ln, *nn;
+ struct peer *p;
+ pthread_mutex_lock(&plist_mtx);
+ {
+ for (ALL_LIST_ELEMENTS(plist, ln, nn, p))
+ if (p == peer) {
+ list_delete_node(plist, ln);
+ peer_unlock(peer);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&plist_mtx);
+}
+
+/**
+ * Wakes up the write thread to do work.
+ */
+void peer_writes_wake()
+{
+ pthread_cond_signal(&write_cond);
+}