From a68f4a27f55f1d54e35c270aff89383da4b1b656 Mon Sep 17 00:00:00 2001 From: David Howells Date: Wed, 18 Oct 2017 11:36:39 +0100 Subject: rxrpc: Support service upgrade from a kernel service Provide support for a kernel service to make use of the service upgrade facility. This involves: (1) Pass an upgrade request flag to rxrpc_kernel_begin_call(). (2) Make rxrpc_kernel_recv_data() return the call's current service ID so that the caller can detect service upgrade and see what the service was upgraded to. Signed-off-by: David Howells --- fs/afs/internal.h | 1 + fs/afs/rxrpc.c | 11 +++++++---- 2 files changed, 8 insertions(+), 4 deletions(-) (limited to 'fs') diff --git a/fs/afs/internal.h b/fs/afs/internal.h index 82e16556afea..3f03f7888302 100644 --- a/fs/afs/internal.h +++ b/fs/afs/internal.h @@ -100,6 +100,7 @@ struct afs_call { bool send_pages; /* T if data from mapping should be sent */ bool need_attention; /* T if RxRPC poked us */ bool async; /* T if asynchronous */ + bool upgrade; /* T to request service upgrade */ u16 service_id; /* RxRPC service ID to call */ __be16 port; /* target UDP port */ u32 operation_ID; /* operation ID for an incoming call */ diff --git a/fs/afs/rxrpc.c b/fs/afs/rxrpc.c index 0bf191f0dbaf..172a4f9747ac 100644 --- a/fs/afs/rxrpc.c +++ b/fs/afs/rxrpc.c @@ -387,7 +387,8 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp, tx_total_len, gfp, (async ? afs_wake_up_async_call : - afs_wake_up_call_waiter)); + afs_wake_up_call_waiter), + call->upgrade); call->key = NULL; if (IS_ERR(rxcall)) { ret = PTR_ERR(rxcall); @@ -443,7 +444,7 @@ error_do_abort: abort_code = 0; offset = 0; rxrpc_kernel_recv_data(afs_socket, rxcall, NULL, 0, &offset, - false, &abort_code); + false, &abort_code, &call->service_id); ret = call->type->abort_to_error(abort_code); } error_kill_call: @@ -471,7 +472,8 @@ static void afs_deliver_to_call(struct afs_call *call) size_t offset = 0; ret = rxrpc_kernel_recv_data(afs_socket, call->rxcall, NULL, 0, &offset, false, - &call->abort_code); + &call->abort_code, + &call->service_id); trace_afs_recv_data(call, 0, offset, false, ret); if (ret == -EINPROGRESS || ret == -EAGAIN) @@ -851,7 +853,8 @@ int afs_extract_data(struct afs_call *call, void *buf, size_t count, ret = rxrpc_kernel_recv_data(afs_socket, call->rxcall, buf, count, &call->offset, - want_more, &call->abort_code); + want_more, &call->abort_code, + &call->service_id); trace_afs_recv_data(call, count, call->offset, want_more, ret); if (ret == 0 || ret == -EAGAIN) return ret; -- cgit v1.2.3 From bc5e3a546d553e5223851fc199e69040eb70f68b Mon Sep 17 00:00:00 2001 From: David Howells Date: Wed, 18 Oct 2017 11:07:31 +0100 Subject: rxrpc: Use MSG_WAITALL to tell sendmsg() to temporarily ignore signals Make AF_RXRPC accept MSG_WAITALL as a flag to sendmsg() to tell it to ignore signals whilst loading up the message queue, provided progress is being made in emptying the queue at the other side. Progress is defined as the base of the transmit window having being advanced within 2 RTT periods. If the period is exceeded with no progress, sendmsg() will return anyway, indicating how much data has been copied, if any. Once the supplied buffer is entirely decanted, the sendmsg() will return. Signed-off-by: David Howells --- Documentation/networking/rxrpc.txt | 12 +++++ fs/afs/rxrpc.c | 31 +++++++++-- net/rxrpc/sendmsg.c | 107 ++++++++++++++++++++++++++++--------- 3 files changed, 119 insertions(+), 31 deletions(-) (limited to 'fs') diff --git a/Documentation/networking/rxrpc.txt b/Documentation/networking/rxrpc.txt index 1fb5c553aedd..b5407163d53b 100644 --- a/Documentation/networking/rxrpc.txt +++ b/Documentation/networking/rxrpc.txt @@ -280,6 +280,18 @@ Interaction with the user of the RxRPC socket: nominated by a socket option. +Notes on sendmsg: + + (*) MSG_WAITALL can be set to tell sendmsg to ignore signals if the peer is + making progress at accepting packets within a reasonable time such that we + manage to queue up all the data for transmission. This requires the + client to accept at least one packet per 2*RTT time period. + + If this isn't set, sendmsg() will return immediately, either returning + EINTR/ERESTARTSYS if nothing was consumed or returning the amount of data + consumed. + + Notes on recvmsg: (*) If there's a sequence of data messages belonging to a particular call on diff --git a/fs/afs/rxrpc.c b/fs/afs/rxrpc.c index 172a4f9747ac..bb1e2caa1720 100644 --- a/fs/afs/rxrpc.c +++ b/fs/afs/rxrpc.c @@ -407,7 +407,7 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp, call->request_size); msg.msg_control = NULL; msg.msg_controllen = 0; - msg.msg_flags = (call->send_pages ? MSG_MORE : 0); + msg.msg_flags = MSG_WAITALL | (call->send_pages ? MSG_MORE : 0); /* We have to change the state *before* sending the last packet as * rxrpc might give us the reply before it returns from sending the @@ -538,15 +538,26 @@ call_complete: */ static int afs_wait_for_call_to_complete(struct afs_call *call) { + signed long rtt2, timeout; int ret; + u64 rtt; + u32 life, last_life; DECLARE_WAITQUEUE(myself, current); _enter(""); + rtt = rxrpc_kernel_get_rtt(afs_socket, call->rxcall); + rtt2 = nsecs_to_jiffies64(rtt) * 2; + if (rtt2 < 2) + rtt2 = 2; + + timeout = rtt2; + last_life = rxrpc_kernel_check_life(afs_socket, call->rxcall); + add_wait_queue(&call->waitq, &myself); for (;;) { - set_current_state(TASK_INTERRUPTIBLE); + set_current_state(TASK_UNINTERRUPTIBLE); /* deliver any messages that are in the queue */ if (call->state < AFS_CALL_COMPLETE && call->need_attention) { @@ -556,10 +567,20 @@ static int afs_wait_for_call_to_complete(struct afs_call *call) continue; } - if (call->state == AFS_CALL_COMPLETE || - signal_pending(current)) + if (call->state == AFS_CALL_COMPLETE) break; - schedule(); + + life = rxrpc_kernel_check_life(afs_socket, call->rxcall); + if (timeout == 0 && + life == last_life && signal_pending(current)) + break; + + if (life != last_life) { + timeout = rtt2; + last_life = life; + } + + timeout = schedule_timeout(timeout); } remove_wait_queue(&call->waitq, &myself); diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c index 9ea6f972767e..2d9edc656ca3 100644 --- a/net/rxrpc/sendmsg.c +++ b/net/rxrpc/sendmsg.c @@ -37,13 +37,87 @@ struct rxrpc_send_params { bool upgrade; /* If the connection is upgradeable */ }; +/* + * Wait for space to appear in the Tx queue or a signal to occur. + */ +static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx, + struct rxrpc_call *call, + long *timeo) +{ + for (;;) { + set_current_state(TASK_INTERRUPTIBLE); + if (call->tx_top - call->tx_hard_ack < + min_t(unsigned int, call->tx_winsize, + call->cong_cwnd + call->cong_extra)) + return 0; + + if (call->state >= RXRPC_CALL_COMPLETE) + return call->error; + + if (signal_pending(current)) + return sock_intr_errno(*timeo); + + trace_rxrpc_transmit(call, rxrpc_transmit_wait); + mutex_unlock(&call->user_mutex); + *timeo = schedule_timeout(*timeo); + if (mutex_lock_interruptible(&call->user_mutex) < 0) + return sock_intr_errno(*timeo); + } +} + +/* + * Wait for space to appear in the Tx queue uninterruptibly, but with + * a timeout of 2*RTT if no progress was made and a signal occurred. + */ +static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx, + struct rxrpc_call *call) +{ + rxrpc_seq_t tx_start, tx_win; + signed long rtt2, timeout; + u64 rtt; + + rtt = READ_ONCE(call->peer->rtt); + rtt2 = nsecs_to_jiffies64(rtt) * 2; + if (rtt2 < 1) + rtt2 = 1; + + timeout = rtt2; + tx_start = READ_ONCE(call->tx_hard_ack); + + for (;;) { + set_current_state(TASK_UNINTERRUPTIBLE); + + tx_win = READ_ONCE(call->tx_hard_ack); + if (call->tx_top - tx_win < + min_t(unsigned int, call->tx_winsize, + call->cong_cwnd + call->cong_extra)) + return 0; + + if (call->state >= RXRPC_CALL_COMPLETE) + return call->error; + + if (timeout == 0 && + tx_win == tx_start && signal_pending(current)) + return -EINTR; + + if (tx_win != tx_start) { + timeout = rtt2; + tx_start = tx_win; + } + + trace_rxrpc_transmit(call, rxrpc_transmit_wait); + timeout = schedule_timeout(timeout); + } +} + /* * wait for space to appear in the transmit/ACK window * - caller holds the socket locked */ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx, struct rxrpc_call *call, - long *timeo) + long *timeo, + bool waitall) { DECLARE_WAITQUEUE(myself, current); int ret; @@ -53,30 +127,10 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx, add_wait_queue(&call->waitq, &myself); - for (;;) { - set_current_state(TASK_INTERRUPTIBLE); - ret = 0; - if (call->tx_top - call->tx_hard_ack < - min_t(unsigned int, call->tx_winsize, - call->cong_cwnd + call->cong_extra)) - break; - if (call->state >= RXRPC_CALL_COMPLETE) { - ret = call->error; - break; - } - if (signal_pending(current)) { - ret = sock_intr_errno(*timeo); - break; - } - - trace_rxrpc_transmit(call, rxrpc_transmit_wait); - mutex_unlock(&call->user_mutex); - *timeo = schedule_timeout(*timeo); - if (mutex_lock_interruptible(&call->user_mutex) < 0) { - ret = sock_intr_errno(*timeo); - break; - } - } + if (waitall) + ret = rxrpc_wait_for_tx_window_nonintr(rx, call); + else + ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo); remove_wait_queue(&call->waitq, &myself); set_current_state(TASK_RUNNING); @@ -254,7 +308,8 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, if (msg->msg_flags & MSG_DONTWAIT) goto maybe_error; ret = rxrpc_wait_for_tx_window(rx, call, - &timeo); + &timeo, + msg->msg_flags & MSG_WAITALL); if (ret < 0) goto maybe_error; } -- cgit v1.2.3