Message ID | 20190625144111.11270-13-idryomov@gmail.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | rbd: support for object-map and fast-diff | expand |
On 06/25/2019 10:41 PM, Ilya Dryomov wrote: > Quiesce exclusive lock at the top of rbd_reacquire_lock() instead > of only when ceph_cls_set_cookie() fails. This avoids a deadlock on > rbd_dev->lock_rwsem. > > If rbd_dev->lock_rwsem is needed for I/O completion, set_cookie can > hang ceph-msgr worker thread if set_cookie reply ends up behind an I/O > reply, because, like lock and unlock requests, set_cookie is sent and > waited upon with rbd_dev->lock_rwsem held for write. > > Signed-off-by: Ilya Dryomov <idryomov@gmail.com> Reviewed-by: Dongsheng Yang <dongsheng.yang@easystack.cn> > --- > drivers/block/rbd.c | 35 +++++++++++++++++++++-------------- > 1 file changed, 21 insertions(+), 14 deletions(-) > > diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c > index 34bd45d336e6..5fcb4ebd981a 100644 > --- a/drivers/block/rbd.c > +++ b/drivers/block/rbd.c > @@ -3004,6 +3004,7 @@ static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie) > { > struct rbd_client_id cid = rbd_get_cid(rbd_dev); > > + rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; > strcpy(rbd_dev->lock_cookie, cookie); > rbd_set_owner_cid(rbd_dev, &cid); > queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); > @@ -3028,7 +3029,6 @@ static int rbd_lock(struct rbd_device *rbd_dev) > if (ret) > return ret; > > - rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; > __rbd_lock(rbd_dev, cookie); > return 0; > } > @@ -3411,13 +3411,11 @@ static void rbd_acquire_lock(struct work_struct *work) > } > } > > -/* > - * lock_rwsem must be held for write > - */ > -static bool rbd_release_lock(struct rbd_device *rbd_dev) > +static bool rbd_quiesce_lock(struct rbd_device *rbd_dev) > { > - dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, > - rbd_dev->lock_state); > + dout("%s rbd_dev %p\n", __func__, rbd_dev); > + lockdep_assert_held_exclusive(&rbd_dev->lock_rwsem); > + > if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) > return false; > > @@ -3433,12 +3431,22 @@ static bool 
rbd_release_lock(struct rbd_device *rbd_dev) > up_read(&rbd_dev->lock_rwsem); > > down_write(&rbd_dev->lock_rwsem); > - dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, > - rbd_dev->lock_state); > if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) > return false; > > + return true; > +} > + > +/* > + * lock_rwsem must be held for write > + */ > +static void rbd_release_lock(struct rbd_device *rbd_dev) > +{ > + if (!rbd_quiesce_lock(rbd_dev)) > + return; > + > rbd_unlock(rbd_dev); > + > /* > * Give others a chance to grab the lock - we would re-acquire > * almost immediately if we got new IO during ceph_osdc_sync() > @@ -3447,7 +3455,6 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev) > * after wake_requests() in rbd_handle_released_lock(). > */ > cancel_delayed_work(&rbd_dev->lock_dwork); > - return true; > } > > static void rbd_release_lock_work(struct work_struct *work) > @@ -3795,7 +3802,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev) > char cookie[32]; > int ret; > > - WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); > + if (!rbd_quiesce_lock(rbd_dev)) > + return; > > format_lock_cookie(rbd_dev, cookie); > ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid, > @@ -3811,9 +3819,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev) > * Lock cookie cannot be updated on older OSDs, so do > * a manual release and queue an acquire. > */ > - if (rbd_release_lock(rbd_dev)) > - queue_delayed_work(rbd_dev->task_wq, > - &rbd_dev->lock_dwork, 0); > + rbd_unlock(rbd_dev); > + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); > } else { > __rbd_lock(rbd_dev, cookie); > }
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 34bd45d336e6..5fcb4ebd981a 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -3004,6 +3004,7 @@ static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie) { struct rbd_client_id cid = rbd_get_cid(rbd_dev); + rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; strcpy(rbd_dev->lock_cookie, cookie); rbd_set_owner_cid(rbd_dev, &cid); queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); @@ -3028,7 +3029,6 @@ static int rbd_lock(struct rbd_device *rbd_dev) if (ret) return ret; - rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; __rbd_lock(rbd_dev, cookie); return 0; } @@ -3411,13 +3411,11 @@ static void rbd_acquire_lock(struct work_struct *work) } } -/* - * lock_rwsem must be held for write - */ -static bool rbd_release_lock(struct rbd_device *rbd_dev) +static bool rbd_quiesce_lock(struct rbd_device *rbd_dev) { - dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, - rbd_dev->lock_state); + dout("%s rbd_dev %p\n", __func__, rbd_dev); + lockdep_assert_held_exclusive(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) return false; @@ -3433,12 +3431,22 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev) up_read(&rbd_dev->lock_rwsem); down_write(&rbd_dev->lock_rwsem); - dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, - rbd_dev->lock_state); if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) return false; + return true; +} + +/* + * lock_rwsem must be held for write + */ +static void rbd_release_lock(struct rbd_device *rbd_dev) +{ + if (!rbd_quiesce_lock(rbd_dev)) + return; + rbd_unlock(rbd_dev); + /* * Give others a chance to grab the lock - we would re-acquire * almost immediately if we got new IO during ceph_osdc_sync() @@ -3447,7 +3455,6 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev) * after wake_requests() in rbd_handle_released_lock(). 
*/ cancel_delayed_work(&rbd_dev->lock_dwork); - return true; } static void rbd_release_lock_work(struct work_struct *work) @@ -3795,7 +3802,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev) char cookie[32]; int ret; - WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); + if (!rbd_quiesce_lock(rbd_dev)) + return; format_lock_cookie(rbd_dev, cookie); ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid, @@ -3811,9 +3819,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev) * Lock cookie cannot be updated on older OSDs, so do * a manual release and queue an acquire. */ - if (rbd_release_lock(rbd_dev)) - queue_delayed_work(rbd_dev->task_wq, - &rbd_dev->lock_dwork, 0); + rbd_unlock(rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); } else { __rbd_lock(rbd_dev, cookie); }
Quiesce exclusive lock at the top of rbd_reacquire_lock() instead of only when ceph_cls_set_cookie() fails. This avoids a deadlock on rbd_dev->lock_rwsem. If rbd_dev->lock_rwsem is needed for I/O completion, set_cookie can hang the ceph-msgr worker thread if the set_cookie reply ends up behind an I/O reply, because, like lock and unlock requests, set_cookie is sent and waited upon with rbd_dev->lock_rwsem held for write. Signed-off-by: Ilya Dryomov <idryomov@gmail.com> --- drivers/block/rbd.c | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-)