@@ -98,6 +98,7 @@ struct percpu_ref_data {
percpu_ref_func_t *confirm_switch;
bool force_atomic:1;
bool allow_reinit:1;
+ bool sync:1;
struct rcu_head rcu;
struct percpu_ref *ref;
};
@@ -123,7 +124,8 @@ int __must_check percpu_ref_init(struct percpu_ref *ref,
gfp_t gfp);
void percpu_ref_exit(struct percpu_ref *ref);
void percpu_ref_switch_to_atomic(struct percpu_ref *ref,
- percpu_ref_func_t *confirm_switch);
+ percpu_ref_func_t *confirm_switch,
+ bool sync);
void percpu_ref_switch_to_atomic_sync(struct percpu_ref *ref);
void percpu_ref_switch_to_percpu(struct percpu_ref *ref);
void percpu_ref_kill_and_confirm(struct percpu_ref *ref,
@@ -99,6 +99,7 @@ int percpu_ref_init(struct percpu_ref *ref, percpu_ref_func_t *release,
data->release = release;
data->confirm_switch = NULL;
data->ref = ref;
+ data->sync = false;
ref->data = data;
return 0;
}
@@ -146,21 +147,33 @@ void percpu_ref_exit(struct percpu_ref *ref)
}
EXPORT_SYMBOL_GPL(percpu_ref_exit);
+static inline void percpu_ref_switch_to_atomic_post(struct percpu_ref *ref)
+{
+ struct percpu_ref_data *data = ref->data;
+
+ if (!data->allow_reinit)
+ __percpu_ref_exit(ref);
+
+ /* drop ref from percpu_ref_switch_to_atomic() */
+ percpu_ref_put(ref);
+}
+
static void percpu_ref_call_confirm_rcu(struct rcu_head *rcu)
{
struct percpu_ref_data *data = container_of(rcu,
struct percpu_ref_data, rcu);
struct percpu_ref *ref = data->ref;
+ bool need_put = true;
+
+ if (data->sync)
+ need_put = data->sync = false;
data->confirm_switch(ref);
data->confirm_switch = NULL;
wake_up_all(&percpu_ref_switch_waitq);
- if (!data->allow_reinit)
- __percpu_ref_exit(ref);
-
- /* drop ref from percpu_ref_switch_to_atomic() */
- percpu_ref_put(ref);
+ if (need_put)
+ percpu_ref_switch_to_atomic_post(ref);
}
static void percpu_ref_switch_to_atomic_rcu(struct rcu_head *rcu)
@@ -210,14 +223,19 @@ static void percpu_ref_noop_confirm_switch(struct percpu_ref *ref)
}
static void __percpu_ref_switch_to_atomic(struct percpu_ref *ref,
- percpu_ref_func_t *confirm_switch)
+ percpu_ref_func_t *confirm_switch,
+ bool sync)
{
if (ref->percpu_count_ptr & __PERCPU_REF_ATOMIC) {
if (confirm_switch)
confirm_switch(ref);
+ if (sync)
+ percpu_ref_get(ref);
return;
}
+ ref->data->sync = sync;
+
/* switching from percpu to atomic */
ref->percpu_count_ptr |= __PERCPU_REF_ATOMIC;
@@ -232,13 +250,16 @@ static void __percpu_ref_switch_to_atomic(struct percpu_ref *ref,
call_rcu(&ref->data->rcu, percpu_ref_switch_to_atomic_rcu);
}
-static void __percpu_ref_switch_to_percpu(struct percpu_ref *ref)
+static void __percpu_ref_switch_to_percpu(struct percpu_ref *ref, bool sync)
{
unsigned long __percpu *percpu_count = percpu_count_ptr(ref);
int cpu;
BUG_ON(!percpu_count);
+ if (sync)
+ percpu_ref_get(ref);
+
if (!(ref->percpu_count_ptr & __PERCPU_REF_ATOMIC))
return;
@@ -261,7 +282,8 @@ static void __percpu_ref_switch_to_percpu(struct percpu_ref *ref)
}
static void __percpu_ref_switch_mode(struct percpu_ref *ref,
- percpu_ref_func_t *confirm_switch)
+ percpu_ref_func_t *confirm_switch,
+ bool sync)
{
struct percpu_ref_data *data = ref->data;
@@ -276,9 +298,9 @@ static void __percpu_ref_switch_mode(struct percpu_ref *ref,
percpu_ref_switch_lock);
if (data->force_atomic || percpu_ref_is_dying(ref))
- __percpu_ref_switch_to_atomic(ref, confirm_switch);
+ __percpu_ref_switch_to_atomic(ref, confirm_switch, sync);
else
- __percpu_ref_switch_to_percpu(ref);
+ __percpu_ref_switch_to_percpu(ref, sync);
}
/**
@@ -302,14 +324,15 @@ static void __percpu_ref_switch_mode(struct percpu_ref *ref,
* switching to atomic mode, this function can be called from any context.
*/
void percpu_ref_switch_to_atomic(struct percpu_ref *ref,
- percpu_ref_func_t *confirm_switch)
+ percpu_ref_func_t *confirm_switch,
+ bool sync)
{
unsigned long flags;
spin_lock_irqsave(&percpu_ref_switch_lock, flags);
ref->data->force_atomic = true;
- __percpu_ref_switch_mode(ref, confirm_switch);
+ __percpu_ref_switch_mode(ref, confirm_switch, sync);
spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
}
@@ -325,8 +348,9 @@ EXPORT_SYMBOL_GPL(percpu_ref_switch_to_atomic);
*/
void percpu_ref_switch_to_atomic_sync(struct percpu_ref *ref)
{
- percpu_ref_switch_to_atomic(ref, NULL);
+ percpu_ref_switch_to_atomic(ref, NULL, true);
wait_event(percpu_ref_switch_waitq, !ref->data->confirm_switch);
+ percpu_ref_switch_to_atomic_post(ref);
}
EXPORT_SYMBOL_GPL(percpu_ref_switch_to_atomic_sync);
@@ -355,7 +379,7 @@ void percpu_ref_switch_to_percpu(struct percpu_ref *ref)
spin_lock_irqsave(&percpu_ref_switch_lock, flags);
ref->data->force_atomic = false;
- __percpu_ref_switch_mode(ref, NULL);
+ __percpu_ref_switch_mode(ref, NULL, false);
spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
}
@@ -390,7 +414,7 @@ void percpu_ref_kill_and_confirm(struct percpu_ref *ref,
ref->data->release);
ref->percpu_count_ptr |= __PERCPU_REF_DEAD;
- __percpu_ref_switch_mode(ref, confirm_kill);
+ __percpu_ref_switch_mode(ref, confirm_kill, false);
percpu_ref_put(ref);
spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
@@ -470,7 +494,7 @@ void percpu_ref_resurrect(struct percpu_ref *ref)
ref->percpu_count_ptr &= ~__PERCPU_REF_DEAD;
percpu_ref_get(ref);
- __percpu_ref_switch_mode(ref, NULL);
+ __percpu_ref_switch_mode(ref, NULL, false);
spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
}
In the percpu_ref_call_confirm_rcu(), we call the wake_up_all() before calling percpu_ref_put(), which will cause the value of percpu_ref to be unstable when percpu_ref_switch_to_atomic_sync() returns. CPU0 CPU1 percpu_ref_switch_to_atomic_sync(&ref) --> percpu_ref_switch_to_atomic(&ref) --> percpu_ref_get(ref); /* put after confirmation */ call_rcu(&ref->data->rcu, percpu_ref_switch_to_atomic_rcu); percpu_ref_switch_to_atomic_rcu --> percpu_ref_call_confirm_rcu --> data->confirm_switch = NULL; wake_up_all(&percpu_ref_switch_waitq); /* here waiting to wake up */ wait_event(percpu_ref_switch_waitq, !ref->data->confirm_switch); (A)percpu_ref_put(ref); /* The value of &ref is unstable! */ percpu_ref_is_zero(&ref) (B)percpu_ref_put(ref); As shown above, assuming that the counts on each cpu add up to 0 before calling percpu_ref_switch_to_atomic_sync(), we expect that after switching to atomic mode, percpu_ref_is_zero() can return true. But actually it will return different values in the two cases of A and B, which is not what we expected. Now there are two users of percpu_ref_switch_to_atomic_sync() in the kernel: i. mddev->writes_pending in the driver/md/md.c ii. q->q_usage_counter in the block/blk-pm.c And they are all used as shown above. In the worst case, percpu_ref_is_zero() may not hold because of the case B every time. While this is unlikely to occur in a production environment, it is a problem. This patch moves percpu_ref_put() out of the rcu handler and call it after wait_event(), which can makes ref stable after percpu_ref_switch_to_atomic_sync() returns. Then in the example above, percpu_ref_is_zero() can see a steady 0 value, which is what we would expect. Signed-off-by: Qi Zheng <zhengqi.arch@bytedance.com> --- include/linux/percpu-refcount.h | 4 ++- lib/percpu-refcount.c | 56 +++++++++++++++++++++++---------- 2 files changed, 43 insertions(+), 17 deletions(-)