Message ID | 893c5f1f472e252d9ae43a8348e0e0ef882936ce.1314466743.git.udeshpan@redhat.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
On Sat, Aug 27, 2011 at 7:09 PM, Umesh Deshpande <udeshpan@redhat.com> wrote: > This patch creates a separate thread for the guest migration on the source side. > All exits (on completion/error) from the migration thread are handled by a > bottom handler, which is called from the iothread. > > Signed-off-by: Umesh Deshpande <udeshpan@redhat.com> > --- > buffered_file.c | 76 ++++++++++++++++++++---------------- > migration.c | 105 ++++++++++++++++++++++++++++++-------------------- > migration.h | 8 ++++ > qemu-thread-posix.c | 10 +++++ > qemu-thread.h | 1 + Will this patch break Windows builds by adding a function to qemu-thread-posix.c which is not implemented in qemu-thread-win32.c? Stefan -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
On 08/29/2011 05:09 AM, Stefan Hajnoczi wrote: > On Sat, Aug 27, 2011 at 7:09 PM, Umesh Deshpande<udeshpan@redhat.com> wrote: >> This patch creates a separate thread for the guest migration on the source side. >> All exits (on completion/error) from the migration thread are handled by a >> bottom handler, which is called from the iothread. >> >> Signed-off-by: Umesh Deshpande<udeshpan@redhat.com> >> --- >> buffered_file.c | 76 ++++++++++++++++++++---------------- >> migration.c | 105 ++++++++++++++++++++++++++++++-------------------- >> migration.h | 8 ++++ >> qemu-thread-posix.c | 10 +++++ >> qemu-thread.h | 1 + > Will this patch break Windows builds by adding a function to > qemu-thread-posix.c which is not implemented in qemu-thread-win32.c? Yes, an equivalent function needs to be added in qemu-thread-win32.c -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
On Sat, Aug 27, 2011 at 02:09:48PM -0400, Umesh Deshpande wrote: > This patch creates a separate thread for the guest migration on the source side. > All exits (on completion/error) from the migration thread are handled by a > bottom handler, which is called from the iothread. > > Signed-off-by: Umesh Deshpande <udeshpan@redhat.com> > --- > buffered_file.c | 76 ++++++++++++++++++++---------------- > migration.c | 105 ++++++++++++++++++++++++++++++-------------------- > migration.h | 8 ++++ > qemu-thread-posix.c | 10 +++++ > qemu-thread.h | 1 + > 5 files changed, 124 insertions(+), 76 deletions(-) > > diff --git a/buffered_file.c b/buffered_file.c > index 41b42c3..c31852e 100644 > --- a/buffered_file.c > +++ b/buffered_file.c > @@ -16,6 +16,8 @@ > #include "qemu-timer.h" > #include "qemu-char.h" > #include "buffered_file.h" > +#include "migration.h" > +#include "qemu-thread.h" > > //#define DEBUG_BUFFERED_FILE > > @@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered > void *opaque; > QEMUFile *file; > int has_error; > + int closed; > int freeze_output; > size_t bytes_xfer; > size_t xfer_limit; > uint8_t *buffer; > size_t buffer_size; > size_t buffer_capacity; > - QEMUTimer *timer; > + QemuThread thread; > } QEMUFileBuffered; > > #ifdef DEBUG_BUFFERED_FILE > @@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in > offset = size; > } > > - if (pos == 0 && size == 0) { > - DPRINTF("file is ready\n"); > - if (s->bytes_xfer <= s->xfer_limit) { > - DPRINTF("notifying client\n"); > - s->put_ready(s->opaque); > - } > - } > - > return offset; > } > > @@ -173,22 +168,25 @@ static int buffered_close(void *opaque) > > DPRINTF("closing\n"); > > - while (!s->has_error && s->buffer_size) { > - buffered_flush(s); > - if (s->freeze_output) > - s->wait_for_unfreeze(s); > - } > + s->closed = 1; > > - ret = s->close(s->opaque); > + qemu_mutex_unlock_migrate_ram(); > + qemu_mutex_unlock_iothread(); This is using the ram mutex to protect 
migration thread specific data. A new lock should be introduced for that purpose. > - qemu_del_timer(s->timer); > - qemu_free_timer(s->timer); > + qemu_thread_join(&s->thread); > + /* Waits for the completion of the migration thread */ > + > + qemu_mutex_lock_iothread(); > + qemu_mutex_lock_migrate_ram(); > + > + ret = s->close(s->opaque); > qemu_free(s->buffer); > qemu_free(s); > > return ret; > } > > + > static int buffered_rate_limit(void *opaque) > { > QEMUFileBuffered *s = opaque; > @@ -228,26 +226,37 @@ static int64_t buffered_get_rate_limit(void *opaque) > return s->xfer_limit; > } > > -static void buffered_rate_tick(void *opaque) > +static void *migrate_vm(void *opaque) > { buffered_file.c was generic code that has now become migration specific (although migration was the only user). So it should either stop pretending to be generic code, by rename to migration_thread.c along with un-exporting interfaces, or it should remain generic and therefore all migration specific knowledge moved somewhere else. Anthony? > + int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100; > + struct timeval tv = { .tv_sec = 0, .tv_usec = 100000}; qemu_get_clock_ms should happen under iothread lock. 
> - if (s->freeze_output) > - return; > + current_time = qemu_get_clock_ms(rt_clock); > + if (!s->closed && (expire_time > current_time)) { > + tv.tv_usec = 1000 * (expire_time - current_time); > + select(0, NULL, NULL, NULL, &tv); > + continue; > + } > > - s->bytes_xfer = 0; > + s->bytes_xfer = 0; > > - buffered_flush(s); > + expire_time = qemu_get_clock_ms(rt_clock) + 100; > + if (!s->closed) { > + s->put_ready(s->opaque); > + } else { > + buffered_flush(s); > + } > + } > > - /* Add some checks around this */ > - s->put_ready(s->opaque); > + return NULL; > } > > QEMUFile *qemu_fopen_ops_buffered(void *opaque, > @@ -267,15 +276,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque, > s->put_ready = put_ready; > s->wait_for_unfreeze = wait_for_unfreeze; > s->close = close; > + s->closed = 0; > > s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL, > buffered_close, buffered_rate_limit, > buffered_set_rate_limit, > - buffered_get_rate_limit); > - > - s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s); > + buffered_get_rate_limit); > > - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); > + qemu_thread_create(&s->thread, migrate_vm, s); > > return s->file; > } > diff --git a/migration.c b/migration.c > index af3a1f2..5df186d 100644 > --- a/migration.c > +++ b/migration.c > @@ -149,10 +149,12 @@ int do_migrate_set_speed(Monitor *mon, const QDict *qdict, QObject **ret_data) > } > max_throttle = d; > > + qemu_mutex_lock_migrate_ram(); > s = migrate_to_fms(current_migration); > if (s && s->file) { > qemu_file_set_rate_limit(s->file, max_throttle); > } > + qemu_mutex_unlock_migrate_ram(); This lock protects the RAMlist, and only the RAMlist, but here its being used to protect migration thread data. As noted above, a new lock should be introduced. 
> int ret = 0; > > - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); > - > if (s->file) { > DPRINTF("closing file\n"); > + qemu_mutex_lock_migrate_ram(); > if (qemu_fclose(s->file) != 0) { > ret = -1; > } > + qemu_mutex_unlock_migrate_ram(); > s->file = NULL; > } Again. -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
On 08/29/2011 08:49 PM, Marcelo Tosatti wrote: >> > -static void buffered_rate_tick(void *opaque) >> > +static void *migrate_vm(void *opaque) >> > { > > buffered_file.c was generic code that has now become migration specific > (although migration was the only user). So it should either stop > pretending to be generic code, by rename to migration_thread.c along > with un-exporting interfaces, or it should remain generic and therefore > all migration specific knowledge moved somewhere else. Actually, the thread function is ill-named. buffered_file.c is still generic code (or if it is not, it's a bug), except it should be called threaded_file.c. Moving it to migration.c is also an option of course. I asked Umesh to keep the abstraction for now, because it helped pinpoint places where abstractions were leaking in (such as the qemu_mutex_unlock_migrate_ram call that you found). >> + int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100; >> + struct timeval tv = { .tv_sec = 0, .tv_usec = 100000}; > > qemu_get_clock_ms should happen under iothread lock. For rt_clock it is safe. Should be documented, though. >> + qemu_mutex_lock_migrate_ram(); >> s = migrate_to_fms(current_migration); >> if (s && s->file) { >> qemu_file_set_rate_limit(s->file, max_throttle); >> } >> + qemu_mutex_unlock_migrate_ram(); > > This lock protects the RAMlist, and only the RAMlist, but here its > being used to protect migration thread data. As noted above, a new lock > should be introduced. Even better, freeing the buffered_file should only be done in the iothread (if this is not the case) so that the lock can be pushed down to buffered_set_rate_limit... > + qemu_mutex_lock_migrate_ram(); > if (qemu_fclose(s->file) != 0) { > ret = -1; > } > + qemu_mutex_unlock_migrate_ram(); ... and buffered_close (if a lock turns out to be needed at all).
Paolo -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
On Tue, Aug 30, 2011 at 10:48:11AM +0200, Paolo Bonzini wrote: > On 08/29/2011 08:49 PM, Marcelo Tosatti wrote: > >>> -static void buffered_rate_tick(void *opaque) > >>> +static void *migrate_vm(void *opaque) > >>> { > > > >buffered_file.c was generic code that has now become migration specific > >(although migration was the only user). So it should either stop > >pretending to be generic code, by rename to migration_thread.c along > >with un-exporting interfaces, or it should remain generic and therefore > >all migration specific knowledge moved somewhere else. > > Actually, the thread function is ill-named. buffered_file.c is > still generic code (or if it is not, it's a bug), except it should > be called threaded_file.c. > > Moving it to migration.c is also an option of course. I asked Umesh > to keep the abstraction for now, because it helped pinpointing > places where abstractions were leaking in (such as the > qemu_mutex_unlock_migrate_ram call that you found). Fair enough, it's indeed generic, except for the misuse of the ram lock. > >>+ int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100; > >>+ struct timeval tv = { .tv_sec = 0, .tv_usec = 100000}; > > > >qemu_get_clock_ms should happen under iothread lock. > > For rt_clock it is safe. Should be documented, though. > > >>+ qemu_mutex_lock_migrate_ram(); > >> s = migrate_to_fms(current_migration); > >> if (s && s->file) { > >> qemu_file_set_rate_limit(s->file, max_throttle); > >> } > >>+ qemu_mutex_unlock_migrate_ram(); > > > >This lock protects the RAMlist, and only the RAMlist, but here its > >being used to protect migration thread data. As noted above, a new lock > >should be introduced. > > Even better, freeing the buffered_file should be only done in the > iothread (if this is not the case) so that the lock can be pushed > down to buffered_set_rate_limit... Sounds good.
> >+ qemu_mutex_lock_migrate_ram(); > > if (qemu_fclose(s->file) != 0) { > > ret = -1; > > } > >+ qemu_mutex_unlock_migrate_ram(); > > ... and buffered_close (if a lock turns out to be needed at all). > > Paolo -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
diff --git a/buffered_file.c b/buffered_file.c index 41b42c3..c31852e 100644 --- a/buffered_file.c +++ b/buffered_file.c @@ -16,6 +16,8 @@ #include "qemu-timer.h" #include "qemu-char.h" #include "buffered_file.h" +#include "migration.h" +#include "qemu-thread.h" //#define DEBUG_BUFFERED_FILE @@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered void *opaque; QEMUFile *file; int has_error; + int closed; int freeze_output; size_t bytes_xfer; size_t xfer_limit; uint8_t *buffer; size_t buffer_size; size_t buffer_capacity; - QEMUTimer *timer; + QemuThread thread; } QEMUFileBuffered; #ifdef DEBUG_BUFFERED_FILE @@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in offset = size; } - if (pos == 0 && size == 0) { - DPRINTF("file is ready\n"); - if (s->bytes_xfer <= s->xfer_limit) { - DPRINTF("notifying client\n"); - s->put_ready(s->opaque); - } - } - return offset; } @@ -173,22 +168,25 @@ static int buffered_close(void *opaque) DPRINTF("closing\n"); - while (!s->has_error && s->buffer_size) { - buffered_flush(s); - if (s->freeze_output) - s->wait_for_unfreeze(s); - } + s->closed = 1; - ret = s->close(s->opaque); + qemu_mutex_unlock_migrate_ram(); + qemu_mutex_unlock_iothread(); - qemu_del_timer(s->timer); - qemu_free_timer(s->timer); + qemu_thread_join(&s->thread); + /* Waits for the completion of the migration thread */ + + qemu_mutex_lock_iothread(); + qemu_mutex_lock_migrate_ram(); + + ret = s->close(s->opaque); qemu_free(s->buffer); qemu_free(s); return ret; } + static int buffered_rate_limit(void *opaque) { QEMUFileBuffered *s = opaque; @@ -228,26 +226,37 @@ static int64_t buffered_get_rate_limit(void *opaque) return s->xfer_limit; } -static void buffered_rate_tick(void *opaque) +static void *migrate_vm(void *opaque) { QEMUFileBuffered *s = opaque; + int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100; + struct timeval tv = { .tv_sec = 0, .tv_usec = 100000}; - if (s->has_error) { - buffered_close(s); - 
return; - } - - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); + while (!s->has_error && (!s->closed || s->buffer_size)) { + if (s->freeze_output) { + s->wait_for_unfreeze(s); + s->freeze_output = 0; + continue; + } - if (s->freeze_output) - return; + current_time = qemu_get_clock_ms(rt_clock); + if (!s->closed && (expire_time > current_time)) { + tv.tv_usec = 1000 * (expire_time - current_time); + select(0, NULL, NULL, NULL, &tv); + continue; + } - s->bytes_xfer = 0; + s->bytes_xfer = 0; - buffered_flush(s); + expire_time = qemu_get_clock_ms(rt_clock) + 100; + if (!s->closed) { + s->put_ready(s->opaque); + } else { + buffered_flush(s); + } + } - /* Add some checks around this */ - s->put_ready(s->opaque); + return NULL; } QEMUFile *qemu_fopen_ops_buffered(void *opaque, @@ -267,15 +276,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque, s->put_ready = put_ready; s->wait_for_unfreeze = wait_for_unfreeze; s->close = close; + s->closed = 0; s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL, buffered_close, buffered_rate_limit, buffered_set_rate_limit, - buffered_get_rate_limit); - - s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s); + buffered_get_rate_limit); - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); + qemu_thread_create(&s->thread, migrate_vm, s); return s->file; } diff --git a/migration.c b/migration.c index af3a1f2..5df186d 100644 --- a/migration.c +++ b/migration.c @@ -149,10 +149,12 @@ int do_migrate_set_speed(Monitor *mon, const QDict *qdict, QObject **ret_data) } max_throttle = d; + qemu_mutex_lock_migrate_ram(); s = migrate_to_fms(current_migration); if (s && s->file) { qemu_file_set_rate_limit(s->file, max_throttle); } + qemu_mutex_unlock_migrate_ram(); return 0; } @@ -284,13 +286,13 @@ int migrate_fd_cleanup(FdMigrationState *s) { int ret = 0; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); - if (s->file) { DPRINTF("closing file\n"); + qemu_mutex_lock_migrate_ram(); if (qemu_fclose(s->file) != 0) { 
ret = -1; } + qemu_mutex_unlock_migrate_ram(); s->file = NULL; } @@ -311,7 +313,6 @@ void migrate_fd_put_notify(void *opaque) { FdMigrationState *s = opaque; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); qemu_file_put_notify(s->file); } @@ -327,76 +328,87 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) if (ret == -1) ret = -(s->get_error(s)); - if (ret == -EAGAIN) { - qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s); - } else if (ret < 0) { - if (s->mon) { - monitor_resume(s->mon); + return ret; +} + +static void migrate_fd_terminate(void *opaque) +{ + FdMigrationState *s = opaque; + + if (s->code == COMPLETE) { + if (migrate_fd_cleanup(s) < 0) { + if (s->old_vm_running) { + vm_start(); + } + s->state = MIG_STATE_ERROR; + } else { + s->state = MIG_STATE_COMPLETED; } - s->state = MIG_STATE_ERROR; notifier_list_notify(&migration_state_notifiers); + } else if (s->code == ERROR) { + migrate_fd_error(s); } - - return ret; } void migrate_fd_connect(FdMigrationState *s) { - int ret; - + s->code = START; + s->bh = qemu_bh_new(migrate_fd_terminate, s); s->file = qemu_fopen_ops_buffered(s, s->bandwidth_limit, migrate_fd_put_buffer, migrate_fd_put_ready, migrate_fd_wait_for_unfreeze, migrate_fd_close); - - DPRINTF("beginning savevm\n"); - ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, - s->mig_state.shared); - if (ret < 0) { - DPRINTF("failed, %d\n", ret); - migrate_fd_error(s); - return; - } - - migrate_fd_put_ready(s); } void migrate_fd_put_ready(void *opaque) { FdMigrationState *s = opaque; + int ret; - if (s->state != MIG_STATE_ACTIVE) { + qemu_mutex_lock_iothread(); + if (s->code != ACTIVE && s->code != START) { DPRINTF("put_ready returning because of non-active state\n"); + qemu_mutex_unlock_iothread(); return; } + migrate_fd_put_notify(s); + + if (!s->code) { + DPRINTF("beginning savevm\n"); + ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, + s->mig_state.shared); + if (ret < 0) 
{ + DPRINTF("failed, %d\n", ret); + s->code = ERROR; + qemu_bh_schedule(s->bh); + qemu_mutex_unlock_iothread(); + return; + } + s->code = ACTIVE; + } + DPRINTF("iterate\n"); if (qemu_savevm_state_iterate(s->mon, s->file) == 1) { - int state; - int old_vm_running = vm_running; + s->old_vm_running = vm_running; DPRINTF("done iterating\n"); vm_stop(VMSTOP_MIGRATE); if ((qemu_savevm_state_complete(s->mon, s->file)) < 0) { - if (old_vm_running) { + if (s->old_vm_running) { vm_start(); } - state = MIG_STATE_ERROR; + s->code = ERROR; } else { - state = MIG_STATE_COMPLETED; + s->code = COMPLETE; } - if (migrate_fd_cleanup(s) < 0) { - if (old_vm_running) { - vm_start(); - } - state = MIG_STATE_ERROR; - } - s->state = state; - notifier_list_notify(&migration_state_notifiers); + + qemu_bh_schedule(s->bh); } + qemu_mutex_unlock_iothread(); } int migrate_fd_get_status(MigrationState *mig_state) @@ -426,22 +438,32 @@ void migrate_fd_release(MigrationState *mig_state) FdMigrationState *s = migrate_to_fms(mig_state); DPRINTF("releasing state\n"); - + if (s->state == MIG_STATE_ACTIVE) { s->state = MIG_STATE_CANCELLED; notifier_list_notify(&migration_state_notifiers); migrate_fd_cleanup(s); } + + if (s->bh) { + qemu_bh_delete(s->bh); + } + qemu_free(s); } void migrate_fd_wait_for_unfreeze(void *opaque) { FdMigrationState *s = opaque; - int ret; + int ret, state; DPRINTF("wait for unfreeze\n"); - if (s->state != MIG_STATE_ACTIVE) + + qemu_mutex_lock_iothread(); + state = s->state; + qemu_mutex_unlock_iothread(); + + if (state != MIG_STATE_ACTIVE) return; do { @@ -458,7 +480,6 @@ int migrate_fd_close(void *opaque) { FdMigrationState *s = opaque; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); return s->close(s); } diff --git a/migration.h b/migration.h index 050c56c..960559a 100644 --- a/migration.h +++ b/migration.h @@ -23,6 +23,11 @@ #define MIG_STATE_CANCELLED 1 #define MIG_STATE_ACTIVE 2 +#define START 0 +#define ACTIVE 1 +#define COMPLETE 2 +#define ERROR 3 + typedef 
struct MigrationState MigrationState; struct MigrationState @@ -45,10 +50,13 @@ struct FdMigrationState int fd; Monitor *mon; int state; + int code; + int old_vm_running; int (*get_error)(struct FdMigrationState*); int (*close)(struct FdMigrationState*); int (*write)(struct FdMigrationState*, const void *, size_t); void *opaque; + QEMUBH *bh; }; void process_incoming_migration(QEMUFile *f); diff --git a/qemu-thread-posix.c b/qemu-thread-posix.c index 2bd02ef..6a275be 100644 --- a/qemu-thread-posix.c +++ b/qemu-thread-posix.c @@ -115,6 +115,16 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex) error_exit(err, __func__); } +void qemu_thread_join(QemuThread *thread) +{ + int err; + + err = pthread_join(thread->thread, NULL); + if (err) { + error_exit(err, __func__); + } +} + void qemu_thread_create(QemuThread *thread, void *(*start_routine)(void*), void *arg) diff --git a/qemu-thread.h b/qemu-thread.h index 0a73d50..d5b99d5 100644 --- a/qemu-thread.h +++ b/qemu-thread.h @@ -30,6 +30,7 @@ void qemu_cond_destroy(QemuCond *cond); void qemu_cond_signal(QemuCond *cond); void qemu_cond_broadcast(QemuCond *cond); void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex); +void qemu_thread_join(QemuThread *thread); void qemu_thread_create(QemuThread *thread, void *(*start_routine)(void*),
This patch creates a separate thread for the guest migration on the source side. All exits (on completion or error) from the migration thread are handled by a bottom-half (QEMUBH) handler, which is called from the iothread. Signed-off-by: Umesh Deshpande <udeshpan@redhat.com> --- buffered_file.c | 76 ++++++++++++++++++++---------------- migration.c | 105 ++++++++++++++++++++++++++++++-------------------- migration.h | 8 ++++ qemu-thread-posix.c | 10 +++++ qemu-thread.h | 1 + 5 files changed, 124 insertions(+), 76 deletions(-)