@@ -16,6 +16,8 @@
#include "qemu-timer.h"
#include "qemu-char.h"
#include "buffered_file.h"
+#include "migration.h"
+#include "qemu-thread.h"
//#define DEBUG_BUFFERED_FILE
@@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered
void *opaque;
QEMUFile *file;
int has_error;
+ int closed;
int freeze_output;
size_t bytes_xfer;
size_t xfer_limit;
uint8_t *buffer;
size_t buffer_size;
size_t buffer_capacity;
- QEMUTimer *timer;
+ QemuThread thread;
} QEMUFileBuffered;
#ifdef DEBUG_BUFFERED_FILE
@@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in
offset = size;
}
- if (pos == 0 && size == 0) {
- DPRINTF("file is ready\n");
- if (s->bytes_xfer <= s->xfer_limit) {
- DPRINTF("notifying client\n");
- s->put_ready(s->opaque);
- }
- }
-
return offset;
}
@@ -173,22 +168,25 @@ static int buffered_close(void *opaque)
DPRINTF("closing\n");
- while (!s->has_error && s->buffer_size) {
- buffered_flush(s);
- if (s->freeze_output)
- s->wait_for_unfreeze(s);
- }
+ s->closed = 1;
- ret = s->close(s->opaque);
+ qemu_mutex_unlock_migthread();
+ qemu_mutex_unlock_iothread();
+
+ qemu_thread_join(&s->thread);
+ /* Waits for the completion of the migration thread */
- qemu_del_timer(s->timer);
- qemu_free_timer(s->timer);
+ qemu_mutex_lock_iothread();
+ qemu_mutex_lock_migthread();
+
+ ret = s->close(s->opaque);
qemu_free(s->buffer);
qemu_free(s);
return ret;
}
+
static int buffered_rate_limit(void *opaque)
{
QEMUFileBuffered *s = opaque;
@@ -228,26 +226,36 @@ static int64_t buffered_get_rate_limit(void *opaque)
return s->xfer_limit;
}
-static void buffered_rate_tick(void *opaque)
+static void *migrate_vm(void *opaque)
{
QEMUFileBuffered *s = opaque;
+ int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100;
+ struct timeval tv = { .tv_sec = 0, .tv_usec = 100000};
- if (s->has_error) {
- buffered_close(s);
- return;
- }
-
- qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+ while (!s->has_error && (!s->closed || s->buffer_size)) {
+ if (s->freeze_output) {
+ s->wait_for_unfreeze(s);
+ s->freeze_output = 0;
+ continue;
+ }
- if (s->freeze_output)
- return;
+ current_time = qemu_get_clock_ms(rt_clock);
+ if (!s->closed && (expire_time > current_time)) {
+ tv.tv_usec = 1000 * (expire_time - current_time);
+ select(0, NULL, NULL, NULL, &tv);
+ continue;
+ }
- s->bytes_xfer = 0;
+ s->bytes_xfer = 0;
+ buffered_flush(s);
- buffered_flush(s);
+ expire_time = qemu_get_clock_ms(rt_clock) + 100;
+ if (!s->closed) {
+ s->put_ready(s->opaque);
+ }
+ }
- /* Add some checks around this */
- s->put_ready(s->opaque);
+ return NULL;
}
QEMUFile *qemu_fopen_ops_buffered(void *opaque,
@@ -267,15 +275,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque,
s->put_ready = put_ready;
s->wait_for_unfreeze = wait_for_unfreeze;
s->close = close;
+ s->closed = 0;
s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
buffered_close, buffered_rate_limit,
buffered_set_rate_limit,
- buffered_get_rate_limit);
-
- s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
+ buffered_get_rate_limit);
- qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+ qemu_thread_create(&s->thread, migrate_vm, s);
return s->file;
}
@@ -149,10 +149,12 @@ int do_migrate_set_speed(Monitor *mon, const QDict *qdict, QObject **ret_data)
}
max_throttle = d;
+ qemu_mutex_lock_migthread();
s = migrate_to_fms(current_migration);
if (s && s->file) {
qemu_file_set_rate_limit(s->file, max_throttle);
}
+ qemu_mutex_unlock_migthread();
return 0;
}
@@ -284,8 +286,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
{
int ret = 0;
- qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
if (s->file) {
DPRINTF("closing file\n");
if (qemu_fclose(s->file) != 0) {
@@ -307,14 +307,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
return ret;
}
-void migrate_fd_put_notify(void *opaque)
-{
- FdMigrationState *s = opaque;
-
- qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
- qemu_file_put_notify(s->file);
-}
-
ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
{
FdMigrationState *s = opaque;
@@ -327,76 +319,91 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
if (ret == -1)
ret = -(s->get_error(s));
- if (ret == -EAGAIN) {
- qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
- } else if (ret < 0) {
- if (s->mon) {
- monitor_resume(s->mon);
+ return ret;
+}
+
+static void migrate_fd_terminate(void *opaque)
+{
+ FdMigrationState *s = opaque;
+
+ qemu_mutex_lock_migthread();
+
+ if (s->code == COMPLETE) {
+ if (migrate_fd_cleanup(s) < 0) {
+ if (s->old_vm_running) {
+ vm_start();
+ }
+ s->state = MIG_STATE_ERROR;
+ } else {
+ s->state = MIG_STATE_COMPLETED;
}
- s->state = MIG_STATE_ERROR;
notifier_list_notify(&migration_state_notifiers);
+ } else if (s->code == RESUME) {
+ if (s->old_vm_running) {
+ vm_start();
+ }
+ migrate_fd_error(s);
+ } else if (s->code == ERROR) {
+ migrate_fd_error(s);
}
- return ret;
+ qemu_mutex_unlock_migthread();
}
void migrate_fd_connect(FdMigrationState *s)
{
- int ret;
-
+ s->code = START;
+ s->bh = qemu_bh_new(migrate_fd_terminate, s);
s->file = qemu_fopen_ops_buffered(s,
s->bandwidth_limit,
migrate_fd_put_buffer,
migrate_fd_put_ready,
migrate_fd_wait_for_unfreeze,
migrate_fd_close);
-
- DPRINTF("beginning savevm\n");
- ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
- s->mig_state.shared);
- if (ret < 0) {
- DPRINTF("failed, %d\n", ret);
- migrate_fd_error(s);
- return;
- }
-
- migrate_fd_put_ready(s);
}
void migrate_fd_put_ready(void *opaque)
{
FdMigrationState *s = opaque;
+ int ret;
- if (s->state != MIG_STATE_ACTIVE) {
+ qemu_mutex_lock_iothread();
+ if (s->code != ACTIVE && s->code != START) {
DPRINTF("put_ready returning because of non-active state\n");
+ qemu_mutex_unlock_iothread();
return;
}
+ if (!s->code) {
+ DPRINTF("beginning savevm\n");
+ ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
+ s->mig_state.shared);
+ if (ret < 0) {
+ DPRINTF("failed, %d\n", ret);
+ s->code = ERROR;
+ qemu_bh_schedule(s->bh);
+ qemu_mutex_unlock_iothread();
+ return;
+ }
+ s->code = ACTIVE;
+ }
+
DPRINTF("iterate\n");
if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
- int state;
- int old_vm_running = vm_running;
+ s->old_vm_running = vm_running;
DPRINTF("done iterating\n");
vm_stop(VMSTOP_MIGRATE);
if ((qemu_savevm_state_complete(s->mon, s->file)) < 0) {
- if (old_vm_running) {
- vm_start();
- }
- state = MIG_STATE_ERROR;
+ s->code = RESUME;
} else {
- state = MIG_STATE_COMPLETED;
- }
- if (migrate_fd_cleanup(s) < 0) {
- if (old_vm_running) {
- vm_start();
- }
- state = MIG_STATE_ERROR;
+ s->code = COMPLETE;
}
- s->state = state;
- notifier_list_notify(&migration_state_notifiers);
+
+ qemu_bh_schedule(s->bh);
}
+ qemu_mutex_unlock_iothread();
}
int migrate_fd_get_status(MigrationState *mig_state)
@@ -416,9 +423,11 @@ void migrate_fd_cancel(MigrationState *mig_state)
s->state = MIG_STATE_CANCELLED;
notifier_list_notify(&migration_state_notifiers);
- qemu_savevm_state_cancel(s->mon, s->file);
+ qemu_mutex_lock_migthread();
+ qemu_savevm_state_cancel(s->mon, s->file);
migrate_fd_cleanup(s);
+ qemu_mutex_unlock_migthread();
}
void migrate_fd_release(MigrationState *mig_state)
@@ -426,22 +435,34 @@ void migrate_fd_release(MigrationState *mig_state)
FdMigrationState *s = migrate_to_fms(mig_state);
DPRINTF("releasing state\n");
-
+
if (s->state == MIG_STATE_ACTIVE) {
s->state = MIG_STATE_CANCELLED;
notifier_list_notify(&migration_state_notifiers);
+ qemu_mutex_lock_migthread();
migrate_fd_cleanup(s);
+ qemu_mutex_unlock_migthread();
+ }
+
+ if (s->bh) {
+ qemu_bh_delete(s->bh);
}
+
qemu_free(s);
}
void migrate_fd_wait_for_unfreeze(void *opaque)
{
FdMigrationState *s = opaque;
- int ret;
+ int ret, state;
DPRINTF("wait for unfreeze\n");
- if (s->state != MIG_STATE_ACTIVE)
+
+ qemu_mutex_lock_iothread();
+ state = s->state;
+ qemu_mutex_unlock_iothread();
+
+ if (state != MIG_STATE_ACTIVE)
return;
do {
@@ -458,7 +479,6 @@ int migrate_fd_close(void *opaque)
{
FdMigrationState *s = opaque;
- qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
return s->close(s);
}
@@ -23,6 +23,12 @@
#define MIG_STATE_CANCELLED 1
#define MIG_STATE_ACTIVE 2
+#define START 0
+#define ACTIVE 1
+#define COMPLETE 2
+#define ERROR 3
+#define RESUME 4
+
typedef struct MigrationState MigrationState;
struct MigrationState
@@ -45,10 +51,13 @@ struct FdMigrationState
int fd;
Monitor *mon;
int state;
+ int code;
+ int old_vm_running;
int (*get_error)(struct FdMigrationState*);
int (*close)(struct FdMigrationState*);
int (*write)(struct FdMigrationState*, const void *, size_t);
void *opaque;
+ QEMUBH *bh;
};
void process_incoming_migration(QEMUFile *f);
@@ -115,6 +115,16 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
error_exit(err, __func__);
}
+void qemu_thread_join(QemuThread *thread)
+{
+ int err;
+
+ err = pthread_join(thread->thread, NULL);
+ if (err) {
+ error_exit(err, __func__);
+ }
+}
+
void qemu_thread_create(QemuThread *thread,
void *(*start_routine)(void*),
void *arg)
@@ -30,6 +30,7 @@ void qemu_cond_destroy(QemuCond *cond);
void qemu_cond_signal(QemuCond *cond);
void qemu_cond_broadcast(QemuCond *cond);
void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex);
+void qemu_thread_join(QemuThread *thread);
void qemu_thread_create(QemuThread *thread,
void *(*start_routine)(void*),
@@ -481,11 +481,6 @@ int qemu_fclose(QEMUFile *f)
return ret;
}
-void qemu_file_put_notify(QEMUFile *f)
-{
- f->put_buffer(f->opaque, NULL, 0, 0);
-}
-
void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size)
{
int l;
This patch creates a separate thread for the guest migration on the source side. All exits (on completion/error) from the migration thread are handled by a bottom handler, which is called from the iothread. Signed-off-by: Umesh Deshpande <udeshpan@redhat.com> --- buffered_file.c | 75 +++++++++++++++++-------------- migration.c | 122 +++++++++++++++++++++++++++++--------------------- migration.h | 9 ++++ qemu-thread-posix.c | 10 ++++ qemu-thread.h | 1 + savevm.c | 5 -- 6 files changed, 132 insertions(+), 90 deletions(-)