diff mbox series

[RFC,11/17] zuf: Write/Read implementation

Message ID 20190219115136.29952-12-boaz@plexistor.com (mailing list archive)
State New, archived
Headers show
Series zuf: ZUFS Zero-copy User-mode FileSystem | expand

Commit Message

Boaz Harrosh Feb. 19, 2019, 11:51 a.m. UTC
From: Boaz Harrosh <boazh@netapp.com>

The dispatch to the server can operate on buffers up to
4 Mega bytes. Any bigger operations are split up and dispatched
at this size.

Also if a multy-segments aio is used each segment is dispatched
on its own. (TODO this can be easily fixed with sg operations)

On write if any mmaped buffers changed, for example new
allocated holes do to this write or a previous mmaped COW
was written. A range subset of the written range can be returned
for the Kernel to call mapping_unmap on.

rw.c here also includes some operations for mmap. Will be used
in next patch.

Signed-off-by: Boaz Harrosh <boazh@netapp.com>
---
 fs/zuf/_extern.h  |  13 +-
 fs/zuf/_pr.h      |   1 +
 fs/zuf/rw.c       | 562 +++++++++++++++++++++++++++++++++++++++++++++-
 fs/zuf/zuf-core.c |  79 ++++++-
 fs/zuf/zus_api.h  | 191 ++++++++++++++++
 5 files changed, 841 insertions(+), 5 deletions(-)
diff mbox series

Patch

