Message ID | 1229970bf6f36fd4689169a2e47fdcc664d28366.1605020963.git.lorenzo@kernel.org (mailing list archive) |
---|---|
State | Superseded |
Delegated to: | Netdev Maintainers |
Series | xdp: introduce bulking for page_pool tx return path |

Context | Check | Description |
---|---|---|
netdev/cover_letter | success | Link |
netdev/fixes_present | fail | Series targets non-next tree, but doesn't contain any Fixes tags |
netdev/patch_count | success | Link |
netdev/tree_selection | success | Clearly marked for net |
netdev/subject_prefix | success | Link |
netdev/source_inline | success | Was 0 now: 0 |
netdev/verify_signedoff | success | Link |
netdev/module_param | success | Was 0 now: 0 |
netdev/build_32bit | success | Errors and warnings before: 28 this patch: 28 |
netdev/kdoc | success | Errors and warnings before: 0 this patch: 0 |
netdev/verify_fixes | success | Link |
netdev/checkpatch | success | total: 0 errors, 0 warnings, 0 checks, 157 lines checked |
netdev/build_allmodconfig_warn | success | Errors and warnings before: 34 this patch: 34 |
netdev/header_inline | success | Link |
netdev/stable | success | Stable not CCed |
Lorenzo Bianconi wrote:
> Introduce the capability to batch page_pool ptr_ring refill since it is
> usually run inside the driver NAPI tx completion loop.
>
> Suggested-by: Jesper Dangaard Brouer <brouer@redhat.com>
> Co-developed-by: Jesper Dangaard Brouer <brouer@redhat.com>
> Signed-off-by: Jesper Dangaard Brouer <brouer@redhat.com>
> Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
> ---
>  include/net/page_pool.h | 26 ++++++++++++++++
>  net/core/page_pool.c    | 69 +++++++++++++++++++++++++++++++++++------
>  net/core/xdp.c          |  9 ++----
>  3 files changed, 87 insertions(+), 17 deletions(-)

[...]

> +/* Caller must not use data area after call, as this function overwrites it */
> +void page_pool_put_page_bulk(struct page_pool *pool, void **data,
> +			     int count)
> +{
> +	int i, bulk_len = 0, pa_len = 0;
> +
> +	for (i = 0; i < count; i++) {
> +		struct page *page = virt_to_head_page(data[i]);
> +
> +		page = __page_pool_put_page(pool, page, -1, false);
> +		/* Approved for bulk recycling in ptr_ring cache */
> +		if (page)
> +			data[bulk_len++] = page;
> +	}
> +
> +	if (unlikely(!bulk_len))
> +		return;
> +
> +	/* Bulk producer into ptr_ring page_pool cache */
> +	page_pool_ring_lock(pool);
> +	for (i = 0; i < bulk_len; i++) {
> +		if (__ptr_ring_produce(&pool->ring, data[i]))
> +			data[pa_len++] = data[i];

How about bailing out on the first error? bulk_len should be less than
16 right, so should we really keep retrying hoping ring->size changes?

> +	}
> +	page_pool_ring_unlock(pool);
> +
> +	if (likely(!pa_len))
> +		return;
> +
> +	/* ptr_ring cache full, free pages outside producer lock since
> +	 * put_page() with refcnt == 1 can be an expensive operation
> +	 */
> +	for (i = 0; i < pa_len; i++)
> +		page_pool_return_page(pool, data[i]);
> +}
> +EXPORT_SYMBOL(page_pool_put_page_bulk);
> +

Otherwise LGTM.

> Lorenzo Bianconi wrote:
> > Introduce the capability to batch page_pool ptr_ring refill since it is
> > usually run inside the driver NAPI tx completion loop.
> >
> > Suggested-by: Jesper Dangaard Brouer <brouer@redhat.com>
> > Co-developed-by: Jesper Dangaard Brouer <brouer@redhat.com>
> > Signed-off-by: Jesper Dangaard Brouer <brouer@redhat.com>
> > Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
> > ---
> >  include/net/page_pool.h | 26 ++++++++++++++++
> >  net/core/page_pool.c    | 69 +++++++++++++++++++++++++++++++++++------
> >  net/core/xdp.c          |  9 ++----
> >  3 files changed, 87 insertions(+), 17 deletions(-)
>
> [...]
>
> > +/* Caller must not use data area after call, as this function overwrites it */
> > +void page_pool_put_page_bulk(struct page_pool *pool, void **data,
> > +			     int count)
> > +{
> > +	int i, bulk_len = 0, pa_len = 0;
> > +
> > +	for (i = 0; i < count; i++) {
> > +		struct page *page = virt_to_head_page(data[i]);
> > +
> > +		page = __page_pool_put_page(pool, page, -1, false);
> > +		/* Approved for bulk recycling in ptr_ring cache */
> > +		if (page)
> > +			data[bulk_len++] = page;
> > +	}
> > +
> > +	if (unlikely(!bulk_len))
> > +		return;
> > +
> > +	/* Bulk producer into ptr_ring page_pool cache */
> > +	page_pool_ring_lock(pool);
> > +	for (i = 0; i < bulk_len; i++) {
> > +		if (__ptr_ring_produce(&pool->ring, data[i]))
> > +			data[pa_len++] = data[i];
>
> How about bailing out on the first error? bulk_len should be less than
> 16 right, so should we really keep retrying hoping ring->size changes?

do you mean doing something like:

	page_pool_ring_lock(pool);
	if (__ptr_ring_full(&pool->ring)) {
		pa_len = bulk_len;
		page_pool_ring_unlock(pool);
		goto out;
	}
	...
out:
	for (i = 0; i < pa_len; i++) {
		...
	}

I do not know if it is better or not since the consumer can run in
parallel. @Jesper/Ilias: any idea?

Regards,
Lorenzo

>
> > +	}
> > +	page_pool_ring_unlock(pool);
> > +
> > +	if (likely(!pa_len))
> > +		return;
> > +
> > +	/* ptr_ring cache full, free pages outside producer lock since
> > +	 * put_page() with refcnt == 1 can be an expensive operation
> > +	 */
> > +	for (i = 0; i < pa_len; i++)
> > +		page_pool_return_page(pool, data[i]);
> > +}
> > +EXPORT_SYMBOL(page_pool_put_page_bulk);
> > +
>
> Otherwise LGTM.
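
The suggestion above, stopping at the first failed produce instead of retrying every remaining entry, can be tried out in userspace with a toy ring. This is only a sketch under stated assumptions: `toy_ring`, `toy_ring_produce()`, `toy_produce_bulk()` and `TOY_RING_SIZE` are hypothetical names, there is no locking and no consumer, and the only property borrowed from ptr_ring is that a produce fails while the slot at the producer index is still occupied.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_RING_SIZE 4	/* hypothetical capacity, kept small for the demo */

/* Toy stand-in for a pointer ring: a NULL slot means "free". */
struct toy_ring {
	void *slot[TOY_RING_SIZE];
	int producer;		/* index of the next slot to fill */
};

/* Returns 0 on success, -1 when the slot at the producer index is still
 * occupied, i.e. the ring is full from the producer's point of view.
 */
static int toy_ring_produce(struct toy_ring *r, void *ptr)
{
	if (r->slot[r->producer])
		return -1;
	r->slot[r->producer] = ptr;
	r->producer = (r->producer + 1) % TOY_RING_SIZE;
	return 0;
}

/* Queue data[0..len) and stop at the first failure.
 * Returns the index of the first entry that was NOT queued, so the
 * caller can release data[ret..len) itself once it has dropped the lock.
 */
static int toy_produce_bulk(struct toy_ring *r, void **data, int len)
{
	int i;

	assert(len >= 0);
	for (i = 0; i < len; i++) {
		if (toy_ring_produce(r, data[i]))
			break;	/* ring full: later attempts would fail too */
	}
	return i;
}
```

Because the loop index itself marks where the leftovers start, a second counter for failed entries is not needed in this variant. With an empty ring of four slots and six entries, `toy_produce_bulk()` returns 4 and the caller is left with entries 4 and 5 to free.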
On Wed, 11 Nov 2020 11:43:31 +0100
Lorenzo Bianconi <lorenzo@kernel.org> wrote:

> > Lorenzo Bianconi wrote:
> > > Introduce the capability to batch page_pool ptr_ring refill since it is
> > > usually run inside the driver NAPI tx completion loop.
> > >
> > > Suggested-by: Jesper Dangaard Brouer <brouer@redhat.com>
> > > Co-developed-by: Jesper Dangaard Brouer <brouer@redhat.com>
> > > Signed-off-by: Jesper Dangaard Brouer <brouer@redhat.com>
> > > Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
> > > ---
> > >  include/net/page_pool.h | 26 ++++++++++++++++
> > >  net/core/page_pool.c    | 69 +++++++++++++++++++++++++++++++++++------
> > >  net/core/xdp.c          |  9 ++----
> > >  3 files changed, 87 insertions(+), 17 deletions(-)
> >
> > [...]
> >
> > > +/* Caller must not use data area after call, as this function overwrites it */
> > > +void page_pool_put_page_bulk(struct page_pool *pool, void **data,
> > > +			     int count)
> > > +{
> > > +	int i, bulk_len = 0, pa_len = 0;
> > > +
> > > +	for (i = 0; i < count; i++) {
> > > +		struct page *page = virt_to_head_page(data[i]);
> > > +
> > > +		page = __page_pool_put_page(pool, page, -1, false);
> > > +		/* Approved for bulk recycling in ptr_ring cache */
> > > +		if (page)
> > > +			data[bulk_len++] = page;
> > > +	}
> > > +
> > > +	if (unlikely(!bulk_len))
> > > +		return;
> > > +
> > > +	/* Bulk producer into ptr_ring page_pool cache */
> > > +	page_pool_ring_lock(pool);
> > > +	for (i = 0; i < bulk_len; i++) {
> > > +		if (__ptr_ring_produce(&pool->ring, data[i]))
> > > +			data[pa_len++] = data[i];
> >
> > How about bailing out on the first error? bulk_len should be less than
> > 16 right, so should we really keep retrying hoping ring->size changes?
>
> do you mean doing something like:
>
> 	page_pool_ring_lock(pool);
> 	if (__ptr_ring_full(&pool->ring)) {
> 		pa_len = bulk_len;
> 		page_pool_ring_unlock(pool);
> 		goto out;
> 	}
> 	...
> out:
> 	for (i = 0; i < pa_len; i++) {
> 		...
> 	}

I think this is the change John is looking for:

diff --git a/net/core/page_pool.c b/net/core/page_pool.c
index a06606f07df0..3093fe4e1cd7 100644
--- a/net/core/page_pool.c
+++ b/net/core/page_pool.c
@@ -424,7 +424,7 @@ EXPORT_SYMBOL(page_pool_put_page);
 void page_pool_put_page_bulk(struct page_pool *pool, void **data,
 			     int count)
 {
-	int i, bulk_len = 0, pa_len = 0;
+	int i, bulk_len = 0;
 	bool order0 = (pool->p.order == 0);
 
 	for (i = 0; i < count; i++) {
@@ -448,17 +448,18 @@ void page_pool_put_page_bulk(struct page_pool *pool, void **data,
 	page_pool_ring_lock(pool);
 	for (i = 0; i < bulk_len; i++) {
 		if (__ptr_ring_produce(&pool->ring, data[i]))
-			data[pa_len++] = data[i];
+			break; /* ring_full */
 	}
 	page_pool_ring_unlock(pool);
 
-	if (likely(!pa_len))
+	/* Hopefully all pages was return into ptr_ring */
+	if (likely(i == bulk_len))
 		return;
 
-	/* ptr_ring cache full, free pages outside producer lock since
-	 * put_page() with refcnt == 1 can be an expensive operation
+	/* ptr_ring cache full, free remaining pages outside producer lock
+	 * since put_page() with refcnt == 1 can be an expensive operation
 	 */
-	for (i = 0; i < pa_len; i++)
+	for (; i < bulk_len; i++)
 		page_pool_return_page(pool, data[i]);
 }
 EXPORT_SYMBOL(page_pool_put_page_bulk);

> I do not know if it is better or not since the consumer can run in
> parallel. @Jesper/Ilias: any idea?

Currently it is not very likely that the consumer runs in parallel, but
it is possible. (As you experienced on your testlab with mlx5, the
DMA-TX completion did run on another CPU, but I asked you to
reconfigure this to have it run on the same CPU, as it is suboptimal.)
When we (finally) support this memory type for SKBs it will happen
more often.

But John is right: for ptr_ring we should exit as soon as the first
"produce" fails. I say this because I know how ptr_ring works
internally. The "consumer" frees slots in chunks of 16, so it is not
very likely that a slot opens up while the producer loop is running.

> >
> > > +	}
> > > +	page_pool_ring_unlock(pool);
> > > +
> > > +	if (likely(!pa_len))
> > > +		return;
> > > +
> > > +	/* ptr_ring cache full, free pages outside producer lock since
> > > +	 * put_page() with refcnt == 1 can be an expensive operation
> > > +	 */
> > > +	for (i = 0; i < pa_len; i++)
> > > +		page_pool_return_page(pool, data[i]);
> > > +}
> > > +EXPORT_SYMBOL(page_pool_put_page_bulk);
> > > +
> >
> > Otherwise LGTM.
diff --git a/include/net/page_pool.h b/include/net/page_pool.h
index 81d7773f96cd..b5b195305346 100644
--- a/include/net/page_pool.h
+++ b/include/net/page_pool.h
@@ -152,6 +152,8 @@ struct page_pool *page_pool_create(const struct page_pool_params *params);
 void page_pool_destroy(struct page_pool *pool);
 void page_pool_use_xdp_mem(struct page_pool *pool, void (*disconnect)(void *));
 void page_pool_release_page(struct page_pool *pool, struct page *page);
+void page_pool_put_page_bulk(struct page_pool *pool, void **data,
+			     int count);
 #else
 static inline void page_pool_destroy(struct page_pool *pool)
 {
@@ -165,6 +167,11 @@ static inline void page_pool_release_page(struct page_pool *pool,
 					  struct page *page)
 {
 }
+
+static inline void page_pool_put_page_bulk(struct page_pool *pool, void **data,
+					   int count)
+{
+}
 #endif
 
 void page_pool_put_page(struct page_pool *pool, struct page *page,
@@ -215,4 +222,23 @@ static inline void page_pool_nid_changed(struct page_pool *pool, int new_nid)
 	if (unlikely(pool->p.nid != new_nid))
 		page_pool_update_nid(pool, new_nid);
 }
+
+static inline void page_pool_ring_lock(struct page_pool *pool)
+	__acquires(&pool->ring.producer_lock)
+{
+	if (in_serving_softirq())
+		spin_lock(&pool->ring.producer_lock);
+	else
+		spin_lock_bh(&pool->ring.producer_lock);
+}
+
+static inline void page_pool_ring_unlock(struct page_pool *pool)
+	__releases(&pool->ring.producer_lock)
+{
+	if (in_serving_softirq())
+		spin_unlock(&pool->ring.producer_lock);
+	else
+		spin_unlock_bh(&pool->ring.producer_lock);
+}
+
 #endif /* _NET_PAGE_POOL_H */
diff --git a/net/core/page_pool.c b/net/core/page_pool.c
index ef98372facf6..a3e6051ac978 100644
--- a/net/core/page_pool.c
+++ b/net/core/page_pool.c
@@ -11,6 +11,8 @@
 #include <linux/device.h>
 
 #include <net/page_pool.h>
+#include <net/xdp.h>
+
 #include <linux/dma-direction.h>
 #include <linux/dma-mapping.h>
 #include <linux/page-flags.h>
@@ -362,8 +364,9 @@ static bool pool_page_reusable(struct page_pool *pool, struct page *page)
  * If the page refcnt != 1, then the page will be returned to memory
  * subsystem.
  */
-void page_pool_put_page(struct page_pool *pool, struct page *page,
-			unsigned int dma_sync_size, bool allow_direct)
+static __always_inline struct page *
+__page_pool_put_page(struct page_pool *pool, struct page *page,
+		     unsigned int dma_sync_size, bool allow_direct)
 {
 	/* This allocator is optimized for the XDP mode that uses
 	 * one-frame-per-page, but have fallbacks that act like the
@@ -379,15 +382,12 @@ void page_pool_put_page(struct page_pool *pool, struct page *page,
 			page_pool_dma_sync_for_device(pool, page,
 						      dma_sync_size);
 
-		if (allow_direct && in_serving_softirq())
-			if (page_pool_recycle_in_cache(page, pool))
-				return;
+		if (allow_direct && in_serving_softirq() &&
+		    page_pool_recycle_in_cache(page, pool))
+			return NULL;
 
-		if (!page_pool_recycle_in_ring(pool, page)) {
-			/* Cache full, fallback to free pages */
-			page_pool_return_page(pool, page);
-		}
-		return;
+		/* Page found as candidate for recycling */
+		return page;
 	}
 	/* Fallback/non-XDP mode: API user have elevated refcnt.
 	 *
@@ -405,9 +405,58 @@ void page_pool_put_page(struct page_pool *pool, struct page *page,
 	/* Do not replace this with page_pool_return_page() */
 	page_pool_release_page(pool, page);
 	put_page(page);
+
+	return NULL;
+}
+
+void page_pool_put_page(struct page_pool *pool, struct page *page,
+			unsigned int dma_sync_size, bool allow_direct)
+{
+	page = __page_pool_put_page(pool, page, dma_sync_size, allow_direct);
+	if (page && !page_pool_recycle_in_ring(pool, page)) {
+		/* Cache full, fallback to free pages */
+		page_pool_return_page(pool, page);
+	}
 }
 EXPORT_SYMBOL(page_pool_put_page);
 
+/* Caller must not use data area after call, as this function overwrites it */
+void page_pool_put_page_bulk(struct page_pool *pool, void **data,
+			     int count)
+{
+	int i, bulk_len = 0, pa_len = 0;
+
+	for (i = 0; i < count; i++) {
+		struct page *page = virt_to_head_page(data[i]);
+
+		page = __page_pool_put_page(pool, page, -1, false);
+		/* Approved for bulk recycling in ptr_ring cache */
+		if (page)
+			data[bulk_len++] = page;
+	}
+
+	if (unlikely(!bulk_len))
+		return;
+
+	/* Bulk producer into ptr_ring page_pool cache */
+	page_pool_ring_lock(pool);
+	for (i = 0; i < bulk_len; i++) {
+		if (__ptr_ring_produce(&pool->ring, data[i]))
+			data[pa_len++] = data[i];
+	}
+	page_pool_ring_unlock(pool);
+
+	if (likely(!pa_len))
+		return;
+
+	/* ptr_ring cache full, free pages outside producer lock since
+	 * put_page() with refcnt == 1 can be an expensive operation
+	 */
+	for (i = 0; i < pa_len; i++)
+		page_pool_return_page(pool, data[i]);
+}
+EXPORT_SYMBOL(page_pool_put_page_bulk);
+
 static void page_pool_empty_ring(struct page_pool *pool)
 {
 	struct page *page;
diff --git a/net/core/xdp.c b/net/core/xdp.c
index bbaee7fdd44f..3d330ebda893 100644
--- a/net/core/xdp.c
+++ b/net/core/xdp.c
@@ -393,16 +393,11 @@ EXPORT_SYMBOL_GPL(xdp_return_frame_rx_napi);
 void xdp_flush_frame_bulk(struct xdp_frame_bulk *bq)
 {
 	struct xdp_mem_allocator *xa = bq->xa;
-	int i;
 
-	if (unlikely(!xa))
+	if (unlikely(!xa || !bq->count))
 		return;
 
-	for (i = 0; i < bq->count; i++) {
-		struct page *page = virt_to_head_page(bq->q[i]);
-
-		page_pool_put_full_page(xa->page_pool, page, false);
-	}
+	page_pool_put_page_bulk(xa->page_pool, bq->q, bq->count);
 	/* bq->xa is not cleared to save lookup, if mem.id same in next bulk */
 	bq->count = 0;
 }
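
The comment "Caller must not use data area after call, as this function overwrites it" follows from the first loop of page_pool_put_page_bulk(), which compacts the recycling candidates to the front of the caller's array. A minimal userspace sketch of that in-place compaction, assuming a hypothetical `toy_filter()` in place of `__page_pool_put_page()` and plain `int` objects in place of pages:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the per-entry decision: return the pointer
 * to keep it as a candidate, or NULL when the entry was already dealt
 * with elsewhere. Here even values are kept, odd values are dropped.
 */
static void *toy_filter(void *ptr)
{
	int *v = ptr;

	return (*v % 2 == 0) ? ptr : NULL;
}

/* Compact the kept entries to the front of data[] and return how many
 * were kept. Slots at index >= the return value hold stale pointers.
 */
static int toy_compact(void **data, int count)
{
	int i, kept = 0;

	assert(count >= 0);
	for (i = 0; i < count; i++) {
		void *p = toy_filter(data[i]);

		if (p)
			data[kept++] = p;	/* kept <= i: never clobbers an unread slot */
	}
	return kept;
}
```

Since the write index never runs ahead of the read index, the array can be reused without a second buffer. In the patch the overwrite is stronger than in this sketch: the array comes in holding data addresses and goes out holding struct page pointers, so the caller cannot interpret its contents afterwards.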