diff mbox series

[7/7] io_uring/msg_ring: remove non-remote message passing

Message ID 20240530152822.535791-9-axboe@kernel.dk (mailing list archive)
State New
Headers show
Series Improve MSG_RING DEFER_TASKRUN performance | expand

Commit Message

Jens Axboe May 30, 2024, 3:23 p.m. UTC
Now that the overflow approach works well, there's no need to retain the
double locking for direct CQ posting on the target ring. Just have any
kind of target ring use the same messaging mechanism.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 io_uring/msg_ring.c | 78 +++++----------------------------------------
 1 file changed, 8 insertions(+), 70 deletions(-)
diff mbox series

Patch

diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
index 5264ba346df8..e966ec8757cb 100644
--- a/io_uring/msg_ring.c
+++ b/io_uring/msg_ring.c
@@ -33,11 +33,6 @@  struct io_msg {
 	u32 flags;
 };
 
-static void io_double_unlock_ctx(struct io_ring_ctx *octx)
-{
-	mutex_unlock(&octx->uring_lock);
-}
-
 static int io_double_lock_ctx(struct io_ring_ctx *octx,
 			      unsigned int issue_flags)
 {
@@ -66,11 +61,6 @@  void io_msg_ring_cleanup(struct io_kiocb *req)
 	msg->src_file = NULL;
 }
 
-static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx)
-{
-	return target_ctx->task_complete;
-}
-
 static struct io_overflow_cqe *io_alloc_overflow(struct io_ring_ctx *target_ctx)
 {
 	struct io_overflow_cqe *ocqe;
@@ -106,6 +96,8 @@  static void io_msg_add_overflow(struct io_msg *msg,
 				u32 flags)
 	__releases(&target_ctx->completion_lock)
 {
+	struct task_struct *task = READ_ONCE(target_ctx->submitter_task);
+
 	if (list_empty(&target_ctx->cq_overflow_list)) {
 		set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &target_ctx->check_cq);
 		atomic_or(IORING_SQ_TASKRUN, &target_ctx->rings->sq_flags);
@@ -116,7 +108,10 @@  static void io_msg_add_overflow(struct io_msg *msg,
 	ocqe->cqe.flags = flags;
 	list_add_tail(&ocqe->list, &target_ctx->cq_overflow_list);
 	spin_unlock(&target_ctx->completion_lock);
-	wake_up_state(target_ctx->submitter_task, TASK_INTERRUPTIBLE);
+	if (task)
+		wake_up_state(task, TASK_INTERRUPTIBLE);
+	else if (wq_has_sleeper(&target_ctx->cq_wait))
+		wake_up(&target_ctx->cq_wait);
 }
 
 static int io_msg_fill_remote(struct io_msg *msg, unsigned int issue_flags,
@@ -141,7 +136,6 @@  static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
 	struct io_ring_ctx *target_ctx = req->file->private_data;
 	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
 	u32 flags = 0;
-	int ret;
 
 	if (msg->src_fd || msg->flags & ~IORING_MSG_RING_FLAGS_PASS)
 		return -EINVAL;
@@ -153,19 +147,7 @@  static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
 	if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
 		flags = msg->cqe_flags;
 
-	if (io_msg_need_remote(target_ctx))
-		return io_msg_fill_remote(msg, issue_flags, target_ctx, flags);
-
-	ret = -EOVERFLOW;
-	if (target_ctx->flags & IORING_SETUP_IOPOLL) {
-		if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
-			return -EAGAIN;
-	}
-	if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, flags))
-		ret = 0;
-	if (target_ctx->flags & IORING_SETUP_IOPOLL)
-		io_double_unlock_ctx(target_ctx);
-	return ret;
+	return io_msg_fill_remote(msg, issue_flags, target_ctx, flags);
 }
 
 static struct file *io_msg_grab_file(struct io_kiocb *req, unsigned int issue_flags)
@@ -186,48 +168,6 @@  static struct file *io_msg_grab_file(struct io_kiocb *req, unsigned int issue_fl
 	return file;
 }
 
-static int __io_msg_install_complete(struct io_kiocb *req)
-{
-	struct io_ring_ctx *target_ctx = req->file->private_data;
-	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
-	struct file *src_file = msg->src_file;
-	int ret;
-
-	ret = __io_fixed_fd_install(target_ctx, src_file, msg->dst_fd);
-	if (ret < 0)
-		return ret;
-
-	msg->src_file = NULL;
-	req->flags &= ~REQ_F_NEED_CLEANUP;
-
-	if (msg->flags & IORING_MSG_RING_CQE_SKIP)
-		return ret;
-
-	/*
-	 * If this fails, the target still received the file descriptor but
-	 * wasn't notified of the fact. This means that if this request
-	 * completes with -EOVERFLOW, then the sender must ensure that a
-	 * later IORING_OP_MSG_RING delivers the message.
-	 */
-	if (!io_post_aux_cqe(target_ctx, msg->user_data, ret, 0))
-		return -EOVERFLOW;
-
-	return ret;
-}
-
-static int io_msg_install_complete(struct io_kiocb *req, unsigned int issue_flags)
-{
-	struct io_ring_ctx *target_ctx = req->file->private_data;
-	int ret;
-
-	if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
-		return -EAGAIN;
-
-	ret = __io_msg_install_complete(req);
-	io_double_unlock_ctx(target_ctx);
-	return ret;
-}
-
 static int io_msg_install_remote(struct io_kiocb *req, unsigned int issue_flags,
 				 struct io_ring_ctx *target_ctx)
 {
@@ -285,9 +225,7 @@  static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
 		req->flags |= REQ_F_NEED_CLEANUP;
 	}
 
-	if (io_msg_need_remote(target_ctx))
-		return io_msg_install_remote(req, issue_flags, target_ctx);
-	return io_msg_install_complete(req, issue_flags);
+	return io_msg_install_remote(req, issue_flags, target_ctx);
 }
 
 int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)