summaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorKolmakov Dmitriy <kolmakov.dmitriy@huawei.com>2015-09-07 11:05:48 +0200
committerDavid S. Miller <davem@davemloft.net>2015-09-09 07:50:26 +0200
commit7845989cb4b3da1db903918c844fccb9817d34a0 (patch)
tree2dff51039bdf3f6f12ec05013971de05af7434a6 /net/tipc
parentdm9000: fix a typo (diff)
downloadlinux-7845989cb4b3da1db903918c844fccb9817d34a0.tar.xz
linux-7845989cb4b3da1db903918c844fccb9817d34a0.zip
net: tipc: fix stall during bclink wakeup procedure
If an attempt to wake up users of broadcast link is made when there is no enough place in send queue than it may hang up inside the tipc_sk_rcv() function since the loop breaks only after the wake up queue becomes empty. This can lead to complete CPU stall with the following message generated by RCU: INFO: rcu_sched self-detected stall on CPU { 0} (t=2101 jiffies g=54225 c=54224 q=11465) Task dump for CPU 0: tpch R running task 0 39949 39948 0x0000000a ffffffff818536c0 ffff88181fa037a0 ffffffff8106a4be 0000000000000000 ffffffff818536c0 ffff88181fa037c0 ffffffff8106d8a8 ffff88181fa03800 0000000000000001 ffff88181fa037f0 ffffffff81094a50 ffff88181fa15680 Call Trace: <IRQ> [<ffffffff8106a4be>] sched_show_task+0xae/0x120 [<ffffffff8106d8a8>] dump_cpu_task+0x38/0x40 [<ffffffff81094a50>] rcu_dump_cpu_stacks+0x90/0xd0 [<ffffffff81097c3b>] rcu_check_callbacks+0x3eb/0x6e0 [<ffffffff8106e53f>] ? account_system_time+0x7f/0x170 [<ffffffff81099e64>] update_process_times+0x34/0x60 [<ffffffff810a84d1>] tick_sched_handle.isra.18+0x31/0x40 [<ffffffff810a851c>] tick_sched_timer+0x3c/0x70 [<ffffffff8109a43d>] __run_hrtimer.isra.34+0x3d/0xc0 [<ffffffff8109aa95>] hrtimer_interrupt+0xc5/0x1e0 [<ffffffff81030d52>] ? native_smp_send_reschedule+0x42/0x60 [<ffffffff81032f04>] local_apic_timer_interrupt+0x34/0x60 [<ffffffff810335bc>] smp_apic_timer_interrupt+0x3c/0x60 [<ffffffff8165a3fb>] apic_timer_interrupt+0x6b/0x70 [<ffffffff81659129>] ? _raw_spin_unlock_irqrestore+0x9/0x10 [<ffffffff8107eb9f>] __wake_up_sync_key+0x4f/0x60 [<ffffffffa313ddd1>] tipc_write_space+0x31/0x40 [tipc] [<ffffffffa313dadf>] filter_rcv+0x31f/0x520 [tipc] [<ffffffffa313d699>] ? tipc_sk_lookup+0xc9/0x110 [tipc] [<ffffffff81659259>] ? _raw_spin_lock_bh+0x19/0x30 [<ffffffffa314122c>] tipc_sk_rcv+0x2dc/0x3e0 [tipc] [<ffffffffa312e7ff>] tipc_bclink_wakeup_users+0x2f/0x40 [tipc] [<ffffffffa313ce26>] tipc_node_unlock+0x186/0x190 [tipc] [<ffffffff81597c1c>] ? kfree_skb+0x2c/0x40 [<ffffffffa313475c>] tipc_rcv+0x2ac/0x8c0 [tipc] [<ffffffffa312ff58>] tipc_l2_rcv_msg+0x38/0x50 [tipc] [<ffffffff815a76d3>] __netif_receive_skb_core+0x5a3/0x950 [<ffffffff815a98d3>] __netif_receive_skb+0x13/0x60 [<ffffffff815a993e>] netif_receive_skb_internal+0x1e/0x90 [<ffffffff815aa138>] napi_gro_receive+0x78/0xa0 [<ffffffffa07f93f4>] tg3_poll_work+0xc54/0xf40 [tg3] [<ffffffff81597c8c>] ? consume_skb+0x2c/0x40 [<ffffffffa07f9721>] tg3_poll_msix+0x41/0x160 [tg3] [<ffffffff815ab0f2>] net_rx_action+0xe2/0x290 [<ffffffff8104b92a>] __do_softirq+0xda/0x1f0 [<ffffffff8104bc26>] irq_exit+0x76/0xa0 [<ffffffff81004355>] do_IRQ+0x55/0xf0 [<ffffffff8165a12b>] common_interrupt+0x6b/0x6b <EOI> The issue occurs only when tipc_sk_rcv() is used to wake up postponed senders: tipc_bclink_wakeup_users() // wakeupq - is a queue which consists of special // messages with SOCK_WAKEUP type. tipc_sk_rcv(wakeupq) ... while (skb_queue_len(inputq)) { filter_rcv(skb) // Here the type of message is checked // and if it is SOCK_WAKEUP then // it tries to wake up a sender. tipc_write_space(sk) wake_up_interruptible_sync_poll() } After the sender thread is woke up it can gather control and perform an attempt to send a message. But if there is no enough place in send queue it will call link_schedule_user() function which puts a message of type SOCK_WAKEUP to the wakeup queue and put the sender to sleep. Thus the size of the queue actually is not changed and the while() loop never exits. The approach I proposed is to wake up only senders for which there is enough place in send queue so the described issue can't occur. Moreover the same approach is already used to wake up senders on unicast links. I have got into the issue on our product code but to reproduce the issue I changed a benchmark test application (from tipcutils/demos/benchmark) to perform the following scenario: 1. Run 64 instances of test application (nodes). It can be done on the one physical machine. 2. Each application connects to all other using TIPC sockets in RDM mode. 3. When setup is done all nodes start simultaneously send broadcast messages. 4. Everything hangs up. The issue is reproducible only when a congestion on broadcast link occurs. For example, when there are only 8 nodes it works fine since congestion doesn't occur. Send queue limit is 40 in my case (I use a critical importance level) and when 64 nodes send a message at the same moment a congestion occurs every time. Signed-off-by: Dmitry S Kolmakov <kolmakov.dmitriy@huawei.com> Reviewed-by: Jon Maloy <jon.maloy@ericsson.com> Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c30
1 files changed, 29 insertions, 1 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 8b010c976b2f..41042de3ae9b 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -170,6 +170,30 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
}
/**
+ * bclink_prepare_wakeup - prepare users for wakeup after congestion
+ * @bcl: broadcast link
+ * @resultq: queue for users which can be woken up
+ * Move a number of waiting users, as permitted by available space in
+ * the send queue, from link wait queue to specified queue for wakeup
+ */
+static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq)
+{
+ int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
+ int imp, lim;
+ struct sk_buff *skb, *tmp;
+
+ skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) {
+ imp = TIPC_SKB_CB(skb)->chain_imp;
+ lim = bcl->window + bcl->backlog[imp].limit;
+ pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
+ if ((pnd[imp] + bcl->backlog[imp].len) >= lim)
+ continue;
+ skb_unlink(skb, &bcl->wakeupq);
+ skb_queue_tail(resultq, skb);
+ }
+}
+
+/**
* tipc_bclink_wakeup_users - wake up pending users
*
* Called with no locks taken
@@ -177,8 +201,12 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
void tipc_bclink_wakeup_users(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
+ struct tipc_link *bcl = tn->bcl;
+ struct sk_buff_head resultq;
- tipc_sk_rcv(net, &tn->bclink->link.wakeupq);
+ skb_queue_head_init(&resultq);
+ bclink_prepare_wakeup(bcl, &resultq);
+ tipc_sk_rcv(net, &resultq);
}
/**