@@ -240,6 +240,7 @@ enum io_uring_op {
IORING_OP_URING_CMD,
IORING_OP_SEND_ZC,
IORING_OP_SENDMSG_ZC,
+ IORING_OP_RECV_ZC,
/* this goes last, obviously */
IORING_OP_LAST,
@@ -70,6 +70,16 @@ struct io_sr_msg {
struct io_kiocb *notif;
};
+struct io_recvzc {
+ struct file *file;
+ unsigned len;
+ unsigned done_io;
+ unsigned msg_flags;
+ u16 flags;
+
+ u32 datalen;
+};
+
static inline bool io_check_multishot(struct io_kiocb *req,
unsigned int issue_flags)
{
@@ -588,7 +598,8 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
if (sr->msg_flags & MSG_ERRQUEUE)
req->flags |= REQ_F_CLEAR_POLLIN;
if (sr->flags & IORING_RECV_MULTISHOT) {
- if (!(req->flags & REQ_F_BUFFER_SELECT))
+ if (!(req->flags & REQ_F_BUFFER_SELECT)
+ && req->opcode != IORING_OP_RECV_ZC)
return -EINVAL;
if (sr->msg_flags & MSG_WAITALL)
return -EINVAL;
@@ -636,7 +647,7 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
unsigned int cflags;
cflags = io_put_kbuf(req, issue_flags);
- if (msg->msg_inq && msg->msg_inq != -1)
+ if (msg && msg->msg_inq && msg->msg_inq != -1)
cflags |= IORING_CQE_F_SOCK_NONEMPTY;
if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
@@ -651,7 +662,7 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
io_recv_prep_retry(req);
/* Known not-empty or unknown state, retry */
if (cflags & IORING_CQE_F_SOCK_NONEMPTY ||
- msg->msg_inq == -1)
+ (msg && msg->msg_inq == -1))
return false;
if (issue_flags & IO_URING_F_MULTISHOT)
*ret = IOU_ISSUE_SKIP_COMPLETE;
@@ -955,9 +966,8 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
return ret;
}
-static __maybe_unused
-struct io_zc_rx_ifq *io_zc_verify_sock(struct io_kiocb *req,
- struct socket *sock)
+static struct io_zc_rx_ifq *io_zc_verify_sock(struct io_kiocb *req,
+ struct socket *sock)
{
unsigned token = READ_ONCE(sock->zc_rx_idx);
unsigned ifq_idx = token >> IO_ZC_IFQ_IDX_OFFSET;
@@ -974,6 +984,106 @@ struct io_zc_rx_ifq *io_zc_verify_sock(struct io_kiocb *req,
return ifq;
}
+int io_recvzc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+ struct io_recvzc *zc = io_kiocb_to_cmd(req, struct io_recvzc);
+ u64 recvzc_cmd;
+
+ recvzc_cmd = READ_ONCE(sqe->addr3);
+ zc->datalen = recvzc_cmd >> 32;
+ if (recvzc_cmd & 0xffff)
+ return -EINVAL;
+ if (!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN))
+ return -EINVAL;
+ if (unlikely(sqe->file_index || sqe->addr2))
+ return -EINVAL;
+
+ zc->len = READ_ONCE(sqe->len);
+ zc->flags = READ_ONCE(sqe->ioprio);
+ if (zc->flags & ~(RECVMSG_FLAGS))
+ return -EINVAL;
+ zc->msg_flags = READ_ONCE(sqe->msg_flags);
+ if (zc->msg_flags & MSG_DONTWAIT)
+ req->flags |= REQ_F_NOWAIT;
+ if (zc->msg_flags & MSG_ERRQUEUE)
+ req->flags |= REQ_F_CLEAR_POLLIN;
+ if (zc->flags & IORING_RECV_MULTISHOT) {
+ if (zc->msg_flags & MSG_WAITALL)
+ return -EINVAL;
+ if (req->opcode == IORING_OP_RECV && zc->len)
+ return -EINVAL;
+ req->flags |= REQ_F_APOLL_MULTISHOT;
+ }
+
+#ifdef CONFIG_COMPAT
+ if (req->ctx->compat)
+ zc->msg_flags |= MSG_CMSG_COMPAT;
+#endif
+ zc->done_io = 0;
+ return 0;
+}
+
+int io_recvzc(struct io_kiocb *req, unsigned int issue_flags)
+{
+ struct io_recvzc *zc = io_kiocb_to_cmd(req, struct io_recvzc);
+ struct socket *sock;
+ unsigned flags;
+ int ret, min_ret = 0;
+ bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
+ struct io_zc_rx_ifq *ifq;
+
+ if (issue_flags & IO_URING_F_UNLOCKED)
+ return -EAGAIN;
+
+ if (!(req->flags & REQ_F_POLLED) &&
+ (zc->flags & IORING_RECVSEND_POLL_FIRST))
+ return -EAGAIN;
+
+ sock = sock_from_file(req->file);
+ if (unlikely(!sock))
+ return -ENOTSOCK;
+ ifq = io_zc_verify_sock(req, sock);
+ if (!ifq)
+ return -EINVAL;
+
+retry_multishot:
+ flags = zc->msg_flags;
+ if (force_nonblock)
+ flags |= MSG_DONTWAIT;
+ if (flags & MSG_WAITALL)
+ min_ret = zc->len;
+
+ ret = io_zc_rx_recv(sock, zc->datalen, flags);
+ if (ret < min_ret) {
+ if (ret == -EAGAIN && force_nonblock) {
+ if (issue_flags & IO_URING_F_MULTISHOT)
+ return IOU_ISSUE_SKIP_COMPLETE;
+ return -EAGAIN;
+ }
+ if (ret > 0 && io_net_retry(sock, flags)) {
+ zc->len -= ret;
+ zc->done_io += ret;
+ req->flags |= REQ_F_PARTIAL_IO;
+ return -EAGAIN;
+ }
+ if (ret == -ERESTARTSYS)
+ ret = -EINTR;
+ req_set_fail(req);
+ } else if ((flags & MSG_WAITALL) && (flags & (MSG_TRUNC | MSG_CTRUNC))) {
+ req_set_fail(req);
+ }
+
+ if (ret > 0)
+ ret += zc->done_io;
+ else if (zc->done_io)
+ ret = zc->done_io;
+
+ if (!io_recv_finish(req, &ret, 0, ret <= 0, issue_flags))
+ goto retry_multishot;
+
+ return ret;
+}
+
void io_send_zc_cleanup(struct io_kiocb *req)
{
struct io_sr_msg *zc = io_kiocb_to_cmd(req, struct io_sr_msg);
@@ -33,6 +33,7 @@
#include "poll.h"
#include "cancel.h"
#include "rw.h"
+#include "zc_rx.h"
static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags)
{
@@ -426,6 +427,18 @@ const struct io_issue_def io_issue_defs[] = {
.issue = io_sendmsg_zc,
#else
.prep = io_eopnotsupp_prep,
+#endif
+ },
+ [IORING_OP_RECV_ZC] = {
+ .needs_file = 1,
+ .unbound_nonreg_file = 1,
+ .pollin = 1,
+ .ioprio = 1,
+#if defined(CONFIG_NET)
+ .prep = io_recvzc_prep,
+ .issue = io_recvzc,
+#else
+ .prep = io_eopnotsupp_prep,
#endif
},
};
@@ -648,6 +661,9 @@ const struct io_cold_def io_cold_defs[] = {
.fail = io_sendrecv_fail,
#endif
},
+ [IORING_OP_RECV_ZC] = {
+ .name = "RECV_ZC",
+ },
};
const char *io_uring_get_opcode(u8 opcode)
@@ -6,6 +6,7 @@
#include <linux/io_uring.h>
#include <linux/netdevice.h>
#include <linux/nospec.h>
+#include <net/tcp.h>
#include <uapi/linux/io_uring.h>
@@ -40,6 +41,13 @@ struct io_zc_rx_pool {
u32 freelist[];
};
+static inline u32 io_zc_rx_cqring_entries(struct io_zc_rx_ifq *ifq)
+{
+ struct io_rbuf_ring *ring = ifq->ring;
+
+ return ifq->cached_cq_tail - READ_ONCE(ring->cq.head);
+}
+
static inline struct device *netdev2dev(struct net_device *dev)
{
return dev->dev.parent;
@@ -311,6 +319,8 @@ int io_register_zc_rx_ifq(struct io_ring_ctx *ctx,
size_t ring_sz, rqes_sz, cqes_sz;
int ret;
+ if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
+ return -EINVAL;
if (copy_from_user(®, arg, sizeof(reg)))
return -EFAULT;
if (ctx->ifq)
@@ -444,6 +454,14 @@ int io_register_zc_rx_sock(struct io_ring_ctx *ctx,
return 0;
}
+static void io_zc_rx_get_buf_uref(struct io_zc_rx_pool *pool, u32 pgid)
+{
+ if (WARN_ON(pgid >= pool->nr_pages))
+ return;
+
+ atomic_add(IO_ZC_RX_UREF, &pool->bufs[pgid].refcount);
+}
+
static bool io_zc_rx_put_buf_uref(struct io_zc_rx_buf *buf)
{
if (atomic_read(&buf->refcount) < IO_ZC_RX_UREF)
@@ -549,4 +567,225 @@ struct io_zc_rx_buf *io_zc_rx_buf_from_page(struct io_zc_rx_ifq *ifq,
}
EXPORT_SYMBOL(io_zc_rx_buf_from_page);
+static struct io_zc_rx_ifq *io_zc_rx_ifq_skb(struct sk_buff *skb)
+{
+ struct ubuf_info *uarg = skb_zcopy(skb);
+
+ if (uarg && uarg->callback == io_zc_rx_skb_free)
+ return container_of(uarg, struct io_zc_rx_ifq, uarg);
+ return NULL;
+}
+
+static int zc_rx_recv_frag(struct io_zc_rx_ifq *ifq, const skb_frag_t *frag,
+ int off, int len)
+{
+ struct io_uring_rbuf_cqe *cqe;
+ unsigned int cq_idx, queued, free, entries;
+ struct page *page;
+ unsigned int mask;
+ u32 pgid;
+
+ page = skb_frag_page(frag);
+ off += skb_frag_off(frag);
+
+ if (likely(ifq && is_zc_rx_page(page))) {
+ mask = ifq->cq_entries - 1;
+ pgid = page_private(page) & 0xffffffff;
+ io_zc_rx_get_buf_uref(ifq->pool, pgid);
+ cq_idx = ifq->cached_cq_tail & mask;
+ smp_rmb();
+ queued = min(io_zc_rx_cqring_entries(ifq), ifq->cq_entries);
+ free = ifq->cq_entries - queued;
+ entries = min(free, ifq->cq_entries - cq_idx);
+ if (!entries)
+ return -ENOBUFS;
+ cqe = &ifq->cqes[cq_idx];
+ ifq->cached_cq_tail++;
+ cqe->region = 0;
+ cqe->off = pgid * PAGE_SIZE + off;
+ cqe->len = len;
+ cqe->flags = 0;
+ } else {
+ /* TODO: copy frags that aren't backed by zc pages */
+ WARN_ON_ONCE(1);
+ return -ENOMEM;
+ }
+
+ return len;
+}
+
+static int
+zc_rx_recv_skb(read_descriptor_t *desc, struct sk_buff *skb,
+ unsigned int offset, size_t len)
+{
+ struct io_zc_rx_ifq *ifq;
+ struct sk_buff *frag_iter;
+ unsigned start, start_off;
+ int i, copy, end, off;
+ int ret = 0;
+
+ ifq = io_zc_rx_ifq_skb(skb);
+ if (!ifq) {
+ pr_debug("non zerocopy pages are not supported\n");
+ return -EFAULT;
+ }
+ start = skb_headlen(skb);
+ start_off = offset;
+
+ // TODO: copy payload in skb linear data */
+ WARN_ON_ONCE(offset < start);
+
+ for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
+ const skb_frag_t *frag;
+
+ WARN_ON(start > offset + len);
+
+ frag = &skb_shinfo(skb)->frags[i];
+ end = start + skb_frag_size(frag);
+
+ if (offset < end) {
+ copy = end - offset;
+ if (copy > len)
+ copy = len;
+
+ off = offset - start;
+ ret = zc_rx_recv_frag(ifq, frag, off, copy);
+ if (ret < 0)
+ goto out;
+
+ offset += ret;
+ len -= ret;
+ if (len == 0 || ret != copy)
+ goto out;
+ }
+ start = end;
+ }
+
+ skb_walk_frags(skb, frag_iter) {
+ WARN_ON(start > offset + len);
+
+ end = start + frag_iter->len;
+ if (offset < end) {
+ copy = end - offset;
+ if (copy > len)
+ copy = len;
+
+ off = offset - start;
+ ret = zc_rx_recv_skb(desc, frag_iter, off, copy);
+ if (ret < 0)
+ goto out;
+
+ offset += ret;
+ len -= ret;
+ if (len == 0 || ret != copy)
+ goto out;
+ }
+ start = end;
+ }
+
+out:
+ smp_store_release(&ifq->ring->cq.tail, ifq->cached_cq_tail);
+ if (offset == start_off)
+ return ret;
+ return offset - start_off;
+}
+
+static int io_zc_rx_tcp_read(struct sock *sk)
+{
+ read_descriptor_t rd_desc = {
+ .count = 1,
+ };
+
+ return tcp_read_sock(sk, &rd_desc, zc_rx_recv_skb);
+}
+
+static int io_zc_rx_tcp_recvmsg(struct sock *sk, unsigned int recv_limit,
+ int flags, int *addr_len)
+{
+ size_t used;
+ long timeo;
+ int ret;
+
+ ret = used = 0;
+
+ lock_sock(sk);
+
+ timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+ while (recv_limit) {
+ ret = io_zc_rx_tcp_read(sk);
+ if (ret < 0)
+ break;
+ if (!ret) {
+ if (used)
+ break;
+ if (sock_flag(sk, SOCK_DONE))
+ break;
+ if (sk->sk_err) {
+ ret = sock_error(sk);
+ break;
+ }
+ if (sk->sk_shutdown & RCV_SHUTDOWN)
+ break;
+ if (sk->sk_state == TCP_CLOSE) {
+ ret = -ENOTCONN;
+ break;
+ }
+ if (!timeo) {
+ ret = -EAGAIN;
+ break;
+ }
+ if (!skb_queue_empty(&sk->sk_receive_queue))
+ break;
+ sk_wait_data(sk, &timeo, NULL);
+ if (signal_pending(current)) {
+ ret = sock_intr_errno(timeo);
+ break;
+ }
+ continue;
+ }
+ recv_limit -= ret;
+ used += ret;
+
+ if (!timeo)
+ break;
+ release_sock(sk);
+ lock_sock(sk);
+
+ if (sk->sk_err || sk->sk_state == TCP_CLOSE ||
+ (sk->sk_shutdown & RCV_SHUTDOWN) ||
+ signal_pending(current))
+ break;
+ }
+
+ release_sock(sk);
+
+ /* TODO: handle timestamping */
+
+ if (used)
+ return used;
+
+ return ret;
+}
+
+int io_zc_rx_recv(struct socket *sock, unsigned int limit, unsigned int flags)
+{
+ struct sock *sk = sock->sk;
+ const struct proto *prot;
+ int addr_len = 0;
+ int ret;
+
+ if (flags & MSG_ERRQUEUE)
+ return -EOPNOTSUPP;
+
+ prot = READ_ONCE(sk->sk_prot);
+ if (prot->recvmsg != tcp_recvmsg)
+ return -EPROTONOSUPPORT;
+
+ sock_rps_record_flow(sk);
+
+ ret = io_zc_rx_tcp_recvmsg(sk, limit, flags, &addr_len);
+
+ return ret;
+}
+
#endif
@@ -60,4 +60,8 @@ static inline int io_register_zc_rx_sock(struct io_ring_ctx *ctx,
}
#endif
+int io_recvzc(struct io_kiocb *req, unsigned int issue_flags);
+int io_recvzc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+int io_zc_rx_recv(struct socket *sock, unsigned int limit, unsigned int flags);
+
#endif