@@ -21,6 +21,7 @@
#include "qemu_socket.h"
#include "block-migration.h"
#include "qemu-objects.h"
+#include "event-tap.h"
//#define DEBUG_MIGRATION
@@ -309,6 +310,20 @@ void migrate_fd_put_notify(void *opaque)
qemu_file_put_notify(s->file);
}
+static void migrate_fd_get_notify(void *opaque)
+{
+ FdMigrationState *s = opaque;
+
+ qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
+ qemu_file_get_notify(s->file);
+ if (qemu_file_has_error(s->file)) {
+ ft_mode = FT_ERROR;
+ qemu_savevm_state_cancel(s->mon, s->file);
+ migrate_fd_error(s);
+ event_tap_unregister();
+ }
+}
+
ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
{
FdMigrationState *s = opaque;
@@ -333,15 +348,20 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
return ret;
}
-int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos, int size)
+int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos, size_t size)
{
FdMigrationState *s = opaque;
- ssize_t ret;
+ int ret;
ret = s->read(s, data, size);
- if (ret == -1)
+ if (ret == -1) {
ret = -(s->get_error(s));
-
+ }
+
+ if (ret == -EAGAIN) {
+ qemu_set_fd_handler2(s->fd, NULL, migrate_fd_get_notify, NULL, s);
+ }
+
return ret;
}
@@ -368,6 +388,231 @@ void migrate_fd_connect(FdMigrationState *s)
migrate_fd_put_ready(s);
}
+static void migrate_ft_trans_error(FdMigrationState *s)
+{
+ ft_mode = FT_ERROR;
+ qemu_savevm_state_cancel(s->mon, s->file);
+ migrate_fd_error(s);
+ event_tap_unregister();
+}
+
+static int migrate_ft_trans_commit(void *opaque)
+{
+ FdMigrationState *s = opaque;
+ int ret = -1;
+
+ if (ft_mode != FT_TRANSACTION_COMMIT && ft_mode != FT_TRANSACTION_ATOMIC) {
+ fprintf(stderr,
+ "migrate_ft_trans_commit: invalid ft_mode %d\n", ft_mode);
+ goto out;
+ }
+
+ do {
+ if (ft_mode == FT_TRANSACTION_ATOMIC) {
+ if (qemu_ft_trans_begin(s->file) < 0) {
+ fprintf(stderr, "qemu_ft_trans_begin failed\n");
+ goto out;
+ }
+
+ if ((ret = qemu_savevm_trans_begin(s->mon, s->file, 0)) < 0) {
+ fprintf(stderr, "qemu_savevm_trans_begin failed\n");
+ goto out;
+ }
+
+ ft_mode = FT_TRANSACTION_COMMIT;
+ if (ret) {
+ /* don't proceed until if fd isn't ready */
+ goto out;
+ }
+ }
+
+ /* make the VM state consistent by flushing outstanding events */
+ vm_stop(0);
+
+ /* send at full speed */
+ qemu_file_set_rate_limit(s->file, 0);
+
+ if ((ret = qemu_savevm_trans_complete(s->mon, s->file)) < 0) {
+ fprintf(stderr, "qemu_savevm_trans_complete failed\n");
+ goto out;
+ }
+
+ if (ret) {
+ /* don't proceed until if fd isn't ready */
+ ret = 1;
+ goto out;
+ }
+
+ if ((ret = qemu_ft_trans_commit(s->file)) < 0) {
+ fprintf(stderr, "qemu_ft_trans_commit failed\n");
+ goto out;
+ }
+
+ if (ret) {
+ ft_mode = FT_TRANSACTION_RECV;
+ ret = 1;
+ goto out;
+ }
+
+ /* flush and check if events are remaining */
+ if ((ret = event_tap_flush_one()) < 0) {
+ fprintf(stderr, "event_tap_flush_one failed\n");
+ goto out;
+ }
+
+ ft_mode = ret ? FT_TRANSACTION_BEGIN : FT_TRANSACTION_ATOMIC;
+ } while (ft_mode != FT_TRANSACTION_BEGIN);
+
+ vm_start();
+ ret = 0;
+
+out:
+ return ret;
+}
+
+static int migrate_ft_trans_get_ready(void *opaque)
+{
+ FdMigrationState *s = opaque;
+ int ret = -1;
+
+ if (ft_mode != FT_TRANSACTION_RECV) {
+ fprintf(stderr,
+ "migrate_ft_trans_get_ready: invalid ft_mode %d\n", ft_mode);
+ goto error_out;
+ }
+
+ /* flush and check if events are remaining */
+ if ((ret = event_tap_flush_one()) < 0) {
+ fprintf(stderr, "event_tap_flush_one failed\n");
+ goto error_out;
+ }
+
+ if (ret) {
+ ft_mode = FT_TRANSACTION_BEGIN;
+ } else {
+ ft_mode = FT_TRANSACTION_ATOMIC;
+ if ((ret = migrate_ft_trans_commit(s)) < 0) {
+ goto error_out;
+ }
+ if (ret) {
+ goto out;
+ }
+ }
+
+ vm_start();
+ ret = 0;
+ goto out;
+
+error_out:
+ migrate_ft_trans_error(s);
+
+out:
+ return ret;
+}
+
+static int migrate_ft_trans_put_ready(void)
+{
+ FdMigrationState *s = migrate_to_fms(current_migration);
+ int ret = -1, init = 0, timeout;
+ static int64_t start, now;
+
+ switch (ft_mode) {
+ case FT_INIT:
+ init = 1;
+ case FT_TRANSACTION_BEGIN:
+ now = start = qemu_get_clock(vm_clock);
+ /* start transatcion at best effort */
+ qemu_file_set_rate_limit(s->file, 1);
+
+ if (qemu_ft_trans_begin(s->file) < 0) {
+ fprintf(stderr, "qemu_transaction_begin failed\n");
+ goto error_out;
+ }
+
+ vm_stop(0);
+
+ if ((ret = qemu_savevm_trans_begin(s->mon, s->file, init)) < 0) {
+ fprintf(stderr, "qemu_savevm_trans_begin\n");
+ goto error_out;
+ }
+
+ if (ret) {
+ ft_mode = FT_TRANSACTION_ITER;
+ vm_start();
+ } else {
+ ft_mode = FT_TRANSACTION_COMMIT;
+ if (migrate_ft_trans_commit(s) < 0) {
+ goto error_out;
+ }
+ }
+ break;
+
+ case FT_TRANSACTION_ITER:
+ now = qemu_get_clock(vm_clock);
+ timeout = ((now - start) >= max_downtime);
+ if (timeout || qemu_savevm_state_iterate(s->mon, s->file) == 1) {
+ DPRINTF("ft trans iter timeout %d\n", timeout);
+
+ ft_mode = FT_TRANSACTION_COMMIT;
+ if (migrate_ft_trans_commit(s) < 0) {
+ goto error_out;
+ }
+ return 1;
+ }
+
+ ft_mode = FT_TRANSACTION_ITER;
+ break;
+
+ case FT_TRANSACTION_ATOMIC:
+ case FT_TRANSACTION_COMMIT:
+ if (migrate_ft_trans_commit(s) < 0) {
+ goto error_out;
+ }
+ break;
+
+ default:
+ fprintf(stderr,
+ "migrate_ft_trans_put_ready: invalid ft_mode %d", ft_mode);
+ goto error_out;
+ }
+
+ ret = 0;
+ goto out;
+
+error_out:
+ migrate_ft_trans_error(s);
+
+out:
+ return ret;
+}
+
+static void migrate_ft_trans_connect(FdMigrationState *s, int old_vm_running)
+{
+ /* close buffered_file and open ft_trans_file
+ * NB: fd won't get closed, and reused by ft_trans_file
+ */
+ qemu_fclose(s->file);
+
+ s->file = qemu_fopen_ops_ft_trans(s,
+ migrate_fd_put_buffer,
+ migrate_fd_get_buffer,
+ migrate_ft_trans_put_ready,
+ migrate_ft_trans_get_ready,
+ migrate_fd_wait_for_unfreeze,
+ migrate_fd_close,
+ 1);
+ socket_set_nodelay(s->fd);
+
+ /* events are tapped from now */
+ if (event_tap_register(migrate_ft_trans_put_ready) < 0) {
+ migrate_ft_trans_error(s);
+ }
+
+ if (old_vm_running) {
+ vm_start();
+ }
+}
+
void migrate_fd_put_ready(void *opaque)
{
FdMigrationState *s = opaque;
@@ -393,6 +638,11 @@ void migrate_fd_put_ready(void *opaque)
} else {
state = MIG_STATE_COMPLETED;
}
+
+ if (ft_mode && state == MIG_STATE_COMPLETED) {
+ return migrate_ft_trans_connect(s, old_vm_running);
+ }
+
if (migrate_fd_cleanup(s) < 0) {
if (old_vm_running) {
vm_start();
@@ -419,8 +669,14 @@ void migrate_fd_cancel(MigrationState *mig_state)
DPRINTF("cancelling migration\n");
s->state = MIG_STATE_CANCELLED;
- qemu_savevm_state_cancel(s->mon, s->file);
+ if (ft_mode) {
+ qemu_ft_trans_cancel(s->file);
+ ft_mode = FT_OFF;
+ event_tap_unregister();
+ }
+
+ qemu_savevm_state_cancel(s->mon, s->file);
migrate_fd_cleanup(s);
}
@@ -116,7 +116,7 @@ void migrate_fd_put_notify(void *opaque);
ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
-int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos, int size);
+int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos, size_t size);
void migrate_fd_connect(FdMigrationState *s);