Message ID | e5a28f3e3d8ff3c5c08b307ef320051632832718.1311971938.git.udeshpan@redhat.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
On 07/29/2011 10:57 PM, Umesh Deshpande wrote: > This patch creates a separate thread for the guest migration on the source side. > > Signed-off-by: Umesh Deshpande<udeshpan@redhat.com> Looks pretty good! One thing that shows, is that the interface separation between buffered_file.c is migration.c is pretty weird. Your patch makes it somewhat worse, but it was like this before so it's not your fault. The good thing is that if buffered_file.c uses threads, you can fix a large part of this and get even simpler code: 1) there is really just one way to implement migrate_fd_put_notify, and with your simplifications it does not belong anymore in migration.c. 2) s->callback is actually not NULL exactly if s->file->frozen_output is true, you can remove it as well; 3) buffered_close is messy because it can be called from both the iothread (monitor->migrate_fd_cancel->migrate_fd_cleanup->qemu_fclose) or the migration thread (after qemu_savevm_state_complete). But buffered_close is actually very similar to your thread function (it does flush+wait_for_unfreeze, basically)! So buffered_close can be simply: s->closed = 1; ret = qemu_thread_join(s->thread); /* doesn't exist yet :) */ qemu_free(...); return ret; Another nit is that here: > + if (migrate_fd_check_expire()) { > + buffered_rate_tick(s->file); > + } > + > + if (s->state != MIG_STATE_ACTIVE) { > + break; > + } > + > + if (s->callback) { > + migrate_fd_wait_for_unfreeze(s); > + s->callback(s); > + } you can still have a busy wait. Putting it all together, you can move the thread function back to buffered_file.c like: while (!s->closed || (!s->has_error && s->buffer_size)) { if (s->freeze_output) { qemu_mutex_unlock_iothread(); s->wait_for_unfreeze(s); qemu_mutex_lock_iothread(); /* This comes from qemu_file_put_notify (via buffered_put_buffer---can be simplified a lot too?). s->freeze_output = 0; /* Test again for cancellation. */ continue; } int64_t current_time = qemu_get_clock_ms(rt_clock); if (s->expire_time > current_time) { struct timeval tv = { .tv_sec = 0, .tv_usec = ... }; qemu_mutex_unlock_iothread(); select (0, NULL, NULL, NULL, &tv); qemu_mutex_lock_iothread(); s->expire_time = qemu_get_clock_ms(rt_clock) + 100; continue; } /* This comes from buffered_rate_tick. */ s->bytes_xfer = 0; buffered_flush(s); if (!s->closed) { s->put_ready(s->opaque); } } ret = s->close(s->opaque); ... Does it look sane? Paolo -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
On 08/01/2011 05:37 AM, Paolo Bonzini wrote: > On 07/29/2011 10:57 PM, Umesh Deshpande wrote: >> This patch creates a separate thread for the guest migration on the >> source side. >> >> Signed-off-by: Umesh Deshpande<udeshpan@redhat.com> > > Looks pretty good! > > One thing that shows, is that the interface separation between > buffered_file.c is migration.c is pretty weird. Your patch makes it > somewhat worse, but it was like this before so it's not your fault. > The good thing is that if buffered_file.c uses threads, you can fix a > large part of this and get even simpler code: > > 1) there is really just one way to implement migrate_fd_put_notify, > and with your simplifications it does not belong anymore in migration.c. > > 2) s->callback is actually not NULL exactly if s->file->frozen_output > is true, you can remove it as well; > > 3) buffered_close is messy because it can be called from both the > iothread (monitor->migrate_fd_cancel->migrate_fd_cleanup->qemu_fclose) > or the migration thread (after qemu_savevm_state_complete). But > buffered_close is actually very similar to your thread function (it > does flush+wait_for_unfreeze, basically)! So buffered_close can be > simply: > > s->closed = 1; > ret = qemu_thread_join(s->thread); /* doesn't exist yet :) */ > qemu_free(...); > return ret; > > Another nit is that here: > >> + if (migrate_fd_check_expire()) { >> + buffered_rate_tick(s->file); >> + } >> + >> + if (s->state != MIG_STATE_ACTIVE) { >> + break; >> + } >> + >> + if (s->callback) { >> + migrate_fd_wait_for_unfreeze(s); >> + s->callback(s); >> + } > > you can still have a busy wait. > > Putting it all together, you can move the thread function back to > buffered_file.c like: > > while (!s->closed || (!s->has_error && s->buffer_size)) { > if (s->freeze_output) { > qemu_mutex_unlock_iothread(); > s->wait_for_unfreeze(s); > qemu_mutex_lock_iothread(); > /* This comes from qemu_file_put_notify (via > buffered_put_buffer---can be simplified a lot too?). > s->freeze_output = 0; > /* Test again for cancellation. */ > continue; > } > > int64_t current_time = qemu_get_clock_ms(rt_clock); > if (s->expire_time > current_time) { > struct timeval tv = { .tv_sec = 0, .tv_usec = ... }; > qemu_mutex_unlock_iothread(); > select (0, NULL, NULL, NULL, &tv); > qemu_mutex_lock_iothread(); > s->expire_time = qemu_get_clock_ms(rt_clock) + 100; > continue; > } > > /* This comes from buffered_rate_tick. */ > s->bytes_xfer = 0; > buffered_flush(s); > if (!s->closed) { > s->put_ready(s->opaque); > } > } > > ret = s->close(s->opaque); > ... > > Does it look sane? > I kept this in migration.c to call qemu_savevm_state_begin. (The way it is done currently. i.e. to keep access to FdMigrationState in migration.c) Calling it from buffered_file.c would be inconsistent in that sense. or we will have to call it from the iothread before spawning the migration thread. Also why is the separation between FdMigrationState and QEMUFileBuffered is required. Is QEMUFileBuffered designed to use also for things other than migration? Thanks Umesh > > Paolo -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
On 08/01/2011 11:00 PM, Umesh Deshpande wrote: >> > I kept this in migration.c to call qemu_savevm_state_begin. (The way it > is done currently. i.e. to keep access to FdMigrationState in migration.c) > Calling it from buffered_file.c would be inconsistent in that sense. or > we will have to call it from the iothread before spawning the migration > thread. Right, I missed that. Perhaps you can call it the first time put_ready is called. > Also why is the separation between FdMigrationState and QEMUFileBuffered > is required. Is QEMUFileBuffered designed to use also for things other > than migration? No, but let's keep it this way for now. It may be an annoyance, but it also helps making a reusable architecture, and it can probably be cleaned up substantially with thread support. Paolo -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
diff --git a/buffered_file.c b/buffered_file.c index 41b42c3..d4146bf 100644 --- a/buffered_file.c +++ b/buffered_file.c @@ -16,12 +16,16 @@ #include "qemu-timer.h" #include "qemu-char.h" #include "buffered_file.h" +#include "migration.h" +#include "savevm.h" +#include "qemu-thread.h" //#define DEBUG_BUFFERED_FILE typedef struct QEMUFileBuffered { BufferedPutFunc *put_buffer; + BufferedBeginFunc *begin; BufferedPutReadyFunc *put_ready; BufferedWaitForUnfreezeFunc *wait_for_unfreeze; BufferedCloseFunc *close; @@ -35,6 +39,7 @@ typedef struct QEMUFileBuffered size_t buffer_size; size_t buffer_capacity; QEMUTimer *timer; + QemuThread thread; } QEMUFileBuffered; #ifdef DEBUG_BUFFERED_FILE @@ -181,8 +186,6 @@ static int buffered_close(void *opaque) ret = s->close(s->opaque); - qemu_del_timer(s->timer); - qemu_free_timer(s->timer); qemu_free(s->buffer); qemu_free(s); @@ -228,17 +231,15 @@ static int64_t buffered_get_rate_limit(void *opaque) return s->xfer_limit; } -static void buffered_rate_tick(void *opaque) +void buffered_rate_tick(QEMUFile *file) { - QEMUFileBuffered *s = opaque; + QEMUFileBuffered *s = file->opaque; if (s->has_error) { buffered_close(s); return; } - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); - if (s->freeze_output) return; @@ -250,9 +251,17 @@ static void buffered_rate_tick(void *opaque) s->put_ready(s->opaque); } +static void *migrate_vm(void *opaque) +{ + QEMUFileBuffered *s = opaque; + s->begin(s->opaque); + return NULL; +} + QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t bytes_per_sec, BufferedPutFunc *put_buffer, + BufferedBeginFunc *begin, BufferedPutReadyFunc *put_ready, BufferedWaitForUnfreezeFunc *wait_for_unfreeze, BufferedCloseFunc *close) @@ -264,6 +273,7 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque, s->opaque = opaque; s->xfer_limit = bytes_per_sec / 10; s->put_buffer = put_buffer; + s->begin = begin; s->put_ready = put_ready; s->wait_for_unfreeze = wait_for_unfreeze; s->close = close; @@ -271,11 +281,9 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque, s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL, buffered_close, buffered_rate_limit, buffered_set_rate_limit, - buffered_get_rate_limit); - - s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s); + buffered_get_rate_limit); - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); + qemu_thread_create(&s->thread, migrate_vm, s); return s->file; } diff --git a/buffered_file.h b/buffered_file.h index 98d358b..cfe2833 100644 --- a/buffered_file.h +++ b/buffered_file.h @@ -17,12 +17,16 @@ #include "hw/hw.h" typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size); +typedef void (BufferedBeginFunc)(void *opaque); typedef void (BufferedPutReadyFunc)(void *opaque); typedef void (BufferedWaitForUnfreezeFunc)(void *opaque); typedef int (BufferedCloseFunc)(void *opaque); +void buffered_rate_tick(QEMUFile *file); + QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit, BufferedPutFunc *put_buffer, + BufferedBeginFunc *begin, BufferedPutReadyFunc *put_ready, BufferedWaitForUnfreezeFunc *wait_for_unfreeze, BufferedCloseFunc *close); diff --git a/migration.c b/migration.c index af3a1f2..bf86067 100644 --- a/migration.c +++ b/migration.c @@ -31,6 +31,8 @@ do { } while (0) #endif +static int64_t expire_time; + /* Migration speed throttling */ static int64_t max_throttle = (32 << 20); @@ -284,8 +286,6 @@ int migrate_fd_cleanup(FdMigrationState *s) { int ret = 0; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); - if (s->file) { DPRINTF("closing file\n"); if (qemu_fclose(s->file) != 0) { @@ -310,8 +310,7 @@ int migrate_fd_cleanup(FdMigrationState *s) void migrate_fd_put_notify(void *opaque) { FdMigrationState *s = opaque; - - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); + s->callback = NULL; qemu_file_put_notify(s->file); } @@ -328,7 +327,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) ret = -(s->get_error(s)); if (ret == -EAGAIN) { - qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s); + s->callback = migrate_fd_put_notify; } else if (ret < 0) { if (s->mon) { monitor_resume(s->mon); @@ -342,27 +341,66 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) void migrate_fd_connect(FdMigrationState *s) { - int ret; - + s->callback = NULL; s->file = qemu_fopen_ops_buffered(s, s->bandwidth_limit, migrate_fd_put_buffer, + migrate_fd_begin, migrate_fd_put_ready, migrate_fd_wait_for_unfreeze, migrate_fd_close); +} + +static int migrate_fd_check_expire(void) +{ + int64_t current_time = qemu_get_clock_ms(rt_clock); + + if (expire_time > current_time) { + return 0; + } else { + expire_time = qemu_get_clock_ms(rt_clock) + 100; + return 1; + } +} + +void migrate_fd_begin(void *arg) +{ + FdMigrationState *s = arg; + int ret; + qemu_mutex_lock_iothread(); DPRINTF("beginning savevm\n"); ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, s->mig_state.shared); if (ret < 0) { DPRINTF("failed, %d\n", ret); migrate_fd_error(s); - return; + goto out; } - + + expire_time = qemu_get_clock_ms(rt_clock) + 100; migrate_fd_put_ready(s); + + while (s->state == MIG_STATE_ACTIVE) { + if (migrate_fd_check_expire()) { + buffered_rate_tick(s->file); + } + + if (s->state != MIG_STATE_ACTIVE) { + break; + } + + if (s->callback) { + migrate_fd_wait_for_unfreeze(s); + s->callback(s); + } + } + +out: + qemu_mutex_unlock_iothread(); } + void migrate_fd_put_ready(void *opaque) { FdMigrationState *s = opaque; @@ -376,8 +414,6 @@ void migrate_fd_put_ready(void *opaque) if (qemu_savevm_state_iterate(s->mon, s->file) == 1) { int state; int old_vm_running = vm_running; - - DPRINTF("done iterating\n"); vm_stop(VMSTOP_MIGRATE); if ((qemu_savevm_state_complete(s->mon, s->file)) < 0) { @@ -458,7 +494,6 @@ int migrate_fd_close(void *opaque) { FdMigrationState *s = opaque; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); return s->close(s); } diff --git a/migration.h b/migration.h index 050c56c..8ed34ab 100644 --- a/migration.h +++ b/migration.h @@ -48,6 +48,7 @@ struct FdMigrationState int (*get_error)(struct FdMigrationState*); int (*close)(struct FdMigrationState*); int (*write)(struct FdMigrationState*, const void *, size_t); + void (*callback)(void *); void *opaque; }; @@ -118,6 +119,8 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size); void migrate_fd_connect(FdMigrationState *s); +void migrate_fd_begin(void *opaque); + void migrate_fd_put_ready(void *opaque); int migrate_fd_get_status(MigrationState *mig_state); diff --git a/savevm.c b/savevm.c index 8139bc7..4859a34 100644 --- a/savevm.c +++ b/savevm.c @@ -82,6 +82,7 @@ #include "qemu_socket.h" #include "qemu-queue.h" #include "cpus.h" +#include "savevm.h" #define SELF_ANNOUNCE_ROUNDS 5 @@ -155,27 +156,6 @@ void qemu_announce_self(void) /***********************************************************/ /* savevm/loadvm support */ -#define IO_BUF_SIZE 32768 - -struct QEMUFile { - QEMUFilePutBufferFunc *put_buffer; - QEMUFileGetBufferFunc *get_buffer; - QEMUFileCloseFunc *close; - QEMUFileRateLimit *rate_limit; - QEMUFileSetRateLimit *set_rate_limit; - QEMUFileGetRateLimit *get_rate_limit; - void *opaque; - int is_write; - - int64_t buf_offset; /* start of buffer when writing, end of buffer - when reading */ - int buf_index; - int buf_size; /* 0 when writing */ - uint8_t buf[IO_BUF_SIZE]; - - int has_error; -}; - typedef struct QEMUFileStdio { FILE *stdio_file; diff --git a/savevm.h b/savevm.h new file mode 100644 index 0000000..954eded --- /dev/null +++ b/savevm.h @@ -0,0 +1,29 @@ +#ifndef QEMU_SAVEVM_H +#define QEMU_SAVEVM_H + +#include "hw/hw.h" + +#define IO_BUF_SIZE 32768 + +struct QEMUFile { + QEMUFilePutBufferFunc *put_buffer; + QEMUFileGetBufferFunc *get_buffer; + QEMUFileCloseFunc *close; + QEMUFileRateLimit *rate_limit; + QEMUFileSetRateLimit *set_rate_limit; + QEMUFileGetRateLimit *get_rate_limit; + void *opaque; + int is_write; + + int64_t buf_offset; /* start of buffer when writing, end of buffer + when reading */ + int buf_index; + int buf_size; /* 0 when + * writing + * */ + uint8_t buf[IO_BUF_SIZE]; + + int has_error; +}; + +#endif
This patch creates a separate thread for the guest migration on the source side. Signed-off-by: Umesh Deshpande <udeshpan@redhat.com> --- buffered_file.c | 28 ++++++++++++++++--------- buffered_file.h | 4 +++ migration.c | 59 +++++++++++++++++++++++++++++++++++++++++++----------- migration.h | 3 ++ savevm.c | 22 +------------------- savevm.h | 29 +++++++++++++++++++++++++++ 6 files changed, 102 insertions(+), 43 deletions(-) create mode 100644 savevm.h