diff mbox series

[RFC,11/35] ceph: Use ceph_databuf in DIO

Message ID 20250313233341.1675324-12-dhowells@redhat.com (mailing list archive)
State New
Headers show
Series ceph, rbd, netfs: Make ceph fully use netfslib | expand

Commit Message

David Howells March 13, 2025, 11:33 p.m. UTC
Stash the list of pages to be read into/written from during a ceph fs
direct read/write in a ceph_databuf struct rather than using a bvec array.
Eventually this will be replaced with just an iterator supplied by
netfslib.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Viacheslav Dubeyko <slava@dubeyko.com>
cc: Alex Markuze <amarkuze@redhat.com>
cc: Ilya Dryomov <idryomov@gmail.com>
cc: ceph-devel@vger.kernel.org
cc: linux-fsdevel@vger.kernel.org
---
 fs/ceph/file.c | 110 +++++++++++++++++++++----------------------------
 1 file changed, 47 insertions(+), 63 deletions(-)

Comments

Viacheslav Dubeyko March 17, 2025, 8:03 p.m. UTC | #1
On Thu, 2025-03-13 at 23:33 +0000, David Howells wrote:
> Stash the list of pages to be read into/written from during a ceph fs
> direct read/write in a ceph_databuf struct rather than using a bvec array.
> Eventually this will be replaced with just an iterator supplied by
> netfslib.
> 
> Signed-off-by: David Howells <dhowells@redhat.com>
> cc: Viacheslav Dubeyko <slava@dubeyko.com>
> cc: Alex Markuze <amarkuze@redhat.com>
> cc: Ilya Dryomov <idryomov@gmail.com>
> cc: ceph-devel@vger.kernel.org
> cc: linux-fsdevel@vger.kernel.org
> ---
>  fs/ceph/file.c | 110 +++++++++++++++++++++----------------------------
>  1 file changed, 47 insertions(+), 63 deletions(-)
> 
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 9de2960748b9..fb4024bc8274 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -82,11 +82,10 @@ static __le32 ceph_flags_sys2wire(struct ceph_mds_client *mdsc, u32 flags)
>   */
>  #define ITER_GET_BVECS_PAGES	64
>  
> -static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
> -				struct bio_vec *bvecs)
> +static int __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
> +			    struct ceph_databuf *dbuf)
>  {
>  	size_t size = 0;
> -	int bvec_idx = 0;
>  
>  	if (maxsize > iov_iter_count(iter))
>  		maxsize = iov_iter_count(iter);
> @@ -98,22 +97,24 @@ static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
>  		int idx = 0;
>  
>  		bytes = iov_iter_get_pages2(iter, pages, maxsize - size,
> -					   ITER_GET_BVECS_PAGES, &start);
> -		if (bytes < 0)
> -			return size ?: bytes;
> -
> -		size += bytes;
> +					    ITER_GET_BVECS_PAGES, &start);
> +		if (bytes < 0) {
> +			if (size == 0)
> +				return bytes;
> +			break;

I am slightly confused by 'break;' here. Do we have a loop around?

> +		}
>  
> -		for ( ; bytes; idx++, bvec_idx++) {
> +		while (bytes) {
>  			int len = min_t(int, bytes, PAGE_SIZE - start);
>  
> -			bvec_set_page(&bvecs[bvec_idx], pages[idx], len, start);
> +			ceph_databuf_append_page(dbuf, pages[idx++], start, len);
>  			bytes -= len;
> +			size += len;
>  			start = 0;
>  		}
>  	}
>  
> -	return size;
> +	return 0;

Do we really need to return zero here? It looks to me that we calculated the
size for returning here. Am I wrong?

>  }
>  
>  /*
> @@ -124,52 +125,44 @@ static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
>   * Attempt to get up to @maxsize bytes worth of pages from @iter.
>   * Return the number of bytes in the created bio_vec array, or an error.
>   */
> -static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize,
> -				    struct bio_vec **bvecs, int *num_bvecs)
> +static struct ceph_databuf *iter_get_bvecs_alloc(struct iov_iter *iter,
> +						 size_t maxsize, bool write)
>  {
> -	struct bio_vec *bv;
> +	struct ceph_databuf *dbuf;
>  	size_t orig_count = iov_iter_count(iter);
> -	ssize_t bytes;
> -	int npages;
> +	int npages, ret;
>  
>  	iov_iter_truncate(iter, maxsize);
>  	npages = iov_iter_npages(iter, INT_MAX);
>  	iov_iter_reexpand(iter, orig_count);
>  
> -	/*
> -	 * __iter_get_bvecs() may populate only part of the array -- zero it
> -	 * out.
> -	 */
> -	bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO);
> -	if (!bv)
> -		return -ENOMEM;
> +	if (write)
> +		dbuf = ceph_databuf_req_alloc(npages, 0, GFP_KERNEL);

