diff mbox series

[539/622] lustre: ldlm: FLOCK request can be processed twice

Message ID 1582838290-17243-540-git-send-email-jsimmons@infradead.org (mailing list archive)
State New, archived
Headers show
Series lustre: sync closely to 2.13.52 | expand

Commit Message

James Simmons Feb. 27, 2020, 9:16 p.m. UTC
From: Andriy Skulysh <c17819@cray.com>

Original request can be processed after resend
request, so it can create a lock on MDT without
client lock or unlock other lock.

Make flock enqueue to use modify RPC slot.

Cray-bug-id: LUS-5739
WC-bug-id: https://jira.whamcloud.com/browse/LU-12828
Lustre-commit: 85a12c6c8d7a ("LU-12828 ldlm: FLOCK request can be processed twice")
Signed-off-by: Andriy Skulysh <c17819@cray.com>
Signed-off-by: Vitaly Fertman <c17818@cray.com>
Reviewed-by: Alexander Boyko <c17825@cray.com>
Reviewed-by: Andrew Perepechko <c17827@cray.com>
Reviewed-on: https://review.whamcloud.com/36340
Reviewed-by: Alexandr Boyko <c17825@cray.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Signed-off-by: James Simmons <jsimmons@infradead.org>
---
 fs/lustre/include/lustre_dlm.h |  3 +++
 fs/lustre/include/lustre_mdc.h | 25 -----------------------
 fs/lustre/include/lustre_net.h |  3 ++-
 fs/lustre/include/obd_class.h  |  6 ++----
 fs/lustre/ldlm/ldlm_request.c  | 34 ++++++++++++++++++++++++++++---
 fs/lustre/mdc/mdc_locks.c      | 45 +++++++++++++-----------------------------
 fs/lustre/mdc/mdc_reint.c      |  4 ++--
 fs/lustre/mdc/mdc_request.c    | 20 +++++++++----------
 fs/lustre/obdclass/genops.c    | 30 +++++-----------------------
 fs/lustre/ptlrpc/client.c      | 29 +++++++++++++++++++++++++--
 10 files changed, 96 insertions(+), 103 deletions(-)
diff mbox series

Patch

diff --git a/fs/lustre/include/lustre_dlm.h b/fs/lustre/include/lustre_dlm.h
index 7621d1e..31d360e 100644
--- a/fs/lustre/include/lustre_dlm.h
+++ b/fs/lustre/include/lustre_dlm.h
@@ -959,6 +959,8 @@  struct ldlm_enqueue_info {
 	void			*ei_cbdata;
 	/* whether enqueue slave stripes */
 	unsigned int		ei_enq_slave:1;
+	/* whether acquire rpc slot */
+	unsigned int		ei_enq_slot:1;
 };
 
 extern struct obd_ops ldlm_obd_ops;
@@ -1279,6 +1281,7 @@  int ldlm_prep_elc_req(struct obd_export *exp,
 		      int version, int opc, int canceloff,
 		      struct list_head *cancels, int count);
 
+struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len);
 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 			  enum ldlm_type type, u8 with_policy,
 			  enum ldlm_mode mode,
diff --git a/fs/lustre/include/lustre_mdc.h b/fs/lustre/include/lustre_mdc.h
index f57783d..d7b6e4a 100644
--- a/fs/lustre/include/lustre_mdc.h
+++ b/fs/lustre/include/lustre_mdc.h
@@ -60,31 +60,6 @@ 
 struct ptlrpc_request;
 struct obd_device;
 
