Core retry infrastructure

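This patch introduces the core retry infrastructure for retry-based AIO.
Each kiocb now carries a non-blocking retry method, set up at submission
time by aio_setup_iocb(), which aio_run_iocb() invokes both for the
initial submission and for every subsequent retry.  While the retry
method runs, current->io_wait points at the kiocb's wait queue entry, so
a wait that would normally block instead queues the async callback
aio_wake_function() and the operation returns -EIOCBRETRY.  The eventual
wakeup "kicks" the iocb: queue_kicked_iocb() places it on the ioctx run
list and the aio workqueue re-drives it through
aio_kick_handler()/aio_run_iocbs(), taking on the submitter's mm context
with use_mm()/unuse_mm() so copy_to/from_user works.
prepare_to_wait()/prepare_to_wait_exclusive() are taught not to touch the
task state for such async waits (see is_sync_wait()), and a per-task
io_wait pointer is added to task_struct.  The aio_run/aio_wakeups globals
and the ki_retried/ki_kicked/ki_queued fields are instrumentation for
testing only.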

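As a rough sketch (not part of this patch; my_aio_read(), my_data_ready(),
my_waitq and my_copy_out() are hypothetical placeholders, and the real
conversions of the underlying read/write paths come in follow-on patches),
a lower-level aio_read file operation is expected to cooperate with the
retry machinery along these lines:

/*
 * Illustrative only: an aio-aware f_op->aio_read under the retry model.
 */
static ssize_t my_aio_read(struct kiocb *iocb, char *buf,
			   size_t count, loff_t pos)
{
	if (!my_data_ready()) {
		/*
		 * While a retry method runs, current->io_wait points at
		 * iocb->ki_wait (see aio_run_iocb).  Its ->task is NULL
		 * and its callback is aio_wake_function, so is_sync_wait()
		 * is false and prepare_to_wait() only queues the callback
		 * without touching the task state.
		 */
		prepare_to_wait(&my_waitq, current->io_wait,
				TASK_INTERRUPTIBLE);
		if (!my_data_ready())
			/*
			 * The later wakeup runs aio_wake_function, which
			 * kicks the iocb back onto the ioctx run list for
			 * the aio workqueue to retry.
			 */
			return -EIOCBRETRY;
		finish_wait(&my_waitq, current->io_wait);
	}
	/*
	 * Copy whatever is available; aio_pread treats a positive return
	 * as partial progress, advances ki_buf/ki_left and drives another
	 * retry until everything has been transferred.
	 */
	return my_copy_out(iocb, buf, count, pos);
}
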
 fs/aio.c                  |  525 +++++++++++++++++++++++++++++++++++++++-------
 include/linux/aio.h       |   22 +
 include/linux/errno.h     |    1 
 include/linux/init_task.h |    1 
 include/linux/sched.h     |    7 
 include/linux/wait.h      |    9 
 kernel/fork.c             |   17 +
 7 files changed, 502 insertions(+), 80 deletions(-)

diff -puN fs/aio.c~aio-01-retry fs/aio.c
--- 25/fs/aio.c~aio-01-retry	2003-06-05 00:53:52.000000000 -0700
+++ 25-akpm/fs/aio.c	2003-06-05 00:53:52.000000000 -0700
@@ -39,6 +39,9 @@
 #define dprintk(x...)	do { ; } while (0)
 #endif
 
+long aio_run = 0; /* for testing only */
+long aio_wakeups = 0; /* for testing only */
+
 /*------ sysctl variables----*/
 atomic_t aio_nr = ATOMIC_INIT(0);	/* current system wide number of aio requests */
 unsigned aio_max_nr = 0x10000;	/* system wide maximum number of aio requests */
@@ -281,6 +284,7 @@ static void aio_cancel_all(struct kioctx
 		struct kiocb *iocb = list_kiocb(pos);
 		list_del_init(&iocb->ki_list);
 		cancel = iocb->ki_cancel;
+		kiocbSetCancelled(iocb);
 		if (cancel) {
 			iocb->ki_users++;
 			spin_unlock_irq(&ctx->ctx_lock);
@@ -395,6 +399,7 @@ static struct kiocb *__aio_get_req(struc
 	req->ki_cancel = NULL;
 	req->ki_retry = NULL;
 	req->ki_user_obj = NULL;
+	INIT_LIST_HEAD(&req->ki_run_list);
 
 	/* Check if the completion queue has enough free space to
 	 * accept an event from this io.
@@ -536,65 +541,277 @@ struct kioctx *lookup_ioctx(unsigned lon
 	return ioctx;
 }
 
+/*
+ * use_mm
+ * 	Makes the calling kernel thread take on the specified 
+ * 	mm context. 
+ * 	Called by the retry thread to execute retries within the
+ * 	iocb issuer's mm context, so that copy_from/to_user
+ * 	operations work seamlessly for aio.
+ * 	(Note: this routine is intended to be called only 
+ * 	from a kernel thread context)
+ */
 static void use_mm(struct mm_struct *mm)
 {
 	struct mm_struct *active_mm = current->active_mm;
 	atomic_inc(&mm->mm_count);
 	current->mm = mm;
-	if (mm != active_mm) {
-		current->active_mm = mm;
-		activate_mm(active_mm, mm);
-	}
+
+	current->active_mm = mm;
+	activate_mm(active_mm, mm);
+
 	mmdrop(active_mm);
 }
 
-static void unuse_mm(struct mm_struct *mm)
+/*
+ * unuse_mm
+ * 	Reverses the effect of use_mm, i.e. releases the
+ * 	specified mm context which was earlier taken on
+ * 	by the calling kernel thread 
+ * 	(Note: this routine is intended to be called only 
+ * 	from a kernel thread context)
+ */
+void unuse_mm(struct mm_struct *mm)
 {
 	current->mm = NULL;
 	/* active_mm is still 'mm' */
 	enter_lazy_tlb(mm, current, smp_processor_id());
 }
 
-/* Run on kevent's context.  FIXME: needs to be per-cpu and warn if an
- * operation blocks.
+/*
+ * Queue up a kiocb to be retried. Assumes that the kiocb
+ * has already been marked as kicked, and places it on  
+ * the retry run list for the corresponding ioctx, if it
+ * isn't already queued. Returns 1 if it actually queued
+ * the kiocb (to tell the caller to activate the work
+ * queue to process it), or 0 if it was
+ * already queued.
+ *
+ * Should be called with the spin lock iocb->ki_ctx->ctx_lock 
+ * held
  */
-static void aio_kick_handler(void *data)
+static inline int __queue_kicked_iocb(struct kiocb *iocb)
 {
-	struct kioctx *ctx = data;
+	struct kioctx	*ctx = iocb->ki_ctx;
 
-	use_mm(ctx->mm);
+	if (list_empty(&iocb->ki_run_list)) {
+		list_add_tail(&iocb->ki_run_list, 
+			&ctx->run_list);
+		iocb->ki_queued++;
+		return 1;
+	}
+	return 0;
+}
 
-	spin_lock_irq(&ctx->ctx_lock);
-	while (!list_empty(&ctx->run_list)) {
-		struct kiocb *iocb;
-		long ret;
+/* aio_run_iocb
+ * 	 This is the core aio execution routine. It is
+ * 	 invoked both for initial i/o submission and 
+ * 	 subsequent retries via the aio_kick_handler.
+ *       Expects to be invoked with iocb->ki_ctx->ctx_lock
+ *       already held. The lock is released and reacquired
+ *       as needed during processing.
+ *
+ * Calls the iocb retry method (already setup for the 
+ * iocb on initial submission) for operation specific
+ * handling, but takes care of most of the common retry
+ * execution details for a given iocb. The retry method
+ * needs to be non-blocking as far as possible, to avoid
+ * holding up other iocbs waiting to be serviced by the
+ * retry kernel thread.
+ *
+ * The trickier parts in this code have to do with 
+ * ensuring that only one retry instance is in progress 
+ * for a given iocb at any time. Providing that guarantee
+ * simplifies the coding of individual aio operations as
+ * it avoids various potential races.
+ */
+static ssize_t aio_run_iocb(struct kiocb *iocb)
+{
+	struct kioctx	*ctx = iocb->ki_ctx;
+	ssize_t (*retry)(struct kiocb *);
+	ssize_t ret;
 
-		iocb = list_entry(ctx->run_list.next, struct kiocb,
-				  ki_run_list);
-		list_del(&iocb->ki_run_list);
-		iocb->ki_users ++;
-		spin_unlock_irq(&ctx->ctx_lock);
+	if (iocb->ki_retried++ > 1024*1024) {
+		printk("Maximal retry count.  Bytes done %Zd\n",
+			iocb->ki_nbytes - iocb->ki_left);
+		return -EAGAIN;
+	}
 
-		kiocbClearKicked(iocb);
-		ret = iocb->ki_retry(iocb);
+	if (!(iocb->ki_retried & 0xff)) {
+		pr_debug("%ld retry: %d of %d (kick %ld, Q %ld run %ld, wake %ld)\n",
+			iocb->ki_retried, 
+			iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes,
+			iocb->ki_kicked, iocb->ki_queued, aio_run, aio_wakeups);
+	}
+
+	if (!(retry = iocb->ki_retry)) {
+		printk("aio_run_iocb: iocb->ki_retry = NULL\n");
+		return 0;
+	}
+
+	/*
+	 * We don't want the next retry iteration for this 
+	 * operation to start until this one has returned and
+	 * updated the iocb state. However, wait_queue functions 
+	 * can trigger a kick_iocb from interrupt context in the 
+	 * meantime, indicating that data is available for the next 
+	 * iteration. We want to remember that and enable the 
+	 * next retry iteration _after_ we are through with 
+	 * this one.
+	 *
+	 * So, in order to be able to register a "kick", but
+	 * prevent it from being queued now, we clear the kick
+	 * flag, but make the kick code *think* that the iocb is 
+	 * still on the run list until we are actually done.
+	 * When we are done with this iteration, we check if 
+	 * the iocb was kicked in the meantime and if so, queue 
+	 * it up afresh.
+	 */
+	
+	kiocbClearKicked(iocb);
+	
+	/* 
+	 * This is so that aio_complete knows it doesn't need to
+	 * pull the iocb off the run list (We can't just call 
+	 * INIT_LIST_HEAD because we don't want a kick_iocb to 
+	 * queue this on the run list yet)
+	 */
+	iocb->ki_run_list.next = iocb->ki_run_list.prev = NULL;
+	iocb->ki_retry = NULL;	
+	spin_unlock_irq(&ctx->ctx_lock);
+	
+	/* Quit retrying if the i/o has been cancelled */
+	if (kiocbIsCancelled(iocb)) {
+		ret = -EINTR;
+		aio_complete(iocb, ret, 0);
+		/* must not access the iocb after this */
+		goto out;
+	}
+
+	/*
+	 * Now we are all set to call the retry method in async
+	 * context. By setting this thread's io_wait context
+	 * to point to the wait queue entry inside the currently 
+	 * running iocb for the duration of the retry, we ensure 
+	 * that async notification wakeups are queued by the 
+	 * operation instead of blocking waits, and when notified, 
+	 * cause the iocb to be kicked for continuation (through 
+	 * the aio_wake_function callback).
+	 */
+	BUG_ON(current->io_wait != NULL);
+	current->io_wait = &iocb->ki_wait;
+	ret = retry(iocb);
+	current->io_wait = NULL;
+
+	if (-EIOCBRETRY != ret) {
 		if (-EIOCBQUEUED != ret) {
+			BUG_ON(!list_empty(&iocb->ki_wait.task_list)); 
 			aio_complete(iocb, ret, 0);
-			iocb = NULL;
+			/* must not access the iocb after this */
 		}
+	} else {
+		/*
+		 * Issue an additional retry to avoid waiting forever if 
+		 * no waits were queued (e.g. in case of a short read).
+		 */
+		if (list_empty(&iocb->ki_wait.task_list)) 
+			kiocbSetKicked(iocb);
+	}
+out:
+	spin_lock_irq(&ctx->ctx_lock);
 
-		spin_lock_irq(&ctx->ctx_lock);
-		if (NULL != iocb)
-			__aio_put_req(ctx, iocb);
+	if (-EIOCBRETRY == ret) {
+		/*
+		 * OK, now that we are done with this iteration
+		 * and know that there is more left to go,
+		 * this is where we let go so that a subsequent
+		 * "kick" can start the next iteration
+		 */
+		iocb->ki_retry = retry;
+		/* will make __queue_kicked_iocb succeed from here on */
+		INIT_LIST_HEAD(&iocb->ki_run_list);
+		/* we must queue the next iteration ourselves, if it
+		 * has already been kicked */
+		if (kiocbIsKicked(iocb)) {
+			__queue_kicked_iocb(iocb);
+		} 
+	}
+	return ret;
+}
+
+/* 
+ * aio_run_iocbs:
+ * 	Process all pending retries queued on the ioctx 
+ * 	run list. 
+ * Assumes it is operating within the aio issuer's mm
+ * context.
+ */
+static void aio_run_iocbs(struct kioctx *ctx)
+{
+	struct kiocb *iocb;
+	ssize_t ret;
+	int count = 0;
+
+	spin_lock_irq(&ctx->ctx_lock);
+	while (!list_empty(&ctx->run_list)) {
+		iocb = list_entry(ctx->run_list.next, struct kiocb,
+			ki_run_list);
+		list_del(&iocb->ki_run_list);
+		ret = aio_run_iocb(iocb);
+		count++;
 	}
 	spin_unlock_irq(&ctx->ctx_lock);
+	aio_run++;
+}
+
+/* 
+ * aio_kick_handler:
+ * 	Work queue handler triggered to process pending 
+ * 	retries on an ioctx. Takes on the aio issuer's 
+ * 	mm context before running the iocbs.
+ * Run on aiod's context.  
+ */
+static void aio_kick_handler(void *data)
+{
+	struct kioctx *ctx = data;
 
+	use_mm(ctx->mm);
+	aio_run_iocbs(ctx);
 	unuse_mm(ctx->mm);
 }
 
-void kick_iocb(struct kiocb *iocb)
+
+/*
+ * Called by kick_iocb to queue the kiocb for retry
+ * and, if required, activate the aio work queue to
+ * process it.
+ */
+void queue_kicked_iocb(struct kiocb *iocb)
 {
 	struct kioctx	*ctx = iocb->ki_ctx;
+	unsigned long flags;
+	int run = 0;
+
+	WARN_ON((!list_empty(&iocb->ki_wait.task_list)));
 
+	spin_lock_irqsave(&ctx->ctx_lock, flags);
+	run = __queue_kicked_iocb(iocb);
+	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+	if (run) {
+		queue_work(aio_wq, &ctx->wq);
+		aio_wakeups++;
+	}
+}
+
+/*
+ * kick_iocb:
+ * 	Called typically from a wait queue callback context 
+ * 	(aio_wake_function) to trigger a retry of the iocb.
+ * 	The retry is usually executed by aio workqueue 
+ * 	threads (See aio_kick_handler).
+ */
+void kick_iocb(struct kiocb *iocb)
+{
 	/* sync iocbs are easy: they can only ever be executing from a 
 	 * single context. */
 	if (is_sync_kiocb(iocb)) {
@@ -603,12 +820,10 @@ void kick_iocb(struct kiocb *iocb)
 		return;
 	}
 
+	iocb->ki_kicked++;
+	/* If it's already kicked we shouldn't queue it again */
 	if (!kiocbTryKick(iocb)) {
-		unsigned long flags;
-		spin_lock_irqsave(&ctx->ctx_lock, flags);
-		list_add_tail(&iocb->ki_run_list, &ctx->run_list);
-		spin_unlock_irqrestore(&ctx->ctx_lock, flags);
-		schedule_work(&ctx->wq);
+		queue_kicked_iocb(iocb);
 	}
 }
 
@@ -661,6 +876,9 @@ int aio_complete(struct kiocb *iocb, lon
 	 */
 	spin_lock_irqsave(&ctx->ctx_lock, flags);
 
+	if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list))
+		list_del_init(&iocb->ki_run_list);
+
 	ring = kmap_atomic(info->ring_pages[0], KM_IRQ1);
 
 	tail = info->tail;
@@ -690,6 +908,11 @@ int aio_complete(struct kiocb *iocb, lon
 
 	pr_debug("added to ring %p at [%lu]\n", iocb, tail);
 
+	pr_debug("%ld retries: %d of %d (kicked %ld, Q %ld run %ld wake %ld)\n",
+		iocb->ki_retried, 
+		iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes,
+		iocb->ki_kicked, iocb->ki_queued, aio_run, aio_wakeups);
+
 	/* everything turned out well, dispose of the aiocb. */
 	ret = __aio_put_req(ctx, iocb);
 
@@ -804,6 +1027,7 @@ static int read_events(struct kioctx *ct
 	int			i = 0;
 	struct io_event		ent;
 	struct timeout		to;
+	int 			event_loop = 0; /* testing only */
 
 	/* needed to zero any padding within an entry (there shouldn't be 
 	 * any, but C is fun!
@@ -853,7 +1077,6 @@ static int read_events(struct kioctx *ct
 		add_wait_queue_exclusive(&ctx->wait, &wait);
 		do {
 			set_task_state(tsk, TASK_INTERRUPTIBLE);
-
 			ret = aio_read_evt(ctx, &ent);
 			if (ret)
 				break;
@@ -863,6 +1086,7 @@ static int read_events(struct kioctx *ct
 			if (to.timed_out)	/* Only check after read evt */
 				break;
 			schedule();
+			event_loop++;
 			if (signal_pending(tsk)) {
 				ret = -EINTR;
 				break;
@@ -890,6 +1114,9 @@ static int read_events(struct kioctx *ct
 	if (timeout)
 		clear_timeout(&to);
 out:
+	pr_debug("event loop executed %d times\n", event_loop);
+	pr_debug("aio_run %ld\n", aio_run);
+	pr_debug("aio_wakeups %ld\n", aio_wakeups);
 	return i ? i : ret;
 }
 
@@ -981,13 +1208,179 @@ asmlinkage long sys_io_destroy(aio_conte
 	return -EINVAL;
 }
 
+/* 
+ * Retry method for aio_read (also used for first time submit)
+ * Responsible for updating iocb state as retries progress 
+ */
+static ssize_t aio_pread(struct kiocb *iocb)
+{
+	struct file *file = iocb->ki_filp;
+	ssize_t ret = 0;
+
+	ret = file->f_op->aio_read(iocb, iocb->ki_buf,
+		iocb->ki_left, iocb->ki_pos);
+
+	/*
+	 * Can't just depend on iocb->ki_left to determine 
+	 * whether we are done. This may have been a short read.
+	 */
+	if (ret > 0) {
+		iocb->ki_buf += ret;
+		iocb->ki_left -= ret;
+
+		ret = -EIOCBRETRY;
+	}
+
+	/* This means we must have transferred all that we could */
+	/* No need to retry anymore */
+	if ((ret == 0) || (iocb->ki_left == 0)) 
+		ret = iocb->ki_nbytes - iocb->ki_left;
+
+	return ret;
+}
+
+/* 
+ * Retry method for aio_write (also used for first time submit)
+ * Responsible for updating iocb state as retries progress 
+ */
+static ssize_t aio_pwrite(struct kiocb *iocb)
+{
+	struct file *file = iocb->ki_filp;
+	ssize_t ret = 0;
+
+	ret = file->f_op->aio_write(iocb, iocb->ki_buf,
+		iocb->ki_left, iocb->ki_pos);
+
+	/* 
+	 * TBD: Even if iocb->ki_left == 0, might we still need to
+	 * wait for the data to be synced? Or can we assume that
+	 * aio_fdsync/aio_fsync will be called explicitly as
+	 * required?
+	 */
+	if (ret > 0) {
+		iocb->ki_buf += ret;
+		iocb->ki_left -= ret;
+
+		ret = -EIOCBRETRY;
+	}
+
+	/* This means we must have transferred all that we could */
+	/* No need to retry anymore */
+	if (ret == 0) 
+		ret = iocb->ki_nbytes - iocb->ki_left;
+
+	return ret;
+}
+
+static ssize_t aio_fdsync(struct kiocb *iocb)
+{
+	struct file *file = iocb->ki_filp;
+	ssize_t ret = -EINVAL;
+
+	if (file->f_op->aio_fsync)
+		ret = file->f_op->aio_fsync(iocb, 1);
+	return ret;
+}
+	
+static ssize_t aio_fsync(struct kiocb *iocb)
+{
+	struct file *file = iocb->ki_filp;
+	ssize_t ret = -EINVAL;
+
+	if (file->f_op->aio_fsync)
+		ret = file->f_op->aio_fsync(iocb, 0);
+	return ret;
+}
+	
+/* 
+ * aio_setup_iocb:
+ *	Performs the initial checks and aio retry method
+ *	setup for the kiocb at the time of io submission.
+ */
+ssize_t aio_setup_iocb(struct kiocb *kiocb)
+{
+	struct file *file = kiocb->ki_filp;
+	ssize_t ret = 0;
+	
+	switch (kiocb->ki_opcode) {
+	case IOCB_CMD_PREAD:
+		ret = -EBADF;
+		if (unlikely(!(file->f_mode & FMODE_READ)))
+			break;
+		ret = -EFAULT;
+		if (unlikely(!access_ok(VERIFY_WRITE, kiocb->ki_buf, 
+			kiocb->ki_left)))
+			break;
+		ret = -EINVAL;
+		if (file->f_op->aio_read)
+			kiocb->ki_retry = aio_pread;
+		break;
+	case IOCB_CMD_PWRITE:
+		ret = -EBADF;
+		if (unlikely(!(file->f_mode & FMODE_WRITE)))
+			break;
+		ret = -EFAULT;
+		if (unlikely(!access_ok(VERIFY_READ, kiocb->ki_buf, 
+			kiocb->ki_left)))
+			break;
+		ret = -EINVAL;
+		if (file->f_op->aio_write)
+			kiocb->ki_retry = aio_pwrite;
+		break;
+	case IOCB_CMD_FDSYNC:
+		ret = -EINVAL;
+		if (file->f_op->aio_fsync)
+			kiocb->ki_retry = aio_fdsync;
+		break;
+	case IOCB_CMD_FSYNC:
+		ret = -EINVAL;
+		if (file->f_op->aio_fsync)
+			kiocb->ki_retry = aio_fsync;
+		break;
+	default:
+		dprintk("EINVAL: io_submit: no operation provided\n");
+		ret = -EINVAL;
+	}
+
+	if (!kiocb->ki_retry)
+		return ret;
+
+	return 0;
+}
+
+/*
+ * aio_wake_function:
+ * 	Wait queue callback function for aio notification.
+ * 	Simply triggers a retry of the operation via kick_iocb.
+ *
+ * 	This callback is specified in the wait queue entry in 
+ *	a kiocb	(current->io_wait points to this wait queue 
+ *	entry when an aio operation executes; it is used
+ * 	instead of a synchronous wait when an i/o blocking 
+ *	condition is encountered during aio).
+ *
+ * Note:
+ * This routine is executed with the wait queue lock held.
+ * Since kick_iocb acquires iocb->ki_ctx->ctx_lock, it nests
+ * the ioctx lock inside the wait queue lock. This is safe
+ * because this callback isn't used for wait queues which 
+ * are nested inside ioctx lock (i.e. ctx->wait)
+ */
+int aio_wake_function(wait_queue_t *wait, unsigned mode, int sync)
+{
+	struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);
+
+	list_del_init(&wait->task_list);
+	kick_iocb(iocb);
+	return 1;
+}
+
 int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 			 struct iocb *iocb)
 {
 	struct kiocb *req;
 	struct file *file;
 	ssize_t ret;
-	char *buf;
 
 	/* enforce forwards compatibility on users */
 	if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 ||
@@ -1028,51 +1421,31 @@ int io_submit_one(struct kioctx *ctx, st
 	req->ki_user_data = iocb->aio_data;
 	req->ki_pos = iocb->aio_offset;
 
-	buf = (char *)(unsigned long)iocb->aio_buf;
+	req->ki_buf = (char *)(unsigned long)iocb->aio_buf;
+	req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
+	req->ki_opcode = iocb->aio_lio_opcode;
+	init_waitqueue_func_entry(&req->ki_wait, aio_wake_function);
+	INIT_LIST_HEAD(&req->ki_wait.task_list);
+	req->ki_run_list.next = req->ki_run_list.prev = NULL;
+	req->ki_retry = NULL;
+	req->ki_retried = 0;
+	req->ki_kicked = 0;
+	req->ki_queued = 0;
+	aio_run = 0;
+	aio_wakeups = 0;
 
-	switch (iocb->aio_lio_opcode) {
-	case IOCB_CMD_PREAD:
-		ret = -EBADF;
-		if (unlikely(!(file->f_mode & FMODE_READ)))
-			goto out_put_req;
-		ret = -EFAULT;
-		if (unlikely(!access_ok(VERIFY_WRITE, buf, iocb->aio_nbytes)))
-			goto out_put_req;
-		ret = -EINVAL;
-		if (file->f_op->aio_read)
-			ret = file->f_op->aio_read(req, buf,
-					iocb->aio_nbytes, req->ki_pos);
-		break;
-	case IOCB_CMD_PWRITE:
-		ret = -EBADF;
-		if (unlikely(!(file->f_mode & FMODE_WRITE)))
-			goto out_put_req;
-		ret = -EFAULT;
-		if (unlikely(!access_ok(VERIFY_READ, buf, iocb->aio_nbytes)))
-			goto out_put_req;
-		ret = -EINVAL;
-		if (file->f_op->aio_write)
-			ret = file->f_op->aio_write(req, buf,
-					iocb->aio_nbytes, req->ki_pos);
-		break;
-	case IOCB_CMD_FDSYNC:
-		ret = -EINVAL;
-		if (file->f_op->aio_fsync)
-			ret = file->f_op->aio_fsync(req, 1);
-		break;
-	case IOCB_CMD_FSYNC:
-		ret = -EINVAL;
-		if (file->f_op->aio_fsync)
-			ret = file->f_op->aio_fsync(req, 0);
-		break;
-	default:
-		dprintk("EINVAL: io_submit: no operation provided\n");
-		ret = -EINVAL;
-	}
+	ret = aio_setup_iocb(req);
+
+	if ((-EBADF == ret) || (-EFAULT == ret))
+		goto out_put_req;
+
+	spin_lock_irq(&ctx->ctx_lock);
+	ret = aio_run_iocb(req);
+	spin_unlock_irq(&ctx->ctx_lock);
+
+	if (-EIOCBRETRY == ret)
+		queue_work(aio_wq, &ctx->wq);
 
-	if (likely(-EIOCBQUEUED == ret))
-		return 0;
-	aio_complete(req, ret, 0);
 	return 0;
 
 out_put_req:
diff -puN include/linux/aio.h~aio-01-retry include/linux/aio.h
--- 25/include/linux/aio.h~aio-01-retry	2003-06-05 00:53:52.000000000 -0700
+++ 25-akpm/include/linux/aio.h	2003-06-05 00:53:52.000000000 -0700
@@ -54,7 +54,7 @@ struct kiocb {
 	struct file		*ki_filp;
 	struct kioctx		*ki_ctx;	/* may be NULL for sync ops */
 	int			(*ki_cancel)(struct kiocb *, struct io_event *);
-	long			(*ki_retry)(struct kiocb *);
+	ssize_t			(*ki_retry)(struct kiocb *);
 
 	struct list_head	ki_list;	/* the aio core uses this
 						 * for cancellation */
@@ -62,6 +62,16 @@ struct kiocb {
 	void			*ki_user_obj;	/* pointer to userland's iocb */
 	__u64			ki_user_data;	/* user's data for completion */
 	loff_t			ki_pos;
+	
+	/* State that we remember to be able to restart/retry  */
+	unsigned short		ki_opcode;
+	size_t			ki_nbytes; 	/* copy of iocb->aio_nbytes */
+	char 			*ki_buf;	/* remaining iocb->aio_buf */
+	size_t			ki_left; 	/* remaining bytes */
+	wait_queue_t		ki_wait;
+	long			ki_retried; 	/* just for testing */
+	long			ki_kicked; 	/* just for testing */
+	long			ki_queued; 	/* just for testing */
 
 	char			private[KIOCB_PRIVATE_SIZE];
 };
@@ -77,6 +87,8 @@ struct kiocb {
 		(x)->ki_ctx = &tsk->active_mm->default_kioctx;	\
 		(x)->ki_cancel = NULL;			\
 		(x)->ki_user_obj = tsk;			\
+		(x)->ki_user_data = 0;			\
+		init_wait((&(x)->ki_wait));		\
 	} while (0)
 
 #define AIO_RING_MAGIC			0xa10a10a1
@@ -159,6 +171,14 @@ int FASTCALL(io_submit_one(struct kioctx
 #define get_ioctx(kioctx)	do { if (unlikely(atomic_read(&(kioctx)->users) <= 0)) BUG(); atomic_inc(&(kioctx)->users); } while (0)
 #define put_ioctx(kioctx)	do { if (unlikely(atomic_dec_and_test(&(kioctx)->users))) __put_ioctx(kioctx); else if (unlikely(atomic_read(&(kioctx)->users) < 0)) BUG(); } while (0)
 
+#define in_aio() !is_sync_wait(current->io_wait)
+/* may be used for debugging */
+#define warn_if_async()	if (in_aio()) {\
+	printk(KERN_ERR "%s(%s:%d) called in async context!\n", \
+	__FUNCTION__, __FILE__, __LINE__); \
+	dump_stack(); \
+	}
+
 #include <linux/aio_abi.h>
 
 static inline struct kiocb *list_kiocb(struct list_head *h)
diff -puN include/linux/errno.h~aio-01-retry include/linux/errno.h
--- 25/include/linux/errno.h~aio-01-retry	2003-06-05 00:53:52.000000000 -0700
+++ 25-akpm/include/linux/errno.h	2003-06-05 00:53:52.000000000 -0700
@@ -22,6 +22,7 @@
 #define EBADTYPE	527	/* Type not supported by server */
 #define EJUKEBOX	528	/* Request initiated, but will not complete before timeout */
 #define EIOCBQUEUED	529	/* iocb queued, will get completion event */
+#define EIOCBRETRY	530	/* iocb queued, will trigger a retry */
 
 #endif
 
diff -puN include/linux/init_task.h~aio-01-retry include/linux/init_task.h
--- 25/include/linux/init_task.h~aio-01-retry	2003-06-05 00:53:52.000000000 -0700
+++ 25-akpm/include/linux/init_task.h	2003-06-05 00:53:52.000000000 -0700
@@ -108,6 +108,7 @@
 	.proc_lock	= SPIN_LOCK_UNLOCKED,				\
 	.switch_lock	= SPIN_LOCK_UNLOCKED,				\
 	.journal_info	= NULL,						\
+	.io_wait	= NULL,						\
 }
 
 
diff -puN include/linux/sched.h~aio-01-retry include/linux/sched.h
--- 25/include/linux/sched.h~aio-01-retry	2003-06-05 00:53:52.000000000 -0700
+++ 25-akpm/include/linux/sched.h	2003-06-05 00:53:52.000000000 -0700
@@ -453,6 +453,13 @@ struct task_struct {
 	unsigned long ptrace_message;
 	siginfo_t *last_siginfo; /* For ptrace use.  */
 	long debug;
+/* 
+ * current io wait handle: wait queue entry to use for io waits
+ * If this thread is processing aio, this points at the wait queue
+ * entry inside the currently handled kiocb. It may be NULL (i.e.
+ * default to a stack based synchronous wait) if it is doing sync IO.
+ */
+	wait_queue_t *io_wait;
 };
 
 extern void __put_task_struct(struct task_struct *tsk);
diff -puN include/linux/wait.h~aio-01-retry include/linux/wait.h
--- 25/include/linux/wait.h~aio-01-retry	2003-06-05 00:53:52.000000000 -0700
+++ 25-akpm/include/linux/wait.h	2003-06-05 00:53:52.000000000 -0700
@@ -80,6 +80,15 @@ static inline int waitqueue_active(wait_
 	return !list_empty(&q->task_list);
 }
 
+/* 
+ * Used to distinguish between sync and async io wait context:
+ * sync i/o typically specifies a NULL wait queue entry or a wait
+ * queue entry bound to a task (current task) to wake up.
+ * aio specifies a wait queue entry with an async notification 
+ * callback routine, not associated with any task.
+ */
+#define is_sync_wait(wait)	(!(wait) || ((wait)->task))
+
 extern void FASTCALL(add_wait_queue(wait_queue_head_t *q, wait_queue_t * wait));
 extern void FASTCALL(add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t * wait));
 extern void FASTCALL(remove_wait_queue(wait_queue_head_t *q, wait_queue_t * wait));
diff -puN kernel/fork.c~aio-01-retry kernel/fork.c
--- 25/kernel/fork.c~aio-01-retry	2003-06-05 00:53:52.000000000 -0700
+++ 25-akpm/kernel/fork.c	2003-06-05 00:53:52.000000000 -0700
@@ -145,8 +145,13 @@ void remove_wait_queue(wait_queue_head_t
 void prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
 {
 	unsigned long flags;
-
-	__set_current_state(state);
+	
+	/* 
+	 * don't alter the task state if this is just going to 
+	 * queue an async wait queue callback
+	 */
+	if (is_sync_wait(wait))
+		__set_current_state(state);
 	wait->flags &= ~WQ_FLAG_EXCLUSIVE;
 	spin_lock_irqsave(&q->lock, flags);
 	if (list_empty(&wait->task_list))
@@ -159,7 +164,12 @@ prepare_to_wait_exclusive(wait_queue_hea
 {
 	unsigned long flags;
 
-	__set_current_state(state);
+	/* 
+	 * don't alter the task state if this is just going to 
+	 * queue an async wait queue callback
+	 */
+	if (is_sync_wait(wait))
+		__set_current_state(state);
 	wait->flags |= WQ_FLAG_EXCLUSIVE;
 	spin_lock_irqsave(&q->lock, flags);
 	if (list_empty(&wait->task_list))
@@ -863,6 +873,7 @@ struct task_struct *copy_process(unsigne
 	p->lock_depth = -1;		/* -1 = no lock */
 	p->start_time = get_jiffies_64();
 	p->security = NULL;
+	p->io_wait = NULL;
 	p->as_io_context = NULL;
 
 	retval = -ENOMEM;

_