summaryrefslogtreecommitdiffstats
path: root/net/sunrpc/xprtrdma/verbs.c
diff options
context:
space:
mode:
authorChuck Lever <chuck.lever@oracle.com>2016-06-29 19:54:00 +0200
committerAnna Schumaker <Anna.Schumaker@Netapp.com>2016-07-11 21:50:43 +0200
commite2ac236c0b65129f12fef358390f76cc3cacb865 (patch)
tree14c8ab8d8f1ba37fc60695382e55a46eac1d2aa1 /net/sunrpc/xprtrdma/verbs.c
parentxprtrdma: Chunk list encoders must not return zero (diff)
downloadlinux-e2ac236c0b65129f12fef358390f76cc3cacb865.tar.xz
linux-e2ac236c0b65129f12fef358390f76cc3cacb865.zip
xprtrdma: Allocate MRs on demand
Frequent MR list exhaustion can impact I/O throughput, so enough MRs are always created during transport set-up to prevent running out. This means more MRs are created than most workloads need. Commit 94f58c58c0b4 ("xprtrdma: Allow Read list and Reply chunk simultaneously") introduced support for sending two chunk lists per RPC, which consumes more MRs per RPC. Instead of trying to provision more MRs, introduce a mechanism for allocating MRs on demand. A few MRs are allocated during transport set-up to kick things off. This significantly reduces the average number of MRs per transport while allowing the MR count to grow for workloads or devices that need more MRs. FRWR with mlx4 allocated almost 400 MRs per transport before this patch. Now it starts with 32. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Tested-by: Steve Wise <swise@opengridcomputing.com> Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
Diffstat (limited to 'net/sunrpc/xprtrdma/verbs.c')
-rw-r--r--net/sunrpc/xprtrdma/verbs.c98
1 files changed, 92 insertions, 6 deletions
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index db935ed3ac75..e8677eafb329 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -782,6 +782,55 @@ rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
schedule_delayed_work(&buf->rb_recovery_worker, 0);
}
+static void
+rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ unsigned int count;
+ LIST_HEAD(free);
+ LIST_HEAD(all);
+
+ for (count = 0; count < 32; count++) {
+ struct rpcrdma_mw *mw;
+ int rc;
+
+ mw = kzalloc(sizeof(*mw), GFP_KERNEL);
+ if (!mw)
+ break;
+
+ rc = ia->ri_ops->ro_init_mr(ia, mw);
+ if (rc) {
+ kfree(mw);
+ break;
+ }
+
+ mw->mw_xprt = r_xprt;
+
+ list_add(&mw->mw_list, &free);
+ list_add(&mw->mw_all, &all);
+ }
+
+ spin_lock(&buf->rb_mwlock);
+ list_splice(&free, &buf->rb_mws);
+ list_splice(&all, &buf->rb_all);
+ r_xprt->rx_stats.mrs_allocated += count;
+ spin_unlock(&buf->rb_mwlock);
+
+ dprintk("RPC: %s: created %u MRs\n", __func__, count);
+}
+
+static void
+rpcrdma_mr_refresh_worker(struct work_struct *work)
+{
+ struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
+ rb_refresh_worker.work);
+ struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
+ rx_buf);
+
+ rpcrdma_create_mrs(r_xprt);
+}
+
struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
@@ -837,21 +886,23 @@ int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
int i, rc;
buf->rb_max_requests = r_xprt->rx_data.max_requests;
buf->rb_bc_srv_max_requests = 0;
atomic_set(&buf->rb_credits, 1);
+ spin_lock_init(&buf->rb_mwlock);
spin_lock_init(&buf->rb_lock);
spin_lock_init(&buf->rb_recovery_lock);
+ INIT_LIST_HEAD(&buf->rb_mws);
+ INIT_LIST_HEAD(&buf->rb_all);
INIT_LIST_HEAD(&buf->rb_stale_mrs);
+ INIT_DELAYED_WORK(&buf->rb_refresh_worker,
+ rpcrdma_mr_refresh_worker);
INIT_DELAYED_WORK(&buf->rb_recovery_worker,
rpcrdma_mr_recovery_worker);
- rc = ia->ri_ops->ro_init(r_xprt);
- if (rc)
- goto out;
+ rpcrdma_create_mrs(r_xprt);
INIT_LIST_HEAD(&buf->rb_send_bufs);
INIT_LIST_HEAD(&buf->rb_allreqs);
@@ -927,6 +978,32 @@ rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
kfree(req);
}
+static void
+rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
+ rx_buf);
+ struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+ struct rpcrdma_mw *mw;
+ unsigned int count;
+
+ count = 0;
+ spin_lock(&buf->rb_mwlock);
+ while (!list_empty(&buf->rb_all)) {
+ mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
+ list_del(&mw->mw_all);
+
+ spin_unlock(&buf->rb_mwlock);
+ ia->ri_ops->ro_release_mr(mw);
+ count++;
+ spin_lock(&buf->rb_mwlock);
+ }
+ spin_unlock(&buf->rb_mwlock);
+ r_xprt->rx_stats.mrs_allocated = 0;
+
+ dprintk("RPC: %s: released %u MRs\n", __func__, count);
+}
+
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
@@ -955,7 +1032,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
}
spin_unlock(&buf->rb_reqslock);
- ia->ri_ops->ro_destroy(buf);
+ rpcrdma_destroy_mrs(buf);
}
struct rpcrdma_mw *
@@ -973,8 +1050,17 @@ rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
spin_unlock(&buf->rb_mwlock);
if (!mw)
- pr_err("RPC: %s: no MWs available\n", __func__);
+ goto out_nomws;
return mw;
+
+out_nomws:
+ dprintk("RPC: %s: no MWs available\n", __func__);
+ schedule_delayed_work(&buf->rb_refresh_worker, 0);
+
+ /* Allow the reply handler and refresh worker to run */
+ cond_resched();
+
+ return NULL;
}
void