diff mbox series

[net-next,01/13] rxrpc: Implement an in-kernel rxperf server for testing purposes

Message ID 166919798907.1256245.9711301613832280075.stgit@warthog.procyon.org.uk (mailing list archive)
State Not Applicable
Delegated to: Netdev Maintainers
Headers show
Series rxrpc: Increasing SACK size and moving away from softirq, part 2 | expand

Checks

Context Check Description
netdev/tree_selection success Clearly marked for net-next, async
netdev/apply fail Patch does not apply to net-next

Commit Message

David Howells Nov. 23, 2022, 10:06 a.m. UTC
Implement an in-kernel rxperf server to allow kernel-based rxrpc services
to be tested directly, unlike with AFS where they're accessed by the
fileserver when the latter decides it wants to.

This is implemented as a module that, if loaded, opens UDP port 7009
(afs3-rmtsys) and listens on it for incoming calls.  Calls can be generated
using the rxperf command shipped with OpenAFS, for example.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
---

 include/net/af_rxrpc.h |    1 
 net/rxrpc/Kconfig      |    7 +
 net/rxrpc/Makefile     |    3 
 net/rxrpc/rxperf.c     |  614 ++++++++++++++++++++++++++++++++++++++++++++++++
 net/rxrpc/server_key.c |   25 ++
 5 files changed, 650 insertions(+)
 create mode 100644 net/rxrpc/rxperf.c

Comments

