diff mbox

[v17,7/8] Implement new driver for block replication

Message ID 1460362979-15146-8-git-send-email-xiecl.fnst@cn.fujitsu.com (mailing list archive)
State New, archived
Headers show

Commit Message

Changlong Xie April 11, 2016, 8:22 a.m. UTC
From: Wen Congyang <wency@cn.fujitsu.com>

Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
Signed-off-by: zhanghailiang <zhang.zhanghailiang@huawei.com>
Signed-off-by: Gonglei <arei.gonglei@huawei.com>
Signed-off-by: Changlong Xie <xiecl.fnst@cn.fujitsu.com>
---
 block/Makefile.objs      |   1 +
 block/replication.c      | 622 +++++++++++++++++++++++++++++++++++++++++++++++
 tests/.gitignore         |   1 +
 tests/Makefile           |   4 +
 tests/test-replication.c | 505 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 1133 insertions(+)
 create mode 100644 block/replication.c
 create mode 100644 tests/test-replication.c

Comments

Stefan Hajnoczi April 13, 2016, 1:02 p.m. UTC | #1
On Mon, Apr 11, 2016 at 04:22:58PM +0800, Changlong Xie wrote:
> +static coroutine_fn int replication_co_writev(BlockDriverState *bs,
> +                                              int64_t sector_num,
> +                                              int remaining_sectors,
> +                                              QEMUIOVector *qiov)
> +{
> +    BDRVReplicationState *s = bs->opaque;
> +    QEMUIOVector hd_qiov;
> +    uint64_t bytes_done = 0;
> +    BdrvChild *top = bs->file;
> +    BdrvChild *base = s->secondary_disk;
> +    BlockDriverState *target;
> +    int ret, n;
> +
> +    ret = replication_get_io_status(s);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    if (ret == 0) {
> +        ret = bdrv_co_writev(top->bs, sector_num,
> +                             remaining_sectors, qiov);
> +        return replication_return_value(s, ret);
> +    }
> +
> +    /*
> +     * Failover failed, only write to active disk if the sectors
> +     * have already been allocated in active disk/hidden disk.
> +     */
> +    qemu_iovec_init(&hd_qiov, qiov->niov);
> +    while (remaining_sectors > 0) {
> +        ret = bdrv_is_allocated_above(top->bs, base->bs, sector_num,
> +                                      remaining_sectors, &n);
> +        if (ret < 0) {
> +            return ret;
> +        }
> +
> +        qemu_iovec_reset(&hd_qiov);
> +        qemu_iovec_concat(&hd_qiov, qiov, bytes_done, n * BDRV_SECTOR_SIZE);
> +
> +        target = ret ? (top->bs) : (base->bs);
> +        ret = bdrv_co_writev(target, sector_num, n, &hd_qiov);
> +        if (ret < 0) {
> +            return ret;
> +        }
> +
> +        remaining_sectors -= n;
> +        sector_num += n;
> +        bytes_done += n * BDRV_SECTOR_SIZE;
> +    }
> +
> +    return 0;

qemu_iovec_destroy(&hd_qiov) is missing (also in error cases).
Changlong Xie April 14, 2016, 3:46 a.m. UTC | #2
On 04/13/2016 09:02 PM, Stefan Hajnoczi wrote:
> On Mon, Apr 11, 2016 at 04:22:58PM +0800, Changlong Xie wrote:
>> +static coroutine_fn int replication_co_writev(BlockDriverState *bs,
>> +                                              int64_t sector_num,
>> +                                              int remaining_sectors,
>> +                                              QEMUIOVector *qiov)
>> +{
>> +    BDRVReplicationState *s = bs->opaque;
>> +    QEMUIOVector hd_qiov;
>> +    uint64_t bytes_done = 0;
>> +    BdrvChild *top = bs->file;
>> +    BdrvChild *base = s->secondary_disk;
>> +    BlockDriverState *target;
>> +    int ret, n;
>> +
>> +    ret = replication_get_io_status(s);
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +
>> +    if (ret == 0) {
>> +        ret = bdrv_co_writev(top->bs, sector_num,
>> +                             remaining_sectors, qiov);
>> +        return replication_return_value(s, ret);
>> +    }
>> +
>> +    /*
>> +     * Failover failed, only write to active disk if the sectors
>> +     * have already been allocated in active disk/hidden disk.
>> +     */
>> +    qemu_iovec_init(&hd_qiov, qiov->niov);
>> +    while (remaining_sectors > 0) {
>> +        ret = bdrv_is_allocated_above(top->bs, base->bs, sector_num,
>> +                                      remaining_sectors, &n);
>> +        if (ret < 0) {
>> +            return ret;
>> +        }
>> +
>> +        qemu_iovec_reset(&hd_qiov);
>> +        qemu_iovec_concat(&hd_qiov, qiov, bytes_done, n * BDRV_SECTOR_SIZE);
>> +
>> +        target = ret ? (top->bs) : (base->bs);
>> +        ret = bdrv_co_writev(target, sector_num, n, &hd_qiov);
>> +        if (ret < 0) {
>> +            return ret;
>> +        }
>> +
>> +        remaining_sectors -= n;
>> +        sector_num += n;
>> +        bytes_done += n * BDRV_SECTOR_SIZE;
>> +    }
>> +
>> +    return 0;
>
> qemu_iovec_destroy(&hd_qiov) is missing (also in error cases).

Yes, it's a memory leak here.

Thanks
	-Xie

>
diff mbox

Patch

diff --git a/block/Makefile.objs b/block/Makefile.objs
index fbfe647..5e28b45 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -23,6 +23,7 @@  block-obj-$(CONFIG_LIBSSH2) += ssh.o
 block-obj-y += accounting.o dirty-bitmap.o
 block-obj-y += write-threshold.o
 block-obj-y += backup.o
+block-obj-y += replication.o
 
 block-obj-y += crypto.o
 
diff --git a/block/replication.c b/block/replication.c
new file mode 100644
index 0000000..8609794
--- /dev/null
+++ b/block/replication.c
@@ -0,0 +1,622 @@ 
+/*
+ * Replication Block filter
+ *
+ * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
+ * Copyright (c) 2016 Intel Corporation
+ * Copyright (c) 2016 FUJITSU LIMITED
+ *
+ * Author:
+ *   Wen Congyang <wency@cn.fujitsu.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu-common.h"
+#include "block/nbd.h"
+#include "block/blockjob.h"
+#include "block/block_int.h"
+#include "qapi/error.h"
+#include "replication.h"
+
+/* Per-node state of the replication driver (stored in bs->opaque). */
+typedef struct BDRVReplicationState {
+    ReplicationMode mode;        /* primary or secondary, parsed in open */
+    int replication_state;       /* one of the BLOCK_REPLICATION_* values */
+    BdrvChild *active_disk;      /* bs->file, set by replication_start() */
+    BdrvChild *hidden_disk;      /* backing child of the active disk */
+    BdrvChild *secondary_disk;   /* backing child of the hidden disk */
+    char *top_id;                /* copy of the "top-id" option (secondary) */
+    ReplicationState *rs;        /* handle returned by replication_new() */
+    Error *blocker;              /* op blocker placed on the top BDS */
+    int orig_hidden_flags;       /* hidden disk flags before r/w reopen */
+    int orig_secondary_flags;    /* secondary disk flags before r/w reopen */
+    int error;                   /* recorded error code, 0 if none */
+} BDRVReplicationState;
+
+/* Values stored in BDRVReplicationState.replication_state. */
+enum {
+    BLOCK_REPLICATION_NONE,             /* block replication is not started */
+    BLOCK_REPLICATION_RUNNING,          /* block replication is running */
+    BLOCK_REPLICATION_FAILOVER,         /* failover is running in background */
+    BLOCK_REPLICATION_FAILOVER_FAILED,  /* failover failed */
+    BLOCK_REPLICATION_DONE,             /* block replication is done */
+};
+
+/*
+ * Forward declarations: these callbacks are referenced by replication_ops
+ * (and by replication_close()) before their definitions.
+ */
+static void replication_start(ReplicationState *rs, ReplicationMode mode,
+                              Error **errp);
+static void replication_do_checkpoint(ReplicationState *rs, Error **errp);
+static void replication_get_error(ReplicationState *rs, Error **errp);
+static void replication_stop(ReplicationState *rs, bool failover,
+                             Error **errp);
+
+/* Names of the runtime options parsed by replication_open(). */
+#define REPLICATION_MODE        "mode"
+#define REPLICATION_TOP_ID      "top-id"
+/*
+ * Runtime option list: "mode" selects primary/secondary, "top-id" names
+ * the top BDS and is required in secondary mode.  Both are strings.
+ */
+static QemuOptsList replication_runtime_opts = {
+    .name = "replication",
+    .head = QTAILQ_HEAD_INITIALIZER(replication_runtime_opts.head),
+    .desc = {
+        {
+            .name = REPLICATION_MODE,
+            .type = QEMU_OPT_STRING,
+        },
+        {
+            .name = REPLICATION_TOP_ID,
+            .type = QEMU_OPT_STRING,
+        },
+        { /* end of list */ }
+    },
+};
+
+/* Callback table handed to replication_new() in replication_open(). */
+static ReplicationOps replication_ops = {
+    .start = replication_start,
+    .checkpoint = replication_do_checkpoint,
+    .get_error = replication_get_error,
+    .stop = replication_stop,
+};
+
+/*
+ * Open callback: parse the "mode" and "top-id" runtime options and create
+ * the ReplicationState for this node through replication_new().
+ *
+ * "mode" must be "primary" or "secondary"; "top-id" is mandatory in
+ * secondary mode.  Returns 0 on success, or -EINVAL with @errp set.
+ */
+static int replication_open(BlockDriverState *bs, QDict *options,
+                            int flags, Error **errp)
+{
+    BDRVReplicationState *s = bs->opaque;
+    QemuOpts *opts;
+    Error *err = NULL;
+    const char *mode_str;
+    int ret = -EINVAL;
+
+    opts = qemu_opts_create(&replication_runtime_opts, NULL, 0, &error_abort);
+    qemu_opts_absorb_qdict(opts, options, &err);
+    if (err) {
+        goto out;
+    }
+
+    mode_str = qemu_opt_get(opts, REPLICATION_MODE);
+    if (mode_str == NULL) {
+        error_setg(&err, "Missing the option mode");
+        goto out;
+    }
+
+    if (strcmp(mode_str, "primary") == 0) {
+        s->mode = REPLICATION_MODE_PRIMARY;
+    } else if (strcmp(mode_str, "secondary") == 0) {
+        s->mode = REPLICATION_MODE_SECONDARY;
+        /* g_strdup(NULL) yields NULL, so this also catches a missing option */
+        s->top_id = g_strdup(qemu_opt_get(opts, REPLICATION_TOP_ID));
+        if (s->top_id == NULL) {
+            error_setg(&err, "Missing the option top-id");
+            goto out;
+        }
+    } else {
+        error_setg(&err,
+                   "The option mode's value should be primary or secondary");
+        goto out;
+    }
+
+    s->rs = replication_new(bs, &replication_ops);
+    ret = 0;
+
+out:
+    qemu_opts_del(opts);
+    error_propagate(errp, err);
+
+    return ret;
+}
+
+/*
+ * Close callback: stop a still-running replication, then release the
+ * driver's resources.
+ *
+ * s->top_id must stay valid until replication_stop() has returned:
+ * stopping cancels the backup job, and the job's completion path
+ * (backup_job_completed() -> backup_job_cleanup()) looks up the top BDS
+ * through s->top_id.  Freeing it first would be a use-after-free.
+ */
+static void replication_close(BlockDriverState *bs)
+{
+    BDRVReplicationState *s = bs->opaque;
+
+    if (s->replication_state == BLOCK_REPLICATION_RUNNING) {
+        replication_stop(s->rs, false, NULL);
+    }
+
+    if (s->mode == REPLICATION_MODE_SECONDARY) {
+        g_free(s->top_id);
+        s->top_id = NULL;
+    }
+
+    replication_remove(s->rs);
+}
+
+/* Report the length of the underlying file child as this node's length. */
+static int64_t replication_getlength(BlockDriverState *bs)
+{
+    BlockDriverState *file_bs = bs->file->bs;
+
+    return bdrv_getlength(file_bs);
+}
+
+/*
+ * Map the current replication state to an I/O disposition:
+ *   -EIO  I/O is not allowed in this state/mode
+ *   0     I/O may be issued to bs->file directly
+ *   1     failover failed on the secondary side
+ */
+static int replication_get_io_status(BDRVReplicationState *s)
+{
+    const bool primary = (s->mode == REPLICATION_MODE_PRIMARY);
+
+    switch (s->replication_state) {
+    case BLOCK_REPLICATION_NONE:
+        return -EIO;
+    case BLOCK_REPLICATION_RUNNING:
+        return 0;
+    case BLOCK_REPLICATION_FAILOVER_FAILED:
+        return primary ? -EIO : 1;
+    case BLOCK_REPLICATION_FAILOVER:
+    case BLOCK_REPLICATION_DONE:
+        /*
+         * For DONE: the active commit job has completed and the active
+         * disk and secondary_disk are swapped, so bs->file can be
+         * operated on directly.
+         */
+        return primary ? -EIO : 0;
+    default:
+        abort();
+    }
+}
+
+/*
+ * Translate an I/O result into what the caller should see.
+ *
+ * In secondary mode @ret is returned unchanged.  Otherwise a negative
+ * @ret is recorded in s->error and reported to the caller as success (0).
+ */
+static int replication_return_value(BDRVReplicationState *s, int ret)
+{
+    if (s->mode != REPLICATION_MODE_SECONDARY && ret < 0) {
+        s->error = ret;
+        return 0;
+    }
+
+    return ret;
+}
+
+/*
+ * Read callback.  The primary node always fails reads with -EIO; on the
+ * secondary the request is forwarded to bs->file when the current
+ * replication state permits I/O.
+ */
+static coroutine_fn int replication_co_readv(BlockDriverState *bs,
+                                             int64_t sector_num,
+                                             int remaining_sectors,
+                                             QEMUIOVector *qiov)
+{
+    BDRVReplicationState *s = bs->opaque;
+    int status;
+
+    /* The primary node is only used to forward primary write requests */
+    if (s->mode == REPLICATION_MODE_PRIMARY) {
+        return -EIO;
+    }
+
+    status = replication_get_io_status(s);
+    if (status < 0) {
+        return status;
+    }
+
+    return replication_return_value(s, bdrv_co_readv(bs->file->bs,
+                                                     sector_num,
+                                                     remaining_sectors,
+                                                     qiov));
+}
+
+/*
+ * Write callback.
+ *
+ * When replication_get_io_status() returns 0 the request is forwarded to
+ * bs->file unchanged.  When it returns 1 (failover failed) the request is
+ * split into runs according to bdrv_is_allocated_above(): runs already
+ * allocated above the secondary disk go to the active disk (bs->file),
+ * the others are written to the secondary disk.
+ *
+ * Returns 0 on success or a negative errno.  The temporary I/O vector is
+ * released on every exit path.
+ */
+static coroutine_fn int replication_co_writev(BlockDriverState *bs,
+                                              int64_t sector_num,
+                                              int remaining_sectors,
+                                              QEMUIOVector *qiov)
+{
+    BDRVReplicationState *s = bs->opaque;
+    QEMUIOVector hd_qiov;
+    uint64_t bytes_done = 0;
+    BdrvChild *top = bs->file;
+    BdrvChild *base = s->secondary_disk;
+    BlockDriverState *target;
+    int ret, n;
+
+    ret = replication_get_io_status(s);
+    if (ret < 0) {
+        return ret;
+    }
+
+    if (ret == 0) {
+        ret = bdrv_co_writev(top->bs, sector_num,
+                             remaining_sectors, qiov);
+        return replication_return_value(s, ret);
+    }
+
+    /*
+     * Failover failed, only write to active disk if the sectors
+     * have already been allocated in active disk/hidden disk.
+     */
+    qemu_iovec_init(&hd_qiov, qiov->niov);
+    while (remaining_sectors > 0) {
+        ret = bdrv_is_allocated_above(top->bs, base->bs, sector_num,
+                                      remaining_sectors, &n);
+        if (ret < 0) {
+            goto out;
+        }
+
+        /* hd_qiov covers only the next n sectors of the caller's vector */
+        qemu_iovec_reset(&hd_qiov);
+        qemu_iovec_concat(&hd_qiov, qiov, bytes_done, n * BDRV_SECTOR_SIZE);
+
+        target = ret ? (top->bs) : (base->bs);
+        ret = bdrv_co_writev(target, sector_num, n, &hd_qiov);
+        if (ret < 0) {
+            goto out;
+        }
+
+        remaining_sectors -= n;
+        sector_num += n;
+        bytes_done += n * BDRV_SECTOR_SIZE;
+    }
+    ret = 0;
+
+out:
+    /* hd_qiov was set up by qemu_iovec_init(); always tear it down */
+    qemu_iovec_destroy(&hd_qiov);
+    return ret;
+}
+
+/* Filter driver: delegate the "first non-filter" query to the file child. */
+static bool replication_recurse_is_first_non_filter(BlockDriverState *bs,
+                                                    BlockDriverState *candidate)
+{
+    BlockDriverState *file_bs = bs->file->bs;
+
+    return bdrv_recurse_is_first_non_filter(file_bs, candidate);
+}
+
+/*
+ * Secondary-side checkpoint: checkpoint the backup job running on the
+ * secondary disk, then empty the active disk and the hidden disk (in that
+ * order).  The first failure is reported through @errp and stops the
+ * sequence.
+ */
+static void secondary_do_checkpoint(BDRVReplicationState *s, Error **errp)
+{
+    BlockDriverState *active_bs, *hidden_bs;
+    Error *err = NULL;
+
+    if (s->secondary_disk->bs->job == NULL) {
+        error_setg(errp, "Backup job was cancelled unexpectedly");
+        return;
+    }
+
+    backup_do_checkpoint(s->secondary_disk->bs->job, &err);
+    if (err) {
+        error_propagate(errp, err);
+        return;
+    }
+
+    active_bs = s->active_disk->bs;
+    if (active_bs->drv->bdrv_make_empty(active_bs) < 0) {
+        error_setg(errp, "Cannot make active disk empty");
+        return;
+    }
+
+    hidden_bs = s->hidden_disk->bs;
+    if (hidden_bs->drv->bdrv_make_empty(hidden_bs) < 0) {
+        error_setg(errp, "Cannot make hidden disk empty");
+    }
+}
+
+/*
+ * Queue and perform a reopen of the hidden and secondary disks.
+ *
+ * writable == true:  remember each disk's current flags in @s and request
+ *                    BDRV_O_RDWR with BDRV_O_INACTIVE cleared.
+ * writable == false: request the flags remembered by the earlier
+ *                    writable == true call.
+ *
+ * Any error from bdrv_reopen_multiple() is propagated through @errp.
+ */
+static void reopen_backing_file(BDRVReplicationState *s, bool writable,
+                                Error **errp)
+{
+    BlockReopenQueue *reopen_queue = NULL;
+    int orig_hidden_flags, orig_secondary_flags;
+    int new_hidden_flags, new_secondary_flags;
+    Error *local_err = NULL;
+
+    if (writable) {
+        /* save the current flags so they can be restored later */
+        orig_hidden_flags = s->orig_hidden_flags =
+                                bdrv_get_flags(s->hidden_disk->bs);
+        new_hidden_flags = (orig_hidden_flags | BDRV_O_RDWR) &
+                                                    ~BDRV_O_INACTIVE;
+        orig_secondary_flags = s->orig_secondary_flags =
+                                bdrv_get_flags(s->secondary_disk->bs);
+        new_secondary_flags = (orig_secondary_flags | BDRV_O_RDWR) &
+                                                     ~BDRV_O_INACTIVE;
+    } else {
+        /* "orig" is the r/w state set up before, "new" the saved flags */
+        orig_hidden_flags = (s->orig_hidden_flags | BDRV_O_RDWR) &
+                                                    ~BDRV_O_INACTIVE;
+        new_hidden_flags = s->orig_hidden_flags;
+        orig_secondary_flags = (s->orig_secondary_flags | BDRV_O_RDWR) &
+                                                    ~BDRV_O_INACTIVE;
+        new_secondary_flags = s->orig_secondary_flags;
+    }
+
+    /* the hidden disk is reopened only when its flags actually change */
+    if (orig_hidden_flags != new_hidden_flags) {
+        reopen_queue = bdrv_reopen_queue(reopen_queue, s->hidden_disk->bs, NULL,
+                                         new_hidden_flags);
+    }
+
+    /*
+     * NOTE(review): when writable == false, orig_secondary_flags always has
+     * BDRV_O_RDWR set (see the else branch above), so this condition is
+     * never true and the secondary disk's saved flags are not restored.
+     * Confirm whether that asymmetry with the hidden disk is intended.
+     */
+    if (!(orig_secondary_flags & BDRV_O_RDWR)) {
+        reopen_queue = bdrv_reopen_queue(reopen_queue, s->secondary_disk->bs,
+                                         NULL, new_secondary_flags);
+    }
+
+    if (reopen_queue) {
+        bdrv_reopen_multiple(reopen_queue, &local_err);
+        error_propagate(errp, local_err);
+    }
+}
+
+/*
+ * Undo what replication_start() set up around the backup job: lift the op
+ * blocker from the top BDS, free it, and request the backing files'
+ * saved open flags again.  Does nothing if the top BDS cannot be found.
+ */
+static void backup_job_cleanup(BDRVReplicationState *s)
+{
+    BlockDriverState *top_bs;
+
+    top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
+    if (!top_bs) {
+        return;
+    }
+    bdrv_op_unblock_all(top_bs, s->blocker);
+    error_free(s->blocker);
+    /*
+     * Do not leave a dangling pointer behind: replication_start() creates
+     * the blocker with error_setg(&s->blocker, ...), which requires the
+     * destination to be NULL.
+     */
+    s->blocker = NULL;
+    reopen_backing_file(s, false, NULL);
+}
+
+/*
+ * Completion callback of the backup job started by replication_start().
+ * A completion outside of a failover is treated as an error; cleanup is
+ * performed in every case.
+ */
+static void backup_job_completed(void *opaque, int ret)
+{
+    BDRVReplicationState *s = opaque;
+    bool in_failover = (s->replication_state == BLOCK_REPLICATION_FAILOVER);
+
+    if (!in_failover) {
+        /* The backup job is cancelled unexpectedly */
+        s->error = -EIO;
+    }
+
+    backup_job_cleanup(s);
+}
+
+/*
+ * ReplicationOps.start callback.
+ *
+ * Only valid in state BLOCK_REPLICATION_NONE and when @mode matches the
+ * mode the node was opened with.  The primary side just switches to the
+ * running state.  The secondary side:
+ *   1. resolves active disk (bs->file), hidden disk and secondary disk
+ *      from the backing chain and validates them,
+ *   2. reopens the backing files read-write,
+ *   3. blocks operations on the top BDS and starts a backup job from the
+ *      secondary disk to the hidden disk,
+ *   4. performs an initial checkpoint.
+ *
+ * Errors are reported through @errp.  The AioContext of the node is held
+ * for the duration of the call and released on every return path.
+ */
+static void replication_start(ReplicationState *rs, ReplicationMode mode,
+                              Error **errp)
+{
+    BlockDriverState *bs = rs->opaque;
+    BDRVReplicationState *s;
+    BlockDriverState *top_bs;
+    int64_t active_length, hidden_length, disk_length;
+    AioContext *aio_context;
+    Error *local_err = NULL;
+
+    aio_context = bdrv_get_aio_context(bs);
+    aio_context_acquire(aio_context);
+    s = bs->opaque;
+
+    if (s->replication_state != BLOCK_REPLICATION_NONE) {
+        error_setg(errp, "Block replication is running or done");
+        goto out;
+    }
+
+    if (s->mode != mode) {
+        error_setg(errp, "The parameter mode's value is invalid, needs %d,"
+                   " but got %d", s->mode, mode);
+        goto out;
+    }
+
+    switch (s->mode) {
+    case REPLICATION_MODE_PRIMARY:
+        break;
+    case REPLICATION_MODE_SECONDARY:
+        s->active_disk = bs->file;
+        if (!s->active_disk || !s->active_disk->bs ||
+                                    !s->active_disk->bs->backing) {
+            error_setg(errp, "Active disk doesn't have backing file");
+            goto out;
+        }
+
+        s->hidden_disk = s->active_disk->bs->backing;
+        if (!s->hidden_disk->bs || !s->hidden_disk->bs->backing) {
+            error_setg(errp, "Hidden disk doesn't have backing file");
+            goto out;
+        }
+
+        s->secondary_disk = s->hidden_disk->bs->backing;
+        if (!s->secondary_disk->bs || !s->secondary_disk->bs->blk) {
+            error_setg(errp, "The secondary disk doesn't have block backend");
+            goto out;
+        }
+
+        /* verify the length */
+        active_length = bdrv_getlength(s->active_disk->bs);
+        hidden_length = bdrv_getlength(s->hidden_disk->bs);
+        disk_length = bdrv_getlength(s->secondary_disk->bs);
+        if (active_length < 0 || hidden_length < 0 || disk_length < 0 ||
+            active_length != hidden_length || hidden_length != disk_length) {
+            error_setg(errp, "active disk, hidden disk, secondary disk's length"
+                       " are not the same");
+            goto out;
+        }
+
+        if (!s->active_disk->bs->drv->bdrv_make_empty ||
+            !s->hidden_disk->bs->drv->bdrv_make_empty) {
+            error_setg(errp,
+                       "active disk or hidden disk doesn't support make_empty");
+            goto out;
+        }
+
+        /* reopen the backing file in r/w mode */
+        reopen_backing_file(s, true, &local_err);
+        if (local_err) {
+            error_propagate(errp, local_err);
+            goto out;
+        }
+
+        top_bs = bdrv_lookup_bs(s->top_id, s->top_id, errp);
+        if (!top_bs) {
+            /* nothing else was set up yet: just undo the r/w reopen */
+            reopen_backing_file(s, false, NULL);
+            goto out;
+        }
+
+        /*
+         * start backup job now; the blocker is only created once the top
+         * BDS is known to exist, so it cannot be leaked on lookup failure
+         */
+        error_setg(&s->blocker,
+                   "block device is in use by internal backup job");
+        bdrv_op_block_all(top_bs, s->blocker);
+        bdrv_op_unblock(top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker);
+
+        /*
+         * Must protect backup target if backup job was stopped/cancelled
+         * unexpectedly
+         */
+        bdrv_ref(s->hidden_disk->bs);
+
+        backup_start(s->secondary_disk->bs, s->hidden_disk->bs, 0,
+                     MIRROR_SYNC_MODE_NONE, NULL, BLOCKDEV_ON_ERROR_REPORT,
+                     BLOCKDEV_ON_ERROR_REPORT, backup_job_completed,
+                     s, NULL, &local_err);
+        if (local_err) {
+            error_propagate(errp, local_err);
+            backup_job_cleanup(s);
+            bdrv_unref(s->hidden_disk->bs);
+            goto out;
+        }
+        break;
+    default:
+        aio_context_release(aio_context);
+        abort();
+    }
+
+    s->replication_state = BLOCK_REPLICATION_RUNNING;
+
+    if (s->mode == REPLICATION_MODE_SECONDARY) {
+        secondary_do_checkpoint(s, errp);
+    }
+
+    s->error = 0;
+
+out:
+    aio_context_release(aio_context);
+}
+
+/*
+ * ReplicationOps.checkpoint callback: run the secondary-side checkpoint
+ * under the node's AioContext.  Nothing is done in primary mode.
+ */
+static void replication_do_checkpoint(ReplicationState *rs, Error **errp)
+{
+    BlockDriverState *bs = rs->opaque;
+    AioContext *ctx = bdrv_get_aio_context(bs);
+    BDRVReplicationState *s;
+
+    aio_context_acquire(ctx);
+    s = bs->opaque;
+    if (s->mode == REPLICATION_MODE_SECONDARY) {
+        secondary_do_checkpoint(s, errp);
+    }
+    aio_context_release(ctx);
+}
+
+/*
+ * ReplicationOps.get_error callback: set @errp if replication is not
+ * running, or if an error has been recorded in s->error.
+ */
+static void replication_get_error(ReplicationState *rs, Error **errp)
+{
+    BlockDriverState *bs = rs->opaque;
+    AioContext *ctx = bdrv_get_aio_context(bs);
+    BDRVReplicationState *s;
+
+    aio_context_acquire(ctx);
+    s = bs->opaque;
+
+    if (s->replication_state != BLOCK_REPLICATION_RUNNING) {
+        error_setg(errp, "Block replication is not running");
+    } else if (s->error) {
+        error_setg(errp, "I/O error occurred");
+    }
+
+    aio_context_release(ctx);
+}
+
+/*
+ * Completion callback of the active commit job started on failover.
+ * @ret == 0 marks replication as done and drops the cached child
+ * pointers; any other value marks the failover as failed.
+ */
+static void replication_done(void *opaque, int ret)
+{
+    BlockDriverState *bs = opaque;
+    BDRVReplicationState *s = bs->opaque;
+
+    if (ret != 0) {
+        s->replication_state = BLOCK_REPLICATION_FAILOVER_FAILED;
+        s->error = -EIO;
+        return;
+    }
+
+    s->replication_state = BLOCK_REPLICATION_DONE;
+
+    /* refresh top bs's filename */
+    bdrv_refresh_filename(bs);
+    s->active_disk = NULL;
+    s->secondary_disk = NULL;
+    s->hidden_disk = NULL;
+    s->error = 0;
+}
+
+/*
+ * ReplicationOps.stop callback (also called from replication_close()).
+ *
+ * Only valid while the state is BLOCK_REPLICATION_RUNNING.  On the primary
+ * the state simply becomes BLOCK_REPLICATION_DONE.  On the secondary:
+ *   - failover == false: cancel the backup job synchronously, run a final
+ *     checkpoint and mark replication as done;
+ *   - failover == true:  enter BLOCK_REPLICATION_FAILOVER, cancel the
+ *     backup job and start an active commit from the active disk into the
+ *     secondary disk, with replication_done() as completion callback.
+ *
+ * Errors are reported through @errp.
+ */
+static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
+{
+    BlockDriverState *bs = rs->opaque;
+    BDRVReplicationState *s;
+    AioContext *aio_context;
+
+    aio_context = bdrv_get_aio_context(bs);
+    aio_context_acquire(aio_context);
+    s = bs->opaque;
+
+    if (s->replication_state != BLOCK_REPLICATION_RUNNING) {
+        error_setg(errp, "Block replication is not running");
+        aio_context_release(aio_context);
+        return;
+    }
+
+    switch (s->mode) {
+    case REPLICATION_MODE_PRIMARY:
+        s->replication_state = BLOCK_REPLICATION_DONE;
+        s->error = 0;
+        break;
+    case REPLICATION_MODE_SECONDARY:
+        if (!failover) {
+            /*
+             * This BDS will be closed, and the job should be completed
+             * before the BDS is closed, because we will access hidden
+             * disk, secondary disk in backup_job_completed().
+             */
+            if (s->secondary_disk->bs->job) {
+                block_job_cancel_sync(s->secondary_disk->bs->job);
+            }
+            secondary_do_checkpoint(s, errp);
+            s->replication_state = BLOCK_REPLICATION_DONE;
+            aio_context_release(aio_context);
+            return;
+        }
+
+        /* state is switched first so backup_job_completed() sees FAILOVER */
+        s->replication_state = BLOCK_REPLICATION_FAILOVER;
+        if (s->secondary_disk->bs->job) {
+            block_job_cancel(s->secondary_disk->bs->job);
+        }
+
+        commit_active_start(s->active_disk->bs, s->secondary_disk->bs, 0,
+                            BLOCKDEV_ON_ERROR_REPORT, replication_done,
+                            bs, errp, true);
+        break;
+    default:
+        aio_context_release(aio_context);
+        abort();
+    }
+    aio_context_release(aio_context);
+}
+
+/*
+ * Driver description for the "replication" filter: open/close, length and
+ * vectored read/write are implemented above; the driver is flagged as a
+ * filter with variable length.
+ */
+BlockDriver bdrv_replication = {
+    .format_name                = "replication",
+    .protocol_name              = "replication",
+    .instance_size              = sizeof(BDRVReplicationState),
+
+    .bdrv_open                  = replication_open,
+    .bdrv_close                 = replication_close,
+
+    .bdrv_getlength             = replication_getlength,
+    .bdrv_co_readv              = replication_co_readv,
+    .bdrv_co_writev             = replication_co_writev,
+
+    .is_filter                  = true,
+    .bdrv_recurse_is_first_non_filter = replication_recurse_is_first_non_filter,
+
+    .has_variable_length        = true,
+};
+
+/* Register the replication driver; hooked up through block_init() below. */
+static void bdrv_replication_init(void)
+{
+    bdrv_register(&bdrv_replication);
+}
+
+block_init(bdrv_replication_init);
diff --git a/tests/.gitignore b/tests/.gitignore
index 9eed229..a303aaa 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -57,6 +57,7 @@  test-qmp-introspect.[ch]
 test-qmp-marshal.c
 test-qmp-output-visitor
 test-rcu-list
+test-replication
 test-rfifolock
 test-string-input-visitor
 test-string-output-visitor
diff --git a/tests/Makefile b/tests/Makefile
index 9de9598..babd2de 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -99,6 +99,7 @@  check-unit-y += tests/test-crypto-xts$(EXESUF)
 check-unit-y += tests/test-crypto-block$(EXESUF)
 gcov-files-test-logging-y = tests/test-logging.c
 check-unit-y += tests/test-logging$(EXESUF)
+check-unit-y += tests/test-replication$(EXESUF)
 
 check-block-$(CONFIG_POSIX) += tests/qemu-iotests-quick.sh
 
@@ -442,6 +443,9 @@  tests/test-base64$(EXESUF): tests/test-base64.o \
 
 tests/test-logging$(EXESUF): tests/test-logging.o $(test-util-obj-y)
 
+tests/test-replication$(EXESUF): tests/test-replication.o $(test-util-obj-y) \
+	$(test-block-obj-y)
+
 tests/test-qapi-types.c tests/test-qapi-types.h :\
 $(SRC_PATH)/tests/qapi-schema/qapi-schema-test.json $(SRC_PATH)/scripts/qapi-types.py $(qapi-py)
 	$(call quiet-command,$(PYTHON) $(SRC_PATH)/scripts/qapi-types.py \
diff --git a/tests/test-replication.c b/tests/test-replication.c
new file mode 100644
index 0000000..0d1f346
--- /dev/null
+++ b/tests/test-replication.c
@@ -0,0 +1,505 @@ 
+/*
+ * Block replication tests
+ *
+ * Copyright (c) 2016 FUJITSU LIMITED
+ * Author: Changlong Xie <xiecl.fnst@cn.fujitsu.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+
+#include "qapi/error.h"
+#include "replication.h"
+#include "block/block_int.h"
+#include "sysemu/block-backend.h"
+
+/*
+ * Size in bytes passed to bdrv_img_create() for every test image; the
+ * tests also use it (and half of it) as their I/O ranges.
+ */
+#define IMG_SIZE (64 * 1024 * 1024)
+
+/*
+ * primary
+ *
+ * NOTE(review): the ".XXXXXX" suffix looks like a mkstemp() template, but
+ * these strings are used verbatim as file names by prepare_imgs() and
+ * cleanup_imgs() -- confirm whether unique temporary names were intended.
+ */
+#define P_LOCAL_DISK "/tmp/p_local_disk.XXXXXX"
+/* option string parsed by start_primary() */
+#define P_COMMAND "driver=replication,mode=primary,node-name=xxx,"\
+                  "file.driver=qcow2,file.file.filename="P_LOCAL_DISK
+
+/* secondary */
+#define S_LOCAL_DISK "/tmp/s_local_disk.XXXXXX"
+#define S_ACTIVE_DISK "/tmp/s_active_disk.XXXXXX"
+#define S_HIDDEN_DISK "/tmp/s_hidden_disk.XXXXXX"
+/* names under which start_secondary() registers its two BlockBackends */
+#define S_ID "secondary-id"
+#define S_LOCAL_DISK_ID "secondary-local-disk-id"
+/* option string for the BlockBackend registered as S_LOCAL_DISK_ID */
+#define S_COMMAND1 "file.filename="S_LOCAL_DISK",driver=qcow2"
+/*
+ * Option string for the BlockBackend registered as S_ID: a replication
+ * node over S_ACTIVE_DISK, whose backing file is S_HIDDEN_DISK, whose
+ * backing reference in turn is S_LOCAL_DISK_ID.
+ */
+#define S_COMMAND2 "driver=replication,mode=secondary,top-id="S_ID","\
+                   "file.driver=qcow2,file.file.filename="S_ACTIVE_DISK","\
+                   "file.backing.driver=qcow2,file.backing.file.filename="\
+                   ""S_HIDDEN_DISK",file.backing.backing="S_LOCAL_DISK_ID
+
+/*
+ * Local "drive" option list used to parse P_COMMAND, S_COMMAND1 and
+ * S_COMMAND2 in this test. The descriptor table is left empty.
+ *
+ * FIXME: copied from blockdev.c; is there a better way to share it?
+ */
+QemuOptsList qemu_drive_opts = {
+    .name = "drive",
+    .head = QTAILQ_HEAD_INITIALIZER(qemu_drive_opts.head),
+    .desc = {
+        { /* end of list */ }
+    },
+};
+
+/*
+ * Read @count bytes at byte @offset from @bs and check the outcome.
+ *
+ * @pattern:        byte value the data is compared against; 0 skips the
+ *                  comparison
+ * @pattern_offset: byte offset into the read buffer where the comparison
+ *                  starts
+ * @pattern_count:  number of bytes compared
+ * @offset, @count: byte range to read; both are converted to 512-byte
+ *                  sectors with ">> 9"
+ * @expect_failed:  true if bdrv_read() must fail, false if it must succeed
+ */
+static void io_read(BlockDriverState *bs, long pattern, int64_t pattern_offset,
+                    int64_t pattern_count, int64_t offset, int64_t count,
+                    bool expect_failed)
+{
+    char *buf;
+    void *cmp_buf = NULL;
+    int ret;
+
+    /* 1. alloc pattern buffer */
+    if (pattern) {
+        cmp_buf = g_malloc(pattern_count);
+        memset(cmp_buf, pattern, pattern_count);
+    }
+
+    /* 2. alloc read buffer, pre-filled with 0xab (not a test pattern) */
+    buf = qemu_blockalign(bs, count);
+    memset(buf, 0xab, count);
+
+    /* 3. do read */
+    ret = bdrv_read(bs, offset >> 9, (uint8_t *)buf, count >> 9);
+
+    /* 4. assert and compare buf */
+    if (expect_failed) {
+        g_assert(ret < 0);
+    } else {
+        g_assert(ret >= 0);
+        if (pattern) {
+            /*
+             * NOTE(review): "<= 0" also passes when the data read sorts
+             * below the pattern; "== 0" may have been intended. Left
+             * unchanged because existing callers may depend on it.
+             */
+            g_assert(memcmp(buf + pattern_offset, cmp_buf, pattern_count) <= 0);
+        }
+    }
+
+    /* release both buffers on every path; g_free(NULL) is a no-op */
+    g_free(cmp_buf);
+    /* memory from qemu_blockalign() must be released with qemu_vfree() */
+    qemu_vfree(buf);
+}
+
+/*
+ * Write @count bytes at byte @offset to @bs and check the outcome.
+ *
+ * @pattern:        byte value to write; 0 issues bdrv_write_zeroes()
+ *                  instead of writing a buffer
+ * @pattern_count:  size in bytes of the pattern buffer; must be at least
+ *                  @count
+ * @offset, @count: byte range to write; both are converted to 512-byte
+ *                  sectors with ">> 9"
+ * @expect_failed:  true if the write must fail, false if it must succeed
+ */
+static void io_write(BlockDriverState *bs, long pattern, int64_t pattern_count,
+                     int64_t offset, int64_t count, bool expect_failed)
+{
+    void *pattern_buf = NULL;
+    int ret;
+
+    /* 1. alloc pattern buffer and 2. do write */
+    if (pattern) {
+        /* bdrv_write() reads @count bytes out of the pattern buffer */
+        g_assert(pattern_count >= count);
+        pattern_buf = g_malloc(pattern_count);
+        memset(pattern_buf, pattern, pattern_count);
+        ret = bdrv_write(bs, offset >> 9, (uint8_t *)pattern_buf, count >> 9);
+    } else {
+        ret = bdrv_write_zeroes(bs, offset >> 9, count >> 9, 0);
+    }
+
+    /* 3. assert */
+    if (expect_failed) {
+        g_assert(ret < 0);
+    } else {
+        g_assert(ret >= 0);
+    }
+
+    /*
+     * Free on every path. The pointer starts out NULL, so this is safe
+     * for the zero-pattern case where no buffer was allocated.
+     */
+    g_free(pattern_buf);
+}
+
+/*
+ * Create the four qcow2 images used by the tests (one primary, three
+ * secondary), each IMG_SIZE bytes. Any creation error aborts the test.
+ */
+static void prepare_imgs(void)
+{
+    /* primary image first, then the secondary local/active/hidden images */
+    static const char *const img_names[] = {
+        P_LOCAL_DISK,
+        S_LOCAL_DISK,
+        S_ACTIVE_DISK,
+        S_HIDDEN_DISK,
+    };
+    Error *err = NULL;
+    size_t idx;
+
+    for (idx = 0; idx < sizeof(img_names) / sizeof(img_names[0]); idx++) {
+        bdrv_img_create(img_names[idx], "qcow2", NULL, NULL, NULL, IMG_SIZE,
+                        BDRV_O_RDWR, &err, true);
+        g_assert(!err);
+    }
+}
+
+/*
+ * Unlink the image files created by prepare_imgs(). The result of
+ * unlink() is not checked.
+ */
+static void cleanup_imgs(void)
+{
+    /* primary image first, then the secondary local/active/hidden images */
+    static const char *const img_names[] = {
+        P_LOCAL_DISK,
+        S_LOCAL_DISK,
+        S_ACTIVE_DISK,
+        S_HIDDEN_DISK,
+    };
+    size_t idx;
+
+    for (idx = 0; idx < sizeof(img_names) / sizeof(img_names[0]); idx++) {
+        unlink(img_names[idx]);
+    }
+}
+
+/*
+ * Open the primary replication node described by P_COMMAND.
+ *
+ * The node is opened as a bare BlockDriverState (no BlockBackend is
+ * attached). Returns the opened BDS; callers release it with
+ * teardown_primary().
+ */
+static BlockDriverState *start_primary(void)
+{
+    Error *err = NULL;
+    BlockDriverState *primary;
+    QemuOpts *drive_opts;
+    QDict *options;
+    int rc;
+
+    /* bare BDS, no BB */
+    primary = bdrv_new();
+    g_assert(primary);
+
+    /* turn the option string into the QDict handed to bdrv_open() */
+    drive_opts = qemu_opts_parse_noisily(&qemu_drive_opts, P_COMMAND, false);
+    options = qemu_opts_to_qdict(drive_opts, NULL);
+
+    /* both cache options default to "off" unless already set */
+    qdict_set_default_str(options, BDRV_OPT_CACHE_DIRECT, "off");
+    qdict_set_default_str(options, BDRV_OPT_CACHE_NO_FLUSH, "off");
+
+    rc = bdrv_open(&primary, NULL, NULL, options, BDRV_O_RDWR, &err);
+    g_assert(rc >= 0);
+    g_assert(!err);
+
+    qemu_opts_del(drive_opts);
+
+    return primary;
+}
+
+/* Drop the reference held on @bs (the node opened by start_primary()). */
+static void teardown_primary(BlockDriverState *bs)
+{
+    /* only destroy BS, since we didn't initialize BB in Primary */
+    bdrv_unref(bs);
+}
+
+/*
+ * Primary, replication not started: a read of the whole image through
+ * the replication node is expected to fail.
+ */
+static void test_primary_read(void)
+{
+    BlockDriverState *primary = start_primary();
+
+    /* pattern 0: no data comparison, only the failure is asserted */
+    io_read(primary, 0, 0, IMG_SIZE, 0, IMG_SIZE, true);
+
+    teardown_primary(primary);
+}
+
+/*
+ * Primary, replication not started: a write covering the whole image
+ * through the replication node is expected to fail.
+ */
+static void test_primary_write(void)
+{
+    BlockDriverState *primary = start_primary();
+
+    /* pattern 0: io_write() issues a write-zeroes request */
+    io_write(primary, 0, IMG_SIZE, 0, IMG_SIZE, true);
+
+    teardown_primary(primary);
+}
+
+/*
+ * Primary, replication started: reads through the replication node are
+ * still expected to fail, while a full-image write of 0x22 is expected
+ * to succeed.
+ */
+static void test_primary_start(void)
+{
+    Error *err = NULL;
+    BlockDriverState *primary = start_primary();
+
+    replication_start_all(REPLICATION_MODE_PRIMARY, &err);
+    g_assert(!err);
+
+    /* whole-image read: must fail */
+    io_read(primary, 0, 0, IMG_SIZE, 0, IMG_SIZE, true);
+
+    /* whole-image write of pattern 0x22: must succeed */
+    io_write(primary, 0x22, IMG_SIZE, 0, IMG_SIZE, false);
+
+    teardown_primary(primary);
+}
+
+/*
+ * Primary: start replication, then stop it with failover requested.
+ * Neither call may report an error.
+ */
+static void test_primary_stop(void)
+{
+    Error *err = NULL;
+    BlockDriverState *primary = start_primary();
+
+    replication_start_all(REPLICATION_MODE_PRIMARY, &err);
+    g_assert(!err);
+
+    /* failover = true */
+    replication_stop_all(true, &err);
+    g_assert(!err);
+
+    teardown_primary(primary);
+}
+
+/*
+ * Primary: request a checkpoint without having started replication.
+ * The call must not report an error.
+ */
+static void test_primary_do_checkpoint(void)
+{
+    Error *err = NULL;
+    BlockDriverState *primary = start_primary();
+
+    replication_do_checkpoint_all(&err);
+    g_assert(!err);
+
+    teardown_primary(primary);
+}
+
+/*
+ * Primary: start replication, then query the replication error state.
+ * Neither call may report an error.
+ */
+static void test_primary_get_error(void)
+{
+    Error *err = NULL;
+    BlockDriverState *primary = start_primary();
+
+    replication_start_all(REPLICATION_MODE_PRIMARY, &err);
+    g_assert(!err);
+
+    replication_get_error_all(&err);
+    g_assert(!err);
+
+    teardown_primary(primary);
+}
+
+/*
+ * Parse @cmdline with qemu_drive_opts, open a BlockBackend from the
+ * resulting options and register it with the monitor under @id.
+ * Any failure aborts the test. Returns the new BlockBackend.
+ */
+static BlockBackend *secondary_add_blk(const char *cmdline, const char *id)
+{
+    QemuOpts *opts;
+    QDict *qdict;
+    BlockBackend *blk;
+    Error *local_err = NULL;
+
+    opts = qemu_opts_parse_noisily(&qemu_drive_opts, cmdline, false);
+    g_assert(opts);
+    qdict = qemu_opts_to_qdict(opts, NULL);
+
+    /* both cache options default to "off" unless already set */
+    qdict_set_default_str(qdict, BDRV_OPT_CACHE_DIRECT, "off");
+    qdict_set_default_str(qdict, BDRV_OPT_CACHE_NO_FLUSH, "off");
+
+    blk = blk_new_open(NULL, NULL, qdict, BDRV_O_RDWR, &local_err);
+    /* g_assert() like the rest of this file, not libc assert() */
+    g_assert(blk);
+    monitor_add_blk(blk, id, &local_err);
+    g_assert(!local_err);
+
+    qemu_opts_del(opts);
+
+    return blk;
+}
+
+/*
+ * Build the secondary side: open S_LOCAL_DISK as S_LOCAL_DISK_ID and fill
+ * it with 0x11, then open the replication node from S_COMMAND2 as S_ID.
+ *
+ * Returns the BDS of the S_ID BlockBackend. Both BlockBackends are
+ * released by teardown_secondary().
+ */
+static BlockDriverState *start_secondary(void)
+{
+    BlockBackend *blk;
+
+    /* 1. add S_LOCAL_DISK and forge S_LOCAL_DISK_ID */
+    blk = secondary_add_blk(S_COMMAND1, S_LOCAL_DISK_ID);
+
+    /* 2. format S_LOCAL_DISK with pattern "0x11" */
+    io_write(blk_bs(blk), 0x11, IMG_SIZE, 0, IMG_SIZE, false);
+
+    /* 3. add S_(ACTIVE/HIDDEN)_DISK and forge S_ID */
+    blk = secondary_add_blk(S_COMMAND2, S_ID);
+
+    /* return top bs */
+    return blk_bs(blk);
+}
+
+/*
+ * Undo start_secondary(): look up the two BlockBackends by name,
+ * unregister each from the monitor and drop its reference.
+ * S_LOCAL_DISK_ID is removed first, then S_ID.
+ */
+static void teardown_secondary(void)
+{
+    /* only need to destroy two BBs */
+    static const char *const blk_ids[] = { S_LOCAL_DISK_ID, S_ID };
+    size_t i;
+
+    for (i = 0; i < sizeof(blk_ids) / sizeof(blk_ids[0]); i++) {
+        BlockBackend *blk = blk_by_name(blk_ids[i]);
+
+        /* g_assert() like the rest of this file, not libc assert() */
+        g_assert(blk);
+
+        monitor_remove_blk(blk);
+        blk_unref(blk);
+    }
+}
+
+/*
+ * Secondary, replication not started: a read of the whole image through
+ * the top node is expected to fail.
+ */
+static void test_secondary_read(void)
+{
+    BlockDriverState *top = start_secondary();
+
+    /* pattern 0: no data comparison, only the failure is asserted */
+    io_read(top, 0, 0, IMG_SIZE, 0, IMG_SIZE, true);
+
+    teardown_secondary();
+}
+
+/*
+ * Secondary, replication not started: a write covering the whole image
+ * through the top node is expected to fail.
+ */
+static void test_secondary_write(void)
+{
+    BlockDriverState *top = start_secondary();
+
+    /* pattern 0: io_write() issues a write-zeroes request */
+    io_write(top, 0, IMG_SIZE, 0, IMG_SIZE, true);
+
+    teardown_secondary();
+}
+
+/*
+ * Secondary, replication started: check reads and writes through the top
+ * node while S_LOCAL_DISK is written to directly, then stop replication
+ * with failover requested.
+ */
+static void test_secondary_start(void)
+{
+    const int64_t half = IMG_SIZE / 2;
+    Error *err = NULL;
+    BlockDriverState *top, *local;
+
+    top = start_secondary();
+    replication_start_all(REPLICATION_MODE_SECONDARY, &err);
+    g_assert(!err);
+
+    /* 1. whole-image read through the top node, checked against 0x11 */
+    io_read(top, 0x11, 0, IMG_SIZE, 0, IMG_SIZE, false);
+
+    /* 2. write 0x22 straight to the upper half of S_LOCAL_DISK */
+    local = blk_bs(blk_by_name(S_LOCAL_DISK_ID));
+    io_write(local, 0x22, half, half, half, false);
+
+    /*
+     * 2.1 the upper half read through the top node is still checked
+     * against 0x11 (replication is expected to have backed up
+     * S_LOCAL_DISK to S_HIDDEN_DISK)
+     */
+    io_read(top, 0x11, half, half, 0, IMG_SIZE, false);
+
+    /* 3. write 0x33 to the lower half through the top node */
+    io_write(top, 0x33, half, 0, half, false);
+
+    /* 3.1 read the lower half back and check it against 0x33 */
+    io_read(top, 0x33, 0, half, 0, half, false);
+
+    /* unblock top_bs (failover = true) */
+    replication_stop_all(true, &err);
+    g_assert(!err);
+
+    teardown_secondary();
+}
+
+/*
+ * Secondary: start replication, write to S_LOCAL_DISK and to the top
+ * node, stop replication with failover requested, then read both halves
+ * back through the top node.
+ */
+static void test_secondary_stop(void)
+{
+    const int64_t half = IMG_SIZE / 2;
+    Error *err = NULL;
+    BlockDriverState *top, *local;
+
+    top = start_secondary();
+    replication_start_all(REPLICATION_MODE_SECONDARY, &err);
+    g_assert(!err);
+
+    /* 1. write 0x22 straight to the upper half of S_LOCAL_DISK */
+    local = blk_bs(blk_by_name(S_LOCAL_DISK_ID));
+    io_write(local, 0x22, half, half, half, false);
+
+    /*
+     * 2. the upper half read through the top node is checked against
+     * 0x11 (replication is expected to have backed up S_LOCAL_DISK to
+     * S_HIDDEN_DISK)
+     */
+    io_read(top, 0x11, half, half, 0, IMG_SIZE, false);
+
+    /* 3. write 0x33 to the lower half through the top node */
+    io_write(top, 0x33, half, 0, half, false);
+
+    /* 4. stop with failover = true ("do active commit") */
+    replication_stop_all(true, &err);
+    g_assert(!err);
+
+    /* 5. lower half through the top node, checked against 0x33 */
+    io_read(top, 0x33, 0, half, 0, half, false);
+
+    /* 6. upper half through the top node, checked against 0x22 */
+    io_read(top, 0x22, half, half, 0, IMG_SIZE, false);
+
+    teardown_secondary();
+}
+
+/*
+ * Secondary: start replication, write to S_LOCAL_DISK, then check what
+ * the top node returns for the upper half before and after a checkpoint.
+ */
+static void test_secondary_do_checkpoint(void)
+{
+    const int64_t half = IMG_SIZE / 2;
+    Error *err = NULL;
+    BlockDriverState *top, *local;
+
+    top = start_secondary();
+    replication_start_all(REPLICATION_MODE_SECONDARY, &err);
+    g_assert(!err);
+
+    /* 1. write 0x22 straight to the upper half of S_LOCAL_DISK */
+    local = blk_bs(blk_by_name(S_LOCAL_DISK_ID));
+    io_write(local, 0x22, half, half, half, false);
+
+    /*
+     * 2. before the checkpoint the upper half is checked against 0x11
+     * (replication is expected to have backed up S_LOCAL_DISK to
+     * S_HIDDEN_DISK)
+     */
+    io_read(top, 0x11, half, half, 0, IMG_SIZE, false);
+
+    replication_do_checkpoint_all(&err);
+    g_assert(!err);
+
+    /* 3. after the checkpoint the upper half is checked against 0x22 */
+    io_read(top, 0x22, half, half, 0, IMG_SIZE, false);
+
+    /* unblock top_bs (failover = true) */
+    replication_stop_all(true, &err);
+    g_assert(!err);
+
+    teardown_secondary();
+}
+
+/*
+ * Secondary: start replication, query the replication error state and
+ * stop with failover requested. None of the calls may report an error.
+ */
+static void test_secondary_get_error(void)
+{
+    Error *err = NULL;
+
+    /* the returned top BDS is not needed here */
+    start_secondary();
+
+    replication_start_all(REPLICATION_MODE_SECONDARY, &err);
+    g_assert(!err);
+
+    replication_get_error_all(&err);
+    g_assert(!err);
+
+    /* unblock top_bs (failover = true) */
+    replication_stop_all(true, &err);
+    g_assert(!err);
+
+    teardown_secondary();
+}
+
+/*
+ * Test entry point: initialise the main loop and the block layer, create
+ * the images, register the primary and secondary test cases, run them
+ * and remove the images again. Returns the result of g_test_run().
+ */
+int main(int argc, char **argv)
+{
+    /* registration order: primary cases first, then secondary cases */
+    static const struct {
+        const char *path;
+        void (*func)(void);
+    } test_cases[] = {
+        /* Primary */
+        { "/replication/primary/read",            test_primary_read },
+        { "/replication/primary/write",           test_primary_write },
+        { "/replication/primary/start",           test_primary_start },
+        { "/replication/primary/stop",            test_primary_stop },
+        { "/replication/primary/do_checkpoint",
+          test_primary_do_checkpoint },
+        { "/replication/primary/get_error",       test_primary_get_error },
+        /* Secondary */
+        { "/replication/secondary/read",          test_secondary_read },
+        { "/replication/secondary/write",         test_secondary_write },
+        { "/replication/secondary/start",         test_secondary_start },
+        { "/replication/secondary/stop",          test_secondary_stop },
+        { "/replication/secondary/do_checkpoint",
+          test_secondary_do_checkpoint },
+        { "/replication/secondary/get_error",     test_secondary_get_error },
+    };
+    size_t i;
+    int status;
+
+    qemu_init_main_loop(&error_fatal);
+    bdrv_init();
+
+    /* iterate the default GLib main context until it reports no work */
+    while (g_main_context_iteration(NULL, false)) {
+        /* nothing */
+    }
+    g_test_init(&argc, &argv, NULL);
+
+    prepare_imgs();
+
+    for (i = 0; i < sizeof(test_cases) / sizeof(test_cases[0]); i++) {
+        g_test_add_func(test_cases[i].path, test_cases[i].func);
+    }
+
+    status = g_test_run();
+
+    cleanup_imgs();
+
+    return status;
+}