diff options
Diffstat (limited to 'fs/io_uring.c')
-rw-r--r-- | fs/io_uring.c | 2197 |
1 files changed, 1503 insertions, 694 deletions
diff --git a/fs/io_uring.c b/fs/io_uring.c index 2c819c3c855d..4c030a92de79 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -56,7 +56,6 @@ #include <linux/mmu_context.h> #include <linux/percpu.h> #include <linux/slab.h> -#include <linux/workqueue.h> #include <linux/kthread.h> #include <linux/blkdev.h> #include <linux/bvec.h> @@ -71,12 +70,24 @@ #include <linux/sizes.h> #include <linux/hugetlb.h> +#define CREATE_TRACE_POINTS +#include <trace/events/io_uring.h> + #include <uapi/linux/io_uring.h> #include "internal.h" +#include "io-wq.h" #define IORING_MAX_ENTRIES 32768 -#define IORING_MAX_FIXED_FILES 1024 +#define IORING_MAX_CQ_ENTRIES (2 * IORING_MAX_ENTRIES) + +/* + * Shift of 9 is 512 entries, or exactly one page on 64-bit archs + */ +#define IORING_FILE_TABLE_SHIFT 9 +#define IORING_MAX_FILES_TABLE (1U << IORING_FILE_TABLE_SHIFT) +#define IORING_FILE_TABLE_MASK (IORING_MAX_FILES_TABLE - 1) +#define IORING_MAX_FIXED_FILES (64 * IORING_MAX_FILES_TABLE) struct io_uring { u32 head ____cacheline_aligned_in_smp; @@ -161,14 +172,8 @@ struct io_mapped_ubuf { unsigned int nr_bvecs; }; -struct async_list { - spinlock_t lock; - atomic_t cnt; - struct list_head list; - - struct file *file; - off_t io_start; - size_t io_len; +struct fixed_file_table { + struct file **files; }; struct io_ring_ctx { @@ -180,6 +185,7 @@ struct io_ring_ctx { unsigned int flags; bool compat; bool account_mem; + bool cq_overflow_flushed; /* * Ring buffer of indices into array of io_uring_sqe, which is @@ -198,38 +204,30 @@ struct io_ring_ctx { unsigned sq_mask; unsigned sq_thread_idle; unsigned cached_sq_dropped; + atomic_t cached_cq_overflow; struct io_uring_sqe *sq_sqes; struct list_head defer_list; struct list_head timeout_list; + struct list_head cq_overflow_list; + + wait_queue_head_t inflight_wait; } ____cacheline_aligned_in_smp; + struct io_rings *rings; + /* IO offload */ - struct workqueue_struct *sqo_wq[2]; + struct io_wq *io_wq; struct task_struct *sqo_thread; /* if using sq thread polling */ struct mm_struct *sqo_mm; wait_queue_head_t sqo_wait; - struct completion sqo_thread_started; - - struct { - unsigned cached_cq_tail; - atomic_t cached_cq_overflow; - unsigned cq_entries; - unsigned cq_mask; - struct wait_queue_head cq_wait; - struct fasync_struct *cq_fasync; - struct eventfd_ctx *cq_ev_fd; - atomic_t cq_timeouts; - } ____cacheline_aligned_in_smp; - - struct io_rings *rings; /* * If used, fixed file set. Writers must ensure that ->refs is dead, * readers must ensure that ->refs is alive as long as the file* is * used. Only updated through io_uring_register(2). */ - struct file **user_files; + struct fixed_file_table *file_table; unsigned nr_user_files; /* if used, fixed mapped user buffers */ @@ -238,7 +236,25 @@ struct io_ring_ctx { struct user_struct *user; - struct completion ctx_done; + /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */ + struct completion *completions; + + /* if all else fails... */ + struct io_kiocb *fallback_req; + +#if defined(CONFIG_UNIX) + struct socket *ring_sock; +#endif + + struct { + unsigned cached_cq_tail; + unsigned cq_entries; + unsigned cq_mask; + atomic_t cq_timeouts; + struct wait_queue_head cq_wait; + struct fasync_struct *cq_fasync; + struct eventfd_ctx *cq_ev_fd; + } ____cacheline_aligned_in_smp; struct { struct mutex uring_lock; @@ -255,22 +271,20 @@ struct io_ring_ctx { * manipulate the list, hence no extra locking is needed there. */ struct list_head poll_list; - struct list_head cancel_list; - } ____cacheline_aligned_in_smp; + struct rb_root cancel_tree; - struct async_list pending_async[2]; - -#if defined(CONFIG_UNIX) - struct socket *ring_sock; -#endif + spinlock_t inflight_lock; + struct list_head inflight_list; + } ____cacheline_aligned_in_smp; }; struct sqe_submit { const struct io_uring_sqe *sqe; - unsigned short index; + struct file *ring_file; + int ring_fd; u32 sequence; bool has_user; - bool needs_lock; + bool in_async; bool needs_fixed_file; }; @@ -309,7 +323,10 @@ struct io_kiocb { struct sqe_submit submit; struct io_ring_ctx *ctx; - struct list_head list; + union { + struct list_head list; + struct rb_node rb_node; + }; struct list_head link_list; unsigned int flags; refcount_t refs; @@ -320,18 +337,22 @@ struct io_kiocb { #define REQ_F_IO_DRAIN 16 /* drain existing IO first */ #define REQ_F_IO_DRAINED 32 /* drain done */ #define REQ_F_LINK 64 /* linked sqes */ -#define REQ_F_LINK_DONE 128 /* linked sqes done */ +#define REQ_F_LINK_TIMEOUT 128 /* has linked timeout */ #define REQ_F_FAIL_LINK 256 /* fail rest of links */ #define REQ_F_SHADOW_DRAIN 512 /* link-drain shadow req */ #define REQ_F_TIMEOUT 1024 /* timeout request */ #define REQ_F_ISREG 2048 /* regular file */ #define REQ_F_MUST_PUNT 4096 /* must be punted even for NONBLOCK */ #define REQ_F_TIMEOUT_NOSEQ 8192 /* no timeout sequence */ +#define REQ_F_INFLIGHT 16384 /* on inflight list */ +#define REQ_F_COMP_LOCKED 32768 /* completion under lock */ u64 user_data; u32 result; u32 sequence; - struct work_struct work; + struct list_head inflight_entry; + + struct io_wq_work work; }; #define IO_PLUG_THRESHOLD 2 @@ -357,10 +378,11 @@ struct io_submit_state { unsigned int ios_left; }; -static void io_sq_wq_submit_work(struct work_struct *work); -static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, - long res); +static void io_wq_submit_work(struct io_wq_work **workptr); +static void io_cqring_fill_event(struct io_kiocb *req, long res); static void __io_free_req(struct io_kiocb *req); +static void io_put_req(struct io_kiocb *req); +static void io_double_put_req(struct io_kiocb *req); static struct kmem_cache *req_cachep; @@ -383,57 +405,67 @@ static void io_ring_ctx_ref_free(struct percpu_ref *ref) { struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs); - complete(&ctx->ctx_done); + complete(&ctx->completions[0]); } static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) { struct io_ring_ctx *ctx; - int i; ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); if (!ctx) return NULL; + ctx->fallback_req = kmem_cache_alloc(req_cachep, GFP_KERNEL); + if (!ctx->fallback_req) + goto err; + + ctx->completions = kmalloc(2 * sizeof(struct completion), GFP_KERNEL); + if (!ctx->completions) + goto err; + if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, - PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) { - kfree(ctx); - return NULL; - } + PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) + goto err; ctx->flags = p->flags; init_waitqueue_head(&ctx->cq_wait); - init_completion(&ctx->ctx_done); - init_completion(&ctx->sqo_thread_started); + INIT_LIST_HEAD(&ctx->cq_overflow_list); + init_completion(&ctx->completions[0]); + init_completion(&ctx->completions[1]); mutex_init(&ctx->uring_lock); init_waitqueue_head(&ctx->wait); - for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) { - spin_lock_init(&ctx->pending_async[i].lock); - INIT_LIST_HEAD(&ctx->pending_async[i].list); - atomic_set(&ctx->pending_async[i].cnt, 0); - } spin_lock_init(&ctx->completion_lock); INIT_LIST_HEAD(&ctx->poll_list); - INIT_LIST_HEAD(&ctx->cancel_list); + ctx->cancel_tree = RB_ROOT; INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->timeout_list); + init_waitqueue_head(&ctx->inflight_wait); + spin_lock_init(&ctx->inflight_lock); + INIT_LIST_HEAD(&ctx->inflight_list); return ctx; +err: + if (ctx->fallback_req) + kmem_cache_free(req_cachep, ctx->fallback_req); + kfree(ctx->completions); + kfree(ctx); + return NULL; } -static inline bool __io_sequence_defer(struct io_ring_ctx *ctx, - struct io_kiocb *req) +static inline bool __req_need_defer(struct io_kiocb *req) { + struct io_ring_ctx *ctx = req->ctx; + return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped + atomic_read(&ctx->cached_cq_overflow); } -static inline bool io_sequence_defer(struct io_ring_ctx *ctx, - struct io_kiocb *req) +static inline bool req_need_defer(struct io_kiocb *req) { - if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN) - return false; + if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) == REQ_F_IO_DRAIN) + return __req_need_defer(req); - return __io_sequence_defer(ctx, req); + return false; } static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx) @@ -441,7 +473,7 @@ static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx) struct io_kiocb *req; req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list); - if (req && !io_sequence_defer(ctx, req)) { + if (req && !req_need_defer(req)) { list_del_init(&req->list); return req; } @@ -457,7 +489,7 @@ static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx) if (req) { if (req->flags & REQ_F_TIMEOUT_NOSEQ) return NULL; - if (!__io_sequence_defer(ctx, req)) { + if (!__req_need_defer(req)) { list_del_init(&req->list); return req; } @@ -481,21 +513,59 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx) } } -static inline void io_queue_async_work(struct io_ring_ctx *ctx, - struct io_kiocb *req) +static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe) { - int rw = 0; + u8 opcode = READ_ONCE(sqe->opcode); + + return !(opcode == IORING_OP_READ_FIXED || + opcode == IORING_OP_WRITE_FIXED); +} + +static inline bool io_prep_async_work(struct io_kiocb *req) +{ + bool do_hashed = false; if (req->submit.sqe) { switch (req->submit.sqe->opcode) { case IORING_OP_WRITEV: case IORING_OP_WRITE_FIXED: - rw = !(req->rw.ki_flags & IOCB_DIRECT); + do_hashed = true; + /* fall-through */ + case IORING_OP_READV: + case IORING_OP_READ_FIXED: + case IORING_OP_SENDMSG: + case IORING_OP_RECVMSG: + case IORING_OP_ACCEPT: + case IORING_OP_POLL_ADD: + /* + * We know REQ_F_ISREG is not set on some of these + * opcodes, but this enables us to keep the check in + * just one place. + */ + if (!(req->flags & REQ_F_ISREG)) + req->work.flags |= IO_WQ_WORK_UNBOUND; break; } + if (io_sqe_needs_user(req->submit.sqe)) + req->work.flags |= IO_WQ_WORK_NEEDS_USER; } - queue_work(ctx->sqo_wq[rw], &req->work); + return do_hashed; +} + +static inline void io_queue_async_work(struct io_kiocb *req) +{ + bool do_hashed = io_prep_async_work(req); + struct io_ring_ctx *ctx = req->ctx; + + trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work, + req->flags); + if (!do_hashed) { + io_wq_enqueue(ctx->io_wq, &req->work); + } else { + io_wq_enqueue_hashed(ctx->io_wq, &req->work, + file_inode(req->file)); + } } static void io_kill_timeout(struct io_kiocb *req) @@ -505,9 +575,9 @@ static void io_kill_timeout(struct io_kiocb *req) ret = hrtimer_try_to_cancel(&req->timeout.timer); if (ret != -1) { atomic_inc(&req->ctx->cq_timeouts); - list_del(&req->list); - io_cqring_fill_event(req->ctx, req->user_data, 0); - __io_free_req(req); + list_del_init(&req->list); + io_cqring_fill_event(req, 0); + io_put_req(req); } } @@ -537,7 +607,7 @@ static void io_commit_cqring(struct io_ring_ctx *ctx) continue; } req->flags |= REQ_F_IO_DRAINED; - io_queue_async_work(ctx, req); + io_queue_async_work(req); } } @@ -559,50 +629,124 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) return &rings->cqes[tail & ctx->cq_mask]; } -static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, - long res) +static void io_cqring_ev_posted(struct io_ring_ctx *ctx) +{ + if (waitqueue_active(&ctx->wait)) + wake_up(&ctx->wait); + if (waitqueue_active(&ctx->sqo_wait)) + wake_up(&ctx->sqo_wait); + if (ctx->cq_ev_fd) + eventfd_signal(ctx->cq_ev_fd, 1); +} + +static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) +{ + struct io_rings *rings = ctx->rings; + struct io_uring_cqe *cqe; + struct io_kiocb *req; + unsigned long flags; + LIST_HEAD(list); + + if (!force) { + if (list_empty_careful(&ctx->cq_overflow_list)) + return; + if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) == + rings->cq_ring_entries)) + return; + } + + spin_lock_irqsave(&ctx->completion_lock, flags); + + /* if force is set, the ring is going away. always drop after that */ + if (force) + ctx->cq_overflow_flushed = true; + + while (!list_empty(&ctx->cq_overflow_list)) { + cqe = io_get_cqring(ctx); + if (!cqe && !force) + break; + + req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb, + list); + list_move(&req->list, &list); + if (cqe) { + WRITE_ONCE(cqe->user_data, req->user_data); + WRITE_ONCE(cqe->res, req->result); + WRITE_ONCE(cqe->flags, 0); + } else { + WRITE_ONCE(ctx->rings->cq_overflow, + atomic_inc_return(&ctx->cached_cq_overflow)); + } + } + + io_commit_cqring(ctx); + spin_unlock_irqrestore(&ctx->completion_lock, flags); + io_cqring_ev_posted(ctx); + + while (!list_empty(&list)) { + req = list_first_entry(&list, struct io_kiocb, list); + list_del(&req->list); + io_put_req(req); + } +} + +static void io_cqring_fill_event(struct io_kiocb *req, long res) { + struct io_ring_ctx *ctx = req->ctx; struct io_uring_cqe *cqe; + trace_io_uring_complete(ctx, req->user_data, res); + /* * If we can't get a cq entry, userspace overflowed the * submission (by quite a lot). Increment the overflow count in * the ring. */ cqe = io_get_cqring(ctx); - if (cqe) { - WRITE_ONCE(cqe->user_data, ki_user_data); + if (likely(cqe)) { + WRITE_ONCE(cqe->user_data, req->user_data); WRITE_ONCE(cqe->res, res); WRITE_ONCE(cqe->flags, 0); - } else { + } else if (ctx->cq_overflow_flushed) { WRITE_ONCE(ctx->rings->cq_overflow, atomic_inc_return(&ctx->cached_cq_overflow)); + } else { + refcount_inc(&req->refs); + req->result = res; + list_add_tail(&req->list, &ctx->cq_overflow_list); } } -static void io_cqring_ev_posted(struct io_ring_ctx *ctx) -{ - if (waitqueue_active(&ctx->wait)) - wake_up(&ctx->wait); - if (waitqueue_active(&ctx->sqo_wait)) - wake_up(&ctx->sqo_wait); - if (ctx->cq_ev_fd) - eventfd_signal(ctx->cq_ev_fd, 1); -} - -static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data, - long res) +static void io_cqring_add_event(struct io_kiocb *req, long res) { + struct io_ring_ctx *ctx = req->ctx; unsigned long flags; spin_lock_irqsave(&ctx->completion_lock, flags); - io_cqring_fill_event(ctx, user_data, res); + io_cqring_fill_event(req, res); io_commit_cqring(ctx); spin_unlock_irqrestore(&ctx->completion_lock, flags); io_cqring_ev_posted(ctx); } +static inline bool io_is_fallback_req(struct io_kiocb *req) +{ + return req == (struct io_kiocb *) + ((unsigned long) req->ctx->fallback_req & ~1UL); +} + +static struct io_kiocb *io_get_fallback_req(struct io_ring_ctx *ctx) +{ + struct io_kiocb *req; + + req = ctx->fallback_req; + if (!test_and_set_bit_lock(0, (unsigned long *) ctx->fallback_req)) + return req; + + return NULL; +} + static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx, struct io_submit_state *state) { @@ -615,7 +759,7 @@ static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx, if (!state) { req = kmem_cache_alloc(req_cachep, gfp); if (unlikely(!req)) - goto out; + goto fallback; } else if (!state->free_reqs) { size_t sz; int ret; @@ -630,7 +774,7 @@ static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx, if (unlikely(ret <= 0)) { state->reqs[0] = kmem_cache_alloc(req_cachep, gfp); if (!state->reqs[0]) - goto out; + goto fallback; ret = 1; } state->free_reqs = ret - 1; @@ -642,14 +786,19 @@ static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx, state->cur_req++; } +got_it: req->file = NULL; req->ctx = ctx; req->flags = 0; /* one is dropped after submission, the other at completion */ refcount_set(&req->refs, 2); req->result = 0; + INIT_IO_WORK(&req->work, io_wq_submit_work); return req; -out: +fallback: + req = io_get_fallback_req(ctx); + if (req) + goto got_it; percpu_ref_put(&ctx->refs); return NULL; } @@ -665,15 +814,48 @@ static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr) static void __io_free_req(struct io_kiocb *req) { + struct io_ring_ctx *ctx = req->ctx; + if (req->file && !(req->flags & REQ_F_FIXED_FILE)) fput(req->file); - percpu_ref_put(&req->ctx->refs); - kmem_cache_free(req_cachep, req); + if (req->flags & REQ_F_INFLIGHT) { + unsigned long flags; + + spin_lock_irqsave(&ctx->inflight_lock, flags); + list_del(&req->inflight_entry); + if (waitqueue_active(&ctx->inflight_wait)) + wake_up(&ctx->inflight_wait); + spin_unlock_irqrestore(&ctx->inflight_lock, flags); + } + percpu_ref_put(&ctx->refs); + if (likely(!io_is_fallback_req(req))) + kmem_cache_free(req_cachep, req); + else + clear_bit_unlock(0, (unsigned long *) ctx->fallback_req); } -static void io_req_link_next(struct io_kiocb *req) +static bool io_link_cancel_timeout(struct io_kiocb *req) { + struct io_ring_ctx *ctx = req->ctx; + int ret; + + ret = hrtimer_try_to_cancel(&req->timeout.timer); + if (ret != -1) { + io_cqring_fill_event(req, -ECANCELED); + io_commit_cqring(ctx); + req->flags &= ~REQ_F_LINK; + io_put_req(req); + return true; + } + + return false; +} + +static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr) +{ + struct io_ring_ctx *ctx = req->ctx; struct io_kiocb *nxt; + bool wake_ev = false; /* * The list should never be empty when we are called here. But could @@ -681,18 +863,35 @@ static void io_req_link_next(struct io_kiocb *req) * safe side. */ nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list); - if (nxt) { - list_del(&nxt->list); + while (nxt) { + list_del_init(&nxt->list); if (!list_empty(&req->link_list)) { INIT_LIST_HEAD(&nxt->link_list); list_splice(&req->link_list, &nxt->link_list); nxt->flags |= REQ_F_LINK; } - nxt->flags |= REQ_F_LINK_DONE; - INIT_WORK(&nxt->work, io_sq_wq_submit_work); - io_queue_async_work(req->ctx, nxt); + /* + * If we're in async work, we can continue processing the chain + * in this context instead of having to queue up new async work. + */ + if (req->flags & REQ_F_LINK_TIMEOUT) { + wake_ev = io_link_cancel_timeout(nxt); + + /* we dropped this link, get next */ + nxt = list_first_entry_or_null(&req->link_list, + struct io_kiocb, list); + } else if (nxtptr && io_wq_current_is_worker()) { + *nxtptr = nxt; + break; + } else { + io_queue_async_work(nxt); + break; + } } + + if (wake_ev) + io_cqring_ev_posted(ctx); } /* @@ -700,43 +899,118 @@ static void io_req_link_next(struct io_kiocb *req) */ static void io_fail_links(struct io_kiocb *req) { + struct io_ring_ctx *ctx = req->ctx; struct io_kiocb *link; + unsigned long flags; + + spin_lock_irqsave(&ctx->completion_lock, flags); while (!list_empty(&req->link_list)) { link = list_first_entry(&req->link_list, struct io_kiocb, list); - list_del(&link->list); + list_del_init(&link->list); - io_cqring_add_event(req->ctx, link->user_data, -ECANCELED); - __io_free_req(link); + trace_io_uring_fail_link(req, link); + + if ((req->flags & REQ_F_LINK_TIMEOUT) && + link->submit.sqe->opcode == IORING_OP_LINK_TIMEOUT) { + io_link_cancel_timeout(link); + } else { + io_cqring_fill_event(link, -ECANCELED); + io_double_put_req(link); + } } + + io_commit_cqring(ctx); + spin_unlock_irqrestore(&ctx->completion_lock, flags); + io_cqring_ev_posted(ctx); } -static void io_free_req(struct io_kiocb *req) +static void io_free_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt) { + if (likely(!(req->flags & REQ_F_LINK))) { + __io_free_req(req); + return; + } + /* * If LINK is set, we have dependent requests in this chain. If we * didn't fail this request, queue the first one up, moving any other * dependencies to the next request. In case of failure, fail the rest * of the chain. */ - if (req->flags & REQ_F_LINK) { - if (req->flags & REQ_F_FAIL_LINK) - io_fail_links(req); - else - io_req_link_next(req); + if (req->flags & REQ_F_FAIL_LINK) { + io_fail_links(req); + } else if ((req->flags & (REQ_F_LINK_TIMEOUT | REQ_F_COMP_LOCKED)) == + REQ_F_LINK_TIMEOUT) { + struct io_ring_ctx *ctx = req->ctx; + unsigned long flags; + + /* + * If this is a timeout link, we could be racing with the + * timeout timer. Grab the completion lock for this case to + * protect against that. + */ + spin_lock_irqsave(&ctx->completion_lock, flags); + io_req_link_next(req, nxt); + spin_unlock_irqrestore(&ctx->completion_lock, flags); + } else { + io_req_link_next(req, nxt); } __io_free_req(req); } +static void io_free_req(struct io_kiocb *req) +{ + io_free_req_find_next(req, NULL); +} + +/* + * Drop reference to request, return next in chain (if there is one) if this + * was the last reference to this request. + */ +static void io_put_req_find_next(struct io_kiocb *req, struct io_kiocb **nxtptr) +{ + struct io_kiocb *nxt = NULL; + + if (refcount_dec_and_test(&req->refs)) + io_free_req_find_next(req, &nxt); + + if (nxt) { + if (nxtptr) + *nxtptr = nxt; + else + io_queue_async_work(nxt); + } +} + static void io_put_req(struct io_kiocb *req) { if (refcount_dec_and_test(&req->refs)) io_free_req(req); } -static unsigned io_cqring_events(struct io_rings *rings) +static void io_double_put_req(struct io_kiocb *req) +{ + /* drop both submit and complete references */ + if (refcount_sub_and_test(2, &req->refs)) + __io_free_req(req); +} + +static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush) { + struct io_rings *rings = ctx->rings; + + /* + * noflush == true is from the waitqueue handler, just ensure we wake + * up the task, and the next invocation will flush the entries. We + * cannot safely to it from here. + */ + if (noflush && !list_empty(&ctx->cq_overflow_list)) + return -1U; + + io_cqring_overflow_flush(ctx, false); + /* See comment at the top of this file */ smp_rmb(); return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head); @@ -765,7 +1039,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events, req = list_first_entry(done, struct io_kiocb, list); list_del(&req->list); - io_cqring_fill_event(ctx, req->user_data, req->result); + io_cqring_fill_event(req, req->result); (*nr_events)++; if (refcount_dec_and_test(&req->refs)) { @@ -774,8 +1048,8 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events, * completions for those, only batch free for fixed * file and non-linked commands. */ - if ((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) == - REQ_F_FIXED_FILE) { + if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) == + REQ_F_FIXED_FILE) && !io_is_fallback_req(req)) { reqs[to_free++] = req; if (to_free == ARRAY_SIZE(reqs)) io_free_req_many(ctx, reqs, &to_free); @@ -892,7 +1166,7 @@ static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events, * If we do, we can potentially be spinning for commands that * already triggered a CQE (eg in error). */ - if (io_cqring_events(ctx->rings)) + if (io_cqring_events(ctx, false)) break; /* @@ -952,7 +1226,7 @@ static void kiocb_end_write(struct io_kiocb *req) file_end_write(req->file); } -static void io_complete_rw(struct kiocb *kiocb, long res, long res2) +static void io_complete_rw_common(struct kiocb *kiocb, long res) { struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw); @@ -961,10 +1235,28 @@ static void io_complete_rw(struct kiocb *kiocb, long res, long res2) if ((req->flags & REQ_F_LINK) && res != req->result) req->flags |= REQ_F_FAIL_LINK; - io_cqring_add_event(req->ctx, req->user_data, res); + io_cqring_add_event(req, res); +} + +static void io_complete_rw(struct kiocb *kiocb, long res, long res2) +{ + struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw); + + io_complete_rw_common(kiocb, res); io_put_req(req); } +static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res) +{ + struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw); + struct io_kiocb *nxt = NULL; + + io_complete_rw_common(kiocb, res); + io_put_req_find_next(req, &nxt); + + return nxt; +} + static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2) { struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw); @@ -1072,10 +1364,9 @@ static bool io_file_supports_async(struct file *file) return false; } -static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s, - bool force_nonblock) +static int io_prep_rw(struct io_kiocb *req, bool force_nonblock) { - const struct io_uring_sqe *sqe = s->sqe; + const struct io_uring_sqe *sqe = req->submit.sqe; struct io_ring_ctx *ctx = req->ctx; struct kiocb *kiocb = &req->rw; unsigned ioprio; @@ -1159,6 +1450,15 @@ static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret) } } +static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt, + bool in_async) +{ + if (in_async && ret >= 0 && nxt && kiocb->ki_complete == io_complete_rw) + *nxt = __io_complete_rw(kiocb, ret); + else + io_rw_done(kiocb, ret); +} + static int io_import_fixed(struct io_ring_ctx *ctx, int rw, const struct io_uring_sqe *sqe, struct iov_iter *iter) @@ -1270,65 +1570,6 @@ static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw, return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter); } -static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb) -{ - if (al->file == kiocb->ki_filp) { - off_t start, end; - - /* - * Allow merging if we're anywhere in the range of the same - * page. Generally this happens for sub-page reads or writes, - * and it's beneficial to allow the first worker to bring the - * page in and the piggy backed work can then work on the - * cached page. - */ - start = al->io_start & PAGE_MASK; - end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK; - if (kiocb->ki_pos >= start && kiocb->ki_pos <= end) - return true; - } - - al->file = NULL; - return false; -} - -/* - * Make a note of the last file/offset/direction we punted to async - * context. We'll use this information to see if we can piggy back a - * sequential request onto the previous one, if it's still hasn't been - * completed by the async worker. - */ -static void io_async_list_note(int rw, struct io_kiocb *req, size_t len) -{ - struct async_list *async_list = &req->ctx->pending_async[rw]; - struct kiocb *kiocb = &req->rw; - struct file *filp = kiocb->ki_filp; - - if (io_should_merge(async_list, kiocb)) { - unsigned long max_bytes; - - /* Use 8x RA size as a decent limiter for both reads/writes */ - max_bytes = filp->f_ra.ra_pages << (PAGE_SHIFT + 3); - if (!max_bytes) - max_bytes = VM_READAHEAD_PAGES << (PAGE_SHIFT + 3); - - /* If max len are exceeded, reset the state */ - if (async_list->io_len + len <= max_bytes) { - req->flags |= REQ_F_SEQ_PREV; - async_list->io_len += len; - } else { - async_list->file = NULL; - } - } - - /* New file? Reset state. */ - if (async_list->file != filp) { - async_list->io_start = kiocb->ki_pos; - async_list->io_len = len; - async_list->file = filp; - } -} - /* * For files that don't have ->read_iter() and ->write_iter(), handle them * by looping over ->read() or ->write() manually. @@ -1374,7 +1615,7 @@ static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb, return ret; } -static int io_read(struct io_kiocb *req, const struct sqe_submit *s, +static int io_read(struct io_kiocb *req, struct io_kiocb **nxt, bool force_nonblock) { struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; @@ -1384,7 +1625,7 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s, size_t iov_count; ssize_t read_size, ret; - ret = io_prep_rw(req, s, force_nonblock); + ret = io_prep_rw(req, force_nonblock); if (ret) return ret; file = kiocb->ki_filp; @@ -1392,7 +1633,7 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s, if (unlikely(!(file->f_mode & FMODE_READ))) return -EBADF; - ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter); + ret = io_import_iovec(req->ctx, READ, &req->submit, &iovec, &iter); if (ret < 0) return ret; @@ -1423,23 +1664,16 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s, ret2 > 0 && ret2 < read_size) ret2 = -EAGAIN; /* Catch -EAGAIN return for forced non-blocking submission */ - if (!force_nonblock || ret2 != -EAGAIN) { - io_rw_done(kiocb, ret2); - } else { - /* - * If ->needs_lock is true, we're already in async - * context. - */ - if (!s->needs_lock) - io_async_list_note(READ, req, iov_count); + if (!force_nonblock || ret2 != -EAGAIN) + kiocb_done(kiocb, ret2, nxt, req->submit.in_async); + else ret = -EAGAIN; - } } kfree(iovec); return ret; } -static int io_write(struct io_kiocb *req, const struct sqe_submit *s, +static int io_write(struct io_kiocb *req, struct io_kiocb **nxt, bool force_nonblock) { struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; @@ -1449,7 +1683,7 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s, size_t iov_count; ssize_t ret; - ret = io_prep_rw(req, s, force_nonblock); + ret = io_prep_rw(req, force_nonblock); if (ret) return ret; @@ -1457,7 +1691,7 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s, if (unlikely(!(file->f_mode & FMODE_WRITE))) return -EBADF; - ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter); + ret = io_import_iovec(req->ctx, WRITE, &req->submit, &iovec, &iter); if (ret < 0) return ret; @@ -1467,12 +1701,8 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s, iov_count = iov_iter_count(&iter); ret = -EAGAIN; - if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) { - /* If ->needs_lock is true, we're already in async context. */ - if (!s->needs_lock) - io_async_list_note(WRITE, req, iov_count); + if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) goto out_free; - } ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count); if (!ret) { @@ -1497,17 +1727,10 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s, ret2 = call_write_iter(file, kiocb, &iter); else ret2 = loop_rw_iter(WRITE, file, kiocb, &iter); - if (!force_nonblock || ret2 != -EAGAIN) { - io_rw_done(kiocb, ret2); - } else { - /* - * If ->needs_lock is true, we're already in async - * context. - */ - if (!s->needs_lock) - io_async_list_note(WRITE, req, iov_count); + if (!force_nonblock || ret2 != -EAGAIN) + kiocb_done(kiocb, ret2, nxt, req->submit.in_async); + else ret = -EAGAIN; - } } out_free: kfree(iovec); @@ -1517,15 +1740,14 @@ out_free: /* * IORING_OP_NOP just posts a completion event, nothing else. */ -static int io_nop(struct io_kiocb *req, u64 user_data) +static int io_nop(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; - long err = 0; if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) return -EINVAL; - io_cqring_add_event(ctx, user_data, err); + io_cqring_add_event(req, 0); io_put_req(req); return 0; } @@ -1546,7 +1768,7 @@ static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe) } static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe, - bool force_nonblock) + struct io_kiocb **nxt, bool force_nonblock) { loff_t sqe_off = READ_ONCE(sqe->off); loff_t sqe_len = READ_ONCE(sqe->len); @@ -1572,8 +1794,8 @@ static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe, if (ret < 0 && (req->flags & REQ_F_LINK)) req->flags |= REQ_F_FAIL_LINK; - io_cqring_add_event(req->ctx, sqe->user_data, ret); - io_put_req(req); + io_cqring_add_event(req, ret); + io_put_req_find_next(req, nxt); return 0; } @@ -1595,6 +1817,7 @@ static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe) static int io_sync_file_range(struct io_kiocb *req, const struct io_uring_sqe *sqe, + struct io_kiocb **nxt, bool force_nonblock) { loff_t sqe_off; @@ -1618,14 +1841,14 @@ static int io_sync_file_range(struct io_kiocb *req, if (ret < 0 && (req->flags & REQ_F_LINK)) req->flags |= REQ_F_FAIL_LINK; - io_cqring_add_event(req->ctx, sqe->user_data, ret); - io_put_req(req); + io_cqring_add_event(req, ret); + io_put_req_find_next(req, nxt); return 0; } #if defined(CONFIG_NET) static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe, - bool force_nonblock, + struct io_kiocb **nxt, bool force_nonblock, long (*fn)(struct socket *, struct user_msghdr __user *, unsigned int)) { @@ -1654,32 +1877,80 @@ static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe, return ret; } - io_cqring_add_event(req->ctx, sqe->user_data, ret); - io_put_req(req); + io_cqring_add_event(req, ret); + if (ret < 0 && (req->flags & REQ_F_LINK)) + req->flags |= REQ_F_FAIL_LINK; + io_put_req_find_next(req, nxt); return 0; } #endif static int io_sendmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe, - bool force_nonblock) + struct io_kiocb **nxt, bool force_nonblock) { #if defined(CONFIG_NET) - return io_send_recvmsg(req, sqe, force_nonblock, __sys_sendmsg_sock); + return io_send_recvmsg(req, sqe, nxt, force_nonblock, + __sys_sendmsg_sock); #else return -EOPNOTSUPP; #endif } static int io_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe, - bool force_nonblock) + struct io_kiocb **nxt, bool force_nonblock) { #if defined(CONFIG_NET) - return io_send_recvmsg(req, sqe, force_nonblock, __sys_recvmsg_sock); + return io_send_recvmsg(req, sqe, nxt, force_nonblock, + __sys_recvmsg_sock); +#else + return -EOPNOTSUPP; +#endif +} + +static int io_accept(struct io_kiocb *req, const struct io_uring_sqe *sqe, + struct io_kiocb **nxt, bool force_nonblock) +{ +#if defined(CONFIG_NET) + struct sockaddr __user *addr; + int __user *addr_len; + unsigned file_flags; + int flags, ret; + + if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL))) + return -EINVAL; + if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index) + return -EINVAL; + + addr = (struct sockaddr __user *) (unsigned long) READ_ONCE(sqe->addr); + addr_len = (int __user *) (unsigned long) READ_ONCE(sqe->addr2); + flags = READ_ONCE(sqe->accept_flags); + file_flags = force_nonblock ? O_NONBLOCK : 0; + + ret = __sys_accept4_file(req->file, file_flags, addr, addr_len, flags); + if (ret == -EAGAIN && force_nonblock) { + req->work.flags |= IO_WQ_WORK_NEEDS_FILES; + return -EAGAIN; + } + if (ret == -ERESTARTSYS) + ret = -EINTR; + if (ret < 0 && (req->flags & REQ_F_LINK)) + req->flags |= REQ_F_FAIL_LINK; + io_cqring_add_event(req, ret); + io_put_req_find_next(req, nxt); + return 0; #else return -EOPNOTSUPP; #endif } +static inline void io_poll_remove_req(struct io_kiocb *req) +{ + if (!RB_EMPTY_NODE(&req->rb_node)) { + rb_erase(&req->rb_node, &req->ctx->cancel_tree); + RB_CLEAR_NODE(&req->rb_node); + } +} + static void io_poll_remove_one(struct io_kiocb *req) { struct io_poll_iocb *poll = &req->poll; @@ -1688,25 +1959,47 @@ static void io_poll_remove_one(struct io_kiocb *req) WRITE_ONCE(poll->canceled, true); if (!list_empty(&poll->wait.entry)) { list_del_init(&poll->wait.entry); - io_queue_async_work(req->ctx, req); + io_queue_async_work(req); } spin_unlock(&poll->head->lock); - - list_del_init(&req->list); + io_poll_remove_req(req); } static void io_poll_remove_all(struct io_ring_ctx *ctx) { + struct rb_node *node; struct io_kiocb *req; spin_lock_irq(&ctx->completion_lock); - while (!list_empty(&ctx->cancel_list)) { - req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list); + while ((node = rb_first(&ctx->cancel_tree)) != NULL) { + req = rb_entry(node, struct io_kiocb, rb_node); io_poll_remove_one(req); } spin_unlock_irq(&ctx->completion_lock); } +static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr) +{ + struct rb_node *p, *parent = NULL; + struct io_kiocb *req; + + p = ctx->cancel_tree.rb_node; + while (p) { + parent = p; + req = rb_entry(parent, struct io_kiocb, rb_node); + if (sqe_addr < req->user_data) { + p = p->rb_left; + } else if (sqe_addr > req->user_data) { + p = p->rb_right; + } else { + io_poll_remove_one(req); + return 0; + } + } + + return -ENOENT; +} + /* * Find a running poll command that matches one specified in sqe->addr, * and remove it if found. @@ -1714,8 +2007,7 @@ static void io_poll_remove_all(struct io_ring_ctx *ctx) static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe) { struct io_ring_ctx *ctx = req->ctx; - struct io_kiocb *poll_req, *next; - int ret = -ENOENT; + int ret; if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) return -EINVAL; @@ -1724,36 +2016,38 @@ static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe) return -EINVAL; spin_lock_irq(&ctx->completion_lock); - list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) { - if (READ_ONCE(sqe->addr) == poll_req->user_data) { - io_poll_remove_one(poll_req); - ret = 0; - break; - } - } + ret = io_poll_cancel(ctx, READ_ONCE(sqe->addr)); spin_unlock_irq(&ctx->completion_lock); - io_cqring_add_event(req->ctx, sqe->user_data, ret); + io_cqring_add_event(req, ret); + if (ret < 0 && (req->flags & REQ_F_LINK)) + req->flags |= REQ_F_FAIL_LINK; io_put_req(req); return 0; } -static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req, - __poll_t mask) +static void io_poll_complete(struct io_kiocb *req, __poll_t mask) { + struct io_ring_ctx *ctx = req->ctx; + req->poll.done = true; - io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask)); + io_cqring_fill_event(req, mangle_poll(mask)); io_commit_cqring(ctx); } -static void io_poll_complete_work(struct work_struct *work) +static void io_poll_complete_work(struct io_wq_work **workptr) { + struct io_wq_work *work = *workptr; struct io_kiocb *req = container_of(work, struct io_kiocb, work); struct io_poll_iocb *poll = &req->poll; struct poll_table_struct pt = { ._key = poll->events }; struct io_ring_ctx *ctx = req->ctx; + struct io_kiocb *nxt = NULL; __poll_t mask = 0; + if (work->flags & IO_WQ_WORK_CANCEL) + WRITE_ONCE(poll->canceled, true); + if (!READ_ONCE(poll->canceled)) mask = vfs_poll(poll->file, &pt) & poll->events; @@ -1770,12 +2064,15 @@ static void io_poll_complete_work(struct work_struct *work) spin_unlock_irq(&ctx->completion_lock); return; } - list_del_init(&req->list); - io_poll_complete(ctx, req, mask); + io_poll_remove_req(req); + io_poll_complete(req, mask); spin_unlock_irq(&ctx->completion_lock); io_cqring_ev_posted(ctx); - io_put_req(req); + + io_put_req_find_next(req, &nxt); + if (nxt) + *workptr = &nxt->work; } static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, @@ -1794,15 +2091,22 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, list_del_init(&poll->wait.entry); + /* + * Run completion inline if we can. We're using trylock here because + * we are violating the completion_lock -> poll wq lock ordering. + * If we have a link timeout we're going to need the completion_lock + * for finalizing the request, mark us as having grabbed that already. + */ if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) { - list_del(&req->list); - io_poll_complete(ctx, req, mask); + io_poll_remove_req(req); + io_poll_complete(req, mask); + req->flags |= REQ_F_COMP_LOCKED; + io_put_req(req); spin_unlock_irqrestore(&ctx->completion_lock, flags); io_cqring_ev_posted(ctx); - io_put_req(req); } else { - io_queue_async_work(ctx, req); + io_queue_async_work(req); } return 1; @@ -1829,7 +2133,27 @@ static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head, add_wait_queue(head, &pt->req->poll.wait); } -static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe) +static void io_poll_req_insert(struct io_kiocb *req) +{ + struct io_ring_ctx *ctx = req->ctx; + struct rb_node **p = &ctx->cancel_tree.rb_node; + struct rb_node *parent = NULL; + struct io_kiocb *tmp; + + while (*p) { + parent = *p; + tmp = rb_entry(parent, struct io_kiocb, rb_node); + if (req->user_data < tmp->user_data) + p = &(*p)->rb_left; + else + p = &(*p)->rb_right; + } + rb_link_node(&req->rb_node, parent, p); + rb_insert_color(&req->rb_node, &ctx->cancel_tree); +} + +static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe, + struct io_kiocb **nxt) { struct io_poll_iocb *poll = &req->poll; struct io_ring_ctx *ctx = req->ctx; @@ -1846,9 +2170,10 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe) return -EBADF; req->submit.sqe = NULL; - INIT_WORK(&req->work, io_poll_complete_work); + INIT_IO_WORK(&req->work, io_poll_complete_work); events = READ_ONCE(sqe->poll_events); poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP; + RB_CLEAR_NODE(&req->rb_node); poll->head = NULL; poll->done = false; @@ -1881,18 +2206,18 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe) else if (cancel) WRITE_ONCE(poll->canceled, true); else if (!poll->done) /* actually waiting for an event */ - list_add_tail(&req->list, &ctx->cancel_list); + io_poll_req_insert(req); spin_unlock(&poll->head->lock); } if (mask) { /* no async, we'd stolen it */ ipt.error = 0; - io_poll_complete(ctx, req, mask); + io_poll_complete(req, mask); } spin_unlock_irq(&ctx->completion_lock); if (mask) { io_cqring_ev_posted(ctx); - io_put_req(req); + io_put_req_find_next(req, nxt); } return ipt.error; } @@ -1900,7 +2225,7 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe) static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer) { struct io_ring_ctx *ctx; - struct io_kiocb *req, *prev; + struct io_kiocb *req; unsigned long flags; req = container_of(timer, struct io_kiocb, timeout.timer); @@ -1909,43 +2234,118 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer) spin_lock_irqsave(&ctx->completion_lock, flags); /* - * Adjust the reqs sequence before the current one because it - * will consume a slot in the cq_ring and the the cq_tail pointer - * will be increased, otherwise other timeout reqs may return in - * advance without waiting for enough wait_nr. + * We could be racing with timeout deletion. If the list is empty, + * then timeout lookup already found it and will be handling it. */ - prev = req; - list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list) - prev->sequence++; - list_del(&req->list); + if (!list_empty(&req->list)) { + struct io_kiocb *prev; + + /* + * Adjust the reqs sequence before the current one because it + * will consume a slot in the cq_ring and the the cq_tail + * pointer will be increased, otherwise other timeout reqs may + * return in advance without waiting for enough wait_nr. + */ + prev = req; + list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list) + prev->sequence++; + list_del_init(&req->list); + } - io_cqring_fill_event(ctx, req->user_data, -ETIME); + io_cqring_fill_event(req, -ETIME); io_commit_cqring(ctx); spin_unlock_irqrestore(&ctx->completion_lock, flags); io_cqring_ev_posted(ctx); - + if (req->flags & REQ_F_LINK) + req->flags |= REQ_F_FAIL_LINK; io_put_req(req); return HRTIMER_NORESTART; } +static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data) +{ + struct io_kiocb *req; + int ret = -ENOENT; + + list_for_each_entry(req, &ctx->timeout_list, list) { + if (user_data == req->user_data) { + list_del_init(&req->list); + ret = 0; + break; + } + } + + if (ret == -ENOENT) + return ret; + + ret = hrtimer_try_to_cancel(&req->timeout.timer); + if (ret == -1) + return -EALREADY; + + io_cqring_fill_event(req, -ECANCELED); + io_put_req(req); + return 0; +} + +/* + * Remove or update an existing timeout command + */ +static int io_timeout_remove(struct io_kiocb *req, + const struct io_uring_sqe *sqe) +{ + struct io_ring_ctx *ctx = req->ctx; + unsigned flags; + int ret; + + if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) + return -EINVAL; + if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len) + return -EINVAL; + flags = READ_ONCE(sqe->timeout_flags); + if (flags) + return -EINVAL; + + spin_lock_irq(&ctx->completion_lock); + ret = io_timeout_cancel(ctx, READ_ONCE(sqe->addr)); + + io_cqring_fill_event(req, ret); + io_commit_cqring(ctx); + spin_unlock_irq(&ctx->completion_lock); + io_cqring_ev_posted(ctx); + if (ret < 0 && req->flags & REQ_F_LINK) + req->flags |= REQ_F_FAIL_LINK; + io_put_req(req); + return 0; +} + static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe) { unsigned count; struct io_ring_ctx *ctx = req->ctx; struct list_head *entry; + enum hrtimer_mode mode; struct timespec64 ts; unsigned span = 0; + unsigned flags; if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) return -EINVAL; - if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->timeout_flags || - sqe->len != 1) + if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len != 1) + return -EINVAL; + flags = READ_ONCE(sqe->timeout_flags); + if (flags & ~IORING_TIMEOUT_ABS) return -EINVAL; if (get_timespec64(&ts, u64_to_user_ptr(sqe->addr))) return -EFAULT; + if (flags & IORING_TIMEOUT_ABS) + mode = HRTIMER_MODE_ABS; + else + mode = HRTIMER_MODE_REL; + + hrtimer_init(&req->timeout.timer, CLOCK_MONOTONIC, mode); req->flags |= REQ_F_TIMEOUT; /* @@ -2006,21 +2406,92 @@ static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe) req->sequence -= span; add: list_add(&req->list, entry); + req->timeout.timer.function = io_timeout_fn; + hrtimer_start(&req->timeout.timer, timespec64_to_ktime(ts), mode); spin_unlock_irq(&ctx->completion_lock); + return 0; +} - hrtimer_init(&req->timeout.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL); - req->timeout.timer.function = io_timeout_fn; - hrtimer_start(&req->timeout.timer, timespec64_to_ktime(ts), - HRTIMER_MODE_REL); +static bool io_cancel_cb(struct io_wq_work *work, void *data) +{ + struct io_kiocb *req = container_of(work, struct io_kiocb, work); + + return req->user_data == (unsigned long) data; +} + +static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr) +{ + enum io_wq_cancel cancel_ret; + int ret = 0; + + cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr); + switch (cancel_ret) { + case IO_WQ_CANCEL_OK: + ret = 0; + break; + case IO_WQ_CANCEL_RUNNING: + ret = -EALREADY; + break; + case IO_WQ_CANCEL_NOTFOUND: + ret = -ENOENT; + break; + } + + return ret; +} + +static void io_async_find_and_cancel(struct io_ring_ctx *ctx, + struct io_kiocb *req, __u64 sqe_addr, + struct io_kiocb **nxt) +{ + unsigned long flags; + int ret; + + ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr); + if (ret != -ENOENT) { + spin_lock_irqsave(&ctx->completion_lock, flags); + goto done; + } + + spin_lock_irqsave(&ctx->completion_lock, flags); + ret = io_timeout_cancel(ctx, sqe_addr); + if (ret != -ENOENT) + goto done; + ret = io_poll_cancel(ctx, sqe_addr); +done: + io_cqring_fill_event(req, ret); + io_commit_cqring(ctx); + spin_unlock_irqrestore(&ctx->completion_lock, flags); + io_cqring_ev_posted(ctx); + + if (ret < 0 && (req->flags & REQ_F_LINK)) + req->flags |= REQ_F_FAIL_LINK; + io_put_req_find_next(req, nxt); +} + +static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe, + struct io_kiocb **nxt) +{ + struct io_ring_ctx *ctx = req->ctx; + + if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) + return -EINVAL; + if (sqe->flags || sqe->ioprio || sqe->off || sqe->len || + sqe->cancel_flags) + return -EINVAL; + + io_async_find_and_cancel(ctx, req, READ_ONCE(sqe->addr), NULL); return 0; } -static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req, - const struct io_uring_sqe *sqe) +static int io_req_defer(struct io_kiocb *req) { + const struct io_uring_sqe *sqe = req->submit.sqe; struct io_uring_sqe *sqe_copy; + struct io_ring_ctx *ctx = req->ctx; - if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) + /* Still need defer if there is pending req in defer list. */ + if (!req_need_defer(req) && list_empty(&ctx->defer_list)) return 0; sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL); @@ -2028,7 +2499,7 @@ static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req, return -EAGAIN; spin_lock_irq(&ctx->completion_lock); - if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) { + if (!req_need_defer(req) && list_empty(&ctx->defer_list)) { spin_unlock_irq(&ctx->completion_lock); kfree(sqe_copy); return 0; @@ -2037,64 +2508,70 @@ static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req, memcpy(sqe_copy, sqe, sizeof(*sqe_copy)); req->submit.sqe = sqe_copy; - INIT_WORK(&req->work, io_sq_wq_submit_work); + trace_io_uring_defer(ctx, req, false); list_add_tail(&req->list, &ctx->defer_list); spin_unlock_irq(&ctx->completion_lock); return -EIOCBQUEUED; } -static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, - const struct sqe_submit *s, bool force_nonblock) +static int __io_submit_sqe(struct io_kiocb *req, struct io_kiocb **nxt, + bool force_nonblock) { int ret, opcode; - - req->user_data = READ_ONCE(s->sqe->user_data); - - if (unlikely(s->index >= ctx->sq_entries)) - return -EINVAL; + struct sqe_submit *s = &req->submit; + struct io_ring_ctx *ctx = req->ctx; opcode = READ_ONCE(s->sqe->opcode); switch (opcode) { case IORING_OP_NOP: - ret = io_nop(req, req->user_data); + ret = io_nop(req); break; case IORING_OP_READV: if (unlikely(s->sqe->buf_index)) return -EINVAL; - ret = io_read(req, s, force_nonblock); + ret = io_read(req, nxt, force_nonblock); break; case IORING_OP_WRITEV: if (unlikely(s->sqe->buf_index)) return -EINVAL; - ret = io_write(req, s, force_nonblock); + ret = io_write(req, nxt, force_nonblock); break; case IORING_OP_READ_FIXED: - ret = io_read(req, s, force_nonblock); + ret = io_read(req, nxt, force_nonblock); break; case IORING_OP_WRITE_FIXED: - ret = io_write(req, s, force_nonblock); + ret = io_write(req, nxt, force_nonblock); break; case IORING_OP_FSYNC: - ret = io_fsync(req, s->sqe, force_nonblock); + ret = io_fsync(req, s->sqe, nxt, force_nonblock); break; case IORING_OP_POLL_ADD: - ret = io_poll_add(req, s->sqe); + ret = io_poll_add(req, s->sqe, nxt); break; case IORING_OP_POLL_REMOVE: ret = io_poll_remove(req, s->sqe); break; case IORING_OP_SYNC_FILE_RANGE: - ret = io_sync_file_range(req, s->sqe, force_nonblock); + ret = io_sync_file_range(req, s->sqe, nxt, force_nonblock); break; case IORING_OP_SENDMSG: - ret = io_sendmsg(req, s->sqe, force_nonblock); + ret = io_sendmsg(req, s->sqe, nxt, force_nonblock); break; case IORING_OP_RECVMSG: - ret = io_recvmsg(req, s->sqe, force_nonblock); + ret = io_recvmsg(req, s->sqe, nxt, force_nonblock); break; case IORING_OP_TIMEOUT: ret = io_timeout(req, s->sqe); break; + case IORING_OP_TIMEOUT_REMOVE: + ret = io_timeout_remove(req, s->sqe); + break; + case IORING_OP_ACCEPT: + ret = io_accept(req, s->sqe, nxt, force_nonblock); + break; + case IORING_OP_ASYNC_CANCEL: + ret = io_async_cancel(req, s->sqe, nxt); + break; default: ret = -EINVAL; break; @@ -2108,187 +2585,65 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, return -EAGAIN; /* workqueue context doesn't hold uring_lock, grab it now */ - if (s->needs_lock) + if (s->in_async) mutex_lock(&ctx->uring_lock); io_iopoll_req_issued(req); - if (s->needs_lock) + if (s->in_async) mutex_unlock(&ctx->uring_lock); } return 0; } -static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx, - const struct io_uring_sqe *sqe) -{ - switch (sqe->opcode) { - case IORING_OP_READV: - case IORING_OP_READ_FIXED: - return &ctx->pending_async[READ]; - case IORING_OP_WRITEV: - case IORING_OP_WRITE_FIXED: - return &ctx->pending_async[WRITE]; - default: - return NULL; - } -} - -static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe) -{ - u8 opcode = READ_ONCE(sqe->opcode); - - return !(opcode == IORING_OP_READ_FIXED || - opcode == IORING_OP_WRITE_FIXED); -} - -static void io_sq_wq_submit_work(struct work_struct *work) +static void io_wq_submit_work(struct io_wq_work **workptr) { + struct io_wq_work *work = *workptr; struct io_kiocb *req = container_of(work, struct io_kiocb, work); - struct io_ring_ctx *ctx = req->ctx; - struct mm_struct *cur_mm = NULL; - struct async_list *async_list; - LIST_HEAD(req_list); - mm_segment_t old_fs; - int ret; + struct sqe_submit *s = &req->submit; + const struct io_uring_sqe *sqe = s->sqe; + struct io_kiocb *nxt = NULL; + int ret = 0; - async_list = io_async_list_from_sqe(ctx, req->submit.sqe); -restart: - do { - struct sqe_submit *s = &req->submit; - const struct io_uring_sqe *sqe = s->sqe; - unsigned int flags = req->flags; + /* Ensure we clear previously set non-block flag */ + req->rw.ki_flags &= ~IOCB_NOWAIT; - /* Ensure we clear previously set non-block flag */ - req->rw.ki_flags &= ~IOCB_NOWAIT; + if (work->flags & IO_WQ_WORK_CANCEL) + ret = -ECANCELED; - ret = 0; - if (io_sqe_needs_user(sqe) && !cur_mm) { - if (!mmget_not_zero(ctx->sqo_mm)) { - ret = -EFAULT; - } else { - cur_mm = ctx->sqo_mm; - use_mm(cur_mm); - old_fs = get_fs(); - set_fs(USER_DS); - } - } + if (!ret) { + s->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0; + s->in_async = true; + do { + ret = __io_submit_sqe(req, &nxt, false); + /* + * We can get EAGAIN for polled IO even though we're + * forcing a sync submission from here, since we can't + * wait for request slots on the block side. + */ + if (ret != -EAGAIN) + break; + cond_resched(); + } while (1); + } - if (!ret) { - s->has_user = cur_mm != NULL; - s->needs_lock = true; - do { - ret = __io_submit_sqe(ctx, req, s, false); - /* - * We can get EAGAIN for polled IO even though - * we're forcing a sync submission from here, - * since we can't wait for request slots on the - * block side. - */ - if (ret != -EAGAIN) - break; - cond_resched(); - } while (1); - } + /* drop submission reference */ + io_put_req(req); - /* drop submission reference */ + if (ret) { + if (req->flags & REQ_F_LINK) + req->flags |= REQ_F_FAIL_LINK; + io_cqring_add_event(req, ret); io_put_req(req); - - if (ret) { - io_cqring_add_event(ctx, sqe->user_data, ret); - io_put_req(req); - } - - /* async context always use a copy of the sqe */ - kfree(sqe); - - /* req from defer and link list needn't decrease async cnt */ - if (flags & (REQ_F_IO_DRAINED | REQ_F_LINK_DONE)) - goto out; - - if (!async_list) - break; - if (!list_empty(&req_list)) { - req = list_first_entry(&req_list, struct io_kiocb, - list); - list_del(&req->list); - continue; - } - if (list_empty(&async_list->list)) - break; - - req = NULL; - spin_lock(&async_list->lock); - if (list_empty(&async_list->list)) { - spin_unlock(&async_list->lock); - break; - } - list_splice_init(&async_list->list, &req_list); - spin_unlock(&async_list->lock); - - req = list_first_entry(&req_list, struct io_kiocb, list); - list_del(&req->list); - } while (req); - - /* - * Rare case of racing with a submitter. If we find the count has - * dropped to zero AND we have pending work items, then restart - * the processing. This is a tiny race window. - */ - if (async_list) { - ret = atomic_dec_return(&async_list->cnt); - while (!ret && !list_empty(&async_list->list)) { - spin_lock(&async_list->lock); - atomic_inc(&async_list->cnt); - list_splice_init(&async_list->list, &req_list); - spin_unlock(&async_list->lock); - - if (!list_empty(&req_list)) { - req = list_first_entry(&req_list, - struct io_kiocb, list); - list_del(&req->list); - goto restart; - } - ret = atomic_dec_return(&async_list->cnt); - } - } - -out: - if (cur_mm) { - set_fs(old_fs); - unuse_mm(cur_mm); - mmput(cur_mm); } -} -/* - * See if we can piggy back onto previously submitted work, that is still - * running. We currently only allow this if the new request is sequential - * to the previous one we punted. - */ -static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req) -{ - bool ret; - - if (!list) - return false; - if (!(req->flags & REQ_F_SEQ_PREV)) - return false; - if (!atomic_read(&list->cnt)) - return false; + /* async context always use a copy of the sqe */ + kfree(sqe); - ret = true; - spin_lock(&list->lock); - list_add_tail(&req->list, &list->list); - /* - * Ensure we see a simultaneous modification from io_sq_wq_submit_work() - */ - smp_mb(); - if (!atomic_read(&list->cnt)) { - list_del_init(&req->list); - ret = false; + /* if a dependent link is ready, pass it back */ + if (!ret && nxt) { + io_prep_async_work(nxt); + *workptr = &nxt->work; } - spin_unlock(&list->lock); - return ret; } static bool io_op_needs_file(const struct io_uring_sqe *sqe) @@ -2299,15 +2654,28 @@ static bool io_op_needs_file(const struct io_uring_sqe *sqe) case IORING_OP_NOP: case IORING_OP_POLL_REMOVE: case IORING_OP_TIMEOUT: + case IORING_OP_TIMEOUT_REMOVE: + case IORING_OP_ASYNC_CANCEL: + case IORING_OP_LINK_TIMEOUT: return false; default: return true; } } -static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s, - struct io_submit_state *state, struct io_kiocb *req) +static inline struct file *io_file_from_index(struct io_ring_ctx *ctx, + int index) { + struct fixed_file_table *table; + + table = &ctx->file_table[index >> IORING_FILE_TABLE_SHIFT]; + return table->files[index & IORING_FILE_TABLE_MASK]; +} + +static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req) +{ + struct sqe_submit *s = &req->submit; + struct io_ring_ctx *ctx = req->ctx; unsigned flags; int fd; @@ -2327,14 +2695,18 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s, return 0; if (flags & IOSQE_FIXED_FILE) { - if (unlikely(!ctx->user_files || + if (unlikely(!ctx->file_table || (unsigned) fd >= ctx->nr_user_files)) return -EBADF; - req->file = ctx->user_files[fd]; + fd = array_index_nospec(fd, ctx->nr_user_files); + req->file = io_file_from_index(ctx, fd); + if (!req->file) + return -EBADF; req->flags |= REQ_F_FIXED_FILE; } else { if (s->needs_fixed_file) return -EBADF; + trace_io_uring_file_get(ctx, fd); req->file = io_file_get(state, fd); if (unlikely(!req->file)) return -EBADF; @@ -2343,12 +2715,146 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s, return 0; } -static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, - struct sqe_submit *s) +static int io_grab_files(struct io_kiocb *req) +{ + int ret = -EBADF; + struct io_ring_ctx *ctx = req->ctx; + + rcu_read_lock(); + spin_lock_irq(&ctx->inflight_lock); + /* + * We use the f_ops->flush() handler to ensure that we can flush + * out work accessing these files if the fd is closed. Check if + * the fd has changed since we started down this path, and disallow + * this operation if it has. + */ + if (fcheck(req->submit.ring_fd) == req->submit.ring_file) { + list_add(&req->inflight_entry, &ctx->inflight_list); + req->flags |= REQ_F_INFLIGHT; + req->work.files = current->files; + ret = 0; + } + spin_unlock_irq(&ctx->inflight_lock); + rcu_read_unlock(); + + return ret; +} + +static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer) +{ + struct io_kiocb *req = container_of(timer, struct io_kiocb, + timeout.timer); + struct io_ring_ctx *ctx = req->ctx; + struct io_kiocb *prev = NULL; + unsigned long flags; + + spin_lock_irqsave(&ctx->completion_lock, flags); + + /* + * We don't expect the list to be empty, that will only happen if we + * race with the completion of the linked work. + */ + if (!list_empty(&req->list)) { + prev = list_entry(req->list.prev, struct io_kiocb, link_list); + if (refcount_inc_not_zero(&prev->refs)) + list_del_init(&req->list); + else + prev = NULL; + } + + spin_unlock_irqrestore(&ctx->completion_lock, flags); + + if (prev) { + io_async_find_and_cancel(ctx, req, prev->user_data, NULL); + io_put_req(prev); + } else { + io_cqring_add_event(req, -ETIME); + io_put_req(req); + } + return HRTIMER_NORESTART; +} + +static void io_queue_linked_timeout(struct io_kiocb *req, struct timespec64 *ts, + enum hrtimer_mode *mode) +{ + struct io_ring_ctx *ctx = req->ctx; + + /* + * If the list is now empty, then our linked request finished before + * we got a chance to setup the timer + */ + spin_lock_irq(&ctx->completion_lock); + if (!list_empty(&req->list)) { + req->timeout.timer.function = io_link_timeout_fn; + hrtimer_start(&req->timeout.timer, timespec64_to_ktime(*ts), + *mode); + } + spin_unlock_irq(&ctx->completion_lock); + + /* drop submission reference */ + io_put_req(req); +} + +static int io_validate_link_timeout(const struct io_uring_sqe *sqe, + struct timespec64 *ts) +{ + if (sqe->ioprio || sqe->buf_index || sqe->len != 1 || sqe->off) + return -EINVAL; + if (sqe->timeout_flags & ~IORING_TIMEOUT_ABS) + return -EINVAL; + if (get_timespec64(ts, u64_to_user_ptr(sqe->addr))) + return -EFAULT; + + return 0; +} + +static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req, + struct timespec64 *ts, + enum hrtimer_mode *mode) { + struct io_kiocb *nxt; int ret; - ret = __io_submit_sqe(ctx, req, s, true); + if (!(req->flags & REQ_F_LINK)) + return NULL; + + nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list); + if (!nxt || nxt->submit.sqe->opcode != IORING_OP_LINK_TIMEOUT) + return NULL; + + ret = io_validate_link_timeout(nxt->submit.sqe, ts); + if (ret) { + list_del_init(&nxt->list); + io_cqring_add_event(nxt, ret); + io_double_put_req(nxt); + return ERR_PTR(-ECANCELED); + } + + if (nxt->submit.sqe->timeout_flags & IORING_TIMEOUT_ABS) + *mode = HRTIMER_MODE_ABS; + else + *mode = HRTIMER_MODE_REL; + + req->flags |= REQ_F_LINK_TIMEOUT; + hrtimer_init(&nxt->timeout.timer, CLOCK_MONOTONIC, *mode); + return nxt; +} + +static int __io_queue_sqe(struct io_kiocb *req) +{ + enum hrtimer_mode mode; + struct io_kiocb *nxt; + struct timespec64 ts; + int ret; + + nxt = io_prep_linked_timeout(req, &ts, &mode); + if (IS_ERR(nxt)) { + ret = PTR_ERR(nxt); + nxt = NULL; + goto err; + } + + ret = __io_submit_sqe(req, NULL, true); /* * We async punt it if the file wasn't marked NOWAIT, or if the file @@ -2356,36 +2862,47 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, */ if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) || (req->flags & REQ_F_MUST_PUNT))) { + struct sqe_submit *s = &req->submit; struct io_uring_sqe *sqe_copy; sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL); if (sqe_copy) { - struct async_list *list; - s->sqe = sqe_copy; - memcpy(&req->submit, s, sizeof(*s)); - list = io_async_list_from_sqe(ctx, s->sqe); - if (!io_add_to_prev_work(list, req)) { - if (list) - atomic_inc(&list->cnt); - INIT_WORK(&req->work, io_sq_wq_submit_work); - io_queue_async_work(ctx, req); + if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) { + ret = io_grab_files(req); + if (ret) { + kfree(sqe_copy); + goto err; + } } /* * Queued up for async execution, worker will release * submit reference when the iocb is actually submitted. */ + io_queue_async_work(req); + + if (nxt) + io_queue_linked_timeout(nxt, &ts, &mode); + return 0; } } +err: /* drop submission reference */ io_put_req(req); + if (nxt) { + if (!ret) + io_queue_linked_timeout(nxt, &ts, &mode); + else + io_put_req(nxt); + } + /* and drop final reference, if we failed */ if (ret) { - io_cqring_add_event(ctx, req->user_data, ret); + io_cqring_add_event(req, ret); if (req->flags & REQ_F_LINK) req->flags |= REQ_F_FAIL_LINK; io_put_req(req); @@ -2394,31 +2911,30 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, return ret; } -static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, - struct sqe_submit *s) +static int io_queue_sqe(struct io_kiocb *req) { int ret; - ret = io_req_defer(ctx, req, s->sqe); + ret = io_req_defer(req); if (ret) { if (ret != -EIOCBQUEUED) { - io_free_req(req); - io_cqring_add_event(ctx, s->sqe->user_data, ret); + io_cqring_add_event(req, ret); + io_double_put_req(req); } return 0; } - return __io_queue_sqe(ctx, req, s); + return __io_queue_sqe(req); } -static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req, - struct sqe_submit *s, struct io_kiocb *shadow) +static int io_queue_link_head(struct io_kiocb *req, struct io_kiocb *shadow) { int ret; int need_submit = false; + struct io_ring_ctx *ctx = req->ctx; if (!shadow) - return io_queue_sqe(ctx, req, s); + return io_queue_sqe(req); /* * Mark the first IO in link list as DRAIN, let all the following @@ -2426,12 +2942,12 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req, * list. */ req->flags |= REQ_F_IO_DRAIN; - ret = io_req_defer(ctx, req, s->sqe); + ret = io_req_defer(req); if (ret) { if (ret != -EIOCBQUEUED) { - io_free_req(req); + io_cqring_add_event(req, ret); + io_double_put_req(req); __io_free_req(shadow); - io_cqring_add_event(ctx, s->sqe->user_data, ret); return 0; } } else { @@ -2444,47 +2960,42 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req, /* Insert shadow req to defer_list, blocking next IOs */ spin_lock_irq(&ctx->completion_lock); + trace_io_uring_defer(ctx, shadow, true); list_add_tail(&shadow->list, &ctx->defer_list); spin_unlock_irq(&ctx->completion_lock); if (need_submit) - return __io_queue_sqe(ctx, req, s); + return __io_queue_sqe(req); return 0; } #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK) -static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s, - struct io_submit_state *state, struct io_kiocb **link) +static void io_submit_sqe(struct io_kiocb *req, struct io_submit_state *state, + struct io_kiocb **link) { struct io_uring_sqe *sqe_copy; - struct io_kiocb *req; + struct sqe_submit *s = &req->submit; + struct io_ring_ctx *ctx = req->ctx; int ret; + req->user_data = s->sqe->user_data; + /* enforce forwards compatibility on users */ if (unlikely(s->sqe->flags & ~SQE_VALID_FLAGS)) { ret = -EINVAL; - goto err; - } - - req = io_get_req(ctx, state); - if (unlikely(!req)) { - ret = -EAGAIN; - goto err; + goto err_req; } - ret = io_req_set_file(ctx, s, state, req); + ret = io_req_set_file(state, req); if (unlikely(ret)) { err_req: - io_free_req(req); -err: - io_cqring_add_event(ctx, s->sqe->user_data, ret); + io_cqring_add_event(req, ret); + io_double_put_req(req); return; } - req->user_data = s->sqe->user_data; - /* * If we already have a head request, queue this one for async * submittal once the head completes. If we don't have a head but @@ -2502,16 +3013,19 @@ err: } s->sqe = sqe_copy; - memcpy(&req->submit, s, sizeof(*s)); + trace_io_uring_link(ctx, req, prev); list_add_tail(&req->list, &prev->link_list); } else if (s->sqe->flags & IOSQE_IO_LINK) { req->flags |= REQ_F_LINK; - memcpy(&req->submit, s, sizeof(*s)); INIT_LIST_HEAD(&req->link_list); *link = req; + } else if (READ_ONCE(s->sqe->opcode) == IORING_OP_LINK_TIMEOUT) { + /* Only valid as a linked SQE */ + ret = -EINVAL; + goto err_req; } else { - io_queue_sqe(ctx, req, s); + io_queue_sqe(req); } } @@ -2582,7 +3096,7 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s) head = READ_ONCE(sq_array[head & ctx->sq_mask]); if (head < ctx->sq_entries) { - s->index = head; + s->ring_file = NULL; s->sqe = &ctx->sq_sqes[head]; s->sequence = ctx->cached_sq_head; ctx->cached_sq_head++; @@ -2597,13 +3111,19 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s) } static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr, - bool has_user, bool mm_fault) + struct file *ring_file, int ring_fd, + struct mm_struct **mm, bool async) { struct io_submit_state state, *statep = NULL; struct io_kiocb *link = NULL; struct io_kiocb *shadow_req = NULL; - bool prev_was_link = false; int i, submitted = 0; + bool mm_fault = false; + + if (!list_empty(&ctx->cq_overflow_list)) { + io_cqring_overflow_flush(ctx, false); + return -EBUSY; + } if (nr > IO_PLUG_THRESHOLD) { io_submit_state_start(&state, ctx, nr); @@ -2611,23 +3131,31 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr, } for (i = 0; i < nr; i++) { - struct sqe_submit s; + struct io_kiocb *req; + unsigned int sqe_flags; - if (!io_get_sqring(ctx, &s)) + req = io_get_req(ctx, statep); + if (unlikely(!req)) { + if (!submitted) + submitted = -EAGAIN; break; + } + if (!io_get_sqring(ctx, &req->submit)) { + __io_free_req(req); + break; + } - /* - * If previous wasn't linked and we have a linked command, - * that's the end of the chain. Submit the previous link. - */ - if (!prev_was_link && link) { - io_queue_link_head(ctx, link, &link->submit, shadow_req); - link = NULL; - shadow_req = NULL; + if (io_sqe_needs_user(req->submit.sqe) && !*mm) { + mm_fault = mm_fault || !mmget_not_zero(ctx->sqo_mm); + if (!mm_fault) { + use_mm(ctx->sqo_mm); + *mm = ctx->sqo_mm; + } } - prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0; - if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) { + sqe_flags = req->submit.sqe->flags; + + if (link && (sqe_flags & IOSQE_IO_DRAIN)) { if (!shadow_req) { shadow_req = io_get_req(ctx, NULL); if (unlikely(!shadow_req)) @@ -2635,27 +3163,39 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr, shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN); refcount_dec(&shadow_req->refs); } - shadow_req->sequence = s.sequence; + shadow_req->sequence = req->submit.sequence; } out: - if (unlikely(mm_fault)) { - io_cqring_add_event(ctx, s.sqe->user_data, - -EFAULT); - } else { - s.has_user = has_user; - s.needs_lock = true; - s.needs_fixed_file = true; - io_submit_sqe(ctx, &s, statep, &link); - submitted++; + req->submit.ring_file = ring_file; + req->submit.ring_fd = ring_fd; + req->submit.has_user = *mm != NULL; + req->submit.in_async = async; + req->submit.needs_fixed_file = async; + trace_io_uring_submit_sqe(ctx, req->submit.sqe->user_data, + true, async); + io_submit_sqe(req, statep, &link); + submitted++; + + /* + * If previous wasn't linked and we have a linked command, + * that's the end of the chain. Submit the previous link. + */ + if (!(sqe_flags & IOSQE_IO_LINK) && link) { + io_queue_link_head(link, shadow_req); + link = NULL; + shadow_req = NULL; } } if (link) - io_queue_link_head(ctx, link, &link->submit, shadow_req); + io_queue_link_head(link, shadow_req); if (statep) io_submit_state_end(&state); + /* Commit SQ ring head once we've consumed and submitted all SQEs */ + io_commit_sqring(ctx); + return submitted; } @@ -2667,15 +3207,15 @@ static int io_sq_thread(void *data) DEFINE_WAIT(wait); unsigned inflight; unsigned long timeout; + int ret; - complete(&ctx->sqo_thread_started); + complete(&ctx->completions[1]); old_fs = get_fs(); set_fs(USER_DS); - timeout = inflight = 0; + ret = timeout = inflight = 0; while (!kthread_should_park()) { - bool mm_fault = false; unsigned int to_submit; if (inflight) { @@ -2710,13 +3250,21 @@ static int io_sq_thread(void *data) } to_submit = io_sqring_entries(ctx); - if (!to_submit) { + + /* + * If submit got -EBUSY, flag us as needing the application + * to enter the kernel to reap and flush events. + */ + if (!to_submit || ret == -EBUSY) { /* * We're polling. If we're within the defined idle * period, then let us spin without work before going - * to sleep. + * to sleep. The exception is if we got EBUSY doing + * more IO, we should wait for the application to + * reap events and wake us up. */ - if (inflight || !time_after(jiffies, timeout)) { + if (inflight || + (!time_after(jiffies, timeout) && ret != -EBUSY)) { cond_resched(); continue; } @@ -2742,7 +3290,7 @@ static int io_sq_thread(void *data) smp_mb(); to_submit = io_sqring_entries(ctx); - if (!to_submit) { + if (!to_submit || ret == -EBUSY) { if (kthread_should_park()) { finish_wait(&ctx->sqo_wait, &wait); break; @@ -2760,21 +3308,10 @@ static int io_sq_thread(void *data) ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP; } - /* Unless all new commands are FIXED regions, grab mm */ - if (!cur_mm) { - mm_fault = !mmget_not_zero(ctx->sqo_mm); - if (!mm_fault) { - use_mm(ctx->sqo_mm); - cur_mm = ctx->sqo_mm; - } - } - to_submit = min(to_submit, ctx->sq_entries); - inflight += io_submit_sqes(ctx, to_submit, cur_mm != NULL, - mm_fault); - - /* Commit SQ ring head once we've consumed all SQEs */ - io_commit_sqring(ctx); + ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true); + if (ret > 0) + inflight += ret; } set_fs(old_fs); @@ -2788,65 +3325,6 @@ static int io_sq_thread(void *data) return 0; } -static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) -{ - struct io_submit_state state, *statep = NULL; - struct io_kiocb *link = NULL; - struct io_kiocb *shadow_req = NULL; - bool prev_was_link = false; - int i, submit = 0; - - if (to_submit > IO_PLUG_THRESHOLD) { - io_submit_state_start(&state, ctx, to_submit); - statep = &state; - } - - for (i = 0; i < to_submit; i++) { - struct sqe_submit s; - - if (!io_get_sqring(ctx, &s)) - break; - - /* - * If previous wasn't linked and we have a linked command, - * that's the end of the chain. Submit the previous link. - */ - if (!prev_was_link && link) { - io_queue_link_head(ctx, link, &link->submit, shadow_req); - link = NULL; - shadow_req = NULL; - } - prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0; - - if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) { - if (!shadow_req) { - shadow_req = io_get_req(ctx, NULL); - if (unlikely(!shadow_req)) - goto out; - shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN); - refcount_dec(&shadow_req->refs); - } - shadow_req->sequence = s.sequence; - } - -out: - s.has_user = true; - s.needs_lock = false; - s.needs_fixed_file = false; - submit++; - io_submit_sqe(ctx, &s, statep, &link); - } - - if (link) - io_queue_link_head(ctx, link, &link->submit, shadow_req); - if (statep) - io_submit_state_end(statep); - - io_commit_sqring(ctx); - - return submit; -} - struct io_wait_queue { struct wait_queue_entry wq; struct io_ring_ctx *ctx; @@ -2854,7 +3332,7 @@ struct io_wait_queue { unsigned nr_timeouts; }; -static inline bool io_should_wake(struct io_wait_queue *iowq) +static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush) { struct io_ring_ctx *ctx = iowq->ctx; @@ -2863,7 +3341,7 @@ static inline bool io_should_wake(struct io_wait_queue *iowq) * started waiting. For timeouts, we always want to return to userspace, * regardless of event count. */ - return io_cqring_events(ctx->rings) >= iowq->to_wait || + return io_cqring_events(ctx, noflush) >= iowq->to_wait || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts; } @@ -2873,7 +3351,8 @@ static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode, struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq); - if (!io_should_wake(iowq)) + /* use noflush == true, as we can't safely rely on locking context */ + if (!io_should_wake(iowq, true)) return -1; return autoremove_wake_function(curr, mode, wake_flags, key); @@ -2896,9 +3375,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, .to_wait = min_events, }; struct io_rings *rings = ctx->rings; - int ret; + int ret = 0; - if (io_cqring_events(rings) >= min_events) + if (io_cqring_events(ctx, false) >= min_events) return 0; if (sig) { @@ -2914,24 +3393,22 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, return ret; } - ret = 0; iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); + trace_io_uring_cqring_wait(ctx, min_events); do { prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, TASK_INTERRUPTIBLE); - if (io_should_wake(&iowq)) + if (io_should_wake(&iowq, false)) break; schedule(); if (signal_pending(current)) { - ret = -ERESTARTSYS; + ret = -EINTR; break; } } while (1); finish_wait(&ctx->wait, &iowq.wq); - restore_saved_sigmask_unless(ret == -ERESTARTSYS); - if (ret == -ERESTARTSYS) - ret = -EINTR; + restore_saved_sigmask_unless(ret == -EINTR); return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0; } @@ -2949,19 +3426,29 @@ static void __io_sqe_files_unregister(struct io_ring_ctx *ctx) #else int i; - for (i = 0; i < ctx->nr_user_files; i++) - fput(ctx->user_files[i]); + for (i = 0; i < ctx->nr_user_files; i++) { + struct file *file; + + file = io_file_from_index(ctx, i); + if (file) + fput(file); + } #endif } static int io_sqe_files_unregister(struct io_ring_ctx *ctx) { - if (!ctx->user_files) + unsigned nr_tables, i; + + if (!ctx->file_table) return -ENXIO; __io_sqe_files_unregister(ctx); - kfree(ctx->user_files); - ctx->user_files = NULL; + nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE); + for (i = 0; i < nr_tables; i++) + kfree(ctx->file_table[i].files); + kfree(ctx->file_table); + ctx->file_table = NULL; ctx->nr_user_files = 0; return 0; } @@ -2969,7 +3456,7 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx) static void io_sq_thread_stop(struct io_ring_ctx *ctx) { if (ctx->sqo_thread) { - wait_for_completion(&ctx->sqo_thread_started); + wait_for_completion(&ctx->completions[1]); /* * The park is a bit of a work-around, without it we get * warning spews on shutdown with SQPOLL set and affinity @@ -2983,15 +3470,11 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx) static void io_finish_async(struct io_ring_ctx *ctx) { - int i; - io_sq_thread_stop(ctx); - for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) { - if (ctx->sqo_wq[i]) { - destroy_workqueue(ctx->sqo_wq[i]); - ctx->sqo_wq[i] = NULL; - } + if (ctx->io_wq) { + io_wq_destroy(ctx->io_wq); + ctx->io_wq = NULL; } } @@ -2999,11 +3482,9 @@ static void io_finish_async(struct io_ring_ctx *ctx) static void io_destruct_skb(struct sk_buff *skb) { struct io_ring_ctx *ctx = skb->sk->sk_user_data; - int i; - for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) - if (ctx->sqo_wq[i]) - flush_workqueue(ctx->sqo_wq[i]); + if (ctx->io_wq) + io_wq_flush(ctx->io_wq); unix_destruct_scm(skb); } @@ -3018,7 +3499,7 @@ static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset) struct sock *sk = ctx->ring_sock->sk; struct scm_fp_list *fpl; struct sk_buff *skb; - int i; + int i, nr_files; if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) { unsigned long inflight = ctx->user->unix_inflight + nr; @@ -3038,21 +3519,33 @@ static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset) } skb->sk = sk; - skb->destructor = io_destruct_skb; + nr_files = 0; fpl->user = get_uid(ctx->user); for (i = 0; i < nr; i++) { - fpl->fp[i] = get_file(ctx->user_files[i + offset]); - unix_inflight(fpl->user, fpl->fp[i]); + struct file *file = io_file_from_index(ctx, i + offset); + + if (!file) + continue; + fpl->fp[nr_files] = get_file(file); + unix_inflight(fpl->user, fpl->fp[nr_files]); + nr_files++; } - fpl->max = fpl->count = nr; - UNIXCB(skb).fp = fpl; - refcount_add(skb->truesize, &sk->sk_wmem_alloc); - skb_queue_head(&sk->sk_receive_queue, skb); + if (nr_files) { + fpl->max = SCM_MAX_FD; + fpl->count = nr_files; + UNIXCB(skb).fp = fpl; + skb->destructor = io_destruct_skb; + refcount_add(skb->truesize, &sk->sk_wmem_alloc); + skb_queue_head(&sk->sk_receive_queue, skb); - for (i = 0; i < nr; i++) - fput(fpl->fp[i]); + for (i = 0; i < nr_files; i++) + fput(fpl->fp[i]); + } else { + kfree_skb(skb); + kfree(fpl); + } return 0; } @@ -3083,7 +3576,10 @@ static int io_sqe_files_scm(struct io_ring_ctx *ctx) return 0; while (total < ctx->nr_user_files) { - fput(ctx->user_files[total]); + struct file *file = io_file_from_index(ctx, total); + + if (file) + fput(file); total++; } @@ -3096,33 +3592,79 @@ static int io_sqe_files_scm(struct io_ring_ctx *ctx) } #endif +static int io_sqe_alloc_file_tables(struct io_ring_ctx *ctx, unsigned nr_tables, + unsigned nr_files) +{ + int i; + + for (i = 0; i < nr_tables; i++) { + struct fixed_file_table *table = &ctx->file_table[i]; + unsigned this_files; + + this_files = min(nr_files, IORING_MAX_FILES_TABLE); + table->files = kcalloc(this_files, sizeof(struct file *), + GFP_KERNEL); + if (!table->files) + break; + nr_files -= this_files; + } + + if (i == nr_tables) + return 0; + + for (i = 0; i < nr_tables; i++) { + struct fixed_file_table *table = &ctx->file_table[i]; + kfree(table->files); + } + return 1; +} + static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, unsigned nr_args) { __s32 __user *fds = (__s32 __user *) arg; + unsigned nr_tables; int fd, ret = 0; unsigned i; - if (ctx->user_files) + if (ctx->file_table) return -EBUSY; if (!nr_args) return -EINVAL; if (nr_args > IORING_MAX_FIXED_FILES) return -EMFILE; - ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL); - if (!ctx->user_files) + nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE); + ctx->file_table = kcalloc(nr_tables, sizeof(struct fixed_file_table), + GFP_KERNEL); + if (!ctx->file_table) return -ENOMEM; - for (i = 0; i < nr_args; i++) { + if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) { + kfree(ctx->file_table); + ctx->file_table = NULL; + return -ENOMEM; + } + + for (i = 0; i < nr_args; i++, ctx->nr_user_files++) { + struct fixed_file_table *table; + unsigned index; + ret = -EFAULT; if (copy_from_user(&fd, &fds[i], sizeof(fd))) break; + /* allow sparse sets */ + if (fd == -1) { + ret = 0; + continue; + } - ctx->user_files[i] = fget(fd); + table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT]; + index = i & IORING_FILE_TABLE_MASK; + table->files[index] = fget(fd); ret = -EBADF; - if (!ctx->user_files[i]) + if (!table->files[index]) break; /* * Don't allow io_uring instances to be registered. If UNIX @@ -3131,20 +3673,26 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, * handle it just fine, but there's still no point in allowing * a ring fd as it doesn't support regular read/write anyway. */ - if (ctx->user_files[i]->f_op == &io_uring_fops) { - fput(ctx->user_files[i]); + if (table->files[index]->f_op == &io_uring_fops) { + fput(table->files[index]); break; } - ctx->nr_user_files++; ret = 0; } if (ret) { - for (i = 0; i < ctx->nr_user_files; i++) - fput(ctx->user_files[i]); + for (i = 0; i < ctx->nr_user_files; i++) { + struct file *file; - kfree(ctx->user_files); - ctx->user_files = NULL; + file = io_file_from_index(ctx, i); + if (file) + fput(file); + } + for (i = 0; i < nr_tables; i++) + kfree(ctx->file_table[i].files); + + kfree(ctx->file_table); + ctx->file_table = NULL; ctx->nr_user_files = 0; return ret; } @@ -3156,9 +3704,201 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, return ret; } +static void io_sqe_file_unregister(struct io_ring_ctx *ctx, int index) +{ +#if defined(CONFIG_UNIX) + struct file *file = io_file_from_index(ctx, index); + struct sock *sock = ctx->ring_sock->sk; + struct sk_buff_head list, *head = &sock->sk_receive_queue; + struct sk_buff *skb; + int i; + + __skb_queue_head_init(&list); + + /* + * Find the skb that holds this file in its SCM_RIGHTS. When found, + * remove this entry and rearrange the file array. + */ + skb = skb_dequeue(head); + while (skb) { + struct scm_fp_list *fp; + + fp = UNIXCB(skb).fp; + for (i = 0; i < fp->count; i++) { + int left; + + if (fp->fp[i] != file) + continue; + + unix_notinflight(fp->user, fp->fp[i]); + left = fp->count - 1 - i; + if (left) { + memmove(&fp->fp[i], &fp->fp[i + 1], + left * sizeof(struct file *)); + } + fp->count--; + if (!fp->count) { + kfree_skb(skb); + skb = NULL; + } else { + __skb_queue_tail(&list, skb); + } + fput(file); + file = NULL; + break; + } + + if (!file) + break; + + __skb_queue_tail(&list, skb); + + skb = skb_dequeue(head); + } + + if (skb_peek(&list)) { + spin_lock_irq(&head->lock); + while ((skb = __skb_dequeue(&list)) != NULL) + __skb_queue_tail(head, skb); + spin_unlock_irq(&head->lock); + } +#else + fput(io_file_from_index(ctx, index)); +#endif +} + +static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file, + int index) +{ +#if defined(CONFIG_UNIX) + struct sock *sock = ctx->ring_sock->sk; + struct sk_buff_head *head = &sock->sk_receive_queue; + struct sk_buff *skb; + + /* + * See if we can merge this file into an existing skb SCM_RIGHTS + * file set. If there's no room, fall back to allocating a new skb + * and filling it in. + */ + spin_lock_irq(&head->lock); + skb = skb_peek(head); + if (skb) { + struct scm_fp_list *fpl = UNIXCB(skb).fp; + + if (fpl->count < SCM_MAX_FD) { + __skb_unlink(skb, head); + spin_unlock_irq(&head->lock); + fpl->fp[fpl->count] = get_file(file); + unix_inflight(fpl->user, fpl->fp[fpl->count]); + fpl->count++; + spin_lock_irq(&head->lock); + __skb_queue_head(head, skb); + } else { + skb = NULL; + } + } + spin_unlock_irq(&head->lock); + + if (skb) { + fput(file); + return 0; + } + + return __io_sqe_files_scm(ctx, 1, index); +#else + return 0; +#endif +} + +static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg, + unsigned nr_args) +{ + struct io_uring_files_update up; + __s32 __user *fds; + int fd, i, err; + __u32 done; + + if (!ctx->file_table) + return -ENXIO; + if (!nr_args) + return -EINVAL; + if (copy_from_user(&up, arg, sizeof(up))) + return -EFAULT; + if (check_add_overflow(up.offset, nr_args, &done)) + return -EOVERFLOW; + if (done > ctx->nr_user_files) + return -EINVAL; + + done = 0; + fds = (__s32 __user *) up.fds; + while (nr_args) { + struct fixed_file_table *table; + unsigned index; + + err = 0; + if (copy_from_user(&fd, &fds[done], sizeof(fd))) { + err = -EFAULT; + break; + } + i = array_index_nospec(up.offset, ctx->nr_user_files); + table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT]; + index = i & IORING_FILE_TABLE_MASK; + if (table->files[index]) { + io_sqe_file_unregister(ctx, i); + table->files[index] = NULL; + } + if (fd != -1) { + struct file *file; + + file = fget(fd); + if (!file) { + err = -EBADF; + break; + } + /* + * Don't allow io_uring instances to be registered. If + * UNIX isn't enabled, then this causes a reference + * cycle and this instance can never get freed. If UNIX + * is enabled we'll handle it just fine, but there's + * still no point in allowing a ring fd as it doesn't + * support regular read/write anyway. + */ + if (file->f_op == &io_uring_fops) { + fput(file); + err = -EBADF; + break; + } + table->files[index] = file; + err = io_sqe_file_register(ctx, file, i); + if (err) + break; + } + nr_args--; + done++; + up.offset++; + } + + return done ? done : err; +} + +static void io_put_work(struct io_wq_work *work) +{ + struct io_kiocb *req = container_of(work, struct io_kiocb, work); + + io_put_req(req); +} + +static void io_get_work(struct io_wq_work *work) +{ + struct io_kiocb *req = container_of(work, struct io_kiocb, work); + + refcount_inc(&req->refs); +} + static int io_sq_offload_start(struct io_ring_ctx *ctx, struct io_uring_params *p) { + unsigned concurrency; int ret; init_waitqueue_head(&ctx->sqo_wait); @@ -3202,26 +3942,13 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx, goto err; } - /* Do QD, or 2 * CPUS, whatever is smallest */ - ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq", - WQ_UNBOUND | WQ_FREEZABLE, - min(ctx->sq_entries - 1, 2 * num_online_cpus())); - if (!ctx->sqo_wq[0]) { - ret = -ENOMEM; - goto err; - } - - /* - * This is for buffered writes, where we want to limit the parallelism - * due to file locking in file systems. As "normal" buffered writes - * should parellelize on writeout quite nicely, limit us to having 2 - * pending. This avoids massive contention on the inode when doing - * buffered async writes. - */ - ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq", - WQ_UNBOUND | WQ_FREEZABLE, 2); - if (!ctx->sqo_wq[1]) { - ret = -ENOMEM; + /* Do QD, or 4 * CPUS, whatever is smallest */ + concurrency = min(ctx->sq_entries, 4 * num_online_cpus()); + ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm, ctx->user, + io_get_work, io_put_work); + if (IS_ERR(ctx->io_wq)) { + ret = PTR_ERR(ctx->io_wq); + ctx->io_wq = NULL; goto err; } @@ -3567,6 +4294,8 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx) io_unaccount_mem(ctx->user, ring_pages(ctx->sq_entries, ctx->cq_entries)); free_uid(ctx->user); + kfree(ctx->completions); + kmem_cache_free(req_cachep, ctx->fallback_req); kfree(ctx); } @@ -3605,8 +4334,15 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) io_kill_timeouts(ctx); io_poll_remove_all(ctx); + + if (ctx->io_wq) + io_wq_cancel_all(ctx->io_wq); + io_iopoll_reap_events(ctx); - wait_for_completion(&ctx->ctx_done); + /* if we failed setting up the ctx, we might not have any rings */ + if (ctx->rings) + io_cqring_overflow_flush(ctx, true); + wait_for_completion(&ctx->completions[0]); io_ring_ctx_free(ctx); } @@ -3619,6 +4355,53 @@ static int io_uring_release(struct inode *inode, struct file *file) return 0; } +static void io_uring_cancel_files(struct io_ring_ctx *ctx, + struct files_struct *files) +{ + struct io_kiocb *req; + DEFINE_WAIT(wait); + + while (!list_empty_careful(&ctx->inflight_list)) { + struct io_kiocb *cancel_req = NULL; + + spin_lock_irq(&ctx->inflight_lock); + list_for_each_entry(req, &ctx->inflight_list, inflight_entry) { + if (req->work.files != files) + continue; + /* req is being completed, ignore */ + if (!refcount_inc_not_zero(&req->refs)) + continue; + cancel_req = req; + break; + } + if (cancel_req) + prepare_to_wait(&ctx->inflight_wait, &wait, + TASK_UNINTERRUPTIBLE); + spin_unlock_irq(&ctx->inflight_lock); + + /* We need to keep going until we don't find a matching req */ + if (!cancel_req) + break; + + io_wq_cancel_work(ctx->io_wq, &cancel_req->work); + io_put_req(cancel_req); + schedule(); + } + finish_wait(&ctx->inflight_wait, &wait); +} + +static int io_uring_flush(struct file *file, void *data) +{ + struct io_ring_ctx *ctx = file->private_data; + + io_uring_cancel_files(ctx, data); + if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) { + io_cqring_overflow_flush(ctx, true); + io_wq_cancel_all(ctx->io_wq); + } + return 0; +} + static int io_uring_mmap(struct file *file, struct vm_area_struct *vma) { loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT; @@ -3680,14 +4463,20 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, */ ret = 0; if (ctx->flags & IORING_SETUP_SQPOLL) { + if (!list_empty_careful(&ctx->cq_overflow_list)) + io_cqring_overflow_flush(ctx, false); if (flags & IORING_ENTER_SQ_WAKEUP) wake_up(&ctx->sqo_wait); submitted = to_submit; } else if (to_submit) { - to_submit = min(to_submit, ctx->sq_entries); + struct mm_struct *cur_mm; + to_submit = min(to_submit, ctx->sq_entries); mutex_lock(&ctx->uring_lock); - submitted = io_ring_submit(ctx, to_submit); + /* already have mm, so io_submit_sqes() won't try to grab it */ + cur_mm = ctx->sqo_mm; + submitted = io_submit_sqes(ctx, to_submit, f.file, fd, + &cur_mm, false); mutex_unlock(&ctx->uring_lock); } if (flags & IORING_ENTER_GETEVENTS) { @@ -3710,6 +4499,7 @@ out_fput: static const struct file_operations io_uring_fops = { .release = io_uring_release, + .flush = io_uring_flush, .mmap = io_uring_mmap, .poll = io_uring_poll, .fasync = io_uring_fasync, @@ -3809,10 +4599,23 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p) * Use twice as many entries for the CQ ring. It's possible for the * application to drive a higher depth than the size of the SQ ring, * since the sqes are only used at submission time. This allows for - * some flexibility in overcommitting a bit. + * some flexibility in overcommitting a bit. If the application has + * set IORING_SETUP_CQSIZE, it will have passed in the desired number + * of CQ ring entries manually. */ p->sq_entries = roundup_pow_of_two(entries); - p->cq_entries = 2 * p->sq_entries; + if (p->flags & IORING_SETUP_CQSIZE) { + /* + * If IORING_SETUP_CQSIZE is set, we do the same roundup + * to a power-of-two, if it isn't already. We do NOT impose + * any cq vs sq ring sizing. + */ + if (p->cq_entries < p->sq_entries || p->cq_entries > IORING_MAX_CQ_ENTRIES) + return -EINVAL; + p->cq_entries = roundup_pow_of_two(p->cq_entries); + } else { + p->cq_entries = 2 * p->sq_entries; + } user = get_uid(current_user()); account_mem = !capable(CAP_IPC_LOCK); @@ -3871,7 +4674,8 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p) if (ret < 0) goto err; - p->features = IORING_FEAT_SINGLE_MMAP; + p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP; + trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags); return ret; err: io_ring_ctx_wait_and_kill(ctx); @@ -3897,7 +4701,7 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params) } if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL | - IORING_SETUP_SQ_AFF)) + IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE)) return -EINVAL; ret = io_uring_create(entries, &p); @@ -3941,7 +4745,7 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, * no new references will come in after we've killed the percpu ref. */ mutex_unlock(&ctx->uring_lock); - wait_for_completion(&ctx->ctx_done); + wait_for_completion(&ctx->completions[0]); mutex_lock(&ctx->uring_lock); switch (opcode) { @@ -3963,6 +4767,9 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, break; ret = io_sqe_files_unregister(ctx); break; + case IORING_REGISTER_FILES_UPDATE: + ret = io_sqe_files_update(ctx, arg, nr_args); + break; case IORING_REGISTER_EVENTFD: ret = -EINVAL; if (nr_args != 1) @@ -3981,7 +4788,7 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, } /* bring the ctx back to life */ - reinit_completion(&ctx->ctx_done); + reinit_completion(&ctx->completions[0]); percpu_ref_reinit(&ctx->refs); return ret; } @@ -4006,6 +4813,8 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, mutex_lock(&ctx->uring_lock); ret = __io_uring_register(ctx, opcode, arg, nr_args); mutex_unlock(&ctx->uring_lock); + trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, + ctx->cq_ev_fd != NULL, ret); out_fput: fdput(f); return ret; |