@@ -61,7 +61,7 @@
#define RS_SNDLOWAT 2048
#define RS_QP_MIN_SIZE 16
#define RS_QP_MAX_SIZE 0xFFFE
-#define RS_QP_CTRL_SIZE 4
+#define RS_QP_CTRL_SIZE 4 /* must be power of 2 */
#define RS_CONN_RETRIES 6
#define RS_SGL_SIZE 2
static struct index_map idm;
@@ -195,7 +195,7 @@ struct rs_iomap_mr {
int index; /* -1 if mapping is local and not in iomap_list */
};
-#define RS_MIN_INLINE (sizeof(struct rs_sge))
+#define RS_MAX_CTRL_MSG (sizeof(struct rs_sge))
#define rs_host_is_net() (1 == htonl(1))
#define RS_CONN_FLAG_NET (1 << 0)
#define RS_CONN_FLAG_IOMAP (1 << 1)
@@ -506,9 +506,6 @@ void rs_configure(void)
if ((f = fopen(RS_CONF_DIR "/inline_default", "r"))) {
(void) fscanf(f, "%hu", &def_inline);
fclose(f);
-
- if (def_inline < RS_MIN_INLINE)
- def_inline = RS_MIN_INLINE;
}
if ((f = fopen(RS_CONF_DIR "/sqsize_default", "r"))) {
@@ -678,18 +675,21 @@ static void ds_set_qp_size(struct rsocket *rs)
static int rs_init_bufs(struct rsocket *rs)
{
- uint32_t rbuf_msg_size;
+ uint32_t total_rbuf_size, total_sbuf_size;
size_t len;
rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
if (!rs->rmsg)
return ERR(ENOMEM);
- rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
+ total_sbuf_size = rs->sbuf_size;
+ if (rs->sq_inline < RS_MAX_CTRL_MSG)
+ total_sbuf_size += RS_MAX_CTRL_MSG * RS_QP_CTRL_SIZE;
+ rs->sbuf = calloc(total_sbuf_size, 1);
if (!rs->sbuf)
return ERR(ENOMEM);
- rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
+ rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, total_sbuf_size);
if (!rs->smr)
return -1;
@@ -708,14 +708,14 @@ static int rs_init_bufs(struct rsocket *rs)
if (rs->target_iomap_size)
rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
- rbuf_msg_size = rs->rbuf_size;
+ total_rbuf_size = rs->rbuf_size;
if (rs->opts & RS_OPT_MSG_SEND)
- rbuf_msg_size += rs->rq_size * RS_MSG_SIZE;
- rs->rbuf = calloc(rbuf_msg_size, 1);
+ total_rbuf_size += rs->rq_size * RS_MSG_SIZE;
+ rs->rbuf = calloc(total_rbuf_size, 1);
if (!rs->rbuf)
return ERR(ENOMEM);
- rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rbuf_msg_size);
+ rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, total_rbuf_size);
if (!rs->rmr)
return -1;
@@ -862,8 +862,8 @@ static int rs_create_ep(struct rsocket *rs)
return ret;
rs->sq_inline = qp_attr.cap.max_inline_data;
- if (rs->sq_inline < RS_MIN_INLINE)
- return ERR(EINVAL);
+ if ((rs->opts & RS_OPT_MSG_SEND) && (rs->sq_inline < RS_MSG_SIZE))
+ return ERR(ENOTSUP);
for (i = 0; i < rs->rq_size; i++) {
ret = rs_post_recv(rs);
@@ -1497,11 +1497,6 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
goto err;
rs->sq_inline = qp_attr.cap.max_inline_data;
- if (rs->sq_inline < RS_MIN_INLINE) {
- ret = ERR(ENOMEM);
- goto err;
- }
-
ret = ds_add_qp_dest(qp, src_addr, addrlen);
if (ret)
goto err;
@@ -1613,6 +1608,12 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
return ret;
}
+static void *rs_get_ctrl_buf(struct rsocket *rs)
+{
+ return rs->sbuf + rs->sbuf_size +
+ RS_MAX_CTRL_MSG * (rs->ctrl_seqno & (RS_QP_CTRL_SIZE - 1));
+}
+
static int rs_post_msg(struct rsocket *rs, uint32_t msg)
{
struct ibv_send_wr wr, *bad;
@@ -1774,7 +1775,7 @@ static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
- flags, addr, rs->remote_iomap.key);
+ flags, addr, rs->remote_iomap.key);
}
static uint32_t rs_sbuf_left(struct rsocket *rs)
@@ -1786,7 +1787,8 @@ static uint32_t rs_sbuf_left(struct rsocket *rs)
static void rs_send_credits(struct rsocket *rs)
{
struct ibv_sge ibsge;
- struct rs_sge sge;
+ struct rs_sge sge, *sge_buf;
+ int flags;
rs->ctrl_seqno++;
rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1);
@@ -1804,16 +1806,23 @@ static void rs_send_credits(struct rsocket *rs)
sge.length = bswap_32(rs->rbuf_size >> 1);
}
- ibsge.addr = (uintptr_t) &sge;
- ibsge.lkey = 0;
+ if (rs->sq_inline < sizeof sge) {
+ sge_buf = rs_get_ctrl_buf(rs);
+ memcpy(sge_buf, &sge, sizeof sge);
+ ibsge.addr = (uintptr_t) sge_buf;
+ ibsge.lkey = rs->smr->lkey;
+ flags = 0;
+ } else {
+ ibsge.addr = (uintptr_t) &sge;
+ ibsge.lkey = 0;
+ flags = IBV_SEND_INLINE;
+ }
ibsge.length = sizeof(sge);
rs_post_write_msg(rs, &ibsge, 1,
- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
- IBV_SEND_INLINE,
- rs->remote_sgl.addr +
- rs->remote_sge * sizeof(struct rs_sge),
- rs->remote_sgl.key);
+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), flags,
+ rs->remote_sgl.addr + rs->remote_sge * sizeof(struct rs_sge),
+ rs->remote_sgl.key);
rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
rs->rbuf_free_offset += rs->rbuf_size >> 1;
@@ -3456,8 +3465,6 @@ int rsetsockopt(int socket, int level, int optname,
break;
case RDMA_INLINE:
rs->sq_inline = min(*(uint32_t *) optval, RS_QP_MAX_SIZE);
- if (rs->sq_inline < RS_MIN_INLINE)
- rs->sq_inline = RS_MIN_INLINE;
ret = 0;
break;
case RDMA_IOMAPSIZE: