diff mbox

[04/12,v2] rbd: implement sync object read with new code

Message ID 51016249.4010405@inktank.com (mailing list archive)
State New, archived
Headers show

Commit Message

Alex Elder Jan. 24, 2013, 4:33 p.m. UTC
Reimplement the synchronous read operation used for reading a
version 1 header using the new request tracking code.  Name the
resulting function rbd_obj_read_sync() to better reflect that
it's a full object operation, not an object request.  To do this,
implement a new obj_req_pages object request type.

This implements a new mechanism to allow the caller to wait for
completion for an rbd_obj_request by calling rbd_obj_request_wait().

This partially resolves:
    http://tracker.newdream.net/issues/3755

Signed-off-by: Alex Elder <elder@inktank.com>
---
 drivers/block/rbd.c |   96
++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 92 insertions(+), 4 deletions(-)

 	const char		*object_name;
@@ -182,7 +182,13 @@ struct rbd_obj_request {
 	u32			which;		/* posn image request list */

 	enum obj_req_type	type;
-	struct bio		*bio_list;
+	union {
+		struct bio	*bio_list;
+		struct {
+			struct page	**pages;
+			u32		page_count;
+		};
+	};

 	struct ceph_osd_request	*osd_req;

@@ -192,6 +198,7 @@ struct rbd_obj_request {
 	atomic_t		done;

 	rbd_obj_callback_t	callback;
+	struct completion	completion;

 	struct kref		kref;
 };
@@ -1077,6 +1084,7 @@ static bool obj_req_type_valid(enum obj_req_type type)
 {
 	switch (type) {
 	case obj_req_bio:
+	case obj_req_pages:
 		return true;
 	default:
 		return false;
@@ -1291,14 +1299,23 @@ static void rbd_img_request_complete(struct
rbd_img_request *img_request)
 		rbd_img_request_put(img_request);
 }

+/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
+
+static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
+{
+	return wait_for_completion_interruptible(&obj_request->completion);
+}
+
 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
 {
 	if (obj_request->callback)
 		obj_request->callback(obj_request);
+	else
+		complete_all(&obj_request->completion);
 }

 /*
- * Request sync osd read
+ * Synchronously read a range from an object into a provided buffer
  */
 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
 			  const char *object_name,
@@ -1556,6 +1573,11 @@ static struct ceph_osd_request *rbd_osd_req_create(
 		/* osd client requires "num pages" even for bio */
 		osd_req->r_num_pages = calc_pages_for(offset, length);
 		break;
+	case obj_req_pages:
+		osd_req->r_pages = obj_request->pages;
+		osd_req->r_num_pages = obj_request->page_count;
+		osd_req->r_page_alignment = offset & ~PAGE_MASK;
+		break;
 	}

 	if (write_request) {
@@ -1618,6 +1640,7 @@ static struct rbd_obj_request
*rbd_obj_request_create(const char *object_name,
 	obj_request->type = type;
 	INIT_LIST_HEAD(&obj_request->links);
 	atomic_set(&obj_request->done, 0);
+	init_completion(&obj_request->completion);
 	kref_init(&obj_request->kref);

 	return obj_request;
@@ -1641,6 +1664,11 @@ static void rbd_obj_request_destroy(struct kref
*kref)
 		if (obj_request->bio_list)
 			bio_chain_put(obj_request->bio_list);
 		break;
+	case obj_req_pages:
+		if (obj_request->pages)
+			ceph_release_page_vector(obj_request->pages,
+						obj_request->page_count);
+		break;
 	}

 	kfree(obj_request);
@@ -1988,6 +2016,65 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
 	put_disk(disk);
 }

+static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
+				const char *object_name,
+				u64 offset, u64 length,
+				char *buf, u64 *version)
+
+{
+	struct ceph_osd_req_op *op;
+	struct rbd_obj_request *obj_request;
+	struct ceph_osd_client *osdc;
+	struct page **pages = NULL;
+	u32 page_count;
+	int ret;
+
+	page_count = (u32) calc_pages_for(offset, length);
+	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+	if (IS_ERR(pages))
+		ret = PTR_ERR(pages);
+
+	ret = -ENOMEM;
+	obj_request = rbd_obj_request_create(object_name, offset, length,
+						obj_req_pages);
+	if (!obj_request)
+		goto out;
+
+	obj_request->pages = pages;
+	obj_request->page_count = page_count;
+
+	op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
+	if (!op)
+		goto out;
+	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+						obj_request, op);
+	rbd_osd_req_op_destroy(op);
+	if (!obj_request->osd_req)
+		goto out;
+
+	osdc = &rbd_dev->rbd_client->client->osdc;
+	ret = rbd_obj_request_submit(osdc, obj_request);
+	if (ret)
+		goto out;
+	ret = rbd_obj_request_wait(obj_request);
+	if (ret)
+		goto out;
+
+	ret = obj_request->result;
+	if (ret < 0)
+		goto out;
+	ret = ceph_copy_from_page_vector(pages, buf, 0, obj_request->xferred);
+	if (version)
+		*version = obj_request->version;
+out:
+	if (obj_request)
+		rbd_obj_request_put(obj_request);
+	else
+		ceph_release_page_vector(pages, page_count);
+
+	return ret;
+}
+
 /*
  * Read the complete header for the given rbd device.
  *
@@ -2026,7 +2113,8 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev,
u64 *version)
 		if (!ondisk)
 			return ERR_PTR(-ENOMEM);

-		ret = rbd_req_sync_read(rbd_dev, rbd_dev->header_name,
+		(void) rbd_req_sync_read;	/* avoid a warning */
+		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
 				       0, size,
 				       (char *) ondisk, version);

