@@ -798,6 +798,7 @@ enum {
Opt_read_only,
Opt_read_write,
Opt_lock_on_read,
+ Opt_exclusive,
Opt_err
};
@@ -810,6 +811,7 @@ static match_table_t rbd_opts_tokens = {
{Opt_read_write, "read_write"},
{Opt_read_write, "rw"}, /* Alternate spelling */
{Opt_lock_on_read, "lock_on_read"},
+ {Opt_exclusive, "exclusive"},
{Opt_err, NULL}
};
@@ -817,11 +819,13 @@ struct rbd_options {
int queue_depth;
bool read_only;
bool lock_on_read;
+ bool exclusive;
};
#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT false
#define RBD_LOCK_ON_READ_DEFAULT false
+#define RBD_EXCLUSIVE_DEFAULT false
static int parse_rbd_opts_token(char *c, void *private)
{
@@ -860,6 +864,9 @@ static int parse_rbd_opts_token(char *c, void *private)
case Opt_lock_on_read:
rbd_opts->lock_on_read = true;
break;
+ case Opt_exclusive:
+ rbd_opts->exclusive = true;
+ break;
default:
/* libceph prints "bad option" msg */
return -EINVAL;
@@ -3440,6 +3447,18 @@ static void rbd_acquire_lock(struct work_struct *work)
ret = rbd_request_lock(rbd_dev);
if (ret == -ETIMEDOUT) {
goto again; /* treat this as a dead client */
+ } else if (ret == -EROFS) {
+ rbd_warn(rbd_dev, "peer will not release lock");
+ /*
+ * If this is rbd_add_acquire_lock(), we want to fail
+ * immediately -- reuse BLACKLISTED flag. Otherwise we
+ * want to block.
+ */
+ if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
+ set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
+ /* wake "rbd map --exclusive" process */
+ wake_requests(rbd_dev, false);
+ }
} else if (ret < 0) {
rbd_warn(rbd_dev, "error requesting lock: %d", ret);
mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
@@ -3606,9 +3625,15 @@ static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
result = 0;
if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
- dout("%s rbd_dev %p queueing unlock_work\n", __func__,
- rbd_dev);
- queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
+ if (!rbd_dev->opts->exclusive) {
+ dout("%s rbd_dev %p queueing unlock_work\n",
+ __func__, rbd_dev);
+ queue_work(rbd_dev->task_wq,
+ &rbd_dev->unlock_work);
+ } else {
+ /* refuse to release the lock */
+ result = -EROFS;
+ }
}
}
@@ -4073,8 +4098,14 @@ static void rbd_queue_workfn(struct work_struct *work)
if (must_be_locked) {
down_read(&rbd_dev->lock_rwsem);
if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
- !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
+ !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+ if (rbd_dev->opts->exclusive) {
+ rbd_warn(rbd_dev, "exclusive lock required");
+ result = -EROFS;
+ goto err_unlock;
+ }
rbd_wait_state_locked(rbd_dev);
+ }
if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
result = -EBLACKLISTED;
goto err_unlock;
@@ -5640,6 +5671,7 @@ static int rbd_add_parse_args(const char *buf,
rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
+ rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
copts = ceph_parse_options(options, mon_addrs,
mon_addrs + mon_addrs_size - 1,
@@ -5698,6 +5730,33 @@ static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
return ret;
}
+static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
+{
+ down_write(&rbd_dev->lock_rwsem);
+ if (__rbd_is_lock_owner(rbd_dev))
+ rbd_unlock(rbd_dev);
+ up_write(&rbd_dev->lock_rwsem);
+}
+
+static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
+{
+ if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
+ rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
+ return -EINVAL;
+ }
+
+ /* FIXME: "rbd map --exclusive" should be in interruptible */
+ down_read(&rbd_dev->lock_rwsem);
+ rbd_wait_state_locked(rbd_dev);
+ up_read(&rbd_dev->lock_rwsem);
+ if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+ rbd_warn(rbd_dev, "failed to acquire exclusive lock");
+ return -EROFS;
+ }
+
+ return 0;
+}
+
/*
* An rbd format 2 image has a unique identifier, distinct from the
* name given to it by the user. Internally, that identifier is
@@ -6141,11 +6200,17 @@ static ssize_t do_rbd_add(struct bus_type *bus,
if (rc)
goto err_out_image_probe;
+ if (rbd_dev->opts->exclusive) {
+ rc = rbd_add_acquire_lock(rbd_dev);
+ if (rc)
+ goto err_out_device_setup;
+ }
+
/* Everything's ready. Announce the disk to the world. */
rc = device_add(&rbd_dev->dev);
if (rc)
- goto err_out_device_setup;
+ goto err_out_image_lock;
add_disk(rbd_dev->disk);
/* see rbd_init_disk() */
@@ -6163,6 +6228,8 @@ static ssize_t do_rbd_add(struct bus_type *bus,
module_put(THIS_MODULE);
return rc;
+err_out_image_lock:
+ rbd_dev_image_unlock(rbd_dev);
err_out_device_setup:
rbd_dev_device_release(rbd_dev);
err_out_image_probe:
@@ -6286,11 +6353,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
spin_unlock(&rbd_dev_list_lock);
device_del(&rbd_dev->dev);
- down_write(&rbd_dev->lock_rwsem);
- if (__rbd_is_lock_owner(rbd_dev))
- rbd_unlock(rbd_dev);
- up_write(&rbd_dev->lock_rwsem);
-
+ rbd_dev_image_unlock(rbd_dev);
rbd_dev_device_release(rbd_dev);
rbd_dev_image_release(rbd_dev);
rbd_dev_destroy(rbd_dev);
Support disabling automatic exclusive lock transfers to allow users to be in charge of which node should own the lock while being able to reuse exclusive lock's built-in blacklist/break-lock functionality. Signed-off-by: Ilya Dryomov <idryomov@gmail.com> --- drivers/block/rbd.c | 83 ++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 73 insertions(+), 10 deletions(-)