summaryrefslogtreecommitdiffstats
path: root/net/rxrpc/recvmsg.c
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2016-08-30 21:42:14 +0200
committerDavid S. Miller <davem@davemloft.net>2016-09-02 01:43:27 +0200
commitd001648ec7cf8b21ae9eec8b9ba4a18295adfb14 (patch)
tree830a6ec7dbc683675ba088750caeb5eafb4c8012 /net/rxrpc/recvmsg.c
parentnet: pegasus: Remove deprecated create_singlethread_workqueue (diff)
downloadlinux-d001648ec7cf8b21ae9eec8b9ba4a18295adfb14.tar.xz
linux-d001648ec7cf8b21ae9eec8b9ba4a18295adfb14.zip
rxrpc: Don't expose skbs to in-kernel users [ver #2]
Don't expose skbs to in-kernel users, such as the AFS filesystem, but instead provide a notification hook the indicates that a call needs attention and another that indicates that there's a new call to be collected. This makes the following possibilities more achievable: (1) Call refcounting can be made simpler if skbs don't hold refs to calls. (2) skbs referring to non-data events will be able to be freed much sooner rather than being queued for AFS to pick up as rxrpc_kernel_recv_data will be able to consult the call state. (3) We can shortcut the receive phase when a call is remotely aborted because we don't have to go through all the packets to get to the one cancelling the operation. (4) It makes it easier to do encryption/decryption directly between AFS's buffers and sk_buffs. (5) Encryption/decryption can more easily be done in the AFS's thread contexts - usually that of the userspace process that issued a syscall - rather than in one of rxrpc's background threads on a workqueue. (6) AFS will be able to wait synchronously on a call inside AF_RXRPC. To make this work, the following interface function has been added: int rxrpc_kernel_recv_data( struct socket *sock, struct rxrpc_call *call, void *buffer, size_t bufsize, size_t *_offset, bool want_more, u32 *_abort_code); This is the recvmsg equivalent. It allows the caller to find out about the state of a specific call and to transfer received data into a buffer piecemeal. afs_extract_data() and rxrpc_kernel_recv_data() now do all the extraction logic between them. They don't wait synchronously yet because the socket lock needs to be dealt with. Five interface functions have been removed: rxrpc_kernel_is_data_last() rxrpc_kernel_get_abort_code() rxrpc_kernel_get_error_number() rxrpc_kernel_free_skb() rxrpc_kernel_data_consumed() As a temporary hack, sk_buffs going to an in-kernel call are queued on the rxrpc_call struct (->knlrecv_queue) rather than being handed over to the in-kernel user. To process the queue internally, a temporary function, temp_deliver_data() has been added. This will be replaced with common code between the rxrpc_recvmsg() path and the kernel_rxrpc_recv_data() path in a future patch. Signed-off-by: David Howells <dhowells@redhat.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/rxrpc/recvmsg.c')
-rw-r--r--net/rxrpc/recvmsg.c191
1 files changed, 157 insertions, 34 deletions
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index c9b38c7fb448..0ab7b334bab1 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -369,55 +369,178 @@ wait_error:
}
-/**
- * rxrpc_kernel_is_data_last - Determine if data message is last one
- * @skb: Message holding data
+/*
+ * Deliver messages to a call. This keeps processing packets until the buffer
+ * is filled and we find either more DATA (returns 0) or the end of the DATA
+ * (returns 1). If more packets are required, it returns -EAGAIN.
*
- * Determine if data message is last one for the parent call.
+ * TODO: Note that this is hacked in at the moment and will be replaced.
*/
-bool rxrpc_kernel_is_data_last(struct sk_buff *skb)
+static int temp_deliver_data(struct socket *sock, struct rxrpc_call *call,
+ struct iov_iter *iter, size_t size,
+ size_t *_offset)
{
- struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ struct rxrpc_skb_priv *sp;
+ struct sk_buff *skb;
+ size_t remain;
+ int ret, copy;
+
+ _enter("%d", call->debug_id);
+
+next:
+ local_bh_disable();
+ skb = skb_dequeue(&call->knlrecv_queue);
+ local_bh_enable();
+ if (!skb) {
+ if (test_bit(RXRPC_CALL_RX_NO_MORE, &call->flags))
+ return 1;
+ _leave(" = -EAGAIN [empty]");
+ return -EAGAIN;
+ }
- ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA);
+ sp = rxrpc_skb(skb);
+ _debug("dequeued %p %u/%zu", skb, sp->offset, size);
- return sp->hdr.flags & RXRPC_LAST_PACKET;
-}
+ switch (skb->mark) {
+ case RXRPC_SKB_MARK_DATA:
+ remain = size - *_offset;
+ if (remain > 0) {
+ copy = skb->len - sp->offset;
+ if (copy > remain)
+ copy = remain;
+ ret = skb_copy_datagram_iter(skb, sp->offset, iter,
+ copy);
+ if (ret < 0)
+ goto requeue_and_leave;
-EXPORT_SYMBOL(rxrpc_kernel_is_data_last);
+ /* handle piecemeal consumption of data packets */
+ sp->offset += copy;
+ *_offset += copy;
+ }
-/**
- * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message
- * @skb: Message indicating an abort
- *
- * Get the abort code from an RxRPC abort message.
- */
-u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
-{
- struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ if (sp->offset < skb->len)
+ goto partially_used_skb;
+
+ /* We consumed the whole packet */
+ ASSERTCMP(sp->offset, ==, skb->len);
+ if (sp->hdr.flags & RXRPC_LAST_PACKET)
+ set_bit(RXRPC_CALL_RX_NO_MORE, &call->flags);
+ rxrpc_kernel_data_consumed(call, skb);
+ rxrpc_free_skb(skb);
+ goto next;
- switch (skb->mark) {
- case RXRPC_SKB_MARK_REMOTE_ABORT:
- case RXRPC_SKB_MARK_LOCAL_ABORT:
- return sp->call->abort_code;
default:
- BUG();
+ rxrpc_free_skb(skb);
+ goto next;
}
-}
-EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);
+partially_used_skb:
+ ASSERTCMP(*_offset, ==, size);
+ ret = 0;
+requeue_and_leave:
+ skb_queue_head(&call->knlrecv_queue, skb);
+ return ret;
+}
/**
- * rxrpc_kernel_get_error - Get the error number from an RxRPC error message
- * @skb: Message indicating an error
+ * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
+ * @sock: The socket that the call exists on
+ * @call: The call to send data through
+ * @buf: The buffer to receive into
+ * @size: The size of the buffer, including data already read
+ * @_offset: The running offset into the buffer.
+ * @want_more: True if more data is expected to be read
+ * @_abort: Where the abort code is stored if -ECONNABORTED is returned
+ *
+ * Allow a kernel service to receive data and pick up information about the
+ * state of a call. Returns 0 if got what was asked for and there's more
+ * available, 1 if we got what was asked for and we're at the end of the data
+ * and -EAGAIN if we need more data.
+ *
+ * Note that we may return -EAGAIN to drain empty packets at the end of the
+ * data, even if we've already copied over the requested data.
*
- * Get the error number from an RxRPC error message.
+ * This function adds the amount it transfers to *_offset, so this should be
+ * precleared as appropriate. Note that the amount remaining in the buffer is
+ * taken to be size - *_offset.
+ *
+ * *_abort should also be initialised to 0.
*/
-int rxrpc_kernel_get_error_number(struct sk_buff *skb)
+int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
+ void *buf, size_t size, size_t *_offset,
+ bool want_more, u32 *_abort)
{
- struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ struct iov_iter iter;
+ struct kvec iov;
+ int ret;
- return sp->error;
-}
+ _enter("{%d,%s},%zu,%d",
+ call->debug_id, rxrpc_call_states[call->state], size, want_more);
+
+ ASSERTCMP(*_offset, <=, size);
+ ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
-EXPORT_SYMBOL(rxrpc_kernel_get_error_number);
+ iov.iov_base = buf + *_offset;
+ iov.iov_len = size - *_offset;
+ iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);
+
+ lock_sock(sock->sk);
+
+ switch (call->state) {
+ case RXRPC_CALL_CLIENT_RECV_REPLY:
+ case RXRPC_CALL_SERVER_RECV_REQUEST:
+ case RXRPC_CALL_SERVER_ACK_REQUEST:
+ ret = temp_deliver_data(sock, call, &iter, size, _offset);
+ if (ret < 0)
+ goto out;
+
+ /* We can only reach here with a partially full buffer if we
+ * have reached the end of the data. We must otherwise have a
+ * full buffer or have been given -EAGAIN.
+ */
+ if (ret == 1) {
+ if (*_offset < size)
+ goto short_data;
+ if (!want_more)
+ goto read_phase_complete;
+ ret = 0;
+ goto out;
+ }
+
+ if (!want_more)
+ goto excess_data;
+ goto out;
+
+ case RXRPC_CALL_COMPLETE:
+ goto call_complete;
+
+ default:
+ *_offset = 0;
+ ret = -EINPROGRESS;
+ goto out;
+ }
+
+read_phase_complete:
+ ret = 1;
+out:
+ release_sock(sock->sk);
+ _leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
+ return ret;
+
+short_data:
+ ret = -EBADMSG;
+ goto out;
+excess_data:
+ ret = -EMSGSIZE;
+ goto out;
+call_complete:
+ *_abort = call->abort_code;
+ ret = call->error;
+ if (call->completion == RXRPC_CALL_SUCCEEDED) {
+ ret = 1;
+ if (size > 0)
+ ret = -ECONNRESET;
+ }
+ goto out;
+}
+EXPORT_SYMBOL(rxrpc_kernel_recv_data);