diff mbox series

[RFC,v5,02/19] af_vsock: separate wait data loop

Message ID 20210218053637.1066959-1-arseny.krasnov@kaspersky.com (mailing list archive)
State RFC
Delegated to: Netdev Maintainers
Headers show
Series virtio/vsock: introduce SOCK_SEQPACKET support | expand

Checks

Context Check Description
netdev/cover_letter success Link
netdev/fixes_present success Link
netdev/patch_count fail Series longer than 15 patches
netdev/tree_selection success Guessed tree name to be net-next
netdev/subject_prefix success Link
netdev/cc_maintainers warning 1 maintainers not CCed: alex.popov@linux.com
netdev/source_inline success Was 0 now: 0
netdev/verify_signedoff success Link
netdev/module_param success Was 0 now: 0
netdev/build_32bit success Errors and warnings before: 0 this patch: 0
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/verify_fixes success Link
netdev/checkpatch warning WARNING: line length of 81 exceeds 80 columns WARNING: line length of 87 exceeds 80 columns
netdev/build_allmodconfig_warn success Errors and warnings before: 0 this patch: 0
netdev/header_inline success Link
netdev/stable success Stable not CCed

Commit Message

Arseny Krasnov Feb. 18, 2021, 5:36 a.m. UTC
This moves wait loop for data to dedicated function, because later
it will be used by SEQPACKET data receive loop.

Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
---
 net/vmw_vsock/af_vsock.c | 155 +++++++++++++++++++++------------------
 1 file changed, 83 insertions(+), 72 deletions(-)

Comments

Stefano Garzarella Feb. 22, 2021, 11:29 a.m. UTC | #1
On Thu, Feb 18, 2021 at 08:36:33AM +0300, Arseny Krasnov wrote:
>This moves wait loop for data to dedicated function, because later
>it will be used by SEQPACKET data receive loop.

The patch LGTM, maybe just add a line in the commit message with 
something like this:

     While moving the code around, let's update an old comment.

Whit that fixed:

Reviewed-by: Stefano Garzarella <sgarzare@redhat.com>

