Message ID | 20220803121504.212071-1-yukuai1@huaweicloud.com (mailing list archive)
---|---
State | New, archived
Series | sbitmap: fix possible io hung due to lost wakeup
On 2022/08/03 20:15, Yu Kuai wrote:
> From: Yu Kuai <yukuai3@huawei.com>
>
> There are two problems that can lead to lost wakeup:
>
> 1) invalid wakeup on the wrong waitqueue:
>
> For example, 2 * wake_batch tags are put, while only wake_batch threads
> are woken:
>
> __sbq_wake_up
>  atomic_cmpxchg -> reset wait_cnt
>                         __sbq_wake_up -> decrease wait_cnt
>                         ...
>                         __sbq_wake_up -> wait_cnt is decreased to 0 again
>                          atomic_cmpxchg
>                          sbq_index_atomic_inc -> increase wake_index
>                          wake_up_nr -> wake up and waitqueue might be empty
>  sbq_index_atomic_inc -> increase again, one waitqueue is skipped
>  wake_up_nr -> invalid wake up because the old waitqueue might be empty
>
> To fix the problem, increase 'wake_index' before resetting 'wait_cnt'.
>
> 2) 'wait_cnt' can be decreased while the waitqueue is empty
>
> As pointed out by Jan Kara, the following race is possible:
>
> CPU1                            CPU2
> __sbq_wake_up                   __sbq_wake_up
>  sbq_wake_ptr()                  sbq_wake_ptr() -> the same
>  wait_cnt = atomic_dec_return()
>  /* decreased to 0 */
>  sbq_index_atomic_inc()
>  /* move to next waitqueue */
>  atomic_set()
>  /* reset wait_cnt */
>  wake_up_nr()
>  /* wake up on the old waitqueue */
>                                  wait_cnt = atomic_dec_return()
>                                  /*
>                                   * decrease wait_cnt in the old
>                                   * waitqueue, while it can be
>                                   * empty.
>                                   */
>
> Fix the problem by waking up before updating 'wake_index' and
> 'wait_cnt'.
>
> Note that with this patch 'wait_cnt' is still decreased in the old
> empty waitqueue; however, the wakeup is redirected to an active waitqueue,
> and the extra decrement on the old empty waitqueue is not handled.

friendly ping ...

>
> Fixes: 88459642cba4 ("blk-mq: abstract tag allocation out into sbitmap library")
> Signed-off-by: Yu Kuai <yukuai3@huawei.com>
> Reviewed-by: Jan Kara <jack@suse.cz>
> ---
> Changes in official version:
>  - fix spelling mistake in comments
>  - add review tag
> Changes in rfc v4:
>  - remove patch 1, which improved fairness with overhead
>  - merge patch 2 and patch 3
> Changes in rfc v3:
>  - rename patch 2, and add some comments.
>  - add patch 3, which fixes a new issue pointed out by Jan Kara.
> Changes in rfc v2:
>  - split into separate patches for the different problems.
>  - add fix tag
>
> previous versions:
> rfc v1: https://lore.kernel.org/all/20220617141125.3024491-1-yukuai3@huawei.com/
> rfc v2: https://lore.kernel.org/all/20220619080309.1630027-1-yukuai3@huawei.com/
> rfc v3: https://lore.kernel.org/all/20220710042200.20936-1-yukuai1@huaweicloud.com/
> rfc v4: https://lore.kernel.org/all/20220723024122.2990436-1-yukuai1@huaweicloud.com/
>
>  lib/sbitmap.c | 55 ++++++++++++++++++++++++++++++---------------------
>  1 file changed, 33 insertions(+), 22 deletions(-)
>
> diff --git a/lib/sbitmap.c b/lib/sbitmap.c
> [...]
On Wed, 3 Aug 2022 20:15:04 +0800, Yu Kuai wrote:
> From: Yu Kuai <yukuai3@huawei.com>
>
> There are two problems that can lead to lost wakeup:
>
> 1) invalid wakeup on the wrong waitqueue:
>
> For example, 2 * wake_batch tags are put, while only wake_batch threads
> are woken:
>
> [...]

Applied, thanks!

[1/1] sbitmap: fix possible io hung due to lost wakeup
      commit: 040b83fcecfb86f3225d3a5de7fd9b3fbccf83b4

Best regards,
On Wed, Aug 03, 2022 at 08:15:04PM +0800, Yu Kuai wrote:
>  	wait_cnt = atomic_dec_return(&ws->wait_cnt);
> -	if (wait_cnt <= 0) {
> -		int ret;
> +	/*
> +	 * For concurrent callers of this, callers should call this function
> +	 * again to wakeup a new batch on a different 'ws'.
> +	 */
> +	if (wait_cnt < 0 || !waitqueue_active(&ws->wait))
> +		return true;

If wait_cnt is '0', but the waitqueue_active happens to be false due to racing
with add_wait_queue(), this returns true so the caller will retry. The next
atomic_dec will set the current waitstate wait_cnt < 0, which also forces an
early return true. When does the wake up happen, or wait_cnt and wait_index get
updated in that case?

> [...]
Hi,

On 2022/09/07 5:27, Keith Busch wrote:
> On Wed, Aug 03, 2022 at 08:15:04PM +0800, Yu Kuai wrote:
>>  	wait_cnt = atomic_dec_return(&ws->wait_cnt);
>> -	if (wait_cnt <= 0) {
>> -		int ret;
>> +	/*
>> +	 * For concurrent callers of this, callers should call this function
>> +	 * again to wakeup a new batch on a different 'ws'.
>> +	 */
>> +	if (wait_cnt < 0 || !waitqueue_active(&ws->wait))
>> +		return true;
>
> If wait_cnt is '0', but the waitqueue_active happens to be false due to racing
> with add_wait_queue(), this returns true so the caller will retry. The next
> atomic_dec will set the current waitstate wait_cnt < 0, which also forces an
> early return true. When does the wake up happen, or wait_cnt and wait_index get
> updated in that case?

If the waitqueue becomes empty, then concurrent callers can go on:

__sbq_wake_up
 sbq_wake_ptr
  for (i = 0; i < SBQ_WAIT_QUEUES; i++)
   if (waitqueue_active(&ws->wait)) -> only choose an active waitqueue

If the waitqueue is not empty, it is the same with or without this patch:
concurrent callers will have to wait for the one who wins the race.

Before:
__sbq_wake_up
 atomic_cmpxchg -> win the race
 sbq_index_atomic_inc -> concurrent callers can go on

After:
__sbq_wake_up
 wake_up_nr -> concurrent callers can go on if the waitqueue becomes empty
 atomic_dec_return -> return 0
 sbq_index_atomic_inc

Thanks,
Kuai

>> [...]
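For readers following the waitqueue-selection argument above, sbq_wake_ptr() at the time looked roughly like the sketch below. This is an approximate reconstruction of lib/sbitmap.c from that era, not code quoted in this thread, so details such as the ws_active check may differ slightly from the exact source:

```c
/*
 * Approximate sketch of sbq_wake_ptr(): scan the SBQ_WAIT_QUEUES wait
 * queues starting at wake_index and only return a waitqueue that
 * actually has waiters, as Yu Kuai describes above.
 */
static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
{
	int i, wake_index;

	/* No one is waiting at all, nothing to wake. */
	if (!atomic_read(&sbq->ws_active))
		return NULL;

	wake_index = atomic_read(&sbq->wake_index);
	for (i = 0; i < SBQ_WAIT_QUEUES; i++) {
		struct sbq_wait_state *ws = &sbq->ws[wake_index];

		/* Only an active waitqueue is ever handed back to the caller. */
		if (waitqueue_active(&ws->wait)) {
			if (wake_index != atomic_read(&sbq->wake_index))
				atomic_set(&sbq->wake_index, wake_index);
			return ws;
		}

		wake_index = sbq_index_inc(wake_index);
	}

	return NULL;
}
```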
On Tue 06-09-22 15:27:51, Keith Busch wrote:
> On Wed, Aug 03, 2022 at 08:15:04PM +0800, Yu Kuai wrote:
> >  	wait_cnt = atomic_dec_return(&ws->wait_cnt);
> > -	if (wait_cnt <= 0) {
> > -		int ret;
> > +	/*
> > +	 * For concurrent callers of this, callers should call this function
> > +	 * again to wakeup a new batch on a different 'ws'.
> > +	 */
> > +	if (wait_cnt < 0 || !waitqueue_active(&ws->wait))
> > +		return true;
>
> If wait_cnt is '0', but the waitqueue_active happens to be false due to racing
> with add_wait_queue(), this returns true so the caller will retry.

Well, note that sbq_wake_ptr(), called to obtain 'ws', did a waitqueue_active()
check. So !waitqueue_active() should really happen only if the waiter was woken
up by someone else or so. Not that it would matter much but I wanted to point
it out.

> The next atomic_dec will set the current waitstate wait_cnt < 0, which
> also forces an early return true. When does the wake up happen, or
> wait_cnt and wait_index get updated in that case?

I guess your concern could be rephrased as: Who's going to ever set
ws->wait_cnt to a value > 0 if we ever exit with wait_cnt == 0 due to the
!waitqueue_active() condition?

And that is a good question and I think that's a bug in this patch. I think
we need something like:

	...
	/*
	 * For concurrent callers of this, callers should call this function
	 * again to wakeup a new batch on a different 'ws'.
	 */
	if (wait_cnt < 0)
		return true;
	/*
	 * If we decremented queue without waiters, retry to avoid lost
	 * wakeups.
	 */
	if (wait_cnt > 0)
		return !waitqueue_active(&ws->wait);
	/*
	 * When wait_cnt == 0, we have to be particularly careful as we are
	 * responsible to reset wait_cnt regardless whether we've actually
	 * woken up anybody. But in case we didn't wakeup anybody, we still
	 * need to retry.
	 */
	ret = !waitqueue_active(&ws->wait);
	wake_batch = READ_ONCE(sbq->wake_batch);
	/*
	 * Wake up first in case that concurrent callers decrease wait_cnt
	 * while waitqueue is empty.
	 */
	wake_up_nr(&ws->wait, wake_batch);
	...

	return ret;

Does this fix your concern Keith?

								Honza

> > [...]
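Pieced together with the rest of the patched function, Jan's suggestion amounts to roughly the shape below. This is a sketch reconstructed from the hunks quoted in this thread, not the code that was eventually merged; the sbq_wake_ptr() prologue and the variable declarations are assumed from the unquoted context of lib/sbitmap.c:

```c
/*
 * Sketch only: __sbq_wake_up() with Jan's proposed handling of the
 * "decremented wait_cnt on an empty waitqueue" case folded in.
 * Assumes the surrounding lib/sbitmap.c context (sbq_wake_ptr() etc.).
 */
static bool __sbq_wake_up(struct sbitmap_queue *sbq)
{
	struct sbq_wait_state *ws;
	unsigned int wake_batch;
	int wait_cnt;
	bool ret;

	ws = sbq_wake_ptr(sbq);
	if (!ws)
		return false;

	wait_cnt = atomic_dec_return(&ws->wait_cnt);
	/* Another caller already took this batch; retry on a different 'ws'. */
	if (wait_cnt < 0)
		return true;

	/* If we decremented a queue without waiters, retry to avoid lost wakeups. */
	if (wait_cnt > 0)
		return !waitqueue_active(&ws->wait);

	/*
	 * wait_cnt == 0: we must reset wait_cnt whether or not anybody is
	 * actually woken; if nobody was waiting here, ask the caller to retry.
	 */
	ret = !waitqueue_active(&ws->wait);

	wake_batch = READ_ONCE(sbq->wake_batch);

	/* Wake up before resetting so a concurrent decrement cannot get lost. */
	wake_up_nr(&ws->wait, wake_batch);

	/*
	 * Pairs with the memory barrier in sbitmap_queue_resize() so the
	 * batch size update is seen before the wait count is reset.
	 */
	smp_mb__before_atomic();

	/* Advance wake_index before re-arming wait_cnt on this waitqueue. */
	sbq_index_atomic_inc(&sbq->wake_index);
	atomic_set(&ws->wait_cnt, wake_batch);

	return ret;
}
```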
On Wed, Sep 07, 2022 at 12:23:18PM +0200, Jan Kara wrote:
> On Tue 06-09-22 15:27:51, Keith Busch wrote:
> > [...]
>
> I guess your concern could be rephrased as: Who's going to ever set
> ws->wait_cnt to a value > 0 if we ever exit with wait_cnt == 0 due to the
> !waitqueue_active() condition?
>
> And that is a good question and I think that's a bug in this patch. I think
> we need something like:
>
> 	...
> 	/*
> 	 * For concurrent callers of this, callers should call this function
> 	 * again to wakeup a new batch on a different 'ws'.
> 	 */
> 	if (wait_cnt < 0)
> 		return true;
> 	/*
> 	 * If we decremented queue without waiters, retry to avoid lost
> 	 * wakeups.
> 	 */
> 	if (wait_cnt > 0)
> 		return !waitqueue_active(&ws->wait);

I'm not sure about this part. We've already decremented, so the freed bit is
accounted for against the batch. Returning true here may double-count the
freed bit, right?

> 	/*
> 	 * When wait_cnt == 0, we have to be particularly careful as we are
> 	 * responsible to reset wait_cnt regardless whether we've actually
> 	 * woken up anybody. But in case we didn't wakeup anybody, we still
> 	 * need to retry.
> 	 */
> 	ret = !waitqueue_active(&ws->wait);
> 	wake_batch = READ_ONCE(sbq->wake_batch);
> 	/*
> 	 * Wake up first in case that concurrent callers decrease wait_cnt
> 	 * while waitqueue is empty.
> 	 */
> 	wake_up_nr(&ws->wait, wake_batch);
> 	...
>
> 	return ret;
>
> Does this fix your concern Keith?

Other than the above comment, this does appear to address the concern. Thanks!
On Wed 07-09-22 08:13:40, Keith Busch wrote:
> On Wed, Sep 07, 2022 at 12:23:18PM +0200, Jan Kara wrote:
> > [...]
> > 	/*
> > 	 * If we decremented queue without waiters, retry to avoid lost
> > 	 * wakeups.
> > 	 */
> > 	if (wait_cnt > 0)
> > 		return !waitqueue_active(&ws->wait);
>
> I'm not sure about this part. We've already decremented, so the freed bit is
> accounted for against the batch. Returning true here may double-count the
> freed bit, right?

Yes, we may wake up waiters unnecessarily frequently. But that's a
performance issue at worst and only if it happens frequently. So I don't
think it matters in practice (famous last words ;).

								Honza
On Wed, Sep 07, 2022 at 06:41:50PM +0200, Jan Kara wrote:
> On Wed 07-09-22 08:13:40, Keith Busch wrote:
> >
> > I'm not sure about this part. We've already decremented, so the freed bit is
> > accounted for against the batch. Returning true here may double-count the
> > freed bit, right?
>
> Yes, we may wake up waiters unnecessarily frequently. But that's a
> performance issue at worst and only if it happens frequently. So I don't
> think it matters in practice (famous last words ;).

Ok, let's go with that. Do you want to send the patch? I have a follow-up fix
for batched tagging that depends on this.
On Wed 07-09-22 12:20:09, Keith Busch wrote:
> On Wed, Sep 07, 2022 at 06:41:50PM +0200, Jan Kara wrote:
> > On Wed 07-09-22 08:13:40, Keith Busch wrote:
> > >
> > > I'm not sure about this part. We've already decremented, so the freed bit is
> > > accounted for against the batch. Returning true here may double-count the
> > > freed bit, right?
> >
> > Yes, we may wake up waiters unnecessarily frequently. But that's a
> > performance issue at worst and only if it happens frequently. So I don't
> > think it matters in practice (famous last words ;).
>
> Ok, let's go with that. Do you want to send the patch? I have a follow-up fix
> for batched tagging that depends on this.

I think Kuai will do that. But if not, I can certainly take his patch and
resubmit it with the fixup.

								Honza
Hi, Jan

On 2022/09/08 17:33, Jan Kara wrote:
> On Wed 07-09-22 12:20:09, Keith Busch wrote:
> > On Wed, Sep 07, 2022 at 06:41:50PM +0200, Jan Kara wrote:
> > > On Wed 07-09-22 08:13:40, Keith Busch wrote:
> > > >
> > > > I'm not sure about this part. We've already decremented, so the freed bit is
> > > > accounted for against the batch. Returning true here may double-count the
> > > > freed bit, right?
> > >
> > > Yes, we may wake up waiters unnecessarily frequently. But that's a
> > > performance issue at worst and only if it happens frequently. So I don't
> > > think it matters in practice (famous last words ;).
> >
> > Ok, let's go with that. Do you want to send the patch? I have a follow-up fix
> > for batched tagging that depends on this.
>
> I think Kuai will do that. But if not, I can certainly take his patch and
> resubmit it with the fixup.

Sorry that I'm too busy with work stuff recently, I can do that in a few
weeks. In the meantime, feel free to go on without me.

Thanks,
Kuai
diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index 29eb0484215a..1aa55806f6a5 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -611,32 +611,43 @@ static bool __sbq_wake_up(struct sbitmap_queue *sbq)
 		return false;
 
 	wait_cnt = atomic_dec_return(&ws->wait_cnt);
-	if (wait_cnt <= 0) {
-		int ret;
+	/*
+	 * For concurrent callers of this, callers should call this function
+	 * again to wakeup a new batch on a different 'ws'.
+	 */
+	if (wait_cnt < 0 || !waitqueue_active(&ws->wait))
+		return true;
 
-		wake_batch = READ_ONCE(sbq->wake_batch);
+	if (wait_cnt > 0)
+		return false;
 
-		/*
-		 * Pairs with the memory barrier in sbitmap_queue_resize() to
-		 * ensure that we see the batch size update before the wait
-		 * count is reset.
-		 */
-		smp_mb__before_atomic();
+	wake_batch = READ_ONCE(sbq->wake_batch);
 
-		/*
-		 * For concurrent callers of this, the one that failed the
-		 * atomic_cmpxhcg() race should call this function again
-		 * to wakeup a new batch on a different 'ws'.
-		 */
-		ret = atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wake_batch);
-		if (ret == wait_cnt) {
-			sbq_index_atomic_inc(&sbq->wake_index);
-			wake_up_nr(&ws->wait, wake_batch);
-			return false;
-		}
+	/*
+	 * Wake up first in case that concurrent callers decrease wait_cnt
+	 * while waitqueue is empty.
+	 */
+	wake_up_nr(&ws->wait, wake_batch);
 
-		return true;
-	}
+	/*
+	 * Pairs with the memory barrier in sbitmap_queue_resize() to
+	 * ensure that we see the batch size update before the wait
+	 * count is reset.
+	 *
+	 * Also pairs with the implicit barrier between decrementing wait_cnt
+	 * and checking for waitqueue_active() to make sure waitqueue_active()
+	 * sees result of the wakeup if atomic_dec_return() has seen the result
+	 * of atomic_set().
+	 */
+	smp_mb__before_atomic();
+
+	/*
+	 * Increase wake_index before updating wait_cnt, otherwise concurrent
+	 * callers can see valid wait_cnt in old waitqueue, which can cause
+	 * invalid wakeup on the old waitqueue.
+	 */
+	sbq_index_atomic_inc(&sbq->wake_index);
+	atomic_set(&ws->wait_cnt, wake_batch);
 
 	return false;
 }
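For context on the true/false return convention discussed throughout this thread: the caller in lib/sbitmap.c keeps retrying on another waitqueue for as long as __sbq_wake_up() returns true. A simplified sketch of that caller, not quoted from the patch and shown only for illustration, looks like this:

```c
/*
 * Sketch of the caller side: a return of true from __sbq_wake_up()
 * means "nothing useful was woken on this waitqueue, try the next
 * one"; a return of false ends the loop.
 */
void sbitmap_queue_wake_up(struct sbitmap_queue *sbq)
{
	while (__sbq_wake_up(sbq))
		;
}
EXPORT_SYMBOL_GPL(sbitmap_queue_wake_up);
```

This is why the early "return true" paths debated above matter: each one sends the caller back around this loop to pick a different 'ws' via sbq_wake_ptr().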