diff options
author | David S. Miller <davem@davemloft.net> | 2016-08-31 07:08:52 +0200 |
---|---|---|
committer | David S. Miller <davem@davemloft.net> | 2016-08-31 07:08:52 +0200 |
commit | 792438e59ab64bf44d559fbc9c1a42f878c50b4d (patch) | |
tree | df8353d249f3103aad19f72c7a7750d71133f9d9 | |
parent | drivers: net: stmmac: fix spelling mistake "mulitcast" -> "multicast" (diff) | |
parent | rxrpc: Pass struct socket * to more rxrpc kernel interface functions (diff) | |
download | linux-792438e59ab64bf44d559fbc9c1a42f878c50b4d.tar.xz linux-792438e59ab64bf44d559fbc9c1a42f878c50b4d.zip |
Merge tag 'rxrpc-rewrite-20160830-1' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs
David Howells says:
====================
rxrpc: Preparation for removal of use of skbs from AFS
Here's a set of patches that prepare the way for the removal of the use of
sk_buffs from fs/afs (they'll be entirely retained within net/rxrpc):
(1) Fix a potential NULL-pointer deref in rxrpc_abort_calls().
(2) Condense all the terminal call state machine states to a single one
plus supplementary info.
(3) Add a trace point for rxrpc call usage debugging.
(4) Cleanups and missing headers.
(5) Provide a way for AFS to ask about a call's peer address without
having an sk_buff to query.
(6) Use call->peer directly rather than going via call->conn (which might
be NULL).
(7) Pass struct socket * to various rxrpc kernel interface functions so
they can use that directly rather than getting it from the rxrpc_call
struct.
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r-- | Documentation/networking/rxrpc.txt | 18 | ||||
-rw-r--r-- | fs/afs/cmservice.c | 26 | ||||
-rw-r--r-- | fs/afs/internal.h | 5 | ||||
-rw-r--r-- | fs/afs/main.c | 1 | ||||
-rw-r--r-- | fs/afs/rxrpc.c | 28 | ||||
-rw-r--r-- | fs/afs/server.c | 11 | ||||
-rw-r--r-- | include/net/af_rxrpc.h | 12 | ||||
-rw-r--r-- | include/trace/events/rxrpc.h | 39 | ||||
-rw-r--r-- | net/rxrpc/af_rxrpc.c | 5 | ||||
-rw-r--r-- | net/rxrpc/ar-internal.h | 135 | ||||
-rw-r--r-- | net/rxrpc/call_accept.c | 24 | ||||
-rw-r--r-- | net/rxrpc/call_event.c | 63 | ||||
-rw-r--r-- | net/rxrpc/call_object.c | 133 | ||||
-rw-r--r-- | net/rxrpc/conn_client.c | 3 | ||||
-rw-r--r-- | net/rxrpc/conn_event.c | 53 | ||||
-rw-r--r-- | net/rxrpc/conn_object.c | 4 | ||||
-rw-r--r-- | net/rxrpc/input.c | 72 | ||||
-rw-r--r-- | net/rxrpc/output.c | 48 | ||||
-rw-r--r-- | net/rxrpc/peer_event.c | 25 | ||||
-rw-r--r-- | net/rxrpc/peer_object.c | 15 | ||||
-rw-r--r-- | net/rxrpc/proc.c | 3 | ||||
-rw-r--r-- | net/rxrpc/recvmsg.c | 13 | ||||
-rw-r--r-- | net/rxrpc/skbuff.c | 4 |
23 files changed, 470 insertions, 270 deletions
diff --git a/Documentation/networking/rxrpc.txt b/Documentation/networking/rxrpc.txt index 70c926ae212d..cfc8cb91452f 100644 --- a/Documentation/networking/rxrpc.txt +++ b/Documentation/networking/rxrpc.txt @@ -725,7 +725,8 @@ The kernel interface functions are as follows: (*) End a client call. - void rxrpc_kernel_end_call(struct rxrpc_call *call); + void rxrpc_kernel_end_call(struct socket *sock, + struct rxrpc_call *call); This is used to end a previously begun call. The user_call_ID is expunged from AF_RXRPC's knowledge and will not be seen again in association with @@ -733,7 +734,9 @@ The kernel interface functions are as follows: (*) Send data through a call. - int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg, + int rxrpc_kernel_send_data(struct socket *sock, + struct rxrpc_call *call, + struct msghdr *msg, size_t len); This is used to supply either the request part of a client call or the @@ -747,7 +750,9 @@ The kernel interface functions are as follows: (*) Abort a call. - void rxrpc_kernel_abort_call(struct rxrpc_call *call, u32 abort_code); + void rxrpc_kernel_abort_call(struct socket *sock, + struct rxrpc_call *call, + u32 abort_code); This is used to abort a call if it's still in an abortable state. The abort code specified will be placed in the ABORT message sent. @@ -868,6 +873,13 @@ The kernel interface functions are as follows: This is used to allocate a null RxRPC key that can be used to indicate anonymous security for a particular domain. + (*) Get the peer address of a call. + + void rxrpc_kernel_get_peer(struct socket *sock, struct rxrpc_call *call, + struct sockaddr_rxrpc *_srx); + + This is used to find the remote peer address of a call. + ======================= CONFIGURABLE PARAMETERS diff --git a/fs/afs/cmservice.c b/fs/afs/cmservice.c index 85737e96ab8b..77ee481059ac 100644 --- a/fs/afs/cmservice.c +++ b/fs/afs/cmservice.c @@ -17,10 +17,6 @@ #include "internal.h" #include "afs_cm.h" -#if 0 -struct workqueue_struct *afs_cm_workqueue; -#endif /* 0 */ - static int afs_deliver_cb_init_call_back_state(struct afs_call *, struct sk_buff *, bool); static int afs_deliver_cb_init_call_back_state3(struct afs_call *, @@ -171,9 +167,9 @@ static void SRXAFSCB_CallBack(struct work_struct *work) static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb, bool last) { + struct sockaddr_rxrpc srx; struct afs_callback *cb; struct afs_server *server; - struct in_addr addr; __be32 *bp; u32 tmp; int ret, loop; @@ -182,6 +178,7 @@ static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb, switch (call->unmarshall) { case 0: + rxrpc_kernel_get_peer(afs_socket, call->rxcall, &srx); call->offset = 0; call->unmarshall++; @@ -282,13 +279,11 @@ static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb, break; } - call->state = AFS_CALL_REPLYING; /* we'll need the file server record as that tells us which set of * vnodes to operate upon */ - memcpy(&addr, &ip_hdr(skb)->saddr, 4); - server = afs_find_server(&addr); + server = afs_find_server(&srx); if (!server) return -ENOTCONN; call->server = server; @@ -319,12 +314,14 @@ static int afs_deliver_cb_init_call_back_state(struct afs_call *call, struct sk_buff *skb, bool last) { + struct sockaddr_rxrpc srx; struct afs_server *server; - struct in_addr addr; int ret; _enter(",{%u},%d", skb->len, last); + rxrpc_kernel_get_peer(afs_socket, call->rxcall, &srx); + ret = afs_data_complete(call, skb, last); if (ret < 0) return ret; @@ -334,8 +331,7 @@ static int afs_deliver_cb_init_call_back_state(struct afs_call *call, /* we'll need the file server record as that tells us which set of * vnodes to operate upon */ - memcpy(&addr, &ip_hdr(skb)->saddr, 4); - server = afs_find_server(&addr); + server = afs_find_server(&srx); if (!server) return -ENOTCONN; call->server = server; @@ -352,11 +348,13 @@ static int afs_deliver_cb_init_call_back_state3(struct afs_call *call, struct sk_buff *skb, bool last) { + struct sockaddr_rxrpc srx; struct afs_server *server; - struct in_addr addr; _enter(",{%u},%d", skb->len, last); + rxrpc_kernel_get_peer(afs_socket, call->rxcall, &srx); + /* There are some arguments that we ignore */ afs_data_consumed(call, skb); if (!last) @@ -367,8 +365,7 @@ static int afs_deliver_cb_init_call_back_state3(struct afs_call *call, /* we'll need the file server record as that tells us which set of * vnodes to operate upon */ - memcpy(&addr, &ip_hdr(skb)->saddr, 4); - server = afs_find_server(&addr); + server = afs_find_server(&srx); if (!server) return -ENOTCONN; call->server = server; @@ -426,7 +423,6 @@ static void SRXAFSCB_ProbeUuid(struct work_struct *work) _enter(""); - if (memcmp(r, &afs_uuid, sizeof(afs_uuid)) == 0) reply.match = htonl(0); else diff --git a/fs/afs/internal.h b/fs/afs/internal.h index df976b2a7f40..d97552de9c59 100644 --- a/fs/afs/internal.h +++ b/fs/afs/internal.h @@ -20,6 +20,7 @@ #include <linux/sched.h> #include <linux/fscache.h> #include <linux/backing-dev.h> +#include <net/af_rxrpc.h> #include "afs.h" #include "afs_vl.h" @@ -607,6 +608,8 @@ extern void afs_proc_cell_remove(struct afs_cell *); /* * rxrpc.c */ +extern struct socket *afs_socket; + extern int afs_open_socket(void); extern void afs_close_socket(void); extern void afs_data_consumed(struct afs_call *, struct sk_buff *); @@ -654,7 +657,7 @@ do { \ extern struct afs_server *afs_lookup_server(struct afs_cell *, const struct in_addr *); -extern struct afs_server *afs_find_server(const struct in_addr *); +extern struct afs_server *afs_find_server(const struct sockaddr_rxrpc *); extern void afs_put_server(struct afs_server *); extern void __exit afs_purge_servers(void); diff --git a/fs/afs/main.c b/fs/afs/main.c index 35de0c04729f..0b187ef3b5b7 100644 --- a/fs/afs/main.c +++ b/fs/afs/main.c @@ -14,6 +14,7 @@ #include <linux/init.h> #include <linux/completion.h> #include <linux/sched.h> +#include <linux/random.h> #include "internal.h" MODULE_DESCRIPTION("AFS Client File System"); diff --git a/fs/afs/rxrpc.c b/fs/afs/rxrpc.c index 14d04c848465..7b0d18900f50 100644 --- a/fs/afs/rxrpc.c +++ b/fs/afs/rxrpc.c @@ -16,7 +16,7 @@ #include "internal.h" #include "afs_cm.h" -static struct socket *afs_socket; /* my RxRPC socket */ +struct socket *afs_socket; /* my RxRPC socket */ static struct workqueue_struct *afs_async_calls; static atomic_t afs_outstanding_calls; static atomic_t afs_outstanding_skbs; @@ -207,7 +207,7 @@ static void afs_free_call(struct afs_call *call) static void afs_end_call_nofree(struct afs_call *call) { if (call->rxcall) { - rxrpc_kernel_end_call(call->rxcall); + rxrpc_kernel_end_call(afs_socket, call->rxcall); call->rxcall = NULL; } if (call->type->destructor) @@ -325,8 +325,8 @@ static int afs_send_pages(struct afs_call *call, struct msghdr *msg, * returns from sending the request */ if (first + loop >= last) call->state = AFS_CALL_AWAIT_REPLY; - ret = rxrpc_kernel_send_data(call->rxcall, msg, - to - offset); + ret = rxrpc_kernel_send_data(afs_socket, call->rxcall, + msg, to - offset); kunmap(pages[loop]); if (ret < 0) break; @@ -406,7 +406,8 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp, * request */ if (!call->send_pages) call->state = AFS_CALL_AWAIT_REPLY; - ret = rxrpc_kernel_send_data(rxcall, &msg, call->request_size); + ret = rxrpc_kernel_send_data(afs_socket, rxcall, + &msg, call->request_size); if (ret < 0) goto error_do_abort; @@ -421,7 +422,7 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp, return wait_mode->wait(call); error_do_abort: - rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT); + rxrpc_kernel_abort_call(afs_socket, rxcall, RX_USER_ABORT); while ((skb = skb_dequeue(&call->rx_queue))) afs_free_skb(skb); error_kill_call: @@ -509,7 +510,8 @@ static void afs_deliver_to_call(struct afs_call *call) if (call->state != AFS_CALL_AWAIT_REPLY) abort_code = RXGEN_SS_UNMARSHAL; do_abort: - rxrpc_kernel_abort_call(call->rxcall, + rxrpc_kernel_abort_call(afs_socket, + call->rxcall, abort_code); call->error = ret; call->state = AFS_CALL_ERROR; @@ -605,7 +607,7 @@ static int afs_wait_for_call_to_complete(struct afs_call *call) /* kill the call */ if (call->state < AFS_CALL_COMPLETE) { _debug("call incomplete"); - rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD); + rxrpc_kernel_abort_call(afs_socket, call->rxcall, RX_CALL_DEAD); while ((skb = skb_dequeue(&call->rx_queue))) afs_free_skb(skb); } @@ -823,14 +825,15 @@ void afs_send_empty_reply(struct afs_call *call) msg.msg_flags = 0; call->state = AFS_CALL_AWAIT_ACK; - switch (rxrpc_kernel_send_data(call->rxcall, &msg, 0)) { + switch (rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, 0)) { case 0: _leave(" [replied]"); return; case -ENOMEM: _debug("oom"); - rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT); + rxrpc_kernel_abort_call(afs_socket, call->rxcall, + RX_USER_ABORT); default: afs_end_call(call); _leave(" [error]"); @@ -859,7 +862,7 @@ void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len) msg.msg_flags = 0; call->state = AFS_CALL_AWAIT_ACK; - n = rxrpc_kernel_send_data(call->rxcall, &msg, len); + n = rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, len); if (n >= 0) { /* Success */ _leave(" [replied]"); @@ -868,7 +871,8 @@ void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len) if (n == -ENOMEM) { _debug("oom"); - rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT); + rxrpc_kernel_abort_call(afs_socket, call->rxcall, + RX_USER_ABORT); } afs_end_call(call); _leave(" [error]"); diff --git a/fs/afs/server.c b/fs/afs/server.c index f342acf3547d..d4066ab7dd55 100644 --- a/fs/afs/server.c +++ b/fs/afs/server.c @@ -178,13 +178,18 @@ server_in_two_cells: /* * look up a server by its IP address */ -struct afs_server *afs_find_server(const struct in_addr *_addr) +struct afs_server *afs_find_server(const struct sockaddr_rxrpc *srx) { struct afs_server *server = NULL; struct rb_node *p; - struct in_addr addr = *_addr; + struct in_addr addr = srx->transport.sin.sin_addr; - _enter("%pI4", &addr.s_addr); + _enter("{%d,%pI4}", srx->transport.family, &addr.s_addr); + + if (srx->transport.family != AF_INET) { + WARN(true, "AFS does not yes support non-IPv4 addresses\n"); + return NULL; + } read_lock(&afs_servers_lock); diff --git a/include/net/af_rxrpc.h b/include/net/af_rxrpc.h index 7b0f88699b25..f8d8079dc058 100644 --- a/include/net/af_rxrpc.h +++ b/include/net/af_rxrpc.h @@ -15,6 +15,9 @@ #include <linux/skbuff.h> #include <linux/rxrpc.h> +struct key; +struct sock; +struct socket; struct rxrpc_call; /* @@ -39,15 +42,18 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *, struct key *, unsigned long, gfp_t); -int rxrpc_kernel_send_data(struct rxrpc_call *, struct msghdr *, size_t); +int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *, + struct msghdr *, size_t); void rxrpc_kernel_data_consumed(struct rxrpc_call *, struct sk_buff *); -void rxrpc_kernel_abort_call(struct rxrpc_call *, u32); -void rxrpc_kernel_end_call(struct rxrpc_call *); +void rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *, u32); +void rxrpc_kernel_end_call(struct socket *, struct rxrpc_call *); bool rxrpc_kernel_is_data_last(struct sk_buff *); u32 rxrpc_kernel_get_abort_code(struct sk_buff *); int rxrpc_kernel_get_error_number(struct sk_buff *); void rxrpc_kernel_free_skb(struct sk_buff *); struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *, unsigned long); int rxrpc_kernel_reject_call(struct socket *); +void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *, + struct sockaddr_rxrpc *); #endif /* _NET_RXRPC_H */ diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h index 15283ee3e41a..cbe574ea674b 100644 --- a/include/trace/events/rxrpc.h +++ b/include/trace/events/rxrpc.h @@ -16,6 +16,45 @@ #include <linux/tracepoint.h> +TRACE_EVENT(rxrpc_call, + TP_PROTO(struct rxrpc_call *call, int op, int usage, int nskb, + const void *where, const void *aux), + + TP_ARGS(call, op, usage, nskb, where, aux), + + TP_STRUCT__entry( + __field(struct rxrpc_call *, call ) + __field(int, op ) + __field(int, usage ) + __field(int, nskb ) + __field(const void *, where ) + __field(const void *, aux ) + ), + + TP_fast_assign( + __entry->call = call; + __entry->op = op; + __entry->usage = usage; + __entry->nskb = nskb; + __entry->where = where; + __entry->aux = aux; + ), + + TP_printk("c=%p %s u=%d s=%d p=%pSR a=%p", + __entry->call, + (__entry->op == 0 ? "NWc" : + __entry->op == 1 ? "NWs" : + __entry->op == 2 ? "SEE" : + __entry->op == 3 ? "GET" : + __entry->op == 4 ? "Gsb" : + __entry->op == 5 ? "PUT" : + "Psb"), + __entry->usage, + __entry->nskb, + __entry->where, + __entry->aux) + ); + TRACE_EVENT(rxrpc_skb, TP_PROTO(struct sk_buff *skb, int op, int usage, int mod_count, const void *where), diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c index c7cf356b42b8..e07c91acd904 100644 --- a/net/rxrpc/af_rxrpc.c +++ b/net/rxrpc/af_rxrpc.c @@ -279,15 +279,16 @@ EXPORT_SYMBOL(rxrpc_kernel_begin_call); /** * rxrpc_kernel_end_call - Allow a kernel service to end a call it was using + * @sock: The socket the call is on * @call: The call to end * * Allow a kernel service to end a call it was using. The call must be * complete before this is called (the call should be aborted if necessary). */ -void rxrpc_kernel_end_call(struct rxrpc_call *call) +void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call) { _enter("%d{%d}", call->debug_id, atomic_read(&call->usage)); - rxrpc_remove_user_ID(call->socket, call); + rxrpc_remove_user_ID(rxrpc_sk(sock->sk), call); rxrpc_put_call(call); } EXPORT_SYMBOL(rxrpc_kernel_end_call); diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h index c761124961cc..0c320b2b7b43 100644 --- a/net/rxrpc/ar-internal.h +++ b/net/rxrpc/ar-internal.h @@ -289,8 +289,6 @@ enum rxrpc_conn_proto_state { RXRPC_CONN_SERVICE, /* Service secured connection */ RXRPC_CONN_REMOTELY_ABORTED, /* Conn aborted by peer */ RXRPC_CONN_LOCALLY_ABORTED, /* Conn aborted locally */ - RXRPC_CONN_NETWORK_ERROR, /* Conn terminated by network error */ - RXRPC_CONN_LOCAL_ERROR, /* Conn terminated by local error */ RXRPC_CONN__NR_STATES }; @@ -344,7 +342,6 @@ struct rxrpc_connection { enum rxrpc_conn_proto_state state : 8; /* current state of connection */ u32 local_abort; /* local abort code */ u32 remote_abort; /* remote abort code */ - int error; /* local error incurred */ int debug_id; /* debug ID for printks */ atomic_t serial; /* packet serial number counter */ unsigned int hi_serial; /* highest serial number received */ @@ -411,13 +408,22 @@ enum rxrpc_call_state { RXRPC_CALL_SERVER_ACK_REQUEST, /* - server pending ACK of request */ RXRPC_CALL_SERVER_SEND_REPLY, /* - server sending reply */ RXRPC_CALL_SERVER_AWAIT_ACK, /* - server awaiting final ACK */ - RXRPC_CALL_COMPLETE, /* - call completed */ + RXRPC_CALL_COMPLETE, /* - call complete */ + RXRPC_CALL_DEAD, /* - call is dead */ + NR__RXRPC_CALL_STATES +}; + +/* + * Call completion condition (state == RXRPC_CALL_COMPLETE). + */ +enum rxrpc_call_completion { + RXRPC_CALL_SUCCEEDED, /* - Normal termination */ RXRPC_CALL_SERVER_BUSY, /* - call rejected by busy server */ RXRPC_CALL_REMOTELY_ABORTED, /* - call aborted by peer */ RXRPC_CALL_LOCALLY_ABORTED, /* - call aborted locally on error or close */ + RXRPC_CALL_LOCAL_ERROR, /* - call failed due to local error */ RXRPC_CALL_NETWORK_ERROR, /* - call terminated by network error */ - RXRPC_CALL_DEAD, /* - call is dead */ - NR__RXRPC_CALL_STATES + NR__RXRPC_CALL_COMPLETIONS }; /* @@ -451,14 +457,13 @@ struct rxrpc_call { unsigned long events; spinlock_t lock; rwlock_t state_lock; /* lock for state transition */ + u32 abort_code; /* Local/remote abort code */ + int error; /* Local error incurred */ + enum rxrpc_call_state state : 8; /* current state of call */ + enum rxrpc_call_completion completion : 8; /* Call completion condition */ atomic_t usage; atomic_t skb_count; /* Outstanding packets on this call */ atomic_t sequence; /* Tx data packet sequence counter */ - u32 local_abort; /* local abort code */ - u32 remote_abort; /* remote abort code */ - int error_report; /* Network error (ICMP/local transport) */ - int error; /* Local error incurred */ - enum rxrpc_call_state state : 8; /* current state of call */ u16 service_id; /* service ID */ u32 call_id; /* call ID on connection */ u32 cid; /* connection ID plus channel index */ @@ -493,20 +498,6 @@ struct rxrpc_call { unsigned long ackr_window[RXRPC_ACKR_WINDOW_ASZ + 1]; }; -/* - * locally abort an RxRPC call - */ -static inline void rxrpc_abort_call(struct rxrpc_call *call, u32 abort_code) -{ - write_lock_bh(&call->state_lock); - if (call->state < RXRPC_CALL_COMPLETE) { - call->local_abort = abort_code; - call->state = RXRPC_CALL_LOCALLY_ABORTED; - set_bit(RXRPC_CALL_EV_ABORT, &call->events); - } - write_unlock_bh(&call->state_lock); -} - #include <trace/events/rxrpc.h> /* @@ -534,6 +525,8 @@ void rxrpc_process_call(struct work_struct *); /* * call_object.c */ +extern const char *const rxrpc_call_states[]; +extern const char *const rxrpc_call_completions[]; extern unsigned int rxrpc_max_call_lifetime; extern unsigned int rxrpc_dead_call_expiry; extern struct kmem_cache *rxrpc_call_jar; @@ -550,7 +543,11 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *, struct sk_buff *); void rxrpc_release_call(struct rxrpc_call *); void rxrpc_release_calls_on_socket(struct rxrpc_sock *); -void __rxrpc_put_call(struct rxrpc_call *); +void rxrpc_see_call(struct rxrpc_call *); +void rxrpc_get_call(struct rxrpc_call *); +void rxrpc_put_call(struct rxrpc_call *); +void rxrpc_get_call_for_skb(struct rxrpc_call *, struct sk_buff *); +void rxrpc_put_call_for_skb(struct rxrpc_call *, struct sk_buff *); void __exit rxrpc_destroy_all_calls(void); static inline bool rxrpc_is_service_call(const struct rxrpc_call *call) @@ -564,6 +561,78 @@ static inline bool rxrpc_is_client_call(const struct rxrpc_call *call) } /* + * Transition a call to the complete state. + */ +static inline bool __rxrpc_set_call_completion(struct rxrpc_call *call, + enum rxrpc_call_completion compl, + u32 abort_code, + int error) +{ + if (call->state < RXRPC_CALL_COMPLETE) { + call->abort_code = abort_code; + call->error = error; + call->completion = compl, + call->state = RXRPC_CALL_COMPLETE; + return true; + } + return false; +} + +static inline bool rxrpc_set_call_completion(struct rxrpc_call *call, + enum rxrpc_call_completion compl, + u32 abort_code, + int error) +{ + int ret; + + write_lock_bh(&call->state_lock); + ret = __rxrpc_set_call_completion(call, compl, abort_code, error); + write_unlock_bh(&call->state_lock); + return ret; +} + +/* + * Record that a call successfully completed. + */ +static inline void __rxrpc_call_completed(struct rxrpc_call *call) +{ + __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0); +} + +static inline void rxrpc_call_completed(struct rxrpc_call *call) +{ + write_lock_bh(&call->state_lock); + __rxrpc_call_completed(call); + write_unlock_bh(&call->state_lock); +} + +/* + * Record that a call is locally aborted. + */ +static inline bool __rxrpc_abort_call(struct rxrpc_call *call, + u32 abort_code, int error) +{ + if (__rxrpc_set_call_completion(call, + RXRPC_CALL_LOCALLY_ABORTED, + abort_code, error)) { + set_bit(RXRPC_CALL_EV_ABORT, &call->events); + return true; + } + return false; +} + +static inline bool rxrpc_abort_call(struct rxrpc_call *call, + u32 abort_code, int error) +{ + bool ret; + + write_lock_bh(&call->state_lock); + ret = __rxrpc_abort_call(call, abort_code, error); + write_unlock_bh(&call->state_lock); + return ret; +} + +/* * conn_client.c */ extern unsigned int rxrpc_max_client_connections; @@ -778,7 +847,6 @@ static inline void rxrpc_put_peer(struct rxrpc_peer *peer) /* * proc.c */ -extern const char *const rxrpc_call_states[]; extern const struct file_operations rxrpc_call_seq_fops; extern const struct file_operations rxrpc_connection_seq_fops; @@ -958,16 +1026,3 @@ do { \ } while (0) #endif /* __KDEBUGALL */ - - -#define rxrpc_get_call(CALL) \ -do { \ - CHECK_SLAB_OKAY(&(CALL)->usage); \ - if (atomic_inc_return(&(CALL)->usage) == 1) \ - BUG(); \ -} while (0) - -#define rxrpc_put_call(CALL) \ -do { \ - __rxrpc_put_call(CALL); \ -} while (0) diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c index 669ac79d3b44..03af88fe798b 100644 --- a/net/rxrpc/call_accept.c +++ b/net/rxrpc/call_accept.c @@ -129,8 +129,7 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local, _debug("conn ready"); call->state = RXRPC_CALL_SERVER_ACCEPTING; list_add_tail(&call->accept_link, &rx->acceptq); - rxrpc_get_call(call); - atomic_inc(&call->skb_count); + rxrpc_get_call_for_skb(call, notification); nsp = rxrpc_skb(notification); nsp->call = call; @@ -323,18 +322,15 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx, call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link); list_del_init(&call->accept_link); sk_acceptq_removed(&rx->sk); + rxrpc_see_call(call); write_lock_bh(&call->state_lock); switch (call->state) { case RXRPC_CALL_SERVER_ACCEPTING: call->state = RXRPC_CALL_SERVER_RECV_REQUEST; break; - case RXRPC_CALL_REMOTELY_ABORTED: - case RXRPC_CALL_LOCALLY_ABORTED: - ret = -ECONNABORTED; - goto out_release; - case RXRPC_CALL_NETWORK_ERROR: - ret = call->conn->error; + case RXRPC_CALL_COMPLETE: + ret = call->error; goto out_release; case RXRPC_CALL_DEAD: ret = -ETIME; @@ -399,21 +395,19 @@ int rxrpc_reject_call(struct rxrpc_sock *rx) call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link); list_del_init(&call->accept_link); sk_acceptq_removed(&rx->sk); + rxrpc_see_call(call); write_lock_bh(&call->state_lock); switch (call->state) { case RXRPC_CALL_SERVER_ACCEPTING: - call->state = RXRPC_CALL_SERVER_BUSY; + __rxrpc_set_call_completion(call, RXRPC_CALL_SERVER_BUSY, + 0, ECONNABORTED); if (test_and_set_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events)) rxrpc_queue_call(call); ret = 0; goto out_release; - case RXRPC_CALL_REMOTELY_ABORTED: - case RXRPC_CALL_LOCALLY_ABORTED: - ret = -ECONNABORTED; - goto out_release; - case RXRPC_CALL_NETWORK_ERROR: - ret = call->conn->error; + case RXRPC_CALL_COMPLETE: + ret = call->error; goto out_release; case RXRPC_CALL_DEAD: ret = -ETIME; diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c index 5292bcfd8816..de72de662044 100644 --- a/net/rxrpc/call_event.c +++ b/net/rxrpc/call_event.c @@ -95,7 +95,7 @@ cancel_timer: _debug("cancel timer %%%u", serial); try_to_del_timer_sync(&call->ack_timer); read_lock_bh(&call->state_lock); - if (call->state <= RXRPC_CALL_COMPLETE && + if (call->state < RXRPC_CALL_COMPLETE && !test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events)) rxrpc_queue_call(call); read_unlock_bh(&call->state_lock); @@ -123,7 +123,7 @@ static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend, unsigned long resend_at) { read_lock_bh(&call->state_lock); - if (call->state >= RXRPC_CALL_COMPLETE) + if (call->state == RXRPC_CALL_COMPLETE) resend = 0; if (resend & 1) { @@ -230,7 +230,7 @@ static void rxrpc_resend_timer(struct rxrpc_call *call) _enter("%d,%d,%d", call->acks_tail, call->acks_unacked, call->acks_head); - if (call->state >= RXRPC_CALL_COMPLETE) + if (call->state == RXRPC_CALL_COMPLETE) return; resend = 0; @@ -465,8 +465,7 @@ static void rxrpc_insert_oos_packet(struct rxrpc_call *call, skb->destructor = rxrpc_packet_destructor; ASSERTCMP(sp->call, ==, NULL); sp->call = call; - rxrpc_get_call(call); - atomic_inc(&call->skb_count); + rxrpc_get_call_for_skb(call, skb); /* insert into the buffer in sequence order */ spin_lock_bh(&call->lock); @@ -552,7 +551,7 @@ static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb, mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU)); - peer = call->conn->params.peer; + peer = call->peer; if (mtu < peer->maxdata) { spin_lock_bh(&peer->lock); peer->maxdata = mtu; @@ -711,7 +710,7 @@ all_acked: break; case RXRPC_CALL_SERVER_AWAIT_ACK: _debug("srv complete"); - call->state = RXRPC_CALL_COMPLETE; + __rxrpc_call_completed(call); post_ACK = true; break; case RXRPC_CALL_CLIENT_SEND_REQUEST: @@ -741,8 +740,7 @@ all_acked: _debug("post ACK"); skb->mark = RXRPC_SKB_MARK_FINAL_ACK; sp->call = call; - rxrpc_get_call(call); - atomic_inc(&call->skb_count); + rxrpc_get_call_for_skb(call, skb); spin_lock_bh(&call->lock); if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0) BUG(); @@ -801,8 +799,7 @@ static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error, memset(sp, 0, sizeof(*sp)); sp->error = error; sp->call = call; - rxrpc_get_call(call); - atomic_inc(&call->skb_count); + rxrpc_get_call_for_skb(call, skb); spin_lock_bh(&call->lock); ret = rxrpc_queue_rcv_skb(call, skb, true, fatal); @@ -834,6 +831,8 @@ void rxrpc_process_call(struct work_struct *work) u32 serial, abort_code = RX_PROTOCOL_ERROR; u8 *acks = NULL; + rxrpc_see_call(call); + //printk("\n--------------------\n"); _enter("{%d,%s,%lx} [%lu]", call->debug_id, rxrpc_call_states[call->state], call->events, @@ -844,8 +843,8 @@ void rxrpc_process_call(struct work_struct *work) /* there's a good chance we're going to have to send a message, so set * one up in advance */ - msg.msg_name = &call->conn->params.peer->srx.transport; - msg.msg_namelen = call->conn->params.peer->srx.transport_len; + msg.msg_name = &call->peer->srx.transport; + msg.msg_namelen = call->peer->srx.transport_len; msg.msg_control = NULL; msg.msg_controllen = 0; msg.msg_flags = 0; @@ -875,24 +874,22 @@ skip_msg_init: clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events); clear_bit(RXRPC_CALL_EV_ABORT, &call->events); - error = call->error_report; - if (error < RXRPC_LOCAL_ERROR_OFFSET) { + if (call->completion == RXRPC_CALL_NETWORK_ERROR) { mark = RXRPC_SKB_MARK_NET_ERROR; _debug("post net error %d", error); } else { mark = RXRPC_SKB_MARK_LOCAL_ERROR; - error -= RXRPC_LOCAL_ERROR_OFFSET; _debug("post net local error %d", error); } - if (rxrpc_post_message(call, mark, error, true) < 0) + if (rxrpc_post_message(call, mark, call->error, true) < 0) goto no_mem; clear_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events); goto kill_ACKs; } if (test_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events)) { - ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE); + ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE); clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events); clear_bit(RXRPC_CALL_EV_ABORT, &call->events); @@ -900,7 +897,7 @@ skip_msg_init: _debug("post conn abort"); if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR, - call->conn->error, true) < 0) + call->error, true) < 0) goto no_mem; clear_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events); goto kill_ACKs; @@ -913,13 +910,13 @@ skip_msg_init: } if (test_bit(RXRPC_CALL_EV_ABORT, &call->events)) { - ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE); + ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE); if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR, - ECONNABORTED, true) < 0) + call->error, true) < 0) goto no_mem; whdr.type = RXRPC_PACKET_TYPE_ABORT; - data = htonl(call->local_abort); + data = htonl(call->abort_code); iov[1].iov_base = &data; iov[1].iov_len = sizeof(data); genbit = RXRPC_CALL_EV_ABORT; @@ -979,13 +976,7 @@ skip_msg_init: } if (test_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events)) { - write_lock_bh(&call->state_lock); - if (call->state <= RXRPC_CALL_COMPLETE) { - call->state = RXRPC_CALL_LOCALLY_ABORTED; - call->local_abort = RX_CALL_TIMEOUT; - set_bit(RXRPC_CALL_EV_ABORT, &call->events); - } - write_unlock_bh(&call->state_lock); + rxrpc_abort_call(call, RX_CALL_TIMEOUT, ETIME); _debug("post timeout"); if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR, @@ -998,7 +989,8 @@ skip_msg_init: /* deal with assorted inbound messages */ if (!skb_queue_empty(&call->rx_queue)) { - switch (rxrpc_process_rx_queue(call, &abort_code)) { + ret = rxrpc_process_rx_queue(call, &abort_code); + switch (ret) { case 0: case -EAGAIN: break; @@ -1007,7 +999,7 @@ skip_msg_init: case -EKEYEXPIRED: case -EKEYREJECTED: case -EPROTO: - rxrpc_abort_call(call, abort_code); + rxrpc_abort_call(call, abort_code, -ret); goto kill_ACKs; } } @@ -1159,8 +1151,8 @@ skip_msg_init: send_ACK_with_skew: ack.maxSkew = htons(call->ackr_skew); send_ACK: - mtu = call->conn->params.peer->if_mtu; - mtu -= call->conn->params.peer->hdrsize; + mtu = call->peer->if_mtu; + mtu -= call->peer->hdrsize; ackinfo.maxMTU = htonl(mtu); ackinfo.rwind = htonl(rxrpc_rx_window_size); @@ -1232,10 +1224,7 @@ send_message_2: goto kill_ACKs; case RXRPC_CALL_EV_ACK_FINAL: - write_lock_bh(&call->state_lock); - if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK) - call->state = RXRPC_CALL_COMPLETE; - write_unlock_bh(&call->state_lock); + rxrpc_call_completed(call); goto kill_ACKs; default: diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c index e7cbcc4a87cf..104ee8b1de06 100644 --- a/net/rxrpc/call_object.c +++ b/net/rxrpc/call_object.c @@ -30,7 +30,7 @@ unsigned int rxrpc_max_call_lifetime = 60 * HZ; unsigned int rxrpc_dead_call_expiry = 2 * HZ; const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = { - [RXRPC_CALL_UNINITIALISED] = "Uninit", + [RXRPC_CALL_UNINITIALISED] = "Uninit ", [RXRPC_CALL_CLIENT_AWAIT_CONN] = "ClWtConn", [RXRPC_CALL_CLIENT_SEND_REQUEST] = "ClSndReq", [RXRPC_CALL_CLIENT_AWAIT_REPLY] = "ClAwtRpl", @@ -43,11 +43,16 @@ const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = { [RXRPC_CALL_SERVER_SEND_REPLY] = "SvSndRpl", [RXRPC_CALL_SERVER_AWAIT_ACK] = "SvAwtACK", [RXRPC_CALL_COMPLETE] = "Complete", + [RXRPC_CALL_DEAD] = "Dead ", +}; + +const char *const rxrpc_call_completions[NR__RXRPC_CALL_COMPLETIONS] = { + [RXRPC_CALL_SUCCEEDED] = "Complete", [RXRPC_CALL_SERVER_BUSY] = "SvBusy ", [RXRPC_CALL_REMOTELY_ABORTED] = "RmtAbort", [RXRPC_CALL_LOCALLY_ABORTED] = "LocAbort", + [RXRPC_CALL_LOCAL_ERROR] = "LocError", [RXRPC_CALL_NETWORK_ERROR] = "NetError", - [RXRPC_CALL_DEAD] = "Dead ", }; struct kmem_cache *rxrpc_call_jar; @@ -214,6 +219,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx, { struct rxrpc_call *call, *xcall; struct rb_node *parent, **pp; + const void *here = __builtin_return_address(0); int ret; _enter("%p,%lx", rx, user_call_ID); @@ -224,6 +230,9 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx, return call; } + trace_rxrpc_call(call, 0, atomic_read(&call->usage), 0, here, + (const void *)user_call_ID); + /* Publish the call, even though it is incompletely set up as yet */ call->user_call_ID = user_call_ID; __set_bit(RXRPC_CALL_HAS_USERID, &call->flags); @@ -303,6 +312,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx, { struct rxrpc_skb_priv *sp = rxrpc_skb(skb); struct rxrpc_call *call, *candidate; + const void *here = __builtin_return_address(0); u32 call_id, chan; _enter(",%d", conn->debug_id); @@ -313,6 +323,9 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx, if (!candidate) return ERR_PTR(-EBUSY); + trace_rxrpc_call(candidate, 1, atomic_read(&candidate->usage), + 0, here, NULL); + chan = sp->hdr.cid & RXRPC_CHANNELMASK; candidate->socket = rx; candidate->conn = conn; @@ -358,7 +371,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx, _debug("CALL: %u { %s }", call->debug_id, rxrpc_call_states[call->state]); - if (call->state >= RXRPC_CALL_COMPLETE) { + if (call->state == RXRPC_CALL_COMPLETE) { __rxrpc_disconnect_call(conn, call); } else { spin_unlock(&conn->channel_lock); @@ -426,6 +439,44 @@ old_call: } /* + * Note the re-emergence of a call. + */ +void rxrpc_see_call(struct rxrpc_call *call) +{ + const void *here = __builtin_return_address(0); + if (call) { + int n = atomic_read(&call->usage); + int m = atomic_read(&call->skb_count); + + trace_rxrpc_call(call, 2, n, m, here, 0); + } +} + +/* + * Note the addition of a ref on a call. + */ +void rxrpc_get_call(struct rxrpc_call *call) +{ + const void *here = __builtin_return_address(0); + int n = atomic_inc_return(&call->usage); + int m = atomic_read(&call->skb_count); + + trace_rxrpc_call(call, 3, n, m, here, 0); +} + +/* + * Note the addition of a ref on a call for a socket buffer. + */ +void rxrpc_get_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb) +{ + const void *here = __builtin_return_address(0); + int n = atomic_inc_return(&call->usage); + int m = atomic_inc_return(&call->skb_count); + + trace_rxrpc_call(call, 4, n, m, here, skb); +} + +/* * detach a call from a socket and set up for release */ void rxrpc_release_call(struct rxrpc_call *call) @@ -438,6 +489,8 @@ void rxrpc_release_call(struct rxrpc_call *call) atomic_read(&call->ackr_not_idle), call->rx_first_oos); + rxrpc_see_call(call); + spin_lock_bh(&call->lock); if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags)) BUG(); @@ -472,8 +525,7 @@ void rxrpc_release_call(struct rxrpc_call *call) if (call->state < RXRPC_CALL_COMPLETE && call->state != RXRPC_CALL_CLIENT_FINAL_ACK) { _debug("+++ ABORTING STATE %d +++\n", call->state); - call->state = RXRPC_CALL_LOCALLY_ABORTED; - call->local_abort = RX_CALL_DEAD; + __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET); } write_unlock_bh(&call->state_lock); @@ -522,6 +574,7 @@ static void rxrpc_dead_call_expired(unsigned long _call) _enter("{%d}", call->debug_id); + rxrpc_see_call(call); write_lock_bh(&call->state_lock); call->state = RXRPC_CALL_DEAD; write_unlock_bh(&call->state_lock); @@ -536,22 +589,16 @@ static void rxrpc_mark_call_released(struct rxrpc_call *call) { bool sched; + rxrpc_see_call(call); write_lock(&call->state_lock); if (call->state < RXRPC_CALL_DEAD) { - sched = false; - if (call->state < RXRPC_CALL_COMPLETE) { - _debug("abort call %p", call); - call->state = RXRPC_CALL_LOCALLY_ABORTED; - call->local_abort = RX_CALL_DEAD; - if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events)) - sched = true; - } + sched = __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET); if (!test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events)) sched = true; - if (sched) - rxrpc_queue_call(call); } write_unlock(&call->state_lock); + if (sched) + rxrpc_queue_call(call); } /* @@ -588,21 +635,43 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx) /* * release a call */ -void __rxrpc_put_call(struct rxrpc_call *call) +void rxrpc_put_call(struct rxrpc_call *call) { - ASSERT(call != NULL); + const void *here = __builtin_return_address(0); + int n, m; - _enter("%p{u=%d}", call, atomic_read(&call->usage)); + ASSERT(call != NULL); - ASSERTCMP(atomic_read(&call->usage), >, 0); + n = atomic_dec_return(&call->usage); + m = atomic_read(&call->skb_count); + trace_rxrpc_call(call, 5, n, m, here, NULL); + ASSERTCMP(n, >=, 0); + if (n == 0) { + _debug("call %d dead", call->debug_id); + WARN_ON(m != 0); + ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD); + rxrpc_queue_work(&call->destroyer); + } +} - if (atomic_dec_and_test(&call->usage)) { +/* + * Release a call ref held by a socket buffer. + */ +void rxrpc_put_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb) +{ + const void *here = __builtin_return_address(0); + int n, m; + + n = atomic_dec_return(&call->usage); + m = atomic_dec_return(&call->skb_count); + trace_rxrpc_call(call, 6, n, m, here, skb); + ASSERTCMP(n, >=, 0); + if (n == 0) { _debug("call %d dead", call->debug_id); - WARN_ON(atomic_read(&call->skb_count) != 0); + WARN_ON(m != 0); ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD); rxrpc_queue_work(&call->destroyer); } - _leave(""); } /* @@ -708,6 +777,7 @@ void __exit rxrpc_destroy_all_calls(void) call = list_entry(rxrpc_calls.next, struct rxrpc_call, link); _debug("Zapping call %p", call); + rxrpc_see_call(call); list_del_init(&call->link); switch (atomic_read(&call->usage)) { @@ -749,16 +819,14 @@ static void rxrpc_call_life_expired(unsigned long _call) { struct rxrpc_call *call = (struct rxrpc_call *) _call; + _enter("{%d}", call->debug_id); + + rxrpc_see_call(call); if (call->state >= RXRPC_CALL_COMPLETE) return; - _enter("{%d}", call->debug_id); - read_lock_bh(&call->state_lock); - if (call->state < RXRPC_CALL_COMPLETE) { - set_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events); - rxrpc_queue_call(call); - } - read_unlock_bh(&call->state_lock); + set_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events); + rxrpc_queue_call(call); } /* @@ -771,6 +839,7 @@ static void rxrpc_resend_time_expired(unsigned long _call) _enter("{%d}", call->debug_id); + rxrpc_see_call(call); if (call->state >= RXRPC_CALL_COMPLETE) return; @@ -788,12 +857,10 @@ static void rxrpc_ack_time_expired(unsigned long _call) _enter("{%d}", call->debug_id); + rxrpc_see_call(call); if (call->state >= RXRPC_CALL_COMPLETE) return; - read_lock_bh(&call->state_lock); - if (call->state < RXRPC_CALL_COMPLETE && - !test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events)) + if (!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events)) rxrpc_queue_call(call); - read_unlock_bh(&call->state_lock); } diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c index 349402b08e5a..4b213bc0f554 100644 --- a/net/rxrpc/conn_client.c +++ b/net/rxrpc/conn_client.c @@ -537,6 +537,7 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn, struct rxrpc_call, chan_wait_link); u32 call_id = chan->call_counter + 1; + rxrpc_see_call(call); list_del_init(&call->chan_wait_link); conn->active_chans |= 1 << channel; call->peer = rxrpc_get_peer(conn->params.peer); @@ -741,7 +742,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_call *call) * terminal retransmission without requiring access to the call. */ if (test_bit(RXRPC_CALL_EXPOSED, &call->flags)) { - _debug("exposed %u,%u", call->call_id, call->local_abort); + _debug("exposed %u,%u", call->call_id, call->abort_code); __rxrpc_disconnect_call(conn, call); } diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c index 6296374df840..bc9b05938ff5 100644 --- a/net/rxrpc/conn_event.c +++ b/net/rxrpc/conn_event.c @@ -27,8 +27,8 @@ /* * Retransmit terminal ACK or ABORT of the previous call. */ -static void rxrpc_conn_retransmit(struct rxrpc_connection *conn, - struct sk_buff *skb) +static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, + struct sk_buff *skb) { struct rxrpc_skb_priv *sp = rxrpc_skb(skb); struct rxrpc_channel *chan; @@ -135,33 +135,39 @@ static void rxrpc_conn_retransmit(struct rxrpc_connection *conn, /* * pass a connection-level abort onto all calls on that connection */ -static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state, - u32 abort_code) +static void rxrpc_abort_calls(struct rxrpc_connection *conn, + enum rxrpc_call_completion compl, + u32 abort_code, int error) { struct rxrpc_call *call; - int i; + bool queue; + int i, bit; _enter("{%d},%x", conn->debug_id, abort_code); + if (compl == RXRPC_CALL_LOCALLY_ABORTED) + bit = RXRPC_CALL_EV_CONN_ABORT; + else + bit = RXRPC_CALL_EV_RCVD_ABORT; + spin_lock(&conn->channel_lock); for (i = 0; i < RXRPC_MAXCALLS; i++) { call = rcu_dereference_protected( conn->channels[i].call, lockdep_is_held(&conn->channel_lock)); - write_lock_bh(&call->state_lock); - if (call->state <= RXRPC_CALL_COMPLETE) { - call->state = state; - if (state == RXRPC_CALL_LOCALLY_ABORTED) { - call->local_abort = conn->local_abort; - set_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events); - } else { - call->remote_abort = conn->remote_abort; - set_bit(RXRPC_CALL_EV_RCVD_ABORT, &call->events); + if (call) { + rxrpc_see_call(call); + write_lock_bh(&call->state_lock); + if (rxrpc_set_call_completion(call, compl, abort_code, + error)) { + set_bit(bit, &call->events); + queue = true; } - rxrpc_queue_call(call); + write_unlock_bh(&call->state_lock); + if (queue) + rxrpc_queue_call(call); } - write_unlock_bh(&call->state_lock); } spin_unlock(&conn->channel_lock); @@ -186,17 +192,16 @@ static int rxrpc_abort_connection(struct rxrpc_connection *conn, /* generate a connection-level abort */ spin_lock_bh(&conn->state_lock); - if (conn->state < RXRPC_CONN_REMOTELY_ABORTED) { - conn->state = RXRPC_CONN_LOCALLY_ABORTED; - conn->error = error; - spin_unlock_bh(&conn->state_lock); - } else { + if (conn->state >= RXRPC_CONN_REMOTELY_ABORTED) { spin_unlock_bh(&conn->state_lock); _leave(" = 0 [already dead]"); return 0; } - rxrpc_abort_calls(conn, RXRPC_CALL_LOCALLY_ABORTED, abort_code); + conn->state = RXRPC_CONN_LOCALLY_ABORTED; + spin_unlock_bh(&conn->state_lock); + + rxrpc_abort_calls(conn, RXRPC_CALL_LOCALLY_ABORTED, abort_code, error); msg.msg_name = &conn->params.peer->srx.transport; msg.msg_namelen = conn->params.peer->srx.transport_len; @@ -276,7 +281,7 @@ static int rxrpc_process_event(struct rxrpc_connection *conn, switch (sp->hdr.type) { case RXRPC_PACKET_TYPE_DATA: case RXRPC_PACKET_TYPE_ACK: - rxrpc_conn_retransmit(conn, skb); + rxrpc_conn_retransmit_call(conn, skb); rxrpc_free_skb(skb); return 0; @@ -287,7 +292,7 @@ static int rxrpc_process_event(struct rxrpc_connection *conn, _proto("Rx ABORT %%%u { ac=%d }", sp->hdr.serial, abort_code); conn->state = RXRPC_CONN_REMOTELY_ABORTED; - rxrpc_abort_calls(conn, RXRPC_CALL_REMOTELY_ABORTED, + rxrpc_abort_calls(conn, 0, RXRPC_CALL_REMOTELY_ABORTED, abort_code); return -ECONNABORTED; diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c index 5b45b6c367e7..9c6685b97e70 100644 --- a/net/rxrpc/conn_object.c +++ b/net/rxrpc/conn_object.c @@ -165,8 +165,8 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn, * through the channel, whilst disposing of the actual call record. */ chan->last_service_id = call->service_id; - if (call->local_abort) { - chan->last_abort = call->local_abort; + if (call->abort_code) { + chan->last_abort = call->abort_code; chan->last_type = RXRPC_PACKET_TYPE_ABORT; } else { chan->last_seq = call->rx_data_eaten; diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c index 5e683dd21ab9..86bea9ad6c3d 100644 --- a/net/rxrpc/input.c +++ b/net/rxrpc/input.c @@ -196,8 +196,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call, goto enqueue_packet; sp->call = call; - rxrpc_get_call(call); - atomic_inc(&call->skb_count); + rxrpc_get_call_for_skb(call, skb); terminal = ((flags & RXRPC_LAST_PACKET) && !(flags & RXRPC_CLIENT_INITIATED)); ret = rxrpc_queue_rcv_skb(call, skb, false, terminal); @@ -341,14 +340,13 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb) abort_code = ntohl(wtmp); _proto("Rx ABORT %%%u { %x }", sp->hdr.serial, abort_code); - write_lock_bh(&call->state_lock); - if (call->state < RXRPC_CALL_COMPLETE) { - call->state = RXRPC_CALL_REMOTELY_ABORTED; - call->remote_abort = abort_code; + if (__rxrpc_set_call_completion(call, + RXRPC_CALL_REMOTELY_ABORTED, + abort_code, ECONNABORTED)) { set_bit(RXRPC_CALL_EV_RCVD_ABORT, &call->events); rxrpc_queue_call(call); } - goto free_packet_unlock; + goto free_packet; case RXRPC_PACKET_TYPE_BUSY: _proto("Rx BUSY %%%u", sp->hdr.serial); @@ -359,7 +357,9 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb) write_lock_bh(&call->state_lock); switch (call->state) { case RXRPC_CALL_CLIENT_SEND_REQUEST: - call->state = RXRPC_CALL_SERVER_BUSY; + __rxrpc_set_call_completion(call, + RXRPC_CALL_SERVER_BUSY, + 0, EBUSY); set_bit(RXRPC_CALL_EV_RCVD_BUSY, &call->events); rxrpc_queue_call(call); case RXRPC_CALL_SERVER_BUSY: @@ -415,12 +415,8 @@ protocol_error: _debug("protocol error"); write_lock_bh(&call->state_lock); protocol_error_locked: - if (call->state <= RXRPC_CALL_COMPLETE) { - call->state = RXRPC_CALL_LOCALLY_ABORTED; - call->local_abort = RX_PROTOCOL_ERROR; - set_bit(RXRPC_CALL_EV_ABORT, &call->events); + if (__rxrpc_abort_call(call, RX_PROTOCOL_ERROR, EPROTO)) rxrpc_queue_call(call); - } free_packet_unlock: write_unlock_bh(&call->state_lock); free_packet: @@ -486,14 +482,8 @@ protocol_error: _debug("protocol error"); rxrpc_free_skb(part); rxrpc_free_skb(jumbo); - write_lock_bh(&call->state_lock); - if (call->state <= RXRPC_CALL_COMPLETE) { - call->state = RXRPC_CALL_LOCALLY_ABORTED; - call->local_abort = RX_PROTOCOL_ERROR; - set_bit(RXRPC_CALL_EV_ABORT, &call->events); + if (rxrpc_abort_call(call, RX_PROTOCOL_ERROR, EPROTO)) rxrpc_queue_call(call); - } - write_unlock_bh(&call->state_lock); _leave(""); } @@ -514,26 +504,28 @@ static void rxrpc_post_packet_to_call(struct rxrpc_call *call, read_lock(&call->state_lock); switch (call->state) { - case RXRPC_CALL_LOCALLY_ABORTED: - if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events)) { - rxrpc_queue_call(call); - goto free_unlock; - } - case RXRPC_CALL_REMOTELY_ABORTED: - case RXRPC_CALL_NETWORK_ERROR: case RXRPC_CALL_DEAD: goto dead_call; + case RXRPC_CALL_COMPLETE: - case RXRPC_CALL_CLIENT_FINAL_ACK: - /* complete server call */ - if (rxrpc_conn_is_service(call->conn)) + switch (call->completion) { + case RXRPC_CALL_LOCALLY_ABORTED: + if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, + &call->events)) { + rxrpc_queue_call(call); + goto free_unlock; + } + default: goto dead_call; - /* resend last packet of a completed call */ - _debug("final ack again"); - rxrpc_get_call(call); - set_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events); - rxrpc_queue_call(call); - goto free_unlock; + case RXRPC_CALL_SUCCEEDED: + if (rxrpc_conn_is_service(call->conn)) + goto dead_call; + goto resend_final_ack; + } + + case RXRPC_CALL_CLIENT_FINAL_ACK: + goto resend_final_ack; + default: break; } @@ -550,6 +542,13 @@ static void rxrpc_post_packet_to_call(struct rxrpc_call *call, rxrpc_put_call(call); goto done; +resend_final_ack: + _debug("final ack again"); + rxrpc_get_call(call); + set_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events); + rxrpc_queue_call(call); + goto free_unlock; + dead_call: if (sp->hdr.type != RXRPC_PACKET_TYPE_ABORT) { skb->priority = RX_CALL_DEAD; @@ -748,6 +747,7 @@ void rxrpc_data_ready(struct sock *sk) if (!call || atomic_read(&call->usage) == 0) goto cant_route_call; + rxrpc_see_call(call); rxrpc_post_packet_to_call(call, skb); goto out_unlock; } diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c index 8a9917cba6fe..b1e708a12151 100644 --- a/net/rxrpc/output.c +++ b/net/rxrpc/output.c @@ -115,12 +115,12 @@ static int rxrpc_sendmsg_cmsg(struct msghdr *msg, */ static void rxrpc_send_abort(struct rxrpc_call *call, u32 abort_code) { + if (call->state >= RXRPC_CALL_COMPLETE) + return; + write_lock_bh(&call->state_lock); - if (call->state <= RXRPC_CALL_COMPLETE) { - call->state = RXRPC_CALL_LOCALLY_ABORTED; - call->local_abort = abort_code; - set_bit(RXRPC_CALL_EV_ABORT, &call->events); + if (__rxrpc_abort_call(call, abort_code, ECONNABORTED)) { del_timer_sync(&call->resend_timer); del_timer_sync(&call->ack_timer); clear_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events); @@ -207,12 +207,13 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) return PTR_ERR(call); } + rxrpc_see_call(call); _debug("CALL %d USR %lx ST %d on CONN %p", call->debug_id, call->user_call_ID, call->state, call->conn); if (call->state >= RXRPC_CALL_COMPLETE) { /* it's too late for this call */ - ret = -ECONNRESET; + ret = -ESHUTDOWN; } else if (cmd == RXRPC_CMD_SEND_ABORT) { rxrpc_send_abort(call, abort_code); ret = 0; @@ -238,6 +239,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) /** * rxrpc_kernel_send_data - Allow a kernel service to send data on a call + * @sock: The socket the call is on * @call: The call to send data through * @msg: The data to send * @len: The amount of data to send @@ -247,8 +249,8 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) * nor should an address be supplied. MSG_MORE should be flagged if there's * more data to come, otherwise this data will end the transmission phase. */ -int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg, - size_t len) +int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call, + struct msghdr *msg, size_t len) { int ret; @@ -257,7 +259,7 @@ int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg, ASSERTCMP(msg->msg_name, ==, NULL); ASSERTCMP(msg->msg_control, ==, NULL); - lock_sock(&call->socket->sk); + lock_sock(sock->sk); _debug("CALL %d USR %lx ST %d on CONN %p", call->debug_id, call->user_call_ID, call->state, call->conn); @@ -269,36 +271,36 @@ int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg, call->state != RXRPC_CALL_SERVER_SEND_REPLY) { ret = -EPROTO; /* request phase complete for this client call */ } else { - ret = rxrpc_send_data(call->socket, call, msg, len); + ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len); } - release_sock(&call->socket->sk); + release_sock(sock->sk); _leave(" = %d", ret); return ret; } - EXPORT_SYMBOL(rxrpc_kernel_send_data); /** * rxrpc_kernel_abort_call - Allow a kernel service to abort a call + * @sock: The socket the call is on * @call: The call to be aborted * @abort_code: The abort code to stick into the ABORT packet * * Allow a kernel service to abort a call, if it's still in an abortable state. */ -void rxrpc_kernel_abort_call(struct rxrpc_call *call, u32 abort_code) +void rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call, + u32 abort_code) { _enter("{%d},%d", call->debug_id, abort_code); - lock_sock(&call->socket->sk); + lock_sock(sock->sk); _debug("CALL %d USR %lx ST %d on CONN %p", call->debug_id, call->user_call_ID, call->state, call->conn); - if (call->state < RXRPC_CALL_COMPLETE) - rxrpc_send_abort(call, abort_code); + rxrpc_send_abort(call, abort_code); - release_sock(&call->socket->sk); + release_sock(sock->sk); _leave(""); } @@ -640,8 +642,8 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, /* check for the far side aborting the call or a network error * occurring */ - if (call->state > RXRPC_CALL_COMPLETE) - goto call_aborted; + if (call->state == RXRPC_CALL_COMPLETE) + goto call_terminated; /* add the packet to the send queue if it's now full */ if (sp->remain <= 0 || @@ -702,15 +704,9 @@ out: _leave(" = %d", ret); return ret; -call_aborted: +call_terminated: rxrpc_free_skb(skb); - if (call->state == RXRPC_CALL_NETWORK_ERROR) - ret = call->error_report < RXRPC_LOCAL_ERROR_OFFSET ? - call->error_report : - call->error_report - RXRPC_LOCAL_ERROR_OFFSET; - else - ret = -ECONNABORTED; - _leave(" = %d", ret); + _leave(" = %d", -call->error); return ret; maybe_error: diff --git a/net/rxrpc/peer_event.c b/net/rxrpc/peer_event.c index 8940674b5e08..27b9ecad007e 100644 --- a/net/rxrpc/peer_event.c +++ b/net/rxrpc/peer_event.c @@ -248,13 +248,21 @@ void rxrpc_peer_error_distributor(struct work_struct *work) struct rxrpc_peer *peer = container_of(work, struct rxrpc_peer, error_distributor); struct rxrpc_call *call; - int error_report; + enum rxrpc_call_completion compl; + bool queue; + int error; _enter(""); - error_report = READ_ONCE(peer->error_report); + error = READ_ONCE(peer->error_report); + if (error < RXRPC_LOCAL_ERROR_OFFSET) { + compl = RXRPC_CALL_NETWORK_ERROR; + } else { + compl = RXRPC_CALL_LOCAL_ERROR; + error -= RXRPC_LOCAL_ERROR_OFFSET; + } - _debug("ISSUE ERROR %d", error_report); + _debug("ISSUE ERROR %s %d", rxrpc_call_completions[compl], error); spin_lock_bh(&peer->lock); @@ -262,16 +270,17 @@ void rxrpc_peer_error_distributor(struct work_struct *work) call = hlist_entry(peer->error_targets.first, struct rxrpc_call, error_link); hlist_del_init(&call->error_link); + rxrpc_see_call(call); + queue = false; write_lock(&call->state_lock); - if (call->state != RXRPC_CALL_COMPLETE && - call->state < RXRPC_CALL_NETWORK_ERROR) { - call->error_report = error_report; - call->state = RXRPC_CALL_NETWORK_ERROR; + if (__rxrpc_set_call_completion(call, compl, 0, error)) { set_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events); - rxrpc_queue_call(call); + queue = true; } write_unlock(&call->state_lock); + if (queue) + rxrpc_queue_call(call); } spin_unlock_bh(&peer->lock); diff --git a/net/rxrpc/peer_object.c b/net/rxrpc/peer_object.c index 538e9831c699..aebc73ac16dc 100644 --- a/net/rxrpc/peer_object.c +++ b/net/rxrpc/peer_object.c @@ -313,3 +313,18 @@ void __rxrpc_put_peer(struct rxrpc_peer *peer) kfree_rcu(peer, rcu); } + +/** + * rxrpc_kernel_get_peer - Get the peer address of a call + * @sock: The socket on which the call is in progress. + * @call: The call to query + * @_srx: Where to place the result + * + * Get the address of the remote peer in a call. + */ +void rxrpc_kernel_get_peer(struct socket *sock, struct rxrpc_call *call, + struct sockaddr_rxrpc *_srx) +{ + *_srx = call->peer->srx; +} +EXPORT_SYMBOL(rxrpc_kernel_get_peer); diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c index 060fb4892c39..82c64055449d 100644 --- a/net/rxrpc/proc.c +++ b/net/rxrpc/proc.c @@ -22,7 +22,6 @@ static const char *const rxrpc_conn_states[RXRPC_CONN__NR_STATES] = { [RXRPC_CONN_SERVICE] = "SvSecure", [RXRPC_CONN_REMOTELY_ABORTED] = "RmtAbort", [RXRPC_CONN_LOCALLY_ABORTED] = "LocAbort", - [RXRPC_CONN_NETWORK_ERROR] = "NetError", }; /* @@ -94,7 +93,7 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v) rxrpc_is_service_call(call) ? "Svc" : "Clt", atomic_read(&call->usage), rxrpc_call_states[call->state], - call->remote_abort ?: call->local_abort, + call->abort_code, call->user_call_ID); return 0; diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c index b964c2d49a88..c9b38c7fb448 100644 --- a/net/rxrpc/recvmsg.c +++ b/net/rxrpc/recvmsg.c @@ -115,6 +115,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, sp = rxrpc_skb(skb); call = sp->call; ASSERT(call != NULL); + rxrpc_see_call(call); _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]); @@ -294,12 +295,17 @@ receive_non_data_message: ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code); break; case RXRPC_SKB_MARK_REMOTE_ABORT: - abort_code = call->remote_abort; + abort_code = call->abort_code; ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code); break; case RXRPC_SKB_MARK_LOCAL_ABORT: - abort_code = call->local_abort; + abort_code = call->abort_code; ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code); + if (call->error) { + abort_code = call->error; + ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, + &abort_code); + } break; case RXRPC_SKB_MARK_NET_ERROR: _debug("RECV NET ERROR %d", sp->error); @@ -392,9 +398,8 @@ u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb) switch (skb->mark) { case RXRPC_SKB_MARK_REMOTE_ABORT: - return sp->call->remote_abort; case RXRPC_SKB_MARK_LOCAL_ABORT: - return sp->call->local_abort; + return sp->call->abort_code; default: BUG(); } diff --git a/net/rxrpc/skbuff.c b/net/rxrpc/skbuff.c index fbd8c74d9505..20529205bb8c 100644 --- a/net/rxrpc/skbuff.c +++ b/net/rxrpc/skbuff.c @@ -140,9 +140,7 @@ void rxrpc_packet_destructor(struct sk_buff *skb) _enter("%p{%p}", skb, call); if (call) { - if (atomic_dec_return(&call->skb_count) < 0) - BUG(); - rxrpc_put_call(call); + rxrpc_put_call_for_skb(call, skb); sp->call = NULL; } |