[1/6] net/rds: Avoid queuing superfluous send and recv work

Message ID: 20250227042638.82553-2-allison.henderson@oracle.com (mailing list archive)
State: New
Delegated to: Netdev Maintainers
Series: RDS bug fix collection

Checks

Context Check Description
netdev/series_format warning Target tree name not specified in the subject
netdev/tree_selection success Guessed tree name to be net-next, async
netdev/ynl success Generated files up to date; no warnings/errors; no diff in generated;
netdev/fixes_present success Fixes tag not required for -next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 0 this patch: 0
netdev/build_tools success No tools touched, skip
netdev/cc_maintainers warning 6 maintainers not CCed: linux-rdma@vger.kernel.org rds-devel@oss.oracle.com kuba@kernel.org horms@kernel.org edumazet@google.com pabeni@redhat.com
netdev/build_clang success Errors and warnings before: 0 this patch: 0
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 12 this patch: 12
netdev/checkpatch warning WARNING: line length of 90 exceeds 80 columns
netdev/build_clang_rust success No Rust files in patch. Skipping build
netdev/kdoc success Errors and warnings before: 30 this patch: 30
netdev/source_inline success Was 0 now: 0

Commit Message

Allison Henderson Feb. 27, 2025, 4:26 a.m. UTC
From: Håkon Bugge <haakon.bugge@oracle.com>

Work is queued on a connection path's workqueue from a number of places
and contexts.  Define two more bits in cp_flags and, together with a few
helper functions, queue the send and receive work only if it is not
already queued.

This avoids overloading the workqueue and thereby degrading connection
management performance, since connection management uses the same
workqueue.
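
For review context, here is a condensed, self-contained sketch of the
pattern the patch introduces.  The demo_* names and struct demo_path are
illustrative stand-ins for struct rds_conn_path and the
rds_cond_queue_*_work()/rds_clear_queued_*_work_bit() helpers added in
net/rds/rds.h and used by the workers in net/rds/threads.c below; this is
not part of the patch itself.

	#include <linux/workqueue.h>
	#include <linux/bitops.h>
	#include <linux/atomic.h>

	/* Stand-in for struct rds_conn_path: one flags word, one work item. */
	struct demo_path {
		unsigned long flags;
		struct delayed_work send_w;
	};

	#define DEMO_SEND_WORK_QUEUED	0	/* mirrors RDS_SEND_WORK_QUEUED */

	static struct workqueue_struct *demo_wq;	/* mirrors rds_wq */

	/* Queue the work only when it is not already queued: bursts of
	 * requests collapse into a single pending work item instead of
	 * piling up on the shared workqueue.
	 */
	static void demo_cond_queue_send(struct demo_path *p, unsigned long delay)
	{
		if (!test_and_set_bit(DEMO_SEND_WORK_QUEUED, &p->flags))
			queue_delayed_work(demo_wq, &p->send_w, delay);
	}

	/* The worker clears the bit before doing the transfer, so a request
	 * arriving while it runs queues exactly one follow-up work item.
	 */
	static void demo_send_worker(struct work_struct *work)
	{
		struct demo_path *p = container_of(work, struct demo_path,
						   send_w.work);

		/* clear_bit() does not imply a memory barrier */
		smp_mb__before_atomic();
		clear_bit(DEMO_SEND_WORK_QUEUED, &p->flags);
		smp_mb__after_atomic();

		/* ... transmit; on -EAGAIN/-ENOMEM re-arm via
		 * demo_cond_queue_send() ...
		 */
	}
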

Signed-off-by: Håkon Bugge <haakon.bugge@oracle.com>
Signed-off-by: Gerd Rausch <gerd.rausch@oracle.com>
Signed-off-by: Allison Henderson <allison.henderson@oracle.com>
---
 net/rds/connection.c | 11 ++++++++---
 net/rds/ib_recv.c    |  2 +-
 net/rds/ib_send.c    |  2 +-
 net/rds/rds.h        | 33 +++++++++++++++++++++++++++++++++
 net/rds/send.c       |  6 +++---
 net/rds/tcp.c        | 12 ++++++++++--
 net/rds/tcp_recv.c   |  2 +-
 net/rds/tcp_send.c   |  2 +-
 net/rds/threads.c    | 25 +++++++++++++++++--------
 9 files changed, 75 insertions(+), 20 deletions(-)

Patch

diff --git a/net/rds/connection.c b/net/rds/connection.c
index c749c5525b40..1d80586fdda2 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -445,9 +445,14 @@  static void rds_conn_path_destroy(struct rds_conn_path *cp)
 	if (!cp->cp_transport_data)
 		return;
 
-	/* make sure lingering queued work won't try to ref the conn */
-	cancel_delayed_work_sync(&cp->cp_send_w);
-	cancel_delayed_work_sync(&cp->cp_recv_w);
+	/* make sure lingering queued work won't try to ref the
+	 * conn. If there is work queued, we cancel it (and set the
+	 * bit to avoid any re-queueing)
+	 */
+	if (test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_send_w);
+	if (test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_recv_w);
 
 	rds_conn_path_drop(cp, true);
 	flush_work(&cp->cp_down_w);
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index e53b7f266bd7..4ecee1ff1c26 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -457,7 +457,7 @@  void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
 	    (must_wake ||
 	    (can_wait && rds_ib_ring_low(&ic->i_recv_ring)) ||
 	    rds_ib_ring_empty(&ic->i_recv_ring))) {
-		queue_delayed_work(rds_wq, &conn->c_recv_w, 1);
+		rds_cond_queue_recv_work(conn->c_path + 0, 1);
 	}
 	if (can_wait)
 		cond_resched();
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 4190b90ff3b1..95edd4cb8528 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -419,7 +419,7 @@  void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
 
 	atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
 	if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-		queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+		rds_cond_queue_send_work(conn->c_path + 0, 0);
 
 	WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
 
diff --git a/net/rds/rds.h b/net/rds/rds.h
index dc360252c515..c9a22d0e887b 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -90,6 +90,8 @@  enum {
 #define RDS_IN_XMIT		2
 #define RDS_RECV_REFILL		3
 #define	RDS_DESTROY_PENDING	4
+#define RDS_SEND_WORK_QUEUED	5
+#define RDS_RECV_WORK_QUEUED	6
 
 /* Max number of multipaths per RDS connection. Must be a power of 2 */
 #define	RDS_MPATH_WORKERS	8
@@ -791,6 +793,37 @@  void __rds_conn_path_error(struct rds_conn_path *cp, const char *, ...);
 #define rds_conn_path_error(cp, fmt...) \
 	__rds_conn_path_error(cp, KERN_WARNING "RDS: " fmt)
 
+extern struct workqueue_struct *rds_wq;
+static inline void rds_cond_queue_send_work(struct rds_conn_path *cp, unsigned long delay)
+{
+	if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+		queue_delayed_work(rds_wq, &cp->cp_send_w, delay);
+}
+
+static inline void rds_clear_queued_send_work_bit(struct rds_conn_path *cp)
+{
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__before_atomic();
+	clear_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags);
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__after_atomic();
+}
+
+static inline void rds_cond_queue_recv_work(struct rds_conn_path *cp, unsigned long delay)
+{
+	if (!test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+		queue_delayed_work(rds_wq, &cp->cp_recv_w, delay);
+}
+
+static inline void rds_clear_queued_recv_work_bit(struct rds_conn_path *cp)
+{
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__before_atomic();
+	clear_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags);
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__after_atomic();
+}
+
 static inline int
 rds_conn_path_transition(struct rds_conn_path *cp, int old, int new)
 {
diff --git a/net/rds/send.c b/net/rds/send.c
index 09a280110654..6329cc8ec246 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -458,7 +458,7 @@  int rds_send_xmit(struct rds_conn_path *cp)
 			if (rds_destroy_pending(cp->cp_conn))
 				ret = -ENETUNREACH;
 			else
-				queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+				rds_cond_queue_send_work(cp, 1);
 			rcu_read_unlock();
 		} else if (raced) {
 			rds_stats_inc(s_send_lock_queue_raced);
@@ -1380,7 +1380,7 @@  int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 		if (rds_destroy_pending(cpath->cp_conn))
 			ret = -ENETUNREACH;
 		else
-			queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
+			rds_cond_queue_send_work(cpath, 1);
 		rcu_read_unlock();
 	}
 	if (ret)
@@ -1473,7 +1473,7 @@  rds_send_probe(struct rds_conn_path *cp, __be16 sport,
 	/* schedule the send work on rds_wq */
 	rcu_read_lock();
 	if (!rds_destroy_pending(cp->cp_conn))
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+		rds_cond_queue_send_work(cp, 0);
 	rcu_read_unlock();
 
 	rds_message_put(rm);
diff --git a/net/rds/tcp.c b/net/rds/tcp.c
index 0581c53e6517..b3f2c6e27b59 100644
--- a/net/rds/tcp.c
+++ b/net/rds/tcp.c
@@ -168,8 +168,16 @@  void rds_tcp_reset_callbacks(struct socket *sock,
 	atomic_set(&cp->cp_state, RDS_CONN_RESETTING);
 	wait_event(cp->cp_waitq, !test_bit(RDS_IN_XMIT, &cp->cp_flags));
 	/* reset receive side state for rds_tcp_data_recv() for osock  */
-	cancel_delayed_work_sync(&cp->cp_send_w);
-	cancel_delayed_work_sync(&cp->cp_recv_w);
+
+	/* make sure lingering queued work won't try to ref the
+	 * conn. If there is work queued, we cancel it (and set the bit
+	 * to avoid any re-queueing)
+	 */
+
+	if (test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_send_w);
+	if (test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_recv_w);
 	lock_sock(osock->sk);
 	if (tc->t_tinc) {
 		rds_inc_put(&tc->t_tinc->ti_inc);
diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c
index 7997a19d1da3..ab9fc150f974 100644
--- a/net/rds/tcp_recv.c
+++ b/net/rds/tcp_recv.c
@@ -327,7 +327,7 @@  void rds_tcp_data_ready(struct sock *sk)
 	if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
 		rcu_read_lock();
 		if (!rds_destroy_pending(cp->cp_conn))
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+			rds_cond_queue_recv_work(cp, 0);
 		rcu_read_unlock();
 	}
 out:
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index 7d284ac7e81a..cecd3dbde58d 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -201,7 +201,7 @@  void rds_tcp_write_space(struct sock *sk)
 	rcu_read_lock();
 	if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf &&
 	    !rds_destroy_pending(cp->cp_conn))
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+		rds_cond_queue_send_work(cp, 0);
 	rcu_read_unlock();
 
 out:
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 1f424cbfcbb4..eedae5653051 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -89,8 +89,10 @@  void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
 	set_bit(0, &cp->cp_conn->c_map_queued);
 	rcu_read_lock();
 	if (!rds_destroy_pending(cp->cp_conn)) {
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
-		queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+		rds_clear_queued_send_work_bit(cp);
+		rds_cond_queue_send_work(cp, 0);
+		rds_clear_queued_recv_work_bit(cp);
+		rds_cond_queue_recv_work(cp, 0);
 	}
 	rcu_read_unlock();
 	cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
@@ -193,9 +195,11 @@  void rds_send_worker(struct work_struct *work)
 	struct rds_conn_path *cp = container_of(work,
 						struct rds_conn_path,
 						cp_send_w.work);
+	unsigned long delay;
 	int ret;
 
 	if (rds_conn_path_state(cp) == RDS_CONN_UP) {
+		rds_clear_queued_send_work_bit(cp);
 		clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
 		ret = rds_send_xmit(cp);
 		cond_resched();
@@ -203,15 +207,17 @@  void rds_send_worker(struct work_struct *work)
 		switch (ret) {
 		case -EAGAIN:
 			rds_stats_inc(s_send_immediate_retry);
-			queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+			delay = 0;
 			break;
 		case -ENOMEM:
 			rds_stats_inc(s_send_delayed_retry);
-			queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
+			delay = 2;
 			break;
 		default:
-			break;
+			return;
 		}
+
+		rds_cond_queue_send_work(cp, delay);
 	}
 }
 
@@ -220,23 +226,26 @@  void rds_recv_worker(struct work_struct *work)
 	struct rds_conn_path *cp = container_of(work,
 						struct rds_conn_path,
 						cp_recv_w.work);
+	unsigned long delay;
 	int ret;
 
 	if (rds_conn_path_state(cp) == RDS_CONN_UP) {
+		rds_clear_queued_recv_work_bit(cp);
 		ret = cp->cp_conn->c_trans->recv_path(cp);
 		rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
 		switch (ret) {
 		case -EAGAIN:
 			rds_stats_inc(s_recv_immediate_retry);
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+			delay = 0;
 			break;
 		case -ENOMEM:
 			rds_stats_inc(s_recv_delayed_retry);
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
+			delay = 2;
 			break;
 		default:
-			break;
+			return;
 		}
+		rds_cond_queue_recv_work(cp, delay);
 	}
 }