Message ID | 1612339311-114805-18-git-send-email-zhengchuan@huawei.com (mailing list archive)
---|---
State | New, archived
Series | Support Multifd for RDMA migration
* Chuan Zheng (zhengchuan@huawei.com) wrote:
> Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com>
> Signed-off-by: Chuan Zheng <zhengchuan@huawei.com>
> ---
>  migration/rdma.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++----
>  1 file changed, 61 insertions(+), 4 deletions(-)
>
> diff --git a/migration/rdma.c b/migration/rdma.c
> index 2097839..c19a91f 100644
> --- a/migration/rdma.c
> +++ b/migration/rdma.c
> @@ -2002,6 +2002,20 @@ static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
>                                 .repeat = 1,
>                               };
>
> +    /* use multifd to send data */
> +    if (migrate_use_multifd()) {
> +        int channel = get_multifd_RDMA_channel();
> +        int ret = 0;
> +        MultiFDSendParams *multifd_send_param = NULL;
> +        ret = get_multifd_send_param(channel, &multifd_send_param);
> +        if (ret) {
> +            error_report("rdma: error getting multifd_send_param(%d)", channel);
> +            return -EINVAL;
> +        }
> +        rdma = (RDMAContext *)multifd_send_param->rdma;
> +        block = &(rdma->local_ram_blocks.block[current_index]);
> +    }
> +
>  retry:
>      sge.addr = (uintptr_t)(block->local_host_addr +
>                             (current_addr - block->offset));
> @@ -2197,6 +2211,27 @@ retry:
>      return 0;
>  }
>
> +static int multifd_rdma_write_flush(void)
> +{
> +    /* The multifd RDMA threads send data */
> +    MultiFDSendParams *multifd_send_param = NULL;
> +    RDMAContext *rdma = NULL;
> +    MigrationState *s = migrate_get_current();
> +    int ret = 0;
> +
> +    ret = get_multifd_send_param(s->rdma_channel,
> +                                 &multifd_send_param);
> +    if (ret) {
> +        error_report("rdma: error getting multifd_send_param(%d)",
> +                     s->rdma_channel);

Do we need these error_report's for get_multifd_send_param calls - how
can they fail in practice?

> +        return ret;
> +    }
> +    rdma = (RDMAContext *)(multifd_send_param->rdma);
> +    rdma->nb_sent++;
> +
> +    return ret;

But this doesn't actually 'flush' anything?

> +}
> +
>  /*
>   * Push out any unwritten RDMA operations.
>   *
> @@ -2219,8 +2254,15 @@ static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
>      }
>
>      if (ret == 0) {
> -        rdma->nb_sent++;
> -        trace_qemu_rdma_write_flush(rdma->nb_sent);
> +        if (migrate_use_multifd()) {
> +            ret = multifd_rdma_write_flush();
> +            if (ret) {
> +                return ret;
> +            }
> +        } else {
> +            rdma->nb_sent++;
> +            trace_qemu_rdma_write_flush(rdma->nb_sent);
> +        }
>      }
>
>      rdma->current_length = 0;
> @@ -4062,6 +4104,7 @@ wait_reg_complete:
>              }
>
>              qemu_sem_post(&multifd_send_param->sem_sync);
> +            qemu_sem_wait(&multifd_send_param->sem);

why?

>          }
>      }
>
> @@ -4443,6 +4486,7 @@ static void *multifd_rdma_send_thread(void *opaque)
>      Error *local_err = NULL;
>      int ret = 0;
>      RDMAControlHeader head = { .len = 0, .repeat = 1 };
> +    RDMAContext *rdma = p->rdma;
>
>      trace_multifd_send_thread_start(p->id);
>      if (multifd_send_initial_packet(p, &local_err) < 0) {
> @@ -4451,7 +4495,7 @@ static void *multifd_rdma_send_thread(void *opaque)
>
>      /* wait for semaphore notification to register memory */
>      qemu_sem_wait(&p->sem_sync);
> -    if (qemu_rdma_registration(p->rdma) < 0) {
> +    if (qemu_rdma_registration(rdma) < 0) {
>          goto out;
>      }
>      /*
> @@ -4466,12 +4510,25 @@ static void *multifd_rdma_send_thread(void *opaque)
>                  break;
>              }
>          }
> +        /* To complete polling(CQE) */
> +        while (rdma->nb_sent) {

Where is nb_sent decremented?

> +            ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
> +            if (ret < 0) {
> +                error_report("multifd RDMA migration: "
> +                             "complete polling error!");
> +                return NULL;
> +            }
> +        }
>          /* Send FINISHED to the destination */
>          head.type = RDMA_CONTROL_REGISTER_FINISHED;
> -        ret = qemu_rdma_exchange_send(p->rdma, &head, NULL, NULL, NULL, NULL);
> +        ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
>          if (ret < 0) {
> +            error_report("multifd RDMA migration: "
> +                         "sending remote error!");
>              return NULL;
>          }
> +        /* sync main thread */
> +        qemu_sem_post(&p->sem);
>      }
>
> out:
> --
> 1.8.3.1
>
On 2021/2/4 18:18, Dr. David Alan Gilbert wrote:
> * Chuan Zheng (zhengchuan@huawei.com) wrote:
>> Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com>
>> Signed-off-by: Chuan Zheng <zhengchuan@huawei.com>
>> ---
>>  migration/rdma.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++----
>>  1 file changed, 61 insertions(+), 4 deletions(-)
>>
>> diff --git a/migration/rdma.c b/migration/rdma.c
>> index 2097839..c19a91f 100644
>> --- a/migration/rdma.c
>> +++ b/migration/rdma.c
>> @@ -2002,6 +2002,20 @@ static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
>>                                 .repeat = 1,
>>                               };
>>
>> +    /* use multifd to send data */
>> +    if (migrate_use_multifd()) {
>> +        int channel = get_multifd_RDMA_channel();
>> +        int ret = 0;
>> +        MultiFDSendParams *multifd_send_param = NULL;
>> +        ret = get_multifd_send_param(channel, &multifd_send_param);
>> +        if (ret) {
>> +            error_report("rdma: error getting multifd_send_param(%d)", channel);
>> +            return -EINVAL;
>> +        }
>> +        rdma = (RDMAContext *)multifd_send_param->rdma;
>> +        block = &(rdma->local_ram_blocks.block[current_index]);
>> +    }
>> +
>>  retry:
>>      sge.addr = (uintptr_t)(block->local_host_addr +
>>                             (current_addr - block->offset));
>> @@ -2197,6 +2211,27 @@ retry:
>>      return 0;
>>  }
>>
>> +static int multifd_rdma_write_flush(void)
>> +{
>> +    /* The multifd RDMA threads send data */
>> +    MultiFDSendParams *multifd_send_param = NULL;
>> +    RDMAContext *rdma = NULL;
>> +    MigrationState *s = migrate_get_current();
>> +    int ret = 0;
>> +
>> +    ret = get_multifd_send_param(s->rdma_channel,
>> +                                 &multifd_send_param);
>> +    if (ret) {
>> +        error_report("rdma: error getting multifd_send_param(%d)",
>> +                     s->rdma_channel);
>
> Do we need these error_report's for get_multifd_send_param calls - how
> can they fail in practice?
>

Maybe we do not need them. s->rdma_channel should not exceed
migrate_multifd_channels() and should never be negative.

>> +        return ret;
>> +    }
>> +    rdma = (RDMAContext *)(multifd_send_param->rdma);
>> +    rdma->nb_sent++;
>> +
>> +    return ret;
>
> But this doesn't actually 'flush' anything?
>

Yes, it is only used to increase nb_sent; we need to choose a more
suitable function name.

>> +}
>> +
>>  /*
>>   * Push out any unwritten RDMA operations.
>>   *
>> @@ -2219,8 +2254,15 @@ static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
>>      }
>>
>>      if (ret == 0) {
>> -        rdma->nb_sent++;
>> -        trace_qemu_rdma_write_flush(rdma->nb_sent);
>> +        if (migrate_use_multifd()) {
>> +            ret = multifd_rdma_write_flush();
>> +            if (ret) {
>> +                return ret;
>> +            }
>> +        } else {
>> +            rdma->nb_sent++;
>> +            trace_qemu_rdma_write_flush(rdma->nb_sent);
>> +        }
>>      }
>>
>>      rdma->current_length = 0;
>> @@ -4062,6 +4104,7 @@ wait_reg_complete:
>>              }
>>
>>              qemu_sem_post(&multifd_send_param->sem_sync);
>> +            qemu_sem_wait(&multifd_send_param->sem);
>
> why?
>

The multifd send thread posts the sem signal after it finishes sending
data. The main thread needs to wait here for the multifd RDMA send
threads to poll the CQEs.

>>          }
>>      }
>>
>> @@ -4443,6 +4486,7 @@ static void *multifd_rdma_send_thread(void *opaque)
>>      Error *local_err = NULL;
>>      int ret = 0;
>>      RDMAControlHeader head = { .len = 0, .repeat = 1 };
>> +    RDMAContext *rdma = p->rdma;
>>
>>      trace_multifd_send_thread_start(p->id);
>>      if (multifd_send_initial_packet(p, &local_err) < 0) {
>> @@ -4451,7 +4495,7 @@ static void *multifd_rdma_send_thread(void *opaque)
>>
>>      /* wait for semaphore notification to register memory */
>>      qemu_sem_wait(&p->sem_sync);
>> -    if (qemu_rdma_registration(p->rdma) < 0) {
>> +    if (qemu_rdma_registration(rdma) < 0) {
>>          goto out;
>>      }
>>      /*
>> @@ -4466,12 +4510,25 @@ static void *multifd_rdma_send_thread(void *opaque)
>>                  break;
>>              }
>>          }
>> +        /* To complete polling(CQE) */
>> +        while (rdma->nb_sent) {
>
> Where is nb_sent decremented?
>

nb_sent is decremented in qemu_rdma_poll(), which is called by
qemu_rdma_block_for_wrid().

>> +            ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
>> +            if (ret < 0) {
>> +                error_report("multifd RDMA migration: "
>> +                             "complete polling error!");
>> +                return NULL;
>> +            }
>> +        }
>>          /* Send FINISHED to the destination */
>>          head.type = RDMA_CONTROL_REGISTER_FINISHED;
>> -        ret = qemu_rdma_exchange_send(p->rdma, &head, NULL, NULL, NULL, NULL);
>> +        ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
>>          if (ret < 0) {
>> +            error_report("multifd RDMA migration: "
>> +                         "sending remote error!");
>>              return NULL;
>>          }
>> +        /* sync main thread */
>> +        qemu_sem_post(&p->sem);
>>      }
>>
>> out:
>> --
>> 1.8.3.1
>>
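Three of the points settled in the exchange above are easier to follow with
the relevant code spelled out.

First, on whether get_multifd_send_param() can fail in practice: the helper
itself is not part of this patch, so the sketch below is only a guess at its
shape based on Zheng's answer and the rest of the series. The
multifd_send_state layout and the use of migrate_multifd_channels() are
assumptions, not quotes from this patch set. The point is that the only
failure mode is an out-of-range channel index, which the callers never
produce:

    /* Hypothetical sketch of get_multifd_send_param(), not the series'
     * exact code: a pure range check on the channel index. */
    static int get_multifd_send_param(int id, MultiFDSendParams **param)
    {
        if (id < 0 || id >= migrate_multifd_channels()) {
            return -EINVAL;  /* unreachable for a valid channel index */
        }

        *param = &multifd_send_state->params[id];
        return 0;
    }

If that is indeed the shape, the error_report() calls guard a condition that
cannot fire, which is Gilbert's point.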
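Second, the sem_sync/sem pairing that drew the "why?": the two semaphores
form a per-channel handshake between the main migration thread and the
multifd RDMA send thread. Condensed from the hunks quoted above, with
unrelated lines dropped and error paths shortened:

    /* Main migration thread, per channel (wait_reg_complete path): */
    qemu_sem_post(&multifd_send_param->sem_sync); /* wake the send thread */
    qemu_sem_wait(&multifd_send_param->sem);      /* block until it drains */

    /* multifd_rdma_send_thread(), the matching half: */
    while (rdma->nb_sent) {                       /* reap outstanding WRITEs */
        if (qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL) < 0) {
            return NULL;
        }
    }
    head.type = RDMA_CONTROL_REGISTER_FINISHED;   /* tell the destination */
    qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
    qemu_sem_post(&p->sem);                       /* release the main thread */

Without the added qemu_sem_wait(), the main thread could move on while WRITE
completions were still in flight on that channel.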
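Finally, on where nb_sent is decremented: as the answer says, it happens in
qemu_rdma_poll(), which qemu_rdma_block_for_wrid() calls in a loop. A
simplified excerpt of the existing logic in migration/rdma.c, with chunk
unregistration handling omitted:

    /* Inside qemu_rdma_poll(), once ibv_poll_cq() returns a completion:
     * each finished RDMA WRITE work request drops the in-flight count,
     * so the while (rdma->nb_sent) loop above terminates. */
    if (wr_id == RDMA_WRID_RDMA_WRITE) {
        if (rdma->nb_sent > 0) {
            rdma->nb_sent--;
        }
    }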