@@ -616,22 +616,33 @@ static bool __sbq_wake_up(struct sbitmap_queue *sbq)
return false;
wait_cnt = atomic_dec_return(&ws->wait_cnt);
- if (wait_cnt > 0)
- return false;
-
/*
* For concurrent callers of this, callers should call this function
* again to wakeup a new batch on a different 'ws'.
*/
- if (wait_cnt < 0)
+ if (wait_cnt < 0 || !waitqueue_active(&ws->wait))
return true;
+ if (wait_cnt > 0)
+ return false;
+
wake_batch = READ_ONCE(sbq->wake_batch);
+ /*
+ * Wake up first in case concurrent callers decrease wait_cnt
+ * while the waitqueue is empty.
+ */
+ wake_up_nr(&ws->wait, wake_batch);
+
/*
* Pairs with the memory barrier in sbitmap_queue_resize() to
* ensure that we see the batch size update before the wait
* count is reset.
+ *
+ * Also pairs with the implicit barrier between decrementing wait_cnt
+ * and checking for waitqueue_active() to make sure waitqueue_active()
+ * sees the result of the wakeup if atomic_dec_return() has seen the result
+ * of atomic_set().
*/
smp_mb__before_atomic();
@@ -642,7 +653,6 @@ static bool __sbq_wake_up(struct sbitmap_queue *sbq)
*/
sbq_index_atomic_inc(&sbq->wake_index);
atomic_set(&ws->wait_cnt, wake_batch);
- wake_up_nr(&ws->wait, wake_batch);
 
return false;
}
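
For reference, below is a sketch of how __sbq_wake_up() reads with both hunks applied. Only the lines inside the hunks are taken from this patch; the declarations at the top of the function, the sbq_wake_ptr() prologue, and the comment body above sbq_index_atomic_inc() sit outside the diff context and are reconstructed here as assumptions, so treat this as a review aid rather than an exact copy of the resulting file:

static bool __sbq_wake_up(struct sbitmap_queue *sbq)
{
	struct sbq_wait_state *ws;
	unsigned int wake_batch;
	int wait_cnt;

	/* Reconstructed prologue: pick the wait queue to wake, bail out if none. */
	ws = sbq_wake_ptr(sbq);
	if (!ws)
		return false;

	wait_cnt = atomic_dec_return(&ws->wait_cnt);
	/*
	 * For concurrent callers of this, callers should call this function
	 * again to wakeup a new batch on a different 'ws'.
	 */
	if (wait_cnt < 0 || !waitqueue_active(&ws->wait))
		return true;
	if (wait_cnt > 0)
		return false;

	wake_batch = READ_ONCE(sbq->wake_batch);
	/*
	 * Wake up first in case concurrent callers decrease wait_cnt
	 * while the waitqueue is empty.
	 */
	wake_up_nr(&ws->wait, wake_batch);

	/*
	 * Pairs with the memory barrier in sbitmap_queue_resize() to
	 * ensure that we see the batch size update before the wait
	 * count is reset.
	 *
	 * Also pairs with the implicit barrier between decrementing wait_cnt
	 * and checking for waitqueue_active() to make sure waitqueue_active()
	 * sees the result of the wakeup if atomic_dec_return() has seen the
	 * result of atomic_set().
	 */
	smp_mb__before_atomic();

	/*
	 * (Unchanged comment body elided; it lies between the two hunks and
	 * is not part of this diff.)
	 */
	sbq_index_atomic_inc(&sbq->wake_index);
	atomic_set(&ws->wait_cnt, wake_batch);

	return false;
}

The net effect of the reordering is then easy to see: wake_up_nr() now runs before wait_cnt is reset via atomic_set(), and the wait_cnt < 0 / empty-waitqueue check returns early before any reset is attempted, as the added comments describe.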