From e61df66c69b11bc050d233dc95714a6339192c28 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Wed, 13 Nov 2019 13:54:49 -0700 Subject: io-wq: ensure free/busy list browsing see all items We have two lists for workers in io-wq, a busy and a free list. For certain operations we want to browse all workers, and we currently do that by browsing the two separate lists. But since these lists are RCU protected, we can potentially miss workers if they move between the two lists while we're browsing them. Add a third list, all_list, that simply holds all workers. A worker is added to that list when it starts, and removed when it exits. This makes the worker iteration cleaner, too. Reported-by: Paul E. McKenney Reviewed-by: Paul E. McKenney Signed-off-by: Jens Axboe --- fs/io-wq.c | 41 +++++++++++------------------------------ 1 file changed, 11 insertions(+), 30 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 4031b75541be..fcb6c74209da 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -46,6 +46,7 @@ struct io_worker { refcount_t ref; unsigned flags; struct hlist_nulls_node nulls_node; + struct list_head all_list; struct task_struct *task; wait_queue_head_t wait; struct io_wqe *wqe; @@ -96,6 +97,7 @@ struct io_wqe { struct io_wq_nulls_list free_list; struct io_wq_nulls_list busy_list; + struct list_head all_list; struct io_wq *wq; }; @@ -212,6 +214,7 @@ static void io_worker_exit(struct io_worker *worker) spin_lock_irq(&wqe->lock); hlist_nulls_del_rcu(&worker->nulls_node); + list_del_rcu(&worker->all_list); if (__io_worker_unuse(wqe, worker)) { __release(&wqe->lock); spin_lock_irq(&wqe->lock); @@ -590,6 +593,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) spin_lock_irq(&wqe->lock); hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list.head); + list_add_tail_rcu(&worker->all_list, &wqe->all_list); worker->flags |= IO_WORKER_F_FREE; if (index == IO_WQ_ACCT_BOUND) worker->flags |= IO_WORKER_F_BOUND; @@ -733,16 +737,13 @@ static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data) * worker that isn't exiting */ static bool io_wq_for_each_worker(struct io_wqe *wqe, - struct io_wq_nulls_list *list, bool (*func)(struct io_worker *, void *), void *data) { - struct hlist_nulls_node *n; struct io_worker *worker; bool ret = false; -restart: - hlist_nulls_for_each_entry_rcu(worker, n, &list->head, nulls_node) { + list_for_each_entry_rcu(worker, &wqe->all_list, all_list) { if (io_worker_get(worker)) { ret = func(worker, data); io_worker_release(worker); @@ -750,8 +751,7 @@ restart: break; } } - if (!ret && get_nulls_value(n) != list->nulls) - goto restart; + return ret; } @@ -769,10 +769,7 @@ void io_wq_cancel_all(struct io_wq *wq) for (i = 0; i < wq->nr_wqes; i++) { struct io_wqe *wqe = wq->wqes[i]; - io_wq_for_each_worker(wqe, &wqe->busy_list, - io_wqe_worker_send_sig, NULL); - io_wq_for_each_worker(wqe, &wqe->free_list, - io_wqe_worker_send_sig, NULL); + io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL); } rcu_read_unlock(); } @@ -834,14 +831,7 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe, } rcu_read_lock(); - found = io_wq_for_each_worker(wqe, &wqe->free_list, io_work_cancel, - &data); - if (found) - goto done; - - found = io_wq_for_each_worker(wqe, &wqe->busy_list, io_work_cancel, - &data); -done: + found = io_wq_for_each_worker(wqe, io_work_cancel, &data); rcu_read_unlock(); return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND; } @@ -919,14 +909,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, * completion will run normally in this case. */ rcu_read_lock(); - found = io_wq_for_each_worker(wqe, &wqe->free_list, io_wq_worker_cancel, - cwork); - if (found) - goto done; - - found = io_wq_for_each_worker(wqe, &wqe->busy_list, io_wq_worker_cancel, - cwork); -done: + found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, cwork); rcu_read_unlock(); return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND; } @@ -1030,6 +1013,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm, wqe->free_list.nulls = 0; INIT_HLIST_NULLS_HEAD(&wqe->busy_list.head, 1); wqe->busy_list.nulls = 1; + INIT_LIST_HEAD(&wqe->all_list); i++; } @@ -1077,10 +1061,7 @@ void io_wq_destroy(struct io_wq *wq) if (!wqe) continue; - io_wq_for_each_worker(wqe, &wqe->free_list, io_wq_worker_wake, - NULL); - io_wq_for_each_worker(wqe, &wqe->busy_list, io_wq_worker_wake, - NULL); + io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL); } rcu_read_unlock(); -- cgit v1.2.3