summaryrefslogtreecommitdiffstats
path: root/io_uring/io-wq.c
diff options
context:
space:
mode:
Diffstat (limited to 'io_uring/io-wq.c')
-rw-r--r--io_uring/io-wq.c39
1 files changed, 22 insertions, 17 deletions
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index d1c47a9d9215..f1e7c670add8 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -23,6 +23,7 @@
#include "io_uring.h"
#define WORKER_IDLE_TIMEOUT (5 * HZ)
+#define WORKER_INIT_LIMIT 3
enum {
IO_WORKER_F_UP = 0, /* up and active */
@@ -58,6 +59,7 @@ struct io_worker {
unsigned long create_state;
struct callback_head create_work;
+ int init_retries;
union {
struct rcu_head rcu;
@@ -159,7 +161,7 @@ static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
struct io_wq_work *work)
{
- return io_get_acct(wq, !(work->flags & IO_WQ_WORK_UNBOUND));
+ return io_get_acct(wq, !(atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND));
}
static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
@@ -451,7 +453,7 @@ static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
- return work->flags >> IO_WQ_HASH_SHIFT;
+ return atomic_read(&work->flags) >> IO_WQ_HASH_SHIFT;
}
static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
@@ -592,8 +594,9 @@ static void io_worker_handle_work(struct io_wq_acct *acct,
next_hashed = wq_next_work(work);
- if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
- work->flags |= IO_WQ_WORK_CANCEL;
+ if (do_kill &&
+ (atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND))
+ atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
wq->do_work(work);
io_assign_current_work(worker, NULL);
@@ -744,7 +747,7 @@ static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
return true;
}
-static inline bool io_should_retry_thread(long err)
+static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
/*
* Prevent perpetual task_work retry, if the task (or its group) is
@@ -752,6 +755,8 @@ static inline bool io_should_retry_thread(long err)
*/
if (fatal_signal_pending(current))
return false;
+ if (worker->init_retries++ >= WORKER_INIT_LIMIT)
+ return false;
switch (err) {
case -EAGAIN:
@@ -778,7 +783,7 @@ static void create_worker_cont(struct callback_head *cb)
io_init_new_worker(wq, worker, tsk);
io_worker_release(worker);
return;
- } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
+ } else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
struct io_wq_acct *acct = io_wq_get_acct(worker);
atomic_dec(&acct->nr_running);
@@ -845,7 +850,7 @@ fail:
tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
if (!IS_ERR(tsk)) {
io_init_new_worker(wq, worker, tsk);
- } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
+ } else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
kfree(worker);
goto fail;
} else {
@@ -891,7 +896,7 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
do {
- work->flags |= IO_WQ_WORK_CANCEL;
+ atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
wq->do_work(work);
work = wq->free_work(work);
} while (work);
@@ -926,8 +931,12 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
struct io_wq_acct *acct = io_work_get_acct(wq, work);
- unsigned long work_flags = work->flags;
- struct io_cb_cancel_data match;
+ unsigned int work_flags = atomic_read(&work->flags);
+ struct io_cb_cancel_data match = {
+ .fn = io_wq_work_match_item,
+ .data = work,
+ .cancel_all = false,
+ };
bool do_create;
/*
@@ -935,7 +944,7 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
* been marked as one that should not get executed, cancel it here.
*/
if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
- (work->flags & IO_WQ_WORK_CANCEL)) {
+ (work_flags & IO_WQ_WORK_CANCEL)) {
io_run_cancel(work, wq);
return;
}
@@ -965,10 +974,6 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
raw_spin_unlock(&wq->lock);
/* fatal condition, failed to create the first worker */
- match.fn = io_wq_work_match_item,
- match.data = work,
- match.cancel_all = false,
-
io_acct_cancel_pending_work(wq, acct, &match);
}
}
@@ -982,7 +987,7 @@ void io_wq_hash_work(struct io_wq_work *work, void *val)
unsigned int bit;
bit = hash_ptr(val, IO_WQ_HASH_ORDER);
- work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
+ atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}
static bool __io_wq_worker_cancel(struct io_worker *worker,
@@ -990,7 +995,7 @@ static bool __io_wq_worker_cancel(struct io_worker *worker,
struct io_wq_work *work)
{
if (work && match->fn(work, match->data)) {
- work->flags |= IO_WQ_WORK_CANCEL;
+ atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
__set_notify_signal(worker->task);
return true;
}