@@ -68,6 +68,10 @@ typedef struct {
* Number of pages transferred that were not full of zeros.
*/
Stat64 normal_pages;
+ /*
+ * Number of pages transferred that were full of zeros.
+ */
+ Stat64 zero_pages;
/*
* Number of bytes sent during postcopy.
*/
@@ -97,10 +101,6 @@ typedef struct {
* Total number of bytes transferred.
*/
Stat64 transferred;
- /*
- * Number of pages transferred that were full of zeros.
- */
- Stat64 zero_pages;
} MigrationAtomicStats;
extern MigrationAtomicStats mig_stats;
@@ -12,6 +12,7 @@
#include "qemu/osdep.h"
#include "qemu/rcu.h"
+#include "qemu/cutils.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
@@ -268,6 +269,7 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
packet->normal_pages = cpu_to_be32(p->normal_num);
packet->next_packet_size = cpu_to_be32(p->next_packet_size);
packet->packet_num = cpu_to_be64(p->packet_num);
+ packet->zero_pages = cpu_to_be32(p->zero_num);
if (p->pages->block) {
strncpy(packet->ramblock, p->pages->block->idstr, 256);
@@ -279,6 +281,12 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
packet->offset[i] = cpu_to_be64(temp);
}
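+
+    /* Zero-page offsets are appended after the normal-page offsets. */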
+ for (i = 0; i < p->zero_num; i++) {
+ /* there are architectures where ram_addr_t is 32 bit */
+ uint64_t temp = p->zero[i];
+
+ packet->offset[p->normal_num + i] = cpu_to_be64(temp);
+ }
}
static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
@@ -327,7 +335,15 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
p->next_packet_size = be32_to_cpu(packet->next_packet_size);
p->packet_num = be64_to_cpu(packet->packet_num);
- if (p->normal_num == 0) {
+ p->zero_num = be32_to_cpu(packet->zero_pages);
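+    /*
+     * Normal and zero page offsets share packet->offset[], so together
+     * they must fit within pages_alloc entries.
+     */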
+ if (p->zero_num > packet->pages_alloc - p->normal_num) {
+ error_setg(errp, "multifd: received packet "
+ "with %u zero pages and expected maximum pages are %u",
+                   p->zero_num, packet->pages_alloc - p->normal_num);
+ return -1;
+ }
+
+ if (p->normal_num == 0 && p->zero_num == 0) {
return 0;
}
@@ -353,6 +369,18 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
p->normal[i] = offset;
}
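+
+    /* Unpack the zero-page offsets that follow the normal-page offsets. */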
+ for (i = 0; i < p->zero_num; i++) {
+ uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);
+
+ if (offset > (p->block->used_length - p->page_size)) {
+ error_setg(errp, "multifd: offset too long %" PRIu64
+ " (max " RAM_ADDR_FMT ")",
+ offset, p->block->used_length);
+ return -1;
+ }
+ p->zero[i] = offset;
+ }
+
return 0;
}
@@ -432,6 +460,8 @@ static int multifd_send_pages(QEMUFile *f)
p->packet_num = multifd_send_state->packet_num++;
multifd_send_state->pages = p->pages;
p->pages = pages;
+ stat64_add(&mig_stats.normal_pages, p->normal_num);
+ stat64_add(&mig_stats.zero_pages, p->zero_num);
qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&p->sem);
@@ -548,6 +578,8 @@ void multifd_save_cleanup(void)
p->iov = NULL;
g_free(p->normal);
p->normal = NULL;
+ g_free(p->zero);
+ p->zero = NULL;
multifd_send_state->ops->send_cleanup(p, &local_err);
if (local_err) {
migrate_set_error(migrate_get_current(), local_err);
@@ -622,7 +654,7 @@ int multifd_send_sync_main(QEMUFile *f)
}
p->packet_num = multifd_send_state->packet_num++;
- p->flags |= MULTIFD_FLAG_SYNC;
+ p->sync_needed = true;
p->pending_job++;
qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&p->sem);
@@ -648,6 +680,8 @@ static void *multifd_send_thread(void *opaque)
MultiFDSendParams *p = opaque;
MigrationThread *thread = NULL;
Error *local_err = NULL;
+    /* Older QEMU doesn't understand zero pages on the multifd channel. */
+ bool use_multifd_zero_page = !migrate_use_main_zero_page();
int ret = 0;
bool use_zero_copy_send = migrate_zero_copy_send();
@@ -673,9 +707,17 @@ static void *multifd_send_thread(void *opaque)
qemu_mutex_lock(&p->mutex);
if (p->pending_job) {
+ RAMBlock *rb = p->pages->block;
uint64_t packet_num = p->packet_num;
- uint32_t flags;
+ p->flags = 0;
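+            /* Latch a pending sync request into this packet's flags. */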
+ if (p->sync_needed) {
+ p->flags |= MULTIFD_FLAG_SYNC;
+ p->sync_needed = false;
+ }
+ qemu_mutex_unlock(&p->mutex);
+
p->normal_num = 0;
+ p->zero_num = 0;
if (use_zero_copy_send) {
p->iovs_num = 0;
@@ -684,27 +726,27 @@ static void *multifd_send_thread(void *opaque)
}
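+
+            /*
+             * Split the queued pages: pages that read as all zeros are
+             * only recorded in the packet header, the rest are sent as
+             * normal pages.
+             */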
for (int i = 0; i < p->pages->num; i++) {
- p->normal[p->normal_num] = p->pages->offset[i];
- p->normal_num++;
+                uint64_t offset = p->pages->offset[i];
+
+ if (use_multifd_zero_page &&
+ buffer_is_zero(rb->host + offset, p->page_size)) {
+ p->zero[p->zero_num] = offset;
+ p->zero_num++;
+ ram_release_page(rb->idstr, offset);
+ } else {
+ p->normal[p->normal_num] = offset;
+ p->normal_num++;
+ }
}
if (p->normal_num) {
ret = multifd_send_state->ops->send_prepare(p, &local_err);
if (ret != 0) {
- qemu_mutex_unlock(&p->mutex);
break;
}
}
multifd_send_fill_packet(p);
- flags = p->flags;
- p->flags = 0;
- p->num_packets++;
- p->total_normal_pages += p->normal_num;
- p->pages->num = 0;
- p->pages->block = NULL;
- qemu_mutex_unlock(&p->mutex);
- trace_multifd_send(p->id, packet_num, p->normal_num, flags,
+            trace_multifd_send(p->id, packet_num, p->normal_num,
+                               p->zero_num, p->flags,
p->next_packet_size);
if (use_zero_copy_send) {
@@ -731,10 +773,15 @@ static void *multifd_send_thread(void *opaque)
stat64_add(&mig_stats.multifd_bytes, p->next_packet_size);
stat64_add(&mig_stats.transferred, p->next_packet_size);
qemu_mutex_lock(&p->mutex);
+ p->num_packets++;
+ p->total_normal_pages += p->normal_num;
+ p->total_zero_pages += p->zero_num;
+ p->pages->num = 0;
+ p->pages->block = NULL;
p->pending_job--;
qemu_mutex_unlock(&p->mutex);
- if (flags & MULTIFD_FLAG_SYNC) {
+ if (p->flags & MULTIFD_FLAG_SYNC) {
qemu_sem_post(&p->sem_sync);
}
} else if (p->quit) {
@@ -768,7 +815,8 @@ out:
rcu_unregister_thread();
migration_threads_remove(thread);
- trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages);
+ trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages,
+ p->total_zero_pages);
return NULL;
}
@@ -944,6 +992,7 @@ int multifd_save_setup(Error **errp)
p->normal = g_new0(ram_addr_t, page_count);
p->page_size = qemu_target_page_size();
p->page_count = page_count;
+ p->zero = g_new0(ram_addr_t, page_count);
if (migrate_zero_copy_send()) {
p->write_flags = QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
@@ -1059,6 +1108,8 @@ void multifd_load_cleanup(void)
p->iov = NULL;
g_free(p->normal);
p->normal = NULL;
+ g_free(p->zero);
+ p->zero = NULL;
multifd_recv_state->ops->recv_cleanup(p);
}
qemu_sem_destroy(&multifd_recv_state->sem_sync);
@@ -1105,7 +1156,7 @@ static void *multifd_recv_thread(void *opaque)
rcu_register_thread();
while (true) {
- uint32_t flags;
+ bool sync_needed = false;
if (p->quit) {
break;
@@ -1124,13 +1175,14 @@ static void *multifd_recv_thread(void *opaque)
break;
}
- flags = p->flags;
+ trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
+ p->flags, p->next_packet_size);
+ sync_needed = p->flags & MULTIFD_FLAG_SYNC;
/* recv methods don't know how to handle the SYNC flag */
p->flags &= ~MULTIFD_FLAG_SYNC;
- trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
- p->next_packet_size);
p->num_packets++;
p->total_normal_pages += p->normal_num;
+ p->total_zero_pages += p->zero_num;
qemu_mutex_unlock(&p->mutex);
if (p->normal_num) {
@@ -1140,7 +1192,14 @@ static void *multifd_recv_thread(void *opaque)
}
}
- if (flags & MULTIFD_FLAG_SYNC) {
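+        /*
+         * Zero pages were not transferred; clear each destination page
+         * unless it already reads as all zeros.
+         */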
+        for (int i = 0; i < p->zero_num; i++) {
+            void *page = p->host + p->zero[i];
+
+            if (!buffer_is_zero(page, p->page_size)) {
+                memset(page, 0, p->page_size);
+            }
+        }
+
+ if (sync_needed) {
qemu_sem_post(&multifd_recv_state->sem_sync);
qemu_sem_wait(&p->sem_sync);
}
@@ -1155,7 +1214,8 @@ static void *multifd_recv_thread(void *opaque)
qemu_mutex_unlock(&p->mutex);
rcu_unregister_thread();
- trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages);
+ trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages,
+ p->total_zero_pages);
return NULL;
}
@@ -1196,6 +1256,7 @@ int multifd_load_setup(Error **errp)
p->normal = g_new0(ram_addr_t, page_count);
p->page_count = page_count;
p->page_size = qemu_target_page_size();
+ p->zero = g_new0(ram_addr_t, page_count);
}
for (i = 0; i < thread_count; i++) {
@@ -48,7 +48,10 @@ typedef struct {
/* size of the next packet that contains pages */
uint32_t next_packet_size;
uint64_t packet_num;
- uint64_t unused[4]; /* Reserved for future use */
+ /* zero pages */
+ uint32_t zero_pages;
+ uint32_t unused32[1]; /* Reserved for future use */
+ uint64_t unused64[3]; /* Reserved for future use */
char ramblock[256];
uint64_t offset[];
} __attribute__((packed)) MultiFDPacket_t;
@@ -118,10 +121,14 @@ typedef struct {
MultiFDPacket_t *packet;
/* size of the next packet that contains pages */
uint32_t next_packet_size;
+ /* Do we need to do an iteration sync */
+ bool sync_needed;
/* packets sent through this channel */
uint64_t num_packets;
/* non zero pages sent through this channel */
uint64_t total_normal_pages;
+ /* zero pages sent through this channel */
+ uint64_t total_zero_pages;
/* buffers to send */
struct iovec *iov;
/* number of iovs used */
@@ -130,6 +137,10 @@ typedef struct {
ram_addr_t *normal;
/* num of non zero pages */
uint32_t normal_num;
+ /* Pages that are zero */
+ ram_addr_t *zero;
+ /* num of zero pages */
+ uint32_t zero_num;
/* used for compression methods */
void *data;
} MultiFDSendParams;
@@ -181,12 +192,18 @@ typedef struct {
uint8_t *host;
/* non zero pages recv through this channel */
uint64_t total_normal_pages;
+ /* zero pages recv through this channel */
+ uint64_t total_zero_pages;
/* buffers to recv */
struct iovec *iov;
/* Pages that are not zero */
ram_addr_t *normal;
/* num of non zero pages */
uint32_t normal_num;
+ /* Pages that are zero */
+ ram_addr_t *zero;
+ /* num of zero pages */
+ uint32_t zero_num;
/* used for de-compression methods */
void *data;
} MultiFDRecvParams;
@@ -189,6 +189,7 @@ Property migration_properties[] = {
DEFINE_PROP_MIG_CAP("x-block", MIGRATION_CAPABILITY_BLOCK),
DEFINE_PROP_MIG_CAP("x-return-path", MIGRATION_CAPABILITY_RETURN_PATH),
DEFINE_PROP_MIG_CAP("x-multifd", MIGRATION_CAPABILITY_MULTIFD),
+ DEFINE_PROP_MIG_CAP("x-main-zero-page", MIGRATION_CAPABILITY_MAIN_ZERO_PAGE),
DEFINE_PROP_MIG_CAP("x-background-snapshot",
MIGRATION_CAPABILITY_BACKGROUND_SNAPSHOT),
#ifdef CONFIG_LINUX
@@ -278,6 +279,13 @@ bool migrate_multifd(void)
return s->capabilities[MIGRATION_CAPABILITY_MULTIFD];
}
+bool migrate_use_main_zero_page(void)
+{
+ MigrationState *s = migrate_get_current();
+
+ return s->capabilities[MIGRATION_CAPABILITY_MAIN_ZERO_PAGE];
+}
+
bool migrate_pause_before_switchover(void)
{
MigrationState *s = migrate_get_current();
@@ -431,6 +439,7 @@ INITIALIZE_MIGRATE_CAPS_SET(check_caps_background_snapshot,
MIGRATION_CAPABILITY_LATE_BLOCK_ACTIVATE,
MIGRATION_CAPABILITY_RETURN_PATH,
MIGRATION_CAPABILITY_MULTIFD,
+ MIGRATION_CAPABILITY_MAIN_ZERO_PAGE,
MIGRATION_CAPABILITY_PAUSE_BEFORE_SWITCHOVER,
MIGRATION_CAPABILITY_AUTO_CONVERGE,
MIGRATION_CAPABILITY_RELEASE_RAM,
@@ -499,6 +508,9 @@ bool migrate_caps_check(bool *old_caps, bool *new_caps, Error **errp)
error_setg(errp, "Postcopy is not yet compatible with multifd");
return false;
}
+        if (new_caps[MIGRATION_CAPABILITY_MAIN_ZERO_PAGE]) {
+            error_setg(errp,
+                       "Postcopy is not yet compatible with main zero page");
+            return false;
+        }
}
if (new_caps[MIGRATION_CAPABILITY_BACKGROUND_SNAPSHOT]) {
@@ -85,6 +85,7 @@ int migrate_multifd_channels(void);
MultiFDCompression migrate_multifd_compression(void);
int migrate_multifd_zlib_level(void);
int migrate_multifd_zstd_level(void);
+bool migrate_use_main_zero_page(void);
uint8_t migrate_throttle_trigger_threshold(void);
const char *migrate_tls_authz(void);
const char *migrate_tls_creds(void);
@@ -1292,7 +1292,6 @@ static int ram_save_multifd_page(QEMUFile *file, RAMBlock *block,
if (multifd_queue_page(file, block, offset) < 0) {
return -1;
}
- stat64_add(&mig_stats.normal_pages, 1);
return 1;
}
@@ -2149,17 +2148,43 @@ static int ram_save_target_page_legacy(RAMState *rs, PageSearchStatus *pss)
}
return res;
}
-
/*
- * Do not use multifd in postcopy as one whole host page should be
- * placed. Meanwhile postcopy requires atomic update of pages, so even
- * if host page size == guest page size the dest guest during run may
- * still see partially copied pages which is data corruption.
+ * Do not use multifd:
+ * 1. When compression is enabled, because the first page of a new block
+ *    has to be sent out before the compressed page.
+ * 2. In postcopy, because one whole host page has to be placed atomically.
*/
- if (migrate_multifd() && !migration_in_postcopy()) {
+    if (!save_page_use_compression(rs) && migrate_multifd() &&
+        !migration_in_postcopy()) {
+ return ram_save_multifd_page(pss->pss_channel, block, offset);
+ }
+
+ return ram_save_page(rs, pss);
+}
+
+/**
+ * ram_save_target_page_multifd: save one target page
+ *
+ * Returns the number of pages written
+ *
+ * @rs: current RAM state
+ * @pss: data about the page we want to send
+ */
+static int ram_save_target_page_multifd(RAMState *rs, PageSearchStatus *pss)
+{
+ RAMBlock *block = pss->block;
+ ram_addr_t offset = ((ram_addr_t)pss->page) << TARGET_PAGE_BITS;
+ int res;
+
+ if (!migration_in_postcopy()) {
return ram_save_multifd_page(pss->pss_channel, block, offset);
}
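+    /*
+     * In postcopy, fall back to the legacy behaviour: try the zero-page
+     * shortcut first, then send a normal page.
+     */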
+ res = save_zero_page(pss, pss->pss_channel, block, offset);
+ if (res > 0) {
+ return res;
+ }
+
return ram_save_page(rs, pss);
}
@@ -3066,7 +3091,13 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
ram_control_after_iterate(f, RAM_CONTROL_SETUP);
migration_ops = g_malloc0(sizeof(MigrationOps));
- migration_ops->ram_save_target_page = ram_save_target_page_legacy;
+
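+    /*
+     * When multifd does its own zero-page detection, use the multifd
+     * specific target-page hook; otherwise keep the legacy one.
+     */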
+ if (migrate_multifd() && !migrate_use_main_zero_page()) {
+ migration_ops->ram_save_target_page = ram_save_target_page_multifd;
+ } else {
+ migration_ops->ram_save_target_page = ram_save_target_page_legacy;
+ }
+
ret = multifd_send_sync_main(f);
if (ret < 0) {
return ret;
@@ -125,21 +125,21 @@ postcopy_preempt_reset_channel(void) ""
# multifd.c
multifd_new_send_channel_async(uint8_t id) "channel %u"
-multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " pages %u flags 0x%x next packet size %u"
+multifd_recv(uint8_t id, uint64_t packet_num, uint32_t normal, uint32_t zero, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u zero pages %u flags 0x%x next packet size %u"
multifd_recv_new_channel(uint8_t id) "channel %u"
multifd_recv_sync_main(long packet_num) "packet num %ld"
multifd_recv_sync_main_signal(uint8_t id) "channel %u"
multifd_recv_sync_main_wait(uint8_t id) "channel %u"
multifd_recv_terminate_threads(bool error) "error %d"
-multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %u packets %" PRIu64 " pages %" PRIu64
+multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages, uint64_t zero_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64 " zero pages %" PRIu64
multifd_recv_thread_start(uint8_t id) "%u"
-multifd_send(uint8_t id, uint64_t packet_num, uint32_t normal, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u flags 0x%x next packet size %u"
+multifd_send(uint8_t id, uint64_t packet_num, uint32_t normal_pages, uint32_t zero_pages, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u zero pages %u flags 0x%x next packet size %u"
multifd_send_error(uint8_t id) "channel %u"
multifd_send_sync_main(long packet_num) "packet num %ld"
multifd_send_sync_main_signal(uint8_t id) "channel %u"
multifd_send_sync_main_wait(uint8_t id) "channel %u"
multifd_send_terminate_threads(bool error) "error %d"
-multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64
+multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages, uint64_t zero_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64 " zero pages %" PRIu64
multifd_send_thread_start(uint8_t id) "%u"
multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s"
multifd_tls_outgoing_handshake_error(void *ioc, const char *err) "ioc=%p err=%s"
@@ -508,6 +508,11 @@
# and should not affect the correctness of postcopy migration.
# (since 7.1)
#
+# @main-zero-page: If enabled, zero-page detection is done on the
+#                  migration main thread.  Otherwise it is done on
+#                  the multifd threads.
+#                  (since 7.1)
+#
# @switchover-ack: If enabled, migration will not stop the source VM
# and complete the migration until an ACK is received from the
# destination that it's OK to do so. Exactly when this ACK is
@@ -540,7 +545,7 @@
{ 'name': 'x-ignore-shared', 'features': [ 'unstable' ] },
'validate-uuid', 'background-snapshot',
'zero-copy-send', 'postcopy-preempt', 'switchover-ack',
- 'dirty-limit'] }
+ 'dirty-limit', 'main-zero-page'] }
##
# @MigrationCapabilityStatus: