@@ -817,6 +817,7 @@ struct lnet_peer_ni *lnet_peer_get_ni_locked(struct lnet_peer *lp,
void lnet_peer_net_added(struct lnet_net *net);
lnet_nid_t lnet_peer_primary_nid_locked(lnet_nid_t nid);
int lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block);
+void lnet_peer_queue_message(struct lnet_peer *lp, struct lnet_msg *msg);
int lnet_peer_discovery_start(void);
void lnet_peer_discovery_stop(void);
void lnet_push_update_to_peers(int force);
@@ -4540,6 +4540,18 @@ static int lnet_ping(struct lnet_process_id id, signed long timeout,
if (rc)
goto out_decref;
+ /* The lpni (or lp) for this NID may have changed and our ref is
+ * the only thing keeping the old one around. Release the ref
+ * and look up the lpni again.
+ */
+ lnet_peer_ni_decref_locked(lpni);
+ lpni = lnet_find_peer_ni_locked(id.nid);
+ if (!lpni) {
+ rc = -ENOENT;
+ goto out;
+ }
+ lp = lpni->lpni_peer_net->lpn_peer;
+
i = 0;
p = NULL;
while ((p = lnet_get_next_peer_ni_locked(lp, NULL, p)) != NULL) {
@@ -1834,6 +1834,7 @@ struct lnet_ni *
int cpt)
{
struct lnet_peer *peer;
+ struct lnet_peer_ni *new_lpni;
int rc;
lnet_peer_ni_addref_locked(lpni);
@@ -1855,21 +1856,38 @@ struct lnet_ni *
lnet_peer_ni_decref_locked(lpni);
return rc;
}
- /* The peer may have changed. */
- peer = lpni->lpni_peer_net->lpn_peer;
+
+ new_lpni = lnet_find_peer_ni_locked(lpni->lpni_nid);
+ if (!new_lpni) {
+ lnet_peer_ni_decref_locked(lpni);
+ return -ENOENT;
+ }
+
+ peer = new_lpni->lpni_peer_net->lpn_peer;
spin_lock(&peer->lp_lock);
- if (lnet_peer_is_uptodate_locked(peer)) {
+ if (lpni == new_lpni && lnet_peer_is_uptodate_locked(peer)) {
+ /* The peer NI did not change and the peer is up to date.
+ * Nothing more to do.
+ */
spin_unlock(&peer->lp_lock);
lnet_peer_ni_decref_locked(lpni);
+ lnet_peer_ni_decref_locked(new_lpni);
return 0;
}
- /* queue message and return */
+ spin_unlock(&peer->lp_lock);
+
+ /* Either the peer NI changed during discovery, or the peer isn't up
+ * to date. In both cases we want to queue the message on the
+ * (possibly new) peer's pending queue and queue the peer for discovery
+ */
msg->msg_sending = 0;
msg->msg_txpeer = NULL;
- list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
- spin_unlock(&peer->lp_lock);
+ lnet_net_unlock(cpt);
+ lnet_peer_queue_message(peer, msg);
+ lnet_net_lock(cpt);
lnet_peer_ni_decref_locked(lpni);
+ lnet_peer_ni_decref_locked(new_lpni);
CDEBUG(D_NET, "msg %p delayed. %s pending discovery\n",
msg, libcfs_nid2str(peer->lp_primary_nid));
@@ -1346,6 +1346,16 @@ struct lnet_peer_ni *
rc = lnet_discover_peer_locked(lpni, cpt, true);
if (rc)
goto out_decref;
+ /* The lpni (or lp) for this NID may have changed and our ref is
+ * the only thing keeping the old one around. Release the ref
+ * and look up the lpni again.
+ */
+ lnet_peer_ni_decref_locked(lpni);
+ lpni = lnet_find_peer_ni_locked(nid);
+ if (!lpni) {
+ rc = -ENOENT;
+ goto out_unlock;
+ }
lp = lpni->lpni_peer_net->lpn_peer;
/* Only try once if discovery is disabled */
@@ -2054,6 +2064,26 @@ struct lnet_peer_ni *
return rc;
}
+/* Add @msg to the tail of @lp's lp_dc_pendq and queue @lp for discovery.
+ * Takes lnet_net_lock/EX itself; the caller drops lnet_net_lock(cpt) first.
+ */
+void
+lnet_peer_queue_message(struct lnet_peer *lp, struct lnet_msg *msg)
+{
+ /* The discovery thread holds net_lock/EX and lp_lock when it splices
+ * the lp_dc_pendq onto a local list for resending. Taking the same
+ * locks here keeps messages from being stranded on the lp_dc_pendq
+ * even if the peer is already being discovered, so the return value
+ * of lnet_peer_queue_for_discovery(lp) is deliberately ignored.
+ */
+ lnet_net_lock(LNET_LOCK_EX);
+ spin_lock(&lp->lp_lock);
+ list_add_tail(&msg->msg_list, &lp->lp_dc_pendq);
+ spin_unlock(&lp->lp_lock);
+ lnet_peer_queue_for_discovery(lp);
+ lnet_net_unlock(LNET_LOCK_EX);
+}
+
/*
* Queue a peer for the attention of the discovery thread. Call with
* lnet_net_lock/EX held. Returns 0 if the peer was queued, and