diff mbox

[9/9] rqdm core: add barrier support

Message ID 4AD801EB.60904@ct.jp.nec.com (mailing list archive)
State Accepted, archived
Delegated to: Alasdair Kergon
Headers show

Commit Message

Kiyoshi Ueda Oct. 16, 2009, 5:17 a.m. UTC
None
diff mbox

Patch

Index: 2.6.32-rc4/drivers/md/dm.c
===================================================================
--- 2.6.32-rc4.orig/drivers/md/dm.c
+++ 2.6.32-rc4/drivers/md/dm.c
@@ -143,9 +143,19 @@  struct mapped_device {
 	int barrier_error;
 
 	/*
+	 * Protect barrier_error from concurrent endio processing
+	 * in request-based dm.
+	 */
+	spinlock_t barrier_error_lock;
+
+	/*
 	 * Processing queue (flush/barriers)
 	 */
 	struct workqueue_struct *wq;
+	struct work_struct barrier_work;
+
+	/* A pointer to the currently processing pre/post flush request */
+	struct request *flush_request;
 
 	/*
 	 * The current mapping.
@@ -720,6 +730,23 @@  static void end_clone_bio(struct bio *cl
 	blk_update_request(tio->orig, 0, nr_bytes);
 }
 
+static void store_barrier_error(struct mapped_device *md, int error)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&md->barrier_error_lock, flags);
+	/*
+	 * Basically, the first error is taken, but:
+	 *   -EOPNOTSUPP supersedes any I/O error.
+	 *   Requeue request supersedes any I/O error but -EOPNOTSUPP.
+	 */
+	if (!md->barrier_error || error == -EOPNOTSUPP ||
+	    (md->barrier_error != -EOPNOTSUPP &&
+	     error == DM_ENDIO_REQUEUE))
+		md->barrier_error = error;
+	spin_unlock_irqrestore(&md->barrier_error_lock, flags);
+}
+
 /*
  * Don't touch any member of the md after calling this function because
  * the md may be freed in dm_put() at the end of this function.
@@ -757,11 +784,13 @@  static void free_rq_clone(struct request
 static void dm_end_request(struct request *clone, int error)
 {
 	int rw = rq_data_dir(clone);
+	int run_queue = 1;
+	bool is_barrier = blk_barrier_rq(clone);
 	struct dm_rq_target_io *tio = clone->end_io_data;
 	struct mapped_device *md = tio->md;
 	struct request *rq = tio->orig;
 
-	if (blk_pc_request(rq)) {
+	if (blk_pc_request(rq) && !is_barrier) {
 		rq->errors = clone->errors;
 		rq->resid_len = clone->resid_len;
 
@@ -776,9 +805,14 @@  static void dm_end_request(struct reques
 
 	free_rq_clone(clone);
 
-	blk_end_request_all(rq, error);
+	if (unlikely(is_barrier)) {
+		if (unlikely(error))
+			store_barrier_error(md, error);
+		run_queue = 0;
+	} else
+		blk_end_request_all(rq, error);
 
-	rq_completed(md, rw, 1);
+	rq_completed(md, rw, run_queue);
 }
 
 static void dm_unprep_request(struct request *rq)
@@ -803,6 +837,16 @@  void dm_requeue_unmapped_request(struct 
 	struct request_queue *q = rq->q;
 	unsigned long flags;
 
+	if (unlikely(blk_barrier_rq(clone))) {
+		/*
+		 * Barrier clones share an original request.
+		 * Leave it to dm_end_request(), which handles this special
+		 * case.
+		 */
+		dm_end_request(clone, DM_ENDIO_REQUEUE);
+		return;
+	}
+
 	dm_unprep_request(rq);
 
 	spin_lock_irqsave(q->queue_lock, flags);
