diff mbox series

[-next,RFC,v2,3/8] sbitmap: make sure waitqueues are balanced

Message ID 20220408073916.1428590-4-yukuai3@huawei.com (mailing list archive)
State New, archived
Headers show
Series improve tag allocation under heavy load | expand

Commit Message

Yu Kuai April 8, 2022, 7:39 a.m. UTC
Currently, same waitqueue might be woken up continuously:

__sbq_wake_up		__sbq_wake_up
 sbq_wake_ptr -> assume	0
			 sbq_wake_ptr -> 0
 atomic_dec_return
			atomic_dec_return
 atomic_cmpxchg -> succeed
			 atomic_cmpxchg -> failed
			  return true

			__sbq_wake_up
			 sbq_wake_ptr
			  atomic_read(&sbq->wake_index) -> still 0
 sbq_index_atomic_inc -> inc to 1
			  if (waitqueue_active(&ws->wait))
			   if (wake_index != atomic_read(&sbq->wake_index))
			    atomic_set -> reset from 1 to 0
 wake_up_nr -> wake up first waitqueue
			    // continue to wake up in first waitqueue

What's worse, io hung is possible in theory because wake up might be
missed. For example, 2 * wake_batch tags are put, while only wake_batch
threads are worken:

__sbq_wake_up
 atomic_cmpxchg -> reset wait_cnt
			__sbq_wake_up -> decrease wait_cnt
			...
			__sbq_wake_up -> wait_cnt is decreased to 0 again
			 atomic_cmpxchg
			 sbq_index_atomic_inc -> increase wake_index
			 wake_up_nr -> wake up and waitqueue might be empty
 sbq_index_atomic_inc -> increase again, one waitqueue is skipped
 wake_up_nr -> invalid wake up because old wakequeue might be empty

To fix the problem, refactor to make sure waitqueues will be woken up
one by one, and also choose the next waitqueue by the number of threads
that are waiting to keep waitqueues balanced.

Test cmd: nr_requests is 64, and queue_depth is 32
[global]
filename=/dev/sda
ioengine=libaio
direct=1
allow_mounted_write=0
group_reporting

[test]
rw=randwrite
bs=4k
numjobs=512
iodepth=2

Before this patch, waitqueues can be extremly unbalanced, for example:
ws_active=484
ws={
        {.wait_cnt=8, .waiters_cnt=117},
        {.wait_cnt=8, .waiters_cnt=59},
        {.wait_cnt=8, .waiters_cnt=76},
        {.wait_cnt=8, .waiters_cnt=0},
        {.wait_cnt=5, .waiters_cnt=24},
        {.wait_cnt=8, .waiters_cnt=12},
        {.wait_cnt=8, .waiters_cnt=21},
        {.wait_cnt=8, .waiters_cnt=175},
}

With this patch, waitqueues is always balanced, for example:
ws_active=477
ws={
        {.wait_cnt=8, .waiters_cnt=59},
        {.wait_cnt=6, .waiters_cnt=62},
        {.wait_cnt=8, .waiters_cnt=61},
        {.wait_cnt=8, .waiters_cnt=60},
        {.wait_cnt=8, .waiters_cnt=63},
        {.wait_cnt=8, .waiters_cnt=56},
        {.wait_cnt=8, .waiters_cnt=59},
        {.wait_cnt=8, .waiters_cnt=57},
}

Signed-off-by: Yu Kuai <yukuai3@huawei.com>
---
 lib/sbitmap.c | 81 ++++++++++++++++++++++++++-------------------------
 1 file changed, 42 insertions(+), 39 deletions(-)

Comments

