@@ -357,6 +357,13 @@ struct io_ring_ctx {
struct io_alloc_cache futex_cache;
#endif
+ /*
+ * Unlike the other caches, this one is used by the sender of messages
+ * to this ring, not by the ring itself. As such, protection for this
+ * cache is under ->completion_lock, not ->uring_lock.
+ */
+ struct io_alloc_cache msg_cache;
+
const struct cred *sq_creds; /* cred used for __io_sq_thread() */
struct io_sq_data *sq_data; /* if using sq thread polling */
@@ -95,6 +95,7 @@
#include "futex.h"
#include "napi.h"
#include "uring_cmd.h"
+#include "msg_ring.h"
#include "memmap.h"
#include "timeout.h"
@@ -315,6 +316,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
ret |= io_alloc_cache_init(&ctx->uring_cache, IO_ALLOC_CACHE_MAX,
sizeof(struct uring_cache));
ret |= io_futex_cache_init(ctx);
+ ret |= io_msg_cache_init(ctx);
if (ret)
goto err;
init_completion(&ctx->ref_comp);
@@ -351,6 +353,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
io_alloc_cache_free(&ctx->uring_cache, kfree);
io_futex_cache_free(ctx);
+ io_msg_cache_free(ctx);
kfree(ctx->cancel_table.hbs);
kfree(ctx->cancel_table_locked.hbs);
xa_destroy(&ctx->io_bl_xa);
@@ -695,7 +698,8 @@ static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool dying)
memcpy(cqe, &ocqe->cqe, cqe_size);
}
list_del(&ocqe->list);
- kfree(ocqe);
+ if (!io_alloc_cache_put(&ctx->msg_cache, ocqe))
+ kfree(ocqe);
}
if (list_empty(&ctx->cq_overflow_list)) {
@@ -2649,6 +2653,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
io_alloc_cache_free(&ctx->uring_cache, kfree);
io_futex_cache_free(ctx);
+ io_msg_cache_free(ctx);
io_destroy_buffers(ctx);
mutex_unlock(&ctx->uring_lock);
if (ctx->sq_creds)
@@ -11,6 +11,7 @@
#include "io_uring.h"
#include "rsrc.h"
#include "filetable.h"
+#include "alloc_cache.h"
#include "msg_ring.h"
@@ -73,19 +74,24 @@ static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx)
static struct io_overflow_cqe *io_alloc_overflow(struct io_ring_ctx *target_ctx)
{
- bool is_cqe32 = target_ctx->flags & IORING_SETUP_CQE32;
- size_t cqe_size = sizeof(struct io_overflow_cqe);
struct io_overflow_cqe *ocqe;
- if (is_cqe32)
- cqe_size += sizeof(struct io_uring_cqe);
+ ocqe = io_alloc_cache_get(&target_ctx->msg_cache);
+ if (!ocqe) {
+ bool is_cqe32 = target_ctx->flags & IORING_SETUP_CQE32;
+ size_t cqe_size = sizeof(struct io_overflow_cqe);
- ocqe = kmalloc(cqe_size, GFP_ATOMIC | __GFP_ACCOUNT);
- if (!ocqe)
- return NULL;
+ if (is_cqe32)
+ cqe_size += sizeof(struct io_uring_cqe);
- if (is_cqe32)
- ocqe->cqe.big_cqe[0] = ocqe->cqe.big_cqe[1] = 0;
+ ocqe = kmalloc(cqe_size, GFP_ATOMIC | __GFP_ACCOUNT);
+ if (!ocqe)
+ return NULL;
+
+ /* just init at alloc time, won't change */
+ if (is_cqe32)
+ ocqe->cqe.big_cqe[0] = ocqe->cqe.big_cqe[1] = 0;
+ }
return ocqe;
}
@@ -119,13 +125,16 @@ static int io_msg_fill_remote(struct io_msg *msg, unsigned int issue_flags,
{
struct io_overflow_cqe *ocqe;
+ spin_lock(&target_ctx->completion_lock);
+
ocqe = io_alloc_overflow(target_ctx);
- if (!ocqe)
- return -ENOMEM;
+ if (ocqe) {
+ io_msg_add_overflow(msg, target_ctx, ocqe, msg->len, flags);
+ return 0;
+ }
- spin_lock(&target_ctx->completion_lock);
- io_msg_add_overflow(msg, target_ctx, ocqe, msg->len, flags);
- return 0;
+ spin_unlock(&target_ctx->completion_lock);
+ return -ENOMEM;
}
static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
@@ -228,17 +237,16 @@ static int io_msg_install_remote(struct io_kiocb *req, unsigned int issue_flags,
struct io_overflow_cqe *ocqe = NULL;
int ret;
+ if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
+ return -EAGAIN;
+
if (!skip_cqe) {
+ spin_lock(&target_ctx->completion_lock);
ocqe = io_alloc_overflow(target_ctx);
if (!ocqe)
return -ENOMEM;
}
- if (unlikely(io_double_lock_ctx(target_ctx, issue_flags))) {
- kfree(ocqe);
- return -EAGAIN;
- }
-
ret = __io_fixed_fd_install(target_ctx, msg->src_file, msg->dst_fd);
mutex_unlock(&target_ctx->uring_lock);
@@ -246,12 +254,14 @@ static int io_msg_install_remote(struct io_kiocb *req, unsigned int issue_flags,
msg->src_file = NULL;
req->flags &= ~REQ_F_NEED_CLEANUP;
if (!skip_cqe) {
- spin_lock(&target_ctx->completion_lock);
io_msg_add_overflow(msg, target_ctx, ocqe, ret, 0);
return 0;
}
}
- kfree(ocqe);
+ if (ocqe) {
+ spin_unlock(&target_ctx->completion_lock);
+ kfree(ocqe);
+ }
return ret;
}
@@ -331,3 +341,18 @@ int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags)
io_req_set_res(req, ret, 0);
return IOU_OK;
}
+
+int io_msg_cache_init(struct io_ring_ctx *ctx)
+{
+ size_t size = sizeof(struct io_overflow_cqe);
+
+ if (ctx->flags & IORING_SETUP_CQE32)
+ size += sizeof(struct io_uring_cqe);
+
+ return io_alloc_cache_init(&ctx->msg_cache, IO_ALLOC_CACHE_MAX, size);
+}
+
+void io_msg_cache_free(struct io_ring_ctx *ctx)
+{
+ io_alloc_cache_free(&ctx->msg_cache, kfree);
+}
@@ -3,3 +3,6 @@
int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags);
void io_msg_ring_cleanup(struct io_kiocb *req);
+
+int io_msg_cache_init(struct io_ring_ctx *ctx);
+void io_msg_cache_free(struct io_ring_ctx *ctx);
io_uring accounts the memory allocated, which is quite expensive. Wrap the allocation and frees in the provided alloc cache framework. The target ctx needs to be locked anyway for posting the overflow entry, so just move the overflow alloc inside that section. Flushing the entries has it locked as well, so io_cache_alloc_free() can be used. In a simple test, most of the overhead of DEFER_TASKRUN message passing ends up being accounting for allocation and free, and with this change it's completely gone. Signed-off-by: Jens Axboe <axboe@kernel.dk> --- include/linux/io_uring_types.h | 7 ++++ io_uring/io_uring.c | 7 +++- io_uring/msg_ring.c | 67 +++++++++++++++++++++++----------- io_uring/msg_ring.h | 3 ++ 4 files changed, 62 insertions(+), 22 deletions(-)