diff options
Diffstat (limited to 'io_uring/io-wq.c')
-rw-r--r-- | io_uring/io-wq.c | 180 |
1 files changed, 70 insertions, 110 deletions
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c index f81c0a7136a5..44d522c5d36f 100644 --- a/io_uring/io-wq.c +++ b/io_uring/io-wq.c @@ -15,6 +15,7 @@ #include <linux/cpu.h> #include <linux/task_work.h> #include <linux/audit.h> +#include <linux/mmu_context.h> #include <uapi/linux/io_uring.h> #include "io-wq.h" @@ -96,8 +97,6 @@ struct io_wqe { raw_spinlock_t lock; struct io_wqe_acct acct[IO_WQ_ACCT_NR]; - int node; - struct hlist_nulls_head free_list; struct list_head all_list; @@ -127,7 +126,7 @@ struct io_wq { struct task_struct *task; - struct io_wqe *wqes[]; + struct io_wqe wqe; }; static enum cpuhp_state io_wq_online; @@ -754,7 +753,7 @@ static void create_worker_cont(struct callback_head *cb) worker = container_of(cb, struct io_worker, create_work); clear_bit_unlock(0, &worker->create_state); wqe = worker->wqe; - tsk = create_io_thread(io_wqe_worker, worker, wqe->node); + tsk = create_io_thread(io_wqe_worker, worker, NUMA_NO_NODE); if (!IS_ERR(tsk)) { io_init_new_worker(wqe, worker, tsk); io_worker_release(worker); @@ -804,7 +803,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) __set_current_state(TASK_RUNNING); - worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); + worker = kzalloc(sizeof(*worker), GFP_KERNEL); if (!worker) { fail: atomic_dec(&acct->nr_running); @@ -823,7 +822,7 @@ fail: if (index == IO_WQ_ACCT_BOUND) worker->flags |= IO_WORKER_F_BOUND; - tsk = create_io_thread(io_wqe_worker, worker, wqe->node); + tsk = create_io_thread(io_wqe_worker, worker, NUMA_NO_NODE); if (!IS_ERR(tsk)) { io_init_new_worker(wqe, worker, tsk); } else if (!io_should_retry_thread(PTR_ERR(tsk))) { @@ -961,7 +960,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) { - struct io_wqe *wqe = wq->wqes[numa_node_id()]; + struct io_wqe *wqe = &wq->wqe; io_wqe_enqueue(wqe, work); } @@ -1083,7 +1082,7 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, .data = data, .cancel_all = cancel_all, }; - int node; + struct io_wqe *wqe = &wq->wqe; /* * First check pending list, if we're lucky we can just remove it @@ -1098,19 +1097,15 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, * Do both of these while holding the wqe->lock, to ensure that * we'll find a work item regardless of state. */ - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - - io_wqe_cancel_pending_work(wqe, &match); - if (match.nr_pending && !match.cancel_all) - return IO_WQ_CANCEL_OK; + io_wqe_cancel_pending_work(wqe, &match); + if (match.nr_pending && !match.cancel_all) + return IO_WQ_CANCEL_OK; - raw_spin_lock(&wqe->lock); - io_wqe_cancel_running_work(wqe, &match); - raw_spin_unlock(&wqe->lock); - if (match.nr_running && !match.cancel_all) - return IO_WQ_CANCEL_RUNNING; - } + raw_spin_lock(&wqe->lock); + io_wqe_cancel_running_work(wqe, &match); + raw_spin_unlock(&wqe->lock); + if (match.nr_running && !match.cancel_all) + return IO_WQ_CANCEL_RUNNING; if (match.nr_running) return IO_WQ_CANCEL_RUNNING; @@ -1140,15 +1135,16 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode, struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) { - int ret, node, i; + int ret, i; struct io_wq *wq; + struct io_wqe *wqe; if (WARN_ON_ONCE(!data->free_work || !data->do_work)) return ERR_PTR(-EINVAL); if (WARN_ON_ONCE(!bounded)) return ERR_PTR(-EINVAL); - wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL); + wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL); if (!wq) return ERR_PTR(-ENOMEM); ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node); @@ -1159,40 +1155,30 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) wq->hash = data->hash; wq->free_work = data->free_work; wq->do_work = data->do_work; + wqe = &wq->wqe; ret = -ENOMEM; - for_each_node(node) { - struct io_wqe *wqe; - int alloc_node = node; - - if (!node_online(alloc_node)) - alloc_node = NUMA_NO_NODE; - wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node); - if (!wqe) - goto err; - wq->wqes[node] = wqe; - if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL)) - goto err; - cpumask_copy(wqe->cpu_mask, cpumask_of_node(node)); - wqe->node = alloc_node; - wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; - wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers = - task_rlimit(current, RLIMIT_NPROC); - INIT_LIST_HEAD(&wqe->wait.entry); - wqe->wait.func = io_wqe_hash_wake; - for (i = 0; i < IO_WQ_ACCT_NR; i++) { - struct io_wqe_acct *acct = &wqe->acct[i]; - - acct->index = i; - atomic_set(&acct->nr_running, 0); - INIT_WQ_LIST(&acct->work_list); - raw_spin_lock_init(&acct->lock); - } - wqe->wq = wq; - raw_spin_lock_init(&wqe->lock); - INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0); - INIT_LIST_HEAD(&wqe->all_list); + + if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL)) + goto err; + cpumask_copy(wqe->cpu_mask, cpu_possible_mask); + wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; + wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers = + task_rlimit(current, RLIMIT_NPROC); + INIT_LIST_HEAD(&wqe->wait.entry); + wqe->wait.func = io_wqe_hash_wake; + for (i = 0; i < IO_WQ_ACCT_NR; i++) { + struct io_wqe_acct *acct = &wqe->acct[i]; + + acct->index = i; + atomic_set(&acct->nr_running, 0); + INIT_WQ_LIST(&acct->work_list); + raw_spin_lock_init(&acct->lock); } + wqe->wq = wq; + raw_spin_lock_init(&wqe->lock); + INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0); + INIT_LIST_HEAD(&wqe->all_list); wq->task = get_task_struct(data->task); atomic_set(&wq->worker_refs, 1); @@ -1201,12 +1187,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) err: io_wq_put_hash(data->hash); cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); - for_each_node(node) { - if (!wq->wqes[node]) - continue; - free_cpumask_var(wq->wqes[node]->cpu_mask); - kfree(wq->wqes[node]); - } + + free_cpumask_var(wq->wqe.cpu_mask); err_wq: kfree(wq); return ERR_PTR(ret); @@ -1247,48 +1229,36 @@ static void io_wq_cancel_tw_create(struct io_wq *wq) static void io_wq_exit_workers(struct io_wq *wq) { - int node; - if (!wq->task) return; io_wq_cancel_tw_create(wq); rcu_read_lock(); - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - - io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL); - } + io_wq_for_each_worker(&wq->wqe, io_wq_worker_wake, NULL); rcu_read_unlock(); io_worker_ref_put(wq); wait_for_completion(&wq->worker_done); - for_each_node(node) { - spin_lock_irq(&wq->hash->wait.lock); - list_del_init(&wq->wqes[node]->wait.entry); - spin_unlock_irq(&wq->hash->wait.lock); - } + spin_lock_irq(&wq->hash->wait.lock); + list_del_init(&wq->wqe.wait.entry); + spin_unlock_irq(&wq->hash->wait.lock); + put_task_struct(wq->task); wq->task = NULL; } static void io_wq_destroy(struct io_wq *wq) { - int node; + struct io_cb_cancel_data match = { + .fn = io_wq_work_match_all, + .cancel_all = true, + }; + struct io_wqe *wqe = &wq->wqe; cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); - - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - struct io_cb_cancel_data match = { - .fn = io_wq_work_match_all, - .cancel_all = true, - }; - io_wqe_cancel_pending_work(wqe, &match); - free_cpumask_var(wqe->cpu_mask); - kfree(wqe); - } + io_wqe_cancel_pending_work(wqe, &match); + free_cpumask_var(wqe->cpu_mask); io_wq_put_hash(wq->hash); kfree(wq); } @@ -1323,11 +1293,9 @@ static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online) .cpu = cpu, .online = online }; - int i; rcu_read_lock(); - for_each_node(i) - io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od); + io_wq_for_each_worker(&wq->wqe, io_wq_worker_affinity, &od); rcu_read_unlock(); return 0; } @@ -1348,18 +1316,15 @@ static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node) int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask) { - int i; + struct io_wqe *wqe = &wq->wqe; rcu_read_lock(); - for_each_node(i) { - struct io_wqe *wqe = wq->wqes[i]; - - if (mask) - cpumask_copy(wqe->cpu_mask, mask); - else - cpumask_copy(wqe->cpu_mask, cpumask_of_node(i)); - } + if (mask) + cpumask_copy(wqe->cpu_mask, mask); + else + cpumask_copy(wqe->cpu_mask, cpu_possible_mask); rcu_read_unlock(); + return 0; } @@ -1369,9 +1334,10 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask) */ int io_wq_max_workers(struct io_wq *wq, int *new_count) { + struct io_wqe *wqe = &wq->wqe; + struct io_wqe_acct *acct; int prev[IO_WQ_ACCT_NR]; - bool first_node = true; - int i, node; + int i; BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND); BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND); @@ -1386,21 +1352,15 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count) prev[i] = 0; rcu_read_lock(); - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - struct io_wqe_acct *acct; - raw_spin_lock(&wqe->lock); - for (i = 0; i < IO_WQ_ACCT_NR; i++) { - acct = &wqe->acct[i]; - if (first_node) - prev[i] = max_t(int, acct->max_workers, prev[i]); - if (new_count[i]) - acct->max_workers = new_count[i]; - } - raw_spin_unlock(&wqe->lock); - first_node = false; + raw_spin_lock(&wqe->lock); + for (i = 0; i < IO_WQ_ACCT_NR; i++) { + acct = &wqe->acct[i]; + prev[i] = max_t(int, acct->max_workers, prev[i]); + if (new_count[i]) + acct->max_workers = new_count[i]; } + raw_spin_unlock(&wqe->lock); rcu_read_unlock(); for (i = 0; i < IO_WQ_ACCT_NR; i++) |