[2/2] fs/aio: obey min_nr when doing wakeups

Message ID 20230118152603.28301-2-kent.overstreet@linux.dev (mailing list archive)
State New, archived
Series [1/2] fs/aio: Use kmap_local() instead of kmap()

Commit Message

Kent Overstreet Jan. 18, 2023, 3:26 p.m. UTC
I've been observing workloads where IPIs due to wakeups in
aio_complete() account for ~15% of total CPU time in profiles. Most of
those wakeups are unnecessary when callers batch completions via
io_getevents() with min_nr > 1.

This plumbs min_nr through via the wait queue entry, so that
aio_complete() can avoid doing unnecessary wakeups.
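
For context, a minimal userspace sketch of the batching pattern this
optimizes, using the libaio wrappers (queue depth and min_nr are
arbitrary illustration values; I/O submission is elided):

  /* Sketch only: batched completion reaping with libaio.
   * Build with: gcc -o batch batch.c -laio
   */
  #include <libaio.h>
  #include <stdio.h>

  #define QUEUE_DEPTH 128  /* arbitrary illustration value */

  int main(void)
  {
          io_context_t ctx = 0;
          struct io_event events[QUEUE_DEPTH];
          int n;

          if (io_setup(QUEUE_DEPTH, &ctx) < 0) {
                  fprintf(stderr, "io_setup failed\n");
                  return 1;
          }

          /* ... queue I/O with io_prep_pread()/io_submit() here ... */

          /*
           * min_nr = 32: with this patch the sleeping task is only
           * woken once >= 32 events are in the ring (or a signal
           * arrives), rather than once per completing iocb.
           */
          n = io_getevents(ctx, 32, QUEUE_DEPTH, events, NULL);
          if (n > 0)
                  printf("reaped %d events in one batch\n", n);

          io_destroy(ctx);
          return 0;
  }

Previously, every aio_complete() call issued a wake_up() (often a
cross-CPU IPI) even when the waiter still needed more events; with this
change the wakeup is deferred until the waiter's min_nr can be
satisfied.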

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
Cc: Benjamin LaHaise <bcrl@kvack.org>
Cc: linux-aio@kvack.org
Cc: linux-fsdevel@vger.kernel.org
---
 fs/aio.c | 63 +++++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 53 insertions(+), 10 deletions(-)

Patch

diff --git a/fs/aio.c b/fs/aio.c
index 3f795ed2a2..a03bc93016 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1105,6 +1105,11 @@  static inline void iocb_destroy(struct aio_kiocb *iocb)
 	kmem_cache_free(kiocb_cachep, iocb);
 }
 
+struct aio_waiter {
+	struct wait_queue_entry	w;
+	size_t			min_nr;
+};
+
 /* aio_complete
  *	Called when the io request on the given iocb is complete.
  */
@@ -1113,7 +1118,7 @@  static void aio_complete(struct aio_kiocb *iocb)
 	struct kioctx	*ctx = iocb->ki_ctx;
 	struct aio_ring	*ring;
 	struct io_event	*ev_page, *event;
-	unsigned tail, pos, head;
+	unsigned tail, pos, head, avail;
 	unsigned long	flags;
 
 	/*
@@ -1157,6 +1162,10 @@  static void aio_complete(struct aio_kiocb *iocb)
 	ctx->completed_events++;
 	if (ctx->completed_events > 1)
 		refill_reqs_available(ctx, head, tail);
+
+	avail = tail > head
+		? tail - head
+		: tail + ctx->nr_events - head;
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
 	pr_debug("added to ring %p at [%u]\n", iocb, tail);
@@ -1177,8 +1186,18 @@  static void aio_complete(struct aio_kiocb *iocb)
 	 */
 	smp_mb();
 
-	if (waitqueue_active(&ctx->wait))
-		wake_up(&ctx->wait);
+	if (waitqueue_active(&ctx->wait)) {
+		struct aio_waiter *curr, *next;
+		unsigned long flags;
+
+		spin_lock_irqsave(&ctx->wait.lock, flags);
+		list_for_each_entry_safe(curr, next, &ctx->wait.head, w.entry)
+			if (avail >= curr->min_nr) {
+				list_del_init_careful(&curr->w.entry);
+				wake_up_process(curr->w.private);
+			}
+		spin_unlock_irqrestore(&ctx->wait.lock, flags);
+	}
 }
 
 static inline void iocb_put(struct aio_kiocb *iocb)
@@ -1294,7 +1313,9 @@  static long read_events(struct kioctx *ctx, long min_nr, long nr,
 			struct io_event __user *event,
 			ktime_t until)
 {
-	long ret = 0;
+	struct hrtimer_sleeper	t;
+	struct aio_waiter	w;
+	long ret = 0, ret2 = 0;
 
 	/*
 	 * Note that aio_read_events() is being called as the conditional - i.e.
@@ -1310,12 +1331,34 @@  static long read_events(struct kioctx *ctx, long min_nr, long nr,
 	 * the ringbuffer empty. So in practice we should be ok, but it's
 	 * something to be aware of when touching this code.
 	 */
-	if (until == 0)
-		aio_read_events(ctx, min_nr, nr, event, &ret);
-	else
-		wait_event_interruptible_hrtimeout(ctx->wait,
-				aio_read_events(ctx, min_nr, nr, event, &ret),
-				until);
+	aio_read_events(ctx, min_nr, nr, event, &ret);
+	if (until == 0 || ret < 0 || ret >= min_nr)
+		return ret;
+
+	hrtimer_init_sleeper_on_stack(&t, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+	if (until != KTIME_MAX) {
+		hrtimer_set_expires_range_ns(&t.timer, until, current->timer_slack_ns);
+		hrtimer_sleeper_start_expires(&t, HRTIMER_MODE_REL);
+	}
+
+	init_wait(&w.w);
+
+	while (1) {
+		w.min_nr = min_nr - ret;
+
+		ret2 = prepare_to_wait_event(&ctx->wait, &w.w, TASK_INTERRUPTIBLE) ?:
+			!t.task ? -ETIME : 0;
+
+		if (aio_read_events(ctx, min_nr, nr, event, &ret) || ret2)
+			break;
+
+		schedule();
+	}
+
+	finish_wait(&ctx->wait, &w.w);
+	hrtimer_cancel(&t.timer);
+	destroy_hrtimer_on_stack(&t.timer);
+
 	return ret;
 }