Message ID | 20250107-fuse-uring-for-6-10-rfc4-v9-10-9c786f9a7a9d@ddn.com (mailing list archive)
---|---
State | New |
Series | fuse: fuse-over-io-uring
Hi, On Tue, Jan 07 2025, Bernd Schubert wrote: > This adds support for fuse request completion through ring SQEs > (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing > the ring entry it becomes available for new fuse requests. > Handling of requests through the ring (SQE/CQE handling) > is complete now. > > Fuse request data are copied through the mmaped ring buffer, > there is no support for any zero copy yet. Please find below a few more comments. Also, please note that I'm trying to understand this patchset (and the whole fuse-over-io-uring thing), so most of my comments are minor nits. And those that are not may simply be wrong! I'm just noting them as I navigate through the code. (And by the way, thanks for this work!) > Signed-off-by: Bernd Schubert <bschubert@ddn.com> > --- > fs/fuse/dev_uring.c | 450 ++++++++++++++++++++++++++++++++++++++++++++++++++ > fs/fuse/dev_uring_i.h | 12 ++ > fs/fuse/fuse_i.h | 4 + > 3 files changed, 466 insertions(+) > > diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c > index b44ba4033615e01041313c040035b6da6af0ee17..f44e66a7ea577390da87e9ac7d118a9416898c28 100644 > --- a/fs/fuse/dev_uring.c > +++ b/fs/fuse/dev_uring.c > @@ -26,6 +26,19 @@ bool fuse_uring_enabled(void) > return enable_uring; > } > > +static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, bool set_err, > + int error) > +{ > + struct fuse_req *req = ring_ent->fuse_req; > + > + if (set_err) > + req->out.h.error = error; > + > + clear_bit(FR_SENT, &req->flags); > + fuse_request_end(ring_ent->fuse_req); > + ring_ent->fuse_req = NULL; > +} > + > void fuse_uring_destruct(struct fuse_conn *fc) > { > struct fuse_ring *ring = fc->ring; > @@ -41,8 +54,11 @@ void fuse_uring_destruct(struct fuse_conn *fc) > continue; > > WARN_ON(!list_empty(&queue->ent_avail_queue)); > + WARN_ON(!list_empty(&queue->ent_w_req_queue)); > WARN_ON(!list_empty(&queue->ent_commit_queue)); > + WARN_ON(!list_empty(&queue->ent_in_userspace)); > > + kfree(queue->fpq.processing); > kfree(queue); > ring->queues[qid] = NULL; > } > @@ -101,20 +117,34 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, > { > struct fuse_conn *fc = ring->fc; > struct fuse_ring_queue *queue; > + struct list_head *pq; > > queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT); > if (!queue) > return NULL; > + pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL); > + if (!pq) { > + kfree(queue); > + return NULL; > + } > + > queue->qid = qid; > queue->ring = ring; > spin_lock_init(&queue->lock); > > INIT_LIST_HEAD(&queue->ent_avail_queue); > INIT_LIST_HEAD(&queue->ent_commit_queue); > + INIT_LIST_HEAD(&queue->ent_w_req_queue); > + INIT_LIST_HEAD(&queue->ent_in_userspace); > + INIT_LIST_HEAD(&queue->fuse_req_queue); > + > + queue->fpq.processing = pq; > + fuse_pqueue_init(&queue->fpq); > > spin_lock(&fc->lock); > if (ring->queues[qid]) { > spin_unlock(&fc->lock); > + kfree(queue->fpq.processing); > kfree(queue); > return ring->queues[qid]; > } > @@ -128,6 +158,214 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, > return queue; > } > > +/* > + * Checks for errors and stores it into the request > + */ > +static int fuse_uring_out_header_has_err(struct fuse_out_header *oh, > + struct fuse_req *req, > + struct fuse_conn *fc) > +{ > + int err; > + > + err = -EINVAL; > + if (oh->unique == 0) { > + /* Not supportd through io-uring yet */ typo: "supported" > + pr_warn_once("notify through fuse-io-uring not supported\n"); > + goto seterr; > + } > + > + err = 
-EINVAL; Not really needed, it already has this value. > + if (oh->error <= -ERESTARTSYS || oh->error > 0) > + goto seterr; > + > + if (oh->error) { > + err = oh->error; > + goto err; > + } > + > + err = -ENOENT; > + if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) { > + pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n", > + req->in.h.unique, > + oh->unique & ~FUSE_INT_REQ_BIT); > + goto seterr; > + } > + > + /* > + * Is it an interrupt reply ID? > + * XXX: Not supported through fuse-io-uring yet, it should not even > + * find the request - should not happen. > + */ > + WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT); > + > + return 0; > + > +seterr: > + oh->error = err; > +err: > + return err; > +} > + > +static int fuse_uring_copy_from_ring(struct fuse_ring *ring, > + struct fuse_req *req, > + struct fuse_ring_ent *ent) > +{ > + struct fuse_copy_state cs; > + struct fuse_args *args = req->args; > + struct iov_iter iter; > + int err, res; nit: no need for two variables; one of the 'int' variables could be removed. There are other functions with a similar pattern, but this was the first one that caught my attention. > + struct fuse_uring_ent_in_out ring_in_out; > + > + res = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out, > + sizeof(ring_in_out)); > + if (res) > + return -EFAULT; > + > + err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz, > + &iter); > + if (err) > + return err; > + > + fuse_copy_init(&cs, 0, &iter); > + cs.is_uring = 1; > + cs.req = req; > + > + return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz); > +} > + > + /* > + * Copy data from the req to the ring buffer > + */ nit: extra space in comment indentation. > > +static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req, > + struct fuse_ring_ent *ent) > +{ > + struct fuse_copy_state cs; > + struct fuse_args *args = req->args; > + struct fuse_in_arg *in_args = args->in_args; > + int num_args = args->in_numargs; > + int err, res; > + struct iov_iter iter; > + struct fuse_uring_ent_in_out ent_in_out = { > + .flags = 0, > + .commit_id = ent->commit_id, > + }; > + > + if (WARN_ON(ent_in_out.commit_id == 0)) > + return -EINVAL; > + > + err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter); > + if (err) { > + pr_info_ratelimited("fuse: Import of user buffer failed\n"); > + return err; > + } > + > + fuse_copy_init(&cs, 1, &iter); > + cs.is_uring = 1; > + cs.req = req; > + > + if (num_args > 0) { > + /* > + * Expectation is that the first argument is the per op header. > + * Some op code have that as zero. > + */ > + if (args->in_args[0].size > 0) { > + res = copy_to_user(&ent->headers->op_in, in_args->value, > + in_args->size); > + err = res > 0 ? -EFAULT : res; > + if (err) { > + pr_info_ratelimited( > + "Copying the header failed.\n"); > + return err; > + } > + } > + in_args++; > + num_args--; > + } > + > + /* copy the payload */ > + err = fuse_copy_args(&cs, num_args, args->in_pages, > + (struct fuse_arg *)in_args, 0); > + if (err) { > + pr_info_ratelimited("%s fuse_copy_args failed\n", __func__); > + return err; > + } > + > + ent_in_out.payload_sz = cs.ring.copied_sz; > + res = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out, > + sizeof(ent_in_out)); > + err = res > 0 ? -EFAULT : res; > + if (err) > + return err; Simply return err? 
:-) > + > + return 0; > +} > + > +static int > +fuse_uring_prepare_send(struct fuse_ring_ent *ring_ent) > +{ > + struct fuse_ring_queue *queue = ring_ent->queue; > + struct fuse_ring *ring = queue->ring; > + struct fuse_req *req = ring_ent->fuse_req; > + int err, res; > + > + err = -EIO; > + if (WARN_ON(ring_ent->state != FRRS_FUSE_REQ)) { > + pr_err("qid=%d ring-req=%p invalid state %d on send\n", > + queue->qid, ring_ent, ring_ent->state); > + err = -EIO; 'err' initialized twice. One of these could be removed. > + goto err; > + } > + > + /* copy the request */ > + err = fuse_uring_copy_to_ring(ring, req, ring_ent); > + if (unlikely(err)) { > + pr_info_ratelimited("Copy to ring failed: %d\n", err); > + goto err; > + } > + > + /* copy fuse_in_header */ > + res = copy_to_user(&ring_ent->headers->in_out, &req->in.h, > + sizeof(req->in.h)); > + err = res > 0 ? -EFAULT : res; > + if (err) > + goto err; > + > + set_bit(FR_SENT, &req->flags); > + return 0; > + > +err: > + fuse_uring_req_end(ring_ent, true, err); > + return err; > +} > + > +/* > + * Write data to the ring buffer and send the request to userspace, > + * userspace will read it > + * This is comparable with classical read(/dev/fuse) > + */ > +static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ring_ent, > + unsigned int issue_flags) > +{ > + int err = 0; > + struct fuse_ring_queue *queue = ring_ent->queue; > + > + err = fuse_uring_prepare_send(ring_ent); > + if (err) > + goto err; Since this is the only place where this label is used, it could simply return 'err' and the label removed. > + > + spin_lock(&queue->lock); > + ring_ent->state = FRRS_USERSPACE; > + list_move(&ring_ent->list, &queue->ent_in_userspace); > + spin_unlock(&queue->lock); > + > + io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags); > + ring_ent->cmd = NULL; > + return 0; > + > +err: > + return err; > +} > + > /* > * Make a ring entry available for fuse_req assignment > */ > @@ -138,6 +376,210 @@ static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent, > ring_ent->state = FRRS_AVAILABLE; > } > > +/* Used to find the request on SQE commit */ > +static void fuse_uring_add_to_pq(struct fuse_ring_ent *ring_ent, > + struct fuse_req *req) > +{ > + struct fuse_ring_queue *queue = ring_ent->queue; > + struct fuse_pqueue *fpq = &queue->fpq; > + unsigned int hash; > + > + /* commit_id is the unique id of the request */ > + ring_ent->commit_id = req->in.h.unique; > + > + req->ring_entry = ring_ent; > + hash = fuse_req_hash(ring_ent->commit_id); > + list_move_tail(&req->list, &fpq->processing[hash]); > +} > + > +/* > + * Assign a fuse queue entry to the given entry > + */ > +static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ring_ent, > + struct fuse_req *req) > +{ > + struct fuse_ring_queue *queue = ring_ent->queue; > + > + lockdep_assert_held(&queue->lock); > + > + if (WARN_ON_ONCE(ring_ent->state != FRRS_AVAILABLE && > + ring_ent->state != FRRS_COMMIT)) { > + pr_warn("%s qid=%d state=%d\n", __func__, ring_ent->queue->qid, > + ring_ent->state); > + } > + list_del_init(&req->list); > + clear_bit(FR_PENDING, &req->flags); > + ring_ent->fuse_req = req; > + ring_ent->state = FRRS_FUSE_REQ; > + list_move(&ring_ent->list, &queue->ent_w_req_queue); > + fuse_uring_add_to_pq(ring_ent, req); > +} > + > +/* > + * Release the ring entry and fetch the next fuse request if available > + * > + * @return true if a new request has been fetched > + */ > +static bool fuse_uring_ent_assign_req(struct fuse_ring_ent *ring_ent) > + __must_hold(&queue->lock) > +{ > 
+ struct fuse_req *req; > + struct fuse_ring_queue *queue = ring_ent->queue; > + struct list_head *req_queue = &queue->fuse_req_queue; > + > + lockdep_assert_held(&queue->lock); > + > + /* get and assign the next entry while it is still holding the lock */ > + req = list_first_entry_or_null(req_queue, struct fuse_req, list); > + if (req) { > + fuse_uring_add_req_to_ring_ent(ring_ent, req); > + return true; > + } > + > + return false; > +} > + > +/* > + * Read data from the ring buffer, which user space has written to > + * This is comparible with handling of classical write(/dev/fuse). nit: "comparable" > + * Also make the ring request available again for new fuse requests. > + */ > +static void fuse_uring_commit(struct fuse_ring_ent *ring_ent, > + unsigned int issue_flags) > +{ > + struct fuse_ring *ring = ring_ent->queue->ring; > + struct fuse_conn *fc = ring->fc; > + struct fuse_req *req = ring_ent->fuse_req; > + ssize_t err = 0; > + bool set_err = false; > + > + err = copy_from_user(&req->out.h, &ring_ent->headers->in_out, > + sizeof(req->out.h)); > + if (err) { > + req->out.h.error = err; > + goto out; > + } > + > + err = fuse_uring_out_header_has_err(&req->out.h, req, fc); > + if (err) { > + /* req->out.h.error already set */ > + goto out; > + } > + > + err = fuse_uring_copy_from_ring(ring, req, ring_ent); > + if (err) > + set_err = true; > + > +out: > + fuse_uring_req_end(ring_ent, set_err, err); > +} > + > +/* > + * Get the next fuse req and send it > + */ > +static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent, > + struct fuse_ring_queue *queue, > + unsigned int issue_flags) > +{ > + int err; > + bool has_next; > + > +retry: > + spin_lock(&queue->lock); > + fuse_uring_ent_avail(ring_ent, queue); > + has_next = fuse_uring_ent_assign_req(ring_ent); > + spin_unlock(&queue->lock); > + > + if (has_next) { > + err = fuse_uring_send_next_to_ring(ring_ent, issue_flags); > + if (err) > + goto retry; I wonder whether this is safe. Maybe this is *obviously* safe, but I'm still trying to understand this patchset; so, for me, it is not :-) Would it be worth it trying to limit the maximum number of retries? > + } > +} > + > +static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent) > +{ > + struct fuse_ring_queue *queue = ent->queue; > + > + lockdep_assert_held(&queue->lock); > + > + if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE)) > + return -EIO; > + > + ent->state = FRRS_COMMIT; > + list_move(&ent->list, &queue->ent_commit_queue); > + > + return 0; > +} > + > +/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */ > +static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags, > + struct fuse_conn *fc) > +{ > + const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe); > + struct fuse_ring_ent *ring_ent; > + int err; > + struct fuse_ring *ring = fc->ring; > + struct fuse_ring_queue *queue; > + uint64_t commit_id = READ_ONCE(cmd_req->commit_id); > + unsigned int qid = READ_ONCE(cmd_req->qid); > + struct fuse_pqueue *fpq; > + struct fuse_req *req; > + > + err = -ENOTCONN; > + if (!ring) > + return err; > + > + if (qid >= ring->nr_queues) > + return -EINVAL; > + > + queue = ring->queues[qid]; > + if (!queue) > + return err; > + fpq = &queue->fpq; > + > + spin_lock(&queue->lock); > + /* Find a request based on the unique ID of the fuse request > + * This should get revised, as it needs a hash calculation and list > + * search. And full struct fuse_pqueue is needed (memory overhead). > + * As well as the link from req to ring_ent. 
> + */ > + req = fuse_request_find(fpq, commit_id); > + err = -ENOENT; > + if (!req) { > + pr_info("qid=%d commit_id %llu not found\n", queue->qid, > + commit_id); > + spin_unlock(&queue->lock); > + return err; > + } > + list_del_init(&req->list); > + ring_ent = req->ring_entry; > + req->ring_entry = NULL; > + > + err = fuse_ring_ent_set_commit(ring_ent); > + if (err != 0) { I'm probably missing something, but because we removed 'req' from the list above, aren't we leaking it if we get an error here? Cheers,
On 1/7/25 15:42, Luis Henriques wrote: > Hi, > > On Tue, Jan 07 2025, Bernd Schubert wrote: > >> This adds support for fuse request completion through ring SQEs >> (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing >> the ring entry it becomes available for new fuse requests. >> Handling of requests through the ring (SQE/CQE handling) >> is complete now. >> >> Fuse request data are copied through the mmaped ring buffer, >> there is no support for any zero copy yet. > > Please find below a few more comments. Thanks, I fixed all comments, except for the retry in fuse_uring_next_fuse_req. [...] > > Also, please note that I'm trying to understand this patchset (and the > whole fuse-over-io-uring thing), so most of my comments are minor nits. > And those that are not may simply be wrong! I'm just noting them as I > navigate through the code. > > (And by the way, thanks for this work!) > >> +/* >> + * Get the next fuse req and send it >> + */ >> +static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent, >> + struct fuse_ring_queue *queue, >> + unsigned int issue_flags) >> +{ >> + int err; >> + bool has_next; >> + >> +retry: >> + spin_lock(&queue->lock); >> + fuse_uring_ent_avail(ring_ent, queue); >> + has_next = fuse_uring_ent_assign_req(ring_ent); >> + spin_unlock(&queue->lock); >> + >> + if (has_next) { >> + err = fuse_uring_send_next_to_ring(ring_ent, issue_flags); >> + if (err) >> + goto retry; > > I wonder whether this is safe. Maybe this is *obviously* safe, but I'm > still trying to understand this patchset; so, for me, it is not :-) > > Would it be worth it trying to limit the maximum number of retries? No, we cannot limit retries. Let's do a simple example with one ring entry and also just one queue. Multiple applications create fuse requests. The first application fills the only available ring entry and submits it, the others just get queued in queue->fuse_req_queue. After that the application just waits in request_wait_answer(). On commit of the first request the ring task has to take the next request from queue->fuse_req_queue - if something fails with that request it has to complete it and proceed to the next request. If we would introduce a max-retries here, it would put the ring entry on hold (FRRS_AVAILABLE) and until another application comes, it would forever wait there. The applications waiting in request_wait_answer would never complete either.
> >> + } >> +} >> + >> +static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent) >> +{ >> + struct fuse_ring_queue *queue = ent->queue; >> + >> + lockdep_assert_held(&queue->lock); >> + >> + if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE)) >> + return -EIO; >> + >> + ent->state = FRRS_COMMIT; >> + list_move(&ent->list, &queue->ent_commit_queue); >> + >> + return 0; >> +} >> + >> +/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */ >> +static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags, >> + struct fuse_conn *fc) >> +{ >> + const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe); >> + struct fuse_ring_ent *ring_ent; >> + int err; >> + struct fuse_ring *ring = fc->ring; >> + struct fuse_ring_queue *queue; >> + uint64_t commit_id = READ_ONCE(cmd_req->commit_id); >> + unsigned int qid = READ_ONCE(cmd_req->qid); >> + struct fuse_pqueue *fpq; >> + struct fuse_req *req; >> + >> + err = -ENOTCONN; >> + if (!ring) >> + return err; >> + >> + if (qid >= ring->nr_queues) >> + return -EINVAL; >> + >> + queue = ring->queues[qid]; >> + if (!queue) >> + return err; >> + fpq = &queue->fpq; >> + >> + spin_lock(&queue->lock); >> + /* Find a request based on the unique ID of the fuse request >> + * This should get revised, as it needs a hash calculation and list >> + * search. And full struct fuse_pqueue is needed (memory overhead). >> + * As well as the link from req to ring_ent. >> + */ >> + req = fuse_request_find(fpq, commit_id); >> + err = -ENOENT; >> + if (!req) { >> + pr_info("qid=%d commit_id %llu not found\n", queue->qid, >> + commit_id); >> + spin_unlock(&queue->lock); >> + return err; >> + } >> + list_del_init(&req->list); >> + ring_ent = req->ring_entry; >> + req->ring_entry = NULL; >> + >> + err = fuse_ring_ent_set_commit(ring_ent); >> + if (err != 0) { > > I'm probably missing something, but because we removed 'req' from the list > above, aren't we leaking it if we get an error here? Hmm, yeah, that is debatable. We basically have a grave error here. Either kernel or userspace are doing something wrong. Though probably you are right and we should end the request with EIO. Thanks, Bernd
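For illustration, here is a minimal sketch of the error handling Bernd agrees to above - ending the already-unhashed request instead of leaking it when fuse_ring_ent_set_commit() fails. This is only an assumption of how the fix could look against the hunk quoted above, not the actual follow-up patch; the completion mirrors what fuse_uring_req_end() does elsewhere in the series.

	err = fuse_ring_ent_set_commit(ring_ent);
	if (err != 0) {
		pr_info_ratelimited("qid=%d commit_id %llu state %d",
				    queue->qid, commit_id, ring_ent->state);
		spin_unlock(&queue->lock);
		/*
		 * req was already removed from the processing list above;
		 * complete it with the error so the waiting application
		 * does not hang forever.
		 */
		req->out.h.error = err;
		clear_bit(FR_SENT, &req->flags);
		fuse_request_end(req);
		return err;
	}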
On Tue, Jan 07 2025, Bernd Schubert wrote: > On 1/7/25 15:42, Luis Henriques wrote: >> Hi, >> On Tue, Jan 07 2025, Bernd Schubert wrote: >> >>> This adds support for fuse request completion through ring SQEs >>> (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing >>> the ring entry it becomes available for new fuse requests. >>> Handling of requests through the ring (SQE/CQE handling) >>> is complete now. >>> >>> Fuse request data are copied through the mmaped ring buffer, >>> there is no support for any zero copy yet. >> Please find below a few more comments. > > Thanks, I fixed all comments, except for the retry in fuse_uring_next_fuse_req. Awesome, thanks for taking those comments into account. > [...] > >> Also, please note that I'm trying to understand this patchset (and the >> whole fuse-over-io-uring thing), so most of my comments are minor nits. >> And those that are not may simply be wrong! I'm just noting them as I >> navigate through the code. >> (And by the way, thanks for this work!) >> >>> +/* >>> + * Get the next fuse req and send it >>> + */ >>> +static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent, >>> + struct fuse_ring_queue *queue, >>> + unsigned int issue_flags) >>> +{ >>> + int err; >>> + bool has_next; >>> + >>> +retry: >>> + spin_lock(&queue->lock); >>> + fuse_uring_ent_avail(ring_ent, queue); >>> + has_next = fuse_uring_ent_assign_req(ring_ent); >>> + spin_unlock(&queue->lock); >>> + >>> + if (has_next) { >>> + err = fuse_uring_send_next_to_ring(ring_ent, issue_flags); >>> + if (err) >>> + goto retry; >> I wonder whether this is safe. Maybe this is *obviously* safe, but I'm >> still trying to understand this patchset; so, for me, it is not :-) >> Would it be worth it trying to limit the maximum number of retries? > > No, we cannot limit retries. Let's do a simple example with one ring > entry and also just one queue. Multiple applications create fuse > requests. The first application fills the only available ring entry > and submits it, the others just get queued in queue->fuse_req_queue. > After that the application just waits in request_wait_answer(). > > On commit of the first request the ring task has to take the next > request from queue->fuse_req_queue - if something fails with that > request it has to complete it and proceed to the next request. > If we would introduce a max-retries here, it would put the ring entry > on hold (FRRS_AVAILABLE) and until another application comes, it would > forever wait there. The applications waiting in request_wait_answer > would never complete either. Oh! OK, I see it now. I totally misunderstood it then. Thanks for taking the time to explain it. Cheers,
On Mon, Jan 6, 2025 at 4:25 PM Bernd Schubert <bschubert@ddn.com> wrote: > > This adds support for fuse request completion through ring SQEs > (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing > the ring entry it becomes available for new fuse requests. > Handling of requests through the ring (SQE/CQE handling) > is complete now. > > Fuse request data are copied through the mmaped ring buffer, > there is no support for any zero copy yet. > > Signed-off-by: Bernd Schubert <bschubert@ddn.com> > --- > fs/fuse/dev_uring.c | 450 ++++++++++++++++++++++++++++++++++++++++++++++++++ > fs/fuse/dev_uring_i.h | 12 ++ > fs/fuse/fuse_i.h | 4 + > 3 files changed, 466 insertions(+) > > diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c > index b44ba4033615e01041313c040035b6da6af0ee17..f44e66a7ea577390da87e9ac7d118a9416898c28 100644 > --- a/fs/fuse/dev_uring.c > +++ b/fs/fuse/dev_uring.c > @@ -26,6 +26,19 @@ bool fuse_uring_enabled(void) > return enable_uring; > } > > +static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, bool set_err, > + int error) > +{ > + struct fuse_req *req = ring_ent->fuse_req; > + > + if (set_err) > + req->out.h.error = error; I think we could get away with not having the "bool set_err" as an argument if we do "if (error)" directly. AFAICT, we can use the value of error directly since it always returns zero on success and any non-zero value is considered an error. > + > + clear_bit(FR_SENT, &req->flags); > + fuse_request_end(ring_ent->fuse_req); > + ring_ent->fuse_req = NULL; > +} > + > void fuse_uring_destruct(struct fuse_conn *fc) > { > struct fuse_ring *ring = fc->ring; > @@ -41,8 +54,11 @@ void fuse_uring_destruct(struct fuse_conn *fc) > continue; > > WARN_ON(!list_empty(&queue->ent_avail_queue)); > + WARN_ON(!list_empty(&queue->ent_w_req_queue)); > WARN_ON(!list_empty(&queue->ent_commit_queue)); > + WARN_ON(!list_empty(&queue->ent_in_userspace)); > > + kfree(queue->fpq.processing); > kfree(queue); > ring->queues[qid] = NULL; > } > @@ -101,20 +117,34 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, > { > struct fuse_conn *fc = ring->fc; > struct fuse_ring_queue *queue; > + struct list_head *pq; > > queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT); > if (!queue) > return NULL; > + pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL); > + if (!pq) { > + kfree(queue); > + return NULL; > + } > + > queue->qid = qid; > queue->ring = ring; > spin_lock_init(&queue->lock); > > INIT_LIST_HEAD(&queue->ent_avail_queue); > INIT_LIST_HEAD(&queue->ent_commit_queue); > + INIT_LIST_HEAD(&queue->ent_w_req_queue); > + INIT_LIST_HEAD(&queue->ent_in_userspace); > + INIT_LIST_HEAD(&queue->fuse_req_queue); > + > + queue->fpq.processing = pq; > + fuse_pqueue_init(&queue->fpq); > > spin_lock(&fc->lock); > if (ring->queues[qid]) { > spin_unlock(&fc->lock); > + kfree(queue->fpq.processing); > kfree(queue); > return ring->queues[qid]; > } > @@ -128,6 +158,214 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, > return queue; > } > > +/* > + * Checks for errors and stores it into the request > + */ > +static int fuse_uring_out_header_has_err(struct fuse_out_header *oh, > + struct fuse_req *req, > + struct fuse_conn *fc) > +{ > + int err; > + > + err = -EINVAL; > + if (oh->unique == 0) { > + /* Not supportd through io-uring yet */ > + pr_warn_once("notify through fuse-io-uring not supported\n"); > + goto seterr; > + } > + > + err = -EINVAL; > + if (oh->error <= -ERESTARTSYS || oh->error > 0) > + goto 
seterr; > + > + if (oh->error) { > + err = oh->error; > + goto err; > + } > + > + err = -ENOENT; > + if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) { > + pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n", > + req->in.h.unique, > + oh->unique & ~FUSE_INT_REQ_BIT); > + goto seterr; > + } > + > + /* > + * Is it an interrupt reply ID? > + * XXX: Not supported through fuse-io-uring yet, it should not even > + * find the request - should not happen. > + */ > + WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT); > + > + return 0; > + > +seterr: > + oh->error = err; > +err: > + return err; > +} > + > +static int fuse_uring_copy_from_ring(struct fuse_ring *ring, > + struct fuse_req *req, > + struct fuse_ring_ent *ent) > +{ > + struct fuse_copy_state cs; > + struct fuse_args *args = req->args; > + struct iov_iter iter; > + int err, res; > + struct fuse_uring_ent_in_out ring_in_out; > + > + res = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out, > + sizeof(ring_in_out)); > + if (res) > + return -EFAULT; > + > + err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz, > + &iter); > + if (err) > + return err; > + > + fuse_copy_init(&cs, 0, &iter); > + cs.is_uring = 1; > + cs.req = req; > + > + return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz); > +} > + > + /* > + * Copy data from the req to the ring buffer > + */ > +static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req, > + struct fuse_ring_ent *ent) > +{ > + struct fuse_copy_state cs; > + struct fuse_args *args = req->args; > + struct fuse_in_arg *in_args = args->in_args; > + int num_args = args->in_numargs; > + int err, res; > + struct iov_iter iter; > + struct fuse_uring_ent_in_out ent_in_out = { > + .flags = 0, > + .commit_id = ent->commit_id, > + }; > + > + if (WARN_ON(ent_in_out.commit_id == 0)) > + return -EINVAL; > + > + err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter); > + if (err) { > + pr_info_ratelimited("fuse: Import of user buffer failed\n"); > + return err; > + } > + > + fuse_copy_init(&cs, 1, &iter); > + cs.is_uring = 1; > + cs.req = req; > + > + if (num_args > 0) { > + /* > + * Expectation is that the first argument is the per op header. > + * Some op code have that as zero. > + */ > + if (args->in_args[0].size > 0) { > + res = copy_to_user(&ent->headers->op_in, in_args->value, > + in_args->size); > + err = res > 0 ? -EFAULT : res; > + if (err) { > + pr_info_ratelimited( > + "Copying the header failed.\n"); > + return err; > + } > + } > + in_args++; > + num_args--; > + } > + > + /* copy the payload */ > + err = fuse_copy_args(&cs, num_args, args->in_pages, > + (struct fuse_arg *)in_args, 0); > + if (err) { > + pr_info_ratelimited("%s fuse_copy_args failed\n", __func__); > + return err; > + } > + > + ent_in_out.payload_sz = cs.ring.copied_sz; > + res = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out, > + sizeof(ent_in_out)); > + err = res > 0 ? 
-EFAULT : res; > + if (err) > + return err; > + > + return 0; > +} > + > +static int > +fuse_uring_prepare_send(struct fuse_ring_ent *ring_ent) > +{ > + struct fuse_ring_queue *queue = ring_ent->queue; > + struct fuse_ring *ring = queue->ring; > + struct fuse_req *req = ring_ent->fuse_req; > + int err, res; > + > + err = -EIO; > + if (WARN_ON(ring_ent->state != FRRS_FUSE_REQ)) { > + pr_err("qid=%d ring-req=%p invalid state %d on send\n", > + queue->qid, ring_ent, ring_ent->state); > + err = -EIO; > + goto err; > + } > + > + /* copy the request */ > + err = fuse_uring_copy_to_ring(ring, req, ring_ent); > + if (unlikely(err)) { > + pr_info_ratelimited("Copy to ring failed: %d\n", err); > + goto err; > + } > + > + /* copy fuse_in_header */ > + res = copy_to_user(&ring_ent->headers->in_out, &req->in.h, > + sizeof(req->in.h)); > + err = res > 0 ? -EFAULT : res; > + if (err) > + goto err; > + > + set_bit(FR_SENT, &req->flags); > + return 0; > + > +err: > + fuse_uring_req_end(ring_ent, true, err); > + return err; > +} > + > +/* > + * Write data to the ring buffer and send the request to userspace, > + * userspace will read it > + * This is comparable with classical read(/dev/fuse) > + */ > +static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ring_ent, > + unsigned int issue_flags) > +{ > + int err = 0; > + struct fuse_ring_queue *queue = ring_ent->queue; > + > + err = fuse_uring_prepare_send(ring_ent); > + if (err) > + goto err; > + > + spin_lock(&queue->lock); > + ring_ent->state = FRRS_USERSPACE; > + list_move(&ring_ent->list, &queue->ent_in_userspace); > + spin_unlock(&queue->lock); > + > + io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags); > + ring_ent->cmd = NULL; > + return 0; > + > +err: > + return err; > +} > + > /* > * Make a ring entry available for fuse_req assignment > */ > @@ -138,6 +376,210 @@ static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent, > ring_ent->state = FRRS_AVAILABLE; > } > > +/* Used to find the request on SQE commit */ > +static void fuse_uring_add_to_pq(struct fuse_ring_ent *ring_ent, > + struct fuse_req *req) > +{ > + struct fuse_ring_queue *queue = ring_ent->queue; > + struct fuse_pqueue *fpq = &queue->fpq; > + unsigned int hash; > + > + /* commit_id is the unique id of the request */ > + ring_ent->commit_id = req->in.h.unique; > + > + req->ring_entry = ring_ent; > + hash = fuse_req_hash(ring_ent->commit_id); > + list_move_tail(&req->list, &fpq->processing[hash]); > +} > + > +/* > + * Assign a fuse queue entry to the given entry > + */ > +static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ring_ent, > + struct fuse_req *req) > +{ > + struct fuse_ring_queue *queue = ring_ent->queue; > + > + lockdep_assert_held(&queue->lock); > + > + if (WARN_ON_ONCE(ring_ent->state != FRRS_AVAILABLE && > + ring_ent->state != FRRS_COMMIT)) { > + pr_warn("%s qid=%d state=%d\n", __func__, ring_ent->queue->qid, > + ring_ent->state); > + } > + list_del_init(&req->list); > + clear_bit(FR_PENDING, &req->flags); > + ring_ent->fuse_req = req; > + ring_ent->state = FRRS_FUSE_REQ; > + list_move(&ring_ent->list, &queue->ent_w_req_queue); > + fuse_uring_add_to_pq(ring_ent, req); > +} > + > +/* > + * Release the ring entry and fetch the next fuse request if available > + * > + * @return true if a new request has been fetched > + */ > +static bool fuse_uring_ent_assign_req(struct fuse_ring_ent *ring_ent) > + __must_hold(&queue->lock) > +{ > + struct fuse_req *req; > + struct fuse_ring_queue *queue = ring_ent->queue; > + struct list_head *req_queue = 
&queue->fuse_req_queue; > + > + lockdep_assert_held(&queue->lock); > + > + /* get and assign the next entry while it is still holding the lock */ > + req = list_first_entry_or_null(req_queue, struct fuse_req, list); > + if (req) { > + fuse_uring_add_req_to_ring_ent(ring_ent, req); > + return true; > + } > + > + return false; > +} > + > +/* > + * Read data from the ring buffer, which user space has written to > + * This is comparible with handling of classical write(/dev/fuse). > + * Also make the ring request available again for new fuse requests. > + */ > +static void fuse_uring_commit(struct fuse_ring_ent *ring_ent, > + unsigned int issue_flags) > +{ > + struct fuse_ring *ring = ring_ent->queue->ring; > + struct fuse_conn *fc = ring->fc; > + struct fuse_req *req = ring_ent->fuse_req; > + ssize_t err = 0; > + bool set_err = false; > + > + err = copy_from_user(&req->out.h, &ring_ent->headers->in_out, > + sizeof(req->out.h)); > + if (err) { > + req->out.h.error = err; > + goto out; > + } > + > + err = fuse_uring_out_header_has_err(&req->out.h, req, fc); > + if (err) { > + /* req->out.h.error already set */ > + goto out; > + } > + > + err = fuse_uring_copy_from_ring(ring, req, ring_ent); > + if (err) > + set_err = true; > + > +out: > + fuse_uring_req_end(ring_ent, set_err, err); > +} > + > +/* > + * Get the next fuse req and send it > + */ > +static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent, > + struct fuse_ring_queue *queue, > + unsigned int issue_flags) > +{ > + int err; > + bool has_next; > + > +retry: > + spin_lock(&queue->lock); > + fuse_uring_ent_avail(ring_ent, queue); > + has_next = fuse_uring_ent_assign_req(ring_ent); > + spin_unlock(&queue->lock); > + > + if (has_next) { > + err = fuse_uring_send_next_to_ring(ring_ent, issue_flags); > + if (err) > + goto retry; > + } > +} > + > +static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent) > +{ > + struct fuse_ring_queue *queue = ent->queue; > + > + lockdep_assert_held(&queue->lock); > + > + if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE)) > + return -EIO; > + > + ent->state = FRRS_COMMIT; > + list_move(&ent->list, &queue->ent_commit_queue); > + > + return 0; > +} > + > +/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */ > +static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags, > + struct fuse_conn *fc) > +{ > + const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe); > + struct fuse_ring_ent *ring_ent; > + int err; > + struct fuse_ring *ring = fc->ring; > + struct fuse_ring_queue *queue; > + uint64_t commit_id = READ_ONCE(cmd_req->commit_id); > + unsigned int qid = READ_ONCE(cmd_req->qid); > + struct fuse_pqueue *fpq; > + struct fuse_req *req; > + > + err = -ENOTCONN; > + if (!ring) > + return err; > + > + if (qid >= ring->nr_queues) > + return -EINVAL; > + > + queue = ring->queues[qid]; > + if (!queue) > + return err; > + fpq = &queue->fpq; > + > + spin_lock(&queue->lock); > + /* Find a request based on the unique ID of the fuse request > + * This should get revised, as it needs a hash calculation and list > + * search. And full struct fuse_pqueue is needed (memory overhead). > + * As well as the link from req to ring_ent. > + */ imo, the hash calculation and list search seems ok. I can't think of a more optimal way of doing it. Instead of using the full struct fuse_pqueue, I think we could just have the "struct list_head *processing" defined inside "struct fuse_ring_queue" and change fuse_request_find() to take in a list_head. 
I don't think we need a dedicated spinlock for the list either. We can just reuse queue->lock, as that's (currently) always held already when the processing list is accessed. > + req = fuse_request_find(fpq, commit_id); > + err = -ENOENT; > + if (!req) { > + pr_info("qid=%d commit_id %llu not found\n", queue->qid, > + commit_id); > + spin_unlock(&queue->lock); > + return err; > + } > + list_del_init(&req->list); > + ring_ent = req->ring_entry; > + req->ring_entry = NULL; Do we need to set this to NULL, given that the request will be cleaned up later in fuse_uring_req_end() anyways? > + > + err = fuse_ring_ent_set_commit(ring_ent); > + if (err != 0) { > + pr_info_ratelimited("qid=%d commit_id %llu state %d", > + queue->qid, commit_id, ring_ent->state); > + spin_unlock(&queue->lock); > + return err; > + } > + > + ring_ent->cmd = cmd; > + spin_unlock(&queue->lock); > + > + /* without the queue lock, as other locks are taken */ > + fuse_uring_commit(ring_ent, issue_flags); > + > + /* > + * Fetching the next request is absolutely required as queued > + * fuse requests would otherwise not get processed - committing > + * and fetching is done in one step vs legacy fuse, which has separated > + * read (fetch request) and write (commit result). > + */ > + fuse_uring_next_fuse_req(ring_ent, queue, issue_flags); If there's no request ready to read next, then no request will be fetched and this will return. However, as I understand it, once the uring is registered, userspace should only be interacting with the uring via FUSE_IO_URING_CMD_COMMIT_AND_FETCH. However for the case where no request was ready to read, it seems like userspace would have nothing to commit when it wants to fetch the next request? A more general question though: I imagine the most common use case from the server side is waiting / polling until there is a request to fetch. Could we not just do that here in the kernel instead with adding a waitqueue mechanism and having fuse_uring_next_fuse_req() only return when there is a request available? It seems like that would reduce the amount of overhead instead of doing the waiting/checking from the server side? > + return 0; > +} > + > /* > * fuse_uring_req_fetch command handling > */ > @@ -325,6 +767,14 @@ int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd, > return err; > } > break; > + case FUSE_IO_URING_CMD_COMMIT_AND_FETCH: > + err = fuse_uring_commit_fetch(cmd, issue_flags, fc); > + if (err) { > + pr_info_once("FUSE_IO_URING_COMMIT_AND_FETCH failed err=%d\n", > + err); > + return err; > + } > + break; > default: > return -EINVAL; > } > diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h > index 4e46dd65196d26dabc62dada33b17de9aa511c08..80f1c62d4df7f0ca77c4d5179068df6ffdbf7d85 100644 > --- a/fs/fuse/dev_uring_i.h > +++ b/fs/fuse/dev_uring_i.h > @@ -20,6 +20,9 @@ enum fuse_ring_req_state { > /* The ring entry is waiting for new fuse requests */ > FRRS_AVAILABLE, > > + /* The ring entry got assigned a fuse req */ > + FRRS_FUSE_REQ, > + > /* The ring entry is in or on the way to user space */ > FRRS_USERSPACE, > }; > @@ -70,7 +73,16 @@ struct fuse_ring_queue { > * entries in the process of being committed or in the process > * to be sent to userspace > */ > + struct list_head ent_w_req_queue; What does the w in this stand for? I find the name ambiguous here. 
Thanks, Joanne > struct list_head ent_commit_queue; > + > + /* entries in userspace */ > + struct list_head ent_in_userspace; > + > + /* fuse requests waiting for an entry slot */ > + struct list_head fuse_req_queue; > + > + struct fuse_pqueue fpq; > }; > > /** > diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h > index e545b0864dd51e82df61cc39bdf65d3d36a418dc..e71556894bc25808581424ec7bdd4afeebc81f15 100644 > --- a/fs/fuse/fuse_i.h > +++ b/fs/fuse/fuse_i.h > @@ -438,6 +438,10 @@ struct fuse_req { > > /** fuse_mount this request belongs to */ > struct fuse_mount *fm; > + > +#ifdef CONFIG_FUSE_IO_URING > + void *ring_entry; > +#endif > }; > > struct fuse_iqueue; > > -- > 2.43.0 >
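For what it's worth, a sketch of the alternative Joanne outlines above: keeping only the processing buckets (guarded by queue->lock) inside struct fuse_ring_queue and doing the lookup directly over a list head. The helper name and exact shape are assumptions for illustration, not code from the series:

struct fuse_ring_queue {
	/* ... existing members ... */

	/* FUSE_PQ_HASH_SIZE buckets, protected by queue->lock */
	struct list_head *processing;
};

static struct fuse_req *fuse_ring_req_find(struct fuse_ring_queue *queue,
					   u64 commit_id)
{
	struct list_head *bucket = &queue->processing[fuse_req_hash(commit_id)];
	struct fuse_req *req;

	lockdep_assert_held(&queue->lock);

	list_for_each_entry(req, bucket, list)
		if (req->in.h.unique == commit_id)
			return req;

	return NULL;
}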
On 1/7/25 00:25, Bernd Schubert wrote: > This adds support for fuse request completion through ring SQEs > (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing > the ring entry it becomes available for new fuse requests. > Handling of requests through the ring (SQE/CQE handling) > is complete now. > > Fuse request data are copied through the mmaped ring buffer, > there is no support for any zero copy yet. Reviewed-by: Pavel Begunkov <asml.silence@gmail.com> # io_uring > > Signed-off-by: Bernd Schubert <bschubert@ddn.com> > --- > fs/fuse/dev_uring.c | 450 ++++++++++++++++++++++++++++++++++++++++++++++++++ > fs/fuse/dev_uring_i.h | 12 ++ > fs/fuse/fuse_i.h | 4 + > 3 files changed, 466 insertions(+) > > diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c > index b44ba4033615e01041313c040035b6da6af0ee17..f44e66a7ea577390da87e9ac7d118a9416898c28 100644 > --- a/fs/fuse/dev_uring.c > +++ b/fs/fuse/dev_uring.c > @@ -26,6 +26,19 @@ bool fuse_uring_enabled(void) ... > + > +/* > + * Write data to the ring buffer and send the request to userspace, > + * userspace will read it > + * This is comparable with classical read(/dev/fuse) > + */ > +static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ring_ent, > + unsigned int issue_flags) > +{ > + int err = 0; > + struct fuse_ring_queue *queue = ring_ent->queue; > + > + err = fuse_uring_prepare_send(ring_ent); > + if (err) > + goto err; > + > + spin_lock(&queue->lock); > + ring_ent->state = FRRS_USERSPACE; > + list_move(&ring_ent->list, &queue->ent_in_userspace); > + spin_unlock(&queue->lock); > + > + io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags); > + ring_ent->cmd = NULL; I haven't checked if it races with some reallocation, but you might want to consider clearing it under the spin. spin_lock(&queue->lock); ... cmd = ring_ent->cmd; ring_ent->cmd = NULL; spin_unlock(&queue->lock); io_uring_cmd_done(cmd); Can be done on top if even needed.
On 1/17/25 12:18, Pavel Begunkov wrote: > On 1/7/25 00:25, Bernd Schubert wrote: >> This adds support for fuse request completion through ring SQEs >> (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing >> the ring entry it becomes available for new fuse requests. >> Handling of requests through the ring (SQE/CQE handling) >> is complete now. >> >> Fuse request data are copied through the mmaped ring buffer, >> there is no support for any zero copy yet. > > Reviewed-by: Pavel Begunkov <asml.silence@gmail.com> # io_uring > >> >> Signed-off-by: Bernd Schubert <bschubert@ddn.com> >> --- >> fs/fuse/dev_uring.c | 450 ++++++++++++++++++++++++++++++++++++++++ >> ++++++++++ >> fs/fuse/dev_uring_i.h | 12 ++ >> fs/fuse/fuse_i.h | 4 + >> 3 files changed, 466 insertions(+) >> >> diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c >> index >> b44ba4033615e01041313c040035b6da6af0ee17..f44e66a7ea577390da87e9ac7d118a9416898c28 100644 >> --- a/fs/fuse/dev_uring.c >> +++ b/fs/fuse/dev_uring.c >> @@ -26,6 +26,19 @@ bool fuse_uring_enabled(void) > ... >> + >> +/* >> + * Write data to the ring buffer and send the request to userspace, >> + * userspace will read it >> + * This is comparable with classical read(/dev/fuse) >> + */ >> +static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ring_ent, >> + unsigned int issue_flags) >> +{ >> + int err = 0; >> + struct fuse_ring_queue *queue = ring_ent->queue; >> + >> + err = fuse_uring_prepare_send(ring_ent); >> + if (err) >> + goto err; >> + >> + spin_lock(&queue->lock); >> + ring_ent->state = FRRS_USERSPACE; >> + list_move(&ring_ent->list, &queue->ent_in_userspace); >> + spin_unlock(&queue->lock); >> + >> + io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags); >> + ring_ent->cmd = NULL; > > I haven't checked if it races with some reallocation, but > you might want to consider clearing it under the spin. > > spin_lock(&queue->lock); > ... > cmd = ring_ent->cmd; > ring_ent->cmd = NULL; > spin_unlock(&queue->lock); > > io_uring_cmd_done(cmd); > > Can be done on top if even needed. Yes, thanks for your review! That is what I actually have in the ddn branch, when I found the startup race. Thanks, Bernd
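For reference, the pattern Pavel describes (and which Bernd says he already carries in the ddn branch), written out as a sketch against fuse_uring_send_next_to_ring() - illustrative only, not the actual branch code:

	struct io_uring_cmd *cmd;

	spin_lock(&queue->lock);
	cmd = ring_ent->cmd;
	ring_ent->cmd = NULL;
	ring_ent->state = FRRS_USERSPACE;
	list_move(&ring_ent->list, &queue->ent_in_userspace);
	spin_unlock(&queue->lock);

	/* complete the SQE only after the entry can no longer be touched */
	io_uring_cmd_done(cmd, 0, 0, issue_flags);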
Hi Joanne, sorry for my late reply, I was occupied all week. On 1/13/25 23:44, Joanne Koong wrote: > On Mon, Jan 6, 2025 at 4:25 PM Bernd Schubert <bschubert@ddn.com> wrote: >> >> This adds support for fuse request completion through ring SQEs >> (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing >> the ring entry it becomes available for new fuse requests. >> Handling of requests through the ring (SQE/CQE handling) >> is complete now. >> >> Fuse request data are copied through the mmaped ring buffer, >> there is no support for any zero copy yet. >> >> Signed-off-by: Bernd Schubert <bschubert@ddn.com> >> --- >> fs/fuse/dev_uring.c | 450 ++++++++++++++++++++++++++++++++++++++++++++++++++ >> fs/fuse/dev_uring_i.h | 12 ++ >> fs/fuse/fuse_i.h | 4 + >> 3 files changed, 466 insertions(+) >> >> diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c >> index b44ba4033615e01041313c040035b6da6af0ee17..f44e66a7ea577390da87e9ac7d118a9416898c28 100644 >> --- a/fs/fuse/dev_uring.c >> +++ b/fs/fuse/dev_uring.c >> @@ -26,6 +26,19 @@ bool fuse_uring_enabled(void) >> return enable_uring; >> } >> >> +static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, bool set_err, >> + int error) >> +{ >> + struct fuse_req *req = ring_ent->fuse_req; >> + >> + if (set_err) >> + req->out.h.error = error; > > I think we could get away with not having the "bool set_err" as an > argument if we do "if (error)" directly. AFAICT, we can use the value > of error directly since it always returns zero on success and any > non-zero value is considered an error. I had done this because of fuse_uring_commit() err = fuse_uring_out_header_has_err(&req->out.h, req, fc); if (err) { /* req->out.h.error already set */ goto out; } In fuse_uring_out_header_has_err() the header might already have the error code, but there are other errors as well. Well, setting an existing error code saves us a few lines and conditions, so you are probably right and I removed that argument now. 
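As a sketch of the agreed change (an assumption of how it could look, not the actual next revision), fuse_uring_req_end() without the "bool set_err" argument - callers simply pass the final error, and any non-zero value overrides req->out.h.error:

static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, int error)
{
	struct fuse_req *req = ring_ent->fuse_req;

	/* error == 0 means success, or that out.h.error was already set */
	if (error)
		req->out.h.error = error;

	clear_bit(FR_SENT, &req->flags);
	fuse_request_end(req);
	ring_ent->fuse_req = NULL;
}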
> >> + >> + clear_bit(FR_SENT, &req->flags); >> + fuse_request_end(ring_ent->fuse_req); >> + ring_ent->fuse_req = NULL; >> +} >> + >> void fuse_uring_destruct(struct fuse_conn *fc) >> { >> struct fuse_ring *ring = fc->ring; >> @@ -41,8 +54,11 @@ void fuse_uring_destruct(struct fuse_conn *fc) >> continue; >> >> WARN_ON(!list_empty(&queue->ent_avail_queue)); >> + WARN_ON(!list_empty(&queue->ent_w_req_queue)); >> WARN_ON(!list_empty(&queue->ent_commit_queue)); >> + WARN_ON(!list_empty(&queue->ent_in_userspace)); >> >> + kfree(queue->fpq.processing); >> kfree(queue); >> ring->queues[qid] = NULL; >> } >> @@ -101,20 +117,34 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, >> { >> struct fuse_conn *fc = ring->fc; >> struct fuse_ring_queue *queue; >> + struct list_head *pq; >> >> queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT); >> if (!queue) >> return NULL; >> + pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL); >> + if (!pq) { >> + kfree(queue); >> + return NULL; >> + } >> + >> queue->qid = qid; >> queue->ring = ring; >> spin_lock_init(&queue->lock); >> >> INIT_LIST_HEAD(&queue->ent_avail_queue); >> INIT_LIST_HEAD(&queue->ent_commit_queue); >> + INIT_LIST_HEAD(&queue->ent_w_req_queue); >> + INIT_LIST_HEAD(&queue->ent_in_userspace); >> + INIT_LIST_HEAD(&queue->fuse_req_queue); >> + >> + queue->fpq.processing = pq; >> + fuse_pqueue_init(&queue->fpq); >> >> spin_lock(&fc->lock); >> if (ring->queues[qid]) { >> spin_unlock(&fc->lock); >> + kfree(queue->fpq.processing); >> kfree(queue); >> return ring->queues[qid]; >> } >> @@ -128,6 +158,214 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, >> return queue; >> } >> >> +/* >> + * Checks for errors and stores it into the request >> + */ >> +static int fuse_uring_out_header_has_err(struct fuse_out_header *oh, >> + struct fuse_req *req, >> + struct fuse_conn *fc) >> +{ >> + int err; >> + >> + err = -EINVAL; >> + if (oh->unique == 0) { >> + /* Not supportd through io-uring yet */ >> + pr_warn_once("notify through fuse-io-uring not supported\n"); >> + goto seterr; >> + } >> + >> + err = -EINVAL; >> + if (oh->error <= -ERESTARTSYS || oh->error > 0) >> + goto seterr; >> + >> + if (oh->error) { >> + err = oh->error; >> + goto err; >> + } >> + >> + err = -ENOENT; >> + if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) { >> + pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n", >> + req->in.h.unique, >> + oh->unique & ~FUSE_INT_REQ_BIT); >> + goto seterr; >> + } >> + >> + /* >> + * Is it an interrupt reply ID? >> + * XXX: Not supported through fuse-io-uring yet, it should not even >> + * find the request - should not happen. 
>> + */ >> + WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT); >> + >> + return 0; >> + >> +seterr: >> + oh->error = err; >> +err: >> + return err; >> +} >> + >> +static int fuse_uring_copy_from_ring(struct fuse_ring *ring, >> + struct fuse_req *req, >> + struct fuse_ring_ent *ent) >> +{ >> + struct fuse_copy_state cs; >> + struct fuse_args *args = req->args; >> + struct iov_iter iter; >> + int err, res; >> + struct fuse_uring_ent_in_out ring_in_out; >> + >> + res = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out, >> + sizeof(ring_in_out)); >> + if (res) >> + return -EFAULT; >> + >> + err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz, >> + &iter); >> + if (err) >> + return err; >> + >> + fuse_copy_init(&cs, 0, &iter); >> + cs.is_uring = 1; >> + cs.req = req; >> + >> + return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz); >> +} >> + >> + /* >> + * Copy data from the req to the ring buffer >> + */ >> +static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req, >> + struct fuse_ring_ent *ent) >> +{ >> + struct fuse_copy_state cs; >> + struct fuse_args *args = req->args; >> + struct fuse_in_arg *in_args = args->in_args; >> + int num_args = args->in_numargs; >> + int err, res; >> + struct iov_iter iter; >> + struct fuse_uring_ent_in_out ent_in_out = { >> + .flags = 0, >> + .commit_id = ent->commit_id, >> + }; >> + >> + if (WARN_ON(ent_in_out.commit_id == 0)) >> + return -EINVAL; >> + >> + err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter); >> + if (err) { >> + pr_info_ratelimited("fuse: Import of user buffer failed\n"); >> + return err; >> + } >> + >> + fuse_copy_init(&cs, 1, &iter); >> + cs.is_uring = 1; >> + cs.req = req; >> + >> + if (num_args > 0) { >> + /* >> + * Expectation is that the first argument is the per op header. >> + * Some op code have that as zero. >> + */ >> + if (args->in_args[0].size > 0) { >> + res = copy_to_user(&ent->headers->op_in, in_args->value, >> + in_args->size); >> + err = res > 0 ? -EFAULT : res; >> + if (err) { >> + pr_info_ratelimited( >> + "Copying the header failed.\n"); >> + return err; >> + } >> + } >> + in_args++; >> + num_args--; >> + } >> + >> + /* copy the payload */ >> + err = fuse_copy_args(&cs, num_args, args->in_pages, >> + (struct fuse_arg *)in_args, 0); >> + if (err) { >> + pr_info_ratelimited("%s fuse_copy_args failed\n", __func__); >> + return err; >> + } >> + >> + ent_in_out.payload_sz = cs.ring.copied_sz; >> + res = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out, >> + sizeof(ent_in_out)); >> + err = res > 0 ? -EFAULT : res; >> + if (err) >> + return err; >> + >> + return 0; >> +} >> + >> +static int >> +fuse_uring_prepare_send(struct fuse_ring_ent *ring_ent) >> +{ >> + struct fuse_ring_queue *queue = ring_ent->queue; >> + struct fuse_ring *ring = queue->ring; >> + struct fuse_req *req = ring_ent->fuse_req; >> + int err, res; >> + >> + err = -EIO; >> + if (WARN_ON(ring_ent->state != FRRS_FUSE_REQ)) { >> + pr_err("qid=%d ring-req=%p invalid state %d on send\n", >> + queue->qid, ring_ent, ring_ent->state); >> + err = -EIO; >> + goto err; >> + } >> + >> + /* copy the request */ >> + err = fuse_uring_copy_to_ring(ring, req, ring_ent); >> + if (unlikely(err)) { >> + pr_info_ratelimited("Copy to ring failed: %d\n", err); >> + goto err; >> + } >> + >> + /* copy fuse_in_header */ >> + res = copy_to_user(&ring_ent->headers->in_out, &req->in.h, >> + sizeof(req->in.h)); >> + err = res > 0 ? 
-EFAULT : res; >> + if (err) >> + goto err; >> + >> + set_bit(FR_SENT, &req->flags); >> + return 0; >> + >> +err: >> + fuse_uring_req_end(ring_ent, true, err); >> + return err; >> +} >> + >> +/* >> + * Write data to the ring buffer and send the request to userspace, >> + * userspace will read it >> + * This is comparable with classical read(/dev/fuse) >> + */ >> +static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ring_ent, >> + unsigned int issue_flags) >> +{ >> + int err = 0; >> + struct fuse_ring_queue *queue = ring_ent->queue; >> + >> + err = fuse_uring_prepare_send(ring_ent); >> + if (err) >> + goto err; >> + >> + spin_lock(&queue->lock); >> + ring_ent->state = FRRS_USERSPACE; >> + list_move(&ring_ent->list, &queue->ent_in_userspace); >> + spin_unlock(&queue->lock); >> + >> + io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags); >> + ring_ent->cmd = NULL; >> + return 0; >> + >> +err: >> + return err; >> +} >> + >> /* >> * Make a ring entry available for fuse_req assignment >> */ >> @@ -138,6 +376,210 @@ static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent, >> ring_ent->state = FRRS_AVAILABLE; >> } >> >> +/* Used to find the request on SQE commit */ >> +static void fuse_uring_add_to_pq(struct fuse_ring_ent *ring_ent, >> + struct fuse_req *req) >> +{ >> + struct fuse_ring_queue *queue = ring_ent->queue; >> + struct fuse_pqueue *fpq = &queue->fpq; >> + unsigned int hash; >> + >> + /* commit_id is the unique id of the request */ >> + ring_ent->commit_id = req->in.h.unique; >> + >> + req->ring_entry = ring_ent; >> + hash = fuse_req_hash(ring_ent->commit_id); >> + list_move_tail(&req->list, &fpq->processing[hash]); >> +} >> + >> +/* >> + * Assign a fuse queue entry to the given entry >> + */ >> +static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ring_ent, >> + struct fuse_req *req) >> +{ >> + struct fuse_ring_queue *queue = ring_ent->queue; >> + >> + lockdep_assert_held(&queue->lock); >> + >> + if (WARN_ON_ONCE(ring_ent->state != FRRS_AVAILABLE && >> + ring_ent->state != FRRS_COMMIT)) { >> + pr_warn("%s qid=%d state=%d\n", __func__, ring_ent->queue->qid, >> + ring_ent->state); >> + } >> + list_del_init(&req->list); >> + clear_bit(FR_PENDING, &req->flags); >> + ring_ent->fuse_req = req; >> + ring_ent->state = FRRS_FUSE_REQ; >> + list_move(&ring_ent->list, &queue->ent_w_req_queue); >> + fuse_uring_add_to_pq(ring_ent, req); >> +} >> + >> +/* >> + * Release the ring entry and fetch the next fuse request if available >> + * >> + * @return true if a new request has been fetched >> + */ >> +static bool fuse_uring_ent_assign_req(struct fuse_ring_ent *ring_ent) >> + __must_hold(&queue->lock) >> +{ >> + struct fuse_req *req; >> + struct fuse_ring_queue *queue = ring_ent->queue; >> + struct list_head *req_queue = &queue->fuse_req_queue; >> + >> + lockdep_assert_held(&queue->lock); >> + >> + /* get and assign the next entry while it is still holding the lock */ >> + req = list_first_entry_or_null(req_queue, struct fuse_req, list); >> + if (req) { >> + fuse_uring_add_req_to_ring_ent(ring_ent, req); >> + return true; >> + } >> + >> + return false; >> +} >> + >> +/* >> + * Read data from the ring buffer, which user space has written to >> + * This is comparible with handling of classical write(/dev/fuse). >> + * Also make the ring request available again for new fuse requests. 
>> + */ >> +static void fuse_uring_commit(struct fuse_ring_ent *ring_ent, >> + unsigned int issue_flags) >> +{ >> + struct fuse_ring *ring = ring_ent->queue->ring; >> + struct fuse_conn *fc = ring->fc; >> + struct fuse_req *req = ring_ent->fuse_req; >> + ssize_t err = 0; >> + bool set_err = false; >> + >> + err = copy_from_user(&req->out.h, &ring_ent->headers->in_out, >> + sizeof(req->out.h)); >> + if (err) { >> + req->out.h.error = err; >> + goto out; >> + } >> + >> + err = fuse_uring_out_header_has_err(&req->out.h, req, fc); >> + if (err) { >> + /* req->out.h.error already set */ >> + goto out; >> + } >> + >> + err = fuse_uring_copy_from_ring(ring, req, ring_ent); >> + if (err) >> + set_err = true; >> + >> +out: >> + fuse_uring_req_end(ring_ent, set_err, err); >> +} >> + >> +/* >> + * Get the next fuse req and send it >> + */ >> +static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent, >> + struct fuse_ring_queue *queue, >> + unsigned int issue_flags) >> +{ >> + int err; >> + bool has_next; >> + >> +retry: >> + spin_lock(&queue->lock); >> + fuse_uring_ent_avail(ring_ent, queue); >> + has_next = fuse_uring_ent_assign_req(ring_ent); >> + spin_unlock(&queue->lock); >> + >> + if (has_next) { >> + err = fuse_uring_send_next_to_ring(ring_ent, issue_flags); >> + if (err) >> + goto retry; >> + } >> +} >> + >> +static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent) >> +{ >> + struct fuse_ring_queue *queue = ent->queue; >> + >> + lockdep_assert_held(&queue->lock); >> + >> + if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE)) >> + return -EIO; >> + >> + ent->state = FRRS_COMMIT; >> + list_move(&ent->list, &queue->ent_commit_queue); >> + >> + return 0; >> +} >> + >> +/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */ >> +static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags, >> + struct fuse_conn *fc) >> +{ >> + const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe); >> + struct fuse_ring_ent *ring_ent; >> + int err; >> + struct fuse_ring *ring = fc->ring; >> + struct fuse_ring_queue *queue; >> + uint64_t commit_id = READ_ONCE(cmd_req->commit_id); >> + unsigned int qid = READ_ONCE(cmd_req->qid); >> + struct fuse_pqueue *fpq; >> + struct fuse_req *req; >> + >> + err = -ENOTCONN; >> + if (!ring) >> + return err; >> + >> + if (qid >= ring->nr_queues) >> + return -EINVAL; >> + >> + queue = ring->queues[qid]; >> + if (!queue) >> + return err; >> + fpq = &queue->fpq; >> + >> + spin_lock(&queue->lock); >> + /* Find a request based on the unique ID of the fuse request >> + * This should get revised, as it needs a hash calculation and list >> + * search. And full struct fuse_pqueue is needed (memory overhead). >> + * As well as the link from req to ring_ent. >> + */ > > imo, the hash calculation and list search seems ok. I can't think of a > more optimal way of doing it. Instead of using the full struct > fuse_pqueue, I think we could just have the "struct list_head > *processing" defined inside "struct fuse_ring_queue" and change > fuse_request_find() to take in a list_head. I don't think we need a > dedicated spinlock for the list either. We can just reuse queue->lock, > as that's (currently) always held already when the processing list is > accessed. Please see the attached patch, which uses xarray. Totally untested, though. I actually found an issue while writing this patch - FR_PENDING was cleared without holding fiq->lock, but that is important for request_wait_answer(). 
If something removes req from the list, we entirely loose the ring entry - can never be used anymore. Personally I think the attached patch is safer. > > >> + req = fuse_request_find(fpq, commit_id); >> + err = -ENOENT; >> + if (!req) { >> + pr_info("qid=%d commit_id %llu not found\n", queue->qid, >> + commit_id); >> + spin_unlock(&queue->lock); >> + return err; >> + } >> + list_del_init(&req->list); >> + ring_ent = req->ring_entry; >> + req->ring_entry = NULL; > > Do we need to set this to NULL, given that the request will be cleaned > up later in fuse_uring_req_end() anyways? It is not explicitly set to NULL in that function. Would you mind to keep it safe? > >> + >> + err = fuse_ring_ent_set_commit(ring_ent); >> + if (err != 0) { >> + pr_info_ratelimited("qid=%d commit_id %llu state %d", >> + queue->qid, commit_id, ring_ent->state); >> + spin_unlock(&queue->lock); >> + return err; >> + } >> + >> + ring_ent->cmd = cmd; >> + spin_unlock(&queue->lock); >> + >> + /* without the queue lock, as other locks are taken */ >> + fuse_uring_commit(ring_ent, issue_flags); >> + >> + /* >> + * Fetching the next request is absolutely required as queued >> + * fuse requests would otherwise not get processed - committing >> + * and fetching is done in one step vs legacy fuse, which has separated >> + * read (fetch request) and write (commit result). >> + */ >> + fuse_uring_next_fuse_req(ring_ent, queue, issue_flags); > > If there's no request ready to read next, then no request will be > fetched and this will return. However, as I understand it, once the > uring is registered, userspace should only be interacting with the > uring via FUSE_IO_URING_CMD_COMMIT_AND_FETCH. However for the case > where no request was ready to read, it seems like userspace would have > nothing to commit when it wants to fetch the next request? We have FUSE_IO_URING_CMD_REGISTER FUSE_IO_URING_CMD_COMMIT_AND_FETCH After _CMD_REGISTER the corresponding ring-entry is ready to get fuse requests and waiting. After it gets a request assigned and handles it by fuse server the _COMMIT_AND_FETCH scheme applies. Did you possibly miss that _CMD_REGISTER will already have it waiting? > > A more general question though: I imagine the most common use case > from the server side is waiting / polling until there is a request to > fetch. Could we not just do that here in the kernel instead with > adding a waitqueue mechanism and having fuse_uring_next_fuse_req() > only return when there is a request available? It seems like that > would reduce the amount of overhead instead of doing the > waiting/checking from the server side? The io-uring interface says that we should return -EIOCBQUEUED. If we would wait here, other SQEs that are submitted in parallel by fuse-server couldn't be handled anymore, as we wouldn't return to io-uring (all of this is in io-uring task context). 
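To illustrate that point about io-uring task context: a ->uring_cmd handler that has nothing to hand out must not sleep; it parks the command and returns -EIOCBQUEUED, and the CQE is only posted later via io_uring_cmd_done() once a fuse request arrives. A minimal sketch of the pattern, not code from this series - everything prefixed example_ is a made-up placeholder, only the io_uring_cmd_done() call and the -EIOCBQUEUED convention are real kernel API:

#include <linux/io_uring/cmd.h>
#include <linux/spinlock.h>

/* Parks the command instead of waiting; illustrative only. */
static int example_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
{
	struct example_queue *queue = example_queue_from_cmd(cmd); /* made up */

	spin_lock(&queue->lock);
	/* remember the cmd so it can be completed once a request shows up */
	example_park_cmd(queue, cmd);                              /* made up */
	spin_unlock(&queue->lock);

	/* tell io-uring that the CQE will be posted later */
	return -EIOCBQUEUED;
}

/* Called later, when a fuse request is queued for this ring queue. */
static void example_wake_one(struct example_queue *queue, unsigned int issue_flags)
{
	struct io_uring_cmd *cmd = example_unpark_cmd(queue);      /* made up */

	if (cmd)
		io_uring_cmd_done(cmd, 0, 0, issue_flags);
}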
> >> + return 0; >> +} >> + >> /* >> * fuse_uring_req_fetch command handling >> */ >> @@ -325,6 +767,14 @@ int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd, >> return err; >> } >> break; >> + case FUSE_IO_URING_CMD_COMMIT_AND_FETCH: >> + err = fuse_uring_commit_fetch(cmd, issue_flags, fc); >> + if (err) { >> + pr_info_once("FUSE_IO_URING_COMMIT_AND_FETCH failed err=%d\n", >> + err); >> + return err; >> + } >> + break; >> default: >> return -EINVAL; >> } >> diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h >> index 4e46dd65196d26dabc62dada33b17de9aa511c08..80f1c62d4df7f0ca77c4d5179068df6ffdbf7d85 100644 >> --- a/fs/fuse/dev_uring_i.h >> +++ b/fs/fuse/dev_uring_i.h >> @@ -20,6 +20,9 @@ enum fuse_ring_req_state { >> /* The ring entry is waiting for new fuse requests */ >> FRRS_AVAILABLE, >> >> + /* The ring entry got assigned a fuse req */ >> + FRRS_FUSE_REQ, >> + >> /* The ring entry is in or on the way to user space */ >> FRRS_USERSPACE, >> }; >> @@ -70,7 +73,16 @@ struct fuse_ring_queue { >> * entries in the process of being committed or in the process >> * to be sent to userspace >> */ >> + struct list_head ent_w_req_queue; > > What does the w in this stand for? I find the name ambiguous here. "entry-with-request-queue". Do you have another naming suggestion? Thanks, Bernd
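Regarding the xarray-based lookup Bernd mentions above: the attached patch itself is not part of this archive, but the rough shape of such a replacement for the fpq hash buckets would likely be a per-queue xarray keyed by the commit_id. The following is only a guess at that idea, not the attachment; the field and function names are invented:

#include <linux/xarray.h>

/* assumed field in struct fuse_ring_queue:  struct xarray commit_table;
 * initialized with xa_init(&queue->commit_table) at queue creation.
 * commit_id is u64 while xarray indices are unsigned long, which is
 * fine on 64-bit.
 */

/* on assignment: index the request by its commit_id (req->in.h.unique) */
static int example_track_req(struct xarray *commit_table, u64 commit_id,
			     struct fuse_req *req)
{
	/* note: under queue->lock this would need GFP_ATOMIC instead */
	return xa_err(xa_store(commit_table, commit_id, req, GFP_KERNEL));
}

/* on COMMIT_AND_FETCH: look the request up and remove it in one step */
static struct fuse_req *example_take_req(struct xarray *commit_table,
					 u64 commit_id)
{
	return xa_erase(commit_table, commit_id);
}

That trades the hash-bucket walk for a radix-tree lookup, at the cost of a possible allocation on insert - which is exactly the kind of trade-off the discussion above is about.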
On Sun, Jan 19, 2025 at 4:33 PM Bernd Schubert <bernd@bsbernd.com> wrote: > > Hi Joanne, > > sorry for my late reply, I was occupied all week. > > On 1/13/25 23:44, Joanne Koong wrote: > > On Mon, Jan 6, 2025 at 4:25 PM Bernd Schubert <bschubert@ddn.com> wrote: > >> > >> This adds support for fuse request completion through ring SQEs > >> (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing > >> the ring entry it becomes available for new fuse requests. > >> Handling of requests through the ring (SQE/CQE handling) > >> is complete now. > >> > >> Fuse request data are copied through the mmaped ring buffer, > >> there is no support for any zero copy yet. > >> > >> Signed-off-by: Bernd Schubert <bschubert@ddn.com> > >> --- > >> fs/fuse/dev_uring.c | 450 ++++++++++++++++++++++++++++++++++++++++++++++++++ > >> fs/fuse/dev_uring_i.h | 12 ++ > >> fs/fuse/fuse_i.h | 4 + > >> 3 files changed, 466 insertions(+) > >> > >> diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c > >> index b44ba4033615e01041313c040035b6da6af0ee17..f44e66a7ea577390da87e9ac7d118a9416898c28 100644 > >> --- a/fs/fuse/dev_uring.c > >> +++ b/fs/fuse/dev_uring.c > >> @@ -26,6 +26,19 @@ bool fuse_uring_enabled(void) > >> return enable_uring; > >> } > >> > >> +static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, bool set_err, > >> + int error) > >> +{ > >> + struct fuse_req *req = ring_ent->fuse_req; > >> + > >> + if (set_err) > >> + req->out.h.error = error; > > > > I think we could get away with not having the "bool set_err" as an > > argument if we do "if (error)" directly. AFAICT, we can use the value > > of error directly since it always returns zero on success and any > > non-zero value is considered an error. > > I had done this because of fuse_uring_commit() > > > err = fuse_uring_out_header_has_err(&req->out.h, req, fc); > if (err) { > /* req->out.h.error already set */ > goto out; > } > > > In fuse_uring_out_header_has_err() the header might already have the > error code, but there are other errors as well. Well, setting an > existing error code saves us a few lines and conditions, so you are > probably right and I removed that argument now. 
> > > > > >> + > >> + clear_bit(FR_SENT, &req->flags); > >> + fuse_request_end(ring_ent->fuse_req); > >> + ring_ent->fuse_req = NULL; > >> +} > >> + > >> void fuse_uring_destruct(struct fuse_conn *fc) > >> { > >> struct fuse_ring *ring = fc->ring; > >> @@ -41,8 +54,11 @@ void fuse_uring_destruct(struct fuse_conn *fc) > >> continue; > >> > >> WARN_ON(!list_empty(&queue->ent_avail_queue)); > >> + WARN_ON(!list_empty(&queue->ent_w_req_queue)); > >> WARN_ON(!list_empty(&queue->ent_commit_queue)); > >> + WARN_ON(!list_empty(&queue->ent_in_userspace)); > >> > >> + kfree(queue->fpq.processing); > >> kfree(queue); > >> ring->queues[qid] = NULL; > >> } > >> @@ -101,20 +117,34 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, > >> { > >> struct fuse_conn *fc = ring->fc; > >> struct fuse_ring_queue *queue; > >> + struct list_head *pq; > >> > >> queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT); > >> if (!queue) > >> return NULL; > >> + pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL); > >> + if (!pq) { > >> + kfree(queue); > >> + return NULL; > >> + } > >> + > >> queue->qid = qid; > >> queue->ring = ring; > >> spin_lock_init(&queue->lock); > >> > >> INIT_LIST_HEAD(&queue->ent_avail_queue); > >> INIT_LIST_HEAD(&queue->ent_commit_queue); > >> + INIT_LIST_HEAD(&queue->ent_w_req_queue); > >> + INIT_LIST_HEAD(&queue->ent_in_userspace); > >> + INIT_LIST_HEAD(&queue->fuse_req_queue); > >> + > >> + queue->fpq.processing = pq; > >> + fuse_pqueue_init(&queue->fpq); > >> > >> spin_lock(&fc->lock); > >> if (ring->queues[qid]) { > >> spin_unlock(&fc->lock); > >> + kfree(queue->fpq.processing); > >> kfree(queue); > >> return ring->queues[qid]; > >> } > >> @@ -128,6 +158,214 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, > >> return queue; > >> } > >> > >> +/* > >> + * Checks for errors and stores it into the request > >> + */ > >> +static int fuse_uring_out_header_has_err(struct fuse_out_header *oh, > >> + struct fuse_req *req, > >> + struct fuse_conn *fc) > >> +{ > >> + int err; > >> + > >> + err = -EINVAL; > >> + if (oh->unique == 0) { > >> + /* Not supportd through io-uring yet */ > >> + pr_warn_once("notify through fuse-io-uring not supported\n"); > >> + goto seterr; > >> + } > >> + > >> + err = -EINVAL; > >> + if (oh->error <= -ERESTARTSYS || oh->error > 0) > >> + goto seterr; > >> + > >> + if (oh->error) { > >> + err = oh->error; > >> + goto err; > >> + } > >> + > >> + err = -ENOENT; > >> + if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) { > >> + pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n", > >> + req->in.h.unique, > >> + oh->unique & ~FUSE_INT_REQ_BIT); > >> + goto seterr; > >> + } > >> + > >> + /* > >> + * Is it an interrupt reply ID? > >> + * XXX: Not supported through fuse-io-uring yet, it should not even > >> + * find the request - should not happen. 
> >> + */ > >> + WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT); > >> + > >> + return 0; > >> + > >> +seterr: > >> + oh->error = err; > >> +err: > >> + return err; > >> +} > >> + > >> +static int fuse_uring_copy_from_ring(struct fuse_ring *ring, > >> + struct fuse_req *req, > >> + struct fuse_ring_ent *ent) > >> +{ > >> + struct fuse_copy_state cs; > >> + struct fuse_args *args = req->args; > >> + struct iov_iter iter; > >> + int err, res; > >> + struct fuse_uring_ent_in_out ring_in_out; > >> + > >> + res = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out, > >> + sizeof(ring_in_out)); > >> + if (res) > >> + return -EFAULT; > >> + > >> + err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz, > >> + &iter); > >> + if (err) > >> + return err; > >> + > >> + fuse_copy_init(&cs, 0, &iter); > >> + cs.is_uring = 1; > >> + cs.req = req; > >> + > >> + return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz); > >> +} > >> + > >> + /* > >> + * Copy data from the req to the ring buffer > >> + */ > >> +static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req, > >> + struct fuse_ring_ent *ent) > >> +{ > >> + struct fuse_copy_state cs; > >> + struct fuse_args *args = req->args; > >> + struct fuse_in_arg *in_args = args->in_args; > >> + int num_args = args->in_numargs; > >> + int err, res; > >> + struct iov_iter iter; > >> + struct fuse_uring_ent_in_out ent_in_out = { > >> + .flags = 0, > >> + .commit_id = ent->commit_id, > >> + }; > >> + > >> + if (WARN_ON(ent_in_out.commit_id == 0)) > >> + return -EINVAL; > >> + > >> + err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter); > >> + if (err) { > >> + pr_info_ratelimited("fuse: Import of user buffer failed\n"); > >> + return err; > >> + } > >> + > >> + fuse_copy_init(&cs, 1, &iter); > >> + cs.is_uring = 1; > >> + cs.req = req; > >> + > >> + if (num_args > 0) { > >> + /* > >> + * Expectation is that the first argument is the per op header. > >> + * Some op code have that as zero. > >> + */ > >> + if (args->in_args[0].size > 0) { > >> + res = copy_to_user(&ent->headers->op_in, in_args->value, > >> + in_args->size); > >> + err = res > 0 ? -EFAULT : res; > >> + if (err) { > >> + pr_info_ratelimited( > >> + "Copying the header failed.\n"); > >> + return err; > >> + } > >> + } > >> + in_args++; > >> + num_args--; > >> + } > >> + > >> + /* copy the payload */ > >> + err = fuse_copy_args(&cs, num_args, args->in_pages, > >> + (struct fuse_arg *)in_args, 0); > >> + if (err) { > >> + pr_info_ratelimited("%s fuse_copy_args failed\n", __func__); > >> + return err; > >> + } > >> + > >> + ent_in_out.payload_sz = cs.ring.copied_sz; > >> + res = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out, > >> + sizeof(ent_in_out)); > >> + err = res > 0 ? 
-EFAULT : res; > >> + if (err) > >> + return err; > >> + > >> + return 0; > >> +} > >> + > >> +static int > >> +fuse_uring_prepare_send(struct fuse_ring_ent *ring_ent) > >> +{ > >> + struct fuse_ring_queue *queue = ring_ent->queue; > >> + struct fuse_ring *ring = queue->ring; > >> + struct fuse_req *req = ring_ent->fuse_req; > >> + int err, res; > >> + > >> + err = -EIO; > >> + if (WARN_ON(ring_ent->state != FRRS_FUSE_REQ)) { > >> + pr_err("qid=%d ring-req=%p invalid state %d on send\n", > >> + queue->qid, ring_ent, ring_ent->state); > >> + err = -EIO; > >> + goto err; > >> + } > >> + > >> + /* copy the request */ > >> + err = fuse_uring_copy_to_ring(ring, req, ring_ent); > >> + if (unlikely(err)) { > >> + pr_info_ratelimited("Copy to ring failed: %d\n", err); > >> + goto err; > >> + } > >> + > >> + /* copy fuse_in_header */ > >> + res = copy_to_user(&ring_ent->headers->in_out, &req->in.h, > >> + sizeof(req->in.h)); > >> + err = res > 0 ? -EFAULT : res; > >> + if (err) > >> + goto err; > >> + > >> + set_bit(FR_SENT, &req->flags); > >> + return 0; > >> + > >> +err: > >> + fuse_uring_req_end(ring_ent, true, err); > >> + return err; > >> +} > >> + > >> +/* > >> + * Write data to the ring buffer and send the request to userspace, > >> + * userspace will read it > >> + * This is comparable with classical read(/dev/fuse) > >> + */ > >> +static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ring_ent, > >> + unsigned int issue_flags) > >> +{ > >> + int err = 0; > >> + struct fuse_ring_queue *queue = ring_ent->queue; > >> + > >> + err = fuse_uring_prepare_send(ring_ent); > >> + if (err) > >> + goto err; > >> + > >> + spin_lock(&queue->lock); > >> + ring_ent->state = FRRS_USERSPACE; > >> + list_move(&ring_ent->list, &queue->ent_in_userspace); > >> + spin_unlock(&queue->lock); > >> + > >> + io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags); > >> + ring_ent->cmd = NULL; > >> + return 0; > >> + > >> +err: > >> + return err; > >> +} > >> + > >> /* > >> * Make a ring entry available for fuse_req assignment > >> */ > >> @@ -138,6 +376,210 @@ static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent, > >> ring_ent->state = FRRS_AVAILABLE; > >> } > >> > >> +/* Used to find the request on SQE commit */ > >> +static void fuse_uring_add_to_pq(struct fuse_ring_ent *ring_ent, > >> + struct fuse_req *req) > >> +{ > >> + struct fuse_ring_queue *queue = ring_ent->queue; > >> + struct fuse_pqueue *fpq = &queue->fpq; > >> + unsigned int hash; > >> + > >> + /* commit_id is the unique id of the request */ > >> + ring_ent->commit_id = req->in.h.unique; > >> + > >> + req->ring_entry = ring_ent; > >> + hash = fuse_req_hash(ring_ent->commit_id); > >> + list_move_tail(&req->list, &fpq->processing[hash]); > >> +} > >> + > >> +/* > >> + * Assign a fuse queue entry to the given entry > >> + */ > >> +static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ring_ent, > >> + struct fuse_req *req) > >> +{ > >> + struct fuse_ring_queue *queue = ring_ent->queue; > >> + > >> + lockdep_assert_held(&queue->lock); > >> + > >> + if (WARN_ON_ONCE(ring_ent->state != FRRS_AVAILABLE && > >> + ring_ent->state != FRRS_COMMIT)) { > >> + pr_warn("%s qid=%d state=%d\n", __func__, ring_ent->queue->qid, > >> + ring_ent->state); > >> + } > >> + list_del_init(&req->list); > >> + clear_bit(FR_PENDING, &req->flags); > >> + ring_ent->fuse_req = req; > >> + ring_ent->state = FRRS_FUSE_REQ; > >> + list_move(&ring_ent->list, &queue->ent_w_req_queue); > >> + fuse_uring_add_to_pq(ring_ent, req); > >> +} > >> + > >> +/* > >> + * 
Release the ring entry and fetch the next fuse request if available > >> + * > >> + * @return true if a new request has been fetched > >> + */ > >> +static bool fuse_uring_ent_assign_req(struct fuse_ring_ent *ring_ent) > >> + __must_hold(&queue->lock) > >> +{ > >> + struct fuse_req *req; > >> + struct fuse_ring_queue *queue = ring_ent->queue; > >> + struct list_head *req_queue = &queue->fuse_req_queue; > >> + > >> + lockdep_assert_held(&queue->lock); > >> + > >> + /* get and assign the next entry while it is still holding the lock */ > >> + req = list_first_entry_or_null(req_queue, struct fuse_req, list); > >> + if (req) { > >> + fuse_uring_add_req_to_ring_ent(ring_ent, req); > >> + return true; > >> + } > >> + > >> + return false; > >> +} > >> + > >> +/* > >> + * Read data from the ring buffer, which user space has written to > >> + * This is comparible with handling of classical write(/dev/fuse). > >> + * Also make the ring request available again for new fuse requests. > >> + */ > >> +static void fuse_uring_commit(struct fuse_ring_ent *ring_ent, > >> + unsigned int issue_flags) > >> +{ > >> + struct fuse_ring *ring = ring_ent->queue->ring; > >> + struct fuse_conn *fc = ring->fc; > >> + struct fuse_req *req = ring_ent->fuse_req; > >> + ssize_t err = 0; > >> + bool set_err = false; > >> + > >> + err = copy_from_user(&req->out.h, &ring_ent->headers->in_out, > >> + sizeof(req->out.h)); > >> + if (err) { > >> + req->out.h.error = err; > >> + goto out; > >> + } > >> + > >> + err = fuse_uring_out_header_has_err(&req->out.h, req, fc); > >> + if (err) { > >> + /* req->out.h.error already set */ > >> + goto out; > >> + } > >> + > >> + err = fuse_uring_copy_from_ring(ring, req, ring_ent); > >> + if (err) > >> + set_err = true; > >> + > >> +out: > >> + fuse_uring_req_end(ring_ent, set_err, err); > >> +} > >> + > >> +/* > >> + * Get the next fuse req and send it > >> + */ > >> +static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent, > >> + struct fuse_ring_queue *queue, > >> + unsigned int issue_flags) > >> +{ > >> + int err; > >> + bool has_next; > >> + > >> +retry: > >> + spin_lock(&queue->lock); > >> + fuse_uring_ent_avail(ring_ent, queue); > >> + has_next = fuse_uring_ent_assign_req(ring_ent); > >> + spin_unlock(&queue->lock); > >> + > >> + if (has_next) { > >> + err = fuse_uring_send_next_to_ring(ring_ent, issue_flags); > >> + if (err) > >> + goto retry; > >> + } > >> +} > >> + > >> +static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent) > >> +{ > >> + struct fuse_ring_queue *queue = ent->queue; > >> + > >> + lockdep_assert_held(&queue->lock); > >> + > >> + if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE)) > >> + return -EIO; > >> + > >> + ent->state = FRRS_COMMIT; > >> + list_move(&ent->list, &queue->ent_commit_queue); > >> + > >> + return 0; > >> +} > >> + > >> +/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */ > >> +static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags, > >> + struct fuse_conn *fc) > >> +{ > >> + const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe); > >> + struct fuse_ring_ent *ring_ent; > >> + int err; > >> + struct fuse_ring *ring = fc->ring; > >> + struct fuse_ring_queue *queue; > >> + uint64_t commit_id = READ_ONCE(cmd_req->commit_id); > >> + unsigned int qid = READ_ONCE(cmd_req->qid); > >> + struct fuse_pqueue *fpq; > >> + struct fuse_req *req; > >> + > >> + err = -ENOTCONN; > >> + if (!ring) > >> + return err; > >> + > >> + if (qid >= ring->nr_queues) > >> + return -EINVAL; > >> + > >> + queue = ring->queues[qid]; 
> >> + if (!queue) > >> + return err; > >> + fpq = &queue->fpq; > >> + > >> + spin_lock(&queue->lock); > >> + /* Find a request based on the unique ID of the fuse request > >> + * This should get revised, as it needs a hash calculation and list > >> + * search. And full struct fuse_pqueue is needed (memory overhead). > >> + * As well as the link from req to ring_ent. > >> + */ > > > > imo, the hash calculation and list search seems ok. I can't think of a > > more optimal way of doing it. Instead of using the full struct > > fuse_pqueue, I think we could just have the "struct list_head > > *processing" defined inside "struct fuse_ring_queue" and change > > fuse_request_find() to take in a list_head. I don't think we need a > > dedicated spinlock for the list either. We can just reuse queue->lock, > > as that's (currently) always held already when the processing list is > > accessed. > > > Please see the attached patch, which uses xarray. Totally untested, though. > I actually found an issue while writing this patch - FR_PENDING was cleared > without holding fiq->lock, but that is important for request_wait_answer(). > If something removes req from the list, we entirely loose the ring entry - > can never be used anymore. Personally I think the attached patch is safer. > > > > > > > >> + req = fuse_request_find(fpq, commit_id); > >> + err = -ENOENT; > >> + if (!req) { > >> + pr_info("qid=%d commit_id %llu not found\n", queue->qid, > >> + commit_id); > >> + spin_unlock(&queue->lock); > >> + return err; > >> + } > >> + list_del_init(&req->list); > >> + ring_ent = req->ring_entry; > >> + req->ring_entry = NULL; > > > > Do we need to set this to NULL, given that the request will be cleaned > > up later in fuse_uring_req_end() anyways? > > It is not explicitly set to NULL in that function. Would you mind to keep > it safe? > > > > >> + > >> + err = fuse_ring_ent_set_commit(ring_ent); > >> + if (err != 0) { > >> + pr_info_ratelimited("qid=%d commit_id %llu state %d", > >> + queue->qid, commit_id, ring_ent->state); > >> + spin_unlock(&queue->lock); > >> + return err; > >> + } > >> + > >> + ring_ent->cmd = cmd; > >> + spin_unlock(&queue->lock); > >> + > >> + /* without the queue lock, as other locks are taken */ > >> + fuse_uring_commit(ring_ent, issue_flags); > >> + > >> + /* > >> + * Fetching the next request is absolutely required as queued > >> + * fuse requests would otherwise not get processed - committing > >> + * and fetching is done in one step vs legacy fuse, which has separated > >> + * read (fetch request) and write (commit result). > >> + */ > >> + fuse_uring_next_fuse_req(ring_ent, queue, issue_flags); > > > > If there's no request ready to read next, then no request will be > > fetched and this will return. However, as I understand it, once the > > uring is registered, userspace should only be interacting with the > > uring via FUSE_IO_URING_CMD_COMMIT_AND_FETCH. However for the case > > where no request was ready to read, it seems like userspace would have > > nothing to commit when it wants to fetch the next request? > > We have > > FUSE_IO_URING_CMD_REGISTER > FUSE_IO_URING_CMD_COMMIT_AND_FETCH > > > After _CMD_REGISTER the corresponding ring-entry is ready to get fuse > requests and waiting. After it gets a request assigned and handles it > by fuse server the _COMMIT_AND_FETCH scheme applies. Did you possibly > miss that _CMD_REGISTER will already have it waiting? > Sorry for the late reply. 
After _CMD_REGISTER and _COMMIT_AND_FETCH, it seems possible that there is no fuse request waiting until a later time? This is the scenario I'm envisioning: a) uring registers successfully and fetches request through _CMD_REGISTER b) server replies to request and fetches new request through _COMMIT_AND_FETCH c) server replies to request, tries to fetch new request but no request is ready, through _COMMIT_AND_FETCH maybe I'm missing something in my reading of the code, but how will the server then fetch the next request once the request is ready? It will have to commit something in order to fetch it since there's only _COMMIT_AND_FETCH which requires a commit, no? Thanks, Joanne > > > > > A more general question though: I imagine the most common use case > > from the server side is waiting / polling until there is a request to > > fetch. Could we not just do that here in the kernel instead with > > adding a waitqueue mechanism and having fuse_uring_next_fuse_req() > > only return when there is a request available? It seems like that > > would reduce the amount of overhead instead of doing the > > waiting/checking from the server side? > > The io-uring interface says that we should return -EIOCBQUEUED. If we > would wait here, other SQEs that are submitted in parallel by > fuse-server couldn't be handled anymore, as we wouldn't return > to io-uring (all of this is in io-uring task context). > > > > >> + return 0; > >> +} > >> + > >> /* > >> * fuse_uring_req_fetch command handling > >> */ > >> @@ -325,6 +767,14 @@ int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd, > >> return err; > >> } > >> break; > >> + case FUSE_IO_URING_CMD_COMMIT_AND_FETCH: > >> + err = fuse_uring_commit_fetch(cmd, issue_flags, fc); > >> + if (err) { > >> + pr_info_once("FUSE_IO_URING_COMMIT_AND_FETCH failed err=%d\n", > >> + err); > >> + return err; > >> + } > >> + break; > >> default: > >> return -EINVAL; > >> } > >> diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h > >> index 4e46dd65196d26dabc62dada33b17de9aa511c08..80f1c62d4df7f0ca77c4d5179068df6ffdbf7d85 100644 > >> --- a/fs/fuse/dev_uring_i.h > >> +++ b/fs/fuse/dev_uring_i.h > >> @@ -20,6 +20,9 @@ enum fuse_ring_req_state { > >> /* The ring entry is waiting for new fuse requests */ > >> FRRS_AVAILABLE, > >> > >> + /* The ring entry got assigned a fuse req */ > >> + FRRS_FUSE_REQ, > >> + > >> /* The ring entry is in or on the way to user space */ > >> FRRS_USERSPACE, > >> }; > >> @@ -70,7 +73,16 @@ struct fuse_ring_queue { > >> * entries in the process of being committed or in the process > >> * to be sent to userspace > >> */ > >> + struct list_head ent_w_req_queue; > > > > What does the w in this stand for? I find the name ambiguous here. > > "entry-with-request-queue". Do you have another naming suggestion? > > > Thanks, > Bernd >
On 1/22/25 01:04, Joanne Koong wrote: > On Sun, Jan 19, 2025 at 4:33 PM Bernd Schubert <bernd@bsbernd.com> wrote: >> >> Hi Joanne, >> >> sorry for my late reply, I was occupied all week. >> >> On 1/13/25 23:44, Joanne Koong wrote: >>> On Mon, Jan 6, 2025 at 4:25 PM Bernd Schubert <bschubert@ddn.com> wrote: >>>> >>>> This adds support for fuse request completion through ring SQEs >>>> (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing >>>> the ring entry it becomes available for new fuse requests. >>>> Handling of requests through the ring (SQE/CQE handling) >>>> is complete now. >>>> >>>> Fuse request data are copied through the mmaped ring buffer, >>>> there is no support for any zero copy yet. >>>> >>>> Signed-off-by: Bernd Schubert <bschubert@ddn.com> >>>> --- >>>> fs/fuse/dev_uring.c | 450 ++++++++++++++++++++++++++++++++++++++++++++++++++ >>>> fs/fuse/dev_uring_i.h | 12 ++ >>>> fs/fuse/fuse_i.h | 4 + >>>> 3 files changed, 466 insertions(+) >>>> >>>> diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c >>>> index b44ba4033615e01041313c040035b6da6af0ee17..f44e66a7ea577390da87e9ac7d118a9416898c28 100644 >>>> --- a/fs/fuse/dev_uring.c >>>> +++ b/fs/fuse/dev_uring.c >>>> @@ -26,6 +26,19 @@ bool fuse_uring_enabled(void) >>>> return enable_uring; >>>> } >>>> >>>> +static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, bool set_err, >>>> + int error) >>>> +{ >>>> + struct fuse_req *req = ring_ent->fuse_req; >>>> + >>>> + if (set_err) >>>> + req->out.h.error = error; >>> >>> I think we could get away with not having the "bool set_err" as an >>> argument if we do "if (error)" directly. AFAICT, we can use the value >>> of error directly since it always returns zero on success and any >>> non-zero value is considered an error. >> >> I had done this because of fuse_uring_commit() >> >> >> err = fuse_uring_out_header_has_err(&req->out.h, req, fc); >> if (err) { >> /* req->out.h.error already set */ >> goto out; >> } >> >> >> In fuse_uring_out_header_has_err() the header might already have the >> error code, but there are other errors as well. Well, setting an >> existing error code saves us a few lines and conditions, so you are >> probably right and I removed that argument now. 
>> >> >>> >>>> + >>>> + clear_bit(FR_SENT, &req->flags); >>>> + fuse_request_end(ring_ent->fuse_req); >>>> + ring_ent->fuse_req = NULL; >>>> +} >>>> + >>>> void fuse_uring_destruct(struct fuse_conn *fc) >>>> { >>>> struct fuse_ring *ring = fc->ring; >>>> @@ -41,8 +54,11 @@ void fuse_uring_destruct(struct fuse_conn *fc) >>>> continue; >>>> >>>> WARN_ON(!list_empty(&queue->ent_avail_queue)); >>>> + WARN_ON(!list_empty(&queue->ent_w_req_queue)); >>>> WARN_ON(!list_empty(&queue->ent_commit_queue)); >>>> + WARN_ON(!list_empty(&queue->ent_in_userspace)); >>>> >>>> + kfree(queue->fpq.processing); >>>> kfree(queue); >>>> ring->queues[qid] = NULL; >>>> } >>>> @@ -101,20 +117,34 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, >>>> { >>>> struct fuse_conn *fc = ring->fc; >>>> struct fuse_ring_queue *queue; >>>> + struct list_head *pq; >>>> >>>> queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT); >>>> if (!queue) >>>> return NULL; >>>> + pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL); >>>> + if (!pq) { >>>> + kfree(queue); >>>> + return NULL; >>>> + } >>>> + >>>> queue->qid = qid; >>>> queue->ring = ring; >>>> spin_lock_init(&queue->lock); >>>> >>>> INIT_LIST_HEAD(&queue->ent_avail_queue); >>>> INIT_LIST_HEAD(&queue->ent_commit_queue); >>>> + INIT_LIST_HEAD(&queue->ent_w_req_queue); >>>> + INIT_LIST_HEAD(&queue->ent_in_userspace); >>>> + INIT_LIST_HEAD(&queue->fuse_req_queue); >>>> + >>>> + queue->fpq.processing = pq; >>>> + fuse_pqueue_init(&queue->fpq); >>>> >>>> spin_lock(&fc->lock); >>>> if (ring->queues[qid]) { >>>> spin_unlock(&fc->lock); >>>> + kfree(queue->fpq.processing); >>>> kfree(queue); >>>> return ring->queues[qid]; >>>> } >>>> @@ -128,6 +158,214 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, >>>> return queue; >>>> } >>>> >>>> +/* >>>> + * Checks for errors and stores it into the request >>>> + */ >>>> +static int fuse_uring_out_header_has_err(struct fuse_out_header *oh, >>>> + struct fuse_req *req, >>>> + struct fuse_conn *fc) >>>> +{ >>>> + int err; >>>> + >>>> + err = -EINVAL; >>>> + if (oh->unique == 0) { >>>> + /* Not supportd through io-uring yet */ >>>> + pr_warn_once("notify through fuse-io-uring not supported\n"); >>>> + goto seterr; >>>> + } >>>> + >>>> + err = -EINVAL; >>>> + if (oh->error <= -ERESTARTSYS || oh->error > 0) >>>> + goto seterr; >>>> + >>>> + if (oh->error) { >>>> + err = oh->error; >>>> + goto err; >>>> + } >>>> + >>>> + err = -ENOENT; >>>> + if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) { >>>> + pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n", >>>> + req->in.h.unique, >>>> + oh->unique & ~FUSE_INT_REQ_BIT); >>>> + goto seterr; >>>> + } >>>> + >>>> + /* >>>> + * Is it an interrupt reply ID? >>>> + * XXX: Not supported through fuse-io-uring yet, it should not even >>>> + * find the request - should not happen. 
>>>> + */ >>>> + WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT); >>>> + >>>> + return 0; >>>> + >>>> +seterr: >>>> + oh->error = err; >>>> +err: >>>> + return err; >>>> +} >>>> + >>>> +static int fuse_uring_copy_from_ring(struct fuse_ring *ring, >>>> + struct fuse_req *req, >>>> + struct fuse_ring_ent *ent) >>>> +{ >>>> + struct fuse_copy_state cs; >>>> + struct fuse_args *args = req->args; >>>> + struct iov_iter iter; >>>> + int err, res; >>>> + struct fuse_uring_ent_in_out ring_in_out; >>>> + >>>> + res = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out, >>>> + sizeof(ring_in_out)); >>>> + if (res) >>>> + return -EFAULT; >>>> + >>>> + err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz, >>>> + &iter); >>>> + if (err) >>>> + return err; >>>> + >>>> + fuse_copy_init(&cs, 0, &iter); >>>> + cs.is_uring = 1; >>>> + cs.req = req; >>>> + >>>> + return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz); >>>> +} >>>> + >>>> + /* >>>> + * Copy data from the req to the ring buffer >>>> + */ >>>> +static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req, >>>> + struct fuse_ring_ent *ent) >>>> +{ >>>> + struct fuse_copy_state cs; >>>> + struct fuse_args *args = req->args; >>>> + struct fuse_in_arg *in_args = args->in_args; >>>> + int num_args = args->in_numargs; >>>> + int err, res; >>>> + struct iov_iter iter; >>>> + struct fuse_uring_ent_in_out ent_in_out = { >>>> + .flags = 0, >>>> + .commit_id = ent->commit_id, >>>> + }; >>>> + >>>> + if (WARN_ON(ent_in_out.commit_id == 0)) >>>> + return -EINVAL; >>>> + >>>> + err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter); >>>> + if (err) { >>>> + pr_info_ratelimited("fuse: Import of user buffer failed\n"); >>>> + return err; >>>> + } >>>> + >>>> + fuse_copy_init(&cs, 1, &iter); >>>> + cs.is_uring = 1; >>>> + cs.req = req; >>>> + >>>> + if (num_args > 0) { >>>> + /* >>>> + * Expectation is that the first argument is the per op header. >>>> + * Some op code have that as zero. >>>> + */ >>>> + if (args->in_args[0].size > 0) { >>>> + res = copy_to_user(&ent->headers->op_in, in_args->value, >>>> + in_args->size); >>>> + err = res > 0 ? -EFAULT : res; >>>> + if (err) { >>>> + pr_info_ratelimited( >>>> + "Copying the header failed.\n"); >>>> + return err; >>>> + } >>>> + } >>>> + in_args++; >>>> + num_args--; >>>> + } >>>> + >>>> + /* copy the payload */ >>>> + err = fuse_copy_args(&cs, num_args, args->in_pages, >>>> + (struct fuse_arg *)in_args, 0); >>>> + if (err) { >>>> + pr_info_ratelimited("%s fuse_copy_args failed\n", __func__); >>>> + return err; >>>> + } >>>> + >>>> + ent_in_out.payload_sz = cs.ring.copied_sz; >>>> + res = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out, >>>> + sizeof(ent_in_out)); >>>> + err = res > 0 ? 
-EFAULT : res; >>>> + if (err) >>>> + return err; >>>> + >>>> + return 0; >>>> +} >>>> + >>>> +static int >>>> +fuse_uring_prepare_send(struct fuse_ring_ent *ring_ent) >>>> +{ >>>> + struct fuse_ring_queue *queue = ring_ent->queue; >>>> + struct fuse_ring *ring = queue->ring; >>>> + struct fuse_req *req = ring_ent->fuse_req; >>>> + int err, res; >>>> + >>>> + err = -EIO; >>>> + if (WARN_ON(ring_ent->state != FRRS_FUSE_REQ)) { >>>> + pr_err("qid=%d ring-req=%p invalid state %d on send\n", >>>> + queue->qid, ring_ent, ring_ent->state); >>>> + err = -EIO; >>>> + goto err; >>>> + } >>>> + >>>> + /* copy the request */ >>>> + err = fuse_uring_copy_to_ring(ring, req, ring_ent); >>>> + if (unlikely(err)) { >>>> + pr_info_ratelimited("Copy to ring failed: %d\n", err); >>>> + goto err; >>>> + } >>>> + >>>> + /* copy fuse_in_header */ >>>> + res = copy_to_user(&ring_ent->headers->in_out, &req->in.h, >>>> + sizeof(req->in.h)); >>>> + err = res > 0 ? -EFAULT : res; >>>> + if (err) >>>> + goto err; >>>> + >>>> + set_bit(FR_SENT, &req->flags); >>>> + return 0; >>>> + >>>> +err: >>>> + fuse_uring_req_end(ring_ent, true, err); >>>> + return err; >>>> +} >>>> + >>>> +/* >>>> + * Write data to the ring buffer and send the request to userspace, >>>> + * userspace will read it >>>> + * This is comparable with classical read(/dev/fuse) >>>> + */ >>>> +static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ring_ent, >>>> + unsigned int issue_flags) >>>> +{ >>>> + int err = 0; >>>> + struct fuse_ring_queue *queue = ring_ent->queue; >>>> + >>>> + err = fuse_uring_prepare_send(ring_ent); >>>> + if (err) >>>> + goto err; >>>> + >>>> + spin_lock(&queue->lock); >>>> + ring_ent->state = FRRS_USERSPACE; >>>> + list_move(&ring_ent->list, &queue->ent_in_userspace); >>>> + spin_unlock(&queue->lock); >>>> + >>>> + io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags); >>>> + ring_ent->cmd = NULL; >>>> + return 0; >>>> + >>>> +err: >>>> + return err; >>>> +} >>>> + >>>> /* >>>> * Make a ring entry available for fuse_req assignment >>>> */ >>>> @@ -138,6 +376,210 @@ static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent, >>>> ring_ent->state = FRRS_AVAILABLE; >>>> } >>>> >>>> +/* Used to find the request on SQE commit */ >>>> +static void fuse_uring_add_to_pq(struct fuse_ring_ent *ring_ent, >>>> + struct fuse_req *req) >>>> +{ >>>> + struct fuse_ring_queue *queue = ring_ent->queue; >>>> + struct fuse_pqueue *fpq = &queue->fpq; >>>> + unsigned int hash; >>>> + >>>> + /* commit_id is the unique id of the request */ >>>> + ring_ent->commit_id = req->in.h.unique; >>>> + >>>> + req->ring_entry = ring_ent; >>>> + hash = fuse_req_hash(ring_ent->commit_id); >>>> + list_move_tail(&req->list, &fpq->processing[hash]); >>>> +} >>>> + >>>> +/* >>>> + * Assign a fuse queue entry to the given entry >>>> + */ >>>> +static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ring_ent, >>>> + struct fuse_req *req) >>>> +{ >>>> + struct fuse_ring_queue *queue = ring_ent->queue; >>>> + >>>> + lockdep_assert_held(&queue->lock); >>>> + >>>> + if (WARN_ON_ONCE(ring_ent->state != FRRS_AVAILABLE && >>>> + ring_ent->state != FRRS_COMMIT)) { >>>> + pr_warn("%s qid=%d state=%d\n", __func__, ring_ent->queue->qid, >>>> + ring_ent->state); >>>> + } >>>> + list_del_init(&req->list); >>>> + clear_bit(FR_PENDING, &req->flags); >>>> + ring_ent->fuse_req = req; >>>> + ring_ent->state = FRRS_FUSE_REQ; >>>> + list_move(&ring_ent->list, &queue->ent_w_req_queue); >>>> + fuse_uring_add_to_pq(ring_ent, req); >>>> +} >>>> + >>>> +/* >>>> + * 
Release the ring entry and fetch the next fuse request if available >>>> + * >>>> + * @return true if a new request has been fetched >>>> + */ >>>> +static bool fuse_uring_ent_assign_req(struct fuse_ring_ent *ring_ent) >>>> + __must_hold(&queue->lock) >>>> +{ >>>> + struct fuse_req *req; >>>> + struct fuse_ring_queue *queue = ring_ent->queue; >>>> + struct list_head *req_queue = &queue->fuse_req_queue; >>>> + >>>> + lockdep_assert_held(&queue->lock); >>>> + >>>> + /* get and assign the next entry while it is still holding the lock */ >>>> + req = list_first_entry_or_null(req_queue, struct fuse_req, list); >>>> + if (req) { >>>> + fuse_uring_add_req_to_ring_ent(ring_ent, req); >>>> + return true; >>>> + } >>>> + >>>> + return false; >>>> +} >>>> + >>>> +/* >>>> + * Read data from the ring buffer, which user space has written to >>>> + * This is comparible with handling of classical write(/dev/fuse). >>>> + * Also make the ring request available again for new fuse requests. >>>> + */ >>>> +static void fuse_uring_commit(struct fuse_ring_ent *ring_ent, >>>> + unsigned int issue_flags) >>>> +{ >>>> + struct fuse_ring *ring = ring_ent->queue->ring; >>>> + struct fuse_conn *fc = ring->fc; >>>> + struct fuse_req *req = ring_ent->fuse_req; >>>> + ssize_t err = 0; >>>> + bool set_err = false; >>>> + >>>> + err = copy_from_user(&req->out.h, &ring_ent->headers->in_out, >>>> + sizeof(req->out.h)); >>>> + if (err) { >>>> + req->out.h.error = err; >>>> + goto out; >>>> + } >>>> + >>>> + err = fuse_uring_out_header_has_err(&req->out.h, req, fc); >>>> + if (err) { >>>> + /* req->out.h.error already set */ >>>> + goto out; >>>> + } >>>> + >>>> + err = fuse_uring_copy_from_ring(ring, req, ring_ent); >>>> + if (err) >>>> + set_err = true; >>>> + >>>> +out: >>>> + fuse_uring_req_end(ring_ent, set_err, err); >>>> +} >>>> + >>>> +/* >>>> + * Get the next fuse req and send it >>>> + */ >>>> +static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent, >>>> + struct fuse_ring_queue *queue, >>>> + unsigned int issue_flags) >>>> +{ >>>> + int err; >>>> + bool has_next; >>>> + >>>> +retry: >>>> + spin_lock(&queue->lock); >>>> + fuse_uring_ent_avail(ring_ent, queue); >>>> + has_next = fuse_uring_ent_assign_req(ring_ent); >>>> + spin_unlock(&queue->lock); >>>> + >>>> + if (has_next) { >>>> + err = fuse_uring_send_next_to_ring(ring_ent, issue_flags); >>>> + if (err) >>>> + goto retry; >>>> + } >>>> +} >>>> + >>>> +static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent) >>>> +{ >>>> + struct fuse_ring_queue *queue = ent->queue; >>>> + >>>> + lockdep_assert_held(&queue->lock); >>>> + >>>> + if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE)) >>>> + return -EIO; >>>> + >>>> + ent->state = FRRS_COMMIT; >>>> + list_move(&ent->list, &queue->ent_commit_queue); >>>> + >>>> + return 0; >>>> +} >>>> + >>>> +/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */ >>>> +static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags, >>>> + struct fuse_conn *fc) >>>> +{ >>>> + const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe); >>>> + struct fuse_ring_ent *ring_ent; >>>> + int err; >>>> + struct fuse_ring *ring = fc->ring; >>>> + struct fuse_ring_queue *queue; >>>> + uint64_t commit_id = READ_ONCE(cmd_req->commit_id); >>>> + unsigned int qid = READ_ONCE(cmd_req->qid); >>>> + struct fuse_pqueue *fpq; >>>> + struct fuse_req *req; >>>> + >>>> + err = -ENOTCONN; >>>> + if (!ring) >>>> + return err; >>>> + >>>> + if (qid >= ring->nr_queues) >>>> + return -EINVAL; >>>> + >>>> + queue = ring->queues[qid]; 
>>>> + if (!queue) >>>> + return err; >>>> + fpq = &queue->fpq; >>>> + >>>> + spin_lock(&queue->lock); >>>> + /* Find a request based on the unique ID of the fuse request >>>> + * This should get revised, as it needs a hash calculation and list >>>> + * search. And full struct fuse_pqueue is needed (memory overhead). >>>> + * As well as the link from req to ring_ent. >>>> + */ >>> >>> imo, the hash calculation and list search seems ok. I can't think of a >>> more optimal way of doing it. Instead of using the full struct >>> fuse_pqueue, I think we could just have the "struct list_head >>> *processing" defined inside "struct fuse_ring_queue" and change >>> fuse_request_find() to take in a list_head. I don't think we need a >>> dedicated spinlock for the list either. We can just reuse queue->lock, >>> as that's (currently) always held already when the processing list is >>> accessed. >> >> >> Please see the attached patch, which uses xarray. Totally untested, though. >> I actually found an issue while writing this patch - FR_PENDING was cleared >> without holding fiq->lock, but that is important for request_wait_answer(). >> If something removes req from the list, we entirely loose the ring entry - >> can never be used anymore. Personally I think the attached patch is safer. >> >> >>> >>> >>>> + req = fuse_request_find(fpq, commit_id); >>>> + err = -ENOENT; >>>> + if (!req) { >>>> + pr_info("qid=%d commit_id %llu not found\n", queue->qid, >>>> + commit_id); >>>> + spin_unlock(&queue->lock); >>>> + return err; >>>> + } >>>> + list_del_init(&req->list); >>>> + ring_ent = req->ring_entry; >>>> + req->ring_entry = NULL; >>> >>> Do we need to set this to NULL, given that the request will be cleaned >>> up later in fuse_uring_req_end() anyways? >> >> It is not explicitly set to NULL in that function. Would you mind to keep >> it safe? >> >>> >>>> + >>>> + err = fuse_ring_ent_set_commit(ring_ent); >>>> + if (err != 0) { >>>> + pr_info_ratelimited("qid=%d commit_id %llu state %d", >>>> + queue->qid, commit_id, ring_ent->state); >>>> + spin_unlock(&queue->lock); >>>> + return err; >>>> + } >>>> + >>>> + ring_ent->cmd = cmd; >>>> + spin_unlock(&queue->lock); >>>> + >>>> + /* without the queue lock, as other locks are taken */ >>>> + fuse_uring_commit(ring_ent, issue_flags); >>>> + >>>> + /* >>>> + * Fetching the next request is absolutely required as queued >>>> + * fuse requests would otherwise not get processed - committing >>>> + * and fetching is done in one step vs legacy fuse, which has separated >>>> + * read (fetch request) and write (commit result). >>>> + */ >>>> + fuse_uring_next_fuse_req(ring_ent, queue, issue_flags); >>> >>> If there's no request ready to read next, then no request will be >>> fetched and this will return. However, as I understand it, once the >>> uring is registered, userspace should only be interacting with the >>> uring via FUSE_IO_URING_CMD_COMMIT_AND_FETCH. However for the case >>> where no request was ready to read, it seems like userspace would have >>> nothing to commit when it wants to fetch the next request? >> >> We have >> >> FUSE_IO_URING_CMD_REGISTER >> FUSE_IO_URING_CMD_COMMIT_AND_FETCH >> >> >> After _CMD_REGISTER the corresponding ring-entry is ready to get fuse >> requests and waiting. After it gets a request assigned and handles it >> by fuse server the _COMMIT_AND_FETCH scheme applies. Did you possibly >> miss that _CMD_REGISTER will already have it waiting? >> > > Sorry for the late reply. 
After _CMD_REGISTER and _COMMIT_AND_FETCH, > it seems possible that there is no fuse request waiting until a later > time? This is the scenario I'm envisioning: > a) uring registers successfully and fetches request through _CMD_REGISTER > b) server replies to request and fetches new request through _COMMIT_AND_FETCH > c) server replies to request, tries to fetch new request but no > request is ready, through _COMMIT_AND_FETCH > > maybe I'm missing something in my reading of the code, but how will > the server then fetch the next request once the request is ready? It > will have to commit something in order to fetch it since there's only > _COMMIT_AND_FETCH which requires a commit, no? > The right name would be '_COMMIT_AND_FETCH_OR_WAIT'. Please see fuse_uring_next_fuse_req().

retry:
	spin_lock(&queue->lock);
	fuse_uring_ent_avail(ent, queue);   --> entry available
	has_next = fuse_uring_ent_assign_req(ent);
	spin_unlock(&queue->lock);

	if (has_next) {
		err = fuse_uring_send_next_to_ring(ent, issue_flags);
		if (err)
			goto retry;
	}

If there is no available request, the io-uring cmd stored in ent->cmd is just queued/available. Btw, Miklos added it to linux-next. Cheers, Bernd
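For readers unfamiliar with the userspace side under discussion: a COMMIT_AND_FETCH submission is just an IORING_OP_URING_CMD SQE against the fuse device fd, with the small struct fuse_uring_cmd_req placed in the SQE command area. A hedged sketch with liburing, assuming an SQE128/CQE32 ring and the uapi struct from this series - this is illustrative, not libfuse code:

#include <liburing.h>
#include <string.h>
#include <linux/fuse.h>	/* assumed: FUSE_IO_URING_CMD_* and
			   struct fuse_uring_cmd_req from this series */

static int example_queue_commit_and_fetch(struct io_uring *ring, int fuse_dev_fd,
					  unsigned int qid, uint64_t commit_id,
					  void *user_data)
{
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
	struct fuse_uring_cmd_req req = {
		.qid = qid,
		.commit_id = commit_id,
	};

	if (!sqe)
		return -EAGAIN;

	/* clear the full 128-byte SQE slot (ring set up with SQE128) */
	memset(sqe, 0, 2 * sizeof(*sqe));
	sqe->opcode = IORING_OP_URING_CMD;
	sqe->fd = fuse_dev_fd;
	sqe->cmd_op = FUSE_IO_URING_CMD_COMMIT_AND_FETCH;
	memcpy(sqe->cmd, &req, sizeof(req));
	io_uring_sqe_set_data(sqe, user_data);

	/* only queued here; submission happens in the server's event loop */
	return 0;
}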
On Tue, Jan 21, 2025 at 4:18 PM Bernd Schubert <bernd@bsbernd.com> wrote: > ... > >> > >>> > >>>> + > >>>> + err = fuse_ring_ent_set_commit(ring_ent); > >>>> + if (err != 0) { > >>>> + pr_info_ratelimited("qid=%d commit_id %llu state %d", > >>>> + queue->qid, commit_id, ring_ent->state); > >>>> + spin_unlock(&queue->lock); > >>>> + return err; > >>>> + } > >>>> + > >>>> + ring_ent->cmd = cmd; > >>>> + spin_unlock(&queue->lock); > >>>> + > >>>> + /* without the queue lock, as other locks are taken */ > >>>> + fuse_uring_commit(ring_ent, issue_flags); > >>>> + > >>>> + /* > >>>> + * Fetching the next request is absolutely required as queued > >>>> + * fuse requests would otherwise not get processed - committing > >>>> + * and fetching is done in one step vs legacy fuse, which has separated > >>>> + * read (fetch request) and write (commit result). > >>>> + */ > >>>> + fuse_uring_next_fuse_req(ring_ent, queue, issue_flags); > >>> > >>> If there's no request ready to read next, then no request will be > >>> fetched and this will return. However, as I understand it, once the > >>> uring is registered, userspace should only be interacting with the > >>> uring via FUSE_IO_URING_CMD_COMMIT_AND_FETCH. However for the case > >>> where no request was ready to read, it seems like userspace would have > >>> nothing to commit when it wants to fetch the next request? > >> > >> We have > >> > >> FUSE_IO_URING_CMD_REGISTER > >> FUSE_IO_URING_CMD_COMMIT_AND_FETCH > >> > >> > >> After _CMD_REGISTER the corresponding ring-entry is ready to get fuse > >> requests and waiting. After it gets a request assigned and handles it > >> by fuse server the _COMMIT_AND_FETCH scheme applies. Did you possibly > >> miss that _CMD_REGISTER will already have it waiting? > >> > > > > Sorry for the late reply. After _CMD_REGISTER and _COMMIT_AND_FETCH, > > it seems possible that there is no fuse request waiting until a later > > time? This is the scenario I'm envisioning: > > a) uring registers successfully and fetches request through _CMD_REGISTER > > b) server replies to request and fetches new request through _COMMIT_AND_FETCH > > c) server replies to request, tries to fetch new request but no > > request is ready, through _COMMIT_AND_FETCH > > > > maybe I'm missing something in my reading of the code, but how will > > the server then fetch the next request once the request is ready? It > > will have to commit something in order to fetch it since there's only > > _COMMIT_AND_FETCH which requires a commit, no? > > > > The right name would be '_COMMIT_AND_FETCH_OR_WAIT'. Please see > fuse_uring_next_fuse_req(). > > retry: > spin_lock(&queue->lock); > fuse_uring_ent_avail(ent, queue); --> entry available > has_next = fuse_uring_ent_assign_req(ent); > spin_unlock(&queue->lock); > > if (has_next) { > err = fuse_uring_send_next_to_ring(ent, issue_flags); > if (err) > goto retry; > } > > > If there is no available request, the io-uring cmd stored in ent->cmd is > just queued/available. Could you point me to where the wait happens? I think that's the part I'm missing. In my reading of the code, if there's no available request (eg queue->fuse_req_queue is empty), then I see that has_next will return false and fuse_uring_next_fuse_req() / fuse_uring_commit_fetch() returns without having fetched anything. Where does the "if there is no available request, the io-uring cmd is just queued/available" happen? > > Btw, Miklos added it to linux-next. > Awesome, can't wait to use it. Thanks, Joanne > > Cheers, > Bernd >
On 1/22/25 01:45, Joanne Koong wrote: > On Tue, Jan 21, 2025 at 4:18 PM Bernd Schubert <bernd@bsbernd.com> wrote: >> > ... >>>> >>>>> >>>>>> + >>>>>> + err = fuse_ring_ent_set_commit(ring_ent); >>>>>> + if (err != 0) { >>>>>> + pr_info_ratelimited("qid=%d commit_id %llu state %d", >>>>>> + queue->qid, commit_id, ring_ent->state); >>>>>> + spin_unlock(&queue->lock); >>>>>> + return err; >>>>>> + } >>>>>> + >>>>>> + ring_ent->cmd = cmd; >>>>>> + spin_unlock(&queue->lock); >>>>>> + >>>>>> + /* without the queue lock, as other locks are taken */ >>>>>> + fuse_uring_commit(ring_ent, issue_flags); >>>>>> + >>>>>> + /* >>>>>> + * Fetching the next request is absolutely required as queued >>>>>> + * fuse requests would otherwise not get processed - committing >>>>>> + * and fetching is done in one step vs legacy fuse, which has separated >>>>>> + * read (fetch request) and write (commit result). >>>>>> + */ >>>>>> + fuse_uring_next_fuse_req(ring_ent, queue, issue_flags); >>>>> >>>>> If there's no request ready to read next, then no request will be >>>>> fetched and this will return. However, as I understand it, once the >>>>> uring is registered, userspace should only be interacting with the >>>>> uring via FUSE_IO_URING_CMD_COMMIT_AND_FETCH. However for the case >>>>> where no request was ready to read, it seems like userspace would have >>>>> nothing to commit when it wants to fetch the next request? >>>> >>>> We have >>>> >>>> FUSE_IO_URING_CMD_REGISTER >>>> FUSE_IO_URING_CMD_COMMIT_AND_FETCH >>>> >>>> >>>> After _CMD_REGISTER the corresponding ring-entry is ready to get fuse >>>> requests and waiting. After it gets a request assigned and handles it >>>> by fuse server the _COMMIT_AND_FETCH scheme applies. Did you possibly >>>> miss that _CMD_REGISTER will already have it waiting? >>>> >>> >>> Sorry for the late reply. After _CMD_REGISTER and _COMMIT_AND_FETCH, >>> it seems possible that there is no fuse request waiting until a later >>> time? This is the scenario I'm envisioning: >>> a) uring registers successfully and fetches request through _CMD_REGISTER >>> b) server replies to request and fetches new request through _COMMIT_AND_FETCH >>> c) server replies to request, tries to fetch new request but no >>> request is ready, through _COMMIT_AND_FETCH >>> >>> maybe I'm missing something in my reading of the code, but how will >>> the server then fetch the next request once the request is ready? It >>> will have to commit something in order to fetch it since there's only >>> _COMMIT_AND_FETCH which requires a commit, no? >>> >> >> The right name would be '_COMMIT_AND_FETCH_OR_WAIT'. Please see >> fuse_uring_next_fuse_req(). >> >> retry: >> spin_lock(&queue->lock); >> fuse_uring_ent_avail(ent, queue); --> entry available >> has_next = fuse_uring_ent_assign_req(ent); >> spin_unlock(&queue->lock); >> >> if (has_next) { >> err = fuse_uring_send_next_to_ring(ent, issue_flags); >> if (err) >> goto retry; >> } >> >> >> If there is no available request, the io-uring cmd stored in ent->cmd is >> just queued/available. > > Could you point me to where the wait happens? I think that's the part > I'm missing. In my reading of the code, if there's no available > request (eg queue->fuse_req_queue is empty), then I see that has_next > will return false and fuse_uring_next_fuse_req() / > fuse_uring_commit_fetch() returns without having fetched anything. > Where does the "if there is no available request, the io-uring cmd is > just queued/available" happen? 
> You need to read it the other way around, without "has_next" the avail/queued entry is not removed from the list - it is available whenever a new request comes in. Looks like we either need refactoring or at least a comment. Thanks, Bernd
On 1/22/25 01:49, Bernd Schubert wrote: > > > On 1/22/25 01:45, Joanne Koong wrote: >> On Tue, Jan 21, 2025 at 4:18 PM Bernd Schubert <bernd@bsbernd.com> wrote: >>> >> ... >>>>> >>>>>> >>>>>>> + >>>>>>> + err = fuse_ring_ent_set_commit(ring_ent); >>>>>>> + if (err != 0) { >>>>>>> + pr_info_ratelimited("qid=%d commit_id %llu state %d", >>>>>>> + queue->qid, commit_id, ring_ent->state); >>>>>>> + spin_unlock(&queue->lock); >>>>>>> + return err; >>>>>>> + } >>>>>>> + >>>>>>> + ring_ent->cmd = cmd; >>>>>>> + spin_unlock(&queue->lock); >>>>>>> + >>>>>>> + /* without the queue lock, as other locks are taken */ >>>>>>> + fuse_uring_commit(ring_ent, issue_flags); >>>>>>> + >>>>>>> + /* >>>>>>> + * Fetching the next request is absolutely required as queued >>>>>>> + * fuse requests would otherwise not get processed - committing >>>>>>> + * and fetching is done in one step vs legacy fuse, which has separated >>>>>>> + * read (fetch request) and write (commit result). >>>>>>> + */ >>>>>>> + fuse_uring_next_fuse_req(ring_ent, queue, issue_flags); >>>>>> >>>>>> If there's no request ready to read next, then no request will be >>>>>> fetched and this will return. However, as I understand it, once the >>>>>> uring is registered, userspace should only be interacting with the >>>>>> uring via FUSE_IO_URING_CMD_COMMIT_AND_FETCH. However for the case >>>>>> where no request was ready to read, it seems like userspace would have >>>>>> nothing to commit when it wants to fetch the next request? >>>>> >>>>> We have >>>>> >>>>> FUSE_IO_URING_CMD_REGISTER >>>>> FUSE_IO_URING_CMD_COMMIT_AND_FETCH >>>>> >>>>> >>>>> After _CMD_REGISTER the corresponding ring-entry is ready to get fuse >>>>> requests and waiting. After it gets a request assigned and handles it >>>>> by fuse server the _COMMIT_AND_FETCH scheme applies. Did you possibly >>>>> miss that _CMD_REGISTER will already have it waiting? >>>>> >>>> >>>> Sorry for the late reply. After _CMD_REGISTER and _COMMIT_AND_FETCH, >>>> it seems possible that there is no fuse request waiting until a later >>>> time? This is the scenario I'm envisioning: >>>> a) uring registers successfully and fetches request through _CMD_REGISTER >>>> b) server replies to request and fetches new request through _COMMIT_AND_FETCH >>>> c) server replies to request, tries to fetch new request but no >>>> request is ready, through _COMMIT_AND_FETCH >>>> >>>> maybe I'm missing something in my reading of the code, but how will >>>> the server then fetch the next request once the request is ready? It >>>> will have to commit something in order to fetch it since there's only >>>> _COMMIT_AND_FETCH which requires a commit, no? >>>> >>> >>> The right name would be '_COMMIT_AND_FETCH_OR_WAIT'. Please see >>> fuse_uring_next_fuse_req(). >>> >>> retry: >>> spin_lock(&queue->lock); >>> fuse_uring_ent_avail(ent, queue); --> entry available >>> has_next = fuse_uring_ent_assign_req(ent); >>> spin_unlock(&queue->lock); >>> >>> if (has_next) { >>> err = fuse_uring_send_next_to_ring(ent, issue_flags); >>> if (err) >>> goto retry; >>> } >>> >>> >>> If there is no available request, the io-uring cmd stored in ent->cmd is >>> just queued/available. >> >> Could you point me to where the wait happens? I think that's the part >> I'm missing. In my reading of the code, if there's no available >> request (eg queue->fuse_req_queue is empty), then I see that has_next >> will return false and fuse_uring_next_fuse_req() / >> fuse_uring_commit_fetch() returns without having fetched anything. 
>> Where does the "if there is no available request, the io-uring cmd is >> just queued/available" happen? >> > > You need to read it the other way around, without "has_next" the > avail/queued entry is not removed from the list - it is available > whenever a new request comes in. Looks like we either need refactoring > or at least a comment. It also not the current task operation that waits - that happens in io-uring with 'io_uring_submit_and_wait' and wait-nr > 0. In fuse is is really just _not_ running io_uring_cmd_done() that make ent->cmd to be available. Does it help? Thanks, Bernd
On Tue, Jan 21, 2025 at 4:55 PM Bernd Schubert <bernd@bsbernd.com> wrote:
>
>
>
> On 1/22/25 01:49, Bernd Schubert wrote:
> >
> >
> > On 1/22/25 01:45, Joanne Koong wrote:
> >> On Tue, Jan 21, 2025 at 4:18 PM Bernd Schubert <bernd@bsbernd.com> wrote:
> >>>
> >> ...
> >>>>>
> >>>>>>
> >>>>>>> +
> >>>>>>> +       err = fuse_ring_ent_set_commit(ring_ent);
> >>>>>>> +       if (err != 0) {
> >>>>>>> +               pr_info_ratelimited("qid=%d commit_id %llu state %d",
> >>>>>>> +                                   queue->qid, commit_id, ring_ent->state);
> >>>>>>> +               spin_unlock(&queue->lock);
> >>>>>>> +               return err;
> >>>>>>> +       }
> >>>>>>> +
> >>>>>>> +       ring_ent->cmd = cmd;
> >>>>>>> +       spin_unlock(&queue->lock);
> >>>>>>> +
> >>>>>>> +       /* without the queue lock, as other locks are taken */
> >>>>>>> +       fuse_uring_commit(ring_ent, issue_flags);
> >>>>>>> +
> >>>>>>> +       /*
> >>>>>>> +        * Fetching the next request is absolutely required as queued
> >>>>>>> +        * fuse requests would otherwise not get processed - committing
> >>>>>>> +        * and fetching is done in one step vs legacy fuse, which has separated
> >>>>>>> +        * read (fetch request) and write (commit result).
> >>>>>>> +        */
> >>>>>>> +       fuse_uring_next_fuse_req(ring_ent, queue, issue_flags);
> >>>>>>
> >>>>>> If there's no request ready to read next, then no request will be
> >>>>>> fetched and this will return. However, as I understand it, once the
> >>>>>> uring is registered, userspace should only be interacting with the
> >>>>>> uring via FUSE_IO_URING_CMD_COMMIT_AND_FETCH. However for the case
> >>>>>> where no request was ready to read, it seems like userspace would have
> >>>>>> nothing to commit when it wants to fetch the next request?
> >>>>>
> >>>>> We have
> >>>>>
> >>>>> FUSE_IO_URING_CMD_REGISTER
> >>>>> FUSE_IO_URING_CMD_COMMIT_AND_FETCH
> >>>>>
> >>>>>
> >>>>> After _CMD_REGISTER the corresponding ring-entry is ready to get fuse
> >>>>> requests and waiting. After it gets a request assigned and handles it
> >>>>> by fuse server the _COMMIT_AND_FETCH scheme applies. Did you possibly
> >>>>> miss that _CMD_REGISTER will already have it waiting?
> >>>>>
> >>>>
> >>>> Sorry for the late reply. After _CMD_REGISTER and _COMMIT_AND_FETCH,
> >>>> it seems possible that there is no fuse request waiting until a later
> >>>> time? This is the scenario I'm envisioning:
> >>>> a) uring registers successfully and fetches request through _CMD_REGISTER
> >>>> b) server replies to request and fetches new request through _COMMIT_AND_FETCH
> >>>> c) server replies to request, tries to fetch new request but no
> >>>> request is ready, through _COMMIT_AND_FETCH
> >>>>
> >>>> maybe I'm missing something in my reading of the code, but how will
> >>>> the server then fetch the next request once the request is ready? It
> >>>> will have to commit something in order to fetch it since there's only
> >>>> _COMMIT_AND_FETCH which requires a commit, no?
> >>>>
> >>>
> >>> The right name would be '_COMMIT_AND_FETCH_OR_WAIT'. Please see
> >>> fuse_uring_next_fuse_req().
> >>>
> >>> retry:
> >>>         spin_lock(&queue->lock);
> >>>         fuse_uring_ent_avail(ent, queue);   --> entry available
> >>>         has_next = fuse_uring_ent_assign_req(ent);
> >>>         spin_unlock(&queue->lock);
> >>>
> >>>         if (has_next) {
> >>>                 err = fuse_uring_send_next_to_ring(ent, issue_flags);
> >>>                 if (err)
> >>>                         goto retry;
> >>>         }
> >>>
> >>>
> >>> If there is no available request, the io-uring cmd stored in ent->cmd is
> >>> just queued/available.
> >>
> >> Could you point me to where the wait happens? I think that's the part
> >> I'm missing. In my reading of the code, if there's no available
> >> request (e.g. queue->fuse_req_queue is empty), then I see that has_next
> >> will return false and fuse_uring_next_fuse_req() /
> >> fuse_uring_commit_fetch() returns without having fetched anything.
> >> Where does the "if there is no available request, the io-uring cmd is
> >> just queued/available" happen?
> >>
> >
> > You need to read it the other way around, without "has_next" the
> > avail/queued entry is not removed from the list - it is available
> > whenever a new request comes in. Looks like we either need refactoring
> > or at least a comment.
>
> It is also not the current task that waits - that happens in io-uring
> with 'io_uring_submit_and_wait' and wait_nr > 0. In fuse it is really
> just _not_ running io_uring_cmd_done() that makes ent->cmd available.

Oh I see, io_uring_cmd_done() handles it internally. It's the
.send_req = fuse_uring_queue_fuse_req -> fuse_uring_send_req_in_task()
-> io_uring_cmd_done() path that gets triggered and signals to userspace
that a fetch is ready when a new request becomes available later on.
It makes sense to me now, thanks.

>
> Does it help?
>
>
> Thanks,
> Bernd
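For completeness, this is roughly what such a commit-and-fetch SQE has to carry, based on what fuse_uring_commit_fetch() in this patch reads back via io_uring_sqe_cmd() (cmd_req->qid and cmd_req->commit_id). The struct layout and field widths below are only sketched; the real struct fuse_uring_cmd_req and the FUSE_IO_URING_CMD_* opcodes come from the uapi additions of this series, and the ring has to be created with IORING_SETUP_SQE128 so the command payload fits into the SQE.

#include <liburing.h>
#include <stdint.h>
#include <string.h>

/* Sketch only - field names follow what the kernel handler reads,
 * the real definition lives in the series' uapi header. */
struct fuse_uring_cmd_req_sketch {
        uint64_t commit_id;     /* unique id of the request being committed */
        uint16_t qid;           /* ring queue this entry belongs to */
};

void prepare_commit_and_fetch_sqe(struct io_uring_sqe *sqe, int fuse_dev_fd,
                                  unsigned int qid, uint64_t commit_id)
{
        struct fuse_uring_cmd_req_sketch req = {
                .commit_id = commit_id,
                .qid = qid,
        };

        memset(sqe, 0, sizeof(*sqe));
        sqe->opcode = IORING_OP_URING_CMD;
        sqe->fd = fuse_dev_fd;          /* fuse device fd of this session */
        sqe->cmd_op = FUSE_IO_URING_CMD_COMMIT_AND_FETCH;
        /* sqe->cmd is the command payload area of the (128-byte) SQE */
        memcpy(sqe->cmd, &req, sizeof(req));
}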
diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index b44ba4033615e01041313c040035b6da6af0ee17..f44e66a7ea577390da87e9ac7d118a9416898c28 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -26,6 +26,19 @@ bool fuse_uring_enabled(void)
 	return enable_uring;
 }
 
+static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, bool set_err,
+			       int error)
+{
+	struct fuse_req *req = ring_ent->fuse_req;
+
+	if (set_err)
+		req->out.h.error = error;
+
+	clear_bit(FR_SENT, &req->flags);
+	fuse_request_end(ring_ent->fuse_req);
+	ring_ent->fuse_req = NULL;
+}
+
 void fuse_uring_destruct(struct fuse_conn *fc)
 {
 	struct fuse_ring *ring = fc->ring;
@@ -41,8 +54,11 @@ void fuse_uring_destruct(struct fuse_conn *fc)
 			continue;
 
 		WARN_ON(!list_empty(&queue->ent_avail_queue));
+		WARN_ON(!list_empty(&queue->ent_w_req_queue));
 		WARN_ON(!list_empty(&queue->ent_commit_queue));
+		WARN_ON(!list_empty(&queue->ent_in_userspace));
 
+		kfree(queue->fpq.processing);
 		kfree(queue);
 		ring->queues[qid] = NULL;
 	}
@@ -101,20 +117,34 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
 {
 	struct fuse_conn *fc = ring->fc;
 	struct fuse_ring_queue *queue;
+	struct list_head *pq;
 
 	queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT);
 	if (!queue)
 		return NULL;
+	pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL);
+	if (!pq) {
+		kfree(queue);
+		return NULL;
+	}
+
 	queue->qid = qid;
 	queue->ring = ring;
 	spin_lock_init(&queue->lock);
 
 	INIT_LIST_HEAD(&queue->ent_avail_queue);
 	INIT_LIST_HEAD(&queue->ent_commit_queue);
+	INIT_LIST_HEAD(&queue->ent_w_req_queue);
+	INIT_LIST_HEAD(&queue->ent_in_userspace);
+	INIT_LIST_HEAD(&queue->fuse_req_queue);
+
+	queue->fpq.processing = pq;
+	fuse_pqueue_init(&queue->fpq);
 
 	spin_lock(&fc->lock);
 	if (ring->queues[qid]) {
 		spin_unlock(&fc->lock);
+		kfree(queue->fpq.processing);
 		kfree(queue);
 		return ring->queues[qid];
 	}
@@ -128,6 +158,214 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
 	return queue;
 }
 
+/*
+ * Checks for errors and stores it into the request
+ */
+static int fuse_uring_out_header_has_err(struct fuse_out_header *oh,
+					 struct fuse_req *req,
+					 struct fuse_conn *fc)
+{
+	int err;
+
+	err = -EINVAL;
+	if (oh->unique == 0) {
+		/* Not supportd through io-uring yet */
+		pr_warn_once("notify through fuse-io-uring not supported\n");
+		goto seterr;
+	}
+
+	err = -EINVAL;
+	if (oh->error <= -ERESTARTSYS || oh->error > 0)
+		goto seterr;
+
+	if (oh->error) {
+		err = oh->error;
+		goto err;
+	}
+
+	err = -ENOENT;
+	if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) {
+		pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n",
+				    req->in.h.unique,
+				    oh->unique & ~FUSE_INT_REQ_BIT);
+		goto seterr;
+	}
+
+	/*
+	 * Is it an interrupt reply ID?
+	 * XXX: Not supported through fuse-io-uring yet, it should not even
+	 *      find the request - should not happen.
+	 */
+	WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT);
+
+	return 0;
+
+seterr:
+	oh->error = err;
+err:
+	return err;
+}
+
+static int fuse_uring_copy_from_ring(struct fuse_ring *ring,
+				     struct fuse_req *req,
+				     struct fuse_ring_ent *ent)
+{
+	struct fuse_copy_state cs;
+	struct fuse_args *args = req->args;
+	struct iov_iter iter;
+	int err, res;
+	struct fuse_uring_ent_in_out ring_in_out;
+
+	res = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out,
+			     sizeof(ring_in_out));
+	if (res)
+		return -EFAULT;
+
+	err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz,
+			  &iter);
+	if (err)
+		return err;
+
+	fuse_copy_init(&cs, 0, &iter);
+	cs.is_uring = 1;
+	cs.req = req;
+
+	return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz);
+}
+
+ /*
+ * Copy data from the req to the ring buffer
+ */
+static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req,
+				   struct fuse_ring_ent *ent)
+{
+	struct fuse_copy_state cs;
+	struct fuse_args *args = req->args;
+	struct fuse_in_arg *in_args = args->in_args;
+	int num_args = args->in_numargs;
+	int err, res;
+	struct iov_iter iter;
+	struct fuse_uring_ent_in_out ent_in_out = {
+		.flags = 0,
+		.commit_id = ent->commit_id,
+	};
+
+	if (WARN_ON(ent_in_out.commit_id == 0))
+		return -EINVAL;
+
+	err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter);
+	if (err) {
+		pr_info_ratelimited("fuse: Import of user buffer failed\n");
+		return err;
+	}
+
+	fuse_copy_init(&cs, 1, &iter);
+	cs.is_uring = 1;
+	cs.req = req;
+
+	if (num_args > 0) {
+		/*
+		 * Expectation is that the first argument is the per op header.
+		 * Some op code have that as zero.
+		 */
+		if (args->in_args[0].size > 0) {
+			res = copy_to_user(&ent->headers->op_in, in_args->value,
+					   in_args->size);
+			err = res > 0 ? -EFAULT : res;
+			if (err) {
+				pr_info_ratelimited(
+					"Copying the header failed.\n");
+				return err;
+			}
+		}
+		in_args++;
+		num_args--;
+	}
+
+	/* copy the payload */
+	err = fuse_copy_args(&cs, num_args, args->in_pages,
+			     (struct fuse_arg *)in_args, 0);
+	if (err) {
+		pr_info_ratelimited("%s fuse_copy_args failed\n", __func__);
+		return err;
+	}
+
+	ent_in_out.payload_sz = cs.ring.copied_sz;
+	res = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out,
+			   sizeof(ent_in_out));
+	err = res > 0 ? -EFAULT : res;
+	if (err)
+		return err;
+
+	return 0;
+}
+
+static int
+fuse_uring_prepare_send(struct fuse_ring_ent *ring_ent)
+{
+	struct fuse_ring_queue *queue = ring_ent->queue;
+	struct fuse_ring *ring = queue->ring;
+	struct fuse_req *req = ring_ent->fuse_req;
+	int err, res;
+
+	err = -EIO;
+	if (WARN_ON(ring_ent->state != FRRS_FUSE_REQ)) {
+		pr_err("qid=%d ring-req=%p invalid state %d on send\n",
+		       queue->qid, ring_ent, ring_ent->state);
+		err = -EIO;
+		goto err;
+	}
+
+	/* copy the request */
+	err = fuse_uring_copy_to_ring(ring, req, ring_ent);
+	if (unlikely(err)) {
+		pr_info_ratelimited("Copy to ring failed: %d\n", err);
+		goto err;
+	}
+
+	/* copy fuse_in_header */
+	res = copy_to_user(&ring_ent->headers->in_out, &req->in.h,
+			   sizeof(req->in.h));
+	err = res > 0 ? -EFAULT : res;
+	if (err)
+		goto err;
+
+	set_bit(FR_SENT, &req->flags);
+	return 0;
+
+err:
+	fuse_uring_req_end(ring_ent, true, err);
+	return err;
+}
+
+/*
+ * Write data to the ring buffer and send the request to userspace,
+ * userspace will read it
+ * This is comparable with classical read(/dev/fuse)
+ */
+static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ring_ent,
+					unsigned int issue_flags)
+{
+	int err = 0;
+	struct fuse_ring_queue *queue = ring_ent->queue;
+
+	err = fuse_uring_prepare_send(ring_ent);
+	if (err)
+		goto err;
+
+	spin_lock(&queue->lock);
+	ring_ent->state = FRRS_USERSPACE;
+	list_move(&ring_ent->list, &queue->ent_in_userspace);
+	spin_unlock(&queue->lock);
+
+	io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags);
+	ring_ent->cmd = NULL;
+	return 0;
+
+err:
+	return err;
+}
+
 /*
  * Make a ring entry available for fuse_req assignment
  */
@@ -138,6 +376,210 @@ static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent,
 	ring_ent->state = FRRS_AVAILABLE;
 }
 
+/* Used to find the request on SQE commit */
+static void fuse_uring_add_to_pq(struct fuse_ring_ent *ring_ent,
+				 struct fuse_req *req)
+{
+	struct fuse_ring_queue *queue = ring_ent->queue;
+	struct fuse_pqueue *fpq = &queue->fpq;
+	unsigned int hash;
+
+	/* commit_id is the unique id of the request */
+	ring_ent->commit_id = req->in.h.unique;
+
+	req->ring_entry = ring_ent;
+	hash = fuse_req_hash(ring_ent->commit_id);
+	list_move_tail(&req->list, &fpq->processing[hash]);
+}
+
+/*
+ * Assign a fuse queue entry to the given entry
+ */
+static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ring_ent,
+					   struct fuse_req *req)
+{
+	struct fuse_ring_queue *queue = ring_ent->queue;
+
+	lockdep_assert_held(&queue->lock);
+
+	if (WARN_ON_ONCE(ring_ent->state != FRRS_AVAILABLE &&
+			 ring_ent->state != FRRS_COMMIT)) {
+		pr_warn("%s qid=%d state=%d\n", __func__, ring_ent->queue->qid,
+			ring_ent->state);
+	}
+	list_del_init(&req->list);
+	clear_bit(FR_PENDING, &req->flags);
+	ring_ent->fuse_req = req;
+	ring_ent->state = FRRS_FUSE_REQ;
+	list_move(&ring_ent->list, &queue->ent_w_req_queue);
+	fuse_uring_add_to_pq(ring_ent, req);
+}
+
+/*
+ * Release the ring entry and fetch the next fuse request if available
+ *
+ * @return true if a new request has been fetched
+ */
+static bool fuse_uring_ent_assign_req(struct fuse_ring_ent *ring_ent)
+	__must_hold(&queue->lock)
+{
+	struct fuse_req *req;
+	struct fuse_ring_queue *queue = ring_ent->queue;
+	struct list_head *req_queue = &queue->fuse_req_queue;
+
+	lockdep_assert_held(&queue->lock);
+
+	/* get and assign the next entry while it is still holding the lock */
+	req = list_first_entry_or_null(req_queue, struct fuse_req, list);
+	if (req) {
+		fuse_uring_add_req_to_ring_ent(ring_ent, req);
+		return true;
+	}
+
+	return false;
+}
+
+/*
+ * Read data from the ring buffer, which user space has written to
+ * This is comparible with handling of classical write(/dev/fuse).
+ * Also make the ring request available again for new fuse requests.
+ */
+static void fuse_uring_commit(struct fuse_ring_ent *ring_ent,
+			      unsigned int issue_flags)
+{
+	struct fuse_ring *ring = ring_ent->queue->ring;
+	struct fuse_conn *fc = ring->fc;
+	struct fuse_req *req = ring_ent->fuse_req;
+	ssize_t err = 0;
+	bool set_err = false;
+
+	err = copy_from_user(&req->out.h, &ring_ent->headers->in_out,
+			     sizeof(req->out.h));
+	if (err) {
+		req->out.h.error = err;
+		goto out;
+	}
+
+	err = fuse_uring_out_header_has_err(&req->out.h, req, fc);
+	if (err) {
+		/* req->out.h.error already set */
+		goto out;
+	}
+
+	err = fuse_uring_copy_from_ring(ring, req, ring_ent);
+	if (err)
+		set_err = true;
+
+out:
+	fuse_uring_req_end(ring_ent, set_err, err);
+}
+
+/*
+ * Get the next fuse req and send it
+ */
+static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent,
+				     struct fuse_ring_queue *queue,
+				     unsigned int issue_flags)
+{
+	int err;
+	bool has_next;
+
+retry:
+	spin_lock(&queue->lock);
+	fuse_uring_ent_avail(ring_ent, queue);
+	has_next = fuse_uring_ent_assign_req(ring_ent);
+	spin_unlock(&queue->lock);
+
+	if (has_next) {
+		err = fuse_uring_send_next_to_ring(ring_ent, issue_flags);
+		if (err)
+			goto retry;
+	}
+}
+
+static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent)
+{
+	struct fuse_ring_queue *queue = ent->queue;
+
+	lockdep_assert_held(&queue->lock);
+
+	if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE))
+		return -EIO;
+
+	ent->state = FRRS_COMMIT;
+	list_move(&ent->list, &queue->ent_commit_queue);
+
+	return 0;
+}
+
+/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */
+static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
+				   struct fuse_conn *fc)
+{
+	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
+	struct fuse_ring_ent *ring_ent;
+	int err;
+	struct fuse_ring *ring = fc->ring;
+	struct fuse_ring_queue *queue;
+	uint64_t commit_id = READ_ONCE(cmd_req->commit_id);
+	unsigned int qid = READ_ONCE(cmd_req->qid);
+	struct fuse_pqueue *fpq;
+	struct fuse_req *req;
+
+	err = -ENOTCONN;
+	if (!ring)
+		return err;
+
+	if (qid >= ring->nr_queues)
+		return -EINVAL;
+
+	queue = ring->queues[qid];
+	if (!queue)
+		return err;
+	fpq = &queue->fpq;
+
+	spin_lock(&queue->lock);
+	/* Find a request based on the unique ID of the fuse request
+	 * This should get revised, as it needs a hash calculation and list
+	 * search. And full struct fuse_pqueue is needed (memory overhead).
+	 * As well as the link from req to ring_ent.
+	 */
+	req = fuse_request_find(fpq, commit_id);
+	err = -ENOENT;
+	if (!req) {
+		pr_info("qid=%d commit_id %llu not found\n", queue->qid,
+			commit_id);
+		spin_unlock(&queue->lock);
+		return err;
+	}
+	list_del_init(&req->list);
+	ring_ent = req->ring_entry;
+	req->ring_entry = NULL;
+
+	err = fuse_ring_ent_set_commit(ring_ent);
+	if (err != 0) {
+		pr_info_ratelimited("qid=%d commit_id %llu state %d",
+				    queue->qid, commit_id, ring_ent->state);
+		spin_unlock(&queue->lock);
+		return err;
+	}
+
+	ring_ent->cmd = cmd;
+	spin_unlock(&queue->lock);
+
+	/* without the queue lock, as other locks are taken */
+	fuse_uring_commit(ring_ent, issue_flags);
+
+	/*
+	 * Fetching the next request is absolutely required as queued
+	 * fuse requests would otherwise not get processed - committing
+	 * and fetching is done in one step vs legacy fuse, which has separated
+	 * read (fetch request) and write (commit result).
+	 */
+	fuse_uring_next_fuse_req(ring_ent, queue, issue_flags);
+	return 0;
+}
+
 /*
  * fuse_uring_req_fetch command handling
  */
@@ -325,6 +767,14 @@ int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd,
 			return err;
 		}
 		break;
+	case FUSE_IO_URING_CMD_COMMIT_AND_FETCH:
+		err = fuse_uring_commit_fetch(cmd, issue_flags, fc);
+		if (err) {
+			pr_info_once("FUSE_IO_URING_COMMIT_AND_FETCH failed err=%d\n",
+				     err);
+			return err;
+		}
+		break;
 	default:
 		return -EINVAL;
 	}
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index 4e46dd65196d26dabc62dada33b17de9aa511c08..80f1c62d4df7f0ca77c4d5179068df6ffdbf7d85 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -20,6 +20,9 @@ enum fuse_ring_req_state {
 	/* The ring entry is waiting for new fuse requests */
 	FRRS_AVAILABLE,
 
+	/* The ring entry got assigned a fuse req */
+	FRRS_FUSE_REQ,
+
 	/* The ring entry is in or on the way to user space */
 	FRRS_USERSPACE,
 };
@@ -70,7 +73,16 @@ struct fuse_ring_queue {
 	 * entries in the process of being committed or in the process
 	 * to be sent to userspace
 	 */
+	struct list_head ent_w_req_queue;
 	struct list_head ent_commit_queue;
+
+	/* entries in userspace */
+	struct list_head ent_in_userspace;
+
+	/* fuse requests waiting for an entry slot */
+	struct list_head fuse_req_queue;
+
+	struct fuse_pqueue fpq;
 };
 
 /**
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index e545b0864dd51e82df61cc39bdf65d3d36a418dc..e71556894bc25808581424ec7bdd4afeebc81f15 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -438,6 +438,10 @@ struct fuse_req {
 
 	/** fuse_mount this request belongs to */
 	struct fuse_mount *fm;
+
+#ifdef CONFIG_FUSE_IO_URING
+	void *ring_entry;
+#endif
 };
 
 struct fuse_iqueue;
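A purely descriptive summary of the entry lifecycle introduced by this patch (derived only from the hunks above; state and list names as in dev_uring_i.h):

/*
 * Lifecycle of one ring entry with FUSE_IO_URING_CMD_COMMIT_AND_FETCH:
 *
 *   FRRS_AVAILABLE   on queue->ent_avail_queue    (fuse_uring_ent_avail)
 *         |  a fuse request gets assigned
 *         v
 *   FRRS_FUSE_REQ    on queue->ent_w_req_queue    (fuse_uring_add_req_to_ring_ent)
 *         |  request copied to the mmaped buffer, io_uring_cmd_done()
 *         v
 *   FRRS_USERSPACE   on queue->ent_in_userspace   (fuse_uring_send_next_to_ring)
 *         |  server sends COMMIT_AND_FETCH with the entry's commit_id
 *         v
 *   FRRS_COMMIT      on queue->ent_commit_queue   (fuse_ring_ent_set_commit)
 *         |  reply copied back, fuse_uring_req_end()
 *         v
 *   back to FRRS_AVAILABLE via fuse_uring_next_fuse_req()
 */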
This adds support for fuse request completion through ring SQEs
(FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing
the ring entry it becomes available for new fuse requests.
Handling of requests through the ring (SQE/CQE handling)
is complete now.

Fuse request data are copied through the mmaped ring buffer,
there is no support for any zero copy yet.

Signed-off-by: Bernd Schubert <bschubert@ddn.com>
---
 fs/fuse/dev_uring.c   | 450 ++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/fuse/dev_uring_i.h |  12 ++
 fs/fuse/fuse_i.h      |   4 +
 3 files changed, 466 insertions(+)
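To illustrate the "copied through the mmaped ring buffer" part: from the accesses in fuse_uring_copy_to_ring()/fuse_uring_copy_from_ring() above (ent->headers->in_out, ->op_in, ->ring_ent_in_out and ent->payload), the userspace view of one entry's shared buffer looks roughly like the sketch below. Names and sizes are placeholders only; the real structs and the mmap setup are defined in earlier patches of the series.

#include <stdint.h>

/* Sketch of struct fuse_uring_ent_in_out as used by this patch. */
struct fuse_uring_ent_in_out_sketch {
        uint64_t flags;
        uint64_t commit_id;     /* echoed back in the next COMMIT_AND_FETCH SQE */
        uint32_t payload_sz;    /* valid bytes in the payload area */
};

/* Sketch of the per-entry header area (ent->headers). */
struct fuse_uring_req_header_sketch {
        char in_out[128];       /* struct fuse_in_header / fuse_out_header */
        char op_in[128];        /* per-opcode header, e.g. struct fuse_write_in */
        struct fuse_uring_ent_in_out_sketch ring_ent_in_out;
};

/* ...followed by a separate payload area (ent->payload) of
 * ring->max_payload_sz bytes that carries the argument data. */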