@@ -239,7 +239,19 @@ EXPORT_SYMBOL_GPL(sbitmap_queue_init_node);
void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth)
{
- sbq->wake_batch = sbq_calc_wake_batch(depth);
+ unsigned int wake_batch = sbq_calc_wake_batch(depth);
+ int i;
+
+ if (sbq->wake_batch != wake_batch) {
+ WRITE_ONCE(sbq->wake_batch, wake_batch);
+ /*
+ * Pairs with the memory barrier in sbq_wake_up() to ensure that
+ * the batch size is updated before the wait counts.
+ */
+ smp_mb__before_atomic();
+ for (i = 0; i < SBQ_WAIT_QUEUES; i++)
+ atomic_set(&sbq->ws[i].wait_cnt, 1);
+ }
sbitmap_resize(&sbq->sb, depth);
}
EXPORT_SYMBOL_GPL(sbitmap_queue_resize);
@@ -297,6 +309,7 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
static void sbq_wake_up(struct sbitmap_queue *sbq)
{
struct sbq_wait_state *ws;
+ unsigned int wake_batch;
int wait_cnt;
/*
@@ -313,10 +326,22 @@ static void sbq_wake_up(struct sbitmap_queue *sbq)
return;
wait_cnt = atomic_dec_return(&ws->wait_cnt);
- if (unlikely(wait_cnt < 0))
- wait_cnt = atomic_inc_return(&ws->wait_cnt);
- if (wait_cnt == 0) {
- atomic_add(sbq->wake_batch, &ws->wait_cnt);
+ if (wait_cnt <= 0) {
+ wake_batch = READ_ONCE(sbq->wake_batch);
+ /*
+ * Pairs with the memory barrier in sbitmap_queue_resize() to
+ * ensure that we see the batch size update before the wait
+ * count is reset.
+ */
+ smp_mb__before_atomic();
+ /*
+ * If there are concurrent callers to sbq_wake_up(), the last
+ * one to decrement the wait count below zero will bump it back
+ * up. If there is a concurrent resize, the count reset will
+ * either cause the cmpxchg to fail or overwrite after the
+ * cmpxchg.
+ */
+ atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wait_cnt + wake_batch);
sbq_index_atomic_inc(&sbq->wake_index);
wake_up(&ws->wait);
}