diff options
Diffstat (limited to 'net/sunrpc/xprtrdma/svc_rdma_rw.c')
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_rw.c | 245 |
1 files changed, 167 insertions, 78 deletions
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c index c00fcce61d1e..f2a100c4c81f 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_rw.c +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c @@ -197,28 +197,6 @@ void svc_rdma_cc_release(struct svcxprt_rdma *rdma, llist_add_batch(first, last, &rdma->sc_rw_ctxts); } -/* State for sending a Write or Reply chunk. - * - Tracks progress of writing one chunk over all its segments - * - Stores arguments for the SGL constructor functions - */ -struct svc_rdma_write_info { - struct svcxprt_rdma *wi_rdma; - - const struct svc_rdma_chunk *wi_chunk; - - /* write state of this chunk */ - unsigned int wi_seg_off; - unsigned int wi_seg_no; - - /* SGL constructor arguments */ - const struct xdr_buf *wi_xdr; - unsigned char *wi_base; - unsigned int wi_next_off; - - struct svc_rdma_chunk_ctxt wi_cc; - struct work_struct wi_work; -}; - static struct svc_rdma_write_info * svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, const struct svc_rdma_chunk *chunk) @@ -253,6 +231,71 @@ static void svc_rdma_write_info_free(struct svc_rdma_write_info *info) } /** + * svc_rdma_write_chunk_release - Release Write chunk I/O resources + * @rdma: controlling transport + * @ctxt: Send context that is being released + */ +void svc_rdma_write_chunk_release(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt) +{ + struct svc_rdma_write_info *info; + struct svc_rdma_chunk_ctxt *cc; + + while (!list_empty(&ctxt->sc_write_info_list)) { + info = list_first_entry(&ctxt->sc_write_info_list, + struct svc_rdma_write_info, wi_list); + list_del(&info->wi_list); + + cc = &info->wi_cc; + svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount); + svc_rdma_write_info_free(info); + } +} + +/** + * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources + * @rdma: controlling transport + * @ctxt: Send context that is being released + */ +void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt) +{ + struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc; + + if (!cc->cc_sqecount) + return; + svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE); +} + +/** + * svc_rdma_reply_done - Reply chunk Write completion handler + * @cq: controlling Completion Queue + * @wc: Work Completion report + * + * Pages under I/O are released by a subsequent Send completion. + */ +static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct svc_rdma_chunk_ctxt *cc = + container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe); + struct svcxprt_rdma *rdma = cq->cq_context; + + switch (wc->status) { + case IB_WC_SUCCESS: + trace_svcrdma_wc_reply(&cc->cc_cid); + return; + case IB_WC_WR_FLUSH_ERR: + trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid); + break; + default: + trace_svcrdma_wc_reply_err(wc, &cc->cc_cid); + } + + svc_xprt_deferred_close(&rdma->sc_xprt); +} + +/** * svc_rdma_write_done - Write chunk completion * @cq: controlling Completion Queue * @wc: Work Completion @@ -265,13 +308,11 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) struct ib_cqe *cqe = wc->wr_cqe; struct svc_rdma_chunk_ctxt *cc = container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe); - struct svc_rdma_write_info *info = - container_of(cc, struct svc_rdma_write_info, wi_cc); switch (wc->status) { case IB_WC_SUCCESS: trace_svcrdma_wc_write(&cc->cc_cid); - break; + return; case IB_WC_WR_FLUSH_ERR: trace_svcrdma_wc_write_flush(wc, &cc->cc_cid); break; @@ -279,12 +320,11 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) trace_svcrdma_wc_write_err(wc, &cc->cc_cid); } - svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount); - - if (unlikely(wc->status != IB_WC_SUCCESS)) - svc_xprt_deferred_close(&rdma->sc_xprt); - - svc_rdma_write_info_free(info); + /* The RDMA Write has flushed, so the client won't get + * some of the outgoing RPC message. Signal the loss + * to the client by closing the connection. + */ + svc_xprt_deferred_close(&rdma->sc_xprt); } /** @@ -580,41 +620,54 @@ static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data) return xdr->len; } -/** - * svc_rdma_send_write_chunk - Write all segments in a Write chunk - * @rdma: controlling RDMA transport - * @chunk: Write chunk provided by the client - * @xdr: xdr_buf containing the data payload - * - * Returns a non-negative number of bytes the chunk consumed, or - * %-E2BIG if the payload was larger than the Write chunk, - * %-EINVAL if client provided too many segments, - * %-ENOMEM if rdma_rw context pool was exhausted, - * %-ENOTCONN if posting failed (connection is lost), - * %-EIO if rdma_rw initialization failed (DMA mapping, etc). +/* Link Write WRs for @chunk onto @sctxt's WR chain. */ -int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, - const struct svc_rdma_chunk *chunk, - const struct xdr_buf *xdr) +static int svc_rdma_prepare_write_chunk(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *sctxt, + const struct svc_rdma_chunk *chunk, + const struct xdr_buf *xdr) { struct svc_rdma_write_info *info; struct svc_rdma_chunk_ctxt *cc; + struct ib_send_wr *first_wr; + struct xdr_buf payload; + struct list_head *pos; + struct ib_cqe *cqe; int ret; + if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position, + chunk->ch_payload_length)) + return -EMSGSIZE; + info = svc_rdma_write_info_alloc(rdma, chunk); if (!info) return -ENOMEM; cc = &info->wi_cc; - ret = svc_rdma_xb_write(xdr, info); - if (ret != xdr->len) + ret = svc_rdma_xb_write(&payload, info); + if (ret != payload.len) goto out_err; - trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount); - ret = svc_rdma_post_chunk_ctxt(rdma, cc); - if (ret < 0) + ret = -EINVAL; + if (unlikely(cc->cc_sqecount > rdma->sc_sq_depth)) goto out_err; - return xdr->len; + + first_wr = sctxt->sc_wr_chain; + cqe = &cc->cc_cqe; + list_for_each(pos, &cc->cc_rwctxts) { + struct svc_rdma_rw_ctxt *rwc; + + rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list); + first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp, + rdma->sc_port_num, cqe, first_wr); + cqe = NULL; + } + sctxt->sc_wr_chain = first_wr; + sctxt->sc_sqecount += cc->cc_sqecount; + list_add(&info->wi_list, &sctxt->sc_write_info_list); + + trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount); + return 0; out_err: svc_rdma_write_info_free(info); @@ -622,9 +675,39 @@ out_err: } /** - * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk + * svc_rdma_prepare_write_list - Construct WR chain for sending Write list * @rdma: controlling RDMA transport - * @rctxt: Write and Reply chunks from client + * @write_pcl: Write list provisioned by the client + * @sctxt: Send WR resources + * @xdr: xdr_buf containing an RPC Reply message + * + * Returns zero on success, or a negative errno if one or more + * Write chunks could not be sent. + */ +int svc_rdma_prepare_write_list(struct svcxprt_rdma *rdma, + const struct svc_rdma_pcl *write_pcl, + struct svc_rdma_send_ctxt *sctxt, + const struct xdr_buf *xdr) +{ + struct svc_rdma_chunk *chunk; + int ret; + + pcl_for_each_chunk(chunk, write_pcl) { + if (!chunk->ch_payload_length) + break; + ret = svc_rdma_prepare_write_chunk(rdma, sctxt, chunk, xdr); + if (ret < 0) + return ret; + } + return 0; +} + +/** + * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk + * @rdma: controlling RDMA transport + * @write_pcl: Write chunk list provided by client + * @reply_pcl: Reply chunk provided by client + * @sctxt: Send WR resources * @xdr: xdr_buf containing an RPC Reply * * Returns a non-negative number of bytes the chunk consumed, or @@ -634,39 +717,45 @@ out_err: * %-ENOTCONN if posting failed (connection is lost), * %-EIO if rdma_rw initialization failed (DMA mapping, etc). */ -int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, - const struct svc_rdma_recv_ctxt *rctxt, - const struct xdr_buf *xdr) +int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma, + const struct svc_rdma_pcl *write_pcl, + const struct svc_rdma_pcl *reply_pcl, + struct svc_rdma_send_ctxt *sctxt, + const struct xdr_buf *xdr) { - struct svc_rdma_write_info *info; - struct svc_rdma_chunk_ctxt *cc; - struct svc_rdma_chunk *chunk; + struct svc_rdma_write_info *info = &sctxt->sc_reply_info; + struct svc_rdma_chunk_ctxt *cc = &info->wi_cc; + struct ib_send_wr *first_wr; + struct list_head *pos; + struct ib_cqe *cqe; int ret; - if (pcl_is_empty(&rctxt->rc_reply_pcl)) - return 0; - - chunk = pcl_first_chunk(&rctxt->rc_reply_pcl); - info = svc_rdma_write_info_alloc(rdma, chunk); - if (!info) - return -ENOMEM; - cc = &info->wi_cc; + info->wi_rdma = rdma; + info->wi_chunk = pcl_first_chunk(reply_pcl); + info->wi_seg_off = 0; + info->wi_seg_no = 0; + info->wi_cc.cc_cqe.done = svc_rdma_reply_done; - ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr, + ret = pcl_process_nonpayloads(write_pcl, xdr, svc_rdma_xb_write, info); if (ret < 0) - goto out_err; + return ret; - trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount); - ret = svc_rdma_post_chunk_ctxt(rdma, cc); - if (ret < 0) - goto out_err; + first_wr = sctxt->sc_wr_chain; + cqe = &cc->cc_cqe; + list_for_each(pos, &cc->cc_rwctxts) { + struct svc_rdma_rw_ctxt *rwc; - return xdr->len; + rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list); + first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp, + rdma->sc_port_num, cqe, first_wr); + cqe = NULL; + } + sctxt->sc_wr_chain = first_wr; + sctxt->sc_sqecount += cc->cc_sqecount; -out_err: - svc_rdma_write_info_free(info); - return ret; + trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount); + return xdr->len; } /** |