@@ -855,6 +855,7 @@ enum ldlm_match_flags {
LDLM_MATCH_AST = BIT(1),
LDLM_MATCH_AST_ANY = BIT(2),
LDLM_MATCH_RIGHT = BIT(3),
+ LDLM_MATCH_GROUP = BIT(4),
};
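
LDLM_MATCH_GROUP extends the client-side lmd_match bit set; unlike the wire-visible LDLM_FL_* flags, these bits only change how the local matcher treats candidate locks. With the new bit, a group-lock enqueuer asks the matcher to hand back even a CBPENDING group lock, pinned, so the caller can wait for it to be destroyed before enqueueing a fresh one. A hedged sketch of the matcher-side test, condensed from the lock_matches() hunk further down (data is the struct ldlm_match_data in use there):

	/* sketch only: the dying group lock is pinned for the waiter
	 * instead of being skipped as an ordinary CBPENDING lock */
	if (ldlm_is_cbpending(lock) && (data->lmd_match & LDLM_MATCH_GROUP)) {
		LDLM_LOCK_GET(lock);	/* pin; caller waits, then retries */
	}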
/**
@@ -319,11 +319,6 @@ struct osc_object {
const struct osc_object_operations *oo_obj_ops;
bool oo_initialized;
-
- wait_queue_head_t oo_group_waitq;
- struct mutex oo_group_mutex;
- u64 oo_group_users;
- unsigned long oo_group_gid;
};
static inline void osc_build_res_name(struct osc_object *osc,
@@ -660,16 +655,6 @@ int osc_object_glimpse(const struct lu_env *env, const struct cl_object *obj,
int osc_object_find_cbdata(const struct lu_env *env, struct cl_object *obj,
ldlm_iterator_t iter, void *data);
int osc_object_prune(const struct lu_env *env, struct cl_object *obj);
-void osc_grouplock_inc_locked(struct osc_object *osc, struct ldlm_lock *lock);
-void osc_grouplock_dec(struct osc_object *osc, struct ldlm_lock *lock);
-int osc_grouplock_enqueue_init(const struct lu_env *env,
- struct osc_object *obj,
- struct osc_lock *oscl,
- struct lustre_handle *lh);
-void osc_grouplock_enqueue_fini(const struct lu_env *env,
- struct osc_object *obj,
- struct osc_lock *oscl,
- struct lustre_handle *lh);
/* osc_request.c */
void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd);
@@ -324,6 +324,7 @@ static int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
return 0;
}
ldlm_set_destroyed(lock);
+ wake_up(&lock->l_waitq);
ldlm_lock_remove_from_lru(lock);
class_handle_unhash(&lock->l_handle);
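
The wake_up() added here is the waker half of the protocol this series builds: ldlm_set_destroyed() publishes the condition before the wakeup, so a thread sleeping in ldlm_lock_match_with_skip() (see below) re-tests it and cannot miss the transition. A self-contained, hedged sketch of the same publish-then-wake shape using generic kernel primitives (the demo_* names are hypothetical; Lustre's l_wait_event_abortable() is a thin wrapper over a killable wait):

	#include <linux/wait.h>

	static DECLARE_WAIT_QUEUE_HEAD(demo_waitq);
	static bool demo_destroyed;

	static void demo_destroy(void)
	{
		WRITE_ONCE(demo_destroyed, true);	/* publish the condition first */
		wake_up(&demo_waitq);			/* then wake any sleeper */
	}

	static int demo_wait(void)
	{
		/* condition is re-tested on every wakeup; returns
		 * -ERESTARTSYS if a fatal signal arrives while sleeping */
		return wait_event_killable(demo_waitq, READ_ONCE(demo_destroyed));
	}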
@@ -1067,10 +1068,12 @@ static bool lock_matches(struct ldlm_lock *lock, void *vdata)
* can still happen.
*/
if (ldlm_is_cbpending(lock) &&
- !(data->lmd_flags & LDLM_FL_CBPENDING))
+ !(data->lmd_flags & LDLM_FL_CBPENDING) &&
+ !(data->lmd_match & LDLM_MATCH_GROUP))
return false;
- if (!(data->lmd_match & LDLM_MATCH_UNREF) && ldlm_is_cbpending(lock) &&
+ if (!(data->lmd_match & (LDLM_MATCH_UNREF | LDLM_MATCH_GROUP)) &&
+ ldlm_is_cbpending(lock) &&
!lock->l_readers && !lock->l_writers)
return false;
@@ -1136,7 +1139,12 @@ static bool lock_matches(struct ldlm_lock *lock, void *vdata)
return false;
matched:
- if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
+ /**
+ * If the lock is a CBPENDING group lock, just pin it and return a
+ * match; the caller must then wait until the lock reaches DESTROYED.
+ */
+ if ((data->lmd_flags & LDLM_FL_TEST_LOCK) ||
+ (ldlm_is_cbpending(lock) && (data->lmd_match & LDLM_MATCH_GROUP))) {
LDLM_LOCK_GET(lock);
ldlm_lock_touch_in_lru(lock);
} else {
@@ -1296,6 +1304,7 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
};
struct ldlm_resource *res;
struct ldlm_lock *lock;
+ struct ldlm_lock *group_lock;
int matched;
if (!ns) {
@@ -1314,6 +1323,8 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
return 0;
}
+repeat:
+ group_lock = NULL;
LDLM_RESOURCE_ADDREF(res);
lock_res(res);
if (res->lr_type == LDLM_EXTENT)
@@ -1323,8 +1334,19 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
if (!lock && !(flags & LDLM_FL_BLOCK_GRANTED))
lock = search_queue(&res->lr_waiting, &data);
matched = lock ? mode : 0;
+
+ if (lock && ldlm_is_cbpending(lock) &&
+ (data.lmd_match & LDLM_MATCH_GROUP))
+ group_lock = lock;
unlock_res(res);
LDLM_RESOURCE_DELREF(res);
+
+ if (group_lock) {
+ l_wait_event_abortable(group_lock->l_waitq,
+ ldlm_is_destroyed(group_lock));
+ LDLM_LOCK_RELEASE(group_lock);
+ goto repeat;
+ }
ldlm_resource_putref(res);
if (lock) {
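
Note the ordering that makes this wait safe: lock_matches() took an extra reference (LDLM_LOCK_GET()) on the CBPENDING group lock before the resource lock was dropped, so l_waitq and the destroyed bit stay valid while sleeping. The return value of l_wait_event_abortable() is not checked; the pin is dropped and the whole match retried either way, since the queues may have changed meanwhile. A hedged, self-contained sketch of the pin -> wait -> release -> retry idiom (illustrative types and names, not Lustre's):

	#include <linux/wait.h>
	#include <linux/kref.h>
	#include <linux/slab.h>

	struct demo_lock {
		wait_queue_head_t	waitq;
		bool			destroyed;
		struct kref		ref;
	};

	static void demo_free(struct kref *ref)
	{
		kfree(container_of(ref, struct demo_lock, ref));
	}

	/* lookup() is assumed to return a pinned reference, or NULL */
	static void demo_drain(struct demo_lock *(*lookup)(void))
	{
		struct demo_lock *dl;

		while ((dl = lookup()) != NULL) {
			wait_event(dl->waitq, READ_ONCE(dl->destroyed));
			kref_put(&dl->ref, demo_free);	/* drop the lookup's pin */
		}
		/* no dying lock is left: a fresh enqueue can proceed */
	}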
@@ -2522,15 +2522,30 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file,
if (ll_file_nolock(file))
return -EOPNOTSUPP;
- read_lock(&lli->lli_lock);
+retry:
+ if (file->f_flags & O_NONBLOCK) {
+ if (!mutex_trylock(&lli->lli_group_mutex))
+ return -EAGAIN;
+ } else {
+ mutex_lock(&lli->lli_group_mutex);
+ }
+
if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
CWARN("group lock already existed with gid %lu\n",
fd->fd_grouplock.lg_gid);
- read_unlock(&lli->lli_lock);
- return -EINVAL;
+ rc = -EINVAL;
+ goto out;
+ }
+ if (arg != lli->lli_group_gid && lli->lli_group_users != 0) {
+ if (file->f_flags & O_NONBLOCK) {
+ rc = -EAGAIN;
+ goto out;
+ }
+ mutex_unlock(&lli->lli_group_mutex);
+ wait_var_event(&lli->lli_group_users, !lli->lli_group_users);
+ rc = 0;
+ goto retry;
}
LASSERT(!fd->fd_grouplock.lg_lock);
- read_unlock(&lli->lli_lock);
/**
* XXX: group lock needs to protect all OST objects while PFL
@@ -2549,8 +2564,10 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file,
u16 refcheck;
env = cl_env_get(&refcheck);
- if (IS_ERR(env))
- return PTR_ERR(env);
+ if (IS_ERR(env)) {
+ rc = PTR_ERR(env);
+ goto out;
+ }
rc = cl_object_layout_get(env, obj, &cl);
if (rc >= 0 && cl.cl_is_composite)
@@ -2559,28 +2576,26 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file,
cl_env_put(env, &refcheck);
if (rc < 0)
- return rc;
+ goto out;
}
rc = cl_get_grouplock(ll_i2info(inode)->lli_clob,
arg, (file->f_flags & O_NONBLOCK), &grouplock);
- if (rc)
- return rc;
- write_lock(&lli->lli_lock);
- if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
- write_unlock(&lli->lli_lock);
- CERROR("another thread just won the race\n");
- cl_put_grouplock(&grouplock);
- return -EINVAL;
- }
+ if (rc)
+ goto out;
fd->fd_flags |= LL_FILE_GROUP_LOCKED;
fd->fd_grouplock = grouplock;
- write_unlock(&lli->lli_lock);
+ if (lli->lli_group_users == 0)
+ lli->lli_group_gid = grouplock.lg_gid;
+ lli->lli_group_users++;
CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
- return 0;
+out:
+ mutex_unlock(&lli->lli_group_mutex);
+
+ return rc;
}
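
ll_get_grouplock() now gates gids at the inode: O_NONBLOCK callers may only trylock and bail out with -EAGAIN, and a blocking caller that finds another gid still in use must release lli_group_mutex before sleeping, because ll_put_grouplock() needs that mutex to drive lli_group_users to zero. Dropping the mutex first cannot lose a wakeup, since wait_var_event() re-tests its condition before sleeping. A hedged, self-contained sketch of the claim/release pair (the demo_* names are hypothetical stand-ins for the lli_group_* fields):

	#include <linux/types.h>
	#include <linux/errno.h>
	#include <linux/mutex.h>
	#include <linux/wait_bit.h>	/* wait_var_event() / wake_up_var() */

	static DEFINE_MUTEX(demo_mutex);
	static u64 demo_users;

	static int demo_claim(bool nonblock)
	{
		if (nonblock) {
			if (!mutex_trylock(&demo_mutex))
				return -EAGAIN;
		} else {
			mutex_lock(&demo_mutex);
		}

		while (READ_ONCE(demo_users)) {		/* someone else holds a claim */
			mutex_unlock(&demo_mutex);	/* let releasers make progress */
			if (nonblock)
				return -EAGAIN;
			wait_var_event(&demo_users, !READ_ONCE(demo_users));
			mutex_lock(&demo_mutex);
		}
		demo_users++;				/* claimed, still under the mutex */
		mutex_unlock(&demo_mutex);
		return 0;
	}

	static void demo_release(void)
	{
		mutex_lock(&demo_mutex);
		if (--demo_users == 0)
			wake_up_var(&demo_users);	/* same key as wait_var_event() */
		mutex_unlock(&demo_mutex);
	}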
static int ll_put_grouplock(struct inode *inode, struct file *file,
@@ -2589,31 +2604,40 @@ static int ll_put_grouplock(struct inode *inode, struct file *file,
struct ll_inode_info *lli = ll_i2info(inode);
struct ll_file_data *fd = file->private_data;
struct ll_grouplock grouplock;
+ int rc;
- write_lock(&lli->lli_lock);
+ mutex_lock(&lli->lli_group_mutex);
if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
- write_unlock(&lli->lli_lock);
CWARN("no group lock held\n");
- return -EINVAL;
+ rc = -EINVAL;
+ goto out;
}
-
LASSERT(fd->fd_grouplock.lg_lock);
if (fd->fd_grouplock.lg_gid != arg) {
CWARN("group lock %lu doesn't match current id %lu\n",
arg, fd->fd_grouplock.lg_gid);
- write_unlock(&lli->lli_lock);
- return -EINVAL;
+ rc = -EINVAL;
+ goto out;
}
grouplock = fd->fd_grouplock;
memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
- write_unlock(&lli->lli_lock);
cl_put_grouplock(&grouplock);
+
+ lli->lli_group_users--;
+ if (lli->lli_group_users == 0) {
+ lli->lli_group_gid = 0;
+ wake_up_var(&lli->lli_group_users);
+ }
CDEBUG(D_INFO, "group lock %lu released\n", arg);
- return 0;
+ rc = 0;
+out:
+ mutex_unlock(&lli->lli_group_mutex);
+
+ return rc;
}
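
For context, these two functions serve the group-lock ioctls, so the user-visible contract is: the same gid joins, a different gid blocks (or fails with -EAGAIN under O_NONBLOCK) until all users are gone. A hedged userspace sketch, assuming the ioctl names from Lustre's lustre_user.h and a client mounted at /mnt/lustre (the path and gid are illustrative; error handling is kept minimal):

	#include <fcntl.h>
	#include <stdio.h>
	#include <unistd.h>
	#include <sys/ioctl.h>
	#include <linux/lustre/lustre_user.h>	/* LL_IOC_GROUP_LOCK/UNLOCK */

	int main(void)
	{
		long gid = 1234;		/* arbitrary non-zero group id */
		int fd = open("/mnt/lustre/testfile", O_RDWR | O_CREAT, 0644);

		if (fd < 0)
			return 1;
		/* with O_NONBLOCK on fd, a busy different-gid lock yields EAGAIN */
		if (ioctl(fd, LL_IOC_GROUP_LOCK, gid))
			perror("LL_IOC_GROUP_LOCK");
		/* ... I/O done under the group lock ... */
		if (ioctl(fd, LL_IOC_GROUP_UNLOCK, gid))
			perror("LL_IOC_GROUP_UNLOCK");
		close(fd);
		return 0;
	}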
/**
@@ -253,6 +253,9 @@ struct ll_inode_info {
u64 lli_pcc_generation;
enum pcc_dataset_flags lli_pcc_dsflags;
struct pcc_inode *lli_pcc_inode;
+ struct mutex lli_group_mutex;
+ u64 lli_group_users;
+ unsigned long lli_group_gid;
u64 lli_attr_valid;
u64 lli_lazysize;
@@ -1194,6 +1194,9 @@ void ll_lli_init(struct ll_inode_info *lli)
lli->lli_pcc_inode = NULL;
lli->lli_pcc_dsflags = PCC_DATASET_INVALID;
lli->lli_pcc_generation = 0;
+ mutex_init(&lli->lli_group_mutex);
+ lli->lli_group_users = 0;
+ lli->lli_group_gid = 0;
}
mutex_init(&lli->lli_layout_mutex);
memset(lli->lli_jobid, 0, sizeof(lli->lli_jobid));
@@ -330,7 +330,6 @@ static int mdc_dlm_canceling(const struct lu_env *env,
*/
if (obj) {
struct cl_attr *attr = &osc_env_info(env)->oti_attr;
- void *data;
/* Destroy pages covered by the extent of the DLM lock */
result = mdc_lock_flush(env, cl2osc(obj), cl_index(obj, 0),
@@ -340,17 +339,12 @@ static int mdc_dlm_canceling(const struct lu_env *env,
*/
/* losing a lock, update kms */
lock_res_and_lock(dlmlock);
- data = dlmlock->l_ast_data;
dlmlock->l_ast_data = NULL;
cl_object_attr_lock(obj);
attr->cat_kms = 0;
cl_object_attr_update(env, obj, attr, CAT_KMS);
cl_object_attr_unlock(obj);
unlock_res_and_lock(dlmlock);
-
- /* Skip dec in case mdc_object_ast_clear() did it */
- if (data && dlmlock->l_req_mode == LCK_GROUP)
- osc_grouplock_dec(cl2osc(obj), dlmlock);
cl_object_put(env, obj);
}
return result;
@@ -457,7 +451,7 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc,
}
static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
- struct lustre_handle *lockh, int errcode)
+ struct lustre_handle *lockh)
{
struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj);
struct ldlm_lock *dlmlock;
@@ -510,9 +504,6 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
LASSERT(oscl->ols_state != OLS_GRANTED);
oscl->ols_state = OLS_GRANTED;
-
- if (errcode != ELDLM_LOCK_MATCHED && dlmlock->l_req_mode == LCK_GROUP)
- osc_grouplock_inc_locked(osc, dlmlock);
}
/**
@@ -544,7 +535,7 @@ static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh,
CDEBUG(D_INODE, "rc %d, err %d\n", rc, errcode);
if (rc == 0)
- mdc_lock_granted(env, oscl, lockh, errcode);
+ mdc_lock_granted(env, oscl, lockh);
/* Error handling, some errors are tolerable. */
if (oscl->ols_glimpse && rc == -ENAVAIL) {
@@ -706,7 +697,8 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
struct ldlm_intent *lit;
enum ldlm_mode mode;
bool glimpse = *flags & LDLM_FL_HAS_INTENT;
- u64 match_flags = *flags;
+ u64 search_flags = *flags;
+ u64 match_flags = 0;
LIST_HEAD(cancels);
int rc, count;
int lvb_size;
@@ -716,11 +708,14 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
if (einfo->ei_mode == LCK_PR)
mode |= LCK_PW;
- match_flags |= LDLM_FL_LVB_READY;
+ search_flags |= LDLM_FL_LVB_READY;
if (glimpse)
- match_flags |= LDLM_FL_BLOCK_GRANTED;
- mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
- einfo->ei_type, policy, mode, &lockh);
+ search_flags |= LDLM_FL_BLOCK_GRANTED;
+ if (mode == LCK_GROUP)
+ match_flags = LDLM_MATCH_GROUP;
+ mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
+ res_id, einfo->ei_type, policy, mode,
+ &lockh, match_flags);
if (mode) {
struct ldlm_lock *matched;
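
This hunk (mirrored in osc_enqueue_base() below) separates two flag namespaces that were previously funneled through one variable: search_flags carries wire-visible LDLM_FL_* bits controlling how the queues are searched, while match_flags carries client-only LDLM_MATCH_* bits controlling which candidates may be accepted. A hedged recap of the resulting call shape, assuming the variables from the hunk above:

	u64 search_flags = *flags | LDLM_FL_LVB_READY;	/* LDLM_FL_*: search behavior */
	u64 match_flags = 0;				/* LDLM_MATCH_*: acceptance */

	if (mode == LCK_GROUP)
		match_flags = LDLM_MATCH_GROUP;		/* wait out dying group locks */

	mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
					 res_id, einfo->ei_type, policy, mode,
					 &lockh, match_flags);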
@@ -833,9 +828,9 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
*
* This function does not wait for the network communication to complete.
*/
-static int __mdc_lock_enqueue(const struct lu_env *env,
- const struct cl_lock_slice *slice,
- struct cl_io *unused, struct cl_sync_io *anchor)
+static int mdc_lock_enqueue(const struct lu_env *env,
+ const struct cl_lock_slice *slice,
+ struct cl_io *unused, struct cl_sync_io *anchor)
{
struct osc_thread_info *info = osc_env_info(env);
struct osc_io *oio = osc_env_io(env);
@@ -921,28 +916,6 @@ static int __mdc_lock_enqueue(const struct lu_env *env,
return result;
}
-static int mdc_lock_enqueue(const struct lu_env *env,
- const struct cl_lock_slice *slice,
- struct cl_io *unused, struct cl_sync_io *anchor)
-{
- struct osc_object *obj = cl2osc(slice->cls_obj);
- struct osc_lock *oscl = cl2osc_lock(slice);
- struct lustre_handle lh = { 0 };
- int rc;
-
- if (oscl->ols_cl.cls_lock->cll_descr.cld_mode == CLM_GROUP) {
- rc = osc_grouplock_enqueue_init(env, obj, oscl, &lh);
- if (rc < 0)
- return rc;
- }
-
- rc = __mdc_lock_enqueue(env, slice, unused, anchor);
-
- if (oscl->ols_cl.cls_lock->cll_descr.cld_mode == CLM_GROUP)
- osc_grouplock_enqueue_fini(env, obj, oscl, &lh);
- return rc;
-}
-
static const struct cl_lock_operations mdc_lock_lockless_ops = {
.clo_fini = osc_lock_fini,
.clo_enqueue = mdc_lock_enqueue,
@@ -1468,9 +1441,6 @@ static int mdc_object_ast_clear(struct ldlm_lock *lock, void *data)
memcpy(lvb, &oinfo->loi_lvb, sizeof(oinfo->loi_lvb));
cl_object_attr_unlock(&osc->oo_cl);
ldlm_clear_lvb_cached(lock);
-
- if (lock->l_req_mode == LCK_GROUP)
- osc_grouplock_dec(osc, lock);
}
return LDLM_ITER_CONTINUE;
}
@@ -198,7 +198,7 @@ void osc_lock_lvb_update(const struct lu_env *env,
}
static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
- struct lustre_handle *lockh, int errcode)
+ struct lustre_handle *lockh)
{
struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj);
struct ldlm_lock *dlmlock;
@@ -254,126 +254,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
LASSERT(oscl->ols_state != OLS_GRANTED);
oscl->ols_state = OLS_GRANTED;
-
- if (errcode != ELDLM_LOCK_MATCHED && dlmlock->l_req_mode == LCK_GROUP)
- osc_grouplock_inc_locked(osc, dlmlock);
-}
-
-void osc_grouplock_inc_locked(struct osc_object *osc, struct ldlm_lock *lock)
-{
- LASSERT(lock->l_req_mode == LCK_GROUP);
-
- if (osc->oo_group_users == 0)
- osc->oo_group_gid = lock->l_policy_data.l_extent.gid;
- osc->oo_group_users++;
-
- LDLM_DEBUG(lock, "users %llu gid %llu\n",
- osc->oo_group_users,
- lock->l_policy_data.l_extent.gid);
-}
-EXPORT_SYMBOL(osc_grouplock_inc_locked);
-
-void osc_grouplock_dec(struct osc_object *osc, struct ldlm_lock *lock)
-{
- LASSERT(lock->l_req_mode == LCK_GROUP);
-
- mutex_lock(&osc->oo_group_mutex);
-
- LASSERT(osc->oo_group_users > 0);
- osc->oo_group_users--;
- if (osc->oo_group_users == 0) {
- osc->oo_group_gid = 0;
- wake_up_all(&osc->oo_group_waitq);
- }
- mutex_unlock(&osc->oo_group_mutex);
-
- LDLM_DEBUG(lock, "users %llu gid %lu\n",
- osc->oo_group_users, osc->oo_group_gid);
}
-EXPORT_SYMBOL(osc_grouplock_dec);
-
-int osc_grouplock_enqueue_init(const struct lu_env *env,
- struct osc_object *obj,
- struct osc_lock *oscl,
- struct lustre_handle *lh)
-{
- struct cl_lock_descr *need = &oscl->ols_cl.cls_lock->cll_descr;
- int rc = 0;
-
- LASSERT(need->cld_mode == CLM_GROUP);
-
- while (true) {
- bool check_gid = true;
-
- if (oscl->ols_flags & LDLM_FL_BLOCK_NOWAIT) {
- if (!mutex_trylock(&obj->oo_group_mutex))
- return -EAGAIN;
- } else {
- mutex_lock(&obj->oo_group_mutex);
- }
-
- /**
- * If a grouplock of the same gid already exists, match it
- * here in advance. Otherwise, if that lock is being cancelled
- * there is a chance to get 2 grouplocks for the same file.
- */
- if (obj->oo_group_users &&
- obj->oo_group_gid == need->cld_gid) {
- struct osc_thread_info *info = osc_env_info(env);
- struct ldlm_res_id *resname = &info->oti_resname;
- union ldlm_policy_data *policy = &info->oti_policy;
- struct cl_lock *lock = oscl->ols_cl.cls_lock;
- u64 flags = oscl->ols_flags | LDLM_FL_BLOCK_GRANTED;
- struct ldlm_namespace *ns;
- enum ldlm_mode mode;
-
- ns = osc_export(obj)->exp_obd->obd_namespace;
- ostid_build_res_name(&obj->oo_oinfo->loi_oi, resname);
- osc_lock_build_policy(env, lock, policy);
- mode = ldlm_lock_match(ns, flags, resname,
- oscl->ols_einfo.ei_type, policy,
- oscl->ols_einfo.ei_mode, lh);
- if (mode)
- oscl->ols_flags |= LDLM_FL_MATCH_LOCK;
- else
- check_gid = false;
- }
-
- /**
- * If a grouplock exists but cannot be matched, let it to flush
- * and wait just for zero users for now.
- */
- if (obj->oo_group_users == 0 ||
- (check_gid && obj->oo_group_gid == need->cld_gid))
- break;
-
- mutex_unlock(&obj->oo_group_mutex);
- if (oscl->ols_flags & LDLM_FL_BLOCK_NOWAIT)
- return -EAGAIN;
-
- rc = l_wait_event_abortable(obj->oo_group_waitq,
- !obj->oo_group_users);
- if (rc)
- return rc;
- }
-
- return 0;
-}
-EXPORT_SYMBOL(osc_grouplock_enqueue_init);
-
-void osc_grouplock_enqueue_fini(const struct lu_env *env,
- struct osc_object *obj,
- struct osc_lock *oscl,
- struct lustre_handle *lh)
-{
- LASSERT(oscl->ols_cl.cls_lock->cll_descr.cld_mode == CLM_GROUP);
-
- /* If a user was added on enqueue_init, decref it */
- if (lustre_handle_is_used(lh))
- ldlm_lock_decref(lh, oscl->ols_einfo.ei_mode);
- mutex_unlock(&obj->oo_group_mutex);
-}
-EXPORT_SYMBOL(osc_grouplock_enqueue_fini);
/**
* Lock upcall function that is executed either when a reply to ENQUEUE rpc is
@@ -403,7 +284,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
}
if (rc == 0)
- osc_lock_granted(env, oscl, lockh, errcode);
+ osc_lock_granted(env, oscl, lockh);
/* Error handling, some errors are tolerable. */
if (oscl->ols_glimpse && rc == -ENAVAIL) {
@@ -540,7 +421,6 @@ static int __osc_dlm_blocking_ast(const struct lu_env *env,
struct ldlm_extent *extent = &dlmlock->l_policy_data.l_extent;
struct cl_attr *attr = &osc_env_info(env)->oti_attr;
u64 old_kms;
- void *data;
/* Destroy pages covered by the extent of the DLM lock */
result = osc_lock_flush(cl2osc(obj),
@@ -553,7 +433,6 @@ static int __osc_dlm_blocking_ast(const struct lu_env *env,
/* clearing l_ast_data after flushing data,
* to let glimpse ast find the lock and the object
*/
- data = dlmlock->l_ast_data;
dlmlock->l_ast_data = NULL;
cl_object_attr_lock(obj);
/* Must get the value under the lock to avoid race. */
@@ -567,9 +446,6 @@ static int __osc_dlm_blocking_ast(const struct lu_env *env,
cl_object_attr_unlock(obj);
unlock_res_and_lock(dlmlock);
- /* Skip dec in case osc_object_ast_clear() did it */
- if (data && dlmlock->l_req_mode == LCK_GROUP)
- osc_grouplock_dec(cl2osc(obj), dlmlock);
cl_object_put(env, obj);
}
return result;
@@ -1055,9 +931,9 @@ int osc_lock_enqueue_wait(const struct lu_env *env, struct osc_object *obj,
*
* This function does not wait for the network communication to complete.
*/
-static int __osc_lock_enqueue(const struct lu_env *env,
- const struct cl_lock_slice *slice,
- struct cl_io *unused, struct cl_sync_io *anchor)
+static int osc_lock_enqueue(const struct lu_env *env,
+ const struct cl_lock_slice *slice,
+ struct cl_io *unused, struct cl_sync_io *anchor)
{
struct osc_thread_info *info = osc_env_info(env);
struct osc_io *oio = osc_env_io(env);
@@ -1177,29 +1053,6 @@ static int __osc_lock_enqueue(const struct lu_env *env,
return result;
}
-static int osc_lock_enqueue(const struct lu_env *env,
- const struct cl_lock_slice *slice,
- struct cl_io *unused, struct cl_sync_io *anchor)
-{
- struct osc_object *obj = cl2osc(slice->cls_obj);
- struct osc_lock *oscl = cl2osc_lock(slice);
- struct lustre_handle lh = { 0 };
- int rc;
-
- if (oscl->ols_cl.cls_lock->cll_descr.cld_mode == CLM_GROUP) {
- rc = osc_grouplock_enqueue_init(env, obj, oscl, &lh);
- if (rc < 0)
- return rc;
- }
-
- rc = __osc_lock_enqueue(env, slice, unused, anchor);
-
- if (oscl->ols_cl.cls_lock->cll_descr.cld_mode == CLM_GROUP)
- osc_grouplock_enqueue_fini(env, obj, oscl, &lh);
-
- return rc;
-}
-
/**
* Breaks a link between osc_lock and dlm_lock.
*/
@@ -74,10 +74,6 @@ int osc_object_init(const struct lu_env *env, struct lu_object *obj,
atomic_set(&osc->oo_nr_ios, 0);
init_waitqueue_head(&osc->oo_io_waitq);
- init_waitqueue_head(&osc->oo_group_waitq);
- mutex_init(&osc->oo_group_mutex);
- osc->oo_group_users = 0;
- osc->oo_group_gid = 0;
osc->oo_root.rb_node = NULL;
INIT_LIST_HEAD(&osc->oo_hp_exts);
@@ -117,7 +113,6 @@ void osc_object_free(const struct lu_env *env, struct lu_object *obj)
LASSERT(atomic_read(&osc->oo_nr_writes) == 0);
LASSERT(list_empty(&osc->oo_ol_list));
LASSERT(!atomic_read(&osc->oo_nr_ios));
- LASSERT(!osc->oo_group_users);
lu_object_fini(obj);
/* osc doesn't contain an lu_object_header, so we don't need call_rcu */
@@ -230,17 +225,6 @@ static int osc_object_ast_clear(struct ldlm_lock *lock, void *data)
memcpy(lvb, &oinfo->loi_lvb, sizeof(oinfo->loi_lvb));
cl_object_attr_unlock(&osc->oo_cl);
ldlm_clear_lvb_cached(lock);
-
- /**
- * Object is being destroyed and gets unlinked from the lock,
- * IO is finished and no cached data is left under the lock. As
- * grouplock is immediately marked CBPENDING it is not reused.
- * It will also be not possible to flush data later due to a
- * NULL l_ast_data - enough conditions to let new grouplocks to
- * be enqueued even if the lock still exists on client.
- */
- if (lock->l_req_mode == LCK_GROUP)
- osc_grouplock_dec(osc, lock);
}
return LDLM_ITER_CONTINUE;
}
@@ -3009,7 +3009,8 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
struct lustre_handle lockh = { 0 };
struct ptlrpc_request *req = NULL;
int intent = *flags & LDLM_FL_HAS_INTENT;
- u64 match_flags = *flags;
+ u64 search_flags = *flags;
+ u64 match_flags = 0;
enum ldlm_mode mode;
int rc;
@@ -3040,11 +3041,14 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
* because they will not actually use the lock.
*/
if (!speculative)
- match_flags |= LDLM_FL_LVB_READY;
+ search_flags |= LDLM_FL_LVB_READY;
if (intent != 0)
- match_flags |= LDLM_FL_BLOCK_GRANTED;
- mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
- einfo->ei_type, policy, mode, &lockh);
+ search_flags |= LDLM_FL_BLOCK_GRANTED;
+ if (mode == LCK_GROUP)
+ match_flags = LDLM_MATCH_GROUP;
+ mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
+ res_id, einfo->ei_type, policy, mode,
+ &lockh, match_flags);
if (mode) {
struct ldlm_lock *matched;