@@ -78,22 +78,6 @@ typedef enum {
RBD_AIO_FLUSH
} RBDAIOCmd;
-typedef struct RBDAIOCB {
- BlockAIOCB common;
- int64_t ret;
- QEMUIOVector *qiov;
- RBDAIOCmd cmd;
- int error;
- struct BDRVRBDState *s;
-} RBDAIOCB;
-
-typedef struct RADOSCB {
- RBDAIOCB *acb;
- struct BDRVRBDState *s;
- int64_t size;
- int64_t ret;
-} RADOSCB;
-
typedef struct BDRVRBDState {
rados_t cluster;
rados_ioctx_t io_ctx;
@@ -105,6 +89,13 @@ typedef struct BDRVRBDState {
uint64_t object_size;
} BDRVRBDState;
+typedef struct RBDTask {
+ BlockDriverState *bs;
+ Coroutine *co;
+ bool complete;
+ int64_t ret;
+} RBDTask;
+
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
BlockdevOptionsRbd *opts, bool cache,
const char *keypairs, const char *secretid,
@@ -337,13 +328,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
return ret;
}
-static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
-{
- RBDAIOCB *acb = rcb->acb;
- iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
- acb->qiov->size - offs);
-}
-
#ifdef LIBRBD_SUPPORTS_ENCRYPTION
static int qemu_rbd_convert_luks_options(
RbdEncryptionOptionsLUKSBase *luks_opts,
@@ -733,46 +717,6 @@ exit:
return ret;
}
-/*
- * This aio completion is being called from rbd_finish_bh() and runs in qemu
- * BH context.
- */
-static void qemu_rbd_complete_aio(RADOSCB *rcb)
-{
- RBDAIOCB *acb = rcb->acb;
- int64_t r;
-
- r = rcb->ret;
-
- if (acb->cmd != RBD_AIO_READ) {
- if (r < 0) {
- acb->ret = r;
- acb->error = 1;
- } else if (!acb->error) {
- acb->ret = rcb->size;
- }
- } else {
- if (r < 0) {
- qemu_rbd_memset(rcb, 0);
- acb->ret = r;
- acb->error = 1;
- } else if (r < rcb->size) {
- qemu_rbd_memset(rcb, r);
- if (!acb->error) {
- acb->ret = rcb->size;
- }
- } else if (!acb->error) {
- acb->ret = r;
- }
- }
-
- g_free(rcb);
-
- acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
-
- qemu_aio_unref(acb);
-}
-
static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
{
const char **vals;
@@ -1122,89 +1066,59 @@ static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
return 0;
}
-static const AIOCBInfo rbd_aiocb_info = {
- .aiocb_size = sizeof(RBDAIOCB),
-};
-
-static void rbd_finish_bh(void *opaque)
+static void qemu_rbd_finish_bh(void *opaque)
{
- RADOSCB *rcb = opaque;
- qemu_rbd_complete_aio(rcb);
+ RBDTask *task = opaque;
+ task->complete = 1;
+ aio_co_wake(task->co);
}
/*
- * This is the callback function for rbd_aio_read and _write
+ * This is the completion callback function for all rbd aio calls
+ * started from qemu_rbd_start_co().
*
* Note: this function is being called from a non qemu thread so
* we need to be careful about what we do here. Generally we only
* schedule a BH, and do the rest of the io completion handling
- * from rbd_finish_bh() which runs in a qemu context.
+ * from qemu_rbd_finish_bh() which runs in a qemu context.
*/
-static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
+static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
{
- RBDAIOCB *acb = rcb->acb;
-
- rcb->ret = rbd_aio_get_return_value(c);
+ task->ret = rbd_aio_get_return_value(c);
rbd_aio_release(c);
-
- replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
- rbd_finish_bh, rcb);
+ aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
+ qemu_rbd_finish_bh, task);
}
-static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
- int64_t off,
- QEMUIOVector *qiov,
- int64_t size,
- BlockCompletionFunc *cb,
- void *opaque,
- RBDAIOCmd cmd)
+static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
+ uint64_t offset,
+ uint64_t bytes,
+ QEMUIOVector *qiov,
+ int flags,
+ RBDAIOCmd cmd)
{
- RBDAIOCB *acb;
- RADOSCB *rcb = NULL;
+ BDRVRBDState *s = bs->opaque;
+ RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
rbd_completion_t c;
int r;
- BDRVRBDState *s = bs->opaque;
-
- acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
- acb->cmd = cmd;
- acb->qiov = qiov;
- assert(!qiov || qiov->size == size);
-
- rcb = g_new(RADOSCB, 1);
+ assert(!qiov || qiov->size == bytes);
- acb->ret = 0;
- acb->error = 0;
- acb->s = s;
-
- rcb->acb = acb;
- rcb->s = acb->s;
- rcb->size = size;
- r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
+ r = rbd_aio_create_completion(&task,
+ (rbd_callback_t) qemu_rbd_completion_cb, &c);
if (r < 0) {
- goto failed;
+ return r;
}
switch (cmd) {
- case RBD_AIO_WRITE:
- /*
- * RBD APIs don't allow us to write more than actual size, so in order
- * to support growing images, we resize the image before write
- * operations that exceed the current size.
- */
- if (off + size > s->image_size) {
- r = qemu_rbd_resize(bs, off + size);
- if (r < 0) {
- goto failed_completion;
- }
- }
- r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
- break;
case RBD_AIO_READ:
- r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
+ r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
+ break;
+ case RBD_AIO_WRITE:
+ r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
break;
case RBD_AIO_DISCARD:
- r = rbd_aio_discard(s->image, off, size, c);
+ r = rbd_aio_discard(s->image, offset, bytes, c);
break;
case RBD_AIO_FLUSH:
r = rbd_aio_flush(s->image, c);
@@ -1214,44 +1128,69 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
}
if (r < 0) {
- goto failed_completion;
+ error_report("rbd request failed early: cmd %d offset %" PRIu64
+ " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
+ bytes, flags, r, strerror(-r));
+ rbd_aio_release(c);
+ return r;
}
- return &acb->common;
-failed_completion:
- rbd_aio_release(c);
-failed:
- g_free(rcb);
+ while (!task.complete) {
+ qemu_coroutine_yield();
+ }
- qemu_aio_unref(acb);
- return NULL;
+ if (task.ret < 0) {
+ error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
+ PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
+ bytes, flags, task.ret, strerror(-task.ret));
+ return task.ret;
+ }
+
+ /* zero pad short reads */
+ if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
+ qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
+ }
+
+ return 0;
+}
+
+static int
+coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, uint64_t offset,
+ uint64_t bytes, QEMUIOVector *qiov,
+ int flags)
+{
+ return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
}
-static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
- uint64_t offset, uint64_t bytes,
- QEMUIOVector *qiov, int flags,
- BlockCompletionFunc *cb,
- void *opaque)
+static int
+coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, uint64_t offset,
+ uint64_t bytes, QEMUIOVector *qiov,
+ int flags)
{
- return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
- RBD_AIO_READ);
+ BDRVRBDState *s = bs->opaque;
+ /*
+ * RBD APIs don't allow us to write more than actual size, so in order
+ * to support growing images, we resize the image before write
+ * operations that exceed the current size.
+ */
+ if (offset + bytes > s->image_size) {
+ int r = qemu_rbd_resize(bs, offset + bytes);
+ if (r < 0) {
+ return r;
+ }
+ }
+ return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
}
-static BlockAIOCB *qemu_rbd_aio_pwritev(BlockDriverState *bs,
- uint64_t offset, uint64_t bytes,
- QEMUIOVector *qiov, int flags,
- BlockCompletionFunc *cb,
- void *opaque)
+static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
{
- return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
- RBD_AIO_WRITE);
+ return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
}
-static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
- BlockCompletionFunc *cb,
- void *opaque)
+static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
+ int64_t offset, int count)
{
- return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
+ return qemu_rbd_start_co(bs, offset, count, NULL, 0, RBD_AIO_DISCARD);
}
static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
@@ -1450,16 +1389,6 @@ static int qemu_rbd_snap_list(BlockDriverState *bs,
return snap_count;
}
-static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
- int64_t offset,
- int bytes,
- BlockCompletionFunc *cb,
- void *opaque)
-{
- return rbd_start_aio(bs, offset, NULL, bytes, cb, opaque,
- RBD_AIO_DISCARD);
-}
-
static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
Error **errp)
{
@@ -1540,11 +1469,10 @@ static BlockDriver bdrv_rbd = {
.bdrv_co_truncate = qemu_rbd_co_truncate,
.protocol_name = "rbd",
- .bdrv_aio_preadv = qemu_rbd_aio_preadv,
- .bdrv_aio_pwritev = qemu_rbd_aio_pwritev,
-
- .bdrv_aio_flush = qemu_rbd_aio_flush,
- .bdrv_aio_pdiscard = qemu_rbd_aio_pdiscard,
+ .bdrv_co_preadv = qemu_rbd_co_preadv,
+ .bdrv_co_pwritev = qemu_rbd_co_pwritev,
+ .bdrv_co_flush_to_disk = qemu_rbd_co_flush,
+ .bdrv_co_pdiscard = qemu_rbd_co_pdiscard,
.bdrv_snapshot_create = qemu_rbd_snap_create,
.bdrv_snapshot_delete = qemu_rbd_snap_remove,