>
>Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>---
> net/vmw_vsock/af_vsock.c | 155 +++++++++++++++++++++------------------
> 1 file changed, 83 insertions(+), 72 deletions(-)
>
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index 656370e11707..6cf7bb977aa1 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -1832,6 +1832,68 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> 	return err;
> }
>
>+static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
>+			   long timeout,
>+			   struct vsock_transport_recv_notify_data *recv_data,
>+			   size_t target)
>+{
>+	const struct vsock_transport *transport;
>+	struct vsock_sock *vsk;
>+	s64 data;
>+	int err;
>+
>+	vsk = vsock_sk(sk);
>+	err = 0;
>+	transport = vsk->transport;
>+	prepare_to_wait(sk_sleep(sk), wait, TASK_INTERRUPTIBLE);
>+
>+	while ((data = vsock_stream_has_data(vsk)) == 0) {
>+		if (sk->sk_err != 0 ||
>+		    (sk->sk_shutdown & RCV_SHUTDOWN) ||
>+		    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
>+			break;
>+		}
>+
>+		/* Don't wait for non-blocking sockets. */
>+		if (timeout == 0) {
>+			err = -EAGAIN;
>+			break;
>+		}
>+
>+		if (recv_data) {
>+			err = transport->notify_recv_pre_block(vsk, target, recv_data);
>+			if (err < 0)
>+				break;
>+		}
>+
>+		release_sock(sk);
>+		timeout = schedule_timeout(timeout);
>+		lock_sock(sk);
>+
>+		if (signal_pending(current)) {
>+			err = sock_intr_errno(timeout);
>+			break;
>+		} else if (timeout == 0) {
>+			err = -EAGAIN;
>+			break;
>+		}
>+	}
>+
>+	finish_wait(sk_sleep(sk), wait);
>+
>+	if (err)
>+		return err;
>+
>+	/* Internal transport error when checking for available
>+	 * data. XXX This should be changed to a connection
>+	 * reset in a later change.
>+	 */
>+	if (data < 0)
>+		return -ENOMEM;
>+
>+	return data;
>+}
>+
> static int
> vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> 			  int flags)
>@@ -1911,85 +1973,34 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>
>
> 	while (1) {
>-		s64 ready;
>-
>-		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
>-		ready = vsock_stream_has_data(vsk);
>-
>-		if (ready == 0) {
>-			if (sk->sk_err != 0 ||
>-			    (sk->sk_shutdown & RCV_SHUTDOWN) ||
>-			    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
>-				finish_wait(sk_sleep(sk), &wait);
>-				break;
>-			}
>-			/* Don't wait for non-blocking sockets. */
>-			if (timeout == 0) {
>-				err = -EAGAIN;
>-				finish_wait(sk_sleep(sk), &wait);
>-				break;
>-			}
>+		ssize_t read;
>
>-			err = transport->notify_recv_pre_block(
>-					vsk, target, &recv_data);
>-			if (err < 0) {
>-				finish_wait(sk_sleep(sk), &wait);
>-				break;
>-			}
>-			release_sock(sk);
>-			timeout = schedule_timeout(timeout);
>-			lock_sock(sk);
>+		err = vsock_wait_data(sk, &wait, timeout, &recv_data, target);
>+		if (err <= 0)
>+			break;
>
>-			if (signal_pending(current)) {
>-				err = sock_intr_errno(timeout);
>-				finish_wait(sk_sleep(sk), &wait);
>-				break;
>-			} else if (timeout == 0) {
>-				err = -EAGAIN;
>-				finish_wait(sk_sleep(sk), &wait);
>-				break;
>-			}
>-		} else {
>-			ssize_t read;
>-
>-			finish_wait(sk_sleep(sk), &wait);
>-
>-			if (ready < 0) {
>-				/* Invalid queue pair content. XXX This should
>-				* be changed to a connection reset in a later
>-				* change.
>-				*/
>-
>-				err = -ENOMEM;
>-				goto out;
>-			}
>-
>-			err = transport->notify_recv_pre_dequeue(
>-					vsk, target, &recv_data);
>-			if (err < 0)
>-				break;
>+		err = transport->notify_recv_pre_dequeue(vsk, target,
>+							 &recv_data);
>+		if (err < 0)
>+			break;
>
>-			read = transport->stream_dequeue(
>-					vsk, msg,
>-					len - copied, flags);
>-			if (read < 0) {
>-				err = -ENOMEM;
>-				break;
>-			}
>+		read = transport->stream_dequeue(vsk, msg, len - copied, flags);
>+		if (read < 0) {
>+			err = -ENOMEM;
>+			break;
>+		}
>
>-			copied += read;
>+		copied += read;
>
>-			err = transport->notify_recv_post_dequeue(
>-					vsk, target, read,
>-					!(flags & MSG_PEEK), &recv_data);
>-			if (err < 0)
>-				goto out;
>+		err = transport->notify_recv_post_dequeue(vsk, target, read,
>+						!(flags & MSG_PEEK), &recv_data);
>+		if (err < 0)
>+			goto out;
>
>-			if (read >= target || flags & MSG_PEEK)
>-				break;
>+		if (read >= target || flags & MSG_PEEK)
>+			break;
>
>-			target -= read;
>-		}
>+		target -= read;
> 	}
>
> 	if (sk->sk_err)
>-- 
>2.25.1
>
Jorgen Hansen Feb. 25, 2021, 2:24 p.m. UTC | #2
> On 18 Feb 2021, at 06:36, Arseny Krasnov <arseny.krasnov@kaspersky.com> wrote:
> 
> This moves wait loop for data to dedicated function, because later
> it will be used by SEQPACKET data receive loop.
> 
> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
> ---
> net/vmw_vsock/af_vsock.c | 155 +++++++++++++++++++++------------------
> 1 file changed, 83 insertions(+), 72 deletions(-)
> 
> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
> index 656370e11707..6cf7bb977aa1 100644
> --- a/net/vmw_vsock/af_vsock.c
> +++ b/net/vmw_vsock/af_vsock.c
> @@ -1832,6 +1832,68 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> 	return err;
> }
> 
> +static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
> +			   long timeout,
> +			   struct vsock_transport_recv_notify_data *recv_data,
> +			   size_t target)
> +{
> +	const struct vsock_transport *transport;
> +	struct vsock_sock *vsk;
> +	s64 data;
> +	int err;
> +
> +	vsk = vsock_sk(sk);
> +	err = 0;
> +	transport = vsk->transport;
> +	prepare_to_wait(sk_sleep(sk), wait, TASK_INTERRUPTIBLE);
> +
> +	while ((data = vsock_stream_has_data(vsk)) == 0) {

In the original code, the prepare_to_wait() is called for each iteration of the while loop. In this
version, it is only called once. So if we do multiple iterations, the thread would be in the
TASK_RUNNING state, and subsequent schedule_timeout() will return immediately. So
looks like the prepare_to_wait() should be move here, in case we have a spurious wake_up.

> +		if (sk->sk_err != 0 ||
> +		    (sk->sk_shutdown & RCV_SHUTDOWN) ||
> +		    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
> +			break;
> +		}
> +
> +		/* Don't wait for non-blocking sockets. */
> +		if (timeout == 0) {
> +			err = -EAGAIN;
> +			break;
> +		}
> +
> +		if (recv_data) {
> +			err = transport->notify_recv_pre_block(vsk, target, recv_data);
> +			if (err < 0)
> +				break;
> +		}
> +
> +		release_sock(sk);
> +		timeout = schedule_timeout(timeout);
> +		lock_sock(sk);
> +
> +		if (signal_pending(current)) {
> +			err = sock_intr_errno(timeout);
> +			break;
> +		} else if (timeout == 0) {
> +			err = -EAGAIN;
> +			break;
> +		}
> +	}
> +
> +	finish_wait(sk_sleep(sk), wait);
> +
> +	if (err)
> +		return err;
> +
> +	/* Internal transport error when checking for available
> +	 * data. XXX This should be changed to a connection
> +	 * reset in a later change.
> +	 */
> +	if (data < 0)
> +		return -ENOMEM;
> +
> +	return data;
> +}
> +
> static int
> vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> 			  int flags)
> @@ -1911,85 +1973,34 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> 
> 
> 	while (1) {
> -		s64 ready;
> -
> -		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
> -		ready = vsock_stream_has_data(vsk);
> -
> -		if (ready == 0) {
> -			if (sk->sk_err != 0 ||
> -			    (sk->sk_shutdown & RCV_SHUTDOWN) ||
> -			    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
> -				finish_wait(sk_sleep(sk), &wait);
> -				break;
> -			}
> -			/* Don't wait for non-blocking sockets. */
> -			if (timeout == 0) {
> -				err = -EAGAIN;
> -				finish_wait(sk_sleep(sk), &wait);
> -				break;
> -			}
> +		ssize_t read;
> 
> -			err = transport->notify_recv_pre_block(
> -					vsk, target, &recv_data);
> -			if (err < 0) {
> -				finish_wait(sk_sleep(sk), &wait);
> -				break;
> -			}
> -			release_sock(sk);
> -			timeout = schedule_timeout(timeout);
> -			lock_sock(sk);
> +		err = vsock_wait_data(sk, &wait, timeout, &recv_data, target);
> +		if (err <= 0)
> +			break;
> 
> -			if (signal_pending(current)) {
> -				err = sock_intr_errno(timeout);
> -				finish_wait(sk_sleep(sk), &wait);
> -				break;
> -			} else if (timeout == 0) {
> -				err = -EAGAIN;
> -				finish_wait(sk_sleep(sk), &wait);
> -				break;
> -			}
> -		} else {
> -			ssize_t read;
> -
> -			finish_wait(sk_sleep(sk), &wait);
> -
> -			if (ready < 0) {
> -				/* Invalid queue pair content. XXX This should
> -				* be changed to a connection reset in a later
> -				* change.
> -				*/
> -
> -				err = -ENOMEM;
> -				goto out;
> -			}
> -
> -			err = transport->notify_recv_pre_dequeue(
> -					vsk, target, &recv_data);
> -			if (err < 0)
> -				break;
> +		err = transport->notify_recv_pre_dequeue(vsk, target,
> +							 &recv_data);
> +		if (err < 0)
> +			break;
> 
> -			read = transport->stream_dequeue(
> -					vsk, msg,
> -					len - copied, flags);
> -			if (read < 0) {
> -				err = -ENOMEM;
> -				break;
> -			}
> +		read = transport->stream_dequeue(vsk, msg, len - copied, flags);
> +		if (read < 0) {
> +			err = -ENOMEM;
> +			break;
> +		}
> 
> -			copied += read;
> +		copied += read;
> 
> -			err = transport->notify_recv_post_dequeue(
> -					vsk, target, read,
> -					!(flags & MSG_PEEK), &recv_data);
> -			if (err < 0)
> -				goto out;
> +		err = transport->notify_recv_post_dequeue(vsk, target, read,
> +						!(flags & MSG_PEEK), &recv_data);
> +		if (err < 0)
> +			goto out;
> 
> -			if (read >= target || flags & MSG_PEEK)
> -				break;
> +		if (read >= target || flags & MSG_PEEK)
> +			break;
> 
> -			target -= read;
> -		}
> +		target -= read;
> 	}
> 
> 	if (sk->sk_err)
> -- 
> 2.25.1
>
Arseny Krasnov Feb. 25, 2021, 5:01 p.m. UTC | #3
On 25.02.2021 17:24, Jorgen Hansen wrote:
>> On 18 Feb 2021, at 06:36, Arseny Krasnov <arseny.krasnov@kaspersky.com> wrote:
>>
>> This moves wait loop for data to dedicated function, because later
>> it will be used by SEQPACKET data receive loop.
>>
>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>> ---
>> net/vmw_vsock/af_vsock.c | 155 +++++++++++++++++++++------------------
>> 1 file changed, 83 insertions(+), 72 deletions(-)
>>
>> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>> index 656370e11707..6cf7bb977aa1 100644
>> --- a/net/vmw_vsock/af_vsock.c
>> +++ b/net/vmw_vsock/af_vsock.c
>> @@ -1832,6 +1832,68 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
>> 	return err;
>> }
>>
>> +static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
>> +			   long timeout,
>> +			   struct vsock_transport_recv_notify_data *recv_data,
>> +			   size_t target)
>> +{
>> +	const struct vsock_transport *transport;
>> +	struct vsock_sock *vsk;
>> +	s64 data;
>> +	int err;
>> +
>> +	vsk = vsock_sk(sk);
>> +	err = 0;
>> +	transport = vsk->transport;
>> +	prepare_to_wait(sk_sleep(sk), wait, TASK_INTERRUPTIBLE);
>> +
>> +	while ((data = vsock_stream_has_data(vsk)) == 0) {
> In the original code, the prepare_to_wait() is called for each iteration of the while loop. In this
> version, it is only called once. So if we do multiple iterations, the thread would be in the
> TASK_RUNNING state, and subsequent schedule_timeout() will return immediately. So
> looks like the prepare_to_wait() should be move here, in case we have a spurious wake_up.
Thank you, i'll fix it
>
>> +		if (sk->sk_err != 0 ||
>> +		    (sk->sk_shutdown & RCV_SHUTDOWN) ||
>> +		    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
>> +			break;
>> +		}
>> +
>> +		/* Don't wait for non-blocking sockets. */
>> +		if (timeout == 0) {
>> +			err = -EAGAIN;
>> +			break;
>> +		}
>> +
>> +		if (recv_data) {
>> +			err = transport->notify_recv_pre_block(vsk, target, recv_data);
>> +			if (err < 0)
>> +				break;
>> +		}
>> +
>> +		release_sock(sk);
>> +		timeout = schedule_timeout(timeout);
>> +		lock_sock(sk);
>> +
>> +		if (signal_pending(current)) {
>> +			err = sock_intr_errno(timeout);
>> +			break;
>> +		} else if (timeout == 0) {
>> +			err = -EAGAIN;
>> +			break;
>> +		}
>> +	}
>> +
>> +	finish_wait(sk_sleep(sk), wait);
>> +
>> +	if (err)
>> +		return err;
>> +
>> +	/* Internal transport error when checking for available
>> +	 * data. XXX This should be changed to a connection
>> +	 * reset in a later change.
>> +	 */
>> +	if (data < 0)
>> +		return -ENOMEM;
>> +
>> +	return data;
>> +}
>> +
>> static int
>> vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>> 			  int flags)
>> @@ -1911,85 +1973,34 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>>
>>
>> 	while (1) {
>> -		s64 ready;
>> -
>> -		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
>> -		ready = vsock_stream_has_data(vsk);
>> -
>> -		if (ready == 0) {
>> -			if (sk->sk_err != 0 ||
>> -			    (sk->sk_shutdown & RCV_SHUTDOWN) ||
>> -			    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
>> -				finish_wait(sk_sleep(sk), &wait);
>> -				break;
>> -			}
>> -			/* Don't wait for non-blocking sockets. */
>> -			if (timeout == 0) {
>> -				err = -EAGAIN;
>> -				finish_wait(sk_sleep(sk), &wait);
>> -				break;
>> -			}
>> +		ssize_t read;
>>
>> -			err = transport->notify_recv_pre_block(
>> -					vsk, target, &recv_data);
>> -			if (err < 0) {
>> -				finish_wait(sk_sleep(sk), &wait);
>> -				break;
>> -			}
>> -			release_sock(sk);
>> -			timeout = schedule_timeout(timeout);
>> -			lock_sock(sk);
>> +		err = vsock_wait_data(sk, &wait, timeout, &recv_data, target);
>> +		if (err <= 0)
>> +			break;
>>
>> -			if (signal_pending(current)) {
>> -				err = sock_intr_errno(timeout);
>> -				finish_wait(sk_sleep(sk), &wait);
>> -				break;
>> -			} else if (timeout == 0) {
>> -				err = -EAGAIN;
>> -				finish_wait(sk_sleep(sk), &wait);
>> -				break;
>> -			}
>> -		} else {
>> -			ssize_t read;
>> -
>> -			finish_wait(sk_sleep(sk), &wait);
>> -
>> -			if (ready < 0) {
>> -				/* Invalid queue pair content. XXX This should
>> -				* be changed to a connection reset in a later
>> -				* change.
>> -				*/
>> -
>> -				err = -ENOMEM;
>> -				goto out;
>> -			}
>> -
>> -			err = transport->notify_recv_pre_dequeue(
>> -					vsk, target, &recv_data);
>> -			if (err < 0)
>> -				break;
>> +		err = transport->notify_recv_pre_dequeue(vsk, target,
>> +							 &recv_data);
>> +		if (err < 0)
>> +			break;
>>
>> -			read = transport->stream_dequeue(
>> -					vsk, msg,
>> -					len - copied, flags);
>> -			if (read < 0) {
>> -				err = -ENOMEM;
>> -				break;
>> -			}
>> +		read = transport->stream_dequeue(vsk, msg, len - copied, flags);
>> +		if (read < 0) {
>> +			err = -ENOMEM;
>> +			break;
>> +		}
>>
>> -			copied += read;
>> +		copied += read;
>>
>> -			err = transport->notify_recv_post_dequeue(
>> -					vsk, target, read,
>> -					!(flags & MSG_PEEK), &recv_data);
>> -			if (err < 0)
>> -				goto out;
>> +		err = transport->notify_recv_post_dequeue(vsk, target, read,
>> +						!(flags & MSG_PEEK), &recv_data);
>> +		if (err < 0)
>> +			goto out;
>>
>> -			if (read >= target || flags & MSG_PEEK)
>> -				break;
>> +		if (read >= target || flags & MSG_PEEK)
>> +			break;
>>
>> -			target -= read;
>> -		}
>> +		target -= read;
>> 	}
>>
>> 	if (sk->sk_err)
>> -- 
>> 2.25.1
>>
>
diff mbox series

