@@ -1904,6 +1904,7 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
u32 version, int opcode, char **bufs,
struct ptlrpc_cli_ctx *ctx);
void ptlrpc_req_finished(struct ptlrpc_request *request);
+void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request);
struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req);
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
unsigned int nfrags,
@@ -1713,6 +1713,8 @@ struct root_squash_info {
spinlock_t rsi_lock; /* protects rsi_nosquash_nids */
};
+int server_name2index(const char *svname, u32 *idx, const char **endptr);
+
/* linux-module.c */
struct obd_ioctl_data;
int obd_ioctl_getdata(struct obd_ioctl_data **data, int *len, void __user *arg);
@@ -365,6 +365,7 @@
#define OBD_FAIL_PTLRPC_BULK_REPLY_ATTACH 0x522
#define OBD_FAIL_PTLRPC_ROUND_XID 0x530
#define OBD_FAIL_PTLRPC_CONNECT_RACE 0x531
+#define OBD_FAIL_PTLRPC_IDLE_RACE 0x533
#define OBD_FAIL_OBD_PING_NET 0x600
/* OBD_FAIL_OBD_LOG_CANCEL_NET 0x601 obsolete since 1.5 */
@@ -618,8 +618,7 @@ int server_name2fsname(const char *svname, char *fsname,
* rc < 0 on error
* if endptr isn't NULL it is set to end of name
*/
-static int server_name2index(const char *svname, u32 *idx,
- const char **endptr)
+int server_name2index(const char *svname, u32 *idx, const char **endptr)
{
unsigned long index;
int rc;
@@ -658,6 +657,7 @@ static int server_name2index(const char *svname, u32 *idx,
return rc;
}
+EXPORT_SYMBOL(server_name2index);
/*************** mount common between server and client ***************/
@@ -1037,6 +1037,11 @@ static int osc_lock_enqueue(const struct lu_env *env,
if (osc_lock_is_lockless(oscl)) {
oio->oi_lockless = 1;
} else if (!async) {
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_IDLE_RACE)) {
+ OBD_RACE(OBD_FAIL_PTLRPC_IDLE_RACE);
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ schedule_timeout(HZ / 2);
+ }
LASSERT(oscl->ols_state == OLS_GRANTED);
LASSERT(oscl->ols_hold);
LASSERT(oscl->ols_dlmlock);
@@ -2543,6 +2543,17 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
ptlrpc_request_cache_free(request);
}
+static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
+/**
+ * Drop one request reference. Must be called with import imp_lock held.
+ * When reference count drops to zero, request is freed.
+ */
+void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
+{
+ assert_spin_locked(&request->rq_import->imp_lock);
+ (void)__ptlrpc_req_finished(request, 1);
+}
+
/**
* Helper function
* Drops one reference count for request @request.
@@ -1653,7 +1653,6 @@ static struct ptlrpc_request *ptlrpc_disconnect_prep_req(struct obd_import *imp)
req->rq_timeout = min_t(timeout_t, req->rq_timeout,
INITIAL_CONNECT_TIMEOUT);
- import_set_state(imp, LUSTRE_IMP_CONNECTING);
req->rq_send_state = LUSTRE_IMP_CONNECTING;
ptlrpc_request_set_replen(req);
@@ -1701,16 +1700,20 @@ int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
!ptlrpc_import_in_recovery(imp));
}
- spin_lock(&imp->imp_lock);
- if (imp->imp_state != LUSTRE_IMP_FULL)
- goto out;
- spin_unlock(&imp->imp_lock);
-
req = ptlrpc_disconnect_prep_req(imp);
if (IS_ERR(req)) {
rc = PTR_ERR(req);
goto set_state;
}
+
+ spin_lock(&imp->imp_lock);
+ if (imp->imp_state != LUSTRE_IMP_FULL) {
+ ptlrpc_req_finished_with_imp_lock(req);
+ goto out;
+ }
+ import_set_state_nolock(imp, LUSTRE_IMP_CONNECTING);
+ spin_unlock(&imp->imp_lock);
+
rc = ptlrpc_queue_wait(req);
ptlrpc_req_finished(req);
@@ -1794,6 +1797,21 @@ static int ptlrpc_disconnect_idle_interpret(const struct lu_env *env,
return 0;
}
+static bool ptlrpc_can_idle(struct obd_import *imp)
+{
+ struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
+
+ /* one request for disconnect rpc */
+ if (atomic_read(&imp->imp_reqs) > 1)
+ return false;
+
+ /* any lock increases ns_bref being a resource holder */
+ if (ns && atomic_read(&ns->ns_bref) > 0)
+ return false;
+
+ return true;
+}
+
int ptlrpc_disconnect_and_idle_import(struct obd_import *imp)
{
struct ptlrpc_request *req;
@@ -1804,31 +1822,38 @@ int ptlrpc_disconnect_and_idle_import(struct obd_import *imp)
if (ptlrpc_import_in_recovery(imp))
return 0;
- spin_lock(&imp->imp_lock);
+ req = ptlrpc_disconnect_prep_req(imp);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
- if (imp->imp_state != LUSTRE_IMP_FULL) {
+ req->rq_interpret_reply = ptlrpc_disconnect_idle_interpret;
+
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_IDLE_RACE)) {
+ u32 idx;
+
+ server_name2index(imp->imp_obd->obd_name, &idx, NULL);
+ if (idx == 0)
+ OBD_RACE(OBD_FAIL_PTLRPC_IDLE_RACE);
+ }
+
+ spin_lock(&imp->imp_lock);
+ if (imp->imp_state != LUSTRE_IMP_FULL || !ptlrpc_can_idle(imp)) {
+ ptlrpc_req_finished_with_imp_lock(req);
spin_unlock(&imp->imp_lock);
return 0;
}
+ import_set_state_nolock(imp, LUSTRE_IMP_CONNECTING);
+ /* don't make noise at reconnection */
+ imp->imp_was_idle = 1;
spin_unlock(&imp->imp_lock);
- req = ptlrpc_disconnect_prep_req(imp);
- if (IS_ERR(req))
- return PTR_ERR(req);
-
CDEBUG_LIMIT(imp->imp_idle_debug, "%s: disconnect after %llus idle\n",
imp->imp_obd->obd_name,
ktime_get_real_seconds() - imp->imp_last_reply_time);
- /* don't make noise at reconnection */
- spin_lock(&imp->imp_lock);
- imp->imp_was_idle = 1;
- spin_unlock(&imp->imp_lock);
-
- req->rq_interpret_reply = ptlrpc_disconnect_idle_interpret;
ptlrpcd_add_req(req);
- return 0;
+ return 1;
}
EXPORT_SYMBOL(ptlrpc_disconnect_and_idle_import);
@@ -127,8 +127,9 @@ static int ptlrpc_ping(struct obd_import *imp)
{
struct ptlrpc_request *req;
- if (ptlrpc_check_import_is_idle(imp))
- return ptlrpc_disconnect_and_idle_import(imp);
+ if (ptlrpc_check_import_is_idle(imp) &&
+ ptlrpc_disconnect_and_idle_import(imp) == 1)
+ return 0;
req = ptlrpc_prep_ping(imp);
if (!req) {