diff --git a/fs/zuf/_extern.h b/fs/zuf/_extern.h
index 391484b0e125..2905fe20cec7 100644
--- a/fs/zuf/_extern.h
+++ b/fs/zuf/_extern.h
@@ -46,13 +46,21 @@  bool zuf_dir_emit(struct super_block *sb, struct dir_context *ctx,
 uint zuf_prepare_symname(struct zufs_ioc_new_inode *ioc_new_inode,
 			const char *symname, ulong len, struct page *pages[2]);
 
-
 /* rw.c */
+int _zuf_get_put_block(struct zuf_sb_info *sbi, struct zuf_inode_info *zii,
+			  enum e_zufs_operation op, int rw, ulong index,
+			  struct zufs_ioc_IO *get_block);
+int zuf_rw_read_page(struct zuf_sb_info *sbi, struct inode *inode,
+		     struct page *page, u64 filepos);
 ssize_t zuf_rw_read_iter(struct super_block *sb, struct inode *inode,
 			 struct kiocb *kiocb, struct iov_iter *ii);
 ssize_t zuf_rw_write_iter(struct super_block *sb, struct inode *inode,
 			  struct kiocb *kiocb, struct iov_iter *ii);
 int zuf_trim_edge(struct inode *inode, ulong filepos, uint len);
+int zuf_iom_execute_sync(struct super_block *sb, struct inode *inode,
+			 __u64 *iom_e, uint iom_n);
+int zuf_iom_execute_async(struct super_block *sb, struct zus_iomap_build *iomb,
+			 __u64 *iom_e_user, uint iom_n);
 
 /* super.c */
 int zuf_init_inodecache(void);
@@ -61,6 +69,9 @@  void zuf_destroy_inodecache(void);
 struct dentry *zuf_mount(struct file_system_type *fs_type, int flags,
 			 const char *dev_name, void *data);
 
+struct super_block *zuf_sb_from_id(struct zuf_root_info *zri, __u64 sb_id,
+				   struct zus_sb_info *zus_sbi);
+
 /* zuf-core.c */
 int zufc_zts_init(struct zuf_root_info *zri); /* Some private types in core */
 void zufc_zts_fini(struct zuf_root_info *zri);
diff --git a/fs/zuf/_pr.h b/fs/zuf/_pr.h
index 85641b6f1478..151e127f513b 100644
--- a/fs/zuf/_pr.h
+++ b/fs/zuf/_pr.h
@@ -40,6 +40,7 @@ 
 /* ~~~ channel prints ~~~ */
 #define zuf_dbg_err(s, args ...)	zuf_chan_debug("error", s, ##args)
 #define zuf_dbg_vfs(s, args ...)	zuf_chan_debug("vfs  ", s, ##args)
+#define zuf_dbg_rw(s, args ...)		zuf_chan_debug("rw   ", s, ##args)
 #define zuf_dbg_t1(s, args ...)		zuf_chan_debug("t1   ", s, ##args)
 #define zuf_dbg_t2(s, args ...)		zuf_chan_debug("t2dbg", s, ##args)
 #define zuf_dbg_t2_rw(s, args ...)	zuf_chan_debug("t2grw", s, ##args)
diff --git a/fs/zuf/rw.c b/fs/zuf/rw.c
index 335bfd256499..400d24ea7914 100644
--- a/fs/zuf/rw.c
+++ b/fs/zuf/rw.c
@@ -18,20 +18,576 @@ 
 #include "zuf.h"
 #include "t2.h"
 
+#define	rand_tag(kiocb)	(kiocb->ki_filp->f_mode & FMODE_RANDOM)
+#define	kiocb_ra(kiocb)	(&kiocb->ki_filp->f_ra)
+
+static int _ioc_bounds_check(struct zufs_iomap *ziom,
+			     struct zufs_iomap *user_ziom, void *ziom_end)
+{
+	size_t iom_max_bytes = ziom_end - (void *)&user_ziom->iom_e;
+
+	if (unlikely((iom_max_bytes / sizeof(__u64) < ziom->iom_max))) {
+		zuf_err("kernel-buff-size(0x%zx) < ziom->iom_max(0x%x)\n",
+			(iom_max_bytes / sizeof(__u64)), ziom->iom_max);
+		return -EINVAL;
+	}
+
+	if (unlikely(ziom->iom_max < ziom->iom_n)) {
+		zuf_err("ziom->iom_max(0x%x) < ziom->iom_n(0x%x)\n",
+			ziom->iom_max, ziom->iom_n);
+		return -EINVAL;
+	}
+
+	return 0;
+}
+
+static int rw_overflow_handler(struct zuf_dispatch_op *zdo, void *arg,
+			       ulong max_bytes)
+{
+	struct zufs_ioc_IO *io = container_of(zdo->hdr, typeof(*io), hdr);
+	struct zufs_ioc_IO *io_user = arg;
+	ulong flags = io->flags;
+	int err;
+
+	*io = *io_user;
+
+	/* FIXME: Why do we need to restore the original flags? */
+	io_user->flags = flags;
+
+	err = _ioc_bounds_check(&io->ziom, &io_user->ziom, arg + max_bytes);
+	if (unlikely(err))
+		return err;
+
+	if ((io->hdr.err == -EZUFS_RETRY) &&
+	    io->ziom.iom_n && _zufs_iom_pop(io->iom_e)) {
+
+		zuf_dbg_rw(
+			"[%s]zuf_iom_execute_sync(%d) max=0x%lx iom_e[%d] => %d\n",
+			zuf_op_name(io->hdr.operation), io->ziom.iom_n,
+			max_bytes, _zufs_iom_opt_type(io_user->iom_e),
+			io->hdr.err);
+
+		io->hdr.err = zuf_iom_execute_sync(zdo->sb, zdo->inode,
+						   io_user->iom_e,
+						   io->ziom.iom_n);
+		err = EZUF_RETRY_DONE;
+	} else {
+		if (io->hdr.err == -EZUFS_RETRY)
+			zuf_err("ZUSfs violating API\n");
+		err = 0;
+	}
+
+	return err;
+}
+
+static int _IO_dispatch(struct zuf_sb_info *sbi, struct zufs_ioc_IO *IO,
+			struct zuf_inode_info *zii, int operation,
+			uint pgoffset, struct page **pages, uint nump,
+			u64 filepos, uint len)
+{
+	struct zuf_dispatch_op zdo;
+	int err;
+
+	IO->hdr.operation = operation;
+	IO->hdr.in_len = sizeof(*IO);
+	IO->hdr.out_len = sizeof(*IO);
+	IO->hdr.offset = pgoffset;
+	IO->hdr.len = len;
+	IO->zus_ii = zii->zus_ii;
+	IO->filepos = filepos;
+	IO->last_pos = filepos;
+
+	zuf_dispatch_init(&zdo, &IO->hdr, pages, nump);
+	zdo.oh = rw_overflow_handler;
+	zdo.sb = sbi->sb;
+	zdo.inode = &zii->vfs_inode;
+
+	zuf_dbg_verbose("[%ld][%s] fp=0x%llx nump=0x%x len=0x%x\n",
+			zdo.inode ? zdo.inode->i_ino : -1,
+			zuf_op_name(operation), filepos, nump, len);
+
+	err = __zufc_dispatch(ZUF_ROOT(sbi), &zdo);
+	if (unlikely(err == -EZUFS_RETRY)) {
+		zuf_err("Unexpected ZUS return => %d\n", err);
+		err = -EIO;
+	}
+	return err;
+}
+
+int _zuf_get_put_block(struct zuf_sb_info *sbi, struct zuf_inode_info *zii,
+			  enum e_zufs_operation op, int rw, ulong index,
+			  struct zufs_ioc_IO *get_block)
+{
+	if (op == ZUFS_OP_GET_BLOCK)
+		get_block->gp_block.rw = rw;
+	/* for put keep untouched, return as was set by server */
+
+	return _IO_dispatch(sbi, get_block, zii, op, 0, NULL, 0,
+			    md_p2o(index), 0);
+}
+
+int zuf_rw_read_page(struct zuf_sb_info *sbi, struct inode *inode,
+		     struct page *page, u64 filepos)
+{
+	struct zufs_ioc_IO io = {};
+	struct page *pages[1];
+	uint nump;
+	int err;
+
+	pages[0] = page;
+	nump = 1;
+
+	err = _IO_dispatch(sbi, &io, ZUII(inode), ZUFS_OP_READ, 0, pages, nump,
+			   filepos, PAGE_SIZE);
+	return err;
+}
+
 /* ZERO a part of a single block. len does not cross a block boundary */
 int zuf_trim_edge(struct inode *inode, ulong filepos, uint len)
 {
-	return -EIO;
+	struct page *zero_page = ZERO_PAGE(0);
+	struct zufs_ioc_IO io = {};
+	struct page *pages[1];
+	uint nump;
+	int err;
+
+	pages[0] = zero_page;
+	nump = 1;
+
+	err = _IO_dispatch(SBI(inode->i_sb), &io, ZUII(inode), ZUFS_OP_WRITE,
+			   0, pages, nump, filepos, len);
+	return err;
+
+}
+
+static struct page *_addr_to_page(unsigned long addr)
+{
+	const void *p = (const void *)addr;
+
+	return is_vmalloc_addr(p) ? vmalloc_to_page(p) : virt_to_page(p);
+}
+
+static ssize_t _iov_iter_get_pages_kvec(struct iov_iter *ii,
+		   struct page **pages, size_t maxsize, uint maxpages,
+		   size_t *start)
+{
+	ssize_t bytes;
+	size_t i, nump;
+	unsigned long addr = (unsigned long)ii->kvec->iov_base;
+
+	*start = addr & (PAGE_SIZE - 1);
+	bytes = min_t(ssize_t, iov_iter_single_seg_count(ii), maxsize);
+	nump = min_t(size_t, DIV_ROUND_UP(bytes + *start, PAGE_SIZE), maxpages);
+
+	/* TODO: FUSE assumes single page for ITER_KVEC. Boaz: Remove? */
+	WARN_ON(nump > 1);
+
+	for (i = 0; i < nump; ++i) {
+		pages[i] = _addr_to_page(addr + (i * PAGE_SIZE));
+
+		get_page(pages[i]);
+	}
+	return bytes;
+}
+
+static ssize_t _iov_iter_get_pages_any(struct iov_iter *ii,
+		   struct page **pages, size_t maxsize, uint maxpages,
+		   size_t *start)
+{
+	ssize_t bytes;
+
+	bytes = unlikely(ii->type & ITER_KVEC) ?
+		_iov_iter_get_pages_kvec(ii, pages, maxsize, maxpages, start) :
+		iov_iter_get_pages(ii, pages, maxsize, maxpages, start);
+
+	if (unlikely(bytes < 0))
+		zuf_dbg_err("[%d] bytes=%ld type=%d count=%lu",
+			smp_processor_id(), bytes, ii->type, ii->count);
+
+	return bytes;
+}
+
+static ssize_t _zufs_IO(struct zuf_sb_info *sbi, struct inode *inode,
+			struct file_ra_state *ra, int operation,
+			struct iov_iter *ii, loff_t pos, ulong flags)
+{
+	int err = 0;
+	loff_t start_pos = pos;
+
+	while (iov_iter_count(ii)) {
+		struct zufs_ioc_IO io = {};
+		struct page *pages[ZUS_API_MAP_MAX_PAGES];
+		uint nump;
+		ssize_t bytes;
+		size_t pgoffset;
+		uint i;
+
+		if (ra) {
+			io.ra.start		= ra->start;
+			io.ra.ra_pages	= ra->ra_pages;
+			io.ra.prev_pos	= ra->prev_pos;
+		}
+		io.flags = flags;
+
+		bytes = _iov_iter_get_pages_any(ii, pages,
+					ZUS_API_MAP_MAX_SIZE,
+					ZUS_API_MAP_MAX_PAGES, &pgoffset);
+		if (unlikely(bytes < 0)) {
+			err = bytes;
+			break;
+		}
+
+		nump = DIV_ROUND_UP(bytes + pgoffset, PAGE_SIZE);
+
+		err = _IO_dispatch(sbi, &io, ZUII(inode), operation,
+				   pgoffset, pages, nump, pos, bytes);
+
+		bytes = io.last_pos - pos;
+
+		iov_iter_advance(ii, bytes);
+		pos += bytes;
+
+		if (ra) {
+			ra->start	= io.ra.start;
+			ra->ra_pages	= io.ra.ra_pages;
+			ra->prev_pos	= io.ra.prev_pos;
+		}
+		if (io.wr_unmap.len)
+			unmap_mapping_range(inode->i_mapping,
+					    io.wr_unmap.offset,
+					    io.wr_unmap.len, 0);
+
+		for (i = 0; i < nump; ++i)
+			put_page(pages[i]);
+
+		if (unlikely(err))
+			break;
+	}
+
+	if (unlikely(pos == start_pos))
+		return err;
+	return pos - start_pos;
 }
 
 ssize_t zuf_rw_read_iter(struct super_block *sb, struct inode *inode,
 			 struct kiocb *kiocb, struct iov_iter *ii)
 {
-	return -EIO;
+	ssize_t ret;
+
+	/* EOF protection */
+	if (unlikely(kiocb->ki_pos > i_size_read(inode)))
+		return 0;
+
+	iov_iter_truncate(ii, i_size_read(inode) - kiocb->ki_pos);
+	if (unlikely(!iov_iter_count(ii))) {
+		/* Don't let zero len reads have any effect */
+		zuf_dbg_rw("called with NULL len\n");
+		return 0;
+	}
+
+	ret = _zufs_IO(SBI(sb), inode, kiocb_ra(kiocb), ZUFS_OP_READ, ii,
+		       kiocb->ki_pos, rand_tag(kiocb) ? ZUFS_IO_RAND : 0);
+	if (unlikely(ret < 0))
+		return ret;
+
+	kiocb->ki_pos += ret;
+	return ret;
 }
 
 ssize_t zuf_rw_write_iter(struct super_block *sb, struct inode *inode,
 			  struct kiocb *kiocb, struct iov_iter *ii)
 {
-	return -EIO;
+	ssize_t ret;
+	ulong flags = 0;
+
+	if (kiocb->ki_filp->f_flags & O_DSYNC ||
+	    IS_SYNC(kiocb->ki_filp->f_mapping->host))
+		flags = ZUFS_IO_DSYNC;
+	if (kiocb->ki_filp->f_flags & O_DIRECT)
+		flags |= ZUFS_IO_DIRECT;
+
+	ret = _zufs_IO(SBI(inode->i_sb), inode, NULL, ZUFS_OP_WRITE, ii,
+		       kiocb->ki_pos, flags);
+	if (unlikely(ret < 0))
+		return ret;
+
+	kiocb->ki_pos += ret;
+	return ret;
+}
+
+/* ~~~~ iom_dec.c ~~~ */
+/* for now here (at rw.c) looks logical */
+
+static int __iom_add_t2_io_len(struct super_block *sb, struct t2_io_state *tis,
+			       zu_dpp_t t1, ulong t2_bn, __u64 num_pages)
+{
+	void *ptr;
+	struct page *page;
+	int i, err;
+
+	ptr = zuf_dpp_t_addr(sb, t1);
+	if (unlikely(!ptr)) {
+		zuf_err("Bad t1 zu_dpp_t t1=0x%llx t2=0x%lx num_pages=0x%llx\n",
+			t1, t2_bn, num_pages);
+		return -EFAULT; /* zuf_dpp_t_addr already yeld */
+	}
+
+	page = virt_to_page(ptr);
+	if (unlikely(!page)) {
+		zuf_err("bad t1(0x%llx)\n", t1);
+		return -EFAULT;
+	}
+
+	for (i = 0; i < num_pages; ++i) {
+		err = t2_io_add(tis, t2_bn++, page++);
+		if (unlikely(err))
+			return err;
+	}
+	return 0;
+}
+
+static int iom_add_t2_io_len(struct super_block *sb, struct t2_io_state *tis,
+			     __u64 **cur_e)
+{
+	struct zufs_iom_t2_io_len *t2iol = (void *)*cur_e;
+	int err = __iom_add_t2_io_len(sb, tis, t2iol->iom.t1_val,
+				      _zufs_iom_first_val(&t2iol->iom.t2_val),
+				      t2iol->num_pages);
+
+	*cur_e = (void *)(t2iol + 1);
+	return err;
+}
+
+static int iom_add_t2_io(struct super_block *sb, struct t2_io_state *tis,
+			 __u64 **cur_e)
+{
+	struct zufs_iom_t2_io *t2io = (void *)*cur_e;
+
+	int err = __iom_add_t2_io_len(sb, tis, t2io->t1_val,
+				      _zufs_iom_first_val(&t2io->t2_val), 1);
+
+	*cur_e = (void *)(t2io + 1);
+	return err;
+}
+
+static int iom_t2_zusmem_io(struct super_block *sb, struct t2_io_state *tis,
+			    __u64 **cur_e)
+{
+	struct zufs_iom_t2_zusmem_io *mem_io = (void *)*cur_e;
+	ulong t2_bn = _zufs_iom_first_val(&mem_io->t2_val);
+	ulong user_ptr = (ulong)mem_io->zus_mem_ptr;
+	int rw = _zufs_iom_opt_type(*cur_e) == IOM_T2_ZUSMEM_WRITE ?
+						WRITE : READ;
+	int num_p = md_o2p_up(mem_io->len);
+	int num_p_r;
+	struct page *pages[16];
+	int i, err = 0;
+
+	if (16 < num_p) {
+		zuf_err("num_p(%d) > 16\n", num_p);
+		return -EINVAL;
+	}
+
+	num_p_r = get_user_pages_fast(user_ptr, num_p, rw,
+				      pages);
+	if (num_p_r != num_p) {
+		zuf_err("!!!! get_user_pages_fast num_p_r(%d) != num_p(%d)\n",
+			num_p_r, num_p);
+		err = -EFAULT;
+		goto out;
+	}
+
+	for (i = 0; i < num_p_r && !err; ++i)
+		err = t2_io_add(tis, t2_bn++, pages[i]);
+
+out:
+	for (i = 0; i < num_p_r; ++i)
+		put_page(pages[i]);
+
+	*cur_e = (void *)(mem_io + 1);
+	return err;
+}
+
+static int iom_unmap(struct super_block *sb, struct inode *inode, __u64 **cur_e)
+{
+	struct zufs_iom_unmap *iom_unmap = (void *)*cur_e;
+	struct inode *inode_look = NULL;
+	ulong	unmap_index = _zufs_iom_first_val(&iom_unmap->unmap_index);
+	ulong	unmap_n = iom_unmap->unmap_n;
+	ulong	ino = iom_unmap->ino;
+
+	if (!inode || ino) {
+		if (WARN_ON(!ino)) {
+			zuf_err("[%ld] 0x%lx-0x%lx\n",
+				inode ? inode->i_ino : -1, unmap_index,
+				unmap_n);
+			goto out;
+		}
+		inode_look = ilookup(sb, ino);
+		if (!inode_look) {
+			/* From the time we requested an unmap to now
+			 * inode was evicted from cache so surely it no longer
+			 * have any mappings. Cool job was already done for us.
+			 * Even if a racing thread reloads the inode it will
+			 * not have this mapping we wanted to clear, but only
+			 * new ones.
+			 * TODO: For now warn when this happen, because in
+			 *    current usage it cannot happen. But before
+			 *    upstream we should convert to zuf_dbg_err
+			 */
+			zuf_warn("[%ld] 0x%lx-0x%lx\n",
+				 ino, unmap_index, unmap_n);
+			goto out;
+		}
+
+		inode = inode_look;
+	}
+
+	zuf_dbg_rw("[%ld] 0x%lx-0x%lx\n", inode->i_ino, unmap_index, unmap_n);
+
+	unmap_mapping_range(inode->i_mapping, md_p2o(unmap_index),
+			    md_p2o(unmap_n), 0);
+
+	if (inode_look)
+		iput(inode_look);
+
+out:
+	*cur_e = (void *)(iom_unmap + 1);
+	return 0;
+}
+
+struct _iom_exec_info {
+	struct super_block *sb;
+	struct inode *inode;
+	struct t2_io_state *rd_tis;
+	struct t2_io_state *wr_tis;
+	__u64 *iom_e;
+	uint iom_n;
+	bool print;
+};
+
+static int _iom_execute_inline(struct _iom_exec_info *iei)
+{
+	__u64 *cur_e, *end_e;
+	int err = 0;
+#ifdef CONFIG_ZUF_DEBUG
+	uint wrs = 0;
+	uint rds = 0;
+	uint uns = 0;
+	uint wrmem = 0;
+	uint rdmem = 0;
+#	define	WRS()	(++wrs)
+#	define	RDS()	(++rds)
+#	define	UNS()	(++uns)
+#	define	WRMEM()	(++wrmem)
+#	define	RDMEM()	(++rdmem)
+#else
+#	define	WRS()
+#	define	RDS()
+#	define	UNS()
+#	define	WRMEM()
+#	define	RDMEM()
+#endif /* !def CONFIG_ZUF_DEBUG */
+
+	cur_e =  iei->iom_e;
+	end_e = cur_e + iei->iom_n;
+	while (cur_e && (cur_e < end_e)) {
+		uint op;
+
+		op = _zufs_iom_opt_type(cur_e);
+
+		switch (op) {
+		case IOM_NONE:
+			return 0;
+
+		case IOM_T2_WRITE:
+			err = iom_add_t2_io(iei->sb, iei->wr_tis, &cur_e);
+			WRS();
+			break;
+		case IOM_T2_READ:
+			err = iom_add_t2_io(iei->sb, iei->rd_tis, &cur_e);
+			RDS();
+			break;
+
+		case IOM_T2_WRITE_LEN:
+			err = iom_add_t2_io_len(iei->sb, iei->wr_tis, &cur_e);
+			WRS();
+			break;
+		case IOM_T2_READ_LEN:
+			err = iom_add_t2_io_len(iei->sb, iei->rd_tis, &cur_e);
+			RDS();
+			break;
+
+		case IOM_T2_ZUSMEM_WRITE:
+			err = iom_t2_zusmem_io(iei->sb, iei->wr_tis, &cur_e);
+			WRMEM();
+			break;
+		case IOM_T2_ZUSMEM_READ:
+			err = iom_t2_zusmem_io(iei->sb, iei->rd_tis, &cur_e);
+			RDMEM();
+			break;
+
+		case IOM_UNMAP:
+			err = iom_unmap(iei->sb, iei->inode, &cur_e);
+			UNS();
+			break;
+
+		default:
+			zuf_err("!!!!! Bad opt %d\n",
+				_zufs_iom_opt_type(cur_e));
+			err = -EIO;
+			break;
+		}
+
+		if (unlikely(err))
+			break;
+	}
+
+#ifdef CONFIG_ZUF_DEBUG
+	zuf_dbg_rw("exec wrs=%d rds=%d uns=%d rdmem=%d wrmem=%d => %d\n",
+		   wrs, rds, uns, rdmem, wrmem, err);
+#endif
+
+	return err;
+}
+
+/* inode here is the default inode if ioc_unmap->ino is zero
+ * this is an optimization for the unmap done at write_iter hot path.
+ */
+int zuf_iom_execute_sync(struct super_block *sb, struct inode *inode,
+			 __u64 *iom_e_user, uint iom_n)
+{
+	struct zuf_sb_info *sbi = SBI(sb);
+	struct t2_io_state rd_tis = {};
+	struct t2_io_state wr_tis = {};
+	struct _iom_exec_info iei = {};
+	int err, err_r, err_w;
+
+	t2_io_begin(sbi->md, READ, NULL, 0, -1, &rd_tis);
+	t2_io_begin(sbi->md, WRITE, NULL, 0, -1, &wr_tis);
+
+	iei.sb = sb;
+	iei.inode = inode;
+	iei.rd_tis = &rd_tis;
+	iei.wr_tis = &wr_tis;
+	iei.iom_e = iom_e_user;
+	iei.iom_n = iom_n;
+	iei.print = 0;
+
+	err = _iom_execute_inline(&iei);
+
+	err_r = t2_io_end(&rd_tis, true);
+	err_w = t2_io_end(&wr_tis, true);
+
+	/* TODO: not sure if OK when _iom_execute return with -ENOMEM
+	 * In such a case, we might be better of skiping t2_io_ends.
+	 */
+	return err ?: (err_r ?: err_w);
+}
+
+int zuf_iom_execute_async(struct super_block *sb, struct zus_iomap_build *iomb,
+			 __u64 *iom_e_user, uint iom_n)
+{
+	zuf_err("Async IOM NOT supported Yet!!!\n");
+	return -EFAULT;
 }
diff --git a/fs/zuf/zuf-core.c b/fs/zuf/zuf-core.c
index 96ffc6244daa..371c2e93dd81 100644
--- a/fs/zuf/zuf-core.c
+++ b/fs/zuf/zuf-core.c
@@ -657,7 +657,8 @@  static int _zu_wait(struct file *file, void *parg)
 		 * we should have a bit set in zt->zdo->hdr set per operation.
 		 * TODO: Why this does not work?
 		 */
-		_map_pages(zt, zt->zdo->pages, zt->zdo->nump, 0);
+		_map_pages(zt, zt->zdo->pages, zt->zdo->nump,
+			   zt->zdo->hdr->operation == ZUFS_OP_WRITE);
 		memcpy(zt->opt_buff, zt->zdo->hdr, zt->zdo->hdr->in_len);
 	} else {
 		struct zufs_ioc_hdr *hdr = zt->opt_buff;
@@ -776,6 +777,10 @@  const char *zuf_op_name(enum e_zufs_operation op)
 		CASE_ENUM_NAME(ZUFS_OP_READDIR		);
 		CASE_ENUM_NAME(ZUFS_OP_CLONE		);
 		CASE_ENUM_NAME(ZUFS_OP_COPY		);
+		CASE_ENUM_NAME(ZUFS_OP_READ		);
+		CASE_ENUM_NAME(ZUFS_OP_WRITE		);
+		CASE_ENUM_NAME(ZUFS_OP_GET_BLOCK	);
+		CASE_ENUM_NAME(ZUFS_OP_PUT_BLOCK	);
 		CASE_ENUM_NAME(ZUFS_OP_GET_SYMLINK	);
 		CASE_ENUM_NAME(ZUFS_OP_SETATTR		);
 		CASE_ENUM_NAME(ZUFS_OP_FALLOCATE	);
@@ -832,6 +837,30 @@  static inline struct zu_exec_buff *_ebuff_from_file(struct file *file)
 	return ebuff;
 }
 
