@@ -649,13 +649,10 @@ struct ldlm_lock {
*/
struct portals_handle l_handle;
/**
- * Internal spinlock protects l_resource. We should hold this lock
- * first before taking res_lock.
- */
- spinlock_t l_lock;
- /**
* Pointer to actual resource this lock is in.
- * ldlm_lock_change_resource() can change this.
+ * ldlm_lock_change_resource() can change this on the client.
+ * When this is possible, RCU must be used to stabilise
+ * the resource while we lock it and check it hasn't been changed.
*/
struct ldlm_resource *l_resource;
/**
@@ -889,9 +886,13 @@ struct ldlm_resource {
/**
* List item for list in namespace hash.
- * protected by ns_lock
+ * protected by ns_lock.
+ * Shares storage (via a union) with the rcu_head used for RCU-delayed free.
*/
- struct hlist_node lr_hash;
+ union {
+ struct hlist_node lr_hash;
+ struct rcu_head lr_rcu;
+ };
/** Reference count for this resource */
atomic_t lr_refcount;
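
For readers unfamiliar with the idiom: once a resource has been unhashed, its hash
linkage is dead storage, so the rcu_head for the delayed free can overlay it and the
structure does not grow. A minimal, generic sketch of the same pattern, with
hypothetical names rather than the LDLM types:

#include <linux/list.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

static DEFINE_SPINLOCK(table_lock);
static HLIST_HEAD(table);

struct item {
        union {
                struct hlist_node hash; /* linkage while the item is hashed */
                struct rcu_head rcu;    /* reused once the item is unhashed */
        };
        int key;
};

static void item_insert(struct item *it)
{
        spin_lock(&table_lock);
        hlist_add_head(&it->hash, &table);
        spin_unlock(&table_lock);
}

static void item_release(struct item *it)
{
        spin_lock(&table_lock);
        hlist_del(&it->hash);   /* table walks happen under table_lock,
                                 * so the linkage is dead from here on */
        spin_unlock(&table_lock);
        kfree_rcu(it, rcu);     /* RCU readers may still hold a bare pointer
                                 * to the item, so the free must be deferred */
}
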
@@ -41,19 +41,25 @@
*
* LDLM locking uses resource to serialize access to locks
* but there is a case when we change resource of lock upon
- * enqueue reply. We rely on lock->l_resource = new_res
+ * enqueue reply. We rely on rcu_assign_pointer(lock->l_resource, new_res)
* being an atomic operation.
*/
struct ldlm_resource *lock_res_and_lock(struct ldlm_lock *lock)
- __acquires(&lock->l_lock)
- __acquires(&lock->l_resource->lr_lock)
+__acquires(&lock->l_resource->lr_lock)
{
- spin_lock(&lock->l_lock);
+ struct ldlm_resource *res;
- lock_res(lock->l_resource);
-
- ldlm_set_res_locked(lock);
- return lock->l_resource;
+ rcu_read_lock();
+ while (1) {
+ res = rcu_dereference(lock->l_resource);
+ lock_res(res);
+ if (res == lock->l_resource) {
+ ldlm_set_res_locked(lock);
+ rcu_read_unlock();
+ return res;
+ }
+ unlock_res(res);
+ }
}
EXPORT_SYMBOL(lock_res_and_lock);
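
The loop above is an instance of a general RCU pattern: dereference a published
pointer, take the lock embedded in the object it points to, then re-check that the
pointer still refers to the same object; if not, the object was switched (and
possibly freed and reused) underneath us, so drop the lock and retry. A stripped-down
sketch of that pattern with hypothetical obj/res types, not the LDLM API:

#include <linux/rcupdate.h>
#include <linux/spinlock.h>

struct res {
        spinlock_t lock;
};

struct obj {
        struct res __rcu *res;  /* may be repointed by a writer */
};

static struct res *obj_lock_res(struct obj *o)
{
        struct res *r;

        rcu_read_lock();
        for (;;) {
                r = rcu_dereference(o->res);
                spin_lock(&r->lock);
                /* Re-check under the lock: o->res may have been switched
                 * (and r freed or reused) between the dereference and the
                 * lock acquisition. */
                if (r == rcu_access_pointer(o->res))
                        break;
                spin_unlock(&r->lock);
        }
        /* Safe to leave the RCU section: a writer must hold r->lock to
         * repoint o->res away from r, and (as in the LDLM case) the
         * object is assumed to hold its own reference on r. */
        rcu_read_unlock();
        return r;
}
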
@@ -61,13 +67,10 @@ struct ldlm_resource *lock_res_and_lock(struct ldlm_lock *lock)
* Unlock a lock and its resource previously locked with lock_res_and_lock
*/
void unlock_res_and_lock(struct ldlm_lock *lock)
- __releases(&lock->l_resource->lr_lock)
- __releases(&lock->l_lock)
+__releases(&lock->l_resource->lr_lock)
{
- /* on server-side resource of lock doesn't change */
ldlm_clear_res_locked(lock);
unlock_res(lock->l_resource);
- spin_unlock(&lock->l_lock);
}
EXPORT_SYMBOL(unlock_res_and_lock);
@@ -385,8 +385,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
if (!lock)
return NULL;
- spin_lock_init(&lock->l_lock);
- lock->l_resource = resource;
+ RCU_INIT_POINTER(lock->l_resource, resource);
lu_ref_add(&resource->lr_reference, "lock", lock);
refcount_set(&lock->l_handle.h_ref, 2);
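
RCU_INIT_POINTER() is enough here because the freshly allocated lock is not yet
visible to any RCU reader, so no publish barrier is required; rcu_assign_pointer()
is only needed once readers can already reach the pointer, as in
ldlm_lock_change_resource() below. A hedged, generic illustration of the
distinction (hypothetical names, not the LDLM API):

#include <linux/rcupdate.h>
#include <linux/slab.h>

struct res;

struct obj {
        struct res __rcu *res;
};

/* Object under construction: no RCU reader can see it yet, so a plain
 * initialisation of the __rcu pointer is sufficient. */
static struct obj *obj_new(struct res *r, gfp_t gfp)
{
        struct obj *o = kzalloc(sizeof(*o), gfp);

        if (o)
                RCU_INIT_POINTER(o->res, r);
        return o;
}

/* Object already published: readers may be dereferencing o->res, so the
 * switch needs the ordering guarantees of rcu_assign_pointer(). */
static void obj_repoint(struct obj *o, struct res *new_res)
{
        rcu_assign_pointer(o->res, new_res);
}
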
@@ -455,12 +454,13 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
lu_ref_add(&newres->lr_reference, "lock", lock);
/*
- * To flip the lock from the old to the new resource, lock, oldres and
- * newres have to be locked. Resource spin-locks are nested within
- * lock->l_lock, and are taken in the memory address order to avoid
- * dead-locks.
+ * To flip the lock from the old to the new resource, oldres
+ * and newres have to be locked. Resource spin-locks are taken
+ * in memory address order to avoid deadlocks.
+ * As this is the only place where ->l_resource can change,
+ * and it cannot race with itself, it is safe to read
+ * lock->l_resource here without any extra locking.
*/
- spin_lock(&lock->l_lock);
oldres = lock->l_resource;
if (oldres < newres) {
lock_res(oldres);
@@ -471,9 +471,9 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
}
LASSERT(memcmp(new_resid, &oldres->lr_name,
sizeof(oldres->lr_name)) != 0);
- lock->l_resource = newres;
+ rcu_assign_pointer(lock->l_resource, newres);
unlock_res(oldres);
- unlock_res_and_lock(lock);
+ unlock_res(newres);
/* ...and the flowers are still standing! */
lu_ref_del(&oldres->lr_reference, "lock", lock);
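
Taking the two resource spin-locks in memory-address order is the usual way of
locking a pair of same-class locks that have no natural hierarchy: every path that
needs both takes them in the same global order, so no ABBA cycle can form. A generic
sketch of the idiom, with a hypothetical lock_two_res() helper that is not part of
the LDLM API:

#include <linux/lockdep.h>
#include <linux/spinlock.h>

struct res {
        spinlock_t lr_lock;
};

/* Lock two resources of the same class. Address order gives every caller
 * the same global ordering, so an ABBA deadlock cannot form; the _nested
 * annotation only keeps lockdep happy about nesting two locks of the
 * same class. */
static void lock_two_res(struct res *a, struct res *b)
{
        if (a < b) {
                spin_lock(&a->lr_lock);
                spin_lock_nested(&b->lr_lock, SINGLE_DEPTH_NESTING);
        } else {
                spin_lock(&b->lr_lock);
                spin_lock_nested(&a->lr_lock, SINGLE_DEPTH_NESTING);
        }
}

static void unlock_two_res(struct res *a, struct res *b)
{
        spin_unlock(&a->lr_lock);
        spin_unlock(&b->lr_lock);
}
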
@@ -1875,11 +1875,11 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
va_list args;
struct va_format vaf;
- if (spin_trylock(&lock->l_lock)) {
- if (lock->l_resource)
- resource = ldlm_resource_getref(lock->l_resource);
- spin_unlock(&lock->l_lock);
- }
+ rcu_read_lock();
+ resource = rcu_dereference(lock->l_resource);
+ if (resource && !atomic_inc_not_zero(&resource->lr_refcount))
+ resource = NULL;
+ rcu_read_unlock();
va_start(args, fmt);
vaf.fmt = fmt;
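
With l_lock gone, the debug path pins the resource the RCU way: look the pointer up
under rcu_read_lock() and keep it only if a reference can still be taken, i.e. the
refcount has not already dropped to zero. The same pattern in generic form
(hypothetical names):

#include <linux/atomic.h>
#include <linux/rcupdate.h>

struct res {
        atomic_t refcount;
};

struct obj {
        struct res __rcu *res;
};

/* Opportunistically take a reference on o->res; returns NULL if the
 * resource is already being torn down (refcount has reached zero). */
static struct res *obj_tryget_res(struct obj *o)
{
        struct res *r;

        rcu_read_lock();
        r = rcu_dereference(o->res);
        if (r && !atomic_inc_not_zero(&r->refcount))
                r = NULL;
        rcu_read_unlock();
        return r;
}
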
@@ -1208,6 +1208,23 @@ static int ldlm_cleanup(void)
return 0;
}
+void ldlm_resource_init_once(void *p)
+{
+ /*
+ * It is important to initialise the spinlock only once,
+ * as ldlm_lock_change_resource() could try to lock
+ * the resource *after* it has been freed and possibly
+ * reused. SLAB_TYPESAFE_BY_RCU ensures the memory won't
+ * be freed while the lock is being taken, but we need to
+ * ensure that it doesn't get reinitialised either.
+ */
+ struct ldlm_resource *res = p;
+
+ memset(res, 0, sizeof(*res));
+ mutex_init(&res->lr_lvb_mutex);
+ spin_lock_init(&res->lr_lock);
+}
+
int ldlm_init(void)
{
mutex_init(&ldlm_ref_mutex);
@@ -1215,7 +1232,9 @@ int ldlm_init(void)
mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
ldlm_resource_slab = kmem_cache_create("ldlm_resources",
sizeof(struct ldlm_resource), 0,
- SLAB_HWCACHE_ALIGN, NULL);
+ SLAB_TYPESAFE_BY_RCU |
+ SLAB_HWCACHE_ALIGN,
+ ldlm_resource_init_once);
if (!ldlm_resource_slab)
return -ENOMEM;
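
SLAB_TYPESAFE_BY_RCU only guarantees that the memory keeps its type for an RCU grace
period; an object may be freed and immediately handed out again as a new resource.
That is why the spinlock and mutex are now set up in a one-time constructor (they
must survive reuse untouched), why kmem_cache_zalloc() becomes kmem_cache_alloc()
plus explicit per-allocation resets in ldlm_resource_new(), and why
lock_res_and_lock() re-validates the pointer after taking the lock. A self-contained
sketch of that arrangement for a hypothetical cache, not the LDLM code:

#include <linux/errno.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/string.h>

struct item {
        spinlock_t lock;        /* initialised once, must survive reuse */
        int payload;            /* per-allocation state, reset by hand */
};

static struct kmem_cache *item_cache;

/* Constructor: runs once per slab object, NOT once per allocation. */
static void item_init_once(void *p)
{
        struct item *it = p;

        memset(it, 0, sizeof(*it));
        spin_lock_init(&it->lock);
}

static int item_cache_setup(void)
{
        item_cache = kmem_cache_create("items", sizeof(struct item), 0,
                                       SLAB_TYPESAFE_BY_RCU | SLAB_HWCACHE_ALIGN,
                                       item_init_once);
        return item_cache ? 0 : -ENOMEM;
}

/* No kmem_cache_zalloc(): wiping the whole object would also wipe the
 * lock that a racing RCU reader may be spinning on, so only the
 * per-allocation fields are reset. */
static struct item *item_alloc(gfp_t gfp)
{
        struct item *it = kmem_cache_alloc(item_cache, gfp);

        if (it)
                it->payload = 0;
        return it;
}
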
@@ -1248,6 +1267,7 @@ void ldlm_exit(void)
{
if (ldlm_refcount)
CERROR("ldlm_refcount is %d in %s!\n", ldlm_refcount, __func__);
+ synchronize_rcu();
kmem_cache_destroy(ldlm_resource_slab);
/*
* ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
@@ -1064,12 +1064,14 @@ static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type)
{
struct ldlm_resource *res;
- res = kmem_cache_zalloc(ldlm_resource_slab, GFP_NOFS);
+ res = kmem_cache_alloc(ldlm_resource_slab, GFP_NOFS);
if (!res)
return NULL;
INIT_LIST_HEAD(&res->lr_granted);
INIT_LIST_HEAD(&res->lr_waiting);
+ res->lr_lvb_inode = NULL;
+ res->lr_lvb_len = 0;
if (ldlm_type == LDLM_EXTENT) {
int idx;
@@ -1087,17 +1089,13 @@ static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type)
res->lr_itree[idx].lit_mode = BIT(idx);
res->lr_itree[idx].lit_root = RB_ROOT_CACHED;
}
+ } else {
+ res->lr_itree = NULL;
}
atomic_set(&res->lr_refcount, 1);
- spin_lock_init(&res->lr_lock);
lu_ref_init(&res->lr_reference);
- /* Since LVB init can be delayed now, there is no longer need to
- * immediately acquire mutex here.
- */
- mutex_init(&res->lr_lvb_mutex);
-
return res;
}