@@ -96,6 +96,9 @@ struct io_wq {
void *private;
+ raw_spinlock_t lock;
+ struct io_worker *owner;
+
struct io_wqe *wqes[];
};
@@ -381,6 +384,8 @@ static inline bool io_worker_test_submit(struct io_worker *worker)
return worker->flags & IO_WORKER_F_SUBMIT;
}
+#define IO_WQ_OWNER_TRANSMIT ((struct io_worker *)-1)
+
static void io_wqe_dec_running(struct io_worker *worker)
{
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
@@ -401,6 +406,10 @@ static void io_wqe_dec_running(struct io_worker *worker)
io_uringlet_end(wq->private);
io_worker_set_scheduled(worker);
+ raw_spin_lock(&wq->lock);
+ wq->owner = IO_WQ_OWNER_TRANSMIT;
+ raw_spin_unlock(&wq->lock);
+
raw_spin_lock(&wqe->lock);
rcu_read_lock();
activated = io_wqe_activate_free_worker(wqe, acct);
@@ -674,6 +683,17 @@ static void io_wqe_worker_let(struct io_worker *worker)
while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
unsigned int empty_count = 0;
+ struct io_worker *owner;
+
+ raw_spin_lock(&wq->lock);
+ owner = wq->owner;
+ if (owner && owner != IO_WQ_OWNER_TRANSMIT && owner != worker) {
+ raw_spin_unlock(&wq->lock);
+ set_current_state(TASK_INTERRUPTIBLE);
+ goto sleep;
+ }
+ wq->owner = worker;
+ raw_spin_unlock(&wq->lock);
__io_worker_busy(wqe, worker);
set_current_state(TASK_INTERRUPTIBLE);
@@ -697,6 +717,7 @@ static void io_wqe_worker_let(struct io_worker *worker)
cond_resched();
} while (1);
+sleep:
raw_spin_lock(&wqe->lock);
__io_worker_idle(wqe, worker);
raw_spin_unlock(&wqe->lock);
@@ -780,6 +801,14 @@ int io_uringlet_offload(struct io_wq *wq)
struct io_wqe_acct *acct = io_get_acct(wqe, true);
bool waken;
+ raw_spin_lock(&wq->lock);
+ if (wq->owner) {
+ raw_spin_unlock(&wq->lock);
+ return 0;
+ }
+ wq->owner = IO_WQ_OWNER_TRANSMIT;
+ raw_spin_unlock(&wq->lock);
+
raw_spin_lock(&wqe->lock);
rcu_read_lock();
waken = io_wqe_activate_free_worker(wqe, acct);
@@ -1248,6 +1277,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
wq->free_work = data->free_work;
wq->do_work = data->do_work;
wq->private = data->private;
+ raw_spin_lock_init(&wq->lock);
ret = -ENOMEM;
for_each_node(node) {