diff mbox series

[13/19] io-wq: add wq->owner for uringlet mode

Message ID 20220819152738.1111255-14-hao.xu@linux.dev (mailing list archive)
State New
Headers show
Series uringlet | expand

Commit Message

Hao Xu Aug. 19, 2022, 3:27 p.m. UTC
From: Hao Xu <howeyxu@tencent.com>

In uringlet mode, we allow exact one worker to submit sqes at the same
time. nr_running is not a good choice to aim that. Add an member
wq->owner and its lock to achieve that, this avoids race condition
between workers.

Signed-off-by: Hao Xu <howeyxu@tencent.com>
---
 io_uring/io-wq.c | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)
diff mbox series

Patch

diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 00a1cdefb787..9fcaeea7a478 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -96,6 +96,9 @@  struct io_wq {
 
 	void *private;
 
+	raw_spinlock_t lock;
+	struct io_worker *owner;
+
 	struct io_wqe *wqes[];
 };
 
@@ -381,6 +384,8 @@  static inline bool io_worker_test_submit(struct io_worker *worker)
 	return worker->flags & IO_WORKER_F_SUBMIT;
 }
 
+#define IO_WQ_OWNER_TRANSMIT	((struct io_worker *)-1)
+
 static void io_wqe_dec_running(struct io_worker *worker)
 {
 	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
@@ -401,6 +406,10 @@  static void io_wqe_dec_running(struct io_worker *worker)
 
 		io_uringlet_end(wq->private);
 		io_worker_set_scheduled(worker);
+		raw_spin_lock(&wq->lock);
+		wq->owner = IO_WQ_OWNER_TRANSMIT;
+		raw_spin_unlock(&wq->lock);
+
 		raw_spin_lock(&wqe->lock);
 		rcu_read_lock();
 		activated = io_wqe_activate_free_worker(wqe, acct);
@@ -674,6 +683,17 @@  static void io_wqe_worker_let(struct io_worker *worker)
 
 	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 		unsigned int empty_count = 0;
+		struct io_worker *owner;
+
+		raw_spin_lock(&wq->lock);
+		owner = wq->owner;
+		if (owner && owner != IO_WQ_OWNER_TRANSMIT && owner != worker) {
+			raw_spin_unlock(&wq->lock);
+			set_current_state(TASK_INTERRUPTIBLE);
+			goto sleep;
+		}
+		wq->owner = worker;
+		raw_spin_unlock(&wq->lock);
 
 		__io_worker_busy(wqe, worker);
 		set_current_state(TASK_INTERRUPTIBLE);
@@ -697,6 +717,7 @@  static void io_wqe_worker_let(struct io_worker *worker)
 			cond_resched();
 		} while (1);
 
+sleep:
 		raw_spin_lock(&wqe->lock);
 		__io_worker_idle(wqe, worker);
 		raw_spin_unlock(&wqe->lock);
@@ -780,6 +801,14 @@  int io_uringlet_offload(struct io_wq *wq)
 	struct io_wqe_acct *acct = io_get_acct(wqe, true);
 	bool waken;
 
+	raw_spin_lock(&wq->lock);
+	if (wq->owner) {
+		raw_spin_unlock(&wq->lock);
+		return 0;
+	}
+	wq->owner = IO_WQ_OWNER_TRANSMIT;
+	raw_spin_unlock(&wq->lock);
+
 	raw_spin_lock(&wqe->lock);
 	rcu_read_lock();
 	waken = io_wqe_activate_free_worker(wqe, acct);
@@ -1248,6 +1277,7 @@  struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 	wq->free_work = data->free_work;
 	wq->do_work = data->do_work;
 	wq->private = data->private;
+	raw_spin_lock_init(&wq->lock);
 
 	ret = -ENOMEM;
 	for_each_node(node) {