@@ -308,7 +308,8 @@ struct rsocket {
uint64_t tcp_opts;
unsigned int keepalive_time;
- int ctrl_avail;
+ unsigned int ctrl_seqno;
+ unsigned int ctrl_max_seqno;
uint16_t sseq_no;
uint16_t sseq_comp;
uint16_t rseq_no;
@@ -563,6 +564,7 @@ static void rs_remove(struct rsocket *rs)
pthread_mutex_unlock(&mut);
}
+/* We only inherit from listening sockets */
static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
{
struct rsocket *rs;
@@ -585,7 +587,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
rs->sq_size = inherited_rs->sq_size;
rs->rq_size = inherited_rs->rq_size;
if (type == SOCK_STREAM) {
- rs->ctrl_avail = inherited_rs->ctrl_avail;
+ rs->ctrl_max_seqno = inherited_rs->ctrl_max_seqno;
rs->target_iomap_size = inherited_rs->target_iomap_size;
}
} else {
@@ -595,7 +597,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
rs->sq_size = def_sqsize;
rs->rq_size = def_rqsize;
if (type == SOCK_STREAM) {
- rs->ctrl_avail = RS_QP_CTRL_SIZE;
+ rs->ctrl_max_seqno = RS_QP_CTRL_SIZE;
rs->target_iomap_size = def_iomap_size;
}
}
@@ -723,7 +725,7 @@ static int rs_init_bufs(struct rsocket *rs)
rs->rbuf_free_offset = rs->rbuf_size >> 1;
rs->rbuf_bytes_avail = rs->rbuf_size >> 1;
- rs->sqe_avail = rs->sq_size - rs->ctrl_avail;
+ rs->sqe_avail = rs->sq_size - rs->ctrl_max_seqno;
rs->rseq_comp = rs->rq_size >> 1;
return 0;
}
@@ -1786,11 +1788,11 @@ static void rs_send_credits(struct rsocket *rs)
struct ibv_sge ibsge;
struct rs_sge sge;
- rs->ctrl_avail--;
+ rs->ctrl_seqno++;
rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1);
if (rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) {
if (rs->opts & RS_OPT_MSG_SEND)
- rs->ctrl_avail--;
+ rs->ctrl_seqno++;
if (!(rs->opts & RS_OPT_SWAP_SGL)) {
sge.addr = (uintptr_t) &rs->rbuf[rs->rbuf_free_offset];
@@ -1824,16 +1826,27 @@ static void rs_send_credits(struct rsocket *rs)
}
}
+static inline int rs_ctrl_avail(struct rsocket *rs)
+{
+ return rs->ctrl_seqno != rs->ctrl_max_seqno;
+}
+
+/* Protocols that do not support RDMA write with immediate may require 2 msgs */
+static inline int rs_2ctrl_avail(struct rsocket *rs)
+{
+ return (int)((rs->ctrl_seqno + 1) - rs->ctrl_max_seqno) < 0;
+}
+
static int rs_give_credits(struct rsocket *rs)
{
if (!(rs->opts & RS_OPT_MSG_SEND)) {
return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
- rs->ctrl_avail && (rs->state & rs_connected);
+ rs_ctrl_avail(rs) && (rs->state & rs_connected);
} else {
return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
- (rs->ctrl_avail > 1) && (rs->state & rs_connected);
+ rs_2ctrl_avail(rs) && (rs->state & rs_connected);
}
}
@@ -1895,10 +1908,10 @@ static int rs_poll_cq(struct rsocket *rs)
} else {
switch (rs_msg_op(rs_wr_data(wc.wr_id))) {
case RS_OP_SGL:
- rs->ctrl_avail++;
+ rs->ctrl_max_seqno++;
break;
case RS_OP_CTRL:
- rs->ctrl_avail++;
+ rs->ctrl_max_seqno++;
if (rs_msg_data(rs_wr_data(wc.wr_id)) == RS_CTRL_DISCONNECT)
rs->state = rs_disconnected;
break;
@@ -2237,7 +2250,7 @@ static int rs_conn_can_send(struct rsocket *rs)
static int rs_conn_can_send_ctrl(struct rsocket *rs)
{
- return rs->ctrl_avail || !(rs->state & rs_connected);
+ return rs_ctrl_avail(rs) || !(rs->state & rs_connected);
}
static int rs_have_rdata(struct rsocket *rs)
@@ -2252,7 +2265,8 @@ static int rs_conn_have_rdata(struct rsocket *rs)
static int rs_conn_all_sends_done(struct rsocket *rs)
{
- return ((rs->sqe_avail + rs->ctrl_avail) == rs->sq_size) ||
+ return ((((int) rs->ctrl_max_seqno) - ((int) rs->ctrl_seqno)) +
+ rs->sqe_avail == rs->sq_size) ||
!(rs->state & rs_connected);
}
@@ -3189,14 +3203,14 @@ int rshutdown(int socket, int how)
goto out;
ctrl = RS_CTRL_DISCONNECT;
}
- if (!rs->ctrl_avail) {
+ if (!rs_ctrl_avail(rs)) {
ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl);
if (ret)
goto out;
}
- if ((rs->state & rs_connected) && rs->ctrl_avail) {
- rs->ctrl_avail--;
+ if ((rs->state & rs_connected) && rs_ctrl_avail(rs)) {
+ rs->ctrl_seqno++;
ret = rs_post_msg(rs, rs_msg_set(RS_OP_CTRL, ctrl));
}
}
@@ -4158,7 +4172,7 @@ static void tcp_svc_process_sock(struct rs_svc *svc)
static void tcp_svc_send_keepalive(struct rsocket *rs)
{
fastlock_acquire(&rs->cq_lock);
- if ((rs->ctrl_avail > 1) && (rs->state & rs_connected))
+ if (rs_2ctrl_avail(rs) && (rs->state & rs_connected))
rs_send_credits(rs);
fastlock_release(&rs->cq_lock);
}