@@ -987,7 +987,7 @@ static void migration_ioc_process_incoming_no_header(QIOChannel *ioc,
/* Multiple connections */
assert(migration_needs_multiple_sockets());
if (migrate_multifd()) {
- multifd_recv_new_channel(ioc, &local_err);
+ multifd_recv_new_channel(ioc, false, &local_err);
} else {
assert(migrate_postcopy_preempt());
f = qemu_file_new_input(ioc);
@@ -1031,6 +1031,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
postcopy_preempt_new_channel(migration_incoming_get_current(), f);
break;
case MIG_CHANNEL_TYPE_MULTIFD:
+ case MIG_CHANNEL_TYPE_MULTIFD_DEVICE_STATE:
{
Error *local_err = NULL;
@@ -1039,7 +1040,9 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
return;
}
- multifd_recv_new_channel(ioc, &local_err);
+ multifd_recv_new_channel(ioc,
+ header.channel_type == MIG_CHANNEL_TYPE_MULTIFD_DEVICE_STATE,
+ &local_err);
if (local_err) {
error_propagate(errp, local_err);
return;
@@ -22,6 +22,7 @@
#include "file.h"
#include "migration.h"
#include "migration-stats.h"
+#include "savevm.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
@@ -404,7 +405,7 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
uint32_t zero_num = pages->num - pages->normal_num;
int i;
- packet->flags = cpu_to_be32(p->flags);
+ packet->hdr.flags = cpu_to_be32(p->flags);
packet->pages_alloc = cpu_to_be32(p->pages->allocated);
packet->normal_pages = cpu_to_be32(pages->normal_num);
packet->zero_pages = cpu_to_be32(zero_num);
@@ -432,28 +433,44 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
p->flags, p->next_packet_size);
}
-static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p, MultiFDPacketHdr_t *hdr,
+ Error **errp)
{
- MultiFDPacket_t *packet = p->packet;
- int i;
-
- packet->magic = be32_to_cpu(packet->magic);
- if (packet->magic != MULTIFD_MAGIC) {
+ hdr->magic = be32_to_cpu(hdr->magic);
+ if (hdr->magic != MULTIFD_MAGIC) {
error_setg(errp, "multifd: received packet "
"magic %x and expected magic %x",
- packet->magic, MULTIFD_MAGIC);
+ hdr->magic, MULTIFD_MAGIC);
return -1;
}
- packet->version = be32_to_cpu(packet->version);
- if (packet->version != MULTIFD_VERSION) {
+ hdr->version = be32_to_cpu(hdr->version);
+ if (hdr->version != MULTIFD_VERSION) {
error_setg(errp, "multifd: received packet "
"version %u and expected version %u",
- packet->version, MULTIFD_VERSION);
+ hdr->version, MULTIFD_VERSION);
return -1;
}
- p->flags = be32_to_cpu(packet->flags);
+ p->flags = be32_to_cpu(hdr->flags);
+
+ return 0;
+}
+
+static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p, Error **errp)
+{
+ MultiFDPacketDeviceState_t *packet = p->packet_dev_state;
+
+ packet->instance_id = be32_to_cpu(packet->instance_id);
+ p->next_packet_size = be32_to_cpu(packet->next_packet_size);
+
+ return 0;
+}
+
+static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
+{
+ MultiFDPacket_t *packet = p->packet;
+ int i;
packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
/*
@@ -485,7 +502,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
p->next_packet_size = be32_to_cpu(packet->next_packet_size);
p->packet_num = be64_to_cpu(packet->packet_num);
- p->packets_recved++;
p->total_normal_pages += p->normal_num;
p->total_zero_pages += p->zero_num;
@@ -533,6 +549,19 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
return 0;
}
+static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+{
+ p->packets_recved++;
+
+ if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
+ return multifd_recv_unfill_packet_device_state(p, errp);
+ } else {
+ return multifd_recv_unfill_packet_ram(p, errp);
+ }
+
+ g_assert_not_reached();
+}
+
static bool multifd_send_should_exit(void)
{
return qatomic_read(&multifd_send_state->exiting);
@@ -1239,8 +1268,8 @@ bool multifd_send_setup(void)
p->packet_len = sizeof(MultiFDPacket_t)
+ sizeof(uint64_t) * page_count;
p->packet = g_malloc0(p->packet_len);
- p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
- p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+ p->packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
+ p->packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);
/* We need one extra place for the packet header */
p->iov = g_new0(struct iovec, page_count + 1);
@@ -1415,6 +1444,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
p->packet_len = 0;
g_free(p->packet);
p->packet = NULL;
+ g_clear_pointer(&p->packet_dev_state, g_free);
g_free(p->iov);
p->iov = NULL;
g_free(p->normal);
@@ -1474,6 +1504,8 @@ void multifd_recv_sync_main(void)
for (i = 0; i < thread_count; i++) {
MultiFDRecvParams *p = &multifd_recv_state->params[i];
+ assert(!p->is_device_state_dedicated);
+
trace_multifd_recv_sync_main_signal(p->id);
qemu_sem_post(&p->sem);
}
@@ -1489,6 +1521,12 @@ void multifd_recv_sync_main(void)
* the work (pending_job=false).
*/
for (i = 0; i < thread_count; i++) {
+ MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+ if (p->is_device_state_dedicated) {
+ continue;
+ }
+
trace_multifd_recv_sync_main_wait(i);
qemu_sem_wait(&multifd_recv_state->sem_sync);
}
@@ -1507,6 +1545,10 @@ void multifd_recv_sync_main(void)
for (i = 0; i < thread_count; i++) {
MultiFDRecvParams *p = &multifd_recv_state->params[i];
+ if (p->is_device_state_dedicated) {
+ continue;
+ }
+
WITH_QEMU_LOCK_GUARD(&p->mutex) {
if (multifd_recv_state->packet_num < p->packet_num) {
multifd_recv_state->packet_num = p->packet_num;
@@ -1529,8 +1571,13 @@ static void *multifd_recv_thread(void *opaque)
rcu_register_thread();
while (true) {
+ MultiFDPacketHdr_t hdr;
uint32_t flags = 0;
+ bool is_device_state = false;
bool has_data = false;
+ uint8_t *pkt_buf;
+ size_t pkt_len;
+
p->normal_num = 0;
if (use_packets) {
@@ -1538,8 +1585,27 @@ static void *multifd_recv_thread(void *opaque)
break;
}
- ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
- p->packet_len, &local_err);
+ ret = qio_channel_read_all_eof(p->c, (void *)&hdr,
+ sizeof(hdr), &local_err);
+ if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
+ break;
+ }
+
+ ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
+ if (ret) {
+ break;
+ }
+
+ is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
+ if (is_device_state) {
+ pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
+ pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
+ } else {
+ pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
+ pkt_len = p->packet_len - sizeof(hdr);
+ }
+
+ ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len, &local_err);
if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
break;
}
@@ -1582,8 +1648,39 @@ static void *multifd_recv_thread(void *opaque)
has_data = !!p->data->size;
}
- if (has_data) {
- ret = multifd_recv_state->ops->recv(p, &local_err);
+ if (!is_device_state) {
+ if (p->is_device_state_dedicated) {
+ error_setg(&local_err,
+ "multifd: received non-device-state packet on device-state-dedicated thread");
+ break;
+ }
+
+ if (has_data) {
+ ret = multifd_recv_state->ops->recv(p, &local_err);
+ if (ret != 0) {
+ break;
+ }
+ }
+ } else {
+ g_autofree char *idstr = NULL;
+ g_autofree char *dev_state_buf = NULL;
+
+ assert(use_packets);
+
+ if (p->next_packet_size > 0) {
+ dev_state_buf = g_malloc(p->next_packet_size);
+
+ ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, &local_err);
+ if (ret != 0) {
+ break;
+ }
+ }
+
+ idstr = g_strndup(p->packet_dev_state->idstr, sizeof(p->packet_dev_state->idstr));
+ ret = qemu_loadvm_load_state_buffer(idstr,
+ p->packet_dev_state->instance_id,
+ dev_state_buf, p->next_packet_size,
+ &local_err);
if (ret != 0) {
break;
}
@@ -1591,6 +1688,11 @@ static void *multifd_recv_thread(void *opaque)
if (use_packets) {
if (flags & MULTIFD_FLAG_SYNC) {
+ if (is_device_state) {
+ error_setg(&local_err, "multifd: received SYNC device state packet");
+ break;
+ }
+
qemu_sem_post(&multifd_recv_state->sem_sync);
qemu_sem_wait(&p->sem_sync);
}
@@ -1662,6 +1764,7 @@ int multifd_recv_setup(Error **errp)
p->packet_len = sizeof(MultiFDPacket_t)
+ sizeof(uint64_t) * page_count;
p->packet = g_malloc0(p->packet_len);
+ p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
}
p->name = g_strdup_printf("multifdrecv_%d", i);
p->iov = g_new0(struct iovec, page_count);
@@ -1703,7 +1806,9 @@ bool multifd_recv_all_channels_created(void)
* Try to receive all multifd channels to get ready for the migration.
* Sets @errp when failing to receive the current channel.
*/
-void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
+void multifd_recv_new_channel(QIOChannel *ioc,
+ bool is_device_state_dedicated,
+ Error **errp)
{
MultiFDRecvParams *p;
Error *local_err = NULL;
@@ -1733,6 +1838,7 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
error_propagate(errp, local_err);
return;
}
+ p->is_device_state_dedicated = is_device_state_dedicated;
p->c = ioc;
object_ref(OBJECT(ioc));
@@ -24,7 +24,7 @@ int multifd_recv_setup(Error **errp);
void multifd_recv_cleanup(void);
void multifd_recv_shutdown(void);
bool multifd_recv_all_channels_created(void);
-void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
+void multifd_recv_new_channel(QIOChannel *ioc, bool is_device_state_dedicated, Error **errp);
void multifd_recv_sync_main(void);
int multifd_send_sync_main(void);
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset);
@@ -41,6 +41,12 @@ MultiFDRecvData *multifd_get_recv_data(void);
#define MULTIFD_FLAG_ZLIB (1 << 1)
#define MULTIFD_FLAG_ZSTD (2 << 1)
+/*
+ * If set it means that this packet contains device state
+ * (MultiFDPacketDeviceState_t), not RAM data (MultiFDPacket_t).
+ */
+#define MULTIFD_FLAG_DEVICE_STATE (1 << 4)
+
/* This value needs to be a multiple of qemu_target_page_size() */
#define MULTIFD_PACKET_SIZE (512 * 1024)
@@ -48,6 +54,11 @@ typedef struct {
uint32_t magic;
uint32_t version;
uint32_t flags;
+} __attribute__((packed)) MultiFDPacketHdr_t;
+
+typedef struct {
+ MultiFDPacketHdr_t hdr;
+
/* maximum number of allocated pages */
uint32_t pages_alloc;
/* non zero pages */
@@ -68,6 +79,16 @@ typedef struct {
uint64_t offset[];
} __attribute__((packed)) MultiFDPacket_t;
+typedef struct {
+ MultiFDPacketHdr_t hdr;
+
+ char idstr[256] QEMU_NONSTRING;
+ uint32_t instance_id;
+
+ /* size of the next packet that contains the actual data */
+ uint32_t next_packet_size;
+} __attribute__((packed)) MultiFDPacketDeviceState_t;
+
typedef struct {
/* number of used pages */
uint32_t num;
@@ -87,6 +108,13 @@ struct MultiFDRecvData {
off_t file_offset;
};
+typedef struct {
+ char *idstr;
+ uint32_t instance_id;
+ char *buf;
+ size_t buf_len;
+} MultiFDDeviceState_t;
+
typedef struct {
/* Fields are only written at creating/deletion time */
/* No lock required for them, they are read only */
@@ -175,6 +203,7 @@ typedef struct {
uint32_t page_size;
/* number of pages in a full packet */
uint32_t page_count;
+ bool is_device_state_dedicated;
/* syncs main thread and channels */
QemuSemaphore sem_sync;
@@ -194,8 +223,9 @@ typedef struct {
/* thread local variables. No locking required */
- /* pointer to the packet */
+ /* pointers to the possible packet types */
MultiFDPacket_t *packet;
+ MultiFDPacketDeviceState_t *packet_dev_state;
/* size of the next packet that contains pages */
uint32_t next_packet_size;
/* packets received through this channel */