-static inline void mdc_get_mod_rpc_slot(struct ptlrpc_request *req,
-					struct lookup_intent *it)
-{
-	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
-	u32 opc;
-	u16 tag;
-
-	opc = lustre_msg_get_opc(req->rq_reqmsg);
-	tag = obd_get_mod_rpc_slot(cli, opc, it);
-	lustre_msg_set_tag(req->rq_reqmsg, tag);
-	ptlrpc_reassign_next_xid(req);
-}
-
-static inline void mdc_put_mod_rpc_slot(struct ptlrpc_request *req,
-					struct lookup_intent *it)
-{
-	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
-	u32 opc;
-	u16 tag;
-
-	opc = lustre_msg_get_opc(req->rq_reqmsg);
-	tag = lustre_msg_get_tag(req->rq_reqmsg);
-	obd_put_mod_rpc_slot(cli, opc, it, tag);
-}
-
 /**
  * Update the maximum possible easize.
  *
diff --git a/fs/lustre/include/lustre_net.h b/fs/lustre/include/lustre_net.h
index 87e1d60..90a0b01 100644
--- a/fs/lustre/include/lustre_net.h
+++ b/fs/lustre/include/lustre_net.h
@@ -1919,7 +1919,8 @@  void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
 u64 ptlrpc_next_xid(void);
 u64 ptlrpc_sample_next_xid(void);
 u64 ptlrpc_req_xid(struct ptlrpc_request *request);
-void ptlrpc_reassign_next_xid(struct ptlrpc_request *req);
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req);
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req);
 
 /* Set of routines to run a function in ptlrpcd context */
 void *ptlrpcd_alloc_work(struct obd_import *imp,
diff --git a/fs/lustre/include/obd_class.h b/fs/lustre/include/obd_class.h
index bc01eca..a099768 100644
--- a/fs/lustre/include/obd_class.h
+++ b/fs/lustre/include/obd_class.h
@@ -115,10 +115,8 @@  static inline char *obd_import_nid2str(struct obd_import *imp)
 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, u16 max);
 int obd_mod_rpc_stats_seq_show(struct client_obd *cli, struct seq_file *seq);
 
-u16 obd_get_mod_rpc_slot(struct client_obd *cli, u32 opc,
-			 struct lookup_intent *it);
-void obd_put_mod_rpc_slot(struct client_obd *cli, u32 opc,
-			  struct lookup_intent *it, u16 tag);
+u16 obd_get_mod_rpc_slot(struct client_obd *cli, u32 opc);
+void obd_put_mod_rpc_slot(struct client_obd *cli, u32 opc, u16 tag);
 
 struct llog_handle;
 struct llog_rec_hdr;
diff --git a/fs/lustre/ldlm/ldlm_request.c b/fs/lustre/ldlm/ldlm_request.c
index 20bdba4..6df057d 100644
--- a/fs/lustre/ldlm/ldlm_request.c
+++ b/fs/lustre/ldlm/ldlm_request.c
@@ -347,6 +347,11 @@  static void failed_lock_cleanup(struct ldlm_namespace *ns,
 	}
 }
 
+static bool ldlm_request_slot_needed(enum ldlm_type type)
+{
+	return type == LDLM_FLOCK || type == LDLM_IBITS;
+}
+
 /**
  * Finishing portion of client lock enqueue code.
  *
@@ -365,6 +370,11 @@  int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 	struct ldlm_reply *reply;
 	int cleanup_phase = 1;
 
+	if (ldlm_request_slot_needed(type))
+		obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
+
+	ptlrpc_put_mod_rpc_slot(req);
+
 	lock = ldlm_handle2lock(lockh);
 	/* ldlm_cli_enqueue is holding a reference on this lock. */
 	if (!lock) {
@@ -662,8 +672,7 @@  int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
 }
 EXPORT_SYMBOL(ldlm_prep_enqueue_req);
 
-static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp,
-						int lvb_len)
+struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len)
 {
 	struct ptlrpc_request *req;
 	int rc;
@@ -682,6 +691,7 @@  static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp,
 	ptlrpc_request_set_replen(req);
 	return req;
 }
