diff options
Diffstat (limited to 'fs/io-wq.c')
-rw-r--r-- | fs/io-wq.c | 429 |
1 files changed, 169 insertions, 260 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c index 0a5ab1a8f69a..cc5cf2209fb0 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -69,6 +69,8 @@ struct io_worker { #define IO_WQ_HASH_ORDER 5 #endif +#define IO_WQ_NR_HASH_BUCKETS (1u << IO_WQ_HASH_ORDER) + struct io_wqe_acct { unsigned nr_workers; unsigned max_workers; @@ -98,6 +100,7 @@ struct io_wqe { struct list_head all_list; struct io_wq *wq; + struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS]; }; /* @@ -107,8 +110,7 @@ struct io_wq { struct io_wqe **wqes; unsigned long state; - get_work_fn *get_work; - put_work_fn *put_work; + free_work_fn *free_work; struct task_struct *manager; struct user_struct *user; @@ -376,26 +378,35 @@ static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker) return __io_worker_unuse(wqe, worker); } -static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash) +static inline unsigned int io_get_work_hash(struct io_wq_work *work) +{ + return work->flags >> IO_WQ_HASH_SHIFT; +} + +static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) __must_hold(wqe->lock) { struct io_wq_work_node *node, *prev; - struct io_wq_work *work; + struct io_wq_work *work, *tail; + unsigned int hash; wq_list_for_each(node, prev, &wqe->work_list) { work = container_of(node, struct io_wq_work, list); /* not hashed, can run anytime */ - if (!(work->flags & IO_WQ_WORK_HASHED)) { - wq_node_del(&wqe->work_list, node, prev); + if (!io_wq_is_hashed(work)) { + wq_list_del(&wqe->work_list, node, prev); return work; } /* hashed, can run if not already running */ - *hash = work->flags >> IO_WQ_HASH_SHIFT; - if (!(wqe->hash_map & BIT_ULL(*hash))) { - wqe->hash_map |= BIT_ULL(*hash); - wq_node_del(&wqe->work_list, node, prev); + hash = io_get_work_hash(work); + if (!(wqe->hash_map & BIT(hash))) { + wqe->hash_map |= BIT(hash); + /* all items with this hash lie in [work, tail] */ + tail = wqe->hash_tail[hash]; + wqe->hash_tail[hash] = NULL; + wq_list_cut(&wqe->work_list, &tail->list, prev); return work; } } @@ -440,16 +451,49 @@ static void io_wq_switch_creds(struct io_worker *worker, worker->saved_creds = old_creds; } +static void io_impersonate_work(struct io_worker *worker, + struct io_wq_work *work) +{ + if (work->files && current->files != work->files) { + task_lock(current); + current->files = work->files; + task_unlock(current); + } + if (work->fs && current->fs != work->fs) + current->fs = work->fs; + if (work->mm != worker->mm) + io_wq_switch_mm(worker, work); + if (worker->cur_creds != work->creds) + io_wq_switch_creds(worker, work); +} + +static void io_assign_current_work(struct io_worker *worker, + struct io_wq_work *work) +{ + if (work) { + /* flush pending signals before assigning new work */ + if (signal_pending(current)) + flush_signals(current); + cond_resched(); + } + + spin_lock_irq(&worker->lock); + worker->cur_work = work; + spin_unlock_irq(&worker->lock); +} + +static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work); + static void io_worker_handle_work(struct io_worker *worker) __releases(wqe->lock) { - struct io_wq_work *work, *old_work = NULL, *put_work = NULL; struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; do { - unsigned hash = -1U; - + struct io_wq_work *work; + unsigned int hash; +get_next: /* * If we got some work, mark us as busy. If we didn't, but * the list isn't empty, it means we stalled on hashed work. @@ -457,120 +501,80 @@ static void io_worker_handle_work(struct io_worker *worker) * can't make progress, any work completion or insertion will * clear the stalled flag. */ - work = io_get_next_work(wqe, &hash); + work = io_get_next_work(wqe); if (work) __io_worker_busy(wqe, worker, work); else if (!wq_list_empty(&wqe->work_list)) wqe->flags |= IO_WQE_FLAG_STALLED; spin_unlock_irq(&wqe->lock); - if (put_work && wq->put_work) - wq->put_work(old_work); if (!work) break; -next: - /* flush any pending signals before assigning new work */ - if (signal_pending(current)) - flush_signals(current); - - cond_resched(); - - spin_lock_irq(&worker->lock); - worker->cur_work = work; - spin_unlock_irq(&worker->lock); - - if (work->flags & IO_WQ_WORK_CB) - work->func(&work); - - if (work->files && current->files != work->files) { - task_lock(current); - current->files = work->files; - task_unlock(current); - } - if (work->fs && current->fs != work->fs) - current->fs = work->fs; - if (work->mm != worker->mm) - io_wq_switch_mm(worker, work); - if (worker->cur_creds != work->creds) - io_wq_switch_creds(worker, work); - /* - * OK to set IO_WQ_WORK_CANCEL even for uncancellable work, - * the worker function will do the right thing. - */ - if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) - work->flags |= IO_WQ_WORK_CANCEL; - if (worker->mm) - work->flags |= IO_WQ_WORK_HAS_MM; - - if (wq->get_work && !(work->flags & IO_WQ_WORK_INTERNAL)) { - put_work = work; - wq->get_work(work); - } - - old_work = work; - work->func(&work); - - spin_lock_irq(&worker->lock); - worker->cur_work = NULL; - spin_unlock_irq(&worker->lock); - - spin_lock_irq(&wqe->lock); - - if (hash != -1U) { - wqe->hash_map &= ~BIT_ULL(hash); - wqe->flags &= ~IO_WQE_FLAG_STALLED; - } - if (work && work != old_work) { - spin_unlock_irq(&wqe->lock); - - if (put_work && wq->put_work) { - wq->put_work(put_work); - put_work = NULL; + io_assign_current_work(worker, work); + + /* handle a whole dependent link */ + do { + struct io_wq_work *old_work, *next_hashed, *linked; + + next_hashed = wq_next_work(work); + io_impersonate_work(worker, work); + /* + * OK to set IO_WQ_WORK_CANCEL even for uncancellable + * work, the worker function will do the right thing. + */ + if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) + work->flags |= IO_WQ_WORK_CANCEL; + + hash = io_get_work_hash(work); + linked = old_work = work; + linked->func(&linked); + linked = (old_work == linked) ? NULL : linked; + + work = next_hashed; + if (!work && linked && !io_wq_is_hashed(linked)) { + work = linked; + linked = NULL; } + io_assign_current_work(worker, work); + wq->free_work(old_work); + + if (linked) + io_wqe_enqueue(wqe, linked); + + if (hash != -1U && !next_hashed) { + spin_lock_irq(&wqe->lock); + wqe->hash_map &= ~BIT_ULL(hash); + wqe->flags &= ~IO_WQE_FLAG_STALLED; + /* dependent work is not hashed */ + hash = -1U; + /* skip unnecessary unlock-lock wqe->lock */ + if (!work) + goto get_next; + spin_unlock_irq(&wqe->lock); + } + } while (work); - /* dependent work not hashed */ - hash = -1U; - goto next; - } + spin_lock_irq(&wqe->lock); } while (1); } -static inline void io_worker_spin_for_work(struct io_wqe *wqe) -{ - int i = 0; - - while (++i < 1000) { - if (io_wqe_run_queue(wqe)) - break; - if (need_resched()) - break; - cpu_relax(); - } -} - static int io_wqe_worker(void *data) { struct io_worker *worker = data; struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; - bool did_work; io_worker_start(wqe, worker); - did_work = false; while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) { set_current_state(TASK_INTERRUPTIBLE); loop: - if (did_work) - io_worker_spin_for_work(wqe); spin_lock_irq(&wqe->lock); if (io_wqe_run_queue(wqe)) { __set_current_state(TASK_RUNNING); io_worker_handle_work(worker); - did_work = true; goto loop; } - did_work = false; /* drops the lock on success, retry */ if (__io_worker_idle(wqe, worker)) { __release(&wqe->lock); @@ -766,6 +770,40 @@ static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct, return true; } +static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) +{ + struct io_wq *wq = wqe->wq; + + do { + struct io_wq_work *old_work = work; + + work->flags |= IO_WQ_WORK_CANCEL; + work->func(&work); + work = (work == old_work) ? NULL : work; + wq->free_work(old_work); + } while (work); +} + +static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work) +{ + unsigned int hash; + struct io_wq_work *tail; + + if (!io_wq_is_hashed(work)) { +append: + wq_list_add_tail(&work->list, &wqe->work_list); + return; + } + + hash = io_get_work_hash(work); + tail = wqe->hash_tail[hash]; + wqe->hash_tail[hash] = work; + if (!tail) + goto append; + + wq_list_add_after(&work->list, &tail->list, &wqe->work_list); +} + static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) { struct io_wqe_acct *acct = io_work_get_acct(wqe, work); @@ -779,14 +817,13 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) * It's close enough to not be an issue, fork() has the same delay. */ if (unlikely(!io_wq_can_queue(wqe, acct, work))) { - work->flags |= IO_WQ_WORK_CANCEL; - work->func(&work); + io_run_cancel(work, wqe); return; } work_flags = work->flags; spin_lock_irqsave(&wqe->lock, flags); - wq_list_add_tail(&work->list, &wqe->work_list); + io_wqe_insert_work(wqe, work); wqe->flags &= ~IO_WQE_FLAG_STALLED; spin_unlock_irqrestore(&wqe->lock, flags); @@ -803,19 +840,15 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) } /* - * Enqueue work, hashed by some key. Work items that hash to the same value - * will not be done in parallel. Used to limit concurrent writes, generally - * hashed by inode. + * Work items that hash to the same value will not be done in parallel. + * Used to limit concurrent writes, generally hashed by inode. */ -void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val) +void io_wq_hash_work(struct io_wq_work *work, void *val) { - struct io_wqe *wqe = wq->wqes[numa_node_id()]; - unsigned bit; - + unsigned int bit; bit = hash_ptr(val, IO_WQ_HASH_ORDER); work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT)); - io_wqe_enqueue(wqe, work); } static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data) @@ -865,14 +898,13 @@ void io_wq_cancel_all(struct io_wq *wq) } struct io_cb_cancel_data { - struct io_wqe *wqe; - work_cancel_fn *cancel; - void *caller_data; + work_cancel_fn *fn; + void *data; }; -static bool io_work_cancel(struct io_worker *worker, void *cancel_data) +static bool io_wq_worker_cancel(struct io_worker *worker, void *data) { - struct io_cb_cancel_data *data = cancel_data; + struct io_cb_cancel_data *match = data; unsigned long flags; bool ret = false; @@ -883,84 +915,7 @@ static bool io_work_cancel(struct io_worker *worker, void *cancel_data) spin_lock_irqsave(&worker->lock, flags); if (worker->cur_work && !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) && - data->cancel(worker->cur_work, data->caller_data)) { - send_sig(SIGINT, worker->task, 1); - ret = true; - } - spin_unlock_irqrestore(&worker->lock, flags); - - return ret; -} - -static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe, - work_cancel_fn *cancel, - void *cancel_data) -{ - struct io_cb_cancel_data data = { - .wqe = wqe, - .cancel = cancel, - .caller_data = cancel_data, - }; - struct io_wq_work_node *node, *prev; - struct io_wq_work *work; - unsigned long flags; - bool found = false; - - spin_lock_irqsave(&wqe->lock, flags); - wq_list_for_each(node, prev, &wqe->work_list) { - work = container_of(node, struct io_wq_work, list); - - if (cancel(work, cancel_data)) { - wq_node_del(&wqe->work_list, node, prev); - found = true; - break; - } - } - spin_unlock_irqrestore(&wqe->lock, flags); - - if (found) { - work->flags |= IO_WQ_WORK_CANCEL; - work->func(&work); - return IO_WQ_CANCEL_OK; - } - - rcu_read_lock(); - found = io_wq_for_each_worker(wqe, io_work_cancel, &data); - rcu_read_unlock(); - return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND; -} - -enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, - void *data) -{ - enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND; - int node; - - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - - ret = io_wqe_cancel_cb_work(wqe, cancel, data); - if (ret != IO_WQ_CANCEL_NOTFOUND) - break; - } - - return ret; -} - -struct work_match { - bool (*fn)(struct io_wq_work *, void *data); - void *data; -}; - -static bool io_wq_worker_cancel(struct io_worker *worker, void *data) -{ - struct work_match *match = data; - unsigned long flags; - bool ret = false; - - spin_lock_irqsave(&worker->lock, flags); - if (match->fn(worker->cur_work, match->data) && - !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL)) { + match->fn(worker->cur_work, match->data)) { send_sig(SIGINT, worker->task, 1); ret = true; } @@ -970,7 +925,7 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data) } static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, - struct work_match *match) + struct io_cb_cancel_data *match) { struct io_wq_work_node *node, *prev; struct io_wq_work *work; @@ -987,7 +942,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, work = container_of(node, struct io_wq_work, list); if (match->fn(work, match->data)) { - wq_node_del(&wqe->work_list, node, prev); + wq_list_del(&wqe->work_list, node, prev); found = true; break; } @@ -995,8 +950,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, spin_unlock_irqrestore(&wqe->lock, flags); if (found) { - work->flags |= IO_WQ_WORK_CANCEL; - work->func(&work); + io_run_cancel(work, wqe); return IO_WQ_CANCEL_OK; } @@ -1012,22 +966,16 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND; } -static bool io_wq_work_match(struct io_wq_work *work, void *data) -{ - return work == data; -} - -enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) +enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, + void *data) { - struct work_match match = { - .fn = io_wq_work_match, - .data = cwork + struct io_cb_cancel_data match = { + .fn = cancel, + .data = data, }; enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND; int node; - cwork->flags |= IO_WQ_WORK_CANCEL; - for_each_node(node) { struct io_wqe *wqe = wq->wqes[node]; @@ -1039,69 +987,28 @@ enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) return ret; } -static bool io_wq_pid_match(struct io_wq_work *work, void *data) +static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data) { - pid_t pid = (pid_t) (unsigned long) data; - - if (work) - return work->task_pid == pid; - return false; + return work == data; } -enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid) +enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) { - struct work_match match = { - .fn = io_wq_pid_match, - .data = (void *) (unsigned long) pid - }; - enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND; - int node; - - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - - ret = io_wqe_cancel_work(wqe, &match); - if (ret != IO_WQ_CANCEL_NOTFOUND) - break; - } - - return ret; + return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork); } -struct io_wq_flush_data { - struct io_wq_work work; - struct completion done; -}; - -static void io_wq_flush_func(struct io_wq_work **workptr) +static bool io_wq_pid_match(struct io_wq_work *work, void *data) { - struct io_wq_work *work = *workptr; - struct io_wq_flush_data *data; + pid_t pid = (pid_t) (unsigned long) data; - data = container_of(work, struct io_wq_flush_data, work); - complete(&data->done); + return work->task_pid == pid; } -/* - * Doesn't wait for previously queued work to finish. When this completes, - * it just means that previously queued work was started. - */ -void io_wq_flush(struct io_wq *wq) +enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid) { - struct io_wq_flush_data data; - int node; + void *data = (void *) (unsigned long) pid; - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - - if (!node_online(node)) - continue; - init_completion(&data.done); - INIT_IO_WORK(&data.work, io_wq_flush_func); - data.work.flags |= IO_WQ_WORK_INTERNAL; - io_wqe_enqueue(wqe, &data.work); - wait_for_completion(&data.done); - } + return io_wq_cancel_cb(wq, io_wq_pid_match, data); } struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) @@ -1109,6 +1016,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) int ret = -ENOMEM, node; struct io_wq *wq; + if (WARN_ON_ONCE(!data->free_work)) + return ERR_PTR(-EINVAL); + wq = kzalloc(sizeof(*wq), GFP_KERNEL); if (!wq) return ERR_PTR(-ENOMEM); @@ -1119,8 +1029,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) return ERR_PTR(-ENOMEM); } - wq->get_work = data->get_work; - wq->put_work = data->put_work; + wq->free_work = data->free_work; /* caller must already hold a reference to this */ wq->user = data->user; @@ -1177,7 +1086,7 @@ err: bool io_wq_get(struct io_wq *wq, struct io_wq_data *data) { - if (data->get_work != wq->get_work || data->put_work != wq->put_work) + if (data->free_work != wq->free_work) return false; return refcount_inc_not_zero(&wq->use_refs); |