diff options
author | Jon Paul Maloy <jon.maloy@ericsson.com> | 2015-02-05 14:36:41 +0100 |
---|---|---|
committer | David S. Miller <davem@davemloft.net> | 2015-02-06 01:00:02 +0100 |
commit | c637c1035534867b85b78b453c38c495b58e2c5a (patch) | |
tree | 77cd2a48a5b04e43b014da64168a6c1e209a1d40 /net/tipc/socket.c | |
parent | tipc: use existing sk_write_queue for outgoing packet chain (diff) | |
download | linux-c637c1035534867b85b78b453c38c495b58e2c5a.tar.xz linux-c637c1035534867b85b78b453c38c495b58e2c5a.zip |
tipc: resolve race problem at unicast message reception
TIPC handles message cardinality and sequencing at the link layer,
before passing messages upwards to the destination sockets. During the
upcall from link to socket no locks are held. It is therefore possible,
and we see it happen occasionally, that messages arriving in different
threads and delivered in sequence still bypass each other before they
reach the destination socket. This must not happen, since it violates
the sequentiality guarantee.
We solve this by adding a new input buffer queue to the link structure.
Arriving messages are added safely to the tail of that queue by the
link, while the head of the queue is consumed, also safely, by the
receiving socket. Sequentiality is secured per socket by only allowing
buffers to be dequeued inside the socket lock. Since there may be multiple
simultaneous readers of the queue, we use a 'filter' parameter to reduce
the risk that they peek the same buffer from the queue, hence also
reducing the risk of contention on the receiving socket locks.
This solves the sequentiality problem, and seems to cause no measurable
performance degradation.
A nice side effect of this change is that lock handling in the functions
tipc_rcv() and tipc_bcast_rcv() now becomes uniform, something that
will enable future simplifications of those functions.
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r-- | net/tipc/socket.c | 132 |
1 files changed, 85 insertions, 47 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 611a04fb0ddc..c1a4611649ab 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -41,6 +41,7 @@ #include "node.h" #include "link.h" #include "config.h" +#include "name_distr.h" #include "socket.h" #define SS_LISTENING -1 /* socket is listening */ @@ -785,10 +786,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf) struct sk_buff *b; uint i, last, dst = 0; u32 scope = TIPC_CLUSTER_SCOPE; + struct sk_buff_head msgs; if (in_own_node(net, msg_orignode(msg))) scope = TIPC_NODE_SCOPE; + if (unlikely(!msg_mcast(msg))) { + pr_warn("Received non-multicast msg in multicast\n"); + kfree_skb(buf); + goto exit; + } /* Create destination port list: */ tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), msg_nameupper(msg), scope, &dports); @@ -806,9 +813,12 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf) continue; } msg_set_destport(msg, item->ports[i]); - tipc_sk_rcv(net, b); + skb_queue_head_init(&msgs); + skb_queue_tail(&msgs, b); + tipc_sk_rcv(net, &msgs); } } +exit: tipc_port_list_free(&dports); } @@ -1760,71 +1770,99 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) } /** - * tipc_sk_enqueue_skb - enqueue buffer to socket or backlog queue - * @sk: socket - * @skb: pointer to message. Set to NULL if buffer is consumed. - * @dnode: if buffer should be forwarded/returned, send to this node + * tipc_sk_enqueue - extract all buffers with destination 'dport' from + * inputq and try adding them to socket or backlog queue + * @inputq: list of incoming buffers with potentially different destinations + * @sk: socket where the buffers should be enqueued + * @dport: port number for the socket + * @_skb: returned buffer to be forwarded or rejected, if applicable * * Caller must hold socket lock * - * Returns TIPC_OK (0) or -tipc error code + * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD + * or -TIPC_ERR_NO_PORT */ -static int tipc_sk_enqueue_skb(struct sock *sk, struct sk_buff **skb) +static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, + u32 dport, struct sk_buff **_skb) { unsigned int lim; atomic_t *dcnt; - - if (unlikely(!*skb)) - return TIPC_OK; - if (!sock_owned_by_user(sk)) - return filter_rcv(sk, skb); - dcnt = &tipc_sk(sk)->dupl_rcvcnt; - if (sk->sk_backlog.len) - atomic_set(dcnt, 0); - lim = rcvbuf_limit(sk, *skb) + atomic_read(dcnt); - if (unlikely(sk_add_backlog(sk, *skb, lim))) + int err; + struct sk_buff *skb; + unsigned long time_limit = jiffies + 2; + + while (skb_queue_len(inputq)) { + skb = tipc_skb_dequeue(inputq, dport); + if (unlikely(!skb)) + return TIPC_OK; + /* Return if softirq window exhausted */ + if (unlikely(time_after_eq(jiffies, time_limit))) + return TIPC_OK; + if (!sock_owned_by_user(sk)) { + err = filter_rcv(sk, &skb); + if (likely(!skb)) + continue; + *_skb = skb; + return err; + } + dcnt = &tipc_sk(sk)->dupl_rcvcnt; + if (sk->sk_backlog.len) + atomic_set(dcnt, 0); + lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); + if (likely(!sk_add_backlog(sk, skb, lim))) + continue; + *_skb = skb; return -TIPC_ERR_OVERLOAD; - *skb = NULL; + } return TIPC_OK; } /** - * tipc_sk_rcv - handle incoming message - * @skb: buffer containing arriving message - * Consumes buffer - * Returns 0 if success, or errno: -EHOSTUNREACH + * tipc_sk_rcv - handle a chain of incoming buffers + * @inputq: buffer list containing the buffers + * Consumes all buffers in list until inputq is empty + * Note: may be called in multiple threads referring to the same queue + * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH + * Only node local calls check the return value, sending single-buffer queues */ -int tipc_sk_rcv(struct net *net, struct sk_buff *skb) +int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) { + u32 dnode, dport = 0; + int err = -TIPC_ERR_NO_PORT; + struct sk_buff *skb; struct tipc_sock *tsk; struct tipc_net *tn; struct sock *sk; - u32 dport = msg_destport(buf_msg(skb)); - int err = -TIPC_ERR_NO_PORT; - u32 dnode; - /* Find destination */ - tsk = tipc_sk_lookup(net, dport); - if (likely(tsk)) { - sk = &tsk->sk; - spin_lock_bh(&sk->sk_lock.slock); - err = tipc_sk_enqueue_skb(sk, &skb); - spin_unlock_bh(&sk->sk_lock.slock); - sock_put(sk); - } - if (likely(!skb)) - return 0; - if (tipc_msg_lookup_dest(net, skb, &dnode, &err)) - goto xmit; - if (!err) { - dnode = msg_destnode(buf_msg(skb)); - goto xmit; - } - tn = net_generic(net, tipc_net_id); - if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) - return -EHOSTUNREACH; + while (skb_queue_len(inputq)) { + skb = NULL; + dport = tipc_skb_peek_port(inputq, dport); + tsk = tipc_sk_lookup(net, dport); + if (likely(tsk)) { + sk = &tsk->sk; + if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { + err = tipc_sk_enqueue(inputq, sk, dport, &skb); + spin_unlock_bh(&sk->sk_lock.slock); + dport = 0; + } + sock_put(sk); + } else { + skb = tipc_skb_dequeue(inputq, dport); + } + if (likely(!skb)) + continue; + if (tipc_msg_lookup_dest(net, skb, &dnode, &err)) + goto xmit; + if (!err) { + dnode = msg_destnode(buf_msg(skb)); + goto xmit; + } + tn = net_generic(net, tipc_net_id); + if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) + continue; xmit: - tipc_link_xmit_skb(net, skb, dnode, dport); + tipc_link_xmit_skb(net, skb, dnode, dport); + } return err ? -EHOSTUNREACH : 0; } |