diff options
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/auth.c | 2 | ||||
-rw-r--r-- | net/sunrpc/auth_generic.c | 9 | ||||
-rw-r--r-- | net/sunrpc/auth_gss/auth_gss.c | 7 | ||||
-rw-r--r-- | net/sunrpc/auth_unix.c | 9 | ||||
-rw-r--r-- | net/sunrpc/backchannel_rqst.c | 8 | ||||
-rw-r--r-- | net/sunrpc/cache.c | 5 | ||||
-rw-r--r-- | net/sunrpc/clnt.c | 132 | ||||
-rw-r--r-- | net/sunrpc/sched.c | 35 | ||||
-rw-r--r-- | net/sunrpc/svc.c | 17 | ||||
-rw-r--r-- | net/sunrpc/xdr.c | 11 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 2 | ||||
-rw-r--r-- | net/sunrpc/xprtmultipath.c | 24 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 53 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/fmr_ops.c | 7 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 28 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 323 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 21 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 2 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_sendto.c | 82 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_transport.c | 60 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 202 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 237 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 108 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 34 |
24 files changed, 905 insertions, 513 deletions
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c index a7e42f9a405c..2bff63a73cf8 100644 --- a/net/sunrpc/auth.c +++ b/net/sunrpc/auth.c @@ -551,7 +551,7 @@ rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred, *entry, *new; unsigned int nr; - nr = hash_long(from_kuid(&init_user_ns, acred->uid), cache->hashbits); + nr = auth->au_ops->hash_cred(acred, cache->hashbits); rcu_read_lock(); hlist_for_each_entry_rcu(entry, &cache->hashtable[nr], cr_hash) { diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c index 83dffeadf20a..f1df9837f1ac 100644 --- a/net/sunrpc/auth_generic.c +++ b/net/sunrpc/auth_generic.c @@ -78,6 +78,14 @@ static struct rpc_cred *generic_bind_cred(struct rpc_task *task, return auth->au_ops->lookup_cred(auth, acred, lookupflags); } +static int +generic_hash_cred(struct auth_cred *acred, unsigned int hashbits) +{ + return hash_64(from_kgid(&init_user_ns, acred->gid) | + ((u64)from_kuid(&init_user_ns, acred->uid) << + (sizeof(gid_t) * 8)), hashbits); +} + /* * Lookup generic creds for current process */ @@ -258,6 +266,7 @@ generic_key_timeout(struct rpc_auth *auth, struct rpc_cred *cred) static const struct rpc_authops generic_auth_ops = { .owner = THIS_MODULE, .au_name = "Generic", + .hash_cred = generic_hash_cred, .lookup_cred = generic_lookup_cred, .crcreate = generic_create_cred, .key_timeout = generic_key_timeout, diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index 976c7812bbd5..d8bd97a5a7c9 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -1298,6 +1298,12 @@ gss_destroy_cred(struct rpc_cred *cred) gss_destroy_nullcred(cred); } +static int +gss_hash_cred(struct auth_cred *acred, unsigned int hashbits) +{ + return hash_64(from_kuid(&init_user_ns, acred->uid), hashbits); +} + /* * Lookup RPCSEC_GSS cred for the current process */ @@ -1982,6 +1988,7 @@ static const struct rpc_authops authgss_ops = { .au_name = "RPCSEC_GSS", .create = gss_create, .destroy = gss_destroy, + .hash_cred = gss_hash_cred, .lookup_cred = gss_lookup_cred, .crcreate = gss_create_cred, .list_pseudoflavors = gss_mech_list_pseudoflavors, diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c index a1d768a973f5..306fc0f54596 100644 --- a/net/sunrpc/auth_unix.c +++ b/net/sunrpc/auth_unix.c @@ -46,6 +46,14 @@ unx_destroy(struct rpc_auth *auth) rpcauth_clear_credcache(auth->au_credcache); } +static int +unx_hash_cred(struct auth_cred *acred, unsigned int hashbits) +{ + return hash_64(from_kgid(&init_user_ns, acred->gid) | + ((u64)from_kuid(&init_user_ns, acred->uid) << + (sizeof(gid_t) * 8)), hashbits); +} + /* * Lookup AUTH_UNIX creds for current process */ @@ -220,6 +228,7 @@ const struct rpc_authops authunix_ops = { .au_name = "UNIX", .create = unx_create, .destroy = unx_destroy, + .hash_cred = unx_hash_cred, .lookup_cred = unx_lookup_cred, .crcreate = unx_create_cred, }; diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c index 229956bf8457..ac701c28f44f 100644 --- a/net/sunrpc/backchannel_rqst.c +++ b/net/sunrpc/backchannel_rqst.c @@ -76,13 +76,7 @@ static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags) page = alloc_page(gfp_flags); if (page == NULL) return -ENOMEM; - buf->head[0].iov_base = page_address(page); - buf->head[0].iov_len = PAGE_SIZE; - buf->tail[0].iov_base = NULL; - buf->tail[0].iov_len = 0; - buf->page_len = 0; - buf->len = 0; - buf->buflen = PAGE_SIZE; + xdr_buf_init(buf, page_address(page), PAGE_SIZE); return 0; } diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c index 4d8e11f94a35..8aabe12201f8 100644 --- a/net/sunrpc/cache.c +++ b/net/sunrpc/cache.c @@ -353,7 +353,7 @@ void sunrpc_init_cache_detail(struct cache_detail *cd) spin_unlock(&cache_list_lock); /* start the cleaning process */ - schedule_delayed_work(&cache_cleaner, 0); + queue_delayed_work(system_power_efficient_wq, &cache_cleaner, 0); } EXPORT_SYMBOL_GPL(sunrpc_init_cache_detail); @@ -476,7 +476,8 @@ static void do_cache_clean(struct work_struct *work) delay = 0; if (delay) - schedule_delayed_work(&cache_cleaner, delay); + queue_delayed_work(system_power_efficient_wq, + &cache_cleaner, delay); } diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 66f23b376fa0..34dd7b26ee5f 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -184,7 +184,6 @@ static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event, struct super_block *sb) { struct dentry *dentry; - int err = 0; switch (event) { case RPC_PIPEFS_MOUNT: @@ -201,7 +200,7 @@ static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event, printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event); return -ENOTSUPP; } - return err; + return 0; } static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event, @@ -988,7 +987,6 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt) { if (clnt != NULL) { - rpc_task_release_client(task); if (task->tk_xprt == NULL) task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi); task->tk_client = clnt; @@ -1693,6 +1691,7 @@ call_allocate(struct rpc_task *task) struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; struct rpc_procinfo *proc = task->tk_msg.rpc_proc; + int status; dprint_status(task); @@ -1718,11 +1717,14 @@ call_allocate(struct rpc_task *task) req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen; req->rq_rcvsize <<= 2; - req->rq_buffer = xprt->ops->buf_alloc(task, - req->rq_callsize + req->rq_rcvsize); - if (req->rq_buffer != NULL) - return; + status = xprt->ops->buf_alloc(task); xprt_inject_disconnect(xprt); + if (status == 0) + return; + if (status != -ENOMEM) { + rpc_exit(task, status); + return; + } dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid); @@ -1748,18 +1750,6 @@ rpc_task_force_reencode(struct rpc_task *task) task->tk_rqstp->rq_bytes_sent = 0; } -static inline void -rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len) -{ - buf->head[0].iov_base = start; - buf->head[0].iov_len = len; - buf->tail[0].iov_len = 0; - buf->page_len = 0; - buf->flags = 0; - buf->len = 0; - buf->buflen = len; -} - /* * 3. Encode arguments of an RPC call */ @@ -1772,12 +1762,12 @@ rpc_xdr_encode(struct rpc_task *task) dprint_status(task); - rpc_xdr_buf_init(&req->rq_snd_buf, - req->rq_buffer, - req->rq_callsize); - rpc_xdr_buf_init(&req->rq_rcv_buf, - (char *)req->rq_buffer + req->rq_callsize, - req->rq_rcvsize); + xdr_buf_init(&req->rq_snd_buf, + req->rq_buffer, + req->rq_callsize); + xdr_buf_init(&req->rq_rcv_buf, + req->rq_rbuffer, + req->rq_rcvsize); p = rpc_encode_header(task); if (p == NULL) { @@ -2616,6 +2606,70 @@ int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt, EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt); /** + * rpc_clnt_setup_test_and_add_xprt() + * + * This is an rpc_clnt_add_xprt setup() function which returns 1 so: + * 1) caller of the test function must dereference the rpc_xprt_switch + * and the rpc_xprt. + * 2) test function must call rpc_xprt_switch_add_xprt, usually in + * the rpc_call_done routine. + * + * Upon success (return of 1), the test function adds the new + * transport to the rpc_clnt xprt switch + * + * @clnt: struct rpc_clnt to get the new transport + * @xps: the rpc_xprt_switch to hold the new transport + * @xprt: the rpc_xprt to test + * @data: a struct rpc_add_xprt_test pointer that holds the test function + * and test function call data + */ +int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt, + struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt, + void *data) +{ + struct rpc_cred *cred; + struct rpc_task *task; + struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data; + int status = -EADDRINUSE; + + xprt = xprt_get(xprt); + xprt_switch_get(xps); + + if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr)) + goto out_err; + + /* Test the connection */ + cred = authnull_ops.lookup_cred(NULL, NULL, 0); + task = rpc_call_null_helper(clnt, xprt, cred, + RPC_TASK_SOFT | RPC_TASK_SOFTCONN, + NULL, NULL); + put_rpccred(cred); + if (IS_ERR(task)) { + status = PTR_ERR(task); + goto out_err; + } + status = task->tk_status; + rpc_put_task(task); + + if (status < 0) + goto out_err; + + /* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */ + xtest->add_xprt_test(clnt, xprt, xtest->data); + + /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */ + return 1; +out_err: + xprt_put(xprt); + xprt_switch_put(xps); + pr_info("RPC: rpc_clnt_test_xprt failed: %d addr %s not added\n", + status, xprt->address_strings[RPC_DISPLAY_ADDR]); + return status; +} +EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt); + +/** * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt * @clnt: pointer to struct rpc_clnt * @xprtargs: pointer to struct xprt_create @@ -2697,6 +2751,34 @@ rpc_cap_max_reconnect_timeout(struct rpc_clnt *clnt, unsigned long timeo) } EXPORT_SYMBOL_GPL(rpc_cap_max_reconnect_timeout); +void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt) +{ + xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch)); +} +EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put); + +void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt) +{ + rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch), + xprt); +} +EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt); + +bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt, + const struct sockaddr *sap) +{ + struct rpc_xprt_switch *xps; + bool ret; + + xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch); + + rcu_read_lock(); + ret = rpc_xprt_switch_has_addr(xps, sap); + rcu_read_unlock(); + return ret; +} +EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr); + #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) static void rpc_show_header(void) { diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 9ae588511aaf..5db68b371db2 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -849,14 +849,17 @@ static void rpc_async_schedule(struct work_struct *work) } /** - * rpc_malloc - allocate an RPC buffer - * @task: RPC task that will use this buffer - * @size: requested byte size + * rpc_malloc - allocate RPC buffer resources + * @task: RPC task + * + * A single memory region is allocated, which is split between the + * RPC call and RPC reply that this task is being used for. When + * this RPC is retired, the memory is released by calling rpc_free. * * To prevent rpciod from hanging, this allocator never sleeps, - * returning NULL and suppressing warning if the request cannot be serviced - * immediately. - * The caller can arrange to sleep in a way that is safe for rpciod. + * returning -ENOMEM and suppressing warning if the request cannot + * be serviced immediately. The caller can arrange to sleep in a + * way that is safe for rpciod. * * Most requests are 'small' (under 2KiB) and can be serviced from a * mempool, ensuring that NFS reads and writes can always proceed, @@ -865,8 +868,10 @@ static void rpc_async_schedule(struct work_struct *work) * In order to avoid memory starvation triggering more writebacks of * NFS requests, we avoid using GFP_KERNEL. */ -void *rpc_malloc(struct rpc_task *task, size_t size) +int rpc_malloc(struct rpc_task *task) { + struct rpc_rqst *rqst = task->tk_rqstp; + size_t size = rqst->rq_callsize + rqst->rq_rcvsize; struct rpc_buffer *buf; gfp_t gfp = GFP_NOIO | __GFP_NOWARN; @@ -880,28 +885,28 @@ void *rpc_malloc(struct rpc_task *task, size_t size) buf = kmalloc(size, gfp); if (!buf) - return NULL; + return -ENOMEM; buf->len = size; dprintk("RPC: %5u allocated buffer of size %zu at %p\n", task->tk_pid, size, buf); - return &buf->data; + rqst->rq_buffer = buf->data; + rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize; + return 0; } EXPORT_SYMBOL_GPL(rpc_malloc); /** - * rpc_free - free buffer allocated via rpc_malloc - * @buffer: buffer to free + * rpc_free - free RPC buffer resources allocated via rpc_malloc + * @task: RPC task * */ -void rpc_free(void *buffer) +void rpc_free(struct rpc_task *task) { + void *buffer = task->tk_rqstp->rq_buffer; size_t size; struct rpc_buffer *buf; - if (!buffer) - return; - buf = container_of(buffer, struct rpc_buffer, data); size = buf->len; diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index c5b0cb4f4056..7c8070ec93c8 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -401,6 +401,21 @@ int svc_bind(struct svc_serv *serv, struct net *net) } EXPORT_SYMBOL_GPL(svc_bind); +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +static void +__svc_init_bc(struct svc_serv *serv) +{ + INIT_LIST_HEAD(&serv->sv_cb_list); + spin_lock_init(&serv->sv_cb_lock); + init_waitqueue_head(&serv->sv_cb_waitq); +} +#else +static void +__svc_init_bc(struct svc_serv *serv) +{ +} +#endif + /* * Create an RPC service */ @@ -443,6 +458,8 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools, init_timer(&serv->sv_temptimer); spin_lock_init(&serv->sv_lock); + __svc_init_bc(serv); + serv->sv_nrpools = npools; serv->sv_pools = kcalloc(serv->sv_nrpools, sizeof(struct svc_pool), diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c index c4f3cc0c0775..7f1071e103ca 100644 --- a/net/sunrpc/xdr.c +++ b/net/sunrpc/xdr.c @@ -767,7 +767,7 @@ static void xdr_set_next_page(struct xdr_stream *xdr) newbase -= xdr->buf->page_base; if (xdr_set_page_base(xdr, newbase, PAGE_SIZE) < 0) - xdr_set_iov(xdr, xdr->buf->tail, xdr->buf->len); + xdr_set_iov(xdr, xdr->buf->tail, xdr->nwords << 2); } static bool xdr_set_next_buffer(struct xdr_stream *xdr) @@ -776,7 +776,7 @@ static bool xdr_set_next_buffer(struct xdr_stream *xdr) xdr_set_next_page(xdr); else if (xdr->iov == xdr->buf->head) { if (xdr_set_page_base(xdr, 0, PAGE_SIZE) < 0) - xdr_set_iov(xdr, xdr->buf->tail, xdr->buf->len); + xdr_set_iov(xdr, xdr->buf->tail, xdr->nwords << 2); } return xdr->p != xdr->end; } @@ -859,12 +859,15 @@ EXPORT_SYMBOL_GPL(xdr_set_scratch_buffer); static __be32 *xdr_copy_to_scratch(struct xdr_stream *xdr, size_t nbytes) { __be32 *p; - void *cpdest = xdr->scratch.iov_base; + char *cpdest = xdr->scratch.iov_base; size_t cplen = (char *)xdr->end - (char *)xdr->p; if (nbytes > xdr->scratch.iov_len) return NULL; - memcpy(cpdest, xdr->p, cplen); + p = __xdr_inline_decode(xdr, cplen); + if (p == NULL) + return NULL; + memcpy(cpdest, p, cplen); cpdest += cplen; nbytes -= cplen; if (!xdr_set_next_buffer(xdr)) diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index ea244b29138b..685e6d225414 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1295,7 +1295,7 @@ void xprt_release(struct rpc_task *task) xprt_schedule_autodisconnect(xprt); spin_unlock_bh(&xprt->transport_lock); if (req->rq_buffer) - xprt->ops->buf_free(req->rq_buffer); + xprt->ops->buf_free(task); xprt_inject_disconnect(xprt); if (req->rq_cred != NULL) put_rpccred(req->rq_cred); diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c index 66c9d63f4797..ae92a9e9ba52 100644 --- a/net/sunrpc/xprtmultipath.c +++ b/net/sunrpc/xprtmultipath.c @@ -15,6 +15,7 @@ #include <asm/cmpxchg.h> #include <linux/spinlock.h> #include <linux/sunrpc/xprt.h> +#include <linux/sunrpc/addr.h> #include <linux/sunrpc/xprtmultipath.h> typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head, @@ -49,7 +50,8 @@ void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, if (xprt == NULL) return; spin_lock(&xps->xps_lock); - if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) + if ((xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) && + !rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr)) xprt_switch_add_xprt_locked(xps, xprt); spin_unlock(&xps->xps_lock); } @@ -232,6 +234,26 @@ struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi) return xprt_switch_find_current_entry(head, xpi->xpi_cursor); } +bool rpc_xprt_switch_has_addr(struct rpc_xprt_switch *xps, + const struct sockaddr *sap) +{ + struct list_head *head; + struct rpc_xprt *pos; + + if (xps == NULL || sap == NULL) + return false; + + head = &xps->xps_xprt_list; + list_for_each_entry_rcu(pos, head, xprt_switch) { + if (rpc_cmp_addr_port(sap, (struct sockaddr *)&pos->addr)) { + pr_info("RPC: addr %s already in xprt switch\n", + pos->address_strings[RPC_DISPLAY_ADDR]); + return true; + } + } + return false; +} + static struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, const struct rpc_xprt *cur) diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 87762d976b63..2c472e1b4827 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -27,7 +27,7 @@ static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt, list_del(&req->rl_all); spin_unlock(&buf->rb_reqslock); - rpcrdma_destroy_req(&r_xprt->rx_ia, req); + rpcrdma_destroy_req(req); kfree(rqst); } @@ -35,10 +35,8 @@ static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt, static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) { - struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_regbuf *rb; struct rpcrdma_req *req; - struct xdr_buf *buf; size_t size; req = rpcrdma_create_req(r_xprt); @@ -46,30 +44,19 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt, return PTR_ERR(req); req->rl_backchannel = true; - size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst); - rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL); + rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE, + DMA_TO_DEVICE, GFP_KERNEL); if (IS_ERR(rb)) goto out_fail; req->rl_rdmabuf = rb; - size += RPCRDMA_INLINE_READ_THRESHOLD(rqst); - rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL); + size = r_xprt->rx_data.inline_rsize; + rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL); if (IS_ERR(rb)) goto out_fail; - rb->rg_owner = req; req->rl_sendbuf = rb; - /* so that rpcr_to_rdmar works when receiving a request */ - rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base; - - buf = &rqst->rq_snd_buf; - buf->head[0].iov_base = rqst->rq_buffer; - buf->head[0].iov_len = 0; - buf->tail[0].iov_base = NULL; - buf->tail[0].iov_len = 0; - buf->page_len = 0; - buf->len = 0; - buf->buflen = size; - + xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base, size); + rpcrdma_set_xprtdata(rqst, req); return 0; out_fail: @@ -219,7 +206,6 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct rpcrdma_msg *headerp; - size_t rpclen; headerp = rdmab_to_msg(req->rl_rdmabuf); headerp->rm_xid = rqst->rq_xid; @@ -231,26 +217,9 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) headerp->rm_body.rm_chunks[1] = xdr_zero; headerp->rm_body.rm_chunks[2] = xdr_zero; - rpclen = rqst->rq_svec[0].iov_len; - -#ifdef RPCRDMA_BACKCHANNEL_DEBUG - pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n", - __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf)); - pr_info("RPC: %s: RPC/RDMA: %*ph\n", - __func__, (int)RPCRDMA_HDRLEN_MIN, headerp); - pr_info("RPC: %s: RPC: %*ph\n", - __func__, (int)rpclen, rqst->rq_svec[0].iov_base); -#endif - - req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); - req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN; - req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); - - req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf); - req->rl_send_iov[1].length = rpclen; - req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf); - - req->rl_niovs = 2; + if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN, + &rqst->rq_snd_buf, rpcrdma_noch)) + return -EIO; return 0; } @@ -402,7 +371,7 @@ out_overflow: out_short: pr_warn("RPC/RDMA short backward direction call\n"); - if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) + if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep)) xprt_disconnect_done(xprt); else pr_warn("RPC: %s: reposting rep %p\n", diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index 21cb3b150b37..1ebb09e1ac4f 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -160,9 +160,8 @@ static int fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_create_data_internal *cdata) { - rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1, - RPCRDMA_MAX_DATA_SEGS / - RPCRDMA_MAX_FMR_SGES)); + ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS / + RPCRDMA_MAX_FMR_SGES); return 0; } @@ -274,6 +273,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) */ list_for_each_entry(mw, &req->rl_registered, mw_list) list_add_tail(&mw->fmr.fm_mr->list, &unmap_list); + r_xprt->rx_stats.local_inv_needed++; rc = ib_unmap_fmr(&unmap_list); if (rc) goto out_reset; @@ -331,4 +331,5 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = { .ro_init_mr = fmr_op_init_mr, .ro_release_mr = fmr_op_release_mr, .ro_displayname = "fmr", + .ro_send_w_inv_ok = 0, }; diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 892b5e1d9b09..210949562786 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -67,6 +67,8 @@ * pending send queue WRs before the transport is reconnected. */ +#include <linux/sunrpc/rpc_rdma.h> + #include "xprt_rdma.h" #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) @@ -161,7 +163,7 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) return PTR_ERR(f->fr_mr); } - dprintk("RPC: %s: recovered FRMR %p\n", __func__, r); + dprintk("RPC: %s: recovered FRMR %p\n", __func__, f); f->fr_state = FRMR_IS_INVALID; return 0; } @@ -242,9 +244,8 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, depth; } - rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1, - RPCRDMA_MAX_DATA_SEGS / - ia->ri_max_frmr_depth)); + ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS / + ia->ri_max_frmr_depth); return 0; } @@ -329,7 +330,7 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe); if (wc->status != IB_WC_SUCCESS) __frwr_sendcompletion_flush(wc, frmr, "localinv"); - complete_all(&frmr->fr_linv_done); + complete(&frmr->fr_linv_done); } /* Post a REG_MR Work Request to register a memory region @@ -396,7 +397,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, goto out_mapmr_err; dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n", - __func__, mw, mw->mw_nents, mr->length); + __func__, frmr, mw->mw_nents, mr->length); key = (u8)(mr->rkey & 0x000000FF); ib_update_fast_reg_key(mr, ++key); @@ -449,6 +450,8 @@ __frwr_prepare_linv_wr(struct rpcrdma_mw *mw) struct rpcrdma_frmr *f = &mw->frmr; struct ib_send_wr *invalidate_wr; + dprintk("RPC: %s: invalidating frmr %p\n", __func__, f); + f->fr_state = FRMR_IS_INVALID; invalidate_wr = &f->fr_invwr; @@ -472,6 +475,7 @@ static void frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr; + struct rpcrdma_rep *rep = req->rl_reply; struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_mw *mw, *tmp; struct rpcrdma_frmr *f; @@ -487,6 +491,12 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) f = NULL; invalidate_wrs = pos = prev = NULL; list_for_each_entry(mw, &req->rl_registered, mw_list) { + if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) && + (mw->mw_handle == rep->rr_inv_rkey)) { + mw->frmr.fr_state = FRMR_IS_INVALID; + continue; + } + pos = __frwr_prepare_linv_wr(mw); if (!invalidate_wrs) @@ -496,6 +506,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) prev = pos; f = &mw->frmr; } + if (!f) + goto unmap; /* Strong send queue ordering guarantees that when the * last WR in the chain completes, all WRs in the chain @@ -510,6 +522,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * replaces the QP. The RPC reply handler won't call us * unless ri_id->qp is a valid pointer. */ + r_xprt->rx_stats.local_inv_needed++; rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr); if (rc) goto reset_mrs; @@ -521,6 +534,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) */ unmap: list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) { + dprintk("RPC: %s: unmapping frmr %p\n", + __func__, &mw->frmr); list_del_init(&mw->mw_list); ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir); @@ -576,4 +591,5 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { .ro_init_mr = frwr_op_init_mr, .ro_release_mr = frwr_op_release_mr, .ro_displayname = "frwr", + .ro_send_w_inv_ok = RPCRDMA_CMP_F_SND_W_INV_OK, }; diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index a47f170b20ef..d987c2d3dd6e 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -53,14 +53,6 @@ # define RPCDBG_FACILITY RPCDBG_TRANS #endif -enum rpcrdma_chunktype { - rpcrdma_noch = 0, - rpcrdma_readch, - rpcrdma_areadch, - rpcrdma_writech, - rpcrdma_replych -}; - static const char transfertypes[][12] = { "inline", /* no chunks */ "read list", /* some argument via rdma read */ @@ -118,10 +110,12 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs) return size; } -void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia, - struct rpcrdma_create_data_internal *cdata, - unsigned int maxsegs) +void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt) { + struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + unsigned int maxsegs = ia->ri_max_segs; + ia->ri_max_inline_write = cdata->inline_wsize - rpcrdma_max_call_header_size(maxsegs); ia->ri_max_inline_read = cdata->inline_rsize - @@ -155,42 +149,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt, return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read; } -static int -rpcrdma_tail_pullup(struct xdr_buf *buf) -{ - size_t tlen = buf->tail[0].iov_len; - size_t skip = tlen & 3; - - /* Do not include the tail if it is only an XDR pad */ - if (tlen < 4) - return 0; - - /* xdr_write_pages() adds a pad at the beginning of the tail - * if the content in "buf->pages" is unaligned. Force the - * tail's actual content to land at the next XDR position - * after the head instead. - */ - if (skip) { - unsigned char *src, *dst; - unsigned int count; - - src = buf->tail[0].iov_base; - dst = buf->head[0].iov_base; - dst += buf->head[0].iov_len; - - src += skip; - tlen -= skip; - - dprintk("RPC: %s: skip=%zu, memmove(%p, %p, %zu)\n", - __func__, skip, dst, src, tlen); - - for (count = tlen; count; count--) - *dst++ = *src++; - } - - return tlen; -} - /* Split "vec" on page boundaries into segments. FMR registers pages, * not a byte range. Other modes coalesce these segments into a single * MR when they can. @@ -229,7 +187,8 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n) static int rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, - enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg) + enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, + bool reminv_expected) { int len, n, p, page_base; struct page **ppages; @@ -271,6 +230,13 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, if (type == rpcrdma_readch) return n; + /* When encoding the Write list, some servers need to see an extra + * segment for odd-length Write chunks. The upper layer provides + * space in the tail iovec for this purpose. + */ + if (type == rpcrdma_writech && reminv_expected) + return n; + if (xdrbuf->tail[0].iov_len) { /* the rpcrdma protocol allows us to omit any trailing * xdr pad bytes, saving the server an RDMA operation. */ @@ -327,7 +293,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, if (rtype == rpcrdma_areadch) pos = 0; seg = req->rl_segments; - nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg); + nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, false); if (nsegs < 0) return ERR_PTR(nsegs); @@ -391,7 +357,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, seg = req->rl_segments; nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, rqst->rq_rcv_buf.head[0].iov_len, - wtype, seg); + wtype, seg, + r_xprt->rx_ia.ri_reminv_expected); if (nsegs < 0) return ERR_PTR(nsegs); @@ -456,7 +423,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, } seg = req->rl_segments; - nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg); + nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg, + r_xprt->rx_ia.ri_reminv_expected); if (nsegs < 0) return ERR_PTR(nsegs); @@ -491,74 +459,184 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, return iptr; } -/* - * Copy write data inline. - * This function is used for "small" requests. Data which is passed - * to RPC via iovecs (or page list) is copied directly into the - * pre-registered memory buffer for this request. For small amounts - * of data, this is efficient. The cutoff value is tunable. +/* Prepare the RPC-over-RDMA header SGE. */ -static void rpcrdma_inline_pullup(struct rpc_rqst *rqst) +static bool +rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req, + u32 len) { - int i, npages, curlen; - int copy_len; - unsigned char *srcp, *destp; - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); - int page_base; - struct page **ppages; + struct rpcrdma_regbuf *rb = req->rl_rdmabuf; + struct ib_sge *sge = &req->rl_send_sge[0]; + + if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) { + if (!__rpcrdma_dma_map_regbuf(ia, rb)) + return false; + sge->addr = rdmab_addr(rb); + sge->lkey = rdmab_lkey(rb); + } + sge->length = len; + + ib_dma_sync_single_for_device(ia->ri_device, sge->addr, + sge->length, DMA_TO_DEVICE); + req->rl_send_wr.num_sge++; + return true; +} - destp = rqst->rq_svec[0].iov_base; - curlen = rqst->rq_svec[0].iov_len; - destp += curlen; +/* Prepare the Send SGEs. The head and tail iovec, and each entry + * in the page list, gets its own SGE. + */ +static bool +rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, + struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) +{ + unsigned int sge_no, page_base, len, remaining; + struct rpcrdma_regbuf *rb = req->rl_sendbuf; + struct ib_device *device = ia->ri_device; + struct ib_sge *sge = req->rl_send_sge; + u32 lkey = ia->ri_pd->local_dma_lkey; + struct page *page, **ppages; + + /* The head iovec is straightforward, as it is already + * DMA-mapped. Sync the content that has changed. + */ + if (!rpcrdma_dma_map_regbuf(ia, rb)) + return false; + sge_no = 1; + sge[sge_no].addr = rdmab_addr(rb); + sge[sge_no].length = xdr->head[0].iov_len; + sge[sge_no].lkey = rdmab_lkey(rb); + ib_dma_sync_single_for_device(device, sge[sge_no].addr, + sge[sge_no].length, DMA_TO_DEVICE); + + /* If there is a Read chunk, the page list is being handled + * via explicit RDMA, and thus is skipped here. However, the + * tail iovec may include an XDR pad for the page list, as + * well as additional content, and may not reside in the + * same page as the head iovec. + */ + if (rtype == rpcrdma_readch) { + len = xdr->tail[0].iov_len; - dprintk("RPC: %s: destp 0x%p len %d hdrlen %d\n", - __func__, destp, rqst->rq_slen, curlen); + /* Do not include the tail if it is only an XDR pad */ + if (len < 4) + goto out; - copy_len = rqst->rq_snd_buf.page_len; + page = virt_to_page(xdr->tail[0].iov_base); + page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK; - if (rqst->rq_snd_buf.tail[0].iov_len) { - curlen = rqst->rq_snd_buf.tail[0].iov_len; - if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) { - memmove(destp + copy_len, - rqst->rq_snd_buf.tail[0].iov_base, curlen); - r_xprt->rx_stats.pullup_copy_count += curlen; + /* If the content in the page list is an odd length, + * xdr_write_pages() has added a pad at the beginning + * of the tail iovec. Force the tail's non-pad content + * to land at the next XDR position in the Send message. + */ + page_base += len & 3; + len -= len & 3; + goto map_tail; + } + + /* If there is a page list present, temporarily DMA map + * and prepare an SGE for each page to be sent. + */ + if (xdr->page_len) { + ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); + page_base = xdr->page_base & ~PAGE_MASK; + remaining = xdr->page_len; + while (remaining) { + sge_no++; + if (sge_no > RPCRDMA_MAX_SEND_SGES - 2) + goto out_mapping_overflow; + + len = min_t(u32, PAGE_SIZE - page_base, remaining); + sge[sge_no].addr = ib_dma_map_page(device, *ppages, + page_base, len, + DMA_TO_DEVICE); + if (ib_dma_mapping_error(device, sge[sge_no].addr)) + goto out_mapping_err; + sge[sge_no].length = len; + sge[sge_no].lkey = lkey; + + req->rl_mapped_sges++; + ppages++; + remaining -= len; + page_base = 0; } - dprintk("RPC: %s: tail destp 0x%p len %d\n", - __func__, destp + copy_len, curlen); - rqst->rq_svec[0].iov_len += curlen; } - r_xprt->rx_stats.pullup_copy_count += copy_len; - page_base = rqst->rq_snd_buf.page_base; - ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT); - page_base &= ~PAGE_MASK; - npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT; - for (i = 0; copy_len && i < npages; i++) { - curlen = PAGE_SIZE - page_base; - if (curlen > copy_len) - curlen = copy_len; - dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n", - __func__, i, destp, copy_len, curlen); - srcp = kmap_atomic(ppages[i]); - memcpy(destp, srcp+page_base, curlen); - kunmap_atomic(srcp); - rqst->rq_svec[0].iov_len += curlen; - destp += curlen; - copy_len -= curlen; - page_base = 0; + /* The tail iovec is not always constructed in the same + * page where the head iovec resides (see, for example, + * gss_wrap_req_priv). To neatly accommodate that case, + * DMA map it separately. + */ + if (xdr->tail[0].iov_len) { + page = virt_to_page(xdr->tail[0].iov_base); + page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK; + len = xdr->tail[0].iov_len; + +map_tail: + sge_no++; + sge[sge_no].addr = ib_dma_map_page(device, page, + page_base, len, + DMA_TO_DEVICE); + if (ib_dma_mapping_error(device, sge[sge_no].addr)) + goto out_mapping_err; + sge[sge_no].length = len; + sge[sge_no].lkey = lkey; + req->rl_mapped_sges++; } - /* header now contains entire send message */ + +out: + req->rl_send_wr.num_sge = sge_no + 1; + return true; + +out_mapping_overflow: + pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no); + return false; + +out_mapping_err: + pr_err("rpcrdma: Send mapping error\n"); + return false; +} + +bool +rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, + u32 hdrlen, struct xdr_buf *xdr, + enum rpcrdma_chunktype rtype) +{ + req->rl_send_wr.num_sge = 0; + req->rl_mapped_sges = 0; + + if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen)) + goto out_map; + + if (rtype != rpcrdma_areadch) + if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype)) + goto out_map; + + return true; + +out_map: + pr_err("rpcrdma: failed to DMA map a Send buffer\n"); + return false; +} + +void +rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req) +{ + struct ib_device *device = ia->ri_device; + struct ib_sge *sge; + int count; + + sge = &req->rl_send_sge[2]; + for (count = req->rl_mapped_sges; count--; sge++) + ib_dma_unmap_page(device, sge->addr, sge->length, + DMA_TO_DEVICE); + req->rl_mapped_sges = 0; } /* * Marshal a request: the primary job of this routine is to choose * the transfer modes. See comments below. * - * Prepares up to two IOVs per Call message: - * - * [0] -- RPC RDMA header - * [1] -- the RPC header/data - * * Returns zero on success, otherwise a negative errno. */ @@ -626,12 +704,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) */ if (rpcrdma_args_inline(r_xprt, rqst)) { rtype = rpcrdma_noch; - rpcrdma_inline_pullup(rqst); - rpclen = rqst->rq_svec[0].iov_len; + rpclen = rqst->rq_snd_buf.len; } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) { rtype = rpcrdma_readch; - rpclen = rqst->rq_svec[0].iov_len; - rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); + rpclen = rqst->rq_snd_buf.head[0].iov_len + + rqst->rq_snd_buf.tail[0].iov_len; } else { r_xprt->rx_stats.nomsg_call_count++; headerp->rm_type = htonl(RDMA_NOMSG); @@ -673,34 +750,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) goto out_unmap; hdrlen = (unsigned char *)iptr - (unsigned char *)headerp; - if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst)) - goto out_overflow; - dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n", rqst->rq_task->tk_pid, __func__, transfertypes[rtype], transfertypes[wtype], hdrlen, rpclen); - req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); - req->rl_send_iov[0].length = hdrlen; - req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); - - req->rl_niovs = 1; - if (rtype == rpcrdma_areadch) - return 0; - - req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf); - req->rl_send_iov[1].length = rpclen; - req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf); - - req->rl_niovs = 2; + if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen, + &rqst->rq_snd_buf, rtype)) { + iptr = ERR_PTR(-EIO); + goto out_unmap; + } return 0; -out_overflow: - pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n", - hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]); - iptr = ERR_PTR(-EIO); - out_unmap: r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false); return PTR_ERR(iptr); @@ -916,8 +977,10 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep) * allowed to timeout, to discover the errors at that time. */ void -rpcrdma_reply_handler(struct rpcrdma_rep *rep) +rpcrdma_reply_handler(struct work_struct *work) { + struct rpcrdma_rep *rep = + container_of(work, struct rpcrdma_rep, rr_work); struct rpcrdma_msg *headerp; struct rpcrdma_req *req; struct rpc_rqst *rqst; @@ -1132,6 +1195,6 @@ out_duplicate: repost: r_xprt->rx_stats.bad_reply_count++; - if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) + if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep)) rpcrdma_recv_buffer_put(rep); } diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index a2a7519b0f23..2d8545c34095 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -129,7 +129,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, ret = -EIO; goto out_unmap; } - atomic_inc(&rdma->sc_dma_used); + svc_rdma_count_mappings(rdma, ctxt); memset(&send_wr, 0, sizeof(send_wr)); ctxt->cqe.done = svc_rdma_wc_send; @@ -159,33 +159,34 @@ out_unmap: /* Server-side transport endpoint wants a whole page for its send * buffer. The client RPC code constructs the RPC header in this * buffer before it invokes ->send_request. - * - * Returns NULL if there was a temporary allocation failure. */ -static void * -xprt_rdma_bc_allocate(struct rpc_task *task, size_t size) +static int +xprt_rdma_bc_allocate(struct rpc_task *task) { struct rpc_rqst *rqst = task->tk_rqstp; struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt; + size_t size = rqst->rq_callsize; struct svcxprt_rdma *rdma; struct page *page; rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt); - /* Prevent an infinite loop: try to make this case work */ - if (size > PAGE_SIZE) + if (size > PAGE_SIZE) { WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n", size); + return -EINVAL; + } page = alloc_page(RPCRDMA_DEF_GFP); if (!page) - return NULL; + return -ENOMEM; - return page_address(page); + rqst->rq_buffer = page_address(page); + return 0; } static void -xprt_rdma_bc_free(void *buffer) +xprt_rdma_bc_free(struct rpc_task *task) { /* No-op: ctxt and page have already been freed. */ } diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index 2c25606f2561..ad1df979b3f0 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -159,7 +159,7 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt, ctxt->sge[pno].addr); if (ret) goto err; - atomic_inc(&xprt->sc_dma_used); + svc_rdma_count_mappings(xprt, ctxt); ctxt->sge[pno].lkey = xprt->sc_pd->local_dma_lkey; ctxt->sge[pno].length = len; diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 54d533300620..f5a91edcd233 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -225,6 +225,48 @@ svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp, return rp_ary; } +/* RPC-over-RDMA Version One private extension: Remote Invalidation. + * Responder's choice: requester signals it can handle Send With + * Invalidate, and responder chooses one rkey to invalidate. + * + * Find a candidate rkey to invalidate when sending a reply. Picks the + * first rkey it finds in the chunks lists. + * + * Returns zero if RPC's chunk lists are empty. + */ +static u32 svc_rdma_get_inv_rkey(struct rpcrdma_msg *rdma_argp, + struct rpcrdma_write_array *wr_ary, + struct rpcrdma_write_array *rp_ary) +{ + struct rpcrdma_read_chunk *rd_ary; + struct rpcrdma_segment *arg_ch; + u32 inv_rkey; + + inv_rkey = 0; + + rd_ary = svc_rdma_get_read_chunk(rdma_argp); + if (rd_ary) { + inv_rkey = be32_to_cpu(rd_ary->rc_target.rs_handle); + goto out; + } + + if (wr_ary && be32_to_cpu(wr_ary->wc_nchunks)) { + arg_ch = &wr_ary->wc_array[0].wc_target; + inv_rkey = be32_to_cpu(arg_ch->rs_handle); + goto out; + } + + if (rp_ary && be32_to_cpu(rp_ary->wc_nchunks)) { + arg_ch = &rp_ary->wc_array[0].wc_target; + inv_rkey = be32_to_cpu(arg_ch->rs_handle); + goto out; + } + +out: + dprintk("svcrdma: Send With Invalidate rkey=%08x\n", inv_rkey); + return inv_rkey; +} + /* Assumptions: * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE */ @@ -280,7 +322,7 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp, if (ib_dma_mapping_error(xprt->sc_cm_id->device, sge[sge_no].addr)) goto err; - atomic_inc(&xprt->sc_dma_used); + svc_rdma_count_mappings(xprt, ctxt); sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey; ctxt->count++; sge_off = 0; @@ -464,7 +506,8 @@ static int send_reply(struct svcxprt_rdma *rdma, struct page *page, struct rpcrdma_msg *rdma_resp, struct svc_rdma_req_map *vec, - int byte_count) + int byte_count, + u32 inv_rkey) { struct svc_rdma_op_ctxt *ctxt; struct ib_send_wr send_wr; @@ -489,7 +532,7 @@ static int send_reply(struct svcxprt_rdma *rdma, ctxt->sge[0].length, DMA_TO_DEVICE); if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) goto err; - atomic_inc(&rdma->sc_dma_used); + svc_rdma_count_mappings(rdma, ctxt); ctxt->direction = DMA_TO_DEVICE; @@ -505,7 +548,7 @@ static int send_reply(struct svcxprt_rdma *rdma, if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[sge_no].addr)) goto err; - atomic_inc(&rdma->sc_dma_used); + svc_rdma_count_mappings(rdma, ctxt); ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; ctxt->sge[sge_no].length = sge_bytes; } @@ -523,23 +566,9 @@ static int send_reply(struct svcxprt_rdma *rdma, ctxt->pages[page_no+1] = rqstp->rq_respages[page_no]; ctxt->count++; rqstp->rq_respages[page_no] = NULL; - /* - * If there are more pages than SGE, terminate SGE - * list so that svc_rdma_unmap_dma doesn't attempt to - * unmap garbage. - */ - if (page_no+1 >= sge_no) - ctxt->sge[page_no+1].length = 0; } rqstp->rq_next_page = rqstp->rq_respages + 1; - /* The loop above bumps sc_dma_used for each sge. The - * xdr_buf.tail gets a separate sge, but resides in the - * same page as xdr_buf.head. Don't count it twice. - */ - if (sge_no > ctxt->count) - atomic_dec(&rdma->sc_dma_used); - if (sge_no > rdma->sc_max_sge) { pr_err("svcrdma: Too many sges (%d)\n", sge_no); goto err; @@ -549,7 +578,11 @@ static int send_reply(struct svcxprt_rdma *rdma, send_wr.wr_cqe = &ctxt->cqe; send_wr.sg_list = ctxt->sge; send_wr.num_sge = sge_no; - send_wr.opcode = IB_WR_SEND; + if (inv_rkey) { + send_wr.opcode = IB_WR_SEND_WITH_INV; + send_wr.ex.invalidate_rkey = inv_rkey; + } else + send_wr.opcode = IB_WR_SEND; send_wr.send_flags = IB_SEND_SIGNALED; ret = svc_rdma_send(rdma, &send_wr); @@ -581,6 +614,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) int inline_bytes; struct page *res_page; struct svc_rdma_req_map *vec; + u32 inv_rkey; dprintk("svcrdma: sending response for rqstp=%p\n", rqstp); @@ -591,6 +625,10 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) wr_ary = svc_rdma_get_write_array(rdma_argp); rp_ary = svc_rdma_get_reply_array(rdma_argp, wr_ary); + inv_rkey = 0; + if (rdma->sc_snd_w_inv) + inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_ary, rp_ary); + /* Build an req vec for the XDR */ vec = svc_rdma_get_req_map(rdma); ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL); @@ -633,9 +671,9 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) goto err1; ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec, - inline_bytes); + inline_bytes, inv_rkey); if (ret < 0) - goto err1; + goto err0; svc_rdma_put_req_map(rdma, vec); dprintk("svcrdma: send_reply returns %d\n", ret); @@ -692,7 +730,7 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, svc_rdma_put_context(ctxt, 1); return; } - atomic_inc(&xprt->sc_dma_used); + svc_rdma_count_mappings(xprt, ctxt); /* Prepare SEND WR */ memset(&err_wr, 0, sizeof(err_wr)); diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index eb2857f52b05..6864fb967038 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -198,6 +198,7 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) out: ctxt->count = 0; + ctxt->mapped_sges = 0; ctxt->frmr = NULL; return ctxt; @@ -221,22 +222,27 @@ out_empty: void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt) { struct svcxprt_rdma *xprt = ctxt->xprt; - int i; - for (i = 0; i < ctxt->count && ctxt->sge[i].length; i++) { + struct ib_device *device = xprt->sc_cm_id->device; + u32 lkey = xprt->sc_pd->local_dma_lkey; + unsigned int i, count; + + for (count = 0, i = 0; i < ctxt->mapped_sges; i++) { /* * Unmap the DMA addr in the SGE if the lkey matches * the local_dma_lkey, otherwise, ignore it since it is * an FRMR lkey and will be unmapped later when the * last WR that uses it completes. */ - if (ctxt->sge[i].lkey == xprt->sc_pd->local_dma_lkey) { - atomic_dec(&xprt->sc_dma_used); - ib_dma_unmap_page(xprt->sc_cm_id->device, + if (ctxt->sge[i].lkey == lkey) { + count++; + ib_dma_unmap_page(device, ctxt->sge[i].addr, ctxt->sge[i].length, ctxt->direction); } } + ctxt->mapped_sges = 0; + atomic_sub(count, &xprt->sc_dma_used); } void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages) @@ -600,7 +606,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags) DMA_FROM_DEVICE); if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa)) goto err_put_ctxt; - atomic_inc(&xprt->sc_dma_used); + svc_rdma_count_mappings(xprt, ctxt); ctxt->sge[sge_no].addr = pa; ctxt->sge[sge_no].length = PAGE_SIZE; ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey; @@ -642,6 +648,26 @@ int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags) return ret; } +static void +svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt, + struct rdma_conn_param *param) +{ + const struct rpcrdma_connect_private *pmsg = param->private_data; + + if (pmsg && + pmsg->cp_magic == rpcrdma_cmp_magic && + pmsg->cp_version == RPCRDMA_CMP_VERSION) { + newxprt->sc_snd_w_inv = pmsg->cp_flags & + RPCRDMA_CMP_F_SND_W_INV_OK; + + dprintk("svcrdma: client send_size %u, recv_size %u " + "remote inv %ssupported\n", + rpcrdma_decode_buffer_size(pmsg->cp_send_size), + rpcrdma_decode_buffer_size(pmsg->cp_recv_size), + newxprt->sc_snd_w_inv ? "" : "un"); + } +} + /* * This function handles the CONNECT_REQUEST event on a listening * endpoint. It is passed the cma_id for the _new_ connection. The context in @@ -653,7 +679,8 @@ int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags) * will call the recvfrom method on the listen xprt which will accept the new * connection. */ -static void handle_connect_req(struct rdma_cm_id *new_cma_id, size_t client_ird) +static void handle_connect_req(struct rdma_cm_id *new_cma_id, + struct rdma_conn_param *param) { struct svcxprt_rdma *listen_xprt = new_cma_id->context; struct svcxprt_rdma *newxprt; @@ -669,9 +696,10 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id, size_t client_ird) new_cma_id->context = newxprt; dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n", newxprt, newxprt->sc_cm_id, listen_xprt); + svc_rdma_parse_connect_private(newxprt, param); /* Save client advertised inbound read limit for use later in accept. */ - newxprt->sc_ord = client_ird; + newxprt->sc_ord = param->initiator_depth; /* Set the local and remote addresses in the transport */ sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr; @@ -706,8 +734,7 @@ static int rdma_listen_handler(struct rdma_cm_id *cma_id, dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, " "event = %s (%d)\n", cma_id, cma_id->context, rdma_event_msg(event->event), event->event); - handle_connect_req(cma_id, - event->param.conn.initiator_depth); + handle_connect_req(cma_id, &event->param.conn); break; case RDMA_CM_EVENT_ESTABLISHED: @@ -941,6 +968,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) struct svcxprt_rdma *listen_rdma; struct svcxprt_rdma *newxprt = NULL; struct rdma_conn_param conn_param; + struct rpcrdma_connect_private pmsg; struct ib_qp_init_attr qp_attr; struct ib_device *dev; unsigned int i; @@ -1070,7 +1098,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) dev->attrs.max_fast_reg_page_list_len; newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG; newxprt->sc_reader = rdma_read_chunk_frmr; - } + } else + newxprt->sc_snd_w_inv = false; /* * Determine if a DMA MR is required and if so, what privs are required @@ -1094,11 +1123,20 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) /* Swap out the handler */ newxprt->sc_cm_id->event_handler = rdma_cma_handler; + /* Construct RDMA-CM private message */ + pmsg.cp_magic = rpcrdma_cmp_magic; + pmsg.cp_version = RPCRDMA_CMP_VERSION; + pmsg.cp_flags = 0; + pmsg.cp_send_size = pmsg.cp_recv_size = + rpcrdma_encode_buffer_size(newxprt->sc_max_req_size); + /* Accept Connection */ set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags); memset(&conn_param, 0, sizeof conn_param); conn_param.responder_resources = 0; conn_param.initiator_depth = newxprt->sc_ord; + conn_param.private_data = &pmsg; + conn_param.private_data_len = sizeof(pmsg); ret = rdma_accept(newxprt->sc_cm_id, &conn_param); if (ret) { dprintk("svcrdma: failed to accept new connection, ret=%d\n", diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 81f0e879f019..ed5e285fd2ea 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -97,7 +97,7 @@ static struct ctl_table xr_tunables_table[] = { .data = &xprt_rdma_max_inline_read, .maxlen = sizeof(unsigned int), .mode = 0644, - .proc_handler = proc_dointvec, + .proc_handler = proc_dointvec_minmax, .extra1 = &min_inline_size, .extra2 = &max_inline_size, }, @@ -106,7 +106,7 @@ static struct ctl_table xr_tunables_table[] = { .data = &xprt_rdma_max_inline_write, .maxlen = sizeof(unsigned int), .mode = 0644, - .proc_handler = proc_dointvec, + .proc_handler = proc_dointvec_minmax, .extra1 = &min_inline_size, .extra2 = &max_inline_size, }, @@ -477,115 +477,152 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task) } } -/* - * The RDMA allocate/free functions need the task structure as a place - * to hide the struct rpcrdma_req, which is necessary for the actual send/recv - * sequence. +/* Allocate a fixed-size buffer in which to construct and send the + * RPC-over-RDMA header for this request. + */ +static bool +rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + gfp_t flags) +{ + size_t size = RPCRDMA_HDRBUF_SIZE; + struct rpcrdma_regbuf *rb; + + if (req->rl_rdmabuf) + return true; + + rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags); + if (IS_ERR(rb)) + return false; + + r_xprt->rx_stats.hardway_register_count += size; + req->rl_rdmabuf = rb; + return true; +} + +static bool +rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + size_t size, gfp_t flags) +{ + struct rpcrdma_regbuf *rb; + + if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size) + return true; + + rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags); + if (IS_ERR(rb)) + return false; + + rpcrdma_free_regbuf(req->rl_sendbuf); + r_xprt->rx_stats.hardway_register_count += size; + req->rl_sendbuf = rb; + return true; +} + +/* The rq_rcv_buf is used only if a Reply chunk is necessary. + * The decision to use a Reply chunk is made later in + * rpcrdma_marshal_req. This buffer is registered at that time. * - * The RPC layer allocates both send and receive buffers in the same call - * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer). - * We may register rq_rcv_buf when using reply chunks. + * Otherwise, the associated RPC Reply arrives in a separate + * Receive buffer, arbitrarily chosen by the HCA. The buffer + * allocated here for the RPC Reply is not utilized in that + * case. See rpcrdma_inline_fixup. + * + * A regbuf is used here to remember the buffer size. */ -static void * -xprt_rdma_allocate(struct rpc_task *task, size_t size) +static bool +rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + size_t size, gfp_t flags) { - struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); struct rpcrdma_regbuf *rb; + + if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size) + return true; + + rb = rpcrdma_alloc_regbuf(size, DMA_NONE, flags); + if (IS_ERR(rb)) + return false; + + rpcrdma_free_regbuf(req->rl_recvbuf); + r_xprt->rx_stats.hardway_register_count += size; + req->rl_recvbuf = rb; + return true; +} + +/** + * xprt_rdma_allocate - allocate transport resources for an RPC + * @task: RPC task + * + * Return values: + * 0: Success; rq_buffer points to RPC buffer to use + * ENOMEM: Out of memory, call again later + * EIO: A permanent error occurred, do not retry + * + * The RDMA allocate/free functions need the task structure as a place + * to hide the struct rpcrdma_req, which is necessary for the actual + * send/recv sequence. + * + * xprt_rdma_allocate provides buffers that are already mapped for + * DMA, and a local DMA lkey is provided for each. + */ +static int +xprt_rdma_allocate(struct rpc_task *task) +{ + struct rpc_rqst *rqst = task->tk_rqstp; + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); struct rpcrdma_req *req; - size_t min_size; gfp_t flags; req = rpcrdma_buffer_get(&r_xprt->rx_buf); if (req == NULL) - return NULL; + return -ENOMEM; flags = RPCRDMA_DEF_GFP; if (RPC_IS_SWAPPER(task)) flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN; - if (req->rl_rdmabuf == NULL) - goto out_rdmabuf; - if (req->rl_sendbuf == NULL) - goto out_sendbuf; - if (size > req->rl_sendbuf->rg_size) - goto out_sendbuf; - -out: - dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req); - req->rl_connect_cookie = 0; /* our reserved value */ - req->rl_task = task; - return req->rl_sendbuf->rg_base; - -out_rdmabuf: - min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp); - rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags); - if (IS_ERR(rb)) + if (!rpcrdma_get_rdmabuf(r_xprt, req, flags)) goto out_fail; - req->rl_rdmabuf = rb; - -out_sendbuf: - /* XDR encoding and RPC/RDMA marshaling of this request has not - * yet occurred. Thus a lower bound is needed to prevent buffer - * overrun during marshaling. - * - * RPC/RDMA marshaling may choose to send payload bearing ops - * inline, if the result is smaller than the inline threshold. - * The value of the "size" argument accounts for header - * requirements but not for the payload in these cases. - * - * Likewise, allocate enough space to receive a reply up to the - * size of the inline threshold. - * - * It's unlikely that both the send header and the received - * reply will be large, but slush is provided here to allow - * flexibility when marshaling. - */ - min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp); - min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp); - if (size < min_size) - size = min_size; - - rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags); - if (IS_ERR(rb)) + if (!rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags)) + goto out_fail; + if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags)) goto out_fail; - rb->rg_owner = req; - r_xprt->rx_stats.hardway_register_count += size; - rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf); - req->rl_sendbuf = rb; - goto out; + dprintk("RPC: %5u %s: send size = %zd, recv size = %zd, req = %p\n", + task->tk_pid, __func__, rqst->rq_callsize, + rqst->rq_rcvsize, req); + + req->rl_connect_cookie = 0; /* our reserved value */ + rpcrdma_set_xprtdata(rqst, req); + rqst->rq_buffer = req->rl_sendbuf->rg_base; + rqst->rq_rbuffer = req->rl_recvbuf->rg_base; + return 0; out_fail: rpcrdma_buffer_put(req); - return NULL; + return -ENOMEM; } -/* - * This function returns all RDMA resources to the pool. +/** + * xprt_rdma_free - release resources allocated by xprt_rdma_allocate + * @task: RPC task + * + * Caller guarantees rqst->rq_buffer is non-NULL. */ static void -xprt_rdma_free(void *buffer) +xprt_rdma_free(struct rpc_task *task) { - struct rpcrdma_req *req; - struct rpcrdma_xprt *r_xprt; - struct rpcrdma_regbuf *rb; - - if (buffer == NULL) - return; + struct rpc_rqst *rqst = task->tk_rqstp; + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); + struct rpcrdma_req *req = rpcr_to_rdmar(rqst); + struct rpcrdma_ia *ia = &r_xprt->rx_ia; - rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]); - req = rb->rg_owner; if (req->rl_backchannel) return; - r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf); - dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); - r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, - !RPC_IS_ASYNC(req->rl_task)); - + ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task)); + rpcrdma_unmap_sges(ia, req); rpcrdma_buffer_put(req); } @@ -685,10 +722,11 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) r_xprt->rx_stats.failed_marshal_count, r_xprt->rx_stats.bad_reply_count, r_xprt->rx_stats.nomsg_call_count); - seq_printf(seq, "%lu %lu %lu\n", + seq_printf(seq, "%lu %lu %lu %lu\n", r_xprt->rx_stats.mrs_recovered, r_xprt->rx_stats.mrs_orphaned, - r_xprt->rx_stats.mrs_allocated); + r_xprt->rx_stats.mrs_allocated, + r_xprt->rx_stats.local_inv_needed); } static int diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index be3178e5e2d2..ec74289af7ec 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -129,15 +129,6 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) wc->status, wc->vendor_err); } -static void -rpcrdma_receive_worker(struct work_struct *work) -{ - struct rpcrdma_rep *rep = - container_of(work, struct rpcrdma_rep, rr_work); - - rpcrdma_reply_handler(rep); -} - /* Perform basic sanity checking to avoid using garbage * to update the credit grant value. */ @@ -161,13 +152,13 @@ rpcrdma_update_granted_credits(struct rpcrdma_rep *rep) } /** - * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC + * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC * @cq: completion queue (ignored) * @wc: completed WR * */ static void -rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc) +rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) { struct ib_cqe *cqe = wc->wr_cqe; struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep, @@ -185,6 +176,9 @@ rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc) __func__, rep, wc->byte_len); rep->rr_len = wc->byte_len; + rep->rr_wc_flags = wc->wc_flags; + rep->rr_inv_rkey = wc->ex.invalidate_rkey; + ib_dma_sync_single_for_cpu(rep->rr_device, rdmab_addr(rep->rr_rdmabuf), rep->rr_len, DMA_FROM_DEVICE); @@ -204,6 +198,36 @@ out_fail: goto out_schedule; } +static void +rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt, + struct rdma_conn_param *param) +{ + struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; + const struct rpcrdma_connect_private *pmsg = param->private_data; + unsigned int rsize, wsize; + + /* Default settings for RPC-over-RDMA Version One */ + r_xprt->rx_ia.ri_reminv_expected = false; + rsize = RPCRDMA_V1_DEF_INLINE_SIZE; + wsize = RPCRDMA_V1_DEF_INLINE_SIZE; + + if (pmsg && + pmsg->cp_magic == rpcrdma_cmp_magic && + pmsg->cp_version == RPCRDMA_CMP_VERSION) { + r_xprt->rx_ia.ri_reminv_expected = true; + rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size); + wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size); + } + + if (rsize < cdata->inline_rsize) + cdata->inline_rsize = rsize; + if (wsize < cdata->inline_wsize) + cdata->inline_wsize = wsize; + pr_info("rpcrdma: max send %u, max recv %u\n", + cdata->inline_wsize, cdata->inline_rsize); + rpcrdma_set_max_header_sizes(r_xprt); +} + static int rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) { @@ -244,6 +268,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) " (%d initiator)\n", __func__, attr->max_dest_rd_atomic, attr->max_rd_atomic); + rpcrdma_update_connect_private(xprt, &event->param.conn); goto connected; case RDMA_CM_EVENT_CONNECT_ERROR: connstate = -ENOTCONN; @@ -454,11 +479,12 @@ int rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata) { + struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private; struct ib_cq *sendcq, *recvcq; unsigned int max_qp_wr; int rc; - if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) { + if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_SEND_SGES) { dprintk("RPC: %s: insufficient sge's available\n", __func__); return -ENOMEM; @@ -487,7 +513,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_attr.cap.max_recv_wr = cdata->max_requests; ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; ep->rep_attr.cap.max_recv_wr += 1; /* drain cqe */ - ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; + ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_SEND_SGES; ep->rep_attr.cap.max_recv_sge = 1; ep->rep_attr.cap.max_inline_data = 0; ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR; @@ -536,9 +562,14 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, /* Initialize cma parameters */ memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma)); - /* RPC/RDMA does not use private data */ - ep->rep_remote_cma.private_data = NULL; - ep->rep_remote_cma.private_data_len = 0; + /* Prepare RDMA-CM private message */ + pmsg->cp_magic = rpcrdma_cmp_magic; + pmsg->cp_version = RPCRDMA_CMP_VERSION; + pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok; + pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize); + pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize); + ep->rep_remote_cma.private_data = pmsg; + ep->rep_remote_cma.private_data_len = sizeof(*pmsg); /* Client offers RDMA Read but does not initiate */ ep->rep_remote_cma.initiator_depth = 0; @@ -849,6 +880,10 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) req->rl_cqe.done = rpcrdma_wc_send; req->rl_buffer = &r_xprt->rx_buf; INIT_LIST_HEAD(&req->rl_registered); + req->rl_send_wr.next = NULL; + req->rl_send_wr.wr_cqe = &req->rl_cqe; + req->rl_send_wr.sg_list = req->rl_send_sge; + req->rl_send_wr.opcode = IB_WR_SEND; return req; } @@ -865,17 +900,21 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) if (rep == NULL) goto out; - rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize, - GFP_KERNEL); + rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize, + DMA_FROM_DEVICE, GFP_KERNEL); if (IS_ERR(rep->rr_rdmabuf)) { rc = PTR_ERR(rep->rr_rdmabuf); goto out_free; } rep->rr_device = ia->ri_device; - rep->rr_cqe.done = rpcrdma_receive_wc; + rep->rr_cqe.done = rpcrdma_wc_receive; rep->rr_rxprt = r_xprt; - INIT_WORK(&rep->rr_work, rpcrdma_receive_worker); + INIT_WORK(&rep->rr_work, rpcrdma_reply_handler); + rep->rr_recv_wr.next = NULL; + rep->rr_recv_wr.wr_cqe = &rep->rr_cqe; + rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; + rep->rr_recv_wr.num_sge = 1; return rep; out_free: @@ -966,17 +1005,18 @@ rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf) } static void -rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) +rpcrdma_destroy_rep(struct rpcrdma_rep *rep) { - rpcrdma_free_regbuf(ia, rep->rr_rdmabuf); + rpcrdma_free_regbuf(rep->rr_rdmabuf); kfree(rep); } void -rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) +rpcrdma_destroy_req(struct rpcrdma_req *req) { - rpcrdma_free_regbuf(ia, req->rl_sendbuf); - rpcrdma_free_regbuf(ia, req->rl_rdmabuf); + rpcrdma_free_regbuf(req->rl_recvbuf); + rpcrdma_free_regbuf(req->rl_sendbuf); + rpcrdma_free_regbuf(req->rl_rdmabuf); kfree(req); } @@ -1009,15 +1049,13 @@ rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf) void rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { - struct rpcrdma_ia *ia = rdmab_to_ia(buf); - cancel_delayed_work_sync(&buf->rb_recovery_worker); while (!list_empty(&buf->rb_recv_bufs)) { struct rpcrdma_rep *rep; rep = rpcrdma_buffer_get_rep_locked(buf); - rpcrdma_destroy_rep(ia, rep); + rpcrdma_destroy_rep(rep); } buf->rb_send_count = 0; @@ -1030,7 +1068,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) list_del(&req->rl_all); spin_unlock(&buf->rb_reqslock); - rpcrdma_destroy_req(ia, req); + rpcrdma_destroy_req(req); spin_lock(&buf->rb_reqslock); } spin_unlock(&buf->rb_reqslock); @@ -1129,7 +1167,7 @@ rpcrdma_buffer_put(struct rpcrdma_req *req) struct rpcrdma_buffer *buffers = req->rl_buffer; struct rpcrdma_rep *rep = req->rl_reply; - req->rl_niovs = 0; + req->rl_send_wr.num_sge = 0; req->rl_reply = NULL; spin_lock(&buffers->rb_lock); @@ -1171,70 +1209,81 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) spin_unlock(&buffers->rb_lock); } -/* - * Wrappers for internal-use kmalloc memory registration, used by buffer code. - */ - /** - * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers - * @ia: controlling rpcrdma_ia + * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers * @size: size of buffer to be allocated, in bytes + * @direction: direction of data movement * @flags: GFP flags * - * Returns pointer to private header of an area of internally - * registered memory, or an ERR_PTR. The registered buffer follows - * the end of the private header. + * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that + * can be persistently DMA-mapped for I/O. * * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for - * receiving the payload of RDMA RECV operations. regbufs are not - * used for RDMA READ/WRITE operations, thus are registered only for - * LOCAL access. + * receiving the payload of RDMA RECV operations. During Long Calls + * or Replies they may be registered externally via ro_map. */ struct rpcrdma_regbuf * -rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags) +rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction, + gfp_t flags) { struct rpcrdma_regbuf *rb; - struct ib_sge *iov; rb = kmalloc(sizeof(*rb) + size, flags); if (rb == NULL) - goto out; + return ERR_PTR(-ENOMEM); - iov = &rb->rg_iov; - iov->addr = ib_dma_map_single(ia->ri_device, - (void *)rb->rg_base, size, - DMA_BIDIRECTIONAL); - if (ib_dma_mapping_error(ia->ri_device, iov->addr)) - goto out_free; + rb->rg_device = NULL; + rb->rg_direction = direction; + rb->rg_iov.length = size; - iov->length = size; - iov->lkey = ia->ri_pd->local_dma_lkey; - rb->rg_size = size; - rb->rg_owner = NULL; return rb; +} -out_free: - kfree(rb); -out: - return ERR_PTR(-ENOMEM); +/** + * __rpcrdma_map_regbuf - DMA-map a regbuf + * @ia: controlling rpcrdma_ia + * @rb: regbuf to be mapped + */ +bool +__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb) +{ + if (rb->rg_direction == DMA_NONE) + return false; + + rb->rg_iov.addr = ib_dma_map_single(ia->ri_device, + (void *)rb->rg_base, + rdmab_length(rb), + rb->rg_direction); + if (ib_dma_mapping_error(ia->ri_device, rdmab_addr(rb))) + return false; + + rb->rg_device = ia->ri_device; + rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey; + return true; +} + +static void +rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb) +{ + if (!rpcrdma_regbuf_is_mapped(rb)) + return; + + ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), + rdmab_length(rb), rb->rg_direction); + rb->rg_device = NULL; } /** * rpcrdma_free_regbuf - deregister and free registered buffer - * @ia: controlling rpcrdma_ia * @rb: regbuf to be deregistered and freed */ void -rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb) +rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb) { - struct ib_sge *iov; - if (!rb) return; - iov = &rb->rg_iov; - ib_dma_unmap_single(ia->ri_device, - iov->addr, iov->length, DMA_BIDIRECTIONAL); + rpcrdma_dma_unmap_regbuf(rb); kfree(rb); } @@ -1248,39 +1297,28 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_req *req) { - struct ib_device *device = ia->ri_device; - struct ib_send_wr send_wr, *send_wr_fail; - struct rpcrdma_rep *rep = req->rl_reply; - struct ib_sge *iov = req->rl_send_iov; - int i, rc; + struct ib_send_wr *send_wr = &req->rl_send_wr; + struct ib_send_wr *send_wr_fail; + int rc; - if (rep) { - rc = rpcrdma_ep_post_recv(ia, ep, rep); + if (req->rl_reply) { + rc = rpcrdma_ep_post_recv(ia, req->rl_reply); if (rc) return rc; req->rl_reply = NULL; } - send_wr.next = NULL; - send_wr.wr_cqe = &req->rl_cqe; - send_wr.sg_list = iov; - send_wr.num_sge = req->rl_niovs; - send_wr.opcode = IB_WR_SEND; - - for (i = 0; i < send_wr.num_sge; i++) - ib_dma_sync_single_for_device(device, iov[i].addr, - iov[i].length, DMA_TO_DEVICE); dprintk("RPC: %s: posting %d s/g entries\n", - __func__, send_wr.num_sge); + __func__, send_wr->num_sge); if (DECR_CQCOUNT(ep) > 0) - send_wr.send_flags = 0; + send_wr->send_flags = 0; else { /* Provider must take a send completion every now and then */ INIT_CQCOUNT(ep); - send_wr.send_flags = IB_SEND_SIGNALED; + send_wr->send_flags = IB_SEND_SIGNALED; } - rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail); + rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail); if (rc) goto out_postsend_err; return 0; @@ -1290,32 +1328,24 @@ out_postsend_err: return -ENOTCONN; } -/* - * (Re)post a receive buffer. - */ int rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, - struct rpcrdma_ep *ep, struct rpcrdma_rep *rep) { - struct ib_recv_wr recv_wr, *recv_wr_fail; + struct ib_recv_wr *recv_wr_fail; int rc; - recv_wr.next = NULL; - recv_wr.wr_cqe = &rep->rr_cqe; - recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; - recv_wr.num_sge = 1; - - ib_dma_sync_single_for_cpu(ia->ri_device, - rdmab_addr(rep->rr_rdmabuf), - rdmab_length(rep->rr_rdmabuf), - DMA_BIDIRECTIONAL); - - rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail); + if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf)) + goto out_map; + rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail); if (rc) goto out_postrecv; return 0; +out_map: + pr_err("rpcrdma: failed to DMA map the Receive buffer\n"); + return -EIO; + out_postrecv: pr_err("rpcrdma: ib_post_recv returned %i\n", rc); return -ENOTCONN; @@ -1333,7 +1363,6 @@ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count) { struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_ep *ep = &r_xprt->rx_ep; struct rpcrdma_rep *rep; int rc; @@ -1344,7 +1373,7 @@ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count) rep = rpcrdma_buffer_get_rep_locked(buffers); spin_unlock(&buffers->rb_lock); - rc = rpcrdma_ep_post_recv(ia, ep, rep); + rc = rpcrdma_ep_post_recv(ia, rep); if (rc) goto out_rc; } diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index a71b0f5897d8..0d35b761c883 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -70,9 +70,11 @@ struct rpcrdma_ia { struct ib_pd *ri_pd; struct completion ri_done; int ri_async_rc; + unsigned int ri_max_segs; unsigned int ri_max_frmr_depth; unsigned int ri_max_inline_write; unsigned int ri_max_inline_read; + bool ri_reminv_expected; struct ib_qp_attr ri_qp_attr; struct ib_qp_init_attr ri_qp_init_attr; }; @@ -87,6 +89,7 @@ struct rpcrdma_ep { int rep_connected; struct ib_qp_init_attr rep_attr; wait_queue_head_t rep_connect_wait; + struct rpcrdma_connect_private rep_cm_private; struct rdma_conn_param rep_remote_cma; struct sockaddr_storage rep_remote_addr; struct delayed_work rep_connect_worker; @@ -112,9 +115,9 @@ struct rpcrdma_ep { */ struct rpcrdma_regbuf { - size_t rg_size; - struct rpcrdma_req *rg_owner; struct ib_sge rg_iov; + struct ib_device *rg_device; + enum dma_data_direction rg_direction; __be32 rg_base[0] __attribute__ ((aligned(256))); }; @@ -162,7 +165,10 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) * The smallest inline threshold is 1024 bytes, ensuring that * at least 750 bytes are available for RPC messages. */ -#define RPCRDMA_MAX_HDR_SEGS (8) +enum { + RPCRDMA_MAX_HDR_SEGS = 8, + RPCRDMA_HDRBUF_SIZE = 256, +}; /* * struct rpcrdma_rep -- this structure encapsulates state required to recv @@ -182,10 +188,13 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) struct rpcrdma_rep { struct ib_cqe rr_cqe; unsigned int rr_len; + int rr_wc_flags; + u32 rr_inv_rkey; struct ib_device *rr_device; struct rpcrdma_xprt *rr_rxprt; struct work_struct rr_work; struct list_head rr_list; + struct ib_recv_wr rr_recv_wr; struct rpcrdma_regbuf *rr_rdmabuf; }; @@ -276,19 +285,30 @@ struct rpcrdma_mr_seg { /* chunk descriptors */ char *mr_offset; /* kva if no page, else offset */ }; -#define RPCRDMA_MAX_IOVS (2) +/* Reserve enough Send SGEs to send a maximum size inline request: + * - RPC-over-RDMA header + * - xdr_buf head iovec + * - RPCRDMA_MAX_INLINE bytes, possibly unaligned, in pages + * - xdr_buf tail iovec + */ +enum { + RPCRDMA_MAX_SEND_PAGES = PAGE_SIZE + RPCRDMA_MAX_INLINE - 1, + RPCRDMA_MAX_PAGE_SGES = (RPCRDMA_MAX_SEND_PAGES >> PAGE_SHIFT) + 1, + RPCRDMA_MAX_SEND_SGES = 1 + 1 + RPCRDMA_MAX_PAGE_SGES + 1, +}; struct rpcrdma_buffer; struct rpcrdma_req { struct list_head rl_free; - unsigned int rl_niovs; + unsigned int rl_mapped_sges; unsigned int rl_connect_cookie; - struct rpc_task *rl_task; struct rpcrdma_buffer *rl_buffer; - struct rpcrdma_rep *rl_reply;/* holder for reply buffer */ - struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS]; - struct rpcrdma_regbuf *rl_rdmabuf; - struct rpcrdma_regbuf *rl_sendbuf; + struct rpcrdma_rep *rl_reply; + struct ib_send_wr rl_send_wr; + struct ib_sge rl_send_sge[RPCRDMA_MAX_SEND_SGES]; + struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */ + struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */ + struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */ struct ib_cqe rl_cqe; struct list_head rl_all; @@ -298,14 +318,16 @@ struct rpcrdma_req { struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; }; +static inline void +rpcrdma_set_xprtdata(struct rpc_rqst *rqst, struct rpcrdma_req *req) +{ + rqst->rq_xprtdata = req; +} + static inline struct rpcrdma_req * rpcr_to_rdmar(struct rpc_rqst *rqst) { - void *buffer = rqst->rq_buffer; - struct rpcrdma_regbuf *rb; - - rb = container_of(buffer, struct rpcrdma_regbuf, rg_base); - return rb->rg_owner; + return rqst->rq_xprtdata; } /* @@ -356,15 +378,6 @@ struct rpcrdma_create_data_internal { unsigned int padding; /* non-rdma write header padding */ }; -#define RPCRDMA_INLINE_READ_THRESHOLD(rq) \ - (rpcx_to_rdmad(rq->rq_xprt).inline_rsize) - -#define RPCRDMA_INLINE_WRITE_THRESHOLD(rq)\ - (rpcx_to_rdmad(rq->rq_xprt).inline_wsize) - -#define RPCRDMA_INLINE_PAD_VALUE(rq)\ - rpcx_to_rdmad(rq->rq_xprt).padding - /* * Statistics for RPCRDMA */ @@ -386,6 +399,7 @@ struct rpcrdma_stats { unsigned long mrs_recovered; unsigned long mrs_orphaned; unsigned long mrs_allocated; + unsigned long local_inv_needed; }; /* @@ -409,6 +423,7 @@ struct rpcrdma_memreg_ops { struct rpcrdma_mw *); void (*ro_release_mr)(struct rpcrdma_mw *); const char *ro_displayname; + const int ro_send_w_inv_ok; }; extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops; @@ -461,15 +476,14 @@ void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *); int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *, struct rpcrdma_req *); -int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *, - struct rpcrdma_rep *); +int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_rep *); /* * Buffer calls - xprtrdma/verbs.c */ struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *); struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *); -void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *); +void rpcrdma_destroy_req(struct rpcrdma_req *); int rpcrdma_buffer_create(struct rpcrdma_xprt *); void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); @@ -482,10 +496,24 @@ void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *); -struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *, - size_t, gfp_t); -void rpcrdma_free_regbuf(struct rpcrdma_ia *, - struct rpcrdma_regbuf *); +struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(size_t, enum dma_data_direction, + gfp_t); +bool __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *); +void rpcrdma_free_regbuf(struct rpcrdma_regbuf *); + +static inline bool +rpcrdma_regbuf_is_mapped(struct rpcrdma_regbuf *rb) +{ + return rb->rg_device != NULL; +} + +static inline bool +rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb) +{ + if (likely(rpcrdma_regbuf_is_mapped(rb))) + return true; + return __rpcrdma_dma_map_regbuf(ia, rb); +} int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int); @@ -507,15 +535,25 @@ rpcrdma_data_dir(bool writing) */ void rpcrdma_connect_worker(struct work_struct *); void rpcrdma_conn_func(struct rpcrdma_ep *); -void rpcrdma_reply_handler(struct rpcrdma_rep *); +void rpcrdma_reply_handler(struct work_struct *); /* * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c */ + +enum rpcrdma_chunktype { + rpcrdma_noch = 0, + rpcrdma_readch, + rpcrdma_areadch, + rpcrdma_writech, + rpcrdma_replych +}; + +bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *, + u32, struct xdr_buf *, enum rpcrdma_chunktype); +void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *); int rpcrdma_marshal_req(struct rpc_rqst *); -void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *, - struct rpcrdma_create_data_internal *, - unsigned int); +void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); /* RPC/RDMA module init - xprtrdma/transport.c */ diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index bf168838a029..0137af1c0916 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -473,7 +473,16 @@ static int xs_nospace(struct rpc_task *task) spin_unlock_bh(&xprt->transport_lock); /* Race breaker in case memory is freed before above code is called */ - sk->sk_write_space(sk); + if (ret == -EAGAIN) { + struct socket_wq *wq; + + rcu_read_lock(); + wq = rcu_dereference(sk->sk_wq); + set_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags); + rcu_read_unlock(); + + sk->sk_write_space(sk); + } return ret; } @@ -2533,35 +2542,38 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) * we allocate pages instead doing a kmalloc like rpc_malloc is because we want * to use the server side send routines. */ -static void *bc_malloc(struct rpc_task *task, size_t size) +static int bc_malloc(struct rpc_task *task) { + struct rpc_rqst *rqst = task->tk_rqstp; + size_t size = rqst->rq_callsize; struct page *page; struct rpc_buffer *buf; - WARN_ON_ONCE(size > PAGE_SIZE - sizeof(struct rpc_buffer)); - if (size > PAGE_SIZE - sizeof(struct rpc_buffer)) - return NULL; + if (size > PAGE_SIZE - sizeof(struct rpc_buffer)) { + WARN_ONCE(1, "xprtsock: large bc buffer request (size %zu)\n", + size); + return -EINVAL; + } page = alloc_page(GFP_KERNEL); if (!page) - return NULL; + return -ENOMEM; buf = page_address(page); buf->len = PAGE_SIZE; - return buf->data; + rqst->rq_buffer = buf->data; + return 0; } /* * Free the space allocated in the bc_alloc routine */ -static void bc_free(void *buffer) +static void bc_free(struct rpc_task *task) { + void *buffer = task->tk_rqstp->rq_buffer; struct rpc_buffer *buf; - if (!buffer) - return; - buf = container_of(buffer, struct rpc_buffer, data); free_page((unsigned long)buf); } |