summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c6
-rw-r--r--net/sunrpc/xprtrdma/transport.c146
-rw-r--r--net/sunrpc/xprtrdma/verbs.c16
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h14
4 files changed, 78 insertions, 104 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index f2eda155299a..8a6bdbd3e936 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -541,9 +541,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
req->rl_send_iov[0].length = hdrlen;
req->rl_send_iov[0].lkey = req->rl_iov.lkey;
- req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
+ req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
req->rl_send_iov[1].length = rpclen;
- req->rl_send_iov[1].lkey = req->rl_iov.lkey;
+ req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
req->rl_niovs = 2;
@@ -556,7 +556,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
- req->rl_send_iov[3].lkey = req->rl_iov.lkey;
+ req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf);
req->rl_niovs = 4;
}
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 808b3c52427a..a9d566227e7e 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -449,77 +449,72 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
/*
* The RDMA allocate/free functions need the task structure as a place
* to hide the struct rpcrdma_req, which is necessary for the actual send/recv
- * sequence. For this reason, the recv buffers are attached to send
- * buffers for portions of the RPC. Note that the RPC layer allocates
- * both send and receive buffers in the same call. We may register
- * the receive buffer portion when using reply chunks.
+ * sequence.
+ *
+ * The RPC layer allocates both send and receive buffers in the same call
+ * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
+ * We may register rq_rcv_buf when using reply chunks.
*/
static void *
xprt_rdma_allocate(struct rpc_task *task, size_t size)
{
struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
- struct rpcrdma_req *req, *nreq;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_regbuf *rb;
+ struct rpcrdma_req *req;
+ size_t min_size;
+ gfp_t flags = task->tk_flags & RPC_TASK_SWAPPER ?
+ GFP_ATOMIC : GFP_NOFS;
- req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
+ req = rpcrdma_buffer_get(&r_xprt->rx_buf);
if (req == NULL)
return NULL;
- if (size > req->rl_size) {
- dprintk("RPC: %s: size %zd too large for buffer[%zd]: "
- "prog %d vers %d proc %d\n",
- __func__, size, req->rl_size,
- task->tk_client->cl_prog, task->tk_client->cl_vers,
- task->tk_msg.rpc_proc->p_proc);
- /*
- * Outgoing length shortage. Our inline write max must have
- * been configured to perform direct i/o.
- *
- * This is therefore a large metadata operation, and the
- * allocate call was made on the maximum possible message,
- * e.g. containing long filename(s) or symlink data. In
- * fact, while these metadata operations *might* carry
- * large outgoing payloads, they rarely *do*. However, we
- * have to commit to the request here, so reallocate and
- * register it now. The data path will never require this
- * reallocation.
- *
- * If the allocation or registration fails, the RPC framework
- * will (doggedly) retry.
- */
- if (task->tk_flags & RPC_TASK_SWAPPER)
- nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
- else
- nreq = kmalloc(sizeof *req + size, GFP_NOFS);
- if (nreq == NULL)
- goto outfail;
-
- if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
- nreq->rl_base, size + sizeof(struct rpcrdma_req)
- - offsetof(struct rpcrdma_req, rl_base),
- &nreq->rl_handle, &nreq->rl_iov)) {
- kfree(nreq);
- goto outfail;
- }
- rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
- nreq->rl_size = size;
- nreq->rl_niovs = 0;
- nreq->rl_nchunks = 0;
- nreq->rl_buffer = (struct rpcrdma_buffer *)req;
- nreq->rl_reply = req->rl_reply;
- memcpy(nreq->rl_segments,
- req->rl_segments, sizeof nreq->rl_segments);
- /* flag the swap with an unused field */
- nreq->rl_iov.length = 0;
- req->rl_reply = NULL;
- req = nreq;
- }
+ if (req->rl_sendbuf == NULL)
+ goto out_sendbuf;
+ if (size > req->rl_sendbuf->rg_size)
+ goto out_sendbuf;
+
+out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
- return req->rl_xdr_buf;
-
-outfail:
+ return req->rl_sendbuf->rg_base;
+
+out_sendbuf:
+ /* XDR encoding and RPC/RDMA marshaling of this request has not
+ * yet occurred. Thus a lower bound is needed to prevent buffer
+ * overrun during marshaling.
+ *
+ * RPC/RDMA marshaling may choose to send payload bearing ops
+ * inline, if the result is smaller than the inline threshold.
+ * The value of the "size" argument accounts for header
+ * requirements but not for the payload in these cases.
+ *
+ * Likewise, allocate enough space to receive a reply up to the
+ * size of the inline threshold.
+ *
+ * It's unlikely that both the send header and the received
+ * reply will be large, but slush is provided here to allow
+ * flexibility when marshaling.
+ */
+ min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
+ min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ if (size < min_size)
+ size = min_size;
+
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ if (IS_ERR(rb))
+ goto out_fail;
+ rb->rg_owner = req;
+
+ r_xprt->rx_stats.hardway_register_count += size;
+ rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+ req->rl_sendbuf = rb;
+ goto out;
+
+out_fail:
rpcrdma_buffer_put(req);
- rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
+ r_xprt->rx_stats.failed_marshal_count++;
return NULL;
}
@@ -531,47 +526,24 @@ xprt_rdma_free(void *buffer)
{
struct rpcrdma_req *req;
struct rpcrdma_xprt *r_xprt;
- struct rpcrdma_rep *rep;
+ struct rpcrdma_regbuf *rb;
int i;
if (buffer == NULL)
return;
- req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
- if (req->rl_iov.length == 0) { /* see allocate above */
- r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
- struct rpcrdma_xprt, rx_buf);
- } else
- r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
- rep = req->rl_reply;
+ rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
+ req = rb->rg_owner;
+ r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
- dprintk("RPC: %s: called on 0x%p%s\n",
- __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
+ dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
- /*
- * Finish the deregistration. The process is considered
- * complete when the rr_func vector becomes NULL - this
- * was put in place during rpcrdma_reply_handler() - the wait
- * call below will not block if the dereg is "done". If
- * interrupted, our framework will clean up.
- */
for (i = 0; req->rl_nchunks;) {
--req->rl_nchunks;
i += rpcrdma_deregister_external(
&req->rl_segments[i], r_xprt);
}
- if (req->rl_iov.length == 0) { /* see allocate above */
- struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
- oreq->rl_reply = req->rl_reply;
- (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
- req->rl_handle,
- &req->rl_iov);
- kfree(req);
- req = oreq;
- }
-
- /* Put back request+reply buffers */
rpcrdma_buffer_put(req);
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index cdd6aacc9168..40894403db81 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1079,25 +1079,22 @@ static struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
- size_t wlen = 1 << fls(cdata->inline_wsize +
- sizeof(struct rpcrdma_req));
+ size_t wlen = cdata->inline_wsize;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_req *req;
int rc;
rc = -ENOMEM;
- req = kmalloc(wlen, GFP_KERNEL);
+ req = kmalloc(sizeof(*req) + wlen, GFP_KERNEL);
if (req == NULL)
goto out;
- memset(req, 0, sizeof(struct rpcrdma_req));
+ memset(req, 0, sizeof(*req));
- rc = rpcrdma_register_internal(ia, req->rl_base, wlen -
- offsetof(struct rpcrdma_req, rl_base),
+ rc = rpcrdma_register_internal(ia, req->rl_base, wlen,
&req->rl_handle, &req->rl_iov);
if (rc)
goto out_free;
- req->rl_size = wlen - sizeof(struct rpcrdma_req);
req->rl_buffer = &r_xprt->rx_buf;
return req;
@@ -1121,7 +1118,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
rep = kmalloc(rlen, GFP_KERNEL);
if (rep == NULL)
goto out;
- memset(rep, 0, sizeof(struct rpcrdma_rep));
+ memset(rep, 0, sizeof(*rep));
rc = rpcrdma_register_internal(ia, rep->rr_base, rlen -
offsetof(struct rpcrdma_rep, rr_base),
@@ -1335,6 +1332,7 @@ rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
if (!req)
return;
+ rpcrdma_free_regbuf(ia, req->rl_sendbuf);
rpcrdma_deregister_internal(ia, req->rl_handle, &req->rl_iov);
kfree(req);
}
@@ -1729,8 +1727,6 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
struct rpcrdma_buffer *buffers = req->rl_buffer;
unsigned long flags;
- if (req->rl_iov.length == 0) /* special case xprt_rdma_allocate() */
- buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
spin_lock_irqsave(&buffers->rb_lock, flags);
if (buffers->rb_recv_index < buffers->rb_max_requests) {
req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 36c37c60f1fe..aa82f8d1c5b4 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -262,7 +262,6 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
};
struct rpcrdma_req {
- size_t rl_size; /* actual length of buffer */
unsigned int rl_niovs; /* 0, 2 or 4 */
unsigned int rl_nchunks; /* non-zero if chunks */
unsigned int rl_connect_cookie; /* retry detection */
@@ -271,13 +270,20 @@ struct rpcrdma_req {
struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */
struct ib_sge rl_send_iov[4]; /* for active requests */
+ struct rpcrdma_regbuf *rl_sendbuf;
struct ib_sge rl_iov; /* for posting */
struct ib_mr *rl_handle; /* handle for mem in rl_iov */
char rl_base[MAX_RPCRDMAHDR]; /* start of actual buffer */
- __u32 rl_xdr_buf[0]; /* start of returned rpc rq_buffer */
};
-#define rpcr_to_rdmar(r) \
- container_of((r)->rq_buffer, struct rpcrdma_req, rl_xdr_buf[0])
+
+static inline struct rpcrdma_req *
+rpcr_to_rdmar(struct rpc_rqst *rqst)
+{
+ struct rpcrdma_regbuf *rb = container_of(rqst->rq_buffer,
+ struct rpcrdma_regbuf,
+ rg_base[0]);
+ return rb->rg_owner;
+}
/*
* struct rpcrdma_buffer -- holds list/queue of pre-registered memory for