Marc Dionne Nov. 24, 2022, 6:38 p.m. UTC | #1
On Wed, Nov 23, 2022 at 6:06 AM David Howells <dhowells@redhat.com> wrote:
>
> Implement an in-kernel rxperf server to allow kernel-based rxrpc services
> to be tested directly, unlike with AFS where they're accessed by the
> fileserver when the latter decides it wants to.
>
> This is implemented as a module that, if loaded, opens UDP port 7009
> (afs3-rmtsys) and listens on it for incoming calls.  Calls can be generated
> using the rxperf command shipped with OpenAFS, for example.
>
> Signed-off-by: David Howells <dhowells@redhat.com>
> cc: Marc Dionne <marc.dionne@auristor.com>
> cc: linux-afs@lists.infradead.org
> ---
>
>  include/net/af_rxrpc.h |    1
>  net/rxrpc/Kconfig      |    7 +
>  net/rxrpc/Makefile     |    3
>  net/rxrpc/rxperf.c     |  614 ++++++++++++++++++++++++++++++++++++++++++++++++
>  net/rxrpc/server_key.c |   25 ++
>  5 files changed, 650 insertions(+)
>  create mode 100644 net/rxrpc/rxperf.c
>
> diff --git a/include/net/af_rxrpc.h b/include/net/af_rxrpc.h
> index b69ca695935c..dc033f08191e 100644
> --- a/include/net/af_rxrpc.h
> +++ b/include/net/af_rxrpc.h
> @@ -71,5 +71,6 @@ void rxrpc_kernel_set_max_life(struct socket *, struct rxrpc_call *,
>                                unsigned long);
>
>  int rxrpc_sock_set_min_security_level(struct sock *sk, unsigned int val);
> +int rxrpc_sock_set_security_keyring(struct sock *, struct key *);
>
>  #endif /* _NET_RXRPC_H */
> diff --git a/net/rxrpc/Kconfig b/net/rxrpc/Kconfig
> index accd35c05577..7ae023b37a83 100644
> --- a/net/rxrpc/Kconfig
> +++ b/net/rxrpc/Kconfig
> @@ -58,4 +58,11 @@ config RXKAD
>
>           See Documentation/networking/rxrpc.rst.
>
> +config RXPERF
> +       tristate "RxRPC test service"
> +       help
> +         Provide an rxperf service tester.  This listens on UDP port 7009 for
> +         incoming calls from the rxperf program (an example of which can be
> +         found in OpenAFS).
> +
>  endif
> diff --git a/net/rxrpc/Makefile b/net/rxrpc/Makefile
> index fdeba488fc6e..79687477d93c 100644
> --- a/net/rxrpc/Makefile
> +++ b/net/rxrpc/Makefile
> @@ -36,3 +36,6 @@ rxrpc-y := \
>  rxrpc-$(CONFIG_PROC_FS) += proc.o
>  rxrpc-$(CONFIG_RXKAD) += rxkad.o
>  rxrpc-$(CONFIG_SYSCTL) += sysctl.o
> +
> +
> +obj-$(CONFIG_RXPERF) += rxperf.o
> diff --git a/net/rxrpc/rxperf.c b/net/rxrpc/rxperf.c
> new file mode 100644
> index 000000000000..7f8a1a186da8
> --- /dev/null
> +++ b/net/rxrpc/rxperf.c
> @@ -0,0 +1,614 @@
> +// SPDX-License-Identifier: GPL-2.0-or-later
> +/* In-kernel rxperf server for testing purposes.
> + *
> + * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
> + * Written by David Howells (dhowells@redhat.com)
> + */
> +
> +#define pr_fmt(fmt) "rxperf: " fmt
> +#include <linux/slab.h>
> +#include <net/sock.h>
> +//#include <net/netns/generic.h>
> +#include <net/af_rxrpc.h>
> +
> +#define RXPERF_PORT            7009
> +#define RX_PERF_SERVICE                147
> +#define RX_PERF_VERSION                3
> +#define RX_PERF_SEND           0
> +#define RX_PERF_RECV           1
> +#define RX_PERF_RPC            3
> +#define RX_PERF_FILE           4
> +#define RX_PERF_MAGIC_COOKIE   0x4711
> +
> +struct rxperf_proto_params {
> +       __be32          version;
> +       __be32          type;
> +       __be32          rsize;
> +       __be32          wsize;
> +} __packed;
> +
> +static const u8 rxperf_magic_cookie[] = { 0x00, 0x00, 0x47, 0x11 };
> +static const u8 secret[8] = { 0xa7, 0x83, 0x8a, 0xcb, 0xc7, 0x83, 0xec, 0x94 };
> +
> +enum rxperf_call_state {
> +       RXPERF_CALL_SV_AWAIT_PARAMS,    /* Server: Awaiting parameter block */
> +       RXPERF_CALL_SV_AWAIT_REQUEST,   /* Server: Awaiting request data */
> +       RXPERF_CALL_SV_REPLYING,        /* Server: Replying */
> +       RXPERF_CALL_SV_AWAIT_ACK,       /* Server: Awaiting final ACK */
> +       RXPERF_CALL_COMPLETE,           /* Completed or failed */
> +};
> +
> +struct rxperf_call {
> +       struct rxrpc_call       *rxcall;
> +       struct iov_iter         iter;
> +       struct kvec             kvec[1];
> +       struct work_struct      work;
> +       const char              *type;
> +       size_t                  iov_len;
> +       size_t                  req_len;        /* Size of request blob */
> +       size_t                  reply_len;      /* Size of reply blob */
> +       unsigned int            debug_id;
> +       unsigned int            operation_id;
> +       struct rxperf_proto_params params;
> +       __be32                  tmp[2];
> +       s32                     abort_code;
> +       enum rxperf_call_state  state;
> +       short                   error;
> +       unsigned short          unmarshal;
> +       u16                     service_id;
> +       int (*deliver)(struct rxperf_call *call);
> +       void (*processor)(struct work_struct *work);
> +};
> +
> +static struct socket *rxperf_socket;
> +static struct key *rxperf_sec_keyring; /* Ring of security/crypto keys */
> +static struct workqueue_struct *rxperf_workqueue;
> +
> +static void rxperf_deliver_to_call(struct work_struct *work);
> +static int rxperf_deliver_param_block(struct rxperf_call *call);
> +static int rxperf_deliver_request(struct rxperf_call *call);
> +static int rxperf_process_call(struct rxperf_call *call);
> +static void rxperf_charge_preallocation(struct work_struct *work);
> +
> +static DECLARE_WORK(rxperf_charge_preallocation_work,
> +                   rxperf_charge_preallocation);
> +
> +static inline void rxperf_set_call_state(struct rxperf_call *call,
> +                                        enum rxperf_call_state to)
> +{
> +       call->state = to;
> +}
> +
> +static inline void rxperf_set_call_complete(struct rxperf_call *call,
> +                                           int error, s32 remote_abort)
> +{
> +       if (call->state != RXPERF_CALL_COMPLETE) {
> +               call->abort_code = remote_abort;
> +               call->error = error;
> +               call->state = RXPERF_CALL_COMPLETE;
> +       }
> +}
> +
> +static void rxperf_rx_discard_new_call(struct rxrpc_call *rxcall,
> +                                      unsigned long user_call_ID)
> +{
> +       kfree((struct rxperf_call *)user_call_ID);
> +}
> +
> +static void rxperf_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
> +                              unsigned long user_call_ID)
> +{
> +       queue_work(rxperf_workqueue, &rxperf_charge_preallocation_work);
> +}
> +
> +static void rxperf_queue_call_work(struct rxperf_call *call)
> +{
> +       queue_work(rxperf_workqueue, &call->work);
> +}
> +
> +static void rxperf_notify_rx(struct sock *sk, struct rxrpc_call *rxcall,
> +                            unsigned long call_user_ID)
> +{
> +       struct rxperf_call *call = (struct rxperf_call *)call_user_ID;
> +
> +       if (call->state != RXPERF_CALL_COMPLETE)
> +               rxperf_queue_call_work(call);
> +}
> +
> +static void rxperf_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
> +{
> +       struct rxperf_call *call = (struct rxperf_call *)user_call_ID;
> +
> +       call->rxcall = rxcall;
> +}
> +
> +static void rxperf_notify_end_reply_tx(struct sock *sock,
> +                                      struct rxrpc_call *rxcall,
> +                                      unsigned long call_user_ID)
> +{
> +       rxperf_set_call_state((struct rxperf_call *)call_user_ID,
> +                             RXPERF_CALL_SV_AWAIT_ACK);
> +}
> +
> +/*
> + * Charge the incoming call preallocation.
> + */
> +static void rxperf_charge_preallocation(struct work_struct *work)
> +{
> +       struct rxperf_call *call;
> +
> +       for (;;) {
> +               call = kzalloc(sizeof(*call), GFP_KERNEL);
> +               if (!call)
> +                       break;
> +
> +               call->type              = "unset";
> +               call->debug_id          = atomic_inc_return(&rxrpc_debug_id);
> +               call->deliver           = rxperf_deliver_param_block;
> +               call->state             = RXPERF_CALL_SV_AWAIT_PARAMS;
> +               call->service_id        = RX_PERF_SERVICE;
> +               call->iov_len           = sizeof(call->params);
> +               call->kvec[0].iov_len   = sizeof(call->params);
> +               call->kvec[0].iov_base  = &call->params;
> +               iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
> +               INIT_WORK(&call->work, rxperf_deliver_to_call);
> +
> +               if (rxrpc_kernel_charge_accept(rxperf_socket,
> +                                              rxperf_notify_rx,
> +                                              rxperf_rx_attach,
> +                                              (unsigned long)call,
> +                                              GFP_KERNEL,
> +                                              call->debug_id) < 0)
> +                       break;
> +               call = NULL;
> +       }
> +
> +       kfree(call);
> +}
> +
> +/*
> + * Open an rxrpc socket and bind it to be a server for callback notifications
> + * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
> + */
> +static int rxperf_open_socket(void)
> +{
> +       struct sockaddr_rxrpc srx;
> +       struct socket *socket;
> +       int ret;
> +
> +       ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6,
> +                              &socket);
> +       if (ret < 0)
> +               goto error_1;
> +
> +       socket->sk->sk_allocation = GFP_NOFS;
> +
> +       /* bind the callback manager's address to make this a server socket */
> +       memset(&srx, 0, sizeof(srx));
> +       srx.srx_family                  = AF_RXRPC;
> +       srx.srx_service                 = RX_PERF_SERVICE;
> +       srx.transport_type              = SOCK_DGRAM;
> +       srx.transport_len               = sizeof(srx.transport.sin6);
> +       srx.transport.sin6.sin6_family  = AF_INET6;
> +       srx.transport.sin6.sin6_port    = htons(RXPERF_PORT);
> +
> +       ret = rxrpc_sock_set_min_security_level(socket->sk,
> +                                               RXRPC_SECURITY_ENCRYPT);
> +       if (ret < 0)
> +               goto error_2;
> +
> +       ret = rxrpc_sock_set_security_keyring(socket->sk, rxperf_sec_keyring);
> +
> +       ret = kernel_bind(socket, (struct sockaddr *)&srx, sizeof(srx));
> +       if (ret < 0)
> +               goto error_2;
> +
> +       rxrpc_kernel_new_call_notification(socket, rxperf_rx_new_call,
> +                                          rxperf_rx_discard_new_call);
> +
> +       ret = kernel_listen(socket, INT_MAX);
> +       if (ret < 0)
> +               goto error_2;
> +
> +       rxperf_socket = socket;
> +       rxperf_charge_preallocation(&rxperf_charge_preallocation_work);
> +       return 0;
> +
> +error_2:
> +       sock_release(socket);
> +error_1:
> +       pr_err("Can't set up rxperf socket: %d\n", ret);
> +       return ret;
> +}
> +
> +/*
> + * close the rxrpc socket rxperf was using
> + */
> +static void rxperf_close_socket(void)
> +{
> +       kernel_listen(rxperf_socket, 0);
> +       kernel_sock_shutdown(rxperf_socket, SHUT_RDWR);
> +       flush_workqueue(rxperf_workqueue);
> +       sock_release(rxperf_socket);
> +}
> +
> +/*
> + * Log remote abort codes that indicate that we have a protocol disagreement
> + * with the server.
> + */
> +static void rxperf_log_error(struct rxperf_call *call, s32 remote_abort)
> +{
> +       static int max = 0;
> +       const char *msg;
> +       int m;
> +
> +       switch (remote_abort) {
> +       case RX_EOF:             msg = "unexpected EOF";        break;
> +       case RXGEN_CC_MARSHAL:   msg = "client marshalling";    break;
> +       case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling";  break;
> +       case RXGEN_SS_MARSHAL:   msg = "server marshalling";    break;
> +       case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling";  break;
> +       case RXGEN_DECODE:       msg = "opcode decode";         break;
> +       case RXGEN_SS_XDRFREE:   msg = "server XDR cleanup";    break;
> +       case RXGEN_CC_XDRFREE:   msg = "client XDR cleanup";    break;
> +       case -32:                msg = "insufficient data";     break;
> +       default:
> +               return;
> +       }
> +
> +       m = max;
> +       if (m < 3) {
> +               max = m + 1;
> +               pr_info("Peer reported %s failure on %s\n", msg, call->type);
> +       }
> +}
> +
> +/*
> + * deliver messages to a call
> + */
> +static void rxperf_deliver_to_call(struct work_struct *work)
> +{
> +       struct rxperf_call *call = container_of(work, struct rxperf_call, work);
> +       enum rxperf_call_state state;
> +       u32 abort_code, remote_abort = 0;
> +       int ret;
> +
> +       if (call->state == RXPERF_CALL_COMPLETE)
> +               return;
> +
> +       while (state = call->state,
> +              state == RXPERF_CALL_SV_AWAIT_PARAMS ||
> +              state == RXPERF_CALL_SV_AWAIT_REQUEST ||
> +              state == RXPERF_CALL_SV_AWAIT_ACK
> +              ) {
> +               if (state == RXPERF_CALL_SV_AWAIT_ACK) {
> +                       if (!rxrpc_kernel_check_life(rxperf_socket, call->rxcall))
> +                               goto call_complete;
> +                       return;
> +               }
> +
> +               ret = call->deliver(call);
> +               if (ret == 0)
> +                       ret = rxperf_process_call(call);
> +
> +               switch (ret) {
> +               case 0:
> +                       continue;
> +               case -EINPROGRESS:
> +               case -EAGAIN:
> +                       return;
> +               case -ECONNABORTED:
> +                       rxperf_log_error(call, call->abort_code);
> +                       goto call_complete;
> +               case -EOPNOTSUPP:
> +                       abort_code = RXGEN_OPCODE;
> +                       rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
> +                                               abort_code, ret, "GOP");
> +                       goto call_complete;
> +               case -ENOTSUPP:
> +                       abort_code = RX_USER_ABORT;
> +                       rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
> +                                               abort_code, ret, "GUA");
> +                       goto call_complete;
> +               case -EIO:
> +                       pr_err("Call %u in bad state %u\n",
> +                              call->debug_id, call->state);
> +                       fallthrough;
> +               case -ENODATA:
> +               case -EBADMSG:
> +               case -EMSGSIZE:
> +               case -ENOMEM:
> +               case -EFAULT:
> +                       rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
> +                                               RXGEN_SS_UNMARSHAL, ret, "GUM");
> +                       goto call_complete;
> +               default:
> +                       rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
> +                                               RX_CALL_DEAD, ret, "GER");
> +                       goto call_complete;
> +               }
> +       }
> +
> +call_complete:
> +       rxperf_set_call_complete(call, ret, remote_abort);
> +       /* The call may have been requeued */
> +       rxrpc_kernel_end_call(rxperf_socket, call->rxcall);
> +       cancel_work(&call->work);
> +       kfree(call);
> +}
> +
> +/*
> + * Extract a piece of data from the received data socket buffers.
> + */
> +static int rxperf_extract_data(struct rxperf_call *call, bool want_more)
> +{
> +       u32 remote_abort = 0;
> +       int ret;
> +
> +       ret = rxrpc_kernel_recv_data(rxperf_socket, call->rxcall, &call->iter,
> +                                    &call->iov_len, want_more, &remote_abort,
> +                                    &call->service_id);
> +       pr_debug("Extract i=%zu l=%zu m=%u ret=%d\n",
> +                iov_iter_count(&call->iter), call->iov_len, want_more, ret);
> +       if (ret == 0 || ret == -EAGAIN)
> +               return ret;
> +
> +       if (ret == 1) {
> +               switch (call->state) {
> +               case RXPERF_CALL_SV_AWAIT_REQUEST:
> +                       rxperf_set_call_state(call, RXPERF_CALL_SV_REPLYING);
> +                       break;
> +               case RXPERF_CALL_COMPLETE:
> +                       pr_debug("premature completion %d", call->error);
> +                       return call->error;
> +               default:
> +                       break;
> +               }
> +               return 0;
> +       }
> +
> +       rxperf_set_call_complete(call, ret, remote_abort);
> +       return ret;
> +}
> +
> +/*
> + * Grab the operation ID from an incoming manager call.
> + */
> +static int rxperf_deliver_param_block(struct rxperf_call *call)
> +{
> +       u32 version;
> +       int ret;
> +
> +       /* Extract the parameter block */
> +       ret = rxperf_extract_data(call, true);
> +       if (ret < 0)
> +               return ret;
> +
> +       version                 = ntohl(call->params.version);
> +       call->operation_id      = ntohl(call->params.type);
> +       call->deliver           = rxperf_deliver_request;
> +
> +       if (version != RX_PERF_VERSION) {
> +               pr_info("Version mismatch %x\n", version);
> +               return -ENOTSUPP;
> +       }
> +
> +       switch (call->operation_id) {
> +       case RX_PERF_SEND:
> +               call->type = "send";
> +               call->reply_len = 0;
> +               call->iov_len = 4;      /* Expect req size */
> +               break;
> +       case RX_PERF_RECV:
> +               call->type = "recv";
> +               call->req_len = 0;
> +               call->iov_len = 4;      /* Expect reply size */
> +               break;
> +       case RX_PERF_RPC:
> +               call->type = "rpc";
> +               call->iov_len = 8;      /* Expect req size and reply size */
> +               break;
> +       case RX_PERF_FILE:
> +               call->type = "file";
> +               fallthrough;
> +       default:
> +               return -EOPNOTSUPP;
> +       }
> +
> +       rxperf_set_call_state(call, RXPERF_CALL_SV_AWAIT_REQUEST);
> +       return call->deliver(call);
> +}
> +
> +/*
> + * Deliver the request data.
> + */
> +static int rxperf_deliver_request(struct rxperf_call *call)
> +{
> +       int ret;
> +
> +       switch (call->unmarshal) {
> +       case 0:
> +               call->kvec[0].iov_len   = call->iov_len;
> +               call->kvec[0].iov_base  = call->tmp;
> +               iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
> +               call->unmarshal++;
> +               fallthrough;
> +       case 1:
> +               ret = rxperf_extract_data(call, true);
> +               if (ret < 0)
> +                       return ret;
> +
> +               switch (call->operation_id) {
> +               case RX_PERF_SEND:
> +                       call->type = "send";
> +                       call->req_len   = ntohl(call->tmp[0]);
> +                       call->reply_len = 0;
> +                       break;
> +               case RX_PERF_RECV:
> +                       call->type = "recv";
> +                       call->req_len = 0;
> +                       call->reply_len = ntohl(call->tmp[0]);
> +                       break;
> +               case RX_PERF_RPC:
> +                       call->type = "rpc";
> +                       call->req_len   = ntohl(call->tmp[0]);
> +                       call->reply_len = ntohl(call->tmp[1]);
> +                       break;
> +               default:
> +                       pr_info("Can't parse extra params\n");
> +                       return -EIO;
> +               }
> +
> +               pr_debug("CALL op=%s rq=%zx rp=%zx\n",
> +                        call->type, call->req_len, call->reply_len);
> +
> +               call->iov_len = call->req_len;
> +               iov_iter_discard(&call->iter, READ, call->req_len);
> +               call->unmarshal++;
> +               fallthrough;
> +       case 2:
> +               ret = rxperf_extract_data(call, false);
> +               if (ret < 0)
> +                       return ret;
> +               call->unmarshal++;
> +               fallthrough;
> +       default:
> +               return 0;
> +       }
> +}
> +
> +/*
> + * Process a call for which we've received the request.
> + */
> +static int rxperf_process_call(struct rxperf_call *call)
> +{
> +       struct msghdr msg = {};
> +       struct bio_vec bv[1];
> +       struct kvec iov[1];
> +       ssize_t n;
> +       size_t reply_len = call->reply_len, len;
> +
> +       rxrpc_kernel_set_tx_length(rxperf_socket, call->rxcall,
> +                                  reply_len + sizeof(rxperf_magic_cookie));
> +
> +       while (reply_len > 0) {
> +               len = min(reply_len, PAGE_SIZE);
> +               bv[0].bv_page   = ZERO_PAGE(0);
> +               bv[0].bv_offset = 0;
> +               bv[0].bv_len    = len;
> +               iov_iter_bvec(&msg.msg_iter, WRITE, bv, 1, len);
> +               msg.msg_flags = MSG_MORE;
> +               n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg,
> +                                          len, rxperf_notify_end_reply_tx);
> +               if (n < 0)
> +                       return n;
> +               if (n == 0)
> +                       return -EIO;
> +               reply_len -= n;
> +       }
> +
> +       len = sizeof(rxperf_magic_cookie);
> +       iov[0].iov_base = (void *)rxperf_magic_cookie;
> +       iov[0].iov_len  = len;
> +       iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
> +       msg.msg_flags = 0;
> +       n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg, len,
> +                                  rxperf_notify_end_reply_tx);
> +       if (n >= 0)
> +               return 0; /* Success */
> +
> +       if (n == -ENOMEM)
> +               rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
> +                                       RXGEN_SS_MARSHAL, -ENOMEM, "GOM");
> +       return n;
> +}
> +
> +/*
> + * Add a key to the security keyring.
> + */
> +static int rxperf_add_key(struct key *keyring)
> +{
> +       key_ref_t kref;
> +       int ret;
> +
> +       kref = key_create_or_update(make_key_ref(keyring, true),
> +                                   "rxrpc_s",
> +                                   __stringify(RX_PERF_SERVICE) ":2",
> +                                   secret,
> +                                   sizeof(secret),
> +                                   KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH
> +                                   | KEY_USR_VIEW,
> +                                   KEY_ALLOC_NOT_IN_QUOTA);
> +
> +       if (IS_ERR(kref)) {
> +               pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref));
> +               return PTR_ERR(kref);
> +       }
> +
> +       ret = key_link(keyring, key_ref_to_ptr(kref));
> +       if (ret < 0)
> +               pr_err("Can't link rxperf server key: %d\n", ret);
> +       key_ref_put(kref);
> +       return ret;
> +}
> +
> +/*
> + * Initialise the rxperf server.
> + */
> +static int __init rxperf_init(void)
> +{
> +       struct key *keyring;
> +       int ret = -ENOMEM;
> +
> +       pr_info("Server registering\n");
> +
> +       rxperf_workqueue = alloc_workqueue("rxperf", 0, 0);
> +       if (!rxperf_workqueue)
> +               goto error_workqueue;
> +
> +       keyring = keyring_alloc("rxperf_server",
> +                               GLOBAL_ROOT_UID, GLOBAL_ROOT_GID, current_cred(),
> +                               KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH |
> +                               KEY_POS_WRITE |
> +                               KEY_USR_VIEW | KEY_USR_READ | KEY_USR_SEARCH |
> +                               KEY_USR_WRITE |
> +                               KEY_OTH_VIEW | KEY_OTH_READ | KEY_OTH_SEARCH,
> +                               KEY_ALLOC_NOT_IN_QUOTA,
> +                               NULL, NULL);
> +       if (IS_ERR(keyring)) {
> +               pr_err("Can't allocate rxperf server keyring: %ld\n",
> +                      PTR_ERR(keyring));
> +               goto error_keyring;
> +       }
> +       rxperf_sec_keyring = keyring;
> +       ret = rxperf_add_key(keyring);
> +       if (ret < 0)
> +               goto error_key;
> +
> +       ret = rxperf_open_socket();
> +       if (ret < 0)
> +               goto error_socket;
> +       return 0;
> +
> +error_socket:
> +error_key:
> +       key_put(rxperf_sec_keyring);
> +error_keyring:
> +       destroy_workqueue(rxperf_workqueue);
> +       rcu_barrier();
> +error_workqueue:
> +       pr_err("Failed to register: %d\n", ret);
> +       return ret;
> +}
> +late_initcall(rxperf_init); /* Must be called after net/ to create socket */
> +
> +static void __exit rxperf_exit(void)
> +{
> +       pr_info("Server unregistering.\n");
> +
> +       rxperf_close_socket();
> +       key_put(rxperf_sec_keyring);
> +       destroy_workqueue(rxperf_workqueue);
> +       rcu_barrier();
> +}
> +module_exit(rxperf_exit);
> diff --git a/net/rxrpc/server_key.c b/net/rxrpc/server_key.c
> index ee269e0e6ee8..e51940589ee5 100644
> --- a/net/rxrpc/server_key.c
> +++ b/net/rxrpc/server_key.c
> @@ -144,3 +144,28 @@ int rxrpc_server_keyring(struct rxrpc_sock *rx, sockptr_t optval, int optlen)
>         _leave(" = 0 [key %x]", key->serial);
>         return 0;
>  }
> +
> +/**
> + * rxrpc_sock_set_security_keyring - Set the security keyring for a kernel service
> + * @sk: The socket to set the keyring on
> + * @keyring: The keyring to set
> + *
> + * Set the server security keyring on an rxrpc socket.  This is used to provide
> + * the encryption keys for a kernel service.
> + */
> +int rxrpc_sock_set_security_keyring(struct sock *sk, struct key *keyring)
> +{
> +       struct rxrpc_sock *rx = rxrpc_sk(sk);
> +       int ret = 0;
> +
> +       lock_sock(sk);
> +       if (rx->securities)
> +               ret = -EINVAL;
> +       else if (rx->sk.sk_state != RXRPC_UNBOUND)
> +               ret = -EISCONN;
> +       else
> +               rx->securities = key_get(keyring);
> +       release_sock(sk);
> +       return ret;
> +}
> +EXPORT_SYMBOL(rxrpc_sock_set_security_keyring);

