diff options
author | David Howells <dhowells@redhat.com> | 2016-08-30 21:42:14 +0200 |
---|---|---|
committer | David S. Miller <davem@davemloft.net> | 2016-09-02 01:43:27 +0200 |
commit | d001648ec7cf8b21ae9eec8b9ba4a18295adfb14 (patch) | |
tree | 830a6ec7dbc683675ba088750caeb5eafb4c8012 /net/rxrpc/recvmsg.c | |
parent | net: pegasus: Remove deprecated create_singlethread_workqueue (diff) | |
download | linux-d001648ec7cf8b21ae9eec8b9ba4a18295adfb14.tar.xz linux-d001648ec7cf8b21ae9eec8b9ba4a18295adfb14.zip |
rxrpc: Don't expose skbs to in-kernel users [ver #2]
Don't expose skbs to in-kernel users, such as the AFS filesystem, but
instead provide a notification hook the indicates that a call needs
attention and another that indicates that there's a new call to be
collected.
This makes the following possibilities more achievable:
(1) Call refcounting can be made simpler if skbs don't hold refs to calls.
(2) skbs referring to non-data events will be able to be freed much sooner
rather than being queued for AFS to pick up as rxrpc_kernel_recv_data
will be able to consult the call state.
(3) We can shortcut the receive phase when a call is remotely aborted
because we don't have to go through all the packets to get to the one
cancelling the operation.
(4) It makes it easier to do encryption/decryption directly between AFS's
buffers and sk_buffs.
(5) Encryption/decryption can more easily be done in the AFS's thread
contexts - usually that of the userspace process that issued a syscall
- rather than in one of rxrpc's background threads on a workqueue.
(6) AFS will be able to wait synchronously on a call inside AF_RXRPC.
To make this work, the following interface function has been added:
int rxrpc_kernel_recv_data(
struct socket *sock, struct rxrpc_call *call,
void *buffer, size_t bufsize, size_t *_offset,
bool want_more, u32 *_abort_code);
This is the recvmsg equivalent. It allows the caller to find out about the
state of a specific call and to transfer received data into a buffer
piecemeal.
afs_extract_data() and rxrpc_kernel_recv_data() now do all the extraction
logic between them. They don't wait synchronously yet because the socket
lock needs to be dealt with.
Five interface functions have been removed:
rxrpc_kernel_is_data_last()
rxrpc_kernel_get_abort_code()
rxrpc_kernel_get_error_number()
rxrpc_kernel_free_skb()
rxrpc_kernel_data_consumed()
As a temporary hack, sk_buffs going to an in-kernel call are queued on the
rxrpc_call struct (->knlrecv_queue) rather than being handed over to the
in-kernel user. To process the queue internally, a temporary function,
temp_deliver_data() has been added. This will be replaced with common code
between the rxrpc_recvmsg() path and the kernel_rxrpc_recv_data() path in a
future patch.
Signed-off-by: David Howells <dhowells@redhat.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/rxrpc/recvmsg.c')
-rw-r--r-- | net/rxrpc/recvmsg.c | 191 |
1 files changed, 157 insertions, 34 deletions
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c index c9b38c7fb448..0ab7b334bab1 100644 --- a/net/rxrpc/recvmsg.c +++ b/net/rxrpc/recvmsg.c @@ -369,55 +369,178 @@ wait_error: } -/** - * rxrpc_kernel_is_data_last - Determine if data message is last one - * @skb: Message holding data +/* + * Deliver messages to a call. This keeps processing packets until the buffer + * is filled and we find either more DATA (returns 0) or the end of the DATA + * (returns 1). If more packets are required, it returns -EAGAIN. * - * Determine if data message is last one for the parent call. + * TODO: Note that this is hacked in at the moment and will be replaced. */ -bool rxrpc_kernel_is_data_last(struct sk_buff *skb) +static int temp_deliver_data(struct socket *sock, struct rxrpc_call *call, + struct iov_iter *iter, size_t size, + size_t *_offset) { - struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + struct rxrpc_skb_priv *sp; + struct sk_buff *skb; + size_t remain; + int ret, copy; + + _enter("%d", call->debug_id); + +next: + local_bh_disable(); + skb = skb_dequeue(&call->knlrecv_queue); + local_bh_enable(); + if (!skb) { + if (test_bit(RXRPC_CALL_RX_NO_MORE, &call->flags)) + return 1; + _leave(" = -EAGAIN [empty]"); + return -EAGAIN; + } - ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA); + sp = rxrpc_skb(skb); + _debug("dequeued %p %u/%zu", skb, sp->offset, size); - return sp->hdr.flags & RXRPC_LAST_PACKET; -} + switch (skb->mark) { + case RXRPC_SKB_MARK_DATA: + remain = size - *_offset; + if (remain > 0) { + copy = skb->len - sp->offset; + if (copy > remain) + copy = remain; + ret = skb_copy_datagram_iter(skb, sp->offset, iter, + copy); + if (ret < 0) + goto requeue_and_leave; -EXPORT_SYMBOL(rxrpc_kernel_is_data_last); + /* handle piecemeal consumption of data packets */ + sp->offset += copy; + *_offset += copy; + } -/** - * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message - * @skb: Message indicating an abort - * - * Get the abort code from an RxRPC abort message. - */ -u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb) -{ - struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + if (sp->offset < skb->len) + goto partially_used_skb; + + /* We consumed the whole packet */ + ASSERTCMP(sp->offset, ==, skb->len); + if (sp->hdr.flags & RXRPC_LAST_PACKET) + set_bit(RXRPC_CALL_RX_NO_MORE, &call->flags); + rxrpc_kernel_data_consumed(call, skb); + rxrpc_free_skb(skb); + goto next; - switch (skb->mark) { - case RXRPC_SKB_MARK_REMOTE_ABORT: - case RXRPC_SKB_MARK_LOCAL_ABORT: - return sp->call->abort_code; default: - BUG(); + rxrpc_free_skb(skb); + goto next; } -} -EXPORT_SYMBOL(rxrpc_kernel_get_abort_code); +partially_used_skb: + ASSERTCMP(*_offset, ==, size); + ret = 0; +requeue_and_leave: + skb_queue_head(&call->knlrecv_queue, skb); + return ret; +} /** - * rxrpc_kernel_get_error - Get the error number from an RxRPC error message - * @skb: Message indicating an error + * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info + * @sock: The socket that the call exists on + * @call: The call to send data through + * @buf: The buffer to receive into + * @size: The size of the buffer, including data already read + * @_offset: The running offset into the buffer. + * @want_more: True if more data is expected to be read + * @_abort: Where the abort code is stored if -ECONNABORTED is returned + * + * Allow a kernel service to receive data and pick up information about the + * state of a call. Returns 0 if got what was asked for and there's more + * available, 1 if we got what was asked for and we're at the end of the data + * and -EAGAIN if we need more data. + * + * Note that we may return -EAGAIN to drain empty packets at the end of the + * data, even if we've already copied over the requested data. * - * Get the error number from an RxRPC error message. + * This function adds the amount it transfers to *_offset, so this should be + * precleared as appropriate. Note that the amount remaining in the buffer is + * taken to be size - *_offset. + * + * *_abort should also be initialised to 0. */ -int rxrpc_kernel_get_error_number(struct sk_buff *skb) +int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call, + void *buf, size_t size, size_t *_offset, + bool want_more, u32 *_abort) { - struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + struct iov_iter iter; + struct kvec iov; + int ret; - return sp->error; -} + _enter("{%d,%s},%zu,%d", + call->debug_id, rxrpc_call_states[call->state], size, want_more); + + ASSERTCMP(*_offset, <=, size); + ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING); -EXPORT_SYMBOL(rxrpc_kernel_get_error_number); + iov.iov_base = buf + *_offset; + iov.iov_len = size - *_offset; + iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset); + + lock_sock(sock->sk); + + switch (call->state) { + case RXRPC_CALL_CLIENT_RECV_REPLY: + case RXRPC_CALL_SERVER_RECV_REQUEST: + case RXRPC_CALL_SERVER_ACK_REQUEST: + ret = temp_deliver_data(sock, call, &iter, size, _offset); + if (ret < 0) + goto out; + + /* We can only reach here with a partially full buffer if we + * have reached the end of the data. We must otherwise have a + * full buffer or have been given -EAGAIN. + */ + if (ret == 1) { + if (*_offset < size) + goto short_data; + if (!want_more) + goto read_phase_complete; + ret = 0; + goto out; + } + + if (!want_more) + goto excess_data; + goto out; + + case RXRPC_CALL_COMPLETE: + goto call_complete; + + default: + *_offset = 0; + ret = -EINPROGRESS; + goto out; + } + +read_phase_complete: + ret = 1; +out: + release_sock(sock->sk); + _leave(" = %d [%zu,%d]", ret, *_offset, *_abort); + return ret; + +short_data: + ret = -EBADMSG; + goto out; +excess_data: + ret = -EMSGSIZE; + goto out; +call_complete: + *_abort = call->abort_code; + ret = call->error; + if (call->completion == RXRPC_CALL_SUCCEEDED) { + ret = 1; + if (size > 0) + ret = -ECONNRESET; + } + goto out; +} +EXPORT_SYMBOL(rxrpc_kernel_recv_data); |