diff mbox series

[12/20] rbd: lock should be quiesced on reacquire

Message ID 20190625144111.11270-13-idryomov@gmail.com (mailing list archive)
State New, archived
Headers show
Series rbd: support for object-map and fast-diff | expand

Commit Message

Ilya Dryomov June 25, 2019, 2:41 p.m. UTC
Quiesce exclusive lock at the top of rbd_reacquire_lock() instead
of only when ceph_cls_set_cookie() fails.  This avoids a deadlock on
rbd_dev->lock_rwsem.

If rbd_dev->lock_rwsem is needed for I/O completion, set_cookie can
hang ceph-msgr worker thread if set_cookie reply ends up behind an I/O
reply, because, like lock and unlock requests, set_cookie is sent and
waited upon with rbd_dev->lock_rwsem held for write.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 drivers/block/rbd.c | 35 +++++++++++++++++++++--------------
 1 file changed, 21 insertions(+), 14 deletions(-)

Comments

Dongsheng Yang July 1, 2019, 5:29 a.m. UTC | #1
On 06/25/2019 10:41 PM, Ilya Dryomov wrote:
> Quiesce exclusive lock at the top of rbd_reacquire_lock() instead
> of only when ceph_cls_set_cookie() fails.  This avoids a deadlock on
> rbd_dev->lock_rwsem.
>
> If rbd_dev->lock_rwsem is needed for I/O completion, set_cookie can
> hang ceph-msgr worker thread if set_cookie reply ends up behind an I/O
> reply, because, like lock and unlock requests, set_cookie is sent and
> waited upon with rbd_dev->lock_rwsem held for write.
>
> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>

Reviewed-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
> ---
>   drivers/block/rbd.c | 35 +++++++++++++++++++++--------------
>   1 file changed, 21 insertions(+), 14 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index 34bd45d336e6..5fcb4ebd981a 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -3004,6 +3004,7 @@ static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
>   {
>   	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
>   
> +	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
>   	strcpy(rbd_dev->lock_cookie, cookie);
>   	rbd_set_owner_cid(rbd_dev, &cid);
>   	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
> @@ -3028,7 +3029,6 @@ static int rbd_lock(struct rbd_device *rbd_dev)
>   	if (ret)
>   		return ret;
>   
> -	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
>   	__rbd_lock(rbd_dev, cookie);
>   	return 0;
>   }
> @@ -3411,13 +3411,11 @@ static void rbd_acquire_lock(struct work_struct *work)
>   	}
>   }
>   
> -/*
> - * lock_rwsem must be held for write
> - */
> -static bool rbd_release_lock(struct rbd_device *rbd_dev)
> +static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
>   {
> -	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
> -	     rbd_dev->lock_state);
> +	dout("%s rbd_dev %p\n", __func__, rbd_dev);
> +	lockdep_assert_held_exclusive(&rbd_dev->lock_rwsem);
> +
>   	if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
>   		return false;
>   
> @@ -3433,12 +3431,22 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev)
>   	up_read(&rbd_dev->lock_rwsem);
>   
>   	down_write(&rbd_dev->lock_rwsem);
> -	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
> -	     rbd_dev->lock_state);
>   	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
>   		return false;
>   
> +	return true;
> +}
> +
> +/*
> + * lock_rwsem must be held for write
> + */
> +static void rbd_release_lock(struct rbd_device *rbd_dev)
> +{
> +	if (!rbd_quiesce_lock(rbd_dev))
> +		return;
> +
>   	rbd_unlock(rbd_dev);
> +
>   	/*
>   	 * Give others a chance to grab the lock - we would re-acquire
>   	 * almost immediately if we got new IO during ceph_osdc_sync()
> @@ -3447,7 +3455,6 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev)
>   	 * after wake_requests() in rbd_handle_released_lock().
>   	 */
>   	cancel_delayed_work(&rbd_dev->lock_dwork);
> -	return true;
>   }
>   
>   static void rbd_release_lock_work(struct work_struct *work)
> @@ -3795,7 +3802,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
>   	char cookie[32];
>   	int ret;
>   
> -	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
> +	if (!rbd_quiesce_lock(rbd_dev))
> +		return;
>   
>   	format_lock_cookie(rbd_dev, cookie);
>   	ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
> @@ -3811,9 +3819,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
>   		 * Lock cookie cannot be updated on older OSDs, so do
>   		 * a manual release and queue an acquire.
>   		 */
> -		if (rbd_release_lock(rbd_dev))
> -			queue_delayed_work(rbd_dev->task_wq,
> -					   &rbd_dev->lock_dwork, 0);
> +		rbd_unlock(rbd_dev);
> +		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
>   	} else {
>   		__rbd_lock(rbd_dev, cookie);
>   	}
diff mbox series

Patch

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 34bd45d336e6..5fcb4ebd981a 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3004,6 +3004,7 @@  static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
 {
 	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
 
+	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
 	strcpy(rbd_dev->lock_cookie, cookie);
 	rbd_set_owner_cid(rbd_dev, &cid);
 	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
@@ -3028,7 +3029,6 @@  static int rbd_lock(struct rbd_device *rbd_dev)
 	if (ret)
 		return ret;
 
-	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
 	__rbd_lock(rbd_dev, cookie);
 	return 0;
 }
@@ -3411,13 +3411,11 @@  static void rbd_acquire_lock(struct work_struct *work)
 	}
 }
 
-/*
- * lock_rwsem must be held for write
- */
-static bool rbd_release_lock(struct rbd_device *rbd_dev)
+static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
 {
-	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
-	     rbd_dev->lock_state);
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+	lockdep_assert_held_exclusive(&rbd_dev->lock_rwsem);
+
 	if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
 		return false;
 
@@ -3433,12 +3431,22 @@  static bool rbd_release_lock(struct rbd_device *rbd_dev)
 	up_read(&rbd_dev->lock_rwsem);
 
 	down_write(&rbd_dev->lock_rwsem);
-	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
-	     rbd_dev->lock_state);
 	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
 		return false;
 
+	return true;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_release_lock(struct rbd_device *rbd_dev)
+{
+	if (!rbd_quiesce_lock(rbd_dev))
+		return;
+
 	rbd_unlock(rbd_dev);
+
 	/*
 	 * Give others a chance to grab the lock - we would re-acquire
 	 * almost immediately if we got new IO during ceph_osdc_sync()
@@ -3447,7 +3455,6 @@  static bool rbd_release_lock(struct rbd_device *rbd_dev)
 	 * after wake_requests() in rbd_handle_released_lock().
 	 */
 	cancel_delayed_work(&rbd_dev->lock_dwork);
-	return true;
 }
 
 static void rbd_release_lock_work(struct work_struct *work)
@@ -3795,7 +3802,8 @@  static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
 	char cookie[32];
 	int ret;
 
-	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+	if (!rbd_quiesce_lock(rbd_dev))
+		return;
 
 	format_lock_cookie(rbd_dev, cookie);
 	ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
@@ -3811,9 +3819,8 @@  static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
 		 * Lock cookie cannot be updated on older OSDs, so do
 		 * a manual release and queue an acquire.
 		 */
-		if (rbd_release_lock(rbd_dev))
-			queue_delayed_work(rbd_dev->task_wq,
-					   &rbd_dev->lock_dwork, 0);
+		rbd_unlock(rbd_dev);
+		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
 	} else {
 		__rbd_lock(rbd_dev, cookie);
 	}