diff mbox series

[2/6] io_uring: replace defer task_work llist with io_wq_work_list

Message ID 20241122161645.494868-3-axboe@kernel.dk (mailing list archive)
State New
Headers show
Series task work cleanups | expand

Commit Message

Jens Axboe Nov. 22, 2024, 4:12 p.m. UTC
Add a spinlock for the list, and replace the lockless llist with the
work list instead. This avoids needing to reverse items in the list
before running them, as the io_wq_work_list is FIFO by nature whereas
the llist is LIFO.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 include/linux/io_uring_types.h |  13 ++-
 io_uring/io_uring.c            | 194 ++++++++++++++++-----------------
 io_uring/io_uring.h            |   2 +-
 3 files changed, 104 insertions(+), 105 deletions(-)

Comments

Pavel Begunkov Nov. 22, 2024, 5:07 p.m. UTC | #1
On 11/22/24 16:12, Jens Axboe wrote:
...
>   static inline void io_req_local_work_add(struct io_kiocb *req,
>   					 struct io_ring_ctx *ctx,
> -					 unsigned flags)
> +					 unsigned tw_flags)
>   {
> -	unsigned nr_wait, nr_tw, nr_tw_prev;
> -	struct llist_node *head;
> +	unsigned nr_tw, nr_tw_prev, nr_wait;
> +	unsigned long flags;
>   
>   	/* See comment above IO_CQ_WAKE_INIT */
>   	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
>   
>   	/*
> -	 * We don't know how many reuqests is there in the link and whether
> -	 * they can even be queued lazily, fall back to non-lazy.
> +	 * We don't know how many requests are in the link and whether they can
> +	 * even be queued lazily, fall back to non-lazy.
>   	 */
>   	if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
> -		flags &= ~IOU_F_TWQ_LAZY_WAKE;
> +		tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
>   
> -	guard(rcu)();

protects against ctx->task deallocation, see a comment in
io_ring_exit_work() -> synchronize_rcu()

> +	spin_lock_irqsave(&ctx->work_lock, flags);
> +	wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
> +	nr_tw_prev = ctx->work_items++;

Is there a good reason why it changes the semantics of
what's stored across adds? It was assigning a corrected
nr_tw, this one will start heavily spamming with wake_up()
in some cases.

> +	spin_unlock_irqrestore(&ctx->work_lock, flags);
>   
> -	head = READ_ONCE(ctx->work_llist.first);
> -	do {
> -		nr_tw_prev = 0;
> -		if (head) {
> -			struct io_kiocb *first_req = container_of(head,
> -							struct io_kiocb,
> -							io_task_work.node);
> -			/*
> -			 * Might be executed at any moment, rely on
> -			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
> -			 */
> -			nr_tw_prev = READ_ONCE(first_req->nr_tw);
> -		}
> -
> -		/*
> -		 * Theoretically, it can overflow, but that's fine as one of
> -		 * previous adds should've tried to wake the task.
> -		 */
> -		nr_tw = nr_tw_prev + 1;
> -		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
> -			nr_tw = IO_CQ_WAKE_FORCE;
> -
> -		req->nr_tw = nr_tw;
> -		req->io_task_work.node.next = head;
> -	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
> -			      &req->io_task_work.node));
> -
> -	/*
> -	 * cmpxchg implies a full barrier, which pairs with the barrier
> -	 * in set_current_state() on the io_cqring_wait() side. It's used
> -	 * to ensure that either we see updated ->cq_wait_nr, or waiters
> -	 * going to sleep will observe the work added to the list, which
> -	 * is similar to the wait/wawke task state sync.
> -	 */
> +	nr_tw = nr_tw_prev + 1;
> +	if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
> +		nr_tw = IO_CQ_WAKE_FORCE;
>   
> -	if (!head) {
> +	if (!nr_tw_prev) {
>   		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
>   			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
>   		if (ctx->has_evfd)
>   			io_eventfd_signal(ctx);
>   	}
>   
> +	/*
> +	 * We need a barrier after unlock, which pairs with the barrier
> +	 * in set_current_state() on the io_cqring_wait() side. It's used
> +	 * to ensure that either we see updated ->cq_wait_nr, or waiters
> +	 * going to sleep will observe the work added to the list, which
> +	 * is similar to the wait/wake task state sync.
> +	 */
> +	smp_mb();
>   	nr_wait = atomic_read(&ctx->cq_wait_nr);
>   	/* not enough or no one is waiting */
>   	if (nr_tw < nr_wait)
> @@ -1253,11 +1233,27 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
Jens Axboe Nov. 22, 2024, 5:11 p.m. UTC | #2
On 11/22/24 10:07 AM, Pavel Begunkov wrote:
> On 11/22/24 16:12, Jens Axboe wrote:
> ...
>>   static inline void io_req_local_work_add(struct io_kiocb *req,
>>                        struct io_ring_ctx *ctx,
>> -                     unsigned flags)
>> +                     unsigned tw_flags)
>>   {
>> -    unsigned nr_wait, nr_tw, nr_tw_prev;
>> -    struct llist_node *head;
>> +    unsigned nr_tw, nr_tw_prev, nr_wait;
>> +    unsigned long flags;
>>         /* See comment above IO_CQ_WAKE_INIT */
>>       BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
>>         /*
>> -     * We don't know how many reuqests is there in the link and whether
>> -     * they can even be queued lazily, fall back to non-lazy.
>> +     * We don't know how many requests are in the link and whether they can
>> +     * even be queued lazily, fall back to non-lazy.
>>        */
>>       if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
>> -        flags &= ~IOU_F_TWQ_LAZY_WAKE;
>> +        tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>   -    guard(rcu)();
> 
> protects against ctx->task deallocation, see a comment in
> io_ring_exit_work() -> synchronize_rcu()

Yeah that's just an editing mistake.

>> +    spin_lock_irqsave(&ctx->work_lock, flags);
>> +    wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
>> +    nr_tw_prev = ctx->work_items++;
> 
> Is there a good reason why it changes the semantics of
> what's stored across adds? It was assigning a corrected
> nr_tw, this one will start heavily spamming with wake_up()
> in some cases.

Not sure I follow, how so? nr_tw_prev will be the previous count, just
like before. Except we won't need to dig into the list to find it, we
have it readily available. nr_tw will be the current code, or force wake
if needed. As before.
Pavel Begunkov Nov. 22, 2024, 5:25 p.m. UTC | #3
On 11/22/24 17:11, Jens Axboe wrote:
> On 11/22/24 10:07 AM, Pavel Begunkov wrote:
>> On 11/22/24 16:12, Jens Axboe wrote:
>> ...
>>>    static inline void io_req_local_work_add(struct io_kiocb *req,
>>>                         struct io_ring_ctx *ctx,
>>> -                     unsigned flags)
>>> +                     unsigned tw_flags)
>>>    {
>>> -    unsigned nr_wait, nr_tw, nr_tw_prev;
>>> -    struct llist_node *head;
>>> +    unsigned nr_tw, nr_tw_prev, nr_wait;
>>> +    unsigned long flags;
>>>          /* See comment above IO_CQ_WAKE_INIT */
>>>        BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
>>>          /*
>>> -     * We don't know how many reuqests is there in the link and whether
>>> -     * they can even be queued lazily, fall back to non-lazy.
>>> +     * We don't know how many requests are in the link and whether they can
>>> +     * even be queued lazily, fall back to non-lazy.
>>>         */
>>>        if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
>>> -        flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>> +        tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>>    -    guard(rcu)();
>>
>> protects against ctx->task deallocation, see a comment in
>> io_ring_exit_work() -> synchronize_rcu()
> 
> Yeah that's just an editing mistake.
> 
>>> +    spin_lock_irqsave(&ctx->work_lock, flags);
>>> +    wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
>>> +    nr_tw_prev = ctx->work_items++;
>>
>> Is there a good reason why it changes the semantics of
>> what's stored across adds? It was assigning a corrected
>> nr_tw, this one will start heavily spamming with wake_up()
>> in some cases.
> 
> Not sure I follow, how so? nr_tw_prev will be the previous count, just
> like before. Except we won't need to dig into the list to find it, we
> have it readily available. nr_tw will be the current code, or force wake
> if needed. As before.

The problem is what it stores, not how and where. Before req->nr_tw
could've been set to IO_CQ_WAKE_FORCE, in which case following
requests are not going to attempt waking up the task, now work_items
is just a counter.

Let's say you've got a bunch of non-lazy adds coming close to each
other. The first sets IO_CQ_WAKE_FORCE and wakes the task, and
others just queue themselves in the list. Now, every single one
of them will try to wake_up() as long as ->cq_wait_nr is large
enough.
Jens Axboe Nov. 22, 2024, 5:44 p.m. UTC | #4
On 11/22/24 10:25 AM, Pavel Begunkov wrote:
> On 11/22/24 17:11, Jens Axboe wrote:
>> On 11/22/24 10:07 AM, Pavel Begunkov wrote:
>>> On 11/22/24 16:12, Jens Axboe wrote:
>>> ...
>>>>    static inline void io_req_local_work_add(struct io_kiocb *req,
>>>>                         struct io_ring_ctx *ctx,
>>>> -                     unsigned flags)
>>>> +                     unsigned tw_flags)
>>>>    {
>>>> -    unsigned nr_wait, nr_tw, nr_tw_prev;
>>>> -    struct llist_node *head;
>>>> +    unsigned nr_tw, nr_tw_prev, nr_wait;
>>>> +    unsigned long flags;
>>>>          /* See comment above IO_CQ_WAKE_INIT */
>>>>        BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
>>>>          /*
>>>> -     * We don't know how many reuqests is there in the link and whether
>>>> -     * they can even be queued lazily, fall back to non-lazy.
>>>> +     * We don't know how many requests are in the link and whether they can
>>>> +     * even be queued lazily, fall back to non-lazy.
>>>>         */
>>>>        if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
>>>> -        flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>>> +        tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>>>    -    guard(rcu)();
>>>
>>> protects against ctx->task deallocation, see a comment in
>>> io_ring_exit_work() -> synchronize_rcu()
>>
>> Yeah that's just an editing mistake.
>>
>>>> +    spin_lock_irqsave(&ctx->work_lock, flags);
>>>> +    wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
>>>> +    nr_tw_prev = ctx->work_items++;
>>>
>>> Is there a good reason why it changes the semantics of
>>> what's stored across adds? It was assigning a corrected
>>> nr_tw, this one will start heavily spamming with wake_up()
>>> in some cases.
>>
>> Not sure I follow, how so? nr_tw_prev will be the previous count, just
>> like before. Except we won't need to dig into the list to find it, we
>> have it readily available. nr_tw will be the current code, or force wake
>> if needed. As before.
> 
> The problem is what it stores, not how and where. Before req->nr_tw
> could've been set to IO_CQ_WAKE_FORCE, in which case following
> requests are not going to attempt waking up the task, now work_items
> is just a counter.
> 
> Let's say you've got a bunch of non-lazy adds coming close to each
> other. The first sets IO_CQ_WAKE_FORCE and wakes the task, and
> others just queue themselves in the list. Now, every single one
> of them will try to wake_up() as long as ->cq_wait_nr is large
> enough.

If we really care about the non-lazy path as much, we can just use the
same storing scheme as we did in req->nr_tw, except in ->work_items
instead. Not a big deal imho.
diff mbox series

Patch

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 011860ade268..e9ba99cb0ed0 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -335,8 +335,9 @@  struct io_ring_ctx {
 	 * regularly bounce b/w CPUs.
 	 */
 	struct {
-		struct llist_head	work_llist;
-		struct llist_head	retry_llist;
+		struct io_wq_work_list	work_list;
+		spinlock_t		work_lock;
+		int			work_items;
 		unsigned long		check_cq;
 		atomic_t		cq_wait_nr;
 		atomic_t		cq_timeouts;
@@ -566,7 +567,11 @@  enum {
 typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts);
 
 struct io_task_work {
-	struct llist_node		node;
+	/* DEFER_TASKRUN uses work_node, regular task_work node */
+	union {
+		struct io_wq_work_node	work_node;
+		struct llist_node	node;
+	};
 	io_req_tw_func_t		func;
 };
 
@@ -622,8 +627,6 @@  struct io_kiocb {
 	 */
 	u16				buf_index;
 
-	unsigned			nr_tw;
-
 	/* REQ_F_* flags */
 	io_req_flags_t			flags;
 
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index c3a7d0197636..b7eb962e9872 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -339,7 +339,8 @@  static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	INIT_LIST_HEAD(&ctx->defer_list);
 	INIT_LIST_HEAD(&ctx->timeout_list);
 	INIT_LIST_HEAD(&ctx->ltimeout_list);
-	init_llist_head(&ctx->work_llist);
+	INIT_WQ_LIST(&ctx->work_list);
+	spin_lock_init(&ctx->work_lock);
 	INIT_LIST_HEAD(&ctx->tctx_list);
 	ctx->submit_state.free_list.next = NULL;
 	INIT_HLIST_HEAD(&ctx->waitid_list);
@@ -1066,25 +1067,31 @@  struct llist_node *io_handle_tw_list(struct llist_node *node,
 	return node;
 }
 
-static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
+static __cold void __io_fallback_tw(struct io_kiocb *req, bool sync,
+				    struct io_ring_ctx **last_ctx)
 {
+	if (sync && *last_ctx != req->ctx) {
+		if (*last_ctx) {
+			flush_delayed_work(&(*last_ctx)->fallback_work);
+			percpu_ref_put(&(*last_ctx)->refs);
+		}
+		*last_ctx = req->ctx;
+		percpu_ref_get(&(*last_ctx)->refs);
+	}
+	if (llist_add(&req->io_task_work.node, &req->ctx->fallback_llist))
+		schedule_delayed_work(&req->ctx->fallback_work, 1);
+}
+
+static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+{
+	struct llist_node *node = llist_del_all(&tctx->task_list);
 	struct io_ring_ctx *last_ctx = NULL;
 	struct io_kiocb *req;
 
 	while (node) {
 		req = container_of(node, struct io_kiocb, io_task_work.node);
 		node = node->next;
-		if (sync && last_ctx != req->ctx) {
-			if (last_ctx) {
-				flush_delayed_work(&last_ctx->fallback_work);
-				percpu_ref_put(&last_ctx->refs);
-			}
-			last_ctx = req->ctx;
-			percpu_ref_get(&last_ctx->refs);
-		}
-		if (llist_add(&req->io_task_work.node,
-			      &req->ctx->fallback_llist))
-			schedule_delayed_work(&req->ctx->fallback_work, 1);
+		__io_fallback_tw(req, sync, &last_ctx);
 	}
 
 	if (last_ctx) {
@@ -1093,13 +1100,6 @@  static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
 	}
 }
 
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
-{
-	struct llist_node *node = llist_del_all(&tctx->task_list);
-
-	__io_fallback_tw(node, sync);
-}
-
 struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
 				      unsigned int max_entries,
 				      unsigned int *count)
@@ -1139,65 +1139,45 @@  void tctx_task_work(struct callback_head *cb)
 
 static inline void io_req_local_work_add(struct io_kiocb *req,
 					 struct io_ring_ctx *ctx,
-					 unsigned flags)
+					 unsigned tw_flags)
 {
-	unsigned nr_wait, nr_tw, nr_tw_prev;
-	struct llist_node *head;
+	unsigned nr_tw, nr_tw_prev, nr_wait;
+	unsigned long flags;
 
 	/* See comment above IO_CQ_WAKE_INIT */
 	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
 
 	/*
-	 * We don't know how many reuqests is there in the link and whether
-	 * they can even be queued lazily, fall back to non-lazy.
+	 * We don't know how many requests are in the link and whether they can
+	 * even be queued lazily, fall back to non-lazy.
 	 */
 	if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
-		flags &= ~IOU_F_TWQ_LAZY_WAKE;
+		tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
-	guard(rcu)();
+	spin_lock_irqsave(&ctx->work_lock, flags);
+	wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
+	nr_tw_prev = ctx->work_items++;
+	spin_unlock_irqrestore(&ctx->work_lock, flags);
 
-	head = READ_ONCE(ctx->work_llist.first);
-	do {
-		nr_tw_prev = 0;
-		if (head) {
-			struct io_kiocb *first_req = container_of(head,
-							struct io_kiocb,
-							io_task_work.node);
-			/*
-			 * Might be executed at any moment, rely on
-			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
-			 */
-			nr_tw_prev = READ_ONCE(first_req->nr_tw);
-		}
-
-		/*
-		 * Theoretically, it can overflow, but that's fine as one of
-		 * previous adds should've tried to wake the task.
-		 */
-		nr_tw = nr_tw_prev + 1;
-		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
-			nr_tw = IO_CQ_WAKE_FORCE;
-
-		req->nr_tw = nr_tw;
-		req->io_task_work.node.next = head;
-	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
-			      &req->io_task_work.node));
-
-	/*
-	 * cmpxchg implies a full barrier, which pairs with the barrier
-	 * in set_current_state() on the io_cqring_wait() side. It's used
-	 * to ensure that either we see updated ->cq_wait_nr, or waiters
-	 * going to sleep will observe the work added to the list, which
-	 * is similar to the wait/wawke task state sync.
-	 */
+	nr_tw = nr_tw_prev + 1;
+	if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
+		nr_tw = IO_CQ_WAKE_FORCE;
 
-	if (!head) {
+	if (!nr_tw_prev) {
 		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
 			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 		if (ctx->has_evfd)
 			io_eventfd_signal(ctx);
 	}
 
+	/*
+	 * We need a barrier after unlock, which pairs with the barrier
+	 * in set_current_state() on the io_cqring_wait() side. It's used
+	 * to ensure that either we see updated ->cq_wait_nr, or waiters
+	 * going to sleep will observe the work added to the list, which
+	 * is similar to the wait/wake task state sync.
+	 */
+	smp_mb();
 	nr_wait = atomic_read(&ctx->cq_wait_nr);
 	/* not enough or no one is waiting */
 	if (nr_tw < nr_wait)
@@ -1253,11 +1233,27 @@  void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
 
 static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
 {
-	struct llist_node *node = llist_del_all(&ctx->work_llist);
+	struct io_ring_ctx *last_ctx = NULL;
+	struct io_wq_work_node *node;
+	unsigned long flags;
 
-	__io_fallback_tw(node, false);
-	node = llist_del_all(&ctx->retry_llist);
-	__io_fallback_tw(node, false);
+	spin_lock_irqsave(&ctx->work_lock, flags);
+	node = ctx->work_list.first;
+	INIT_WQ_LIST(&ctx->work_list);
+	ctx->work_items = 0;
+	spin_unlock_irqrestore(&ctx->work_lock, flags);
+
+	while (node) {
+		struct io_kiocb *req;
+
+		req = container_of(node, struct io_kiocb, io_task_work.work_node);
+		node = node->next;
+		__io_fallback_tw(req, false, &last_ctx);
+	}
+	if (last_ctx) {
+		flush_delayed_work(&last_ctx->fallback_work);
+		percpu_ref_put(&last_ctx->refs);
+	}
 }
 
 static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@@ -1272,52 +1268,52 @@  static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
 	return false;
 }
 
-static int __io_run_local_work_loop(struct llist_node **node,
-				    struct io_tw_state *ts,
-				    int events)
-{
-	while (*node) {
-		struct llist_node *next = (*node)->next;
-		struct io_kiocb *req = container_of(*node, struct io_kiocb,
-						    io_task_work.node);
-		INDIRECT_CALL_2(req->io_task_work.func,
-				io_poll_task_func, io_req_rw_complete,
-				req, ts);
-		*node = next;
-		if (--events <= 0)
-			break;
-	}
-
-	return events;
-}
-
 static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
 			       int min_events)
 {
-	struct llist_node *node;
+	struct io_wq_work_node *node, *tail;
+	int ret, limit, nitems;
 	unsigned int loops = 0;
-	int ret, limit;
 
 	if (WARN_ON_ONCE(ctx->submitter_task != current))
 		return -EEXIST;
 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
 		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+	ret = 0;
 	limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
 again:
-	ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, limit);
-	if (ctx->retry_llist.first)
+	spin_lock_irq(&ctx->work_lock);
+	node = ctx->work_list.first;
+	tail = ctx->work_list.last;
+	nitems = ctx->work_items;
+	INIT_WQ_LIST(&ctx->work_list);
+	ctx->work_items = 0;
+	spin_unlock_irq(&ctx->work_lock);
+
+	while (node) {
+		struct io_kiocb *req = container_of(node, struct io_kiocb,
+						    io_task_work.work_node);
+		node = node->next;
+		INDIRECT_CALL_2(req->io_task_work.func,
+				io_poll_task_func, io_req_rw_complete,
+				req, ts);
+		nitems--;
+		if (++ret >= limit)
+			break;
+	}
+
+	if (unlikely(node)) {
+		spin_lock_irq(&ctx->work_lock);
+		tail->next = ctx->work_list.first;
+		ctx->work_list.first = node;
+		if (!ctx->work_list.last)
+			ctx->work_list.last = tail;
+		ctx->work_items += nitems;
+		spin_unlock_irq(&ctx->work_lock);
 		goto retry_done;
+	}
 
-	/*
-	 * llists are in reverse order, flip it back the right way before
-	 * running the pending items.
-	 */
-	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
-	ret = __io_run_local_work_loop(&node, ts, ret);
-	ctx->retry_llist.first = node;
 	loops++;
-
-	ret = limit - ret;
 	if (io_run_local_work_continue(ctx, ret, min_events))
 		goto again;
 retry_done:
@@ -2413,7 +2409,7 @@  static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
 	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
 		atomic_set(&ctx->cq_wait_nr, 1);
 		smp_mb();
-		if (!llist_empty(&ctx->work_llist))
+		if (io_local_work_pending(ctx))
 			goto out_wake;
 	}
 
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 214f9f175102..2fae27803116 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -349,7 +349,7 @@  static inline int io_run_task_work(void)
 
 static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
 {
-	return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
+	return READ_ONCE(ctx->work_list.first);
 }
 
 static inline bool io_task_work_pending(struct io_ring_ctx *ctx)