diff options
Diffstat (limited to 'net/mptcp/protocol.c')
-rw-r--r-- | net/mptcp/protocol.c | 1813 |
1 files changed, 1232 insertions, 581 deletions
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c index 88f2a7a0ccb8..b812aaae8044 100644 --- a/net/mptcp/protocol.c +++ b/net/mptcp/protocol.c @@ -21,6 +21,7 @@ #include <net/transp_v6.h> #endif #include <net/mptcp.h> +#include <net/xfrm.h> #include "protocol.h" #include "mib.h" @@ -41,6 +42,9 @@ struct mptcp_skb_cb { static struct percpu_counter mptcp_sockets_allocated; +static void __mptcp_destroy_sock(struct sock *sk); +static void __mptcp_check_send_data_fin(struct sock *sk); + /* If msk has an initial subflow socket, and the MP_CAPABLE handshake has not * completed yet or has failed, return the subflow socket. * Otherwise return NULL. @@ -53,6 +57,12 @@ static struct socket *__mptcp_nmpc_socket(const struct mptcp_sock *msk) return msk->subflow; } +/* Returns end sequence number of the receiver's advertised window */ +static u64 mptcp_wnd_end(const struct mptcp_sock *msk) +{ + return READ_ONCE(msk->wnd_end); +} + static bool mptcp_is_tcpsk(struct sock *sk) { struct socket *sock = sk->sk_socket; @@ -102,6 +112,7 @@ static int __mptcp_socket_create(struct mptcp_sock *msk) msk->subflow = ssock; subflow = mptcp_subflow_ctx(ssock->sk); list_add(&subflow->node, &msk->conn_list); + sock_hold(ssock->sk); subflow->request_mptcp = 1; /* accept() will wait on first subflow sk_wq, and we always wakes up @@ -157,18 +168,19 @@ static void mptcp_data_queue_ofo(struct mptcp_sock *msk, struct sk_buff *skb) struct rb_node **p, *parent; u64 seq, end_seq, max_seq; struct sk_buff *skb1; - int space; seq = MPTCP_SKB_CB(skb)->map_seq; end_seq = MPTCP_SKB_CB(skb)->end_seq; - space = tcp_space(sk); - max_seq = space > 0 ? space + msk->ack_seq : msk->ack_seq; + max_seq = READ_ONCE(msk->rcv_wnd_sent); pr_debug("msk=%p seq=%llx limit=%llx empty=%d", msk, seq, max_seq, RB_EMPTY_ROOT(&msk->out_of_order_queue)); - if (after64(seq, max_seq)) { + if (after64(end_seq, max_seq)) { /* out of window */ mptcp_drop(sk, skb); + pr_debug("oow by %lld, rcv_wnd_sent %llu\n", + (unsigned long long)end_seq - (unsigned long)max_seq, + (unsigned long long)msk->rcv_wnd_sent); MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_NODSSWINDOW); return; } @@ -323,17 +335,35 @@ static void mptcp_stop_timer(struct sock *sk) mptcp_sk(sk)->timer_ival = 0; } -static void mptcp_check_data_fin_ack(struct sock *sk) +static void mptcp_close_wake_up(struct sock *sk) +{ + if (sock_flag(sk, SOCK_DEAD)) + return; + + sk->sk_state_change(sk); + if (sk->sk_shutdown == SHUTDOWN_MASK || + sk->sk_state == TCP_CLOSE) + sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP); + else + sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); +} + +static bool mptcp_pending_data_fin_ack(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); - if (__mptcp_check_fallback(msk)) - return; + return !__mptcp_check_fallback(msk) && + ((1 << sk->sk_state) & + (TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) && + msk->write_seq == READ_ONCE(msk->snd_una); +} + +static void mptcp_check_data_fin_ack(struct sock *sk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); /* Look for an acknowledged DATA_FIN */ - if (((1 << sk->sk_state) & - (TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) && - msk->write_seq == atomic64_read(&msk->snd_una)) { + if (mptcp_pending_data_fin_ack(sk)) { mptcp_stop_timer(sk); WRITE_ONCE(msk->snd_data_fin_enable, 0); @@ -341,20 +371,14 @@ static void mptcp_check_data_fin_ack(struct sock *sk) switch (sk->sk_state) { case TCP_FIN_WAIT1: inet_sk_state_store(sk, TCP_FIN_WAIT2); - sk->sk_state_change(sk); break; case TCP_CLOSING: case TCP_LAST_ACK: inet_sk_state_store(sk, TCP_CLOSE); - sk->sk_state_change(sk); break; } - if (sk->sk_shutdown == SHUTDOWN_MASK || - sk->sk_state == TCP_CLOSE) - sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP); - else - sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); + mptcp_close_wake_up(sk); } } @@ -388,13 +412,79 @@ static void mptcp_set_timeout(const struct sock *sk, const struct sock *ssk) mptcp_sk(sk)->timer_ival = tout > 0 ? tout : TCP_RTO_MIN; } -static void mptcp_check_data_fin(struct sock *sk) +static bool mptcp_subflow_active(struct mptcp_subflow_context *subflow) +{ + struct sock *ssk = mptcp_subflow_tcp_sock(subflow); + + /* can't send if JOIN hasn't completed yet (i.e. is usable for mptcp) */ + if (subflow->request_join && !subflow->fully_established) + return false; + + /* only send if our side has not closed yet */ + return ((1 << ssk->sk_state) & (TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)); +} + +static bool tcp_can_send_ack(const struct sock *ssk) +{ + return !((1 << inet_sk_state_load(ssk)) & + (TCPF_SYN_SENT | TCPF_SYN_RECV | TCPF_TIME_WAIT | TCPF_CLOSE)); +} + +static void mptcp_send_ack(struct mptcp_sock *msk) +{ + struct mptcp_subflow_context *subflow; + + mptcp_for_each_subflow(msk, subflow) { + struct sock *ssk = mptcp_subflow_tcp_sock(subflow); + + lock_sock(ssk); + if (tcp_can_send_ack(ssk)) + tcp_send_ack(ssk); + release_sock(ssk); + } +} + +static bool mptcp_subflow_cleanup_rbuf(struct sock *ssk) +{ + int ret; + + lock_sock(ssk); + ret = tcp_can_send_ack(ssk); + if (ret) + tcp_cleanup_rbuf(ssk, 1); + release_sock(ssk); + return ret; +} + +static void mptcp_cleanup_rbuf(struct mptcp_sock *msk) +{ + struct sock *ack_hint = READ_ONCE(msk->ack_hint); + struct mptcp_subflow_context *subflow; + + /* if the hinted ssk is still active, try to use it */ + if (likely(ack_hint)) { + mptcp_for_each_subflow(msk, subflow) { + struct sock *ssk = mptcp_subflow_tcp_sock(subflow); + + if (ack_hint == ssk && mptcp_subflow_cleanup_rbuf(ssk)) + return; + } + } + + /* otherwise pick the first active subflow */ + mptcp_for_each_subflow(msk, subflow) + if (mptcp_subflow_cleanup_rbuf(mptcp_subflow_tcp_sock(subflow))) + return; +} + +static bool mptcp_check_data_fin(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); u64 rcv_data_fin_seq; + bool ret = false; if (__mptcp_check_fallback(msk) || !msk->first) - return; + return ret; /* Need to ack a DATA_FIN received from a peer while this side * of the connection is in ESTABLISHED, FIN_WAIT1, or FIN_WAIT2. @@ -410,8 +500,6 @@ static void mptcp_check_data_fin(struct sock *sk) */ if (mptcp_pending_data_fin(sk, &rcv_data_fin_seq)) { - struct mptcp_subflow_context *subflow; - WRITE_ONCE(msk->ack_seq, msk->ack_seq + 1); WRITE_ONCE(msk->rcv_data_fin, 0); @@ -428,7 +516,6 @@ static void mptcp_check_data_fin(struct sock *sk) break; case TCP_FIN_WAIT2: inet_sk_state_store(sk, TCP_CLOSE); - // @@ Close subflows now? break; default: /* Other states not expected */ @@ -436,23 +523,12 @@ static void mptcp_check_data_fin(struct sock *sk) break; } + ret = true; mptcp_set_timeout(sk, NULL); - mptcp_for_each_subflow(msk, subflow) { - struct sock *ssk = mptcp_subflow_tcp_sock(subflow); - - lock_sock(ssk); - tcp_send_ack(ssk); - release_sock(ssk); - } - - sk->sk_state_change(sk); - - if (sk->sk_shutdown == SHUTDOWN_MASK || - sk->sk_state == TCP_CLOSE) - sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP); - else - sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); + mptcp_send_ack(msk); + mptcp_close_wake_up(sk); } + return ret; } static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk, @@ -464,12 +540,22 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk, unsigned int moved = 0; bool more_data_avail; struct tcp_sock *tp; - u32 old_copied_seq; bool done = false; + int sk_rbuf; + + sk_rbuf = READ_ONCE(sk->sk_rcvbuf); + + if (!(sk->sk_userlocks & SOCK_RCVBUF_LOCK)) { + int ssk_rbuf = READ_ONCE(ssk->sk_rcvbuf); + + if (unlikely(ssk_rbuf > sk_rbuf)) { + WRITE_ONCE(sk->sk_rcvbuf, ssk_rbuf); + sk_rbuf = ssk_rbuf; + } + } pr_debug("msk=%p ssk=%p", msk, ssk); tp = tcp_sk(ssk); - old_copied_seq = tp->copied_seq; do { u32 map_remaining, offset; u32 seq = tp->copied_seq; @@ -528,20 +614,18 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk, WRITE_ONCE(tp->copied_seq, seq); more_data_avail = mptcp_subflow_data_available(ssk); - if (atomic_read(&sk->sk_rmem_alloc) > READ_ONCE(sk->sk_rcvbuf)) { + if (atomic_read(&sk->sk_rmem_alloc) > sk_rbuf) { done = true; break; } } while (more_data_avail); + WRITE_ONCE(msk->ack_hint, ssk); *bytes += moved; - if (tp->copied_seq != old_copied_seq) - tcp_cleanup_rbuf(ssk, 1); - return done; } -static bool mptcp_ofo_queue(struct mptcp_sock *msk) +static bool __mptcp_ofo_queue(struct mptcp_sock *msk) { struct sock *sk = (struct sock *)msk; struct sk_buff *skb, *tail; @@ -587,43 +671,43 @@ static bool mptcp_ofo_queue(struct mptcp_sock *msk) /* In most cases we will be able to lock the mptcp socket. If its already * owned, we need to defer to the work queue to avoid ABBA deadlock. */ -static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk) +static void move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk) { struct sock *sk = (struct sock *)msk; unsigned int moved = 0; - if (READ_ONCE(sk->sk_lock.owned)) - return false; - - if (unlikely(!spin_trylock_bh(&sk->sk_lock.slock))) - return false; - - /* must re-check after taking the lock */ - if (!READ_ONCE(sk->sk_lock.owned)) { - __mptcp_move_skbs_from_subflow(msk, ssk, &moved); - mptcp_ofo_queue(msk); + if (inet_sk_state_load(sk) == TCP_CLOSE) + return; - /* If the moves have caught up with the DATA_FIN sequence number - * it's time to ack the DATA_FIN and change socket state, but - * this is not a good place to change state. Let the workqueue - * do it. - */ - if (mptcp_pending_data_fin(sk, NULL) && - schedule_work(&msk->work)) - sock_hold(sk); - } + mptcp_data_lock(sk); - spin_unlock_bh(&sk->sk_lock.slock); + __mptcp_move_skbs_from_subflow(msk, ssk, &moved); + __mptcp_ofo_queue(msk); - return moved > 0; + /* If the moves have caught up with the DATA_FIN sequence number + * it's time to ack the DATA_FIN and change socket state, but + * this is not a good place to change state. Let the workqueue + * do it. + */ + if (mptcp_pending_data_fin(sk, NULL)) + mptcp_schedule_work(sk); + mptcp_data_unlock(sk); } void mptcp_data_ready(struct sock *sk, struct sock *ssk) { struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk); struct mptcp_sock *msk = mptcp_sk(sk); + int sk_rbuf, ssk_rbuf; bool wake; + /* The peer can send data while we are shutting down this + * subflow at msk destruction time, but we must avoid enqueuing + * more data to the msk receive queue + */ + if (unlikely(subflow->disposable)) + return; + /* move_skbs_to_msk below can legitly clear the data_avail flag, * but we will need later to properly woke the reader, cache its * value @@ -632,30 +716,23 @@ void mptcp_data_ready(struct sock *sk, struct sock *ssk) if (wake) set_bit(MPTCP_DATA_READY, &msk->flags); - if (atomic_read(&sk->sk_rmem_alloc) < READ_ONCE(sk->sk_rcvbuf) && - move_skbs_to_msk(msk, ssk)) - goto wake; + ssk_rbuf = READ_ONCE(ssk->sk_rcvbuf); + sk_rbuf = READ_ONCE(sk->sk_rcvbuf); + if (unlikely(ssk_rbuf > sk_rbuf)) + sk_rbuf = ssk_rbuf; - /* don't schedule if mptcp sk is (still) over limit */ - if (atomic_read(&sk->sk_rmem_alloc) > READ_ONCE(sk->sk_rcvbuf)) + /* over limit? can't append more skbs to msk */ + if (atomic_read(&sk->sk_rmem_alloc) > sk_rbuf) goto wake; - /* mptcp socket is owned, release_cb should retry */ - if (!test_and_set_bit(TCP_DELACK_TIMER_DEFERRED, - &sk->sk_tsq_flags)) { - sock_hold(sk); + move_skbs_to_msk(msk, ssk); - /* need to try again, its possible release_cb() has already - * been called after the test_and_set_bit() above. - */ - move_skbs_to_msk(msk, ssk); - } wake: if (wake) sk->sk_data_ready(sk); } -static void __mptcp_flush_join_list(struct mptcp_sock *msk) +void __mptcp_flush_join_list(struct mptcp_sock *msk) { if (likely(list_empty(&msk->join_list))) return; @@ -675,6 +752,10 @@ static void mptcp_reset_timer(struct sock *sk) struct inet_connection_sock *icsk = inet_csk(sk); unsigned long tout; + /* prevent rescheduling on close */ + if (unlikely(inet_sk_state_load(sk) == TCP_CLOSE)) + return; + /* should never be called with mptcp level timer cleared */ tout = READ_ONCE(mptcp_sk(sk)->timer_ival); if (WARN_ON_ONCE(!tout)) @@ -682,23 +763,23 @@ static void mptcp_reset_timer(struct sock *sk) sk_reset_timer(sk, &icsk->icsk_retransmit_timer, jiffies + tout); } -void mptcp_data_acked(struct sock *sk) +bool mptcp_schedule_work(struct sock *sk) { - mptcp_reset_timer(sk); - - if ((!test_bit(MPTCP_SEND_SPACE, &mptcp_sk(sk)->flags) || - (inet_sk_state_load(sk) != TCP_ESTABLISHED)) && - schedule_work(&mptcp_sk(sk)->work)) + if (inet_sk_state_load(sk) != TCP_CLOSE && + schedule_work(&mptcp_sk(sk)->work)) { + /* each subflow already holds a reference to the sk, and the + * workqueue is invoked by a subflow, so sk can't go away here. + */ sock_hold(sk); + return true; + } + return false; } void mptcp_subflow_eof(struct sock *sk) { - struct mptcp_sock *msk = mptcp_sk(sk); - - if (!test_and_set_bit(MPTCP_WORK_EOF, &msk->flags) && - schedule_work(&msk->work)) - sock_hold(sk); + if (!test_and_set_bit(MPTCP_WORK_EOF, &mptcp_sk(sk)->flags)) + mptcp_schedule_work(sk); } static void mptcp_check_for_eof(struct mptcp_sock *msk) @@ -709,8 +790,10 @@ static void mptcp_check_for_eof(struct mptcp_sock *msk) mptcp_for_each_subflow(msk, subflow) receivers += !subflow->rx_eof; + if (receivers) + return; - if (!receivers && !(sk->sk_shutdown & RCV_SHUTDOWN)) { + if (!(sk->sk_shutdown & RCV_SHUTDOWN)) { /* hopefully temporary hack: propagate shutdown status * to msk, when all subflows agree on it */ @@ -720,16 +803,21 @@ static void mptcp_check_for_eof(struct mptcp_sock *msk) set_bit(MPTCP_DATA_READY, &msk->flags); sk->sk_data_ready(sk); } -} - -static bool mptcp_ext_cache_refill(struct mptcp_sock *msk) -{ - const struct sock *sk = (const struct sock *)msk; - if (!msk->cached_ext) - msk->cached_ext = __skb_ext_alloc(sk->sk_allocation); - - return !!msk->cached_ext; + switch (sk->sk_state) { + case TCP_ESTABLISHED: + inet_sk_state_store(sk, TCP_CLOSE_WAIT); + break; + case TCP_FIN_WAIT1: + inet_sk_state_store(sk, TCP_CLOSING); + break; + case TCP_FIN_WAIT2: + inet_sk_state_store(sk, TCP_CLOSE); + break; + default: + return; + } + mptcp_close_wake_up(sk); } static struct sock *mptcp_subflow_recv_lookup(const struct mptcp_sock *msk) @@ -754,8 +842,11 @@ static bool mptcp_skb_can_collapse_to(u64 write_seq, if (!tcp_skb_can_collapse_to(skb)) return false; - /* can collapse only if MPTCP level sequence is in order */ - return mpext && mpext->data_seq + mpext->data_len == write_seq; + /* can collapse only if MPTCP level sequence is in order and this + * mapping has not been xmitted yet + */ + return mpext && mpext->data_seq + mpext->data_len == write_seq && + !mpext->frozen; } static bool mptcp_frag_can_collapse_to(const struct mptcp_sock *msk, @@ -763,9 +854,125 @@ static bool mptcp_frag_can_collapse_to(const struct mptcp_sock *msk, const struct mptcp_data_frag *df) { return df && pfrag->page == df->page && + pfrag->size - pfrag->offset > 0 && df->data_seq + df->data_len == msk->write_seq; } +static int mptcp_wmem_with_overhead(struct sock *sk, int size) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + int ret, skbs; + + ret = size + ((sizeof(struct mptcp_data_frag) * size) >> PAGE_SHIFT); + skbs = (msk->tx_pending_data + size) / msk->size_goal_cache; + if (skbs < msk->skb_tx_cache.qlen) + return ret; + + return ret + (skbs - msk->skb_tx_cache.qlen) * SKB_TRUESIZE(MAX_TCP_HEADER); +} + +static void __mptcp_wmem_reserve(struct sock *sk, int size) +{ + int amount = mptcp_wmem_with_overhead(sk, size); + struct mptcp_sock *msk = mptcp_sk(sk); + + WARN_ON_ONCE(msk->wmem_reserved); + if (amount <= sk->sk_forward_alloc) + goto reserve; + + /* under memory pressure try to reserve at most a single page + * otherwise try to reserve the full estimate and fallback + * to a single page before entering the error path + */ + if ((tcp_under_memory_pressure(sk) && amount > PAGE_SIZE) || + !sk_wmem_schedule(sk, amount)) { + if (amount <= PAGE_SIZE) + goto nomem; + + amount = PAGE_SIZE; + if (!sk_wmem_schedule(sk, amount)) + goto nomem; + } + +reserve: + msk->wmem_reserved = amount; + sk->sk_forward_alloc -= amount; + return; + +nomem: + /* we will wait for memory on next allocation */ + msk->wmem_reserved = -1; +} + +static void __mptcp_update_wmem(struct sock *sk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + + if (!msk->wmem_reserved) + return; + + if (msk->wmem_reserved < 0) + msk->wmem_reserved = 0; + if (msk->wmem_reserved > 0) { + sk->sk_forward_alloc += msk->wmem_reserved; + msk->wmem_reserved = 0; + } +} + +static bool mptcp_wmem_alloc(struct sock *sk, int size) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + + /* check for pre-existing error condition */ + if (msk->wmem_reserved < 0) + return false; + + if (msk->wmem_reserved >= size) + goto account; + + mptcp_data_lock(sk); + if (!sk_wmem_schedule(sk, size)) { + mptcp_data_unlock(sk); + return false; + } + + sk->sk_forward_alloc -= size; + msk->wmem_reserved += size; + mptcp_data_unlock(sk); + +account: + msk->wmem_reserved -= size; + return true; +} + +static void mptcp_wmem_uncharge(struct sock *sk, int size) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + + if (msk->wmem_reserved < 0) + msk->wmem_reserved = 0; + msk->wmem_reserved += size; +} + +static void mptcp_mem_reclaim_partial(struct sock *sk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + + /* if we are experiencing a transint allocation error, + * the forward allocation memory has been already + * released + */ + if (msk->wmem_reserved < 0) + return; + + mptcp_data_lock(sk); + sk->sk_forward_alloc += msk->wmem_reserved; + sk_mem_reclaim_partial(sk); + msk->wmem_reserved = sk->sk_forward_alloc; + sk->sk_forward_alloc = 0; + mptcp_data_unlock(sk); +} + static void dfrag_uncharge(struct sock *sk, int len) { sk_mem_uncharge(sk, len); @@ -781,21 +988,7 @@ static void dfrag_clear(struct sock *sk, struct mptcp_data_frag *dfrag) put_page(dfrag->page); } -static bool mptcp_is_writeable(struct mptcp_sock *msk) -{ - struct mptcp_subflow_context *subflow; - - if (!sk_stream_is_writeable((struct sock *)msk)) - return false; - - mptcp_for_each_subflow(msk, subflow) { - if (sk_stream_is_writeable(subflow->tcp_sock)) - return true; - } - return false; -} - -static void mptcp_clean_una(struct sock *sk) +static void __mptcp_clean_una(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); struct mptcp_data_frag *dtmp, *dfrag; @@ -806,13 +999,15 @@ static void mptcp_clean_una(struct sock *sk) * plain TCP */ if (__mptcp_check_fallback(msk)) - atomic64_set(&msk->snd_una, msk->write_seq); - snd_una = atomic64_read(&msk->snd_una); + msk->snd_una = READ_ONCE(msk->snd_nxt); + snd_una = msk->snd_una; list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) { if (after64(dfrag->data_seq + dfrag->data_len, snd_una)) break; + if (WARN_ON_ONCE(dfrag == msk->first_pending)) + break; dfrag_clear(sk, dfrag); cleaned = true; } @@ -821,12 +1016,13 @@ static void mptcp_clean_una(struct sock *sk) if (dfrag && after64(snd_una, dfrag->data_seq)) { u64 delta = snd_una - dfrag->data_seq; - if (WARN_ON_ONCE(delta > dfrag->data_len)) + if (WARN_ON_ONCE(delta > dfrag->already_sent)) goto out; dfrag->data_seq += delta; dfrag->offset += delta; dfrag->data_len -= delta; + dfrag->already_sent -= delta; dfrag_uncharge(sk, delta); cleaned = true; @@ -834,19 +1030,42 @@ static void mptcp_clean_una(struct sock *sk) out: if (cleaned) { - sk_mem_reclaim_partial(sk); - - /* Only wake up writers if a subflow is ready */ - if (mptcp_is_writeable(msk)) { - set_bit(MPTCP_SEND_SPACE, &mptcp_sk(sk)->flags); - smp_mb__after_atomic(); + if (tcp_under_memory_pressure(sk)) { + __mptcp_update_wmem(sk); + sk_mem_reclaim_partial(sk); + } - /* set SEND_SPACE before sk_stream_write_space clears - * NOSPACE - */ - sk_stream_write_space(sk); + if (sk_stream_is_writeable(sk)) { + /* pairs with memory barrier in mptcp_poll */ + smp_mb(); + if (test_and_clear_bit(MPTCP_NOSPACE, &msk->flags)) + sk_stream_write_space(sk); } } + + if (snd_una == READ_ONCE(msk->snd_nxt)) { + if (msk->timer_ival) + mptcp_stop_timer(sk); + } else { + mptcp_reset_timer(sk); + } +} + +static void mptcp_enter_memory_pressure(struct sock *sk) +{ + struct mptcp_subflow_context *subflow; + struct mptcp_sock *msk = mptcp_sk(sk); + bool first = true; + + sk_stream_moderate_sndbuf(sk); + mptcp_for_each_subflow(msk, subflow) { + struct sock *ssk = mptcp_subflow_tcp_sock(subflow); + + if (first) + tcp_enter_memory_pressure(ssk); + sk_stream_moderate_sndbuf(ssk); + first = false; + } } /* ensure we get enough memory for the frag hdr, beyond some minimal amount of @@ -858,8 +1077,7 @@ static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag) pfrag, sk->sk_allocation))) return true; - sk->sk_prot->enter_memory_pressure(sk); - sk_stream_moderate_sndbuf(sk); + mptcp_enter_memory_pressure(sk); return false; } @@ -875,149 +1093,241 @@ mptcp_carve_data_frag(const struct mptcp_sock *msk, struct page_frag *pfrag, dfrag->data_seq = msk->write_seq; dfrag->overhead = offset - orig_offset + sizeof(struct mptcp_data_frag); dfrag->offset = offset + sizeof(struct mptcp_data_frag); + dfrag->already_sent = 0; dfrag->page = pfrag->page; return dfrag; } -static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, - struct msghdr *msg, struct mptcp_data_frag *dfrag, - long *timeo, int *pmss_now, - int *ps_goal) +struct mptcp_sendmsg_info { + int mss_now; + int size_goal; + u16 limit; + u16 sent; + unsigned int flags; +}; + +static int mptcp_check_allowed_size(struct mptcp_sock *msk, u64 data_seq, + int avail_size) +{ + u64 window_end = mptcp_wnd_end(msk); + + if (__mptcp_check_fallback(msk)) + return avail_size; + + if (!before64(data_seq + avail_size, window_end)) { + u64 allowed_size = window_end - data_seq; + + return min_t(unsigned int, allowed_size, avail_size); + } + + return avail_size; +} + +static bool __mptcp_add_ext(struct sk_buff *skb, gfp_t gfp) +{ + struct skb_ext *mpext = __skb_ext_alloc(gfp); + + if (!mpext) + return false; + __skb_ext_set(skb, SKB_EXT_MPTCP, mpext); + return true; +} + +static struct sk_buff *__mptcp_do_alloc_tx_skb(struct sock *sk, gfp_t gfp) +{ + struct sk_buff *skb; + + skb = alloc_skb_fclone(MAX_TCP_HEADER, gfp); + if (likely(skb)) { + if (likely(__mptcp_add_ext(skb, gfp))) { + skb_reserve(skb, MAX_TCP_HEADER); + skb->reserved_tailroom = skb->end - skb->tail; + return skb; + } + __kfree_skb(skb); + } else { + mptcp_enter_memory_pressure(sk); + } + return NULL; +} + +static bool mptcp_tx_cache_refill(struct sock *sk, int size, + struct sk_buff_head *skbs, int *total_ts) { - int mss_now, avail_size, size_goal, offset, ret, frag_truesize = 0; - bool dfrag_collapsed, can_collapse = false; struct mptcp_sock *msk = mptcp_sk(sk); - struct mptcp_ext *mpext = NULL; - bool retransmission = !!dfrag; - struct sk_buff *skb, *tail; - struct page_frag *pfrag; - struct page *page; - u64 *write_seq; - size_t psize; - - /* use the mptcp page cache so that we can easily move the data - * from one substream to another, but do per subflow memory accounting - * Note: pfrag is used only !retransmission, but the compiler if - * fooled into a warning if we don't init here - */ - pfrag = sk_page_frag(sk); - if (!retransmission) { - write_seq = &msk->write_seq; - page = pfrag->page; + struct sk_buff *skb; + int space_needed; + + if (unlikely(tcp_under_memory_pressure(sk))) { + mptcp_mem_reclaim_partial(sk); + + /* under pressure pre-allocate at most a single skb */ + if (msk->skb_tx_cache.qlen) + return true; + space_needed = msk->size_goal_cache; } else { - write_seq = &dfrag->data_seq; - page = dfrag->page; + space_needed = msk->tx_pending_data + size - + msk->skb_tx_cache.qlen * msk->size_goal_cache; } - /* compute copy limit */ - mss_now = tcp_send_mss(ssk, &size_goal, msg->msg_flags); - *pmss_now = mss_now; - *ps_goal = size_goal; - avail_size = size_goal; - skb = tcp_write_queue_tail(ssk); + while (space_needed > 0) { + skb = __mptcp_do_alloc_tx_skb(sk, sk->sk_allocation); + if (unlikely(!skb)) { + /* under memory pressure, try to pass the caller a + * single skb to allow forward progress + */ + while (skbs->qlen > 1) { + skb = __skb_dequeue_tail(skbs); + __kfree_skb(skb); + } + return skbs->qlen > 0; + } + + *total_ts += skb->truesize; + __skb_queue_tail(skbs, skb); + space_needed -= msk->size_goal_cache; + } + return true; +} + +static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk, gfp_t gfp) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + struct sk_buff *skb; + + if (ssk->sk_tx_skb_cache) { + skb = ssk->sk_tx_skb_cache; + if (unlikely(!skb_ext_find(skb, SKB_EXT_MPTCP) && + !__mptcp_add_ext(skb, gfp))) + return false; + return true; + } + + skb = skb_peek(&msk->skb_tx_cache); if (skb) { - mpext = skb_ext_find(skb, SKB_EXT_MPTCP); + if (likely(sk_wmem_schedule(ssk, skb->truesize))) { + skb = __skb_dequeue(&msk->skb_tx_cache); + if (WARN_ON_ONCE(!skb)) + return false; + + mptcp_wmem_uncharge(sk, skb->truesize); + ssk->sk_tx_skb_cache = skb; + return true; + } + + /* over memory limit, no point to try to allocate a new skb */ + return false; + } + + skb = __mptcp_do_alloc_tx_skb(sk, gfp); + if (!skb) + return false; + + if (likely(sk_wmem_schedule(ssk, skb->truesize))) { + ssk->sk_tx_skb_cache = skb; + return true; + } + kfree_skb(skb); + return false; +} + +static bool mptcp_must_reclaim_memory(struct sock *sk, struct sock *ssk) +{ + return !ssk->sk_tx_skb_cache && + !skb_peek(&mptcp_sk(sk)->skb_tx_cache) && + tcp_under_memory_pressure(sk); +} + +static bool mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk) +{ + if (unlikely(mptcp_must_reclaim_memory(sk, ssk))) + mptcp_mem_reclaim_partial(sk); + return __mptcp_alloc_tx_skb(sk, ssk, sk->sk_allocation); +} +static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, + struct mptcp_data_frag *dfrag, + struct mptcp_sendmsg_info *info) +{ + u64 data_seq = dfrag->data_seq + info->sent; + struct mptcp_sock *msk = mptcp_sk(sk); + bool zero_window_probe = false; + struct mptcp_ext *mpext = NULL; + struct sk_buff *skb, *tail; + bool can_collapse = false; + int size_bias = 0; + int avail_size; + size_t ret = 0; + + pr_debug("msk=%p ssk=%p sending dfrag at seq=%lld len=%d already sent=%d", + msk, ssk, dfrag->data_seq, dfrag->data_len, info->sent); + + /* compute send limit */ + info->mss_now = tcp_send_mss(ssk, &info->size_goal, info->flags); + avail_size = info->size_goal; + msk->size_goal_cache = info->size_goal; + skb = tcp_write_queue_tail(ssk); + if (skb) { /* Limit the write to the size available in the * current skb, if any, so that we create at most a new skb. * Explicitly tells TCP internals to avoid collapsing on later * queue management operation, to avoid breaking the ext <-> * SSN association set here */ - can_collapse = (size_goal - skb->len > 0) && - mptcp_skb_can_collapse_to(*write_seq, skb, mpext); - if (!can_collapse) + mpext = skb_ext_find(skb, SKB_EXT_MPTCP); + can_collapse = (info->size_goal - skb->len > 0) && + mptcp_skb_can_collapse_to(data_seq, skb, mpext); + if (!can_collapse) { TCP_SKB_CB(skb)->eor = 1; - else - avail_size = size_goal - skb->len; - } - - if (!retransmission) { - /* reuse tail pfrag, if possible, or carve a new one from the - * page allocator - */ - dfrag = mptcp_rtx_tail(sk); - offset = pfrag->offset; - dfrag_collapsed = mptcp_frag_can_collapse_to(msk, pfrag, dfrag); - if (!dfrag_collapsed) { - dfrag = mptcp_carve_data_frag(msk, pfrag, offset); - offset = dfrag->offset; - frag_truesize = dfrag->overhead; - } - psize = min_t(size_t, pfrag->size - offset, avail_size); - - /* Copy to page */ - pr_debug("left=%zu", msg_data_left(msg)); - psize = copy_page_from_iter(pfrag->page, offset, - min_t(size_t, msg_data_left(msg), - psize), - &msg->msg_iter); - pr_debug("left=%zu", msg_data_left(msg)); - if (!psize) - return -EINVAL; - - if (!sk_wmem_schedule(sk, psize + dfrag->overhead)) { - iov_iter_revert(&msg->msg_iter, psize); - return -ENOMEM; + } else { + size_bias = skb->len; + avail_size = info->size_goal - skb->len; } - } else { - offset = dfrag->offset; - psize = min_t(size_t, dfrag->data_len, avail_size); } - /* tell the TCP stack to delay the push so that we can safely - * access the skb after the sendpages call - */ - ret = do_tcp_sendpages(ssk, page, offset, psize, - msg->msg_flags | MSG_SENDPAGE_NOTLAST | MSG_DONTWAIT); - if (ret <= 0) { - if (!retransmission) - iov_iter_revert(&msg->msg_iter, psize); - return ret; - } + /* Zero window and all data acked? Probe. */ + avail_size = mptcp_check_allowed_size(msk, data_seq, avail_size); + if (avail_size == 0) { + u64 snd_una = READ_ONCE(msk->snd_una); - frag_truesize += ret; - if (!retransmission) { - if (unlikely(ret < psize)) - iov_iter_revert(&msg->msg_iter, psize - ret); + if (skb || snd_una != msk->snd_nxt) + return 0; + zero_window_probe = true; + data_seq = snd_una - 1; + avail_size = 1; + } - /* send successful, keep track of sent data for mptcp-level - * retransmission - */ - dfrag->data_len += ret; - if (!dfrag_collapsed) { - get_page(dfrag->page); - list_add_tail(&dfrag->list, &msk->rtx_queue); - sk_wmem_queued_add(sk, frag_truesize); - } else { - sk_wmem_queued_add(sk, ret); - } + if (WARN_ON_ONCE(info->sent > info->limit || + info->limit > dfrag->data_len)) + return 0; - /* charge data on mptcp rtx queue to the master socket - * Note: we charge such data both to sk and ssk - */ - sk->sk_forward_alloc -= frag_truesize; + ret = info->limit - info->sent; + tail = tcp_build_frag(ssk, avail_size + size_bias, info->flags, + dfrag->page, dfrag->offset + info->sent, &ret); + if (!tail) { + tcp_remove_empty_skb(sk, tcp_write_queue_tail(ssk)); + return -ENOMEM; } - /* if the tail skb extension is still the cached one, collapsing - * really happened. Note: we can't check for 'same skb' as the sk_buff - * hdr on tail can be transmitted, freed and re-allocated by the - * do_tcp_sendpages() call + /* if the tail skb is still the cached one, collapsing really happened. */ - tail = tcp_write_queue_tail(ssk); - if (mpext && tail && mpext == skb_ext_find(tail, SKB_EXT_MPTCP)) { - WARN_ON_ONCE(!can_collapse); + if (skb == tail) { + TCP_SKB_CB(tail)->tcp_flags &= ~TCPHDR_PSH; mpext->data_len += ret; + WARN_ON_ONCE(!can_collapse); + WARN_ON_ONCE(zero_window_probe); goto out; } - skb = tcp_write_queue_tail(ssk); - mpext = __skb_ext_set(skb, SKB_EXT_MPTCP, msk->cached_ext); - msk->cached_ext = NULL; + mpext = skb_ext_find(tail, SKB_EXT_MPTCP); + if (WARN_ON_ONCE(!mpext)) { + /* should never reach here, stream corrupted */ + return -EINVAL; + } memset(mpext, 0, sizeof(*mpext)); - mpext->data_seq = *write_seq; + mpext->data_seq = data_seq; mpext->subflow_seq = mptcp_subflow_ctx(ssk)->rel_write_seq; mpext->data_len = ret; mpext->use_map = 1; @@ -1027,44 +1337,17 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, mpext->data_seq, mpext->subflow_seq, mpext->data_len, mpext->dsn64); + if (zero_window_probe) { + mptcp_subflow_ctx(ssk)->rel_write_seq += ret; + mpext->frozen = 1; + ret = 0; + tcp_push_pending_frames(ssk); + } out: - if (!retransmission) - pfrag->offset += frag_truesize; - WRITE_ONCE(*write_seq, *write_seq + ret); mptcp_subflow_ctx(ssk)->rel_write_seq += ret; - return ret; } -static void mptcp_nospace(struct mptcp_sock *msk) -{ - struct mptcp_subflow_context *subflow; - - clear_bit(MPTCP_SEND_SPACE, &msk->flags); - smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */ - - mptcp_for_each_subflow(msk, subflow) { - struct sock *ssk = mptcp_subflow_tcp_sock(subflow); - struct socket *sock = READ_ONCE(ssk->sk_socket); - - /* enables ssk->write_space() callbacks */ - if (sock) - set_bit(SOCK_NOSPACE, &sock->flags); - } -} - -static bool mptcp_subflow_active(struct mptcp_subflow_context *subflow) -{ - struct sock *ssk = mptcp_subflow_tcp_sock(subflow); - - /* can't send if JOIN hasn't completed yet (i.e. is usable for mptcp) */ - if (subflow->request_join && !subflow->fully_established) - return false; - - /* only send if our side has not closed yet */ - return ((1 << ssk->sk_state) & (TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)); -} - #define MPTCP_SEND_BURST_SIZE ((1 << 16) - \ sizeof(struct tcphdr) - \ MAX_TCP_OPTION_SPACE - \ @@ -1089,9 +1372,6 @@ static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk, sock_owned_by_me((struct sock *)msk); *sndbuf = 0; - if (!mptcp_ext_cache_refill(msk)) - return NULL; - if (__mptcp_check_fallback(msk)) { if (!msk->first) return NULL; @@ -1154,27 +1434,160 @@ static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk, return NULL; } -static void ssk_check_wmem(struct mptcp_sock *msk) +static void mptcp_push_release(struct sock *sk, struct sock *ssk, + struct mptcp_sendmsg_info *info) +{ + mptcp_set_timeout(sk, ssk); + tcp_push(ssk, 0, info->mss_now, tcp_sk(ssk)->nonagle, info->size_goal); + release_sock(ssk); +} + +static void mptcp_push_pending(struct sock *sk, unsigned int flags) +{ + struct sock *prev_ssk = NULL, *ssk = NULL; + struct mptcp_sock *msk = mptcp_sk(sk); + struct mptcp_sendmsg_info info = { + .flags = flags, + }; + struct mptcp_data_frag *dfrag; + int len, copied = 0; + u32 sndbuf; + + while ((dfrag = mptcp_send_head(sk))) { + info.sent = dfrag->already_sent; + info.limit = dfrag->data_len; + len = dfrag->data_len - dfrag->already_sent; + while (len > 0) { + int ret = 0; + + prev_ssk = ssk; + __mptcp_flush_join_list(msk); + ssk = mptcp_subflow_get_send(msk, &sndbuf); + + /* do auto tuning */ + if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) && + sndbuf > READ_ONCE(sk->sk_sndbuf)) + WRITE_ONCE(sk->sk_sndbuf, sndbuf); + + /* try to keep the subflow socket lock across + * consecutive xmit on the same socket + */ + if (ssk != prev_ssk && prev_ssk) + mptcp_push_release(sk, prev_ssk, &info); + if (!ssk) + goto out; + + if (ssk != prev_ssk || !prev_ssk) + lock_sock(ssk); + + /* keep it simple and always provide a new skb for the + * subflow, even if we will not use it when collapsing + * on the pending one + */ + if (!mptcp_alloc_tx_skb(sk, ssk)) { + mptcp_push_release(sk, ssk, &info); + goto out; + } + + ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info); + if (ret <= 0) { + mptcp_push_release(sk, ssk, &info); + goto out; + } + + info.sent += ret; + dfrag->already_sent += ret; + msk->snd_nxt += ret; + msk->snd_burst -= ret; + msk->tx_pending_data -= ret; + copied += ret; + len -= ret; + } + WRITE_ONCE(msk->first_pending, mptcp_send_next(sk)); + } + + /* at this point we held the socket lock for the last subflow we used */ + if (ssk) + mptcp_push_release(sk, ssk, &info); + +out: + if (copied) { + /* start the timer, if it's not pending */ + if (!mptcp_timer_pending(sk)) + mptcp_reset_timer(sk); + __mptcp_check_send_data_fin(sk); + } +} + +static void __mptcp_subflow_push_pending(struct sock *sk, struct sock *ssk) { - if (unlikely(!mptcp_is_writeable(msk))) - mptcp_nospace(msk); + struct mptcp_sock *msk = mptcp_sk(sk); + struct mptcp_sendmsg_info info; + struct mptcp_data_frag *dfrag; + int len, copied = 0; + + info.flags = 0; + while ((dfrag = mptcp_send_head(sk))) { + info.sent = dfrag->already_sent; + info.limit = dfrag->data_len; + len = dfrag->data_len - dfrag->already_sent; + while (len > 0) { + int ret = 0; + + /* do auto tuning */ + if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) && + ssk->sk_sndbuf > READ_ONCE(sk->sk_sndbuf)) + WRITE_ONCE(sk->sk_sndbuf, ssk->sk_sndbuf); + + if (unlikely(mptcp_must_reclaim_memory(sk, ssk))) { + __mptcp_update_wmem(sk); + sk_mem_reclaim_partial(sk); + } + if (!__mptcp_alloc_tx_skb(sk, ssk, GFP_ATOMIC)) + goto out; + + ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info); + if (ret <= 0) + goto out; + + info.sent += ret; + dfrag->already_sent += ret; + msk->snd_nxt += ret; + msk->snd_burst -= ret; + msk->tx_pending_data -= ret; + copied += ret; + len -= ret; + } + WRITE_ONCE(msk->first_pending, mptcp_send_next(sk)); + } + +out: + /* __mptcp_alloc_tx_skb could have released some wmem and we are + * not going to flush it via release_sock() + */ + __mptcp_update_wmem(sk); + if (copied) { + mptcp_set_timeout(sk, ssk); + tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle, + info.size_goal); + if (msk->snd_data_fin_enable && + msk->snd_nxt + 1 == msk->write_seq) + mptcp_schedule_work(sk); + } } static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) { - int mss_now = 0, size_goal = 0, ret = 0; struct mptcp_sock *msk = mptcp_sk(sk); struct page_frag *pfrag; size_t copied = 0; - struct sock *ssk; - u32 sndbuf; - bool tx_ok; + int ret = 0; long timeo; if (msg->msg_flags & ~(MSG_MORE | MSG_DONTWAIT | MSG_NOSIGNAL)) return -EOPNOTSUPP; - lock_sock(sk); + mptcp_lock_sock(sk, __mptcp_wmem_reserve(sk, len)); timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT); @@ -1185,130 +1598,97 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) } pfrag = sk_page_frag(sk); -restart: - mptcp_clean_una(sk); - - if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) { - ret = -EPIPE; - goto out; - } - __mptcp_flush_join_list(msk); - ssk = mptcp_subflow_get_send(msk, &sndbuf); - while (!sk_stream_memory_free(sk) || - !ssk || - !mptcp_page_frag_refill(ssk, pfrag)) { - if (ssk) { - /* make sure retransmit timer is - * running before we wait for memory. - * - * The retransmit timer might be needed - * to make the peer send an up-to-date - * MPTCP Ack. - */ - mptcp_set_timeout(sk, ssk); - if (!mptcp_timer_pending(sk)) - mptcp_reset_timer(sk); - } + while (msg_data_left(msg)) { + int total_ts, frag_truesize = 0; + struct mptcp_data_frag *dfrag; + struct sk_buff_head skbs; + bool dfrag_collapsed; + size_t psize, offset; - mptcp_nospace(msk); - ret = sk_stream_wait_memory(sk, &timeo); - if (ret) - goto out; - - mptcp_clean_una(sk); - - ssk = mptcp_subflow_get_send(msk, &sndbuf); - if (list_empty(&msk->conn_list)) { - ret = -ENOTCONN; + if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) { + ret = -EPIPE; goto out; } - } - /* do auto tuning */ - if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) && - sndbuf > READ_ONCE(sk->sk_sndbuf)) - WRITE_ONCE(sk->sk_sndbuf, sndbuf); + /* reuse tail pfrag, if possible, or carve a new one from the + * page allocator + */ + dfrag = mptcp_pending_tail(sk); + dfrag_collapsed = mptcp_frag_can_collapse_to(msk, pfrag, dfrag); + if (!dfrag_collapsed) { + if (!sk_stream_memory_free(sk)) + goto wait_for_memory; - pr_debug("conn_list->subflow=%p", ssk); + if (!mptcp_page_frag_refill(sk, pfrag)) + goto wait_for_memory; - lock_sock(ssk); - tx_ok = msg_data_left(msg); - while (tx_ok) { - ret = mptcp_sendmsg_frag(sk, ssk, msg, NULL, &timeo, &mss_now, - &size_goal); - if (ret < 0) { - if (ret == -EAGAIN && timeo > 0) { - mptcp_set_timeout(sk, ssk); - release_sock(ssk); - goto restart; - } - break; + dfrag = mptcp_carve_data_frag(msk, pfrag, pfrag->offset); + frag_truesize = dfrag->overhead; } - /* burst can be negative, we will try move to the next subflow - * at selection time, if possible. + /* we do not bound vs wspace, to allow a single packet. + * memory accounting will prevent execessive memory usage + * anyway */ - msk->snd_burst -= ret; - copied += ret; - - tx_ok = msg_data_left(msg); - if (!tx_ok) - break; + offset = dfrag->offset + dfrag->data_len; + psize = pfrag->size - offset; + psize = min_t(size_t, psize, msg_data_left(msg)); + total_ts = psize + frag_truesize; + __skb_queue_head_init(&skbs); + if (!mptcp_tx_cache_refill(sk, psize, &skbs, &total_ts)) + goto wait_for_memory; + + if (!mptcp_wmem_alloc(sk, total_ts)) { + __skb_queue_purge(&skbs); + goto wait_for_memory; + } - if (!sk_stream_memory_free(ssk) || - !mptcp_page_frag_refill(ssk, pfrag) || - !mptcp_ext_cache_refill(msk)) { - tcp_push(ssk, msg->msg_flags, mss_now, - tcp_sk(ssk)->nonagle, size_goal); - mptcp_set_timeout(sk, ssk); - release_sock(ssk); - goto restart; + skb_queue_splice_tail(&skbs, &msk->skb_tx_cache); + if (copy_page_from_iter(dfrag->page, offset, psize, + &msg->msg_iter) != psize) { + mptcp_wmem_uncharge(sk, psize + frag_truesize); + ret = -EFAULT; + goto out; } - /* memory is charged to mptcp level socket as well, i.e. - * if msg is very large, mptcp socket may run out of buffer - * space. mptcp_clean_una() will release data that has - * been acked at mptcp level in the mean time, so there is - * a good chance we can continue sending data right away. - * - * Normally, when the tcp subflow can accept more data, then - * so can the MPTCP socket. However, we need to cope with - * peers that might lag behind in their MPTCP-level - * acknowledgements, i.e. data might have been acked at - * tcp level only. So, we must also check the MPTCP socket - * limits before we send more data. + /* data successfully copied into the write queue */ + copied += psize; + dfrag->data_len += psize; + frag_truesize += psize; + pfrag->offset += frag_truesize; + WRITE_ONCE(msk->write_seq, msk->write_seq + psize); + + /* charge data on mptcp pending queue to the msk socket + * Note: we charge such data both to sk and ssk */ - if (unlikely(!sk_stream_memory_free(sk))) { - tcp_push(ssk, msg->msg_flags, mss_now, - tcp_sk(ssk)->nonagle, size_goal); - mptcp_clean_una(sk); - if (!sk_stream_memory_free(sk)) { - /* can't send more for now, need to wait for - * MPTCP-level ACKs from peer. - * - * Wakeup will happen via mptcp_clean_una(). - */ - mptcp_set_timeout(sk, ssk); - release_sock(ssk); - goto restart; - } + sk_wmem_queued_add(sk, frag_truesize); + if (!dfrag_collapsed) { + get_page(dfrag->page); + list_add_tail(&dfrag->list, &msk->rtx_queue); + if (!msk->first_pending) + WRITE_ONCE(msk->first_pending, dfrag); } + pr_debug("msk=%p dfrag at seq=%lld len=%d sent=%d new=%d", msk, + dfrag->data_seq, dfrag->data_len, dfrag->already_sent, + !dfrag_collapsed); + + continue; + +wait_for_memory: + set_bit(MPTCP_NOSPACE, &msk->flags); + mptcp_push_pending(sk, msg->msg_flags); + ret = sk_stream_wait_memory(sk, &timeo); + if (ret) + goto out; } - mptcp_set_timeout(sk, ssk); if (copied) { - tcp_push(ssk, msg->msg_flags, mss_now, tcp_sk(ssk)->nonagle, - size_goal); - - /* start the timer, if it's not pending */ - if (!mptcp_timer_pending(sk)) - mptcp_reset_timer(sk); + msk->tx_pending_data += copied; + mptcp_push_pending(sk, msg->msg_flags); } - release_sock(ssk); out: - ssk_check_wmem(msk); release_sock(sk); return copied ? : ret; } @@ -1332,11 +1712,10 @@ static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk, struct msghdr *msg, size_t len) { - struct sock *sk = (struct sock *)msk; struct sk_buff *skb; int copied = 0; - while ((skb = skb_peek(&sk->sk_receive_queue)) != NULL) { + while ((skb = skb_peek(&msk->receive_queue)) != NULL) { u32 offset = MPTCP_SKB_CB(skb)->offset; u32 data_len = skb->len - offset; u32 count = min_t(size_t, len - copied, data_len); @@ -1356,7 +1735,10 @@ static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk, break; } - __skb_unlink(skb, &sk->sk_receive_queue); + /* we will bulk release the skb memory later */ + skb->destructor = NULL; + msk->rmem_released += skb->truesize; + __skb_unlink(skb, &msk->receive_queue); __kfree_skb(skb); if (copied >= len) @@ -1464,32 +1846,68 @@ new_measure: msk->rcvq_space.time = mstamp; } -static bool __mptcp_move_skbs(struct mptcp_sock *msk) +static void __mptcp_update_rmem(struct sock *sk) { - unsigned int moved = 0; - bool done; + struct mptcp_sock *msk = mptcp_sk(sk); - /* avoid looping forever below on racing close */ - if (((struct sock *)msk)->sk_state == TCP_CLOSE) - return false; + if (!msk->rmem_released) + return; + + atomic_sub(msk->rmem_released, &sk->sk_rmem_alloc); + sk_mem_uncharge(sk, msk->rmem_released); + msk->rmem_released = 0; +} + +static void __mptcp_splice_receive_queue(struct sock *sk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); + + skb_queue_splice_tail_init(&sk->sk_receive_queue, &msk->receive_queue); +} + +static bool __mptcp_move_skbs(struct mptcp_sock *msk, unsigned int rcv) +{ + struct sock *sk = (struct sock *)msk; + unsigned int moved = 0; + bool ret, done; __mptcp_flush_join_list(msk); do { struct sock *ssk = mptcp_subflow_recv_lookup(msk); + bool slowpath; - if (!ssk) + /* we can have data pending in the subflows only if the msk + * receive buffer was full at subflow_data_ready() time, + * that is an unlikely slow path. + */ + if (likely(!ssk)) break; - lock_sock(ssk); + slowpath = lock_sock_fast(ssk); + mptcp_data_lock(sk); done = __mptcp_move_skbs_from_subflow(msk, ssk, &moved); - release_sock(ssk); + mptcp_data_unlock(sk); + if (moved && rcv) { + WRITE_ONCE(msk->rmem_pending, min(rcv, moved)); + tcp_cleanup_rbuf(ssk, 1); + WRITE_ONCE(msk->rmem_pending, 0); + } + unlock_sock_fast(ssk, slowpath); } while (!done); - if (mptcp_ofo_queue(msk) || moved > 0) { - mptcp_check_data_fin((struct sock *)msk); - return true; + /* acquire the data lock only if some input data is pending */ + ret = moved > 0; + if (!RB_EMPTY_ROOT(&msk->out_of_order_queue) || + !skb_queue_empty_lockless(&sk->sk_receive_queue)) { + mptcp_data_lock(sk); + __mptcp_update_rmem(sk); + ret |= __mptcp_ofo_queue(msk); + __mptcp_splice_receive_queue(sk); + mptcp_data_unlock(sk); } - return false; + if (ret) + mptcp_check_data_fin((struct sock *)msk); + return !skb_queue_empty(&msk->receive_queue); } static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, @@ -1503,15 +1921,19 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, if (msg->msg_flags & ~(MSG_WAITALL | MSG_DONTWAIT)) return -EOPNOTSUPP; - lock_sock(sk); + mptcp_lock_sock(sk, __mptcp_splice_receive_queue(sk)); + if (unlikely(sk->sk_state == TCP_LISTEN)) { + copied = -ENOTCONN; + goto out_err; + } + timeo = sock_rcvtimeo(sk, nonblock); len = min_t(size_t, len, INT_MAX); target = sock_rcvlowat(sk, flags & MSG_WAITALL, len); - __mptcp_flush_join_list(msk); - while (len > (size_t)copied) { - int bytes_read; + while (copied < len) { + int bytes_read, old_space; bytes_read = __mptcp_recvmsg_mskq(msk, msg, len - copied); if (unlikely(bytes_read < 0)) { @@ -1522,10 +1944,15 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, copied += bytes_read; - if (skb_queue_empty(&sk->sk_receive_queue) && - __mptcp_move_skbs(msk)) + if (skb_queue_empty(&msk->receive_queue) && + __mptcp_move_skbs(msk, len - copied)) continue; + /* be sure to advertise window change */ + old_space = READ_ONCE(msk->old_wspace); + if ((tcp_space(sk) - old_space) >= old_space) + mptcp_cleanup_rbuf(msk); + /* only the master socket status is relevant here. The exit * conditions mirror closely tcp_recvmsg() */ @@ -1548,8 +1975,14 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, if (test_and_clear_bit(MPTCP_WORK_EOF, &msk->flags)) mptcp_check_for_eof(msk); - if (sk->sk_shutdown & RCV_SHUTDOWN) + if (sk->sk_shutdown & RCV_SHUTDOWN) { + /* race breaker: the shutdown could be after the + * previous receive queue check + */ + if (__mptcp_move_skbs(msk, len - copied)) + continue; break; + } if (sk->sk_state == TCP_CLOSE) { copied = -ENOTCONN; @@ -1571,14 +2004,15 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, mptcp_wait_data(sk, &timeo); } - if (skb_queue_empty(&sk->sk_receive_queue)) { + if (skb_queue_empty_lockless(&sk->sk_receive_queue) && + skb_queue_empty(&msk->receive_queue)) { /* entire backlog drained, clear DATA_READY. */ clear_bit(MPTCP_DATA_READY, &msk->flags); /* .. race-breaker: ssk might have gotten new data * after last __mptcp_move_skbs() returned false. */ - if (unlikely(__mptcp_move_skbs(msk))) + if (unlikely(__mptcp_move_skbs(msk, 0))) set_bit(MPTCP_DATA_READY, &msk->flags); } else if (unlikely(!test_bit(MPTCP_DATA_READY, &msk->flags))) { /* data to read but mptcp_wait_data() cleared DATA_READY */ @@ -1587,7 +2021,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, out_err: pr_debug("msk=%p data_ready=%d rx queue empty=%d copied=%d", msk, test_bit(MPTCP_DATA_READY, &msk->flags), - skb_queue_empty(&sk->sk_receive_queue), copied); + skb_queue_empty_lockless(&sk->sk_receive_queue), copied); mptcp_rcv_space_adjust(msk, copied); release_sock(sk); @@ -1598,13 +2032,8 @@ static void mptcp_retransmit_handler(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); - if (atomic64_read(&msk->snd_una) == READ_ONCE(msk->write_seq)) { - mptcp_stop_timer(sk); - } else { - set_bit(MPTCP_WORK_RTX, &msk->flags); - if (schedule_work(&msk->work)) - sock_hold(sk); - } + set_bit(MPTCP_WORK_RTX, &msk->flags); + mptcp_schedule_work(sk); } static void mptcp_retransmit_timer(struct timer_list *t) @@ -1626,6 +2055,14 @@ static void mptcp_retransmit_timer(struct timer_list *t) sock_put(sk); } +static void mptcp_timeout_timer(struct timer_list *t) +{ + struct sock *sk = from_timer(sk, t, sk_timer); + + mptcp_schedule_work(sk); + sock_put(sk); +} + /* Find an idle subflow. Return NULL if there is unacked data at tcp * level. * @@ -1639,7 +2076,7 @@ static struct sock *mptcp_subflow_get_retrans(const struct mptcp_sock *msk) sock_owned_by_me((const struct sock *)msk); if (__mptcp_check_fallback(msk)) - return msk->first; + return NULL; mptcp_for_each_subflow(msk, subflow) { struct sock *ssk = mptcp_subflow_tcp_sock(subflow); @@ -1648,8 +2085,11 @@ static struct sock *mptcp_subflow_get_retrans(const struct mptcp_sock *msk) continue; /* still data outstanding at TCP level? Don't retransmit. */ - if (!tcp_write_queue_empty(ssk)) + if (!tcp_write_queue_empty(ssk)) { + if (inet_csk(ssk)->icsk_ca_state >= TCP_CA_Loss) + continue; return NULL; + } if (subflow->backup) { if (!backup) @@ -1672,20 +2112,44 @@ static struct sock *mptcp_subflow_get_retrans(const struct mptcp_sock *msk) * parent socket. */ void __mptcp_close_ssk(struct sock *sk, struct sock *ssk, - struct mptcp_subflow_context *subflow, - long timeout) + struct mptcp_subflow_context *subflow) { - struct socket *sock = READ_ONCE(ssk->sk_socket); + bool dispose_socket = false; + struct socket *sock; list_del(&subflow->node); - if (sock && sock != sk->sk_socket) { - /* outgoing subflow */ - sock_release(sock); + lock_sock(ssk); + + /* if we are invoked by the msk cleanup code, the subflow is + * already orphaned + */ + sock = ssk->sk_socket; + if (sock) { + dispose_socket = sock != sk->sk_socket; + sock_orphan(ssk); + } + + subflow->disposable = 1; + + /* if ssk hit tcp_done(), tcp_cleanup_ulp() cleared the related ops + * the ssk has been already destroyed, we just need to release the + * reference owned by msk; + */ + if (!inet_csk(ssk)->icsk_ulp_ops) { + kfree_rcu(subflow, rcu); } else { - /* incoming subflow */ - tcp_close(ssk, timeout); + /* otherwise tcp will dispose of the ssk and subflow ctx */ + __tcp_close(ssk, 0); + + /* close acquired an extra ref */ + __sock_put(ssk); } + release_sock(ssk); + if (dispose_socket) + iput(SOCK_INODE(sock)); + + sock_put(ssk); } static unsigned int mptcp_sync_mss(struct sock *sk, u32 pmtu) @@ -1704,6 +2168,10 @@ static void pm_work(struct mptcp_sock *msk) pm->status &= ~BIT(MPTCP_PM_ADD_ADDR_RECEIVED); mptcp_pm_nl_add_addr_received(msk); } + if (pm->status & BIT(MPTCP_PM_ADD_ADDR_SEND_ACK)) { + pm->status &= ~BIT(MPTCP_PM_ADD_ADDR_SEND_ACK); + mptcp_pm_nl_add_addr_send_ack(msk); + } if (pm->status & BIT(MPTCP_PM_RM_ADDR_RECEIVED)) { pm->status &= ~BIT(MPTCP_PM_RM_ADDR_RECEIVED); mptcp_pm_nl_rm_addr_received(msk); @@ -1730,40 +2198,102 @@ static void __mptcp_close_subflow(struct mptcp_sock *msk) if (inet_sk_state_load(ssk) != TCP_CLOSE) continue; - __mptcp_close_ssk((struct sock *)msk, ssk, subflow, 0); + __mptcp_close_ssk((struct sock *)msk, ssk, subflow); + } +} + +static bool mptcp_check_close_timeout(const struct sock *sk) +{ + s32 delta = tcp_jiffies32 - inet_csk(sk)->icsk_mtup.probe_timestamp; + struct mptcp_subflow_context *subflow; + + if (delta >= TCP_TIMEWAIT_LEN) + return true; + + /* if all subflows are in closed status don't bother with additional + * timeout + */ + mptcp_for_each_subflow(mptcp_sk(sk), subflow) { + if (inet_sk_state_load(mptcp_subflow_tcp_sock(subflow)) != + TCP_CLOSE) + return false; + } + return true; +} + +static void mptcp_check_fastclose(struct mptcp_sock *msk) +{ + struct mptcp_subflow_context *subflow, *tmp; + struct sock *sk = &msk->sk.icsk_inet.sk; + + if (likely(!READ_ONCE(msk->rcv_fastclose))) + return; + + mptcp_token_destroy(msk); + + list_for_each_entry_safe(subflow, tmp, &msk->conn_list, node) { + struct sock *tcp_sk = mptcp_subflow_tcp_sock(subflow); + + lock_sock(tcp_sk); + if (tcp_sk->sk_state != TCP_CLOSE) { + tcp_send_active_reset(tcp_sk, GFP_ATOMIC); + tcp_set_state(tcp_sk, TCP_CLOSE); + } + release_sock(tcp_sk); } + + inet_sk_state_store(sk, TCP_CLOSE); + sk->sk_shutdown = SHUTDOWN_MASK; + smp_mb__before_atomic(); /* SHUTDOWN must be visible first */ + set_bit(MPTCP_DATA_READY, &msk->flags); + set_bit(MPTCP_WORK_CLOSE_SUBFLOW, &msk->flags); + + mptcp_close_wake_up(sk); } static void mptcp_worker(struct work_struct *work) { struct mptcp_sock *msk = container_of(work, struct mptcp_sock, work); struct sock *ssk, *sk = &msk->sk.icsk_inet.sk; - int orig_len, orig_offset, mss_now = 0, size_goal = 0; + struct mptcp_sendmsg_info info = {}; struct mptcp_data_frag *dfrag; - u64 orig_write_seq; size_t copied = 0; - struct msghdr msg = { - .msg_flags = MSG_DONTWAIT, - }; - long timeo = 0; + int state, ret; lock_sock(sk); - mptcp_clean_una(sk); + state = sk->sk_state; + if (unlikely(state == TCP_CLOSE)) + goto unlock; + mptcp_check_data_fin_ack(sk); __mptcp_flush_join_list(msk); + + mptcp_check_fastclose(msk); + if (test_and_clear_bit(MPTCP_WORK_CLOSE_SUBFLOW, &msk->flags)) __mptcp_close_subflow(msk); - __mptcp_move_skbs(msk); - if (msk->pm.status) pm_work(msk); if (test_and_clear_bit(MPTCP_WORK_EOF, &msk->flags)) mptcp_check_for_eof(msk); + __mptcp_check_send_data_fin(sk); mptcp_check_data_fin(sk); + /* if the msk data is completely acked, or the socket timedout, + * there is no point in keeping around an orphaned sk + */ + if (sock_flag(sk, SOCK_DEAD) && + (mptcp_check_close_timeout(sk) || + (state != sk->sk_state && + ((1 << inet_sk_state_load(sk)) & (TCPF_CLOSE | TCPF_FIN_WAIT2))))) { + inet_sk_state_store(sk, TCP_CLOSE); + __mptcp_destroy_sock(sk); + goto unlock; + } + if (!test_and_clear_bit(MPTCP_WORK_RTX, &msk->flags)) goto unlock; @@ -1771,39 +2301,30 @@ static void mptcp_worker(struct work_struct *work) if (!dfrag) goto unlock; - if (!mptcp_ext_cache_refill(msk)) - goto reset_unlock; - ssk = mptcp_subflow_get_retrans(msk); if (!ssk) goto reset_unlock; lock_sock(ssk); - orig_len = dfrag->data_len; - orig_offset = dfrag->offset; - orig_write_seq = dfrag->data_seq; - while (dfrag->data_len > 0) { - int ret = mptcp_sendmsg_frag(sk, ssk, &msg, dfrag, &timeo, - &mss_now, &size_goal); - if (ret < 0) + /* limit retransmission to the bytes already sent on some subflows */ + info.sent = 0; + info.limit = dfrag->already_sent; + while (info.sent < dfrag->already_sent) { + if (!mptcp_alloc_tx_skb(sk, ssk)) + break; + + ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info); + if (ret <= 0) break; MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_RETRANSSEGS); copied += ret; - dfrag->data_len -= ret; - dfrag->offset += ret; - - if (!mptcp_ext_cache_refill(msk)) - break; + info.sent += ret; } if (copied) - tcp_push(ssk, msg.msg_flags, mss_now, tcp_sk(ssk)->nonagle, - size_goal); - - dfrag->data_seq = orig_write_seq; - dfrag->offset = orig_offset; - dfrag->data_len = orig_len; + tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle, + info.size_goal); mptcp_set_timeout(sk, ssk); release_sock(ssk); @@ -1826,10 +2347,17 @@ static int __mptcp_init_sock(struct sock *sk) INIT_LIST_HEAD(&msk->conn_list); INIT_LIST_HEAD(&msk->join_list); INIT_LIST_HEAD(&msk->rtx_queue); - __set_bit(MPTCP_SEND_SPACE, &msk->flags); INIT_WORK(&msk->work, mptcp_worker); + __skb_queue_head_init(&msk->receive_queue); + __skb_queue_head_init(&msk->skb_tx_cache); msk->out_of_order_queue = RB_ROOT; + msk->first_pending = NULL; + msk->wmem_reserved = 0; + msk->rmem_released = 0; + msk->tx_pending_data = 0; + msk->size_goal_cache = TCP_BASE_MSS; + msk->ack_hint = NULL; msk->first = NULL; inet_csk(sk)->icsk_sync_mss = mptcp_sync_mss; @@ -1837,7 +2365,7 @@ static int __mptcp_init_sock(struct sock *sk) /* re-use the csk retrans timer for MPTCP-level retrans */ timer_setup(&msk->sk.icsk_retransmit_timer, mptcp_retransmit_timer, 0); - + timer_setup(&sk->sk_timer, mptcp_timeout_timer, 0); return 0; } @@ -1871,11 +2399,15 @@ static void __mptcp_clear_xmit(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); struct mptcp_data_frag *dtmp, *dfrag; + struct sk_buff *skb; - sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer); - + WRITE_ONCE(msk->first_pending, NULL); list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) dfrag_clear(sk, dfrag); + while ((skb = __skb_dequeue(&msk->skb_tx_cache)) != NULL) { + sk->sk_forward_alloc += skb->truesize; + kfree_skb(skb); + } } static void mptcp_cancel_work(struct sock *sk) @@ -1883,7 +2415,7 @@ static void mptcp_cancel_work(struct sock *sk) struct mptcp_sock *msk = mptcp_sk(sk); if (cancel_work_sync(&msk->work)) - sock_put(sk); + __sock_put(sk); } void mptcp_subflow_shutdown(struct sock *sk, struct sock *ssk, int how) @@ -1941,42 +2473,67 @@ static int mptcp_close_state(struct sock *sk) return next & TCP_ACTION_FIN; } -static void mptcp_close(struct sock *sk, long timeout) +static void __mptcp_check_send_data_fin(struct sock *sk) { - struct mptcp_subflow_context *subflow, *tmp; + struct mptcp_subflow_context *subflow; struct mptcp_sock *msk = mptcp_sk(sk); - LIST_HEAD(conn_list); - lock_sock(sk); - sk->sk_shutdown = SHUTDOWN_MASK; + pr_debug("msk=%p snd_data_fin_enable=%d pending=%d snd_nxt=%llu write_seq=%llu", + msk, msk->snd_data_fin_enable, !!mptcp_send_head(sk), + msk->snd_nxt, msk->write_seq); - if (sk->sk_state == TCP_LISTEN) { - inet_sk_state_store(sk, TCP_CLOSE); - goto cleanup; - } else if (sk->sk_state == TCP_CLOSE) { - goto cleanup; - } + /* we still need to enqueue subflows or not really shutting down, + * skip this + */ + if (!msk->snd_data_fin_enable || msk->snd_nxt + 1 != msk->write_seq || + mptcp_send_head(sk)) + return; + + WRITE_ONCE(msk->snd_nxt, msk->write_seq); + /* fallback socket will not get data_fin/ack, can move to the next + * state now + */ if (__mptcp_check_fallback(msk)) { - goto update_state; - } else if (mptcp_close_state(sk)) { - pr_debug("Sending DATA_FIN sk=%p", sk); - WRITE_ONCE(msk->write_seq, msk->write_seq + 1); - WRITE_ONCE(msk->snd_data_fin_enable, 1); + if ((1 << sk->sk_state) & (TCPF_CLOSING | TCPF_LAST_ACK)) { + inet_sk_state_store(sk, TCP_CLOSE); + mptcp_close_wake_up(sk); + } else if (sk->sk_state == TCP_FIN_WAIT1) { + inet_sk_state_store(sk, TCP_FIN_WAIT2); + } + } - mptcp_for_each_subflow(msk, subflow) { - struct sock *tcp_sk = mptcp_subflow_tcp_sock(subflow); + __mptcp_flush_join_list(msk); + mptcp_for_each_subflow(msk, subflow) { + struct sock *tcp_sk = mptcp_subflow_tcp_sock(subflow); - mptcp_subflow_shutdown(sk, tcp_sk, SHUTDOWN_MASK); - } + mptcp_subflow_shutdown(sk, tcp_sk, SEND_SHUTDOWN); } +} - sk_stream_wait_close(sk, timeout); +static void __mptcp_wr_shutdown(struct sock *sk) +{ + struct mptcp_sock *msk = mptcp_sk(sk); -update_state: - inet_sk_state_store(sk, TCP_CLOSE); + pr_debug("msk=%p snd_data_fin_enable=%d shutdown=%x state=%d pending=%d", + msk, msk->snd_data_fin_enable, sk->sk_shutdown, sk->sk_state, + !!mptcp_send_head(sk)); + + /* will be ignored by fallback sockets */ + WRITE_ONCE(msk->write_seq, msk->write_seq + 1); + WRITE_ONCE(msk->snd_data_fin_enable, 1); + + __mptcp_check_send_data_fin(sk); +} + +static void __mptcp_destroy_sock(struct sock *sk) +{ + struct mptcp_subflow_context *subflow, *tmp; + struct mptcp_sock *msk = mptcp_sk(sk); + LIST_HEAD(conn_list); + + pr_debug("msk=%p", msk); -cleanup: /* be sure to always acquire the join list lock, to sync vs * mptcp_finish_join(). */ @@ -1985,20 +2542,77 @@ cleanup: spin_unlock_bh(&msk->join_list_lock); list_splice_init(&msk->conn_list, &conn_list); - __mptcp_clear_xmit(sk); - - release_sock(sk); + sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer); + sk_stop_timer(sk, &sk->sk_timer); + msk->pm.status = 0; list_for_each_entry_safe(subflow, tmp, &conn_list, node) { struct sock *ssk = mptcp_subflow_tcp_sock(subflow); - __mptcp_close_ssk(sk, ssk, subflow, timeout); + __mptcp_close_ssk(sk, ssk, subflow); } - mptcp_cancel_work(sk); + sk->sk_prot->destroy(sk); + + WARN_ON_ONCE(msk->wmem_reserved); + WARN_ON_ONCE(msk->rmem_released); + sk_stream_kill_queues(sk); + xfrm_sk_free_policy(sk); + sk_refcnt_debug_release(sk); + sock_put(sk); +} + +static void mptcp_close(struct sock *sk, long timeout) +{ + struct mptcp_subflow_context *subflow; + bool do_cancel_work = false; - __skb_queue_purge(&sk->sk_receive_queue); + lock_sock(sk); + sk->sk_shutdown = SHUTDOWN_MASK; - sk_common_release(sk); + if ((1 << sk->sk_state) & (TCPF_LISTEN | TCPF_CLOSE)) { + inet_sk_state_store(sk, TCP_CLOSE); + goto cleanup; + } + + if (mptcp_close_state(sk)) + __mptcp_wr_shutdown(sk); + + sk_stream_wait_close(sk, timeout); + +cleanup: + /* orphan all the subflows */ + inet_csk(sk)->icsk_mtup.probe_timestamp = tcp_jiffies32; + list_for_each_entry(subflow, &mptcp_sk(sk)->conn_list, node) { + struct sock *ssk = mptcp_subflow_tcp_sock(subflow); + bool slow, dispose_socket; + struct socket *sock; + + slow = lock_sock_fast(ssk); + sock = ssk->sk_socket; + dispose_socket = sock && sock != sk->sk_socket; + sock_orphan(ssk); + unlock_sock_fast(ssk, slow); + + /* for the outgoing subflows we additionally need to free + * the associated socket + */ + if (dispose_socket) + iput(SOCK_INODE(sock)); + } + sock_orphan(sk); + + sock_hold(sk); + pr_debug("msk=%p state=%d", sk, sk->sk_state); + if (sk->sk_state == TCP_CLOSE) { + __mptcp_destroy_sock(sk); + do_cancel_work = true; + } else { + sk_reset_timer(sk, &sk->sk_timer, jiffies + TCP_TIMEWAIT_LEN); + } + release_sock(sk); + if (do_cancel_work) + mptcp_cancel_work(sk); + sock_put(sk); } static void mptcp_copy_inaddrs(struct sock *msk, const struct sock *ssk) @@ -2069,13 +2683,17 @@ struct sock *mptcp_sk_clone(const struct sock *sk, WRITE_ONCE(msk->fully_established, false); msk->write_seq = subflow_req->idsn + 1; - atomic64_set(&msk->snd_una, msk->write_seq); + msk->snd_nxt = msk->write_seq; + msk->snd_una = msk->write_seq; + msk->wnd_end = msk->snd_nxt + req->rsk_rcv_wnd; + if (mp_opt->mp_capable) { msk->can_ack = true; msk->remote_key = mp_opt->sndr_key; mptcp_crypto_key_sha(msk->remote_key, NULL, &ack_seq); ack_seq++; WRITE_ONCE(msk->ack_seq, ack_seq); + WRITE_ONCE(msk->rcv_wnd_sent, ack_seq); } sock_reset_flag(nsk, SOCK_RCU_FREE); @@ -2102,6 +2720,8 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk) TCP_INIT_CWND * tp->advmss); if (msk->rcvq_space.space == 0) msk->rcvq_space.space = TCP_INIT_CWND * TCP_MSS_DEFAULT; + + WRITE_ONCE(msk->wnd_end, msk->snd_nxt + tcp_sk(ssk)->snd_wnd); } static struct sock *mptcp_accept(struct sock *sk, int flags, int *err, @@ -2126,7 +2746,6 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err, if (sk_is_mptcp(newsk)) { struct mptcp_subflow_context *subflow; struct sock *new_mptcp_sock; - struct sock *ssk = newsk; subflow = mptcp_subflow_ctx(newsk); new_mptcp_sock = subflow->conn; @@ -2141,21 +2760,8 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err, /* acquire the 2nd reference for the owning socket */ sock_hold(new_mptcp_sock); - - local_bh_disable(); - bh_lock_sock(new_mptcp_sock); - msk = mptcp_sk(new_mptcp_sock); - msk->first = newsk; - newsk = new_mptcp_sock; - mptcp_copy_inaddrs(newsk, ssk); - list_add(&subflow->node, &msk->conn_list); - - mptcp_rcv_space_init(msk, ssk); - bh_unlock_sock(new_mptcp_sock); - - __MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_MPCAPABLEPASSIVEACK); - local_bh_enable(); + MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_MPCAPABLEPASSIVEACK); } else { MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_MPCAPABLEPASSIVEFALLBACK); @@ -2166,6 +2772,13 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err, void mptcp_destroy_common(struct mptcp_sock *msk) { + struct sock *sk = (struct sock *)msk; + + __mptcp_clear_xmit(sk); + + /* move to sk_receive_queue, sk_stream_kill_queues will purge it */ + skb_queue_splice_tail_init(&msk->receive_queue, &sk->sk_receive_queue); + skb_rbtree_purge(&msk->out_of_order_queue); mptcp_token_destroy(msk); mptcp_pm_free_anno_list(msk); @@ -2175,9 +2788,6 @@ static void mptcp_destroy(struct sock *sk) { struct mptcp_sock *msk = mptcp_sk(sk); - if (msk->cached_ext) - __skb_ext_put(msk->cached_ext); - mptcp_destroy_common(msk); sk_sockets_allocated_dec(sk); } @@ -2292,16 +2902,58 @@ static int mptcp_getsockopt(struct sock *sk, int level, int optname, return -EOPNOTSUPP; } -#define MPTCP_DEFERRED_ALL (TCPF_DELACK_TIMER_DEFERRED | \ - TCPF_WRITE_TIMER_DEFERRED) +void __mptcp_data_acked(struct sock *sk) +{ + if (!sock_owned_by_user(sk)) + __mptcp_clean_una(sk); + else + set_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags); -/* this is very alike tcp_release_cb() but we must handle differently a - * different set of events - */ + if (mptcp_pending_data_fin_ack(sk)) + mptcp_schedule_work(sk); +} + +void __mptcp_wnd_updated(struct sock *sk, struct sock *ssk) +{ + if (!mptcp_send_head(sk)) + return; + + if (!sock_owned_by_user(sk)) + __mptcp_subflow_push_pending(sk, ssk); + else + set_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->flags); +} + +#define MPTCP_DEFERRED_ALL (TCPF_WRITE_TIMER_DEFERRED) + +/* processes deferred events and flush wmem */ static void mptcp_release_cb(struct sock *sk) { unsigned long flags, nflags; + /* push_pending may touch wmem_reserved, do it before the later + * cleanup + */ + if (test_and_clear_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags)) + __mptcp_clean_una(sk); + if (test_and_clear_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->flags)) { + /* mptcp_push_pending() acquires the subflow socket lock + * + * 1) can't be invoked in atomic scope + * 2) must avoid ABBA deadlock with msk socket spinlock: the RX + * datapath acquires the msk socket spinlock while helding + * the subflow socket lock + */ + + spin_unlock_bh(&sk->sk_lock.slock); + mptcp_push_pending(sk, 0); + spin_lock_bh(&sk->sk_lock.slock); + } + + /* clear any wmem reservation and errors */ + __mptcp_update_wmem(sk); + __mptcp_update_rmem(sk); + do { flags = sk->sk_tsq_flags; if (!(flags & MPTCP_DEFERRED_ALL)) @@ -2311,15 +2963,6 @@ static void mptcp_release_cb(struct sock *sk) sock_release_ownership(sk); - if (flags & TCPF_DELACK_TIMER_DEFERRED) { - struct mptcp_sock *msk = mptcp_sk(sk); - struct sock *ssk; - - ssk = mptcp_subflow_recv_lookup(msk); - if (!ssk || !schedule_work(&msk->work)) - __sock_put(sk); - } - if (flags & TCPF_WRITE_TIMER_DEFERRED) { mptcp_retransmit_handler(sk); __sock_put(sk); @@ -2377,9 +3020,11 @@ void mptcp_finish_connect(struct sock *ssk) WRITE_ONCE(msk->remote_key, subflow->remote_key); WRITE_ONCE(msk->local_key, subflow->local_key); WRITE_ONCE(msk->write_seq, subflow->idsn + 1); + WRITE_ONCE(msk->snd_nxt, msk->write_seq); WRITE_ONCE(msk->ack_seq, ack_seq); + WRITE_ONCE(msk->rcv_wnd_sent, ack_seq); WRITE_ONCE(msk->can_ack, 1); - atomic64_set(&msk->snd_una, msk->write_seq); + WRITE_ONCE(msk->snd_una, msk->write_seq); mptcp_pm_new_connection(msk, 0); @@ -2395,9 +3040,9 @@ static void mptcp_sock_graft(struct sock *sk, struct socket *parent) write_unlock_bh(&sk->sk_callback_lock); } -bool mptcp_finish_join(struct sock *sk) +bool mptcp_finish_join(struct sock *ssk) { - struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(sk); + struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk); struct mptcp_sock *msk = mptcp_sk(subflow->conn); struct sock *parent = (void *)msk; struct socket *parent_sock; @@ -2418,12 +3063,14 @@ bool mptcp_finish_join(struct sock *sk) /* active connections are already on conn_list, and we can't acquire * msk lock here. * use the join list lock as synchronization point and double-check - * msk status to avoid racing with mptcp_close() + * msk status to avoid racing with __mptcp_destroy_sock() */ spin_lock_bh(&msk->join_list_lock); ret = inet_sk_state_load(parent) == TCP_ESTABLISHED; - if (ret && !WARN_ON_ONCE(!list_empty(&subflow->node))) + if (ret && !WARN_ON_ONCE(!list_empty(&subflow->node))) { list_add_tail(&subflow->node, &msk->join_list); + sock_hold(ssk); + } spin_unlock_bh(&msk->join_list_lock); if (!ret) return false; @@ -2432,19 +3079,12 @@ bool mptcp_finish_join(struct sock *sk) * at close time */ parent_sock = READ_ONCE(parent->sk_socket); - if (parent_sock && !sk->sk_socket) - mptcp_sock_graft(sk, parent_sock); + if (parent_sock && !ssk->sk_socket) + mptcp_sock_graft(ssk, parent_sock); subflow->map_seq = READ_ONCE(msk->ack_seq); return true; } -static bool mptcp_memory_free(const struct sock *sk, int wake) -{ - struct mptcp_sock *msk = mptcp_sk(sk); - - return wake ? test_bit(MPTCP_SEND_SPACE, &msk->flags) : true; -} - static struct proto mptcp_prot = { .name = "MPTCP", .owner = THIS_MODULE, @@ -2465,7 +3105,6 @@ static struct proto mptcp_prot = { .sockets_allocated = &mptcp_sockets_allocated, .memory_allocated = &tcp_memory_allocated, .memory_pressure = &tcp_memory_pressure, - .stream_memory_free = mptcp_memory_free, .sysctl_wmem_offset = offsetof(struct net, ipv4.sysctl_tcp_wmem), .sysctl_rmem_offset = offsetof(struct net, ipv4.sysctl_tcp_rmem), .sysctl_mem = sysctl_tcp_mem, @@ -2610,6 +3249,23 @@ static int mptcp_stream_accept(struct socket *sock, struct socket *newsock, if (err == 0 && !mptcp_is_tcpsk(newsock->sk)) { struct mptcp_sock *msk = mptcp_sk(newsock->sk); struct mptcp_subflow_context *subflow; + struct sock *newsk = newsock->sk; + bool slowpath; + + slowpath = lock_sock_fast(newsk); + + /* PM/worker can now acquire the first subflow socket + * lock without racing with listener queue cleanup, + * we can notify it, if needed. + */ + subflow = mptcp_subflow_ctx(msk->first); + list_add(&subflow->node, &msk->conn_list); + sock_hold(msk->first); + if (mptcp_is_fully_established(newsk)) + mptcp_pm_fully_established(msk); + + mptcp_copy_inaddrs(newsk, msk->first); + mptcp_rcv_space_init(msk, msk->first); /* set ssk->sk_socket of accept()ed flows to mptcp socket. * This is needed so NOSPACE flag can be set from tcp stack. @@ -2621,6 +3277,7 @@ static int mptcp_stream_accept(struct socket *sock, struct socket *newsock, if (!ssk->sk_socket) mptcp_sock_graft(ssk, newsock); } + unlock_sock_fast(newsk, slowpath); } if (inet_csk_listen_poll(ssock->sk)) @@ -2639,6 +3296,24 @@ static __poll_t mptcp_check_readable(struct mptcp_sock *msk) 0; } +static __poll_t mptcp_check_writeable(struct mptcp_sock *msk) +{ + struct sock *sk = (struct sock *)msk; + + if (unlikely(sk->sk_shutdown & SEND_SHUTDOWN)) + return 0; + + if (sk_stream_is_writeable(sk)) + return EPOLLOUT | EPOLLWRNORM; + + set_bit(MPTCP_NOSPACE, &msk->flags); + smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */ + if (sk_stream_is_writeable(sk)) + return EPOLLOUT | EPOLLWRNORM; + + return 0; +} + static __poll_t mptcp_poll(struct file *file, struct socket *sock, struct poll_table_struct *wait) { @@ -2657,8 +3332,7 @@ static __poll_t mptcp_poll(struct file *file, struct socket *sock, if (state != TCP_SYN_SENT && state != TCP_SYN_RECV) { mask |= mptcp_check_readable(msk); - if (test_bit(MPTCP_SEND_SPACE, &msk->flags)) - mask |= EPOLLOUT | EPOLLWRNORM; + mask |= mptcp_check_writeable(msk); } if (sk->sk_shutdown & RCV_SHUTDOWN) mask |= EPOLLIN | EPOLLRDNORM | EPOLLRDHUP; @@ -2669,12 +3343,12 @@ static __poll_t mptcp_poll(struct file *file, struct socket *sock, static int mptcp_shutdown(struct socket *sock, int how) { struct mptcp_sock *msk = mptcp_sk(sock->sk); - struct mptcp_subflow_context *subflow; + struct sock *sk = sock->sk; int ret = 0; pr_debug("sk=%p, how=%d", msk, how); - lock_sock(sock->sk); + lock_sock(sk); how++; if ((how & ~SHUTDOWN_MASK) || !how) { @@ -2683,45 +3357,22 @@ static int mptcp_shutdown(struct socket *sock, int how) } if (sock->state == SS_CONNECTING) { - if ((1 << sock->sk->sk_state) & + if ((1 << sk->sk_state) & (TCPF_SYN_SENT | TCPF_SYN_RECV | TCPF_CLOSE)) sock->state = SS_DISCONNECTING; else sock->state = SS_CONNECTED; } - /* If we've already sent a FIN, or it's a closed state, skip this. */ - if (__mptcp_check_fallback(msk)) { - if (how == SHUT_WR || how == SHUT_RDWR) - inet_sk_state_store(sock->sk, TCP_FIN_WAIT1); - - mptcp_for_each_subflow(msk, subflow) { - struct sock *tcp_sk = mptcp_subflow_tcp_sock(subflow); - - mptcp_subflow_shutdown(sock->sk, tcp_sk, how); - } - } else if ((how & SEND_SHUTDOWN) && - ((1 << sock->sk->sk_state) & - (TCPF_ESTABLISHED | TCPF_SYN_SENT | - TCPF_SYN_RECV | TCPF_CLOSE_WAIT)) && - mptcp_close_state(sock->sk)) { - __mptcp_flush_join_list(msk); - - WRITE_ONCE(msk->write_seq, msk->write_seq + 1); - WRITE_ONCE(msk->snd_data_fin_enable, 1); - - mptcp_for_each_subflow(msk, subflow) { - struct sock *tcp_sk = mptcp_subflow_tcp_sock(subflow); - - mptcp_subflow_shutdown(sock->sk, tcp_sk, how); - } - } + sk->sk_shutdown |= how; + if ((how & SEND_SHUTDOWN) && mptcp_close_state(sk)) + __mptcp_wr_shutdown(sk); /* Wake up anyone sleeping in poll. */ - sock->sk->sk_state_change(sock->sk); + sk->sk_state_change(sk); out_unlock: - release_sock(sock->sk); + release_sock(sk); return ret; } |