diff mbox

[16/19] tcmu: don't block submitting context for block waits

Message ID 1509335079-5276-17-git-send-email-mchristi@redhat.com (mailing list archive)
State New, archived
Headers show

Commit Message

Mike Christie Oct. 30, 2017, 3:44 a.m. UTC
This patch has tcmu internally queue cmds if its ring buffer
is full. It also makes the TCMU_GLOBAL_MAX_BLOCKS limit a
hint instead of a hard limit, so we do not have to add any
new locks/atomics in the main IO path except when IO is not
running.

This fixes the following bugs:

1. We cannot sleep from the submitting context because it might be
called from a target recv context. This results in transport level
commands timing out. For example if the ring is full, we would
sleep, and a iscsi initiator would send a iscsi ping/nop which
times out because the target's recv thread is sleeping here.

2. Devices were not fairly scheduled to run when they hit the global
limit so they could time out waiting for ring space while others
got run.

Signed-off-by: Mike Christie <mchristi@redhat.com>
---
 drivers/target/target_core_user.c | 264 ++++++++++++++++++++++++--------------
 1 file changed, 169 insertions(+), 95 deletions(-)
diff mbox

Patch

diff --git a/drivers/target/target_core_user.c b/drivers/target/target_core_user.c
index 0630657..80dd17a 100644
--- a/drivers/target/target_core_user.c
+++ b/drivers/target/target_core_user.c
@@ -83,7 +83,10 @@ 
 /* The total size of the ring is 8M + 256K * PAGE_SIZE */
 #define TCMU_RING_SIZE (CMDR_SIZE + DATA_SIZE)
 
-/* Default maximum of the global data blocks(512K * PAGE_SIZE) */
+/*
+ * Default number of global data blocks(512K * PAGE_SIZE)
+ * when the unmap thread will be started.
+ */
 #define TCMU_GLOBAL_MAX_BLOCKS (512 * 1024)
 
 static u8 tcmu_kern_cmd_reply_supported;
