diff options
Diffstat (limited to 'fs')
-rw-r--r-- | fs/aio.c | 106 |
1 files changed, 99 insertions, 7 deletions
@@ -26,6 +26,7 @@ #include <linux/mm.h> #include <linux/mman.h> #include <linux/mmu_context.h> +#include <linux/percpu.h> #include <linux/slab.h> #include <linux/timer.h> #include <linux/aio.h> @@ -64,6 +65,10 @@ struct aio_ring { #define AIO_RING_PAGES 8 +struct kioctx_cpu { + unsigned reqs_available; +}; + struct kioctx { atomic_t users; atomic_t dead; @@ -72,6 +77,13 @@ struct kioctx { unsigned long user_id; struct hlist_node list; + struct __percpu kioctx_cpu *cpu; + + /* + * For percpu reqs_available, number of slots we move to/from global + * counter at a time: + */ + unsigned req_batch; /* * This is what userspace passed to io_setup(), it's not used for * anything but counting against the global max_reqs quota. @@ -99,6 +111,8 @@ struct kioctx { * so we avoid overflowing it: it's decremented (if positive) * when allocating a kiocb and incremented when the resulting * io_event is pulled off the ringbuffer. + * + * We batch accesses to it with a percpu version. */ atomic_t reqs_available; } ____cacheline_aligned_in_smp; @@ -379,6 +393,8 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb, static void free_ioctx_rcu(struct rcu_head *head) { struct kioctx *ctx = container_of(head, struct kioctx, rcu_head); + + free_percpu(ctx->cpu); kmem_cache_free(kioctx_cachep, ctx); } @@ -392,7 +408,7 @@ static void free_ioctx(struct kioctx *ctx) struct aio_ring *ring; struct io_event res; struct kiocb *req; - unsigned head, avail; + unsigned cpu, head, avail; spin_lock_irq(&ctx->ctx_lock); @@ -406,6 +422,13 @@ static void free_ioctx(struct kioctx *ctx) spin_unlock_irq(&ctx->ctx_lock); + for_each_possible_cpu(cpu) { + struct kioctx_cpu *kcpu = per_cpu_ptr(ctx->cpu, cpu); + + atomic_add(kcpu->reqs_available, &ctx->reqs_available); + kcpu->reqs_available = 0; + } + ring = kmap_atomic(ctx->ring_pages[0]); head = ring->head; kunmap_atomic(ring); @@ -454,6 +477,18 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) struct kioctx *ctx; int err = -ENOMEM; + /* + * We keep track of the number of available ringbuffer slots, to prevent + * overflow (reqs_available), and we also use percpu counters for this. + * + * So since up to half the slots might be on other cpu's percpu counters + * and unavailable, double nr_events so userspace sees what they + * expected: additionally, we move req_batch slots to/from percpu + * counters at a time, so make sure that isn't 0: + */ + nr_events = max(nr_events, num_possible_cpus() * 4); + nr_events *= 2; + /* Prevent overflows */ if ((nr_events > (0x10000000U / sizeof(struct io_event))) || (nr_events > (0x10000000U / sizeof(struct kiocb)))) { @@ -479,10 +514,16 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) INIT_LIST_HEAD(&ctx->active_reqs); - if (aio_setup_ring(ctx) < 0) + ctx->cpu = alloc_percpu(struct kioctx_cpu); + if (!ctx->cpu) goto out_freectx; + if (aio_setup_ring(ctx) < 0) + goto out_freepcpu; + atomic_set(&ctx->reqs_available, ctx->nr_events - 1); + ctx->req_batch = (ctx->nr_events - 1) / (num_possible_cpus() * 4); + BUG_ON(!ctx->req_batch); /* limit the number of system wide aios */ spin_lock(&aio_nr_lock); @@ -506,6 +547,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) out_cleanup: err = -EAGAIN; aio_free_ring(ctx); +out_freepcpu: + free_percpu(ctx->cpu); out_freectx: if (ctx->aio_ring_file) fput(ctx->aio_ring_file); @@ -610,6 +653,52 @@ void exit_aio(struct mm_struct *mm) } } +static void put_reqs_available(struct kioctx *ctx, unsigned nr) +{ + struct kioctx_cpu *kcpu; + + preempt_disable(); + kcpu = this_cpu_ptr(ctx->cpu); + + kcpu->reqs_available += nr; + while (kcpu->reqs_available >= ctx->req_batch * 2) { + kcpu->reqs_available -= ctx->req_batch; + atomic_add(ctx->req_batch, &ctx->reqs_available); + } + + preempt_enable(); +} + +static bool get_reqs_available(struct kioctx *ctx) +{ + struct kioctx_cpu *kcpu; + bool ret = false; + + preempt_disable(); + kcpu = this_cpu_ptr(ctx->cpu); + + if (!kcpu->reqs_available) { + int old, avail = atomic_read(&ctx->reqs_available); + + do { + if (avail < ctx->req_batch) + goto out; + + old = avail; + avail = atomic_cmpxchg(&ctx->reqs_available, + avail, avail - ctx->req_batch); + } while (avail != old); + + kcpu->reqs_available += ctx->req_batch; + } + + ret = true; + kcpu->reqs_available--; +out: + preempt_enable(); + return ret; +} + /* aio_get_req * Allocate a slot for an aio request. Increments the ki_users count * of the kioctx so that the kioctx stays around until all requests are @@ -624,7 +713,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx) { struct kiocb *req; - if (atomic_dec_if_positive(&ctx->reqs_available) <= 0) + if (!get_reqs_available(ctx)) return NULL; req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO); @@ -633,10 +722,9 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx) atomic_set(&req->ki_users, 2); req->ki_ctx = ctx; - return req; out_put: - atomic_inc(&ctx->reqs_available); + put_reqs_available(ctx, 1); return NULL; } @@ -725,6 +813,10 @@ void aio_complete(struct kiocb *iocb, long res, long res2) */ if (unlikely(xchg(&iocb->ki_cancel, KIOCB_CANCELLED) == KIOCB_CANCELLED)) { + /* + * Can't use the percpu reqs_available here - could race with + * free_ioctx() + */ atomic_inc(&ctx->reqs_available); /* Still need the wake_up in case free_ioctx is waiting */ goto put_rq; @@ -863,7 +955,7 @@ static long aio_read_events_ring(struct kioctx *ctx, pr_debug("%li h%u t%u\n", ret, head, ctx->tail); - atomic_add(ret, &ctx->reqs_available); + put_reqs_available(ctx, ret); out: mutex_unlock(&ctx->ring_lock); @@ -1247,7 +1339,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, aio_put_req(req); /* drop extra ref to req */ return 0; out_put_req: - atomic_inc(&ctx->reqs_available); + put_reqs_available(ctx, 1); aio_put_req(req); /* drop extra ref to req */ aio_put_req(req); /* drop i/o ref to req */ return ret; |