summaryrefslogtreecommitdiffstats
path: root/net/rds
diff options
context:
space:
mode:
authorSowmini Varadhan <sowmini.varadhan@oracle.com>2018-02-27 18:52:43 +0100
committerDavid S. Miller <davem@davemloft.net>2018-02-27 20:19:11 +0100
commit401910db4cd425899832a093539222b6174f92a2 (patch)
tree7eb70bb22f23c1b7edb5245e17ad0f8c81bfc333 /net/rds
parentselftests/net: revert the zerocopy Rx path for PF_RDS (diff)
downloadlinux-401910db4cd425899832a093539222b6174f92a2.tar.xz
linux-401910db4cd425899832a093539222b6174f92a2.zip
rds: deliver zerocopy completion notification with data
This commit is an optimization over commit 01883eda72bd ("rds: support for zcopy completion notification") for PF_RDS sockets. RDS applications are predominantly request-response transactions, so it is more efficient to reduce the number of system calls and have zerocopy completion notification delivered as ancillary data on the POLLIN channel. Cookies are passed up as ancillary data (at level SOL_RDS) in a struct rds_zcopy_cookies when the returned value of recvmsg() is greater than, or equal to, 0. A max of RDS_MAX_ZCOOKIES may be passed with each message. This commit removes support for zerocopy completion notification on MSG_ERRQUEUE for PF_RDS sockets. Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com> Acked-by: Willem de Bruijn <willemb@google.com> Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/rds')
-rw-r--r--net/rds/af_rds.c7
-rw-r--r--net/rds/message.c38
-rw-r--r--net/rds/rds.h2
-rw-r--r--net/rds/recv.c31
4 files changed, 53 insertions, 25 deletions
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index a937f18896ae..f7126108a811 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -77,6 +77,7 @@ static int rds_release(struct socket *sock)
rds_send_drop_to(rs, NULL);
rds_rdma_drop_keys(rs);
rds_notify_queue_get(rs, NULL);
+ __skb_queue_purge(&rs->rs_zcookie_queue);
spin_lock_bh(&rds_sock_lock);
list_del_init(&rs->rs_item);
@@ -144,7 +145,7 @@ static int rds_getname(struct socket *sock, struct sockaddr *uaddr,
* - to signal that a previously congested destination may have become
* uncongested
* - A notification has been queued to the socket (this can be a congestion
- * update, or a RDMA completion).
+ * update, or a RDMA completion, or a MSG_ZEROCOPY completion).
*
* EPOLLOUT is asserted if there is room on the send queue. This does not mean
* however, that the next sendmsg() call will succeed. If the application tries
@@ -178,7 +179,8 @@ static __poll_t rds_poll(struct file *file, struct socket *sock,
spin_unlock(&rs->rs_lock);
}
if (!list_empty(&rs->rs_recv_queue) ||
- !list_empty(&rs->rs_notify_queue))
+ !list_empty(&rs->rs_notify_queue) ||
+ !skb_queue_empty(&rs->rs_zcookie_queue))
mask |= (EPOLLIN | EPOLLRDNORM);
if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
mask |= (EPOLLOUT | EPOLLWRNORM);
@@ -513,6 +515,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
INIT_LIST_HEAD(&rs->rs_recv_queue);
INIT_LIST_HEAD(&rs->rs_notify_queue);
INIT_LIST_HEAD(&rs->rs_cong_list);
+ skb_queue_head_init(&rs->rs_zcookie_queue);
spin_lock_init(&rs->rs_rdma_lock);
rs->rs_rdma_keys = RB_ROOT;
rs->rs_rx_traces = 0;
diff --git a/net/rds/message.c b/net/rds/message.c
index 651834513481..116cf87ccb89 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -58,32 +58,26 @@ EXPORT_SYMBOL_GPL(rds_message_addref);
static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
{
- struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
- int ncookies;
- u32 *ptr;
+ struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb;
+ int ncookies = ck->num;
- if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE)
+ if (ncookies == RDS_MAX_ZCOOKIES)
return false;
- ncookies = serr->ee.ee_data;
- if (ncookies == SO_EE_ORIGIN_MAX_ZCOOKIES)
- return false;
- ptr = skb_put(skb, sizeof(u32));
- *ptr = cookie;
- serr->ee.ee_data = ++ncookies;
+ ck->cookies[ncookies] = cookie;
+ ck->num = ++ncookies;
return true;
}
static void rds_rm_zerocopy_callback(struct rds_sock *rs,
struct rds_znotifier *znotif)
{
- struct sock *sk = rds_rs_to_sk(rs);
struct sk_buff *skb, *tail;
- struct sock_exterr_skb *serr;
unsigned long flags;
struct sk_buff_head *q;
u32 cookie = znotif->z_cookie;
+ struct rds_zcopy_cookies *ck;
- q = &sk->sk_error_queue;
+ q = &rs->rs_zcookie_queue;
spin_lock_irqsave(&q->lock, flags);
tail = skb_peek_tail(q);
@@ -91,22 +85,19 @@ static void rds_rm_zerocopy_callback(struct rds_sock *rs,
spin_unlock_irqrestore(&q->lock, flags);
mm_unaccount_pinned_pages(&znotif->z_mmp);
consume_skb(rds_skb_from_znotifier(znotif));
- sk->sk_error_report(sk);
+ /* caller invokes rds_wake_sk_sleep() */
return;
}
skb = rds_skb_from_znotifier(znotif);
- serr = SKB_EXT_ERR(skb);
- memset(&serr->ee, 0, sizeof(serr->ee));
- serr->ee.ee_errno = 0;
- serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
- serr->ee.ee_info = 0;
+ ck = (struct rds_zcopy_cookies *)skb->cb;
+ memset(ck, 0, sizeof(*ck));
WARN_ON(!skb_zcookie_add(skb, cookie));
__skb_queue_tail(q, skb);
spin_unlock_irqrestore(&q->lock, flags);
- sk->sk_error_report(sk);
+ /* caller invokes rds_wake_sk_sleep() */
mm_unaccount_pinned_pages(&znotif->z_mmp);
}
@@ -129,6 +120,7 @@ static void rds_message_purge(struct rds_message *rm)
if (rm->data.op_mmp_znotifier) {
zcopy = true;
rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
+ rds_wake_sk_sleep(rs);
rm->data.op_mmp_znotifier = NULL;
}
sock_put(rds_rs_to_sk(rs));
@@ -362,10 +354,12 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
int total_copied = 0;
struct sk_buff *skb;
- skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
- GFP_KERNEL);
+ skb = alloc_skb(0, GFP_KERNEL);
if (!skb)
return -ENOMEM;
+ BUILD_BUG_ON(sizeof(skb->cb) <
+ max_t(int, sizeof(struct rds_znotifier),
+ sizeof(struct rds_zcopy_cookies)));
rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
length)) {
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 31cd38852050..33b16353d8f3 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -603,6 +603,8 @@ struct rds_sock {
/* Socket receive path trace points*/
u8 rs_rx_traces;
u8 rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
+
+ struct sk_buff_head rs_zcookie_queue;
};
static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
diff --git a/net/rds/recv.c b/net/rds/recv.c
index b080961464df..d50747725221 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -577,6 +577,32 @@ out:
return ret;
}
+static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
+{
+ struct sk_buff *skb;
+ struct sk_buff_head *q = &rs->rs_zcookie_queue;
+ struct rds_zcopy_cookies *done;
+
+ if (!msg->msg_control)
+ return false;
+
+ if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) ||
+ msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
+ return false;
+
+ skb = skb_dequeue(q);
+ if (!skb)
+ return false;
+ done = (struct rds_zcopy_cookies *)skb->cb;
+ if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
+ done)) {
+ skb_queue_head(q, skb);
+ return false;
+ }
+ consume_skb(skb);
+ return true;
+}
+
int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
int msg_flags)
{
@@ -611,7 +637,9 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
if (!rds_next_incoming(rs, &inc)) {
if (nonblock) {
- ret = -EAGAIN;
+ bool reaped = rds_recvmsg_zcookie(rs, msg);
+
+ ret = reaped ? 0 : -EAGAIN;
break;
}
@@ -660,6 +688,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
ret = -EFAULT;
goto out;
}
+ rds_recvmsg_zcookie(rs, msg);
rds_stats_inc(s_recv_delivered);