diff mbox series

[RFC,7/8] rxrpc: Implement splice-read for rxrpc calls

Message ID 165599097976.1827880.6339180782812070930.stgit@warthog.procyon.org.uk (mailing list archive)
State RFC
Delegated to: Netdev Maintainers
Headers show
Series rxrpc: Multiqueue, sendfile, splice and call security | expand

Checks

Context Check Description
netdev/tree_selection success Clearly marked for net, async
netdev/fixes_present fail Series targets non-next tree, but doesn't contain any Fixes tags
netdev/subject_prefix success Link
netdev/cover_letter success Series has a cover letter
netdev/patch_count success Link
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 8 this patch: 8
netdev/cc_maintainers warning 5 maintainers not CCed: rostedt@goodmis.org mingo@redhat.com pabeni@redhat.com kuba@kernel.org edumazet@google.com
netdev/build_clang success Errors and warnings before: 11 this patch: 11
netdev/module_param success Was 0 now: 0
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 8 this patch: 8
netdev/checkpatch warning WARNING: function definition argument 'loff_t *' should also have an identifier name WARNING: function definition argument 'size_t' should also have an identifier name WARNING: function definition argument 'struct pipe_inode_info *' should also have an identifier name WARNING: function definition argument 'struct socket *' should also have an identifier name WARNING: function definition argument 'unsigned int' should also have an identifier name WARNING: labels should not be indented WARNING: line length of 81 exceeds 80 columns WARNING: line length of 83 exceeds 80 columns WARNING: line length of 84 exceeds 80 columns WARNING: line length of 85 exceeds 80 columns WARNING: line length of 86 exceeds 80 columns WARNING: line length of 88 exceeds 80 columns WARNING: line length of 90 exceeds 80 columns WARNING: line length of 93 exceeds 80 columns WARNING: line length of 97 exceeds 80 columns WARNING: networking block comments don't use an empty /* line, use /* Comment...
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0

Commit Message

David Howells June 23, 2022, 1:29 p.m. UTC
Implement the splice_read protocol operation for AF_RXRPC sockets.  This
allows the received data from a call to be spliced elsewhere.  The call
to be read from must be prespecified with setsockopt():

	setsockopt(client, SOL_RXRPC, RXRPC_SELECT_CALL_FOR_RECV,
		   &call_id, sizeof(call_id));

	while (count < datasize) {
		ret = splice(client, NULL, pipefd[1], NULL,
			     datasize - count, 0);
		OSERROR(ret, "splice");
		count += ret;
	}

The splice keeps going until the call ends or the output pipe is full (in
which case it returns a short read or EWOULDBLOCK).

The prespecified call ID can be cleared with:

	call_id = 0;
	ret = setsockopt(client, SOL_RXRPC, RXRPC_SELECT_CALL_FOR_RECV,
			 &call_id, sizeof(call_id));

or changed.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 include/trace/events/rxrpc.h |    3 +
 net/rxrpc/af_rxrpc.c         |    1 
 net/rxrpc/ar-internal.h      |    2 
 net/rxrpc/recvmsg.c          |  204 ++++++++++++++++++++++++++++++++++++++++--
 4 files changed, 200 insertions(+), 10 deletions(-)
diff mbox series

Patch

diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index b9b0b694b223..212e2b01fdd4 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -116,6 +116,9 @@ 
 	EM(rxrpc_recvmsg_full,			"FULL") \
 	EM(rxrpc_recvmsg_hole,			"HOLE") \
 	EM(rxrpc_recvmsg_next,			"NEXT") \
+	EM(rxrpc_recvmsg_splice,		"SPLC") \
+	EM(rxrpc_recvmsg_splice_full,		"SPFU") \
+	EM(rxrpc_recvmsg_splice_skb,		"SPSK") \
 	EM(rxrpc_recvmsg_requeue,		"REQU") \
 	EM(rxrpc_recvmsg_return,		"RETN") \
 	EM(rxrpc_recvmsg_terminal,		"TERM") \
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index 41420c456e77..bf2bb1b99890 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -1319,6 +1319,7 @@  static const struct proto_ops rxrpc_rpc_ops = {
 	.sendmsg	= rxrpc_sendmsg,
 	.sendpage	= rxrpc_sendpage,
 	.recvmsg	= rxrpc_recvmsg,
+	.splice_read	= rxrpc_splice_read,
 	.mmap		= sock_no_mmap,
 };
 
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index bec398c66341..526169effe89 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -1056,6 +1056,8 @@  bool rxrpc_call_completed(struct rxrpc_call *);
 bool __rxrpc_abort_call(const char *, struct rxrpc_call *, rxrpc_seq_t, u32, int);
 bool rxrpc_abort_call(const char *, struct rxrpc_call *, rxrpc_seq_t, u32, int);
 int rxrpc_recvmsg(struct socket *, struct msghdr *, size_t, int);
