@@ -92,6 +92,11 @@ struct {
MultiFDMethods *ops;
} *multifd_recv_state;
+static bool multifd_use_packets(void)
+{
+ return !migrate_mapped_ram();
+}
+
/* Multifd without compression */
/**
@@ -122,6 +127,19 @@ static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
return;
}
+static void multifd_send_prepare_iovs(MultiFDSendParams *p)
+{
+ MultiFDPages_t *pages = p->pages;
+
+ for (int i = 0; i < pages->num; i++) {
+ p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
+ p->iov[p->iovs_num].iov_len = p->page_size;
+ p->iovs_num++;
+ }
+
+ p->next_packet_size = pages->num * p->page_size;
+}
+
/**
* nocomp_send_prepare: prepare date to be able to send
*
@@ -136,9 +154,13 @@ static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
bool use_zero_copy_send = migrate_zero_copy_send();
- MultiFDPages_t *pages = p->pages;
int ret;
+ if (!multifd_use_packets()) {
+ multifd_send_prepare_iovs(p);
+ return 0;
+ }
+
if (!use_zero_copy_send) {
/*
* Only !zerocopy needs the header in IOV; zerocopy will
@@ -147,13 +169,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
multifd_send_prepare_header(p);
}
- for (int i = 0; i < pages->num; i++) {
- p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
- p->iov[p->iovs_num].iov_len = p->page_size;
- p->iovs_num++;
- }
-
- p->next_packet_size = pages->num * p->page_size;
+ multifd_send_prepare_iovs(p);
p->flags |= MULTIFD_FLAG_NOCOMP;
multifd_send_fill_packet(p);
@@ -208,7 +224,13 @@ static void nocomp_recv_cleanup(MultiFDRecvParams *p)
*/
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
- uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
+ uint32_t flags;
+
+ if (!multifd_use_packets()) {
+ return 0;
+ }
+
+ flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
if (flags != MULTIFD_FLAG_NOCOMP) {
error_setg(errp, "multifd %u: flags received %x flags expected %x",
@@ -795,15 +817,18 @@ static void *multifd_send_thread(void *opaque)
MigrationThread *thread = NULL;
Error *local_err = NULL;
int ret = 0;
+ bool use_packets = multifd_use_packets();
thread = migration_threads_add(p->name, qemu_get_thread_id());
trace_multifd_send_thread_start(p->id);
rcu_register_thread();
- if (multifd_send_initial_packet(p, &local_err) < 0) {
- ret = -1;
- goto out;
+ if (use_packets) {
+ if (multifd_send_initial_packet(p, &local_err) < 0) {
+ ret = -1;
+ goto out;
+ }
}
while (true) {
@@ -854,16 +879,20 @@ static void *multifd_send_thread(void *opaque)
* it doesn't require explicit memory barriers.
*/
assert(qatomic_read(&p->pending_sync));
- p->flags = MULTIFD_FLAG_SYNC;
- multifd_send_fill_packet(p);
- ret = qio_channel_write_all(p->c, (void *)p->packet,
- p->packet_len, &local_err);
- if (ret != 0) {
- break;
+
+ if (use_packets) {
+ p->flags = MULTIFD_FLAG_SYNC;
+ multifd_send_fill_packet(p);
+ ret = qio_channel_write_all(p->c, (void *)p->packet,
+ p->packet_len, &local_err);
+ if (ret != 0) {
+ break;
+ }
+ /* p->next_packet_size will always be zero for a SYNC packet */
+ stat64_add(&mig_stats.multifd_bytes, p->packet_len);
+ p->flags = 0;
}
- /* p->next_packet_size will always be zero for a SYNC packet */
- stat64_add(&mig_stats.multifd_bytes, p->packet_len);
- p->flags = 0;
+
qatomic_set(&p->pending_sync, false);
qemu_sem_post(&p->sem_sync);
}
@@ -1018,6 +1047,7 @@ bool multifd_send_setup(void)
Error *local_err = NULL;
int thread_count, ret = 0;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+ bool use_packets = multifd_use_packets();
uint8_t i;
if (!migrate_multifd()) {
@@ -1040,14 +1070,20 @@ bool multifd_send_setup(void)
qemu_sem_init(&p->sem_sync, 0);
p->id = i;
p->pages = multifd_pages_init(page_count);
- p->packet_len = sizeof(MultiFDPacket_t)
- + sizeof(uint64_t) * page_count;
- p->packet = g_malloc0(p->packet_len);
- p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
- p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+
+ if (use_packets) {
+ p->packet_len = sizeof(MultiFDPacket_t)
+ + sizeof(uint64_t) * page_count;
+ p->packet = g_malloc0(p->packet_len);
+ p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
+ p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+
+ /* We need one extra place for the packet header */
+ p->iov = g_new0(struct iovec, page_count + 1);
+ } else {
+ p->iov = g_new0(struct iovec, page_count);
+ }
p->name = g_strdup_printf("multifdsend_%d", i);
- /* We need one extra place for the packet header */
- p->iov = g_new0(struct iovec, page_count + 1);
p->page_size = qemu_target_page_size();
p->page_count = page_count;
p->write_flags = 0;
@@ -1110,7 +1146,9 @@ static void multifd_recv_terminate_threads(Error *err)
* multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
* however try to wakeup it without harm in cleanup phase.
*/
- qemu_sem_post(&p->sem_sync);
+ if (multifd_use_packets()) {
+ qemu_sem_post(&p->sem_sync);
+ }
/*
* We could arrive here for two reasons:
@@ -1185,7 +1223,7 @@ void multifd_recv_sync_main(void)
int thread_count = migrate_multifd_channels();
int i;
- if (!migrate_multifd()) {
+ if (!migrate_multifd() || !multifd_use_packets()) {
return;
}
@@ -1220,13 +1258,14 @@ static void *multifd_recv_thread(void *opaque)
{
MultiFDRecvParams *p = opaque;
Error *local_err = NULL;
+ bool use_packets = multifd_use_packets();
int ret;
trace_multifd_recv_thread_start(p->id);
rcu_register_thread();
while (true) {
- uint32_t flags;
+ uint32_t flags = 0;
bool has_data = false;
p->normal_num = 0;
@@ -1234,25 +1273,27 @@ static void *multifd_recv_thread(void *opaque)
break;
}
- ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
- p->packet_len, &local_err);
- if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
- break;
- }
+ if (use_packets) {
+ ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+ p->packet_len, &local_err);
+ if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
+ break;
+ }
- qemu_mutex_lock(&p->mutex);
- ret = multifd_recv_unfill_packet(p, &local_err);
- if (ret) {
+ qemu_mutex_lock(&p->mutex);
+ ret = multifd_recv_unfill_packet(p, &local_err);
+ if (ret) {
+ qemu_mutex_unlock(&p->mutex);
+ break;
+ }
+
+ flags = p->flags;
+ /* recv methods don't know how to handle the SYNC flag */
+ p->flags &= ~MULTIFD_FLAG_SYNC;
+ has_data = !!p->normal_num;
qemu_mutex_unlock(&p->mutex);
- break;
}
- flags = p->flags;
- /* recv methods don't know how to handle the SYNC flag */
- p->flags &= ~MULTIFD_FLAG_SYNC;
- has_data = !!p->normal_num;
- qemu_mutex_unlock(&p->mutex);
-
if (has_data) {
ret = multifd_recv_state->ops->recv(p, &local_err);
if (ret != 0) {
@@ -1260,9 +1301,11 @@ static void *multifd_recv_thread(void *opaque)
}
}
- if (flags & MULTIFD_FLAG_SYNC) {
- qemu_sem_post(&multifd_recv_state->sem_sync);
- qemu_sem_wait(&p->sem_sync);
+ if (use_packets) {
+ if (flags & MULTIFD_FLAG_SYNC) {
+ qemu_sem_post(&multifd_recv_state->sem_sync);
+ qemu_sem_wait(&p->sem_sync);
+ }
}
}
@@ -1281,6 +1324,7 @@ int multifd_recv_setup(Error **errp)
{
int thread_count;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+ bool use_packets = multifd_use_packets();
uint8_t i;
/*
@@ -1305,9 +1349,12 @@ int multifd_recv_setup(Error **errp)
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem_sync, 0);
p->id = i;
- p->packet_len = sizeof(MultiFDPacket_t)
- + sizeof(uint64_t) * page_count;
- p->packet = g_malloc0(p->packet_len);
+
+ if (use_packets) {
+ p->packet_len = sizeof(MultiFDPacket_t)
+ + sizeof(uint64_t) * page_count;
+ p->packet = g_malloc0(p->packet_len);
+ }
p->name = g_strdup_printf("multifdrecv_%d", i);
p->iov = g_new0(struct iovec, page_count);
p->normal = g_new0(ram_addr_t, page_count);
@@ -1351,18 +1398,24 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
MultiFDRecvParams *p;
Error *local_err = NULL;
+ bool use_packets = multifd_use_packets();
int id;
- id = multifd_recv_initial_packet(ioc, &local_err);
- if (id < 0) {
- multifd_recv_terminate_threads(local_err);
- error_propagate_prepend(errp, local_err,
- "failed to receive packet"
- " via multifd channel %d: ",
- qatomic_read(&multifd_recv_state->count));
- return;
+ if (use_packets) {
+ id = multifd_recv_initial_packet(ioc, &local_err);
+ if (id < 0) {
+ multifd_recv_terminate_threads(local_err);
+ error_propagate_prepend(errp, local_err,
+ "failed to receive packet"
+ " via multifd channel %d: ",
+ qatomic_read(&multifd_recv_state->count));
+ return;
+ }
+ trace_multifd_recv_new_channel(id);
+ } else {
+ /* next patch gives this a meaningful value */
+ id = 0;
}
- trace_multifd_recv_new_channel(id);
p = &multifd_recv_state->params[id];
if (p->c != NULL) {