diff options
Diffstat (limited to 'fs/io-wq.c')
-rw-r--r-- | fs/io-wq.c | 360 |
1 files changed, 158 insertions, 202 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c index 3dc10bfd8c3b..5361a9b4b47b 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -16,9 +16,7 @@ #include <linux/rculist_nulls.h> #include <linux/cpu.h> #include <linux/tracehook.h> -#include <linux/freezer.h> -#include "../kernel/sched/sched.h" #include "io-wq.h" #define WORKER_IDLE_TIMEOUT (5 * HZ) @@ -69,6 +67,7 @@ struct io_worker { struct io_wqe_acct { unsigned nr_workers; unsigned max_workers; + int index; atomic_t nr_running; }; @@ -109,19 +108,16 @@ struct io_wq { free_work_fn *free_work; io_wq_work_fn *do_work; - struct task_struct *manager; - struct io_wq_hash *hash; refcount_t refs; - struct completion exited; atomic_t worker_refs; struct completion worker_done; struct hlist_node cpuhp_node; - pid_t task_pid; + struct task_struct *task; }; static enum cpuhp_state io_wq_online; @@ -134,8 +130,7 @@ struct io_cb_cancel_data { bool cancel_all; }; -static void io_wqe_cancel_pending_work(struct io_wqe *wqe, - struct io_cb_cancel_data *match); +static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index); static bool io_worker_get(struct io_worker *worker) { @@ -148,23 +143,26 @@ static void io_worker_release(struct io_worker *worker) complete(&worker->ref_done); } +static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound) +{ + return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND]; +} + static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe, struct io_wq_work *work) { - if (work->flags & IO_WQ_WORK_UNBOUND) - return &wqe->acct[IO_WQ_ACCT_UNBOUND]; - - return &wqe->acct[IO_WQ_ACCT_BOUND]; + return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND)); } static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker) { - struct io_wqe *wqe = worker->wqe; - - if (worker->flags & IO_WORKER_F_BOUND) - return &wqe->acct[IO_WQ_ACCT_BOUND]; + return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND); +} - return &wqe->acct[IO_WQ_ACCT_UNBOUND]; +static void io_worker_ref_put(struct io_wq *wq) +{ + if (atomic_dec_and_test(&wq->worker_refs)) + complete(&wq->worker_done); } static void io_worker_exit(struct io_worker *worker) @@ -194,8 +192,7 @@ static void io_worker_exit(struct io_worker *worker) raw_spin_unlock_irq(&wqe->lock); kfree_rcu(worker, rcu); - if (atomic_dec_and_test(&wqe->wq->worker_refs)) - complete(&wqe->wq->worker_done); + io_worker_ref_put(wqe->wq); do_exit(0); } @@ -210,7 +207,7 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe) /* * Check head of free list for an available worker. If one isn't available, - * caller must wake up the wq manager to create one. + * caller must create one. */ static bool io_wqe_activate_free_worker(struct io_wqe *wqe) __must_hold(RCU) @@ -234,7 +231,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe) /* * We need a worker. If we find a free one, we're good. If not, and we're - * below the max number of workers, wake up the manager to create one. + * below the max number of workers, create one. */ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) { @@ -250,8 +247,11 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) ret = io_wqe_activate_free_worker(wqe); rcu_read_unlock(); - if (!ret && acct->nr_workers < acct->max_workers) - wake_up_process(wqe->wq->manager); + if (!ret && acct->nr_workers < acct->max_workers) { + atomic_inc(&acct->nr_running); + atomic_inc(&wqe->wq->worker_refs); + create_io_worker(wqe->wq, wqe, acct->index); + } } static void io_wqe_inc_running(struct io_worker *worker) @@ -261,14 +261,61 @@ static void io_wqe_inc_running(struct io_worker *worker) atomic_inc(&acct->nr_running); } +struct create_worker_data { + struct callback_head work; + struct io_wqe *wqe; + int index; +}; + +static void create_worker_cb(struct callback_head *cb) +{ + struct create_worker_data *cwd; + struct io_wq *wq; + + cwd = container_of(cb, struct create_worker_data, work); + wq = cwd->wqe->wq; + create_io_worker(wq, cwd->wqe, cwd->index); + kfree(cwd); +} + +static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct) +{ + struct create_worker_data *cwd; + struct io_wq *wq = wqe->wq; + + /* raced with exit, just ignore create call */ + if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) + goto fail; + + cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC); + if (cwd) { + init_task_work(&cwd->work, create_worker_cb); + cwd->wqe = wqe; + cwd->index = acct->index; + if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL)) + return; + + kfree(cwd); + } +fail: + atomic_dec(&acct->nr_running); + io_worker_ref_put(wq); +} + static void io_wqe_dec_running(struct io_worker *worker) __must_hold(wqe->lock) { struct io_wqe_acct *acct = io_wqe_get_acct(worker); struct io_wqe *wqe = worker->wqe; - if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) - io_wqe_wake_worker(wqe, acct); + if (!(worker->flags & IO_WORKER_F_UP)) + return; + + if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) { + atomic_inc(&acct->nr_running); + atomic_inc(&wqe->wq->worker_refs); + io_queue_worker_create(wqe, acct); + } } /* @@ -281,6 +328,8 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker, { bool worker_bound, work_bound; + BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1); + if (worker->flags & IO_WORKER_F_FREE) { worker->flags &= ~IO_WORKER_F_FREE; hlist_nulls_del_init_rcu(&worker->nulls_node); @@ -293,16 +342,11 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker, worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0; work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0; if (worker_bound != work_bound) { + int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND; io_wqe_dec_running(worker); - if (work_bound) { - worker->flags |= IO_WORKER_F_BOUND; - wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--; - wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++; - } else { - worker->flags &= ~IO_WORKER_F_BOUND; - wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++; - wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--; - } + worker->flags ^= IO_WORKER_F_BOUND; + wqe->acct[index].nr_workers--; + wqe->acct[index ^ 1].nr_workers++; io_wqe_inc_running(worker); } } @@ -388,11 +432,9 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) static bool io_flush_signals(void) { - if (unlikely(test_tsk_thread_flag(current, TIF_NOTIFY_SIGNAL))) { + if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) { __set_current_state(TASK_RUNNING); - if (current->task_works) - task_work_run(); - clear_tsk_thread_flag(current, TIF_NOTIFY_SIGNAL); + tracehook_notify_signal(); return true; } return false; @@ -418,6 +460,7 @@ static void io_worker_handle_work(struct io_worker *worker) { struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; + bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state); do { struct io_wq_work *work; @@ -447,6 +490,9 @@ get_next: unsigned int hash = io_get_work_hash(work); next_hashed = wq_next_work(work); + + if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND)) + work->flags |= IO_WQ_WORK_CANCEL; wq->do_work(work); io_assign_current_work(worker, NULL); @@ -485,9 +531,8 @@ static int io_wqe_worker(void *data) char buf[TASK_COMM_LEN]; worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); - io_wqe_inc_running(worker); - sprintf(buf, "iou-wrk-%d", wq->task_pid); + snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid); set_task_comm(current, buf); while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) { @@ -505,10 +550,15 @@ loop: if (io_flush_signals()) continue; ret = schedule_timeout(WORKER_IDLE_TIMEOUT); - if (try_to_freeze() || ret) - continue; - if (fatal_signal_pending(current)) + if (signal_pending(current)) { + struct ksignal ksig; + + if (!get_signal(&ksig)) + continue; break; + } + if (ret) + continue; /* timed out, exit unless we're the fixed worker */ if (test_bit(IO_WQ_BIT_EXIT, &wq->state) || !(worker->flags & IO_WORKER_F_FIXED)) @@ -546,8 +596,7 @@ void io_wq_worker_running(struct task_struct *tsk) /* * Called when worker is going to sleep. If there are no workers currently - * running and we have work pending, wake up a free one or have the manager - * set one up. + * running and we have work pending, wake up a free one or create a new one. */ void io_wq_worker_sleeping(struct task_struct *tsk) { @@ -567,7 +616,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk) raw_spin_unlock_irq(&worker->wqe->lock); } -static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) +static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) { struct io_wqe_acct *acct = &wqe->acct[index]; struct io_worker *worker; @@ -577,7 +626,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); if (!worker) - return false; + goto fail; refcount_set(&worker->ref, 1); worker->nulls_node.pprev = NULL; @@ -585,14 +634,13 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) spin_lock_init(&worker->lock); init_completion(&worker->ref_done); - atomic_inc(&wq->worker_refs); - tsk = create_io_thread(io_wqe_worker, worker, wqe->node); if (IS_ERR(tsk)) { - if (atomic_dec_and_test(&wq->worker_refs)) - complete(&wq->worker_done); kfree(worker); - return false; +fail: + atomic_dec(&acct->nr_running); + io_worker_ref_put(wq); + return; } tsk->pf_io_worker = worker; @@ -611,20 +659,6 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) acct->nr_workers++; raw_spin_unlock_irq(&wqe->lock); wake_up_new_task(tsk); - return true; -} - -static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index) - __must_hold(wqe->lock) -{ - struct io_wqe_acct *acct = &wqe->acct[index]; - - if (acct->nr_workers && test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) - return false; - /* if we have available workers or no work, no need */ - if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe)) - return false; - return acct->nr_workers < acct->max_workers; } /* @@ -659,89 +693,11 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data) return false; } -static void io_wq_check_workers(struct io_wq *wq) -{ - int node; - - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - bool fork_worker[2] = { false, false }; - - if (!node_online(node)) - continue; - - raw_spin_lock_irq(&wqe->lock); - if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND)) - fork_worker[IO_WQ_ACCT_BOUND] = true; - if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND)) - fork_worker[IO_WQ_ACCT_UNBOUND] = true; - raw_spin_unlock_irq(&wqe->lock); - if (fork_worker[IO_WQ_ACCT_BOUND]) - create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND); - if (fork_worker[IO_WQ_ACCT_UNBOUND]) - create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND); - } -} - static bool io_wq_work_match_all(struct io_wq_work *work, void *data) { return true; } -static void io_wq_cancel_pending(struct io_wq *wq) -{ - struct io_cb_cancel_data match = { - .fn = io_wq_work_match_all, - .cancel_all = true, - }; - int node; - - for_each_node(node) - io_wqe_cancel_pending_work(wq->wqes[node], &match); -} - -/* - * Manager thread. Tasked with creating new workers, if we need them. - */ -static int io_wq_manager(void *data) -{ - struct io_wq *wq = data; - char buf[TASK_COMM_LEN]; - int node; - - sprintf(buf, "iou-mgr-%d", wq->task_pid); - set_task_comm(current, buf); - - do { - set_current_state(TASK_INTERRUPTIBLE); - io_wq_check_workers(wq); - schedule_timeout(HZ); - try_to_freeze(); - if (fatal_signal_pending(current)) - set_bit(IO_WQ_BIT_EXIT, &wq->state); - } while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)); - - io_wq_check_workers(wq); - - rcu_read_lock(); - for_each_node(node) - io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL); - rcu_read_unlock(); - - if (atomic_dec_and_test(&wq->worker_refs)) - complete(&wq->worker_done); - wait_for_completion(&wq->worker_done); - - spin_lock_irq(&wq->hash->wait.lock); - for_each_node(node) - list_del_init(&wq->wqes[node]->wait.entry); - spin_unlock_irq(&wq->hash->wait.lock); - - io_wq_cancel_pending(wq); - complete(&wq->exited); - do_exit(0); -} - static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) { struct io_wq *wq = wqe->wq; @@ -773,39 +729,13 @@ append: wq_list_add_after(&work->list, &tail->list, &wqe->work_list); } -static int io_wq_fork_manager(struct io_wq *wq) -{ - struct task_struct *tsk; - - if (wq->manager) - return 0; - - WARN_ON_ONCE(test_bit(IO_WQ_BIT_EXIT, &wq->state)); - - init_completion(&wq->worker_done); - atomic_set(&wq->worker_refs, 1); - tsk = create_io_thread(io_wq_manager, wq, NUMA_NO_NODE); - if (!IS_ERR(tsk)) { - wq->manager = get_task_struct(tsk); - wake_up_new_task(tsk); - return 0; - } - - if (atomic_dec_and_test(&wq->worker_refs)) - complete(&wq->worker_done); - - return PTR_ERR(tsk); -} - static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) { struct io_wqe_acct *acct = io_work_get_acct(wqe, work); int work_flags; unsigned long flags; - /* Can only happen if manager creation fails after exec */ - if (io_wq_fork_manager(wqe->wq) || - test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) { + if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) { io_run_cancel(work, wqe); return; } @@ -960,17 +890,12 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode, int sync, void *key) { struct io_wqe *wqe = container_of(wait, struct io_wqe, wait); - int ret; list_del_init(&wait->entry); rcu_read_lock(); - ret = io_wqe_activate_free_worker(wqe); + io_wqe_activate_free_worker(wqe); rcu_read_unlock(); - - if (!ret) - wake_up_process(wqe->wq->manager); - return 1; } @@ -1011,6 +936,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) goto err; wq->wqes[node] = wqe; wqe->node = alloc_node; + wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND; + wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND; wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0); wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers = @@ -1025,13 +952,11 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) INIT_LIST_HEAD(&wqe->all_list); } - wq->task_pid = current->pid; - init_completion(&wq->exited); + wq->task = get_task_struct(data->task); refcount_set(&wq->refs, 1); - - ret = io_wq_fork_manager(wq); - if (!ret) - return wq; + atomic_set(&wq->worker_refs, 1); + init_completion(&wq->worker_done); + return wq; err: io_wq_put_hash(data->hash); cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); @@ -1044,14 +969,49 @@ err_wq: return ERR_PTR(ret); } -static void io_wq_destroy_manager(struct io_wq *wq) +static bool io_task_work_match(struct callback_head *cb, void *data) { - if (wq->manager) { - wake_up_process(wq->manager); - wait_for_completion(&wq->exited); - put_task_struct(wq->manager); - wq->manager = NULL; + struct create_worker_data *cwd; + + if (cb->func != create_worker_cb) + return false; + cwd = container_of(cb, struct create_worker_data, work); + return cwd->wqe->wq == data; +} + +static void io_wq_exit_workers(struct io_wq *wq) +{ + struct callback_head *cb; + int node; + + set_bit(IO_WQ_BIT_EXIT, &wq->state); + + if (!wq->task) + return; + + while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) { + struct create_worker_data *cwd; + + cwd = container_of(cb, struct create_worker_data, work); + atomic_dec(&cwd->wqe->acct[cwd->index].nr_running); + io_worker_ref_put(wq); + kfree(cwd); + } + + rcu_read_lock(); + for_each_node(node) { + struct io_wqe *wqe = wq->wqes[node]; + + io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL); + spin_lock_irq(&wq->hash->wait.lock); + list_del_init(&wq->wqes[node]->wait.entry); + spin_unlock_irq(&wq->hash->wait.lock); } + rcu_read_unlock(); + io_worker_ref_put(wq); + wait_for_completion(&wq->worker_done); + put_task_struct(wq->task); + wq->task = NULL; } static void io_wq_destroy(struct io_wq *wq) @@ -1060,12 +1020,15 @@ static void io_wq_destroy(struct io_wq *wq) cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); - set_bit(IO_WQ_BIT_EXIT, &wq->state); - io_wq_destroy_manager(wq); + io_wq_exit_workers(wq); for_each_node(node) { struct io_wqe *wqe = wq->wqes[node]; - WARN_ON_ONCE(!wq_list_empty(&wqe->work_list)); + struct io_cb_cancel_data match = { + .fn = io_wq_work_match_all, + .cancel_all = true, + }; + io_wqe_cancel_pending_work(wqe, &match); kfree(wqe); } io_wq_put_hash(wq->hash); @@ -1081,21 +1044,14 @@ void io_wq_put(struct io_wq *wq) void io_wq_put_and_exit(struct io_wq *wq) { - set_bit(IO_WQ_BIT_EXIT, &wq->state); - io_wq_destroy_manager(wq); + io_wq_exit_workers(wq); io_wq_put(wq); } static bool io_wq_worker_affinity(struct io_worker *worker, void *data) { - struct task_struct *task = worker->task; - struct rq_flags rf; - struct rq *rq; - - rq = task_rq_lock(task, &rf); - do_set_cpus_allowed(task, cpumask_of_node(worker->wqe->node)); - task->flags |= PF_NO_SETAFFINITY; - task_rq_unlock(rq, task, &rf); + set_cpus_allowed_ptr(worker->task, cpumask_of_node(worker->wqe->node)); + return false; } |