author     Linus Torvalds <torvalds@linux-foundation.org>  2020-06-03 00:42:50 +0200
committer  Linus Torvalds <torvalds@linux-foundation.org>  2020-06-03 00:42:50 +0200
commit     1ee08de1e234d95b5b4f866878b72fceb5372904 (patch)
tree       c6c346f8d25cf309ce5585cb02af6c75b6ca9b75 /fs/io_uring.c
parent     Merge tag 'for-5.8/drivers-2020-06-01' of git://git.kernel.dk/linux-block (diff)
parent     io_uring: fix overflowed reqs cancellation (diff)
Merge tag 'for-5.8/io_uring-2020-06-01' of git://git.kernel.dk/linux-block
Pull io_uring updates from Jens Axboe:
"A relatively quiet round, mostly just fixes and code improvements. In
particular:
- Make statx just use the generic statx handler, instead of
open-coding it. We don't need that anymore, as we always call it
async-safe (Bijan)
- Enable closing of the ring itself. Also fixes O_PATH closure (me)
- Properly name completion members (me)
- Batch reap of dead file registrations (me)
- Allow IORING_OP_POLL with double waitqueues (me)
- Add tee(2) support (Pavel); see the usage sketch after this list
- Remove double off read (Pavel)
- Fix overflow cancellations (Pavel)
- Improve CQ timeouts (Pavel)
- Async defer drain fixes (Pavel)
- Add support for enabling/disabling notifications on a registered
eventfd (Stefano); a toggle sketch also follows this list
- Remove dead state parameter (Xiaoguang)
- Disable SQPOLL submit on dying ctx (Xiaoguang)
- Various code cleanups"
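Of the items in the list above, the tee(2) support is the one that adds a new user-visible opcode: the diff further down wires up IORING_OP_TEE through io_tee_prep()/io_tee() and exports do_tee() for it. Below is a minimal userspace sketch of driving the opcode, assuming a liburing build that already provides io_uring_prep_tee(); the pipe setup is illustrative only and error handling is trimmed.

```c
/* Sketch: duplicate pipe data with IORING_OP_TEE (added by this series).
 * Assumes liburing exposes io_uring_prep_tee(); not taken from this diff. */
#include <liburing.h>
#include <unistd.h>
#include <stdio.h>

int main(void)
{
	struct io_uring ring;
	struct io_uring_sqe *sqe;
	struct io_uring_cqe *cqe;
	int in[2], out[2];
	char buf[8];

	if (pipe(in) || pipe(out) || io_uring_queue_init(4, &ring, 0))
		return 1;

	/* tee(2) copies between two pipes without consuming the input */
	write(in[1], "hello", 5);

	sqe = io_uring_get_sqe(&ring);
	io_uring_prep_tee(sqe, in[0], out[1], 5, 0);
	io_uring_submit(&ring);

	io_uring_wait_cqe(&ring, &cqe);
	printf("tee res=%d\n", cqe->res);	/* expect 5 */
	io_uring_cqe_seen(&ring, cqe);

	/* "hello" is still readable from in[0]; a copy now sits in out[0] */
	read(out[0], buf, sizeof(buf));
	io_uring_queue_exit(&ring);
	return 0;
}
```

As with IORING_OP_SPLICE, SPLICE_F_FD_IN_FIXED may be set in the splice flags so that the input fd indexes the registered-file table; offsets are rejected at prep time because tee(2) only works pipe to pipe.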
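The eventfd enable/disable support works by giving the CQ ring its own flags word: the diff adds cq_flags to struct io_rings, exposes its offset through cq_off.flags, and makes io_should_trigger_evfd() return false while IORING_CQ_EVENTFD_DISABLED is set. The sketch below shows one way an application might mute notifications while it drains completions; it assumes a liburing new enough to carry io_uring_cq_eventfd_toggle()/io_uring_cq_eventfd_enabled(), which simply flip that bit in the shared ring memory.

```c
/* Sketch: pause eventfd CQ notifications while reaping completions.
 * io_uring_cq_eventfd_toggle()/_enabled() are assumed liburing helpers
 * that set/clear IORING_CQ_EVENTFD_DISABLED in the shared cq_flags word. */
#include <liburing.h>
#include <sys/eventfd.h>
#include <unistd.h>

static void drain_cqes(struct io_uring *ring)
{
	struct io_uring_cqe *cqe;
	unsigned head, seen = 0;

	/* stop the kernel from signalling the eventfd for every new CQE */
	io_uring_cq_eventfd_toggle(ring, false);

	io_uring_for_each_cqe(ring, head, cqe) {
		/* ... handle cqe->user_data / cqe->res ... */
		seen++;
	}
	io_uring_cq_advance(ring, seen);

	/* re-arm notifications before going back to sleep on the eventfd */
	if (!io_uring_cq_eventfd_enabled(ring))
		io_uring_cq_eventfd_toggle(ring, true);
}

int main(void)
{
	struct io_uring ring;
	int efd = eventfd(0, 0);

	io_uring_queue_init(8, &ring, 0);
	io_uring_register_eventfd(&ring, efd);	/* IORING_REGISTER_EVENTFD */

	/* ... submit requests, block on efd, then: */
	drain_cqes(&ring);

	io_uring_unregister_eventfd(&ring);
	io_uring_queue_exit(&ring);
	close(efd);
	return 0;
}
```

Because the flag lives in the mmap'ed ring, the toggle is a plain store with no system call, which is the point: a completion loop that is already running can silence redundant wakeups and turn them back on before sleeping.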
* tag 'for-5.8/io_uring-2020-06-01' of git://git.kernel.dk/linux-block: (29 commits)
io_uring: fix overflowed reqs cancellation
io_uring: off timeouts based only on completions
io_uring: move timeouts flushing to a helper
statx: hide interfaces no longer used by io_uring
io_uring: call statx directly
statx: allow system call to be invoked from io_uring
io_uring: add io_statx structure
io_uring: get rid of manual punting in io_close
io_uring: separate DRAIN flushing into a cold path
io_uring: don't re-read sqe->off in timeout_prep()
io_uring: simplify io_timeout locking
io_uring: fix flush req->refs underflow
io_uring: don't submit sqes when ctx->refs is dying
io_uring: async task poll trigger cleanup
io_uring: add tee(2) support
splice: export do_tee()
io_uring: don't repeat valid flag list
io_uring: rename io_file_put()
io_uring: remove req->needs_fixed_files
io_uring: cleanup io_poll_remove_one() logic
...
Diffstat (limited to 'fs/io_uring.c')
-rw-r--r-- | fs/io_uring.c | 752
1 file changed, 408 insertions(+), 344 deletions(-)
diff --git a/fs/io_uring.c b/fs/io_uring.c index bb25e3997d41..9d4bd0d3a080 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -142,7 +142,7 @@ struct io_rings { */ u32 sq_dropped; /* - * Runtime flags + * Runtime SQ flags * * Written by the kernel, shouldn't be modified by the * application. @@ -152,6 +152,13 @@ struct io_rings { */ u32 sq_flags; /* + * Runtime CQ flags + * + * Written by the application, shouldn't be modified by the + * kernel. + */ + u32 cq_flags; + /* * Number of completion events lost because the queue was full; * this should be avoided by the application by making sure * there are not more requests pending than there is space in @@ -191,7 +198,7 @@ struct fixed_file_ref_node { struct list_head node; struct list_head file_list; struct fixed_file_data *file_data; - struct work_struct work; + struct llist_node llist; }; struct fixed_file_data { @@ -279,8 +286,8 @@ struct io_ring_ctx { const struct cred *creds; - /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */ - struct completion *completions; + struct completion ref_comp; + struct completion sq_thread_comp; /* if all else fails... */ struct io_kiocb *fallback_req; @@ -327,6 +334,9 @@ struct io_ring_ctx { struct list_head inflight_list; } ____cacheline_aligned_in_smp; + struct delayed_work file_put_work; + struct llist_head file_put_llist; + struct work_struct exit_work; }; @@ -384,7 +394,8 @@ struct io_timeout { struct file *file; u64 addr; int flags; - u32 count; + u32 off; + u32 target_seq; }; struct io_rw { @@ -415,11 +426,7 @@ struct io_sr_msg { struct io_open { struct file *file; int dfd; - union { - unsigned mask; - }; struct filename *filename; - struct statx __user *buffer; struct open_how how; unsigned long nofile; }; @@ -471,6 +478,15 @@ struct io_provide_buf { __u16 bid; }; +struct io_statx { + struct file *file; + int dfd; + unsigned int mask; + unsigned int flags; + const char __user *filename; + struct statx __user *buffer; +}; + struct io_async_connect { struct sockaddr_storage address; }; @@ -612,11 +628,11 @@ struct io_kiocb { struct io_epoll epoll; struct io_splice splice; struct io_provide_buf pbuf; + struct io_statx statx; }; struct io_async_ctx *io; int cflags; - bool needs_fixed_file; u8 opcode; u16 buf_index; @@ -788,7 +804,6 @@ static const struct io_op_def io_op_defs[] = { .needs_fs = 1, }, [IORING_OP_CLOSE] = { - .needs_file = 1, .file_table = 1, }, [IORING_OP_FILES_UPDATE] = { @@ -847,6 +862,11 @@ static const struct io_op_def io_op_defs[] = { }, [IORING_OP_PROVIDE_BUFFERS] = {}, [IORING_OP_REMOVE_BUFFERS] = {}, + [IORING_OP_TEE] = { + .needs_file = 1, + .hash_reg_file = 1, + .unbound_nonreg_file = 1, + }, }; static void io_wq_submit_work(struct io_wq_work **workptr); @@ -882,11 +902,18 @@ struct sock *io_uring_get_socket(struct file *file) } EXPORT_SYMBOL(io_uring_get_socket); +static void io_file_put_work(struct work_struct *work); + +static inline bool io_async_submit(struct io_ring_ctx *ctx) +{ + return ctx->flags & IORING_SETUP_SQPOLL; +} + static void io_ring_ctx_ref_free(struct percpu_ref *ref) { struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs); - complete(&ctx->completions[0]); + complete(&ctx->ref_comp); } static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) @@ -902,10 +929,6 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) if (!ctx->fallback_req) goto err; - ctx->completions = kmalloc(2 * sizeof(struct completion), GFP_KERNEL); - if (!ctx->completions) - goto err; - /* * Use 5 bits less than the max 
cq entries, that should give us around * 32 entries per hash list if totally full and uniformly spread. @@ -929,8 +952,8 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) init_waitqueue_head(&ctx->sqo_wait); init_waitqueue_head(&ctx->cq_wait); INIT_LIST_HEAD(&ctx->cq_overflow_list); - init_completion(&ctx->completions[0]); - init_completion(&ctx->completions[1]); + init_completion(&ctx->ref_comp); + init_completion(&ctx->sq_thread_comp); idr_init(&ctx->io_buffer_idr); idr_init(&ctx->personality_idr); mutex_init(&ctx->uring_lock); @@ -942,11 +965,12 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) init_waitqueue_head(&ctx->inflight_wait); spin_lock_init(&ctx->inflight_lock); INIT_LIST_HEAD(&ctx->inflight_list); + INIT_DELAYED_WORK(&ctx->file_put_work, io_file_put_work); + init_llist_head(&ctx->file_put_llist); return ctx; err: if (ctx->fallback_req) kmem_cache_free(req_cachep, ctx->fallback_req); - kfree(ctx->completions); kfree(ctx->cancel_hash); kfree(ctx); return NULL; @@ -968,36 +992,6 @@ static inline bool req_need_defer(struct io_kiocb *req) return false; } -static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx) -{ - struct io_kiocb *req; - - req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list); - if (req && !req_need_defer(req)) { - list_del_init(&req->list); - return req; - } - - return NULL; -} - -static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx) -{ - struct io_kiocb *req; - - req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list); - if (req) { - if (req->flags & REQ_F_TIMEOUT_NOSEQ) - return NULL; - if (!__req_need_defer(req)) { - list_del_init(&req->list); - return req; - } - } - - return NULL; -} - static void __io_commit_cqring(struct io_ring_ctx *ctx) { struct io_rings *rings = ctx->rings; @@ -1113,17 +1107,43 @@ static void io_kill_timeouts(struct io_ring_ctx *ctx) spin_unlock_irq(&ctx->completion_lock); } -static void io_commit_cqring(struct io_ring_ctx *ctx) +static void __io_queue_deferred(struct io_ring_ctx *ctx) { - struct io_kiocb *req; + do { + struct io_kiocb *req = list_first_entry(&ctx->defer_list, + struct io_kiocb, list); - while ((req = io_get_timeout_req(ctx)) != NULL) + if (req_need_defer(req)) + break; + list_del_init(&req->list); + io_queue_async_work(req); + } while (!list_empty(&ctx->defer_list)); +} + +static void io_flush_timeouts(struct io_ring_ctx *ctx) +{ + while (!list_empty(&ctx->timeout_list)) { + struct io_kiocb *req = list_first_entry(&ctx->timeout_list, + struct io_kiocb, list); + + if (req->flags & REQ_F_TIMEOUT_NOSEQ) + break; + if (req->timeout.target_seq != ctx->cached_cq_tail + - atomic_read(&ctx->cq_timeouts)) + break; + + list_del_init(&req->list); io_kill_timeout(req); + } +} +static void io_commit_cqring(struct io_ring_ctx *ctx) +{ + io_flush_timeouts(ctx); __io_commit_cqring(ctx); - while ((req = io_get_deferred_req(ctx)) != NULL) - io_queue_async_work(req); + if (unlikely(!list_empty(&ctx->defer_list))) + __io_queue_deferred(ctx); } static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) @@ -1148,6 +1168,8 @@ static inline bool io_should_trigger_evfd(struct io_ring_ctx *ctx) { if (!ctx->cq_ev_fd) return false; + if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED) + return false; if (!ctx->eventfd_async) return true; return io_wq_current_is_worker(); @@ -1984,15 +2006,19 @@ static void io_iopoll_req_issued(struct io_kiocb *req) wake_up(&ctx->sqo_wait); } -static void io_file_put(struct 
io_submit_state *state) +static void __io_state_file_put(struct io_submit_state *state) { - if (state->file) { - int diff = state->has_refs - state->used_refs; + int diff = state->has_refs - state->used_refs; - if (diff) - fput_many(state->file, diff); - state->file = NULL; - } + if (diff) + fput_many(state->file, diff); + state->file = NULL; +} + +static inline void io_state_file_put(struct io_submit_state *state) +{ + if (state->file) + __io_state_file_put(state); } /* @@ -2011,7 +2037,7 @@ static struct file *__io_file_get(struct io_submit_state *state, int fd) state->ios_left--; return state->file; } - io_file_put(state); + __io_state_file_put(state); } state->file = fget_many(fd, state->ios_left); if (!state->file) @@ -2727,7 +2753,8 @@ out_free: return ret; } -static int io_splice_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) +static int __io_splice_prep(struct io_kiocb *req, + const struct io_uring_sqe *sqe) { struct io_splice* sp = &req->splice; unsigned int valid_flags = SPLICE_F_FD_IN_FIXED | SPLICE_F_ALL; @@ -2737,8 +2764,6 @@ static int io_splice_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) return 0; sp->file_in = NULL; - sp->off_in = READ_ONCE(sqe->splice_off_in); - sp->off_out = READ_ONCE(sqe->off); sp->len = READ_ONCE(sqe->len); sp->flags = READ_ONCE(sqe->splice_flags); @@ -2757,6 +2782,46 @@ static int io_splice_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) return 0; } +static int io_tee_prep(struct io_kiocb *req, + const struct io_uring_sqe *sqe) +{ + if (READ_ONCE(sqe->splice_off_in) || READ_ONCE(sqe->off)) + return -EINVAL; + return __io_splice_prep(req, sqe); +} + +static int io_tee(struct io_kiocb *req, bool force_nonblock) +{ + struct io_splice *sp = &req->splice; + struct file *in = sp->file_in; + struct file *out = sp->file_out; + unsigned int flags = sp->flags & ~SPLICE_F_FD_IN_FIXED; + long ret = 0; + + if (force_nonblock) + return -EAGAIN; + if (sp->len) + ret = do_tee(in, out, sp->len, flags); + + io_put_file(req, in, (sp->flags & SPLICE_F_FD_IN_FIXED)); + req->flags &= ~REQ_F_NEED_CLEANUP; + + io_cqring_add_event(req, ret); + if (ret != sp->len) + req_set_fail_links(req); + io_put_req(req); + return 0; +} + +static int io_splice_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) +{ + struct io_splice* sp = &req->splice; + + sp->off_in = READ_ONCE(sqe->splice_off_in); + sp->off_out = READ_ONCE(sqe->off); + return __io_splice_prep(req, sqe); +} + static int io_splice(struct io_kiocb *req, bool force_nonblock) { struct io_splice *sp = &req->splice; @@ -3305,43 +3370,23 @@ static int io_fadvise(struct io_kiocb *req, bool force_nonblock) static int io_statx_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { - const char __user *fname; - unsigned lookup_flags; - int ret; - if (sqe->ioprio || sqe->buf_index) return -EINVAL; if (req->flags & REQ_F_FIXED_FILE) return -EBADF; - if (req->flags & REQ_F_NEED_CLEANUP) - return 0; - - req->open.dfd = READ_ONCE(sqe->fd); - req->open.mask = READ_ONCE(sqe->len); - fname = u64_to_user_ptr(READ_ONCE(sqe->addr)); - req->open.buffer = u64_to_user_ptr(READ_ONCE(sqe->addr2)); - req->open.how.flags = READ_ONCE(sqe->statx_flags); - - if (vfs_stat_set_lookup_flags(&lookup_flags, req->open.how.flags)) - return -EINVAL; - req->open.filename = getname_flags(fname, lookup_flags, NULL); - if (IS_ERR(req->open.filename)) { - ret = PTR_ERR(req->open.filename); - req->open.filename = NULL; - return ret; - } + req->statx.dfd = READ_ONCE(sqe->fd); + req->statx.mask = READ_ONCE(sqe->len); + 
req->statx.filename = u64_to_user_ptr(READ_ONCE(sqe->addr)); + req->statx.buffer = u64_to_user_ptr(READ_ONCE(sqe->addr2)); + req->statx.flags = READ_ONCE(sqe->statx_flags); - req->flags |= REQ_F_NEED_CLEANUP; return 0; } static int io_statx(struct io_kiocb *req, bool force_nonblock) { - struct io_open *ctx = &req->open; - unsigned lookup_flags; - struct path path; - struct kstat stat; + struct io_statx *ctx = &req->statx; int ret; if (force_nonblock) { @@ -3351,29 +3396,9 @@ static int io_statx(struct io_kiocb *req, bool force_nonblock) return -EAGAIN; } - if (vfs_stat_set_lookup_flags(&lookup_flags, ctx->how.flags)) - return -EINVAL; - -retry: - /* filename_lookup() drops it, keep a reference */ - ctx->filename->refcnt++; + ret = do_statx(ctx->dfd, ctx->filename, ctx->flags, ctx->mask, + ctx->buffer); - ret = filename_lookup(ctx->dfd, ctx->filename, lookup_flags, &path, - NULL); - if (ret) - goto err; - - ret = vfs_getattr(&path, &stat, ctx->mask, ctx->how.flags); - path_put(&path); - if (retry_estale(ret, lookup_flags)) { - lookup_flags |= LOOKUP_REVAL; - goto retry; - } - if (!ret) - ret = cp_statx(&stat, ctx->buffer); -err: - putname(ctx->filename); - req->flags &= ~REQ_F_NEED_CLEANUP; if (ret < 0) req_set_fail_links(req); io_cqring_add_event(req, ret); @@ -3396,10 +3421,6 @@ static int io_close_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) return -EBADF; req->close.fd = READ_ONCE(sqe->fd); - if (req->file->f_op == &io_uring_fops || - req->close.fd == req->ctx->ring_fd) - return -EBADF; - return 0; } @@ -3432,21 +3453,14 @@ static int io_close(struct io_kiocb *req, bool force_nonblock) req->close.put_file = NULL; ret = __close_fd_get_file(req->close.fd, &req->close.put_file); if (ret < 0) - return ret; + return (ret == -ENOENT) ? -EBADF : ret; /* if the file has a flush method, be safe and punt to async */ if (req->close.put_file->f_op->flush && force_nonblock) { - /* submission ref will be dropped, take it for async */ - refcount_inc(&req->refs); - + /* avoid grabbing files - we don't need the files */ + req->flags |= REQ_F_NO_FILE_TABLE | REQ_F_MUST_PUNT; req->work.func = io_close_finish; - /* - * Do manual async queue here to avoid grabbing files - we don't - * need the files, and it'll cause io_close_finish() to close - * the file again and cause a double CQE entry for this request - */ - io_queue_async_work(req); - return 0; + return -EAGAIN; } /* @@ -4096,27 +4110,6 @@ struct io_poll_table { int error; }; -static void __io_queue_proc(struct io_poll_iocb *poll, struct io_poll_table *pt, - struct wait_queue_head *head) -{ - if (unlikely(poll->head)) { - pt->error = -EINVAL; - return; - } - - pt->error = 0; - poll->head = head; - add_wait_queue(head, &poll->wait); -} - -static void io_async_queue_proc(struct file *file, struct wait_queue_head *head, - struct poll_table_struct *p) -{ - struct io_poll_table *pt = container_of(p, struct io_poll_table, pt); - - __io_queue_proc(&pt->req->apoll->poll, pt, head); -} - static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll, __poll_t mask, task_work_func_t func) { @@ -4170,12 +4163,150 @@ static bool io_poll_rewait(struct io_kiocb *req, struct io_poll_iocb *poll) return false; } +static void io_poll_remove_double(struct io_kiocb *req) +{ + struct io_poll_iocb *poll = (struct io_poll_iocb *) req->io; + + lockdep_assert_held(&req->ctx->completion_lock); + + if (poll && poll->head) { + struct wait_queue_head *head = poll->head; + + spin_lock(&head->lock); + list_del_init(&poll->wait.entry); + if 
(poll->wait.private) + refcount_dec(&req->refs); + poll->head = NULL; + spin_unlock(&head->lock); + } +} + +static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error) +{ + struct io_ring_ctx *ctx = req->ctx; + + io_poll_remove_double(req); + req->poll.done = true; + io_cqring_fill_event(req, error ? error : mangle_poll(mask)); + io_commit_cqring(ctx); +} + +static void io_poll_task_handler(struct io_kiocb *req, struct io_kiocb **nxt) +{ + struct io_ring_ctx *ctx = req->ctx; + + if (io_poll_rewait(req, &req->poll)) { + spin_unlock_irq(&ctx->completion_lock); + return; + } + + hash_del(&req->hash_node); + io_poll_complete(req, req->result, 0); + req->flags |= REQ_F_COMP_LOCKED; + io_put_req_find_next(req, nxt); + spin_unlock_irq(&ctx->completion_lock); + + io_cqring_ev_posted(ctx); +} + +static void io_poll_task_func(struct callback_head *cb) +{ + struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work); + struct io_kiocb *nxt = NULL; + + io_poll_task_handler(req, &nxt); + if (nxt) { + struct io_ring_ctx *ctx = nxt->ctx; + + mutex_lock(&ctx->uring_lock); + __io_queue_sqe(nxt, NULL); + mutex_unlock(&ctx->uring_lock); + } +} + +static int io_poll_double_wake(struct wait_queue_entry *wait, unsigned mode, + int sync, void *key) +{ + struct io_kiocb *req = wait->private; + struct io_poll_iocb *poll = (struct io_poll_iocb *) req->io; + __poll_t mask = key_to_poll(key); + + /* for instances that support it check for an event match first: */ + if (mask && !(mask & poll->events)) + return 0; + + if (req->poll.head) { + bool done; + + spin_lock(&req->poll.head->lock); + done = list_empty(&req->poll.wait.entry); + if (!done) + list_del_init(&req->poll.wait.entry); + spin_unlock(&req->poll.head->lock); + if (!done) + __io_async_wake(req, poll, mask, io_poll_task_func); + } + refcount_dec(&req->refs); + return 1; +} + +static void io_init_poll_iocb(struct io_poll_iocb *poll, __poll_t events, + wait_queue_func_t wake_func) +{ + poll->head = NULL; + poll->done = false; + poll->canceled = false; + poll->events = events; + INIT_LIST_HEAD(&poll->wait.entry); + init_waitqueue_func_entry(&poll->wait, wake_func); +} + +static void __io_queue_proc(struct io_poll_iocb *poll, struct io_poll_table *pt, + struct wait_queue_head *head) +{ + struct io_kiocb *req = pt->req; + + /* + * If poll->head is already set, it's because the file being polled + * uses multiple waitqueues for poll handling (eg one for read, one + * for write). Setup a separate io_poll_iocb if this happens. 
+ */ + if (unlikely(poll->head)) { + /* already have a 2nd entry, fail a third attempt */ + if (req->io) { + pt->error = -EINVAL; + return; + } + poll = kmalloc(sizeof(*poll), GFP_ATOMIC); + if (!poll) { + pt->error = -ENOMEM; + return; + } + io_init_poll_iocb(poll, req->poll.events, io_poll_double_wake); + refcount_inc(&req->refs); + poll->wait.private = req; + req->io = (void *) poll; + } + + pt->error = 0; + poll->head = head; + add_wait_queue(head, &poll->wait); +} + +static void io_async_queue_proc(struct file *file, struct wait_queue_head *head, + struct poll_table_struct *p) +{ + struct io_poll_table *pt = container_of(p, struct io_poll_table, pt); + + __io_queue_proc(&pt->req->apoll->poll, pt, head); +} + static void io_async_task_func(struct callback_head *cb) { struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work); struct async_poll *apoll = req->apoll; struct io_ring_ctx *ctx = req->ctx; - bool canceled; + bool canceled = false; trace_io_uring_task_run(req->ctx, req->opcode, req->user_data); @@ -4184,34 +4315,33 @@ static void io_async_task_func(struct callback_head *cb) return; } - if (hash_hashed(&req->hash_node)) + /* If req is still hashed, it cannot have been canceled. Don't check. */ + if (hash_hashed(&req->hash_node)) { hash_del(&req->hash_node); - - canceled = READ_ONCE(apoll->poll.canceled); - if (canceled) { - io_cqring_fill_event(req, -ECANCELED); - io_commit_cqring(ctx); + } else { + canceled = READ_ONCE(apoll->poll.canceled); + if (canceled) { + io_cqring_fill_event(req, -ECANCELED); + io_commit_cqring(ctx); + } } spin_unlock_irq(&ctx->completion_lock); /* restore ->work in case we need to retry again */ memcpy(&req->work, &apoll->work, sizeof(req->work)); + kfree(apoll); - if (canceled) { - kfree(apoll); + if (!canceled) { + __set_current_state(TASK_RUNNING); + mutex_lock(&ctx->uring_lock); + __io_queue_sqe(req, NULL); + mutex_unlock(&ctx->uring_lock); + } else { io_cqring_ev_posted(ctx); req_set_fail_links(req); io_double_put_req(req); - return; } - - __set_current_state(TASK_RUNNING); - mutex_lock(&ctx->uring_lock); - __io_queue_sqe(req, NULL); - mutex_unlock(&ctx->uring_lock); - - kfree(apoll); } static int io_async_wake(struct wait_queue_entry *wait, unsigned mode, int sync, @@ -4245,18 +4375,13 @@ static __poll_t __io_arm_poll_handler(struct io_kiocb *req, bool cancel = false; poll->file = req->file; - poll->head = NULL; - poll->done = poll->canceled = false; - poll->events = mask; + io_init_poll_iocb(poll, mask, wake_func); + poll->wait.private = req; ipt->pt._key = mask; ipt->req = req; ipt->error = -EINVAL; - INIT_LIST_HEAD(&poll->wait.entry); - init_waitqueue_func_entry(&poll->wait, wake_func); - poll->wait.private = req; - mask = vfs_poll(req->file, &ipt->pt) & poll->events; spin_lock_irq(&ctx->completion_lock); @@ -4287,6 +4412,7 @@ static bool io_arm_poll_handler(struct io_kiocb *req) struct async_poll *apoll; struct io_poll_table ipt; __poll_t mask, ret; + bool had_io; if (!req->file || !file_can_poll(req->file)) return false; @@ -4301,6 +4427,7 @@ static bool io_arm_poll_handler(struct io_kiocb *req) req->flags |= REQ_F_POLLED; memcpy(&apoll->work, &req->work, sizeof(req->work)); + had_io = req->io != NULL; get_task_struct(current); req->task = current; @@ -4320,7 +4447,9 @@ static bool io_arm_poll_handler(struct io_kiocb *req) io_async_wake); if (ret) { ipt.error = 0; - apoll->poll.done = true; + /* only remove double add if we did it here */ + if (!had_io) + io_poll_remove_double(req); spin_unlock_irq(&ctx->completion_lock); 
memcpy(&req->work, &apoll->work, sizeof(req->work)); kfree(apoll); @@ -4344,32 +4473,32 @@ static bool __io_poll_remove_one(struct io_kiocb *req, do_complete = true; } spin_unlock(&poll->head->lock); + hash_del(&req->hash_node); return do_complete; } static bool io_poll_remove_one(struct io_kiocb *req) { - struct async_poll *apoll = NULL; bool do_complete; if (req->opcode == IORING_OP_POLL_ADD) { + io_poll_remove_double(req); do_complete = __io_poll_remove_one(req, &req->poll); } else { - apoll = req->apoll; + struct async_poll *apoll = req->apoll; + /* non-poll requests have submit ref still */ - do_complete = __io_poll_remove_one(req, &req->apoll->poll); - if (do_complete) + do_complete = __io_poll_remove_one(req, &apoll->poll); + if (do_complete) { io_put_req(req); - } - - hash_del(&req->hash_node); - - if (do_complete && apoll) { - /* - * restore ->work because we need to call io_req_work_drop_env. - */ - memcpy(&req->work, &apoll->work, sizeof(req->work)); - kfree(apoll); + /* + * restore ->work because we will call + * io_req_work_drop_env below when dropping the + * final reference. + */ + memcpy(&req->work, &apoll->work, sizeof(req->work)); + kfree(apoll); + } } if (do_complete) { @@ -4454,49 +4583,6 @@ static int io_poll_remove(struct io_kiocb *req) return 0; } -static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error) -{ - struct io_ring_ctx *ctx = req->ctx; - - req->poll.done = true; - io_cqring_fill_event(req, error ? error : mangle_poll(mask)); - io_commit_cqring(ctx); -} - -static void io_poll_task_handler(struct io_kiocb *req, struct io_kiocb **nxt) -{ - struct io_ring_ctx *ctx = req->ctx; - struct io_poll_iocb *poll = &req->poll; - - if (io_poll_rewait(req, poll)) { - spin_unlock_irq(&ctx->completion_lock); - return; - } - - hash_del(&req->hash_node); - io_poll_complete(req, req->result, 0); - req->flags |= REQ_F_COMP_LOCKED; - io_put_req_find_next(req, nxt); - spin_unlock_irq(&ctx->completion_lock); - - io_cqring_ev_posted(ctx); -} - -static void io_poll_task_func(struct callback_head *cb) -{ - struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work); - struct io_kiocb *nxt = NULL; - - io_poll_task_handler(req, &nxt); - if (nxt) { - struct io_ring_ctx *ctx = nxt->ctx; - - mutex_lock(&ctx->uring_lock); - __io_queue_sqe(nxt, NULL); - mutex_unlock(&ctx->uring_lock); - } -} - static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, void *key) { @@ -4576,20 +4662,8 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer) * We could be racing with timeout deletion. If the list is empty, * then timeout lookup already found it and will be handling it. */ - if (!list_empty(&req->list)) { - struct io_kiocb *prev; - - /* - * Adjust the reqs sequence before the current one because it - * will consume a slot in the cq_ring and the cq_tail - * pointer will be increased, otherwise other timeout reqs may - * return in advance without waiting for enough wait_nr. 
- */ - prev = req; - list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list) - prev->sequence++; + if (!list_empty(&req->list)) list_del_init(&req->list); - } io_cqring_fill_event(req, -ETIME); io_commit_cqring(ctx); @@ -4669,18 +4743,19 @@ static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, { struct io_timeout_data *data; unsigned flags; + u32 off = READ_ONCE(sqe->off); if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) return -EINVAL; if (sqe->ioprio || sqe->buf_index || sqe->len != 1) return -EINVAL; - if (sqe->off && is_timeout_link) + if (off && is_timeout_link) return -EINVAL; flags = READ_ONCE(sqe->timeout_flags); if (flags & ~IORING_TIMEOUT_ABS) return -EINVAL; - req->timeout.count = READ_ONCE(sqe->off); + req->timeout.off = off; if (!req->io && io_alloc_async_ctx(req)) return -ENOMEM; @@ -4704,68 +4779,39 @@ static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, static int io_timeout(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; - struct io_timeout_data *data; + struct io_timeout_data *data = &req->io->timeout; struct list_head *entry; - unsigned span = 0; - u32 count = req->timeout.count; - u32 seq = req->sequence; + u32 tail, off = req->timeout.off; - data = &req->io->timeout; + spin_lock_irq(&ctx->completion_lock); /* * sqe->off holds how many events that need to occur for this * timeout event to be satisfied. If it isn't set, then this is * a pure timeout request, sequence isn't used. */ - if (!count) { + if (!off) { req->flags |= REQ_F_TIMEOUT_NOSEQ; - spin_lock_irq(&ctx->completion_lock); entry = ctx->timeout_list.prev; goto add; } - req->sequence = seq + count; + tail = ctx->cached_cq_tail - atomic_read(&ctx->cq_timeouts); + req->timeout.target_seq = tail + off; /* * Insertion sort, ensuring the first entry in the list is always * the one we need first. */ - spin_lock_irq(&ctx->completion_lock); list_for_each_prev(entry, &ctx->timeout_list) { struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list); - unsigned nxt_seq; - long long tmp, tmp_nxt; - u32 nxt_offset = nxt->timeout.count; if (nxt->flags & REQ_F_TIMEOUT_NOSEQ) continue; - - /* - * Since seq + count can overflow, use type long - * long to store it. - */ - tmp = (long long)seq + count; - nxt_seq = nxt->sequence - nxt_offset; - tmp_nxt = (long long)nxt_seq + nxt_offset; - - /* - * cached_sq_head may overflow, and it will never overflow twice - * once there is some timeout req still be valid. - */ - if (seq < nxt_seq) - tmp += UINT_MAX; - - if (tmp > tmp_nxt) + /* nxt.seq is behind @tail, otherwise would've been completed */ + if (off >= nxt->timeout.target_seq - tail) break; - - /* - * Sequence of reqs after the insert one and itself should - * be adjusted because each timeout req consumes a slot. 
- */ - span++; - nxt->sequence++; } - req->sequence -= span; add: list_add(&req->list, entry); data->timer.function = io_timeout_fn; @@ -4994,6 +5040,9 @@ static int io_req_defer_prep(struct io_kiocb *req, case IORING_OP_REMOVE_BUFFERS: ret = io_remove_buffers_prep(req, sqe); break; + case IORING_OP_TEE: + ret = io_tee_prep(req, sqe); + break; default: printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n", req->opcode); @@ -5064,10 +5113,9 @@ static void io_cleanup_req(struct io_kiocb *req) break; case IORING_OP_OPENAT: case IORING_OP_OPENAT2: - case IORING_OP_STATX: - putname(req->open.filename); break; case IORING_OP_SPLICE: + case IORING_OP_TEE: io_put_file(req, req->splice.file_in, (req->splice.flags & SPLICE_F_FD_IN_FIXED)); break; @@ -5298,6 +5346,14 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, } ret = io_remove_buffers(req, force_nonblock); break; + case IORING_OP_TEE: + if (sqe) { + ret = io_tee_prep(req, sqe); + if (ret < 0) + break; + } + ret = io_tee(req, force_nonblock); + break; default: ret = -EINVAL; break; @@ -5367,7 +5423,7 @@ static inline struct file *io_file_from_index(struct io_ring_ctx *ctx, struct fixed_file_table *table; table = &ctx->file_data->table[index >> IORING_FILE_TABLE_SHIFT]; - return table->files[index & IORING_FILE_TABLE_MASK];; + return table->files[index & IORING_FILE_TABLE_MASK]; } static int io_file_get(struct io_submit_state *state, struct io_kiocb *req, @@ -5403,7 +5459,7 @@ static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req, bool fixed; fixed = (req->flags & REQ_F_FIXED_FILE) != 0; - if (unlikely(!fixed && req->needs_fixed_file)) + if (unlikely(!fixed && io_async_submit(req->ctx))) return -EBADF; return io_file_get(state, req, fd, &req->file, fixed); @@ -5638,7 +5694,7 @@ static inline void io_queue_link_head(struct io_kiocb *req) } static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, - struct io_submit_state *state, struct io_kiocb **link) + struct io_kiocb **link) { struct io_ring_ctx *ctx = req->ctx; int ret; @@ -5711,7 +5767,7 @@ static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, static void io_submit_state_end(struct io_submit_state *state) { blk_finish_plug(&state->plug); - io_file_put(state); + io_state_file_put(state); if (state->free_reqs) kmem_cache_free_bulk(req_cachep, state->free_reqs, state->reqs); } @@ -5782,7 +5838,7 @@ static inline void io_consume_sqe(struct io_ring_ctx *ctx) static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, const struct io_uring_sqe *sqe, - struct io_submit_state *state, bool async) + struct io_submit_state *state) { unsigned int sqe_flags; int id; @@ -5803,7 +5859,6 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, refcount_set(&req->refs, 2); req->task = NULL; req->result = 0; - req->needs_fixed_file = async; INIT_IO_WORK(&req->work, io_wq_submit_work); if (unlikely(req->opcode >= IORING_OP_LAST)) @@ -5833,9 +5888,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, } /* same numerical values with corresponding REQ_F_*, safe to copy */ - req->flags |= sqe_flags & (IOSQE_IO_DRAIN | IOSQE_IO_HARDLINK | - IOSQE_ASYNC | IOSQE_FIXED_FILE | - IOSQE_BUFFER_SELECT | IOSQE_IO_LINK); + req->flags |= sqe_flags; if (!io_op_defs[req->opcode].needs_file) return 0; @@ -5844,7 +5897,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, } static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr, - struct file 
*ring_file, int ring_fd, bool async) + struct file *ring_file, int ring_fd) { struct io_submit_state state, *statep = NULL; struct io_kiocb *link = NULL; @@ -5888,7 +5941,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr, break; } - err = io_init_req(ctx, req, sqe, statep, async); + err = io_init_req(ctx, req, sqe, statep); io_consume_sqe(ctx); /* will complete beyond this point, count as submitted */ submitted++; @@ -5901,8 +5954,8 @@ fail_req: } trace_io_uring_submit_sqe(ctx, req->opcode, req->user_data, - true, async); - err = io_submit_sqe(req, sqe, statep, &link); + true, io_async_submit(ctx)); + err = io_submit_sqe(req, sqe, &link); if (err) goto fail_req; } @@ -5942,7 +5995,7 @@ static int io_sq_thread(void *data) unsigned long timeout; int ret = 0; - complete(&ctx->completions[1]); + complete(&ctx->sq_thread_comp); old_fs = get_fs(); set_fs(USER_DS); @@ -6041,7 +6094,8 @@ static int io_sq_thread(void *data) } mutex_lock(&ctx->uring_lock); - ret = io_submit_sqes(ctx, to_submit, NULL, -1, true); + if (likely(!percpu_ref_is_dying(&ctx->refs))) + ret = io_submit_sqes(ctx, to_submit, NULL, -1); mutex_unlock(&ctx->uring_lock); timeout = jiffies + ctx->sq_thread_idle; } @@ -6189,22 +6243,22 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx) struct fixed_file_data *data = ctx->file_data; struct fixed_file_ref_node *ref_node = NULL; unsigned nr_tables, i; - unsigned long flags; if (!data) return -ENXIO; - spin_lock_irqsave(&data->lock, flags); + spin_lock(&data->lock); if (!list_empty(&data->ref_list)) ref_node = list_first_entry(&data->ref_list, struct fixed_file_ref_node, node); - spin_unlock_irqrestore(&data->lock, flags); + spin_unlock(&data->lock); if (ref_node) percpu_ref_kill(&ref_node->refs); percpu_ref_kill(&data->refs); /* wait for all refs nodes to complete */ + flush_delayed_work(&ctx->file_put_work); wait_for_completion(&data->done); __io_sqe_files_unregister(ctx); @@ -6222,7 +6276,7 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx) static void io_sq_thread_stop(struct io_ring_ctx *ctx) { if (ctx->sqo_thread) { - wait_for_completion(&ctx->completions[1]); + wait_for_completion(&ctx->sq_thread_comp); /* * The park is a bit of a work-around, without it we get * warning spews on shutdown with SQPOLL set and affinity @@ -6435,40 +6489,63 @@ struct io_file_put { struct file *file; }; -static void io_file_put_work(struct work_struct *work) +static void __io_file_put_work(struct fixed_file_ref_node *ref_node) { - struct fixed_file_ref_node *ref_node; - struct fixed_file_data *file_data; - struct io_ring_ctx *ctx; + struct fixed_file_data *file_data = ref_node->file_data; + struct io_ring_ctx *ctx = file_data->ctx; struct io_file_put *pfile, *tmp; - unsigned long flags; - - ref_node = container_of(work, struct fixed_file_ref_node, work); - file_data = ref_node->file_data; - ctx = file_data->ctx; list_for_each_entry_safe(pfile, tmp, &ref_node->file_list, list) { - list_del_init(&pfile->list); + list_del(&pfile->list); io_ring_file_put(ctx, pfile->file); kfree(pfile); } - spin_lock_irqsave(&file_data->lock, flags); - list_del_init(&ref_node->node); - spin_unlock_irqrestore(&file_data->lock, flags); + spin_lock(&file_data->lock); + list_del(&ref_node->node); + spin_unlock(&file_data->lock); percpu_ref_exit(&ref_node->refs); kfree(ref_node); percpu_ref_put(&file_data->refs); } +static void io_file_put_work(struct work_struct *work) +{ + struct io_ring_ctx *ctx; + struct llist_node *node; + + ctx = container_of(work, struct io_ring_ctx, 
file_put_work.work); + node = llist_del_all(&ctx->file_put_llist); + + while (node) { + struct fixed_file_ref_node *ref_node; + struct llist_node *next = node->next; + + ref_node = llist_entry(node, struct fixed_file_ref_node, llist); + __io_file_put_work(ref_node); + node = next; + } +} + static void io_file_data_ref_zero(struct percpu_ref *ref) { struct fixed_file_ref_node *ref_node; + struct io_ring_ctx *ctx; + bool first_add; + int delay = HZ; ref_node = container_of(ref, struct fixed_file_ref_node, refs); + ctx = ref_node->file_data->ctx; + + if (percpu_ref_is_dying(&ctx->file_data->refs)) + delay = 0; - queue_work(system_wq, &ref_node->work); + first_add = llist_add(&ref_node->llist, &ctx->file_put_llist); + if (!delay) + mod_delayed_work(system_wq, &ctx->file_put_work, 0); + else if (first_add) + queue_delayed_work(system_wq, &ctx->file_put_work, delay); } static struct fixed_file_ref_node *alloc_fixed_file_ref_node( @@ -6487,10 +6564,8 @@ static struct fixed_file_ref_node *alloc_fixed_file_ref_node( } INIT_LIST_HEAD(&ref_node->node); INIT_LIST_HEAD(&ref_node->file_list); - INIT_WORK(&ref_node->work, io_file_put_work); ref_node->file_data = ctx->file_data; return ref_node; - } static void destroy_fixed_file_ref_node(struct fixed_file_ref_node *ref_node) @@ -6508,7 +6583,6 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, int fd, ret = 0; unsigned i; struct fixed_file_ref_node *ref_node; - unsigned long flags; if (ctx->file_data) return -EBUSY; @@ -6616,9 +6690,9 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, } ctx->file_data->cur_refs = &ref_node->refs; - spin_lock_irqsave(&ctx->file_data->lock, flags); + spin_lock(&ctx->file_data->lock); list_add(&ref_node->node, &ctx->file_data->ref_list); - spin_unlock_irqrestore(&ctx->file_data->lock, flags); + spin_unlock(&ctx->file_data->lock); percpu_ref_get(&ctx->file_data->refs); return ret; } @@ -6694,7 +6768,6 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx, __s32 __user *fds; int fd, i, err; __u32 done; - unsigned long flags; bool needs_switch = false; if (check_add_overflow(up->offset, nr_args, &done)) @@ -6759,10 +6832,10 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx, if (needs_switch) { percpu_ref_kill(data->cur_refs); - spin_lock_irqsave(&data->lock, flags); + spin_lock(&data->lock); list_add(&ref_node->node, &data->ref_list); data->cur_refs = &ref_node->refs; - spin_unlock_irqrestore(&data->lock, flags); + spin_unlock(&data->lock); percpu_ref_get(&ctx->file_data->refs); } else destroy_fixed_file_ref_node(ref_node); @@ -7250,7 +7323,6 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx) ring_pages(ctx->sq_entries, ctx->cq_entries)); free_uid(ctx->user); put_cred(ctx->creds); - kfree(ctx->completions); kfree(ctx->cancel_hash); kmem_cache_free(req_cachep, ctx->fallback_req); kfree(ctx); @@ -7302,7 +7374,7 @@ static void io_ring_exit_work(struct work_struct *work) if (ctx->rings) io_cqring_overflow_flush(ctx, true); - wait_for_completion(&ctx->completions[0]); + wait_for_completion(&ctx->ref_comp); io_ring_ctx_free(ctx); } @@ -7312,16 +7384,6 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) percpu_ref_kill(&ctx->refs); mutex_unlock(&ctx->uring_lock); - /* - * Wait for sq thread to idle, if we have one. It won't spin on new - * work after we've killed the ctx ref above. This is important to do - * before we cancel existing commands, as the thread could otherwise - * be queueing new work post that. 
If that's work we need to cancel, - * it could cause shutdown to hang. - */ - while (ctx->sqo_thread && !wq_has_sleeper(&ctx->sqo_wait)) - cond_resched(); - io_kill_timeouts(ctx); io_poll_remove_all(ctx); @@ -7390,14 +7452,15 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx, * all we had, then we're done with this request. */ if (refcount_sub_and_test(2, &cancel_req->refs)) { - io_put_req(cancel_req); + io_free_req(cancel_req); finish_wait(&ctx->inflight_wait, &wait); continue; } + } else { + io_wq_cancel_work(ctx->io_wq, &cancel_req->work); + io_put_req(cancel_req); } - io_wq_cancel_work(ctx->io_wq, &cancel_req->work); - io_put_req(cancel_req); schedule(); finish_wait(&ctx->inflight_wait, &wait); } @@ -7530,7 +7593,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, submitted = to_submit; } else if (to_submit) { mutex_lock(&ctx->uring_lock); - submitted = io_submit_sqes(ctx, to_submit, f.file, fd, false); + submitted = io_submit_sqes(ctx, to_submit, f.file, fd); mutex_unlock(&ctx->uring_lock); if (submitted != to_submit) @@ -7841,6 +7904,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries); p->cq_off.overflow = offsetof(struct io_rings, cq_overflow); p->cq_off.cqes = offsetof(struct io_rings, cqes); + p->cq_off.flags = offsetof(struct io_rings, cq_flags); p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP | IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS | @@ -8001,7 +8065,7 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, * after we've killed the percpu ref. */ mutex_unlock(&ctx->uring_lock); - ret = wait_for_completion_interruptible(&ctx->completions[0]); + ret = wait_for_completion_interruptible(&ctx->ref_comp); mutex_lock(&ctx->uring_lock); if (ret) { percpu_ref_resurrect(&ctx->refs); @@ -8078,7 +8142,7 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, /* bring the ctx back to life */ percpu_ref_reinit(&ctx->refs); out: - reinit_completion(&ctx->completions[0]); + reinit_completion(&ctx->ref_comp); } return ret; } |
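For reference, the statx conversion in the diff above replaces the open-coded filename_lookup()/vfs_getattr() sequence with a single do_statx() call and a dedicated struct io_statx, so IORING_OP_STATX now behaves exactly like the syscall and needs no getname()/putname() cleanup. Nothing changes for userspace; a short sketch of issuing the op through liburing follows (io_uring_prep_statx() is liburing's helper, not something introduced by this diff).

```c
/* Sketch: asynchronous statx via IORING_OP_STATX, now serviced by
 * do_statx() in the kernel rather than an open-coded path walk. */
#include <liburing.h>
#include <linux/stat.h>
#include <fcntl.h>
#include <stdio.h>

int main(void)
{
	struct io_uring ring;
	struct io_uring_sqe *sqe;
	struct io_uring_cqe *cqe;
	struct statx stx;

	if (io_uring_queue_init(4, &ring, 0))
		return 1;

	sqe = io_uring_get_sqe(&ring);
	io_uring_prep_statx(sqe, AT_FDCWD, "/etc/hostname", 0,
			    STATX_SIZE | STATX_MTIME, &stx);
	io_uring_submit(&ring);

	io_uring_wait_cqe(&ring, &cqe);
	if (cqe->res == 0)
		printf("size=%llu\n", (unsigned long long)stx.stx_size);
	io_uring_cqe_seen(&ring, cqe);
	io_uring_queue_exit(&ring);
	return 0;
}
```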
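The timeout rework is the other behavioural change worth spelling out: instead of maintaining per-request sequence counters and adjusting them as neighbours complete, each counted timeout now records target_seq = current CQ tail + sqe->off, and io_flush_timeouts() fires it once that many non-timeout completions have been posted. That is exactly the semantic a "wait for N completions or a deadline" loop wants; the sketch below uses liburing's io_uring_wait_cqes(), which arms such a counted IORING_OP_TIMEOUT internally (the helper and the 8/500ms figures are illustrative, not part of this diff).

```c
/* Sketch: reap up to a batch of completions, bounded by a 500ms timer.
 * With this series the timeout's count tracks completions exactly. */
#include <liburing.h>
#include <errno.h>

int reap_batch(struct io_uring *ring)
{
	struct __kernel_timespec ts = { .tv_nsec = 500 * 1000 * 1000 };
	struct io_uring_cqe *cqe;
	unsigned head, seen = 0;
	int ret;

	/* block until 8 CQEs are available or the timer expires */
	ret = io_uring_wait_cqes(ring, &cqe, 8, &ts, NULL);
	if (ret < 0 && ret != -ETIME)	/* -ETIME: the timer fired first */
		return ret;

	io_uring_for_each_cqe(ring, head, cqe) {
		/* ... handle cqe ... */
		seen++;
	}
	io_uring_cq_advance(ring, seen);
	return seen;
}
```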