@@ -685,6 +685,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
rs->rs_rx_traces = 0;
rs->rs_tos = 0;
rs->rs_conn = NULL;
+ rs->rs_conn_path = NULL;
spin_lock_bh(&rds_sock_lock);
list_add_tail(&rs->rs_item, &rds_sock_list);
@@ -147,6 +147,7 @@ static void __rds_conn_path_init(struct rds_connection *conn,
INIT_WORK(&cp->cp_down_w, rds_shutdown_worker);
mutex_init(&cp->cp_cm_lock);
cp->cp_flags = 0;
+ init_waitqueue_head(&cp->cp_up_waitq);
}
/*
@@ -913,7 +914,7 @@ void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
rcu_read_unlock();
return;
}
- if (rds_conn_path_state(cp) == RDS_CONN_DOWN)
+ if (rds_conn_path_down(cp))
rds_queue_reconnect(cp);
rcu_read_unlock();
}
@@ -134,6 +134,8 @@ struct rds_conn_path {
unsigned int cp_unacked_packets;
unsigned int cp_unacked_bytes;
unsigned int cp_index;
+
+ wait_queue_head_t cp_up_waitq; /* start up waitq */
};
/* One rds_connection per RDS address pair */
@@ -607,10 +609,11 @@ struct rds_sock {
struct rds_transport *rs_transport;
/*
- * rds_sendmsg caches the conn it used the last time around.
- * This helps avoid costly lookups.
+ * rds_sendmsg caches the conn and conn_path it used the last time
+ * around. This helps avoid costly lookups.
*/
struct rds_connection *rs_conn;
+ struct rds_conn_path *rs_conn_path;
/* flag indicating we were congested or not */
int rs_congested;
@@ -1044,15 +1044,15 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
static int rds_send_mprds_hash(struct rds_sock *rs,
struct rds_connection *conn, int nonblock)
{
+ struct rds_conn_path *cp;
int hash;
if (conn->c_npaths == 0)
hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
else
hash = RDS_MPATH_HASH(rs, conn->c_npaths);
- if (conn->c_npaths == 0 && hash != 0) {
- rds_send_ping(conn, 0);
-
+ cp = &conn->c_path[hash];
+ if (!conn->c_npaths && rds_conn_path_down(cp)) {
/* The underlying connection is not up yet. Need to wait
* until it is up to be sure that the non-zero c_path can be
* used. But if we are interrupted, we have to use the zero
@@ -1066,10 +1066,19 @@ static int rds_send_mprds_hash(struct rds_sock *rs,
return 0;
if (wait_event_interruptible(conn->c_hs_waitq,
conn->c_npaths != 0))
- hash = 0;
+ return 0;
}
if (conn->c_npaths == 1)
hash = 0;
+
+ /* Wait until the chosen path is up. If it is interrupted,
+ * just return as this is an optimization to make sure that
+ * the message is sent.
+ */
+ cp = &conn->c_path[hash];
+ if (rds_conn_path_down(cp))
+ wait_event_interruptible(cp->cp_up_waitq,
+ !rds_conn_path_down(cp));
}
return hash;
}
@@ -1290,6 +1299,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
if (rs->rs_conn && ipv6_addr_equal(&rs->rs_conn->c_faddr, &daddr) &&
rs->rs_tos == rs->rs_conn->c_tos) {
conn = rs->rs_conn;
+ cpath = rs->rs_conn_path;
} else {
conn = rds_conn_create_outgoing(sock_net(sock->sk),
&rs->rs_bound_addr, &daddr,
@@ -1300,14 +1310,30 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
ret = PTR_ERR(conn);
goto out;
}
+ if (conn->c_trans->t_mp_capable) {
+ /* c_npaths == 0 if we have not talked to this peer
+ * before. Initiate a connection request to the
+ * peer right away.
+ */
+ if (!conn->c_npaths &&
+ rds_conn_path_down(&conn->c_path[0])) {
+ /* rds_cond_queue_reconnect_work() ensures
+ * that only one request is queued. And
+ * rds_send_ping() ensures that only one ping
+ * is outstanding.
+ */
+ rds_cond_queue_reconnect_work(&conn->c_path[0],
+ 0);
+ rds_send_ping(conn, 0);
+ }
+ cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, 0)];
+ } else {
+ cpath = &conn->c_path[0];
+ }
rs->rs_conn = conn;
+ rs->rs_conn_path = cpath;
}
- if (conn->c_trans->t_mp_capable)
- cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
- else
- cpath = &conn->c_path[0];
-
rm->m_conn_path = cpath;
/* Parse any control messages the user may have included. */
@@ -1335,7 +1361,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
}
if (rds_conn_path_down(cpath))
- rds_check_all_paths(conn);
+ rds_conn_path_connect_if_down(cpath);
ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
if (ret) {
@@ -73,6 +73,7 @@ void rds_tcp_state_change(struct sock *sk)
rds_conn_path_drop(cp, false);
} else {
rds_connect_path_complete(cp, RDS_CONN_CONNECTING);
+ wake_up(&cp->cp_up_waitq);
}
break;
case TCP_CLOSE_WAIT:
@@ -211,6 +211,7 @@ int rds_tcp_accept_one(struct socket *sock)
} else {
rds_tcp_set_callbacks(new_sock, cp);
rds_connect_path_complete(cp, RDS_CONN_CONNECTING);
+ wake_up(&cp->cp_up_waitq);
}
new_sock = NULL;
ret = 0;