diff mbox series

[bpf-next,v2,2/5] af_unix: add unix_stream_proto for sockmap

Message ID 20210729212402.1043211-3-jiang.wang@bytedance.com (mailing list archive)
State New
Headers show
Series sockmap: add sockmap support for unix stream socket | expand

Commit Message

Jiang Wang . July 29, 2021, 9:23 p.m. UTC
Previously, sockmap for AF_UNIX protocol only supports
dgram type. This patch add unix stream type support, which
is similar to unix_dgram_proto. To support sockmap, dgram
and stream cannot share the same unix_proto anymore, because
they have different implementations, such as unhash for stream
type (which will remove closed or disconnected sockets from the map),
so rename unix_proto to unix_dgram_proto and add a new
unix_stream_proto.

Also implement stream related sockmap functions.
And add dgram key words to those dgram specific functions.

Signed-off-by: Jiang Wang <jiang.wang@bytedance.com>
Reviewed-by: Cong Wang <cong.wang@bytedance.com>
---
 include/net/af_unix.h |  8 +++-
 net/core/sock_map.c   |  8 +++-
 net/unix/af_unix.c    | 76 ++++++++++++++++++++++++++++++-----
 net/unix/unix_bpf.c   | 93 +++++++++++++++++++++++++++++++++----------
 4 files changed, 149 insertions(+), 36 deletions(-)

Comments

Jakub Sitnicki July 30, 2021, 2:13 p.m. UTC | #1
On Thu, Jul 29, 2021 at 11:23 PM CEST, Jiang Wang wrote:
> Previously, sockmap for AF_UNIX protocol only supports
> dgram type. This patch add unix stream type support, which
> is similar to unix_dgram_proto. To support sockmap, dgram
> and stream cannot share the same unix_proto anymore, because
> they have different implementations, such as unhash for stream
> type (which will remove closed or disconnected sockets from the map),
> so rename unix_proto to unix_dgram_proto and add a new
> unix_stream_proto.
>
> Also implement stream related sockmap functions.
> And add dgram key words to those dgram specific functions.
>
> Signed-off-by: Jiang Wang <jiang.wang@bytedance.com>
> Reviewed-by: Cong Wang <cong.wang@bytedance.com>
> ---