Li, Ming4 April 15, 2022, 6:31 a.m. UTC | #1
On 4/8/2022 3:39 PM, Yu Kuai wrote:
> Currently, same waitqueue might be woken up continuously:
> 
> __sbq_wake_up		__sbq_wake_up
>  sbq_wake_ptr -> assume	0
> 			 sbq_wake_ptr -> 0
>  atomic_dec_return
> 			atomic_dec_return
>  atomic_cmpxchg -> succeed
> 			 atomic_cmpxchg -> failed
> 			  return true
> 
> 			__sbq_wake_up
> 			 sbq_wake_ptr
> 			  atomic_read(&sbq->wake_index) -> still 0
>  sbq_index_atomic_inc -> inc to 1
> 			  if (waitqueue_active(&ws->wait))
> 			   if (wake_index != atomic_read(&sbq->wake_index))
> 			    atomic_set -> reset from 1 to 0
>  wake_up_nr -> wake up first waitqueue
> 			    // continue to wake up in first waitqueue
> 
> What's worse, io hung is possible in theory because wake up might be
> missed. For example, 2 * wake_batch tags are put, while only wake_batch
> threads are worken:
> 
> __sbq_wake_up
>  atomic_cmpxchg -> reset wait_cnt
> 			__sbq_wake_up -> decrease wait_cnt
> 			...
> 			__sbq_wake_up -> wait_cnt is decreased to 0 again
> 			 atomic_cmpxchg
> 			 sbq_index_atomic_inc -> increase wake_index
> 			 wake_up_nr -> wake up and waitqueue might be empty
>  sbq_index_atomic_inc -> increase again, one waitqueue is skipped
>  wake_up_nr -> invalid wake up because old wakequeue might be empty
> 
> To fix the problem, refactor to make sure waitqueues will be woken up
> one by one, and also choose the next waitqueue by the number of threads
> that are waiting to keep waitqueues balanced.
Hi, do you think that updating wake_index before atomic_cmpxchg(ws->wait_cnt) also can solve these two problems?
like this:
__sbq_wake_up()
{
	....
	if (wait_cnt <= 0) {
		ret = atomic_cmpxchg(sbq->wake_index, old_wake_index, next_wake_index);
		if (ret == old_wake_index) {
			ret = atomic_cmpxchg(ws->wait_cnt, wait_cnt, wake_batch);
			if (ret == wait_cnt)
				wake_up_nr(ws->wait, wake_batch);
		}
	}
}

Your solution is picking the waitqueue with the largest waiters_cnt as the next one to be waked up, I think that waitqueue is possible to starve.
if lots of threads in a same waitqueue stop waiting before sbq wakes them up, it will cause the waiters_cnt of waitqueue is much less than others, looks like sbq_update_wake_index() would never pick this waitqueue. What do you think? is it possible?


