diff mbox series

[RFC,3/4] ceph: introduce ceph_submit_write() method

Message ID 20250205000249.123054-4-slava@dubeyko.com (mailing list archive)
State New
Headers show
Series ceph: fix generic/421 test failure | expand

Commit Message

Viacheslav Dubeyko Feb. 5, 2025, 12:02 a.m. UTC
From: Viacheslav Dubeyko <Slava.Dubeyko@ibm.com>

Final responsibility of ceph_writepages_start() is
to submit write requests for processed dirty folios/pages.
The ceph_submit_write() summarize all this logic in
one method.

The generic/421 fails to finish because of the issue:

Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.894678] INFO: task kworker/u48:0:11 blocked for more than 122 seconds.
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.895403] Not tainted 6.13.0-rc5+ #1
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.895867] "echo 0 > /proc/sys/kernel/hung_task_timeout_secs" disables this message.
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.896633] task:kworker/u48:0 state:D stack:0 pid:11 tgid:11 ppid:2 flags:0x00004000
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.896641] Workqueue: writeback wb_workfn (flush-ceph-24)
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897614] Call Trace:
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897620] <TASK>
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897629] __schedule+0x443/0x16b0
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897637] schedule+0x2b/0x140
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897640] io_schedule+0x4c/0x80
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897643] folio_wait_bit_common+0x11b/0x310
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897646] ? _raw_spin_unlock_irq+0xe/0x50
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897652] ? __pfx_wake_page_function+0x10/0x10
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897655] __folio_lock+0x17/0x30
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897658] ceph_writepages_start+0xca9/0x1fb0
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897663] ? fsnotify_remove_queued_event+0x2f/0x40
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897668] do_writepages+0xd2/0x240
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897672] __writeback_single_inode+0x44/0x350
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897675] writeback_sb_inodes+0x25c/0x550
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897680] wb_writeback+0x89/0x310
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897683] ? finish_task_switch.isra.0+0x97/0x310
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897687] wb_workfn+0xb5/0x410
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897689] process_one_work+0x188/0x3d0
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897692] worker_thread+0x2b5/0x3c0
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897694] ? __pfx_worker_thread+0x10/0x10
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897696] kthread+0xe1/0x120
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897699] ? __pfx_kthread+0x10/0x10
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897701] ret_from_fork+0x43/0x70
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897705] ? __pfx_kthread+0x10/0x10
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897707] ret_from_fork_asm+0x1a/0x30
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897711] </TASK>

There are two problems here:

if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
     rc = -EIO;
     goto release_folios;
}

(1) ceph_kill_sb() doesn't wait ending of flushing
all dirty folios/pages because of racy nature of
mdsc->stopping_blockers. As a result, mdsc->stopping
becomes CEPH_MDSC_STOPPING_FLUSHED too early.
(2) The ceph_inc_osd_stopping_blocker(fsc->mdsc) fails
to increment mdsc->stopping_blockers. Finally,
already locked folios/pages are never been unlocked
and the logic tries to lock the same page second time.

This patch implements refactoring of ceph_submit_write()
and also it solves the second issue.

Signed-off-by: Viacheslav Dubeyko <Slava.Dubeyko@ibm.com>
---
 fs/ceph/addr.c | 461 +++++++++++++++++++++++++++----------------------
 1 file changed, 257 insertions(+), 204 deletions(-)

Comments

Matthew Wilcox Feb. 14, 2025, 9:11 p.m. UTC | #1
On Tue, Feb 04, 2025 at 04:02:48PM -0800, Viacheslav Dubeyko wrote:
> This patch implements refactoring of ceph_submit_write()
> and also it solves the second issue.
> 
> Signed-off-by: Viacheslav Dubeyko <Slava.Dubeyko@ibm.com>

This kind of giant refactoring to solve a bug is a really bad idea.
First, it's going to need to be backported to older kernels.  How far
back?  You need to identify that with a Fixes: line.