Patch

diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 656370e11707..6cf7bb977aa1 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1832,6 +1832,68 @@  static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
 	return err;
 }
 
+static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
+			   long timeout,
+			   struct vsock_transport_recv_notify_data *recv_data,
+			   size_t target)
+{
+	const struct vsock_transport *transport;
+	struct vsock_sock *vsk;
+	s64 data;
+	int err;
+
+	vsk = vsock_sk(sk);
+	err = 0;
+	transport = vsk->transport;
+	prepare_to_wait(sk_sleep(sk), wait, TASK_INTERRUPTIBLE);
+
+	while ((data = vsock_stream_has_data(vsk)) == 0) {
+		if (sk->sk_err != 0 ||
+		    (sk->sk_shutdown & RCV_SHUTDOWN) ||
+		    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
+			break;
+		}
+
+		/* Don't wait for non-blocking sockets. */
+		if (timeout == 0) {
+			err = -EAGAIN;
+			break;
+		}
+
+		if (recv_data) {
+			err = transport->notify_recv_pre_block(vsk, target, recv_data);
+			if (err < 0)
+				break;
+		}
+
+		release_sock(sk);
+		timeout = schedule_timeout(timeout);
+		lock_sock(sk);
+
+		if (signal_pending(current)) {
+			err = sock_intr_errno(timeout);
+			break;
+		} else if (timeout == 0) {
+			err = -EAGAIN;
+			break;
+		}
+	}
+
+	finish_wait(sk_sleep(sk), wait);
+
+	if (err)
+		return err;
+
+	/* Internal transport error when checking for available
+	 * data. XXX This should be changed to a connection
+	 * reset in a later change.
+	 */
+	if (data < 0)
+		return -ENOMEM;
+
+	return data;
+}
+
 static int
 vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 			  int flags)
