summaryrefslogtreecommitdiffstats
path: root/fs/io-wq.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/io-wq.c')
-rw-r--r--fs/io-wq.c91
1 files changed, 56 insertions, 35 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c
index a7763127f884..bb7f161bb19c 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -48,7 +48,8 @@ struct io_worker {
struct io_wqe *wqe;
struct io_wq_work *cur_work;
- spinlock_t lock;
+ struct io_wq_work *next_work;
+ raw_spinlock_t lock;
struct completion ref_done;
@@ -405,8 +406,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
* Worker will start processing some work. Move it to the busy list, if
* it's currently on the freelist
*/
-static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
- struct io_wq_work *work)
+static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker)
__must_hold(wqe->lock)
{
if (worker->flags & IO_WORKER_F_FREE) {
@@ -529,9 +529,10 @@ static void io_assign_current_work(struct io_worker *worker,
cond_resched();
}
- spin_lock(&worker->lock);
+ raw_spin_lock(&worker->lock);
worker->cur_work = work;
- spin_unlock(&worker->lock);
+ worker->next_work = NULL;
+ raw_spin_unlock(&worker->lock);
}
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
@@ -546,7 +547,7 @@ static void io_worker_handle_work(struct io_worker *worker)
do {
struct io_wq_work *work;
-get_next:
+
/*
* If we got some work, mark us as busy. If we didn't, but
* the list isn't empty, it means we stalled on hashed work.
@@ -555,9 +556,20 @@ get_next:
* clear the stalled flag.
*/
work = io_get_next_work(acct, worker);
- if (work)
- __io_worker_busy(wqe, worker, work);
-
+ if (work) {
+ __io_worker_busy(wqe, worker);
+
+ /*
+ * Make sure cancelation can find this, even before
+ * it becomes the active work. That avoids a window
+ * where the work has been removed from our general
+ * work list, but isn't yet discoverable as the
+ * current work item for this worker.
+ */
+ raw_spin_lock(&worker->lock);
+ worker->next_work = work;
+ raw_spin_unlock(&worker->lock);
+ }
raw_spin_unlock(&wqe->lock);
if (!work)
break;
@@ -594,11 +606,6 @@ get_next:
spin_unlock_irq(&wq->hash->wait.lock);
if (wq_has_sleeper(&wq->hash->wait))
wake_up(&wq->hash->wait);
- raw_spin_lock(&wqe->lock);
- /* skip unnecessary unlock-lock wqe->lock */
- if (!work)
- goto get_next;
- raw_spin_unlock(&wqe->lock);
}
} while (work);
@@ -815,7 +822,7 @@ fail:
refcount_set(&worker->ref, 1);
worker->wqe = wqe;
- spin_lock_init(&worker->lock);
+ raw_spin_lock_init(&worker->lock);
init_completion(&worker->ref_done);
if (index == IO_WQ_ACCT_BOUND)
@@ -973,6 +980,19 @@ void io_wq_hash_work(struct io_wq_work *work, void *val)
work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}
+static bool __io_wq_worker_cancel(struct io_worker *worker,
+ struct io_cb_cancel_data *match,
+ struct io_wq_work *work)
+{
+ if (work && match->fn(work, match->data)) {
+ work->flags |= IO_WQ_WORK_CANCEL;
+ set_notify_signal(worker->task);
+ return true;
+ }
+
+ return false;
+}
+
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
struct io_cb_cancel_data *match = data;
@@ -981,13 +1001,11 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
* Hold the lock to avoid ->cur_work going out of scope, caller
* may dereference the passed in work.
*/
- spin_lock(&worker->lock);
- if (worker->cur_work &&
- match->fn(worker->cur_work, match->data)) {
- set_notify_signal(worker->task);
+ raw_spin_lock(&worker->lock);
+ if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
+ __io_wq_worker_cancel(worker, match, worker->next_work))
match->nr_running++;
- }
- spin_unlock(&worker->lock);
+ raw_spin_unlock(&worker->lock);
return match->nr_running && !match->cancel_all;
}
@@ -1039,17 +1057,16 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
{
int i;
retry:
- raw_spin_lock(&wqe->lock);
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
if (io_acct_cancel_pending_work(wqe, acct, match)) {
+ raw_spin_lock(&wqe->lock);
if (match->cancel_all)
goto retry;
- return;
+ break;
}
}
- raw_spin_unlock(&wqe->lock);
}
static void io_wqe_cancel_running_work(struct io_wqe *wqe,
@@ -1074,25 +1091,27 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
* First check pending list, if we're lucky we can just remove it
* from there. CANCEL_OK means that the work is returned as-new,
* no completion will be posted for it.
- */
- for_each_node(node) {
- struct io_wqe *wqe = wq->wqes[node];
-
- io_wqe_cancel_pending_work(wqe, &match);
- if (match.nr_pending && !match.cancel_all)
- return IO_WQ_CANCEL_OK;
- }
-
- /*
- * Now check if a free (going busy) or busy worker has the work
+ *
+ * Then check if a free (going busy) or busy worker has the work
* currently running. If we find it there, we'll return CANCEL_RUNNING
* as an indication that we attempt to signal cancellation. The
* completion will run normally in this case.
+ *
+ * Do both of these while holding the wqe->lock, to ensure that
+ * we'll find a work item regardless of state.
*/
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
+ raw_spin_lock(&wqe->lock);
+ io_wqe_cancel_pending_work(wqe, &match);
+ if (match.nr_pending && !match.cancel_all) {
+ raw_spin_unlock(&wqe->lock);
+ return IO_WQ_CANCEL_OK;
+ }
+
io_wqe_cancel_running_work(wqe, &match);
+ raw_spin_unlock(&wqe->lock);
if (match.nr_running && !match.cancel_all)
return IO_WQ_CANCEL_RUNNING;
}
@@ -1263,7 +1282,9 @@ static void io_wq_destroy(struct io_wq *wq)
.fn = io_wq_work_match_all,
.cancel_all = true,
};
+ raw_spin_lock(&wqe->lock);
io_wqe_cancel_pending_work(wqe, &match);
+ raw_spin_unlock(&wqe->lock);
free_cpumask_var(wqe->cpu_mask);
kfree(wqe);
}