summaryrefslogtreecommitdiffstats
path: root/net/tipc/node.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-02-05 14:36:41 +0100
committerDavid S. Miller <davem@davemloft.net>2015-02-06 01:00:02 +0100
commitc637c1035534867b85b78b453c38c495b58e2c5a (patch)
tree77cd2a48a5b04e43b014da64168a6c1e209a1d40 /net/tipc/node.c
parenttipc: use existing sk_write_queue for outgoing packet chain (diff)
downloadlinux-c637c1035534867b85b78b453c38c495b58e2c5a.tar.xz
linux-c637c1035534867b85b78b453c38c495b58e2c5a.zip
tipc: resolve race problem at unicast message reception
TIPC handles message cardinality and sequencing at the link layer, before passing messages upwards to the destination sockets. During the upcall from link to socket no locks are held. It is therefore possible, and we see it happen occasionally, that messages arriving in different threads and delivered in sequence still bypass each other before they reach the destination socket. This must not happen, since it violates the sequentiality guarantee. We solve this by adding a new input buffer queue to the link structure. Arriving messages are added safely to the tail of that queue by the link, while the head of the queue is consumed, also safely, by the receiving socket. Sequentiality is secured per socket by only allowing buffers to be dequeued inside the socket lock. Since there may be multiple simultaneous readers of the queue, we use a 'filter' parameter to reduce the risk that they peek the same buffer from the queue, hence also reducing the risk of contention on the receiving socket locks. This solves the sequentiality problem, and seems to cause no measurable performance degradation. A nice side effect of this change is that lock handling in the functions tipc_rcv() and tipc_bcast_rcv() now becomes uniform, something that will enable future simplifications of those functions. Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/node.c')
-rw-r--r--net/tipc/node.c43
1 files changed, 24 insertions, 19 deletions
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 1c409c45f0fe..dcb83d9b2193 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -111,11 +111,8 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
INIT_LIST_HEAD(&n_ptr->list);
INIT_LIST_HEAD(&n_ptr->publ_list);
INIT_LIST_HEAD(&n_ptr->conn_sks);
- skb_queue_head_init(&n_ptr->waiting_sks);
__skb_queue_head_init(&n_ptr->bclink.deferred_queue);
-
hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
-
list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
if (n_ptr->addr < temp_node->addr)
break;
@@ -201,19 +198,22 @@ void tipc_node_abort_sock_conns(struct net *net, struct list_head *conns)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_sock_conn *conn, *safe;
- struct sk_buff *buf;
+ struct sk_buff *skb;
+ struct sk_buff_head skbs;
+ skb_queue_head_init(&skbs);
list_for_each_entry_safe(conn, safe, conns, list) {
- buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
+ skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
TIPC_CONN_MSG, SHORT_H_SIZE, 0,
tn->own_addr, conn->peer_node,
conn->port, conn->peer_port,
TIPC_ERR_NO_NODE);
- if (likely(buf))
- tipc_sk_rcv(net, buf);
+ if (likely(skb))
+ skb_queue_tail(&skbs, skb);
list_del(&conn->list);
kfree(conn);
}
+ tipc_sk_rcv(net, &skbs);
}
/**
@@ -568,37 +568,36 @@ void tipc_node_unlock(struct tipc_node *node)
struct net *net = node->net;
LIST_HEAD(nsub_list);
LIST_HEAD(conn_sks);
- struct sk_buff_head waiting_sks;
u32 addr = 0;
- int flags = node->action_flags;
+ u32 flags = node->action_flags;
u32 link_id = 0;
+ struct sk_buff_head *inputq = node->inputq;
+ struct sk_buff_head *namedq = node->inputq;
- if (likely(!flags)) {
+ if (likely(!flags || (flags == TIPC_MSG_EVT))) {
+ node->action_flags = 0;
spin_unlock_bh(&node->lock);
+ if (flags == TIPC_MSG_EVT)
+ tipc_sk_rcv(net, inputq);
return;
}
addr = node->addr;
link_id = node->link_id;
- __skb_queue_head_init(&waiting_sks);
-
- if (flags & TIPC_WAKEUP_USERS)
- skb_queue_splice_init(&node->waiting_sks, &waiting_sks);
+ namedq = node->namedq;
if (flags & TIPC_NOTIFY_NODE_DOWN) {
list_replace_init(&node->publ_list, &nsub_list);
list_replace_init(&node->conn_sks, &conn_sks);
}
- node->action_flags &= ~(TIPC_WAKEUP_USERS | TIPC_NOTIFY_NODE_DOWN |
+ node->action_flags &= ~(TIPC_MSG_EVT | TIPC_NOTIFY_NODE_DOWN |
TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP |
TIPC_NOTIFY_LINK_DOWN |
- TIPC_WAKEUP_BCAST_USERS);
+ TIPC_WAKEUP_BCAST_USERS |
+ TIPC_NAMED_MSG_EVT);
spin_unlock_bh(&node->lock);
- while (!skb_queue_empty(&waiting_sks))
- tipc_sk_rcv(net, __skb_dequeue(&waiting_sks));
-
if (!list_empty(&conn_sks))
tipc_node_abort_sock_conns(net, &conn_sks);
@@ -618,6 +617,12 @@ void tipc_node_unlock(struct tipc_node *node)
if (flags & TIPC_NOTIFY_LINK_DOWN)
tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
link_id, addr);
+
+ if (flags & TIPC_MSG_EVT)
+ tipc_sk_rcv(net, inputq);
+
+ if (flags & TIPC_NAMED_MSG_EVT)
+ tipc_named_rcv(net, namedq);
}
/* Caller should hold node lock for the passed node */