From 24fe962c86f55347385933a1b06ca71b60854690 Mon Sep 17 00:00:00 2001
From: Bernd Schubert
Date: Mon, 20 Jan 2025 02:28:59 +0100
Subject: fuse: {io-uring} Handle SQEs - register commands

This adds basic support for ring SQEs (with opcode=IORING_OP_URING_CMD).
For now only FUSE_IO_URING_CMD_REGISTER is handled to register queue
entries.

Signed-off-by: Bernd Schubert
Reviewed-by: Pavel Begunkov # io_uring
Reviewed-by: Luis Henriques
Signed-off-by: Miklos Szeredi
---
 fs/fuse/dev_uring.c | 326 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 326 insertions(+)
 create mode 100644 fs/fuse/dev_uring.c

(limited to 'fs/fuse/dev_uring.c')

diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
new file mode 100644
index 000000000000..42092a234570
--- /dev/null
+++ b/fs/fuse/dev_uring.c
@@ -0,0 +1,326 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * FUSE: Filesystem in Userspace
+ * Copyright (c) 2023-2024 DataDirect Networks.
+ */
+
+#include "fuse_i.h"
+#include "dev_uring_i.h"
+#include "fuse_dev_i.h"
+
+#include <linux/fs.h>
+#include <linux/io_uring/cmd.h>
+
+static bool __read_mostly enable_uring;
+module_param(enable_uring, bool, 0644);
+MODULE_PARM_DESC(enable_uring,
+		 "Enable userspace communication through io-uring");
+
+#define FUSE_URING_IOV_SEGS 2 /* header and payload */
+
+
+bool fuse_uring_enabled(void)
+{
+	return enable_uring;
+}
+
+void fuse_uring_destruct(struct fuse_conn *fc)
+{
+	struct fuse_ring *ring = fc->ring;
+	int qid;
+
+	if (!ring)
+		return;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = ring->queues[qid];
+
+		if (!queue)
+			continue;
+
+		WARN_ON(!list_empty(&queue->ent_avail_queue));
+		WARN_ON(!list_empty(&queue->ent_commit_queue));
+
+		kfree(queue);
+		ring->queues[qid] = NULL;
+	}
+
+	kfree(ring->queues);
+	kfree(ring);
+	fc->ring = NULL;
+}
+
+/*
+ * Basic ring setup for this connection based on the provided configuration
+ */
+static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc)
+{
+	struct fuse_ring *ring;
+	size_t nr_queues = num_possible_cpus();
+	struct fuse_ring *res = NULL;
+	size_t max_payload_size;
+
+	ring = kzalloc(sizeof(*fc->ring), GFP_KERNEL_ACCOUNT);
+	if (!ring)
+		return NULL;
+
+	ring->queues = kcalloc(nr_queues, sizeof(struct fuse_ring_queue *),
+			       GFP_KERNEL_ACCOUNT);
+	if (!ring->queues)
+		goto out_err;
+
+	max_payload_size = max(FUSE_MIN_READ_BUFFER, fc->max_write);
+	max_payload_size = max(max_payload_size, fc->max_pages * PAGE_SIZE);
+
+	spin_lock(&fc->lock);
+	if (fc->ring) {
+		/* race, another thread created the ring in the meantime */
+		spin_unlock(&fc->lock);
+		res = fc->ring;
+		goto out_err;
+	}
+
+	fc->ring = ring;
+	ring->nr_queues = nr_queues;
+	ring->fc = fc;
+	ring->max_payload_sz = max_payload_size;
+
+	spin_unlock(&fc->lock);
+	return ring;
+
+out_err:
+	kfree(ring->queues);
+	kfree(ring);
+	return res;
+}
+
+static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
+						       int qid)
+{
+	struct fuse_conn *fc = ring->fc;
+	struct fuse_ring_queue *queue;
+
+	queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT);
+	if (!queue)
+		return NULL;
+	queue->qid = qid;
+	queue->ring = ring;
+	spin_lock_init(&queue->lock);
+
+	INIT_LIST_HEAD(&queue->ent_avail_queue);
+	INIT_LIST_HEAD(&queue->ent_commit_queue);
+
+	spin_lock(&fc->lock);
+	if (ring->queues[qid]) {
+		spin_unlock(&fc->lock);
+		kfree(queue);
+		return ring->queues[qid];
+	}
+
+	/*
+	 * write_once and lock as the caller mostly doesn't take the lock at all
+	 */
+	WRITE_ONCE(ring->queues[qid], queue);
+	spin_unlock(&fc->lock);
+
+	return queue;
+}
+
+/*
+ * Make a ring entry available for fuse_req assignment
+ */
+static void fuse_uring_ent_avail(struct fuse_ring_ent *ent,
+				 struct fuse_ring_queue *queue)
+{
+	WARN_ON_ONCE(!ent->cmd);
+	list_move(&ent->list, &queue->ent_avail_queue);
+	ent->state = FRRS_AVAILABLE;
+}
+
+/*
+ * fuse_uring_req_fetch command handling
+ */
+static void fuse_uring_do_register(struct fuse_ring_ent *ent,
+				   struct io_uring_cmd *cmd,
+				   unsigned int issue_flags)
+{
+	struct fuse_ring_queue *queue = ent->queue;
+
+	spin_lock(&queue->lock);
+	ent->cmd = cmd;
+	fuse_uring_ent_avail(ent, queue);
+	spin_unlock(&queue->lock);
+}
+
+/*
+ * sqe->addr is a ptr to an iovec array, iov[0] has the headers, iov[1]
+ * the payload
+ */
+static int fuse_uring_get_iovec_from_sqe(const struct io_uring_sqe *sqe,
+					 struct iovec iov[FUSE_URING_IOV_SEGS])
+{
+	struct iovec __user *uiov = u64_to_user_ptr(READ_ONCE(sqe->addr));
+	struct iov_iter iter;
+	ssize_t ret;
+
+	if (sqe->len != FUSE_URING_IOV_SEGS)
+		return -EINVAL;
+
+	/*
+	 * Direction for buffer access will actually be READ and WRITE;
+	 * using WRITE for the import includes READ access as well.
+	 */
+	ret = import_iovec(WRITE, uiov, FUSE_URING_IOV_SEGS,
+			   FUSE_URING_IOV_SEGS, &iov, &iter);
+	if (ret < 0)
+		return ret;
+
+	return 0;
+}
+
+static struct fuse_ring_ent *
+fuse_uring_create_ring_ent(struct io_uring_cmd *cmd,
+			   struct fuse_ring_queue *queue)
+{
+	struct fuse_ring *ring = queue->ring;
+	struct fuse_ring_ent *ent;
+	size_t payload_size;
+	struct iovec iov[FUSE_URING_IOV_SEGS];
+	int err;
+
+	err = fuse_uring_get_iovec_from_sqe(cmd->sqe, iov);
+	if (err) {
+		pr_info_ratelimited("Failed to get iovec from sqe, err=%d\n",
+				    err);
+		return ERR_PTR(err);
+	}
+
+	err = -EINVAL;
+	if (iov[0].iov_len < sizeof(struct fuse_uring_req_header)) {
+		pr_info_ratelimited("Invalid header len %zu\n", iov[0].iov_len);
+		return ERR_PTR(err);
+	}
+
+	payload_size = iov[1].iov_len;
+	if (payload_size < ring->max_payload_sz) {
+		pr_info_ratelimited("Invalid req payload len %zu\n",
+				    payload_size);
+		return ERR_PTR(err);
+	}
+
+	err = -ENOMEM;
+	ent = kzalloc(sizeof(*ent), GFP_KERNEL_ACCOUNT);
+	if (!ent)
+		return ERR_PTR(err);
+
+	INIT_LIST_HEAD(&ent->list);
+
+	ent->queue = queue;
+	ent->headers = iov[0].iov_base;
+	ent->payload = iov[1].iov_base;
+
+	return ent;
+}
+
+/*
+ * Register the header and payload buffers with the kernel and put the
+ * entry onto the queue as "ready to get fuse requests"
+ */
+static int fuse_uring_register(struct io_uring_cmd *cmd,
+			       unsigned int issue_flags, struct fuse_conn *fc)
+{
+	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
+	struct fuse_ring *ring = fc->ring;
+	struct fuse_ring_queue *queue;
+	struct fuse_ring_ent *ent;
+	int err;
+	unsigned int qid = READ_ONCE(cmd_req->qid);
+
+	err = -ENOMEM;
+	if (!ring) {
+		ring = fuse_uring_create(fc);
+		if (!ring)
+			return err;
+	}
+
+	if (qid >= ring->nr_queues) {
+		pr_info_ratelimited("fuse: Invalid ring qid %u\n", qid);
+		return -EINVAL;
+	}
+
+	queue = ring->queues[qid];
+	if (!queue) {
+		queue = fuse_uring_create_queue(ring, qid);
+		if (!queue)
+			return err;
+	}
+
+	/*
+	 * The queue created above does not need to be destructed in
+	 * case of entry errors below; that will be done at ring destruction time.
+	 */
+
+	ent = fuse_uring_create_ring_ent(cmd, queue);
+	if (IS_ERR(ent))
+		return PTR_ERR(ent);
+
+	fuse_uring_do_register(ent, cmd, issue_flags);
+
+	return 0;
+}
+
+/*
+ * Entry function from io_uring to handle the given passthrough command
+ * (op code IORING_OP_URING_CMD)
+ */
+int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd,
+				  unsigned int issue_flags)
+{
+	struct fuse_dev *fud;
+	struct fuse_conn *fc;
+	u32 cmd_op = cmd->cmd_op;
+	int err;
+
+	if (!enable_uring) {
+		pr_info_ratelimited("fuse-io-uring is disabled\n");
+		return -EOPNOTSUPP;
+	}
+
+	/* This extra SQE size holds struct fuse_uring_cmd_req */
+	if (!(issue_flags & IO_URING_F_SQE128))
+		return -EINVAL;
+
+	fud = fuse_get_dev(cmd->file);
+	if (!fud) {
+		pr_info_ratelimited("No fuse device found\n");
+		return -ENOTCONN;
+	}
+	fc = fud->fc;
+
+	if (fc->aborted)
+		return -ECONNABORTED;
+	if (!fc->connected)
+		return -ENOTCONN;
+
+	/*
+	 * fuse_uring_register() needs the ring to be initialized;
+	 * we need to know the max payload size
+	 */
+	if (!fc->initialized)
+		return -EAGAIN;
+
+	switch (cmd_op) {
+	case FUSE_IO_URING_CMD_REGISTER:
+		err = fuse_uring_register(cmd, issue_flags, fc);
+		if (err) {
+			pr_info_once("FUSE_IO_URING_CMD_REGISTER failed err=%d\n",
+				     err);
+			return err;
+		}
+		break;
+	default:
+		return -EINVAL;
+	}
+
+	return -EIOCBQUEUED;
+}
-- cgit v1.2.3

From c090c8abae4b6b77a1bee116aa6c385456ebef96 Mon Sep 17 00:00:00 2001
From: Bernd Schubert
Date: Mon, 20 Jan 2025 02:29:03 +0100
Subject: fuse: Add io-uring sqe commit and fetch support

This adds support for fuse request completion through ring SQEs
(FUSE_IO_URING_CMD_COMMIT_AND_FETCH handling). After committing the
ring entry it becomes available for new fuse requests. Handling of
requests through the ring (SQE/CQE handling) is complete now. Fuse
request data are copied through the mmapped ring buffer; there is no
support for any zero copy yet.
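For illustration, the userspace side of the registration step looks roughly as follows, assuming liburing, a ring created with IORING_SETUP_SQE128, and the uapi additions of this series in <linux/fuse.h> (struct fuse_uring_cmd_req, struct fuse_uring_req_header, FUSE_IO_URING_CMD_REGISTER); the helper name and the error handling are illustrative sketches, not part of this series:

#include <liburing.h>
#include <linux/fuse.h>
#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <sys/uio.h>

/* Register one ring entry on queue 'qid' of a /dev/fuse clone fd. */
static int register_ring_ent(struct io_uring *ring, int fuse_dev_fd,
			     struct iovec iov[2], unsigned int qid)
{
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
	struct fuse_uring_cmd_req *req;

	if (!sqe)
		return -ENOMEM;

	/* iov[0]: a struct fuse_uring_req_header, iov[1]: a payload buffer
	 * of at least max_payload_sz bytes; both must stay valid for as
	 * long as the entry is registered */
	sqe->opcode = IORING_OP_URING_CMD;
	sqe->fd = fuse_dev_fd;
	sqe->cmd_op = FUSE_IO_URING_CMD_REGISTER;
	sqe->addr = (uintptr_t)iov;	/* sqe->addr points to the iovec array */
	sqe->len = 2;			/* FUSE_URING_IOV_SEGS */

	/* struct fuse_uring_cmd_req travels in the SQE128 command area */
	req = (struct fuse_uring_cmd_req *)sqe->cmd;
	memset(req, 0, sizeof(*req));
	req->qid = qid;			/* one queue per possible CPU */

	return io_uring_submit(ring);
}

Note that a successful register SQE does not complete immediately; the kernel parks it (-EIOCBQUEUED internally) and the CQE only arrives once a fuse request has been assigned to the entry.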
Signed-off-by: Bernd Schubert Reviewed-by: Pavel Begunkov # io_uring Reviewed-by: Luis Henriques Signed-off-by: Miklos Szeredi --- fs/fuse/dev_uring.c | 441 ++++++++++++++++++++++++++++++++++++++++++++++++++ fs/fuse/dev_uring_i.h | 12 ++ fs/fuse/fuse_i.h | 4 + 3 files changed, 457 insertions(+) (limited to 'fs/fuse/dev_uring.c') diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c index 42092a234570..1030c1720990 100644 --- a/fs/fuse/dev_uring.c +++ b/fs/fuse/dev_uring.c @@ -24,6 +24,17 @@ bool fuse_uring_enabled(void) return enable_uring; } +static void fuse_uring_req_end(struct fuse_ring_ent *ent, struct fuse_req *req, + int error) +{ + ent->fuse_req = NULL; + if (error) + req->out.h.error = error; + + clear_bit(FR_SENT, &req->flags); + fuse_request_end(req); +} + void fuse_uring_destruct(struct fuse_conn *fc) { struct fuse_ring *ring = fc->ring; @@ -39,8 +50,11 @@ void fuse_uring_destruct(struct fuse_conn *fc) continue; WARN_ON(!list_empty(&queue->ent_avail_queue)); + WARN_ON(!list_empty(&queue->ent_w_req_queue)); WARN_ON(!list_empty(&queue->ent_commit_queue)); + WARN_ON(!list_empty(&queue->ent_in_userspace)); + kfree(queue->fpq.processing); kfree(queue); ring->queues[qid] = NULL; } @@ -99,20 +113,34 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, { struct fuse_conn *fc = ring->fc; struct fuse_ring_queue *queue; + struct list_head *pq; queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT); if (!queue) return NULL; + pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL); + if (!pq) { + kfree(queue); + return NULL; + } + queue->qid = qid; queue->ring = ring; spin_lock_init(&queue->lock); INIT_LIST_HEAD(&queue->ent_avail_queue); INIT_LIST_HEAD(&queue->ent_commit_queue); + INIT_LIST_HEAD(&queue->ent_w_req_queue); + INIT_LIST_HEAD(&queue->ent_in_userspace); + INIT_LIST_HEAD(&queue->fuse_req_queue); + + queue->fpq.processing = pq; + fuse_pqueue_init(&queue->fpq); spin_lock(&fc->lock); if (ring->queues[qid]) { spin_unlock(&fc->lock); + kfree(queue->fpq.processing); kfree(queue); return ring->queues[qid]; } @@ -126,6 +154,214 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, return queue; } +/* + * Checks for errors and stores it into the request + */ +static int fuse_uring_out_header_has_err(struct fuse_out_header *oh, + struct fuse_req *req, + struct fuse_conn *fc) +{ + int err; + + err = -EINVAL; + if (oh->unique == 0) { + /* Not supported through io-uring yet */ + pr_warn_once("notify through fuse-io-uring not supported\n"); + goto err; + } + + if (oh->error <= -ERESTARTSYS || oh->error > 0) + goto err; + + if (oh->error) { + err = oh->error; + goto err; + } + + err = -ENOENT; + if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) { + pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n", + req->in.h.unique, + oh->unique & ~FUSE_INT_REQ_BIT); + goto err; + } + + /* + * Is it an interrupt reply ID? + * XXX: Not supported through fuse-io-uring yet, it should not even + * find the request - should not happen. 
+ */ + WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT); + + err = 0; +err: + return err; +} + +static int fuse_uring_copy_from_ring(struct fuse_ring *ring, + struct fuse_req *req, + struct fuse_ring_ent *ent) +{ + struct fuse_copy_state cs; + struct fuse_args *args = req->args; + struct iov_iter iter; + int err; + struct fuse_uring_ent_in_out ring_in_out; + + err = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out, + sizeof(ring_in_out)); + if (err) + return -EFAULT; + + err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz, + &iter); + if (err) + return err; + + fuse_copy_init(&cs, 0, &iter); + cs.is_uring = 1; + cs.req = req; + + return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz); +} + + /* + * Copy data from the req to the ring buffer + */ +static int fuse_uring_args_to_ring(struct fuse_ring *ring, struct fuse_req *req, + struct fuse_ring_ent *ent) +{ + struct fuse_copy_state cs; + struct fuse_args *args = req->args; + struct fuse_in_arg *in_args = args->in_args; + int num_args = args->in_numargs; + int err; + struct iov_iter iter; + struct fuse_uring_ent_in_out ent_in_out = { + .flags = 0, + .commit_id = req->in.h.unique, + }; + + err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter); + if (err) { + pr_info_ratelimited("fuse: Import of user buffer failed\n"); + return err; + } + + fuse_copy_init(&cs, 1, &iter); + cs.is_uring = 1; + cs.req = req; + + if (num_args > 0) { + /* + * Expectation is that the first argument is the per op header. + * Some op code have that as zero size. + */ + if (args->in_args[0].size > 0) { + err = copy_to_user(&ent->headers->op_in, in_args->value, + in_args->size); + if (err) { + pr_info_ratelimited( + "Copying the header failed.\n"); + return -EFAULT; + } + } + in_args++; + num_args--; + } + + /* copy the payload */ + err = fuse_copy_args(&cs, num_args, args->in_pages, + (struct fuse_arg *)in_args, 0); + if (err) { + pr_info_ratelimited("%s fuse_copy_args failed\n", __func__); + return err; + } + + ent_in_out.payload_sz = cs.ring.copied_sz; + err = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out, + sizeof(ent_in_out)); + return err ? 
-EFAULT : 0; +} + +static int fuse_uring_copy_to_ring(struct fuse_ring_ent *ent, + struct fuse_req *req) +{ + struct fuse_ring_queue *queue = ent->queue; + struct fuse_ring *ring = queue->ring; + int err; + + err = -EIO; + if (WARN_ON(ent->state != FRRS_FUSE_REQ)) { + pr_err("qid=%d ring-req=%p invalid state %d on send\n", + queue->qid, ent, ent->state); + return err; + } + + err = -EINVAL; + if (WARN_ON(req->in.h.unique == 0)) + return err; + + /* copy the request */ + err = fuse_uring_args_to_ring(ring, req, ent); + if (unlikely(err)) { + pr_info_ratelimited("Copy to ring failed: %d\n", err); + return err; + } + + /* copy fuse_in_header */ + err = copy_to_user(&ent->headers->in_out, &req->in.h, + sizeof(req->in.h)); + if (err) { + err = -EFAULT; + return err; + } + + return 0; +} + +static int fuse_uring_prepare_send(struct fuse_ring_ent *ent, + struct fuse_req *req) +{ + int err; + + err = fuse_uring_copy_to_ring(ent, req); + if (!err) + set_bit(FR_SENT, &req->flags); + else + fuse_uring_req_end(ent, req, err); + + return err; +} + +/* + * Write data to the ring buffer and send the request to userspace, + * userspace will read it + * This is comparable with classical read(/dev/fuse) + */ +static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ent, + struct fuse_req *req, + unsigned int issue_flags) +{ + struct fuse_ring_queue *queue = ent->queue; + int err; + struct io_uring_cmd *cmd; + + err = fuse_uring_prepare_send(ent, req); + if (err) + return err; + + spin_lock(&queue->lock); + cmd = ent->cmd; + ent->cmd = NULL; + ent->state = FRRS_USERSPACE; + list_move(&ent->list, &queue->ent_in_userspace); + spin_unlock(&queue->lock); + + io_uring_cmd_done(cmd, 0, 0, issue_flags); + return 0; +} + /* * Make a ring entry available for fuse_req assignment */ @@ -137,6 +373,203 @@ static void fuse_uring_ent_avail(struct fuse_ring_ent *ent, ent->state = FRRS_AVAILABLE; } +/* Used to find the request on SQE commit */ +static void fuse_uring_add_to_pq(struct fuse_ring_ent *ent, + struct fuse_req *req) +{ + struct fuse_ring_queue *queue = ent->queue; + struct fuse_pqueue *fpq = &queue->fpq; + unsigned int hash; + + req->ring_entry = ent; + hash = fuse_req_hash(req->in.h.unique); + list_move_tail(&req->list, &fpq->processing[hash]); +} + +/* + * Assign a fuse queue entry to the given entry + */ +static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ent, + struct fuse_req *req) +{ + struct fuse_ring_queue *queue = ent->queue; + struct fuse_conn *fc = req->fm->fc; + struct fuse_iqueue *fiq = &fc->iq; + + lockdep_assert_held(&queue->lock); + + if (WARN_ON_ONCE(ent->state != FRRS_AVAILABLE && + ent->state != FRRS_COMMIT)) { + pr_warn("%s qid=%d state=%d\n", __func__, ent->queue->qid, + ent->state); + } + + spin_lock(&fiq->lock); + clear_bit(FR_PENDING, &req->flags); + spin_unlock(&fiq->lock); + ent->fuse_req = req; + ent->state = FRRS_FUSE_REQ; + list_move(&ent->list, &queue->ent_w_req_queue); + fuse_uring_add_to_pq(ent, req); +} + +/* Fetch the next fuse request if available */ +static struct fuse_req *fuse_uring_ent_assign_req(struct fuse_ring_ent *ent) + __must_hold(&queue->lock) +{ + struct fuse_req *req; + struct fuse_ring_queue *queue = ent->queue; + struct list_head *req_queue = &queue->fuse_req_queue; + + lockdep_assert_held(&queue->lock); + + /* get and assign the next entry while it is still holding the lock */ + req = list_first_entry_or_null(req_queue, struct fuse_req, list); + if (req) + fuse_uring_add_req_to_ring_ent(ent, req); + + return req; +} + +/* + * Read data from the 
ring buffer, which user space has written to
+ * This is comparable with handling of classical write(/dev/fuse).
+ * Also make the ring request available again for new fuse requests.
+ */
+static void fuse_uring_commit(struct fuse_ring_ent *ent, struct fuse_req *req,
+			      unsigned int issue_flags)
+{
+	struct fuse_ring *ring = ent->queue->ring;
+	struct fuse_conn *fc = ring->fc;
+	ssize_t err = 0;
+
+	err = copy_from_user(&req->out.h, &ent->headers->in_out,
+			     sizeof(req->out.h));
+	if (err) {
+		req->out.h.error = -EFAULT;
+		goto out;
+	}
+
+	err = fuse_uring_out_header_has_err(&req->out.h, req, fc);
+	if (err) {
+		/* req->out.h.error already set */
+		goto out;
+	}
+
+	err = fuse_uring_copy_from_ring(ring, req, ent);
+out:
+	fuse_uring_req_end(ent, req, err);
+}
+
+/*
+ * Get the next fuse req and send it
+ */
+static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ent,
+				     struct fuse_ring_queue *queue,
+				     unsigned int issue_flags)
+{
+	int err;
+	struct fuse_req *req;
+
+retry:
+	spin_lock(&queue->lock);
+	fuse_uring_ent_avail(ent, queue);
+	req = fuse_uring_ent_assign_req(ent);
+	spin_unlock(&queue->lock);
+
+	if (req) {
+		err = fuse_uring_send_next_to_ring(ent, req, issue_flags);
+		if (err)
+			goto retry;
+	}
+}
+
+static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent)
+{
+	struct fuse_ring_queue *queue = ent->queue;
+
+	lockdep_assert_held(&queue->lock);
+
+	if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE))
+		return -EIO;
+
+	ent->state = FRRS_COMMIT;
+	list_move(&ent->list, &queue->ent_commit_queue);
+
+	return 0;
+}
+
+/* FUSE_IO_URING_CMD_COMMIT_AND_FETCH handler */
+static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
+				   struct fuse_conn *fc)
+{
+	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
+	struct fuse_ring_ent *ent;
+	int err;
+	struct fuse_ring *ring = fc->ring;
+	struct fuse_ring_queue *queue;
+	uint64_t commit_id = READ_ONCE(cmd_req->commit_id);
+	unsigned int qid = READ_ONCE(cmd_req->qid);
+	struct fuse_pqueue *fpq;
+	struct fuse_req *req;
+
+	err = -ENOTCONN;
+	if (!ring)
+		return err;
+
+	if (qid >= ring->nr_queues)
+		return -EINVAL;
+
+	queue = ring->queues[qid];
+	if (!queue)
+		return err;
+	fpq = &queue->fpq;
+
+	spin_lock(&queue->lock);
+	/*
+	 * Find a request based on the unique ID of the fuse request.
+	 * This should get revised, as it needs a hash calculation and list
+	 * search, and full struct fuse_pqueue is needed (memory overhead),
+	 * as well as the link from req to ring_ent.
+	 */
+	req = fuse_request_find(fpq, commit_id);
+	err = -ENOENT;
+	if (!req) {
+		pr_info("qid=%d commit_id %llu not found\n", queue->qid,
+			commit_id);
+		spin_unlock(&queue->lock);
+		return err;
+	}
+	list_del_init(&req->list);
+	ent = req->ring_entry;
+	req->ring_entry = NULL;
+
+	err = fuse_ring_ent_set_commit(ent);
+	if (err != 0) {
+		pr_info_ratelimited("qid=%d commit_id %llu state %d\n",
+				    queue->qid, commit_id, ent->state);
+		spin_unlock(&queue->lock);
+		req->out.h.error = err;
+		clear_bit(FR_SENT, &req->flags);
+		fuse_request_end(req);
+		return err;
+	}
+
+	ent->cmd = cmd;
+	spin_unlock(&queue->lock);
+
+	/* without the queue lock, as other locks are taken */
+	fuse_uring_commit(ent, req, issue_flags);
+
+	/*
+	 * Fetching the next request is absolutely required as queued
+	 * fuse requests would otherwise not get processed - committing
+	 * and fetching is done in one step vs legacy fuse, which has separated
+	 * read (fetch request) and write (commit result).
+	 */
+	fuse_uring_next_fuse_req(ent, queue, issue_flags);
+	return 0;
+}
+
 /*
  * fuse_uring_req_fetch command handling
  */
@@ -318,6 +751,14 @@ int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd,
 			return err;
 		}
 		break;
+	case FUSE_IO_URING_CMD_COMMIT_AND_FETCH:
+		err = fuse_uring_commit_fetch(cmd, issue_flags, fc);
+		if (err) {
+			pr_info_once("FUSE_IO_URING_CMD_COMMIT_AND_FETCH failed err=%d\n",
+				     err);
+			return err;
+		}
+		break;
 	default:
 		return -EINVAL;
 	}
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index ae1536355b36..44bf237f0d5a 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -20,6 +20,9 @@ enum fuse_ring_req_state {
 	/* The ring entry is waiting for new fuse requests */
 	FRRS_AVAILABLE,
 
+	/* The ring entry got assigned a fuse req */
+	FRRS_FUSE_REQ,
+
 	/* The ring entry is in or on the way to user space */
 	FRRS_USERSPACE,
 };
@@ -67,7 +70,16 @@ struct fuse_ring_queue {
 	 * entries in the process of being committed or in the process
 	 * to be sent to userspace
 	 */
+	struct list_head ent_w_req_queue;
 	struct list_head ent_commit_queue;
+
+	/* entries in userspace */
+	struct list_head ent_in_userspace;
+
+	/* fuse requests waiting for an entry slot */
+	struct list_head fuse_req_queue;
+
+	struct fuse_pqueue fpq;
 };
 
 /**
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index fb981002287d..ba6901c1bc2d 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -438,6 +438,10 @@ struct fuse_req {
 
 	/** fuse_mount this request belongs to */
 	struct fuse_mount *fm;
+
+#ifdef CONFIG_FUSE_IO_URING
+	void *ring_entry;
+#endif
 };
 
 struct fuse_iqueue;
-- cgit v1.2.3

From 4a9bfb9b6850fec0685447aed280533cf980de70 Mon Sep 17 00:00:00 2001
From: Bernd Schubert
Date: Mon, 20 Jan 2025 02:29:04 +0100
Subject: fuse: {io-uring} Handle teardown of ring entries

On teardown, struct file_operations::uring_cmd requests need to be
completed by calling io_uring_cmd_done(). Not completing all ring
entries would leave io-uring tasks busy, emitting warning messages at
intervals, and would keep the struct file from being released.
Additionally, the fuse connection, and with it the ring, can only be
released once all io-uring commands are completed.

Completion is done for ring entries that are
a) waiting for new fuse requests - io_uring_cmd_done() is needed
b) already in userspace - io_uring_cmd_done() through teardown is not
needed; the entry can just get released. If the fuse server is still
active and commits such a ring entry, fuse_uring_cmd() already checks
whether the connection is active and then completes the io-uring
command itself with -ENOTCONN, i.e. special handling is not needed.

This scheme is basically represented by the ring entry states
FRRS_AVAILABLE and FRRS_USERSPACE.

Entries in state:
- FRRS_INVALID: no action needed, they do not contribute to
  ring->queue_refs yet
- all other states: currently processed by other tasks; async teardown
  is needed and it has to wait for the two states above.

It could also be solved without an async teardown task, but that would
require additional if conditions in hot code paths. Also, in my
personal opinion, the code looks cleaner with async teardown.
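The async teardown task defended above is an instance of a common kernel pattern: a delayed work item that re-arms itself until a reference count has drained, then wakes up waiters. A condensed sketch of just that pattern (the names are illustrative, not the patch's exact code):

#include <linux/workqueue.h>
#include <linux/atomic.h>
#include <linux/wait.h>

struct drain_ctx {
	struct delayed_work work;
	atomic_t refs;			/* entries still to be released */
	wait_queue_head_t done_waitq;	/* woken once refs reaches zero */
};

static void drain_poll(struct work_struct *work)
{
	struct drain_ctx *ctx =
		container_of(work, struct drain_ctx, work.work);

	/* ... release whatever entries became releasable; every released
	 * entry decrements ctx->refs ... */

	if (atomic_read(&ctx->refs) > 0)
		schedule_delayed_work(&ctx->work, HZ / 20);	/* re-arm */
	else
		wake_up_all(&ctx->done_waitq);			/* drained */
}

The waiting side then reduces to wait_event(ctx->done_waitq, atomic_read(&ctx->refs) == 0), which is the shape fuse_uring_wait_stopped_queues() takes in the patch below.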
Signed-off-by: Bernd Schubert Reviewed-by: Pavel Begunkov # io_uring Reviewed-by: Luis Henriques Signed-off-by: Miklos Szeredi --- fs/fuse/dev.c | 9 +++ fs/fuse/dev_uring.c | 207 ++++++++++++++++++++++++++++++++++++++++++++++++++ fs/fuse/dev_uring_i.h | 51 +++++++++++++ 3 files changed, 267 insertions(+) (limited to 'fs/fuse/dev_uring.c') diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c index aa33eba51c51..1c21e491e891 100644 --- a/fs/fuse/dev.c +++ b/fs/fuse/dev.c @@ -6,6 +6,7 @@ See the file COPYING. */ +#include "dev_uring_i.h" #include "fuse_i.h" #include "fuse_dev_i.h" @@ -2291,6 +2292,12 @@ void fuse_abort_conn(struct fuse_conn *fc) spin_unlock(&fc->lock); fuse_dev_end_requests(&to_end); + + /* + * fc->lock must not be taken to avoid conflicts with io-uring + * locks + */ + fuse_uring_abort(fc); } else { spin_unlock(&fc->lock); } @@ -2302,6 +2309,8 @@ void fuse_wait_aborted(struct fuse_conn *fc) /* matches implicit memory barrier in fuse_drop_waiting() */ smp_mb(); wait_event(fc->blocked_waitq, atomic_read(&fc->num_waiting) == 0); + + fuse_uring_wait_stopped_queues(fc); } int fuse_dev_release(struct inode *inode, struct file *file) diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c index 1030c1720990..1161b9aa5e11 100644 --- a/fs/fuse/dev_uring.c +++ b/fs/fuse/dev_uring.c @@ -35,6 +35,37 @@ static void fuse_uring_req_end(struct fuse_ring_ent *ent, struct fuse_req *req, fuse_request_end(req); } +/* Abort all list queued request on the given ring queue */ +static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue) +{ + struct fuse_req *req; + LIST_HEAD(req_list); + + spin_lock(&queue->lock); + list_for_each_entry(req, &queue->fuse_req_queue, list) + clear_bit(FR_PENDING, &req->flags); + list_splice_init(&queue->fuse_req_queue, &req_list); + spin_unlock(&queue->lock); + + /* must not hold queue lock to avoid order issues with fi->lock */ + fuse_dev_end_requests(&req_list); +} + +void fuse_uring_abort_end_requests(struct fuse_ring *ring) +{ + int qid; + struct fuse_ring_queue *queue; + + for (qid = 0; qid < ring->nr_queues; qid++) { + queue = READ_ONCE(ring->queues[qid]); + if (!queue) + continue; + + queue->stopped = true; + fuse_uring_abort_end_queue_requests(queue); + } +} + void fuse_uring_destruct(struct fuse_conn *fc) { struct fuse_ring *ring = fc->ring; @@ -94,10 +125,13 @@ static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc) goto out_err; } + init_waitqueue_head(&ring->stop_waitq); + fc->ring = ring; ring->nr_queues = nr_queues; ring->fc = fc; ring->max_payload_sz = max_payload_size; + atomic_set(&ring->queue_refs, 0); spin_unlock(&fc->lock); return ring; @@ -154,6 +188,175 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, return queue; } +static void fuse_uring_stop_fuse_req_end(struct fuse_req *req) +{ + clear_bit(FR_SENT, &req->flags); + req->out.h.error = -ECONNABORTED; + fuse_request_end(req); +} + +/* + * Release a request/entry on connection tear down + */ +static void fuse_uring_entry_teardown(struct fuse_ring_ent *ent) +{ + struct fuse_req *req; + struct io_uring_cmd *cmd; + + struct fuse_ring_queue *queue = ent->queue; + + spin_lock(&queue->lock); + cmd = ent->cmd; + ent->cmd = NULL; + req = ent->fuse_req; + ent->fuse_req = NULL; + if (req) { + /* remove entry from queue->fpq->processing */ + list_del_init(&req->list); + } + spin_unlock(&queue->lock); + + if (cmd) + io_uring_cmd_done(cmd, -ENOTCONN, 0, IO_URING_F_UNLOCKED); + + if (req) + fuse_uring_stop_fuse_req_end(req); + + list_del_init(&ent->list); + 
kfree(ent); +} + +static void fuse_uring_stop_list_entries(struct list_head *head, + struct fuse_ring_queue *queue, + enum fuse_ring_req_state exp_state) +{ + struct fuse_ring *ring = queue->ring; + struct fuse_ring_ent *ent, *next; + ssize_t queue_refs = SSIZE_MAX; + LIST_HEAD(to_teardown); + + spin_lock(&queue->lock); + list_for_each_entry_safe(ent, next, head, list) { + if (ent->state != exp_state) { + pr_warn("entry teardown qid=%d state=%d expected=%d", + queue->qid, ent->state, exp_state); + continue; + } + + list_move(&ent->list, &to_teardown); + } + spin_unlock(&queue->lock); + + /* no queue lock to avoid lock order issues */ + list_for_each_entry_safe(ent, next, &to_teardown, list) { + fuse_uring_entry_teardown(ent); + queue_refs = atomic_dec_return(&ring->queue_refs); + WARN_ON_ONCE(queue_refs < 0); + } +} + +static void fuse_uring_teardown_entries(struct fuse_ring_queue *queue) +{ + fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue, + FRRS_USERSPACE); + fuse_uring_stop_list_entries(&queue->ent_avail_queue, queue, + FRRS_AVAILABLE); +} + +/* + * Log state debug info + */ +static void fuse_uring_log_ent_state(struct fuse_ring *ring) +{ + int qid; + struct fuse_ring_ent *ent; + + for (qid = 0; qid < ring->nr_queues; qid++) { + struct fuse_ring_queue *queue = ring->queues[qid]; + + if (!queue) + continue; + + spin_lock(&queue->lock); + /* + * Log entries from the intermediate queue, the other queues + * should be empty + */ + list_for_each_entry(ent, &queue->ent_w_req_queue, list) { + pr_info(" ent-req-queue ring=%p qid=%d ent=%p state=%d\n", + ring, qid, ent, ent->state); + } + list_for_each_entry(ent, &queue->ent_commit_queue, list) { + pr_info(" ent-commit-queue ring=%p qid=%d ent=%p state=%d\n", + ring, qid, ent, ent->state); + } + spin_unlock(&queue->lock); + } + ring->stop_debug_log = 1; +} + +static void fuse_uring_async_stop_queues(struct work_struct *work) +{ + int qid; + struct fuse_ring *ring = + container_of(work, struct fuse_ring, async_teardown_work.work); + + /* XXX code dup */ + for (qid = 0; qid < ring->nr_queues; qid++) { + struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]); + + if (!queue) + continue; + + fuse_uring_teardown_entries(queue); + } + + /* + * Some ring entries might be in the middle of IO operations, + * i.e. 
in process to get handled by file_operations::uring_cmd + * or on the way to userspace - we could handle that with conditions in + * run time code, but easier/cleaner to have an async tear down handler + * If there are still queue references left + */ + if (atomic_read(&ring->queue_refs) > 0) { + if (time_after(jiffies, + ring->teardown_time + FUSE_URING_TEARDOWN_TIMEOUT)) + fuse_uring_log_ent_state(ring); + + schedule_delayed_work(&ring->async_teardown_work, + FUSE_URING_TEARDOWN_INTERVAL); + } else { + wake_up_all(&ring->stop_waitq); + } +} + +/* + * Stop the ring queues + */ +void fuse_uring_stop_queues(struct fuse_ring *ring) +{ + int qid; + + for (qid = 0; qid < ring->nr_queues; qid++) { + struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]); + + if (!queue) + continue; + + fuse_uring_teardown_entries(queue); + } + + if (atomic_read(&ring->queue_refs) > 0) { + ring->teardown_time = jiffies; + INIT_DELAYED_WORK(&ring->async_teardown_work, + fuse_uring_async_stop_queues); + schedule_delayed_work(&ring->async_teardown_work, + FUSE_URING_TEARDOWN_INTERVAL); + } else { + wake_up_all(&ring->stop_waitq); + } +} + /* * Checks for errors and stores it into the request */ @@ -525,6 +728,9 @@ static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags, return err; fpq = &queue->fpq; + if (!READ_ONCE(fc->connected) || READ_ONCE(queue->stopped)) + return err; + spin_lock(&queue->lock); /* Find a request based on the unique ID of the fuse request * This should get revised, as it needs a hash calculation and list @@ -652,6 +858,7 @@ fuse_uring_create_ring_ent(struct io_uring_cmd *cmd, ent->headers = iov[0].iov_base; ent->payload = iov[1].iov_base; + atomic_inc(&ring->queue_refs); return ent; } diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h index 44bf237f0d5a..a4316e118cbd 100644 --- a/fs/fuse/dev_uring_i.h +++ b/fs/fuse/dev_uring_i.h @@ -11,6 +11,9 @@ #ifdef CONFIG_FUSE_IO_URING +#define FUSE_URING_TEARDOWN_TIMEOUT (5 * HZ) +#define FUSE_URING_TEARDOWN_INTERVAL (HZ/20) + enum fuse_ring_req_state { FRRS_INVALID = 0, @@ -80,6 +83,8 @@ struct fuse_ring_queue { struct list_head fuse_req_queue; struct fuse_pqueue fpq; + + bool stopped; }; /** @@ -97,12 +102,51 @@ struct fuse_ring { size_t max_payload_sz; struct fuse_ring_queue **queues; + + /* + * Log ring entry states on stop when entries cannot be released + */ + unsigned int stop_debug_log : 1; + + wait_queue_head_t stop_waitq; + + /* async tear down */ + struct delayed_work async_teardown_work; + + /* log */ + unsigned long teardown_time; + + atomic_t queue_refs; }; bool fuse_uring_enabled(void); void fuse_uring_destruct(struct fuse_conn *fc); +void fuse_uring_stop_queues(struct fuse_ring *ring); +void fuse_uring_abort_end_requests(struct fuse_ring *ring); int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags); +static inline void fuse_uring_abort(struct fuse_conn *fc) +{ + struct fuse_ring *ring = fc->ring; + + if (ring == NULL) + return; + + if (atomic_read(&ring->queue_refs) > 0) { + fuse_uring_abort_end_requests(ring); + fuse_uring_stop_queues(ring); + } +} + +static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc) +{ + struct fuse_ring *ring = fc->ring; + + if (ring) + wait_event(ring->stop_waitq, + atomic_read(&ring->queue_refs) == 0); +} + #else /* CONFIG_FUSE_IO_URING */ struct fuse_ring; @@ -120,6 +164,13 @@ static inline bool fuse_uring_enabled(void) return false; } +static inline void fuse_uring_abort(struct fuse_conn *fc) +{ +} + +static inline void 
fuse_uring_wait_stopped_queues(struct fuse_conn *fc) +{ +} #endif /* CONFIG_FUSE_IO_URING */ #endif /* _FS_FUSE_DEV_URING_I_H */ -- cgit v1.2.3 From c2c9af9a0b13261c36909036057a116f2edb5e1a Mon Sep 17 00:00:00 2001 From: Bernd Schubert Date: Mon, 20 Jan 2025 02:29:06 +0100 Subject: fuse: Allow to queue fg requests through io-uring This prepares queueing and sending foreground requests through io-uring. Signed-off-by: Bernd Schubert Reviewed-by: Pavel Begunkov # io_uring Reviewed-by: Luis Henriques Signed-off-by: Miklos Szeredi --- fs/fuse/dev_uring.c | 180 ++++++++++++++++++++++++++++++++++++++++++++++++++ fs/fuse/dev_uring_i.h | 8 +++ 2 files changed, 188 insertions(+) (limited to 'fs/fuse/dev_uring.c') diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c index 1161b9aa5e11..728000434589 100644 --- a/fs/fuse/dev_uring.c +++ b/fs/fuse/dev_uring.c @@ -24,6 +24,29 @@ bool fuse_uring_enabled(void) return enable_uring; } +struct fuse_uring_pdu { + struct fuse_ring_ent *ent; +}; + +static const struct fuse_iqueue_ops fuse_io_uring_ops; + +static void uring_cmd_set_ring_ent(struct io_uring_cmd *cmd, + struct fuse_ring_ent *ring_ent) +{ + struct fuse_uring_pdu *pdu = + io_uring_cmd_to_pdu(cmd, struct fuse_uring_pdu); + + pdu->ent = ring_ent; +} + +static struct fuse_ring_ent *uring_cmd_to_ring_ent(struct io_uring_cmd *cmd) +{ + struct fuse_uring_pdu *pdu = + io_uring_cmd_to_pdu(cmd, struct fuse_uring_pdu); + + return pdu->ent; +} + static void fuse_uring_req_end(struct fuse_ring_ent *ent, struct fuse_req *req, int error) { @@ -776,6 +799,31 @@ static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags, return 0; } +static bool is_ring_ready(struct fuse_ring *ring, int current_qid) +{ + int qid; + struct fuse_ring_queue *queue; + bool ready = true; + + for (qid = 0; qid < ring->nr_queues && ready; qid++) { + if (current_qid == qid) + continue; + + queue = ring->queues[qid]; + if (!queue) { + ready = false; + break; + } + + spin_lock(&queue->lock); + if (list_empty(&queue->ent_avail_queue)) + ready = false; + spin_unlock(&queue->lock); + } + + return ready; +} + /* * fuse_uring_req_fetch command handling */ @@ -784,11 +832,23 @@ static void fuse_uring_do_register(struct fuse_ring_ent *ent, unsigned int issue_flags) { struct fuse_ring_queue *queue = ent->queue; + struct fuse_ring *ring = queue->ring; + struct fuse_conn *fc = ring->fc; + struct fuse_iqueue *fiq = &fc->iq; spin_lock(&queue->lock); ent->cmd = cmd; fuse_uring_ent_avail(ent, queue); spin_unlock(&queue->lock); + + if (!ring->ready) { + bool ready = is_ring_ready(ring, queue->qid); + + if (ready) { + WRITE_ONCE(fiq->ops, &fuse_io_uring_ops); + WRITE_ONCE(ring->ready, true); + } + } } /* @@ -972,3 +1032,123 @@ int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd, return -EIOCBQUEUED; } + +static void fuse_uring_send(struct fuse_ring_ent *ent, struct io_uring_cmd *cmd, + ssize_t ret, unsigned int issue_flags) +{ + struct fuse_ring_queue *queue = ent->queue; + + spin_lock(&queue->lock); + ent->state = FRRS_USERSPACE; + list_move(&ent->list, &queue->ent_in_userspace); + ent->cmd = NULL; + spin_unlock(&queue->lock); + + io_uring_cmd_done(cmd, ret, 0, issue_flags); +} + +/* + * This prepares and sends the ring request in fuse-uring task context. + * User buffers are not mapped yet - the application does not have permission + * to write to it - this has to be executed in ring task context. 
+ */ +static void fuse_uring_send_in_task(struct io_uring_cmd *cmd, + unsigned int issue_flags) +{ + struct fuse_ring_ent *ent = uring_cmd_to_ring_ent(cmd); + struct fuse_ring_queue *queue = ent->queue; + int err; + + if (!(issue_flags & IO_URING_F_TASK_DEAD)) { + err = fuse_uring_prepare_send(ent, ent->fuse_req); + if (err) { + fuse_uring_next_fuse_req(ent, queue, issue_flags); + return; + } + } else { + err = -ECANCELED; + } + + fuse_uring_send(ent, cmd, err, issue_flags); +} + +static struct fuse_ring_queue *fuse_uring_task_to_queue(struct fuse_ring *ring) +{ + unsigned int qid; + struct fuse_ring_queue *queue; + + qid = task_cpu(current); + + if (WARN_ONCE(qid >= ring->nr_queues, + "Core number (%u) exceeds nr queues (%zu)\n", qid, + ring->nr_queues)) + qid = 0; + + queue = ring->queues[qid]; + WARN_ONCE(!queue, "Missing queue for qid %d\n", qid); + + return queue; +} + +static void fuse_uring_dispatch_ent(struct fuse_ring_ent *ent) +{ + struct io_uring_cmd *cmd = ent->cmd; + + uring_cmd_set_ring_ent(cmd, ent); + io_uring_cmd_complete_in_task(cmd, fuse_uring_send_in_task); +} + +/* queue a fuse request and send it if a ring entry is available */ +void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req) +{ + struct fuse_conn *fc = req->fm->fc; + struct fuse_ring *ring = fc->ring; + struct fuse_ring_queue *queue; + struct fuse_ring_ent *ent = NULL; + int err; + + err = -EINVAL; + queue = fuse_uring_task_to_queue(ring); + if (!queue) + goto err; + + if (req->in.h.opcode != FUSE_NOTIFY_REPLY) + req->in.h.unique = fuse_get_unique(fiq); + + spin_lock(&queue->lock); + err = -ENOTCONN; + if (unlikely(queue->stopped)) + goto err_unlock; + + ent = list_first_entry_or_null(&queue->ent_avail_queue, + struct fuse_ring_ent, list); + if (ent) + fuse_uring_add_req_to_ring_ent(ent, req); + else + list_add_tail(&req->list, &queue->fuse_req_queue); + spin_unlock(&queue->lock); + + if (ent) + fuse_uring_dispatch_ent(ent); + + return; + +err_unlock: + spin_unlock(&queue->lock); +err: + req->out.h.error = err; + clear_bit(FR_PENDING, &req->flags); + fuse_request_end(req); +} + +static const struct fuse_iqueue_ops fuse_io_uring_ops = { + /* should be send over io-uring as enhancement */ + .send_forget = fuse_dev_queue_forget, + + /* + * could be send over io-uring, but interrupts should be rare, + * no need to make the code complex + */ + .send_interrupt = fuse_dev_queue_interrupt, + .send_req = fuse_uring_queue_fuse_req, +}; diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h index a4316e118cbd..0517a6eafc91 100644 --- a/fs/fuse/dev_uring_i.h +++ b/fs/fuse/dev_uring_i.h @@ -117,6 +117,8 @@ struct fuse_ring { unsigned long teardown_time; atomic_t queue_refs; + + bool ready; }; bool fuse_uring_enabled(void); @@ -124,6 +126,7 @@ void fuse_uring_destruct(struct fuse_conn *fc); void fuse_uring_stop_queues(struct fuse_ring *ring); void fuse_uring_abort_end_requests(struct fuse_ring *ring); int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags); +void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req); static inline void fuse_uring_abort(struct fuse_conn *fc) { @@ -147,6 +150,11 @@ static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc) atomic_read(&ring->queue_refs) == 0); } +static inline bool fuse_uring_ready(struct fuse_conn *fc) +{ + return fc->ring && fc->ring->ready; +} + #else /* CONFIG_FUSE_IO_URING */ struct fuse_ring; -- cgit v1.2.3 From 857b0263f30eebe13ab4b6a65156a0d6c8fc2210 Mon Sep 17 00:00:00 2001 From: Bernd Schubert 
Date: Mon, 20 Jan 2025 02:29:07 +0100 Subject: fuse: Allow to queue bg requests through io-uring This prepares queueing and sending background requests through io-uring. Signed-off-by: Bernd Schubert Reviewed-by: Pavel Begunkov # io_uring Reviewed-by: Luis Henriques Signed-off-by: Miklos Szeredi --- fs/fuse/dev.c | 26 +++++++++++++- fs/fuse/dev_uring.c | 99 +++++++++++++++++++++++++++++++++++++++++++++++++++ fs/fuse/dev_uring_i.h | 12 +++++++ 3 files changed, 136 insertions(+), 1 deletion(-) (limited to 'fs/fuse/dev_uring.c') diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c index ecf2f805f456..1b593b23f7b8 100644 --- a/fs/fuse/dev.c +++ b/fs/fuse/dev.c @@ -568,7 +568,25 @@ ssize_t __fuse_simple_request(struct mnt_idmap *idmap, return ret; } -static bool fuse_request_queue_background(struct fuse_req *req) +#ifdef CONFIG_FUSE_IO_URING +static bool fuse_request_queue_background_uring(struct fuse_conn *fc, + struct fuse_req *req) +{ + struct fuse_iqueue *fiq = &fc->iq; + + req->in.h.unique = fuse_get_unique(fiq); + req->in.h.len = sizeof(struct fuse_in_header) + + fuse_len_args(req->args->in_numargs, + (struct fuse_arg *) req->args->in_args); + + return fuse_uring_queue_bq_req(req); +} +#endif + +/* + * @return true if queued + */ +static int fuse_request_queue_background(struct fuse_req *req) { struct fuse_mount *fm = req->fm; struct fuse_conn *fc = fm->fc; @@ -580,6 +598,12 @@ static bool fuse_request_queue_background(struct fuse_req *req) atomic_inc(&fc->num_waiting); } __set_bit(FR_ISREPLY, &req->flags); + +#ifdef CONFIG_FUSE_IO_URING + if (fuse_uring_ready(fc)) + return fuse_request_queue_background_uring(fc, req); +#endif + spin_lock(&fc->bg_lock); if (likely(fc->connected)) { fc->num_background++; diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c index 728000434589..27bc103c17c8 100644 --- a/fs/fuse/dev_uring.c +++ b/fs/fuse/dev_uring.c @@ -47,10 +47,53 @@ static struct fuse_ring_ent *uring_cmd_to_ring_ent(struct io_uring_cmd *cmd) return pdu->ent; } +static void fuse_uring_flush_bg(struct fuse_ring_queue *queue) +{ + struct fuse_ring *ring = queue->ring; + struct fuse_conn *fc = ring->fc; + + lockdep_assert_held(&queue->lock); + lockdep_assert_held(&fc->bg_lock); + + /* + * Allow one bg request per queue, ignoring global fc limits. + * This prevents a single queue from consuming all resources and + * eliminates the need for remote queue wake-ups when global + * limits are met but this queue has no more waiting requests. 
+ */ + while ((fc->active_background < fc->max_background || + !queue->active_background) && + (!list_empty(&queue->fuse_req_bg_queue))) { + struct fuse_req *req; + + req = list_first_entry(&queue->fuse_req_bg_queue, + struct fuse_req, list); + fc->active_background++; + queue->active_background++; + + list_move_tail(&req->list, &queue->fuse_req_queue); + } +} + static void fuse_uring_req_end(struct fuse_ring_ent *ent, struct fuse_req *req, int error) { + struct fuse_ring_queue *queue = ent->queue; + struct fuse_ring *ring = queue->ring; + struct fuse_conn *fc = ring->fc; + + lockdep_assert_not_held(&queue->lock); + spin_lock(&queue->lock); ent->fuse_req = NULL; + if (test_bit(FR_BACKGROUND, &req->flags)) { + queue->active_background--; + spin_lock(&fc->bg_lock); + fuse_uring_flush_bg(queue); + spin_unlock(&fc->bg_lock); + } + + spin_unlock(&queue->lock); + if (error) req->out.h.error = error; @@ -78,6 +121,7 @@ void fuse_uring_abort_end_requests(struct fuse_ring *ring) { int qid; struct fuse_ring_queue *queue; + struct fuse_conn *fc = ring->fc; for (qid = 0; qid < ring->nr_queues; qid++) { queue = READ_ONCE(ring->queues[qid]); @@ -85,6 +129,13 @@ void fuse_uring_abort_end_requests(struct fuse_ring *ring) continue; queue->stopped = true; + + WARN_ON_ONCE(ring->fc->max_background != UINT_MAX); + spin_lock(&queue->lock); + spin_lock(&fc->bg_lock); + fuse_uring_flush_bg(queue); + spin_unlock(&fc->bg_lock); + spin_unlock(&queue->lock); fuse_uring_abort_end_queue_requests(queue); } } @@ -190,6 +241,7 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, INIT_LIST_HEAD(&queue->ent_w_req_queue); INIT_LIST_HEAD(&queue->ent_in_userspace); INIT_LIST_HEAD(&queue->fuse_req_queue); + INIT_LIST_HEAD(&queue->fuse_req_bg_queue); queue->fpq.processing = pq; fuse_pqueue_init(&queue->fpq); @@ -1141,6 +1193,53 @@ err: fuse_request_end(req); } +bool fuse_uring_queue_bq_req(struct fuse_req *req) +{ + struct fuse_conn *fc = req->fm->fc; + struct fuse_ring *ring = fc->ring; + struct fuse_ring_queue *queue; + struct fuse_ring_ent *ent = NULL; + + queue = fuse_uring_task_to_queue(ring); + if (!queue) + return false; + + spin_lock(&queue->lock); + if (unlikely(queue->stopped)) { + spin_unlock(&queue->lock); + return false; + } + + list_add_tail(&req->list, &queue->fuse_req_bg_queue); + + ent = list_first_entry_or_null(&queue->ent_avail_queue, + struct fuse_ring_ent, list); + spin_lock(&fc->bg_lock); + fc->num_background++; + if (fc->num_background == fc->max_background) + fc->blocked = 1; + fuse_uring_flush_bg(queue); + spin_unlock(&fc->bg_lock); + + /* + * Due to bg_queue flush limits there might be other bg requests + * in the queue that need to be handled first. Or no further req + * might be available. 
+ */ + req = list_first_entry_or_null(&queue->fuse_req_queue, struct fuse_req, + list); + if (ent && req) { + fuse_uring_add_req_to_ring_ent(ent, req); + spin_unlock(&queue->lock); + + fuse_uring_dispatch_ent(ent); + } else { + spin_unlock(&queue->lock); + } + + return true; +} + static const struct fuse_iqueue_ops fuse_io_uring_ops = { /* should be send over io-uring as enhancement */ .send_forget = fuse_dev_queue_forget, diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h index 0517a6eafc91..0182be61778b 100644 --- a/fs/fuse/dev_uring_i.h +++ b/fs/fuse/dev_uring_i.h @@ -82,8 +82,13 @@ struct fuse_ring_queue { /* fuse requests waiting for an entry slot */ struct list_head fuse_req_queue; + /* background fuse requests */ + struct list_head fuse_req_bg_queue; + struct fuse_pqueue fpq; + unsigned int active_background; + bool stopped; }; @@ -127,6 +132,7 @@ void fuse_uring_stop_queues(struct fuse_ring *ring); void fuse_uring_abort_end_requests(struct fuse_ring *ring); int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags); void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req); +bool fuse_uring_queue_bq_req(struct fuse_req *req); static inline void fuse_uring_abort(struct fuse_conn *fc) { @@ -179,6 +185,12 @@ static inline void fuse_uring_abort(struct fuse_conn *fc) static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc) { } + +static inline bool fuse_uring_ready(struct fuse_conn *fc) +{ + return false; +} + #endif /* CONFIG_FUSE_IO_URING */ #endif /* _FS_FUSE_DEV_URING_I_H */ -- cgit v1.2.3 From b6236c8407cba5d7a108facb1bcfab24994d3814 Mon Sep 17 00:00:00 2001 From: Bernd Schubert Date: Mon, 20 Jan 2025 02:29:08 +0100 Subject: fuse: {io-uring} Prevent mount point hang on fuse-server termination When the fuse-server terminates while the fuse-client or kernel still has queued URING_CMDs, these commands retain references to the struct file used by the fuse connection. This prevents fuse_dev_release() from being invoked, resulting in a hung mount point. This patch addresses the issue by making queued URING_CMDs cancelable, allowing fuse_dev_release() to proceed as expected and preventing the mount point from hanging. 
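The mechanism used here is the generic uring_cmd cancellation handshake: a command that may be parked in the kernel indefinitely is marked cancelable, and on io-uring teardown the same handler is re-entered with IO_URING_F_CANCEL set so the driver can complete it. Reduced to its skeleton (a sketch of the contract, not the patch's exact code):

#include <linux/io_uring/cmd.h>

static int my_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
{
	if (issue_flags & IO_URING_F_CANCEL) {
		/* io_uring is going away: complete the parked command */
		io_uring_cmd_done(cmd, -ENOTCONN, 0, issue_flags);
		return 0;
	}

	/* park the command; allow io_uring to cancel it on teardown */
	io_uring_cmd_mark_cancelable(cmd, issue_flags);

	return -EIOCBQUEUED;	/* completed later via io_uring_cmd_done() */
}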
Signed-off-by: Bernd Schubert Reviewed-by: Pavel Begunkov # io_uring Reviewed-by: Luis Henriques Signed-off-by: Miklos Szeredi --- fs/fuse/dev_uring.c | 69 ++++++++++++++++++++++++++++++++++++++++++++++++--- fs/fuse/dev_uring_i.h | 9 +++++++ 2 files changed, 75 insertions(+), 3 deletions(-) (limited to 'fs/fuse/dev_uring.c') diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c index 27bc103c17c8..fa0451176385 100644 --- a/fs/fuse/dev_uring.c +++ b/fs/fuse/dev_uring.c @@ -150,6 +150,7 @@ void fuse_uring_destruct(struct fuse_conn *fc) for (qid = 0; qid < ring->nr_queues; qid++) { struct fuse_ring_queue *queue = ring->queues[qid]; + struct fuse_ring_ent *ent, *next; if (!queue) continue; @@ -159,6 +160,12 @@ void fuse_uring_destruct(struct fuse_conn *fc) WARN_ON(!list_empty(&queue->ent_commit_queue)); WARN_ON(!list_empty(&queue->ent_in_userspace)); + list_for_each_entry_safe(ent, next, &queue->ent_released, + list) { + list_del_init(&ent->list); + kfree(ent); + } + kfree(queue->fpq.processing); kfree(queue); ring->queues[qid] = NULL; @@ -242,6 +249,7 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, INIT_LIST_HEAD(&queue->ent_in_userspace); INIT_LIST_HEAD(&queue->fuse_req_queue); INIT_LIST_HEAD(&queue->fuse_req_bg_queue); + INIT_LIST_HEAD(&queue->ent_released); queue->fpq.processing = pq; fuse_pqueue_init(&queue->fpq); @@ -289,6 +297,15 @@ static void fuse_uring_entry_teardown(struct fuse_ring_ent *ent) /* remove entry from queue->fpq->processing */ list_del_init(&req->list); } + + /* + * The entry must not be freed immediately, due to access of direct + * pointer access of entries through IO_URING_F_CANCEL - there is a risk + * of race between daemon termination (which triggers IO_URING_F_CANCEL + * and accesses entries without checking the list state first + */ + list_move(&ent->list, &queue->ent_released); + ent->state = FRRS_RELEASED; spin_unlock(&queue->lock); if (cmd) @@ -296,9 +313,6 @@ static void fuse_uring_entry_teardown(struct fuse_ring_ent *ent) if (req) fuse_uring_stop_fuse_req_end(req); - - list_del_init(&ent->list); - kfree(ent); } static void fuse_uring_stop_list_entries(struct list_head *head, @@ -318,6 +332,7 @@ static void fuse_uring_stop_list_entries(struct list_head *head, continue; } + ent->state = FRRS_TEARDOWN; list_move(&ent->list, &to_teardown); } spin_unlock(&queue->lock); @@ -432,6 +447,46 @@ void fuse_uring_stop_queues(struct fuse_ring *ring) } } +/* + * Handle IO_URING_F_CANCEL, typically should come on daemon termination. 
+ * + * Releasing the last entry should trigger fuse_dev_release() if + * the daemon was terminated + */ +static void fuse_uring_cancel(struct io_uring_cmd *cmd, + unsigned int issue_flags) +{ + struct fuse_ring_ent *ent = uring_cmd_to_ring_ent(cmd); + struct fuse_ring_queue *queue; + bool need_cmd_done = false; + + /* + * direct access on ent - it must not be destructed as long as + * IO_URING_F_CANCEL might come up + */ + queue = ent->queue; + spin_lock(&queue->lock); + if (ent->state == FRRS_AVAILABLE) { + ent->state = FRRS_USERSPACE; + list_move(&ent->list, &queue->ent_in_userspace); + need_cmd_done = true; + ent->cmd = NULL; + } + spin_unlock(&queue->lock); + + if (need_cmd_done) { + /* no queue lock to avoid lock order issues */ + io_uring_cmd_done(cmd, -ENOTCONN, 0, issue_flags); + } +} + +static void fuse_uring_prepare_cancel(struct io_uring_cmd *cmd, int issue_flags, + struct fuse_ring_ent *ring_ent) +{ + uring_cmd_set_ring_ent(cmd, ring_ent); + io_uring_cmd_mark_cancelable(cmd, issue_flags); +} + /* * Checks for errors and stores it into the request */ @@ -839,6 +894,7 @@ static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags, spin_unlock(&queue->lock); /* without the queue lock, as other locks are taken */ + fuse_uring_prepare_cancel(cmd, issue_flags, ent); fuse_uring_commit(ent, req, issue_flags); /* @@ -888,6 +944,8 @@ static void fuse_uring_do_register(struct fuse_ring_ent *ent, struct fuse_conn *fc = ring->fc; struct fuse_iqueue *fiq = &fc->iq; + fuse_uring_prepare_cancel(cmd, issue_flags, ent); + spin_lock(&queue->lock); ent->cmd = cmd; fuse_uring_ent_avail(ent, queue); @@ -1038,6 +1096,11 @@ int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd, return -EOPNOTSUPP; } + if ((unlikely(issue_flags & IO_URING_F_CANCEL))) { + fuse_uring_cancel(cmd, issue_flags); + return 0; + } + /* This extra SQE size holds struct fuse_uring_cmd_req */ if (!(issue_flags & IO_URING_F_SQE128)) return -EINVAL; diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h index 0182be61778b..2102b3d0c1ae 100644 --- a/fs/fuse/dev_uring_i.h +++ b/fs/fuse/dev_uring_i.h @@ -28,6 +28,12 @@ enum fuse_ring_req_state { /* The ring entry is in or on the way to user space */ FRRS_USERSPACE, + + /* The ring entry is in teardown */ + FRRS_TEARDOWN, + + /* The ring entry is released, but not freed yet */ + FRRS_RELEASED, }; /** A fuse ring entry, part of the ring queue */ @@ -79,6 +85,9 @@ struct fuse_ring_queue { /* entries in userspace */ struct list_head ent_in_userspace; + /* entries that are released */ + struct list_head ent_released; + /* fuse requests waiting for an entry slot */ struct list_head fuse_req_queue; -- cgit v1.2.3 From 3393ff964e0fa5def66570c54a4612bf9df06b76 Mon Sep 17 00:00:00 2001 From: Bernd Schubert Date: Mon, 20 Jan 2025 02:29:09 +0100 Subject: fuse: block request allocation until io-uring init is complete Avoid races and block request allocation until io-uring queues are ready. 
This is especially important for background requests, as bg request
completion might cause lock order inversion of the typical queue->lock
and then fc->bg_lock:

fuse_request_end
    spin_lock(&fc->bg_lock);
    flush_bg_queue
        fuse_send_one
            fuse_uring_queue_fuse_req
                spin_lock(&queue->lock);

Signed-off-by: Bernd Schubert
Reviewed-by: Luis Henriques
Signed-off-by: Miklos Szeredi
---
 fs/fuse/dev.c       | 3 ++-
 fs/fuse/dev_uring.c | 3 +++
 fs/fuse/fuse_i.h    | 3 +++
 fs/fuse/inode.c     | 2 ++
 4 files changed, 10 insertions(+), 1 deletion(-)

(limited to 'fs/fuse/dev_uring.c')

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 1b593b23f7b8..f002e8a096f9 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -76,7 +76,8 @@ void fuse_set_initialized(struct fuse_conn *fc)
 
 static bool fuse_block_alloc(struct fuse_conn *fc, bool for_background)
 {
-	return !fc->initialized || (for_background && fc->blocked);
+	return !fc->initialized || (for_background && fc->blocked) ||
+	       (fc->io_uring && !fuse_uring_ready(fc));
 }
 
 static void fuse_drop_waiting(struct fuse_conn *fc)
diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index fa0451176385..ea197ccd4c51 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -957,6 +957,7 @@ static void fuse_uring_do_register(struct fuse_ring_ent *ent,
 		if (ready) {
 			WRITE_ONCE(fiq->ops, &fuse_io_uring_ops);
 			WRITE_ONCE(ring->ready, true);
+			wake_up_all(&fc->blocked_waitq);
 		}
 	}
 }
@@ -1130,6 +1131,8 @@ int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd,
 		if (err) {
 			pr_info_once("FUSE_IO_URING_CMD_REGISTER failed err=%d\n",
 				     err);
+			fc->io_uring = 0;
+			wake_up_all(&fc->blocked_waitq);
 			return err;
 		}
 		break;
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index ba6901c1bc2d..fee96fe7887b 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -867,6 +867,9 @@ struct fuse_conn {
 	/* Use pages instead of pointer for kernel I/O */
 	unsigned int use_pages_for_kvec_io:1;
 
+	/* Use io_uring for communication */
+	unsigned int io_uring;
+
 	/** Maximum stack depth for passthrough backing files */
 	int max_stack_depth;
 
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index 328797b9aac9..e9db2cb8c150 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -1390,6 +1390,8 @@ static void process_init_reply(struct fuse_mount *fm, struct fuse_args *args,
 			else
 				ok = false;
 		}
+		if (flags & FUSE_OVER_IO_URING && fuse_uring_enabled())
+			fc->io_uring = 1;
 	} else {
 		ra_pages = fc->max_read / PAGE_SIZE;
 		fc->no_lock = 1;
-- cgit v1.2.3

From 786412a73e7ee5b00ef3437bbf2f3a250759b2ae Mon Sep 17 00:00:00 2001
From: Bernd Schubert
Date: Mon, 20 Jan 2025 02:29:10 +0100
Subject: fuse: enable fuse-over-io-uring

All required parts are handled now; fuse-io-uring can be enabled.
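On the server side this is a FUSE_INIT negotiation: process_init_reply() above sets fc->io_uring only if the server echoed FUSE_OVER_IO_URING back. A hedged sketch of the server's reply path, assuming this series' uapi flag (which sits above bit 31 and therefore travels in flags2) and that the kernel advertises it when fuse_uring_enabled(); the helper name is illustrative:

#include <linux/fuse.h>
#include <stdint.h>

static void init_reply_enable_uring(const struct fuse_init_in *in,
				    struct fuse_init_out *out)
{
	uint64_t kernel_flags = in->flags | ((uint64_t)in->flags2 << 32);

	/* echo the flag back only if the kernel offered it */
	if (kernel_flags & FUSE_OVER_IO_URING)
		out->flags2 |= (uint32_t)(FUSE_OVER_IO_URING >> 32);
}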
Signed-off-by: Bernd Schubert Reviewed-by: Pavel Begunkov # io_uring Reviewed-by: Luis Henriques Signed-off-by: Miklos Szeredi --- fs/fuse/dev.c | 3 +++ fs/fuse/dev_uring.c | 3 +-- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'fs/fuse/dev_uring.c') diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c index f002e8a096f9..5b5f789b37eb 100644 --- a/fs/fuse/dev.c +++ b/fs/fuse/dev.c @@ -2493,6 +2493,9 @@ const struct file_operations fuse_dev_operations = { .fasync = fuse_dev_fasync, .unlocked_ioctl = fuse_dev_ioctl, .compat_ioctl = compat_ptr_ioctl, +#ifdef CONFIG_FUSE_IO_URING + .uring_cmd = fuse_uring_cmd, +#endif }; EXPORT_SYMBOL_GPL(fuse_dev_operations); diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c index ea197ccd4c51..3bdc75518e5b 100644 --- a/fs/fuse/dev_uring.c +++ b/fs/fuse/dev_uring.c @@ -1084,8 +1084,7 @@ static int fuse_uring_register(struct io_uring_cmd *cmd, * Entry function from io_uring to handle the given passthrough command * (op code IORING_OP_URING_CMD) */ -int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd, - unsigned int issue_flags) +int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags) { struct fuse_dev *fud; struct fuse_conn *fc; -- cgit v1.2.3 From 2d4fde59fd502a65c1698b61ad4d0f10a9ab665a Mon Sep 17 00:00:00 2001 From: Bernd Schubert Date: Thu, 23 Jan 2025 17:55:32 +0100 Subject: fuse: prevent disabling io-uring on active connections The enable_uring module parameter allows administrators to enable/disable io-uring support for FUSE at runtime. However, disabling io-uring while connections already have it enabled can lead to an inconsistent state. Fix this by keeping io-uring enabled on connections that were already using it, even if the module parameter is later disabled. This ensures active FUSE mounts continue to function correctly. Signed-off-by: Bernd Schubert Reviewed-by: Luis Henriques Signed-off-by: Miklos Szeredi --- fs/fuse/dev_uring.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'fs/fuse/dev_uring.c') diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c index 3bdc75518e5b..ebd2931b4f2a 100644 --- a/fs/fuse/dev_uring.c +++ b/fs/fuse/dev_uring.c @@ -1091,11 +1091,6 @@ int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags) u32 cmd_op = cmd->cmd_op; int err; - if (!enable_uring) { - pr_info_ratelimited("fuse-io-uring is disabled\n"); - return -EOPNOTSUPP; - } - if ((unlikely(issue_flags & IO_URING_F_CANCEL))) { fuse_uring_cancel(cmd, issue_flags); return 0; @@ -1112,6 +1107,12 @@ int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags) } fc = fud->fc; + /* Once a connection has io-uring enabled on it, it can't be disabled */ + if (!enable_uring && !fc->io_uring) { + pr_info_ratelimited("fuse-io-uring is disabled\n"); + return -EOPNOTSUPP; + } + if (fc->aborted) return -ECONNABORTED; if (!fc->connected) -- cgit v1.2.3
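Taken together, the series results in the following per-queue server loop: one CQE delivers a fetched request, one commit-and-fetch SQE returns the reply and re-arms the entry. A condensed userspace sketch under the same assumptions as the register example earlier (error handling and request decoding elided; queue_loop is illustrative):

#include <liburing.h>
#include <linux/fuse.h>
#include <string.h>

static void queue_loop(struct io_uring *ring, int fuse_dev_fd,
		       struct fuse_uring_req_header *hdr, unsigned int qid)
{
	struct io_uring_cqe *cqe;
	struct io_uring_sqe *sqe;
	struct fuse_uring_cmd_req *req;

	for (;;) {
		/* completes once the kernel has copied a request into the
		 * registered header/payload buffers */
		if (io_uring_wait_cqe(ring, &cqe) < 0)
			break;
		io_uring_cqe_seen(ring, cqe);

		/* ... decode hdr/payload, handle the fuse op, write the
		 * reply into the same buffers ... */

		sqe = io_uring_get_sqe(ring);
		sqe->opcode = IORING_OP_URING_CMD;
		sqe->fd = fuse_dev_fd;
		sqe->cmd_op = FUSE_IO_URING_CMD_COMMIT_AND_FETCH;

		req = (struct fuse_uring_cmd_req *)sqe->cmd;
		memset(req, 0, sizeof(*req));
		req->qid = qid;
		/* echo the id the kernel placed into the entry header */
		req->commit_id = hdr->ring_ent_in_out.commit_id;

		io_uring_submit(ring);
	}
}

No iovec is passed again on commit; the kernel reuses the buffers recorded at registration, and on daemon exit the parked commands are completed through the IO_URING_F_CANCEL path added above.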