
[v3,5/7] rcu: Support direct wake-up of synchronize_rcu() users

Message ID 20231128080033.288050-6-urezki@gmail.com (mailing list archive)
State New, archived
Series Reduce synchronize_rcu() latency (V3)

Commit Message

Uladzislau Rezki Nov. 28, 2023, 8 a.m. UTC
This patch introduces a small enhancement that allows synchronize_rcu()
callers to be woken up directly. The wake-up happens after a grace
period completes, and is therefore performed by the gp-kthread.

The number of callers woken this way is limited by a hard-coded
threshold; any remaining users are deferred to the main worker.

Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
---
 kernel/rcu/tree.c | 39 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 37 insertions(+), 2 deletions(-)
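
To make the idea concrete, here is a minimal, self-contained user-space
sketch of the pattern the patch applies in rcu_sr_normal_gp_cleanup():
wake a bounded number of waiters inline and hand whatever is left to a
worker. All names below (waiter, complete_waiter(), defer_to_worker(),
MAX_DIRECT_WAKEUPS) are illustrative only and are not part of the
kernel patch.

#include <stddef.h>
#include <stdio.h>

#define MAX_DIRECT_WAKEUPS 5

struct waiter {
	struct waiter *next;
	int id;
};

/* Stand-in for the kernel-side wake-up of one synchronize_rcu() user. */
static void complete_waiter(struct waiter *w)
{
	printf("woken directly: %d\n", w->id);
}

/* Stand-in for queue_work(): a worker handles the remainder. */
static void defer_to_worker(struct waiter *list)
{
	for (; list; list = list->next)
		printf("deferred to worker: %d\n", list->id);
}

static void gp_cleanup(struct waiter *head)
{
	int done = 0;

	/* Wake at most MAX_DIRECT_WAKEUPS waiters from this context. */
	while (head && done < MAX_DIRECT_WAKEUPS) {
		struct waiter *next = head->next;

		complete_waiter(head);
		head = next;
		done++;
	}

	/* Anything left over is deferred, exactly once. */
	if (head)
		defer_to_worker(head);
}

int main(void)
{
	struct waiter w[8];
	int i;

	/* Build a small singly linked list of eight waiters. */
	for (i = 0; i < 8; i++) {
		w[i].id = i;
		w[i].next = (i + 1 < 8) ? &w[i + 1] : NULL;
	}

	gp_cleanup(&w[0]);
	return 0;
}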

Comments

Paul E. McKenney Dec. 20, 2023, 1:46 a.m. UTC | #1
On Tue, Nov 28, 2023 at 09:00:31AM +0100, Uladzislau Rezki (Sony) wrote:
> This patch introduces a small enhancement that allows synchronize_rcu()
> callers to be woken up directly. The wake-up happens after a grace
> period completes, and is therefore performed by the gp-kthread.
> 
> The number of callers woken this way is limited by a hard-coded
> threshold; any remaining users are deferred to the main worker.
> 
> Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>

Nice optimization!

One question below.

> ---
>  kernel/rcu/tree.c | 39 +++++++++++++++++++++++++++++++++++++--
>  1 file changed, 37 insertions(+), 2 deletions(-)
> 
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index d7b48996825f..69663a6d5050 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -1384,6 +1384,12 @@ static void rcu_poll_gp_seq_end_unlocked(unsigned long *snap)
>  		raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
>  }
>  
> +/*
> + * A maximum threshold for synchronize_rcu() users that are
> + * awakened directly by the rcu_gp_kthread(). The remaining
> + * users are deferred to the main worker.
> + */
> +#define SR_MAX_USERS_WAKE_FROM_GP 5
>  #define SR_NORMAL_GP_WAIT_HEAD_MAX 5
>  
>  struct sr_wait_node {
> @@ -1617,7 +1623,8 @@ static DECLARE_WORK(sr_normal_gp_cleanup, rcu_sr_normal_gp_cleanup_work);
>   */
>  static void rcu_sr_normal_gp_cleanup(void)
>  {
> -	struct llist_node *wait_tail;
> +	struct llist_node *wait_tail, *head, *rcu;
> +	int done = 0;
>  
>  	wait_tail = sr.srs_wait_tail;
>  	if (wait_tail == NULL)
> @@ -1626,11 +1633,39 @@ static void rcu_sr_normal_gp_cleanup(void)
>  	sr.srs_wait_tail = NULL;
>  	ASSERT_EXCLUSIVE_WRITER(sr.srs_wait_tail);
>  
> +	WARN_ON_ONCE(!rcu_sr_is_wait_head(wait_tail));
> +	head = wait_tail->next;
> +
> +	/*
> +	 * Process cases (a) and (d); see the illustration. This also
> +	 * handles the scenario where all clients are done: the last
> +	 * wait-head is released and the worker is not kicked.
> +	 */
> +	llist_for_each_safe(rcu, head, head) {

This does appear to be a clever way to save eight bytes on the stack,
but is our stack space really so restricted?  We are being invoked from
the RCU GP kthread, which isn't using much stack, right?

If so, let's spend the extra local variable and spare the reader a
trip to the llist_for_each_safe() definition.
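
For reference, the definition in question (from include/linux/llist.h,
kerneldoc trimmed) is essentially:

#define llist_for_each_safe(pos, n, node)				\
	for ((pos) = (node); (pos) && ((n) = (pos)->next, true); (pos) = (n))

so passing "head" as both the temporary and the starting node does work
("head" simply becomes the per-iteration cursor), but it is exactly the
kind of detail that sends the reader to the header.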

> +		if (rcu_sr_is_wait_head(rcu)) {
> +			if (!rcu->next) {
> +				rcu_sr_put_wait_head(rcu);
> +				wait_tail->next = NULL;
> +			} else {
> +				wait_tail->next = rcu;
> +			}
> +
> +			break;
> +		}
> +
> +		rcu_sr_normal_complete(rcu);
> +		// This might be the last entry, so update wait_tail->next at this step.
> +		wait_tail->next = head;
> +
> +		if (++done == SR_MAX_USERS_WAKE_FROM_GP)
> +			break;
> +	}
> +
>  	// concurrent sr_normal_gp_cleanup work might observe this update.
>  	smp_store_release(&sr.srs_done_tail, wait_tail);
>  	ASSERT_EXCLUSIVE_WRITER(sr.srs_done_tail);
>  
> -	if (wait_tail)
> +	if (wait_tail->next)
>  		queue_work(system_highpri_wq, &sr_normal_gp_cleanup);
>  }
>  
> -- 
> 2.39.2
>
Uladzislau Rezki Dec. 21, 2023, 11:22 a.m. UTC | #2
On Tue, Dec 19, 2023 at 05:46:11PM -0800, Paul E. McKenney wrote:
> On Tue, Nov 28, 2023 at 09:00:31AM +0100, Uladzislau Rezki (Sony) wrote:
> > This patch introduces a small enhancement that allows synchronize_rcu()
> > callers to be woken up directly. The wake-up happens after a grace
> > period completes, and is therefore performed by the gp-kthread.
> > 
> > The number of callers woken this way is limited by a hard-coded
> > threshold; any remaining users are deferred to the main worker.
> > 
> > Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
> 
> Nice optimization!
> 
> One question below.
> 
> > ---
> >  kernel/rcu/tree.c | 39 +++++++++++++++++++++++++++++++++++++--
> >  1 file changed, 37 insertions(+), 2 deletions(-)
> > 
> > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> > index d7b48996825f..69663a6d5050 100644
> > --- a/kernel/rcu/tree.c
> > +++ b/kernel/rcu/tree.c
> > @@ -1384,6 +1384,12 @@ static void rcu_poll_gp_seq_end_unlocked(unsigned long *snap)
> >  		raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
> >  }
> >  
> > +/*
> > + * A maximum threshold for synchronize_rcu() users that are
> > + * awakened directly by the rcu_gp_kthread(). The remaining
> > + * users are deferred to the main worker.
> > + */
> > +#define SR_MAX_USERS_WAKE_FROM_GP 5
> >  #define SR_NORMAL_GP_WAIT_HEAD_MAX 5
> >  
> >  struct sr_wait_node {
> > @@ -1617,7 +1623,8 @@ static DECLARE_WORK(sr_normal_gp_cleanup, rcu_sr_normal_gp_cleanup_work);
> >   */
> >  static void rcu_sr_normal_gp_cleanup(void)
> >  {
> > -	struct llist_node *wait_tail;
> > +	struct llist_node *wait_tail, *head, *rcu;
> > +	int done = 0;
> >  
> >  	wait_tail = sr.srs_wait_tail;
> >  	if (wait_tail == NULL)
> > @@ -1626,11 +1633,39 @@ static void rcu_sr_normal_gp_cleanup(void)
> >  	sr.srs_wait_tail = NULL;
> >  	ASSERT_EXCLUSIVE_WRITER(sr.srs_wait_tail);
> >  
> > +	WARN_ON_ONCE(!rcu_sr_is_wait_head(wait_tail));
> > +	head = wait_tail->next;
> > +
> > +	/*
> > +	 * Process cases (a) and (d); see the illustration. This also
> > +	 * handles the scenario where all clients are done: the last
> > +	 * wait-head is released and the worker is not kicked.
> > +	 */
> > +	llist_for_each_safe(rcu, head, head) {
> 
> This does appear to be a clever way to save eight bytes on the stack,
> but is our stack space really so restricted?  We are being invoked from
> the RCU GP kthread, which isn't using much stack, right?
> 
> If so, let's spend the extra local variable and spare the reader a
> trip to the llist_for_each_safe() definition.
> 
OK, you mean to go with an extra "next" variable and use it in the
llist loop. I will change it accordingly!
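
Roughly like this (untested sketch, just to show the shape; the final
version may differ):

	struct llist_node *wait_tail, *next, *rcu, *head;
	...
	llist_for_each_safe(rcu, next, head) {
		if (rcu_sr_is_wait_head(rcu)) {
			if (!rcu->next) {
				rcu_sr_put_wait_head(rcu);
				wait_tail->next = NULL;
			} else {
				wait_tail->next = rcu;
			}

			break;
		}

		rcu_sr_normal_complete(rcu);
		// This might be the last entry, so update wait_tail->next at this step.
		wait_tail->next = next;

		if (++done == SR_MAX_USERS_WAKE_FROM_GP)
			break;
	}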

--
Uladzislau Rezki

Patch

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index d7b48996825f..69663a6d5050 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -1384,6 +1384,12 @@  static void rcu_poll_gp_seq_end_unlocked(unsigned long *snap)
 		raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 }
 
+/*
+ * A maximum threshold for synchronize_rcu() users that are
+ * awakened directly by the rcu_gp_kthread(). The remaining
+ * users are deferred to the main worker.
+ */
+#define SR_MAX_USERS_WAKE_FROM_GP 5
 #define SR_NORMAL_GP_WAIT_HEAD_MAX 5
 
 struct sr_wait_node {
@@ -1617,7 +1623,8 @@  static DECLARE_WORK(sr_normal_gp_cleanup, rcu_sr_normal_gp_cleanup_work);
  */
 static void rcu_sr_normal_gp_cleanup(void)
 {
-	struct llist_node *wait_tail;
+	struct llist_node *wait_tail, *head, *rcu;
+	int done = 0;
 
 	wait_tail = sr.srs_wait_tail;
 	if (wait_tail == NULL)
@@ -1626,11 +1633,39 @@  static void rcu_sr_normal_gp_cleanup(void)
 	sr.srs_wait_tail = NULL;
 	ASSERT_EXCLUSIVE_WRITER(sr.srs_wait_tail);
 
+	WARN_ON_ONCE(!rcu_sr_is_wait_head(wait_tail));
+	head = wait_tail->next;
+
+	/*
+	 * Process cases (a) and (d); see the illustration. This also
+	 * handles the scenario where all clients are done: the last
+	 * wait-head is released and the worker is not kicked.
+	 */
+	llist_for_each_safe(rcu, head, head) {
+		if (rcu_sr_is_wait_head(rcu)) {
+			if (!rcu->next) {
+				rcu_sr_put_wait_head(rcu);
+				wait_tail->next = NULL;
+			} else {
+				wait_tail->next = rcu;
+			}
+
+			break;
+		}
+
+		rcu_sr_normal_complete(rcu);
+		// This might be the last entry, so update wait_tail->next at this step.
+		wait_tail->next = head;
+
+		if (++done == SR_MAX_USERS_WAKE_FROM_GP)
+			break;
+	}
+
 	// concurrent sr_normal_gp_cleanup work might observe this update.
 	smp_store_release(&sr.srs_done_tail, wait_tail);
 	ASSERT_EXCLUSIVE_WRITER(sr.srs_done_tail);
 
-	if (wait_tail)
+	if (wait_tail->next)
 		queue_work(system_highpri_wq, &sr_normal_gp_cleanup);
 }