@@ -945,6 +945,10 @@ struct lnet {
struct list_head ln_net_zombie;
/* the loopback NI */
struct lnet_ni *ln_loni;
+ /* resend messages list */
+ struct list_head ln_msg_resend;
+ /* spin lock to protect the msg resend list */
+ spinlock_t ln_msg_resend_lock;
/* remote networks with routes to them */
struct list_head *ln_remote_nets_hash;
@@ -211,6 +211,7 @@ static int lnet_discover(struct lnet_process_id id, u32 force,
lnet_init_locks(void)
{
spin_lock_init(&the_lnet.ln_eq_wait_lock);
+ spin_lock_init(&the_lnet.ln_msg_resend_lock);
init_waitqueue_head(&the_lnet.ln_eq_waitq);
init_waitqueue_head(&the_lnet.ln_rc_waitq);
mutex_init(&the_lnet.ln_lnd_mutex);
@@ -1652,6 +1653,10 @@ static void lnet_push_target_fini(void)
lnet_shutdown_lndnets(void)
{
struct lnet_net *net;
+ struct list_head resend;
+ struct lnet_msg *msg, *tmp;
+
+ INIT_LIST_HEAD(&resend);
/* NB called holding the global mutex */
@@ -1682,6 +1687,15 @@ static void lnet_push_target_fini(void)
net_list)) != NULL)
lnet_shutdown_lndnet(net);
+ spin_lock(&the_lnet.ln_msg_resend_lock);
+ list_splice(&the_lnet.ln_msg_resend, &resend);
+ spin_unlock(&the_lnet.ln_msg_resend_lock);
+
+ list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
+ list_del_init(&msg->msg_list);
+ lnet_finalize(msg, -ECANCELED);
+ }
+
lnet_net_lock(LNET_LOCK_EX);
the_lnet.ln_state = LNET_STATE_SHUTDOWN;
lnet_net_unlock(LNET_LOCK_EX);
@@ -2025,6 +2039,7 @@ int lnet_lib_init(void)
INIT_LIST_HEAD(&the_lnet.ln_lnds);
INIT_LIST_HEAD(&the_lnet.ln_net_zombie);
INIT_LIST_HEAD(&the_lnet.ln_rcd_zombie);
+ INIT_LIST_HEAD(&the_lnet.ln_msg_resend);
INIT_LIST_HEAD(&the_lnet.ln_rcd_deathrow);
/*
@@ -231,6 +231,22 @@
if (lp->lp_data)
lnet_ping_buffer_decref(lp->lp_data);
+ /* if there are messages still on the pending queue, then make
+ * sure to queue them on the ln_msg_resend list so they can be
+ * resent at a later point if the discovery thread is still
+ * running.
+ * If the discovery thread has stopped, then the wakeup will be a
+ * no-op, and it is expected that lnet_shutdown_lndnets() will
+ * eventually be called, which will traverse this list and
+ * finalize the messages on it.
+ * We cannot resend them now because we're holding the cpt lock;
+ * releasing the lock could leave things in an inconsistent state.
+ */
+ spin_lock(&the_lnet.ln_msg_resend_lock);
+ list_splice(&lp->lp_dc_pendq, &the_lnet.ln_msg_resend);
+ spin_unlock(&the_lnet.ln_msg_resend_lock);
+ wake_up(&the_lnet.ln_dc_waitq);
+
kfree(lp);
}
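The hunk above is the producer side of a defer-and-wake pattern: the context destroying the peer holds the cpt lock, so instead of resending in place it moves the pending messages onto a global list guarded by its own spinlock and wakes the discovery thread. A minimal standalone sketch of that pattern, using hypothetical names (resend_lock, resend_q, worker_waitq, struct item) rather than the LNet structures, might look like:

#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/wait.h>

static LIST_HEAD(resend_q);			/* cf. ln_msg_resend */
static DEFINE_SPINLOCK(resend_lock);		/* cf. ln_msg_resend_lock */
static DECLARE_WAIT_QUEUE_HEAD(worker_waitq);	/* cf. ln_dc_waitq */

struct item {
	struct list_head link;
};

/* Producer: cannot process the items in this context, so hand them off
 * to the worker. Splicing is O(1) and done entirely under resend_lock. */
static void defer_items(struct list_head *pending)
{
	spin_lock(&resend_lock);
	list_splice_init(pending, &resend_q);
	spin_unlock(&resend_lock);

	/* A wakeup with no sleeper (worker already stopped) is a no-op. */
	wake_up(&worker_waitq);
}

The patch itself uses list_splice() rather than list_splice_init(), which is safe here because lp is freed immediately afterwards, so lp_dc_pendq is never touched again.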
@@ -1532,6 +1548,8 @@ struct lnet_peer_net *
LASSERT(lpni->lpni_rtr_refcount == 0);
LASSERT(list_empty(&lpni->lpni_txq));
LASSERT(lpni->lpni_txqnob == 0);
+ LASSERT(list_empty(&lpni->lpni_peer_nis));
+ LASSERT(list_empty(&lpni->lpni_on_remote_peer_ni_list));
lpn = lpni->lpni_peer_net;
lpni->lpni_peer_net = NULL;
@@ -1730,7 +1748,7 @@ static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
*/
static void lnet_peer_discovery_complete(struct lnet_peer *lp)
{
- struct lnet_msg *msg = NULL;
+ struct lnet_msg *msg, *tmp;
int rc = 0;
struct list_head pending_msgs;
@@ -1746,7 +1764,8 @@ static void lnet_peer_discovery_complete(struct lnet_peer *lp)
lnet_net_unlock(LNET_LOCK_EX);
/* iterate through all pending messages and send them again */
- list_for_each_entry(msg, &pending_msgs, msg_list) {
+ list_for_each_entry_safe(msg, tmp, &pending_msgs, msg_list) {
+ list_del_init(&msg->msg_list);
if (lp->lp_dc_error) {
lnet_finalize(msg, lp->lp_dc_error);
continue;
@@ -2970,6 +2989,8 @@ static int lnet_peer_discovery_wait_for_work(void)
break;
if (!list_empty(&the_lnet.ln_dc_request))
break;
+ if (!list_empty(&the_lnet.ln_msg_resend))
+ break;
if (lnet_peer_dc_timed_out(ktime_get_real_seconds()))
break;
lnet_net_unlock(cpt);
@@ -2995,6 +3016,47 @@ static int lnet_peer_discovery_wait_for_work(void)
return rc;
}
+/* Messages that were pending on a destroyed peer will be put on a global
+ * resend list. The discovery thread checks this list when it wakes up
+ * and resends the messages. A message may still be sendable if the lpni
+ * that originally caused it to be re-queued has since been transferred
+ * to another peer.
+ *
+ * It is possible that LNet could be shut down while we're iterating
+ * through the list. lnet_shutdown_lndnets() will attempt to access the
+ * resend list, but will have to wait until the spinlock is released, by
+ * which time there shouldn't be any more messages on the resend list.
+ * During shutdown lnet_send() will fail and lnet_finalize() will be
+ * called for the messages so they can be released. The other case is
+ * that lnet_shutdown_lndnets() finalizes all the messages before this
+ * function can visit the resend list, in which case this function is
+ * a no-op.
+ */
+static void lnet_resend_msgs(void)
+{
+ struct lnet_msg *msg, *tmp;
+ struct list_head resend;
+ int rc;
+
+ INIT_LIST_HEAD(&resend);
+
+ spin_lock(&the_lnet.ln_msg_resend_lock);
+ list_splice(&the_lnet.ln_msg_resend, &resend);
+ spin_unlock(&the_lnet.ln_msg_resend_lock);
+
+ list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
+ list_del_init(&msg->msg_list);
+ rc = lnet_send(msg->msg_src_nid_param, msg,
+ msg->msg_rtr_nid_param);
+ if (rc < 0) {
+ CNETERR("Error sending %s to %s: %d\n",
+ lnet_msgtyp2str(msg->msg_type),
+ libcfs_id2str(msg->msg_target), rc);
+ lnet_finalize(msg, rc);
+ }
+ }
+}
+
/* The discovery thread. */
static int lnet_peer_discovery(void *arg)
{
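For completeness, the consumer side follows the usual drain idiom: splice the shared queue onto a stack-local list while holding the spinlock, then process the entries with the lock dropped so lnet_send()/lnet_finalize() are never called under ln_msg_resend_lock. A standalone sketch, continuing the hypothetical names from the earlier example, could be:

/* Worker-side drain: move everything off the shared queue in O(1),
 * then handle each item without holding resend_lock. */
static void drain_items(void)
{
	struct item *it, *tmp;
	LIST_HEAD(local);

	spin_lock(&resend_lock);
	list_splice_init(&resend_q, &local);
	spin_unlock(&resend_lock);

	list_for_each_entry_safe(it, tmp, &local, link) {
		list_del_init(&it->link);
		/* retry the operation here, or release the item on error */
	}
}

Both lnet_resend_msgs() above and the lnet_shutdown_lndnets() hunk earlier drain the list this way, so whichever side takes ln_msg_resend_lock first consumes the messages and the other finds an empty list.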
@@ -3008,6 +3070,8 @@ static int lnet_peer_discovery(void *arg)
if (lnet_peer_discovery_wait_for_work())
break;
+ lnet_resend_msgs();
+
if (lnet_push_target_resize_needed())
lnet_push_target_resize();