+ssize_t rxrpc_splice_read(struct socket *, loff_t *, struct pipe_inode_info *,
+			  size_t, unsigned int);
 
 /*
  * Abort a call due to a protocol error.
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index 3fc6bf8b1ff2..3bbee5ae4c75 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -375,6 +375,8 @@  static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
  */
 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
 			      struct msghdr *msg, struct iov_iter *iter,
+			      struct pipe_inode_info *pipe,
+			      unsigned int splice_flags,
 			      size_t len, int flags, size_t *_offset)
 {
 	struct rxrpc_skb_priv *sp;
@@ -444,17 +446,41 @@  static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
 					    rx_pkt_offset, rx_pkt_len, 0);
 		}
 
+	try_another_transfer:
 		/* We have to handle short, empty and used-up DATA packets. */
 		remain = len - *_offset;
 		copy = rx_pkt_len;
 		if (copy > remain)
 			copy = remain;
 		if (copy > 0) {
-			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
-						      copy);
-			if (ret2 < 0) {
-				ret = ret2;
-				goto out;
+			if (!pipe) {
+				ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset,
+							      iter, copy);
+				if (ret2 < 0) {
+					ret = ret2;
+					goto out;
+				}
+			} else {
+				if (!(sp->hdr.flags & RXRPC_LAST_PACKET))
+					splice_flags |= SPLICE_F_MORE;
+				else if (copy < rx_pkt_len)
+					splice_flags |= SPLICE_F_MORE;
+				else
+					splice_flags &= ~SPLICE_F_MORE;
+
+				ret2 = skb_splice_bits(skb, sock->sk, rx_pkt_offset,
+						       pipe, copy, splice_flags);
+				if (ret2 < 0) {
+					trace_rxrpc_recvmsg(call, rxrpc_recvmsg_splice_full, seq,
+							    rx_pkt_offset, rx_pkt_len, ret2);
+					if (ret2 == -EAGAIN)
+						ret2 = -EXFULL;
+					ret = ret2;
+					goto out;
+				}
+				trace_rxrpc_recvmsg(call, rxrpc_recvmsg_splice_skb, seq,
+						    rx_pkt_offset, rx_pkt_len, ret2);
+				copy = ret2;
 			}
 
 			/* handle piecemeal consumption of data packets */
@@ -463,14 +489,16 @@  static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
 			*_offset += copy;
 		}
 
-		if (rx_pkt_len > 0) {
+		if (*_offset >= len) {
 			trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq,
 					    rx_pkt_offset, rx_pkt_len, 0);
-			ASSERTCMP(*_offset, ==, len);
 			ret = 0;
 			break;
 		}
 
+		if (rx_pkt_len > 0)
+			goto try_another_transfer;
+
 		/* The whole packet has been transferred. */
 		if (!(flags & MSG_PEEK))
 			rxrpc_rotate_rx_window(call);
@@ -679,8 +707,8 @@  int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 	case RXRPC_CALL_CLIENT_RECV_REPLY:
 	case RXRPC_CALL_SERVER_RECV_REQUEST:
 	case RXRPC_CALL_SERVER_ACK_REQUEST:
-		ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
-					 flags, &copied);
+		ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter,
+					 NULL, 0, len, flags, &copied);
 		if (ret == -EAGAIN)
 			ret = 0;
 
@@ -731,6 +759,7 @@  int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 		list_add(&call->recvmsg_link, &rx->recvmsg_q);
 		write_unlock_bh(&rx->recvmsg_lock);
 		trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
+		rx->sk.sk_data_ready(&rx->sk);
 	} else {
 		rxrpc_put_call(call, rxrpc_call_put);
 	}
@@ -748,6 +777,161 @@  int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 	goto error_trace;
 }
 
