@@ -73,6 +73,7 @@ struct io_wqe_acct {
};
};
unsigned max_works;
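+ /* round-robin cursor used to pick the next fixed worker for private work */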
+ unsigned work_seq;
union {
struct io_wq_work_list work_list;
struct {
@@ -631,9 +632,9 @@ static void io_assign_current_work(struct io_worker *worker,
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
-static void io_worker_handle_work(struct io_worker *worker)
+static void io_worker_handle_work(struct io_worker *worker,
+ struct io_wqe_acct *acct)
{
- struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
@@ -705,19 +706,31 @@ static void io_worker_handle_work(struct io_worker *worker)
} while (1);
}
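+/*
+ * A fixed worker pulls work from its own private list (worker->acct);
+ * all other workers share the per-wqe public list.
+ */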
+static inline void io_worker_handle_private_work(struct io_worker *worker)
+{
+ io_worker_handle_work(worker, &worker->acct);
+}
+
+static inline void io_worker_handle_public_work(struct io_worker *worker)
+{
+ io_worker_handle_work(worker, io_wqe_get_acct(worker));
+}
+
static int io_wqe_worker(void *data)
{
struct io_worker *worker = data;
- struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
- bool last_timeout = false;
+ struct io_wqe_acct *acct =
+ io_get_acct(wqe, worker->flags & IO_WORKER_F_BOUND, false);
bool fixed = worker->flags & IO_WORKER_F_FIXED;
+ bool last_timeout = false;
char buf[TASK_COMM_LEN];
worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
- snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
+ snprintf(buf, sizeof(buf), fixed ? "iou-fix-%d" : "iou-wrk-%d",
+ wq->task->pid);
set_task_comm(current, buf);
audit_alloc_kernel(current);
@@ -729,13 +742,24 @@ static int io_wqe_worker(void *data)
break;
set_current_state(TASK_INTERRUPTIBLE);
- while (!(worker->flags & IO_WORKER_F_EXIT) &&
- io_acct_run_queue(acct))
- io_worker_handle_work(worker);
-
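+ /*
+ * A fixed worker drains its private list first and then helps with
+ * the public list; non-fixed workers only handle public work.
+ */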
+ if (fixed) {
+ while (io_acct_run_queue(&worker->acct))
+ io_worker_handle_private_work(worker);
+ if (io_acct_run_queue(acct))
+ io_worker_handle_public_work(worker);
+ } else {
+ while (io_acct_run_queue(acct))
+ io_worker_handle_public_work(worker);
+ }
raw_spin_lock(&wqe->lock);
- /* timed out, exit unless we're the last worker */
- if (last_timeout && acct->nr_workers > 1) {
+ /*
+ * Timed out. A worker will exit only if it is:
+ * - not a fixed worker
+ * - not the last non-fixed worker
+ *
+ * The second condition exists because we need at least one worker
+ * left to handle the public work list.
+ */
+ if (last_timeout && !fixed && acct->nr_workers > 1) {
acct->nr_workers--;
raw_spin_unlock(&wqe->lock);
__set_current_state(TASK_RUNNING);
@@ -761,10 +785,18 @@ static int io_wqe_worker(void *data)
last_timeout = !ret;
}
- if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
- io_worker_handle_work(worker);
- if (fixed)
+ if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && !fixed)
+ io_worker_handle_public_work(worker);
+ if (fixed) {
io_fixed_worker_exit(worker);
+ /*
+ * Check and handle the private work list one more time to
+ * avoid racing with private work insertion.
+ * TODO: an alternative would be to move any remaining work
+ * to the public work list.
+ */
+ io_worker_handle_private_work(worker);
+ }
audit_free(current);
io_worker_exit(worker);
@@ -1008,9 +1040,9 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
} while (work);
}
-static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
+static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work,
+ struct io_wqe_acct *acct)
{
- struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
unsigned int hash;
struct io_wq_work *tail;
@@ -1029,6 +1061,45 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}
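+/*
+ * Try to queue @work on a fixed worker's private list. Fixed workers are
+ * chosen round-robin via acct->work_seq. Returns false (so the caller can
+ * fall back to the public list) if no fixed worker is registered, the
+ * chosen worker is exiting, or its private list is already full.
+ */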
+static bool io_wqe_insert_private_work(struct io_wqe *wqe,
+ struct io_wq_work *work,
+ struct io_wqe_acct *acct)
+{
+ unsigned int nr_fixed;
+ struct io_worker *fixed_worker;
+ struct io_wqe_acct *iw_acct;
+ unsigned int fixed_worker_index;
+
+ raw_spin_lock(&acct->lock);
+ nr_fixed = acct->nr_fixed;
+ if (!nr_fixed) {
+ raw_spin_unlock(&acct->lock);
+ return false;
+ }
+
+ fixed_worker_index = (acct->work_seq++) % nr_fixed;
+ fixed_worker = acct->fixed_workers[fixed_worker_index];
+ if (!fixed_worker || fixed_worker->flags & IO_WORKER_F_EXIT) {
+ raw_spin_unlock(&acct->lock);
+ return false;
+ }
+ iw_acct = &fixed_worker->acct;
+
+ raw_spin_lock(&iw_acct->lock);
+ if (iw_acct->nr_works < iw_acct->max_works) {
+ io_wqe_insert_work(wqe, work, iw_acct);
+ iw_acct->nr_works++;
+ raw_spin_unlock(&iw_acct->lock);
+ wake_up_process(fixed_worker->task);
+ raw_spin_unlock(&acct->lock);
+ return true;
+ }
+ raw_spin_unlock(&iw_acct->lock);
+ raw_spin_unlock(&acct->lock);
+
+ return false;
+}
+
static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
return work == data;
@@ -1037,6 +1108,7 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
+ struct io_wqe_acct *fixed_acct;
struct io_cb_cancel_data match;
unsigned work_flags = work->flags;
bool do_create;
@@ -1051,8 +1123,14 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
return;
}
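+ /*
+ * Try a registered fixed worker's private list first; hashed work
+ * always stays on the public list.
+ */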
+ fixed_acct = io_get_acct(wqe, !acct->index, true);
+ if (fixed_acct->fixed_worker_registered && !io_wq_is_hashed(work)) {
+ if (io_wqe_insert_private_work(wqe, work, fixed_acct))
+ return;
+ }
+
raw_spin_lock(&acct->lock);
- io_wqe_insert_work(wqe, work);
+ io_wqe_insert_work(wqe, work, acct);
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&acct->lock);
@@ -1138,9 +1216,9 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
static inline void io_wqe_remove_pending(struct io_wqe *wqe,
struct io_wq_work *work,
- struct io_wq_work_node *prev)
+ struct io_wq_work_node *prev,
+ struct io_wqe_acct *acct)
{
- struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
unsigned int hash = io_get_work_hash(work);
struct io_wq_work *prev_work = NULL;
@@ -1167,7 +1245,7 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
work = container_of(node, struct io_wq_work, list);
if (!match->fn(work, match->data))
continue;
- io_wqe_remove_pending(wqe, work, prev);
+ io_wqe_remove_pending(wqe, work, prev, acct);
raw_spin_unlock(&acct->lock);
io_run_cancel(work, wqe);
match->nr_pending++;
@@ -1182,17 +1260,35 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
struct io_cb_cancel_data *match)
{
- int i;
-retry:
- for (i = 0; i < IO_WQ_ACCT_NR; i++) {
- struct io_wqe_acct *acct = io_get_acct(wqe, i == 0, false);
+ int i, j;
+ struct io_wqe_acct *acct, *iw_acct;
+retry_public:
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ acct = io_get_acct(wqe, i == 0, false);
if (io_acct_cancel_pending_work(wqe, acct, match)) {
if (match->cancel_all)
- goto retry;
- break;
+ goto retry_public;
+ return;
}
}
+
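+ /* also cancel matching work queued on each fixed worker's private list */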
+retry_private:
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ acct = io_get_acct(wqe, i == 0, true);
+ raw_spin_lock(&acct->lock);
+ for (j = 0; j < acct->nr_fixed; j++) {
+ iw_acct = &acct->fixed_workers[j]->acct;
+ if (io_acct_cancel_pending_work(wqe, iw_acct, match)) {
+ if (match->cancel_all) {
+ raw_spin_unlock(&acct->lock);
+ goto retry_private;
+ }
+ break;
+ }
+ }
+ raw_spin_unlock(&acct->lock);
+ }
}
static void io_wqe_cancel_running_work(struct io_wqe *wqe,