summaryrefslogtreecommitdiff
path: root/kernel/workqueue.c
diff options
context:
space:
mode:
authorRusty Russell <rusty@rustcorp.com.au>2004-03-18 16:03:54 -0800
committerLinus Torvalds <torvalds@ppc970.osdl.org>2004-03-18 16:03:54 -0800
commit6bcaa29dc02edc04b6f2baa4c85ace1c2c69871a (patch)
tree0e2fecc2c867f889537449f73129b6615b9a5e27 /kernel/workqueue.c
parent010b27dc120ac4f57e22998854ef4373c174b171 (diff)
[PATCH] Hotplug CPUs: Workqueue Changes
Workqueues need to bring up/destroy the per-cpu thread on cpu up/down. 1) Add a global list of workqueues, and keep the name in the structure (to name the newly created thread). 2) Remove BUG_ON in run_workqueue, since thread is dragged off CPU when it goes down. 3) Lock out cpu up/down in flush_workqueue, create_workqueue and destroy_workqueue. 4) Add notifier to add/destroy workqueue threads, and take over work.
Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r--kernel/workqueue.c115
1 files changed, 106 insertions, 9 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 9cbfa8e69ac6..2046c6d763fe 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -22,6 +22,8 @@
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
+#include <linux/cpu.h>
+#include <linux/notifier.h>
#include <linux/kthread.h>
/*
@@ -56,8 +58,22 @@ struct cpu_workqueue_struct {
*/
struct workqueue_struct {
struct cpu_workqueue_struct cpu_wq[NR_CPUS];
+ const char *name;
+ struct list_head list;
};
+#ifdef CONFIG_HOTPLUG_CPU
+/* All the workqueues on the system, for hotplug cpu to add/remove
+ threads to each one as cpus come/go. Protected by cpucontrol
+ sem. */
+static LIST_HEAD(workqueues);
+#define add_workqueue(wq) list_add(&(wq)->list, &workqueues)
+#define del_workqueue(wq) list_del(&(wq)->list)
+#else
+#define add_workqueue(wq)
+#define del_workqueue(wq)
+#endif /* CONFIG_HOTPLUG_CPU */
+
/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
@@ -161,7 +177,6 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
- int cpu = cwq - cwq->wq->cpu_wq;
DECLARE_WAITQUEUE(wait, current);
struct k_sigaction sa;
sigset_t blocked;
@@ -169,7 +184,6 @@ static int worker_thread(void *__cwq)
current->flags |= PF_IOTHREAD;
set_user_nice(current, -10);
- BUG_ON(smp_processor_id() != cpu);
/* Block and flush all signals */
sigfillset(&blocked);
@@ -219,6 +233,7 @@ void fastcall flush_workqueue(struct workqueue_struct *wq)
might_sleep();
+ lock_cpu_hotplug();
for (cpu = 0; cpu < NR_CPUS; cpu++) {
DEFINE_WAIT(wait);
long sequence_needed;
@@ -248,11 +263,10 @@ void fastcall flush_workqueue(struct workqueue_struct *wq)
finish_wait(&cwq->work_done, &wait);
spin_unlock_irq(&cwq->lock);
}
+ unlock_cpu_hotplug();
}
-static int create_workqueue_thread(struct workqueue_struct *wq,
- const char *name,
- int cpu)
+static int create_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
struct task_struct *p;
@@ -266,7 +280,7 @@ static int create_workqueue_thread(struct workqueue_struct *wq,
init_waitqueue_head(&cwq->more_work);
init_waitqueue_head(&cwq->work_done);
- p = kthread_create(worker_thread, cwq, "%s/%d", name, cpu);
+ p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
if (IS_ERR(p))
return PTR_ERR(p);
cwq->thread = p;
@@ -286,14 +300,19 @@ struct workqueue_struct *create_workqueue(const char *name)
return NULL;
memset(wq, 0, sizeof(*wq));
+ wq->name = name;
+ /* We don't need the distraction of CPUs appearing and vanishing. */
+ lock_cpu_hotplug();
for (cpu = 0; cpu < NR_CPUS; cpu++) {
if (!cpu_online(cpu))
continue;
- if (create_workqueue_thread(wq, name, cpu) < 0)
+ if (create_workqueue_thread(wq, cpu) < 0)
destroy = 1;
else
wake_up_process(wq->cpu_wq[cpu].thread);
}
+ add_workqueue(wq);
+
/*
* Was there any error during startup? If yes then clean up:
*/
@@ -301,16 +320,23 @@ struct workqueue_struct *create_workqueue(const char *name)
destroy_workqueue(wq);
wq = NULL;
}
+ unlock_cpu_hotplug();
return wq;
}
static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
struct cpu_workqueue_struct *cwq;
+ unsigned long flags;
+ struct task_struct *p;
cwq = wq->cpu_wq + cpu;
- if (cwq->thread)
- kthread_stop(cwq->thread);
+ spin_lock_irqsave(&cwq->lock, flags);
+ p = cwq->thread;
+ cwq->thread = NULL;
+ spin_unlock_irqrestore(&cwq->lock, flags);
+ if (p)
+ kthread_stop(p);
}
void destroy_workqueue(struct workqueue_struct *wq)
@@ -319,10 +345,14 @@ void destroy_workqueue(struct workqueue_struct *wq)
flush_workqueue(wq);
+ /* We don't need the distraction of CPUs appearing and vanishing. */
+ lock_cpu_hotplug();
for (cpu = 0; cpu < NR_CPUS; cpu++) {
if (cpu_online(cpu))
cleanup_workqueue_thread(wq, cpu);
}
+ del_workqueue(wq);
+ unlock_cpu_hotplug();
kfree(wq);
}
@@ -364,8 +394,75 @@ int current_is_keventd(void)
}
+#ifdef CONFIG_HOTPLUG_CPU
+/* Take the work from this (downed) CPU. */
+static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
+{
+ struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
+ LIST_HEAD(list);
+ struct work_struct *work;
+
+ spin_lock_irq(&cwq->lock);
+ list_splice_init(&cwq->worklist, &list);
+
+ while (!list_empty(&list)) {
+ printk("Taking work for %s\n", wq->name);
+ work = list_entry(list.next,struct work_struct,entry);
+ list_del(&work->entry);
+ __queue_work(wq->cpu_wq + smp_processor_id(), work);
+ }
+ spin_unlock_irq(&cwq->lock);
+}
+
+/* We're holding the cpucontrol mutex here */
+static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
+ unsigned long action,
+ void *hcpu)
+{
+ unsigned int hotcpu = (unsigned long)hcpu;
+ struct workqueue_struct *wq;
+
+ switch (action) {
+ case CPU_UP_PREPARE:
+ /* Create a new workqueue thread for it. */
+ list_for_each_entry(wq, &workqueues, list) {
+ if (create_workqueue_thread(wq, hotcpu) < 0) {
+ printk("workqueue for %i failed\n", hotcpu);
+ return NOTIFY_BAD;
+ }
+ }
+ break;
+
+ case CPU_ONLINE:
+ /* Kick off worker threads. */
+ list_for_each_entry(wq, &workqueues, list)
+ wake_up_process(wq->cpu_wq[hotcpu].thread);
+ break;
+
+ case CPU_UP_CANCELED:
+ list_for_each_entry(wq, &workqueues, list) {
+ /* Unbind so it can run. */
+ kthread_bind(wq->cpu_wq[hotcpu].thread,
+ smp_processor_id());
+ cleanup_workqueue_thread(wq, hotcpu);
+ }
+ break;
+
+ case CPU_DEAD:
+ list_for_each_entry(wq, &workqueues, list)
+ cleanup_workqueue_thread(wq, hotcpu);
+ list_for_each_entry(wq, &workqueues, list)
+ take_over_work(wq, hotcpu);
+ break;
+ }
+
+ return NOTIFY_OK;
+}
+#endif
+
void init_workqueues(void)
{
+ hotcpu_notifier(workqueue_cpu_callback, 0);
keventd_wq = create_workqueue("events");
BUG_ON(!keventd_wq);
}