@@ -115,6 +115,28 @@ static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
}
}
+static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
+ Error **errp)
+{
+ Error *local_err = NULL;
+ uint64_t value;
+ int ret;
+
+ colo_receive_check_message(f, expect_msg, &local_err);
+ if (local_err) {
+ error_propagate(errp, local_err);
+ return 0;
+ }
+
+ value = qemu_get_be64(f);
+ ret = qemu_file_get_error(f);
+ if (ret < 0) {
+ error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
+ COLOMessage_lookup[expect_msg]);
+ }
+ return value;
+}
+
static int colo_do_checkpoint_transaction(MigrationState *s,
QIOChannelBuffer *bioc,
QEMUFile *fb)
@@ -290,6 +312,10 @@ static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
void *colo_process_incoming_thread(void *opaque)
{
MigrationIncomingState *mis = opaque;
+ QEMUFile *fb = NULL;
+ QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
+ uint64_t total_size;
+ uint64_t value;
Error *local_err = NULL;
migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
@@ -307,6 +333,10 @@ void *colo_process_incoming_thread(void *opaque)
*/
qemu_file_set_blocking(mis->from_src_file, true);
+ bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
+ fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
+ object_unref(OBJECT(bioc));
+
colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
&local_err);
if (local_err) {
@@ -334,7 +364,29 @@ void *colo_process_incoming_thread(void *opaque)
goto out;
}
- /* TODO: read migration data into colo buffer */
+ value = colo_receive_message_value(mis->from_src_file,
+ COLO_MESSAGE_VMSTATE_SIZE, &local_err);
+ if (local_err) {
+ goto out;
+ }
+
+ /*
+ * Read VM device state data into channel buffer,
+ * It's better to re-use the memory allocated.
+ * Here we need to handle the channel buffer directly.
+ */
+ if (value > bioc->capacity) {
+ bioc->capacity = value;
+ bioc->data = g_realloc(bioc->data, bioc->capacity);
+ }
+ total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
+ if (total_size != value) {
+ error_report("Got %" PRIu64 " VMState data, less than expected"
+ " %" PRIu64, total_size, value);
+ goto out;
+ }
+ bioc->usage = total_size;
+ qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
&local_err);
@@ -342,7 +394,14 @@ void *colo_process_incoming_thread(void *opaque)
goto out;
}
- /* TODO: load vm state */
+ qemu_mutex_lock_iothread();
+ qemu_system_reset(VMRESET_SILENT);
+ if (qemu_loadvm_state(fb) < 0) {
+ error_report("COLO: loadvm failed");
+ qemu_mutex_unlock_iothread();
+ goto out;
+ }
+ qemu_mutex_unlock_iothread();
colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
&local_err);
@@ -357,6 +416,10 @@ out:
error_report_err(local_err);
}
+ if (fb) {
+ qemu_fclose(fb);
+ }
+
if (mis->to_src_file) {
qemu_fclose(mis->to_src_file);
}