@@ -892,6 +936,19 @@  static void dm_complete_request(struct r
 	struct dm_rq_target_io *tio = clone->end_io_data;
 	struct request *rq = tio->orig;
 
+	if (unlikely(blk_barrier_rq(clone))) {
+		/*
+		 * Barrier clones share an original request.  So can't use
+		 * softirq_done with the original.
+		 * Pass the clone to dm_done() directly in this special case.
+		 * It is safe (even if clone->q->queue_lock is held here)
+		 * because there is no I/O dispatching during the completion
+		 * of barrier clone.
+		 */
+		dm_done(clone, error, true);
+		return;
+	}
+
 	tio->error = error;
 	rq->completion_data = clone;
 	blk_complete_request(rq);
@@ -908,6 +965,17 @@  void dm_kill_unmapped_request(struct req
 	struct dm_rq_target_io *tio = clone->end_io_data;
 	struct request *rq = tio->orig;
 
+	if (unlikely(blk_barrier_rq(clone))) {
+		/*
+		 * Barrier clones share an original request.
+		 * Leave it to dm_end_request(), which handles this special
+		 * case.
+		 */
+		BUG_ON(error > 0);
+		dm_end_request(clone, error);
+		return;
+	}
+
 	rq->cmd_flags |= REQ_FAILED;
 	dm_complete_request(clone, error);
 }
@@ -1362,11 +1430,6 @@  static int dm_make_request(struct reques
 {
 	struct mapped_device *md = q->queuedata;
 
-	if (unlikely(bio_rw_flagged(bio, BIO_RW_BARRIER))) {
-		bio_endio(bio, -EOPNOTSUPP);
-		return 0;
-	}
-
 	return md->saved_make_request_fn(q, bio); /* call __make_request() */
 }
 
@@ -1385,6 +1448,25 @@  static int dm_request(struct request_que
 	return _dm_request(q, bio);
 }
 
+/*
+ * Mark this request as flush request, so that dm_request_fn() can
+ * recognize.
+ */
+static void dm_rq_prepare_flush(struct request_queue *q, struct request *rq)
+{
+	rq->cmd_type = REQ_TYPE_LINUX_BLOCK;
+	rq->cmd[0] = REQ_LB_OP_FLUSH;
+}
+
+static bool dm_rq_is_flush_request(struct request *rq)
+{
+	if (rq->cmd_type == REQ_TYPE_LINUX_BLOCK &&
+	    rq->cmd[0] == REQ_LB_OP_FLUSH)
+		return true;
+	else
+		return false;
+}
+
 void dm_dispatch_request(struct request *rq)
 {
 	int r;
@@ -1430,16 +1512,24 @@  static int dm_rq_bio_constructor(struct 
 static int setup_clone(struct request *clone, struct request *rq,
 		       struct dm_rq_target_io *tio)
 {
-	int r = blk_rq_prep_clone(clone, rq, tio->md->bs, GFP_ATOMIC,
-				  dm_rq_bio_constructor, tio);
+	int r;
 
-	if (r)
-		return r;
+	if (dm_rq_is_flush_request(rq)) {
+		blk_rq_init(NULL, clone);
+		clone->cmd_type = REQ_TYPE_FS;
+		clone->cmd_flags |= (REQ_HARDBARRIER | WRITE);
+	} else {
+		r = blk_rq_prep_clone(clone, rq, tio->md->bs, GFP_ATOMIC,
+				      dm_rq_bio_constructor, tio);
+		if (r)
+			return r;
+
+		clone->cmd = rq->cmd;
+		clone->cmd_len = rq->cmd_len;
+		clone->sense = rq->sense;
+		clone->buffer = rq->buffer;
+	}
 
-	clone->cmd = rq->cmd;
-	clone->cmd_len = rq->cmd_len;
-	clone->sense = rq->sense;
-	clone->buffer = rq->buffer;
 	clone->end_io = end_clone_request;
 	clone->end_io_data = tio;
 
@@ -1480,6 +1570,9 @@  static int dm_prep_fn(struct request_que
 	struct mapped_device *md = q->queuedata;
 	struct request *clone;
 
+	if (unlikely(dm_rq_is_flush_request(rq)))
+		return BLKPREP_OK;
+
 	if (unlikely(rq->special)) {
 		DMWARN("Already has something in rq->special.");
 		return BLKPREP_KILL;
@@ -1559,6 +1652,14 @@  static void dm_request_fn(struct request
 		if (!rq)
 			goto plug_and_out;
 
+		if (unlikely(dm_rq_is_flush_request(rq))) {
+			BUG_ON(md->flush_request);
+			md->flush_request = rq;
+			blk_start_request(rq);
+			queue_work(md->wq, &md->barrier_work);
+			goto out;
+		}
+
 		ti = dm_table_find_target(map, blk_rq_pos(rq));
 		if (ti->type->busy && ti->type->busy(ti))
 			goto plug_and_out;
@@ -1725,6 +1826,7 @@  out:
 static const struct block_device_operations dm_blk_dops;
 
 static void dm_wq_work(struct work_struct *work);
+static void dm_rq_barrier_work(struct work_struct *work);
 
 /*
  * Allocate and initialise a blank device with a given minor.
@@ -1754,6 +1856,7 @@  static struct mapped_device *alloc_dev(i
 	init_rwsem(&md->io_lock);
 	mutex_init(&md->suspend_lock);
 	spin_lock_init(&md->deferred_lock);
+	spin_lock_init(&md->barrier_error_lock);
 	rwlock_init(&md->map_lock);
 	atomic_set(&md->holders, 1);
 	atomic_set(&md->open_count, 0);
@@ -1788,6 +1891,8 @@  static struct mapped_device *alloc_dev(i
 	blk_queue_softirq_done(md->queue, dm_softirq_done);
 	blk_queue_prep_rq(md->queue, dm_prep_fn);
 	blk_queue_lld_busy(md->queue, dm_lld_busy);
+	blk_queue_ordered(md->queue, QUEUE_ORDERED_DRAIN_FLUSH,
+			  dm_rq_prepare_flush);
 
 	md->disk = alloc_disk(1);
 	if (!md->disk)
@@ -1797,6 +1902,7 @@  static struct mapped_device *alloc_dev(i
 	atomic_set(&md->pending[1], 0);
 	init_waitqueue_head(&md->wait);
 	INIT_WORK(&md->work, dm_wq_work);
+	INIT_WORK(&md->barrier_work, dm_rq_barrier_work);
 	init_waitqueue_head(&md->eventq);
 
 	md->disk->major = _major;
@@ -2184,6 +2290,73 @@  static void dm_queue_flush(struct mapped
 	queue_work(md->wq, &md->work);
 }
 
+static void dm_rq_set_flush_nr(struct request *clone, unsigned flush_nr)
+{
+	struct dm_rq_target_io *tio = clone->end_io_data;
+
+	tio->info.flush_request = flush_nr;
+}
+
+/* Issue barrier requests to targets and wait for their completion. */
+static int dm_rq_barrier(struct mapped_device *md)
+{
+	int i, j;
+	struct dm_table *map = dm_get_table(md);
+	unsigned num_targets = dm_table_get_num_targets(map);
+	struct dm_target *ti;
+	struct request *clone;
+
+	md->barrier_error = 0;
+
+	for (i = 0; i < num_targets; i++) {
+		ti = dm_table_get_target(map, i);
+		for (j = 0; j < ti->num_flush_requests; j++) {
+			clone = clone_rq(md->flush_request, md, GFP_NOIO);
+			dm_rq_set_flush_nr(clone, j);
+			atomic_inc(&md->pending[rq_data_dir(clone)]);
+			map_request(ti, clone, md);
+		}
+	}
+
+	dm_wait_for_completion(md, TASK_UNINTERRUPTIBLE);
+	dm_table_put(map);
+
+	return md->barrier_error;
+}
+
+static void dm_rq_barrier_work(struct work_struct *work)
+{
+	int error;
+	struct mapped_device *md = container_of(work, struct mapped_device,
+						barrier_work);
+	struct request_queue *q = md->queue;
+	struct request *rq;
+	unsigned long flags;
+
+	/*
+	 * Hold the md reference here and leave it at the last part so that
+	 * the md can't be deleted by device opener when the barrier request
+	 * completes.
+	 */
+	dm_get(md);
+
+	error = dm_rq_barrier(md);
+
+	rq = md->flush_request;
+	md->flush_request = NULL;
+
+	if (error == DM_ENDIO_REQUEUE) {
+		spin_lock_irqsave(q->queue_lock, flags);
+		blk_requeue_request(q, rq);
+		spin_unlock_irqrestore(q->queue_lock, flags);
+	} else
+		blk_end_request_all(rq, error);
+
+	blk_run_queue(q);
+
+	dm_put(md);
+}
+
 /*
  * Swap in a new table (destroying old one).
  */
@@ -2324,11 +2497,16 @@  int dm_suspend(struct mapped_device *md,
 	set_bit(DMF_QUEUE_IO_TO_THREAD, &md->flags);
 	up_write(&md->io_lock);
 
-	flush_workqueue(md->wq);
-
+	/*
+	 * Request-based dm uses md->wq for barrier (dm_rq_barrier_work) which
+	 * can be kicked until md->queue is stopped.  So stop md->queue before
+	 * flushing md->wq.
+	 */
 	if (dm_request_based(md))
 		stop_queue(md->queue);
 
+	flush_workqueue(md->wq);
+
 	/*
 	 * At this point no more requests are entering target request routines.
 	 * We call dm_wait_for_completion to wait for all existing requests