It seems that with commit c63829182c37 ("af_unix: Implement
->psock_update_sk_prot()") we have enabled inserting dgram, stream, and
seqpacket UNIX sockets into sockmap.

After all, in ->map_update_elem we only check if
sk->sk_prot->psock_update_sk_prot is set (sock_map_sk_is_suitable).

Socket can be in listening, established or disconnected (TCP_CLOSE)
state, that is before bind+listen/connect, or after connect(AF_UNSPEC).

For connection-oriented socket types (stream, seqpacket) there's not
much you can do with disconnected sockets. I think we should limit the
allowed states to listening and established for UNIX domain, as we do
for TCP.

AFAIU we also seem to be already allowing redirect to connected stream
(and dgram, and seqpacket) UNIX sockets. sock_map_redirect_allowed()
checks only if a socket is in TCP_ESTABLISHED state for anything else
than TCP. Not sure what it leads to, though.

Is this change is also a fix in a sense?

[...]

> diff --git a/net/core/sock_map.c b/net/core/sock_map.c
> index ae5fa4338..42f50ea7a 100644
> --- a/net/core/sock_map.c
> +++ b/net/core/sock_map.c
> @@ -517,9 +517,15 @@ static bool sk_is_tcp(const struct sock *sk)
>  	       sk->sk_protocol == IPPROTO_TCP;
>  }
>
> +static bool sk_is_unix_stream(const struct sock *sk)
> +{
> +	return sk->sk_type == SOCK_STREAM &&
> +	       sk->sk_protocol == PF_UNIX;
> +}
> +
>  static bool sock_map_redirect_allowed(const struct sock *sk)
>  {
> -	if (sk_is_tcp(sk))
> +	if (sk_is_tcp(sk) || sk_is_unix_stream(sk))
>  		return sk->sk_state != TCP_LISTEN;
>  	else
>  		return sk->sk_state == TCP_ESTABLISHED;

For the moment we can have TCP_CLOSE stream and seqpacket sockets in a
sockmap . This means that the above allows redirecting to TCP_CLOSE
connection-oriented sockets. sock_map_sk_state_allowed() needs an update
for the this check to be effective. And we also need to account for
seqpacket.


> diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
> index 0ae3fc4c8..cfcd0d9e5 100644
> --- a/net/unix/af_unix.c
> +++ b/net/unix/af_unix.c
> @@ -791,17 +791,35 @@ static void unix_close(struct sock *sk, long timeout)
>  	 */
>  }
>
> -struct proto unix_proto = {
> -	.name			= "UNIX",
> +static void unix_unhash(struct sock *sk)
> +{
> +	/* Nothing to do here, unix socket does not need a ->unhash().
> +	 * This is merely for sockmap.
> +	 */
> +}
> +
> +struct proto unix_dgram_proto = {
> +	.name			= "UNIX-DGRAM",
>  	.owner			= THIS_MODULE,
>  	.obj_size		= sizeof(struct unix_sock),
>  	.close			= unix_close,
>  #ifdef CONFIG_BPF_SYSCALL
> -	.psock_update_sk_prot	= unix_bpf_update_proto,
> +	.psock_update_sk_prot	= unix_dgram_bpf_update_proto,
>  #endif
>  };
>
> -static struct sock *unix_create1(struct net *net, struct socket *sock, int kern)
> +struct proto unix_stream_proto = {
> +	.name			= "UNIX-STREAM",
> +	.owner			= THIS_MODULE,
> +	.obj_size		= sizeof(struct unix_sock),
> +	.close			= unix_close,
> +	.unhash			= unix_unhash,
> +#ifdef CONFIG_BPF_SYSCALL
> +	.psock_update_sk_prot	= unix_stream_bpf_update_proto,
> +#endif
> +};
> +
> +static struct sock *unix_create1(struct net *net, struct socket *sock, int kern, int type)
>  {
>  	struct sock *sk = NULL;
>  	struct unix_sock *u;
> @@ -810,7 +828,11 @@ static struct sock *unix_create1(struct net *net, struct socket *sock, int kern)
>  	if (atomic_long_read(&unix_nr_socks) > 2 * get_max_files())
>  		goto out;
>
> -	sk = sk_alloc(net, PF_UNIX, GFP_KERNEL, &unix_proto, kern);
> +	if (type == SOCK_STREAM)
> +		sk = sk_alloc(net, PF_UNIX, GFP_KERNEL, &unix_stream_proto, kern);
> +	else /*dgram and  seqpacket */
> +		sk = sk_alloc(net, PF_UNIX, GFP_KERNEL, &unix_dgram_proto, kern);
> +

Seqpacket also needs .unhash, right?

>  	if (!sk)
>  		goto out;
>
> @@ -872,7 +894,7 @@ static int unix_create(struct net *net, struct socket *sock, int protocol,
>  		return -ESOCKTNOSUPPORT;
>  	}
>
> -	return unix_create1(net, sock, kern) ? 0 : -ENOMEM;
> +	return unix_create1(net, sock, kern, sock->type) ? 0 : -ENOMEM;
>  }
>
>  static int unix_release(struct socket *sock)
> @@ -1286,7 +1308,7 @@ static int unix_stream_connect(struct socket *sock, struct sockaddr *uaddr,
>  	err = -ENOMEM;
>
>  	/* create new sock for complete connection */
> -	newsk = unix_create1(sock_net(sk), NULL, 0);
> +	newsk = unix_create1(sock_net(sk), NULL, 0, sock->type);
>  	if (newsk == NULL)
>  		goto out;
>
> @@ -2214,7 +2236,7 @@ static int unix_dgram_recvmsg(struct socket *sock, struct msghdr *msg, size_t si
>  	struct sock *sk = sock->sk;
>
>  #ifdef CONFIG_BPF_SYSCALL
> -	if (sk->sk_prot != &unix_proto)
> +	if (sk->sk_prot != &unix_dgram_proto)
>  		return sk->sk_prot->recvmsg(sk, msg, size, flags & MSG_DONTWAIT,
>  					    flags & ~MSG_DONTWAIT, NULL);
>  #endif
> @@ -2533,6 +2555,21 @@ static int unix_stream_read_actor(struct sk_buff *skb,
>  	return ret ?: chunk;
>  }
>
> +int __unix_stream_recvmsg(struct sock *sk, struct msghdr *msg,
> +			  size_t size, int flags)
> +{
> +	struct socket *sock = sk->sk_socket;

Nit: This intermediate variable might be not needed.

> +	struct unix_stream_read_state state = {
> +		.recv_actor = unix_stream_read_actor,
> +		.socket = sock,
> +		.msg = msg,
> +		.size = size,
> +		.flags = flags
> +	};
> +
> +	return unix_stream_read_generic(&state, true);
> +}
> +
>  static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg,
>  			       size_t size, int flags)
>  {
> @@ -2544,6 +2581,13 @@ static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg,
>  		.flags = flags
>  	};
>
> +	struct sock *sk = sock->sk;

This will generate a warning if CONFIG_BPF_SYSCALL is unset.

> +
> +#ifdef CONFIG_BPF_SYSCALL
> +	if (sk->sk_prot != &unix_stream_proto)
> +		return sk->sk_prot->recvmsg(sk, msg, size, flags & MSG_DONTWAIT,
> +					    flags & ~MSG_DONTWAIT, NULL);
> +#endif
>  	return unix_stream_read_generic(&state, true);
>  }
>
> @@ -2605,6 +2649,7 @@ static int unix_shutdown(struct socket *sock, int mode)
>
>  		int peer_mode = 0;
>
> +		other->sk_prot->unhash(other);
>  		if (mode&RCV_SHUTDOWN)
>  			peer_mode |= SEND_SHUTDOWN;
>  		if (mode&SEND_SHUTDOWN)
> @@ -2613,8 +2658,10 @@ static int unix_shutdown(struct socket *sock, int mode)
>  		other->sk_shutdown |= peer_mode;
>  		unix_state_unlock(other);
>  		other->sk_state_change(other);
> -		if (peer_mode == SHUTDOWN_MASK)
> +		if (peer_mode == SHUTDOWN_MASK) {
>  			sk_wake_async(other, SOCK_WAKE_WAITD, POLL_HUP);
> +			other->sk_state = TCP_CLOSE;
> +		}
>  		else if (peer_mode & RCV_SHUTDOWN)
>  			sk_wake_async(other, SOCK_WAKE_WAITD, POLL_IN);
>  	}
> @@ -2993,7 +3040,13 @@ static int __init af_unix_init(void)
>
>  	BUILD_BUG_ON(sizeof(struct unix_skb_parms) > sizeof_field(struct sk_buff, cb));
>
> -	rc = proto_register(&unix_proto, 1);
> +	rc = proto_register(&unix_dgram_proto, 1);
> +	if (rc != 0) {
> +		pr_crit("%s: Cannot create unix_sock SLAB cache!\n", __func__);
> +		goto out;
> +	}
> +
> +	rc = proto_register(&unix_stream_proto, 1);
>  	if (rc != 0) {
>  		pr_crit("%s: Cannot create unix_sock SLAB cache!\n", __func__);
>  		goto out;
> @@ -3009,7 +3062,8 @@ static int __init af_unix_init(void)
>  static void __exit af_unix_exit(void)
>  {
>  	sock_unregister(PF_UNIX);
> -	proto_unregister(&unix_proto);
> +	proto_unregister(&unix_dgram_proto);
> +	proto_unregister(&unix_stream_proto);
>  	unregister_pernet_subsys(&unix_net_ops);
>  }
>
> diff --git a/net/unix/unix_bpf.c b/net/unix/unix_bpf.c
> index db0cda29f..9067210d3 100644
> --- a/net/unix/unix_bpf.c
> +++ b/net/unix/unix_bpf.c
> @@ -38,9 +38,18 @@ static int unix_msg_wait_data(struct sock *sk, struct sk_psock *psock,
>  	return ret;
>  }
>
> -static int unix_dgram_bpf_recvmsg(struct sock *sk, struct msghdr *msg,
> -				  size_t len, int nonblock, int flags,
> -				  int *addr_len)
> +static int __unix_recvmsg(struct sock *sk, struct msghdr *msg,
> +			   size_t len, int flags)
> +{
> +	if (sk->sk_type == SOCK_DGRAM)
> +		return __unix_dgram_recvmsg(sk, msg, len, flags);
> +	else
> +		return __unix_stream_recvmsg(sk, msg, len, flags);
> +}

What about seqpacket? Looks like we should continue to delegate to
__unix_dgram_recvmsg, as this is what unix_seqpacket_recvmsg does.

> +
> +static int unix_bpf_recvmsg(struct sock *sk, struct msghdr *msg,
> +			    size_t len, int nonblock, int flags,
> +			    int *addr_len)
>  {
>  	struct unix_sock *u = unix_sk(sk);
>  	struct sk_psock *psock;

[...]
Cong Wang July 31, 2021, 6:23 p.m. UTC | #2
On Fri, Jul 30, 2021 at 7:14 AM Jakub Sitnicki <jakub@cloudflare.com> wrote:
>
> On Thu, Jul 29, 2021 at 11:23 PM CEST, Jiang Wang wrote:
> > Previously, sockmap for AF_UNIX protocol only supports
> > dgram type. This patch add unix stream type support, which
> > is similar to unix_dgram_proto. To support sockmap, dgram
> > and stream cannot share the same unix_proto anymore, because
> > they have different implementations, such as unhash for stream
> > type (which will remove closed or disconnected sockets from the map),
> > so rename unix_proto to unix_dgram_proto and add a new
> > unix_stream_proto.
> >
> > Also implement stream related sockmap functions.
> > And add dgram key words to those dgram specific functions.
> >
> > Signed-off-by: Jiang Wang <jiang.wang@bytedance.com>
> > Reviewed-by: Cong Wang <cong.wang@bytedance.com>
> > ---
>
> It seems that with commit c63829182c37 ("af_unix: Implement
> ->psock_update_sk_prot()") we have enabled inserting dgram, stream, and
> seqpacket UNIX sockets into sockmap.
>
> After all, in ->map_update_elem we only check if
> sk->sk_prot->psock_update_sk_prot is set (sock_map_sk_is_suitable).

Excellent point. I should check the sock type in unix_bpf_update_proto(),
and will send a fix.

>
> Socket can be in listening, established or disconnected (TCP_CLOSE)
> state, that is before bind+listen/connect, or after connect(AF_UNSPEC).
>
> For connection-oriented socket types (stream, seqpacket) there's not
> much you can do with disconnected sockets. I think we should limit the
> allowed states to listening and established for UNIX domain, as we do
> for TCP.

I think we should use ->unhash() to remove those connection-oriented
sockets, like TCP.

>
> AFAIU we also seem to be already allowing redirect to connected stream
> (and dgram, and seqpacket) UNIX sockets. sock_map_redirect_allowed()
> checks only if a socket is in TCP_ESTABLISHED state for anything else
> than TCP. Not sure what it leads to, though.

The goal is to keep all stream sockets like TCP, which only allows
established ones to stay in sockmap. For dgram, any socket state is
allowed to add to map but only established ones are allowed to redirect.

BTW, we do not have any intention to support Unix seqpacket socket
or any seqpacket.

Thanks.
diff mbox series

Patch

diff --git a/include/net/af_unix.h b/include/net/af_unix.h
index 435a2c3d5..5d04fbf8a 100644
--- a/include/net/af_unix.h
+++ b/include/net/af_unix.h
@@ -84,6 +84,8 @@  long unix_outq_len(struct sock *sk);
 
 int __unix_dgram_recvmsg(struct sock *sk, struct msghdr *msg, size_t size,
 			 int flags);
+int __unix_stream_recvmsg(struct sock *sk, struct msghdr *msg, size_t size,
+			  int flags);
 #ifdef CONFIG_SYSCTL
 int unix_sysctl_register(struct net *net);
 void unix_sysctl_unregister(struct net *net);
@@ -93,9 +95,11 @@  static inline void unix_sysctl_unregister(struct net *net) {}
 #endif
 
 #ifdef CONFIG_BPF_SYSCALL
-extern struct proto unix_proto;
+extern struct proto unix_dgram_proto;
+extern struct proto unix_stream_proto;
 
-int unix_bpf_update_proto(struct sock *sk, struct sk_psock *psock, bool restore);
+int unix_dgram_bpf_update_proto(struct sock *sk, struct sk_psock *psock, bool restore);
+int unix_stream_bpf_update_proto(struct sock *sk, struct sk_psock *psock, bool restore);
 void __init unix_bpf_build_proto(void);
 #else
 static inline void __init unix_bpf_build_proto(void)
diff --git a/net/core/sock_map.c b/net/core/sock_map.c
index ae5fa4338..42f50ea7a 100644
--- a/net/core/sock_map.c
+++ b/net/core/sock_map.c
@@ -517,9 +517,15 @@  static bool sk_is_tcp(const struct sock *sk)
 	       sk->sk_protocol == IPPROTO_TCP;
 }
 
+static bool sk_is_unix_stream(const struct sock *sk)
+{
+	return sk->sk_type == SOCK_STREAM &&
+	       sk->sk_protocol == PF_UNIX;
+}
+
 static bool sock_map_redirect_allowed(const struct sock *sk)
 {
-	if (sk_is_tcp(sk))
+	if (sk_is_tcp(sk) || sk_is_unix_stream(sk))
 		return sk->sk_state != TCP_LISTEN;
 	else
 		return sk->sk_state == TCP_ESTABLISHED;
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index 0ae3fc4c8..cfcd0d9e5 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -791,17 +791,35 @@  static void unix_close(struct sock *sk, long timeout)
 	 */
 }
 
-struct proto unix_proto = {
-	.name			= "UNIX",
+static void unix_unhash(struct sock *sk)
+{
+	/* Nothing to do here, unix socket does not need a ->unhash().
+	 * This is merely for sockmap.
+	 */
+}
+
+struct proto unix_dgram_proto = {
+	.name			= "UNIX-DGRAM",
 	.owner			= THIS_MODULE,
 	.obj_size		= sizeof(struct unix_sock),
 	.close			= unix_close,
 #ifdef CONFIG_BPF_SYSCALL
-	.psock_update_sk_prot	= unix_bpf_update_proto,
+	.psock_update_sk_prot	= unix_dgram_bpf_update_proto,
 #endif
 };
 
-static struct sock *unix_create1(struct net *net, struct socket *sock, int kern)
+struct proto unix_stream_proto = {
+	.name			= "UNIX-STREAM",
+	.owner			= THIS_MODULE,
+	.obj_size		= sizeof(struct unix_sock),
+	.close			= unix_close,
+	.unhash			= unix_unhash,
+#ifdef CONFIG_BPF_SYSCALL
+	.psock_update_sk_prot	= unix_stream_bpf_update_proto,
+#endif
+};
+
+static struct sock *unix_create1(struct net *net, struct socket *sock, int kern, int type)
 {
 	struct sock *sk = NULL;
 	struct unix_sock *u;
@@ -810,7 +828,11 @@  static struct sock *unix_create1(struct net *net, struct socket *sock, int kern)
 	if (atomic_long_read(&unix_nr_socks) > 2 * get_max_files())
 		goto out;
 
-	sk = sk_alloc(net, PF_UNIX, GFP_KERNEL, &unix_proto, kern);
+	if (type == SOCK_STREAM)
+		sk = sk_alloc(net, PF_UNIX, GFP_KERNEL, &unix_stream_proto, kern);
+	else /*dgram and  seqpacket */
+		sk = sk_alloc(net, PF_UNIX, GFP_KERNEL, &unix_dgram_proto, kern);
+
 	if (!sk)
 		goto out;
 
@@ -872,7 +894,7 @@  static int unix_create(struct net *net, struct socket *sock, int protocol,
 		return -ESOCKTNOSUPPORT;
 	}
 
-	return unix_create1(net, sock, kern) ? 0 : -ENOMEM;
+	return unix_create1(net, sock, kern, sock->type) ? 0 : -ENOMEM;
 }
 
 static int unix_release(struct socket *sock)
@@ -1286,7 +1308,7 @@  static int unix_stream_connect(struct socket *sock, struct sockaddr *uaddr,
 	err = -ENOMEM;
 
 	/* create new sock for complete connection */
-	newsk = unix_create1(sock_net(sk), NULL, 0);
+	newsk = unix_create1(sock_net(sk), NULL, 0, sock->type);
 	if (newsk == NULL)
 		goto out;
 
@@ -2214,7 +2236,7 @@  static int unix_dgram_recvmsg(struct socket *sock, struct msghdr *msg, size_t si
 	struct sock *sk = sock->sk;
 
 #ifdef CONFIG_BPF_SYSCALL
-	if (sk->sk_prot != &unix_proto)
+	if (sk->sk_prot != &unix_dgram_proto)
 		return sk->sk_prot->recvmsg(sk, msg, size, flags & MSG_DONTWAIT,
 					    flags & ~MSG_DONTWAIT, NULL);
 #endif
@@ -2533,6 +2555,21 @@  static int unix_stream_read_actor(struct sk_buff *skb,
 	return ret ?: chunk;
 }
 
+int __unix_stream_recvmsg(struct sock *sk, struct msghdr *msg,
+			  size_t size, int flags)
+{
+	struct socket *sock = sk->sk_socket;
+	struct unix_stream_read_state state = {
+		.recv_actor = unix_stream_read_actor,
+		.socket = sock,
+		.msg = msg,
+		.size = size,
+		.flags = flags
+	};
+
+	return unix_stream_read_generic(&state, true);
+}
+
 static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg,
 			       size_t size, int flags)
 {
@@ -2544,6 +2581,13 @@  static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg,
 		.flags = flags
 	};
 
+	struct sock *sk = sock->sk;
+
+#ifdef CONFIG_BPF_SYSCALL
+	if (sk->sk_prot != &unix_stream_proto)
+		return sk->sk_prot->recvmsg(sk, msg, size, flags & MSG_DONTWAIT,
+					    flags & ~MSG_DONTWAIT, NULL);
+#endif
 	return unix_stream_read_generic(&state, true);
 }
 
@@ -2605,6 +2649,7 @@  static int unix_shutdown(struct socket *sock, int mode)
 
 		int peer_mode = 0;
 
+		other->sk_prot->unhash(other);
 		if (mode&RCV_SHUTDOWN)
 			peer_mode |= SEND_SHUTDOWN;
 		if (mode&SEND_SHUTDOWN)
@@ -2613,8 +2658,10 @@  static int unix_shutdown(struct socket *sock, int mode)
 		other->sk_shutdown |= peer_mode;
 		unix_state_unlock(other);
 		other->sk_state_change(other);
-		if (peer_mode == SHUTDOWN_MASK)
+		if (peer_mode == SHUTDOWN_MASK) {
 			sk_wake_async(other, SOCK_WAKE_WAITD, POLL_HUP);
+			other->sk_state = TCP_CLOSE;
+		}
 		else if (peer_mode & RCV_SHUTDOWN)
 			sk_wake_async(other, SOCK_WAKE_WAITD, POLL_IN);
 	}
@@ -2993,7 +3040,13 @@  static int __init af_unix_init(void)
 
 	BUILD_BUG_ON(sizeof(struct unix_skb_parms) > sizeof_field(struct sk_buff, cb));
 
-	rc = proto_register(&unix_proto, 1);
+	rc = proto_register(&unix_dgram_proto, 1);
+	if (rc != 0) {
+		pr_crit("%s: Cannot create unix_sock SLAB cache!\n", __func__);
+		goto out;
+	}
+
+	rc = proto_register(&unix_stream_proto, 1);
 	if (rc != 0) {
 		pr_crit("%s: Cannot create unix_sock SLAB cache!\n", __func__);
 		goto out;
@@ -3009,7 +3062,8 @@  static int __init af_unix_init(void)
 static void __exit af_unix_exit(void)
 {
 	sock_unregister(PF_UNIX);
-	proto_unregister(&unix_proto);
+	proto_unregister(&unix_dgram_proto);
+	proto_unregister(&unix_stream_proto);
 	unregister_pernet_subsys(&unix_net_ops);
 }
 
diff --git a/net/unix/unix_bpf.c b/net/unix/unix_bpf.c
index db0cda29f..9067210d3 100644
--- a/net/unix/unix_bpf.c
+++ b/net/unix/unix_bpf.c
@@ -38,9 +38,18 @@  static int unix_msg_wait_data(struct sock *sk, struct sk_psock *psock,
 	return ret;
 }
 
-static int unix_dgram_bpf_recvmsg(struct sock *sk, struct msghdr *msg,
-				  size_t len, int nonblock, int flags,
-				  int *addr_len)
+static int __unix_recvmsg(struct sock *sk, struct msghdr *msg,
+			   size_t len, int flags)
+{
+	if (sk->sk_type == SOCK_DGRAM)
+		return __unix_dgram_recvmsg(sk, msg, len, flags);
+	else
+		return __unix_stream_recvmsg(sk, msg, len, flags);
+}
+
+static int unix_bpf_recvmsg(struct sock *sk, struct msghdr *msg,
+			    size_t len, int nonblock, int flags,
+			    int *addr_len)
 {
 	struct unix_sock *u = unix_sk(sk);
 	struct sk_psock *psock;
@@ -48,12 +57,12 @@  static int unix_dgram_bpf_recvmsg(struct sock *sk, struct msghdr *msg,
 
 	psock = sk_psock_get(sk);
 	if (unlikely(!psock))
-		return __unix_dgram_recvmsg(sk, msg, len, flags);
+		return __unix_recvmsg(sk, msg, len, flags);
 
 	mutex_lock(&u->iolock);
 	if (!skb_queue_empty(&sk->sk_receive_queue) &&
 	    sk_psock_queue_empty(psock)) {
-		ret = __unix_dgram_recvmsg(sk, msg, len, flags);
+		ret = __unix_recvmsg(sk, msg, len, flags);
 		goto out;
 	}
 
@@ -68,7 +77,7 @@  static int unix_dgram_bpf_recvmsg(struct sock *sk, struct msghdr *msg,
 		if (data) {
 			if (!sk_psock_queue_empty(psock))
 				goto msg_bytes_ready;
-			ret = __unix_dgram_recvmsg(sk, msg, len, flags);
+			ret = __unix_recvmsg(sk, msg, len, flags);
 			goto out;
 		}
 		copied = -EAGAIN;
@@ -80,30 +89,55 @@  static int unix_dgram_bpf_recvmsg(struct sock *sk, struct msghdr *msg,
 	return ret;
 }
 
-static struct proto *unix_prot_saved __read_mostly;
-static DEFINE_SPINLOCK(unix_prot_lock);
-static struct proto unix_bpf_prot;
+static struct proto *unix_dgram_prot_saved __read_mostly;
+static DEFINE_SPINLOCK(unix_dgram_prot_lock);
+static struct proto unix_dgram_bpf_prot;
+
+static struct proto *unix_stream_prot_saved __read_mostly;
+static DEFINE_SPINLOCK(unix_stream_prot_lock);
+static struct proto unix_stream_bpf_prot;
+
+static void unix_dgram_bpf_rebuild_protos(struct proto *prot, const struct proto *base)
+{
+	*prot        = *base;
+	prot->close  = sock_map_close;
+	prot->recvmsg = unix_bpf_recvmsg;
+}
 
-static void unix_bpf_rebuild_protos(struct proto *prot, const struct proto *base)
+static void unix_stream_bpf_rebuild_protos(struct proto *prot,
+					   const struct proto *base)
 {
 	*prot        = *base;
 	prot->close  = sock_map_close;
-	prot->recvmsg = unix_dgram_bpf_recvmsg;
+	prot->recvmsg = unix_bpf_recvmsg;
+	prot->unhash  = sock_map_unhash;
 }
 
-static void unix_bpf_check_needs_rebuild(struct proto *ops)
+static void unix_dgram_bpf_check_needs_rebuild(struct proto *ops)
 {
-	if (unlikely(ops != smp_load_acquire(&unix_prot_saved))) {
-		spin_lock_bh(&unix_prot_lock);
-		if (likely(ops != unix_prot_saved)) {
-			unix_bpf_rebuild_protos(&unix_bpf_prot, ops);
-			smp_store_release(&unix_prot_saved, ops);
+	if (unlikely(ops != smp_load_acquire(&unix_dgram_prot_saved))) {
+		spin_lock_bh(&unix_dgram_prot_lock);
+		if (likely(ops != unix_dgram_prot_saved)) {
+			unix_dgram_bpf_rebuild_protos(&unix_dgram_bpf_prot, ops);
+			smp_store_release(&unix_dgram_prot_saved, ops);
 		}
-		spin_unlock_bh(&unix_prot_lock);
+		spin_unlock_bh(&unix_dgram_prot_lock);
 	}
 }
 
-int unix_bpf_update_proto(struct sock *sk, struct sk_psock *psock, bool restore)
+static void unix_stream_bpf_check_needs_rebuild(struct proto *ops)
+{
+	if (unlikely(ops != smp_load_acquire(&unix_stream_prot_saved))) {
+		spin_lock_bh(&unix_stream_prot_lock);
+		if (likely(ops != unix_stream_prot_saved)) {
+			unix_stream_bpf_rebuild_protos(&unix_stream_bpf_prot, ops);
+			smp_store_release(&unix_stream_prot_saved, ops);
+		}
+		spin_unlock_bh(&unix_stream_prot_lock);
+	}
+}
+
+int unix_dgram_bpf_update_proto(struct sock *sk, struct sk_psock *psock, bool restore)
 {
 	if (restore) {
 		sk->sk_write_space = psock->saved_write_space;
@@ -111,12 +145,27 @@  int unix_bpf_update_proto(struct sock *sk, struct sk_psock *psock, bool restore)
 		return 0;
 	}
 
-	unix_bpf_check_needs_rebuild(psock->sk_proto);
-	WRITE_ONCE(sk->sk_prot, &unix_bpf_prot);
+	unix_dgram_bpf_check_needs_rebuild(psock->sk_proto);
+	WRITE_ONCE(sk->sk_prot, &unix_dgram_bpf_prot);
+	return 0;
+}
+
+int unix_stream_bpf_update_proto(struct sock *sk, struct sk_psock *psock, bool restore)
+{
+	if (restore) {
+		sk->sk_write_space = psock->saved_write_space;
+		WRITE_ONCE(sk->sk_prot, psock->sk_proto);
+		return 0;
+	}
+
+	unix_stream_bpf_check_needs_rebuild(psock->sk_proto);
+	WRITE_ONCE(sk->sk_prot, &unix_stream_bpf_prot);
 	return 0;
 }
 
 void __init unix_bpf_build_proto(void)
 {
-	unix_bpf_rebuild_protos(&unix_bpf_prot, &unix_proto);
+	unix_dgram_bpf_rebuild_protos(&unix_dgram_bpf_prot, &unix_dgram_proto);
+	unix_stream_bpf_rebuild_protos(&unix_stream_bpf_prot, &unix_stream_proto);
+
 }