diff mbox

[v3,06/14] SUNRPC: add AF_VSOCK support to xprtsock.c

Message ID 20170630132352.32133-7-stefanha@redhat.com (mailing list archive)
State New, archived
Headers show

Commit Message

Stefan Hajnoczi June 30, 2017, 1:23 p.m. UTC
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 include/linux/sunrpc/xprt.h |   1 +
 net/sunrpc/xprtsock.c       | 385 +++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 381 insertions(+), 5 deletions(-)

Comments

Jeff Layton Nov. 7, 2017, 1:46 p.m. UTC | #1
On Fri, 2017-06-30 at 14:23 +0100, Stefan Hajnoczi wrote:
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
>  include/linux/sunrpc/xprt.h |   1 +
>  net/sunrpc/xprtsock.c       | 385 +++++++++++++++++++++++++++++++++++++++++++-
>  2 files changed, 381 insertions(+), 5 deletions(-)
> 
> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
> index eab1c74..c038d8a 100644
> --- a/include/linux/sunrpc/xprt.h
> +++ b/include/linux/sunrpc/xprt.h
> @@ -170,6 +170,7 @@ enum xprt_transports {
>  	XPRT_TRANSPORT_RDMA	= 256,
>  	XPRT_TRANSPORT_BC_RDMA	= XPRT_TRANSPORT_RDMA | XPRT_TRANSPORT_BC,
>  	XPRT_TRANSPORT_LOCAL	= 257,
> +	XPRT_TRANSPORT_VSOCK	= 258,
>  };
>  
>  struct rpc_xprt {
> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
> index fd0c8b1..cc343b91 100644
> --- a/net/sunrpc/xprtsock.c
> +++ b/net/sunrpc/xprtsock.c
> @@ -46,6 +46,7 @@
>  #include <net/checksum.h>
>  #include <net/udp.h>
>  #include <net/tcp.h>
> +#include <net/af_vsock.h>
>  
>  #include <trace/events/sunrpc.h>
>  
> @@ -271,6 +272,13 @@ static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
>  		sin6 = xs_addr_in6(xprt);
>  		snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
>  		break;
> +	case AF_VSOCK:
> +		(void)rpc_ntop(sap, buf, sizeof(buf));
> +		xprt->address_strings[RPC_DISPLAY_ADDR] =
> +						kstrdup(buf, GFP_KERNEL);
> +		snprintf(buf, sizeof(buf), "%08x",
> +			 ((struct sockaddr_vm *)sap)->svm_cid);
> +		break;
>  	default:
>  		BUG();
>  	}
> @@ -1881,21 +1889,30 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock)
>  			nloop++;
>  	} while (err == -EADDRINUSE && nloop != 2);
>  
> -	if (myaddr.ss_family == AF_INET)
> +	switch (myaddr.ss_family) {
> +	case AF_INET:
>  		dprintk("RPC:       %s %pI4:%u: %s (%d)\n", __func__,
>  				&((struct sockaddr_in *)&myaddr)->sin_addr,
>  				port, err ? "failed" : "ok", err);
> -	else
> +		break;
> +	case AF_INET6:
>  		dprintk("RPC:       %s %pI6:%u: %s (%d)\n", __func__,
>  				&((struct sockaddr_in6 *)&myaddr)->sin6_addr,
>  				port, err ? "failed" : "ok", err);
> +		break;
> +	case AF_VSOCK:
> +		dprintk("RPC:       %s %u:%u: %s (%d)\n", __func__,
> +				((struct sockaddr_vm *)&myaddr)->svm_cid,
> +				port, err ? "failed" : "ok", err);
> +		break;
> +	}
>  	return err;
>  }
>  
>  /*
> - * We don't support autobind on AF_LOCAL sockets
> + * We don't support autobind on AF_LOCAL and AF_VSOCK sockets
>   */
> -static void xs_local_rpcbind(struct rpc_task *task)
> +static void xs_dummy_rpcbind(struct rpc_task *task)
>  {
>  	xprt_set_bound(task->tk_xprt);
>  }
> @@ -1932,6 +1949,14 @@ static inline void xs_reclassify_socket6(struct socket *sock)
>  		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
>  }
>  
> +static inline void xs_reclassify_socket_vsock(struct socket *sock)
> +{
> +	struct sock *sk = sock->sk;
> +
> +	sock_lock_init_class_and_name(sk, "slock-AF_VSOCK-RPC",
> +		&xs_slock_key[1], "sk_lock-AF_VSOCK-RPC", &xs_key[1]);
> +}
> +
>  static inline void xs_reclassify_socket(int family, struct socket *sock)
>  {
>  	if (WARN_ON_ONCE(!sock_allow_reclassification(sock->sk)))
> @@ -1947,6 +1972,9 @@ static inline void xs_reclassify_socket(int family, struct socket *sock)
>  	case AF_INET6:
>  		xs_reclassify_socket6(sock);
>  		break;
> +	case AF_VSOCK:
> +		xs_reclassify_socket_vsock(sock);
> +		break;
>  	}
>  }
>  #else
> @@ -2743,7 +2771,7 @@ static struct rpc_xprt_ops xs_local_ops = {
>  	.reserve_xprt		= xprt_reserve_xprt,
>  	.release_xprt		= xs_tcp_release_xprt,
>  	.alloc_slot		= xprt_alloc_slot,
> -	.rpcbind		= xs_local_rpcbind,
> +	.rpcbind		= xs_dummy_rpcbind,
>  	.set_port		= xs_local_set_port,
>  	.connect		= xs_local_connect,
>  	.buf_alloc		= rpc_malloc,
> @@ -2836,6 +2864,10 @@ static int xs_init_anyaddr(const int family, struct sockaddr *sap)
>  		.sin6_family		= AF_INET6,
>  		.sin6_addr		= IN6ADDR_ANY_INIT,
>  	};
> +	static const struct sockaddr_vm svm = {
> +		.svm_family		= AF_VSOCK,
> +		.svm_cid		= VMADDR_CID_ANY,
> +	};
>  
>  	switch (family) {
>  	case AF_LOCAL:
> @@ -2846,6 +2878,9 @@ static int xs_init_anyaddr(const int family, struct sockaddr *sap)
>  	case AF_INET6:
>  		memcpy(sap, &sin6, sizeof(sin6));
>  		break;
> +	case AF_VSOCK:
> +		memcpy(sap, &svm, sizeof(svm));
> +		break;
>  	default:
>  		dprintk("RPC:       %s: Bad address family\n", __func__);
>  		return -EAFNOSUPPORT;
> @@ -3203,6 +3238,330 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
>  	return ret;
>  }
>  
> +#ifdef CONFIG_SUNRPC_XPRT_VSOCK
> +/**
> + * xs_vsock_state_change - callback to handle vsock socket state changes
> + * @sk: socket whose state has changed
> + *
> + */
> +static void xs_vsock_state_change(struct sock *sk)
> +{
> +	struct rpc_xprt *xprt;
> +
> +	read_lock_bh(&sk->sk_callback_lock);
> +	if (!(xprt = xprt_from_sock(sk)))
> +		goto out;
> +	dprintk("RPC:       %s client %p...\n", __func__, xprt);
> +	dprintk("RPC:       state %x conn %d dead %d zapped %d sk_shutdown %d\n",
> +			sk->sk_state, xprt_connected(xprt),
> +			sock_flag(sk, SOCK_DEAD),
> +			sock_flag(sk, SOCK_ZAPPED),
> +			sk->sk_shutdown);
> +
> +	trace_rpc_socket_state_change(xprt, sk->sk_socket);
> +
> +	switch (sk->sk_state) {
> +	case SS_CONNECTING:
> +		/* Do nothing */
> +		break;
> +
> +	case SS_CONNECTED:
> +		spin_lock(&xprt->transport_lock);
> +		if (!xprt_test_and_set_connected(xprt)) {
> +			xs_stream_reset_state(xprt, vsock_read_sock);
> +			xprt->connect_cookie++;
> +
> +			xprt_wake_pending_tasks(xprt, -EAGAIN);
> +		}
> +		spin_unlock(&xprt->transport_lock);
> +		break;
> +
> +	case SS_DISCONNECTING:
> +		/* TODO do we need to distinguish between various shutdown (client-side/server-side)? */
> +		/* The client initiated a shutdown of the socket */
> +		xprt->connect_cookie++;
> +		xprt->reestablish_timeout = 0;
> +		set_bit(XPRT_CLOSING, &xprt->state);
> +		smp_mb__before_atomic();
> +		clear_bit(XPRT_CONNECTED, &xprt->state);
> +		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
> +		smp_mb__after_atomic();
> +		break;
> +
> +	case SS_UNCONNECTED:
> +		xs_sock_mark_closed(xprt);
> +		break;
> +	}
> +
> + out:
> +	read_unlock_bh(&sk->sk_callback_lock);
> +}
> +
> +/**
> + * xs_vsock_error_report - callback to handle vsock socket state errors
> + * @sk: socket
> + *
> + * Note: we don't call sock_error() since there may be a rpc_task
> + * using the socket, and so we don't want to clear sk->sk_err.
> + */
> +static void xs_vsock_error_report(struct sock *sk)
> +{
> +	struct rpc_xprt *xprt;
> +	int err;
> +
> +	read_lock_bh(&sk->sk_callback_lock);
> +	if (!(xprt = xprt_from_sock(sk)))
> +		goto out;
> +
> +	err = -sk->sk_err;
> +	if (err == 0)
> +		goto out;
> +	/* Is this a reset event? */
> +	if (sk->sk_state == SS_UNCONNECTED)
> +		xs_sock_mark_closed(xprt);
> +	dprintk("RPC:       %s client %p, error=%d...\n",
> +			__func__, xprt, -err);
> +	trace_rpc_socket_error(xprt, sk->sk_socket, err);
> +	xprt_wake_pending_tasks(xprt, err);
> + out:
> +	read_unlock_bh(&sk->sk_callback_lock);
> +}

