@@ -358,9 +358,11 @@ typedef struct RDMAContext {
struct ibv_context *verbs;
struct rdma_event_channel *channel;
struct ibv_qp *qp; /* queue pair */
- struct ibv_comp_channel *comp_channel; /* completion channel */
+ struct ibv_comp_channel *recv_comp_channel; /* recv completion channel */
+ struct ibv_comp_channel *send_comp_channel; /* send completion channel */
struct ibv_pd *pd; /* protection domain */
- struct ibv_cq *cq; /* completion queue */
+ struct ibv_cq *recv_cq; /* receive completion queue */
+ struct ibv_cq *send_cq; /* send completion queue */
/*
* If a previous write failed (perhaps because of a failed
@@ -1059,21 +1061,34 @@ static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
return -1;
}
- /* create completion channel */
- rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
- if (!rdma->comp_channel) {
- error_report("failed to allocate completion channel");
+ /* create receive completion channel */
+ rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs);
+ if (!rdma->recv_comp_channel) {
+ error_report("failed to allocate receive completion channel");
goto err_alloc_pd_cq;
}
/*
- * Completion queue can be filled by both read and write work requests,
- * so must reflect the sum of both possible queue sizes.
+ * The receive completion queue is only filled by receive work requests.
*/
- rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
- NULL, rdma->comp_channel, 0);
- if (!rdma->cq) {
- error_report("failed to allocate completion queue");
+ rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
+ NULL, rdma->recv_comp_channel, 0);
+ if (!rdma->recv_cq) {
+ error_report("failed to allocate receive completion queue");
+ goto err_alloc_pd_cq;
+ }
+
+ /* create send completion channel */
+ rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs);
+ if (!rdma->send_comp_channel) {
+ error_report("failed to allocate send completion channel");
+ goto err_alloc_pd_cq;
+ }
+
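+ /* The send completion queue is filled by write and send work requests. */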
+ rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
+ NULL, rdma->send_comp_channel, 0);
+ if (!rdma->send_cq) {
+ error_report("failed to allocate send completion queue");
goto err_alloc_pd_cq;
}
@@ -1083,11 +1098,19 @@ err_alloc_pd_cq:
if (rdma->pd) {
ibv_dealloc_pd(rdma->pd);
}
- if (rdma->comp_channel) {
- ibv_destroy_comp_channel(rdma->comp_channel);
+ /* the recv CQ must be destroyed before its completion channel */
+ if (rdma->recv_cq) {
+ ibv_destroy_cq(rdma->recv_cq);
+ rdma->recv_cq = NULL;
+ }
+ if (rdma->recv_comp_channel) {
+ ibv_destroy_comp_channel(rdma->recv_comp_channel);
+ }
+ if (rdma->send_comp_channel) {
+ ibv_destroy_comp_channel(rdma->send_comp_channel);
}
rdma->pd = NULL;
- rdma->comp_channel = NULL;
+ rdma->recv_comp_channel = NULL;
+ rdma->send_comp_channel = NULL;
return -1;
}
@@ -1104,8 +1127,8 @@ static int qemu_rdma_alloc_qp(RDMAContext *rdma)
attr.cap.max_recv_wr = 3;
attr.cap.max_send_sge = 1;
attr.cap.max_recv_sge = 1;
- attr.send_cq = rdma->cq;
- attr.recv_cq = rdma->cq;
+ attr.send_cq = rdma->send_cq;
+ attr.recv_cq = rdma->recv_cq;
attr.qp_type = IBV_QPT_RC;
ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
@@ -1496,14 +1519,14 @@ static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
* (of any kind) has completed.
* Return the work request ID that completed.
*/
-static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
- uint32_t *byte_len)
+static uint64_t qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
+ uint64_t *wr_id_out, uint32_t *byte_len)
{
int ret;
struct ibv_wc wc;
uint64_t wr_id;
- ret = ibv_poll_cq(rdma->cq, 1, &wc);
+ ret = ibv_poll_cq(cq, 1, &wc);
if (!ret) {
*wr_id_out = RDMA_WRID_NONE;
@@ -1575,7 +1598,8 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
/* Wait for activity on the completion channel.
* Returns 0 on success, none-0 on error.
*/
-static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
+static int qemu_rdma_wait_comp_channel(RDMAContext *rdma,
+ struct ibv_comp_channel *comp_channel)
{
struct rdma_cm_event *cm_event;
int ret = -1;
@@ -1586,7 +1610,7 @@ static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
*/
if (rdma->migration_started_on_destination &&
migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
- yield_until_fd_readable(rdma->comp_channel->fd);
+ yield_until_fd_readable(comp_channel->fd);
} else {
/* This is the source side, we're in a separate thread
* or destination prior to migration_fd_process_incoming()
@@ -1597,7 +1621,7 @@ static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
*/
while (!rdma->error_state && !rdma->received_error) {
GPollFD pfds[2];
- pfds[0].fd = rdma->comp_channel->fd;
+ pfds[0].fd = comp_channel->fd;
pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
pfds[0].revents = 0;
@@ -1655,6 +1679,17 @@ static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
return rdma->error_state;
}
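+/*
+ * Map a wrid to the completion channel and CQ it will be signalled on:
+ * RDMA_WRID_RECV_CONTROL and above complete on the receive side,
+ * everything else (writes and control sends) on the send side.
+ */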
+static struct ibv_comp_channel *to_channel(RDMAContext *rdma, int wrid)
+{
+ return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel :
+ rdma->recv_comp_channel;
+}
+
+static struct ibv_cq *to_cq(RDMAContext *rdma, int wrid)
+{
+ return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq;
+}
+
/*
* Block until the next work request has completed.
*
@@ -1675,13 +1710,15 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
struct ibv_cq *cq;
void *cq_ctx;
uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
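+ /* pick the completion channel and CQ the requested wrid will land on */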
+ struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested);
+ struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested);
- if (ibv_req_notify_cq(rdma->cq, 0)) {
+ if (ibv_req_notify_cq(poll_cq, 0)) {
return -1;
}
/* poll cq first */
while (wr_id != wrid_requested) {
- ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
+ ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
if (ret < 0) {
return ret;
}
@@ -1702,12 +1739,12 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
}
while (1) {
- ret = qemu_rdma_wait_comp_channel(rdma);
+ ret = qemu_rdma_wait_comp_channel(rdma, ch);
if (ret) {
goto err_block_for_wrid;
}
- ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
+ ret = ibv_get_cq_event(ch, &cq, &cq_ctx);
if (ret) {
perror("ibv_get_cq_event");
goto err_block_for_wrid;
@@ -1721,7 +1758,7 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
}
while (wr_id != wrid_requested) {
- ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
+ ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
if (ret < 0) {
goto err_block_for_wrid;
}
@@ -2437,13 +2474,21 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
rdma_destroy_qp(rdma->cm_id);
rdma->qp = NULL;
}
- if (rdma->cq) {
- ibv_destroy_cq(rdma->cq);
- rdma->cq = NULL;
+ if (rdma->recv_cq) {
+ ibv_destroy_cq(rdma->recv_cq);
+ rdma->recv_cq = NULL;
}
- if (rdma->comp_channel) {
- ibv_destroy_comp_channel(rdma->comp_channel);
- rdma->comp_channel = NULL;
+ if (rdma->send_cq) {
+ ibv_destroy_cq(rdma->send_cq);
+ rdma->send_cq = NULL;
+ }
+ if (rdma->recv_comp_channel) {
+ ibv_destroy_comp_channel(rdma->recv_comp_channel);
+ rdma->recv_comp_channel = NULL;
+ }
+ if (rdma->send_comp_channel) {
+ ibv_destroy_comp_channel(rdma->send_comp_channel);
+ rdma->send_comp_channel = NULL;
}
if (rdma->pd) {
ibv_dealloc_pd(rdma->pd);
@@ -3115,10 +3160,14 @@ static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
if (io_read) {
- aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd,
+ aio_set_fd_handler(ctx, rioc->rdmain->recv_comp_channel->fd,
+ false, io_read, io_write, NULL, opaque);
+ aio_set_fd_handler(ctx, rioc->rdmain->send_comp_channel->fd,
false, io_read, io_write, NULL, opaque);
} else {
- aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd,
+ aio_set_fd_handler(ctx, rioc->rdmaout->recv_comp_channel->fd,
+ false, io_read, io_write, NULL, opaque);
+ aio_set_fd_handler(ctx, rioc->rdmaout->send_comp_channel->fd,
false, io_read, io_write, NULL, opaque);
}
}
@@ -3332,7 +3381,22 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
*/
while (1) {
uint64_t wr_id, wr_id_in;
- int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
+ int ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL);
+ if (ret < 0) {
+ error_report("rdma migration: polling error! %d", ret);
+ goto err;
+ }
+
+ wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
+
+ if (wr_id == RDMA_WRID_NONE) {
+ break;
+ }
+ }
+
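+ /* also drain the send completion queue of finished work requests */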
+ while (1) {
+ uint64_t wr_id, wr_id_in;
+ int ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL);
if (ret < 0) {
error_report("rdma migration: polling error! %d", ret);
goto err;