Message ID | 1525446256-1882-3-git-send-email-idryomov@gmail.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
On Fri, 2018-05-04 at 17:04 +0200, Ilya Dryomov wrote: > dio_get_pagev_size() and dio_get_pages_alloc() introduced in commit > b5b98989dc7e ("ceph: combine as many iovec as possile into one OSD > request") assume that the passed iov_iter is ITER_IOVEC. This isn't > the case with splice where it ends up poking into the guts of ITER_BVEC > or ITER_PIPE iterators, causing lockups and crashes easily reproduced > with generic/095. > > Rather than trying to figure out gap alignment and stuff pages into > a page vector, add a helper for going from iov_iter to a bio_vec array > and make use of the new CEPH_OSD_DATA_TYPE_BVECS code. > > Fixes: b5b98989dc7e ("ceph: combine as many iovec as possile into one OSD request") > Link: http://tracker.ceph.com/issues/18130 > Signed-off-by: Ilya Dryomov <idryomov@gmail.com> > --- > fs/ceph/file.c | 193 +++++++++++++++++++++++++++++++++------------------------ > 1 file changed, 113 insertions(+), 80 deletions(-) > > diff --git a/fs/ceph/file.c b/fs/ceph/file.c > index 8ce7849f3fbd..cb5daff99816 100644 > --- a/fs/ceph/file.c > +++ b/fs/ceph/file.c > @@ -69,70 +69,99 @@ static __le32 ceph_flags_sys2wire(u32 flags) > * need to wait for MDS acknowledgement. > */ > > -/* > - * Calculate the length sum of direct io vectors that can > - * be combined into one page vector. > - */ > -static size_t dio_get_pagev_size(const struct iov_iter *it) > +static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize, > + struct bio_vec *bvecs) Can you add a comment explaining what this does, and what it returns? I assume it returns the number of bytes represented by the resulting bvec array. Maybe also mention that the bvec array must be preallocated. > { > - const struct iovec *iov = it->iov; > - const struct iovec *iovend = iov + it->nr_segs; > - size_t size; > - > - size = iov->iov_len - it->iov_offset; > - /* > - * An iov can be page vectored when both the current tail > - * and the next base are page aligned. > - */ > - while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) && > - (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) { > - size += iov->iov_len; > - } > - dout("dio_get_pagevlen len = %zu\n", size); > - return size; > + size_t size = 0; > + int bvec_idx = 0; > + > + if (maxsize > iov_iter_count(iter)) > + maxsize = iov_iter_count(iter); > + > + while (size < maxsize) { > + struct page *pages[64]; /* DIO_PAGES */ Why 64 here? Magic numbers are nasty. Can you #define a constant and plug it in here, and in the iov_iter_get_pages call below? > + ssize_t bytes; > + size_t start; > + int idx = 0; > + > + bytes = iov_iter_get_pages(iter, pages, maxsize - size, 64, > + &start); > + if (bytes < 0) > + return size ?: bytes; > + > + iov_iter_advance(iter, bytes); > + size += bytes; > + > + for ( ; bytes; idx++, bvec_idx++) { > + struct bio_vec bv = { > + .bv_page = pages[idx], > + .bv_len = min_t(int, bytes, PAGE_SIZE - start), > + .bv_offset = start, > + }; > + > + bvecs[bvec_idx] = bv; > + bytes -= bv.bv_len; > + start = 0; > + } > + } > + > + return size; > } > > /* > - * Allocate a page vector based on (@it, @nbytes). > - * The return value is the tuple describing a page vector, > - * that is (@pages, @page_align, @num_pages). > + * iov_iter_get_pages() only considers one iov_iter segment, no matter > + * what maxsize or maxpages are given. For ITER_BVEC that is a single > + * page. > + * > + * Attempt to get up to @maxsize bytes worth of pages from @iter. > + * Return the number of bytes in the created bio_vec array, or an error. > */ > -static struct page ** > -dio_get_pages_alloc(const struct iov_iter *it, size_t nbytes, > - size_t *page_align, int *num_pages) > +static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize, > + struct bio_vec **bvecs, int *num_bvecs) > { > - struct iov_iter tmp_it = *it; > - size_t align; > - struct page **pages; > - int ret = 0, idx, npages; > + struct bio_vec *bv; > + size_t orig_count = iov_iter_count(iter); > + ssize_t bytes; > + int npages; > > - align = (unsigned long)(it->iov->iov_base + it->iov_offset) & > - (PAGE_SIZE - 1); > - npages = calc_pages_for(align, nbytes); > - pages = kvmalloc(sizeof(*pages) * npages, GFP_KERNEL); > - if (!pages) > - return ERR_PTR(-ENOMEM); > + iov_iter_truncate(iter, maxsize); > + npages = iov_iter_npages(iter, INT_MAX); > + iov_iter_reexpand(iter, orig_count); > > - for (idx = 0; idx < npages; ) { > - size_t start; > - ret = iov_iter_get_pages(&tmp_it, pages + idx, nbytes, > - npages - idx, &start); > - if (ret < 0) > - goto fail; > + /* > + * __iter_get_bvecs() may populate only part of the array -- zero it > + * out. > + */ > + bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO); > + if (!bv) > + return -ENOMEM; > > - iov_iter_advance(&tmp_it, ret); > - nbytes -= ret; > - idx += (ret + start + PAGE_SIZE - 1) / PAGE_SIZE; > + bytes = __iter_get_bvecs(iter, maxsize, bv); > + if (bytes < 0) { > + /* > + * No pages were pinned -- just free the array. > + */ > + kvfree(bv); > + return bytes; > } > > - BUG_ON(nbytes != 0); > - *num_pages = npages; > - *page_align = align; > - dout("dio_get_pages_alloc: got %d pages align %zu\n", npages, align); > - return pages; > -fail: > - ceph_put_page_vector(pages, idx, false); > - return ERR_PTR(ret); > + *bvecs = bv; > + *num_bvecs = npages; > + return bytes; > +} > + > +static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty) > +{ > + int i; > + > + for (i = 0; i < num_bvecs; i++) { > + if (bvecs[i].bv_page) { > + if (should_dirty) > + set_page_dirty_lock(bvecs[i].bv_page); > + put_page(bvecs[i].bv_page); > + } > + } > + kvfree(bvecs); > } > > /* > @@ -746,11 +775,12 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) > struct inode *inode = req->r_inode; > struct ceph_aio_request *aio_req = req->r_priv; > struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0); > - int num_pages = calc_pages_for((u64)osd_data->alignment, > - osd_data->length); > > - dout("ceph_aio_complete_req %p rc %d bytes %llu\n", > - inode, rc, osd_data->length); > + BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS); > + BUG_ON(!osd_data->num_bvecs); > + > + dout("ceph_aio_complete_req %p rc %d bytes %u\n", > + inode, rc, osd_data->bvec_pos.iter.bi_size); > > if (rc == -EOLDSNAPC) { > struct ceph_aio_work *aio_work; > @@ -768,9 +798,10 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) > } else if (!aio_req->write) { > if (rc == -ENOENT) > rc = 0; > - if (rc >= 0 && osd_data->length > rc) { > - int zoff = osd_data->alignment + rc; > - int zlen = osd_data->length - rc; > + if (rc >= 0 && osd_data->bvec_pos.iter.bi_size > rc) { > + struct iov_iter i; > + int zlen = osd_data->bvec_pos.iter.bi_size - rc; > + > /* > * If read is satisfied by single OSD request, > * it can pass EOF. Otherwise read is within > @@ -785,13 +816,16 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) > aio_req->total_len = rc + zlen; > } > > - if (zlen > 0) > - ceph_zero_page_vector_range(zoff, zlen, > - osd_data->pages); > + iov_iter_bvec(&i, ITER_BVEC, osd_data->bvec_pos.bvecs, > + osd_data->num_bvecs, > + osd_data->bvec_pos.iter.bi_size); > + iov_iter_advance(&i, rc); > + iov_iter_zero(zlen, &i); > } > } > > - ceph_put_page_vector(osd_data->pages, num_pages, aio_req->should_dirty); > + put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs, > + aio_req->should_dirty); > ceph_osdc_put_request(req); > > if (rc < 0) > @@ -879,7 +913,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > struct ceph_fs_client *fsc = ceph_inode_to_client(inode); > struct ceph_vino vino; > struct ceph_osd_request *req; > - struct page **pages; > + struct bio_vec *bvecs; > struct ceph_aio_request *aio_req = NULL; > int num_pages = 0; > int flags; > @@ -914,8 +948,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > } > > while (iov_iter_count(iter) > 0) { > - u64 size = dio_get_pagev_size(iter); > - size_t start = 0; > + u64 size = iov_iter_count(iter); > ssize_t len; > > if (write) > @@ -938,13 +971,14 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > break; > } > > - len = size; > - pages = dio_get_pages_alloc(iter, len, &start, &num_pages); > - if (IS_ERR(pages)) { > + len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages); > + if (len < 0) { > ceph_osdc_put_request(req); > - ret = PTR_ERR(pages); > + ret = len; > break; > } > + if (len != size) > + osd_req_op_extent_update(req, 0, len); > > /* > * To simplify error handling, allow AIO when IO within i_size > @@ -977,8 +1011,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > req->r_mtime = mtime; > } > > - osd_req_op_extent_osd_data_pages(req, 0, pages, len, start, > - false, false); > + osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len); > > if (aio_req) { > aio_req->total_len += len; > @@ -991,7 +1024,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs); > > pos += len; > - iov_iter_advance(iter, len); > continue; > } > > @@ -1004,25 +1036,26 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > if (ret == -ENOENT) > ret = 0; > if (ret >= 0 && ret < len && pos + ret < size) { > + struct iov_iter i; > int zlen = min_t(size_t, len - ret, > size - pos - ret); > - ceph_zero_page_vector_range(start + ret, zlen, > - pages); > + > + iov_iter_bvec(&i, ITER_BVEC, bvecs, num_pages, > + len); > + iov_iter_advance(&i, ret); > + iov_iter_zero(zlen, &i); > ret += zlen; > } > if (ret >= 0) > len = ret; > } > > - ceph_put_page_vector(pages, num_pages, should_dirty); > - > + put_bvecs(bvecs, num_pages, should_dirty); > ceph_osdc_put_request(req); > if (ret < 0) > break; > > pos += len; > - iov_iter_advance(iter, len); > - > if (!write && pos >= size) > break; > Looks good otherwise. If you clean up the above, you can add Reviewed-by: Jeff Layton <jlayton@redhat.com> -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
On Fri, May 4, 2018 at 6:05 PM, Jeff Layton <jlayton@redhat.com> wrote: > On Fri, 2018-05-04 at 17:04 +0200, Ilya Dryomov wrote: >> dio_get_pagev_size() and dio_get_pages_alloc() introduced in commit >> b5b98989dc7e ("ceph: combine as many iovec as possile into one OSD >> request") assume that the passed iov_iter is ITER_IOVEC. This isn't >> the case with splice where it ends up poking into the guts of ITER_BVEC >> or ITER_PIPE iterators, causing lockups and crashes easily reproduced >> with generic/095. >> >> Rather than trying to figure out gap alignment and stuff pages into >> a page vector, add a helper for going from iov_iter to a bio_vec array >> and make use of the new CEPH_OSD_DATA_TYPE_BVECS code. >> >> Fixes: b5b98989dc7e ("ceph: combine as many iovec as possile into one OSD request") >> Link: http://tracker.ceph.com/issues/18130 >> Signed-off-by: Ilya Dryomov <idryomov@gmail.com> >> --- >> fs/ceph/file.c | 193 +++++++++++++++++++++++++++++++++------------------------ >> 1 file changed, 113 insertions(+), 80 deletions(-) >> >> diff --git a/fs/ceph/file.c b/fs/ceph/file.c >> index 8ce7849f3fbd..cb5daff99816 100644 >> --- a/fs/ceph/file.c >> +++ b/fs/ceph/file.c >> @@ -69,70 +69,99 @@ static __le32 ceph_flags_sys2wire(u32 flags) >> * need to wait for MDS acknowledgement. >> */ >> >> -/* >> - * Calculate the length sum of direct io vectors that can >> - * be combined into one page vector. >> - */ >> -static size_t dio_get_pagev_size(const struct iov_iter *it) >> +static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize, >> + struct bio_vec *bvecs) > > Can you add a comment explaining what this does, and what it returns? I > assume it returns the number of bytes represented by the resulting bvec > array. Maybe also mention that the bvec array must be preallocated. iter_get_bvecs_alloc() comment below explains it. __iter_get_bvecs() is a private helper -- I didn't see a reason to duplicate most of that. > >> { >> - const struct iovec *iov = it->iov; >> - const struct iovec *iovend = iov + it->nr_segs; >> - size_t size; >> - >> - size = iov->iov_len - it->iov_offset; >> - /* >> - * An iov can be page vectored when both the current tail >> - * and the next base are page aligned. >> - */ >> - while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) && >> - (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) { >> - size += iov->iov_len; >> - } >> - dout("dio_get_pagevlen len = %zu\n", size); >> - return size; >> + size_t size = 0; >> + int bvec_idx = 0; >> + >> + if (maxsize > iov_iter_count(iter)) >> + maxsize = iov_iter_count(iter); >> + >> + while (size < maxsize) { >> + struct page *pages[64]; /* DIO_PAGES */ > > Why 64 here? Magic numbers are nasty. Can you #define a constant and > plug it in here, and in the iov_iter_get_pages call below? Added /* * How many pages to get in one call to iov_iter_get_pages(). This * determines the size of the on-stack array used as a buffer. */ #define ITER_GET_BVECS_PAGES 64 Thanks, Ilya -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
On Fri, May 4, 2018 at 11:04 PM, Ilya Dryomov <idryomov@gmail.com> wrote: > dio_get_pagev_size() and dio_get_pages_alloc() introduced in commit > b5b98989dc7e ("ceph: combine as many iovec as possile into one OSD > request") assume that the passed iov_iter is ITER_IOVEC. This isn't > the case with splice where it ends up poking into the guts of ITER_BVEC > or ITER_PIPE iterators, causing lockups and crashes easily reproduced > with generic/095. > > Rather than trying to figure out gap alignment and stuff pages into > a page vector, add a helper for going from iov_iter to a bio_vec array > and make use of the new CEPH_OSD_DATA_TYPE_BVECS code. > > Fixes: b5b98989dc7e ("ceph: combine as many iovec as possile into one OSD request") > Link: http://tracker.ceph.com/issues/18130 > Signed-off-by: Ilya Dryomov <idryomov@gmail.com> > --- > fs/ceph/file.c | 193 +++++++++++++++++++++++++++++++++------------------------ > 1 file changed, 113 insertions(+), 80 deletions(-) > > diff --git a/fs/ceph/file.c b/fs/ceph/file.c > index 8ce7849f3fbd..cb5daff99816 100644 > --- a/fs/ceph/file.c > +++ b/fs/ceph/file.c > @@ -69,70 +69,99 @@ static __le32 ceph_flags_sys2wire(u32 flags) > * need to wait for MDS acknowledgement. > */ > > -/* > - * Calculate the length sum of direct io vectors that can > - * be combined into one page vector. > - */ > -static size_t dio_get_pagev_size(const struct iov_iter *it) > +static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize, > + struct bio_vec *bvecs) > { > - const struct iovec *iov = it->iov; > - const struct iovec *iovend = iov + it->nr_segs; > - size_t size; > - > - size = iov->iov_len - it->iov_offset; > - /* > - * An iov can be page vectored when both the current tail > - * and the next base are page aligned. > - */ > - while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) && > - (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) { > - size += iov->iov_len; > - } > - dout("dio_get_pagevlen len = %zu\n", size); > - return size; > + size_t size = 0; > + int bvec_idx = 0; > + > + if (maxsize > iov_iter_count(iter)) > + maxsize = iov_iter_count(iter); > + > + while (size < maxsize) { > + struct page *pages[64]; /* DIO_PAGES */ > + ssize_t bytes; > + size_t start; > + int idx = 0; > + > + bytes = iov_iter_get_pages(iter, pages, maxsize - size, 64, > + &start); > + if (bytes < 0) > + return size ?: bytes; > + > + iov_iter_advance(iter, bytes); > + size += bytes; > + > + for ( ; bytes; idx++, bvec_idx++) { > + struct bio_vec bv = { > + .bv_page = pages[idx], > + .bv_len = min_t(int, bytes, PAGE_SIZE - start), > + .bv_offset = start, > + }; > + > + bvecs[bvec_idx] = bv; > + bytes -= bv.bv_len; > + start = 0; > + } > + } > + > + return size; > } > > /* > - * Allocate a page vector based on (@it, @nbytes). > - * The return value is the tuple describing a page vector, > - * that is (@pages, @page_align, @num_pages). > + * iov_iter_get_pages() only considers one iov_iter segment, no matter > + * what maxsize or maxpages are given. For ITER_BVEC that is a single > + * page. > + * > + * Attempt to get up to @maxsize bytes worth of pages from @iter. > + * Return the number of bytes in the created bio_vec array, or an error. > */ > -static struct page ** > -dio_get_pages_alloc(const struct iov_iter *it, size_t nbytes, > - size_t *page_align, int *num_pages) > +static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize, > + struct bio_vec **bvecs, int *num_bvecs) > { > - struct iov_iter tmp_it = *it; > - size_t align; > - struct page **pages; > - int ret = 0, idx, npages; > + struct bio_vec *bv; > + size_t orig_count = iov_iter_count(iter); > + ssize_t bytes; > + int npages; > > - align = (unsigned long)(it->iov->iov_base + it->iov_offset) & > - (PAGE_SIZE - 1); > - npages = calc_pages_for(align, nbytes); > - pages = kvmalloc(sizeof(*pages) * npages, GFP_KERNEL); > - if (!pages) > - return ERR_PTR(-ENOMEM); > + iov_iter_truncate(iter, maxsize); > + npages = iov_iter_npages(iter, INT_MAX); > + iov_iter_reexpand(iter, orig_count); > > - for (idx = 0; idx < npages; ) { > - size_t start; > - ret = iov_iter_get_pages(&tmp_it, pages + idx, nbytes, > - npages - idx, &start); > - if (ret < 0) > - goto fail; > + /* > + * __iter_get_bvecs() may populate only part of the array -- zero it > + * out. > + */ > + bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO); > + if (!bv) > + return -ENOMEM; > > - iov_iter_advance(&tmp_it, ret); > - nbytes -= ret; > - idx += (ret + start + PAGE_SIZE - 1) / PAGE_SIZE; > + bytes = __iter_get_bvecs(iter, maxsize, bv); > + if (bytes < 0) { > + /* > + * No pages were pinned -- just free the array. > + */ > + kvfree(bv); > + return bytes; > } > > - BUG_ON(nbytes != 0); > - *num_pages = npages; > - *page_align = align; > - dout("dio_get_pages_alloc: got %d pages align %zu\n", npages, align); > - return pages; > -fail: > - ceph_put_page_vector(pages, idx, false); > - return ERR_PTR(ret); > + *bvecs = bv; > + *num_bvecs = npages; > + return bytes; > +} > + > +static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty) > +{ > + int i; > + > + for (i = 0; i < num_bvecs; i++) { > + if (bvecs[i].bv_page) { > + if (should_dirty) > + set_page_dirty_lock(bvecs[i].bv_page); > + put_page(bvecs[i].bv_page); > + } > + } > + kvfree(bvecs); > } > > /* > @@ -746,11 +775,12 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) > struct inode *inode = req->r_inode; > struct ceph_aio_request *aio_req = req->r_priv; > struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0); > - int num_pages = calc_pages_for((u64)osd_data->alignment, > - osd_data->length); > > - dout("ceph_aio_complete_req %p rc %d bytes %llu\n", > - inode, rc, osd_data->length); > + BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS); > + BUG_ON(!osd_data->num_bvecs); > + > + dout("ceph_aio_complete_req %p rc %d bytes %u\n", > + inode, rc, osd_data->bvec_pos.iter.bi_size); > > if (rc == -EOLDSNAPC) { > struct ceph_aio_work *aio_work; > @@ -768,9 +798,10 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) > } else if (!aio_req->write) { > if (rc == -ENOENT) > rc = 0; > - if (rc >= 0 && osd_data->length > rc) { > - int zoff = osd_data->alignment + rc; > - int zlen = osd_data->length - rc; > + if (rc >= 0 && osd_data->bvec_pos.iter.bi_size > rc) { > + struct iov_iter i; > + int zlen = osd_data->bvec_pos.iter.bi_size - rc; > + > /* > * If read is satisfied by single OSD request, > * it can pass EOF. Otherwise read is within > @@ -785,13 +816,16 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) > aio_req->total_len = rc + zlen; > } > > - if (zlen > 0) > - ceph_zero_page_vector_range(zoff, zlen, > - osd_data->pages); > + iov_iter_bvec(&i, ITER_BVEC, osd_data->bvec_pos.bvecs, > + osd_data->num_bvecs, > + osd_data->bvec_pos.iter.bi_size); > + iov_iter_advance(&i, rc); > + iov_iter_zero(zlen, &i); > } > } > > - ceph_put_page_vector(osd_data->pages, num_pages, aio_req->should_dirty); > + put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs, > + aio_req->should_dirty); > ceph_osdc_put_request(req); > > if (rc < 0) > @@ -879,7 +913,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > struct ceph_fs_client *fsc = ceph_inode_to_client(inode); > struct ceph_vino vino; > struct ceph_osd_request *req; > - struct page **pages; > + struct bio_vec *bvecs; > struct ceph_aio_request *aio_req = NULL; > int num_pages = 0; > int flags; > @@ -914,8 +948,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > } > > while (iov_iter_count(iter) > 0) { > - u64 size = dio_get_pagev_size(iter); > - size_t start = 0; > + u64 size = iov_iter_count(iter); > ssize_t len; > > if (write) > @@ -938,13 +971,14 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > break; > } > > - len = size; > - pages = dio_get_pages_alloc(iter, len, &start, &num_pages); > - if (IS_ERR(pages)) { > + len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages); > + if (len < 0) { > ceph_osdc_put_request(req); > - ret = PTR_ERR(pages); > + ret = len; > break; > } > + if (len != size) > + osd_req_op_extent_update(req, 0, len); > > /* > * To simplify error handling, allow AIO when IO within i_size > @@ -977,8 +1011,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > req->r_mtime = mtime; > } > > - osd_req_op_extent_osd_data_pages(req, 0, pages, len, start, > - false, false); > + osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len); > > if (aio_req) { > aio_req->total_len += len; > @@ -991,7 +1024,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs); > > pos += len; > - iov_iter_advance(iter, len); > continue; > } > > @@ -1004,25 +1036,26 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > if (ret == -ENOENT) > ret = 0; > if (ret >= 0 && ret < len && pos + ret < size) { > + struct iov_iter i; > int zlen = min_t(size_t, len - ret, > size - pos - ret); > - ceph_zero_page_vector_range(start + ret, zlen, > - pages); > + > + iov_iter_bvec(&i, ITER_BVEC, bvecs, num_pages, > + len); > + iov_iter_advance(&i, ret); > + iov_iter_zero(zlen, &i); > ret += zlen; > } > if (ret >= 0) > len = ret; > } > > - ceph_put_page_vector(pages, num_pages, should_dirty); > - > + put_bvecs(bvecs, num_pages, should_dirty); > ceph_osdc_put_request(req); > if (ret < 0) > break; > > pos += len; > - iov_iter_advance(iter, len); > - > if (!write && pos >= size) > break; > > -- > 2.4.3 > Reviewed-by: "Yan, Zheng" <zyan@redhat.com> Thanks for fixing this. Yan, Zheng > -- > To unsubscribe from this list: send the line "unsubscribe ceph-devel" in > the body of a message to majordomo@vger.kernel.org > More majordomo info at http://vger.kernel.org/majordomo-info.html -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 8ce7849f3fbd..cb5daff99816 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -69,70 +69,99 @@ static __le32 ceph_flags_sys2wire(u32 flags) * need to wait for MDS acknowledgement. */ -/* - * Calculate the length sum of direct io vectors that can - * be combined into one page vector. - */ -static size_t dio_get_pagev_size(const struct iov_iter *it) +static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize, + struct bio_vec *bvecs) { - const struct iovec *iov = it->iov; - const struct iovec *iovend = iov + it->nr_segs; - size_t size; - - size = iov->iov_len - it->iov_offset; - /* - * An iov can be page vectored when both the current tail - * and the next base are page aligned. - */ - while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) && - (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) { - size += iov->iov_len; - } - dout("dio_get_pagevlen len = %zu\n", size); - return size; + size_t size = 0; + int bvec_idx = 0; + + if (maxsize > iov_iter_count(iter)) + maxsize = iov_iter_count(iter); + + while (size < maxsize) { + struct page *pages[64]; /* DIO_PAGES */ + ssize_t bytes; + size_t start; + int idx = 0; + + bytes = iov_iter_get_pages(iter, pages, maxsize - size, 64, + &start); + if (bytes < 0) + return size ?: bytes; + + iov_iter_advance(iter, bytes); + size += bytes; + + for ( ; bytes; idx++, bvec_idx++) { + struct bio_vec bv = { + .bv_page = pages[idx], + .bv_len = min_t(int, bytes, PAGE_SIZE - start), + .bv_offset = start, + }; + + bvecs[bvec_idx] = bv; + bytes -= bv.bv_len; + start = 0; + } + } + + return size; } /* - * Allocate a page vector based on (@it, @nbytes). - * The return value is the tuple describing a page vector, - * that is (@pages, @page_align, @num_pages). + * iov_iter_get_pages() only considers one iov_iter segment, no matter + * what maxsize or maxpages are given. For ITER_BVEC that is a single + * page. + * + * Attempt to get up to @maxsize bytes worth of pages from @iter. + * Return the number of bytes in the created bio_vec array, or an error. */ -static struct page ** -dio_get_pages_alloc(const struct iov_iter *it, size_t nbytes, - size_t *page_align, int *num_pages) +static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize, + struct bio_vec **bvecs, int *num_bvecs) { - struct iov_iter tmp_it = *it; - size_t align; - struct page **pages; - int ret = 0, idx, npages; + struct bio_vec *bv; + size_t orig_count = iov_iter_count(iter); + ssize_t bytes; + int npages; - align = (unsigned long)(it->iov->iov_base + it->iov_offset) & - (PAGE_SIZE - 1); - npages = calc_pages_for(align, nbytes); - pages = kvmalloc(sizeof(*pages) * npages, GFP_KERNEL); - if (!pages) - return ERR_PTR(-ENOMEM); + iov_iter_truncate(iter, maxsize); + npages = iov_iter_npages(iter, INT_MAX); + iov_iter_reexpand(iter, orig_count); - for (idx = 0; idx < npages; ) { - size_t start; - ret = iov_iter_get_pages(&tmp_it, pages + idx, nbytes, - npages - idx, &start); - if (ret < 0) - goto fail; + /* + * __iter_get_bvecs() may populate only part of the array -- zero it + * out. + */ + bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO); + if (!bv) + return -ENOMEM; - iov_iter_advance(&tmp_it, ret); - nbytes -= ret; - idx += (ret + start + PAGE_SIZE - 1) / PAGE_SIZE; + bytes = __iter_get_bvecs(iter, maxsize, bv); + if (bytes < 0) { + /* + * No pages were pinned -- just free the array. + */ + kvfree(bv); + return bytes; } - BUG_ON(nbytes != 0); - *num_pages = npages; - *page_align = align; - dout("dio_get_pages_alloc: got %d pages align %zu\n", npages, align); - return pages; -fail: - ceph_put_page_vector(pages, idx, false); - return ERR_PTR(ret); + *bvecs = bv; + *num_bvecs = npages; + return bytes; +} + +static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty) +{ + int i; + + for (i = 0; i < num_bvecs; i++) { + if (bvecs[i].bv_page) { + if (should_dirty) + set_page_dirty_lock(bvecs[i].bv_page); + put_page(bvecs[i].bv_page); + } + } + kvfree(bvecs); } /* @@ -746,11 +775,12 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) struct inode *inode = req->r_inode; struct ceph_aio_request *aio_req = req->r_priv; struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0); - int num_pages = calc_pages_for((u64)osd_data->alignment, - osd_data->length); - dout("ceph_aio_complete_req %p rc %d bytes %llu\n", - inode, rc, osd_data->length); + BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS); + BUG_ON(!osd_data->num_bvecs); + + dout("ceph_aio_complete_req %p rc %d bytes %u\n", + inode, rc, osd_data->bvec_pos.iter.bi_size); if (rc == -EOLDSNAPC) { struct ceph_aio_work *aio_work; @@ -768,9 +798,10 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) } else if (!aio_req->write) { if (rc == -ENOENT) rc = 0; - if (rc >= 0 && osd_data->length > rc) { - int zoff = osd_data->alignment + rc; - int zlen = osd_data->length - rc; + if (rc >= 0 && osd_data->bvec_pos.iter.bi_size > rc) { + struct iov_iter i; + int zlen = osd_data->bvec_pos.iter.bi_size - rc; + /* * If read is satisfied by single OSD request, * it can pass EOF. Otherwise read is within @@ -785,13 +816,16 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) aio_req->total_len = rc + zlen; } - if (zlen > 0) - ceph_zero_page_vector_range(zoff, zlen, - osd_data->pages); + iov_iter_bvec(&i, ITER_BVEC, osd_data->bvec_pos.bvecs, + osd_data->num_bvecs, + osd_data->bvec_pos.iter.bi_size); + iov_iter_advance(&i, rc); + iov_iter_zero(zlen, &i); } } - ceph_put_page_vector(osd_data->pages, num_pages, aio_req->should_dirty); + put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs, + aio_req->should_dirty); ceph_osdc_put_request(req); if (rc < 0) @@ -879,7 +913,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_vino vino; struct ceph_osd_request *req; - struct page **pages; + struct bio_vec *bvecs; struct ceph_aio_request *aio_req = NULL; int num_pages = 0; int flags; @@ -914,8 +948,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, } while (iov_iter_count(iter) > 0) { - u64 size = dio_get_pagev_size(iter); - size_t start = 0; + u64 size = iov_iter_count(iter); ssize_t len; if (write) @@ -938,13 +971,14 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, break; } - len = size; - pages = dio_get_pages_alloc(iter, len, &start, &num_pages); - if (IS_ERR(pages)) { + len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages); + if (len < 0) { ceph_osdc_put_request(req); - ret = PTR_ERR(pages); + ret = len; break; } + if (len != size) + osd_req_op_extent_update(req, 0, len); /* * To simplify error handling, allow AIO when IO within i_size @@ -977,8 +1011,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, req->r_mtime = mtime; } - osd_req_op_extent_osd_data_pages(req, 0, pages, len, start, - false, false); + osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len); if (aio_req) { aio_req->total_len += len; @@ -991,7 +1024,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs); pos += len; - iov_iter_advance(iter, len); continue; } @@ -1004,25 +1036,26 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, if (ret == -ENOENT) ret = 0; if (ret >= 0 && ret < len && pos + ret < size) { + struct iov_iter i; int zlen = min_t(size_t, len - ret, size - pos - ret); - ceph_zero_page_vector_range(start + ret, zlen, - pages); + + iov_iter_bvec(&i, ITER_BVEC, bvecs, num_pages, + len); + iov_iter_advance(&i, ret); + iov_iter_zero(zlen, &i); ret += zlen; } if (ret >= 0) len = ret; } - ceph_put_page_vector(pages, num_pages, should_dirty); - + put_bvecs(bvecs, num_pages, should_dirty); ceph_osdc_put_request(req); if (ret < 0) break; pos += len; - iov_iter_advance(iter, len); - if (!write && pos >= size) break;
dio_get_pagev_size() and dio_get_pages_alloc() introduced in commit b5b98989dc7e ("ceph: combine as many iovec as possile into one OSD request") assume that the passed iov_iter is ITER_IOVEC. This isn't the case with splice where it ends up poking into the guts of ITER_BVEC or ITER_PIPE iterators, causing lockups and crashes easily reproduced with generic/095. Rather than trying to figure out gap alignment and stuff pages into a page vector, add a helper for going from iov_iter to a bio_vec array and make use of the new CEPH_OSD_DATA_TYPE_BVECS code. Fixes: b5b98989dc7e ("ceph: combine as many iovec as possile into one OSD request") Link: http://tracker.ceph.com/issues/18130 Signed-off-by: Ilya Dryomov <idryomov@gmail.com> --- fs/ceph/file.c | 193 +++++++++++++++++++++++++++++++++------------------------ 1 file changed, 113 insertions(+), 80 deletions(-)