It's also really hard to review and know whether it's right.  You might
have introduced several new bugs while doing it.  In general, bugfixes
first, refactor later.  I *think* this means we can do without 1/7 of
the patches I resent earlier today, but it's really hard to be sure.
Viacheslav Dubeyko Feb. 14, 2025, 9:21 p.m. UTC | #2
On Fri, 2025-02-14 at 21:11 +0000, Matthew Wilcox wrote:
> On Tue, Feb 04, 2025 at 04:02:48PM -0800, Viacheslav Dubeyko wrote:
> > This patch implements refactoring of ceph_submit_write()
> > and also it solves the second issue.
> > 
> > Signed-off-by: Viacheslav Dubeyko <Slava.Dubeyko@ibm.com>
> 
> This kind of giant refactoring to solve a bug is a really bad idea.
> First, it's going to need to be backported to older kernels.  How far
> back?  You need to identify that with a Fixes: line.
> 
> It's also really hard to review and know whether it's right.  You might
> have introduced several new bugs while doing it.  In general, bugfixes
> first, refactor later.  I *think* this means we can do without 1/7 of
> the patches I resent earlier today, but it's really hard to be sure.

I really would like not to do refactoring here. But the function is huge and
logic is pretty complicated. It's hard to follow the logic of the method.
Refactoring is doing only of moving the existing code into smaller functions and
nothing more.

I can not to do the refactoring and simply add the fixes but function will be
bigger and more complex, from my point of view.

Thanks,
Slava.
diff mbox series

Patch

diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 739329846a07..02d20c000dc5 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1395,6 +1395,245 @@  int ceph_process_folio_batch(struct address_space *mapping,
 	return rc;
 }
 
