@@ -188,6 +188,19 @@ void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
return ret;
}
+/*
+ * io_zctap_buffer - select the next buffer from a buffer group.
+ * @req: request whose ->ctx and ->buf_index (buffer group id) pick the list
+ * @len: passed through to io_ring_buffer_select()
+ *
+ * Called from the network driver, in napi context.
+ *
+ * Returns the selected buffer address as a u64, or 0 when the buffer group
+ * is not found or io_ring_buffer_select() returns NULL.
+ */
+u64 io_zctap_buffer(struct io_kiocb *req, size_t *len)
+{
+	struct io_ring_ctx *ctx = req->ctx;
+	struct io_buffer_list *bl;
+	void __user *ret = NULL;
+
+	bl = io_buffer_get_list(ctx, req->buf_index);
+	if (likely(bl))
+		ret = io_ring_buffer_select(req, len, bl, IO_URING_F_UNLOCKED);
+
+	/* Cast via unsigned long so pointer and integer widths match on 32-bit. */
+	return (u64)(unsigned long)ret;
+}
+
static __cold int io_init_bl_list(struct io_ring_ctx *ctx)
{
int i;
@@ -50,6 +50,8 @@ unsigned int __io_put_kbuf(struct io_kiocb *req, unsigned issue_flags);
void io_kbuf_recycle_legacy(struct io_kiocb *req, unsigned issue_flags);
+u64 io_zctap_buffer(struct io_kiocb *req, size_t *len);
+
static inline void io_kbuf_recycle_ring(struct io_kiocb *req)
{
/*
@@ -24,6 +24,8 @@ struct ifq_region {
int nr_pages;
u16 id;
+ spinlock_t freelist_lock;
+
struct io_zctap_buf *buf;
u16 freelist[];
};
@@ -40,20 +42,146 @@ static u64 zctap_mk_page_info(u16 region_id, u16 pgid)
return (u64)0xface << 48 | (u64)region_id << 16 | (u64)pgid;
}
+/* Return the value of page_private() for @page, widened to u64. */
+static u64 zctap_page_info(const struct page *page)
+{
+	u64 info = page_private(page);
+
+	return info;
+}
+
+/* Page id of @page: the low 16 bits of its zctap page info word. */
+static u16 zctap_page_id(const struct page *page)
+{
+	u64 info = zctap_page_info(page);
+
+	return info & 0xffff;
+}
+
+/*
+ * buf->refcount is split in two: driver/kernel references live in the bits
+ * covered by IO_ZCTAP_KREF_MASK, and user references are accounted in
+ * units of IO_ZCTAP_UREF above them.
+ *
+ * driver bias cannot be larger than this
+ */
+#define IO_ZCTAP_UREF 0x10000
+#define IO_ZCTAP_KREF_MASK (IO_ZCTAP_UREF - 1)
+
+/*
+ * Give one IO_ZCTAP_UREF unit of user references back on @buf.
+ *
+ * Returns true when the subtraction brings the refcount to zero, i.e. the
+ * buffer is reusable, and false otherwise.  If the refcount holds less than
+ * IO_ZCTAP_UREF nothing is subtracted and a one-time warning is emitted.
+ */
+static bool io_zctap_put_buf_uref(struct io_zctap_buf *buf)
+{
+	if (atomic_read(&buf->refcount) >= IO_ZCTAP_UREF)
+		return atomic_sub_and_test(IO_ZCTAP_UREF, &buf->refcount);
+
+	WARN_ONCE(1, "uref botch: %x < %x, id:%d page:%px\n",
+		  atomic_read(&buf->refcount), IO_ZCTAP_UREF,
+		  zctap_page_id(buf->page), buf->page);
+	return false;
+}
+
+/*
+ * Gets a user-supplied buffer from the fill queue.
+ * @ifq:      supplies the ring ctx, the fill buffer group id and the region
+ * @buf_pgid: on success, set to the page id of the returned buffer
+ *
+ * Each fill queue entry carries a page id in bits 0-15 and a region id in
+ * bits 16-31.  An entry whose user reference drop does not bring the
+ * refcount to zero is skipped and the next entry is tried.
+ *
+ * Returns the buffer, or NULL when io_zctap_buffer() yields nothing or the
+ * entry names an invalid region or page id.
+ */
+static struct io_zctap_buf *io_zctap_get_buffer(struct io_zctap_ifq *ifq,
+						u16 *buf_pgid)
+{
+	struct io_zctap_buf *buf;
+	struct ifq_region *ifr;
+	struct io_kiocb req;
+	int pgid, region_id;
+	size_t len = 0;
+	u64 addr;
+
+	ifr = ifq->region;
+retry:
+	req = (struct io_kiocb) {
+		.ctx = ifq->ctx,
+		.buf_index = ifq->fill_bgid,
+	};
+	/* IN: uses buf_index as buffer group.
+	 * OUT: buf_index of actual buffer. (and req->buf_list set)
+	 *      (this comes from the user-supplied bufid)
+	 */
+	addr = io_zctap_buffer(&req, &len);
+	if (!addr)
+		return NULL;
+
+	pgid = addr & 0xffff;
+	region_id = (addr >> 16) & 0xffff;
+	if (region_id) {
+		WARN_RATELIMIT(1, "region_id %d > max 1", region_id);
+		return NULL;
+	}
+
+	/*
+	 * pgid is user-supplied and indexes ifr->buf[]: valid ids are
+	 * 0 .. nr_pages - 1, so nr_pages itself must be rejected too.
+	 */
+	if (pgid >= ifr->nr_pages) {
+		WARN_RATELIMIT(1, "bufid %d >= max %d", pgid, ifr->nr_pages);
+		return NULL;
+	}
+
+	buf = &ifr->buf[pgid];
+	if (!io_zctap_put_buf_uref(buf))
+		goto retry;
+
+	*buf_pgid = pgid;
+	return buf;
+}
+
+/*
+ * Push @buf's index within @ifr->buf back onto the region freelist, under
+ * freelist_lock.
+ *
+ * If on exit/teardown path, can skip this work.
+ */
+static void io_zctap_recycle_buf(struct ifq_region *ifr,
+				 struct io_zctap_buf *buf)
+{
+	u16 pgid;
+
+	spin_lock(&ifr->freelist_lock);
+	pgid = buf - ifr->buf;
+	ifr->freelist[ifr->free_count] = pgid;
+	ifr->free_count++;
+	spin_unlock(&ifr->freelist_lock);
+}
+
+/*
+ * Hand out a buffer with its refcount set to @refc (masked with
+ * IO_ZCTAP_KREF_MASK).  A buffer is popped from the region freelist when
+ * one is available; otherwise one is taken from the user fill queue.
+ * Warns if the chosen buffer's refcount is not zero.
+ *
+ * Returns NULL when neither source provides a buffer.
+ */
struct io_zctap_buf *io_zctap_get_buf(struct io_zctap_ifq *ifq, int refc)
{
- return NULL;
+	struct ifq_region *ifr = ifq->region;
+	struct io_zctap_buf *buf = NULL;
+	u16 pgid;
+
+	/* Fast path: reuse a buffer already sitting on the freelist. */
+	spin_lock(&ifr->freelist_lock);
+	if (ifr->free_count) {
+		ifr->free_count--;
+		pgid = ifr->freelist[ifr->free_count];
+		buf = &ifr->buf[pgid];
+	}
+	spin_unlock(&ifr->freelist_lock);
+
+	/* Freelist empty: fall back to the fill queue. */
+	if (!buf)
+		buf = io_zctap_get_buffer(ifq, &pgid);
+	if (!buf)
+		return NULL;
+
+	WARN_ON(atomic_read(&buf->refcount));
+	atomic_set(&buf->refcount, refc & IO_ZCTAP_KREF_MASK);
+
+	return buf;
}
EXPORT_SYMBOL(io_zctap_get_buf);
+/*
+ * Drop one reference on @buf.  When the refcount reaches zero the buffer
+ * is recycled onto the region freelist.
+ *
+ * Called from driver and networking stack.
+ */
void io_zctap_put_buf(struct io_zctap_ifq *ifq, struct io_zctap_buf *buf)
{
+	struct ifq_region *ifr = ifq->region;
+
+	/* XXX move to inline function later. */
+	if (atomic_dec_and_test(&buf->refcount))
+		io_zctap_recycle_buf(ifr, buf);
}
EXPORT_SYMBOL(io_zctap_put_buf);
+/*
+ * Drop @count references on @buf in one go.  Warns when the kernel part of
+ * the refcount (IO_ZCTAP_KREF_MASK bits) is smaller than @count.  When the
+ * refcount reaches zero the buffer is recycled onto the region freelist.
+ *
+ * Called from driver and networking stack.
+ */
void io_zctap_put_buf_refs(struct io_zctap_ifq *ifq, struct io_zctap_buf *buf,
unsigned count)
{
+	struct ifq_region *ifr = ifq->region;
+	unsigned refs = atomic_read(&buf->refcount) & IO_ZCTAP_KREF_MASK;
+
+	WARN(refs < count, "driver refcount botch: %u < %u\n", refs, count);
+
+	if (atomic_sub_and_test(count, &buf->refcount))
+		io_zctap_recycle_buf(ifr, buf);
}
EXPORT_SYMBOL(io_zctap_put_buf_refs);
@@ -176,6 +304,7 @@ int io_provide_ifq_region(struct io_zctap_ifq *ifq, u16 id)
return -ENOMEM;
}
+ spin_lock_init(&ifr->freelist_lock);
ifr->nr_pages = nr_pages;
ifr->imu = imu;
ifr->free_count = nr_pages;
Flesh out the driver API functions introduced earlier.

The driver gets a buffer with the specified reference count. If the
driver specifies a large refcount (bias), it decrements this as skb
fragments go up the stack, and the driver releases the references
when finished with the buffer.

When ownership of the fragment is transferred to the user, a user
refcount is incremented, and correspondingly decremented when returned.
When all refcounts are released, the buffer is safe to reuse.

The user/kernel split is needed to differentiate between "safe to
reuse the buffer" and "still in use by the kernel".

The locking here can likely be improved.

Signed-off-by: Jonathan Lemon <jonathan.lemon@gmail.com>
---
 io_uring/kbuf.c  |  13 +++++
 io_uring/kbuf.h  |   2 +
 io_uring/zctap.c | 131 ++++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 145 insertions(+), 1 deletion(-)