
[RFC,v1,net-next,1/4] net: Introduce Qdisc backpressure infrastructure

Message ID f4090d129b685df72070f708294550fbc513f888.1651800598.git.peilin.ye@bytedance.com (mailing list archive)
State RFC
Delegated to: Netdev Maintainers
Series net: Qdisc backpressure infrastructure

Checks

Context Check Description
netdev/tree_selection success Clearly marked for net-next, async
netdev/fixes_present success Fixes tag not required for -next series
netdev/subject_prefix success Link
netdev/cover_letter success Series has a cover letter
netdev/patch_count success Link
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 3148 this patch: 3148
netdev/cc_maintainers warning 2 maintainers not CCed: petrm@nvidia.com edumazet@google.com
netdev/build_clang success Errors and warnings before: 623 this patch: 623
netdev/module_param success Was 0 now: 0
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 3274 this patch: 3274
netdev/checkpatch warning WARNING: line length of 81 exceeds 80 columns WARNING: line length of 82 exceeds 80 columns WARNING: line length of 83 exceeds 80 columns WARNING: line length of 86 exceeds 80 columns WARNING: line length of 87 exceeds 80 columns WARNING: line length of 93 exceeds 80 columns WARNING: line length of 99 exceeds 80 columns WARNING: memory barrier without comment
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0

Commit Message

Peilin Ye May 6, 2022, 7:44 p.m. UTC
From: Peilin Ye <peilin.ye@bytedance.com>

Currently, sockets (especially UDP ones) can drop a lot of traffic at TC
egress when rate limited by shaper Qdiscs like HTB.  Improve this by
implementing the following state machine for sockets (currently only UDP
and TCP sockets are supported):

  ┌────────────────┐  [a]  ┌────────────────┐  [b]  ┌────────────────┐
  │ SK_UNTHROTTLED │─ ─ ─ >│  SK_OVERLIMIT  │─ ─ ─ >│  SK_THROTTLED  │
  └────────────────┘       └────────────────┘       └────────────────┘
           └─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ < ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘
                                   [c]

Take TBF as an example (hook placement is sketched in code right after
steps [a]-[c]):

  [a] When TBF's inner Qdisc (e.g. bfifo) becomes full, TBF fails to
      enqueue an skb belonging to UNTHROTTLED socket A.  Socket A is
      marked as OVERLIMIT and added to TBF's "backpressure_list";

  [b] When TBF runs out of tokens, it marks all OVERLIMIT sockets
      (including A) on its backpressure_list as THROTTLED, and schedules
      a Qdisc watchdog timer to wait for more tokens;

  [c] After the timer expires, all THROTTLED sockets (including A) are
      removed from TBF's backpressure_list, and marked as UNTHROTTLED.
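
A minimal sketch of the hook placement described above (my reading of
[a]-[c]; the actual TBF wiring presumably lands in a later patch of
this series, and tbf_tokens_available() / tbf_next_token_ns() below are
hypothetical helpers standing in for TBF's token arithmetic):

  static int tbf_enqueue(struct sk_buff *skb, struct Qdisc *sch,
                         struct sk_buff **to_free)
  {
          struct tbf_sched_data *q = qdisc_priv(sch);
          int ret = qdisc_enqueue(skb, q->qdisc, to_free);

          if (ret != NET_XMIT_SUCCESS) {
                  /* [a]: inner Qdisc full, mark the socket OVERLIMIT */
                  qdisc_backpressure_overlimit(sch, skb);
                  if (net_xmit_drop_count(ret))
                          qdisc_qstats_drop(sch);
                  return ret;
          }
          sch->q.qlen++;
          return NET_XMIT_SUCCESS;
  }

  static struct sk_buff *tbf_dequeue(struct Qdisc *sch)
  {
          struct tbf_sched_data *q = qdisc_priv(sch);
          struct sk_buff *skb = q->qdisc->ops->peek(q->qdisc);

          if (!skb)
                  return NULL;

          if (!tbf_tokens_available(q, skb)) {
                  /* [b]: out of tokens, throttle every OVERLIMIT socket */
                  qdisc_backpressure_throttle(sch);
                  qdisc_watchdog_schedule_ns(&q->watchdog,
                                             tbf_next_token_ns(q));
                  return NULL;
          }
          return qdisc_dequeue_peeked(q->qdisc);
  }

Step [c] needs no Qdisc-specific code: the watchdog reschedules the
Qdisc, and net_tx_action() calls qdisc_backpressure_unthrottle() under
root_lock (see the net/core/dev.c hunk below).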

UDP and TCP sleep on THROTTLED sockets in sock_wait_for_wmem() and
sk_stream_wait_memory() respectively.  epoll() and friends should not
report EPOLL{OUT,WRNORM} for THROTTLED sockets.  When unthrottling in [c],
call ->sk_write_space() to wake up UDP and/or TCP waiters, and notify
SOCKWQ_ASYNC_NOSPACE subscribers.
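
For illustration only (a userspace sketch, not part of this series;
the function name is mine), this is how a sender observes the new
behavior:

  /* While a socket is SK_THROTTLED, sock_writeable() returns false, so
   * epoll_wait() stops reporting EPOLLOUT; the ->sk_write_space() call
   * in [c] is what eventually wakes this up.
   */
  #include <sys/epoll.h>
  #include <unistd.h>

  static int wait_until_writable(int udp_fd)
  {
          struct epoll_event ev = { .events = EPOLLOUT, .data.fd = udp_fd };
          int efd, ret;

          efd = epoll_create1(0);
          if (efd < 0)
                  return -1;
          if (epoll_ctl(efd, EPOLL_CTL_ADD, udp_fd, &ev) < 0)
                  ret = -1;
          else
                  ret = epoll_wait(efd, &ev, 1, -1); /* blocks while throttled */
          close(efd);
          return ret;
  }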

For each device, backpressure_list operations are always serialized by
Qdisc root_lock.

When marking a socket as OVERLIMIT in [a], use a cmpxchg() to make sure
that multiple CPUs do not try to add this socket to different
backpressure_lists (on different devices) concurrently.

After removing a THROTTLED socket from backpressure_list in [c], use a
smp_store_release() to make sure changes have been committed to memory
before marking the socket as UNTHROTTLED.
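
For reference, both sides annotated (code excerpted from the patch
below; comments are mine):

  /* [a] enqueue path, potentially many CPUs and devices at once: the
   * cmpxchg() guarantees only one CPU wins and links the socket.
   */
  if (cmpxchg(&sk->sk_backpressure_status, SK_UNTHROTTLED,
              SK_OVERLIMIT) == SK_UNTHROTTLED) {
          sock_hold(sk);
          list_add_tail(&sk->sk_backpressure_node, &sch->backpressure_list);
  }

  /* [c] unthrottle path, under Qdisc root_lock: the release guarantees
   * list_del_init() is visible before SK_UNTHROTTLED is, so the next
   * [a]-style cmpxchg() winner cannot observe the socket as UNTHROTTLED
   * while it is still linked on this list.  (Send-path readers use a
   * plain READ_ONCE() in sk_is_throttled().)
   */
  list_del_init(pos);
  smp_store_release(&sk->sk_backpressure_status, SK_UNTHROTTLED);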

Suggested-by: Cong Wang <cong.wang@bytedance.com>
Signed-off-by: Peilin Ye <peilin.ye@bytedance.com>
---
 include/net/sch_generic.h | 43 +++++++++++++++++++++++++++++++++++++++
 include/net/sock.h        | 18 +++++++++++++++-
 net/core/dev.c            |  1 +
 net/core/sock.c           |  6 ++++--
 net/ipv4/tcp_ipv4.c       | 11 +++++++---
 net/sched/sch_generic.c   |  4 ++++
 6 files changed, 77 insertions(+), 6 deletions(-)

Comments

Stephen Hemminger May 6, 2022, 8:31 p.m. UTC | #1
On Fri,  6 May 2022 12:44:22 -0700
Peilin Ye <yepeilin.cs@gmail.com> wrote:

> +static inline void qdisc_backpressure_overlimit(struct Qdisc *sch, struct sk_buff *skb)
> +{
> +	struct sock *sk = skb->sk;
> +
> +	if (!sk || !sk_fullsock(sk))
> +		return;
> +
> +	if (cmpxchg(&sk->sk_backpressure_status, SK_UNTHROTTLED, SK_OVERLIMIT) == SK_UNTHROTTLED) {
> +		sock_hold(sk);
> +		list_add_tail(&sk->sk_backpressure_node, &sch->backpressure_list);
> +	}
> +}

What if the socket is closed? You are holding a reference, but the application may be gone.

Or if output is stalled indefinitely?
Peilin Ye May 6, 2022, 11:34 p.m. UTC | #2
Hi Stephen,

On Fri, May 06, 2022 at 01:31:11PM -0700, Stephen Hemminger wrote:
> On Fri,  6 May 2022 12:44:22 -0700, Peilin Ye <yepeilin.cs@gmail.com> wrote:
> > +static inline void qdisc_backpressure_overlimit(struct Qdisc *sch, struct sk_buff *skb)
> > +{
> > +	struct sock *sk = skb->sk;
> > +
> > +	if (!sk || !sk_fullsock(sk))
> > +		return;
> > +
> > +	if (cmpxchg(&sk->sk_backpressure_status, SK_UNTHROTTLED, SK_OVERLIMIT) == SK_UNTHROTTLED) {
> > +		sock_hold(sk);
> > +		list_add_tail(&sk->sk_backpressure_node, &sch->backpressure_list);
> > +	}
> > +}
> 
> What if the socket is closed? You are holding a reference, but the application may be gone.

Thanks for pointing this out!  I just understood how sk_refcnt works
together with sk_wmem_alloc.

By the time we process this in-flight skb, sk_refcnt may have already
reached 0, which means sk_free() may have already dropped that "extra" 1
from sk_wmem_alloc, so skb->destructor() may call __sk_free() while I
"hold" the sock here.  Seems like a use-after-free.
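
One direction might be (untested sketch): since in-flight skbs pin the
socket via sk_wmem_alloc rather than sk_refcnt, only link the socket if
a reference can still legitimately be taken:

  if (!sk || !sk_fullsock(sk) || !refcount_inc_not_zero(&sk->sk_refcnt))
          return;

  if (cmpxchg(&sk->sk_backpressure_status, SK_UNTHROTTLED,
              SK_OVERLIMIT) == SK_UNTHROTTLED)
          list_add_tail(&sk->sk_backpressure_node, &sch->backpressure_list);
  else
          sock_put(sk);   /* lost the race; drop the extra reference */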

> Or if output is stalled indefinitely?

It would be better to do the cleanup in the sock-destroying code, but I
am trying to avoid acquiring the Qdisc root_lock there.  I will try to
come up with a better solution.

Thanks,
Peilin Ye
Dave Taht May 9, 2022, 7:53 a.m. UTC | #3
I am very pleased to see this work.

However, my "vision", such as it was and as misguided as it might be,
was to implement a facility similar to tcp_notsent_lowat for UDP
packets: tracking the progress of a UDP packet through the kernel,
supplying backpressure, and reporting better information back to the
application about where, when, and why the packet was dropped in the
stack.

I've been really impressed by the DROP_REASON work; prior to seeing all
that instrumentation, I had no clue where else packets might be dropped
in the kernel.

I'd be interested to see what happens with sch_cake.
Peilin Ye May 10, 2022, 2:23 a.m. UTC | #4
Hi Dave,

On Mon, May 09, 2022 at 12:53:28AM -0700, Dave Taht wrote:
> I am very pleased to see this work.

Thanks!

> However, my "vision", such as it was and as misguided as it might be,
> was to implement a facility similar to tcp_notsent_lowat for UDP
> packets: tracking the progress of a UDP packet through the kernel,
> supplying backpressure, and reporting better information back to the
> application about where, when, and why the packet was dropped in the
> stack.

By "a facility similar to tcp_notsent_lowat", do you mean a smaller
sk_sndbuf, or "UDP Small Queues"?

I don't fully understand the implications of using a smaller sk_sndbuf
yet, but I think it can work together with this RFC.

sk_sndbuf is a per-socket attribute, while this RFC tries to improve
things from the Qdisc's perspective.  Using a smaller sk_sndbuf alone
does not prevent the "when UDP sends faster, TBF simply drops faster"
issue (described in [I] of the cover letter) from happening.  There is
always a point at which there are too many sockets for TBF's queue to
hold "sk_sndbuf times the number of sockets" (roughly speaking) bytes of
skbs.  After that point, TBF suddenly starts to drop a lot.
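
Rough arithmetic (assuming the shaper's queue limit in the cover-letter
setup [V] is well below these totals): with 21299-byte send buffers, 15
clients can have about 15 * 21299 B ~= 312 KiB of skbs in flight toward
the Qdisc, and 20 clients about 416 KiB, so lowering sk_sndbuf only
moves the cliff; it does not remove it.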

For example, I used the default 212992 sk_sndbuf
(/proc/sys/net/core/wmem_default) in the test setup ([V] in the cover
letter).  Let's make it one tenth as large, 21299.  It works well for
the 2-client setup; zero packets dropped.  However, if we test it with
15 iperf2 clients:

[  3]  0.0-30.0 sec  46.4 MBytes  13.0 Mbits/sec   1.251 ms 89991/123091 (73%)
[  3]  0.0-30.0 sec  46.6 MBytes  13.0 Mbits/sec   2.033 ms 91204/124464 (73%)
[  3]  0.0-30.0 sec  46.5 MBytes  13.0 Mbits/sec   0.504 ms 89879/123054 (73%)
<...>                                                       ^^^^^^^^^^^^ ^^^^^

73% drop rate again.  Now apply this RFC:

[  3]  0.0-30.0 sec  46.3 MBytes  12.9 Mbits/sec   1.206 ms  807/33839 (2.4%)
[  3]  0.0-30.0 sec  45.5 MBytes  12.7 Mbits/sec   1.919 ms  839/33283 (2.5%)
[  3]  0.0-30.0 sec  45.8 MBytes  12.8 Mbits/sec   2.521 ms  837/33508 (2.5%)
<...>                                                        ^^^^^^^^^ ^^^^^^

Down to roughly 2.5% again.

Next, same 21299 sk_sndbuf, 20 iperf2 clients, without RFC:

[  3]  0.0-30.0 sec  34.5 MBytes  9.66 Mbits/sec   1.054 ms 258703/283342 (91%)
[  3]  0.0-30.0 sec  34.5 MBytes  9.66 Mbits/sec   1.033 ms 257324/281964 (91%)
[  3]  0.0-30.0 sec  34.5 MBytes  9.66 Mbits/sec   1.116 ms 257858/282500 (91%)
<...>                                                       ^^^^^^^^^^^^^ ^^^^^

91% drop rate.  Finally, apply RFC:

[  3]  0.0-30.0 sec  34.4 MBytes  9.61 Mbits/sec   0.974 ms 7982/32503 (25%)
[  3]  0.0-30.0 sec  34.1 MBytes  9.54 Mbits/sec   1.381 ms 7394/31732 (23%)
[  3]  0.0-30.0 sec  34.3 MBytes  9.58 Mbits/sec   2.431 ms 8149/32583 (25%)
<...>                                                       ^^^^^^^^^^ ^^^^^

The thundering herd problem ([III] in the cover letter) surfaces, but it
is still an improvement.

In conclusion, assuming we are going to use a smaller sk_sndbuf or "UDP
Small Queues", I think it does not replace this RFC, nor does this RFC
replace it.

> I've been really impressed by the DROP_REASON work; prior to seeing
> all that instrumentation, I had no clue where else packets might be
> dropped in the kernel.
> 
> I'd be interested to see what happens with sch_cake.

Sure, I will cover sch_cake in v2.

Thanks,
Peilin Ye

Patch

diff --git a/include/net/sch_generic.h b/include/net/sch_generic.h
index 9bab396c1f3b..5ddbe0b65cb6 100644
--- a/include/net/sch_generic.h
+++ b/include/net/sch_generic.h
@@ -19,6 +19,7 @@ 
 #include <net/gen_stats.h>
 #include <net/rtnetlink.h>
 #include <net/flow_offload.h>
+#include <net/sock.h>
 
 struct Qdisc_ops;
 struct qdisc_walker;
@@ -108,6 +109,7 @@  struct Qdisc {
 	struct gnet_stats_queue	__percpu *cpu_qstats;
 	int			pad;
 	refcount_t		refcnt;
+	struct list_head	backpressure_list;
 
 	/*
 	 * For performance sake on SMP, we put highly modified fields at the end
@@ -1221,6 +1223,47 @@  static inline int qdisc_drop_all(struct sk_buff *skb, struct Qdisc *sch,
 	return NET_XMIT_DROP;
 }
 
+static inline void qdisc_backpressure_overlimit(struct Qdisc *sch, struct sk_buff *skb)
+{
+	struct sock *sk = skb->sk;
+
+	if (!sk || !sk_fullsock(sk))
+		return;
+
+	if (cmpxchg(&sk->sk_backpressure_status, SK_UNTHROTTLED, SK_OVERLIMIT) == SK_UNTHROTTLED) {
+		sock_hold(sk);
+		list_add_tail(&sk->sk_backpressure_node, &sch->backpressure_list);
+	}
+}
+
+static inline void qdisc_backpressure_throttle(struct Qdisc *sch)
+{
+	struct list_head *pos;
+	struct sock *sk;
+
+	list_for_each(pos, &sch->backpressure_list) {
+		sk = list_entry(pos, struct sock, sk_backpressure_node);
+
+		WRITE_ONCE(sk->sk_backpressure_status, SK_THROTTLED);
+	}
+}
+
+static inline void qdisc_backpressure_unthrottle(struct Qdisc *sch)
+{
+	struct list_head *pos, *next;
+	struct sock *sk;
+
+	list_for_each_safe(pos, next, &sch->backpressure_list) {
+		sk = list_entry(pos, struct sock, sk_backpressure_node);
+
+		list_del_init(pos);
+		smp_store_release(&sk->sk_backpressure_status, SK_UNTHROTTLED);
+		sk->sk_write_space(sk);
+
+		sock_put(sk);
+	}
+}
+
 /* Length to Time (L2T) lookup in a qdisc_rate_table, to determine how
    long it will take to send a packet given its size.
  */
diff --git a/include/net/sock.h b/include/net/sock.h
index 73063c88a249..6ed2de43dc98 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -315,6 +315,8 @@  struct sk_filter;
   *	@sk_rcvtimeo: %SO_RCVTIMEO setting
   *	@sk_sndtimeo: %SO_SNDTIMEO setting
   *	@sk_txhash: computed flow hash for use on transmit
+  *	@sk_backpressure_status: Qdisc backpressure status
+  *	@sk_backpressure_node: linkage for Qdisc backpressure list
   *	@sk_txrehash: enable TX hash rethink
   *	@sk_filter: socket filtering instructions
   *	@sk_timer: sock cleanup timer
@@ -468,6 +470,8 @@  struct sock {
 	unsigned int		sk_gso_max_size;
 	gfp_t			sk_allocation;
 	__u32			sk_txhash;
+	u32			sk_backpressure_status; /* see enum sk_backpressure */
+	struct list_head	sk_backpressure_node;
 
 	/*
 	 * Because of non atomicity rules, all
@@ -548,6 +552,12 @@  enum sk_pacing {
 	SK_PACING_FQ		= 2,
 };
 
+enum sk_backpressure {
+	SK_UNTHROTTLED	= 0,
+	SK_OVERLIMIT	= 1,
+	SK_THROTTLED	= 2,
+};
+
 /* Pointer stored in sk_user_data might not be suitable for copying
  * when cloning the socket. For instance, it can point to a reference
  * counted object. sk_user_data bottom bit is set if pointer must not
@@ -2522,12 +2532,18 @@  static inline struct page_frag *sk_page_frag(struct sock *sk)
 
 bool sk_page_frag_refill(struct sock *sk, struct page_frag *pfrag);
 
+static inline bool sk_is_throttled(const struct sock *sk)
+{
+	return READ_ONCE(sk->sk_backpressure_status) == SK_THROTTLED;
+}
+
 /*
  *	Default write policy as shown to user space via poll/select/SIGIO
  */
 static inline bool sock_writeable(const struct sock *sk)
 {
-	return refcount_read(&sk->sk_wmem_alloc) < (READ_ONCE(sk->sk_sndbuf) >> 1);
+	return !sk_is_throttled(sk) &&
+	       refcount_read(&sk->sk_wmem_alloc) < (READ_ONCE(sk->sk_sndbuf) >> 1);
 }
 
 static inline gfp_t gfp_any(void)
diff --git a/net/core/dev.c b/net/core/dev.c
index c2d73595a7c3..7c3d136725b9 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -5013,6 +5013,7 @@  static __latent_entropy void net_tx_action(struct softirq_action *h)
 			if (!(q->flags & TCQ_F_NOLOCK)) {
 				root_lock = qdisc_lock(q);
 				spin_lock(root_lock);
+				qdisc_backpressure_unthrottle(q);
 			} else if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED,
 						     &q->state))) {
 				/* There is a synchronize_net() between
diff --git a/net/core/sock.c b/net/core/sock.c
index be20a1af20e5..7ed9d2bd991f 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -2034,6 +2034,7 @@  struct sock *sk_alloc(struct net *net, int family, gfp_t priority,
 
 		sock_net_set(sk, net);
 		refcount_set(&sk->sk_wmem_alloc, 1);
+		INIT_LIST_HEAD(&sk->sk_backpressure_node);
 
 		mem_cgroup_sk_alloc(sk);
 		cgroup_sk_alloc(&sk->sk_cgrp_data);
@@ -2589,7 +2590,8 @@  static long sock_wait_for_wmem(struct sock *sk, long timeo)
 			break;
 		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
-		if (refcount_read(&sk->sk_wmem_alloc) < READ_ONCE(sk->sk_sndbuf))
+		if (!sk_is_throttled(sk) &&
+		    refcount_read(&sk->sk_wmem_alloc) < READ_ONCE(sk->sk_sndbuf))
 			break;
 		if (sk->sk_shutdown & SEND_SHUTDOWN)
 			break;
@@ -2624,7 +2626,7 @@  struct sk_buff *sock_alloc_send_pskb(struct sock *sk, unsigned long header_len,
 		if (sk->sk_shutdown & SEND_SHUTDOWN)
 			goto failure;
 
-		if (sk_wmem_alloc_get(sk) < READ_ONCE(sk->sk_sndbuf))
+		if (!sk_is_throttled(sk) && sk_wmem_alloc_get(sk) < READ_ONCE(sk->sk_sndbuf))
 			break;
 
 		sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
diff --git a/net/ipv4/tcp_ipv4.c b/net/ipv4/tcp_ipv4.c
index 918816ec5dd4..6e905995bfa2 100644
--- a/net/ipv4/tcp_ipv4.c
+++ b/net/ipv4/tcp_ipv4.c
@@ -3006,9 +3006,14 @@  void tcp4_proc_exit(void)
  */
 bool tcp_stream_memory_free(const struct sock *sk, int wake)
 {
-	const struct tcp_sock *tp = tcp_sk(sk);
-	u32 notsent_bytes = READ_ONCE(tp->write_seq) -
-			    READ_ONCE(tp->snd_nxt);
+	const struct tcp_sock *tp;
+	u32 notsent_bytes;
+
+	if (sk_is_throttled(sk))
+		return false;
+
+	tp = tcp_sk(sk);
+	notsent_bytes = READ_ONCE(tp->write_seq) - READ_ONCE(tp->snd_nxt);
 
 	return (notsent_bytes << wake) < tcp_notsent_lowat(tp);
 }
diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c
index dba0b3e24af5..9ab314b874a7 100644
--- a/net/sched/sch_generic.c
+++ b/net/sched/sch_generic.c
@@ -674,6 +674,7 @@  struct Qdisc noop_qdisc = {
 		.qlen = 0,
 		.lock = __SPIN_LOCK_UNLOCKED(noop_qdisc.skb_bad_txq.lock),
 	},
+	.backpressure_list =	LIST_HEAD_INIT(noop_qdisc.backpressure_list),
 };
 EXPORT_SYMBOL(noop_qdisc);
 
@@ -947,6 +948,7 @@  struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue,
 	qdisc_skb_head_init(&sch->q);
 	gnet_stats_basic_sync_init(&sch->bstats);
 	spin_lock_init(&sch->q.lock);
+	INIT_LIST_HEAD(&sch->backpressure_list);
 
 	if (ops->static_flags & TCQ_F_CPUSTATS) {
 		sch->cpu_bstats =
@@ -1025,6 +1027,8 @@  void qdisc_reset(struct Qdisc *qdisc)
 	if (ops->reset)
 		ops->reset(qdisc);
 
+	qdisc_backpressure_unthrottle(qdisc);
+
 	__skb_queue_purge(&qdisc->gso_skb);
 	__skb_queue_purge(&qdisc->skb_bad_txq);