Comments

Josh Durgin Jan. 29, 2013, 10:48 a.m. UTC | #1
A couple small comments, but looks good.

Reviewed-by: Josh Durgin <josh.durgin@inktank.com>

On 01/24/2013 08:33 AM, Alex Elder wrote:
> Reimplement the synchronous read operation used for reading a
> version 1 header using the new request tracking code.  Name the
> resulting function rbd_obj_read_sync() to better reflect that
> it's a full object operation, not an object request.  To do this,
> implement a new obj_req_pages object request type.
>
> This implements a new mechanism to allow the caller to wait for
> completion for an rbd_obj_request by calling rbd_obj_request_wait().
>
> This partially resolves:
>      http://tracker.newdream.net/issues/3755
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
>   drivers/block/rbd.c |   96
> ++++++++++++++++++++++++++++++++++++++++++++++++---
>   1 file changed, 92 insertions(+), 4 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index 3302cea..742236b 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -170,7 +170,7 @@ typedef void (*rbd_img_callback_t)(struct
> rbd_img_request *);
>   struct rbd_obj_request;
>   typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
>
> -enum obj_req_type { obj_req_bio };	/* More types to come */
> +enum obj_req_type { obj_req_bio, obj_req_pages };

Should be capitalized

>   struct rbd_obj_request {
>   	const char		*object_name;
> @@ -182,7 +182,13 @@ struct rbd_obj_request {
>   	u32			which;		/* posn image request list */
>
>   	enum obj_req_type	type;
> -	struct bio		*bio_list;
> +	union {
> +		struct bio	*bio_list;
> +		struct {
> +			struct page	**pages;
> +			u32		page_count;
> +		};
> +	};
>
>   	struct ceph_osd_request	*osd_req;
>
> @@ -192,6 +198,7 @@ struct rbd_obj_request {
>   	atomic_t		done;
>
>   	rbd_obj_callback_t	callback;
> +	struct completion	completion;
>
>   	struct kref		kref;
>   };
> @@ -1077,6 +1084,7 @@ static bool obj_req_type_valid(enum obj_req_type type)
>   {
>   	switch (type) {
>   	case obj_req_bio:
> +	case obj_req_pages:
>   		return true;
>   	default:
>   		return false;
> @@ -1291,14 +1299,23 @@ static void rbd_img_request_complete(struct
> rbd_img_request *img_request)
>   		rbd_img_request_put(img_request);
>   }
>
> +/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
> +
> +static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
> +{
> +	return wait_for_completion_interruptible(&obj_request->completion);
> +}
> +
>   static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
>   {
>   	if (obj_request->callback)
>   		obj_request->callback(obj_request);
> +	else
> +		complete_all(&obj_request->completion);
>   }
>
>   /*
> - * Request sync osd read
> + * Synchronously read a range from an object into a provided buffer
>    */
>   static int rbd_req_sync_read(struct rbd_device *rbd_dev,
>   			  const char *object_name,
> @@ -1556,6 +1573,11 @@ static struct ceph_osd_request *rbd_osd_req_create(
>   		/* osd client requires "num pages" even for bio */
>   		osd_req->r_num_pages = calc_pages_for(offset, length);
>   		break;
> +	case obj_req_pages:
> +		osd_req->r_pages = obj_request->pages;
> +		osd_req->r_num_pages = obj_request->page_count;
> +		osd_req->r_page_alignment = offset & ~PAGE_MASK;
> +		break;
>   	}
>
>   	if (write_request) {
> @@ -1618,6 +1640,7 @@ static struct rbd_obj_request
> *rbd_obj_request_create(const char *object_name,
>   	obj_request->type = type;
>   	INIT_LIST_HEAD(&obj_request->links);
>   	atomic_set(&obj_request->done, 0);
> +	init_completion(&obj_request->completion);
>   	kref_init(&obj_request->kref);
>
>   	return obj_request;
> @@ -1641,6 +1664,11 @@ static void rbd_obj_request_destroy(struct kref
> *kref)
>   		if (obj_request->bio_list)
>   			bio_chain_put(obj_request->bio_list);
>   		break;
> +	case obj_req_pages:
> +		if (obj_request->pages)
> +			ceph_release_page_vector(obj_request->pages,
> +						obj_request->page_count);
> +		break;
>   	}
>
>   	kfree(obj_request);
> @@ -1988,6 +2016,65 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
>   	put_disk(disk);
>   }
>
> +static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
> +				const char *object_name,
> +				u64 offset, u64 length,
> +				char *buf, u64 *version)
> +
> +{
> +	struct ceph_osd_req_op *op;
> +	struct rbd_obj_request *obj_request;
> +	struct ceph_osd_client *osdc;
> +	struct page **pages = NULL;
> +	u32 page_count;
> +	int ret;
> +
> +	page_count = (u32) calc_pages_for(offset, length);
> +	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
> +	if (IS_ERR(pages))
> +		ret = PTR_ERR(pages);
> +
> +	ret = -ENOMEM;
> +	obj_request = rbd_obj_request_create(object_name, offset, length,
> +						obj_req_pages);
> +	if (!obj_request)
> +		goto out;
> +
> +	obj_request->pages = pages;
> +	obj_request->page_count = page_count;
> +
> +	op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
> +	if (!op)
> +		goto out;
> +	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
> +						obj_request, op);
> +	rbd_osd_req_op_destroy(op);
> +	if (!obj_request->osd_req)
> +		goto out;
> +
> +	osdc = &rbd_dev->rbd_client->client->osdc;
> +	ret = rbd_obj_request_submit(osdc, obj_request);
> +	if (ret)
> +		goto out;
> +	ret = rbd_obj_request_wait(obj_request);
> +	if (ret)
> +		goto out;
> +
> +	ret = obj_request->result;
> +	if (ret < 0)
> +		goto out;
> +	ret = ceph_copy_from_page_vector(pages, buf, 0, obj_request->xferred);
> +	if (version)
> +		*version = obj_request->version;

No need to worry about the version. Nothing should use it.

> +out:
> +	if (obj_request)
> +		rbd_obj_request_put(obj_request);
> +	else
> +		ceph_release_page_vector(pages, page_count);
> +
> +	return ret;
> +}
> +
>   /*
>    * Read the complete header for the given rbd device.
>    *
> @@ -2026,7 +2113,8 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev,
> u64 *version)
>   		if (!ondisk)
>   			return ERR_PTR(-ENOMEM);
>
> -		ret = rbd_req_sync_read(rbd_dev, rbd_dev->header_name,
> +		(void) rbd_req_sync_read;	/* avoid a warning */
> +		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
>   				       0, size,
>   				       (char *) ondisk, version);
>

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 3302cea..742236b 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -170,7 +170,7 @@  typedef void (*rbd_img_callback_t)(struct
rbd_img_request *);
 struct rbd_obj_request;
 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

-enum obj_req_type { obj_req_bio };	/* More types to come */
+enum obj_req_type { obj_req_bio, obj_req_pages };

 struct rbd_obj_request {