+EXPORT_SYMBOL(ldlm_enqueue_pack);
 
 /**
  * Client-side lock enqueue.
@@ -814,6 +824,24 @@  int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 					     LDLM_GLIMPSE_ENQUEUE);
 	}
 
+	/* It is important to obtain modify RPC slot first (if applicable), so
+	 * that threads that are waiting for a modify RPC slot are not polluting
+	 * our rpcs in flight counter.
+	 */
+	if (einfo->ei_enq_slot)
+		ptlrpc_get_mod_rpc_slot(req);
+
+	if (ldlm_request_slot_needed(einfo->ei_type)) {
+		rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
+		if (rc) {
+			if (einfo->ei_enq_slot)
+				ptlrpc_put_mod_rpc_slot(req);
+			failed_lock_cleanup(ns, lock, einfo->ei_mode);
+			LDLM_LOCK_RELEASE(lock);
+			goto out;
+		}
+	}
+
 	if (async) {
 		LASSERT(reqp);
 		return 0;
@@ -835,7 +863,7 @@  int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 		LDLM_LOCK_RELEASE(lock);
 	else
 		rc = err;
-
+out:
 	if (!req_passed_in && req) {
 		ptlrpc_req_finished(req);
 		if (reqp)
diff --git a/fs/lustre/mdc/mdc_locks.c b/fs/lustre/mdc/mdc_locks.c
index 4d40087..60bbae1 100644
--- a/fs/lustre/mdc/mdc_locks.c
+++ b/fs/lustre/mdc/mdc_locks.c
@@ -856,6 +856,16 @@  static int mdc_finish_enqueue(struct obd_export *exp,
 	return rc;
 }
 
+static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it)
+{
+	if (it &&
+	    (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
+	     it->it_op == IT_READDIR ||
+	     (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
+		return true;
+	return false;
+}
+
 /* We always reserve enough space in the reply packet for a stripe MD, because
  * we don't know in advance the file type.
  */
@@ -877,7 +887,7 @@  int mdc_enqueue_base(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 		.l_inodebits = { MDS_INODELOCK_XATTR }
 	};
 	struct obd_device *obddev = class_exp2obd(exp);
-	struct ptlrpc_request *req = NULL;
+	struct ptlrpc_request *req;
 	u64 flags, saved_flags = extra_lock_flags;
 	struct ldlm_res_id res_id;
 	int generation, resends = 0;
@@ -920,6 +930,7 @@  int mdc_enqueue_base(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 		LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
 			 einfo->ei_type);
 		res_id.name[3] = LDLM_FLOCK;
+		req = ldlm_enqueue_pack(exp, 0);
 	} else if (it->it_op & IT_OPEN) {
 		req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
 	} else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
@@ -947,21 +958,7 @@  int mdc_enqueue_base(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 		req->rq_sent = ktime_get_real_seconds() + resends;
 	}
 
-	/* It is important to obtain modify RPC slot first (if applicable), so
-	 * that threads that are waiting for a modify RPC slot are not polluting
-	 * our rpcs in flight counter.
-	 * We do not do flock request limiting, though
-	 */
-	if (it) {
-		mdc_get_mod_rpc_slot(req, it);
-		rc = obd_get_request_slot(&obddev->u.cli);
-		if (rc != 0) {
-			mdc_put_mod_rpc_slot(req, it);
-			mdc_clear_replay_flag(req, 0);
-			ptlrpc_req_finished(req);
-			return rc;
-		}
-	}
+	einfo->ei_enq_slot = !mdc_skip_mod_rpc_slot(it);
 
 	/* With Data-on-MDT the glimpse callback is needed too.
 	 * It is set here in advance but not in mdc_finish_enqueue()
@@ -987,12 +984,10 @@  int mdc_enqueue_base(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 		    (einfo->ei_type == LDLM_FLOCK) &&
 		    (einfo->ei_mode == LCK_NL))
 			goto resend;
+		ptlrpc_req_finished(req);
 		return rc;
 	}
 
-	obd_put_request_slot(&obddev->u.cli);
-	mdc_put_mod_rpc_slot(req, it);
-
 	if (rc < 0) {
 		CDEBUG(D_INFO,
 		       "%s: ldlm_cli_enqueue " DFID ":" DFID "=%s failed: rc = %d\n",
@@ -1343,16 +1338,12 @@  static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
 	struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
 	struct lookup_intent *it;
 	struct lustre_handle *lockh;
-	struct obd_device *obddev;
 	struct ldlm_reply *lockrep;
 	u64 flags = LDLM_FL_HAS_INTENT;
 
 	it = &minfo->mi_it;
 	lockh = &minfo->mi_lockh;
 
-	obddev = class_exp2obd(exp);
-
-	obd_put_request_slot(&obddev->u.cli);
 	if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
 		rc = -ETIMEDOUT;
 
@@ -1387,7 +1378,6 @@  int mdc_intent_getattr_async(struct obd_export *exp,
 	struct lookup_intent *it = &minfo->mi_it;
 	struct ptlrpc_request *req;
 	struct mdc_getattr_args *ga;
-	struct obd_device *obddev = class_exp2obd(exp);
 	struct ldlm_res_id res_id;
 	union ldlm_policy_data policy = {
 		.l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
@@ -1409,12 +1399,6 @@  int mdc_intent_getattr_async(struct obd_export *exp,
 	if (IS_ERR(req))
 		return PTR_ERR(req);
 
-	rc = obd_get_request_slot(&obddev->u.cli);
-	if (rc != 0) {
-		ptlrpc_req_finished(req);
-		return rc;
-	}
-
 	/* With Data-on-MDT the glimpse callback is needed too.
 	 * It is set here in advance but not in mdc_finish_enqueue()
 	 * to avoid possible races. It is safe to have glimpse handler
@@ -1426,7 +1410,6 @@  int mdc_intent_getattr_async(struct obd_export *exp,
 	rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
 			      &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
 	if (rc < 0) {
-		obd_put_request_slot(&obddev->u.cli);
 		ptlrpc_req_finished(req);
 		return rc;
 	}
diff --git a/fs/lustre/mdc/mdc_reint.c b/fs/lustre/mdc/mdc_reint.c
index 0dc0de4..dade5686 100644
--- a/fs/lustre/mdc/mdc_reint.c
+++ b/fs/lustre/mdc/mdc_reint.c
@@ -47,9 +47,9 @@  static int mdc_reint(struct ptlrpc_request *request, int level)
 
 	request->rq_send_state = level;
 
-	mdc_get_mod_rpc_slot(request, NULL);
+	ptlrpc_get_mod_rpc_slot(request);
 	rc = ptlrpc_queue_wait(request);
-	mdc_put_mod_rpc_slot(request, NULL);
+	ptlrpc_put_mod_rpc_slot(request);
 	if (rc)
 		CDEBUG(D_INFO, "error in handling %d\n", rc);
 	else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY))
diff --git a/fs/lustre/mdc/mdc_request.c b/fs/lustre/mdc/mdc_request.c
index 54f6d15..8569858 100644
--- a/fs/lustre/mdc/mdc_request.c
+++ b/fs/lustre/mdc/mdc_request.c
@@ -412,12 +412,12 @@  static int mdc_xattr_common(struct obd_export *exp,
 
 	/* make rpc */
 	if (opcode == MDS_REINT)
-		mdc_get_mod_rpc_slot(req, NULL);
+		ptlrpc_get_mod_rpc_slot(req);
 
 	rc = ptlrpc_queue_wait(req);
 
 	if (opcode == MDS_REINT)
-		mdc_put_mod_rpc_slot(req, NULL);
+		ptlrpc_put_mod_rpc_slot(req);
 
 	if (rc)
 		ptlrpc_req_finished(req);
@@ -990,9 +990,9 @@  static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
 
 	ptlrpc_request_set_replen(req);
 
-	mdc_get_mod_rpc_slot(req, NULL);
+	ptlrpc_get_mod_rpc_slot(req);
 	rc = ptlrpc_queue_wait(req);
-	mdc_put_mod_rpc_slot(req, NULL);
+	ptlrpc_put_mod_rpc_slot(req);
 
 	if (!req->rq_repmsg) {
 		CDEBUG(D_RPCTRACE, "request %p failed to send: rc = %d\n", req,
@@ -1779,9 +1779,9 @@  static int mdc_ioc_hsm_progress(struct obd_export *exp,
 
 	ptlrpc_request_set_replen(req);
 
-	mdc_get_mod_rpc_slot(req, NULL);
+	ptlrpc_get_mod_rpc_slot(req);
 	rc = ptlrpc_queue_wait(req);
-	mdc_put_mod_rpc_slot(req, NULL);
+	ptlrpc_put_mod_rpc_slot(req);
 out:
 	ptlrpc_req_finished(req);
 	return rc;
@@ -1984,9 +1984,9 @@  static int mdc_ioc_hsm_state_set(struct obd_export *exp,
 
 	ptlrpc_request_set_replen(req);
 
-	mdc_get_mod_rpc_slot(req, NULL);
+	ptlrpc_get_mod_rpc_slot(req);
 	rc = ptlrpc_queue_wait(req);
-	mdc_put_mod_rpc_slot(req, NULL);
+	ptlrpc_put_mod_rpc_slot(req);
 out:
 	ptlrpc_req_finished(req);
 	return rc;
@@ -2049,9 +2049,9 @@  static int mdc_ioc_hsm_request(struct obd_export *exp,
 
 	ptlrpc_request_set_replen(req);
 
-	mdc_get_mod_rpc_slot(req, NULL);
+	ptlrpc_get_mod_rpc_slot(req);
 	rc = ptlrpc_queue_wait(req);
-	mdc_put_mod_rpc_slot(req, NULL);
+	ptlrpc_put_mod_rpc_slot(req);
 out:
 	ptlrpc_req_finished(req);
 	return rc;
diff --git a/fs/lustre/obdclass/genops.c b/fs/lustre/obdclass/genops.c
index 7f841d5..bceb055 100644
--- a/fs/lustre/obdclass/genops.c
+++ b/fs/lustre/obdclass/genops.c
@@ -1495,36 +1495,18 @@  static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
 	return avail;
 }
 
-static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
-{
-	if (it &&
-	    (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
-	     it->it_op == IT_READDIR ||
-	     (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
-		return true;
-	return false;
-}
-
 /* Get a modify RPC slot from the obd client @cli according
- * to the kind of operation @opc that is going to be sent
- * and the intent @it of the operation if it applies.
+ * to the kind of operation @opc that is going to be sent.
  * If the maximum number of modify RPCs in flight is reached
  * the thread is put to sleep.
  * Returns the tag to be set in the request message. Tag 0
  * is reserved for non-modifying requests.
  */
-u16 obd_get_mod_rpc_slot(struct client_obd *cli, u32 opc,
-			 struct lookup_intent *it)
+u16 obd_get_mod_rpc_slot(struct client_obd *cli, u32 opc)
 {
 	bool close_req = false;
 	u16 i, max;
 
-	/* read-only metadata RPCs don't consume a slot on MDT
-	 * for reply reconstruction
-	 */
-	if (obd_skip_mod_rpc_slot(it))
-		return 0;
-
 	if (opc == MDS_CLOSE)
 		close_req = true;
 
@@ -1567,15 +1549,13 @@  u16 obd_get_mod_rpc_slot(struct client_obd *cli, u32 opc,
 
 /*
  * Put a modify RPC slot from the obd client @cli according
- * to the kind of operation @opc that has been sent and the
- * intent @it of the operation if it applies.
+ * to the kind of operation @opc that has been sent.
  */
-void obd_put_mod_rpc_slot(struct client_obd *cli, u32 opc,
-			  struct lookup_intent *it, u16 tag)
+void obd_put_mod_rpc_slot(struct client_obd *cli, u32 opc, u16 tag)
 {
 	bool close_req = false;
 
-	if (obd_skip_mod_rpc_slot(it))
+	if (tag == 0)
 		return;
 
 	if (opc == MDS_CLOSE)
diff --git a/fs/lustre/ptlrpc/client.c b/fs/lustre/ptlrpc/client.c
index 8d874f2..632ddf1 100644
--- a/fs/lustre/ptlrpc/client.c
+++ b/fs/lustre/ptlrpc/client.c
@@ -717,7 +717,7 @@  static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
 
 static atomic64_t ptlrpc_last_xid;
 
-void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
+static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
 {
 	spin_lock(&req->rq_import->imp_lock);
 	list_del_init(&req->rq_unreplied_list);
@@ -725,7 +725,32 @@  void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
 	spin_unlock(&req->rq_import->imp_lock);
 	DEBUG_REQ(D_RPCTRACE, req, "reassign xid");
 }
-EXPORT_SYMBOL(ptlrpc_reassign_next_xid);
+
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req)
+{
+	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+	u32 opc;
+	u16 tag;
+
+	opc = lustre_msg_get_opc(req->rq_reqmsg);
+	tag = obd_get_mod_rpc_slot(cli, opc);
+	lustre_msg_set_tag(req->rq_reqmsg, tag);
+	ptlrpc_reassign_next_xid(req);
+}
+EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot);
+
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req)
+{
+	u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+	if (tag != 0) {
+		struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+		u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+		obd_put_mod_rpc_slot(cli, opc, tag);
+	}
+}
+EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot);
 
 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
 			     u32 version, int opcode, char **bufs,