summaryrefslogtreecommitdiff
path: root/kernel
diff options
context:
space:
mode:
authorAndrew Morton <akpm@digeo.com>2003-04-14 06:09:45 -0700
committerLinus Torvalds <torvalds@home.transmeta.com>2003-04-14 06:09:45 -0700
commit92b817f8d42ef76fded6294554c3bd7fc94b955c (patch)
tree6245c75ffde6e5a6622bcb683b7e16b83d2dcc28 /kernel
parentec7708a234f332be15443b6fe953f55441b22814 (diff)
[PATCH] flush_work_queue() fixes
The workqueue code currently has a notion of a per-cpu queue being "busy". flush_scheduled_work()'s responsibility is to wait for a queue to be not busy. Problem is, flush_scheduled_work() can easily hang up. - The workqueue is deemed "busy" when there are pending delayed (timer-based) works. But if someone repeatedly schedules new delayed work in the callback, the queue will never fall idle, and flush_scheduled_work() will not terminate. - If someone reschedules work (not delayed work) in the work function, that too will cause the queue to never go idle, and flush_scheduled_work() will not terminate. So what this patch does is: - Create a new "cancel_delayed_work()" which will try to kill off any timer-based delayed works. - Change flush_scheduled_work() so that it is immune to people re-adding work in the work callout handler. We can do this by recognising that the caller does *not* want to wait until the workqueue is "empty". The caller merely wants to wait until all works which were pending at the time flush_scheduled_work() was called have completed. The patch uses a couple of sequence numbers for that. So now, if someone wants to reliably remove delayed work they should do: /* * Make sure that my work-callback will no longer schedule new work */ my_driver_is_shutting_down = 1; /* * Kill off any pending delayed work */ cancel_delayed_work(&my_work); /* * OK, there will be no new works scheduled. But there may be one * currently queued or in progress. So wait for that to complete. */ flush_scheduled_work(); The patch also changes the flush_workqueue() sleep to be uninterruptible. We cannot legally bale out if a signal is delivered anyway.
Diffstat (limited to 'kernel')
-rw-r--r--kernel/workqueue.c85
1 files changed, 44 insertions, 41 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 5e9a520cfd77..968837218657 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -27,13 +27,21 @@
#include <linux/slab.h>
/*
- * The per-CPU workqueue:
+ * The per-CPU workqueue.
+ *
+ * The sequence counters are for flush_scheduled_work(). It wants to wait
+ * until until all currently-scheduled works are completed, but it doesn't
+ * want to be livelocked by new, incoming ones. So it waits until
+ * remove_sequence is >= the insert_sequence which pertained when
+ * flush_scheduled_work() was called.
*/
struct cpu_workqueue_struct {
spinlock_t lock;
- atomic_t nr_queued;
+ long remove_sequence; /* Least-recently added (next to run) */
+ long insert_sequence; /* Next to add */
+
struct list_head worklist;
wait_queue_head_t more_work;
wait_queue_head_t work_done;
@@ -71,10 +79,9 @@ int queue_work(struct workqueue_struct *wq, struct work_struct *work)
spin_lock_irqsave(&cwq->lock, flags);
list_add_tail(&work->entry, &cwq->worklist);
- atomic_inc(&cwq->nr_queued);
- spin_unlock_irqrestore(&cwq->lock, flags);
-
+ cwq->insert_sequence++;
wake_up(&cwq->more_work);
+ spin_unlock_irqrestore(&cwq->lock, flags);
ret = 1;
}
put_cpu();
@@ -93,11 +100,13 @@ static void delayed_work_timer_fn(unsigned long __data)
*/
spin_lock_irqsave(&cwq->lock, flags);
list_add_tail(&work->entry, &cwq->worklist);
+ cwq->insert_sequence++;
wake_up(&cwq->more_work);
spin_unlock_irqrestore(&cwq->lock, flags);
}
-int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay)
+int queue_delayed_work(struct workqueue_struct *wq,
+ struct work_struct *work, unsigned long delay)
{
int ret = 0, cpu = get_cpu();
struct timer_list *timer = &work->timer;
@@ -107,18 +116,11 @@ int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, un
BUG_ON(timer_pending(timer));
BUG_ON(!list_empty(&work->entry));
- /*
- * Increase nr_queued so that the flush function
- * knows that there's something pending.
- */
- atomic_inc(&cwq->nr_queued);
work->wq_data = cwq;
-
timer->expires = jiffies + delay;
timer->data = (unsigned long)work;
timer->function = delayed_work_timer_fn;
add_timer(timer);
-
ret = 1;
}
put_cpu();
@@ -135,7 +137,8 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
*/
spin_lock_irqsave(&cwq->lock, flags);
while (!list_empty(&cwq->worklist)) {
- struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct, entry);
+ struct work_struct *work = list_entry(cwq->worklist.next,
+ struct work_struct, entry);
void (*f) (void *) = work->func;
void *data = work->data;
@@ -146,14 +149,9 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
clear_bit(0, &work->pending);
f(data);
- /*
- * We only wake up 'work done' waiters (flush) when
- * the last function has been fully processed.
- */
- if (atomic_dec_and_test(&cwq->nr_queued))
- wake_up(&cwq->work_done);
-
spin_lock_irqsave(&cwq->lock, flags);
+ cwq->remove_sequence++;
+ wake_up(&cwq->work_done);
}
spin_unlock_irqrestore(&cwq->lock, flags);
}
@@ -223,37 +221,41 @@ static int worker_thread(void *__startup)
* Forces execution of the workqueue and blocks until its completion.
* This is typically used in driver shutdown handlers.
*
- * NOTE: if work is being added to the queue constantly by some other
- * context then this function might block indefinitely.
+ * This function will sample each workqueue's current insert_sequence number and
+ * will sleep until the head sequence is greater than or equal to that. This
+ * means that we sleep until all works which were queued on entry have been
+ * handled, but we are not livelocked by new incoming ones.
+ *
+ * This function used to run the workqueues itself. Now we just wait for the
+ * helper threads to do it.
*/
void flush_workqueue(struct workqueue_struct *wq)
{
struct cpu_workqueue_struct *cwq;
int cpu;
+ might_sleep();
+
for (cpu = 0; cpu < NR_CPUS; cpu++) {
+ DEFINE_WAIT(wait);
+ long sequence_needed;
+
if (!cpu_online(cpu))
continue;
cwq = wq->cpu_wq + cpu;
- if (atomic_read(&cwq->nr_queued)) {
- DECLARE_WAITQUEUE(wait, current);
-
- if (!list_empty(&cwq->worklist))
- run_workqueue(cwq);
-
- /*
- * Wait for helper thread(s) to finish up
- * the queue:
- */
- set_task_state(current, TASK_INTERRUPTIBLE);
- add_wait_queue(&cwq->work_done, &wait);
- if (atomic_read(&cwq->nr_queued))
- schedule();
- else
- set_task_state(current, TASK_RUNNING);
- remove_wait_queue(&cwq->work_done, &wait);
+ spin_lock_irq(&cwq->lock);
+ sequence_needed = cwq->insert_sequence;
+
+ while (sequence_needed - cwq->remove_sequence > 0) {
+ prepare_to_wait(&cwq->work_done, &wait,
+ TASK_UNINTERRUPTIBLE);
+ spin_unlock_irq(&cwq->lock);
+ schedule();
+ spin_lock_irq(&cwq->lock);
}
+ finish_wait(&cwq->work_done, &wait);
+ spin_unlock_irq(&cwq->lock);
}
}
@@ -279,7 +281,8 @@ struct workqueue_struct *create_workqueue(const char *name)
spin_lock_init(&cwq->lock);
cwq->wq = wq;
cwq->thread = NULL;
- atomic_set(&cwq->nr_queued, 0);
+ cwq->insert_sequence = 0;
+ cwq->remove_sequence = 0;
INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work);
init_waitqueue_head(&cwq->work_done);