@@ -33,11 +33,6 @@ struct io_msg {
u32 flags;
};
-static void io_double_unlock_ctx(struct io_ring_ctx *octx)
-{
- mutex_unlock(&octx->uring_lock);
-}
-
static int io_double_lock_ctx(struct io_ring_ctx *octx,
unsigned int issue_flags)
{
@@ -66,11 +61,6 @@ void io_msg_ring_cleanup(struct io_kiocb *req)
msg->src_file = NULL;
}
-static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx)
-{
- return target_ctx->task_complete;
-}
-
static struct io_overflow_cqe *io_alloc_overflow(struct io_ring_ctx *target_ctx)
__acquires(&target_ctx->completion_lock)
{
@@ -109,7 +99,7 @@ static void io_msg_add_overflow(struct io_msg *msg,
u32 flags)
__releases(&target_ctx->completion_lock)
{
- unsigned nr_prev, nr_wait;
+ unsigned nr_prev;
if (list_empty(&target_ctx->cq_overflow_list)) {
set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &target_ctx->check_cq);
@@ -122,11 +112,17 @@ static void io_msg_add_overflow(struct io_msg *msg,
nr_prev = target_ctx->nr_overflow++;
list_add_tail(&ocqe->list, &target_ctx->cq_overflow_list);
spin_unlock(&target_ctx->completion_lock);
- rcu_read_lock();
- io_defer_tw_count(target_ctx, &nr_wait);
- nr_prev += nr_wait;
- io_defer_wake(target_ctx, nr_prev + 1, nr_prev);
- rcu_read_unlock();
+ if (target_ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+ unsigned nr_wait;
+
+ rcu_read_lock();
+ io_defer_tw_count(target_ctx, &nr_wait);
+ nr_prev += nr_wait;
+ io_defer_wake(target_ctx, nr_prev + 1, nr_prev);
+ rcu_read_unlock();
+ } else if (wq_has_sleeper(&target_ctx->cq_wait)) {
+ wake_up(&target_ctx->cq_wait);
+ }
}
static int io_msg_fill_remote(struct io_msg *msg, unsigned int issue_flags,
@@ -149,7 +145,6 @@ static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
struct io_ring_ctx *target_ctx = req->file->private_data;
struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
u32 flags = 0;
- int ret;
if (msg->src_fd || msg->flags & ~IORING_MSG_RING_FLAGS_PASS)
return -EINVAL;
@@ -161,19 +156,7 @@ static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
flags = msg->cqe_flags;
- if (io_msg_need_remote(target_ctx))
- return io_msg_fill_remote(msg, issue_flags, target_ctx, flags);
-
- ret = -EOVERFLOW;
- if (target_ctx->flags & IORING_SETUP_IOPOLL) {
- if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
- return -EAGAIN;
- }
- if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, flags))
- ret = 0;
- if (target_ctx->flags & IORING_SETUP_IOPOLL)
- io_double_unlock_ctx(target_ctx);
- return ret;
+ return io_msg_fill_remote(msg, issue_flags, target_ctx, flags);
}
static struct file *io_msg_grab_file(struct io_kiocb *req, unsigned int issue_flags)
@@ -194,38 +177,6 @@ static struct file *io_msg_grab_file(struct io_kiocb *req, unsigned int issue_fl
return file;
}
-static int io_msg_install_complete(struct io_kiocb *req, unsigned int issue_flags)
-{
- struct io_ring_ctx *target_ctx = req->file->private_data;
- struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
- struct file *src_file = msg->src_file;
- int ret;
-
- if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
- return -EAGAIN;
-
- ret = __io_fixed_fd_install(target_ctx, src_file, msg->dst_fd);
- if (ret < 0)
- goto out_unlock;
-
- msg->src_file = NULL;
- req->flags &= ~REQ_F_NEED_CLEANUP;
-
- if (msg->flags & IORING_MSG_RING_CQE_SKIP)
- goto out_unlock;
- /*
- * If this fails, the target still received the file descriptor but
- * wasn't notified of the fact. This means that if this request
- * completes with -EOVERFLOW, then the sender must ensure that a
- * later IORING_OP_MSG_RING delivers the message.
- */
- if (!io_post_aux_cqe(target_ctx, msg->user_data, ret, 0))
- ret = -EOVERFLOW;
-out_unlock:
- io_double_unlock_ctx(target_ctx);
- return ret;
-}
-
static int io_msg_install_remote(struct io_kiocb *req, unsigned int issue_flags,
struct io_ring_ctx *target_ctx)
{
@@ -284,9 +235,7 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
req->flags |= REQ_F_NEED_CLEANUP;
}
- if (io_msg_need_remote(target_ctx))
- return io_msg_install_remote(req, issue_flags, target_ctx);
- return io_msg_install_complete(req, issue_flags);
+ return io_msg_install_remote(req, issue_flags, target_ctx);
}
int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
Now that the overflow approach works well, there's no need to retain the double locking for direct CQ posting on the target ring. Just have any kind of target ring use the same messaging mechanism. Signed-off-by: Jens Axboe <axboe@kernel.dk> --- io_uring/msg_ring.c | 79 ++++++++------------------------------------- 1 file changed, 14 insertions(+), 65 deletions(-)