@@ -95,7 +95,8 @@ struct io_uring_task {
struct percpu_counter inflight;
struct { /* task_work */
- struct llist_head task_list;
+ struct io_wq_work_list task_list;
+ spinlock_t task_lock;
struct callback_head task_work;
} ____cacheline_aligned_in_smp;
};
@@ -561,10 +562,7 @@ enum {
typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts);
struct io_task_work {
- union {
- struct io_wq_work_node node;
- struct llist_node llist_node;
- };
+ struct io_wq_work_node node;
io_req_tw_func_t func;
};
@@ -1134,17 +1134,17 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
* If more entries than max_entries are available, stop processing once this
* is reached and return the rest of the list.
*/
-struct llist_node *io_handle_tw_list(struct llist_node *node,
- unsigned int *count,
- unsigned int max_entries)
+struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
+ unsigned int *count,
+ unsigned int max_entries)
{
struct io_ring_ctx *ctx = NULL;
struct io_tw_state ts = { };
do {
- struct llist_node *next = node->next;
+ struct io_wq_work_node *next = node->next;
struct io_kiocb *req = container_of(node, struct io_kiocb,
- io_task_work.llist_node);
+ io_task_work.node);
if (req->ctx != ctx) {
ctx_flush_and_put(ctx, &ts);
@@ -1170,15 +1170,20 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync)
{
- struct llist_node *node = llist_del_all(&tctx->task_list);
struct io_ring_ctx *last_ctx = NULL;
+ struct io_wq_work_node *node;
struct io_kiocb *req;
+ unsigned long flags;
+
+ spin_lock_irqsave(&tctx->task_lock, flags);
+ node = tctx->task_list.first;
+ INIT_WQ_LIST(&tctx->task_list);
+ spin_unlock_irqrestore(&tctx->task_lock, flags);
while (node) {
- unsigned long flags;
bool do_wake;
- req = container_of(node, struct io_kiocb, io_task_work.llist_node);
+ req = container_of(node, struct io_kiocb, io_task_work.node);
node = node->next;
if (sync && last_ctx != req->ctx) {
if (last_ctx) {
@@ -1202,22 +1207,24 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync)
}
}
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
- unsigned int max_entries,
- unsigned int *count)
+struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
+ unsigned int max_entries,
+ unsigned int *count)
{
- struct llist_node *node;
+ struct io_wq_work_node *node;
if (unlikely(current->flags & PF_EXITING)) {
io_fallback_tw(tctx, true);
return NULL;
}
- node = llist_del_all(&tctx->task_list);
- if (node) {
- node = llist_reverse_order(node);
+ spin_lock_irq(&tctx->task_lock);
+ node = tctx->task_list.first;
+ INIT_WQ_LIST(&tctx->task_list);
+ spin_unlock_irq(&tctx->task_lock);
+
+ if (node)
node = io_handle_tw_list(node, count, max_entries);
- }
/* relaxed read is enough as only the task itself sets ->in_cancel */
if (unlikely(atomic_read(&tctx->in_cancel)))
@@ -1229,8 +1236,8 @@ struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
void tctx_task_work(struct callback_head *cb)
{
+ struct io_wq_work_node *ret;
struct io_uring_task *tctx;
- struct llist_node *ret;
unsigned int count = 0;
tctx = container_of(cb, struct io_uring_task, task_work);
@@ -1284,9 +1291,16 @@ static void io_req_normal_work_add(struct io_kiocb *req)
{
struct io_uring_task *tctx = req->task->io_uring;
struct io_ring_ctx *ctx = req->ctx;
+ unsigned long flags;
+ bool was_empty;
+
+ spin_lock_irqsave(&tctx->task_lock, flags);
+ was_empty = wq_list_empty(&tctx->task_list);
+ wq_list_add_tail(&req->io_task_work.node, &tctx->task_list);
+ spin_unlock_irqrestore(&tctx->task_lock, flags);
/* task_work already pending, we're done */
- if (!llist_add(&req->io_task_work.llist_node, &tctx->task_list))
+ if (!was_empty)
return;
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
@@ -87,8 +87,8 @@ void io_req_task_queue(struct io_kiocb *req);
void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts);
void io_req_task_queue_fail(struct io_kiocb *req, int ret);
void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
-struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
+struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node, unsigned int *count, unsigned int max_entries);
+struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
void tctx_task_work(struct callback_head *cb);
__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
int io_uring_alloc_task_context(struct task_struct *task,
@@ -230,7 +230,7 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd)
* than we were asked to process. Newly queued task_work isn't run until the
* retry list has been fully processed.
*/
-static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
+static unsigned int io_sq_tw(struct io_wq_work_node **retry_list, int max_entries)
{
struct io_uring_task *tctx = current->io_uring;
unsigned int count = 0;
@@ -246,11 +246,11 @@ static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
return count;
}
-static bool io_sq_tw_pending(struct llist_node *retry_list)
+static bool io_sq_tw_pending(struct io_wq_work_node *retry_list)
{
struct io_uring_task *tctx = current->io_uring;
- return retry_list || !llist_empty(&tctx->task_list);
+ return retry_list || !wq_list_empty(&tctx->task_list);
}
static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
@@ -266,7 +266,7 @@ static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
static int io_sq_thread(void *data)
{
- struct llist_node *retry_list = NULL;
+ struct io_wq_work_node *retry_list = NULL;
struct io_sq_data *sqd = data;
struct io_ring_ctx *ctx;
struct rusage start;
@@ -86,7 +86,8 @@ __cold int io_uring_alloc_task_context(struct task_struct *task,
atomic_set(&tctx->in_cancel, 0);
atomic_set(&tctx->inflight_tracked, 0);
task->io_uring = tctx;
- init_llist_head(&tctx->task_list);
+ INIT_WQ_LIST(&tctx->task_list);
+ spin_lock_init(&tctx->task_lock);
init_task_work(&tctx->task_work, tctx_task_work);
return 0;
}
This concludes the transition, now all times of task_work are using the same mechanism. Signed-off-by: Jens Axboe <axboe@kernel.dk> --- include/linux/io_uring_types.h | 8 ++---- io_uring/io_uring.c | 50 ++++++++++++++++++++++------------ io_uring/io_uring.h | 4 +-- io_uring/sqpoll.c | 8 +++--- io_uring/tctx.c | 3 +- 5 files changed, 43 insertions(+), 30 deletions(-)