@@ -664,13 +664,138 @@ void migrate_start_colo_process(MigrationState *s)
qemu_mutex_lock_iothread();
}
-static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
- Error **errp)
+static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
+ QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
+{
+ uint64_t total_size;
+ uint64_t value;
+ Error *local_err = NULL;
+ int ret;
+
+ qemu_mutex_lock_iothread();
+ vm_stop_force_state(RUN_STATE_COLO);
+ trace_colo_vm_state_change("run", "stop");
+ qemu_mutex_unlock_iothread();
+
+ /* FIXME: This is unnecessary for periodic checkpoint mode */
+ colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
+ &local_err);
+ if (local_err) {
+ error_propagate(errp, local_err);
+ return;
+ }
+
+ colo_receive_check_message(mis->from_src_file,
+ COLO_MESSAGE_VMSTATE_SEND, &local_err);
+ if (local_err) {
+ error_propagate(errp, local_err);
+ return;
+ }
+
+ qemu_mutex_lock_iothread();
+ cpu_synchronize_all_pre_loadvm();
+ ret = qemu_loadvm_state_main(mis->from_src_file, mis);
+ qemu_mutex_unlock_iothread();
+
+ if (ret < 0) {
+ error_setg(errp, "Load VM's live state (ram) error");
+ return;
+ }
+
+ value = colo_receive_message_value(mis->from_src_file,
+ COLO_MESSAGE_VMSTATE_SIZE, &local_err);
+ if (local_err) {
+ error_propagate(errp, local_err);
+ return;
+ }
+
+ /*
+ * Read VM device state data into channel buffer,
+ * It's better to re-use the memory allocated.
+ * Here we need to handle the channel buffer directly.
+ */
+ if (value > bioc->capacity) {
+ bioc->capacity = value;
+ bioc->data = g_realloc(bioc->data, bioc->capacity);
+ }
+ total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
+ if (total_size != value) {
+ error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
+ " %" PRIu64, total_size, value);
+ return;
+ }
+ bioc->usage = total_size;
+ qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
+
+ colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
+ &local_err);
+ if (local_err) {
+ error_propagate(errp, local_err);
+ return;
+ }
+
+ qemu_mutex_lock_iothread();
+ vmstate_loading = true;
+ ret = qemu_load_device_state(fb);
+ if (ret < 0) {
+ error_setg(errp, "COLO: load device state failed");
+ qemu_mutex_unlock_iothread();
+ return;
+ }
+
+#ifdef CONFIG_REPLICATION
+ replication_get_error_all(&local_err);
+ if (local_err) {
+ error_propagate(errp, local_err);
+ qemu_mutex_unlock_iothread();
+ return;
+ }
+
+ /* discard colo disk buffer */
+ replication_do_checkpoint_all(&local_err);
+ if (local_err) {
+ error_propagate(errp, local_err);
+ qemu_mutex_unlock_iothread();
+ return;
+ }
+#else
+ abort();
+#endif
+ /* Notify all filters of all NIC to do checkpoint */
+ colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
+
+ if (local_err) {
+ error_propagate(errp, local_err);
+ qemu_mutex_unlock_iothread();
+ return;
+ }
+
+ vmstate_loading = false;
+ vm_start();
+ trace_colo_vm_state_change("stop", "run");
+ qemu_mutex_unlock_iothread();
+
+ if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
+ failover_set_state(FAILOVER_STATUS_RELAUNCH,
+ FAILOVER_STATUS_NONE);
+ failover_request_active(NULL);
+ return;
+ }
+
+ colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
+ &local_err);
+ if (local_err) {
+ error_propagate(errp, local_err);
+ }
+}
+
+static void colo_wait_handle_message(MigrationIncomingState *mis,
+ QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
{
COLOMessage msg;
Error *local_err = NULL;
- msg = colo_receive_message(f, &local_err);
+ msg = colo_receive_message(mis->from_src_file, &local_err);
if (local_err) {
error_propagate(errp, local_err);
return;
@@ -678,10 +803,9 @@ static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
switch (msg) {
case COLO_MESSAGE_CHECKPOINT_REQUEST:
- *checkpoint_request = 1;
+ colo_incoming_process_checkpoint(mis, fb, bioc, errp);
break;
default:
- *checkpoint_request = 0;
error_setg(errp, "Got unknown COLO message: %d", msg);
break;
}
@@ -692,10 +816,7 @@ void *colo_process_incoming_thread(void *opaque)
MigrationIncomingState *mis = opaque;
QEMUFile *fb = NULL;
QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
- uint64_t total_size;
- uint64_t value;
Error *local_err = NULL;
- int ret;
rcu_register_thread();
qemu_sem_init(&mis->colo_incoming_sem, 0);
@@ -749,134 +870,19 @@ void *colo_process_incoming_thread(void *opaque)
}
while (mis->state == MIGRATION_STATUS_COLO) {
- int request = 0;
-
- colo_wait_handle_message(mis->from_src_file, &request, &local_err);
+ colo_wait_handle_message(mis, fb, bioc, &local_err);
if (local_err) {
- goto out;
+ error_report_err(local_err);
+ break;
}
- assert(request);
if (failover_get_state() != FAILOVER_STATUS_NONE) {
error_report("failover request");
- goto out;
- }
-
- qemu_mutex_lock_iothread();
- vm_stop_force_state(RUN_STATE_COLO);
- trace_colo_vm_state_change("run", "stop");
- qemu_mutex_unlock_iothread();
-
- /* FIXME: This is unnecessary for periodic checkpoint mode */
- colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
- &local_err);
- if (local_err) {
- goto out;
- }
-
- colo_receive_check_message(mis->from_src_file,
- COLO_MESSAGE_VMSTATE_SEND, &local_err);
- if (local_err) {
- goto out;
- }
-
- qemu_mutex_lock_iothread();
- cpu_synchronize_all_pre_loadvm();
- ret = qemu_loadvm_state_main(mis->from_src_file, mis);
- qemu_mutex_unlock_iothread();
-
- if (ret < 0) {
- error_report("Load VM's live state (ram) error");
- goto out;
- }
-
- value = colo_receive_message_value(mis->from_src_file,
- COLO_MESSAGE_VMSTATE_SIZE, &local_err);
- if (local_err) {
- goto out;
- }
-
- /*
- * Read VM device state data into channel buffer,
- * It's better to re-use the memory allocated.
- * Here we need to handle the channel buffer directly.
- */
- if (value > bioc->capacity) {
- bioc->capacity = value;
- bioc->data = g_realloc(bioc->data, bioc->capacity);
- }
- total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
- if (total_size != value) {
- error_report("Got %" PRIu64 " VMState data, less than expected"
- " %" PRIu64, total_size, value);
- goto out;
- }
- bioc->usage = total_size;
- qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
-
- colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
- &local_err);
- if (local_err) {
- goto out;
- }
-
- qemu_mutex_lock_iothread();
- vmstate_loading = true;
- ret = qemu_load_device_state(fb);
- if (ret < 0) {
- error_report("COLO: load device state failed");
- qemu_mutex_unlock_iothread();
- goto out;
- }
-
-#ifdef CONFIG_REPLICATION
- replication_get_error_all(&local_err);
- if (local_err) {
- qemu_mutex_unlock_iothread();
- goto out;
- }
-
- /* discard colo disk buffer */
- replication_do_checkpoint_all(&local_err);
- if (local_err) {
- qemu_mutex_unlock_iothread();
- goto out;
- }
-#else
- abort();
-#endif
- /* Notify all filters of all NIC to do checkpoint */
- colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
-
- if (local_err) {
- qemu_mutex_unlock_iothread();
- goto out;
- }
-
- vmstate_loading = false;
- vm_start();
- trace_colo_vm_state_change("stop", "run");
- qemu_mutex_unlock_iothread();
-
- if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
- failover_set_state(FAILOVER_STATUS_RELAUNCH,
- FAILOVER_STATUS_NONE);
- failover_request_active(NULL);
- goto out;
- }
-
- colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
- &local_err);
- if (local_err) {
- goto out;
+ break;
}
}
out:
vmstate_loading = false;
- /* Throw the unreported error message after exited from loop */
- if (local_err) {
- error_report_err(local_err);
- }
/*
* There are only two reasons we can get here, some error happened