Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r--    kernel/workqueue.c    85
1 file changed, 44 insertions, 41 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 5e9a520cfd77..968837218657 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -27,13 +27,21 @@
 #include <linux/slab.h>
 
 /*
- * The per-CPU workqueue:
+ * The per-CPU workqueue.
+ *
+ * The sequence counters are for flush_scheduled_work(). It wants to wait
+ * until until all currently-scheduled works are completed, but it doesn't
+ * want to be livelocked by new, incoming ones. So it waits until
+ * remove_sequence is >= the insert_sequence which pertained when
+ * flush_scheduled_work() was called.
  */
 struct cpu_workqueue_struct {
 
         spinlock_t lock;
 
-        atomic_t nr_queued;
+        long remove_sequence;        /* Least-recently added (next to run) */
+        long insert_sequence;        /* Next to add */
+
         struct list_head worklist;
         wait_queue_head_t more_work;
         wait_queue_head_t work_done;
@@ -71,10 +79,9 @@ int queue_work(struct workqueue_struct *wq, struct work_struct *work)
 
                 spin_lock_irqsave(&cwq->lock, flags);
                 list_add_tail(&work->entry, &cwq->worklist);
-                atomic_inc(&cwq->nr_queued);
-                spin_unlock_irqrestore(&cwq->lock, flags);
-
+                cwq->insert_sequence++;
                 wake_up(&cwq->more_work);
+                spin_unlock_irqrestore(&cwq->lock, flags);
                 ret = 1;
         }
         put_cpu();
@@ -93,11 +100,13 @@ static void delayed_work_timer_fn(unsigned long __data)
          */
         spin_lock_irqsave(&cwq->lock, flags);
         list_add_tail(&work->entry, &cwq->worklist);
+        cwq->insert_sequence++;
         wake_up(&cwq->more_work);
         spin_unlock_irqrestore(&cwq->lock, flags);
 }
 
-int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay)
+int queue_delayed_work(struct workqueue_struct *wq,
+                        struct work_struct *work, unsigned long delay)
 {
         int ret = 0, cpu = get_cpu();
         struct timer_list *timer = &work->timer;
@@ -107,18 +116,11 @@ int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, un
                 BUG_ON(timer_pending(timer));
                 BUG_ON(!list_empty(&work->entry));
 
-                /*
-                 * Increase nr_queued so that the flush function
-                 * knows that there's something pending.
-                 */
-                atomic_inc(&cwq->nr_queued);
                 work->wq_data = cwq;
-
                 timer->expires = jiffies + delay;
                 timer->data = (unsigned long)work;
                 timer->function = delayed_work_timer_fn;
                 add_timer(timer);
-
                 ret = 1;
         }
         put_cpu();
@@ -135,7 +137,8 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
          */
         spin_lock_irqsave(&cwq->lock, flags);
         while (!list_empty(&cwq->worklist)) {
-                struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct, entry);
+                struct work_struct *work = list_entry(cwq->worklist.next,
+                                                struct work_struct, entry);
                 void (*f) (void *) = work->func;
                 void *data = work->data;
 
@@ -146,14 +149,9 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
                 clear_bit(0, &work->pending);
                 f(data);
 
-                /*
-                 * We only wake up 'work done' waiters (flush) when
-                 * the last function has been fully processed.
-                 */
-                if (atomic_dec_and_test(&cwq->nr_queued))
-                        wake_up(&cwq->work_done);
-
                 spin_lock_irqsave(&cwq->lock, flags);
+                cwq->remove_sequence++;
+                wake_up(&cwq->work_done);
         }
         spin_unlock_irqrestore(&cwq->lock, flags);
 }
@@ -223,37 +221,41 @@ static int worker_thread(void *__startup)
  * Forces execution of the workqueue and blocks until its completion.
  * This is typically used in driver shutdown handlers.
  *
- * NOTE: if work is being added to the queue constantly by some other
- * context then this function might block indefinitely.
+ * This function will sample each workqueue's current insert_sequence number and
+ * will sleep until the head sequence is greater than or equal to that. This
+ * means that we sleep until all works which were queued on entry have been
+ * handled, but we are not livelocked by new incoming ones.
+ *
+ * This function used to run the workqueues itself. Now we just wait for the
+ * helper threads to do it.
  */
 void flush_workqueue(struct workqueue_struct *wq)
 {
         struct cpu_workqueue_struct *cwq;
         int cpu;
 
+        might_sleep();
+
         for (cpu = 0; cpu < NR_CPUS; cpu++) {
+                DEFINE_WAIT(wait);
+                long sequence_needed;
+
                 if (!cpu_online(cpu))
                         continue;
                 cwq = wq->cpu_wq + cpu;
 
-                if (atomic_read(&cwq->nr_queued)) {
-                        DECLARE_WAITQUEUE(wait, current);
-
-                        if (!list_empty(&cwq->worklist))
-                                run_workqueue(cwq);
-
-                        /*
-                         * Wait for helper thread(s) to finish up
-                         * the queue:
-                         */
-                        set_task_state(current, TASK_INTERRUPTIBLE);
-                        add_wait_queue(&cwq->work_done, &wait);
-                        if (atomic_read(&cwq->nr_queued))
-                                schedule();
-                        else
-                                set_task_state(current, TASK_RUNNING);
-                        remove_wait_queue(&cwq->work_done, &wait);
+                spin_lock_irq(&cwq->lock);
+                sequence_needed = cwq->insert_sequence;
+
+                while (sequence_needed - cwq->remove_sequence > 0) {
+                        prepare_to_wait(&cwq->work_done, &wait,
+                                        TASK_UNINTERRUPTIBLE);
+                        spin_unlock_irq(&cwq->lock);
+                        schedule();
+                        spin_lock_irq(&cwq->lock);
                 }
+                finish_wait(&cwq->work_done, &wait);
+                spin_unlock_irq(&cwq->lock);
         }
 }
 
@@ -279,7 +281,8 @@ struct workqueue_struct *create_workqueue(const char *name)
                 spin_lock_init(&cwq->lock);
                 cwq->wq = wq;
                 cwq->thread = NULL;
-                atomic_set(&cwq->nr_queued, 0);
+                cwq->insert_sequence = 0;
+                cwq->remove_sequence = 0;
                 INIT_LIST_HEAD(&cwq->worklist);
                 init_waitqueue_head(&cwq->more_work);
                 init_waitqueue_head(&cwq->work_done);
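The heart of the patch is the insert_sequence/remove_sequence pair: flush_workqueue() samples insert_sequence once, then sleeps until remove_sequence catches up, so work queued after the flush started can no longer livelock it. Below is a minimal, self-contained userspace sketch of that same pattern, for illustration only: the names (toy_queue, toy_queue_item, toy_complete_one, toy_flush) are invented, and the kernel's cwq->lock / work_done spinlock-and-waitqueue pair is approximated with a pthread mutex and condition variable.

/* Hypothetical userspace model of the sequence-counter flush above. */
#include <pthread.h>

struct toy_queue {
        pthread_mutex_t lock;           /* stands in for cwq->lock */
        pthread_cond_t work_done;       /* stands in for cwq->work_done */
        long insert_sequence;           /* next to add */
        long remove_sequence;           /* least-recently added (next to run) */
};

static struct toy_queue tq = {
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .work_done = PTHREAD_COND_INITIALIZER,
};

/* Producer side: account for one newly queued work item. */
static void toy_queue_item(struct toy_queue *q)
{
        pthread_mutex_lock(&q->lock);
        q->insert_sequence++;
        pthread_mutex_unlock(&q->lock);
}

/* Worker side: called after one work item has finished running. */
static void toy_complete_one(struct toy_queue *q)
{
        pthread_mutex_lock(&q->lock);
        q->remove_sequence++;
        pthread_cond_broadcast(&q->work_done);  /* wake any flushers */
        pthread_mutex_unlock(&q->lock);
}

/*
 * Wait only for work queued before this call.  Items queued while we sleep
 * bump insert_sequence, but we compare against the value sampled on entry,
 * so new work cannot livelock the flush.  The subtraction mirrors the
 * kernel's wrap-safe "sequence_needed - remove_sequence > 0" test.
 */
static void toy_flush(struct toy_queue *q)
{
        long sequence_needed;

        pthread_mutex_lock(&q->lock);
        sequence_needed = q->insert_sequence;
        while (sequence_needed - q->remove_sequence > 0)
                pthread_cond_wait(&q->work_done, &q->lock);
        pthread_mutex_unlock(&q->lock);
}

The condition variable plays roughly the role prepare_to_wait()/finish_wait() plays in the patch: the counters are re-checked under the lock each time the sleeper wakes, so a spurious wakeup simply loops.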