+static int _ebuff_bounds_check(struct zu_exec_buff *ebuff, ulong buff,
+			       struct zufs_iomap *ziom,
+			       struct zufs_iomap *user_ziom, void *ziom_end)
+{
+	size_t iom_max_bytes = ziom_end - (void *)&user_ziom->iom_e;
+
+	if (buff != ebuff->vma->vm_start ||
+	    ebuff->vma->vm_end < buff + iom_max_bytes) {
+		WARN_ON_ONCE(1);
+		zuf_err("Executing out off bound vm_start=0x%lx vm_end=0x%lx buff=0x%lx buff_end=0x%lx\n",
+			ebuff->vma->vm_start, ebuff->vma->vm_end, buff,
+			buff + iom_max_bytes);
+		return -EINVAL;
+	}
+
+	if (unlikely((iom_max_bytes / sizeof(__u64) < ziom->iom_max)))
+		return -EINVAL;
+
+	if (unlikely(ziom->iom_max < ziom->iom_n))
+		return -EINVAL;
+
+	return 0;
+}
+
 static int _zu_ebuff_alloc(struct file *file, void *arg)
 {
 	struct zufs_ioc_alloc_buffer ioc_alloc;
@@ -884,6 +913,52 @@  static void zufc_ebuff_release(struct file *file)
 	kfree(ebuff);
 }
 
