
[bpf-next,v2,4/4] tcp_bpf: improve ingress redirection performance with message corking

Message ID 20250306220205.53753-5-xiyou.wangcong@gmail.com (mailing list archive)
State New
Delegated to: BPF
Series tcp_bpf: improve ingress redirection performance with message corking

Checks

Context Check Description
netdev/series_format success Posting correctly formatted
netdev/tree_selection success Clearly marked for bpf-next
netdev/ynl success Generated files up to date; no warnings/errors; no diff in generated;
netdev/fixes_present success Fixes tag not required for -next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 0 this patch: 0
netdev/build_tools success Errors and warnings before: 26 (+0) this patch: 26 (+0)
netdev/cc_maintainers warning 7 maintainers not CCed: pabeni@redhat.com ncardwell@google.com kuniyu@amazon.com dsahern@kernel.org kuba@kernel.org horms@kernel.org edumazet@google.com
netdev/build_clang success Errors and warnings before: 0 this patch: 0
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 36 this patch: 36
netdev/checkpatch warning WARNING: line length of 81 exceeds 80 columns
netdev/build_clang_rust success No Rust files in patch. Skipping build
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0
bpf/vmtest-bpf-next-PR success PR summary
bpf/vmtest-bpf-next-VM_Test-0 success Logs for Lint
bpf/vmtest-bpf-next-VM_Test-1 success Logs for ShellCheck
bpf/vmtest-bpf-next-VM_Test-2 success Logs for Unittests
bpf/vmtest-bpf-next-VM_Test-3 success Logs for Validate matrix.py
bpf/vmtest-bpf-next-VM_Test-4 success Logs for aarch64-gcc / GCC BPF
bpf/vmtest-bpf-next-VM_Test-5 success Logs for aarch64-gcc / build / build for aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-7 success Logs for aarch64-gcc / test (test_maps, false, 360) / test_maps on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-6 success Logs for aarch64-gcc / build-release
bpf/vmtest-bpf-next-VM_Test-10 success Logs for aarch64-gcc / test (test_verifier, false, 360) / test_verifier on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-11 success Logs for aarch64-gcc / veristat-kernel
bpf/vmtest-bpf-next-VM_Test-12 success Logs for aarch64-gcc / veristat-meta
bpf/vmtest-bpf-next-VM_Test-13 success Logs for s390x-gcc / GCC BPF
bpf/vmtest-bpf-next-VM_Test-14 success Logs for s390x-gcc / build / build for s390x with gcc
bpf/vmtest-bpf-next-VM_Test-15 success Logs for s390x-gcc / build-release
bpf/vmtest-bpf-next-VM_Test-18 success Logs for s390x-gcc / test (test_verifier, false, 360) / test_verifier on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-19 success Logs for s390x-gcc / veristat-kernel
bpf/vmtest-bpf-next-VM_Test-20 success Logs for s390x-gcc / veristat-meta
bpf/vmtest-bpf-next-VM_Test-21 success Logs for set-matrix
bpf/vmtest-bpf-next-VM_Test-22 success Logs for x86_64-gcc / GCC BPF / GCC BPF
bpf/vmtest-bpf-next-VM_Test-23 success Logs for x86_64-gcc / build / build for x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-24 success Logs for x86_64-gcc / build-release
bpf/vmtest-bpf-next-VM_Test-25 success Logs for x86_64-gcc / test (test_maps, false, 360) / test_maps on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-28 success Logs for x86_64-gcc / test (test_progs_no_alu32_parallel, true, 30) / test_progs_no_alu32_parallel on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-29 success Logs for x86_64-gcc / test (test_progs_parallel, true, 30) / test_progs_parallel on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-30 success Logs for x86_64-gcc / test (test_verifier, false, 360) / test_verifier on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-31 success Logs for x86_64-gcc / veristat-kernel / x86_64-gcc veristat_kernel
bpf/vmtest-bpf-next-VM_Test-32 success Logs for x86_64-gcc / veristat-meta / x86_64-gcc veristat_meta
bpf/vmtest-bpf-next-VM_Test-33 success Logs for x86_64-llvm-17 / GCC BPF / GCC BPF
bpf/vmtest-bpf-next-VM_Test-34 success Logs for x86_64-llvm-17 / build / build for x86_64 with llvm-17
bpf/vmtest-bpf-next-VM_Test-35 success Logs for x86_64-llvm-17 / build-release / build for x86_64 with llvm-17-O2
bpf/vmtest-bpf-next-VM_Test-40 success Logs for x86_64-llvm-17 / veristat-kernel
bpf/vmtest-bpf-next-VM_Test-41 success Logs for x86_64-llvm-17 / veristat-meta
bpf/vmtest-bpf-next-VM_Test-42 success Logs for x86_64-llvm-18 / GCC BPF / GCC BPF
bpf/vmtest-bpf-next-VM_Test-43 success Logs for x86_64-llvm-18 / build / build for x86_64 with llvm-18
bpf/vmtest-bpf-next-VM_Test-44 success Logs for x86_64-llvm-18 / build-release / build for x86_64 with llvm-18-O2
bpf/vmtest-bpf-next-VM_Test-50 success Logs for x86_64-llvm-18 / veristat-kernel
bpf/vmtest-bpf-next-VM_Test-51 success Logs for x86_64-llvm-18 / veristat-meta
bpf/vmtest-bpf-next-VM_Test-26 success Logs for x86_64-gcc / test (test_progs, false, 360) / test_progs on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-27 success Logs for x86_64-gcc / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-38 success Logs for x86_64-llvm-17 / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on x86_64 with llvm-17
bpf/vmtest-bpf-next-VM_Test-39 success Logs for x86_64-llvm-17 / test (test_verifier, false, 360) / test_verifier on x86_64 with llvm-17
bpf/vmtest-bpf-next-VM_Test-8 success Logs for aarch64-gcc / test (test_progs, false, 360) / test_progs on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-9 success Logs for aarch64-gcc / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-36 success Logs for x86_64-llvm-17 / test (test_maps, false, 360) / test_maps on x86_64 with llvm-17
bpf/vmtest-bpf-next-VM_Test-37 success Logs for x86_64-llvm-17 / test (test_progs, false, 360) / test_progs on x86_64 with llvm-17
bpf/vmtest-bpf-next-VM_Test-45 success Logs for x86_64-llvm-18 / test (test_maps, false, 360) / test_maps on x86_64 with llvm-18
bpf/vmtest-bpf-next-VM_Test-46 success Logs for x86_64-llvm-18 / test (test_progs, false, 360) / test_progs on x86_64 with llvm-18
bpf/vmtest-bpf-next-VM_Test-47 success Logs for x86_64-llvm-18 / test (test_progs_cpuv4, false, 360) / test_progs_cpuv4 on x86_64 with llvm-18
bpf/vmtest-bpf-next-VM_Test-48 success Logs for x86_64-llvm-18 / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on x86_64 with llvm-18
bpf/vmtest-bpf-next-VM_Test-49 success Logs for x86_64-llvm-18 / test (test_verifier, false, 360) / test_verifier on x86_64 with llvm-18
bpf/vmtest-bpf-next-VM_Test-16 success Logs for s390x-gcc / test (test_progs, false, 360) / test_progs on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-17 success Logs for s390x-gcc / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on s390x with gcc

Commit Message

Cong Wang March 6, 2025, 10:02 p.m. UTC
From: Zijian Zhang <zijianzhang@bytedance.com>

The TCP_BPF ingress redirection path currently lacks the message corking
mechanism found in standard TCP. This causes the sender to wake up the
receiver for every message, even when messages are small, resulting in
reduced throughput compared to regular TCP in certain scenarios.

This change introduces a kernel worker-based intermediate layer to provide
automatic message corking for TCP_BPF. While this adds a slight latency
overhead, it significantly improves overall throughput by reducing
unnecessary wake-ups and sock lock contention.

Reviewed-by: Amery Hung <amery.hung@bytedance.com>
Co-developed-by: Cong Wang <cong.wang@bytedance.com>
Signed-off-by: Cong Wang <cong.wang@bytedance.com>
Signed-off-by: Zijian Zhang <zijianzhang@bytedance.com>
---
 include/linux/skmsg.h |  19 ++++
 net/core/skmsg.c      | 139 ++++++++++++++++++++++++++++-
 net/ipv4/tcp_bpf.c    | 197 ++++++++++++++++++++++++++++++++++++++++--
 3 files changed, 347 insertions(+), 8 deletions(-)
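
For context, the ingress redirection path that this series speeds up is
typically driven by an sk_msg verdict program redirecting into a sockmap with
BPF_F_INGRESS. A minimal sketch of such a program is shown below; the map
layout, key choice and program name are illustrative assumptions, not part of
this patch:

#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

struct {
	__uint(type, BPF_MAP_TYPE_SOCKMAP);
	__uint(max_entries, 2);
	__type(key, __u32);
	__type(value, __u64);
} sock_map SEC(".maps");

SEC("sk_msg")
int ingress_redir(struct sk_msg_md *msg)
{
	/* Redirect every sendmsg() into the ingress queue of the socket
	 * stored at key 0; before this series, each redirect woke up the
	 * receiver, regardless of message size.
	 */
	return bpf_msg_redirect_map(msg, &sock_map, 0, BPF_F_INGRESS);
}

char _license[] SEC("license") = "GPL";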

Comments

John Fastabend March 11, 2025, 8:54 p.m. UTC | #1
On 2025-03-06 14:02:05, Cong Wang wrote:
> From: Zijian Zhang <zijianzhang@bytedance.com>
> 
> The TCP_BPF ingress redirection path currently lacks the message corking
> mechanism found in standard TCP. This causes the sender to wake up the
> receiver for every message, even when messages are small, resulting in
> reduced throughput compared to regular TCP in certain scenarios.

Agreed this is annoying.

> 
> This change introduces a kernel worker-based intermediate layer to provide
> automatic message corking for TCP_BPF. While this adds a slight latency
> overhead, it significantly improves overall throughput by reducing
> unnecessary wake-ups and sock lock contention.

Great. Couple questions below.

> 
> Reviewed-by: Amery Hung <amery.hung@bytedance.com>
> Co-developed-by: Cong Wang <cong.wang@bytedance.com>
> Signed-off-by: Cong Wang <cong.wang@bytedance.com>
> Signed-off-by: Zijian Zhang <zijianzhang@bytedance.com>
> ---
>  include/linux/skmsg.h |  19 ++++
>  net/core/skmsg.c      | 139 ++++++++++++++++++++++++++++-
>  net/ipv4/tcp_bpf.c    | 197 ++++++++++++++++++++++++++++++++++++++++--
>  3 files changed, 347 insertions(+), 8 deletions(-)
> 
> diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h
> index 7620f170c4b1..2531428168ad 100644
> --- a/include/linux/skmsg.h
> +++ b/include/linux/skmsg.h
> @@ -15,6 +15,8 @@
>  
>  #define MAX_MSG_FRAGS			MAX_SKB_FRAGS
>  #define NR_MSG_FRAG_IDS			(MAX_MSG_FRAGS + 1)
> +/* GSO size for TCP BPF backlog processing */
> +#define TCP_BPF_GSO_SIZE		65536
>  
>  enum __sk_action {
>  	__SK_DROP = 0,
> @@ -85,8 +87,10 @@ struct sk_psock {
>  	struct sock			*sk_redir;
>  	u32				apply_bytes;
>  	u32				cork_bytes;
> +	u32				backlog_since_notify;
>  	u8				eval;
>  	u8 				redir_ingress : 1; /* undefined if sk_redir is null */
> +	u8				backlog_work_delayed : 1;
>  	struct sk_msg			*cork;
>  	struct sk_psock_progs		progs;
>  #if IS_ENABLED(CONFIG_BPF_STREAM_PARSER)
> @@ -97,6 +101,9 @@ struct sk_psock {
>  	struct sk_buff_head		ingress_skb;
>  	struct list_head		ingress_msg;
>  	spinlock_t			ingress_lock;
> +	struct list_head		backlog_msg;
> +	/* spin_lock for backlog_msg and backlog_since_notify */
> +	spinlock_t			backlog_msg_lock;
>  	unsigned long			state;
>  	struct list_head		link;
>  	spinlock_t			link_lock;
> @@ -117,11 +124,13 @@ struct sk_psock {
>  	struct mutex			work_mutex;
>  	struct sk_psock_work_state	work_state;
>  	struct delayed_work		work;
> +	struct delayed_work		backlog_work;
>  	struct sock			*sk_pair;
>  	struct rcu_work			rwork;
>  };

[...]

> +static int tcp_bpf_ingress_backlog(struct sock *sk, struct sock *sk_redir,
> +				   struct sk_msg *msg, u32 apply_bytes)
> +{
> +	bool ingress_msg_empty = false;
> +	bool apply = apply_bytes;
> +	struct sk_psock *psock;
> +	struct sk_msg *tmp;
> +	u32 tot_size = 0;
> +	int ret = 0;
> +	u8 nonagle;
> +
> +	psock = sk_psock_get(sk_redir);
> +	if (unlikely(!psock))
> +		return -EPIPE;
> +
> +	spin_lock(&psock->backlog_msg_lock);
> +	/* If possible, coalesce the curr sk_msg to the last sk_msg from the
> +	 * psock->backlog_msg.
> +	 */
> +	if (!list_empty(&psock->backlog_msg)) {
> +		struct sk_msg *last;
> +
> +		last = list_last_entry(&psock->backlog_msg, struct sk_msg, list);
> +		if (last->sk == sk) {
> +			int i = tcp_bpf_coalesce_msg(last, msg, &apply_bytes,
> +						     &tot_size);
> +
> +			if (i == msg->sg.end || (apply && !apply_bytes))
> +				goto out_unlock;
> +		}
> +	}
> +
> +	/* Otherwise, allocate a new sk_msg and transfer the data from the
> +	 * passed in msg to it.
> +	 */
> +	tmp = sk_msg_alloc(GFP_ATOMIC);
> +	if (!tmp) {
> +		ret = -ENOMEM;
> +		spin_unlock(&psock->backlog_msg_lock);
> +		goto error;
> +	}
> +
> +	tmp->sk = sk;
> +	sock_hold(tmp->sk);
> +	tmp->sg.start = msg->sg.start;
> +	tcp_bpf_xfer_msg(tmp, msg, &apply_bytes, &tot_size);
> +
> +	ingress_msg_empty = list_empty(&psock->ingress_msg);
> +	list_add_tail(&tmp->list, &psock->backlog_msg);
> +
> +out_unlock:
> +	spin_unlock(&psock->backlog_msg_lock);
> +	sk_wmem_queued_add(sk, tot_size);
> +
> +	/* At this point, the data has been handled well. If one of the
> +	 * following conditions is met, we can notify the peer socket in
> +	 * the context of this system call immediately.
> +	 * 1. If the write buffer has been used up;
> +	 * 2. Or, the message size is larger than TCP_BPF_GSO_SIZE;
> +	 * 3. Or, the ingress queue was empty;
> +	 * 4. Or, the tcp socket is set to no_delay.
> +	 * Otherwise, kick off the backlog work so that we can have some
> +	 * time to wait for any incoming messages before sending a
> +	 * notification to the peer socket.
> +	 */

I think this could also be used to get bpf_msg_cork_bytes working
directly in the receive path. This also means we can avoid using
strparser in the receive path. The strparser case has noticeable
overhead for us, significant enough that we don't use it.
Not that we need to do it all in one patch set.
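
(For reference, bpf_msg_cork_bytes() works on the sendmsg side today roughly
like the sketch below, holding the verdict until enough bytes have queued up;
the 64KB threshold and program name are just arbitrary examples. The idea
would be to get a similar effect on the receive side without strparser.)

#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

SEC("sk_msg")
int cork_verdict(struct sk_msg_md *msg)
{
	/* Delay the verdict until at least 64KB are queued, then let the
	 * whole batch through in one go.
	 */
	if (msg->size < 65536)
		bpf_msg_cork_bytes(msg, 65536);

	return SK_PASS;
}

char _license[] SEC("license") = "GPL";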

> +	nonagle = tcp_sk(sk)->nonagle;
> +	if (!sk_stream_memory_free(sk) ||
> +	    tot_size >= TCP_BPF_GSO_SIZE || ingress_msg_empty ||
> +	    (!(nonagle & TCP_NAGLE_CORK) && (nonagle & TCP_NAGLE_OFF))) {
> +		release_sock(sk);
> +		psock->backlog_work_delayed = false;
> +		sk_psock_backlog_msg(psock);
> +		lock_sock(sk);
> +	} else {
> +		sk_psock_run_backlog_work(psock, false);
> +	}
> +
> +error:
> +	sk_psock_put(sk_redir, psock);
> +	return ret;
> +}
> +
>  static int tcp_bpf_send_verdict(struct sock *sk, struct sk_psock *psock,
>  				struct sk_msg *msg, int *copied, int flags)
>  {
> @@ -442,18 +619,24 @@ static int tcp_bpf_send_verdict(struct sock *sk, struct sk_psock *psock,
>  			cork = true;
>  			psock->cork = NULL;
>  		}
> -		release_sock(sk);
>  
> -		origsize = msg->sg.size;
> -		ret = tcp_bpf_sendmsg_redir(sk_redir, redir_ingress,
> -					    msg, tosend, flags);

The only sticky bit here that is blocking folding this entire tcp_bpf_sendmsg_redir
logic out is the tls user, right?

> -		sent = origsize - msg->sg.size;
> +		if (redir_ingress) {
> +			ret = tcp_bpf_ingress_backlog(sk, sk_redir, msg, tosend);
> +		} else {
> +			release_sock(sk);
> +
> +			origsize = msg->sg.size;
> +			ret = tcp_bpf_sendmsg_redir(sk_redir, redir_ingress,
> +						    msg, tosend, flags);

Now sendmsg redir is really only for egress here, so we can skip handling
ingress here. And the entire existing sk_psock_backlog work queue, because
it's handled by tcp_bpf_ingress_backlog?

> +			sent = origsize - msg->sg.size;
> +
> +			lock_sock(sk);
> +			sk_mem_uncharge(sk, sent);
> +		}

I like the direction, but are there any blockers to getting this out of TLS as
well? I'm happy to do it if needed; I would prefer not to try to
support both styles at the same time.
Zijian Zhang March 12, 2025, 6:20 p.m. UTC | #2
On 3/11/25 1:54 PM, John Fastabend wrote:
> On 2025-03-06 14:02:05, Cong Wang wrote:
>> From: Zijian Zhang <zijianzhang@bytedance.com>
[...]
>> +static int tcp_bpf_ingress_backlog(struct sock *sk, struct sock *sk_redir,
>> +				   struct sk_msg *msg, u32 apply_bytes)
>> +{
>> +	bool ingress_msg_empty = false;
>> +	bool apply = apply_bytes;
>> +	struct sk_psock *psock;
>> +	struct sk_msg *tmp;
>> +	u32 tot_size = 0;
>> +	int ret = 0;
>> +	u8 nonagle;
>> +
>> +	psock = sk_psock_get(sk_redir);
>> +	if (unlikely(!psock))
>> +		return -EPIPE;
>> +
>> +	spin_lock(&psock->backlog_msg_lock);
>> +	/* If possible, coalesce the curr sk_msg to the last sk_msg from the
>> +	 * psock->backlog_msg.
>> +	 */
>> +	if (!list_empty(&psock->backlog_msg)) {
>> +		struct sk_msg *last;
>> +
>> +		last = list_last_entry(&psock->backlog_msg, struct sk_msg, list);
>> +		if (last->sk == sk) {
>> +			int i = tcp_bpf_coalesce_msg(last, msg, &apply_bytes,
>> +						     &tot_size);
>> +
>> +			if (i == msg->sg.end || (apply && !apply_bytes))
>> +				goto out_unlock;
>> +		}
>> +	}
>> +
>> +	/* Otherwise, allocate a new sk_msg and transfer the data from the
>> +	 * passed in msg to it.
>> +	 */
>> +	tmp = sk_msg_alloc(GFP_ATOMIC);
>> +	if (!tmp) {
>> +		ret = -ENOMEM;
>> +		spin_unlock(&psock->backlog_msg_lock);
>> +		goto error;
>> +	}
>> +
>> +	tmp->sk = sk;
>> +	sock_hold(tmp->sk);
>> +	tmp->sg.start = msg->sg.start;
>> +	tcp_bpf_xfer_msg(tmp, msg, &apply_bytes, &tot_size);
>> +
>> +	ingress_msg_empty = list_empty(&psock->ingress_msg);
>> +	list_add_tail(&tmp->list, &psock->backlog_msg);
>> +
>> +out_unlock:
>> +	spin_unlock(&psock->backlog_msg_lock);
>> +	sk_wmem_queued_add(sk, tot_size);
>> +
>> +	/* At this point, the data has been handled well. If one of the
>> +	 * following conditions is met, we can notify the peer socket in
>> +	 * the context of this system call immediately.
>> +	 * 1. If the write buffer has been used up;
>> +	 * 2. Or, the message size is larger than TCP_BPF_GSO_SIZE;
>> +	 * 3. Or, the ingress queue was empty;
>> +	 * 4. Or, the tcp socket is set to no_delay.
>> +	 * Otherwise, kick off the backlog work so that we can have some
>> +	 * time to wait for any incoming messages before sending a
>> +	 * notification to the peer socket.
>> +	 */
> 
> I think this could also be used to get bpf_msg_cork_bytes working
> directly in the receive path. This also means we can avoid using
> strparser in the receive path. The strparser case has noticeable
> overhead for us, significant enough that we don't use it.
> Not that we need to do it all in one patch set.
> 

Sounds promising!

>> +	nonagle = tcp_sk(sk)->nonagle;
>> +	if (!sk_stream_memory_free(sk) ||
>> +	    tot_size >= TCP_BPF_GSO_SIZE || ingress_msg_empty ||
>> +	    (!(nonagle & TCP_NAGLE_CORK) && (nonagle & TCP_NAGLE_OFF))) {
>> +		release_sock(sk);
>> +		psock->backlog_work_delayed = false;
>> +		sk_psock_backlog_msg(psock);
>> +		lock_sock(sk);
>> +	} else {
>> +		sk_psock_run_backlog_work(psock, false);
>> +	}
>> +
>> +error:
>> +	sk_psock_put(sk_redir, psock);
>> +	return ret;
>> +}
>> +
>>   static int tcp_bpf_send_verdict(struct sock *sk, struct sk_psock *psock,
>>   				struct sk_msg *msg, int *copied, int flags)
>>   {
>> @@ -442,18 +619,24 @@ static int tcp_bpf_send_verdict(struct sock *sk, struct sk_psock *psock,
>>   			cork = true;
>>   			psock->cork = NULL;
>>   		}
>> -		release_sock(sk);
>>   
>> -		origsize = msg->sg.size;
>> -		ret = tcp_bpf_sendmsg_redir(sk_redir, redir_ingress,
>> -					    msg, tosend, flags);
> 
> The only sticky bit here that is blocking folding this entire tcp_bpf_sendmsg_redir
> logic out is the tls user, right?
> 

Right, tls also uses tcp_bpf_sendmsg_redir.

>> -		sent = origsize - msg->sg.size;
>> +		if (redir_ingress) {
>> +			ret = tcp_bpf_ingress_backlog(sk, sk_redir, msg, tosend);
>> +		} else {
>> +			release_sock(sk);
>> +
>> +			origsize = msg->sg.size;
>> +			ret = tcp_bpf_sendmsg_redir(sk_redir, redir_ingress,
>> +						    msg, tosend, flags);
> 
> Now sendmsg redir is really only for egress here, so we can skip handling
> ingress here. And the entire existing sk_psock_backlog work queue, because
> it's handled by tcp_bpf_ingress_backlog?
> 

Agreed, tcp_bpf_sendmsg_redir here is only for egress.

From my understanding, as for sk_psock_backlog, it handles the ingress
skbs in psock->ingress_skb.
[skb RX->Redirect->sk_msg(skb backed-up) RX]

On the other hand, tcp_bpf_ingress_backlog mainly focuses on moving the
corked sk_msgs from the sender socket's "backlog_msg" queue to the receiver
socket's psock->ingress_msg. These sk_msgs are redirected using __SK_REDIRECT
by tcp_bpf_sendmsg; in other words, their sk_msg->skb should be NULL.
[sk_msg TX->Redirect->sk_msg(skb is NULL) RX]

IMHO, they are mostly mutually independent.

>> +			sent = origsize - msg->sg.size;
>> +
>> +			lock_sock(sk);
>> +			sk_mem_uncharge(sk, sent);
>> +		}
> 
> I like the direction, but are there any blockers to getting this out of TLS as
> well? I'm happy to do it if needed; I would prefer not to try to
> support both styles at the same time.

I haven't looked into TLS mainly because I'm not very familiar with it. 
If you're interested, it would be great if you could take a look in the
future :)

Patch

diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h
index 7620f170c4b1..2531428168ad 100644
--- a/include/linux/skmsg.h
+++ b/include/linux/skmsg.h
@@ -15,6 +15,8 @@ 
 
 #define MAX_MSG_FRAGS			MAX_SKB_FRAGS
 #define NR_MSG_FRAG_IDS			(MAX_MSG_FRAGS + 1)
+/* GSO size for TCP BPF backlog processing */
+#define TCP_BPF_GSO_SIZE		65536
 
 enum __sk_action {
 	__SK_DROP = 0,
@@ -85,8 +87,10 @@  struct sk_psock {
 	struct sock			*sk_redir;
 	u32				apply_bytes;
 	u32				cork_bytes;
+	u32				backlog_since_notify;
 	u8				eval;
 	u8 				redir_ingress : 1; /* undefined if sk_redir is null */
+	u8				backlog_work_delayed : 1;
 	struct sk_msg			*cork;
 	struct sk_psock_progs		progs;
 #if IS_ENABLED(CONFIG_BPF_STREAM_PARSER)
@@ -97,6 +101,9 @@  struct sk_psock {
 	struct sk_buff_head		ingress_skb;
 	struct list_head		ingress_msg;
 	spinlock_t			ingress_lock;
+	struct list_head		backlog_msg;
+	/* spin_lock for backlog_msg and backlog_since_notify */
+	spinlock_t			backlog_msg_lock;
 	unsigned long			state;
 	struct list_head		link;
 	spinlock_t			link_lock;
@@ -117,11 +124,13 @@  struct sk_psock {
 	struct mutex			work_mutex;
 	struct sk_psock_work_state	work_state;
 	struct delayed_work		work;
+	struct delayed_work		backlog_work;
 	struct sock			*sk_pair;
 	struct rcu_work			rwork;
 };
 
 struct sk_msg *sk_msg_alloc(gfp_t gfp);
+bool sk_msg_try_coalesce_ok(struct sk_msg *msg, int elem_first_coalesce);
 int sk_msg_expand(struct sock *sk, struct sk_msg *msg, int len,
 		  int elem_first_coalesce);
 int sk_msg_clone(struct sock *sk, struct sk_msg *dst, struct sk_msg *src,
@@ -396,9 +405,19 @@  static inline void sk_psock_report_error(struct sk_psock *psock, int err)
 	sk_error_report(sk);
 }
 
+void sk_psock_backlog_msg(struct sk_psock *psock);
 struct sk_psock *sk_psock_init(struct sock *sk, int node);
 void sk_psock_stop(struct sk_psock *psock);
 
+static inline void sk_psock_run_backlog_work(struct sk_psock *psock,
+					     bool delayed)
+{
+	if (!sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
+		return;
+	psock->backlog_work_delayed = delayed;
+	schedule_delayed_work(&psock->backlog_work, delayed ? 1 : 0);
+}
+
 #if IS_ENABLED(CONFIG_BPF_STREAM_PARSER)
 int sk_psock_init_strp(struct sock *sk, struct sk_psock *psock);
 void sk_psock_start_strp(struct sock *sk, struct sk_psock *psock);
diff --git a/net/core/skmsg.c b/net/core/skmsg.c
index 25c53c8c9857..32507163fd2d 100644
--- a/net/core/skmsg.c
+++ b/net/core/skmsg.c
@@ -12,7 +12,7 @@ 
 
 struct kmem_cache *sk_msg_cachep;
 
-static bool sk_msg_try_coalesce_ok(struct sk_msg *msg, int elem_first_coalesce)
+bool sk_msg_try_coalesce_ok(struct sk_msg *msg, int elem_first_coalesce)
 {
 	if (msg->sg.end > msg->sg.start &&
 	    elem_first_coalesce < msg->sg.end)
@@ -707,6 +707,118 @@  static void sk_psock_backlog(struct work_struct *work)
 	mutex_unlock(&psock->work_mutex);
 }
 
+static bool backlog_notify(struct sk_psock *psock, bool m_sched_failed,
+			   bool ingress_empty)
+{
+	/* Notify if:
+	 * 1. We have corked enough bytes
+	 * 2. We have already delayed notification
+	 * 3. Memory allocation failed
+	 * 4. Ingress queue was empty and we're about to add data
+	 */
+	return psock->backlog_since_notify >= TCP_BPF_GSO_SIZE ||
+	       psock->backlog_work_delayed ||
+	       m_sched_failed ||
+	       ingress_empty;
+}
+
+static bool backlog_xfer_to_local(struct sk_psock *psock, struct sock *sk_from,
+				  struct list_head *local_head, u32 *tot_size)
+{
+	struct sock *sk = psock->sk;
+	struct sk_msg *msg, *tmp;
+	u32 size = 0;
+
+	list_for_each_entry_safe(msg, tmp, &psock->backlog_msg, list) {
+		if (msg->sk != sk_from)
+			break;
+
+		if (!__sk_rmem_schedule(sk, msg->sg.size, false))
+			return true;
+
+		list_move_tail(&msg->list, local_head);
+		sk_wmem_queued_add(msg->sk, -msg->sg.size);
+		sock_put(msg->sk);
+		msg->sk = NULL;
+		psock->backlog_since_notify += msg->sg.size;
+		size += msg->sg.size;
+	}
+
+	*tot_size = size;
+	return false;
+}
+
+/* This function handles the transfer of backlogged messages from the sender
+ * backlog queue to the ingress queue of the peer socket. Notification of data
+ * availability will be sent under some conditions.
+ */
+void sk_psock_backlog_msg(struct sk_psock *psock)
+{
+	bool rmem_schedule_failed = false;
+	struct sock *sk_from = NULL;
+	struct sock *sk = psock->sk;
+	LIST_HEAD(local_head);
+	struct sk_msg *msg;
+	bool should_notify;
+	u32 tot_size = 0;
+
+	if (!sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
+		return;
+
+	lock_sock(sk);
+	spin_lock(&psock->backlog_msg_lock);
+
+	msg = list_first_entry_or_null(&psock->backlog_msg,
+				       struct sk_msg, list);
+	if (!msg) {
+		should_notify = !list_empty(&psock->ingress_msg);
+		spin_unlock(&psock->backlog_msg_lock);
+		goto notify;
+	}
+
+	sk_from = msg->sk;
+	sock_hold(sk_from);
+
+	rmem_schedule_failed = backlog_xfer_to_local(psock, sk_from,
+						     &local_head, &tot_size);
+	should_notify = backlog_notify(psock, rmem_schedule_failed,
+				       list_empty(&psock->ingress_msg));
+	spin_unlock(&psock->backlog_msg_lock);
+
+	spin_lock_bh(&psock->ingress_lock);
+	list_splice_tail_init(&local_head, &psock->ingress_msg);
+	spin_unlock_bh(&psock->ingress_lock);
+
+	atomic_add(tot_size, &sk->sk_rmem_alloc);
+	sk_mem_charge(sk, tot_size);
+
+notify:
+	if (should_notify) {
+		psock->backlog_since_notify = 0;
+		sk_psock_data_ready(sk, psock);
+		if (!list_empty(&psock->backlog_msg))
+			sk_psock_run_backlog_work(psock, rmem_schedule_failed);
+	} else {
+		sk_psock_run_backlog_work(psock, true);
+	}
+	release_sock(sk);
+
+	if (sk_from) {
+		bool slow = lock_sock_fast(sk_from);
+
+		sk_mem_uncharge(sk_from, tot_size);
+		unlock_sock_fast(sk_from, slow);
+		sock_put(sk_from);
+	}
+}
+
+static void sk_psock_backlog_msg_work(struct work_struct *work)
+{
+	struct delayed_work *dwork = to_delayed_work(work);
+
+	sk_psock_backlog_msg(container_of(dwork, struct sk_psock, backlog_work));
+}
+
 struct sk_psock *sk_psock_init(struct sock *sk, int node)
 {
 	struct sk_psock *psock;
@@ -744,8 +856,11 @@  struct sk_psock *sk_psock_init(struct sock *sk, int node)
 
 	INIT_DELAYED_WORK(&psock->work, sk_psock_backlog);
 	mutex_init(&psock->work_mutex);
+	INIT_DELAYED_WORK(&psock->backlog_work, sk_psock_backlog_msg_work);
 	INIT_LIST_HEAD(&psock->ingress_msg);
 	spin_lock_init(&psock->ingress_lock);
+	INIT_LIST_HEAD(&psock->backlog_msg);
+	spin_lock_init(&psock->backlog_msg_lock);
 	skb_queue_head_init(&psock->ingress_skb);
 
 	sk_psock_set_state(psock, SK_PSOCK_TX_ENABLED);
@@ -799,6 +914,26 @@  static void __sk_psock_zap_ingress(struct sk_psock *psock)
 	__sk_psock_purge_ingress_msg(psock);
 }
 
+static void __sk_psock_purge_backlog_msg(struct sk_psock *psock)
+{
+	struct sk_msg *msg, *tmp;
+
+	spin_lock(&psock->backlog_msg_lock);
+	list_for_each_entry_safe(msg, tmp, &psock->backlog_msg, list) {
+		struct sock *sk_from = msg->sk;
+		bool slow;
+
+		list_del(&msg->list);
+		slow = lock_sock_fast(sk_from);
+		sk_wmem_queued_add(sk_from, -msg->sg.size);
+		sock_put(sk_from);
+		sk_msg_free(sk_from, msg);
+		unlock_sock_fast(sk_from, slow);
+		kfree_sk_msg(msg);
+	}
+	spin_unlock(&psock->backlog_msg_lock);
+}
+
 static void sk_psock_link_destroy(struct sk_psock *psock)
 {
 	struct sk_psock_link *link, *tmp;
@@ -828,7 +963,9 @@  static void sk_psock_destroy(struct work_struct *work)
 	sk_psock_done_strp(psock);
 
 	cancel_delayed_work_sync(&psock->work);
+	cancel_delayed_work_sync(&psock->backlog_work);
 	__sk_psock_zap_ingress(psock);
+	__sk_psock_purge_backlog_msg(psock);
 	mutex_destroy(&psock->work_mutex);
 
 	psock_progs_drop(&psock->progs);
diff --git a/net/ipv4/tcp_bpf.c b/net/ipv4/tcp_bpf.c
index f0ef41c951e2..82d437210f6f 100644
--- a/net/ipv4/tcp_bpf.c
+++ b/net/ipv4/tcp_bpf.c
@@ -381,6 +381,183 @@  static int tcp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 	return ret;
 }
 
+static int tcp_bpf_coalesce_msg(struct sk_msg *last, struct sk_msg *msg,
+				u32 *apply_bytes_ptr, u32 *tot_size)
+{
+	struct scatterlist *sge_from, *sge_to;
+	u32 apply_bytes = *apply_bytes_ptr;
+	bool apply = apply_bytes;
+	int i = msg->sg.start;
+	u32 size;
+
+	while (i != msg->sg.end) {
+		int last_sge_idx = last->sg.end;
+
+		if (sk_msg_full(last))
+			break;
+
+		sge_from = sk_msg_elem(msg, i);
+		sk_msg_iter_var_prev(last_sge_idx);
+		sge_to = &last->sg.data[last_sge_idx];
+
+		size = (apply && apply_bytes < sge_from->length) ?
+			apply_bytes : sge_from->length;
+		if (sk_msg_try_coalesce_ok(last, last_sge_idx) &&
+		    sg_page(sge_to) == sg_page(sge_from) &&
+		    sge_to->offset + sge_to->length == sge_from->offset) {
+			sge_to->length += size;
+		} else {
+			sge_to = &last->sg.data[last->sg.end];
+			sg_unmark_end(sge_to);
+			sg_set_page(sge_to, sg_page(sge_from), size,
+				    sge_from->offset);
+			get_page(sg_page(sge_to));
+			sk_msg_iter_next(last, end);
+		}
+
+		sge_from->length -= size;
+		sge_from->offset += size;
+
+		if (sge_from->length == 0) {
+			put_page(sg_page(sge_to));
+			sk_msg_iter_var_next(i);
+		}
+
+		msg->sg.size -= size;
+		last->sg.size += size;
+		*tot_size += size;
+
+		if (apply) {
+			apply_bytes -= size;
+			if (!apply_bytes)
+				break;
+		}
+	}
+
+	if (apply)
+		*apply_bytes_ptr = apply_bytes;
+
+	msg->sg.start = i;
+	return i;
+}
+
+static void tcp_bpf_xfer_msg(struct sk_msg *dst, struct sk_msg *msg,
+			     u32 *apply_bytes_ptr, u32 *tot_size)
+{
+	u32 apply_bytes = *apply_bytes_ptr;
+	bool apply = apply_bytes;
+	struct scatterlist *sge;
+	int i = msg->sg.start;
+	u32 size;
+
+	do {
+		sge = sk_msg_elem(msg, i);
+		size = (apply && apply_bytes < sge->length) ?
+			apply_bytes : sge->length;
+
+		sk_msg_xfer(dst, msg, i, size);
+		*tot_size += size;
+		if (sge->length)
+			get_page(sk_msg_page(dst, i));
+		sk_msg_iter_var_next(i);
+		dst->sg.end = i;
+		if (apply) {
+			apply_bytes -= size;
+			if (!apply_bytes) {
+				if (sge->length)
+					sk_msg_iter_var_prev(i);
+				break;
+			}
+		}
+	} while (i != msg->sg.end);
+
+	if (apply)
+		*apply_bytes_ptr = apply_bytes;
+	msg->sg.start = i;
+}
+
+static int tcp_bpf_ingress_backlog(struct sock *sk, struct sock *sk_redir,
+				   struct sk_msg *msg, u32 apply_bytes)
+{
+	bool ingress_msg_empty = false;
+	bool apply = apply_bytes;
+	struct sk_psock *psock;
+	struct sk_msg *tmp;
+	u32 tot_size = 0;
+	int ret = 0;
+	u8 nonagle;
+
+	psock = sk_psock_get(sk_redir);
+	if (unlikely(!psock))
+		return -EPIPE;
+
+	spin_lock(&psock->backlog_msg_lock);
+	/* If possible, coalesce the curr sk_msg to the last sk_msg from the
+	 * psock->backlog_msg.
+	 */
+	if (!list_empty(&psock->backlog_msg)) {
+		struct sk_msg *last;
+
+		last = list_last_entry(&psock->backlog_msg, struct sk_msg, list);
+		if (last->sk == sk) {
+			int i = tcp_bpf_coalesce_msg(last, msg, &apply_bytes,
+						     &tot_size);
+
+			if (i == msg->sg.end || (apply && !apply_bytes))
+				goto out_unlock;
+		}
+	}
+
+	/* Otherwise, allocate a new sk_msg and transfer the data from the
+	 * passed in msg to it.
+	 */
+	tmp = sk_msg_alloc(GFP_ATOMIC);
+	if (!tmp) {
+		ret = -ENOMEM;
+		spin_unlock(&psock->backlog_msg_lock);
+		goto error;
+	}
+
+	tmp->sk = sk;
+	sock_hold(tmp->sk);
+	tmp->sg.start = msg->sg.start;
+	tcp_bpf_xfer_msg(tmp, msg, &apply_bytes, &tot_size);
+
+	ingress_msg_empty = list_empty(&psock->ingress_msg);
+	list_add_tail(&tmp->list, &psock->backlog_msg);
+
+out_unlock:
+	spin_unlock(&psock->backlog_msg_lock);
+	sk_wmem_queued_add(sk, tot_size);
+
+	/* At this point, the data has been handled well. If one of the
+	 * following conditions is met, we can notify the peer socket in
+	 * the context of this system call immediately.
+	 * 1. If the write buffer has been used up;
+	 * 2. Or, the message size is larger than TCP_BPF_GSO_SIZE;
+	 * 3. Or, the ingress queue was empty;
+	 * 4. Or, the tcp socket is set to no_delay.
+	 * Otherwise, kick off the backlog work so that we can have some
+	 * time to wait for any incoming messages before sending a
+	 * notification to the peer socket.
+	 */
+	nonagle = tcp_sk(sk)->nonagle;
+	if (!sk_stream_memory_free(sk) ||
+	    tot_size >= TCP_BPF_GSO_SIZE || ingress_msg_empty ||
+	    (!(nonagle & TCP_NAGLE_CORK) && (nonagle & TCP_NAGLE_OFF))) {
+		release_sock(sk);
+		psock->backlog_work_delayed = false;
+		sk_psock_backlog_msg(psock);
+		lock_sock(sk);
+	} else {
+		sk_psock_run_backlog_work(psock, false);
+	}
+
+error:
+	sk_psock_put(sk_redir, psock);
+	return ret;
+}
+
 static int tcp_bpf_send_verdict(struct sock *sk, struct sk_psock *psock,
 				struct sk_msg *msg, int *copied, int flags)
 {
@@ -442,18 +619,24 @@  static int tcp_bpf_send_verdict(struct sock *sk, struct sk_psock *psock,
 			cork = true;
 			psock->cork = NULL;
 		}
-		release_sock(sk);
 
-		origsize = msg->sg.size;
-		ret = tcp_bpf_sendmsg_redir(sk_redir, redir_ingress,
-					    msg, tosend, flags);
-		sent = origsize - msg->sg.size;
+		if (redir_ingress) {
+			ret = tcp_bpf_ingress_backlog(sk, sk_redir, msg, tosend);
+		} else {
+			release_sock(sk);
+
+			origsize = msg->sg.size;
+			ret = tcp_bpf_sendmsg_redir(sk_redir, redir_ingress,
+						    msg, tosend, flags);
+			sent = origsize - msg->sg.size;
+
+			lock_sock(sk);
+			sk_mem_uncharge(sk, sent);
+		}
 
 		if (eval == __SK_REDIRECT)
 			sock_put(sk_redir);
 
-		lock_sock(sk);
-		sk_mem_uncharge(sk, sent);
 		if (unlikely(ret < 0)) {
 			int free = sk_msg_free(sk, msg);