summaryrefslogtreecommitdiffstats
path: root/fs/io-wq.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/io-wq.c')
-rw-r--r--fs/io-wq.c94
1 files changed, 60 insertions, 34 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c
index dac5c5961c9d..c51691262208 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -15,6 +15,7 @@
#include <linux/cpu.h>
#include <linux/tracehook.h>
#include <linux/audit.h>
+#include <uapi/linux/io_uring.h>
#include "io-wq.h"
@@ -140,6 +141,7 @@ static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
struct io_wqe_acct *acct,
struct io_cb_cancel_data *match);
+static void create_worker_cb(struct callback_head *cb);
static bool io_worker_get(struct io_worker *worker)
{
@@ -174,20 +176,52 @@ static void io_worker_ref_put(struct io_wq *wq)
complete(&wq->worker_done);
}
+static void io_worker_cancel_cb(struct io_worker *worker)
+{
+ struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+ struct io_wqe *wqe = worker->wqe;
+ struct io_wq *wq = wqe->wq;
+
+ atomic_dec(&acct->nr_running);
+ raw_spin_lock(&worker->wqe->lock);
+ acct->nr_workers--;
+ raw_spin_unlock(&worker->wqe->lock);
+ io_worker_ref_put(wq);
+ clear_bit_unlock(0, &worker->create_state);
+ io_worker_release(worker);
+}
+
+static bool io_task_worker_match(struct callback_head *cb, void *data)
+{
+ struct io_worker *worker;
+
+ if (cb->func != create_worker_cb)
+ return false;
+ worker = container_of(cb, struct io_worker, create_work);
+ return worker == data;
+}
+
static void io_worker_exit(struct io_worker *worker)
{
struct io_wqe *wqe = worker->wqe;
- struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+ struct io_wq *wq = wqe->wq;
- if (refcount_dec_and_test(&worker->ref))
- complete(&worker->ref_done);
+ while (1) {
+ struct callback_head *cb = task_work_cancel_match(wq->task,
+ io_task_worker_match, worker);
+
+ if (!cb)
+ break;
+ io_worker_cancel_cb(worker);
+ }
+
+ io_worker_release(worker);
wait_for_completion(&worker->ref_done);
raw_spin_lock(&wqe->lock);
if (worker->flags & IO_WORKER_F_FREE)
hlist_nulls_del_rcu(&worker->nulls_node);
list_del_rcu(&worker->all_list);
- acct->nr_workers--;
preempt_disable();
io_wqe_dec_running(worker);
worker->flags = 0;
@@ -247,8 +281,6 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
*/
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
- bool do_create = false;
-
/*
* Most likely an attempt to queue unbounded work on an io_wq that
* wasn't setup with any unbounded workers.
@@ -257,18 +289,15 @@ static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
pr_warn_once("io-wq is not configured for unbound workers");
raw_spin_lock(&wqe->lock);
- if (acct->nr_workers < acct->max_workers) {
- acct->nr_workers++;
- do_create = true;
+ if (acct->nr_workers >= acct->max_workers) {
+ raw_spin_unlock(&wqe->lock);
+ return true;
}
+ acct->nr_workers++;
raw_spin_unlock(&wqe->lock);
- if (do_create) {
- atomic_inc(&acct->nr_running);
- atomic_inc(&wqe->wq->worker_refs);
- return create_io_worker(wqe->wq, wqe, acct->index);
- }
-
- return true;
+ atomic_inc(&acct->nr_running);
+ atomic_inc(&wqe->wq->worker_refs);
+ return create_io_worker(wqe->wq, wqe, acct->index);
}
static void io_wqe_inc_running(struct io_worker *worker)
@@ -330,8 +359,10 @@ static bool io_queue_worker_create(struct io_worker *worker,
init_task_work(&worker->create_work, func);
worker->create_index = acct->index;
- if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL))
+ if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
+ clear_bit_unlock(0, &worker->create_state);
return true;
+ }
clear_bit_unlock(0, &worker->create_state);
fail_release:
io_worker_release(worker);
@@ -577,6 +608,7 @@ loop:
}
/* timed out, exit unless we're the last worker */
if (last_timeout && acct->nr_workers > 1) {
+ acct->nr_workers--;
raw_spin_unlock(&wqe->lock);
__set_current_state(TASK_RUNNING);
break;
@@ -592,9 +624,7 @@ loop:
if (!get_signal(&ksig))
continue;
- if (fatal_signal_pending(current))
- break;
- continue;
+ break;
}
last_timeout = !ret;
}
@@ -727,11 +757,8 @@ static void io_workqueue_create(struct work_struct *work)
struct io_worker *worker = container_of(work, struct io_worker, work);
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
- if (!io_queue_worker_create(worker, acct, create_worker_cont)) {
- clear_bit_unlock(0, &worker->create_state);
- io_worker_release(worker);
+ if (!io_queue_worker_create(worker, acct, create_worker_cont))
kfree(worker);
- }
}
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
@@ -1161,17 +1188,9 @@ static void io_wq_exit_workers(struct io_wq *wq)
while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
struct io_worker *worker;
- struct io_wqe_acct *acct;
worker = container_of(cb, struct io_worker, create_work);
- acct = io_wqe_get_acct(worker);
- atomic_dec(&acct->nr_running);
- raw_spin_lock(&worker->wqe->lock);
- acct->nr_workers--;
- raw_spin_unlock(&worker->wqe->lock);
- io_worker_ref_put(wq);
- clear_bit_unlock(0, &worker->create_state);
- io_worker_release(worker);
+ io_worker_cancel_cb(worker);
}
rcu_read_lock();
@@ -1291,6 +1310,10 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
int i, node, prev = 0;
+ BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
+ BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
+ BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2);
+
for (i = 0; i < 2; i++) {
if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
new_count[i] = task_rlimit(current, RLIMIT_NPROC);
@@ -1298,15 +1321,18 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
rcu_read_lock();
for_each_node(node) {
+ struct io_wqe *wqe = wq->wqes[node];
struct io_wqe_acct *acct;
+ raw_spin_lock(&wqe->lock);
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
- acct = &wq->wqes[node]->acct[i];
+ acct = &wqe->acct[i];
prev = max_t(int, acct->max_workers, prev);
if (new_count[i])
acct->max_workers = new_count[i];
new_count[i] = prev;
}
+ raw_spin_unlock(&wqe->lock);
}
rcu_read_unlock();
return 0;