From: Anton Blanchard <anton@samba.org>

> Anton, the ia32 semaphores will, when down() fails, leave extra refs
> against the semaphore's count when extra tasks are waiting.  I think.  So
> if ls is waiting for the sem, and tiobench releases it and then quickly
> tries to reacquire it, tiobench will see the negative count in the
> semaphore due to ls's presence and will enter the slow path.  What we want
> to do there is to make sure that tiobench will go to sleep rather than
> zooming in and stealing the semaphore again.

Definitely sounds reasonable. I wonder why we don't do that.

Quick test... replace our semaphores with the x86 versions. I'm trying to
replicate it here.
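
For reference, here is a tiny userspace sketch of the sleeper accounting the
quoted paragraph describes, as I read the i386 slow path. Illustrative only
(single-threaded, with the waitqueue spinlock and actual sleeping elided); it
is not kernel code and not the patch below:

/*
 * Userspace sketch of i386-style sleeper accounting: a waiter (ls)
 * keeps the count negative, so the next down() (tiobench) is forced
 * into the slow path instead of stealing the semaphore.
 */
#include <stdio.h>

static int count = 1;		/* 1 == semaphore free */
static int sleepers;

/* fast path: decrement; a negative result means "take the slow path" */
static int fast_down(void)
{
	return --count >= 0;
}

/* one pass around the slow-path loop, locking and sleeping elided */
static int slow_down_attempt(void)
{
	sleepers++;
	count += sleepers - 1;	/* fold "everybody else" back in */
	if (count >= 0) {
		sleepers = 0;
		return 1;	/* got it */
	}
	sleepers = 1;		/* just us - see the -1 above */
	return 0;		/* would go to sleep here */
}

static void up(void)
{
	count++;		/* a result <= 0 would mean "wake a sleeper" */
}

int main(void)
{
	int got;

	got = fast_down();
	printf("tiobench down: %d (count=%d)\n", got, count);

	got = fast_down();
	printf("ls down:       %d (count=%d)\n", got, count);

	got = slow_down_attempt();
	printf("ls slow path:  %d (count=%d sleepers=%d)\n", got, count, sleepers);

	up();			/* tiobench releases the semaphore... */

	got = fast_down();
	printf("tiobench down: %d (count=%d)\n", got, count);
	/*
	 * ...and this second down() still sees a negative count because
	 * ls's presence is folded into it, so tiobench takes the slow
	 * path and sleeps rather than stealing the semaphore again.
	 */
	return 0;
}
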



 25-akpm/arch/ppc64/kernel/semaphore.c |  138 ++++++++++++++++++++++++++++++++++
 25-akpm/include/asm-ppc64/semaphore.h |    3 
 2 files changed, 141 insertions(+)

diff -puN arch/ppc64/kernel/semaphore.c~ppc64-semaphore-reimplementation arch/ppc64/kernel/semaphore.c
--- 25/arch/ppc64/kernel/semaphore.c~ppc64-semaphore-reimplementation	Mon Jun  2 16:46:02 2003
+++ 25-akpm/arch/ppc64/kernel/semaphore.c	Mon Jun  2 16:46:02 2003
@@ -21,6 +21,7 @@
 #include <asm/semaphore.h>
 #include <asm/errno.h>
 
+#if 0
 /*
  * Atomically update sem->count.
  * This does the equivalent of the following:
@@ -129,3 +130,140 @@ int __down_interruptible(struct semaphor
 	wake_up(&sem->wait);
 	return retval;
 }
+#else
+
+static __inline__ int atomic_add_negative(int i, atomic_t *v)
+{
+	if (atomic_add_return(i, v) < 0)
+		return 1;
+	else
+		return 0;
+}
+
+void __up(struct semaphore *sem)
+{
+	wake_up(&sem->wait);
+}
+
+void __down(struct semaphore * sem)
+{
+	struct task_struct *tsk = current;
+	DECLARE_WAITQUEUE(wait, tsk);
+	unsigned long flags;
+
+	tsk->state = TASK_UNINTERRUPTIBLE;
+	spin_lock_irqsave(&sem->wait.lock, flags);
+	add_wait_queue_exclusive_locked(&sem->wait, &wait);
+
+	sem->sleepers++;
+	for (;;) {
+		int sleepers = sem->sleepers;
+
+		/*
+		 * Add "everybody else" into it. They aren't
+		 * playing, because we own the spinlock in
+		 * the wait_queue_head.
+		 */
+		if (!atomic_add_negative(sleepers - 1, &sem->count)) {
+			sem->sleepers = 0;
+			break;
+		}
+		sem->sleepers = 1;	/* us - see -1 above */
+		spin_unlock_irqrestore(&sem->wait.lock, flags);
+
+		schedule();
+
+		spin_lock_irqsave(&sem->wait.lock, flags);
+		tsk->state = TASK_UNINTERRUPTIBLE;
+	}
+	remove_wait_queue_locked(&sem->wait, &wait);
+	wake_up_locked(&sem->wait);
+	spin_unlock_irqrestore(&sem->wait.lock, flags);
+	tsk->state = TASK_RUNNING;
+}
+
+int __down_interruptible(struct semaphore * sem)
+{
+	int retval = 0;
+	struct task_struct *tsk = current;
+	DECLARE_WAITQUEUE(wait, tsk);
+	unsigned long flags;
+
+	tsk->state = TASK_INTERRUPTIBLE;
+	spin_lock_irqsave(&sem->wait.lock, flags);
+	add_wait_queue_exclusive_locked(&sem->wait, &wait);
+
+	sem->sleepers++;
+	for (;;) {
+		int sleepers = sem->sleepers;
+
+		/*
+		 * With signals pending, this turns into
+		 * the trylock failure case - we won't be
+		 * sleeping, and we can't get the lock as
+		 * it has contention. Just correct the count
+		 * and exit.
+		 */
+		if (signal_pending(current)) {
+			retval = -EINTR;
+			sem->sleepers = 0;
+			atomic_add(sleepers, &sem->count);
+			break;
+		}
+
+		/*
+		 * Add "everybody else" into it. They aren't
+		 * playing, because we own the spinlock in
+		 * wait_queue_head. The "-1" is because we're
+		 * still hoping to get the semaphore.
+		 */
+		if (!atomic_add_negative(sleepers - 1, &sem->count)) {
+			sem->sleepers = 0;
+			break;
+		}
+		sem->sleepers = 1;	/* us - see -1 above */
+		spin_unlock_irqrestore(&sem->wait.lock, flags);
+
+		schedule();
+
+		spin_lock_irqsave(&sem->wait.lock, flags);
+		tsk->state = TASK_INTERRUPTIBLE;
+	}
+	remove_wait_queue_locked(&sem->wait, &wait);
+	wake_up_locked(&sem->wait);
+	spin_unlock_irqrestore(&sem->wait.lock, flags);
+
+	tsk->state = TASK_RUNNING;
+	return retval;
+}
+
+/*
+ * Trylock failed - make sure we correct for
+ * having decremented the count.
+ *
+ * We could have done the trylock with a
+ * single "cmpxchg" without failure cases,
+ * but then it wouldn't work on a 386.
+ */
+int __down_trylock(struct semaphore * sem)
+{
+	int sleepers;
+	unsigned long flags;
+
+	spin_lock_irqsave(&sem->wait.lock, flags);
+	sleepers = sem->sleepers + 1;
+	sem->sleepers = 0;
+
+	/*
+	 * Add "everybody else" and us into it. They aren't
+	 * playing, because we own the spinlock in the
+	 * wait_queue_head.
+	 */
+	if (!atomic_add_negative(sleepers, &sem->count)) {
+		wake_up_locked(&sem->wait);
+	}
+
+	spin_unlock_irqrestore(&sem->wait.lock, flags);
+	return 1;
+}
+#endif
diff -puN include/asm-ppc64/semaphore.h~ppc64-semaphore-reimplementation include/asm-ppc64/semaphore.h
--- 25/include/asm-ppc64/semaphore.h~ppc64-semaphore-reimplementation	Mon Jun  2 16:46:02 2003
+++ 25-akpm/include/asm-ppc64/semaphore.h	Mon Jun  2 16:46:02 2003
@@ -22,6 +22,7 @@ struct semaphore {
 	 * sleeping on `wait'.
 	 */
 	atomic_t count;
+	int sleepers;
 	wait_queue_head_t wait;
 #ifdef WAITQUEUE_DEBUG
 	long __magic;
@@ -37,6 +38,7 @@ struct semaphore {
 
 #define __SEMAPHORE_INITIALIZER(name, count) \
 	{ ATOMIC_INIT(count), \
+	  0, \
 	  __WAIT_QUEUE_HEAD_INITIALIZER((name).wait) \
 	  __SEM_DEBUG_INIT(name) }
 
@@ -52,6 +54,7 @@ struct semaphore {
 static inline void sema_init (struct semaphore *sem, int val)
 {
 	atomic_set(&sem->count, val);
+	sem->sleepers = 0;
 	init_waitqueue_head(&sem->wait);
 #ifdef WAITQUEUE_DEBUG
 	sem->__magic = (long)&sem->__magic;

_