@@ -687,6 +687,12 @@ struct io_bpf {
struct bpf_prog *prog;
};
+struct io_async_bpf { /* per-request async data for IORING_OP_BPF (sized via its .async_size) */
+ struct wait_queue_entry wqe; /* entry added to ctx->wait; ->private holds the io_kiocb */
+ unsigned int wait_nr; /* wake once the watched CQ has at least this many events */
+ unsigned int wait_idx; /* index into ctx->cqs[] selecting the CQ to watch */
+};
+
struct io_completion {
struct file *file;
struct list_head list;
@@ -1050,7 +1056,9 @@ static const struct io_op_def io_op_defs[] = {
},
[IORING_OP_RENAMEAT] = {},
[IORING_OP_UNLINKAT] = {},
- [IORING_OP_BPF] = {},
+ [IORING_OP_BPF] = {
+ .async_size = sizeof(struct io_async_bpf),
+ },
};
static bool io_disarm_next(struct io_kiocb *req);
@@ -9148,6 +9156,7 @@ static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
}
}
+ wake_up_all(&ctx->wait);
ret |= io_cancel_defer_files(ctx, task, files);
ret |= io_poll_remove_all(ctx, task, files);
ret |= io_kill_timeouts(ctx, task, files);
@@ -10492,6 +10501,10 @@ static bool io_bpf_is_valid_access(int off, int size,
switch (off) {
case offsetof(struct io_uring_bpf_ctx, user_data):
return size == sizeof_field(struct io_uring_bpf_ctx, user_data);
+ case offsetof(struct io_uring_bpf_ctx, wait_nr):
+ return size == sizeof_field(struct io_uring_bpf_ctx, wait_nr);
+ case offsetof(struct io_uring_bpf_ctx, wait_idx):
+ return size == sizeof_field(struct io_uring_bpf_ctx, wait_idx);
}
return false;
}
@@ -10503,6 +10516,60 @@ const struct bpf_verifier_ops bpf_io_uring_verifier_ops = {
.is_valid_access = io_bpf_is_valid_access,
};
+static inline bool io_bpf_need_wake(struct io_async_bpf *abpf) /* true if the waiting BPF request should be woken now */
+{
+ struct io_kiocb *req = abpf->wqe.private; /* set to the request by io_bpf_wait_cq_async() */
+ struct io_ring_ctx *ctx = req->ctx;
+
+ if (unlikely(percpu_ref_is_dying(&ctx->refs)) || /* same condition io_bpf_run() uses to bail out */
+ atomic_read(&req->task->io_uring->in_idle))
+ return true; /* wake regardless of how many CQEs are pending */
+ return __io_cqring_events(&ctx->cqs[abpf->wait_idx]) >= abpf->wait_nr; /* enough events in the watched CQ? */
+}
+
+static int io_bpf_wait_func(struct wait_queue_entry *wqe, unsigned mode, /* waitqueue callback for a waiting BPF request */
+ int sync, void *key) /* mode, sync and key are not used by this callback */
+{
+ struct io_async_bpf *abpf = container_of(wqe, struct io_async_bpf, wqe);
+ bool wake = io_bpf_need_wake(abpf);
+
+ if (wake) {
+ list_del_init_careful(&wqe->entry); /* take the entry off the waitqueue before requeueing the request */
+ req_ref_get(wqe->private); /* extra request reference taken ahead of queueing */
+ io_queue_async_work(wqe->private); /* NOTE(review): presumably punts the request to async execution - confirm */
+ }
+ return wake; /* nonzero iff the entry was removed and the request queued */
+}
+
+static int io_bpf_wait_cq_async(struct io_kiocb *req, unsigned int nr, /* queue req on ctx->wait until CQ idx holds >= nr events */
+ unsigned int idx) /* returns 0 on success, -EINVAL for a bad idx, -ENOMEM on allocation failure */
+{
+ struct io_ring_ctx *ctx = req->ctx;
+ struct wait_queue_head *wq;
+ struct wait_queue_entry *wqe;
+ struct io_async_bpf *abpf;
+
+ if (unlikely(idx >= ctx->cq_nr)) /* idx is used to index ctx->cqs[], so reject out-of-range values */
+ return -EINVAL;
+ if (!req->async_data && io_alloc_async_data(req)) /* allocate async data only if none is attached yet */
+ return -ENOMEM;
+
+ abpf = req->async_data;
+ abpf->wait_nr = nr;
+ abpf->wait_idx = idx;
+ wqe = &abpf->wqe;
+ init_waitqueue_func_entry(wqe, io_bpf_wait_func);
+ wqe->private = req; /* io_bpf_need_wake()/io_bpf_wait_func() read the request back from here */
+ wq = &ctx->wait;
+
+ spin_lock_irq(&wq->lock);
+ __add_wait_queue(wq, wqe);
+ smp_mb(); /* NOTE(review): presumably orders the queue insertion before the CQ check below - confirm */
+ io_bpf_wait_func(wqe, 0, 0, NULL); /* check right away in case the wake condition already holds */
+ spin_unlock_irq(&wq->lock);
+ return 0;
+}
+
static void io_bpf_run(struct io_kiocb *req, unsigned int issue_flags)
{
struct io_ring_ctx *ctx = req->ctx;
@@ -10512,8 +10579,8 @@ static void io_bpf_run(struct io_kiocb *req, unsigned int issue_flags)
lockdep_assert_held(&req->ctx->uring_lock);
- if (unlikely(percpu_ref_is_dying(&ctx->refs) ||
- atomic_read(&req->task->io_uring->in_idle)))
+ if (unlikely(percpu_ref_is_dying(&ctx->refs)) ||
+ atomic_read(&req->task->io_uring->in_idle))
goto done;
memset(&bpf_ctx.u, 0, sizeof(bpf_ctx.u));
@@ -10531,6 +10598,13 @@ static void io_bpf_run(struct io_kiocb *req, unsigned int issue_flags)
}
io_submit_state_end(&ctx->submit_state, ctx);
ret = 0;
+
+ if (bpf_ctx.u.wait_nr) {
+ ret = io_bpf_wait_cq_async(req, bpf_ctx.u.wait_nr,
+ bpf_ctx.u.wait_idx);
+ if (!ret)
+ return;
+ }
done:
__io_req_complete(req, issue_flags, ret, 0);
}
@@ -405,6 +405,8 @@ struct io_uring_getevents_arg {
struct io_uring_bpf_ctx {
__u64 user_data;
+ __u32 wait_nr; /* if nonzero after the program runs, io_bpf_run() waits for this many CQEs */
+ __u32 wait_idx; /* index of the CQ to wait on; rejected with -EINVAL if >= ctx->cq_nr */
};
#endif
Add experimental support for bpf requests waiting for a number of CQEs to arrive in a specified CQ. Signed-off-by: Pavel Begunkov <asml.silence@gmail.com> --- fs/io_uring.c | 80 +++++++++++++++++++++++++++++++++-- include/uapi/linux/io_uring.h | 2 + 2 files changed, 79 insertions(+), 3 deletions(-)