rbd: rework rbd_request_fn()

Message ID 1407224324-4811-1-git-send-email-ilya.dryomov@inktank.com (mailing list archive)
State New, archived

Commit Message

Ilya Dryomov Aug. 5, 2014, 7:38 a.m. UTC
While it was never a good idea to sleep in request_fn(), commit
34c6bc2c919a ("locking/mutexes: Add extra reschedule point") made it
a *bad* idea.  Since 3.15, mutex_lock() may reschedule *before* putting
the task on the mutex wait queue, which for tasks in a !TASK_RUNNING
state means blocking forever.  request_fn() may be called with the task
in a !TASK_RUNNING state on the way to schedule() in io_schedule().

Offload request handling to a workqueue, one per rbd device, to avoid
calling blocking primitives from rbd_request_fn().

Cc: stable@vger.kernel.org # 3.15+
Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com>
---
 drivers/block/rbd.c |  195 +++++++++++++++++++++++++++++++--------------------
 1 file changed, 118 insertions(+), 77 deletions(-)
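
In outline, the change (condensed from the diff below; the real code also
filters non-FS requests and holds q->queue_lock around the list) keeps
rbd_request_fn() non-blocking and moves the real work into process context:

	/* atomic context: only queue the request and kick the worker */
	static void rbd_request_fn(struct request_queue *q)
	{
		struct rbd_device *rbd_dev = q->queuedata;
		struct request *rq;

		while ((rq = blk_fetch_request(q)))
			list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);

		queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
	}

	/* process context: blocking primitives are safe here */
	static void rbd_request_workfn(struct work_struct *work)
	{
		struct rbd_device *rbd_dev =
		    container_of(work, struct rbd_device, rq_work);

		/* splice rbd_dev->rq_queue and handle each request */
	}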

Comments

Christoph Hellwig Aug. 5, 2014, 7:44 a.m. UTC | #1
On Tue, Aug 05, 2014 at 11:38:44AM +0400, Ilya Dryomov wrote:
> While it was never a good idea to sleep in request_fn(), commit
> 34c6bc2c919a ("locking/mutexes: Add extra reschedule point") made it
> a *bad* idea.  Since 3.15, mutex_lock() may reschedule *before* putting
> the task on the mutex wait queue, which for tasks in a !TASK_RUNNING
> state means blocking forever.  request_fn() may be called with the task
> in a !TASK_RUNNING state on the way to schedule() in io_schedule().
> 
> Offload request handling to a workqueue, one per rbd device, to avoid
> calling blocking primitives from rbd_request_fn().

Btw, for the future you might want to consider converting rbd to use the
blk-mq infrastructure, which calls the I/O submission function from user
context and will allow you to sleep.
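
A hypothetical sketch of what the core of such a conversion could look
like, using roughly the 3.16-era blk-mq API (signatures differ in later
kernels, and the completion path would need converting from
blk_end_request_all() to blk_mq_end_io() as well):

	static int rbd_mq_queue_rq(struct blk_mq_hw_ctx *hctx,
				   struct request *rq)
	{
		struct rbd_device *rbd_dev = rq->q->queuedata;

		/* called from process context, so blocking is fine */
		rbd_handle_request(rbd_dev, rq);
		return BLK_MQ_RQ_QUEUE_OK;
	}

	static struct blk_mq_ops rbd_mq_ops = {
		.queue_rq	= rbd_mq_queue_rq,
		.map_queue	= blk_mq_map_queue,
	};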

Alex Elder Aug. 6, 2014, 6:32 p.m. UTC | #2
On 08/05/2014 02:38 AM, Ilya Dryomov wrote:
> While it was never a good idea to sleep in request_fn(), commit
> 34c6bc2c919a ("locking/mutexes: Add extra reschedule point") made it
> a *bad* idea.  Since 3.15, mutex_lock() may reschedule *before* putting
> the task on the mutex wait queue, which for tasks in a !TASK_RUNNING
> state means blocking forever.  request_fn() may be called with the task
> in a !TASK_RUNNING state on the way to schedule() in io_schedule().
> 
> Offload request handling to a workqueue, one per rbd device, to avoid
> calling blocking primitives from rbd_request_fn().

Off topic...  If you supply "--patience" to your git diff command
you'll get an easier-to-read result in some cases (like this one).
If you like that you can just do:
    git config --global --add diff.algorithm patience

I have several comments below, none of which are bugs, just
suggestions.

I was thinking you could use a single work queue for all rbd
devices, but I'm not sure that's very important.  You'd have
to stash the rbd_dev pointer in every request somewhere.
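
A hypothetical sketch of that alternative (all names here are invented
for illustration): record the owning device in each request and funnel
everything through one module-wide queue:

	static struct workqueue_struct *rbd_wq;	/* shared by all devices */
	static LIST_HEAD(rbd_rq_queue);		/* requests, all devices */
	static DEFINE_SPINLOCK(rbd_rq_lock);	/* protects rbd_rq_queue */
	static struct work_struct rbd_rq_work;

	/* called from rbd_request_fn() under q->queue_lock */
	static void rbd_queue_rq(struct rbd_device *rbd_dev, struct request *rq)
	{
		rq->special = rbd_dev;		/* stash the owning device */
		spin_lock(&rbd_rq_lock);
		list_add_tail(&rq->queuelist, &rbd_rq_queue);
		spin_unlock(&rbd_rq_lock);
		queue_work(rbd_wq, &rbd_rq_work);
	}

The work function would then recover the device from rq->special for
each spliced request.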

Reviewed-by: Alex Elder <elder@linaro.org>


> Cc: stable@vger.kernel.org # 3.15+
> Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com>
> ---
>  drivers/block/rbd.c |  195 +++++++++++++++++++++++++++++++--------------------
>  1 file changed, 118 insertions(+), 77 deletions(-)
> 
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index e3412317d3f9..e07d9d71b187 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -42,6 +42,7 @@
>  #include <linux/blkdev.h>
>  #include <linux/slab.h>
>  #include <linux/idr.h>
> +#include <linux/workqueue.h>
>  
>  #include "rbd_types.h"
>  
> @@ -332,7 +333,10 @@ struct rbd_device {
>  
>  	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
>  
> +	struct list_head	rq_queue;	/* incoming rq queue */
>  	spinlock_t		lock;		/* queue, flags, open_count */
> +	struct workqueue_struct	*rq_wq;
> +	struct work_struct	rq_work;
>  
>  	struct rbd_image_header	header;
>  	unsigned long		flags;		/* possibly lock protected */
> @@ -3176,102 +3180,129 @@ out:
>  	return ret;
>  }
>  
> -static void rbd_request_fn(struct request_queue *q)
> -		__releases(q->queue_lock) __acquires(q->queue_lock)
> +static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
>  {

. . .

> +	if (offset + length > rbd_dev->mapping.size) {
> +		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
> +			 length, rbd_dev->mapping.size);
> +		result = -EIO;
> +		goto err_rq;
> +	}
>  
> -		spin_unlock_irq(q->queue_lock);
> +	img_request = rbd_img_request_create(rbd_dev, offset, length, wr);

This isn't new with your patch, but perhaps we should consider
having rbd_img_request_create() use GFP_KERNEL or at least GFP_NOIO
rather than GFP_ATOMIC.  In general, I think a pass ought to be
made through all the allocation calls to make sure they have the
right flags.  Some may be too restrictive, some may not be
restrictive enough.

Anyway, a task for another day...
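
A sketch of the suggested change, assuming the allocation in question
is the kmem_cache_alloc() in rbd_img_request_create():

	/* GFP_NOIO: may block, but won't recurse into the I/O path */
	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);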

> +	if (!img_request) {
> +		result = -ENOMEM;
> +		goto err_rq;
> +	}
> +	img_request->rq = rq;

 . . .

> +/*
> + * Called with q->queue_lock held and interrupts disabled, possibly on
> + * the way to schedule().  Do not sleep here!
> + */
> +static void rbd_request_fn(struct request_queue *q)
> +{
> +	struct rbd_device *rbd_dev = q->queuedata;
> +	struct request *rq;
> +	int queued = 0;
> +
> +	rbd_assert(rbd_dev);
> +
> +	while ((rq = blk_fetch_request(q))) {
> +		/* Ignore any non-FS requests that filter through. */
> +		if (rq->cmd_type != REQ_TYPE_FS) {
> +			dout("%s: non-fs request type %d\n", __func__,
> +				(int) rq->cmd_type);
> +			__blk_end_request_all(rq, 0);
> +			continue;
>  		}
> +
> +		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
> +		queued++;
		if (!queued)
			queued++;
Or alternatively, make queued a Boolean.  My point is
to guarantee there is no wrap (which is nearly, but not quite,
impossible).
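
The Boolean variant would look like (condensed, dropping the dout()):

	static void rbd_request_fn(struct request_queue *q)
	{
		struct rbd_device *rbd_dev = q->queuedata;
		struct request *rq;
		bool queued = false;

		while ((rq = blk_fetch_request(q))) {
			if (rq->cmd_type != REQ_TYPE_FS) {
				__blk_end_request_all(rq, 0);
				continue;
			}
			list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
			queued = true;	/* can't wrap, unlike a counter */
		}

		if (queued)
			queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
	}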

>  	}
> +
> +	if (queued)
> +		queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
>  }
>  
>  /*

. . .

> @@ -5067,6 +5105,8 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
>  
>  	return ret;
>  
> +err_out_workqueue:
> +	destroy_workqueue(rbd_dev->rq_wq);

	rbd_dev->rq_wq = NULL;

>  err_out_mapping:
>  	rbd_dev_mapping_clear(rbd_dev);
>  err_out_disk:
> @@ -5313,6 +5353,7 @@ static void rbd_dev_device_release(struct device *dev)
>  {
>  	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
>  
> +	destroy_workqueue(rbd_dev->rq_wq);
>  	rbd_free_disk(rbd_dev);
>  	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
>  	rbd_dev_mapping_clear(rbd_dev);
> 

Ilya Dryomov Aug. 6, 2014, 7:06 p.m. UTC | #3
On Wed, Aug 6, 2014 at 10:32 PM, Alex Elder <elder@ieee.org> wrote:
> On 08/05/2014 02:38 AM, Ilya Dryomov wrote:
>> While it was never a good idea to sleep in request_fn(), commit
>> 34c6bc2c919a ("locking/mutexes: Add extra reschedule point") made it
>> a *bad* idea.  Since 3.15, mutex_lock() may reschedule *before* putting
>> the task on the mutex wait queue, which for tasks in a !TASK_RUNNING
>> state means blocking forever.  request_fn() may be called with the task
>> in a !TASK_RUNNING state on the way to schedule() in io_schedule().
>>
>> Offload request handling to a workqueue, one per rbd device, to avoid
>> calling blocking primitives from rbd_request_fn().
>
> Off topic...  If you supply "--patience" to your git diff command
> you'll get an easier-to-read result in some cases (like this one).
> If you like that you can just do:
>     git config --global --add diff.algorithm patience

I generally use --histogram (and used it to review this patch in
particular).  In my experience it gives slightly better results than
--patience.  I didn't think of putting it into .gitconfig, though.
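
The .gitconfig equivalent for histogram would be:
    git config --global --add diff.algorithm histogram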

>
> I have several comments below, none of which are bugs, just
> suggestions.
>
> I was thinking you could use a single work queue for all rbd
> devices, but I'm not sure that's very important.  You'd have
> to stash the rbd_dev pointer in every request somewhere.
>
> Reviewed-by: Alex Elder <elder@linaro.org>
>
>
>> Cc: stable@vger.kernel.org # 3.15+
>> Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com>
>> ---
>>  drivers/block/rbd.c |  195 +++++++++++++++++++++++++++++++--------------------
>>  1 file changed, 118 insertions(+), 77 deletions(-)
>>
>> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
>> index e3412317d3f9..e07d9d71b187 100644
>> --- a/drivers/block/rbd.c
>> +++ b/drivers/block/rbd.c
>> @@ -42,6 +42,7 @@
>>  #include <linux/blkdev.h>
>>  #include <linux/slab.h>
>>  #include <linux/idr.h>
>> +#include <linux/workqueue.h>
>>
>>  #include "rbd_types.h"
>>
>> @@ -332,7 +333,10 @@ struct rbd_device {
>>
>>       char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
>>
>> +     struct list_head        rq_queue;       /* incoming rq queue */
>>       spinlock_t              lock;           /* queue, flags, open_count */
>> +     struct workqueue_struct *rq_wq;
>> +     struct work_struct      rq_work;
>>
>>       struct rbd_image_header header;
>>       unsigned long           flags;          /* possibly lock protected */
>> @@ -3176,102 +3180,129 @@ out:
>>       return ret;
>>  }
>>
>> -static void rbd_request_fn(struct request_queue *q)
>> -             __releases(q->queue_lock) __acquires(q->queue_lock)
>> +static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
>>  {
>
> . . .
>
>> +     if (offset + length > rbd_dev->mapping.size) {
>> +             rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
>> +                      length, rbd_dev->mapping.size);
>> +             result = -EIO;
>> +             goto err_rq;
>> +     }
>>
>> -             spin_unlock_irq(q->queue_lock);
>> +     img_request = rbd_img_request_create(rbd_dev, offset, length, wr);
>
> This isn't new with your patch, but perhaps we should consider
> having rbd_img_request_create() use GFP_KERNEL or at least GFP_NOIO
> rather than GFP_ATOMIC.  In general, I think a pass ought to be
> made through all the allocation calls to make sure they have the
> right flags.  Some may be too restrictive, some may not be
> restrictive enough.

Definitely - I already have a patch that switches the img_request
allocation to GFP_NOIO; allocating it with GFP_ATOMIC in the work
function doesn't make sense.

As for the general pass, I think there is a ticket for that, so
hopefully I'll get to it sometime.

Thanks,

                Ilya
Ilya Dryomov Aug. 6, 2014, 7:09 p.m. UTC | #4
On Wed, Aug 6, 2014 at 11:06 PM, Ilya Dryomov <ilya.dryomov@inktank.com> wrote:
> On Wed, Aug 6, 2014 at 10:32 PM, Alex Elder <elder@ieee.org> wrote:
>> On 08/05/2014 02:38 AM, Ilya Dryomov wrote:
>>> While it was never a good idea to sleep in request_fn(), commit
>>> 34c6bc2c919a ("locking/mutexes: Add extra reschedule point") made it
>>> a *bad* idea.  Since 3.15, mutex_lock() may reschedule *before* putting
>>> the task on the mutex wait queue, which for tasks in a !TASK_RUNNING
>>> state means blocking forever.  request_fn() may be called with the task
>>> in a !TASK_RUNNING state on the way to schedule() in io_schedule().
>>>
>>> Offload request handling to a workqueue, one per rbd device, to avoid
>>> calling blocking primitives from rbd_request_fn().
>>
>> Off topic...  If you supply "--patience" to your git diff command
>> you'll get an easier-to-read result in some cases (like this one).
>> If you like that you can just do:
>>     git config --global --add diff.algorithm patience
>
> I'm generally using (and used to review this patch in particular)
> --histogram.  In my experience it gives slightly better results than
> --patience.  Didn't think of putting it into .gitconfig though.

Looks like --histogram beats --patience in this case too.  The
alloc_workqueue() hunk is much better; otherwise the output is
identical.

Thanks,

                Ilya
Alex Elder Aug. 6, 2014, 7:10 p.m. UTC | #5
On 08/06/2014 02:09 PM, Ilya Dryomov wrote:
>>> Off topic...  If you supply "--patience" to your git diff command
>>> >> you'll get an easier-to-read result in some cases (like this one).
>>> >> If you like that you can just do:
>>> >>     git config --global --add diff.algorithm patience
>> >
>> > I'm generally using (and used to review this patch in particular)
>> > --histogram.  In my experience it gives slightly better results than
>> > --patience.  Didn't think of putting it into .gitconfig though.
> Looks like --histogram beats --patience in this case too.
> alloc_workqueue() hunk is much better, otherwise identical.

I didn't know about --histogram...  I guess I should update
my git config.

					-Alex
Ilya Dryomov Aug. 7, 2014, 7:21 a.m. UTC | #6
On Wed, Aug 6, 2014 at 10:32 PM, Alex Elder <elder@ieee.org> wrote:
> On 08/05/2014 02:38 AM, Ilya Dryomov wrote:
>> While it was never a good idea to sleep in request_fn(), commit
>> 34c6bc2c919a ("locking/mutexes: Add extra reschedule point") made it
>> a *bad* idea.  Since 3.15, mutex_lock() may reschedule *before* putting
>> the task on the mutex wait queue, which for tasks in a !TASK_RUNNING
>> state means blocking forever.  request_fn() may be called with the task
>> in a !TASK_RUNNING state on the way to schedule() in io_schedule().
>>
>> Offload request handling to a workqueue, one per rbd device, to avoid
>> calling blocking primitives from rbd_request_fn().
>
> Off topic...  If you supply "--patience" to your git diff command
> you'll get an easier-to-read result in some cases (like this one).
> If you like that you can just do:
>     git config --global --add diff.algorithm patience
>
> I have several comments below, none of which are bugs, just
> suggestions.
>
> I was thinking you could use a single work queue for all rbd
> devices, but I'm not sure that's very important.  You'd have
> to stash the rbd_dev pointer in every request somewhere.

That's possible, but workqueues are not threads and are supposed to be
relatively cheap.  And for the future, it's probably worth switching to
the blk-mq infrastructure, as Christoph suggested.

Thanks,

                Ilya

Patch

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index e3412317d3f9..e07d9d71b187 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -42,6 +42,7 @@ 
 #include <linux/blkdev.h>
 #include <linux/slab.h>
 #include <linux/idr.h>
+#include <linux/workqueue.h>
 
 #include "rbd_types.h"
 
@@ -332,7 +333,10 @@  struct rbd_device {
 
 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
+	struct list_head	rq_queue;	/* incoming rq queue */
 	spinlock_t		lock;		/* queue, flags, open_count */
+	struct workqueue_struct	*rq_wq;
+	struct work_struct	rq_work;
 
 	struct rbd_image_header	header;
 	unsigned long		flags;		/* possibly lock protected */
@@ -3176,102 +3180,129 @@  out:
 	return ret;
 }
 
-static void rbd_request_fn(struct request_queue *q)
-		__releases(q->queue_lock) __acquires(q->queue_lock)
+static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 {
-	struct rbd_device *rbd_dev = q->queuedata;
-	struct request *rq;
+	struct rbd_img_request *img_request;
+	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
+	u64 length = blk_rq_bytes(rq);
+	bool wr = rq_data_dir(rq) == WRITE;
 	int result;
 
-	while ((rq = blk_fetch_request(q))) {
-		bool write_request = rq_data_dir(rq) == WRITE;
-		struct rbd_img_request *img_request;
-		u64 offset;
-		u64 length;
+	/* Ignore/skip any zero-length requests */
 
-		/* Ignore any non-FS requests that filter through. */
+	if (!length) {
+		dout("%s: zero-length request\n", __func__);
+		result = 0;
+		goto err_rq;
+	}
 
-		if (rq->cmd_type != REQ_TYPE_FS) {
-			dout("%s: non-fs request type %d\n", __func__,
-				(int) rq->cmd_type);
-			__blk_end_request_all(rq, 0);
-			continue;
+	/* Disallow writes to a read-only device */
+
+	if (wr) {
+		if (rbd_dev->mapping.read_only) {
+			result = -EROFS;
+			goto err_rq;
 		}
+		rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
+	}
 
-		/* Ignore/skip any zero-length requests */
+	/*
+	 * Quit early if the mapped snapshot no longer exists.  It's
+	 * still possible the snapshot will have disappeared by the
+	 * time our request arrives at the osd, but there's no sense in
+	 * sending it if we already know.
+	 */
+	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
+		dout("request for non-existent snapshot");
+		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+		result = -ENXIO;
+		goto err_rq;
+	}
 
-		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
-		length = (u64) blk_rq_bytes(rq);
+	if (offset && length > U64_MAX - offset + 1) {
+		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
+			 length);
+		result = -EINVAL;
+		goto err_rq;	/* Shouldn't happen */
+	}
 
-		if (!length) {
-			dout("%s: zero-length request\n", __func__);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
+	if (offset + length > rbd_dev->mapping.size) {
+		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
+			 length, rbd_dev->mapping.size);
+		result = -EIO;
+		goto err_rq;
+	}
 
-		spin_unlock_irq(q->queue_lock);
+	img_request = rbd_img_request_create(rbd_dev, offset, length, wr);
+	if (!img_request) {
+		result = -ENOMEM;
+		goto err_rq;
+	}
+	img_request->rq = rq;
 
-		/* Disallow writes to a read-only device */
+	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
+	if (result)
+		goto err_img_request;
 
-		if (write_request) {
-			result = -EROFS;
-			if (rbd_dev->mapping.read_only)
-				goto end_request;
-			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
-		}
+	result = rbd_img_request_submit(img_request);
+	if (result)
+		goto err_img_request;
 
-		/*
-		 * Quit early if the mapped snapshot no longer
-		 * exists.  It's still possible the snapshot will
-		 * have disappeared by the time our request arrives
-		 * at the osd, but there's no sense in sending it if
-		 * we already know.
-		 */
-		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
-			dout("request for non-existent snapshot");
-			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
-			result = -ENXIO;
-			goto end_request;
-		}
+	return;
 
-		result = -EINVAL;
-		if (offset && length > U64_MAX - offset + 1) {
-			rbd_warn(rbd_dev, "bad request range (%llu~%llu)",
-				offset, length);
-			goto end_request;	/* Shouldn't happen */
-		}
+err_img_request:
+	rbd_img_request_put(img_request);
+err_rq:
+	if (result)
+		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
+			 wr ? "write" : "read", length, offset, result);
+	blk_end_request_all(rq, result);
+}
 
-		result = -EIO;
-		if (offset + length > rbd_dev->mapping.size) {
-			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)",
-				offset, length, rbd_dev->mapping.size);
-			goto end_request;
-		}
+static void rbd_request_workfn(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev =
+	    container_of(work, struct rbd_device, rq_work);
+	struct request *rq, *next;
+	LIST_HEAD(requests);
 
-		result = -ENOMEM;
-		img_request = rbd_img_request_create(rbd_dev, offset, length,
-							write_request);
-		if (!img_request)
-			goto end_request;
+	spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
+	list_splice_init(&rbd_dev->rq_queue, &requests);
+	spin_unlock_irq(&rbd_dev->lock);
 
-		img_request->rq = rq;
+	list_for_each_entry_safe(rq, next, &requests, queuelist) {
+		list_del_init(&rq->queuelist);
+		rbd_handle_request(rbd_dev, rq);
+	}
+}
 
-		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-						rq->bio);
-		if (!result)
-			result = rbd_img_request_submit(img_request);
-		if (result)
-			rbd_img_request_put(img_request);
-end_request:
-		spin_lock_irq(q->queue_lock);
-		if (result < 0) {
-			rbd_warn(rbd_dev, "%s %llx at %llx result %d",
-				write_request ? "write" : "read",
-				length, offset, result);
-
-			__blk_end_request_all(rq, result);
+/*
+ * Called with q->queue_lock held and interrupts disabled, possibly on
+ * the way to schedule().  Do not sleep here!
+ */
+static void rbd_request_fn(struct request_queue *q)
+{
+	struct rbd_device *rbd_dev = q->queuedata;
+	struct request *rq;
+	int queued = 0;
+
+	rbd_assert(rbd_dev);
+
+	while ((rq = blk_fetch_request(q))) {
+		/* Ignore any non-FS requests that filter through. */
+		if (rq->cmd_type != REQ_TYPE_FS) {
+			dout("%s: non-fs request type %d\n", __func__,
+				(int) rq->cmd_type);
+			__blk_end_request_all(rq, 0);
+			continue;
 		}
+
+		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
+		queued++;
 	}
+
+	if (queued)
+		queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
 }
 
 /*
@@ -3847,6 +3878,8 @@  static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 		return NULL;
 
 	spin_lock_init(&rbd_dev->lock);
+	INIT_LIST_HEAD(&rbd_dev->rq_queue);
+	INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
 	rbd_dev->flags = 0;
 	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
@@ -5050,12 +5083,17 @@  static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 	ret = rbd_dev_mapping_set(rbd_dev);
 	if (ret)
 		goto err_out_disk;
+
 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
 	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
+	rbd_dev->rq_wq = alloc_workqueue(rbd_dev->disk->disk_name, 0, 0);
+	if (!rbd_dev->rq_wq)
+		goto err_out_mapping;
+
 	ret = rbd_bus_add_dev(rbd_dev);
 	if (ret)
-		goto err_out_mapping;
+		goto err_out_workqueue;
 
 	/* Everything's ready.  Announce the disk to the world. */
 
@@ -5067,6 +5105,8 @@  static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 
 	return ret;
 
+err_out_workqueue:
+	destroy_workqueue(rbd_dev->rq_wq);
 err_out_mapping:
 	rbd_dev_mapping_clear(rbd_dev);
 err_out_disk:
@@ -5313,6 +5353,7 @@  static void rbd_dev_device_release(struct device *dev)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
+	destroy_workqueue(rbd_dev->rq_wq);
 	rbd_free_disk(rbd_dev);
 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	rbd_dev_mapping_clear(rbd_dev);