Message ID | 1306263078-18089-2-git-send-email-josh.durgin@dreamhost.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Apart from two cosmetic issues (see below), I think this patch is ready to replace the old rbd driver. You can add: Reviewed-by: Christian Brunner <chb@muc.de> Regards Christian 2011/5/24 Josh Durgin <josh.durgin@dreamhost.com>: > librbd stacks on top of librados to provide access > to rbd images. > > Using librbd simplifies the qemu code, and allows > qemu to use new versions of the rbd format > with few (if any) changes. > > Signed-off-by: Josh Durgin <josh.durgin@dreamhost.com> > Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net> > --- > block/rbd.c | 790 +++++++++++++++-------------------------------------- > block/rbd_types.h | 71 ----- > configure | 33 +-- > 3 files changed, 224 insertions(+), 670 deletions(-) > delete mode 100644 block/rbd_types.h > > diff --git a/block/rbd.c b/block/rbd.c > index 249a590..1c8e7c7 100644 > --- a/block/rbd.c > +++ b/block/rbd.c > @@ -1,20 +1,22 @@ > /* > * QEMU Block driver for RADOS (Ceph) > * > - * Copyright (C) 2010 Christian Brunner <chb@muc.de> > + * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, > + * Josh Durgin <josh.durgin@dreamhost.com> > * > * This work is licensed under the terms of the GNU GPL, version 2. See > * the COPYING file in the top-level directory. > * > */ > > +#include <inttypes.h> > + > #include "qemu-common.h" > #include "qemu-error.h" > > -#include "rbd_types.h" > #include "block_int.h" > > -#include <rados/librados.h> > +#include <rbd/librbd.h> > > > > @@ -40,6 +42,12 @@ > > #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) > > +#define RBD_MAX_CONF_NAME_SIZE 128 > +#define RBD_MAX_CONF_VAL_SIZE 512 > +#define RBD_MAX_CONF_SIZE 1024 > +#define RBD_MAX_POOL_NAME_SIZE 128 > +#define RBD_MAX_SNAP_NAME_SIZE 128 > + > typedef struct RBDAIOCB { > BlockDriverAIOCB common; > QEMUBH *bh; > @@ -48,7 +56,6 @@ typedef struct RBDAIOCB { > char *bounce; > int write; > int64_t sector_num; > - int aiocnt; > int error; > struct BDRVRBDState *s; > int cancelled; > @@ -59,7 +66,7 @@ typedef struct RADOSCB { > RBDAIOCB *acb; > struct BDRVRBDState *s; > int done; > - int64_t segsize; > + int64_t size; > char *buf; > int ret; > } RADOSCB; > @@ -69,25 +76,22 @@ typedef struct RADOSCB { > > typedef struct BDRVRBDState { > int fds[2]; > - rados_pool_t pool; > - rados_pool_t header_pool; > - char name[RBD_MAX_OBJ_NAME_SIZE]; > - char block_name[RBD_MAX_BLOCK_NAME_SIZE]; > - uint64_t size; > - uint64_t objsize; > + rados_t cluster; > + rados_ioctx_t io_ctx; > + rbd_image_t image; > + char name[RBD_MAX_IMAGE_NAME_SIZE]; > int qemu_aio_count; > + char *snap; > int event_reader_pos; > RADOSCB *event_rcb; > } BDRVRBDState; > > -typedef struct rbd_obj_header_ondisk RbdHeader1; > - > static void rbd_aio_bh_cb(void *opaque); > > -static int rbd_next_tok(char *dst, int dst_len, > - char *src, char delim, > - const char *name, > - char **p) > +static int qemu_rbd_next_tok(char *dst, int dst_len, > + char *src, char delim, > + const char *name, > + char **p) > { > int l; > char *end; > @@ -115,10 +119,10 @@ static int rbd_next_tok(char *dst, int dst_len, > return 0; > } > > -static int rbd_parsename(const char *filename, > - char *pool, int pool_len, > - char *snap, int snap_len, > - char *name, int name_len) > +static int qemu_rbd_parsename(const char *filename, > + char *pool, int pool_len, > + char *snap, int snap_len, > + char *name, int name_len) > { > const char *start; > char *p, *buf; > @@ -131,12 +135,12 @@ static int rbd_parsename(const char *filename, > buf = qemu_strdup(start); > p = buf; > > - ret = rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); > + ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); > if (ret < 0 || !p) { > ret = -EINVAL; > goto done; > } > - ret = rbd_next_tok(name, name_len, p, '@', "object name", &p); > + ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p); > if (ret < 0) { > goto done; > } > @@ -145,123 +149,35 @@ static int rbd_parsename(const char *filename, > goto done; > } > > - ret = rbd_next_tok(snap, snap_len, p, '\0', "snap name", &p); > + ret = qemu_rbd_next_tok(snap, snap_len, p, '\0', "snap name", &p); > > done: > qemu_free(buf); > return ret; > } > > -static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc) > -{ > - uint32_t len = strlen(name); > - uint32_t len_le = cpu_to_le32(len); > - /* total_len = encoding op + name + empty buffer */ > - uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t); > - uint8_t *desc = NULL; > - > - desc = qemu_malloc(total_len); > - > - *tmap_desc = (char *)desc; > - > - *desc = op; > - desc++; > - memcpy(desc, &len_le, sizeof(len_le)); > - desc += sizeof(len_le); > - memcpy(desc, name, len); > - desc += len; > - len = 0; /* no need for endian conversion for 0 */ > - memcpy(desc, &len, sizeof(len)); > - desc += sizeof(len); > - > - return (char *)desc - *tmap_desc; > -} > - > -static void free_tmap_op(char *tmap_desc) > -{ > - qemu_free(tmap_desc); > -} > - > -static int rbd_register_image(rados_pool_t pool, const char *name) > -{ > - char *tmap_desc; > - const char *dir = RBD_DIRECTORY; > - int ret; > - > - ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc); > - if (ret < 0) { > - return ret; > - } > - > - ret = rados_tmap_update(pool, dir, tmap_desc, ret); > - free_tmap_op(tmap_desc); > - > - return ret; > -} > - > -static int touch_rbd_info(rados_pool_t pool, const char *info_oid) > -{ > - int r = rados_write(pool, info_oid, 0, NULL, 0); > - if (r < 0) { > - return r; > - } > - return 0; > -} > - > -static int rbd_assign_bid(rados_pool_t pool, uint64_t *id) > -{ > - uint64_t out[1]; > - const char *info_oid = RBD_INFO; > - > - *id = 0; > - > - int r = touch_rbd_info(pool, info_oid); > - if (r < 0) { > - return r; > - } > - > - r = rados_exec(pool, info_oid, "rbd", "assign_bid", NULL, > - 0, (char *)out, sizeof(out)); > - if (r < 0) { > - return r; > - } > - > - le64_to_cpus(out); > - *id = out[0]; > - > - return 0; > -} > - > -static int rbd_create(const char *filename, QEMUOptionParameter *options) > +static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options) > { > int64_t bytes = 0; > int64_t objsize; > - uint64_t size; > - time_t mtime; > - uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER; > - char pool[RBD_MAX_SEG_NAME_SIZE]; > - char n[RBD_MAX_SEG_NAME_SIZE]; > - char name[RBD_MAX_OBJ_NAME_SIZE]; > - char snap_buf[RBD_MAX_SEG_NAME_SIZE]; > + int obj_order = 0; > + char pool[RBD_MAX_POOL_NAME_SIZE]; > + char name[RBD_MAX_IMAGE_NAME_SIZE]; > + char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; > char *snap = NULL; > - RbdHeader1 header; > - rados_pool_t p; > - uint64_t bid; > - uint32_t hi, lo; > + rados_t cluster; > + rados_ioctx_t io_ctx; > int ret; > > - if (rbd_parsename(filename, > - pool, sizeof(pool), > - snap_buf, sizeof(snap_buf), > - name, sizeof(name)) < 0) { > + if (qemu_rbd_parsename(filename, pool, sizeof(pool), > + snap_buf, sizeof(snap_buf), > + name, sizeof(name)) < 0) { > return -EINVAL; > } > if (snap_buf[0] != '\0') { > snap = snap_buf; > } > > - snprintf(n, sizeof(n), "%s%s", name, RBD_SUFFIX); > - > /* Read out options */ > while (options && options->name) { > if (!strcmp(options->name, BLOCK_OPT_SIZE)) { > @@ -277,82 +193,55 @@ static int rbd_create(const char *filename, QEMUOptionParameter *options) > error_report("obj size too small"); > return -EINVAL; > } > - obj_order = ffs(objsize) - 1; > + obj_order = ffs(objsize) - 1; > } > } > options++; > } > > - memset(&header, 0, sizeof(header)); > - pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT); > - pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE); > - pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION); > - header.image_size = cpu_to_le64(bytes); > - header.options.order = obj_order; > - header.options.crypt_type = RBD_CRYPT_NONE; > - header.options.comp_type = RBD_COMP_NONE; > - header.snap_seq = 0; > - header.snap_count = 0; > - > - if (rados_initialize(0, NULL) < 0) { > + if (rados_create(&cluster, NULL) < 0) { > error_report("error initializing"); > return -EIO; > } > > - if (rados_open_pool(pool, &p)) { > - error_report("error opening pool %s", pool); > - rados_deinitialize(); > + if (rados_conf_read_file(cluster, NULL) < 0) { > + error_report("error reading config file"); > + rados_shutdown(cluster); > return -EIO; > } > > - /* check for existing rbd header file */ > - ret = rados_stat(p, n, &size, &mtime); > - if (ret == 0) { > - ret=-EEXIST; > - goto done; > - } > - > - ret = rbd_assign_bid(p, &bid); > - if (ret < 0) { > - error_report("failed assigning block id"); > - rados_deinitialize(); > + if (rados_connect(cluster) < 0) { > + error_report("error connecting"); > + rados_shutdown(cluster); > return -EIO; > } > - hi = bid >> 32; > - lo = bid & 0xFFFFFFFF; > - snprintf(header.block_name, sizeof(header.block_name), "rb.%x.%x", hi, lo); > > - /* create header file */ > - ret = rados_write(p, n, 0, (const char *)&header, sizeof(header)); > - if (ret < 0) { > - goto done; > + if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) { > + error_report("error opening pool %s", pool); > + rados_shutdown(cluster); > + return -EIO; > } > > - ret = rbd_register_image(p, name); > -done: > - rados_close_pool(p); > - rados_deinitialize(); > + ret = rbd_create(io_ctx, name, bytes, &obj_order); > + rados_ioctx_destroy(io_ctx); > + rados_shutdown(cluster); > > return ret; > } > > /* > - * This aio completion is being called from rbd_aio_event_reader() and > - * runs in qemu context. It schedules a bh, but just in case the aio > + * This aio completion is being called from qemu_rbd_aio_event_reader() > + * and runs in qemu context. It schedules a bh, but just in case the aio > * was not cancelled before. > */ > -static void rbd_complete_aio(RADOSCB *rcb) > +static void qemu_rbd_complete_aio(RADOSCB *rcb) > { > RBDAIOCB *acb = rcb->acb; > int64_t r; > > - acb->aiocnt--; > - > if (acb->cancelled) { > - if (!acb->aiocnt) { > - qemu_vfree(acb->bounce); > - qemu_aio_release(acb); > - } > + qemu_vfree(acb->bounce); > + qemu_aio_release(acb); > goto done; > } > > @@ -363,32 +252,25 @@ static void rbd_complete_aio(RADOSCB *rcb) > acb->ret = r; > acb->error = 1; > } else if (!acb->error) { > - acb->ret += rcb->segsize; > + acb->ret = rcb->size; > } > } else { > - if (r == -ENOENT) { > - memset(rcb->buf, 0, rcb->segsize); > - if (!acb->error) { > - acb->ret += rcb->segsize; > - } > - } else if (r < 0) { > - memset(rcb->buf, 0, rcb->segsize); > + if (r < 0) { > + memset(rcb->buf, 0, rcb->size); > acb->ret = r; > acb->error = 1; > - } else if (r < rcb->segsize) { > - memset(rcb->buf + r, 0, rcb->segsize - r); > + } else if (r < rcb->size) { > + memset(rcb->buf + r, 0, rcb->size - r); > if (!acb->error) { > - acb->ret += rcb->segsize; > + acb->ret = rcb->size; > } > } else if (!acb->error) { > - acb->ret += r; > + acb->ret = r; > } > } > /* Note that acb->bh can be NULL in case where the aio was cancelled */ > - if (!acb->aiocnt) { > - acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); > - qemu_bh_schedule(acb->bh); > - } > + acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); > + qemu_bh_schedule(acb->bh); > done: > qemu_free(rcb); > } > @@ -397,7 +279,7 @@ done: > * aio fd read handler. It runs in the qemu context and calls the > * completion handling of completed rados aio operations. > */ > -static void rbd_aio_event_reader(void *opaque) > +static void qemu_rbd_aio_event_reader(void *opaque) > { > BDRVRBDState *s = opaque; > > @@ -413,176 +295,74 @@ static void rbd_aio_event_reader(void *opaque) > s->event_reader_pos += ret; > if (s->event_reader_pos == sizeof(s->event_rcb)) { > s->event_reader_pos = 0; > - rbd_complete_aio(s->event_rcb); > - s->qemu_aio_count --; > + qemu_rbd_complete_aio(s->event_rcb); > + s->qemu_aio_count--; > } > } > } > } while (ret < 0 && errno == EINTR); > } > > -static int rbd_aio_flush_cb(void *opaque) > +static int qemu_rbd_aio_flush_cb(void *opaque) > { > BDRVRBDState *s = opaque; > > return (s->qemu_aio_count > 0); > } > > - > -static int rbd_set_snapc(rados_pool_t pool, const char *snap, RbdHeader1 *header) > -{ > - uint32_t snap_count = le32_to_cpu(header->snap_count); > - rados_snap_t *snaps = NULL; > - rados_snap_t seq; > - uint32_t i; > - uint64_t snap_names_len = le64_to_cpu(header->snap_names_len); > - int r; > - rados_snap_t snapid = 0; > - > - if (snap_count) { > - const char *header_snap = (const char *)&header->snaps[snap_count]; > - const char *end = header_snap + snap_names_len; > - snaps = qemu_malloc(sizeof(rados_snap_t) * header->snap_count); > - > - for (i=0; i < snap_count; i++) { > - snaps[i] = le64_to_cpu(header->snaps[i].id); > - > - if (snap && strcmp(snap, header_snap) == 0) { > - snapid = snaps[i]; > - } > - > - header_snap += strlen(header_snap) + 1; > - if (header_snap > end) { > - error_report("bad header, snapshot list broken"); > - } > - } > - } > - > - if (snap && !snapid) { > - error_report("snapshot not found"); > - qemu_free(snaps); > - return -ENOENT; > - } > - seq = le32_to_cpu(header->snap_seq); > - > - r = rados_set_snap_context(pool, seq, snaps, snap_count); > - > - rados_set_snap(pool, snapid); > - > - qemu_free(snaps); > - > - return r; > -} > - > -#define BUF_READ_START_LEN 4096 > - > -static int rbd_read_header(BDRVRBDState *s, char **hbuf) > -{ > - char *buf = NULL; > - char n[RBD_MAX_SEG_NAME_SIZE]; > - uint64_t len = BUF_READ_START_LEN; > - int r; > - > - snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX); > - > - buf = qemu_malloc(len); > - > - r = rados_read(s->header_pool, n, 0, buf, len); > - if (r < 0) { > - goto failed; > - } > - > - if (r < len) { > - goto done; > - } > - > - qemu_free(buf); > - buf = qemu_malloc(len); > - > - r = rados_stat(s->header_pool, n, &len, NULL); > - if (r < 0) { > - goto failed; > - } > - > - r = rados_read(s->header_pool, n, 0, buf, len); > - if (r < 0) { > - goto failed; > - } > - > -done: > - *hbuf = buf; > - return 0; > - > -failed: > - qemu_free(buf); > - return r; > -} > - > -static int rbd_open(BlockDriverState *bs, const char *filename, int flags) > +static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags) > { > BDRVRBDState *s = bs->opaque; > - RbdHeader1 *header; > - char pool[RBD_MAX_SEG_NAME_SIZE]; > - char snap_buf[RBD_MAX_SEG_NAME_SIZE]; > - char *snap = NULL; > - char *hbuf = NULL; > + char pool[RBD_MAX_POOL_NAME_SIZE]; > + char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; > int r; > > - if (rbd_parsename(filename, pool, sizeof(pool), > - snap_buf, sizeof(snap_buf), > - s->name, sizeof(s->name)) < 0) { > + if (qemu_rbd_parsename(filename, pool, sizeof(pool), > + snap_buf, sizeof(snap_buf), > + s->name, sizeof(s->name)) < 0) { > return -EINVAL; > } > + s->snap = NULL; > if (snap_buf[0] != '\0') { > - snap = snap_buf; > + s->snap = qemu_strdup(snap_buf); > } > > - if ((r = rados_initialize(0, NULL)) < 0) { > + r = rados_create(&s->cluster, NULL); > + if (r < 0) { > error_report("error initializing"); > return r; > } > > - if ((r = rados_open_pool(pool, &s->pool))) { > - error_report("error opening pool %s", pool); > - rados_deinitialize(); > + r = rados_conf_read_file(s->cluster, NULL); > + if (r < 0) { > + error_report("error reading config file"); > + rados_shutdown(s->cluster); > return r; > } > > - if ((r = rados_open_pool(pool, &s->header_pool))) { > - error_report("error opening pool %s", pool); > - rados_deinitialize(); > + r = rados_connect(s->cluster); > + if (r < 0) { > + error_report("error connecting"); > + rados_shutdown(s->cluster); > return r; > } > > - if ((r = rbd_read_header(s, &hbuf)) < 0) { > - error_report("error reading header from %s", s->name); > - goto failed; > - } > - > - if (memcmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) { > - error_report("Invalid header signature"); > - r = -EMEDIUMTYPE; > - goto failed; > - } > - > - if (memcmp(hbuf + 68, RBD_HEADER_VERSION, 8)) { > - error_report("Unknown image version"); > - r = -EMEDIUMTYPE; > - goto failed; > + r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); > + if (r < 0) { > + error_report("error opening pool %s", pool); > + rados_shutdown(s->cluster); > + return r; > } > > - header = (RbdHeader1 *) hbuf; > - s->size = le64_to_cpu(header->image_size); > - s->objsize = 1ULL << header->options.order; > - memcpy(s->block_name, header->block_name, sizeof(header->block_name)); > - > - r = rbd_set_snapc(s->pool, snap, header); > + r = rbd_open(s->io_ctx, s->name, &s->image, s->snap); > if (r < 0) { > - error_report("failed setting snap context: %s", strerror(-r)); > - goto failed; > + error_report("error reading header from %s", s->name); > + rados_ioctx_destroy(s->io_ctx); > + rados_shutdown(s->cluster); > + return r; > } > > - bs->read_only = (snap != NULL); > + bs->read_only = (s->snap != NULL); > > s->event_reader_pos = 0; > r = qemu_pipe(s->fds); > @@ -592,23 +372,20 @@ static int rbd_open(BlockDriverState *bs, const char *filename, int flags) > } > fcntl(s->fds[0], F_SETFL, O_NONBLOCK); > fcntl(s->fds[1], F_SETFL, O_NONBLOCK); > - qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], rbd_aio_event_reader, NULL, > - rbd_aio_flush_cb, NULL, s); > + qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader, > + NULL, qemu_rbd_aio_flush_cb, NULL, s); > > - qemu_free(hbuf); > > return 0; > > failed: > - qemu_free(hbuf); > - > - rados_close_pool(s->header_pool); > - rados_close_pool(s->pool); > - rados_deinitialize(); > + rbd_close(s->image); > + rados_ioctx_destroy(s->io_ctx); > + rados_shutdown(s->cluster); > return r; > } > > -static void rbd_close(BlockDriverState *bs) > +static void qemu_rbd_close(BlockDriverState *bs) > { > BDRVRBDState *s = bs->opaque; > > @@ -617,16 +394,17 @@ static void rbd_close(BlockDriverState *bs) > qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL, > NULL); > > - rados_close_pool(s->header_pool); > - rados_close_pool(s->pool); > - rados_deinitialize(); > + rbd_close(s->image); > + rados_ioctx_destroy(s->io_ctx); > + qemu_free(s->snap); > + rados_shutdown(s->cluster); > } > > /* > * Cancel aio. Since we don't reference acb in a non qemu threads, > * it is safe to access it here. > */ > -static void rbd_aio_cancel(BlockDriverAIOCB *blockacb) > +static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) > { > RBDAIOCB *acb = (RBDAIOCB *) blockacb; > acb->cancelled = 1; > @@ -634,39 +412,28 @@ static void rbd_aio_cancel(BlockDriverAIOCB *blockacb) > > static AIOPool rbd_aio_pool = { > .aiocb_size = sizeof(RBDAIOCB), > - .cancel = rbd_aio_cancel, > + .cancel = qemu_rbd_aio_cancel, > }; > > -/* > - * This is the callback function for rados_aio_read and _write > - * > - * Note: this function is being called from a non qemu thread so > - * we need to be careful about what we do here. Generally we only > - * write to the block notification pipe, and do the rest of the > - * io completion handling from rbd_aio_event_reader() which > - * runs in a qemu context. > - */ > -static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb) > +static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) > { > - int ret; > - rcb->ret = rados_aio_get_return_value(c); > - rados_aio_release(c); > + int ret = 0; > while (1) { > fd_set wfd; > - int fd = rcb->s->fds[RBD_FD_WRITE]; > + int fd = s->fds[RBD_FD_WRITE]; > > - /* send the rcb pointer to the qemu thread that is responsible > - for the aio completion. Must do it in a qemu thread context */ > + /* send the op pointer to the qemu thread that is responsible > + for the aio/op completion. Must do it in a qemu thread context */ > ret = write(fd, (void *)&rcb, sizeof(rcb)); > if (ret >= 0) { > break; > } > if (errno == EINTR) { > continue; > - } > + } > if (errno != EAGAIN) { > break; > - } > + } > > FD_ZERO(&wfd); > FD_SET(fd, &wfd); > @@ -675,13 +442,31 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb) > } while (ret < 0 && errno == EINTR); > } > > + return ret; > +} > + > +/* > + * This is the callback function for rbd_aio_read and _write > + * > + * Note: this function is being called from a non qemu thread so > + * we need to be careful about what we do here. Generally we only > + * write to the block notification pipe, and do the rest of the > + * io completion handling from qemu_rbd_aio_event_reader() which > + * runs in a qemu context. > + */ > +static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) > +{ > + int ret; > + rcb->ret = rbd_aio_get_return_value(c); > + rbd_aio_release(c); > + ret = qemu_rbd_send_pipe(rcb->s, rcb); > if (ret < 0) { > - error_report("failed writing to acb->s->fds\n"); > + error_report("failed writing to acb->s->fds"); > qemu_free(rcb); > } > } > > -/* Callback when all queued rados_aio requests are complete */ > +/* Callback when all queued rbd_aio requests are complete */ > > static void rbd_aio_bh_cb(void *opaque) > { > @@ -707,9 +492,7 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, > { > RBDAIOCB *acb; > RADOSCB *rcb; > - rados_completion_t c; > - char n[RBD_MAX_SEG_NAME_SIZE]; > - int64_t segnr, segoffs, segsize, last_segnr; > + rbd_completion_t c; > int64_t off, size; > char *buf; > > @@ -719,7 +502,6 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, > acb->write = write; > acb->qiov = qiov; > acb->bounce = qemu_blockalign(bs, qiov->size); > - acb->aiocnt = 0; > acb->ret = 0; > acb->error = 0; > acb->s = s; > @@ -734,95 +516,81 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, > > off = sector_num * BDRV_SECTOR_SIZE; > size = nb_sectors * BDRV_SECTOR_SIZE; > - segnr = off / s->objsize; > - segoffs = off % s->objsize; > - segsize = s->objsize - segoffs; > - > - last_segnr = ((off + size - 1) / s->objsize); > - acb->aiocnt = (last_segnr - segnr) + 1; > > - s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */ > + s->qemu_aio_count++; /* All the RADOSCB */ > > - while (size > 0) { > - if (size < segsize) { > - segsize = size; > - } > - > - snprintf(n, sizeof(n), "%s.%012" PRIx64, s->block_name, > - segnr); > - > - rcb = qemu_malloc(sizeof(RADOSCB)); > - rcb->done = 0; > - rcb->acb = acb; > - rcb->segsize = segsize; > - rcb->buf = buf; > - rcb->s = acb->s; > - > - if (write) { > - rados_aio_create_completion(rcb, NULL, > - (rados_callback_t) rbd_finish_aiocb, > - &c); > - rados_aio_write(s->pool, n, segoffs, buf, segsize, c); > - } else { > - rados_aio_create_completion(rcb, > - (rados_callback_t) rbd_finish_aiocb, > - NULL, &c); > - rados_aio_read(s->pool, n, segoffs, buf, segsize, c); > - } > + rcb = qemu_malloc(sizeof(RADOSCB)); > + rcb->done = 0; > + rcb->acb = acb; > + rcb->buf = buf; > + rcb->s = acb->s; > + rcb->size = size; > > - buf += segsize; > - size -= segsize; > - segoffs = 0; > - segsize = s->objsize; > - segnr++; > + if (write) { > + rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); > + rbd_aio_write(s->image, off, size, buf, c); > + } else { > + rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); > + rbd_aio_read(s->image, off, size, buf, c); > } > > return &acb->common; > } > > -static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs, > - int64_t sector_num, QEMUIOVector * qiov, > - int nb_sectors, > - BlockDriverCompletionFunc * cb, > - void *opaque) > +static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, > + int64_t sector_num, > + QEMUIOVector *qiov, > + int nb_sectors, > + BlockDriverCompletionFunc *cb, > + void *opaque) > { > return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0); > } > > -static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs, > - int64_t sector_num, QEMUIOVector * qiov, > - int nb_sectors, > - BlockDriverCompletionFunc * cb, > - void *opaque) > +static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, > + int64_t sector_num, > + QEMUIOVector *qiov, > + int nb_sectors, > + BlockDriverCompletionFunc *cb, > + void *opaque) > { > return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1); > } > > -static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi) > +static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) > { > BDRVRBDState *s = bs->opaque; > - bdi->cluster_size = s->objsize; > + rbd_image_info_t info; > + int r; > + > + r = rbd_stat(s->image, &info, sizeof(info)); > + if (r < 0) { > + return r; > + } > + > + bdi->cluster_size = info.obj_size; > return 0; > } > > -static int64_t rbd_getlength(BlockDriverState * bs) > +static int64_t qemu_rbd_getlength(BlockDriverState *bs) > { > BDRVRBDState *s = bs->opaque; > + rbd_image_info_t info; > + int r; > > - return s->size; > + r = rbd_stat(s->image, &info, sizeof(info)); > + if (r < 0) { > + return r; > + } > + > + return info.size; > } > > -static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) > +static int qemu_rbd_snap_create(BlockDriverState *bs, > + QEMUSnapshotInfo *sn_info) > { > BDRVRBDState *s = bs->opaque; > - char inbuf[512], outbuf[128]; > - uint64_t snap_id; > int r; > - char *p = inbuf; > - char *end = inbuf + sizeof(inbuf); > - char n[RBD_MAX_SEG_NAME_SIZE]; > - char *hbuf = NULL; > - RbdHeader1 *header; > > if (sn_info->name[0] == '\0') { > return -EINVAL; /* we need a name for rbd snapshots */ > @@ -841,185 +609,59 @@ static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) > return -ERANGE; > } > > - r = rados_selfmanaged_snap_create(s->header_pool, &snap_id); > + r = rbd_snap_create(s->image, sn_info->name); > if (r < 0) { > - error_report("failed to create snap id: %s", strerror(-r)); > + error_report("failed to create snap: %s", strerror(-r)); > return r; > } > > - *(uint32_t *)p = strlen(sn_info->name); > - cpu_to_le32s((uint32_t *)p); > - p += sizeof(uint32_t); > - strncpy(p, sn_info->name, end - p); > - p += strlen(p); > - if (p + sizeof(snap_id) > end) { > - error_report("invalid input parameter"); > - return -EINVAL; > - } > - > - *(uint64_t *)p = snap_id; > - cpu_to_le64s((uint64_t *)p); > - > - snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX); > - > - r = rados_exec(s->header_pool, n, "rbd", "snap_add", inbuf, > - sizeof(inbuf), outbuf, sizeof(outbuf)); > - if (r < 0) { > - error_report("rbd.snap_add execution failed failed: %s", strerror(-r)); > - return r; > - } > - > - sprintf(sn_info->id_str, "%s", sn_info->name); > - > - r = rbd_read_header(s, &hbuf); > - if (r < 0) { > - error_report("failed reading header: %s", strerror(-r)); > - return r; > - } > - > - header = (RbdHeader1 *) hbuf; > - r = rbd_set_snapc(s->pool, sn_info->name, header); > - if (r < 0) { > - error_report("failed setting snap context: %s", strerror(-r)); > - goto failed; > - } > - > - return 0; > - > -failed: > - qemu_free(header); > - return r; > -} > - > -static int decode32(char **p, const char *end, uint32_t *v) > -{ > - if (*p + 4 > end) { > - return -ERANGE; > - } > - > - *v = *(uint32_t *)(*p); > - le32_to_cpus(v); > - *p += 4; > return 0; > } > > -static int decode64(char **p, const char *end, uint64_t *v) > -{ > - if (*p + 8 > end) { > - return -ERANGE; > - } > - > - *v = *(uint64_t *)(*p); > - le64_to_cpus(v); > - *p += 8; > - return 0; > -} > - > -static int decode_str(char **p, const char *end, char **s) > -{ > - uint32_t len; > - int r; > - > - if ((r = decode32(p, end, &len)) < 0) { > - return r; > - } > - > - *s = qemu_malloc(len + 1); > - memcpy(*s, *p, len); > - *p += len; > - (*s)[len] = '\0'; > - > - return len; > -} > - > -static int rbd_snap_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab) > +static int qemu_rbd_snap_list(BlockDriverState *bs, > + QEMUSnapshotInfo **psn_tab) > { > BDRVRBDState *s = bs->opaque; > - char n[RBD_MAX_SEG_NAME_SIZE]; > QEMUSnapshotInfo *sn_info, *sn_tab = NULL; > - RbdHeader1 *header; > - char *hbuf = NULL; > - char *outbuf = NULL, *end, *buf; > - uint64_t len; > - uint64_t snap_seq; > - uint32_t snap_count; > int r, i; > + rbd_snap_info_t *snaps; > + int max_snaps = 100, snap_count; I think it would be nicer to have this defined at the beginning (e.g. RBD_MAX_SNAPS). > > - /* read header to estimate how much space we need to read the snap > - * list */ > - if ((r = rbd_read_header(s, &hbuf)) < 0) { > - goto done_err; > - } > - header = (RbdHeader1 *)hbuf; > - len = le64_to_cpu(header->snap_names_len); > - len += 1024; /* should have already been enough, but new snapshots might > - already been created since we read the header. just allocate > - a bit more, so that in most cases it'll suffice anyway */ > - qemu_free(hbuf); > - > - snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX); > - while (1) { > - qemu_free(outbuf); > - outbuf = qemu_malloc(len); > - > - r = rados_exec(s->header_pool, n, "rbd", "snap_list", NULL, 0, > - outbuf, len); > + do { > + snaps = qemu_malloc(sizeof(*snaps) * max_snaps); > + r = rbd_snap_list(s->image, snaps, &max_snaps); > if (r < 0) { > - error_report("rbd.snap_list execution failed failed: %s", strerror(-r)); > - goto done_err; > + qemu_free(snaps); > } > - if (r != len) { > - break; > - } > + } while (r == -ERANGE); > > - /* if we're here, we probably raced with some snaps creation */ > - len *= 2; > + if (r <= 0) { > + return r; > } > - buf = outbuf; > - end = buf + len; > > - if ((r = decode64(&buf, end, &snap_seq)) < 0) { > - goto done_err; > - } > - if ((r = decode32(&buf, end, &snap_count)) < 0) { > - goto done_err; > - } > + snap_count = r; This isn't really necessary. We could just use snap_count above (or r below). > > sn_tab = qemu_mallocz(snap_count * sizeof(QEMUSnapshotInfo)); > - for (i = 0; i < snap_count; i++) { > - uint64_t id, image_size; > - char *snap_name; > > - if ((r = decode64(&buf, end, &id)) < 0) { > - goto done_err; > - } > - if ((r = decode64(&buf, end, &image_size)) < 0) { > - goto done_err; > - } > - if ((r = decode_str(&buf, end, &snap_name)) < 0) { > - goto done_err; > - } > + for (i = 0; i < snap_count; i++) { > + const char *snap_name = snaps[i].name; > > sn_info = sn_tab + i; > pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); > pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); > - qemu_free(snap_name); > > - sn_info->vm_state_size = image_size; > + sn_info->vm_state_size = snaps[i].size; > sn_info->date_sec = 0; > sn_info->date_nsec = 0; > sn_info->vm_clock_nsec = 0; > } > + rbd_snap_list_end(snaps); > + > *psn_tab = sn_tab; > - qemu_free(outbuf); > return snap_count; > -done_err: > - qemu_free(sn_tab); > - qemu_free(outbuf); > - return r; > } > > -static QEMUOptionParameter rbd_create_options[] = { > +static QEMUOptionParameter qemu_rbd_create_options[] = { > { > .name = BLOCK_OPT_SIZE, > .type = OPT_SIZE, > @@ -1036,19 +678,19 @@ static QEMUOptionParameter rbd_create_options[] = { > static BlockDriver bdrv_rbd = { > .format_name = "rbd", > .instance_size = sizeof(BDRVRBDState), > - .bdrv_file_open = rbd_open, > - .bdrv_close = rbd_close, > - .bdrv_create = rbd_create, > - .bdrv_get_info = rbd_getinfo, > - .create_options = rbd_create_options, > - .bdrv_getlength = rbd_getlength, > + .bdrv_file_open = qemu_rbd_open, > + .bdrv_close = qemu_rbd_close, > + .bdrv_create = qemu_rbd_create, > + .bdrv_get_info = qemu_rbd_getinfo, > + .create_options = qemu_rbd_create_options, > + .bdrv_getlength = qemu_rbd_getlength, > .protocol_name = "rbd", > > - .bdrv_aio_readv = rbd_aio_readv, > - .bdrv_aio_writev = rbd_aio_writev, > + .bdrv_aio_readv = qemu_rbd_aio_readv, > + .bdrv_aio_writev = qemu_rbd_aio_writev, > > - .bdrv_snapshot_create = rbd_snap_create, > - .bdrv_snapshot_list = rbd_snap_list, > + .bdrv_snapshot_create = qemu_rbd_snap_create, > + .bdrv_snapshot_list = qemu_rbd_snap_list, > }; > > static void bdrv_rbd_init(void) > diff --git a/block/rbd_types.h b/block/rbd_types.h > deleted file mode 100644 > index f4cca99..0000000 > --- a/block/rbd_types.h > +++ /dev/null > @@ -1,71 +0,0 @@ > -/* > - * Ceph - scalable distributed file system > - * > - * Copyright (C) 2004-2010 Sage Weil <sage@newdream.net> > - * > - * This is free software; you can redistribute it and/or > - * modify it under the terms of the GNU Lesser General Public > - * License version 2.1, as published by the Free Software > - * Foundation. See file COPYING.LIB. > - * > - */ > - > -#ifndef CEPH_RBD_TYPES_H > -#define CEPH_RBD_TYPES_H > - > - > -/* > - * rbd image 'foo' consists of objects > - * foo.rbd - image metadata > - * foo.00000000 > - * foo.00000001 > - * ... - data > - */ > - > -#define RBD_SUFFIX ".rbd" > -#define RBD_DIRECTORY "rbd_directory" > -#define RBD_INFO "rbd_info" > - > -#define RBD_DEFAULT_OBJ_ORDER 22 /* 4MB */ > - > -#define RBD_MAX_OBJ_NAME_SIZE 96 > -#define RBD_MAX_BLOCK_NAME_SIZE 24 > -#define RBD_MAX_SEG_NAME_SIZE 128 > - > -#define RBD_COMP_NONE 0 > -#define RBD_CRYPT_NONE 0 > - > -#define RBD_HEADER_TEXT "<<< Rados Block Device Image >>>\n" > -#define RBD_HEADER_SIGNATURE "RBD" > -#define RBD_HEADER_VERSION "001.005" > - > -struct rbd_info { > - uint64_t max_id; > -} __attribute__ ((packed)); > - > -struct rbd_obj_snap_ondisk { > - uint64_t id; > - uint64_t image_size; > -} __attribute__((packed)); > - > -struct rbd_obj_header_ondisk { > - char text[40]; > - char block_name[RBD_MAX_BLOCK_NAME_SIZE]; > - char signature[4]; > - char version[8]; > - struct { > - uint8_t order; > - uint8_t crypt_type; > - uint8_t comp_type; > - uint8_t unused; > - } __attribute__((packed)) options; > - uint64_t image_size; > - uint64_t snap_seq; > - uint32_t snap_count; > - uint32_t reserved; > - uint64_t snap_names_len; > - struct rbd_obj_snap_ondisk snaps[0]; > -} __attribute__((packed)); > - > - > -#endif > diff --git a/configure b/configure > index a318d37..378c238 100755 > --- a/configure > +++ b/configure > @@ -1917,41 +1917,24 @@ fi > if test "$rbd" != "no" ; then > cat > $TMPC <<EOF > #include <stdio.h> > -#include <rados/librados.h> > -int main(void) { rados_initialize(0, NULL); return 0; } > -EOF > - rbd_libs="-lrados" > - if compile_prog "" "$rbd_libs" ; then > - librados_too_old=no > - cat > $TMPC <<EOF > -#include <stdio.h> > -#include <rados/librados.h> > -#ifndef CEPH_OSD_TMAP_SET > -#error missing CEPH_OSD_TMAP_SET > -#endif > +#include <rbd/librbd.h> > int main(void) { > - int (*func)(const rados_pool_t pool, uint64_t *snapid) = rados_selfmanaged_snap_create; > - rados_initialize(0, NULL); > + rados_t cluster; > + rados_create(&cluster, NULL); > return 0; > } > EOF > - if compile_prog "" "$rbd_libs" ; then > - rbd=yes > - libs_tools="$rbd_libs $libs_tools" > - libs_softmmu="$rbd_libs $libs_softmmu" > - else > - rbd=no > - librados_too_old=yes > - fi > + rbd_libs="-lrbd -lrados" > + if compile_prog "" "$rbd_libs" ; then > + rbd=yes > + libs_tools="$rbd_libs $libs_tools" > + libs_softmmu="$rbd_libs $libs_softmmu" > else > if test "$rbd" = "yes" ; then > feature_not_found "rados block device" > fi > rbd=no > fi > - if test "$librados_too_old" = "yes" ; then > - echo "-> Your librados version is too old - upgrade needed to have rbd support" > - fi > fi > > ########################################## > -- > 1.7.2.3 > > > -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
On 05/25/2011 12:18 PM, Christian Brunner wrote: > Apart from two cosmetic issues (see below), I think this patch is > ready to replace the old rbd driver. You can add: > > Reviewed-by: Christian Brunner<chb@muc.de> > > Regards > Christian Thanks, I fixed these in v5. Josh > > 2011/5/24 Josh Durgin<josh.durgin@dreamhost.com>: >> librbd stacks on top of librados to provide access >> to rbd images. >> >> Using librbd simplifies the qemu code, and allows >> qemu to use new versions of the rbd format >> with few (if any) changes. >> >> Signed-off-by: Josh Durgin<josh.durgin@dreamhost.com> >> Signed-off-by: Yehuda Sadeh<yehuda@hq.newdream.net> >> --- >> block/rbd.c | 790 +++++++++++++++-------------------------------------- >> block/rbd_types.h | 71 ----- >> configure | 33 +-- >> 3 files changed, 224 insertions(+), 670 deletions(-) >> delete mode 100644 block/rbd_types.h >> >> diff --git a/block/rbd.c b/block/rbd.c >> index 249a590..1c8e7c7 100644 >> --- a/block/rbd.c >> +++ b/block/rbd.c >> @@ -1,20 +1,22 @@ >> /* >> * QEMU Block driver for RADOS (Ceph) >> * >> - * Copyright (C) 2010 Christian Brunner<chb@muc.de> >> + * Copyright (C) 2010-2011 Christian Brunner<chb@muc.de>, >> + * Josh Durgin<josh.durgin@dreamhost.com> >> * >> * This work is licensed under the terms of the GNU GPL, version 2. See >> * the COPYING file in the top-level directory. >> * >> */ >> >> +#include<inttypes.h> >> + >> #include "qemu-common.h" >> #include "qemu-error.h" >> >> -#include "rbd_types.h" >> #include "block_int.h" >> >> -#include<rados/librados.h> >> +#include<rbd/librbd.h> >> >> >> >> @@ -40,6 +42,12 @@ >> >> #define OBJ_MAX_SIZE (1UL<< OBJ_DEFAULT_OBJ_ORDER) >> >> +#define RBD_MAX_CONF_NAME_SIZE 128 >> +#define RBD_MAX_CONF_VAL_SIZE 512 >> +#define RBD_MAX_CONF_SIZE 1024 >> +#define RBD_MAX_POOL_NAME_SIZE 128 >> +#define RBD_MAX_SNAP_NAME_SIZE 128 >> + >> typedef struct RBDAIOCB { >> BlockDriverAIOCB common; >> QEMUBH *bh; >> @@ -48,7 +56,6 @@ typedef struct RBDAIOCB { >> char *bounce; >> int write; >> int64_t sector_num; >> - int aiocnt; >> int error; >> struct BDRVRBDState *s; >> int cancelled; >> @@ -59,7 +66,7 @@ typedef struct RADOSCB { >> RBDAIOCB *acb; >> struct BDRVRBDState *s; >> int done; >> - int64_t segsize; >> + int64_t size; >> char *buf; >> int ret; >> } RADOSCB; >> @@ -69,25 +76,22 @@ typedef struct RADOSCB { >> >> typedef struct BDRVRBDState { >> int fds[2]; >> - rados_pool_t pool; >> - rados_pool_t header_pool; >> - char name[RBD_MAX_OBJ_NAME_SIZE]; >> - char block_name[RBD_MAX_BLOCK_NAME_SIZE]; >> - uint64_t size; >> - uint64_t objsize; >> + rados_t cluster; >> + rados_ioctx_t io_ctx; >> + rbd_image_t image; >> + char name[RBD_MAX_IMAGE_NAME_SIZE]; >> int qemu_aio_count; >> + char *snap; >> int event_reader_pos; >> RADOSCB *event_rcb; >> } BDRVRBDState; >> >> -typedef struct rbd_obj_header_ondisk RbdHeader1; >> - >> static void rbd_aio_bh_cb(void *opaque); >> >> -static int rbd_next_tok(char *dst, int dst_len, >> - char *src, char delim, >> - const char *name, >> - char **p) >> +static int qemu_rbd_next_tok(char *dst, int dst_len, >> + char *src, char delim, >> + const char *name, >> + char **p) >> { >> int l; >> char *end; >> @@ -115,10 +119,10 @@ static int rbd_next_tok(char *dst, int dst_len, >> return 0; >> } >> >> -static int rbd_parsename(const char *filename, >> - char *pool, int pool_len, >> - char *snap, int snap_len, >> - char *name, int name_len) >> +static int qemu_rbd_parsename(const char *filename, >> + char *pool, int pool_len, >> + char *snap, int snap_len, >> + char *name, int name_len) >> { >> const char *start; >> char *p, *buf; >> @@ -131,12 +135,12 @@ static int rbd_parsename(const char *filename, >> buf = qemu_strdup(start); >> p = buf; >> >> - ret = rbd_next_tok(pool, pool_len, p, '/', "pool name",&p); >> + ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name",&p); >> if (ret< 0 || !p) { >> ret = -EINVAL; >> goto done; >> } >> - ret = rbd_next_tok(name, name_len, p, '@', "object name",&p); >> + ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name",&p); >> if (ret< 0) { >> goto done; >> } >> @@ -145,123 +149,35 @@ static int rbd_parsename(const char *filename, >> goto done; >> } >> >> - ret = rbd_next_tok(snap, snap_len, p, '\0', "snap name",&p); >> + ret = qemu_rbd_next_tok(snap, snap_len, p, '\0', "snap name",&p); >> >> done: >> qemu_free(buf); >> return ret; >> } >> >> -static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc) >> -{ >> - uint32_t len = strlen(name); >> - uint32_t len_le = cpu_to_le32(len); >> - /* total_len = encoding op + name + empty buffer */ >> - uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t); >> - uint8_t *desc = NULL; >> - >> - desc = qemu_malloc(total_len); >> - >> - *tmap_desc = (char *)desc; >> - >> - *desc = op; >> - desc++; >> - memcpy(desc,&len_le, sizeof(len_le)); >> - desc += sizeof(len_le); >> - memcpy(desc, name, len); >> - desc += len; >> - len = 0; /* no need for endian conversion for 0 */ >> - memcpy(desc,&len, sizeof(len)); >> - desc += sizeof(len); >> - >> - return (char *)desc - *tmap_desc; >> -} >> - >> -static void free_tmap_op(char *tmap_desc) >> -{ >> - qemu_free(tmap_desc); >> -} >> - >> -static int rbd_register_image(rados_pool_t pool, const char *name) >> -{ >> - char *tmap_desc; >> - const char *dir = RBD_DIRECTORY; >> - int ret; >> - >> - ret = create_tmap_op(CEPH_OSD_TMAP_SET, name,&tmap_desc); >> - if (ret< 0) { >> - return ret; >> - } >> - >> - ret = rados_tmap_update(pool, dir, tmap_desc, ret); >> - free_tmap_op(tmap_desc); >> - >> - return ret; >> -} >> - >> -static int touch_rbd_info(rados_pool_t pool, const char *info_oid) >> -{ >> - int r = rados_write(pool, info_oid, 0, NULL, 0); >> - if (r< 0) { >> - return r; >> - } >> - return 0; >> -} >> - >> -static int rbd_assign_bid(rados_pool_t pool, uint64_t *id) >> -{ >> - uint64_t out[1]; >> - const char *info_oid = RBD_INFO; >> - >> - *id = 0; >> - >> - int r = touch_rbd_info(pool, info_oid); >> - if (r< 0) { >> - return r; >> - } >> - >> - r = rados_exec(pool, info_oid, "rbd", "assign_bid", NULL, >> - 0, (char *)out, sizeof(out)); >> - if (r< 0) { >> - return r; >> - } >> - >> - le64_to_cpus(out); >> - *id = out[0]; >> - >> - return 0; >> -} >> - >> -static int rbd_create(const char *filename, QEMUOptionParameter *options) >> +static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options) >> { >> int64_t bytes = 0; >> int64_t objsize; >> - uint64_t size; >> - time_t mtime; >> - uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER; >> - char pool[RBD_MAX_SEG_NAME_SIZE]; >> - char n[RBD_MAX_SEG_NAME_SIZE]; >> - char name[RBD_MAX_OBJ_NAME_SIZE]; >> - char snap_buf[RBD_MAX_SEG_NAME_SIZE]; >> + int obj_order = 0; >> + char pool[RBD_MAX_POOL_NAME_SIZE]; >> + char name[RBD_MAX_IMAGE_NAME_SIZE]; >> + char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; >> char *snap = NULL; >> - RbdHeader1 header; >> - rados_pool_t p; >> - uint64_t bid; >> - uint32_t hi, lo; >> + rados_t cluster; >> + rados_ioctx_t io_ctx; >> int ret; >> >> - if (rbd_parsename(filename, >> - pool, sizeof(pool), >> - snap_buf, sizeof(snap_buf), >> - name, sizeof(name))< 0) { >> + if (qemu_rbd_parsename(filename, pool, sizeof(pool), >> + snap_buf, sizeof(snap_buf), >> + name, sizeof(name))< 0) { >> return -EINVAL; >> } >> if (snap_buf[0] != '\0') { >> snap = snap_buf; >> } >> >> - snprintf(n, sizeof(n), "%s%s", name, RBD_SUFFIX); >> - >> /* Read out options */ >> while (options&& options->name) { >> if (!strcmp(options->name, BLOCK_OPT_SIZE)) { >> @@ -277,82 +193,55 @@ static int rbd_create(const char *filename, QEMUOptionParameter *options) >> error_report("obj size too small"); >> return -EINVAL; >> } >> - obj_order = ffs(objsize) - 1; >> + obj_order = ffs(objsize) - 1; >> } >> } >> options++; >> } >> >> - memset(&header, 0, sizeof(header)); >> - pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT); >> - pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE); >> - pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION); >> - header.image_size = cpu_to_le64(bytes); >> - header.options.order = obj_order; >> - header.options.crypt_type = RBD_CRYPT_NONE; >> - header.options.comp_type = RBD_COMP_NONE; >> - header.snap_seq = 0; >> - header.snap_count = 0; >> - >> - if (rados_initialize(0, NULL)< 0) { >> + if (rados_create(&cluster, NULL)< 0) { >> error_report("error initializing"); >> return -EIO; >> } >> >> - if (rados_open_pool(pool,&p)) { >> - error_report("error opening pool %s", pool); >> - rados_deinitialize(); >> + if (rados_conf_read_file(cluster, NULL)< 0) { >> + error_report("error reading config file"); >> + rados_shutdown(cluster); >> return -EIO; >> } >> >> - /* check for existing rbd header file */ >> - ret = rados_stat(p, n,&size,&mtime); >> - if (ret == 0) { >> - ret=-EEXIST; >> - goto done; >> - } >> - >> - ret = rbd_assign_bid(p,&bid); >> - if (ret< 0) { >> - error_report("failed assigning block id"); >> - rados_deinitialize(); >> + if (rados_connect(cluster)< 0) { >> + error_report("error connecting"); >> + rados_shutdown(cluster); >> return -EIO; >> } >> - hi = bid>> 32; >> - lo = bid& 0xFFFFFFFF; >> - snprintf(header.block_name, sizeof(header.block_name), "rb.%x.%x", hi, lo); >> >> - /* create header file */ >> - ret = rados_write(p, n, 0, (const char *)&header, sizeof(header)); >> - if (ret< 0) { >> - goto done; >> + if (rados_ioctx_create(cluster, pool,&io_ctx)< 0) { >> + error_report("error opening pool %s", pool); >> + rados_shutdown(cluster); >> + return -EIO; >> } >> >> - ret = rbd_register_image(p, name); >> -done: >> - rados_close_pool(p); >> - rados_deinitialize(); >> + ret = rbd_create(io_ctx, name, bytes,&obj_order); >> + rados_ioctx_destroy(io_ctx); >> + rados_shutdown(cluster); >> >> return ret; >> } >> >> /* >> - * This aio completion is being called from rbd_aio_event_reader() and >> - * runs in qemu context. It schedules a bh, but just in case the aio >> + * This aio completion is being called from qemu_rbd_aio_event_reader() >> + * and runs in qemu context. It schedules a bh, but just in case the aio >> * was not cancelled before. >> */ >> -static void rbd_complete_aio(RADOSCB *rcb) >> +static void qemu_rbd_complete_aio(RADOSCB *rcb) >> { >> RBDAIOCB *acb = rcb->acb; >> int64_t r; >> >> - acb->aiocnt--; >> - >> if (acb->cancelled) { >> - if (!acb->aiocnt) { >> - qemu_vfree(acb->bounce); >> - qemu_aio_release(acb); >> - } >> + qemu_vfree(acb->bounce); >> + qemu_aio_release(acb); >> goto done; >> } >> >> @@ -363,32 +252,25 @@ static void rbd_complete_aio(RADOSCB *rcb) >> acb->ret = r; >> acb->error = 1; >> } else if (!acb->error) { >> - acb->ret += rcb->segsize; >> + acb->ret = rcb->size; >> } >> } else { >> - if (r == -ENOENT) { >> - memset(rcb->buf, 0, rcb->segsize); >> - if (!acb->error) { >> - acb->ret += rcb->segsize; >> - } >> - } else if (r< 0) { >> - memset(rcb->buf, 0, rcb->segsize); >> + if (r< 0) { >> + memset(rcb->buf, 0, rcb->size); >> acb->ret = r; >> acb->error = 1; >> - } else if (r< rcb->segsize) { >> - memset(rcb->buf + r, 0, rcb->segsize - r); >> + } else if (r< rcb->size) { >> + memset(rcb->buf + r, 0, rcb->size - r); >> if (!acb->error) { >> - acb->ret += rcb->segsize; >> + acb->ret = rcb->size; >> } >> } else if (!acb->error) { >> - acb->ret += r; >> + acb->ret = r; >> } >> } >> /* Note that acb->bh can be NULL in case where the aio was cancelled */ >> - if (!acb->aiocnt) { >> - acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); >> - qemu_bh_schedule(acb->bh); >> - } >> + acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); >> + qemu_bh_schedule(acb->bh); >> done: >> qemu_free(rcb); >> } >> @@ -397,7 +279,7 @@ done: >> * aio fd read handler. It runs in the qemu context and calls the >> * completion handling of completed rados aio operations. >> */ >> -static void rbd_aio_event_reader(void *opaque) >> +static void qemu_rbd_aio_event_reader(void *opaque) >> { >> BDRVRBDState *s = opaque; >> >> @@ -413,176 +295,74 @@ static void rbd_aio_event_reader(void *opaque) >> s->event_reader_pos += ret; >> if (s->event_reader_pos == sizeof(s->event_rcb)) { >> s->event_reader_pos = 0; >> - rbd_complete_aio(s->event_rcb); >> - s->qemu_aio_count --; >> + qemu_rbd_complete_aio(s->event_rcb); >> + s->qemu_aio_count--; >> } >> } >> } >> } while (ret< 0&& errno == EINTR); >> } >> >> -static int rbd_aio_flush_cb(void *opaque) >> +static int qemu_rbd_aio_flush_cb(void *opaque) >> { >> BDRVRBDState *s = opaque; >> >> return (s->qemu_aio_count> 0); >> } >> >> - >> -static int rbd_set_snapc(rados_pool_t pool, const char *snap, RbdHeader1 *header) >> -{ >> - uint32_t snap_count = le32_to_cpu(header->snap_count); >> - rados_snap_t *snaps = NULL; >> - rados_snap_t seq; >> - uint32_t i; >> - uint64_t snap_names_len = le64_to_cpu(header->snap_names_len); >> - int r; >> - rados_snap_t snapid = 0; >> - >> - if (snap_count) { >> - const char *header_snap = (const char *)&header->snaps[snap_count]; >> - const char *end = header_snap + snap_names_len; >> - snaps = qemu_malloc(sizeof(rados_snap_t) * header->snap_count); >> - >> - for (i=0; i< snap_count; i++) { >> - snaps[i] = le64_to_cpu(header->snaps[i].id); >> - >> - if (snap&& strcmp(snap, header_snap) == 0) { >> - snapid = snaps[i]; >> - } >> - >> - header_snap += strlen(header_snap) + 1; >> - if (header_snap> end) { >> - error_report("bad header, snapshot list broken"); >> - } >> - } >> - } >> - >> - if (snap&& !snapid) { >> - error_report("snapshot not found"); >> - qemu_free(snaps); >> - return -ENOENT; >> - } >> - seq = le32_to_cpu(header->snap_seq); >> - >> - r = rados_set_snap_context(pool, seq, snaps, snap_count); >> - >> - rados_set_snap(pool, snapid); >> - >> - qemu_free(snaps); >> - >> - return r; >> -} >> - >> -#define BUF_READ_START_LEN 4096 >> - >> -static int rbd_read_header(BDRVRBDState *s, char **hbuf) >> -{ >> - char *buf = NULL; >> - char n[RBD_MAX_SEG_NAME_SIZE]; >> - uint64_t len = BUF_READ_START_LEN; >> - int r; >> - >> - snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX); >> - >> - buf = qemu_malloc(len); >> - >> - r = rados_read(s->header_pool, n, 0, buf, len); >> - if (r< 0) { >> - goto failed; >> - } >> - >> - if (r< len) { >> - goto done; >> - } >> - >> - qemu_free(buf); >> - buf = qemu_malloc(len); >> - >> - r = rados_stat(s->header_pool, n,&len, NULL); >> - if (r< 0) { >> - goto failed; >> - } >> - >> - r = rados_read(s->header_pool, n, 0, buf, len); >> - if (r< 0) { >> - goto failed; >> - } >> - >> -done: >> - *hbuf = buf; >> - return 0; >> - >> -failed: >> - qemu_free(buf); >> - return r; >> -} >> - >> -static int rbd_open(BlockDriverState *bs, const char *filename, int flags) >> +static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags) >> { >> BDRVRBDState *s = bs->opaque; >> - RbdHeader1 *header; >> - char pool[RBD_MAX_SEG_NAME_SIZE]; >> - char snap_buf[RBD_MAX_SEG_NAME_SIZE]; >> - char *snap = NULL; >> - char *hbuf = NULL; >> + char pool[RBD_MAX_POOL_NAME_SIZE]; >> + char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; >> int r; >> >> - if (rbd_parsename(filename, pool, sizeof(pool), >> - snap_buf, sizeof(snap_buf), >> - s->name, sizeof(s->name))< 0) { >> + if (qemu_rbd_parsename(filename, pool, sizeof(pool), >> + snap_buf, sizeof(snap_buf), >> + s->name, sizeof(s->name))< 0) { >> return -EINVAL; >> } >> + s->snap = NULL; >> if (snap_buf[0] != '\0') { >> - snap = snap_buf; >> + s->snap = qemu_strdup(snap_buf); >> } >> >> - if ((r = rados_initialize(0, NULL))< 0) { >> + r = rados_create(&s->cluster, NULL); >> + if (r< 0) { >> error_report("error initializing"); >> return r; >> } >> >> - if ((r = rados_open_pool(pool,&s->pool))) { >> - error_report("error opening pool %s", pool); >> - rados_deinitialize(); >> + r = rados_conf_read_file(s->cluster, NULL); >> + if (r< 0) { >> + error_report("error reading config file"); >> + rados_shutdown(s->cluster); >> return r; >> } >> >> - if ((r = rados_open_pool(pool,&s->header_pool))) { >> - error_report("error opening pool %s", pool); >> - rados_deinitialize(); >> + r = rados_connect(s->cluster); >> + if (r< 0) { >> + error_report("error connecting"); >> + rados_shutdown(s->cluster); >> return r; >> } >> >> - if ((r = rbd_read_header(s,&hbuf))< 0) { >> - error_report("error reading header from %s", s->name); >> - goto failed; >> - } >> - >> - if (memcmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) { >> - error_report("Invalid header signature"); >> - r = -EMEDIUMTYPE; >> - goto failed; >> - } >> - >> - if (memcmp(hbuf + 68, RBD_HEADER_VERSION, 8)) { >> - error_report("Unknown image version"); >> - r = -EMEDIUMTYPE; >> - goto failed; >> + r = rados_ioctx_create(s->cluster, pool,&s->io_ctx); >> + if (r< 0) { >> + error_report("error opening pool %s", pool); >> + rados_shutdown(s->cluster); >> + return r; >> } >> >> - header = (RbdHeader1 *) hbuf; >> - s->size = le64_to_cpu(header->image_size); >> - s->objsize = 1ULL<< header->options.order; >> - memcpy(s->block_name, header->block_name, sizeof(header->block_name)); >> - >> - r = rbd_set_snapc(s->pool, snap, header); >> + r = rbd_open(s->io_ctx, s->name,&s->image, s->snap); >> if (r< 0) { >> - error_report("failed setting snap context: %s", strerror(-r)); >> - goto failed; >> + error_report("error reading header from %s", s->name); >> + rados_ioctx_destroy(s->io_ctx); >> + rados_shutdown(s->cluster); >> + return r; >> } >> >> - bs->read_only = (snap != NULL); >> + bs->read_only = (s->snap != NULL); >> >> s->event_reader_pos = 0; >> r = qemu_pipe(s->fds); >> @@ -592,23 +372,20 @@ static int rbd_open(BlockDriverState *bs, const char *filename, int flags) >> } >> fcntl(s->fds[0], F_SETFL, O_NONBLOCK); >> fcntl(s->fds[1], F_SETFL, O_NONBLOCK); >> - qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], rbd_aio_event_reader, NULL, >> - rbd_aio_flush_cb, NULL, s); >> + qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader, >> + NULL, qemu_rbd_aio_flush_cb, NULL, s); >> >> - qemu_free(hbuf); >> >> return 0; >> >> failed: >> - qemu_free(hbuf); >> - >> - rados_close_pool(s->header_pool); >> - rados_close_pool(s->pool); >> - rados_deinitialize(); >> + rbd_close(s->image); >> + rados_ioctx_destroy(s->io_ctx); >> + rados_shutdown(s->cluster); >> return r; >> } >> >> -static void rbd_close(BlockDriverState *bs) >> +static void qemu_rbd_close(BlockDriverState *bs) >> { >> BDRVRBDState *s = bs->opaque; >> >> @@ -617,16 +394,17 @@ static void rbd_close(BlockDriverState *bs) >> qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL, >> NULL); >> >> - rados_close_pool(s->header_pool); >> - rados_close_pool(s->pool); >> - rados_deinitialize(); >> + rbd_close(s->image); >> + rados_ioctx_destroy(s->io_ctx); >> + qemu_free(s->snap); >> + rados_shutdown(s->cluster); >> } >> >> /* >> * Cancel aio. Since we don't reference acb in a non qemu threads, >> * it is safe to access it here. >> */ >> -static void rbd_aio_cancel(BlockDriverAIOCB *blockacb) >> +static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) >> { >> RBDAIOCB *acb = (RBDAIOCB *) blockacb; >> acb->cancelled = 1; >> @@ -634,39 +412,28 @@ static void rbd_aio_cancel(BlockDriverAIOCB *blockacb) >> >> static AIOPool rbd_aio_pool = { >> .aiocb_size = sizeof(RBDAIOCB), >> - .cancel = rbd_aio_cancel, >> + .cancel = qemu_rbd_aio_cancel, >> }; >> >> -/* >> - * This is the callback function for rados_aio_read and _write >> - * >> - * Note: this function is being called from a non qemu thread so >> - * we need to be careful about what we do here. Generally we only >> - * write to the block notification pipe, and do the rest of the >> - * io completion handling from rbd_aio_event_reader() which >> - * runs in a qemu context. >> - */ >> -static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb) >> +static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) >> { >> - int ret; >> - rcb->ret = rados_aio_get_return_value(c); >> - rados_aio_release(c); >> + int ret = 0; >> while (1) { >> fd_set wfd; >> - int fd = rcb->s->fds[RBD_FD_WRITE]; >> + int fd = s->fds[RBD_FD_WRITE]; >> >> - /* send the rcb pointer to the qemu thread that is responsible >> - for the aio completion. Must do it in a qemu thread context */ >> + /* send the op pointer to the qemu thread that is responsible >> + for the aio/op completion. Must do it in a qemu thread context */ >> ret = write(fd, (void *)&rcb, sizeof(rcb)); >> if (ret>= 0) { >> break; >> } >> if (errno == EINTR) { >> continue; >> - } >> + } >> if (errno != EAGAIN) { >> break; >> - } >> + } >> >> FD_ZERO(&wfd); >> FD_SET(fd,&wfd); >> @@ -675,13 +442,31 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb) >> } while (ret< 0&& errno == EINTR); >> } >> >> + return ret; >> +} >> + >> +/* >> + * This is the callback function for rbd_aio_read and _write >> + * >> + * Note: this function is being called from a non qemu thread so >> + * we need to be careful about what we do here. Generally we only >> + * write to the block notification pipe, and do the rest of the >> + * io completion handling from qemu_rbd_aio_event_reader() which >> + * runs in a qemu context. >> + */ >> +static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) >> +{ >> + int ret; >> + rcb->ret = rbd_aio_get_return_value(c); >> + rbd_aio_release(c); >> + ret = qemu_rbd_send_pipe(rcb->s, rcb); >> if (ret< 0) { >> - error_report("failed writing to acb->s->fds\n"); >> + error_report("failed writing to acb->s->fds"); >> qemu_free(rcb); >> } >> } >> >> -/* Callback when all queued rados_aio requests are complete */ >> +/* Callback when all queued rbd_aio requests are complete */ >> >> static void rbd_aio_bh_cb(void *opaque) >> { >> @@ -707,9 +492,7 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, >> { >> RBDAIOCB *acb; >> RADOSCB *rcb; >> - rados_completion_t c; >> - char n[RBD_MAX_SEG_NAME_SIZE]; >> - int64_t segnr, segoffs, segsize, last_segnr; >> + rbd_completion_t c; >> int64_t off, size; >> char *buf; >> >> @@ -719,7 +502,6 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, >> acb->write = write; >> acb->qiov = qiov; >> acb->bounce = qemu_blockalign(bs, qiov->size); >> - acb->aiocnt = 0; >> acb->ret = 0; >> acb->error = 0; >> acb->s = s; >> @@ -734,95 +516,81 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, >> >> off = sector_num * BDRV_SECTOR_SIZE; >> size = nb_sectors * BDRV_SECTOR_SIZE; >> - segnr = off / s->objsize; >> - segoffs = off % s->objsize; >> - segsize = s->objsize - segoffs; >> - >> - last_segnr = ((off + size - 1) / s->objsize); >> - acb->aiocnt = (last_segnr - segnr) + 1; >> >> - s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */ >> + s->qemu_aio_count++; /* All the RADOSCB */ >> >> - while (size> 0) { >> - if (size< segsize) { >> - segsize = size; >> - } >> - >> - snprintf(n, sizeof(n), "%s.%012" PRIx64, s->block_name, >> - segnr); >> - >> - rcb = qemu_malloc(sizeof(RADOSCB)); >> - rcb->done = 0; >> - rcb->acb = acb; >> - rcb->segsize = segsize; >> - rcb->buf = buf; >> - rcb->s = acb->s; >> - >> - if (write) { >> - rados_aio_create_completion(rcb, NULL, >> - (rados_callback_t) rbd_finish_aiocb, >> -&c); >> - rados_aio_write(s->pool, n, segoffs, buf, segsize, c); >> - } else { >> - rados_aio_create_completion(rcb, >> - (rados_callback_t) rbd_finish_aiocb, >> - NULL,&c); >> - rados_aio_read(s->pool, n, segoffs, buf, segsize, c); >> - } >> + rcb = qemu_malloc(sizeof(RADOSCB)); >> + rcb->done = 0; >> + rcb->acb = acb; >> + rcb->buf = buf; >> + rcb->s = acb->s; >> + rcb->size = size; >> >> - buf += segsize; >> - size -= segsize; >> - segoffs = 0; >> - segsize = s->objsize; >> - segnr++; >> + if (write) { >> + rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb,&c); >> + rbd_aio_write(s->image, off, size, buf, c); >> + } else { >> + rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb,&c); >> + rbd_aio_read(s->image, off, size, buf, c); >> } >> >> return&acb->common; >> } >> >> -static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs, >> - int64_t sector_num, QEMUIOVector * qiov, >> - int nb_sectors, >> - BlockDriverCompletionFunc * cb, >> - void *opaque) >> +static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, >> + int64_t sector_num, >> + QEMUIOVector *qiov, >> + int nb_sectors, >> + BlockDriverCompletionFunc *cb, >> + void *opaque) >> { >> return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0); >> } >> >> -static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs, >> - int64_t sector_num, QEMUIOVector * qiov, >> - int nb_sectors, >> - BlockDriverCompletionFunc * cb, >> - void *opaque) >> +static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, >> + int64_t sector_num, >> + QEMUIOVector *qiov, >> + int nb_sectors, >> + BlockDriverCompletionFunc *cb, >> + void *opaque) >> { >> return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1); >> } >> >> -static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi) >> +static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) >> { >> BDRVRBDState *s = bs->opaque; >> - bdi->cluster_size = s->objsize; >> + rbd_image_info_t info; >> + int r; >> + >> + r = rbd_stat(s->image,&info, sizeof(info)); >> + if (r< 0) { >> + return r; >> + } >> + >> + bdi->cluster_size = info.obj_size; >> return 0; >> } >> >> -static int64_t rbd_getlength(BlockDriverState * bs) >> +static int64_t qemu_rbd_getlength(BlockDriverState *bs) >> { >> BDRVRBDState *s = bs->opaque; >> + rbd_image_info_t info; >> + int r; >> >> - return s->size; >> + r = rbd_stat(s->image,&info, sizeof(info)); >> + if (r< 0) { >> + return r; >> + } >> + >> + return info.size; >> } >> >> -static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) >> +static int qemu_rbd_snap_create(BlockDriverState *bs, >> + QEMUSnapshotInfo *sn_info) >> { >> BDRVRBDState *s = bs->opaque; >> - char inbuf[512], outbuf[128]; >> - uint64_t snap_id; >> int r; >> - char *p = inbuf; >> - char *end = inbuf + sizeof(inbuf); >> - char n[RBD_MAX_SEG_NAME_SIZE]; >> - char *hbuf = NULL; >> - RbdHeader1 *header; >> >> if (sn_info->name[0] == '\0') { >> return -EINVAL; /* we need a name for rbd snapshots */ >> @@ -841,185 +609,59 @@ static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) >> return -ERANGE; >> } >> >> - r = rados_selfmanaged_snap_create(s->header_pool,&snap_id); >> + r = rbd_snap_create(s->image, sn_info->name); >> if (r< 0) { >> - error_report("failed to create snap id: %s", strerror(-r)); >> + error_report("failed to create snap: %s", strerror(-r)); >> return r; >> } >> >> - *(uint32_t *)p = strlen(sn_info->name); >> - cpu_to_le32s((uint32_t *)p); >> - p += sizeof(uint32_t); >> - strncpy(p, sn_info->name, end - p); >> - p += strlen(p); >> - if (p + sizeof(snap_id)> end) { >> - error_report("invalid input parameter"); >> - return -EINVAL; >> - } >> - >> - *(uint64_t *)p = snap_id; >> - cpu_to_le64s((uint64_t *)p); >> - >> - snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX); >> - >> - r = rados_exec(s->header_pool, n, "rbd", "snap_add", inbuf, >> - sizeof(inbuf), outbuf, sizeof(outbuf)); >> - if (r< 0) { >> - error_report("rbd.snap_add execution failed failed: %s", strerror(-r)); >> - return r; >> - } >> - >> - sprintf(sn_info->id_str, "%s", sn_info->name); >> - >> - r = rbd_read_header(s,&hbuf); >> - if (r< 0) { >> - error_report("failed reading header: %s", strerror(-r)); >> - return r; >> - } >> - >> - header = (RbdHeader1 *) hbuf; >> - r = rbd_set_snapc(s->pool, sn_info->name, header); >> - if (r< 0) { >> - error_report("failed setting snap context: %s", strerror(-r)); >> - goto failed; >> - } >> - >> - return 0; >> - >> -failed: >> - qemu_free(header); >> - return r; >> -} >> - >> -static int decode32(char **p, const char *end, uint32_t *v) >> -{ >> - if (*p + 4> end) { >> - return -ERANGE; >> - } >> - >> - *v = *(uint32_t *)(*p); >> - le32_to_cpus(v); >> - *p += 4; >> return 0; >> } >> >> -static int decode64(char **p, const char *end, uint64_t *v) >> -{ >> - if (*p + 8> end) { >> - return -ERANGE; >> - } >> - >> - *v = *(uint64_t *)(*p); >> - le64_to_cpus(v); >> - *p += 8; >> - return 0; >> -} >> - >> -static int decode_str(char **p, const char *end, char **s) >> -{ >> - uint32_t len; >> - int r; >> - >> - if ((r = decode32(p, end,&len))< 0) { >> - return r; >> - } >> - >> - *s = qemu_malloc(len + 1); >> - memcpy(*s, *p, len); >> - *p += len; >> - (*s)[len] = '\0'; >> - >> - return len; >> -} >> - >> -static int rbd_snap_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab) >> +static int qemu_rbd_snap_list(BlockDriverState *bs, >> + QEMUSnapshotInfo **psn_tab) >> { >> BDRVRBDState *s = bs->opaque; >> - char n[RBD_MAX_SEG_NAME_SIZE]; >> QEMUSnapshotInfo *sn_info, *sn_tab = NULL; >> - RbdHeader1 *header; >> - char *hbuf = NULL; >> - char *outbuf = NULL, *end, *buf; >> - uint64_t len; >> - uint64_t snap_seq; >> - uint32_t snap_count; >> int r, i; >> + rbd_snap_info_t *snaps; >> + int max_snaps = 100, snap_count; > > I think it would be nicer to have this defined at the beginning (e.g. > RBD_MAX_SNAPS). > >> >> - /* read header to estimate how much space we need to read the snap >> - * list */ >> - if ((r = rbd_read_header(s,&hbuf))< 0) { >> - goto done_err; >> - } >> - header = (RbdHeader1 *)hbuf; >> - len = le64_to_cpu(header->snap_names_len); >> - len += 1024; /* should have already been enough, but new snapshots might >> - already been created since we read the header. just allocate >> - a bit more, so that in most cases it'll suffice anyway */ >> - qemu_free(hbuf); >> - >> - snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX); >> - while (1) { >> - qemu_free(outbuf); >> - outbuf = qemu_malloc(len); >> - >> - r = rados_exec(s->header_pool, n, "rbd", "snap_list", NULL, 0, >> - outbuf, len); >> + do { >> + snaps = qemu_malloc(sizeof(*snaps) * max_snaps); >> + r = rbd_snap_list(s->image, snaps,&max_snaps); >> if (r< 0) { >> - error_report("rbd.snap_list execution failed failed: %s", strerror(-r)); >> - goto done_err; >> + qemu_free(snaps); >> } >> - if (r != len) { >> - break; >> - } >> + } while (r == -ERANGE); >> >> - /* if we're here, we probably raced with some snaps creation */ >> - len *= 2; >> + if (r<= 0) { >> + return r; >> } >> - buf = outbuf; >> - end = buf + len; >> >> - if ((r = decode64(&buf, end,&snap_seq))< 0) { >> - goto done_err; >> - } >> - if ((r = decode32(&buf, end,&snap_count))< 0) { >> - goto done_err; >> - } >> + snap_count = r; > > This isn't really necessary. We could just use snap_count above (or r below). > >> >> sn_tab = qemu_mallocz(snap_count * sizeof(QEMUSnapshotInfo)); >> - for (i = 0; i< snap_count; i++) { >> - uint64_t id, image_size; >> - char *snap_name; >> >> - if ((r = decode64(&buf, end,&id))< 0) { >> - goto done_err; >> - } >> - if ((r = decode64(&buf, end,&image_size))< 0) { >> - goto done_err; >> - } >> - if ((r = decode_str(&buf, end,&snap_name))< 0) { >> - goto done_err; >> - } >> + for (i = 0; i< snap_count; i++) { >> + const char *snap_name = snaps[i].name; >> >> sn_info = sn_tab + i; >> pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); >> pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); >> - qemu_free(snap_name); >> >> - sn_info->vm_state_size = image_size; >> + sn_info->vm_state_size = snaps[i].size; >> sn_info->date_sec = 0; >> sn_info->date_nsec = 0; >> sn_info->vm_clock_nsec = 0; >> } >> + rbd_snap_list_end(snaps); >> + >> *psn_tab = sn_tab; >> - qemu_free(outbuf); >> return snap_count; >> -done_err: >> - qemu_free(sn_tab); >> - qemu_free(outbuf); >> - return r; >> } >> >> -static QEMUOptionParameter rbd_create_options[] = { >> +static QEMUOptionParameter qemu_rbd_create_options[] = { >> { >> .name = BLOCK_OPT_SIZE, >> .type = OPT_SIZE, >> @@ -1036,19 +678,19 @@ static QEMUOptionParameter rbd_create_options[] = { >> static BlockDriver bdrv_rbd = { >> .format_name = "rbd", >> .instance_size = sizeof(BDRVRBDState), >> - .bdrv_file_open = rbd_open, >> - .bdrv_close = rbd_close, >> - .bdrv_create = rbd_create, >> - .bdrv_get_info = rbd_getinfo, >> - .create_options = rbd_create_options, >> - .bdrv_getlength = rbd_getlength, >> + .bdrv_file_open = qemu_rbd_open, >> + .bdrv_close = qemu_rbd_close, >> + .bdrv_create = qemu_rbd_create, >> + .bdrv_get_info = qemu_rbd_getinfo, >> + .create_options = qemu_rbd_create_options, >> + .bdrv_getlength = qemu_rbd_getlength, >> .protocol_name = "rbd", >> >> - .bdrv_aio_readv = rbd_aio_readv, >> - .bdrv_aio_writev = rbd_aio_writev, >> + .bdrv_aio_readv = qemu_rbd_aio_readv, >> + .bdrv_aio_writev = qemu_rbd_aio_writev, >> >> - .bdrv_snapshot_create = rbd_snap_create, >> - .bdrv_snapshot_list = rbd_snap_list, >> + .bdrv_snapshot_create = qemu_rbd_snap_create, >> + .bdrv_snapshot_list = qemu_rbd_snap_list, >> }; >> >> static void bdrv_rbd_init(void) >> diff --git a/block/rbd_types.h b/block/rbd_types.h >> deleted file mode 100644 >> index f4cca99..0000000 >> --- a/block/rbd_types.h >> +++ /dev/null >> @@ -1,71 +0,0 @@ >> -/* >> - * Ceph - scalable distributed file system >> - * >> - * Copyright (C) 2004-2010 Sage Weil<sage@newdream.net> >> - * >> - * This is free software; you can redistribute it and/or >> - * modify it under the terms of the GNU Lesser General Public >> - * License version 2.1, as published by the Free Software >> - * Foundation. See file COPYING.LIB. >> - * >> - */ >> - >> -#ifndef CEPH_RBD_TYPES_H >> -#define CEPH_RBD_TYPES_H >> - >> - >> -/* >> - * rbd image 'foo' consists of objects >> - * foo.rbd - image metadata >> - * foo.00000000 >> - * foo.00000001 >> - * ... - data >> - */ >> - >> -#define RBD_SUFFIX ".rbd" >> -#define RBD_DIRECTORY "rbd_directory" >> -#define RBD_INFO "rbd_info" >> - >> -#define RBD_DEFAULT_OBJ_ORDER 22 /* 4MB */ >> - >> -#define RBD_MAX_OBJ_NAME_SIZE 96 >> -#define RBD_MAX_BLOCK_NAME_SIZE 24 >> -#define RBD_MAX_SEG_NAME_SIZE 128 >> - >> -#define RBD_COMP_NONE 0 >> -#define RBD_CRYPT_NONE 0 >> - >> -#define RBD_HEADER_TEXT "<<< Rados Block Device Image>>>\n" >> -#define RBD_HEADER_SIGNATURE "RBD" >> -#define RBD_HEADER_VERSION "001.005" >> - >> -struct rbd_info { >> - uint64_t max_id; >> -} __attribute__ ((packed)); >> - >> -struct rbd_obj_snap_ondisk { >> - uint64_t id; >> - uint64_t image_size; >> -} __attribute__((packed)); >> - >> -struct rbd_obj_header_ondisk { >> - char text[40]; >> - char block_name[RBD_MAX_BLOCK_NAME_SIZE]; >> - char signature[4]; >> - char version[8]; >> - struct { >> - uint8_t order; >> - uint8_t crypt_type; >> - uint8_t comp_type; >> - uint8_t unused; >> - } __attribute__((packed)) options; >> - uint64_t image_size; >> - uint64_t snap_seq; >> - uint32_t snap_count; >> - uint32_t reserved; >> - uint64_t snap_names_len; >> - struct rbd_obj_snap_ondisk snaps[0]; >> -} __attribute__((packed)); >> - >> - >> -#endif >> diff --git a/configure b/configure >> index a318d37..378c238 100755 >> --- a/configure >> +++ b/configure >> @@ -1917,41 +1917,24 @@ fi >> if test "$rbd" != "no" ; then >> cat> $TMPC<<EOF >> #include<stdio.h> >> -#include<rados/librados.h> >> -int main(void) { rados_initialize(0, NULL); return 0; } >> -EOF >> - rbd_libs="-lrados" >> - if compile_prog "" "$rbd_libs" ; then >> - librados_too_old=no >> - cat> $TMPC<<EOF >> -#include<stdio.h> >> -#include<rados/librados.h> >> -#ifndef CEPH_OSD_TMAP_SET >> -#error missing CEPH_OSD_TMAP_SET >> -#endif >> +#include<rbd/librbd.h> >> int main(void) { >> - int (*func)(const rados_pool_t pool, uint64_t *snapid) = rados_selfmanaged_snap_create; >> - rados_initialize(0, NULL); >> + rados_t cluster; >> + rados_create(&cluster, NULL); >> return 0; >> } >> EOF >> - if compile_prog "" "$rbd_libs" ; then >> - rbd=yes >> - libs_tools="$rbd_libs $libs_tools" >> - libs_softmmu="$rbd_libs $libs_softmmu" >> - else >> - rbd=no >> - librados_too_old=yes >> - fi >> + rbd_libs="-lrbd -lrados" >> + if compile_prog "" "$rbd_libs" ; then >> + rbd=yes >> + libs_tools="$rbd_libs $libs_tools" >> + libs_softmmu="$rbd_libs $libs_softmmu" >> else >> if test "$rbd" = "yes" ; then >> feature_not_found "rados block device" >> fi >> rbd=no >> fi >> - if test "$librados_too_old" = "yes" ; then >> - echo "-> Your librados version is too old - upgrade needed to have rbd support" >> - fi >> fi >> >> ########################################## >> -- >> 1.7.2.3 >> >> >> -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
diff --git a/block/rbd.c b/block/rbd.c index 249a590..1c8e7c7 100644 --- a/block/rbd.c +++ b/block/rbd.c @@ -1,20 +1,22 @@ /* * QEMU Block driver for RADOS (Ceph) * - * Copyright (C) 2010 Christian Brunner <chb@muc.de> + * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, + * Josh Durgin <josh.durgin@dreamhost.com> * * This work is licensed under the terms of the GNU GPL, version 2. See * the COPYING file in the top-level directory. * */ +#include <inttypes.h> + #include "qemu-common.h" #include "qemu-error.h" -#include "rbd_types.h" #include "block_int.h" -#include <rados/librados.h> +#include <rbd/librbd.h> @@ -40,6 +42,12 @@ #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) +#define RBD_MAX_CONF_NAME_SIZE 128 +#define RBD_MAX_CONF_VAL_SIZE 512 +#define RBD_MAX_CONF_SIZE 1024 +#define RBD_MAX_POOL_NAME_SIZE 128 +#define RBD_MAX_SNAP_NAME_SIZE 128 + typedef struct RBDAIOCB { BlockDriverAIOCB common; QEMUBH *bh; @@ -48,7 +56,6 @@ typedef struct RBDAIOCB { char *bounce; int write; int64_t sector_num; - int aiocnt; int error; struct BDRVRBDState *s; int cancelled; @@ -59,7 +66,7 @@ typedef struct RADOSCB { RBDAIOCB *acb; struct BDRVRBDState *s; int done; - int64_t segsize; + int64_t size; char *buf; int ret; } RADOSCB; @@ -69,25 +76,22 @@ typedef struct RADOSCB { typedef struct BDRVRBDState { int fds[2]; - rados_pool_t pool; - rados_pool_t header_pool; - char name[RBD_MAX_OBJ_NAME_SIZE]; - char block_name[RBD_MAX_BLOCK_NAME_SIZE]; - uint64_t size; - uint64_t objsize; + rados_t cluster; + rados_ioctx_t io_ctx; + rbd_image_t image; + char name[RBD_MAX_IMAGE_NAME_SIZE]; int qemu_aio_count; + char *snap; int event_reader_pos; RADOSCB *event_rcb; } BDRVRBDState; -typedef struct rbd_obj_header_ondisk RbdHeader1; - static void rbd_aio_bh_cb(void *opaque); -static int rbd_next_tok(char *dst, int dst_len, - char *src, char delim, - const char *name, - char **p) +static int qemu_rbd_next_tok(char *dst, int dst_len, + char *src, char delim, + const char *name, + char **p) { int l; char *end; @@ -115,10 +119,10 @@ static int rbd_next_tok(char *dst, int dst_len, return 0; } -static int rbd_parsename(const char *filename, - char *pool, int pool_len, - char *snap, int snap_len, - char *name, int name_len) +static int qemu_rbd_parsename(const char *filename, + char *pool, int pool_len, + char *snap, int snap_len, + char *name, int name_len) { const char *start; char *p, *buf; @@ -131,12 +135,12 @@ static int rbd_parsename(const char *filename, buf = qemu_strdup(start); p = buf; - ret = rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); + ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); if (ret < 0 || !p) { ret = -EINVAL; goto done; } - ret = rbd_next_tok(name, name_len, p, '@', "object name", &p); + ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p); if (ret < 0) { goto done; } @@ -145,123 +149,35 @@ static int rbd_parsename(const char *filename, goto done; } - ret = rbd_next_tok(snap, snap_len, p, '\0', "snap name", &p); + ret = qemu_rbd_next_tok(snap, snap_len, p, '\0', "snap name", &p); done: qemu_free(buf); return ret; } -static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc) -{ - uint32_t len = strlen(name); - uint32_t len_le = cpu_to_le32(len); - /* total_len = encoding op + name + empty buffer */ - uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t); - uint8_t *desc = NULL; - - desc = qemu_malloc(total_len); - - *tmap_desc = (char *)desc; - - *desc = op; - desc++; - memcpy(desc, &len_le, sizeof(len_le)); - desc += sizeof(len_le); - memcpy(desc, name, len); - desc += len; - len = 0; /* no need for endian conversion for 0 */ - memcpy(desc, &len, sizeof(len)); - desc += sizeof(len); - - return (char *)desc - *tmap_desc; -} - -static void free_tmap_op(char *tmap_desc) -{ - qemu_free(tmap_desc); -} - -static int rbd_register_image(rados_pool_t pool, const char *name) -{ - char *tmap_desc; - const char *dir = RBD_DIRECTORY; - int ret; - - ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc); - if (ret < 0) { - return ret; - } - - ret = rados_tmap_update(pool, dir, tmap_desc, ret); - free_tmap_op(tmap_desc); - - return ret; -} - -static int touch_rbd_info(rados_pool_t pool, const char *info_oid) -{ - int r = rados_write(pool, info_oid, 0, NULL, 0); - if (r < 0) { - return r; - } - return 0; -} - -static int rbd_assign_bid(rados_pool_t pool, uint64_t *id) -{ - uint64_t out[1]; - const char *info_oid = RBD_INFO; - - *id = 0; - - int r = touch_rbd_info(pool, info_oid); - if (r < 0) { - return r; - } - - r = rados_exec(pool, info_oid, "rbd", "assign_bid", NULL, - 0, (char *)out, sizeof(out)); - if (r < 0) { - return r; - } - - le64_to_cpus(out); - *id = out[0]; - - return 0; -} - -static int rbd_create(const char *filename, QEMUOptionParameter *options) +static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options) { int64_t bytes = 0; int64_t objsize; - uint64_t size; - time_t mtime; - uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER; - char pool[RBD_MAX_SEG_NAME_SIZE]; - char n[RBD_MAX_SEG_NAME_SIZE]; - char name[RBD_MAX_OBJ_NAME_SIZE]; - char snap_buf[RBD_MAX_SEG_NAME_SIZE]; + int obj_order = 0; + char pool[RBD_MAX_POOL_NAME_SIZE]; + char name[RBD_MAX_IMAGE_NAME_SIZE]; + char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; char *snap = NULL; - RbdHeader1 header; - rados_pool_t p; - uint64_t bid; - uint32_t hi, lo; + rados_t cluster; + rados_ioctx_t io_ctx; int ret; - if (rbd_parsename(filename, - pool, sizeof(pool), - snap_buf, sizeof(snap_buf), - name, sizeof(name)) < 0) { + if (qemu_rbd_parsename(filename, pool, sizeof(pool), + snap_buf, sizeof(snap_buf), + name, sizeof(name)) < 0) { return -EINVAL; } if (snap_buf[0] != '\0') { snap = snap_buf; } - snprintf(n, sizeof(n), "%s%s", name, RBD_SUFFIX); - /* Read out options */ while (options && options->name) { if (!strcmp(options->name, BLOCK_OPT_SIZE)) { @@ -277,82 +193,55 @@ static int rbd_create(const char *filename, QEMUOptionParameter *options) error_report("obj size too small"); return -EINVAL; } - obj_order = ffs(objsize) - 1; + obj_order = ffs(objsize) - 1; } } options++; } - memset(&header, 0, sizeof(header)); - pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT); - pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE); - pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION); - header.image_size = cpu_to_le64(bytes); - header.options.order = obj_order; - header.options.crypt_type = RBD_CRYPT_NONE; - header.options.comp_type = RBD_COMP_NONE; - header.snap_seq = 0; - header.snap_count = 0; - - if (rados_initialize(0, NULL) < 0) { + if (rados_create(&cluster, NULL) < 0) { error_report("error initializing"); return -EIO; } - if (rados_open_pool(pool, &p)) { - error_report("error opening pool %s", pool); - rados_deinitialize(); + if (rados_conf_read_file(cluster, NULL) < 0) { + error_report("error reading config file"); + rados_shutdown(cluster); return -EIO; } - /* check for existing rbd header file */ - ret = rados_stat(p, n, &size, &mtime); - if (ret == 0) { - ret=-EEXIST; - goto done; - } - - ret = rbd_assign_bid(p, &bid); - if (ret < 0) { - error_report("failed assigning block id"); - rados_deinitialize(); + if (rados_connect(cluster) < 0) { + error_report("error connecting"); + rados_shutdown(cluster); return -EIO; } - hi = bid >> 32; - lo = bid & 0xFFFFFFFF; - snprintf(header.block_name, sizeof(header.block_name), "rb.%x.%x", hi, lo); - /* create header file */ - ret = rados_write(p, n, 0, (const char *)&header, sizeof(header)); - if (ret < 0) { - goto done; + if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) { + error_report("error opening pool %s", pool); + rados_shutdown(cluster); + return -EIO; } - ret = rbd_register_image(p, name); -done: - rados_close_pool(p); - rados_deinitialize(); + ret = rbd_create(io_ctx, name, bytes, &obj_order); + rados_ioctx_destroy(io_ctx); + rados_shutdown(cluster); return ret; } /* - * This aio completion is being called from rbd_aio_event_reader() and - * runs in qemu context. It schedules a bh, but just in case the aio + * This aio completion is being called from qemu_rbd_aio_event_reader() + * and runs in qemu context. It schedules a bh, but just in case the aio * was not cancelled before. */ -static void rbd_complete_aio(RADOSCB *rcb) +static void qemu_rbd_complete_aio(RADOSCB *rcb) { RBDAIOCB *acb = rcb->acb; int64_t r; - acb->aiocnt--; - if (acb->cancelled) { - if (!acb->aiocnt) { - qemu_vfree(acb->bounce); - qemu_aio_release(acb); - } + qemu_vfree(acb->bounce); + qemu_aio_release(acb); goto done; } @@ -363,32 +252,25 @@ static void rbd_complete_aio(RADOSCB *rcb) acb->ret = r; acb->error = 1; } else if (!acb->error) { - acb->ret += rcb->segsize; + acb->ret = rcb->size; } } else { - if (r == -ENOENT) { - memset(rcb->buf, 0, rcb->segsize); - if (!acb->error) { - acb->ret += rcb->segsize; - } - } else if (r < 0) { - memset(rcb->buf, 0, rcb->segsize); + if (r < 0) { + memset(rcb->buf, 0, rcb->size); acb->ret = r; acb->error = 1; - } else if (r < rcb->segsize) { - memset(rcb->buf + r, 0, rcb->segsize - r); + } else if (r < rcb->size) { + memset(rcb->buf + r, 0, rcb->size - r); if (!acb->error) { - acb->ret += rcb->segsize; + acb->ret = rcb->size; } } else if (!acb->error) { - acb->ret += r; + acb->ret = r; } } /* Note that acb->bh can be NULL in case where the aio was cancelled */ - if (!acb->aiocnt) { - acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); - qemu_bh_schedule(acb->bh); - } + acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); + qemu_bh_schedule(acb->bh); done: qemu_free(rcb); } @@ -397,7 +279,7 @@ done: * aio fd read handler. It runs in the qemu context and calls the * completion handling of completed rados aio operations. */ -static void rbd_aio_event_reader(void *opaque) +static void qemu_rbd_aio_event_reader(void *opaque) { BDRVRBDState *s = opaque; @@ -413,176 +295,74 @@ static void rbd_aio_event_reader(void *opaque) s->event_reader_pos += ret; if (s->event_reader_pos == sizeof(s->event_rcb)) { s->event_reader_pos = 0; - rbd_complete_aio(s->event_rcb); - s->qemu_aio_count --; + qemu_rbd_complete_aio(s->event_rcb); + s->qemu_aio_count--; } } } } while (ret < 0 && errno == EINTR); } -static int rbd_aio_flush_cb(void *opaque) +static int qemu_rbd_aio_flush_cb(void *opaque) { BDRVRBDState *s = opaque; return (s->qemu_aio_count > 0); } - -static int rbd_set_snapc(rados_pool_t pool, const char *snap, RbdHeader1 *header) -{ - uint32_t snap_count = le32_to_cpu(header->snap_count); - rados_snap_t *snaps = NULL; - rados_snap_t seq; - uint32_t i; - uint64_t snap_names_len = le64_to_cpu(header->snap_names_len); - int r; - rados_snap_t snapid = 0; - - if (snap_count) { - const char *header_snap = (const char *)&header->snaps[snap_count]; - const char *end = header_snap + snap_names_len; - snaps = qemu_malloc(sizeof(rados_snap_t) * header->snap_count); - - for (i=0; i < snap_count; i++) { - snaps[i] = le64_to_cpu(header->snaps[i].id); - - if (snap && strcmp(snap, header_snap) == 0) { - snapid = snaps[i]; - } - - header_snap += strlen(header_snap) + 1; - if (header_snap > end) { - error_report("bad header, snapshot list broken"); - } - } - } - - if (snap && !snapid) { - error_report("snapshot not found"); - qemu_free(snaps); - return -ENOENT; - } - seq = le32_to_cpu(header->snap_seq); - - r = rados_set_snap_context(pool, seq, snaps, snap_count); - - rados_set_snap(pool, snapid); - - qemu_free(snaps); - - return r; -} - -#define BUF_READ_START_LEN 4096 - -static int rbd_read_header(BDRVRBDState *s, char **hbuf) -{ - char *buf = NULL; - char n[RBD_MAX_SEG_NAME_SIZE]; - uint64_t len = BUF_READ_START_LEN; - int r; - - snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX); - - buf = qemu_malloc(len); - - r = rados_read(s->header_pool, n, 0, buf, len); - if (r < 0) { - goto failed; - } - - if (r < len) { - goto done; - } - - qemu_free(buf); - buf = qemu_malloc(len); - - r = rados_stat(s->header_pool, n, &len, NULL); - if (r < 0) { - goto failed; - } - - r = rados_read(s->header_pool, n, 0, buf, len); - if (r < 0) { - goto failed; - } - -done: - *hbuf = buf; - return 0; - -failed: - qemu_free(buf); - return r; -} - -static int rbd_open(BlockDriverState *bs, const char *filename, int flags) +static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags) { BDRVRBDState *s = bs->opaque; - RbdHeader1 *header; - char pool[RBD_MAX_SEG_NAME_SIZE]; - char snap_buf[RBD_MAX_SEG_NAME_SIZE]; - char *snap = NULL; - char *hbuf = NULL; + char pool[RBD_MAX_POOL_NAME_SIZE]; + char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; int r; - if (rbd_parsename(filename, pool, sizeof(pool), - snap_buf, sizeof(snap_buf), - s->name, sizeof(s->name)) < 0) { + if (qemu_rbd_parsename(filename, pool, sizeof(pool), + snap_buf, sizeof(snap_buf), + s->name, sizeof(s->name)) < 0) { return -EINVAL; } + s->snap = NULL; if (snap_buf[0] != '\0') { - snap = snap_buf; + s->snap = qemu_strdup(snap_buf); } - if ((r = rados_initialize(0, NULL)) < 0) { + r = rados_create(&s->cluster, NULL); + if (r < 0) { error_report("error initializing"); return r; } - if ((r = rados_open_pool(pool, &s->pool))) { - error_report("error opening pool %s", pool); - rados_deinitialize(); + r = rados_conf_read_file(s->cluster, NULL); + if (r < 0) { + error_report("error reading config file"); + rados_shutdown(s->cluster); return r; } - if ((r = rados_open_pool(pool, &s->header_pool))) { - error_report("error opening pool %s", pool); - rados_deinitialize(); + r = rados_connect(s->cluster); + if (r < 0) { + error_report("error connecting"); + rados_shutdown(s->cluster); return r; } - if ((r = rbd_read_header(s, &hbuf)) < 0) { - error_report("error reading header from %s", s->name); - goto failed; - } - - if (memcmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) { - error_report("Invalid header signature"); - r = -EMEDIUMTYPE; - goto failed; - } - - if (memcmp(hbuf + 68, RBD_HEADER_VERSION, 8)) { - error_report("Unknown image version"); - r = -EMEDIUMTYPE; - goto failed; + r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); + if (r < 0) { + error_report("error opening pool %s", pool); + rados_shutdown(s->cluster); + return r; } - header = (RbdHeader1 *) hbuf; - s->size = le64_to_cpu(header->image_size); - s->objsize = 1ULL << header->options.order; - memcpy(s->block_name, header->block_name, sizeof(header->block_name)); - - r = rbd_set_snapc(s->pool, snap, header); + r = rbd_open(s->io_ctx, s->name, &s->image, s->snap); if (r < 0) { - error_report("failed setting snap context: %s", strerror(-r)); - goto failed; + error_report("error reading header from %s", s->name); + rados_ioctx_destroy(s->io_ctx); + rados_shutdown(s->cluster); + return r; } - bs->read_only = (snap != NULL); + bs->read_only = (s->snap != NULL); s->event_reader_pos = 0; r = qemu_pipe(s->fds); @@ -592,23 +372,20 @@ static int rbd_open(BlockDriverState *bs, const char *filename, int flags) } fcntl(s->fds[0], F_SETFL, O_NONBLOCK); fcntl(s->fds[1], F_SETFL, O_NONBLOCK); - qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], rbd_aio_event_reader, NULL, - rbd_aio_flush_cb, NULL, s); + qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader, + NULL, qemu_rbd_aio_flush_cb, NULL, s); - qemu_free(hbuf); return 0; failed: - qemu_free(hbuf); - - rados_close_pool(s->header_pool); - rados_close_pool(s->pool); - rados_deinitialize(); + rbd_close(s->image); + rados_ioctx_destroy(s->io_ctx); + rados_shutdown(s->cluster); return r; } -static void rbd_close(BlockDriverState *bs) +static void qemu_rbd_close(BlockDriverState *bs) { BDRVRBDState *s = bs->opaque; @@ -617,16 +394,17 @@ static void rbd_close(BlockDriverState *bs) qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL, NULL); - rados_close_pool(s->header_pool); - rados_close_pool(s->pool); - rados_deinitialize(); + rbd_close(s->image); + rados_ioctx_destroy(s->io_ctx); + qemu_free(s->snap); + rados_shutdown(s->cluster); } /* * Cancel aio. Since we don't reference acb in a non qemu threads, * it is safe to access it here. */ -static void rbd_aio_cancel(BlockDriverAIOCB *blockacb) +static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) { RBDAIOCB *acb = (RBDAIOCB *) blockacb; acb->cancelled = 1; @@ -634,39 +412,28 @@ static void rbd_aio_cancel(BlockDriverAIOCB *blockacb) static AIOPool rbd_aio_pool = { .aiocb_size = sizeof(RBDAIOCB), - .cancel = rbd_aio_cancel, + .cancel = qemu_rbd_aio_cancel, }; -/* - * This is the callback function for rados_aio_read and _write - * - * Note: this function is being called from a non qemu thread so - * we need to be careful about what we do here. Generally we only - * write to the block notification pipe, and do the rest of the - * io completion handling from rbd_aio_event_reader() which - * runs in a qemu context. - */ -static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb) +static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) { - int ret; - rcb->ret = rados_aio_get_return_value(c); - rados_aio_release(c); + int ret = 0; while (1) { fd_set wfd; - int fd = rcb->s->fds[RBD_FD_WRITE]; + int fd = s->fds[RBD_FD_WRITE]; - /* send the rcb pointer to the qemu thread that is responsible - for the aio completion. Must do it in a qemu thread context */ + /* send the op pointer to the qemu thread that is responsible + for the aio/op completion. Must do it in a qemu thread context */ ret = write(fd, (void *)&rcb, sizeof(rcb)); if (ret >= 0) { break; } if (errno == EINTR) { continue; - } + } if (errno != EAGAIN) { break; - } + } FD_ZERO(&wfd); FD_SET(fd, &wfd); @@ -675,13 +442,31 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb) } while (ret < 0 && errno == EINTR); } + return ret; +} + +/* + * This is the callback function for rbd_aio_read and _write + * + * Note: this function is being called from a non qemu thread so + * we need to be careful about what we do here. Generally we only + * write to the block notification pipe, and do the rest of the + * io completion handling from qemu_rbd_aio_event_reader() which + * runs in a qemu context. + */ +static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) +{ + int ret; + rcb->ret = rbd_aio_get_return_value(c); + rbd_aio_release(c); + ret = qemu_rbd_send_pipe(rcb->s, rcb); if (ret < 0) { - error_report("failed writing to acb->s->fds\n"); + error_report("failed writing to acb->s->fds"); qemu_free(rcb); } } -/* Callback when all queued rados_aio requests are complete */ +/* Callback when all queued rbd_aio requests are complete */ static void rbd_aio_bh_cb(void *opaque) { @@ -707,9 +492,7 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, { RBDAIOCB *acb; RADOSCB *rcb; - rados_completion_t c; - char n[RBD_MAX_SEG_NAME_SIZE]; - int64_t segnr, segoffs, segsize, last_segnr; + rbd_completion_t c; int64_t off, size; char *buf; @@ -719,7 +502,6 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, acb->write = write; acb->qiov = qiov; acb->bounce = qemu_blockalign(bs, qiov->size); - acb->aiocnt = 0; acb->ret = 0; acb->error = 0; acb->s = s; @@ -734,95 +516,81 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, off = sector_num * BDRV_SECTOR_SIZE; size = nb_sectors * BDRV_SECTOR_SIZE; - segnr = off / s->objsize; - segoffs = off % s->objsize; - segsize = s->objsize - segoffs; - - last_segnr = ((off + size - 1) / s->objsize); - acb->aiocnt = (last_segnr - segnr) + 1; - s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */ + s->qemu_aio_count++; /* All the RADOSCB */ - while (size > 0) { - if (size < segsize) { - segsize = size; - } - - snprintf(n, sizeof(n), "%s.%012" PRIx64, s->block_name, - segnr); - - rcb = qemu_malloc(sizeof(RADOSCB)); - rcb->done = 0; - rcb->acb = acb; - rcb->segsize = segsize; - rcb->buf = buf; - rcb->s = acb->s; - - if (write) { - rados_aio_create_completion(rcb, NULL, - (rados_callback_t) rbd_finish_aiocb, - &c); - rados_aio_write(s->pool, n, segoffs, buf, segsize, c); - } else { - rados_aio_create_completion(rcb, - (rados_callback_t) rbd_finish_aiocb, - NULL, &c); - rados_aio_read(s->pool, n, segoffs, buf, segsize, c); - } + rcb = qemu_malloc(sizeof(RADOSCB)); + rcb->done = 0; + rcb->acb = acb; + rcb->buf = buf; + rcb->s = acb->s; + rcb->size = size; - buf += segsize; - size -= segsize; - segoffs = 0; - segsize = s->objsize; - segnr++; + if (write) { + rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); + rbd_aio_write(s->image, off, size, buf, c); + } else { + rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); + rbd_aio_read(s->image, off, size, buf, c); } return &acb->common; } -static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs, - int64_t sector_num, QEMUIOVector * qiov, - int nb_sectors, - BlockDriverCompletionFunc * cb, - void *opaque) +static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, + int64_t sector_num, + QEMUIOVector *qiov, + int nb_sectors, + BlockDriverCompletionFunc *cb, + void *opaque) { return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0); } -static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs, - int64_t sector_num, QEMUIOVector * qiov, - int nb_sectors, - BlockDriverCompletionFunc * cb, - void *opaque) +static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, + int64_t sector_num, + QEMUIOVector *qiov, + int nb_sectors, + BlockDriverCompletionFunc *cb, + void *opaque) { return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1); } -static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi) +static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) { BDRVRBDState *s = bs->opaque; - bdi->cluster_size = s->objsize; + rbd_image_info_t info; + int r; + + r = rbd_stat(s->image, &info, sizeof(info)); + if (r < 0) { + return r; + } + + bdi->cluster_size = info.obj_size; return 0; } -static int64_t rbd_getlength(BlockDriverState * bs) +static int64_t qemu_rbd_getlength(BlockDriverState *bs) { BDRVRBDState *s = bs->opaque; + rbd_image_info_t info; + int r; - return s->size; + r = rbd_stat(s->image, &info, sizeof(info)); + if (r < 0) { + return r; + } + + return info.size; } -static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) +static int qemu_rbd_snap_create(BlockDriverState *bs, + QEMUSnapshotInfo *sn_info) { BDRVRBDState *s = bs->opaque; - char inbuf[512], outbuf[128]; - uint64_t snap_id; int r; - char *p = inbuf; - char *end = inbuf + sizeof(inbuf); - char n[RBD_MAX_SEG_NAME_SIZE]; - char *hbuf = NULL; - RbdHeader1 *header; if (sn_info->name[0] == '\0') { return -EINVAL; /* we need a name for rbd snapshots */ @@ -841,185 +609,59 @@ static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) return -ERANGE; } - r = rados_selfmanaged_snap_create(s->header_pool, &snap_id); + r = rbd_snap_create(s->image, sn_info->name); if (r < 0) { - error_report("failed to create snap id: %s", strerror(-r)); + error_report("failed to create snap: %s", strerror(-r)); return r; } - *(uint32_t *)p = strlen(sn_info->name); - cpu_to_le32s((uint32_t *)p); - p += sizeof(uint32_t); - strncpy(p, sn_info->name, end - p); - p += strlen(p); - if (p + sizeof(snap_id) > end) { - error_report("invalid input parameter"); - return -EINVAL; - } - - *(uint64_t *)p = snap_id; - cpu_to_le64s((uint64_t *)p); - - snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX); - - r = rados_exec(s->header_pool, n, "rbd", "snap_add", inbuf, - sizeof(inbuf), outbuf, sizeof(outbuf)); - if (r < 0) { - error_report("rbd.snap_add execution failed failed: %s", strerror(-r)); - return r; - } - - sprintf(sn_info->id_str, "%s", sn_info->name); - - r = rbd_read_header(s, &hbuf); - if (r < 0) { - error_report("failed reading header: %s", strerror(-r)); - return r; - } - - header = (RbdHeader1 *) hbuf; - r = rbd_set_snapc(s->pool, sn_info->name, header); - if (r < 0) { - error_report("failed setting snap context: %s", strerror(-r)); - goto failed; - } - - return 0; - -failed: - qemu_free(header); - return r; -} - -static int decode32(char **p, const char *end, uint32_t *v) -{ - if (*p + 4 > end) { - return -ERANGE; - } - - *v = *(uint32_t *)(*p); - le32_to_cpus(v); - *p += 4; return 0; } -static int decode64(char **p, const char *end, uint64_t *v) -{ - if (*p + 8 > end) { - return -ERANGE; - } - - *v = *(uint64_t *)(*p); - le64_to_cpus(v); - *p += 8; - return 0; -} - -static int decode_str(char **p, const char *end, char **s) -{ - uint32_t len; - int r; - - if ((r = decode32(p, end, &len)) < 0) { - return r; - } - - *s = qemu_malloc(len + 1); - memcpy(*s, *p, len); - *p += len; - (*s)[len] = '\0'; - - return len; -} - -static int rbd_snap_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab) +static int qemu_rbd_snap_list(BlockDriverState *bs, + QEMUSnapshotInfo **psn_tab) { BDRVRBDState *s = bs->opaque; - char n[RBD_MAX_SEG_NAME_SIZE]; QEMUSnapshotInfo *sn_info, *sn_tab = NULL; - RbdHeader1 *header; - char *hbuf = NULL; - char *outbuf = NULL, *end, *buf; - uint64_t len; - uint64_t snap_seq; - uint32_t snap_count; int r, i; + rbd_snap_info_t *snaps; + int max_snaps = 100, snap_count; - /* read header to estimate how much space we need to read the snap - * list */ - if ((r = rbd_read_header(s, &hbuf)) < 0) { - goto done_err; - } - header = (RbdHeader1 *)hbuf; - len = le64_to_cpu(header->snap_names_len); - len += 1024; /* should have already been enough, but new snapshots might - already been created since we read the header. just allocate - a bit more, so that in most cases it'll suffice anyway */ - qemu_free(hbuf); - - snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX); - while (1) { - qemu_free(outbuf); - outbuf = qemu_malloc(len); - - r = rados_exec(s->header_pool, n, "rbd", "snap_list", NULL, 0, - outbuf, len); + do { + snaps = qemu_malloc(sizeof(*snaps) * max_snaps); + r = rbd_snap_list(s->image, snaps, &max_snaps); if (r < 0) { - error_report("rbd.snap_list execution failed failed: %s", strerror(-r)); - goto done_err; + qemu_free(snaps); } - if (r != len) { - break; - } + } while (r == -ERANGE); - /* if we're here, we probably raced with some snaps creation */ - len *= 2; + if (r <= 0) { + return r; } - buf = outbuf; - end = buf + len; - if ((r = decode64(&buf, end, &snap_seq)) < 0) { - goto done_err; - } - if ((r = decode32(&buf, end, &snap_count)) < 0) { - goto done_err; - } + snap_count = r; sn_tab = qemu_mallocz(snap_count * sizeof(QEMUSnapshotInfo)); - for (i = 0; i < snap_count; i++) { - uint64_t id, image_size; - char *snap_name; - if ((r = decode64(&buf, end, &id)) < 0) { - goto done_err; - } - if ((r = decode64(&buf, end, &image_size)) < 0) { - goto done_err; - } - if ((r = decode_str(&buf, end, &snap_name)) < 0) { - goto done_err; - } + for (i = 0; i < snap_count; i++) { + const char *snap_name = snaps[i].name; sn_info = sn_tab + i; pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); - qemu_free(snap_name); - sn_info->vm_state_size = image_size; + sn_info->vm_state_size = snaps[i].size; sn_info->date_sec = 0; sn_info->date_nsec = 0; sn_info->vm_clock_nsec = 0; } + rbd_snap_list_end(snaps); + *psn_tab = sn_tab; - qemu_free(outbuf); return snap_count; -done_err: - qemu_free(sn_tab); - qemu_free(outbuf); - return r; } -static QEMUOptionParameter rbd_create_options[] = { +static QEMUOptionParameter qemu_rbd_create_options[] = { { .name = BLOCK_OPT_SIZE, .type = OPT_SIZE, @@ -1036,19 +678,19 @@ static QEMUOptionParameter rbd_create_options[] = { static BlockDriver bdrv_rbd = { .format_name = "rbd", .instance_size = sizeof(BDRVRBDState), - .bdrv_file_open = rbd_open, - .bdrv_close = rbd_close, - .bdrv_create = rbd_create, - .bdrv_get_info = rbd_getinfo, - .create_options = rbd_create_options, - .bdrv_getlength = rbd_getlength, + .bdrv_file_open = qemu_rbd_open, + .bdrv_close = qemu_rbd_close, + .bdrv_create = qemu_rbd_create, + .bdrv_get_info = qemu_rbd_getinfo, + .create_options = qemu_rbd_create_options, + .bdrv_getlength = qemu_rbd_getlength, .protocol_name = "rbd", - .bdrv_aio_readv = rbd_aio_readv, - .bdrv_aio_writev = rbd_aio_writev, + .bdrv_aio_readv = qemu_rbd_aio_readv, + .bdrv_aio_writev = qemu_rbd_aio_writev, - .bdrv_snapshot_create = rbd_snap_create, - .bdrv_snapshot_list = rbd_snap_list, + .bdrv_snapshot_create = qemu_rbd_snap_create, + .bdrv_snapshot_list = qemu_rbd_snap_list, }; static void bdrv_rbd_init(void) diff --git a/block/rbd_types.h b/block/rbd_types.h deleted file mode 100644 index f4cca99..0000000 --- a/block/rbd_types.h +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2010 Sage Weil <sage@newdream.net> - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING.LIB. - * - */ - -#ifndef CEPH_RBD_TYPES_H -#define CEPH_RBD_TYPES_H - - -/* - * rbd image 'foo' consists of objects - * foo.rbd - image metadata - * foo.00000000 - * foo.00000001 - * ... - data - */ - -#define RBD_SUFFIX ".rbd" -#define RBD_DIRECTORY "rbd_directory" -#define RBD_INFO "rbd_info" - -#define RBD_DEFAULT_OBJ_ORDER 22 /* 4MB */ - -#define RBD_MAX_OBJ_NAME_SIZE 96 -#define RBD_MAX_BLOCK_NAME_SIZE 24 -#define RBD_MAX_SEG_NAME_SIZE 128 - -#define RBD_COMP_NONE 0 -#define RBD_CRYPT_NONE 0 - -#define RBD_HEADER_TEXT "<<< Rados Block Device Image >>>\n" -#define RBD_HEADER_SIGNATURE "RBD" -#define RBD_HEADER_VERSION "001.005" - -struct rbd_info { - uint64_t max_id; -} __attribute__ ((packed)); - -struct rbd_obj_snap_ondisk { - uint64_t id; - uint64_t image_size; -} __attribute__((packed)); - -struct rbd_obj_header_ondisk { - char text[40]; - char block_name[RBD_MAX_BLOCK_NAME_SIZE]; - char signature[4]; - char version[8]; - struct { - uint8_t order; - uint8_t crypt_type; - uint8_t comp_type; - uint8_t unused; - } __attribute__((packed)) options; - uint64_t image_size; - uint64_t snap_seq; - uint32_t snap_count; - uint32_t reserved; - uint64_t snap_names_len; - struct rbd_obj_snap_ondisk snaps[0]; -} __attribute__((packed)); - - -#endif diff --git a/configure b/configure index a318d37..378c238 100755 --- a/configure +++ b/configure @@ -1917,41 +1917,24 @@ fi if test "$rbd" != "no" ; then cat > $TMPC <<EOF #include <stdio.h> -#include <rados/librados.h> -int main(void) { rados_initialize(0, NULL); return 0; } -EOF - rbd_libs="-lrados" - if compile_prog "" "$rbd_libs" ; then - librados_too_old=no - cat > $TMPC <<EOF -#include <stdio.h> -#include <rados/librados.h> -#ifndef CEPH_OSD_TMAP_SET -#error missing CEPH_OSD_TMAP_SET -#endif +#include <rbd/librbd.h> int main(void) { - int (*func)(const rados_pool_t pool, uint64_t *snapid) = rados_selfmanaged_snap_create; - rados_initialize(0, NULL); + rados_t cluster; + rados_create(&cluster, NULL); return 0; } EOF - if compile_prog "" "$rbd_libs" ; then - rbd=yes - libs_tools="$rbd_libs $libs_tools" - libs_softmmu="$rbd_libs $libs_softmmu" - else - rbd=no - librados_too_old=yes - fi + rbd_libs="-lrbd -lrados" + if compile_prog "" "$rbd_libs" ; then + rbd=yes + libs_tools="$rbd_libs $libs_tools" + libs_softmmu="$rbd_libs $libs_softmmu" else if test "$rbd" = "yes" ; then feature_not_found "rados block device" fi rbd=no fi - if test "$librados_too_old" = "yes" ; then - echo "-> Your librados version is too old - upgrade needed to have rbd support" - fi fi ##########################################