Hmm, ok... so we have this to avoid some TCP-specific stuff in
xs_error_report, I guess?

I wonder if the AF_LOCAL transport should be using the function above,
rather than xs_error_report? If so, maybe we should rename:

    xs_error_report -> xs_tcp_error_report
    xs_vsock_error_report -> xs_stream_error_report

Might be good to do that cleanup first as a preparatory patch.

> +
> +/**
> + * xs_vsock_finish_connecting - initialize and connect socket
> + */
> +static int xs_vsock_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
> +{
> +	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
> +	int ret = -ENOTCONN;
> +
> +	if (!transport->inet) {
> +		struct sock *sk = sock->sk;
> +
> +		write_lock_bh(&sk->sk_callback_lock);
> +
> +		xs_save_old_callbacks(transport, sk);
> +
> +		sk->sk_user_data = xprt;
> +		sk->sk_data_ready = xs_data_ready;
> +		sk->sk_state_change = xs_vsock_state_change;
> +		sk->sk_write_space = xs_tcp_write_space;

Maybe we should rename xs_tcp_write_space to xs_stream_write_space?

> +		sk->sk_error_report = xs_vsock_error_report;
> +		sk->sk_allocation = GFP_ATOMIC;

Why GFP_ATOMIC here? The other finish routines use GFP_NOIO.

> +
> +		xprt_clear_connected(xprt);
> +
> +		/* Reset to new socket */
> +		transport->sock = sock;
> +		transport->inet = sk;
> +
> +		write_unlock_bh(&sk->sk_callback_lock);
> +	}
> +
> +	if (!xprt_bound(xprt))
> +		goto out;
> +
> +	xs_set_memalloc(xprt);
> +
> +	/* Tell the socket layer to start connecting... */
> +	xprt->stat.connect_count++;
> +	xprt->stat.connect_start = jiffies;
> +	ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
> +	switch (ret) {
> +	case 0:
> +		xs_set_srcport(transport, sock);
> +	case -EINPROGRESS:
> +		/* SYN_SENT! */
> +		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
> +			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
> +	}
> +out:
> +	return ret;
> +}
> +
> +/**
> + * xs_vsock_setup_socket - create a vsock socket and connect to a remote endpoint
> + *
> + * Invoked by a work queue tasklet.
> + */
> +static void xs_vsock_setup_socket(struct work_struct *work)
> +{
> +	struct sock_xprt *transport =
> +		container_of(work, struct sock_xprt, connect_worker.work);
> +	struct socket *sock = transport->sock;
> +	struct rpc_xprt *xprt = &transport->xprt;
> +	int status = -EIO;
> +
> +	if (!sock) {
> +		sock = xs_create_sock(xprt, transport,
> +				xs_addr(xprt)->sa_family, SOCK_STREAM,
> +				0, true);
> +		if (IS_ERR(sock)) {
> +			status = PTR_ERR(sock);
> +			goto out;
> +		}
> +	}
> +
> +	dprintk("RPC:       worker connecting xprt %p via %s to "
> +				"%s (port %s)\n", xprt,
> +			xprt->address_strings[RPC_DISPLAY_PROTO],
> +			xprt->address_strings[RPC_DISPLAY_ADDR],
> +			xprt->address_strings[RPC_DISPLAY_PORT]);
> +
> +	status = xs_vsock_finish_connecting(xprt, sock);
> +	trace_rpc_socket_connect(xprt, sock, status);
> +	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
> +			xprt, -status, xprt_connected(xprt),
> +			sock->sk->sk_state);
> +	switch (status) {
> +	default:
> +		printk("%s: connect returned unhandled error %d\n",
> +			__func__, status);
> +	case -EADDRNOTAVAIL:
> +		/* We're probably in TIME_WAIT. Get rid of existing socket,
> +		 * and retry
> +		 */
> +		xs_tcp_force_close(xprt);
> +		break;
> +	case 0:
> +	case -EINPROGRESS:
> +	case -EALREADY:
> +		xprt_unlock_connect(xprt, transport);
> +		xprt_clear_connecting(xprt);
> +		return;
> +	case -EINVAL:
> +		/* Happens, for instance, if the user specified a link
> +		 * local IPv6 address without a scope-id.
> +		 */
> +	case -ECONNREFUSED:
> +	case -ECONNRESET:
> +	case -ENETUNREACH:
> +	case -EADDRINUSE:
> +	case -ENOBUFS:
> +		/* retry with existing socket, after a delay */
> +		xs_tcp_force_close(xprt);
> +		goto out;
> +	}
> +	status = -EAGAIN;
> +out:
> +	xprt_unlock_connect(xprt, transport);
> +	xprt_clear_connecting(xprt);
> +	xprt_wake_pending_tasks(xprt, status);
> +}
> +
> +/**
> + * xs_vsock_print_stats - display vsock socket-specific stats
> + * @xprt: rpc_xprt struct containing statistics
> + * @seq: output file
> + *
> + */
> +static void xs_vsock_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
> +{
> +	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
> +	long idle_time = 0;
> +
> +	if (xprt_connected(xprt))
> +		idle_time = (long)(jiffies - xprt->last_used) / HZ;
> +
> +	seq_printf(seq, "\txprt:\tvsock %u %lu %lu %lu %ld %lu %lu %lu "
> +			"%llu %llu %lu %llu %llu\n",
> +			transport->srcport,
> +			xprt->stat.bind_count,
> +			xprt->stat.connect_count,
> +			xprt->stat.connect_time,
> +			idle_time,
> +			xprt->stat.sends,
> +			xprt->stat.recvs,
> +			xprt->stat.bad_xids,
> +			xprt->stat.req_u,
> +			xprt->stat.bklog_u,
> +			xprt->stat.max_slots,
> +			xprt->stat.sending_u,
> +			xprt->stat.pending_u);
> +}
> +
> +static struct rpc_xprt_ops xs_vsock_ops = {
> +	.reserve_xprt		= xprt_reserve_xprt,
> +	.release_xprt		= xs_tcp_release_xprt,
> +	.alloc_slot		= xprt_lock_and_alloc_slot,
> +	.rpcbind		= xs_dummy_rpcbind,
> +	.set_port		= xs_set_port,
> +	.connect		= xs_connect,
> +	.buf_alloc		= rpc_malloc,
> +	.buf_free		= rpc_free,
> +	.send_request		= xs_tcp_send_request,
> +	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
> +	.close			= xs_tcp_shutdown,
> +	.destroy		= xs_destroy,
> +	.print_stats		= xs_vsock_print_stats,
> +};
> +
> +static const struct rpc_timeout xs_vsock_default_timeout = {
> +	.to_initval = 60 * HZ,
> +	.to_maxval = 60 * HZ,
> +	.to_retries = 2,
> +};
> +
> +/**
> + * xs_setup_vsock - Set up transport to use a vsock socket
> + * @args: rpc transport creation arguments
> + *
> + */
> +static struct rpc_xprt *xs_setup_vsock(struct xprt_create *args)
> +{
> +	struct sockaddr_vm *addr = (struct sockaddr_vm *)args->dstaddr;
> +	struct sock_xprt *transport;
> +	struct rpc_xprt *xprt;
> +	struct rpc_xprt *ret;
> +
> +	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
> +			     xprt_max_tcp_slot_table_entries);
> +	if (IS_ERR(xprt))
> +		return xprt;
> +	transport = container_of(xprt, struct sock_xprt, xprt);
> +
> +	xprt->prot = 0;
> +	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
> +	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
> +
> +	xprt->bind_timeout = XS_BIND_TO;
> +	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
> +	xprt->idle_timeout = XS_IDLE_DISC_TO;
> +
> +	xprt->ops = &xs_vsock_ops;
> +	xprt->timeout = &xs_vsock_default_timeout;
> +
> +	INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
> +	INIT_DELAYED_WORK(&transport->connect_worker, xs_vsock_setup_socket);
> +
> +	switch (addr->svm_family) {
> +	case AF_VSOCK:
> +		if (addr->svm_port == 0) {
> +			dprintk("RPC:       autobind not supported with AF_VSOCK\n");
> +			ret = ERR_PTR(-EINVAL);
> +			goto out_err;
> +		}
> +		xprt_set_bound(xprt);
> +		xs_format_peer_addresses(xprt, "vsock", "vsock" /* TODO register official netid? */);
> +		break;
> +	default:
> +		ret = ERR_PTR(-EAFNOSUPPORT);
> +		goto out_err;
> +	}
> +
> +	dprintk("RPC:       set up xprt to %s (port %s) via AF_VSOCK\n",
> +		xprt->address_strings[RPC_DISPLAY_ADDR],
> +		xprt->address_strings[RPC_DISPLAY_PORT]);
> +
> +	if (try_module_get(THIS_MODULE))
> +		return xprt;
> +	ret = ERR_PTR(-EINVAL);
> +out_err:
> +	xs_xprt_free(xprt);
> +	return ret;
> +}
> +#endif
> +
>  static struct xprt_class	xs_local_transport = {
>  	.list		= LIST_HEAD_INIT(xs_local_transport.list),
>  	.name		= "named UNIX socket",
> @@ -3235,6 +3594,16 @@ static struct xprt_class	xs_bc_tcp_transport = {
>  	.setup		= xs_setup_bc_tcp,
>  };
>  
> +#ifdef CONFIG_SUNRPC_XPRT_VSOCK
> +static struct xprt_class	xs_vsock_transport = {
> +	.list		= LIST_HEAD_INIT(xs_vsock_transport.list),
> +	.name		= "vsock",
> +	.owner		= THIS_MODULE,
> +	.ident		= XPRT_TRANSPORT_VSOCK,
> +	.setup		= xs_setup_vsock,
> +};
> +#endif
> +
>  /**
>   * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
>   *
> @@ -3250,6 +3619,9 @@ int init_socket_xprt(void)
>  	xprt_register_transport(&xs_udp_transport);
>  	xprt_register_transport(&xs_tcp_transport);
>  	xprt_register_transport(&xs_bc_tcp_transport);
> +#ifdef CONFIG_SUNRPC_XPRT_VSOCK
> +	xprt_register_transport(&xs_vsock_transport);
> +#endif
>  
>  	return 0;
>  }
> @@ -3271,6 +3643,9 @@ void cleanup_socket_xprt(void)
>  	xprt_unregister_transport(&xs_udp_transport);
>  	xprt_unregister_transport(&xs_tcp_transport);
>  	xprt_unregister_transport(&xs_bc_tcp_transport);
> +#ifdef CONFIG_SUNRPC_XPRT_VSOCK
> +	xprt_unregister_transport(&xs_vsock_transport);
> +#endif
>  }
>  
>  static int param_set_uint_minmax(const char *val,
Stefan Hajnoczi Nov. 14, 2017, 4:45 p.m. UTC | #2
On Tue, Nov 07, 2017 at 08:46:14AM -0500, Jeff Layton wrote:
> On Fri, 2017-06-30 at 14:23 +0100, Stefan Hajnoczi wrote:
> > +/**
> > + * xs_vsock_error_report - callback to handle vsock socket state errors
> > + * @sk: socket
> > + *
> > + * Note: we don't call sock_error() since there may be a rpc_task
> > + * using the socket, and so we don't want to clear sk->sk_err.
> > + */
> > +static void xs_vsock_error_report(struct sock *sk)
> > +{
> > +	struct rpc_xprt *xprt;
> > +	int err;
> > +
> > +	read_lock_bh(&sk->sk_callback_lock);
> > +	if (!(xprt = xprt_from_sock(sk)))
> > +		goto out;
> > +
> > +	err = -sk->sk_err;
> > +	if (err == 0)
> > +		goto out;
> > +	/* Is this a reset event? */
> > +	if (sk->sk_state == SS_UNCONNECTED)
> > +		xs_sock_mark_closed(xprt);
> > +	dprintk("RPC:       %s client %p, error=%d...\n",
> > +			__func__, xprt, -err);
> > +	trace_rpc_socket_error(xprt, sk->sk_socket, err);
> > +	xprt_wake_pending_tasks(xprt, err);
> > + out:
> > +	read_unlock_bh(&sk->sk_callback_lock);
> > +}
> 
> Hmm, ok... so we have this to avoid some TCP-specific stuff in
> xs_error_report, I guess?
> 
> I wonder if the AF_LOCAL transport should be using the function above,
> rather than xs_error_report? If so, maybe we should rename:
> 
>     xs_error_report -> xs_tcp_error_report
>     xs_vsock_error_report -> xs_stream_error_report
> 
> Might be good to do that cleanup first as a preparatory patch.

AF_LOCAL's use of xs_error_report() is fine because AF_UNIX uses TCP_*
constants for sk->sk_state.

VSOCK has now switched to TCP_* sk_state constants in Dave Miller's
net-next tree.  I made that change for unrelated reasons, but it means
v4 of this patch series will share the xs_error_report() function with
TCP and AF_LOCAL.

> > +
> > +/**
> > + * xs_vsock_finish_connecting - initialize and connect socket
> > + */
> > +static int xs_vsock_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
> > +{
> > +	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
> > +	int ret = -ENOTCONN;
> > +
> > +	if (!transport->inet) {
> > +		struct sock *sk = sock->sk;
> > +
> > +		write_lock_bh(&sk->sk_callback_lock);
> > +
> > +		xs_save_old_callbacks(transport, sk);
> > +
> > +		sk->sk_user_data = xprt;
> > +		sk->sk_data_ready = xs_data_ready;
> > +		sk->sk_state_change = xs_vsock_state_change;
> > +		sk->sk_write_space = xs_tcp_write_space;
> 
> Maybe we should rename xs_tcp_write_space to xs_stream_write_space?

Yes, will fix in v4.

> > +		sk->sk_error_report = xs_vsock_error_report;
> > +		sk->sk_allocation = GFP_ATOMIC;
> 
> Why GFP_ATOMIC here? The other finish routines use GFP_NOIO.

I missed the memo on commit c2126157ea3c4f72b315749e0c07a1b162a2fe2b
("SUNRPC: Allow sockets to do GFP_NOIO allocations").  It is now okay to
use GFP_NOIO.

Thank you, will fix in v4.
diff mbox

Patch

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index eab1c74..c038d8a 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -170,6 +170,7 @@  enum xprt_transports {
 	XPRT_TRANSPORT_RDMA	= 256,
 	XPRT_TRANSPORT_BC_RDMA	= XPRT_TRANSPORT_RDMA | XPRT_TRANSPORT_BC,
 	XPRT_TRANSPORT_LOCAL	= 257,
+	XPRT_TRANSPORT_VSOCK	= 258,
 };
 
 struct rpc_xprt {
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index fd0c8b1..cc343b91 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -46,6 +46,7 @@ 
 #include <net/checksum.h>
 #include <net/udp.h>
 #include <net/tcp.h>
+#include <net/af_vsock.h>
 
 #include <trace/events/sunrpc.h>
 
@@ -271,6 +272,13 @@  static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
 		sin6 = xs_addr_in6(xprt);
 		snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
 		break;
+	case AF_VSOCK:
+		(void)rpc_ntop(sap, buf, sizeof(buf));
+		xprt->address_strings[RPC_DISPLAY_ADDR] =
+						kstrdup(buf, GFP_KERNEL);
+		snprintf(buf, sizeof(buf), "%08x",
+			 ((struct sockaddr_vm *)sap)->svm_cid);
+		break;
 	default:
 		BUG();
 	}
@@ -1881,21 +1889,30 @@  static int xs_bind(struct sock_xprt *transport, struct socket *sock)
 			nloop++;
 	} while (err == -EADDRINUSE && nloop != 2);
 
-	if (myaddr.ss_family == AF_INET)
+	switch (myaddr.ss_family) {
+	case AF_INET:
 		dprintk("RPC:       %s %pI4:%u: %s (%d)\n", __func__,
 				&((struct sockaddr_in *)&myaddr)->sin_addr,
 				port, err ? "failed" : "ok", err);
-	else
+		break;
+	case AF_INET6:
 		dprintk("RPC:       %s %pI6:%u: %s (%d)\n", __func__,
 				&((struct sockaddr_in6 *)&myaddr)->sin6_addr,
 				port, err ? "failed" : "ok", err);
+		break;
+	case AF_VSOCK:
+		dprintk("RPC:       %s %u:%u: %s (%d)\n", __func__,
+				((struct sockaddr_vm *)&myaddr)->svm_cid,
+				port, err ? "failed" : "ok", err);
+		break;
+	}
 	return err;
 }
 
 /*
- * We don't support autobind on AF_LOCAL sockets
+ * We don't support autobind on AF_LOCAL and AF_VSOCK sockets
  */
-static void xs_local_rpcbind(struct rpc_task *task)
+static void xs_dummy_rpcbind(struct rpc_task *task)
 {
 	xprt_set_bound(task->tk_xprt);
 }
@@ -1932,6 +1949,14 @@  static inline void xs_reclassify_socket6(struct socket *sock)
 		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
 }
 
