summaryrefslogtreecommitdiffstats
path: root/bgpd
diff options
context:
space:
mode:
authorQuentin Young <qlyoung@cumulusnetworks.com>2017-05-02 02:37:45 +0200
committerQuentin Young <qlyoung@cumulusnetworks.com>2017-11-30 22:17:59 +0100
commit424ab01d0f69a71b865e5f2d817baea7ce263e44 (patch)
tree82f06cf1dae116f6db05cd5409475a6f8b6dcf1e /bgpd
parentbgpd: move bgp i/o to a separate source file (diff)
downloadfrr-424ab01d0f69a71b865e5f2d817baea7ce263e44.tar.xz
frr-424ab01d0f69a71b865e5f2d817baea7ce263e44.zip
bgpd: implement buffered reads
* Move and modify all network input related code to bgp_io.c
* Add a real input buffer to `struct peer`
* Move connection initialization to its own thread.c task instead of
  piggybacking off of bgp_read()
* Tons of little fixups

Primary changes are in bgp_packet.[ch], bgp_io.[ch], bgp_fsm.[ch].
Changes made elsewhere are almost exclusively refactoring peer->ibuf to
peer->curr since peer->ibuf is now the true FIFO packet input buffer
while peer->curr represents the packet currently being processed by the
main pthread.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
Diffstat (limited to 'bgpd')
-rw-r--r--bgpd/bgp_attr.c68
-rw-r--r--bgpd/bgp_fsm.c129
-rw-r--r--bgpd/bgp_fsm.h18
-rw-r--r--bgpd/bgp_io.c531
-rw-r--r--bgpd/bgp_io.h77
-rw-r--r--bgpd/bgp_keepalives.c58
-rw-r--r--bgpd/bgp_keepalives.h3
-rw-r--r--bgpd/bgp_packet.c350
-rw-r--r--bgpd/bgp_packet.h3
-rw-r--r--bgpd/bgp_updgrp.h2
-rw-r--r--bgpd/bgp_vty.c42
-rw-r--r--bgpd/bgpd.c49
-rw-r--r--bgpd/bgpd.h29
-rw-r--r--bgpd/rfapi/rfapi.c29
-rw-r--r--bgpd/rfapi/vnc_zebra.c35
15 files changed, 864 insertions, 559 deletions
diff --git a/bgpd/bgp_attr.c b/bgpd/bgp_attr.c
index 6ddb2ec8a..859158e3c 100644
--- a/bgpd/bgp_attr.c
+++ b/bgpd/bgp_attr.c
@@ -1156,7 +1156,7 @@ static int bgp_attr_aspath(struct bgp_attr_parser_args *args)
* peer with AS4 => will get 4Byte ASnums
* otherwise, will get 16 Bit
*/
- attr->aspath = aspath_parse(peer->ibuf, length,
+ attr->aspath = aspath_parse(peer->curr, length,
CHECK_FLAG(peer->cap, PEER_CAP_AS4_RCV));
/* In case of IBGP, length will be zero. */
@@ -1230,7 +1230,7 @@ static int bgp_attr_as4_path(struct bgp_attr_parser_args *args,
struct attr *const attr = args->attr;
const bgp_size_t length = args->length;
- *as4_path = aspath_parse(peer->ibuf, length, 1);
+ *as4_path = aspath_parse(peer->curr, length, 1);
/* In case of IBGP, length will be zero. */
if (!*as4_path) {
@@ -1271,7 +1271,7 @@ static bgp_attr_parse_ret_t bgp_attr_nexthop(struct bgp_attr_parser_args *args)
logged locally (this is implemented somewhere else). The UPDATE
message
gets ignored in any of these cases. */
- nexthop_n = stream_get_ipv4(peer->ibuf);
+ nexthop_n = stream_get_ipv4(peer->curr);
nexthop_h = ntohl(nexthop_n);
if ((IPV4_NET0(nexthop_h) || IPV4_NET127(nexthop_h)
|| IPV4_CLASS_DE(nexthop_h))
@@ -1307,7 +1307,7 @@ static bgp_attr_parse_ret_t bgp_attr_med(struct bgp_attr_parser_args *args)
args->total);
}
- attr->med = stream_getl(peer->ibuf);
+ attr->med = stream_getl(peer->curr);
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_MULTI_EXIT_DISC);
@@ -1333,11 +1333,11 @@ bgp_attr_local_pref(struct bgp_attr_parser_args *args)
external peer, then this attribute MUST be ignored by the
receiving speaker. */
if (peer->sort == BGP_PEER_EBGP) {
- stream_forward_getp(peer->ibuf, length);
+ stream_forward_getp(peer->curr, length);
return BGP_ATTR_PARSE_PROCEED;
}
- attr->local_pref = stream_getl(peer->ibuf);
+ attr->local_pref = stream_getl(peer->curr);
/* Set the local-pref flag. */
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_LOCAL_PREF);
@@ -1386,10 +1386,10 @@ static int bgp_attr_aggregator(struct bgp_attr_parser_args *args)
}
if (CHECK_FLAG(peer->cap, PEER_CAP_AS4_RCV))
- attr->aggregator_as = stream_getl(peer->ibuf);
+ attr->aggregator_as = stream_getl(peer->curr);
else
- attr->aggregator_as = stream_getw(peer->ibuf);
- attr->aggregator_addr.s_addr = stream_get_ipv4(peer->ibuf);
+ attr->aggregator_as = stream_getw(peer->curr);
+ attr->aggregator_addr.s_addr = stream_get_ipv4(peer->curr);
/* Set atomic aggregate flag. */
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_AGGREGATOR);
@@ -1413,8 +1413,8 @@ bgp_attr_as4_aggregator(struct bgp_attr_parser_args *args,
0);
}
- *as4_aggregator_as = stream_getl(peer->ibuf);
- as4_aggregator_addr->s_addr = stream_get_ipv4(peer->ibuf);
+ *as4_aggregator_as = stream_getl(peer->curr);
+ as4_aggregator_addr->s_addr = stream_get_ipv4(peer->curr);
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_AS4_AGGREGATOR);
@@ -1540,10 +1540,10 @@ bgp_attr_community(struct bgp_attr_parser_args *args)
}
attr->community =
- community_parse((u_int32_t *)stream_pnt(peer->ibuf), length);
+ community_parse((u_int32_t *)stream_pnt(peer->curr), length);
/* XXX: fix community_parse to use stream API and remove this */
- stream_forward_getp(peer->ibuf, length);
+ stream_forward_getp(peer->curr, length);
if (!attr->community)
return bgp_attr_malformed(args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR,
@@ -1570,7 +1570,7 @@ bgp_attr_originator_id(struct bgp_attr_parser_args *args)
args->total);
}
- attr->originator_id.s_addr = stream_get_ipv4(peer->ibuf);
+ attr->originator_id.s_addr = stream_get_ipv4(peer->curr);
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_ORIGINATOR_ID);
@@ -1594,10 +1594,10 @@ bgp_attr_cluster_list(struct bgp_attr_parser_args *args)
}
attr->cluster =
- cluster_parse((struct in_addr *)stream_pnt(peer->ibuf), length);
+ cluster_parse((struct in_addr *)stream_pnt(peer->curr), length);
/* XXX: Fix cluster_parse to use stream API and then remove this */
- stream_forward_getp(peer->ibuf, length);
+ stream_forward_getp(peer->curr, length);
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_CLUSTER_LIST);
@@ -1778,7 +1778,7 @@ int bgp_mp_unreach_parse(struct bgp_attr_parser_args *args,
struct attr *const attr = args->attr;
const bgp_size_t length = args->length;
- s = peer->ibuf;
+ s = peer->curr;
#define BGP_MP_UNREACH_MIN_SIZE 3
if ((length > STREAM_READABLE(s)) || (length < BGP_MP_UNREACH_MIN_SIZE))
@@ -1832,9 +1832,9 @@ bgp_attr_large_community(struct bgp_attr_parser_args *args)
}
attr->lcommunity =
- lcommunity_parse((u_int8_t *)stream_pnt(peer->ibuf), length);
+ lcommunity_parse((u_int8_t *)stream_pnt(peer->curr), length);
/* XXX: fix ecommunity_parse to use stream API */
- stream_forward_getp(peer->ibuf, length);
+ stream_forward_getp(peer->curr, length);
if (!attr->lcommunity)
return bgp_attr_malformed(args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR,
@@ -1861,9 +1861,9 @@ bgp_attr_ext_communities(struct bgp_attr_parser_args *args)
}
attr->ecommunity =
- ecommunity_parse((u_int8_t *)stream_pnt(peer->ibuf), length);
+ ecommunity_parse((u_int8_t *)stream_pnt(peer->curr), length);
/* XXX: fix ecommunity_parse to use stream API */
- stream_forward_getp(peer->ibuf, length);
+ stream_forward_getp(peer->curr, length);
if (!attr->ecommunity)
return bgp_attr_malformed(args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR,
@@ -1957,7 +1957,7 @@ static int bgp_attr_encap(uint8_t type, struct peer *peer, /* IN */
+ sublength);
tlv->type = subtype;
tlv->length = sublength;
- stream_get(tlv->value, peer->ibuf, sublength);
+ stream_get(tlv->value, peer->curr, sublength);
length -= sublength;
/* attach tlv to encap chain */
@@ -2025,8 +2025,8 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args,
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_PREFIX_SID);
- type = stream_getc(peer->ibuf);
- length = stream_getw(peer->ibuf);
+ type = stream_getc(peer->curr);
+ length = stream_getw(peer->curr);
if (type == BGP_PREFIX_SID_LABEL_INDEX) {
if (length != BGP_PREFIX_SID_LABEL_INDEX_LENGTH) {
@@ -2039,11 +2039,11 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args,
}
/* Ignore flags and reserved */
- stream_getc(peer->ibuf);
- stream_getw(peer->ibuf);
+ stream_getc(peer->curr);
+ stream_getw(peer->curr);
/* Fetch the label index and see if it is valid. */
- label_index = stream_getl(peer->ibuf);
+ label_index = stream_getl(peer->curr);
if (label_index == BGP_INVALID_LABEL_INDEX)
return bgp_attr_malformed(
args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR,
@@ -2074,16 +2074,16 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args,
}
/* Ignore reserved */
- stream_getc(peer->ibuf);
- stream_getw(peer->ibuf);
+ stream_getc(peer->curr);
+ stream_getw(peer->curr);
- stream_get(&ipv6_sid, peer->ibuf, 16);
+ stream_get(&ipv6_sid, peer->curr, 16);
}
/* Placeholder code for the Originator SRGB type */
else if (type == BGP_PREFIX_SID_ORIGINATOR_SRGB) {
/* Ignore flags */
- stream_getw(peer->ibuf);
+ stream_getw(peer->curr);
length -= 2;
@@ -2099,8 +2099,8 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args,
srgb_count = length / BGP_PREFIX_SID_ORIGINATOR_SRGB_LENGTH;
for (int i = 0; i < srgb_count; i++) {
- stream_get(&srgb_base, peer->ibuf, 3);
- stream_get(&srgb_range, peer->ibuf, 3);
+ stream_get(&srgb_base, peer->curr, 3);
+ stream_get(&srgb_range, peer->curr, 3);
}
}
@@ -2125,7 +2125,7 @@ static bgp_attr_parse_ret_t bgp_attr_unknown(struct bgp_attr_parser_args *args)
peer->host, type, length);
/* Forward read pointer of input stream. */
- stream_forward_getp(peer->ibuf, length);
+ stream_forward_getp(peer->curr, length);
/* If any of the mandatory well-known attributes are not recognized,
then the Error Subcode is set to Unrecognized Well-known
diff --git a/bgpd/bgp_fsm.c b/bgpd/bgp_fsm.c
index 95e2f157c..18262e6e9 100644
--- a/bgpd/bgp_fsm.c
+++ b/bgpd/bgp_fsm.c
@@ -126,35 +126,61 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
from_peer->host, from_peer, from_peer->fd, peer,
peer->fd);
- peer_writes_off(peer);
- BGP_READ_OFF(peer->t_read);
- peer_writes_off(from_peer);
- BGP_READ_OFF(from_peer->t_read);
+ bgp_writes_off(peer);
+ bgp_reads_off(peer);
+ bgp_writes_off(from_peer);
+ bgp_reads_off(from_peer);
BGP_TIMER_OFF(peer->t_routeadv);
+ BGP_TIMER_OFF(peer->t_connect);
+ BGP_TIMER_OFF(peer->t_connect_check);
BGP_TIMER_OFF(from_peer->t_routeadv);
-
- fd = peer->fd;
- peer->fd = from_peer->fd;
- from_peer->fd = fd;
- stream_reset(peer->ibuf);
+ BGP_TIMER_OFF(from_peer->t_connect);
+ BGP_TIMER_OFF(from_peer->t_connect_check);
// At this point in time, it is possible that there are packets pending
// on
- // from_peer->obuf. These need to be transferred to the new peer struct.
- pthread_mutex_lock(&peer->obuf_mtx);
- pthread_mutex_lock(&from_peer->obuf_mtx);
+ // various buffers. Those need to be transferred or dropped, otherwise
+ // we'll
+ // get spurious failures during session establishment.
+ pthread_mutex_lock(&peer->io_mtx);
+ pthread_mutex_lock(&from_peer->io_mtx);
{
- // wipe new peer's packet queue
+ fd = peer->fd;
+ peer->fd = from_peer->fd;
+ from_peer->fd = fd;
+
+ stream_fifo_clean(peer->ibuf);
stream_fifo_clean(peer->obuf);
+ stream_reset(peer->ibuf_work);
- // copy each packet from old peer's queue to new peer's queue
+ // this should never happen, since bgp_process_packet() is the
+ // only task
+ // that sets and unsets the current packet and it runs in our
+ // pthread.
+ if (peer->curr) {
+ zlog_err(
+ "[%s] Dropping pending packet on connection transfer:",
+ peer->host);
+ u_int16_t type = stream_getc_from(peer->curr,
+ BGP_MARKER_SIZE + 2);
+ bgp_dump_packet(peer, type, peer->curr);
+ stream_free(peer->curr);
+ peer->curr = NULL;
+ }
+
+ // copy each packet from old peer's output queue to new peer
while (from_peer->obuf->head)
stream_fifo_push(peer->obuf,
stream_fifo_pop(from_peer->obuf));
+
+ // copy each packet from old peer's input queue to new peer
+ while (from_peer->ibuf->head)
+ stream_fifo_push(peer->ibuf,
+ stream_fifo_pop(from_peer->ibuf));
}
- pthread_mutex_unlock(&from_peer->obuf_mtx);
- pthread_mutex_unlock(&peer->obuf_mtx);
+ pthread_mutex_unlock(&from_peer->io_mtx);
+ pthread_mutex_unlock(&peer->io_mtx);
peer->as = from_peer->as;
peer->v_holdtime = from_peer->v_holdtime;
@@ -232,8 +258,8 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
}
}
- BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
- peer_writes_on(peer);
+ bgp_reads_on(peer);
+ bgp_writes_on(peer);
if (from_peer)
peer_xfer_stats(peer, from_peer);
@@ -381,6 +407,10 @@ static int bgp_connect_timer(struct thread *thread)
int ret;
peer = THREAD_ARG(thread);
+
+ assert(!peer->t_write);
+ assert(!peer->t_read);
+
peer->t_connect = NULL;
if (bgp_debug_neighbor_events(peer))
@@ -429,6 +459,9 @@ int bgp_routeadv_timer(struct thread *thread)
peer->synctime = bgp_clock();
+ thread_add_background(bm->master, bgp_generate_updgrp_packets, peer, 0,
+ &peer->t_generate_updgrp_packets);
+
/* MRAI timer will be started again when FIFO is built, no need to
* do it here.
*/
@@ -634,6 +667,9 @@ void bgp_adjust_routeadv(struct peer *peer)
BGP_TIMER_OFF(peer->t_routeadv);
peer->synctime = bgp_clock();
+ thread_add_background(bm->master, bgp_generate_updgrp_packets,
+ peer, 0,
+ &peer->t_generate_updgrp_packets);
return;
}
@@ -1028,33 +1064,40 @@ int bgp_stop(struct peer *peer)
bgp_bfd_deregister_peer(peer);
}
- /* Stop read and write threads when exists. */
- BGP_READ_OFF(peer->t_read);
- peer_writes_off(peer);
+ /* stop keepalives */
+ peer_keepalives_off(peer);
+
+ /* Stop read and write threads. */
+ bgp_writes_off(peer);
+ bgp_reads_off(peer);
+
+ THREAD_OFF(peer->t_connect_check);
/* Stop all timers. */
BGP_TIMER_OFF(peer->t_start);
BGP_TIMER_OFF(peer->t_connect);
BGP_TIMER_OFF(peer->t_holdtime);
- peer_keepalives_off(peer);
BGP_TIMER_OFF(peer->t_routeadv);
- BGP_TIMER_OFF(peer->t_generate_updgrp_packets);
-
- /* Stream reset. */
- peer->packet_size = 0;
/* Clear input and output buffer. */
- if (peer->ibuf)
- stream_reset(peer->ibuf);
- if (peer->work)
- stream_reset(peer->work);
-
- pthread_mutex_lock(&peer->obuf_mtx);
+ pthread_mutex_lock(&peer->io_mtx);
{
+ if (peer->ibuf)
+ stream_fifo_clean(peer->ibuf);
if (peer->obuf)
stream_fifo_clean(peer->obuf);
+
+ if (peer->ibuf_work)
+ stream_reset(peer->ibuf_work);
+ if (peer->obuf_work)
+ stream_reset(peer->obuf_work);
+
+ if (peer->curr) {
+ stream_free(peer->curr);
+ peer->curr = NULL;
+ }
}
- pthread_mutex_unlock(&peer->obuf_mtx);
+ pthread_mutex_unlock(&peer->io_mtx);
/* Close of file descriptor. */
if (peer->fd >= 0) {
@@ -1177,10 +1220,12 @@ static int bgp_connect_check(struct thread *thread)
struct peer *peer;
peer = THREAD_ARG(thread);
+ assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_READS_ON));
+ assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON));
+ assert(!peer->t_read);
+ assert(!peer->t_write);
- /* This value needs to be unset in order for bgp_read() to be scheduled
- */
- BGP_READ_OFF(peer->t_read);
+ peer->t_connect_check = NULL;
/* Check file descriptor. */
slen = sizeof(status);
@@ -1218,17 +1263,16 @@ static int bgp_connect_success(struct peer *peer)
return -1;
}
- peer_writes_on(peer);
-
if (bgp_getsockname(peer) < 0) {
zlog_err("%s: bgp_getsockname(): failed for peer %s, fd %d",
__FUNCTION__, peer->host, peer->fd);
bgp_notify_send(peer, BGP_NOTIFY_FSM_ERR,
0); /* internal error */
+ bgp_writes_on(peer);
return -1;
}
- BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
+ bgp_reads_on(peer);
if (bgp_debug_neighbor_events(peer)) {
char buf1[SU_ADDRSTRLEN];
@@ -1332,6 +1376,10 @@ int bgp_start(struct peer *peer)
#endif
}
+ assert(!peer->t_write);
+ assert(!peer->t_read);
+ assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON));
+ assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_READS_ON));
status = bgp_connect(peer);
switch (status) {
@@ -1362,7 +1410,8 @@ int bgp_start(struct peer *peer)
// when the socket becomes ready (or fails to connect),
// bgp_connect_check
// will be called.
- BGP_READ_ON(peer->t_read, bgp_connect_check, peer->fd);
+ thread_add_read(bm->master, bgp_connect_check, peer, peer->fd,
+ &peer->t_connect_check);
break;
}
return 0;
diff --git a/bgpd/bgp_fsm.h b/bgpd/bgp_fsm.h
index a7abfdb2f..131de40b5 100644
--- a/bgpd/bgp_fsm.h
+++ b/bgpd/bgp_fsm.h
@@ -23,24 +23,6 @@
#define _QUAGGA_BGP_FSM_H
/* Macro for BGP read, write and timer thread. */
-#define BGP_READ_ON(T, F, V) \
- do { \
- if ((peer->status != Deleted)) \
- thread_add_read(bm->master, (F), peer, (V), &(T)); \
- } while (0)
-
-#define BGP_READ_OFF(T) \
- do { \
- if (T) \
- THREAD_READ_OFF(T); \
- } while (0)
-
-#define BGP_WRITE_OFF(T) \
- do { \
- if (T) \
- THREAD_WRITE_OFF(T); \
- } while (0)
-
#define BGP_TIMER_ON(T, F, V) \
do { \
if ((peer->status != Deleted)) \
diff --git a/bgpd/bgp_io.c b/bgpd/bgp_io.c
index 5d14737d2..4bdafa744 100644
--- a/bgpd/bgp_io.c
+++ b/bgpd/bgp_io.c
@@ -1,8 +1,10 @@
/*
BGP I/O.
- Implements a consumer thread to flush packets destined for remote peers.
+ Implements packet I/O in a consumer pthread.
+ --------------------------------------------
Copyright (C) 2017 Cumulus Networks
+ Quentin Young
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -31,7 +33,7 @@
#include "log.h"
#include "monotime.h"
#include "network.h"
-#include "frr_pthread.h"
+#include "pqueue.h"
#include "bgpd/bgpd.h"
#include "bgpd/bgp_io.h"
@@ -39,172 +41,278 @@
#include "bgpd/bgp_packet.h"
#include "bgpd/bgp_fsm.h"
-static int bgp_write(struct peer *);
-static void peer_process_writes(struct hash_backet *, void *);
+/* forward declarations */
+static uint16_t bgp_write(struct peer *);
+static uint16_t bgp_read(struct peer *);
+static int bgp_process_writes(struct thread *);
+static int bgp_process_reads(struct thread *);
+static bool validate_header(struct peer *);
-bool bgp_packet_writes_thread_run = false;
+/* generic i/o status codes */
+#define BGP_IO_TRANS_ERR (1 << 1) // EAGAIN or similar occurred
+#define BGP_IO_FATAL_ERR (1 << 2) // some kind of fatal TCP error
-/* Hash table of peers to operate on, associated synchronization primitives and
- * hash table callbacks.
+/* bgp_read() status codes */
+#define BGP_IO_READ_HEADER (1 << 3) // when read a full packet header
+#define BGP_IO_READ_FULLPACKET (1 << 4) // read a full packet
+
+/* Start and stop routines for I/O pthread + control variables
* ------------------------------------------------------------------------ */
-static struct hash *peerhash;
-/* Mutex to protect hash table */
-static pthread_mutex_t *peerhash_mtx;
-/* Condition variable used to notify the write thread that there is work to do
- */
-static pthread_cond_t *write_cond;
+bool bgp_packet_write_thread_run = false;
+pthread_mutex_t *work_mtx;
-static unsigned int peer_hash_key_make(void *p)
-{
- struct peer *peer = p;
- return sockunion_hash(&peer->su);
-}
+static struct list *read_cancel;
+static struct list *write_cancel;
-static int peer_hash_cmp(const void *p1, const void *p2)
+void bgp_io_init()
{
- const struct peer *peer1 = p1;
- const struct peer *peer2 = p2;
- return (sockunion_same(&peer1->su, &peer2->su)
- && CHECK_FLAG(peer1->flags, PEER_FLAG_CONFIG_NODE)
- == CHECK_FLAG(peer2->flags, PEER_FLAG_CONFIG_NODE));
+ work_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t));
+ pthread_mutex_init(work_mtx, NULL);
+
+ read_cancel = list_new();
+ write_cancel = list_new();
}
-/* ------------------------------------------------------------------------ */
-void peer_writes_init(void)
+void *bgp_io_start(void *arg)
{
- peerhash_mtx = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_mutex_t));
- write_cond = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_cond_t));
-
- // initialize mutex
- pthread_mutex_init(peerhash_mtx, NULL);
-
- // use monotonic clock with condition variable
- pthread_condattr_t attrs;
- pthread_condattr_init(&attrs);
- pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC);
- pthread_cond_init(write_cond, &attrs);
- pthread_condattr_destroy(&attrs);
+ struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
+
+ // we definitely don't want to handle signals
+ fpt->master->handle_signals = false;
+
+ bgp_packet_write_thread_run = true;
+ struct thread task;
+
+ while (bgp_packet_write_thread_run) {
+ if (thread_fetch(fpt->master, &task)) {
+ pthread_mutex_lock(work_mtx);
+ {
+ bool cancel = false;
+ struct peer *peer = THREAD_ARG(&task);
+ if ((task.func == bgp_process_reads
+ && listnode_lookup(read_cancel, peer))
+ || (task.func == bgp_process_writes
+ && listnode_lookup(write_cancel, peer)))
+ cancel = true;
+
+ list_delete_all_node(write_cancel);
+ list_delete_all_node(read_cancel);
+
+ if (!cancel)
+ thread_call(&task);
+ }
+ pthread_mutex_unlock(work_mtx);
+ }
+ }
- // initialize peerhash
- peerhash = hash_create_size(2048, peer_hash_key_make, peer_hash_cmp);
+ return NULL;
}
-static void peer_writes_finish(void *arg)
+int bgp_io_stop(void **result, struct frr_pthread *fpt)
{
- bgp_packet_writes_thread_run = false;
-
- if (peerhash)
- hash_free(peerhash);
-
- peerhash = NULL;
+ fpt->master->spin = false;
+ bgp_packet_write_thread_run = false;
+ pthread_kill(fpt->thread, SIGINT);
+ pthread_join(fpt->thread, result);
- pthread_mutex_unlock(peerhash_mtx);
- pthread_mutex_destroy(peerhash_mtx);
- pthread_cond_destroy(write_cond);
+ pthread_mutex_unlock(work_mtx);
+ pthread_mutex_destroy(work_mtx);
- XFREE(MTYPE_PTHREAD, peerhash_mtx);
- XFREE(MTYPE_PTHREAD, write_cond);
+ list_delete(read_cancel);
+ list_delete(write_cancel);
+ XFREE(MTYPE_TMP, work_mtx);
+ return 0;
}
+/* ------------------------------------------------------------------------ */
-void *peer_writes_start(void *arg)
+void bgp_writes_on(struct peer *peer)
{
- struct timeval currtime = {0, 0};
- struct timeval sleeptime = {0, 500};
- struct timespec next_update = {0, 0};
-
- pthread_mutex_lock(peerhash_mtx);
-
- // register cleanup handler
- pthread_cleanup_push(&peer_writes_finish, NULL);
-
- bgp_packet_writes_thread_run = true;
+ assert(peer->status != Deleted);
+ assert(peer->obuf);
+ assert(peer->ibuf);
+ assert(peer->ibuf_work);
+ assert(!peer->t_connect_check);
+ assert(peer->fd);
- while (bgp_packet_writes_thread_run) {
- // wait around until next update time
- if (peerhash->count > 0)
- pthread_cond_timedwait(write_cond, peerhash_mtx,
- &next_update);
- else // wait around until we have some peers
- while (peerhash->count == 0
- && bgp_packet_writes_thread_run)
- pthread_cond_wait(write_cond, peerhash_mtx);
+ struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
- hash_iterate(peerhash, peer_process_writes, NULL);
-
- monotime(&currtime);
- timeradd(&currtime, &sleeptime, &currtime);
- TIMEVAL_TO_TIMESPEC(&currtime, &next_update);
+ pthread_mutex_lock(work_mtx);
+ {
+ listnode_delete(write_cancel, peer);
+ thread_add_write(fpt->master, bgp_process_writes, peer,
+ peer->fd, &peer->t_write);
+ SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}
+ pthread_mutex_unlock(work_mtx);
+}
- // clean up
- pthread_cleanup_pop(1);
+void bgp_writes_off(struct peer *peer)
+{
+ pthread_mutex_lock(work_mtx);
+ {
+ THREAD_OFF(peer->t_write);
+ THREAD_OFF(peer->t_generate_updgrp_packets);
+ listnode_add(write_cancel, peer);
- return NULL;
+ // peer access by us after this point will result in pain
+ UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
+ }
+ pthread_mutex_unlock(work_mtx);
+ /* upon return, i/o thread must not access the peer */
}
-int peer_writes_stop(void **result)
+void bgp_reads_on(struct peer *peer)
{
- struct frr_pthread *fpt = frr_pthread_get(PTHREAD_WRITE);
- bgp_packet_writes_thread_run = false;
- peer_writes_wake();
- pthread_join(fpt->thread, result);
- return 0;
+ assert(peer->status != Deleted);
+ assert(peer->ibuf);
+ assert(peer->fd);
+ assert(peer->ibuf_work);
+ assert(stream_get_endp(peer->ibuf_work) == 0);
+ assert(peer->obuf);
+ assert(!peer->t_connect_check);
+ assert(peer->fd);
+
+ struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
+
+ pthread_mutex_lock(work_mtx);
+ {
+ listnode_delete(read_cancel, peer);
+ thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
+ &peer->t_read);
+ SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
+ }
+ pthread_mutex_unlock(work_mtx);
}
-void peer_writes_on(struct peer *peer)
+void bgp_reads_off(struct peer *peer)
{
- if (peer->status == Deleted)
- return;
-
- pthread_mutex_lock(peerhash_mtx);
+ pthread_mutex_lock(work_mtx);
{
- if (!hash_lookup(peerhash, peer)) {
- hash_get(peerhash, peer, hash_alloc_intern);
- peer_lock(peer);
- }
+ THREAD_OFF(peer->t_read);
+ THREAD_OFF(peer->t_process_packet);
+ listnode_add(read_cancel, peer);
- SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
+ // peer access by us after this point will result in pain
+ UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}
- pthread_mutex_unlock(peerhash_mtx);
- peer_writes_wake();
+ pthread_mutex_unlock(work_mtx);
}
-void peer_writes_off(struct peer *peer)
+/**
+ * Called from PTHREAD_IO when select() or poll() determines that the file
+ * descriptor is ready to be written to.
+ */
+static int bgp_process_writes(struct thread *thread)
{
- pthread_mutex_lock(peerhash_mtx);
+ static struct peer *peer;
+ peer = THREAD_ARG(thread);
+ uint16_t status;
+
+ if (peer->fd < 0)
+ return -1;
+
+ struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
+
+ bool reschedule;
+ pthread_mutex_lock(&peer->io_mtx);
{
- if (hash_release(peerhash, peer)) {
- peer_unlock(peer);
- fprintf(stderr, "Releasing %p\n", peer);
- }
+ status = bgp_write(peer);
+ reschedule = (stream_fifo_head(peer->obuf) != NULL);
+ }
+ pthread_mutex_unlock(&peer->io_mtx);
- UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
+ if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
}
- pthread_mutex_unlock(peerhash_mtx);
-}
-void peer_writes_wake()
-{
- pthread_cond_signal(write_cond);
+ if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
+ reschedule = 0; // problem
+
+ if (reschedule) {
+ thread_add_write(fpt->master, bgp_process_writes, peer,
+ peer->fd, &peer->t_write);
+ thread_add_background(bm->master, bgp_generate_updgrp_packets,
+ peer, 0,
+ &peer->t_generate_updgrp_packets);
+ }
+
+ return 0;
}
/**
- * Callback for hash_iterate. Takes a hash bucket, unwraps it into a peer and
- * synchronously calls bgp_write() on the peer.
+ * Called from PTHREAD_IO when select() or poll() determines that the file
+ * descriptor is ready to be read from.
*/
-static void peer_process_writes(struct hash_backet *hb, void *arg)
+static int bgp_process_reads(struct thread *thread)
{
static struct peer *peer;
- peer = hb->data;
- pthread_mutex_lock(&peer->obuf_mtx);
+ peer = THREAD_ARG(thread);
+ uint16_t status;
+
+ if (peer->fd < 0)
+ return -1;
+
+ struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
+
+ bool reschedule = true;
+
+ // execute read
+ pthread_mutex_lock(&peer->io_mtx);
{
- bgp_write(peer);
+ status = bgp_read(peer);
+ }
+ pthread_mutex_unlock(&peer->io_mtx);
+
+ // check results of read
+ bool header_valid = true;
+
+ if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
}
- pthread_mutex_unlock(&peer->obuf_mtx);
- // dispatch job on main thread
- BGP_TIMER_ON(peer->t_generate_updgrp_packets,
- bgp_generate_updgrp_packets, 100);
+ if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
+ reschedule = false; // problem
+
+ if (CHECK_FLAG(status, BGP_IO_READ_HEADER)) {
+ header_valid = validate_header(peer);
+ if (!header_valid) {
+ bgp_size_t packetsize =
+ MIN((int)stream_get_endp(peer->ibuf_work),
+ BGP_MAX_PACKET_SIZE);
+ memcpy(peer->last_reset_cause, peer->ibuf_work->data,
+ packetsize);
+ peer->last_reset_cause_size = packetsize;
+ // We're tearing the session down, no point in
+ // rescheduling.
+ // Additionally, bgp_read() will use the TLV if it's
+ // present to
+ // determine how much to read; if this is corrupt, we'll
+ // crash the
+ // program.
+ reschedule = false;
+ }
+ }
+
+ // if we read a full packet, push it onto peer->ibuf, reset our WiP
+ // buffer
+ // and schedule a job to process it on the main thread
+ if (header_valid && CHECK_FLAG(status, BGP_IO_READ_FULLPACKET)) {
+ pthread_mutex_lock(&peer->io_mtx);
+ {
+ stream_fifo_push(peer->ibuf,
+ stream_dup(peer->ibuf_work));
+ }
+ pthread_mutex_unlock(&peer->io_mtx);
+ stream_reset(peer->ibuf_work);
+ assert(stream_get_endp(peer->ibuf_work) == 0);
+
+ thread_add_background(bm->master, bgp_process_packet, peer, 0,
+ &peer->t_process_packet);
+ }
+
+ if (reschedule)
+ thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
+ &peer->t_read);
+
+ return 0;
}
/**
@@ -212,14 +320,14 @@ static void peer_process_writes(struct hash_backet *hb, void *arg)
*
* This function pops packets off of peer->obuf and writes them to peer->fd.
* The amount of packets written is equal to the minimum of peer->wpkt_quanta
- * and the number of packets on the output buffer.
+ * and the number of packets on the output buffer, unless an error occurs.
*
* If write() returns an error, the appropriate FSM event is generated.
*
* The return value is equal to the number of packets written
* (which may be zero).
*/
-static int bgp_write(struct peer *peer)
+static uint16_t bgp_write(struct peer *peer)
{
u_char type;
struct stream *s;
@@ -227,10 +335,8 @@ static int bgp_write(struct peer *peer)
int update_last_write = 0;
unsigned int count = 0;
unsigned int oc = 0;
+ uint16_t status = 0;
- /* Write packets. The number of packets written is the value of
- * bgp->wpkt_quanta or the size of the output buffer, whichever is
- * smaller.*/
while (count < peer->bgp->wpkt_quanta
&& (s = stream_fifo_head(peer->obuf))) {
int writenum;
@@ -239,8 +345,12 @@ static int bgp_write(struct peer *peer)
num = write(peer->fd, STREAM_PNT(s), writenum);
if (num < 0) {
- if (!ERRNO_IO_RETRY(errno))
+ if (!ERRNO_IO_RETRY(errno)) {
BGP_EVENT_ADD(peer, TCP_fatal_error);
+ SET_FLAG(status, BGP_IO_FATAL_ERR);
+ } else {
+ SET_FLAG(status, BGP_IO_TRANS_ERR);
+ }
goto done;
} else if (num != writenum) // incomplete write
@@ -288,7 +398,7 @@ static int bgp_write(struct peer *peer)
}
count++;
- /* OK we send packet so delete it. */
+
stream_free(stream_fifo_pop(peer->obuf));
update_last_write = 1;
}
@@ -303,5 +413,170 @@ done : {
peer->last_write = bgp_clock();
}
- return count;
+ return status;
+}
+
+/**
+ * Reads <= 1 packet worth of data from peer->fd into peer->ibuf_work.
+ *
+ * @return BGP_IO_* status flags; BGP_IO_READ_FULLPACKET is set when a full packet was read
+ */
+static uint16_t bgp_read(struct peer *peer)
+{
+ int readsize; // how many bytes we want to read
+ int nbytes; // how many bytes we actually read
+ bool have_header = false;
+ uint16_t status = 0;
+
+ if (stream_get_endp(peer->ibuf_work) < BGP_HEADER_SIZE)
+ readsize = BGP_HEADER_SIZE - stream_get_endp(peer->ibuf_work);
+ else {
+ // retrieve packet length from tlv and compute # bytes we still
+ // need
+ u_int16_t mlen =
+ stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE);
+ readsize = mlen - stream_get_endp(peer->ibuf_work);
+ have_header = true;
+ }
+
+ nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);
+
+ if (nbytes <= 0) // handle errors
+ {
+ switch (nbytes) {
+ case -1: // fatal error; tear down the session
+ zlog_err("%s [Error] bgp_read_packet error: %s",
+ peer->host, safe_strerror(errno));
+
+ if (peer->status == Established) {
+ if (CHECK_FLAG(peer->sflags,
+ PEER_STATUS_NSF_MODE)) {
+ peer->last_reset =
+ PEER_DOWN_NSF_CLOSE_SESSION;
+ SET_FLAG(peer->sflags,
+ PEER_STATUS_NSF_WAIT);
+ } else
+ peer->last_reset =
+ PEER_DOWN_CLOSE_SESSION;
+ }
+
+ BGP_EVENT_ADD(peer, TCP_fatal_error);
+ SET_FLAG(status, BGP_IO_FATAL_ERR);
+ break;
+
+ case 0: // TCP session closed
+ if (bgp_debug_neighbor_events(peer))
+ zlog_debug(
+ "%s [Event] BGP connection closed fd %d",
+ peer->host, peer->fd);
+
+ if (peer->status == Established) {
+ if (CHECK_FLAG(peer->sflags,
+ PEER_STATUS_NSF_MODE)) {
+ peer->last_reset =
+ PEER_DOWN_NSF_CLOSE_SESSION;
+ SET_FLAG(peer->sflags,
+ PEER_STATUS_NSF_WAIT);
+ } else
+ peer->last_reset =
+ PEER_DOWN_CLOSE_SESSION;
+ }
+
+ BGP_EVENT_ADD(peer, TCP_connection_closed);
+ SET_FLAG(status, BGP_IO_FATAL_ERR);
+ break;
+
+ case -2: // temporary error; come back later
+ SET_FLAG(status, BGP_IO_TRANS_ERR);
+ break;
+ default:
+ break;
+ }
+
+ return status;
+ }
+
+ // If we didn't have the header before read(), and now we do, set the
+ // appropriate flag. The caller must validate the header for us.
+ if (!have_header
+ && stream_get_endp(peer->ibuf_work) >= BGP_HEADER_SIZE) {
+ SET_FLAG(status, BGP_IO_READ_HEADER);
+ have_header = true;
+ }
+ // If we read the # of bytes specified in the tlv, we have read a full
+ // packet.
+ //
+ // Note that the header may not have been validated here. This flag
+ // means
+ // ONLY that we read the # of bytes specified in the header; if the
+ // header is
+ // not valid, the packet MUST NOT be processed further.
+ if (have_header && (stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE)
+ == stream_get_endp(peer->ibuf_work)))
+ SET_FLAG(status, BGP_IO_READ_FULLPACKET);
+
+ return status;
+}
+
+/*
+ * Called after we have read a BGP packet header. Validates marker, message
+ * type and packet length. If any of these aren't correct, sends a notify.
+ */
+static bool validate_header(struct peer *peer)
+{
+ u_int16_t size, type;
+
+ /* Marker check */
+ for (int i = 0; i < BGP_MARKER_SIZE; i++)
+ if (peer->ibuf_work->data[i] != 0xff) {
+ bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
+ BGP_NOTIFY_HEADER_NOT_SYNC);
+ return false;
+ }
+
+ /* Get size and type. */
+ size = stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE);
+ type = stream_getc_from(peer->ibuf_work, BGP_MARKER_SIZE + 2);
+
+ /* BGP type check. */
+ if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
+ && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
+ && type != BGP_MSG_ROUTE_REFRESH_NEW
+ && type != BGP_MSG_ROUTE_REFRESH_OLD
+ && type != BGP_MSG_CAPABILITY) {
+ if (bgp_debug_neighbor_events(peer))
+ zlog_debug("%s unknown message type 0x%02x", peer->host,
+ type);
+
+ bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
+ BGP_NOTIFY_HEADER_BAD_MESTYPE,
+ (u_char *)&type, 1);
+ return false;
+ }
+
+ /* Minimum packet length check. */
+ if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE)
+ || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
+ || (type == BGP_MSG_UPDATE && size < BGP_MSG_UPDATE_MIN_SIZE)
+ || (type == BGP_MSG_NOTIFY && size < BGP_MSG_NOTIFY_MIN_SIZE)
+ || (type == BGP_MSG_KEEPALIVE && size != BGP_MSG_KEEPALIVE_MIN_SIZE)
+ || (type == BGP_MSG_ROUTE_REFRESH_NEW
+ && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
+ || (type == BGP_MSG_ROUTE_REFRESH_OLD
+ && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
+ || (type == BGP_MSG_CAPABILITY
+ && size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
+ if (bgp_debug_neighbor_events(peer))
+ zlog_debug("%s bad message length - %d for %s",
+ peer->host, size,
+ type == 128 ? "ROUTE-REFRESH"
+ : bgp_type_str[(int)type]);
+
+ bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
+ BGP_NOTIFY_HEADER_BAD_MESLEN,
+ (u_char *)&size, 2);
+ return false;
+ }
+
+ return true;
}
diff --git a/bgpd/bgp_io.h b/bgpd/bgp_io.h
index 7b81b8ee3..fd5f7659d 100644
--- a/bgpd/bgp_io.h
+++ b/bgpd/bgp_io.h
@@ -23,13 +23,13 @@
#ifndef _FRR_BGP_IO_H
#define _FRR_BGP_IO_H
+#include "frr_pthread.h"
#include "bgpd/bgpd.h"
/**
* Control variable for write thread.
*
- * Setting this variable to false and calling peer_writes_wake() will
- * eventually result in thread termination.
+ * Setting this variable to false will eventually result in thread termination.
*/
extern bool bgp_packet_writes_thread_run;
@@ -37,35 +37,32 @@ extern bool bgp_packet_writes_thread_run;
* Initializes data structures and flags for the write thread.
*
* This function should be called from the main thread before
- * peer_writes_start() is invoked.
+ * bgp_io_start() is invoked.
*/
-extern void peer_writes_init(void);
+extern void bgp_io_init(void);
/**
* Start function for write thread.
*
- * This function should be passed to pthread_create() during BGP startup.
+ * @param arg - unused
*/
-extern void *peer_writes_start(void *arg);
+extern void *bgp_io_start(void *arg);
/**
* Stop function for the I/O thread.
*
* Uninitializes all resources and stops the thread.
*
- * @param result -- where to store data result, unused
+ * @param result - where to store data result, unused
*/
-extern int peer_writes_stop(void **result);
+extern int bgp_io_stop(void **result, struct frr_pthread *fpt);
/**
- * Registers a peer with the write thread.
- *
- * This function adds the peer to an internal data structure, which must be
- * locked for write access. This call will block until the structure can be
- * locked.
+ * Turns on packet writing for a peer.
*
* After this function is called, any packets placed on peer->obuf will be
- * written to peer->fd at regular intervals.
+ * written to peer->fd at regular intervals. Additionally it becomes unsafe to
+ * use peer->fd with select() or poll().
*
* This function increments the peer reference counter with peer_lock().
*
@@ -73,32 +70,58 @@ extern int peer_writes_stop(void **result);
*
* @param peer - peer to register
*/
-extern void peer_writes_on(struct peer *peer);
+extern void bgp_writes_on(struct peer *peer);
/**
- * Deregisters a peer with the write thread.
- *
- * This function removes the peer from an internal data structure, which must
- * be locked for write access. This call will block until the structure can be
- * locked.
+ * Turns off packet writing for a peer.
*
* After this function is called, any packets placed on peer->obuf will not be
- * written to peer->fd.
+ * written to peer->fd. After this function returns it is safe to use peer->fd
+ * with select() or poll().
*
- * This function decrements the peer reference counter with peer_unlock().
+ * If the flush = true, a last-ditch effort will be made to flush any remaining
+ * packets to peer->fd. Upon encountering any error whatsoever, the attempt
+ * will abort. If the caller wishes to know whether the flush succeeded they
+ * may check peer->obuf->count against zero.
*
* If the peer is not registered, nothing happens.
*
* @param peer - peer to deregister
+ * @param flush - as described (NOTE: the prototype declared below takes no flush argument)
+ */
+extern void bgp_writes_off(struct peer *peer);
+
+/**
+ * Turns on packet reading for a peer.
+ *
+ * After this function is called, any packets received on peer->fd will be read
+ * and copied into the FIFO queue peer->ibuf. Additionally it becomes unsafe to
+ * use peer->fd with select() or poll().
+ *
+ * When a full packet is read, bgp_process_packet() will be scheduled on the
+ * main thread.
+ *
+ * This function increments the peer reference counter with peer_lock().
+ *
+ * If the peer is already registered, nothing happens.
+ *
+ * @param peer - peer to register
*/
-extern void peer_writes_off(struct peer *peer);
+extern void bgp_reads_on(struct peer *peer);
/**
- * Notifies the write thread that there is work to be done.
+ * Turns off packet reading for a peer.
+ *
+ * After this function is called, any packets received on peer->fd will not be
+ * read. After this function returns it is safe to use peer->fd with select()
+ * or poll().
*
- * This function has the effect of waking the write thread if it is sleeping.
- * If the thread is not sleeping, this signal will be ignored.
+ * This function decrements the peer reference counter with peer_unlock().
+ *
+ * If the peer is not registered, nothing happens.
+ *
+ * @param peer - peer to deregister
*/
-extern void peer_writes_wake(void);
+extern void bgp_reads_off(struct peer *peer);
#endif /* _FRR_BGP_IO_H */
diff --git a/bgpd/bgp_keepalives.c b/bgpd/bgp_keepalives.c
index 23f3f5173..c7383ae77 100644
--- a/bgpd/bgp_keepalives.c
+++ b/bgpd/bgp_keepalives.c
@@ -1,26 +1,26 @@
/*
- * BGP Keepalives.
- *
- * Implements a producer thread to generate BGP keepalives for peers.
- * ----------------------------------------
- * Copyright (C) 2017 Cumulus Networks, Inc.
- * Quentin Young
- *
- * This file is part of FRRouting.
- *
- * FRRouting is free software; you can redistribute it and/or modify it under
- * the terms of the GNU General Public License as published by the Free
- * Software Foundation; either version 2, or (at your option) any later
- * version.
- *
- * FRRouting is distributed in the hope that it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
- * details.
- *
- * You should have received a copy of the GNU General Public License along with
- * FRRouting; see the file COPYING. If not, write to the Free Software
- * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ BGP Keepalives.
+
+ Implements a producer thread to generate BGP keepalives for peers.
+ ----------------------------------------
+ Copyright (C) 2017 Cumulus Networks, Inc.
+ Quentin Young
+
+ This file is part of FRRouting.
+
+ FRRouting is free software; you can redistribute it and/or modify it under
+ the terms of the GNU General Public License as published by the Free
+ Software Foundation; either version 2, or (at your option) any later
+ version.
+
+ FRRouting is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+ details.
+
+ You should have received a copy of the GNU General Public License along with
+ FRRouting; see the file COPYING. If not, write to the Free Software
+ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include <zebra.h>
#include <signal.h>
@@ -73,7 +73,8 @@ static void pkat_del(void *pkat)
/*
- * Walks the list of peers, sending keepalives to those that are due for them.
+ * Callback for hash_iterate. Determines if a peer needs a keepalive and if so,
+ * generates and sends it.
*
* For any given peer, if the elapsed time since its last keepalive exceeds its
* configured keepalive timer, a keepalive is sent to the peer and its
@@ -143,8 +144,8 @@ static unsigned int peer_hash_key(void *arg)
void peer_keepalives_init()
{
- peerhash_mtx = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_mutex_t));
- peerhash_cond = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_cond_t));
+ peerhash_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t));
+ peerhash_cond = XCALLOC(MTYPE_TMP, sizeof(pthread_cond_t));
// initialize mutex
pthread_mutex_init(peerhash_mtx, NULL);
@@ -175,8 +176,8 @@ static void peer_keepalives_finish(void *arg)
pthread_mutex_destroy(peerhash_mtx);
pthread_cond_destroy(peerhash_cond);
- XFREE(MTYPE_PTHREAD, peerhash_mtx);
- XFREE(MTYPE_PTHREAD, peerhash_cond);
+ XFREE(MTYPE_TMP, peerhash_mtx);
+ XFREE(MTYPE_TMP, peerhash_cond);
}
/**
@@ -275,9 +276,8 @@ void peer_keepalives_wake()
pthread_mutex_unlock(peerhash_mtx);
}
-int peer_keepalives_stop(void **result)
+int peer_keepalives_stop(void **result, struct frr_pthread *fpt)
{
- struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES);
bgp_keepalives_thread_run = false;
peer_keepalives_wake();
pthread_join(fpt->thread, result);
diff --git a/bgpd/bgp_keepalives.h b/bgpd/bgp_keepalives.h
index d74b69e90..602b0da77 100644
--- a/bgpd/bgp_keepalives.h
+++ b/bgpd/bgp_keepalives.h
@@ -24,6 +24,7 @@
#ifndef _BGP_KEEPALIVES_H_
#define _BGP_KEEPALIVES_H_
+#include "frr_pthread.h"
#include "bgpd.h"
/* Thread control flag.
@@ -88,6 +89,6 @@ extern void *peer_keepalives_start(void *arg);
extern void peer_keepalives_wake(void);
/* stop function */
-int peer_keepalives_stop(void **result);
+int peer_keepalives_stop(void **result, struct frr_pthread *fpt);
#endif /* _BGP_KEEPALIVES_H */
diff --git a/bgpd/bgp_packet.c b/bgpd/bgp_packet.c
index a89d72cc6..c21e3cbe3 100644
--- a/bgpd/bgp_packet.c
+++ b/bgpd/bgp_packet.c
@@ -59,16 +59,6 @@
#include "bgpd/bgp_label.h"
#include "bgpd/bgp_io.h"
-/* Linked list of active peers */
-static pthread_mutex_t *plist_mtx;
-static pthread_cond_t *write_cond;
-static struct list *plist;
-
-/* periodically scheduled thread to generate update-group updates */
-static struct thread *t_generate_updgrp_packets;
-
-bool bgp_packet_writes_thread_run = false;
-
/* Set up BGP packet marker and packet type. */
int bgp_packet_set_marker(struct stream *s, u_char type)
{
@@ -107,11 +97,9 @@ int bgp_packet_set_size(struct stream *s)
*/
static void bgp_packet_add(struct peer *peer, struct stream *s)
{
- pthread_mutex_lock(&peer->obuf_mtx);
+ pthread_mutex_lock(&peer->io_mtx);
stream_fifo_push(peer->obuf, s);
- pthread_mutex_unlock(&peer->obuf_mtx);
-
- peer_writes_wake();
+ pthread_mutex_unlock(&peer->io_mtx);
}
static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi,
@@ -165,7 +153,6 @@ static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi,
int bgp_generate_updgrp_packets(struct thread *thread)
{
struct peer *peer = THREAD_ARG(thread);
- peer->t_generate_updgrp_packets = NULL;
struct stream *s;
struct peer_af *paf;
@@ -237,10 +224,13 @@ int bgp_generate_updgrp_packets(struct thread *thread)
if ((s = bgp_update_packet_eor(
peer, afi,
- safi)))
+ safi))) {
bgp_packet_add(
peer,
s);
+ bgp_writes_on(
+ peer);
+ }
}
}
continue;
@@ -252,6 +242,7 @@ int bgp_generate_updgrp_packets(struct thread *thread)
* attributes from peer and advance peer */
s = bpacket_reformat_for_peer(next_pkt, paf);
bgp_packet_add(peer, s);
+ bgp_writes_on(peer);
bpacket_queue_advance_peer(paf);
}
} while (s);
@@ -282,6 +273,8 @@ void bgp_keepalive_send(struct peer *peer)
/* Add packet to the peer. */
bgp_packet_add(peer, s);
+
+ bgp_writes_on(peer);
}
/*
@@ -335,6 +328,67 @@ void bgp_open_send(struct peer *peer)
/* Add packet to the peer. */
bgp_packet_add(peer, s);
+
+ bgp_writes_on(peer);
+}
+
+/*
+ * This is only for sending NOTIFICATION message to neighbor.
+ *
+ * Pops the packet at the head of peer->obuf (under peer->io_mtx) and makes a
+ * single write() attempt on peer->fd. On success the notify counter is
+ * bumped, the start timer is doubled (capped at 120) and BGP_Stop is queued;
+ * on a failed write TCP_fatal_error is queued instead. The popped stream is
+ * freed on every path, since nothing else references it once it has left the
+ * FIFO.
+ *
+ * Always returns 0, including when the FIFO was empty.
+ */
+static int bgp_write_notify(struct peer *peer)
+{
+	int ret, val;
+	u_char type;
+	struct stream *s;
+
+	/* Hold the lock only for the pop, and never return with it held. */
+	pthread_mutex_lock(&peer->io_mtx);
+	{
+		/* There should be at least one packet. */
+		s = stream_fifo_pop(peer->obuf);
+	}
+	pthread_mutex_unlock(&peer->io_mtx);
+
+	if (!s)
+		return 0;
+
+	assert(stream_get_endp(s) >= BGP_HEADER_SIZE);
+
+	/* Stop collecting data within the socket */
+	sockopt_cork(peer->fd, 0);
+
+	/* socket is in nonblocking mode, if we can't deliver the NOTIFY, well,
+	 * we only care about getting a clean shutdown at this point. */
+	ret = write(peer->fd, STREAM_DATA(s), stream_get_endp(s));
+
+	/* only connection reset/close gets counted as TCP_fatal_error, failure
+	 * to write the entire NOTIFY doesn't get different FSM treatment */
+	if (ret <= 0) {
+		stream_free(s);
+		BGP_EVENT_ADD(peer, TCP_fatal_error);
+		return 0;
+	}
+
+	/* Disable Nagle, make NOTIFY packet go out right away */
+	val = 1;
+	(void)setsockopt(peer->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val,
+			 sizeof(val));
+
+	/* Retrieve BGP packet type. */
+	stream_set_getp(s, BGP_MARKER_SIZE + 2);
+	type = stream_getc(s);
+
+	assert(type == BGP_MSG_NOTIFY);
+
+	/* The packet has been handed to the kernel; release our copy. */
+	stream_free(s);
+
+	/* Type should be notify. */
+	peer->notify_out++;
+
+	/* Double start timer. */
+	peer->v_start *= 2;
+
+	/* Overflow check. */
+	if (peer->v_start >= (60 * 2))
+		peer->v_start = (60 * 2);
+
+	/* Handle Graceful Restart case where the state changes to
+	   Connect instead of Idle */
+	BGP_EVENT_ADD(peer, BGP_Stop);
+
+	return 0;
}
/*
@@ -372,10 +426,12 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
/* Set BGP packet length. */
length = bgp_packet_set_size(s);
- /* Add packet to the peer. */
- pthread_mutex_lock(&peer->obuf_mtx);
- stream_fifo_clean(peer->obuf);
- pthread_mutex_unlock(&peer->obuf_mtx);
+ /* wipe output buffer */
+ pthread_mutex_lock(&peer->io_mtx);
+ {
+ stream_fifo_clean(peer->obuf);
+ }
+ pthread_mutex_unlock(&peer->io_mtx);
/* For debug */
{
@@ -428,8 +484,8 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
/* Add packet to peer's output queue */
bgp_packet_add(peer, s);
- /* Wake up the write thread to get the notify out ASAP */
- peer_writes_wake();
+
+ bgp_write_notify(peer);
}
/*
@@ -544,6 +600,8 @@ void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi,
/* Add packet to the peer. */
bgp_packet_add(peer, s);
+
+ bgp_writes_on(peer);
}
/*
@@ -593,6 +651,8 @@ void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi,
/* Add packet to the peer. */
bgp_packet_add(peer, s);
+
+ bgp_writes_on(peer);
}
/* RFC1771 6.8 Connection collision detection. */
@@ -696,13 +756,13 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
u_int16_t *holdtime_ptr;
/* Parse open packet. */
- version = stream_getc(peer->ibuf);
- memcpy(notify_data_remote_as, stream_pnt(peer->ibuf), 2);
- remote_as = stream_getw(peer->ibuf);
- holdtime_ptr = (u_int16_t *)stream_pnt(peer->ibuf);
- holdtime = stream_getw(peer->ibuf);
- memcpy(notify_data_remote_id, stream_pnt(peer->ibuf), 4);
- remote_id.s_addr = stream_get_ipv4(peer->ibuf);
+ version = stream_getc(peer->curr);
+ memcpy(notify_data_remote_as, stream_pnt(peer->curr), 2);
+ remote_as = stream_getw(peer->curr);
+ holdtime_ptr = (u_int16_t *)stream_pnt(peer->curr);
+ holdtime = stream_getw(peer->curr);
+ memcpy(notify_data_remote_id, stream_pnt(peer->curr), 4);
+ remote_id.s_addr = stream_get_ipv4(peer->curr);
/* Receive OPEN message log */
if (bgp_debug_neighbor_events(peer))
@@ -714,11 +774,11 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
/* BEGIN to read the capability here, but dont do it yet */
mp_capability = 0;
- optlen = stream_getc(peer->ibuf);
+ optlen = stream_getc(peer->curr);
if (optlen != 0) {
/* If not enough bytes, it is an error. */
- if (STREAM_READABLE(peer->ibuf) < optlen) {
+ if (STREAM_READABLE(peer->curr) < optlen) {
bgp_notify_send(peer, BGP_NOTIFY_OPEN_ERR,
BGP_NOTIFY_OPEN_MALFORMED_ATTR);
return -1;
@@ -990,10 +1050,6 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
return (ret);
}
- peer->packet_size = 0;
- if (peer->ibuf)
- stream_reset(peer->ibuf);
-
return 0;
}
@@ -1177,7 +1233,7 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size)
memset(peer->rcvd_attr_str, 0, BUFSIZ);
peer->rcvd_attr_printed = 0;
- s = peer->ibuf;
+ s = peer->curr;
end = stream_pnt(s) + size;
/* RFC1771 6.3 If the Unfeasible Routes Length or Total Attribute
@@ -1424,8 +1480,8 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
peer->notify.length = 0;
}
- bgp_notify.code = stream_getc(peer->ibuf);
- bgp_notify.subcode = stream_getc(peer->ibuf);
+ bgp_notify.code = stream_getc(peer->curr);
+ bgp_notify.subcode = stream_getc(peer->curr);
bgp_notify.length = size - 2;
bgp_notify.data = NULL;
@@ -1436,7 +1492,7 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
if (bgp_notify.length) {
peer->notify.length = size - 2;
peer->notify.data = XMALLOC(MTYPE_TMP, size - 2);
- memcpy(peer->notify.data, stream_pnt(peer->ibuf), size - 2);
+ memcpy(peer->notify.data, stream_pnt(peer->curr), size - 2);
}
/* For debug */
@@ -1451,12 +1507,12 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
for (i = 0; i < bgp_notify.length; i++)
if (first) {
sprintf(c, " %02x",
- stream_getc(peer->ibuf));
+ stream_getc(peer->curr));
strcat(bgp_notify.data, c);
} else {
first = 1;
sprintf(c, "%02x",
- stream_getc(peer->ibuf));
+ stream_getc(peer->curr));
strcpy(bgp_notify.data, c);
}
bgp_notify.raw_data = (u_char *)peer->notify.data;
@@ -1526,7 +1582,7 @@ static void bgp_route_refresh_receive(struct peer *peer, bgp_size_t size)
return;
}
- s = peer->ibuf;
+ s = peer->curr;
/* Parse packet. */
pkt_afi = stream_getw(s);
@@ -1874,7 +1930,7 @@ int bgp_capability_receive(struct peer *peer, bgp_size_t size)
u_char *pnt;
/* Fetch pointer. */
- pnt = stream_pnt(peer->ibuf);
+ pnt = stream_pnt(peer->curr);
if (bgp_debug_neighbor_events(peer))
zlog_debug("%s rcv CAPABILITY", peer->host);
@@ -1902,188 +1958,50 @@ int bgp_capability_receive(struct peer *peer, bgp_size_t size)
return bgp_capability_msg_parse(peer, pnt, size);
}
-/* BGP read utility function. */
-static int bgp_read_packet(struct peer *peer)
+/* Starting point of packet process function. */
+int bgp_process_packet(struct thread *thread)
{
- int nbytes;
- int readsize;
-
- readsize = peer->packet_size - stream_get_endp(peer->ibuf);
+ /* Yes first of all get peer pointer. */
+ struct peer *peer;
+ peer = THREAD_ARG(thread);
- /* If size is zero then return. */
- if (!readsize)
+ /* Guard against scheduled events that occur after peer deletion. */
+ if (peer->status == Deleted)
return 0;
- /* Read packet from fd. */
- nbytes = stream_read_try(peer->ibuf, peer->fd, readsize);
-
- /* If read byte is smaller than zero then error occured. */
- if (nbytes < 0) {
- /* Transient error should retry */
- if (nbytes == -2)
- return -1;
-
- zlog_err("%s [Error] bgp_read_packet error: %s", peer->host,
- safe_strerror(errno));
-
- if (peer->status == Established) {
- if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
- peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
- SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
- } else
- peer->last_reset = PEER_DOWN_CLOSE_SESSION;
- }
-
- BGP_EVENT_ADD(peer, TCP_fatal_error);
- return -1;
- }
-
- /* When read byte is zero : clear bgp peer and return */
- if (nbytes == 0) {
- if (bgp_debug_neighbor_events(peer))
- zlog_debug("%s [Event] BGP connection closed fd %d",
- peer->host, peer->fd);
-
- if (peer->status == Established) {
- if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
- peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
- SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
- } else
- peer->last_reset = PEER_DOWN_CLOSE_SESSION;
- }
-
- BGP_EVENT_ADD(peer, TCP_connection_closed);
- return -1;
- }
-
- /* We read partial packet. */
- if (stream_get_endp(peer->ibuf) != peer->packet_size)
- return -1;
-
- return 0;
-}
-
-/* Marker check. */
-static int bgp_marker_all_one(struct stream *s, int length)
-{
- int i;
-
- for (i = 0; i < length; i++)
- if (s->data[i] != 0xff)
- return 0;
-
- return 1;
-}
-
-/* Starting point of packet process function. */
-int bgp_read(struct thread *thread)
-{
- int ret;
u_char type = 0;
- struct peer *peer;
bgp_size_t size;
char notify_data_length[2];
u_int32_t notify_out;
- /* Yes first of all get peer pointer. */
- peer = THREAD_ARG(thread);
- peer->t_read = NULL;
-
/* Note notify_out so we can check later to see if we sent another one
*/
notify_out = peer->notify_out;
- if (peer->fd < 0) {
- zlog_err("bgp_read(): peer's fd is negative value %d",
- peer->fd);
- return -1;
+ pthread_mutex_lock(&peer->io_mtx);
+ {
+ peer->curr = stream_fifo_pop(peer->ibuf);
}
+ pthread_mutex_unlock(&peer->io_mtx);
- BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
-
- /* Read packet header to determine type of the packet */
- if (peer->packet_size == 0)
- peer->packet_size = BGP_HEADER_SIZE;
-
- if (stream_get_endp(peer->ibuf) < BGP_HEADER_SIZE) {
- ret = bgp_read_packet(peer);
-
- /* Header read error or partial read packet. */
- if (ret < 0)
- goto done;
-
- /* Get size and type. */
- stream_forward_getp(peer->ibuf, BGP_MARKER_SIZE);
- memcpy(notify_data_length, stream_pnt(peer->ibuf), 2);
- size = stream_getw(peer->ibuf);
- type = stream_getc(peer->ibuf);
-
- /* Marker check */
- if (((type == BGP_MSG_OPEN) || (type == BGP_MSG_KEEPALIVE))
- && !bgp_marker_all_one(peer->ibuf, BGP_MARKER_SIZE)) {
- bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
- BGP_NOTIFY_HEADER_NOT_SYNC);
- goto done;
- }
-
- /* BGP type check. */
- if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
- && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
- && type != BGP_MSG_ROUTE_REFRESH_NEW
- && type != BGP_MSG_ROUTE_REFRESH_OLD
- && type != BGP_MSG_CAPABILITY) {
- if (bgp_debug_neighbor_events(peer))
- zlog_debug("%s unknown message type 0x%02x",
- peer->host, type);
- bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
- BGP_NOTIFY_HEADER_BAD_MESTYPE,
- &type, 1);
- goto done;
- }
- /* Mimimum packet length check. */
- if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE)
- || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
- || (type == BGP_MSG_UPDATE
- && size < BGP_MSG_UPDATE_MIN_SIZE)
- || (type == BGP_MSG_NOTIFY
- && size < BGP_MSG_NOTIFY_MIN_SIZE)
- || (type == BGP_MSG_KEEPALIVE
- && size != BGP_MSG_KEEPALIVE_MIN_SIZE)
- || (type == BGP_MSG_ROUTE_REFRESH_NEW
- && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
- || (type == BGP_MSG_ROUTE_REFRESH_OLD
- && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
- || (type == BGP_MSG_CAPABILITY
- && size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
- if (bgp_debug_neighbor_events(peer))
- zlog_debug("%s bad message length - %d for %s",
- peer->host, size,
- type == 128
- ? "ROUTE-REFRESH"
- : bgp_type_str[(int)type]);
- bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
- BGP_NOTIFY_HEADER_BAD_MESLEN,
- (u_char *)notify_data_length,
- 2);
- goto done;
- }
+ if (peer->curr == NULL) // no packets to process, hmm...
+ return 0;
- /* Adjust size to message length. */
- peer->packet_size = size;
- }
+ bgp_size_t actual_size = stream_get_endp(peer->curr);
- ret = bgp_read_packet(peer);
- if (ret < 0)
- goto done;
+ /* skip the marker and copy the packet length */
+ stream_forward_getp(peer->curr, BGP_MARKER_SIZE);
+ memcpy(notify_data_length, stream_pnt(peer->curr), 2);
- /* Get size and type again. */
- (void)stream_getw_from(peer->ibuf, BGP_MARKER_SIZE);
- type = stream_getc_from(peer->ibuf, BGP_MARKER_SIZE + 2);
+ /* read in the packet length and type */
+ size = stream_getw(peer->curr);
+ type = stream_getc(peer->curr);
/* BGP packet dump function. */
- bgp_dump_packet(peer, type, peer->ibuf);
+ bgp_dump_packet(peer, type, peer->curr);
- size = (peer->packet_size - BGP_HEADER_SIZE);
+ /* adjust size to exclude the marker + length + type */
+ size -= BGP_HEADER_SIZE;
/* Read rest of the packet and call each sort of packet routine */
switch (type) {
@@ -2118,26 +2036,14 @@ int bgp_read(struct thread *thread)
* of the packet for troubleshooting purposes
*/
if (notify_out < peer->notify_out) {
- memcpy(peer->last_reset_cause, peer->ibuf->data,
- peer->packet_size);
- peer->last_reset_cause_size = peer->packet_size;
- notify_out = peer->notify_out;
+ memcpy(peer->last_reset_cause, peer->curr->data, actual_size);
+ peer->last_reset_cause_size = actual_size;
}
- /* Clear input buffer. */
- peer->packet_size = 0;
- if (peer->ibuf)
- stream_reset(peer->ibuf);
-
-done:
- /* If reading this packet caused us to send a NOTIFICATION then store a
- * copy
- * of the packet for troubleshooting purposes
- */
- if (notify_out < peer->notify_out) {
- memcpy(peer->last_reset_cause, peer->ibuf->data,
- peer->packet_size);
- peer->last_reset_cause_size = peer->packet_size;
+ /* Delete packet and carry on. */
+ if (peer->curr) {
+ stream_free(peer->curr);
+ peer->curr = NULL;
}
return 0;
diff --git a/bgpd/bgp_packet.h b/bgpd/bgp_packet.h
index d7080d7fb..502dbbdee 100644
--- a/bgpd/bgp_packet.h
+++ b/bgpd/bgp_packet.h
@@ -38,8 +38,6 @@
#define ORF_COMMON_PART_DENY 0x20
/* Packet send and receive function prototypes. */
-extern int bgp_read(struct thread *);
-
extern void bgp_keepalive_send(struct peer *);
extern void bgp_open_send(struct peer *);
extern void bgp_notify_send(struct peer *, u_int8_t, u_int8_t);
@@ -68,5 +66,6 @@ extern int bgp_packet_set_size(struct stream *s);
extern bool bgp_packet_writes_thread_run;
extern int bgp_generate_updgrp_packets(struct thread *);
+extern int bgp_process_packet(struct thread *);
#endif /* _QUAGGA_BGP_PACKET_H */
diff --git a/bgpd/bgp_updgrp.h b/bgpd/bgp_updgrp.h
index a50bc05fe..3e503a8be 100644
--- a/bgpd/bgp_updgrp.h
+++ b/bgpd/bgp_updgrp.h
@@ -179,7 +179,7 @@ struct update_subgroup {
struct stream *work;
/* We use a separate stream to encode MP_REACH_NLRI for efficient
- * NLRI packing. peer->work stores all the other attributes. The
+ * NLRI packing. peer->obuf_work stores all the other attributes. The
* actual packet is then constructed by concatenating the two.
*/
struct stream *scratch;
diff --git a/bgpd/bgp_vty.c b/bgpd/bgp_vty.c
index af702ac85..095379060 100644
--- a/bgpd/bgp_vty.c
+++ b/bgpd/bgp_vty.c
@@ -7068,14 +7068,40 @@ static int bgp_show_summary(struct vty *vty, struct bgp *bgp, int afi, int safi,
vty_out(vty, "4 %10u %7u %7u %8" PRIu64 " %4d %4zd %8s",
peer->as,
- peer->open_in + peer->update_in
- + peer->keepalive_in + peer->notify_in
- + peer->refresh_in
- + peer->dynamic_cap_in,
- peer->open_out + peer->update_out
- + peer->keepalive_out + peer->notify_out
- + peer->refresh_out
- + peer->dynamic_cap_out,
+ atomic_load_explicit(&peer->open_in,
+ memory_order_relaxed)
+ + atomic_load_explicit(
+ &peer->update_in,
+ memory_order_relaxed)
+ + atomic_load_explicit(
+ &peer->keepalive_in,
+ memory_order_relaxed)
+ + atomic_load_explicit(
+ &peer->notify_in,
+ memory_order_relaxed)
+ + atomic_load_explicit(
+ &peer->refresh_in,
+ memory_order_relaxed)
+ + atomic_load_explicit(
+ &peer->dynamic_cap_in,
+ memory_order_relaxed),
+ atomic_load_explicit(&peer->open_out,
+ memory_order_relaxed)
+ + atomic_load_explicit(
+ &peer->update_out,
+ memory_order_relaxed)
+ + atomic_load_explicit(
+ &peer->keepalive_out,
+ memory_order_relaxed)
+ + atomic_load_explicit(
+ &peer->notify_out,
+ memory_order_relaxed)
+ + atomic_load_explicit(
+ &peer->refresh_out,
+ memory_order_relaxed)
+ + atomic_load_explicit(
+ &peer->dynamic_cap_out,
+ memory_order_relaxed),
peer->version[afi][safi], 0, peer->obuf->count,
peer_uptime(peer->uptime, timebuf,
BGP_UPTIME_LEN, 0, NULL));
diff --git a/bgpd/bgpd.c b/bgpd/bgpd.c
index cfe5d5c67..c034eda24 100644
--- a/bgpd/bgpd.c
+++ b/bgpd/bgpd.c
@@ -992,10 +992,14 @@ static void peer_free(struct peer *peer)
* but just to be sure..
*/
bgp_timer_set(peer);
- BGP_READ_OFF(peer->t_read);
- peer_writes_off(peer);
+ bgp_reads_off(peer);
+ bgp_writes_off(peer);
+ assert(!peer->t_write);
+ assert(!peer->t_read);
BGP_EVENT_FLUSH(peer);
+ pthread_mutex_destroy(&peer->io_mtx);
+
/* Free connected nexthop, if present */
if (CHECK_FLAG(peer->flags, PEER_FLAG_CONFIG_NODE)
&& !peer_dynamic_neighbor(peer))
@@ -1138,11 +1142,11 @@ struct peer *peer_new(struct bgp *bgp)
SET_FLAG(peer->sflags, PEER_STATUS_CAPABILITY_OPEN);
/* Create buffers. */
- peer->ibuf = stream_new(BGP_MAX_PACKET_SIZE);
+ peer->ibuf = stream_fifo_new();
peer->obuf = stream_fifo_new();
- pthread_mutex_init(&peer->obuf_mtx, NULL);
+ pthread_mutex_init(&peer->io_mtx, NULL);
- /* We use a larger buffer for peer->work in the event that:
+ /* We use a larger buffer for peer->obuf_work in the event that:
* - We RX a BGP_UPDATE where the attributes alone are just
* under BGP_MAX_PACKET_SIZE
* - The user configures an outbound route-map that does many as-path
@@ -1156,8 +1160,9 @@ struct peer *peer_new(struct bgp *bgp)
* bounds
* checking for every single attribute as we construct an UPDATE.
*/
- peer->work =
+ peer->obuf_work =
stream_new(BGP_MAX_PACKET_SIZE + BGP_MAX_PACKET_SIZE_OVERFLOW);
+ peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE);
peer->scratch = stream_new(BGP_MAX_PACKET_SIZE);
@@ -2086,6 +2091,11 @@ int peer_delete(struct peer *peer)
bgp = peer->bgp;
accept_peer = CHECK_FLAG(peer->sflags, PEER_STATUS_ACCEPT_PEER);
+ bgp_reads_off(peer);
+ bgp_writes_off(peer);
+ assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON));
+ assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_READS_ON));
+
if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT))
peer_nsf_stop(peer);
@@ -2147,7 +2157,7 @@ int peer_delete(struct peer *peer)
/* Buffers. */
if (peer->ibuf) {
- stream_free(peer->ibuf);
+ stream_fifo_free(peer->ibuf);
peer->ibuf = NULL;
}
@@ -2156,9 +2166,14 @@ int peer_delete(struct peer *peer)
peer->obuf = NULL;
}
- if (peer->work) {
- stream_free(peer->work);
- peer->work = NULL;
+ if (peer->ibuf_work) {
+ stream_free(peer->ibuf_work);
+ peer->ibuf_work = NULL;
+ }
+
+ if (peer->obuf_work) {
+ stream_free(peer->obuf_work);
+ peer->obuf_work = NULL;
}
if (peer->scratch) {
@@ -7389,20 +7404,24 @@ void bgp_pthreads_init()
{
frr_pthread_init();
- frr_pthread_new("BGP write thread", PTHREAD_WRITE, peer_writes_start,
- peer_writes_stop);
+ frr_pthread_new("BGP i/o thread", PTHREAD_IO, bgp_io_start,
+ bgp_io_stop);
frr_pthread_new("BGP keepalives thread", PTHREAD_KEEPALIVES,
peer_keepalives_start, peer_keepalives_stop);
/* pre-run initialization */
peer_keepalives_init();
- peer_writes_init();
+ bgp_io_init();
}
void bgp_pthreads_run()
{
- frr_pthread_run(PTHREAD_WRITE, NULL, NULL);
- frr_pthread_run(PTHREAD_KEEPALIVES, NULL, NULL);
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
+
+ frr_pthread_run(PTHREAD_IO, &attr, NULL);
+ frr_pthread_run(PTHREAD_KEEPALIVES, &attr, NULL);
}
void bgp_pthreads_finish()
diff --git a/bgpd/bgpd.h b/bgpd/bgpd.h
index 4fb784e24..208cd897d 100644
--- a/bgpd/bgpd.h
+++ b/bgpd/bgpd.h
@@ -101,7 +101,7 @@ struct bgp_master {
struct thread_master *master;
/* BGP pthreads. */
-#define PTHREAD_WRITE (1 << 1)
+#define PTHREAD_IO (1 << 1)
#define PTHREAD_KEEPALIVES (1 << 2)
/* work queues */
@@ -589,13 +589,17 @@ struct peer {
struct in_addr local_id;
/* Packet receive and send buffer. */
- struct stream *ibuf;
- pthread_mutex_t obuf_mtx;
- struct stream_fifo *obuf;
- struct stream *work;
+ pthread_mutex_t io_mtx; // guards ibuf, obuf
+ struct stream_fifo *ibuf; // packets waiting to be processed
+ struct stream_fifo *obuf; // packets waiting to be written
+
+ struct stream *ibuf_work; // WiP buffer used by bgp_read() only
+ struct stream *obuf_work; // WiP buffer used to construct packets
+
+ struct stream *curr; // the current packet being parsed
/* We use a separate stream to encode MP_REACH_NLRI for efficient
- * NLRI packing. peer->work stores all the other attributes. The
+ * NLRI packing. peer->obuf_work stores all the other attributes. The
* actual packet is then constructed by concatenating the two.
*/
struct stream *scratch;
@@ -799,7 +803,9 @@ struct peer {
/* Threads. */
struct thread *t_read;
+ struct thread *t_write;
struct thread *t_start;
+ struct thread *t_connect_check;
struct thread *t_connect;
struct thread *t_holdtime;
struct thread *t_routeadv;
@@ -807,11 +813,13 @@ struct peer {
struct thread *t_gr_restart;
struct thread *t_gr_stale;
struct thread *t_generate_updgrp_packets;
+ struct thread *t_process_packet;
/* Thread flags. */
u_int16_t thread_flags;
-#define PEER_THREAD_WRITES_ON (1 << 0)
-#define PEER_THREAD_KEEPALIVES_ON (1 << 1)
+#define PEER_THREAD_WRITES_ON (1 << 1)
+#define PEER_THREAD_READS_ON (1 << 2)
+#define PEER_THREAD_KEEPALIVES_ON (1 << 3)
/* workqueues */
struct work_queue *clear_node_queue;
@@ -853,9 +861,6 @@ struct peer {
/* Notify data. */
struct bgp_notify notify;
- /* Whole packet size to be read. */
- unsigned long packet_size;
-
/* Filter structure. */
struct bgp_filter filter[AFI_MAX][SAFI_MAX];
@@ -1149,7 +1154,7 @@ enum bgp_clear_type {
};
/* Macros. */
-#define BGP_INPUT(P) ((P)->ibuf)
+#define BGP_INPUT(P) ((P)->curr)
#define BGP_INPUT_PNT(P) (STREAM_PNT(BGP_INPUT(P)))
#define BGP_IS_VALID_STATE_FOR_NOTIF(S) \
(((S) == OpenSent) || ((S) == OpenConfirm) || ((S) == Established))
diff --git a/bgpd/rfapi/rfapi.c b/bgpd/rfapi/rfapi.c
index 15a29442f..1c342a2ff 100644
--- a/bgpd/rfapi/rfapi.c
+++ b/bgpd/rfapi/rfapi.c
@@ -1304,18 +1304,29 @@ static int rfapi_open_inner(struct rfapi_descriptor *rfd, struct bgp *bgp,
rfd->peer = peer_new(bgp);
rfd->peer->status = Established; /* keep bgp core happy */
bgp_sync_delete(rfd->peer); /* don't need these */
- if (rfd->peer->ibuf) {
- stream_free(rfd->peer->ibuf); /* don't need it */
+
+ // since this peer is not on the I/O thread, this lock is not strictly
+ // necessary, but serves as a reminder to those who may meddle...
+ pthread_mutex_lock(&rfd->peer->io_mtx);
+ {
+ // we don't need any I/O related facilities
+ if (rfd->peer->ibuf)
+ stream_fifo_free(rfd->peer->ibuf);
+ if (rfd->peer->obuf)
+ stream_fifo_free(rfd->peer->obuf);
+
+ if (rfd->peer->ibuf_work)
+ stream_free(rfd->peer->ibuf_work);
+ if (rfd->peer->obuf_work)
+ stream_free(rfd->peer->obuf_work);
+
rfd->peer->ibuf = NULL;
- }
- if (rfd->peer->obuf) {
- stream_fifo_free(rfd->peer->obuf); /* don't need it */
rfd->peer->obuf = NULL;
+ rfd->peer->obuf_work = NULL;
+ rfd->peer->ibuf_work = NULL;
}
- if (rfd->peer->work) {
- stream_free(rfd->peer->work); /* don't need it */
- rfd->peer->work = NULL;
- }
+ pthread_mutex_unlock(&rfd->peer->io_mtx);
+
{ /* base code assumes have valid host pointer */
char buf[BUFSIZ];
buf[0] = 0;
diff --git a/bgpd/rfapi/vnc_zebra.c b/bgpd/rfapi/vnc_zebra.c
index 5c71df238..5f7989238 100644
--- a/bgpd/rfapi/vnc_zebra.c
+++ b/bgpd/rfapi/vnc_zebra.c
@@ -183,22 +183,31 @@ static void vnc_redistribute_add(struct prefix *p, u_int32_t metric,
vncHD1VR.peer->status =
Established; /* keep bgp core happy */
bgp_sync_delete(vncHD1VR.peer); /* don't need these */
- if (vncHD1VR.peer->ibuf) {
- stream_free(vncHD1VR.peer
- ->ibuf); /* don't need it */
+
+ // since this peer is not on the I/O thread, this lock
+ // is not strictly
+ // necessary, but serves as a reminder to those who may
+ // meddle...
+ pthread_mutex_lock(&vncHD1VR.peer->io_mtx);
+ {
+ // we don't need any I/O related facilities
+ if (vncHD1VR.peer->ibuf)
+ stream_fifo_free(vncHD1VR.peer->ibuf);
+ if (vncHD1VR.peer->obuf)
+ stream_fifo_free(vncHD1VR.peer->obuf);
+
+ if (vncHD1VR.peer->ibuf_work)
+ stream_free(vncHD1VR.peer->ibuf_work);
+ if (vncHD1VR.peer->obuf_work)
+ stream_free(vncHD1VR.peer->obuf_work);
+
vncHD1VR.peer->ibuf = NULL;
- }
- if (vncHD1VR.peer->obuf) {
- stream_fifo_free(
- vncHD1VR.peer
- ->obuf); /* don't need it */
vncHD1VR.peer->obuf = NULL;
+ vncHD1VR.peer->obuf_work = NULL;
+ vncHD1VR.peer->ibuf_work = NULL;
}
- if (vncHD1VR.peer->work) {
- stream_free(vncHD1VR.peer
- ->work); /* don't need it */
- vncHD1VR.peer->work = NULL;
- }
+ pthread_mutex_unlock(&vncHD1VR.peer->io_mtx);
+
/* base code assumes have valid host pointer */
vncHD1VR.peer->host =
XSTRDUP(MTYPE_BGP_PEER_HOST, ".zebra.");