@@ -399,6 +399,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
 	if (!io_worker_test_submit(worker))
 		return;
+	io_uringlet_end(wq->private);
 	io_worker_set_scheduled(worker);
 	raw_spin_lock(&wqe->lock);
 	rcu_read_lock();
@@ -2054,6 +2054,12 @@ static void io_commit_sqring(struct io_ring_ctx *ctx)
 	smp_store_release(&rings->sq.head, ctx->cached_sq_head);
 }
 
+void io_uringlet_end(struct io_ring_ctx *ctx)
+{
+	io_submit_state_end(ctx);
+	io_commit_sqring(ctx);
+}
+
 /*
  * Fetch an sqe, if one is available. Note this returns a pointer to memory
  * that is mapped by userspace. This means that care needs to be taken to
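
io_uringlet_end() simply pairs io_submit_state_end() with io_commit_sqring(), so the smp_store_release() above is what publishes the new SQ head to userspace whenever a uringlet worker finishes (or, via the io_wqe_dec_running() hook in the first hunk, suspends) a submission batch. As a reminder of the userspace side of that contract, here is a small, hypothetical sketch of how an application might read the kernel-updated head with acquire semantics to see how many SQE slots it can reuse; the struct is a simplified stand-in for the mmap'ed SQ ring, not liburing's actual io_uring_sq.

#include <stdatomic.h>

struct sq_ring_view {
	_Atomic unsigned *khead;	/* kernel-updated head in the shared SQ ring */
	unsigned ring_entries;		/* size of the SQ ring (power of two) */
	unsigned local_tail;		/* tail as tracked by the application */
};

unsigned sq_space_left(const struct sq_ring_view *sq)
{
	/* acquire pairs with the kernel's smp_store_release() of sq.head */
	unsigned head = atomic_load_explicit(sq->khead, memory_order_acquire);

	return sq->ring_entries - (sq->local_tail - head);
}
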
@@ -2141,6 +2147,55 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
 	io_commit_sqring(ctx);
 	return ret;
 }
 
+int io_submit_sqes_let(struct io_wq_work *work)
+{
+	struct io_ring_ctx *ctx = (struct io_ring_ctx *)work;
+	unsigned int entries;
+	bool scheduled = false;
+	void *worker = current->worker_private;
+
+	entries = io_sqring_entries(ctx);
+	if (!entries)
+		return IO_URINGLET_EMPTY;
+
+	io_get_task_refs(entries);
+	io_submit_state_start(&ctx->submit_state, entries);
+	do {
+		const struct io_uring_sqe *sqe;
+		struct io_kiocb *req;
+
+		if (unlikely(!io_alloc_req_refill(ctx)))
+			break;
+		req = io_alloc_req(ctx);
+		sqe = io_get_sqe(ctx);
+		if (unlikely(!sqe)) {
+			io_req_add_to_cache(req, ctx);
+			break;
+		}
+
+		if (unlikely(io_submit_sqe(ctx, req, sqe)))
+			break;
+		/* TODO this one breaks encapsulation */
+		scheduled = io_worker_test_scheduled(worker);
+		if (unlikely(scheduled)) {
+			entries--;
+			break;
+		}
+	} while (--entries);
+
+	/* TODO do this at the schedule time too */
+	if (unlikely(entries))
+		current->io_uring->cached_refs += entries;
+
+	if (scheduled)
+		return IO_URINGLET_SCHEDULED;
+
+	/* Commit SQ ring head once we've consumed and submitted all SQEs */
+	io_uringlet_end(ctx);
+	return IO_URINGLET_INLINE;
+}
+
 struct io_wait_queue {
 	struct wait_queue_entry wq;
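
On the return values: io_submit_sqes_let() reports IO_URINGLET_EMPTY when the SQ ring had nothing queued, IO_URINGLET_SCHEDULED when the worker got scheduled out mid-batch (the SQ head is then committed from the schedule-out path, io_wqe_dec_running() -> io_uringlet_end(), per the first hunk), and IO_URINGLET_INLINE when the whole batch was submitted and committed inline. The following is a hypothetical, userspace-compilable sketch of a worker loop dispatching on those three states; the enum definition, the stub submit_batch() and the printouts are illustrative stand-ins, not the actual io-wq code in this series.

#include <stdio.h>

enum io_uringlet_state {	/* assumed definition, mirroring the names above */
	IO_URINGLET_EMPTY,	/* SQ ring had no entries to consume */
	IO_URINGLET_SCHEDULED,	/* worker was scheduled out mid-batch */
	IO_URINGLET_INLINE,	/* batch submitted inline, SQ head committed */
};

static int fake_rounds = 3;	/* stand-in for "SQEs pending in the ring" */

static enum io_uringlet_state submit_batch(void)
{
	/* pretend the ring drains after a few rounds */
	return fake_rounds-- > 0 ? IO_URINGLET_INLINE : IO_URINGLET_EMPTY;
}

static void uringlet_worker_loop(void)
{
	for (;;) {
		switch (submit_batch()) {
		case IO_URINGLET_EMPTY:
			/* nothing queued: a real worker would go idle here */
			printf("SQ empty, worker would idle\n");
			return;
		case IO_URINGLET_SCHEDULED:
			/*
			 * The worker blocked during submission; the SQ head
			 * was already committed on the schedule-out path, so
			 * just pick up the remaining entries next round.
			 */
			printf("scheduled out mid-batch, resuming\n");
			break;
		case IO_URINGLET_INLINE:
			/* batch fully submitted and committed, keep going */
			printf("batch submitted inline\n");
			break;
		}
	}
}

int main(void)
{
	uringlet_worker_loop();
	return 0;
}
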
@@ -64,9 +64,11 @@ int io_uring_alloc_task_context(struct task_struct *task,
 int io_poll_issue(struct io_kiocb *req, bool *locked);
 int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr);
+int io_submit_sqes_let(struct io_wq_work *work);
 int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin);
 void io_free_batch_list(struct io_ring_ctx *ctx, struct io_wq_work_node *node);
 int io_req_prep_async(struct io_kiocb *req);
+void io_uringlet_end(struct io_ring_ctx *ctx);
 struct io_wq_work *io_wq_free_work(struct io_wq_work *work);
 int io_wq_submit_work(struct io_wq_work *work);
@@ -37,12 +37,14 @@ struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx,
 	/* for uringlet, wq->task is the io_uring instance creator */
 	data.task = task;
 	data.free_work = io_wq_free_work;
-	data.do_work = io_wq_submit_work;
 	/* distinguish normal iowq and uringlet by wq->private for now */
-	if (ctx->flags & IORING_SETUP_URINGLET)
+	if (ctx->flags & IORING_SETUP_URINGLET) {
 		data.private = ctx;
-	else
+		data.do_work = io_submit_sqes_let;
+	} else {
 		data.private = NULL;
+		data.do_work = io_wq_submit_work;
+	}
 
 	/* Do QD, or 4 * CPUS, whatever is smallest */
 	concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
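
The hunk above selects io_submit_sqes_let() as the io-wq do_work callback for uringlet rings and keeps io_wq_submit_work() for normal ones, with wq->private as the discriminator. For completeness, a hedged userspace sketch of asking for the mode follows: IORING_SETUP_URINGLET only exists with this series applied, so the flag is taken from the patched <linux/io_uring.h> (guarded by #ifdef here), and an unpatched kernel is expected to reject the unknown setup bit with -EINVAL.

#include <linux/io_uring.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>

static int sys_io_uring_setup(unsigned entries, struct io_uring_params *p)
{
	return (int)syscall(__NR_io_uring_setup, entries, p);
}

int main(void)
{
	struct io_uring_params p;
	int fd;

	memset(&p, 0, sizeof(p));
#ifdef IORING_SETUP_URINGLET
	/* let-mode: io-wq workers fetch and submit SQEs on our behalf */
	p.flags |= IORING_SETUP_URINGLET;
#else
	/* header without the RFC: falls back to a plain ring */
#endif
	fd = sys_io_uring_setup(8, &p);
	if (fd < 0) {
		perror("io_uring_setup");	/* EINVAL if the kernel lacks the flag */
		return 1;
	}
	printf("ring created, fd=%d, flags=0x%x\n", fd, p.flags);
	close(fd);
	return 0;
}

The sketch stops at ring creation; mapping the rings and queuing SQEs would follow the usual io_uring flow, since the worker-side code in this series consumes the standard SQ ring via io_sqring_entries()/io_get_sqe().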