rpmsg: char: Use preallocated SKBs.

Message ID 20221206085008.25388-1-piotr.wojtaszczyk@timesys.com (mailing list archive)
State Superseded
Series rpmsg: char: Use preallocated SKBs.

Commit Message

Piotr Wojtaszczyk Dec. 6, 2022, 8:50 a.m. UTC
On message reception, copy the message into an SKB taken from a
preallocated pool instead of allocating a new SKB each time.
During high rpmsg traffic this noticeably reduces the CPU time consumed.

Signed-off-by: Piotr Wojtaszczyk <piotr.wojtaszczyk@timesys.com>
---
 drivers/rpmsg/rpmsg_char.c       | 58 ++++++++++++++++++++++++++++----
 drivers/rpmsg/rpmsg_internal.h   | 21 ++++++++++++
 drivers/rpmsg/virtio_rpmsg_bus.c | 21 ------------
 3 files changed, 72 insertions(+), 28 deletions(-)

Comments

Arnaud POULIQUEN Dec. 6, 2022, 12:54 p.m. UTC | #1
Hello Piotr

On 12/6/22 09:50, Piotr Wojtaszczyk wrote:
> On message reception, copy the message into an SKB taken from a
> preallocated pool instead of allocating a new SKB each time.
> During high rpmsg traffic this noticeably reduces the CPU time consumed.

Do you have any metrics to share?

> 
> Signed-off-by: Piotr Wojtaszczyk <piotr.wojtaszczyk@timesys.com>
> ---
>  drivers/rpmsg/rpmsg_char.c       | 58 ++++++++++++++++++++++++++++----
>  drivers/rpmsg/rpmsg_internal.h   | 21 ++++++++++++
>  drivers/rpmsg/virtio_rpmsg_bus.c | 21 ------------
>  3 files changed, 72 insertions(+), 28 deletions(-)
> 
> diff --git a/drivers/rpmsg/rpmsg_char.c b/drivers/rpmsg/rpmsg_char.c
> index ac50ed757765..76546ba72cdc 100644
> --- a/drivers/rpmsg/rpmsg_char.c
> +++ b/drivers/rpmsg/rpmsg_char.c
> @@ -75,9 +75,44 @@ struct rpmsg_eptdev {
>  
>  	spinlock_t queue_lock;
>  	struct sk_buff_head queue;
> +	struct sk_buff_head skb_pool;
>  	wait_queue_head_t readq;
>  };
>  
> +static inline
> +struct sk_buff *rpmsg_eptdev_get_skb(struct rpmsg_eptdev *eptdev)
> +{
> +	struct sk_buff *skb;
> +
> +	skb = skb_dequeue(&eptdev->skb_pool);
> +	if (!skb)
> +		skb = alloc_skb(MAX_RPMSG_BUF_SIZE, GFP_ATOMIC);

The "get_mtu" endpoint ops should be used here.
But in any case this works for the virtio backend which defines get_mtu ops
(asit define the MAX_RPMSG_BUF_SIZE), but not for other backend such as glink.
Your proposal needs to be compatible with the legacy.

Here is a proposal:

static struct sk_buff *
rpmsg_eptdev_get_skb(struct rpmsg_eptdev *eptdev, int len)
{
	struct sk_buff *skb;

	if (eptdev->ept->ops->get_mtu) {
		/* MTU known: fixed-size skbs can be recycled via the pool */
		skb = skb_dequeue(&eptdev->skb_pool);
		if (!skb)
			skb = alloc_skb(eptdev->ept->ops->get_mtu(eptdev->ept),
					GFP_ATOMIC);
	} else {
		/* No MTU: allocate per message, as the driver does today */
		skb = alloc_skb(len, GFP_ATOMIC);
	}

	return skb;
}
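
The callback would then pass the received length through; e.g. the call site
in the patch's rpmsg_ept_cb() would become (sketch):

	skb = rpmsg_eptdev_get_skb(eptdev, len);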

> +	return skb;
> +}
> +
> +static inline
> +void rpmsg_eptdev_put_skb(struct rpmsg_eptdev *eptdev, struct sk_buff *skb)
> +{
> +	/* Recycle the skb */
> +	skb->tail = 0;
> +	skb->len = 0;
> +	skb_queue_head(&eptdev->skb_pool, skb);
> +}
> +
> +static void rpmsg_eptdev_free_all_skb(struct rpmsg_eptdev *eptdev)
> +{
> +	struct sk_buff *skb;
> +
> +	while (!skb_queue_empty(&eptdev->queue)) {
> +		skb = skb_dequeue(&eptdev->queue);
> +		kfree_skb(skb);
> +	}
> +	while (!skb_queue_empty(&eptdev->skb_pool)) {
> +		skb = skb_dequeue(&eptdev->skb_pool);
> +		kfree_skb(skb);
> +	}
> +}
> +
>  static int rpmsg_eptdev_destroy(struct device *dev, void *data)
>  {
>  	struct rpmsg_eptdev *eptdev = dev_to_eptdev(dev);
> @@ -104,7 +139,7 @@ static int rpmsg_ept_cb(struct rpmsg_device *rpdev, void *buf, int len,
>  	struct rpmsg_eptdev *eptdev = priv;
>  	struct sk_buff *skb;
>  
> -	skb = alloc_skb(len, GFP_ATOMIC);
> +	skb = rpmsg_eptdev_get_skb(eptdev);
>  	if (!skb)
>  		return -ENOMEM;
>  
> @@ -126,6 +161,18 @@ static int rpmsg_eptdev_open(struct inode *inode, struct file *filp)
>  	struct rpmsg_endpoint *ept;
>  	struct rpmsg_device *rpdev = eptdev->rpdev;
>  	struct device *dev = &eptdev->dev;
> +	struct sk_buff *skb;
> +	int i;
> +
> +	/* Preallocate 8 SKBs */
> +	for (i = 0; i < 8; i++) {

Do you need to preallocate them?
At runtime it will try to reuse SKBs from the skb_pool and, if none is
available, allocate a new one.
This would also help to solve the issue of using MAX_RPMSG_BUF_SIZE.
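
For illustration, a minimal sketch of the lazy path (assuming the
get_mtu-based helper proposed above; not part of the patch):

	/*
	 * The pool starts empty: the first receptions fall through to
	 * alloc_skb(), and the pool only fills up when skbs are recycled
	 * by rpmsg_eptdev_put_skb() after a read.
	 */
	skb = skb_dequeue(&eptdev->skb_pool);	/* NULL while pool is empty */
	if (!skb)
		skb = alloc_skb(eptdev->ept->ops->get_mtu(eptdev->ept),
				GFP_ATOMIC);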

Regards,
Arnaud
> +		skb = rpmsg_eptdev_get_skb(eptdev);
> +		if (!skb) {
> +			rpmsg_eptdev_free_all_skb(eptdev);
> +			return -ENOMEM;
> +		}
> +		rpmsg_eptdev_put_skb(eptdev, skb);
> +	}
>  
>  	get_device(dev);
>  
> @@ -146,7 +193,6 @@ static int rpmsg_eptdev_release(struct inode *inode, struct file *filp)
>  {
>  	struct rpmsg_eptdev *eptdev = cdev_to_eptdev(inode->i_cdev);
>  	struct device *dev = &eptdev->dev;
> -	struct sk_buff *skb;
>  
>  	/* Close the endpoint, if it's not already destroyed by the parent */
>  	mutex_lock(&eptdev->ept_lock);
> @@ -157,10 +203,7 @@ static int rpmsg_eptdev_release(struct inode *inode, struct file *filp)
>  	mutex_unlock(&eptdev->ept_lock);
>  
>  	/* Discard all SKBs */
> -	while (!skb_queue_empty(&eptdev->queue)) {
> -		skb = skb_dequeue(&eptdev->queue);
> -		kfree_skb(skb);
> -	}
> +	rpmsg_eptdev_free_all_skb(eptdev);
>  
>  	put_device(dev);
>  
> @@ -209,7 +252,7 @@ static ssize_t rpmsg_eptdev_read_iter(struct kiocb *iocb, struct iov_iter *to)
>  	if (copy_to_iter(skb->data, use, to) != use)
>  		use = -EFAULT;
>  
> -	kfree_skb(skb);
> +	rpmsg_eptdev_put_skb(eptdev, skb);
>  
>  	return use;
>  }
> @@ -358,6 +401,7 @@ static int rpmsg_eptdev_create(struct rpmsg_ctrldev *ctrldev,
>  	mutex_init(&eptdev->ept_lock);
>  	spin_lock_init(&eptdev->queue_lock);
>  	skb_queue_head_init(&eptdev->queue);
> +	skb_queue_head_init(&eptdev->skb_pool);
>  	init_waitqueue_head(&eptdev->readq);
>  
>  	device_initialize(dev);
> diff --git a/drivers/rpmsg/rpmsg_internal.h b/drivers/rpmsg/rpmsg_internal.h
> index 3fc83cd50e98..5acaa54a277a 100644
> --- a/drivers/rpmsg/rpmsg_internal.h
> +++ b/drivers/rpmsg/rpmsg_internal.h
> @@ -15,6 +15,27 @@
>  #include <linux/rpmsg.h>
>  #include <linux/poll.h>
>  
> +/*
> + * We're allocating buffers of 512 bytes each for communications. The
> + * number of buffers will be computed from the number of buffers supported
> + * by the vring, upto a maximum of 512 buffers (256 in each direction).
> + *
> + * Each buffer will have 16 bytes for the msg header and 496 bytes for
> + * the payload.
> + *
> + * This will utilize a maximum total space of 256KB for the buffers.
> + *
> + * We might also want to add support for user-provided buffers in time.
> + * This will allow bigger buffer size flexibility, and can also be used
> + * to achieve zero-copy messaging.
> + *
> + * Note that these numbers are purely a decision of this driver - we
> + * can change this without changing anything in the firmware of the remote
> + * processor.
> + */
> +#define MAX_RPMSG_NUM_BUFS	(512)
> +#define MAX_RPMSG_BUF_SIZE	(512)
> +
>  #define to_rpmsg_device(d) container_of(d, struct rpmsg_device, dev)
>  #define to_rpmsg_driver(d) container_of(d, struct rpmsg_driver, drv)
>  
> diff --git a/drivers/rpmsg/virtio_rpmsg_bus.c b/drivers/rpmsg/virtio_rpmsg_bus.c
> index 3d9e442883e1..6552928a440d 100644
> --- a/drivers/rpmsg/virtio_rpmsg_bus.c
> +++ b/drivers/rpmsg/virtio_rpmsg_bus.c
> @@ -133,27 +133,6 @@ struct virtio_rpmsg_channel {
>  #define to_virtio_rpmsg_channel(_rpdev) \
>  	container_of(_rpdev, struct virtio_rpmsg_channel, rpdev)
>  
> -/*
> - * We're allocating buffers of 512 bytes each for communications. The
> - * number of buffers will be computed from the number of buffers supported
> - * by the vring, upto a maximum of 512 buffers (256 in each direction).
> - *
> - * Each buffer will have 16 bytes for the msg header and 496 bytes for
> - * the payload.
> - *
> - * This will utilize a maximum total space of 256KB for the buffers.
> - *
> - * We might also want to add support for user-provided buffers in time.
> - * This will allow bigger buffer size flexibility, and can also be used
> - * to achieve zero-copy messaging.
> - *
> - * Note that these numbers are purely a decision of this driver - we
> - * can change this without changing anything in the firmware of the remote
> - * processor.
> - */
> -#define MAX_RPMSG_NUM_BUFS	(512)
> -#define MAX_RPMSG_BUF_SIZE	(512)
> -
>  /*
>   * Local addresses are dynamically allocated on-demand.
>   * We do not dynamically assign addresses from the low 1024 range,
Arnaud POULIQUEN Dec. 7, 2022, 8:59 a.m. UTC | #2
On 12/6/22 15:40, Piotr Wojtaszczyk wrote:
> Hi Arnaud,
> 
> On Tue, Dec 6, 2022 at 1:54 PM Arnaud POULIQUEN <arnaud.pouliquen@foss.st.com> wrote:
>> On 12/6/22 09:50, Piotr Wojtaszczyk wrote:
>> > On message reception, copy the message into an SKB taken from a
>> > preallocated pool instead of allocating a new SKB each time.
>> > During high rpmsg traffic this noticeably reduces the CPU time consumed.
>>
>> Do you have any metrics to share?
> Tested on a 1 GHz single-core ARM Cortex-A55 (64-bit) with the virtio backend.
> Ping-pong message pairs (receive + send) every 125 us reduced CPU load from
> 7% to 6%.
> 
>> > +static inline
>> > +struct sk_buff *rpmsg_eptdev_get_skb(struct rpmsg_eptdev *eptdev)
>> > +{
>> > +     struct sk_buff *skb;
>> > +
>> > +     skb = skb_dequeue(&eptdev->skb_pool);
>> > +     if (!skb)
>> > +             skb = alloc_skb(MAX_RPMSG_BUF_SIZE, GFP_ATOMIC);
>>
>> The "get_mtu" endpoint ops should be used here.
>> But in any case this works for the virtio backend which defines get_mtu ops
>> (asit define the MAX_RPMSG_BUF_SIZE), but not for other backend such as glink.
>> Your proposal needs to be compatible with the legacy.
>>
>> Here is a proposal:
>>
>> static struct sk_buff *
>> rpmsg_eptdev_get_skb(struct rpmsg_eptdev *eptdev, int len)
>> {
>>         struct sk_buff *skb;
>>
>>         if (eptdev->ept->ops->get_mtu) {
>>                 /* MTU known: fixed-size skbs can be recycled via the pool */
>>                 skb = skb_dequeue(&eptdev->skb_pool);
>>                 if (!skb)
>>                         skb = alloc_skb(eptdev->ept->ops->get_mtu(eptdev->ept),
>>                                         GFP_ATOMIC);
>>         } else {
>>                 /* No MTU: allocate per message, as the driver does today */
>>                 skb = alloc_skb(len, GFP_ATOMIC);
>>         }
>>
>>         return skb;
>> }
> Received messages can have different lengths; if we try to reuse an skb that
> was previously allocated for a smaller message, isn't that a problem?
> I went for the worst-case scenario in the virtio backend.

The get_mtu ops gives you the maximum transmission unit, which should be
>= len, but some checks can be added.
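
For example, a minimal sketch of such a check at the top of rpmsg_ept_cb()
(hypothetical, not part of the patch):

	/* A received message should never exceed the endpoint MTU */
	if (eptdev->ept->ops->get_mtu &&
	    len > eptdev->ept->ops->get_mtu(eptdev->ept))
		return -EMSGSIZE;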

Regards,
Arnaud
> 
> 
>> > @@ -126,6 +161,18 @@ static int rpmsg_eptdev_open(struct inode *inode, struct file *filp)
>> >       struct rpmsg_endpoint *ept;
>> >       struct rpmsg_device *rpdev = eptdev->rpdev;
>> >       struct device *dev = &eptdev->dev;
>> > +     struct sk_buff *skb;
>> > +     int i;
>> > +
>> > +     /* Preallocate 8 SKBs */
>> > +     for (i = 0; i < 8; i++) {
>>
>> Do you need to preallocate them?
>> At runtime it will try to reuse SKBs from the skb_pool and, if none is
>> available, allocate a new one.
>> This would also help to solve the issue of using MAX_RPMSG_BUF_SIZE.
> Agree, we can allocate SKBs at run time if needed. I thought it would be
> better to start with some SKBs, but now I think it's overkill.
> 
> 
> -- 
> Piotr Wojtaszczyk
> Timesys

Patch

diff --git a/drivers/rpmsg/rpmsg_char.c b/drivers/rpmsg/rpmsg_char.c
index ac50ed757765..76546ba72cdc 100644
--- a/drivers/rpmsg/rpmsg_char.c
+++ b/drivers/rpmsg/rpmsg_char.c
@@ -75,9 +75,44 @@ struct rpmsg_eptdev {
 
 	spinlock_t queue_lock;
 	struct sk_buff_head queue;
+	struct sk_buff_head skb_pool;
 	wait_queue_head_t readq;
 };
 
+static inline
+struct sk_buff *rpmsg_eptdev_get_skb(struct rpmsg_eptdev *eptdev)
+{
+	struct sk_buff *skb;
+
+	skb = skb_dequeue(&eptdev->skb_pool);
+	if (!skb)
+		skb = alloc_skb(MAX_RPMSG_BUF_SIZE, GFP_ATOMIC);
+	return skb;
+}
+
+static inline
+void rpmsg_eptdev_put_skb(struct rpmsg_eptdev *eptdev, struct sk_buff *skb)
+{
+	/* Recycle the skb */
+	skb->tail = 0;
+	skb->len = 0;
+	skb_queue_head(&eptdev->skb_pool, skb);
+}
+
+static void rpmsg_eptdev_free_all_skb(struct rpmsg_eptdev *eptdev)
+{
+	struct sk_buff *skb;
+
+	while (!skb_queue_empty(&eptdev->queue)) {
+		skb = skb_dequeue(&eptdev->queue);
+		kfree_skb(skb);
+	}
+	while (!skb_queue_empty(&eptdev->skb_pool)) {
+		skb = skb_dequeue(&eptdev->skb_pool);
+		kfree_skb(skb);
+	}
+}
+
 static int rpmsg_eptdev_destroy(struct device *dev, void *data)
 {
 	struct rpmsg_eptdev *eptdev = dev_to_eptdev(dev);
@@ -104,7 +139,7 @@ static int rpmsg_ept_cb(struct rpmsg_device *rpdev, void *buf, int len,
 	struct rpmsg_eptdev *eptdev = priv;
 	struct sk_buff *skb;
 
-	skb = alloc_skb(len, GFP_ATOMIC);
+	skb = rpmsg_eptdev_get_skb(eptdev);
 	if (!skb)
 		return -ENOMEM;
 
@@ -126,6 +161,18 @@ static int rpmsg_eptdev_open(struct inode *inode, struct file *filp)
 	struct rpmsg_endpoint *ept;
 	struct rpmsg_device *rpdev = eptdev->rpdev;
 	struct device *dev = &eptdev->dev;
+	struct sk_buff *skb;
+	int i;
+
+	/* Preallocate 8 SKBs */
+	for (i = 0; i < 8; i++) {
+		skb = rpmsg_eptdev_get_skb(eptdev);
+		if (!skb) {
+			rpmsg_eptdev_free_all_skb(eptdev);
+			return -ENOMEM;
+		}
+		rpmsg_eptdev_put_skb(eptdev, skb);
+	}
 
 	get_device(dev);
 
@@ -146,7 +193,6 @@ static int rpmsg_eptdev_release(struct inode *inode, struct file *filp)
 {
 	struct rpmsg_eptdev *eptdev = cdev_to_eptdev(inode->i_cdev);
 	struct device *dev = &eptdev->dev;
-	struct sk_buff *skb;
 
 	/* Close the endpoint, if it's not already destroyed by the parent */
 	mutex_lock(&eptdev->ept_lock);
@@ -157,10 +203,7 @@ static int rpmsg_eptdev_release(struct inode *inode, struct file *filp)
 	mutex_unlock(&eptdev->ept_lock);
 
 	/* Discard all SKBs */
-	while (!skb_queue_empty(&eptdev->queue)) {
-		skb = skb_dequeue(&eptdev->queue);
-		kfree_skb(skb);
-	}
+	rpmsg_eptdev_free_all_skb(eptdev);
 
 	put_device(dev);
 
@@ -209,7 +252,7 @@ static ssize_t rpmsg_eptdev_read_iter(struct kiocb *iocb, struct iov_iter *to)
 	if (copy_to_iter(skb->data, use, to) != use)
 		use = -EFAULT;
 
-	kfree_skb(skb);
+	rpmsg_eptdev_put_skb(eptdev, skb);
 
 	return use;
 }
@@ -358,6 +401,7 @@ static int rpmsg_eptdev_create(struct rpmsg_ctrldev *ctrldev,
 	mutex_init(&eptdev->ept_lock);
 	spin_lock_init(&eptdev->queue_lock);
 	skb_queue_head_init(&eptdev->queue);
+	skb_queue_head_init(&eptdev->skb_pool);
 	init_waitqueue_head(&eptdev->readq);
 
 	device_initialize(dev);
diff --git a/drivers/rpmsg/rpmsg_internal.h b/drivers/rpmsg/rpmsg_internal.h
index 3fc83cd50e98..5acaa54a277a 100644
--- a/drivers/rpmsg/rpmsg_internal.h
+++ b/drivers/rpmsg/rpmsg_internal.h
@@ -15,6 +15,27 @@
 #include <linux/rpmsg.h>
 #include <linux/poll.h>
 
+/*
+ * We're allocating buffers of 512 bytes each for communications. The
+ * number of buffers will be computed from the number of buffers supported
+ * by the vring, upto a maximum of 512 buffers (256 in each direction).
+ *
+ * Each buffer will have 16 bytes for the msg header and 496 bytes for
+ * the payload.
+ *
+ * This will utilize a maximum total space of 256KB for the buffers.
+ *
+ * We might also want to add support for user-provided buffers in time.
+ * This will allow bigger buffer size flexibility, and can also be used
+ * to achieve zero-copy messaging.
+ *
+ * Note that these numbers are purely a decision of this driver - we
+ * can change this without changing anything in the firmware of the remote
+ * processor.
+ */
+#define MAX_RPMSG_NUM_BUFS	(512)
+#define MAX_RPMSG_BUF_SIZE	(512)
+
 #define to_rpmsg_device(d) container_of(d, struct rpmsg_device, dev)
 #define to_rpmsg_driver(d) container_of(d, struct rpmsg_driver, drv)
 
diff --git a/drivers/rpmsg/virtio_rpmsg_bus.c b/drivers/rpmsg/virtio_rpmsg_bus.c
index 3d9e442883e1..6552928a440d 100644
--- a/drivers/rpmsg/virtio_rpmsg_bus.c
+++ b/drivers/rpmsg/virtio_rpmsg_bus.c
@@ -133,27 +133,6 @@ struct virtio_rpmsg_channel {
 #define to_virtio_rpmsg_channel(_rpdev) \
 	container_of(_rpdev, struct virtio_rpmsg_channel, rpdev)
 
-/*
- * We're allocating buffers of 512 bytes each for communications. The
- * number of buffers will be computed from the number of buffers supported
- * by the vring, upto a maximum of 512 buffers (256 in each direction).
- *
- * Each buffer will have 16 bytes for the msg header and 496 bytes for
- * the payload.
- *
- * This will utilize a maximum total space of 256KB for the buffers.
- *
- * We might also want to add support for user-provided buffers in time.
- * This will allow bigger buffer size flexibility, and can also be used
- * to achieve zero-copy messaging.
- *
- * Note that these numbers are purely a decision of this driver - we
- * can change this without changing anything in the firmware of the remote
- * processor.
- */
-#define MAX_RPMSG_NUM_BUFS	(512)
-#define MAX_RPMSG_BUF_SIZE	(512)
-
 /*
  * Local addresses are dynamically allocated on-demand.
  * We do not dynamically assign addresses from the low 1024 range,