@@ -1911,85 +1973,34 @@  vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 
 
 	while (1) {
-		s64 ready;
-
-		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
-		ready = vsock_stream_has_data(vsk);
-
-		if (ready == 0) {
-			if (sk->sk_err != 0 ||
-			    (sk->sk_shutdown & RCV_SHUTDOWN) ||
-			    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
-				finish_wait(sk_sleep(sk), &wait);
-				break;
-			}
-			/* Don't wait for non-blocking sockets. */
-			if (timeout == 0) {
-				err = -EAGAIN;
-				finish_wait(sk_sleep(sk), &wait);
-				break;
-			}
+		ssize_t read;
 
-			err = transport->notify_recv_pre_block(
-					vsk, target, &recv_data);
-			if (err < 0) {
-				finish_wait(sk_sleep(sk), &wait);
-				break;
-			}
-			release_sock(sk);
-			timeout = schedule_timeout(timeout);
-			lock_sock(sk);
+		err = vsock_wait_data(sk, &wait, timeout, &recv_data, target);
+		if (err <= 0)
+			break;
 
-			if (signal_pending(current)) {
-				err = sock_intr_errno(timeout);
-				finish_wait(sk_sleep(sk), &wait);
-				break;
-			} else if (timeout == 0) {
-				err = -EAGAIN;
-				finish_wait(sk_sleep(sk), &wait);
-				break;
-			}
-		} else {
-			ssize_t read;
-
-			finish_wait(sk_sleep(sk), &wait);
-
-			if (ready < 0) {
-				/* Invalid queue pair content. XXX This should
-				* be changed to a connection reset in a later
-				* change.
-				*/
-
-				err = -ENOMEM;
-				goto out;
-			}
-
-			err = transport->notify_recv_pre_dequeue(
-					vsk, target, &recv_data);
-			if (err < 0)
-				break;
+		err = transport->notify_recv_pre_dequeue(vsk, target,
+							 &recv_data);
+		if (err < 0)
+			break;
 
-			read = transport->stream_dequeue(
-					vsk, msg,
-					len - copied, flags);
-			if (read < 0) {
-				err = -ENOMEM;
-				break;
-			}
+		read = transport->stream_dequeue(vsk, msg, len - copied, flags);
+		if (read < 0) {
+			err = -ENOMEM;
+			break;
+		}
 
-			copied += read;
+		copied += read;
 
-			err = transport->notify_recv_post_dequeue(
-					vsk, target, read,
-					!(flags & MSG_PEEK), &recv_data);
-			if (err < 0)
-				goto out;
+		err = transport->notify_recv_post_dequeue(vsk, target, read,
+						!(flags & MSG_PEEK), &recv_data);
+		if (err < 0)
+			goto out;
 
-			if (read >= target || flags & MSG_PEEK)
-				break;
+		if (read >= target || flags & MSG_PEEK)
+			break;
 
-			target -= read;
-		}
+		target -= read;
 	}
 
 	if (sk->sk_err)