| author | Andrew Morton <akpm@digeo.com> | 2003-04-14 06:09:45 -0700 |
|---|---|---|
| committer | Linus Torvalds <torvalds@home.transmeta.com> | 2003-04-14 06:09:45 -0700 |
| commit | 92b817f8d42ef76fded6294554c3bd7fc94b955c | |
| tree | 6245c75ffde6e5a6622bcb683b7e16b83d2dcc28 /kernel | |
| parent | ec7708a234f332be15443b6fe953f55441b22814 | |
[PATCH] flush_workqueue() fixes
The workqueue code currently has a notion of a per-cpu queue being "busy",
and flush_scheduled_work()'s responsibility is to wait until a queue is no
longer busy. The problem is that flush_scheduled_work() can easily hang:
- The workqueue is deemed "busy" while there are pending delayed
(timer-based) works. But if someone repeatedly schedules new delayed work
from the callback, the queue never falls idle, and flush_scheduled_work()
never terminates (see the sketch after this list).
- If someone reschedules work (not delayed work) from within the work
function, that too keeps the queue from ever going idle, and
flush_scheduled_work() never terminates.
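For example, a driver whose work function re-arms its own delayed work keeps
the queue permanently "busy" under the current scheme. A minimal sketch, with
my_poll_work, my_poll_fn and my_init invented for illustration
(schedule_delayed_work() is the keventd wrapper around queue_delayed_work()):

#include <linux/init.h>
#include <linux/workqueue.h>

static struct work_struct my_poll_work;

static void my_poll_fn(void *data)
{
        /* ... periodic housekeeping ... */

        /*
         * Re-arm ourselves.  The queue never falls idle, so the current
         * flush_scheduled_work() waits forever.
         */
        schedule_delayed_work(&my_poll_work, HZ);
}

static int __init my_init(void)
{
        INIT_WORK(&my_poll_work, my_poll_fn, NULL);
        schedule_delayed_work(&my_poll_work, HZ);
        return 0;
}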
So what this patch does is:
- Create a new cancel_delayed_work() which tries to kill off any pending
timer-based delayed work.
- Change flush_scheduled_work() so that it is immune to callers re-adding
work from within the work callback.
We can do this by recognising that the caller does *not* want to wait
until the workqueue is "empty". The caller merely wants to wait until all
works which were pending at the time flush_scheduled_work() was called have
completed.
The patch uses a pair of sequence numbers for that: each per-cpu queue counts
works in via insert_sequence and works out via remove_sequence, and the flush
waits for remove_sequence to catch up with the insert_sequence value which it
sampled on entry.
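In outline, the waiting side looks like this (lifted from the
flush_workqueue() hunk in the diff below; cwq is the per-cpu queue being
flushed):

DEFINE_WAIT(wait);
long sequence_needed;

spin_lock_irq(&cwq->lock);
sequence_needed = cwq->insert_sequence;        /* sample the target */

/*
 * Sleep until every work which was queued on entry has run.  Works
 * queued after this point bump insert_sequence but not sequence_needed,
 * so they cannot livelock us.
 */
while (sequence_needed - cwq->remove_sequence > 0) {
        prepare_to_wait(&cwq->work_done, &wait, TASK_UNINTERRUPTIBLE);
        spin_unlock_irq(&cwq->lock);
        schedule();
        spin_lock_irq(&cwq->lock);
}
finish_wait(&cwq->work_done, &wait);
spin_unlock_irq(&cwq->lock);

The signed subtraction (rather than a direct >= comparison) keeps the test
correct even if the counters wrap.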
So now, if someone wants to reliably remove delayed work, they should do:
/*
 * Make sure that my work callback will no longer schedule new work.
 */
my_driver_is_shutting_down = 1;

/*
 * Kill off any pending delayed work.
 */
cancel_delayed_work(&my_work);

/*
 * OK, no new work will be scheduled from here on.  But one work may be
 * currently queued or in progress, so wait for that to complete.
 */
flush_scheduled_work();
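The definition of cancel_delayed_work() lives on the header side of the patch,
which the kernel/-limited diff below does not show. Conceptually it amounts to
deactivating the timer which would have queued the work; a sketch (not the
verbatim header change) might be:

/*
 * Sketch only: kill the pending timer, if any.  If the timer has already
 * fired, the work may be queued or running; the flush_scheduled_work()
 * call above covers that window.
 */
static inline int cancel_delayed_work(struct work_struct *work)
{
        return del_timer_sync(&work->timer);
}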
The patch also changes the flush_workqueue() sleep to be uninterruptible;
we cannot legally bail out when a signal is delivered anyway.
Diffstat (limited to 'kernel')
| -rw-r--r-- | kernel/workqueue.c | 85 |
1 file changed, 44 insertions(+), 41 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 5e9a520cfd77..968837218657 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -27,13 +27,21 @@
 #include <linux/slab.h>
 
 /*
- * The per-CPU workqueue:
+ * The per-CPU workqueue.
+ *
+ * The sequence counters are for flush_scheduled_work().  It wants to wait
+ * until until all currently-scheduled works are completed, but it doesn't
+ * want to be livelocked by new, incoming ones.  So it waits until
+ * remove_sequence is >= the insert_sequence which pertained when
+ * flush_scheduled_work() was called.
  */
 struct cpu_workqueue_struct {
         spinlock_t lock;
 
-        atomic_t nr_queued;
+        long remove_sequence;        /* Least-recently added (next to run) */
+        long insert_sequence;        /* Next to add */
+
         struct list_head worklist;
         wait_queue_head_t more_work;
         wait_queue_head_t work_done;
@@ -71,10 +79,9 @@ int queue_work(struct workqueue_struct *wq, struct work_struct *work)
                 spin_lock_irqsave(&cwq->lock, flags);
                 list_add_tail(&work->entry, &cwq->worklist);
-                atomic_inc(&cwq->nr_queued);
-                spin_unlock_irqrestore(&cwq->lock, flags);
-
+                cwq->insert_sequence++;
                 wake_up(&cwq->more_work);
+                spin_unlock_irqrestore(&cwq->lock, flags);
                 ret = 1;
         }
         put_cpu();
@@ -93,11 +100,13 @@ static void delayed_work_timer_fn(unsigned long __data)
          */
         spin_lock_irqsave(&cwq->lock, flags);
         list_add_tail(&work->entry, &cwq->worklist);
+        cwq->insert_sequence++;
         wake_up(&cwq->more_work);
         spin_unlock_irqrestore(&cwq->lock, flags);
 }
 
-int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay)
+int queue_delayed_work(struct workqueue_struct *wq,
+                        struct work_struct *work, unsigned long delay)
 {
         int ret = 0, cpu = get_cpu();
         struct timer_list *timer = &work->timer;
@@ -107,18 +116,11 @@ int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, un
                 BUG_ON(timer_pending(timer));
                 BUG_ON(!list_empty(&work->entry));
 
-                /*
-                 * Increase nr_queued so that the flush function
-                 * knows that there's something pending.
-                 */
-                atomic_inc(&cwq->nr_queued);
                 work->wq_data = cwq;
-
                 timer->expires = jiffies + delay;
                 timer->data = (unsigned long)work;
                 timer->function = delayed_work_timer_fn;
                 add_timer(timer);
-
                 ret = 1;
         }
         put_cpu();
@@ -135,7 +137,8 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
          */
         spin_lock_irqsave(&cwq->lock, flags);
         while (!list_empty(&cwq->worklist)) {
-                struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct, entry);
+                struct work_struct *work = list_entry(cwq->worklist.next,
+                                                struct work_struct, entry);
                 void (*f) (void *) = work->func;
                 void *data = work->data;
 
@@ -146,14 +149,9 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
                 clear_bit(0, &work->pending);
                 f(data);
 
-                /*
-                 * We only wake up 'work done' waiters (flush) when
-                 * the last function has been fully processed.
-                 */
-                if (atomic_dec_and_test(&cwq->nr_queued))
-                        wake_up(&cwq->work_done);
-
                 spin_lock_irqsave(&cwq->lock, flags);
+                cwq->remove_sequence++;
+                wake_up(&cwq->work_done);
         }
         spin_unlock_irqrestore(&cwq->lock, flags);
 }
@@ -223,37 +221,41 @@ static int worker_thread(void *__startup)
  * Forces execution of the workqueue and blocks until its completion.
  * This is typically used in driver shutdown handlers.
  *
- * NOTE: if work is being added to the queue constantly by some other
- * context then this function might block indefinitely.
+ * This function will sample each workqueue's current insert_sequence number and
+ * will sleep until the head sequence is greater than or equal to that.  This
+ * means that we sleep until all works which were queued on entry have been
+ * handled, but we are not livelocked by new incoming ones.
+ *
+ * This function used to run the workqueues itself.  Now we just wait for the
+ * helper threads to do it.
  */
 void flush_workqueue(struct workqueue_struct *wq)
 {
         struct cpu_workqueue_struct *cwq;
         int cpu;
 
+        might_sleep();
+
         for (cpu = 0; cpu < NR_CPUS; cpu++) {
+                DEFINE_WAIT(wait);
+                long sequence_needed;
+
                 if (!cpu_online(cpu))
                         continue;
                 cwq = wq->cpu_wq + cpu;
 
-                if (atomic_read(&cwq->nr_queued)) {
-                        DECLARE_WAITQUEUE(wait, current);
-
-                        if (!list_empty(&cwq->worklist))
-                                run_workqueue(cwq);
-
-                        /*
-                         * Wait for helper thread(s) to finish up
-                         * the queue:
-                         */
-                        set_task_state(current, TASK_INTERRUPTIBLE);
-                        add_wait_queue(&cwq->work_done, &wait);
-                        if (atomic_read(&cwq->nr_queued))
-                                schedule();
-                        else
-                                set_task_state(current, TASK_RUNNING);
-                        remove_wait_queue(&cwq->work_done, &wait);
+                spin_lock_irq(&cwq->lock);
+                sequence_needed = cwq->insert_sequence;
+
+                while (sequence_needed - cwq->remove_sequence > 0) {
+                        prepare_to_wait(&cwq->work_done, &wait,
+                                        TASK_UNINTERRUPTIBLE);
+                        spin_unlock_irq(&cwq->lock);
+                        schedule();
+                        spin_lock_irq(&cwq->lock);
                 }
+                finish_wait(&cwq->work_done, &wait);
+                spin_unlock_irq(&cwq->lock);
         }
 }
 
@@ -279,7 +281,8 @@ struct workqueue_struct *create_workqueue(const char *name)
                 spin_lock_init(&cwq->lock);
                 cwq->wq = wq;
                 cwq->thread = NULL;
-                atomic_set(&cwq->nr_queued, 0);
+                cwq->insert_sequence = 0;
+                cwq->remove_sequence = 0;
                 INIT_LIST_HEAD(&cwq->worklist);
                 init_waitqueue_head(&cwq->more_work);
                 init_waitqueue_head(&cwq->work_done);
