@@ -302,6 +302,7 @@
#define OBD_FAIL_LDLM_CP_CB_WAIT3 0x321
#define OBD_FAIL_LDLM_CP_CB_WAIT4 0x322
#define OBD_FAIL_LDLM_CP_CB_WAIT5 0x323
+#define OBD_FAIL_LDLM_PAUSE_CANCEL_LOCAL 0x329
#define OBD_FAIL_LDLM_GRANT_CHECK 0x32a
#define OBD_FAIL_LDLM_LOCAL_CANCEL_PAUSE 0x32c
@@ -187,15 +187,29 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
LDLM_LOCK_RELEASE(lock);
}
+static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
+{
+ if (req->rq_no_reply)
+ return 0;
+
+ req->rq_status = rc;
+ if (!req->rq_packed_final) {
+ rc = lustre_pack_reply(req, 1, NULL, NULL);
+ if (rc)
+ return rc;
+ }
+ return ptlrpc_reply(req);
+}
+
/*
* Callback handler for receiving incoming completion ASTs.
*
* This only can happen on client side.
*/
-static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
- struct ldlm_namespace *ns,
- struct ldlm_request *dlm_req,
- struct ldlm_lock *lock)
+static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
+ struct ldlm_namespace *ns,
+ struct ldlm_request *dlm_req,
+ struct ldlm_lock *lock)
{
int lvb_len;
LIST_HEAD(ast_list);
@@ -206,6 +220,8 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
long to = HZ;
+ ldlm_callback_reply(req, 0);
+
while (to > 0) {
schedule_timeout_interruptible(to);
if (ldlm_is_granted(lock) ||
@@ -250,6 +266,12 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
lock_res_and_lock(lock);
}
+ if (ldlm_is_failed(lock)) {
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_RELEASE(lock);
+ return -EINVAL;
+ }
+
if (ldlm_is_destroyed(lock) ||
ldlm_is_granted(lock)) {
/* bug 11300: the lock has already been granted */
@@ -321,6 +343,8 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
wake_up(&lock->l_waitq);
}
LDLM_LOCK_RELEASE(lock);
+
+ return 0;
}
/**
@@ -373,20 +397,6 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
LDLM_LOCK_RELEASE(lock);
}
-static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
-{
- if (req->rq_no_reply)
- return 0;
-
- req->rq_status = rc;
- if (!req->rq_packed_final) {
- rc = lustre_pack_reply(req, 1, NULL, NULL);
- if (rc)
- return rc;
- }
- return ptlrpc_reply(req);
-}
-
static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
enum ldlm_cancel_flags cancel_flags)
{
@@ -714,8 +724,9 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
case LDLM_CP_CALLBACK:
CDEBUG(D_INODE, "completion ast\n");
req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
- ldlm_callback_reply(req, 0);
- ldlm_handle_cp_callback(req, ns, dlm_req, lock);
+ rc = ldlm_handle_cp_callback(req, ns, dlm_req, lock);
+ if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE))
+ ldlm_callback_reply(req, rc);
break;
case LDLM_GL_CALLBACK:
CDEBUG(D_INODE, "glimpse ast\n");
@@ -964,6 +964,9 @@ static u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
bool local_only;
LDLM_DEBUG(lock, "client-side cancel");
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL_LOCAL,
+ cfs_fail_val);
+
/* Set this flag to prevent others from getting new references*/
lock_res_and_lock(lock);
ldlm_set_cbpending(lock);