@@ -308,6 +308,7 @@
#define OBD_FAIL_LDLM_GRANT_CHECK 0x32a
#define OBD_FAIL_LDLM_LOCAL_CANCEL_PAUSE 0x32c
+#define OBD_FAIL_LDLM_REPLAY_PAUSE 0x32e
/* LOCKLESS IO */
#define OBD_FAIL_LDLM_SET_CONTENTION 0x385
@@ -2220,6 +2220,8 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
"Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
ldlm_ns_name(ns), ns->ns_nr_unused);
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_REPLAY_PAUSE, cfs_fail_val);
+
/*
* We don't need to care whether or not LRU resize is enabled
* because the LDLM_LRU_FLAG_NO_WAIT policy doesn't use the
@@ -204,6 +204,8 @@ static int ll_dom_lock_cancel(struct inode *inode, struct ldlm_lock *lock)
if (IS_ERR(env))
return PTR_ERR(env);
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_REPLAY_PAUSE, cfs_fail_val);
+
/* reach MDC layer to flush data under the DoM ldlm lock */
rc = cl_object_flush(env, lli->lli_clob, lock);
if (rc == -ENODATA) {
@@ -251,6 +251,11 @@ struct lov_mirror_entry {
unsigned short lre_end; /* end index of this mirror */
};
+enum lov_object_flags {
+ /* Layout is invalid, set when layout lock is lost */
+ LO_LAYOUT_INVALID = 0x1,
+};
+
/**
* lov-specific file state.
*
@@ -281,10 +286,9 @@ struct lov_object {
*/
enum lov_layout_type lo_type;
/**
- * True if layout is invalid. This bit is cleared when layout lock
- * is lost.
+ * Object flags.
*/
- bool lo_layout_invalid;
+ unsigned long lo_obj_flags;
/**
* How many IOs are on going on this object. Layout can be changed
* only if there is no active IO.
@@ -177,7 +177,7 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
LASSERT(old_obj);
old_lov = cl2lov(lu2cl(old_obj));
- if (old_lov->lo_layout_invalid) {
+ if (test_bit(LO_LAYOUT_INVALID, &old_lov->lo_obj_flags)) {
/* the object's layout has already changed but isn't
* refreshed
*/
@@ -628,7 +628,7 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
LASSERT(lsm->lsm_entry_count > 0);
LASSERT(!lov->lo_lsm);
lov->lo_lsm = lsm_addref(lsm);
- lov->lo_layout_invalid = true;
+ set_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags);
dump_lsm(D_INODE, lsm);
@@ -910,7 +910,8 @@ static void lov_fini_released(const struct lu_env *env, struct lov_object *lov,
static int lov_print_empty(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct lu_object *o)
{
- (*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid);
+ (*p)(env, cookie, "empty %d\n",
+ test_bit(LO_LAYOUT_INVALID, &lu2lov(o)->lo_obj_flags));
return 0;
}
@@ -923,8 +924,8 @@ static int lov_print_composite(const struct lu_env *env, void *cookie,
(*p)(env, cookie, "entries: %d, %s, lsm{%p 0x%08X %d %u}:\n",
lsm->lsm_entry_count,
- lov->lo_layout_invalid ? "invalid" : "valid", lsm,
- lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
+ test_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags) ? "invalid" :
+ "valid", lsm, lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
lsm->lsm_layout_gen);
for (i = 0; i < lsm->lsm_entry_count; i++) {
@@ -953,8 +954,8 @@ static int lov_print_released(const struct lu_env *env, void *cookie,
(*p)(env, cookie,
"released: %s, lsm{%p 0x%08X %d %u}:\n",
- lov->lo_layout_invalid ? "invalid" : "valid", lsm,
- lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
+ test_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags) ? "invalid" :
+ "valid", lsm, lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
lsm->lsm_layout_gen);
return 0;
}
@@ -967,7 +968,8 @@ static int lov_print_foreign(const struct lu_env *env, void *cookie,
(*p)(env, cookie,
"foreign: %s, lsm{%p 0x%08X %d %u}:\n",
- lov->lo_layout_invalid ? "invalid" : "valid", lsm,
+ test_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags) ?
+ "invalid" : "valid", lsm,
lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
lsm->lsm_layout_gen);
(*p)(env, cookie,
@@ -1352,15 +1354,15 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
dump_lsm(D_INODE, lsm);
}
- lov_conf_lock(lov);
if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
- lov->lo_layout_invalid = true;
+ set_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags);
result = 0;
- goto out;
+ goto out_lsm;
}
+ lov_conf_lock(lov);
if (conf->coc_opc == OBJECT_CONF_WAIT) {
- if (lov->lo_layout_invalid &&
+ if (test_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags) &&
atomic_read(&lov->lo_active_ios) > 0) {
lov_conf_unlock(lov);
result = lov_layout_wait(env, lov);
@@ -1378,26 +1380,31 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
(lov->lo_lsm->lsm_entries[0]->lsme_pattern ==
lsm->lsm_entries[0]->lsme_pattern))) {
/* same version of layout */
- lov->lo_layout_invalid = false;
+ clear_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags);
result = 0;
goto out;
}
/* will change layout - check if there still exists active IO. */
if (atomic_read(&lov->lo_active_ios) > 0) {
- lov->lo_layout_invalid = true;
+ set_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags);
result = -EBUSY;
goto out;
}
result = lov_layout_change(env, lov, lsm, conf);
- lov->lo_layout_invalid = result != 0;
+ if (result)
+ set_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags);
+ else
+ clear_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags);
out:
lov_conf_unlock(lov);
+out_lsm:
lov_lsm_put(lsm);
- CDEBUG(D_INODE, DFID " lo_layout_invalid=%d\n",
- PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
+ CDEBUG(D_INODE, DFID " lo_layout_invalid=%u\n",
+ PFID(lu_object_fid(lov2lu(lov))),
+ test_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags));
return result;
}
@@ -2254,7 +2261,8 @@ static struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
lsm = lsm_addref(lov->lo_lsm);
CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
lsm, atomic_read(&lsm->lsm_refc),
- lov->lo_layout_invalid, current);
+ test_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags),
+ current);
}
lov_conf_thaw(lov);
return lsm;