diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2021-05-07 20:23:41 +0200 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2021-05-07 20:23:41 +0200 |
commit | a647034fe26b92702d5084b518c061e3cebefbaf (patch) | |
tree | 7b76983fac97c7ccc821dfb7addc72a6ba7251ee /net | |
parent | Merge tag '9p-for-5.13-rc1' of git://github.com/martinetd/linux (diff) | |
parent | xprtrdma: Fix a NULL dereference in frwr_unmap_sync() (diff) | |
download | linux-a647034fe26b92702d5084b518c061e3cebefbaf.tar.xz linux-a647034fe26b92702d5084b518c061e3cebefbaf.zip |
Merge tag 'nfs-for-5.13-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Pull NFS client updates from Trond Myklebust:
"Highlights include:
Stable fixes:
- Add validation of the UDP retrans parameter to prevent shift
out-of-bounds
- Don't discard pNFS layout segments that are marked for return
Bugfixes:
- Fix a NULL dereference crash in xprt_complete_bc_request() when the
NFSv4.1 server misbehaves.
- Fix the handling of NFS READDIR cookie verifiers
- Sundry fixes to ensure attribute revalidation works correctly when
the server does not return post-op attributes.
- nfs4_bitmask_adjust() must not change the server global bitmasks
- Fix major timeout handling in the RPC code.
- NFSv4.2 fallocate() fixes.
- Fix the NFSv4.2 SEEK_HOLE/SEEK_DATA end-of-file handling
- Copy offload attribute revalidation fixes
- Fix an incorrect filehandle size check in the pNFS flexfiles driver
- Fix several RDMA transport setup/teardown races
- Fix several RDMA queue wrapping issues
- Fix a misplaced memory read barrier in sunrpc's call_decode()
Features:
- Micro optimisation of the TCP transmission queue using TCP_CORK
- statx() performance improvements by further splitting up the
tracking of invalid cached file metadata.
- Support the NFSv4.2 'change_attr_type' attribute and use it to
optimise handling of change attribute updates"
* tag 'nfs-for-5.13-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (85 commits)
xprtrdma: Fix a NULL dereference in frwr_unmap_sync()
sunrpc: Fix misplaced barrier in call_decode
NFSv4.2: Remove ifdef CONFIG_NFSD from NFSv4.2 client SSC code.
xprtrdma: Move fr_mr field to struct rpcrdma_mr
xprtrdma: Move the Work Request union to struct rpcrdma_mr
xprtrdma: Move fr_linv_done field to struct rpcrdma_mr
xprtrdma: Move cqe to struct rpcrdma_mr
xprtrdma: Move fr_cid to struct rpcrdma_mr
xprtrdma: Remove the RPC/RDMA QP event handler
xprtrdma: Don't display r_xprt memory addresses in tracepoints
xprtrdma: Add an rpcrdma_mr_completion_class
xprtrdma: Add tracepoints showing FastReg WRs and remote invalidation
xprtrdma: Avoid Send Queue wrapping
xprtrdma: Do not wake RPC consumer on a failed LocalInv
xprtrdma: Do not recycle MR after FastReg/LocalInv flushes
xprtrdma: Clarify use of barrier in frwr_wc_localinv_done()
xprtrdma: Rename frwr_release_mr()
xprtrdma: rpcrdma_mr_pop() already does list_del_init()
xprtrdma: Delete rpcrdma_recv_buffer_put()
xprtrdma: Fix cwnd update ordering
...
Diffstat (limited to 'net')
-rw-r--r-- | net/sunrpc/clnt.c | 12 | ||||
-rw-r--r-- | net/sunrpc/rpcb_clnt.c | 7 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 18 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 4 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 209 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 39 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 6 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 131 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 29 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 9 |
10 files changed, 237 insertions, 227 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 612f0a641f4c..f555d335e910 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1799,7 +1799,6 @@ call_allocate(struct rpc_task *task) status = xprt->ops->buf_alloc(task); trace_rpc_buf_alloc(task, status); - xprt_inject_disconnect(xprt); if (status == 0) return; if (status != -ENOMEM) { @@ -2458,12 +2457,6 @@ call_decode(struct rpc_task *task) } /* - * Ensure that we see all writes made by xprt_complete_rqst() - * before it changed req->rq_reply_bytes_recvd. - */ - smp_rmb(); - - /* * Did we ever call xprt_complete_rqst()? If not, we should assume * the message is incomplete. */ @@ -2471,6 +2464,11 @@ call_decode(struct rpc_task *task) if (!req->rq_reply_bytes_recvd) goto out; + /* Ensure that we see all writes made by xprt_complete_rqst() + * before it changed req->rq_reply_bytes_recvd. + */ + smp_rmb(); + req->rq_rcv_buf.len = req->rq_private_buf.len; trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf); diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c index 38fe2ce8a5aa..647b323cc1d5 100644 --- a/net/sunrpc/rpcb_clnt.c +++ b/net/sunrpc/rpcb_clnt.c @@ -344,13 +344,15 @@ static struct rpc_clnt *rpcb_create(struct net *net, const char *nodename, const char *hostname, struct sockaddr *srvaddr, size_t salen, int proto, u32 version, - const struct cred *cred) + const struct cred *cred, + const struct rpc_timeout *timeo) { struct rpc_create_args args = { .net = net, .protocol = proto, .address = srvaddr, .addrsize = salen, + .timeout = timeo, .servername = hostname, .nodename = nodename, .program = &rpcb_program, @@ -705,7 +707,8 @@ void rpcb_getport_async(struct rpc_task *task) clnt->cl_nodename, xprt->servername, sap, salen, xprt->prot, bind_version, - clnt->cl_cred); + clnt->cl_cred, + task->tk_client->cl_timeout); if (IS_ERR(rpcb_clnt)) { status = PTR_ERR(rpcb_clnt); goto bailout_nofree; diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 691ccf8049a4..e5b5a960a69b 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -698,9 +698,9 @@ int xprt_adjust_timeout(struct rpc_rqst *req) const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; int status = 0; - if (time_before(jiffies, req->rq_minortimeo)) - return status; if (time_before(jiffies, req->rq_majortimeo)) { + if (time_before(jiffies, req->rq_minortimeo)) + return status; if (to->to_exponential) req->rq_timeout <<= 1; else @@ -1352,6 +1352,7 @@ xprt_request_enqueue_transmit(struct rpc_task *task) list_add_tail(&req->rq_xmit, &xprt->xmit_queue); INIT_LIST_HEAD(&req->rq_xmit2); out: + atomic_long_inc(&xprt->xmit_queuelen); set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); spin_unlock(&xprt->queue_lock); } @@ -1381,6 +1382,7 @@ xprt_request_dequeue_transmit_locked(struct rpc_task *task) } } else list_del(&req->rq_xmit2); + atomic_long_dec(&req->rq_xprt->xmit_queuelen); } /** @@ -1469,8 +1471,6 @@ bool xprt_prepare_transmit(struct rpc_task *task) struct rpc_xprt *xprt = req->rq_xprt; if (!xprt_lock_write(xprt, task)) { - trace_xprt_transmit_queued(xprt, task); - /* Race breaker: someone may have transmitted us */ if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) rpc_wake_up_queued_task_set_status(&xprt->sending, @@ -1483,7 +1483,10 @@ bool xprt_prepare_transmit(struct rpc_task *task) void xprt_end_transmit(struct rpc_task *task) { - xprt_release_write(task->tk_rqstp->rq_xprt, task); + struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; + + xprt_inject_disconnect(xprt); + xprt_release_write(xprt, task); } /** @@ -1537,8 +1540,10 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) return status; } - if (is_retrans) + if (is_retrans) { task->tk_client->cl_stats->rpcretrans++; + trace_xprt_retransmit(req); + } xprt_inject_disconnect(xprt); @@ -1885,7 +1890,6 @@ void xprt_release(struct rpc_task *task) spin_unlock(&xprt->transport_lock); if (req->rq_buffer) xprt->ops->buf_free(task); - xprt_inject_disconnect(xprt); xdr_free_bvec(&req->rq_rcv_buf); xdr_free_bvec(&req->rq_snd_buf); if (req->rq_cred != NULL) diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index a249837d6a55..1151efd09b27 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -155,9 +155,11 @@ void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs) void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) { struct rpcrdma_req *req = rpcr_to_rdmar(rqst); + struct rpcrdma_rep *rep = req->rl_reply; struct rpc_xprt *xprt = rqst->rq_xprt; + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); - rpcrdma_recv_buffer_put(req->rl_reply); + rpcrdma_rep_put(&r_xprt->rx_buf, rep); req->rl_reply = NULL; spin_lock(&xprt->bc_pa_lock); diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 766a1048a48a..229fcc9a9064 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -49,20 +49,13 @@ # define RPCDBG_FACILITY RPCDBG_TRANS #endif -/** - * frwr_release_mr - Destroy one MR - * @mr: MR allocated by frwr_mr_init - * - */ -void frwr_release_mr(struct rpcrdma_mr *mr) +static void frwr_cid_init(struct rpcrdma_ep *ep, + struct rpcrdma_mr *mr) { - int rc; + struct rpc_rdma_cid *cid = &mr->mr_cid; - rc = ib_dereg_mr(mr->frwr.fr_mr); - if (rc) - trace_xprtrdma_frwr_dereg(mr, rc); - kfree(mr->mr_sg); - kfree(mr); + cid->ci_queue_id = ep->re_attr.send_cq->res.id; + cid->ci_completion_id = mr->mr_ibmr->res.id; } static void frwr_mr_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) @@ -75,20 +68,22 @@ static void frwr_mr_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) } } -static void frwr_mr_recycle(struct rpcrdma_mr *mr) +/** + * frwr_mr_release - Destroy one MR + * @mr: MR allocated by frwr_mr_init + * + */ +void frwr_mr_release(struct rpcrdma_mr *mr) { - struct rpcrdma_xprt *r_xprt = mr->mr_xprt; - - trace_xprtrdma_mr_recycle(mr); - - frwr_mr_unmap(r_xprt, mr); + int rc; - spin_lock(&r_xprt->rx_buf.rb_lock); - list_del(&mr->mr_all); - r_xprt->rx_stats.mrs_recycled++; - spin_unlock(&r_xprt->rx_buf.rb_lock); + frwr_mr_unmap(mr->mr_xprt, mr); - frwr_release_mr(mr); + rc = ib_dereg_mr(mr->mr_ibmr); + if (rc) + trace_xprtrdma_frwr_dereg(mr, rc); + kfree(mr->mr_sg); + kfree(mr); } static void frwr_mr_put(struct rpcrdma_mr *mr) @@ -144,10 +139,11 @@ int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) goto out_list_err; mr->mr_xprt = r_xprt; - mr->frwr.fr_mr = frmr; + mr->mr_ibmr = frmr; mr->mr_device = NULL; INIT_LIST_HEAD(&mr->mr_list); - init_completion(&mr->frwr.fr_linv_done); + init_completion(&mr->mr_linv_done); + frwr_cid_init(ep, mr); sg_init_table(sg, depth); mr->mr_sg = sg; @@ -257,6 +253,7 @@ int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device) ep->re_attr.cap.max_send_wr += 1; /* for ib_drain_sq */ ep->re_attr.cap.max_recv_wr = ep->re_max_requests; ep->re_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; + ep->re_attr.cap.max_recv_wr += RPCRDMA_MAX_RECV_BATCH; ep->re_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */ ep->re_max_rdma_segs = @@ -326,7 +323,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, goto out_dmamap_err; mr->mr_device = ep->re_id->device; - ibmr = mr->frwr.fr_mr; + ibmr = mr->mr_ibmr; n = ib_map_mr_sg(ibmr, mr->mr_sg, dma_nents, NULL, PAGE_SIZE); if (n != dma_nents) goto out_mapmr_err; @@ -336,7 +333,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, key = (u8)(ibmr->rkey & 0x000000FF); ib_update_fast_reg_key(ibmr, ++key); - reg_wr = &mr->frwr.fr_regwr; + reg_wr = &mr->mr_regwr; reg_wr->mr = ibmr; reg_wr->key = ibmr->rkey; reg_wr->access = writing ? @@ -364,29 +361,19 @@ out_mapmr_err: * @cq: completion queue * @wc: WCE for a completed FastReg WR * + * Each flushed MR gets destroyed after the QP has drained. */ static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) { struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); + struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe); /* WARNING: Only wr_cqe and status are reliable at this point */ - trace_xprtrdma_wc_fastreg(wc, &frwr->fr_cid); - /* The MR will get recycled when the associated req is retransmitted */ + trace_xprtrdma_wc_fastreg(wc, &mr->mr_cid); rpcrdma_flush_disconnect(cq->cq_context, wc); } -static void frwr_cid_init(struct rpcrdma_ep *ep, - struct rpcrdma_frwr *frwr) -{ - struct rpc_rdma_cid *cid = &frwr->fr_cid; - - cid->ci_queue_id = ep->re_attr.send_cq->res.id; - cid->ci_completion_id = frwr->fr_mr->res.id; -} - /** * frwr_send - post Send WRs containing the RPC Call message * @r_xprt: controlling transport instance @@ -403,27 +390,36 @@ static void frwr_cid_init(struct rpcrdma_ep *ep, */ int frwr_send(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { + struct ib_send_wr *post_wr, *send_wr = &req->rl_wr; struct rpcrdma_ep *ep = r_xprt->rx_ep; - struct ib_send_wr *post_wr; struct rpcrdma_mr *mr; + unsigned int num_wrs; - post_wr = &req->rl_wr; + num_wrs = 1; + post_wr = send_wr; list_for_each_entry(mr, &req->rl_registered, mr_list) { - struct rpcrdma_frwr *frwr; - - frwr = &mr->frwr; - - frwr->fr_cqe.done = frwr_wc_fastreg; - frwr_cid_init(ep, frwr); - frwr->fr_regwr.wr.next = post_wr; - frwr->fr_regwr.wr.wr_cqe = &frwr->fr_cqe; - frwr->fr_regwr.wr.num_sge = 0; - frwr->fr_regwr.wr.opcode = IB_WR_REG_MR; - frwr->fr_regwr.wr.send_flags = 0; + trace_xprtrdma_mr_fastreg(mr); + + mr->mr_cqe.done = frwr_wc_fastreg; + mr->mr_regwr.wr.next = post_wr; + mr->mr_regwr.wr.wr_cqe = &mr->mr_cqe; + mr->mr_regwr.wr.num_sge = 0; + mr->mr_regwr.wr.opcode = IB_WR_REG_MR; + mr->mr_regwr.wr.send_flags = 0; + post_wr = &mr->mr_regwr.wr; + ++num_wrs; + } - post_wr = &frwr->fr_regwr.wr; + if ((kref_read(&req->rl_kref) > 1) || num_wrs > ep->re_send_count) { + send_wr->send_flags |= IB_SEND_SIGNALED; + ep->re_send_count = min_t(unsigned int, ep->re_send_batch, + num_wrs - ep->re_send_count); + } else { + send_wr->send_flags &= ~IB_SEND_SIGNALED; + ep->re_send_count -= num_wrs; } + trace_xprtrdma_post_send(req); return ib_post_send(ep->re_id->qp, post_wr, NULL); } @@ -440,6 +436,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) list_for_each_entry(mr, mrs, mr_list) if (mr->mr_handle == rep->rr_inv_rkey) { list_del_init(&mr->mr_list); + trace_xprtrdma_mr_reminv(mr); frwr_mr_put(mr); break; /* only one invalidated MR per RPC */ } @@ -447,9 +444,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) static void frwr_mr_done(struct ib_wc *wc, struct rpcrdma_mr *mr) { - if (wc->status != IB_WC_SUCCESS) - frwr_mr_recycle(mr); - else + if (likely(wc->status == IB_WC_SUCCESS)) frwr_mr_put(mr); } @@ -462,12 +457,10 @@ static void frwr_mr_done(struct ib_wc *wc, struct rpcrdma_mr *mr) static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) { struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); - struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe); /* WARNING: Only wr_cqe and status are reliable at this point */ - trace_xprtrdma_wc_li(wc, &frwr->fr_cid); + trace_xprtrdma_wc_li(wc, &mr->mr_cid); frwr_mr_done(wc, mr); rpcrdma_flush_disconnect(cq->cq_context, wc); @@ -483,14 +476,12 @@ static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) { struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); - struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe); /* WARNING: Only wr_cqe and status are reliable at this point */ - trace_xprtrdma_wc_li_wake(wc, &frwr->fr_cid); + trace_xprtrdma_wc_li_wake(wc, &mr->mr_cid); frwr_mr_done(wc, mr); - complete(&frwr->fr_linv_done); + complete(&mr->mr_linv_done); rpcrdma_flush_disconnect(cq->cq_context, wc); } @@ -511,7 +502,6 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) struct ib_send_wr *first, **prev, *last; struct rpcrdma_ep *ep = r_xprt->rx_ep; const struct ib_send_wr *bad_wr; - struct rpcrdma_frwr *frwr; struct rpcrdma_mr *mr; int rc; @@ -520,35 +510,34 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * Chain the LOCAL_INV Work Requests and post them with * a single ib_post_send() call. */ - frwr = NULL; prev = &first; while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { trace_xprtrdma_mr_localinv(mr); r_xprt->rx_stats.local_inv_needed++; - frwr = &mr->frwr; - frwr->fr_cqe.done = frwr_wc_localinv; - frwr_cid_init(ep, frwr); - last = &frwr->fr_invwr; + last = &mr->mr_invwr; last->next = NULL; - last->wr_cqe = &frwr->fr_cqe; + last->wr_cqe = &mr->mr_cqe; last->sg_list = NULL; last->num_sge = 0; last->opcode = IB_WR_LOCAL_INV; last->send_flags = IB_SEND_SIGNALED; last->ex.invalidate_rkey = mr->mr_handle; + last->wr_cqe->done = frwr_wc_localinv; + *prev = last; prev = &last->next; } + mr = container_of(last, struct rpcrdma_mr, mr_invwr); /* Strong send queue ordering guarantees that when the * last WR in the chain completes, all WRs in the chain * are complete. */ - frwr->fr_cqe.done = frwr_wc_localinv_wake; - reinit_completion(&frwr->fr_linv_done); + last->wr_cqe->done = frwr_wc_localinv_wake; + reinit_completion(&mr->mr_linv_done); /* Transport disconnect drains the receive CQ before it * replaces the QP. The RPC reply handler won't call us @@ -562,22 +551,12 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * not happen, so don't wait in that case. */ if (bad_wr != first) - wait_for_completion(&frwr->fr_linv_done); + wait_for_completion(&mr->mr_linv_done); if (!rc) return; - /* Recycle MRs in the LOCAL_INV chain that did not get posted. - */ + /* On error, the MRs get destroyed once the QP has drained. */ trace_xprtrdma_post_linv_err(req, rc); - while (bad_wr) { - frwr = container_of(bad_wr, struct rpcrdma_frwr, - fr_invwr); - mr = container_of(frwr, struct rpcrdma_mr, frwr); - bad_wr = bad_wr->next; - - list_del_init(&mr->mr_list); - frwr_mr_recycle(mr); - } } /** @@ -589,20 +568,24 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc) { struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); - struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); - struct rpcrdma_rep *rep = mr->mr_req->rl_reply; + struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe); + struct rpcrdma_rep *rep; /* WARNING: Only wr_cqe and status are reliable at this point */ - trace_xprtrdma_wc_li_done(wc, &frwr->fr_cid); - frwr_mr_done(wc, mr); + trace_xprtrdma_wc_li_done(wc, &mr->mr_cid); - /* Ensure @rep is generated before frwr_mr_done */ + /* Ensure that @rep is generated before the MR is released */ + rep = mr->mr_req->rl_reply; smp_rmb(); - rpcrdma_complete_rqst(rep); - rpcrdma_flush_disconnect(cq->cq_context, wc); + if (wc->status != IB_WC_SUCCESS) { + if (rep) + rpcrdma_unpin_rqst(rep); + rpcrdma_flush_disconnect(cq->cq_context, wc); + return; + } + frwr_mr_put(mr); + rpcrdma_complete_rqst(rep); } /** @@ -619,33 +602,29 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { struct ib_send_wr *first, *last, **prev; struct rpcrdma_ep *ep = r_xprt->rx_ep; - const struct ib_send_wr *bad_wr; - struct rpcrdma_frwr *frwr; struct rpcrdma_mr *mr; int rc; /* Chain the LOCAL_INV Work Requests and post them with * a single ib_post_send() call. */ - frwr = NULL; prev = &first; while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { trace_xprtrdma_mr_localinv(mr); r_xprt->rx_stats.local_inv_needed++; - frwr = &mr->frwr; - frwr->fr_cqe.done = frwr_wc_localinv; - frwr_cid_init(ep, frwr); - last = &frwr->fr_invwr; + last = &mr->mr_invwr; last->next = NULL; - last->wr_cqe = &frwr->fr_cqe; + last->wr_cqe = &mr->mr_cqe; last->sg_list = NULL; last->num_sge = 0; last->opcode = IB_WR_LOCAL_INV; last->send_flags = IB_SEND_SIGNALED; last->ex.invalidate_rkey = mr->mr_handle; + last->wr_cqe->done = frwr_wc_localinv; + *prev = last; prev = &last->next; } @@ -655,31 +634,23 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * are complete. The last completion will wake up the * RPC waiter. */ - frwr->fr_cqe.done = frwr_wc_localinv_done; + last->wr_cqe->done = frwr_wc_localinv_done; /* Transport disconnect drains the receive CQ before it * replaces the QP. The RPC reply handler won't call us * unless re_id->qp is a valid pointer. */ - bad_wr = NULL; - rc = ib_post_send(ep->re_id->qp, first, &bad_wr); + rc = ib_post_send(ep->re_id->qp, first, NULL); if (!rc) return; - /* Recycle MRs in the LOCAL_INV chain that did not get posted. - */ + /* On error, the MRs get destroyed once the QP has drained. */ trace_xprtrdma_post_linv_err(req, rc); - while (bad_wr) { - frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr); - mr = container_of(frwr, struct rpcrdma_mr, frwr); - bad_wr = bad_wr->next; - - frwr_mr_recycle(mr); - } /* The final LOCAL_INV WR in the chain is supposed to - * do the wake. If it was never posted, the wake will - * not happen, so wake here in that case. + * do the wake. If it was never posted, the wake does + * not happen. Unpin the rqst in preparation for its + * retransmission. */ - rpcrdma_complete_rqst(req->rl_reply); + rpcrdma_unpin_rqst(req->rl_reply); } diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 292f066d006e..649f7d8b9733 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1326,9 +1326,35 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, return -EIO; } -/* Perform XID lookup, reconstruction of the RPC reply, and - * RPC completion while holding the transport lock to ensure - * the rep, rqst, and rq_task pointers remain stable. +/** + * rpcrdma_unpin_rqst - Release rqst without completing it + * @rep: RPC/RDMA Receive context + * + * This is done when a connection is lost so that a Reply + * can be dropped and its matching Call can be subsequently + * retransmitted on a new connection. + */ +void rpcrdma_unpin_rqst(struct rpcrdma_rep *rep) +{ + struct rpc_xprt *xprt = &rep->rr_rxprt->rx_xprt; + struct rpc_rqst *rqst = rep->rr_rqst; + struct rpcrdma_req *req = rpcr_to_rdmar(rqst); + + req->rl_reply = NULL; + rep->rr_rqst = NULL; + + spin_lock(&xprt->queue_lock); + xprt_unpin_rqst(rqst); + spin_unlock(&xprt->queue_lock); +} + +/** + * rpcrdma_complete_rqst - Pass completed rqst back to RPC + * @rep: RPC/RDMA Receive context + * + * Reconstruct the RPC reply and complete the transaction + * while @rqst is still pinned to ensure the rep, rqst, and + * rq_task pointers remain stable. */ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) { @@ -1430,13 +1456,14 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) credits = 1; /* don't deadlock */ else if (credits > r_xprt->rx_ep->re_max_requests) credits = r_xprt->rx_ep->re_max_requests; + rpcrdma_post_recvs(r_xprt, credits + (buf->rb_bc_srv_max_requests << 1), + false); if (buf->rb_credits != credits) rpcrdma_update_cwnd(r_xprt, credits); - rpcrdma_post_recvs(r_xprt, false); req = rpcr_to_rdmar(rqst); if (unlikely(req->rl_reply)) - rpcrdma_recv_buffer_put(req->rl_reply); + rpcrdma_rep_put(buf, req->rl_reply); req->rl_reply = rep; rep->rr_rqst = rqst; @@ -1464,5 +1491,5 @@ out_shortreply: trace_xprtrdma_reply_short_err(rep); out: - rpcrdma_recv_buffer_put(rep); + rpcrdma_rep_put(buf, rep); } diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 78d29d1bcc20..09953597d055 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -262,8 +262,10 @@ xprt_rdma_connect_worker(struct work_struct *work) * xprt_rdma_inject_disconnect - inject a connection fault * @xprt: transport context * - * If @xprt is connected, disconnect it to simulate spurious connection - * loss. + * If @xprt is connected, disconnect it to simulate spurious + * connection loss. Caller must hold @xprt's send lock to + * ensure that data structures and hardware resources are + * stable during the rdma_disconnect() call. */ static void xprt_rdma_inject_disconnect(struct rpc_xprt *xprt) diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index ec912cf9c618..1e965a380896 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -101,6 +101,12 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt) struct rpcrdma_ep *ep = r_xprt->rx_ep; struct rdma_cm_id *id = ep->re_id; + /* Wait for rpcrdma_post_recvs() to leave its critical + * section. + */ + if (atomic_inc_return(&ep->re_receiving) > 1) + wait_for_completion(&ep->re_done); + /* Flush Receives, then wait for deferred Reply work * to complete. */ @@ -114,22 +120,6 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt) rpcrdma_ep_put(ep); } -/** - * rpcrdma_qp_event_handler - Handle one QP event (error notification) - * @event: details of the event - * @context: ep that owns QP where event occurred - * - * Called from the RDMA provider (device driver) possibly in an interrupt - * context. The QP is always destroyed before the ID, so the ID will be - * reliably available when this handler is invoked. - */ -static void rpcrdma_qp_event_handler(struct ib_event *event, void *context) -{ - struct rpcrdma_ep *ep = context; - - trace_xprtrdma_qp_event(ep, event); -} - /* Ensure xprt_force_disconnect() is invoked exactly once when a * connection is closed or lost. (The important thing is it needs * to be invoked "at least" once). @@ -205,7 +195,7 @@ static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) out_flushed: rpcrdma_flush_disconnect(r_xprt, wc); - rpcrdma_rep_destroy(rep); + rpcrdma_rep_put(&r_xprt->rx_buf, rep); } static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep, @@ -414,6 +404,7 @@ static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt) __module_get(THIS_MODULE); device = id->device; ep->re_id = id; + reinit_completion(&ep->re_done); ep->re_max_requests = r_xprt->rx_xprt.max_reqs; ep->re_inline_send = xprt_rdma_max_inline_write; @@ -424,8 +415,6 @@ static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt) r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests); - ep->re_attr.event_handler = rpcrdma_qp_event_handler; - ep->re_attr.qp_context = ep; ep->re_attr.srq = NULL; ep->re_attr.cap.max_inline_data = 0; ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR; @@ -535,7 +524,7 @@ int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt) * outstanding Receives. */ rpcrdma_ep_get(ep); - rpcrdma_post_recvs(r_xprt, true); + rpcrdma_post_recvs(r_xprt, 1, true); rc = rdma_connect(ep->re_id, &ep->re_remote_cma); if (rc) @@ -954,13 +943,11 @@ static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt) rpcrdma_req_reset(req); } -/* No locking needed here. This function is called only by the - * Receive completion handler. - */ static noinline struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) { + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_rep *rep; rep = kzalloc(sizeof(*rep), GFP_KERNEL); @@ -987,7 +974,10 @@ struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; rep->rr_recv_wr.num_sge = 1; rep->rr_temp = temp; - list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps); + + spin_lock(&buf->rb_lock); + list_add(&rep->rr_all, &buf->rb_all_reps); + spin_unlock(&buf->rb_lock); return rep; out_free_regbuf: @@ -998,16 +988,23 @@ out: return NULL; } -/* No locking needed here. This function is invoked only by the - * Receive completion handler, or during transport shutdown. - */ -static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep) +static void rpcrdma_rep_free(struct rpcrdma_rep *rep) { - list_del(&rep->rr_all); rpcrdma_regbuf_free(rep->rr_rdmabuf); kfree(rep); } +static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep) +{ + struct rpcrdma_buffer *buf = &rep->rr_rxprt->rx_buf; + + spin_lock(&buf->rb_lock); + list_del(&rep->rr_all); + spin_unlock(&buf->rb_lock); + + rpcrdma_rep_free(rep); +} + static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf) { struct llist_node *node; @@ -1019,12 +1016,21 @@ static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf) return llist_entry(node, struct rpcrdma_rep, rr_node); } -static void rpcrdma_rep_put(struct rpcrdma_buffer *buf, - struct rpcrdma_rep *rep) +/** + * rpcrdma_rep_put - Release rpcrdma_rep back to free list + * @buf: buffer pool + * @rep: rep to release + * + */ +void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep) { llist_add(&rep->rr_node, &buf->rb_free_reps); } +/* Caller must ensure the QP is quiescent (RQ is drained) before + * invoking this function, to guarantee rb_all_reps is not + * changing. + */ static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; @@ -1032,7 +1038,7 @@ static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt) list_for_each_entry(rep, &buf->rb_all_reps, rr_all) { rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf); - rep->rr_temp = true; + rep->rr_temp = true; /* Mark this rep for destruction */ } } @@ -1040,8 +1046,18 @@ static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf) { struct rpcrdma_rep *rep; - while ((rep = rpcrdma_rep_get_locked(buf)) != NULL) - rpcrdma_rep_destroy(rep); + spin_lock(&buf->rb_lock); + while ((rep = list_first_entry_or_null(&buf->rb_all_reps, + struct rpcrdma_rep, + rr_all)) != NULL) { + list_del(&rep->rr_all); + spin_unlock(&buf->rb_lock); + + rpcrdma_rep_free(rep); + + spin_lock(&buf->rb_lock); + } + spin_unlock(&buf->rb_lock); } /** @@ -1104,7 +1120,7 @@ void rpcrdma_req_destroy(struct rpcrdma_req *req) list_del(&mr->mr_all); spin_unlock(&buf->rb_lock); - frwr_release_mr(mr); + frwr_mr_release(mr); } rpcrdma_regbuf_free(req->rl_recvbuf); @@ -1135,7 +1151,7 @@ static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt) list_del(&mr->mr_all); spin_unlock(&buf->rb_lock); - frwr_release_mr(mr); + frwr_mr_release(mr); spin_lock(&buf->rb_lock); } @@ -1221,17 +1237,6 @@ void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req) spin_unlock(&buffers->rb_lock); } -/** - * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list - * @rep: rep to release - * - * Used after error conditions. - */ -void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) -{ - rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep); -} - /* Returns a pointer to a rpcrdma_regbuf object, or NULL. * * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for @@ -1342,21 +1347,7 @@ static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb) */ int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { - struct ib_send_wr *send_wr = &req->rl_wr; - struct rpcrdma_ep *ep = r_xprt->rx_ep; - int rc; - - if (!ep->re_send_count || kref_read(&req->rl_kref) > 1) { - send_wr->send_flags |= IB_SEND_SIGNALED; - ep->re_send_count = ep->re_send_batch; - } else { - send_wr->send_flags &= ~IB_SEND_SIGNALED; - --ep->re_send_count; - } - - trace_xprtrdma_post_send(req); - rc = frwr_send(r_xprt, req); - if (rc) + if (frwr_send(r_xprt, req)) return -ENOTCONN; return 0; } @@ -1364,27 +1355,30 @@ int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) /** * rpcrdma_post_recvs - Refill the Receive Queue * @r_xprt: controlling transport instance - * @temp: mark Receive buffers to be deleted after use + * @needed: current credit grant + * @temp: mark Receive buffers to be deleted after one use * */ -void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) +void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ep *ep = r_xprt->rx_ep; struct ib_recv_wr *wr, *bad_wr; struct rpcrdma_rep *rep; - int needed, count, rc; + int count, rc; rc = 0; count = 0; - needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1); if (likely(ep->re_receive_count > needed)) goto out; needed -= ep->re_receive_count; if (!temp) needed += RPCRDMA_MAX_RECV_BATCH; + if (atomic_inc_return(&ep->re_receiving) > 1) + goto out; + /* fast path: all needed reps can be found on the free list */ wr = NULL; while (needed) { @@ -1410,6 +1404,9 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) rc = ib_post_recv(ep->re_id->qp, wr, (const struct ib_recv_wr **)&bad_wr); + if (atomic_dec_return(&ep->re_receiving) > 0) + complete(&ep->re_done); + out: trace_xprtrdma_post_recvs(r_xprt, count, rc); if (rc) { @@ -1418,7 +1415,7 @@ out: rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr); wr = wr->next; - rpcrdma_recv_buffer_put(rep); + rpcrdma_rep_put(buf, rep); --count; } } diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index fe3be985e239..436ad7312614 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -83,6 +83,7 @@ struct rpcrdma_ep { unsigned int re_max_inline_recv; int re_async_rc; int re_connect_status; + atomic_t re_receiving; atomic_t re_force_disconnect; struct ib_qp_init_attr re_attr; wait_queue_head_t re_connect_wait; @@ -228,31 +229,28 @@ struct rpcrdma_sendctx { * An external memory region is any buffer or page that is registered * on the fly (ie, not pre-registered). */ -struct rpcrdma_frwr { - struct ib_mr *fr_mr; - struct ib_cqe fr_cqe; - struct rpc_rdma_cid fr_cid; - struct completion fr_linv_done; - union { - struct ib_reg_wr fr_regwr; - struct ib_send_wr fr_invwr; - }; -}; - struct rpcrdma_req; struct rpcrdma_mr { struct list_head mr_list; struct rpcrdma_req *mr_req; + + struct ib_mr *mr_ibmr; struct ib_device *mr_device; struct scatterlist *mr_sg; int mr_nents; enum dma_data_direction mr_dir; - struct rpcrdma_frwr frwr; + struct ib_cqe mr_cqe; + struct completion mr_linv_done; + union { + struct ib_reg_wr mr_regwr; + struct ib_send_wr mr_invwr; + }; struct rpcrdma_xprt *mr_xprt; u32 mr_handle; u32 mr_length; u64 mr_offset; struct list_head mr_all; + struct rpc_rdma_cid mr_cid; }; /* @@ -461,7 +459,7 @@ int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt); void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt); int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); -void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp); +void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp); /* * Buffer calls - xprtrdma/verbs.c @@ -480,7 +478,7 @@ void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt); struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req); -void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); +void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep); bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags); @@ -527,7 +525,7 @@ rpcrdma_data_dir(bool writing) void frwr_reset(struct rpcrdma_req *req); int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device); int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr); -void frwr_release_mr(struct rpcrdma_mr *mr); +void frwr_mr_release(struct rpcrdma_mr *mr); struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, int nsegs, bool writing, __be32 xid, @@ -560,6 +558,7 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst); void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep); void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt); void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); +void rpcrdma_unpin_rqst(struct rpcrdma_rep *rep); void rpcrdma_reply_handler(struct rpcrdma_rep *rep); static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index e35760f238a4..47aa47a2b07c 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -558,6 +558,10 @@ xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags) struct rpc_rqst *req; ssize_t ret; + /* Is this transport associated with the backchannel? */ + if (!xprt->bc_serv) + return -ESHUTDOWN; + /* Look up and lock the request corresponding to the given XID */ req = xprt_lookup_bc_request(xprt, transport->recv.xid); if (!req) { @@ -1018,6 +1022,7 @@ static int xs_tcp_send_request(struct rpc_rqst *req) * to cope with writespace callbacks arriving _after_ we have * called sendmsg(). */ req->rq_xtime = ktime_get(); + tcp_sock_set_cork(transport->inet, true); while (1) { status = xprt_sock_sendmsg(transport->sock, &msg, xdr, transport->xmit.offset, rm, &sent); @@ -1032,6 +1037,8 @@ static int xs_tcp_send_request(struct rpc_rqst *req) if (likely(req->rq_bytes_sent >= msglen)) { req->rq_xmit_bytes_sent += transport->xmit.offset; transport->xmit.offset = 0; + if (atomic_long_read(&xprt->xmit_queuelen) == 1) + tcp_sock_set_cork(transport->inet, false); return 0; } @@ -2163,6 +2170,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) } xs_tcp_set_socket_timeouts(xprt, sock); + tcp_sock_set_nodelay(sk); write_lock_bh(&sk->sk_callback_lock); @@ -2177,7 +2185,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) /* socket options */ sock_reset_flag(sk, SOCK_LINGER); - tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF; xprt_clear_connected(xprt); |