@@ -818,18 +818,24 @@ struct md_callback {
void *data, int flag);
};
-struct md_enqueue_info;
-/* metadata stat-ahead */
-
-struct md_enqueue_info {
- struct md_op_data mi_data;
- struct lookup_intent mi_it;
- struct lustre_handle mi_lockh;
- struct inode *mi_dir;
- struct ldlm_enqueue_info mi_einfo;
- int (*mi_cb)(struct ptlrpc_request *req,
- struct md_enqueue_info *minfo, int rc);
- void *mi_cbdata;
+enum md_opcode {
+ MD_OP_NONE = 0,
+ MD_OP_GETATTR = 1,
+ MD_OP_MAX,
+};
+
+struct md_op_item {
+ enum md_opcode mop_opc;
+ struct md_op_data mop_data;
+ struct lookup_intent mop_it;
+ struct lustre_handle mop_lockh;
+ struct ldlm_enqueue_info mop_einfo;
+ int (*mop_cb)(struct req_capsule *pill,
+ struct md_op_item *item,
+ int rc);
+ void *mop_cbdata;
+ struct inode *mop_dir;
+ u64 mop_lock_flags;
};
struct obd_ops {
@@ -1060,8 +1066,8 @@ struct md_ops {
const char *name, int namelen,
struct lu_fid *fid);
- int (*intent_getattr_async)(struct obd_export *,
- struct md_enqueue_info *);
+ int (*intent_getattr_async)(struct obd_export *exp,
+ struct md_op_item *item);
int (*revalidate_lock)(struct obd_export *, struct lookup_intent *,
struct lu_fid *, u64 *bits);
@@ -1594,7 +1594,7 @@ static inline int md_init_ea_size(struct obd_export *exp, u32 easize,
}
static inline int md_intent_getattr_async(struct obd_export *exp,
- struct md_enqueue_info *minfo)
+ struct md_op_item *item)
{
int rc;
@@ -1605,7 +1605,7 @@ static inline int md_intent_getattr_async(struct obd_export *exp,
lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
LPROC_MD_INTENT_GETATTR_ASYNC);
- return MDP(exp->exp_obd, intent_getattr_async)(exp, minfo);
+ return MDP(exp->exp_obd, intent_getattr_async)(exp, item);
}
static inline int md_revalidate_lock(struct obd_export *exp,
@@ -1477,17 +1477,12 @@ struct ll_statahead_info {
* is not a hidden one
*/
unsigned int sai_skip_hidden;/* skipped hidden dentry count */
- unsigned int sai_ls_all:1, /* "ls -al", do stat-ahead for
+ unsigned int sai_ls_all:1; /* "ls -al", do stat-ahead for
* hidden entries
*/
- sai_in_readpage:1;/* statahead in readdir() */
wait_queue_head_t sai_waitq; /* stat-ahead wait queue */
struct task_struct *sai_task; /* stat-ahead thread */
struct task_struct *sai_agl_task; /* AGL thread */
- struct list_head sai_interim_entries; /* entries which got async
- * stat reply, but not
- * instantiated
- */
struct list_head sai_entries; /* completed entries */
struct list_head sai_agls; /* AGLs to be sent */
struct list_head sai_cache[LL_SA_CACHE_SIZE];
@@ -55,13 +55,12 @@ enum se_stat {
/*
* sa_entry is not refcounted: statahead thread allocates it and do async stat,
- * and in async stat callback ll_statahead_interpret() will add it into
- * sai_interim_entries, later statahead thread will call sa_handle_callback() to
- * instantiate entry and move it into sai_entries, and then only scanner process
- * can access and free it.
+ * and in async stat callback ll_statahead_interpret() will prepare the inode
+ * and set lock data in the ptlrpcd context. Then the scanner process will be
+ * woken up if this entry is the waiting one, can access and free it.
*/
struct sa_entry {
- /* link into sai_interim_entries or sai_entries */
+ /* link into sai_entries */
struct list_head se_list;
/* link into sai hash table locally */
struct list_head se_hash;
@@ -73,10 +72,6 @@ struct sa_entry {
enum se_stat se_state;
/* entry size, contains name */
int se_size;
- /* pointer to async getattr enqueue info */
- struct md_enqueue_info *se_minfo;
- /* pointer to the async getattr request */
- struct ptlrpc_request *se_req;
/* pointer to the target inode */
struct inode *se_inode;
/* entry name */
@@ -113,9 +108,7 @@ static inline int sa_hash(int val)
spin_unlock(&sai->sai_cache_lock[i]);
}
-/*
- * Remove entry from SA table.
- */
+/* unhash entry from sai_cache */
static inline void
sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
{
@@ -138,12 +131,6 @@ static inline int sa_sent_full(struct ll_statahead_info *sai)
return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
}
-/* got async stat replies */
-static inline int sa_has_callback(struct ll_statahead_info *sai)
-{
- return !list_empty(&sai->sai_interim_entries);
-}
-
static inline int agl_list_empty(struct ll_statahead_info *sai)
{
return list_empty(&sai->sai_agls);
@@ -267,8 +254,8 @@ static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
}
/* called by scanner after use, sa_entry will be killed */
-static void sa_put(struct ll_statahead_info *sai, struct sa_entry *entry,
- struct ll_inode_info *lli)
+static void
+sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
{
struct sa_entry *tmp, *next;
@@ -295,11 +282,6 @@ static void sa_put(struct ll_statahead_info *sai, struct sa_entry *entry,
break;
sa_kill(sai, tmp);
}
-
- spin_lock(&lli->lli_sa_lock);
- if (sai->sai_task)
- wake_up_process(sai->sai_task);
- spin_unlock(&lli->lli_sa_lock);
}
/*
@@ -334,55 +316,55 @@ static void sa_put(struct ll_statahead_info *sai, struct sa_entry *entry,
}
/* finish async stat RPC arguments */
-static void sa_fini_data(struct md_enqueue_info *minfo)
+static void sa_fini_data(struct md_op_item *item)
{
- ll_unlock_md_op_lsm(&minfo->mi_data);
- iput(minfo->mi_dir);
- kfree(minfo);
+ ll_unlock_md_op_lsm(&item->mop_data);
+ iput(item->mop_dir);
+ kfree(item);
}
-static int ll_statahead_interpret(struct ptlrpc_request *req,
- struct md_enqueue_info *minfo, int rc);
+static int ll_statahead_interpret(struct req_capsule *pill,
+ struct md_op_item *item, int rc);
/*
* prepare arguments for async stat RPC.
*/
-static struct md_enqueue_info *
+static struct md_op_item *
sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
{
- struct md_enqueue_info *minfo;
+ struct md_op_item *item;
struct ldlm_enqueue_info *einfo;
- struct md_op_data *op_data;
+ struct md_op_data *op_data;
- minfo = kzalloc(sizeof(*minfo), GFP_NOFS);
- if (!minfo)
+ item = kzalloc(sizeof(*item), GFP_NOFS);
+ if (!item)
return ERR_PTR(-ENOMEM);
- op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child,
+ op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
entry->se_qstr.name, entry->se_qstr.len, 0,
LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data)) {
- kfree(minfo);
- return (struct md_enqueue_info *)op_data;
+ kfree(item);
+ return ERR_CAST(item);
}
if (!child)
op_data->op_fid2 = entry->se_fid;
- minfo->mi_it.it_op = IT_GETATTR;
- minfo->mi_dir = igrab(dir);
- minfo->mi_cb = ll_statahead_interpret;
- minfo->mi_cbdata = entry;
-
- einfo = &minfo->mi_einfo;
- einfo->ei_type = LDLM_IBITS;
- einfo->ei_mode = it_to_lock_mode(&minfo->mi_it);
- einfo->ei_cb_bl = ll_md_blocking_ast;
- einfo->ei_cb_cp = ldlm_completion_ast;
- einfo->ei_cb_gl = NULL;
+ item->mop_it.it_op = IT_GETATTR;
+ item->mop_dir = igrab(dir);
+ item->mop_cb = ll_statahead_interpret;
+ item->mop_cbdata = entry;
+
+ einfo = &item->mop_einfo;
+ einfo->ei_type = LDLM_IBITS;
+ einfo->ei_mode = it_to_lock_mode(&item->mop_it);
+ einfo->ei_cb_bl = ll_md_blocking_ast;
+ einfo->ei_cb_cp = ldlm_completion_ast;
+ einfo->ei_cb_gl = NULL;
einfo->ei_cbdata = NULL;
- return minfo;
+ return item;
}
/*
@@ -393,22 +375,8 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
{
struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
- struct md_enqueue_info *minfo = entry->se_minfo;
- struct ptlrpc_request *req = entry->se_req;
bool wakeup;
- /* release resources used in RPC */
- if (minfo) {
- entry->se_minfo = NULL;
- ll_intent_release(&minfo->mi_it);
- sa_fini_data(minfo);
- }
-
- if (req) {
- entry->se_req = NULL;
- ptlrpc_req_finished(req);
- }
-
spin_lock(&lli->lli_sa_lock);
wakeup = __sa_make_ready(sai, entry, ret);
spin_unlock(&lli->lli_sa_lock);
@@ -465,7 +433,6 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
sai->sai_index = 1;
init_waitqueue_head(&sai->sai_waitq);
- INIT_LIST_HEAD(&sai->sai_interim_entries);
INIT_LIST_HEAD(&sai->sai_entries);
INIT_LIST_HEAD(&sai->sai_agls);
@@ -528,7 +495,6 @@ static void ll_sai_put(struct ll_statahead_info *sai)
LASSERT(sai->sai_task == NULL);
LASSERT(sai->sai_agl_task == NULL);
LASSERT(sai->sai_sent == sai->sai_replied);
- LASSERT(!sa_has_callback(sai));
list_for_each_entry_safe(entry, next, &sai->sai_entries,
se_list)
@@ -619,26 +585,63 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
}
/*
- * prepare inode for sa entry, add it into agl list, now sa_entry is ready
- * to be used by scanner process.
+ * Callback for async stat RPC, this is called in ptlrpcd context. It prepares
+ * the inode and set lock data directly in the ptlrpcd context. It will wake up
+ * the directory listing process if the dentry is the waiting one.
*/
-static void sa_instantiate(struct ll_statahead_info *sai,
- struct sa_entry *entry)
+static int ll_statahead_interpret(struct req_capsule *pill,
+ struct md_op_item *item, int rc)
{
- struct inode *dir = sai->sai_dentry->d_inode;
- struct inode *child;
- struct md_enqueue_info *minfo;
- struct lookup_intent *it;
- struct ptlrpc_request *req;
+ struct lookup_intent *it = &item->mop_it;
+ struct inode *dir = item->mop_dir;
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_statahead_info *sai = lli->lli_sai;
+ struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
struct mdt_body *body;
- int rc = 0;
+ struct inode *child;
+ u64 handle = 0;
+
+ if (it_disposition(it, DISP_LOOKUP_NEG))
+ rc = -ENOENT;
- LASSERT(entry->se_handle != 0);
+ /*
+ * because statahead thread will wait for all inflight RPC to finish,
+ * sai should be always valid, no need to refcount
+ */
+ LASSERT(sai);
+ LASSERT(entry);
- minfo = entry->se_minfo;
- it = &minfo->mi_it;
- req = entry->se_req;
- body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+ CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
+ entry->se_qstr.len, entry->se_qstr.name, rc);
+
+ if (rc != 0) {
+ ll_intent_release(it);
+ sa_fini_data(item);
+ } else {
+ /*
+ * release ibits lock ASAP to avoid deadlock when statahead
+ * thread enqueues lock on parent in readdir and another
+ * process enqueues lock on child with parent lock held, eg.
+ * unlink.
+ */
+ handle = it->it_lock_handle;
+ ll_intent_drop_lock(it);
+ ll_unlock_md_op_lsm(&item->mop_data);
+ }
+
+ if (rc != 0) {
+ spin_lock(&lli->lli_sa_lock);
+ if (__sa_make_ready(sai, entry, rc))
+ wake_up(&sai->sai_waitq);
+
+ sai->sai_replied++;
+ spin_unlock(&lli->lli_sa_lock);
+
+ return rc;
+ }
+
+ entry->se_handle = handle;
+ body = req_capsule_server_get(pill, &RMF_MDT_BODY);
if (!body) {
rc = -EFAULT;
goto out;
@@ -646,7 +649,7 @@ static void sa_instantiate(struct ll_statahead_info *sai,
child = entry->se_inode;
/* revalidate; unlinked and re-created with the same name */
- if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, &body->mbo_fid1))) {
+ if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
if (child) {
entry->se_inode = NULL;
iput(child);
@@ -663,7 +666,7 @@ static void sa_instantiate(struct ll_statahead_info *sai,
goto out;
}
- rc = ll_prep_inode(&child, &req->rq_pill, dir->i_sb, it);
+ rc = ll_prep_inode(&child, pill, dir->i_sb, it);
if (rc)
goto out;
@@ -676,107 +679,18 @@ static void sa_instantiate(struct ll_statahead_info *sai,
if (agl_should_run(sai, child))
ll_agl_add(sai, child, entry->se_index);
-
out:
/*
- * sa_make_ready() will drop ldlm ibits lock refcount by calling
+ * First it will drop ldlm ibits lock refcount by calling
* ll_intent_drop_lock() in spite of failures. Do not worry about
* calling ll_intent_drop_lock() more than once.
*/
+ ll_intent_release(&item->mop_it);
+ sa_fini_data(item);
sa_make_ready(sai, entry, rc);
-}
-
-/* once there are async stat replies, instantiate sa_entry from replies */
-static void sa_handle_callback(struct ll_statahead_info *sai)
-{
- struct ll_inode_info *lli;
-
- lli = ll_i2info(sai->sai_dentry->d_inode);
spin_lock(&lli->lli_sa_lock);
- while (sa_has_callback(sai)) {
- struct sa_entry *entry;
-
- entry = list_first_entry(&sai->sai_interim_entries,
- struct sa_entry, se_list);
- list_del_init(&entry->se_list);
- spin_unlock(&lli->lli_sa_lock);
-
- sa_instantiate(sai, entry);
- spin_lock(&lli->lli_sa_lock);
- }
- spin_unlock(&lli->lli_sa_lock);
-}
-
-/*
- * callback for async stat, because this is called in ptlrpcd context, we only
- * put sa_entry in sai_cb_entries list, and let sa_handle_callback() to really
- * prepare inode and instantiate sa_entry later.
- */
-static int ll_statahead_interpret(struct ptlrpc_request *req,
- struct md_enqueue_info *minfo, int rc)
-{
- struct lookup_intent *it = &minfo->mi_it;
- struct inode *dir = minfo->mi_dir;
- struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_statahead_info *sai = lli->lli_sai;
- struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
- u64 handle = 0;
-
- if (it_disposition(it, DISP_LOOKUP_NEG))
- rc = -ENOENT;
-
- /*
- * because statahead thread will wait for all inflight RPC to finish,
- * sai should be always valid, no need to refcount
- */
- LASSERT(sai);
- LASSERT(entry);
-
- CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
- entry->se_qstr.len, entry->se_qstr.name, rc);
-
- if (rc) {
- ll_intent_release(it);
- sa_fini_data(minfo);
- } else {
- /*
- * release ibits lock ASAP to avoid deadlock when statahead
- * thread enqueues lock on parent in readdir and another
- * process enqueues lock on child with parent lock held, eg.
- * unlink.
- */
- handle = it->it_lock_handle;
- ll_intent_drop_lock(it);
- ll_unlock_md_op_lsm(&minfo->mi_data);
- }
-
- spin_lock(&lli->lli_sa_lock);
- if (rc) {
- if (__sa_make_ready(sai, entry, rc))
- wake_up(&sai->sai_waitq);
- } else {
- int first = 0;
-
- entry->se_minfo = minfo;
- entry->se_req = ptlrpc_request_addref(req);
- /*
- * Release the async ibits lock ASAP to avoid deadlock
- * when statahead thread tries to enqueue lock on parent
- * for readpage and other tries to enqueue lock on child
- * with parent's lock held, for example: unlink.
- */
- entry->se_handle = handle;
- if (!sa_has_callback(sai))
- first = 1;
-
- list_add_tail(&entry->se_list, &sai->sai_interim_entries);
-
- if (first && sai->sai_task)
- wake_up_process(sai->sai_task);
- }
sai->sai_replied++;
-
spin_unlock(&lli->lli_sa_lock);
return rc;
@@ -785,16 +699,16 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
/* async stat for file not found in dcache */
static int sa_lookup(struct inode *dir, struct sa_entry *entry)
{
- struct md_enqueue_info *minfo;
+ struct md_op_item *item;
int rc;
- minfo = sa_prep_data(dir, NULL, entry);
- if (IS_ERR(minfo))
- return PTR_ERR(minfo);
+ item = sa_prep_data(dir, NULL, entry);
+ if (IS_ERR(item))
+ return PTR_ERR(item);
- rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
+ rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
if (rc)
- sa_fini_data(minfo);
+ sa_fini_data(item);
return rc;
}
@@ -814,7 +728,7 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
.it_op = IT_GETATTR,
.it_lock_handle = 0
};
- struct md_enqueue_info *minfo;
+ struct md_op_item *item;
int rc;
if (unlikely(!inode))
@@ -823,9 +737,9 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
if (d_mountpoint(dentry))
return 1;
- minfo = sa_prep_data(dir, inode, entry);
- if (IS_ERR(minfo))
- return PTR_ERR(minfo);
+ item = sa_prep_data(dir, inode, entry);
+ if (IS_ERR(item))
+ return PTR_ERR(item);
entry->se_inode = igrab(inode);
rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
@@ -833,15 +747,15 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
if (rc == 1) {
entry->se_handle = it.it_lock_handle;
ll_intent_release(&it);
- sa_fini_data(minfo);
+ sa_fini_data(item);
return 1;
}
- rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
+ rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
if (rc) {
entry->se_inode = NULL;
iput(inode);
- sa_fini_data(minfo);
+ sa_fini_data(item);
}
return rc;
@@ -934,14 +848,14 @@ static void ll_stop_agl(struct ll_statahead_info *sai)
return;
CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
- sai, (unsigned int)sai->sai_agl_task->pid);
- kthread_stop(sai->sai_agl_task);
+ sai, (unsigned int)agl_task->pid);
+ kthread_stop(agl_task);
- sai->sai_agl_task = NULL;
spin_lock(&plli->lli_agl_lock);
- while ((clli = list_first_entry_or_null(&sai->sai_agls,
- struct ll_inode_info,
- lli_agl_list)) != NULL) {
+ clli = list_first_entry_or_null(&sai->sai_agls,
+ struct ll_inode_info,
+ lli_agl_list);
+ if (clli) {
list_del_init(&clli->lli_agl_list);
spin_unlock(&plli->lli_agl_lock);
clli->lli_agl_index = 0;
@@ -950,7 +864,7 @@ static void ll_stop_agl(struct ll_statahead_info *sai)
}
spin_unlock(&plli->lli_agl_lock);
CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
- sai, sai->sai_dentry);
+ sai, parent);
ll_sai_put(sai);
}
@@ -1014,10 +928,8 @@ static int ll_statahead_thread(void *arg)
break;
}
- sai->sai_in_readpage = 1;
page = ll_get_dir_page(dir, op_data, pos);
ll_unlock_md_op_lsm(op_data);
- sai->sai_in_readpage = 0;
if (IS_ERR(page)) {
rc = PTR_ERR(page);
CDEBUG(D_READA,
@@ -1081,14 +993,9 @@ static int ll_statahead_thread(void *arg)
while (({set_current_state(TASK_IDLE);
sai->sai_task; })) {
- if (sa_has_callback(sai)) {
- __set_current_state(TASK_RUNNING);
- sa_handle_callback(sai);
- }
-
spin_lock(&lli->lli_agl_lock);
while (sa_sent_full(sai) &&
- !agl_list_empty(sai)) {
+ !list_empty(&sai->sai_agls)) {
struct ll_inode_info *clli;
__set_current_state(TASK_RUNNING);
@@ -1140,16 +1047,11 @@ static int ll_statahead_thread(void *arg)
/*
* statahead is finished, but statahead entries need to be cached, wait
- * for file release to stop me.
+ * for file release closedir() call to stop me.
*/
while (({set_current_state(TASK_IDLE);
sai->sai_task; })) {
- if (sa_has_callback(sai)) {
- __set_current_state(TASK_RUNNING);
- sa_handle_callback(sai);
- } else {
- schedule();
- }
+ schedule();
}
__set_current_state(TASK_RUNNING);
out:
@@ -1159,13 +1061,9 @@ static int ll_statahead_thread(void *arg)
* wait for inflight statahead RPCs to finish, and then we can free sai
* safely because statahead RPC will access sai data
*/
- while (sai->sai_sent != sai->sai_replied) {
+ while (sai->sai_sent != sai->sai_replied)
/* in case we're not woken up, timeout wait */
msleep(125);
- }
-
- /* release resources held by statahead RPCs */
- sa_handle_callback(sai);
CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %pd\n",
sai, parent);
@@ -1173,8 +1071,8 @@ static int ll_statahead_thread(void *arg)
spin_lock(&lli->lli_sa_lock);
sai->sai_task = NULL;
spin_unlock(&lli->lli_sa_lock);
-
wake_up(&sai->sai_waitq);
+
ll_sai_put(sai);
return rc;
@@ -1200,8 +1098,8 @@ void ll_authorize_statahead(struct inode *dir, void *key)
}
/*
- * deauthorize opened dir handle @key to statahead, but statahead thread may
- * still be running, notify it to quit.
+ * deauthorize opened dir handle @key to statahead, and notify statahead thread
+ * to quit if it's running.
*/
void ll_deauthorize_statahead(struct inode *dir, void *key)
{
@@ -1427,10 +1325,6 @@ static int revalidate_statahead_dentry(struct inode *dir,
goto out_unplug;
}
- /* if statahead is busy in readdir, help it do post-work */
- if (!sa_ready(entry) && sai->sai_in_readpage)
- sa_handle_callback(sai);
-
if (!sa_ready(entry)) {
spin_lock(&lli->lli_sa_lock);
sai->sai_index_wait = entry->se_index;
@@ -1507,7 +1401,7 @@ static int revalidate_statahead_dentry(struct inode *dir,
*/
ldd = ll_d2d(*dentryp);
ldd->lld_sa_generation = lli->lli_sa_generation;
- sa_put(sai, entry, lli);
+ sa_put(sai, entry);
spin_lock(&lli->lli_sa_lock);
if (sai->sai_task)
wake_up_process(sai->sai_task);
@@ -1591,7 +1485,6 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
spin_lock(&lli->lli_sa_lock);
lli->lli_sai = NULL;
spin_unlock(&lli->lli_sa_lock);
- atomic_dec(&ll_i2sbi(parent->d_inode)->ll_sa_running);
rc = PTR_ERR(task);
CERROR("can't start ll_sa thread, rc : %d\n", rc);
goto out;
@@ -3431,9 +3431,9 @@ static int lmv_clear_open_replay_data(struct obd_export *exp,
}
static int lmv_intent_getattr_async(struct obd_export *exp,
- struct md_enqueue_info *minfo)
+ struct md_op_item *item)
{
- struct md_op_data *op_data = &minfo->mi_data;
+ struct md_op_data *op_data = &item->mop_data;
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_tgt_desc *ptgt = NULL;
@@ -3457,7 +3457,7 @@ static int lmv_intent_getattr_async(struct obd_export *exp,
if (ctgt != ptgt)
return -EREMOTE;
- return md_intent_getattr_async(ptgt->ltd_exp, minfo);
+ return md_intent_getattr_async(ptgt->ltd_exp, item);
}
static int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
@@ -130,8 +130,7 @@ int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
struct lu_fid *fid, u64 *bits);
-int mdc_intent_getattr_async(struct obd_export *exp,
- struct md_enqueue_info *minfo);
+int mdc_intent_getattr_async(struct obd_export *exp, struct md_op_item *item);
enum ldlm_mode mdc_lock_match(struct obd_export *exp, u64 flags,
const struct lu_fid *fid, enum ldlm_type type,
@@ -49,7 +49,7 @@
struct mdc_getattr_args {
struct obd_export *ga_exp;
- struct md_enqueue_info *ga_minfo;
+ struct md_op_item *ga_item;
};
int it_open_error(int phase, struct lookup_intent *it)
@@ -1360,10 +1360,10 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
{
struct mdc_getattr_args *ga = args;
struct obd_export *exp = ga->ga_exp;
- struct md_enqueue_info *minfo = ga->ga_minfo;
- struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
- struct lookup_intent *it = &minfo->mi_it;
- struct lustre_handle *lockh = &minfo->mi_lockh;
+ struct md_op_item *item = ga->ga_item;
+ struct ldlm_enqueue_info *einfo = &item->mop_einfo;
+ struct lookup_intent *it = &item->mop_it;
+ struct lustre_handle *lockh = &item->mop_lockh;
struct ldlm_reply *lockrep;
u64 flags = LDLM_FL_HAS_INTENT;
@@ -1388,18 +1388,17 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
if (rc)
goto out;
- rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
-
+ rc = mdc_finish_intent_lock(exp, req, &item->mop_data, it, lockh);
out:
- minfo->mi_cb(req, minfo, rc);
+ item->mop_cb(&req->rq_pill, item, rc);
return 0;
}
int mdc_intent_getattr_async(struct obd_export *exp,
- struct md_enqueue_info *minfo)
+ struct md_op_item *item)
{
- struct md_op_data *op_data = &minfo->mi_data;
- struct lookup_intent *it = &minfo->mi_it;
+ struct md_op_data *op_data = &item->mop_data;
+ struct lookup_intent *it = &item->mop_it;
struct ptlrpc_request *req;
struct mdc_getattr_args *ga;
struct ldlm_res_id res_id;
@@ -1428,11 +1427,11 @@ int mdc_intent_getattr_async(struct obd_export *exp,
* to avoid possible races. It is safe to have glimpse handler
* for non-DOM locks and costs nothing.
*/
- if (!minfo->mi_einfo.ei_cb_gl)
- minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
+ if (!item->mop_einfo.ei_cb_gl)
+ item->mop_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
- rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
- &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
+ rc = ldlm_cli_enqueue(exp, &req, &item->mop_einfo, &res_id, &policy,
+ &flags, NULL, 0, LVB_T_NONE, &item->mop_lockh, 1);
if (rc < 0) {
ptlrpc_req_finished(req);
return rc;
@@ -1440,7 +1439,7 @@ int mdc_intent_getattr_async(struct obd_export *exp,
ga = ptlrpc_req_async_args(ga, req);
ga->ga_exp = exp;
- ga->ga_minfo = minfo;
+ ga->ga_item = item;
req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
ptlrpcd_add_req(req);