@@ -461,7 +461,12 @@ enum lu_object_header_flags {
/**
* Mark this object has already been taken out of cache.
*/
- LU_OBJECT_UNHASHED = 1,
+ LU_OBJECT_UNHASHED = 1,
+ /**
+ * Object is initialized. When the object is found in cache it may not
+ * be initialized yet; the object allocator will initialize it.
+ */
+ LU_OBJECT_INITED = 2
};
enum lu_object_header_attr {
@@ -656,6 +661,14 @@ static inline int lu_object_is_dying(const struct lu_object_header *h)
return test_bit(LU_OBJECT_HEARD_BANSHEE, &h->loh_flags);
}
+/**
+ * Return true if object is initialized.
+ */
+static inline int lu_object_is_inited(const struct lu_object_header *h)
+{
+ return test_bit(LU_OBJECT_INITED, &h->loh_flags);
+}
+
void lu_object_put(const struct lu_env *env, struct lu_object *o);
void lu_object_unhash(const struct lu_env *env, struct lu_object *o);
int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s, int nr,
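For reference, a minimal sketch (not part of the patch) of how a lookup path is expected to use the new helper together with the per-bucket wait-queue added below; wait_for_init is a hypothetical name and the bucket pointer is assumed to be available, the real logic is open-coded in lu_object_find_at():

static struct lu_object *wait_for_init(const struct lu_env *env,
				       struct lu_site_bkt_data *bkt,
				       struct lu_object *o)
{
	if (likely(lu_object_is_inited(o->lo_header)))
		return o;		/* allocator already finished */

	/* sleep until lu_object_start() sets LU_OBJECT_INITED, or the
	 * allocator fails and the object is marked dying */
	wait_event_idle(bkt->lsb_waitq,
			lu_object_is_inited(o->lo_header) ||
			lu_object_is_dying(o->lo_header));

	if (lu_object_is_dying(o->lo_header)) {
		lu_object_put(env, o);
		return ERR_PTR(-ENOENT);
	}
	return o;
}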
@@ -371,6 +371,7 @@
#define OBD_FAIL_OBD_IDX_READ_BREAK 0x608
#define OBD_FAIL_OBD_NO_LRU 0x609
#define OBD_FAIL_OBDCLASS_MODULE_LOAD 0x60a
+#define OBD_FAIL_OBD_ZERO_NLINK_RACE 0x60b
#define OBD_FAIL_TGT_REPLY_NET 0x700
#define OBD_FAIL_TGT_CONN_RACE 0x701
@@ -67,13 +67,14 @@ struct lu_site_bkt_data {
struct list_head lsb_lru;
/**
* Wait-queue signaled when an object in this site is ultimately
- * destroyed (lu_object_free()). It is used by lu_object_find() to
- * wait before re-trying when object in the process of destruction is
- * found in the hash table.
+ * destroyed (lu_object_free()) or initialized (lu_object_start()).
+ * It is used by lu_object_find() to wait before re-trying when an
+ * object in the process of destruction is found in the hash table,
+ * or to wait for the object to be initialized by the allocator.
*
* \see htable_lookup().
*/
- wait_queue_head_t lsb_marche_funebre;
+ wait_queue_head_t lsb_waitq;
};
enum {
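A minimal sketch of the signalling side, assuming the lsb_waitq field above: both the allocation path and the final put wake every waiter on the bucket and let them re-check the INITED/HEARD_BANSHEE bits themselves (lsb_signal is a made-up helper; the patch calls wake_up_all() directly):

static inline void lsb_signal(struct lu_site_bkt_data *bkt)
{
	/* waiters re-check lu_object_is_inited()/lu_object_is_dying() */
	wake_up_all(&bkt->lsb_waitq);
}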
@@ -116,7 +117,7 @@ enum {
cfs_hash_bd_get(site->ls_obj_hash, fid, &bd);
bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
- return &bkt->lsb_marche_funebre;
+ return &bkt->lsb_waitq;
}
EXPORT_SYMBOL(lu_site_wq_from_fid);
@@ -168,7 +169,7 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
* somebody may be waiting for this, currently only
* used for cl_object, see cl_object_put_last().
*/
- wake_up_all(&bkt->lsb_marche_funebre);
+ wake_up_all(&bkt->lsb_waitq);
}
return;
}
@@ -255,16 +256,9 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
*/
static struct lu_object *lu_object_alloc(const struct lu_env *env,
struct lu_device *dev,
- const struct lu_fid *f,
- const struct lu_object_conf *conf)
+ const struct lu_fid *f)
{
- struct lu_object *scan;
struct lu_object *top;
- struct list_head *layers;
- unsigned int init_mask = 0;
- unsigned int init_flag;
- int clean;
- int result;
/*
* Create top-level object slice. This will also create
@@ -280,6 +274,27 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
* after this point.
*/
top->lo_header->loh_fid = *f;
+
+ return top;
+}
+
+/**
+ * Initialize object.
+ *
+ * This is called after object hash insertion to avoid returning an object with
+ * stale attributes.
+ */
+static int lu_object_start(const struct lu_env *env, struct lu_device *dev,
+ struct lu_object *top,
+ const struct lu_object_conf *conf)
+{
+ struct lu_object *scan;
+ struct list_head *layers;
+ unsigned int init_mask = 0;
+ unsigned int init_flag;
+ int clean;
+ int result;
+
layers = &top->lo_header->loh_layers;
do {
@@ -295,10 +310,9 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
clean = 0;
scan->lo_header = top->lo_header;
result = scan->lo_ops->loo_object_init(env, scan, conf);
- if (result != 0) {
- lu_object_free(env, top);
- return ERR_PTR(result);
- }
+ if (result)
+ return result;
+
init_mask |= init_flag;
next:
init_flag <<= 1;
@@ -308,15 +322,16 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
list_for_each_entry_reverse(scan, layers, lo_linkage) {
if (scan->lo_ops->loo_object_start) {
result = scan->lo_ops->loo_object_start(env, scan);
- if (result != 0) {
- lu_object_free(env, top);
- return ERR_PTR(result);
- }
+ if (result)
+ return result;
}
}
lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
- return top;
+
+ set_bit(LU_OBJECT_INITED, &top->lo_header->loh_flags);
+
+ return 0;
}
/**
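To make the new split explicit, here is a hedged sketch of a no-op layer hook of the kind lu_object_start() now invokes bottom-up after hash insertion; foo_object_start is an invented name, real layers do device-specific work (attribute loading, etc.) here:

static int foo_object_start(const struct lu_env *env, struct lu_object *o)
{
	/* per-layer start work would go here; returning 0 lets
	 * lu_object_start() set LU_OBJECT_INITED and wake waiters */
	return 0;
}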
@@ -598,7 +613,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
const struct lu_fid *f,
u64 *version)
{
- struct lu_site_bkt_data *bkt;
struct lu_object_header *h;
struct hlist_node *hnode;
u64 ver = cfs_hash_bd_version_get(bd);
@@ -607,7 +621,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
return ERR_PTR(-ENOENT);
*version = ver;
- bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
/* cfs_hash_bd_peek_locked is a somehow "internal" function
* of cfs_hash, it doesn't add refcount on object.
*/
@@ -681,7 +694,9 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
struct lu_site *s;
struct cfs_hash *hs;
struct cfs_hash_bd bd;
+ struct lu_site_bkt_data *bkt;
u64 version = 0;
+ int rc;
/*
* This uses standard index maintenance protocol:
@@ -703,26 +718,50 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
*/
s = dev->ld_site;
hs = s->ls_obj_hash;
+ if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE)))
+ lu_site_purge(env, s, -1);
cfs_hash_bd_get(hs, f, &bd);
+ bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
if (!(conf && conf->loc_flags & LOC_F_NEW)) {
cfs_hash_bd_lock(hs, &bd, 1);
o = htable_lookup(s, &bd, f, &version);
cfs_hash_bd_unlock(hs, &bd, 1);
- if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
+ if (!IS_ERR(o)) {
+ if (likely(lu_object_is_inited(o->lo_header)))
+ return o;
+
+ wait_event_idle(bkt->lsb_waitq,
+ lu_object_is_inited(o->lo_header) ||
+ lu_object_is_dying(o->lo_header));
+
+ if (lu_object_is_dying(o->lo_header)) {
+ lu_object_put(env, o);
+
+ return ERR_PTR(-ENOENT);
+ }
+
+ return o;
+ }
+
+ if (PTR_ERR(o) != -ENOENT)
return o;
}
+
/*
- * Allocate new object. This may result in rather complicated
- * operations, including fld queries, inode loading, etc.
+ * Allocate new object. NB: the object is left uninitialized here; if
+ * it were initialized now and the on-disk object changed between
+ * allocation and hash insertion, an object with stale attributes
+ * would be returned.
*/
- o = lu_object_alloc(env, dev, f, conf);
+ o = lu_object_alloc(env, dev, f);
if (IS_ERR(o))
return o;
LASSERT(lu_fid_eq(lu_object_fid(o), f));
+ CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE);
+
cfs_hash_bd_lock(hs, &bd, 1);
if (conf && conf->loc_flags & LOC_F_NEW)
@@ -733,6 +772,20 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
cfs_hash_bd_unlock(hs, &bd, 1);
+ /*
+ * This may result in rather complicated operations, including
+ * fld queries, inode loading, etc.
+ */
+ rc = lu_object_start(env, dev, o, conf);
+ if (rc) {
+ set_bit(LU_OBJECT_HEARD_BANSHEE,
+ &o->lo_header->loh_flags);
+ lu_object_put(env, o);
+ return ERR_PTR(rc);
+ }
+
+ wake_up_all(&bkt->lsb_waitq);
+
lu_object_limit(env, dev);
return o;
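Caller-side view, as a hedged sketch: lu_object_find_at() may now block in wait_event_idle() until a concurrently allocated object finishes lu_object_start(), so callers must be able to sleep; foo_lookup and its arguments are purely illustrative:

static struct lu_object *foo_lookup(const struct lu_env *env,
				    struct lu_device *dev,
				    const struct lu_fid *fid)
{
	struct lu_object *o;

	might_sleep();	/* may wait on lsb_waitq for initialization */

	o = lu_object_find_at(env, dev, fid, NULL);
	/* PTR_ERR(o) == -ENOENT here can mean the cached object died
	 * while we were waiting for it to be initialized */
	return o;
}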
@@ -741,6 +794,20 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
cfs_hash_bd_unlock(hs, &bd, 1);
lu_object_free(env, o);
+
+ if (!(conf && conf->loc_flags & LOC_F_NEW) &&
+ !lu_object_is_inited(shadow->lo_header)) {
+ wait_event_idle(bkt->lsb_waitq,
+ lu_object_is_inited(shadow->lo_header) ||
+ lu_object_is_dying(shadow->lo_header));
+
+ if (lu_object_is_dying(shadow->lo_header)) {
+ lu_object_put(env, shadow);
+
+ return ERR_PTR(-ENOENT);
+ }
+ }
+
return shadow;
}
EXPORT_SYMBOL(lu_object_find_at);
@@ -998,7 +1065,7 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
INIT_LIST_HEAD(&bkt->lsb_lru);
- init_waitqueue_head(&bkt->lsb_marche_funebre);
+ init_waitqueue_head(&bkt->lsb_waitq);
}
s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
@@ -187,7 +187,7 @@ static inline void cfs_race(u32 id)
CERROR("cfs_race id %x sleeping\n", id);
rc = wait_event_interruptible(cfs_race_waitq,
!!cfs_race_state);
- CERROR("cfs_fail_race id %x awake, rc=%d\n", id, rc);
+ CERROR("cfs_fail_race id %x awake: rc=%d\n", id, rc);
} else {
CERROR("cfs_fail_race id %x waking\n", id);
cfs_race_state = 1;
@@ -198,4 +198,42 @@ static inline void cfs_race(u32 id)
#define CFS_RACE(id) cfs_race(id)
+/**
+ * Wait on race.
+ *
+ * The first thread that calls this with a matching fail_loc is put to sleep,
+ * but subsequent callers won't sleep. When another thread calls
+ * cfs_race_wakeup(), the first thread will be woken up and continue.
+ */
+static inline void cfs_race_wait(u32 id)
+{
+ if (CFS_FAIL_PRECHECK(id)) {
+ if (unlikely(__cfs_fail_check_set(id, 0, CFS_FAIL_LOC_NOSET))) {
+ int rc;
+
+ cfs_race_state = 0;
+ CERROR("cfs_race id %x sleeping\n", id);
+ rc = wait_event_interruptible(cfs_race_waitq,
+ cfs_race_state != 0);
+ CERROR("cfs_fail_race id %x awake: rc=%d\n", id, rc);
+ }
+ }
+}
+#define CFS_RACE_WAIT(id) cfs_race_wait(id)
+
+/**
+ * Wake up the thread that is waiting on the matching fail_loc.
+ */
+static inline void cfs_race_wakeup(u32 id)
+{
+ if (CFS_FAIL_PRECHECK(id)) {
+ if (likely(!__cfs_fail_check_set(id, 0, CFS_FAIL_LOC_NOSET))) {
+ CERROR("cfs_fail_race id %x waking\n", id);
+ cfs_race_state = 1;
+ wake_up(&cfs_race_waitq);
+ }
+ }
+}
+#define CFS_RACE_WAKEUP(id) cfs_race_wakeup(id)
+
#endif /* _LIBCFS_FAIL_H */
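A hedged usage sketch of the new pair, mirroring how this patch arms OBD_FAIL_OBD_ZERO_NLINK_RACE in lu_object_find_at(); both functions and the exact wakeup call site are invented for illustration (the real CFS_RACE_WAKEUP() caller is not part of these hunks):

/* thread A, e.g. the lookup path: park at the race point */
static void race_thread_a(void)
{
	/* sleeps only when fail_loc matches OBD_FAIL_OBD_ZERO_NLINK_RACE */
	CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE);
	/* ... continue with hash insertion ... */
}

/* thread B: perform the conflicting update, then release thread A */
static void race_thread_b(void)
{
	/* ... e.g. the nlink update the fail-loc name refers to ... */
	CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
}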