This will need some MODULE_* definitions when compiled as a module.

Could the port be made configurable, perhaps as a module parameter,
with 7009 as the default?

Marc
diff mbox series

Patch

diff --git a/include/net/af_rxrpc.h b/include/net/af_rxrpc.h
index b69ca695935c..dc033f08191e 100644
--- a/include/net/af_rxrpc.h
+++ b/include/net/af_rxrpc.h
@@ -71,5 +71,6 @@  void rxrpc_kernel_set_max_life(struct socket *, struct rxrpc_call *,
 			       unsigned long);
 
 int rxrpc_sock_set_min_security_level(struct sock *sk, unsigned int val);
+int rxrpc_sock_set_security_keyring(struct sock *, struct key *);
 
 #endif /* _NET_RXRPC_H */
diff --git a/net/rxrpc/Kconfig b/net/rxrpc/Kconfig
index accd35c05577..7ae023b37a83 100644
--- a/net/rxrpc/Kconfig
+++ b/net/rxrpc/Kconfig
@@ -58,4 +58,11 @@  config RXKAD
 
 	  See Documentation/networking/rxrpc.rst.
 
+config RXPERF
+	tristate "RxRPC test service"
+	help
+	  Provide an rxperf service tester.  This listens on UDP port 7009 for
+	  incoming calls from the rxperf program (an example of which can be
+	  found in OpenAFS).
+
 endif
