summaryrefslogtreecommitdiffstats
path: root/kernel/relay.c
diff options
context:
space:
mode:
authorTom Zanussi <zanussi@us.ibm.com>2007-06-04 09:12:05 +0200
committerJens Axboe <jens.axboe@oracle.com>2007-07-10 08:04:14 +0200
commitebf9909343392c929d9943c04f421cd42e03b530 (patch)
treefc484001e9af14cf5ca3b868b89de9c4f50719ad /kernel/relay.c
parentsendfile: convert nfsd to splice_direct_to_actor() (diff)
downloadlinux-ebf9909343392c929d9943c04f421cd42e03b530.tar.xz
linux-ebf9909343392c929d9943c04f421cd42e03b530.zip
splice: relay support
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
Diffstat (limited to 'kernel/relay.c')
-rw-r--r--kernel/relay.c241
1 files changed, 191 insertions, 50 deletions
diff --git a/kernel/relay.c b/kernel/relay.c
index 95db8c79fe8f..d1d1920f2800 100644
--- a/kernel/relay.c
+++ b/kernel/relay.c
@@ -21,6 +21,7 @@
#include <linux/vmalloc.h>
#include <linux/mm.h>
#include <linux/cpu.h>
+#include <linux/pipe_fs_i.h>
/* list of open channels, for cpu hotplug */
static DEFINE_MUTEX(relay_channels_mutex);
@@ -121,6 +122,7 @@ static void *relay_alloc_buf(struct rchan_buf *buf, size_t *size)
buf->page_array[i] = alloc_page(GFP_KERNEL);
if (unlikely(!buf->page_array[i]))
goto depopulate;
+ set_page_private(buf->page_array[i], (unsigned long)buf);
}
mem = vmap(buf->page_array, n_pages, VM_MAP, PAGE_KERNEL);
if (!mem)
@@ -970,43 +972,6 @@ static int subbuf_read_actor(size_t read_start,
return ret;
}
-/*
- * subbuf_send_actor - send up to one subbuf's worth of data
- */
-static int subbuf_send_actor(size_t read_start,
- struct rchan_buf *buf,
- size_t avail,
- read_descriptor_t *desc,
- read_actor_t actor)
-{
- unsigned long pidx, poff;
- unsigned int subbuf_pages;
- int ret = 0;
-
- subbuf_pages = buf->chan->alloc_size >> PAGE_SHIFT;
- pidx = (read_start / PAGE_SIZE) % subbuf_pages;
- poff = read_start & ~PAGE_MASK;
- while (avail) {
- struct page *p = buf->page_array[pidx];
- unsigned int len;
-
- len = PAGE_SIZE - poff;
- if (len > avail)
- len = avail;
-
- len = actor(desc, p, poff, len);
- if (desc->error)
- break;
-
- avail -= len;
- ret += len;
- poff = 0;
- pidx = (pidx + 1) % subbuf_pages;
- }
-
- return ret;
-}
-
typedef int (*subbuf_actor_t) (size_t read_start,
struct rchan_buf *buf,
size_t avail,
@@ -1067,19 +1032,195 @@ static ssize_t relay_file_read(struct file *filp,
NULL, &desc);
}
-static ssize_t relay_file_sendfile(struct file *filp,
- loff_t *ppos,
- size_t count,
- read_actor_t actor,
- void *target)
+static void relay_pipe_buf_release(struct pipe_inode_info *pipe,
+ struct pipe_buffer *buf)
{
- read_descriptor_t desc;
- desc.written = 0;
- desc.count = count;
- desc.arg.data = target;
- desc.error = 0;
- return relay_file_read_subbufs(filp, ppos, subbuf_send_actor,
- actor, &desc);
+ struct rchan_buf *rbuf;
+
+ rbuf = (struct rchan_buf *)page_private(buf->page);
+
+ rbuf->bytes_consumed += PAGE_SIZE;
+
+ if (rbuf->bytes_consumed == rbuf->chan->subbuf_size) {
+ relay_subbufs_consumed(rbuf->chan, rbuf->cpu, 1);
+ rbuf->bytes_consumed = 0;
+ }
+}
+
+static struct pipe_buf_operations relay_pipe_buf_ops = {
+ .can_merge = 0,
+ .map = generic_pipe_buf_map,
+ .unmap = generic_pipe_buf_unmap,
+ .pin = generic_pipe_buf_pin,
+ .release = relay_pipe_buf_release,
+ .steal = generic_pipe_buf_steal,
+ .get = generic_pipe_buf_get,
+};
+
+/**
+ * subbuf_splice_actor - splice up to one subbuf's worth of data
+ */
+static int subbuf_splice_actor(struct file *in,
+ loff_t *ppos,
+ struct pipe_inode_info *pipe,
+ size_t len,
+ unsigned int flags,
+ int *nonpad_ret)
+{
+ unsigned int pidx, poff;
+ unsigned int subbuf_pages;
+ int ret = 0;
+ int do_wakeup = 0;
+ struct rchan_buf *rbuf = in->private_data;
+ unsigned int subbuf_size = rbuf->chan->subbuf_size;
+ size_t read_start = ((size_t)*ppos) % rbuf->chan->alloc_size;
+ size_t avail = subbuf_size - read_start % subbuf_size;
+ size_t read_subbuf = read_start / subbuf_size;
+ size_t padding = rbuf->padding[read_subbuf];
+ size_t nonpad_end = read_subbuf * subbuf_size + subbuf_size - padding;
+
+ if (rbuf->subbufs_produced == rbuf->subbufs_consumed)
+ return 0;
+
+ if (len > avail)
+ len = avail;
+
+ if (pipe->inode)
+ mutex_lock(&pipe->inode->i_mutex);
+
+ subbuf_pages = rbuf->chan->alloc_size >> PAGE_SHIFT;
+ pidx = (read_start / PAGE_SIZE) % subbuf_pages;
+ poff = read_start & ~PAGE_MASK;
+
+ for (;;) {
+ unsigned int this_len;
+ unsigned int this_end;
+ int newbuf = (pipe->curbuf + pipe->nrbufs) & (PIPE_BUFFERS - 1);
+ struct pipe_buffer *buf = pipe->bufs + newbuf;
+
+ if (!pipe->readers) {
+ send_sig(SIGPIPE, current, 0);
+ if (!ret)
+ ret = -EPIPE;
+ break;
+ }
+
+ if (pipe->nrbufs < PIPE_BUFFERS) {
+ this_len = PAGE_SIZE - poff;
+ if (this_len > avail)
+ this_len = avail;
+
+ buf->page = rbuf->page_array[pidx];
+ buf->offset = poff;
+ this_end = read_start + ret + this_len;
+ if (this_end > nonpad_end) {
+ if (read_start + ret >= nonpad_end)
+ buf->len = 0;
+ else
+ buf->len = nonpad_end - (read_start + ret);
+ } else
+ buf->len = this_len;
+
+ *nonpad_ret += buf->len;
+
+ buf->ops = &relay_pipe_buf_ops;
+ pipe->nrbufs++;
+
+ avail -= this_len;
+ ret += this_len;
+ poff = 0;
+ pidx = (pidx + 1) % subbuf_pages;
+
+ if (pipe->inode)
+ do_wakeup = 1;
+
+ if (!avail)
+ break;
+
+ if (pipe->nrbufs < PIPE_BUFFERS)
+ continue;
+
+ break;
+ }
+
+ if (flags & SPLICE_F_NONBLOCK) {
+ if (!ret)
+ ret = -EAGAIN;
+ break;
+ }
+
+ if (signal_pending(current)) {
+ if (!ret)
+ ret = -ERESTARTSYS;
+ break;
+ }
+
+ if (do_wakeup) {
+ smp_mb();
+ if (waitqueue_active(&pipe->wait))
+ wake_up_interruptible_sync(&pipe->wait);
+ kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
+ do_wakeup = 0;
+ }
+
+ pipe->waiting_writers++;
+ pipe_wait(pipe);
+ pipe->waiting_writers--;
+ }
+
+ if (pipe->inode)
+ mutex_unlock(&pipe->inode->i_mutex);
+
+ if (do_wakeup) {
+ smp_mb();
+ if (waitqueue_active(&pipe->wait))
+ wake_up_interruptible(&pipe->wait);
+ kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
+ }
+
+ return ret;
+}
+
+static ssize_t relay_file_splice_read(struct file *in,
+ loff_t *ppos,
+ struct pipe_inode_info *pipe,
+ size_t len,
+ unsigned int flags)
+{
+ ssize_t spliced;
+ int ret;
+ int nonpad_ret = 0;
+
+ ret = 0;
+ spliced = 0;
+
+ while (len) {
+ ret = subbuf_splice_actor(in, ppos, pipe, len, flags, &nonpad_ret);
+ if (ret < 0)
+ break;
+ else if (!ret) {
+ break;
+ if (spliced)
+ break;
+ if (flags & SPLICE_F_NONBLOCK) {
+ ret = -EAGAIN;
+ break;
+ }
+ }
+
+ *ppos += ret;
+ if (ret > len)
+ len = 0;
+ else
+ len -= ret;
+ spliced += nonpad_ret;
+ nonpad_ret = 0;
+ }
+
+ if (spliced)
+ return spliced;
+
+ return ret;
}
const struct file_operations relay_file_operations = {
@@ -1089,7 +1230,7 @@ const struct file_operations relay_file_operations = {
.read = relay_file_read,
.llseek = no_llseek,
.release = relay_file_release,
- .sendfile = relay_file_sendfile,
+ .splice_read = relay_file_splice_read,
};
EXPORT_SYMBOL_GPL(relay_file_operations);