diff mbox series

[RFC,3/8] rxrpc: Allow multiple AF_RXRPC sockets to be bound together to form queues

Message ID 165599095279.1827880.8011666375060763290.stgit@warthog.procyon.org.uk (mailing list archive)
State RFC
Delegated to: Netdev Maintainers
Headers show
Series rxrpc: Multiqueue, sendfile, splice and call security | expand

Checks

Context Check Description
netdev/tree_selection success Clearly marked for net, async
netdev/fixes_present fail Series targets non-next tree, but doesn't contain any Fixes tags
netdev/subject_prefix success Link
netdev/cover_letter success Series has a cover letter
netdev/patch_count success Link
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 13 this patch: 13
netdev/cc_maintainers warning 3 maintainers not CCed: pabeni@redhat.com kuba@kernel.org edumazet@google.com
netdev/build_clang success Errors and warnings before: 8 this patch: 8
netdev/module_param success Was 0 now: 0
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 13 this patch: 13
netdev/checkpatch warning WARNING: line length of 100 exceeds 80 columns WARNING: networking block comments don't use an empty /* line, use /* Comment...
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0

Commit Message

David Howells June 23, 2022, 1:29 p.m. UTC
Allow one rxrpc socket to be bound onto another to form a queue.  This is
done by allocating a socket and setting it up, then allocating more sockets
and using a sockopt to bind them together:

	fd1 = socket(AF_RXRPC, SOCK_DGRAM, IPPROTO_IPV6);
	bind(fd1, &address);
	listen(fd1, depth);

	fd2 = socket(AF_RXRPC, SOCK_DGRAM, IPPROTO_IPV6);
	setsockopt(fd2, SOL_RXRPC, RXRPC_BIND_CHANNEL, &fd1, sizeof(fd1));

From this point:

 (1) Each channel must be charged with user call IDs separately.  Each
     channel has a separate call ID space.  A call ID on one channel cannot
     be used to send a message on another channel.  The same call ID on
     different channels refers to different calls.

 (2) An incoming call will get bound to the next channel that does a
     recvmsg() on an empty queue.  All further incoming packets relating to
     that call will go to that channel exclusively.

 (3) An outgoing client call made on a particular channel will be bound to
     that channel.

 (4) If a channel is closed, all calls bound to that channel will be
     aborted.

 (5) Unaccepted incoming calls are held in a queue that is common to all
     channels and is of the depth set by listen().  Each time recvmsg() is
     called on
     a channel, if that channel has at least one charge available, it will
     pop an incoming call from that queue, bind the next charge to it,
     attach it to the socket and push it onto the tail of the recvmsg
     queue.

     This can be used as a mechanism to distribute calls across a thread
     pool and as a mechanism to control the arrival of new calls on any
     particular channel.  New calls can and will only be collected if the
     channel is charged.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 include/uapi/linux/rxrpc.h |    1 +
 net/rxrpc/af_rxrpc.c       |   79 +++++++++++++++++++++++++++++++++++++++++++-
 net/rxrpc/call_accept.c    |    5 ---
 net/rxrpc/call_object.c    |    2 +
 4 files changed, 80 insertions(+), 7 deletions(-)
diff mbox series

Patch

