summaryrefslogtreecommitdiffstats
path: root/net/rds/message.c
diff options
context:
space:
mode:
authorSowmini Varadhan <sowmini.varadhan@oracle.com>2018-02-15 19:49:35 +0100
committerDavid S. Miller <davem@davemloft.net>2018-02-16 22:04:17 +0100
commit01883eda72bd3f0a6c81447e4f223de14033fd9d (patch)
tree1d70322d05664e531520ae30d47ee5ef7267c644 /net/rds/message.c
parentsock: permit SO_ZEROCOPY on PF_RDS socket (diff)
downloadlinux-01883eda72bd3f0a6c81447e4f223de14033fd9d.tar.xz
linux-01883eda72bd3f0a6c81447e4f223de14033fd9d.zip
rds: support for zcopy completion notification
RDS removes a datagram (rds_message) from the retransmit queue when an ACK is received. The ACK indicates that the receiver has queued the RDS datagram, so that the sender can safely forget the datagram. When all references to the rds_message are quiesced, rds_message_purge is called to release resources used by the rds_message If the datagram to be removed had pinned pages set up, add an entry to the rs->rs_znotify_queue so that the notifcation will be sent up via rds_rm_zerocopy_callback() when the rds_message is eventually freed by rds_message_purge. rds_rm_zerocopy_callback() attempts to batch the number of cookies sent with each notification to a max of SO_EE_ORIGIN_MAX_ZCOOKIES. This is achieved by checking the tail skb in the sk_error_queue: if this has room for one more cookie, the cookie from the current notification is added; else a new skb is added to the sk_error_queue. Every invocation of rds_rm_zerocopy_callback() will trigger a ->sk_error_report to notify the application. Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com> Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com> Acked-by: Willem de Bruijn <willemb@google.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/rds/message.c')
-rw-r--r--net/rds/message.c83
1 files changed, 76 insertions, 7 deletions
diff --git a/net/rds/message.c b/net/rds/message.c
index ef3daafa3d79..bf1a656b198a 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -33,6 +33,9 @@
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/export.h>
+#include <linux/skbuff.h>
+#include <linux/list.h>
+#include <linux/errqueue.h>
#include "rds.h"
@@ -53,29 +56,95 @@ void rds_message_addref(struct rds_message *rm)
}
EXPORT_SYMBOL_GPL(rds_message_addref);
+static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
+{
+ struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
+ int ncookies;
+ u32 *ptr;
+
+ if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE)
+ return false;
+ ncookies = serr->ee.ee_data;
+ if (ncookies == SO_EE_ORIGIN_MAX_ZCOOKIES)
+ return false;
+ ptr = skb_put(skb, sizeof(u32));
+ *ptr = cookie;
+ serr->ee.ee_data = ++ncookies;
+ return true;
+}
+
+static void rds_rm_zerocopy_callback(struct rds_sock *rs,
+ struct rds_znotifier *znotif)
+{
+ struct sock *sk = rds_rs_to_sk(rs);
+ struct sk_buff *skb, *tail;
+ struct sock_exterr_skb *serr;
+ unsigned long flags;
+ struct sk_buff_head *q;
+ u32 cookie = znotif->z_cookie;
+
+ q = &sk->sk_error_queue;
+ spin_lock_irqsave(&q->lock, flags);
+ tail = skb_peek_tail(q);
+
+ if (tail && skb_zcookie_add(tail, cookie)) {
+ spin_unlock_irqrestore(&q->lock, flags);
+ mm_unaccount_pinned_pages(&znotif->z_mmp);
+ consume_skb(rds_skb_from_znotifier(znotif));
+ sk->sk_error_report(sk);
+ return;
+ }
+
+ skb = rds_skb_from_znotifier(znotif);
+ serr = SKB_EXT_ERR(skb);
+ memset(&serr->ee, 0, sizeof(serr->ee));
+ serr->ee.ee_errno = 0;
+ serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
+ serr->ee.ee_info = 0;
+ WARN_ON(!skb_zcookie_add(skb, cookie));
+
+ __skb_queue_tail(q, skb);
+
+ spin_unlock_irqrestore(&q->lock, flags);
+ sk->sk_error_report(sk);
+
+ mm_unaccount_pinned_pages(&znotif->z_mmp);
+}
+
/*
* This relies on dma_map_sg() not touching sg[].page during merging.
*/
static void rds_message_purge(struct rds_message *rm)
{
unsigned long i, flags;
+ bool zcopy = false;
if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
return;
- for (i = 0; i < rm->data.op_nents; i++) {
- rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
- /* XXX will have to put_page for page refs */
- __free_page(sg_page(&rm->data.op_sg[i]));
- }
- rm->data.op_nents = 0;
spin_lock_irqsave(&rm->m_rs_lock, flags);
if (rm->m_rs) {
- sock_put(rds_rs_to_sk(rm->m_rs));
+ struct rds_sock *rs = rm->m_rs;
+
+ if (rm->data.op_mmp_znotifier) {
+ zcopy = true;
+ rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
+ rm->data.op_mmp_znotifier = NULL;
+ }
+ sock_put(rds_rs_to_sk(rs));
rm->m_rs = NULL;
}
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
+ for (i = 0; i < rm->data.op_nents; i++) {
+ /* XXX will have to put_page for page refs */
+ if (!zcopy)
+ __free_page(sg_page(&rm->data.op_sg[i]));
+ else
+ put_page(sg_page(&rm->data.op_sg[i]));
+ }
+ rm->data.op_nents = 0;
+
if (rm->rdma.op_active)
rds_rdma_free_op(&rm->rdma);
if (rm->rdma.op_rdma_mr)