diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2022-12-13 19:40:31 +0100 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2022-12-13 19:40:31 +0100 |
commit | 96f7e448b9f4546ffd0356ffceb2b9586777f316 (patch) | |
tree | 099ba6679727c3d5d6e2e575ac994cf07019f68c /io_uring | |
parent | Merge tag 'for-6.2/io_uring-2022-12-08' of git://git.kernel.dk/linux (diff) | |
parent | io_uring/msg_ring: flag target ring as having task_work, if needed (diff) | |
download | linux-96f7e448b9f4546ffd0356ffceb2b9586777f316.tar.xz linux-96f7e448b9f4546ffd0356ffceb2b9586777f316.zip |
Merge tag 'for-6.2/io_uring-next-2022-12-08' of git://git.kernel.dk/linux
Pull io_uring updates part two from Jens Axboe:
- Misc fixes (me, Lin)
- Series from Pavel extending the single task exclusive ring mode,
yielding nice improvements for the common case of having a single
ring per thread (Pavel)
- Cleanup for MSG_RING, removing our IOPOLL hack (Pavel)
- Further poll cleanups and fixes (Pavel)
- Misc cleanups and fixes (Pavel)
* tag 'for-6.2/io_uring-next-2022-12-08' of git://git.kernel.dk/linux: (22 commits)
io_uring/msg_ring: flag target ring as having task_work, if needed
io_uring: skip spinlocking for ->task_complete
io_uring: do msg_ring in target task via tw
io_uring: extract a io_msg_install_complete helper
io_uring: get rid of double locking
io_uring: never run tw and fallback in parallel
io_uring: use tw for putting rsrc
io_uring: force multishot CQEs into task context
io_uring: complete all requests in task context
io_uring: don't check overflow flush failures
io_uring: skip overflow CQE posting for dying ring
io_uring: improve io_double_lock_ctx fail handling
io_uring: dont remove file from msg_ring reqs
io_uring: reshuffle issue_flags
io_uring: don't reinstall quiesce node for each tw
io_uring: improve rsrc quiesce refs checks
io_uring: don't raw spin unlock to match cq_lock
io_uring: combine poll tw handlers
io_uring: improve poll warning handling
io_uring: remove ctx variable in io_poll_check_events
...
Diffstat (limited to 'io_uring')
-rw-r--r-- | io_uring/io_uring.c | 167 | ||||
-rw-r--r-- | io_uring/io_uring.h | 15 | ||||
-rw-r--r-- | io_uring/msg_ring.c | 164 | ||||
-rw-r--r-- | io_uring/msg_ring.h | 1 | ||||
-rw-r--r-- | io_uring/net.c | 21 | ||||
-rw-r--r-- | io_uring/opdef.c | 8 | ||||
-rw-r--r-- | io_uring/opdef.h | 2 | ||||
-rw-r--r-- | io_uring/poll.c | 98 | ||||
-rw-r--r-- | io_uring/rsrc.c | 72 | ||||
-rw-r--r-- | io_uring/rsrc.h | 1 |
10 files changed, 359 insertions, 190 deletions
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 3436e0b83534..b521186efa5c 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -149,6 +149,7 @@ static void io_clean_op(struct io_kiocb *req); static void io_queue_sqe(struct io_kiocb *req); static void io_move_task_work_from_local(struct io_ring_ctx *ctx); static void __io_submit_flush_completions(struct io_ring_ctx *ctx); +static __cold void io_fallback_tw(struct io_uring_task *tctx); static struct kmem_cache *req_cachep; @@ -326,6 +327,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) spin_lock_init(&ctx->rsrc_ref_lock); INIT_LIST_HEAD(&ctx->rsrc_ref_list); INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work); + init_task_work(&ctx->rsrc_put_tw, io_rsrc_put_tw); init_llist_head(&ctx->rsrc_put_llist); init_llist_head(&ctx->work_llist); INIT_LIST_HEAD(&ctx->tctx_list); @@ -582,13 +584,25 @@ void __io_commit_cqring_flush(struct io_ring_ctx *ctx) io_eventfd_flush_signal(ctx); } +static inline void __io_cq_lock(struct io_ring_ctx *ctx) + __acquires(ctx->completion_lock) +{ + if (!ctx->task_complete) + spin_lock(&ctx->completion_lock); +} + +static inline void __io_cq_unlock(struct io_ring_ctx *ctx) +{ + if (!ctx->task_complete) + spin_unlock(&ctx->completion_lock); +} + /* keep it inlined for io_submit_flush_completions() */ -static inline void io_cq_unlock_post_inline(struct io_ring_ctx *ctx) +static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx) __releases(ctx->completion_lock) { io_commit_cqring(ctx); - spin_unlock(&ctx->completion_lock); - + __io_cq_unlock(ctx); io_commit_cqring_flush(ctx); io_cqring_wake(ctx); } @@ -596,17 +610,37 @@ static inline void io_cq_unlock_post_inline(struct io_ring_ctx *ctx) void io_cq_unlock_post(struct io_ring_ctx *ctx) __releases(ctx->completion_lock) { - io_cq_unlock_post_inline(ctx); + io_commit_cqring(ctx); + spin_unlock(&ctx->completion_lock); + io_commit_cqring_flush(ctx); + io_cqring_wake(ctx); } /* Returns true if there are no backlogged entries after the flush */ -static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) +static void io_cqring_overflow_kill(struct io_ring_ctx *ctx) +{ + struct io_overflow_cqe *ocqe; + LIST_HEAD(list); + + io_cq_lock(ctx); + list_splice_init(&ctx->cq_overflow_list, &list); + clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); + io_cq_unlock(ctx); + + while (!list_empty(&list)) { + ocqe = list_first_entry(&list, struct io_overflow_cqe, list); + list_del(&ocqe->list); + kfree(ocqe); + } +} + +/* Returns true if there are no backlogged entries after the flush */ +static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx) { - bool all_flushed; size_t cqe_size = sizeof(struct io_uring_cqe); - if (!force && __io_cqring_events(ctx) == ctx->cq_entries) - return false; + if (__io_cqring_events(ctx) == ctx->cq_entries) + return; if (ctx->flags & IORING_SETUP_CQE32) cqe_size <<= 1; @@ -616,43 +650,32 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true); struct io_overflow_cqe *ocqe; - if (!cqe && !force) + if (!cqe) break; ocqe = list_first_entry(&ctx->cq_overflow_list, struct io_overflow_cqe, list); - if (cqe) - memcpy(cqe, &ocqe->cqe, cqe_size); - else - io_account_cq_overflow(ctx); - + memcpy(cqe, &ocqe->cqe, cqe_size); list_del(&ocqe->list); kfree(ocqe); } - all_flushed = list_empty(&ctx->cq_overflow_list); - if (all_flushed) { + if (list_empty(&ctx->cq_overflow_list)) { clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags); } - io_cq_unlock_post(ctx); - return all_flushed; } -static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx) +static void io_cqring_overflow_flush(struct io_ring_ctx *ctx) { - bool ret = true; - if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { /* iopoll syncs against uring_lock, not completion_lock */ if (ctx->flags & IORING_SETUP_IOPOLL) mutex_lock(&ctx->uring_lock); - ret = __io_cqring_overflow_flush(ctx, false); + __io_cqring_overflow_flush(ctx); if (ctx->flags & IORING_SETUP_IOPOLL) mutex_unlock(&ctx->uring_lock); } - - return ret; } void __io_put_task(struct task_struct *task, int nr) @@ -777,12 +800,13 @@ struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow) return &rings->cqes[off]; } -static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags, - bool allow_overflow) +static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, + u32 cflags) { struct io_uring_cqe *cqe; - lockdep_assert_held(&ctx->completion_lock); + if (!ctx->task_complete) + lockdep_assert_held(&ctx->completion_lock); ctx->cq_extra++; @@ -805,10 +829,6 @@ static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 } return true; } - - if (allow_overflow) - return io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0); - return false; } @@ -822,7 +842,17 @@ static void __io_flush_post_cqes(struct io_ring_ctx *ctx) for (i = 0; i < state->cqes_count; i++) { struct io_uring_cqe *cqe = &state->cqes[i]; - io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags, true); + if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) { + if (ctx->task_complete) { + spin_lock(&ctx->completion_lock); + io_cqring_event_overflow(ctx, cqe->user_data, + cqe->res, cqe->flags, 0, 0); + spin_unlock(&ctx->completion_lock); + } else { + io_cqring_event_overflow(ctx, cqe->user_data, + cqe->res, cqe->flags, 0, 0); + } + } } state->cqes_count = 0; } @@ -833,7 +863,10 @@ static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u bool filled; io_cq_lock(ctx); - filled = io_fill_cqe_aux(ctx, user_data, res, cflags, allow_overflow); + filled = io_fill_cqe_aux(ctx, user_data, res, cflags); + if (!filled && allow_overflow) + filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0); + io_cq_unlock_post(ctx); return filled; } @@ -857,10 +890,10 @@ bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32 lockdep_assert_held(&ctx->uring_lock); if (ctx->submit_state.cqes_count == length) { - io_cq_lock(ctx); + __io_cq_lock(ctx); __io_flush_post_cqes(ctx); /* no need to flush - flush is deferred */ - spin_unlock(&ctx->completion_lock); + __io_cq_unlock_post(ctx); } /* For defered completions this is not as strict as it is otherwise, @@ -915,8 +948,11 @@ static void __io_req_complete_post(struct io_kiocb *req) void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) { - if (!(issue_flags & IO_URING_F_UNLOCKED) || - !(req->ctx->flags & IORING_SETUP_IOPOLL)) { + if (req->ctx->task_complete && (issue_flags & IO_URING_F_IOWQ)) { + req->io_task_work.func = io_req_task_complete; + io_req_task_work_add(req); + } else if (!(issue_flags & IO_URING_F_UNLOCKED) || + !(req->ctx->flags & IORING_SETUP_IOPOLL)) { __io_req_complete_post(req); } else { struct io_ring_ctx *ctx = req->ctx; @@ -1139,10 +1175,17 @@ void tctx_task_work(struct callback_head *cb) struct io_uring_task *tctx = container_of(cb, struct io_uring_task, task_work); struct llist_node fake = {}; - struct llist_node *node = io_llist_xchg(&tctx->task_list, &fake); + struct llist_node *node; unsigned int loops = 1; - unsigned int count = handle_tw_list(node, &ctx, &uring_locked, NULL); + unsigned int count; + + if (unlikely(current->flags & PF_EXITING)) { + io_fallback_tw(tctx); + return; + } + node = io_llist_xchg(&tctx->task_list, &fake); + count = handle_tw_list(node, &ctx, &uring_locked, NULL); node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL); while (node != &fake) { loops++; @@ -1385,7 +1428,7 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx) struct io_wq_work_node *node, *prev; struct io_submit_state *state = &ctx->submit_state; - io_cq_lock(ctx); + __io_cq_lock(ctx); /* must come first to preserve CQE ordering in failure cases */ if (state->cqes_count) __io_flush_post_cqes(ctx); @@ -1393,10 +1436,18 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx) struct io_kiocb *req = container_of(node, struct io_kiocb, comp_list); - if (!(req->flags & REQ_F_CQE_SKIP)) - __io_fill_cqe_req(ctx, req); + if (!(req->flags & REQ_F_CQE_SKIP) && + unlikely(!__io_fill_cqe_req(ctx, req))) { + if (ctx->task_complete) { + spin_lock(&ctx->completion_lock); + io_req_cqe_overflow(req); + spin_unlock(&ctx->completion_lock); + } else { + io_req_cqe_overflow(req); + } + } } - io_cq_unlock_post_inline(ctx); + __io_cq_unlock_post(ctx); if (!wq_list_empty(&ctx->submit_state.compl_reqs)) { io_free_batch_list(ctx, state->compl_reqs.first); @@ -1467,7 +1518,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min) check_cq = READ_ONCE(ctx->check_cq); if (unlikely(check_cq)) { if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)) - __io_cqring_overflow_flush(ctx, false); + __io_cqring_overflow_flush(ctx); /* * Similarly do not spin if we have not informed the user of any * dropped CQE. @@ -1799,7 +1850,7 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) return ret; /* If the op doesn't have a file, we're not polling for it */ - if ((req->ctx->flags & IORING_SETUP_IOPOLL) && req->file) + if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue) io_iopoll_req_issued(req, issue_flags); return 0; @@ -1808,8 +1859,6 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) int io_poll_issue(struct io_kiocb *req, bool *locked) { io_tw_lock(req->ctx, locked); - if (unlikely(req->task->flags & PF_EXITING)) - return -EFAULT; return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT| IO_URING_F_COMPLETE_DEFER); } @@ -1826,7 +1875,7 @@ void io_wq_submit_work(struct io_wq_work *work) { struct io_kiocb *req = container_of(work, struct io_kiocb, work); const struct io_op_def *def = &io_op_defs[req->opcode]; - unsigned int issue_flags = IO_URING_F_UNLOCKED; + unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ; bool needs_poll = false; int ret = 0, err = -ECANCELED; @@ -2482,11 +2531,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, trace_io_uring_cqring_wait(ctx, min_events); do { - /* if we can't even flush overflow, don't wait for more */ - if (!io_cqring_overflow_flush(ctx)) { - ret = -EBUSY; - break; - } + io_cqring_overflow_flush(ctx); prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq, TASK_INTERRUPTIBLE); ret = io_cqring_wait_schedule(ctx, &iowq, timeout); @@ -2637,8 +2682,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) __io_sqe_buffers_unregister(ctx); if (ctx->file_data) __io_sqe_files_unregister(ctx); - if (ctx->rings) - __io_cqring_overflow_flush(ctx, true); + io_cqring_overflow_kill(ctx); io_eventfd_unregister(ctx); io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free); io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free); @@ -2781,6 +2825,12 @@ static __cold void io_ring_exit_work(struct work_struct *work) * as nobody else will be looking for them. */ do { + if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { + mutex_lock(&ctx->uring_lock); + io_cqring_overflow_kill(ctx); + mutex_unlock(&ctx->uring_lock); + } + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) io_move_task_work_from_local(ctx); @@ -2846,8 +2896,6 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) mutex_lock(&ctx->uring_lock); percpu_ref_kill(&ctx->refs); - if (ctx->rings) - __io_cqring_overflow_flush(ctx, true); xa_for_each(&ctx->personalities, index, creds) io_unregister_personality(ctx, index); if (ctx->rings) @@ -3489,6 +3537,11 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, if (!ctx) return -ENOMEM; + if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) && + !(ctx->flags & IORING_SETUP_IOPOLL) && + !(ctx->flags & IORING_SETUP_SQPOLL)) + ctx->task_complete = true; + /* * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user * space applications don't need to do io completion events diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index 062899b1fe86..1b2f0b2cc888 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -93,6 +93,11 @@ static inline void io_cq_lock(struct io_ring_ctx *ctx) spin_lock(&ctx->completion_lock); } +static inline void io_cq_unlock(struct io_ring_ctx *ctx) +{ + spin_unlock(&ctx->completion_lock); +} + void io_cq_unlock_post(struct io_ring_ctx *ctx); static inline struct io_uring_cqe *io_get_cqe_overflow(struct io_ring_ctx *ctx, @@ -128,7 +133,7 @@ static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx, */ cqe = io_get_cqe(ctx); if (unlikely(!cqe)) - return io_req_cqe_overflow(req); + return false; trace_io_uring_complete(req->ctx, req, req->cqe.user_data, req->cqe.res, req->cqe.flags, @@ -151,6 +156,14 @@ static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx, return true; } +static inline bool io_fill_cqe_req(struct io_ring_ctx *ctx, + struct io_kiocb *req) +{ + if (likely(__io_fill_cqe_req(ctx, req))) + return true; + return io_req_cqe_overflow(req); +} + static inline void req_set_fail(struct io_kiocb *req) { req->flags |= REQ_F_FAIL; diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c index afb543aab9f6..2d3cd945a531 100644 --- a/io_uring/msg_ring.c +++ b/io_uring/msg_ring.c @@ -15,6 +15,8 @@ struct io_msg { struct file *file; + struct file *src_file; + struct callback_head tw; u64 user_data; u32 len; u32 cmd; @@ -23,6 +25,34 @@ struct io_msg { u32 flags; }; +void io_msg_ring_cleanup(struct io_kiocb *req) +{ + struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); + + if (WARN_ON_ONCE(!msg->src_file)) + return; + + fput(msg->src_file); + msg->src_file = NULL; +} + +static void io_msg_tw_complete(struct callback_head *head) +{ + struct io_msg *msg = container_of(head, struct io_msg, tw); + struct io_kiocb *req = cmd_to_io_kiocb(msg); + struct io_ring_ctx *target_ctx = req->file->private_data; + int ret = 0; + + if (current->flags & PF_EXITING) + ret = -EOWNERDEAD; + else if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0)) + ret = -EOVERFLOW; + + if (ret < 0) + req_set_fail(req); + io_req_queue_tw_complete(req, ret); +} + static int io_msg_ring_data(struct io_kiocb *req) { struct io_ring_ctx *target_ctx = req->file->private_data; @@ -31,23 +61,29 @@ static int io_msg_ring_data(struct io_kiocb *req) if (msg->src_fd || msg->dst_fd || msg->flags) return -EINVAL; + if (target_ctx->task_complete && current != target_ctx->submitter_task) { + init_task_work(&msg->tw, io_msg_tw_complete); + if (task_work_add(target_ctx->submitter_task, &msg->tw, + TWA_SIGNAL_NO_IPI)) + return -EOWNERDEAD; + + atomic_or(IORING_SQ_TASKRUN, &target_ctx->rings->sq_flags); + return IOU_ISSUE_SKIP_COMPLETE; + } + if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0)) return 0; return -EOVERFLOW; } -static void io_double_unlock_ctx(struct io_ring_ctx *ctx, - struct io_ring_ctx *octx, +static void io_double_unlock_ctx(struct io_ring_ctx *octx, unsigned int issue_flags) { - if (issue_flags & IO_URING_F_UNLOCKED) - mutex_unlock(&ctx->uring_lock); mutex_unlock(&octx->uring_lock); } -static int io_double_lock_ctx(struct io_ring_ctx *ctx, - struct io_ring_ctx *octx, +static int io_double_lock_ctx(struct io_ring_ctx *octx, unsigned int issue_flags) { /* @@ -60,56 +96,49 @@ static int io_double_lock_ctx(struct io_ring_ctx *ctx, return -EAGAIN; return 0; } - - /* Always grab smallest value ctx first. We know ctx != octx. */ - if (ctx < octx) { - mutex_lock(&ctx->uring_lock); - mutex_lock(&octx->uring_lock); - } else { - mutex_lock(&octx->uring_lock); - mutex_lock(&ctx->uring_lock); - } - + mutex_lock(&octx->uring_lock); return 0; } -static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags) +static struct file *io_msg_grab_file(struct io_kiocb *req, unsigned int issue_flags) { - struct io_ring_ctx *target_ctx = req->file->private_data; struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); struct io_ring_ctx *ctx = req->ctx; + struct file *file = NULL; unsigned long file_ptr; - struct file *src_file; - int ret; - - if (target_ctx == ctx) - return -EINVAL; - - ret = io_double_lock_ctx(ctx, target_ctx, issue_flags); - if (unlikely(ret)) - return ret; - - ret = -EBADF; - if (unlikely(msg->src_fd >= ctx->nr_user_files)) - goto out_unlock; + int idx = msg->src_fd; + + io_ring_submit_lock(ctx, issue_flags); + if (likely(idx < ctx->nr_user_files)) { + idx = array_index_nospec(idx, ctx->nr_user_files); + file_ptr = io_fixed_file_slot(&ctx->file_table, idx)->file_ptr; + file = (struct file *) (file_ptr & FFS_MASK); + if (file) + get_file(file); + } + io_ring_submit_unlock(ctx, issue_flags); + return file; +} - msg->src_fd = array_index_nospec(msg->src_fd, ctx->nr_user_files); - file_ptr = io_fixed_file_slot(&ctx->file_table, msg->src_fd)->file_ptr; - if (!file_ptr) - goto out_unlock; +static int io_msg_install_complete(struct io_kiocb *req, unsigned int issue_flags) +{ + struct io_ring_ctx *target_ctx = req->file->private_data; + struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); + struct file *src_file = msg->src_file; + int ret; - src_file = (struct file *) (file_ptr & FFS_MASK); - get_file(src_file); + if (unlikely(io_double_lock_ctx(target_ctx, issue_flags))) + return -EAGAIN; ret = __io_fixed_fd_install(target_ctx, src_file, msg->dst_fd); - if (ret < 0) { - fput(src_file); + if (ret < 0) goto out_unlock; - } + + msg->src_file = NULL; + req->flags &= ~REQ_F_NEED_CLEANUP; if (msg->flags & IORING_MSG_RING_CQE_SKIP) goto out_unlock; - /* * If this fails, the target still received the file descriptor but * wasn't notified of the fact. This means that if this request @@ -119,10 +148,51 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags) if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0)) ret = -EOVERFLOW; out_unlock: - io_double_unlock_ctx(ctx, target_ctx, issue_flags); + io_double_unlock_ctx(target_ctx, issue_flags); return ret; } +static void io_msg_tw_fd_complete(struct callback_head *head) +{ + struct io_msg *msg = container_of(head, struct io_msg, tw); + struct io_kiocb *req = cmd_to_io_kiocb(msg); + int ret = -EOWNERDEAD; + + if (!(current->flags & PF_EXITING)) + ret = io_msg_install_complete(req, IO_URING_F_UNLOCKED); + if (ret < 0) + req_set_fail(req); + io_req_queue_tw_complete(req, ret); +} + +static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags) +{ + struct io_ring_ctx *target_ctx = req->file->private_data; + struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); + struct io_ring_ctx *ctx = req->ctx; + struct file *src_file = msg->src_file; + + if (target_ctx == ctx) + return -EINVAL; + if (!src_file) { + src_file = io_msg_grab_file(req, issue_flags); + if (!src_file) + return -EBADF; + msg->src_file = src_file; + req->flags |= REQ_F_NEED_CLEANUP; + } + + if (target_ctx->task_complete && current != target_ctx->submitter_task) { + init_task_work(&msg->tw, io_msg_tw_fd_complete); + if (task_work_add(target_ctx->submitter_task, &msg->tw, + TWA_SIGNAL)) + return -EOWNERDEAD; + + return IOU_ISSUE_SKIP_COMPLETE; + } + return io_msg_install_complete(req, issue_flags); +} + int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); @@ -130,6 +200,7 @@ int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) if (unlikely(sqe->buf_index || sqe->personality)) return -EINVAL; + msg->src_file = NULL; msg->user_data = READ_ONCE(sqe->off); msg->len = READ_ONCE(sqe->len); msg->cmd = READ_ONCE(sqe->addr); @@ -164,12 +235,11 @@ int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags) } done: - if (ret < 0) + if (ret < 0) { + if (ret == -EAGAIN || ret == IOU_ISSUE_SKIP_COMPLETE) + return ret; req_set_fail(req); + } io_req_set_res(req, ret, 0); - /* put file to avoid an attempt to IOPOLL the req */ - if (!(req->flags & REQ_F_FIXED_FILE)) - io_put_file(req->file); - req->file = NULL; return IOU_OK; } diff --git a/io_uring/msg_ring.h b/io_uring/msg_ring.h index fb9601f202d0..3987ee6c0e5f 100644 --- a/io_uring/msg_ring.h +++ b/io_uring/msg_ring.h @@ -2,3 +2,4 @@ int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe); int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags); +void io_msg_ring_cleanup(struct io_kiocb *req); diff --git a/io_uring/net.c b/io_uring/net.c index cb831326ea5b..5229976cb582 100644 --- a/io_uring/net.c +++ b/io_uring/net.c @@ -67,6 +67,19 @@ struct io_sr_msg { struct io_kiocb *notif; }; +static inline bool io_check_multishot(struct io_kiocb *req, + unsigned int issue_flags) +{ + /* + * When ->locked_cq is set we only allow to post CQEs from the original + * task context. Usual request completions will be handled in other + * generic paths but multipoll may decide to post extra cqes. + */ + return !(issue_flags & IO_URING_F_IOWQ) || + !(issue_flags & IO_URING_F_MULTISHOT) || + !req->ctx->task_complete; +} + int io_shutdown_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { struct io_shutdown *shutdown = io_kiocb_to_cmd(req, struct io_shutdown); @@ -730,6 +743,9 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags) (sr->flags & IORING_RECVSEND_POLL_FIRST)) return io_setup_async_msg(req, kmsg, issue_flags); + if (!io_check_multishot(req, issue_flags)) + return io_setup_async_msg(req, kmsg, issue_flags); + retry_multishot: if (io_do_buffer_select(req)) { void __user *buf; @@ -829,6 +845,9 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags) (sr->flags & IORING_RECVSEND_POLL_FIRST)) return -EAGAIN; + if (!io_check_multishot(req, issue_flags)) + return -EAGAIN; + sock = sock_from_file(req->file); if (unlikely(!sock)) return -ENOTSOCK; @@ -1280,6 +1299,8 @@ int io_accept(struct io_kiocb *req, unsigned int issue_flags) struct file *file; int ret, fd; + if (!io_check_multishot(req, issue_flags)) + return -EAGAIN; retry: if (!fixed) { fd = __get_unused_fd_flags(accept->flags, accept->nofile); diff --git a/io_uring/opdef.c b/io_uring/opdef.c index 83dc0f9ad3b2..3aa0d65c50e3 100644 --- a/io_uring/opdef.c +++ b/io_uring/opdef.c @@ -63,6 +63,7 @@ const struct io_op_def io_op_defs[] = { .audit_skip = 1, .ioprio = 1, .iopoll = 1, + .iopoll_queue = 1, .async_size = sizeof(struct io_async_rw), .name = "READV", .prep = io_prep_rw, @@ -80,6 +81,7 @@ const struct io_op_def io_op_defs[] = { .audit_skip = 1, .ioprio = 1, .iopoll = 1, + .iopoll_queue = 1, .async_size = sizeof(struct io_async_rw), .name = "WRITEV", .prep = io_prep_rw, @@ -103,6 +105,7 @@ const struct io_op_def io_op_defs[] = { .audit_skip = 1, .ioprio = 1, .iopoll = 1, + .iopoll_queue = 1, .async_size = sizeof(struct io_async_rw), .name = "READ_FIXED", .prep = io_prep_rw, @@ -118,6 +121,7 @@ const struct io_op_def io_op_defs[] = { .audit_skip = 1, .ioprio = 1, .iopoll = 1, + .iopoll_queue = 1, .async_size = sizeof(struct io_async_rw), .name = "WRITE_FIXED", .prep = io_prep_rw, @@ -277,6 +281,7 @@ const struct io_op_def io_op_defs[] = { .audit_skip = 1, .ioprio = 1, .iopoll = 1, + .iopoll_queue = 1, .async_size = sizeof(struct io_async_rw), .name = "READ", .prep = io_prep_rw, @@ -292,6 +297,7 @@ const struct io_op_def io_op_defs[] = { .audit_skip = 1, .ioprio = 1, .iopoll = 1, + .iopoll_queue = 1, .async_size = sizeof(struct io_async_rw), .name = "WRITE", .prep = io_prep_rw, @@ -439,6 +445,7 @@ const struct io_op_def io_op_defs[] = { .name = "MSG_RING", .prep = io_msg_ring_prep, .issue = io_msg_ring, + .cleanup = io_msg_ring_cleanup, }, [IORING_OP_FSETXATTR] = { .needs_file = 1, @@ -481,6 +488,7 @@ const struct io_op_def io_op_defs[] = { .plug = 1, .name = "URING_CMD", .iopoll = 1, + .iopoll_queue = 1, .async_size = uring_cmd_pdu_size(1), .prep = io_uring_cmd_prep, .issue = io_uring_cmd, diff --git a/io_uring/opdef.h b/io_uring/opdef.h index 3efe06d25473..df7e13d9bfba 100644 --- a/io_uring/opdef.h +++ b/io_uring/opdef.h @@ -25,6 +25,8 @@ struct io_op_def { unsigned ioprio : 1; /* supports iopoll */ unsigned iopoll : 1; + /* have to be put into the iopoll list */ + unsigned iopoll_queue : 1; /* opcode specific path will handle ->async_data allocation if needed */ unsigned manual_alloc : 1; /* size of async data needed, if any */ diff --git a/io_uring/poll.c b/io_uring/poll.c index 599ba28c89b2..ee7da6150ec4 100644 --- a/io_uring/poll.c +++ b/io_uring/poll.c @@ -237,7 +237,6 @@ enum { */ static int io_poll_check_events(struct io_kiocb *req, bool *locked) { - struct io_ring_ctx *ctx = req->ctx; int v, ret; /* req->task == current here, checking PF_EXITING is safe */ @@ -247,27 +246,30 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked) do { v = atomic_read(&req->poll_refs); - /* tw handler should be the owner, and so have some references */ - if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK))) - return IOU_POLL_DONE; - if (v & IO_POLL_CANCEL_FLAG) - return -ECANCELED; - /* - * cqe.res contains only events of the first wake up - * and all others are be lost. Redo vfs_poll() to get - * up to date state. - */ - if ((v & IO_POLL_REF_MASK) != 1) - req->cqe.res = 0; - if (v & IO_POLL_RETRY_FLAG) { - req->cqe.res = 0; + if (unlikely(v != 1)) { + /* tw should be the owner and so have some refs */ + if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK))) + return IOU_POLL_NO_ACTION; + if (v & IO_POLL_CANCEL_FLAG) + return -ECANCELED; /* - * We won't find new events that came in between - * vfs_poll and the ref put unless we clear the flag - * in advance. + * cqe.res contains only events of the first wake up + * and all others are to be lost. Redo vfs_poll() to get + * up to date state. */ - atomic_andnot(IO_POLL_RETRY_FLAG, &req->poll_refs); - v &= ~IO_POLL_RETRY_FLAG; + if ((v & IO_POLL_REF_MASK) != 1) + req->cqe.res = 0; + + if (v & IO_POLL_RETRY_FLAG) { + req->cqe.res = 0; + /* + * We won't find new events that came in between + * vfs_poll and the ref put unless we clear the + * flag in advance. + */ + atomic_andnot(IO_POLL_RETRY_FLAG, &req->poll_refs); + v &= ~IO_POLL_RETRY_FLAG; + } } /* the mask was stashed in __io_poll_execute */ @@ -286,7 +288,7 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked) __poll_t mask = mangle_poll(req->cqe.res & req->apoll_events); - if (!io_aux_cqe(ctx, *locked, req->cqe.user_data, + if (!io_aux_cqe(req->ctx, *locked, req->cqe.user_data, mask, IORING_CQE_F_MORE, false)) { io_req_set_res(req, mask, 0); return IOU_POLL_REMOVE_POLL_USE_RES; @@ -319,50 +321,38 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked) ret = io_poll_check_events(req, locked); if (ret == IOU_POLL_NO_ACTION) return; - - if (ret == IOU_POLL_DONE) { - struct io_poll *poll = io_kiocb_to_cmd(req, struct io_poll); - req->cqe.res = mangle_poll(req->cqe.res & poll->events); - } else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) { - req->cqe.res = ret; - req_set_fail(req); - } - io_poll_remove_entries(req); io_poll_tw_hash_eject(req, locked); - io_req_set_res(req, req->cqe.res, 0); - io_req_task_complete(req, locked); -} - -static void io_apoll_task_func(struct io_kiocb *req, bool *locked) -{ - int ret; - - ret = io_poll_check_events(req, locked); - if (ret == IOU_POLL_NO_ACTION) - return; + if (req->opcode == IORING_OP_POLL_ADD) { + if (ret == IOU_POLL_DONE) { + struct io_poll *poll; - io_tw_lock(req->ctx, locked); - io_poll_remove_entries(req); - io_poll_tw_hash_eject(req, locked); + poll = io_kiocb_to_cmd(req, struct io_poll); + req->cqe.res = mangle_poll(req->cqe.res & poll->events); + } else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) { + req->cqe.res = ret; + req_set_fail(req); + } - if (ret == IOU_POLL_REMOVE_POLL_USE_RES) + io_req_set_res(req, req->cqe.res, 0); io_req_task_complete(req, locked); - else if (ret == IOU_POLL_DONE) - io_req_task_submit(req, locked); - else - io_req_defer_failed(req, ret); + } else { + io_tw_lock(req->ctx, locked); + + if (ret == IOU_POLL_REMOVE_POLL_USE_RES) + io_req_task_complete(req, locked); + else if (ret == IOU_POLL_DONE) + io_req_task_submit(req, locked); + else + io_req_defer_failed(req, ret); + } } static void __io_poll_execute(struct io_kiocb *req, int mask) { io_req_set_res(req, mask, 0); - - if (req->opcode == IORING_OP_POLL_ADD) - req->io_task_work.func = io_poll_task_func; - else - req->io_task_work.func = io_apoll_task_func; + req->io_task_work.func = io_poll_task_func; trace_io_uring_task_add(req, mask); io_req_task_work_add(req); diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c index 133608200769..18de10c68a15 100644 --- a/io_uring/rsrc.c +++ b/io_uring/rsrc.c @@ -204,6 +204,14 @@ void io_rsrc_put_work(struct work_struct *work) } } +void io_rsrc_put_tw(struct callback_head *cb) +{ + struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx, + rsrc_put_tw); + + io_rsrc_put_work(&ctx->rsrc_put_work.work); +} + void io_wait_rsrc_data(struct io_rsrc_data *data) { if (data && !atomic_dec_and_test(&data->refs)) @@ -242,8 +250,15 @@ static __cold void io_rsrc_node_ref_zero(struct percpu_ref *ref) } spin_unlock_irqrestore(&ctx->rsrc_ref_lock, flags); - if (first_add) - mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay); + if (!first_add) + return; + + if (ctx->submitter_task) { + if (!task_work_add(ctx->submitter_task, &ctx->rsrc_put_tw, + ctx->notify_method)) + return; + } + mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay); } static struct io_rsrc_node *io_rsrc_node_alloc(void) @@ -309,46 +324,41 @@ __cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data, /* As we may drop ->uring_lock, other task may have started quiesce */ if (data->quiesce) return -ENXIO; + ret = io_rsrc_node_switch_start(ctx); + if (ret) + return ret; + io_rsrc_node_switch(ctx, data); + + /* kill initial ref, already quiesced if zero */ + if (atomic_dec_and_test(&data->refs)) + return 0; data->quiesce = true; + mutex_unlock(&ctx->uring_lock); do { - ret = io_rsrc_node_switch_start(ctx); - if (ret) - break; - io_rsrc_node_switch(ctx, data); - - /* kill initial ref, already quiesced if zero */ - if (atomic_dec_and_test(&data->refs)) - break; - mutex_unlock(&ctx->uring_lock); - ret = io_run_task_work_sig(ctx); - if (ret < 0) - goto reinit; + if (ret < 0) { + atomic_inc(&data->refs); + /* wait for all works potentially completing data->done */ + flush_delayed_work(&ctx->rsrc_put_work); + reinit_completion(&data->done); + mutex_lock(&ctx->uring_lock); + break; + } flush_delayed_work(&ctx->rsrc_put_work); ret = wait_for_completion_interruptible(&data->done); if (!ret) { mutex_lock(&ctx->uring_lock); - if (atomic_read(&data->refs) > 0) { - /* - * it has been revived by another thread while - * we were unlocked - */ - mutex_unlock(&ctx->uring_lock); - } else { + if (atomic_read(&data->refs) <= 0) break; - } + /* + * it has been revived by another thread while + * we were unlocked + */ + mutex_unlock(&ctx->uring_lock); } - -reinit: - atomic_inc(&data->refs); - /* wait for all works potentially completing data->done */ - flush_delayed_work(&ctx->rsrc_put_work); - reinit_completion(&data->done); - - mutex_lock(&ctx->uring_lock); - } while (ret >= 0); + } while (1); data->quiesce = false; return ret; diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h index 81445a477622..2b8743645efc 100644 --- a/io_uring/rsrc.h +++ b/io_uring/rsrc.h @@ -53,6 +53,7 @@ struct io_mapped_ubuf { struct bio_vec bvec[]; }; +void io_rsrc_put_tw(struct callback_head *cb); void io_rsrc_put_work(struct work_struct *work); void io_rsrc_refs_refill(struct io_ring_ctx *ctx); void io_wait_rsrc_data(struct io_rsrc_data *data); |