summaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2017-01-03 16:55:11 +0100
committerDavid S. Miller <davem@davemloft.net>2017-01-03 17:13:05 +0100
commit365ad353c2564bba8835290061308ba825166b3a (patch)
tree06439f0724f3df34e29e8d3fb32432894e6d8ff0 /net/tipc/link.c
parenttipc: modify struct tipc_plist to be more versatile (diff)
downloadlinux-365ad353c2564bba8835290061308ba825166b3a.tar.xz
linux-365ad353c2564bba8835290061308ba825166b3a.zip
tipc: reduce risk of user starvation during link congestion
The socket code currently handles link congestion by either blocking and trying to send again when the congestion has abated, or just returning to the user with -EAGAIN and let him re-try later. This mechanism is prone to starvation, because the wakeup algorithm is non-atomic. During the time the link issues a wakeup signal, until the socket wakes up and re-attempts sending, other senders may have come in between and occupied the free buffer space in the link. This in turn may lead to a socket having to make many send attempts before it is successful. In extremely loaded systems we have observed latency times of several seconds before a low-priority socket is able to send out a message. In this commit, we simplify this mechanism and reduce the risk of the described scenario happening. When a message is attempted sent via a congested link, we now let it be added to the link's backlog queue anyway, thus permitting an oversubscription of one message per source socket. We still create a wakeup item and return an error code, hence instructing the sender to block or stop sending. Only when enough space has been freed up in the link's backlog queue do we issue a wakeup event that allows the sender to continue with the next message, if any. The fact that a socket now can consider a message sent even when the link returns a congestion code means that the sending socket code can be simplified. Also, since this is a good opportunity to get rid of the obsolete 'mtu change' condition in the three socket send functions, we now choose to refactor those functions completely. Signed-off-by: Parthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com> Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c75
1 files changed, 32 insertions, 43 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index bda89bf9f4ff..b758ca8b2f79 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -776,60 +776,47 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
/**
* link_schedule_user - schedule a message sender for wakeup after congestion
- * @link: congested link
- * @list: message that was attempted sent
+ * @l: congested link
+ * @hdr: header of message that is being sent
* Create pseudo msg to send back to user when congestion abates
- * Does not consume buffer list
*/
-static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
+static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
{
- struct tipc_msg *msg = buf_msg(skb_peek(list));
- int imp = msg_importance(msg);
- u32 oport = msg_origport(msg);
- u32 addr = tipc_own_addr(link->net);
+ u32 dnode = tipc_own_addr(l->net);
+ u32 dport = msg_origport(hdr);
struct sk_buff *skb;
- /* This really cannot happen... */
- if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
- pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
- return -ENOBUFS;
- }
- /* Non-blocking sender: */
- if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
- return -ELINKCONG;
-
/* Create and schedule wakeup pseudo message */
skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
- addr, addr, oport, 0, 0);
+ dnode, l->addr, dport, 0, 0);
if (!skb)
return -ENOBUFS;
- TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
- TIPC_SKB_CB(skb)->chain_imp = imp;
- skb_queue_tail(&link->wakeupq, skb);
- link->stats.link_congs++;
+ msg_set_dest_droppable(buf_msg(skb), true);
+ TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr);
+ skb_queue_tail(&l->wakeupq, skb);
+ l->stats.link_congs++;
return -ELINKCONG;
}
/**
* link_prepare_wakeup - prepare users for wakeup after congestion
- * @link: congested link
- * Move a number of waiting users, as permitted by available space in
- * the send queue, from link wait queue to node wait queue for wakeup
+ * @l: congested link
+ * Wake up a number of waiting users, as permitted by available space
+ * in the send queue
*/
void link_prepare_wakeup(struct tipc_link *l)
{
- int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
- int imp, lim;
struct sk_buff *skb, *tmp;
+ int imp, i = 0;
skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
imp = TIPC_SKB_CB(skb)->chain_imp;
- lim = l->backlog[imp].limit;
- pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
- if ((pnd[imp] + l->backlog[imp].len) >= lim)
+ if (l->backlog[imp].len < l->backlog[imp].limit) {
+ skb_unlink(skb, &l->wakeupq);
+ skb_queue_tail(l->inputq, skb);
+ } else if (i++ > 10) {
break;
- skb_unlink(skb, &l->wakeupq);
- skb_queue_tail(l->inputq, skb);
+ }
}
}
@@ -869,8 +856,7 @@ void tipc_link_reset(struct tipc_link *l)
* @list: chain of buffers containing message
* @xmitq: returned list of packets to be sent by caller
*
- * Consumes the buffer chain, except when returning -ELINKCONG,
- * since the caller then may want to make more send attempts.
+ * Consumes the buffer chain.
* Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
* Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
*/
@@ -879,7 +865,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
{
struct tipc_msg *hdr = buf_msg(skb_peek(list));
unsigned int maxwin = l->window;
- unsigned int i, imp = msg_importance(hdr);
+ int imp = msg_importance(hdr);
unsigned int mtu = l->mtu;
u16 ack = l->rcv_nxt - 1;
u16 seqno = l->snd_nxt;
@@ -888,19 +874,22 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
struct sk_buff_head *backlogq = &l->backlogq;
struct sk_buff *skb, *_skb, *bskb;
int pkt_cnt = skb_queue_len(list);
+ int rc = 0;
- /* Match msg importance against this and all higher backlog limits: */
- if (!skb_queue_empty(backlogq)) {
- for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
- if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
- return link_schedule_user(l, list);
- }
- }
if (unlikely(msg_size(hdr) > mtu)) {
skb_queue_purge(list);
return -EMSGSIZE;
}
+ /* Allow oversubscription of one data msg per source at congestion */
+ if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) {
+ if (imp == TIPC_SYSTEM_IMPORTANCE) {
+ pr_warn("%s<%s>, link overflow", link_rst_msg, l->name);
+ return -ENOBUFS;
+ }
+ rc = link_schedule_user(l, hdr);
+ }
+
if (pkt_cnt > 1) {
l->stats.sent_fragmented++;
l->stats.sent_fragments += pkt_cnt;
@@ -946,7 +935,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
skb_queue_splice_tail_init(list, backlogq);
}
l->snd_nxt = seqno;
- return 0;
+ return rc;
}
void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)