author    | Quentin Young <qlyoung@cumulusnetworks.com> | 2017-05-02 02:37:45 +0200
committer | Quentin Young <qlyoung@cumulusnetworks.com> | 2017-11-30 22:17:59 +0100
commit    | 424ab01d0f69a71b865e5f2d817baea7ce263e44 (patch)
tree      | 82f06cf1dae116f6db05cd5409475a6f8b6dcf1e /bgpd
parent    | bgpd: move bgp i/o to a separate source file (diff)
download  | frr-424ab01d0f69a71b865e5f2d817baea7ce263e44.tar.xz
          | frr-424ab01d0f69a71b865e5f2d817baea7ce263e44.zip
bgpd: implement buffered reads
* Move and modify all network input related code to bgp_io.c
* Add a real input buffer to `struct peer`
* Move connection initialization to its own thread.c task instead of
piggybacking on bgp_read()
* Tons of little fixups
Primary changes are in bgp_packet.[ch], bgp_io.[ch], bgp_fsm.[ch].
Changes made elsewhere are almost exclusively renaming references from
peer->ibuf to peer->curr: peer->ibuf is now the true FIFO packet input
buffer, while peer->curr holds the packet currently being processed by
the main pthread.
Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
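
For orientation, here is a minimal sketch of the consumer side described
above. The names come from this patch; the _sketch suffix and the elided
dispatch are illustrative only, and the real task is bgp_process_packet()
in bgp_packet.c. The main pthread pops one packet off the peer->ibuf FIFO
under peer->io_mtx, parses it via peer->curr, and frees it when done.

/* Illustrative only -- condensed from bgp_process_packet() in this patch.
 * Assumes the bgpd headers (bgpd.h, bgp_packet.h, lib/stream.h) are in
 * scope. */
static int bgp_process_packet_sketch(struct thread *thread)
{
	struct peer *peer = THREAD_ARG(thread);

	/* peer->ibuf is shared with the I/O pthread, so it is only
	 * touched while holding peer->io_mtx. */
	pthread_mutex_lock(&peer->io_mtx);
	{
		peer->curr = stream_fifo_pop(peer->ibuf);
	}
	pthread_mutex_unlock(&peer->io_mtx);

	if (peer->curr == NULL)		/* nothing queued */
		return 0;

	/* skip the all-ones marker, then read length and type */
	stream_forward_getp(peer->curr, BGP_MARKER_SIZE);
	bgp_size_t size = stream_getw(peer->curr);
	u_char type = stream_getc(peer->curr);
	size -= BGP_HEADER_SIZE;	/* body length only */

	/* ...dispatch on type to bgp_open_receive(), bgp_update_receive(),
	 * bgp_notify_receive(), etc., exactly as before... */
	(void)size;
	(void)type;

	stream_free(peer->curr);	/* packet fully consumed */
	peer->curr = NULL;
	return 0;
}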
Diffstat (limited to 'bgpd')
-rw-r--r-- | bgpd/bgp_attr.c        |  68
-rw-r--r-- | bgpd/bgp_fsm.c         | 129
-rw-r--r-- | bgpd/bgp_fsm.h         |  18
-rw-r--r-- | bgpd/bgp_io.c          | 531
-rw-r--r-- | bgpd/bgp_io.h          |  77
-rw-r--r-- | bgpd/bgp_keepalives.c  |  58
-rw-r--r-- | bgpd/bgp_keepalives.h  |   3
-rw-r--r-- | bgpd/bgp_packet.c      | 350
-rw-r--r-- | bgpd/bgp_packet.h      |   3
-rw-r--r-- | bgpd/bgp_updgrp.h      |   2
-rw-r--r-- | bgpd/bgp_vty.c         |  42
-rw-r--r-- | bgpd/bgpd.c            |  49
-rw-r--r-- | bgpd/bgpd.h            |  29
-rw-r--r-- | bgpd/rfapi/rfapi.c     |  29
-rw-r--r-- | bgpd/rfapi/vnc_zebra.c |  35
15 files changed, 864 insertions, 559 deletions
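
Most of the new code in the diff below lands in bgpd/bgp_io.c. Condensed,
and with the transient/fatal status handling omitted, its read path works
roughly as sketched here (the _sketch suffix is mine; the real task is
bgp_process_reads(), scheduled on the I/O pthread's thread_master):
bgp_read() fills the per-peer work buffer peer->ibuf_work, the header is
validated once it is complete, and each full packet is duplicated onto the
peer->ibuf FIFO and handed off to the main pthread.

/* Illustrative only -- condensed from bgp_process_reads() in bgp_io.c.
 * Error handling for BGP_IO_TRANS_ERR / BGP_IO_FATAL_ERR is omitted. */
static int bgp_process_reads_sketch(struct thread *thread)
{
	struct peer *peer = THREAD_ARG(thread);
	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
	uint16_t status;

	pthread_mutex_lock(&peer->io_mtx);
	{
		status = bgp_read(peer);   /* appends to peer->ibuf_work */
	}
	pthread_mutex_unlock(&peer->io_mtx);

	/* header just completed? validate marker, type and length */
	if (CHECK_FLAG(status, BGP_IO_READ_HEADER) && !validate_header(peer))
		return 0;   /* notify already sent; session is coming down */

	if (CHECK_FLAG(status, BGP_IO_READ_FULLPACKET)) {
		/* hand a copy of the finished packet to the main pthread */
		pthread_mutex_lock(&peer->io_mtx);
		{
			stream_fifo_push(peer->ibuf,
					 stream_dup(peer->ibuf_work));
		}
		pthread_mutex_unlock(&peer->io_mtx);
		stream_reset(peer->ibuf_work);

		thread_add_background(bm->master, bgp_process_packet, peer,
				      0, &peer->t_process_packet);
	}

	/* stay registered for the next readable event on this fd */
	thread_add_read(fpt->master, bgp_process_reads_sketch, peer,
			peer->fd, &peer->t_read);
	return 0;
}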
diff --git a/bgpd/bgp_attr.c b/bgpd/bgp_attr.c index 6ddb2ec8a..859158e3c 100644 --- a/bgpd/bgp_attr.c +++ b/bgpd/bgp_attr.c @@ -1156,7 +1156,7 @@ static int bgp_attr_aspath(struct bgp_attr_parser_args *args) * peer with AS4 => will get 4Byte ASnums * otherwise, will get 16 Bit */ - attr->aspath = aspath_parse(peer->ibuf, length, + attr->aspath = aspath_parse(peer->curr, length, CHECK_FLAG(peer->cap, PEER_CAP_AS4_RCV)); /* In case of IBGP, length will be zero. */ @@ -1230,7 +1230,7 @@ static int bgp_attr_as4_path(struct bgp_attr_parser_args *args, struct attr *const attr = args->attr; const bgp_size_t length = args->length; - *as4_path = aspath_parse(peer->ibuf, length, 1); + *as4_path = aspath_parse(peer->curr, length, 1); /* In case of IBGP, length will be zero. */ if (!*as4_path) { @@ -1271,7 +1271,7 @@ static bgp_attr_parse_ret_t bgp_attr_nexthop(struct bgp_attr_parser_args *args) logged locally (this is implemented somewhere else). The UPDATE message gets ignored in any of these cases. */ - nexthop_n = stream_get_ipv4(peer->ibuf); + nexthop_n = stream_get_ipv4(peer->curr); nexthop_h = ntohl(nexthop_n); if ((IPV4_NET0(nexthop_h) || IPV4_NET127(nexthop_h) || IPV4_CLASS_DE(nexthop_h)) @@ -1307,7 +1307,7 @@ static bgp_attr_parse_ret_t bgp_attr_med(struct bgp_attr_parser_args *args) args->total); } - attr->med = stream_getl(peer->ibuf); + attr->med = stream_getl(peer->curr); attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_MULTI_EXIT_DISC); @@ -1333,11 +1333,11 @@ bgp_attr_local_pref(struct bgp_attr_parser_args *args) external peer, then this attribute MUST be ignored by the receiving speaker. */ if (peer->sort == BGP_PEER_EBGP) { - stream_forward_getp(peer->ibuf, length); + stream_forward_getp(peer->curr, length); return BGP_ATTR_PARSE_PROCEED; } - attr->local_pref = stream_getl(peer->ibuf); + attr->local_pref = stream_getl(peer->curr); /* Set the local-pref flag. */ attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_LOCAL_PREF); @@ -1386,10 +1386,10 @@ static int bgp_attr_aggregator(struct bgp_attr_parser_args *args) } if (CHECK_FLAG(peer->cap, PEER_CAP_AS4_RCV)) - attr->aggregator_as = stream_getl(peer->ibuf); + attr->aggregator_as = stream_getl(peer->curr); else - attr->aggregator_as = stream_getw(peer->ibuf); - attr->aggregator_addr.s_addr = stream_get_ipv4(peer->ibuf); + attr->aggregator_as = stream_getw(peer->curr); + attr->aggregator_addr.s_addr = stream_get_ipv4(peer->curr); /* Set atomic aggregate flag. 
*/ attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_AGGREGATOR); @@ -1413,8 +1413,8 @@ bgp_attr_as4_aggregator(struct bgp_attr_parser_args *args, 0); } - *as4_aggregator_as = stream_getl(peer->ibuf); - as4_aggregator_addr->s_addr = stream_get_ipv4(peer->ibuf); + *as4_aggregator_as = stream_getl(peer->curr); + as4_aggregator_addr->s_addr = stream_get_ipv4(peer->curr); attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_AS4_AGGREGATOR); @@ -1540,10 +1540,10 @@ bgp_attr_community(struct bgp_attr_parser_args *args) } attr->community = - community_parse((u_int32_t *)stream_pnt(peer->ibuf), length); + community_parse((u_int32_t *)stream_pnt(peer->curr), length); /* XXX: fix community_parse to use stream API and remove this */ - stream_forward_getp(peer->ibuf, length); + stream_forward_getp(peer->curr, length); if (!attr->community) return bgp_attr_malformed(args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR, @@ -1570,7 +1570,7 @@ bgp_attr_originator_id(struct bgp_attr_parser_args *args) args->total); } - attr->originator_id.s_addr = stream_get_ipv4(peer->ibuf); + attr->originator_id.s_addr = stream_get_ipv4(peer->curr); attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_ORIGINATOR_ID); @@ -1594,10 +1594,10 @@ bgp_attr_cluster_list(struct bgp_attr_parser_args *args) } attr->cluster = - cluster_parse((struct in_addr *)stream_pnt(peer->ibuf), length); + cluster_parse((struct in_addr *)stream_pnt(peer->curr), length); /* XXX: Fix cluster_parse to use stream API and then remove this */ - stream_forward_getp(peer->ibuf, length); + stream_forward_getp(peer->curr, length); attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_CLUSTER_LIST); @@ -1778,7 +1778,7 @@ int bgp_mp_unreach_parse(struct bgp_attr_parser_args *args, struct attr *const attr = args->attr; const bgp_size_t length = args->length; - s = peer->ibuf; + s = peer->curr; #define BGP_MP_UNREACH_MIN_SIZE 3 if ((length > STREAM_READABLE(s)) || (length < BGP_MP_UNREACH_MIN_SIZE)) @@ -1832,9 +1832,9 @@ bgp_attr_large_community(struct bgp_attr_parser_args *args) } attr->lcommunity = - lcommunity_parse((u_int8_t *)stream_pnt(peer->ibuf), length); + lcommunity_parse((u_int8_t *)stream_pnt(peer->curr), length); /* XXX: fix ecommunity_parse to use stream API */ - stream_forward_getp(peer->ibuf, length); + stream_forward_getp(peer->curr, length); if (!attr->lcommunity) return bgp_attr_malformed(args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR, @@ -1861,9 +1861,9 @@ bgp_attr_ext_communities(struct bgp_attr_parser_args *args) } attr->ecommunity = - ecommunity_parse((u_int8_t *)stream_pnt(peer->ibuf), length); + ecommunity_parse((u_int8_t *)stream_pnt(peer->curr), length); /* XXX: fix ecommunity_parse to use stream API */ - stream_forward_getp(peer->ibuf, length); + stream_forward_getp(peer->curr, length); if (!attr->ecommunity) return bgp_attr_malformed(args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR, @@ -1957,7 +1957,7 @@ static int bgp_attr_encap(uint8_t type, struct peer *peer, /* IN */ + sublength); tlv->type = subtype; tlv->length = sublength; - stream_get(tlv->value, peer->ibuf, sublength); + stream_get(tlv->value, peer->curr, sublength); length -= sublength; /* attach tlv to encap chain */ @@ -2025,8 +2025,8 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args, attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_PREFIX_SID); - type = stream_getc(peer->ibuf); - length = stream_getw(peer->ibuf); + type = stream_getc(peer->curr); + length = stream_getw(peer->curr); if (type == BGP_PREFIX_SID_LABEL_INDEX) { if (length != BGP_PREFIX_SID_LABEL_INDEX_LENGTH) { @@ -2039,11 +2039,11 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args, } /* Ignore flags 
and reserved */ - stream_getc(peer->ibuf); - stream_getw(peer->ibuf); + stream_getc(peer->curr); + stream_getw(peer->curr); /* Fetch the label index and see if it is valid. */ - label_index = stream_getl(peer->ibuf); + label_index = stream_getl(peer->curr); if (label_index == BGP_INVALID_LABEL_INDEX) return bgp_attr_malformed( args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR, @@ -2074,16 +2074,16 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args, } /* Ignore reserved */ - stream_getc(peer->ibuf); - stream_getw(peer->ibuf); + stream_getc(peer->curr); + stream_getw(peer->curr); - stream_get(&ipv6_sid, peer->ibuf, 16); + stream_get(&ipv6_sid, peer->curr, 16); } /* Placeholder code for the Originator SRGB type */ else if (type == BGP_PREFIX_SID_ORIGINATOR_SRGB) { /* Ignore flags */ - stream_getw(peer->ibuf); + stream_getw(peer->curr); length -= 2; @@ -2099,8 +2099,8 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args, srgb_count = length / BGP_PREFIX_SID_ORIGINATOR_SRGB_LENGTH; for (int i = 0; i < srgb_count; i++) { - stream_get(&srgb_base, peer->ibuf, 3); - stream_get(&srgb_range, peer->ibuf, 3); + stream_get(&srgb_base, peer->curr, 3); + stream_get(&srgb_range, peer->curr, 3); } } @@ -2125,7 +2125,7 @@ static bgp_attr_parse_ret_t bgp_attr_unknown(struct bgp_attr_parser_args *args) peer->host, type, length); /* Forward read pointer of input stream. */ - stream_forward_getp(peer->ibuf, length); + stream_forward_getp(peer->curr, length); /* If any of the mandatory well-known attributes are not recognized, then the Error Subcode is set to Unrecognized Well-known diff --git a/bgpd/bgp_fsm.c b/bgpd/bgp_fsm.c index 95e2f157c..18262e6e9 100644 --- a/bgpd/bgp_fsm.c +++ b/bgpd/bgp_fsm.c @@ -126,35 +126,61 @@ static struct peer *peer_xfer_conn(struct peer *from_peer) from_peer->host, from_peer, from_peer->fd, peer, peer->fd); - peer_writes_off(peer); - BGP_READ_OFF(peer->t_read); - peer_writes_off(from_peer); - BGP_READ_OFF(from_peer->t_read); + bgp_writes_off(peer); + bgp_reads_off(peer); + bgp_writes_off(from_peer); + bgp_reads_off(from_peer); BGP_TIMER_OFF(peer->t_routeadv); + BGP_TIMER_OFF(peer->t_connect); + BGP_TIMER_OFF(peer->t_connect_check); BGP_TIMER_OFF(from_peer->t_routeadv); - - fd = peer->fd; - peer->fd = from_peer->fd; - from_peer->fd = fd; - stream_reset(peer->ibuf); + BGP_TIMER_OFF(from_peer->t_connect); + BGP_TIMER_OFF(from_peer->t_connect_check); // At this point in time, it is possible that there are packets pending // on - // from_peer->obuf. These need to be transferred to the new peer struct. - pthread_mutex_lock(&peer->obuf_mtx); - pthread_mutex_lock(&from_peer->obuf_mtx); + // various buffers. Those need to be transferred or dropped, otherwise + // we'll + // get spurious failures during session establishment. + pthread_mutex_lock(&peer->io_mtx); + pthread_mutex_lock(&from_peer->io_mtx); { - // wipe new peer's packet queue + fd = peer->fd; + peer->fd = from_peer->fd; + from_peer->fd = fd; + + stream_fifo_clean(peer->ibuf); stream_fifo_clean(peer->obuf); + stream_reset(peer->ibuf_work); - // copy each packet from old peer's queue to new peer's queue + // this should never happen, since bgp_process_packet() is the + // only task + // that sets and unsets the current packet and it runs in our + // pthread. 
+ if (peer->curr) { + zlog_err( + "[%s] Dropping pending packet on connection transfer:", + peer->host); + u_int16_t type = stream_getc_from(peer->curr, + BGP_MARKER_SIZE + 2); + bgp_dump_packet(peer, type, peer->curr); + stream_free(peer->curr); + peer->curr = NULL; + } + + // copy each packet from old peer's output queue to new peer while (from_peer->obuf->head) stream_fifo_push(peer->obuf, stream_fifo_pop(from_peer->obuf)); + + // copy each packet from old peer's input queue to new peer + while (from_peer->ibuf->head) + stream_fifo_push(peer->ibuf, + stream_fifo_pop(from_peer->ibuf)); } - pthread_mutex_unlock(&from_peer->obuf_mtx); - pthread_mutex_unlock(&peer->obuf_mtx); + pthread_mutex_unlock(&from_peer->io_mtx); + pthread_mutex_unlock(&peer->io_mtx); peer->as = from_peer->as; peer->v_holdtime = from_peer->v_holdtime; @@ -232,8 +258,8 @@ static struct peer *peer_xfer_conn(struct peer *from_peer) } } - BGP_READ_ON(peer->t_read, bgp_read, peer->fd); - peer_writes_on(peer); + bgp_reads_on(peer); + bgp_writes_on(peer); if (from_peer) peer_xfer_stats(peer, from_peer); @@ -381,6 +407,10 @@ static int bgp_connect_timer(struct thread *thread) int ret; peer = THREAD_ARG(thread); + + assert(!peer->t_write); + assert(!peer->t_read); + peer->t_connect = NULL; if (bgp_debug_neighbor_events(peer)) @@ -429,6 +459,9 @@ int bgp_routeadv_timer(struct thread *thread) peer->synctime = bgp_clock(); + thread_add_background(bm->master, bgp_generate_updgrp_packets, peer, 0, + &peer->t_generate_updgrp_packets); + /* MRAI timer will be started again when FIFO is built, no need to * do it here. */ @@ -634,6 +667,9 @@ void bgp_adjust_routeadv(struct peer *peer) BGP_TIMER_OFF(peer->t_routeadv); peer->synctime = bgp_clock(); + thread_add_background(bm->master, bgp_generate_updgrp_packets, + peer, 0, + &peer->t_generate_updgrp_packets); return; } @@ -1028,33 +1064,40 @@ int bgp_stop(struct peer *peer) bgp_bfd_deregister_peer(peer); } - /* Stop read and write threads when exists. */ - BGP_READ_OFF(peer->t_read); - peer_writes_off(peer); + /* stop keepalives */ + peer_keepalives_off(peer); + + /* Stop read and write threads. */ + bgp_writes_off(peer); + bgp_reads_off(peer); + + THREAD_OFF(peer->t_connect_check); /* Stop all timers. */ BGP_TIMER_OFF(peer->t_start); BGP_TIMER_OFF(peer->t_connect); BGP_TIMER_OFF(peer->t_holdtime); - peer_keepalives_off(peer); BGP_TIMER_OFF(peer->t_routeadv); - BGP_TIMER_OFF(peer->t_generate_updgrp_packets); - - /* Stream reset. */ - peer->packet_size = 0; /* Clear input and output buffer. */ - if (peer->ibuf) - stream_reset(peer->ibuf); - if (peer->work) - stream_reset(peer->work); - - pthread_mutex_lock(&peer->obuf_mtx); + pthread_mutex_lock(&peer->io_mtx); { + if (peer->ibuf) + stream_fifo_clean(peer->ibuf); if (peer->obuf) stream_fifo_clean(peer->obuf); + + if (peer->ibuf_work) + stream_reset(peer->ibuf_work); + if (peer->obuf_work) + stream_reset(peer->obuf_work); + + if (peer->curr) { + stream_free(peer->curr); + peer->curr = NULL; + } } - pthread_mutex_unlock(&peer->obuf_mtx); + pthread_mutex_unlock(&peer->io_mtx); /* Close of file descriptor. 
*/ if (peer->fd >= 0) { @@ -1177,10 +1220,12 @@ static int bgp_connect_check(struct thread *thread) struct peer *peer; peer = THREAD_ARG(thread); + assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_READS_ON)); + assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON)); + assert(!peer->t_read); + assert(!peer->t_write); - /* This value needs to be unset in order for bgp_read() to be scheduled - */ - BGP_READ_OFF(peer->t_read); + peer->t_connect_check = NULL; /* Check file descriptor. */ slen = sizeof(status); @@ -1218,17 +1263,16 @@ static int bgp_connect_success(struct peer *peer) return -1; } - peer_writes_on(peer); - if (bgp_getsockname(peer) < 0) { zlog_err("%s: bgp_getsockname(): failed for peer %s, fd %d", __FUNCTION__, peer->host, peer->fd); bgp_notify_send(peer, BGP_NOTIFY_FSM_ERR, 0); /* internal error */ + bgp_writes_on(peer); return -1; } - BGP_READ_ON(peer->t_read, bgp_read, peer->fd); + bgp_reads_on(peer); if (bgp_debug_neighbor_events(peer)) { char buf1[SU_ADDRSTRLEN]; @@ -1332,6 +1376,10 @@ int bgp_start(struct peer *peer) #endif } + assert(!peer->t_write); + assert(!peer->t_read); + assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON)); + assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_READS_ON)); status = bgp_connect(peer); switch (status) { @@ -1362,7 +1410,8 @@ int bgp_start(struct peer *peer) // when the socket becomes ready (or fails to connect), // bgp_connect_check // will be called. - BGP_READ_ON(peer->t_read, bgp_connect_check, peer->fd); + thread_add_read(bm->master, bgp_connect_check, peer, peer->fd, + &peer->t_connect_check); break; } return 0; diff --git a/bgpd/bgp_fsm.h b/bgpd/bgp_fsm.h index a7abfdb2f..131de40b5 100644 --- a/bgpd/bgp_fsm.h +++ b/bgpd/bgp_fsm.h @@ -23,24 +23,6 @@ #define _QUAGGA_BGP_FSM_H /* Macro for BGP read, write and timer thread. */ -#define BGP_READ_ON(T, F, V) \ - do { \ - if ((peer->status != Deleted)) \ - thread_add_read(bm->master, (F), peer, (V), &(T)); \ - } while (0) - -#define BGP_READ_OFF(T) \ - do { \ - if (T) \ - THREAD_READ_OFF(T); \ - } while (0) - -#define BGP_WRITE_OFF(T) \ - do { \ - if (T) \ - THREAD_WRITE_OFF(T); \ - } while (0) - #define BGP_TIMER_ON(T, F, V) \ do { \ if ((peer->status != Deleted)) \ diff --git a/bgpd/bgp_io.c b/bgpd/bgp_io.c index 5d14737d2..4bdafa744 100644 --- a/bgpd/bgp_io.c +++ b/bgpd/bgp_io.c @@ -1,8 +1,10 @@ /* BGP I/O. - Implements a consumer thread to flush packets destined for remote peers. + Implements packet I/O in a consumer pthread. 
+ -------------------------------------------- Copyright (C) 2017 Cumulus Networks + Quentin Young This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -31,7 +33,7 @@ #include "log.h" #include "monotime.h" #include "network.h" -#include "frr_pthread.h" +#include "pqueue.h" #include "bgpd/bgpd.h" #include "bgpd/bgp_io.h" @@ -39,172 +41,278 @@ #include "bgpd/bgp_packet.h" #include "bgpd/bgp_fsm.h" -static int bgp_write(struct peer *); -static void peer_process_writes(struct hash_backet *, void *); +/* forward declarations */ +static uint16_t bgp_write(struct peer *); +static uint16_t bgp_read(struct peer *); +static int bgp_process_writes(struct thread *); +static int bgp_process_reads(struct thread *); +static bool validate_header(struct peer *); -bool bgp_packet_writes_thread_run = false; +/* generic i/o status codes */ +#define BGP_IO_TRANS_ERR (1 << 1) // EAGAIN or similar occurred +#define BGP_IO_FATAL_ERR (1 << 2) // some kind of fatal TCP error -/* Hash table of peers to operate on, associated synchronization primitives and - * hash table callbacks. +/* bgp_read() status codes */ +#define BGP_IO_READ_HEADER (1 << 3) // when read a full packet header +#define BGP_IO_READ_FULLPACKET (1 << 4) // read a full packet + +/* Start and stop routines for I/O pthread + control variables * ------------------------------------------------------------------------ */ -static struct hash *peerhash; -/* Mutex to protect hash table */ -static pthread_mutex_t *peerhash_mtx; -/* Condition variable used to notify the write thread that there is work to do - */ -static pthread_cond_t *write_cond; +bool bgp_packet_write_thread_run = false; +pthread_mutex_t *work_mtx; -static unsigned int peer_hash_key_make(void *p) -{ - struct peer *peer = p; - return sockunion_hash(&peer->su); -} +static struct list *read_cancel; +static struct list *write_cancel; -static int peer_hash_cmp(const void *p1, const void *p2) +void bgp_io_init() { - const struct peer *peer1 = p1; - const struct peer *peer2 = p2; - return (sockunion_same(&peer1->su, &peer2->su) - && CHECK_FLAG(peer1->flags, PEER_FLAG_CONFIG_NODE) - == CHECK_FLAG(peer2->flags, PEER_FLAG_CONFIG_NODE)); + work_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t)); + pthread_mutex_init(work_mtx, NULL); + + read_cancel = list_new(); + write_cancel = list_new(); } -/* ------------------------------------------------------------------------ */ -void peer_writes_init(void) +void *bgp_io_start(void *arg) { - peerhash_mtx = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_mutex_t)); - write_cond = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_cond_t)); - - // initialize mutex - pthread_mutex_init(peerhash_mtx, NULL); - - // use monotonic clock with condition variable - pthread_condattr_t attrs; - pthread_condattr_init(&attrs); - pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC); - pthread_cond_init(write_cond, &attrs); - pthread_condattr_destroy(&attrs); + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + + // we definitely don't want to handle signals + fpt->master->handle_signals = false; + + bgp_packet_write_thread_run = true; + struct thread task; + + while (bgp_packet_write_thread_run) { + if (thread_fetch(fpt->master, &task)) { + pthread_mutex_lock(work_mtx); + { + bool cancel = false; + struct peer *peer = THREAD_ARG(&task); + if ((task.func == bgp_process_reads + && listnode_lookup(read_cancel, peer)) + || (task.func == bgp_process_writes + && listnode_lookup(write_cancel, peer))) + cancel = 
true; + + list_delete_all_node(write_cancel); + list_delete_all_node(read_cancel); + + if (!cancel) + thread_call(&task); + } + pthread_mutex_unlock(work_mtx); + } + } - // initialize peerhash - peerhash = hash_create_size(2048, peer_hash_key_make, peer_hash_cmp); + return NULL; } -static void peer_writes_finish(void *arg) +int bgp_io_stop(void **result, struct frr_pthread *fpt) { - bgp_packet_writes_thread_run = false; - - if (peerhash) - hash_free(peerhash); - - peerhash = NULL; + fpt->master->spin = false; + bgp_packet_write_thread_run = false; + pthread_kill(fpt->thread, SIGINT); + pthread_join(fpt->thread, result); - pthread_mutex_unlock(peerhash_mtx); - pthread_mutex_destroy(peerhash_mtx); - pthread_cond_destroy(write_cond); + pthread_mutex_unlock(work_mtx); + pthread_mutex_destroy(work_mtx); - XFREE(MTYPE_PTHREAD, peerhash_mtx); - XFREE(MTYPE_PTHREAD, write_cond); + list_delete(read_cancel); + list_delete(write_cancel); + XFREE(MTYPE_TMP, work_mtx); + return 0; } +/* ------------------------------------------------------------------------ */ -void *peer_writes_start(void *arg) +void bgp_writes_on(struct peer *peer) { - struct timeval currtime = {0, 0}; - struct timeval sleeptime = {0, 500}; - struct timespec next_update = {0, 0}; - - pthread_mutex_lock(peerhash_mtx); - - // register cleanup handler - pthread_cleanup_push(&peer_writes_finish, NULL); - - bgp_packet_writes_thread_run = true; + assert(peer->status != Deleted); + assert(peer->obuf); + assert(peer->ibuf); + assert(peer->ibuf_work); + assert(!peer->t_connect_check); + assert(peer->fd); - while (bgp_packet_writes_thread_run) { - // wait around until next update time - if (peerhash->count > 0) - pthread_cond_timedwait(write_cond, peerhash_mtx, - &next_update); - else // wait around until we have some peers - while (peerhash->count == 0 - && bgp_packet_writes_thread_run) - pthread_cond_wait(write_cond, peerhash_mtx); + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - hash_iterate(peerhash, peer_process_writes, NULL); - - monotime(&currtime); - timeradd(&currtime, &sleeptime, &currtime); - TIMEVAL_TO_TIMESPEC(&currtime, &next_update); + pthread_mutex_lock(work_mtx); + { + listnode_delete(write_cancel, peer); + thread_add_write(fpt->master, bgp_process_writes, peer, + peer->fd, &peer->t_write); + SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); } + pthread_mutex_unlock(work_mtx); +} - // clean up - pthread_cleanup_pop(1); +void bgp_writes_off(struct peer *peer) +{ + pthread_mutex_lock(work_mtx); + { + THREAD_OFF(peer->t_write); + THREAD_OFF(peer->t_generate_updgrp_packets); + listnode_add(write_cancel, peer); - return NULL; + // peer access by us after this point will result in pain + UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); + } + pthread_mutex_unlock(work_mtx); + /* upon return, i/o thread must not access the peer */ } -int peer_writes_stop(void **result) +void bgp_reads_on(struct peer *peer) { - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_WRITE); - bgp_packet_writes_thread_run = false; - peer_writes_wake(); - pthread_join(fpt->thread, result); - return 0; + assert(peer->status != Deleted); + assert(peer->ibuf); + assert(peer->fd); + assert(peer->ibuf_work); + assert(stream_get_endp(peer->ibuf_work) == 0); + assert(peer->obuf); + assert(!peer->t_connect_check); + assert(peer->fd); + + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + + pthread_mutex_lock(work_mtx); + { + listnode_delete(read_cancel, peer); + thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd, + &peer->t_read); 
+ SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON); + } + pthread_mutex_unlock(work_mtx); } -void peer_writes_on(struct peer *peer) +void bgp_reads_off(struct peer *peer) { - if (peer->status == Deleted) - return; - - pthread_mutex_lock(peerhash_mtx); + pthread_mutex_lock(work_mtx); { - if (!hash_lookup(peerhash, peer)) { - hash_get(peerhash, peer, hash_alloc_intern); - peer_lock(peer); - } + THREAD_OFF(peer->t_read); + THREAD_OFF(peer->t_process_packet); + listnode_add(read_cancel, peer); - SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); + // peer access by us after this point will result in pain + UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON); } - pthread_mutex_unlock(peerhash_mtx); - peer_writes_wake(); + pthread_mutex_unlock(work_mtx); } -void peer_writes_off(struct peer *peer) +/** + * Called from PTHREAD_IO when select() or poll() determines that the file + * descriptor is ready to be written to. + */ +static int bgp_process_writes(struct thread *thread) { - pthread_mutex_lock(peerhash_mtx); + static struct peer *peer; + peer = THREAD_ARG(thread); + uint16_t status; + + if (peer->fd < 0) + return -1; + + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + + bool reschedule; + pthread_mutex_lock(&peer->io_mtx); { - if (hash_release(peerhash, peer)) { - peer_unlock(peer); - fprintf(stderr, "Releasing %p\n", peer); - } + status = bgp_write(peer); + reschedule = (stream_fifo_head(peer->obuf) != NULL); + } + pthread_mutex_unlock(&peer->io_mtx); - UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); + if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */ } - pthread_mutex_unlock(peerhash_mtx); -} -void peer_writes_wake() -{ - pthread_cond_signal(write_cond); + if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) + reschedule = 0; // problem + + if (reschedule) { + thread_add_write(fpt->master, bgp_process_writes, peer, + peer->fd, &peer->t_write); + thread_add_background(bm->master, bgp_generate_updgrp_packets, + peer, 0, + &peer->t_generate_updgrp_packets); + } + + return 0; } /** - * Callback for hash_iterate. Takes a hash bucket, unwraps it into a peer and - * synchronously calls bgp_write() on the peer. + * Called from PTHREAD_IO when select() or poll() determines that the file + * descriptor is ready to be read from. 
*/ -static void peer_process_writes(struct hash_backet *hb, void *arg) +static int bgp_process_reads(struct thread *thread) { static struct peer *peer; - peer = hb->data; - pthread_mutex_lock(&peer->obuf_mtx); + peer = THREAD_ARG(thread); + uint16_t status; + + if (peer->fd < 0) + return -1; + + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + + bool reschedule = true; + + // execute read + pthread_mutex_lock(&peer->io_mtx); { - bgp_write(peer); + status = bgp_read(peer); + } + pthread_mutex_unlock(&peer->io_mtx); + + // check results of read + bool header_valid = true; + + if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */ } - pthread_mutex_unlock(&peer->obuf_mtx); - // dispatch job on main thread - BGP_TIMER_ON(peer->t_generate_updgrp_packets, - bgp_generate_updgrp_packets, 100); + if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) + reschedule = false; // problem + + if (CHECK_FLAG(status, BGP_IO_READ_HEADER)) { + header_valid = validate_header(peer); + if (!header_valid) { + bgp_size_t packetsize = + MIN((int)stream_get_endp(peer->ibuf_work), + BGP_MAX_PACKET_SIZE); + memcpy(peer->last_reset_cause, peer->ibuf_work->data, + packetsize); + peer->last_reset_cause_size = packetsize; + // We're tearing the session down, no point in + // rescheduling. + // Additionally, bgp_read() will use the TLV if it's + // present to + // determine how much to read; if this is corrupt, we'll + // crash the + // program. + reschedule = false; + } + } + + // if we read a full packet, push it onto peer->ibuf, reset our WiP + // buffer + // and schedule a job to process it on the main thread + if (header_valid && CHECK_FLAG(status, BGP_IO_READ_FULLPACKET)) { + pthread_mutex_lock(&peer->io_mtx); + { + stream_fifo_push(peer->ibuf, + stream_dup(peer->ibuf_work)); + } + pthread_mutex_unlock(&peer->io_mtx); + stream_reset(peer->ibuf_work); + assert(stream_get_endp(peer->ibuf_work) == 0); + + thread_add_background(bm->master, bgp_process_packet, peer, 0, + &peer->t_process_packet); + } + + if (reschedule) + thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd, + &peer->t_read); + + return 0; } /** @@ -212,14 +320,14 @@ static void peer_process_writes(struct hash_backet *hb, void *arg) * * This function pops packets off of peer->obuf and writes them to peer->fd. * The amount of packets written is equal to the minimum of peer->wpkt_quanta - * and the number of packets on the output buffer. + * and the number of packets on the output buffer, unless an error occurs. * * If write() returns an error, the appropriate FSM event is generated. * * The return value is equal to the number of packets written * (which may be zero). */ -static int bgp_write(struct peer *peer) +static uint16_t bgp_write(struct peer *peer) { u_char type; struct stream *s; @@ -227,10 +335,8 @@ static int bgp_write(struct peer *peer) int update_last_write = 0; unsigned int count = 0; unsigned int oc = 0; + uint16_t status = 0; - /* Write packets. 
The number of packets written is the value of - * bgp->wpkt_quanta or the size of the output buffer, whichever is - * smaller.*/ while (count < peer->bgp->wpkt_quanta && (s = stream_fifo_head(peer->obuf))) { int writenum; @@ -239,8 +345,12 @@ static int bgp_write(struct peer *peer) num = write(peer->fd, STREAM_PNT(s), writenum); if (num < 0) { - if (!ERRNO_IO_RETRY(errno)) + if (!ERRNO_IO_RETRY(errno)) { BGP_EVENT_ADD(peer, TCP_fatal_error); + SET_FLAG(status, BGP_IO_FATAL_ERR); + } else { + SET_FLAG(status, BGP_IO_TRANS_ERR); + } goto done; } else if (num != writenum) // incomplete write @@ -288,7 +398,7 @@ static int bgp_write(struct peer *peer) } count++; - /* OK we send packet so delete it. */ + stream_free(stream_fifo_pop(peer->obuf)); update_last_write = 1; } @@ -303,5 +413,170 @@ done : { peer->last_write = bgp_clock(); } - return count; + return status; +} + +/** + * Reads <= 1 packet worth of data from peer->fd into peer->ibuf_work. + * + * @return whether a full packet was read + */ +static uint16_t bgp_read(struct peer *peer) +{ + int readsize; // how many bytes we want to read + int nbytes; // how many bytes we actually read + bool have_header = false; + uint16_t status = 0; + + if (stream_get_endp(peer->ibuf_work) < BGP_HEADER_SIZE) + readsize = BGP_HEADER_SIZE - stream_get_endp(peer->ibuf_work); + else { + // retrieve packet length from tlv and compute # bytes we still + // need + u_int16_t mlen = + stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE); + readsize = mlen - stream_get_endp(peer->ibuf_work); + have_header = true; + } + + nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize); + + if (nbytes <= 0) // handle errors + { + switch (nbytes) { + case -1: // fatal error; tear down the session + zlog_err("%s [Error] bgp_read_packet error: %s", + peer->host, safe_strerror(errno)); + + if (peer->status == Established) { + if (CHECK_FLAG(peer->sflags, + PEER_STATUS_NSF_MODE)) { + peer->last_reset = + PEER_DOWN_NSF_CLOSE_SESSION; + SET_FLAG(peer->sflags, + PEER_STATUS_NSF_WAIT); + } else + peer->last_reset = + PEER_DOWN_CLOSE_SESSION; + } + + BGP_EVENT_ADD(peer, TCP_fatal_error); + SET_FLAG(status, BGP_IO_FATAL_ERR); + break; + + case 0: // TCP session closed + if (bgp_debug_neighbor_events(peer)) + zlog_debug( + "%s [Event] BGP connection closed fd %d", + peer->host, peer->fd); + + if (peer->status == Established) { + if (CHECK_FLAG(peer->sflags, + PEER_STATUS_NSF_MODE)) { + peer->last_reset = + PEER_DOWN_NSF_CLOSE_SESSION; + SET_FLAG(peer->sflags, + PEER_STATUS_NSF_WAIT); + } else + peer->last_reset = + PEER_DOWN_CLOSE_SESSION; + } + + BGP_EVENT_ADD(peer, TCP_connection_closed); + SET_FLAG(status, BGP_IO_FATAL_ERR); + break; + + case -2: // temporary error; come back later + SET_FLAG(status, BGP_IO_TRANS_ERR); + break; + default: + break; + } + + return status; + } + + // If we didn't have the header before read(), and now we do, set the + // appropriate flag. The caller must validate the header for us. + if (!have_header + && stream_get_endp(peer->ibuf_work) >= BGP_HEADER_SIZE) { + SET_FLAG(status, BGP_IO_READ_HEADER); + have_header = true; + } + // If we read the # of bytes specified in the tlv, we have read a full + // packet. + // + // Note that the header may not have been validated here. This flag + // means + // ONLY that we read the # of bytes specified in the header; if the + // header is + // not valid, the packet MUST NOT be processed further. 
+ if (have_header && (stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE) + == stream_get_endp(peer->ibuf_work))) + SET_FLAG(status, BGP_IO_READ_FULLPACKET); + + return status; +} + +/* + * Called after we have read a BGP packet header. Validates marker, message + * type and packet length. If any of these aren't correct, sends a notify. + */ +static bool validate_header(struct peer *peer) +{ + u_int16_t size, type; + + /* Marker check */ + for (int i = 0; i < BGP_MARKER_SIZE; i++) + if (peer->ibuf_work->data[i] != 0xff) { + bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR, + BGP_NOTIFY_HEADER_NOT_SYNC); + return false; + } + + /* Get size and type. */ + size = stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE); + type = stream_getc_from(peer->ibuf_work, BGP_MARKER_SIZE + 2); + + /* BGP type check. */ + if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE + && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE + && type != BGP_MSG_ROUTE_REFRESH_NEW + && type != BGP_MSG_ROUTE_REFRESH_OLD + && type != BGP_MSG_CAPABILITY) { + if (bgp_debug_neighbor_events(peer)) + zlog_debug("%s unknown message type 0x%02x", peer->host, + type); + + bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR, + BGP_NOTIFY_HEADER_BAD_MESTYPE, + (u_char *)&type, 1); + return false; + } + + /* Mimimum packet length check. */ + if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE) + || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE) + || (type == BGP_MSG_UPDATE && size < BGP_MSG_UPDATE_MIN_SIZE) + || (type == BGP_MSG_NOTIFY && size < BGP_MSG_NOTIFY_MIN_SIZE) + || (type == BGP_MSG_KEEPALIVE && size != BGP_MSG_KEEPALIVE_MIN_SIZE) + || (type == BGP_MSG_ROUTE_REFRESH_NEW + && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE) + || (type == BGP_MSG_ROUTE_REFRESH_OLD + && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE) + || (type == BGP_MSG_CAPABILITY + && size < BGP_MSG_CAPABILITY_MIN_SIZE)) { + if (bgp_debug_neighbor_events(peer)) + zlog_debug("%s bad message length - %d for %s", + peer->host, size, + type == 128 ? "ROUTE-REFRESH" + : bgp_type_str[(int)type]); + + bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR, + BGP_NOTIFY_HEADER_BAD_MESLEN, + (u_char *)&size, 2); + return false; + } + + return true; } diff --git a/bgpd/bgp_io.h b/bgpd/bgp_io.h index 7b81b8ee3..fd5f7659d 100644 --- a/bgpd/bgp_io.h +++ b/bgpd/bgp_io.h @@ -23,13 +23,13 @@ #ifndef _FRR_BGP_IO_H #define _FRR_BGP_IO_H +#include "frr_pthread.h" #include "bgpd/bgpd.h" /** * Control variable for write thread. * - * Setting this variable to false and calling peer_writes_wake() will - * eventually result in thread termination. + * Setting this variable to false will eventually result in thread termination. */ extern bool bgp_packet_writes_thread_run; @@ -37,35 +37,32 @@ extern bool bgp_packet_writes_thread_run; * Initializes data structures and flags for the write thread. * * This function should be called from the main thread before - * peer_writes_start() is invoked. + * bgp_writes_start() is invoked. */ -extern void peer_writes_init(void); +extern void bgp_io_init(void); /** * Start function for write thread. * - * This function should be passed to pthread_create() during BGP startup. + * @param arg - unused */ -extern void *peer_writes_start(void *arg); +extern void *bgp_io_start(void *arg); /** * Start function for write thread. * * Uninitializes all resources and stops the thread. 
* - * @param result -- where to store data result, unused + * @param result - where to store data result, unused */ -extern int peer_writes_stop(void **result); +extern int bgp_io_stop(void **result, struct frr_pthread *fpt); /** - * Registers a peer with the write thread. - * - * This function adds the peer to an internal data structure, which must be - * locked for write access. This call will block until the structure can be - * locked. + * Turns on packet writing for a peer. * * After this function is called, any packets placed on peer->obuf will be - * written to peer->fd at regular intervals. + * written to peer->fd at regular intervals. Additionally it becomes unsafe to + * use peer->fd with select() or poll(). * * This function increments the peer reference counter with peer_lock(). * @@ -73,32 +70,58 @@ extern int peer_writes_stop(void **result); * * @param peer - peer to register */ -extern void peer_writes_on(struct peer *peer); +extern void bgp_writes_on(struct peer *peer); /** - * Deregisters a peer with the write thread. - * - * This function removes the peer from an internal data structure, which must - * be locked for write access. This call will block until the structure can be - * locked. + * Turns off packet writing for a peer. * * After this function is called, any packets placed on peer->obuf will not be - * written to peer->fd. + * written to peer->fd. After this function returns it is safe to use peer->fd + * with select() or poll(). * - * This function decrements the peer reference counter with peer_unlock(). + * If the flush = true, a last-ditch effort will be made to flush any remaining + * packets to peer->fd. Upon encountering any error whatsoever, the attempt + * will abort. If the caller wishes to know whether the flush succeeded they + * may check peer->obuf->count against zero. * * If the peer is not registered, nothing happens. * * @param peer - peer to deregister + * @param flush - as described + */ +extern void bgp_writes_off(struct peer *peer); + +/** + * Turns on packet reading for a peer. + * + * After this function is called, any packets received on peer->fd will be read + * and copied into the FIFO queue peer->ibuf. Additionally it becomes unsafe to + * use peer->fd with select() or poll(). + * + * When a full packet is read, bgp_process_packet() will be scheduled on the + * main thread. + * + * This function increments the peer reference counter with peer_lock(). + * + * If the peer is already registered, nothing happens. + * + * @param peer - peer to register */ -extern void peer_writes_off(struct peer *peer); +extern void bgp_reads_on(struct peer *peer); /** - * Notifies the write thread that there is work to be done. + * Turns off packet reading for a peer. + * + * After this function is called, any packets received on peer->fd will not be + * read. After this function returns it is safe to use peer->fd with select() + * or poll(). * - * This function has the effect of waking the write thread if it is sleeping. - * If the thread is not sleeping, this signal will be ignored. + * This function decrements the peer reference counter with peer_unlock(). + * + * If the peer is not registered, nothing happens. + * + * @param peer - peer to deregister */ -extern void peer_writes_wake(void); +extern void bgp_reads_off(struct peer *peer); #endif /* _FRR_BGP_IO_H */ diff --git a/bgpd/bgp_keepalives.c b/bgpd/bgp_keepalives.c index 23f3f5173..c7383ae77 100644 --- a/bgpd/bgp_keepalives.c +++ b/bgpd/bgp_keepalives.c @@ -1,26 +1,26 @@ /* - * BGP Keepalives. 
- * - * Implements a producer thread to generate BGP keepalives for peers. - * ---------------------------------------- - * Copyright (C) 2017 Cumulus Networks, Inc. - * Quentin Young - * - * This file is part of FRRouting. - * - * FRRouting is free software; you can redistribute it and/or modify it under - * the terms of the GNU General Public License as published by the Free - * Software Foundation; either version 2, or (at your option) any later - * version. - * - * FRRouting is distributed in the hope that it will be useful, but WITHOUT ANY - * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS - * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more - * details. - * - * You should have received a copy of the GNU General Public License along with - * FRRouting; see the file COPYING. If not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + BGP Keepalives. + + Implements a producer thread to generate BGP keepalives for peers. + ---------------------------------------- + Copyright (C) 2017 Cumulus Networks, Inc. + Quentin Young + + This file is part of FRRouting. + + FRRouting is free software; you can redistribute it and/or modify it under + the terms of the GNU General Public License as published by the Free + Software Foundation; either version 2, or (at your option) any later + version. + + FRRouting is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + details. + + You should have received a copy of the GNU General Public License along with + FRRouting; see the file COPYING. If not, write to the Free Software + Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ #include <zebra.h> #include <signal.h> @@ -73,7 +73,8 @@ static void pkat_del(void *pkat) /* - * Walks the list of peers, sending keepalives to those that are due for them. + * Callback for hash_iterate. Determines if a peer needs a keepalive and if so, + * generates and sends it. 
* * For any given peer, if the elapsed time since its last keepalive exceeds its * configured keepalive timer, a keepalive is sent to the peer and its @@ -143,8 +144,8 @@ static unsigned int peer_hash_key(void *arg) void peer_keepalives_init() { - peerhash_mtx = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_mutex_t)); - peerhash_cond = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_cond_t)); + peerhash_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t)); + peerhash_cond = XCALLOC(MTYPE_TMP, sizeof(pthread_cond_t)); // initialize mutex pthread_mutex_init(peerhash_mtx, NULL); @@ -175,8 +176,8 @@ static void peer_keepalives_finish(void *arg) pthread_mutex_destroy(peerhash_mtx); pthread_cond_destroy(peerhash_cond); - XFREE(MTYPE_PTHREAD, peerhash_mtx); - XFREE(MTYPE_PTHREAD, peerhash_cond); + XFREE(MTYPE_TMP, peerhash_mtx); + XFREE(MTYPE_TMP, peerhash_cond); } /** @@ -275,9 +276,8 @@ void peer_keepalives_wake() pthread_mutex_unlock(peerhash_mtx); } -int peer_keepalives_stop(void **result) +int peer_keepalives_stop(void **result, struct frr_pthread *fpt) { - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES); bgp_keepalives_thread_run = false; peer_keepalives_wake(); pthread_join(fpt->thread, result); diff --git a/bgpd/bgp_keepalives.h b/bgpd/bgp_keepalives.h index d74b69e90..602b0da77 100644 --- a/bgpd/bgp_keepalives.h +++ b/bgpd/bgp_keepalives.h @@ -24,6 +24,7 @@ #ifndef _BGP_KEEPALIVES_H_ #define _BGP_KEEPALIVES_H_ +#include "frr_pthread.h" #include "bgpd.h" /* Thread control flag. @@ -88,6 +89,6 @@ extern void *peer_keepalives_start(void *arg); extern void peer_keepalives_wake(void); /* stop function */ -int peer_keepalives_stop(void **result); +int peer_keepalives_stop(void **result, struct frr_pthread *fpt); #endif /* _BGP_KEEPALIVES_H */ diff --git a/bgpd/bgp_packet.c b/bgpd/bgp_packet.c index a89d72cc6..c21e3cbe3 100644 --- a/bgpd/bgp_packet.c +++ b/bgpd/bgp_packet.c @@ -59,16 +59,6 @@ #include "bgpd/bgp_label.h" #include "bgpd/bgp_io.h" -/* Linked list of active peers */ -static pthread_mutex_t *plist_mtx; -static pthread_cond_t *write_cond; -static struct list *plist; - -/* periodically scheduled thread to generate update-group updates */ -static struct thread *t_generate_updgrp_packets; - -bool bgp_packet_writes_thread_run = false; - /* Set up BGP packet marker and packet type. 
*/ int bgp_packet_set_marker(struct stream *s, u_char type) { @@ -107,11 +97,9 @@ int bgp_packet_set_size(struct stream *s) */ static void bgp_packet_add(struct peer *peer, struct stream *s) { - pthread_mutex_lock(&peer->obuf_mtx); + pthread_mutex_lock(&peer->io_mtx); stream_fifo_push(peer->obuf, s); - pthread_mutex_unlock(&peer->obuf_mtx); - - peer_writes_wake(); + pthread_mutex_unlock(&peer->io_mtx); } static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi, @@ -165,7 +153,6 @@ static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi, int bgp_generate_updgrp_packets(struct thread *thread) { struct peer *peer = THREAD_ARG(thread); - peer->t_generate_updgrp_packets = NULL; struct stream *s; struct peer_af *paf; @@ -237,10 +224,13 @@ int bgp_generate_updgrp_packets(struct thread *thread) if ((s = bgp_update_packet_eor( peer, afi, - safi))) + safi))) { bgp_packet_add( peer, s); + bgp_writes_on( + peer); + } } } continue; @@ -252,6 +242,7 @@ int bgp_generate_updgrp_packets(struct thread *thread) * attributes from peer and advance peer */ s = bpacket_reformat_for_peer(next_pkt, paf); bgp_packet_add(peer, s); + bgp_writes_on(peer); bpacket_queue_advance_peer(paf); } } while (s); @@ -282,6 +273,8 @@ void bgp_keepalive_send(struct peer *peer) /* Add packet to the peer. */ bgp_packet_add(peer, s); + + bgp_writes_on(peer); } /* @@ -335,6 +328,67 @@ void bgp_open_send(struct peer *peer) /* Add packet to the peer. */ bgp_packet_add(peer, s); + + bgp_writes_on(peer); +} + +/* This is only for sending NOTIFICATION message to neighbor. */ +static int bgp_write_notify(struct peer *peer) +{ + int ret, val; + u_char type; + struct stream *s; + + pthread_mutex_lock(&peer->io_mtx); + { + /* There should be at least one packet. */ + s = stream_fifo_pop(peer->obuf); + if (!s) + return 0; + assert(stream_get_endp(s) >= BGP_HEADER_SIZE); + } + pthread_mutex_unlock(&peer->io_mtx); + + /* Stop collecting data within the socket */ + sockopt_cork(peer->fd, 0); + + /* socket is in nonblocking mode, if we can't deliver the NOTIFY, well, + * we only care about getting a clean shutdown at this point. */ + ret = write(peer->fd, STREAM_DATA(s), stream_get_endp(s)); + + /* only connection reset/close gets counted as TCP_fatal_error, failure + * to write the entire NOTIFY doesn't get different FSM treatment */ + if (ret <= 0) { + BGP_EVENT_ADD(peer, TCP_fatal_error); + return 0; + } + + /* Disable Nagle, make NOTIFY packet go out right away */ + val = 1; + (void)setsockopt(peer->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val, + sizeof(val)); + + /* Retrieve BGP packet type. */ + stream_set_getp(s, BGP_MARKER_SIZE + 2); + type = stream_getc(s); + + assert(type == BGP_MSG_NOTIFY); + + /* Type should be notify. */ + peer->notify_out++; + + /* Double start timer. */ + peer->v_start *= 2; + + /* Overflow check. */ + if (peer->v_start >= (60 * 2)) + peer->v_start = (60 * 2); + + /* Handle Graceful Restart case where the state changes to + Connect instead of Idle */ + BGP_EVENT_ADD(peer, BGP_Stop); + + return 0; } /* @@ -372,10 +426,12 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code, /* Set BGP packet length. */ length = bgp_packet_set_size(s); - /* Add packet to the peer. 
*/ - pthread_mutex_lock(&peer->obuf_mtx); - stream_fifo_clean(peer->obuf); - pthread_mutex_unlock(&peer->obuf_mtx); + /* wipe output buffer */ + pthread_mutex_lock(&peer->io_mtx); + { + stream_fifo_clean(peer->obuf); + } + pthread_mutex_unlock(&peer->io_mtx); /* For debug */ { @@ -428,8 +484,8 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code, /* Add packet to peer's output queue */ bgp_packet_add(peer, s); - /* Wake up the write thread to get the notify out ASAP */ - peer_writes_wake(); + + bgp_write_notify(peer); } /* @@ -544,6 +600,8 @@ void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi, /* Add packet to the peer. */ bgp_packet_add(peer, s); + + bgp_writes_on(peer); } /* @@ -593,6 +651,8 @@ void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi, /* Add packet to the peer. */ bgp_packet_add(peer, s); + + bgp_writes_on(peer); } /* RFC1771 6.8 Connection collision detection. */ @@ -696,13 +756,13 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size) u_int16_t *holdtime_ptr; /* Parse open packet. */ - version = stream_getc(peer->ibuf); - memcpy(notify_data_remote_as, stream_pnt(peer->ibuf), 2); - remote_as = stream_getw(peer->ibuf); - holdtime_ptr = (u_int16_t *)stream_pnt(peer->ibuf); - holdtime = stream_getw(peer->ibuf); - memcpy(notify_data_remote_id, stream_pnt(peer->ibuf), 4); - remote_id.s_addr = stream_get_ipv4(peer->ibuf); + version = stream_getc(peer->curr); + memcpy(notify_data_remote_as, stream_pnt(peer->curr), 2); + remote_as = stream_getw(peer->curr); + holdtime_ptr = (u_int16_t *)stream_pnt(peer->curr); + holdtime = stream_getw(peer->curr); + memcpy(notify_data_remote_id, stream_pnt(peer->curr), 4); + remote_id.s_addr = stream_get_ipv4(peer->curr); /* Receive OPEN message log */ if (bgp_debug_neighbor_events(peer)) @@ -714,11 +774,11 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size) /* BEGIN to read the capability here, but dont do it yet */ mp_capability = 0; - optlen = stream_getc(peer->ibuf); + optlen = stream_getc(peer->curr); if (optlen != 0) { /* If not enough bytes, it is an error. 
*/ - if (STREAM_READABLE(peer->ibuf) < optlen) { + if (STREAM_READABLE(peer->curr) < optlen) { bgp_notify_send(peer, BGP_NOTIFY_OPEN_ERR, BGP_NOTIFY_OPEN_MALFORMED_ATTR); return -1; @@ -990,10 +1050,6 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size) return (ret); } - peer->packet_size = 0; - if (peer->ibuf) - stream_reset(peer->ibuf); - return 0; } @@ -1177,7 +1233,7 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size) memset(peer->rcvd_attr_str, 0, BUFSIZ); peer->rcvd_attr_printed = 0; - s = peer->ibuf; + s = peer->curr; end = stream_pnt(s) + size; /* RFC1771 6.3 If the Unfeasible Routes Length or Total Attribute @@ -1424,8 +1480,8 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size) peer->notify.length = 0; } - bgp_notify.code = stream_getc(peer->ibuf); - bgp_notify.subcode = stream_getc(peer->ibuf); + bgp_notify.code = stream_getc(peer->curr); + bgp_notify.subcode = stream_getc(peer->curr); bgp_notify.length = size - 2; bgp_notify.data = NULL; @@ -1436,7 +1492,7 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size) if (bgp_notify.length) { peer->notify.length = size - 2; peer->notify.data = XMALLOC(MTYPE_TMP, size - 2); - memcpy(peer->notify.data, stream_pnt(peer->ibuf), size - 2); + memcpy(peer->notify.data, stream_pnt(peer->curr), size - 2); } /* For debug */ @@ -1451,12 +1507,12 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size) for (i = 0; i < bgp_notify.length; i++) if (first) { sprintf(c, " %02x", - stream_getc(peer->ibuf)); + stream_getc(peer->curr)); strcat(bgp_notify.data, c); } else { first = 1; sprintf(c, "%02x", - stream_getc(peer->ibuf)); + stream_getc(peer->curr)); strcpy(bgp_notify.data, c); } bgp_notify.raw_data = (u_char *)peer->notify.data; @@ -1526,7 +1582,7 @@ static void bgp_route_refresh_receive(struct peer *peer, bgp_size_t size) return; } - s = peer->ibuf; + s = peer->curr; /* Parse packet. */ pkt_afi = stream_getw(s); @@ -1874,7 +1930,7 @@ int bgp_capability_receive(struct peer *peer, bgp_size_t size) u_char *pnt; /* Fetch pointer. */ - pnt = stream_pnt(peer->ibuf); + pnt = stream_pnt(peer->curr); if (bgp_debug_neighbor_events(peer)) zlog_debug("%s rcv CAPABILITY", peer->host); @@ -1902,188 +1958,50 @@ int bgp_capability_receive(struct peer *peer, bgp_size_t size) return bgp_capability_msg_parse(peer, pnt, size); } -/* BGP read utility function. */ -static int bgp_read_packet(struct peer *peer) +/* Starting point of packet process function. */ +int bgp_process_packet(struct thread *thread) { - int nbytes; - int readsize; - - readsize = peer->packet_size - stream_get_endp(peer->ibuf); + /* Yes first of all get peer pointer. */ + struct peer *peer; + peer = THREAD_ARG(thread); - /* If size is zero then return. */ - if (!readsize) + /* Guard against scheduled events that occur after peer deletion. */ + if (peer->status == Deleted) return 0; - /* Read packet from fd. */ - nbytes = stream_read_try(peer->ibuf, peer->fd, readsize); - - /* If read byte is smaller than zero then error occured. 
*/ - if (nbytes < 0) { - /* Transient error should retry */ - if (nbytes == -2) - return -1; - - zlog_err("%s [Error] bgp_read_packet error: %s", peer->host, - safe_strerror(errno)); - - if (peer->status == Established) { - if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) { - peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION; - SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT); - } else - peer->last_reset = PEER_DOWN_CLOSE_SESSION; - } - - BGP_EVENT_ADD(peer, TCP_fatal_error); - return -1; - } - - /* When read byte is zero : clear bgp peer and return */ - if (nbytes == 0) { - if (bgp_debug_neighbor_events(peer)) - zlog_debug("%s [Event] BGP connection closed fd %d", - peer->host, peer->fd); - - if (peer->status == Established) { - if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) { - peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION; - SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT); - } else - peer->last_reset = PEER_DOWN_CLOSE_SESSION; - } - - BGP_EVENT_ADD(peer, TCP_connection_closed); - return -1; - } - - /* We read partial packet. */ - if (stream_get_endp(peer->ibuf) != peer->packet_size) - return -1; - - return 0; -} - -/* Marker check. */ -static int bgp_marker_all_one(struct stream *s, int length) -{ - int i; - - for (i = 0; i < length; i++) - if (s->data[i] != 0xff) - return 0; - - return 1; -} - -/* Starting point of packet process function. */ -int bgp_read(struct thread *thread) -{ - int ret; u_char type = 0; - struct peer *peer; bgp_size_t size; char notify_data_length[2]; u_int32_t notify_out; - /* Yes first of all get peer pointer. */ - peer = THREAD_ARG(thread); - peer->t_read = NULL; - /* Note notify_out so we can check later to see if we sent another one */ notify_out = peer->notify_out; - if (peer->fd < 0) { - zlog_err("bgp_read(): peer's fd is negative value %d", - peer->fd); - return -1; + pthread_mutex_lock(&peer->io_mtx); + { + peer->curr = stream_fifo_pop(peer->ibuf); } + pthread_mutex_unlock(&peer->io_mtx); - BGP_READ_ON(peer->t_read, bgp_read, peer->fd); - - /* Read packet header to determine type of the packet */ - if (peer->packet_size == 0) - peer->packet_size = BGP_HEADER_SIZE; - - if (stream_get_endp(peer->ibuf) < BGP_HEADER_SIZE) { - ret = bgp_read_packet(peer); - - /* Header read error or partial read packet. */ - if (ret < 0) - goto done; - - /* Get size and type. */ - stream_forward_getp(peer->ibuf, BGP_MARKER_SIZE); - memcpy(notify_data_length, stream_pnt(peer->ibuf), 2); - size = stream_getw(peer->ibuf); - type = stream_getc(peer->ibuf); - - /* Marker check */ - if (((type == BGP_MSG_OPEN) || (type == BGP_MSG_KEEPALIVE)) - && !bgp_marker_all_one(peer->ibuf, BGP_MARKER_SIZE)) { - bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR, - BGP_NOTIFY_HEADER_NOT_SYNC); - goto done; - } - - /* BGP type check. */ - if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE - && type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE - && type != BGP_MSG_ROUTE_REFRESH_NEW - && type != BGP_MSG_ROUTE_REFRESH_OLD - && type != BGP_MSG_CAPABILITY) { - if (bgp_debug_neighbor_events(peer)) - zlog_debug("%s unknown message type 0x%02x", - peer->host, type); - bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR, - BGP_NOTIFY_HEADER_BAD_MESTYPE, - &type, 1); - goto done; - } - /* Mimimum packet length check. 
*/ - if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE) - || (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE) - || (type == BGP_MSG_UPDATE - && size < BGP_MSG_UPDATE_MIN_SIZE) - || (type == BGP_MSG_NOTIFY - && size < BGP_MSG_NOTIFY_MIN_SIZE) - || (type == BGP_MSG_KEEPALIVE - && size != BGP_MSG_KEEPALIVE_MIN_SIZE) - || (type == BGP_MSG_ROUTE_REFRESH_NEW - && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE) - || (type == BGP_MSG_ROUTE_REFRESH_OLD - && size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE) - || (type == BGP_MSG_CAPABILITY - && size < BGP_MSG_CAPABILITY_MIN_SIZE)) { - if (bgp_debug_neighbor_events(peer)) - zlog_debug("%s bad message length - %d for %s", - peer->host, size, - type == 128 - ? "ROUTE-REFRESH" - : bgp_type_str[(int)type]); - bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR, - BGP_NOTIFY_HEADER_BAD_MESLEN, - (u_char *)notify_data_length, - 2); - goto done; - } + if (peer->curr == NULL) // no packets to process, hmm... + return 0; - /* Adjust size to message length. */ - peer->packet_size = size; - } + bgp_size_t actual_size = stream_get_endp(peer->curr); - ret = bgp_read_packet(peer); - if (ret < 0) - goto done; + /* skip the marker and copy the packet length */ + stream_forward_getp(peer->curr, BGP_MARKER_SIZE); + memcpy(notify_data_length, stream_pnt(peer->curr), 2); - /* Get size and type again. */ - (void)stream_getw_from(peer->ibuf, BGP_MARKER_SIZE); - type = stream_getc_from(peer->ibuf, BGP_MARKER_SIZE + 2); + /* read in the packet length and type */ + size = stream_getw(peer->curr); + type = stream_getc(peer->curr); /* BGP packet dump function. */ - bgp_dump_packet(peer, type, peer->ibuf); + bgp_dump_packet(peer, type, peer->curr); - size = (peer->packet_size - BGP_HEADER_SIZE); + /* adjust size to exclude the marker + length + type */ + size -= BGP_HEADER_SIZE; /* Read rest of the packet and call each sort of packet routine */ switch (type) { @@ -2118,26 +2036,14 @@ int bgp_read(struct thread *thread) * of the packet for troubleshooting purposes */ if (notify_out < peer->notify_out) { - memcpy(peer->last_reset_cause, peer->ibuf->data, - peer->packet_size); - peer->last_reset_cause_size = peer->packet_size; - notify_out = peer->notify_out; + memcpy(peer->last_reset_cause, peer->curr->data, actual_size); + peer->last_reset_cause_size = actual_size; } - /* Clear input buffer. */ - peer->packet_size = 0; - if (peer->ibuf) - stream_reset(peer->ibuf); - -done: - /* If reading this packet caused us to send a NOTIFICATION then store a - * copy - * of the packet for troubleshooting purposes - */ - if (notify_out < peer->notify_out) { - memcpy(peer->last_reset_cause, peer->ibuf->data, - peer->packet_size); - peer->last_reset_cause_size = peer->packet_size; + /* Delete packet and carry on. */ + if (peer->curr) { + stream_free(peer->curr); + peer->curr = NULL; } return 0; diff --git a/bgpd/bgp_packet.h b/bgpd/bgp_packet.h index d7080d7fb..502dbbdee 100644 --- a/bgpd/bgp_packet.h +++ b/bgpd/bgp_packet.h @@ -38,8 +38,6 @@ #define ORF_COMMON_PART_DENY 0x20 /* Packet send and receive function prototypes. 
*/ -extern int bgp_read(struct thread *); - extern void bgp_keepalive_send(struct peer *); extern void bgp_open_send(struct peer *); extern void bgp_notify_send(struct peer *, u_int8_t, u_int8_t); @@ -68,5 +66,6 @@ extern int bgp_packet_set_size(struct stream *s); extern bool bgp_packet_writes_thread_run; extern int bgp_generate_updgrp_packets(struct thread *); +extern int bgp_process_packet(struct thread *); #endif /* _QUAGGA_BGP_PACKET_H */ diff --git a/bgpd/bgp_updgrp.h b/bgpd/bgp_updgrp.h index a50bc05fe..3e503a8be 100644 --- a/bgpd/bgp_updgrp.h +++ b/bgpd/bgp_updgrp.h @@ -179,7 +179,7 @@ struct update_subgroup { struct stream *work; /* We use a separate stream to encode MP_REACH_NLRI for efficient - * NLRI packing. peer->work stores all the other attributes. The + * NLRI packing. peer->obuf_work stores all the other attributes. The * actual packet is then constructed by concatenating the two. */ struct stream *scratch; diff --git a/bgpd/bgp_vty.c b/bgpd/bgp_vty.c index af702ac85..095379060 100644 --- a/bgpd/bgp_vty.c +++ b/bgpd/bgp_vty.c @@ -7068,14 +7068,40 @@ static int bgp_show_summary(struct vty *vty, struct bgp *bgp, int afi, int safi, vty_out(vty, "4 %10u %7u %7u %8" PRIu64 " %4d %4zd %8s", peer->as, - peer->open_in + peer->update_in - + peer->keepalive_in + peer->notify_in - + peer->refresh_in - + peer->dynamic_cap_in, - peer->open_out + peer->update_out - + peer->keepalive_out + peer->notify_out - + peer->refresh_out - + peer->dynamic_cap_out, + atomic_load_explicit(&peer->open_in, + memory_order_relaxed) + + atomic_load_explicit( + &peer->update_in, + memory_order_relaxed) + + atomic_load_explicit( + &peer->keepalive_in, + memory_order_relaxed) + + atomic_load_explicit( + &peer->notify_in, + memory_order_relaxed) + + atomic_load_explicit( + &peer->refresh_in, + memory_order_relaxed) + + atomic_load_explicit( + &peer->dynamic_cap_in, + memory_order_relaxed), + atomic_load_explicit(&peer->open_out, + memory_order_relaxed) + + atomic_load_explicit( + &peer->update_out, + memory_order_relaxed) + + atomic_load_explicit( + &peer->keepalive_out, + memory_order_relaxed) + + atomic_load_explicit( + &peer->notify_out, + memory_order_relaxed) + + atomic_load_explicit( + &peer->refresh_out, + memory_order_relaxed) + + atomic_load_explicit( + &peer->dynamic_cap_out, + memory_order_relaxed), peer->version[afi][safi], 0, peer->obuf->count, peer_uptime(peer->uptime, timebuf, BGP_UPTIME_LEN, 0, NULL)); diff --git a/bgpd/bgpd.c b/bgpd/bgpd.c index cfe5d5c67..c034eda24 100644 --- a/bgpd/bgpd.c +++ b/bgpd/bgpd.c @@ -992,10 +992,14 @@ static void peer_free(struct peer *peer) * but just to be sure.. */ bgp_timer_set(peer); - BGP_READ_OFF(peer->t_read); - peer_writes_off(peer); + bgp_reads_off(peer); + bgp_writes_off(peer); + assert(!peer->t_write); + assert(!peer->t_read); BGP_EVENT_FLUSH(peer); + pthread_mutex_destroy(&peer->io_mtx); + /* Free connected nexthop, if present */ if (CHECK_FLAG(peer->flags, PEER_FLAG_CONFIG_NODE) && !peer_dynamic_neighbor(peer)) @@ -1138,11 +1142,11 @@ struct peer *peer_new(struct bgp *bgp) SET_FLAG(peer->sflags, PEER_STATUS_CAPABILITY_OPEN); /* Create buffers. 
*/ - peer->ibuf = stream_new(BGP_MAX_PACKET_SIZE); + peer->ibuf = stream_fifo_new(); peer->obuf = stream_fifo_new(); - pthread_mutex_init(&peer->obuf_mtx, NULL); + pthread_mutex_init(&peer->io_mtx, NULL); - /* We use a larger buffer for peer->work in the event that: + /* We use a larger buffer for peer->obuf_work in the event that: * - We RX a BGP_UPDATE where the attributes alone are just * under BGP_MAX_PACKET_SIZE * - The user configures an outbound route-map that does many as-path @@ -1156,8 +1160,9 @@ struct peer *peer_new(struct bgp *bgp) * bounds * checking for every single attribute as we construct an UPDATE. */ - peer->work = + peer->obuf_work = stream_new(BGP_MAX_PACKET_SIZE + BGP_MAX_PACKET_SIZE_OVERFLOW); + peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE); peer->scratch = stream_new(BGP_MAX_PACKET_SIZE); @@ -2086,6 +2091,11 @@ int peer_delete(struct peer *peer) bgp = peer->bgp; accept_peer = CHECK_FLAG(peer->sflags, PEER_STATUS_ACCEPT_PEER); + bgp_reads_off(peer); + bgp_writes_off(peer); + assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON)); + assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_READS_ON)); + if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT)) peer_nsf_stop(peer); @@ -2147,7 +2157,7 @@ int peer_delete(struct peer *peer) /* Buffers. */ if (peer->ibuf) { - stream_free(peer->ibuf); + stream_fifo_free(peer->ibuf); peer->ibuf = NULL; } @@ -2156,9 +2166,14 @@ int peer_delete(struct peer *peer) peer->obuf = NULL; } - if (peer->work) { - stream_free(peer->work); - peer->work = NULL; + if (peer->ibuf_work) { + stream_free(peer->ibuf_work); + peer->ibuf_work = NULL; + } + + if (peer->obuf_work) { + stream_free(peer->obuf_work); + peer->obuf_work = NULL; } if (peer->scratch) { @@ -7389,20 +7404,24 @@ void bgp_pthreads_init() { frr_pthread_init(); - frr_pthread_new("BGP write thread", PTHREAD_WRITE, peer_writes_start, - peer_writes_stop); + frr_pthread_new("BGP i/o thread", PTHREAD_IO, bgp_io_start, + bgp_io_stop); frr_pthread_new("BGP keepalives thread", PTHREAD_KEEPALIVES, peer_keepalives_start, peer_keepalives_stop); /* pre-run initialization */ peer_keepalives_init(); - peer_writes_init(); + bgp_io_init(); } void bgp_pthreads_run() { - frr_pthread_run(PTHREAD_WRITE, NULL, NULL); - frr_pthread_run(PTHREAD_KEEPALIVES, NULL, NULL); + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setschedpolicy(&attr, SCHED_FIFO); + + frr_pthread_run(PTHREAD_IO, &attr, NULL); + frr_pthread_run(PTHREAD_KEEPALIVES, &attr, NULL); } void bgp_pthreads_finish() diff --git a/bgpd/bgpd.h b/bgpd/bgpd.h index 4fb784e24..208cd897d 100644 --- a/bgpd/bgpd.h +++ b/bgpd/bgpd.h @@ -101,7 +101,7 @@ struct bgp_master { struct thread_master *master; /* BGP pthreads. */ -#define PTHREAD_WRITE (1 << 1) +#define PTHREAD_IO (1 << 1) #define PTHREAD_KEEPALIVES (1 << 2) /* work queues */ @@ -589,13 +589,17 @@ struct peer { struct in_addr local_id; /* Packet receive and send buffer. */ - struct stream *ibuf; - pthread_mutex_t obuf_mtx; - struct stream_fifo *obuf; - struct stream *work; + pthread_mutex_t io_mtx; // guards ibuf, obuf + struct stream_fifo *ibuf; // packets waiting to be processed + struct stream_fifo *obuf; // packets waiting to be written + + struct stream *ibuf_work; // WiP buffer used by bgp_read() only + struct stream *obuf_work; // WiP buffer used to construct packets + + struct stream *curr; // the current packet being parsed /* We use a separate stream to encode MP_REACH_NLRI for efficient - * NLRI packing. peer->work stores all the other attributes. 
The + * NLRI packing. peer->obuf_work stores all the other attributes. The * actual packet is then constructed by concatenating the two. */ struct stream *scratch; @@ -799,7 +803,9 @@ struct peer { /* Threads. */ struct thread *t_read; + struct thread *t_write; struct thread *t_start; + struct thread *t_connect_check; struct thread *t_connect; struct thread *t_holdtime; struct thread *t_routeadv; @@ -807,11 +813,13 @@ struct peer { struct thread *t_gr_restart; struct thread *t_gr_stale; struct thread *t_generate_updgrp_packets; + struct thread *t_process_packet; /* Thread flags. */ u_int16_t thread_flags; -#define PEER_THREAD_WRITES_ON (1 << 0) -#define PEER_THREAD_KEEPALIVES_ON (1 << 1) +#define PEER_THREAD_WRITES_ON (1 << 1) +#define PEER_THREAD_READS_ON (1 << 2) +#define PEER_THREAD_KEEPALIVES_ON (1 << 3) /* workqueues */ struct work_queue *clear_node_queue; @@ -853,9 +861,6 @@ struct peer { /* Notify data. */ struct bgp_notify notify; - /* Whole packet size to be read. */ - unsigned long packet_size; - /* Filter structure. */ struct bgp_filter filter[AFI_MAX][SAFI_MAX]; @@ -1149,7 +1154,7 @@ enum bgp_clear_type { }; /* Macros. */ -#define BGP_INPUT(P) ((P)->ibuf) +#define BGP_INPUT(P) ((P)->curr) #define BGP_INPUT_PNT(P) (STREAM_PNT(BGP_INPUT(P))) #define BGP_IS_VALID_STATE_FOR_NOTIF(S) \ (((S) == OpenSent) || ((S) == OpenConfirm) || ((S) == Established)) diff --git a/bgpd/rfapi/rfapi.c b/bgpd/rfapi/rfapi.c index 15a29442f..1c342a2ff 100644 --- a/bgpd/rfapi/rfapi.c +++ b/bgpd/rfapi/rfapi.c @@ -1304,18 +1304,29 @@ static int rfapi_open_inner(struct rfapi_descriptor *rfd, struct bgp *bgp, rfd->peer = peer_new(bgp); rfd->peer->status = Established; /* keep bgp core happy */ bgp_sync_delete(rfd->peer); /* don't need these */ - if (rfd->peer->ibuf) { - stream_free(rfd->peer->ibuf); /* don't need it */ + + // since this peer is not on the I/O thread, this lock is not strictly + // necessary, but serves as a reminder to those who may meddle... + pthread_mutex_lock(&rfd->peer->io_mtx); + { + // we don't need any I/O related facilities + if (rfd->peer->ibuf) + stream_fifo_free(rfd->peer->ibuf); + if (rfd->peer->obuf) + stream_fifo_free(rfd->peer->obuf); + + if (rfd->peer->ibuf_work) + stream_free(rfd->peer->ibuf_work); + if (rfd->peer->obuf_work) + stream_free(rfd->peer->obuf_work); + rfd->peer->ibuf = NULL; - } - if (rfd->peer->obuf) { - stream_fifo_free(rfd->peer->obuf); /* don't need it */ rfd->peer->obuf = NULL; + rfd->peer->obuf_work = NULL; + rfd->peer->ibuf_work = NULL; } - if (rfd->peer->work) { - stream_free(rfd->peer->work); /* don't need it */ - rfd->peer->work = NULL; - } + pthread_mutex_unlock(&rfd->peer->io_mtx); + { /* base code assumes have valid host pointer */ char buf[BUFSIZ]; buf[0] = 0; diff --git a/bgpd/rfapi/vnc_zebra.c b/bgpd/rfapi/vnc_zebra.c index 5c71df238..5f7989238 100644 --- a/bgpd/rfapi/vnc_zebra.c +++ b/bgpd/rfapi/vnc_zebra.c @@ -183,22 +183,31 @@ static void vnc_redistribute_add(struct prefix *p, u_int32_t metric, vncHD1VR.peer->status = Established; /* keep bgp core happy */ bgp_sync_delete(vncHD1VR.peer); /* don't need these */ - if (vncHD1VR.peer->ibuf) { - stream_free(vncHD1VR.peer - ->ibuf); /* don't need it */ + + // since this peer is not on the I/O thread, this lock + // is not strictly + // necessary, but serves as a reminder to those who may + // meddle... 
+ pthread_mutex_lock(&vncHD1VR.peer->io_mtx); + { + // we don't need any I/O related facilities + if (vncHD1VR.peer->ibuf) + stream_fifo_free(vncHD1VR.peer->ibuf); + if (vncHD1VR.peer->obuf) + stream_fifo_free(vncHD1VR.peer->obuf); + + if (vncHD1VR.peer->ibuf_work) + stream_free(vncHD1VR.peer->ibuf_work); + if (vncHD1VR.peer->obuf_work) + stream_free(vncHD1VR.peer->obuf_work); + vncHD1VR.peer->ibuf = NULL; - } - if (vncHD1VR.peer->obuf) { - stream_fifo_free( - vncHD1VR.peer - ->obuf); /* don't need it */ vncHD1VR.peer->obuf = NULL; + vncHD1VR.peer->obuf_work = NULL; + vncHD1VR.peer->ibuf_work = NULL; } - if (vncHD1VR.peer->work) { - stream_free(vncHD1VR.peer - ->work); /* don't need it */ - vncHD1VR.peer->work = NULL; - } + pthread_mutex_unlock(&vncHD1VR.peer->io_mtx); + /* base code assumes have valid host pointer */ vncHD1VR.peer->host = XSTRDUP(MTYPE_BGP_PEER_HOST, ".zebra."); |
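The core pattern introduced by the bgp_packet.c hunk above is a mutex-guarded handoff: the I/O pthread pushes complete packets onto peer->ibuf while holding peer->io_mtx, and bgp_process_packet() on the main pthread pops the next packet into peer->curr under the same lock, parses it outside the lock, and frees it when done. Below is a minimal, self-contained sketch of that shape. The types (struct pkt, struct pkt_fifo, struct fake_peer) are simplified stand-ins for FRR's struct stream, struct stream_fifo and struct peer, not the real definitions.

/* Sketch of the ibuf handoff between the I/O thread (producer) and the
 * packet-processing task (consumer). Types are simplified stand-ins for
 * FRR's stream/stream_fifo/peer, kept only so the example builds. */
#include <pthread.h>
#include <stdlib.h>

struct pkt {                            /* stand-in for struct stream */
        struct pkt *next;
        size_t len;
        unsigned char data[4096];
};

struct pkt_fifo {                       /* stand-in for struct stream_fifo */
        struct pkt *head, *tail;
};

struct fake_peer {                      /* only the fields the sketch needs */
        pthread_mutex_t io_mtx;         /* guards ibuf, as in the commit */
        struct pkt_fifo ibuf;           /* packets waiting to be processed */
        struct pkt *curr;               /* packet currently being parsed */
};

/* producer side: called by the I/O thread once a whole packet is read */
static void input_fifo_push(struct fake_peer *p, struct pkt *new)
{
        pthread_mutex_lock(&p->io_mtx);
        {
                new->next = NULL;
                if (p->ibuf.tail)
                        p->ibuf.tail->next = new;
                else
                        p->ibuf.head = new;
                p->ibuf.tail = new;
        }
        pthread_mutex_unlock(&p->io_mtx);
}

/* consumer side: the equivalent of the stream_fifo_pop() call in
 * bgp_process_packet(); returns NULL when there is nothing to do */
static struct pkt *input_fifo_pop(struct fake_peer *p)
{
        struct pkt *head;

        pthread_mutex_lock(&p->io_mtx);
        {
                head = p->ibuf.head;
                if (head) {
                        p->ibuf.head = head->next;
                        if (!p->ibuf.head)
                                p->ibuf.tail = NULL;
                }
        }
        pthread_mutex_unlock(&p->io_mtx);

        return head;
}

/* teardown: in the commit this only runs after bgp_reads_off() and
 * bgp_writes_off() have stopped the I/O jobs, which is what the new
 * asserts in peer_free()/peer_delete() verify */
static void input_fifo_flush_and_destroy(struct fake_peer *p)
{
        struct pkt *next;

        pthread_mutex_lock(&p->io_mtx);
        {
                while (p->ibuf.head) {
                        next = p->ibuf.head->next;
                        free(p->ibuf.head);
                        p->ibuf.head = next;
                }
                p->ibuf.tail = NULL;
        }
        pthread_mutex_unlock(&p->io_mtx);

        pthread_mutex_destroy(&p->io_mtx);
}

Keeping io_mtx strictly around the queue manipulation is what lets the main pthread parse peer->curr without blocking the reader: the packet is popped rather than peeked, so once the lock is dropped the two threads no longer share it.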
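With reads buffered on the I/O thread, the header sanity checks deleted from bgp_read() above (marker, length bounds, known type, per-type minimum length) must run before a packet is ever queued on peer->ibuf, so bgp_process_packet() can assume peer->curr holds one complete, plausible message. The sketch below restates those checks as a standalone function: the constants are redeclared locally so it compiles on its own, the 29/23/21/19 minimums come from RFC 4271, the route-refresh and capability minimums are assumptions, and the marker is only verified for OPEN and KEEPALIVE, mirroring the removed code rather than claiming to be exactly what bgp_io.c does.

/* Sketch of per-message header validation, roughly the checks that moved
 * off the main pthread. Constants are restated for self-containment; in
 * FRR they come from bgpd.h / bgp_packet.h. */
#include <stdbool.h>
#include <stdint.h>

#define HDR_SIZE        19              /* 16-byte marker + length + type */
#define MARKER_SIZE     16
#define MAX_PACKET_SIZE 4096

#define MSG_OPEN              1
#define MSG_UPDATE            2
#define MSG_NOTIFY            3
#define MSG_KEEPALIVE         4
#define MSG_ROUTE_REFRESH_NEW 5
#define MSG_CAPABILITY        6
#define MSG_ROUTE_REFRESH_OLD 128

static bool marker_all_one(const uint8_t *hdr)
{
        for (int i = 0; i < MARKER_SIZE; i++)
                if (hdr[i] != 0xff)
                        return false;
        return true;
}

/* Returns true if a 19-byte header describes a message worth reading the
 * body of and queueing; false means "send a header-error NOTIFICATION". */
static bool header_valid(const uint8_t *hdr)
{
        uint16_t size = (uint16_t)((hdr[MARKER_SIZE] << 8)
                                   | hdr[MARKER_SIZE + 1]);
        uint8_t type = hdr[MARKER_SIZE + 2];

        /* the removed bgp_read() only verified the marker on OPEN and
         * KEEPALIVE messages; the sketch keeps that behaviour */
        if ((type == MSG_OPEN || type == MSG_KEEPALIVE)
            && !marker_all_one(hdr))
                return false;

        if (size < HDR_SIZE || size > MAX_PACKET_SIZE)
                return false;

        switch (type) {
        case MSG_OPEN:
                return size >= HDR_SIZE + 10;   /* RFC 4271: OPEN >= 29 */
        case MSG_UPDATE:
                return size >= HDR_SIZE + 4;    /* UPDATE >= 23 */
        case MSG_NOTIFY:
                return size >= HDR_SIZE + 2;    /* NOTIFICATION >= 21 */
        case MSG_KEEPALIVE:
                return size == HDR_SIZE;        /* KEEPALIVE is exactly 19 */
        case MSG_ROUTE_REFRESH_NEW:
        case MSG_ROUTE_REFRESH_OLD:
                return size >= HDR_SIZE + 4;    /* assumed: AFI/Res/SAFI body */
        case MSG_CAPABILITY:
                return size >= HDR_SIZE + 3;    /* assumed minimum */
        default:
                return false;                   /* unknown message type */
        }
}

On the consumer side the new code then only has to skip the 16-byte marker, read the two-byte length and one-byte type, and subtract BGP_HEADER_SIZE to get the body length, which is exactly what the bgp_process_packet() hunk above does with stream_forward_getp(), stream_getw() and stream_getc().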
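The bgp_show_summary() hunk replaces plain reads of the per-peer message counters with atomic_load_explicit(..., memory_order_relaxed), because those counters are now bumped from the I/O and keepalive pthreads while the main pthread formats the summary; the reads only need to be tear-free, not ordered against anything else. A minimal sketch of the same C11 idiom follows; the struct and field names are illustrative stand-ins, not the actual fields of struct peer.

/* Sketch: relaxed atomic counters shared between a worker thread that
 * counts received messages and a display path that sums them. */
#include <stdatomic.h>
#include <stdint.h>

struct msg_counters {
        atomic_uint_fast32_t open_in;
        atomic_uint_fast32_t update_in;
        atomic_uint_fast32_t keepalive_in;
};

/* worker side: one increment per received message */
static void count_msg_in(struct msg_counters *c, int type)
{
        switch (type) {
        case 1:         /* OPEN */
                atomic_fetch_add_explicit(&c->open_in, 1,
                                          memory_order_relaxed);
                break;
        case 2:         /* UPDATE */
                atomic_fetch_add_explicit(&c->update_in, 1,
                                          memory_order_relaxed);
                break;
        case 4:         /* KEEPALIVE */
                atomic_fetch_add_explicit(&c->keepalive_in, 1,
                                          memory_order_relaxed);
                break;
        }
}

/* display side: relaxed loads are enough for a statistics line, the same
 * reasoning as the bgp_show_summary() change above */
static uint_fast32_t total_in(struct msg_counters *c)
{
        return atomic_load_explicit(&c->open_in, memory_order_relaxed)
               + atomic_load_explicit(&c->update_in, memory_order_relaxed)
               + atomic_load_explicit(&c->keepalive_in, memory_order_relaxed);
}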
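bgp_pthreads_run() now builds a pthread_attr_t and asks for SCHED_FIFO before starting the I/O and keepalive threads. For readers of that hunk, the sketch below shows the fuller POSIX recipe for an explicit real-time request: on Linux the policy stored in the attr only takes effect together with PTHREAD_EXPLICIT_SCHED and a priority, and creating a SCHED_FIFO thread needs the corresponding privilege (CAP_SYS_NICE or an RLIMIT_RTPRIO allowance), otherwise pthread_create() fails with EPERM. The worker() body and the fall-back-to-default-scheduling behaviour are assumptions for illustration, not something the commit itself does.

/* Sketch: spawning a worker thread with an explicit SCHED_FIFO request
 * and a graceful fallback when the caller lacks real-time privileges. */
#include <pthread.h>
#include <sched.h>
#include <stddef.h>

static void *worker(void *arg)          /* placeholder thread body */
{
        (void)arg;
        return NULL;
}

static int spawn_fifo_thread(pthread_t *out)
{
        pthread_attr_t attr;
        struct sched_param sp = { .sched_priority = 1 };

        pthread_attr_init(&attr);

        /* without EXPLICIT_SCHED the new thread inherits the parent's
         * policy and the attributes below are ignored */
        pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
        pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
        pthread_attr_setschedparam(&attr, &sp);

        if (pthread_create(out, &attr, worker, NULL) != 0) {
                /* typically EPERM without real-time privileges: retry with
                 * default attributes instead of failing outright */
                pthread_attr_destroy(&attr);
                pthread_attr_init(&attr);
                if (pthread_create(out, &attr, worker, NULL) != 0) {
                        pthread_attr_destroy(&attr);
                        return -1;
                }
        }

        pthread_attr_destroy(&attr);
        return 0;
}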