I am still feeling confused of allocated npages of zero size. :)

> +	else
> +		dbuf = ceph_databuf_reply_alloc(npages, 0, GFP_KERNEL);
> +	if (!dbuf)
> +		return ERR_PTR(-ENOMEM);
>  
> -	bytes = __iter_get_bvecs(iter, maxsize, bv);
> -	if (bytes < 0) {
> +	ret = __iter_get_bvecs(iter, maxsize, dbuf);
> +	if (ret < 0) {
>  		/*
>  		 * No pages were pinned -- just free the array.
>  		 */
> -		kvfree(bv);
> -		return bytes;
> +		ceph_databuf_release(dbuf);
> +		return ERR_PTR(ret);
>  	}
>  
> -	*bvecs = bv;
> -	*num_bvecs = npages;
> -	return bytes;
> +	return dbuf;
>  }
>  
> -static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty)
> +static void ceph_dirty_pages(struct ceph_databuf *dbuf)

Does it mean that we never used should_dirty argument with false value? Or the
main goal of this method is always making the pages dirty?

>  {
> +	struct bio_vec *bvec = dbuf->bvec;
>  	int i;
>  
> -	for (i = 0; i < num_bvecs; i++) {
> -		if (bvecs[i].bv_page) {
> -			if (should_dirty)
> -				set_page_dirty_lock(bvecs[i].bv_page);
> -			put_page(bvecs[i].bv_page);

So, which code will put_page() now?

Thanks,
Slava.

> -		}
> -	}
> -	kvfree(bvecs);
> +	for (i = 0; i < dbuf->nr_bvec; i++)
> +		if (bvec[i].bv_page)
> +			set_page_dirty_lock(bvec[i].bv_page);
>  }
>  
>  /*
> @@ -1338,14 +1331,11 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
>  	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
>  	struct ceph_osd_req_op *op = &req->r_ops[0];
>  	struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
> -	unsigned int len = osd_data->bvec_pos.iter.bi_size;
> +	size_t len = osd_data->iter.count;
>  	bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ);
>  	struct ceph_client *cl = ceph_inode_to_client(inode);
>  
> -	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
> -	BUG_ON(!osd_data->num_bvecs);
> -
> -	doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %u\n", req,
> +	doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %zu\n", req,
>  	      inode, ceph_vinop(inode), rc, len);
>  
>  	if (rc == -EOLDSNAPC) {
> @@ -1367,7 +1357,6 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
>  		if (rc == -ENOENT)
>  			rc = 0;
>  		if (rc >= 0 && len > rc) {
> -			struct iov_iter i;
>  			int zlen = len - rc;
>  
>  			/*
> @@ -1384,10 +1373,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
>  				aio_req->total_len = rc + zlen;
>  			}
>  
> -			iov_iter_bvec(&i, ITER_DEST, osd_data->bvec_pos.bvecs,
> -				      osd_data->num_bvecs, len);
> -			iov_iter_advance(&i, rc);
> -			iov_iter_zero(zlen, &i);
> +			iov_iter_advance(&osd_data->iter, rc);
> +			iov_iter_zero(zlen, &osd_data->iter);
>  		}
>  	}
>  
> @@ -1401,8 +1388,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
>  						 req->r_end_latency, len, rc);
>  	}
>  
> -	put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs,
> -		  aio_req->should_dirty);
> +	if (aio_req->should_dirty)
> +		ceph_dirty_pages(osd_data->dbuf);
>  	ceph_osdc_put_request(req);
>  
>  	if (rc < 0)
> @@ -1491,9 +1478,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>  	struct ceph_client_metric *metric = &fsc->mdsc->metric;
>  	struct ceph_vino vino;
>  	struct ceph_osd_request *req;
> -	struct bio_vec *bvecs;
>  	struct ceph_aio_request *aio_req = NULL;
> -	int num_pages = 0;
> +	struct ceph_databuf *dbuf = NULL;
>  	int flags;
>  	int ret = 0;
>  	struct timespec64 mtime = current_time(inode);
> @@ -1529,8 +1515,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>  
>  	while (iov_iter_count(iter) > 0) {
>  		u64 size = iov_iter_count(iter);
> -		ssize_t len;
>  		struct ceph_osd_req_op *op;
> +		size_t len;
>  		int readop = sparse ? CEPH_OSD_OP_SPARSE_READ : CEPH_OSD_OP_READ;
>  		int extent_cnt;
>  
> @@ -1563,16 +1549,17 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>  			}
>  		}
>  
> -		len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages);
> -		if (len < 0) {
> +		dbuf = iter_get_bvecs_alloc(iter, size, write);
> +		if (IS_ERR(dbuf)) {
>  			ceph_osdc_put_request(req);
> -			ret = len;
> +			ret = PTR_ERR(dbuf);
>  			break;
>  		}
> +		len = ceph_databuf_len(dbuf);
>  		if (len != size)
>  			osd_req_op_extent_update(req, 0, len);
>  
> -		osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len);
> +		osd_req_op_extent_osd_databuf(req, 0, dbuf);
>  
>  		/*
>  		 * To simplify error handling, allow AIO when IO within i_size
> @@ -1637,20 +1624,17 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>  				ret = 0;
>  
>  			if (ret >= 0 && ret < len && pos + ret < size) {
> -				struct iov_iter i;
>  				int zlen = min_t(size_t, len - ret,
>  						 size - pos - ret);
>  
> -				iov_iter_bvec(&i, ITER_DEST, bvecs, num_pages, len);
> -				iov_iter_advance(&i, ret);
> -				iov_iter_zero(zlen, &i);
> +				iov_iter_advance(&dbuf->iter, ret);
> +				iov_iter_zero(zlen, &dbuf->iter);
>  				ret += zlen;
>  			}
>  			if (ret >= 0)
>  				len = ret;
>  		}
>  
> -		put_bvecs(bvecs, num_pages, should_dirty);
>  		ceph_osdc_put_request(req);
>  		if (ret < 0)
>  			break;
> 
>
David Howells March 17, 2025, 10:26 p.m. UTC | #2
Viacheslav Dubeyko <Slava.Dubeyko@ibm.com> wrote:

> > +					    ITER_GET_BVECS_PAGES, &start);
> > +		if (bytes < 0) {
> > +			if (size == 0)
> > +				return bytes;
> > +			break;
> 
> I am slightly confused by 'break;' here. Do we have a loop around?

Yes.  You need to look at the original code, as the enclosing while loop
didn't make it into the patch context ;-).

> > -	return size;
> > +	return 0;
> 
> Do we really need to return zero here? It looks to me that we calculated the
> size for returning here. Am I wrong?

The only caller cares only whether an error is returned; it doesn't actually
need the size.  The size is stored in the databuf anyway.

> > +		dbuf = ceph_databuf_req_alloc(npages, 0, GFP_KERNEL);
> 
> I am still feeling confused of allocated npages of zero size. :)

That's not what it's saying.  It's allocating npages' worth of bio_vec[]
without allocating any buffer space.  The bio_vecs will be loaded from a DIO
request.  As mentioned in a previous reply, it might be worth creating a
separate databuf API call for this case.

> > -static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty)
> > +static void ceph_dirty_pages(struct ceph_databuf *dbuf)
> 
> Does it mean that we never used should_dirty argument with false value? Or the
> main goal of this method is always making the pages dirty?
> 
> >  {
> > +	struct bio_vec *bvec = dbuf->bvec;
> >  	int i;
> >  
> > -	for (i = 0; i < num_bvecs; i++) {
> > -		if (bvecs[i].bv_page) {
> > -			if (should_dirty)
> > -				set_page_dirty_lock(bvecs[i].bv_page);
> > -			put_page(bvecs[i].bv_page);
> 
> So, which code will put_page() now?

The dirtying of pages is split from the putting of those pages.  The databuf
releaser puts the pages, but doesn't dirty them.  ceph_aio_complete_req()
needs to do that itself.  Netfslib does this on behalf of the filesystem and
switching to that will delegate the responsibility.

Also in future, netfslib will handle putting the page refs or unpinning the
pages as appropriate - and ceph should not then take refs on those pages
(indeed, as struct page is disintegrated into different types such as folios,
there may not even *be* a ref counter on some of the pages).

David
diff mbox series

Patch

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 9de2960748b9..fb4024bc8274 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -82,11 +82,10 @@  static __le32 ceph_flags_sys2wire(struct ceph_mds_client *mdsc, u32 flags)
  */
 #define ITER_GET_BVECS_PAGES	64
 
