@@ -183,9 +183,31 @@ struct rbd_obj_request {
u64 length; /* bytes from offset */
unsigned long flags;
- struct rbd_img_request *img_request;
- u64 img_offset; /* image relative offset */
- struct list_head links; /* img_request->obj_requests */
+ /*
+ * An object request associated with an image will have its
+ * img_data flag set; a standlone object request will not.
+ *
+ * A standalone object request will have which == BAD_WHICH
+ * and a null obj_request pointer.
+ *
+ * An object request initiated in support of a layered image
+ * object (to check for its existence before a write) will
+ * have which == BAD_WHICH and a non-null obj_request pointer.
+ *
+ * Finally, an object request for rbd image data will have
+ * which != BAD_WHICH, and will have a non-null img_request
+ * pointer. The value of which will be in the range
+ * 0..(img_request->obj_request_count-1).
+ */
+ union {
+ struct rbd_obj_request *obj_request; /* STAT op */
+ struct {
+ struct rbd_img_request *img_request;
+ u64 img_offset;
+ /* links for img_request->obj_requests list */
+ struct list_head links;
+ };
+ };
u32 which; /* posn image request list */
enum obj_request_type type;
@@ -1656,10 +1678,6 @@ static struct rbd_img_request
*rbd_img_request_create(
INIT_LIST_HEAD(&img_request->obj_requests);
kref_init(&img_request->kref);
- (void) obj_request_existence_set;
- (void) obj_request_known_test;
- (void) obj_request_exists_test;
-
rbd_img_request_get(img_request); /* Avoid a warning */
rbd_img_request_put(img_request); /* TEMPORARY */
This is a step toward fully implementing layered writes. Add checks before request submission for the object(s) associated with an image request. For write requests, if we don't know that the target object exists, issue a STAT request to find out. When that request completes, mark the known and exists flags for the original object request accordingly and re-submit the object request. (Note that this still does the existence check only; the copyup operation is not yet done.) A new object request is created to perform the existence check. A pointer to the original request is added to that object request to allow the stat request to re-issue the original request after updating its flags. If there is a failure with the stat request the error code is stored with the original request, which is then completed. This resolves: http://tracker.ceph.com/issues/3418 Signed-off-by: Alex Elder <elder@inktank.com> --- drivers/block/rbd.c | 155 ++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 147 insertions(+), 8 deletions(-) @@ -1847,18 +1865,139 @@ out_unwind: return -ENOMEM; } +static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) +{ + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + struct rbd_obj_request *orig_request; + + rbd_assert(!obj_request_img_data_test(obj_request)); + orig_request = obj_request->obj_request; + obj_request->obj_request = NULL;; + + dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__, + obj_request, orig_request, obj_request->result, + obj_request->xferred, obj_request->length); + + rbd_assert(orig_request); + rbd_assert(orig_request->img_request); + rbd_dev = orig_request->img_request->rbd_dev; + osdc = &rbd_dev->rbd_client->client->osdc; + + /* + * Our only purpose here is to determine whether the object + * exists, and we don't want to treat the non-existence as + * an error. If something else comes back, transfer the + * error to the original request and complete it now. + */ + if (!obj_request->result) { + obj_request_existence_set(orig_request, true); + } else if (obj_request->result == -ENOENT) { + obj_request_existence_set(orig_request, false); + obj_request->result = 0; + } else if (obj_request->result) { + orig_request->result = obj_request->result; + goto out_err; + } + + /* Done with the stat request */ + + rbd_obj_request_put(obj_request); + + /* + * Resubmit the original request now that we have recorded + * whether the target object exists. + */ + orig_request->result = rbd_obj_request_submit(osdc, orig_request); +out_err: + if (orig_request->result) + rbd_obj_request_complete(orig_request); + rbd_obj_request_put(orig_request); +} + +static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) +{ + struct rbd_obj_request *stat_request; + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + struct page **pages = NULL; + u32 page_count; + size_t size; + int ret; + + /* + * The response data for a STAT call consists of: + * le64 length; + * struct { + * le32 tv_sec; + * le32 tv_nsec; + * } mtime; + */ + size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); + page_count = (u32)calc_pages_for(0, size); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + ret = -ENOMEM; + stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, + OBJ_REQUEST_PAGES); + if (!stat_request) + goto out; + + rbd_obj_request_get(obj_request); + stat_request->obj_request = obj_request; + stat_request->pages = pages; + stat_request->page_count = page_count; + + rbd_assert(obj_request->img_request); + rbd_dev = obj_request->img_request->rbd_dev; + stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, + stat_request); + if (!stat_request->osd_req) + goto out; + stat_request->callback = rbd_img_obj_exists_callback; + + osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT); + osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, + false, false); + rbd_osd_req_format(stat_request, false); + + osdc = &rbd_dev->rbd_client->client->osdc; + ret = rbd_obj_request_submit(osdc, stat_request); +out: + if (ret) + rbd_obj_request_put(obj_request); + + return ret; +} + static int rbd_img_request_submit(struct rbd_img_request *img_request) { struct rbd_device *rbd_dev = img_request->rbd_dev; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; struct rbd_obj_request *next_obj_request; + bool write_request = img_request_write_test(img_request); + bool layered = img_request_layered_test(img_request); dout("%s: img %p\n", __func__, img_request); for_each_obj_request_safe(img_request, obj_request, next_obj_request) { + bool known; + bool object_exists; int ret; - ret = rbd_obj_request_submit(osdc, obj_request); + /* + * We need to know whether the target object exists + * for a layered write. Issue an existence check + * first if we need to. + */ + known = obj_request_known_test(obj_request); + object_exists = known && obj_request_exists_test(obj_request); + if (!write_request || !layered || object_exists) + ret = rbd_obj_request_submit(osdc, obj_request); + else + ret = rbd_img_obj_exists_submit(obj_request); if (ret) return ret; }