
[net-next,1/4] net: xdp: introduce bulking for xdp tx return path

Message ID 7495b5ac96b0fd2bf5ab79b12e01bf0ee0fff803.1603824486.git.lorenzo@kernel.org (mailing list archive)
State Superseded
Delegated to: BPF
Series xdp: introduce bulking for page_pool tx return path

Commit Message

Lorenzo Bianconi Oct. 27, 2020, 7:04 p.m. UTC
Introduce bulking capability in the xdp tx return path (XDP_REDIRECT).
xdp_return_frame is usually run inside the driver NAPI tx completion
loop, so it is possible to batch it.
The current implementation considers only the page_pool memory model.
Convert the mvneta driver to the xdp_return_frame_bulk APIs.

Suggested-by: Jesper Dangaard Brouer <brouer@redhat.com>
Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
---
 drivers/net/ethernet/marvell/mvneta.c |  5 ++-
 include/net/xdp.h                     |  9 +++++
 net/core/xdp.c                        | 51 +++++++++++++++++++++++++++
 3 files changed, 64 insertions(+), 1 deletion(-)

Comments

Ilias Apalodimas Oct. 28, 2020, 9:27 a.m. UTC | #1
Hi Lorenzo,

On Tue, Oct 27, 2020 at 08:04:07PM +0100, Lorenzo Bianconi wrote:
> Introduce bulking capability in xdp tx return path (XDP_REDIRECT).
> xdp_return_frame is usually run inside the driver NAPI tx completion
> loop so it is possible batch it.
> Current implementation considers only page_pool memory model.
> Convert mvneta driver to xdp_return_frame_bulk APIs.
> 
> Suggested-by: Jesper Dangaard Brouer <brouer@redhat.com>
> Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
> ---
>  drivers/net/ethernet/marvell/mvneta.c |  5 ++-
>  include/net/xdp.h                     |  9 +++++
>  net/core/xdp.c                        | 51 +++++++++++++++++++++++++++
>  3 files changed, 64 insertions(+), 1 deletion(-)
> 
> diff --git a/drivers/net/ethernet/marvell/mvneta.c b/drivers/net/ethernet/marvell/mvneta.c
> index 54b0bf574c05..43ab8a73900e 100644
> --- a/drivers/net/ethernet/marvell/mvneta.c
> +++ b/drivers/net/ethernet/marvell/mvneta.c
> @@ -1834,8 +1834,10 @@ static void mvneta_txq_bufs_free(struct mvneta_port *pp,
>  				 struct netdev_queue *nq, bool napi)
>  {
>  	unsigned int bytes_compl = 0, pkts_compl = 0;
> +	struct xdp_frame_bulk bq;
>  	int i;
>  
> +	bq.xa = NULL;
>  	for (i = 0; i < num; i++) {
>  		struct mvneta_tx_buf *buf = &txq->buf[txq->txq_get_index];
>  		struct mvneta_tx_desc *tx_desc = txq->descs +
> @@ -1857,9 +1859,10 @@ static void mvneta_txq_bufs_free(struct mvneta_port *pp,
>  			if (napi && buf->type == MVNETA_TYPE_XDP_TX)
>  				xdp_return_frame_rx_napi(buf->xdpf);
>  			else
> -				xdp_return_frame(buf->xdpf);
> +				xdp_return_frame_bulk(buf->xdpf, &bq);
>  		}
>  	}
> +	xdp_flush_frame_bulk(&bq);
>  
>  	netdev_tx_completed_queue(nq, pkts_compl, bytes_compl);
>  }
> diff --git a/include/net/xdp.h b/include/net/xdp.h
> index 3814fb631d52..9567110845ef 100644
> --- a/include/net/xdp.h
> +++ b/include/net/xdp.h
> @@ -104,6 +104,12 @@ struct xdp_frame {
>  	struct net_device *dev_rx; /* used by cpumap */
>  };
>  
> +#define XDP_BULK_QUEUE_SIZE	16
> +struct xdp_frame_bulk {
> +	void *q[XDP_BULK_QUEUE_SIZE];
> +	int count;
> +	void *xa;
> +};
>  
>  static inline struct skb_shared_info *
>  xdp_get_shared_info_from_frame(struct xdp_frame *frame)
> @@ -194,6 +200,9 @@ struct xdp_frame *xdp_convert_buff_to_frame(struct xdp_buff *xdp)
>  void xdp_return_frame(struct xdp_frame *xdpf);
>  void xdp_return_frame_rx_napi(struct xdp_frame *xdpf);
>  void xdp_return_buff(struct xdp_buff *xdp);
> +void xdp_flush_frame_bulk(struct xdp_frame_bulk *bq);
> +void xdp_return_frame_bulk(struct xdp_frame *xdpf,
> +			   struct xdp_frame_bulk *bq);
>  
>  /* When sending xdp_frame into the network stack, then there is no
>   * return point callback, which is needed to release e.g. DMA-mapping
> diff --git a/net/core/xdp.c b/net/core/xdp.c
> index 48aba933a5a8..93eabd789246 100644
> --- a/net/core/xdp.c
> +++ b/net/core/xdp.c
> @@ -380,6 +380,57 @@ void xdp_return_frame_rx_napi(struct xdp_frame *xdpf)
>  }
>  EXPORT_SYMBOL_GPL(xdp_return_frame_rx_napi);
>  
> +void xdp_flush_frame_bulk(struct xdp_frame_bulk *bq)
> +{
> +	struct xdp_mem_allocator *xa = bq->xa;
> +	int i;
> +
> +	if (unlikely(!xa))
> +		return;
> +
> +	for (i = 0; i < bq->count; i++) {
> +		struct page *page = virt_to_head_page(bq->q[i]);
> +
> +		page_pool_put_full_page(xa->page_pool, page, false);
> +	}
> +	bq->count = 0;
> +}
> +EXPORT_SYMBOL_GPL(xdp_flush_frame_bulk);
> +
> +void xdp_return_frame_bulk(struct xdp_frame *xdpf,
> +			   struct xdp_frame_bulk *bq)
> +{
> +	struct xdp_mem_info *mem = &xdpf->mem;
> +	struct xdp_mem_allocator *xa, *nxa;
> +
> +	if (mem->type != MEM_TYPE_PAGE_POOL) {
> +		__xdp_return(xdpf->data, &xdpf->mem, false);
> +		return;
> +	}
> +
> +	rcu_read_lock();
> +
> +	xa = bq->xa;
> +	if (unlikely(!xa || mem->id != xa->mem.id)) {

Why is this marked as unlikely? The driver passes it in as NULL. Should
unlikely() cover both the !xa check and the mem.id comparison?

> +		nxa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);

Is there a chance nxa can be NULL?

> +		if (unlikely(!xa)) {

Same here, driver passes it as NULL

> +			bq->count = 0;
> +			bq->xa = nxa;
> +			xa = nxa;
> +		}
> +	}
> +
> +	if (mem->id != xa->mem.id || bq->count == XDP_BULK_QUEUE_SIZE)
> +		xdp_flush_frame_bulk(bq);
> +
> +	bq->q[bq->count++] = xdpf->data;
> +	if (mem->id != xa->mem.id)
> +		bq->xa = nxa;
> +
> +	rcu_read_unlock();
> +}
> +EXPORT_SYMBOL_GPL(xdp_return_frame_bulk);
> +
>  void xdp_return_buff(struct xdp_buff *xdp)
>  {
>  	__xdp_return(xdp->data, &xdp->rxq->mem, true);
> -- 
> 2.26.2
> 

Cheers
/Ilias
Lorenzo Bianconi Oct. 28, 2020, 10:23 a.m. UTC | #2
> Hi Lorenzo,

Hi Ilias,

thx for the review.

> 
> On Tue, Oct 27, 2020 at 08:04:07PM +0100, Lorenzo Bianconi wrote:

[...]

> > +void xdp_return_frame_bulk(struct xdp_frame *xdpf,
> > +			   struct xdp_frame_bulk *bq)
> > +{
> > +	struct xdp_mem_info *mem = &xdpf->mem;
> > +	struct xdp_mem_allocator *xa, *nxa;
> > +
> > +	if (mem->type != MEM_TYPE_PAGE_POOL) {
> > +		__xdp_return(xdpf->data, &xdpf->mem, false);
> > +		return;
> > +	}
> > +
> > +	rcu_read_lock();
> > +
> > +	xa = bq->xa;
> > +	if (unlikely(!xa || mem->id != xa->mem.id)) {
> 
> Why is this marked as unlikely? The driver passes it as NULL. Should unlikely be
> checked on both xa and the comparison?

xa is NULL only for the first xdp_frame in the burst while it is set for
subsequent ones. Do you think it is better to remove it?

> 
> > +		nxa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
> 
> Is there a chance nxa can be NULL?

I do not think so since the page_pool is not destroyed while there are
in-flight pages, right?

Regards,
Lorenzo

> 
> > +		if (unlikely(!xa)) {
> 
> Same here, driver passes it as NULL
> 
> > +			bq->count = 0;
> > +			bq->xa = nxa;
> > +			xa = nxa;
> > +		}
> > +	}
> > +
> > +	if (mem->id != xa->mem.id || bq->count == XDP_BULK_QUEUE_SIZE)
> > +		xdp_flush_frame_bulk(bq);
> > +
> > +	bq->q[bq->count++] = xdpf->data;
> > +	if (mem->id != xa->mem.id)
> > +		bq->xa = nxa;
> > +
> > +	rcu_read_unlock();
> > +}
> > +EXPORT_SYMBOL_GPL(xdp_return_frame_bulk);
> > +
> >  void xdp_return_buff(struct xdp_buff *xdp)
> >  {
> >  	__xdp_return(xdp->data, &xdp->rxq->mem, true);
> > -- 
> > 2.26.2
> > 
> 
> Cheers
> /Ilias
Ilias Apalodimas Oct. 28, 2020, 10:59 a.m. UTC | #3
On Wed, Oct 28, 2020 at 11:23:04AM +0100, Lorenzo Bianconi wrote:
> > Hi Lorenzo,
> 
> Hi Ilias,
> 
> thx for the review.
> 
> > 
> > On Tue, Oct 27, 2020 at 08:04:07PM +0100, Lorenzo Bianconi wrote:
> 
> [...]
> 
> > > +void xdp_return_frame_bulk(struct xdp_frame *xdpf,
> > > +			   struct xdp_frame_bulk *bq)
> > > +{
> > > +	struct xdp_mem_info *mem = &xdpf->mem;
> > > +	struct xdp_mem_allocator *xa, *nxa;
> > > +
> > > +	if (mem->type != MEM_TYPE_PAGE_POOL) {
> > > +		__xdp_return(xdpf->data, &xdpf->mem, false);
> > > +		return;
> > > +	}
> > > +
> > > +	rcu_read_lock();
> > > +
> > > +	xa = bq->xa;
> > > +	if (unlikely(!xa || mem->id != xa->mem.id)) {
> > 
> > Why is this marked as unlikely? The driver passes it as NULL. Should unlikely be
> > checked on both xa and the comparison?
> 
> xa is NULL only for the first xdp_frame in the burst while it is set for
> subsequent ones. Do you think it is better to remove it?

Ah correct, missed the general context of the driver this runs in.

> 
> > 
> > > +		nxa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
> > 
> > Is there a chance nxa can be NULL?
> 
> I do not think so since the page_pool is not destroyed while there are
> in-flight pages, right?

I think so, but I am not 100% sure. I'll apply the patch and have a closer look.

Cheers
/Ilias
Lorenzo Bianconi Oct. 28, 2020, 11:28 a.m. UTC | #4
> On Wed, Oct 28, 2020 at 11:23:04AM +0100, Lorenzo Bianconi wrote:
> > > Hi Lorenzo,
> > 
> > Hi Ilias,
> > 
> > thx for the review.
> > 
> > > 
> > > On Tue, Oct 27, 2020 at 08:04:07PM +0100, Lorenzo Bianconi wrote:
> > 
> > [...]
> > 
> > > > +void xdp_return_frame_bulk(struct xdp_frame *xdpf,
> > > > +			   struct xdp_frame_bulk *bq)
> > > > +{
> > > > +	struct xdp_mem_info *mem = &xdpf->mem;
> > > > +	struct xdp_mem_allocator *xa, *nxa;
> > > > +
> > > > +	if (mem->type != MEM_TYPE_PAGE_POOL) {
> > > > +		__xdp_return(xdpf->data, &xdpf->mem, false);
> > > > +		return;
> > > > +	}
> > > > +
> > > > +	rcu_read_lock();
> > > > +
> > > > +	xa = bq->xa;
> > > > +	if (unlikely(!xa || mem->id != xa->mem.id)) {
> > > 
> > > Why is this marked as unlikely? The driver passes it as NULL. Should unlikely be
> > > checked on both xa and the comparison?
> > 
> > xa is NULL only for the first xdp_frame in the burst while it is set for
> > subsequent ones. Do you think it is better to remove it?
> 
> Ah correct, missed the general context of the driver this runs in.
> 
> > 
> > > 
> > > > +		nxa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
> > > 
> > > Is there a chance nxa can be NULL?
> > 
> > I do not think so since the page_pool is not destroyed while there are
> > in-flight pages, right?
> 
> I think so but I am not 100% sure. I'll apply the patch and have a closer look

ack, thx. I converted the socionext driver to the bulking APIs but I have not
posted the patch since I have not been able to test it. The code is available here:

https://github.com/LorenzoBianconi/net-next/commit/88c2995bca051fa38860acf7b915c90768460d37

Regards,
Lorenzo

> 
> Cheers
> /Ilias
Jesper Dangaard Brouer Oct. 28, 2020, 11:34 a.m. UTC | #5
On Tue, 27 Oct 2020 20:04:07 +0100
Lorenzo Bianconi <lorenzo@kernel.org> wrote:

> Introduce bulking capability in xdp tx return path (XDP_REDIRECT).
> xdp_return_frame is usually run inside the driver NAPI tx completion
> loop so it is possible batch it.
> Current implementation considers only page_pool memory model.
> Convert mvneta driver to xdp_return_frame_bulk APIs.
> 
> Suggested-by: Jesper Dangaard Brouer <brouer@redhat.com>

I think you/we have to explain better in this commit message what the
idea/concept behind this bulk return is.  Or even explain this as a
comment above "xdp_return_frame_bulk".

Maybe add/append the text below to the commit message:

The bulk API introduced is a defer-and-flush API that will defer
the return if the xdp_mem_allocator object is the same, identified
via the mem.id field (xdp_mem_info).  Thus, the flush operation will
operate on the same xdp_mem_allocator object.

The bulk queue size of 16 is no coincidence.  This is connected to how
XDP redirect will bulk xmit (up to 16) frames. Thus, the idea is for the
API to find these boundaries (via mem.id match), which is optimal for
both the I-cache and D-cache for the memory allocator code and object.

The data structure (xdp_frame_bulk) used for deferred elements is
stored/allocated on the function call-stack, which allows lock-free
access.
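
To make this concrete, the driver-side usage boils down to the sketch
below (it mirrors the mvneta hunk in this patch; frames[] and num are
just placeholder names, not a real driver API):

	struct xdp_frame_bulk bq;
	int i;

	bq.xa = NULL;	/* empty bulk queue, no allocator cached yet */

	for (i = 0; i < num; i++) {
		/* Frames sharing the same mem.id are queued in bq; a mem.id
		 * change or a full queue triggers a flush to the page_pool.
		 */
		xdp_return_frame_bulk(frames[i], &bq);
	}

	/* Return whatever is still queued for the last allocator */
	xdp_flush_frame_bulk(&bq);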


> Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
> ---
[...]
> diff --git a/include/net/xdp.h b/include/net/xdp.h
> index 3814fb631d52..9567110845ef 100644
> --- a/include/net/xdp.h
> +++ b/include/net/xdp.h
> @@ -104,6 +104,12 @@ struct xdp_frame {
>  	struct net_device *dev_rx; /* used by cpumap */
>  };
>  
> +#define XDP_BULK_QUEUE_SIZE	16

Maybe "#define DEV_MAP_BULK_SIZE 16" should be def to
XDP_BULK_QUEUE_SIZE, to express the described connection.
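
Something like this (just a sketch of the idea, not part of this patch):

	/* tie the devmap xmit bulk size to the new XDP bulk queue size, so
	 * the connection between the two is explicit
	 */
	#define XDP_BULK_QUEUE_SIZE	16
	#define DEV_MAP_BULK_SIZE	XDP_BULK_QUEUE_SIZE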

> +struct xdp_frame_bulk {
> +	void *q[XDP_BULK_QUEUE_SIZE];
> +	int count;
> +	void *xa;

Just a hunch (not benchmarked), but I think it will be more optimal to
place 'count' and '*xa' above the '*q' array.  (It might not matter at
all, as we cannot control the start alignment, when this is on the
stack.)

> +};
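
Something like this reordering (untested, per the hunch above), keeping
the scalar fields ahead of the pointer array:

	struct xdp_frame_bulk {
		int count;
		void *xa;
		void *q[XDP_BULK_QUEUE_SIZE];
	};
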
[...]

> diff --git a/net/core/xdp.c b/net/core/xdp.c
> index 48aba933a5a8..93eabd789246 100644
> --- a/net/core/xdp.c
> +++ b/net/core/xdp.c
> @@ -380,6 +380,57 @@ void xdp_return_frame_rx_napi(struct xdp_frame *xdpf)
>  }
>  EXPORT_SYMBOL_GPL(xdp_return_frame_rx_napi);
>  
> +void xdp_flush_frame_bulk(struct xdp_frame_bulk *bq)
> +{
> +	struct xdp_mem_allocator *xa = bq->xa;
> +	int i;
> +
> +	if (unlikely(!xa))
> +		return;
> +
> +	for (i = 0; i < bq->count; i++) {
> +		struct page *page = virt_to_head_page(bq->q[i]);
> +
> +		page_pool_put_full_page(xa->page_pool, page, false);
> +	}
> +	bq->count = 0;
> +}
> +EXPORT_SYMBOL_GPL(xdp_flush_frame_bulk);
> +

Wondering if we should have a comment that explains the intent and idea
behind this function?

/* Defers return when frame belongs to same mem.id as previous frame */
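
Or a slightly fuller version (hypothetical wording) that also spells out
when the flush happens:

	/* Defer the return when the frame belongs to the same mem.id (i.e. the
	 * same xdp_mem_allocator/page_pool) as the previous frame; the queued
	 * frames are flushed when the allocator changes or the queue is full.
	 */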

> +void xdp_return_frame_bulk(struct xdp_frame *xdpf,
> +			   struct xdp_frame_bulk *bq)
> +{
> +	struct xdp_mem_info *mem = &xdpf->mem;
> +	struct xdp_mem_allocator *xa, *nxa;
> +
> +	if (mem->type != MEM_TYPE_PAGE_POOL) {
> +		__xdp_return(xdpf->data, &xdpf->mem, false);
> +		return;
> +	}
> +
> +	rcu_read_lock();
> +
> +	xa = bq->xa;
> +	if (unlikely(!xa || mem->id != xa->mem.id)) {
> +		nxa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
> +		if (unlikely(!xa)) {
> +			bq->count = 0;
> +			bq->xa = nxa;
> +			xa = nxa;
> +		}
> +	}
> +
> +	if (mem->id != xa->mem.id || bq->count == XDP_BULK_QUEUE_SIZE)
> +		xdp_flush_frame_bulk(bq);
> +
> +	bq->q[bq->count++] = xdpf->data;
> +	if (mem->id != xa->mem.id)
> +		bq->xa = nxa;
> +
> +	rcu_read_unlock();
> +}
> +EXPORT_SYMBOL_GPL(xdp_return_frame_bulk);
Lorenzo Bianconi Oct. 28, 2020, 11:43 a.m. UTC | #6
> On Tue, 27 Oct 2020 20:04:07 +0100
> Lorenzo Bianconi <lorenzo@kernel.org> wrote:
> 
> > Introduce bulking capability in xdp tx return path (XDP_REDIRECT).
> > xdp_return_frame is usually run inside the driver NAPI tx completion
> > loop so it is possible batch it.
> > Current implementation considers only page_pool memory model.
> > Convert mvneta driver to xdp_return_frame_bulk APIs.
> > 
> > Suggested-by: Jesper Dangaard Brouer <brouer@redhat.com>

Hi Jesper,

thx for the review.

> 
> I think you/we have to explain better in this commit message, what the
> idea/concept behind this bulk return is.  Or even explain this as a
> comment above "xdp_return_frame_bulk".
> 
> Maybe add/append text to commit below:
> 
> The bulk API introduced is a defer and flush API, that will defer
> the return if the xdp_mem_allocator object is the same, identified
> via the mem.id field (xdp_mem_info).  Thus, the flush operation will
> operate on the same xdp_mem_allocator object.
> 
> The bulk queue size of 16 is no coincident.  This is connected to how
> XDP redirect will bulk xmit (upto 16) frames. Thus, the idea is for the
> API to find these boundaries (via mem.id match), which is optimal for
> both the I-cache and D-cache for the memory allocator code and object.
> 
> The data structure (xdp_frame_bulk) used for deferred elements is
> stored/allocated on the function call-stack, which allows lockfree
> access.

ack, I will add it in v2

> 
> 
> > Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
> > ---
> [...]
> > diff --git a/include/net/xdp.h b/include/net/xdp.h
> > index 3814fb631d52..9567110845ef 100644
> > --- a/include/net/xdp.h
> > +++ b/include/net/xdp.h
> > @@ -104,6 +104,12 @@ struct xdp_frame {
> >  	struct net_device *dev_rx; /* used by cpumap */
> >  };
> >  
> > +#define XDP_BULK_QUEUE_SIZE	16
> 
> Maybe "#define DEV_MAP_BULK_SIZE 16" should be def to
> XDP_BULK_QUEUE_SIZE, to express the described connection.

ack, I guess we can fix it in a following patch

> 
> > +struct xdp_frame_bulk {
> > +	void *q[XDP_BULK_QUEUE_SIZE];
> > +	int count;
> > +	void *xa;
> 
> Just a hunch (not benchmarked), but I think it will be more optimal to
> place 'count' and '*xa' above the '*q' array.  (It might not matter at
> all, as we cannot control the start alignment, when this is on the
> stack.)

ack. I will fix in v2.

> 
> > +};
> [...]
> 
> > diff --git a/net/core/xdp.c b/net/core/xdp.c
> > index 48aba933a5a8..93eabd789246 100644
> > --- a/net/core/xdp.c
> > +++ b/net/core/xdp.c
> > @@ -380,6 +380,57 @@ void xdp_return_frame_rx_napi(struct xdp_frame *xdpf)
> >  }
> >  EXPORT_SYMBOL_GPL(xdp_return_frame_rx_napi);
> >  
> > +void xdp_flush_frame_bulk(struct xdp_frame_bulk *bq)
> > +{
> > +	struct xdp_mem_allocator *xa = bq->xa;
> > +	int i;
> > +
> > +	if (unlikely(!xa))
> > +		return;
> > +
> > +	for (i = 0; i < bq->count; i++) {
> > +		struct page *page = virt_to_head_page(bq->q[i]);
> > +
> > +		page_pool_put_full_page(xa->page_pool, page, false);
> > +	}
> > +	bq->count = 0;
> > +}
> > +EXPORT_SYMBOL_GPL(xdp_flush_frame_bulk);
> > +
> 
> Wondering if we should have a comment that explains the intent and idea
> behind this function?
> 
> /* Defers return when frame belongs to same mem.id as previous frame */
> 

ack.

Regards,
Lorenzo

> > +void xdp_return_frame_bulk(struct xdp_frame *xdpf,
> > +			   struct xdp_frame_bulk *bq)
> > +{
> > +	struct xdp_mem_info *mem = &xdpf->mem;
> > +	struct xdp_mem_allocator *xa, *nxa;
> > +
> > +	if (mem->type != MEM_TYPE_PAGE_POOL) {
> > +		__xdp_return(xdpf->data, &xdpf->mem, false);
> > +		return;
> > +	}
> > +
> > +	rcu_read_lock();
> > +
> > +	xa = bq->xa;
> > +	if (unlikely(!xa || mem->id != xa->mem.id)) {
> > +		nxa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
> > +		if (unlikely(!xa)) {
> > +			bq->count = 0;
> > +			bq->xa = nxa;
> > +			xa = nxa;
> > +		}
> > +	}
> > +
> > +	if (mem->id != xa->mem.id || bq->count == XDP_BULK_QUEUE_SIZE)
> > +		xdp_flush_frame_bulk(bq);
> > +
> > +	bq->q[bq->count++] = xdpf->data;
> > +	if (mem->id != xa->mem.id)
> > +		bq->xa = nxa;
> > +
> > +	rcu_read_unlock();
> > +}
> > +EXPORT_SYMBOL_GPL(xdp_return_frame_bulk);
> 
> 
> -- 
> Best regards,
>   Jesper Dangaard Brouer
>   MSc.CS, Principal Kernel Engineer at Red Hat
>   LinkedIn: http://www.linkedin.com/in/brouer
>
Jesper Dangaard Brouer Oct. 29, 2020, 1:52 p.m. UTC | #7
On Tue, 27 Oct 2020 20:04:07 +0100
Lorenzo Bianconi <lorenzo@kernel.org> wrote:

> diff --git a/net/core/xdp.c b/net/core/xdp.c
> index 48aba933a5a8..93eabd789246 100644
> --- a/net/core/xdp.c
> +++ b/net/core/xdp.c
> @@ -380,6 +380,57 @@ void xdp_return_frame_rx_napi(struct xdp_frame *xdpf)
>  }
>  EXPORT_SYMBOL_GPL(xdp_return_frame_rx_napi);
>  
> +void xdp_flush_frame_bulk(struct xdp_frame_bulk *bq)
> +{
> +	struct xdp_mem_allocator *xa = bq->xa;
> +	int i;
> +
> +	if (unlikely(!xa))
> +		return;
> +
> +	for (i = 0; i < bq->count; i++) {
> +		struct page *page = virt_to_head_page(bq->q[i]);
> +
> +		page_pool_put_full_page(xa->page_pool, page, false);
> +	}
> +	bq->count = 0;
> +}
> +EXPORT_SYMBOL_GPL(xdp_flush_frame_bulk);
> +
> +void xdp_return_frame_bulk(struct xdp_frame *xdpf,
> +			   struct xdp_frame_bulk *bq)
> +{
> +	struct xdp_mem_info *mem = &xdpf->mem;
> +	struct xdp_mem_allocator *xa, *nxa;
> +
> +	if (mem->type != MEM_TYPE_PAGE_POOL) {
> +		__xdp_return(xdpf->data, &xdpf->mem, false);
> +		return;
> +	}
> +
> +	rcu_read_lock();
> +
> +	xa = bq->xa;
> +	if (unlikely(!xa || mem->id != xa->mem.id)) {
> +		nxa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
> +		if (unlikely(!xa)) {
> +			bq->count = 0;
> +			bq->xa = nxa;
> +			xa = nxa;
> +		}
> +	}
> +
> +	if (mem->id != xa->mem.id || bq->count == XDP_BULK_QUEUE_SIZE)
> +		xdp_flush_frame_bulk(bq);
> +
> +	bq->q[bq->count++] = xdpf->data;
> +	if (mem->id != xa->mem.id)
> +		bq->xa = nxa;
> +
> +	rcu_read_unlock();
> +}
> +EXPORT_SYMBOL_GPL(xdp_return_frame_bulk);

We (Ilias, my co-maintainer, and I) think the above code is hard to read and
understand (as a reader you need to keep too many cases in your head).
I think we both have proposals to improve this, here is mine:

/* Defers return when frame belongs to same mem.id as previous frame */
void xdp_return_frame_bulk(struct xdp_frame *xdpf,
			   struct xdp_frame_bulk *bq)
{
	struct xdp_mem_info *mem = &xdpf->mem;
	struct xdp_mem_allocator *xa;

	if (mem->type != MEM_TYPE_PAGE_POOL) {
		__xdp_return(xdpf->data, &xdpf->mem, false);
		return;
	}

	rcu_read_lock();

	xa = bq->xa;
	if (unlikely(!xa)) {
		xa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
		bq->count = 0;
		bq->xa = xa;
	}

	if (bq->count == XDP_BULK_QUEUE_SIZE)
		xdp_flush_frame_bulk(bq);

	if (mem->id != xa->mem.id) {
		xdp_flush_frame_bulk(bq);
		bq->xa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
	}

	bq->q[bq->count++] = xdpf->data;

	rcu_read_unlock();
}

Please review for correctness, and also for readability.
Lorenzo Bianconi Oct. 29, 2020, 2:02 p.m. UTC | #8
> On Tue, 27 Oct 2020 20:04:07 +0100
> Lorenzo Bianconi <lorenzo@kernel.org> wrote:
> 
> > diff --git a/net/core/xdp.c b/net/core/xdp.c
> > index 48aba933a5a8..93eabd789246 100644
> > --- a/net/core/xdp.c
> > +++ b/net/core/xdp.c
> > @@ -380,6 +380,57 @@ void xdp_return_frame_rx_napi(struct xdp_frame *xdpf)
> >  }
> >  EXPORT_SYMBOL_GPL(xdp_return_frame_rx_napi);
> >  
> > +void xdp_flush_frame_bulk(struct xdp_frame_bulk *bq)
> > +{
> > +	struct xdp_mem_allocator *xa = bq->xa;
> > +	int i;
> > +
> > +	if (unlikely(!xa))
> > +		return;
> > +
> > +	for (i = 0; i < bq->count; i++) {
> > +		struct page *page = virt_to_head_page(bq->q[i]);
> > +
> > +		page_pool_put_full_page(xa->page_pool, page, false);
> > +	}
> > +	bq->count = 0;
> > +}
> > +EXPORT_SYMBOL_GPL(xdp_flush_frame_bulk);
> > +
> > +void xdp_return_frame_bulk(struct xdp_frame *xdpf,
> > +			   struct xdp_frame_bulk *bq)
> > +{
> > +	struct xdp_mem_info *mem = &xdpf->mem;
> > +	struct xdp_mem_allocator *xa, *nxa;
> > +
> > +	if (mem->type != MEM_TYPE_PAGE_POOL) {
> > +		__xdp_return(xdpf->data, &xdpf->mem, false);
> > +		return;
> > +	}
> > +
> > +	rcu_read_lock();
> > +
> > +	xa = bq->xa;
> > +	if (unlikely(!xa || mem->id != xa->mem.id)) {
> > +		nxa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
> > +		if (unlikely(!xa)) {
> > +			bq->count = 0;
> > +			bq->xa = nxa;
> > +			xa = nxa;
> > +		}
> > +	}
> > +
> > +	if (mem->id != xa->mem.id || bq->count == XDP_BULK_QUEUE_SIZE)
> > +		xdp_flush_frame_bulk(bq);
> > +
> > +	bq->q[bq->count++] = xdpf->data;
> > +	if (mem->id != xa->mem.id)
> > +		bq->xa = nxa;
> > +
> > +	rcu_read_unlock();
> > +}
> > +EXPORT_SYMBOL_GPL(xdp_return_frame_bulk);
> 
> We (Ilias my co-maintainer and I) think above code is hard to read and
> understand (as a reader you need to keep too many cases in your head).
> 
> I think we both have proposals to improve this, here is mine:
> 
> /* Defers return when frame belongs to same mem.id as previous frame */
> void xdp_return_frame_bulk(struct xdp_frame *xdpf,
>                            struct xdp_frame_bulk *bq)
> {
>         struct xdp_mem_info *mem = &xdpf->mem;
>         struct xdp_mem_allocator *xa;
> 
>         if (mem->type != MEM_TYPE_PAGE_POOL) {
>                 __xdp_return(xdpf->data, &xdpf->mem, false);
>                 return;
>         }
> 
>         rcu_read_lock();
> 
>         xa = bq->xa;
>         if (unlikely(!xa)) {
> 		xa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
>                 bq->count = 0;
>                 bq->xa = xa;
>         }
> 
>         if (bq->count == XDP_BULK_QUEUE_SIZE)
>                 xdp_flush_frame_bulk(bq);
> 
>         if (mem->id != xa->mem.id) {
> 		xdp_flush_frame_bulk(bq);
> 		bq->xa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
>         }
> 
> 	bq->q[bq->count++] = xdpf->data;
> 
>         rcu_read_unlock();
> }
> 
> Please review for correctness, and also for readability.

the code seems fine to me (and even easier to read :)).
I will update v2 using this approach. Thx.

Regards,
Lorenzo

> 
> -- 
> Best regards,
>   Jesper Dangaard Brouer
>   MSc.CS, Principal Kernel Engineer at Red Hat
>   LinkedIn: http://www.linkedin.com/in/brouer
>
Ilias Apalodimas Oct. 29, 2020, 2:08 p.m. UTC | #9
On Thu, Oct 29, 2020 at 03:02:16PM +0100, Lorenzo Bianconi wrote:
> > On Tue, 27 Oct 2020 20:04:07 +0100
> > Lorenzo Bianconi <lorenzo@kernel.org> wrote:
> > 
> > > diff --git a/net/core/xdp.c b/net/core/xdp.c
> > > index 48aba933a5a8..93eabd789246 100644
> > > --- a/net/core/xdp.c
> > > +++ b/net/core/xdp.c
> > > @@ -380,6 +380,57 @@ void xdp_return_frame_rx_napi(struct xdp_frame *xdpf)
> > >  }
> > >  EXPORT_SYMBOL_GPL(xdp_return_frame_rx_napi);
> > >  
> > > +void xdp_flush_frame_bulk(struct xdp_frame_bulk *bq)
> > > +{
> > > +	struct xdp_mem_allocator *xa = bq->xa;
> > > +	int i;
> > > +
> > > +	if (unlikely(!xa))
> > > +		return;
> > > +
> > > +	for (i = 0; i < bq->count; i++) {
> > > +		struct page *page = virt_to_head_page(bq->q[i]);
> > > +
> > > +		page_pool_put_full_page(xa->page_pool, page, false);
> > > +	}
> > > +	bq->count = 0;
> > > +}
> > > +EXPORT_SYMBOL_GPL(xdp_flush_frame_bulk);
> > > +
> > > +void xdp_return_frame_bulk(struct xdp_frame *xdpf,
> > > +			   struct xdp_frame_bulk *bq)
> > > +{
> > > +	struct xdp_mem_info *mem = &xdpf->mem;
> > > +	struct xdp_mem_allocator *xa, *nxa;
> > > +
> > > +	if (mem->type != MEM_TYPE_PAGE_POOL) {
> > > +		__xdp_return(xdpf->data, &xdpf->mem, false);
> > > +		return;
> > > +	}
> > > +
> > > +	rcu_read_lock();
> > > +
> > > +	xa = bq->xa;
> > > +	if (unlikely(!xa || mem->id != xa->mem.id)) {
> > > +		nxa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
> > > +		if (unlikely(!xa)) {
> > > +			bq->count = 0;
> > > +			bq->xa = nxa;
> > > +			xa = nxa;
> > > +		}
> > > +	}
> > > +
> > > +	if (mem->id != xa->mem.id || bq->count == XDP_BULK_QUEUE_SIZE)
> > > +		xdp_flush_frame_bulk(bq);
> > > +
> > > +	bq->q[bq->count++] = xdpf->data;
> > > +	if (mem->id != xa->mem.id)
> > > +		bq->xa = nxa;
> > > +
> > > +	rcu_read_unlock();
> > > +}
> > > +EXPORT_SYMBOL_GPL(xdp_return_frame_bulk);
> > 
> > We (Ilias my co-maintainer and I) think above code is hard to read and
> > understand (as a reader you need to keep too many cases in your head).
> > 
> > I think we both have proposals to improve this, here is mine:
> > 
> > /* Defers return when frame belongs to same mem.id as previous frame */
> > void xdp_return_frame_bulk(struct xdp_frame *xdpf,
> >                            struct xdp_frame_bulk *bq)
> > {
> >         struct xdp_mem_info *mem = &xdpf->mem;
> >         struct xdp_mem_allocator *xa;
> > 
> >         if (mem->type != MEM_TYPE_PAGE_POOL) {
> >                 __xdp_return(xdpf->data, &xdpf->mem, false);
> >                 return;
> >         }
> > 
> >         rcu_read_lock();
> > 
> >         xa = bq->xa;
> >         if (unlikely(!xa)) {
> > 		xa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
> >                 bq->count = 0;
> >                 bq->xa = xa;
> >         }
> > 
> >         if (bq->count == XDP_BULK_QUEUE_SIZE)
> >                 xdp_flush_frame_bulk(bq);
> > 
> >         if (mem->id != xa->mem.id) {
> > 		xdp_flush_frame_bulk(bq);
> > 		bq->xa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
> >         }
> > 
> > 	bq->q[bq->count++] = xdpf->data;
> > 
> >         rcu_read_unlock();
> > }
> > 
> > Please review for correctness, and also for readability.
> 
> the code seems fine to me (and even easier to read :)).
> I will update v2 using this approach. Thx.
+1, this is close to what we discussed this morning and it untangles one more
'weird' if case.


Thanks
/Ilias
> 
> Regards,
> Lorenzo
> 
> > 
> > -- 
> > Best regards,
> >   Jesper Dangaard Brouer
> >   MSc.CS, Principal Kernel Engineer at Red Hat
> >   LinkedIn: http://www.linkedin.com/in/brouer
> >

Patch

diff --git a/drivers/net/ethernet/marvell/mvneta.c b/drivers/net/ethernet/marvell/mvneta.c
index 54b0bf574c05..43ab8a73900e 100644
--- a/drivers/net/ethernet/marvell/mvneta.c
+++ b/drivers/net/ethernet/marvell/mvneta.c
@@ -1834,8 +1834,10 @@  static void mvneta_txq_bufs_free(struct mvneta_port *pp,
 				 struct netdev_queue *nq, bool napi)
 {
 	unsigned int bytes_compl = 0, pkts_compl = 0;
+	struct xdp_frame_bulk bq;
 	int i;
 
+	bq.xa = NULL;
 	for (i = 0; i < num; i++) {
 		struct mvneta_tx_buf *buf = &txq->buf[txq->txq_get_index];
 		struct mvneta_tx_desc *tx_desc = txq->descs +
@@ -1857,9 +1859,10 @@  static void mvneta_txq_bufs_free(struct mvneta_port *pp,
 			if (napi && buf->type == MVNETA_TYPE_XDP_TX)
 				xdp_return_frame_rx_napi(buf->xdpf);
 			else
-				xdp_return_frame(buf->xdpf);
+				xdp_return_frame_bulk(buf->xdpf, &bq);
 		}
 	}
+	xdp_flush_frame_bulk(&bq);
 
 	netdev_tx_completed_queue(nq, pkts_compl, bytes_compl);
 }
diff --git a/include/net/xdp.h b/include/net/xdp.h
index 3814fb631d52..9567110845ef 100644
--- a/include/net/xdp.h
+++ b/include/net/xdp.h
@@ -104,6 +104,12 @@  struct xdp_frame {
 	struct net_device *dev_rx; /* used by cpumap */
 };
 
+#define XDP_BULK_QUEUE_SIZE	16
+struct xdp_frame_bulk {
+	void *q[XDP_BULK_QUEUE_SIZE];
+	int count;
+	void *xa;
+};
 
 static inline struct skb_shared_info *
 xdp_get_shared_info_from_frame(struct xdp_frame *frame)
@@ -194,6 +200,9 @@  struct xdp_frame *xdp_convert_buff_to_frame(struct xdp_buff *xdp)
 void xdp_return_frame(struct xdp_frame *xdpf);
 void xdp_return_frame_rx_napi(struct xdp_frame *xdpf);
 void xdp_return_buff(struct xdp_buff *xdp);
+void xdp_flush_frame_bulk(struct xdp_frame_bulk *bq);
+void xdp_return_frame_bulk(struct xdp_frame *xdpf,
+			   struct xdp_frame_bulk *bq);
 
 /* When sending xdp_frame into the network stack, then there is no
  * return point callback, which is needed to release e.g. DMA-mapping
diff --git a/net/core/xdp.c b/net/core/xdp.c
index 48aba933a5a8..93eabd789246 100644
--- a/net/core/xdp.c
+++ b/net/core/xdp.c
@@ -380,6 +380,57 @@  void xdp_return_frame_rx_napi(struct xdp_frame *xdpf)
 }
 EXPORT_SYMBOL_GPL(xdp_return_frame_rx_napi);
 
+void xdp_flush_frame_bulk(struct xdp_frame_bulk *bq)
+{
+	struct xdp_mem_allocator *xa = bq->xa;
+	int i;
+
+	if (unlikely(!xa))
+		return;
+
+	for (i = 0; i < bq->count; i++) {
+		struct page *page = virt_to_head_page(bq->q[i]);
+
+		page_pool_put_full_page(xa->page_pool, page, false);
+	}
+	bq->count = 0;
+}
+EXPORT_SYMBOL_GPL(xdp_flush_frame_bulk);
+
+void xdp_return_frame_bulk(struct xdp_frame *xdpf,
+			   struct xdp_frame_bulk *bq)
+{
+	struct xdp_mem_info *mem = &xdpf->mem;
+	struct xdp_mem_allocator *xa, *nxa;
+
+	if (mem->type != MEM_TYPE_PAGE_POOL) {
+		__xdp_return(xdpf->data, &xdpf->mem, false);
+		return;
+	}
+
+	rcu_read_lock();
+
+	xa = bq->xa;
+	if (unlikely(!xa || mem->id != xa->mem.id)) {
+		nxa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params);
+		if (unlikely(!xa)) {
+			bq->count = 0;
+			bq->xa = nxa;
+			xa = nxa;
+		}
+	}
+
+	if (mem->id != xa->mem.id || bq->count == XDP_BULK_QUEUE_SIZE)
+		xdp_flush_frame_bulk(bq);
+
+	bq->q[bq->count++] = xdpf->data;
+	if (mem->id != xa->mem.id)
+		bq->xa = nxa;
+
+	rcu_read_unlock();
+}
+EXPORT_SYMBOL_GPL(xdp_return_frame_bulk);
+
 void xdp_return_buff(struct xdp_buff *xdp)
 {
 	__xdp_return(xdp->data, &xdp->rxq->mem, true);