+static inline
+void ceph_shift_unused_folios_left(struct folio_batch *fbatch)
+{
+	unsigned j, n = 0;
+
+	/* shift unused page to beginning of fbatch */
+	for (j = 0; j < folio_batch_count(fbatch); j++) {
+		if (!fbatch->folios[j])
+			continue;
+
+		if (n < j) {
+			fbatch->folios[n] = fbatch->folios[j];
+		}
+
+		n++;
+	}
+
+	fbatch->nr = n;
+}
+
+static
+int ceph_submit_write(struct address_space *mapping,
+			struct writeback_control *wbc,
+			struct ceph_writeback_ctl *ceph_wbc)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	struct ceph_client *cl = fsc->client;
+	struct ceph_vino vino = ceph_vino(inode);
+	struct ceph_osd_request *req = NULL;
+	struct page *page = NULL;
+	bool caching = ceph_is_cache_enabled(inode);
+	u64 offset;
+	u64 len;
+	unsigned i;
+
+new_request:
+	offset = ceph_fscrypt_page_offset(ceph_wbc->pages[0]);
+	len = ceph_wbc->wsize;
+
+	req = ceph_osdc_new_request(&fsc->client->osdc,
+				    &ci->i_layout, vino,
+				    offset, &len, 0, ceph_wbc->num_ops,
+				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
+				    ceph_wbc->snapc, ceph_wbc->truncate_seq,
+				    ceph_wbc->truncate_size, false);
+	if (IS_ERR(req)) {
+		req = ceph_osdc_new_request(&fsc->client->osdc,
+					    &ci->i_layout, vino,
+					    offset, &len, 0,
+					    min(ceph_wbc->num_ops,
+						CEPH_OSD_SLAB_OPS),
+					    CEPH_OSD_OP_WRITE,
+					    CEPH_OSD_FLAG_WRITE,
+					    ceph_wbc->snapc,
+					    ceph_wbc->truncate_seq,
+					    ceph_wbc->truncate_size,
+					    true);
+		BUG_ON(IS_ERR(req));
+	}
+
+	page = ceph_wbc->pages[ceph_wbc->locked_pages - 1];
+	BUG_ON(len < ceph_fscrypt_page_offset(page) + thp_size(page) - offset);
+
+	if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
+		for (i = 0; i < folio_batch_count(&ceph_wbc->fbatch); i++) {
+			struct folio *folio = ceph_wbc->fbatch.folios[i];
+
+			if (!folio)
+				continue;
+
+			page = &folio->page;
+			redirty_page_for_writepage(wbc, page);
+			unlock_page(page);
+		}
+
+		for (i = 0; i < ceph_wbc->locked_pages; i++) {
+			page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]);
+
+			if (!page)
+				continue;
+
+			redirty_page_for_writepage(wbc, page);
+			unlock_page(page);
+		}
+
+		ceph_osdc_put_request(req);
+		return -EIO;
+	}
+
+	req->r_callback = writepages_finish;
+	req->r_inode = inode;
+
+	/* Format the osd request message and submit the write */
+	len = 0;
+	ceph_wbc->data_pages = ceph_wbc->pages;
+	ceph_wbc->op_idx = 0;
+	for (i = 0; i < ceph_wbc->locked_pages; i++) {
+		u64 cur_offset;
+
+		page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]);
+		cur_offset = page_offset(page);
+
+		/*
+		 * Discontinuity in page range? Ceph can handle that by just passing
+		 * multiple extents in the write op.
+		 */
+		if (offset + len != cur_offset) {
+			/* If it's full, stop here */
+			if (ceph_wbc->op_idx + 1 == req->r_num_ops)
+				break;
+
+			/* Kick off an fscache write with what we have so far. */
+			ceph_fscache_write_to_cache(inode, offset, len, caching);
+
+			/* Start a new extent */
+			osd_req_op_extent_dup_last(req, ceph_wbc->op_idx,
+						   cur_offset - offset);
+
+			doutc(cl, "got pages at %llu~%llu\n", offset, len);
+
+			osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx,
+							 ceph_wbc->data_pages,
+							 len, 0,
+							 ceph_wbc->from_pool,
+							 false);
+			osd_req_op_extent_update(req, ceph_wbc->op_idx, len);
+
+			len = 0;
+			offset = cur_offset;
+			ceph_wbc->data_pages = ceph_wbc->pages + i;
+			ceph_wbc->op_idx++;
+		}
+
+		set_page_writeback(page);
+
+		if (caching)
+			ceph_set_page_fscache(page);
+
+		len += thp_size(page);
+	}
+
+	ceph_fscache_write_to_cache(inode, offset, len, caching);
+
+	if (ceph_wbc->size_stable) {
+		len = min(len, ceph_wbc->i_size - offset);
+	} else if (i == ceph_wbc->locked_pages) {
+		/* writepages_finish() clears writeback pages
+		 * according to the data length, so make sure
+		 * data length covers all locked pages */
+		u64 min_len = len + 1 - thp_size(page);
+		len = get_writepages_data_length(inode,
+						 ceph_wbc->pages[i - 1],
+						 offset);
+		len = max(len, min_len);
+	}
+
+	if (IS_ENCRYPTED(inode))
+		len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE);
+
+	doutc(cl, "got pages at %llu~%llu\n", offset, len);
+
+	if (IS_ENCRYPTED(inode) &&
+	    ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK)) {
+		pr_warn_client(cl,
+			"bad encrypted write offset=%lld len=%llu\n",
+			offset, len);
+	}
+
+	osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx,
+					 ceph_wbc->data_pages, len,
+					 0, ceph_wbc->from_pool, false);
+	osd_req_op_extent_update(req, ceph_wbc->op_idx, len);
+
+	BUG_ON(ceph_wbc->op_idx + 1 != req->r_num_ops);
+
+	ceph_wbc->from_pool = false;
+	if (i < ceph_wbc->locked_pages) {
+		BUG_ON(ceph_wbc->num_ops <= req->r_num_ops);
+		ceph_wbc->num_ops -= req->r_num_ops;
+		ceph_wbc->locked_pages -= i;
+
+		/* allocate new pages array for next request */
+		ceph_wbc->data_pages = ceph_wbc->pages;
+		__ceph_allocate_page_array(ceph_wbc, ceph_wbc->locked_pages);
+		memcpy(ceph_wbc->pages, ceph_wbc->data_pages + i,
+			ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages));
+		memset(ceph_wbc->data_pages + i, 0,
+			ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages));
+	} else {
+		BUG_ON(ceph_wbc->num_ops != req->r_num_ops);
+		/* request message now owns the pages array */
+		ceph_wbc->pages = NULL;
+	}
+
+	req->r_mtime = inode_get_mtime(inode);
+	ceph_osdc_start_request(&fsc->client->osdc, req);
+	req = NULL;
+
+	wbc->nr_to_write -= i;
+	if (ceph_wbc->pages)
+		goto new_request;
+
+	return 0;
+}
+
+static
+void ceph_wait_until_current_writes_complete(struct address_space *mapping,
+					     struct writeback_control *wbc,
+					     struct ceph_writeback_ctl *ceph_wbc)
+{
+	struct page *page;
+	unsigned i, nr;
+
+	if (wbc->sync_mode != WB_SYNC_NONE &&
+	    ceph_wbc->start_index == 0 && /* all dirty pages were checked */
+	    !ceph_wbc->head_snapc) {
+		ceph_wbc->index = 0;
+
+		while ((ceph_wbc->index <= ceph_wbc->end) &&
+			(nr = filemap_get_folios_tag(mapping,
+						     &ceph_wbc->index,
+						     (pgoff_t)-1,
+						     PAGECACHE_TAG_WRITEBACK,
+						     &ceph_wbc->fbatch))) {
+			for (i = 0; i < nr; i++) {
+				page = &ceph_wbc->fbatch.folios[i]->page;
+				if (page_snap_context(page) != ceph_wbc->snapc)
+					continue;
+				wait_on_page_writeback(page);
+			}
+
+			folio_batch_release(&ceph_wbc->fbatch);
+			cond_resched();
+		}
+	}
+}
+
 /*
  * initiate async writeback
  */
