patch-2.2.18 linux/net/sunrpc/sched.c
- Lines: 1185
- Date: Fri Sep 15 22:10:44 2000
- Orig file: v2.2.17/net/sunrpc/sched.c
- Orig date: Sat Sep 9 18:42:51 2000
diff -u --new-file --recursive --exclude-from /usr/src/exclude v2.2.17/net/sunrpc/sched.c linux/net/sunrpc/sched.c
@@ -31,7 +31,7 @@
*/
#define GFP_RPC GFP_NFS
-static void __rpc_default_timer(struct rpc_task *task);
+static void rpc_default_timer(struct rpc_task *task);
static void rpciod_killall(void);
/*
@@ -68,12 +68,110 @@
static int rpc_inhibit = 0;
/*
+ * Spinlock for wait queues. Access to the latter has to be interrupt-safe
+ * since we want to wake up tasks from sk->write_space().
+ */
+spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
+
+/*
* This is the last-ditch buffer for NFS swap requests
*/
static u32 swap_buffer[PAGE_SIZE >> 2];
static int swap_buffer_used = 0;
/*
+ * Make allocation of the swap_buffer SMP-safe
+ */
+static __inline__ int rpc_lock_swapbuf(void)
+{
+ return !test_and_set_bit(1, &swap_buffer_used);
+}
+static __inline__ void rpc_unlock_swapbuf(void)
+{
+ clear_bit(1, &swap_buffer_used);
+}
+
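The pair of helpers above is a classic try-lock: test_and_set_bit() atomically sets the bit and returns its old value, so exactly one concurrent caller sees a zero come back and wins the buffer. A minimal user-space sketch of the same pattern, using a C11 atomic_flag in place of the kernel bit operations (names here are illustrative):

    #include <stdatomic.h>
    #include <stdbool.h>

    static atomic_flag buf_lock = ATOMIC_FLAG_INIT;

    /* returns true only to the single caller that flips the flag 0 -> 1 */
    static bool try_get_buffer(void)
    {
            return !atomic_flag_test_and_set(&buf_lock);
    }

    static void put_buffer(void)
    {
            atomic_flag_clear(&buf_lock);
    }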
+/*
+ * Disable the timer for a given RPC task. Should be called with
+ * rpc_queue_lock and bh_disabled in order to avoid races within
+ * rpc_run_timer().
+ */
+static inline void
+__rpc_disable_timer(struct rpc_task *task)
+{
+ dprintk("RPC: %4d disabling timer\n", task->tk_pid);
+ task->tk_timeout_fn = NULL;
+ task->tk_timeout = 0;
+}
+
+/*
+ * Run a timeout function.
+ * We use the callback in order to allow __rpc_wake_up_task()
+ * and friends to disable the timer synchronously on SMP systems
+ * without calling del_timer_sync(). The latter could cause a
+ * deadlock if called while we're holding spinlocks...
+ */
+static void
+rpc_run_timer(struct rpc_task *task)
+{
+ void (*callback)(struct rpc_task *);
+ unsigned long oldflags;
+
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ callback = task->tk_timeout_fn;
+ task->tk_timeout_fn = NULL;
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+ if (callback) {
+ dprintk("RPC: %4d running timer\n", task->tk_pid);
+ callback(task);
+ }
+}
+
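The indirection through tk_timeout_fn is the key trick here: to cancel a timeout, a waker only clears the pointer under rpc_queue_lock, and a timer that fires snapshots the pointer under the same lock before calling it. Cancellation therefore never has to wait for a running handler, which del_timer_sync() would, deadlocking if the handler spins on a lock the canceller already holds. A user-space sketch of the idea, with a mutex standing in for rpc_queue_lock (struct and names illustrative):

    #include <pthread.h>
    #include <stddef.h>

    struct tmo_task {
            pthread_mutex_t lock;        /* stand-in for rpc_queue_lock */
            void (*timeout_fn)(struct tmo_task *);
    };

    /* timer context: snapshot-and-clear under the lock, call outside it */
    static void timer_fired(struct tmo_task *t)
    {
            void (*cb)(struct tmo_task *);

            pthread_mutex_lock(&t->lock);
            cb = t->timeout_fn;
            t->timeout_fn = NULL;
            pthread_mutex_unlock(&t->lock);
            if (cb)
                    cb(t);
    }

    /* waker context: a cleared pointer turns a racing handler into a no-op */
    static void disable_timeout(struct tmo_task *t)
    {
            pthread_mutex_lock(&t->lock);
            t->timeout_fn = NULL;
            pthread_mutex_unlock(&t->lock);
    }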
+/*
+ * Set up a timer for the current task.
+ */
+static inline void
+__rpc_add_timer(struct rpc_task *task, rpc_action timer)
+{
+ if (!task->tk_timeout)
+ return;
+
+ dprintk("RPC: %4d setting alarm for %lu ms\n",
+ task->tk_pid, task->tk_timeout * 1000 / HZ);
+
+ if (timer)
+ task->tk_timeout_fn = timer;
+ else
+ task->tk_timeout_fn = rpc_default_timer;
+ mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
+}
+
+/*
+ * Set up a timer for an already sleeping task.
+ */
+void rpc_add_timer(struct rpc_task *task, rpc_action timer)
+{
+ unsigned long oldflags;
+
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ if (!(RPC_IS_RUNNING(task) || task->tk_wakeup))
+ __rpc_add_timer(task, timer);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+}
+
+/*
+ * Delete any timer for the current task.
+ */
+static inline void
+rpc_delete_timer(struct rpc_task *task)
+{
+ if (timer_pending(&task->tk_timer)) {
+ dprintk("RPC: %4d deleting timer\n", task->tk_pid);
+ del_timer(&task->tk_timer);
+ }
+}
+
+/*
* Add new request to wait queue.
*
* Swapper tasks always get inserted at the head of the queue.
@@ -81,16 +179,17 @@
* improve overall performance.
* Everyone else gets appended to the queue to ensure proper FIFO behavior.
*/
-int
-rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
+static inline int
+__rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
- if (task->tk_rpcwait) {
- if (task->tk_rpcwait != queue)
- {
- printk(KERN_WARNING "RPC: doubly enqueued task!\n");
- return -EWOULDBLOCK;
- }
+ if (task->tk_rpcwait == queue)
return 0;
+
+ if (task->tk_rpcwait) {
+ printk(KERN_WARNING "RPC: task already queued!\n");
+ dprintk("task already on %s, to be added to %s\n",
+ rpc_qname(task->tk_rpcwait), rpc_qname(queue));
+ return -EWOULDBLOCK;
}
if (RPC_IS_SWAPPER(task))
rpc_insert_list(&queue->task, task);
@@ -104,17 +203,30 @@
return 0;
}
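The insertion policy above, head for swapper tasks and tail for everyone else, can be modelled with a plain singly linked queue (rpc_insert_list()/rpc_append_list() themselves are defined elsewhere in sched.c; this simplified sketch is illustrative only):

    struct qnode {
            struct qnode *next;
            int is_swapper;
    };

    static void enqueue(struct qnode **head, struct qnode *n)
    {
            if (n->is_swapper) {            /* swap requests jump the queue */
                    n->next = *head;
                    *head = n;
                    return;
            }
            while (*head)                   /* everyone else is strict FIFO */
                    head = &(*head)->next;
            n->next = NULL;
            *head = n;
    }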
+int
+rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
+{
+ unsigned long oldflags;
+ int result;
+
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ result = __rpc_add_wait_queue(q, task);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+ return result;
+}
+
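This wrapper establishes the locking convention used throughout the rest of the file: a double-underscore function assumes rpc_queue_lock is already held, and the public entry point of the same name takes and releases the lock around it. In outline (do_thing() is a placeholder name):

    static inline void __do_thing(void)   /* caller holds rpc_queue_lock */
    {
            /* ... queue manipulation ... */
    }

    void do_thing(void)                    /* public, self-locking */
    {
            unsigned long flags;

            spin_lock_irqsave(&rpc_queue_lock, flags);
            __do_thing();
            spin_unlock_irqrestore(&rpc_queue_lock, flags);
    }

The irqsave variant is required because, as the comment at the top of the file notes, wakeups can also arrive from sk->write_space() in interrupt context.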
/*
* Remove request from queue.
- * Note: must be called with interrupts disabled.
+ * Note: must be called with spin lock held.
*/
-void
-rpc_remove_wait_queue(struct rpc_task *task)
+static inline void
+__rpc_remove_wait_queue(struct rpc_task *task)
{
- struct rpc_wait_queue *queue;
+ struct rpc_wait_queue *queue = task->tk_rpcwait;
- if (!(queue = task->tk_rpcwait))
+ if (!queue)
return;
+
rpc_remove_list(&queue->task, task);
task->tk_rpcwait = NULL;
@@ -122,73 +234,63 @@
task->tk_pid, queue, rpc_qname(queue));
}
-/*
- * Set up a timer for the current task.
- */
-inline void
-rpc_add_timer(struct rpc_task *task, rpc_action timer)
+void
+rpc_remove_wait_queue(struct rpc_task *task)
{
- unsigned long expires = jiffies + task->tk_timeout;
-
- dprintk("RPC: %4d setting alarm for %lu ms\n",
- task->tk_pid, task->tk_timeout * 1000 / HZ);
- if (!timer)
- timer = __rpc_default_timer;
- if (time_before(expires, jiffies)) {
- printk(KERN_ERR "RPC: bad timeout value %ld - setting to 10 sec!\n",
- task->tk_timeout);
- expires = jiffies + 10 * HZ;
- }
- task->tk_timer.expires = expires;
- task->tk_timer.data = (unsigned long) task;
- task->tk_timer.function = (void (*)(unsigned long)) timer;
- task->tk_timer.prev = NULL;
- task->tk_timer.next = NULL;
- add_timer(&task->tk_timer);
-}
+ unsigned long oldflags;
-/*
- * Delete any timer for the current task.
- * Must be called with interrupts off.
- */
-inline void
-rpc_del_timer(struct rpc_task *task)
-{
- if (task->tk_timeout) {
- dprintk("RPC: %4d deleting timer\n", task->tk_pid);
- del_timer(&task->tk_timer);
- task->tk_timeout = 0;
- }
+ if (!task->tk_rpcwait)
+ return;
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ __rpc_remove_wait_queue(task);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
}
/*
* Make an RPC task runnable.
*
* Note: If the task is ASYNC, this must be called with
- * interrupts disabled to protect the wait queue operation.
+ * spin lock held in order to protect the wait queue operation.
*/
static inline void
-rpc_make_runnable(struct rpc_task *task)
+__rpc_make_runnable(struct rpc_task *task)
{
if (task->tk_timeout) {
printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
return;
}
- task->tk_flags |= RPC_TASK_RUNNING;
+ task->tk_running = 1;
if (RPC_IS_ASYNC(task)) {
- int status;
- status = rpc_add_wait_queue(&schedq, task);
- if (status)
- {
- printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
- task->tk_status = status;
+ if (RPC_IS_SLEEPING(task)) {
+ int status;
+ status = __rpc_add_wait_queue(&schedq, task);
+ if (status < 0) {
+ printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
+ task->tk_status = status;
+ } else
+ task->tk_sleeping = 0;
}
wake_up(&rpciod_idle);
} else {
+ task->tk_sleeping = 0;
wake_up(&task->tk_wait);
}
}
+/*
+ * Place a newly initialized task on the schedq.
+ */
+static inline void
+__rpc_schedule_run(struct rpc_task *task)
+{
+ /* Don't run a child twice! */
+ if (RPC_IS_ACTIVATED(task))
+ return;
+ task->tk_active = 1;
+ task->tk_sleeping = 1;
+ __rpc_make_runnable(task);
+}
+
/*
* For other people who may need to wake the I/O daemon
@@ -198,9 +300,7 @@
void rpciod_wake_up(void)
{
if(rpciod_pid==0)
- {
printk(KERN_ERR "rpciod: wot no daemon?\n");
- }
wake_up(&rpciod_idle);
}
@@ -214,33 +314,32 @@
__rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
rpc_action action, rpc_action timer)
{
- unsigned long oldflags;
int status;
dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
rpc_qname(q), jiffies);
- /*
- * Protect the execution below.
- */
- save_flags(oldflags); cli();
+ if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
+ printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
+ return;
+ }
+
+ /* Mark the task as being activated if so needed */
+ if (!RPC_IS_ACTIVATED(task)) {
+ task->tk_active = 1;
+ task->tk_sleeping = 1;
+ }
- status = rpc_add_wait_queue(q, task);
- if (status)
- {
+ status = __rpc_add_wait_queue(q, task);
+ if (status) {
printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
task->tk_status = status;
- task->tk_flags |= RPC_TASK_RUNNING;
- }
- else
- {
+ } else {
+ task->tk_running = 0;
task->tk_callback = action;
- if (task->tk_timeout)
- rpc_add_timer(task, timer);
- task->tk_flags &= ~RPC_TASK_RUNNING;
+ __rpc_add_timer(task, timer);
}
- restore_flags(oldflags);
return;
}
@@ -248,17 +347,39 @@
rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
rpc_action action, rpc_action timer)
{
+ unsigned long oldflags;
+
+ /*
+ * Protect the queue operations.
+ */
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
__rpc_sleep_on(q, task, action, timer);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+}
+
+void
+rpc_sleep_locked(struct rpc_wait_queue *q, struct rpc_task *task,
+ rpc_action action, rpc_action timer)
+{
+ unsigned long oldflags;
+
+ /*
+ * Protect the queue operations.
+ */
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ __rpc_sleep_on(q, task, action, timer);
+ __rpc_lock_task(task);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
}
/*
- * Wake up a single task -- must be invoked with bottom halves off.
+ * Wake up a single task -- must be invoked with spin lock held.
*
* It would probably suffice to cli/sti the del_timer and remove_wait_queue
* operations individually.
*/
-static void
-__rpc_wake_up(struct rpc_task *task)
+static inline void
+__rpc_wake_up_task(struct rpc_task *task)
{
dprintk("RPC: %4d __rpc_wake_up (now %ld inh %d)\n",
task->tk_pid, jiffies, rpc_inhibit);
@@ -267,16 +388,32 @@
if (task->tk_magic != 0xf00baa) {
printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n");
rpc_debug = ~0;
+ rpc_show_tasks();
return;
}
#endif
- rpc_del_timer(task);
- if (task->tk_rpcwait != &schedq)
- rpc_remove_wait_queue(task);
- if (!RPC_IS_RUNNING(task)) {
- task->tk_flags |= RPC_TASK_CALLBACK;
- rpc_make_runnable(task);
+ /* Has the task been executed yet? If not, we cannot wake it up! */
+ if (!RPC_IS_ACTIVATED(task)) {
+ printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
+ return;
}
+ if (RPC_IS_RUNNING(task))
+ return;
+
+ __rpc_disable_timer(task);
+ if (task->tk_rpcwait != &schedq)
+ __rpc_remove_wait_queue(task);
+
+ /* If the task has been locked, then set tk_wakeup so that
+ * rpc_unlock_task() wakes us up... */
+ if (task->tk_lock) {
+ task->tk_wakeup = 1;
+ return;
+ } else
+ task->tk_wakeup = 0;
+
+ __rpc_make_runnable(task);
+
dprintk("RPC: __rpc_wake_up done\n");
}
@@ -284,12 +421,12 @@
* Default timeout handler if none specified by user
*/
static void
-__rpc_default_timer(struct rpc_task *task)
+rpc_default_timer(struct rpc_task *task)
{
- dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
+ dprintk("RPC: %4d timeout (default timer)\n", task->tk_pid);
task->tk_status = -ETIMEDOUT;
task->tk_timeout = 0;
- __rpc_wake_up(task);
+ rpc_wake_up_task(task);
}
/*
@@ -300,26 +437,38 @@
{
unsigned long oldflags;
- save_flags(oldflags); cli();
- __rpc_wake_up(task);
- restore_flags(oldflags);
+ if (RPC_IS_RUNNING(task))
+ return;
+
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ __rpc_wake_up_task(task);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
}
/*
* Wake up the next task on the wait queue.
*/
-struct rpc_task *
-rpc_wake_up_next(struct rpc_wait_queue *queue)
+static inline struct rpc_task *
+__rpc_wake_up_next(struct rpc_wait_queue *queue)
{
- unsigned long oldflags;
struct rpc_task *task;
dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
- save_flags(oldflags); cli();
if ((task = queue->task) != 0)
- __rpc_wake_up(task);
- restore_flags(oldflags);
+ __rpc_wake_up_task(task);
+
+ return task;
+}
+
+struct rpc_task *
+rpc_wake_up_next(struct rpc_wait_queue *queue)
+{
+ struct rpc_task *task;
+ unsigned long oldflags;
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ task = __rpc_wake_up_next(queue);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
return task;
}
@@ -331,10 +480,10 @@
{
unsigned long oldflags;
- save_flags(oldflags); cli();
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
while (queue->task)
- __rpc_wake_up(queue->task);
- restore_flags(oldflags);
+ __rpc_wake_up_task(queue->task);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
}
/*
@@ -346,30 +495,63 @@
struct rpc_task *task;
unsigned long oldflags;
- save_flags(oldflags); cli();
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
while ((task = queue->task) != NULL) {
task->tk_status = status;
- __rpc_wake_up(task);
+ __rpc_wake_up_task(task);
}
- restore_flags(oldflags);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+}
+
+/*
+ * Lock down a sleeping task to prevent it from waking up
+ * and disappearing from beneath us.
+ *
+ * This function should always be called with the
+ * rpc_queue_lock held.
+ */
+int
+__rpc_lock_task(struct rpc_task *task)
+{
+ if (!RPC_IS_RUNNING(task))
+ return ++task->tk_lock;
+ return 0;
+}
+
+static inline void
+__rpc_unlock_task(struct rpc_task *task)
+{
+ if (task->tk_lock && !--task->tk_lock && task->tk_wakeup)
+ __rpc_wake_up_task(task);
+}
+
+void
+rpc_unlock_task(struct rpc_task *task)
+{
+ unsigned long oldflags;
+
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ __rpc_unlock_task(task);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
}
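tk_lock and tk_wakeup implement a pin-and-replay protocol: while a task is pinned, a wakeup is only recorded, and the final unlock replays it. A compact model, with both helpers assuming the queue lock is held, mirroring their double-underscore kernel counterparts (struct and field names illustrative):

    struct pinned_task {
            int lockcnt;          /* tk_lock:   pins the task asleep        */
            int wakeup_pending;   /* tk_wakeup: wakeup arrived while pinned */
            int running;          /* tk_running                             */
    };

    static void __wake(struct pinned_task *t)
    {
            if (t->lockcnt) {
                    t->wakeup_pending = 1;   /* defer: last unlocker wakes us */
                    return;
            }
            t->running = 1;                  /* the real wakeup */
    }

    static void __unpin(struct pinned_task *t)
    {
            if (t->lockcnt && !--t->lockcnt && t->wakeup_pending)
                    __wake(t);               /* replay the deferred wakeup */
    }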
/*
* Run a task at a later time
*/
-static void __rpc_atrun(struct rpc_task *);
+static void rpc_atrun(struct rpc_task *);
void
rpc_delay(struct rpc_task *task, unsigned long delay)
{
task->tk_timeout = delay;
- rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
+ rpc_sleep_on(&delay_queue, task, NULL, rpc_atrun);
}
static void
-__rpc_atrun(struct rpc_task *task)
+rpc_atrun(struct rpc_task *task)
{
task->tk_status = 0;
- __rpc_wake_up(task);
+ task->tk_timeout = 0;
+ rpc_wake_up_task(task);
}
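Together these give RPC code a one-shot backoff primitive: rpc_delay() parks the task on delay_queue with rpc_atrun() as the timer action, and the timer wakes it up again with a clean status. A hypothetical caller might look like this (my_retry_action is illustrative, not part of the patch):

    static void my_retry_action(struct rpc_task *task)
    {
            if (task->tk_status == -EAGAIN) {
                    task->tk_status = 0;
                    task->tk_action = my_retry_action;  /* resume here...   */
                    rpc_delay(task, HZ);                /* ...in about 1 s  */
                    return;
            }
            /* otherwise carry on with the call */
    }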
/*
@@ -389,15 +571,15 @@
return 0;
}
+restarted:
while (1) {
/*
* Execute any pending callback.
*/
- if (task->tk_flags & RPC_TASK_CALLBACK) {
+ if (RPC_DO_CALLBACK(task)) {
/* Define a callback save pointer */
void (*save_callback)(struct rpc_task *);
- task->tk_flags &= ~RPC_TASK_CALLBACK;
/*
* If a callback exists, save it, reset it,
* call it.
@@ -405,11 +587,9 @@
* another callback set within the callback handler
* - Dave
*/
- if (task->tk_callback) {
- save_callback=task->tk_callback;
- task->tk_callback=NULL;
- save_callback(task);
- }
+ save_callback=task->tk_callback;
+ task->tk_callback=NULL;
+ save_callback(task);
}
/*
@@ -418,6 +598,10 @@
* by someone else.
*/
if (RPC_IS_RUNNING(task)) {
+ /*
+ * Garbage collection of pending timers...
+ */
+ rpc_delete_timer(task);
if (!task->tk_action)
break;
task->tk_action(task);
@@ -425,86 +609,98 @@
/*
* Check whether task is sleeping.
- * Note that if the task may go to sleep in tk_action,
+ * Note that if the task goes to sleep in tk_action,
* and the RPC reply arrives before we get here, it will
* have state RUNNING, but will still be on schedq.
+ * 27/9/99: The above has been attempted fixed by
+ * introduction of task->tk_sleeping.
*/
- save_flags(oldflags); cli();
- if (RPC_IS_RUNNING(task)) {
- if (task->tk_rpcwait == &schedq)
- rpc_remove_wait_queue(task);
- } else while (!RPC_IS_RUNNING(task)) {
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ if (!RPC_IS_RUNNING(task)) {
+ task->tk_sleeping = 1;
if (RPC_IS_ASYNC(task)) {
- restore_flags(oldflags);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
return 0;
}
+ } else
+ task->tk_sleeping = 0;
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+ while (RPC_IS_SLEEPING(task)) {
/* sync task: sleep here */
dprintk("RPC: %4d sync task going to sleep\n",
task->tk_pid);
- if (current->pid == rpciod_pid)
- printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");
+ if (current->pid == rpciod_pid) {
+ printk(KERN_ERR "RPC: rpciod waiting on sync task %4d!\n", task->tk_pid);
+ rpc_show_tasks();
+ }
- __wait_event(task->tk_wait, RPC_IS_RUNNING(task));
+ __wait_event(task->tk_wait, !RPC_IS_SLEEPING(task));
+ dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
/*
- * When the task received a signal, remove from
- * any queues etc, and make runnable again.
+ * When a sync task receives a signal, it exits with
+ * -ERESTARTSYS. In order to catch any callbacks that
+ * clean up after sleeping on some queue, we don't
+ * break the loop here, but go around once more.
*/
- if (signalled()) {
- cli();
- __rpc_wake_up(task);
+ if (task->tk_client->cl_intr && signalled()) {
+ dprintk("RPC: %4d got signal\n", task->tk_pid);
+ task->tk_flags |= RPC_TASK_KILLED;
+ rpc_exit(task, -ERESTARTSYS);
+ rpc_wake_up_task(task);
}
-
- dprintk("RPC: %4d sync task resuming\n",
- task->tk_pid);
- }
- restore_flags(oldflags);
-
- /*
- * When a sync task receives a signal, it exits with
- * -ERESTARTSYS. In order to catch any callbacks that
- * clean up after sleeping on some queue, we don't
- * break the loop here, but go around once more.
- */
- if (!RPC_IS_ASYNC(task) && signalled()) {
- dprintk("RPC: %4d got signal\n", task->tk_pid);
- rpc_exit(task, -ERESTARTSYS);
}
}
dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
if (task->tk_exit) {
- status = task->tk_status;
task->tk_exit(task);
+ /* If tk_action is non-null, the user wants us to restart */
+ if (task->tk_action) {
+ if (!RPC_ASSASSINATED(task)) {
+ /* Release RPC slot and buffer memory */
+ if (task->tk_rqstp)
+ xprt_release(task);
+ if (task->tk_buffer) {
+ rpc_free(task->tk_buffer);
+ task->tk_buffer = NULL;
+ }
+ goto restarted;
+ }
+ printk(KERN_ERR "RPC: dead task tries to walk away.\n");
+ }
}
+ /* Save the task exit status */
+ status = task->tk_status;
+
+ /* Release all resources associated with the task */
+ rpc_release_task(task);
return status;
}
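The restarted label gives tk_exit a way to resurrect a task: if the exit callback leaves tk_action non-NULL and the task has not been killed, the scheduler releases the request slot and buffer and runs the state machine again from the top. A hypothetical exit handler using this (names illustrative):

    static void my_first_action(struct rpc_task *task);

    static void my_exit(struct rpc_task *task)
    {
            if (RPC_ASSASSINATED(task))
                    return;                  /* killed tasks must stay dead */
            if (task->tk_status == -ETIMEDOUT) {
                    task->tk_status = 0;
                    task->tk_action = my_first_action;  /* non-NULL => restart */
            }
    }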
/*
* User-visible entry point to the scheduler.
- * The recursion protection is for debugging. It should go away once
- * the code has stabilized.
+ *
+ * This may be called recursively if e.g. an async NFS task updates
+ * the attributes and finds that dirty pages must be flushed.
*/
-void
+int
rpc_execute(struct rpc_task *task)
{
- static int executing = 0;
- int incr = RPC_IS_ASYNC(task)? 1 : 0;
-
- if (incr) {
- if (rpc_inhibit) {
- printk(KERN_INFO "RPC: execution inhibited!\n");
- return;
- }
- if (executing)
- printk(KERN_WARNING "RPC: %d tasks executed\n", executing);
+ if (rpc_inhibit) {
+ printk(KERN_INFO "RPC: execution inhibited!\n");
+ return -EIO;
+ }
+ task->tk_running = 1;
+ if (task->tk_active) {
+ printk(KERN_ERR "RPC: active task was run twice!\n");
+ return -EWOULDBLOCK;
}
+ task->tk_active = 1;
- executing += incr;
- __rpc_execute(task);
- executing -= incr;
+ return __rpc_execute(task);
}
/*
@@ -514,31 +710,36 @@
__rpc_schedule(void)
{
struct rpc_task *task;
- int count = 0;
unsigned long oldflags;
- int need_resched = current->need_resched;
+ int count = 0;
dprintk("RPC: rpc_schedule enter\n");
- save_flags(oldflags);
while (1) {
- cli();
- if (!(task = schedq.task))
+ /* Ensure equal rights for tcp tasks... */
+ rpciod_tcp_dispatcher();
+
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ if (!(task = schedq.task)) {
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
break;
- rpc_del_timer(task);
- rpc_remove_wait_queue(task);
- task->tk_flags |= RPC_TASK_RUNNING;
- restore_flags(oldflags);
+ }
+ if (task->tk_lock) {
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+ printk(KERN_ERR "RPC: Locked task was scheduled !!!!\n");
+ rpc_debug = ~0;
+ rpc_show_tasks();
+ break;
+ }
+ __rpc_remove_wait_queue(task);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
__rpc_execute(task);
- if (++count >= 200) {
+ if (++count >= 200 || current->need_resched) {
+ schedule();
count = 0;
- need_resched = 1;
}
- if (need_resched)
- schedule();
}
- restore_flags(oldflags);
dprintk("RPC: rpc_schedule leave\n");
}
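The reworked loop folds the two old yield conditions into one: give up the CPU either after a burst of 200 tasks or as soon as the scheduler requests it via need_resched. The shape of that cooperative pattern, with work_available()/run_one() as placeholders:

    int count = 0;

    while (work_available()) {
            run_one();
            if (++count >= 200 || current->need_resched) {
                    schedule();     /* yield before the next burst */
                    count = 0;
            }
    }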
@@ -579,7 +780,8 @@
dprintk("RPC: allocated buffer %p\n", buffer);
return buffer;
}
- if ((flags & RPC_TASK_SWAPPER) && !swap_buffer_used++) {
+ if ((flags & RPC_TASK_SWAPPER) && size <= sizeof(swap_buffer)
+ && rpc_lock_swapbuf()) {
dprintk("RPC: used last-ditch swap buffer\n");
return swap_buffer;
}
@@ -587,6 +789,7 @@
return NULL;
current->state = TASK_INTERRUPTIBLE;
schedule_timeout(HZ>>4);
+ current->state = TASK_RUNNING;
} while (!signalled());
return NULL;
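rpc_allocate() now hands out the emergency buffer only when the request both is a swap-out and actually fits, and ownership is taken atomically via rpc_lock_swapbuf(). The decision order, as a user-space sketch reusing the try_get_buffer()/put_buffer() pair from the earlier example (names and sizes illustrative):

    #include <stdlib.h>

    static char swap_buf[4096];

    static void *alloc_or_fallback(size_t size, int is_swapper)
    {
            void *p = malloc(size);

            if (p)
                    return p;                /* normal path */
            if (is_swapper && size <= sizeof(swap_buf) && try_get_buffer())
                    return swap_buf;         /* emergency buffer; put_buffer() later */
            return NULL;                     /* caller sleeps briefly and retries */
    }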
@@ -599,20 +802,24 @@
kfree(buffer);
return;
}
- swap_buffer_used = 0;
+ rpc_unlock_swapbuf();
}
/*
* Creation and deletion of RPC task structures
*/
-inline void
+void
rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
rpc_action callback, int flags)
{
memset(task, 0, sizeof(*task));
+ init_timer(&task->tk_timer);
+ task->tk_timer.data = (unsigned long) task;
+ task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
+
task->tk_client = clnt;
- task->tk_flags = RPC_TASK_RUNNING | flags;
task->tk_exit = callback;
+ task->tk_flags = flags;
if (current->uid != current->fsuid || current->gid != current->fsgid)
task->tk_flags |= RPC_TASK_SETUID;
@@ -621,6 +828,11 @@
task->tk_cred_retry = 2;
task->tk_suid_retry = 1;
+#ifdef RPC_DEBUG
+ task->tk_magic = 0xf00baa;
+ task->tk_pid = rpc_task_id++;
+#endif
+
/* Add to global list of all tasks */
task->tk_next_task = all_tasks;
task->tk_prev_task = NULL;
@@ -629,14 +841,11 @@
all_tasks = task;
if (clnt)
- clnt->cl_users++;
+ atomic_inc(&clnt->cl_users);
-#ifdef RPC_DEBUG
- task->tk_magic = 0xf00baa;
- task->tk_pid = rpc_task_id++;
-#endif
- dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
- current->pid);
+ dprintk("RPC: %d new task procpid %d%s\n",
+ task->tk_pid, current->pid,
+ (flags & RPC_TASK_DYNAMIC) ? " (alloc)" : "");
}
/*
@@ -653,10 +862,7 @@
if (!task)
goto cleanup;
- rpc_init_task(task, clnt, callback, flags);
-
- dprintk("RPC: %4d allocated task\n", task->tk_pid);
- task->tk_flags |= RPC_TASK_DYNAMIC;
+ rpc_init_task(task, clnt, callback, flags | RPC_TASK_DYNAMIC);
out:
return task;
@@ -664,8 +870,8 @@
/* Check whether to release the client */
if (clnt) {
printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
- clnt->cl_users, clnt->cl_oneshot);
- clnt->cl_users++; /* pretend we were used ... */
+ atomic_read(&clnt->cl_users), clnt->cl_oneshot);
+ atomic_inc(&clnt->cl_users); /* pretend we were used ... */
rpc_release_client(clnt);
}
goto out;
@@ -675,9 +881,19 @@
rpc_release_task(struct rpc_task *task)
{
struct rpc_task *next, *prev;
+ unsigned long oldflags;
dprintk("RPC: %4d release task\n", task->tk_pid);
+#ifdef RPC_DEBUG
+ if (task->tk_magic != 0xf00baa) {
+ printk(KERN_ERR "RPC: attempt to release a non-existing task!\n");
+ rpc_debug = ~0;
+ rpc_show_tasks();
+ return;
+ }
+#endif
+
/* Remove from global task list */
prev = task->tk_prev_task;
next = task->tk_next_task;
@@ -687,12 +903,29 @@
prev->tk_next_task = next;
else
all_tasks = next;
+ task->tk_next_task = task->tk_prev_task = NULL;
+
+ /* Protect the execution below. */
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+
+ /* Disable timer to prevent zombie wakeup */
+ __rpc_disable_timer(task);
+
+ /* Remove from any wait queue we're still on */
+ __rpc_remove_wait_queue(task);
+
+ task->tk_active = 0;
+
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+
+ /* Synchronously delete any running timer */
+ rpc_delete_timer(task);
/* Release resources */
if (task->tk_rqstp)
xprt_release(task);
- if (task->tk_cred)
- rpcauth_releasecred(task);
+ if (task->tk_msg.rpc_cred)
+ rpcauth_unbindcred(task);
if (task->tk_buffer) {
rpc_free(task->tk_buffer);
task->tk_buffer = NULL;
@@ -708,9 +941,12 @@
if (task->tk_flags & RPC_TASK_DYNAMIC) {
dprintk("RPC: %4d freeing task\n", task->tk_pid);
+ if (task->tk_release)
+ task->tk_release(task);
task->tk_flags &= ~RPC_TASK_DYNAMIC;
rpc_free(task);
- }
+ } else if (task->tk_release)
+ task->tk_release(task);
}
/*
@@ -719,14 +955,18 @@
* parent task may already have gone away
*/
static inline struct rpc_task *
-rpc_find_parent(struct rpc_task *child)
+__rpc_find_parent(struct rpc_task *child)
{
- struct rpc_task *temp, *parent;
+ struct rpc_task *head, *parent;
- parent = (struct rpc_task *) child->tk_calldata;
- for (temp = childq.task; temp; temp = temp->tk_next) {
- if (temp == parent)
- return parent;
+ parent = child->tk_parent;
+ if ((head = childq.task) != NULL) {
+ struct rpc_task *task = head;
+ do {
+ if (task == parent)
+ return parent;
+ task = task->tk_next;
+ } while (task != head);
}
return NULL;
}
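Note the traversal change: the child queue is a circular list here, so the new walk uses a do/while that stops when it wraps back around to the head, where the old for loop assumed NULL termination. The generic shape:

    struct item {
            struct item *next;
    };

    static int on_list(struct item *head, struct item *needle)
    {
            struct item *p = head;

            if (!head)
                    return 0;
            do {
                    if (p == needle)
                            return 1;
                    p = p->next;
            } while (p != head);
            return 0;
    }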
@@ -735,12 +975,14 @@
rpc_child_exit(struct rpc_task *child)
{
struct rpc_task *parent;
+ unsigned long oldflags;
- if ((parent = rpc_find_parent(child)) != NULL) {
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ if ((parent = __rpc_find_parent(child)) != NULL) {
parent->tk_status = child->tk_status;
- rpc_wake_up_task(parent);
+ __rpc_wake_up_task(parent);
}
- rpc_release_task(child);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
}
/*
@@ -755,7 +997,7 @@
if (!task)
goto fail;
task->tk_exit = rpc_child_exit;
- task->tk_calldata = parent;
+ task->tk_parent = parent;
return task;
fail:
@@ -766,13 +1008,13 @@
void
rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
{
- unsigned long oldflags;
+ unsigned long oldflags;
- save_flags(oldflags); cli();
- rpc_make_runnable(child);
- restore_flags(oldflags);
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
/* N.B. Is it possible for the child to have already finished? */
- rpc_sleep_on(&childq, task, func, NULL);
+ __rpc_sleep_on(&childq, task, func, NULL);
+ __rpc_schedule_run(child);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
}
/*
@@ -785,8 +1027,9 @@
struct rpc_task **q, *rovr;
dprintk("RPC: killing all tasks for client %p\n", clnt);
- /* N.B. Why bother to inhibit? Nothing blocks here ... */
- rpc_inhibit++;
+ /*
+ * Spin lock all_tasks to prevent changes...
+ */
for (q = &all_tasks; (rovr = *q); q = &rovr->tk_next_task) {
if (!clnt || rovr->tk_client == clnt) {
rovr->tk_flags |= RPC_TASK_KILLED;
@@ -794,11 +1037,16 @@
rpc_wake_up_task(rovr);
}
}
- rpc_inhibit--;
}
static struct semaphore rpciod_running = MUTEX_LOCKED;
+static inline int
+rpciod_task_pending(void)
+{
+ return schedq.task != NULL || xprt_tcp_pending();
+}
+
/*
* This is the rpciod kernel thread
*/
@@ -806,11 +1054,12 @@
rpciod(void *ptr)
{
struct wait_queue **assassin = (struct wait_queue **) ptr;
- unsigned long oldflags;
int rounds = 0;
MOD_INC_USE_COUNT;
+
lock_kernel();
+
/*
* Let our maker know we're running ...
*/
@@ -837,22 +1086,17 @@
}
__rpc_schedule();
- if (++rounds >= 64) { /* safeguard */
+ if (++rounds >= 64 || current->need_resched) { /* safeguard */
schedule();
rounds = 0;
}
- save_flags(oldflags); cli();
- dprintk("RPC: rpciod running checking dispatch\n");
- rpciod_tcp_dispatcher();
- if (!schedq.task) {
+ if (!rpciod_task_pending()) {
dprintk("RPC: rpciod back to sleep\n");
- interruptible_sleep_on(&rpciod_idle);
+ wait_event_interruptible(rpciod_idle, rpciod_task_pending());
dprintk("RPC: switch to rpciod\n");
- rpciod_tcp_dispatcher();
rounds = 0;
}
- restore_flags(oldflags);
}
dprintk("RPC: rpciod shutdown commences\n");
@@ -882,6 +1126,7 @@
dprintk("rpciod_killall: waiting for tasks to exit\n");
current->state = TASK_INTERRUPTIBLE;
schedule_timeout(1);
+ current->state = TASK_RUNNING;
}
}
@@ -953,6 +1198,7 @@
current->sigpending = 0;
current->state = TASK_INTERRUPTIBLE;
schedule_timeout(1);
+ current->state = TASK_RUNNING;
/*
* Display a message if we're going to wait longer.
*/
@@ -972,36 +1218,23 @@
MOD_DEC_USE_COUNT;
}
-#ifdef RPC_DEBUG
-#include <linux/nfs_fs.h>
void rpc_show_tasks(void)
{
struct rpc_task *t = all_tasks, *next;
- struct nfs_wreq *wreq;
- if (!t)
+ t = all_tasks;
+ if (!t) {
return;
+ }
printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
"-rpcwait -action- --exit--\n");
for (; t; t = next) {
next = t->tk_next_task;
printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
- t->tk_pid, t->tk_proc, t->tk_flags, t->tk_status,
+ t->tk_pid, t->tk_msg.rpc_proc, t->tk_flags, t->tk_status,
t->tk_client, t->tk_client->cl_prog,
t->tk_rqstp, t->tk_timeout,
t->tk_rpcwait ? rpc_qname(t->tk_rpcwait) : " <NULL> ",
t->tk_action, t->tk_exit);
-
- if (!(t->tk_flags & RPC_TASK_NFSWRITE))
- continue;
- /* NFS write requests */
- wreq = (struct nfs_wreq *) t->tk_calldata;
- printk(" NFS: flgs=%08x, pid=%d, pg=%p, off=(%d, %d)\n",
- wreq->wb_flags, wreq->wb_pid, wreq->wb_page,
- wreq->wb_offset, wreq->wb_bytes);
- printk(" name=%s/%s\n",
- wreq->wb_file->f_dentry->d_parent->d_name.name,
- wreq->wb_file->f_dentry->d_name.name);
}
}
-#endif