@@ -2434,6 +2434,30 @@ void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
/*
+ * We can end up releasing caps as a result of abort_request().
+ * In that case, we probably want to ensure that the cap release message
+ * has an updated epoch barrier in it, so set the epoch barrier prior to
+ * aborting the first request.
+ */
+static int abort_on_full_fn(struct ceph_osd_request *req, void *arg)
+{
+ struct ceph_osd_client *osdc = req->r_osdc;
+ bool *victims = arg;
+
+ if (req->r_abort_on_full &&
+ (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
+ pool_full(osdc, req->r_t.target_oloc.pool))) {
+ if (!*victims) {
+ update_epoch_barrier(osdc, osdc->osdmap->epoch);
+ *victims = true;
+ }
+ abort_request(req, -ENOSPC);
+ }
+
+ return 0; /* continue iteration */
+}
+
+/*
* Drop all pending requests that are stalled waiting on a full condition to
* clear, and complete them with ENOSPC as the return code. Set the
* osdc->epoch_barrier to the latest map epoch that we've seen if any were
@@ -2441,61 +2465,10 @@ EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
*/
static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
{
- struct rb_node *n;
bool victims = false;
- dout("enter abort_on_full\n");
-
- if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
- goto out;
-
- /* Scan list and see if there is anything to abort */
- for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
- struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
- struct rb_node *m;
-
- m = rb_first(&osd->o_requests);
- while (m) {
- struct ceph_osd_request *req = rb_entry(m,
- struct ceph_osd_request, r_node);
- m = rb_next(m);
-
- if (req->r_abort_on_full) {
- victims = true;
- break;
- }
- }
- if (victims)
- break;
- }
-
- if (!victims)
- goto out;
-
- /*
- * Update the barrier to current epoch if it's behind that point,
- * since we know we have some calls to be aborted in the tree.
- */
- update_epoch_barrier(osdc, osdc->osdmap->epoch);
-
- for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
- struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
- struct rb_node *m;
-
- m = rb_first(&osd->o_requests);
- while (m) {
- struct ceph_osd_request *req = rb_entry(m,
- struct ceph_osd_request, r_node);
- m = rb_next(m);
-
- if (req->r_abort_on_full &&
- (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
- pool_full(osdc, req->r_t.target_oloc.pool)))
- abort_request(req, -ENOSPC);
- }
- }
-out:
- dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
+ if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc))
+ for_each_request(osdc, abort_on_full_fn, &victims);
}
static void check_pool_dne(struct ceph_osd_request *req)
Scanning the trees just to see if there is anything to abort is unnecessary -- all that is needed here is to update the epoch barrier first, before we start aborting. Simplify and do the update inside the loop before calling abort_request() for the first time. The switch to for_each_request() also fixes a bug: homeless requests weren't even considered for aborting. Signed-off-by: Ilya Dryomov <idryomov@gmail.com> --- net/ceph/osd_client.c | 79 +++++++++++++++++---------------------------------- 1 file changed, 26 insertions(+), 53 deletions(-)