summaryrefslogtreecommitdiffstats
path: root/kernel/workqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r--kernel/workqueue.c936
1 files changed, 833 insertions, 103 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 4c31fde092c6..0ad46523b423 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -34,17 +34,25 @@
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#include <linux/idr.h>
-#include <linux/delay.h>
+
+#include "workqueue_sched.h"
enum {
/* global_cwq flags */
+ GCWQ_MANAGE_WORKERS = 1 << 0, /* need to manage workers */
+ GCWQ_MANAGING_WORKERS = 1 << 1, /* managing workers */
+ GCWQ_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */
GCWQ_FREEZING = 1 << 3, /* freeze in progress */
/* worker flags */
WORKER_STARTED = 1 << 0, /* started */
WORKER_DIE = 1 << 1, /* die die die */
WORKER_IDLE = 1 << 2, /* is idle */
+ WORKER_PREP = 1 << 3, /* preparing to run works */
WORKER_ROGUE = 1 << 4, /* not bound to any cpu */
+ WORKER_REBIND = 1 << 5, /* mom is home, come back */
+
+ WORKER_NOT_RUNNING = WORKER_PREP | WORKER_ROGUE | WORKER_REBIND,
/* gcwq->trustee_state */
TRUSTEE_START = 0, /* start */
@@ -57,7 +65,19 @@ enum {
BUSY_WORKER_HASH_SIZE = 1 << BUSY_WORKER_HASH_ORDER,
BUSY_WORKER_HASH_MASK = BUSY_WORKER_HASH_SIZE - 1,
+ MAX_IDLE_WORKERS_RATIO = 4, /* 1/4 of busy can be idle */
+ IDLE_WORKER_TIMEOUT = 300 * HZ, /* keep idle ones for 5 mins */
+
+ MAYDAY_INITIAL_TIMEOUT = HZ / 100, /* call for help after 10ms */
+ MAYDAY_INTERVAL = HZ / 10, /* and then every 100ms */
+ CREATE_COOLDOWN = HZ, /* time to breath after fail */
TRUSTEE_COOLDOWN = HZ / 10, /* for trustee draining */
+
+ /*
+ * Rescue workers are used only on emergencies and shared by
+ * all cpus. Give -20.
+ */
+ RESCUER_NICE_LEVEL = -20,
};
/*
@@ -65,8 +85,16 @@ enum {
*
* I: Set during initialization and read-only afterwards.
*
+ * P: Preemption protected. Disabling preemption is enough and should
+ * only be modified and accessed from the local cpu.
+ *
* L: gcwq->lock protected. Access with gcwq->lock held.
*
+ * X: During normal operation, modification requires gcwq->lock and
+ * should be done only from local cpu. Either disabling preemption
+ * on local cpu or grabbing gcwq->lock is enough for read access.
+ * While trustee is in charge, it's identical to L.
+ *
* F: wq->flush_mutex protected.
*
* W: workqueue_lock protected.
@@ -74,6 +102,10 @@ enum {
struct global_cwq;
+/*
+ * The poor guys doing the actual heavy lifting. All on-duty workers
+ * are either serving the manager role, on idle list or on busy hash.
+ */
struct worker {
/* on idle list while idle, on busy hash table while busy */
union {
@@ -86,12 +118,17 @@ struct worker {
struct list_head scheduled; /* L: scheduled works */
struct task_struct *task; /* I: worker task */
struct global_cwq *gcwq; /* I: the associated gcwq */
- unsigned int flags; /* L: flags */
+ /* 64 bytes boundary on 64bit, 32 on 32bit */
+ unsigned long last_active; /* L: last active timestamp */
+ unsigned int flags; /* X: flags */
int id; /* I: worker id */
+ struct work_struct rebind_work; /* L: rebind worker to cpu */
};
/*
- * Global per-cpu workqueue.
+ * Global per-cpu workqueue. There's one and only one for each cpu
+ * and all works are queued and processed here regardless of their
+ * target workqueues.
*/
struct global_cwq {
spinlock_t lock; /* the gcwq lock */
@@ -103,15 +140,19 @@ struct global_cwq {
int nr_idle; /* L: currently idle ones */
/* workers are chained either in the idle_list or busy_hash */
- struct list_head idle_list; /* L: list of idle workers */
+ struct list_head idle_list; /* X: list of idle workers */
struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];
/* L: hash of busy workers */
+ struct timer_list idle_timer; /* L: worker idle timeout */
+ struct timer_list mayday_timer; /* L: SOS timer for dworkers */
+
struct ida worker_ida; /* L: for worker IDs */
struct task_struct *trustee; /* L: for gcwq shutdown */
unsigned int trustee_state; /* L: trustee state */
wait_queue_head_t trustee_wait; /* trustee wait */
+ struct worker *first_idle; /* L: first idle worker */
} ____cacheline_aligned_in_smp;
/*
@@ -121,7 +162,6 @@ struct global_cwq {
*/
struct cpu_workqueue_struct {
struct global_cwq *gcwq; /* I: the associated gcwq */
- struct worker *worker;
struct workqueue_struct *wq; /* I: the owning workqueue */
int work_color; /* L: current color */
int flush_color; /* L: flushing color */
@@ -160,6 +200,9 @@ struct workqueue_struct {
unsigned long single_cpu; /* cpu for single cpu wq */
+ cpumask_var_t mayday_mask; /* cpus requesting rescue */
+ struct worker *rescuer; /* I: rescue worker */
+
int saved_max_active; /* I: saved cwq max_active */
const char *name; /* I: workqueue name */
#ifdef CONFIG_LOCKDEP
@@ -286,7 +329,13 @@ static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
static bool workqueue_freezing; /* W: have wqs started freezing? */
+/*
+ * The almighty global cpu workqueues. nr_running is the only field
+ * which is expected to be used frequently by other cpus via
+ * try_to_wake_up(). Put it in a separate cacheline.
+ */
static DEFINE_PER_CPU(struct global_cwq, global_cwq);
+static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running);
static int worker_thread(void *__worker);
@@ -295,6 +344,11 @@ static struct global_cwq *get_gcwq(unsigned int cpu)
return &per_cpu(global_cwq, cpu);
}
+static atomic_t *get_gcwq_nr_running(unsigned int cpu)
+{
+ return &per_cpu(gcwq_nr_running, cpu);
+}
+
static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
struct workqueue_struct *wq)
{
@@ -385,6 +439,63 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
return get_gcwq(cpu);
}
+/*
+ * Policy functions. These define the policies on how the global
+ * worker pool is managed. Unless noted otherwise, these functions
+ * assume that they're being called with gcwq->lock held.
+ */
+
+/*
+ * Need to wake up a worker? Called from anything but currently
+ * running workers.
+ */
+static bool need_more_worker(struct global_cwq *gcwq)
+{
+ atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
+
+ return !list_empty(&gcwq->worklist) && !atomic_read(nr_running);
+}
+
+/* Can I start working? Called from busy but !running workers. */
+static bool may_start_working(struct global_cwq *gcwq)
+{
+ return gcwq->nr_idle;
+}
+
+/* Do I need to keep working? Called from currently running workers. */
+static bool keep_working(struct global_cwq *gcwq)
+{
+ atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
+
+ return !list_empty(&gcwq->worklist) && atomic_read(nr_running) <= 1;
+}
+
+/* Do we need a new worker? Called from manager. */
+static bool need_to_create_worker(struct global_cwq *gcwq)
+{
+ return need_more_worker(gcwq) && !may_start_working(gcwq);
+}
+
+/* Do I need to be the manager? */
+static bool need_to_manage_workers(struct global_cwq *gcwq)
+{
+ return need_to_create_worker(gcwq) || gcwq->flags & GCWQ_MANAGE_WORKERS;
+}
+
+/* Do we have too many workers and should some go away? */
+static bool too_many_workers(struct global_cwq *gcwq)
+{
+ bool managing = gcwq->flags & GCWQ_MANAGING_WORKERS;
+ int nr_idle = gcwq->nr_idle + managing; /* manager is considered idle */
+ int nr_busy = gcwq->nr_workers - nr_idle;
+
+ return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
+}
+
+/*
+ * Wake up functions.
+ */
+
/* Return the first worker. Safe with preemption disabled */
static struct worker *first_worker(struct global_cwq *gcwq)
{
@@ -412,12 +523,77 @@ static void wake_up_worker(struct global_cwq *gcwq)
}
/**
- * worker_set_flags - set worker flags
+ * wq_worker_waking_up - a worker is waking up
+ * @task: task waking up
+ * @cpu: CPU @task is waking up to
+ *
+ * This function is called during try_to_wake_up() when a worker is
+ * being awoken.
+ *
+ * CONTEXT:
+ * spin_lock_irq(rq->lock)
+ */
+void wq_worker_waking_up(struct task_struct *task, unsigned int cpu)
+{
+ struct worker *worker = kthread_data(task);
+
+ if (likely(!(worker->flags & WORKER_NOT_RUNNING)))
+ atomic_inc(get_gcwq_nr_running(cpu));
+}
+
+/**
+ * wq_worker_sleeping - a worker is going to sleep
+ * @task: task going to sleep
+ * @cpu: CPU in question, must be the current CPU number
+ *
+ * This function is called during schedule() when a busy worker is
+ * going to sleep. Worker on the same cpu can be woken up by
+ * returning pointer to its task.
+ *
+ * CONTEXT:
+ * spin_lock_irq(rq->lock)
+ *
+ * RETURNS:
+ * Worker task on @cpu to wake up, %NULL if none.
+ */
+struct task_struct *wq_worker_sleeping(struct task_struct *task,
+ unsigned int cpu)
+{
+ struct worker *worker = kthread_data(task), *to_wakeup = NULL;
+ struct global_cwq *gcwq = get_gcwq(cpu);
+ atomic_t *nr_running = get_gcwq_nr_running(cpu);
+
+ if (unlikely(worker->flags & WORKER_NOT_RUNNING))
+ return NULL;
+
+ /* this can only happen on the local cpu */
+ BUG_ON(cpu != raw_smp_processor_id());
+
+ /*
+ * The counterpart of the following dec_and_test, implied mb,
+ * worklist not empty test sequence is in insert_work().
+ * Please read comment there.
+ *
+ * NOT_RUNNING is clear. This means that trustee is not in
+ * charge and we're running on the local cpu w/ rq lock held
+ * and preemption disabled, which in turn means that none else
+ * could be manipulating idle_list, so dereferencing idle_list
+ * without gcwq lock is safe.
+ */
+ if (atomic_dec_and_test(nr_running) && !list_empty(&gcwq->worklist))
+ to_wakeup = first_worker(gcwq);
+ return to_wakeup ? to_wakeup->task : NULL;
+}
+
+/**
+ * worker_set_flags - set worker flags and adjust nr_running accordingly
* @worker: worker to set flags for
* @flags: flags to set
* @wakeup: wakeup an idle worker if necessary
*
- * Set @flags in @worker->flags.
+ * Set @flags in @worker->flags and adjust nr_running accordingly. If
+ * nr_running becomes zero and @wakeup is %true, an idle worker is
+ * woken up.
*
* LOCKING:
* spin_lock_irq(gcwq->lock).
@@ -425,22 +601,49 @@ static void wake_up_worker(struct global_cwq *gcwq)
static inline void worker_set_flags(struct worker *worker, unsigned int flags,
bool wakeup)
{
+ struct global_cwq *gcwq = worker->gcwq;
+
+ /*
+ * If transitioning into NOT_RUNNING, adjust nr_running and
+ * wake up an idle worker as necessary if requested by
+ * @wakeup.
+ */
+ if ((flags & WORKER_NOT_RUNNING) &&
+ !(worker->flags & WORKER_NOT_RUNNING)) {
+ atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
+
+ if (wakeup) {
+ if (atomic_dec_and_test(nr_running) &&
+ !list_empty(&gcwq->worklist))
+ wake_up_worker(gcwq);
+ } else
+ atomic_dec(nr_running);
+ }
+
worker->flags |= flags;
}
/**
- * worker_clr_flags - clear worker flags
+ * worker_clr_flags - clear worker flags and adjust nr_running accordingly
* @worker: worker to set flags for
* @flags: flags to clear
*
- * Clear @flags in @worker->flags.
+ * Clear @flags in @worker->flags and adjust nr_running accordingly.
*
* LOCKING:
* spin_lock_irq(gcwq->lock).
*/
static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
{
+ struct global_cwq *gcwq = worker->gcwq;
+ unsigned int oflags = worker->flags;
+
worker->flags &= ~flags;
+
+ /* if transitioning out of NOT_RUNNING, increment nr_running */
+ if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
+ if (!(worker->flags & WORKER_NOT_RUNNING))
+ atomic_inc(get_gcwq_nr_running(gcwq->cpu));
}
/**
@@ -540,6 +743,8 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work, struct list_head *head,
unsigned int extra_flags)
{
+ struct global_cwq *gcwq = cwq->gcwq;
+
/* we own @work, set data and link */
set_work_cwq(work, cwq, extra_flags);
@@ -550,7 +755,16 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
smp_wmb();
list_add_tail(&work->entry, head);
- wake_up_worker(cwq->gcwq);
+
+ /*
+ * Ensure either worker_sched_deactivated() sees the above
+ * list_add_tail() or we see zero nr_running to avoid workers
+ * lying around lazily while there are works to be processed.
+ */
+ smp_mb();
+
+ if (!atomic_read(get_gcwq_nr_running(gcwq->cpu)))
+ wake_up_worker(gcwq);
}
/**
@@ -810,11 +1024,16 @@ static void worker_enter_idle(struct worker *worker)
worker_set_flags(worker, WORKER_IDLE, false);
gcwq->nr_idle++;
+ worker->last_active = jiffies;
/* idle_list is LIFO */
list_add(&worker->entry, &gcwq->idle_list);
- if (unlikely(worker->flags & WORKER_ROGUE))
+ if (likely(!(worker->flags & WORKER_ROGUE))) {
+ if (too_many_workers(gcwq) && !timer_pending(&gcwq->idle_timer))
+ mod_timer(&gcwq->idle_timer,
+ jiffies + IDLE_WORKER_TIMEOUT);
+ } else
wake_up_all(&gcwq->trustee_wait);
}
@@ -837,6 +1056,81 @@ static void worker_leave_idle(struct worker *worker)
list_del_init(&worker->entry);
}
+/**
+ * worker_maybe_bind_and_lock - bind worker to its cpu if possible and lock gcwq
+ * @worker: self
+ *
+ * Works which are scheduled while the cpu is online must at least be
+ * scheduled to a worker which is bound to the cpu so that if they are
+ * flushed from cpu callbacks while cpu is going down, they are
+ * guaranteed to execute on the cpu.
+ *
+ * This function is to be used by rogue workers and rescuers to bind
+ * themselves to the target cpu and may race with cpu going down or
+ * coming online. kthread_bind() can't be used because it may put the
+ * worker to already dead cpu and set_cpus_allowed_ptr() can't be used
+ * verbatim as it's best effort and blocking and gcwq may be
+ * [dis]associated in the meantime.
+ *
+ * This function tries set_cpus_allowed() and locks gcwq and verifies
+ * the binding against GCWQ_DISASSOCIATED which is set during
+ * CPU_DYING and cleared during CPU_ONLINE, so if the worker enters
+ * idle state or fetches works without dropping lock, it can guarantee
+ * the scheduling requirement described in the first paragraph.
+ *
+ * CONTEXT:
+ * Might sleep. Called without any lock but returns with gcwq->lock
+ * held.
+ *
+ * RETURNS:
+ * %true if the associated gcwq is online (@worker is successfully
+ * bound), %false if offline.
+ */
+static bool worker_maybe_bind_and_lock(struct worker *worker)
+{
+ struct global_cwq *gcwq = worker->gcwq;
+ struct task_struct *task = worker->task;
+
+ while (true) {
+ /*
+ * The following call may fail, succeed or succeed
+ * without actually migrating the task to the cpu if
+ * it races with cpu hotunplug operation. Verify
+ * against GCWQ_DISASSOCIATED.
+ */
+ set_cpus_allowed_ptr(task, get_cpu_mask(gcwq->cpu));
+
+ spin_lock_irq(&gcwq->lock);
+ if (gcwq->flags & GCWQ_DISASSOCIATED)
+ return false;
+ if (task_cpu(task) == gcwq->cpu &&
+ cpumask_equal(&current->cpus_allowed,
+ get_cpu_mask(gcwq->cpu)))
+ return true;
+ spin_unlock_irq(&gcwq->lock);
+
+ /* CPU has come up inbetween, retry migration */
+ cpu_relax();
+ }
+}
+
+/*
+ * Function for worker->rebind_work used to rebind rogue busy workers
+ * to the associated cpu which is coming back online. This is
+ * scheduled by cpu up but can race with other cpu hotplug operations
+ * and may be executed twice without intervening cpu down.
+ */
+static void worker_rebind_fn(struct work_struct *work)
+{
+ struct worker *worker = container_of(work, struct worker, rebind_work);
+ struct global_cwq *gcwq = worker->gcwq;
+
+ if (worker_maybe_bind_and_lock(worker))
+ worker_clr_flags(worker, WORKER_REBIND);
+
+ spin_unlock_irq(&gcwq->lock);
+}
+
static struct worker *alloc_worker(void)
{
struct worker *worker;
@@ -845,6 +1139,9 @@ static struct worker *alloc_worker(void)
if (worker) {
INIT_LIST_HEAD(&worker->entry);
INIT_LIST_HEAD(&worker->scheduled);
+ INIT_WORK(&worker->rebind_work, worker_rebind_fn);
+ /* on creation a worker is in !idle && prep state */
+ worker->flags = WORKER_PREP;
}
return worker;
}
@@ -963,6 +1260,220 @@ static void destroy_worker(struct worker *worker)
ida_remove(&gcwq->worker_ida, id);
}
+static void idle_worker_timeout(unsigned long __gcwq)
+{
+ struct global_cwq *gcwq = (void *)__gcwq;
+
+ spin_lock_irq(&gcwq->lock);
+
+ if (too_many_workers(gcwq)) {
+ struct worker *worker;
+ unsigned long expires;
+
+ /* idle_list is kept in LIFO order, check the last one */
+ worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
+ expires = worker->last_active + IDLE_WORKER_TIMEOUT;
+
+ if (time_before(jiffies, expires))
+ mod_timer(&gcwq->idle_timer, expires);
+ else {
+ /* it's been idle for too long, wake up manager */
+ gcwq->flags |= GCWQ_MANAGE_WORKERS;
+ wake_up_worker(gcwq);
+ }
+ }
+
+ spin_unlock_irq(&gcwq->lock);
+}
+
+static bool send_mayday(struct work_struct *work)
+{
+ struct cpu_workqueue_struct *cwq = get_work_cwq(work);
+ struct workqueue_struct *wq = cwq->wq;
+
+ if (!(wq->flags & WQ_RESCUER))
+ return false;
+
+ /* mayday mayday mayday */
+ if (!cpumask_test_and_set_cpu(cwq->gcwq->cpu, wq->mayday_mask))
+ wake_up_process(wq->rescuer->task);
+ return true;
+}
+
+static void gcwq_mayday_timeout(unsigned long __gcwq)
+{
+ struct global_cwq *gcwq = (void *)__gcwq;
+ struct work_struct *work;
+
+ spin_lock_irq(&gcwq->lock);
+
+ if (need_to_create_worker(gcwq)) {
+ /*
+ * We've been trying to create a new worker but
+ * haven't been successful. We might be hitting an
+ * allocation deadlock. Send distress signals to
+ * rescuers.
+ */
+ list_for_each_entry(work, &gcwq->worklist, entry)
+ send_mayday(work);
+ }
+
+ spin_unlock_irq(&gcwq->lock);
+
+ mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INTERVAL);
+}
+
+/**
+ * maybe_create_worker - create a new worker if necessary
+ * @gcwq: gcwq to create a new worker for
+ *
+ * Create a new worker for @gcwq if necessary. @gcwq is guaranteed to
+ * have at least one idle worker on return from this function. If
+ * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
+ * sent to all rescuers with works scheduled on @gcwq to resolve
+ * possible allocation deadlock.
+ *
+ * On return, need_to_create_worker() is guaranteed to be false and
+ * may_start_working() true.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times. Does GFP_KERNEL allocations. Called only from
+ * manager.
+ *
+ * RETURNS:
+ * false if no action was taken and gcwq->lock stayed locked, true
+ * otherwise.
+ */
+static bool maybe_create_worker(struct global_cwq *gcwq)
+{
+ if (!need_to_create_worker(gcwq))
+ return false;
+restart:
+ /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
+ mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);
+
+ while (true) {
+ struct worker *worker;
+
+ spin_unlock_irq(&gcwq->lock);
+
+ worker = create_worker(gcwq, true);
+ if (worker) {
+ del_timer_sync(&gcwq->mayday_timer);
+ spin_lock_irq(&gcwq->lock);
+ start_worker(worker);
+ BUG_ON(need_to_create_worker(gcwq));
+ return true;
+ }
+
+ if (!need_to_create_worker(gcwq))
+ break;
+
+ spin_unlock_irq(&gcwq->lock);
+ __set_current_state(TASK_INTERRUPTIBLE);
+ schedule_timeout(CREATE_COOLDOWN);
+ spin_lock_irq(&gcwq->lock);
+ if (!need_to_create_worker(gcwq))
+ break;
+ }
+
+ spin_unlock_irq(&gcwq->lock);
+ del_timer_sync(&gcwq->mayday_timer);
+ spin_lock_irq(&gcwq->lock);
+ if (need_to_create_worker(gcwq))
+ goto restart;
+ return true;
+}
+
+/**
+ * maybe_destroy_worker - destroy workers which have been idle for a while
+ * @gcwq: gcwq to destroy workers for
+ *
+ * Destroy @gcwq workers which have been idle for longer than
+ * IDLE_WORKER_TIMEOUT.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times. Called only from manager.
+ *
+ * RETURNS:
+ * false if no action was taken and gcwq->lock stayed locked, true
+ * otherwise.
+ */
+static bool maybe_destroy_workers(struct global_cwq *gcwq)
+{
+ bool ret = false;
+
+ while (too_many_workers(gcwq)) {
+ struct worker *worker;
+ unsigned long expires;
+
+ worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
+ expires = worker->last_active + IDLE_WORKER_TIMEOUT;
+
+ if (time_before(jiffies, expires)) {
+ mod_timer(&gcwq->idle_timer, expires);
+ break;
+ }
+
+ destroy_worker(worker);
+ ret = true;
+ }
+
+ return ret;
+}
+
+/**
+ * manage_workers - manage worker pool
+ * @worker: self
+ *
+ * Assume the manager role and manage gcwq worker pool @worker belongs
+ * to. At any given time, there can be only zero or one manager per
+ * gcwq. The exclusion is handled automatically by this function.
+ *
+ * The caller can safely start processing works on false return. On
+ * true return, it's guaranteed that need_to_create_worker() is false
+ * and may_start_working() is true.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times. Does GFP_KERNEL allocations.
+ *
+ * RETURNS:
+ * false if no action was taken and gcwq->lock stayed locked, true if
+ * some action was taken.
+ */
+static bool manage_workers(struct worker *worker)
+{
+ struct global_cwq *gcwq = worker->gcwq;
+ bool ret = false;
+
+ if (gcwq->flags & GCWQ_MANAGING_WORKERS)
+ return ret;
+
+ gcwq->flags &= ~GCWQ_MANAGE_WORKERS;
+ gcwq->flags |= GCWQ_MANAGING_WORKERS;
+
+ /*
+ * Destroy and then create so that may_start_working() is true
+ * on return.
+ */
+ ret |= maybe_destroy_workers(gcwq);
+ ret |= maybe_create_worker(gcwq);
+
+ gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
+
+ /*
+ * The trustee might be waiting to take over the manager
+ * position, tell it we're done.
+ */
+ if (unlikely(gcwq->trustee))
+ wake_up_all(&gcwq->trustee_wait);
+
+ return ret;
+}
+
/**
* move_linked_works - move linked works to a list
* @work: start of series of works to be scheduled
@@ -1169,24 +1680,39 @@ static void process_scheduled_works(struct worker *worker)
* worker_thread - the worker thread function
* @__worker: self
*
- * The cwq worker thread function.
+ * The gcwq worker thread function. There's a single dynamic pool of
+ * these per each cpu. These workers process all works regardless of
+ * their specific target workqueue. The only exception is works which
+ * belong to workqueues with a rescuer which will be explained in
+ * rescuer_thread().
*/
static int worker_thread(void *__worker)
{
struct worker *worker = __worker;
struct global_cwq *gcwq = worker->gcwq;
+ /* tell the scheduler that this is a workqueue worker */
+ worker->task->flags |= PF_WQ_WORKER;
woke_up:
spin_lock_irq(&gcwq->lock);
/* DIE can be set only while we're idle, checking here is enough */
if (worker->flags & WORKER_DIE) {
spin_unlock_irq(&gcwq->lock);
+ worker->task->flags &= ~PF_WQ_WORKER;
return 0;
}
worker_leave_idle(worker);
recheck:
+ /* no more worker necessary? */
+ if (!need_more_worker(gcwq))
+ goto sleep;
+
+ /* do we need to manage? */
+ if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))
+ goto recheck;
+
/*
* ->scheduled list can only be filled while a worker is
* preparing to process a work or actually processing it.
@@ -1194,27 +1720,18 @@ recheck:
*/
BUG_ON(!list_empty(&worker->scheduled));
- while (!list_empty(&gcwq->worklist)) {
+ /*
+ * When control reaches this point, we're guaranteed to have
+ * at least one idle worker or that someone else has already
+ * assumed the manager role.
+ */
+ worker_clr_flags(worker, WORKER_PREP);
+
+ do {
struct work_struct *work =
list_first_entry(&gcwq->worklist,
struct work_struct, entry);
- /*
- * The following is a rather inefficient way to close
- * race window against cpu hotplug operations. Will
- * be replaced soon.
- */
- if (unlikely(!(worker->flags & WORKER_ROGUE) &&
- !cpumask_equal(&worker->task->cpus_allowed,
- get_cpu_mask(gcwq->cpu)))) {
- spin_unlock_irq(&gcwq->lock);
- set_cpus_allowed_ptr(worker->task,
- get_cpu_mask(gcwq->cpu));
- cpu_relax();
- spin_lock_irq(&gcwq->lock);
- goto recheck;
- }
-
if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
/* optimization path, not strictly necessary */
process_one_work(worker, work);
@@ -1224,13 +1741,19 @@ recheck:
move_linked_works(work, &worker->scheduled, NULL);
process_scheduled_works(worker);
}
- }
+ } while (keep_working(gcwq));
+
+ worker_set_flags(worker, WORKER_PREP, false);
+ if (unlikely(need_to_manage_workers(gcwq)) && manage_workers(worker))
+ goto recheck;
+sleep:
/*
- * gcwq->lock is held and there's no work to process, sleep.
- * Workers are woken up only while holding gcwq->lock, so
- * setting the current state before releasing gcwq->lock is
- * enough to prevent losing any event.
+ * gcwq->lock is held and there's no work to process and no
+ * need to manage, sleep. Workers are woken up only while
+ * holding gcwq->lock or from local cpu, so setting the
+ * current state before releasing gcwq->lock is enough to
+ * prevent losing any event.
*/
worker_enter_idle(worker);
__set_current_state(TASK_INTERRUPTIBLE);
@@ -1239,6 +1762,68 @@ recheck:
goto woke_up;
}
+/**
+ * rescuer_thread - the rescuer thread function
+ * @__wq: the associated workqueue
+ *
+ * Workqueue rescuer thread function. There's one rescuer for each
+ * workqueue which has WQ_RESCUER set.
+ *
+ * Regular work processing on a gcwq may block trying to create a new
+ * worker which uses GFP_KERNEL allocation which has slight chance of
+ * developing into deadlock if some works currently on the same queue
+ * need to be processed to satisfy the GFP_KERNEL allocation. This is
+ * the problem rescuer solves.
+ *
+ * When such condition is possible, the gcwq summons rescuers of all
+ * workqueues which have works queued on the gcwq and let them process
+ * those works so that forward progress can be guaranteed.
+ *
+ * This should happen rarely.
+ */
+static int rescuer_thread(void *__wq)
+{
+ struct workqueue_struct *wq = __wq;
+ struct worker *rescuer = wq->rescuer;
+ struct list_head *scheduled = &rescuer->scheduled;
+ unsigned int cpu;
+
+ set_user_nice(current, RESCUER_NICE_LEVEL);
+repeat:
+ set_current_state(TASK_INTERRUPTIBLE);
+
+ if (kthread_should_stop())
+ return 0;
+
+ for_each_cpu(cpu, wq->mayday_mask) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+ struct global_cwq *gcwq = cwq->gcwq;
+ struct work_struct *work, *n;
+
+ __set_current_state(TASK_RUNNING);
+ cpumask_clear_cpu(cpu, wq->mayday_mask);
+
+ /* migrate to the target cpu if possible */
+ rescuer->gcwq = gcwq;
+ worker_maybe_bind_and_lock(rescuer);
+
+ /*
+ * Slurp in all works issued via this workqueue and
+ * process'em.
+ */
+ BUG_ON(!list_empty(&rescuer->scheduled));
+ list_for_each_entry_safe(work, n, &gcwq->worklist, entry)
+ if (get_work_cwq(work) == cwq)
+ move_linked_works(work, scheduled, &n);
+
+ process_scheduled_works(rescuer);
+ spin_unlock_irq(&gcwq->lock);
+ }
+
+ schedule();
+ goto repeat;
+}
+
struct wq_barrier {
struct work_struct work;
struct completion done;
@@ -1998,7 +2583,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
const char *lock_name)
{
struct workqueue_struct *wq;
- bool failed = false;
unsigned int cpu;
max_active = clamp_val(max_active, 1, INT_MAX);
@@ -2023,13 +2607,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
INIT_LIST_HEAD(&wq->list);
- cpu_maps_update_begin();
- /*
- * We must initialize cwqs for each possible cpu even if we
- * are going to call destroy_workqueue() finally. Otherwise
- * cpu_up() can hit the uninitialized cwq once we drop the
- * lock.
- */
for_each_possible_cpu(cpu) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
struct global_cwq *gcwq = get_gcwq(cpu);
@@ -2040,14 +2617,25 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
cwq->flush_color = -1;
cwq->max_active = max_active;
INIT_LIST_HEAD(&cwq->delayed_works);
+ }
- if (failed)
- continue;
- cwq->worker = create_worker(gcwq, cpu_online(cpu));
- if (cwq->worker)
- start_worker(cwq->worker);
- else
- failed = true;
+ if (flags & WQ_RESCUER) {
+ struct worker *rescuer;
+
+ if (!alloc_cpumask_var(&wq->mayday_mask, GFP_KERNEL))
+ goto err;
+
+ wq->rescuer = rescuer = alloc_worker();
+ if (!rescuer)
+ goto err;
+
+ rescuer->task = kthread_create(rescuer_thread, wq, "%s", name);
+ if (IS_ERR(rescuer->task))
+ goto err;
+
+ wq->rescuer = rescuer;
+ rescuer->task->flags |= PF_THREAD_BOUND;
+ wake_up_process(rescuer->task);
}
/*
@@ -2065,16 +2653,12 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
spin_unlock(&workqueue_lock);
- cpu_maps_update_done();
-
- if (failed) {
- destroy_workqueue(wq);
- wq = NULL;
- }
return wq;
err:
if (wq) {
free_cwqs(wq->cpu_wq);
+ free_cpumask_var(wq->mayday_mask);
+ kfree(wq->rescuer);
kfree(wq);
}
return NULL;
@@ -2097,42 +2681,26 @@ void destroy_workqueue(struct workqueue_struct *wq)
* wq list is used to freeze wq, remove from list after
* flushing is complete in case freeze races us.
*/
- cpu_maps_update_begin();
spin_lock(&workqueue_lock);
list_del(&wq->list);
spin_unlock(&workqueue_lock);
- cpu_maps_update_done();
+ /* sanity check */
for_each_possible_cpu(cpu) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
- struct global_cwq *gcwq = cwq->gcwq;
int i;
- if (cwq->worker) {
- retry:
- spin_lock_irq(&gcwq->lock);
- /*
- * Worker can only be destroyed while idle.
- * Wait till it becomes idle. This is ugly
- * and prone to starvation. It will go away
- * once dynamic worker pool is implemented.
- */
- if (!(cwq->worker->flags & WORKER_IDLE)) {
- spin_unlock_irq(&gcwq->lock);
- msleep(100);
- goto retry;
- }
- destroy_worker(cwq->worker);
- cwq->worker = NULL;
- spin_unlock_irq(&gcwq->lock);
- }
-
for (i = 0; i < WORK_NR_COLORS; i++)
BUG_ON(cwq->nr_in_flight[i]);
BUG_ON(cwq->nr_active);
BUG_ON(!list_empty(&cwq->delayed_works));
}
+ if (wq->flags & WQ_RESCUER) {
+ kthread_stop(wq->rescuer->task);
+ free_cpumask_var(wq->mayday_mask);
+ }
+
free_cwqs(wq->cpu_wq);
kfree(wq);
}
@@ -2141,10 +2709,18 @@ EXPORT_SYMBOL_GPL(destroy_workqueue);
/*
* CPU hotplug.
*
- * CPU hotplug is implemented by allowing cwqs to be detached from
- * CPU, running with unbound workers and allowing them to be
- * reattached later if the cpu comes back online. A separate thread
- * is created to govern cwqs in such state and is called the trustee.
+ * There are two challenges in supporting CPU hotplug. Firstly, there
+ * are a lot of assumptions on strong associations among work, cwq and
+ * gcwq which make migrating pending and scheduled works very
+ * difficult to implement without impacting hot paths. Secondly,
+ * gcwqs serve mix of short, long and very long running works making
+ * blocked draining impractical.
+ *
+ * This is solved by allowing a gcwq to be detached from CPU, running
+ * it with unbound (rogue) workers and allowing it to be reattached
+ * later if the cpu comes back online. A separate thread is created
+ * to govern a gcwq in such state and is called the trustee of the
+ * gcwq.
*
* Trustee states and their descriptions.
*
@@ -2152,11 +2728,12 @@ EXPORT_SYMBOL_GPL(destroy_workqueue);
* new trustee is started with this state.
*
* IN_CHARGE Once started, trustee will enter this state after
- * making all existing workers rogue. DOWN_PREPARE waits
- * for trustee to enter this state. After reaching
- * IN_CHARGE, trustee tries to execute the pending
- * worklist until it's empty and the state is set to
- * BUTCHER, or the state is set to RELEASE.
+ * assuming the manager role and making all existing
+ * workers rogue. DOWN_PREPARE waits for trustee to
+ * enter this state. After reaching IN_CHARGE, trustee
+ * tries to execute the pending worklist until it's empty
+ * and the state is set to BUTCHER, or the state is set
+ * to RELEASE.
*
* BUTCHER Command state which is set by the cpu callback after
* the cpu has went down. Once this state is set trustee
@@ -2167,7 +2744,9 @@ EXPORT_SYMBOL_GPL(destroy_workqueue);
* RELEASE Command state which is set by the cpu callback if the
* cpu down has been canceled or it has come online
* again. After recognizing this state, trustee stops
- * trying to drain or butcher and transits to DONE.
+ * trying to drain or butcher and clears ROGUE, rebinds
+ * all remaining workers back to the cpu and releases
+ * manager role.
*
* DONE Trustee will enter this state after BUTCHER or RELEASE
* is complete.
@@ -2233,17 +2812,24 @@ static int __cpuinit trustee_thread(void *__gcwq)
{
struct global_cwq *gcwq = __gcwq;
struct worker *worker;
+ struct work_struct *work;
struct hlist_node *pos;
+ long rc;
int i;
BUG_ON(gcwq->cpu != smp_processor_id());
spin_lock_irq(&gcwq->lock);
/*
- * Make all workers rogue. Trustee must be bound to the
- * target cpu and can't be cancelled.
+ * Claim the manager position and make all workers rogue.
+ * Trustee must be bound to the target cpu and can't be
+ * cancelled.
*/
BUG_ON(gcwq->cpu != smp_processor_id());
+ rc = trustee_wait_event(!(gcwq->flags & GCWQ_MANAGING_WORKERS));
+ BUG_ON(rc < 0);
+
+ gcwq->flags |= GCWQ_MANAGING_WORKERS;
list_for_each_entry(worker, &gcwq->idle_list, entry)
worker_set_flags(worker, WORKER_ROGUE, false);
@@ -2252,6 +2838,28 @@ static int __cpuinit trustee_thread(void *__gcwq)
worker_set_flags(worker, WORKER_ROGUE, false);
/*
+ * Call schedule() so that we cross rq->lock and thus can
+ * guarantee sched callbacks see the rogue flag. This is
+ * necessary as scheduler callbacks may be invoked from other
+ * cpus.
+ */
+ spin_unlock_irq(&gcwq->lock);
+ schedule();
+ spin_lock_irq(&gcwq->lock);
+
+ /*
+ * Sched callbacks are disabled now. gcwq->nr_running should
+ * be zero and will stay that way, making need_more_worker()
+ * and keep_working() always return true as long as the
+ * worklist is not empty.
+ */
+ WARN_ON_ONCE(atomic_read(get_gcwq_nr_running(gcwq->cpu)) != 0);
+
+ spin_unlock_irq(&gcwq->lock);
+ del_timer_sync(&gcwq->idle_timer);
+ spin_lock_irq(&gcwq->lock);
+
+ /*
* We're now in charge. Notify and proceed to drain. We need
* to keep the gcwq running during the whole CPU down
* procedure as other cpu hotunplug callbacks may need to
@@ -2263,18 +2871,90 @@ static int __cpuinit trustee_thread(void *__gcwq)
/*
* The original cpu is in the process of dying and may go away
* anytime now. When that happens, we and all workers would
- * be migrated to other cpus. Try draining any left work.
- * Note that if the gcwq is frozen, there may be frozen works
- * in freezeable cwqs. Don't declare completion while frozen.
+ * be migrated to other cpus. Try draining any left work. We
+ * want to get it over with ASAP - spam rescuers, wake up as
+ * many idlers as necessary and create new ones till the
+ * worklist is empty. Note that if the gcwq is frozen, there
+ * may be frozen works in freezeable cwqs. Don't declare
+ * completion while frozen.
*/
while (gcwq->nr_workers != gcwq->nr_idle ||
gcwq->flags & GCWQ_FREEZING ||
gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
+ int nr_works = 0;
+
+ list_for_each_entry(work, &gcwq->worklist, entry) {
+ send_mayday(work);
+ nr_works++;
+ }
+
+ list_for_each_entry(worker, &gcwq->idle_list, entry) {
+ if (!nr_works--)
+ break;
+ wake_up_process(worker->task);
+ }
+
+ if (need_to_create_worker(gcwq)) {
+ spin_unlock_irq(&gcwq->lock);
+ worker = create_worker(gcwq, false);
+ spin_lock_irq(&gcwq->lock);
+ if (worker) {
+ worker_set_flags(worker, WORKER_ROGUE, false);
+ start_worker(worker);
+ }
+ }
+
/* give a breather */
if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
break;
}
+ /*
+ * Either all works have been scheduled and cpu is down, or
+ * cpu down has already been canceled. Wait for and butcher
+ * all workers till we're canceled.
+ */
+ do {
+ rc = trustee_wait_event(!list_empty(&gcwq->idle_list));
+ while (!list_empty(&gcwq->idle_list))
+ destroy_worker(list_first_entry(&gcwq->idle_list,
+ struct worker, entry));
+ } while (gcwq->nr_workers && rc >= 0);
+
+ /*
+ * At this point, either draining has completed and no worker
+ * is left, or cpu down has been canceled or the cpu is being
+ * brought back up. There shouldn't be any idle one left.
+ * Tell the remaining busy ones to rebind once it finishes the
+ * currently scheduled works by scheduling the rebind_work.
+ */
+ WARN_ON(!list_empty(&gcwq->idle_list));
+
+ for_each_busy_worker(worker, i, pos, gcwq) {
+ struct work_struct *rebind_work = &worker->rebind_work;
+
+ /*
+ * Rebind_work may race with future cpu hotplug
+ * operations. Use a separate flag to mark that
+ * rebinding is scheduled.
+ */
+ worker_set_flags(worker, WORKER_REBIND, false);
+ worker_clr_flags(worker, WORKER_ROGUE);
+
+ /* queue rebind_work, wq doesn't matter, use the default one */
+ if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
+ work_data_bits(rebind_work)))
+ continue;
+
+ debug_work_activate(rebind_work);
+ insert_work(get_cwq(gcwq->cpu, keventd_wq), rebind_work,
+ worker->scheduled.next,
+ work_color_to_flags(WORK_NO_COLOR));
+ }
+
+ /* relinquish manager role */
+ gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
+
/* notify completion */
gcwq->trustee = NULL;
gcwq->trustee_state = TRUSTEE_DONE;
@@ -2313,10 +2993,8 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
unsigned int cpu = (unsigned long)hcpu;
struct global_cwq *gcwq = get_gcwq(cpu);
struct task_struct *new_trustee = NULL;
- struct worker *worker;
- struct hlist_node *pos;
+ struct worker *uninitialized_var(new_worker);
unsigned long flags;
- int i;
action &= ~CPU_TASKS_FROZEN;
@@ -2327,6 +3005,15 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
if (IS_ERR(new_trustee))
return notifier_from_errno(PTR_ERR(new_trustee));
kthread_bind(new_trustee, cpu);
+ /* fall through */
+ case CPU_UP_PREPARE:
+ BUG_ON(gcwq->first_idle);
+ new_worker = create_worker(gcwq, false);
+ if (!new_worker) {
+ if (new_trustee)
+ kthread_stop(new_trustee);
+ return NOTIFY_BAD;
+ }
}
/* some are called w/ irq disabled, don't disturb irq status */
@@ -2340,26 +3027,50 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
gcwq->trustee_state = TRUSTEE_START;
wake_up_process(gcwq->trustee);
wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
+ /* fall through */
+ case CPU_UP_PREPARE:
+ BUG_ON(gcwq->first_idle);
+ gcwq->first_idle = new_worker;
+ break;
+
+ case CPU_DYING:
+ /*
+ * Before this, the trustee and all workers except for
+ * the ones which are still executing works from
+ * before the last CPU down must be on the cpu. After
+ * this, they'll all be diasporas.
+ */
+ gcwq->flags |= GCWQ_DISASSOCIATED;
break;
case CPU_POST_DEAD:
gcwq->trustee_state = TRUSTEE_BUTCHER;
+ /* fall through */
+ case CPU_UP_CANCELED:
+ destroy_worker(gcwq->first_idle);
+ gcwq->first_idle = NULL;
break;
case CPU_DOWN_FAILED:
case CPU_ONLINE:
+ gcwq->flags &= ~GCWQ_DISASSOCIATED;
if (gcwq->trustee_state != TRUSTEE_DONE) {
gcwq->trustee_state = TRUSTEE_RELEASE;
wake_up_process(gcwq->trustee);
wait_trustee_state(gcwq, TRUSTEE_DONE);
}
- /* clear ROGUE from all workers */
- list_for_each_entry(worker, &gcwq->idle_list, entry)
- worker_clr_flags(worker, WORKER_ROGUE);
-
- for_each_busy_worker(worker, i, pos, gcwq)
- worker_clr_flags(worker, WORKER_ROGUE);
+ /*
+ * Trustee is done and there might be no worker left.
+ * Put the first_idle in and request a real manager to
+ * take a look.
+ */
+ spin_unlock_irq(&gcwq->lock);
+ kthread_bind(gcwq->first_idle->task, cpu);
+ spin_lock_irq(&gcwq->lock);
+ gcwq->flags |= GCWQ_MANAGE_WORKERS;
+ start_worker(gcwq->first_idle);
+ gcwq->first_idle = NULL;
break;
}
@@ -2548,10 +3259,10 @@ void thaw_workqueues(void)
if (wq->single_cpu == gcwq->cpu &&
!cwq->nr_active && list_empty(&cwq->delayed_works))
cwq_unbind_single_cpu(cwq);
-
- wake_up_process(cwq->worker->task);
}
+ wake_up_worker(gcwq);
+
spin_unlock_irq(&gcwq->lock);
}
@@ -2588,12 +3299,31 @@ void __init init_workqueues(void)
for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
+ init_timer_deferrable(&gcwq->idle_timer);
+ gcwq->idle_timer.function = idle_worker_timeout;
+ gcwq->idle_timer.data = (unsigned long)gcwq;
+
+ setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,
+ (unsigned long)gcwq);
+
ida_init(&gcwq->worker_ida);
gcwq->trustee_state = TRUSTEE_DONE;
init_waitqueue_head(&gcwq->trustee_wait);
}
+ /* create the initial worker */
+ for_each_online_cpu(cpu) {
+ struct global_cwq *gcwq = get_gcwq(cpu);
+ struct worker *worker;
+
+ worker = create_worker(gcwq, true);
+ BUG_ON(!worker);
+ spin_lock_irq(&gcwq->lock);
+ start_worker(worker);
+ spin_unlock_irq(&gcwq->lock);
+ }
+
keventd_wq = create_workqueue("events");
BUG_ON(!keventd_wq);
}