summaryrefslogtreecommitdiffstats
path: root/net/sunrpc/xprtrdma
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r--net/sunrpc/xprtrdma/Makefile3
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c868
-rw-r--r--net/sunrpc/xprtrdma/transport.c800
-rw-r--r--net/sunrpc/xprtrdma/verbs.c1626
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h330
5 files changed, 3627 insertions, 0 deletions
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
new file mode 100644
index 000000000000..264f0feeb513
--- /dev/null
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -0,0 +1,3 @@
+obj-$(CONFIG_SUNRPC_XPRT_RDMA) += xprtrdma.o
+
+xprtrdma-y := transport.o rpc_rdma.o verbs.o
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
new file mode 100644
index 000000000000..12db63580427
--- /dev/null
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -0,0 +1,868 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * rpc_rdma.c
+ *
+ * This file contains the guts of the RPC RDMA protocol, and
+ * does marshaling/unmarshaling, etc. It is also where interfacing
+ * to the Linux RPC framework lives.
+ */
+
+#include "xprt_rdma.h"
+
+#include <linux/highmem.h>
+
+#ifdef RPC_DEBUG
+# define RPCDBG_FACILITY RPCDBG_TRANS
+#endif
+
+enum rpcrdma_chunktype {
+ rpcrdma_noch = 0,
+ rpcrdma_readch,
+ rpcrdma_areadch,
+ rpcrdma_writech,
+ rpcrdma_replych
+};
+
+#ifdef RPC_DEBUG
+static const char transfertypes[][12] = {
+ "pure inline", /* no chunks */
+ " read chunk", /* some argument via rdma read */
+ "*read chunk", /* entire request via rdma read */
+ "write chunk", /* some result via rdma write */
+ "reply chunk" /* entire reply via rdma write */
+};
+#endif
+
+/*
+ * Chunk assembly from upper layer xdr_buf.
+ *
+ * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
+ * elements. Segments are then coalesced when registered, if possible
+ * within the selected memreg mode.
+ *
+ * Note, this routine is never called if the connection's memory
+ * registration strategy is 0 (bounce buffers).
+ */
+
+static int
+rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, int pos,
+ enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
+{
+ int len, n = 0, p;
+
+ if (pos == 0 && xdrbuf->head[0].iov_len) {
+ seg[n].mr_page = NULL;
+ seg[n].mr_offset = xdrbuf->head[0].iov_base;
+ seg[n].mr_len = xdrbuf->head[0].iov_len;
+ pos += xdrbuf->head[0].iov_len;
+ ++n;
+ }
+
+ if (xdrbuf->page_len && (xdrbuf->pages[0] != NULL)) {
+ if (n == nsegs)
+ return 0;
+ seg[n].mr_page = xdrbuf->pages[0];
+ seg[n].mr_offset = (void *)(unsigned long) xdrbuf->page_base;
+ seg[n].mr_len = min_t(u32,
+ PAGE_SIZE - xdrbuf->page_base, xdrbuf->page_len);
+ len = xdrbuf->page_len - seg[n].mr_len;
+ pos += len;
+ ++n;
+ p = 1;
+ while (len > 0) {
+ if (n == nsegs)
+ return 0;
+ seg[n].mr_page = xdrbuf->pages[p];
+ seg[n].mr_offset = NULL;
+ seg[n].mr_len = min_t(u32, PAGE_SIZE, len);
+ len -= seg[n].mr_len;
+ ++n;
+ ++p;
+ }
+ }
+
+ if (pos < xdrbuf->len && xdrbuf->tail[0].iov_len) {
+ if (n == nsegs)
+ return 0;
+ seg[n].mr_page = NULL;
+ seg[n].mr_offset = xdrbuf->tail[0].iov_base;
+ seg[n].mr_len = xdrbuf->tail[0].iov_len;
+ pos += xdrbuf->tail[0].iov_len;
+ ++n;
+ }
+
+ if (pos < xdrbuf->len)
+ dprintk("RPC: %s: marshaled only %d of %d\n",
+ __func__, pos, xdrbuf->len);
+
+ return n;
+}
+
+/*
+ * Create read/write chunk lists, and reply chunks, for RDMA
+ *
+ * Assume check against THRESHOLD has been done, and chunks are required.
+ * Assume only encoding one list entry for read|write chunks. The NFSv3
+ * protocol is simple enough to allow this as it only has a single "bulk
+ * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
+ * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
+ *
+ * When used for a single reply chunk (which is a special write
+ * chunk used for the entire reply, rather than just the data), it
+ * is used primarily for READDIR and READLINK which would otherwise
+ * be severely size-limited by a small rdma inline read max. The server
+ * response will come back as an RDMA Write, followed by a message
+ * of type RDMA_NOMSG carrying the xid and length. As a result, reply
+ * chunks do not provide data alignment, however they do not require
+ * "fixup" (moving the response to the upper layer buffer) either.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ * Read chunklist (a linked list):
+ * N elements, position P (same P for all chunks of same arg!):
+ * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
+ *
+ * Write chunklist (a list of (one) counted array):
+ * N elements:
+ * 1 - N - HLOO - HLOO - ... - HLOO - 0
+ *
+ * Reply chunk (a counted array):
+ * N elements:
+ * 1 - N - HLOO - HLOO - ... - HLOO
+ */
+
+static unsigned int
+rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
+ struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
+{
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_task->tk_xprt);
+ int nsegs, nchunks = 0;
+ int pos;
+ struct rpcrdma_mr_seg *seg = req->rl_segments;
+ struct rpcrdma_read_chunk *cur_rchunk = NULL;
+ struct rpcrdma_write_array *warray = NULL;
+ struct rpcrdma_write_chunk *cur_wchunk = NULL;
+ u32 *iptr = headerp->rm_body.rm_chunks;
+
+ if (type == rpcrdma_readch || type == rpcrdma_areadch) {
+ /* a read chunk - server will RDMA Read our memory */
+ cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
+ } else {
+ /* a write or reply chunk - server will RDMA Write our memory */
+ *iptr++ = xdr_zero; /* encode a NULL read chunk list */
+ if (type == rpcrdma_replych)
+ *iptr++ = xdr_zero; /* a NULL write chunk list */
+ warray = (struct rpcrdma_write_array *) iptr;
+ cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
+ }
+
+ if (type == rpcrdma_replych || type == rpcrdma_areadch)
+ pos = 0;
+ else
+ pos = target->head[0].iov_len;
+
+ nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
+ if (nsegs == 0)
+ return 0;
+
+ do {
+ /* bind/register the memory, then build chunk from result. */
+ int n = rpcrdma_register_external(seg, nsegs,
+ cur_wchunk != NULL, r_xprt);
+ if (n <= 0)
+ goto out;
+ if (cur_rchunk) { /* read */
+ cur_rchunk->rc_discrim = xdr_one;
+ /* all read chunks have the same "position" */
+ cur_rchunk->rc_position = htonl(pos);
+ cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
+ cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
+ xdr_encode_hyper(
+ (u32 *)&cur_rchunk->rc_target.rs_offset,
+ seg->mr_base);
+ dprintk("RPC: %s: read chunk "
+ "elem %d@0x%llx:0x%x pos %d (%s)\n", __func__,
+ seg->mr_len, seg->mr_base, seg->mr_rkey, pos,
+ n < nsegs ? "more" : "last");
+ cur_rchunk++;
+ r_xprt->rx_stats.read_chunk_count++;
+ } else { /* write/reply */
+ cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
+ cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
+ xdr_encode_hyper(
+ (u32 *)&cur_wchunk->wc_target.rs_offset,
+ seg->mr_base);
+ dprintk("RPC: %s: %s chunk "
+ "elem %d@0x%llx:0x%x (%s)\n", __func__,
+ (type == rpcrdma_replych) ? "reply" : "write",
+ seg->mr_len, seg->mr_base, seg->mr_rkey,
+ n < nsegs ? "more" : "last");
+ cur_wchunk++;
+ if (type == rpcrdma_replych)
+ r_xprt->rx_stats.reply_chunk_count++;
+ else
+ r_xprt->rx_stats.write_chunk_count++;
+ r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+ }
+ nchunks++;
+ seg += n;
+ nsegs -= n;
+ } while (nsegs);
+
+ /* success. all failures return above */
+ req->rl_nchunks = nchunks;
+
+ BUG_ON(nchunks == 0);
+
+ /*
+ * finish off header. If write, marshal discrim and nchunks.
+ */
+ if (cur_rchunk) {
+ iptr = (u32 *) cur_rchunk;
+ *iptr++ = xdr_zero; /* finish the read chunk list */
+ *iptr++ = xdr_zero; /* encode a NULL write chunk list */
+ *iptr++ = xdr_zero; /* encode a NULL reply chunk */
+ } else {
+ warray->wc_discrim = xdr_one;
+ warray->wc_nchunks = htonl(nchunks);
+ iptr = (u32 *) cur_wchunk;
+ if (type == rpcrdma_writech) {
+ *iptr++ = xdr_zero; /* finish the write chunk list */
+ *iptr++ = xdr_zero; /* encode a NULL reply chunk */
+ }
+ }
+
+ /*
+ * Return header size.
+ */
+ return (unsigned char *)iptr - (unsigned char *)headerp;
+
+out:
+ for (pos = 0; nchunks--;)
+ pos += rpcrdma_deregister_external(
+ &req->rl_segments[pos], r_xprt, NULL);
+ return 0;
+}
+
+/*
+ * Copy write data inline.
+ * This function is used for "small" requests. Data which is passed
+ * to RPC via iovecs (or page list) is copied directly into the
+ * pre-registered memory buffer for this request. For small amounts
+ * of data, this is efficient. The cutoff value is tunable.
+ */
+static int
+rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
+{
+ int i, npages, curlen;
+ int copy_len;
+ unsigned char *srcp, *destp;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+
+ destp = rqst->rq_svec[0].iov_base;
+ curlen = rqst->rq_svec[0].iov_len;
+ destp += curlen;
+ /*
+ * Do optional padding where it makes sense. Alignment of write
+ * payload can help the server, if our setting is accurate.
+ */
+ pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/);
+ if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
+ pad = 0; /* don't pad this request */
+
+ dprintk("RPC: %s: pad %d destp 0x%p len %d hdrlen %d\n",
+ __func__, pad, destp, rqst->rq_slen, curlen);
+
+ copy_len = rqst->rq_snd_buf.page_len;
+ r_xprt->rx_stats.pullup_copy_count += copy_len;
+ npages = PAGE_ALIGN(rqst->rq_snd_buf.page_base+copy_len) >> PAGE_SHIFT;
+ for (i = 0; copy_len && i < npages; i++) {
+ if (i == 0)
+ curlen = PAGE_SIZE - rqst->rq_snd_buf.page_base;
+ else
+ curlen = PAGE_SIZE;
+ if (curlen > copy_len)
+ curlen = copy_len;
+ dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n",
+ __func__, i, destp, copy_len, curlen);
+ srcp = kmap_atomic(rqst->rq_snd_buf.pages[i],
+ KM_SKB_SUNRPC_DATA);
+ if (i == 0)
+ memcpy(destp, srcp+rqst->rq_snd_buf.page_base, curlen);
+ else
+ memcpy(destp, srcp, curlen);
+ kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA);
+ rqst->rq_svec[0].iov_len += curlen;
+ destp += curlen;
+ copy_len -= curlen;
+ }
+ if (rqst->rq_snd_buf.tail[0].iov_len) {
+ curlen = rqst->rq_snd_buf.tail[0].iov_len;
+ if (destp != rqst->rq_snd_buf.tail[0].iov_base) {
+ memcpy(destp,
+ rqst->rq_snd_buf.tail[0].iov_base, curlen);
+ r_xprt->rx_stats.pullup_copy_count += curlen;
+ }
+ dprintk("RPC: %s: tail destp 0x%p len %d curlen %d\n",
+ __func__, destp, copy_len, curlen);
+ rqst->rq_svec[0].iov_len += curlen;
+ }
+ /* header now contains entire send message */
+ return pad;
+}
+
+/*
+ * Marshal a request: the primary job of this routine is to choose
+ * the transfer modes. See comments below.
+ *
+ * Uses multiple RDMA IOVs for a request:
+ * [0] -- RPC RDMA header, which uses memory from the *start* of the
+ * preregistered buffer that already holds the RPC data in
+ * its middle.
+ * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
+ * [2] -- optional padding.
+ * [3] -- if padded, header only in [1] and data here.
+ */
+
+int
+rpcrdma_marshal_req(struct rpc_rqst *rqst)
+{
+ struct rpc_xprt *xprt = rqst->rq_task->tk_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+ char *base;
+ size_t hdrlen, rpclen, padlen;
+ enum rpcrdma_chunktype rtype, wtype;
+ struct rpcrdma_msg *headerp;
+
+ /*
+ * rpclen gets amount of data in first buffer, which is the
+ * pre-registered buffer.
+ */
+ base = rqst->rq_svec[0].iov_base;
+ rpclen = rqst->rq_svec[0].iov_len;
+
+ /* build RDMA header in private area at front */
+ headerp = (struct rpcrdma_msg *) req->rl_base;
+ /* don't htonl XID, it's already done in request */
+ headerp->rm_xid = rqst->rq_xid;
+ headerp->rm_vers = xdr_one;
+ headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
+ headerp->rm_type = __constant_htonl(RDMA_MSG);
+
+ /*
+ * Chunks needed for results?
+ *
+ * o If the expected result is under the inline threshold, all ops
+ * return as inline (but see later).
+ * o Large non-read ops return as a single reply chunk.
+ * o Large read ops return data as write chunk(s), header as inline.
+ *
+ * Note: the NFS code sending down multiple result segments implies
+ * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
+ */
+
+ /*
+ * This code can handle read chunks, write chunks OR reply
+ * chunks -- only one type. If the request is too big to fit
+ * inline, then we will choose read chunks. If the request is
+ * a READ, then use write chunks to separate the file data
+ * into pages; otherwise use reply chunks.
+ */
+ if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
+ wtype = rpcrdma_noch;
+ else if (rqst->rq_rcv_buf.page_len == 0)
+ wtype = rpcrdma_replych;
+ else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
+ wtype = rpcrdma_writech;
+ else
+ wtype = rpcrdma_replych;
+
+ /*
+ * Chunks needed for arguments?
+ *
+ * o If the total request is under the inline threshold, all ops
+ * are sent as inline.
+ * o Large non-write ops are sent with the entire message as a
+ * single read chunk (protocol 0-position special case).
+ * o Large write ops transmit data as read chunk(s), header as
+ * inline.
+ *
+ * Note: the NFS code sending down multiple argument segments
+ * implies the op is a write.
+ * TBD check NFSv4 setacl
+ */
+ if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
+ rtype = rpcrdma_noch;
+ else if (rqst->rq_snd_buf.page_len == 0)
+ rtype = rpcrdma_areadch;
+ else
+ rtype = rpcrdma_readch;
+
+ /* The following simplification is not true forever */
+ if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
+ wtype = rpcrdma_noch;
+ BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);
+
+ if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS &&
+ (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) {
+ /* forced to "pure inline"? */
+ dprintk("RPC: %s: too much data (%d/%d) for inline\n",
+ __func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len);
+ return -1;
+ }
+
+ hdrlen = 28; /*sizeof *headerp;*/
+ padlen = 0;
+
+ /*
+ * Pull up any extra send data into the preregistered buffer.
+ * When padding is in use and applies to the transfer, insert
+ * it and change the message type.
+ */
+ if (rtype == rpcrdma_noch) {
+
+ padlen = rpcrdma_inline_pullup(rqst,
+ RPCRDMA_INLINE_PAD_VALUE(rqst));
+
+ if (padlen) {
+ headerp->rm_type = __constant_htonl(RDMA_MSGP);
+ headerp->rm_body.rm_padded.rm_align =
+ htonl(RPCRDMA_INLINE_PAD_VALUE(rqst));
+ headerp->rm_body.rm_padded.rm_thresh =
+ __constant_htonl(RPCRDMA_INLINE_PAD_THRESH);
+ headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
+ headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
+ headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
+ hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
+ BUG_ON(wtype != rpcrdma_noch);
+
+ } else {
+ headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
+ headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
+ headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
+ /* new length after pullup */
+ rpclen = rqst->rq_svec[0].iov_len;
+ /*
+ * Currently we try to not actually use read inline.
+ * Reply chunks have the desirable property that
+ * they land, packed, directly in the target buffers
+ * without headers, so they require no fixup. The
+ * additional RDMA Write op sends the same amount
+ * of data, streams on-the-wire and adds no overhead
+ * on receive. Therefore, we request a reply chunk
+ * for non-writes wherever feasible and efficient.
+ */
+ if (wtype == rpcrdma_noch &&
+ r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER)
+ wtype = rpcrdma_replych;
+ }
+ }
+
+ /*
+ * Marshal chunks. This routine will return the header length
+ * consumed by marshaling.
+ */
+ if (rtype != rpcrdma_noch) {
+ hdrlen = rpcrdma_create_chunks(rqst,
+ &rqst->rq_snd_buf, headerp, rtype);
+ wtype = rtype; /* simplify dprintk */
+
+ } else if (wtype != rpcrdma_noch) {
+ hdrlen = rpcrdma_create_chunks(rqst,
+ &rqst->rq_rcv_buf, headerp, wtype);
+ }
+
+ if (hdrlen == 0)
+ return -1;
+
+ dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd\n"
+ " headerp 0x%p base 0x%p lkey 0x%x\n",
+ __func__, transfertypes[wtype], hdrlen, rpclen, padlen,
+ headerp, base, req->rl_iov.lkey);
+
+ /*
+ * initialize send_iov's - normally only two: rdma chunk header and
+ * single preregistered RPC header buffer, but if padding is present,
+ * then use a preregistered (and zeroed) pad buffer between the RPC
+ * header and any write data. In all non-rdma cases, any following
+ * data has been copied into the RPC header buffer.
+ */
+ req->rl_send_iov[0].addr = req->rl_iov.addr;
+ req->rl_send_iov[0].length = hdrlen;
+ req->rl_send_iov[0].lkey = req->rl_iov.lkey;
+
+ req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
+ req->rl_send_iov[1].length = rpclen;
+ req->rl_send_iov[1].lkey = req->rl_iov.lkey;
+
+ req->rl_niovs = 2;
+
+ if (padlen) {
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+
+ req->rl_send_iov[2].addr = ep->rep_pad.addr;
+ req->rl_send_iov[2].length = padlen;
+ req->rl_send_iov[2].lkey = ep->rep_pad.lkey;
+
+ req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
+ req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
+ req->rl_send_iov[3].lkey = req->rl_iov.lkey;
+
+ req->rl_niovs = 4;
+ }
+
+ return 0;
+}
+
+/*
+ * Chase down a received write or reply chunklist to get length
+ * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
+ */
+static int
+rpcrdma_count_chunks(struct rpcrdma_rep *rep, int max, int wrchunk, u32 **iptrp)
+{
+ unsigned int i, total_len;
+ struct rpcrdma_write_chunk *cur_wchunk;
+
+ i = ntohl(**iptrp); /* get array count */
+ if (i > max)
+ return -1;
+ cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
+ total_len = 0;
+ while (i--) {
+ struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
+ ifdebug(FACILITY) {
+ u64 off;
+ xdr_decode_hyper((u32 *)&seg->rs_offset, &off);
+ dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n",
+ __func__,
+ ntohl(seg->rs_length),
+ off,
+ ntohl(seg->rs_handle));
+ }
+ total_len += ntohl(seg->rs_length);
+ ++cur_wchunk;
+ }
+ /* check and adjust for properly terminated write chunk */
+ if (wrchunk) {
+ u32 *w = (u32 *) cur_wchunk;
+ if (*w++ != xdr_zero)
+ return -1;
+ cur_wchunk = (struct rpcrdma_write_chunk *) w;
+ }
+ if ((char *) cur_wchunk > rep->rr_base + rep->rr_len)
+ return -1;
+
+ *iptrp = (u32 *) cur_wchunk;
+ return total_len;
+}
+
+/*
+ * Scatter inline received data back into provided iov's.
+ */
+static void
+rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len)
+{
+ int i, npages, curlen, olen;
+ char *destp;
+
+ curlen = rqst->rq_rcv_buf.head[0].iov_len;
+ if (curlen > copy_len) { /* write chunk header fixup */
+ curlen = copy_len;
+ rqst->rq_rcv_buf.head[0].iov_len = curlen;
+ }
+
+ dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n",
+ __func__, srcp, copy_len, curlen);
+
+ /* Shift pointer for first receive segment only */
+ rqst->rq_rcv_buf.head[0].iov_base = srcp;
+ srcp += curlen;
+ copy_len -= curlen;
+
+ olen = copy_len;
+ i = 0;
+ rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
+ if (copy_len && rqst->rq_rcv_buf.page_len) {
+ npages = PAGE_ALIGN(rqst->rq_rcv_buf.page_base +
+ rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
+ for (; i < npages; i++) {
+ if (i == 0)
+ curlen = PAGE_SIZE - rqst->rq_rcv_buf.page_base;
+ else
+ curlen = PAGE_SIZE;
+ if (curlen > copy_len)
+ curlen = copy_len;
+ dprintk("RPC: %s: page %d"
+ " srcp 0x%p len %d curlen %d\n",
+ __func__, i, srcp, copy_len, curlen);
+ destp = kmap_atomic(rqst->rq_rcv_buf.pages[i],
+ KM_SKB_SUNRPC_DATA);
+ if (i == 0)
+ memcpy(destp + rqst->rq_rcv_buf.page_base,
+ srcp, curlen);
+ else
+ memcpy(destp, srcp, curlen);
+ flush_dcache_page(rqst->rq_rcv_buf.pages[i]);
+ kunmap_atomic(destp, KM_SKB_SUNRPC_DATA);
+ srcp += curlen;
+ copy_len -= curlen;
+ if (copy_len == 0)
+ break;
+ }
+ rqst->rq_rcv_buf.page_len = olen - copy_len;
+ } else
+ rqst->rq_rcv_buf.page_len = 0;
+
+ if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
+ curlen = copy_len;
+ if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
+ curlen = rqst->rq_rcv_buf.tail[0].iov_len;
+ if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
+ memcpy(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
+ dprintk("RPC: %s: tail srcp 0x%p len %d curlen %d\n",
+ __func__, srcp, copy_len, curlen);
+ rqst->rq_rcv_buf.tail[0].iov_len = curlen;
+ copy_len -= curlen; ++i;
+ } else
+ rqst->rq_rcv_buf.tail[0].iov_len = 0;
+
+ if (copy_len)
+ dprintk("RPC: %s: %d bytes in"
+ " %d extra segments (%d lost)\n",
+ __func__, olen, i, copy_len);
+
+ /* TBD avoid a warning from call_decode() */
+ rqst->rq_private_buf = rqst->rq_rcv_buf;
+}
+
+/*
+ * This function is called when an async event is posted to
+ * the connection which changes the connection state. All it
+ * does at this point is mark the connection up/down, the rpc
+ * timers do the rest.
+ */
+void
+rpcrdma_conn_func(struct rpcrdma_ep *ep)
+{
+ struct rpc_xprt *xprt = ep->rep_xprt;
+
+ spin_lock_bh(&xprt->transport_lock);
+ if (ep->rep_connected > 0) {
+ if (!xprt_test_and_set_connected(xprt))
+ xprt_wake_pending_tasks(xprt, 0);
+ } else {
+ if (xprt_test_and_clear_connected(xprt))
+ xprt_wake_pending_tasks(xprt, ep->rep_connected);
+ }
+ spin_unlock_bh(&xprt->transport_lock);
+}
+
+/*
+ * This function is called when memory window unbind which we are waiting
+ * for completes. Just use rr_func (zeroed by upcall) to signal completion.
+ */
+static void
+rpcrdma_unbind_func(struct rpcrdma_rep *rep)
+{
+ wake_up(&rep->rr_unbind);
+}
+
+/*
+ * Called as a tasklet to do req/reply match and complete a request
+ * Errors must result in the RPC task either being awakened, or
+ * allowed to timeout, to discover the errors at that time.
+ */
+void
+rpcrdma_reply_handler(struct rpcrdma_rep *rep)
+{
+ struct rpcrdma_msg *headerp;
+ struct rpcrdma_req *req;
+ struct rpc_rqst *rqst;
+ struct rpc_xprt *xprt = rep->rr_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ u32 *iptr;
+ int i, rdmalen, status;
+
+ /* Check status. If bad, signal disconnect and return rep to pool */
+ if (rep->rr_len == ~0U) {
+ rpcrdma_recv_buffer_put(rep);
+ if (r_xprt->rx_ep.rep_connected == 1) {
+ r_xprt->rx_ep.rep_connected = -EIO;
+ rpcrdma_conn_func(&r_xprt->rx_ep);
+ }
+ return;
+ }
+ if (rep->rr_len < 28) {
+ dprintk("RPC: %s: short/invalid reply\n", __func__);
+ goto repost;
+ }
+ headerp = (struct rpcrdma_msg *) rep->rr_base;
+ if (headerp->rm_vers != xdr_one) {
+ dprintk("RPC: %s: invalid version %d\n",
+ __func__, ntohl(headerp->rm_vers));
+ goto repost;
+ }
+
+ /* Get XID and try for a match. */
+ spin_lock(&xprt->transport_lock);
+ rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+ if (rqst == NULL) {
+ spin_unlock(&xprt->transport_lock);
+ dprintk("RPC: %s: reply 0x%p failed "
+ "to match any request xid 0x%08x len %d\n",
+ __func__, rep, headerp->rm_xid, rep->rr_len);
+repost:
+ r_xprt->rx_stats.bad_reply_count++;
+ rep->rr_func = rpcrdma_reply_handler;
+ if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+ rpcrdma_recv_buffer_put(rep);
+
+ return;
+ }
+
+ /* get request object */
+ req = rpcr_to_rdmar(rqst);
+
+ dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
+ " RPC request 0x%p xid 0x%08x\n",
+ __func__, rep, req, rqst, headerp->rm_xid);
+
+ BUG_ON(!req || req->rl_reply);
+
+ /* from here on, the reply is no longer an orphan */
+ req->rl_reply = rep;
+
+ /* check for expected message types */
+ /* The order of some of these tests is important. */
+ switch (headerp->rm_type) {
+ case __constant_htonl(RDMA_MSG):
+ /* never expect read chunks */
+ /* never expect reply chunks (two ways to check) */
+ /* never expect write chunks without having offered RDMA */
+ if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
+ (headerp->rm_body.rm_chunks[1] == xdr_zero &&
+ headerp->rm_body.rm_chunks[2] != xdr_zero) ||
+ (headerp->rm_body.rm_chunks[1] != xdr_zero &&
+ req->rl_nchunks == 0))
+ goto badheader;
+ if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
+ /* count any expected write chunks in read reply */
+ /* start at write chunk array count */
+ iptr = &headerp->rm_body.rm_chunks[2];
+ rdmalen = rpcrdma_count_chunks(rep,
+ req->rl_nchunks, 1, &iptr);
+ /* check for validity, and no reply chunk after */
+ if (rdmalen < 0 || *iptr++ != xdr_zero)
+ goto badheader;
+ rep->rr_len -=
+ ((unsigned char *)iptr - (unsigned char *)headerp);
+ status = rep->rr_len + rdmalen;
+ r_xprt->rx_stats.total_rdma_reply += rdmalen;
+ } else {
+ /* else ordinary inline */
+ iptr = (u32 *)((unsigned char *)headerp + 28);
+ rep->rr_len -= 28; /*sizeof *headerp;*/
+ status = rep->rr_len;
+ }
+ /* Fix up the rpc results for upper layer */
+ rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len);
+ break;
+
+ case __constant_htonl(RDMA_NOMSG):
+ /* never expect read or write chunks, always reply chunks */
+ if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
+ headerp->rm_body.rm_chunks[1] != xdr_zero ||
+ headerp->rm_body.rm_chunks[2] != xdr_one ||
+ req->rl_nchunks == 0)
+ goto badheader;
+ iptr = (u32 *)((unsigned char *)headerp + 28);
+ rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
+ if (rdmalen < 0)
+ goto badheader;
+ r_xprt->rx_stats.total_rdma_reply += rdmalen;
+ /* Reply chunk buffer already is the reply vector - no fixup. */
+ status = rdmalen;
+ break;
+
+badheader:
+ default:
+ dprintk("%s: invalid rpcrdma reply header (type %d):"
+ " chunks[012] == %d %d %d"
+ " expected chunks <= %d\n",
+ __func__, ntohl(headerp->rm_type),
+ headerp->rm_body.rm_chunks[0],
+ headerp->rm_body.rm_chunks[1],
+ headerp->rm_body.rm_chunks[2],
+ req->rl_nchunks);
+ status = -EIO;
+ r_xprt->rx_stats.bad_reply_count++;
+ break;
+ }
+
+ /* If using mw bind, start the deregister process now. */
+ /* (Note: if mr_free(), cannot perform it here, in tasklet context) */
+ if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) {
+ case RPCRDMA_MEMWINDOWS:
+ for (i = 0; req->rl_nchunks-- > 1;)
+ i += rpcrdma_deregister_external(
+ &req->rl_segments[i], r_xprt, NULL);
+ /* Optionally wait (not here) for unbinds to complete */
+ rep->rr_func = rpcrdma_unbind_func;
+ (void) rpcrdma_deregister_external(&req->rl_segments[i],
+ r_xprt, rep);
+ break;
+ case RPCRDMA_MEMWINDOWS_ASYNC:
+ for (i = 0; req->rl_nchunks--;)
+ i += rpcrdma_deregister_external(&req->rl_segments[i],
+ r_xprt, NULL);
+ break;
+ default:
+ break;
+ }
+
+ dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
+ __func__, xprt, rqst, status);
+ xprt_complete_rqst(rqst->rq_task, status);
+ spin_unlock(&xprt->transport_lock);
+}
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
new file mode 100644
index 000000000000..dc55cc974c90
--- /dev/null
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -0,0 +1,800 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * transport.c
+ *
+ * This file contains the top-level implementation of an RPC RDMA
+ * transport.
+ *
+ * Naming convention: functions beginning with xprt_ are part of the
+ * transport switch. All others are RPC RDMA internal.
+ */
+
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/seq_file.h>
+
+#include "xprt_rdma.h"
+
+#ifdef RPC_DEBUG
+# define RPCDBG_FACILITY RPCDBG_TRANS
+#endif
+
+MODULE_LICENSE("Dual BSD/GPL");
+
+MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
+MODULE_AUTHOR("Network Appliance, Inc.");
+
+/*
+ * tunables
+ */
+
+static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
+static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
+static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
+static unsigned int xprt_rdma_inline_write_padding;
+#if !RPCRDMA_PERSISTENT_REGISTRATION
+static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_REGISTER; /* FMR? */
+#else
+static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_ALLPHYSICAL;
+#endif
+
+#ifdef RPC_DEBUG
+
+static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
+static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
+static unsigned int zero;
+static unsigned int max_padding = PAGE_SIZE;
+static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
+static unsigned int max_memreg = RPCRDMA_LAST - 1;
+
+static struct ctl_table_header *sunrpc_table_header;
+
+static ctl_table xr_tunables_table[] = {
+ {
+ .ctl_name = CTL_SLOTTABLE_RDMA,
+ .procname = "rdma_slot_table_entries",
+ .data = &xprt_rdma_slot_table_entries,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &min_slot_table_size,
+ .extra2 = &max_slot_table_size
+ },
+ {
+ .ctl_name = CTL_RDMA_MAXINLINEREAD,
+ .procname = "rdma_max_inline_read",
+ .data = &xprt_rdma_max_inline_read,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec,
+ .strategy = &sysctl_intvec,
+ },
+ {
+ .ctl_name = CTL_RDMA_MAXINLINEWRITE,
+ .procname = "rdma_max_inline_write",
+ .data = &xprt_rdma_max_inline_write,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec,
+ .strategy = &sysctl_intvec,
+ },
+ {
+ .ctl_name = CTL_RDMA_WRITEPADDING,
+ .procname = "rdma_inline_write_padding",
+ .data = &xprt_rdma_inline_write_padding,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &zero,
+ .extra2 = &max_padding,
+ },
+ {
+ .ctl_name = CTL_RDMA_MEMREG,
+ .procname = "rdma_memreg_strategy",
+ .data = &xprt_rdma_memreg_strategy,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &min_memreg,
+ .extra2 = &max_memreg,
+ },
+ {
+ .ctl_name = 0,
+ },
+};
+
+static ctl_table sunrpc_table[] = {
+ {
+ .ctl_name = CTL_SUNRPC,
+ .procname = "sunrpc",
+ .mode = 0555,
+ .child = xr_tunables_table
+ },
+ {
+ .ctl_name = 0,
+ },
+};
+
+#endif
+
+static struct rpc_xprt_ops xprt_rdma_procs; /* forward reference */
+
+static void
+xprt_rdma_format_addresses(struct rpc_xprt *xprt)
+{
+ struct sockaddr_in *addr = (struct sockaddr_in *)
+ &rpcx_to_rdmad(xprt).addr;
+ char *buf;
+
+ buf = kzalloc(20, GFP_KERNEL);
+ if (buf)
+ snprintf(buf, 20, NIPQUAD_FMT, NIPQUAD(addr->sin_addr.s_addr));
+ xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
+
+ buf = kzalloc(8, GFP_KERNEL);
+ if (buf)
+ snprintf(buf, 8, "%u", ntohs(addr->sin_port));
+ xprt->address_strings[RPC_DISPLAY_PORT] = buf;
+
+ xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
+
+ buf = kzalloc(48, GFP_KERNEL);
+ if (buf)
+ snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
+ NIPQUAD(addr->sin_addr.s_addr),
+ ntohs(addr->sin_port), "rdma");
+ xprt->address_strings[RPC_DISPLAY_ALL] = buf;
+
+ buf = kzalloc(10, GFP_KERNEL);
+ if (buf)
+ snprintf(buf, 10, "%02x%02x%02x%02x",
+ NIPQUAD(addr->sin_addr.s_addr));
+ xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
+
+ buf = kzalloc(8, GFP_KERNEL);
+ if (buf)
+ snprintf(buf, 8, "%4hx", ntohs(addr->sin_port));
+ xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
+
+ buf = kzalloc(30, GFP_KERNEL);
+ if (buf)
+ snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
+ NIPQUAD(addr->sin_addr.s_addr),
+ ntohs(addr->sin_port) >> 8,
+ ntohs(addr->sin_port) & 0xff);
+ xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
+
+ /* netid */
+ xprt->address_strings[RPC_DISPLAY_NETID] = "rdma";
+}
+
+static void
+xprt_rdma_free_addresses(struct rpc_xprt *xprt)
+{
+ kfree(xprt->address_strings[RPC_DISPLAY_ADDR]);
+ kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
+ kfree(xprt->address_strings[RPC_DISPLAY_ALL]);
+ kfree(xprt->address_strings[RPC_DISPLAY_HEX_ADDR]);
+ kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
+ kfree(xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR]);
+}
+
+static void
+xprt_rdma_connect_worker(struct work_struct *work)
+{
+ struct rpcrdma_xprt *r_xprt =
+ container_of(work, struct rpcrdma_xprt, rdma_connect.work);
+ struct rpc_xprt *xprt = &r_xprt->xprt;
+ int rc = 0;
+
+ if (!xprt->shutdown) {
+ xprt_clear_connected(xprt);
+
+ dprintk("RPC: %s: %sconnect\n", __func__,
+ r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
+ rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
+ if (rc)
+ goto out;
+ }
+ goto out_clear;
+
+out:
+ xprt_wake_pending_tasks(xprt, rc);
+
+out_clear:
+ dprintk("RPC: %s: exit\n", __func__);
+ xprt_clear_connecting(xprt);
+}
+
+/*
+ * xprt_rdma_destroy
+ *
+ * Destroy the xprt.
+ * Free all memory associated with the object, including its own.
+ * NOTE: none of the *destroy methods free memory for their top-level
+ * objects, even though they may have allocated it (they do free
+ * private memory). It's up to the caller to handle it. In this
+ * case (RDMA transport), all structure memory is inlined with the
+ * struct rpcrdma_xprt.
+ */
+static void
+xprt_rdma_destroy(struct rpc_xprt *xprt)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ int rc;
+
+ dprintk("RPC: %s: called\n", __func__);
+
+ cancel_delayed_work(&r_xprt->rdma_connect);
+ flush_scheduled_work();
+
+ xprt_clear_connected(xprt);
+
+ rpcrdma_buffer_destroy(&r_xprt->rx_buf);
+ rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
+ if (rc)
+ dprintk("RPC: %s: rpcrdma_ep_destroy returned %i\n",
+ __func__, rc);
+ rpcrdma_ia_close(&r_xprt->rx_ia);
+
+ xprt_rdma_free_addresses(xprt);
+
+ kfree(xprt->slot);
+ xprt->slot = NULL;
+ kfree(xprt);
+
+ dprintk("RPC: %s: returning\n", __func__);
+
+ module_put(THIS_MODULE);
+}
+
+/**
+ * xprt_setup_rdma - Set up transport to use RDMA
+ *
+ * @args: rpc transport arguments
+ */
+static struct rpc_xprt *
+xprt_setup_rdma(struct xprt_create *args)
+{
+ struct rpcrdma_create_data_internal cdata;
+ struct rpc_xprt *xprt;
+ struct rpcrdma_xprt *new_xprt;
+ struct rpcrdma_ep *new_ep;
+ struct sockaddr_in *sin;
+ int rc;
+
+ if (args->addrlen > sizeof(xprt->addr)) {
+ dprintk("RPC: %s: address too large\n", __func__);
+ return ERR_PTR(-EBADF);
+ }
+
+ xprt = kzalloc(sizeof(struct rpcrdma_xprt), GFP_KERNEL);
+ if (xprt == NULL) {
+ dprintk("RPC: %s: couldn't allocate rpcrdma_xprt\n",
+ __func__);
+ return ERR_PTR(-ENOMEM);
+ }
+
+ xprt->max_reqs = xprt_rdma_slot_table_entries;
+ xprt->slot = kcalloc(xprt->max_reqs,
+ sizeof(struct rpc_rqst), GFP_KERNEL);
+ if (xprt->slot == NULL) {
+ kfree(xprt);
+ dprintk("RPC: %s: couldn't allocate %d slots\n",
+ __func__, xprt->max_reqs);
+ return ERR_PTR(-ENOMEM);
+ }
+
+ /* 60 second timeout, no retries */
+ xprt_set_timeout(&xprt->timeout, 0, 60UL * HZ);
+ xprt->bind_timeout = (60U * HZ);
+ xprt->connect_timeout = (60U * HZ);
+ xprt->reestablish_timeout = (5U * HZ);
+ xprt->idle_timeout = (5U * 60 * HZ);
+
+ xprt->resvport = 0; /* privileged port not needed */
+ xprt->tsh_size = 0; /* RPC-RDMA handles framing */
+ xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE;
+ xprt->ops = &xprt_rdma_procs;
+
+ /*
+ * Set up RDMA-specific connect data.
+ */
+
+ /* Put server RDMA address in local cdata */
+ memcpy(&cdata.addr, args->dstaddr, args->addrlen);
+
+ /* Ensure xprt->addr holds valid server TCP (not RDMA)
+ * address, for any side protocols which peek at it */
+ xprt->prot = IPPROTO_TCP;
+ xprt->addrlen = args->addrlen;
+ memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);
+
+ sin = (struct sockaddr_in *)&cdata.addr;
+ if (ntohs(sin->sin_port) != 0)
+ xprt_set_bound(xprt);
+
+ dprintk("RPC: %s: %u.%u.%u.%u:%u\n", __func__,
+ NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
+
+ /* Set max requests */
+ cdata.max_requests = xprt->max_reqs;
+
+ /* Set some length limits */
+ cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
+ cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
+
+ cdata.inline_wsize = xprt_rdma_max_inline_write;
+ if (cdata.inline_wsize > cdata.wsize)
+ cdata.inline_wsize = cdata.wsize;
+
+ cdata.inline_rsize = xprt_rdma_max_inline_read;
+ if (cdata.inline_rsize > cdata.rsize)
+ cdata.inline_rsize = cdata.rsize;
+
+ cdata.padding = xprt_rdma_inline_write_padding;
+
+ /*
+ * Create new transport instance, which includes initialized
+ * o ia
+ * o endpoint
+ * o buffers
+ */
+
+ new_xprt = rpcx_to_rdmax(xprt);
+
+ rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
+ xprt_rdma_memreg_strategy);
+ if (rc)
+ goto out1;
+
+ /*
+ * initialize and create ep
+ */
+ new_xprt->rx_data = cdata;
+ new_ep = &new_xprt->rx_ep;
+ new_ep->rep_remote_addr = cdata.addr;
+
+ rc = rpcrdma_ep_create(&new_xprt->rx_ep,
+ &new_xprt->rx_ia, &new_xprt->rx_data);
+ if (rc)
+ goto out2;
+
+ /*
+ * Allocate pre-registered send and receive buffers for headers and
+ * any inline data. Also specify any padding which will be provided
+ * from a preregistered zero buffer.
+ */
+ rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
+ &new_xprt->rx_data);
+ if (rc)
+ goto out3;
+
+ /*
+ * Register a callback for connection events. This is necessary because
+ * connection loss notification is async. We also catch connection loss
+ * when reaping receives.
+ */
+ INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
+ new_ep->rep_func = rpcrdma_conn_func;
+ new_ep->rep_xprt = xprt;
+
+ xprt_rdma_format_addresses(xprt);
+
+ if (!try_module_get(THIS_MODULE))
+ goto out4;
+
+ return xprt;
+
+out4:
+ xprt_rdma_free_addresses(xprt);
+ rc = -EINVAL;
+out3:
+ (void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
+out2:
+ rpcrdma_ia_close(&new_xprt->rx_ia);
+out1:
+ kfree(xprt->slot);
+ kfree(xprt);
+ return ERR_PTR(rc);
+}
+
+/*
+ * Close a connection, during shutdown or timeout/reconnect
+ */
+static void
+xprt_rdma_close(struct rpc_xprt *xprt)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+ dprintk("RPC: %s: closing\n", __func__);
+ xprt_disconnect(xprt);
+ (void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
+}
+
+static void
+xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
+{
+ struct sockaddr_in *sap;
+
+ sap = (struct sockaddr_in *)&xprt->addr;
+ sap->sin_port = htons(port);
+ sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
+ sap->sin_port = htons(port);
+ dprintk("RPC: %s: %u\n", __func__, port);
+}
+
+static void
+xprt_rdma_connect(struct rpc_task *task)
+{
+ struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+ if (!xprt_test_and_set_connecting(xprt)) {
+ if (r_xprt->rx_ep.rep_connected != 0) {
+ /* Reconnect */
+ schedule_delayed_work(&r_xprt->rdma_connect,
+ xprt->reestablish_timeout);
+ } else {
+ schedule_delayed_work(&r_xprt->rdma_connect, 0);
+ if (!RPC_IS_ASYNC(task))
+ flush_scheduled_work();
+ }
+ }
+}
+
+static int
+xprt_rdma_reserve_xprt(struct rpc_task *task)
+{
+ struct rpc_xprt *xprt = task->tk_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ int credits = atomic_read(&r_xprt->rx_buf.rb_credits);
+
+ /* == RPC_CWNDSCALE @ init, but *after* setup */
+ if (r_xprt->rx_buf.rb_cwndscale == 0UL) {
+ r_xprt->rx_buf.rb_cwndscale = xprt->cwnd;
+ dprintk("RPC: %s: cwndscale %lu\n", __func__,
+ r_xprt->rx_buf.rb_cwndscale);
+ BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
+ }
+ xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
+ return xprt_reserve_xprt_cong(task);
+}
+
+/*
+ * The RDMA allocate/free functions need the task structure as a place
+ * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
+ * sequence. For this reason, the recv buffers are attached to send
+ * buffers for portions of the RPC. Note that the RPC layer allocates
+ * both send and receive buffers in the same call. We may register
+ * the receive buffer portion when using reply chunks.
+ */
+static void *
+xprt_rdma_allocate(struct rpc_task *task, size_t size)
+{
+ struct rpc_xprt *xprt = task->tk_xprt;
+ struct rpcrdma_req *req, *nreq;
+
+ req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
+ BUG_ON(NULL == req);
+
+ if (size > req->rl_size) {
+ dprintk("RPC: %s: size %zd too large for buffer[%zd]: "
+ "prog %d vers %d proc %d\n",
+ __func__, size, req->rl_size,
+ task->tk_client->cl_prog, task->tk_client->cl_vers,
+ task->tk_msg.rpc_proc->p_proc);
+ /*
+ * Outgoing length shortage. Our inline write max must have
+ * been configured to perform direct i/o.
+ *
+ * This is therefore a large metadata operation, and the
+ * allocate call was made on the maximum possible message,
+ * e.g. containing long filename(s) or symlink data. In
+ * fact, while these metadata operations *might* carry
+ * large outgoing payloads, they rarely *do*. However, we
+ * have to commit to the request here, so reallocate and
+ * register it now. The data path will never require this
+ * reallocation.
+ *
+ * If the allocation or registration fails, the RPC framework
+ * will (doggedly) retry.
+ */
+ if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy ==
+ RPCRDMA_BOUNCEBUFFERS) {
+ /* forced to "pure inline" */
+ dprintk("RPC: %s: too much data (%zd) for inline "
+ "(r/w max %d/%d)\n", __func__, size,
+ rpcx_to_rdmad(xprt).inline_rsize,
+ rpcx_to_rdmad(xprt).inline_wsize);
+ size = req->rl_size;
+ rpc_exit(task, -EIO); /* fail the operation */
+ rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
+ goto out;
+ }
+ if (task->tk_flags & RPC_TASK_SWAPPER)
+ nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
+ else
+ nreq = kmalloc(sizeof *req + size, GFP_NOFS);
+ if (nreq == NULL)
+ goto outfail;
+
+ if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
+ nreq->rl_base, size + sizeof(struct rpcrdma_req)
+ - offsetof(struct rpcrdma_req, rl_base),
+ &nreq->rl_handle, &nreq->rl_iov)) {
+ kfree(nreq);
+ goto outfail;
+ }
+ rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
+ nreq->rl_size = size;
+ nreq->rl_niovs = 0;
+ nreq->rl_nchunks = 0;
+ nreq->rl_buffer = (struct rpcrdma_buffer *)req;
+ nreq->rl_reply = req->rl_reply;
+ memcpy(nreq->rl_segments,
+ req->rl_segments, sizeof nreq->rl_segments);
+ /* flag the swap with an unused field */
+ nreq->rl_iov.length = 0;
+ req->rl_reply = NULL;
+ req = nreq;
+ }
+ dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
+out:
+ return req->rl_xdr_buf;
+
+outfail:
+ rpcrdma_buffer_put(req);
+ rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
+ return NULL;
+}
+
+/*
+ * This function returns all RDMA resources to the pool.
+ */
+static void
+xprt_rdma_free(void *buffer)
+{
+ struct rpcrdma_req *req;
+ struct rpcrdma_xprt *r_xprt;
+ struct rpcrdma_rep *rep;
+ int i;
+
+ if (buffer == NULL)
+ return;
+
+ req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
+ r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
+ rep = req->rl_reply;
+
+ dprintk("RPC: %s: called on 0x%p%s\n",
+ __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
+
+ /*
+ * Finish the deregistration. When using mw bind, this was
+ * begun in rpcrdma_reply_handler(). In all other modes, we
+ * do it here, in thread context. The process is considered
+ * complete when the rr_func vector becomes NULL - this
+ * was put in place during rpcrdma_reply_handler() - the wait
+ * call below will not block if the dereg is "done". If
+ * interrupted, our framework will clean up.
+ */
+ for (i = 0; req->rl_nchunks;) {
+ --req->rl_nchunks;
+ i += rpcrdma_deregister_external(
+ &req->rl_segments[i], r_xprt, NULL);
+ }
+
+ if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
+ rep->rr_func = NULL; /* abandon the callback */
+ req->rl_reply = NULL;
+ }
+
+ if (req->rl_iov.length == 0) { /* see allocate above */
+ struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
+ oreq->rl_reply = req->rl_reply;
+ (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
+ req->rl_handle,
+ &req->rl_iov);
+ kfree(req);
+ req = oreq;
+ }
+
+ /* Put back request+reply buffers */
+ rpcrdma_buffer_put(req);
+}
+
+/*
+ * send_request invokes the meat of RPC RDMA. It must do the following:
+ * 1. Marshal the RPC request into an RPC RDMA request, which means
+ * putting a header in front of data, and creating IOVs for RDMA
+ * from those in the request.
+ * 2. In marshaling, detect opportunities for RDMA, and use them.
+ * 3. Post a recv message to set up asynch completion, then send
+ * the request (rpcrdma_ep_post).
+ * 4. No partial sends are possible in the RPC-RDMA protocol (as in UDP).
+ */
+
+static int
+xprt_rdma_send_request(struct rpc_task *task)
+{
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ struct rpc_xprt *xprt = task->tk_xprt;
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+ /* marshal the send itself */
+ if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
+ r_xprt->rx_stats.failed_marshal_count++;
+ dprintk("RPC: %s: rpcrdma_marshal_req failed\n",
+ __func__);
+ return -EIO;
+ }
+
+ if (req->rl_reply == NULL) /* e.g. reconnection */
+ rpcrdma_recv_buffer_get(req);
+
+ if (req->rl_reply) {
+ req->rl_reply->rr_func = rpcrdma_reply_handler;
+ /* this need only be done once, but... */
+ req->rl_reply->rr_xprt = xprt;
+ }
+
+ if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) {
+ xprt_disconnect(xprt);
+ return -ENOTCONN; /* implies disconnect */
+ }
+
+ rqst->rq_bytes_sent = 0;
+ return 0;
+}
+
+static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ long idle_time = 0;
+
+ if (xprt_connected(xprt))
+ idle_time = (long)(jiffies - xprt->last_used) / HZ;
+
+ seq_printf(seq,
+ "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
+ "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",
+
+ 0, /* need a local port? */
+ xprt->stat.bind_count,
+ xprt->stat.connect_count,
+ xprt->stat.connect_time,
+ idle_time,
+ xprt->stat.sends,
+ xprt->stat.recvs,
+ xprt->stat.bad_xids,
+ xprt->stat.req_u,
+ xprt->stat.bklog_u,
+
+ r_xprt->rx_stats.read_chunk_count,
+ r_xprt->rx_stats.write_chunk_count,
+ r_xprt->rx_stats.reply_chunk_count,
+ r_xprt->rx_stats.total_rdma_request,
+ r_xprt->rx_stats.total_rdma_reply,
+ r_xprt->rx_stats.pullup_copy_count,
+ r_xprt->rx_stats.fixup_copy_count,
+ r_xprt->rx_stats.hardway_register_count,
+ r_xprt->rx_stats.failed_marshal_count,
+ r_xprt->rx_stats.bad_reply_count);
+}
+
+/*
+ * Plumbing for rpc transport switch and kernel module
+ */
+
+static struct rpc_xprt_ops xprt_rdma_procs = {
+ .reserve_xprt = xprt_rdma_reserve_xprt,
+ .release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */
+ .release_request = xprt_release_rqst_cong, /* ditto */
+ .set_retrans_timeout = xprt_set_retrans_timeout_def, /* ditto */
+ .rpcbind = rpcb_getport_async, /* sunrpc/rpcb_clnt.c */
+ .set_port = xprt_rdma_set_port,
+ .connect = xprt_rdma_connect,
+ .buf_alloc = xprt_rdma_allocate,
+ .buf_free = xprt_rdma_free,
+ .send_request = xprt_rdma_send_request,
+ .close = xprt_rdma_close,
+ .destroy = xprt_rdma_destroy,
+ .print_stats = xprt_rdma_print_stats
+};
+
+static struct xprt_class xprt_rdma = {
+ .list = LIST_HEAD_INIT(xprt_rdma.list),
+ .name = "rdma",
+ .owner = THIS_MODULE,
+ .ident = XPRT_TRANSPORT_RDMA,
+ .setup = xprt_setup_rdma,
+};
+
+static void __exit xprt_rdma_cleanup(void)
+{
+ int rc;
+
+ dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
+#ifdef RPC_DEBUG
+ if (sunrpc_table_header) {
+ unregister_sysctl_table(sunrpc_table_header);
+ sunrpc_table_header = NULL;
+ }
+#endif
+ rc = xprt_unregister_transport(&xprt_rdma);
+ if (rc)
+ dprintk("RPC: %s: xprt_unregister returned %i\n",
+ __func__, rc);
+}
+
+static int __init xprt_rdma_init(void)
+{
+ int rc;
+
+ rc = xprt_register_transport(&xprt_rdma);
+
+ if (rc)
+ return rc;
+
+ dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n");
+
+ dprintk(KERN_INFO "Defaults:\n");
+ dprintk(KERN_INFO "\tSlots %d\n"
+ "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
+ xprt_rdma_slot_table_entries,
+ xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
+ dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n",
+ xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);
+
+#ifdef RPC_DEBUG
+ if (!sunrpc_table_header)
+ sunrpc_table_header = register_sysctl_table(sunrpc_table);
+#endif
+ return 0;
+}
+
+module_init(xprt_rdma_init);
+module_exit(xprt_rdma_cleanup);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
new file mode 100644
index 000000000000..9ec8ca4f6028
--- /dev/null
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -0,0 +1,1626 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * verbs.c
+ *
+ * Encapsulates the major functions managing:
+ * o adapters
+ * o endpoints
+ * o connections
+ * o buffer memory
+ */
+
+#include <linux/pci.h> /* for Tavor hack below */
+
+#include "xprt_rdma.h"
+
+/*
+ * Globals/Macros
+ */
+
+#ifdef RPC_DEBUG
+# define RPCDBG_FACILITY RPCDBG_TRANS
+#endif
+
+/*
+ * internal functions
+ */
+
+/*
+ * handle replies in tasklet context, using a single, global list
+ * rdma tasklet function -- just turn around and call the func
+ * for all replies on the list
+ */
+
+static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
+static LIST_HEAD(rpcrdma_tasklets_g);
+
+static void
+rpcrdma_run_tasklet(unsigned long data)
+{
+ struct rpcrdma_rep *rep;
+ void (*func)(struct rpcrdma_rep *);
+ unsigned long flags;
+
+ data = data;
+ spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
+ while (!list_empty(&rpcrdma_tasklets_g)) {
+ rep = list_entry(rpcrdma_tasklets_g.next,
+ struct rpcrdma_rep, rr_list);
+ list_del(&rep->rr_list);
+ func = rep->rr_func;
+ rep->rr_func = NULL;
+ spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+
+ if (func)
+ func(rep);
+ else
+ rpcrdma_recv_buffer_put(rep);
+
+ spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
+ }
+ spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+}
+
+static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
+
+static inline void
+rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
+ list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
+ spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+ tasklet_schedule(&rpcrdma_tasklet_g);
+}
+
+static void
+rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
+{
+ struct rpcrdma_ep *ep = context;
+
+ dprintk("RPC: %s: QP error %X on device %s ep %p\n",
+ __func__, event->event, event->device->name, context);
+ if (ep->rep_connected == 1) {
+ ep->rep_connected = -EIO;
+ ep->rep_func(ep);
+ wake_up_all(&ep->rep_connect_wait);
+ }
+}
+
+static void
+rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
+{
+ struct rpcrdma_ep *ep = context;
+
+ dprintk("RPC: %s: CQ error %X on device %s ep %p\n",
+ __func__, event->event, event->device->name, context);
+ if (ep->rep_connected == 1) {
+ ep->rep_connected = -EIO;
+ ep->rep_func(ep);
+ wake_up_all(&ep->rep_connect_wait);
+ }
+}
+
+static inline
+void rpcrdma_event_process(struct ib_wc *wc)
+{
+ struct rpcrdma_rep *rep =
+ (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
+
+ dprintk("RPC: %s: event rep %p status %X opcode %X length %u\n",
+ __func__, rep, wc->status, wc->opcode, wc->byte_len);
+
+ if (!rep) /* send or bind completion that we don't care about */
+ return;
+
+ if (IB_WC_SUCCESS != wc->status) {
+ dprintk("RPC: %s: %s WC status %X, connection lost\n",
+ __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
+ wc->status);
+ rep->rr_len = ~0U;
+ rpcrdma_schedule_tasklet(rep);
+ return;
+ }
+
+ switch (wc->opcode) {
+ case IB_WC_RECV:
+ rep->rr_len = wc->byte_len;
+ ib_dma_sync_single_for_cpu(
+ rdmab_to_ia(rep->rr_buffer)->ri_id->device,
+ rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
+ /* Keep (only) the most recent credits, after check validity */
+ if (rep->rr_len >= 16) {
+ struct rpcrdma_msg *p =
+ (struct rpcrdma_msg *) rep->rr_base;
+ unsigned int credits = ntohl(p->rm_credit);
+ if (credits == 0) {
+ dprintk("RPC: %s: server"
+ " dropped credits to 0!\n", __func__);
+ /* don't deadlock */
+ credits = 1;
+ } else if (credits > rep->rr_buffer->rb_max_requests) {
+ dprintk("RPC: %s: server"
+ " over-crediting: %d (%d)\n",
+ __func__, credits,
+ rep->rr_buffer->rb_max_requests);
+ credits = rep->rr_buffer->rb_max_requests;
+ }
+ atomic_set(&rep->rr_buffer->rb_credits, credits);
+ }
+ /* fall through */
+ case IB_WC_BIND_MW:
+ rpcrdma_schedule_tasklet(rep);
+ break;
+ default:
+ dprintk("RPC: %s: unexpected WC event %X\n",
+ __func__, wc->opcode);
+ break;
+ }
+}
+
+static inline int
+rpcrdma_cq_poll(struct ib_cq *cq)
+{
+ struct ib_wc wc;
+ int rc;
+
+ for (;;) {
+ rc = ib_poll_cq(cq, 1, &wc);
+ if (rc < 0) {
+ dprintk("RPC: %s: ib_poll_cq failed %i\n",
+ __func__, rc);
+ return rc;
+ }
+ if (rc == 0)
+ break;
+
+ rpcrdma_event_process(&wc);
+ }
+
+ return 0;
+}
+
+/*
+ * rpcrdma_cq_event_upcall
+ *
+ * This upcall handles recv, send, bind and unbind events.
+ * It is reentrant but processes single events in order to maintain
+ * ordering of receives to keep server credits.
+ *
+ * It is the responsibility of the scheduled tasklet to return
+ * recv buffers to the pool. NOTE: this affects synchronization of
+ * connection shutdown. That is, the structures required for
+ * the completion of the reply handler must remain intact until
+ * all memory has been reclaimed.
+ *
+ * Note that send events are suppressed and do not result in an upcall.
+ */
+static void
+rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
+{
+ int rc;
+
+ rc = rpcrdma_cq_poll(cq);
+ if (rc)
+ return;
+
+ rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
+ if (rc) {
+ dprintk("RPC: %s: ib_req_notify_cq failed %i\n",
+ __func__, rc);
+ return;
+ }
+
+ rpcrdma_cq_poll(cq);
+}
+
+#ifdef RPC_DEBUG
+static const char * const conn[] = {
+ "address resolved",
+ "address error",
+ "route resolved",
+ "route error",
+ "connect request",
+ "connect response",
+ "connect error",
+ "unreachable",
+ "rejected",
+ "established",
+ "disconnected",
+ "device removal"
+};
+#endif
+
+static int
+rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
+{
+ struct rpcrdma_xprt *xprt = id->context;
+ struct rpcrdma_ia *ia = &xprt->rx_ia;
+ struct rpcrdma_ep *ep = &xprt->rx_ep;
+ struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
+ struct ib_qp_attr attr;
+ struct ib_qp_init_attr iattr;
+ int connstate = 0;
+
+ switch (event->event) {
+ case RDMA_CM_EVENT_ADDR_RESOLVED:
+ case RDMA_CM_EVENT_ROUTE_RESOLVED:
+ complete(&ia->ri_done);
+ break;
+ case RDMA_CM_EVENT_ADDR_ERROR:
+ ia->ri_async_rc = -EHOSTUNREACH;
+ dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
+ __func__, ep);
+ complete(&ia->ri_done);
+ break;
+ case RDMA_CM_EVENT_ROUTE_ERROR:
+ ia->ri_async_rc = -ENETUNREACH;
+ dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
+ __func__, ep);
+ complete(&ia->ri_done);
+ break;
+ case RDMA_CM_EVENT_ESTABLISHED:
+ connstate = 1;
+ ib_query_qp(ia->ri_id->qp, &attr,
+ IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
+ &iattr);
+ dprintk("RPC: %s: %d responder resources"
+ " (%d initiator)\n",
+ __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
+ goto connected;
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ connstate = -ENOTCONN;
+ goto connected;
+ case RDMA_CM_EVENT_UNREACHABLE:
+ connstate = -ENETDOWN;
+ goto connected;
+ case RDMA_CM_EVENT_REJECTED:
+ connstate = -ECONNREFUSED;
+ goto connected;
+ case RDMA_CM_EVENT_DISCONNECTED:
+ connstate = -ECONNABORTED;
+ goto connected;
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ connstate = -ENODEV;
+connected:
+ dprintk("RPC: %s: %s: %u.%u.%u.%u:%u"
+ " (ep 0x%p event 0x%x)\n",
+ __func__,
+ (event->event <= 11) ? conn[event->event] :
+ "unknown connection error",
+ NIPQUAD(addr->sin_addr.s_addr),
+ ntohs(addr->sin_port),
+ ep, event->event);
+ atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
+ dprintk("RPC: %s: %sconnected\n",
+ __func__, connstate > 0 ? "" : "dis");
+ ep->rep_connected = connstate;
+ ep->rep_func(ep);
+ wake_up_all(&ep->rep_connect_wait);
+ break;
+ default:
+ ia->ri_async_rc = -EINVAL;
+ dprintk("RPC: %s: unexpected CM event %X\n",
+ __func__, event->event);
+ complete(&ia->ri_done);
+ break;
+ }
+
+ return 0;
+}
+
+static struct rdma_cm_id *
+rpcrdma_create_id(struct rpcrdma_xprt *xprt,
+ struct rpcrdma_ia *ia, struct sockaddr *addr)
+{
+ struct rdma_cm_id *id;
+ int rc;
+
+ id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
+ if (IS_ERR(id)) {
+ rc = PTR_ERR(id);
+ dprintk("RPC: %s: rdma_create_id() failed %i\n",
+ __func__, rc);
+ return id;
+ }
+
+ ia->ri_async_rc = 0;
+ rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
+ if (rc) {
+ dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
+ __func__, rc);
+ goto out;
+ }
+ wait_for_completion(&ia->ri_done);
+ rc = ia->ri_async_rc;
+ if (rc)
+ goto out;
+
+ ia->ri_async_rc = 0;
+ rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
+ if (rc) {
+ dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
+ __func__, rc);
+ goto out;
+ }
+ wait_for_completion(&ia->ri_done);
+ rc = ia->ri_async_rc;
+ if (rc)
+ goto out;
+
+ return id;
+
+out:
+ rdma_destroy_id(id);
+ return ERR_PTR(rc);
+}
+
+/*
+ * Drain any cq, prior to teardown.
+ */
+static void
+rpcrdma_clean_cq(struct ib_cq *cq)
+{
+ struct ib_wc wc;
+ int count = 0;
+
+ while (1 == ib_poll_cq(cq, 1, &wc))
+ ++count;
+
+ if (count)
+ dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
+ __func__, count, wc.opcode);
+}
+
+/*
+ * Exported functions.
+ */
+
+/*
+ * Open and initialize an Interface Adapter.
+ * o initializes fields of struct rpcrdma_ia, including
+ * interface and provider attributes and protection zone.
+ */
+int
+rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
+{
+ int rc;
+ struct rpcrdma_ia *ia = &xprt->rx_ia;
+
+ init_completion(&ia->ri_done);
+
+ ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
+ if (IS_ERR(ia->ri_id)) {
+ rc = PTR_ERR(ia->ri_id);
+ goto out1;
+ }
+
+ ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
+ if (IS_ERR(ia->ri_pd)) {
+ rc = PTR_ERR(ia->ri_pd);
+ dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
+ __func__, rc);
+ goto out2;
+ }
+
+ /*
+ * Optionally obtain an underlying physical identity mapping in
+ * order to do a memory window-based bind. This base registration
+ * is protected from remote access - that is enabled only by binding
+ * for the specific bytes targeted during each RPC operation, and
+ * revoked after the corresponding completion similar to a storage
+ * adapter.
+ */
+ if (memreg > RPCRDMA_REGISTER) {
+ int mem_priv = IB_ACCESS_LOCAL_WRITE;
+ switch (memreg) {
+#if RPCRDMA_PERSISTENT_REGISTRATION
+ case RPCRDMA_ALLPHYSICAL:
+ mem_priv |= IB_ACCESS_REMOTE_WRITE;
+ mem_priv |= IB_ACCESS_REMOTE_READ;
+ break;
+#endif
+ case RPCRDMA_MEMWINDOWS_ASYNC:
+ case RPCRDMA_MEMWINDOWS:
+ mem_priv |= IB_ACCESS_MW_BIND;
+ break;
+ default:
+ break;
+ }
+ ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
+ if (IS_ERR(ia->ri_bind_mem)) {
+ printk(KERN_ALERT "%s: ib_get_dma_mr for "
+ "phys register failed with %lX\n\t"
+ "Will continue with degraded performance\n",
+ __func__, PTR_ERR(ia->ri_bind_mem));
+ memreg = RPCRDMA_REGISTER;
+ ia->ri_bind_mem = NULL;
+ }
+ }
+
+ /* Else will do memory reg/dereg for each chunk */
+ ia->ri_memreg_strategy = memreg;
+
+ return 0;
+out2:
+ rdma_destroy_id(ia->ri_id);
+out1:
+ return rc;
+}
+
+/*
+ * Clean up/close an IA.
+ * o if event handles and PD have been initialized, free them.
+ * o close the IA
+ */
+void
+rpcrdma_ia_close(struct rpcrdma_ia *ia)
+{
+ int rc;
+
+ dprintk("RPC: %s: entering\n", __func__);
+ if (ia->ri_bind_mem != NULL) {
+ rc = ib_dereg_mr(ia->ri_bind_mem);
+ dprintk("RPC: %s: ib_dereg_mr returned %i\n",
+ __func__, rc);
+ }
+ if (ia->ri_id != NULL && !IS_ERR(ia->ri_id) && ia->ri_id->qp)
+ rdma_destroy_qp(ia->ri_id);
+ if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
+ rc = ib_dealloc_pd(ia->ri_pd);
+ dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
+ __func__, rc);
+ }
+ if (ia->ri_id != NULL && !IS_ERR(ia->ri_id))
+ rdma_destroy_id(ia->ri_id);
+}
+
+/*
+ * Create unconnected endpoint.
+ */
+int
+rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
+ struct rpcrdma_create_data_internal *cdata)
+{
+ struct ib_device_attr devattr;
+ int rc;
+
+ rc = ib_query_device(ia->ri_id->device, &devattr);
+ if (rc) {
+ dprintk("RPC: %s: ib_query_device failed %d\n",
+ __func__, rc);
+ return rc;
+ }
+
+ /* check provider's send/recv wr limits */
+ if (cdata->max_requests > devattr.max_qp_wr)
+ cdata->max_requests = devattr.max_qp_wr;
+
+ ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
+ ep->rep_attr.qp_context = ep;
+ /* send_cq and recv_cq initialized below */
+ ep->rep_attr.srq = NULL;
+ ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+ switch (ia->ri_memreg_strategy) {
+ case RPCRDMA_MEMWINDOWS_ASYNC:
+ case RPCRDMA_MEMWINDOWS:
+ /* Add room for mw_binds+unbinds - overkill! */
+ ep->rep_attr.cap.max_send_wr++;
+ ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
+ if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
+ return -EINVAL;
+ break;
+ default:
+ break;
+ }
+ ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+ ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
+ ep->rep_attr.cap.max_recv_sge = 1;
+ ep->rep_attr.cap.max_inline_data = 0;
+ ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+ ep->rep_attr.qp_type = IB_QPT_RC;
+ ep->rep_attr.port_num = ~0;
+
+ dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
+ "iovs: send %d recv %d\n",
+ __func__,
+ ep->rep_attr.cap.max_send_wr,
+ ep->rep_attr.cap.max_recv_wr,
+ ep->rep_attr.cap.max_send_sge,
+ ep->rep_attr.cap.max_recv_sge);
+
+ /* set trigger for requesting send completion */
+ ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /* - 1*/;
+ switch (ia->ri_memreg_strategy) {
+ case RPCRDMA_MEMWINDOWS_ASYNC:
+ case RPCRDMA_MEMWINDOWS:
+ ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
+ break;
+ default:
+ break;
+ }
+ if (ep->rep_cqinit <= 2)
+ ep->rep_cqinit = 0;
+ INIT_CQCOUNT(ep);
+ ep->rep_ia = ia;
+ init_waitqueue_head(&ep->rep_connect_wait);
+
+ /*
+ * Create a single cq for receive dto and mw_bind (only ever
+ * care about unbind, really). Send completions are suppressed.
+ * Use single threaded tasklet upcalls to maintain ordering.
+ */
+ ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
+ rpcrdma_cq_async_error_upcall, NULL,
+ ep->rep_attr.cap.max_recv_wr +
+ ep->rep_attr.cap.max_send_wr + 1, 0);
+ if (IS_ERR(ep->rep_cq)) {
+ rc = PTR_ERR(ep->rep_cq);
+ dprintk("RPC: %s: ib_create_cq failed: %i\n",
+ __func__, rc);
+ goto out1;
+ }
+
+ rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
+ if (rc) {
+ dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
+ __func__, rc);
+ goto out2;
+ }
+
+ ep->rep_attr.send_cq = ep->rep_cq;
+ ep->rep_attr.recv_cq = ep->rep_cq;
+
+ /* Initialize cma parameters */
+
+ /* RPC/RDMA does not use private data */
+ ep->rep_remote_cma.private_data = NULL;
+ ep->rep_remote_cma.private_data_len = 0;
+
+ /* Client offers RDMA Read but does not initiate */
+ switch (ia->ri_memreg_strategy) {
+ case RPCRDMA_BOUNCEBUFFERS:
+ ep->rep_remote_cma.responder_resources = 0;
+ break;
+ case RPCRDMA_MTHCAFMR:
+ case RPCRDMA_REGISTER:
+ ep->rep_remote_cma.responder_resources = cdata->max_requests *
+ (RPCRDMA_MAX_DATA_SEGS / 8);
+ break;
+ case RPCRDMA_MEMWINDOWS:
+ case RPCRDMA_MEMWINDOWS_ASYNC:
+#if RPCRDMA_PERSISTENT_REGISTRATION
+ case RPCRDMA_ALLPHYSICAL:
+#endif
+ ep->rep_remote_cma.responder_resources = cdata->max_requests *
+ (RPCRDMA_MAX_DATA_SEGS / 2);
+ break;
+ default:
+ break;
+ }
+ if (ep->rep_remote_cma.responder_resources > devattr.max_qp_rd_atom)
+ ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
+ ep->rep_remote_cma.initiator_depth = 0;
+
+ ep->rep_remote_cma.retry_count = 7;
+ ep->rep_remote_cma.flow_control = 0;
+ ep->rep_remote_cma.rnr_retry_count = 0;
+
+ return 0;
+
+out2:
+ if (ib_destroy_cq(ep->rep_cq))
+ ;
+out1:
+ return rc;
+}
+
+/*
+ * rpcrdma_ep_destroy
+ *
+ * Disconnect and destroy endpoint. After this, the only
+ * valid operations on the ep are to free it (if dynamically
+ * allocated) or re-create it.
+ *
+ * The caller's error handling must be sure to not leak the endpoint
+ * if this function fails.
+ */
+int
+rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+{
+ int rc;
+
+ dprintk("RPC: %s: entering, connected is %d\n",
+ __func__, ep->rep_connected);
+
+ if (ia->ri_id->qp) {
+ rc = rpcrdma_ep_disconnect(ep, ia);
+ if (rc)
+ dprintk("RPC: %s: rpcrdma_ep_disconnect"
+ " returned %i\n", __func__, rc);
+ }
+
+ ep->rep_func = NULL;
+
+ /* padding - could be done in rpcrdma_buffer_destroy... */
+ if (ep->rep_pad_mr) {
+ rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
+ ep->rep_pad_mr = NULL;
+ }
+
+ if (ia->ri_id->qp) {
+ rdma_destroy_qp(ia->ri_id);
+ ia->ri_id->qp = NULL;
+ }
+
+ rpcrdma_clean_cq(ep->rep_cq);
+ rc = ib_destroy_cq(ep->rep_cq);
+ if (rc)
+ dprintk("RPC: %s: ib_destroy_cq returned %i\n",
+ __func__, rc);
+
+ return rc;
+}
+
+/*
+ * Connect unconnected endpoint.
+ */
+int
+rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+{
+ struct rdma_cm_id *id;
+ int rc = 0;
+ int retry_count = 0;
+ int reconnect = (ep->rep_connected != 0);
+
+ if (reconnect) {
+ struct rpcrdma_xprt *xprt;
+retry:
+ rc = rpcrdma_ep_disconnect(ep, ia);
+ if (rc && rc != -ENOTCONN)
+ dprintk("RPC: %s: rpcrdma_ep_disconnect"
+ " status %i\n", __func__, rc);
+ rpcrdma_clean_cq(ep->rep_cq);
+
+ xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+ id = rpcrdma_create_id(xprt, ia,
+ (struct sockaddr *)&xprt->rx_data.addr);
+ if (IS_ERR(id)) {
+ rc = PTR_ERR(id);
+ goto out;
+ }
+ /* TEMP TEMP TEMP - fail if new device:
+ * Deregister/remarshal *all* requests!
+ * Close and recreate adapter, pd, etc!
+ * Re-determine all attributes still sane!
+ * More stuff I haven't thought of!
+ * Rrrgh!
+ */
+ if (ia->ri_id->device != id->device) {
+ printk("RPC: %s: can't reconnect on "
+ "different device!\n", __func__);
+ rdma_destroy_id(id);
+ rc = -ENETDOWN;
+ goto out;
+ }
+ /* END TEMP */
+ rdma_destroy_id(ia->ri_id);
+ ia->ri_id = id;
+ }
+
+ rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+ if (rc) {
+ dprintk("RPC: %s: rdma_create_qp failed %i\n",
+ __func__, rc);
+ goto out;
+ }
+
+/* XXX Tavor device performs badly with 2K MTU! */
+if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
+ struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
+ if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
+ (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
+ pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
+ struct ib_qp_attr attr = {
+ .path_mtu = IB_MTU_1024
+ };
+ rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
+ }
+}
+
+ /* Theoretically a client initiator_depth > 0 is not needed,
+ * but many peers fail to complete the connection unless they
+ * == responder_resources! */
+ if (ep->rep_remote_cma.initiator_depth !=
+ ep->rep_remote_cma.responder_resources)
+ ep->rep_remote_cma.initiator_depth =
+ ep->rep_remote_cma.responder_resources;
+
+ ep->rep_connected = 0;
+
+ rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
+ if (rc) {
+ dprintk("RPC: %s: rdma_connect() failed with %i\n",
+ __func__, rc);
+ goto out;
+ }
+
+ if (reconnect)
+ return 0;
+
+ wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
+
+ /*
+ * Check state. A non-peer reject indicates no listener
+ * (ECONNREFUSED), which may be a transient state. All
+ * others indicate a transport condition which has already
+ * undergone a best-effort.
+ */
+ if (ep->rep_connected == -ECONNREFUSED
+ && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
+ dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
+ goto retry;
+ }
+ if (ep->rep_connected <= 0) {
+ /* Sometimes, the only way to reliably connect to remote
+ * CMs is to use same nonzero values for ORD and IRD. */
+ ep->rep_remote_cma.initiator_depth =
+ ep->rep_remote_cma.responder_resources;
+ if (ep->rep_remote_cma.initiator_depth == 0)
+ ++ep->rep_remote_cma.initiator_depth;
+ if (ep->rep_remote_cma.responder_resources == 0)
+ ++ep->rep_remote_cma.responder_resources;
+ if (retry_count++ == 0)
+ goto retry;
+ rc = ep->rep_connected;
+ } else {
+ dprintk("RPC: %s: connected\n", __func__);
+ }
+
+out:
+ if (rc)
+ ep->rep_connected = rc;
+ return rc;
+}
+
+/*
+ * rpcrdma_ep_disconnect
+ *
+ * This is separate from destroy to facilitate the ability
+ * to reconnect without recreating the endpoint.
+ *
+ * This call is not reentrant, and must not be made in parallel
+ * on the same endpoint.
+ */
+int
+rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+{
+ int rc;
+
+ rpcrdma_clean_cq(ep->rep_cq);
+ rc = rdma_disconnect(ia->ri_id);
+ if (!rc) {
+ /* returns without wait if not connected */
+ wait_event_interruptible(ep->rep_connect_wait,
+ ep->rep_connected != 1);
+ dprintk("RPC: %s: after wait, %sconnected\n", __func__,
+ (ep->rep_connected == 1) ? "still " : "dis");
+ } else {
+ dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
+ ep->rep_connected = rc;
+ }
+ return rc;
+}
+
+/*
+ * Initialize buffer memory
+ */
+int
+rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
+ struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
+{
+ char *p;
+ size_t len;
+ int i, rc;
+
+ buf->rb_max_requests = cdata->max_requests;
+ spin_lock_init(&buf->rb_lock);
+ atomic_set(&buf->rb_credits, 1);
+
+ /* Need to allocate:
+ * 1. arrays for send and recv pointers
+ * 2. arrays of struct rpcrdma_req to fill in pointers
+ * 3. array of struct rpcrdma_rep for replies
+ * 4. padding, if any
+ * 5. mw's, if any
+ * Send/recv buffers in req/rep need to be registered
+ */
+
+ len = buf->rb_max_requests *
+ (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
+ len += cdata->padding;
+ switch (ia->ri_memreg_strategy) {
+ case RPCRDMA_MTHCAFMR:
+ /* TBD we are perhaps overallocating here */
+ len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
+ sizeof(struct rpcrdma_mw);
+ break;
+ case RPCRDMA_MEMWINDOWS_ASYNC:
+ case RPCRDMA_MEMWINDOWS:
+ len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
+ sizeof(struct rpcrdma_mw);
+ break;
+ default:
+ break;
+ }
+
+ /* allocate 1, 4 and 5 in one shot */
+ p = kzalloc(len, GFP_KERNEL);
+ if (p == NULL) {
+ dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
+ __func__, len);
+ rc = -ENOMEM;
+ goto out;
+ }
+ buf->rb_pool = p; /* for freeing it later */
+
+ buf->rb_send_bufs = (struct rpcrdma_req **) p;
+ p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
+ buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
+ p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
+
+ /*
+ * Register the zeroed pad buffer, if any.
+ */
+ if (cdata->padding) {
+ rc = rpcrdma_register_internal(ia, p, cdata->padding,
+ &ep->rep_pad_mr, &ep->rep_pad);
+ if (rc)
+ goto out;
+ }
+ p += cdata->padding;
+
+ /*
+ * Allocate the fmr's, or mw's for mw_bind chunk registration.
+ * We "cycle" the mw's in order to minimize rkey reuse,
+ * and also reduce unbind-to-bind collision.
+ */
+ INIT_LIST_HEAD(&buf->rb_mws);
+ switch (ia->ri_memreg_strategy) {
+ case RPCRDMA_MTHCAFMR:
+ {
+ struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
+ struct ib_fmr_attr fa = {
+ RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT
+ };
+ /* TBD we are perhaps overallocating here */
+ for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
+ r->r.fmr = ib_alloc_fmr(ia->ri_pd,
+ IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
+ &fa);
+ if (IS_ERR(r->r.fmr)) {
+ rc = PTR_ERR(r->r.fmr);
+ dprintk("RPC: %s: ib_alloc_fmr"
+ " failed %i\n", __func__, rc);
+ goto out;
+ }
+ list_add(&r->mw_list, &buf->rb_mws);
+ ++r;
+ }
+ }
+ break;
+ case RPCRDMA_MEMWINDOWS_ASYNC:
+ case RPCRDMA_MEMWINDOWS:
+ {
+ struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
+ /* Allocate one extra request's worth, for full cycling */
+ for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
+ r->r.mw = ib_alloc_mw(ia->ri_pd);
+ if (IS_ERR(r->r.mw)) {
+ rc = PTR_ERR(r->r.mw);
+ dprintk("RPC: %s: ib_alloc_mw"
+ " failed %i\n", __func__, rc);
+ goto out;
+ }
+ list_add(&r->mw_list, &buf->rb_mws);
+ ++r;
+ }
+ }
+ break;
+ default:
+ break;
+ }
+
+ /*
+ * Allocate/init the request/reply buffers. Doing this
+ * using kmalloc for now -- one for each buf.
+ */
+ for (i = 0; i < buf->rb_max_requests; i++) {
+ struct rpcrdma_req *req;
+ struct rpcrdma_rep *rep;
+
+ len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
+ /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
+ /* Typical ~2400b, so rounding up saves work later */
+ if (len < 4096)
+ len = 4096;
+ req = kmalloc(len, GFP_KERNEL);
+ if (req == NULL) {
+ dprintk("RPC: %s: request buffer %d alloc"
+ " failed\n", __func__, i);
+ rc = -ENOMEM;
+ goto out;
+ }
+ memset(req, 0, sizeof(struct rpcrdma_req));
+ buf->rb_send_bufs[i] = req;
+ buf->rb_send_bufs[i]->rl_buffer = buf;
+
+ rc = rpcrdma_register_internal(ia, req->rl_base,
+ len - offsetof(struct rpcrdma_req, rl_base),
+ &buf->rb_send_bufs[i]->rl_handle,
+ &buf->rb_send_bufs[i]->rl_iov);
+ if (rc)
+ goto out;
+
+ buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
+
+ len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
+ rep = kmalloc(len, GFP_KERNEL);
+ if (rep == NULL) {
+ dprintk("RPC: %s: reply buffer %d alloc failed\n",
+ __func__, i);
+ rc = -ENOMEM;
+ goto out;
+ }
+ memset(rep, 0, sizeof(struct rpcrdma_rep));
+ buf->rb_recv_bufs[i] = rep;
+ buf->rb_recv_bufs[i]->rr_buffer = buf;
+ init_waitqueue_head(&rep->rr_unbind);
+
+ rc = rpcrdma_register_internal(ia, rep->rr_base,
+ len - offsetof(struct rpcrdma_rep, rr_base),
+ &buf->rb_recv_bufs[i]->rr_handle,
+ &buf->rb_recv_bufs[i]->rr_iov);
+ if (rc)
+ goto out;
+
+ }
+ dprintk("RPC: %s: max_requests %d\n",
+ __func__, buf->rb_max_requests);
+ /* done */
+ return 0;
+out:
+ rpcrdma_buffer_destroy(buf);
+ return rc;
+}
+
+/*
+ * Unregister and destroy buffer memory. Need to deal with
+ * partial initialization, so it's callable from failed create.
+ * Must be called before destroying endpoint, as registrations
+ * reference it.
+ */
+void
+rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
+{
+ int rc, i;
+ struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+
+ /* clean up in reverse order from create
+ * 1. recv mr memory (mr free, then kfree)
+ * 1a. bind mw memory
+ * 2. send mr memory (mr free, then kfree)
+ * 3. padding (if any) [moved to rpcrdma_ep_destroy]
+ * 4. arrays
+ */
+ dprintk("RPC: %s: entering\n", __func__);
+
+ for (i = 0; i < buf->rb_max_requests; i++) {
+ if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
+ rpcrdma_deregister_internal(ia,
+ buf->rb_recv_bufs[i]->rr_handle,
+ &buf->rb_recv_bufs[i]->rr_iov);
+ kfree(buf->rb_recv_bufs[i]);
+ }
+ if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
+ while (!list_empty(&buf->rb_mws)) {
+ struct rpcrdma_mw *r;
+ r = list_entry(buf->rb_mws.next,
+ struct rpcrdma_mw, mw_list);
+ list_del(&r->mw_list);
+ switch (ia->ri_memreg_strategy) {
+ case RPCRDMA_MTHCAFMR:
+ rc = ib_dealloc_fmr(r->r.fmr);
+ if (rc)
+ dprintk("RPC: %s:"
+ " ib_dealloc_fmr"
+ " failed %i\n",
+ __func__, rc);
+ break;
+ case RPCRDMA_MEMWINDOWS_ASYNC:
+ case RPCRDMA_MEMWINDOWS:
+ rc = ib_dealloc_mw(r->r.mw);
+ if (rc)
+ dprintk("RPC: %s:"
+ " ib_dealloc_mw"
+ " failed %i\n",
+ __func__, rc);
+ break;
+ default:
+ break;
+ }
+ }
+ rpcrdma_deregister_internal(ia,
+ buf->rb_send_bufs[i]->rl_handle,
+ &buf->rb_send_bufs[i]->rl_iov);
+ kfree(buf->rb_send_bufs[i]);
+ }
+ }
+
+ kfree(buf->rb_pool);
+}
+
+/*
+ * Get a set of request/reply buffers.
+ *
+ * Reply buffer (if needed) is attached to send buffer upon return.
+ * Rule:
+ * rb_send_index and rb_recv_index MUST always be pointing to the
+ * *next* available buffer (non-NULL). They are incremented after
+ * removing buffers, and decremented *before* returning them.
+ */
+struct rpcrdma_req *
+rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
+{
+ struct rpcrdma_req *req;
+ unsigned long flags;
+
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ if (buffers->rb_send_index == buffers->rb_max_requests) {
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ dprintk("RPC: %s: out of request buffers\n", __func__);
+ return ((struct rpcrdma_req *)NULL);
+ }
+
+ req = buffers->rb_send_bufs[buffers->rb_send_index];
+ if (buffers->rb_send_index < buffers->rb_recv_index) {
+ dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
+ __func__,
+ buffers->rb_recv_index - buffers->rb_send_index);
+ req->rl_reply = NULL;
+ } else {
+ req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
+ buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
+ }
+ buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
+ if (!list_empty(&buffers->rb_mws)) {
+ int i = RPCRDMA_MAX_SEGS - 1;
+ do {
+ struct rpcrdma_mw *r;
+ r = list_entry(buffers->rb_mws.next,
+ struct rpcrdma_mw, mw_list);
+ list_del(&r->mw_list);
+ req->rl_segments[i].mr_chunk.rl_mw = r;
+ } while (--i >= 0);
+ }
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ return req;
+}
+
+/*
+ * Put request/reply buffers back into pool.
+ * Pre-decrement counter/array index.
+ */
+void
+rpcrdma_buffer_put(struct rpcrdma_req *req)
+{
+ struct rpcrdma_buffer *buffers = req->rl_buffer;
+ struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
+ int i;
+ unsigned long flags;
+
+ BUG_ON(req->rl_nchunks != 0);
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ buffers->rb_send_bufs[--buffers->rb_send_index] = req;
+ req->rl_niovs = 0;
+ if (req->rl_reply) {
+ buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
+ init_waitqueue_head(&req->rl_reply->rr_unbind);
+ req->rl_reply->rr_func = NULL;
+ req->rl_reply = NULL;
+ }
+ switch (ia->ri_memreg_strategy) {
+ case RPCRDMA_MTHCAFMR:
+ case RPCRDMA_MEMWINDOWS_ASYNC:
+ case RPCRDMA_MEMWINDOWS:
+ /*
+ * Cycle mw's back in reverse order, and "spin" them.
+ * This delays and scrambles reuse as much as possible.
+ */
+ i = 1;
+ do {
+ struct rpcrdma_mw **mw;
+ mw = &req->rl_segments[i].mr_chunk.rl_mw;
+ list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
+ *mw = NULL;
+ } while (++i < RPCRDMA_MAX_SEGS);
+ list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
+ &buffers->rb_mws);
+ req->rl_segments[0].mr_chunk.rl_mw = NULL;
+ break;
+ default:
+ break;
+ }
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+}
+
+/*
+ * Recover reply buffers from pool.
+ * This happens when recovering from error conditions.
+ * Post-increment counter/array index.
+ */
+void
+rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
+{
+ struct rpcrdma_buffer *buffers = req->rl_buffer;
+ unsigned long flags;
+
+ if (req->rl_iov.length == 0) /* special case xprt_rdma_allocate() */
+ buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ if (buffers->rb_recv_index < buffers->rb_max_requests) {
+ req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
+ buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
+ }
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+}
+
+/*
+ * Put reply buffers back into pool when not attached to
+ * request. This happens in error conditions, and when
+ * aborting unbinds. Pre-decrement counter/array index.
+ */
+void
+rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
+{
+ struct rpcrdma_buffer *buffers = rep->rr_buffer;
+ unsigned long flags;
+
+ rep->rr_func = NULL;
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+}
+
+/*
+ * Wrappers for internal-use kmalloc memory registration, used by buffer code.
+ */
+
+int
+rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
+ struct ib_mr **mrp, struct ib_sge *iov)
+{
+ struct ib_phys_buf ipb;
+ struct ib_mr *mr;
+ int rc;
+
+ /*
+ * All memory passed here was kmalloc'ed, therefore phys-contiguous.
+ */
+ iov->addr = ib_dma_map_single(ia->ri_id->device,
+ va, len, DMA_BIDIRECTIONAL);
+ iov->length = len;
+
+ if (ia->ri_bind_mem != NULL) {
+ *mrp = NULL;
+ iov->lkey = ia->ri_bind_mem->lkey;
+ return 0;
+ }
+
+ ipb.addr = iov->addr;
+ ipb.size = iov->length;
+ mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
+ IB_ACCESS_LOCAL_WRITE, &iov->addr);
+
+ dprintk("RPC: %s: phys convert: 0x%llx "
+ "registered 0x%llx length %d\n",
+ __func__, ipb.addr, iov->addr, len);
+
+ if (IS_ERR(mr)) {
+ *mrp = NULL;
+ rc = PTR_ERR(mr);
+ dprintk("RPC: %s: failed with %i\n", __func__, rc);
+ } else {
+ *mrp = mr;
+ iov->lkey = mr->lkey;
+ rc = 0;
+ }
+
+ return rc;
+}
+
+int
+rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
+ struct ib_mr *mr, struct ib_sge *iov)
+{
+ int rc;
+
+ ib_dma_unmap_single(ia->ri_id->device,
+ iov->addr, iov->length, DMA_BIDIRECTIONAL);
+
+ if (NULL == mr)
+ return 0;
+
+ rc = ib_dereg_mr(mr);
+ if (rc)
+ dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
+ return rc;
+}
+
+/*
+ * Wrappers for chunk registration, shared by read/write chunk code.
+ */
+
+static void
+rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
+{
+ seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
+ seg->mr_dmalen = seg->mr_len;
+ if (seg->mr_page)
+ seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
+ seg->mr_page, offset_in_page(seg->mr_offset),
+ seg->mr_dmalen, seg->mr_dir);
+ else
+ seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
+ seg->mr_offset,
+ seg->mr_dmalen, seg->mr_dir);
+}
+
+static void
+rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
+{
+ if (seg->mr_page)
+ ib_dma_unmap_page(ia->ri_id->device,
+ seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
+ else
+ ib_dma_unmap_single(ia->ri_id->device,
+ seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
+}
+
+int
+rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
+ int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
+ IB_ACCESS_REMOTE_READ);
+ struct rpcrdma_mr_seg *seg1 = seg;
+ int i;
+ int rc = 0;
+
+ switch (ia->ri_memreg_strategy) {
+
+#if RPCRDMA_PERSISTENT_REGISTRATION
+ case RPCRDMA_ALLPHYSICAL:
+ rpcrdma_map_one(ia, seg, writing);
+ seg->mr_rkey = ia->ri_bind_mem->rkey;
+ seg->mr_base = seg->mr_dma;
+ seg->mr_nsegs = 1;
+ nsegs = 1;
+ break;
+#endif
+
+ /* Registration using fast memory registration */
+ case RPCRDMA_MTHCAFMR:
+ {
+ u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
+ int len, pageoff = offset_in_page(seg->mr_offset);
+ seg1->mr_offset -= pageoff; /* start of page */
+ seg1->mr_len += pageoff;
+ len = -pageoff;
+ if (nsegs > RPCRDMA_MAX_DATA_SEGS)
+ nsegs = RPCRDMA_MAX_DATA_SEGS;
+ for (i = 0; i < nsegs;) {
+ rpcrdma_map_one(ia, seg, writing);
+ physaddrs[i] = seg->mr_dma;
+ len += seg->mr_len;
+ ++seg;
+ ++i;
+ /* Check for holes */
+ if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
+ offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
+ break;
+ }
+ nsegs = i;
+ rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
+ physaddrs, nsegs, seg1->mr_dma);
+ if (rc) {
+ dprintk("RPC: %s: failed ib_map_phys_fmr "
+ "%u@0x%llx+%i (%d)... status %i\n", __func__,
+ len, (unsigned long long)seg1->mr_dma,
+ pageoff, nsegs, rc);
+ while (nsegs--)
+ rpcrdma_unmap_one(ia, --seg);
+ } else {
+ seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
+ seg1->mr_base = seg1->mr_dma + pageoff;
+ seg1->mr_nsegs = nsegs;
+ seg1->mr_len = len;
+ }
+ }
+ break;
+
+ /* Registration using memory windows */
+ case RPCRDMA_MEMWINDOWS_ASYNC:
+ case RPCRDMA_MEMWINDOWS:
+ {
+ struct ib_mw_bind param;
+ rpcrdma_map_one(ia, seg, writing);
+ param.mr = ia->ri_bind_mem;
+ param.wr_id = 0ULL; /* no send cookie */
+ param.addr = seg->mr_dma;
+ param.length = seg->mr_len;
+ param.send_flags = 0;
+ param.mw_access_flags = mem_priv;
+
+ DECR_CQCOUNT(&r_xprt->rx_ep);
+ rc = ib_bind_mw(ia->ri_id->qp,
+ seg->mr_chunk.rl_mw->r.mw, &param);
+ if (rc) {
+ dprintk("RPC: %s: failed ib_bind_mw "
+ "%u@0x%llx status %i\n",
+ __func__, seg->mr_len,
+ (unsigned long long)seg->mr_dma, rc);
+ rpcrdma_unmap_one(ia, seg);
+ } else {
+ seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
+ seg->mr_base = param.addr;
+ seg->mr_nsegs = 1;
+ nsegs = 1;
+ }
+ }
+ break;
+
+ /* Default registration each time */
+ default:
+ {
+ struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
+ int len = 0;
+ if (nsegs > RPCRDMA_MAX_DATA_SEGS)
+ nsegs = RPCRDMA_MAX_DATA_SEGS;
+ for (i = 0; i < nsegs;) {
+ rpcrdma_map_one(ia, seg, writing);
+ ipb[i].addr = seg->mr_dma;
+ ipb[i].size = seg->mr_len;
+ len += seg->mr_len;
+ ++seg;
+ ++i;
+ /* Check for holes */
+ if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
+ offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
+ break;
+ }
+ nsegs = i;
+ seg1->mr_base = seg1->mr_dma;
+ seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
+ ipb, nsegs, mem_priv, &seg1->mr_base);
+ if (IS_ERR(seg1->mr_chunk.rl_mr)) {
+ rc = PTR_ERR(seg1->mr_chunk.rl_mr);
+ dprintk("RPC: %s: failed ib_reg_phys_mr "
+ "%u@0x%llx (%d)... status %i\n",
+ __func__, len,
+ (unsigned long long)seg1->mr_dma, nsegs, rc);
+ while (nsegs--)
+ rpcrdma_unmap_one(ia, --seg);
+ } else {
+ seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
+ seg1->mr_nsegs = nsegs;
+ seg1->mr_len = len;
+ }
+ }
+ break;
+ }
+ if (rc)
+ return -1;
+
+ return nsegs;
+}
+
+int
+rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
+ struct rpcrdma_xprt *r_xprt, void *r)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_mr_seg *seg1 = seg;
+ int nsegs = seg->mr_nsegs, rc;
+
+ switch (ia->ri_memreg_strategy) {
+
+#if RPCRDMA_PERSISTENT_REGISTRATION
+ case RPCRDMA_ALLPHYSICAL:
+ BUG_ON(nsegs != 1);
+ rpcrdma_unmap_one(ia, seg);
+ rc = 0;
+ break;
+#endif
+
+ case RPCRDMA_MTHCAFMR:
+ {
+ LIST_HEAD(l);
+ list_add(&seg->mr_chunk.rl_mw->r.fmr->list, &l);
+ rc = ib_unmap_fmr(&l);
+ while (seg1->mr_nsegs--)
+ rpcrdma_unmap_one(ia, seg++);
+ }
+ if (rc)
+ dprintk("RPC: %s: failed ib_unmap_fmr,"
+ " status %i\n", __func__, rc);
+ break;
+
+ case RPCRDMA_MEMWINDOWS_ASYNC:
+ case RPCRDMA_MEMWINDOWS:
+ {
+ struct ib_mw_bind param;
+ BUG_ON(nsegs != 1);
+ param.mr = ia->ri_bind_mem;
+ param.addr = 0ULL; /* unbind */
+ param.length = 0;
+ param.mw_access_flags = 0;
+ if (r) {
+ param.wr_id = (u64) (unsigned long) r;
+ param.send_flags = IB_SEND_SIGNALED;
+ INIT_CQCOUNT(&r_xprt->rx_ep);
+ } else {
+ param.wr_id = 0ULL;
+ param.send_flags = 0;
+ DECR_CQCOUNT(&r_xprt->rx_ep);
+ }
+ rc = ib_bind_mw(ia->ri_id->qp,
+ seg->mr_chunk.rl_mw->r.mw, &param);
+ rpcrdma_unmap_one(ia, seg);
+ }
+ if (rc)
+ dprintk("RPC: %s: failed ib_(un)bind_mw,"
+ " status %i\n", __func__, rc);
+ else
+ r = NULL; /* will upcall on completion */
+ break;
+
+ default:
+ rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
+ seg1->mr_chunk.rl_mr = NULL;
+ while (seg1->mr_nsegs--)
+ rpcrdma_unmap_one(ia, seg++);
+ if (rc)
+ dprintk("RPC: %s: failed ib_dereg_mr,"
+ " status %i\n", __func__, rc);
+ break;
+ }
+ if (r) {
+ struct rpcrdma_rep *rep = r;
+ void (*func)(struct rpcrdma_rep *) = rep->rr_func;
+ rep->rr_func = NULL;
+ func(rep); /* dereg done, callback now */
+ }
+ return nsegs;
+}
+
+/*
+ * Prepost any receive buffer, then post send.
+ *
+ * Receive buffer is donated to hardware, reclaimed upon recv completion.
+ */
+int
+rpcrdma_ep_post(struct rpcrdma_ia *ia,
+ struct rpcrdma_ep *ep,
+ struct rpcrdma_req *req)
+{
+ struct ib_send_wr send_wr, *send_wr_fail;
+ struct rpcrdma_rep *rep = req->rl_reply;
+ int rc;
+
+ if (rep) {
+ rc = rpcrdma_ep_post_recv(ia, ep, rep);
+ if (rc)
+ goto out;
+ req->rl_reply = NULL;
+ }
+
+ send_wr.next = NULL;
+ send_wr.wr_id = 0ULL; /* no send cookie */
+ send_wr.sg_list = req->rl_send_iov;
+ send_wr.num_sge = req->rl_niovs;
+ send_wr.opcode = IB_WR_SEND;
+ send_wr.imm_data = 0;
+ if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
+ ib_dma_sync_single_for_device(ia->ri_id->device,
+ req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
+ DMA_TO_DEVICE);
+ ib_dma_sync_single_for_device(ia->ri_id->device,
+ req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
+ DMA_TO_DEVICE);
+ ib_dma_sync_single_for_device(ia->ri_id->device,
+ req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
+ DMA_TO_DEVICE);
+
+ if (DECR_CQCOUNT(ep) > 0)
+ send_wr.send_flags = 0;
+ else { /* Provider must take a send completion every now and then */
+ INIT_CQCOUNT(ep);
+ send_wr.send_flags = IB_SEND_SIGNALED;
+ }
+
+ rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
+ if (rc)
+ dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
+ rc);
+out:
+ return rc;
+}
+
+/*
+ * (Re)post a receive buffer.
+ */
+int
+rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
+ struct rpcrdma_ep *ep,
+ struct rpcrdma_rep *rep)
+{
+ struct ib_recv_wr recv_wr, *recv_wr_fail;
+ int rc;
+
+ recv_wr.next = NULL;
+ recv_wr.wr_id = (u64) (unsigned long) rep;
+ recv_wr.sg_list = &rep->rr_iov;
+ recv_wr.num_sge = 1;
+
+ ib_dma_sync_single_for_cpu(ia->ri_id->device,
+ rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
+
+ DECR_CQCOUNT(ep);
+ rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
+
+ if (rc)
+ dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
+ rc);
+ return rc;
+}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
new file mode 100644
index 000000000000..2427822f8bd4
--- /dev/null
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -0,0 +1,330 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _LINUX_SUNRPC_XPRT_RDMA_H
+#define _LINUX_SUNRPC_XPRT_RDMA_H
+
+#include <linux/wait.h> /* wait_queue_head_t, etc */
+#include <linux/spinlock.h> /* spinlock_t, etc */
+#include <asm/atomic.h> /* atomic_t, etc */
+
+#include <rdma/rdma_cm.h> /* RDMA connection api */
+#include <rdma/ib_verbs.h> /* RDMA verbs api */
+
+#include <linux/sunrpc/clnt.h> /* rpc_xprt */
+#include <linux/sunrpc/rpc_rdma.h> /* RPC/RDMA protocol */
+#include <linux/sunrpc/xprtrdma.h> /* xprt parameters */
+
+/*
+ * Interface Adapter -- one per transport instance
+ */
+struct rpcrdma_ia {
+ struct rdma_cm_id *ri_id;
+ struct ib_pd *ri_pd;
+ struct ib_mr *ri_bind_mem;
+ struct completion ri_done;
+ int ri_async_rc;
+ enum rpcrdma_memreg ri_memreg_strategy;
+};
+
+/*
+ * RDMA Endpoint -- one per transport instance
+ */
+
+struct rpcrdma_ep {
+ atomic_t rep_cqcount;
+ int rep_cqinit;
+ int rep_connected;
+ struct rpcrdma_ia *rep_ia;
+ struct ib_cq *rep_cq;
+ struct ib_qp_init_attr rep_attr;
+ wait_queue_head_t rep_connect_wait;
+ struct ib_sge rep_pad; /* holds zeroed pad */
+ struct ib_mr *rep_pad_mr; /* holds zeroed pad */
+ void (*rep_func)(struct rpcrdma_ep *);
+ struct rpc_xprt *rep_xprt; /* for rep_func */
+ struct rdma_conn_param rep_remote_cma;
+ struct sockaddr_storage rep_remote_addr;
+};
+
+#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
+#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
+
+/*
+ * struct rpcrdma_rep -- this structure encapsulates state required to recv
+ * and complete a reply, asychronously. It needs several pieces of
+ * state:
+ * o recv buffer (posted to provider)
+ * o ib_sge (also donated to provider)
+ * o status of reply (length, success or not)
+ * o bookkeeping state to get run by tasklet (list, etc)
+ *
+ * These are allocated during initialization, per-transport instance;
+ * however, the tasklet execution list itself is global, as it should
+ * always be pretty short.
+ *
+ * N of these are associated with a transport instance, and stored in
+ * struct rpcrdma_buffer. N is the max number of outstanding requests.
+ */
+
+/* temporary static scatter/gather max */
+#define RPCRDMA_MAX_DATA_SEGS (8) /* max scatter/gather */
+#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */
+#define MAX_RPCRDMAHDR (\
+ /* max supported RPC/RDMA header */ \
+ sizeof(struct rpcrdma_msg) + (2 * sizeof(u32)) + \
+ (sizeof(struct rpcrdma_read_chunk) * RPCRDMA_MAX_SEGS) + sizeof(u32))
+
+struct rpcrdma_buffer;
+
+struct rpcrdma_rep {
+ unsigned int rr_len; /* actual received reply length */
+ struct rpcrdma_buffer *rr_buffer; /* home base for this structure */
+ struct rpc_xprt *rr_xprt; /* needed for request/reply matching */
+ void (*rr_func)(struct rpcrdma_rep *);/* called by tasklet in softint */
+ struct list_head rr_list; /* tasklet list */
+ wait_queue_head_t rr_unbind; /* optional unbind wait */
+ struct ib_sge rr_iov; /* for posting */
+ struct ib_mr *rr_handle; /* handle for mem in rr_iov */
+ char rr_base[MAX_RPCRDMAHDR]; /* minimal inline receive buffer */
+};
+
+/*
+ * struct rpcrdma_req -- structure central to the request/reply sequence.
+ *
+ * N of these are associated with a transport instance, and stored in
+ * struct rpcrdma_buffer. N is the max number of outstanding requests.
+ *
+ * It includes pre-registered buffer memory for send AND recv.
+ * The recv buffer, however, is not owned by this structure, and
+ * is "donated" to the hardware when a recv is posted. When a
+ * reply is handled, the recv buffer used is given back to the
+ * struct rpcrdma_req associated with the request.
+ *
+ * In addition to the basic memory, this structure includes an array
+ * of iovs for send operations. The reason is that the iovs passed to
+ * ib_post_{send,recv} must not be modified until the work request
+ * completes.
+ *
+ * NOTES:
+ * o RPCRDMA_MAX_SEGS is the max number of addressible chunk elements we
+ * marshal. The number needed varies depending on the iov lists that
+ * are passed to us, the memory registration mode we are in, and if
+ * physical addressing is used, the layout.
+ */
+
+struct rpcrdma_mr_seg { /* chunk descriptors */
+ union { /* chunk memory handles */
+ struct ib_mr *rl_mr; /* if registered directly */
+ struct rpcrdma_mw { /* if registered from region */
+ union {
+ struct ib_mw *mw;
+ struct ib_fmr *fmr;
+ } r;
+ struct list_head mw_list;
+ } *rl_mw;
+ } mr_chunk;
+ u64 mr_base; /* registration result */
+ u32 mr_rkey; /* registration result */
+ u32 mr_len; /* length of chunk or segment */
+ int mr_nsegs; /* number of segments in chunk or 0 */
+ enum dma_data_direction mr_dir; /* segment mapping direction */
+ dma_addr_t mr_dma; /* segment mapping address */
+ size_t mr_dmalen; /* segment mapping length */
+ struct page *mr_page; /* owning page, if any */
+ char *mr_offset; /* kva if no page, else offset */
+};
+
+struct rpcrdma_req {
+ size_t rl_size; /* actual length of buffer */
+ unsigned int rl_niovs; /* 0, 2 or 4 */
+ unsigned int rl_nchunks; /* non-zero if chunks */
+ struct rpcrdma_buffer *rl_buffer; /* home base for this structure */
+ struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
+ struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */
+ struct ib_sge rl_send_iov[4]; /* for active requests */
+ struct ib_sge rl_iov; /* for posting */
+ struct ib_mr *rl_handle; /* handle for mem in rl_iov */
+ char rl_base[MAX_RPCRDMAHDR]; /* start of actual buffer */
+ __u32 rl_xdr_buf[0]; /* start of returned rpc rq_buffer */
+};
+#define rpcr_to_rdmar(r) \
+ container_of((r)->rq_buffer, struct rpcrdma_req, rl_xdr_buf[0])
+
+/*
+ * struct rpcrdma_buffer -- holds list/queue of pre-registered memory for
+ * inline requests/replies, and client/server credits.
+ *
+ * One of these is associated with a transport instance
+ */
+struct rpcrdma_buffer {
+ spinlock_t rb_lock; /* protects indexes */
+ atomic_t rb_credits; /* most recent server credits */
+ unsigned long rb_cwndscale; /* cached framework rpc_cwndscale */
+ int rb_max_requests;/* client max requests */
+ struct list_head rb_mws; /* optional memory windows/fmrs */
+ int rb_send_index;
+ struct rpcrdma_req **rb_send_bufs;
+ int rb_recv_index;
+ struct rpcrdma_rep **rb_recv_bufs;
+ char *rb_pool;
+};
+#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
+
+/*
+ * Internal structure for transport instance creation. This
+ * exists primarily for modularity.
+ *
+ * This data should be set with mount options
+ */
+struct rpcrdma_create_data_internal {
+ struct sockaddr_storage addr; /* RDMA server address */
+ unsigned int max_requests; /* max requests (slots) in flight */
+ unsigned int rsize; /* mount rsize - max read hdr+data */
+ unsigned int wsize; /* mount wsize - max write hdr+data */
+ unsigned int inline_rsize; /* max non-rdma read data payload */
+ unsigned int inline_wsize; /* max non-rdma write data payload */
+ unsigned int padding; /* non-rdma write header padding */
+};
+
+#define RPCRDMA_INLINE_READ_THRESHOLD(rq) \
+ (rpcx_to_rdmad(rq->rq_task->tk_xprt).inline_rsize)
+
+#define RPCRDMA_INLINE_WRITE_THRESHOLD(rq)\
+ (rpcx_to_rdmad(rq->rq_task->tk_xprt).inline_wsize)
+
+#define RPCRDMA_INLINE_PAD_VALUE(rq)\
+ rpcx_to_rdmad(rq->rq_task->tk_xprt).padding
+
+/*
+ * Statistics for RPCRDMA
+ */
+struct rpcrdma_stats {
+ unsigned long read_chunk_count;
+ unsigned long write_chunk_count;
+ unsigned long reply_chunk_count;
+
+ unsigned long long total_rdma_request;
+ unsigned long long total_rdma_reply;
+
+ unsigned long long pullup_copy_count;
+ unsigned long long fixup_copy_count;
+ unsigned long hardway_register_count;
+ unsigned long failed_marshal_count;
+ unsigned long bad_reply_count;
+};
+
+/*
+ * RPCRDMA transport -- encapsulates the structures above for
+ * integration with RPC.
+ *
+ * The contained structures are embedded, not pointers,
+ * for convenience. This structure need not be visible externally.
+ *
+ * It is allocated and initialized during mount, and released
+ * during unmount.
+ */
+struct rpcrdma_xprt {
+ struct rpc_xprt xprt;
+ struct rpcrdma_ia rx_ia;
+ struct rpcrdma_ep rx_ep;
+ struct rpcrdma_buffer rx_buf;
+ struct rpcrdma_create_data_internal rx_data;
+ struct delayed_work rdma_connect;
+ struct rpcrdma_stats rx_stats;
+};
+
+#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, xprt)
+#define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data)
+
+/*
+ * Interface Adapter calls - xprtrdma/verbs.c
+ */
+int rpcrdma_ia_open(struct rpcrdma_xprt *, struct sockaddr *, int);
+void rpcrdma_ia_close(struct rpcrdma_ia *);
+
+/*
+ * Endpoint calls - xprtrdma/verbs.c
+ */
+int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,
+ struct rpcrdma_create_data_internal *);
+int rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);
+int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
+int rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
+
+int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
+ struct rpcrdma_req *);
+int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
+ struct rpcrdma_rep *);
+
+/*
+ * Buffer calls - xprtrdma/verbs.c
+ */
+int rpcrdma_buffer_create(struct rpcrdma_buffer *, struct rpcrdma_ep *,
+ struct rpcrdma_ia *,
+ struct rpcrdma_create_data_internal *);
+void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
+
+struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
+void rpcrdma_buffer_put(struct rpcrdma_req *);
+void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
+void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
+
+int rpcrdma_register_internal(struct rpcrdma_ia *, void *, int,
+ struct ib_mr **, struct ib_sge *);
+int rpcrdma_deregister_internal(struct rpcrdma_ia *,
+ struct ib_mr *, struct ib_sge *);
+
+int rpcrdma_register_external(struct rpcrdma_mr_seg *,
+ int, int, struct rpcrdma_xprt *);
+int rpcrdma_deregister_external(struct rpcrdma_mr_seg *,
+ struct rpcrdma_xprt *, void *);
+
+/*
+ * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c
+ */
+void rpcrdma_conn_func(struct rpcrdma_ep *);
+void rpcrdma_reply_handler(struct rpcrdma_rep *);
+
+/*
+ * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
+ */
+int rpcrdma_marshal_req(struct rpc_rqst *);
+
+#endif /* _LINUX_SUNRPC_XPRT_RDMA_H */