diff options
Diffstat (limited to 'net/rxrpc/call_accept.c')
-rw-r--r-- | net/rxrpc/call_accept.c | 472 |
1 files changed, 189 insertions, 283 deletions
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c index cc7194e05a15..b8acec0d596e 100644 --- a/net/rxrpc/call_accept.c +++ b/net/rxrpc/call_accept.c @@ -129,6 +129,8 @@ static int rxrpc_service_prealloc_one(struct rxrpc_sock *rx, set_bit(RXRPC_CALL_HAS_USERID, &call->flags); } + list_add(&call->sock_link, &rx->sock_calls); + write_unlock(&rx->call_lock); write_lock(&rxrpc_call_lock); @@ -186,6 +188,12 @@ void rxrpc_discard_prealloc(struct rxrpc_sock *rx) return; rx->backlog = NULL; + /* Make sure that there aren't any incoming calls in progress before we + * clear the preallocation buffers. + */ + spin_lock_bh(&rx->incoming_lock); + spin_unlock_bh(&rx->incoming_lock); + head = b->peer_backlog_head; tail = b->peer_backlog_tail; while (CIRC_CNT(head, tail, size) > 0) { @@ -224,251 +232,179 @@ void rxrpc_discard_prealloc(struct rxrpc_sock *rx) } /* - * generate a connection-level abort + * Allocate a new incoming call from the prealloc pool, along with a connection + * and a peer as necessary. */ -static int rxrpc_busy(struct rxrpc_local *local, struct sockaddr_rxrpc *srx, - struct rxrpc_wire_header *whdr) +static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx, + struct rxrpc_local *local, + struct rxrpc_connection *conn, + struct sk_buff *skb) { - struct msghdr msg; - struct kvec iov[1]; - size_t len; - int ret; - - _enter("%d,,", local->debug_id); - - whdr->type = RXRPC_PACKET_TYPE_BUSY; - whdr->serial = htonl(1); - - msg.msg_name = &srx->transport.sin; - msg.msg_namelen = sizeof(srx->transport.sin); - msg.msg_control = NULL; - msg.msg_controllen = 0; - msg.msg_flags = 0; - - iov[0].iov_base = whdr; - iov[0].iov_len = sizeof(*whdr); - - len = iov[0].iov_len; - - _proto("Tx BUSY %%1"); + struct rxrpc_backlog *b = rx->backlog; + struct rxrpc_peer *peer, *xpeer; + struct rxrpc_call *call; + unsigned short call_head, conn_head, peer_head; + unsigned short call_tail, conn_tail, peer_tail; + unsigned short call_count, conn_count; + + /* #calls >= #conns >= #peers must hold true. */ + call_head = smp_load_acquire(&b->call_backlog_head); + call_tail = b->call_backlog_tail; + call_count = CIRC_CNT(call_head, call_tail, RXRPC_BACKLOG_MAX); + conn_head = smp_load_acquire(&b->conn_backlog_head); + conn_tail = b->conn_backlog_tail; + conn_count = CIRC_CNT(conn_head, conn_tail, RXRPC_BACKLOG_MAX); + ASSERTCMP(conn_count, >=, call_count); + peer_head = smp_load_acquire(&b->peer_backlog_head); + peer_tail = b->peer_backlog_tail; + ASSERTCMP(CIRC_CNT(peer_head, peer_tail, RXRPC_BACKLOG_MAX), >=, + conn_count); + + if (call_count == 0) + return NULL; + + if (!conn) { + /* No connection. We're going to need a peer to start off + * with. If one doesn't yet exist, use a spare from the + * preallocation set. We dump the address into the spare in + * anticipation - and to save on stack space. + */ + xpeer = b->peer_backlog[peer_tail]; + if (rxrpc_extract_addr_from_skb(&xpeer->srx, skb) < 0) + return NULL; + + peer = rxrpc_lookup_incoming_peer(local, xpeer); + if (peer == xpeer) { + b->peer_backlog[peer_tail] = NULL; + smp_store_release(&b->peer_backlog_tail, + (peer_tail + 1) & + (RXRPC_BACKLOG_MAX - 1)); + } - ret = kernel_sendmsg(local->socket, &msg, iov, 1, len); - if (ret < 0) { - _leave(" = -EAGAIN [sendmsg failed: %d]", ret); - return -EAGAIN; + /* Now allocate and set up the connection */ + conn = b->conn_backlog[conn_tail]; + b->conn_backlog[conn_tail] = NULL; + smp_store_release(&b->conn_backlog_tail, + (conn_tail + 1) & (RXRPC_BACKLOG_MAX - 1)); + rxrpc_get_local(local); + conn->params.local = local; + conn->params.peer = peer; + rxrpc_new_incoming_connection(conn, skb); + } else { + rxrpc_get_connection(conn); } - _leave(" = 0"); - return 0; + /* And now we can allocate and set up a new call */ + call = b->call_backlog[call_tail]; + b->call_backlog[call_tail] = NULL; + smp_store_release(&b->call_backlog_tail, + (call_tail + 1) & (RXRPC_BACKLOG_MAX - 1)); + + call->conn = conn; + call->peer = rxrpc_get_peer(conn->params.peer); + return call; } /* - * accept an incoming call that needs peer, transport and/or connection setting - * up + * Set up a new incoming call. Called in BH context with the RCU read lock + * held. + * + * If this is for a kernel service, when we allocate the call, it will have + * three refs on it: (1) the kernel service, (2) the user_call_ID tree, (3) the + * retainer ref obtained from the backlog buffer. Prealloc calls for userspace + * services only have the ref from the backlog buffer. We want to pass this + * ref to non-BH context to dispose of. + * + * If we want to report an error, we mark the skb with the packet type and + * abort code and return NULL. */ -static int rxrpc_accept_incoming_call(struct rxrpc_local *local, - struct rxrpc_sock *rx, - struct sk_buff *skb, - struct sockaddr_rxrpc *srx) +struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local, + struct rxrpc_connection *conn, + struct sk_buff *skb) { - struct rxrpc_connection *conn; - struct rxrpc_skb_priv *sp, *nsp; + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + struct rxrpc_sock *rx; struct rxrpc_call *call; - struct sk_buff *notification; - int ret; _enter(""); - sp = rxrpc_skb(skb); - - /* get a notification message to send to the server app */ - notification = alloc_skb(0, GFP_NOFS); - if (!notification) { - _debug("no memory"); - ret = -ENOMEM; - goto error_nofree; - } - rxrpc_new_skb(notification); - notification->mark = RXRPC_SKB_MARK_NEW_CALL; - - conn = rxrpc_incoming_connection(local, srx, skb); - if (IS_ERR(conn)) { - _debug("no conn"); - ret = PTR_ERR(conn); - goto error; - } - - call = rxrpc_incoming_call(rx, conn, skb); - rxrpc_put_connection(conn); - if (IS_ERR(call)) { - _debug("no call"); - ret = PTR_ERR(call); - goto error; + /* Get the socket providing the service */ + hlist_for_each_entry_rcu_bh(rx, &local->services, listen_link) { + if (rx->srx.srx_service == sp->hdr.serviceId) + goto found_service; } - /* attach the call to the socket */ - read_lock_bh(&local->services_lock); - if (rx->sk.sk_state == RXRPC_CLOSE) - goto invalid_service; - - write_lock(&rx->call_lock); - if (!test_and_set_bit(RXRPC_CALL_INIT_ACCEPT, &call->flags)) { - rxrpc_get_call(call, rxrpc_call_got); - - spin_lock(&call->conn->state_lock); - if (sp->hdr.securityIndex > 0 && - call->conn->state == RXRPC_CONN_SERVICE_UNSECURED) { - _debug("await conn sec"); - list_add_tail(&call->accept_link, &rx->secureq); - call->conn->state = RXRPC_CONN_SERVICE_CHALLENGING; - set_bit(RXRPC_CONN_EV_CHALLENGE, &call->conn->events); - rxrpc_queue_conn(call->conn); - } else { - _debug("conn ready"); - call->state = RXRPC_CALL_SERVER_ACCEPTING; - list_add_tail(&call->accept_link, &rx->acceptq); - rxrpc_get_call_for_skb(call, notification); - nsp = rxrpc_skb(notification); - nsp->call = call; - - ASSERTCMP(atomic_read(&call->usage), >=, 3); - - _debug("notify"); - spin_lock(&call->lock); - ret = rxrpc_queue_rcv_skb(call, notification, true, - false); - spin_unlock(&call->lock); - notification = NULL; - BUG_ON(ret < 0); - } - spin_unlock(&call->conn->state_lock); + trace_rxrpc_abort("INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq, + RX_INVALID_OPERATION, EOPNOTSUPP); + skb->mark = RXRPC_SKB_MARK_LOCAL_ABORT; + skb->priority = RX_INVALID_OPERATION; + _leave(" = NULL [service]"); + return NULL; - _debug("queued"); +found_service: + spin_lock(&rx->incoming_lock); + if (rx->sk.sk_state == RXRPC_CLOSE) { + trace_rxrpc_abort("CLS", sp->hdr.cid, sp->hdr.callNumber, + sp->hdr.seq, RX_INVALID_OPERATION, ESHUTDOWN); + skb->mark = RXRPC_SKB_MARK_LOCAL_ABORT; + skb->priority = RX_INVALID_OPERATION; + _leave(" = NULL [close]"); + call = NULL; + goto out; } - write_unlock(&rx->call_lock); - _debug("process"); - rxrpc_fast_process_packet(call, skb); - - _debug("done"); - read_unlock_bh(&local->services_lock); - rxrpc_free_skb(notification); - rxrpc_put_call(call, rxrpc_call_put); - _leave(" = 0"); - return 0; - -invalid_service: - _debug("invalid"); - read_unlock_bh(&local->services_lock); - - rxrpc_release_call(rx, call); - rxrpc_put_call(call, rxrpc_call_put); - ret = -ECONNREFUSED; -error: - rxrpc_free_skb(notification); -error_nofree: - _leave(" = %d", ret); - return ret; -} + call = rxrpc_alloc_incoming_call(rx, local, conn, skb); + if (!call) { + skb->mark = RXRPC_SKB_MARK_BUSY; + _leave(" = NULL [busy]"); + call = NULL; + goto out; + } -/* - * accept incoming calls that need peer, transport and/or connection setting up - * - the packets we get are all incoming client DATA packets that have seq == 1 - */ -void rxrpc_accept_incoming_calls(struct rxrpc_local *local) -{ - struct rxrpc_skb_priv *sp; - struct sockaddr_rxrpc srx; - struct rxrpc_sock *rx; - struct rxrpc_wire_header whdr; - struct sk_buff *skb; - int ret; + /* Make the call live. */ + rxrpc_incoming_call(rx, call, skb); + conn = call->conn; - _enter("%d", local->debug_id); + if (rx->notify_new_call) + rx->notify_new_call(&rx->sk, call, call->user_call_ID); - skb = skb_dequeue(&local->accept_queue); - if (!skb) { - _leave("\n"); - return; - } + spin_lock(&conn->state_lock); + switch (conn->state) { + case RXRPC_CONN_SERVICE_UNSECURED: + conn->state = RXRPC_CONN_SERVICE_CHALLENGING; + set_bit(RXRPC_CONN_EV_CHALLENGE, &call->conn->events); + rxrpc_queue_conn(call->conn); + break; - _net("incoming call skb %p", skb); - - rxrpc_see_skb(skb); - sp = rxrpc_skb(skb); - - /* Set up a response packet header in case we need it */ - whdr.epoch = htonl(sp->hdr.epoch); - whdr.cid = htonl(sp->hdr.cid); - whdr.callNumber = htonl(sp->hdr.callNumber); - whdr.seq = htonl(sp->hdr.seq); - whdr.serial = 0; - whdr.flags = 0; - whdr.type = 0; - whdr.userStatus = 0; - whdr.securityIndex = sp->hdr.securityIndex; - whdr._rsvd = 0; - whdr.serviceId = htons(sp->hdr.serviceId); - - if (rxrpc_extract_addr_from_skb(&srx, skb) < 0) - goto drop; - - /* get the socket providing the service */ - read_lock_bh(&local->services_lock); - hlist_for_each_entry(rx, &local->services, listen_link) { - if (rx->srx.srx_service == sp->hdr.serviceId && - rx->sk.sk_state != RXRPC_CLOSE) - goto found_service; - } - read_unlock_bh(&local->services_lock); - goto invalid_service; + case RXRPC_CONN_SERVICE: + write_lock(&call->state_lock); + if (rx->discard_new_call) + call->state = RXRPC_CALL_SERVER_RECV_REQUEST; + else + call->state = RXRPC_CALL_SERVER_ACCEPTING; + write_unlock(&call->state_lock); + break; -found_service: - _debug("found service %hd", rx->srx.srx_service); - if (sk_acceptq_is_full(&rx->sk)) - goto backlog_full; - sk_acceptq_added(&rx->sk); - read_unlock_bh(&local->services_lock); - - ret = rxrpc_accept_incoming_call(local, rx, skb, &srx); - if (ret < 0) - sk_acceptq_removed(&rx->sk); - switch (ret) { - case -ECONNRESET: /* old calls are ignored */ - case -ECONNABORTED: /* aborted calls are reaborted or ignored */ - case 0: - return; - case -ECONNREFUSED: - goto invalid_service; - case -EBUSY: - goto busy; - case -EKEYREJECTED: - goto security_mismatch; + case RXRPC_CONN_REMOTELY_ABORTED: + rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED, + conn->remote_abort, ECONNABORTED); + break; + case RXRPC_CONN_LOCALLY_ABORTED: + rxrpc_abort_call("CON", call, sp->hdr.seq, + conn->local_abort, ECONNABORTED); + break; default: BUG(); } + spin_unlock(&conn->state_lock); -backlog_full: - read_unlock_bh(&local->services_lock); -busy: - rxrpc_busy(local, &srx, &whdr); - rxrpc_free_skb(skb); - return; - -drop: - rxrpc_free_skb(skb); - return; + if (call->state == RXRPC_CALL_SERVER_ACCEPTING) + rxrpc_notify_socket(call); -invalid_service: - skb->priority = RX_INVALID_OPERATION; - rxrpc_reject_packet(local, skb); - return; - - /* can't change connection security type mid-flow */ -security_mismatch: - skb->priority = RX_PROTOCOL_ERROR; - rxrpc_reject_packet(local, skb); - return; + _leave(" = %p{%d}", call, call->debug_id); +out: + spin_unlock(&rx->incoming_lock); + return call; } /* @@ -490,11 +426,10 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx, write_lock(&rx->call_lock); ret = -ENODATA; - if (list_empty(&rx->acceptq)) + if (list_empty(&rx->to_be_accepted)) goto out; /* check the user ID isn't already in use */ - ret = -EBADSLT; pp = &rx->calls.rb_node; parent = NULL; while (*pp) { @@ -506,11 +441,14 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx, else if (user_call_ID > call->user_call_ID) pp = &(*pp)->rb_right; else - goto out; + goto id_in_use; } - /* dequeue the first call and check it's still valid */ - call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link); + /* Dequeue the first call and check it's still valid. We gain + * responsibility for the queue's reference. + */ + call = list_entry(rx->to_be_accepted.next, + struct rxrpc_call, accept_link); list_del_init(&call->accept_link); sk_acceptq_removed(&rx->sk); rxrpc_see_call(call); @@ -528,31 +466,35 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx, } /* formalise the acceptance */ - rxrpc_get_call(call, rxrpc_call_got_userid); + rxrpc_get_call(call, rxrpc_call_got); call->notify_rx = notify_rx; call->user_call_ID = user_call_ID; + rxrpc_get_call(call, rxrpc_call_got_userid); rb_link_node(&call->sock_node, parent, pp); rb_insert_color(&call->sock_node, &rx->calls); if (test_and_set_bit(RXRPC_CALL_HAS_USERID, &call->flags)) BUG(); - if (test_and_set_bit(RXRPC_CALL_EV_ACCEPTED, &call->events)) - BUG(); write_unlock_bh(&call->state_lock); write_unlock(&rx->call_lock); - rxrpc_queue_call(call); + rxrpc_notify_socket(call); + rxrpc_service_prealloc(rx, GFP_KERNEL); _leave(" = %p{%d}", call, call->debug_id); return call; out_release: + _debug("release %p", call); write_unlock_bh(&call->state_lock); write_unlock(&rx->call_lock); - _debug("release %p", call); rxrpc_release_call(rx, call); - _leave(" = %d", ret); - return ERR_PTR(ret); -out: + rxrpc_put_call(call, rxrpc_call_put); + goto out; + +id_in_use: + ret = -EBADSLT; write_unlock(&rx->call_lock); +out: + rxrpc_service_prealloc(rx, GFP_KERNEL); _leave(" = %d", ret); return ERR_PTR(ret); } @@ -564,6 +506,7 @@ out: int rxrpc_reject_call(struct rxrpc_sock *rx) { struct rxrpc_call *call; + bool abort = false; int ret; _enter(""); @@ -572,15 +515,16 @@ int rxrpc_reject_call(struct rxrpc_sock *rx) write_lock(&rx->call_lock); - ret = -ENODATA; - if (list_empty(&rx->acceptq)) { + if (list_empty(&rx->to_be_accepted)) { write_unlock(&rx->call_lock); - _leave(" = -ENODATA"); return -ENODATA; } - /* dequeue the first call and check it's still valid */ - call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link); + /* Dequeue the first call and check it's still valid. We gain + * responsibility for the queue's reference. + */ + call = list_entry(rx->to_be_accepted.next, + struct rxrpc_call, accept_link); list_del_init(&call->accept_link); sk_acceptq_removed(&rx->sk); rxrpc_see_call(call); @@ -588,66 +532,28 @@ int rxrpc_reject_call(struct rxrpc_sock *rx) write_lock_bh(&call->state_lock); switch (call->state) { case RXRPC_CALL_SERVER_ACCEPTING: - __rxrpc_set_call_completion(call, RXRPC_CALL_SERVER_BUSY, - 0, ECONNABORTED); - if (test_and_set_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events)) - rxrpc_queue_call(call); - ret = 0; - break; + __rxrpc_abort_call("REJ", call, 1, RX_USER_ABORT, ECONNABORTED); + abort = true; + /* fall through */ case RXRPC_CALL_COMPLETE: ret = call->error; - break; + goto out_discard; default: BUG(); } +out_discard: write_unlock_bh(&call->state_lock); write_unlock(&rx->call_lock); - rxrpc_release_call(rx, call); - _leave(" = %d", ret); - return ret; -} - -/** - * rxrpc_kernel_accept_call - Allow a kernel service to accept an incoming call - * @sock: The socket on which the impending call is waiting - * @user_call_ID: The tag to attach to the call - * @notify_rx: Where to send notifications instead of socket queue - * - * Allow a kernel service to accept an incoming call, assuming the incoming - * call is still valid. The caller should immediately trigger their own - * notification as there must be data waiting. - */ -struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *sock, - unsigned long user_call_ID, - rxrpc_notify_rx_t notify_rx) -{ - struct rxrpc_call *call; - - _enter(",%lx", user_call_ID); - call = rxrpc_accept_call(rxrpc_sk(sock->sk), user_call_ID, notify_rx); - _leave(" = %p", call); - return call; -} -EXPORT_SYMBOL(rxrpc_kernel_accept_call); - -/** - * rxrpc_kernel_reject_call - Allow a kernel service to reject an incoming call - * @sock: The socket on which the impending call is waiting - * - * Allow a kernel service to reject an incoming call with a BUSY message, - * assuming the incoming call is still valid. - */ -int rxrpc_kernel_reject_call(struct socket *sock) -{ - int ret; - - _enter(""); - ret = rxrpc_reject_call(rxrpc_sk(sock->sk)); + if (abort) { + rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ABORT); + rxrpc_release_call(rx, call); + rxrpc_put_call(call, rxrpc_call_put); + } + rxrpc_service_prealloc(rx, GFP_KERNEL); _leave(" = %d", ret); return ret; } -EXPORT_SYMBOL(rxrpc_kernel_reject_call); /* * rxrpc_kernel_charge_accept - Charge up socket with preallocated calls |