-static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
-				struct bio_vec *bvecs)
+static int __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
+			    struct ceph_databuf *dbuf)
 {
 	size_t size = 0;
-	int bvec_idx = 0;
 
 	if (maxsize > iov_iter_count(iter))
 		maxsize = iov_iter_count(iter);
@@ -98,22 +97,24 @@  static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
 		int idx = 0;
 
 		bytes = iov_iter_get_pages2(iter, pages, maxsize - size,
-					   ITER_GET_BVECS_PAGES, &start);
-		if (bytes < 0)
-			return size ?: bytes;
-
-		size += bytes;
+					    ITER_GET_BVECS_PAGES, &start);
+		if (bytes < 0) {
+			if (size == 0)
+				return bytes;
+			break;
+		}
 
-		for ( ; bytes; idx++, bvec_idx++) {
+		while (bytes) {
 			int len = min_t(int, bytes, PAGE_SIZE - start);
 
-			bvec_set_page(&bvecs[bvec_idx], pages[idx], len, start);
+			ceph_databuf_append_page(dbuf, pages[idx++], start, len);
 			bytes -= len;
+			size += len;
 			start = 0;
 		}
 	}
 
-	return size;
+	return 0;
 }
 
 /*
@@ -124,52 +125,44 @@  static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
  * Attempt to get up to @maxsize bytes worth of pages from @iter.
  * Return the number of bytes in the created bio_vec array, or an error.
  */
