From patchwork Tue May 25 08:36:50 2010 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Yoshiaki Tamura X-Patchwork-Id: 102112 Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by demeter.kernel.org (8.14.3/8.14.3) with ESMTP id o4P8fqmV030305 for ; Tue, 25 May 2010 08:41:52 GMT Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S932080Ab0EYIln (ORCPT ); Tue, 25 May 2010 04:41:43 -0400 Received: from sh.osrg.net ([192.16.179.4]:34900 "EHLO sh.osrg.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1756317Ab0EYIkl (ORCPT ); Tue, 25 May 2010 04:40:41 -0400 Received: from fs.osrg.net (postfix@fs.osrg.net [10.0.0.12]) by sh.osrg.net (8.14.3/8.14.3/OSRG-NET) with ESMTP id o4P8eJ8F010200; Tue, 25 May 2010 17:40:19 +0900 Received: from localhost (hype-wd0.osrg.net [10.72.1.16]) by fs.osrg.net (Postfix) with ESMTP id C54FA3E0300; Tue, 25 May 2010 17:40:17 +0900 (JST) From: Yoshiaki Tamura To: kvm@vger.kernel.org, qemu-devel@nongnu.org Cc: avi@redhat.com, aliguori@us.ibm.com, mtosatti@redhat.com, ohmura.kei@lab.ntt.co.jp, Yoshiaki Tamura Subject: [RFC PATCH 09/23] Introduce fault tolerant VM transaction QEMUFile and ft_mode. Date: Tue, 25 May 2010 17:36:50 +0900 Message-Id: <1274776624-16435-11-git-send-email-tamura.yoshiaki@lab.ntt.co.jp> X-Mailer: git-send-email 1.7.0.31.g1df487 In-Reply-To: <1274776624-16435-1-git-send-email-tamura.yoshiaki@lab.ntt.co.jp> References: <1274776624-16435-1-git-send-email-tamura.yoshiaki@lab.ntt.co.jp> X-Dispatcher: imput version 20070423(IM149) Lines: 536 X-Greylist: IP, sender and recipient auto-whitelisted, not delayed by milter-greylist-4.2.3 (demeter.kernel.org [140.211.167.41]); Tue, 25 May 2010 08:41:52 +0000 (UTC) X-Greylist: Sender IP whitelisted, not delayed by milter-greylist-3.0 (sh.osrg.net [192.16.179.4]); Tue, 25 May 2010 17:40:19 +0900 (JST) X-Virus-Scanned: clamav-milter 0.96 at sh X-Virus-Status: Clean Sender: kvm-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: kvm@vger.kernel.org diff --git a/Makefile.objs b/Makefile.objs index b73e2cb..4388fb3 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -78,6 +78,7 @@ common-obj-y += qemu-char.o savevm.o #aio.o common-obj-y += msmouse.o ps2.o common-obj-y += qdev.o qdev-properties.o common-obj-y += qemu-config.o block-migration.o +common-obj-y += ft_transaction.o common-obj-$(CONFIG_BRLAPI) += baum.o common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o migration-fd.o diff --git a/ft_transaction.c b/ft_transaction.c new file mode 100644 index 0000000..92dc681 --- /dev/null +++ b/ft_transaction.c @@ -0,0 +1,418 @@ +/* + * Fault tolerant VM transaction QEMUFile + * + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * This source code is based on buffered_file.c. + * Copyright IBM, Corp. 2008 + * Authors: + * Anthony Liguori + */ + +#include "qemu-common.h" +#include "hw/hw.h" +#include "qemu-timer.h" +#include "sysemu.h" +#include "qemu-char.h" +#include "ft_transaction.h" + +// #define DEBUG_FT_TRANSACTION + +typedef struct QEMUFileFtTranx +{ + FtTranxPutBufferFunc *put_buffer; + FtTranxGetBufferFunc *get_buffer; + FtTranxCloseFunc *close; + void *opaque; + QEMUFile *file; + int has_error; + int is_sender; + int buf_max_size; + enum QEMU_VM_TRANSACTION_STATE tranx_state; + uint16_t tranx_id; + uint32_t seq; +} QEMUFileFtTranx; + +#define IO_BUF_SIZE 32768 + +#ifdef DEBUG_FT_TRANSACTION +#define dprintf(fmt, ...) \ + do { printf("ft_transaction: " fmt, ## __VA_ARGS__); } while (0) +#else +#define dprintf(fmt, ...) \ + do { } while (0) +#endif + +static ssize_t ft_tranx_flush_buffer(void *opaque, void *buf, int size) +{ + QEMUFileFtTranx *s = opaque; + size_t offset = 0; + ssize_t len; + + while (offset < size) { + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - offset); + + if (len <= 0) { + fprintf(stderr, "ft transaction flush buffer failed \n"); + s->has_error = 1; + offset = -EINVAL; + break; + } + + offset += len; + } + + return offset; +} + +static int ft_tranx_send_header(QEMUFileFtTranx *s) +{ + int ret = -1; + + dprintf("send header %d\n", s->tranx_state); + + ret = ft_tranx_flush_buffer(s, &s->tranx_state, sizeof(uint16_t)); + if (ret < 0) { + goto out; + } + ret = ft_tranx_flush_buffer(s, &s->tranx_id, sizeof(uint16_t)); + +out: + return ret; +} + +static int ft_tranx_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size) +{ + QEMUFileFtTranx *s = opaque; + ssize_t ret = -1; + + if (s->has_error) { + fprintf(stderr, "flush when error, bailing\n"); + return -EINVAL; + } + + ret = ft_tranx_send_header(s); + if (ret < 0) { + goto out; + } + + ret = ft_tranx_flush_buffer(s, &s->seq, sizeof(s->seq)); + if (ret < 0) { + goto out; + } + s->seq++; + + ret = ft_tranx_flush_buffer(s, &size, sizeof(uint32_t)); + if (ret < 0) { + goto out; + } + + ret = ft_tranx_flush_buffer(s, (uint8_t *)buf, size); + +out: + return ret; +} + +#if 0 +static int ft_tranx_put_vector(void *opaque, struct iovec *vector, int64_t pos, int count) +{ + QEMUFileFtTranx *s = opaque; + ssize_t ret = -1; + int i; + uint32_t size = 0; + + dprintf("putting %d vectors at %" PRId64 "\n", count, pos); + + if (s->has_error) { + dprintf("put vector when error, bailing\n"); + return -EINVAL; + } + + ret = ft_tranx_send_header(s); + if (ret < 0) { + return ret; + } + + ret = ft_tranx_flush_buffer(s, &s->seq, sizeof(s->seq)); + if (ret < 0) { + return ret; + } + s->seq++; + + for (i = 0; i < count; i++) + size += vector[i].iov_len; + + ret = ft_tranx_flush_buffer(s, &size, sizeof(uint32_t)); + if (ret < 0) { + return ret; + } + + while (count > 0) { + /* + * It will continue calling put_vector even if count > IOV_MAX. + */ + ret = s->put_vector(s->opaque, vector, + ((count>IOV_MAX)?IOV_MAX:count)); + + if (ret <= 0) { + fprintf(stderr, "ft transaction putting vector\n"); + s->has_error = 1; + return ret; + } + + for (i = 0; i < count; i++) { + /* ret represents -(length of remaining data). */ + ret -= vector[i].iov_len; + if (ret < 0) { + vector[i].iov_base += (ret + vector[i].iov_len); + vector[i].iov_len = -ret; + vector = &vector[i]; + break; + } + } + count -= i; + } + + return 0; +} +#endif + +static inline int ft_tranx_fill_buffer(void *opaque, void *buf, int size) +{ + QEMUFileFtTranx *s = opaque; + size_t offset = 0; + ssize_t len; + + while (ft_mode != FT_ERROR && offset < size) { + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset, + 0, size - offset); + if (len <= 0) { + fprintf(stderr, "ft_tranx fill buffer failed\n"); + s->has_error = 1; + return -EINVAL; + } + offset += len; + } + return 0; +} + +/* return QEMU_VM_TRANSACTION type */ +static int ft_tranx_get_next(QEMUFileFtTranx *s) +{ + uint16_t header; + uint16_t tranx_id; + + if ((ft_tranx_fill_buffer(s, &header, sizeof(header)) < 0) || + (ft_tranx_fill_buffer(s, &tranx_id, sizeof(tranx_id)) < 0)) { + return QEMU_VM_TRANSACTION_CANCEL; + } + + s->tranx_id = tranx_id; + + return header; +} + +static int ft_tranx_get_buffer(void *opaque, uint8_t *buf, + int64_t pos, int size) +{ + QEMUFileFtTranx *s = opaque; + QEMUFile *f = s->file; + uint32_t payload_len; + int ret = -1, offset; + + /* get transaction header*/ + ret = ft_tranx_get_next(s); + switch (ret) { + case QEMU_VM_TRANSACTION_BEGIN: + for (offset = 0;;) { + ret = ft_tranx_get_next(s); + /* CONTINUE or COMMIT must come afer BEGIN */ + if ((ret != QEMU_VM_TRANSACTION_CONTINUE) && + (ret != QEMU_VM_TRANSACTION_COMMIT)) { + goto error_out; + } + + if (ft_tranx_fill_buffer(s, &s->seq, sizeof(s->seq)) < 0) { + goto error_out; + } + + if (ret == QEMU_VM_TRANSACTION_COMMIT) { + ret = offset; + dprintf("QEMU_VM_TRANSACTION_COMMIT %d\n", offset); + break; + } + + if (ft_tranx_fill_buffer(s, &payload_len, + sizeof(payload_len)) < 0) { + goto error_out; + } + + /* Extend QEMUFile buf if there weren't enough space. */ + if (payload_len > (s->buf_max_size - offset)) { + s->buf_max_size += (payload_len - (s->buf_max_size - offset)); + buf = qemu_realloc_buffer(f, s->buf_max_size); + } + + if (ft_tranx_fill_buffer(s, buf + offset, payload_len) < 0) { + goto error_out; + } + offset += payload_len; + } + + s->tranx_state = QEMU_VM_TRANSACTION_ACK; + if (ft_tranx_send_header(s) < 0) { + goto error_out; + } + goto out; + + case QEMU_VM_TRANSACTION_ATOMIC: + /* not implemented yet */ + fprintf(stderr, "QEMU_VM_TRANSACTION_ATOMIC not implemented. %d\n", + ret); + goto error_out; + + case QEMU_VM_TRANSACTION_CANCEL: + dprintf("ft transaction canceled %d\n", ret); + ret = -1; + ft_mode = FT_OFF; + goto out; + + default: + fprintf(stderr, "unknown QEMU_VM_TRANSACTION_STATE %d\n", ret); + } + +error_out: + ret = -1; + ft_mode = FT_ERROR; +out: + return ret; +} + +static int ft_tranx_close(void *opaque) +{ + QEMUFileFtTranx *s = opaque; + int ret = -1; + + dprintf("closing\n"); + ret = s->close(s->opaque); + qemu_free(s); + + return ret; +} + +int qemu_ft_tranx_begin(void *opaque) +{ + QEMUFileFtTranx *s = opaque; + int ret = -1; + s->seq = 0; + + if (!s->is_sender && s->tranx_state == QEMU_VM_TRANSACTION_INIT) { + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction. */ + s->tranx_state = QEMU_VM_TRANSACTION_ACK; + ret = ft_tranx_send_header(s); + goto out; + } + + if (s->is_sender) { + if (s->tranx_state == QEMU_VM_TRANSACTION_INIT) { + ret = ft_tranx_get_next(s); + if (ret != QEMU_VM_TRANSACTION_ACK) { + fprintf(stderr, "ft_transaction receiving ack failed\n"); + ret = -1; + goto out; + } + } + + s->tranx_state = QEMU_VM_TRANSACTION_BEGIN; + if ((ret = ft_tranx_send_header(s)) < 0) { + goto out; + } + + s->tranx_state = QEMU_VM_TRANSACTION_CONTINUE; + ret = 0; + } + +out: + return ret; +} + +int qemu_ft_tranx_commit(void *opaque) +{ + QEMUFileFtTranx *s = opaque; + int ret = -1; + + if (!s->is_sender) { + s->tranx_state = QEMU_VM_TRANSACTION_ACK; + ret = ft_tranx_send_header(s); + } else { + /* flush buf before sending COMMIT */ + qemu_fflush(s->file); + + s->tranx_state = QEMU_VM_TRANSACTION_COMMIT; + ret = ft_tranx_send_header(s); + if (ret < 0) { + return ret; + } + + ret = ft_tranx_flush_buffer(s, &s->seq, sizeof(s->seq)); + if (ret < 0) { + return ret; + } + + /* FIX ME: can we remove this if statement? */ + if (ret >= 0) { + ret = ft_tranx_get_next(s); + if (ret != QEMU_VM_TRANSACTION_ACK) { + fprintf(stderr, "ft_transaction receiving ack failed\n"); + return -1; + } + } + + s->tranx_id++; + } + + return ret; +} + +int qemu_ft_tranx_cancel(void *opaque) +{ + QEMUFileFtTranx *s = opaque; + int ret = -1; + + if (s->is_sender) { + s->tranx_state = QEMU_VM_TRANSACTION_CANCEL; + if ((ret = ft_tranx_send_header(s)) < 0) { + fprintf(stderr, "ft cancel failed\n"); + } + } + + return ret; +} + +QEMUFile *qemu_fopen_ops_ft_tranx(void *opaque, + FtTranxPutBufferFunc *put_buffer, + FtTranxGetBufferFunc *get_buffer, + FtTranxCloseFunc *close, + int is_sender) +{ + QEMUFileFtTranx *s; + + s = qemu_mallocz(sizeof(*s)); + + s->opaque = opaque; + s->put_buffer = put_buffer; + s->get_buffer = get_buffer; + s->close = close; + s->buf_max_size = IO_BUF_SIZE; + s->is_sender = is_sender; + s->tranx_id = 0; + s->seq = 0; + + s->file = qemu_fopen_ops(s, ft_tranx_put_buffer, ft_tranx_get_buffer, + ft_tranx_close, NULL, NULL, NULL); + + return s->file; +} diff --git a/ft_transaction.h b/ft_transaction.h new file mode 100644 index 0000000..418106a --- /dev/null +++ b/ft_transaction.h @@ -0,0 +1,54 @@ +/* + * Fault tolerant VM transaction QEMUFile + * + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * This source code is based on buffered_file.h. + * Copyright IBM, Corp. 2008 + * Authors: + * Anthony Liguori + */ + +#ifndef QEMU_FT_TRANSACTION_FILE_H +#define QEMU_FT_TRANSACTION_FILE_H + +#include "hw/hw.h" + +enum QEMU_VM_TRANSACTION_STATE { + QEMU_VM_TRANSACTION_INIT, + QEMU_VM_TRANSACTION_BEGIN, + QEMU_VM_TRANSACTION_CONTINUE, + QEMU_VM_TRANSACTION_COMMIT, + QEMU_VM_TRANSACTION_CANCEL, + QEMU_VM_TRANSACTION_ATOMIC, + QEMU_VM_TRANSACTION_ACK, + QEMU_VM_TRANSACTION_NACK, +}; + +enum FT_MODE { + FT_OFF, + FT_INIT, + FT_TRANSACTION, + FT_ERROR, +}; +extern enum FT_MODE ft_mode; + +typedef ssize_t (FtTranxPutBufferFunc)(void *opaque, const void *data, size_t size); +typedef ssize_t (FtTranxPutVectorFunc)(void *opaque, const struct iovec *iov, int iovcnt); +typedef QEMUFileGetBufferFunc FtTranxGetBufferFunc; +typedef int (FtTranxCloseFunc)(void *opaque); + +int qemu_ft_tranx_begin(void *opaque); +int qemu_ft_tranx_commit(void *opaque); +int qemu_ft_tranx_cancel(void *opaque); + +QEMUFile *qemu_fopen_ops_ft_tranx(void *opaque, + FtTranxPutBufferFunc *put_buffer, + FtTranxGetBufferFunc *get_buffer, + FtTranxCloseFunc *close, + int is_sender); + +#endif diff --git a/migration.c b/migration.c index a2ca6ef..2adf7ad 100644 --- a/migration.c +++ b/migration.c @@ -15,6 +15,7 @@ #include "migration.h" #include "monitor.h" #include "buffered_file.h" +#include "ft_transaction.h" #include "sysemu.h" #include "block.h" #include "qemu_socket.h" @@ -31,6 +32,8 @@ do { } while (0) #endif +enum FT_MODE ft_mode = FT_OFF; + /* Migration speed throttling */ static uint32_t max_throttle = (32 << 20);