+static inline void xs_reclassify_socket_vsock(struct socket *sock)
+{
+	struct sock *sk = sock->sk;
+
+	sock_lock_init_class_and_name(sk, "slock-AF_VSOCK-RPC",
+		&xs_slock_key[1], "sk_lock-AF_VSOCK-RPC", &xs_key[1]);
+}
+
 static inline void xs_reclassify_socket(int family, struct socket *sock)
 {
 	if (WARN_ON_ONCE(!sock_allow_reclassification(sock->sk)))
@@ -1947,6 +1972,9 @@  static inline void xs_reclassify_socket(int family, struct socket *sock)
 	case AF_INET6:
 		xs_reclassify_socket6(sock);
 		break;
+	case AF_VSOCK:
+		xs_reclassify_socket_vsock(sock);
+		break;
 	}
 }
 #else
@@ -2743,7 +2771,7 @@  static struct rpc_xprt_ops xs_local_ops = {
 	.reserve_xprt		= xprt_reserve_xprt,
 	.release_xprt		= xs_tcp_release_xprt,
 	.alloc_slot		= xprt_alloc_slot,
-	.rpcbind		= xs_local_rpcbind,
+	.rpcbind		= xs_dummy_rpcbind,
 	.set_port		= xs_local_set_port,
 	.connect		= xs_local_connect,
 	.buf_alloc		= rpc_malloc,
@@ -2836,6 +2864,10 @@  static int xs_init_anyaddr(const int family, struct sockaddr *sap)
 		.sin6_family		= AF_INET6,
 		.sin6_addr		= IN6ADDR_ANY_INIT,
 	};