diff --git a/include/uapi/linux/rxrpc.h b/include/uapi/linux/rxrpc.h
index 8f8dc7a937a4..811923643751 100644
--- a/include/uapi/linux/rxrpc.h
+++ b/include/uapi/linux/rxrpc.h
@@ -36,6 +36,7 @@  struct sockaddr_rxrpc {
 #define RXRPC_MIN_SECURITY_LEVEL	4	/* minimum security level */
 #define RXRPC_UPGRADEABLE_SERVICE	5	/* Upgrade service[0] -> service[1] */
 #define RXRPC_SUPPORTED_CMSG		6	/* Get highest supported control message type */
+#define RXRPC_BIND_CHANNEL		7	/* Bind a socket as an additional recvmsg channel */
 
 /*
  * RxRPC control messages
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index 703e10969d2f..6b89a5a969e0 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -733,6 +733,71 @@  int rxrpc_sock_set_upgradeable_service(struct sock *sk, unsigned int val[2])
 }
 EXPORT_SYMBOL(rxrpc_sock_set_upgradeable_service);
 
+/*
+ * Bind this socket to another socket that's already set up and listening to
+ * use this as an additional channel for receiving new service calls.
+ */
+static int rxrpc_bind_channel(struct rxrpc_sock *rx2, int fd)
+{
+	struct rxrpc_service *b;
+	struct rxrpc_sock *rx1;
+	struct socket *sock1;
+	unsigned long *call_id_backlog;
+	int ret;
+
+	if (rx2->sk.sk_state != RXRPC_UNBOUND)
+		return -EISCONN;
+	if (rx2->service || rx2->exclusive)
+		return -EINVAL;
+
+	sock1 = sockfd_lookup(fd, &ret);
+	if (!sock1)
+		return ret;
+	rx1 = rxrpc_sk(sock1->sk);
+
+	ret = -EINVAL;
+	if (rx1 == rx2 || rx2->family != rx1->family ||
+	    sock_net(&rx2->sk) != sock_net(&rx1->sk))
+		goto error;
+
+	ret = -EISCONN;
+	if (rx1->sk.sk_state != RXRPC_SERVER_LISTENING)
+		goto error;
+
+	ret = -ENOMEM;
+	call_id_backlog = kcalloc(RXRPC_BACKLOG_MAX,
+				  sizeof(call_id_backlog[0]),
+				  GFP_KERNEL);
+	if (!call_id_backlog)
+		goto error;
+
+	lock_sock_nested(&rx1->sk, 1);
+
+	ret = -EISCONN;
+	if (rx1->sk.sk_state != RXRPC_SERVER_LISTENING)
+		goto error_unlock;
+
+	b = rx1->service;
+	refcount_inc(&b->ref);
+	refcount_inc(&b->active);
+	rx2->service		= b;
+	rx2->srx		= rx1->srx;
+	rx2->call_id_backlog	= call_id_backlog;
+	rx2->min_sec_level	= rx1->min_sec_level;
+	rx2->local		= rxrpc_get_local(rx1->local);
+	atomic_inc(&rx1->local->active_users);
+	rx2->sk.sk_state	= RXRPC_SERVER_LISTENING;
+	call_id_backlog = NULL;
+	ret = 0;
+
+error_unlock:
+	release_sock(&rx1->sk);
+	kfree(call_id_backlog);
+error:
+	fput(sock1->file);
+	return ret;
+}
+
 /*
  * set RxRPC socket options
  */
@@ -742,7 +807,7 @@  static int rxrpc_setsockopt(struct socket *sock, int level, int optname,
 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
 	unsigned int min_sec_level;
 	u16 service_upgrade[2];
-	int ret;
+	int ret, fd;
 
 	_enter(",%d,%d,,%d", level, optname, optlen);
 
@@ -817,6 +882,18 @@  static int rxrpc_setsockopt(struct socket *sock, int level, int optname,
 				goto error;
 			goto success;
 
+		case RXRPC_BIND_CHANNEL:
+			ret = -EINVAL;
+			if (optlen != sizeof(fd))
+				goto error;
+			ret = -EFAULT;
+			if (copy_from_sockptr(&fd, optval, sizeof(fd)) != 0)
+				goto error;
+			ret = rxrpc_bind_channel(rx, fd);
+			if (ret < 0)
+				goto error;
+			goto success;
+
 		default:
 			break;
 		}
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 3cba4dacb8d4..68760a0657a1 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -296,8 +296,6 @@  void rxrpc_deactivate_service(struct rxrpc_sock *rx)
 	if (!refcount_dec_and_test(&rx->service->active))
 		return;
 
-	kdebug("-- deactivate --");
-
 	/* Now that active is 0, make sure that there aren't any incoming calls
 	 * being set up before we clear the preallocation buffers.
 	 */
@@ -335,12 +333,9 @@  void rxrpc_deactivate_service(struct rxrpc_sock *rx)
 
 	head = b->call_backlog_head;
 	tail = b->call_backlog_tail;
-	kdebug("backlog %x %x", head, tail);
 	while (CIRC_CNT(head, tail, size) > 0) {
 		struct rxrpc_call *call = b->call_backlog[tail];
 
-		kdebug("discard c=%08x", call->debug_id);
-
 		trace_rxrpc_call(call->debug_id, rxrpc_call_discard,
 				 refcount_read(&call->ref),
 				 NULL, (const void *)call->user_call_ID);
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index e90b205a6c0f..4ee98ac689f9 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -712,7 +712,7 @@  void rxrpc_cleanup_call(struct rxrpc_call *call)
 
 	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
 	if (WARN_ON(!test_bit(RXRPC_CALL_RELEASED, &call->flags))) {
-		kdebug("### UNRELEASED c=%08x", call->debug_id);
+		pr_warn("### UNRELEASED c=%08x", call->debug_id);
 		return;
 	}