+/*
+ * Read data from the call specified by setsockopt(RXRPC_SELECT_CALL_FOR_RECV)
+ * and splice it into a pipe.
+ */
+ssize_t rxrpc_splice_read(struct socket *sock, loff_t *ppos,
+			  struct pipe_inode_info *pipe, size_t len,
+			  unsigned int splice_flags)
+{
+	struct rxrpc_call *call;
+	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
+	unsigned int flags = 0;
+	ssize_t ret;
+	size_t copied = 0, partial;
+	long timeo;
+
+	DEFINE_WAIT(wait);
+
+	_enter("%zu", len);
+
+	if (unlikely(!ppos))
+		return -ESPIPE;
+
+	if (splice_flags & SPLICE_F_NONBLOCK)
+		flags |= MSG_DONTWAIT;
+	timeo = sock_rcvtimeo(&rx->sk, splice_flags & SPLICE_F_NONBLOCK);
+
+	lock_sock(&rx->sk);
+	call = rx->selected_recv_call;
+	if (!call) {
+		release_sock(&rx->sk);
+		return -EBADSLT;
+	}
+
+	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_splice, 0, 0, 0, 0);
+
+	rxrpc_get_call(call, rxrpc_call_got);
+	release_sock(&rx->sk);
+
+try_again:
+	if (list_empty(&call->recvmsg_link)) {
+		if (timeo == 0) {
+			ret = -EWOULDBLOCK;
+			goto error;
+		}
+
+		/* Wait for something to happen */
+		for (;;) {
+			prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
+						  TASK_INTERRUPTIBLE);
+			ret = sock_error(&rx->sk);
+			if (ret)
+				goto wait_error;
+
+			if (!list_empty(&call->recvmsg_link))
+				break;
+
+			if (signal_pending(current))
+				goto wait_interrupted;
+			trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait, 0, timeo, 0, 0);
+			timeo = schedule_timeout(timeo);
+			ret = -ETIMEDOUT;
+			if (timeo == 0)
+				goto wait_error;
+		}
+		finish_wait(sk_sleep(&rx->sk), &wait);
+	}
+
+	write_lock_bh(&rx->recvmsg_lock);
+	if (list_empty(&call->recvmsg_link)) {
+		write_unlock_bh(&rx->recvmsg_lock);
+		goto try_again;
+	}
+	list_del_init(&call->recvmsg_link);
+	write_unlock_bh(&rx->recvmsg_lock);
+
+	rxrpc_put_call(call, rxrpc_call_put); /* Drop extra ref inherited from the list */
+
+	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);
+
+	/* We're going to drop the socket lock, so we need to lock the call
+	 * against interference by sendmsg.
+	 */
+	if (!mutex_trylock(&call->user_mutex)) {
+		ret = -EWOULDBLOCK;
+		if (splice_flags & SPLICE_F_NONBLOCK)
+			goto error_requeue_call_no_lock;
+		ret = -ERESTARTSYS;
+		if (mutex_lock_interruptible(&call->user_mutex) < 0)
+			goto error_requeue_call_no_lock;
+	}
+
+	switch (READ_ONCE(call->state)) {
+	case RXRPC_CALL_CLIENT_RECV_REPLY:
+	case RXRPC_CALL_SERVER_RECV_REQUEST:
+	case RXRPC_CALL_SERVER_ACK_REQUEST:
+		partial = 0;
+		ret = rxrpc_recvmsg_data(sock, call, NULL, NULL,
+					 pipe, splice_flags, len, flags, &partial);
+		copied += partial;
+		len -= partial;
+		if (ret == -EAGAIN) {
+			if (call->state != RXRPC_CALL_COMPLETE &&
+			    len > 0) {
+				mutex_unlock(&call->user_mutex);
+				goto try_again;
+			}
+			ret = 0;
+		}
+		if (ret == -EXFULL)
+			ret = 0;
+
+		if (after(call->rx_top, call->rx_hard_ack) &&
+		    call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
+			rxrpc_notify_socket(call);
+		break;
+	default:
+		ret = 0;
+		break;
+	}
+
+	if (ret < 0)
+		goto error_unlock_call;
+
+	if (call->state == RXRPC_CALL_COMPLETE)
+		/* recvmsg() must be called to get the termination state. */
+		goto error_requeue_call;
+
+	ret = copied;
+
+error_unlock_call:
+	mutex_unlock(&call->user_mutex);
+error:
+	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
+	rxrpc_put_call(call, rxrpc_call_put);
+	_leave(" = %zd", ret);
+	return ret;
+
+error_requeue_call:
+	mutex_unlock(&call->user_mutex);
+error_requeue_call_no_lock:
+	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
+	write_lock_bh(&rx->recvmsg_lock);
+	rxrpc_get_call(call, rxrpc_call_got);
+	list_add(&call->recvmsg_link, &rx->recvmsg_q);
+	write_unlock_bh(&rx->recvmsg_lock);
+	rx->sk.sk_data_ready(&rx->sk);
+	goto error;
+
+wait_interrupted:
+	ret = sock_intr_errno(timeo);
+wait_error:
+	finish_wait(sk_sleep(&rx->sk), &wait);
+	goto error;
+}
+
 /**
  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
  * @sock: The socket that the call exists on
@@ -787,7 +971,7 @@  int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
 	case RXRPC_CALL_CLIENT_RECV_REPLY:
 	case RXRPC_CALL_SERVER_RECV_REQUEST:
 	case RXRPC_CALL_SERVER_ACK_REQUEST:
-		ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
+		ret = rxrpc_recvmsg_data(sock, call, NULL, iter, NULL, 0,
 					 *_len, 0, &offset);
 		*_len -= offset;
 		if (ret < 0)