
[net-next,10/12] net: homa: create homa_timer.c

Message ID: 20241028213541.1529-11-ouster@cs.stanford.edu (mailing list archive)
State: Changes Requested
Delegated to: Netdev Maintainers
Series: Begin upstreaming Homa transport protocol

Checks

Context | Check | Description
netdev/series_format | success | Posting correctly formatted
netdev/tree_selection | success | Clearly marked for net-next, async
netdev/ynl | success | Generated files up to date; no warnings/errors; no diff in generated
netdev/fixes_present | success | Fixes tag not required for -next series
netdev/header_inline | success | No static functions without inline keyword in header files
netdev/build_32bit | success | Errors and warnings before: 5; this patch: 5
netdev/build_tools | success | No tools touched, skip
netdev/cc_maintainers | warning | 4 maintainers not CCed: horms@kernel.org kuba@kernel.org pabeni@redhat.com edumazet@google.com
netdev/build_clang | success | Errors and warnings before: 4; this patch: 4
netdev/verify_signedoff | success | Signed-off-by tag matches author and committer
netdev/deprecated_api | success | None detected
netdev/check_selftest | success | No net selftest shell script
netdev/verify_fixes | success | No Fixes tag
netdev/build_allmodconfig_warn | success | Errors and warnings before: 8; this patch: 8
netdev/checkpatch | warning | WARNING: added, moved or deleted file(s), does MAINTAINERS need updating?
netdev/build_clang_rust | success | No Rust files in patch. Skipping build
netdev/kdoc | success | Errors and warnings before: 0; this patch: 0
netdev/source_inline | success | Was 0; now: 0

Commit Message

John Ousterhout Oct. 28, 2024, 9:35 p.m. UTC
This file contains code that wakes up periodically to check for
missing data, initiate retransmissions, and declare peer nodes
"dead".

Signed-off-by: John Ousterhout <ouster@cs.stanford.edu>
---
 net/homa/homa_timer.c | 158 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 158 insertions(+)
 create mode 100644 net/homa/homa_timer.c
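Note: this patch does not include the code that drives the timer. For a rough sense of the intended usage, a function like homa_timer() would typically be invoked once per tick from a dedicated kernel thread. A minimal sketch, assuming a hypothetical homa_timer_main() entry point and a one-millisecond tick (neither appears in this patch):

	/* Sketch only: drive homa_timer() once per tick from a kthread.
	 * The entry point name, tick length, and loop structure are
	 * assumptions for illustration; the real series may use a
	 * different mechanism to schedule timer passes.
	 */
	static int homa_timer_main(void *arg)
	{
		struct homa *homa = arg;

		while (!kthread_should_stop()) {
			homa_timer(homa);
			msleep(1);	/* One "tick" between passes. */
		}
		return 0;
	}

Such a thread would be started with kthread_run() at module init and stopped with kthread_stop() at shutdown.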

Comments

Eric Dumazet Oct. 30, 2024, 7:02 p.m. UTC | #1
On 10/28/24 10:35 PM, John Ousterhout wrote:
> This file contains code that wakes up periodically to check for
> missing data, initiate retransmissions, and declare peer nodes
> "dead".
>
> Signed-off-by: John Ousterhout <ouster@cs.stanford.edu>
> ---
>   net/homa/homa_timer.c | 158 ++++++++++++++++++++++++++++++++++++++++++
>   1 file changed, 158 insertions(+)
>   create mode 100644 net/homa/homa_timer.c
>
> diff --git a/net/homa/homa_timer.c b/net/homa/homa_timer.c
> new file mode 100644
> index 000000000000..bce7c02bb1cd
> --- /dev/null
> +++ b/net/homa/homa_timer.c
> @@ -0,0 +1,158 @@
> +// SPDX-License-Identifier: BSD-2-Clause
> +
> +/* This file handles timing-related functions for Homa, such as retries
> + * and timeouts.
> + */
> +
> +#include "homa_impl.h"
> +#include "homa_peer.h"
> +#include "homa_rpc.h"
> +
> +/**
> + * homa_check_rpc() -  Invoked for each RPC during each timer pass; does
> + * most of the work of checking for time-related actions such as sending
> + * resends, aborting RPCs for which there is no response, and sending
> + * requests for acks. It is separate from homa_timer because homa_timer
> + * got too long and deeply indented.
> + * @rpc:     RPC to check; must be locked by the caller.
> + */
> +void homa_check_rpc(struct homa_rpc *rpc)
> +{
> +	struct homa *homa = rpc->hsk->homa;
> +	struct resend_header resend;
> +
> +	/* See if we need to request an ack for this RPC. */
> +	if (!homa_is_client(rpc->id) && rpc->state == RPC_OUTGOING &&
> +	    rpc->msgout.next_xmit_offset >= rpc->msgout.length) {
> +		if (rpc->done_timer_ticks == 0) {
> +			rpc->done_timer_ticks = homa->timer_ticks;
> +		} else {
> +			/* >= comparison that handles tick wrap-around. */
> +			if ((rpc->done_timer_ticks + homa->request_ack_ticks
> +					- 1 - homa->timer_ticks) & 1 << 31) {
> +				struct need_ack_header h;
> +
> +				homa_xmit_control(NEED_ACK, &h, sizeof(h), rpc);
> +			}
> +		}
> +	}
> +
> +	if (rpc->state == RPC_INCOMING) {
> +		if (rpc->msgin.num_bpages == 0) {
> +			/* Waiting for buffer space, so no problem. */
> +			rpc->silent_ticks = 0;
> +			return;
> +		}
> +	} else if (!homa_is_client(rpc->id)) {
> +		/* We're the server and we've received the input message;
> +		 * no need to worry about retries.
> +		 */
> +		rpc->silent_ticks = 0;
> +		return;
> +	}
> +
> +	if (rpc->state == RPC_OUTGOING) {
> +		if (rpc->msgout.next_xmit_offset < rpc->msgout.length) {
> +			/* There are bytes that we haven't transmitted,
> +			 * so no need to be concerned; the ball is in our court.
> +			 */
> +			rpc->silent_ticks = 0;
> +			return;
> +		}
> +	}
> +
> +	if (rpc->silent_ticks < homa->resend_ticks)
> +		return;
> +	if (rpc->silent_ticks >= homa->timeout_ticks) {
> +		homa_rpc_abort(rpc, -ETIMEDOUT);
> +		return;
> +	}
> +	if (((rpc->silent_ticks - homa->resend_ticks) % homa->resend_interval)
> +			!= 0)
> +		return;
> +
> +	/* Issue a resend for the bytes just after the last ones received
> +	 * (gaps in the middle were already handled by homa_gap_retry above).
> +	 */
> +	if (rpc->msgin.length < 0) {
> +		/* Haven't received any data for this message; request
> +		 * retransmission of just the first packet (the sender
> +		 * will send at least one full packet, regardless of
> +		 * the length below).
> +		 */
> +		resend.offset = htonl(0);
> +		resend.length = htonl(100);
> +	} else {
> +		homa_gap_retry(rpc);
> +		resend.offset = htonl(rpc->msgin.recv_end);
> +		resend.length = htonl(rpc->msgin.length - rpc->msgin.recv_end);
> +		if (resend.length == 0)
> +			return;
> +	}
> +	homa_xmit_control(RESEND, &resend, sizeof(resend), rpc);
> +}
> +
> +/**
> + * homa_timer() - This function is invoked at regular intervals ("ticks")
> + * to implement retries and aborts for Homa.
> + * @homa:    Overall data about the Homa protocol implementation.
> + */
> +void homa_timer(struct homa *homa)
> +{
> +	struct homa_socktab_scan scan;
> +	struct homa_sock *hsk;
> +	struct homa_rpc *rpc;
> +	int total_rpcs = 0;
> +	int rpc_count = 0;
> +
> +	homa->timer_ticks++;
> +
> +	/* Scan all existing RPCs in all sockets.  The rcu_read_lock
> +	 * below prevents sockets from being deleted during the scan.
> +	 */
> +	rcu_read_lock();
> +	for (hsk = homa_socktab_start_scan(homa->port_map, &scan);
> +			hsk; hsk = homa_socktab_next(&scan)) {
> +		while (hsk->dead_skbs >= homa->dead_buffs_limit)
> +			/* If we get here, it means that homa_wait_for_message
> +			 * isn't keeping up with RPC reaping, so we'll help
> +			 * out.  See reap.txt for more info.
> +			 */
> +			if (homa_rpc_reap(hsk, hsk->homa->reap_limit) == 0)
> +				break;
> +
> +		if (list_empty(&hsk->active_rpcs) || hsk->shutdown)
> +			continue;
> +
> +		if (!homa_protect_rpcs(hsk))
> +			continue;
> +		list_for_each_entry_rcu(rpc, &hsk->active_rpcs, active_links) {
> +			total_rpcs++;
> +			homa_rpc_lock(rpc, "homa_timer");
> +			if (rpc->state == RPC_IN_SERVICE) {
> +				rpc->silent_ticks = 0;
> +				homa_rpc_unlock(rpc);
> +				continue;
> +			}
> +			rpc->silent_ticks++;
> +			homa_check_rpc(rpc);
> +			homa_rpc_unlock(rpc);
> +			rpc_count++;
> +			if (rpc_count >= 10) {
> +				/* Give other kernel threads a chance to run
> +				 * on this core. Must release the RCU read lock
> +				 * while doing this.
> +				 */
> +				rcu_read_unlock();
> +				schedule();

This is unsafe. homa_socktab_next() will access possibly freed data.

> +				rcu_read_lock();
> +				rpc_count = 0;
> +			}
> +		}
> +		homa_unprotect_rpcs(hsk);
> +	}
> +	rcu_read_unlock();
> +
> +//	if (total_rpcs > 0)
> +//		tt_record1("homa_timer finished scanning %d RPCs", total_rpcs);
> +}
John Ousterhout Oct. 31, 2024, 6:55 p.m. UTC | #2
On Wed, Oct 30, 2024 at 12:02 PM Eric Dumazet <eric.dumazet@gmail.com> wrote:
> ...
> > +                     if (rpc_count >= 10) {
> > +                             /* Give other kernel threads a chance to run
> > +                              * on this core. Must release the RCU read lock
> > +                              * while doing this.
> > +                              */
> > +                             rcu_read_unlock();
> > +                             schedule();
>
> This is unsafe. homa_socktab_next() will access possibly freed data.

Yikes; you're right. When I added the self-preemption code I forgot
that RCU not only keeps sockets from being deleted, but it also keeps
the hash table link structure from changing out from underneath scans.

I have implemented a fix (making socket scans work even in the face of
socket deletion), which will be in the next version of the patch
series. Thanks for catching this.
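For reference, the general shape of such a fix is to pin the current scan position before dropping the RCU lock, so that the scan can safely resume after sleeping. A minimal sketch of that pattern (homa_sock_hold()/homa_sock_put() are hypothetical refcount helpers, not the actual fix; the per-RPC list position would need similar care):

	if (rpc_count >= 10) {
		/* Pin the socket so it cannot be freed while we sleep;
		 * hypothetical helpers shown for illustration only.
		 */
		homa_sock_hold(hsk);
		rcu_read_unlock();
		schedule();
		rcu_read_lock();
		homa_sock_put(hsk);	/* Safe: back inside the RCU section. */
		rpc_count = 0;
	}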

-John-

Patch

diff --git a/net/homa/homa_timer.c b/net/homa/homa_timer.c
new file mode 100644
index 000000000000..bce7c02bb1cd
--- /dev/null
+++ b/net/homa/homa_timer.c
@@ -0,0 +1,158 @@ 
+// SPDX-License-Identifier: BSD-2-Clause
+
+/* This file handles timing-related functions for Homa, such as retries
+ * and timeouts.
+ */
+
+#include "homa_impl.h"
+#include "homa_peer.h"
+#include "homa_rpc.h"
+
+/**
+ * homa_check_rpc() -  Invoked for each RPC during each timer pass; does
+ * most of the work of checking for time-related actions such as sending
+ * resends, aborting RPCs for which there is no response, and sending
+ * requests for acks. It is separate from homa_timer because homa_timer
+ * got too long and deeply indented.
+ * @rpc:     RPC to check; must be locked by the caller.
+ */
+void homa_check_rpc(struct homa_rpc *rpc)
+{
+	struct homa *homa = rpc->hsk->homa;
+	struct resend_header resend;
+
+	/* See if we need to request an ack for this RPC. */
+	if (!homa_is_client(rpc->id) && rpc->state == RPC_OUTGOING &&
+	    rpc->msgout.next_xmit_offset >= rpc->msgout.length) {
+		if (rpc->done_timer_ticks == 0) {
+			rpc->done_timer_ticks = homa->timer_ticks;
+		} else {
+			/* >= comparison that handles tick wrap-around. */
+			if ((rpc->done_timer_ticks + homa->request_ack_ticks
+					- 1 - homa->timer_ticks) & 1 << 31) {
+				struct need_ack_header h;
+
+				homa_xmit_control(NEED_ACK, &h, sizeof(h), rpc);
+			}
+		}
+	}
+
+	if (rpc->state == RPC_INCOMING) {
+		if (rpc->msgin.num_bpages == 0) {
+			/* Waiting for buffer space, so no problem. */
+			rpc->silent_ticks = 0;
+			return;
+		}
+	} else if (!homa_is_client(rpc->id)) {
+		/* We're the server and we've received the input message;
+		 * no need to worry about retries.
+		 */
+		rpc->silent_ticks = 0;
+		return;
+	}
+
+	if (rpc->state == RPC_OUTGOING) {
+		if (rpc->msgout.next_xmit_offset < rpc->msgout.length) {
+			/* There are bytes that we haven't transmitted,
+			 * so no need to be concerned; the ball is in our court.
+			 */
+			rpc->silent_ticks = 0;
+			return;
+		}
+	}
+
+	if (rpc->silent_ticks < homa->resend_ticks)
+		return;
+	if (rpc->silent_ticks >= homa->timeout_ticks) {
+		homa_rpc_abort(rpc, -ETIMEDOUT);
+		return;
+	}
+	if (((rpc->silent_ticks - homa->resend_ticks) % homa->resend_interval)
+			!= 0)
+		return;
+
+	/* Issue a resend for the bytes just after the last ones received
+	 * (gaps in the middle were already handled by homa_gap_retry above).
+	 */
+	if (rpc->msgin.length < 0) {
+		/* Haven't received any data for this message; request
+		 * retransmission of just the first packet (the sender
+		 * will send at least one full packet, regardless of
+		 * the length below).
+		 */
+		resend.offset = htonl(0);
+		resend.length = htonl(100);
+	} else {
+		homa_gap_retry(rpc);
+		resend.offset = htonl(rpc->msgin.recv_end);
+		resend.length = htonl(rpc->msgin.length - rpc->msgin.recv_end);
+		if (resend.length == 0)
+			return;
+	}
+	homa_xmit_control(RESEND, &resend, sizeof(resend), rpc);
+}
+
+/**
+ * homa_timer() - This function is invoked at regular intervals ("ticks")
+ * to implement retries and aborts for Homa.
+ * @homa:    Overall data about the Homa protocol implementation.
+ */
+void homa_timer(struct homa *homa)
+{
+	struct homa_socktab_scan scan;
+	struct homa_sock *hsk;
+	struct homa_rpc *rpc;
+	int total_rpcs = 0;
+	int rpc_count = 0;
+
+	homa->timer_ticks++;
+
+	/* Scan all existing RPCs in all sockets.  The rcu_read_lock
+	 * below prevents sockets from being deleted during the scan.
+	 */
+	rcu_read_lock();
+	for (hsk = homa_socktab_start_scan(homa->port_map, &scan);
+			hsk; hsk = homa_socktab_next(&scan)) {
+		while (hsk->dead_skbs >= homa->dead_buffs_limit)
+			/* If we get here, it means that homa_wait_for_message
+			 * isn't keeping up with RPC reaping, so we'll help
+			 * out.  See reap.txt for more info.
+			 */
+			if (homa_rpc_reap(hsk, hsk->homa->reap_limit) == 0)
+				break;
+
+		if (list_empty(&hsk->active_rpcs) || hsk->shutdown)
+			continue;
+
+		if (!homa_protect_rpcs(hsk))
+			continue;
+		list_for_each_entry_rcu(rpc, &hsk->active_rpcs, active_links) {
+			total_rpcs++;
+			homa_rpc_lock(rpc, "homa_timer");
+			if (rpc->state == RPC_IN_SERVICE) {
+				rpc->silent_ticks = 0;
+				homa_rpc_unlock(rpc);
+				continue;
+			}
+			rpc->silent_ticks++;
+			homa_check_rpc(rpc);
+			homa_rpc_unlock(rpc);
+			rpc_count++;
+			if (rpc_count >= 10) {
+				/* Give other kernel threads a chance to run
+				 * on this core. Must release the RCU read lock
+				 * while doing this.
+				 */
+				rcu_read_unlock();
+				schedule();
+				rcu_read_lock();
+				rpc_count = 0;
+			}
+		}
+		homa_unprotect_rpcs(hsk);
+	}
+	rcu_read_unlock();
+
+//	if (total_rpcs > 0)
+//		tt_record1("homa_timer finished scanning %d RPCs", total_rpcs);
+}