Message ID | 20250313233341.1675324-12-dhowells@redhat.com (mailing list archive) |
---|---|
State | New |
Series | ceph, rbd, netfs: Make ceph fully use netfslib |
On Thu, 2025-03-13 at 23:33 +0000, David Howells wrote:
> Stash the list of pages to be read into/written from during a ceph fs
> direct read/write in a ceph_databuf struct rather than using a bvec array.
> Eventually this will be replaced with just an iterator supplied by
> netfslib.
> 
> Signed-off-by: David Howells <dhowells@redhat.com>
> cc: Viacheslav Dubeyko <slava@dubeyko.com>
> cc: Alex Markuze <amarkuze@redhat.com>
> cc: Ilya Dryomov <idryomov@gmail.com>
> cc: ceph-devel@vger.kernel.org
> cc: linux-fsdevel@vger.kernel.org
> ---
>  fs/ceph/file.c | 110 +++++++++++++++++++++----------------------------
>  1 file changed, 47 insertions(+), 63 deletions(-)
> 
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 9de2960748b9..fb4024bc8274 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -82,11 +82,10 @@ static __le32 ceph_flags_sys2wire(struct ceph_mds_client *mdsc, u32 flags)
>   */
>  #define ITER_GET_BVECS_PAGES    64
>  
> -static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
> -                                struct bio_vec *bvecs)
> +static int __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
> +                            struct ceph_databuf *dbuf)
>  {
>          size_t size = 0;
> -        int bvec_idx = 0;
>  
>          if (maxsize > iov_iter_count(iter))
>                  maxsize = iov_iter_count(iter);
> @@ -98,22 +97,24 @@ static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
>                  int idx = 0;
>  
>                  bytes = iov_iter_get_pages2(iter, pages, maxsize - size,
> -                                            ITER_GET_BVECS_PAGES, &start);
> -                if (bytes < 0)
> -                        return size ?: bytes;
> -
> -                size += bytes;
> +                                            ITER_GET_BVECS_PAGES, &start);
> +                if (bytes < 0) {
> +                        if (size == 0)
> +                                return bytes;
> +                        break;

I am slightly confused by 'break;' here. Do we have a loop around?

> +                }
>  
> -                for ( ; bytes; idx++, bvec_idx++) {
> +                while (bytes) {
>                          int len = min_t(int, bytes, PAGE_SIZE - start);
>  
> -                        bvec_set_page(&bvecs[bvec_idx], pages[idx], len, start);
> +                        ceph_databuf_append_page(dbuf, pages[idx++], start, len);
>                          bytes -= len;
> +                        size += len;
>                          start = 0;
>                  }
>          }
>  
> -        return size;
> +        return 0;

Do we really need to return zero here? It looks to me that we calculated the
size for returning here. Am I wrong?

>  }
>  
>  /*
> @@ -124,52 +125,44 @@ static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
>   * Attempt to get up to @maxsize bytes worth of pages from @iter.
>   * Return the number of bytes in the created bio_vec array, or an error.
>   */
> -static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize,
> -                                    struct bio_vec **bvecs, int *num_bvecs)
> +static struct ceph_databuf *iter_get_bvecs_alloc(struct iov_iter *iter,
> +                                                 size_t maxsize, bool write)
>  {
> -        struct bio_vec *bv;
> +        struct ceph_databuf *dbuf;
>          size_t orig_count = iov_iter_count(iter);
> -        ssize_t bytes;
> -        int npages;
> +        int npages, ret;
>  
>          iov_iter_truncate(iter, maxsize);
>          npages = iov_iter_npages(iter, INT_MAX);
>          iov_iter_reexpand(iter, orig_count);
>  
> -        /*
> -         * __iter_get_bvecs() may populate only part of the array -- zero it
> -         * out.
> -         */
> -        bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO);
> -        if (!bv)
> -                return -ENOMEM;
> +        if (write)
> +                dbuf = ceph_databuf_req_alloc(npages, 0, GFP_KERNEL);

I am still feeling confused of allocated npages of zero size. :)

> +        else
> +                dbuf = ceph_databuf_reply_alloc(npages, 0, GFP_KERNEL);
> +        if (!dbuf)
> +                return ERR_PTR(-ENOMEM);
>  
> -        bytes = __iter_get_bvecs(iter, maxsize, bv);
> -        if (bytes < 0) {
> +        ret = __iter_get_bvecs(iter, maxsize, dbuf);
> +        if (ret < 0) {
>                  /*
>                   * No pages were pinned -- just free the array.
>                   */
> -                kvfree(bv);
> -                return bytes;
> +                ceph_databuf_release(dbuf);
> +                return ERR_PTR(ret);
>          }
>  
> -        *bvecs = bv;
> -        *num_bvecs = npages;
> -        return bytes;
> +        return dbuf;
>  }
>  
> -static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty)
> +static void ceph_dirty_pages(struct ceph_databuf *dbuf)

Does it mean that we never used should_dirty argument with false value? Or the
main goal of this method is always making the pages dirty?

>  {
> +        struct bio_vec *bvec = dbuf->bvec;
>          int i;
>  
> -        for (i = 0; i < num_bvecs; i++) {
> -                if (bvecs[i].bv_page) {
> -                        if (should_dirty)
> -                                set_page_dirty_lock(bvecs[i].bv_page);
> -                        put_page(bvecs[i].bv_page);

So, which code will put_page() now?

Thanks,
Slava.

> -                }
> -        }
> -        kvfree(bvecs);
> +        for (i = 0; i < dbuf->nr_bvec; i++)
> +                if (bvec[i].bv_page)
> +                        set_page_dirty_lock(bvec[i].bv_page);
>  }
>  
>  /*
> @@ -1338,14 +1331,11 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
>          struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
>          struct ceph_osd_req_op *op = &req->r_ops[0];
>          struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
> -        unsigned int len = osd_data->bvec_pos.iter.bi_size;
> +        size_t len = osd_data->iter.count;
>          bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ);
>          struct ceph_client *cl = ceph_inode_to_client(inode);
>  
> -        BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
> -        BUG_ON(!osd_data->num_bvecs);
> -
> -        doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %u\n", req,
> +        doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %zu\n", req,
>                inode, ceph_vinop(inode), rc, len);
>  
>          if (rc == -EOLDSNAPC) {
> @@ -1367,7 +1357,6 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
>                  if (rc == -ENOENT)
>                          rc = 0;
>                  if (rc >= 0 && len > rc) {
> -                        struct iov_iter i;
>                          int zlen = len - rc;
>  
>                          /*
> @@ -1384,10 +1373,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
>                                  aio_req->total_len = rc + zlen;
>                          }
>  
> -                        iov_iter_bvec(&i, ITER_DEST, osd_data->bvec_pos.bvecs,
> -                                      osd_data->num_bvecs, len);
> -                        iov_iter_advance(&i, rc);
> -                        iov_iter_zero(zlen, &i);
> +                        iov_iter_advance(&osd_data->iter, rc);
> +                        iov_iter_zero(zlen, &osd_data->iter);
>                  }
>          }
>  
> @@ -1401,8 +1388,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
>                                    req->r_end_latency, len, rc);
>          }
>  
> -        put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs,
> -                  aio_req->should_dirty);
> +        if (aio_req->should_dirty)
> +                ceph_dirty_pages(osd_data->dbuf);
>          ceph_osdc_put_request(req);
>  
>          if (rc < 0)
> @@ -1491,9 +1478,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>          struct ceph_client_metric *metric = &fsc->mdsc->metric;
>          struct ceph_vino vino;
>          struct ceph_osd_request *req;
> -        struct bio_vec *bvecs;
>          struct ceph_aio_request *aio_req = NULL;
> -        int num_pages = 0;
> +        struct ceph_databuf *dbuf = NULL;
>          int flags;
>          int ret = 0;
>          struct timespec64 mtime = current_time(inode);
> @@ -1529,8 +1515,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>  
>          while (iov_iter_count(iter) > 0) {
>                  u64 size = iov_iter_count(iter);
> -                ssize_t len;
>                  struct ceph_osd_req_op *op;
> +                size_t len;
>                  int readop = sparse ? CEPH_OSD_OP_SPARSE_READ : CEPH_OSD_OP_READ;
>                  int extent_cnt;
>  
> @@ -1563,16 +1549,17 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>                          }
>                  }
>  
> -                len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages);
> -                if (len < 0) {
> +                dbuf = iter_get_bvecs_alloc(iter, size, write);
> +                if (IS_ERR(dbuf)) {
>                          ceph_osdc_put_request(req);
> -                        ret = len;
> +                        ret = PTR_ERR(dbuf);
>                          break;
>                  }
> +                len = ceph_databuf_len(dbuf);
>                  if (len != size)
>                          osd_req_op_extent_update(req, 0, len);
>  
> -                osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len);
> +                osd_req_op_extent_osd_databuf(req, 0, dbuf);
>  
>                  /*
>                   * To simplify error handling, allow AIO when IO within i_size
> @@ -1637,20 +1624,17 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>                          ret = 0;
>  
>                  if (ret >= 0 && ret < len && pos + ret < size) {
> -                        struct iov_iter i;
>                          int zlen = min_t(size_t, len - ret,
>                                           size - pos - ret);
>  
> -                        iov_iter_bvec(&i, ITER_DEST, bvecs, num_pages, len);
> -                        iov_iter_advance(&i, ret);
> -                        iov_iter_zero(zlen, &i);
> +                        iov_iter_advance(&dbuf->iter, ret);
> +                        iov_iter_zero(zlen, &dbuf->iter);
>                          ret += zlen;
>                  }
>                  if (ret >= 0)
>                          len = ret;
>          }
>  
> -        put_bvecs(bvecs, num_pages, should_dirty);
>          ceph_osdc_put_request(req);
>          if (ret < 0)
>                  break;
> 
Viacheslav Dubeyko <Slava.Dubeyko@ibm.com> wrote:

> > +                                            ITER_GET_BVECS_PAGES, &start);
> > +                if (bytes < 0) {
> > +                        if (size == 0)
> > +                                return bytes;
> > +                        break;
> 
> I am slightly confused by 'break;' here. Do we have a loop around?

Yes.  You need to look at the original code as the while-directive didn't
make it into the patch context ;-).

> > -        return size;
> > +        return 0;
> 
> Do we really need to return zero here? It looks to me that we calculated the
> size for returning here. Am I wrong?

The only caller only cares if an error is returned.  It doesn't actually care
about the size.  The size is stored in the databuf anyway.

> > +                dbuf = ceph_databuf_req_alloc(npages, 0, GFP_KERNEL);
> 
> I am still feeling confused of allocated npages of zero size. :)

That's not what it's saying.  It's allocating npages' worth of bio_vec[] and
not creating any bufferage.  The bio_vecs will be loaded from a DIO request.
As mentioned in a previous reply, it might be worth creating a separate
databuf API call for this case.

> > -static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty)
> > +static void ceph_dirty_pages(struct ceph_databuf *dbuf)
> 
> Does it mean that we never used should_dirty argument with false value? Or the
> main goal of this method is always making the pages dirty?
> 
> >  {
> > +        struct bio_vec *bvec = dbuf->bvec;
> >          int i;
> > 
> > -        for (i = 0; i < num_bvecs; i++) {
> > -                if (bvecs[i].bv_page) {
> > -                        if (should_dirty)
> > -                                set_page_dirty_lock(bvecs[i].bv_page);
> > -                        put_page(bvecs[i].bv_page);
> 
> So, which code will put_page() now?

The dirtying of pages is split from the putting of those pages.  The databuf
releaser puts the pages, but doesn't dirty them.  ceph_aio_complete_req()
needs to do that itself.  Netfslib does this on behalf of the filesystem and
switching to that will delegate the responsibility.

Also in future, netfslib will handle putting the page refs or unpinning the
pages as appropriate - and ceph should not then take refs on those pages
(indeed, as struct page is disintegrated into different types such as folios,
there may not even *be* a ref counter on some of the pages).

David
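For context on the 'break;' question: the changed lines sit inside a page-pinning loop whose wrapper is outside the quoted hunk. The following is a minimal sketch of the post-patch function; the local declarations and the do/while condition are assumptions based on the pre-existing code rather than something the patch shows:

/* Sketch of __iter_get_bvecs() after this patch.  The do/while wrapper is
 * assumed from the existing function and is not part of the diff context. */
static int __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
                            struct ceph_databuf *dbuf)
{
        size_t size = 0;

        if (maxsize > iov_iter_count(iter))
                maxsize = iov_iter_count(iter);

        do {
                struct page *pages[ITER_GET_BVECS_PAGES];
                ssize_t bytes;
                size_t start;
                int idx = 0;

                bytes = iov_iter_get_pages2(iter, pages, maxsize - size,
                                            ITER_GET_BVECS_PAGES, &start);
                if (bytes < 0) {
                        if (size == 0)
                                return bytes;
                        break;  /* Keep whatever was already pinned. */
                }

                /* Append each pinned page to the databuf's bvec array. */
                while (bytes) {
                        int len = min_t(int, bytes, PAGE_SIZE - start);

                        ceph_databuf_append_page(dbuf, pages[idx++], start, len);
                        bytes -= len;
                        size += len;
                        start = 0;
                }
        } while (iov_iter_count(iter) && size < maxsize);

        /* The pinned length lives in the databuf; only errors are reported. */
        return 0;
}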
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 9de2960748b9..fb4024bc8274 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -82,11 +82,10 @@ static __le32 ceph_flags_sys2wire(struct ceph_mds_client *mdsc, u32 flags)
  */
 #define ITER_GET_BVECS_PAGES    64
 
-static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
-                                struct bio_vec *bvecs)
+static int __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
+                            struct ceph_databuf *dbuf)
 {
         size_t size = 0;
-        int bvec_idx = 0;
 
         if (maxsize > iov_iter_count(iter))
                 maxsize = iov_iter_count(iter);
@@ -98,22 +97,24 @@ static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
                 int idx = 0;
 
                 bytes = iov_iter_get_pages2(iter, pages, maxsize - size,
-                                            ITER_GET_BVECS_PAGES, &start);
-                if (bytes < 0)
-                        return size ?: bytes;
-
-                size += bytes;
+                                            ITER_GET_BVECS_PAGES, &start);
+                if (bytes < 0) {
+                        if (size == 0)
+                                return bytes;
+                        break;
+                }
 
-                for ( ; bytes; idx++, bvec_idx++) {
+                while (bytes) {
                         int len = min_t(int, bytes, PAGE_SIZE - start);
 
-                        bvec_set_page(&bvecs[bvec_idx], pages[idx], len, start);
+                        ceph_databuf_append_page(dbuf, pages[idx++], start, len);
                         bytes -= len;
+                        size += len;
                         start = 0;
                 }
         }
 
-        return size;
+        return 0;
 }
 
 /*
@@ -124,52 +125,44 @@ static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
  * Attempt to get up to @maxsize bytes worth of pages from @iter.
  * Return the number of bytes in the created bio_vec array, or an error.
  */
-static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize,
-                                    struct bio_vec **bvecs, int *num_bvecs)
+static struct ceph_databuf *iter_get_bvecs_alloc(struct iov_iter *iter,
+                                                 size_t maxsize, bool write)
 {
-        struct bio_vec *bv;
+        struct ceph_databuf *dbuf;
         size_t orig_count = iov_iter_count(iter);
-        ssize_t bytes;
-        int npages;
+        int npages, ret;
 
         iov_iter_truncate(iter, maxsize);
         npages = iov_iter_npages(iter, INT_MAX);
         iov_iter_reexpand(iter, orig_count);
 
-        /*
-         * __iter_get_bvecs() may populate only part of the array -- zero it
-         * out.
-         */
-        bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO);
-        if (!bv)
-                return -ENOMEM;
+        if (write)
+                dbuf = ceph_databuf_req_alloc(npages, 0, GFP_KERNEL);
+        else
+                dbuf = ceph_databuf_reply_alloc(npages, 0, GFP_KERNEL);
+        if (!dbuf)
+                return ERR_PTR(-ENOMEM);
 
-        bytes = __iter_get_bvecs(iter, maxsize, bv);
-        if (bytes < 0) {
+        ret = __iter_get_bvecs(iter, maxsize, dbuf);
+        if (ret < 0) {
                 /*
                  * No pages were pinned -- just free the array.
                  */
-                kvfree(bv);
-                return bytes;
+                ceph_databuf_release(dbuf);
+                return ERR_PTR(ret);
         }
 
-        *bvecs = bv;
-        *num_bvecs = npages;
-        return bytes;
+        return dbuf;
 }
 
-static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty)
+static void ceph_dirty_pages(struct ceph_databuf *dbuf)
 {
+        struct bio_vec *bvec = dbuf->bvec;
         int i;
 
-        for (i = 0; i < num_bvecs; i++) {
-                if (bvecs[i].bv_page) {
-                        if (should_dirty)
-                                set_page_dirty_lock(bvecs[i].bv_page);
-                        put_page(bvecs[i].bv_page);
-                }
-        }
-        kvfree(bvecs);
+        for (i = 0; i < dbuf->nr_bvec; i++)
+                if (bvec[i].bv_page)
+                        set_page_dirty_lock(bvec[i].bv_page);
 }
 
 /*
@@ -1338,14 +1331,11 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
         struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
         struct ceph_osd_req_op *op = &req->r_ops[0];
         struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
-        unsigned int len = osd_data->bvec_pos.iter.bi_size;
+        size_t len = osd_data->iter.count;
         bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ);
         struct ceph_client *cl = ceph_inode_to_client(inode);
 
-        BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
-        BUG_ON(!osd_data->num_bvecs);
-
-        doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %u\n", req,
+        doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %zu\n", req,
               inode, ceph_vinop(inode), rc, len);
 
         if (rc == -EOLDSNAPC) {
@@ -1367,7 +1357,6 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
                 if (rc == -ENOENT)
                         rc = 0;
                 if (rc >= 0 && len > rc) {
-                        struct iov_iter i;
                         int zlen = len - rc;
 
                         /*
@@ -1384,10 +1373,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
                                 aio_req->total_len = rc + zlen;
                         }
 
-                        iov_iter_bvec(&i, ITER_DEST, osd_data->bvec_pos.bvecs,
-                                      osd_data->num_bvecs, len);
-                        iov_iter_advance(&i, rc);
-                        iov_iter_zero(zlen, &i);
+                        iov_iter_advance(&osd_data->iter, rc);
+                        iov_iter_zero(zlen, &osd_data->iter);
                 }
         }
 
@@ -1401,8 +1388,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
                                   req->r_end_latency, len, rc);
         }
 
-        put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs,
-                  aio_req->should_dirty);
+        if (aio_req->should_dirty)
+                ceph_dirty_pages(osd_data->dbuf);
         ceph_osdc_put_request(req);
 
         if (rc < 0)
@@ -1491,9 +1478,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
         struct ceph_client_metric *metric = &fsc->mdsc->metric;
         struct ceph_vino vino;
         struct ceph_osd_request *req;
-        struct bio_vec *bvecs;
         struct ceph_aio_request *aio_req = NULL;
-        int num_pages = 0;
+        struct ceph_databuf *dbuf = NULL;
         int flags;
         int ret = 0;
         struct timespec64 mtime = current_time(inode);
@@ -1529,8 +1515,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 
         while (iov_iter_count(iter) > 0) {
                 u64 size = iov_iter_count(iter);
-                ssize_t len;
                 struct ceph_osd_req_op *op;
+                size_t len;
                 int readop = sparse ? CEPH_OSD_OP_SPARSE_READ : CEPH_OSD_OP_READ;
                 int extent_cnt;
 
@@ -1563,16 +1549,17 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
                         }
                 }
 
-                len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages);
-                if (len < 0) {
+                dbuf = iter_get_bvecs_alloc(iter, size, write);
+                if (IS_ERR(dbuf)) {
                         ceph_osdc_put_request(req);
-                        ret = len;
+                        ret = PTR_ERR(dbuf);
                         break;
                 }
+                len = ceph_databuf_len(dbuf);
                 if (len != size)
                         osd_req_op_extent_update(req, 0, len);
 
-                osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len);
+                osd_req_op_extent_osd_databuf(req, 0, dbuf);
 
                 /*
                  * To simplify error handling, allow AIO when IO within i_size
@@ -1637,20 +1624,17 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
                         ret = 0;
 
                 if (ret >= 0 && ret < len && pos + ret < size) {
-                        struct iov_iter i;
                         int zlen = min_t(size_t, len - ret,
                                          size - pos - ret);
 
-                        iov_iter_bvec(&i, ITER_DEST, bvecs, num_pages, len);
-                        iov_iter_advance(&i, ret);
-                        iov_iter_zero(zlen, &i);
+                        iov_iter_advance(&dbuf->iter, ret);
+                        iov_iter_zero(zlen, &dbuf->iter);
                         ret += zlen;
                 }
                 if (ret >= 0)
                         len = ret;
         }
 
-        put_bvecs(bvecs, num_pages, should_dirty);
         ceph_osdc_put_request(req);
         if (ret < 0)
                 break;
Stash the list of pages to be read into/written from during a ceph fs
direct read/write in a ceph_databuf struct rather than using a bvec array.
Eventually this will be replaced with just an iterator supplied by
netfslib.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Viacheslav Dubeyko <slava@dubeyko.com>
cc: Alex Markuze <amarkuze@redhat.com>
cc: Ilya Dryomov <idryomov@gmail.com>
cc: ceph-devel@vger.kernel.org
cc: linux-fsdevel@vger.kernel.org
---
 fs/ceph/file.c | 110 +++++++++++++++++++++----------------------------
 1 file changed, 47 insertions(+), 63 deletions(-)
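To make the resulting flow concrete, here is a minimal sketch of the per-extent steps in the direct I/O path after this patch. The helper name ceph_dio_attach_pages is hypothetical and only condenses hunks from the diff above; the functions it calls are the ones the patch introduces or already uses.

/*
 * Hypothetical helper (not part of the patch) condensing the per-extent
 * steps from ceph_direct_read_write() after this change.
 */
static int ceph_dio_attach_pages(struct ceph_osd_request *req,
                                 struct iov_iter *iter, u64 size, bool write)
{
        struct ceph_databuf *dbuf;
        size_t len;

        /* Pin the user pages behind @iter into a bvec-backed databuf. */
        dbuf = iter_get_bvecs_alloc(iter, size, write);
        if (IS_ERR(dbuf))
                return PTR_ERR(dbuf);

        /* Trim the extent op if fewer bytes were pinned than requested. */
        len = ceph_databuf_len(dbuf);
        if (len != size)
                osd_req_op_extent_update(req, 0, len);

        /* Hand the databuf to the OSD request. */
        osd_req_op_extent_osd_databuf(req, 0, dbuf);
        return 0;
}

Cleanup is then split as described in the reply above: a read completion that needs to dirty the pages calls ceph_dirty_pages() first, while the page references themselves are dropped when ceph_osdc_put_request() releases the attached databuf.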