-static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize,
-				    struct bio_vec **bvecs, int *num_bvecs)
+static struct ceph_databuf *iter_get_bvecs_alloc(struct iov_iter *iter,
+						 size_t maxsize, bool write)
 {
-	struct bio_vec *bv;
+	struct ceph_databuf *dbuf;
 	size_t orig_count = iov_iter_count(iter);
-	ssize_t bytes;
-	int npages;
+	int npages, ret;
 
 	iov_iter_truncate(iter, maxsize);
 	npages = iov_iter_npages(iter, INT_MAX);
 	iov_iter_reexpand(iter, orig_count);
 
-	/*
-	 * __iter_get_bvecs() may populate only part of the array -- zero it
-	 * out.
-	 */
-	bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO);
-	if (!bv)
-		return -ENOMEM;
+	if (write)
+		dbuf = ceph_databuf_req_alloc(npages, 0, GFP_KERNEL);
+	else
+		dbuf = ceph_databuf_reply_alloc(npages, 0, GFP_KERNEL);
+	if (!dbuf)
+		return ERR_PTR(-ENOMEM);
 
-	bytes = __iter_get_bvecs(iter, maxsize, bv);
-	if (bytes < 0) {
+	ret = __iter_get_bvecs(iter, maxsize, dbuf);
+	if (ret < 0) {
 		/*
 		 * No pages were pinned -- just free the array.
 		 */
-		kvfree(bv);
-		return bytes;
+		ceph_databuf_release(dbuf);
+		return ERR_PTR(ret);
 	}
 
-	*bvecs = bv;
-	*num_bvecs = npages;
-	return bytes;
+	return dbuf;
 }
 
