diff options
Diffstat (limited to 'net/mptcp/protocol.c')
-rw-r--r-- | net/mptcp/protocol.c | 178 |
1 files changed, 168 insertions, 10 deletions
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c index 408efbe34753..ad9c73cc20e1 100644 --- a/net/mptcp/protocol.c +++ b/net/mptcp/protocol.c @@ -9,6 +9,8 @@ #include <linux/kernel.h> #include <linux/module.h> #include <linux/netdevice.h> +#include <linux/sched/signal.h> +#include <linux/atomic.h> #include <net/sock.h> #include <net/inet_common.h> #include <net/inet_hashtables.h> @@ -105,6 +107,21 @@ static bool mptcp_ext_cache_refill(struct mptcp_sock *msk) return !!msk->cached_ext; } +static struct sock *mptcp_subflow_recv_lookup(const struct mptcp_sock *msk) +{ + struct mptcp_subflow_context *subflow; + struct sock *sk = (struct sock *)msk; + + sock_owned_by_me(sk); + + mptcp_for_each_subflow(msk, subflow) { + if (subflow->data_avail) + return mptcp_subflow_tcp_sock(subflow); + } + + return NULL; +} + static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, struct msghdr *msg, long *timeo) { @@ -269,13 +286,37 @@ int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb, return copy_len; } +static void mptcp_wait_data(struct sock *sk, long *timeo) +{ + DEFINE_WAIT_FUNC(wait, woken_wake_function); + struct mptcp_sock *msk = mptcp_sk(sk); + + add_wait_queue(sk_sleep(sk), &wait); + sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk); + + sk_wait_event(sk, timeo, + test_and_clear_bit(MPTCP_DATA_READY, &msk->flags), &wait); + + sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk); + remove_wait_queue(sk_sleep(sk), &wait); +} + static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock, int flags, int *addr_len) { struct mptcp_sock *msk = mptcp_sk(sk); + struct mptcp_subflow_context *subflow; + bool more_data_avail = false; + struct mptcp_read_arg arg; + read_descriptor_t desc; + bool wait_data = false; struct socket *ssock; + struct tcp_sock *tp; + bool done = false; struct sock *ssk; int copied = 0; + int target; + long timeo; if (msg->msg_flags & ~(MSG_WAITALL | MSG_DONTWAIT)) return -EOPNOTSUPP; @@ -290,16 +331,124 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, return copied; } - ssk = mptcp_subflow_get(msk); - if (!ssk) { - release_sock(sk); - return -ENOTCONN; + arg.msg = msg; + desc.arg.data = &arg; + desc.error = 0; + + timeo = sock_rcvtimeo(sk, nonblock); + + len = min_t(size_t, len, INT_MAX); + target = sock_rcvlowat(sk, flags & MSG_WAITALL, len); + + while (!done) { + u32 map_remaining; + int bytes_read; + + ssk = mptcp_subflow_recv_lookup(msk); + pr_debug("msk=%p ssk=%p", msk, ssk); + if (!ssk) + goto wait_for_data; + + subflow = mptcp_subflow_ctx(ssk); + tp = tcp_sk(ssk); + + lock_sock(ssk); + do { + /* try to read as much data as available */ + map_remaining = subflow->map_data_len - + mptcp_subflow_get_map_offset(subflow); + desc.count = min_t(size_t, len - copied, map_remaining); + pr_debug("reading %zu bytes, copied %d", desc.count, + copied); + bytes_read = tcp_read_sock(ssk, &desc, + mptcp_read_actor); + if (bytes_read < 0) { + if (!copied) + copied = bytes_read; + done = true; + goto next; + } + + pr_debug("msk ack_seq=%llx -> %llx", msk->ack_seq, + msk->ack_seq + bytes_read); + msk->ack_seq += bytes_read; + copied += bytes_read; + if (copied >= len) { + done = true; + goto next; + } + if (tp->urg_data && tp->urg_seq == tp->copied_seq) { + pr_err("Urgent data present, cannot proceed"); + done = true; + goto next; + } +next: + more_data_avail = mptcp_subflow_data_available(ssk); + } while (more_data_avail && !done); + release_sock(ssk); + continue; + +wait_for_data: + more_data_avail = false; + + /* only the master socket status is relevant here. The exit + * conditions mirror closely tcp_recvmsg() + */ + if (copied >= target) + break; + + if (copied) { + if (sk->sk_err || + sk->sk_state == TCP_CLOSE || + (sk->sk_shutdown & RCV_SHUTDOWN) || + !timeo || + signal_pending(current)) + break; + } else { + if (sk->sk_err) { + copied = sock_error(sk); + break; + } + + if (sk->sk_shutdown & RCV_SHUTDOWN) + break; + + if (sk->sk_state == TCP_CLOSE) { + copied = -ENOTCONN; + break; + } + + if (!timeo) { + copied = -EAGAIN; + break; + } + + if (signal_pending(current)) { + copied = sock_intr_errno(timeo); + break; + } + } + + pr_debug("block timeout %ld", timeo); + wait_data = true; + mptcp_wait_data(sk, &timeo); } - copied = sock_recvmsg(ssk->sk_socket, msg, flags); + if (more_data_avail) { + if (!test_bit(MPTCP_DATA_READY, &msk->flags)) + set_bit(MPTCP_DATA_READY, &msk->flags); + } else if (!wait_data) { + clear_bit(MPTCP_DATA_READY, &msk->flags); - release_sock(sk); + /* .. race-breaker: ssk might get new data after last + * data_available() returns false. + */ + ssk = mptcp_subflow_recv_lookup(msk); + if (unlikely(ssk)) + set_bit(MPTCP_DATA_READY, &msk->flags); + } + release_sock(sk); return copied; } @@ -460,10 +609,6 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err, msk->write_seq = subflow->idsn + 1; ack_seq++; msk->ack_seq = ack_seq; - subflow->map_seq = ack_seq; - subflow->map_subflow_seq = 1; - subflow->rel_write_seq = 1; - subflow->tcp_sock = ssk; newsk = new_mptcp_sock; mptcp_copy_inaddrs(newsk, ssk); list_add(&subflow->node, &msk->conn_list); @@ -475,6 +620,19 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err, bh_unlock_sock(new_mptcp_sock); local_bh_enable(); release_sock(sk); + + /* the subflow can already receive packet, avoid racing with + * the receive path and process the pending ones + */ + lock_sock(ssk); + subflow->map_seq = ack_seq; + subflow->map_subflow_seq = 1; + subflow->rel_write_seq = 1; + subflow->tcp_sock = ssk; + subflow->conn = new_mptcp_sock; + if (unlikely(!skb_queue_empty(&ssk->sk_receive_queue))) + mptcp_subflow_data_available(ssk); + release_sock(ssk); } return newsk; |