diff --git a/net/rxrpc/Makefile b/net/rxrpc/Makefile
index fdeba488fc6e..79687477d93c 100644
--- a/net/rxrpc/Makefile
+++ b/net/rxrpc/Makefile
@@ -36,3 +36,6 @@  rxrpc-y := \
 rxrpc-$(CONFIG_PROC_FS) += proc.o
 rxrpc-$(CONFIG_RXKAD) += rxkad.o
 rxrpc-$(CONFIG_SYSCTL) += sysctl.o
+
+
+obj-$(CONFIG_RXPERF) += rxperf.o
diff --git a/net/rxrpc/rxperf.c b/net/rxrpc/rxperf.c
new file mode 100644
index 000000000000..7f8a1a186da8
--- /dev/null
+++ b/net/rxrpc/rxperf.c
@@ -0,0 +1,614 @@ 
+// SPDX-License-Identifier: GPL-2.0-or-later
+/* In-kernel rxperf server for testing purposes.
+ *
+ * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ */
+
+#define pr_fmt(fmt) "rxperf: " fmt
+#include <linux/slab.h>
+#include <net/sock.h>
+//#include <net/netns/generic.h>
+#include <net/af_rxrpc.h>
+
+#define RXPERF_PORT		7009
+#define RX_PERF_SERVICE		147
+#define RX_PERF_VERSION		3
+#define RX_PERF_SEND		0
+#define RX_PERF_RECV		1
+#define RX_PERF_RPC		3
+#define RX_PERF_FILE		4
+#define RX_PERF_MAGIC_COOKIE	0x4711
+
+struct rxperf_proto_params {
+	__be32		version;
+	__be32		type;
+	__be32		rsize;
+	__be32		wsize;
+} __packed;
+
+static const u8 rxperf_magic_cookie[] = { 0x00, 0x00, 0x47, 0x11 };
+static const u8 secret[8] = { 0xa7, 0x83, 0x8a, 0xcb, 0xc7, 0x83, 0xec, 0x94 };
+
+enum rxperf_call_state {
+	RXPERF_CALL_SV_AWAIT_PARAMS,	/* Server: Awaiting parameter block */
+	RXPERF_CALL_SV_AWAIT_REQUEST,	/* Server: Awaiting request data */
+	RXPERF_CALL_SV_REPLYING,	/* Server: Replying */
+	RXPERF_CALL_SV_AWAIT_ACK,	/* Server: Awaiting final ACK */
+	RXPERF_CALL_COMPLETE,		/* Completed or failed */
+};
+
+struct rxperf_call {
+	struct rxrpc_call	*rxcall;
+	struct iov_iter		iter;
+	struct kvec		kvec[1];
+	struct work_struct	work;
+	const char		*type;
+	size_t			iov_len;
+	size_t			req_len;	/* Size of request blob */
+	size_t			reply_len;	/* Size of reply blob */
+	unsigned int		debug_id;
+	unsigned int		operation_id;
+	struct rxperf_proto_params params;
+	__be32			tmp[2];
+	s32			abort_code;
+	enum rxperf_call_state	state;
+	short			error;
+	unsigned short		unmarshal;
+	u16			service_id;
+	int (*deliver)(struct rxperf_call *call);
+	void (*processor)(struct work_struct *work);
+};
+
+static struct socket *rxperf_socket;
+static struct key *rxperf_sec_keyring;	/* Ring of security/crypto keys */
+static struct workqueue_struct *rxperf_workqueue;
+
+static void rxperf_deliver_to_call(struct work_struct *work);
+static int rxperf_deliver_param_block(struct rxperf_call *call);
+static int rxperf_deliver_request(struct rxperf_call *call);
+static int rxperf_process_call(struct rxperf_call *call);
+static void rxperf_charge_preallocation(struct work_struct *work);
+
+static DECLARE_WORK(rxperf_charge_preallocation_work,
+		    rxperf_charge_preallocation);
+
+static inline void rxperf_set_call_state(struct rxperf_call *call,
+					 enum rxperf_call_state to)
+{
+	call->state = to;
+}
+
+static inline void rxperf_set_call_complete(struct rxperf_call *call,
+					    int error, s32 remote_abort)
+{
+	if (call->state != RXPERF_CALL_COMPLETE) {
+		call->abort_code = remote_abort;
+		call->error = error;
+		call->state = RXPERF_CALL_COMPLETE;
+	}
+}
+
+static void rxperf_rx_discard_new_call(struct rxrpc_call *rxcall,
+				       unsigned long user_call_ID)
+{
+	kfree((struct rxperf_call *)user_call_ID);
+}
+
+static void rxperf_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
+			       unsigned long user_call_ID)
+{
+	queue_work(rxperf_workqueue, &rxperf_charge_preallocation_work);
+}
+
+static void rxperf_queue_call_work(struct rxperf_call *call)
+{
+	queue_work(rxperf_workqueue, &call->work);
+}
+
+static void rxperf_notify_rx(struct sock *sk, struct rxrpc_call *rxcall,
+			     unsigned long call_user_ID)
+{
+	struct rxperf_call *call = (struct rxperf_call *)call_user_ID;
+
+	if (call->state != RXPERF_CALL_COMPLETE)
+		rxperf_queue_call_work(call);
+}
+
+static void rxperf_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
+{
+	struct rxperf_call *call = (struct rxperf_call *)user_call_ID;
+
+	call->rxcall = rxcall;
+}
+
+static void rxperf_notify_end_reply_tx(struct sock *sock,
+				       struct rxrpc_call *rxcall,
+				       unsigned long call_user_ID)
+{
+	rxperf_set_call_state((struct rxperf_call *)call_user_ID,
+			      RXPERF_CALL_SV_AWAIT_ACK);
+}
+
+/*
+ * Charge the incoming call preallocation.
+ */
+static void rxperf_charge_preallocation(struct work_struct *work)
+{
+	struct rxperf_call *call;
+
+	for (;;) {
+		call = kzalloc(sizeof(*call), GFP_KERNEL);
+		if (!call)
+			break;
+
+		call->type		= "unset";
+		call->debug_id		= atomic_inc_return(&rxrpc_debug_id);
+		call->deliver		= rxperf_deliver_param_block;
+		call->state		= RXPERF_CALL_SV_AWAIT_PARAMS;
+		call->service_id	= RX_PERF_SERVICE;
+		call->iov_len		= sizeof(call->params);
+		call->kvec[0].iov_len	= sizeof(call->params);
+		call->kvec[0].iov_base	= &call->params;
+		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
+		INIT_WORK(&call->work, rxperf_deliver_to_call);
+
+		if (rxrpc_kernel_charge_accept(rxperf_socket,
+					       rxperf_notify_rx,
+					       rxperf_rx_attach,
+					       (unsigned long)call,
+					       GFP_KERNEL,
+					       call->debug_id) < 0)
+			break;
+		call = NULL;
+	}
+
+	kfree(call);
+}
+
+/*
+ * Open an rxrpc socket and bind it to be a server for callback notifications
+ * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
+ */
+static int rxperf_open_socket(void)
+{
+	struct sockaddr_rxrpc srx;
+	struct socket *socket;
+	int ret;
+
+	ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6,
+			       &socket);
+	if (ret < 0)
+		goto error_1;
+
+	socket->sk->sk_allocation = GFP_NOFS;
+
+	/* bind the callback manager's address to make this a server socket */
+	memset(&srx, 0, sizeof(srx));
+	srx.srx_family			= AF_RXRPC;
+	srx.srx_service			= RX_PERF_SERVICE;
+	srx.transport_type		= SOCK_DGRAM;
+	srx.transport_len		= sizeof(srx.transport.sin6);
+	srx.transport.sin6.sin6_family	= AF_INET6;
+	srx.transport.sin6.sin6_port	= htons(RXPERF_PORT);
+
+	ret = rxrpc_sock_set_min_security_level(socket->sk,
+						RXRPC_SECURITY_ENCRYPT);
+	if (ret < 0)
+		goto error_2;
+
+	ret = rxrpc_sock_set_security_keyring(socket->sk, rxperf_sec_keyring);
+
+	ret = kernel_bind(socket, (struct sockaddr *)&srx, sizeof(srx));
+	if (ret < 0)
+		goto error_2;
+
+	rxrpc_kernel_new_call_notification(socket, rxperf_rx_new_call,
+					   rxperf_rx_discard_new_call);
+
+	ret = kernel_listen(socket, INT_MAX);
+	if (ret < 0)
+		goto error_2;
+
+	rxperf_socket = socket;
+	rxperf_charge_preallocation(&rxperf_charge_preallocation_work);
+	return 0;
+
+error_2:
+	sock_release(socket);
+error_1:
+	pr_err("Can't set up rxperf socket: %d\n", ret);
+	return ret;
+}
+
+/*
+ * close the rxrpc socket rxperf was using
+ */
+static void rxperf_close_socket(void)
+{
+	kernel_listen(rxperf_socket, 0);
+	kernel_sock_shutdown(rxperf_socket, SHUT_RDWR);
+	flush_workqueue(rxperf_workqueue);
+	sock_release(rxperf_socket);
+}
+
+/*
+ * Log remote abort codes that indicate that we have a protocol disagreement
+ * with the server.
+ */
+static void rxperf_log_error(struct rxperf_call *call, s32 remote_abort)
+{
+	static int max = 0;
+	const char *msg;
+	int m;
+
+	switch (remote_abort) {
+	case RX_EOF:		 msg = "unexpected EOF";	break;
+	case RXGEN_CC_MARSHAL:	 msg = "client marshalling";	break;
+	case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling";	break;
+	case RXGEN_SS_MARSHAL:	 msg = "server marshalling";	break;
+	case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling";	break;
+	case RXGEN_DECODE:	 msg = "opcode decode";		break;
+	case RXGEN_SS_XDRFREE:	 msg = "server XDR cleanup";	break;
+	case RXGEN_CC_XDRFREE:	 msg = "client XDR cleanup";	break;
+	case -32:		 msg = "insufficient data";	break;
+	default:
+		return;
+	}
+
+	m = max;
+	if (m < 3) {
+		max = m + 1;
+		pr_info("Peer reported %s failure on %s\n", msg, call->type);
+	}
+}
+
+/*
+ * deliver messages to a call
+ */
+static void rxperf_deliver_to_call(struct work_struct *work)
+{
+	struct rxperf_call *call = container_of(work, struct rxperf_call, work);
+	enum rxperf_call_state state;
+	u32 abort_code, remote_abort = 0;
+	int ret;
+
+	if (call->state == RXPERF_CALL_COMPLETE)
+		return;
+
+	while (state = call->state,
+	       state == RXPERF_CALL_SV_AWAIT_PARAMS ||
+	       state == RXPERF_CALL_SV_AWAIT_REQUEST ||
+	       state == RXPERF_CALL_SV_AWAIT_ACK
+	       ) {
+		if (state == RXPERF_CALL_SV_AWAIT_ACK) {
+			if (!rxrpc_kernel_check_life(rxperf_socket, call->rxcall))
+				goto call_complete;
+			return;
+		}
+
+		ret = call->deliver(call);
+		if (ret == 0)
+			ret = rxperf_process_call(call);
+
+		switch (ret) {
+		case 0:
+			continue;
+		case -EINPROGRESS:
+		case -EAGAIN:
+			return;
+		case -ECONNABORTED:
+			rxperf_log_error(call, call->abort_code);
+			goto call_complete;
+		case -EOPNOTSUPP:
+			abort_code = RXGEN_OPCODE;
+			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
+						abort_code, ret, "GOP");
+			goto call_complete;
+		case -ENOTSUPP:
+			abort_code = RX_USER_ABORT;
+			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
+						abort_code, ret, "GUA");
+			goto call_complete;
+		case -EIO:
+			pr_err("Call %u in bad state %u\n",
+			       call->debug_id, call->state);
+			fallthrough;
+		case -ENODATA:
+		case -EBADMSG:
+		case -EMSGSIZE:
+		case -ENOMEM:
+		case -EFAULT:
+			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
+						RXGEN_SS_UNMARSHAL, ret, "GUM");
+			goto call_complete;
+		default:
+			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
+						RX_CALL_DEAD, ret, "GER");
+			goto call_complete;
+		}
+	}
+
+call_complete:
+	rxperf_set_call_complete(call, ret, remote_abort);
+	/* The call may have been requeued */
+	rxrpc_kernel_end_call(rxperf_socket, call->rxcall);
+	cancel_work(&call->work);
+	kfree(call);
+}
+
+/*
+ * Extract a piece of data from the received data socket buffers.
+ */
+static int rxperf_extract_data(struct rxperf_call *call, bool want_more)
+{
+	u32 remote_abort = 0;
+	int ret;
+
+	ret = rxrpc_kernel_recv_data(rxperf_socket, call->rxcall, &call->iter,
+				     &call->iov_len, want_more, &remote_abort,
+				     &call->service_id);
+	pr_debug("Extract i=%zu l=%zu m=%u ret=%d\n",
+		 iov_iter_count(&call->iter), call->iov_len, want_more, ret);
+	if (ret == 0 || ret == -EAGAIN)
+		return ret;
+
+	if (ret == 1) {
+		switch (call->state) {
+		case RXPERF_CALL_SV_AWAIT_REQUEST:
+			rxperf_set_call_state(call, RXPERF_CALL_SV_REPLYING);
+			break;
+		case RXPERF_CALL_COMPLETE:
+			pr_debug("premature completion %d", call->error);
+			return call->error;
+		default:
+			break;
+		}
+		return 0;
+	}
+
+	rxperf_set_call_complete(call, ret, remote_abort);
+	return ret;
+}
+
+/*
+ * Grab the operation ID from an incoming manager call.
+ */
+static int rxperf_deliver_param_block(struct rxperf_call *call)
+{
+	u32 version;
+	int ret;
+
+	/* Extract the parameter block */
+	ret = rxperf_extract_data(call, true);
+	if (ret < 0)
+		return ret;
+
+	version			= ntohl(call->params.version);
+	call->operation_id	= ntohl(call->params.type);
+	call->deliver		= rxperf_deliver_request;
+
+	if (version != RX_PERF_VERSION) {
+		pr_info("Version mismatch %x\n", version);
+		return -ENOTSUPP;
+	}
+
+	switch (call->operation_id) {
+	case RX_PERF_SEND:
+		call->type = "send";
+		call->reply_len = 0;
+		call->iov_len = 4;	/* Expect req size */
+		break;
+	case RX_PERF_RECV:
+		call->type = "recv";
+		call->req_len = 0;
+		call->iov_len = 4;	/* Expect reply size */
+		break;
+	case RX_PERF_RPC:
+		call->type = "rpc";
+		call->iov_len = 8;	/* Expect req size and reply size */
+		break;
+	case RX_PERF_FILE:
+		call->type = "file";
+		fallthrough;
+	default:
+		return -EOPNOTSUPP;
+	}
+
+	rxperf_set_call_state(call, RXPERF_CALL_SV_AWAIT_REQUEST);
+	return call->deliver(call);
+}
+
+/*
+ * Deliver the request data.
+ */
+static int rxperf_deliver_request(struct rxperf_call *call)
+{
+	int ret;
+
+	switch (call->unmarshal) {
+	case 0:
+		call->kvec[0].iov_len	= call->iov_len;
+		call->kvec[0].iov_base	= call->tmp;
+		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
+		call->unmarshal++;
+		fallthrough;
+	case 1:
+		ret = rxperf_extract_data(call, true);
+		if (ret < 0)
+			return ret;
+
+		switch (call->operation_id) {
+		case RX_PERF_SEND:
+			call->type = "send";
+			call->req_len	= ntohl(call->tmp[0]);
+			call->reply_len	= 0;
+			break;
+		case RX_PERF_RECV:
+			call->type = "recv";
+			call->req_len = 0;
+			call->reply_len	= ntohl(call->tmp[0]);
+			break;
+		case RX_PERF_RPC:
+			call->type = "rpc";
+			call->req_len	= ntohl(call->tmp[0]);
+			call->reply_len	= ntohl(call->tmp[1]);
+			break;
+		default:
+			pr_info("Can't parse extra params\n");
+			return -EIO;
+		}
+
+		pr_debug("CALL op=%s rq=%zx rp=%zx\n",
+			 call->type, call->req_len, call->reply_len);
+
+		call->iov_len = call->req_len;
+		iov_iter_discard(&call->iter, READ, call->req_len);
+		call->unmarshal++;
+		fallthrough;
+	case 2:
+		ret = rxperf_extract_data(call, false);
+		if (ret < 0)
+			return ret;
+		call->unmarshal++;
+		fallthrough;
+	default:
+		return 0;
+	}
+}
+
+/*
+ * Process a call for which we've received the request.
+ */
+static int rxperf_process_call(struct rxperf_call *call)
+{
+	struct msghdr msg = {};
+	struct bio_vec bv[1];
+	struct kvec iov[1];
+	ssize_t n;
+	size_t reply_len = call->reply_len, len;
+
+	rxrpc_kernel_set_tx_length(rxperf_socket, call->rxcall,
+				   reply_len + sizeof(rxperf_magic_cookie));
+
+	while (reply_len > 0) {
+		len = min(reply_len, PAGE_SIZE);
+		bv[0].bv_page	= ZERO_PAGE(0);
+		bv[0].bv_offset	= 0;
+		bv[0].bv_len	= len;
+		iov_iter_bvec(&msg.msg_iter, WRITE, bv, 1, len);
+		msg.msg_flags = MSG_MORE;
+		n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg,
+					   len, rxperf_notify_end_reply_tx);
+		if (n < 0)
+			return n;
+		if (n == 0)
+			return -EIO;
+		reply_len -= n;
+	}
+
+	len = sizeof(rxperf_magic_cookie);
+	iov[0].iov_base	= (void *)rxperf_magic_cookie;
+	iov[0].iov_len	= len;
+	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
+	msg.msg_flags = 0;
+	n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg, len,
+				   rxperf_notify_end_reply_tx);
+	if (n >= 0)
+		return 0; /* Success */
+
+	if (n == -ENOMEM)
+		rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
+					RXGEN_SS_MARSHAL, -ENOMEM, "GOM");
+	return n;
+}
+
+/*
+ * Add a key to the security keyring.
+ */
+static int rxperf_add_key(struct key *keyring)
+{
+	key_ref_t kref;
+	int ret;
+
+	kref = key_create_or_update(make_key_ref(keyring, true),
+				    "rxrpc_s",
+				    __stringify(RX_PERF_SERVICE) ":2",
+				    secret,
+				    sizeof(secret),
+				    KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH
+				    | KEY_USR_VIEW,
+				    KEY_ALLOC_NOT_IN_QUOTA);
+
+	if (IS_ERR(kref)) {
+		pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref));
+		return PTR_ERR(kref);
+	}
+
+	ret = key_link(keyring, key_ref_to_ptr(kref));
+	if (ret < 0)
+		pr_err("Can't link rxperf server key: %d\n", ret);
+	key_ref_put(kref);
+	return ret;
+}
+
+/*
+ * Initialise the rxperf server.
+ */
+static int __init rxperf_init(void)
+{
+	struct key *keyring;
+	int ret = -ENOMEM;
+
+	pr_info("Server registering\n");
+
+	rxperf_workqueue = alloc_workqueue("rxperf", 0, 0);
+	if (!rxperf_workqueue)
+		goto error_workqueue;
+
+	keyring = keyring_alloc("rxperf_server",
+				GLOBAL_ROOT_UID, GLOBAL_ROOT_GID, current_cred(),
+				KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH |
+				KEY_POS_WRITE |
+				KEY_USR_VIEW | KEY_USR_READ | KEY_USR_SEARCH |
+				KEY_USR_WRITE |
+				KEY_OTH_VIEW | KEY_OTH_READ | KEY_OTH_SEARCH,
+				KEY_ALLOC_NOT_IN_QUOTA,
+				NULL, NULL);
+	if (IS_ERR(keyring)) {
+		pr_err("Can't allocate rxperf server keyring: %ld\n",
+		       PTR_ERR(keyring));
+		goto error_keyring;
+	}
+	rxperf_sec_keyring = keyring;
+	ret = rxperf_add_key(keyring);
+	if (ret < 0)
+		goto error_key;
+
+	ret = rxperf_open_socket();
+	if (ret < 0)
+		goto error_socket;
+	return 0;
+
+error_socket:
+error_key:
+	key_put(rxperf_sec_keyring);
+error_keyring:
+	destroy_workqueue(rxperf_workqueue);
+	rcu_barrier();
+error_workqueue:
+	pr_err("Failed to register: %d\n", ret);
+	return ret;
+}
+late_initcall(rxperf_init); /* Must be called after net/ to create socket */
+
+static void __exit rxperf_exit(void)
+{
+	pr_info("Server unregistering.\n");
+
+	rxperf_close_socket();
+	key_put(rxperf_sec_keyring);
+	destroy_workqueue(rxperf_workqueue);
+	rcu_barrier();
+}
+module_exit(rxperf_exit);
diff --git a/net/rxrpc/server_key.c b/net/rxrpc/server_key.c
index ee269e0e6ee8..e51940589ee5 100644
--- a/net/rxrpc/server_key.c
+++ b/net/rxrpc/server_key.c
@@ -144,3 +144,28 @@  int rxrpc_server_keyring(struct rxrpc_sock *rx, sockptr_t optval, int optlen)
 	_leave(" = 0 [key %x]", key->serial);
 	return 0;
 }
+
+/**
+ * rxrpc_sock_set_security_keyring - Set the security keyring for a kernel service
+ * @sk: The socket to set the keyring on
+ * @keyring: The keyring to set
+ *
+ * Set the server security keyring on an rxrpc socket.  This is used to provide
+ * the encryption keys for a kernel service.
+ */
+int rxrpc_sock_set_security_keyring(struct sock *sk, struct key *keyring)
+{
+	struct rxrpc_sock *rx = rxrpc_sk(sk);
+	int ret = 0;
+
+	lock_sock(sk);
+	if (rx->securities)
+		ret = -EINVAL;
+	else if (rx->sk.sk_state != RXRPC_UNBOUND)
+		ret = -EISCONN;
+	else
+		rx->securities = key_get(keyring);
+	release_sock(sk);
+	return ret;
+}
+EXPORT_SYMBOL(rxrpc_sock_set_security_keyring);