-static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty)
+static void ceph_dirty_pages(struct ceph_databuf *dbuf)
 {
+	struct bio_vec *bvec = dbuf->bvec;
 	int i;
 
-	for (i = 0; i < num_bvecs; i++) {
-		if (bvecs[i].bv_page) {
-			if (should_dirty)
-				set_page_dirty_lock(bvecs[i].bv_page);
-			put_page(bvecs[i].bv_page);
-		}
-	}
-	kvfree(bvecs);
+	for (i = 0; i < dbuf->nr_bvec; i++)
+		if (bvec[i].bv_page)
+			set_page_dirty_lock(bvec[i].bv_page);
 }
 
 /*
@@ -1338,14 +1331,11 @@  static void ceph_aio_complete_req(struct ceph_osd_request *req)
 	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
 	struct ceph_osd_req_op *op = &req->r_ops[0];
 	struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
-	unsigned int len = osd_data->bvec_pos.iter.bi_size;
+	size_t len = osd_data->iter.count;
 	bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ);
 	struct ceph_client *cl = ceph_inode_to_client(inode);
 
-	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
-	BUG_ON(!osd_data->num_bvecs);
-
-	doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %u\n", req,
+	doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %zu\n", req,
 	      inode, ceph_vinop(inode), rc, len);
 
 	if (rc == -EOLDSNAPC) {
@@ -1367,7 +1357,6 @@  static void ceph_aio_complete_req(struct ceph_osd_request *req)
 		if (rc == -ENOENT)
 			rc = 0;
 		if (rc >= 0 && len > rc) {
-			struct iov_iter i;
 			int zlen = len - rc;
 
 			/*
@@ -1384,10 +1373,8 @@  static void ceph_aio_complete_req(struct ceph_osd_request *req)
 				aio_req->total_len = rc + zlen;
 			}
 
-			iov_iter_bvec(&i, ITER_DEST, osd_data->bvec_pos.bvecs,
-				      osd_data->num_bvecs, len);
-			iov_iter_advance(&i, rc);
-			iov_iter_zero(zlen, &i);
+			iov_iter_advance(&osd_data->iter, rc);
+			iov_iter_zero(zlen, &osd_data->iter);
 		}
 	}
 
@@ -1401,8 +1388,8 @@  static void ceph_aio_complete_req(struct ceph_osd_request *req)
 						 req->r_end_latency, len, rc);
 	}
 
-	put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs,
-		  aio_req->should_dirty);
+	if (aio_req->should_dirty)
+		ceph_dirty_pages(osd_data->dbuf);
 	ceph_osdc_put_request(req);
 
 	if (rc < 0)
@@ -1491,9 +1478,8 @@  ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 	struct ceph_client_metric *metric = &fsc->mdsc->metric;
 	struct ceph_vino vino;
 	struct ceph_osd_request *req;
-	struct bio_vec *bvecs;
 	struct ceph_aio_request *aio_req = NULL;
-	int num_pages = 0;
+	struct ceph_databuf *dbuf = NULL;
 	int flags;
 	int ret = 0;
 	struct timespec64 mtime = current_time(inode);
@@ -1529,8 +1515,8 @@  ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 
 	while (iov_iter_count(iter) > 0) {
 		u64 size = iov_iter_count(iter);
-		ssize_t len;
 		struct ceph_osd_req_op *op;
+		size_t len;
 		int readop = sparse ? CEPH_OSD_OP_SPARSE_READ : CEPH_OSD_OP_READ;
 		int extent_cnt;
 
@@ -1563,16 +1549,17 @@  ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 			}
 		}
 
-		len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages);
-		if (len < 0) {
+		dbuf = iter_get_bvecs_alloc(iter, size, write);
+		if (IS_ERR(dbuf)) {
 			ceph_osdc_put_request(req);
-			ret = len;
+			ret = PTR_ERR(dbuf);
 			break;
 		}
+		len = ceph_databuf_len(dbuf);
 		if (len != size)
 			osd_req_op_extent_update(req, 0, len);
 
-		osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len);
+		osd_req_op_extent_osd_databuf(req, 0, dbuf);
 
 		/*
 		 * To simplify error handling, allow AIO when IO within i_size
@@ -1637,20 +1624,17 @@  ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 				ret = 0;
 
 			if (ret >= 0 && ret < len && pos + ret < size) {
-				struct iov_iter i;
 				int zlen = min_t(size_t, len - ret,
 						 size - pos - ret);
 
-				iov_iter_bvec(&i, ITER_DEST, bvecs, num_pages, len);
-				iov_iter_advance(&i, ret);
-				iov_iter_zero(zlen, &i);
+				iov_iter_advance(&dbuf->iter, ret);
+				iov_iter_zero(zlen, &dbuf->iter);
 				ret += zlen;
 			}
 			if (ret >= 0)
 				len = ret;
 		}
 
-		put_bvecs(bvecs, num_pages, should_dirty);
 		ceph_osdc_put_request(req);
 		if (ret < 0)
 			break;