diff options
Diffstat (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c')
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 363 |
1 files changed, 225 insertions, 138 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index f1889f4d4803..ed34dc0f144c 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1,4 +1,5 @@ /* + * Copyright (c) 2014-2017 Oracle. All rights reserved. * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. * * This software is available to you under a choice of one of two @@ -75,11 +76,11 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs) /* Maximum Read list size */ maxsegs += 2; /* segment for head and tail buffers */ - size = maxsegs * sizeof(struct rpcrdma_read_chunk); + size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32); /* Minimal Read chunk size */ size += sizeof(__be32); /* segment count */ - size += sizeof(struct rpcrdma_segment); + size += rpcrdma_segment_maxsz * sizeof(__be32); size += sizeof(__be32); /* list discriminator */ dprintk("RPC: %s: max call header size = %u\n", @@ -102,7 +103,7 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs) /* Maximum Write list size */ maxsegs += 2; /* segment for head and tail buffers */ size = sizeof(__be32); /* segment count */ - size += maxsegs * sizeof(struct rpcrdma_segment); + size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32); size += sizeof(__be32); /* list discriminator */ dprintk("RPC: %s: max reply header size = %u\n", @@ -511,27 +512,60 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, return 0; } -/* Prepare the RPC-over-RDMA header SGE. +/** + * rpcrdma_unmap_sendctx - DMA-unmap Send buffers + * @sc: sendctx containing SGEs to unmap + * + */ +void +rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc) +{ + struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia; + struct ib_sge *sge; + unsigned int count; + + dprintk("RPC: %s: unmapping %u sges for sc=%p\n", + __func__, sc->sc_unmap_count, sc); + + /* The first two SGEs contain the transport header and + * the inline buffer. These are always left mapped so + * they can be cheaply re-used. + */ + sge = &sc->sc_sges[2]; + for (count = sc->sc_unmap_count; count; ++sge, --count) + ib_dma_unmap_page(ia->ri_device, + sge->addr, sge->length, DMA_TO_DEVICE); + + if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) { + smp_mb__after_atomic(); + wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES); + } +} + +/* Prepare an SGE for the RPC-over-RDMA transport header. */ static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req, u32 len) { + struct rpcrdma_sendctx *sc = req->rl_sendctx; struct rpcrdma_regbuf *rb = req->rl_rdmabuf; - struct ib_sge *sge = &req->rl_send_sge[0]; + struct ib_sge *sge = sc->sc_sges; - if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) { - if (!__rpcrdma_dma_map_regbuf(ia, rb)) - return false; - sge->addr = rdmab_addr(rb); - sge->lkey = rdmab_lkey(rb); - } + if (!rpcrdma_dma_map_regbuf(ia, rb)) + goto out_regbuf; + sge->addr = rdmab_addr(rb); sge->length = len; + sge->lkey = rdmab_lkey(rb); ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length, DMA_TO_DEVICE); - req->rl_send_wr.num_sge++; + sc->sc_wr.num_sge++; return true; + +out_regbuf: + pr_err("rpcrdma: failed to DMA map a Send buffer\n"); + return false; } /* Prepare the Send SGEs. The head and tail iovec, and each entry @@ -541,10 +575,11 @@ static bool rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) { + struct rpcrdma_sendctx *sc = req->rl_sendctx; unsigned int sge_no, page_base, len, remaining; struct rpcrdma_regbuf *rb = req->rl_sendbuf; struct ib_device *device = ia->ri_device; - struct ib_sge *sge = req->rl_send_sge; + struct ib_sge *sge = sc->sc_sges; u32 lkey = ia->ri_pd->local_dma_lkey; struct page *page, **ppages; @@ -552,7 +587,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, * DMA-mapped. Sync the content that has changed. */ if (!rpcrdma_dma_map_regbuf(ia, rb)) - return false; + goto out_regbuf; sge_no = 1; sge[sge_no].addr = rdmab_addr(rb); sge[sge_no].length = xdr->head[0].iov_len; @@ -607,7 +642,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, sge[sge_no].length = len; sge[sge_no].lkey = lkey; - req->rl_mapped_sges++; + sc->sc_unmap_count++; ppages++; remaining -= len; page_base = 0; @@ -633,56 +668,61 @@ map_tail: goto out_mapping_err; sge[sge_no].length = len; sge[sge_no].lkey = lkey; - req->rl_mapped_sges++; + sc->sc_unmap_count++; } out: - req->rl_send_wr.num_sge = sge_no + 1; + sc->sc_wr.num_sge += sge_no; + if (sc->sc_unmap_count) + __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); return true; +out_regbuf: + pr_err("rpcrdma: failed to DMA map a Send buffer\n"); + return false; + out_mapping_overflow: + rpcrdma_unmap_sendctx(sc); pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no); return false; out_mapping_err: + rpcrdma_unmap_sendctx(sc); pr_err("rpcrdma: Send mapping error\n"); return false; } -bool -rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, - u32 hdrlen, struct xdr_buf *xdr, - enum rpcrdma_chunktype rtype) +/** + * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR + * @r_xprt: controlling transport + * @req: context of RPC Call being marshalled + * @hdrlen: size of transport header, in bytes + * @xdr: xdr_buf containing RPC Call + * @rtype: chunk type being encoded + * + * Returns 0 on success; otherwise a negative errno is returned. + */ +int +rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, u32 hdrlen, + struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) { - req->rl_send_wr.num_sge = 0; - req->rl_mapped_sges = 0; - - if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen)) - goto out_map; + req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf); + if (!req->rl_sendctx) + return -ENOBUFS; + req->rl_sendctx->sc_wr.num_sge = 0; + req->rl_sendctx->sc_unmap_count = 0; + req->rl_sendctx->sc_req = req; + __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); + + if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen)) + return -EIO; if (rtype != rpcrdma_areadch) - if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype)) - goto out_map; - - return true; - -out_map: - pr_err("rpcrdma: failed to DMA map a Send buffer\n"); - return false; -} - -void -rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req) -{ - struct ib_device *device = ia->ri_device; - struct ib_sge *sge; - int count; + if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype)) + return -EIO; - sge = &req->rl_send_sge[2]; - for (count = req->rl_mapped_sges; count--; sge++) - ib_dma_unmap_page(device, sge->addr, sge->length, - DMA_TO_DEVICE); - req->rl_mapped_sges = 0; + return 0; } /** @@ -833,12 +873,10 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) transfertypes[rtype], transfertypes[wtype], xdr_stream_pos(xdr)); - if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, - xdr_stream_pos(xdr), - &rqst->rq_snd_buf, rtype)) { - ret = -EIO; + ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr), + &rqst->rq_snd_buf, rtype); + if (ret) goto out_err; - } return 0; out_err: @@ -970,14 +1008,13 @@ rpcrdma_mark_remote_invalidation(struct list_head *mws, * straightforward to check the RPC header's direction field. */ static bool -rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, - __be32 xid, __be32 proc) +rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep) #if defined(CONFIG_SUNRPC_BACKCHANNEL) { struct xdr_stream *xdr = &rep->rr_stream; __be32 *p; - if (proc != rdma_msg) + if (rep->rr_proc != rdma_msg) return false; /* Peek at stream contents without advancing. */ @@ -992,7 +1029,7 @@ rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, return false; /* RPC header */ - if (*p++ != xid) + if (*p++ != rep->rr_xid) return false; if (*p != cpu_to_be32(RPC_CALL)) return false; @@ -1212,105 +1249,170 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, return -EREMOTEIO; } +/* Perform XID lookup, reconstruction of the RPC reply, and + * RPC completion while holding the transport lock to ensure + * the rep, rqst, and rq_task pointers remain stable. + */ +void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) +{ + struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; + struct rpc_xprt *xprt = &r_xprt->rx_xprt; + struct rpc_rqst *rqst = rep->rr_rqst; + unsigned long cwnd; + int status; + + xprt->reestablish_timeout = 0; + + switch (rep->rr_proc) { + case rdma_msg: + status = rpcrdma_decode_msg(r_xprt, rep, rqst); + break; + case rdma_nomsg: + status = rpcrdma_decode_nomsg(r_xprt, rep); + break; + case rdma_error: + status = rpcrdma_decode_error(r_xprt, rep, rqst); + break; + default: + status = -EIO; + } + if (status < 0) + goto out_badheader; + +out: + spin_lock(&xprt->recv_lock); + cwnd = xprt->cwnd; + xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT; + if (xprt->cwnd > cwnd) + xprt_release_rqst_cong(rqst->rq_task); + + xprt_complete_rqst(rqst->rq_task, status); + xprt_unpin_rqst(rqst); + spin_unlock(&xprt->recv_lock); + return; + +/* If the incoming reply terminated a pending RPC, the next + * RPC call will post a replacement receive buffer as it is + * being marshaled. + */ +out_badheader: + dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n", + rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc)); + r_xprt->rx_stats.bad_reply_count++; + status = -EIO; + goto out; +} + +void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) +{ + /* Invalidate and unmap the data payloads before waking + * the waiting application. This guarantees the memory + * regions are properly fenced from the server before the + * application accesses the data. It also ensures proper + * send flow control: waking the next RPC waits until this + * RPC has relinquished all its Send Queue entries. + */ + if (!list_empty(&req->rl_registered)) + r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, + &req->rl_registered); + + /* Ensure that any DMA mapped pages associated with + * the Send of the RPC Call have been unmapped before + * allowing the RPC to complete. This protects argument + * memory not controlled by the RPC client from being + * re-used before we're done with it. + */ + if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { + r_xprt->rx_stats.reply_waits_for_send++; + out_of_line_wait_on_bit(&req->rl_flags, + RPCRDMA_REQ_F_TX_RESOURCES, + bit_wait, + TASK_UNINTERRUPTIBLE); + } +} + +/* Reply handling runs in the poll worker thread. Anything that + * might wait is deferred to a separate workqueue. + */ +void rpcrdma_deferred_completion(struct work_struct *work) +{ + struct rpcrdma_rep *rep = + container_of(work, struct rpcrdma_rep, rr_work); + struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst); + + rpcrdma_mark_remote_invalidation(&req->rl_registered, rep); + rpcrdma_release_rqst(rep->rr_rxprt, req); + rpcrdma_complete_rqst(rep); +} + /* Process received RPC/RDMA messages. * * Errors must result in the RPC task either being awakened, or * allowed to timeout, to discover the errors at that time. */ -void -rpcrdma_reply_handler(struct work_struct *work) +void rpcrdma_reply_handler(struct rpcrdma_rep *rep) { - struct rpcrdma_rep *rep = - container_of(work, struct rpcrdma_rep, rr_work); struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; struct rpc_xprt *xprt = &r_xprt->rx_xprt; - struct xdr_stream *xdr = &rep->rr_stream; + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_req *req; struct rpc_rqst *rqst; - __be32 *p, xid, vers, proc; - unsigned long cwnd; - int status; + u32 credits; + __be32 *p; dprintk("RPC: %s: incoming rep %p\n", __func__, rep); if (rep->rr_hdrbuf.head[0].iov_len == 0) goto out_badstatus; - xdr_init_decode(xdr, &rep->rr_hdrbuf, + xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf, rep->rr_hdrbuf.head[0].iov_base); /* Fixed transport header fields */ - p = xdr_inline_decode(xdr, 4 * sizeof(*p)); + p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p)); if (unlikely(!p)) goto out_shortreply; - xid = *p++; - vers = *p++; - p++; /* credits */ - proc = *p++; + rep->rr_xid = *p++; + rep->rr_vers = *p++; + credits = be32_to_cpu(*p++); + rep->rr_proc = *p++; + + if (rep->rr_vers != rpcrdma_version) + goto out_badversion; - if (rpcrdma_is_bcall(r_xprt, rep, xid, proc)) + if (rpcrdma_is_bcall(r_xprt, rep)) return; /* Match incoming rpcrdma_rep to an rpcrdma_req to * get context for handling any incoming chunks. */ spin_lock(&xprt->recv_lock); - rqst = xprt_lookup_rqst(xprt, xid); + rqst = xprt_lookup_rqst(xprt, rep->rr_xid); if (!rqst) goto out_norqst; xprt_pin_rqst(rqst); + + if (credits == 0) + credits = 1; /* don't deadlock */ + else if (credits > buf->rb_max_requests) + credits = buf->rb_max_requests; + buf->rb_credits = credits; + spin_unlock(&xprt->recv_lock); + req = rpcr_to_rdmar(rqst); req->rl_reply = rep; + rep->rr_rqst = rqst; + clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags); dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n", - __func__, rep, req, be32_to_cpu(xid)); - - /* Invalidate and unmap the data payloads before waking the - * waiting application. This guarantees the memory regions - * are properly fenced from the server before the application - * accesses the data. It also ensures proper send flow control: - * waking the next RPC waits until this RPC has relinquished - * all its Send Queue entries. - */ - if (!list_empty(&req->rl_registered)) { - rpcrdma_mark_remote_invalidation(&req->rl_registered, rep); - r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, - &req->rl_registered); - } - - xprt->reestablish_timeout = 0; - if (vers != rpcrdma_version) - goto out_badversion; + __func__, rep, req, be32_to_cpu(rep->rr_xid)); - switch (proc) { - case rdma_msg: - status = rpcrdma_decode_msg(r_xprt, rep, rqst); - break; - case rdma_nomsg: - status = rpcrdma_decode_nomsg(r_xprt, rep); - break; - case rdma_error: - status = rpcrdma_decode_error(r_xprt, rep, rqst); - break; - default: - status = -EIO; - } - if (status < 0) - goto out_badheader; - -out: - spin_lock(&xprt->recv_lock); - cwnd = xprt->cwnd; - xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT; - if (xprt->cwnd > cwnd) - xprt_release_rqst_cong(rqst->rq_task); - - xprt_complete_rqst(rqst->rq_task, status); - xprt_unpin_rqst(rqst); - spin_unlock(&xprt->recv_lock); - dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", - __func__, xprt, rqst, status); + if (list_empty(&req->rl_registered) && + !test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) + rpcrdma_complete_rqst(rep); + else + queue_work(rpcrdma_receive_wq, &rep->rr_work); return; out_badstatus: @@ -1321,37 +1423,22 @@ out_badstatus: } return; -/* If the incoming reply terminated a pending RPC, the next - * RPC call will post a replacement receive buffer as it is - * being marshaled. - */ out_badversion: dprintk("RPC: %s: invalid version %d\n", - __func__, be32_to_cpu(vers)); - status = -EIO; - r_xprt->rx_stats.bad_reply_count++; - goto out; - -out_badheader: - dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n", - rqst->rq_task->tk_pid, __func__, be32_to_cpu(proc)); - r_xprt->rx_stats.bad_reply_count++; - status = -EIO; - goto out; + __func__, be32_to_cpu(rep->rr_vers)); + goto repost; -/* The req was still available, but by the time the recv_lock - * was acquired, the rqst and task had been released. Thus the RPC - * has already been terminated. +/* The RPC transaction has already been terminated, or the header + * is corrupt. */ out_norqst: spin_unlock(&xprt->recv_lock); dprintk("RPC: %s: no match for incoming xid 0x%08x\n", - __func__, be32_to_cpu(xid)); + __func__, be32_to_cpu(rep->rr_xid)); goto repost; out_shortreply: dprintk("RPC: %s: short/invalid reply\n", __func__); - goto repost; /* If no pending RPC transaction was matched, post a replacement * receive buffer before returning. |