From patchwork Thu Aug 11 15:32:41 2011 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Umesh Deshpande X-Patchwork-Id: 1057862 Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by demeter2.kernel.org (8.14.4/8.14.4) with ESMTP id p7BFaZWQ021570 for ; Thu, 11 Aug 2011 15:36:38 GMT Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1753025Ab1HKPc7 (ORCPT ); Thu, 11 Aug 2011 11:32:59 -0400 Received: from mx1.redhat.com ([209.132.183.28]:38126 "EHLO mx1.redhat.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1752921Ab1HKPc5 (ORCPT ); Thu, 11 Aug 2011 11:32:57 -0400 Received: from int-mx02.intmail.prod.int.phx2.redhat.com (int-mx02.intmail.prod.int.phx2.redhat.com [10.5.11.12]) by mx1.redhat.com (8.14.4/8.14.4) with ESMTP id p7BFWvV0001339 (version=TLSv1/SSLv3 cipher=DHE-RSA-AES256-SHA bits=256 verify=OK) for ; Thu, 11 Aug 2011 11:32:57 -0400 Received: from ns3.rdu.redhat.com (ns3.rdu.redhat.com [10.11.255.199]) by int-mx02.intmail.prod.int.phx2.redhat.com (8.13.8/8.13.8) with ESMTP id p7BFWuWl008858; Thu, 11 Aug 2011 11:32:56 -0400 Received: from umeshhome.redhat.com (vpn-8-38.rdu.redhat.com [10.11.8.38]) by ns3.rdu.redhat.com (8.13.8/8.13.8) with ESMTP id p7BFWtYh018642; Thu, 11 Aug 2011 11:32:56 -0400 From: Umesh Deshpande To: kvm@vger.kernel.org Cc: quintela@redhat.com, mtosatti@redhat.com, pbonzini@redhat.com, Umesh Deshpande Subject: [RFC PATCH v3 1/4] separate thread for VM migration Date: Thu, 11 Aug 2011 11:32:41 -0400 Message-Id: <6ac256e1f481ea28678bae846a13714302f258db.1313076455.git.udeshpan@redhat.com> In-Reply-To: References: X-Scanned-By: MIMEDefang 2.67 on 10.5.11.12 Sender: kvm-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: kvm@vger.kernel.org X-Greylist: IP, sender and recipient auto-whitelisted, not delayed by milter-greylist-4.2.6 (demeter2.kernel.org [140.211.167.43]); Thu, 11 Aug 2011 15:36:38 +0000 (UTC) This patch creates a separate thread for the guest migration on the source side. migrate_cancel request from the iothread is handled asynchronously. That is, iothread submits migrate_cancel to the migration thread and returns, while the migration thread attends this request at the next iteration to terminate its execution. Signed-off-by: Umesh Deshpande --- buffered_file.c | 85 ++++++++++++++++++++++++++++++++---------------------- buffered_file.h | 4 ++ migration.c | 49 ++++++++++++++----------------- migration.h | 6 ++++ 4 files changed, 82 insertions(+), 62 deletions(-) diff --git a/buffered_file.c b/buffered_file.c index 41b42c3..19932b6 100644 --- a/buffered_file.c +++ b/buffered_file.c @@ -16,6 +16,8 @@ #include "qemu-timer.h" #include "qemu-char.h" #include "buffered_file.h" +#include "migration.h" +#include "qemu-thread.h" //#define DEBUG_BUFFERED_FILE @@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered void *opaque; QEMUFile *file; int has_error; + int closed; int freeze_output; size_t bytes_xfer; size_t xfer_limit; uint8_t *buffer; size_t buffer_size; size_t buffer_capacity; - QEMUTimer *timer; + QemuThread thread; } QEMUFileBuffered; #ifdef DEBUG_BUFFERED_FILE @@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in offset = size; } - if (pos == 0 && size == 0) { - DPRINTF("file is ready\n"); - if (s->bytes_xfer <= s->xfer_limit) { - DPRINTF("notifying client\n"); - s->put_ready(s->opaque); - } - } - return offset; } @@ -175,20 +170,20 @@ static int buffered_close(void *opaque) while (!s->has_error && s->buffer_size) { buffered_flush(s); - if (s->freeze_output) + if (s->freeze_output) { s->wait_for_unfreeze(s); + } } - ret = s->close(s->opaque); + s->closed = 1; - qemu_del_timer(s->timer); - qemu_free_timer(s->timer); + ret = s->close(s->opaque); qemu_free(s->buffer); - qemu_free(s); return ret; } + static int buffered_rate_limit(void *opaque) { QEMUFileBuffered *s = opaque; @@ -228,34 +223,55 @@ static int64_t buffered_get_rate_limit(void *opaque) return s->xfer_limit; } -static void buffered_rate_tick(void *opaque) +static void *migrate_vm(void *opaque) { QEMUFileBuffered *s = opaque; + int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100; + struct timeval tv = { .tv_sec = 0, .tv_usec = 100000}; - if (s->has_error) { - buffered_close(s); - return; - } + qemu_mutex_lock_iothread(); - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); + while (!s->closed) { + if (s->freeze_output) { + s->wait_for_unfreeze(s); + s->freeze_output = 0; + continue; + } - if (s->freeze_output) - return; + if (s->has_error) { + break; + } + + current_time = qemu_get_clock_ms(rt_clock); + if (!s->closed && (expire_time > current_time)) { + tv.tv_usec = 1000 * (expire_time - current_time); + select(0, NULL, NULL, NULL, &tv); + continue; + } - s->bytes_xfer = 0; + s->bytes_xfer = 0; + buffered_flush(s); - buffered_flush(s); + expire_time = qemu_get_clock_ms(rt_clock) + 100; + s->put_ready(s->opaque); + } - /* Add some checks around this */ - s->put_ready(s->opaque); + if (s->has_error) { + buffered_close(s); + } + qemu_free(s); + + qemu_mutex_unlock_iothread(); + + return NULL; } QEMUFile *qemu_fopen_ops_buffered(void *opaque, - size_t bytes_per_sec, - BufferedPutFunc *put_buffer, - BufferedPutReadyFunc *put_ready, - BufferedWaitForUnfreezeFunc *wait_for_unfreeze, - BufferedCloseFunc *close) + size_t bytes_per_sec, + BufferedPutFunc *put_buffer, + BufferedPutReadyFunc *put_ready, + BufferedWaitForUnfreezeFunc *wait_for_unfreeze, + BufferedCloseFunc *close) { QEMUFileBuffered *s; @@ -267,15 +283,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque, s->put_ready = put_ready; s->wait_for_unfreeze = wait_for_unfreeze; s->close = close; + s->closed = 0; s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL, buffered_close, buffered_rate_limit, buffered_set_rate_limit, - buffered_get_rate_limit); - - s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s); + buffered_get_rate_limit); - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); + qemu_thread_create(&s->thread, migrate_vm, s); return s->file; } diff --git a/buffered_file.h b/buffered_file.h index 98d358b..477bf7c 100644 --- a/buffered_file.h +++ b/buffered_file.h @@ -17,9 +17,13 @@ #include "hw/hw.h" typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size); +typedef void (BufferedBeginFunc)(void *opaque); typedef void (BufferedPutReadyFunc)(void *opaque); typedef void (BufferedWaitForUnfreezeFunc)(void *opaque); typedef int (BufferedCloseFunc)(void *opaque); +typedef void (BufferedWaitForCancelFunc)(void *opaque); + +void wait_for_cancel(void *opaque); QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit, BufferedPutFunc *put_buffer, diff --git a/migration.c b/migration.c index af3a1f2..d8a0abb 100644 --- a/migration.c +++ b/migration.c @@ -284,8 +284,6 @@ int migrate_fd_cleanup(FdMigrationState *s) { int ret = 0; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); - if (s->file) { DPRINTF("closing file\n"); if (qemu_fclose(s->file) != 0) { @@ -307,14 +305,6 @@ int migrate_fd_cleanup(FdMigrationState *s) return ret; } -void migrate_fd_put_notify(void *opaque) -{ - FdMigrationState *s = opaque; - - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); - qemu_file_put_notify(s->file); -} - ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) { FdMigrationState *s = opaque; @@ -327,9 +317,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) if (ret == -1) ret = -(s->get_error(s)); - if (ret == -EAGAIN) { - qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s); - } else if (ret < 0) { + if (ret < 0 && ret != -EAGAIN) { if (s->mon) { monitor_resume(s->mon); } @@ -342,36 +330,40 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) void migrate_fd_connect(FdMigrationState *s) { - int ret; - + s->begin = 1; s->file = qemu_fopen_ops_buffered(s, s->bandwidth_limit, migrate_fd_put_buffer, migrate_fd_put_ready, migrate_fd_wait_for_unfreeze, migrate_fd_close); - - DPRINTF("beginning savevm\n"); - ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, - s->mig_state.shared); - if (ret < 0) { - DPRINTF("failed, %d\n", ret); - migrate_fd_error(s); - return; - } - - migrate_fd_put_ready(s); } void migrate_fd_put_ready(void *opaque) { FdMigrationState *s = opaque; + int ret; if (s->state != MIG_STATE_ACTIVE) { DPRINTF("put_ready returning because of non-active state\n"); + if (s->state == MIG_STATE_CANCELLED) { + migrate_fd_terminate(s); + } return; } + if (s->begin) { + DPRINTF("beginning savevm\n"); + ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, + s->mig_state.shared); + if (ret < 0) { + DPRINTF("failed, %d\n", ret); + migrate_fd_error(s); + return; + } + s->begin = 0; + } + DPRINTF("iterate\n"); if (qemu_savevm_state_iterate(s->mon, s->file) == 1) { int state; @@ -415,6 +407,10 @@ void migrate_fd_cancel(MigrationState *mig_state) DPRINTF("cancelling migration\n"); s->state = MIG_STATE_CANCELLED; +} + +void migrate_fd_terminate(FdMigrationState *s) +{ notifier_list_notify(&migration_state_notifiers); qemu_savevm_state_cancel(s->mon, s->file); @@ -458,7 +454,6 @@ int migrate_fd_close(void *opaque) { FdMigrationState *s = opaque; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); return s->close(s); } diff --git a/migration.h b/migration.h index 050c56c..887f84c 100644 --- a/migration.h +++ b/migration.h @@ -45,9 +45,11 @@ struct FdMigrationState int fd; Monitor *mon; int state; + int begin; int (*get_error)(struct FdMigrationState*); int (*close)(struct FdMigrationState*); int (*write)(struct FdMigrationState*, const void *, size_t); + void (*callback)(void *); void *opaque; }; @@ -118,12 +120,16 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size); void migrate_fd_connect(FdMigrationState *s); +void migrate_fd_begin(void *opaque); + void migrate_fd_put_ready(void *opaque); int migrate_fd_get_status(MigrationState *mig_state); void migrate_fd_cancel(MigrationState *mig_state); +void migrate_fd_terminate(FdMigrationState *s); + void migrate_fd_release(MigrationState *mig_state); void migrate_fd_wait_for_unfreeze(void *opaque);