@@ -43,6 +43,8 @@
#include <asm/page.h>
#include <linux/task_work.h>
#include <linux/namei.h>
+#include <linux/pipe_fs_i.h>
+#include <linux/splice.h>
#include <uapi/linux/ublk_cmd.h>
#define UBLK_MINORS (1U << MINORBITS)
@@ -154,6 +156,8 @@ struct ublk_device {
unsigned long state;
int ub_number;
+ struct srcu_struct srcu;
+
struct mutex mutex;
spinlock_t mm_lock;
@@ -537,6 +541,9 @@ static int ublk_map_io(const struct ublk_queue *ubq, const struct request *req,
if (req_op(req) != REQ_OP_WRITE && req_op(req) != REQ_OP_FLUSH)
return rq_bytes;
+ if (ubq->flags & UBLK_F_SUPPORT_ZERO_COPY)
+ return rq_bytes;
+
if (ublk_rq_has_data(req)) {
struct ublk_map_data data = {
.ubq = ubq,
@@ -558,6 +565,9 @@ static int ublk_unmap_io(const struct ublk_queue *ubq,
{
const unsigned int rq_bytes = blk_rq_bytes(req);
+ if (ubq->flags & UBLK_F_SUPPORT_ZERO_COPY)
+ return rq_bytes;
+
if (req_op(req) == REQ_OP_READ && ublk_rq_has_data(req)) {
struct ublk_map_data data = {
.ubq = ubq,
@@ -1221,6 +1231,7 @@ static void ublk_stop_dev(struct ublk_device *ub)
del_gendisk(ub->ub_disk);
ub->dev_info.state = UBLK_S_DEV_DEAD;
ub->dev_info.ublksrv_pid = -1;
+ synchronize_srcu(&ub->srcu);
put_disk(ub->ub_disk);
ub->ub_disk = NULL;
unlock:
@@ -1355,13 +1366,155 @@ static int ublk_ch_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
return -EIOCBQUEUED;
}
+static void ublk_pipe_buf_release(struct pipe_inode_info *pipe,
+ struct pipe_buffer *buf)
+{
+}
+
+static const struct pipe_buf_operations ublk_pipe_buf_ops = {
+ .release = ublk_pipe_buf_release,
+};
+
+static inline bool ublk_check_splice_rw(const struct request *req,
+ unsigned int flags)
+{
+ flags &= (SPLICE_F_KERN_FOR_READ | SPLICE_F_KERN_FOR_WRITE);
+
+ if (req_op(req) == REQ_OP_READ && flags == SPLICE_F_KERN_FOR_READ)
+ return true;
+
+ if (req_op(req) == REQ_OP_WRITE && flags == SPLICE_F_KERN_FOR_WRITE)
+ return true;
+
+ return false;
+}
+
+static ssize_t ublk_splice_read(struct file *in, loff_t *ppos,
+ struct pipe_inode_info *pipe,
+ size_t len, unsigned int flags)
+{
+ struct ublk_device *ub = in->private_data;
+ struct req_iterator rq_iter;
+ struct bio_vec bv;
+ struct request *req;
+ struct ublk_queue *ubq;
+ u16 tag, q_id;
+ unsigned int done;
+ int ret, buf_offset, srcu_idx;
+
+ if (!ub)
+ return -EPERM;
+
+ /* only support direct pipe and we do need confirm */
+ if (pipe != current->splice_pipe || !(flags &
+ SPLICE_F_KERN_NEED_CONFIRM))
+ return -EACCES;
+
+ ret = -EINVAL;
+
+ /* protect request queue & disk removed */
+ srcu_idx = srcu_read_lock(&ub->srcu);
+
+ if (ub->dev_info.state == UBLK_S_DEV_DEAD)
+ goto exit;
+
+ tag = ublk_pos_to_tag(*ppos);
+ q_id = ublk_pos_to_hwq(*ppos);
+ buf_offset = ublk_pos_to_buf_offset(*ppos);
+
+ if (q_id >= ub->dev_info.nr_hw_queues)
+ goto exit;
+
+ ubq = ublk_get_queue(ub, q_id);
+ if (!ubq)
+ goto exit;
+
+ if (!(ubq->flags & UBLK_F_SUPPORT_ZERO_COPY))
+ goto exit;
+
+ /*
+ * So far just support splice read buffer from ubq daemon context
+ * because request may be gone in ->splice_read() if the splice
+ * is called from other context.
+ *
+ * TODO: add request protection and relax the following limit.
+ */
+ if (ubq->ubq_daemon != current)
+ goto exit;
+
+ if (tag >= ubq->q_depth)
+ goto exit;
+
+ req = blk_mq_tag_to_rq(ub->tag_set.tags[q_id], tag);
+ if (!req || !blk_mq_request_started(req))
+ goto exit;
+
+ pr_devel("%s: qid %d tag %u offset %x, request bytes %u, len %llu flags %x\n",
+ __func__, tag, q_id, buf_offset, blk_rq_bytes(req),
+ (unsigned long long)len, flags);
+
+ if (!ublk_check_splice_rw(req, flags))
+ goto exit;
+
+ if (!ublk_rq_has_data(req) || !len)
+ goto exit;
+
+ if (buf_offset + len > blk_rq_bytes(req))
+ goto exit;
+
+ done = ret = 0;
+ rq_for_each_bvec(bv, req, rq_iter) {
+ struct pipe_buffer buf = {
+ .ops = &ublk_pipe_buf_ops,
+ .flags = 0,
+ .page = bv.bv_page,
+ .offset = bv.bv_offset,
+ .len = bv.bv_len,
+ };
+
+ if (buf_offset > 0) {
+ if (buf_offset >= bv.bv_len) {
+ buf_offset -= bv.bv_len;
+ continue;
+ } else {
+ buf.offset += buf_offset;
+ buf.len -= buf_offset;
+ buf_offset = 0;
+ }
+ }
+
+ if (done + buf.len > len)
+ buf.len = len - done;
+ done += buf.len;
+
+ ret = add_to_pipe(pipe, &buf);
+ if (unlikely(ret < 0)) {
+ done -= buf.len;
+ break;
+ }
+ if (done >= len)
+ break;
+ }
+
+ if (done) {
+ *ppos += done;
+ ret = done;
+
+ pipe_ack_page_consume(pipe);
+ }
+exit:
+ srcu_read_unlock(&ub->srcu, srcu_idx);
+ return ret;
+}
+
static const struct file_operations ublk_ch_fops = {
.owner = THIS_MODULE,
.open = ublk_ch_open,
.release = ublk_ch_release,
- .llseek = no_llseek,
+ .llseek = noop_llseek,
.uring_cmd = ublk_ch_uring_cmd,
.mmap = ublk_ch_mmap,
+ .splice_read = ublk_splice_read,
};
static void ublk_deinit_queue(struct ublk_device *ub, int q_id)
@@ -1472,6 +1625,7 @@ static void ublk_cdev_rel(struct device *dev)
ublk_deinit_queues(ub);
ublk_free_dev_number(ub);
mutex_destroy(&ub->mutex);
+ cleanup_srcu_struct(&ub->srcu);
kfree(ub);
}
@@ -1600,17 +1754,18 @@ static int ublk_ctrl_start_dev(struct ublk_device *ub, struct io_uring_cmd *cmd)
set_bit(GD_SUPPRESS_PART_SCAN, &disk->state);
get_device(&ub->cdev_dev);
+ ub->dev_info.state = UBLK_S_DEV_LIVE;
ret = add_disk(disk);
if (ret) {
/*
* Has to drop the reference since ->free_disk won't be
* called in case of add_disk failure.
*/
+ ub->dev_info.state = UBLK_S_DEV_DEAD;
ublk_put_device(ub);
goto out_put_disk;
}
set_bit(UB_STATE_USED, &ub->state);
- ub->dev_info.state = UBLK_S_DEV_LIVE;
out_put_disk:
if (ret)
put_disk(disk);
@@ -1718,6 +1873,9 @@ static int ublk_ctrl_add_dev(struct io_uring_cmd *cmd)
ub = kzalloc(sizeof(*ub), GFP_KERNEL);
if (!ub)
goto out_unlock;
+ ret = init_srcu_struct(&ub->srcu);
+ if (ret)
+ goto out_free_ub;
mutex_init(&ub->mutex);
spin_lock_init(&ub->mm_lock);
INIT_WORK(&ub->quiesce_work, ublk_quiesce_work_fn);
@@ -1726,7 +1884,7 @@ static int ublk_ctrl_add_dev(struct io_uring_cmd *cmd)
ret = ublk_alloc_dev_number(ub, header->dev_id);
if (ret < 0)
- goto out_free_ub;
+ goto out_clean_srcu;
memcpy(&ub->dev_info, &info, sizeof(info));
@@ -1744,9 +1902,6 @@ static int ublk_ctrl_add_dev(struct io_uring_cmd *cmd)
if (!IS_BUILTIN(CONFIG_BLK_DEV_UBLK))
ub->dev_info.flags |= UBLK_F_URING_CMD_COMP_IN_TASK;
- /* We are not ready to support zero copy */
- ub->dev_info.flags &= ~UBLK_F_SUPPORT_ZERO_COPY;
-
ub->dev_info.nr_hw_queues = min_t(unsigned int,
ub->dev_info.nr_hw_queues, nr_cpu_ids);
ublk_align_max_io_size(ub);
@@ -1776,6 +1931,8 @@ static int ublk_ctrl_add_dev(struct io_uring_cmd *cmd)
ublk_deinit_queues(ub);
out_free_dev_number:
ublk_free_dev_number(ub);
+out_clean_srcu:
+ cleanup_srcu_struct(&ub->srcu);
out_free_ub:
mutex_destroy(&ub->mutex);
kfree(ub);
@@ -54,7 +54,36 @@
#define UBLKSRV_IO_BUF_OFFSET 0x80000000
/* tag bit is 12bit, so at most 4096 IOs for each queue */
-#define UBLK_MAX_QUEUE_DEPTH 4096
+#define UBLK_TAG_BITS 12
+#define UBLK_MAX_QUEUE_DEPTH (1U << UBLK_TAG_BITS)
+
+/* used in ->splice_read for supporting zero-copy */
+#define UBLK_BUFS_SIZE_BITS 42
+#define UBLK_BUFS_SIZE_MASK ((1ULL << UBLK_BUFS_SIZE_BITS) - 1)
+#define UBLK_BUF_SIZE_BITS (UBLK_BUFS_SIZE_BITS - UBLK_TAG_BITS)
+#define UBLK_BUF_MAX_SIZE (1ULL << UBLK_BUF_SIZE_BITS)
+
+static inline __u16 ublk_pos_to_hwq(__u64 pos)
+{
+ return pos >> UBLK_BUFS_SIZE_BITS;
+}
+
+static inline __u32 ublk_pos_to_buf_offset(__u64 pos)
+{
+ return (pos & UBLK_BUFS_SIZE_MASK) & (UBLK_BUF_MAX_SIZE - 1);
+}
+
+static inline __u16 ublk_pos_to_tag(__u64 pos)
+{
+ return (pos & UBLK_BUFS_SIZE_MASK) >> UBLK_BUF_SIZE_BITS;
+}
+
+/* offset of single buffer, which has to be < UBLK_BUX_MAX_SIZE */
+static inline __u64 ublk_pos(__u16 q_id, __u16 tag, __u32 offset)
+{
+ return (((__u64)q_id) << UBLK_BUFS_SIZE_BITS) |
+ ((((__u64)tag) << UBLK_BUF_SIZE_BITS) + offset);
+}
/*
* zero copy requires 4k block size, and can remap ublk driver's io
The initial idea of using splice for zero copy is from Miklos and Stefan. Now io_uring supports IORING_OP_READ[WRITE]_SPLICE_BUF, and ublk can pass request pages via direct/kernel private pipe to backend io_uring read/write handling code. This zero copy implementation improves sequential IO performance obviously. ublksrv code: https://github.com/ming1/ubdsrv/commits/io_uring_splice_buf So far, only loop/null target supports zero copy: ublk add -t loop -f $file -z ublk add -t none -z Signed-off-by: Ming Lei <ming.lei@redhat.com> --- drivers/block/ublk_drv.c | 169 ++++++++++++++++++++++++++++++++-- include/uapi/linux/ublk_cmd.h | 31 ++++++- 2 files changed, 193 insertions(+), 7 deletions(-)