> 
> Test cmd: nr_requests is 64, and queue_depth is 32
> [global]
> filename=/dev/sda
> ioengine=libaio
> direct=1
> allow_mounted_write=0
> group_reporting
> 
> [test]
> rw=randwrite
> bs=4k
> numjobs=512
> iodepth=2
> 
> Before this patch, waitqueues can be extremly unbalanced, for example:
> ws_active=484
> ws={
>         {.wait_cnt=8, .waiters_cnt=117},
>         {.wait_cnt=8, .waiters_cnt=59},
>         {.wait_cnt=8, .waiters_cnt=76},
>         {.wait_cnt=8, .waiters_cnt=0},
>         {.wait_cnt=5, .waiters_cnt=24},
>         {.wait_cnt=8, .waiters_cnt=12},
>         {.wait_cnt=8, .waiters_cnt=21},
>         {.wait_cnt=8, .waiters_cnt=175},
> }
> 
> With this patch, waitqueues is always balanced, for example:
> ws_active=477
> ws={
>         {.wait_cnt=8, .waiters_cnt=59},
>         {.wait_cnt=6, .waiters_cnt=62},
>         {.wait_cnt=8, .waiters_cnt=61},
>         {.wait_cnt=8, .waiters_cnt=60},
>         {.wait_cnt=8, .waiters_cnt=63},
>         {.wait_cnt=8, .waiters_cnt=56},
>         {.wait_cnt=8, .waiters_cnt=59},
>         {.wait_cnt=8, .waiters_cnt=57},
> }
> 
> Signed-off-by: Yu Kuai <yukuai3@huawei.com>
> ---
>  lib/sbitmap.c | 81 ++++++++++++++++++++++++++-------------------------
>  1 file changed, 42 insertions(+), 39 deletions(-)
> 
> diff --git a/lib/sbitmap.c b/lib/sbitmap.c
> index 393f2b71647a..176fba0252d7 100644
> --- a/lib/sbitmap.c
> +++ b/lib/sbitmap.c
> @@ -575,68 +575,71 @@ void sbitmap_queue_min_shallow_depth(struct sbitmap_queue *sbq,
>  }
>  EXPORT_SYMBOL_GPL(sbitmap_queue_min_shallow_depth);
>  
> -static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
> +/* always choose the 'ws' with the max waiters */
> +static void sbq_update_wake_index(struct sbitmap_queue *sbq,
> +				  int old_wake_index)
>  {
> -	int i, wake_index;
> +	int index, wake_index;
> +	int max_waiters = 0;
>  
> -	if (!atomic_read(&sbq->ws_active))
> -		return NULL;
> +	if (old_wake_index != atomic_read(&sbq->wake_index))
> +		return;
>  
> -	wake_index = atomic_read(&sbq->wake_index);
> -	for (i = 0; i < SBQ_WAIT_QUEUES; i++) {
> -		struct sbq_wait_state *ws = &sbq->ws[wake_index];
> +	for (wake_index = 0; wake_index < SBQ_WAIT_QUEUES; wake_index++) {
> +		struct sbq_wait_state *ws;
> +		int waiters;
>  
> -		if (waitqueue_active(&ws->wait)) {
> -			if (wake_index != atomic_read(&sbq->wake_index))
> -				atomic_set(&sbq->wake_index, wake_index);
> -			return ws;
> -		}
> +		if (wake_index == old_wake_index)
> +			continue;
>  
> -		wake_index = sbq_index_inc(wake_index);
> +		ws = &sbq->ws[wake_index];
> +		waiters = atomic_read(&ws->waiters_cnt);
> +		if (waiters > max_waiters) {
> +			max_waiters = waiters;
> +			index = wake_index;
> +		}
>  	}
>  
> -	return NULL;
> +	if (max_waiters)
> +		atomic_cmpxchg(&sbq->wake_index, old_wake_index, index);
>  }
>  
>  static bool __sbq_wake_up(struct sbitmap_queue *sbq)
>  {
>  	struct sbq_wait_state *ws;
>  	unsigned int wake_batch;
> -	int wait_cnt;
> +	int wait_cnt, wake_index;
>  
> -	ws = sbq_wake_ptr(sbq);
> -	if (!ws)
> +	if (!atomic_read(&sbq->ws_active))
>  		return false;
>  
> +	wake_index = atomic_read(&sbq->wake_index);
> +	ws = &sbq->ws[wake_index];
>  	wait_cnt = atomic_dec_return(&ws->wait_cnt);
> -	if (wait_cnt <= 0) {
> -		int ret;
> -
> -		wake_batch = READ_ONCE(sbq->wake_batch);
> -
> -		/*
> -		 * Pairs with the memory barrier in sbitmap_queue_resize() to
> -		 * ensure that we see the batch size update before the wait
> -		 * count is reset.
> -		 */
> -		smp_mb__before_atomic();
> -
> +	if (wait_cnt > 0) {
> +		return false;
> +	} else if (wait_cnt < 0) {
>  		/*
> -		 * For concurrent callers of this, the one that failed the
> -		 * atomic_cmpxhcg() race should call this function again
> +		 * Concurrent callers should call this function again
>  		 * to wakeup a new batch on a different 'ws'.
>  		 */
> -		ret = atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wake_batch);
> -		if (ret == wait_cnt) {
> -			sbq_index_atomic_inc(&sbq->wake_index);
> -			wake_up_nr(&ws->wait, wake_batch);
> -			return false;
> -		}
> -
> +		sbq_update_wake_index(sbq, wake_index);
>  		return true;
>  	}
>  
> -	return false;
> +	sbq_update_wake_index(sbq, wake_index);
> +	wake_batch = READ_ONCE(sbq->wake_batch);
> +
> +	/*
> +	 * Pairs with the memory barrier in sbitmap_queue_resize() to
> +	 * ensure that we see the batch size update before the wait
> +	 * count is reset.
> +	 */
> +	smp_mb__before_atomic();
> +	atomic_set(&ws->wait_cnt, wake_batch);
> +	wake_up_nr(&ws->wait, wake_batch);
> +
> +	return true;
>  }
>  
>  void sbitmap_queue_wake_up(struct sbitmap_queue *sbq)
Yu Kuai April 15, 2022, 7:07 a.m. UTC | #2
在 2022/04/15 14:31, Li, Ming 写道:
> 
> 
> On 4/8/2022 3:39 PM, Yu Kuai wrote:
>> Currently, same waitqueue might be woken up continuously:
>>
>> __sbq_wake_up		__sbq_wake_up
>>   sbq_wake_ptr -> assume	0
>> 			 sbq_wake_ptr -> 0
>>   atomic_dec_return
>> 			atomic_dec_return
>>   atomic_cmpxchg -> succeed
>> 			 atomic_cmpxchg -> failed
>> 			  return true
>>
>> 			__sbq_wake_up
>> 			 sbq_wake_ptr
>> 			  atomic_read(&sbq->wake_index) -> still 0
>>   sbq_index_atomic_inc -> inc to 1
>> 			  if (waitqueue_active(&ws->wait))
>> 			   if (wake_index != atomic_read(&sbq->wake_index))
>> 			    atomic_set -> reset from 1 to 0
>>   wake_up_nr -> wake up first waitqueue
>> 			    // continue to wake up in first waitqueue
>>
>> What's worse, io hung is possible in theory because wake up might be
>> missed. For example, 2 * wake_batch tags are put, while only wake_batch
>> threads are worken:
>>
>> __sbq_wake_up
>>   atomic_cmpxchg -> reset wait_cnt
>> 			__sbq_wake_up -> decrease wait_cnt
>> 			...
>> 			__sbq_wake_up -> wait_cnt is decreased to 0 again
>> 			 atomic_cmpxchg
>> 			 sbq_index_atomic_inc -> increase wake_index
>> 			 wake_up_nr -> wake up and waitqueue might be empty
>>   sbq_index_atomic_inc -> increase again, one waitqueue is skipped
>>   wake_up_nr -> invalid wake up because old wakequeue might be empty
>>
>> To fix the problem, refactor to make sure waitqueues will be woken up
>> one by one, and also choose the next waitqueue by the number of threads
>> that are waiting to keep waitqueues balanced.
> Hi, do you think that updating wake_index before atomic_cmpxchg(ws->wait_cnt) also can solve these two problems?
> like this:
Hi,

The first problem is due to sbq_wake_ptr() is using atomic_set() to
update 'wake_index'.

The second problem is due to __sbq_wake_up() is updating 'wait_cnt'
before 'wait_index'.

> __sbq_wake_up()
> {
> 	....
> 	if (wait_cnt <= 0) {
> 		ret = atomic_cmpxchg(sbq->wake_index, old_wake_index, next_wake_index);
How is the 'next_wake_index' chosen? And the same in sbq_wake_ptr().
> 		if (ret == old_wake_index) {
> 			ret = atomic_cmpxchg(ws->wait_cnt, wait_cnt, wake_batch);

If this failed, just return true with 'wake_index' updated? Then the
caller will call this again, so it seems this can't prevent 'wake_index'
updated multiple times, and 'wait_cnt' in the old 'ws' is not updated.
> 			if (ret == wait_cnt)
> 				wake_up_nr(ws->wait, wake_batch);
> 		}
> 	}
> }
> 
> Your solution is picking the waitqueue with the largest waiters_cnt as the next one to be waked up, I think that waitqueue is possible to starve.
> if lots of threads in a same waitqueue stop waiting before sbq wakes them up, it will cause the waiters_cnt of waitqueue is much less than others, looks like sbq_update_wake_index() would never pick this waitqueue. What do you think? is it possible?

It will be possible if adding threads to waitqueues is not balanced, and
I suppose it's not possible after tag premmption is disabled. However,
instead of chosing the waitqueue with largest waiters_cnt, chosing the
next waitqueue with 'waiters_cnt > 0' might be alternative.

Thanks,
Kuai
diff mbox series

Patch

diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index 393f2b71647a..176fba0252d7 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -575,68 +575,71 @@  void sbitmap_queue_min_shallow_depth(struct sbitmap_queue *sbq,
 }
 EXPORT_SYMBOL_GPL(sbitmap_queue_min_shallow_depth);
 
-static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
+/* always choose the 'ws' with the max waiters */
+static void sbq_update_wake_index(struct sbitmap_queue *sbq,
+				  int old_wake_index)
 {
-	int i, wake_index;
+	int index, wake_index;
+	int max_waiters = 0;
 
-	if (!atomic_read(&sbq->ws_active))
-		return NULL;
+	if (old_wake_index != atomic_read(&sbq->wake_index))
+		return;
 
-	wake_index = atomic_read(&sbq->wake_index);
-	for (i = 0; i < SBQ_WAIT_QUEUES; i++) {
-		struct sbq_wait_state *ws = &sbq->ws[wake_index];
+	for (wake_index = 0; wake_index < SBQ_WAIT_QUEUES; wake_index++) {
+		struct sbq_wait_state *ws;
+		int waiters;
 
-		if (waitqueue_active(&ws->wait)) {
-			if (wake_index != atomic_read(&sbq->wake_index))
-				atomic_set(&sbq->wake_index, wake_index);
-			return ws;
-		}
+		if (wake_index == old_wake_index)
+			continue;
 
-		wake_index = sbq_index_inc(wake_index);
+		ws = &sbq->ws[wake_index];
+		waiters = atomic_read(&ws->waiters_cnt);
+		if (waiters > max_waiters) {
+			max_waiters = waiters;
+			index = wake_index;
+		}
 	}
 
-	return NULL;
+	if (max_waiters)
+		atomic_cmpxchg(&sbq->wake_index, old_wake_index, index);
 }
 
 static bool __sbq_wake_up(struct sbitmap_queue *sbq)
 {
 	struct sbq_wait_state *ws;
 	unsigned int wake_batch;
-	int wait_cnt;
+	int wait_cnt, wake_index;
 
-	ws = sbq_wake_ptr(sbq);
-	if (!ws)
+	if (!atomic_read(&sbq->ws_active))
 		return false;
 
+	wake_index = atomic_read(&sbq->wake_index);
+	ws = &sbq->ws[wake_index];
 	wait_cnt = atomic_dec_return(&ws->wait_cnt);
-	if (wait_cnt <= 0) {
-		int ret;
-
-		wake_batch = READ_ONCE(sbq->wake_batch);
-
-		/*
-		 * Pairs with the memory barrier in sbitmap_queue_resize() to
-		 * ensure that we see the batch size update before the wait
-		 * count is reset.
-		 */
-		smp_mb__before_atomic();
-
+	if (wait_cnt > 0) {
+		return false;
+	} else if (wait_cnt < 0) {
 		/*
-		 * For concurrent callers of this, the one that failed the
-		 * atomic_cmpxhcg() race should call this function again
+		 * Concurrent callers should call this function again
 		 * to wakeup a new batch on a different 'ws'.
 		 */
-		ret = atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wake_batch);
-		if (ret == wait_cnt) {
-			sbq_index_atomic_inc(&sbq->wake_index);
-			wake_up_nr(&ws->wait, wake_batch);
-			return false;
-		}
-
+		sbq_update_wake_index(sbq, wake_index);
 		return true;
 	}
 
-	return false;
+	sbq_update_wake_index(sbq, wake_index);
+	wake_batch = READ_ONCE(sbq->wake_batch);
+
+	/*
+	 * Pairs with the memory barrier in sbitmap_queue_resize() to
+	 * ensure that we see the batch size update before the wait
+	 * count is reset.
+	 */
+	smp_mb__before_atomic();
+	atomic_set(&ws->wait_cnt, wake_batch);
+	wake_up_nr(&ws->wait, wake_batch);
+
+	return true;
 }
 
 void sbitmap_queue_wake_up(struct sbitmap_queue *sbq)