diff options
| author | Rusty Russell <rusty@rustcorp.com.au> | 2004-03-18 16:03:54 -0800 |
|---|---|---|
| committer | Linus Torvalds <torvalds@ppc970.osdl.org> | 2004-03-18 16:03:54 -0800 |
| commit | 6bcaa29dc02edc04b6f2baa4c85ace1c2c69871a (patch) | |
| tree | 0e2fecc2c867f889537449f73129b6615b9a5e27 /kernel/workqueue.c | |
| parent | 010b27dc120ac4f57e22998854ef4373c174b171 (diff) | |
[PATCH] Hotplug CPUs: Workqueue Changes
Workqueues need to bring up/destroy the per-cpu thread on cpu up/down.
1) Add a global list of workqueues, and keep the name in the structure
(to name the newly created thread).
2) Remove BUG_ON in run_workqueue, since thread is dragged off CPU when
it goes down.
3) Lock out cpu up/down in flush_workqueue, create_workqueue and
destroy_workqueue.
4) Add notifier to add/destroy workqueue threads, and take over work.
Diffstat (limited to 'kernel/workqueue.c')
| -rw-r--r-- | kernel/workqueue.c | 115 |
1 files changed, 106 insertions, 9 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c index 9cbfa8e69ac6..2046c6d763fe 100644 --- a/kernel/workqueue.c +++ b/kernel/workqueue.c @@ -22,6 +22,8 @@ #include <linux/completion.h> #include <linux/workqueue.h> #include <linux/slab.h> +#include <linux/cpu.h> +#include <linux/notifier.h> #include <linux/kthread.h> /* @@ -56,8 +58,22 @@ struct cpu_workqueue_struct { */ struct workqueue_struct { struct cpu_workqueue_struct cpu_wq[NR_CPUS]; + const char *name; + struct list_head list; }; +#ifdef CONFIG_HOTPLUG_CPU +/* All the workqueues on the system, for hotplug cpu to add/remove + threads to each one as cpus come/go. Protected by cpucontrol + sem. */ +static LIST_HEAD(workqueues); +#define add_workqueue(wq) list_add(&(wq)->list, &workqueues) +#define del_workqueue(wq) list_del(&(wq)->list) +#else +#define add_workqueue(wq) +#define del_workqueue(wq) +#endif /* CONFIG_HOTPLUG_CPU */ + /* Preempt must be disabled. */ static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work) @@ -161,7 +177,6 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq) static int worker_thread(void *__cwq) { struct cpu_workqueue_struct *cwq = __cwq; - int cpu = cwq - cwq->wq->cpu_wq; DECLARE_WAITQUEUE(wait, current); struct k_sigaction sa; sigset_t blocked; @@ -169,7 +184,6 @@ static int worker_thread(void *__cwq) current->flags |= PF_IOTHREAD; set_user_nice(current, -10); - BUG_ON(smp_processor_id() != cpu); /* Block and flush all signals */ sigfillset(&blocked); @@ -219,6 +233,7 @@ void fastcall flush_workqueue(struct workqueue_struct *wq) might_sleep(); + lock_cpu_hotplug(); for (cpu = 0; cpu < NR_CPUS; cpu++) { DEFINE_WAIT(wait); long sequence_needed; @@ -248,11 +263,10 @@ void fastcall flush_workqueue(struct workqueue_struct *wq) finish_wait(&cwq->work_done, &wait); spin_unlock_irq(&cwq->lock); } + unlock_cpu_hotplug(); } -static int create_workqueue_thread(struct workqueue_struct *wq, - const char *name, - int cpu) +static int create_workqueue_thread(struct workqueue_struct *wq, int cpu) { struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu; struct task_struct *p; @@ -266,7 +280,7 @@ static int create_workqueue_thread(struct workqueue_struct *wq, init_waitqueue_head(&cwq->more_work); init_waitqueue_head(&cwq->work_done); - p = kthread_create(worker_thread, cwq, "%s/%d", name, cpu); + p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu); if (IS_ERR(p)) return PTR_ERR(p); cwq->thread = p; @@ -286,14 +300,19 @@ struct workqueue_struct *create_workqueue(const char *name) return NULL; memset(wq, 0, sizeof(*wq)); + wq->name = name; + /* We don't need the distraction of CPUs appearing and vanishing. */ + lock_cpu_hotplug(); for (cpu = 0; cpu < NR_CPUS; cpu++) { if (!cpu_online(cpu)) continue; - if (create_workqueue_thread(wq, name, cpu) < 0) + if (create_workqueue_thread(wq, cpu) < 0) destroy = 1; else wake_up_process(wq->cpu_wq[cpu].thread); } + add_workqueue(wq); + /* * Was there any error during startup? If yes then clean up: */ @@ -301,16 +320,23 @@ struct workqueue_struct *create_workqueue(const char *name) destroy_workqueue(wq); wq = NULL; } + unlock_cpu_hotplug(); return wq; } static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu) { struct cpu_workqueue_struct *cwq; + unsigned long flags; + struct task_struct *p; cwq = wq->cpu_wq + cpu; - if (cwq->thread) - kthread_stop(cwq->thread); + spin_lock_irqsave(&cwq->lock, flags); + p = cwq->thread; + cwq->thread = NULL; + spin_unlock_irqrestore(&cwq->lock, flags); + if (p) + kthread_stop(p); } void destroy_workqueue(struct workqueue_struct *wq) @@ -319,10 +345,14 @@ void destroy_workqueue(struct workqueue_struct *wq) flush_workqueue(wq); + /* We don't need the distraction of CPUs appearing and vanishing. */ + lock_cpu_hotplug(); for (cpu = 0; cpu < NR_CPUS; cpu++) { if (cpu_online(cpu)) cleanup_workqueue_thread(wq, cpu); } + del_workqueue(wq); + unlock_cpu_hotplug(); kfree(wq); } @@ -364,8 +394,75 @@ int current_is_keventd(void) } +#ifdef CONFIG_HOTPLUG_CPU +/* Take the work from this (downed) CPU. */ +static void take_over_work(struct workqueue_struct *wq, unsigned int cpu) +{ + struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu; + LIST_HEAD(list); + struct work_struct *work; + + spin_lock_irq(&cwq->lock); + list_splice_init(&cwq->worklist, &list); + + while (!list_empty(&list)) { + printk("Taking work for %s\n", wq->name); + work = list_entry(list.next,struct work_struct,entry); + list_del(&work->entry); + __queue_work(wq->cpu_wq + smp_processor_id(), work); + } + spin_unlock_irq(&cwq->lock); +} + +/* We're holding the cpucontrol mutex here */ +static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, + unsigned long action, + void *hcpu) +{ + unsigned int hotcpu = (unsigned long)hcpu; + struct workqueue_struct *wq; + + switch (action) { + case CPU_UP_PREPARE: + /* Create a new workqueue thread for it. */ + list_for_each_entry(wq, &workqueues, list) { + if (create_workqueue_thread(wq, hotcpu) < 0) { + printk("workqueue for %i failed\n", hotcpu); + return NOTIFY_BAD; + } + } + break; + + case CPU_ONLINE: + /* Kick off worker threads. */ + list_for_each_entry(wq, &workqueues, list) + wake_up_process(wq->cpu_wq[hotcpu].thread); + break; + + case CPU_UP_CANCELED: + list_for_each_entry(wq, &workqueues, list) { + /* Unbind so it can run. */ + kthread_bind(wq->cpu_wq[hotcpu].thread, + smp_processor_id()); + cleanup_workqueue_thread(wq, hotcpu); + } + break; + + case CPU_DEAD: + list_for_each_entry(wq, &workqueues, list) + cleanup_workqueue_thread(wq, hotcpu); + list_for_each_entry(wq, &workqueues, list) + take_over_work(wq, hotcpu); + break; + } + + return NOTIFY_OK; +} +#endif + void init_workqueues(void) { + hotcpu_notifier(workqueue_cpu_callback, 0); keventd_wq = create_workqueue("events"); BUG_ON(!keventd_wq); } |
