Message ID | 20231122234257.179390-1-kent.overstreet@linux.dev (mailing list archive)
---|---
State | New, archived
Series | fs/aio: obey min_nr when doing wakeups
On Wed, Nov 22, 2023 at 06:42:53PM -0500, Kent Overstreet wrote:
> Unclear who's maintaining fs/aio.c these days - who wants to take this?
> -- >8 --
>
> I've been observing workloads where IPIs due to wakeups in
> aio_complete() are ~15% of total CPU time in the profile. Most of those
> wakeups are unnecessary when completion batching is in use in
> io_getevents().
>
> This plumbs min_nr through via the wait entry, so that aio_complete()
> can avoid doing unnecessary wakeups.
>
> Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
> Cc: Benjamin LaHaise <bcrl@kvack.org>
> Cc: Christian Brauner <brauner@kernel.org>
> Cc: linux-aio@kvack.org
> Cc: linux-fsdevel@vger.kernel.org
> ---
>  fs/aio.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++---------
>  1 file changed, 56 insertions(+), 10 deletions(-)
>
> [...]
>
> @@ -1156,6 +1161,10 @@ static void aio_complete(struct aio_kiocb *iocb)
>  	ctx->completed_events++;
>  	if (ctx->completed_events > 1)
>  		refill_reqs_available(ctx, head, tail);
> +
> +	avail = tail > head
> +		? tail - head
> +		: tail + ctx->nr_events - head;
>  	spin_unlock_irqrestore(&ctx->completion_lock, flags);
>
> [...]
>
> +	while (1) {
> +		unsigned long nr_got = ret;
> +
> +		w.min_nr = min_nr - ret;

Hm, can this underflow?

> +
> +		ret2 = prepare_to_wait_event(&ctx->wait, &w.w, TASK_INTERRUPTIBLE) ?:
> +			!t.task ? -ETIME : 0;

I'd like to avoid the nested ?: as that's rather hard to read.
I _think_ this is equivalent to:

	if (!ret2 && !t.task)
		ret = -ETIME;

I can just fix this in-tree though. Did I parse that correctly?

> +
> +		if (aio_read_events(ctx, min_nr, nr, event, &ret) || ret2)
> +			break;
> +
> +		if (nr_got == ret)
> +			schedule();
> +	}
>
> [...]
>
> --
> 2.42.0
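The avail hunk kept in the quote above drew no comment, but its wraparound arithmetic is worth spelling out: aio_complete() has just stored an event before taking the snapshot, so tail == head means the ring is full rather than empty. A tiny standalone check of the same expression, not kernel code, with made-up head/tail values:

	#include <assert.h>
	#include <stdio.h>

	/*
	 * Same expression as the patch: events sitting between head and tail in
	 * a ring of nr_events slots, where tail == head is treated as "full"
	 * because the caller has just inserted an event.
	 */
	static unsigned ring_avail(unsigned head, unsigned tail, unsigned nr_events)
	{
		return tail > head ? tail - head : tail + nr_events - head;
	}

	int main(void)
	{
		assert(ring_avail(0, 1, 128) == 1);	/* one fresh completion   */
		assert(ring_avail(120, 8, 128) == 16);	/* wrapped around the end */
		assert(ring_avail(5, 5, 128) == 128);	/* tail caught up: full   */
		printf("ring_avail checks passed\n");
		return 0;
	}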
On Thu, Nov 23, 2023 at 06:40:01PM +0100, Christian Brauner wrote:
> On Wed, Nov 22, 2023 at 06:42:53PM -0500, Kent Overstreet wrote:
> > Unclear who's maintaining fs/aio.c these days - who wants to take this?
> > -- >8 --
> >
> > I've been observing workloads where IPIs due to wakeups in
> > aio_complete() are ~15% of total CPU time in the profile. Most of those
> > wakeups are unnecessary when completion batching is in use in
> > io_getevents().
> >
> > This plumbs min_nr through via the wait entry, so that aio_complete()
> > can avoid doing unnecessary wakeups.
> >
> > [...]
> >
> > +	while (1) {
> > +		unsigned long nr_got = ret;
> > +
> > +		w.min_nr = min_nr - ret;
>
> Hm, can this underflow?

No, because if ret >= min_nr aio_read_events() returned true, and we're
done.

> > +
> > +		ret2 = prepare_to_wait_event(&ctx->wait, &w.w, TASK_INTERRUPTIBLE) ?:
> > +			!t.task ? -ETIME : 0;
>
> I'd like to avoid the nested ?: as that's rather hard to read.
> I _think_ this is equivalent to:
>
> 	if (!ret2 && !t.task)
> 		ret = -ETIME;
>
> I can just fix this in-tree though. Did I parse that correctly?

You did, except it needs to be ret2 = -ETIME - we don't return that to
userspace.
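For readers unfamiliar with the GNU C "a ?: b" shorthand under discussion: it evaluates to a when a is nonzero and to b otherwise, so the unfolding Kent confirms (with -ETIME landing in ret2, not ret) behaves identically to the nested form. A small standalone comparison, with plain ints standing in for the kernel calls and -512 as an arbitrary example error return:

	#include <assert.h>
	#include <errno.h>
	#include <stdio.h>

	/*
	 * Nested form from the patch: prepare_to_wait_event()'s result if it is
	 * nonzero, else -ETIME if the hrtimer sleeper already fired
	 * (t.task == NULL), else 0.
	 */
	static int nested_form(int prep_ret, int timer_task_alive)
	{
		return prep_ret ?: (!timer_task_alive ? -ETIME : 0);
	}

	/* Unfolded form Christian proposes, with Kent's fix that ret2 takes -ETIME. */
	static int unfolded_form(int prep_ret, int timer_task_alive)
	{
		int ret2 = prep_ret;

		if (!ret2 && !timer_task_alive)
			ret2 = -ETIME;
		return ret2;
	}

	int main(void)
	{
		int prep_vals[] = { 0, -512 /* e.g. a pending-signal error */ };

		for (int i = 0; i < 2; i++)
			for (int alive = 0; alive <= 1; alive++)
				assert(nested_form(prep_vals[i], alive) ==
				       unfolded_form(prep_vals[i], alive));
		printf("both forms agree\n");
		return 0;
	}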
On Wed, 22 Nov 2023 18:42:53 -0500, Kent Overstreet wrote:
> I've been observing workloads where IPIs due to wakeups in
> aio_complete() are ~15% of total CPU time in the profile. Most of those
> wakeups are unnecessary when completion batching is in use in
> io_getevents().
>
> This plumbs min_nr through via the wait entry, so that aio_complete()
> can avoid doing unnecessary wakeups.
>
> [...]

Applied to the vfs.misc branch of the vfs/vfs.git tree.
Patches in the vfs.misc branch should appear in linux-next soon.

Please report any outstanding bugs that were missed during review in a
new review to the original patch series allowing us to drop it.

It's encouraged to provide Acked-bys and Reviewed-bys even though the
patch has now been applied. If possible patch trailers will be updated.

Note that commit hashes shown below are subject to change due to rebase,
trailer updates or similar. If in doubt, please check the listed branch.

tree:   https://git.kernel.org/pub/scm/linux/kernel/git/vfs/vfs.git
branch: vfs.misc

[1/1] fs/aio: obey min_nr when doing wakeups
      https://git.kernel.org/vfs/vfs/c/7a2c359a17b5
diff --git a/fs/aio.c b/fs/aio.c
index f8589caef9c1..c69e7caacd1b 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1106,6 +1106,11 @@ static inline void iocb_destroy(struct aio_kiocb *iocb)
 	kmem_cache_free(kiocb_cachep, iocb);
 }
 
+struct aio_waiter {
+	struct wait_queue_entry	w;
+	size_t			min_nr;
+};
+
 /* aio_complete
  *	Called when the io request on the given iocb is complete.
  */
@@ -1114,7 +1119,7 @@ static void aio_complete(struct aio_kiocb *iocb)
 	struct kioctx	*ctx = iocb->ki_ctx;
 	struct aio_ring	*ring;
 	struct io_event	*ev_page, *event;
-	unsigned tail, pos, head;
+	unsigned tail, pos, head, avail;
 	unsigned long	flags;
 
 	/*
@@ -1156,6 +1161,10 @@ static void aio_complete(struct aio_kiocb *iocb)
 	ctx->completed_events++;
 	if (ctx->completed_events > 1)
 		refill_reqs_available(ctx, head, tail);
+
+	avail = tail > head
+		? tail - head
+		: tail + ctx->nr_events - head;
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
 	pr_debug("added to ring %p at [%u]\n", iocb, tail);
@@ -1176,8 +1185,18 @@ static void aio_complete(struct aio_kiocb *iocb)
 	 */
 	smp_mb();
 
-	if (waitqueue_active(&ctx->wait))
-		wake_up(&ctx->wait);
+	if (waitqueue_active(&ctx->wait)) {
+		struct aio_waiter *curr, *next;
+		unsigned long flags;
+
+		spin_lock_irqsave(&ctx->wait.lock, flags);
+		list_for_each_entry_safe(curr, next, &ctx->wait.head, w.entry)
+			if (avail >= curr->min_nr) {
+				list_del_init_careful(&curr->w.entry);
+				wake_up_process(curr->w.private);
+			}
+		spin_unlock_irqrestore(&ctx->wait.lock, flags);
+	}
 }
 
 static inline void iocb_put(struct aio_kiocb *iocb)
@@ -1290,7 +1309,9 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
 			struct io_event __user *event,
 			ktime_t until)
 {
-	long ret = 0;
+	struct hrtimer_sleeper	t;
+	struct aio_waiter	w;
+	long ret = 0, ret2 = 0;
 
 	/*
 	 * Note that aio_read_events() is being called as the conditional - i.e.
@@ -1306,12 +1327,37 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
 	 * the ringbuffer empty. So in practice we should be ok, but it's
 	 * something to be aware of when touching this code.
 	 */
-	if (until == 0)
-		aio_read_events(ctx, min_nr, nr, event, &ret);
-	else
-		wait_event_interruptible_hrtimeout(ctx->wait,
-				aio_read_events(ctx, min_nr, nr, event, &ret),
-				until);
+	aio_read_events(ctx, min_nr, nr, event, &ret);
+	if (until == 0 || ret < 0 || ret >= min_nr)
+		return ret;
+
+	hrtimer_init_sleeper_on_stack(&t, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+	if (until != KTIME_MAX) {
+		hrtimer_set_expires_range_ns(&t.timer, until, current->timer_slack_ns);
+		hrtimer_sleeper_start_expires(&t, HRTIMER_MODE_REL);
+	}
+
+	init_wait(&w.w);
+
+	while (1) {
+		unsigned long nr_got = ret;
+
+		w.min_nr = min_nr - ret;
+
+		ret2 = prepare_to_wait_event(&ctx->wait, &w.w, TASK_INTERRUPTIBLE) ?:
+			!t.task ? -ETIME : 0;
+
+		if (aio_read_events(ctx, min_nr, nr, event, &ret) || ret2)
+			break;
+
+		if (nr_got == ret)
+			schedule();
+	}
+
+	finish_wait(&ctx->wait, &w.w);
+	hrtimer_cancel(&t.timer);
+	destroy_hrtimer_on_stack(&t.timer);
+
 	return ret;
 }
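The wakeup side of the diff above walks ctx->wait.head itself and only wakes waiters whose min_nr is already satisfied, instead of broadcasting on every completion. The same idea in a self-contained userspace analogue, a sketch using pthreads with hypothetical names rather than the kernel code:

	#include <pthread.h>
	#include <stdio.h>
	#include <unistd.h>

	/* One entry per sleeping consumer, playing the role of struct aio_waiter. */
	struct waiter {
		struct waiter	*next;
		pthread_cond_t	cond;
		size_t		min_nr;	/* wake only once this many events exist */
	};

	static pthread_mutex_t	lock = PTHREAD_MUTEX_INITIALIZER;
	static struct waiter	*waiters;	/* sleeping consumers */
	static size_t		avail;		/* completed, unconsumed events */

	/* Producer side, mirroring aio_complete(): wake only satisfied waiters. */
	static void complete_one(void)
	{
		pthread_mutex_lock(&lock);
		avail++;
		for (struct waiter **pp = &waiters; *pp; ) {
			struct waiter *w = *pp;

			if (avail >= w->min_nr) {
				*pp = w->next;	/* unlink, like list_del_init_careful() */
				pthread_cond_signal(&w->cond);
			} else {
				pp = &w->next;
			}
		}
		pthread_mutex_unlock(&lock);
	}

	/* Consumer side, mirroring read_events(): sleep until min_nr events exist. */
	static size_t get_events(size_t min_nr)
	{
		struct waiter w = { .min_nr = min_nr };
		size_t got;

		pthread_cond_init(&w.cond, NULL);
		pthread_mutex_lock(&lock);
		while (avail < min_nr) {
			w.next = waiters;	/* enqueue ourselves */
			waiters = &w;
			pthread_cond_wait(&w.cond, &lock);
			/* Spurious wakeups happen: unlink ourselves if still queued. */
			for (struct waiter **pp = &waiters; *pp; pp = &(*pp)->next)
				if (*pp == &w) {
					*pp = w.next;
					break;
				}
		}
		got = avail;
		avail = 0;
		pthread_mutex_unlock(&lock);
		pthread_cond_destroy(&w.cond);
		return got;
	}

	static void *producer(void *arg)
	{
		(void)arg;
		for (int i = 0; i < 8; i++) {
			usleep(1000);
			complete_one();	/* at most one of these calls signals the consumer */
		}
		return NULL;
	}

	int main(void)
	{
		pthread_t t;

		pthread_create(&t, NULL, producer, NULL);
		printf("got %zu events waiting with min_nr=8\n", get_events(8));
		pthread_join(t, NULL);
		return 0;
	}

The design choice is the same in both places: the completion path pays a short walk of the waiter list under the wait-queue lock, and in exchange a batching consumer is woken (and IPI'd) once per batch rather than once per event.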
Unclear who's maintaining fs/aio.c these days - who wants to take this?
-- >8 --

I've been observing workloads where IPIs due to wakeups in
aio_complete() are ~15% of total CPU time in the profile. Most of those
wakeups are unnecessary when completion batching is in use in
io_getevents().

This plumbs min_nr through via the wait entry, so that aio_complete()
can avoid doing unnecessary wakeups.

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
Cc: Benjamin LaHaise <bcrl@kvack.org>
Cc: Christian Brauner <brauner@kernel.org>
Cc: linux-aio@kvack.org
Cc: linux-fsdevel@vger.kernel.org
---
 fs/aio.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 56 insertions(+), 10 deletions(-)
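The completion batching the changelog refers to is a consumer calling io_getevents() with min_nr well above 1, so the kernel may coalesce wakeups. A minimal sketch of that pattern using libaio, assuming libaio is installed (link with -laio), the file path and sizes are made up, and error handling is trimmed:

	#define _GNU_SOURCE	/* O_DIRECT */
	#include <fcntl.h>
	#include <libaio.h>
	#include <stdio.h>
	#include <stdlib.h>

	#define BATCH	64
	#define BUF_SZ	4096

	int main(void)
	{
		io_context_t ctx = 0;
		struct iocb iocbs[BATCH], *iocbps[BATCH];
		struct io_event events[BATCH];
		int fd = open("/tmp/testfile", O_RDONLY | O_DIRECT);	/* hypothetical file */

		if (fd < 0 || io_setup(BATCH, &ctx) < 0) {
			perror("setup");
			return 1;
		}

		for (int i = 0; i < BATCH; i++) {
			void *buf = NULL;

			posix_memalign(&buf, 512, BUF_SZ);	/* O_DIRECT wants aligned buffers */
			io_prep_pread(&iocbs[i], fd, buf, BUF_SZ, (long long)i * BUF_SZ);
			iocbps[i] = &iocbs[i];
		}
		io_submit(ctx, BATCH, iocbps);

		/*
		 * min_nr == BATCH: with the patch above, aio_complete() skips waking
		 * this task until all BATCH completions are in the ring, instead of
		 * waking (and IPI'ing) it once per completion.
		 */
		int got = io_getevents(ctx, BATCH, BATCH, events, NULL);

		printf("got %d completions in one wakeup\n", got);
		io_destroy(ctx);
		return 0;
	}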