@@ -188,6 +188,29 @@ static inline int lnet_md_unlinkable(struct lnet_libmd *md)
cfs_percpt_unlock(the_lnet.ln_res_lock, cpt);
}
+static inline void lnet_md_wait_handling(struct lnet_libmd *md, int cpt)
+{
+ wait_queue_head_t *wq = __var_waitqueue(md);
+ struct wait_bit_queue_entry entry;
+ wait_queue_entry_t *wqe = &entry.wq_entry;
+
+ init_wait_var_entry(&entry, md, 0);
+ prepare_to_wait_event(wq, wqe, TASK_IDLE);
+ if (md->md_flags & LNET_MD_FLAG_HANDLING) {
+ /* Race with unlocked call to ->md_handler: wait for it.
+ * It is safe to drop the res_lock here as the caller has
+ * only just claimed it.  Sleep (TASK_IDLE) until woken.
+ */
+ lnet_res_unlock(cpt);
+ schedule();
+ /* Cannot check md now, it might have been freed while we
+ * slept.  Caller must re-validate md and re-check the flag.
+ */
+ lnet_res_lock(cpt);
+ }
+ finish_wait(wq, wqe);
+}
+
static inline void
lnet_md_free(struct lnet_libmd *md)
{
@@ -213,6 +213,15 @@ struct lnet_libmd {
#define LNET_MD_FLAG_ZOMBIE BIT(0)
#define LNET_MD_FLAG_AUTO_UNLINK BIT(1)
#define LNET_MD_FLAG_ABORTED BIT(2)
+/* LNET_MD_FLAG_HANDLING is set while a non-unlink event handler
+ * is being called for an event relating to the md.
+ * It ensures only one such handler runs at a time.
+ * The handler for the final "unlink" event is only called after
+ * md_refcount has reached zero and this flag has been cleared,
+ * ensuring that it doesn't race with any other event handler
+ * call.
+ */
+#define LNET_MD_FLAG_HANDLING BIT(3)
struct lnet_test_peer {
/* info about peers we are trying to fail */
@@ -75,6 +75,7 @@
LASSERT(!list_empty(&md->md_list));
list_del_init(&md->md_list);
+ LASSERT(!(md->md_flags & LNET_MD_FLAG_HANDLING));
lnet_md_free(md);
}
@@ -448,7 +449,8 @@ int lnet_cpt_of_md(struct lnet_libmd *md, unsigned int offset)
LNetMDUnlink(struct lnet_handle_md mdh)
{
struct lnet_event ev;
- struct lnet_libmd *md;
+ struct lnet_libmd *md = NULL;
+ lnet_handler_t handler = NULL;
int cpt;
LASSERT(the_lnet.ln_refcount > 0);
@@ -456,10 +458,18 @@ int lnet_cpt_of_md(struct lnet_libmd *md, unsigned int offset)
cpt = lnet_cpt_of_cookie(mdh.cookie);
lnet_res_lock(cpt);
- md = lnet_handle2md(&mdh);
- if (!md) {
- lnet_res_unlock(cpt);
- return -ENOENT;
+ while (!md) {
+ md = lnet_handle2md(&mdh);
+ if (!md) {
+ lnet_res_unlock(cpt);
+ return -ENOENT;
+ }
+ if (md->md_refcount == 0 &&
+ md->md_flags & LNET_MD_FLAG_HANDLING) {
+ /* Race with unlocked call to ->md_handler. */
+ lnet_md_wait_handling(md, cpt);
+ md = NULL;
+ }
}
md->md_flags |= LNET_MD_FLAG_ABORTED;
@@ -470,7 +480,7 @@ int lnet_cpt_of_md(struct lnet_libmd *md, unsigned int offset)
*/
if (md->md_handler && !md->md_refcount) {
lnet_build_unlink_event(md, &ev);
- md->md_handler(&ev);
+ handler = md->md_handler;
}
if (md->md_rspt_ptr)
@@ -479,6 +489,10 @@ int lnet_cpt_of_md(struct lnet_libmd *md, unsigned int offset)
lnet_md_unlink(md);
lnet_res_unlock(cpt);
+
+ if (handler)
+ handler(&ev);
+
return 0;
}
EXPORT_SYMBOL(LNetMDUnlink);
@@ -938,11 +938,20 @@
}
static void
-lnet_msg_detach_md(struct lnet_msg *msg, int cpt, int status)
+lnet_msg_detach_md(struct lnet_msg *msg, int status)
{
struct lnet_libmd *md = msg->msg_md;
+ lnet_handler_t handler = NULL;
+ int cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
int unlink;
+ lnet_res_lock(cpt);
+ while (md->md_flags & LNET_MD_FLAG_HANDLING)
+ /* An event handler is running - wait for it to
+ * complete to avoid races.
+ */
+ lnet_md_wait_handling(md, cpt);
+
/* Now it's safe to drop my caller's ref */
md->md_refcount--;
LASSERT(md->md_refcount >= 0);
@@ -956,17 +965,30 @@
msg->msg_ev.status = status;
}
msg->msg_ev.unlinked = unlink;
- md->md_handler(&msg->msg_ev);
+ handler = md->md_handler;
+ if (!unlink)
+ md->md_flags |= LNET_MD_FLAG_HANDLING;
}
if (unlink || (md->md_refcount == 0 &&
md->md_threshold == LNET_MD_THRESH_INF))
lnet_detach_rsp_tracker(md, cpt);
+ msg->msg_md = NULL;
if (unlink)
lnet_md_unlink(md);
- msg->msg_md = NULL;
+ lnet_res_unlock(cpt);
+
+ if (handler) {
+ handler(&msg->msg_ev);
+ if (!unlink) {
+ lnet_res_lock(cpt);
+ md->md_flags &= ~LNET_MD_FLAG_HANDLING;
+ wake_up_var(md);
+ lnet_res_unlock(cpt);
+ }
+ }
}
static bool
@@ -1101,12 +1123,8 @@
/* We're not going to resend this message so detach its MD and invoke
* the appropriate callbacks
*/
- if (msg->msg_md) {
- cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
- lnet_res_lock(cpt);
- lnet_msg_detach_md(msg, cpt, status);
- lnet_res_unlock(cpt);
- }
+ if (msg->msg_md)
+ lnet_msg_detach_md(msg, status);
again:
if (!msg->msg_tx_committed && !msg->msg_rx_committed) {