+static int _zu_iomap_exec(struct file *file, void *arg)
+{
+	struct zuf_root_info *zri = ZRI(file->f_inode->i_sb);
+	struct zu_exec_buff *ebuff = _ebuff_from_file(file);
+	struct zufs_ioc_iomap_exec ioc_iomap;
+	struct zufs_ioc_iomap_exec *user_iomap;
+
+	struct super_block *sb;
+	int err;
+
+	if (unlikely(!ebuff))
+		return -EINVAL;
+
+	user_iomap = ebuff->opt_buff;
+	/* do all checks on a kernel copy so malicious Server cannot
+	 * crash the Kernel
+	 */
+	ioc_iomap = *user_iomap;
+
+	err = _ebuff_bounds_check(ebuff, (ulong)arg, &ioc_iomap.ziom,
+				  &user_iomap->ziom,
+				  ebuff->opt_buff + ebuff->alloc_size);
+	if (unlikely(err)) {
+		zuf_err("illegal iomap: iom_max=%u iom_n=%u\n",
+			ioc_iomap.ziom.iom_max, ioc_iomap.ziom.iom_n);
+		return err;
+	}
+
+	/* The ID of the super block received in mount */
+	sb = zuf_sb_from_id(zri, ioc_iomap.sb_id, ioc_iomap.zus_sbi);
+	if (unlikely(!sb))
+		return -EINVAL;
+
+	if (ioc_iomap.wait_for_done)
+		err = zuf_iom_execute_sync(sb, NULL, user_iomap->ziom.iom_e,
+					   ioc_iomap.ziom.iom_n);
+	else
+		err =  zuf_iom_execute_async(sb, ioc_iomap.ziom.iomb,
+					     user_iomap->ziom.iom_e,
+					     ioc_iomap.ziom.iom_n);
+
+	user_iomap->hdr.err = err;
+	zuf_dbg_core("OUT => %d\n", err);
+	return 0; /* report err at hdr, but the command was executed */
+};
+
 static int _zu_break(struct file *filp, void *parg)
 {
 	struct zuf_root_info *zri = ZRI(filp->f_inode->i_sb);
@@ -928,6 +1003,8 @@  long zufc_ioctl(struct file *file, unsigned int cmd, ulong arg)
 		return _zu_wait(file, parg);
 	case ZU_IOC_ALLOC_BUFFER:
 		return _zu_ebuff_alloc(file, parg);
+	case ZU_IOC_IOMAP_EXEC:
+		return _zu_iomap_exec(file, parg);
 	case ZU_IOC_BREAK_ALL:
 		return _zu_break(file, parg);
 	default:
diff --git a/fs/zuf/zus_api.h b/fs/zuf/zus_api.h
index 32e8c2cae518..26b7b56f96c4 100644
--- a/fs/zuf/zus_api.h
+++ b/fs/zuf/zus_api.h
@@ -340,6 +340,10 @@  enum e_zufs_operation {
 	ZUFS_OP_CLONE,
 	ZUFS_OP_COPY,
 
+	ZUFS_OP_READ,
+	ZUFS_OP_WRITE,
+	ZUFS_OP_GET_BLOCK,
+	ZUFS_OP_PUT_BLOCK,
 	ZUFS_OP_GET_SYMLINK,
 	ZUFS_OP_SETATTR,
 	ZUFS_OP_FALLOCATE,
@@ -573,6 +577,116 @@  struct zufs_ioc_seek {
 	__u64 offset_out;
 };
 
+/* ~~~~ io_map structures && IOCTL(s) ~~~~ */
+/*
+ * These set of structures and helpers are used in return of zufs_ioc_IO and
+ * also at ZU_IOC_IOMAP_EXEC, NULL terminating list (array)
+ *
+ * Each iom_elemet stars with an __u64 of which the 8 hight bits carry an
+ * operation_type, And the 56 bits value denotes a page offset, (md_o2p()) or a
+ * length. operation_type is one of ZUFS_IOM_TYPE enum.
+ * The interpreter then jumps to the next operation depending on the size
+ * of the defined operation.
+ */
+
+enum ZUFS_IOM_TYPE {
+	IOM_NONE	= 0,
+	IOM_T1_WRITE	= 1,
+	IOM_T1_READ	= 2,
+
+	IOM_T2_WRITE	= 3,
+	IOM_T2_READ	= 4,
+	IOM_T2_WRITE_LEN = 5,
+	IOM_T2_READ_LEN	= 6,
+
+	IOM_T2_ZUSMEM_WRITE = 7,
+	IOM_T2_ZUSMEM_READ = 8,
+
+	IOM_UNMAP	= 9,
+
+	IOM_NUM_LEGAL_OPT,
+};
+
+#define ZUFS_IOM_VAL_BITS	56
+#define ZUFS_IOM_FIRST_VAL_MASK ((1UL << ZUFS_IOM_VAL_BITS) - 1)
+
+static inline ulong _zufs_iom_first_val(__u64 *iom_elemets)
+{
+	return *iom_elemets & ZUFS_IOM_FIRST_VAL_MASK;
+}
+
+static inline enum ZUFS_IOM_TYPE _zufs_iom_opt_type(__u64 *iom_e)
+{
+	uint ret = (*iom_e) >> ZUFS_IOM_VAL_BITS;
+
+	if (ret >= IOM_NUM_LEGAL_OPT)
+		return IOM_NONE;
+	return ret;
+}
+
+static inline bool _zufs_iom_pop(__u64 *iom_e)
+{
+	return _zufs_iom_opt_type(iom_e) != IOM_NONE;
+}
+
+/* IOM_T2_WRITE / IOM_T2_READ */
+struct zufs_iom_t2_io {
+	__u64	t2_val;
+	zu_dpp_t t1_val;
+};
+
+/* IOM_T2_WRITE_LEN / IOM_T2_READ_LEN */
+struct zufs_iom_t2_io_len {
+	struct zufs_iom_t2_io iom;
+	__u64 num_pages;
+} __packed;
+
+/* IOM_T2_ZUSMEM_WRITE / IOM_T2_ZUSMEM_READ */
+struct zufs_iom_t2_zusmem_io {
+	__u64	t2_val;
+	__u64	zus_mem_ptr; /* needs an get_user_pages() */
+	__u64	len;
+};
+
+/* IOM_UNMAP:
+ *	Executes unmap_mapping_range & remove of zuf's block-caching
+ *
+ * For now iom_unmap means even_cows=0, because Kernel takes care of all
+ * the cases of the even_cows=1. In future if needed it will be on the high
+ * bit of unmap_n.
+ */
+struct zufs_iom_unmap {
+	__u64	unmap_index;	/* Offset in pages of inode */
+	__u64	unmap_n;	/* Num pages to unmap (0 means: to eof) */
+	__u64	ino;		/* Pages of this inode */
+} __packed;
+
+#define ZUFS_WRITE_OP_SPACE						\
+	((sizeof(struct zufs_iom_unmap) +				\
+	  sizeof(struct zufs_iom_t2_io)) / sizeof(__u64) + sizeof(__u64))
+
+struct zus_iomap_build;
+/* For ZUFS_OP_IOM_DONE */
+struct zufs_ioc_iomap_done {
+	struct zufs_ioc_hdr hdr;
+	/* IN */
+	struct zus_sb_info *zus_sbi;
+
+	/* The cookie received from zufs_ioc_iomap_exec */
+	struct	zus_iomap_build *iomb;
+};
+
+struct zufs_iomap {
+	/* A cookie from zus to return when execution is done */
+	struct	zus_iomap_build *iomb;
+
+	__u32	iom_max;	/* num of __u64 allocated	 */
+	__u32	iom_n;		/* num of valid __u64 in iom_e	 */
+	__u64	iom_e[];	/* encoded operations to execute */
+
+	/* This struct must be last */
+};
+
 /* Allocate a special_file that will be a dual-port communication buffer with
  * user mode.
  * Server will access the buffer via the mmap of this file.
@@ -604,4 +718,81 @@  struct zufs_ioc_alloc_buffer {
 };
 #define ZU_IOC_ALLOC_BUFFER	_IOWR('Z', 17, struct zufs_ioc_init)
 
+/*
+ * Execute an iomap in behalf of the Server
+ *
+ * NOTE: this IOCTL must come on an above ZU_IOC_ALLOC_BUFFER type file
+ * and the passed arg-buffer must be the pointer returned from an mmap
+ * call preformed in the file, before the call to this IOC.
+ * If this is not done the IOCTL will return EINVAL.
+ */
+struct zufs_ioc_iomap_exec {
+	struct zufs_ioc_hdr hdr;
+	/* The ID of the super block received in mount */
+	__u64	sb_id;
+	/* We verify the sb_id validity against zus_sbi */
+	struct zus_sb_info *zus_sbi;
+	/* If application buffers they are from this IO*/
+	__u64	zt_iocontext;
+	/* Only return from IOCTL when finished. iomap_done NOT called */
+	__u32	wait_for_done;
+	__u32	__pad;
+
+	struct zufs_iomap ziom; /* must be last */
+};
+#define ZU_IOC_IOMAP_EXEC	_IOWR('Z', 18, struct zufs_ioc_iomap_exec)
+
+/*
+ * ZUFS_OP_READ / ZUFS_OP_WRITE
+ *       also
+ * ZUFS_OP_GET_BLOCK / ZUFS_OP_PUT_BLOCK
+ */
+/* flags for gp_block->ret_flags */
+enum {
+	ZUFS_GBF_RESERVED = 1,	/* Not used */
+	ZUFS_GBF_NEW = 2,	/* In write, allocated a new block need unmap */
+};
+
+/* zufs_ioc_IO extra write flags */
+#define ZUFS_IO_DSYNC	(1 << 0)
+#define ZUFS_IO_DIRECT	(1 << 1)
+#define ZUFS_IO_RAND	(1 << 2)
+
+struct zufs_ioc_IO {
+	struct zufs_ioc_hdr hdr;
+
+	/* IN */
+	struct zus_inode_info *zus_ii;
+	__u64 filepos;
+	__u64 flags;		/* read/write flags IN */
+
+	/* in / OUT */
+	union {
+		/* For reads */
+		struct __zufs_ra {
+			ulong start;
+			__u64 prev_pos;
+			__u32 ra_pages;
+			__u32 ra_pad; /* we need this*/
+		} ra;
+		/* For writes */
+		struct __zufs_write_unmap {
+			__u32  offset;
+			__u32  len;
+		} wr_unmap;
+		struct __zufs_get_put_block {
+			zu_dpp_t pmem_bn; /* zero means: map-read a hole */
+			__u32 rw;	  /* rw flags also return from GB */
+			__u32 ret_flags;  /* One of ZUFS_GBF_XXX */
+			void *priv;	  /**/
+		} gp_block;
+	};
+
+	/* The i_size I saw in this IO. If 0, than error code at .hdr.err */
+	__u64 last_pos;
+
+	struct zufs_iomap ziom;
+	__u64 iom_e[ZUFS_WRITE_OP_SPACE]; /* One tier_up for WRITE or GB */
+};
+
 #endif /* _LINUX_ZUFS_API_H */