@@ -894,6 +894,12 @@ int lnet_get_peer_ni_info(u32 peer_index, u64 *nid,
return false;
}
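+/* Increment a health value by one, capped at LNET_MAX_HEALTH_VALUE */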
+static inline void
+lnet_inc_healthv(atomic_t *healthv)
+{
+ atomic_add_unless(healthv, 1, LNET_MAX_HEALTH_VALUE);
+}
+
void lnet_incr_stats(struct lnet_element_stats *stats,
enum lnet_msg_type msg_type,
enum lnet_stats_type stats_type);
@@ -478,6 +478,8 @@ struct lnet_peer_ni {
struct list_head lpni_peer_nis;
/* chain on remote peer list */
struct list_head lpni_on_remote_peer_ni_list;
+ /* chain on recovery queue */
+ struct list_head lpni_recovery;
/* chain on peer hash */
struct list_head lpni_hashlist;
/* messages blocking for tx credits */
@@ -529,6 +531,10 @@ struct lnet_peer_ni {
lnet_nid_t lpni_nid;
/* # refs */
atomic_t lpni_refcount;
+ /* health value for the peer NI */
+ atomic_t lpni_healthv;
+ /* recovery ping mdh */
+ struct lnet_handle_md lpni_recovery_ping_mdh;
/* CPT this peer attached on */
int lpni_cpt;
/* state flags -- protected by lpni_lock */
@@ -558,6 +564,10 @@ struct lnet_peer_ni {
/* Preferred path added due to traffic on non-MR peer_ni */
#define LNET_PEER_NI_NON_MR_PREF BIT(0)
+/* peer NI is being recovered */
+#define LNET_PEER_NI_RECOVERY_PENDING BIT(1)
+/* peer NI is being deleted */
+#define LNET_PEER_NI_DELETING BIT(2)
struct lnet_peer {
/* chain on pt_peer_list */
@@ -1088,6 +1098,8 @@ struct lnet {
struct list_head **ln_mt_resendqs;
/* local NIs to recover */
struct list_head ln_mt_localNIRecovq;
+ /* peer NIs to recover */
+ struct list_head ln_mt_peerNIRecovq;
/* recovery eq handler */
struct lnet_handle_eq ln_mt_eqh;
@@ -832,6 +832,7 @@ struct lnet_libhandle *
INIT_LIST_HEAD(&the_lnet.ln_dc_working);
INIT_LIST_HEAD(&the_lnet.ln_dc_expired);
INIT_LIST_HEAD(&the_lnet.ln_mt_localNIRecovq);
+ INIT_LIST_HEAD(&the_lnet.ln_mt_peerNIRecovq);
init_waitqueue_head(&the_lnet.ln_dc_waitq);
rc = lnet_descriptor_setup();
@@ -1025,15 +1025,6 @@ void lnet_usr_translate_stats(struct lnet_ioctl_element_msg_stats *msg_stats,
}
if (txpeer) {
- /*
- * TODO:
- * Once the patch for the health comes in we need to set
- * the health of the peer ni to bad when we fail to send
- * a message.
- * int status = msg->msg_ev.status;
- * if (status != 0)
- * lnet_set_peer_ni_health_locked(txpeer, false)
- */
msg->msg_txpeer = NULL;
lnet_peer_ni_decref_locked(txpeer);
}
@@ -1545,6 +1536,8 @@ void lnet_usr_translate_stats(struct lnet_ioctl_element_msg_stats *msg_stats,
int best_lpni_credits = INT_MIN;
bool preferred = false;
bool ni_is_pref;
+ int best_lpni_healthv = 0;
+ int lpni_healthv;
while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
/* if the best_ni we've chosen aleady has this lpni
@@ -1553,6 +1546,8 @@ void lnet_usr_translate_stats(struct lnet_ioctl_element_msg_stats *msg_stats,
ni_is_pref = lnet_peer_is_pref_nid_locked(lpni,
best_ni->ni_nid);
+ lpni_healthv = atomic_read(&lpni->lpni_healthv);
+
CDEBUG(D_NET, "%s ni_is_pref = %d\n",
libcfs_nid2str(best_ni->ni_nid), ni_is_pref);
@@ -1562,8 +1557,13 @@ void lnet_usr_translate_stats(struct lnet_ioctl_element_msg_stats *msg_stats,
lpni->lpni_txcredits, best_lpni_credits,
lpni->lpni_seq, best_lpni->lpni_seq);
+ /* pick the healthiest peer ni */
+ if (lpni_healthv < best_lpni_healthv) {
+ continue;
+ } else if (lpni_healthv > best_lpni_healthv) {
+ best_lpni_healthv = lpni_healthv;
/* if this is a preferred peer use it */
- if (!preferred && ni_is_pref) {
+ } else if (!preferred && ni_is_pref) {
preferred = true;
} else if (preferred && !ni_is_pref) {
/*
@@ -2408,6 +2408,16 @@ struct lnet_ni *
return 0;
}
+enum lnet_mt_event_type {
+ MT_TYPE_LOCAL_NI = 0,
+ MT_TYPE_PEER_NI
+};
+
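+/* Recovery ping context passed as the MD user_ptr so that
+ * lnet_mt_event_handler() can tell whether a local NI or a peer NI
+ * is being recovered.
+ */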
+struct lnet_mt_event_info {
+ enum lnet_mt_event_type mt_type;
+ lnet_nid_t mt_nid;
+};
+
static void
lnet_resend_pending_msgs_locked(struct list_head *resendq, int cpt)
{
@@ -2503,6 +2513,7 @@ struct lnet_ni *
static void
lnet_recover_local_nis(void)
{
+ struct lnet_mt_event_info *ev_info;
struct list_head processed_list;
struct list_head local_queue;
struct lnet_handle_md mdh;
@@ -2550,15 +2561,24 @@ struct lnet_ni *
lnet_ni_unlock(ni);
lnet_net_unlock(0);
- /* protect the ni->ni_state field. Once we call the
- * lnet_send_ping function it's possible we receive
- * a response before we check the rc. The lock ensures
- * a stable value for the ni_state RECOVERY_PENDING bit
- */
+ CDEBUG(D_NET, "attempting to recover local ni: %s\n",
+ libcfs_nid2str(ni->ni_nid));
+
lnet_ni_lock(ni);
if (!(ni->ni_state & LNET_NI_STATE_RECOVERY_PENDING)) {
ni->ni_state |= LNET_NI_STATE_RECOVERY_PENDING;
lnet_ni_unlock(ni);
+
+ ev_info = kzalloc(sizeof(*ev_info), GFP_NOFS);
+ if (!ev_info) {
+ CERROR("out of memory. Can't recover %s\n",
+ libcfs_nid2str(ni->ni_nid));
+ lnet_ni_lock(ni);
+ ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
+ lnet_ni_unlock(ni);
+ continue;
+ }
+
mdh = ni->ni_ping_mdh;
/* Invalidate the ni mdh in case it's deleted.
* We'll unlink the mdh in this case below.
@@ -2587,9 +2607,10 @@ struct lnet_ni *
lnet_ni_decref_locked(ni, 0);
lnet_net_unlock(0);
- rc = lnet_send_ping(nid, &mdh,
- LNET_INTERFACES_MIN, (void *)nid,
- the_lnet.ln_mt_eqh, true);
+ ev_info->mt_type = MT_TYPE_LOCAL_NI;
+ ev_info->mt_nid = nid;
+ rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
+ ev_info, the_lnet.ln_mt_eqh, true);
/* lookup the nid again */
lnet_net_lock(0);
ni = lnet_nid2ni_locked(nid, 0);
@@ -2694,6 +2715,44 @@ struct lnet_ni *
}
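+/* Unlink the recovery ping MD for a peer NI. Called with
+ * lnet_net_lock(cpt) and the lpni_lock held; both locks are dropped
+ * around LNetMDUnlink() and re-acquired before returning.
+ */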
static void
+lnet_unlink_lpni_recovery_mdh_locked(struct lnet_peer_ni *lpni, int cpt)
+{
+ struct lnet_handle_md recovery_mdh;
+
+ LNetInvalidateMDHandle(&recovery_mdh);
+
+ if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) {
+ recovery_mdh = lpni->lpni_recovery_ping_mdh;
+ LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
+ }
+ spin_unlock(&lpni->lpni_lock);
+ lnet_net_unlock(cpt);
+ if (!LNetMDHandleIsInvalid(recovery_mdh))
+ LNetMDUnlink(recovery_mdh);
+ lnet_net_lock(cpt);
+ spin_lock(&lpni->lpni_lock);
+}
+
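+/* Drain the peer NI recovery queue on shutdown: unlink any
+ * outstanding recovery ping MDs and drop the references held by
+ * the queue.
+ */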
+static void
+lnet_clean_peer_ni_recoveryq(void)
+{
+ struct lnet_peer_ni *lpni, *tmp;
+
+ lnet_net_lock(LNET_LOCK_EX);
+
+ list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_mt_peerNIRecovq,
+ lpni_recovery) {
+ list_del_init(&lpni->lpni_recovery);
+ spin_lock(&lpni->lpni_lock);
+ lnet_unlink_lpni_recovery_mdh_locked(lpni, LNET_LOCK_EX);
+ spin_unlock(&lpni->lpni_lock);
+ lnet_peer_ni_decref_locked(lpni);
+ }
+
+ lnet_net_unlock(LNET_LOCK_EX);
+}
+
+static void
lnet_clean_resendqs(void)
{
struct lnet_msg *msg, *tmp;
@@ -2716,6 +2775,128 @@ struct lnet_ni *
cfs_percpt_free(the_lnet.ln_mt_resendqs);
}
+static void
+lnet_recover_peer_nis(void)
+{
+ struct lnet_mt_event_info *ev_info;
+ struct list_head processed_list;
+ struct list_head local_queue;
+ struct lnet_handle_md mdh;
+ struct lnet_peer_ni *lpni;
+ struct lnet_peer_ni *tmp;
+ lnet_nid_t nid;
+ int healthv;
+ int rc;
+
+ INIT_LIST_HEAD(&local_queue);
+ INIT_LIST_HEAD(&processed_list);
+
+ /* Always use cpt 0 for locking across all interactions with
+ * ln_mt_peerNIRecovq
+ */
+ lnet_net_lock(0);
+ list_splice_init(&the_lnet.ln_mt_peerNIRecovq,
+ &local_queue);
+ lnet_net_unlock(0);
+
+ list_for_each_entry_safe(lpni, tmp, &local_queue,
+ lpni_recovery) {
+ /* The same protection strategy is used here as in the
+ * local recovery case.
+ */
+ lnet_net_lock(0);
+ healthv = atomic_read(&lpni->lpni_healthv);
+ spin_lock(&lpni->lpni_lock);
+ if (lpni->lpni_state & LNET_PEER_NI_DELETING ||
+ healthv == LNET_MAX_HEALTH_VALUE) {
+ list_del_init(&lpni->lpni_recovery);
+ lnet_unlink_lpni_recovery_mdh_locked(lpni, 0);
+ spin_unlock(&lpni->lpni_lock);
+ lnet_peer_ni_decref_locked(lpni);
+ lnet_net_unlock(0);
+ continue;
+ }
+ spin_unlock(&lpni->lpni_lock);
+ lnet_net_unlock(0);
+
+ /* NOTE: we're racing with peer deletion from user space.
+ * It's possible that a peer is deleted after we check its
+ * state. In this case the recovery can create a new peer
+ */
+ spin_lock(&lpni->lpni_lock);
+ if (!(lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) &&
+ !(lpni->lpni_state & LNET_PEER_NI_DELETING)) {
+ lpni->lpni_state |= LNET_PEER_NI_RECOVERY_PENDING;
+ spin_unlock(&lpni->lpni_lock);
+
+ ev_info = kzalloc(sizeof(*ev_info), GFP_NOFS);
+ if (!ev_info) {
+ CERROR("out of memory. Can't recover %s\n",
+ libcfs_nid2str(lpni->lpni_nid));
+ spin_lock(&lpni->lpni_lock);
+ lpni->lpni_state &=
+ ~LNET_PEER_NI_RECOVERY_PENDING;
+ spin_unlock(&lpni->lpni_lock);
+ continue;
+ }
+
+ /* look at the comments in lnet_recover_local_nis() */
+ mdh = lpni->lpni_recovery_ping_mdh;
+ LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
+ nid = lpni->lpni_nid;
+ lnet_net_lock(0);
+ list_del_init(&lpni->lpni_recovery);
+ lnet_peer_ni_decref_locked(lpni);
+ lnet_net_unlock(0);
+
+ ev_info->mt_type = MT_TYPE_PEER_NI;
+ ev_info->mt_nid = nid;
+ rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
+ ev_info, the_lnet.ln_mt_eqh, true);
+ lnet_net_lock(0);
+ /* lnet_find_peer_ni_locked() grabs a refcount for
+ * us. No need to take it explicitly.
+ */
+ lpni = lnet_find_peer_ni_locked(nid);
+ if (!lpni) {
+ lnet_net_unlock(0);
+ LNetMDUnlink(mdh);
+ continue;
+ }
+
+ lpni->lpni_recovery_ping_mdh = mdh;
+ /* While we were unlocked the lpni could have been
+ * re-added to the recovery queue. In this case we
+ * don't need to add it to the local queue, since
+ * it's already on there and the thread that added
+ * it would've incremented the refcount on the
+ * peer, which means we need to decref the refcount
+ * that was implicitly grabbed by find_peer_ni_locked.
+ * Otherwise, if the lpni is still not on
+ * the recovery queue, then we'll add it to the
+ * processed list.
+ */
+ if (list_empty(&lpni->lpni_recovery))
+ list_add_tail(&lpni->lpni_recovery,
+ &processed_list);
+ else
+ lnet_peer_ni_decref_locked(lpni);
+ lnet_net_unlock(0);
+
+ spin_lock(&lpni->lpni_lock);
+ if (rc)
+ lpni->lpni_state &=
+ ~LNET_PEER_NI_RECOVERY_PENDING;
+ }
+ spin_unlock(&lpni->lpni_lock);
+ }
+
+ list_splice_init(&processed_list, &local_queue);
+ lnet_net_lock(0);
+ list_splice(&local_queue, &the_lnet.ln_mt_peerNIRecovq);
+ lnet_net_unlock(0);
+}
+
static int
lnet_monitor_thread(void *arg)
{
@@ -2736,6 +2917,8 @@ struct lnet_ni *
lnet_recover_local_nis();
+ lnet_recover_peer_nis();
+
/* TODO do we need to check if we should sleep without
* timeout? Technically, an active system will always
* have messages in flight so this check will always
@@ -2822,10 +3005,61 @@ struct lnet_ni *
}
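+/* Handle the reply (or unlink) of a recovery ping: clear the
+ * recovery-pending state on the local NI or peer NI identified by
+ * ev_info and, on a successful local NI ping, bump its health value.
+ */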
static void
+lnet_handle_recovery_reply(struct lnet_mt_event_info *ev_info,
+ int status)
+{
+ lnet_nid_t nid = ev_info->mt_nid;
+
+ if (ev_info->mt_type == MT_TYPE_LOCAL_NI) {
+ struct lnet_ni *ni;
+
+ lnet_net_lock(0);
+ ni = lnet_nid2ni_locked(nid, 0);
+ if (!ni) {
+ lnet_net_unlock(0);
+ return;
+ }
+ lnet_ni_lock(ni);
+ ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
+ lnet_ni_unlock(ni);
+ lnet_net_unlock(0);
+
+ if (status != 0) {
+ CERROR("local NI recovery failed with %d\n", status);
+ return;
+ }
+ /* We need to increment healthv for the NI here because the
+ * lnet_finalize() path doesn't have access to this NI, and
+ * getting access to it would mean carrying forward too much
+ * information.
+ * In the peer case the health value is incremented naturally
+ * in the message finalize path.
+ */
+ lnet_inc_healthv(&ni->ni_healthv);
+ } else {
+ struct lnet_peer_ni *lpni;
+ int cpt;
+
+ cpt = lnet_net_lock_current();
+ lpni = lnet_find_peer_ni_locked(nid);
+ if (!lpni) {
+ lnet_net_unlock(cpt);
+ return;
+ }
+ spin_lock(&lpni->lpni_lock);
+ lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+ spin_unlock(&lpni->lpni_lock);
+ lnet_peer_ni_decref_locked(lpni);
+ lnet_net_unlock(cpt);
+
+ if (status != 0)
+ CERROR("peer NI recovery failed with %d\n", status);
+ }
+}
+
+static void
lnet_mt_event_handler(struct lnet_event *event)
{
- lnet_nid_t nid = (lnet_nid_t)event->md.user_ptr;
- struct lnet_ni *ni;
+ struct lnet_mt_event_info *ev_info = event->md.user_ptr;
struct lnet_ping_buffer *pbuf;
/* TODO: remove assert */
@@ -2837,37 +3071,25 @@ struct lnet_ni *
event->status);
switch (event->type) {
+ case LNET_EVENT_UNLINK:
+ CDEBUG(D_NET, "%s recovery ping unlinked\n",
+ libcfs_nid2str(ev_info->mt_nid));
+ /* fall-through */
case LNET_EVENT_REPLY:
- /* If the NI has been restored completely then remove from
- * the recovery queue
- */
- lnet_net_lock(0);
- ni = lnet_nid2ni_locked(nid, 0);
- if (!ni) {
- lnet_net_unlock(0);
- break;
- }
- lnet_ni_lock(ni);
- ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
- lnet_ni_unlock(ni);
- lnet_net_unlock(0);
+ lnet_handle_recovery_reply(ev_info, event->status);
break;
case LNET_EVENT_SEND:
CDEBUG(D_NET, "%s recovery message sent %s:%d\n",
- libcfs_nid2str(nid),
+ libcfs_nid2str(ev_info->mt_nid),
(event->status) ? "unsuccessfully" :
"successfully", event->status);
break;
- case LNET_EVENT_UNLINK:
- /* nothing to do */
- CDEBUG(D_NET, "%s recovery ping unlinked\n",
- libcfs_nid2str(nid));
- break;
default:
CERROR("Unexpected event: %d\n", event->type);
- return;
+ break;
}
if (event->unlinked) {
+ kfree(ev_info);
pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
lnet_ping_buffer_decref(pbuf);
}
@@ -2919,14 +3141,16 @@ int lnet_monitor_thr_start(void)
lnet_router_cleanup();
free_mem:
the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
- lnet_clean_resendqs();
lnet_clean_local_ni_recoveryq();
+ lnet_clean_peer_ni_recoveryq();
+ lnet_clean_resendqs();
LNetEQFree(the_lnet.ln_mt_eqh);
LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
return rc;
clean_queues:
- lnet_clean_resendqs();
lnet_clean_local_ni_recoveryq();
+ lnet_clean_peer_ni_recoveryq();
+ lnet_clean_resendqs();
return rc;
}
@@ -2949,8 +3173,9 @@ void lnet_monitor_thr_stop(void)
/* perform cleanup tasks */
lnet_router_cleanup();
- lnet_clean_resendqs();
lnet_clean_local_ni_recoveryq();
+ lnet_clean_peer_ni_recoveryq();
+ lnet_clean_resendqs();
rc = LNetEQFree(the_lnet.ln_mt_eqh);
LASSERT(rc == 0);
}
@@ -482,12 +482,6 @@
}
}
-static inline void
-lnet_inc_healthv(atomic_t *healthv)
-{
- atomic_add_unless(healthv, 1, LNET_MAX_HEALTH_VALUE);
-}
-
static void
lnet_handle_local_failure(struct lnet_msg *msg)
{
@@ -524,6 +518,43 @@
lnet_net_unlock(0);
}
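+/* A send failed with a remote error: decrement the peer NI health
+ * value and queue the peer NI for recovery pings if it is not
+ * already queued.
+ */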
+static void
+lnet_handle_remote_failure(struct lnet_msg *msg)
+{
+ struct lnet_peer_ni *lpni;
+
+ lpni = msg->msg_txpeer;
+
+ /* lpni could be NULL if we're in the LOLND case */
+ if (!lpni)
+ return;
+
+ lnet_net_lock(0);
+ /* the monitor thread could have shut down and cleaned up the queues */
+ if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
+ lnet_net_unlock(0);
+ return;
+ }
+
+ lnet_dec_healthv_locked(&lpni->lpni_healthv);
+ /* add the peer NI to the recovery queue if it's not already there
+ * and its health value is actually below the maximum. It's
+ * possible that the sensitivity might be set to 0, and the health
+ * value will not be reduced. In this case, there is no reason to
+ * invoke recovery.
+ */
+ if (list_empty(&lpni->lpni_recovery) &&
+ atomic_read(&lpni->lpni_healthv) < LNET_MAX_HEALTH_VALUE) {
+ CERROR("lpni %s added to recovery queue. Health = %d\n",
+ libcfs_nid2str(lpni->lpni_nid),
+ atomic_read(&lpni->lpni_healthv));
+ list_add_tail(&lpni->lpni_recovery,
+ &the_lnet.ln_mt_peerNIRecovq);
+ lnet_peer_ni_addref_locked(lpni);
+ }
+ lnet_net_unlock(0);
+}
+
/* Do a health check on the message:
* return -1 if we're not going to handle the error
* success case will return -1 as well
@@ -533,11 +564,20 @@
lnet_health_check(struct lnet_msg *msg)
{
enum lnet_msg_hstatus hstatus = msg->msg_health_status;
+ bool lo = false;
/* TODO: lnet_incr_hstats(hstatus); */
LASSERT(msg->msg_txni);
+ /* if we're sending to the LOLND then the msg_txpeer will not be
+ * set. So no need to sanity check it.
+ */
+ if (LNET_NETTYP(LNET_NIDNET(msg->msg_txni->ni_nid)) != LOLND)
+ LASSERT(msg->msg_txpeer);
+ else
+ lo = true;
+
if (hstatus != LNET_MSG_STATUS_OK &&
ktime_compare(ktime_get(), msg->msg_deadline) >= 0)
return -1;
@@ -546,9 +586,21 @@
if (the_lnet.ln_state != LNET_STATE_RUNNING)
return -1;
+ CDEBUG(D_NET, "health check: %s->%s: %s: %s\n",
+ libcfs_nid2str(msg->msg_txni->ni_nid),
+ (lo) ? "self" : libcfs_nid2str(msg->msg_txpeer->lpni_nid),
+ lnet_msgtyp2str(msg->msg_type),
+ lnet_health_error2str(hstatus));
+
switch (hstatus) {
case LNET_MSG_STATUS_OK:
lnet_inc_healthv(&msg->msg_txni->ni_healthv);
+ /* It's possible msg_txpeer is NULL in the LOLND
+ * case.
+ */
+ if (msg->msg_txpeer)
+ lnet_inc_healthv(&msg->msg_txpeer->lpni_healthv);
+
/* we can finalize this message */
return -1;
case LNET_MSG_STATUS_LOCAL_INTERRUPT:
@@ -560,22 +612,27 @@
/* add to the re-send queue */
goto resend;
- /* TODO: since the remote dropped the message we can
- * attempt a resend safely.
- */
- case LNET_MSG_STATUS_REMOTE_DROPPED:
- break;
-
- /* These errors will not trigger a resend so simply
- * finalize the message
- */
+ /* These errors will not trigger a resend so simply
+ * finalize the message
+ */
case LNET_MSG_STATUS_LOCAL_ERROR:
lnet_handle_local_failure(msg);
return -1;
+
+ /* Since the remote dropped the message we can
+ * attempt a resend safely.
+ */
+ case LNET_MSG_STATUS_REMOTE_DROPPED:
+ lnet_handle_remote_failure(msg);
+ goto resend;
+
case LNET_MSG_STATUS_REMOTE_ERROR:
case LNET_MSG_STATUS_REMOTE_TIMEOUT:
case LNET_MSG_STATUS_NETWORK_TIMEOUT:
+ lnet_handle_remote_failure(msg);
return -1;
+ default:
+ LBUG();
}
resend:
@@ -124,6 +124,7 @@
INIT_LIST_HEAD(&lpni->lpni_routes);
INIT_LIST_HEAD(&lpni->lpni_hashlist);
INIT_LIST_HEAD(&lpni->lpni_peer_nis);
+ INIT_LIST_HEAD(&lpni->lpni_recovery);
INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
spin_lock_init(&lpni->lpni_lock);
@@ -133,6 +134,7 @@
lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
lpni->lpni_nid = nid;
lpni->lpni_cpt = cpt;
+ atomic_set(&lpni->lpni_healthv, LNET_MAX_HEALTH_VALUE);
lnet_set_peer_ni_health_locked(lpni, true);
net = lnet_get_net_locked(LNET_NIDNET(nid));
@@ -331,6 +333,13 @@
/* remove peer ni from the hash list. */
list_del_init(&lpni->lpni_hashlist);
+ /* indicate the peer is being deleted so the monitor thread can
+ * remove it from the recovery queue.
+ */
+ spin_lock(&lpni->lpni_lock);
+ lpni->lpni_state |= LNET_PEER_NI_DELETING;
+ spin_unlock(&lpni->lpni_lock);
+
/* decrement the ref count on the peer table */
ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
LASSERT(atomic_read(&ptable->pt_number) > 0);