Message ID | 5171CA43.5070200@inktank.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Reviewed-by: Josh Durgin <josh.durgin@inktank.com> Alex Elder <elder@inktank.com> wrote: >As a step toward implementing layered writes, implement reading the >data for a target object from the parent image for a write request >whose target object is known to not exist. Add a copyup_pages field >to an image request to track the page array used (only) for such a >request. > >Signed-off-by: Alex Elder <elder@inktank.com> >--- > drivers/block/rbd.c | 152 >++++++++++++++++++++++++++++++++++++++++++++++++--- > 1 file changed, 143 insertions(+), 9 deletions(-) > >diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c >index 91fcf36..c5d0619 100644 >--- a/drivers/block/rbd.c >+++ b/drivers/block/rbd.c >@@ -250,6 +250,7 @@ struct rbd_img_request { > struct request *rq; /* block request */ > struct rbd_obj_request *obj_request; /* obj req initiator */ > }; >+ struct page **copyup_pages; > spinlock_t completion_lock;/* protects next_completion */ > u32 next_completion; > rbd_img_callback_t callback; >@@ -350,6 +351,8 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock); > static LIST_HEAD(rbd_client_list); /* clients */ > static DEFINE_SPINLOCK(rbd_client_list_lock); > >+static int rbd_img_request_submit(struct rbd_img_request >*img_request); >+ > static int rbd_dev_snaps_update(struct rbd_device *rbd_dev); > static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); > >@@ -1956,6 +1959,133 @@ out_unwind: > return -ENOMEM; > } > >+static void >+rbd_img_obj_parent_read_full_callback(struct rbd_img_request >*img_request) >+{ >+ struct rbd_obj_request *orig_request; >+ struct page **pages; >+ u32 page_count; >+ int result; >+ u64 obj_size; >+ u64 xferred; >+ >+ rbd_assert(img_request_child_test(img_request)); >+ >+ /* First get what we need from the image request */ >+ >+ pages = img_request->copyup_pages; >+ rbd_assert(pages != NULL); >+ img_request->copyup_pages = NULL; >+ >+ orig_request = img_request->obj_request; >+ rbd_assert(orig_request != NULL); >+ >+ result = 
img_request->result; >+ obj_size = img_request->length; >+ xferred = img_request->xferred; >+ >+ rbd_img_request_put(img_request); >+ >+ obj_request_existence_set(orig_request, true); >+ >+ page_count = (u32)calc_pages_for(0, obj_size); >+ ceph_release_page_vector(pages, page_count); >+ >+ /* Resubmit the original request (for now). */ >+ >+ orig_request->result = rbd_img_obj_request_submit(orig_request); >+ if (orig_request->result) { >+ obj_request_done_set(orig_request); >+ rbd_obj_request_complete(orig_request); >+ } >+} >+ >+/* >+ * Read from the parent image the range of data that covers the >+ * entire target of the given object request. This is used for >+ * satisfying a layered image write request when the target of an >+ * object request from the image request does not exist. >+ * >+ * A page array big enough to hold the returned data is allocated >+ * and supplied to rbd_img_request_fill() as the "data descriptor." >+ * When the read completes, this page array will be transferred to >+ * the original object request for the copyup operation. >+ * >+ * If an error occurs, record it as the result of the original >+ * object request and mark it done so it gets completed. >+ */ >+static int rbd_img_obj_parent_read_full(struct rbd_obj_request >*obj_request) >+{ >+ struct rbd_img_request *img_request = NULL; >+ struct rbd_img_request *parent_request = NULL; >+ struct rbd_device *rbd_dev; >+ u64 img_offset; >+ u64 length; >+ struct page **pages = NULL; >+ u32 page_count; >+ int result; >+ >+ rbd_assert(obj_request_img_data_test(obj_request)); >+ rbd_assert(obj_request->type == OBJ_REQUEST_BIO); >+ >+ img_request = obj_request->img_request; >+ rbd_assert(img_request != NULL); >+ rbd_dev = img_request->rbd_dev; >+ rbd_assert(rbd_dev->parent != NULL); >+ >+ /* >+ * Determine the byte range covered by the object in the >+ * child image to which the original request was to be sent. 
>+ */ >+ img_offset = obj_request->img_offset - obj_request->offset; >+ length = (u64)1 << rbd_dev->header.obj_order; >+ >+ /* >+ * Allocate a page array big enough to receive the data read >+ * from the parent. >+ */ >+ page_count = (u32)calc_pages_for(0, length); >+ pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); >+ if (IS_ERR(pages)) { >+ result = PTR_ERR(pages); >+ pages = NULL; >+ goto out_err; >+ } >+ >+ result = -ENOMEM; >+ parent_request = rbd_img_request_create(rbd_dev->parent, >+ img_offset, length, >+ false, true); >+ if (!parent_request) >+ goto out_err; >+ rbd_obj_request_get(obj_request); >+ parent_request->obj_request = obj_request; >+ >+ result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, >pages); >+ if (result) >+ goto out_err; >+ parent_request->copyup_pages = pages; >+ >+ parent_request->callback = rbd_img_obj_parent_read_full_callback; >+ result = rbd_img_request_submit(parent_request); >+ if (!result) >+ return 0; >+ >+ parent_request->copyup_pages = NULL; >+ parent_request->obj_request = NULL; >+ rbd_obj_request_put(obj_request); >+out_err: >+ if (pages) >+ ceph_release_page_vector(pages, page_count); >+ if (parent_request) >+ rbd_img_request_put(parent_request); >+ obj_request->result = result; >+ obj_request->xferred = 0; >+ obj_request_done_set(obj_request); >+ >+ return result; >+} >+ > static void rbd_img_obj_exists_callback(struct rbd_obj_request >*obj_request) > { > struct rbd_obj_request *orig_request; >@@ -1996,7 +2126,7 @@ static void rbd_img_obj_exists_callback(struct >rbd_obj_request *obj_request) > obj_request_existence_set(orig_request, false); > } else if (result) { > orig_request->result = result; >- goto out_err; >+ goto out; > } > > /* >@@ -2004,7 +2134,7 @@ static void rbd_img_obj_exists_callback(struct >rbd_obj_request *obj_request) > * whether the target object exists. 
> */ > orig_request->result = rbd_img_obj_request_submit(orig_request); >-out_err: >+out: > if (orig_request->result) > rbd_obj_request_complete(orig_request); > rbd_obj_request_put(orig_request); >@@ -2070,15 +2200,13 @@ out: >static int rbd_img_obj_request_submit(struct rbd_obj_request >*obj_request) > { > struct rbd_img_request *img_request; >+ bool known; > > rbd_assert(obj_request_img_data_test(obj_request)); > > img_request = obj_request->img_request; > rbd_assert(img_request); > >- /* (At the moment we don't care whether it exists or not...) */ >- (void) obj_request_exists_test; >- > /* > * Only layered writes need special handling. If it's not a > * layered write, or it is a layered write but we know the >@@ -2087,7 +2215,8 @@ static int rbd_img_obj_request_submit(struct >rbd_obj_request *obj_request) > */ > if (!img_request_write_test(img_request) || > !img_request_layered_test(img_request) || >- obj_request_known_test(obj_request)) { >+ ((known = obj_request_known_test(obj_request)) && >+ obj_request_exists_test(obj_request))) { > > struct rbd_device *rbd_dev; > struct ceph_osd_client *osdc; >@@ -2099,10 +2228,15 @@ static int rbd_img_obj_request_submit(struct >rbd_obj_request *obj_request) > } > > /* >- * It's a layered write and we don't know whether the target >- * exists. Issue existence check; once that completes the >- * original request will be submitted again. >+ * It's a layered write. The target object might exist but >+ * we may not know that yet. If we know it doesn't exist, >+ * start by reading the data for the full target object from >+ * the parent so we can use it for a copyup to the target. > */ >+ if (known) >+ return rbd_img_obj_parent_read_full(obj_request); >+ >+ /* We don't know whether the target exists. Go find out. 
*/ > > return rbd_img_obj_exists_submit(obj_request); > }
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 91fcf36..c5d0619 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -250,6 +250,7 @@ struct rbd_img_request { struct request *rq; /* block request */ struct rbd_obj_request *obj_request; /* obj req initiator */ }; + struct page **copyup_pages; spinlock_t completion_lock;/* protects next_completion */ u32 next_completion; rbd_img_callback_t callback; @@ -350,6 +351,8 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock); static LIST_HEAD(rbd_client_list); /* clients */ static DEFINE_SPINLOCK(rbd_client_list_lock); +static int rbd_img_request_submit(struct rbd_img_request *img_request); + static int rbd_dev_snaps_update(struct rbd_device *rbd_dev); static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); @@ -1956,6 +1959,133 @@ out_unwind: return -ENOMEM; } +static void +rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) +{ + struct rbd_obj_request *orig_request; + struct page **pages; + u32 page_count; + int result; + u64 obj_size; + u64 xferred; + + rbd_assert(img_request_child_test(img_request)); + + /* First get what we need from the image request */ + + pages = img_request->copyup_pages; + rbd_assert(pages != NULL); + img_request->copyup_pages = NULL; + + orig_request = img_request->obj_request; + rbd_assert(orig_request != NULL); + + result = img_request->result; + obj_size = img_request->length; + xferred = img_request->xferred; + + rbd_img_request_put(img_request); + + obj_request_existence_set(orig_request, true); + + page_count = (u32)calc_pages_for(0, obj_size); + ceph_release_page_vector(pages, page_count); + + /* Resubmit the original request (for now). */ + + orig_request->result = rbd_img_obj_request_submit(orig_request); + if (orig_request->result) { + obj_request_done_set(orig_request); + rbd_obj_request_complete(orig_request); + } +} + +/* + * Read from the parent image the range of data that covers the + * entire target of the given object request. 
This is used for + * satisfying a layered image write request when the target of an + * object request from the image request does not exist. + * + * A page array big enough to hold the returned data is allocated + * and supplied to rbd_img_request_fill() as the "data descriptor." + * When the read completes, this page array will be transferred to + * the original object request for the copyup operation. + * + * If an error occurs, record it as the result of the original + * object request and mark it done so it gets completed. + */ +static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request = NULL; + struct rbd_img_request *parent_request = NULL; + struct rbd_device *rbd_dev; + u64 img_offset; + u64 length; + struct page **pages = NULL; + u32 page_count; + int result; + + rbd_assert(obj_request_img_data_test(obj_request)); + rbd_assert(obj_request->type == OBJ_REQUEST_BIO); + + img_request = obj_request->img_request; + rbd_assert(img_request != NULL); + rbd_dev = img_request->rbd_dev; + rbd_assert(rbd_dev->parent != NULL); + + /* + * Determine the byte range covered by the object in the + * child image to which the original request was to be sent. + */ + img_offset = obj_request->img_offset - obj_request->offset; + length = (u64)1 << rbd_dev->header.obj_order; + + /* + * Allocate a page array big enough to receive the data read + * from the parent. 
+ */ + page_count = (u32)calc_pages_for(0, length); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) { + result = PTR_ERR(pages); + pages = NULL; + goto out_err; + } + + result = -ENOMEM; + parent_request = rbd_img_request_create(rbd_dev->parent, + img_offset, length, + false, true); + if (!parent_request) + goto out_err; + rbd_obj_request_get(obj_request); + parent_request->obj_request = obj_request; + + result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); + if (result) + goto out_err; + parent_request->copyup_pages = pages; + + parent_request->callback = rbd_img_obj_parent_read_full_callback; + result = rbd_img_request_submit(parent_request); + if (!result) + return 0; + + parent_request->copyup_pages = NULL; + parent_request->obj_request = NULL; + rbd_obj_request_put(obj_request); +out_err: + if (pages) + ceph_release_page_vector(pages, page_count); + if (parent_request) + rbd_img_request_put(parent_request); + obj_request->result = result; + obj_request->xferred = 0; + obj_request_done_set(obj_request); + + return result; +} + static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
As a step toward implementing layered writes, implement reading the data for a target object from the parent image for a write request whose target object is known to not exist. Add a copyup_pages field to an image request to track the page array used (only) for such a request. Signed-off-by: Alex Elder <elder@inktank.com> --- drivers/block/rbd.c | 152 ++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 143 insertions(+), 9 deletions(-) { struct rbd_obj_request *orig_request; @@ -1996,7 +2126,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) obj_request_existence_set(orig_request, false); } else if (result) { orig_request->result = result; - goto out_err; + goto out; } /* @@ -2004,7 +2134,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) * whether the target object exists. */ orig_request->result = rbd_img_obj_request_submit(orig_request); -out_err: +out: if (orig_request->result) rbd_obj_request_complete(orig_request); rbd_obj_request_put(orig_request); @@ -2070,15 +2200,13 @@ out: static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) { struct rbd_img_request *img_request; + bool known; rbd_assert(obj_request_img_data_test(obj_request)); img_request = obj_request->img_request; rbd_assert(img_request); - /* (At the moment we don't care whether it exists or not...) */ - (void) obj_request_exists_test; - /* * Only layered writes need special handling. 
If it's not a * layered write, or it is a layered write but we know the @@ -2087,7 +2215,8 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) */ if (!img_request_write_test(img_request) || !img_request_layered_test(img_request) || - obj_request_known_test(obj_request)) { + ((known = obj_request_known_test(obj_request)) && + obj_request_exists_test(obj_request))) { struct rbd_device *rbd_dev; struct ceph_osd_client *osdc; @@ -2099,10 +2228,15 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) } /* - * It's a layered write and we don't know whether the target - * exists. Issue existence check; once that completes the - * original request will be submitted again. + * It's a layered write. The target object might exist but + * we may not know that yet. If we know it doesn't exist, + * start by reading the data for the full target object from + * the parent so we can use it for a copyup to the target. */ + if (known) + return rbd_img_obj_parent_read_full(obj_request); + + /* We don't know whether the target exists. Go find out. */ return rbd_img_obj_exists_submit(obj_request); }