@@ -78,6 +78,7 @@ common-obj-y += qemu-char.o savevm.o #aio.o
common-obj-y += msmouse.o ps2.o
common-obj-y += qdev.o qdev-properties.o
common-obj-y += qemu-config.o block-migration.o
+common-obj-y += ft_transaction.o
common-obj-$(CONFIG_BRLAPI) += baum.o
common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o migration-fd.o
new file mode 100644
@@ -0,0 +1,423 @@
+/*
+ * Fault tolerant VM transaction QEMUFile
+ *
+ * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ *
+ * This source code is based on buffered_file.c.
+ * Copyright IBM, Corp. 2008
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ */
+
+#include "qemu-common.h"
+#include "hw/hw.h"
+#include "qemu-timer.h"
+#include "sysemu.h"
+#include "qemu-char.h"
+#include "ft_transaction.h"
+
+// #define DEBUG_FT_TRANSACTION
+
+typedef struct QEMUFileFtTranx
+{ /*
+ * State of one fault-tolerance transaction channel. It wraps a set of
+ * lower-level I/O callbacks and is itself exposed as a QEMUFile (see
+ * qemu_fopen_ops_ft_tranx()).
+ */
+ FtTranxPutBufferFunc *put_buffer; /* raw write callback, used by ft_tranx_flush_buffer() */
+ FtTranxPutVectorFunc *put_vector; /* raw gather-write callback, used by ft_tranx_put_vector() */
+ FtTranxGetBufferFunc *get_buffer; /* raw read callback, used by ft_tranx_fill_buffer() */
+ FtTranxGetVectorFunc *get_vector; /* stored at open time; not called anywhere in this file */
+ FtTranxCloseFunc *close; /* invoked from ft_tranx_close() */
+ void *opaque; /* passed as first argument to every callback above */
+ QEMUFile *file; /* the QEMUFile created on top of this state */
+ int has_error; /* set to 1 after a failed read/write; put paths bail out when set */
+ int is_sender; /* non-zero on the sending side; selects the role in begin/commit/cancel */
+ int buf_max_size; /* starts at IO_BUF_SIZE; grown in ft_tranx_get_buffer() for large payloads */
+ enum QEMU_VM_TRANSACTION_STATE tranx_state; /* state placed in the next header sent */
+ uint16_t tranx_id; /* sent in every header; bumped by the sender after an acked commit */
+ uint32_t seq; /* reset at begin, incremented per chunk sent; receiver stores the value read */
+} QEMUFileFtTranx;
+
+#define IO_BUF_SIZE 32768 /* initial buf_max_size; NOTE(review): presumably matches QEMUFile's own buffer size - confirm */
+
+#ifdef DEBUG_FT_TRANSACTION /* debug build: dprintf() prints to stdout with an "ft_transaction: " prefix */
+#define dprintf(fmt, ...) \
+ do { printf("ft_transaction: " fmt, ## __VA_ARGS__); } while (0)
+#else /* tracing disabled: dprintf() expands to an empty statement and its arguments are not evaluated */
+#define dprintf(fmt, ...) \
+ do { } while (0)
+#endif
+
+/*
+ * Write exactly @size bytes from @buf through the underlying put_buffer
+ * callback, calling it again after short writes until everything is sent.
+ *
+ * Returns the number of bytes written (equal to @size) on success. If the
+ * callback returns <= 0, s->has_error is set and -EINVAL is returned.
+ * A @size <= 0 writes nothing and returns 0.
+ */
+static ssize_t ft_tranx_flush_buffer(void *opaque, void *buf, int size)
+{
+    QEMUFileFtTranx *s = opaque;
+    /* Signed on purpose: it is compared with the signed @size. */
+    ssize_t offset = 0;
+
+    while (offset < size) {
+        ssize_t len = s->put_buffer(s->opaque, (uint8_t *)buf + offset,
+                                    size - offset);
+
+        if (len <= 0) {
+            fprintf(stderr, "ft transaction flush buffer failed \n");
+            s->has_error = 1;
+            /* Return the error directly, never via an unsigned variable. */
+            return -EINVAL;
+        }
+
+        offset += len;
+    }
+
+    return offset;
+}
+
+/*
+ * Send a transaction header: the current tranx_state followed by tranx_id,
+ * each as a 16-bit value in host byte order.
+ *
+ * Returns a negative value if either write fails, otherwise the non-negative
+ * result of the last write.
+ */
+static int ft_tranx_send_header(QEMUFileFtTranx *s)
+{
+    /*
+     * tranx_state is an enum, whose storage size is chosen by the compiler
+     * and is normally wider than 16 bits. Copy it into a uint16_t so the two
+     * bytes written are the value itself, not whichever part of the enum
+     * comes first in memory (the zero high half on big-endian hosts).
+     */
+    uint16_t state = s->tranx_state;
+    int ret;
+
+    dprintf("send header %d\n", s->tranx_state);
+
+    ret = ft_tranx_flush_buffer(s, &state, sizeof(state));
+    if (ret < 0) {
+        return ret;
+    }
+
+    return ft_tranx_flush_buffer(s, &s->tranx_id, sizeof(s->tranx_id));
+}
+
+/*
+ * QEMUFile put_buffer hook. Frames one chunk of data as
+ *   header (state, id) | seq | length | payload
+ * and writes it to the underlying channel. @pos is not used.
+ *
+ * Returns -EINVAL immediately if an earlier I/O error was recorded, a
+ * negative value if any write fails, otherwise the result of the payload
+ * write. s->seq is incremented once its value has been sent.
+ */
+static int ft_tranx_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
+{
+    QEMUFileFtTranx *s = opaque;
+    ssize_t rc;
+
+    if (s->has_error) {
+        fprintf(stderr, "flush when error, bailing\n");
+        return -EINVAL;
+    }
+
+    /* transaction header */
+    rc = ft_tranx_send_header(s);
+    if (rc < 0) {
+        return rc;
+    }
+
+    /* sequence number of this chunk */
+    rc = ft_tranx_flush_buffer(s, &s->seq, sizeof(s->seq));
+    if (rc < 0) {
+        return rc;
+    }
+    s->seq++;
+
+    /* payload length, then the payload itself */
+    rc = ft_tranx_flush_buffer(s, &size, sizeof(uint32_t));
+    if (rc < 0) {
+        return rc;
+    }
+
+    rc = ft_tranx_flush_buffer(s, (uint8_t *)buf, size);
+    return rc;
+}
+
+/*
+ * QEMUFile put_vector hook. Frames @count iovec entries as a single chunk:
+ *   header (state, id) | seq | total length | data of all entries
+ * @pos is only used for debug output.
+ *
+ * After a short write, the first unfinished entry of @vector is adjusted in
+ * place (iov_base advanced, iov_len reduced), so the caller's array may be
+ * modified.
+ *
+ * Returns 0 on success and a negative value on failure. A write that makes
+ * no progress (callback returns 0) is reported as -EINVAL rather than 0, so
+ * callers cannot mistake it for success.
+ */
+static int ft_tranx_put_vector(void *opaque, struct iovec *vector, int64_t pos, int count)
+{
+    QEMUFileFtTranx *s = opaque;
+    ssize_t ret;
+    int i;
+    uint32_t size = 0;
+
+    dprintf("putting %d vectors at %" PRId64 "\n", count, pos);
+
+    if (s->has_error) {
+        dprintf("put vector when error, bailing\n");
+        return -EINVAL;
+    }
+
+    ret = ft_tranx_send_header(s);
+    if (ret < 0) {
+        return ret;
+    }
+
+    ret = ft_tranx_flush_buffer(s, &s->seq, sizeof(s->seq));
+    if (ret < 0) {
+        return ret;
+    }
+    s->seq++;
+
+    /* The length field carries the total payload of all entries. */
+    for (i = 0; i < count; i++) {
+        size += vector[i].iov_len;
+    }
+
+    ret = ft_tranx_flush_buffer(s, &size, sizeof(uint32_t));
+    if (ret < 0) {
+        return ret;
+    }
+
+    while (count > 0) {
+        size_t left;
+
+        /*
+         * It will continue calling put_vector even if count > IOV_MAX.
+         */
+        ret = s->put_vector(s->opaque, vector,
+                            (count > IOV_MAX) ? IOV_MAX : count);
+
+        if (ret <= 0) {
+            fprintf(stderr, "ft transaction putting vector\n");
+            s->has_error = 1;
+            return ret < 0 ? ret : -EINVAL;
+        }
+
+        /* Skip every entry that was written completely. */
+        left = ret;
+        for (i = 0; i < count && left >= vector[i].iov_len; i++) {
+            left -= vector[i].iov_len;
+        }
+
+        /* Trim the already-written prefix off the first unfinished entry. */
+        if (i < count) {
+            vector[i].iov_base = (uint8_t *)vector[i].iov_base + left;
+            vector[i].iov_len -= left;
+        }
+
+        vector += i;
+        count -= i;
+    }
+
+    return 0;
+}
+
+/*
+ * Read exactly @size bytes into @buf through the underlying get_buffer
+ * callback, calling it again after short reads.
+ *
+ * Returns 0 once @buf is completely filled (or when @size <= 0).
+ * Returns -EINVAL if the callback returns <= 0 (s->has_error is set), or if
+ * ft_mode becomes FT_ERROR before the buffer is full. In the latter case the
+ * buffer is incomplete, and reporting success would let callers consume
+ * uninitialised data.
+ */
+static inline int ft_tranx_fill_buffer(void *opaque, void *buf, int size)
+{
+    QEMUFileFtTranx *s = opaque;
+    /* Signed on purpose: it is compared with the signed @size. */
+    int offset = 0;
+
+    while (offset < size) {
+        ssize_t len;
+
+        if (ft_mode == FT_ERROR) {
+            /* Stop reading; @buf has not been filled completely. */
+            return -EINVAL;
+        }
+
+        len = s->get_buffer(s->opaque, (uint8_t *)buf + offset,
+                            0, size - offset);
+        if (len <= 0) {
+            fprintf(stderr, "ft_tranx fill buffer failed\n");
+            s->has_error = 1;
+            return -EINVAL;
+        }
+        offset += len;
+    }
+
+    return 0;
+}
+
+/*
+ * Read the next transaction header (16-bit state followed by 16-bit id).
+ *
+ * On success the received id is stored in s->tranx_id and the received
+ * state (a QEMU_VM_TRANSACTION_* value) is returned. If either field cannot
+ * be read, QEMU_VM_TRANSACTION_CANCEL is returned and s->tranx_id is left
+ * untouched.
+ */
+static int ft_tranx_get_next(QEMUFileFtTranx *s)
+{
+    uint16_t state;
+    uint16_t id;
+
+    if (ft_tranx_fill_buffer(s, &state, sizeof(state)) < 0) {
+        return QEMU_VM_TRANSACTION_CANCEL;
+    }
+
+    if (ft_tranx_fill_buffer(s, &id, sizeof(id)) < 0) {
+        return QEMU_VM_TRANSACTION_CANCEL;
+    }
+
+    s->tranx_id = id;
+
+    return state;
+}
+
+static int ft_tranx_get_buffer(void *opaque, uint8_t *buf,
+ int64_t pos, int size)
+{ /*
+ * QEMUFile get_buffer hook: receive one complete transaction into buf.
+ *
+ * Expects a BEGIN header, then any number of CONTINUE chunks
+ * (header | seq | length | payload), terminated by COMMIT (header | seq).
+ * Payloads are stored back to back starting at buf; when everything has
+ * arrived an ACK header is sent back.
+ *
+ * Returns the total number of payload bytes received, or -1 on failure.
+ * A CANCEL header (also what ft_tranx_get_next() reports when it cannot
+ * read a header) sets ft_mode to FT_OFF; every other failure sets it to
+ * FT_ERROR. The pos and size arguments are not used.
+ */
+ QEMUFileFtTranx *s = opaque;
+ QEMUFile *f = s->file;
+ uint32_t payload_len;
+ int ret = -1, offset;
+
+ /* Read the first header; its state selects how the transaction is handled. */
+ ret = ft_tranx_get_next(s);
+ switch (ret) {
+ case QEMU_VM_TRANSACTION_BEGIN:
+ for (offset = 0;;) { /* offset = payload bytes stored in buf so far */
+ ret = ft_tranx_get_next(s);
+ /* CONTINUE or COMMIT must come after BEGIN */
+ if ((ret != QEMU_VM_TRANSACTION_CONTINUE) &&
+ (ret != QEMU_VM_TRANSACTION_COMMIT)) {
+ goto error_out;
+ }
+
+ if (ft_tranx_fill_buffer(s, &s->seq, sizeof(s->seq)) < 0) { /* seq is stored, not validated */
+ goto error_out;
+ }
+
+ if (ret == QEMU_VM_TRANSACTION_COMMIT) {
+ ret = offset; /* return value: total payload length */
+ dprintf("QEMU_VM_TRANSACTION_COMMIT %d\n", offset);
+ break;
+ }
+
+ if (ft_tranx_fill_buffer(s, &payload_len,
+ sizeof(payload_len)) < 0) {
+ goto error_out;
+ }
+
+ /*
+ * Extend QEMUFile buf if there isn't enough space left for this payload.
+ * NOTE(review): assumes buf initially holds buf_max_size bytes, and that
+ * qemu_realloc_buffer() (defined elsewhere) keeps the existing contents
+ * and returns the new buffer address - confirm.
+ */
+ if (payload_len > (s->buf_max_size - offset)) {
+ s->buf_max_size += (payload_len - (s->buf_max_size - offset));
+ buf = qemu_realloc_buffer(f, s->buf_max_size);
+ }
+
+ if (ft_tranx_fill_buffer(s, buf + offset, payload_len) < 0) {
+ goto error_out;
+ }
+ offset += payload_len;
+ }
+
+ s->tranx_state = QEMU_VM_TRANSACTION_ACK; /* acknowledge the committed transaction */
+ if (ft_tranx_send_header(s) < 0) {
+ goto error_out;
+ }
+ goto out;
+
+ case QEMU_VM_TRANSACTION_ATOMIC:
+ /* not implemented yet */
+ fprintf(stderr, "QEMU_VM_TRANSACTION_ATOMIC not implemented. %d\n",
+ ret);
+ goto error_out;
+
+ case QEMU_VM_TRANSACTION_CANCEL:
+ dprintf("ft transaction canceled %d\n", ret);
+ ret = -1;
+ ft_mode = FT_OFF; /* cancel switches FT off instead of flagging an error */
+ goto out;
+
+ default: /* falls through into error_out below */
+ fprintf(stderr, "unknown QEMU_VM_TRANSACTION_STATE %d\n", ret);
+ }
+
+error_out:
+ ret = -1;
+ ft_mode = FT_ERROR;
+out:
+ return ret;
+}
+
+/*
+ * QEMUFile close hook: close the underlying channel through the close
+ * callback, then free the transaction state.
+ *
+ * Returns whatever the close callback returned.
+ */
+static int ft_tranx_close(void *opaque)
+{
+    QEMUFileFtTranx *s = opaque;
+    int rc;
+
+    dprintf("closing\n");
+
+    rc = s->close(s->opaque);
+    qemu_free(s);
+
+    return rc;
+}
+
+/*
+ * Start a transaction. @opaque is the QEMUFileFtTranx state.
+ *
+ * Receiver: while still in the INIT state, sends an ACK header to tell the
+ * sender it may start, and returns the result of that send. In any other
+ * state it returns -1 without sending anything.
+ *
+ * Sender: on the first call (INIT state) waits for the receiver's ACK, then
+ * sends a BEGIN header and switches to the CONTINUE state. Returns 0 on
+ * success and a negative value on failure.
+ *
+ * The sequence counter is reset to 0 in every case.
+ */
+int qemu_ft_tranx_begin(void *opaque)
+{
+    QEMUFileFtTranx *s = opaque;
+    int rc;
+
+    s->seq = 0;
+
+    if (!s->is_sender) {
+        if (s->tranx_state != QEMU_VM_TRANSACTION_INIT) {
+            return -1;
+        }
+        /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction. */
+        s->tranx_state = QEMU_VM_TRANSACTION_ACK;
+        return ft_tranx_send_header(s);
+    }
+
+    /* Only the very first transaction waits for the receiver's ACK. */
+    if (s->tranx_state == QEMU_VM_TRANSACTION_INIT &&
+        ft_tranx_get_next(s) != QEMU_VM_TRANSACTION_ACK) {
+        fprintf(stderr, "ft_transaction receiving ack failed\n");
+        return -1;
+    }
+
+    s->tranx_state = QEMU_VM_TRANSACTION_BEGIN;
+    rc = ft_tranx_send_header(s);
+    if (rc < 0) {
+        return rc;
+    }
+
+    s->tranx_state = QEMU_VM_TRANSACTION_CONTINUE;
+    return 0;
+}
+
+/*
+ * Finish the current transaction. @opaque is the QEMUFileFtTranx state.
+ *
+ * Receiver: sends an ACK header and returns the result of that send.
+ *
+ * Sender: flushes the QEMUFile, sends a COMMIT header followed by the
+ * current sequence number, then waits for the receiver's ACK. On success
+ * the transaction id is incremented and the received state
+ * (QEMU_VM_TRANSACTION_ACK, a positive value) is returned. A negative value
+ * is returned if sending fails or something other than ACK is received.
+ */
+int qemu_ft_tranx_commit(void *opaque)
+{
+    QEMUFileFtTranx *s = opaque;
+    int rc;
+
+    if (!s->is_sender) {
+        s->tranx_state = QEMU_VM_TRANSACTION_ACK;
+        return ft_tranx_send_header(s);
+    }
+
+    /* flush buf before sending COMMIT */
+    qemu_fflush(s->file);
+
+    s->tranx_state = QEMU_VM_TRANSACTION_COMMIT;
+    rc = ft_tranx_send_header(s);
+    if (rc < 0) {
+        return rc;
+    }
+
+    rc = ft_tranx_flush_buffer(s, &s->seq, sizeof(s->seq));
+    if (rc < 0) {
+        return rc;
+    }
+
+    /* Both sends succeeded, so wait for the receiver's acknowledgement. */
+    rc = ft_tranx_get_next(s);
+    if (rc != QEMU_VM_TRANSACTION_ACK) {
+        fprintf(stderr, "ft_transaction receiving ack failed\n");
+        return -1;
+    }
+
+    s->tranx_id++;
+
+    return rc;
+}
+
+/*
+ * Cancel the current transaction. @opaque is the QEMUFileFtTranx state.
+ *
+ * Only the sender can cancel: it sends a CANCEL header and returns the
+ * result of that send (negative on failure). On the receiver nothing is
+ * sent and -1 is returned.
+ */
+int qemu_ft_tranx_cancel(void *opaque)
+{
+    QEMUFileFtTranx *s = opaque;
+    int rc;
+
+    if (!s->is_sender) {
+        return -1;
+    }
+
+    s->tranx_state = QEMU_VM_TRANSACTION_CANCEL;
+    rc = ft_tranx_send_header(s);
+    if (rc < 0) {
+        fprintf(stderr, "ft cancel failed\n");
+    }
+
+    return rc;
+}
+
+/*
+ * Create a QEMUFile that speaks the FT transaction protocol on top of the
+ * given raw I/O callbacks.
+ *
+ * @opaque is handed back to every callback. @is_sender selects the role
+ * used by the begin/commit/cancel functions. The state is allocated zeroed,
+ * so it starts in QEMU_VM_TRANSACTION_INIT with no error recorded; it is
+ * freed when the file is closed (see ft_tranx_close()).
+ *
+ * Returns the new QEMUFile.
+ */
+QEMUFile *qemu_fopen_ops_ft_tranx(void *opaque,
+                                  FtTranxPutBufferFunc *put_buffer,
+                                  FtTranxPutVectorFunc *put_vector,
+                                  FtTranxGetBufferFunc *get_buffer,
+                                  FtTranxGetVectorFunc *get_vector,
+                                  FtTranxCloseFunc *close,
+                                  int is_sender)
+{
+    QEMUFileFtTranx *s = qemu_mallocz(sizeof(*s));
+
+    /* underlying channel */
+    s->opaque = opaque;
+    s->put_buffer = put_buffer;
+    s->put_vector = put_vector;
+    s->get_buffer = get_buffer;
+    s->get_vector = get_vector;
+    s->close = close;
+
+    /* protocol state */
+    s->is_sender = is_sender;
+    s->buf_max_size = IO_BUF_SIZE;
+    s->tranx_id = 0;
+    s->seq = 0;
+
+    s->file = qemu_fopen_ops(s, ft_tranx_put_buffer, ft_tranx_put_vector,
+                             ft_tranx_get_buffer, NULL, ft_tranx_close,
+                             NULL, NULL, NULL);
+
+    return s->file;
+}
new file mode 100644
@@ -0,0 +1,57 @@
+/*
+ * Fault tolerant VM transaction QEMUFile
+ *
+ * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ *
+ * This source code is based on buffered_file.h.
+ * Copyright IBM, Corp. 2008
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ */
+
+#ifndef QEMU_FT_TRANSACTION_FILE_H
+#define QEMU_FT_TRANSACTION_FILE_H
+
+#include "hw/hw.h"
+
+enum QEMU_VM_TRANSACTION_STATE { /* protocol states; the current value is sent as the first 16 bits of every header */
+ QEMU_VM_TRANSACTION_INIT, /* value 0: state of a freshly (zero-)allocated channel */
+ QEMU_VM_TRANSACTION_BEGIN, /* sent by the sender to open a transaction */
+ QEMU_VM_TRANSACTION_CONTINUE, /* sender state after BEGIN, in effect while data chunks are written */
+ QEMU_VM_TRANSACTION_COMMIT, /* sent by the sender to close a transaction */
+ QEMU_VM_TRANSACTION_CANCEL, /* sent by the sender to abort; also reported when a header cannot be read */
+ QEMU_VM_TRANSACTION_ATOMIC, /* rejected by the receiver as not implemented */
+ QEMU_VM_TRANSACTION_ACK, /* sent by the receiver; the sender waits for it at first begin and at commit */
+ QEMU_VM_TRANSACTION_NACK, /* not referenced by ft_transaction.c */
+};
+
+enum FT_MODE { /* global fault-tolerance mode, held in ft_mode */
+ FT_OFF, /* initial value of ft_mode; also set when a CANCEL is received */
+ FT_INIT, /* not referenced by ft_transaction.c */
+ FT_TRANSACTION, /* not referenced by ft_transaction.c */
+ FT_ERROR, /* set when receiving a transaction fails; stops further reads */
+};
+extern enum FT_MODE ft_mode; /* defined once in a .c file of this patch, initialised to FT_OFF */
+
+typedef ssize_t (FtTranxPutBufferFunc)(void *opaque, const void *data, size_t size); /* raw write; callers treat a result <= 0 as failure */
+typedef ssize_t (FtTranxPutVectorFunc)(void *opaque, const struct iovec *iov, int iovcnt); /* raw gather write; callers treat a result <= 0 as failure */
+typedef QEMUFileGetBufferFunc FtTranxGetBufferFunc; /* raw read; same signature as the QEMUFile get_buffer hook */
+typedef QEMUFileGetVectorFunc FtTranxGetVectorFunc; /* same signature as the QEMUFile get_vector hook */
+typedef int (FtTranxCloseFunc)(void *opaque); /* closes the underlying channel */
+
+int qemu_ft_tranx_begin(void *opaque); /* start a transaction; opaque is the transaction state behind the QEMUFile */
+int qemu_ft_tranx_commit(void *opaque); /* finish a transaction; negative on failure */
+int qemu_ft_tranx_cancel(void *opaque); /* sender only: send CANCEL; returns -1 on the receiver */
+
+QEMUFile *qemu_fopen_ops_ft_tranx(void *opaque, /* handed back to each callback below */
+ FtTranxPutBufferFunc *put_buffer,
+ FtTranxPutVectorFunc *put_vector,
+ FtTranxGetBufferFunc *get_buffer,
+ FtTranxGetVectorFunc *get_vector,
+ FtTranxCloseFunc *close,
+ int is_sender); /* non-zero for the sending side */
+
+#endif
@@ -15,6 +15,7 @@
#include "migration.h"
#include "monitor.h"
#include "buffered_file.h"
+#include "ft_transaction.h"
#include "sysemu.h"
#include "block.h"
#include "qemu_socket.h"
@@ -31,6 +32,8 @@
do { } while (0)
#endif
+enum FT_MODE ft_mode = FT_OFF; /* global fault-tolerance mode (declared extern in ft_transaction.h); starts off */
+
/* Migration speed throttling */
static uint32_t max_throttle = (32 << 20);