@@ -221,7 +221,7 @@ u64 fuse_get_unique(struct fuse_iqueue *fiq)
}
EXPORT_SYMBOL_GPL(fuse_get_unique);
-static unsigned int fuse_req_hash(u64 unique)
+unsigned int fuse_req_hash(u64 unique)
{
return hash_long(unique & ~FUSE_INT_REQ_BIT, FUSE_PQ_HASH_BITS);
}
@@ -1899,7 +1899,7 @@ static int fuse_notify(struct fuse_conn *fc, enum fuse_notify_code code,
}
/* Look up request on processing list by unique ID */
-static struct fuse_req *request_find(struct fuse_pqueue *fpq, u64 unique)
+struct fuse_req *fuse_request_find(struct fuse_pqueue *fpq, u64 unique)
{
unsigned int hash = fuse_req_hash(unique);
struct fuse_req *req;
@@ -1983,7 +1983,7 @@ static ssize_t fuse_dev_do_write(struct fuse_dev *fud,
spin_lock(&fpq->lock);
req = NULL;
if (fpq->connected)
- req = request_find(fpq, oh.unique & ~FUSE_INT_REQ_BIT);
+ req = fuse_request_find(fpq, oh.unique & ~FUSE_INT_REQ_BIT);
err = -ENOENT;
if (!req) {
@@ -24,6 +24,19 @@ bool fuse_uring_enabled(void)
return enable_uring;
}
+static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, bool set_err,
+ int error)
+{
+ struct fuse_req *req = ring_ent->fuse_req;
+
+ if (set_err)
+ req->out.h.error = error;
+
+ clear_bit(FR_SENT, &req->flags);
+ fuse_request_end(ring_ent->fuse_req);
+ ring_ent->fuse_req = NULL;
+}
+
static int fuse_ring_ent_unset_userspace(struct fuse_ring_ent *ent)
{
struct fuse_ring_queue *queue = ent->queue;
@@ -54,8 +67,11 @@ void fuse_uring_destruct(struct fuse_conn *fc)
continue;
WARN_ON(!list_empty(&queue->ent_avail_queue));
+ WARN_ON(!list_empty(&queue->ent_w_req_queue));
WARN_ON(!list_empty(&queue->ent_commit_queue));
+ WARN_ON(!list_empty(&queue->ent_in_userspace));
+ kfree(queue->fpq.processing);
kfree(queue);
ring->queues[qid] = NULL;
}
@@ -114,13 +130,21 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
{
struct fuse_conn *fc = ring->fc;
struct fuse_ring_queue *queue;
+ struct list_head *pq;
queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT);
if (!queue)
return ERR_PTR(-ENOMEM);
+ pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL);
+ if (!pq) {
+ kfree(queue);
+ return ERR_PTR(-ENOMEM);
+ }
+
spin_lock(&fc->lock);
if (ring->queues[qid]) {
spin_unlock(&fc->lock);
+ kfree(queue->fpq.processing);
kfree(queue);
return ring->queues[qid];
}
@@ -131,6 +155,12 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
INIT_LIST_HEAD(&queue->ent_avail_queue);
INIT_LIST_HEAD(&queue->ent_commit_queue);
+ INIT_LIST_HEAD(&queue->ent_w_req_queue);
+ INIT_LIST_HEAD(&queue->ent_in_userspace);
+ INIT_LIST_HEAD(&queue->fuse_req_queue);
+
+ queue->fpq.processing = pq;
+ fuse_pqueue_init(&queue->fpq);
WRITE_ONCE(ring->queues[qid], queue);
spin_unlock(&fc->lock);
@@ -138,6 +168,213 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
return queue;
}
+static void
+fuse_uring_async_send_to_ring(struct io_uring_cmd *cmd,
+ unsigned int issue_flags)
+{
+ io_uring_cmd_done(cmd, 0, 0, issue_flags);
+}
+
+/*
+ * Checks for errors and stores it into the request
+ */
+static int fuse_uring_out_header_has_err(struct fuse_out_header *oh,
+ struct fuse_req *req,
+ struct fuse_conn *fc)
+{
+ int err;
+
+ err = -EINVAL;
+ if (oh->unique == 0) {
+ /* Not supportd through io-uring yet */
+ pr_warn_once("fuse: notify through fuse-io-uring not supported\n");
+ goto seterr;
+ }
+
+ err = -EINVAL;
+ if (oh->error <= -ERESTARTSYS || oh->error > 0)
+ goto seterr;
+
+ if (oh->error) {
+ err = oh->error;
+ goto err;
+ }
+
+ err = -ENOENT;
+ if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) {
+ pr_warn_ratelimited("Unexpected seqno mismatch, expected: %llu got %llu\n",
+ req->in.h.unique, oh->unique & ~FUSE_INT_REQ_BIT);
+ goto seterr;
+ }
+
+ /*
+ * Is it an interrupt reply ID?
+ * XXX: Not supported through fuse-io-uring yet, it should not even
+ * find the request - should not happen.
+ */
+ WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT);
+
+ return 0;
+
+seterr:
+ oh->error = err;
+err:
+ return err;
+}
+
+static int fuse_uring_copy_from_ring(struct fuse_ring *ring,
+ struct fuse_req *req,
+ struct fuse_ring_ent *ent)
+{
+ struct fuse_copy_state cs;
+ struct fuse_args *args = req->args;
+ struct iov_iter iter;
+ int err, res;
+ struct fuse_uring_ent_in_out ring_in_out;
+
+ res = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out,
+ sizeof(ring_in_out));
+ if (res)
+ return -EFAULT;
+
+ err = import_ubuf(ITER_SOURCE, ent->payload, ent->max_arg_len, &iter);
+ if (err)
+ return err;
+
+ fuse_copy_init(&cs, 0, &iter);
+ cs.is_uring = 1;
+ cs.req = req;
+
+ return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz);
+}
+
+ /*
+ * Copy data from the req to the ring buffer
+ */
+static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req,
+ struct fuse_ring_ent *ent)
+{
+ struct fuse_copy_state cs;
+ struct fuse_args *args = req->args;
+ struct fuse_in_arg *in_args = args->in_args;
+ int num_args = args->in_numargs;
+ int err, res;
+ struct iov_iter iter;
+ struct fuse_uring_ent_in_out ring_in_out = { .flags = 0 };
+
+ if (num_args == 0)
+ return 0;
+
+ err = import_ubuf(ITER_DEST, ent->payload, ent->max_arg_len, &iter);
+ if (err) {
+ pr_info_ratelimited("fuse: Import of user buffer failed\n");
+ return err;
+ }
+
+ fuse_copy_init(&cs, 1, &iter);
+ cs.is_uring = 1;
+ cs.req = req;
+
+ /*
+ * Expectation is that the first argument is the per op header.
+ * Some op code have that as zero.
+ */
+ if (args->in_args[0].size > 0) {
+ res = copy_to_user(&ent->headers->op_in, in_args->value,
+ in_args->size);
+ err = res > 0 ? -EFAULT : res;
+ if (err) {
+ pr_info_ratelimited("Copying the header failed.\n");
+ return err;
+ }
+ }
+ in_args++;
+ num_args--;
+
+ /* copy the payload */
+ err = fuse_copy_args(&cs, num_args, args->in_pages,
+ (struct fuse_arg *)in_args, 0);
+ if (err) {
+ pr_info_ratelimited("%s fuse_copy_args failed\n", __func__);
+ return err;
+ }
+
+ ring_in_out.payload_sz = cs.ring.offset;
+ res = copy_to_user(&ent->headers->ring_ent_in_out, &ring_in_out,
+ sizeof(ring_in_out));
+ err = res > 0 ? -EFAULT : res;
+ if (err)
+ return err;
+
+ return 0;
+}
+
+static int
+fuse_uring_prepare_send(struct fuse_ring_ent *ring_ent)
+{
+ struct fuse_ring_queue *queue = ring_ent->queue;
+ struct fuse_ring *ring = queue->ring;
+ struct fuse_req *req = ring_ent->fuse_req;
+ int err = 0, res;
+
+ if (WARN_ON(ring_ent->state != FRRS_FUSE_REQ)) {
+ pr_err("qid=%d ring-req=%p invalid state %d on send\n",
+ queue->qid, ring_ent, ring_ent->state);
+ err = -EIO;
+ }
+
+ if (err)
+ return err;
+
+ /* copy the request */
+ err = fuse_uring_copy_to_ring(ring, req, ring_ent);
+ if (unlikely(err)) {
+ pr_info("Copy to ring failed: %d\n", err);
+ goto err;
+ }
+
+ /* copy fuse_in_header */
+ res = copy_to_user(&ring_ent->headers->in_out, &req->in.h,
+ sizeof(req->in.h));
+ err = res > 0 ? -EFAULT : res;
+ if (err)
+ goto err;
+
+ set_bit(FR_SENT, &req->flags);
+ return 0;
+
+err:
+ fuse_uring_req_end(ring_ent, true, err);
+ return err;
+}
+
+/*
+ * Write data to the ring buffer and send the request to userspace,
+ * userspace will read it
+ * This is comparable with classical read(/dev/fuse)
+ */
+static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ring_ent)
+{
+ int err = 0;
+ struct fuse_ring_queue *queue = ring_ent->queue;
+
+ err = fuse_uring_prepare_send(ring_ent);
+ if (err)
+ goto err;
+
+ spin_lock(&queue->lock);
+ ring_ent->state = FRRS_USERSPACE;
+ list_move(&ring_ent->list, &queue->ent_in_userspace);
+ spin_unlock(&queue->lock);
+
+ io_uring_cmd_complete_in_task(ring_ent->cmd,
+ fuse_uring_async_send_to_ring);
+ return 0;
+
+err:
+ return err;
+}
+
/*
* Make a ring entry available for fuse_req assignment
*/
@@ -148,6 +385,189 @@ static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent,
ring_ent->state = FRRS_WAIT;
}
+/* Used to find the request on SQE commit */
+static void fuse_uring_add_to_pq(struct fuse_ring_ent *ring_ent,
+ struct fuse_req *req)
+{
+ struct fuse_ring_queue *queue = ring_ent->queue;
+ struct fuse_pqueue *fpq = &queue->fpq;
+ unsigned int hash;
+
+ req->ring_entry = ring_ent;
+ hash = fuse_req_hash(req->in.h.unique);
+ list_move_tail(&req->list, &fpq->processing[hash]);
+}
+
+/*
+ * Assign a fuse queue entry to the given entry
+ */
+static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ring_ent,
+ struct fuse_req *req)
+{
+ struct fuse_ring_queue *queue = ring_ent->queue;
+
+ lockdep_assert_held(&queue->lock);
+
+ if (WARN_ON_ONCE(ring_ent->state != FRRS_WAIT &&
+ ring_ent->state != FRRS_COMMIT)) {
+ pr_warn("%s qid=%d state=%d\n", __func__, ring_ent->queue->qid,
+ ring_ent->state);
+ }
+ list_del_init(&req->list);
+ clear_bit(FR_PENDING, &req->flags);
+ ring_ent->fuse_req = req;
+ ring_ent->state = FRRS_FUSE_REQ;
+ list_move(&ring_ent->list, &queue->ent_w_req_queue);
+ fuse_uring_add_to_pq(ring_ent, req);
+}
+
+/*
+ * Release the ring entry and fetch the next fuse request if available
+ *
+ * @return true if a new request has been fetched
+ */
+static bool fuse_uring_ent_assign_req(struct fuse_ring_ent *ring_ent)
+ __must_hold(&queue->lock)
+{
+ struct fuse_req *req = NULL;
+ struct fuse_ring_queue *queue = ring_ent->queue;
+ struct list_head *req_queue = &queue->fuse_req_queue;
+
+ lockdep_assert_held(&queue->lock);
+
+ /* get and assign the next entry while it is still holding the lock */
+ if (!list_empty(req_queue)) {
+ req = list_first_entry(req_queue, struct fuse_req, list);
+ fuse_uring_add_req_to_ring_ent(ring_ent, req);
+ }
+
+ return req ? true : false;
+}
+
+/*
+ * Read data from the ring buffer, which user space has written to
+ * This is comparible with handling of classical write(/dev/fuse).
+ * Also make the ring request available again for new fuse requests.
+ */
+static void fuse_uring_commit(struct fuse_ring_ent *ring_ent,
+ unsigned int issue_flags)
+{
+ struct fuse_ring *ring = ring_ent->queue->ring;
+ struct fuse_conn *fc = ring->fc;
+ struct fuse_req *req = ring_ent->fuse_req;
+ ssize_t err = 0;
+ bool set_err = false;
+
+ err = copy_from_user(&req->out.h, &ring_ent->headers->in_out,
+ sizeof(req->out.h));
+ if (err) {
+ req->out.h.error = err;
+ goto out;
+ }
+
+ err = fuse_uring_out_header_has_err(&req->out.h, req, fc);
+ if (err) {
+ /* req->out.h.error already set */
+ goto out;
+ }
+
+ err = fuse_uring_copy_from_ring(ring, req, ring_ent);
+ if (err)
+ set_err = true;
+
+out:
+ fuse_uring_req_end(ring_ent, set_err, err);
+}
+
+/*
+ * Get the next fuse req and send it
+ */
+static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent,
+ struct fuse_ring_queue *queue)
+{
+ int has_next, err;
+ int prev_state = ring_ent->state;
+
+ do {
+ spin_lock(&queue->lock);
+ has_next = fuse_uring_ent_assign_req(ring_ent);
+ if (!has_next) {
+ fuse_uring_ent_avail(ring_ent, queue);
+ spin_unlock(&queue->lock);
+ break; /* no request left */
+ }
+ spin_unlock(&queue->lock);
+
+ err = fuse_uring_send_next_to_ring(ring_ent);
+ if (err)
+ ring_ent->state = prev_state;
+ } while (err);
+}
+
+/* FUSE_URING_REQ_COMMIT_AND_FETCH handler */
+static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
+ struct fuse_conn *fc)
+{
+ const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
+ struct fuse_ring_ent *ring_ent;
+ int err;
+ struct fuse_ring *ring = fc->ring;
+ struct fuse_ring_queue *queue;
+ uint64_t commit_id = cmd_req->commit_id;
+ struct fuse_pqueue fpq;
+ struct fuse_req *req;
+
+ err = -ENOTCONN;
+ if (!ring)
+ return err;
+
+ queue = ring->queues[cmd_req->qid];
+ if (!queue)
+ return err;
+ fpq = queue->fpq;
+
+ spin_lock(&queue->lock);
+ /* Find a request based on the unique ID of the fuse request
+ * This should get revised, as it needs a hash calculation and list
+ * search. And full struct fuse_pqueue is needed (memory overhead).
+ * As well as the link from req to ring_ent.
+ */
+ req = fuse_request_find(&fpq, commit_id);
+ err = -ENOENT;
+ if (!req) {
+ pr_info("qid=%d commit_id %llu not found\n", queue->qid,
+ commit_id);
+ spin_unlock(&queue->lock);
+ return err;
+ }
+ list_del_init(&req->list);
+ ring_ent = req->ring_entry;
+ req->ring_entry = NULL;
+
+ err = fuse_ring_ent_unset_userspace(ring_ent);
+ if (err != 0) {
+ pr_info_ratelimited("qid=%d commit_id %llu state %d",
+ queue->qid, commit_id, ring_ent->state);
+ spin_unlock(&queue->lock);
+ return err;
+ }
+
+ ring_ent->cmd = cmd;
+ spin_unlock(&queue->lock);
+
+ /* without the queue lock, as other locks are taken */
+ fuse_uring_commit(ring_ent, issue_flags);
+
+ /*
+ * Fetching the next request is absolutely required as queued
+ * fuse requests would otherwise not get processed - committing
+ * and fetching is done in one step vs legacy fuse, which has separated
+ * read (fetch request) and write (commit result).
+ */
+ fuse_uring_next_fuse_req(ring_ent, queue);
+ return 0;
+}
+
/*
* fuse_uring_req_fetch command handling
*/
@@ -310,6 +730,9 @@ int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
return err;
}
break;
+ case FUSE_URING_REQ_COMMIT_AND_FETCH:
+ err = fuse_uring_commit_fetch(cmd, issue_flags, fc);
+ break;
default:
return -EINVAL;
}
@@ -20,6 +20,9 @@ enum fuse_ring_req_state {
/* The ring entry is waiting for new fuse requests */
FRRS_WAIT,
+ /* The ring entry got assigned a fuse req */
+ FRRS_FUSE_REQ,
+
/* The ring entry is in or on the way to user space */
FRRS_USERSPACE,
};
@@ -72,7 +75,16 @@ struct fuse_ring_queue {
* entries in the process of being committed or in the process
* to be send to userspace
*/
+ struct list_head ent_w_req_queue;
struct list_head ent_commit_queue;
+
+ /* entries in userspace */
+ struct list_head ent_in_userspace;
+
+ /* fuse requests waiting for an entry slot */
+ struct list_head fuse_req_queue;
+
+ struct fuse_pqueue fpq;
};
/**
@@ -7,6 +7,7 @@
#define _FS_FUSE_DEV_I_H
#include <linux/types.h>
+#include <linux/fs.h>
/* Ordinary requests have even IDs, while interrupts IDs are odd */
#define FUSE_INT_REQ_BIT (1ULL << 0)
@@ -14,6 +15,8 @@
struct fuse_arg;
struct fuse_args;
+struct fuse_pqueue;
+struct fuse_req;
struct fuse_copy_state {
int write;
@@ -43,6 +46,9 @@ static inline struct fuse_dev *fuse_get_dev(struct file *file)
return READ_ONCE(file->private_data);
}
+unsigned int fuse_req_hash(u64 unique);
+struct fuse_req *fuse_request_find(struct fuse_pqueue *fpq, u64 unique);
+
void fuse_dev_end_requests(struct list_head *head);
void fuse_copy_init(struct fuse_copy_state *cs, int write,
@@ -435,6 +435,10 @@ struct fuse_req {
/** fuse_mount this request belongs to */
struct fuse_mount *fm;
+
+#ifdef CONFIG_FUSE_IO_URING
+ void *ring_entry;
+#endif
};
struct fuse_iqueue;
@@ -1221,6 +1225,11 @@ void fuse_change_entry_timeout(struct dentry *entry, struct fuse_entry_out *o);
*/
struct fuse_conn *fuse_conn_get(struct fuse_conn *fc);
+/**
+ * Initialize the fuse processing queue
+ */
+void fuse_pqueue_init(struct fuse_pqueue *fpq);
+
/**
* Initialize fuse_conn
*/
@@ -906,7 +906,7 @@ static void fuse_iqueue_init(struct fuse_iqueue *fiq,
fiq->priv = priv;
}
-static void fuse_pqueue_init(struct fuse_pqueue *fpq)
+void fuse_pqueue_init(struct fuse_pqueue *fpq)
{
unsigned int i;
This adds support for fuse request completion through ring SQEs (FUSE_URING_REQ_COMMIT_AND_FETCH handling). After committing the ring entry it becomes available for new fuse requests. Handling of requests through the ring (SQE/CQE handling) is complete now. Fuse request data are copied through the mmaped ring buffer, there is no support for any zero copy yet. Signed-off-by: Bernd Schubert <bschubert@ddn.com> --- fs/fuse/dev.c | 6 +- fs/fuse/dev_uring.c | 423 ++++++++++++++++++++++++++++++++++++++++++++++++++ fs/fuse/dev_uring_i.h | 12 ++ fs/fuse/fuse_dev_i.h | 6 + fs/fuse/fuse_i.h | 9 ++ fs/fuse/inode.c | 2 +- 6 files changed, 454 insertions(+), 4 deletions(-)