@@ -1402,17 +1641,12 @@  static int ceph_writepages_start(struct address_space *mapping,
 				 struct writeback_control *wbc)
 {
 	struct inode *inode = mapping->host;
-	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
 	struct ceph_client *cl = fsc->client;
-	struct ceph_vino vino = ceph_vino(inode);
 	struct ceph_writeback_ctl ceph_wbc;
-	struct ceph_osd_request *req = NULL;
 	int rc = 0;
-	bool caching = ceph_is_cache_enabled(inode);
 
-	if (wbc->sync_mode == WB_SYNC_NONE &&
-	    fsc->write_congested)
+	if (wbc->sync_mode == WB_SYNC_NONE && fsc->write_congested)
 		return 0;
 
 	doutc(cl, "%llx.%llx (mode=%s)\n", ceph_vinop(inode),
@@ -1439,9 +1673,6 @@  static int ceph_writepages_start(struct address_space *mapping,
 		tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end);
 
 	while (!has_writeback_done(&ceph_wbc)) {
-		unsigned i;
-		struct page *page;
-
 		ceph_wbc.locked_pages = 0;
 		ceph_wbc.max_pages = ceph_wbc.wsize >> PAGE_SHIFT;
 
@@ -1459,6 +1690,7 @@  static int ceph_writepages_start(struct address_space *mapping,
 		if (!ceph_wbc.nr_folios && !ceph_wbc.locked_pages)
 			break;
 
+process_folio_batch:
 		rc = ceph_process_folio_batch(mapping, wbc, &ceph_wbc);
 		if (rc)
 			goto release_folios;
@@ -1466,187 +1698,30 @@  static int ceph_writepages_start(struct address_space *mapping,
 		/* did we get anything? */
 		if (!ceph_wbc.locked_pages)
 			goto release_folios;
-		if (i) {
-			unsigned j, n = 0;
-			/* shift unused page to beginning of fbatch */
-			for (j = 0; j < ceph_wbc.nr_folios; j++) {
-				if (!ceph_wbc.fbatch.folios[j])
-					continue;
-				if (n < j) {
-					ceph_wbc.fbatch.folios[n] =
-						ceph_wbc.fbatch.folios[j];
-				}
-				n++;
-			}
-			ceph_wbc.fbatch.nr = n;
 
-			if (ceph_wbc.nr_folios && i == ceph_wbc.nr_folios &&
+		if (ceph_wbc.processed_in_fbatch) {
+			ceph_shift_unused_folios_left(&ceph_wbc.fbatch);
+
+			if (folio_batch_count(&ceph_wbc.fbatch) == 0 &&
 			    ceph_wbc.locked_pages < ceph_wbc.max_pages) {
 				doutc(cl, "reached end fbatch, trying for more\n");
-				folio_batch_release(&ceph_wbc.fbatch);
 				goto get_more_pages;
 			}
 		}
 
-new_request:
-		ceph_wbc.offset = ceph_fscrypt_page_offset(ceph_wbc.pages[0]);
-		ceph_wbc.len = ceph_wbc.wsize;
-
-		req = ceph_osdc_new_request(&fsc->client->osdc,
-					&ci->i_layout, vino,
-					ceph_wbc.offset, &ceph_wbc.len,
-					0, ceph_wbc.num_ops,
-					CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
-					ceph_wbc.snapc, ceph_wbc.truncate_seq,
-					ceph_wbc.truncate_size, false);
-		if (IS_ERR(req)) {
-			req = ceph_osdc_new_request(&fsc->client->osdc,
-						&ci->i_layout, vino,
-						ceph_wbc.offset, &ceph_wbc.len,
-						0, min(ceph_wbc.num_ops,
-						    CEPH_OSD_SLAB_OPS),
-						CEPH_OSD_OP_WRITE,
-						CEPH_OSD_FLAG_WRITE,
-						ceph_wbc.snapc,
-						ceph_wbc.truncate_seq,
-						ceph_wbc.truncate_size, true);
-			BUG_ON(IS_ERR(req));
-		}
-		BUG_ON(ceph_wbc.len <
-			ceph_fscrypt_page_offset(ceph_wbc.pages[ceph_wbc.locked_pages - 1]) +
-				thp_size(ceph_wbc.pages[ceph_wbc.locked_pages - 1]) -
-					ceph_wbc.offset);
-
-		if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
-			rc = -EIO;
+		rc = ceph_submit_write(mapping, wbc, &ceph_wbc);
+		if (rc)
 			goto release_folios;
-		}
-		req->r_callback = writepages_finish;
-		req->r_inode = inode;
-
-		/* Format the osd request message and submit the write */
-		ceph_wbc.len = 0;
-		ceph_wbc.data_pages = ceph_wbc.pages;
-		ceph_wbc.op_idx = 0;
-		for (i = 0; i < ceph_wbc.locked_pages; i++) {
-			struct page *page =
-				ceph_fscrypt_pagecache_page(ceph_wbc.pages[i]);
-
-			u64 cur_offset = page_offset(page);
-			/*
-			 * Discontinuity in page range? Ceph can handle that by just passing
-			 * multiple extents in the write op.
-			 */
-			if (ceph_wbc.offset + ceph_wbc.len != cur_offset) {
-				/* If it's full, stop here */
-				if (ceph_wbc.op_idx + 1 == req->r_num_ops)
-					break;
-
-				/* Kick off an fscache write with what we have so far. */
-				ceph_fscache_write_to_cache(inode, ceph_wbc.offset,
-							    ceph_wbc.len, caching);
-
-				/* Start a new extent */
-				osd_req_op_extent_dup_last(req, ceph_wbc.op_idx,
-							   cur_offset -
-								ceph_wbc.offset);
-				doutc(cl, "got pages at %llu~%llu\n",
-					ceph_wbc.offset,
-					ceph_wbc.len);
-				osd_req_op_extent_osd_data_pages(req,
-							ceph_wbc.op_idx,
-							ceph_wbc.data_pages,
-							ceph_wbc.len, 0,
-							ceph_wbc.from_pool, false);
-				osd_req_op_extent_update(req, ceph_wbc.op_idx,
-							 ceph_wbc.len);
-
-				ceph_wbc.len = 0;
-				ceph_wbc.offset = cur_offset;
-				ceph_wbc.data_pages = ceph_wbc.pages + i;
-				ceph_wbc.op_idx++;
-			}
-
-			set_page_writeback(page);
-			if (caching)
-				ceph_set_page_fscache(page);
-			ceph_wbc.len += thp_size(page);
-		}
-		ceph_fscache_write_to_cache(inode, ceph_wbc.offset,
-					    ceph_wbc.len, caching);
-
-		if (ceph_wbc.size_stable) {
-			ceph_wbc.len = min(ceph_wbc.len,
-					    ceph_wbc.i_size - ceph_wbc.offset);
-		} else if (i == ceph_wbc.locked_pages) {
-			/* writepages_finish() clears writeback pages
-			 * according to the data length, so make sure
-			 * data length covers all locked pages */
-			u64 min_len = ceph_wbc.len + 1 - thp_size(page);
-			ceph_wbc.len =
-				get_writepages_data_length(inode,
-							ceph_wbc.pages[i - 1],
-							ceph_wbc.offset);
-			ceph_wbc.len = max(ceph_wbc.len, min_len);
-		}
-		if (IS_ENCRYPTED(inode)) {
-			ceph_wbc.len = round_up(ceph_wbc.len,
-						CEPH_FSCRYPT_BLOCK_SIZE);
-		}
 
-		doutc(cl, "got pages at %llu~%llu\n",
-			ceph_wbc.offset, ceph_wbc.len);
+		ceph_wbc.locked_pages = 0;
+		ceph_wbc.strip_unit_end = 0;
 
-		if (IS_ENCRYPTED(inode) &&
-		    ((ceph_wbc.offset | ceph_wbc.len) & ~CEPH_FSCRYPT_BLOCK_MASK))
-			pr_warn_client(cl,
-				"bad encrypted write offset=%lld len=%llu\n",
-				ceph_wbc.offset, ceph_wbc.len);
-
-		osd_req_op_extent_osd_data_pages(req, ceph_wbc.op_idx,
-						 ceph_wbc.data_pages,
-						 ceph_wbc.len,
-						 0, ceph_wbc.from_pool, false);
-		osd_req_op_extent_update(req, ceph_wbc.op_idx, ceph_wbc.len);
-
-		BUG_ON(ceph_wbc.op_idx + 1 != req->r_num_ops);
-
-		ceph_wbc.from_pool = false;
-		if (i < ceph_wbc.locked_pages) {
-			BUG_ON(ceph_wbc.num_ops <= req->r_num_ops);
-			ceph_wbc.num_ops -= req->r_num_ops;
-			ceph_wbc.locked_pages -= i;
-
-			/* allocate new pages array for next request */
-			ceph_wbc.data_pages = ceph_wbc.pages;
-			ceph_wbc.pages = kmalloc_array(ceph_wbc.locked_pages,
-							sizeof(*ceph_wbc.pages),
-							GFP_NOFS);
-			if (!ceph_wbc.pages) {
-				ceph_wbc.from_pool = true;
-				ceph_wbc.pages =
-					mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
-				BUG_ON(!ceph_wbc.pages);
-			}
-			memcpy(ceph_wbc.pages, ceph_wbc.data_pages + i,
-			       ceph_wbc.locked_pages * sizeof(*ceph_wbc.pages));
-			memset(ceph_wbc.data_pages + i, 0,
-			       ceph_wbc.locked_pages * sizeof(*ceph_wbc.pages));
-		} else {
-			BUG_ON(ceph_wbc.num_ops != req->r_num_ops);
-			ceph_wbc.index = ceph_wbc.pages[i - 1]->index + 1;
-			/* request message now owns the pages array */
-			ceph_wbc.pages = NULL;
+		if (folio_batch_count(&ceph_wbc.fbatch) > 0) {
+			ceph_wbc.nr_folios =
+				folio_batch_count(&ceph_wbc.fbatch);
+			goto process_folio_batch;
 		}
 
-		req->r_mtime = inode_get_mtime(inode);
-		ceph_osdc_start_request(&fsc->client->osdc, req);
-		req = NULL;
-
-		wbc->nr_to_write -= i;
-		if (ceph_wbc.pages)
-			goto new_request;
-
 		/*
 		 * We stop writing back only if we are not doing
 		 * integrity sync. In case of integrity sync we have to
@@ -1666,32 +1741,12 @@  static int ceph_writepages_start(struct address_space *mapping,
 	if (ceph_wbc.should_loop && !ceph_wbc.done) {
 		/* more to do; loop back to beginning of file */
 		doutc(cl, "looping back to beginning of file\n");
-		ceph_wbc.end = ceph_wbc.start_index - 1; /* OK even when start_index == 0 */
+		/* OK even when start_index == 0 */
+		ceph_wbc.end = ceph_wbc.start_index - 1;
 
 		/* to write dirty pages associated with next snapc,
 		 * we need to wait until current writes complete */
-		if (wbc->sync_mode != WB_SYNC_NONE &&
-		    ceph_wbc.start_index == 0 && /* all dirty pages were checked */
-		    !ceph_wbc.head_snapc) {
-			struct page *page;
-			unsigned i, nr;
-			ceph_wbc.index = 0;
-			while ((ceph_wbc.index <= ceph_wbc.end) &&
-			       (nr = filemap_get_folios_tag(mapping,
-						&ceph_wbc.index,
-						(pgoff_t)-1,
-						PAGECACHE_TAG_WRITEBACK,
-						&ceph_wbc.fbatch))) {
-				for (i = 0; i < nr; i++) {
-					page = &ceph_wbc.fbatch.folios[i]->page;
-					if (page_snap_context(page) != ceph_wbc.snapc)
-						continue;
-					wait_on_page_writeback(page);
-				}
-				folio_batch_release(&ceph_wbc.fbatch);
-				cond_resched();
-			}
-		}
+		ceph_wait_until_current_writes_complete(mapping, wbc, &ceph_wbc);
 
 		ceph_wbc.start_index = 0;
 		ceph_wbc.index = 0;
@@ -1702,15 +1757,13 @@  static int ceph_writepages_start(struct address_space *mapping,
 		mapping->writeback_index = ceph_wbc.index;
 
 out:
-	ceph_osdc_put_request(req);
 	ceph_put_snap_context(ceph_wbc.last_snapc);
 	doutc(cl, "%llx.%llx dend - startone, rc = %d\n", ceph_vinop(inode),
 	      rc);
+
 	return rc;
 }
 
-
-
 /*
  * See if a given @snapc is either writeable, or already written.
  */