diff mbox series

[2/3] io_uring/io-wq: reduce frequency of acct->lock acquisitions

Message ID 20230809194306.170979-3-axboe@kernel.dk (mailing list archive)
State New
Headers show
Series io-wq locking improvements | expand

Commit Message

Jens Axboe Aug. 9, 2023, 7:43 p.m. UTC
When we check if we have work to run, we grab the acct lock, check,
drop it, and then return the result. If we do have work to run, then
running the work will again grab acct->lock and get the work item.

This causes us to grab acct->lock more frequently than we need to.
If we have work to do, have io_acct_run_queue() return with the acct
lock still acquired. io_worker_handle_work() is then always invoked
with the acct lock already held.

In a simple test cases that stats files (IORING_OP_STATX always hits
io-wq), we see a nice reduction in locking overhead with this change:

19.32%   -12.55%  [kernel.kallsyms]      [k] __cmpwait_case_32
20.90%   -12.07%  [kernel.kallsyms]      [k] queued_spin_lock_slowpath

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 io_uring/io-wq.c | 47 ++++++++++++++++++++++++++++++++++-------------
 1 file changed, 34 insertions(+), 13 deletions(-)
diff mbox series

Patch

diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 3e7025b9e0dd..18a049fc53ef 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -232,17 +232,25 @@  static void io_worker_exit(struct io_worker *worker)
 	do_exit(0);
 }
 
-static inline bool io_acct_run_queue(struct io_wq_acct *acct)
+static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
 {
-	bool ret = false;
+	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
+		!wq_list_empty(&acct->work_list);
+}
 
+/*
+ * If there's work to do, returns true with acct->lock acquired. If not,
+ * returns false with no lock held.
+ */
+static inline bool io_acct_run_queue(struct io_wq_acct *acct)
+	__acquires(&acct->lock)
+{
 	raw_spin_lock(&acct->lock);
-	if (!wq_list_empty(&acct->work_list) &&
-	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
-		ret = true;
-	raw_spin_unlock(&acct->lock);
+	if (__io_acct_run_queue(acct))
+		return true;
 
-	return ret;
+	raw_spin_unlock(&acct->lock);
+	return false;
 }
 
 /*
@@ -397,6 +405,7 @@  static void io_wq_dec_running(struct io_worker *worker)
 	if (!io_acct_run_queue(acct))
 		return;
 
+	raw_spin_unlock(&acct->lock);
 	atomic_inc(&acct->nr_running);
 	atomic_inc(&wq->worker_refs);
 	io_queue_worker_create(worker, acct, create_worker_cb);
@@ -521,9 +530,13 @@  static void io_assign_current_work(struct io_worker *worker,
 	raw_spin_unlock(&worker->lock);
 }
 
-static void io_worker_handle_work(struct io_worker *worker)
+/*
+ * Called with acct->lock held, drops it before returning
+ */
+static void io_worker_handle_work(struct io_wq_acct *acct,
+				  struct io_worker *worker)
+	__releases(&acct->lock)
 {
-	struct io_wq_acct *acct = io_wq_get_acct(worker);
 	struct io_wq *wq = worker->wq;
 	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
 
@@ -537,7 +550,6 @@  static void io_worker_handle_work(struct io_worker *worker)
 		 * can't make progress, any work completion or insertion will
 		 * clear the stalled flag.
 		 */
-		raw_spin_lock(&acct->lock);
 		work = io_get_next_work(acct, worker);
 		raw_spin_unlock(&acct->lock);
 		if (work) {
@@ -591,6 +603,10 @@  static void io_worker_handle_work(struct io_worker *worker)
 					wake_up(&wq->hash->wait);
 			}
 		} while (work);
+
+		if (!__io_acct_run_queue(acct))
+			break;
+		raw_spin_lock(&acct->lock);
 	} while (1);
 }
 
@@ -611,8 +627,13 @@  static int io_wq_worker(void *data)
 		long ret;
 
 		set_current_state(TASK_INTERRUPTIBLE);
+
+		/*
+		 * If we have work to do, io_acct_run_queue() returns with
+		 * the acct->lock held. If not, it will drop it.
+		 */
 		while (io_acct_run_queue(acct))
-			io_worker_handle_work(worker);
+			io_worker_handle_work(acct, worker);
 
 		raw_spin_lock(&wq->lock);
 		/*
@@ -645,8 +666,8 @@  static int io_wq_worker(void *data)
 		}
 	}
 
-	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
-		io_worker_handle_work(worker);
+	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
+		io_worker_handle_work(acct, worker);
 
 	io_worker_exit(worker);
 	return 0;