+	static const struct sockaddr_vm svm = {
+		.svm_family		= AF_VSOCK,
+		.svm_cid		= VMADDR_CID_ANY,
+	};
 
 	switch (family) {
 	case AF_LOCAL:
@@ -2846,6 +2878,9 @@  static int xs_init_anyaddr(const int family, struct sockaddr *sap)
 	case AF_INET6:
 		memcpy(sap, &sin6, sizeof(sin6));
 		break;
+	case AF_VSOCK:
+		memcpy(sap, &svm, sizeof(svm));
+		break;
 	default:
 		dprintk("RPC:       %s: Bad address family\n", __func__);
 		return -EAFNOSUPPORT;
@@ -3203,6 +3238,330 @@  static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
 	return ret;
 }
 
+#ifdef CONFIG_SUNRPC_XPRT_VSOCK
+/**
+ * xs_vsock_state_change - callback to handle vsock socket state changes
+ * @sk: socket whose state has changed
+ *
+ */
+static void xs_vsock_state_change(struct sock *sk)
+{
+	struct rpc_xprt *xprt;
+
+	read_lock_bh(&sk->sk_callback_lock);
+	if (!(xprt = xprt_from_sock(sk)))
+		goto out;
+	dprintk("RPC:       %s client %p...\n", __func__, xprt);
+	dprintk("RPC:       state %x conn %d dead %d zapped %d sk_shutdown %d\n",
+			sk->sk_state, xprt_connected(xprt),
+			sock_flag(sk, SOCK_DEAD),
+			sock_flag(sk, SOCK_ZAPPED),
+			sk->sk_shutdown);
+
+	trace_rpc_socket_state_change(xprt, sk->sk_socket);
+
+	switch (sk->sk_state) {
+	case SS_CONNECTING:
+		/* Do nothing */
+		break;
+
+	case SS_CONNECTED:
+		spin_lock(&xprt->transport_lock);
+		if (!xprt_test_and_set_connected(xprt)) {
+			xs_stream_reset_state(xprt, vsock_read_sock);
+			xprt->connect_cookie++;
+
+			xprt_wake_pending_tasks(xprt, -EAGAIN);
+		}
+		spin_unlock(&xprt->transport_lock);
+		break;
+
+	case SS_DISCONNECTING:
+		/* TODO do we need to distinguish between various shutdown (client-side/server-side)? */
+		/* The client initiated a shutdown of the socket */
+		xprt->connect_cookie++;
+		xprt->reestablish_timeout = 0;
+		set_bit(XPRT_CLOSING, &xprt->state);
+		smp_mb__before_atomic();
+		clear_bit(XPRT_CONNECTED, &xprt->state);
+		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
+		smp_mb__after_atomic();
+		break;
+
+	case SS_UNCONNECTED:
+		xs_sock_mark_closed(xprt);
+		break;
+	}
+
+ out:
+	read_unlock_bh(&sk->sk_callback_lock);
+}
+
+/**
+ * xs_vsock_error_report - callback to handle vsock socket state errors
+ * @sk: socket
+ *
+ * Note: we don't call sock_error() since there may be a rpc_task
+ * using the socket, and so we don't want to clear sk->sk_err.
+ */
+static void xs_vsock_error_report(struct sock *sk)
+{
+	struct rpc_xprt *xprt;
+	int err;
+
+	read_lock_bh(&sk->sk_callback_lock);
+	if (!(xprt = xprt_from_sock(sk)))
+		goto out;
+
+	err = -sk->sk_err;
+	if (err == 0)
+		goto out;
+	/* Is this a reset event? */
+	if (sk->sk_state == SS_UNCONNECTED)
+		xs_sock_mark_closed(xprt);
+	dprintk("RPC:       %s client %p, error=%d...\n",
+			__func__, xprt, -err);
+	trace_rpc_socket_error(xprt, sk->sk_socket, err);
+	xprt_wake_pending_tasks(xprt, err);
+ out:
+	read_unlock_bh(&sk->sk_callback_lock);
+}
+
+/**
+ * xs_vsock_finish_connecting - initialize and connect socket
+ */
+static int xs_vsock_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
+{
+	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+	int ret = -ENOTCONN;
+
+	if (!transport->inet) {
+		struct sock *sk = sock->sk;
+
+		write_lock_bh(&sk->sk_callback_lock);
+
+		xs_save_old_callbacks(transport, sk);
+
+		sk->sk_user_data = xprt;
+		sk->sk_data_ready = xs_data_ready;
+		sk->sk_state_change = xs_vsock_state_change;
+		sk->sk_write_space = xs_tcp_write_space;
+		sk->sk_error_report = xs_vsock_error_report;
+		sk->sk_allocation = GFP_ATOMIC;
+
+		xprt_clear_connected(xprt);
+
+		/* Reset to new socket */
+		transport->sock = sock;
+		transport->inet = sk;
+
+		write_unlock_bh(&sk->sk_callback_lock);
+	}
+
+	if (!xprt_bound(xprt))
+		goto out;
+
+	xs_set_memalloc(xprt);
+
+	/* Tell the socket layer to start connecting... */
+	xprt->stat.connect_count++;
+	xprt->stat.connect_start = jiffies;
+	ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
+	switch (ret) {
+	case 0:
+		xs_set_srcport(transport, sock);
+	case -EINPROGRESS:
+		/* SYN_SENT! */
+		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
+			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
+	}
+out:
+	return ret;
+}
+
+/**
+ * xs_vsock_setup_socket - create a vsock socket and connect to a remote endpoint
+ *
+ * Invoked by a work queue tasklet.
+ */
+static void xs_vsock_setup_socket(struct work_struct *work)
+{
+	struct sock_xprt *transport =
+		container_of(work, struct sock_xprt, connect_worker.work);
+	struct socket *sock = transport->sock;
+	struct rpc_xprt *xprt = &transport->xprt;
+	int status = -EIO;
+
+	if (!sock) {
+		sock = xs_create_sock(xprt, transport,
+				xs_addr(xprt)->sa_family, SOCK_STREAM,
+				0, true);
+		if (IS_ERR(sock)) {
+			status = PTR_ERR(sock);
+			goto out;
+		}
+	}
+
+	dprintk("RPC:       worker connecting xprt %p via %s to "
+				"%s (port %s)\n", xprt,
+			xprt->address_strings[RPC_DISPLAY_PROTO],
+			xprt->address_strings[RPC_DISPLAY_ADDR],
+			xprt->address_strings[RPC_DISPLAY_PORT]);
+
+	status = xs_vsock_finish_connecting(xprt, sock);
+	trace_rpc_socket_connect(xprt, sock, status);
+	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
+			xprt, -status, xprt_connected(xprt),
+			sock->sk->sk_state);
+	switch (status) {
+	default:
+		printk("%s: connect returned unhandled error %d\n",
+			__func__, status);
+	case -EADDRNOTAVAIL:
+		/* We're probably in TIME_WAIT. Get rid of existing socket,
+		 * and retry
+		 */
+		xs_tcp_force_close(xprt);
+		break;
+	case 0:
+	case -EINPROGRESS:
+	case -EALREADY:
+		xprt_unlock_connect(xprt, transport);
+		xprt_clear_connecting(xprt);
+		return;
+	case -EINVAL:
+		/* Happens, for instance, if the user specified a link
+		 * local IPv6 address without a scope-id.
+		 */
+	case -ECONNREFUSED:
+	case -ECONNRESET:
+	case -ENETUNREACH:
+	case -EADDRINUSE:
+	case -ENOBUFS:
+		/* retry with existing socket, after a delay */
+		xs_tcp_force_close(xprt);
+		goto out;
+	}
+	status = -EAGAIN;
+out:
+	xprt_unlock_connect(xprt, transport);
+	xprt_clear_connecting(xprt);
+	xprt_wake_pending_tasks(xprt, status);
+}
+
+/**
+ * xs_vsock_print_stats - display vsock socket-specific stats
+ * @xprt: rpc_xprt struct containing statistics
+ * @seq: output file
+ *
+ */
+static void xs_vsock_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
+{
+	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+	long idle_time = 0;
+
+	if (xprt_connected(xprt))
+		idle_time = (long)(jiffies - xprt->last_used) / HZ;
+
+	seq_printf(seq, "\txprt:\tvsock %u %lu %lu %lu %ld %lu %lu %lu "
+			"%llu %llu %lu %llu %llu\n",
+			transport->srcport,
+			xprt->stat.bind_count,
+			xprt->stat.connect_count,
+			xprt->stat.connect_time,
+			idle_time,
+			xprt->stat.sends,
+			xprt->stat.recvs,
+			xprt->stat.bad_xids,
+			xprt->stat.req_u,
+			xprt->stat.bklog_u,
+			xprt->stat.max_slots,
+			xprt->stat.sending_u,
+			xprt->stat.pending_u);
+}
+
+static struct rpc_xprt_ops xs_vsock_ops = {
+	.reserve_xprt		= xprt_reserve_xprt,
+	.release_xprt		= xs_tcp_release_xprt,
+	.alloc_slot		= xprt_lock_and_alloc_slot,
+	.rpcbind		= xs_dummy_rpcbind,
+	.set_port		= xs_set_port,
+	.connect		= xs_connect,
+	.buf_alloc		= rpc_malloc,
+	.buf_free		= rpc_free,
+	.send_request		= xs_tcp_send_request,
+	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
+	.close			= xs_tcp_shutdown,
+	.destroy		= xs_destroy,
+	.print_stats		= xs_vsock_print_stats,
+};
+
+static const struct rpc_timeout xs_vsock_default_timeout = {
+	.to_initval = 60 * HZ,
+	.to_maxval = 60 * HZ,
+	.to_retries = 2,
+};
+
+/**
+ * xs_setup_vsock - Set up transport to use a vsock socket
+ * @args: rpc transport creation arguments
+ *
+ */
+static struct rpc_xprt *xs_setup_vsock(struct xprt_create *args)
+{
+	struct sockaddr_vm *addr = (struct sockaddr_vm *)args->dstaddr;
+	struct sock_xprt *transport;
+	struct rpc_xprt *xprt;
+	struct rpc_xprt *ret;
+
+	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
+			     xprt_max_tcp_slot_table_entries);
+	if (IS_ERR(xprt))
+		return xprt;
+	transport = container_of(xprt, struct sock_xprt, xprt);
+
+	xprt->prot = 0;
+	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
+	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
+
+	xprt->bind_timeout = XS_BIND_TO;
+	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
+	xprt->idle_timeout = XS_IDLE_DISC_TO;
+
+	xprt->ops = &xs_vsock_ops;
+	xprt->timeout = &xs_vsock_default_timeout;
+
+	INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
+	INIT_DELAYED_WORK(&transport->connect_worker, xs_vsock_setup_socket);
+
+	switch (addr->svm_family) {
+	case AF_VSOCK:
+		if (addr->svm_port == 0) {
+			dprintk("RPC:       autobind not supported with AF_VSOCK\n");
+			ret = ERR_PTR(-EINVAL);
+			goto out_err;
+		}
+		xprt_set_bound(xprt);
+		xs_format_peer_addresses(xprt, "vsock", "vsock" /* TODO register official netid? */);
+		break;
+	default:
+		ret = ERR_PTR(-EAFNOSUPPORT);
+		goto out_err;
+	}
+
+	dprintk("RPC:       set up xprt to %s (port %s) via AF_VSOCK\n",
+		xprt->address_strings[RPC_DISPLAY_ADDR],
+		xprt->address_strings[RPC_DISPLAY_PORT]);
+
+	if (try_module_get(THIS_MODULE))
+		return xprt;
+	ret = ERR_PTR(-EINVAL);
+out_err:
+	xs_xprt_free(xprt);
+	return ret;
+}
+#endif
+
 static struct xprt_class	xs_local_transport = {
 	.list		= LIST_HEAD_INIT(xs_local_transport.list),
 	.name		= "named UNIX socket",
@@ -3235,6 +3594,16 @@  static struct xprt_class	xs_bc_tcp_transport = {
 	.setup		= xs_setup_bc_tcp,
 };
 
+#ifdef CONFIG_SUNRPC_XPRT_VSOCK
+static struct xprt_class	xs_vsock_transport = {
+	.list		= LIST_HEAD_INIT(xs_vsock_transport.list),
+	.name		= "vsock",
+	.owner		= THIS_MODULE,
+	.ident		= XPRT_TRANSPORT_VSOCK,
+	.setup		= xs_setup_vsock,
+};
+#endif
+
 /**
  * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
  *
@@ -3250,6 +3619,9 @@  int init_socket_xprt(void)
 	xprt_register_transport(&xs_udp_transport);
 	xprt_register_transport(&xs_tcp_transport);
 	xprt_register_transport(&xs_bc_tcp_transport);
+#ifdef CONFIG_SUNRPC_XPRT_VSOCK
+	xprt_register_transport(&xs_vsock_transport);
+#endif
 
 	return 0;
 }
@@ -3271,6 +3643,9 @@  void cleanup_socket_xprt(void)
 	xprt_unregister_transport(&xs_udp_transport);
 	xprt_unregister_transport(&xs_tcp_transport);
 	xprt_unregister_transport(&xs_bc_tcp_transport);
+#ifdef CONFIG_SUNRPC_XPRT_VSOCK
+	xprt_unregister_transport(&xs_vsock_transport);
+#endif
 }
 
 static int param_set_uint_minmax(const char *val,