@@ -106,6 +109,7 @@  struct tcmu_nl_cmd {
 struct tcmu_dev {
 	struct list_head node;
 	struct kref kref;
+
 	struct se_device se_dev;
 
 	char *name;
@@ -128,10 +132,9 @@  struct tcmu_dev {
 	size_t data_off;
 	size_t data_size;
 
-	wait_queue_head_t wait_cmdr;
 	struct mutex cmdr_lock;
+	struct list_head cmdr_queue;
 
-	bool waiting_global;
 	uint32_t dbi_max;
 	uint32_t dbi_thresh;
 	DECLARE_BITMAP(data_bitmap, DATA_BLOCK_BITS);
@@ -160,6 +163,7 @@  struct tcmu_dev {
 struct tcmu_cmd {
 	struct se_cmd *se_cmd;
 	struct tcmu_dev *tcmu_dev;
+	struct list_head cmdr_queue_entry;
 
 	uint16_t cmd_id;
 
@@ -174,7 +178,16 @@  struct tcmu_cmd {
 #define TCMU_CMD_BIT_EXPIRED 0
 	unsigned long flags;
 };
-
+/*
+ * To avoid dead lock the mutex lock order should always be:
+ *
+ * mutex_lock(&root_udev_mutex);
+ * ...
+ * mutex_lock(&tcmu_dev->cmdr_lock);
+ * mutex_unlock(&tcmu_dev->cmdr_lock);
+ * ...
+ * mutex_unlock(&root_udev_mutex);
+ */
 static DEFINE_MUTEX(root_udev_mutex);
 static LIST_HEAD(root_udev);
 
@@ -182,7 +195,7 @@  struct tcmu_cmd {
 static LIST_HEAD(timed_out_udevs);
 
 static atomic_t global_db_count = ATOMIC_INIT(0);
-static struct work_struct tcmu_unmap_work;
+static struct delayed_work tcmu_unmap_work;
 
 static struct kmem_cache *tcmu_cmd_cache;
 
@@ -346,10 +359,8 @@  static inline bool tcmu_get_empty_block(struct tcmu_dev *udev,
 	page = radix_tree_lookup(&udev->data_blocks, dbi);
 	if (!page) {
 		if (atomic_add_return(1, &global_db_count) >
-					TCMU_GLOBAL_MAX_BLOCKS) {
-			atomic_dec(&global_db_count);
-			return false;
-		}
+					TCMU_GLOBAL_MAX_BLOCKS)
+			schedule_delayed_work(&tcmu_unmap_work, 0);
 
 		/* try to get new page from the mm */
 		page = alloc_page(GFP_KERNEL);
@@ -380,18 +391,11 @@  static bool tcmu_get_empty_blocks(struct tcmu_dev *udev,
 {
 	int i;
 
-	udev->waiting_global = false;
-
 	for (i = tcmu_cmd->dbi_cur; i < tcmu_cmd->dbi_cnt; i++) {
 		if (!tcmu_get_empty_block(udev, tcmu_cmd))
-			goto err;
+			return false;
 	}
 	return true;
-
-err:
-	udev->waiting_global = true;
-	schedule_work(&tcmu_unmap_work);
-	return false;
 }
 
 static inline struct page *
@@ -437,6 +441,7 @@  static struct tcmu_cmd *tcmu_alloc_cmd(struct se_cmd *se_cmd)
 	if (!tcmu_cmd)
 		return NULL;
 
+	INIT_LIST_HEAD(&tcmu_cmd->cmdr_queue_entry);
 	tcmu_cmd->se_cmd = se_cmd;
 	tcmu_cmd->tcmu_dev = udev;
 
@@ -741,6 +746,10 @@  static int tcmu_setup_cmd_timer(struct tcmu_cmd *tcmu_cmd)
 	unsigned long tmo = udev->cmd_time_out;
 	int cmd_id;
 
+	/*
+	 * If it was on the cmdr queue waiting we do not reset the timer
+	 * for requeues and when it is finally sent to userspace.
+	 */
 	if (tcmu_cmd->cmd_id)
 		return 0;
 
@@ -752,13 +761,31 @@  static int tcmu_setup_cmd_timer(struct tcmu_cmd *tcmu_cmd)
 	tcmu_cmd->cmd_id = cmd_id;
 
 	if (!tmo)
-		return 0;
+		tmo = TCMU_TIME_OUT;
+
+	pr_debug("allocated cmd %u for dev %s tmo %lu\n", tcmu_cmd->cmd_id,
+		 udev->name, tmo / MSEC_PER_SEC);
 
 	tcmu_cmd->deadline = round_jiffies_up(jiffies + msecs_to_jiffies(tmo));
 	mod_timer(&udev->timeout, tcmu_cmd->deadline);
 	return 0;
 }
 
+static int add_to_cmdr_queue(struct tcmu_cmd *tcmu_cmd)
+{
+	struct tcmu_dev *udev = tcmu_cmd->tcmu_dev;
+	int ret;
+
+	ret = tcmu_setup_cmd_timer(tcmu_cmd);
+	if (ret)
+		return ret;
+
+	list_add_tail(&tcmu_cmd->cmdr_queue_entry, &udev->cmdr_queue);
+	pr_debug("adding cmd %u on dev %s to ring space wait queue\n",
+		 tcmu_cmd->cmd_id, udev->name);
+	return 0;
+}
+
 /**
  * queue_cmd_ring - queue cmd to ring or internally
  * @tcmu_cmd: cmd to queue
@@ -767,6 +794,7 @@  static int tcmu_setup_cmd_timer(struct tcmu_cmd *tcmu_cmd)
  * Returns:
  * -1 we cannot queue internally or to the ring.
  *  0 success
+ *  1 internally queued to wait for ring memory to free.
  */
 static sense_reason_t queue_cmd_ring(struct tcmu_cmd *tcmu_cmd, int *scsi_err)
 {
@@ -804,7 +832,8 @@  static sense_reason_t queue_cmd_ring(struct tcmu_cmd *tcmu_cmd, int *scsi_err)
 	base_command_size = tcmu_cmd_get_base_cmd_size(tcmu_cmd->dbi_cnt);
 	command_size = tcmu_cmd_get_cmd_size(tcmu_cmd, base_command_size);
 
-	mutex_lock(&udev->cmdr_lock);
+	if (!list_empty(&udev->cmdr_queue))
+		goto queue;
 
 	mb = udev->mb_addr;
 	cmd_head = mb->cmd_head % udev->cmdr_size; /* UAM */
@@ -813,42 +842,18 @@  static sense_reason_t queue_cmd_ring(struct tcmu_cmd *tcmu_cmd, int *scsi_err)
 		pr_warn("TCMU: Request of size %zu/%zu is too big for %u/%zu "
 			"cmd ring/data area\n", command_size, data_length,
 			udev->cmdr_size, udev->data_size);
-		mutex_unlock(&udev->cmdr_lock);
 		*scsi_err = TCM_INVALID_CDB_FIELD;
 		return -1;
 	}
 
-	while (!is_ring_space_avail(udev, tcmu_cmd, command_size, data_length)) {
-		int ret;
-		DEFINE_WAIT(__wait);
-
+	if (!is_ring_space_avail(udev, tcmu_cmd, command_size, data_length)) {
 		/*
 		 * Don't leave commands partially setup because the unmap
 		 * thread might need the blocks to make forward progress.
 		 */
 		tcmu_cmd_free_data(tcmu_cmd, tcmu_cmd->dbi_cur);
 		tcmu_cmd_reset_dbi_cur(tcmu_cmd);
-
-		prepare_to_wait(&udev->wait_cmdr, &__wait, TASK_INTERRUPTIBLE);
-
-		pr_debug("sleeping for ring space\n");
-		mutex_unlock(&udev->cmdr_lock);
-		if (udev->cmd_time_out)
-			ret = schedule_timeout(
-					msecs_to_jiffies(udev->cmd_time_out));
-		else
-			ret = schedule_timeout(msecs_to_jiffies(TCMU_TIME_OUT));
-		finish_wait(&udev->wait_cmdr, &__wait);
-		if (!ret) {
-			pr_warn("tcmu: command timed out\n");
-			*scsi_err = TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
-			return -1;
-		}
-
-		mutex_lock(&udev->cmdr_lock);
-
-		/* We dropped cmdr_lock, cmd_head is stale */
-		cmd_head = mb->cmd_head % udev->cmdr_size; /* UAM */
+		goto queue;
 	}
 
 	/* Insert a PAD if end-of-ring space is too small */
@@ -921,31 +926,39 @@  static sense_reason_t queue_cmd_ring(struct tcmu_cmd *tcmu_cmd, int *scsi_err)
 
 	UPDATE_HEAD(mb->cmd_head, command_size, udev->cmdr_size);
 	tcmu_flush_dcache_range(mb, sizeof(*mb));
-	mutex_unlock(&udev->cmdr_lock);
 
 	/* TODO: only if FLUSH and FUA? */
 	uio_event_notify(&udev->uio_info);
 
-	if (udev->cmd_time_out)
-		mod_timer(&udev->timeout, round_jiffies_up(jiffies +
-			  msecs_to_jiffies(udev->cmd_time_out)));
-
 	return 0;
+
+queue:
+	if (add_to_cmdr_queue(tcmu_cmd)) {
+		*scsi_err = TCM_OUT_OF_RESOURCES;
+		return -1;
+	}
+
+	return 1;
 }
 
 static sense_reason_t
 tcmu_queue_cmd(struct se_cmd *se_cmd)
 {
+	struct se_device *se_dev = se_cmd->se_dev;
+	struct tcmu_dev *udev = TCMU_DEV(se_dev);
 	struct tcmu_cmd *tcmu_cmd;
 	sense_reason_t scsi_ret;
+	int ret;
 
 	tcmu_cmd = tcmu_alloc_cmd(se_cmd);
 	if (!tcmu_cmd)
 		return TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
 
-	if (queue_cmd_ring(tcmu_cmd, &scsi_ret) < 0)
+	mutex_lock(&udev->cmdr_lock);
+	ret = queue_cmd_ring(tcmu_cmd, &scsi_ret);
+	mutex_unlock(&udev->cmdr_lock);
+	if (ret < 0)
 		tcmu_free_cmd(tcmu_cmd);
-
 	return scsi_ret;
 }
 
@@ -1033,10 +1046,15 @@  static unsigned int tcmu_handle_completions(struct tcmu_dev *udev)
 		handled++;
 	}
 
-	if (mb->cmd_tail == mb->cmd_head)
-		del_timer(&udev->timeout); /* no more pending cmds */
-
-	wake_up(&udev->wait_cmdr);
+	if (mb->cmd_tail == mb->cmd_head && list_empty(&udev->cmdr_queue)) {
+		del_timer(&udev->timeout);
+		/*
+		 * not more pending or waiting commands so try to reclaim
+		 * blocks if needed.
+		 */
+		if (atomic_read(&global_db_count) > TCMU_GLOBAL_MAX_BLOCKS)
+			schedule_delayed_work(&tcmu_unmap_work, 0);
+	}
 
 	return handled;
 }
@@ -1044,6 +1062,10 @@  static unsigned int tcmu_handle_completions(struct tcmu_dev *udev)
 static int tcmu_check_expired_cmd(int id, void *p, void *data)
 {
 	struct tcmu_cmd *cmd = p;
+	struct tcmu_dev *udev = cmd->tcmu_dev;
+	u8 scsi_status;
+	struct se_cmd *se_cmd;
+	bool is_running;
 
 	if (test_bit(TCMU_CMD_BIT_EXPIRED, &cmd->flags))
 		return 0;
@@ -1051,10 +1073,27 @@  static int tcmu_check_expired_cmd(int id, void *p, void *data)
 	if (!time_after(jiffies, cmd->deadline))
 		return 0;
 
-	set_bit(TCMU_CMD_BIT_EXPIRED, &cmd->flags);
-	target_complete_cmd(cmd->se_cmd, SAM_STAT_CHECK_CONDITION);
+	is_running = list_empty(&cmd->cmdr_queue_entry);
+	pr_debug("Timing out cmd %u on dev %s that is %s.\n",
+		 id, udev->name, is_running ? "inflight" : "queued");
+
+	se_cmd = cmd->se_cmd;
 	cmd->se_cmd = NULL;
 
+	if (is_running) {
+		set_bit(TCMU_CMD_BIT_EXPIRED, &cmd->flags);
+		/*
+		 * target_complete_cmd will translate this to LUN COMM FAILURE
+		 */
+		scsi_status = SAM_STAT_CHECK_CONDITION;
+	} else {
+		list_del_init(&cmd->cmdr_queue_entry);
+
+		idr_remove(&udev->commands, id);
+		tcmu_free_cmd(cmd);
+		scsi_status = SAM_STAT_TASK_SET_FULL;
+	}
+	target_complete_cmd(se_cmd, scsi_status);
 	return 0;
 }
 
@@ -1069,7 +1108,7 @@  static void tcmu_device_timedout(unsigned long data)
 		list_add_tail(&udev->timedout_entry, &timed_out_udevs);
 	spin_unlock(&timed_out_udevs_lock);
 
-	schedule_work(&tcmu_unmap_work);
+	schedule_delayed_work(&tcmu_unmap_work, 0);
 }
 
 static int tcmu_attach_hba(struct se_hba *hba, u32 host_id)
@@ -1110,10 +1149,10 @@  static struct se_device *tcmu_alloc_device(struct se_hba *hba, const char *name)
 	udev->hba = hba;
 	udev->cmd_time_out = TCMU_TIME_OUT;
 
-	init_waitqueue_head(&udev->wait_cmdr);
 	mutex_init(&udev->cmdr_lock);
 
 	INIT_LIST_HEAD(&udev->timedout_entry);
+	INIT_LIST_HEAD(&udev->cmdr_queue);
 	idr_init(&udev->commands);
 
 	setup_timer(&udev->timeout, tcmu_device_timedout,
@@ -1127,13 +1166,63 @@  static struct se_device *tcmu_alloc_device(struct se_hba *hba, const char *name)
 	return &udev->se_dev;
 }
 
+static bool run_cmdr_queue(struct tcmu_dev *udev)
+{
+	struct tcmu_cmd *tcmu_cmd, *tmp_cmd;
+	LIST_HEAD(cmds);
+	bool drained = true;
+	sense_reason_t scsi_ret;
+	int ret;
+
+	if (list_empty(&udev->cmdr_queue))
+		return true;
+
+	pr_debug("running %s's cmdr queue\n", udev->name);
+
+	list_splice_init(&udev->cmdr_queue, &cmds);
+
+	list_for_each_entry_safe(tcmu_cmd, tmp_cmd, &cmds, cmdr_queue_entry) {
+		list_del_init(&tcmu_cmd->cmdr_queue_entry);
+
+	        pr_debug("removing cmd %u on dev %s from queue\n",
+		         tcmu_cmd->cmd_id, udev->name);
+
+		ret = queue_cmd_ring(tcmu_cmd, &scsi_ret);
+		if (ret < 0) {
+		        pr_debug("cmd %u on dev %s failed with %u\n",
+			         tcmu_cmd->cmd_id, udev->name, scsi_ret);
+
+			idr_remove(&udev->commands, tcmu_cmd->cmd_id);
+			/*
+			 * Ignore scsi_ret for now. target_complete_cmd
+			 * drops it.
+			 */
+			target_complete_cmd(tcmu_cmd->se_cmd,
+					    SAM_STAT_CHECK_CONDITION);
+			tcmu_free_cmd(tcmu_cmd);
+		} else if (ret > 0) {
+			pr_debug("ran out of space during cmdr queue run\n");
+			/*
+			 * cmd was requeued, so just put all cmds back in
+			 * the queue
+			 */
+			list_splice_tail(&cmds, &udev->cmdr_queue);
+			drained = false;
+			goto done;
+		}
+	}
+done:
+	return drained;
+}
+
 static int tcmu_irqcontrol(struct uio_info *info, s32 irq_on)
 {
-	struct tcmu_dev *tcmu_dev = container_of(info, struct tcmu_dev, uio_info);
+	struct tcmu_dev *udev = container_of(info, struct tcmu_dev, uio_info);
 
-	mutex_lock(&tcmu_dev->cmdr_lock);
-	tcmu_handle_completions(tcmu_dev);
-	mutex_unlock(&tcmu_dev->cmdr_lock);
+	mutex_lock(&udev->cmdr_lock);
+	tcmu_handle_completions(udev);
+	run_cmdr_queue(udev);
+	mutex_unlock(&udev->cmdr_lock);
 
 	return 0;
 }
@@ -1529,7 +1618,6 @@  static int tcmu_configure_device(struct se_device *dev)
 	udev->data_off = CMDR_SIZE;
 	udev->data_size = DATA_SIZE;
 	udev->dbi_thresh = 0; /* Default in Idle state */
-	udev->waiting_global = false;
 
 	/* Initialise the mailbox of the ring buffer */
 	mb = udev->mb_addr;
@@ -1974,12 +2062,14 @@  static ssize_t tcmu_emulate_write_cache_store(struct config_item *item,
 	.tb_dev_attrib_attrs	= NULL,
 };
 
-
 static void find_free_blocks(void)
 {
 	struct tcmu_dev *udev;
 	loff_t off;
-	uint32_t start, end, block;
+	u32 start, end, block, total_freed = 0;
+
+	if (atomic_read(&global_db_count) <= TCMU_GLOBAL_MAX_BLOCKS)
+		return;
 
 	mutex_lock(&root_udev_mutex);
 	list_for_each_entry(udev, &root_udev, node) {
@@ -1988,8 +2078,8 @@  static void find_free_blocks(void)
 		/* Try to complete the finished commands first */
 		tcmu_handle_completions(udev);
 
-		/* Skip the udevs waiting the global pool or in idle */
-		if (udev->waiting_global || !udev->dbi_thresh) {
+		/* Skip the udevs in idle */
+		if (!udev->dbi_thresh) {
 			mutex_unlock(&udev->cmdr_lock);
 			continue;
 		}
@@ -1998,8 +2088,8 @@  static void find_free_blocks(void)
 		block = find_last_bit(udev->data_bitmap, end);
 		if (block == udev->dbi_max) {
 			/*
-			 * The last bit is dbi_max, so there is
-			 * no need to shrink any blocks.
+			 * The last bit is dbi_max, so it is not possible
+			 * reclaim any blocks.
 			 */
 			mutex_unlock(&udev->cmdr_lock);
 			continue;
@@ -2019,30 +2109,15 @@  static void find_free_blocks(void)
 		/* Release the block pages */
 		tcmu_blocks_release(&udev->data_blocks, start, end);
 		mutex_unlock(&udev->cmdr_lock);
-	}
-	mutex_unlock(&root_udev_mutex);
-}
 
-static void run_cmdr_queues(void)
-{
-	struct tcmu_dev *udev;
-
-	/*
-	 * Try to wake up the udevs who are waiting
-	 * for the global data block pool.
-	 */
-	mutex_lock(&root_udev_mutex);
-	list_for_each_entry(udev, &root_udev, node) {
-		mutex_lock(&udev->cmdr_lock);
-		if (!udev->waiting_global) {
-			mutex_unlock(&udev->cmdr_lock);
-			break;
-		}
-		mutex_unlock(&udev->cmdr_lock);
-
-		wake_up(&udev->wait_cmdr);
+		total_freed += end - start;
+		pr_debug("Freed %u blocks (total %u) from %s.\n", end - start,
+			 total_freed, udev->name);
 	}
 	mutex_unlock(&root_udev_mutex);
+
+	if (atomic_read(&global_db_count) > TCMU_GLOBAL_MAX_BLOCKS)
+		schedule_delayed_work(&tcmu_unmap_work, msecs_to_jiffies(5000));
 }
 
 static void check_timedout_devices(void)
@@ -2071,7 +2146,6 @@  static void tcmu_unmap_work_fn(struct work_struct *work)
 {
 	check_timedout_devices();
 	find_free_blocks();
-	run_cmdr_queues();
 }
 
 static int __init tcmu_module_init(void)
@@ -2080,7 +2154,7 @@  static int __init tcmu_module_init(void)
 
 	BUILD_BUG_ON((sizeof(struct tcmu_cmd_entry) % TCMU_OP_ALIGN_SIZE) != 0);
 
-	INIT_WORK(&tcmu_unmap_work, tcmu_unmap_work_fn);
+	INIT_DELAYED_WORK(&tcmu_unmap_work, tcmu_unmap_work_fn);
 
 	tcmu_cmd_cache = kmem_cache_create("tcmu_cmd_cache",
 				sizeof(struct tcmu_cmd),
@@ -2143,7 +2217,7 @@  static int __init tcmu_module_init(void)
 
 static void __exit tcmu_module_exit(void)
 {
-	cancel_work_sync(&tcmu_unmap_work);
+	cancel_delayed_work_sync(&tcmu_unmap_work);
 	target_backend_unregister(&tcmu_ops);
 	kfree(tcmu_attrs);
 	genl_unregister_family(&tcmu_genl_family);