diff options
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r-- | net/tipc/link.c | 149 |
1 files changed, 120 insertions, 29 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c index 877d94f34814..b36e16cdc945 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -181,7 +181,10 @@ struct tipc_link { u16 acked; struct tipc_link *bc_rcvlink; struct tipc_link *bc_sndlink; - int nack_state; + unsigned long prev_retr; + u16 prev_from; + u16 prev_to; + u8 nack_state; bool bc_peer_is_up; /* Statistics */ @@ -202,6 +205,8 @@ enum { BC_NACK_SND_SUPPRESS, }; +#define TIPC_BC_RETR_LIMIT 10 /* [ms] */ + /* * Interval between NACKs when packets arrive out of order */ @@ -237,8 +242,8 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, u16 rcvgap, int tolerance, int priority, struct sk_buff_head *xmitq); static void link_print(struct tipc_link *l, const char *str); -static void tipc_link_build_nack_msg(struct tipc_link *l, - struct sk_buff_head *xmitq); +static int tipc_link_build_nack_msg(struct tipc_link *l, + struct sk_buff_head *xmitq); static void tipc_link_build_bc_init_msg(struct tipc_link *l, struct sk_buff_head *xmitq); static bool tipc_link_release_pkts(struct tipc_link *l, u16 to); @@ -367,6 +372,18 @@ int tipc_link_bc_peers(struct tipc_link *l) return l->ackers; } +u16 link_bc_rcv_gap(struct tipc_link *l) +{ + struct sk_buff *skb = skb_peek(&l->deferdq); + u16 gap = 0; + + if (more(l->snd_nxt, l->rcv_nxt)) + gap = l->snd_nxt - l->rcv_nxt; + if (skb) + gap = buf_seqno(skb) - l->rcv_nxt; + return gap; +} + void tipc_link_set_mtu(struct tipc_link *l, int mtu) { l->mtu = mtu; @@ -807,7 +824,7 @@ void link_prepare_wakeup(struct tipc_link *l) skb_queue_walk_safe(&l->wakeupq, skb, tmp) { imp = TIPC_SKB_CB(skb)->chain_imp; - lim = l->window + l->backlog[imp].limit; + lim = l->backlog[imp].limit; pnd[imp] += TIPC_SKB_CB(skb)->chain_sz; if ((pnd[imp] + l->backlog[imp].len) >= lim) break; @@ -873,9 +890,11 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, struct sk_buff *skb, *_skb, *bskb; /* Match msg importance against this and all higher backlog limits: */ - for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) { - if (unlikely(l->backlog[i].len >= l->backlog[i].limit)) - return link_schedule_user(l, list); + if (!skb_queue_empty(backlogq)) { + for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) { + if (unlikely(l->backlog[i].len >= l->backlog[i].limit)) + return link_schedule_user(l, list); + } } if (unlikely(msg_size(hdr) > mtu)) { skb_queue_purge(list); @@ -1133,7 +1152,10 @@ int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq) if (((l->rcv_nxt ^ tipc_own_addr(l->net)) & 0xf) != 0xf) return 0; l->rcv_unacked = 0; - return TIPC_LINK_SND_BC_ACK; + + /* Use snd_nxt to store peer's snd_nxt in broadcast rcv link */ + l->snd_nxt = l->rcv_nxt; + return TIPC_LINK_SND_STATE; } /* Unicast ACK */ @@ -1162,17 +1184,26 @@ void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq) } /* tipc_link_build_nack_msg: prepare link nack message for transmission + * Note that sending of broadcast NACK is coordinated among nodes, to + * reduce the risk of NACK storms towards the sender */ -static void tipc_link_build_nack_msg(struct tipc_link *l, - struct sk_buff_head *xmitq) +static int tipc_link_build_nack_msg(struct tipc_link *l, + struct sk_buff_head *xmitq) { u32 def_cnt = ++l->stats.deferred_recv; + int match1, match2; - if (link_is_bc_rcvlink(l)) - return; + if (link_is_bc_rcvlink(l)) { + match1 = def_cnt & 0xf; + match2 = tipc_own_addr(l->net) & 0xf; + if (match1 == match2) + return TIPC_LINK_SND_STATE; + return 0; + } if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); + return 0; } /* tipc_link_rcv - process TIPC packets/messages arriving from off-node @@ -1223,7 +1254,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, /* Defer delivery if sequence gap */ if (unlikely(seqno != rcv_nxt)) { __tipc_skb_queue_sorted(defq, seqno, skb); - tipc_link_build_nack_msg(l, xmitq); + rc |= tipc_link_build_nack_msg(l, xmitq); break; } @@ -1234,7 +1265,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, rc |= tipc_link_input(l, skb, l->inputq); if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) rc |= tipc_link_build_state_msg(l, xmitq); - if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK)) + if (unlikely(rc & ~TIPC_LINK_SND_STATE)) break; } while ((skb = __skb_dequeue(defq))); @@ -1248,10 +1279,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, u16 rcvgap, int tolerance, int priority, struct sk_buff_head *xmitq) { + struct tipc_link *bcl = l->bc_rcvlink; struct sk_buff *skb; struct tipc_msg *hdr; struct sk_buff_head *dfq = &l->deferdq; - bool node_up = link_is_up(l->bc_rcvlink); + bool node_up = link_is_up(bcl); struct tipc_mon_state *mstate = &l->mon_state; int dlen = 0; void *data; @@ -1279,7 +1311,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, msg_set_net_plane(hdr, l->net_plane); msg_set_next_sent(hdr, l->snd_nxt); msg_set_ack(hdr, l->rcv_nxt - 1); - msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1); + msg_set_bcast_ack(hdr, bcl->rcv_nxt - 1); msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1); msg_set_link_tolerance(hdr, tolerance); msg_set_linkprio(hdr, priority); @@ -1289,6 +1321,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, if (mtyp == STATE_MSG) { msg_set_seq_gap(hdr, rcvgap); + msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl)); msg_set_probe(hdr, probe); tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id); msg_set_size(hdr, INT_H_SIZE + dlen); @@ -1571,51 +1604,107 @@ void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr) l->rcv_nxt = peers_snd_nxt; } +/* link_bc_retr eval()- check if the indicated range can be retransmitted now + * - Adjust permitted range if there is overlap with previous retransmission + */ +static bool link_bc_retr_eval(struct tipc_link *l, u16 *from, u16 *to) +{ + unsigned long elapsed = jiffies_to_msecs(jiffies - l->prev_retr); + + if (less(*to, *from)) + return false; + + /* New retransmission request */ + if ((elapsed > TIPC_BC_RETR_LIMIT) || + less(*to, l->prev_from) || more(*from, l->prev_to)) { + l->prev_from = *from; + l->prev_to = *to; + l->prev_retr = jiffies; + return true; + } + + /* Inside range of previous retransmit */ + if (!less(*from, l->prev_from) && !more(*to, l->prev_to)) + return false; + + /* Fully or partially outside previous range => exclude overlap */ + if (less(*from, l->prev_from)) { + *to = l->prev_from - 1; + l->prev_from = *from; + } + if (more(*to, l->prev_to)) { + *from = l->prev_to + 1; + l->prev_to = *to; + } + l->prev_retr = jiffies; + return true; +} + /* tipc_link_bc_sync_rcv - update rcv link according to peer's send state */ -void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr, - struct sk_buff_head *xmitq) +int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr, + struct sk_buff_head *xmitq) { + struct tipc_link *snd_l = l->bc_sndlink; u16 peers_snd_nxt = msg_bc_snd_nxt(hdr); + u16 from = msg_bcast_ack(hdr) + 1; + u16 to = from + msg_bc_gap(hdr) - 1; + int rc = 0; if (!link_is_up(l)) - return; + return rc; if (!msg_peer_node_is_up(hdr)) - return; + return rc; /* Open when peer ackowledges our bcast init msg (pkt #1) */ if (msg_ack(hdr)) l->bc_peer_is_up = true; if (!l->bc_peer_is_up) - return; + return rc; + + l->stats.recv_nacks++; /* Ignore if peers_snd_nxt goes beyond receive window */ if (more(peers_snd_nxt, l->rcv_nxt + l->window)) - return; + return rc; + + if (link_bc_retr_eval(snd_l, &from, &to)) + rc = tipc_link_retrans(snd_l, from, to, xmitq); + + l->snd_nxt = peers_snd_nxt; + if (link_bc_rcv_gap(l)) + rc |= TIPC_LINK_SND_STATE; + + /* Return now if sender supports nack via STATE messages */ + if (l->peer_caps & TIPC_BCAST_STATE_NACK) + return rc; + + /* Otherwise, be backwards compatible */ if (!more(peers_snd_nxt, l->rcv_nxt)) { l->nack_state = BC_NACK_SND_CONDITIONAL; - return; + return 0; } /* Don't NACK if one was recently sent or peeked */ if (l->nack_state == BC_NACK_SND_SUPPRESS) { l->nack_state = BC_NACK_SND_UNCONDITIONAL; - return; + return 0; } /* Conditionally delay NACK sending until next synch rcv */ if (l->nack_state == BC_NACK_SND_CONDITIONAL) { l->nack_state = BC_NACK_SND_UNCONDITIONAL; if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN) - return; + return 0; } /* Send NACK now but suppress next one */ tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq); l->nack_state = BC_NACK_SND_SUPPRESS; + return 0; } void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, @@ -1652,6 +1741,8 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, } /* tipc_link_bc_nack_rcv(): receive broadcast nack message + * This function is here for backwards compatibility, since + * no BCAST_PROTOCOL/STATE messages occur from TIPC v2.5. */ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *xmitq) @@ -1692,10 +1783,10 @@ void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); l->window = win; - l->backlog[TIPC_LOW_IMPORTANCE].limit = win / 2; - l->backlog[TIPC_MEDIUM_IMPORTANCE].limit = win; - l->backlog[TIPC_HIGH_IMPORTANCE].limit = win / 2 * 3; - l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = win * 2; + l->backlog[TIPC_LOW_IMPORTANCE].limit = max_t(u16, 50, win); + l->backlog[TIPC_MEDIUM_IMPORTANCE].limit = max_t(u16, 100, win * 2); + l->backlog[TIPC_HIGH_IMPORTANCE].limit = max_t(u16, 150, win * 3); + l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = max_t(u16, 200, win * 4); l->backlog[TIPC_SYSTEM_IMPORTANCE].limit = max_bulk; } |