new file mode 100644
@@ -0,0 +1,131 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _FS_CEPH_JOURNAL_H
+#define _FS_CEPH_JOURNAL_H
+
+#include <linux/bitrev.h>
+#include <linux/completion.h>
+#include <linux/kref.h>
+#include <linux/mempool.h>
+#include <linux/rbtree.h>
+#include <linux/refcount.h>
+
+#include <linux/ceph/types.h>
+#include <linux/ceph/osdmap.h>
+#include <linux/ceph/messenger.h>
+#include <linux/ceph/msgpool.h>
+#include <linux/ceph/auth.h>
+#include <linux/ceph/pagelist.h>
+#include <linux/ceph/cls_journaler_client.h>
+
+struct ceph_msg;
+struct ceph_snap_context;
+struct ceph_osd_request;
+struct ceph_osd_client;
+
+#define JOURNAL_HEADER_PREFIX "journal."
+#define JOURNAL_OBJECT_PREFIX "journal_data."
+
+#define LOCAL_MIRROR_UUID ""
+
+static const uint32_t HEADER_FIXED_SIZE = 25; /// preamble, version, entry tid, tag id
+static const uint32_t REMAINDER_FIXED_SIZE = 8; /// data size, crc
+
+static const uint64_t PREAMBLE = 0x3141592653589793;
+
+/*
+ * Identifies one appended entry; allocated by ceph_journaler_append() and
+ * handed back to its caller on success.
+ */
+struct ceph_journaler_future {
+ uint64_t tag_tid; /* tag the entry was appended under */
+ uint64_t entry_tid; /* entry tid allocated for the entry within that tag */
+ uint64_t commit_tid; /* commit tid allocated for the entry */
+};
+
+/* In-memory form of one journal entry (see the encode/decode helpers). */
+struct ceph_journaler_entry {
+ uint64_t tag_tid;
+ uint64_t entry_tid;
+ /*
+  * Length of @data in bytes.
+  * NOTE(review): the decoder passes &data_len to
+  * ceph_extract_encoded_string(); confirm the pointer type it expects
+  * matches ssize_t.
+  */
+ ssize_t data_len;
+ char *data; /* entry payload */
+
+ struct list_head node; /* links decoded entries during replay */
+};
+
+/*
+ * Entry (de)serialization.
+ * NOTE(review): no definition with these names is visible in journaler.c,
+ * which only has the static journaler_entry_decode()/journaler_entry_encode();
+ * confirm whether these prototypes are still needed.
+ */
+struct ceph_journaler_entry *ceph_journaler_entry_decode(void **p, void *end);
+void ceph_journaler_entry_encode(struct ceph_journaler_entry *entry, void **p, void *end);
+
+/* Per-tag entry tid counter, kept on ceph_journaler::entry_tids. */
+struct entry_tid {
+ struct list_head node;
+ uint64_t tag_tid; /* tag this counter belongs to */
+ uint64_t entry_tid; /* value allocate_entry_tid() returns next, then increments */
+};
+
+/*
+ * One appended or replayed entry awaiting commit; stored in
+ * ceph_journaler::commit_entries, keyed by @commit_tid.
+ */
+struct commit_entry {
+ struct rb_node r_node;
+ uint64_t commit_tid; /* rb-tree key */
+ uint64_t object_num; /* journal data object the entry lives in */
+ uint64_t tag_tid;
+ uint64_t entry_tid;
+ /*
+  * Starts out false (the entry is kzalloc'ed).
+  * NOTE(review): presumably set once the client has committed the entry;
+  * confirm in ceph_journaler_client_committed().
+  */
+ bool committed;
+};
+
+/* State of one open journal. */
+struct ceph_journaler {
+ struct ceph_osd_client *osdc; /* OSD client used for every request */
+ struct ceph_object_locator data_oloc; /* locator of the journal data objects */
+
+ /* journal header object: JOURNAL_HEADER_PREFIX + journal id */
+ struct ceph_object_id header_oid;
+ struct ceph_object_locator header_oloc;
+
+ /* name prefix of the data objects; the object number is appended */
+ char *object_oid_prefix;
+
+ struct mutex mutex; /* held around refresh() and start_replay() */
+ spinlock_t meta_lock; /* held while ceph_journaler_append() allocates tids */
+ spinlock_t entry_tid_lock; /* protects entry_tids */
+ spinlock_t commit_lock; /* taken around commit_entries inserts and object_positions_pending walks */
+ /* immutable metadata, read from the header object in ceph_journaler_open() */
+ uint8_t order;
+ uint8_t splay_width;
+ int64_t pool_id;
+
+ uint64_t commit_tid; /* last commit tid handed out */
+
+ /* mutable metadata, re-read from the header object by refresh() */
+ uint64_t minimum_set;
+ uint64_t active_set;
+
+ struct list_head clients; /* elements of clients_array */
+ struct ceph_journaler_client *client; /* element whose id matches LOCAL_MIRROR_UUID */
+ struct ceph_journaler_client *clients_array; /* array returned by the client_list call */
+ /* copies of the local client's object positions, sent by commit_work */
+ struct list_head object_positions_pending;
+
+ struct rb_root commit_entries; /* struct commit_entry, keyed by commit_tid */
+
+ struct list_head entry_tids; /* struct entry_tid, one per tag */
+
+ struct workqueue_struct *task_wq; /* ordered workqueue for the works below */
+ struct work_struct notify_update_work; /* sends a notify on the header object */
+ struct work_struct commit_work; /* pushes object_positions_pending to the header */
+
+ /* replay state, initialised by start_replay() from the first object position */
+ uint64_t active_tag_tid;
+ bool commit_pos_valid;
+ struct ceph_journaler_object_pos *commit_pos;
+ uint64_t splay_offset;
+
+ /*
+  * Replay callback and its opaque argument; invoked for each replayed
+  * entry when handle_entry is not NULL. Not assigned anywhere in
+  * journaler.c as visible here -- presumably set by the journal user.
+  */
+ int (*handle_entry)(void *entry_handler, struct ceph_journaler_entry *entry);
+ void *entry_handler;
+
+ struct ceph_osd_linger_request *watch_handle; /* watch on the header object */
+};
+
+/*
+ * Allocate a journaler for the journal @journal_id whose header object is
+ * located by @oloc. Returns NULL on failure. Release the result with
+ * ceph_journaler_destroy().
+ */
+struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc,
+					     const char *journal_id,
+					     struct ceph_object_locator *oloc);
+
+/* Free a journaler obtained from ceph_journaler_create(). */
+void ceph_journaler_destroy(struct ceph_journaler *journaler);
+
+/*
+ * Watch the journal header and load the journal metadata.
+ * Returns 0 on success or a negative error code.
+ */
+int ceph_journaler_open(struct ceph_journaler *journaler);
+
+/* Drop the header watch and free the state built up since open. */
+void ceph_journaler_close(struct ceph_journaler *journaler);
+
+/* Fetch and replay the objects named by the local client's commit positions. */
+void start_replay(struct ceph_journaler *journaler);
+
+/*
+ * Append @data_len bytes at @data to the journal under @tag_tid.
+ * On success returns 0 and stores a newly allocated future in @future.
+ */
+int ceph_journaler_append(struct ceph_journaler *journaler, uint64_t tag_tid,
+			  char *data, ssize_t data_len,
+			  struct ceph_journaler_future **future);
+
+/* Report that the entry identified by @commit_tid has been committed. */
+void ceph_journaler_client_committed(struct ceph_journaler *journaler,
+				     uint64_t commit_tid);
+
+/* NOTE(review): defined outside the part of journaler.c reviewed here. */
+int ceph_journaler_allocate_tag(struct ceph_journaler *journaler,
+				uint64_t tag_class, void *buf,
+				uint32_t buf_len,
+				struct ceph_journaler_tag *tag);
+
+/* NOTE(review): defined outside the part of journaler.c reviewed here. */
+int ceph_journaler_get_cached_client(struct ceph_journaler *journaler,
+				     char *client_id, uint32_t id_len,
+				     struct ceph_journaler_client *client_result);
+#endif
@@ -14,5 +14,6 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
crypto.o armor.o \
auth_x.o \
ceph_fs.o ceph_strings.o ceph_hash.o \
- pagevec.o snapshot.o string_table.o
+ pagevec.o snapshot.o string_table.o \
+ journaler.o cls_journaler_client.o
new file mode 100644
@@ -0,0 +1,1208 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/ceph/ceph_debug.h>
+
+#include <linux/crc32c.h>
+#include <linux/module.h>
+#include <linux/err.h>
+#include <linux/highmem.h>
+#include <linux/mm.h>
+#include <linux/pagemap.h>
+#include <linux/slab.h>
+#include <linux/uaccess.h>
+#ifdef CONFIG_BLOCK
+#include <linux/bio.h>
+#endif
+
+#include <linux/ceph/ceph_features.h>
+#include <linux/ceph/libceph.h>
+#include <linux/ceph/osd_client.h>
+#include <linux/ceph/journaler.h>
+#include <linux/ceph/cls_journaler_client.h>
+#include <linux/ceph/messenger.h>
+#include <linux/ceph/decode.h>
+#include <linux/ceph/auth.h>
+#include <linux/ceph/pagelist.h>
+#include <linux/ceph/striper.h>
+
+#define LOCAL_MIRROR_UUID ""
+
+/*
+ * Build the name prefix shared by all data objects of a journal:
+ * JOURNAL_OBJECT_PREFIX "<pool_id>.<journal_id>.".
+ *
+ * @pool_id is 64 bits wide, like ceph_journaler::pool_id, so that a large
+ * pool id is not truncated when the name is formatted.
+ *
+ * Returns a kmalloc'ed string the caller must kfree(), or NULL if the
+ * allocation fails.
+ */
+static char *object_oid_prefix(int64_t pool_id, const char *journal_id)
+{
+	return kasprintf(GFP_KERNEL, "%s%lld.%s.", JOURNAL_OBJECT_PREFIX,
+			 (long long)pool_id, journal_id);
+}
+
+static void notify_update(struct ceph_journaler *journaler);
+
+/* Work handler for notify_update_work: notify on behalf of the owning journaler. */
+static void journaler_notify_update(struct work_struct *work)
+{
+	notify_update(container_of(work, struct ceph_journaler,
+				   notify_update_work));
+}
+
+/*
+ * Duplicate @pos into a freshly allocated object position with an
+ * initialised (unlinked) list node and return it through @new_pos.
+ *
+ * Returns 0 on success or -ENOMEM; @new_pos is untouched on failure.
+ */
+static int copy_object_pos(struct ceph_journaler_object_pos *pos, struct ceph_journaler_object_pos **new_pos)
+{
+	struct ceph_journaler_object_pos *dup = kzalloc(sizeof(*dup), GFP_KERNEL);
+
+	if (!dup)
+		return -ENOMEM;
+
+	dup->entry_tid = pos->entry_tid;
+	dup->tag_tid = pos->tag_tid;
+	dup->object_num = pos->object_num;
+	INIT_LIST_HEAD(&dup->node);
+
+	*new_pos = dup;
+	return 0;
+}
+
+/*
+ * Work handler for commit_work: snapshot the pending object positions and
+ * store them in the journal header as the local client's commit position.
+ * A notify is queued afterwards, unless the update failed.
+ */
+static void journaler_client_commit(struct work_struct *work)
+{
+	struct ceph_journaler *journaler = container_of(work, struct ceph_journaler,
+							commit_work);
+	struct ceph_journaler_object_pos *pos, *next;
+	struct list_head object_positions;
+	int ret = 0;
+
+	INIT_LIST_HEAD(&object_positions);
+
+	spin_lock(&journaler->commit_lock);
+	list_for_each_entry(pos, &journaler->object_positions_pending, node) {
+		struct ceph_journaler_object_pos *new_pos;
+
+		/* We hold a spinlock here, so the allocation must not sleep. */
+		new_pos = kzalloc(sizeof(*new_pos), GFP_ATOMIC);
+		if (!new_pos) {
+			ret = -ENOMEM;
+			break;
+		}
+		new_pos->object_num = pos->object_num;
+		new_pos->tag_tid = pos->tag_tid;
+		new_pos->entry_tid = pos->entry_tid;
+		list_add_tail(&new_pos->node, &object_positions);
+	}
+	spin_unlock(&journaler->commit_lock);
+
+	if (ret) {
+		pr_err("%s: failed to copy object positions: %d\n", __func__, ret);
+		goto out_free;
+	}
+
+	ret = ceph_cls_journaler_client_committed(journaler->osdc, &journaler->header_oid, &journaler->header_oloc, journaler->client, &object_positions);
+	if (ret < 0)
+		pr_err("%s: failed to update commit position: %d\n", __func__, ret);
+
+out_free:
+	list_for_each_entry_safe(pos, next, &object_positions, node) {
+		list_del(&pos->node);
+		kfree(pos);
+	}
+
+	/* Nothing changed in the header if the update failed; skip the notify. */
+	if (ret >= 0)
+		queue_work(journaler->task_wq, &journaler->notify_update_work);
+}
+
+
+/*
+ * Allocate and initialise a journaler for journal @journal_id.
+ *
+ * @osdc: OSD client used for all requests issued by the journaler
+ * @journal_id: journal id; the header object is JOURNAL_HEADER_PREFIX + id
+ * @oloc: locator of the header object (copied)
+ *
+ * Returns the new journaler, or NULL if any allocation fails.
+ */
+struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc,
+					     const char *journal_id,
+					     struct ceph_object_locator *oloc)
+{
+	struct ceph_journaler *journaler;
+	int ret;
+
+	/* kzalloc() leaves every field not set below zeroed / NULL. */
+	journaler = kzalloc(sizeof(*journaler), GFP_KERNEL);
+	if (!journaler)
+		return NULL;
+
+	ceph_oid_init(&journaler->header_oid);
+	ret = ceph_oid_aprintf(&journaler->header_oid, GFP_KERNEL, "%s%s",
+			       JOURNAL_HEADER_PREFIX, journal_id);
+	if (ret) {
+		pr_err("aprintf error : %d", ret);
+		goto err_free_journaler;
+	}
+
+	journaler->osdc = osdc;
+	ceph_oloc_init(&journaler->header_oloc);
+	ceph_oloc_copy(&journaler->header_oloc, oloc);
+	ceph_oloc_init(&journaler->data_oloc);
+	INIT_LIST_HEAD(&journaler->clients);
+	INIT_LIST_HEAD(&journaler->entry_tids);
+	INIT_LIST_HEAD(&journaler->object_positions_pending);
+	journaler->commit_entries = RB_ROOT;
+
+	/* Every data object name is built from this prefix; it must exist. */
+	journaler->object_oid_prefix = object_oid_prefix(journaler->header_oloc.pool, journal_id);
+	if (!journaler->object_oid_prefix)
+		goto err_destroy_ids;
+
+	spin_lock_init(&journaler->meta_lock);
+	spin_lock_init(&journaler->entry_tid_lock);
+	spin_lock_init(&journaler->commit_lock);
+	mutex_init(&journaler->mutex);
+	INIT_WORK(&journaler->notify_update_work, journaler_notify_update);
+	INIT_WORK(&journaler->commit_work, journaler_client_commit);
+
+	journaler->task_wq = alloc_ordered_workqueue("journaler-tasks", WQ_MEM_RECLAIM);
+	if (!journaler->task_wq)
+		goto err_free_oid_prefix;
+
+	return journaler;
+
+err_free_oid_prefix:
+	kfree(journaler->object_oid_prefix);
+err_destroy_ids:
+	ceph_oid_destroy(&journaler->header_oid);
+	ceph_oloc_destroy(&journaler->header_oloc);
+	ceph_oloc_destroy(&journaler->data_oloc);
+err_free_journaler:
+	kfree(journaler);
+	return NULL;
+}
+EXPORT_SYMBOL(ceph_journaler_create);
+
+/*
+ * Release everything ceph_journaler_create() set up, then the journaler
+ * itself.
+ */
+void ceph_journaler_destroy(struct ceph_journaler *journaler)
+{
+	/* Tear the workqueue down before releasing the state below. */
+	destroy_workqueue(journaler->task_wq);
+
+	ceph_oloc_destroy(&journaler->data_oloc);
+	ceph_oloc_destroy(&journaler->header_oloc);
+	ceph_oid_destroy(&journaler->header_oid);
+	kfree(journaler->object_oid_prefix);
+
+	kfree(journaler);
+}
+EXPORT_SYMBOL(ceph_journaler_destroy);
+
+static void notify_update(struct ceph_journaler* journaler)
+{
+ int ret;
+
+ ret = ceph_osdc_notify(journaler->osdc, &journaler->header_oid,
+ &journaler->header_oloc, NULL, 0,
+ 5000, NULL, NULL);
+
+ if (ret)
+ pr_err("notify_update failed: %d", ret);
+}
+
+/*
+ * Store @minimum_set in the journal header and, on success, queue a notify
+ * so that watchers pick the change up.
+ *
+ * Returns the result of the header update (negative on failure).
+ */
+static int set_minimum_set(struct ceph_journaler *journaler, uint64_t minimum_set)
+{
+	int err = ceph_cls_journaler_set_minimum_set(journaler->osdc, &journaler->header_oid, &journaler->header_oloc, minimum_set);
+
+	if (err >= 0) {
+		queue_work(journaler->task_wq, &journaler->notify_update_work);
+		return err;
+	}
+
+	pr_err("%s: failed to set_minimum_set: %d", __func__, err);
+	return err;
+}
+
+static int ceph_journaler_obj_remove_sync(struct ceph_journaler *journaler,
+					  struct ceph_object_id *oid,
+					  struct ceph_object_locator *oloc);
+
+/*
+ * Remove every data object of object set @object_set, i.e. object numbers
+ * object_set * splay_width .. object_set * splay_width + splay_width - 1.
+ *
+ * Stops at the first failure and returns its error code; returns the result
+ * of the last removal otherwise (0 if splay_width is 0).
+ */
+static int remove_set(struct ceph_journaler *journaler, uint64_t object_set)
+{
+	unsigned int splay_offset;
+	int ret = 0;
+
+	for (splay_offset = 0; splay_offset < journaler->splay_width; splay_offset++) {
+		uint64_t object_num = splay_offset + (object_set * journaler->splay_width);
+		struct ceph_object_id object_oid;
+
+		ceph_oid_init(&object_oid);
+		ret = ceph_oid_aprintf(&object_oid, GFP_KERNEL, "%s%llu",
+				       journaler->object_oid_prefix, object_num);
+		if (ret) {
+			pr_err("aprintf error : %d", ret);
+			ceph_oid_destroy(&object_oid);
+			return ret;
+		}
+
+		ret = ceph_journaler_obj_remove_sync(journaler, &object_oid, &journaler->data_oloc);
+		/* The name is per-iteration; release it on every path. */
+		ceph_oid_destroy(&object_oid);
+		if (ret < 0) {
+			pr_err("%s: failed to remove object: %llu", __func__, object_num);
+			return ret;
+		}
+	}
+
+	return ret;
+}
+
+static void destroy_client(struct ceph_journaler_client *client);
+
+/*
+ * Re-read the mutable journal metadata (minimum/active set and registered
+ * clients) from the header object, then trim object sets that every client
+ * has committed past.
+ *
+ * Both callers in this file hold journaler->mutex.
+ *
+ * Returns 0 on success or a negative error code.
+ */
+static int refresh(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_client *clients = NULL;
+	struct ceph_journaler_client *client, *next;
+	uint64_t minimum_commit_set;
+	uint64_t minimum_set;
+	uint64_t trim_set;
+	uint32_t client_num = 0;
+	uint32_t i;
+	int ret;
+
+	ret = ceph_cls_journaler_get_mutable_metas(journaler->osdc, &journaler->header_oid, &journaler->header_oloc, &journaler->minimum_set, &journaler->active_set);
+	if (ret)
+		return ret;
+
+	ret = ceph_cls_journaler_client_list(journaler->osdc, &journaler->header_oid, &journaler->header_oloc, &clients, &client_num);
+	if (ret)
+		return ret;
+
+	/*
+	 * The old client objects are about to be freed; do not leave
+	 * journaler->client pointing at one of them if the local client is
+	 * missing from the new list.
+	 */
+	journaler->client = NULL;
+	list_for_each_entry_safe(client, next, &journaler->clients, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+	kfree(journaler->clients_array);
+	journaler->clients_array = clients;
+
+	for (i = 0; i < client_num; i++) {
+		client = &clients[i];
+
+		list_add_tail(&client->node, &journaler->clients);
+		if (!memcmp(client->id, LOCAL_MIRROR_UUID, sizeof(LOCAL_MIRROR_UUID)))
+			journaler->client = client;
+	}
+
+	/*
+	 * The header watch is registered before splay_width is read in
+	 * ceph_journaler_open(), so a notify can get here early. Without
+	 * splay_width the object sets cannot be computed; skip trimming.
+	 */
+	if (!journaler->splay_width)
+		return 0;
+
+	/*
+	 * The set that may be trimmed up to is the smallest object set any
+	 * client still has a commit position in, capped by the active set.
+	 */
+	minimum_commit_set = journaler->active_set;
+	list_for_each_entry(client, &journaler->clients, node) {
+		//TODO check the state of client
+		struct ceph_journaler_object_pos *pos;
+
+		list_for_each_entry(pos, &client->object_positions, node) {
+			uint64_t object_set = pos->object_num / journaler->splay_width;
+
+			/* Track the running minimum, not the stored minimum_set. */
+			if (object_set < minimum_commit_set)
+				minimum_commit_set = object_set;
+		}
+	}
+
+	minimum_set = journaler->minimum_set;
+	pr_debug("minimum_commit_set: %llu, minimum_set: %llu", minimum_commit_set, minimum_set);
+	if (minimum_commit_set <= minimum_set)
+		return 0;
+
+	for (trim_set = minimum_set; trim_set < minimum_commit_set; trim_set++) {
+		ret = remove_set(journaler, trim_set);
+		if (ret < 0) {
+			pr_err("failed to trim object_set: %llu", trim_set);
+			return ret;
+		}
+	}
+
+	ret = set_minimum_set(journaler, minimum_commit_set);
+	if (ret < 0) {
+		pr_err("failed to set minimum set to %llu", minimum_commit_set);
+		return ret;
+	}
+
+	return 0;
+}
+
+/*
+ * Watch callback for the journal header object: acknowledge the notify,
+ * then re-read the journal metadata under journaler->mutex. Failures are
+ * only logged.
+ */
+static void journaler_watch_cb(void *arg, u64 notify_id, u64 cookie,
+			       u64 notifier_id, void *data, size_t data_len)
+{
+	struct ceph_journaler *jrnl = arg;
+	int err;
+
+	err = ceph_osdc_notify_ack(jrnl->osdc, &jrnl->header_oid,
+				   &jrnl->header_oloc, notify_id, cookie,
+				   NULL, 0);
+	if (err != 0)
+		pr_err("acknowledge_notify failed: %d", err);
+
+	mutex_lock(&jrnl->mutex);
+	err = refresh(jrnl);
+	mutex_unlock(&jrnl->mutex);
+
+	if (err >= 0)
+		return;
+
+	pr_err("%s: failed to refresh journaler: %d", __func__, err);
+}
+
+static void journaler_watch_errcb(void *arg, u64 cookie, int err)
+{
+ pr_err("journaler watch error: %d", err);
+}
+
+/*
+ * Register a watch on the journal header object and remember its handle in
+ * journaler->watch_handle.
+ *
+ * Returns 0 on success or the error encoded in the returned handle.
+ */
+static int journaler_watch(struct ceph_journaler *journaler)
+{
+	struct ceph_osd_linger_request *lreq;
+
+	lreq = ceph_osdc_watch(journaler->osdc, &journaler->header_oid,
+			       &journaler->header_oloc, journaler_watch_cb,
+			       journaler_watch_errcb, journaler);
+	if (!IS_ERR(lreq)) {
+		journaler->watch_handle = lreq;
+		return 0;
+	}
+
+	return PTR_ERR(lreq);
+}
+
+/*
+ * Tear down the header watch. A failure is logged; the handle is cleared
+ * either way.
+ */
+static void journaler_unwatch(struct ceph_journaler *journaler)
+{
+	int err = ceph_osdc_unwatch(journaler->osdc, journaler->watch_handle);
+
+	if (err != 0)
+		pr_err("%s: failed to unwatch: %d", __func__, err);
+
+	journaler->watch_handle = NULL;
+}
+
+/*
+ * Delete the object @oid / @oloc and wait for the request to complete.
+ *
+ * Returns -ENOMEM if the request cannot be allocated, otherwise the result
+ * of message allocation or of the completed request.
+ */
+static int ceph_journaler_obj_remove_sync(struct ceph_journaler *journaler,
+					  struct ceph_object_id *oid,
+					  struct ceph_object_locator *oloc)
+{
+	struct ceph_osd_client *client = journaler->osdc;
+	struct ceph_osd_request *request;
+	int err;
+
+	request = ceph_osdc_alloc_request(client, NULL, 1, false, GFP_KERNEL);
+	if (request == NULL)
+		return -ENOMEM;
+
+	ceph_oid_copy(&request->r_base_oid, oid);
+	ceph_oloc_copy(&request->r_base_oloc, oloc);
+	request->r_flags = CEPH_OSD_FLAG_WRITE;
+
+	err = ceph_osdc_alloc_messages(request, GFP_KERNEL);
+	if (!err) {
+		osd_req_op_init(request, 0, CEPH_OSD_OP_DELETE, 0);
+
+		ceph_osdc_start_request(client, request, false);
+		err = ceph_osdc_wait_request(client, request);
+	}
+
+	ceph_osdc_put_request(request);
+	return err;
+}
+
+//TODO make it async
+/*
+ * Append @buf_len bytes from @buf to the object @oid / @oloc and wait for
+ * the request to complete. The data is copied into a page vector that is
+ * handed over to the request.
+ *
+ * Returns a negative error code if the request, its messages or the page
+ * vector cannot be allocated, otherwise the result of the completed request.
+ */
+static int ceph_journaler_obj_write_sync(struct ceph_journaler *journaler,
+					 struct ceph_object_id *oid,
+					 struct ceph_object_locator *oloc,
+					 void *buf, int buf_len)
+{
+	struct ceph_osd_client *client = journaler->osdc;
+	int page_count = calc_pages_for(0, buf_len);
+	struct ceph_osd_request *request;
+	struct page **page_vec;
+	int err;
+
+	request = ceph_osdc_alloc_request(client, NULL, 1, false, GFP_KERNEL);
+	if (request == NULL)
+		return -ENOMEM;
+
+	ceph_oid_copy(&request->r_base_oid, oid);
+	ceph_oloc_copy(&request->r_base_oloc, oloc);
+	request->r_flags = CEPH_OSD_FLAG_WRITE;
+
+	err = ceph_osdc_alloc_messages(request, GFP_KERNEL);
+	if (err)
+		goto put_request;
+
+	page_vec = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+	if (IS_ERR(page_vec)) {
+		err = PTR_ERR(page_vec);
+		goto put_request;
+	}
+	ceph_copy_to_page_vector(page_vec, buf, 0, buf_len);
+
+	osd_req_op_extent_init(request, 0, CEPH_OSD_OP_APPEND, 0, buf_len, 0, 0);
+	osd_req_op_extent_osd_data_pages(request, 0, page_vec, buf_len, 0,
+					 false, true);
+
+	ceph_osdc_start_request(client, request, false);
+	err = ceph_osdc_wait_request(client, request);
+
+put_request:
+	ceph_osdc_put_request(request);
+	return err;
+}
+
+/*
+ * Read up to @buf_len bytes at offset @read_off of the object @oid / @oloc
+ * into @buf and wait for the request to complete.
+ *
+ * When the request result is non-negative, that many bytes are copied into
+ * @buf and the result is returned; a negative value is an error code.
+ */
+static int ceph_journaler_obj_read_sync(struct ceph_journaler *journaler,
+					struct ceph_object_id *oid,
+					struct ceph_object_locator *oloc,
+					void *buf, uint32_t read_off, uint64_t buf_len)
+{
+	struct ceph_osd_client *client = journaler->osdc;
+	int page_count = calc_pages_for(0, buf_len);
+	struct ceph_osd_request *request;
+	struct page **page_vec;
+	int result;
+
+	request = ceph_osdc_alloc_request(client, NULL, 1, false, GFP_KERNEL);
+	if (request == NULL)
+		return -ENOMEM;
+
+	ceph_oid_copy(&request->r_base_oid, oid);
+	ceph_oloc_copy(&request->r_base_oloc, oloc);
+	request->r_flags = CEPH_OSD_FLAG_READ;
+
+	result = ceph_osdc_alloc_messages(request, GFP_KERNEL);
+	if (result)
+		goto put_request;
+
+	page_vec = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+	if (IS_ERR(page_vec)) {
+		result = PTR_ERR(page_vec);
+		goto put_request;
+	}
+
+	osd_req_op_extent_init(request, 0, CEPH_OSD_OP_READ, read_off, buf_len, 0, 0);
+	osd_req_op_extent_osd_data_pages(request, 0, page_vec, buf_len, 0,
+					 false, true);
+
+	ceph_osdc_start_request(client, request, false);
+	result = ceph_osdc_wait_request(client, request);
+	if (result < 0)
+		goto put_request;
+
+	ceph_copy_from_page_vector(page_vec, buf, 0, result);
+
+put_request:
+	ceph_osdc_put_request(request);
+	return result;
+}
+
+/*
+ * Check whether [@buf, @end) starts with one complete, intact journal entry.
+ *
+ * Returns true if a whole entry is present and its CRC matches. On false,
+ * *@bytes_needed tells the two failure kinds apart: non-zero means the
+ * current field is cut short by that many bytes, zero means the preamble
+ * or the CRC is wrong.
+ */
+static bool entry_is_readable(struct ceph_journaler *journaler, void *buf, void *end, uint32_t *bytes_needed)
+{
+	void *entry_start = buf;
+	void *cursor = buf;
+	uint32_t avail = end - cursor;
+	uint32_t payload_len;
+	uint32_t crc_computed;
+	uint32_t crc_stored;
+	uint64_t magic;
+
+	/* fixed header: preamble, version, entry tid, tag tid */
+	if (avail < HEADER_FIXED_SIZE) {
+		*bytes_needed = HEADER_FIXED_SIZE - avail;
+		return false;
+	}
+
+	magic = ceph_decode_64(&cursor);
+	if (magic != PREAMBLE) {
+		pr_err("preamble is not correct");
+		*bytes_needed = 0;
+		return false;
+	}
+
+	/* skip the rest of the fixed header; the payload length follows */
+	cursor += (HEADER_FIXED_SIZE - sizeof(magic));
+	avail = end - cursor;
+	if (avail < sizeof(uint32_t)) {
+		*bytes_needed = sizeof(uint32_t) - avail;
+		return false;
+	}
+
+	payload_len = ceph_decode_32(&cursor);
+	avail = end - cursor;
+	if (avail < payload_len) {
+		*bytes_needed = payload_len - avail;
+		return false;
+	}
+	cursor += payload_len;
+
+	/* trailing crc */
+	avail = end - cursor;
+	if (avail < sizeof(uint32_t)) {
+		*bytes_needed = sizeof(uint32_t) - avail;
+		return false;
+	}
+
+	*bytes_needed = 0;
+
+	/* the crc covers everything from the preamble up to the crc field */
+	crc_computed = crc32c(0, entry_start, cursor - entry_start);
+	crc_stored = ceph_decode_32(&cursor);
+
+	return crc_computed == crc_stored;
+}
+
+/*
+ * Hand @entry to the registered replay callback, if there is one. The
+ * callback's return value is not looked at.
+ */
+static void playback_entry(struct ceph_journaler *journaler, struct ceph_journaler_entry *entry)
+{
+	//TODO verify the entry, skip stale tag_tid or others.
+	if (!journaler->handle_entry)
+		return;
+
+	journaler->handle_entry(journaler->entry_handler, entry);
+}
+
+/*
+ * Record that @entry_tid is already in use for tag @tag_tid, so that
+ * allocate_entry_tid() never hands it out again.
+ *
+ * The per-tag counter holds the value allocate_entry_tid() returns next,
+ * so it has to end up at least one past @entry_tid.
+ */
+static void reserve_entry_tid(struct ceph_journaler *journaler, uint64_t tag_tid, uint64_t entry_tid)
+{
+	struct entry_tid *pos;
+
+	spin_lock(&journaler->entry_tid_lock);
+	list_for_each_entry(pos, &journaler->entry_tids, node) {
+		if (pos->tag_tid != tag_tid)
+			continue;
+
+		if (pos->entry_tid <= entry_tid)
+			pos->entry_tid = entry_tid + 1;
+		goto out_unlock;
+	}
+
+	/* We hold a spinlock here, so the allocation must not sleep. */
+	pos = kzalloc(sizeof(*pos), GFP_ATOMIC);
+	if (!pos) {
+		pr_err("%s: failed to record entry tid %llu of tag %llu\n",
+		       __func__, entry_tid, tag_tid);
+		goto out_unlock;
+	}
+
+	pos->tag_tid = tag_tid;
+	pos->entry_tid = entry_tid + 1;
+	INIT_LIST_HEAD(&pos->node);
+	list_add_tail(&pos->node, &journaler->entry_tids);
+
+out_unlock:
+	spin_unlock(&journaler->entry_tid_lock);
+}
+
+/* Generates insert_commit_entry() / erase_commit_entry(), keyed by commit_tid. */
+DEFINE_RB_INSDEL_FUNCS(commit_entry, struct commit_entry, commit_tid, r_node)
+
+/*
+ * Allocate a commit entry describing (@object_num, @tag_tid, @entry_tid)
+ * and insert it into journaler->commit_entries under @commit_tid.
+ *
+ * Returns 0 on success or -ENOMEM.
+ */
+static int add_commit_entry(struct ceph_journaler *journaler, uint64_t commit_tid, uint64_t object_num, uint64_t tag_tid, uint64_t entry_tid)
+{
+	struct commit_entry *ce = kzalloc(sizeof(*ce), GFP_KERNEL);
+
+	if (!ce)
+		return -ENOMEM;
+
+	RB_CLEAR_NODE(&ce->r_node);
+	ce->entry_tid = entry_tid;
+	ce->tag_tid = tag_tid;
+	ce->object_num = object_num;
+	ce->commit_tid = commit_tid;
+
+	spin_lock(&journaler->commit_lock);
+	insert_commit_entry(&journaler->commit_entries, ce);
+	spin_unlock(&journaler->commit_lock);
+
+	return 0;
+}
+
+/*
+ * Hand out the next commit tid by advancing journaler->commit_tid.
+ * @object_num, @tag_tid and @entry_tid are currently unused.
+ */
+static uint64_t allocate_commit_tid(struct ceph_journaler *journaler,
+				    uint64_t object_num, uint64_t tag_tid,
+				    uint64_t entry_tid)
+{
+	journaler->commit_tid += 1;
+
+	return journaler->commit_tid;
+}
+
+void ceph_journaler_client_committed(struct ceph_journaler *journaler, uint64_t commit_tid);
+
+/*
+ * Consume the decoded entries of one journal object.
+ *
+ * Entries up to and including the one at the commit @position were already
+ * committed: their tids are only reserved. Entries after it are replayed
+ * through playback_entry() and then reported as committed.
+ *
+ * Every entry is unlinked from @entry_list and freed, including the one
+ * that matches @position.
+ */
+static void process_entries(struct ceph_journaler *journaler, struct list_head *entry_list, struct ceph_journaler_object_pos *position)
+{
+	struct ceph_journaler_entry *entry, *next;
+	bool found_commit = false;
+	uint64_t commit_tid;
+	int ret;
+
+	list_for_each_entry_safe(entry, next, entry_list, node) {
+		bool replayed = false;
+
+		/*
+		 * Every entry found on disk occupies its tid, the one at the
+		 * commit position included.
+		 */
+		reserve_entry_tid(journaler, entry->tag_tid, entry->entry_tid);
+
+		if (entry->tag_tid == position->tag_tid &&
+		    entry->entry_tid == position->entry_tid) {
+			found_commit = true;
+		} else if (found_commit) {
+			commit_tid = allocate_commit_tid(journaler, position->object_num, entry->tag_tid, entry->entry_tid);
+			playback_entry(journaler, entry);
+			replayed = true;
+
+			ret = add_commit_entry(journaler, commit_tid, position->object_num, entry->tag_tid, entry->entry_tid);
+			if (ret)
+				pr_err("%s: failed to add commit entry %llu: %d\n",
+				       __func__, commit_tid, ret);
+			else
+				ceph_journaler_client_committed(journaler, commit_tid);
+		}
+
+		list_del(&entry->node);
+		/*
+		 * Entries that were never handed to the replay callback still
+		 * own their payload, so release it here.
+		 * NOTE(review): for replayed entries the payload is left
+		 * alone, as before, because the callback may have kept the
+		 * pointer -- confirm who owns entry->data after handle_entry().
+		 */
+		if (!replayed)
+			kfree(entry->data);
+		kfree(entry);
+	}
+}
+
+static struct ceph_journaler_entry *journaler_entry_decode(void **p, void *end);
+
+/*
+ * Read journal data object @object_num, decode all entries in it and pass
+ * them to process_entries() together with the local client's commit
+ * position for that object.
+ *
+ * Nothing is done if the local client has no commit position for
+ * @object_num. On any read or decode failure the entries decoded so far are
+ * discarded.
+ */
+static void fetch(struct ceph_journaler *journaler, uint64_t object_num)
+{
+	struct ceph_journaler_entry *entry, *next_entry;
+	struct ceph_journaler_object_pos *pos;
+	struct ceph_object_id object_oid;
+	struct list_head entry_list;
+	void *buf_p;	/* start of the current allocation */
+	void *buf;	/* decode cursor */
+	void *read_buf;	/* where the next read lands */
+	void *end;	/* end of valid data */
+	/* 64-bit shift: "2 << order" would be evaluated as int */
+	uint64_t read_len = 2ULL << journaler->order;
+	uint32_t read_off = 0;
+	bool position_found = false;
+	int ret;
+
+	if (!journaler->client)
+		return;
+
+	list_for_each_entry(pos, &journaler->client->object_positions, node) {
+		if (pos->object_num == object_num) {
+			position_found = true;
+			break;
+		}
+	}
+
+	if (!position_found)
+		return;
+
+	INIT_LIST_HEAD(&entry_list);
+	ceph_oid_init(&object_oid);
+	ret = ceph_oid_aprintf(&object_oid, GFP_KERNEL, "%s%llu",
+			       journaler->object_oid_prefix, object_num);
+	if (ret) {
+		pr_err("aprintf error : %d", ret);
+		return;
+	}
+
+	buf_p = vmalloc(read_len);
+	if (!buf_p) {
+		pr_err("failed to vmalloc buf: %llu", read_len);
+		goto out_free_oid;
+	}
+	buf = buf_p;
+	read_buf = buf_p;
+
+refetch:
+	ret = ceph_journaler_obj_read_sync(journaler, &object_oid, &journaler->data_oloc, read_buf, read_off, read_len);
+	if (ret == -ENOENT) {
+		pr_err("no such object: %d", ret);
+		goto out_free_entries;
+	} else if (ret < 0) {
+		pr_err("failed to read: %d", ret);
+		goto out_free_entries;
+	} else if (ret == 0) {
+		pr_err("no data: %d", ret);
+		goto out_free_entries;
+	}
+	read_off = read_off + ret;
+
+	end = read_buf + ret;
+	while (buf < end) {
+		uint32_t bytes_needed = 0;
+
+		if (!entry_is_readable(journaler, buf, end, &bytes_needed)) {
+			uint64_t remain = end - buf;
+			void *new_buf;
+
+			if (bytes_needed == 0) {
+				pr_err("entry corruption");
+				goto out_free_entries;
+			}
+
+			/*
+			 * The entry is cut short: move the partial entry to
+			 * the front of a new buffer and read more data right
+			 * behind it.
+			 */
+			new_buf = vmalloc(read_len + remain);
+			if (!new_buf) {
+				pr_err("failed to alloc new buf");
+				goto out_free_entries;
+			}
+			memcpy(new_buf, buf, remain);
+			vfree(buf_p);
+			buf_p = new_buf;
+			buf = new_buf;
+			read_buf = buf + remain;
+			goto refetch;
+		}
+
+		entry = journaler_entry_decode(&buf, end);
+		if (!entry)
+			goto out_free_entries;
+
+		list_add_tail(&entry->node, &entry_list);
+	}
+
+	process_entries(journaler, &entry_list, pos);
+
+out_free_entries:
+	/*
+	 * Whatever is still on the list was not consumed (error paths, or
+	 * entries process_entries() left behind); free it here.
+	 */
+	list_for_each_entry_safe(entry, next_entry, &entry_list, node) {
+		list_del(&entry->node);
+		kfree(entry->data);
+		kfree(entry);
+	}
+	vfree(buf_p);
+out_free_oid:
+	ceph_oid_destroy(&object_oid);
+}
+
+/*
+ * Replay the journal: initialise the replay state from the local client's
+ * first commit position, then fetch every object the client has a commit
+ * position in. Runs under journaler->mutex.
+ *
+ * Nothing is replayed if there is no local client, it has no commit
+ * positions, or the list of objects cannot be allocated.
+ */
+void start_replay(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_object_pos *active_pos;
+	struct ceph_journaler_client *client;
+	uint64_t *fetch_objects;
+	unsigned int count = 0;
+	unsigned int i;
+
+	mutex_lock(&journaler->mutex);
+	client = journaler->client;
+	if (!client || list_empty(&client->object_positions) ||
+	    !journaler->splay_width)
+		goto out_unlock;
+
+	fetch_objects = kcalloc(journaler->splay_width, sizeof(*fetch_objects), GFP_KERNEL);
+	if (!fetch_objects) {
+		pr_err("%s: failed to allocate fetch list\n", __func__);
+		goto out_unlock;
+	}
+
+	active_pos = list_first_entry(&client->object_positions, struct ceph_journaler_object_pos, node);
+
+	journaler->active_tag_tid = active_pos->tag_tid;
+	journaler->commit_pos_valid = true;
+	journaler->commit_pos = active_pos;
+	journaler->splay_offset = active_pos->object_num % journaler->splay_width;
+
+	/*
+	 * Collect the object numbers first; the list itself is not walked
+	 * while fetching.
+	 */
+	list_for_each_entry(active_pos, &client->object_positions, node) {
+		/* the array has room for splay_width objects only */
+		if (count == journaler->splay_width) {
+			pr_err("%s: more than %u object positions\n",
+			       __func__, count);
+			break;
+		}
+		fetch_objects[count++] = active_pos->object_num;
+	}
+
+	for (i = 0; i < count; i++)
+		fetch(journaler, fetch_objects[i]);
+
+	kfree(fetch_objects);
+out_unlock:
+	mutex_unlock(&journaler->mutex);
+}
+EXPORT_SYMBOL(start_replay);
+
+/*
+ * Open the journal: watch the header object, read the immutable metadata
+ * (order, splay width, data pool), refresh the mutable metadata, and seed
+ * object_positions_pending with copies of the local client's commit
+ * positions.
+ *
+ * Returns 0 on success or a negative error code. On failure the watch is
+ * dropped and everything loaded so far is freed again.
+ */
+int ceph_journaler_open(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_object_pos *pos, *next_pos;
+	struct ceph_journaler_client *client, *next_client;
+	int ret;
+
+	ret = journaler_watch(journaler);
+	if (ret) {
+		pr_err("journaler_watch error: %d", ret);
+		return ret;
+	}
+
+	ret = ceph_cls_journaler_get_immutable_metas(journaler->osdc,
+						     &journaler->header_oid,
+						     &journaler->header_oloc,
+						     &journaler->order,
+						     &journaler->splay_width,
+						     &journaler->pool_id);
+	if (ret) {
+		pr_err("failed to get immutable metas.");
+		goto err_unwatch;
+	}
+
+	/* splay_width is used as a divisor throughout; reject 0 up front. */
+	if (!journaler->splay_width) {
+		pr_err("%s: invalid splay width 0\n", __func__);
+		ret = -EINVAL;
+		goto err_unwatch;
+	}
+
+	/* A pool id of -1 means the data objects share the header's pool. */
+	if (journaler->pool_id == -1) {
+		ceph_oloc_copy(&journaler->data_oloc, &journaler->header_oloc);
+		journaler->pool_id = journaler->data_oloc.pool;
+	} else {
+		journaler->data_oloc.pool = journaler->pool_id;
+	}
+
+	mutex_lock(&journaler->mutex);
+	ret = refresh(journaler);
+	mutex_unlock(&journaler->mutex);
+	if (ret)
+		goto err_unwatch;
+
+	/* refresh() only sets ->client if the local client is registered. */
+	if (!journaler->client) {
+		pr_err("%s: local client is not registered\n", __func__);
+		ret = -ENOENT;
+		goto err_unwatch;
+	}
+
+	list_for_each_entry(pos, &journaler->client->object_positions, node) {
+		struct ceph_journaler_object_pos *new_pos = NULL;
+
+		ret = copy_object_pos(pos, &new_pos);
+		if (ret)
+			goto err_unwatch;
+
+		list_add_tail(&new_pos->node, &journaler->object_positions_pending);
+	}
+
+	return 0;
+
+err_unwatch:
+	/* Stop watch callbacks first; they refresh the client list too. */
+	journaler_unwatch(journaler);
+
+	list_for_each_entry_safe(pos, next_pos, &journaler->object_positions_pending, node) {
+		list_del(&pos->node);
+		kfree(pos);
+	}
+
+	mutex_lock(&journaler->mutex);
+	journaler->client = NULL;
+	list_for_each_entry_safe(client, next_client, &journaler->clients, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+	kfree(journaler->clients_array);
+	journaler->clients_array = NULL;
+	mutex_unlock(&journaler->mutex);
+
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_open);
+
+/*
+ * Free what @client owns: its object positions, id and data. The client
+ * structure itself is not freed here.
+ */
+static void destroy_client(struct ceph_journaler_client *client)
+{
+	struct ceph_journaler_object_pos *cur, *tmp;
+
+	list_for_each_entry_safe(cur, tmp, &client->object_positions, node) {
+		list_del(&cur->node);
+		kfree(cur);
+	}
+
+	kfree(client->id);
+	kfree(client->data);
+}
+
+/*
+ * Close the journal: drop the header watch, wait for queued work, and free
+ * the client list, the pending commit positions, the commit entries and
+ * the per-tag entry tid counters.
+ *
+ * The journaler itself stays allocated; see ceph_journaler_destroy().
+ */
+void ceph_journaler_close(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_client *client, *next_client;
+	struct ceph_journaler_object_pos *pos, *next_pos;
+	struct entry_tid *entry_tid, *entry_tid_next;
+	struct commit_entry *entry;
+	struct rb_node *n;
+
+	journaler_unwatch(journaler);
+
+	/*
+	 * commit_work reads journaler->client and the pending positions;
+	 * make sure it is not running or queued while they are freed below.
+	 */
+	flush_workqueue(journaler->task_wq);
+
+	/* These point into the client objects that are about to go away. */
+	journaler->client = NULL;
+	journaler->commit_pos = NULL;
+	journaler->commit_pos_valid = false;
+
+	list_for_each_entry_safe(client, next_client, &journaler->clients, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+	kfree(journaler->clients_array);
+	journaler->clients_array = NULL;
+
+	/* Copies made by ceph_journaler_open(). */
+	list_for_each_entry_safe(pos, next_pos, &journaler->object_positions_pending, node) {
+		list_del(&pos->node);
+		kfree(pos);
+	}
+
+	for (n = rb_first(&journaler->commit_entries); n;) {
+		entry = rb_entry(n, struct commit_entry, r_node);
+
+		/* step forward before the node is erased */
+		n = rb_next(n);
+		erase_commit_entry(&journaler->commit_entries, entry);
+		kfree(entry);
+	}
+
+	list_for_each_entry_safe(entry_tid, entry_tid_next, &journaler->entry_tids, node) {
+		list_del(&entry_tid->node);
+		kfree(entry_tid);
+	}
+}
+EXPORT_SYMBOL(ceph_journaler_close);
+
+/*
+ * Decode one journal entry starting at *@p, never reading past @end.
+ *
+ * Layout: preamble(8) version(1) entry_tid(8) tag_tid(8) data_len(4) data
+ * crc(4); the crc covers everything before it.
+ *
+ * Returns a newly allocated entry (free entry->data and the entry with
+ * kfree()) and leaves *@p behind the entry, or NULL if the entry is
+ * truncated, malformed, fails the crc check, or memory runs out. *@p may
+ * have been advanced part-way on failure.
+ */
+static struct ceph_journaler_entry *journaler_entry_decode(void **p, void *end)
+{
+	struct ceph_journaler_entry *entry;
+	uint32_t crc, crc_encoded;
+	size_t data_len = 0;
+	void *start = *p;
+	char *data;
+
+	if (*p > end || (size_t)(end - *p) < HEADER_FIXED_SIZE)
+		return NULL;
+
+	if (ceph_decode_64(p) != PREAMBLE)
+		return NULL;
+
+	/* only entry format version 1 is understood */
+	if (ceph_decode_8(p) != 1)
+		return NULL;
+
+	entry = kzalloc(sizeof(*entry), GFP_KERNEL);
+	if (!entry)
+		return NULL;
+
+	INIT_LIST_HEAD(&entry->node);
+	entry->entry_tid = ceph_decode_64(p);
+	entry->tag_tid = ceph_decode_64(p);
+
+	/*
+	 * ceph_extract_encoded_string() reports failure with an ERR_PTR,
+	 * never NULL, and stores the length through a size_t pointer.
+	 */
+	data = ceph_extract_encoded_string(p, end, &data_len, GFP_NOIO);
+	if (IS_ERR(data))
+		goto error;
+	entry->data = data;
+	entry->data_len = data_len;
+
+	if ((size_t)(end - *p) < sizeof(crc_encoded))
+		goto free_data;
+
+	crc = crc32c(0, start, *p - start);
+	crc_encoded = ceph_decode_32(p);
+	if (crc != crc_encoded)
+		goto free_data;
+
+	return entry;
+
+free_data:
+	kfree(entry->data);
+error:
+	kfree(entry);
+	return NULL;
+}
+
+/*
+ * Encode @entry at *@p and advance *@p past it.
+ *
+ * Layout: preamble(8) version(1) entry_tid(8) tag_tid(8) data_len(4) data
+ * crc(4); the crc covers everything written before it.
+ */
+static void journaler_entry_encode(struct ceph_journaler_entry *entry, void **p, void *end)
+{
+	void *entry_start = *p;
+	uint32_t checksum;
+
+	ceph_encode_64(p, PREAMBLE);
+	ceph_encode_8(p, (uint8_t)1);
+	ceph_encode_64(p, entry->entry_tid);
+	ceph_encode_64(p, entry->tag_tid);
+	ceph_encode_string(p, end, entry->data, entry->data_len);
+
+	checksum = crc32c(0, entry_start, *p - entry_start);
+	ceph_encode_32(p, checksum);
+}
+
+/*
+ * Size in bytes of the encoded form of @entry: the payload plus the fixed
+ * fields around it.
+ */
+static ssize_t ceph_entry_buf_size(struct ceph_journaler_entry *entry)
+{
+	/* preamble(8) + version(1) + entry_tid(8) + tag_tid(8) + string_len(4) + crc(4) */
+	const ssize_t fixed_overhead = 8 + 1 + 8 + 8 + 4 + 4;
+
+	return entry->data_len + fixed_overhead;
+}
+
+/* Number of the data object at @splay_offset within the active object set. */
+static uint64_t get_object(struct ceph_journaler *journaler, uint64_t splay_offset)
+{
+	uint64_t first_in_set = journaler->splay_width * journaler->active_set;
+
+	return splay_offset + first_in_set;
+}
+
+/*
+ * Move on to the next object set: bump journaler->active_set and store the
+ * new value in the journal header. A failed header update is logged; the
+ * local active set stays advanced either way.
+ */
+static void advance_object_set(struct ceph_journaler *journaler)
+{
+	int ret;
+
+	journaler->active_set++;
+
+	ret = ceph_cls_journaler_set_active_set(journaler->osdc, &journaler->header_oid, &journaler->header_oloc, journaler->active_set);
+	if (ret < 0)
+		pr_err("%s: failed to set active set to %llu: %d\n", __func__,
+		       journaler->active_set, ret);
+}
+
+/*
+ * Encode @entry and append it to data object @object_num.
+ *
+ * Before writing, the object is checked against its soft size limit of
+ * 1 << order bytes. If it is full (-EOVERFLOW), the active object set is
+ * advanced and the object at the same splay offset in the new set is tried
+ * instead.
+ *
+ * @future is currently unused.
+ *
+ * Returns the result of the write, or a negative error code if encoding
+ * could not be set up or the size check failed.
+ */
+static int ceph_journaler_object_append(struct ceph_journaler *journaler, uint64_t object_num,
+					struct ceph_journaler_future *future,
+					struct ceph_journaler_entry *entry)
+{
+	struct ceph_object_id object_oid;
+	void *start_buf;
+	void *buf;
+	void *end;
+	ssize_t buf_len;
+	int ret;
+
+	buf_len = ceph_entry_buf_size(entry);
+	start_buf = vmalloc(buf_len);
+	if (!start_buf)
+		return -ENOMEM;
+
+	buf = start_buf;
+	end = start_buf + buf_len;
+	journaler_entry_encode(entry, &buf, end);
+
+	ceph_oid_init(&object_oid);
+	for (;;) {
+		ret = ceph_oid_aprintf(&object_oid, GFP_KERNEL, "%s%llu",
+				       journaler->object_oid_prefix, object_num);
+		if (ret) {
+			pr_err("aprintf error : %d", ret);
+			goto out;
+		}
+
+		//TODO send "guard append" and "write" in a single request
+		/* The object lives in the data pool, so guard it there too. */
+		ret = ceph_cls_journaler_guard_append(journaler->osdc, &object_oid, &journaler->data_oloc, 1ULL << journaler->order);
+		if (ret != -EOVERFLOW)
+			break;
+
+		pr_debug("overflow: %llu", journaler->active_set);
+		advance_object_set(journaler);
+		object_num = get_object(journaler, entry->entry_tid % journaler->splay_width);
+
+		/* start over with the name of the new object */
+		ceph_oid_destroy(&object_oid);
+		ceph_oid_init(&object_oid);
+	}
+
+	if (ret < 0) {
+		pr_err("%s: guard append failed: %d\n", __func__, ret);
+		goto out;
+	}
+
+	ret = ceph_journaler_obj_write_sync(journaler, &object_oid, &journaler->data_oloc, start_buf, buf_len);
+	if (ret)
+		pr_err("error in write entry: %d", ret);
+
+	pr_debug("write event: %d, tagtid: %llu", ret, entry->tag_tid);
+
+out:
+	ceph_oid_destroy(&object_oid);
+	vfree(start_buf);
+	return ret;
+}
+
+/*
+ * Hand out the next entry tid for tag @tag_tid. The first tid of a tag
+ * that has no counter yet is 0.
+ *
+ * If the counter for a new tag cannot be allocated, a warning is emitted
+ * and 0 is returned without recording anything (there is no way to report
+ * the failure to the caller).
+ */
+static uint64_t allocate_entry_tid(struct ceph_journaler *journaler, uint64_t tag_tid)
+{
+	struct entry_tid *pos;
+	uint64_t entry_tid = 0;
+
+	spin_lock(&journaler->entry_tid_lock);
+	list_for_each_entry(pos, &journaler->entry_tids, node) {
+		if (pos->tag_tid == tag_tid) {
+			entry_tid = pos->entry_tid++;
+			goto out_unlock;
+		}
+	}
+
+	/*
+	 * Spinlocks are held here (entry_tid_lock, and meta_lock in
+	 * ceph_journaler_append()), so the allocation must not sleep.
+	 */
+	pos = kzalloc(sizeof(*pos), GFP_ATOMIC);
+	if (WARN_ON(!pos))
+		goto out_unlock;
+
+	pos->tag_tid = tag_tid;
+	pos->entry_tid = 0;
+	INIT_LIST_HEAD(&pos->node);
+
+	list_add_tail(&pos->node, &journaler->entry_tids);
+	entry_tid = pos->entry_tid++;
+
+out_unlock:
+	spin_unlock(&journaler->entry_tid_lock);
+	return entry_tid;
+}
+
+/*
+ * Allocate a future carrying the given tids.
+ * Returns NULL if the allocation fails.
+ */
+static struct ceph_journaler_future *create_future(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid)
+{
+	struct ceph_journaler_future *fut = kzalloc(sizeof(*fut), GFP_KERNEL);
+
+	if (fut) {
+		fut->commit_tid = commit_tid;
+		fut->entry_tid = entry_tid;
+		fut->tag_tid = tag_tid;
+	}
+
+	return fut;
+}
+
+/*
+ * Allocate an entry that refers to (does not copy) @data.
+ * Returns NULL if the allocation fails.
+ */
+static struct ceph_journaler_entry *create_entry(uint64_t tag_tid, uint64_t entry_tid, char *data, ssize_t data_len)
+{
+	struct ceph_journaler_entry *ent = kzalloc(sizeof(*ent), GFP_KERNEL);
+
+	if (ent) {
+		ent->data_len = data_len;
+		ent->data = data;
+		ent->entry_tid = entry_tid;
+		ent->tag_tid = tag_tid;
+	}
+
+	return ent;
+}
+
+/*
+ * Append one entry to the journal.
+ *
+ * @journaler: open journaler
+ * @tag_tid: tag to append under
+ * @data, @data_len: entry payload; it is only read, ownership stays with
+ *                   the caller
+ * @journal_future: on success, receives a newly allocated future with the
+ *                  tids assigned to the entry; untouched on failure
+ *
+ * Returns 0 on success or a negative error code.
+ */
+int ceph_journaler_append(struct ceph_journaler *journaler, uint64_t tag_tid, char *data, ssize_t data_len, struct ceph_journaler_future **journal_future)
+{
+	struct ceph_journaler_future *future;
+	struct ceph_journaler_entry *entry;
+	uint64_t entry_tid;
+	uint64_t object_num;
+	uint64_t commit_tid;
+	uint8_t splay_offset;
+	int ret;
+
+	/* splay_width is the divisor that picks the target object */
+	if (!journaler->splay_width)
+		return -EINVAL;
+
+	/*
+	 * Allocate before taking meta_lock: GFP_KERNEL allocations may sleep,
+	 * which is not allowed under a spinlock. The tids are filled in once
+	 * they are known.
+	 */
+	future = create_future(tag_tid, 0, 0);
+	entry = create_entry(tag_tid, 0, data, data_len);
+	if (!future || !entry) {
+		ret = -ENOMEM;
+		goto out_free;
+	}
+
+	spin_lock(&journaler->meta_lock);
+	entry_tid = allocate_entry_tid(journaler, tag_tid);
+	splay_offset = entry_tid % journaler->splay_width;
+	object_num = get_object(journaler, splay_offset);
+	commit_tid = allocate_commit_tid(journaler, object_num, tag_tid, entry_tid);
+	spin_unlock(&journaler->meta_lock);
+
+	entry->entry_tid = entry_tid;
+	future->entry_tid = entry_tid;
+	future->commit_tid = commit_tid;
+
+	ret = ceph_journaler_object_append(journaler, object_num, future, entry);
+	if (ret)
+		goto out_free;
+
+	ret = add_commit_entry(journaler, commit_tid, object_num, tag_tid, entry_tid);
+	if (ret)
+		goto out_free;
+
+	/* success: the future now belongs to the caller */
+	*journal_future = future;
+	future = NULL;
+
+out_free:
+	/* the entry was only a wrapper around the caller's data */
+	kfree(entry);
+	kfree(future);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_append);
+
+/*
+ * Record the position described by @entry at the front of @object_positions.
+ *
+ * The list holds at most one position per splay offset
+ * (object_num % @splay_width).  If a position with the same splay offset as
+ * @entry already exists it is moved to the front of the list and overwritten;
+ * otherwise a new position is allocated and inserted at the front.
+ *
+ * The allocation uses GFP_ATOMIC because
+ * ceph_journaler_client_committed() calls this with the commit_lock spinlock
+ * held.
+ *
+ * Returns 0 on success or -ENOMEM if a new position cannot be allocated (the
+ * list is left untouched in that case).
+ */
+static int add_object_position(struct commit_entry *entry, struct list_head *object_positions, uint64_t splay_width)
+{
+	struct ceph_journaler_object_pos *position;
+	/* Same width as @splay_width so the offset is never truncated. */
+	uint64_t splay_offset = entry->object_num % splay_width;
+	bool found = false;
+
+	list_for_each_entry(position, object_positions, node) {
+		if (position->object_num % splay_width == splay_offset) {
+			found = true;
+			break;
+		}
+	}
+
+	if (found) {
+		list_move(&position->node, object_positions);
+	} else {
+		/* The loop cursor is not a valid element here; replace it. */
+		position = kzalloc(sizeof(*position), GFP_ATOMIC);
+		if (!position) {
+			pr_err("failed to allocate position");
+			return -ENOMEM;
+		}
+		list_add(&position->node, object_positions);
+	}
+
+	position->object_num = entry->object_num;
+	position->tag_tid = entry->tag_tid;
+	position->entry_tid = entry->entry_tid;
+
+	return 0;
+}
+
+/*
+ * Mark the commit entry identified by @commit_tid as committed and, when no
+ * entry ahead of it in journaler->commit_entries is still uncommitted, flush
+ * the committed entries into journaler->object_positions_pending.
+ *
+ * Under commit_lock:
+ *  1. Walk the commit entries from rb_first() up to the one matching
+ *     @commit_tid and mark that one committed.  If any entry visited before
+ *     it is not yet committed, nothing is flushed.
+ *  2. Otherwise, for every entry whose commit_tid is <= @commit_tid, record
+ *     its position with add_object_position(), then erase and free it.  If
+ *     recording fails, the failure is logged and the walk stops; that entry
+ *     (already marked committed) and any after it stay in the tree so a
+ *     later call can record them, instead of silently dropping the position.
+ *
+ * When a flush was attempted, journaler->commit_work is queued on
+ * journaler->task_wq after the lock has been released.
+ */
+void ceph_journaler_client_committed(struct ceph_journaler *journaler, uint64_t commit_tid)
+{
+	struct commit_entry *entry;
+	bool update_client_commit = true;
+	struct rb_node *n;
+
+	spin_lock(&journaler->commit_lock);
+	for (n = rb_first(&journaler->commit_entries); n; n = rb_next(n)) {
+		entry = rb_entry(n, struct commit_entry, r_node);
+
+		if (entry->commit_tid == commit_tid) {
+			entry->committed = true;
+			break;
+		}
+
+		if (!entry->committed)
+			update_client_commit = false;
+	}
+
+	if (update_client_commit) {
+		for (n = rb_first(&journaler->commit_entries); n;) {
+			entry = rb_entry(n, struct commit_entry, r_node);
+			/* Advance first: @entry may be erased and freed below. */
+			n = rb_next(n);
+
+			if (entry->commit_tid > commit_tid)
+				break;
+
+			if (add_object_position(entry, &journaler->object_positions_pending, journaler->splay_width)) {
+				pr_err("failed to record position for commit tid %llu\n", entry->commit_tid);
+				break;
+			}
+			erase_commit_entry(&journaler->commit_entries, entry);
+			kfree(entry);
+		}
+	}
+	spin_unlock(&journaler->commit_lock);
+
+	if (update_client_commit)
+		queue_work(journaler->task_wq, &journaler->commit_work);
+}
+EXPORT_SYMBOL(ceph_journaler_client_committed);
+
+/*
+ * Allocate a new tag on the journal header object.
+ *
+ * Calls ceph_cls_journaler_get_next_tag_tid() to obtain a tag tid, passes
+ * that tid together with @tag_class and @buf/@buf_len to
+ * ceph_cls_journaler_tag_create(), and finally calls
+ * ceph_cls_journaler_get_tag() for the same tid with @tag as the output
+ * argument.  All three calls target journaler->header_oid /
+ * journaler->header_oloc through journaler->osdc.
+ *
+ * Returns 0 on success, or the first non-zero value returned by one of the
+ * three calls.
+ */
+int ceph_journaler_allocate_tag(struct ceph_journaler *journaler, uint64_t tag_class, void *buf, uint32_t buf_len, struct ceph_journaler_tag *tag)
+{
+	uint64_t next_tid = 0;
+	int err;
+
+	err = ceph_cls_journaler_get_next_tag_tid(journaler->osdc,
+						  &journaler->header_oid,
+						  &journaler->header_oloc,
+						  &next_tid);
+	if (err)
+		return err;
+
+	err = ceph_cls_journaler_tag_create(journaler->osdc,
+					    &journaler->header_oid,
+					    &journaler->header_oloc,
+					    next_tid, tag_class,
+					    buf, buf_len);
+	if (err)
+		return err;
+
+	return ceph_cls_journaler_get_tag(journaler->osdc,
+					  &journaler->header_oid,
+					  &journaler->header_oloc,
+					  next_tid, tag);
+}
+EXPORT_SYMBOL(ceph_journaler_allocate_tag);
+
+/*
+ * Look up a client by id in journaler->clients.
+ *
+ * @client_id, @id_len: id to search for; @id_len bytes are compared
+ * @client_result:      if non-NULL, receives a shallow copy of the matching
+ *                      client on success
+ *
+ * A client matches when its id has exactly @id_len characters and those
+ * bytes equal @client_id.  Previously only a single byte was compared and
+ * @id_len was ignored, and the match was assigned to the local parameter
+ * copy, so it never reached the caller.
+ *
+ * NOTE(review): the length check assumes client->id is a NUL-terminated
+ * string and that @id_len does not count a terminator -- confirm against
+ * struct ceph_journaler_client and the callers.
+ * NOTE(review): the copy is shallow; its list node and any pointers it
+ * contains still refer to the cached client and must not be freed or
+ * re-linked by the caller.
+ *
+ * Returns 0 if a matching client was found, -ENOENT otherwise.
+ */
+int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, char *client_id, uint32_t id_len, struct ceph_journaler_client *client_result)
+{
+	struct ceph_journaler_client *client;
+
+	list_for_each_entry(client, &journaler->clients, node) {
+		if (strlen(client->id) != id_len)
+			continue;
+		if (memcmp(client->id, client_id, id_len))
+			continue;
+
+		if (client_result)
+			*client_result = *client;
+		return 0;
+	}
+
+	return -ENOENT;
+}
+EXPORT_SYMBOL(ceph_journaler_get_cached_client);
This is the generic journaling module in the Ceph kernel client. This module has three jobs: (1) journal recording: the generic journaling module provides an API named ceph_journaler_append(), which is used to append event entries to the journal. (2) journal replaying: when opening a journal, to make sure the data and the journal are consistent, we can call the start_replay() API to replay all events that were recorded but not committed. (3) journal trimming: this is transparent to the upper layer; when some old journal entries are no longer needed, we trim the related objects. Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn> --- include/linux/ceph/journaler.h | 131 +++++ net/ceph/Makefile | 3 +- net/ceph/journaler.c | 1208 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 1341 insertions(+), 1 deletion(-) create mode 100644 include/linux/ceph/journaler.h create mode 100644 net/ceph/journaler.c