@@ -28,16 +28,19 @@
*/
+#include <linux/crc32c.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/striper.h>
#include <linux/ceph/decode.h>
+#include <linux/ceph/journaler.h>
#include <linux/parser.h>
#include <linux/bsearch.h>
#include <linux/kernel.h>
+#include <linux/bio.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
@@ -378,6 +381,8 @@ struct rbd_device {
atomic_t parent_ref;
struct rbd_device *parent;
+ struct rbd_journal *journal;
+
/* Block layer tags. */
struct blk_mq_tag_set tag_set;
@@ -408,6 +413,22 @@ enum rbd_dev_flags {
RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};
+#define LOCAL_MIRROR_UUID ""
+#define LOCAL_CLIENT_ID ""
+
+enum rbd_journal_state {
+ RBD_JOURNAL_STATE_INITIALIZED,
+ RBD_JOURNAL_STATE_OPENED,
+ RBD_JOURNAL_STATE_CLOSED,
+};
+
+struct rbd_journal {
+ struct ceph_journaler *journaler;
+ uint64_t tag_tid;
+ /* state is protected by rbd_dev->lock_rwsem */
+ enum rbd_journal_state state;
+};
+
static DEFINE_MUTEX(client_mutex); /* Serialize client creation */
static LIST_HEAD(rbd_dev_list); /* devices */
@@ -2681,6 +2702,7 @@ static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
/*
* lock_rwsem must be held for write
*/
+static int rbd_dev_open_journal(struct rbd_device *rbd_dev);
static int rbd_lock(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
@@ -2697,6 +2719,15 @@ static int rbd_lock(struct rbd_device *rbd_dev)
if (ret)
return ret;
+ if (rbd_dev->header.features & RBD_FEATURE_JOURNALING) {
+ ret = rbd_dev_open_journal(rbd_dev);
+ if (ret) {
+ rbd_warn(rbd_dev, "open journal failed: %d", ret);
+ set_disk_ro(rbd_dev->disk, true);
+ set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
+ return -EBLACKLISTED;
+ }
+ }
rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
__rbd_lock(rbd_dev, cookie);
return 0;
@@ -2705,6 +2736,7 @@ static int rbd_lock(struct rbd_device *rbd_dev)
/*
* lock_rwsem must be held for write
*/
+static void rbd_journal_close(struct rbd_journal *journal);
static void rbd_unlock(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
@@ -2713,6 +2745,9 @@ static void rbd_unlock(struct rbd_device *rbd_dev)
WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
rbd_dev->lock_cookie[0] == '\0');
+ if (rbd_dev->journal)
+ rbd_journal_close(rbd_dev->journal);
+
ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
RBD_LOCK_NAME, rbd_dev->lock_cookie);
if (ret && ret != -ENOENT)
@@ -5750,6 +5785,207 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
return ret;
}
+static int rbd_journal_allocate_tag(struct rbd_journal *journal);
+static int rbd_journal_open(struct rbd_journal *journal)
+{
+ struct ceph_journaler *journaler = journal->journaler;
+ int ret = 0;
+
+ ret = ceph_journaler_open(journaler);
+ if (ret)
+ goto out;
+
+ ret = ceph_journaler_start_replay(journaler);
+ if (ret)
+ goto err_close_journaler;
+
+ ret = rbd_journal_allocate_tag(journal);
+ if (ret)
+ goto err_close_journaler;
+
+ journal->state = RBD_JOURNAL_STATE_OPENED;
+ return ret;
+
+err_close_journaler:
+ ceph_journaler_close(journaler);
+
+out:
+ return ret;
+}
+
+static int rbd_dev_open_journal(struct rbd_device *rbd_dev)
+{
+ int ret = 0;
+ struct rbd_journal *journal = NULL;
+ struct ceph_journaler *journaler = NULL;
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+
+ if (rbd_dev->journal && rbd_dev->journal->state == RBD_JOURNAL_STATE_OPENED)
+ return 0;
+
+ // create journal
+ if (!rbd_dev->journal) {
+ journal = kzalloc(sizeof(struct rbd_journal), GFP_KERNEL);
+ if (!journal)
+ return -ENOMEM;
+
+ journal->state = RBD_JOURNAL_STATE_INITIALIZED;
+ journaler = ceph_journaler_create(osdc, &rbd_dev->header_oloc,
+ rbd_dev->spec->image_id,
+ LOCAL_CLIENT_ID);
+ if (!journaler) {
+ ret = -ENOMEM;
+ goto err_free_journal;
+ }
+
+ journaler->entry_handler = rbd_dev;
+ journaler->handle_entry = rbd_journal_replay;
+
+ journal->journaler = journaler;
+ rbd_dev->journal = journal;
+ }
+
+ // open journal
+ ret = rbd_journal_open(rbd_dev->journal);
+ if (ret)
+ goto err_destroy_journaler;
+
+ return ret;
+
+err_destroy_journaler:
+ ceph_journaler_destroy(journaler);
+err_free_journal:
+ kfree(rbd_dev->journal);
+ rbd_dev->journal = NULL;
+ return ret;
+}
+
+static void rbd_journal_close(struct rbd_journal *journal)
+{
+ if (journal->state == RBD_JOURNAL_STATE_CLOSED)
+ return;
+ ceph_journaler_close(journal->journaler);
+ journal->tag_tid = 0;
+ journal->state = RBD_JOURNAL_STATE_CLOSED;
+}
+
+static void rbd_dev_close_journal(struct rbd_device *rbd_dev)
+{
+ struct ceph_journaler *journaler = NULL;
+
+ if (!rbd_dev->journal)
+ return;
+
+ rbd_journal_close(rbd_dev->journal);
+
+ journaler = rbd_dev->journal->journaler;
+ ceph_journaler_destroy(journaler);
+ kfree(rbd_dev->journal);
+ rbd_dev->journal = NULL;
+}
+
+typedef struct rbd_journal_tag_predecessor {
+ bool commit_valid;
+ uint64_t tag_tid;
+ uint64_t entry_tid;
+ uint32_t uuid_len;
+ char *mirror_uuid;
+} rbd_journal_tag_predecessor;
+
+typedef struct rbd_journal_tag_data {
+ struct rbd_journal_tag_predecessor predecessor;
+ uint32_t uuid_len;
+ char *mirror_uuid;
+} rbd_journal_tag_data;
+
+static uint32_t tag_data_encoding_size(struct rbd_journal_tag_data *tag_data)
+{
+ // sizeof(uuid_len) 4 + uuid_len + 1 commit_valid + 8 tag_tid +
+ // 8 entry_tid + 4 sizeof(uuid_len) + uuid_len
+ return (4 + tag_data->uuid_len + 1 + 8 + 8 + 4 +
+ tag_data->predecessor.uuid_len);
+}
+
+static void predecessor_encode(void **p, void *end,
+ struct rbd_journal_tag_predecessor *predecessor)
+{
+ ceph_encode_string(p, end, predecessor->mirror_uuid,
+ predecessor->uuid_len);
+ ceph_encode_8(p, predecessor->commit_valid);
+ ceph_encode_64(p, predecessor->tag_tid);
+ ceph_encode_64(p, predecessor->entry_tid);
+}
+
+static int rbd_journal_encode_tag_data(void **p, void *end,
+ struct rbd_journal_tag_data *tag_data)
+{
+ struct rbd_journal_tag_predecessor *predecessor = &tag_data->predecessor;
+
+ ceph_encode_string(p, end, tag_data->mirror_uuid, tag_data->uuid_len);
+ predecessor_encode(p, end, predecessor);
+
+ return 0;
+}
+
+static int rbd_journal_allocate_tag(struct rbd_journal *journal)
+{
+ struct ceph_journaler_tag tag = {};
+ struct rbd_journal_tag_data tag_data = {};
+ struct ceph_journaler *journaler = journal->journaler;
+ struct ceph_journaler_client *client;
+ struct rbd_journal_tag_predecessor *predecessor;
+ struct ceph_journaler_object_pos *position;
+ void *orig_buf = NULL, *buf = NULL, *p = NULL, *end = NULL;
+ uint32_t buf_len;
+ int ret = 0;
+
+ ret = ceph_journaler_get_cached_client(journaler, LOCAL_CLIENT_ID, &client);
+ if (ret)
+ goto out;
+
+ predecessor = &tag_data.predecessor;
+ position = list_first_entry(&client->object_positions,
+ struct ceph_journaler_object_pos, node);
+
+ predecessor->commit_valid = true;
+ predecessor->tag_tid = position->tag_tid;
+ predecessor->entry_tid = position->entry_tid;
+ predecessor->uuid_len = 0;
+ predecessor->mirror_uuid = LOCAL_MIRROR_UUID;
+
+ tag_data.uuid_len = 0;
+ tag_data.mirror_uuid = LOCAL_MIRROR_UUID;
+
+ buf_len = tag_data_encoding_size(&tag_data);
+
+ p = kmalloc(buf_len, GFP_KERNEL);
+ if (!p) {
+ pr_err("failed to allocate tag data");
+ return -ENOMEM;
+ }
+
+ end = p + buf_len;
+ orig_buf = buf = p;
+ ret = rbd_journal_encode_tag_data(&p, end, &tag_data);
+ if (ret) {
+ pr_err("error in tag data");
+ goto free_buf;
+ }
+
+ ret = ceph_journaler_allocate_tag(journaler, 0, buf, buf_len, &tag);
+ if (ret)
+ goto free_data;
+
+ journal->tag_tid = tag.tid;
+free_data:
+ if(tag.data)
+ kfree(tag.data);
+free_buf:
+ kfree(orig_buf);
+out:
+ return ret;
+}
+
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
rbd_dev_unprobe(rbd_dev);
@@ -6074,6 +6310,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
device_del(&rbd_dev->dev);
rbd_dev_image_unlock(rbd_dev);
+ rbd_dev_close_journal(rbd_dev);
rbd_dev_device_release(rbd_dev);
rbd_dev_image_release(rbd_dev);
rbd_dev_destroy(rbd_dev);
This commit introduce rbd_journal into rbd_device. with journaling feature enabled, We will open journal after exclusive-lock acquired and close journal before exclusive-lock released. Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn> --- drivers/block/rbd.c | 237 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 237 insertions(+)